[ovs-dev] [PATCH RFC v2 6/6] dpif-netdev: Add dpif-netdev/pmd-rxq-set appctl command.

Ilya Maximets i.maximets at samsung.com
Tue May 24 13:34:09 UTC 2016


New appctl command to perform manual pinning of RX queues
to desired cores.

Signed-off-by: Ilya Maximets <i.maximets at samsung.com>
---
 INSTALL.DPDK.md            |  24 +++++-
 NEWS                       |   2 +
 lib/dpif-netdev.c          | 206 ++++++++++++++++++++++++++++++++++++---------
 vswitchd/ovs-vswitchd.8.in |   7 ++
 4 files changed, 198 insertions(+), 41 deletions(-)

diff --git a/INSTALL.DPDK.md b/INSTALL.DPDK.md
index bb14bb5..6e727c7 100644
--- a/INSTALL.DPDK.md
+++ b/INSTALL.DPDK.md
@@ -337,7 +337,29 @@ Performance Tuning:
 
 	`ovs-appctl dpif-netdev/pmd-rxq-show`
 
-	This can also be checked with:
+	To change the default rxq assignment to pmd threads, rxqs may be manually
+	pinned to desired cores using:
+
+	`ovs-appctl dpif-netdev/pmd-rxq-set [dp] <port> <rx_queue_id> <core_id>`
+
+	To apply the new configuration after `pmd-rxq-set`, a reconfiguration is required:
+
+	`ovs-appctl dpif-netdev/pmd-reconfigure`
+
+	After that PMD thread on core `core_id` will become `isolated`. This means
+	that this thread will poll only pinned RX queues.
+
+	WARNING: If there are no `non-isolated` PMD threads, `non-pinned` RX queues
+	will not be polled. Also, if the provided `core_id` is non-negative but not
+	available (e.g. this `core_id` is not in `pmd-cpu-mask`), the RX queue will
+	not be polled by any pmd thread.
+
+	Isolation of PMD threads and pinning settings can also be checked using
+	the `ovs-appctl dpif-netdev/pmd-rxq-show` command.
+
+	To unpin an RX queue, use the same command with `core-id` equal to `-1`.
+
+	Affinity mask of the pmd thread can be checked with:
 
 	```
 	top -H
diff --git a/NEWS b/NEWS
index 817cba1..8fedeb7 100644
--- a/NEWS
+++ b/NEWS
@@ -22,6 +22,8 @@ Post-v2.5.0
    - DPDK:
      * New option "n_rxq" for PMD interfaces.
        Old 'other_config:n-dpdk-rxqs' is no longer supported.
+     * New appctl command 'dpif-netdev/pmd-rxq-set' to perform manual
+       pinning of RX queues to desired cores.
      * New appctl command 'dpif-netdev/pmd-rxq-show' to check the port/rxq
        assignment.
      * New appctl command 'dpif-netdev/pmd-reconfigure' to force
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index b295d10..531042c 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -250,6 +250,13 @@ enum pmd_cycles_counter_type {
 
 #define XPS_CYCLES 1000000000ULL
 
+/* Contained by struct dp_netdev_port's 'rxqs' member.  */
+struct dp_netdev_rxq {
+    struct netdev_rxq *rxq;
+    unsigned core_id;           /* Core to which this queue is pinned. */
+    bool pinned;                /* 'True' if this rxq pinned to some core. */
+};
+
 /* A port in a netdev-based datapath. */
 struct dp_netdev_port {
     odp_port_t port_no;
@@ -257,7 +264,7 @@ struct dp_netdev_port {
     struct hmap_node node;      /* Node in dp_netdev's 'ports'. */
     struct netdev_saved_flags *sf;
     unsigned n_rxq;             /* Number of elements in 'rxq' */
-    struct netdev_rxq **rxq;
+    struct dp_netdev_rxq *rxqs;
     unsigned *txq_used;         /* Number of threads that uses each tx queue. */
     char *type;                 /* Port type as requested by user. */
 };
@@ -447,6 +454,7 @@ struct dp_netdev_pmd_thread {
     pthread_t thread;
     unsigned core_id;               /* CPU core id of this pmd thread. */
     int numa_id;                    /* numa node id of this pmd thread. */
+    bool isolated;
 
     struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
     /* List of rx queues to poll. */
@@ -722,21 +730,35 @@ pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
         struct rxq_poll *poll;
         const char *prev_name = NULL;
 
-        ds_put_format(reply, "pmd thread numa_id %d core_id %u:\n",
-                      pmd->numa_id, pmd->core_id);
+        ds_put_format(reply,
+                      "pmd thread numa_id %d core_id %u:\nisolated : %s\n",
+                      pmd->numa_id, pmd->core_id, (pmd->isolated)
+                                                  ? "true" : "false");
 
         ovs_mutex_lock(&pmd->port_mutex);
         LIST_FOR_EACH (poll, node, &pmd->poll_list) {
             const char *name = netdev_get_name(poll->port->netdev);
+            struct dp_netdev_rxq *rxq;
+            int rx_qid;
 
             if (!prev_name || strcmp(name, prev_name)) {
                 if (prev_name) {
                     ds_put_cstr(reply, "\n");
                 }
-                ds_put_format(reply, "\tport: %s\tqueue-id:",
+                ds_put_format(reply, "\tport: %s\n",
                               netdev_get_name(poll->port->netdev));
             }
-            ds_put_format(reply, " %d", netdev_rxq_get_queue_id(poll->rx));
+
+            rx_qid = netdev_rxq_get_queue_id(poll->rx);
+            rxq = &poll->port->rxqs[rx_qid];
+
+            ds_put_format(reply, "\t\tqueue-id: %d\tpinned = %s",
+                          rx_qid, (rxq->pinned) ? "true" : "false");
+            if (rxq->pinned) {
+                ds_put_format(reply, "\tcore_id: %u\n", rxq->core_id);
+            } else {
+                ds_put_cstr(reply, "\n");
+            }
             prev_name = name;
         }
         ovs_mutex_unlock(&pmd->port_mutex);
@@ -800,6 +822,74 @@ dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
 }
 
 static void
+dpif_netdev_pmd_rxq_set(struct unixctl_conn *conn, int argc,
+                        const char *argv[], void *aux OVS_UNUSED)
+{
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    struct dp_netdev_port *port;
+    struct dp_netdev *dp = NULL;
+    int k = 0;
+    int core_id, rx_qid;
+
+    if (argc < 4) {
+        unixctl_command_reply_error(conn, "Invalid argument");
+        return;
+    }
+
+    ovs_mutex_lock(&dp_netdev_mutex);
+
+    if (argc == 5) {
+        dp = shash_find_data(&dp_netdevs, argv[1]);
+        k = 2;
+    } else if (shash_count(&dp_netdevs) == 1) {
+        /* There's only one datapath */
+        dp = shash_first(&dp_netdevs)->data;
+        k = 1;
+    }
+
+    if (!dp) {
+        unixctl_command_reply_error(conn,
+                                    "please specify an existing datapath");
+        goto exit_dp;
+    }
+
+    ovs_mutex_lock(&dp->port_mutex);
+    if (get_port_by_name(dp, argv[k], &port)) {
+        unixctl_command_reply_error(conn, "Unknown port");
+        goto exit;
+    }
+
+    if (!netdev_is_pmd(port->netdev)) {
+        unixctl_command_reply_error(conn, "Not a PMD port.");
+        goto exit;
+    }
+
+    rx_qid = strtol(argv[k + 1], NULL, 10);
+    if (rx_qid < 0 || rx_qid >= port->n_rxq) {
+        unixctl_command_reply_error(conn, "Bad rx queue id.");
+        goto exit;
+    }
+
+    core_id = strtol(argv[k + 2], NULL, 10);
+    if (core_id < 0) {
+        port->rxqs[rx_qid].pinned = false;
+        unixctl_command_reply(conn, "Unpinned.");
+        goto exit;
+    }
+
+    port->rxqs[rx_qid].pinned = true;
+    port->rxqs[rx_qid].core_id = core_id;
+
+    unixctl_command_reply(conn, ds_cstr(&reply));
+    ds_destroy(&reply);
+
+exit:
+    ovs_mutex_unlock(&dp->port_mutex);
+exit_dp:
+    ovs_mutex_unlock(&dp_netdev_mutex);
+}
+
+static void
 dpif_netdev_pmd_reconfigure(struct unixctl_conn *conn, int argc,
                             const char *argv[], void *aux OVS_UNUSED)
 {
@@ -852,6 +942,9 @@ dpif_netdev_init(void)
     unixctl_command_register("dpif-netdev/pmd-rxq-show", "[dp]",
                              0, 1, dpif_netdev_pmd_info,
                              (void *)&poll_aux);
+    unixctl_command_register("dpif-netdev/pmd-rxq-set",
+                             "[dp] port queue-id core-id",
+                             3, 4, dpif_netdev_pmd_rxq_set, NULL);
     unixctl_command_register("dpif-netdev/pmd-reconfigure", "[dp]",
                              0, 1, dpif_netdev_pmd_reconfigure, NULL);
     return 0;
@@ -1208,17 +1301,18 @@ port_create(const char *devname, const char *open_type, const char *type,
     port->port_no = port_no;
     port->netdev = netdev;
     port->n_rxq = netdev_n_rxq(netdev);
-    port->rxq = xcalloc(port->n_rxq, sizeof *port->rxq);
+    port->rxqs = xcalloc(port->n_rxq, sizeof *port->rxqs);
     port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
     port->type = xstrdup(type);
 
     for (i = 0; i < port->n_rxq; i++) {
-        error = netdev_rxq_open(netdev, &port->rxq[i], i);
+        error = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
         if (error) {
             VLOG_ERR("%s: cannot receive packets on this network device (%s)",
                      devname, ovs_strerror(errno));
             goto out_rxq_close;
         }
+        port->rxqs[i].pinned = false;
         n_open_rxqs++;
     }
 
@@ -1234,11 +1328,11 @@ port_create(const char *devname, const char *open_type, const char *type,
 
 out_rxq_close:
     for (i = 0; i < n_open_rxqs; i++) {
-        netdev_rxq_close(port->rxq[i]);
+        netdev_rxq_close(port->rxqs[i].rxq);
     }
     free(port->type);
     free(port->txq_used);
-    free(port->rxq);
+    free(port->rxqs);
     free(port);
 
 out:
@@ -1375,11 +1469,11 @@ port_destroy(struct dp_netdev_port *port)
     netdev_restore_flags(port->sf);
 
     for (unsigned i = 0; i < port->n_rxq; i++) {
-        netdev_rxq_close(port->rxq[i]);
+        netdev_rxq_close(port->rxqs[i].rxq);
     }
 
     free(port->txq_used);
-    free(port->rxq);
+    free(port->rxqs);
     free(port->type);
     free(port);
 }
@@ -2626,7 +2720,7 @@ static int
 port_reconfigure(struct dp_netdev_port *port)
 {
     struct netdev *netdev = port->netdev;
-    int i, err;
+    int i, err, old_n_rxq;
 
     if (!netdev_is_reconf_required(netdev)) {
         return 0;
@@ -2634,9 +2728,10 @@ port_reconfigure(struct dp_netdev_port *port)
 
     /* Closes the existing 'rxq's. */
     for (i = 0; i < port->n_rxq; i++) {
-        netdev_rxq_close(port->rxq[i]);
-        port->rxq[i] = NULL;
+        netdev_rxq_close(port->rxqs[i].rxq);
+        port->rxqs[i].rxq = NULL;
     }
+    old_n_rxq = port->n_rxq;
     port->n_rxq = 0;
 
     /* Allows 'netdev' to apply the pending configuration changes. */
@@ -2647,16 +2742,21 @@ port_reconfigure(struct dp_netdev_port *port)
         return err;
     }
     /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
-    port->rxq = xrealloc(port->rxq, sizeof *port->rxq * netdev_n_rxq(netdev));
+    port->rxqs = xrealloc(port->rxqs,
+                          sizeof *port->rxqs * netdev_n_rxq(netdev));
     /* Realloc 'used' counters for tx queues. */
     free(port->txq_used);
     port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
 
     for (i = 0; i < netdev_n_rxq(netdev); i++) {
-        err = netdev_rxq_open(netdev, &port->rxq[i], i);
+        err = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
         if (err) {
             return err;
         }
+        /* Initialization for newly allocated memory. */
+        if (i >= old_n_rxq) {
+            port->rxqs[i].pinned = false;
+        }
         port->n_rxq++;
     }
 
@@ -2727,7 +2827,7 @@ dpif_netdev_run(struct dpif *dpif)
             int i;
 
             for (i = 0; i < port->n_rxq; i++) {
-                dp_netdev_process_rxq_port(non_pmd, port, port->rxq[i]);
+                dp_netdev_process_rxq_port(non_pmd, port, port->rxqs[i].rxq);
             }
         }
     }
@@ -2766,7 +2866,7 @@ dpif_netdev_wait(struct dpif *dpif)
             int i;
 
             for (i = 0; i < port->n_rxq; i++) {
-                netdev_rxq_wait(port->rxq[i]);
+                netdev_rxq_wait(port->rxqs[i].rxq);
             }
         }
     }
@@ -3241,9 +3341,9 @@ dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
 }
 
 
-/* Returns PMD thread from this numa node with fewer rx queues to poll.
- * Returns NULL if there is no PMD threads on this numa node.
- * Can be called safely only by main thread. */
+/* Returns the non-isolated PMD thread from this numa node with the fewest
+ * rx queues to poll. Returns NULL if there are no non-isolated PMD threads
+ * on this numa node. Can be called safely only by the main thread. */
 static struct dp_netdev_pmd_thread *
 dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
 {
@@ -3251,7 +3351,7 @@ dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
     struct dp_netdev_pmd_thread *pmd, *res = NULL;
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        if (pmd->numa_id == numa_id
+        if (!pmd->isolated && pmd->numa_id == numa_id
             && (min_cnt > pmd->poll_cnt || res == NULL)) {
             min_cnt = pmd->poll_cnt;
             res = pmd;
@@ -3293,14 +3393,16 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
     ovs_mutex_unlock(&pmd->port_mutex);
 }
 
-/* Distribute all rx queues of 'port' between PMD threads in 'dp'. The pmd
- * threads that need to be restarted are inserted in 'to_reload'. */
+/* Distribute all {pinned|non-pinned} rx queues of 'port' between PMD
+ * threads in 'dp'. The pmd threads that need to be restarted are inserted
+ * in 'to_reload'. PMD threads with pinned queues marked as isolated. */
 static void
 dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
                               struct dp_netdev_port *port,
-                              struct hmapx *to_reload)
+                              struct hmapx *to_reload, bool pinned)
 {
     int numa_id = netdev_get_numa_id(port->netdev);
+    struct dp_netdev_pmd_thread *pmd;
     int i;
 
     if (!netdev_is_pmd(port->netdev)) {
@@ -3308,32 +3410,49 @@ dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
     }
 
     for (i = 0; i < port->n_rxq; i++) {
-        struct dp_netdev_pmd_thread *pmd;
-
-        pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
-        if (!pmd) {
-            VLOG_WARN("There's no pmd thread on numa node %d", numa_id);
-            break;
+        if (pinned) {
+            if (!port->rxqs[i].pinned) {
+                continue;
+            }
+            pmd = dp_netdev_get_pmd(dp, port->rxqs[i].core_id);
+            if (!pmd) {
+                VLOG_WARN("There is no PMD thread on core %d. "
+                          "Queue %d on port \'%s\' will not be polled.",
+                          port->rxqs[i].core_id, i,
+                          netdev_get_name(port->netdev));
+                continue;
+            }
+            pmd->isolated = true;
+            dp_netdev_pmd_unref(pmd);
+        } else {
+            if (port->rxqs[i].pinned) {
+                continue;
+            }
+            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
+            if (!pmd) {
+                VLOG_WARN("There's no available pmd thread on numa node %d",
+                          numa_id);
+                break;
+            }
         }
-
         ovs_mutex_lock(&pmd->port_mutex);
-        dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
+        dp_netdev_add_rxq_to_pmd(pmd, port, port->rxqs[i].rxq);
         ovs_mutex_unlock(&pmd->port_mutex);
 
         hmapx_add(to_reload, pmd);
     }
 }
 
-/* Distributes all rx queues of 'port' between all PMD threads in 'dp' and
- * inserts 'port' in the PMD threads 'tx_ports'. The pmd threads that need to
- * be restarted are inserted in 'to_reload'. */
+/* Distributes all non-pinned rx queues of 'port' between all PMD threads
+ * in 'dp' and inserts 'port' in the PMD threads 'tx_ports'. The pmd threads
+ * that need to be restarted are inserted in 'to_reload'. */
 static void
 dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
                              struct hmapx *to_reload)
 {
     struct dp_netdev_pmd_thread *pmd;
 
-    dp_netdev_add_port_rx_to_pmds(dp, port, to_reload);
+    dp_netdev_add_port_rx_to_pmds(dp, port, to_reload, false);
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
         dp_netdev_add_port_tx_to_pmd(pmd, port);
@@ -3341,8 +3460,9 @@ dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
     }
 }
 
-/* Distributes all rx queues of 'port' between all PMD threads in 'dp', inserts
- * 'port' in the PMD threads 'tx_ports' and reloads them, if needed. */
+/* Distributes all non-pinned rx queues of 'port' between all PMD threads
+ * in 'dp', inserts 'port' in the PMD threads 'tx_ports' and reloads them,
+ * if needed. */
 static void
 dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
 {
@@ -3427,7 +3547,13 @@ dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
 
             dp_netdev_set_pmds_on_numa(dp, numa_id);
         }
-        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload);
+        /* Distribute pinned rx queues first to mark threads as isolated. */
+        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, true);
+    }
+
+    /* Distribute remaining non-pinned rx queues to non-isolated PMD threads. */
+    HMAP_FOR_EACH (port, node, &dp->ports) {
+        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, false);
     }
 
     HMAPX_FOR_EACH (node, &to_reload) {
diff --git a/vswitchd/ovs-vswitchd.8.in b/vswitchd/ovs-vswitchd.8.in
index b181918..9855f2a 100644
--- a/vswitchd/ovs-vswitchd.8.in
+++ b/vswitchd/ovs-vswitchd.8.in
@@ -259,6 +259,13 @@ measuring infrastructure.
 Resets to zero the per pmd thread performance numbers shown by the
 \fBdpif-netdev/pmd-stats-show\fR command.  It will NOT reset datapath or
 bridge statistics, only the values shown by the above command.
+.IP "\fBdpif-netdev/pmd-rxq-set\fR [\fIdp\fR] \fIport\fR \fIqueue-id\fR \fIcore-id\fR"
+This command can be used to perform manual pinning of RX queues to the desired
+core.  If \fIcore-id\fR is non-negative and not in \fIpmd-cpu-mask\fR then this
+RX queue will not be polled by any pmd thread, and the PMD thread will become
+``isolated''.  This means that this thread will poll only pinned RX queues.
+If there are no ``non-isolated'' PMD threads, non-pinned RX queues will not
+be polled.  To unpin an RX queue, use the same command with \fIcore-id\fR = -1.
 .IP "\fBdpif-netdev/pmd-rxq-show\fR [\fIdp\fR]"
 For each pmd thread of the datapath \fIdp\fR shows list of queue-ids with
 port names, which this thread polls.
-- 
2.5.0




More information about the dev mailing list