[ovs-dev] [PATCH v2 18/19] dpif-netdev: Centralized threads and queues handling code.

Daniele Di Proietto diproiettod at vmware.com
Sat Dec 3 02:14:17 UTC 2016


Currently we have three different code paths that deal with pmd threads
and queues, in response to different inputs:

1. When a port is added
2. When a port is deleted
3. When the cpumask changes or a port must be reconfigured.

1. and 2. are carefully written to minimize disruption to the running
datapath, while 3. brings down all the threads, reconfigures all the ports
and restarts everything.

This commit removes the three separate code paths by introducing the
reconfigure_datapath() function, which takes care of adapting the pmd
threads and queues to the current datapath configuration, no matter how
we got there.

This aims at simplifying maintenance and introduces a long overdue
improvement: port reconfiguration (which can happen quite frequently for
dpdkvhost ports) is now done without shutting down the whole datapath,
but just by temporarily removing the port that needs to be reconfigured
(while the rest of the datapath is running).
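
Purely as an illustration (not part of the patch), below is a small
standalone sketch of that flow.  The toy_* types and functions are invented
for this example; only the helper names mentioned in the comments
(reconfigure_pmd_threads, pmd_remove_stale_ports, reload_affected_pmds,
port_reconfigure, rxq_scheduling) come from the diff below.

#include <stdbool.h>
#include <stdio.h>

struct toy_port {
    const char *name;
    bool need_reconfigure;   /* Mirrors dp_netdev_port->need_reconfigure. */
};

static void
toy_reconfigure_datapath(struct toy_port *ports, int n_ports)
{
    /* Step 1: adjust the pmd threads to the cpumask and the pmd ports
     * (reconfigure_pmd_threads() in the patch). */
    printf("adjust pmd threads\n");

    /* Step 2: detach only the ports that need reconfiguration from the pmd
     * threads and wait for the threads to stop using them
     * (pmd_remove_stale_ports() + reload_affected_pmds()). */
    for (int i = 0; i < n_ports; i++) {
        if (ports[i].need_reconfigure) {
            printf("detach %s from pmd threads\n", ports[i].name);
        }
    }

    /* Step 3: reconfigure the detached ports (port_reconfigure()); the rest
     * of the datapath keeps forwarding meanwhile. */
    for (int i = 0; i < n_ports; i++) {
        if (ports[i].need_reconfigure) {
            printf("reconfigure %s\n", ports[i].name);
            ports[i].need_reconfigure = false;
        }
    }

    /* Steps 4-6: recompute the rxq scheduling, remove queues that moved,
     * then add the new assignments and tx ports, reloading affected pmd
     * threads in between (rxq_scheduling(), dp_netdev_add_rxq_to_pmd(),
     * reload_affected_pmds()). */
    printf("recompute rxq scheduling and reload affected pmd threads\n");
}

int
main(void)
{
    struct toy_port ports[] = {
        { "dpdkvhost0", true },   /* e.g. a vhost port whose queues changed. */
        { "dpdk0", false },       /* Untouched: keeps being polled. */
    };

    toy_reconfigure_datapath(ports, 2);
    return 0;
}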

We now also recompute the rxq scheduling from scratch every time a port
is added or deleted.  This means that the queues will be more balanced,
especially when dealing with explicit rxq-affinity from the user
(without shutting down the threads and restarting them), but it also
means that adding or deleting a port might cause existing queues to be
moved between pmd threads.  This negative effect can be avoided by
taking into account the existing distribution when computing the new
scheduling, but I considered code clarity and fast reconfiguration more
important than optimizing port addition or removal (a port is added and
removed only once, but can be reconfigured many times).
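
As another standalone illustration (again, not part of the patch), the sketch
below models the round-robin assignment that rxq_scheduling() and
rr_numa_get_pmd() in the diff perform for unpinned queues; the toy_* names
are made up for this example.

#include <stdio.h>

struct toy_numa {
    int pmd_core_ids[8];   /* Non-isolated pmd threads on this numa node. */
    int n_pmds;
    int cur_index;         /* Mirrors rr_numa->cur_index. */
};

/* Mirrors rr_numa_get_pmd(): hand out pmds in round-robin order. */
static int
toy_rr_get_pmd(struct toy_numa *numa)
{
    return numa->pmd_core_ids[numa->cur_index++ % numa->n_pmds];
}

int
main(void)
{
    struct toy_numa numa0 = { .pmd_core_ids = { 1, 2, 7 }, .n_pmds = 3 };

    /* A port with 5 unpinned rxqs on numa node 0: queues 0..4 land on
     * cores 1, 2, 7, 1, 2.  The whole assignment is recomputed from
     * scratch on every reconfiguration, so it stays balanced at the cost
     * of possibly moving existing queues between pmd threads. */
    for (int qid = 0; qid < 5; qid++) {
        printf("queue %d -> pmd on core %d\n", qid, toy_rr_get_pmd(&numa0));
    }
    return 0;
}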

Lastly, this commit moves the pmd thread state away from ovs-numa.  The
pmd thread state is now kept only in dpif-netdev.

Signed-off-by: Daniele Di Proietto <diproiettod at vmware.com>
Co-authored-by: Ilya Maximets <i.maximets at samsung.com>
Signed-off-by: Ilya Maximets <i.maximets at samsung.com>
---
 lib/dpif-netdev.c | 910 +++++++++++++++++++++++++++---------------------------
 tests/pmd.at      |   3 +-
 2 files changed, 464 insertions(+), 449 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 37479b8..3509493 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -289,6 +289,7 @@ struct dp_netdev_rxq {
                                           pinned. RXQ_CORE_UNPINNED if the
                                           queue doesn't need to be pinned to a
                                           particular core. */
+    struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this queue. */
 };
 
 /* A port in a netdev-based datapath. */
@@ -304,6 +305,7 @@ struct dp_netdev_port {
     struct ovs_mutex txq_used_mutex;
     char *type;                 /* Port type as requested by user. */
     char *rxq_affinity_list;    /* Requested affinity of rx queues. */
+    bool need_reconfigure;      /* True if we should reconfigure netdev. */
 };
 
 /* Contained by struct dp_netdev_flow's 'stats' member.  */
@@ -506,7 +508,7 @@ struct dp_netdev_pmd_thread {
 
     /* Queue id used by this pmd thread to send packets on all netdevs if
      * XPS disabled for this netdev. All static_tx_qid's are unique and less
-     * than 'ovs_numa_get_n_cores() + 1'. */
+     * than 'cmap_count(dp->poll_threads)'. */
     const int static_tx_qid;
 
     struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
@@ -535,6 +537,9 @@ struct dp_netdev_pmd_thread {
      * reporting to the user */
     unsigned long long stats_zero[DP_N_STATS];
     uint64_t cycles_zero[PMD_N_CYCLES];
+
+    /* Set to true if the pmd thread needs to be reloaded. */
+    bool need_reload;
 };
 
 /* Interface to netdev-based datapath. */
@@ -579,29 +584,26 @@ static void dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_set_nonpmd(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
 
+static void *pmd_thread_main(void *);
 static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
                                                       unsigned core_id);
 static struct dp_netdev_pmd_thread *
 dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
-static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
-static void dp_netdev_stop_pmds(struct dp_netdev *dp);
-static void dp_netdev_start_pmds(struct dp_netdev *dp)
-    OVS_REQUIRES(dp->port_mutex);
+static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd);
 static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
-static void dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
-                                             struct dp_netdev_port *port);
-static void dp_netdev_add_port_to_pmds(struct dp_netdev *dp,
-                                       struct dp_netdev_port *port);
 static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
-                                         struct dp_netdev_port *port);
+                                         struct dp_netdev_port *port)
+    OVS_REQUIRES(pmd->port_mutex);
+static void dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
+                                           struct tx_port *tx)
+    OVS_REQUIRES(pmd->port_mutex);
 static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
                                      struct dp_netdev_rxq *rxq)
     OVS_REQUIRES(pmd->port_mutex);
-static struct dp_netdev_pmd_thread *
-dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
-static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
-    OVS_REQUIRES(dp->port_mutex);
-static void reconfigure_pmd_threads(struct dp_netdev *dp)
+static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
+                                       struct rxq_poll *poll)
+    OVS_REQUIRES(pmd->port_mutex);
+static void reconfigure_datapath(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
 static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
@@ -1152,7 +1154,7 @@ dp_netdev_free(struct dp_netdev *dp)
         do_del_port(dp, port);
     }
     ovs_mutex_unlock(&dp->port_mutex);
-    dp_netdev_destroy_all_pmds(dp);
+    dp_netdev_destroy_all_pmds(dp, true);
     cmap_destroy(&dp->poll_threads);
 
     ovs_mutex_destroy(&dp->non_pmd_mutex);
@@ -1287,10 +1289,7 @@ port_create(const char *devname, const char *type,
     struct dp_netdev_port *port;
     enum netdev_flags flags;
     struct netdev *netdev;
-    int n_open_rxqs = 0;
-    int n_cores = 0;
-    int i, error;
-    bool dynamic_txqs = false;
+    int error;
 
     *portp = NULL;
 
@@ -1308,79 +1307,24 @@ port_create(const char *devname, const char *type,
         goto out;
     }
 
-    if (netdev_is_pmd(netdev)) {
-        n_cores = ovs_numa_get_n_cores();
-
-        if (n_cores == OVS_CORE_UNSPEC) {
-            VLOG_ERR("%s, cannot get cpu core info", devname);
-            error = ENOENT;
-            goto out;
-        }
-        /* There can only be ovs_numa_get_n_cores() pmd threads,
-         * so creates a txq for each, and one extra for the non
-         * pmd threads. */
-        error = netdev_set_tx_multiq(netdev, n_cores + 1);
-        if (error && (error != EOPNOTSUPP)) {
-            VLOG_ERR("%s, cannot set multiq", devname);
-            goto out;
-        }
-    }
-
-    if (netdev_is_reconf_required(netdev)) {
-        error = netdev_reconfigure(netdev);
-        if (error) {
-            goto out;
-        }
-    }
-
-    if (netdev_is_pmd(netdev)) {
-        if (netdev_n_txq(netdev) < n_cores + 1) {
-            dynamic_txqs = true;
-        }
+    error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
+    if (error) {
+        VLOG_ERR("%s: cannot set promisc flag", devname);
+        goto out;
     }
 
     port = xzalloc(sizeof *port);
     port->port_no = port_no;
     port->netdev = netdev;
-    port->n_rxq = netdev_n_rxq(netdev);
-    port->rxqs = xcalloc(port->n_rxq, sizeof *port->rxqs);
-    port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
     port->type = xstrdup(type);
-    ovs_mutex_init(&port->txq_used_mutex);
-    port->dynamic_txqs = dynamic_txqs;
-
-    for (i = 0; i < port->n_rxq; i++) {
-        port->rxqs[i].port = port;
-        error = netdev_rxq_open(netdev, &port->rxqs[i].rx, i);
-        if (error) {
-            VLOG_ERR("%s: cannot receive packets on this network device (%s)",
-                     devname, ovs_strerror(errno));
-            goto out_rxq_close;
-        }
-        port->rxqs[i].core_id = RXQ_CORE_UNPINNED;
-        n_open_rxqs++;
-    }
-
-    error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
-    if (error) {
-        goto out_rxq_close;
-    }
     port->sf = sf;
+    port->need_reconfigure = true;
+    ovs_mutex_init(&port->txq_used_mutex);
 
     *portp = port;
 
     return 0;
 
-out_rxq_close:
-    for (i = 0; i < n_open_rxqs; i++) {
-        netdev_rxq_close(port->rxqs[i].rx);
-    }
-    ovs_mutex_destroy(&port->txq_used_mutex);
-    free(port->type);
-    free(port->txq_used);
-    free(port->rxqs);
-    free(port);
-
 out:
     netdev_close(netdev);
     return error;
@@ -1404,15 +1348,11 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
         return error;
     }
 
-    if (netdev_is_pmd(port->netdev)) {
-        dp_netdev_start_pmds(dp);
-    }
-
-    dp_netdev_add_port_to_pmds(dp, port);
-
     hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
     seq_change(dp->port_seq);
 
+    reconfigure_datapath(dp);
+
     return 0;
 }
 
@@ -1537,13 +1477,6 @@ get_port_by_name(struct dp_netdev *dp,
     return ENOENT;
 }
 
-static int
-get_n_pmd_threads(struct dp_netdev *dp)
-{
-    /* There is one non pmd thread in dp->poll_threads */
-    return cmap_count(&dp->poll_threads) - 1;
-}
-
 /* Returns 'true' if there is a port with pmd netdev. */
 static bool
 has_pmd_port(struct dp_netdev *dp)
@@ -1560,7 +1493,6 @@ has_pmd_port(struct dp_netdev *dp)
     return false;
 }
 
-
 static void
 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     OVS_REQUIRES(dp->port_mutex)
@@ -1568,14 +1500,7 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     hmap_remove(&dp->ports, &port->node);
     seq_change(dp->port_seq);
 
-    dp_netdev_del_port_from_all_pmds(dp, port);
-
-    if (netdev_is_pmd(port->netdev)) {
-        /* If there is no pmd netdev, delete the pmd threads */
-        if (!has_pmd_port(dp)) {
-            dp_netdev_stop_pmds(dp);
-        }
-    }
+    reconfigure_datapath(dp);
 
     port_destroy(port);
 }
@@ -2977,15 +2902,27 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
     }
 }
 
+static struct tx_port *
+tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
+{
+    struct tx_port *tx;
+
+    HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
+        if (tx->port->port_no == port_no) {
+            return tx;
+        }
+    }
+
+    return NULL;
+}
+
 static int
 port_reconfigure(struct dp_netdev_port *port)
 {
     struct netdev *netdev = port->netdev;
     int i, err;
 
-    if (!netdev_is_reconf_required(netdev)) {
-        return 0;
-    }
+    port->need_reconfigure = false;
 
     /* Closes the existing 'rxq's. */
     for (i = 0; i < port->n_rxq; i++) {
@@ -2995,11 +2932,13 @@ port_reconfigure(struct dp_netdev_port *port)
     port->n_rxq = 0;
 
     /* Allows 'netdev' to apply the pending configuration changes. */
-    err = netdev_reconfigure(netdev);
-    if (err && (err != EOPNOTSUPP)) {
-        VLOG_ERR("Failed to set interface %s new configuration",
-                 netdev_get_name(netdev));
-        return err;
+    if (netdev_is_reconf_required(netdev)) {
+        err = netdev_reconfigure(netdev);
+        if (err && (err != EOPNOTSUPP)) {
+            VLOG_ERR("Failed to set interface %s new configuration",
+                     netdev_get_name(netdev));
+            return err;
+        }
     }
     /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
     port->rxqs = xrealloc(port->rxqs,
@@ -3023,42 +2962,393 @@ port_reconfigure(struct dp_netdev_port *port)
     return 0;
 }
 
+struct rr_numa_list {
+    struct hmap numas;  /* Contains 'struct rr_numa' */
+};
+
+struct rr_numa {
+    struct hmap_node node;
+
+    int numa_id;
+
+    /* Non isolated pmds on numa node 'numa_id' */
+    struct dp_netdev_pmd_thread **pmds;
+    int n_pmds;
+
+    int cur_index;
+};
+
+static struct rr_numa *
+rr_numa_list_lookup(struct rr_numa_list *rr, int numa_id)
+{
+    struct rr_numa *numa;
+
+    HMAP_FOR_EACH_WITH_HASH(numa, node, hash_int(numa_id, 0), &rr->numas) {
+        if (numa->numa_id == numa_id) {
+            return numa;
+        }
+    }
+
+    return NULL;
+}
+
+static void
+rr_numa_list_populate(struct dp_netdev *dp, struct rr_numa_list *rr)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    struct rr_numa *numa;
+
+    hmap_init(&rr->numas);
+
+    CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
+        if (pmd->core_id == NON_PMD_CORE_ID || pmd->isolated) {
+            continue;
+        }
+
+        numa = rr_numa_list_lookup(rr, pmd->numa_id);
+        if (!numa) {
+            numa = xzalloc(sizeof *numa);
+            numa->numa_id = pmd->numa_id;
+            hmap_insert(&rr->numas, &numa->node, hash_int(pmd->numa_id, 0));
+        }
+        numa->n_pmds++;
+        numa->pmds = xrealloc(numa->pmds, numa->n_pmds * sizeof *numa->pmds);
+        numa->pmds[numa->n_pmds - 1] = pmd;
+    }
+}
+
+static struct dp_netdev_pmd_thread *
+rr_numa_get_pmd(struct rr_numa *numa)
+{
+    return numa->pmds[numa->cur_index++ % numa->n_pmds];
+}
+
+static void
+rr_numa_list_destroy(struct rr_numa_list *rr)
+{
+    struct rr_numa *numa;
+
+    HMAP_FOR_EACH_POP(numa, node, &rr->numas) {
+        free(numa->pmds);
+        free(numa);
+    }
+    hmap_destroy(&rr->numas);
+}
+
+/* Assign pmds to queues.  If 'pinned' is true, assign pmds to pinned
+ * queues and mark the pmds as isolated.  Otherwise, assign non isolated
+ * pmds to unpinned queues.
+ *
+ * The function doesn't touch the pmd threads, it just stores the assignment
+ * in the 'pmd' member of each rxq. */
+static void
+rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
+{
+    struct dp_netdev_port *port;
+    struct rr_numa_list rr;
+
+    rr_numa_list_populate(dp, &rr);
+
+    HMAP_FOR_EACH(port, node, &dp->ports) {
+        struct rr_numa *numa;
+        int numa_id;
+
+        if (!netdev_is_pmd(port->netdev)) {
+            continue;
+        }
+
+        numa_id = netdev_get_numa_id(port->netdev);
+        numa = rr_numa_list_lookup(&rr, numa_id);
+
+        for (int qid = 0; qid < port->n_rxq; qid++) {
+            struct dp_netdev_rxq *q = &port->rxqs[qid];
+
+            if (pinned && q->core_id != RXQ_CORE_UNPINNED) {
+                struct dp_netdev_pmd_thread *pmd;
+
+                pmd = dp_netdev_get_pmd(dp, q->core_id);
+                if (!pmd) {
+                    VLOG_WARN("There is no PMD thread on core %d. Queue "
+                              "%d on port \'%s\' will not be polled.",
+                              q->core_id, qid, netdev_get_name(port->netdev));
+                } else {
+                    q->pmd = pmd;
+                    pmd->isolated = true;
+                    dp_netdev_pmd_unref(pmd);
+                }
+            } else if (!pinned && q->core_id == RXQ_CORE_UNPINNED) {
+                if (!numa) {
+                    VLOG_WARN("There's no available (non isolated) pmd thread"
+                              "on numa node %d. Queue %d on port \'%s\' will"
+                              "not be polled.",
+                              numa_id, qid, netdev_get_name(port->netdev));
+                } else {
+                    q->pmd = rr_numa_get_pmd(numa);
+                }
+            }
+        }
+    }
+
+    rr_numa_list_destroy(&rr);
+}
+
 static void
 reconfigure_pmd_threads(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex)
 {
-    struct dp_netdev_port *port, *next;
-    int n_cores;
+    struct dp_netdev_pmd_thread *pmd;
+    struct ovs_numa_dump *pmd_cores;
+    bool changed = false;
+
+    /* The pmd threads should be started only if there's a pmd port in the
+     * datapath.  If the user didn't provide any "pmd-cpu-mask", we start
+     * NR_PMD_THREADS per numa node. */
+    if (!has_pmd_port(dp)) {
+        pmd_cores = ovs_numa_dump_n_cores_per_numa(0);
+    } else if (dp->pmd_cmask && dp->pmd_cmask[0]) {
+        pmd_cores = ovs_numa_dump_cores_with_cmask(dp->pmd_cmask);
+    } else {
+        pmd_cores = ovs_numa_dump_n_cores_per_numa(NR_PMD_THREADS);
+    }
+
+    /* Check for changed configuration */
+    if (ovs_numa_dump_count(pmd_cores) != cmap_count(&dp->poll_threads) - 1) {
+        changed = true;
+    } else {
+        CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
+            if (pmd->core_id != NON_PMD_CORE_ID
+                && !ovs_numa_dump_contains_core(pmd_cores,
+                                                pmd->numa_id,
+                                                pmd->core_id)) {
+                changed = true;
+                break;
+            }
+        }
+    }
+
+    /* Destroy the old and recreate the new pmd threads.  We don't perform an
+     * incremental update because we would have to adjust 'static_tx_qid'. */
+    if (changed) {
+        struct ovs_numa_dump *all_numas;
+        struct ovs_numa_info_core *core;
+        struct ovs_numa_info_core *numa;
+
+        /* Do not destroy the non pmd thread. */
+        dp_netdev_destroy_all_pmds(dp, false);
+        FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
+            struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
+
+            dp_netdev_configure_pmd(pmd, dp, core->core_id, core->numa_id);
+
+            pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+        }
+
+        /* Log the number of pmd threads per numa node. */
+        all_numas = ovs_numa_dump_n_cores_per_numa(1);
+
+        FOR_EACH_CORE_ON_DUMP(numa, all_numas) {
+            int n = 0;
+
+            FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
+                if (core->numa_id == numa->numa_id) {
+                    n++;
+                }
+            }
+
+            if (n) {
+                VLOG_INFO("Created %d pmd threads on numa node %d",
+                          n, numa->numa_id);
+            }
+        }
+        ovs_numa_dump_destroy(all_numas);
+    }
+
+    ovs_numa_dump_destroy(pmd_cores);
+}
+
+static void
+reload_affected_pmds(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+
+    CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
+        if (pmd->need_reload) {
+            dp_netdev_reload_pmd__(pmd);
+            pmd->need_reload = false;
+        }
+    }
+}
+
+static void
+pmd_remove_stale_ports(struct dp_netdev *dp,
+                       struct dp_netdev_pmd_thread *pmd)
+    OVS_EXCLUDED(pmd->port_mutex)
+    OVS_REQUIRES(dp->port_mutex)
+{
+    struct rxq_poll *poll, *poll_next;
+    struct tx_port *tx, *tx_next;
+
+    ovs_mutex_lock(&pmd->port_mutex);
+    HMAP_FOR_EACH_SAFE(poll, poll_next, node, &pmd->poll_list) {
+        struct dp_netdev_port *port = poll->rxq->port;
+
+        if (port->need_reconfigure
+            || dp_netdev_lookup_port(dp, port->port_no) != port) {
+            dp_netdev_del_rxq_from_pmd(pmd, poll);
+        }
+    }
+    HMAP_FOR_EACH_SAFE(tx, tx_next, node, &pmd->tx_ports) {
+        struct dp_netdev_port *port = tx->port;
+
+        if (port->need_reconfigure
+            || dp_netdev_lookup_port(dp, port->port_no) != port) {
+            dp_netdev_del_port_tx_from_pmd(pmd, tx);
+        }
+    }
+    ovs_mutex_unlock(&pmd->port_mutex);
+}
+
+/* Must be called each time a port is added/removed or the cmask changes.
+ * This creates and destroys pmd threads, reconfigures ports, opens their
+ * rxqs and assigns all rxqs/txqs to pmd threads. */
+static void
+reconfigure_datapath(struct dp_netdev *dp)
+    OVS_REQUIRES(dp->port_mutex)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    struct dp_netdev_port *port;
+    int wanted_txqs;
 
     dp->last_reconfigure_seq = seq_read(dp->reconfigure_seq);
 
-    dp_netdev_destroy_all_pmds(dp);
+    /* Step 1: Adjust the pmd threads based on the datapath ports, the cores
+     * on the system and the user configuration. */
+    reconfigure_pmd_threads(dp);
 
-    /* Reconfigures the cpu mask. */
-    ovs_numa_set_cpu_mask(dp->pmd_cmask);
+    wanted_txqs = cmap_count(&dp->poll_threads);
 
-    n_cores = ovs_numa_get_n_cores();
-    if (n_cores == OVS_CORE_UNSPEC) {
-        VLOG_ERR("Cannot get cpu core info");
-        return;
+    /* The number of pmd threads might have changed, or a port can be new:
+     * adjust the txqs. */
+    HMAP_FOR_EACH(port, node, &dp->ports) {
+        netdev_set_tx_multiq(port->netdev, wanted_txqs);
     }
 
-    HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
+    /* Step 2: Remove from the pmd threads ports that have been removed or
+     * need reconfiguration. */
+
+    /* Check for all the ports that need reconfiguration.  We cache this in
+     * 'port->need_reconfigure', because netdev_is_reconf_required() can
+     * change at any time. */
+    HMAP_FOR_EACH(port, node, &dp->ports) {
+        if (netdev_is_reconf_required(port->netdev)) {
+            port->need_reconfigure = true;
+        }
+    }
+
+    /* Remove from the pmd threads all the ports that have been deleted or
+     * need reconfiguration. */
+    CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
+        pmd_remove_stale_ports(dp, pmd);
+    }
+
+    /* Reload affected pmd threads.  We must wait for the pmd threads before
+     * reconfiguring the ports, because a port cannot be reconfigured while
+     * it's being used. */
+    reload_affected_pmds(dp);
+
+    /* Step 3: Reconfigure ports. */
+
+    /* We only reconfigure the ports that we determined above, because they're
+     * not being used by any pmd thread at the moment.  If a port fails to
+     * reconfigure we remove it from the datapath. */
+    HMAP_FOR_EACH(port, node, &dp->ports) {
         int err;
 
+        if (!port->need_reconfigure) {
+            continue;
+        }
+
         err = port_reconfigure(port);
         if (err) {
             hmap_remove(&dp->ports, &port->node);
             seq_change(dp->port_seq);
             port_destroy(port);
         } else {
-            port->dynamic_txqs = netdev_n_txq(port->netdev) < n_cores + 1;
+            port->dynamic_txqs = netdev_n_txq(port->netdev) < wanted_txqs;
         }
     }
-    /* Restores the non-pmd. */
-    dp_netdev_set_nonpmd(dp);
-    /* Restores all pmd threads. */
-    dp_netdev_reset_pmd_threads(dp);
+
+    /* Step 4: Compute new rxq scheduling.  We don't touch the pmd threads
+     * for now, we just update the 'pmd' pointer in each rxq to point to the
+     * wanted thread according to the scheduling policy. */
+
+    /* Reset all the pmd threads to non isolated. */
+    CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
+        pmd->isolated = false;
+    }
+
+    /* Reset all the queues to unassigned */
+    HMAP_FOR_EACH(port, node, &dp->ports) {
+        for (int i = 0; i < port->n_rxq; i++) {
+            port->rxqs[i].pmd = NULL;
+        }
+    }
+
+    /* Add pinned queues and mark pmd threads isolated. */
+    rxq_scheduling(dp, true);
+
+    /* Add non-pinned queues. */
+    rxq_scheduling(dp, false);
+
+    /* Step 5: Remove queues not compliant with new scheduling. */
+    CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
+        struct rxq_poll *poll, *poll_next;
+
+        ovs_mutex_lock(&pmd->port_mutex);
+        HMAP_FOR_EACH_SAFE(poll, poll_next, node, &pmd->poll_list) {
+            if (poll->rxq->pmd != pmd) {
+                dp_netdev_del_rxq_from_pmd(pmd, poll);
+            }
+        }
+        ovs_mutex_unlock(&pmd->port_mutex);
+    }
+
+    /* Reload affected pmd threads.  We must wait for the pmd threads to remove
+     * the old queues before re-adding them, otherwise a queue can be polled by
+     * two threads at the same time. */
+    reload_affected_pmds(dp);
+
+    /* Step 6: Add queues from scheduling, if they're not there already. */
+    HMAP_FOR_EACH(port, node, &dp->ports) {
+        if (!netdev_is_pmd(port->netdev)) {
+            continue;
+        }
+
+        for (int qid = 0; qid < port->n_rxq; qid++) {
+            struct dp_netdev_rxq *q = &port->rxqs[qid];
+
+            if (q->pmd) {
+                ovs_mutex_lock(&q->pmd->port_mutex);
+                dp_netdev_add_rxq_to_pmd(q->pmd, q);
+                ovs_mutex_unlock(&q->pmd->port_mutex);
+            }
+        }
+    }
+
+    /* Add every port to the tx cache of every pmd thread, if it's not
+     * there already and if this pmd has at least one rxq to poll. */
+    CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
+        ovs_mutex_lock(&pmd->port_mutex);
+        if (hmap_count(&pmd->poll_list) || pmd->core_id == NON_PMD_CORE_ID) {
+            HMAP_FOR_EACH(port, node, &dp->ports) {
+                dp_netdev_add_port_tx_to_pmd(pmd, port);
+            }
+        }
+        ovs_mutex_unlock(&pmd->port_mutex);
+    }
+
+    /* Reload affected pmd threads. */
+    reload_affected_pmds(dp);
 }
 
 /* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
@@ -3107,7 +3397,7 @@ dpif_netdev_run(struct dpif *dpif)
     }
 
     if (dp_netdev_is_reconf_required(dp) || ports_require_restart(dp)) {
-        reconfigure_pmd_threads(dp);
+        reconfigure_datapath(dp);
     }
     ovs_mutex_unlock(&dp->port_mutex);
 
@@ -3357,16 +3647,9 @@ dp_netdev_set_nonpmd(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex)
 {
     struct dp_netdev_pmd_thread *non_pmd;
-    struct dp_netdev_port *port;
 
     non_pmd = xzalloc(sizeof *non_pmd);
     dp_netdev_configure_pmd(non_pmd, dp, NON_PMD_CORE_ID, OVS_NUMA_UNSPEC);
-
-    HMAP_FOR_EACH (port, node, &dp->ports) {
-        dp_netdev_add_port_tx_to_pmd(non_pmd, port);
-    }
-
-    dp_netdev_reload_pmd__(non_pmd);
 }
 
 /* Caller must have valid pointer to 'pmd'. */
@@ -3412,10 +3695,9 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->dp = dp;
     pmd->core_id = core_id;
     pmd->numa_id = numa_id;
+    pmd->need_reload = false;
 
-    *CONST_CAST(int *, &pmd->static_tx_qid) = (core_id == NON_PMD_CORE_ID)
-                                              ? ovs_numa_get_n_cores()
-                                              : get_n_pmd_threads(dp);
+    *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
 
     ovs_refcount_init(&pmd->ref_cnt);
     latch_init(&pmd->exit_latch);
@@ -3483,7 +3765,6 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
     } else {
         latch_set(&pmd->exit_latch);
         dp_netdev_reload_pmd__(pmd);
-        ovs_numa_unpin_core(pmd->core_id);
         xpthread_join(pmd->thread, NULL);
     }
 
@@ -3498,20 +3779,20 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
     dp_netdev_pmd_unref(pmd);
 }
 
-/* Destroys all pmd threads, but not the non pmd thread. */
+/* Destroys all pmd threads. If 'non_pmd' is true it also destroys the non pmd
+ * thread. */
 static void
-dp_netdev_stop_pmds(struct dp_netdev *dp)
+dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd)
 {
     struct dp_netdev_pmd_thread *pmd;
     struct dp_netdev_pmd_thread **pmd_list;
     size_t k = 0, n_pmds;
 
-    n_pmds = get_n_pmd_threads(dp);
+    n_pmds = cmap_count(&dp->poll_threads);
     pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        /* We don't need to destroy the non pmd thread */
-        if (pmd->core_id == NON_PMD_CORE_ID) {
+        if (!non_pmd && pmd->core_id == NON_PMD_CORE_ID) {
             continue;
         }
         /* We cannot call dp_netdev_del_pmd(), since it alters
@@ -3527,32 +3808,6 @@ dp_netdev_stop_pmds(struct dp_netdev *dp)
     free(pmd_list);
 }
 
-/* Destroys all pmd threads, including the non pmd thread. */
-static void
-dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
-{
-    struct dp_netdev_pmd_thread *pmd;
-    struct dp_netdev_pmd_thread **pmd_list;
-    size_t k = 0, n_pmds;
-
-    n_pmds = cmap_count(&dp->poll_threads);
-    pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        /* We cannot call dp_netdev_del_pmd(), since it alters
-         * 'dp->poll_threads' (while we're iterating it) and it
-         * might quiesce. */
-        ovs_assert(k < n_pmds);
-        pmd_list[k++] = pmd;
-    }
-
-    for (size_t i = 0; i < k; i++) {
-        dp_netdev_del_pmd(dp, pmd_list[i]);
-    }
-
-    free(pmd_list);
-}
-
 /* Deletes all rx queues from pmd->poll_list and all the ports from
  * pmd->tx_ports. */
 static void
@@ -3571,126 +3826,40 @@ dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
     ovs_mutex_unlock(&pmd->port_mutex);
 }
 
-static struct tx_port *
-tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
-{
-    struct tx_port *tx;
-
-    HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
-        if (tx->port->port_no == port_no) {
-            return tx;
-        }
-    }
-
-    return NULL;
-}
-
-/* Deletes all rx queues of 'port' from 'poll_list', and the 'port' from
- * 'tx_ports' of 'pmd' thread.  Returns true if 'port' was found in 'pmd'
- * (therefore a restart is required). */
-static bool
-dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port,
-                              struct dp_netdev_pmd_thread *pmd)
-{
-    struct rxq_poll *poll, *next;
-    struct tx_port *tx;
-    bool found = false;
-
-    ovs_mutex_lock(&pmd->port_mutex);
-    HMAP_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
-        if (poll->rxq->port == port) {
-            found = true;
-            hmap_remove(&pmd->poll_list, &poll->node);
-            free(poll);
-        }
-    }
-
-    tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
-    if (tx) {
-        hmap_remove(&pmd->tx_ports, &tx->node);
-        free(tx);
-        found = true;
-    }
-    ovs_mutex_unlock(&pmd->port_mutex);
-
-    return found;
-}
-
-/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
- * threads.  The pmd threads that need to be restarted are inserted in
- * 'to_reload'. */
+/* Adds rx queue to poll_list of PMD thread, if it's not there already. */
 static void
-dp_netdev_del_port_from_all_pmds__(struct dp_netdev *dp,
-                                   struct dp_netdev_port *port,
-                                   struct hmapx *to_reload)
+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+                         struct dp_netdev_rxq *rxq)
+    OVS_REQUIRES(pmd->port_mutex)
 {
-    struct dp_netdev_pmd_thread *pmd;
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        bool found;
-
-        found = dp_netdev_del_port_from_pmd__(port, pmd);
+    int qid = netdev_rxq_get_queue_id(rxq->rx);
+    uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
+    struct rxq_poll *poll;
 
-        if (found) {
-            hmapx_add(to_reload, pmd);
+    HMAP_FOR_EACH_WITH_HASH (poll, node, hash, &pmd->poll_list) {
+        if (poll->rxq == rxq) {
+            /* 'rxq' is already polled by this thread. Do nothing. */
+            return;
         }
     }
-}
-
-/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
- * threads. Reloads the threads if needed. */
-static void
-dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
-                                 struct dp_netdev_port *port)
-{
-    struct dp_netdev_pmd_thread *pmd;
-    struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
-    struct hmapx_node *node;
 
-    dp_netdev_del_port_from_all_pmds__(dp, port, &to_reload);
-
-    HMAPX_FOR_EACH (node, &to_reload) {
-        pmd = (struct dp_netdev_pmd_thread *) node->data;
-        dp_netdev_reload_pmd__(pmd);
-    }
-
-    hmapx_destroy(&to_reload);
-}
-
-
-/* Returns non-isolated PMD thread from this numa node with fewer
- * rx queues to poll. Returns NULL if there is no non-isolated  PMD threads
- * on this numa node. Can be called safely only by main thread. */
-static struct dp_netdev_pmd_thread *
-dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
-{
-    int min_cnt = -1;
-    struct dp_netdev_pmd_thread *pmd, *res = NULL;
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        if (!pmd->isolated && pmd->numa_id == numa_id
-            && (min_cnt > hmap_count(&pmd->poll_list) || res == NULL)) {
-            min_cnt = hmap_count(&pmd->poll_list);
-            res = pmd;
-        }
-    }
+    poll = xmalloc(sizeof *poll);
+    poll->rxq = rxq;
+    hmap_insert(&pmd->poll_list, &poll->node, hash);
 
-    return res;
+    pmd->need_reload = true;
 }
 
-/* Adds rx queue to poll_list of PMD thread. */
+/* Delete 'poll' from poll_list of PMD thread. */
 static void
-dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
-                         struct dp_netdev_rxq *rxq)
+dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
+                           struct rxq_poll *poll)
     OVS_REQUIRES(pmd->port_mutex)
 {
-    int qid = netdev_rxq_get_queue_id(rxq->rx);
-    uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
-    struct rxq_poll *poll;
+    hmap_remove(&pmd->poll_list, &poll->node);
+    free(poll);
 
-    poll = xmalloc(sizeof *poll);
-    poll->rxq = rxq;
-    hmap_insert(&pmd->poll_list, &poll->node, hash);
+    pmd->need_reload = true;
 }
 
 /* Add 'port' to the tx port cache of 'pmd', which must be reloaded for the
@@ -3698,190 +3867,37 @@ dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
 static void
 dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
                              struct dp_netdev_port *port)
+    OVS_REQUIRES(pmd->port_mutex)
 {
     struct tx_port *tx;
 
+    tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
+    if (tx) {
+        /* 'port' is already on this thread tx cache. Do nothing. */
+        return;
+    }
+
     tx = xzalloc(sizeof *tx);
 
     tx->port = port;
     tx->qid = -1;
 
-    ovs_mutex_lock(&pmd->port_mutex);
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
-    ovs_mutex_unlock(&pmd->port_mutex);
-}
-
-/* Distribute all {pinned|non-pinned} rx queues of 'port' between PMD
- * threads in 'dp'. The pmd threads that need to be restarted are inserted
- * in 'to_reload'. PMD threads with pinned queues marked as isolated. */
-static void
-dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
-                              struct dp_netdev_port *port,
-                              struct hmapx *to_reload, bool pinned)
-{
-    int numa_id = netdev_get_numa_id(port->netdev);
-    struct dp_netdev_pmd_thread *pmd;
-    int i;
-
-    if (!netdev_is_pmd(port->netdev)) {
-        return;
-    }
-
-    for (i = 0; i < port->n_rxq; i++) {
-        if (pinned) {
-            if (port->rxqs[i].core_id == RXQ_CORE_UNPINNED) {
-                continue;
-            }
-            pmd = dp_netdev_get_pmd(dp, port->rxqs[i].core_id);
-            if (!pmd) {
-                VLOG_WARN("There is no PMD thread on core %d. "
-                          "Queue %d on port \'%s\' will not be polled.",
-                          port->rxqs[i].core_id, i,
-                          netdev_get_name(port->netdev));
-                continue;
-            }
-            pmd->isolated = true;
-            dp_netdev_pmd_unref(pmd);
-        } else {
-            if (port->rxqs[i].core_id != RXQ_CORE_UNPINNED) {
-                continue;
-            }
-            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
-            if (!pmd) {
-                VLOG_WARN("There's no available pmd thread on numa node %d",
-                          numa_id);
-                break;
-            }
-        }
-
-        ovs_mutex_lock(&pmd->port_mutex);
-        dp_netdev_add_rxq_to_pmd(pmd, &port->rxqs[i]);
-        ovs_mutex_unlock(&pmd->port_mutex);
-
-        hmapx_add(to_reload, pmd);
-    }
-}
-
-/* Distributes all non-pinned rx queues of 'port' between all PMD threads
- * in 'dp' and inserts 'port' in the PMD threads 'tx_ports'. The pmd threads
- * that need to be restarted are inserted in 'to_reload'. */
-static void
-dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
-                             struct hmapx *to_reload)
-{
-    struct dp_netdev_pmd_thread *pmd;
-
-    dp_netdev_add_port_rx_to_pmds(dp, port, to_reload, false);
-
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        dp_netdev_add_port_tx_to_pmd(pmd, port);
-        hmapx_add(to_reload, pmd);
-    }
-}
-
-/* Distributes all non-pinned rx queues of 'port' between all PMD threads
- * in 'dp', inserts 'port' in the PMD threads 'tx_ports' and reloads them,
- * if needed. */
-static void
-dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
-{
-    struct dp_netdev_pmd_thread *pmd;
-    struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
-    struct hmapx_node *node;
-
-    dp_netdev_add_port_to_pmds__(dp, port, &to_reload);
-
-    HMAPX_FOR_EACH (node, &to_reload) {
-        pmd = (struct dp_netdev_pmd_thread *) node->data;
-        dp_netdev_reload_pmd__(pmd);
-    }
-
-    hmapx_destroy(&to_reload);
-}
-
-static void
-dp_netdev_start_pmds_on_numa(struct dp_netdev *dp, int numa_id)
-{
-    int can_have, n_unpinned, i;
-
-    n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
-    if (!n_unpinned) {
-        VLOG_WARN("Cannot create pmd threads due to out of unpinned "
-                  "cores on numa node %d", numa_id);
-        return;
-    }
-
-    /* If cpu mask is specified, uses all unpinned cores, otherwise
-     * tries creating NR_PMD_THREADS pmd threads. */
-    can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
-    for (i = 0; i < can_have; i++) {
-        unsigned core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
-        struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
-        struct dp_netdev_port *port;
-
-        dp_netdev_configure_pmd(pmd, dp, core_id, numa_id);
-
-        HMAP_FOR_EACH (port, node, &dp->ports) {
-            dp_netdev_add_port_tx_to_pmd(pmd, port);
-        }
-
-        pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
-    }
-    VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
+    pmd->need_reload = true;
 }
 
-/* Starts pmd threads, if not already started. The function takes care of
- * filling the threads tx port cache. */
+/* Del 'tx' from the tx port cache of 'pmd', which must be reloaded for the
+ * changes to take effect. */
 static void
-dp_netdev_start_pmds(struct dp_netdev *dp)
-    OVS_REQUIRES(dp->port_mutex)
+dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
+                               struct tx_port *tx)
+    OVS_REQUIRES(pmd->port_mutex)
 {
-    int n_pmds;
-
-    n_pmds = get_n_pmd_threads(dp);
-
-    /* If there are already pmd threads created for the datapath, do nothing.
-     * Else, creates the pmd threads. */
-    if (!n_pmds) {
-        int n_numas = ovs_numa_get_n_numas();
-
-        for (int numa_id = 0; numa_id < n_numas; numa_id++) {
-            dp_netdev_start_pmds_on_numa(dp, numa_id);
-        }
-    }
+    hmap_remove(&pmd->tx_ports, &tx->node);
+    free(tx);
+    pmd->need_reload = true;
 }
-
 
-/* Called after pmd threads config change.  Restarts pmd threads with
- * new configuration. */
-static void
-dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
-    OVS_REQUIRES(dp->port_mutex)
-{
-    struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
-    struct dp_netdev_pmd_thread *pmd;
-    struct dp_netdev_port *port;
-    struct hmapx_node *node;
-
-    dp_netdev_start_pmds(dp);
-    HMAP_FOR_EACH (port, node, &dp->ports) {
-        /* Distribute only pinned rx queues first to mark threads as isolated */
-        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, true);
-    }
-
-    /* Distribute remaining non-pinned rx queues to non-isolated PMD threads. */
-    HMAP_FOR_EACH (port, node, &dp->ports) {
-        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, false);
-    }
-
-    HMAPX_FOR_EACH (node, &to_reload) {
-        pmd = (struct dp_netdev_pmd_thread *) node->data;
-        dp_netdev_reload_pmd__(pmd);
-    }
-
-    hmapx_destroy(&to_reload);
-}
-
 static char *
 dpif_netdev_get_datapath_version(void)
 {
@@ -4871,12 +4887,12 @@ dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED,
 
     /* Remove port. */
     hmap_remove(&dp->ports, &port->node);
-    dp_netdev_del_port_from_all_pmds(dp, port);
+    reconfigure_datapath(dp);
 
     /* Reinsert with new port number. */
     port->port_no = port_no;
     hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
-    dp_netdev_add_port_to_pmds(dp, port);
+    reconfigure_datapath(dp);
 
     seq_change(dp->port_seq);
     unixctl_command_reply(conn, NULL);
diff --git a/tests/pmd.at b/tests/pmd.at
index 147dfda..d3dcf24 100644
--- a/tests/pmd.at
+++ b/tests/pmd.at
@@ -607,8 +607,7 @@ p2 0 0 0
 p2 1 0 0
 ])
 
-dnl During reconfiguration some packets will be dropped. This is expected
-OVS_VSWITCHD_STOP(["/dpif(monitor[[0-9]]\+)|WARN|dummy at ovs-dummy: execute [[0-9]]\+ failed/d"])
+OVS_VSWITCHD_STOP
 AT_CLEANUP
 
 AT_SETUP([PMD - dpctl])
-- 
2.10.2


