[ovs-dev] [dpdk patch 5/8] dpif-netdev: Create 'number of dpdk ifaces on cpu socket' pmd threads for each cpu socket.

Alex Wang alexw at nicira.com
Tue Aug 12 04:56:40 UTC 2014


The pmd threads are pinned to available cpu cores on the
corresponding cpu socket.  Note, core 0 is not pinnable
and is reserved for all non-pmd threads.

Signed-off-by: Alex Wang <alexw at nicira.com>
---
 lib/dpif-netdev.c |  254 +++++++++++++++++++++++++++++++++++++++++------------
 lib/dpif-netdev.h |    2 +-
 lib/netdev-dpdk.c |   40 ++++++---
 lib/netdev-dpdk.h |   15 ++++
 4 files changed, 244 insertions(+), 67 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index c637d9f..14784bf 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -52,6 +52,7 @@
 #include "odp-util.h"
 #include "ofp-print.h"
 #include "ofpbuf.h"
+#include "ovs-numa.h"
 #include "ovs-rcu.h"
 #include "packet-dpif.h"
 #include "packets.h"
@@ -71,6 +72,7 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 #define NETDEV_RULE_PRIORITY 0x8000
 
 #define FLOW_DUMP_MAX_BATCH 50
+
 /* Use per thread recirc_depth to prevent recirculation loop. */
 #define MAX_RECIRC_DEPTH 5
 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
@@ -142,11 +144,9 @@ struct dp_netdev {
     struct fat_rwlock upcall_rwlock;
     exec_upcall_cb *upcall_cb;  /* Callback function for executing upcalls. */
 
-    /* Forwarding threads. */
-    struct latch exit_latch;
-    struct pmd_thread *pmd_threads;
-    size_t n_pmd_threads;
-    int pmd_count;
+    /* Per-cpu-socket struct for configuring pmd threads. */
+    struct pmd_socket *pmd_sockets;
+    int n_pmd_sockets;
 };
 
 static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
@@ -281,6 +281,15 @@ struct dp_netdev_actions *dp_netdev_flow_get_actions(
     const struct dp_netdev_flow *);
 static void dp_netdev_actions_free(struct dp_netdev_actions *);
 
+/* Represents the PMD configuration on a cpu socket. */
+struct pmd_socket {
+    struct dp_netdev *dp;
+    struct latch exit_latch;
+    struct pmd_thread *pmd_threads;
+    int socket_id;
+    int n_pmd_threads;
+};
+
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
  * the performance overhead of interrupt processing.  Therefore netdev can
  * not implement rx-wait for these devices.  dpif-netdev needs to poll
@@ -293,9 +302,10 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *);
  * table, and executes the actions it finds.
  **/
 struct pmd_thread {
-    struct dp_netdev *dp;
+    struct pmd_socket *socket;
     pthread_t thread;
-    int id;
+    int index;
+    int core_id;
     atomic_uint change_seq;
 };
 
@@ -335,8 +345,10 @@ static void dp_netdev_port_input(struct dp_netdev *dp,
                                  struct dpif_packet **packets, int cnt,
                                  odp_port_t port_no);
 
-static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
 static void dp_netdev_disable_upcall(struct dp_netdev *);
+static void dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *);
+static void dp_netdev_unset_pmd_threads(struct dp_netdev *, int socket_id);
+static void dp_netdev_set_pmd_threads(struct dp_netdev *, int socket_id);
 
 static struct dpif_netdev *
 dpif_netdev_cast(const struct dpif *dpif)
@@ -450,7 +462,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     OVS_REQUIRES(dp_netdev_mutex)
 {
     struct dp_netdev *dp;
-    int error;
+    int n_sockets, error;
 
     dp = xzalloc(sizeof *dp);
     shash_add(&dp_netdevs, name, dp);
@@ -469,13 +481,32 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     ovs_mutex_init(&dp->port_mutex);
     cmap_init(&dp->ports);
     dp->port_seq = seq_create();
-    latch_init(&dp->exit_latch);
     fat_rwlock_init(&dp->upcall_rwlock);
 
     /* Disable upcalls by default. */
     dp_netdev_disable_upcall(dp);
     dp->upcall_cb = NULL;
 
+    /* Reserves the core 0 for main thread. */
+    ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID);
+
+    /* Creates 'pmd_socket' struct for each cpu socket. */
+    n_sockets = ovs_numa_get_n_sockets();
+    if (n_sockets != OVS_SOCKET_UNSPEC) {
+        int i;
+
+        dp->n_pmd_sockets = n_sockets;
+        dp->pmd_sockets = xzalloc(dp->n_pmd_sockets * sizeof *dp->pmd_sockets);
+        for (i = 0; i < dp->n_pmd_sockets; i++) {
+            dp->pmd_sockets[i].dp = dp;
+            dp->pmd_sockets[i].socket_id = i;
+            latch_init(&dp->pmd_sockets[i].exit_latch);
+        }
+    } else {
+        dp->n_pmd_sockets = 0;
+        dp->pmd_sockets = NULL;
+    }
+
     ovs_mutex_lock(&dp->port_mutex);
     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
     ovs_mutex_unlock(&dp->port_mutex);
@@ -525,8 +556,8 @@ dp_netdev_free(struct dp_netdev *dp)
 
     shash_find_and_delete(&dp_netdevs, dp->name);
 
-    dp_netdev_set_pmd_threads(dp, 0);
-    free(dp->pmd_threads);
+    dp_netdev_destroy_all_pmd_sockets(dp);
+    free(dp->pmd_sockets);
 
     dp_netdev_flow_flush(dp);
     ovs_mutex_lock(&dp->port_mutex);
@@ -547,7 +578,6 @@ dp_netdev_free(struct dp_netdev *dp)
     seq_destroy(dp->port_seq);
     cmap_destroy(&dp->ports);
     fat_rwlock_destroy(&dp->upcall_rwlock);
-    latch_destroy(&dp->exit_latch);
     free(CONST_CAST(char *, dp->name));
     free(dp);
 }
@@ -618,12 +648,17 @@ dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
 {
     int i;
 
-    for (i = 0; i < dp->n_pmd_threads; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
-        int id;
+    for (i = 0; i < dp->n_pmd_sockets; i++) {
+        struct pmd_socket *s = &dp->pmd_sockets[i];
+        int j;
 
-        atomic_add(&f->change_seq, 1, &id);
-   }
+        for (j = 0; j < s->n_pmd_threads; j++) {
+            struct pmd_thread *f = &s->pmd_threads[j];
+            int id;
+
+            atomic_add(&f->change_seq, 1, &id);
+        }
+    }
 }
 
 static uint32_t
@@ -690,10 +725,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     }
     port->sf = sf;
 
-    if (netdev_is_pmd(netdev)) {
-        dp->pmd_count++;
-        dp_netdev_set_pmd_threads(dp, NR_PMD_THREADS);
-        dp_netdev_reload_pmd_threads(dp);
+    if (netdev_is_pmd(netdev) && dp->pmd_sockets) {
+        dp_netdev_set_pmd_threads(dp, netdev_dpdk_get_socket_id(netdev));
     }
     ovs_refcount_init(&port->ref_cnt);
 
@@ -834,6 +867,23 @@ get_port_by_name(struct dp_netdev *dp,
     return ENOENT;
 }
 
+/* Returns 'true' if there is a port with pmd netdev and the netdev
+ * is on cpu socket 'socket_id'. */
+static bool
+has_pmd_port_for_socket(struct dp_netdev *dp, int socket_id)
+{
+    struct dp_netdev_port *port;
+
+    CMAP_FOR_EACH (port, node, &dp->ports) {
+        if (netdev_is_pmd(port->netdev)
+            && netdev_dpdk_get_socket_id(port->netdev) == socket_id) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
 static void
 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     OVS_REQUIRES(dp->port_mutex)
@@ -841,7 +891,15 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
     seq_change(dp->port_seq);
     if (netdev_is_pmd(port->netdev)) {
-        dp_netdev_reload_pmd_threads(dp);
+        int socket_id = netdev_dpdk_get_socket_id(port->netdev);
+
+        /* If there is no netdev on the cpu socket, unsets the pmd threads
+         * for that cpu socket.  Else, just reloads the queues.  */
+        if (!has_pmd_port_for_socket(dp, socket_id)) {
+            dp_netdev_unset_pmd_threads(dp, socket_id);
+        } else {
+            dp_netdev_reload_pmd_threads(dp);
+        }
     }
 
     port_unref(port);
@@ -1614,14 +1672,17 @@ struct rxq_poll {
     struct netdev_rxq *rx;
 };
 
+struct non_local_pmd_dev {
+    struct netdev *dev;
+};
+
 static int
 pmd_load_queues(struct pmd_thread *f,
                 struct rxq_poll **ppoll_list, int poll_cnt)
 {
-    struct dp_netdev *dp = f->dp;
+    struct pmd_socket *s = f->socket;
     struct rxq_poll *poll_list = *ppoll_list;
     struct dp_netdev_port *port;
-    int id = f->id;
     int index;
     int i;
 
@@ -1633,12 +1694,13 @@ pmd_load_queues(struct pmd_thread *f,
     poll_cnt = 0;
     index = 0;
 
-    CMAP_FOR_EACH (port, node, &f->dp->ports) {
-        if (netdev_is_pmd(port->netdev)) {
+    CMAP_FOR_EACH (port, node, &s->dp->ports) {
+        if (netdev_is_pmd(port->netdev)
+            && netdev_dpdk_get_socket_id(port->netdev) == s->socket_id) {
             int i;
 
             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                if ((index % dp->n_pmd_threads) == id) {
+                if ((index % s->n_pmd_threads) == f->index) {
                     poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1));
 
                     port_ref(port);
@@ -1655,23 +1717,56 @@ pmd_load_queues(struct pmd_thread *f,
     return poll_cnt;
 }
 
+static int
+pmd_get_non_local_pmd_dev(struct pmd_thread *f,
+                          struct non_local_pmd_dev **pdev_list, int dev_cnt)
+{
+    struct pmd_socket *s = f->socket;
+    struct non_local_pmd_dev *dev_list = *pdev_list;
+    struct dp_netdev_port *port;
+    int i;
+
+    for (i = 0; i < dev_cnt; i++) {
+         netdev_close(dev_list[i].dev);
+    }
+
+    dev_cnt = 0;
+
+    CMAP_FOR_EACH (port, node, &s->dp->ports) {
+        if (netdev_is_pmd(port->netdev)
+            && netdev_dpdk_get_socket_id(port->netdev) != s->socket_id) {
+            dev_list = xrealloc(dev_list, sizeof *dev_list * (dev_cnt + 1));
+            netdev_ref(port->netdev);
+            dev_list[dev_cnt].dev = port->netdev;
+            dev_cnt++;
+        }
+    }
+
+    *pdev_list = dev_list;
+    return dev_cnt;
+}
+
 static void *
 pmd_thread_main(void *f_)
 {
     struct pmd_thread *f = f_;
-    struct dp_netdev *dp = f->dp;
+    struct dp_netdev *dp = f->socket->dp;
     unsigned int lc = 0;
     struct rxq_poll *poll_list;
+    struct non_local_pmd_dev *dev_list;
     unsigned int port_seq;
-    int poll_cnt;
+    int poll_cnt, dev_cnt;
     int i;
 
     poll_cnt = 0;
+    dev_cnt = 0;
     poll_list = NULL;
+    dev_list = NULL;
 
-    pmd_thread_setaffinity_cpu(f->id);
+    pmd_thread_setaffinity_cpu(f->core_id);
 reload:
     poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
+    dev_cnt = pmd_get_non_local_pmd_dev(f, &dev_list, dev_cnt);
     atomic_read(&f->change_seq, &port_seq);
 
     for (;;) {
@@ -1682,6 +1777,10 @@ reload:
             dp_netdev_process_rxq_port(dp,  poll_list[i].port, poll_list[i].rx);
         }
 
+        for (i = 0; i < dev_cnt; i++) {
+            netdev_dpdk_flush_non_local(dev_list[i].dev, f->core_id);
+        }
+
         if (lc++ > 1024) {
             ovsrcu_quiesce();
 
@@ -1696,7 +1795,7 @@ reload:
         }
     }
 
-    if (!latch_is_set(&f->dp->exit_latch)){
+    if (!latch_is_set(&f->socket->exit_latch)){
         goto reload;
     }
 
@@ -1739,40 +1838,87 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
 }
 
 static void
-dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
+dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *dp)
 {
     int i;
 
-    if (n == dp->n_pmd_threads) {
-        return;
+    /* Exits all existing pmd threads. */
+    for (i = 0; i < dp->n_pmd_sockets; i++) {
+        struct pmd_socket *s = &dp->pmd_sockets[i];
+
+        dp_netdev_unset_pmd_threads(dp, i);
+        latch_destroy(&s->exit_latch);
     }
+}
 
-    /* Stop existing threads. */
-    latch_set(&dp->exit_latch);
-    dp_netdev_reload_pmd_threads(dp);
-    for (i = 0; i < dp->n_pmd_threads; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
+/* Deletes all pmd threads on cpu socket 'socket_id'. */
+static void
+dp_netdev_unset_pmd_threads(struct dp_netdev *dp, int socket_id)
+{
+    struct pmd_socket *s = &dp->pmd_sockets[socket_id];
+    int i;
+
+    if (s->n_pmd_threads) {
+        latch_set(&s->exit_latch);
+        dp_netdev_reload_pmd_threads(dp);
+        for (i = 0; i < s->n_pmd_threads; i++) {
+            struct pmd_thread *f = &s->pmd_threads[i];
 
-        xpthread_join(f->thread, NULL);
+            ovs_numa_unpin_core(f->core_id);
+            xpthread_join(f->thread, NULL);
+        }
+        latch_poll(&s->exit_latch);
+        free(s->pmd_threads);
+        s->pmd_threads = NULL;
+        s->n_pmd_threads = 0;
     }
-    latch_poll(&dp->exit_latch);
-    free(dp->pmd_threads);
+}
+
+/* Checks the cpu socket id of 'netdev' and starts pmd threads for
+ * the cpu socket. */
+static void
+dp_netdev_set_pmd_threads(struct dp_netdev *dp, int socket_id)
+{
+    struct pmd_socket *s = &dp->pmd_sockets[socket_id];
+
+    ovs_assert(ovs_numa_get_n_sockets() != OVS_SOCKET_UNSPEC
+               && ovs_numa_get_n_cores() != OVS_CORE_UNSPEC);
+
+    /* If there are already pmd threads created for the cpu socket
+     * in which 'netdev' is on, do nothing.  Else, creates the
+     * pmd threads for the socket. */
+    if (!s->n_pmd_threads) {
+        int n_devs = netdev_dpdk_get_n_devs_on_socket(socket_id);
+        int n_unpinned_cores =
+            ovs_numa_get_n_unpinned_cores_on_socket(socket_id);
+        int n_threads, i;
+
+        if (!n_unpinned_cores) {
+            VLOG_ERR("Cannot create pmd threads due to out of unpinned "
+                     "cores on socket");
+            return;
+        }
 
-    /* Start new threads. */
-    dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
-    dp->n_pmd_threads = n;
+        /* Starts new pmd threads on the cpu socket. */
+        n_threads = MIN(n_devs, n_unpinned_cores);
+        s->pmd_threads = xmalloc(n_threads * sizeof *s->pmd_threads);
+        s->n_pmd_threads = n_threads;
 
-    for (i = 0; i < n; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
+        for (i = 0; i < s->n_pmd_threads; i++) {
+            struct pmd_thread *f = &s->pmd_threads[i];
 
-        f->dp = dp;
-        f->id = i;
-        atomic_store(&f->change_seq, 1);
+            f->core_id = ovs_numa_get_unpinned_core_on_socket(socket_id);
+            f->index = i;
+            f->socket = s;
+            atomic_store(&f->change_seq, 1);
 
-        /* Each thread will distribute all devices rx-queues among
-         * themselves. */
-        f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
+            /* Each thread will distribute all devices rx-queues among
+             * themselves. */
+            f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
+        }
     }
+
+    dp_netdev_reload_pmd_threads(dp);
 }
 
 
diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h
index 50c1198..f501f7c 100644
--- a/lib/dpif-netdev.h
+++ b/lib/dpif-netdev.h
@@ -40,7 +40,7 @@ static inline void dp_packet_pad(struct ofpbuf *b)
     }
 }
 
-#define NR_PMD_THREADS 1
+#define NON_PMD_CORE_ID 0
 
 #ifdef  __cplusplus
 }
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 432524f..012ee68 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -401,7 +401,6 @@ dpdk_get_n_devs(int socket_id)
             count++;
         }
     }
-    ovs_assert(count);
 
     return count;
 }
@@ -508,14 +507,12 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no) OVS_REQUIRES(dpdk
         rte_spinlock_init(&netdev->tx_q[i].tx_lock);
     }
 
-    netdev->port_id = port_no;
-
     netdev->flags = 0;
     netdev->mtu = ETHER_MTU;
     netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
 
-    /* XXX: need to discover device node at run time. */
-    netdev->socket_id = SOCKET0;
+    netdev->socket_id = rte_eth_dev_socket_id(port_no);
+    netdev->port_id = port_no;
 
     netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu);
     if (!netdev->dpdk_mp) {
@@ -699,7 +696,11 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
     int nb_rx;
 
-    dpdk_queue_flush(dev, rxq_->queue_id);
+    /* There is only one tx queue for this core.  Do not flush other
+     * queues. */
+    if (rxq_->queue_id == rte_lcore_id()) {
+        dpdk_queue_flush(dev, rxq_->queue_id);
+    }
 
     nb_rx = rte_eth_rx_burst(rx->port_id, rxq_->queue_id,
                              (struct rte_mbuf **) packets,
@@ -1496,6 +1497,23 @@ netdev_dpdk_get_socket_id(const struct netdev *netdev_)
     }
 }
 
+/* Returns the number of dpdk interfaces on cpu socket 'socket_id'. */
+int
+netdev_dpdk_get_n_devs_on_socket(int socket_id)
+{
+    return dpdk_get_n_devs(socket_id);
+}
+
+/* Flushes pkts sent from the calling thread to other dpdk ifaces that are
+ * not on the same cpu socket. */
+void
+netdev_dpdk_flush_non_local(struct netdev *netdev_, int qid)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev_);
+
+    dpdk_queue_flush(dev, qid);
+}
+
 int
 pmd_thread_setaffinity_cpu(int cpu)
 {
@@ -1510,7 +1528,8 @@ pmd_thread_setaffinity_cpu(int cpu)
         return err;
     }
     /* lcore_id 0 is reseved for use by non pmd threads. */
-    RTE_PER_LCORE(_lcore_id) = cpu + 1;
+    ovs_assert(cpu);
+    RTE_PER_LCORE(_lcore_id) = cpu;
 
     return 0;
 }
@@ -1518,16 +1537,13 @@ pmd_thread_setaffinity_cpu(int cpu)
 void
 thread_set_nonpmd(void)
 {
-    /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is reserved
-     * for non pmd threads */
-    BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE);
     /* We have to use 0 to allow non pmd threads to perform certain DPDK
      * operations, like rte_eth_dev_configure(). */
-    RTE_PER_LCORE(_lcore_id) = 0;
+    RTE_PER_LCORE(_lcore_id) = NON_PMD_CORE_ID;
 }
 
 static bool
 thread_is_pmd(void)
 {
-    return rte_lcore_id() != 0;
+    return rte_lcore_id() != NON_PMD_CORE_ID;
 }
diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
index 75f6a0b..5e8f296 100644
--- a/lib/netdev-dpdk.h
+++ b/lib/netdev-dpdk.h
@@ -24,6 +24,8 @@ struct netdev;
 int dpdk_init(int argc, char **argv);
 void netdev_dpdk_register(void);
 int netdev_dpdk_get_socket_id(const struct netdev *);
+int netdev_dpdk_get_n_devs_on_socket(int socket_id);
+void netdev_dpdk_flush_non_local(struct netdev *, int qid);
 void free_dpdk_buf(struct dpif_packet *);
 int pmd_thread_setaffinity_cpu(int cpu);
 void thread_set_nonpmd(void);
@@ -48,6 +50,19 @@ netdev_dpdk_get_socket_id(const struct netdev *netdev_ OVS_UNUSED)
     return -1;
 }
 
+static inline int
+netdev_dpdk_get_n_devs_on_socket(int socket_id OVS_UNUSED)
+{
+    return -1;
+}
+
+static inline void
+netdev_dpdk_flush_non_local(struct netdev *netdev OVS_UNUSED,
+                            int qid OVS_UNUSED)
+{
+    /* Nothing */
+}
+
 static inline void
 free_dpdk_buf(struct dpif_packet *buf OVS_UNUSED)
 {
-- 
1.7.9.5




More information about the dev mailing list