[ovs-dev] [PATCH v3 12/28] mpsc-queue: Module for lock-free message passing

Ilya Maximets i.maximets at ovn.org
Fri Apr 30 15:24:14 UTC 2021


On 4/25/21 1:55 PM, Gaetan Rivet wrote:
> Add a lockless multi-producer/single-consumer (MPSC), linked-list based,
> intrusive, unbounded queue that does not require deferred memory
> management.
> 
> The queue is an implementation of the structure described by Dmitri
> Vyukov[1]. It adds a slightly more explicit API explaining the proper use
> of the queue.
> Alternatives were considered such as a Treiber Stack [2] or a
> Michael-Scott queue [3], but this one is faster, simpler and scalable.
> 
> [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
> 
> [2]: R. K. Treiber. Systems programming: Coping with parallelism.
>      Technical Report RJ 5118, IBM Almaden Research Center, April 1986.
> 
> [3]: M. M. Michael, Simple, Fast, and Practical Non-Blocking and Blocking
>      Concurrent Queue Algorithms.
>      https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
> 
> The queue is designed to improve the specific MPSC setup.  A benchmark
> accompanies the unit tests to measure the difference in this configuration.
> A single reader thread polls the queue while N writers enqueue elements
> as fast as possible.  The mpsc-queue is compared against the regular ovs-list
> as well as the guarded list.  The latter usually offers a slight improvement
> by batching the element removal, however the mpsc-queue is faster.
> 
> The average is of each producer threads time:
> 
>    $ ./tests/ovstest test-mpsc-queue benchmark 3000000 1
>    Benchmarking n=3000000 on 1 + 1 threads.
>     type\thread:  Reader      1    Avg
>      mpsc-queue:     161    161    161 ms
>            list:     803    803    803 ms
>    guarded list:     665    665    665 ms
> 
>    $ ./tests/ovstest test-mpsc-queue benchmark 3000000 2
>    Benchmarking n=3000000 on 1 + 2 threads.
>     type\thread:  Reader      1      2    Avg
>      mpsc-queue:     102    101     97     99 ms
>            list:     246    212    246    229 ms
>    guarded list:     264    263    214    238 ms
> 
>    $ ./tests/ovstest test-mpsc-queue benchmark 3000000 3
>    Benchmarking n=3000000 on 1 + 3 threads.
>     type\thread:  Reader      1      2      3    Avg
>      mpsc-queue:      92     91     92     91     91 ms
>            list:     520    517    515    520    517 ms
>    guarded list:     405    395    401    404    400 ms
> 
>    $ ./tests/ovstest test-mpsc-queue benchmark 3000000 4
>    Benchmarking n=3000000 on 1 + 4 threads.
>     type\thread:  Reader      1      2      3      4    Avg
>      mpsc-queue:      77     73     73     77     75     74 ms
>            list:     371    359    361    287    370    344 ms
>    guarded list:     389    388    359    363    357    366 ms

Hi, Gaetan.

This is an interesting implementation.  However.  I spent a few hours
benchmarking it on my machine and I can say that benchmark itself has
a few issues:

1. Benchmark need to care about cores on which threads are working,
   otherwise results are highly inconsistent, e.g. if several threads
   are scheduled on the same core.  Also, results are different if
   threads are scheduled on sibling cores vs on separate physical cores.

2. ovs-list test is not fair in compare with test for mpsc.  In test
   for ovs-list, reader blocks the writer for the whole loop that also
   includes some calculations inside.  I see you point that, likely,
   taking a mutex for every pop() operation will destroy the performance,
   but you're missing the fact that spinlock could be used instead
   of a mutex.

What I did is that I replaced a mutex with spinlock, set affinity for
threads to separate physical cores and performed a series of tests.
Every test variant was executed 10 times, calculated average, min and
max of the 'Avg' column from these runs.
Since spinlocks are not fair, they sometimes perform badly in case
of too high contention.  For that reason I also included 90% average
that cuts off one worst result.

Results:

./tests/ovstest test-mpsc-queue benchmark 3000000 1
mpsc-queue: avg 183.5  | max 200  | min 169 | 90% avg 181
 spin+list: avg 89.2   | max 142  | min 64  | 90% avg 89

./tests/ovstest test-mpsc-queue benchmark 3000000 2
mpsc-queue: avg 83.3   | max 90   | min 79  | 90% avg 83
 spin+list: avg 141.8  | max 675  | min 43  | 90% avg 83

./tests/ovstest test-mpsc-queue benchmark 3000000 3
mpsc-queue: avg 75.5   | max 80   | min 70  | 90% avg 75
 spin+list: avg 96.2   | max 166  | min 63  | 90% avg 88

We can see that in this setup list+spinlock absolutely crashes the
mspc in case of single reader + single writer, which is the most
likely case for a non-synthetic workload.  It's on par with mpsc
in case of 1+2 and also very similar in 1+3.  I didn't have 5
physical cores to test 1+4.

I also run a test without contention (All reads after join).  And
there is no noticeable difference between implementations in this
case:

./tests/ovstest test-mpsc-queue benchmark 3000000 1 - without contention
mpsc-queue: avg 60.9   | max 75   | min 42  | 90% avg 59
 spin+list: avg 60.5   | max 73   | min 43  | 90% avg 59
   guarded: avg 79.7   | max 95   | min 61  | 90% avg 78

Results are much more flaky in case of scheduling threads to siblings,
but spinlock+list still not too far from mpsc even in this case.

So, the question is: do we really need this new data structure?
spinlock + ovs-list shows very good results and should work just fine,
especially in non-synthetic cases, where contention of more than 2
threads is very unlikely (assuming that critical sections are short).

In OVS these data structures will be used from PMD threads that are
guaranteed to run on different cores.  Non-PMD threads are running
on different set of cores too, so there should be no problems with
contention between threads running on a same core.

Of course, spinlocks should be used wisely, i.e. we should not hold
them longer than necessary.  But if the code is already written for
mspc, it should not be a problem to convert it to spinlock + list
without need to hold a lock for longer than one push() or pop().

Any thoughts?

Below is the patch that I applied on top of this one to perform
tests (on my machine cores 0 and 1 are siblings):

diff --git a/tests/test-mpsc-queue.c b/tests/test-mpsc-queue.c
index 26d48b9ff..67cbce421 100644
--- a/tests/test-mpsc-queue.c
+++ b/tests/test-mpsc-queue.c
@@ -25,7 +25,10 @@
 #include "guarded-list.h"
 #include "mpsc-queue.h"
 #include "openvswitch/list.h"
+#include "openvswitch/vlog.h"
 #include "openvswitch/util.h"
+#include "ovs-numa.h"
+#include "ovs-rcu.h"
 #include "ovs-thread.h"
 #include "ovstest.h"
 #include "timeval.h"
@@ -318,7 +321,10 @@ mpsc_queue_insert_thread(void *aux_)
     unsigned int id;
     size_t i;
 
+    ovsrcu_quiesce_start();
+
     atomic_add(&aux->thread_id, 1u, &id);
+    ovs_numa_thread_setaffinity_core((id + 1) * 2);
     n_elems_per_thread = n_elems / n_threads;
     th_elements = &elements[id * n_elems_per_thread];
 
@@ -358,6 +364,7 @@ benchmark_mpsc_queue(void)
 
     aux.queue = &queue;
     atomic_store(&aux.thread_id, 0);
+    ovs_numa_thread_setaffinity_core(0);
 
     for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
         mpsc_queue_insert(&queue, &elements[i].node.mpscq);
@@ -424,7 +431,7 @@ benchmark_mpsc_queue(void)
 
 struct list_aux {
     struct ovs_list *list;
-    struct ovs_mutex *lock;
+    struct ovs_spin *lock;
     atomic_uint thread_id;
 };
 
@@ -438,7 +445,10 @@ locked_list_insert_thread(void *aux_)
     unsigned int id;
     size_t i;
 
+    ovsrcu_quiesce_start();
+
     atomic_add(&aux->thread_id, 1u, &id);
+    ovs_numa_thread_setaffinity_core((id + 1) * 2);
     n_elems_per_thread = n_elems / n_threads;
     th_elements = &elements[id * n_elems_per_thread];
 
@@ -446,9 +456,9 @@ locked_list_insert_thread(void *aux_)
     xgettimeofday(&start);
 
     for (i = 0; i < n_elems_per_thread; i++) {
-        ovs_mutex_lock(aux->lock);
+        ovs_spin_lock(aux->lock);
         ovs_list_push_front(aux->list, &th_elements[i].node.list);
-        ovs_mutex_unlock(aux->lock);
+        ovs_spin_unlock(aux->lock);
     }
 
     thread_working_ms[id] = elapsed(&start);
@@ -462,7 +472,7 @@ locked_list_insert_thread(void *aux_)
 static void
 benchmark_list(void)
 {
-    struct ovs_mutex lock;
+    struct ovs_spin lock;
     struct ovs_list list;
     struct element *elem;
     struct timeval start;
@@ -477,18 +487,19 @@ benchmark_list(void)
     memset(elements, 0, n_elems * sizeof *elements);
     memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
 
-    ovs_mutex_init(&lock);
+    ovs_spin_init(&lock);
     ovs_list_init(&list);
 
     aux.list = &list;
     aux.lock = &lock;
     atomic_store(&aux.thread_id, 0);
+    ovs_numa_thread_setaffinity_core(0);
 
-    ovs_mutex_lock(&lock);
+    ovs_spin_lock(&lock);
     for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
         ovs_list_push_front(&list, &elements[i].node.list);
     }
-    ovs_mutex_unlock(&lock);
+    ovs_spin_unlock(&lock);
 
     working = true;
 
@@ -505,12 +516,21 @@ benchmark_list(void)
     counter = 0;
     epoch = 1;
     do {
-        ovs_mutex_lock(&lock);
-        LIST_FOR_EACH_POP (elem, node.list, &list) {
-            elem->mark = epoch;
-            counter++;
+        struct ovs_list *node = NULL;
+
+        ovs_spin_lock(&lock);
+        if (!ovs_list_is_empty(&list)) {
+            node = ovs_list_pop_front(&list);
+        }
+        ovs_spin_unlock(&lock);
+
+        if (!node) {
+            continue;
         }
-        ovs_mutex_unlock(&lock);
+
+        elem = CONTAINER_OF(node, struct element, node.list);
+        elem->mark = epoch;
+        counter++;
         if (epoch == UINT64_MAX) {
             epoch = 0;
         }
@@ -525,12 +545,12 @@ benchmark_list(void)
     avg /= n_threads;
 
     /* Elements might have been inserted before threads were joined. */
-    ovs_mutex_lock(&lock);
+    ovs_spin_lock(&lock);
     LIST_FOR_EACH_POP (elem, node.list, &list) {
         elem->mark = epoch;
         counter++;
     }
-    ovs_mutex_unlock(&lock);
+    ovs_spin_unlock(&lock);
 
     printf("        list:  %6d", elapsed(&start));
     for (i = 0; i < n_threads; i++) {
@@ -566,7 +586,10 @@ guarded_list_insert_thread(void *aux_)
     unsigned int id;
     size_t i;
 
+    ovsrcu_quiesce_start();
+
     atomic_add(&aux->thread_id, 1u, &id);
+    ovs_numa_thread_setaffinity_core((id + 1) * 2);
     n_elems_per_thread = n_elems / n_threads;
     th_elements = &elements[id * n_elems_per_thread];
 
@@ -608,6 +631,7 @@ benchmark_guarded_list(void)
 
     aux.glist = &glist;
     atomic_store(&aux.thread_id, 0);
+    ovs_numa_thread_setaffinity_core(0);
 
     for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
         guarded_list_push_back(&glist, &elements[i].node.list, n_elems);
@@ -680,6 +704,9 @@ run_benchmarks(struct ovs_cmdl_context *ctx)
     long int l_elems;
     size_t i;
 
+    vlog_set_levels(NULL, VLF_ANY_DESTINATION, VLL_OFF);
+    ovs_numa_init();
+
     l_elems = strtol(ctx->argv[1], NULL, 10);
     l_threads = strtol(ctx->argv[2], NULL, 10);
     ovs_assert(l_elems > 0 && l_threads > 0);
---


More information about the dev mailing list