[ovs-dev] [PATCH v3 12/28] mpsc-queue: Module for lock-free message passing

Gaëtan Rivet grive at u256.net
Sat May 1 13:20:28 UTC 2021


On Fri, Apr 30, 2021, at 17:24, Ilya Maximets wrote:
> On 4/25/21 1:55 PM, Gaetan Rivet wrote:
> > Add a lockless multi-producer/single-consumer (MPSC), linked-list based,
> > intrusive, unbounded queue that does not require deferred memory
> > management.
> > 
> > The queue is an implementation of the structure described by Dmitri
> > Vyukov[1]. It adds a slightly more explicit API explaining the proper use
> > of the queue.
> > Alternatives such as a Treiber stack [2] or a Michael-Scott queue [3]
> > were considered, but this one is faster, simpler, and scalable.
> > 
> > [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
> > 
> > [2]: R. K. Treiber. Systems programming: Coping with parallelism.
> >      Technical Report RJ 5118, IBM Almaden Research Center, April 1986.
> > 
> > [3]: M. M. Michael and M. L. Scott. Simple, Fast, and Practical
> >      Non-Blocking and Blocking Concurrent Queue Algorithms.
> >      https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
> > 
> > The queue is designed to perform well in this specific MPSC setup.  A benchmark
> > accompanies the unit tests to measure the difference in this configuration.
> > A single reader thread polls the queue while N writers enqueue elements
> > as fast as possible.  The mpsc-queue is compared against the regular ovs-list
> > as well as the guarded list.  The latter usually offers a slight improvement
> > by batching element removal; the mpsc-queue, however, is faster.
> > 
> > The 'Avg' column is the average of the producer threads' times:
> > 
> >    $ ./tests/ovstest test-mpsc-queue benchmark 3000000 1
> >    Benchmarking n=3000000 on 1 + 1 threads.
> >     type\thread:  Reader      1    Avg
> >      mpsc-queue:     161    161    161 ms
> >            list:     803    803    803 ms
> >    guarded list:     665    665    665 ms
> > 
> >    $ ./tests/ovstest test-mpsc-queue benchmark 3000000 2
> >    Benchmarking n=3000000 on 1 + 2 threads.
> >     type\thread:  Reader      1      2    Avg
> >      mpsc-queue:     102    101     97     99 ms
> >            list:     246    212    246    229 ms
> >    guarded list:     264    263    214    238 ms
> > 
> >    $ ./tests/ovstest test-mpsc-queue benchmark 3000000 3
> >    Benchmarking n=3000000 on 1 + 3 threads.
> >     type\thread:  Reader      1      2      3    Avg
> >      mpsc-queue:      92     91     92     91     91 ms
> >            list:     520    517    515    520    517 ms
> >    guarded list:     405    395    401    404    400 ms
> > 
> >    $ ./tests/ovstest test-mpsc-queue benchmark 3000000 4
> >    Benchmarking n=3000000 on 1 + 4 threads.
> >     type\thread:  Reader      1      2      3      4    Avg
> >      mpsc-queue:      77     73     73     77     75     74 ms
> >            list:     371    359    361    287    370    344 ms
> >    guarded list:     389    388    359    363    357    366 ms
> 
> Hi, Gaetan.
> 
> This is an interesting implementation.  However, I spent a few hours
> benchmarking it on my machine, and I can say that the benchmark itself
> has a few issues:
> 
> 1. The benchmark needs to account for the cores on which threads are
>    running; otherwise, results are highly inconsistent, e.g. if several
>    threads are scheduled on the same core.  Results also differ if
>    threads are scheduled on sibling cores vs. separate physical cores.
> 
> 2. The ovs-list test is not fair compared to the mpsc test.  In the
>    ovs-list test, the reader blocks the writers for a whole loop that
>    also includes some calculations.  I see your point that taking a
>    mutex for every pop() operation would likely destroy performance,
>    but you're missing the fact that a spinlock could be used instead
>    of a mutex.
> 
> What I did was replace the mutex with a spinlock, set thread affinity
> to separate physical cores, and perform a series of tests.  Every test
> variant was executed 10 times, and I calculated the average, min, and
> max of the 'Avg' column across these runs.
> Since spinlocks are not fair, they sometimes perform badly under too
> much contention.  For that reason I also included a 90% average that
> cuts off the single worst result.
> 
> Results:
> 
> ./tests/ovstest test-mpsc-queue benchmark 3000000 1
> mpsc-queue: avg 183.5  | max 200  | min 169 | 90% avg 181
>  spin+list: avg 89.2   | max 142  | min 64  | 90% avg 89
> 
> ./tests/ovstest test-mpsc-queue benchmark 3000000 2
> mpsc-queue: avg 83.3   | max 90   | min 79  | 90% avg 83
>  spin+list: avg 141.8  | max 675  | min 43  | 90% avg 83
> 
> ./tests/ovstest test-mpsc-queue benchmark 3000000 3
> mpsc-queue: avg 75.5   | max 80   | min 70  | 90% avg 75
>  spin+list: avg 96.2   | max 166  | min 63  | 90% avg 88
> 
> We can see that in this setup list+spinlock absolutely crushes the
> mpsc in the single reader + single writer case, which is the most
> likely case for a non-synthetic workload.  It's on par with mpsc in
> the 1+2 case and also very similar in 1+3.  I didn't have 5 physical
> cores to test 1+4.
> 
> I also ran a test without contention (all reads after join), and
> there is no noticeable difference between the implementations in this
> case:
> 
> ./tests/ovstest test-mpsc-queue benchmark 3000000 1 - without contention
> mpsc-queue: avg 60.9   | max 75   | min 42  | 90% avg 59
>  spin+list: avg 60.5   | max 73   | min 43  | 90% avg 59
>    guarded: avg 79.7   | max 95   | min 61  | 90% avg 78
> 
> Results are much more flaky when threads are scheduled on siblings,
> but spinlock+list is still not too far from mpsc even in this case.
> 
> So, the question is: do we really need this new data structure?
> spinlock + ovs-list shows very good results and should work just fine,
> especially in non-synthetic cases, where contention of more than 2
> threads is very unlikely (assuming that critical sections are short).
> 
> In OVS these data structures will be used from PMD threads that are
> guaranteed to run on different cores.  Non-PMD threads run on a
> different set of cores too, so there should be no problem with
> contention between threads running on the same core.
> 
> Of course, spinlocks should be used wisely, i.e. we should not hold
> them longer than necessary.  But if the code is already written for
> mpsc, it should not be a problem to convert it to spinlock + list
> without needing to hold the lock for longer than one push() or pop().
> 
> Any thoughts?

Hello Ilya,

You raise valid points, and performance work is usually nuanced.  Thanks
for taking a thorough look at it.  Using a list + spinlock would reduce
the amount of code, so it is worth examining.
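
For reference, the core of the queue under discussion is quite small as
well.  Below is a rough sketch of the Vyukov intrusive push/pop in C11
atomics; the names are illustrative, not the exact API from the patch,
and everything is left at the default seq_cst ordering for brevity:

    #include <stdatomic.h>
    #include <stddef.h>

    struct mpsc_node {
        _Atomic(struct mpsc_node *) next;
    };

    struct mpsc_q {
        _Atomic(struct mpsc_node *) head; /* Producers push here. */
        struct mpsc_node *tail;           /* Consumer-side cursor. */
        struct mpsc_node stub;            /* Sentinel, never handed out. */
    };

    static void
    mpsc_q_init(struct mpsc_q *q)
    {
        atomic_store(&q->stub.next, NULL);
        atomic_store(&q->head, &q->stub);
        q->tail = &q->stub;
    }

    /* Multiple producers: one atomic exchange plus one store, no loop. */
    static void
    mpsc_q_push(struct mpsc_q *q, struct mpsc_node *node)
    {
        struct mpsc_node *prev;

        atomic_store(&node->next, NULL);
        prev = atomic_exchange(&q->head, node);
        /* A producer preempted here leaves the chain momentarily broken,
         * which is why pop() below may report "empty" spuriously. */
        atomic_store(&prev->next, node);
    }

    /* Single consumer only.  Returns NULL when empty, or when a producer
     * is in the middle of a push. */
    static struct mpsc_node *
    mpsc_q_pop(struct mpsc_q *q)
    {
        struct mpsc_node *tail = q->tail;
        struct mpsc_node *next = atomic_load(&tail->next);

        if (tail == &q->stub) {
            if (!next) {
                return NULL;
            }
            q->tail = next;      /* Skip over the stub. */
            tail = next;
            next = atomic_load(&tail->next);
        }
        if (next) {
            q->tail = next;
            return tail;
        }
        if (tail != atomic_load(&q->head)) {
            return NULL;         /* Producer mid-push, retry later. */
        }
        /* 'tail' is the last real node: re-insert the stub behind it
         * so it becomes poppable. */
        mpsc_q_push(q, &q->stub);
        next = atomic_load(&tail->next);
        if (next) {
            q->tail = next;
            return tail;
        }
        return NULL;
    }

The push() side is what makes it scale: producers never loop, they
synchronize on a single atomic exchange.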

Getting the benchmark closer to reality:

The first application of the queue is the offload management case.

The benchmark departs from the actual workload by having all producers
enqueue equal shares.  I'm not sure how the number of offloads inserted by
the PMDs compares to the number deleted by the revalidators.  Maybe the
producers in the benchmark should be asymmetrical?
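
Something like the following could skew the load; thread_quota() is purely
hypothetical and the 75/25 split is an arbitrary guess, not a measured
ratio:

    #include <stddef.h>

    /* Producer 0 plays the "PMD" and inserts the bulk of the elements,
     * while the other producers share the rest like revalidators would. */
    static size_t
    thread_quota(unsigned int id, size_t n_elems, unsigned int n_threads)
    {
        if (n_threads == 1) {
            return n_elems;
        }
        if (id == 0) {
            return (n_elems * 3) / 4;           /* "PMD": 75% of the load. */
        }
        return (n_elems / 4) / (n_threads - 1); /* Others share 25%. */
    }

Each insert thread would then use thread_quota(id, n_elems, n_threads)
instead of the current n_elems / n_threads.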

I don't think that the 1+1 test is the least synthetic one.  Multiple PMDs
can be used, and even if only 1 PMD is used, a number of revalidators are
configured by default (7 I think?).  Maybe they are not constantly working
in a normal setup?  Still, I don't think that multiple PMDs or active
revalidators are such an edge case that they should be ignored.

Thread affinity:

Only PMDs have dedicated cores; the offload thread and the revalidators do
not.  I think this is an important point to keep in mind when considering
the spinlock, as it is unfair.

If I run the 1+1 test with all producers and the reader affined each to
their own core, I get results closer to yours, though with slightly better
numbers for mpsc-queue (I'm guessing the benchmark is not isolated enough
on our machines and still depends on the general load):

$ ./mpsc-stats.sh  1000000 1
20 times './tests/ovstest test-mpsc-queue benchmark  1000000 1':
   mpsc-queue reader: avg   70.7 | stdev    3.8 | max    75 | min    61
  mpsc-queue writers: avg   56.2 | stdev    3.4 | max    61 | min    48
    spin+list reader: avg   49.0 | stdev    7.5 | max    65 | min    38
   spin+list writers: avg   30.9 | stdev    8.6 | max    51 | min    21
 guarded list reader: avg  251.6 | stdev   62.7 | max   369 | min   173
guarded list writers: avg  237.8 | stdev   62.3 | max   351 | min   161

If, however, I set the affinity of a single producer only (not the reader,
nor the other producers), I get surprising results:

$ ./mpsc-stats.sh  1000000 1
20 times './tests/ovstest test-mpsc-queue benchmark  1000000 1':
   mpsc-queue reader: avg   85.6 | stdev   29.1 | max   111 | min    24
  mpsc-queue writers: avg   84.2 | stdev   30.8 | max   108 | min    24
    spin+list reader: avg 1730.1 | stdev 4403.4 | max 19418 | min    40
   spin+list writers: avg 1726.2 | stdev 4404.8 | max 19418 | min    35
 guarded list reader: avg  470.0 | stdev  147.7 | max   710 | min   204
guarded list writers: avg  469.9 | stdev  147.7 | max   710 | min   204

If I profile some of the worst runs in this configuration, I see 84% of
the time spent waiting on the spinlock.  Pinning the producer to core 0
has an adverse effect.  If I affine it to core 1 instead, the results are
still unstable, but better:

$ ./mpsc-stats.sh  1000000 1
20 times './tests/ovstest test-mpsc-queue benchmark  1000000 1':
   mpsc-queue reader: avg   48.8 | stdev   15.1 | max    60 | min    13
  mpsc-queue writers: avg   48.8 | stdev   15.1 | max    60 | min    13
    spin+list reader: avg   62.3 | stdev   81.0 | max   328 | min    23
   spin+list writers: avg   59.0 | stdev   82.4 | max   328 | min    20
 guarded list reader: avg  271.5 | stdev   46.9 | max   337 | min   190
guarded list writers: avg  271.3 | stdev   46.9 | max   337 | min   190
$ ./mpsc-stats.sh  1000000 1
20 times './tests/ovstest test-mpsc-queue benchmark  1000000 1':
   mpsc-queue reader: avg   47.1 | stdev   15.8 | max    59 | min    13
  mpsc-queue writers: avg   45.8 | stdev   16.9 | max    59 | min    13
    spin+list reader: avg  172.4 | stdev  467.8 | max  2100 | min    23
   spin+list writers: avg  168.9 | stdev  468.8 | max  2100 | min    20
 guarded list reader: avg  260.0 | stdev   68.5 | max   362 | min   129
guarded list writers: avg  259.9 | stdev   68.5 | max   362 | min   129

Once we use more than 1+1 threads, the results improve:

$ ./mpsc-stats.sh  1000000 2
20 times './tests/ovstest test-mpsc-queue benchmark  1000000 2':
   mpsc-queue reader: avg   36.7 | stdev    5.3 | max    51 | min    27
  mpsc-queue writers: avg   31.5 | stdev    4.9 | max    39 | min    23
    spin+list reader: avg   49.0 | stdev   15.4 | max    73 | min    26
   spin+list writers: avg   40.2 | stdev   15.3 | max    65 | min    16
 guarded list reader: avg   77.9 | stdev    6.3 | max    95 | min    69
guarded list writers: avg   73.1 | stdev    5.6 | max    87 | min    63

$ ./mpsc-stats.sh  1000000 3
20 times './tests/ovstest test-mpsc-queue benchmark  1000000 3':
   mpsc-queue reader: avg   31.1 | stdev    6.0 | max    42 | min    24
  mpsc-queue writers: avg   28.4 | stdev    5.9 | max    41 | min    19
    spin+list reader: avg   82.2 | stdev   18.9 | max   127 | min    46
   spin+list writers: avg   70.8 | stdev   19.5 | max   119 | min    36
 guarded list reader: avg  162.2 | stdev   15.7 | max   189 | min   132
guarded list writers: avg  158.6 | stdev   16.2 | max   185 | min   126

Avoiding additional code would be nice, but spinlocks can have surprising
effects in some configurations (or so it seems).

The spinlock reduces stability, and that could be hard to pinpoint in the
larger system.  In future work to improve datapath latency, it will add
noise.

The exponential backoff will work just as well in the offload thread, so
if we decide to go with list + spinlock, that's alright.  My vote still
goes to the mpsc-queue, as it is more predictable, with respectable
results in 1+1.
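
Concretely, the consumer-side backoff I have in mind looks like this; it
works the same whether pop() is the mpsc-queue pop or a spinlock-protected
list pop.  The constants and the process() handler are made up, the queue
names reuse the sketch from earlier in this mail, and I'm assuming the OVS
xnanosleep() helper (which takes nanoseconds):

    #include <stdint.h>

    #define BACKOFF_START_NS 1000                /* 1 us. */
    #define BACKOFF_MAX_NS   (16 * 1000 * 1000)  /* 16 ms. */

    static void
    consumer_loop(struct mpsc_q *q)
    {
        uint64_t backoff = BACKOFF_START_NS;

        for (;;) {
            struct mpsc_node *node = mpsc_q_pop(q);

            if (node) {
                process(node);               /* Hypothetical handler. */
                backoff = BACKOFF_START_NS;  /* Reset on progress. */
            } else {
                xnanosleep(backoff);         /* Sleep, then back off. */
                if (backoff < BACKOFF_MAX_NS) {
                    backoff *= 2;
                }
            }
        }
    }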

To decide whether we need a new structure, I think we should look at the overall architecture.
OVS-DPDK runs a mix of datapath threads on affined cores and non-affined helper threads.
Using spinlocks in this context might be an issue. I think it makes sense to have ways to
avoid them if needed.

Gaetan

PS: here is the diff I used to generate the results, including the script
for the stats:

---
diff --git a/mpsc-stats.sh b/mpsc-stats.sh
new file mode 100755
index 000000000..1ee177ede
--- /dev/null
+++ b/mpsc-stats.sh
@@ -0,0 +1,80 @@
+#!/usr/bin/env bash
+
+N_ELEM=${1:-1000000}
+N_CORE=${2:-1}
+N_RUN=${3:-20}
+
+#BIN="taskset -c 0,1,2,3 ./tests/ovstest test-mpsc-queue benchmark "
+BIN="./tests/ovstest test-mpsc-queue benchmark "
+CMD="$BIN $N_ELEM $N_CORE"
+CMD_fast="$BIN 1 1"
+
+xc() { python -c "import math; print(float($*))"; }
+join_by() {( IFS="$1"; shift; echo "$*"; )}
+
+stdev() {(
+    m=$1; shift;
+    sum=0
+    for v in $*; do
+        sum=$(xc "$sum + pow($v - $m, 2)")
+    done
+    echo $(xc "math.sqrt($sum / $#)")
+)}
+mean() { echo $(xc "($(join_by + $*)) / $#"); }
+
+# $1: name field max width
+# $2: stat name
+# $3-: values
+print_stats() {(
+    len=$1; shift;
+    name=$1; shift;
+    values="$*"
+    m=$(mean $values)
+    sd=$(stdev $m $values)
+    printf "%*s: avg %6.1f | stdev %6.1f | max %5.0f | min %5.0f\n" \
+        "$len" "$name" "$m" "$sd" \
+        "$(xc "max($(join_by , $values))")" \
+        "$(xc "min($(join_by , $values))")"
+)}
+
+tmp=$(mktemp)
+$CMD_fast | grep -v 'Benchmarking\|Reader' | \
+while read line; do
+    name="$(echo $line | cut -d: -f1)"
+    printf "%s reader\t%s writers\t" "$name" "$name"
+    #printf "%s writers\t" "$name"
+done >> $tmp
+printf "\n" >> $tmp
+
+echo "$N_RUN times '$CMD':"
+
+for i in $(seq 1 $N_RUN); do
+$CMD | grep -v 'Benchmarking\|Reader' | \
+while read line; do
+    name="$(echo $line    | cut -d: -f1)"
+    reader="$(echo $line  | cut -d: -f2- | cut -d' ' -f2)"
+    writers="$(echo $line | cut -d: -f2- | rev | cut -d' ' -f2 | rev)"
+
+    printf "%d\t%d\t" $reader $writers
+    #printf "%d\t" $writers
+done >> $tmp
+printf "\n" >> $tmp
+done
+
+#cat $tmp
+
+nb_col=$(awk -F$'\t' '{print NF-1}' $tmp | head -1)
+maxlen=0
+for i in $(seq 1 $nb_col); do
+    name=$(head -1 $tmp | cut -d$'\t' -f$i)
+    len=$(printf "%s" "$name" |wc -m)
+    [ "$maxlen" -lt "$len" ] && maxlen=$len
+done
+
+for i in $(seq 1 $nb_col); do
+    name=$(head -1 $tmp | cut -d$'\t' -f$i)
+    values=$(tail -n +2 $tmp | cut -d$'\t' -f$i)
+    print_stats $maxlen "$name" $values
+done
+
+rm $tmp
diff --git a/tests/test-mpsc-queue.c b/tests/test-mpsc-queue.c
index ebd1226fe..cad7a0ce3 100644
--- a/tests/test-mpsc-queue.c
+++ b/tests/test-mpsc-queue.c
@@ -25,12 +25,22 @@
 #include "guarded-list.h"
 #include "mpsc-queue.h"
 #include "openvswitch/list.h"
+#include "openvswitch/vlog.h"
 #include "openvswitch/util.h"
+#include "ovs-numa.h"
+#include "ovs-rcu.h"
 #include "ovs-thread.h"
 #include "ovstest.h"
 #include "timeval.h"
 #include "util.h"
 
+static void
+set_affinity(unsigned int tid OVS_UNUSED)
+{
+    //ovs_numa_thread_setaffinity_core(tid * 2);
+    //ovs_numa_thread_setaffinity_core(tid);
+}
+
 struct element {
     union {
         struct mpsc_queue_node mpscq;
@@ -318,7 +328,13 @@ mpsc_queue_insert_thread(void *aux_)
     unsigned int id;
     size_t i;
 
+    ovsrcu_quiesce_start();
+
     atomic_add(&aux->thread_id, 1u, &id);
+    if (id == 0) {
+        ovs_numa_thread_setaffinity_core(1);
+    }
+    set_affinity(id + 1);
     n_elems_per_thread = n_elems / n_threads;
     th_elements = &elements[id * n_elems_per_thread];
 
@@ -358,6 +374,7 @@ benchmark_mpsc_queue(void)
 
     aux.queue = &queue;
     atomic_store(&aux.thread_id, 0);
+    set_affinity(0);
 
     for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
         mpsc_queue_insert(&queue, &elements[i].node.mpscq);
@@ -424,7 +441,7 @@ benchmark_mpsc_queue(void)
 
 struct list_aux {
     struct ovs_list *list;
-    struct ovs_mutex *lock;
+    struct ovs_spin *lock;
     atomic_uint thread_id;
 };
 
@@ -438,7 +455,13 @@ locked_list_insert_thread(void *aux_)
     unsigned int id;
     size_t i;
 
+    ovsrcu_quiesce_start();
+
     atomic_add(&aux->thread_id, 1u, &id);
+    if (id == 0) {
+        ovs_numa_thread_setaffinity_core(1);
+    }
+    set_affinity(id + 1);
     n_elems_per_thread = n_elems / n_threads;
     th_elements = &elements[id * n_elems_per_thread];
 
@@ -446,9 +469,9 @@ locked_list_insert_thread(void *aux_)
     xgettimeofday(&start);
 
     for (i = 0; i < n_elems_per_thread; i++) {
-        ovs_mutex_lock(aux->lock);
+        ovs_spin_lock(aux->lock);
         ovs_list_push_front(aux->list, &th_elements[i].node.list);
-        ovs_mutex_unlock(aux->lock);
+        ovs_spin_unlock(aux->lock);
     }
 
     thread_working_ms[id] = elapsed(&start);
@@ -462,7 +485,7 @@ locked_list_insert_thread(void *aux_)
 static void
 benchmark_list(void)
 {
-    struct ovs_mutex lock;
+    struct ovs_spin lock;
     struct ovs_list list;
     struct element *elem;
     struct timeval start;
@@ -477,18 +500,19 @@ benchmark_list(void)
     memset(elements, 0, n_elems * sizeof *elements);
     memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
 
-    ovs_mutex_init(&lock);
+    ovs_spin_init(&lock);
     ovs_list_init(&list);
 
     aux.list = &list;
     aux.lock = &lock;
     atomic_store(&aux.thread_id, 0);
+    set_affinity(0);
 
-    ovs_mutex_lock(&lock);
+    ovs_spin_lock(&lock);
     for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
         ovs_list_push_front(&list, &elements[i].node.list);
     }
-    ovs_mutex_unlock(&lock);
+    ovs_spin_unlock(&lock);
 
     working = true;
 
@@ -505,12 +529,21 @@ benchmark_list(void)
     counter = 0;
     epoch = 1;
     do {
-        ovs_mutex_lock(&lock);
-        LIST_FOR_EACH_POP (elem, node.list, &list) {
-            elem->mark = epoch;
-            counter++;
+        struct ovs_list *node = NULL;
+
+        ovs_spin_lock(&lock);
+        if (!ovs_list_is_empty(&list)) {
+            node = ovs_list_pop_front(&list);
+        }
+        ovs_spin_unlock(&lock);
+
+        if (!node) {
+            continue;
         }
-        ovs_mutex_unlock(&lock);
+
+        elem = CONTAINER_OF(node, struct element, node.list);
+        elem->mark = epoch;
+        counter++;
         if (epoch == UINT64_MAX) {
             epoch = 0;
         }
@@ -525,14 +558,14 @@ benchmark_list(void)
     avg /= n_threads;
 
     /* Elements might have been inserted before threads were joined. */
-    ovs_mutex_lock(&lock);
+    ovs_spin_lock(&lock);
     LIST_FOR_EACH_POP (elem, node.list, &list) {
         elem->mark = epoch;
         counter++;
     }
-    ovs_mutex_unlock(&lock);
+    ovs_spin_unlock(&lock);
 
-    printf("        list:  %6d", elapsed(&start));
+    printf("   spin+list:  %6d", elapsed(&start));
     for (i = 0; i < n_threads; i++) {
         printf(" %6" PRIu64, thread_working_ms[i]);
     }
@@ -566,7 +599,13 @@ guarded_list_insert_thread(void *aux_)
     unsigned int id;
     size_t i;
 
+    ovsrcu_quiesce_start();
+
     atomic_add(&aux->thread_id, 1u, &id);
+    if (id == 0) {
+        ovs_numa_thread_setaffinity_core(1);
+    }
+    set_affinity(id + 1);
     n_elems_per_thread = n_elems / n_threads;
     th_elements = &elements[id * n_elems_per_thread];
 
@@ -585,6 +624,7 @@ guarded_list_insert_thread(void *aux_)
     return NULL;
 }
 
+OVS_UNUSED
 static void
 benchmark_guarded_list(void)
 {
@@ -608,6 +648,7 @@ benchmark_guarded_list(void)
 
     aux.glist = &glist;
     atomic_store(&aux.thread_id, 0);
+    set_affinity(0);
 
     for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
         guarded_list_push_back(&glist, &elements[i].node.list, n_elems);
@@ -680,6 +721,9 @@ run_benchmarks(struct ovs_cmdl_context *ctx)
     long int l_elems;
     size_t i;
 
+    vlog_set_levels(NULL, VLF_ANY_DESTINATION, VLL_OFF);
+    ovs_numa_init();
+
     l_elems = strtol(ctx->argv[1], NULL, 10);
     l_threads = strtol(ctx->argv[2], NULL, 10);
     ovs_assert(l_elems > 0 && l_threads > 0);

