[ovs-dev] [PATCH v3 12/28] mpsc-queue: Module for lock-free message passing
Gaëtan Rivet
grive at u256.net
Sat May 1 13:20:28 UTC 2021
On Fri, Apr 30, 2021, at 17:24, Ilya Maximets wrote:
> On 4/25/21 1:55 PM, Gaetan Rivet wrote:
> > Add a lockless multi-producer/single-consumer (MPSC), linked-list based,
> > intrusive, unbounded queue that does not require deferred memory
> > management.
> >
> > The queue is an implementation of the structure described by Dmitri
> > Vyukov[1]. It adds a slightly more explicit API explaining the proper use
> > of the queue.
> > Alternatives were considered such as a Treiber Stack [2] or a
> > Michael-Scott queue [3], but this one is faster, simpler and scalable.
> >
> > [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
> >
> > [2]: R. K. Treiber. Systems programming: Coping with parallelism.
> > Technical Report RJ 5118, IBM Almaden Research Center, April 1986.
> >
> > [3]: M. M. Michael, Simple, Fast, and Practical Non-Blocking and Blocking
> > Concurrent Queue Algorithms.
> > https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
> >
> > The queue is designed to improve the specific MPSC setup. A benchmark
> > accompanies the unit tests to measure the difference in this configuration.
> > A single reader thread polls the queue while N writers enqueue elements
> > as fast as possible. The mpsc-queue is compared against the regular ovs-list
> > as well as the guarded list. The latter usually offers a slight improvement
> > by batching the element removal, however the mpsc-queue is faster.
> >
> > The Avg column is the average of the producer threads' times:
> >
> > $ ./tests/ovstest test-mpsc-queue benchmark 3000000 1
> > Benchmarking n=3000000 on 1 + 1 threads.
> > type\thread: Reader 1 Avg
> > mpsc-queue: 161 161 161 ms
> > list: 803 803 803 ms
> > guarded list: 665 665 665 ms
> >
> > $ ./tests/ovstest test-mpsc-queue benchmark 3000000 2
> > Benchmarking n=3000000 on 1 + 2 threads.
> > type\thread: Reader 1 2 Avg
> > mpsc-queue: 102 101 97 99 ms
> > list: 246 212 246 229 ms
> > guarded list: 264 263 214 238 ms
> >
> > $ ./tests/ovstest test-mpsc-queue benchmark 3000000 3
> > Benchmarking n=3000000 on 1 + 3 threads.
> > type\thread: Reader 1 2 3 Avg
> > mpsc-queue: 92 91 92 91 91 ms
> > list: 520 517 515 520 517 ms
> > guarded list: 405 395 401 404 400 ms
> >
> > $ ./tests/ovstest test-mpsc-queue benchmark 3000000 4
> > Benchmarking n=3000000 on 1 + 4 threads.
> > type\thread: Reader 1 2 3 4 Avg
> > mpsc-queue: 77 73 73 77 75 74 ms
> > list: 371 359 361 287 370 344 ms
> > guarded list: 389 388 359 363 357 366 ms
>
> Hi, Gaetan.
>
> This is an interesting implementation. However, I spent a few hours
> benchmarking it on my machine and I can say that the benchmark itself
> has a few issues:
>
> 1. The benchmark needs to control which cores the threads run on,
>    otherwise results are highly inconsistent, e.g. when several threads
>    are scheduled on the same core. Results also differ when threads are
>    scheduled on sibling cores vs. on separate physical cores.
>
> 2. The ovs-list test is not a fair comparison with the mpsc test. In
>    the ovs-list test, the reader blocks the writer for a whole loop that
>    also includes some computation. I see your point that, likely,
>    taking a mutex for every pop() operation would destroy the
>    performance, but you're missing the fact that a spinlock could be
>    used instead of a mutex.
>
> What I did is replace the mutex with a spinlock, set the threads'
> affinity to separate physical cores, and perform a series of tests.
> Each test variant was executed 10 times, and I calculated the average,
> min and max of the 'Avg' column from these runs.
> Since spinlocks are not fair, they sometimes perform badly under
> too much contention. For that reason I also included a 90% average
> that cuts off the single worst result.
>
> Results:
>
> ./tests/ovstest test-mpsc-queue benchmark 3000000 1
> mpsc-queue: avg 183.5 | max 200 | min 169 | 90% avg 181
> spin+list: avg 89.2 | max 142 | min 64 | 90% avg 89
>
> ./tests/ovstest test-mpsc-queue benchmark 3000000 2
> mpsc-queue: avg 83.3 | max 90 | min 79 | 90% avg 83
> spin+list: avg 141.8 | max 675 | min 43 | 90% avg 83
>
> ./tests/ovstest test-mpsc-queue benchmark 3000000 3
> mpsc-queue: avg 75.5 | max 80 | min 70 | 90% avg 75
> spin+list: avg 96.2 | max 166 | min 63 | 90% avg 88
>
> We can see that in this setup list+spinlock absolutely crushes the
> mpsc queue in the single reader + single writer case, which is the most
> likely case for a non-synthetic workload. It's on par with mpsc
> in the 1+2 case and also very similar in 1+3. I didn't have 5
> physical cores to test 1+4.
>
> I also ran a test without contention (all reads happen after join),
> and there is no noticeable difference between implementations in this
> case:
>
> ./tests/ovstest test-mpsc-queue benchmark 3000000 1 - without contention
> mpsc-queue: avg 60.9 | max 75 | min 42 | 90% avg 59
> spin+list: avg 60.5 | max 73 | min 43 | 90% avg 59
> guarded: avg 79.7 | max 95 | min 61 | 90% avg 78
>
> Results are much more flaky when threads are scheduled on sibling
> cores, but spinlock+list is still not too far from mpsc even then.
>
> So, the question is: do we really need this new data structure?
> spinlock + ovs-list shows very good results and should work just fine,
> especially in non-synthetic cases, where contention between more than 2
> threads is very unlikely (assuming that critical sections are short).
>
> In OVS these data structures will be used from PMD threads that are
> guaranteed to run on different cores. Non-PMD threads run on a
> different set of cores too, so there should be no problem with
> contention between threads running on the same core.
>
> Of course, spinlocks should be used wisely, i.e. we should not hold
> them longer than necessary. But if the code is already written for
> mpsc, it should not be a problem to convert it to spinlock + list
> without needing to hold the lock for longer than one push() or pop().
>
> Any thoughts?
Hello Ilya,

You raise valid points, and performance work is usually nuanced.
Thanks for taking a thorough look into it. Using a list + spinlock
would reduce the amount of code, so it is worth examining.
Getting the benchmark closer to reality:

The first application of the queue is the offload management case.
The benchmark departs from the actual workload by having all producers
enqueue equally. I'm not sure how the number of offloads inserted by the
PMDs compares to the number deleted by the revalidators; maybe the
producers in the benchmark should be asymmetrical?

I don't think the 1+1 test is the least synthetic. Multiple PMDs can be
used, and even if only 1 PMD is used, a number of revalidators are
configured by default (7, I think?). Maybe they are not constantly
working in a normal setup, but I don't think that multiple PMDs or
active revalidators is such an edge case that it should be ignored.
Thread affinity:

Only PMDs have dedicated cores; the offload thread and the revalidators
do not. I think this is an important point to keep in mind when
considering the spinlock, as it is unfair.

If I run the 1+1 test with all producers and the reader each affined to
their own core, I get results closer to yours, though with slightly
better numbers for mpsc-queue (I'm guessing the benchmark is not
isolated enough on our machines and still depends on the general load):
$ ./mpsc-stats.sh 1000000 1
20 times './tests/ovstest test-mpsc-queue benchmark 1000000 1':
mpsc-queue reader: avg 70.7 | stdev 3.8 | max 75 | min 61
mpsc-queue writers: avg 56.2 | stdev 3.4 | max 61 | min 48
spin+list reader: avg 49.0 | stdev 7.5 | max 65 | min 38
spin+list writers: avg 30.9 | stdev 8.6 | max 51 | min 21
guarded list reader: avg 251.6 | stdev 62.7 | max 369 | min 173
guarded list writers: avg 237.8 | stdev 62.3 | max 351 | min 161
If I instead set the affinity of a single producer (not the reader, nor
the other producers), I get surprising results:
$ ./mpsc-stats.sh 1000000 1
20 times './tests/ovstest test-mpsc-queue benchmark 1000000 1':
mpsc-queue reader: avg 85.6 | stdev 29.1 | max 111 | min 24
mpsc-queue writers: avg 84.2 | stdev 30.8 | max 108 | min 24
spin+list reader: avg 1730.1 | stdev 4403.4 | max 19418 | min 40
spin+list writers: avg 1726.2 | stdev 4404.8 | max 19418 | min 35
guarded list reader: avg 470.0 | stdev 147.7 | max 710 | min 204
guarded list writers: avg 469.9 | stdev 147.7 | max 710 | min 204
If I profile some of the worst runs in this configuration, I see
84% of the time spent waiting on the spinlock. Affining the producer
to core 0 has an adverse effect; if I affine it to core 1 instead,
I get better, though still unstable, results:
$ ./mpsc-stats.sh 1000000 1
20 times './tests/ovstest test-mpsc-queue benchmark 1000000 1':
mpsc-queue reader: avg 48.8 | stdev 15.1 | max 60 | min 13
mpsc-queue writers: avg 48.8 | stdev 15.1 | max 60 | min 13
spin+list reader: avg 62.3 | stdev 81.0 | max 328 | min 23
spin+list writers: avg 59.0 | stdev 82.4 | max 328 | min 20
guarded list reader: avg 271.5 | stdev 46.9 | max 337 | min 190
guarded list writers: avg 271.3 | stdev 46.9 | max 337 | min 190
$ ./mpsc-stats.sh 1000000 1
20 times './tests/ovstest test-mpsc-queue benchmark 1000000 1':
mpsc-queue reader: avg 47.1 | stdev 15.8 | max 59 | min 13
mpsc-queue writers: avg 45.8 | stdev 16.9 | max 59 | min 13
spin+list reader: avg 172.4 | stdev 467.8 | max 2100 | min 23
spin+list writers: avg 168.9 | stdev 468.8 | max 2100 | min 20
guarded list reader: avg 260.0 | stdev 68.5 | max 362 | min 129
guarded list writers: avg 259.9 | stdev 68.5 | max 362 | min 129
Once more than 1+1 threads are used, it improves:
$ ./mpsc-stats.sh 1000000 2
20 times './tests/ovstest test-mpsc-queue benchmark 1000000 2':
mpsc-queue reader: avg 36.7 | stdev 5.3 | max 51 | min 27
mpsc-queue writers: avg 31.5 | stdev 4.9 | max 39 | min 23
spin+list reader: avg 49.0 | stdev 15.4 | max 73 | min 26
spin+list writers: avg 40.2 | stdev 15.3 | max 65 | min 16
guarded list reader: avg 77.9 | stdev 6.3 | max 95 | min 69
guarded list writers: avg 73.1 | stdev 5.6 | max 87 | min 63
$ ./mpsc-stats.sh 1000000 3
20 times './tests/ovstest test-mpsc-queue benchmark 1000000 3':
mpsc-queue reader: avg 31.1 | stdev 6.0 | max 42 | min 24
mpsc-queue writers: avg 28.4 | stdev 5.9 | max 41 | min 19
spin+list reader: avg 82.2 | stdev 18.9 | max 127 | min 46
spin+list writers: avg 70.8 | stdev 19.5 | max 119 | min 36
guarded list reader: avg 162.2 | stdev 15.7 | max 189 | min 132
guarded list writers: avg 158.6 | stdev 16.2 | max 185 | min 126
Avoiding additional code would be nice, but spinlocks can have
surprising effects in some configurations (or so it seems). The spinlock
reduces stability, and that could be hard to pinpoint in the larger
system; in future work to improve latency in the datapath, it will add
noise. Exponential backoff will work just as well in the offload thread,
so if we decide to go with list + spinlock, it's alright. My vote still
goes to the mpsc-queue, as it is more predictable, with honorable
results in 1+1.
To decide whether we need a new structure, I think we should look at the
overall architecture. OVS-DPDK runs a mix of datapath threads on affined
cores and non-affined helper threads. Using spinlocks in this context
might be an issue, so I think it makes sense to have a way to avoid them
if needed.
Gaetan
PS: here is the diff I used to generate the results, including the script
for the stats:
---
diff --git a/mpsc-stats.sh b/mpsc-stats.sh
new file mode 100755
index 000000000..1ee177ede
--- /dev/null
+++ b/mpsc-stats.sh
@@ -0,0 +1,80 @@
+#!/usr/bin/env sh
+
+N_ELEM=${1:-1000000}
+N_CORE=${2:-1}
+N_RUN=${3:-20}
+
+#BIN="taskset -c 0,1,2,3 ./tests/ovstest test-mpsc-queue benchmark "
+BIN="./tests/ovstest test-mpsc-queue benchmark "
+CMD="$BIN $N_ELEM $N_CORE"
+CMD_fast="$BIN 1 1"
+
+xc() { python -c "import math; print(float($*))"; }
+join_by() {( IFS="$1"; shift; echo "$*"; )}
+
+stdev() {(
+ m=$1; shift;
+ sum=0
+ for v in $*; do
+ sum=$(xc "$sum + pow($v - $m, 2)")
+ done
+ echo $(xc "math.sqrt($sum / $#)")
+)}
+mean() { echo $(xc "($(join_by + $*)) / $#"); }
+
+# $1: name field max width
+# $2: stat name
+# $3-: values
+print_stats() {(
+ len=$1; shift;
+ name=$1; shift;
+ values="$*"
+ m=$(mean $values)
+ sd=$(stdev $m $values)
+ printf "%*s: avg %6.1f | stdev %6.1f | max %5.0f | min %5.0f\n" \
+ "$len" "$name" "$m" "$sd" \
+ "$(xc "max($(join_by , $values))")" \
+ "$(xc "min($(join_by , $values))")"
+)}
+
+tmp=$(mktemp)
+$CMD_fast | grep -v 'Benchmarking\|Reader' | \
+while read line; do
+ name="$(echo $line | cut -d: -f1)"
+ printf "%s reader\t%s writers\t" "$name" "$name"
+ #printf "%s writers\t" "$name"
+done >> $tmp
+printf "\n" >> $tmp
+
+echo "$N_RUN times '$CMD':"
+
+for i in $(seq 1 $N_RUN); do
+$CMD | grep -v 'Benchmarking\|Reader' | \
+while read line; do
+ name="$(echo $line | cut -d: -f1)"
+ reader="$(echo $line | cut -d: -f2- | cut -d' ' -f2)"
+ writers="$(echo $line | cut -d: -f2- | rev | cut -d' ' -f2 | rev)"
+
+ printf "%d\t%d\t" $reader $writers
+ #printf "%d\t" $writers
+done >> $tmp
+printf "\n" >> $tmp
+done
+
+#cat $tmp
+
+nb_col=$(awk -F$'\t' '{print NF-1}' $tmp | head -1)
+maxlen=0
+for i in $(seq 1 $nb_col); do
+ name=$(head -1 $tmp | cut -d$'\t' -f$i)
+ len=$(printf "%s" "$name" |wc -m)
+ [ "$maxlen" -lt "$len" ] && maxlen=$len
+done
+
+for i in $(seq 1 $nb_col); do
+ name=$(head -1 $tmp | cut -d$'\t' -f$i)
+ values=$(tail -n +2 $tmp | cut -d$'\t' -f$i)
+ print_stats $maxlen "$name" $values
+done
+
+rm $tmp
diff --git a/tests/test-mpsc-queue.c b/tests/test-mpsc-queue.c
index ebd1226fe..cad7a0ce3 100644
--- a/tests/test-mpsc-queue.c
+++ b/tests/test-mpsc-queue.c
@@ -25,12 +25,22 @@
#include "guarded-list.h"
#include "mpsc-queue.h"
#include "openvswitch/list.h"
+#include "openvswitch/vlog.h"
#include "openvswitch/util.h"
+#include "ovs-numa.h"
+#include "ovs-rcu.h"
#include "ovs-thread.h"
#include "ovstest.h"
#include "timeval.h"
#include "util.h"
+static void
+set_affinity(unsigned int tid OVS_UNUSED)
+{
+ //ovs_numa_thread_setaffinity_core(tid * 2);
+ //ovs_numa_thread_setaffinity_core(tid);
+}
+
struct element {
union {
struct mpsc_queue_node mpscq;
@@ -318,7 +328,13 @@ mpsc_queue_insert_thread(void *aux_)
unsigned int id;
size_t i;
+ ovsrcu_quiesce_start();
+
atomic_add(&aux->thread_id, 1u, &id);
+ if (id == 0) {
+ ovs_numa_thread_setaffinity_core(1);
+ }
+ set_affinity(id + 1);
n_elems_per_thread = n_elems / n_threads;
th_elements = &elements[id * n_elems_per_thread];
@@ -358,6 +374,7 @@ benchmark_mpsc_queue(void)
aux.queue = &queue;
atomic_store(&aux.thread_id, 0);
+ set_affinity(0);
for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
mpsc_queue_insert(&queue, &elements[i].node.mpscq);
@@ -424,7 +441,7 @@ benchmark_mpsc_queue(void)
struct list_aux {
struct ovs_list *list;
- struct ovs_mutex *lock;
+ struct ovs_spin *lock;
atomic_uint thread_id;
};
@@ -438,7 +455,13 @@ locked_list_insert_thread(void *aux_)
unsigned int id;
size_t i;
+ ovsrcu_quiesce_start();
+
atomic_add(&aux->thread_id, 1u, &id);
+ if (id == 0) {
+ ovs_numa_thread_setaffinity_core(1);
+ }
+ set_affinity(id + 1);
n_elems_per_thread = n_elems / n_threads;
th_elements = &elements[id * n_elems_per_thread];
@@ -446,9 +469,9 @@ locked_list_insert_thread(void *aux_)
xgettimeofday(&start);
for (i = 0; i < n_elems_per_thread; i++) {
- ovs_mutex_lock(aux->lock);
+ ovs_spin_lock(aux->lock);
ovs_list_push_front(aux->list, &th_elements[i].node.list);
- ovs_mutex_unlock(aux->lock);
+ ovs_spin_unlock(aux->lock);
}
thread_working_ms[id] = elapsed(&start);
@@ -462,7 +485,7 @@ locked_list_insert_thread(void *aux_)
static void
benchmark_list(void)
{
- struct ovs_mutex lock;
+ struct ovs_spin lock;
struct ovs_list list;
struct element *elem;
struct timeval start;
@@ -477,18 +500,19 @@ benchmark_list(void)
memset(elements, 0, n_elems * sizeof *elements);
memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
- ovs_mutex_init(&lock);
+ ovs_spin_init(&lock);
ovs_list_init(&list);
aux.list = &list;
aux.lock = &lock;
atomic_store(&aux.thread_id, 0);
+ set_affinity(0);
- ovs_mutex_lock(&lock);
+ ovs_spin_lock(&lock);
for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
ovs_list_push_front(&list, &elements[i].node.list);
}
- ovs_mutex_unlock(&lock);
+ ovs_spin_unlock(&lock);
working = true;
@@ -505,12 +529,21 @@ benchmark_list(void)
counter = 0;
epoch = 1;
do {
- ovs_mutex_lock(&lock);
- LIST_FOR_EACH_POP (elem, node.list, &list) {
- elem->mark = epoch;
- counter++;
+ struct ovs_list *node = NULL;
+
+ ovs_spin_lock(&lock);
+ if (!ovs_list_is_empty(&list)) {
+ node = ovs_list_pop_front(&list);
+ }
+ ovs_spin_unlock(&lock);
+
+ if (!node) {
+ continue;
}
- ovs_mutex_unlock(&lock);
+
+ elem = CONTAINER_OF(node, struct element, node.list);
+ elem->mark = epoch;
+ counter++;
if (epoch == UINT64_MAX) {
epoch = 0;
}
@@ -525,14 +558,14 @@ benchmark_list(void)
avg /= n_threads;
/* Elements might have been inserted before threads were joined. */
- ovs_mutex_lock(&lock);
+ ovs_spin_lock(&lock);
LIST_FOR_EACH_POP (elem, node.list, &list) {
elem->mark = epoch;
counter++;
}
- ovs_mutex_unlock(&lock);
+ ovs_spin_unlock(&lock);
- printf(" list: %6d", elapsed(&start));
+ printf(" spin+list: %6d", elapsed(&start));
for (i = 0; i < n_threads; i++) {
printf(" %6" PRIu64, thread_working_ms[i]);
}
@@ -566,7 +599,13 @@ guarded_list_insert_thread(void *aux_)
unsigned int id;
size_t i;
+ ovsrcu_quiesce_start();
+
atomic_add(&aux->thread_id, 1u, &id);
+ if (id == 0) {
+ ovs_numa_thread_setaffinity_core(1);
+ }
+ set_affinity(id + 1);
n_elems_per_thread = n_elems / n_threads;
th_elements = &elements[id * n_elems_per_thread];
@@ -585,6 +624,7 @@ guarded_list_insert_thread(void *aux_)
return NULL;
}
+OVS_UNUSED
static void
benchmark_guarded_list(void)
{
@@ -608,6 +648,7 @@ benchmark_guarded_list(void)
aux.glist = &glist;
atomic_store(&aux.thread_id, 0);
+ set_affinity(0);
for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
guarded_list_push_back(&glist, &elements[i].node.list, n_elems);
@@ -680,6 +721,9 @@ run_benchmarks(struct ovs_cmdl_context *ctx)
long int l_elems;
size_t i;
+ vlog_set_levels(NULL, VLF_ANY_DESTINATION, VLL_OFF);
+ ovs_numa_init();
+
l_elems = strtol(ctx->argv[1], NULL, 10);
l_threads = strtol(ctx->argv[2], NULL, 10);
ovs_assert(l_elems > 0 && l_threads > 0);