[ovs-dev] [PATCH V2 5/5] dpif-netdev: Create 'number of dpdk ifaces on cpu socket' pmd threads for each cpu socket.

Alex Wang alexw at nicira.com
Wed Jun 25 01:40:06 UTC 2014


The pmd threads are pinned to available cpu cores on the
corresponding cpu socket.

Signed-off-by: Alex Wang <alexw at nicira.com>
---
PATCH -> V2:
- Add latch_destroy().
- Use 'int' for cpu socket/core id.
---
 lib/dpif-netdev.c |  178 ++++++++++++++++++++++++++++++++++++++---------------
 lib/netdev-dpdk.c |   27 +++++++-
 lib/netdev-dpdk.h |   14 +++++
 3 files changed, 167 insertions(+), 52 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 4dcc268..f36ff2b 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -51,6 +51,7 @@
 #include "odp-util.h"
 #include "ofp-print.h"
 #include "ofpbuf.h"
+#include "ovs-numa.h"
 #include "ovs-rcu.h"
 #include "packet-dpif.h"
 #include "packets.h"
@@ -69,7 +70,6 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 /* By default, choose a priority in the middle. */
 #define NETDEV_RULE_PRIORITY 0x8000
 
-#define NR_THREADS 1
 /* Use per thread recirc_depth to prevent recirculation loop. */
 #define MAX_RECIRC_DEPTH 5
 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
@@ -166,11 +166,9 @@ struct dp_netdev {
     struct cmap ports;
     struct seq *port_seq;       /* Incremented whenever a port changes. */
 
-    /* Forwarding threads. */
-    struct latch exit_latch;
-    struct pmd_thread *pmd_threads;
-    size_t n_pmd_threads;
-    int pmd_count;
+    /* Per-cpu-socket struct for configuring pmd threads. */
+    struct pmd_socket *pmd_sockets;
+    int n_pmd_sockets;
 };
 
 static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
@@ -305,6 +303,15 @@ struct dp_netdev_actions *dp_netdev_flow_get_actions(
     const struct dp_netdev_flow *);
 static void dp_netdev_actions_free(struct dp_netdev_actions *);
 
+/* Represents the PMD configuration on a cpu socket. */
+struct pmd_socket {
+    struct dp_netdev *dp;           /* Datapath that owns this socket. */
+    struct latch exit_latch;        /* Set to make the pmd threads exit. */
+    struct pmd_thread *pmd_threads; /* Array of 'n_pmd_threads' threads. */
+    int socket_id;                  /* Index in 'dp->pmd_sockets'. */
+    int n_pmd_threads;              /* 0 if no pmd threads are running. */
+};
+
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
  * the performance overhead of interrupt processing.  Therefore netdev can
  * not implement rx-wait for these devices.  dpif-netdev needs to poll
@@ -317,9 +324,10 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *);
  * table, and executes the actions it finds.
  **/
 struct pmd_thread {
-    struct dp_netdev *dp;
+    struct pmd_socket *socket;  /* Cpu socket this thread belongs to. */
     pthread_t thread;
     int id;
+    int core_id;                /* Cpu core this thread is pinned to. */
     atomic_uint change_seq;
 };
 
@@ -359,7 +367,9 @@ static void dp_netdev_port_input(struct dp_netdev *dp,
                                  struct dpif_packet **packets, int cnt,
                                  odp_port_t port_no);
 
-static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
+static void dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *);
+static void dp_netdev_unset_pmd_threads(struct dp_netdev *, int socket_id);
+static void dp_netdev_set_pmd_threads(struct dp_netdev *, int socket_id);
 
 static struct dpif_netdev *
 dpif_netdev_cast(const struct dpif *dpif)
@@ -473,7 +483,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     OVS_REQUIRES(dp_netdev_mutex)
 {
     struct dp_netdev *dp;
-    int error;
+    int n_sockets, error, i;
 
     dp = xzalloc(sizeof *dp);
     shash_add(&dp_netdevs, name, dp);
@@ -494,7 +504,15 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     ovs_mutex_init(&dp->port_mutex);
     cmap_init(&dp->ports);
     dp->port_seq = seq_create();
-    latch_init(&dp->exit_latch);
+
+    n_sockets = ovs_numa_get_n_sockets();
+    dp->n_pmd_sockets = n_sockets != OVS_SOCKET_UNSPEC ? n_sockets : 0;
+    dp->pmd_sockets = xzalloc(dp->n_pmd_sockets * sizeof *dp->pmd_sockets);
+    for (i = 0; i < dp->n_pmd_sockets; i++) {
+        dp->pmd_sockets[i].dp = dp;
+        dp->pmd_sockets[i].socket_id = i;
+        latch_init(&dp->pmd_sockets[i].exit_latch);
+    }
 
     ovs_mutex_lock(&dp->port_mutex);
     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
@@ -563,8 +581,8 @@ dp_netdev_free(struct dp_netdev *dp)
 
     shash_find_and_delete(&dp_netdevs, dp->name);
 
-    dp_netdev_set_pmd_threads(dp, 0);
-    free(dp->pmd_threads);
+    dp_netdev_destroy_all_pmd_sockets(dp);
+    free(dp->pmd_sockets);
 
     dp_netdev_flow_flush(dp);
     ovs_mutex_lock(&dp->port_mutex);
@@ -590,7 +608,6 @@ dp_netdev_free(struct dp_netdev *dp)
     ovs_mutex_destroy(&dp->flow_mutex);
     seq_destroy(dp->port_seq);
     cmap_destroy(&dp->ports);
-    latch_destroy(&dp->exit_latch);
     free(CONST_CAST(char *, dp->name));
     free(dp);
 }
@@ -659,12 +676,12 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
 }
 
 static void
-dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
+dp_netdev_reload_pmd_threads(struct pmd_socket *s)
 {
     int i;
 
-    for (i = 0; i < dp->n_pmd_threads; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
+    for (i = 0; i < s->n_pmd_threads; i++) {
+        struct pmd_thread *f = &s->pmd_threads[i];
         int id;
 
         atomic_add(&f->change_seq, 1, &id);
@@ -735,10 +752,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     }
     port->sf = sf;
 
-    if (netdev_is_pmd(netdev)) {
-        dp->pmd_count++;
-        dp_netdev_set_pmd_threads(dp, NR_THREADS);
-        dp_netdev_reload_pmd_threads(dp);
+    if (netdev_is_pmd(netdev) && dp->n_pmd_sockets) {
+        dp_netdev_set_pmd_threads(dp, netdev_dpdk_get_socket_id(netdev));
     }
     ovs_refcount_init(&port->ref_cnt);
 
@@ -879,6 +894,23 @@ get_port_by_name(struct dp_netdev *dp,
     return ENOENT;
 }
 
+/* Returns true if 'dp' has at least one port whose netdev is a pmd netdev
+ * on cpu socket 'socket_id', as reported by netdev_dpdk_get_socket_id(). */
+static bool
+has_pmd_port_for_socket(struct dp_netdev *dp, int socket_id)
+{
+    struct dp_netdev_port *port;
+
+    CMAP_FOR_EACH (port, node, &dp->ports) {
+        if (netdev_is_pmd(port->netdev)
+            && netdev_dpdk_get_socket_id(port->netdev) == socket_id) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
 static void
 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     OVS_REQUIRES(dp->port_mutex)
@@ -886,7 +918,15 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
     seq_change(dp->port_seq);
-    if (netdev_is_pmd(port->netdev)) {
-        dp_netdev_reload_pmd_threads(dp);
+    if (netdev_is_pmd(port->netdev) && dp->n_pmd_sockets) {
+        int socket_id = netdev_dpdk_get_socket_id(port->netdev);
+
+        /* If no pmd netdev is left on this cpu socket, stops its pmd threads.
+         * Otherwise, just makes them reload their rx queues. */
+        if (!has_pmd_port_for_socket(dp, socket_id)) {
+            dp_netdev_unset_pmd_threads(dp, socket_id);
+        } else {
+            dp_netdev_reload_pmd_threads(&dp->pmd_sockets[socket_id]);
+        }
     }

     port_unref(port);
@@ -1828,7 +1868,7 @@ static int
 pmd_load_queues(struct pmd_thread *f,
                 struct rxq_poll **ppoll_list, int poll_cnt)
 {
-    struct dp_netdev *dp = f->dp;
+    struct pmd_socket *s = f->socket;
     struct rxq_poll *poll_list = *ppoll_list;
     struct dp_netdev_port *port;
     int id = f->id;
@@ -1843,12 +1883,13 @@ pmd_load_queues(struct pmd_thread *f,
     poll_cnt = 0;
     index = 0;
 
-    CMAP_FOR_EACH (port, node, &f->dp->ports) {
-        if (netdev_is_pmd(port->netdev)) {
+    CMAP_FOR_EACH (port, node, &s->dp->ports) {
+        if (netdev_is_pmd(port->netdev)
+            && netdev_dpdk_get_socket_id(port->netdev) == s->socket_id) {
             int i;
 
             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                if ((index % dp->n_pmd_threads) == id) {
+                if ((index % s->n_pmd_threads) == id) {
                     poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1));
 
                     port_ref(port);
@@ -1869,7 +1910,7 @@ static void *
 pmd_thread_main(void *f_)
 {
     struct pmd_thread *f = f_;
-    struct dp_netdev *dp = f->dp;
+    struct dp_netdev *dp = f->socket->dp;
     unsigned int lc = 0;
     struct rxq_poll *poll_list;
     unsigned int port_seq;
@@ -1879,7 +1920,7 @@ pmd_thread_main(void *f_)
     poll_cnt = 0;
     poll_list = NULL;
 
-    pmd_thread_setaffinity_cpu(f->id);
+    pmd_thread_setaffinity_cpu(f->core_id);
 reload:
     poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
     atomic_read(&f->change_seq, &port_seq);
@@ -1892,6 +1933,8 @@ reload:
             dp_netdev_process_rxq_port(dp,  poll_list[i].port, poll_list[i].rx);
         }
 
+        netdev_dpdk_flush_non_local(f->socket->socket_id, f->core_id);
+
         if (lc++ > 1024) {
             ovsrcu_quiesce();
 
@@ -1906,7 +1949,7 @@ reload:
         }
     }
 
-    if (!latch_is_set(&f->dp->exit_latch)){
+    if (!latch_is_set(&f->socket->exit_latch)){
         goto reload;
     }
 
@@ -1919,40 +1962,90 @@ reload:
 }

 static void
-dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
+dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *dp)
 {
     int i;

-    if (n == dp->n_pmd_threads) {
-        return;
+    /* Exits all existing pmd threads. */
+    for (i = 0; i < dp->n_pmd_sockets; i++) {
+        struct pmd_socket *s = &dp->pmd_sockets[i];
+
+        dp_netdev_unset_pmd_threads(dp, i);
+        latch_destroy(&s->exit_latch);
     }
+}
+
+/* Deletes all pmd threads on cpu socket 'socket_id'. */
+static void
+dp_netdev_unset_pmd_threads(struct dp_netdev *dp, int socket_id)
+{
+    struct pmd_socket *s = &dp->pmd_sockets[socket_id];
+    int i;

-    /* Stop existing threads. */
-    latch_set(&dp->exit_latch);
-    dp_netdev_reload_pmd_threads(dp);
-    for (i = 0; i < dp->n_pmd_threads; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
+    if (s->n_pmd_threads) {
+        latch_set(&s->exit_latch);
+        dp_netdev_reload_pmd_threads(s);
+        for (i = 0; i < s->n_pmd_threads; i++) {
+            struct pmd_thread *f = &s->pmd_threads[i];

-        xpthread_join(f->thread, NULL);
+            /* Only releases the core once its thread has exited. */
+            xpthread_join(f->thread, NULL);
+            ovs_numa_unpin_core(f->core_id);
+        }
+        latch_poll(&s->exit_latch);
+        free(s->pmd_threads);
+        s->pmd_threads = NULL;
+        s->n_pmd_threads = 0;
     }
-    latch_poll(&dp->exit_latch);
-    free(dp->pmd_threads);
+}
+
+/* Starts pmd threads for cpu socket 'socket_id' if the socket has none yet,
+ * then asks the socket's pmd threads to reload their rx queues. */
+static void
+dp_netdev_set_pmd_threads(struct dp_netdev *dp, int socket_id)
+{
+    struct pmd_socket *s = &dp->pmd_sockets[socket_id];
+
+    /* If pmd threads already exist for this cpu socket, just reload them
+     * below.  Otherwise, try to create one pmd thread per dpdk interface
+     * on the socket, each pinned to its own unpinned core. */
+    if (!s->n_pmd_threads) {
+        int n_threads = netdev_dpdk_get_n_devs_on_socket(socket_id);
+        int i;

-    /* Start new threads. */
-    dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
-    dp->n_pmd_threads = n;
+        /* Reserves the cores first, so that 'n_pmd_threads' is final
+         * before any thread starts reading it. */
+        s->pmd_threads = xmalloc(n_threads * sizeof *s->pmd_threads);
+        for (i = 0; i < n_threads; i++) {
+            int core_id = ovs_numa_get_unpinned_core_on_socket(socket_id);
+
+            if (core_id == OVS_CORE_UNSPEC) {
+                VLOG_WARN("cpu socket %d has only %d unpinned cores for %d "
+                          "pmd threads", socket_id, i, n_threads);
+                break;
+            }
+            s->pmd_threads[i].core_id = core_id;
+        }
+        s->n_pmd_threads = i;
+        if (!s->n_pmd_threads) {
+            free(s->pmd_threads);
+            s->pmd_threads = NULL;
+        }

-    for (i = 0; i < n; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
+        for (i = 0; i < s->n_pmd_threads; i++) {
+            struct pmd_thread *f = &s->pmd_threads[i];

-        f->dp = dp;
-        f->id = i;
-        atomic_store(&f->change_seq, 1);
+            f->id = i;
+            f->socket = s;
+            atomic_store(&f->change_seq, 1);

-        /* Each thread will distribute all devices rx-queues among
-         * themselves. */
-        f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
+            /* Each thread will distribute all devices rx-queues among
+             * themselves. */
+            f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
+        }
     }
+
+    dp_netdev_reload_pmd_threads(s);
 }


diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index b0254e7..5a0ca76 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -491,8 +491,8 @@ netdev_dpdk_construct(struct netdev *netdev_)
     netdev->mtu = ETHER_MTU;
     netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
 
-    /* TODO: need to discover device node at run time. */
-    netdev->socket_id = SOCKET0;
+    netdev->socket_id = rte_eth_dev_socket_id(port_no);
+    CPU_SOCKET_ID_ASSERT(netdev->socket_id);
     netdev->port_id = port_no;
 
     netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu);
@@ -1308,6 +1308,29 @@ netdev_dpdk_get_socket_id(const struct netdev *netdev_)
     }
 }
 
+/* Returns the number of dpdk interfaces on cpu socket 'socket_id'. */
+int
+netdev_dpdk_get_n_devs_on_socket(int socket_id)
+{
+    CPU_SOCKET_ID_ASSERT(socket_id);
+
+    return dpdk_get_n_devs(socket_id);
+}
+
+/* For each dpdk iface not on cpu socket 'socket_id', flushes its tx queue
+ * 'qid'.  NOTE(review): 'dpdk_list' is walked without a lock; confirm. */
+void
+netdev_dpdk_flush_non_local(int socket_id, int qid)
+{
+    struct netdev_dpdk *dev;
+
+    LIST_FOR_EACH (dev, list_node, &dpdk_list) {
+        if (dev->socket_id != socket_id) {
+            dpdk_queue_flush(dev, qid);
+        }
+    }
+}
+
 int
 pmd_thread_setaffinity_cpu(int cpu)
 {
diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
index da507ce..fc6c217 100644
--- a/lib/netdev-dpdk.h
+++ b/lib/netdev-dpdk.h
@@ -25,6 +25,8 @@ struct netdev;
 int dpdk_init(int argc, char **argv);
 void netdev_dpdk_register(void);
 int netdev_dpdk_get_socket_id(const struct netdev *);
+int netdev_dpdk_get_n_devs_on_socket(int socket_id);
+void netdev_dpdk_flush_non_local(int socket_id, int qid);
 void free_dpdk_buf(struct dpif_packet *);
 int pmd_thread_setaffinity_cpu(int cpu);
 
@@ -50,6 +52,18 @@ netdev_dpdk_get_socket_id(const struct netdev *netdev_ OVS_UNUSED)
     return -1;
 }
 
+static inline int
+netdev_dpdk_get_n_devs_on_socket(int socket_id OVS_UNUSED)
+{
+    return 0;                   /* Stub: reports no dpdk interfaces. */
+}
+
+static inline void
+netdev_dpdk_flush_non_local(int socket_id OVS_UNUSED, int qid OVS_UNUSED)
+{
+    /* Nothing to flush. */
+}
+
 static inline void
 free_dpdk_buf(struct dpif_packet *buf OVS_UNUSED)
 {
-- 
1.7.9.5




More information about the dev mailing list