[ovs-dev] [RFC PATCH 12/26] mpsc-queue: Module for lock-free message passing

Gaetan Rivet grive at u256.net
Sat Dec 5 14:22:07 UTC 2020


Add a lockless multi-producer/single-consumer (MPSC), linked-list based,
intrusive, unbounded queue that does not require deferred memory
management.

The queue is an implementation of the structure described by Dmitri
Vyukov[1]. It adds a slightly more explicit API explaining the proper use
of the queue.
Alternatives were considered such as a Treiber Stack [2] or a
Michael-Scott queue [3], but this one is faster, simpler and scalable.

[1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue

[2]: R. K. Treiber. Systems programming: Coping with parallelism.
     Technical Report RJ 5118, IBM Almaden Research Center, April 1986.

[3]: M. M. Michael, Simple, Fast, and Practical Non-Blocking and Blocking
     Concurrent Queue Algorithms.
     https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html

The queue is designed to improve the specific MPSC setup.  A benchmark
accompanies the unit tests to measure the difference in this configuration.
A single reader thread polls the queue while N writers enqueue elements
as fast as possible.  The mpsc-queue is compared against the regular ovs-list
as well as the guarded list.  The latter usually offers a slight improvement
by batching the element removal, however the mpsc-queue is faster.

The average is computed over the producer threads' times:

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 1
   Benchmarking n=3000000 on 1 + 1 threads.
    type\thread:  Reader      1    Avg
     mpsc-queue:     161    161    161 ms
           list:     803    803    803 ms
   guarded list:     665    665    665 ms

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 2
   Benchmarking n=3000000 on 1 + 2 threads.
    type\thread:  Reader      1      2    Avg
     mpsc-queue:     102    101     97     99 ms
           list:     246    212    246    229 ms
   guarded list:     264    263    214    238 ms

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 3
   Benchmarking n=3000000 on 1 + 3 threads.
    type\thread:  Reader      1      2      3    Avg
     mpsc-queue:      92     91     92     91     91 ms
           list:     520    517    515    520    517 ms
   guarded list:     405    395    401    404    400 ms

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 4
   Benchmarking n=3000000 on 1 + 4 threads.
    type\thread:  Reader      1      2      3      4    Avg
     mpsc-queue:      77     73     73     77     75     74 ms
           list:     371    359    361    287    370    344 ms
   guarded list:     389    388    359    363    357    366 ms

Signed-off-by: Gaetan Rivet <grive at u256.net>
---
 lib/automake.mk         |   2 +
 lib/mpsc-queue.c        | 190 +++++++++++++
 lib/mpsc-queue.h        | 149 +++++++++++
 tests/automake.mk       |   1 +
 tests/library.at        |   5 +
 tests/test-mpsc-queue.c | 580 ++++++++++++++++++++++++++++++++++++++++
 6 files changed, 927 insertions(+)
 create mode 100644 lib/mpsc-queue.c
 create mode 100644 lib/mpsc-queue.h
 create mode 100644 tests/test-mpsc-queue.c

diff --git a/lib/automake.mk b/lib/automake.mk
index 52c99b288..3012d4700 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -167,6 +167,8 @@ lib_libopenvswitch_la_SOURCES = \
 	lib/memory.h \
 	lib/meta-flow.c \
 	lib/mov-avg.h \
+	lib/mpsc-queue.c \
+	lib/mpsc-queue.h \
 	lib/multipath.c \
 	lib/multipath.h \
 	lib/namemap.c \
diff --git a/lib/mpsc-queue.c b/lib/mpsc-queue.c
new file mode 100644
index 000000000..9280d81f6
--- /dev/null
+++ b/lib/mpsc-queue.c
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "ovs-atomic.h"
+
+#include "mpsc-queue.h"
+
+/* Multi-producer, single-consumer queue
+ * =====================================
+ *
+ * This is an implementation of the MPSC queue described by Dmitri Vyukov [1].
+ *
+ * One atomic exchange operation is done per insertion.  Removal in most cases
+ * will not require any atomic operation and will use one atomic exchange to
+ * close the queue chain.
+ *
+ * Insertion
+ * =========
+ *
+ * The queue is implemented using a linked-list.  Insertion is done at the
+ * back of the queue, by swapping the current end with the new node atomically,
+ * then pointing the previous end toward the new node.  To follow Vyukov
+ * nomenclature, the end-node of the chain is called head.  A producer will
+ * only manipulate the head.
+ *
+ * The head swap is atomic, however the link from the previous head to the new
+ * one is done in a separate operation.  This means that the chain is
+ * momentarily broken, when the previous head still points to NULL and the
+ * current head has been inserted.
+ *
+ * Considering a series of insertions, the queue state will remain consistent
+ * and the insertions order is compatible with their precedence, thus the
+ * queue is serializable.  However, because an insertion consists of two
+ * separate memory transactions, it is not linearizable.
+ *
+ * Removal
+ * =======
+ *
+ * The consumer must deal with the queue inconsistency.  It will manipulate
+ * the tail of the queue and move it along the latest consumed elements.
+ * When an end of the chain of elements is found (the next pointer is NULL),
+ * the tail is compared with the head.
+ *
+ * If both point to different addresses, then the queue is in an inconsistent
+ * state: the tail cannot move forward as the next is NULL, but the head is not
+ * the last element in the chain: this can only happen if the chain is broken.
+ *
+ * In this case, the consumer must wait for the producer to finish writing the
+ * next pointer of its current tail: 'MPSC_QUEUE_RETRY' is returned.
+ *
+ * Removal is thus in most cases (when there are elements in the queue)
+ * accomplished without using atomics, until the last element of the queue.
+ * There, the head is atomically loaded. If the queue is in a consistent state,
+ * the head is moved back to the queue stub by inserting the stub in the queue:
+ * ending the queue is the same as an insertion, which is one atomic XCHG.
+ *
+ * Limitations
+ * ===========
+ *
+ * The chain will remain broken as long as a producer is not finished writing
+ * its next pointer.  If a producer is cancelled for example, the queue could
+ * remain broken for any future readings.  This queue should either be used
+ * with cooperative threads or outside any cancellable sections.
+ *
+ * Performance
+ * ===========
+ *
+ * In benchmarks this structure was better than alternatives such as:
+ *
+ *   * A reversed Treiber stack [2], using 1 CAS per operation
+ *     and requiring reversal of the node list on removal.
+ *
+ *   * Michael-Scott lock-free queue [3], using 2 CAS per operation.
+ *
+ * While it is not linearizable, this queue is well-suited for message passing.
+ * If a proper hardware XCHG operation is used, it scales better than
+ * CAS-based implementations.
+ *
+ * References
+ * ==========
+ *
+ * [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
+ *
+ * [2]: R. K. Treiber. Systems programming: Coping with parallelism.
+ *      Technical Report RJ 5118, IBM Almaden Research Center, April 1986.
+ *
+ * [3]: M. M. Michael, Simple, Fast, and Practical Non-Blocking and
+ *      Blocking Concurrent Queue Algorithms.
+ *      https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
+ *
+ */
+
+void
+mpsc_queue_init(struct mpsc_queue *queue)
+{
+    atomic_store_relaxed(&queue->head, &queue->stub);
+    atomic_store_relaxed(&queue->tail, &queue->stub);
+    atomic_store_relaxed(&queue->stub.next, NULL);
+
+    ovs_mutex_init(&queue->read_lock);
+}
+
+void
+mpsc_queue_destroy(struct mpsc_queue *queue)
+{
+    ovs_mutex_destroy(&queue->read_lock);
+}
+
+int
+mpsc_queue_acquire(struct mpsc_queue *queue)
+    OVS_TRY_LOCK(1, queue->read_lock)
+{
+    return !ovs_mutex_trylock(&queue->read_lock);
+}
+
+void
+mpsc_queue_release(struct mpsc_queue *queue)
+    OVS_RELEASES(queue->read_lock)
+{
+    ovs_mutex_unlock(&queue->read_lock);
+}
+
+enum mpsc_queue_poll_result
+mpsc_queue_poll(struct mpsc_queue *queue, struct mpsc_queue_node **node)
+    OVS_REQUIRES(queue->read_lock)
+{
+    struct mpsc_queue_node *tail;
+    struct mpsc_queue_node *next;
+    struct mpsc_queue_node *head;
+
+    atomic_read_relaxed(&queue->tail, &tail);
+    atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+    /* The stub is never returned: report an empty queue or step over it. */
+    if (tail == &queue->stub) {
+        if (next == NULL) {
+            return MPSC_QUEUE_EMPTY;
+        }
+
+        atomic_store_relaxed(&queue->tail, next);
+        tail = next;
+        atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+    }
+    /* 'tail' has a successor: return 'tail' and advance to the successor. */
+    if (next != NULL) {
+        atomic_store_relaxed(&queue->tail, next);
+        *node = tail;
+        return MPSC_QUEUE_ITEM;
+    }
+    /* No successor: unless 'tail' is the head, an insertion is in flight. */
+    atomic_read_explicit(&queue->head, &head, memory_order_acquire);
+    if (tail != head) {
+        return MPSC_QUEUE_RETRY;
+    }
+    /* 'tail' is the last node: append the stub so 'tail' gets a successor. */
+    mpsc_queue_insert(queue, &queue->stub);
+
+    atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+    if (next != NULL) {
+        atomic_store_relaxed(&queue->tail, next);
+        *node = tail;
+        return MPSC_QUEUE_ITEM;
+    }
+    /* Another insertion took the head first and has not linked 'tail' yet. */
+    return MPSC_QUEUE_EMPTY;
+}
+
+void
+mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node)
+{
+    struct mpsc_queue_node *prev;
+    /* Swap in 'node' as head; the chain is broken until 'prev' links to it. */
+    atomic_store_relaxed(&node->next, NULL);
+    prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);
+    atomic_store_explicit(&prev->next, node, memory_order_release);
+}
diff --git a/lib/mpsc-queue.h b/lib/mpsc-queue.h
new file mode 100644
index 000000000..8ee59409f
--- /dev/null
+++ b/lib/mpsc-queue.h
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MPSC_QUEUE_H
+#define MPSC_QUEUE_H 1
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stddef.h>
+
+#include <openvswitch/thread.h>
+#include <openvswitch/util.h>
+
+#include "ovs-atomic.h"
+
+/* Multi-producer, single-consumer queue
+ * =====================================
+ *
+ * This data structure is a lockless queue implementation with
+ * the following properties:
+ *
+ *  * Multi-producer: multiple threads can write concurrently.
+ *    Insertion in the queue is thread-safe, no inter-thread
+ *    synchronization is necessary.
+ *
+ *  * Single-consumer: only a single thread can safely remove
+ *    nodes from the queue.  The queue must be 'acquired' using
+ *    'mpsc_queue_acquire()' before removing nodes.
+ *
+ *  * Unbounded: the queue is backed by a linked-list and is not
+ *    limited in number of elements.
+ *
+ *  * Intrusive: queue elements are allocated as part of larger
+ *    objects.  Objects are retrieved by offset manipulation.
+ *
+ *  * per-producer FIFO: Elements in the queue are kept in the
+ *    order their producer inserted them.  The consumer retrieves
+ *    them in the same insertion order.  When multiple
+ *    producers insert at the same time, either will proceed.
+ *
+ * This queue is well-suited for message passing between threads,
+ * where any number of threads can insert a message and a single
+ * thread is meant to receive and process them.
+ *
+ * Thread-safety
+ * =============
+ *
+ *  The consumer thread must acquire the queue using 'mpsc_queue_acquire()'.
+ *  If no error is returned, the thread can call 'mpsc_queue_poll()'.
+ *  When a thread is finished with reading the queue, it can release the
+ *  reader lock using 'mpsc_queue_release()'.
+ *
+ *  Producers can always insert elements in the queue, even if no consumer
+ *  acquired the reader lock.  No inter-producer synchronization (e.g. using a
+ *  lock) is needed.
+ *
+ *  The consumer thread is also allowed to insert elements while it holds the
+ *  reader lock.
+ *
+ *  Producer threads must never be cancelled while writing to the queue.
+ *  Otherwise the consumer is blocked and will never see any subsequent
+ *  write to the queue.  Producers should ideally be cooperatively managed
+ *  or the queue insertion should be within non-cancellable sections.
+ *
+ * Queue state
+ * ===========
+ *
+ *  When polling the queue, three states can be observed: 'empty', 'non-empty',
+ *  and 'inconsistent'.  Three polling results are defined, respectively:
+ *
+ *   * MPSC_QUEUE_EMPTY: the queue is empty.
+ *   * MPSC_QUEUE_ITEM: an item was available and has been removed.
+ *   * MPSC_QUEUE_RETRY: the queue is inconsistent.
+ *
+ *  If 'MPSC_QUEUE_RETRY' is returned, then a producer has not yet finished
+ *  writing to the queue and the list of nodes is not coherent.  The consumer
+ *  can retry shortly to check if the producer has finished.
+ *
+ *  This behavior is the reason the removal function is called
+ *  'mpsc_queue_poll()'.
+ *
+ */
+
+struct mpsc_queue_node {
+    ATOMIC(struct mpsc_queue_node *) next; /* Next in chain, or NULL. */
+};
+
+struct mpsc_queue {
+    ATOMIC(struct mpsc_queue_node *) head; /* Most recently inserted node. */
+    ATOMIC(struct mpsc_queue_node *) tail; /* Consumer position. */
+    struct mpsc_queue_node stub;           /* Placeholder, never returned. */
+    struct ovs_mutex read_lock;            /* Taken by the consumer. */
+};
+
+#define MPSC_QUEUE_INITIALIZER(Q) { \
+    .head = ATOMIC_VAR_INIT(&(Q)->stub), \
+    .tail = ATOMIC_VAR_INIT(&(Q)->stub), \
+    .stub = { .next = ATOMIC_VAR_INIT(NULL) }, \
+    .read_lock = OVS_MUTEX_INITIALIZER, \
+}
+
+/* Consumer API. */
+
+/* Initialize the queue. Not necessary if 'MPSC_QUEUE_INITIALIZER' was used. */
+void mpsc_queue_init(struct mpsc_queue *queue);
+/* The reader lock must be released prior to destroying the queue. */
+void mpsc_queue_destroy(struct mpsc_queue *queue);
+/* Acquire the reader lock if 1 is returned. */
+int mpsc_queue_acquire(struct mpsc_queue *queue);
+/* Release the reader lock. */
+void mpsc_queue_release(struct mpsc_queue *queue);
+
+enum mpsc_queue_poll_result {
+    /* Queue is empty. */
+    MPSC_QUEUE_EMPTY,
+    /* Polling the queue returned an item. */
+    MPSC_QUEUE_ITEM,
+    /* Data has been enqueued but one or more producer threads have not
+     * finished writing it. The queue is in an inconsistent state.
+     * Retrying shortly, if the producer threads are still active, will
+     * return the data.
+     */
+    MPSC_QUEUE_RETRY,
+};
+
+/* Set 'node' to a removed item from the queue if 'MPSC_QUEUE_ITEM' is
+ * returned, otherwise 'node' is not set.
+ */
+enum mpsc_queue_poll_result
+mpsc_queue_poll(struct mpsc_queue *queue, struct mpsc_queue_node **node);
+
+/* Producer API. */
+
+void mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node);
+
+#endif /* MPSC_QUEUE_H */
diff --git a/tests/automake.mk b/tests/automake.mk
index 677b99a6b..d7ae5df90 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -460,6 +460,7 @@ tests_ovstest_SOURCES = \
 	tests/test-list.c \
 	tests/test-lockfile.c \
 	tests/test-multipath.c \
+	tests/test-mpsc-queue.c \
 	tests/test-netflow.c \
 	tests/test-odp.c \
 	tests/test-ofpbuf.c \
diff --git a/tests/library.at b/tests/library.at
index ac4ea4abf..537f0aa4c 100644
--- a/tests/library.at
+++ b/tests/library.at
@@ -253,3 +253,8 @@ AT_SETUP([stopwatch module])
 AT_CHECK([ovstest test-stopwatch], [0], [......
 ], [ignore])
 AT_CLEANUP
+
+AT_SETUP([mpsc-queue module])
+AT_CHECK([ovstest test-mpsc-queue check], [0], [..
+])
+AT_CLEANUP
diff --git a/tests/test-mpsc-queue.c b/tests/test-mpsc-queue.c
new file mode 100644
index 000000000..791715848
--- /dev/null
+++ b/tests/test-mpsc-queue.c
@@ -0,0 +1,580 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <assert.h>
+#include <getopt.h>
+#include <string.h>
+
+#include <config.h>
+
+#include "command-line.h"
+#include "guarded-list.h"
+#include "mpsc-queue.h"
+#include "openvswitch/list.h"
+#include "openvswitch/util.h"
+#include "ovs-thread.h"
+#include "ovstest.h"
+#include "timeval.h"
+#include "util.h"
+
+struct element {
+    union {
+        struct mpsc_queue_node mpscq;
+        struct ovs_list list;
+    } node;
+    uint64_t mark;
+};
+
+static void
+test_mpsc_queue_mark_element(struct mpsc_queue_node *node,
+                             uint64_t mark,
+                             unsigned int *counter)
+{
+    struct element *elem;
+
+    elem = CONTAINER_OF(node, struct element, node.mpscq);
+    elem->mark = mark;
+    *counter += 1;
+}
+
+static void
+test_mpsc_queue_insert(void)
+{
+    struct element elements[100];
+    struct mpsc_queue_node *node;
+    struct mpsc_queue queue;
+    unsigned int counter;
+    size_t i;
+
+    memset(elements, 0, sizeof(elements));
+    mpsc_queue_init(&queue);
+    ignore(mpsc_queue_acquire(&queue));
+
+    for (i = 0; i < ARRAY_SIZE(elements); i++) {
+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);
+    }
+
+    counter = 0;
+    while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+        test_mpsc_queue_mark_element(node, 1, &counter);
+    }
+
+    mpsc_queue_release(&queue);
+    mpsc_queue_destroy(&queue);
+
+    ovs_assert(counter == ARRAY_SIZE(elements));
+    for (i = 0; i < ARRAY_SIZE(elements); i++) {
+        ovs_assert(elements[i].mark == 1);
+    }
+
+    printf(".");
+}
+
+static void
+test_mpsc_queue_flush_is_fifo(void)
+{
+    struct element elements[100];
+    struct mpsc_queue_node *node;
+    struct mpsc_queue queue;
+    unsigned int counter;
+    size_t i;
+
+    memset(elements, 0, sizeof(elements));
+
+    mpsc_queue_init(&queue);
+    ignore(mpsc_queue_acquire(&queue));
+
+    for (i = 0; i < ARRAY_SIZE(elements); i++) {
+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);
+    }
+
+    /* Each polled element is marked with the running counter,
+     * so marks record the order in which elements were removed.
+     */
+    counter = 0;
+    while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+        test_mpsc_queue_mark_element(node, counter, &counter);
+    }
+
+    /* The marks are stored in 'elements', not in the queue,
+     * so the queue can be destroyed before they are checked.
+     */
+    mpsc_queue_release(&queue);
+    mpsc_queue_destroy(&queue);
+    /* Marks must strictly increase along the insertion order. */
+    for (i = 0; i < ARRAY_SIZE(elements) - 1; i++) {
+        struct element *e1, *e2;
+
+        e1 = &elements[i];
+        e2 = &elements[i + 1];
+
+        ovs_assert(e1->mark < e2->mark);
+    }
+
+    printf(".");
+}
+
+static void
+run_tests(struct ovs_cmdl_context *ctx OVS_UNUSED)
+{
+    /* Verify that every inserted element can be polled back. */
+    test_mpsc_queue_insert();
+    /* Verify that elements are polled in insertion (FIFO) order. */
+    test_mpsc_queue_flush_is_fifo();
+    printf("\n");
+}
+
+static struct element *elements;
+static uint64_t *thread_working_ms; /* Measured work time. */
+
+static unsigned int n_threads;
+static unsigned int n_elems;
+
+static struct ovs_barrier barrier;
+static volatile bool working;
+
+static int
+elapsed(const struct timeval *start)
+{
+    struct timeval end;
+
+    xgettimeofday(&end);
+    return timeval_to_msec(&end) - timeval_to_msec(start);
+}
+
+struct mpscq_aux {
+    struct mpsc_queue *queue;
+    atomic_uint thread_id;
+};
+
+static void *
+mpsc_queue_insert_thread(void *aux_)
+{
+    unsigned int n_elems_per_thread;
+    struct element *th_elements;
+    struct mpscq_aux *aux = aux_;
+    struct timeval start;
+    unsigned int id;
+    size_t i;
+
+    atomic_add(&aux->thread_id, 1u, &id);
+    n_elems_per_thread = n_elems / n_threads;
+    th_elements = &elements[id * n_elems_per_thread];
+
+    ovs_barrier_block(&barrier);
+    xgettimeofday(&start);
+
+    for (i = 0; i < n_elems_per_thread; i++) {
+        mpsc_queue_insert(aux->queue, &th_elements[i].node.mpscq);
+    }
+
+    thread_working_ms[id] = elapsed(&start);
+    ovs_barrier_block(&barrier);
+
+    working = false;
+
+    return NULL;
+}
+
+static void
+benchmark_mpsc_queue(void)
+{
+    struct mpsc_queue_node *node;
+    struct mpsc_queue queue;
+    struct timeval start;
+    unsigned int counter;
+    bool work_complete;
+    pthread_t *threads;
+    struct mpscq_aux aux;
+    uint64_t epoch;
+    uint64_t avg;
+    size_t i;
+    /* Reset marks and timings; a mark of 0 means "never consumed". */
+    memset(elements, 0, n_elems * sizeof *elements);
+    memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
+
+    mpsc_queue_init(&queue);
+
+    aux.queue = &queue;
+    atomic_store(&aux.thread_id, 0);
+    /* Insert here the remainder that does not divide evenly among threads. */
+    for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);
+    }
+
+    working = true;
+
+    threads = xmalloc(n_threads * sizeof *threads);
+    ovs_barrier_init(&barrier, n_threads);
+
+    for (i = 0; i < n_threads; i++) {
+        threads[i] = ovs_thread_create("sc_queue_insert",
+                                       mpsc_queue_insert_thread, &aux);
+    }
+
+    ignore(mpsc_queue_acquire(&queue));
+    xgettimeofday(&start);
+    /* Drain the queue repeatedly until a producer clears 'working'. */
+    counter = 0;
+    epoch = 1;
+    do {
+        while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+            test_mpsc_queue_mark_element(node, epoch, &counter);
+        }
+        if (epoch == UINT64_MAX) {
+            epoch = 0; /* Skip 0 on wrap-around: it means "never consumed". */
+        }
+        epoch++;
+    } while (working);
+
+    avg = 0;
+    for (i = 0; i < n_threads; i++) {
+        xpthread_join(threads[i], NULL);
+        avg += thread_working_ms[i];
+    }
+    avg /= n_threads;
+
+    /* Elements might have been inserted before threads were joined. */
+    while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+        test_mpsc_queue_mark_element(node, epoch, &counter);
+    }
+
+    printf("  mpsc-queue:  %6d", elapsed(&start));
+    for (i = 0; i < n_threads; i++) {
+        printf(" %6" PRIu64, thread_working_ms[i]);
+    }
+    printf(" %6" PRIu64 " ms\n", avg);
+
+    mpsc_queue_release(&queue);
+    mpsc_queue_destroy(&queue);
+    ovs_barrier_destroy(&barrier);
+    free(threads);
+    /* Every element must have been polled exactly once. */
+    work_complete = true;
+    for (i = 0; i < n_elems; i++) {
+        if (elements[i].mark == 0) {
+            printf("Element %" PRIuSIZE " was never consumed.\n", i);
+            work_complete = false;
+        }
+    }
+    ovs_assert(work_complete);
+    ovs_assert(counter == n_elems);
+}
+
+struct list_aux {
+    struct ovs_list *list;
+    struct ovs_mutex *lock;
+    atomic_uint thread_id;
+};
+
+static void *
+locked_list_insert_thread(void *aux_)
+{
+    unsigned int n_elems_per_thread;
+    struct element *th_elements;
+    struct list_aux *aux = aux_;
+    struct timeval start;
+    unsigned int id;
+    size_t i;
+
+    atomic_add(&aux->thread_id, 1u, &id);
+    n_elems_per_thread = n_elems / n_threads;
+    th_elements = &elements[id * n_elems_per_thread];
+
+    ovs_barrier_block(&barrier);
+    xgettimeofday(&start);
+
+    for (i = 0; i < n_elems_per_thread; i++) {
+        ovs_mutex_lock(aux->lock);
+        ovs_list_push_front(aux->list, &th_elements[i].node.list);
+        ovs_mutex_unlock(aux->lock);
+    }
+
+    thread_working_ms[id] = elapsed(&start);
+    ovs_barrier_block(&barrier);
+
+    working = false;
+
+    return NULL;
+}
+
+static void
+benchmark_list(void)
+{
+    struct ovs_mutex lock;
+    struct ovs_list list;
+    struct element *elem;
+    struct timeval start;
+    unsigned int counter;
+    bool work_complete;
+    pthread_t *threads;
+    struct list_aux aux;
+    uint64_t epoch;
+    uint64_t avg;
+    size_t i;
+
+    memset(elements, 0, n_elems * sizeof *elements);
+    memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
+
+    ovs_mutex_init(&lock);
+    ovs_list_init(&list);
+
+    aux.list = &list;
+    aux.lock = &lock;
+    atomic_store(&aux.thread_id, 0);
+
+    ovs_mutex_lock(&lock);
+    for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
+        ovs_list_push_front(&list, &elements[i].node.list);
+    }
+    ovs_mutex_unlock(&lock);
+
+    working = true;
+
+    threads = xmalloc(n_threads * sizeof *threads);
+    ovs_barrier_init(&barrier, n_threads);
+
+    for (i = 0; i < n_threads; i++) {
+        threads[i] = ovs_thread_create("locked_list_insert",
+                                       locked_list_insert_thread, &aux);
+    }
+
+    xgettimeofday(&start);
+
+    counter = 0;
+    epoch = 1;
+    do {
+        ovs_mutex_lock(&lock);
+        LIST_FOR_EACH_POP (elem, node.list, &list) {
+            elem->mark = epoch;
+            counter++;
+        }
+        ovs_mutex_unlock(&lock);
+        if (epoch == UINT64_MAX) {
+            epoch = 0;
+        }
+        epoch++;
+    } while (working);
+
+    avg = 0;
+    for (i = 0; i < n_threads; i++) {
+        xpthread_join(threads[i], NULL);
+        avg += thread_working_ms[i];
+    }
+    avg /= n_threads;
+
+    /* Elements might have been inserted before threads were joined. */
+    ovs_mutex_lock(&lock);
+    LIST_FOR_EACH_POP (elem, node.list, &list) {
+        elem->mark = epoch;
+        counter++;
+    }
+    ovs_mutex_unlock(&lock);
+
+    printf("        list:  %6d", elapsed(&start));
+    for (i = 0; i < n_threads; i++) {
+        printf(" %6" PRIu64, thread_working_ms[i]);
+    }
+    printf(" %6" PRIu64 " ms\n", avg);
+    ovs_barrier_destroy(&barrier);
+    free(threads);
+
+    work_complete = true;
+    for (i = 0; i < n_elems; i++) {
+        if (elements[i].mark == 0) {
+            printf("Element %" PRIuSIZE " was never consumed.\n", i);
+            work_complete = false;
+        }
+    }
+    ovs_assert(work_complete);
+    ovs_assert(counter == n_elems);
+}
+
+struct guarded_list_aux {
+    struct guarded_list *glist;
+    atomic_uint thread_id;
+};
+
+static void *
+guarded_list_insert_thread(void *aux_)
+{
+    unsigned int n_elems_per_thread;
+    struct element *th_elements;
+    struct guarded_list_aux *aux = aux_;
+    struct timeval start;
+    unsigned int id;
+    size_t i;
+
+    atomic_add(&aux->thread_id, 1u, &id);
+    n_elems_per_thread = n_elems / n_threads;
+    th_elements = &elements[id * n_elems_per_thread];
+
+    ovs_barrier_block(&barrier);
+    xgettimeofday(&start);
+
+    for (i = 0; i < n_elems_per_thread; i++) {
+        guarded_list_push_back(aux->glist, &th_elements[i].node.list, n_elems);
+    }
+
+    thread_working_ms[id] = elapsed(&start);
+    ovs_barrier_block(&barrier);
+
+    working = false;
+
+    return NULL;
+}
+
+static void
+benchmark_guarded_list(void)
+{
+    struct guarded_list_aux aux;
+    struct ovs_list extracted;
+    struct guarded_list glist;
+    struct element *elem;
+    struct timeval start;
+    unsigned int counter;
+    bool work_complete;
+    pthread_t *threads;
+    uint64_t epoch;
+    uint64_t avg;
+    size_t i;
+
+    memset(elements, 0, n_elems * sizeof *elements);
+    memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
+
+    guarded_list_init(&glist);
+    ovs_list_init(&extracted);
+
+    aux.glist = &glist;
+    atomic_store(&aux.thread_id, 0);
+
+    for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
+        guarded_list_push_back(&glist, &elements[i].node.list, n_elems);
+    }
+
+    working = true;
+
+    threads = xmalloc(n_threads * sizeof *threads);
+    ovs_barrier_init(&barrier, n_threads);
+
+    for (i = 0; i < n_threads; i++) {
+        threads[i] = ovs_thread_create("guarded_list_insert",
+                                       guarded_list_insert_thread, &aux);
+    }
+
+    xgettimeofday(&start);
+
+    counter = 0;
+    epoch = 1;
+    do {
+        guarded_list_pop_all(&glist, &extracted);
+        LIST_FOR_EACH_POP (elem, node.list, &extracted) {
+            elem->mark = epoch;
+            counter++;
+        }
+        if (epoch == UINT64_MAX) {
+            epoch = 0;
+        }
+        epoch++;
+    } while (working);
+
+    avg = 0;
+    for (i = 0; i < n_threads; i++) {
+        xpthread_join(threads[i], NULL);
+        avg += thread_working_ms[i];
+    }
+    avg /= n_threads;
+
+    /* Elements might have been inserted before threads were joined. */
+    guarded_list_pop_all(&glist, &extracted);
+    LIST_FOR_EACH_POP (elem, node.list, &extracted) {
+        elem->mark = epoch;
+        counter++;
+    }
+
+    printf("guarded list:  %6d", elapsed(&start));
+    for (i = 0; i < n_threads; i++) {
+        printf(" %6" PRIu64, thread_working_ms[i]);
+    }
+    printf(" %6" PRIu64 " ms\n", avg);
+    ovs_barrier_destroy(&barrier);
+    free(threads);
+    guarded_list_destroy(&glist);
+
+    work_complete = true;
+    for (i = 0; i < n_elems; i++) {
+        if (elements[i].mark == 0) {
+            printf("Element %" PRIuSIZE " was never consumed.\n", i);
+            work_complete = false;
+        }
+    }
+    ovs_assert(work_complete);
+    ovs_assert(counter == n_elems);
+}
+
+static void
+run_benchmarks(struct ovs_cmdl_context *ctx)
+{
+    long int l_threads;
+    long int l_elems;
+    size_t i;
+
+    l_elems = strtol(ctx->argv[1], NULL, 10);
+    l_threads = strtol(ctx->argv[2], NULL, 10);
+    ovs_assert(l_elems > 0 && l_threads > 0);
+
+    n_elems = l_elems;
+    n_threads = l_threads;
+
+    elements = xcalloc(n_elems, sizeof *elements);
+    thread_working_ms = xcalloc(n_threads, sizeof *thread_working_ms);
+
+    printf("Benchmarking n=%u on 1 + %u threads.\n", n_elems, n_threads);
+
+    printf(" type\\thread:  Reader ");
+    for (i = 0; i < n_threads; i++) {
+        printf("   %3" PRIuSIZE " ", i + 1);
+    }
+    printf("   Avg\n");
+
+    benchmark_mpsc_queue();
+    benchmark_list();
+    benchmark_guarded_list();
+
+    free(thread_working_ms);
+    free(elements);
+}
+
+static const struct ovs_cmdl_command commands[] = {
+    {"check", NULL, 0, 0, run_tests, OVS_RO},
+    {"benchmark", "<nb elem> <nb threads>", 2, 2, run_benchmarks, OVS_RO},
+    {NULL, NULL, 0, 0, NULL, OVS_RO},
+};
+
+static void
+test_mpsc_queue_main(int argc, char *argv[])
+{
+    struct ovs_cmdl_context ctx = {
+        .argc = argc - optind,
+        .argv = argv + optind,
+    };
+
+    set_program_name(argv[0]);
+    ovs_cmdl_run_command(&ctx, commands);
+}
+
+OVSTEST_REGISTER("test-mpsc-queue", test_mpsc_queue_main);
-- 
2.29.2



More information about the dev mailing list