[ovs-dev] [PATCH v2 24/28] dpif-netdev: Use lockless queue to manage offloads

Gaetan Rivet grive at u256.net
Mon Apr 12 15:20:10 UTC 2021


The dataplane threads (PMDs) send offloading commands to a dedicated
offload management thread. The current implementation uses a lock,
and benchmarks show high contention on the queue in some cases.

Under high contention, the mutex more often forces the locking
thread to yield and wait, using a syscall. This should be avoided
in a userland dataplane.

The mpsc-queue can be used instead. It uses fewer cycles and has
lower latency. Benchmarks show better behavior as multiple
revalidators and one or more PMDs write to a single queue while
another thread polls it.

One trade-off of the new scheme, however, is that the offload
thread must poll the queue. Without a mutex, a cond_wait cannot
be used for signaling. The offload thread implements an
exponential backoff and sleeps in short increments when no data
is available. This makes the thread yield, at the price of some
latency to manage offloads after an inactivity period.
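
The resulting idle loop in the diff below reduces to the following
annotated restatement (the backoff constants are in milliseconds;
xnanosleep() takes nanoseconds, hence the 1E6 factor):

    uint64_t backoff = DP_NETDEV_OFFLOAD_BACKOFF_MIN;   /* Start at 1 ms. */

    while (mpsc_queue_tail(queue) == NULL) {
        xnanosleep(backoff * 1E6);                      /* ms -> ns. */
        if (backoff < DP_NETDEV_OFFLOAD_BACKOFF_MAX) {
            backoff <<= 1;                              /* 1, 2, 4, ... 64 ms. */
        }
    }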

Signed-off-by: Gaetan Rivet <grive at u256.net>
Reviewed-by: Eli Britstein <elibr at nvidia.com>
---
 lib/dpif-netdev.c | 109 ++++++++++++++++++++++++----------------------
 1 file changed, 57 insertions(+), 52 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 177d6a6dc..bc87b368b 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -52,6 +52,7 @@
 #include "id-pool.h"
 #include "ipf.h"
 #include "mov-avg.h"
+#include "mpsc-queue.h"
 #include "netdev.h"
 #include "netdev-offload.h"
 #include "netdev-provider.h"
@@ -452,25 +453,22 @@ union dp_offload_thread_data {
 };
 
 struct dp_offload_thread_item {
-    struct ovs_list node;
+    struct mpsc_queue_node node;
     enum dp_offload_type type;
     long long int timestamp;
     union dp_offload_thread_data data[0];
 };
 
 struct dp_offload_thread {
-    struct ovs_mutex mutex;
-    struct ovs_list list;
-    uint64_t enqueued_item;
+    struct mpsc_queue queue;
+    atomic_uint64_t enqueued_item;
     struct mov_avg_cma cma;
     struct mov_avg_ema ema;
-    pthread_cond_t cond;
 };
 
 static struct dp_offload_thread dp_offload_thread = {
-    .mutex = OVS_MUTEX_INITIALIZER,
-    .list  = OVS_LIST_INITIALIZER(&dp_offload_thread.list),
-    .enqueued_item = 0,
+    .queue = MPSC_QUEUE_INITIALIZER(&dp_offload_thread.queue),
+    .enqueued_item = ATOMIC_VAR_INIT(0),
     .cma = MOV_AVG_CMA_INITIALIZER,
     .ema = MOV_AVG_EMA_INITIALIZER(100),
 };
@@ -2697,11 +2695,8 @@ dp_netdev_free_offload(struct dp_offload_thread_item *offload)
 static void
 dp_netdev_append_offload(struct dp_offload_thread_item *offload)
 {
-    ovs_mutex_lock(&dp_offload_thread.mutex);
-    ovs_list_push_back(&dp_offload_thread.list, &offload->node);
-    dp_offload_thread.enqueued_item++;
-    xpthread_cond_signal(&dp_offload_thread.cond);
-    ovs_mutex_unlock(&dp_offload_thread.mutex);
+    mpsc_queue_insert(&dp_offload_thread.queue, &offload->node);
+    atomic_count_inc64(&dp_offload_thread.enqueued_item);
 }
 
 static int
@@ -2845,59 +2840,69 @@ dp_offload_flush(struct dp_offload_thread_item *item)
     ovs_barrier_block(flush->barrier);
 }
 
+#define DP_NETDEV_OFFLOAD_BACKOFF_MIN 1
+#define DP_NETDEV_OFFLOAD_BACKOFF_MAX 64
 #define DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US (10 * 1000) /* 10 ms */
 
 static void *
 dp_netdev_flow_offload_main(void *data OVS_UNUSED)
 {
     struct dp_offload_thread_item *offload;
-    struct ovs_list *list;
+    struct mpsc_queue_node *node;
+    struct mpsc_queue *queue;
     long long int latency_us;
     long long int next_rcu;
     long long int now;
+    uint64_t backoff;
 
-    next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
-    for (;;) {
-        ovs_mutex_lock(&dp_offload_thread.mutex);
-        if (ovs_list_is_empty(&dp_offload_thread.list)) {
-            ovsrcu_quiesce_start();
-            ovs_mutex_cond_wait(&dp_offload_thread.cond,
-                                &dp_offload_thread.mutex);
-            ovsrcu_quiesce_end();
-            next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
-        }
-        list = ovs_list_pop_front(&dp_offload_thread.list);
-        dp_offload_thread.enqueued_item--;
-        offload = CONTAINER_OF(list, struct dp_offload_thread_item, node);
-        ovs_mutex_unlock(&dp_offload_thread.mutex);
-
-        switch (offload->type) {
-        case DP_OFFLOAD_FLOW:
-            dp_offload_flow(offload);
-            break;
-        case DP_OFFLOAD_FLUSH:
-            dp_offload_flush(offload);
-            break;
-        default:
-            OVS_NOT_REACHED();
+    queue = &dp_offload_thread.queue;
+    mpsc_queue_acquire(queue);
+
+    while (true) {
+        backoff = DP_NETDEV_OFFLOAD_BACKOFF_MIN;
+        while (mpsc_queue_tail(queue) == NULL) {
+            xnanosleep(backoff * 1E6);
+            if (backoff < DP_NETDEV_OFFLOAD_BACKOFF_MAX) {
+                backoff <<= 1;
+            }
         }
 
-        now = time_usec();
+        next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
+        MPSC_QUEUE_FOR_EACH_POP (node, queue) {
+            offload = CONTAINER_OF(node, struct dp_offload_thread_item, node);
+            atomic_count_dec64(&dp_offload_thread.enqueued_item);
 
-        latency_us = now - offload->timestamp;
-        mov_avg_cma_update(&dp_offload_thread.cma, latency_us);
-        mov_avg_ema_update(&dp_offload_thread.ema, latency_us);
+            switch (offload->type) {
+            case DP_OFFLOAD_FLOW:
+                dp_offload_flow(offload);
+                break;
+            case DP_OFFLOAD_FLUSH:
+                dp_offload_flush(offload);
+                break;
+            default:
+                OVS_NOT_REACHED();
+            }
 
-        dp_netdev_free_offload(offload);
+            now = time_usec();
 
-        /* Do RCU synchronization at fixed interval. */
-        if (now > next_rcu) {
-            if (!ovsrcu_try_quiesce()) {
-                next_rcu += DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
+            latency_us = now - offload->timestamp;
+            mov_avg_cma_update(&dp_offload_thread.cma, latency_us);
+            mov_avg_ema_update(&dp_offload_thread.ema, latency_us);
+
+            dp_netdev_free_offload(offload);
+
+            /* Do RCU synchronization at fixed interval. */
+            if (now > next_rcu) {
+                if (!ovsrcu_try_quiesce()) {
+                    next_rcu += DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
+                }
             }
         }
     }
 
+    OVS_NOT_REACHED();
+    mpsc_queue_release(queue);
+
     return NULL;
 }
 
@@ -2908,7 +2913,7 @@ queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
     struct dp_offload_thread_item *offload;
 
     if (ovsthread_once_start(&offload_thread_once)) {
-        xpthread_cond_init(&dp_offload_thread.cond, NULL);
+        mpsc_queue_init(&dp_offload_thread.queue);
         ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, NULL);
         ovsthread_once_done(&offload_thread_once);
     }
@@ -2933,7 +2938,7 @@ queue_netdev_flow_put(struct dp_netdev_pmd_thread *pmd,
     }
 
     if (ovsthread_once_start(&offload_thread_once)) {
-        xpthread_cond_init(&dp_offload_thread.cond, NULL);
+        mpsc_queue_init(&dp_offload_thread.queue);
         ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, NULL);
         ovsthread_once_done(&offload_thread_once);
     }
@@ -2984,7 +2989,7 @@ dp_netdev_offload_flush_enqueue(struct dp_netdev *dp,
     struct dp_offload_flush_item *flush;
 
     if (ovsthread_once_start(&offload_thread_once)) {
-        xpthread_cond_init(&dp_offload_thread.cond, NULL);
+        mpsc_queue_init(&dp_offload_thread.queue);
         ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, NULL);
         ovsthread_once_done(&offload_thread_once);
     }
@@ -4471,8 +4476,8 @@ dpif_netdev_offload_stats_get(struct dpif *dpif,
     }
     ovs_mutex_unlock(&dp->port_mutex);
 
-    stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED].value =
-        dp_offload_thread.enqueued_item;
+    atomic_read_relaxed(&dp_offload_thread.enqueued_item,
+        &stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED].value);
     stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_INSERTED].value = nb_offloads;
     stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_MEAN].value =
         mov_avg_cma(&dp_offload_thread.cma);
-- 
2.31.1


