[ovs-dev] [RFC PATCH 25/26] dpif-netdev: Use one or more offload threads

Gaetan Rivet grive at u256.net
Sat Dec 5 14:22:20 UTC 2020


Read the user configuration in the netdev-offload module to set the
number of threads used to manage hardware offload requests.
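
For example, assuming the configuration key introduced by the
netdev-offload patch earlier in this series is named
'n-offload-threads' (the key name is not shown in this patch):

    $ ovs-vsctl set Open_vSwitch . other_config:n-offload-threads=3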

This allows insertion, deletion, and modification requests to be
processed concurrently.
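
Requests are dispatched to a thread by hashing the megaflow UFID, so
all operations on a given flow stay ordered on a single thread.  A
minimal sketch of such a mapping (the actual helper is
netdev_offload_ufid_to_thread_id() in netdev-offload, whose
implementation may differ):

    static unsigned int
    ufid_to_thread_id_sketch(const ovs_u128 *ufid, unsigned int n_threads)
    {
        /* Hash the 128-bit UFID down to a thread index.  Keying on the
         * UFID guarantees that ADD/MOD/DEL requests for the same
         * megaflow always land on the same queue, preserving per-flow
         * ordering. */
        return hash_words64((const uint64_t *) ufid, 2, 0) % n_threads;
    }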

The offload thread structure is modified to contain all the state a
thread needs.  One instance of this structure is allocated per
requested thread, and each thread works exclusively on its own
instance.
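
Each instance is padded to a multiple of the cache-line size, so that
offload threads updating their own queue and counters do not falsely
share cache lines with neighboring slots.  An illustrative sketch of
what PADDED_MEMBERS() does (the real macro is in lib/util.h and
differs in details):

    #define PADDED_MEMBERS_SKETCH(UNIT, MEMBERS)                      \
        union {                                                       \
            struct { MEMBERS };       /* The members themselves. */   \
            /* Pad the slot up to a whole number of UNIT bytes. */    \
            uint8_t pad[ROUND_UP(sizeof(struct { MEMBERS }), UNIT)];  \
        }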

Signed-off-by: Gaetan Rivet <grive at u256.net>
---
 lib/dpif-netdev.c | 210 ++++++++++++++++++++++++++++++++--------------
 1 file changed, 145 insertions(+), 65 deletions(-)
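
Note: each offload thread is the sole consumer of its own MPSC queue;
any PMD thread may act as a producer.  A minimal sketch of the
consumer side, assuming the mpsc-queue API introduced earlier in this
series (mpsc_queue_release() and MPSC_QUEUE_ITEM are assumed names,
taken from that patch rather than from this one):

    struct dp_offload_thread_item *item;
    struct mpsc_queue_node *node;

    /* Register as the single consumer of this thread's queue. */
    if (mpsc_queue_acquire(&thread->queue)) {
        while (mpsc_queue_poll(&thread->queue, &node) == MPSC_QUEUE_ITEM) {
            item = CONTAINER_OF(node, struct dp_offload_thread_item, node);
            /* ... execute the offload request described by 'item' ... */
        }
        mpsc_queue_release(&thread->queue);
    }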

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index dedfaae37..f10478f79 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -437,25 +437,48 @@ struct dp_offload_thread_item {
 };
 
 struct dp_offload_thread {
-    struct mpsc_queue queue;
-    atomic_uint64_t enqueued_item;
-    struct cmap megaflow_to_mark;
-    struct cmap mark_to_flow;
-    struct mov_avg_ema ema;
+    PADDED_MEMBERS(CACHE_LINE_SIZE,
+        struct mpsc_queue queue;
+        atomic_uint64_t enqueued_item;
+        struct cmap megaflow_to_mark;
+        struct cmap mark_to_flow;
+        struct mov_avg_ema ema;
+    );
 };
 
+static struct dp_offload_thread *dp_offload_threads;
+static void *dp_netdev_flow_offload_main(void *arg);
+
 #define DP_NETDEV_OFFLOAD_EMA_N (10)
 
-static struct dp_offload_thread dp_offload_thread = {
-    .queue = MPSC_QUEUE_INITIALIZER(&dp_offload_thread.queue),
-    .megaflow_to_mark = CMAP_INITIALIZER,
-    .mark_to_flow = CMAP_INITIALIZER,
-    .enqueued_item = ATOMIC_VAR_INIT(0),
-    .ema = MOV_AVG_EMA_INITIALIZER(DP_NETDEV_OFFLOAD_EMA_N),
-};
+static void
+dp_netdev_offload_init(void)
+{
+    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
+    unsigned int nb_offload_thread = netdev_offload_thread_nb();
+    unsigned int tid;
+
+    if (!ovsthread_once_start(&once)) {
+        return;
+    }
+
+    dp_offload_threads = xcalloc(nb_offload_thread,
+                                 sizeof *dp_offload_threads);
+
+    for (tid = 0; tid < nb_offload_thread; tid++) {
+        struct dp_offload_thread *thread;
 
-static struct ovsthread_once offload_thread_once
-    = OVSTHREAD_ONCE_INITIALIZER;
+        thread = &dp_offload_threads[tid];
+        mpsc_queue_init(&thread->queue);
+        cmap_init(&thread->megaflow_to_mark);
+        cmap_init(&thread->mark_to_flow);
+        atomic_init(&thread->enqueued_item, 0);
+        mov_avg_ema_init(&thread->ema, DP_NETDEV_OFFLOAD_EMA_N);
+        ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, thread);
+    }
+
+    ovsthread_once_done(&once);
+}
 
 #define XPS_TIMEOUT 500000LL    /* In microseconds. */
 
@@ -2450,11 +2473,12 @@ megaflow_to_mark_associate(const ovs_u128 *mega_ufid, uint32_t mark)
 {
     size_t hash = dp_netdev_flow_hash(mega_ufid);
     struct megaflow_to_mark_data *data = xzalloc(sizeof(*data));
+    unsigned int tid = netdev_offload_thread_id();
 
     data->mega_ufid = *mega_ufid;
     data->mark = mark;
 
-    cmap_insert(&dp_offload_thread.megaflow_to_mark,
+    cmap_insert(&dp_offload_threads[tid].megaflow_to_mark,
                 CONST_CAST(struct cmap_node *, &data->node), hash);
 }
 
@@ -2464,11 +2488,12 @@ megaflow_to_mark_disassociate(const ovs_u128 *mega_ufid)
 {
     size_t hash = dp_netdev_flow_hash(mega_ufid);
     struct megaflow_to_mark_data *data;
+    unsigned int tid = netdev_offload_thread_id();
 
     CMAP_FOR_EACH_WITH_HASH (data, node, hash,
-                             &dp_offload_thread.megaflow_to_mark) {
+                             &dp_offload_threads[tid].megaflow_to_mark) {
         if (ovs_u128_equals(*mega_ufid, data->mega_ufid)) {
-            cmap_remove(&dp_offload_thread.megaflow_to_mark,
+            cmap_remove(&dp_offload_threads[tid].megaflow_to_mark,
                         CONST_CAST(struct cmap_node *, &data->node), hash);
             ovsrcu_postpone(free, data);
             return;
@@ -2484,9 +2509,10 @@ megaflow_to_mark_find(const ovs_u128 *mega_ufid)
 {
     size_t hash = dp_netdev_flow_hash(mega_ufid);
     struct megaflow_to_mark_data *data;
+    unsigned int tid = netdev_offload_thread_id();
 
     CMAP_FOR_EACH_WITH_HASH (data, node, hash,
-                             &dp_offload_thread.megaflow_to_mark) {
+                             &dp_offload_threads[tid].megaflow_to_mark) {
         if (ovs_u128_equals(*mega_ufid, data->mega_ufid)) {
             return data->mark;
         }
@@ -2501,9 +2527,10 @@ megaflow_to_mark_find(const ovs_u128 *mega_ufid)
 static void
 mark_to_flow_associate(const uint32_t mark, struct dp_netdev_flow *flow)
 {
+    unsigned int tid = netdev_offload_thread_id();
     dp_netdev_flow_ref(flow);
 
-    cmap_insert(&dp_offload_thread.mark_to_flow,
+    cmap_insert(&dp_offload_threads[tid].mark_to_flow,
                 CONST_CAST(struct cmap_node *, &flow->mark_node),
                 hash_int(mark, 0));
     flow->mark = mark;
@@ -2515,10 +2542,11 @@ mark_to_flow_associate(const uint32_t mark, struct dp_netdev_flow *flow)
 static bool
 flow_mark_has_no_ref(uint32_t mark)
 {
+    unsigned int tid = netdev_offload_thread_id();
     struct dp_netdev_flow *flow;
 
     CMAP_FOR_EACH_WITH_HASH (flow, mark_node, hash_int(mark, 0),
-                             &dp_offload_thread.mark_to_flow) {
+                             &dp_offload_threads[tid].mark_to_flow) {
         if (flow->mark == mark) {
             return false;
         }
@@ -2534,6 +2562,7 @@ mark_to_flow_disassociate(struct dp_netdev_pmd_thread *pmd,
     const char *dpif_type_str = dpif_normalize_type(pmd->dp->class->type);
     struct cmap_node *mark_node = CONST_CAST(struct cmap_node *,
                                              &flow->mark_node);
+    unsigned int tid = netdev_offload_thread_id();
     uint32_t mark = flow->mark;
     int ret = 0;
 
@@ -2543,7 +2572,8 @@ mark_to_flow_disassociate(struct dp_netdev_pmd_thread *pmd,
         return EINVAL;
     }
 
-    cmap_remove(&dp_offload_thread.mark_to_flow, mark_node, hash_int(mark, 0));
+    cmap_remove(&dp_offload_threads[tid].mark_to_flow,
+                mark_node, hash_int(mark, 0));
     flow->mark = INVALID_FLOW_MARK;
 
     /*
@@ -2579,10 +2609,18 @@ static void
 flow_mark_flush(struct dp_netdev_pmd_thread *pmd)
 {
     struct dp_netdev_flow *flow;
+    unsigned int tid;
 
-    CMAP_FOR_EACH (flow, mark_node, &dp_offload_thread.mark_to_flow) {
-        if (flow->pmd_id == pmd->core_id) {
-            queue_netdev_flow_del(pmd, flow);
+    if (dp_offload_threads == NULL) {
+        return;
+    }
+
+    for (tid = 0; tid < netdev_offload_thread_nb(); tid++) {
+        CMAP_FOR_EACH (flow, mark_node,
+                       &dp_offload_threads[tid].mark_to_flow) {
+            if (flow->pmd_id == pmd->core_id) {
+                queue_netdev_flow_del(pmd, flow);
+            }
         }
     }
 }
@@ -2592,12 +2630,21 @@ mark_to_flow_find(const struct dp_netdev_pmd_thread *pmd,
                   const uint32_t mark)
 {
     struct dp_netdev_flow *flow;
+    unsigned int tid;
+    size_t hash;
 
-    CMAP_FOR_EACH_WITH_HASH (flow, mark_node, hash_int(mark, 0),
-                             &dp_offload_thread.mark_to_flow) {
-        if (flow->mark == mark && flow->pmd_id == pmd->core_id &&
-            flow->dead == false) {
-            return flow;
+    if (dp_offload_threads == NULL) {
+        return NULL;
+    }
+
+    hash = hash_int(mark, 0);
+    for (tid = 0; tid < netdev_offload_thread_nb(); tid++) {
+        CMAP_FOR_EACH_WITH_HASH (flow, mark_node, hash,
+                                 &dp_offload_threads[tid].mark_to_flow) {
+            if (flow->mark == mark && flow->pmd_id == pmd->core_id &&
+                flow->dead == false) {
+                return flow;
+            }
         }
     }
 
@@ -2641,8 +2688,13 @@ dp_netdev_flow_offload_unref(struct dp_offload_thread_item *offload)
 static void
 dp_netdev_append_flow_offload(struct dp_offload_thread_item *offload)
 {
-    mpsc_queue_insert(&dp_offload_thread.queue, &offload->node);
-    atomic_count_inc64(&dp_offload_thread.enqueued_item);
+    unsigned int i;
+
+    dp_netdev_offload_init();
+
+    i = netdev_offload_ufid_to_thread_id(offload->flow->mega_ufid);
+    mpsc_queue_insert(&dp_offload_threads[i].queue, &offload->node);
+    atomic_count_inc64(&dp_offload_threads[i].enqueued_item);
 }
 
 static int
@@ -2745,8 +2797,9 @@ err_free:
 #define DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US (10 * 1000) /* 10 ms */
 
 static void *
-dp_netdev_flow_offload_main(void *data OVS_UNUSED)
+dp_netdev_flow_offload_main(void *arg)
 {
+    struct dp_offload_thread *ofl_thread = arg;
     struct dp_offload_thread_item *offload;
     enum mpsc_queue_poll_result poll_result;
     struct mpsc_queue_node *node;
@@ -2758,7 +2811,7 @@ dp_netdev_flow_offload_main(void *data OVS_UNUSED)
     const char *op;
     int ret;
 
-    queue = &dp_offload_thread.queue;
+    queue = &ofl_thread->queue;
     if (!mpsc_queue_acquire(queue)) {
         VLOG_ERR("failed to register as consumer of the offload queue");
         return NULL;
@@ -2781,7 +2834,7 @@ sleep_until_next:
         }
 
         offload = CONTAINER_OF(node, struct dp_offload_thread_item, node);
-        atomic_count_dec64(&dp_offload_thread.enqueued_item);
+        atomic_count_dec64(&ofl_thread->enqueued_item);
 
         switch (offload->op) {
         case DP_NETDEV_FLOW_OFFLOAD_OP_ADD:
@@ -2803,7 +2856,7 @@ sleep_until_next:
         now = time_usec();
 
         latency_us = now - offload->timestamp;
-        mov_avg_ema_update(&dp_offload_thread.ema, latency_us);
+        mov_avg_ema_update(&ofl_thread->ema, latency_us);
 
         VLOG_DBG("%s to %s netdev flow "UUID_FMT,
                  ret == 0 ? "succeed" : "failed", op,
@@ -2832,12 +2885,6 @@ queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
 {
     struct dp_offload_thread_item *offload;
 
-    if (ovsthread_once_start(&offload_thread_once)) {
-        mpsc_queue_init(&dp_offload_thread.queue);
-        ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, NULL);
-        ovsthread_once_done(&offload_thread_once);
-    }
-
     offload = dp_netdev_alloc_flow_offload(pmd, flow,
                                            DP_NETDEV_FLOW_OFFLOAD_OP_DEL);
     offload->timestamp = pmd->ctx.now;
@@ -2856,12 +2903,6 @@ queue_netdev_flow_put(struct dp_netdev_pmd_thread *pmd,
         return;
     }
 
-    if (ovsthread_once_start(&offload_thread_once)) {
-        mpsc_queue_init(&dp_offload_thread.queue);
-        ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, NULL);
-        ovsthread_once_done(&offload_thread_once);
-    }
-
     if (flow->mark != INVALID_FLOW_MARK) {
         op = DP_NETDEV_FLOW_OFFLOAD_OP_MOD;
     } else {
@@ -4259,45 +4300,84 @@ dpif_netdev_offload_stats_get(struct dpif *dpif,
         DP_NETDEV_HW_OFFLOADS_STATS_INSERTED,
         DP_NETDEV_HW_OFFLOADS_STATS_LATENCY_MEAN,
     };
-    const char *names[] = {
-        [DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED]     = "    Enqueued offloads",
-        [DP_NETDEV_HW_OFFLOADS_STATS_INSERTED]     = "    Inserted offloads",
-        [DP_NETDEV_HW_OFFLOADS_STATS_LATENCY_MEAN] = " Average latency (us)",
+    struct {
+        const char *name;
+        uint64_t total;
+    } hwol_stats[] = {
+        [DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED] =
+            { "    Enqueued offloads", 0 },
+        [DP_NETDEV_HW_OFFLOADS_STATS_INSERTED] =
+            { "    Inserted offloads", 0 },
+        [DP_NETDEV_HW_OFFLOADS_STATS_LATENCY_MEAN] =
+            { " Average latency (us)", 0 },
     };
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_port *port;
-    uint64_t nb_offloads;
+    unsigned int nb_thread;
+    uint64_t *port_nb_offloads;
+    uint64_t *nb_offloads;
+    unsigned int tid;
     size_t i;
 
     if (!netdev_is_flow_api_enabled()) {
         return EINVAL;
     }
 
-    stats->size = ARRAY_SIZE(names);
+    nb_thread = netdev_offload_thread_nb();
+    /* nb_thread counters for the overall total as well. */
+    stats->size = ARRAY_SIZE(hwol_stats) * (nb_thread + 1);
     stats->counters = xcalloc(stats->size, sizeof *stats->counters);
 
-    nb_offloads = 0;
+    nb_offloads = xcalloc(nb_thread, sizeof *nb_offloads);
+    port_nb_offloads = xcalloc(nb_thread, sizeof *port_nb_offloads);
 
     ovs_rwlock_rdlock(&dp->port_rwlock);
     HMAP_FOR_EACH (port, node, &dp->ports) {
-        uint64_t port_nb_offloads = 0;
-
+        memset(port_nb_offloads, 0, nb_thread * sizeof *port_nb_offloads);
         /* Do not abort on read error from a port, just report 0. */
-        if (!netdev_hw_offload_stats_get(port->netdev, &port_nb_offloads)) {
-            nb_offloads += port_nb_offloads;
+        if (!netdev_hw_offload_stats_get(port->netdev, port_nb_offloads)) {
+            for (i = 0; i < nb_thread; i++) {
+                nb_offloads[i] += port_nb_offloads[i];
+            }
         }
     }
     ovs_rwlock_unlock(&dp->port_rwlock);
 
-    atomic_read_relaxed(&dp_offload_thread.enqueued_item,
-                 &stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED].value);
-    stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_INSERTED].value = nb_offloads;
-    stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_LATENCY_MEAN].value =
-                                           mov_avg_ema(&dp_offload_thread.ema);
+    free(port_nb_offloads);
+
+    for (tid = 0; tid < nb_thread; tid++) {
+        uint64_t counts[ARRAY_SIZE(hwol_stats)];
+        size_t idx = ((tid + 1) * ARRAY_SIZE(hwol_stats));
+
+        memset(counts, 0, sizeof counts);
+        counts[DP_NETDEV_HW_OFFLOADS_STATS_INSERTED] = nb_offloads[tid];
+        if (dp_offload_threads != NULL) {
+            atomic_read_relaxed(&dp_offload_threads[tid].enqueued_item,
+                                &counts[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED]);
+
+            counts[DP_NETDEV_HW_OFFLOADS_STATS_LATENCY_MEAN] =
+                                     mov_avg_ema(&dp_offload_threads[tid].ema);
+
+        }
+
+        for (i = 0; i < ARRAY_SIZE(hwol_stats); i++) {
+            snprintf(stats->counters[idx + i].name,
+                     sizeof(stats->counters[idx + i].name),
+                     "  [%3u] %s", tid, hwol_stats[i].name);
+            stats->counters[idx + i].value = counts[i];
+            hwol_stats[i].total += counts[i];
+        }
+    }
+
+    free(nb_offloads);
+
+    /* Do an average of the average for the aggregate. */
+    hwol_stats[DP_NETDEV_HW_OFFLOADS_STATS_LATENCY_MEAN].total /= nb_thread;
 
-    for (i = 0; i < ARRAY_SIZE(names); i++) {
+    for (i = 0; i < ARRAY_SIZE(hwol_stats); i++) {
         snprintf(stats->counters[i].name, sizeof(stats->counters[i].name),
-                 "%s", names[i]);
+                 "  Total %s", hwol_stats[i].name);
+        stats->counters[i].value = hwol_stats[i].total;
     }
 
     return 0;
-- 
2.29.2


