[ovs-dev] [dpdk patch 8/8] dpdk: Allow configuration of pmd multithreading and dpdk interface rx queues.

Alex Wang alexw at nicira.com
Tue Aug 12 04:56:43 UTC 2014


This commit adds multithreading functionality to the OVS dpdk
module.  Users can create multiple pmd threads and set their cpu
affinity by specifying a cpu mask string, similar to the EAL
'-c COREMASK' option.

Also, the number of rx queues for each dpdk interface is made
configurable, to help distribute rx packets among the multiple
pmd threads.
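
For example, with this series applied, one can ask for pmd threads
pinned to cores 1 and 2 (mask 6 = binary 0110) and two rx queues per
dpdk interface via:

    ovs-vsctl set Open_vSwitch . other_config:pmd-cpu-mask=6
    ovs-vsctl set Open_vSwitch . other_config:n-dpdk-rxqs=2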

Signed-off-by: Alex Wang <alexw at nicira.com>
---
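Note for reviewers (below the '---', so dropped by git-am): the modulo
test in pmd_load_queues() spreads rx queues across pmd threads
round-robin.  A minimal standalone sketch of that assignment, with the
thread and queue counts picked purely for illustration:

    #include <stdio.h>

    int
    main(void)
    {
        /* pmd_load_queues() hands rx queue 'index' to the pmd thread
         * whose f->index equals index % n_pmd_threads. */
        int n_pmd_threads = 2;      /* illustrative values */
        int n_rxqs = 5;
        int index;

        for (index = 0; index < n_rxqs; index++) {
            printf("rxq %d -> pmd thread %d\n", index,
                   index % n_pmd_threads);
        }
        /* rxq 0/2/4 -> thread 0; rxq 1/3 -> thread 1. */
        return 0;
    }
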
 lib/dpif-linux.c           |    1 +
 lib/dpif-netdev.c          |  130 ++++++++++++++++++++++++++++++++++++++------
 lib/dpif-provider.h        |    7 +++
 lib/dpif.c                 |   17 ++++++
 lib/dpif.h                 |    1 +
 lib/ovs-numa.c             |    2 +-
 ofproto/ofproto-dpif.c     |    2 +
 ofproto/ofproto-provider.h |    6 ++
 ofproto/ofproto.c          |   16 ++++++
 ofproto/ofproto.h          |    2 +
 vswitchd/bridge.c          |    3 +
 vswitchd/vswitch.xml       |   29 ++++++++++
 12 files changed, 198 insertions(+), 18 deletions(-)
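
Also for reviewers, a standalone sketch of the pmd-cpu-mask semantics
documented in the vswitch.xml hunk below, assuming EAL-style
'-c COREMASK' ordering (bit i of the hex string selects core id i, so
the rightmost hex digit covers cores 0-3); it only prints which cores a
mask covers and skips validation of the input string:

    #include <ctype.h>
    #include <stdio.h>
    #include <string.h>

    static void
    print_cmask_cores(const char *cmask)
    {
        size_t len = strlen(cmask);
        size_t i;

        for (i = 0; i < len; i++) {
            /* Walk hex digits from the right: digit i covers cores
             * 4 * i .. 4 * i + 3. */
            char hex = cmask[len - 1 - i];
            int bin = isdigit((unsigned char) hex)
                      ? hex - '0'
                      : toupper((unsigned char) hex) - 'A' + 10;
            int j;

            for (j = 0; j < 4; j++) {
                if (bin & (1 << j)) {
                    printf("pin a pmd thread to core %zu\n", i * 4 + j);
                }
            }
        }
    }

    int
    main(void)
    {
        print_cmask_cores("6");     /* cores 1 and 2 */
        print_cmask_cores("F0");    /* cores 4, 5, 6 and 7 */
        return 0;
    }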

diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
index aa01cef..c2d48a2 100644
--- a/lib/dpif-linux.c
+++ b/lib/dpif-linux.c
@@ -1941,6 +1941,7 @@ const struct dpif_class dpif_linux_class = {
     dpif_linux_operate,
     dpif_linux_recv_set,
     dpif_linux_handlers_set,
+    NULL,                       /* poll_threads_set */
     dpif_linux_queue_to_priority,
     dpif_linux_recv,
     dpif_linux_recv_wait,
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 14784bf..d136df3 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -147,6 +147,9 @@ struct dp_netdev {
     /* Per-cpu-socket struct for configuring pmd threads. */
     struct pmd_socket *pmd_sockets;
     int n_pmd_sockets;
+
+    size_t n_dpdk_rxqs;
+    char *pmd_cmask;
 };
 
 static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
@@ -346,9 +349,11 @@ static void dp_netdev_port_input(struct dp_netdev *dp,
                                  odp_port_t port_no);
 
 static void dp_netdev_disable_upcall(struct dp_netdev *);
-static void dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *);
+static void dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *,
+                                              bool del_latch);
 static void dp_netdev_unset_pmd_threads(struct dp_netdev *, int socket_id);
 static void dp_netdev_set_pmd_threads(struct dp_netdev *, int socket_id);
+static void dp_netdev_reset_pmd_threads(struct dp_netdev *);
 
 static struct dpif_netdev *
 dpif_netdev_cast(const struct dpif *dpif)
@@ -556,7 +561,7 @@ dp_netdev_free(struct dp_netdev *dp)
 
     shash_find_and_delete(&dp_netdevs, dp->name);
 
-    dp_netdev_destroy_all_pmd_sockets(dp);
+    dp_netdev_destroy_all_pmd_sockets(dp, true);
     free(dp->pmd_sockets);
 
     dp_netdev_flow_flush(dp);
@@ -1572,6 +1577,79 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     return 0;
 }
 
+static bool
+pmd_config_changed(const struct dp_netdev *dp, size_t rxqs, const char *cmask)
+{
+    if (dp->n_dpdk_rxqs != rxqs) {
+        return true;
+    } else {
+        if (dp->pmd_cmask != NULL && cmask != NULL) {
+            return strcmp(dp->pmd_cmask, cmask);
+        } else {
+            return (dp->pmd_cmask != NULL || cmask != NULL);
+        }
+    }
+}
+
+static int
+dpif_netdev_pmd_set(struct dpif *dpif, size_t n_rxqs, const char *cmask)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    int err = 0;
+
+    /* If there is any configuration change, destroys all pmd threads,
+     * adopts the new config, and restarts the pmd threads. */
+    if (pmd_config_changed(dp, n_rxqs, cmask)) {
+        struct dp_netdev_port *port;
+
+        dp_netdev_destroy_all_pmd_sockets(dp, false);
+
+        /* Closes all dpdk port rx queues. */
+        CMAP_FOR_EACH (port, node, &dp->ports) {
+            if (netdev_is_pmd(port->netdev)) {
+                int i;
+
+                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+                    netdev_rxq_close(port->rxq[i]);
+                }
+            }
+        }
+
+        err = netdev_dpdk_set_rx_queues(n_rxqs);
+        if (err) {
+            VLOG_ERR("Failed to set dpdk rx_queue to: %"PRIu64","
+                     "reset the default configuration", n_rxqs);
+            /* On error, resets the rx_queues to default. */
+            dp->n_dpdk_rxqs = 0;
+            netdev_dpdk_set_rx_queues(dp->n_dpdk_rxqs);
+        } else {
+            dp->n_dpdk_rxqs = n_rxqs;
+        }
+
+        /* Reopens all dpdk port rx queues. */
+        CMAP_FOR_EACH (port, node, &dp->ports) {
+            if (netdev_is_pmd(port->netdev)) {
+                int i;
+
+                port->rxq = xrealloc(port->rxq, sizeof *port->rxq
+                                         * netdev_n_rxq(port->netdev));
+                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+                    netdev_rxq_open(port->netdev, &port->rxq[i], i);
+                }
+            }
+        }
+
+        /* Reconfigures the cpu mask. */
+        ovs_numa_set_cpu_mask(cmask);
+        free(dp->pmd_cmask);
+        dp->pmd_cmask = cmask ? xstrdup(cmask) : NULL;
+
+        dp_netdev_reset_pmd_threads(dp);
+    }
+
+    return err;
+}
+
 static int
 dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
                               uint32_t queue_id, uint32_t *priority)
@@ -1697,8 +1775,6 @@ pmd_load_queues(struct pmd_thread *f,
     CMAP_FOR_EACH (port, node, &s->dp->ports) {
         if (netdev_is_pmd(port->netdev)
             && netdev_dpdk_get_socket_id(port->netdev) == s->socket_id) {
-            int i;
-
             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
                 if ((index % s->n_pmd_threads) == f->index) {
                     poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1));
@@ -1838,7 +1914,7 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
 }
 
 static void
-dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *dp)
+dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *dp, bool del_latch)
 {
     int i;
 
@@ -1847,7 +1923,9 @@ dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *dp)
         struct pmd_socket *s = &dp->pmd_sockets[i];
 
         dp_netdev_unset_pmd_threads(dp, i);
-        latch_destroy(&s->exit_latch);
+        if (del_latch) {
+            latch_destroy(&s->exit_latch);
+        }
     }
 }
 
@@ -1884,26 +1962,27 @@ dp_netdev_set_pmd_threads(struct dp_netdev *dp, int socket_id)
     ovs_assert(ovs_numa_get_n_sockets() != OVS_SOCKET_UNSPEC
                && ovs_numa_get_n_cores() != OVS_CORE_UNSPEC);
 
-    /* If there are already pmd threads create for the cpu socket
+    /* If there are already pmd threads created for the cpu socket
      * in which 'netdev' is on, do nothing.  Else, creates the
      * pmd threads for the socket. */
     if (!s->n_pmd_threads) {
-        int n_devs = netdev_dpdk_get_n_devs_on_socket(socket_id);
-        int n_unpinned_cores =
-            ovs_numa_get_n_unpinned_cores_on_socket(socket_id);
-        int n_threads, i;
+        int wanted, n_unpinned, i;
 
-        if (!n_unpinned_cores) {
+        n_unpinned = ovs_numa_get_n_unpinned_cores_on_socket(socket_id);
+        if (!n_unpinned) {
             VLOG_ERR("Cannot create pmd threads due to out of unpinned "
                      "cores on socket");
             return;
         }
 
-        /* Starts new pmd threads on the cpu socket. */
-        n_threads = MIN(n_devs, n_unpinned_cores);
-        s->pmd_threads = xmalloc(n_threads * sizeof *s->pmd_threads);
-        s->n_pmd_threads = n_threads;
+        /* If cpu mask is specified, uses all unpinned cores.  Otherwise,
+         * tries creating one pmd thread per dpdk iface on the socket. */
+        wanted = dp->pmd_cmask ? n_unpinned
+            : MIN(n_unpinned, netdev_dpdk_get_n_devs_on_socket(socket_id));
 
+        /* Starts new pmd threads on the cpu socket. */
+        s->pmd_threads = xmalloc(wanted * sizeof *s->pmd_threads);
+        s->n_pmd_threads = wanted;
         for (i = 0; i < s->n_pmd_threads; i++) {
             struct pmd_thread *f = &s->pmd_threads[i];
 
@@ -1916,11 +1995,27 @@ dp_netdev_set_pmd_threads(struct dp_netdev *dp, int socket_id)
              * themselves. */
             f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
         }
+        VLOG_INFO("Created %d pmd threads on socket %d", wanted, socket_id);
     }
-
     dp_netdev_reload_pmd_threads(dp);
 }
 
+/* Called after a pmd thread configuration change.  Restarts the pmd
+ * threads with the new configuration. */
+static void
+dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
+{
+    struct dp_netdev_port *port;
+
+    CMAP_FOR_EACH (port, node, &dp->ports) {
+        if (netdev_is_pmd(port->netdev)) {
+            int socket_id = netdev_dpdk_get_socket_id(port->netdev);
+
+            dp_netdev_set_pmd_threads(dp, socket_id);
+        }
+    }
+}
+
 
 static void *
 dp_netdev_flow_stats_new_cb(void)
@@ -2366,6 +2461,7 @@ const struct dpif_class dpif_netdev_class = {
     NULL,                       /* operate */
     NULL,                       /* recv_set */
     NULL,                       /* handlers_set */
+    dpif_netdev_pmd_set,
     dpif_netdev_queue_to_priority,
     NULL,                       /* recv */
     NULL,                       /* recv_wait */
diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
index bf24a9d..068df43 100644
--- a/lib/dpif-provider.h
+++ b/lib/dpif-provider.h
@@ -372,6 +372,13 @@ struct dpif_class {
      * */
     int (*handlers_set)(struct dpif *dpif, uint32_t n_handlers);
 
+    /* If 'dpif' creates its own I/O polling threads, refreshes the
+     * poll-thread configuration.  'n_rxqs' configures the number of rx
+     * queues, which are distributed among the threads.  'cmask' configures
+     * the cpu mask used to set the polling threads' cpu affinity. */
+    int (*poll_threads_set)(struct dpif *dpif, size_t n_rxqs,
+                            const char *cmask);
+
     /* Translates OpenFlow queue ID 'queue_id' (in host byte order) into a
      * priority value used for setting packet priority. */
     int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id,
diff --git a/lib/dpif.c b/lib/dpif.c
index 43141df..3d1eb16 100644
--- a/lib/dpif.c
+++ b/lib/dpif.c
@@ -1394,6 +1394,23 @@ dpif_print_packet(struct dpif *dpif, struct dpif_upcall *upcall)
     }
 }
 
+/* If 'dpif' creates its own I/O polling threads, refreshes the poll-thread
+ * configuration. */
+int
+dpif_poll_threads_set(struct dpif *dpif, size_t n_rxqs, const char *cmask)
+{
+    int error = 0;
+
+    if (dpif->dpif_class->poll_threads_set) {
+        error = dpif->dpif_class->poll_threads_set(dpif, n_rxqs, cmask);
+        if (error) {
+            log_operation(dpif, "poll_threads_set", error);
+        }
+    }
+
+    return error;
+}
+
 /* Polls for an upcall from 'dpif' for an upcall handler.  Since there
  * there can be multiple poll loops, 'handler_id' is needed as index to
  * identify the corresponding poll loop.  If successful, stores the upcall
diff --git a/lib/dpif.h b/lib/dpif.h
index f4a2a9e..521f3d9 100644
--- a/lib/dpif.h
+++ b/lib/dpif.h
@@ -675,6 +675,7 @@ typedef void exec_upcall_cb(struct dpif *, struct dpif_upcall *,
 
 int dpif_recv_set(struct dpif *, bool enable);
 int dpif_handlers_set(struct dpif *, uint32_t n_handlers);
+int dpif_poll_threads_set(struct dpif *, size_t n_rxqs, const char *cmask);
 int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
               struct ofpbuf *);
 void dpif_recv_purge(struct dpif *);
diff --git a/lib/ovs-numa.c b/lib/ovs-numa.c
index ec8c81a..fd4e605 100644
--- a/lib/ovs-numa.c
+++ b/lib/ovs-numa.c
@@ -311,7 +311,7 @@ ovs_numa_set_cpu_mask(const char *cmask)
         return;
     }
 
-    for (i = 0; i < strlen(cmask); i--) {
+    for (i = 0; i < strlen(cmask); i++) {
         char hex = cmask[i];
         int bin, j;
 
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index e17377f..d5a7b39 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -514,6 +514,8 @@ type_run(const char *type)
         udpif_set_threads(backer->udpif, n_handlers, n_revalidators);
     }
 
+    dpif_poll_threads_set(backer->dpif, n_dpdk_rxqs, pmd_cpu_mask);
+
     if (backer->need_revalidate) {
         struct ofproto_dpif *ofproto;
         struct simap_node *node;
diff --git a/ofproto/ofproto-provider.h b/ofproto/ofproto-provider.h
index d490679..e9fabc0 100644
--- a/ofproto/ofproto-provider.h
+++ b/ofproto/ofproto-provider.h
@@ -451,6 +451,12 @@ extern unsigned ofproto_max_idle;
  * ofproto-dpif implementation. */
 extern size_t n_handlers, n_revalidators;
 
+/* Number of rx queues to be created for each dpdk interface. */
+extern size_t n_dpdk_rxqs;
+
+/* Cpu mask for dpdk pmd threads. */
+extern char *pmd_cpu_mask;
+
 static inline struct rule *rule_from_cls_rule(const struct cls_rule *);
 
 void ofproto_rule_expire(struct rule *rule, uint8_t reason)
diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
index 4db6fec..99b19fa 100644
--- a/ofproto/ofproto.c
+++ b/ofproto/ofproto.c
@@ -304,6 +304,8 @@ unsigned ofproto_flow_limit = OFPROTO_FLOW_LIMIT_DEFAULT;
 unsigned ofproto_max_idle = OFPROTO_MAX_IDLE_DEFAULT;
 
 size_t n_handlers, n_revalidators;
+size_t n_dpdk_rxqs;
+char *pmd_cpu_mask;
 
 /* Map from datapath name to struct ofproto, for use by unixctl commands. */
 static struct hmap all_ofprotos = HMAP_INITIALIZER(&all_ofprotos);
@@ -731,6 +733,20 @@ ofproto_port_set_mcast_snooping(struct ofproto *ofproto, void *aux, bool flood)
 }
 
 void
+ofproto_set_n_dpdk_rxqs(int n_rxqs)
+{
+    n_dpdk_rxqs = MAX(n_rxqs, 0);
+}
+
+void
+ofproto_set_cpu_mask(const char *cmask)
+{
+    free(pmd_cpu_mask);
+
+    pmd_cpu_mask = cmask ? xstrdup(cmask) : NULL;
+}
+
+void
 ofproto_set_threads(int n_handlers_, int n_revalidators_)
 {
     int threads = MAX(count_cpu_cores(), 2);
diff --git a/ofproto/ofproto.h b/ofproto/ofproto.h
index 97617a3..8b0fdf8 100644
--- a/ofproto/ofproto.h
+++ b/ofproto/ofproto.h
@@ -254,6 +254,8 @@ int ofproto_set_mcast_snooping(struct ofproto *ofproto,
 int ofproto_port_set_mcast_snooping(struct ofproto *ofproto, void *aux,
                                     bool flood);
 void ofproto_set_threads(int n_handlers, int n_revalidators);
+void ofproto_set_n_dpdk_rxqs(int n_rxqs);
+void ofproto_set_cpu_mask(const char *cmask);
 void ofproto_set_dp_desc(struct ofproto *, const char *dp_desc);
 int ofproto_set_snoops(struct ofproto *, const struct sset *snoops);
 int ofproto_set_netflow(struct ofproto *,
diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
index 42fc0ec..2374539 100644
--- a/vswitchd/bridge.c
+++ b/vswitchd/bridge.c
@@ -530,6 +530,9 @@ bridge_reconfigure(const struct ovsrec_open_vswitch *ovs_cfg)
                                         OFPROTO_FLOW_LIMIT_DEFAULT));
     ofproto_set_max_idle(smap_get_int(&ovs_cfg->other_config, "max-idle",
                                       OFPROTO_MAX_IDLE_DEFAULT));
+    ofproto_set_n_dpdk_rxqs(smap_get_int(&ovs_cfg->other_config,
+                                         "n-dpdk-rxqs", 0));
+    ofproto_set_cpu_mask(smap_get(&ovs_cfg->other_config, "pmd-cpu-mask"));
 
     ofproto_set_threads(
         smap_get_int(&ovs_cfg->other_config, "n-handler-threads", 0),
diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
index d47fc1a..b21cd63 100644
--- a/vswitchd/vswitch.xml
+++ b/vswitchd/vswitch.xml
@@ -152,6 +152,35 @@
         </p>
       </column>
 
+      <column name="other_config" key="n-dpdk-rxqs"
+              type='{"type": "integer", "minInteger": 1}'>
+        <p>
+          Specifies the number of rx queues to be created for each dpdk
+          interface.  If not specified, or set to 0, as many rx queues as
+          there are dpdk interfaces on the same cpu socket are created.
+        </p>
+      </column>
+
+      <column name="other_config" key="pmd-cpu-mask">
+        <p>
+          Specifies the CPU mask used to set the cpu affinity of PMD (Poll
+          Mode Driver) threads.  The value should be an all-uppercase
+          hexadecimal string.
+        </p>
+        <p>
+          A set bit in the bitwise representation of the hexadecimal string
+          requests that a pmd thread be created and pinned to the cpu core
+          whose core id equals the index of that bit.  If the input string
+          does not cover all cores, the uncovered cores are treated as
+          unset.
+        </p>
+        <p>
+          If not specified, the default is to create as many pmd threads as
+          there are dpdk interfaces on the same cpu socket, and to pin them
+          to any available cores on that socket.
+        </p>
+      </column>
+
       <column name="other_config" key="n-handler-threads"
               type='{"type": "integer", "minInteger": 1}'>
         <p>
-- 
1.7.9.5



