[ovs-dev] [PATCH v2 09/19] dpif-netdev: Create pmd threads for every numa node.

Daniele Di Proietto diproiettod at vmware.com
Sat Dec 3 02:14:08 UTC 2016


Much of the complexity in the dpif-netdev code that handles pmd threads
and ports comes from postponing the creation of pmd threads on a numa
node until there is a port that needs to be polled on that particular
node.

Since the previous commit, a pmd thread with no ports does not consume
any CPU, so it is simpler to create the pmd threads for every numa node
at once instead of creating them one numa node at a time.

This will also make future commits easier.

Signed-off-by: Daniele Di Proietto <diproiettod at vmware.com>
---
 lib/dpif-netdev.c | 206 ++++++++++++++++++------------------------------------
 tests/pmd.at      |   2 +-
 2 files changed, 68 insertions(+), 140 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 4a10956..35e798b 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -575,8 +575,8 @@ static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
 static struct dp_netdev_pmd_thread *
 dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
 static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
-static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
-static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
+static void dp_netdev_stop_pmds(struct dp_netdev *dp);
+static void dp_netdev_start_pmds(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
 static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
@@ -1092,19 +1092,20 @@ dp_netdev_free(struct dp_netdev *dp)
 
     shash_find_and_delete(&dp_netdevs, dp->name);
 
-    dp_netdev_destroy_all_pmds(dp);
-    ovs_mutex_destroy(&dp->non_pmd_mutex);
-    ovsthread_key_delete(dp->per_pmd_key);
-
-    conntrack_destroy(&dp->conntrack);
-
     ovs_mutex_lock(&dp->port_mutex);
     HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
         do_del_port(dp, port);
     }
     ovs_mutex_unlock(&dp->port_mutex);
+    dp_netdev_destroy_all_pmds(dp);
     cmap_destroy(&dp->poll_threads);
 
+    ovs_mutex_destroy(&dp->non_pmd_mutex);
+    ovsthread_key_delete(dp->per_pmd_key);
+
+    conntrack_destroy(&dp->conntrack);
+
+
     seq_destroy(dp->reconfigure_seq);
 
     seq_destroy(dp->port_seq);
@@ -1348,10 +1349,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     }
 
     if (netdev_is_pmd(port->netdev)) {
-        int numa_id = netdev_get_numa_id(port->netdev);
-
-        ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
-        dp_netdev_set_pmds_on_numa(dp, numa_id);
+        dp_netdev_start_pmds(dp);
     }
 
     dp_netdev_add_port_to_pmds(dp, port);
@@ -1490,45 +1488,16 @@ get_n_pmd_threads(struct dp_netdev *dp)
     return cmap_count(&dp->poll_threads) - 1;
 }
 
-static int
-get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id)
-{
-    struct dp_netdev_pmd_thread *pmd;
-    int n_pmds = 0;
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        if (pmd->numa_id == numa_id) {
-            n_pmds++;
-        }
-    }
-
-    return n_pmds;
-}
-
-/* Returns 'true' if there is a port with pmd netdev and the netdev is on
- * numa node 'numa_id' or its rx queue assigned to core on that numa node . */
+/* Returns 'true' if there is a port with pmd netdev. */
 static bool
-has_pmd_rxq_for_numa(struct dp_netdev *dp, int numa_id)
+has_pmd_port(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex)
 {
     struct dp_netdev_port *port;
 
     HMAP_FOR_EACH (port, node, &dp->ports) {
         if (netdev_is_pmd(port->netdev)) {
-            int i;
-
-            if (netdev_get_numa_id(port->netdev) == numa_id) {
-                return true;
-            }
-
-            for (i = 0; i < port->n_rxq; i++) {
-                unsigned core_id = port->rxqs[i].core_id;
-
-                if (core_id != -1
-                    && ovs_numa_get_numa_id(core_id) == numa_id) {
-                    return true;
-                }
-            }
+            return true;
         }
     }
 
@@ -1546,14 +1515,9 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     dp_netdev_del_port_from_all_pmds(dp, port);
 
     if (netdev_is_pmd(port->netdev)) {
-        int numa_id = netdev_get_numa_id(port->netdev);
-
-        /* PMD threads can not be on invalid numa node. */
-        ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
-        /* If there is no netdev on the numa node, deletes the pmd threads
-         * for that numa. */
-        if (!has_pmd_rxq_for_numa(dp, numa_id)) {
-            dp_netdev_del_pmds_on_numa(dp, numa_id);
+        /* If there is no pmd netdev, delete the pmd threads */
+        if (!has_pmd_port(dp)) {
+            dp_netdev_stop_pmds(dp);
         }
     }
 
@@ -3404,18 +3368,22 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
     dp_netdev_pmd_unref(pmd);
 }
 
-/* Destroys all pmd threads. */
+/* Destroys all pmd threads, but not the non pmd thread. */
 static void
-dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
+dp_netdev_stop_pmds(struct dp_netdev *dp)
 {
     struct dp_netdev_pmd_thread *pmd;
     struct dp_netdev_pmd_thread **pmd_list;
     size_t k = 0, n_pmds;
 
-    n_pmds = cmap_count(&dp->poll_threads);
+    n_pmds = get_n_pmd_threads(dp);
     pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        /* We don't need to destroy the non pmd thread */
+        if (pmd->core_id == NON_PMD_CORE_ID) {
+            continue;
+        }
         /* We cannot call dp_netdev_del_pmd(), since it alters
          * 'dp->poll_threads' (while we're iterating it) and it
          * might quiesce. */
@@ -3429,51 +3397,30 @@ dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
     free(pmd_list);
 }
 
-/* Deletes all pmd threads on numa node 'numa_id' and
- * fixes static_tx_qids of other threads to keep them sequential. */
+/* Destroys all pmd threads, including the non pmd thread. */
 static void
-dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
+dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
 {
     struct dp_netdev_pmd_thread *pmd;
-    int n_pmds_on_numa, n_pmds;
-    int *free_idx, k = 0;
     struct dp_netdev_pmd_thread **pmd_list;
+    size_t k = 0, n_pmds;
 
-    n_pmds_on_numa = get_n_pmd_threads_on_numa(dp, numa_id);
-    free_idx = xcalloc(n_pmds_on_numa, sizeof *free_idx);
-    pmd_list = xcalloc(n_pmds_on_numa, sizeof *pmd_list);
+    n_pmds = cmap_count(&dp->poll_threads);
+    pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
         /* We cannot call dp_netdev_del_pmd(), since it alters
          * 'dp->poll_threads' (while we're iterating it) and it
          * might quiesce. */
-        if (pmd->numa_id == numa_id && pmd->core_id != NON_PMD_CORE_ID) {
-            atomic_read_relaxed(&pmd->static_tx_qid, &free_idx[k]);
-            pmd_list[k] = pmd;
-            ovs_assert(k < n_pmds_on_numa);
-            k++;
-        }
+        ovs_assert(k < n_pmds);
+        pmd_list[k++] = pmd;
     }
 
-    for (int i = 0; i < k; i++) {
+    for (size_t i = 0; i < k; i++) {
         dp_netdev_del_pmd(dp, pmd_list[i]);
     }
 
-    n_pmds = get_n_pmd_threads(dp);
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        int old_tx_qid;
-
-        atomic_read_relaxed(&pmd->static_tx_qid, &old_tx_qid);
-
-        if (old_tx_qid >= n_pmds && pmd->core_id != NON_PMD_CORE_ID) {
-            int new_tx_qid = free_idx[--k];
-
-            atomic_store_relaxed(&pmd->static_tx_qid, new_tx_qid);
-        }
-    }
-
     free(pmd_list);
-    free(free_idx);
 }
 
 /* Deletes all rx queues from pmd->poll_list and all the ports from
@@ -3724,52 +3671,55 @@ dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
     hmapx_destroy(&to_reload);
 }
 
-/* Starts pmd threads for the numa node 'numa_id', if not already started.
- * The function takes care of filling the threads tx port cache. */
 static void
-dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
-    OVS_REQUIRES(dp->port_mutex)
+dp_netdev_start_pmds_on_numa(struct dp_netdev *dp, int numa_id)
 {
-    int n_pmds;
+    int can_have, n_unpinned, i;
 
-    if (!ovs_numa_numa_id_is_valid(numa_id)) {
-        VLOG_WARN("Cannot create pmd threads due to numa id (%d) invalid",
-                  numa_id);
+    n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
+    if (!n_unpinned) {
+        VLOG_WARN("Cannot create pmd threads due to out of unpinned "
+                  "cores on numa node %d", numa_id);
         return;
     }
 
-    n_pmds = get_n_pmd_threads_on_numa(dp, numa_id);
+    /* If cpu mask is specified, uses all unpinned cores, otherwise
+     * tries creating NR_PMD_THREADS pmd threads. */
+    can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
+    for (i = 0; i < can_have; i++) {
+        unsigned core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
+        struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
+        struct dp_netdev_port *port;
 
-    /* If there are already pmd threads created for the numa node
-     * in which 'netdev' is on, do nothing.  Else, creates the
-     * pmd threads for the numa node. */
-    if (!n_pmds) {
-        int can_have, n_unpinned, i;
+        dp_netdev_configure_pmd(pmd, dp, core_id, numa_id);
 
-        n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
-        if (!n_unpinned) {
-            VLOG_WARN("Cannot create pmd threads due to out of unpinned "
-                      "cores on numa node %d", numa_id);
-            return;
+        HMAP_FOR_EACH (port, node, &dp->ports) {
+            dp_netdev_add_port_tx_to_pmd(pmd, port);
         }
 
-        /* If cpu mask is specified, uses all unpinned cores, otherwise
-         * tries creating NR_PMD_THREADS pmd threads. */
-        can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
-        for (i = 0; i < can_have; i++) {
-            unsigned core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
-            struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
-            struct dp_netdev_port *port;
+        pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+    }
+    VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
+}
+
+/* Starts pmd threads, if not already started. The function takes care of
+ * filling the threads' tx port cache. */
+static void
+dp_netdev_start_pmds(struct dp_netdev *dp)
+    OVS_REQUIRES(dp->port_mutex)
+{
+    int n_pmds;
 
-            dp_netdev_configure_pmd(pmd, dp, core_id, numa_id);
+    n_pmds = get_n_pmd_threads(dp);
 
-            HMAP_FOR_EACH (port, node, &dp->ports) {
-                dp_netdev_add_port_tx_to_pmd(pmd, port);
-            }
+    /* If there are already pmd threads created for the datapath, do nothing.
+     * Else, creates the pmd threads. */
+    if (!n_pmds) {
+        int n_numas = ovs_numa_get_n_numas();
 
-            pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+        for (int numa_id = 0; numa_id < n_numas; numa_id++) {
+            dp_netdev_start_pmds_on_numa(dp, numa_id);
         }
-        VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
     }
 }
 
@@ -3785,30 +3735,8 @@ dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
     struct dp_netdev_port *port;
     struct hmapx_node *node;
 
+    dp_netdev_start_pmds(dp);
     HMAP_FOR_EACH (port, node, &dp->ports) {
-        if (netdev_is_pmd(port->netdev)) {
-            struct hmapx numas = HMAPX_INITIALIZER(&numas);
-            struct hmapx_node *numa_node;
-            uintptr_t numa_id;
-            int i;
-
-            numa_id = netdev_get_numa_id(port->netdev);
-            hmapx_add(&numas, (void *) numa_id);
-            for (i = 0; i < port->n_rxq; i++) {
-                unsigned core_id = port->rxqs[i].core_id;
-
-                if (core_id != -1) {
-                    numa_id = ovs_numa_get_numa_id(core_id);
-                    hmapx_add(&numas, (void *) numa_id);
-                }
-            }
-
-            HMAPX_FOR_EACH (numa_node, &numas) {
-                dp_netdev_set_pmds_on_numa(dp, (uintptr_t) numa_node->data);
-            }
-
-            hmapx_destroy(&numas);
-        }
         /* Distribute only pinned rx queues first to mark threads as isolated */
         dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, true);
     }
diff --git a/tests/pmd.at b/tests/pmd.at
index 8f05d74..369120f 100644
--- a/tests/pmd.at
+++ b/tests/pmd.at
@@ -53,7 +53,7 @@ m4_define([CHECK_PMD_THREADS_CREATED], [
 ])
 
 m4_define([SED_NUMA_CORE_PATTERN], ["s/\(numa_id \)[[0-9]]*\( core_id \)[[0-9]]*:/\1<cleared>\2<cleared>:/"])
-m4_define([DUMMY_NUMA], [--dummy-numa="0,0,0,0,1,1,1,1"])
+m4_define([DUMMY_NUMA], [--dummy-numa="0,0,0,0"])
 
 AT_SETUP([PMD - creating a thread/add-port])
 OVS_VSWITCHD_START([add-port br0 p0 -- set Interface p0 type=dummy-pmd], [], [], [DUMMY_NUMA])
-- 
2.10.2



More information about the dev mailing list