[ovs-dev] [dpdk patch 2/2] dpif-netdev: Allow multi-rx-queue, multi-pmd-thread configuration.

Alex Wang alexw at nicira.com
Mon Sep 15 21:03:34 UTC 2014


This commits adds the multithreading functionality to OVS dpdk
module.  Users are able to create multiple pmd threads and set
their cpu affinity via specifying the cpu mask string similar
to the EAL '-c COREMASK' option.

Also, the number of rx queues for each dpdk interface is made
configurable to help distribution of rx packets among multiple
pmd threads.

Signed-off-by: Alex Wang <alexw at nicira.com>
---
 lib/dpif-linux.c           |    1 +
 lib/dpif-netdev.c          |  122 +++++++++++++++++++++++++++++++++++++++++---
 lib/dpif-provider.h        |    7 +++
 lib/dpif.c                 |   18 +++++++
 lib/dpif.h                 |    2 +
 ofproto/ofproto-dpif.c     |    2 +
 ofproto/ofproto-provider.h |    6 +++
 ofproto/ofproto.c          |   16 ++++++
 ofproto/ofproto.h          |    2 +
 vswitchd/bridge.c          |    3 ++
 vswitchd/vswitch.xml       |   27 ++++++++++
 11 files changed, 198 insertions(+), 8 deletions(-)

diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
index 2c387ed..ed2058c 100644
--- a/lib/dpif-linux.c
+++ b/lib/dpif-linux.c
@@ -1873,6 +1873,7 @@ const struct dpif_class dpif_linux_class = {
     dpif_linux_operate,
     dpif_linux_recv_set,
     dpif_linux_handlers_set,
+    NULL,                       /* poll_thread_set */
     dpif_linux_queue_to_priority,
     dpif_linux_recv,
     dpif_linux_recv_wait,
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 3f69219..bfab78a 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -205,6 +205,11 @@ struct dp_netdev {
     /* Each pmd thread will store its pointer to
      * 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */
     ovsthread_key_t per_pmd_key;
+
+    /* Number of rx queues for each dpdk interface and the cpu mask
+     * for pin of pmd threads. */
+    size_t n_dpdk_rxqs;
+    char *pmd_cmask;
 };
 
 static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
@@ -395,10 +400,12 @@ static void dp_netdev_disable_upcall(struct dp_netdev *);
 static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
                                     struct dp_netdev *dp, int index,
                                     int core_id, int numa_id);
+static void dp_netdev_set_nonpmd(struct dp_netdev *dp);
 static struct dp_netdev_pmd_thread *dp_netdev_get_nonpmd(struct dp_netdev *dp);
 static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
 static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
+static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
 
 static void emc_clear_entry(struct emc_entry *ce);
 
@@ -537,7 +544,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     OVS_REQUIRES(dp_netdev_mutex)
 {
     struct dp_netdev *dp;
-    struct dp_netdev_pmd_thread *non_pmd;
     int error;
 
     dp = xzalloc(sizeof *dp);
@@ -570,9 +576,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
 
     /* Reserves the core NON_PMD_CORE_ID for all non-pmd threads. */
     ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID);
-    non_pmd = xzalloc(sizeof *non_pmd);
-    dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID,
-                            OVS_NUMA_UNSPEC);
+    dp_netdev_set_nonpmd(dp);
 
     ovs_mutex_lock(&dp->port_mutex);
     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
@@ -776,8 +780,10 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
             return ENOENT;
         }
         /* There can only be ovs_numa_get_n_cores() pmd threads,
-         * so creates a tx_q for each. */
-        error = netdev_set_multiq(netdev, n_cores, NR_QUEUE);
+         * so creates a txq for each. */
+        error = netdev_set_multiq(netdev, n_cores,
+                                  dp->n_dpdk_rxqs ? dp->n_dpdk_rxqs
+                                                  : NR_QUEUE);
         if (error) {
             VLOG_ERR("%s, cannot set multiq", devname);
             return errno;
@@ -1842,6 +1848,77 @@ dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
     }
 }
 
+/* Returns true if the configuration for rx queues or cpu mask
+ * changed. */
+static bool
+pmd_config_changed(const struct dp_netdev *dp, size_t rxqs, const char *cmask)
+{
+    if (dp->n_dpdk_rxqs != rxqs) {
+        return true;
+    } else {
+        if (dp->pmd_cmask != NULL && cmask != NULL) {
+            return strcmp(dp->pmd_cmask, cmask);
+        } else {
+            return (dp->pmd_cmask != NULL || cmask != NULL);
+        }
+    }
+}
+
+/* Resets pmd threads if the configuration for 'rxq's or cpu mask changes. */
+static int
+dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char *cmask)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+
+    if (pmd_config_changed(dp, n_rxqs, cmask)) {
+        struct dp_netdev_port *port;
+
+        dp_netdev_destroy_all_pmds(dp);
+
+        CMAP_FOR_EACH (port, node, &dp->ports) {
+            if (netdev_is_pmd(port->netdev)) {
+                int i, err;
+
+                /* Closes the existing 'rxq's. */
+                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+                    netdev_rxq_close(port->rxq[i]);
+                    port->rxq[i] = NULL;
+                }
+
+                /* Sets the new rx queue config.  */
+                err = netdev_set_multiq(port->netdev, ovs_numa_get_n_cores(),
+                                        n_rxqs);
+                if (err) {
+                    VLOG_ERR("Failed to set dpdk interface %s rx_queue to:"
+                             " %u", netdev_get_name(port->netdev),
+                             n_rxqs);
+                    return err;
+                }
+
+                /* If the set_multiq() above succeeds, reopens the 'rxq's. */
+                port->rxq = xrealloc(port->rxq, sizeof *port->rxq
+                                     * netdev_n_rxq(port->netdev));
+                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+                    netdev_rxq_open(port->netdev, &port->rxq[i], i);
+                }
+            }
+        }
+        dp->n_dpdk_rxqs = n_rxqs;
+
+        /* Reconfigures the cpu mask. */
+        ovs_numa_set_cpu_mask(cmask);
+        free(dp->pmd_cmask);
+        dp->pmd_cmask = cmask ? xstrdup(cmask) : NULL;
+
+        /* Restores the non-pmd. */
+        dp_netdev_set_nonpmd(dp);
+        /* Restores all pmd threads. */
+        dp_netdev_reset_pmd_threads(dp);
+    }
+
+    return 0;
+}
+
 static int
 dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
                               uint32_t queue_id, uint32_t *priority)
@@ -2093,6 +2170,17 @@ dp_netdev_get_nonpmd(struct dp_netdev *dp)
     return pmd;
 }
 
+/* Sets the 'struct dp_netdev_pmd_thread' for non-pmd threads. */
+static void
+dp_netdev_set_nonpmd(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *non_pmd;
+
+    non_pmd = xzalloc(sizeof *non_pmd);
+    dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID,
+                            OVS_NUMA_UNSPEC);
+}
+
 /* Configures the 'pmd' based on the input argument. */
 static void
 dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
@@ -2185,8 +2273,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
             return;
         }
 
-        /* Tries creating NR_PMD_THREADS pmd threads on the numa node. */
-        can_have = MIN(n_unpinned, NR_PMD_THREADS);
+        /* If cpu mask is specified, uses all unpinned cores, otherwise
+         * tries creating NR_PMD_THREADS pmd threads. */
+        can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
         for (i = 0; i < can_have; i++) {
             struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
             int core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
@@ -2209,6 +2298,22 @@ dp_netdev_flow_stats_new_cb(void)
     return bucket;
 }
 
+/* Called after pmd threads config change.  Restarts pmd threads with
+ * new configuration. */
+static void
+dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
+{
+    struct dp_netdev_port *port;
+
+    CMAP_FOR_EACH (port, node, &dp->ports) {
+        if (netdev_is_pmd(port->netdev)) {
+            int numa_id = netdev_get_numa_id(port->netdev);
+
+            dp_netdev_set_pmds_on_numa(dp, numa_id);
+        }
+    }
+}
+
 static void
 dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
                     int cnt, int size,
@@ -2772,6 +2877,7 @@ const struct dpif_class dpif_netdev_class = {
     dpif_netdev_operate,
     NULL,                       /* recv_set */
     NULL,                       /* handlers_set */
+    dpif_netdev_pmd_set,
     dpif_netdev_queue_to_priority,
     NULL,                       /* recv */
     NULL,                       /* recv_wait */
diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
index 89b32dd..e1136e1 100644
--- a/lib/dpif-provider.h
+++ b/lib/dpif-provider.h
@@ -300,6 +300,13 @@ struct dpif_class {
      * */
     int (*handlers_set)(struct dpif *dpif, uint32_t n_handlers);
 
+    /* If 'dpif' creates its own I/O polling threads, refreshes poll threads
+     * configuration.  'n_rxqs' configures the number of rx_queues, which
+     * are distributed among threads.  'cmask' configures the cpu mask
+     * for setting the polling threads' cpu affinity. */
+    int (*poll_threads_set)(struct dpif *dpif, unsigned int n_rxqs,
+                            const char *cmask);
+
     /* Translates OpenFlow queue ID 'queue_id' (in host byte order) into a
      * priority value used for setting packet priority. */
     int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id,
diff --git a/lib/dpif.c b/lib/dpif.c
index bf2c5f9..91ccfd8 100644
--- a/lib/dpif.c
+++ b/lib/dpif.c
@@ -1300,6 +1300,24 @@ dpif_print_packet(struct dpif *dpif, struct dpif_upcall *upcall)
     }
 }
 
+/* If 'dpif' creates its own I/O polling threads, refreshes poll threads
+ * configuration. */
+int
+dpif_poll_threads_set(struct dpif *dpif, unsigned int n_rxqs,
+                      const char *cmask)
+{
+    int error = 0;
+
+    if (dpif->dpif_class->poll_threads_set) {
+        error = dpif->dpif_class->poll_threads_set(dpif, n_rxqs, cmask);
+        if (error) {
+            log_operation(dpif, "poll_threads_set", error);
+        }
+    }
+
+    return error;
+}
+
 /* Polls for an upcall from 'dpif' for an upcall handler.  Since there
  * there can be multiple poll loops, 'handler_id' is needed as index to
  * identify the corresponding poll loop.  If successful, stores the upcall
diff --git a/lib/dpif.h b/lib/dpif.h
index be1bc4f..c57c8b0 100644
--- a/lib/dpif.h
+++ b/lib/dpif.h
@@ -769,6 +769,8 @@ void dpif_register_upcall_cb(struct dpif *, upcall_callback *, void *aux);
 
 int dpif_recv_set(struct dpif *, bool enable);
 int dpif_handlers_set(struct dpif *, uint32_t n_handlers);
+int dpif_poll_threads_set(struct dpif *, unsigned int n_rxqs,
+                          const char *cmask);
 int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
               struct ofpbuf *);
 void dpif_recv_purge(struct dpif *);
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index 6a59098..6cc9789 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -532,6 +532,8 @@ type_run(const char *type)
         udpif_set_threads(backer->udpif, n_handlers, n_revalidators);
     }
 
+    dpif_poll_threads_set(backer->dpif, n_dpdk_rxqs, pmd_cpu_mask);
+
     if (backer->need_revalidate) {
         struct ofproto_dpif *ofproto;
         struct simap_node *node;
diff --git a/ofproto/ofproto-provider.h b/ofproto/ofproto-provider.h
index de354ec..158f86e 100644
--- a/ofproto/ofproto-provider.h
+++ b/ofproto/ofproto-provider.h
@@ -451,6 +451,12 @@ extern unsigned ofproto_max_idle;
  * ofproto-dpif implementation. */
 extern size_t n_handlers, n_revalidators;
 
+/* Number of rx queues to be created for each dpdk interface. */
+extern size_t n_dpdk_rxqs;
+
+/* Cpu mask for pmd threads. */
+extern char *pmd_cpu_mask;
+
 static inline struct rule *rule_from_cls_rule(const struct cls_rule *);
 
 void ofproto_rule_expire(struct rule *rule, uint8_t reason)
diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
index 7b1d478..818e23f 100644
--- a/ofproto/ofproto.c
+++ b/ofproto/ofproto.c
@@ -304,6 +304,8 @@ unsigned ofproto_flow_limit = OFPROTO_FLOW_LIMIT_DEFAULT;
 unsigned ofproto_max_idle = OFPROTO_MAX_IDLE_DEFAULT;
 
 size_t n_handlers, n_revalidators;
+size_t n_dpdk_rxqs;
+char *pmd_cpu_mask;
 
 /* Map from datapath name to struct ofproto, for use by unixctl commands. */
 static struct hmap all_ofprotos = HMAP_INITIALIZER(&all_ofprotos);
@@ -731,6 +733,20 @@ ofproto_port_set_mcast_snooping(struct ofproto *ofproto, void *aux, bool flood)
 }
 
 void
+ofproto_set_n_dpdk_rxqs(int n_rxqs)
+{
+    n_dpdk_rxqs = MAX(n_rxqs, 0);
+}
+
+void
+ofproto_set_cpu_mask(const char *cmask)
+{
+    free(pmd_cpu_mask);
+
+    pmd_cpu_mask = cmask ? xstrdup(cmask) : NULL;
+}
+
+void
 ofproto_set_threads(int n_handlers_, int n_revalidators_)
 {
     int threads = MAX(count_cpu_cores(), 2);
diff --git a/ofproto/ofproto.h b/ofproto/ofproto.h
index d60b198..40bb3b7 100644
--- a/ofproto/ofproto.h
+++ b/ofproto/ofproto.h
@@ -299,6 +299,8 @@ int ofproto_set_mcast_snooping(struct ofproto *ofproto,
 int ofproto_port_set_mcast_snooping(struct ofproto *ofproto, void *aux,
                                     bool flood);
 void ofproto_set_threads(int n_handlers, int n_revalidators);
+void ofproto_set_n_dpdk_rxqs(int n_rxqs);
+void ofproto_set_cpu_mask(const char *cmask);
 void ofproto_set_dp_desc(struct ofproto *, const char *dp_desc);
 int ofproto_set_snoops(struct ofproto *, const struct sset *snoops);
 int ofproto_set_netflow(struct ofproto *,
diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
index 8f99d7d..045dd77 100644
--- a/vswitchd/bridge.c
+++ b/vswitchd/bridge.c
@@ -537,6 +537,9 @@ bridge_reconfigure(const struct ovsrec_open_vswitch *ovs_cfg)
                                         OFPROTO_FLOW_LIMIT_DEFAULT));
     ofproto_set_max_idle(smap_get_int(&ovs_cfg->other_config, "max-idle",
                                       OFPROTO_MAX_IDLE_DEFAULT));
+    ofproto_set_n_dpdk_rxqs(smap_get_int(&ovs_cfg->other_config,
+                                         "n-dpdk-rxqs", 0));
+    ofproto_set_cpu_mask(smap_get(&ovs_cfg->other_config, "pmd-cpu-mask"));
 
     ofproto_set_threads(
         smap_get_int(&ovs_cfg->other_config, "n-handler-threads", 0),
diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
index d07d54f..b00f74d 100644
--- a/vswitchd/vswitch.xml
+++ b/vswitchd/vswitch.xml
@@ -152,6 +152,33 @@
         </p>
       </column>
 
+      <column name="other_config" key="n-dpdk-rxqs"
+              type='{"type": "integer", "minInteger": 1}'>
+        <p>
+          Specifies the number of rx queues to be created for each dpdk
+          interface.  If not specified or specified to 0, one rx queue will
+          be created for each dpdk interface by default.
+        </p>
+      </column>
+
+      <column name="other_config" key="pmd-cpu-mask">
+        <p>
+          Specifies CPU mask for setting the cpu affinity of PMD (Poll
+          Mode Driver) threads.  Value should be in the form of hex string,
+          similar to the dpdk EAL '-c COREMASK' option input or the 'taskset'
+          mask input.
+        </p>
+        <p>
+          The lowest order bit corresponds to the first CPU core.  A set bit
+          means the corresponding core is available.  If the input does not
+          cover all cores, those uncovered cores are considered not set.
+        </p>
+        <p>
+          If not specified, one pmd thread will be created for each numa node
+          and pinned to any available core on the numa node by default.
+        </p>
+      </column>
+
       <column name="other_config" key="n-handler-threads"
               type='{"type": "integer", "minInteger": 1}'>
         <p>
-- 
1.7.9.5




More information about the dev mailing list