[ovs-dev] [dpdk patch v3 6/6] dpif-netdev: Create multiple pmd threads by default.

Alex Wang alexw at nicira.com
Wed Sep 10 00:00:52 UTC 2014


With this commit, ovs will by default try to create, for each numa
node that has dpdk interfaces, as many pmd threads as there are dpdk
interfaces on that node (limited by the number of unpinned cpu cores
on the node), and pin each pmd thread to an available cpu core there.

NON_PMD_CORE_ID (currently 0) is used to reserve a particular
cpu core for the I/O of all non-pmd threads.  No pmd thread
can be pinned to this reserved core.

As side-effects of this commit:

- the exact-match cache for non-pmd threads is removed from
  'struct dp_netdev'.  Instead, all non-pmd threads will use
  the exact-match cache defined in the 'struct dp_netdev_pmd_thread'
  for NON_PMD_CORE_ID.

- the received packet processing functions are refactored to use
  'struct dp_netdev_pmd_thread' as input.

- the 'netdev_send()' function will be called with the proper
  queue id.

Signed-off-by: Alex Wang <alexw at nicira.com>

---
PATCH -> V2
- rebase and refactor the code.

V2 -> V3:
- both pmd and non-pmd threads can call dpif_netdev_execute(),
  so use a per-thread variable to identify the calling thread.
---
 lib/dpif-netdev.c |  407 +++++++++++++++++++++++++++++++++++++----------------
 lib/dpif-netdev.h |    4 +-
 lib/netdev-dpdk.c |   17 ++-
 lib/netdev-dpdk.h |    7 +
 4 files changed, 302 insertions(+), 133 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index dcce02e..29a92b3 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -52,6 +52,7 @@
 #include "odp-util.h"
 #include "ofp-print.h"
 #include "ofpbuf.h"
+#include "ovs-numa.h"
 #include "ovs-rcu.h"
 #include "packet-dpif.h"
 #include "packets.h"
@@ -158,7 +159,6 @@ struct emc_cache {
  *
  *    dp_netdev_mutex (global)
  *    port_mutex
- *    emc_mutex
  *    flow_mutex
  */
 struct dp_netdev {
@@ -195,17 +195,16 @@ struct dp_netdev {
     upcall_callback *upcall_cb;  /* Callback function for executing upcalls. */
     void *upcall_aux;
 
-    /* Forwarding threads. */
-    struct latch exit_latch;
-    struct pmd_thread *pmd_threads;
-    size_t n_pmd_threads;
-    int pmd_count;
-
-    /* Exact match cache for non-pmd devices.
-     * Pmd devices use instead each thread's flow_cache for this purpose.
-     * Protected by emc_mutex */
-    struct emc_cache flow_cache OVS_GUARDED;
-    struct ovs_mutex emc_mutex;
+    /* Stores all 'struct dp_netdev_pmd_thread's. */
+    struct cmap poll_threads;
+
+    /* Protects the access of the 'struct dp_netdev_pmd_thread'
+     * instance for non-pmd thread. */
+    struct ovs_mutex non_pmd_mutex;
+
+    /* Each pmd thread will store its pointer to
+     * 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */
+    ovsthread_key_t per_pmd_key;
 };
 
 static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
@@ -340,15 +339,25 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *);
  *
  * DPDK used PMD for accessing NIC.
  *
- * A thread that receives packets from PMD ports, looks them up in the flow
- * table, and executes the actions it finds.
+ * Note, instance with cpu core id NON_PMD_CORE_ID will be reserved for
+ * I/O of all non-pmd threads.  There will be no actual thread created
+ * for the instance.
  **/
-struct pmd_thread {
+struct dp_netdev_pmd_thread {
     struct dp_netdev *dp;
+    struct cmap_node node;          /* In 'dp->poll_threads'. */
+    /* Per thread exact-match cache.  Note, the instance for cpu core
+     * NON_PMD_CORE_ID can be accessed by multiple threads, and thus
+     * needs to be protected (e.g. by 'dp->non_pmd_mutex').  All other
+     * instances will only be accessed by their own pmd thread. */
     struct emc_cache flow_cache;
+    struct latch exit_latch;        /* For terminating the pmd thread. */
+    atomic_uint change_seq;         /* For reloading pmd ports. */
     pthread_t thread;
-    int id;
-    atomic_uint change_seq;
+    int index;                      /* Idx of this pmd thread among pmd*/
+                                    /* threads on same numa node. */
+    int core_id;                    /* CPU core id of this pmd thread. */
+    int numa_id;                    /* numa node id of this pmd thread. */
 };
 
 #define PMD_INITIAL_SEQ 1
@@ -374,18 +383,22 @@ static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
     OVS_REQUIRES(dp->port_mutex);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
-static void dp_netdev_execute_actions(struct dp_netdev *dp,
+static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
                                       struct dpif_packet **, int c,
                                       bool may_steal, struct pkt_metadata *,
-                                      struct emc_cache *flow_cache,
                                       const struct nlattr *actions,
                                       size_t actions_len);
-static void dp_netdev_input(struct dp_netdev *, struct emc_cache *,
+static void dp_netdev_input(struct dp_netdev_pmd_thread *,
                             struct dpif_packet **, int cnt,
                             struct pkt_metadata *);
-
-static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
 static void dp_netdev_disable_upcall(struct dp_netdev *);
+static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
+                                    struct dp_netdev *dp, int index,
+                                    int core_id, int numa_id);
+static struct dp_netdev_pmd_thread *dp_netdev_get_nonpmd(struct dp_netdev *dp);
+static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
+static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
+static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
 
 static void emc_clear_entry(struct emc_entry *ce);
 
@@ -524,6 +537,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     OVS_REQUIRES(dp_netdev_mutex)
 {
     struct dp_netdev *dp;
+    struct dp_netdev_pmd_thread *non_pmd;
     int error;
 
     dp = xzalloc(sizeof *dp);
@@ -543,7 +557,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     ovs_mutex_init(&dp->port_mutex);
     cmap_init(&dp->ports);
     dp->port_seq = seq_create();
-    latch_init(&dp->exit_latch);
     fat_rwlock_init(&dp->upcall_rwlock);
 
     /* Disable upcalls by default. */
@@ -551,6 +564,16 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     dp->upcall_aux = NULL;
     dp->upcall_cb = NULL;
 
+    cmap_init(&dp->poll_threads);
+    ovs_mutex_init_recursive(&dp->non_pmd_mutex);
+    ovsthread_key_create(&dp->per_pmd_key, NULL);
+
+    /* Reserves the core NON_PMD_CORE_ID for all non-pmd threads. */
+    ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID);
+    non_pmd = xzalloc(sizeof *non_pmd);
+    dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID,
+                            OVS_NUMA_UNSPEC);
+
     ovs_mutex_lock(&dp->port_mutex);
     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
     ovs_mutex_unlock(&dp->port_mutex);
@@ -559,9 +582,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
         return error;
     }
 
-    ovs_mutex_init_recursive(&dp->emc_mutex);
-    emc_cache_init(&dp->flow_cache);
-
     *dpp = dp;
     return 0;
 }
@@ -603,8 +623,9 @@ dp_netdev_free(struct dp_netdev *dp)
 
     shash_find_and_delete(&dp_netdevs, dp->name);
 
-    dp_netdev_set_pmd_threads(dp, 0);
-    free(dp->pmd_threads);
+    dp_netdev_destroy_all_pmds(dp);
+    ovs_mutex_destroy(&dp->non_pmd_mutex);
+    ovsthread_key_delete(dp->per_pmd_key);
 
     dp_netdev_flow_flush(dp);
     ovs_mutex_lock(&dp->port_mutex);
@@ -625,10 +646,6 @@ dp_netdev_free(struct dp_netdev *dp)
     seq_destroy(dp->port_seq);
     cmap_destroy(&dp->ports);
     fat_rwlock_destroy(&dp->upcall_rwlock);
-    latch_destroy(&dp->exit_latch);
-
-    emc_cache_uninit(&dp->flow_cache);
-    ovs_mutex_destroy(&dp->emc_mutex);
 
     free(CONST_CAST(char *, dp->name));
     free(dp);
@@ -696,15 +713,22 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
 }
 
 static void
-dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
+dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
 {
-    int i;
+    int old_seq;
+
+    atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
+}
 
-    for (i = 0; i < dp->n_pmd_threads; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
-        int old_seq;
+/* Causes all pmd threads to reload their tx/rx devices.
+ * Must be called after adding/removing ports. */
+static void
+dp_netdev_reload_pmds(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
 
-        atomic_add_relaxed(&f->change_seq, 1, &old_seq);
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        dp_netdev_reload_pmd__(pmd);
     }
 }
 
@@ -777,9 +801,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     port->sf = sf;
 
     if (netdev_is_pmd(netdev)) {
-        dp->pmd_count++;
-        dp_netdev_set_pmd_threads(dp, NR_PMD_THREADS);
-        dp_netdev_reload_pmd_threads(dp);
+        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
+        dp_netdev_reload_pmds(dp);
     }
     ovs_refcount_init(&port->ref_cnt);
 
@@ -930,6 +953,39 @@ get_port_by_name(struct dp_netdev *dp,
     return ENOENT;
 }
 
+static int
+get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    int n_pmds = 0;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->numa_id == numa_id) {
+            n_pmds++;
+        }
+    }
+
+    return n_pmds;
+}
+
+/* Returns 'true' if there is a port with pmd netdev and the netdev
+ * is on numa node 'numa_id'. */
+static bool
+has_pmd_port_for_numa(struct dp_netdev *dp, int numa_id)
+{
+    struct dp_netdev_port *port;
+
+    CMAP_FOR_EACH (port, node, &dp->ports) {
+        if (netdev_is_pmd(port->netdev)
+            && netdev_get_numa_id(port->netdev) == numa_id) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+
 static void
 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     OVS_REQUIRES(dp->port_mutex)
@@ -937,7 +993,14 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
     seq_change(dp->port_seq);
     if (netdev_is_pmd(port->netdev)) {
-        dp_netdev_reload_pmd_threads(dp);
+        int numa_id = netdev_get_numa_id(port->netdev);
+
+        /* If there is no netdev on the numa node, deletes the pmd threads
+         * for that numa.  Else, just reloads the queues.  */
+        if (!has_pmd_port_for_numa(dp, numa_id)) {
+            dp_netdev_del_pmds_on_numa(dp, numa_id);
+        }
+        dp_netdev_reload_pmds(dp);
     }
 
     port_unref(port);
@@ -1694,8 +1757,10 @@ dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,
 
 static int
 dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *pmd;
     struct dpif_packet packet, *pp;
     struct pkt_metadata *md = &execute->md;
 
@@ -1707,11 +1772,24 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     packet.ofpbuf = *execute->packet;
     pp = &packet;
 
-    ovs_mutex_lock(&dp->emc_mutex);
-    dp_netdev_execute_actions(dp, &pp, 1, false, md,
-                              &dp->flow_cache, execute->actions,
+    /* Tries finding the 'pmd'.  If NULL is returned, that means
+     * the current thread is a non-pmd thread and should use
+     * dp_netdev_get_nonpmd(). */
+    pmd = ovsthread_getspecific(dp->per_pmd_key);
+    if (!pmd) {
+        pmd = dp_netdev_get_nonpmd(dp);
+    }
+
+    /* If the current thread is non-pmd thread, acquires
+     * the 'non_pmd_mutex'. */
+    if (pmd->core_id == NON_PMD_CORE_ID) {
+        ovs_mutex_lock(&dp->non_pmd_mutex);
+    }
+    dp_netdev_execute_actions(pmd, &pp, 1, false, md, execute->actions,
                               execute->actions_len);
-    ovs_mutex_unlock(&dp->emc_mutex);
+    if (pmd->core_id == NON_PMD_CORE_ID) {
+        ovs_mutex_unlock(&dp->non_pmd_mutex);
+    }
 
     /* Even though may_steal is set to false, some actions could modify or
      * reallocate the ofpbuf memory. We need to pass those changes to the
@@ -1788,8 +1866,7 @@ dp_netdev_actions_free(struct dp_netdev_actions *actions)
 
 
 static void
-dp_netdev_process_rxq_port(struct dp_netdev *dp,
-                           struct emc_cache *flow_cache,
+dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct dp_netdev_port *port,
                            struct netdev_rxq *rxq)
 {
@@ -1801,7 +1878,7 @@ dp_netdev_process_rxq_port(struct dp_netdev *dp,
         struct pkt_metadata md = PKT_METADATA_INITIALIZER(port->port_no);
 
         *recirc_depth_get() = 0;
-        dp_netdev_input(dp, flow_cache, packets, cnt, &md);
+        dp_netdev_input(pmd, packets, cnt, &md);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
@@ -1815,19 +1892,19 @@ dpif_netdev_run(struct dpif *dpif)
 {
     struct dp_netdev_port *port;
     struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *non_pmd = dp_netdev_get_nonpmd(dp);
 
-    ovs_mutex_lock(&dp->emc_mutex);
+    ovs_mutex_lock(&dp->non_pmd_mutex);
     CMAP_FOR_EACH (port, node, &dp->ports) {
         if (!netdev_is_pmd(port->netdev)) {
             int i;
 
             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                dp_netdev_process_rxq_port(dp, &dp->flow_cache, port,
-                                           port->rxq[i]);
+                dp_netdev_process_rxq_port(non_pmd, port, port->rxq[i]);
             }
         }
     }
-    ovs_mutex_unlock(&dp->emc_mutex);
+    ovs_mutex_unlock(&dp->non_pmd_mutex);
 }
 
 static void
@@ -1855,33 +1932,32 @@ struct rxq_poll {
 };
 
 static int
-pmd_load_queues(struct pmd_thread *f,
+pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
                 struct rxq_poll **ppoll_list, int poll_cnt)
 {
-    struct dp_netdev *dp = f->dp;
     struct rxq_poll *poll_list = *ppoll_list;
     struct dp_netdev_port *port;
-    int id = f->id;
-    int index;
-    int i;
+    int n_pmds_on_numa, index, i;
 
     /* Simple scheduler for netdev rx polling. */
     for (i = 0; i < poll_cnt; i++) {
-         port_unref(poll_list[i].port);
+        port_unref(poll_list[i].port);
     }
 
     poll_cnt = 0;
+    n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
     index = 0;
 
-    CMAP_FOR_EACH (port, node, &f->dp->ports) {
+    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
         /* Calls port_try_ref() to prevent the main thread
          * from deleting the port. */
         if (port_try_ref(port)) {
-            if (netdev_is_pmd(port->netdev)) {
+            if (netdev_is_pmd(port->netdev)
+                && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
                 int i;
 
                 for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                    if ((index % dp->n_pmd_threads) == id) {
+                    if ((index % n_pmds_on_numa) == pmd->index) {
                         poll_list = xrealloc(poll_list,
                                         sizeof *poll_list * (poll_cnt + 1));
 
@@ -1905,8 +1981,7 @@ pmd_load_queues(struct pmd_thread *f,
 static void *
 pmd_thread_main(void *f_)
 {
-    struct pmd_thread *f = f_;
-    struct dp_netdev *dp = f->dp;
+    struct dp_netdev_pmd_thread *pmd = f_;
     unsigned int lc = 0;
     struct rxq_poll *poll_list;
     unsigned int port_seq = PMD_INITIAL_SEQ;
@@ -1916,17 +1991,18 @@ pmd_thread_main(void *f_)
     poll_cnt = 0;
     poll_list = NULL;
 
-    pmd_thread_setaffinity_cpu(f->id);
+    /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
+    ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
+    pmd_thread_setaffinity_cpu(pmd->core_id);
 reload:
-    emc_cache_init(&f->flow_cache);
-    poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
+    emc_cache_init(&pmd->flow_cache);
+    poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
 
     for (;;) {
         int i;
 
         for (i = 0; i < poll_cnt; i++) {
-            dp_netdev_process_rxq_port(dp, &f->flow_cache, poll_list[i].port,
-                                       poll_list[i].rx);
+            dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
         }
 
         if (lc++ > 1024) {
@@ -1936,7 +2012,7 @@ reload:
 
             ovsrcu_quiesce();
 
-            atomic_read_relaxed(&f->change_seq, &seq);
+            atomic_read_relaxed(&pmd->change_seq, &seq);
             if (seq != port_seq) {
                 port_seq = seq;
                 break;
@@ -1944,9 +2020,9 @@ reload:
         }
     }
 
-    emc_cache_uninit(&f->flow_cache);
+    emc_cache_uninit(&pmd->flow_cache);
 
-    if (!latch_is_set(&f->dp->exit_latch)){
+    if (!latch_is_set(&pmd->exit_latch)){
         goto reload;
     }
 
@@ -1988,40 +2064,124 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
     dp_netdev_enable_upcall(dp);
 }
 
+/* Returns the pointer to the dp_netdev_pmd_thread for non-pmd threads. */
+static struct dp_netdev_pmd_thread *
+dp_netdev_get_nonpmd(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    struct cmap_node *pnode;
+
+    pnode = cmap_find(&dp->poll_threads, hash_int(NON_PMD_CORE_ID, 0));
+    ovs_assert(pnode);
+    pmd = CONTAINER_OF(pnode, struct dp_netdev_pmd_thread, node);
+
+    return pmd;
+}
+
+/* Configures the 'pmd' based on the input argument. */
+static void
+dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
+                        int index, int core_id, int numa_id)
+{
+    pmd->dp = dp;
+    pmd->index = index;
+    pmd->core_id = core_id;
+    pmd->numa_id = numa_id;
+    latch_init(&pmd->exit_latch);
+    atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ);
+    /* init the 'flow_cache' since there is no
+     * actual thread created for NON_PMD_CORE_ID. */
+    if (core_id == NON_PMD_CORE_ID) {
+        emc_cache_init(&pmd->flow_cache);
+    }
+    cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
+                hash_int(core_id, 0));
+}
+
+/* Stops the pmd thread, removes it from the 'dp->poll_threads'
+ * and destroys the struct. */
 static void
-dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
+dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd)
 {
-    int i;
+    /* Uninit the 'flow_cache' here since there is
+     * no actual thread to uninit it. */
+    if (pmd->core_id == NON_PMD_CORE_ID) {
+        emc_cache_uninit(&pmd->flow_cache);
+    } else {
+        latch_set(&pmd->exit_latch);
+        dp_netdev_reload_pmd__(pmd);
+        ovs_numa_unpin_core(pmd->core_id);
+        xpthread_join(pmd->thread, NULL);
+    }
+    cmap_remove(&pmd->dp->poll_threads, &pmd->node, hash_int(pmd->core_id, 0));
+    latch_destroy(&pmd->exit_latch);
+    free(pmd);
+}
 
-    if (n == dp->n_pmd_threads) {
-        return;
+/* Destroys all pmd threads. */
+static void
+dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        dp_netdev_del_pmd(pmd);
     }
+}
 
-    /* Stop existing threads. */
-    latch_set(&dp->exit_latch);
-    dp_netdev_reload_pmd_threads(dp);
-    for (i = 0; i < dp->n_pmd_threads; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
+/* Deletes all pmd threads on numa node 'numa_id'. */
+static void
+dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
+{
+    struct dp_netdev_pmd_thread *pmd;
 
-        xpthread_join(f->thread, NULL);
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->numa_id == numa_id) {
+            dp_netdev_del_pmd(pmd);
+        }
     }
-    latch_poll(&dp->exit_latch);
-    free(dp->pmd_threads);
+}
 
-    /* Start new threads. */
-    dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
-    dp->n_pmd_threads = n;
+/* Starts pmd threads for numa node 'numa_id' if the id is valid and
+ * the numa node does not have any pmd threads yet. */
+static void
+dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
+{
+    int n_pmds;
 
-    for (i = 0; i < n; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
+    if (!ovs_numa_numa_id_is_valid(numa_id)) {
+        VLOG_ERR("Cannot create pmd threads due to numa id (%d)"
+                 "invalid", numa_id);
+        return ;
+    }
+
+    n_pmds = get_n_pmd_threads_on_numa(dp, numa_id);
+
+    /* If there are already pmd threads created for the numa node
+     * 'numa_id', do nothing.  Else, creates the pmd threads for
+     * the numa node. */
+    if (!n_pmds) {
+        int can_have, n_unpinned, i;
+
+        n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
+        if (!n_unpinned) {
+            VLOG_ERR("Cannot create pmd threads due to out of unpinned "
+                     "cores on numa node");
+            return;
+        }
 
-        f->dp = dp;
-        f->id = i;
-        atomic_init(&f->change_seq, PMD_INITIAL_SEQ);
+        /* Tries creating 'number of dpdk ifaces on numa node' pmd threads. */
+        can_have = MIN(n_unpinned, netdev_dpdk_n_devs_on_numa(numa_id));
+        for (i = 0; i < can_have; i++) {
+            struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
+            int core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
 
-        /* Each thread will distribute all devices rx-queues among
-         * themselves. */
-        f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
+            dp_netdev_configure_pmd(pmd, dp, i, core_id, numa_id);
+            /* Each thread will distribute all devices rx-queues among
+             * themselves. */
+            pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+        }
+        VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
     }
 }
 
@@ -2161,8 +2321,8 @@ packet_batch_init(struct packet_batch *batch, struct dp_netdev_flow *flow,
 }
 
 static inline void
-packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp,
-                     struct emc_cache *flow_cache)
+packet_batch_execute(struct packet_batch *batch,
+                     struct dp_netdev_pmd_thread *pmd)
 {
     struct dp_netdev_actions *actions;
     struct dp_netdev_flow *flow = batch->flow;
@@ -2172,11 +2332,10 @@ packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp,
 
     actions = dp_netdev_flow_get_actions(flow);
 
-    dp_netdev_execute_actions(dp, batch->packets, batch->packet_count, true,
-                              &batch->md, flow_cache,
-                              actions->actions, actions->size);
+    dp_netdev_execute_actions(pmd, batch->packets, batch->packet_count, true,
+                              &batch->md, actions->actions, actions->size);
 
-    dp_netdev_count_packet(dp, DP_STAT_HIT, batch->packet_count);
+    dp_netdev_count_packet(pmd->dp, DP_STAT_HIT, batch->packet_count);
 }
 
 static inline bool
@@ -2231,12 +2390,13 @@ dpif_packet_swap(struct dpif_packet **a, struct dpif_packet **b)
  * 'packets' array (they have been moved to the beginning of the vector).
  */
 static inline size_t
-emc_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
-               struct dpif_packet **packets, size_t cnt,
-               struct pkt_metadata *md, struct netdev_flow_key *keys)
+emc_processing(struct dp_netdev_pmd_thread *pmd, struct dpif_packet **packets,
+               size_t cnt, struct pkt_metadata *md,
+               struct netdev_flow_key *keys)
 {
     struct netdev_flow_key key;
     struct packet_batch batches[4];
+    struct emc_cache *flow_cache = &pmd->flow_cache;
     size_t n_batches, i;
     size_t notfound_cnt = 0;
 
@@ -2269,14 +2429,14 @@ emc_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
     }
 
     for (i = 0; i < n_batches; i++) {
-        packet_batch_execute(&batches[i], dp, flow_cache);
+        packet_batch_execute(&batches[i], pmd);
     }
 
     return notfound_cnt;
 }
 
 static inline void
-fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
+fast_path_processing(struct dp_netdev_pmd_thread *pmd,
                      struct dpif_packet **packets, size_t cnt,
                      struct pkt_metadata *md, struct netdev_flow_key *keys)
 {
@@ -2289,6 +2449,8 @@ fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
     struct packet_batch batches[PKT_ARRAY_SIZE];
     const struct miniflow *mfs[PKT_ARRAY_SIZE]; /* NULL at bad packets. */
     struct cls_rule *rules[PKT_ARRAY_SIZE];
+    struct dp_netdev *dp = pmd->dp;
+    struct emc_cache *flow_cache = &pmd->flow_cache;
     size_t n_batches, i;
     bool any_miss;
 
@@ -2337,8 +2499,8 @@ fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
             /* We can't allow the packet batching in the next loop to execute
              * the actions.  Otherwise, if there are any slow path actions,
              * we'll send the packet up twice. */
-            dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
-                                      flow_cache, ofpbuf_data(&actions),
+            dp_netdev_execute_actions(pmd, &packets[i], 1, false, md,
+                                      ofpbuf_data(&actions),
                                       ofpbuf_size(&actions));
 
             add_actions = ofpbuf_size(&put_actions)
@@ -2375,18 +2537,19 @@ fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
         }
 
         flow = dp_netdev_flow_cast(rules[i]);
-        emc_insert(flow_cache, mfs[i], dpif_packet_get_dp_hash(packet), flow);
+        emc_insert(flow_cache, mfs[i], dpif_packet_get_dp_hash(packet),
+                   flow);
         dp_netdev_queue_batches(packet, md, flow, mfs[i], batches, &n_batches,
                                 ARRAY_SIZE(batches));
     }
 
     for (i = 0; i < n_batches; i++) {
-        packet_batch_execute(&batches[i], dp, flow_cache);
+        packet_batch_execute(&batches[i], pmd);
     }
 }
 
 static void
-dp_netdev_input(struct dp_netdev *dp, struct emc_cache *flow_cache,
+dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
                 struct dpif_packet **packets, int cnt, struct pkt_metadata *md)
 {
 #if !defined(__CHECKER__) && !defined(_WIN32)
@@ -2398,15 +2561,14 @@ dp_netdev_input(struct dp_netdev *dp, struct emc_cache *flow_cache,
     struct netdev_flow_key keys[PKT_ARRAY_SIZE];
     size_t newcnt;
 
-    newcnt = emc_processing(dp, flow_cache, packets, cnt, md, keys);
+    newcnt = emc_processing(pmd, packets, cnt, md, keys);
     if (OVS_UNLIKELY(newcnt)) {
-        fast_path_processing(dp, flow_cache, packets, newcnt, md, keys);
+        fast_path_processing(pmd, packets, newcnt, md, keys);
     }
 }
 
 struct dp_netdev_execute_aux {
-    struct dp_netdev *dp;
-    struct emc_cache *flow_cache;
+    struct dp_netdev_pmd_thread *pmd;
 };
 
 static void
@@ -2426,7 +2588,8 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
 {
     struct dp_netdev_execute_aux *aux = aux_;
     uint32_t *depth = recirc_depth_get();
-    struct dp_netdev *dp = aux->dp;
+    struct dp_netdev_pmd_thread *pmd= aux->pmd;
+    struct dp_netdev *dp= pmd->dp;
     int type = nl_attr_type(a);
     struct dp_netdev_port *p;
     int i;
@@ -2435,7 +2598,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
     case OVS_ACTION_ATTR_OUTPUT:
         p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
         if (OVS_LIKELY(p)) {
-            netdev_send(p->netdev, NETDEV_QID_NONE, packets, cnt, may_steal);
+            netdev_send(p->netdev, pmd->core_id, packets, cnt, may_steal);
         } else if (may_steal) {
             for (i = 0; i < cnt; i++) {
                 dpif_packet_delete(packets[i]);
@@ -2462,8 +2625,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
                                          DPIF_UC_ACTION, userdata, &actions,
                                          NULL);
                 if (!error || error == ENOSPC) {
-                    dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
-                                              aux->flow_cache,
+                    dp_netdev_execute_actions(pmd, &packets[i], 1, false, md,
                                               ofpbuf_data(&actions),
                                               ofpbuf_size(&actions));
                 }
@@ -2525,7 +2687,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
                 /* Hash is private to each packet */
                 recirc_md.dp_hash = dpif_packet_get_dp_hash(packets[i]);
 
-                dp_netdev_input(dp, aux->flow_cache, &recirc_pkt, 1,
+                dp_netdev_input(pmd, &recirc_pkt, 1,
                                 &recirc_md);
             }
             (*depth)--;
@@ -2555,13 +2717,12 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
 }
 
 static void
-dp_netdev_execute_actions(struct dp_netdev *dp,
+dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
                           struct dpif_packet **packets, int cnt,
                           bool may_steal, struct pkt_metadata *md,
-                          struct emc_cache *flow_cache,
                           const struct nlattr *actions, size_t actions_len)
 {
-    struct dp_netdev_execute_aux aux = {dp, flow_cache};
+    struct dp_netdev_execute_aux aux = {pmd};
 
     odp_execute_actions(&aux, packets, cnt, may_steal, md, actions,
                         actions_len, dp_execute_cb);
diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h
index adbbf87..f501f7c 100644
--- a/lib/dpif-netdev.h
+++ b/lib/dpif-netdev.h
@@ -40,9 +40,7 @@ static inline void dp_packet_pad(struct ofpbuf *b)
     }
 }
 
-#define NETDEV_QID_NONE INT_MAX
-
-#define NR_PMD_THREADS 1
+#define NON_PMD_CORE_ID 0
 
 #ifdef  __cplusplus
 }
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 26b1591..928ca3f 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -404,7 +404,6 @@ dpdk_get_n_devs(int numa_id)
             count++;
         }
     }
-    ovs_assert(count);
 
     return count;
 }
@@ -495,8 +494,7 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no) OVS_REQUIRES(dpdk
 
     ovs_mutex_lock(&netdev->mutex);
 
-    /* XXX: need to discover device node at run time. */
-    netdev->socket_id = SOCKET0;
+    netdev->socket_id = rte_eth_dev_socket_id(port_no);
 
     /* There can only be ovs_numa_get_n_cores() pmd threads, so creates a tx_q
      * for each of them. */
@@ -1503,6 +1501,13 @@ netdev_dpdk_register(void)
     }
 }
 
+/* Returns the number of dpdk interfaces on numa node 'numa_id'. */
+int
+netdev_dpdk_n_devs_on_numa(int numa_id)
+{
+    return dpdk_get_n_devs(numa_id);
+}
+
 int
 pmd_thread_setaffinity_cpu(int cpu)
 {
@@ -1517,7 +1522,8 @@ pmd_thread_setaffinity_cpu(int cpu)
         return err;
     }
     /* lcore_id 0 is reseved for use by non pmd threads. */
-    RTE_PER_LCORE(_lcore_id) = cpu + 1;
+    ovs_assert(cpu);
+    RTE_PER_LCORE(_lcore_id) = cpu;
 
     return 0;
 }
@@ -1525,9 +1531,6 @@ pmd_thread_setaffinity_cpu(int cpu)
 void
 thread_set_nonpmd(void)
 {
-    /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is reserved
-     * for non pmd threads */
-    BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE);
     /* We have to use 0 to allow non pmd threads to perform certain DPDK
      * operations, like rte_eth_dev_configure(). */
     RTE_PER_LCORE(_lcore_id) = 0;
diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
index e4ba6fc..8fe5a42 100644
--- a/lib/netdev-dpdk.h
+++ b/lib/netdev-dpdk.h
@@ -22,6 +22,7 @@ struct dpif_packet;
 
 int dpdk_init(int argc, char **argv);
 void netdev_dpdk_register(void);
+int netdev_dpdk_n_devs_on_numa(int numa_id);
 void free_dpdk_buf(struct dpif_packet *);
 int pmd_thread_setaffinity_cpu(int cpu);
 void thread_set_nonpmd(void);
@@ -40,6 +41,12 @@ netdev_dpdk_register(void)
     /* Nothing */
 }
 
+static inline int
+netdev_dpdk_n_devs_on_numa(int numa_id OVS_UNUSED)
+{
+    return -1;
+}
+
 static inline void
 free_dpdk_buf(struct dpif_packet *buf OVS_UNUSED)
 {
-- 
1.7.9.5




More information about the dev mailing list