[ovs-dev] [PATCH v4 12/27] mpsc-queue: Module for lock-free message passing

Gaetan Rivet grive at u256.net
Wed Jun 9 13:09:20 UTC 2021


Add a lockless multi-producer/single-consumer (MPSC), linked-list based,
intrusive, unbounded queue that does not require deferred memory
management.

The queue is designed to improve the specific MPSC setup.  A benchmark
accompanies the unit tests to measure the difference in this configuration.
A single reader thread polls the queue while N writers enqueue elements
as fast as possible.  The mpsc-queue is compared against the regular ovs-list
as well as the guarded list.  The latter usually offers a slight improvement
by batching the element removal; however, the mpsc-queue is faster.

The average is computed over the producer threads' times:

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 1
   Benchmarking n=3000000 on 1 + 1 threads.
    type\thread:  Reader      1    Avg
     mpsc-queue:     167    167    167 ms
     list(spin):      89     80     80 ms
    list(mutex):     745    745    745 ms
   guarded list:     788    788    788 ms

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 2
   Benchmarking n=3000000 on 1 + 2 threads.
    type\thread:  Reader      1      2    Avg
     mpsc-queue:      98     97     94     95 ms
     list(spin):     185    171    173    172 ms
    list(mutex):     203    199    203    201 ms
   guarded list:     269    269    188    228 ms

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 3
   Benchmarking n=3000000 on 1 + 3 threads.
    type\thread:  Reader      1      2      3    Avg
     mpsc-queue:      76     76     65     76     72 ms
     list(spin):     246    110    240    238    196 ms
    list(mutex):     542    541    541    539    540 ms
   guarded list:     535    535    507    511    517 ms

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 4
   Benchmarking n=3000000 on 1 + 4 threads.
    type\thread:  Reader      1      2      3      4    Avg
     mpsc-queue:      73     68     68     68     68     68 ms
     list(spin):     294    275    279    277    282    278 ms
    list(mutex):     346    309    287    345    302    310 ms
   guarded list:     378    319    334    378    351    345 ms

Signed-off-by: Gaetan Rivet <grive at u256.net>
Reviewed-by: Eli Britstein <elibr at nvidia.com>
---
 lib/automake.mk         |   2 +
 lib/mpsc-queue.c        | 251 +++++++++++++
 lib/mpsc-queue.h        | 190 ++++++++++
 tests/automake.mk       |   1 +
 tests/library.at        |   5 +
 tests/test-mpsc-queue.c | 772 ++++++++++++++++++++++++++++++++++++++++
 6 files changed, 1221 insertions(+)
 create mode 100644 lib/mpsc-queue.c
 create mode 100644 lib/mpsc-queue.h
 create mode 100644 tests/test-mpsc-queue.c

diff --git a/lib/automake.mk b/lib/automake.mk
index 111179736..b45801852 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -167,6 +167,8 @@ lib_libopenvswitch_la_SOURCES = \
 	lib/memory.h \
 	lib/meta-flow.c \
 	lib/mov-avg.h \
+	lib/mpsc-queue.c \
+	lib/mpsc-queue.h \
 	lib/multipath.c \
 	lib/multipath.h \
 	lib/namemap.c \
diff --git a/lib/mpsc-queue.c b/lib/mpsc-queue.c
new file mode 100644
index 000000000..ee762e1dc
--- /dev/null
+++ b/lib/mpsc-queue.c
@@ -0,0 +1,251 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "ovs-atomic.h"
+
+#include "mpsc-queue.h"
+
+/* Multi-producer, single-consumer queue
+ * =====================================
+ *
+ * This is an implementation of the MPSC queue described by Dmitri Vyukov [1].
+ *
+ * One atomic exchange operation is done per insertion.  Removal in most cases
+ * will not require atomic operation and will use one atomic exchange to close
+ * the queue chain.
+ *
+ * Insertion
+ * =========
+ *
+ * The queue is implemented using a linked-list.  Insertion is done at the
+ * back of the queue, by swapping the current end with the new node atomically,
+ * then pointing the previous end toward the new node.  To follow Vyukov
+ * nomenclature, the end-node of the chain is called head.  A producer will
+ * only manipulate the head.
+ *
+ * The head swap is atomic, however the link from the previous head to the new
+ * one is done in a separate operation.  This means that the chain is
+ * momentarily broken, when the previous head still points to NULL and the
+ * current head has been inserted.
+ *
+ * Considering a series of insertions, the queue state will remain consistent
+ * and the insertions order is compatible with their precedence, thus the
+ * queue is serializable.  However, because an insertion consists in two
+ * separate memory transactions, it is not linearizable.
+ *
+ * Removal
+ * =======
+ *
+ * The consumer must deal with the queue inconsistency.  It will manipulate
+ * the tail of the queue and move it along the latest consumed elements.
+ * When an end of the chain of elements is found (the next pointer is NULL),
+ * the tail is compared with the head.
+ *
+ * If both point to different addresses, then the queue is in an inconsistent
+ * state: the tail cannot move forward as the next is NULL, but the head is not
+ * the last element in the chain: this can only happen if the chain is broken.
+ *
+ * In this case, the consumer must wait for the producer to finish writing the
+ * next pointer of its current tail: 'MPSC_QUEUE_RETRY' is returned.
+ *
+ * Removal is thus in most cases (when there are elements in the queue)
+ * accomplished without using atomics, until the last element of the queue.
+ * There, the head is atomically loaded. If the queue is in a consistent state,
+ * the head is moved back to the queue stub by inserting the stub in the queue:
+ * ending the queue is the same as an insertion, which is one atomic XCHG.
+ *
+ * Forward guarantees
+ * ==================
+ *
+ * Insertion and peeking are wait-free: they will execute in a known bounded
+ * number of instructions, regardless of the state of the queue.
+ *
+ * However, while removal consists in peeking and a constant write to
+ * update the tail, it can repeatedly fail until the queue becomes consistent.
+ * It is thus dependent on other threads progressing.  This means that the
+ * queue forward progress is obstruction-free only.  It has a potential for
+ * livelocking.
+ *
+ * The chain will remain broken as long as a producer is not finished writing
+ * its next pointer.  If a producer is cancelled for example, the queue could
+ * remain broken for any future readings.  This queue should either be used
+ * with cooperative threads or insertion must only be done outside cancellable
+ * sections.
+ *
+ * Performances
+ * ============
+ *
+ * In benchmarks this structure was better than alternatives such as:
+ *
+ *   * A reversed Treiber stack [2], using 1 CAS per operation
+ *     and requiring reversal of the node list on removal.
+ *
+ *   * Michael-Scott lock-free queue [3], using 2 CAS per operation.
+ *
+ * While it is not linearizable, this queue is well-suited for message passing.
+ * If a proper hardware XCHG operation is used, it scales better than
+ * CAS-based implementations.
+ *
+ * References
+ * ==========
+ *
+ * [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
+ *
+ * [2]: R. K. Treiber. Systems programming: Coping with parallelism.
+ *      Technical Report RJ 5118, IBM Almaden Research Center, April 1986.
+ *
+ * [3]: M. M. Michael, Simple, Fast, and Practical Non-Blocking and
+ *      Blocking Concurrent Queue Algorithms
+ *      https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
+ *
+ */
+
+/* Puts 'queue' in its empty state: both ends point to the stub, whose
+ * 'next' terminates the chain.  Also initializes the reader lock. */
+void
+mpsc_queue_init(struct mpsc_queue *queue)
+{
+    struct mpsc_queue_node *stub = &queue->stub;
+
+    atomic_store_relaxed(&stub->next, NULL);
+    atomic_store_relaxed(&queue->tail, stub);
+    atomic_store_relaxed(&queue->head, stub);
+
+    ovs_mutex_init(&queue->read_lock);
+}
+
+/* Releases the reader lock of 'queue'.  The caller must not hold it.  Nodes
+ * still linked in the queue are not touched: the queue is intrusive and
+ * does not own them. */
+void
+mpsc_queue_destroy(struct mpsc_queue *queue)
+    OVS_EXCLUDED(queue->read_lock)
+{
+    struct ovs_mutex *lock = &queue->read_lock;
+
+    ovs_mutex_destroy(lock);
+}
+
+/* Tries to remove the oldest element of 'queue'.
+ *
+ * Returns MPSC_QUEUE_ITEM and stores the removed element in '*node', or
+ * MPSC_QUEUE_EMPTY / MPSC_QUEUE_RETRY without touching '*node'.  RETRY means
+ * a producer has exchanged the head but not yet linked its node.  Only the
+ * consumer holding the reader lock may call this. */
+enum mpsc_queue_poll_result
+mpsc_queue_poll(struct mpsc_queue *queue, struct mpsc_queue_node **node)
+    OVS_REQUIRES(queue->read_lock)
+{
+    struct mpsc_queue_node *tail;
+    struct mpsc_queue_node *next;
+    struct mpsc_queue_node *head;
+
+    /* 'tail' is only written under the reader lock, so a relaxed read is
+     * enough.  'next' pointers are published by producers with a release
+     * store and are read with acquire semantics. */
+    atomic_read_relaxed(&queue->tail, &tail);
+    atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+
+    /* The stub is never handed to the caller: step over it. */
+    if (tail == &queue->stub) {
+        if (next == NULL) {
+            return MPSC_QUEUE_EMPTY;
+        }
+
+        atomic_store_relaxed(&queue->tail, next);
+        tail = next;
+        atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+    }
+
+    /* Common case: 'tail' has a successor and can be detached directly. */
+    if (next != NULL) {
+        atomic_store_relaxed(&queue->tail, next);
+        *node = tail;
+        return MPSC_QUEUE_ITEM;
+    }
+
+    /* 'tail' has no successor.  If the head is elsewhere, a producer has
+     * exchanged the head but not yet written 'tail->next': the chain is
+     * momentarily broken. */
+    atomic_read_explicit(&queue->head, &head, memory_order_acquire);
+    if (tail != head) {
+        return MPSC_QUEUE_RETRY;
+    }
+
+    /* 'tail' is the last node.  Re-insert the stub behind it so that 'tail'
+     * gets a successor and can be detached. */
+    mpsc_queue_insert(queue, &queue->stub);
+
+    /* 'tail->next' is now either the stub or the node of a producer that
+     * exchanged the head between the read above and the stub insertion. */
+    atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+    if (next != NULL) {
+        atomic_store_relaxed(&queue->tail, next);
+        *node = tail;
+        return MPSC_QUEUE_ITEM;
+    }
+
+    /* A racing producer has not linked its node to 'tail' yet.
+     * NOTE(review): the chain is broken here just like in the RETRY case
+     * above, yet EMPTY is returned; confirm whether RETRY was intended. */
+    return MPSC_QUEUE_EMPTY;
+}
+
+/* Removes and returns the oldest element of 'queue', or NULL if it is empty.
+ * While a producer is in the middle of an insertion, this busy-waits by
+ * polling again until the chain is consistent. */
+struct mpsc_queue_node *
+mpsc_queue_pop(struct mpsc_queue *queue)
+    OVS_REQUIRES(queue->read_lock)
+{
+    struct mpsc_queue_node *item;
+
+    for (;;) {
+        enum mpsc_queue_poll_result res = mpsc_queue_poll(queue, &item);
+
+        if (res == MPSC_QUEUE_EMPTY) {
+            return NULL;
+        }
+        if (res != MPSC_QUEUE_RETRY) {
+            return item;
+        }
+        /* A producer has not finished linking its node: poll again. */
+    }
+}
+
+/* Puts 'node' back at the consumer end of 'queue', making it the next
+ * element returned by mpsc_queue_poll() or mpsc_queue_pop().  Only the
+ * consumer holding the reader lock may do this: producers never write
+ * 'queue->tail'. */
+void
+mpsc_queue_push_front(struct mpsc_queue *queue, struct mpsc_queue_node *node)
+    OVS_REQUIRES(queue->read_lock)
+{
+    struct mpsc_queue_node *old_tail;
+
+    atomic_read_relaxed(&queue->tail, &old_tail);
+    atomic_store_relaxed(&node->next, old_tail);
+    atomic_store_relaxed(&queue->tail, node);
+}
+
+/* Returns the oldest element of 'queue' without removing it, or NULL if no
+ * element is linked.  If the current tail is the stub, it is skipped and
+ * 'queue->tail' is advanced past it. */
+struct mpsc_queue_node *
+mpsc_queue_tail(struct mpsc_queue *queue)
+    OVS_REQUIRES(queue->read_lock)
+{
+    struct mpsc_queue_node *first;
+    struct mpsc_queue_node *after;
+
+    atomic_read_relaxed(&queue->tail, &first);
+    atomic_read_explicit(&first->next, &after, memory_order_acquire);
+
+    if (first != &queue->stub) {
+        return first;
+    }
+
+    /* 'first' is the stub: the real oldest element, if any, follows it. */
+    if (after != NULL) {
+        atomic_store_relaxed(&queue->tail, after);
+    }
+    return after;
+}
+
+/* Returns the element linked after 'prev', or NULL if 'prev' is the last
+ * linked element.  The stub is skipped transparently. */
+struct mpsc_queue_node *
+mpsc_queue_next(struct mpsc_queue *queue, struct mpsc_queue_node *prev)
+    OVS_REQUIRES(queue->read_lock)
+{
+    struct mpsc_queue_node *succ;
+
+    atomic_read_explicit(&prev->next, &succ, memory_order_acquire);
+    if (succ != &queue->stub) {
+        return succ;
+    }
+
+    /* 'succ' is the stub: look one node further. */
+    atomic_read_explicit(&queue->stub.next, &succ, memory_order_acquire);
+    return succ;
+}
+
+/* Appends 'node' at the producer end of 'queue'.  Any number of threads may
+ * call this concurrently; the reader lock is not required.
+ *
+ * The insertion takes two separate steps: the atomic exchange makes 'node'
+ * the new head, then the previous head is linked to it.  Between the two,
+ * the chain is broken and the consumer may observe MPSC_QUEUE_RETRY. */
+void
+mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node)
+{
+    struct mpsc_queue_node *prev;
+
+    /* 'node' becomes the end of the chain. */
+    atomic_store_relaxed(&node->next, NULL);
+    prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);
+    /* Release: pairs with the consumer's acquire read of 'next', making the
+     * contents of 'node' visible once it is reachable. */
+    atomic_store_explicit(&prev->next, node, memory_order_release);
+}
diff --git a/lib/mpsc-queue.h b/lib/mpsc-queue.h
new file mode 100644
index 000000000..3bb9e3bee
--- /dev/null
+++ b/lib/mpsc-queue.h
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MPSC_QUEUE_H
+#define MPSC_QUEUE_H 1
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stddef.h>
+
+#include <openvswitch/thread.h>
+#include <openvswitch/util.h>
+
+#include "ovs-atomic.h"
+
+/* Multi-producer, single-consumer queue
+ * =====================================
+ *
+ * This data structure is a lockless queue implementation with
+ * the following properties:
+ *
+ *  * Multi-producer: multiple threads can write concurrently.
+ *    Insertion in the queue is thread-safe, no inter-thread
+ *    synchronization is necessary.
+ *
+ *  * Single-consumer: only a single thread can safely remove
+ *    nodes from the queue.  The queue must be 'acquired' using
+ *    'mpsc_queue_acquire()' before removing nodes.
+ *
+ *  * Unbounded: the queue is backed by a linked-list and is not
+ *    limited in number of elements.
+ *
+ *  * Intrusive: queue elements are allocated as part of larger
+ *    objects.  Objects are retrieved by offset manipulation.
+ *
+ *  * per-producer FIFO: Elements in the queue are kept in the
+ *    order their producer inserted them.  The consumer retrieves
+ *    them in the same insertion order.  When multiple
+ *    producers insert at the same time, either will proceed.
+ *
+ * This queue is well-suited for message passing between threads,
+ * where any number of threads can insert a message and a single
+ * thread is meant to receive and process it.
+ *
+ * Thread-safety
+ * =============
+ *
+ *  The consumer thread must acquire the queue using 'mpsc_queue_acquire()'.
+ *  Once the queue is protected against concurrent reads, the thread can call
+ *  the consumer API:
+ *
+ *      * mpsc_queue_poll() to peek and return the tail of the queue
+ *      * mpsc_queue_pop() to remove the tail of the queue
+ *      * mpsc_queue_tail() to read the current tail
+ *      * mpsc_queue_push_front() to enqueue an element safely at the tail
+ *      * MPSC_QUEUE_FOR_EACH() to iterate over the current elements,
+ *        without removing them.
+ *      * MPSC_QUEUE_FOR_EACH_POP() to iterate over the elements while
+ *        removing them.
+ *
+ *  When a thread is finished with reading the queue, it can release the
+ *  reader lock using 'mpsc_queue_release()'.
+ *
+ *  Producers can always insert elements in the queue, even if no consumer
+ *  acquired the reader lock.  No inter-producer synchronization is needed.
+ *
+ *  The consumer thread is also allowed to insert elements while it holds the
+ *  reader lock.
+ *
+ *  Producer threads must never be cancelled while writing to the queue.
+ *  This will block the consumer, that will then lose any subsequent elements
+ *  in the queue.  Producers should ideally be cooperatively managed or
+ *  the queue insertion should be within non-cancellable sections.
+ *
+ * Queue state
+ * ===========
+ *
+ *  When polling the queue, three states can be observed: 'empty', 'non-empty',
+ *  and 'inconsistent'.  Three polling results are defined, respectively:
+ *
+ *   * MPSC_QUEUE_EMPTY: the queue is empty.
+ *   * MPSC_QUEUE_ITEM: an item was available and has been removed.
+ *   * MPSC_QUEUE_RETRY: the queue is inconsistent.
+ *
+ *  If 'MPSC_QUEUE_RETRY' is returned, then a producer has not yet finished
+ *  writing to the queue and the list of nodes is not coherent.  The consumer
+ *  can retry shortly to check if the producer has finished.
+ *
+ *  This behavior is the reason the removal function is called
+ *  'mpsc_queue_poll()'.
+ *
+ */
+
+/* Intrusive link, embedded in the objects stored in the queue. */
+struct mpsc_queue_node {
+    ATOMIC(struct mpsc_queue_node *) next;
+};
+
+struct mpsc_queue {
+    /* Producer end: the most recently inserted node. */
+    ATOMIC(struct mpsc_queue_node *) head;
+    /* Consumer end: the oldest node, possibly the stub. */
+    ATOMIC(struct mpsc_queue_node *) tail;
+    /* Placeholder node, re-inserted when the last element is removed. */
+    struct mpsc_queue_node stub;
+    /* Serializes consumers, see mpsc_queue_acquire(). */
+    struct ovs_mutex read_lock;
+};
+
+/* Static initializer for the queue pointed to by 'Q'.  It sets up the same
+ * state as mpsc_queue_init(): both ends on the stub. */
+#define MPSC_QUEUE_INITIALIZER(Q) { \
+    .head = ATOMIC_VAR_INIT(&(Q)->stub), \
+    .tail = ATOMIC_VAR_INIT(&(Q)->stub), \
+    .stub = { .next = ATOMIC_VAR_INIT(NULL) }, \
+    .read_lock = OVS_MUTEX_INITIALIZER, \
+}
+
+/* Consumer API. */
+
+/* Initializes 'queue'.  Not necessary if the queue was statically
+ * initialized with 'MPSC_QUEUE_INITIALIZER'. */
+void mpsc_queue_init(struct mpsc_queue *queue);
+/* The reader lock must be released prior to destroying the queue.  Elements
+ * still linked in the queue are left untouched. */
+void mpsc_queue_destroy(struct mpsc_queue *queue);
+
+/* Acquire and release the consumer lock.  Only the holder of this lock may
+ * use the consumer API below. */
+#define mpsc_queue_acquire(q) \
+    do { ovs_mutex_lock(&(q)->read_lock); } while (0)
+#define mpsc_queue_release(q) \
+    do { ovs_mutex_unlock(&(q)->read_lock); } while (0)
+
+/* Result of mpsc_queue_poll(). */
+enum mpsc_queue_poll_result {
+    /* Queue is empty. */
+    MPSC_QUEUE_EMPTY,
+    /* Polling the queue returned an item. */
+    MPSC_QUEUE_ITEM,
+    /* Data has been enqueued but one or more producer threads have not
+     * finished writing it.  The queue is in an inconsistent state.
+     * Retrying shortly, if the producer threads are still active, will
+     * return the data.
+     */
+    MPSC_QUEUE_RETRY,
+};
+
+/* Tries to remove the oldest element.  Sets 'node' to the removed item if
+ * 'MPSC_QUEUE_ITEM' is returned, otherwise 'node' is not set.
+ */
+enum mpsc_queue_poll_result mpsc_queue_poll(struct mpsc_queue *queue,
+                                            struct mpsc_queue_node **node)
+    OVS_REQUIRES(queue->read_lock);
+
+/* Pop an element if there is any in the queue.  Returns NULL if the queue is
+ * empty.  While the queue is inconsistent, polls again (busy-waits). */
+struct mpsc_queue_node *mpsc_queue_pop(struct mpsc_queue *queue)
+    OVS_REQUIRES(queue->read_lock);
+
+/* Insert at the front of the queue, i.e. at the consumer end: 'node' becomes
+ * the next element removed.  Only the consumer can do it. */
+void mpsc_queue_push_front(struct mpsc_queue *queue,
+                           struct mpsc_queue_node *node)
+    OVS_REQUIRES(queue->read_lock);
+
+/* Get the current queue tail, i.e. the oldest element, without removing it.
+ * Returns NULL if no element is linked. */
+struct mpsc_queue_node *mpsc_queue_tail(struct mpsc_queue *queue)
+    OVS_REQUIRES(queue->read_lock);
+
+/* Get the next element of a node, or NULL at the end of the linked chain.
+ * The chain ends early where a producer has not finished its insertion. */
+struct mpsc_queue_node *mpsc_queue_next(struct mpsc_queue *queue,
+                                        struct mpsc_queue_node *prev)
+    OVS_REQUIRES(queue->read_lock);
+
+/* Iterates over the linked elements, oldest first, without removing them.
+ * Stops at the first link left broken by an unfinished insertion. */
+#define MPSC_QUEUE_FOR_EACH(node, queue) \
+    for ((node) = mpsc_queue_tail(queue); (node) != NULL; \
+         (node) = mpsc_queue_next(queue, node))
+
+/* Removes the elements one by one, oldest first, until the queue is empty. */
+#define MPSC_QUEUE_FOR_EACH_POP(node, queue) \
+    for ((node) = mpsc_queue_pop(queue); (node) != NULL; \
+         (node) = mpsc_queue_pop(queue))
+
+/* Producer API. */
+
+/* Appends 'node' to 'queue'.  Safe to call from any number of threads at
+ * once, without the reader lock.  'node->next' is overwritten, so 'node' must
+ * not be linked in a queue already. */
+void mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node);
+
+#endif /* MPSC_QUEUE_H */
diff --git a/tests/automake.mk b/tests/automake.mk
index a32abd41c..4588d5b49 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -466,6 +466,7 @@ tests_ovstest_SOURCES = \
 	tests/test-list.c \
 	tests/test-lockfile.c \
 	tests/test-multipath.c \
+	tests/test-mpsc-queue.c \
 	tests/test-netflow.c \
 	tests/test-odp.c \
 	tests/test-ofpbuf.c \
diff --git a/tests/library.at b/tests/library.at
index e572c22e3..0e47bc445 100644
--- a/tests/library.at
+++ b/tests/library.at
@@ -259,3 +259,8 @@ AT_SETUP([stopwatch module])
 AT_CHECK([ovstest test-stopwatch], [0], [......
 ], [ignore])
 AT_CLEANUP
+
+AT_SETUP([mpsc-queue module])
+AT_CHECK([ovstest test-mpsc-queue check], [0], [....
+])
+AT_CLEANUP
diff --git a/tests/test-mpsc-queue.c b/tests/test-mpsc-queue.c
new file mode 100644
index 000000000..7bcecb8ff
--- /dev/null
+++ b/tests/test-mpsc-queue.c
@@ -0,0 +1,772 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <assert.h>
+#include <getopt.h>
+#include <string.h>
+
+#include <config.h>
+
+#include "command-line.h"
+#include "guarded-list.h"
+#include "mpsc-queue.h"
+#include "openvswitch/list.h"
+#include "openvswitch/util.h"
+#include "openvswitch/vlog.h"
+#include "ovs-rcu.h"
+#include "ovs-thread.h"
+#include "ovstest.h"
+#include "timeval.h"
+#include "util.h"
+
+/* Test element.  It embeds either an mpsc-queue node or an ovs_list node,
+ * plus a 'mark' set when it is consumed (0 is treated as "never consumed"). */
+struct element {
+    union {
+        struct mpsc_queue_node mpscq;
+        struct ovs_list list;
+    } node;
+    uint64_t mark;
+};
+
+/* Stores 'mark' in the element embedding 'node' and bumps '*counter'. */
+static void
+test_mpsc_queue_mark_element(struct mpsc_queue_node *node,
+                             uint64_t mark,
+                             unsigned int *counter)
+{
+    CONTAINER_OF(node, struct element, node.mpscq)->mark = mark;
+    (*counter)++;
+}
+
+/* Inserts a batch of elements and checks that polling returns each of them
+ * exactly once. */
+static void
+test_mpsc_queue_insert(void)
+{
+    struct element elements[100];
+    struct mpsc_queue_node *node;
+    struct mpsc_queue queue;
+    unsigned int n_marked = 0;
+
+    memset(elements, 0, sizeof elements);
+    mpsc_queue_init(&queue);
+    mpsc_queue_acquire(&queue);
+
+    for (size_t i = 0; i < ARRAY_SIZE(elements); i++) {
+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);
+    }
+
+    /* Drain the queue, tagging every element that comes out. */
+    while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+        test_mpsc_queue_mark_element(node, 1, &n_marked);
+    }
+
+    mpsc_queue_release(&queue);
+    mpsc_queue_destroy(&queue);
+
+    ovs_assert(n_marked == ARRAY_SIZE(elements));
+    for (size_t i = 0; i < ARRAY_SIZE(elements); i++) {
+        ovs_assert(elements[i].mark == 1);
+    }
+
+    printf(".");
+}
+
+/* Checks that elements are removed in the order they were inserted. */
+static void
+test_mpsc_queue_removal_fifo(void)
+{
+    struct element elements[100];
+    struct mpsc_queue_node *node;
+    struct mpsc_queue queue;
+    unsigned int order = 0;
+
+    memset(elements, 0, sizeof elements);
+
+    mpsc_queue_init(&queue);
+    mpsc_queue_acquire(&queue);
+
+    for (size_t i = 0; i < ARRAY_SIZE(elements); i++) {
+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);
+    }
+
+    /* Tag each element with its removal rank.  Elements were inserted in
+     * array order, so FIFO removal hands them back in that same order. */
+    while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+        test_mpsc_queue_mark_element(node, order, &order);
+    }
+
+    /* The removed elements no longer depend on the queue, which can be
+     * destroyed before checking them. */
+    mpsc_queue_release(&queue);
+    mpsc_queue_destroy(&queue);
+
+    /* Ranks must strictly increase along the array. */
+    for (size_t i = 1; i < ARRAY_SIZE(elements); i++) {
+        ovs_assert(elements[i - 1].mark < elements[i].mark);
+    }
+
+    printf(".");
+}
+
+/* Partial insert:
+ *
+ * These functions are 'mpsc_queue_insert()' split in two parts.
+ * They serve to test the behavior of the queue when forcing the potential
+ * condition of a thread starting an insertion then yielding.
+ */
+static struct mpsc_queue_node *
+mpsc_queue_insert_begin(struct mpsc_queue *queue, struct mpsc_queue_node *node)
+{
+    /* First half: make 'node' the new head and return the previous head,
+     * still unlinked from 'node'.  Finish with mpsc_queue_insert_end(). */
+    atomic_store_explicit(&node->next, NULL, memory_order_relaxed);
+    return atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);
+}
+
+static void
+mpsc_queue_insert_end(struct mpsc_queue_node *prev,
+                      struct mpsc_queue_node *node)
+{
+    /* Second half: link the previous head to 'node', repairing the chain. */
+    atomic_store_explicit(&prev->next, node, memory_order_release);
+}
+
+/* Checks that iterating over the queue stops at the link left broken by an
+ * unfinished insertion, and that every element is reachable again, in
+ * order, once all insertions are completed. */
+static void
+test_mpsc_queue_insert_partial(void)
+{
+    struct element elements[10];
+    struct mpsc_queue_node *prevs[ARRAY_SIZE(elements)];
+    struct mpsc_queue_node *node;
+    struct mpsc_queue queue, *q = &queue;
+    size_t i;
+
+    mpsc_queue_init(q);
+
+    /* Insert the first half of elements entirely,
+     * insert the second half of elements partially.
+     */
+    for (i = 0; i < ARRAY_SIZE(elements); i++) {
+        elements[i].mark = i;
+        if (i >= ARRAY_SIZE(elements) / 2) {
+            prevs[i] = mpsc_queue_insert_begin(q, &elements[i].node.mpscq);
+        } else {
+            prevs[i] = NULL;
+            mpsc_queue_insert(q, &elements[i].node.mpscq);
+        }
+    }
+
+    mpsc_queue_acquire(q);
+
+    /* Verify that when the chain is broken, iterators will stop: only the
+     * fully inserted first half is reachable. */
+    i = 0;
+    MPSC_QUEUE_FOR_EACH (node, q) {
+        struct element *e = CONTAINER_OF(node, struct element, node.mpscq);
+        ovs_assert(e == &elements[i]);
+        i++;
+    }
+    ovs_assert(i == ARRAY_SIZE(elements) / 2);
+
+    /* Complete the pending insertions, in the order they were started. */
+    for (i = 0; i < ARRAY_SIZE(elements); i++) {
+        if (prevs[i] != NULL) {
+            mpsc_queue_insert_end(prevs[i], &elements[i].node.mpscq);
+        }
+    }
+
+    i = 0;
+    MPSC_QUEUE_FOR_EACH (node, q) {
+        struct element *e = CONTAINER_OF(node, struct element, node.mpscq);
+        ovs_assert(e == &elements[i]);
+        i++;
+    }
+    ovs_assert(i == ARRAY_SIZE(elements));
+
+    MPSC_QUEUE_FOR_EACH_POP (node, q) {
+        struct element *e = CONTAINER_OF(node, struct element, node.mpscq);
+        ovs_assert(e->mark == (unsigned int)(e - elements));
+    }
+
+    mpsc_queue_release(q);
+    mpsc_queue_destroy(q);
+
+    printf(".");
+}
+
+/* Exercises mpsc_queue_push_front(), alone and mixed with regular
+ * insertions, checking that pushed-front nodes are removed first. */
+static void
+test_mpsc_queue_push_front(void)
+{
+    struct mpsc_queue queue, *q = &queue;
+    struct mpsc_queue_node *node;
+    struct element elements[10];
+    size_t i;
+
+    mpsc_queue_init(q);
+    mpsc_queue_acquire(q);
+
+    /* A single node pushed on an empty queue. */
+    ovs_assert(mpsc_queue_pop(q) == NULL);
+    mpsc_queue_push_front(q, &elements[0].node.mpscq);
+    node = mpsc_queue_pop(q);
+    ovs_assert(node == &elements[0].node.mpscq);
+    ovs_assert(mpsc_queue_pop(q) == NULL);
+
+    /* Successive pushes come out in reverse (LIFO) order. */
+    mpsc_queue_push_front(q, &elements[0].node.mpscq);
+    mpsc_queue_push_front(q, &elements[1].node.mpscq);
+    ovs_assert(mpsc_queue_pop(q) == &elements[1].node.mpscq);
+    ovs_assert(mpsc_queue_pop(q) == &elements[0].node.mpscq);
+    ovs_assert(mpsc_queue_pop(q) == NULL);
+
+    /* Pushed nodes precede a node inserted by the producer API. */
+    mpsc_queue_push_front(q, &elements[1].node.mpscq);
+    mpsc_queue_push_front(q, &elements[0].node.mpscq);
+    mpsc_queue_insert(q, &elements[2].node.mpscq);
+    ovs_assert(mpsc_queue_pop(q) == &elements[0].node.mpscq);
+    ovs_assert(mpsc_queue_pop(q) == &elements[1].node.mpscq);
+    ovs_assert(mpsc_queue_pop(q) == &elements[2].node.mpscq);
+    ovs_assert(mpsc_queue_pop(q) == NULL);
+
+    /* 'mark' is only set here; the steps above compared addresses. */
+    for (i = 0; i < ARRAY_SIZE(elements); i++) {
+        elements[i].mark = i;
+        mpsc_queue_insert(q, &elements[i].node.mpscq);
+    }
+
+    /* Pop the oldest element and put it back: the order must be kept. */
+    node = mpsc_queue_pop(q);
+    mpsc_queue_push_front(q, node);
+    ovs_assert(mpsc_queue_pop(q) == node);
+    mpsc_queue_push_front(q, node);
+
+    i = 0;
+    MPSC_QUEUE_FOR_EACH (node, q) {
+        struct element *e = CONTAINER_OF(node, struct element, node.mpscq);
+        ovs_assert(e == &elements[i]);
+        i++;
+    }
+    ovs_assert(i == ARRAY_SIZE(elements));
+
+    MPSC_QUEUE_FOR_EACH_POP (node, q) {
+        struct element *e = CONTAINER_OF(node, struct element, node.mpscq);
+        ovs_assert(e->mark == (unsigned int)(e - elements));
+    }
+
+    mpsc_queue_release(q);
+    mpsc_queue_destroy(q);
+
+    printf(".");
+}
+
+/* Runs the functional tests in sequence; each prints a '.' on success. */
+static void
+run_tests(struct ovs_cmdl_context *ctx OVS_UNUSED)
+{
+    static void (*const tests[])(void) = {
+        test_mpsc_queue_insert,         /* Basic insertion. */
+        test_mpsc_queue_insert_partial, /* Partial insertion. */
+        test_mpsc_queue_removal_fifo,   /* Removal order is respected. */
+        test_mpsc_queue_push_front,     /* Tail-end insertion. */
+    };
+
+    for (size_t i = 0; i < ARRAY_SIZE(tests); i++) {
+        tests[i]();
+    }
+    printf("\n");
+}
+
+/* Benchmark state, shared by the reader and the producer threads. */
+static struct element *elements;    /* 'n_elems' elements to enqueue. */
+static uint64_t *thread_working_ms; /* Measured work time. */
+
+static unsigned int n_threads;      /* Number of producer threads. */
+static unsigned int n_elems;        /* Total number of elements. */
+
+static struct ovs_barrier barrier;  /* Synchronizes the producers. */
+/* Cleared by the producers once done, to end the reader's polling loop.
+ * NOTE(review): 'volatile' gives no inter-thread ordering guarantee; an
+ * atomic flag would be the portable choice. */
+static volatile bool working;
+
+/* Returns the number of milliseconds elapsed since 'start'. */
+static int
+elapsed(const struct timeval *start)
+{
+    struct timeval now;
+    long long int delta;
+
+    xgettimeofday(&now);
+    delta = timeval_to_msec(&now) - timeval_to_msec(start);
+    return delta;
+}
+
+/* Prints one row of the benchmark table: the reader time, the time of each
+ * producer thread, then the average producer time. */
+static void
+print_result(const char *prefix, int reader_elapsed)
+{
+    uint64_t total = 0;
+
+    printf("%s:  %6d", prefix, reader_elapsed);
+    for (size_t i = 0; i < n_threads; i++) {
+        total += thread_working_ms[i];
+        printf(" %6" PRIu64, thread_working_ms[i]);
+    }
+    printf(" %6" PRIu64 " ms\n", total / n_threads);
+}
+
+/* Argument shared by all mpsc-queue producer threads. */
+struct mpscq_aux {
+    struct mpsc_queue *queue;   /* Queue all producers insert into. */
+    atomic_uint thread_id;      /* Hands out a distinct index per thread. */
+};
+
+/* Producer thread body: claims a distinct slice of 'elements' and inserts it
+ * into the shared queue, timing only the insertion loop. */
+static void *
+mpsc_queue_insert_thread(void *aux_)
+{
+    struct mpscq_aux *aux = aux_;
+    unsigned int per_thread = n_elems / n_threads;
+    struct element *slice;
+    struct timeval start;
+    unsigned int id;
+
+    atomic_add(&aux->thread_id, 1u, &id);
+    slice = &elements[id * per_thread];
+
+    /* Start every producer at the same time. */
+    ovs_barrier_block(&barrier);
+    xgettimeofday(&start);
+
+    for (size_t i = 0; i < per_thread; i++) {
+        mpsc_queue_insert(aux->queue, &slice[i].node.mpscq);
+    }
+
+    thread_working_ms[id] = elapsed(&start);
+    /* Wait for every producer before telling the reader to stop. */
+    ovs_barrier_block(&barrier);
+
+    working = false;
+    return NULL;
+}
+
+/* Measures 'n_threads' producers inserting 'n_elems' elements in a shared
+ * mpsc-queue while the calling thread consumes them.  Then checks that every
+ * element was consumed exactly once. */
+static void
+benchmark_mpsc_queue(void)
+{
+    struct mpsc_queue_node *node;
+    struct mpsc_queue queue;
+    struct timeval start;
+    unsigned int counter;
+    bool work_complete;
+    pthread_t *threads;
+    struct mpscq_aux aux;
+    uint64_t epoch;
+    size_t i;
+
+    /* Clear marks left by a previous benchmark: a zero mark is how missed
+     * elements are detected below, so the whole arrays must be reset. */
+    memset(elements, 0, n_elems * sizeof *elements);
+    memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
+
+    mpsc_queue_init(&queue);
+
+    aux.queue = &queue;
+    atomic_store(&aux.thread_id, 0);
+
+    /* Elements left over by the per-thread split are inserted up front. */
+    for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);
+    }
+
+    working = true;
+
+    threads = xmalloc(n_threads * sizeof *threads);
+    ovs_barrier_init(&barrier, n_threads);
+
+    for (i = 0; i < n_threads; i++) {
+        threads[i] = ovs_thread_create("sc_queue_insert",
+                                       mpsc_queue_insert_thread, &aux);
+    }
+
+    mpsc_queue_acquire(&queue);
+    xgettimeofday(&start);
+
+    counter = 0;
+    epoch = 1;
+    do {
+        while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+            test_mpsc_queue_mark_element(node, epoch, &counter);
+        }
+        /* Never use 0 as a mark: it means "not consumed". */
+        if (epoch == UINT64_MAX) {
+            epoch = 0;
+        }
+        epoch++;
+    } while (working);
+
+    for (i = 0; i < n_threads; i++) {
+        xpthread_join(threads[i], NULL);
+    }
+
+    /* Elements might have been inserted before threads were joined. */
+    while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+        test_mpsc_queue_mark_element(node, epoch, &counter);
+    }
+
+    print_result("  mpsc-queue", elapsed(&start));
+
+    mpsc_queue_release(&queue);
+    mpsc_queue_destroy(&queue);
+    ovs_barrier_destroy(&barrier);
+    free(threads);
+
+    work_complete = true;
+    for (i = 0; i < n_elems; i++) {
+        if (elements[i].mark == 0) {
+            printf("Element %" PRIuSIZE " was never consumed.\n", i);
+            work_complete = false;
+        }
+    }
+    ovs_assert(work_complete);
+    ovs_assert(counter == n_elems);
+}
+
+/* Lock used by the "list(spin)" benchmark: a spinlock when the platform has
+ * one, otherwise a mutex stands in so the benchmark still builds. */
+#ifdef HAVE_PTHREAD_SPIN_LOCK
+#define spin_lock_type       ovs_spin
+#define spin_lock_init(l)    ovs_spin_init(l)
+#define spin_lock_destroy(l) ovs_spin_destroy(l)
+#define spin_lock(l)         ovs_spin_lock(l)
+#define spin_unlock(l)       ovs_spin_unlock(l)
+#else
+#define spin_lock_type       ovs_mutex
+#define spin_lock_init(l)    ovs_mutex_init(l)
+#define spin_lock_destroy(l) ovs_mutex_destroy(l)
+#define spin_lock(l)         ovs_mutex_lock(l)
+#define spin_unlock(l)       ovs_mutex_unlock(l)
+#endif
+
+/* State shared by all locked_list_insert_main() writer threads.  Exactly one
+ * of 'mutex' and 'spin' is non-NULL and selects the lock guarding 'list'. */
+struct list_aux {
+    struct ovs_list *list;         /* List the writers push onto. */
+    struct ovs_mutex *mutex;       /* Lock for the mutex flavor, or NULL. */
+    struct spin_lock_type *spin;   /* Lock for the spin flavor, or NULL. */
+    atomic_uint thread_id;         /* Next writer id to hand out. */
+};
+
+/* Writer thread of the locked-list benchmark.
+ *
+ * Claims an id from 'aux->thread_id', pushes its own contiguous slice of
+ * 'elements' onto the shared list while holding whichever lock 'aux'
+ * selects, stores the time spent in 'thread_working_ms[id]' and finally
+ * clears 'working' so the reader can stop polling. */
+static void *
+locked_list_insert_main(void *aux_)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+    struct list_aux *aux = aux_;
+    unsigned int per_thread = n_elems / n_threads;
+    struct element *slice;
+    struct timeval begin;
+    unsigned int my_id;
+    size_t n;
+
+    atomic_add(&aux->thread_id, 1u, &my_id);
+    slice = &elements[my_id * per_thread];
+
+    /* Release all writers together. */
+    ovs_barrier_block(&barrier);
+    xgettimeofday(&begin);
+
+    for (n = 0; n < per_thread; n++) {
+        if (aux->mutex) {
+            ovs_mutex_lock(aux->mutex);
+            ovs_list_push_front(aux->list, &slice[n].node.list);
+            ovs_mutex_unlock(aux->mutex);
+        } else {
+            spin_lock(aux->spin);
+            ovs_list_push_front(aux->list, &slice[n].node.list);
+            spin_unlock(aux->spin);
+        }
+    }
+
+    thread_working_ms[my_id] = elapsed(&begin);
+    /* Only signal the reader once every writer is done. */
+    ovs_barrier_block(&barrier);
+
+    working = false;
+
+    return NULL;
+}
+
+/* Reader side of the locked-list benchmark.
+ *
+ * Starts 'n_threads' locked_list_insert_main() writers against one ovs_list
+ * guarded by an ovs_mutex ('use_mutex' true) or by the spin lock type
+ * (false).  The calling thread consumes: under the mutex it drains the whole
+ * list per acquisition, under the spin lock it takes one element per
+ * acquisition.  After joining the writers it drains what is left, prints
+ * the elapsed time, and asserts that every element got a non-zero mark and
+ * that exactly 'n_elems' elements were consumed. */
+static void
+benchmark_list(bool use_mutex)
+{
+    struct spin_lock_type spin;
+    struct ovs_mutex mutex;
+    struct list_aux aux;
+    struct ovs_list list;
+    struct element *elem;
+    struct timeval start;
+    pthread_t *threads;
+    unsigned int consumed;
+    bool all_marked;
+    uint64_t epoch;
+    size_t i;
+
+    memset(elements, 0, n_elems * sizeof *elements);
+    memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
+
+    if (use_mutex) {
+        ovs_mutex_init(&mutex);
+        aux.mutex = &mutex;
+        aux.spin = NULL;
+    } else {
+        spin_lock_init(&spin);
+        aux.mutex = NULL;
+        aux.spin = &spin;
+    }
+
+    ovs_list_init(&list);
+    aux.list = &list;
+    atomic_store(&aux.thread_id, 0);
+
+    /* The 'n_elems % n_threads' elements that do not fit in the writers'
+     * equal slices are queued by the reader up front. */
+    for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
+        ovs_list_push_front(&list, &elements[i].node.list);
+    }
+
+    working = true;
+
+    threads = xmalloc(n_threads * sizeof *threads);
+    ovs_barrier_init(&barrier, n_threads);
+    for (i = 0; i < n_threads; i++) {
+        threads[i] = ovs_thread_create("locked_list_insert",
+                                       locked_list_insert_main, &aux);
+    }
+
+    xgettimeofday(&start);
+
+    consumed = 0;
+    epoch = 1;
+    do {
+        if (use_mutex) {
+            ovs_mutex_lock(&mutex);
+            LIST_FOR_EACH_POP (elem, node.list, &list) {
+                elem->mark = epoch;
+                consumed++;
+            }
+            ovs_mutex_unlock(&mutex);
+        } else {
+            struct ovs_list *popped;
+
+            spin_lock(&spin);
+            popped = ovs_list_is_empty(&list) ? NULL
+                                              : ovs_list_pop_front(&list);
+            spin_unlock(&spin);
+
+            if (popped == NULL) {
+                /* Nothing queued: re-check 'working' without bumping the
+                 * epoch. */
+                continue;
+            }
+
+            elem = CONTAINER_OF(popped, struct element, node.list);
+            elem->mark = epoch;
+            consumed++;
+        }
+        /* Next epoch, never 0: a zero mark means "never consumed". */
+        epoch = (epoch == UINT64_MAX) ? 1 : epoch + 1;
+    } while (working);
+
+    for (i = 0; i < n_threads; i++) {
+        xpthread_join(threads[i], NULL);
+    }
+
+    /* Writers may have pushed more after the last pass; they are all joined
+     * now, so the list is drained without taking the lock. */
+    LIST_FOR_EACH_POP (elem, node.list, &list) {
+        elem->mark = epoch;
+        consumed++;
+    }
+
+    print_result(use_mutex ? " list(mutex)" : "  list(spin)",
+                 elapsed(&start));
+
+    if (use_mutex) {
+        ovs_mutex_destroy(&mutex);
+    } else {
+        spin_lock_destroy(&spin);
+    }
+    ovs_barrier_destroy(&barrier);
+    free(threads);
+
+    all_marked = true;
+    for (i = 0; i < n_elems; i++) {
+        if (!elements[i].mark) {
+            printf("Element %" PRIuSIZE " was never consumed.\n", i);
+            all_marked = false;
+        }
+    }
+    ovs_assert(all_marked);
+    ovs_assert(consumed == n_elems);
+}
+
+/* State shared by all guarded_list_insert_thread() writer threads. */
+struct guarded_list_aux {
+    struct guarded_list *glist;    /* List the writers append to. */
+    atomic_uint thread_id;         /* Next writer id to hand out. */
+};
+
+/* Writer thread of the guarded-list benchmark.
+ *
+ * Claims an id from 'aux->thread_id', appends its contiguous slice of
+ * 'elements' to the shared guarded list (passing 'n_elems' as the 'max'
+ * argument of guarded_list_push_back()), stores the time spent in
+ * 'thread_working_ms[id]' and finally clears 'working' so the reader can
+ * stop polling. */
+static void *
+guarded_list_insert_thread(void *aux_)
+{
+    struct guarded_list_aux *aux = aux_;
+    unsigned int per_thread = n_elems / n_threads;
+    struct element *slice, *cur, *end;
+    struct timeval begin;
+    unsigned int my_id;
+
+    atomic_add(&aux->thread_id, 1u, &my_id);
+    slice = &elements[my_id * per_thread];
+    end = slice + per_thread;
+
+    /* Release all writers together. */
+    ovs_barrier_block(&barrier);
+    xgettimeofday(&begin);
+
+    for (cur = slice; cur < end; cur++) {
+        guarded_list_push_back(aux->glist, &cur->node.list, n_elems);
+    }
+
+    thread_working_ms[my_id] = elapsed(&begin);
+    /* Only signal the reader once every writer is done. */
+    ovs_barrier_block(&barrier);
+
+    working = false;
+
+    return NULL;
+}
+
+/* Reader side of the guarded-list benchmark.
+ *
+ * Starts 'n_threads' guarded_list_insert_thread() writers against a single
+ * guarded_list.  The calling thread repeatedly extracts the queued elements
+ * into a private list with guarded_list_pop_all() and marks them.  After
+ * joining the writers it extracts what is left, prints the elapsed time,
+ * and asserts that every element got a non-zero mark and that exactly
+ * 'n_elems' elements were consumed. */
+static void
+benchmark_guarded_list(void)
+{
+    struct guarded_list_aux aux;
+    struct guarded_list glist;
+    struct ovs_list batch;
+    struct element *elem;
+    struct timeval start;
+    pthread_t *threads;
+    unsigned int consumed;
+    bool all_marked;
+    uint64_t epoch;
+    size_t i;
+
+    memset(elements, 0, n_elems * sizeof *elements);
+    memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
+
+    guarded_list_init(&glist);
+    ovs_list_init(&batch);
+
+    aux.glist = &glist;
+    atomic_store(&aux.thread_id, 0);
+
+    /* The 'n_elems % n_threads' elements that do not fit in the writers'
+     * equal slices are queued by the reader up front. */
+    for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
+        guarded_list_push_back(&glist, &elements[i].node.list, n_elems);
+    }
+
+    working = true;
+
+    threads = xmalloc(n_threads * sizeof *threads);
+    ovs_barrier_init(&barrier, n_threads);
+    for (i = 0; i < n_threads; i++) {
+        threads[i] = ovs_thread_create("guarded_list_insert",
+                                       guarded_list_insert_thread, &aux);
+    }
+
+    xgettimeofday(&start);
+
+    consumed = 0;
+    epoch = 1;
+    do {
+        guarded_list_pop_all(&glist, &batch);
+        LIST_FOR_EACH_POP (elem, node.list, &batch) {
+            elem->mark = epoch;
+            consumed++;
+        }
+        /* Next epoch, never 0: a zero mark means "never consumed". */
+        epoch = (epoch == UINT64_MAX) ? 1 : epoch + 1;
+    } while (working);
+
+    for (i = 0; i < n_threads; i++) {
+        xpthread_join(threads[i], NULL);
+    }
+
+    /* Pick up whatever the writers queued after the last pass. */
+    guarded_list_pop_all(&glist, &batch);
+    LIST_FOR_EACH_POP (elem, node.list, &batch) {
+        elem->mark = epoch;
+        consumed++;
+    }
+
+    print_result("guarded list", elapsed(&start));
+
+    ovs_barrier_destroy(&barrier);
+    free(threads);
+    guarded_list_destroy(&glist);
+
+    all_marked = true;
+    for (i = 0; i < n_elems; i++) {
+        if (!elements[i].mark) {
+            printf("Element %" PRIuSIZE " was never consumed.\n", i);
+            all_marked = false;
+        }
+    }
+    ovs_assert(all_marked);
+    ovs_assert(consumed == n_elems);
+}
+
+/* Handler for the "benchmark" command.
+ *
+ * ctx->argv[1] is the number of elements to pass through each queue and
+ * ctx->argv[2] the number of writer threads.  Allocates the shared
+ * 'elements' and 'thread_working_ms' arrays, prints the table header and
+ * then runs each benchmark in turn. */
+static void
+run_benchmarks(struct ovs_cmdl_context *ctx)
+{
+    long int l_threads;
+    long int l_elems;
+    size_t i;
+
+    ovsrcu_quiesce_start();
+
+    l_elems = strtol(ctx->argv[1], NULL, 10);
+    l_threads = strtol(ctx->argv[2], NULL, 10);
+    ovs_assert(l_elems > 0 && l_threads > 0);
+
+    n_elems = l_elems;
+    n_threads = l_threads;
+    /* The globals are 'unsigned int' (printed with %u below), which is
+     * narrower than 'long int' on LP64 systems.  Refuse counts that do not
+     * survive the conversion instead of silently benchmarking a truncated
+     * value (e.g. 5000000000 would otherwise become 705032704). */
+    ovs_assert(n_elems == l_elems && n_threads == l_threads);
+
+    elements = xcalloc(n_elems, sizeof *elements);
+    thread_working_ms = xcalloc(n_threads, sizeof *thread_working_ms);
+
+    printf("Benchmarking n=%u on 1 + %u threads.\n", n_elems, n_threads);
+
+    printf(" type\\thread:  Reader ");
+    for (i = 0; i < n_threads; i++) {
+        printf("   %3" PRIuSIZE " ", i + 1);
+    }
+    printf("   Avg\n");
+
+    benchmark_mpsc_queue();
+#ifdef HAVE_PTHREAD_SPIN_LOCK
+    /* Without pthread spinlocks the spin lock type falls back to ovs_mutex,
+     * which would just repeat the mutex run, so it is skipped. */
+    benchmark_list(false);
+#endif
+    benchmark_list(true);
+    benchmark_guarded_list();
+
+    free(thread_working_ms);
+    free(elements);
+}
+
+/* Sub-commands dispatched by test_mpsc_queue_main(): "check" runs
+ * run_tests(), "benchmark" runs run_benchmarks() and expects
+ * "<nb elem> <nb threads>".  The all-NULL entry marks the end of the
+ * table. */
+static const struct ovs_cmdl_command commands[] = {
+    {"check", NULL, 0, 0, run_tests, OVS_RO},
+    {"benchmark", "<nb elem> <nb threads>", 2, 2, run_benchmarks, OVS_RO},
+    {NULL, NULL, 0, 0, NULL, OVS_RO},
+};
+
+/* Entry point of "ovstest test-mpsc-queue".  Turns logging off for every
+ * module and destination, records the program name, then hands the
+ * arguments after 'optind' to the 'commands' table. */
+static void
+test_mpsc_queue_main(int argc, char *argv[])
+{
+    struct ovs_cmdl_context ctx = {
+        .argv = &argv[optind],
+        .argc = argc - optind,
+    };
+
+    vlog_set_levels(NULL, VLF_ANY_DESTINATION, VLL_OFF);
+    set_program_name(argv[0]);
+
+    ovs_cmdl_run_command(&ctx, commands);
+}
+
+/* Expose this test as "ovstest test-mpsc-queue". */
+OVSTEST_REGISTER("test-mpsc-queue", test_mpsc_queue_main);
-- 
2.31.1




More information about the dev mailing list