[ovs-dev] [PATCH v2 18/19] dpif-netdev: Centralized threads and queues handling code.

Daniele Di Proietto diproiettod at vmware.com
Mon Dec 5 19:46:29 UTC 2016


Thanks again for looking at this. Please let me know if you have other comments.


On 05/12/2016 05:39, "Ilya Maximets" <i.maximets at samsung.com> wrote:

>Thanks for v2.
>
>Global comment:
>Could you please add a space between '{C,H}MAP_FOR_EACH[_WITH_HASH]' and
>the expressions that follow them. (Not only in that patch)

Good point, I fixed that in every commit of the series.

>
>Few more comments inline.
>
>Best regards, Ilya Maximets.
>
>On 03.12.2016 05:14, Daniele Di Proietto wrote:
>> Currently we have three different code paths that deal with pmd threads
>> and queues, in response to different input
>> 
>> 1. When a port is added
>> 2. When a port is deleted
>> 3. When the cpumask changes or a port must be reconfigured.
>> 
>> 1. and 2. are carefully written to minimize disruption to the running
>> datapath, while 3. brings down all the threads reconfigure all the ports
>> and restarts everything.
>> 
>> This commit removes the three separate code paths by introducing the
>> reconfigure_datapath() function, that takes care of adapting the pmd
>> threads and queues to the current datapath configuration, no matter how
>> we got there.
>> 
>> This aims at simplifying mantenaince and introduces a long overdue
>
>s/mantenaince/maintenance

changed, thanks

>
>> improvement: port reconfiguration (can happen quite frequently for
>> dpdkvhost ports) is now done without shutting down the whole datapath,
>> but just by temporarily removing the port that needs to be reconfigured
>> (while the rest of the datapath is running).
>> 
>> We now also recompute the rxq scheduling from scratch every time a port
>> is added or deleted.  This means that the queues will be more balanced,
>> especially when dealing with explicit rxq-affinity from the user
>> (without shutting down the threads and restarting them), but it also
>> means that adding or deleting a port might cause existing queues to be
>> moved between pmd threads.  This negative effect can be avoided by
>> taking into account the existing distribution when computing the new
>> scheduling, but I considered code clarity and fast reconfiguration more
>> important than optimizing port addition or removal (a port is added and
>> removed only once, but can be reconfigured many times)
>> 
>> Lastly, this commit moves the pmd threads state away from ovs-numa.  Now
>> the pmd threads state is kept only in dpif-netdev.
>> 
>> Signed-off-by: Daniele Di Proietto <diproiettod at vmware.com>
>> Co-authored-by: Ilya Maximets <i.maximets at samsung.com>
>> Signed-off-by: Ilya Maximets <i.maximets at samsung.com>
>> ---
>>  lib/dpif-netdev.c | 910 +++++++++++++++++++++++++++---------------------------
>>  tests/pmd.at      |   3 +-
>>  2 files changed, 464 insertions(+), 449 deletions(-)
>> 
>> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>> index 37479b8..3509493 100644
>> --- a/lib/dpif-netdev.c
>> +++ b/lib/dpif-netdev.c
>> @@ -289,6 +289,7 @@ struct dp_netdev_rxq {
>>                                            pinned. RXQ_CORE_UNPINNED if the
>>                                            queue doesn't need to be pinned to a
>>                                            particular core. */
>> +    struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this queue. */
>>  };
>>  
>>  /* A port in a netdev-based datapath. */
>> @@ -304,6 +305,7 @@ struct dp_netdev_port {
>>      struct ovs_mutex txq_used_mutex;
>>      char *type;                 /* Port type as requested by user. */
>>      char *rxq_affinity_list;    /* Requested affinity of rx queues. */
>> +    bool need_reconfigure;      /* True if we should reconfigure netdev. */
>>  };
>>  
>>  /* Contained by struct dp_netdev_flow's 'stats' member.  */
>> @@ -506,7 +508,7 @@ struct dp_netdev_pmd_thread {
>>  
>>      /* Queue id used by this pmd thread to send packets on all netdevs if
>>       * XPS disabled for this netdev. All static_tx_qid's are unique and less
>> -     * than 'ovs_numa_get_n_cores() + 1'. */
>> +     * than 'cmap_count(dp->poll_threads)'. */
>>      const int static_tx_qid;
>>  
>>      struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
>> @@ -535,6 +537,9 @@ struct dp_netdev_pmd_thread {
>>       * reporting to the user */
>>      unsigned long long stats_zero[DP_N_STATS];
>>      uint64_t cycles_zero[PMD_N_CYCLES];
>> +
>> +    /* Set to true if the pmd thread needs to be reloaded. */
>> +    bool need_reload;
>>  };
>>  
>>  /* Interface to netdev-based datapath. */
>> @@ -579,29 +584,26 @@ static void dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd);
>>  static void dp_netdev_set_nonpmd(struct dp_netdev *dp)
>>      OVS_REQUIRES(dp->port_mutex);
>>  
>> +static void *pmd_thread_main(void *);
>>  static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
>>                                                        unsigned core_id);
>>  static struct dp_netdev_pmd_thread *
>>  dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
>> -static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
>> -static void dp_netdev_stop_pmds(struct dp_netdev *dp);
>> -static void dp_netdev_start_pmds(struct dp_netdev *dp)
>> -    OVS_REQUIRES(dp->port_mutex);
>> +static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd);
>>  static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
>> -static void dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
>> -                                             struct dp_netdev_port *port);
>> -static void dp_netdev_add_port_to_pmds(struct dp_netdev *dp,
>> -                                       struct dp_netdev_port *port);
>>  static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>> -                                         struct dp_netdev_port *port);
>> +                                         struct dp_netdev_port *port)
>> +    OVS_REQUIRES(pmd->port_mutex);
>> +static void dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
>> +                                           struct tx_port *tx)
>> +    OVS_REQUIRES(pmd->port_mutex);
>>  static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>>                                       struct dp_netdev_rxq *rxq)
>>      OVS_REQUIRES(pmd->port_mutex);
>> -static struct dp_netdev_pmd_thread *
>> -dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
>> -static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
>> -    OVS_REQUIRES(dp->port_mutex);
>> -static void reconfigure_pmd_threads(struct dp_netdev *dp)
>> +static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
>> +                                       struct rxq_poll *poll)
>> +    OVS_REQUIRES(pmd->port_mutex);
>> +static void reconfigure_datapath(struct dp_netdev *dp)
>>      OVS_REQUIRES(dp->port_mutex);
>>  static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
>>  static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
>> @@ -1152,7 +1154,7 @@ dp_netdev_free(struct dp_netdev *dp)
>>          do_del_port(dp, port);
>>      }
>>      ovs_mutex_unlock(&dp->port_mutex);
>> -    dp_netdev_destroy_all_pmds(dp);
>> +    dp_netdev_destroy_all_pmds(dp, true);
>>      cmap_destroy(&dp->poll_threads);
>>  
>>      ovs_mutex_destroy(&dp->non_pmd_mutex);
>> @@ -1287,10 +1289,7 @@ port_create(const char *devname, const char *type,
>>      struct dp_netdev_port *port;
>>      enum netdev_flags flags;
>>      struct netdev *netdev;
>> -    int n_open_rxqs = 0;
>> -    int n_cores = 0;
>> -    int i, error;
>> -    bool dynamic_txqs = false;
>> +    int error;
>>  
>>      *portp = NULL;
>>  
>> @@ -1308,79 +1307,24 @@ port_create(const char *devname, const char *type,
>>          goto out;
>>      }
>>  
>> -    if (netdev_is_pmd(netdev)) {
>> -        n_cores = ovs_numa_get_n_cores();
>> -
>> -        if (n_cores == OVS_CORE_UNSPEC) {
>> -            VLOG_ERR("%s, cannot get cpu core info", devname);
>> -            error = ENOENT;
>> -            goto out;
>> -        }
>> -        /* There can only be ovs_numa_get_n_cores() pmd threads,
>> -         * so creates a txq for each, and one extra for the non
>> -         * pmd threads. */
>> -        error = netdev_set_tx_multiq(netdev, n_cores + 1);
>> -        if (error && (error != EOPNOTSUPP)) {
>> -            VLOG_ERR("%s, cannot set multiq", devname);
>> -            goto out;
>> -        }
>> -    }
>> -
>> -    if (netdev_is_reconf_required(netdev)) {
>> -        error = netdev_reconfigure(netdev);
>> -        if (error) {
>> -            goto out;
>> -        }
>> -    }
>> -
>> -    if (netdev_is_pmd(netdev)) {
>> -        if (netdev_n_txq(netdev) < n_cores + 1) {
>> -            dynamic_txqs = true;
>> -        }
>> +    error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
>> +    if (error) {
>> +        VLOG_ERR("%s: cannot set promisc flag", devname);
>> +        goto out;
>>      }
>>  
>>      port = xzalloc(sizeof *port);
>>      port->port_no = port_no;
>>      port->netdev = netdev;
>> -    port->n_rxq = netdev_n_rxq(netdev);
>> -    port->rxqs = xcalloc(port->n_rxq, sizeof *port->rxqs);
>> -    port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
>>      port->type = xstrdup(type);
>> -    ovs_mutex_init(&port->txq_used_mutex);
>> -    port->dynamic_txqs = dynamic_txqs;
>> -
>> -    for (i = 0; i < port->n_rxq; i++) {
>> -        port->rxqs[i].port = port;
>> -        error = netdev_rxq_open(netdev, &port->rxqs[i].rx, i);
>> -        if (error) {
>> -            VLOG_ERR("%s: cannot receive packets on this network device (%s)",
>> -                     devname, ovs_strerror(errno));
>> -            goto out_rxq_close;
>> -        }
>> -        port->rxqs[i].core_id = RXQ_CORE_UNPINNED;
>> -        n_open_rxqs++;
>> -    }
>> -
>> -    error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
>> -    if (error) {
>> -        goto out_rxq_close;
>> -    }
>>      port->sf = sf;
>> +    port->need_reconfigure = true;
>> +    ovs_mutex_init(&port->txq_used_mutex);
>>  
>>      *portp = port;
>>  
>>      return 0;
>>  
>> -out_rxq_close:
>> -    for (i = 0; i < n_open_rxqs; i++) {
>> -        netdev_rxq_close(port->rxqs[i].rx);
>> -    }
>> -    ovs_mutex_destroy(&port->txq_used_mutex);
>> -    free(port->type);
>> -    free(port->txq_used);
>> -    free(port->rxqs);
>> -    free(port);
>> -
>>  out:
>>      netdev_close(netdev);
>>      return error;
>> @@ -1404,15 +1348,11 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
>>          return error;
>>      }
>>  
>> -    if (netdev_is_pmd(port->netdev)) {
>> -        dp_netdev_start_pmds(dp);
>> -    }
>> -
>> -    dp_netdev_add_port_to_pmds(dp, port);
>> -
>>      hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>>      seq_change(dp->port_seq);
>>  
>> +    reconfigure_datapath(dp);
>> +
>>      return 0;
>>  }
>>  
>> @@ -1537,13 +1477,6 @@ get_port_by_name(struct dp_netdev *dp,
>>      return ENOENT;
>>  }
>>  
>> -static int
>> -get_n_pmd_threads(struct dp_netdev *dp)
>> -{
>> -    /* There is one non pmd thread in dp->poll_threads */
>> -    return cmap_count(&dp->poll_threads) - 1;
>> -}
>> -
>>  /* Returns 'true' if there is a port with pmd netdev. */
>>  static bool
>>  has_pmd_port(struct dp_netdev *dp)
>> @@ -1560,7 +1493,6 @@ has_pmd_port(struct dp_netdev *dp)
>>      return false;
>>  }
>>  
>> -
>>  static void
>>  do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
>>      OVS_REQUIRES(dp->port_mutex)
>> @@ -1568,14 +1500,7 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
>>      hmap_remove(&dp->ports, &port->node);
>>      seq_change(dp->port_seq);
>>  
>> -    dp_netdev_del_port_from_all_pmds(dp, port);
>> -
>> -    if (netdev_is_pmd(port->netdev)) {
>> -        /* If there is no pmd netdev, delete the pmd threads */
>> -        if (!has_pmd_port(dp)) {
>> -            dp_netdev_stop_pmds(dp);
>> -        }
>> -    }
>> +    reconfigure_datapath(dp);
>>  
>>      port_destroy(port);
>>  }
>> @@ -2977,15 +2902,27 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>>      }
>>  }
>>  
>> +static struct tx_port *
>> +tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
>> +{
>> +    struct tx_port *tx;
>> +
>> +    HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
>> +        if (tx->port->port_no == port_no) {
>> +            return tx;
>> +        }
>> +    }
>> +
>> +    return NULL;
>> +}
>> +
>>  static int
>>  port_reconfigure(struct dp_netdev_port *port)
>>  {
>>      struct netdev *netdev = port->netdev;
>>      int i, err;
>>  
>> -    if (!netdev_is_reconf_required(netdev)) {
>> -        return 0;
>> -    }
>> +    port->need_reconfigure = false;
>>  
>>      /* Closes the existing 'rxq's. */
>>      for (i = 0; i < port->n_rxq; i++) {
>> @@ -2995,11 +2932,13 @@ port_reconfigure(struct dp_netdev_port *port)
>>      port->n_rxq = 0;
>>  
>>      /* Allows 'netdev' to apply the pending configuration changes. */
>> -    err = netdev_reconfigure(netdev);
>> -    if (err && (err != EOPNOTSUPP)) {
>> -        VLOG_ERR("Failed to set interface %s new configuration",
>> -                 netdev_get_name(netdev));
>> -        return err;
>> +    if (netdev_is_reconf_required(netdev)) {
>> +        err = netdev_reconfigure(netdev);
>> +        if (err && (err != EOPNOTSUPP)) {
>> +            VLOG_ERR("Failed to set interface %s new configuration",
>> +                     netdev_get_name(netdev));
>> +            return err;
>> +        }
>>      }
>>      /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
>>      port->rxqs = xrealloc(port->rxqs,
>> @@ -3023,42 +2962,393 @@ port_reconfigure(struct dp_netdev_port *port)
>>      return 0;
>>  }
>>  
>> +struct rr_numa_list {
>> +    struct hmap numas;  /* Contains 'struct rr_numa' */
>> +};
>> +
>> +struct rr_numa {
>> +    struct hmap_node node;
>> +
>> +    int numa_id;
>> +
>> +    /* Non isolated pmds on numa node 'numa_id' */
>> +    struct dp_netdev_pmd_thread **pmds;
>> +    int n_pmds;
>> +
>> +    int cur_index;
>> +};
>> +
>> +static struct rr_numa *
>> +rr_numa_list_lookup(struct rr_numa_list *rr, int numa_id)
>> +{
>> +    struct rr_numa *numa;
>> +
>> +    HMAP_FOR_EACH_WITH_HASH(numa, node, hash_int(numa_id, 0), &rr->numas) {
>> +        if (numa->numa_id == numa_id) {
>> +            return numa;
>> +        }
>> +    }
>> +
>> +    return NULL;
>> +}
>> +
>> +static void
>> +rr_numa_list_populate(struct dp_netdev *dp, struct rr_numa_list *rr)
>> +{
>> +    struct dp_netdev_pmd_thread *pmd;
>> +    struct rr_numa *numa;
>> +
>> +    hmap_init(&rr->numas);
>> +
>> +    CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> +        if (pmd->core_id == NON_PMD_CORE_ID || pmd->isolated) {
>> +            continue;
>> +        }
>> +
>> +        numa = rr_numa_list_lookup(rr, pmd->numa_id);
>> +        if (!numa) {
>> +            numa = xzalloc(sizeof *numa);
>> +            numa->numa_id = pmd->numa_id;
>> +            hmap_insert(&rr->numas, &numa->node, hash_int(pmd->numa_id, 0));
>> +        }
>> +        numa->n_pmds++;
>> +        numa->pmds = xrealloc(numa->pmds, numa->n_pmds * sizeof *numa->pmds);
>> +        numa->pmds[numa->n_pmds - 1] = pmd;
>> +    }
>> +}
>> +
>> +static struct dp_netdev_pmd_thread *
>> +rr_numa_get_pmd(struct rr_numa *numa)
>> +{
>> +    return numa->pmds[numa->cur_index++ % numa->n_pmds];
>> +}
>> +
>> +static void
>> +rr_numa_list_destroy(struct rr_numa_list *rr)
>> +{
>> +    struct rr_numa *numa;
>> +
>> +    HMAP_FOR_EACH_POP(numa, node, &rr->numas) {
>> +        free(numa->pmds);
>> +        free(numa);
>> +    }
>> +    hmap_destroy(&rr->numas);
>> +}
>> +
>> +/* Assign pmds to queues.  If 'pinned' is true, assign pmds to pinned
>> + * queues and marks the pmds as isolated.  Otherwise, assign non isolated
>> + * pmds to unpinned queues.
>> + *
>> + * The function doesn't touch the pmd threads, it just stores the assignment
>> + * in the 'pmd' member of each rxq. */
>> +static void
>> +rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
>> +{
>> +    struct dp_netdev_port *port;
>> +    struct rr_numa_list rr;
>> +
>> +    rr_numa_list_populate(dp, &rr);
>> +
>> +    HMAP_FOR_EACH(port, node, &dp->ports) {
>> +        struct rr_numa *numa;
>> +        int numa_id;
>> +
>> +        if (!netdev_is_pmd(port->netdev)) {
>> +            continue;
>> +        }
>> +
>> +        numa_id = netdev_get_numa_id(port->netdev);
>> +        numa = rr_numa_list_lookup(&rr, numa_id);
>> +
>> +        for (int qid = 0; qid < port->n_rxq; qid++) {
>> +            struct dp_netdev_rxq *q = &port->rxqs[qid];
>> +
>> +            if (pinned && q->core_id != RXQ_CORE_UNPINNED) {
>> +                struct dp_netdev_pmd_thread *pmd;
>> +
>> +                pmd = dp_netdev_get_pmd(dp, q->core_id);
>> +                if (!pmd) {
>> +                    VLOG_WARN("There is no PMD thread on core %d. Queue "
>> +                              "%d on port \'%s\' will not be polled.",
>> +                              q->core_id, qid, netdev_get_name(port->netdev));
>> +                } else {
>> +                    q->pmd = pmd;
>> +                    pmd->isolated = true;
>> +                    dp_netdev_pmd_unref(pmd);
>> +                }
>> +            } else if (!pinned && q->core_id == RXQ_CORE_UNPINNED) {
>> +                if (!numa) {
>> +                    VLOG_WARN("There's no available (non isolated) pmd thread"
>> +                              "on numa node %d. Queue %d on port \'%s\' will"
>> +                              "not be polled.",
>> +                              numa_id, qid, netdev_get_name(port->netdev));
>> +                } else {
>> +                    q->pmd = rr_numa_get_pmd(numa);
>> +                }
>> +            }
>> +        }
>> +    }
>> +
>> +    rr_numa_list_destroy(&rr);
>> +}
>> +
>>  static void
>>  reconfigure_pmd_threads(struct dp_netdev *dp)
>>      OVS_REQUIRES(dp->port_mutex)
>>  {
>> -    struct dp_netdev_port *port, *next;
>> -    int n_cores;
>> +    struct dp_netdev_pmd_thread *pmd;
>> +    struct ovs_numa_dump *pmd_cores;
>> +    bool changed = false;
>> +
>> +    /* The pmd threads should be started only if there's a pmd port in the
>> +     * datapath.  If the user didn't provide any "pmd-cpu-mask", we start
>> +     * NR_PMD_THREADS per numa node. */
>> +    if (!has_pmd_port(dp)) {
>> +        pmd_cores = ovs_numa_dump_n_cores_per_numa(0);
>> +    } else if (dp->pmd_cmask && dp->pmd_cmask[0]) {
>> +        pmd_cores = ovs_numa_dump_cores_with_cmask(dp->pmd_cmask);
>> +    } else {
>> +        pmd_cores = ovs_numa_dump_n_cores_per_numa(NR_PMD_THREADS);
>> +    }
>> +
>> +    /* Check for changed configuration */
>> +    if (ovs_numa_dump_count(pmd_cores) != cmap_count(&dp->poll_threads) - 1) {
>> +        changed = true;
>> +    } else {
>> +        CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> +            if (pmd->core_id != NON_PMD_CORE_ID
>> +                && !ovs_numa_dump_contains_core(pmd_cores,
>> +                                                pmd->numa_id,
>> +                                                pmd->core_id)) {
>> +                changed = true;
>> +                break;
>> +            }
>> +        }
>> +    }
>> +
>> +    /* Destroy the old and recreate the new pmd threads.  We don't perform an
>> +     * incremental update because we would have to adjust 'static_tx_qid'. */
>> +    if (changed) {
>> +        struct ovs_numa_dump *all_numas;
>> +        struct ovs_numa_info_core *core;
>> +        struct ovs_numa_info_core *numa;
>> +
>> +        /* Do not destroy the non pmd thread. */
>> +        dp_netdev_destroy_all_pmds(dp, false);
>> +        FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
>> +            struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
>> +
>> +            dp_netdev_configure_pmd(pmd, dp, core->core_id, core->numa_id);
>> +
>> +            pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
>> +        }
>> +
>> +        /* Log the number of pmd threads per numa node. */
>> +        all_numas = ovs_numa_dump_n_cores_per_numa(1);
>> +
>> +        FOR_EACH_CORE_ON_DUMP(numa, all_numas) {
>> +            int n = 0;
>> +
>> +            FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
>> +                if (core->numa_id == numa->numa_id) {
>> +                    n++;
>> +                }
>> +            }
>> +
>> +            if (n) {
>> +                VLOG_INFO("Created %d pmd threads on numa node %d",
>> +                          n, numa->numa_id);
>> +            }
>
>Hmm, 'FOR_EACH_NUMA_ON_DUMP()' implemented but still the same code here.

Oops, I forgot to squash the new code.

Please consider the following incremental applied:

---8<---

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 3509493..70a082e 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -3129,9 +3129,8 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
     /* Destroy the old and recreate the new pmd threads.  We don't perform an
      * incremental update because we would have to adjust 'static_tx_qid'. */
     if (changed) {
-        struct ovs_numa_dump *all_numas;
         struct ovs_numa_info_core *core;
-        struct ovs_numa_info_core *numa;
+        struct ovs_numa_info_numa *numa;
 
         /* Do not destroy the non pmd thread. */
         dp_netdev_destroy_all_pmds(dp, false);
@@ -3144,23 +3143,10 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
         }
 
         /* Log the number of pmd threads per numa node. */
-        all_numas = ovs_numa_dump_n_cores_per_numa(1);
-
-        FOR_EACH_CORE_ON_DUMP(numa, all_numas) {
-            int n = 0;
-
-            FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
-                if (core->numa_id == numa->numa_id) {
-                    n++;
-                }
-            }
-
-            if (n) {
-                VLOG_INFO("Created %d pmd threads on numa node %d",
-                          n, numa->numa_id);
-            }
+        FOR_EACH_NUMA_ON_DUMP (numa, pmd_cores) {
+            VLOG_INFO("Created %"PRIuSIZE" pmd threads on numa node %d",
+                      numa->n_cores, numa->numa_id);
         }
-        ovs_numa_dump_destroy(all_numas);
     }
 
     ovs_numa_dump_destroy(pmd_cores);

---8<---

>
>> +        }
>> +        ovs_numa_dump_destroy(all_numas);
>> +    }
>> +
>> +    ovs_numa_dump_destroy(pmd_cores);
>> +}
>> +
>> +static void
>> +reload_affected_pmds(struct dp_netdev *dp)
>> +{
>> +    struct dp_netdev_pmd_thread *pmd;
>> +
>> +    CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> +        if (pmd->need_reload) {
>> +            dp_netdev_reload_pmd__(pmd);
>> +            pmd->need_reload = false;
>> +        }
>> +    }
>> +}
>> +
>> +static void
>> +pmd_remove_stale_ports(struct dp_netdev *dp,
>> +                       struct dp_netdev_pmd_thread *pmd)
>> +    OVS_EXCLUDED(pmd->port_mutex)
>> +    OVS_REQUIRES(dp->port_mutex)
>> +{
>> +    struct rxq_poll *poll, *poll_next;
>> +    struct tx_port *tx, *tx_next;
>> +
>> +    ovs_mutex_lock(&pmd->port_mutex);
>> +    HMAP_FOR_EACH_SAFE(poll, poll_next, node, &pmd->poll_list) {
>> +        struct dp_netdev_port *port = poll->rxq->port;
>> +
>> +        if (port->need_reconfigure
>> +            || dp_netdev_lookup_port(dp, port->port_no) != port) {
>
>-            || dp_netdev_lookup_port(dp, port->port_no) != port) {
>+            || !hmap_contains(&dp->ports, &port->node)) {
>
>
>> +            dp_netdev_del_rxq_from_pmd(pmd, poll);
>> +        }
>> +    }
>> +    HMAP_FOR_EACH_SAFE(tx, tx_next, node, &pmd->tx_ports) {
>> +        struct dp_netdev_port *port = tx->port;
>> +
>> +        if (port->need_reconfigure
>> +            || dp_netdev_lookup_port(dp, port->port_no) != port) {
>
>-            || dp_netdev_lookup_port(dp, port->port_no) != port) {
>+            || !hmap_contains(&dp->ports, &port->node)) {

It looks better, I squashed the two changes, thanks

>
>> +            dp_netdev_del_port_tx_from_pmd(pmd, tx);
>> +        }
>> +    }
>> +    ovs_mutex_unlock(&pmd->port_mutex);
>> +}
>> +
>> +/* Must be called each time a port is added/removed or the cmask changes.
>> + * This creates and destroys pmd threads, reconfigures ports, opens their
>> + * rxqs and assigns all rxqs/txqs to pmd threads. */
>> +static void
>> +reconfigure_datapath(struct dp_netdev *dp)
>> +    OVS_REQUIRES(dp->port_mutex)
>> +{
>> +    struct dp_netdev_pmd_thread *pmd;
>> +    struct dp_netdev_port *port;
>> +    int wanted_txqs;
>>  
>>      dp->last_reconfigure_seq = seq_read(dp->reconfigure_seq);
>>  
>> -    dp_netdev_destroy_all_pmds(dp);
>> +    /* Step 1: Adjust the pmd threads based on the datapath ports, the cores
>> +     * on the system and the user configuration. */
>> +    reconfigure_pmd_threads(dp);
>>  
>> -    /* Reconfigures the cpu mask. */
>> -    ovs_numa_set_cpu_mask(dp->pmd_cmask);
>> +    wanted_txqs = cmap_count(&dp->poll_threads);
>>  
>> -    n_cores = ovs_numa_get_n_cores();
>> -    if (n_cores == OVS_CORE_UNSPEC) {
>> -        VLOG_ERR("Cannot get cpu core info");
>> -        return;
>> +    /* The number of pmd threads might have changed, or a port can be new:
>> +     * adjust the txqs. */
>> +    HMAP_FOR_EACH(port, node, &dp->ports) {
>> +        netdev_set_tx_multiq(port->netdev, wanted_txqs);
>>      }
>>  
>> -    HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
>> +    /* Step 2: Remove from the pmd threads ports that have been removed or
>> +     * need reconfiguration. */
>> +
>> +    /* Check for all the ports that need reconfiguration.  We cache this in
>> +     * 'port->reconfigure', because netdev_is_reconf_required() can change at
>> +     * any time. */
>> +    HMAP_FOR_EACH(port, node, &dp->ports) {
>> +        if (netdev_is_reconf_required(port->netdev)) {
>> +            port->need_reconfigure = true;
>> +        }
>> +    }
>> +
>> +    /* Remove from the pmd threads all the ports that have been deleted or
>> +     * need reconfiguration. */
>> +    CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> +        pmd_remove_stale_ports(dp, pmd);
>> +    }
>> +
>> +    /* Reload affected pmd threads.  We must wait for the pmd threads before
>> +     * reconfiguring the ports, because a port cannot be reconfigured while
>> +     * it's being used. */
>> +    reload_affected_pmds(dp);
>> +
>> +    /* Step 3: Reconfigure ports. */
>> +
>> +    /* We only reconfigure the ports that we determined above, because they're
>> +     * not being used by any pmd thread at the moment.  If a port fails to
>> +     * reconfigure we remove it from the datapath. */
>> +    HMAP_FOR_EACH(port, node, &dp->ports) {
>>          int err;
>>  
>> +        if (!port->need_reconfigure) {
>> +            continue;
>> +        }
>> +
>>          err = port_reconfigure(port);
>>          if (err) {
>>              hmap_remove(&dp->ports, &port->node);
>>              seq_change(dp->port_seq);
>>              port_destroy(port);
>>          } else {
>> -            port->dynamic_txqs = netdev_n_txq(port->netdev) < n_cores + 1;
>> +            port->dynamic_txqs = netdev_n_txq(port->netdev) < wanted_txqs;
>>          }
>>      }
>> -    /* Restores the non-pmd. */
>> -    dp_netdev_set_nonpmd(dp);
>> -    /* Restores all pmd threads. */
>> -    dp_netdev_reset_pmd_threads(dp);
>> +
>> +    /* Step 4: Compute new rxq scheduling.  We don't touch the pmd threads
>> +     * for now, we just update the 'pmd' pointer in each rxq to point to the
>> +     * wanted thread according to the scheduling policy. */
>> +
>> +    /* Reset all the pmd threads to non isolated. */
>> +    CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> +        pmd->isolated = false;
>> +    }
>> +
>> +    /* Reset all the queues to unassigned */
>> +    HMAP_FOR_EACH(port, node, &dp->ports) {
>> +        for (int i = 0; i < port->n_rxq; i++) {
>> +            port->rxqs[i].pmd = NULL;
>> +        }
>> +    }
>> +
>> +    /* Add pinned queues and mark pmd threads isolated. */
>> +    rxq_scheduling(dp, true);
>> +
>> +    /* Add non-pinned queues. */
>> +    rxq_scheduling(dp, false);
>> +
>> +    /* Step 5: Remove queues not compliant with new scheduling. */
>> +    CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> +        struct rxq_poll *poll, *poll_next;
>> +
>> +        ovs_mutex_lock(&pmd->port_mutex);
>> +        HMAP_FOR_EACH_SAFE(poll, poll_next, node, &pmd->poll_list) {
>> +            if (poll->rxq->pmd != pmd) {
>> +                dp_netdev_del_rxq_from_pmd(pmd, poll);
>> +            }
>> +        }
>> +        ovs_mutex_unlock(&pmd->port_mutex);
>> +    }
>> +
>> +    /* Reload affected pmd threads.  We must wait for the pmd threads to remove
>> +     * the old queues before readding them, otherwise a queue can be polled by
>> +     * two threads at the same time. */
>> +    reload_affected_pmds(dp);
>> +
>> +    /* Step 6: Add queues from scheduling, if they're not there already. */
>> +    HMAP_FOR_EACH(port, node, &dp->ports) {
>> +        if (!netdev_is_pmd(port->netdev)) {
>> +            continue;
>> +        }
>> +
>> +        for (int qid = 0; qid < port->n_rxq; qid++) {
>> +            struct dp_netdev_rxq *q = &port->rxqs[qid];
>> +
>> +            if (q->pmd) {
>> +                ovs_mutex_lock(&q->pmd->port_mutex);
>> +                dp_netdev_add_rxq_to_pmd(q->pmd, q);
>> +                ovs_mutex_unlock(&q->pmd->port_mutex);
>> +            }
>> +        }
>> +    }
>> +
>> +    /* Add every port to the tx cache of every pmd thread, if it's not
>> +     * there already and if this pmd has at least one rxq to poll. */
>> +    CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> +        ovs_mutex_lock(&pmd->port_mutex);
>> +        if (hmap_count(&pmd->poll_list) || pmd->core_id == NON_PMD_CORE_ID) {
>> +            HMAP_FOR_EACH(port, node, &dp->ports) {
>> +                dp_netdev_add_port_tx_to_pmd(pmd, port);
>> +            }
>> +        }
>> +        ovs_mutex_unlock(&pmd->port_mutex);
>> +    }
>> +
>> +    /* Reload affected pmd threads. */
>> +    reload_affected_pmds(dp);
>>  }
>>  
>>  /* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
>> @@ -3107,7 +3397,7 @@ dpif_netdev_run(struct dpif *dpif)
>>      }
>>  
>>      if (dp_netdev_is_reconf_required(dp) || ports_require_restart(dp)) {
>> -        reconfigure_pmd_threads(dp);
>> +        reconfigure_datapath(dp);
>>      }
>>      ovs_mutex_unlock(&dp->port_mutex);
>>  
>> @@ -3357,16 +3647,9 @@ dp_netdev_set_nonpmd(struct dp_netdev *dp)
>>      OVS_REQUIRES(dp->port_mutex)
>>  {
>>      struct dp_netdev_pmd_thread *non_pmd;
>> -    struct dp_netdev_port *port;
>>  
>>      non_pmd = xzalloc(sizeof *non_pmd);
>>      dp_netdev_configure_pmd(non_pmd, dp, NON_PMD_CORE_ID, OVS_NUMA_UNSPEC);
>> -
>> -    HMAP_FOR_EACH (port, node, &dp->ports) {
>> -        dp_netdev_add_port_tx_to_pmd(non_pmd, port);
>> -    }
>> -
>> -    dp_netdev_reload_pmd__(non_pmd);
>>  }
>>  
>>  /* Caller must have valid pointer to 'pmd'. */
>> @@ -3412,10 +3695,9 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>>      pmd->dp = dp;
>>      pmd->core_id = core_id;
>>      pmd->numa_id = numa_id;
>> +    pmd->need_reload = false;
>>  
>> -    *CONST_CAST(int *, &pmd->static_tx_qid) = (core_id == NON_PMD_CORE_ID)
>> -                                              ? ovs_numa_get_n_cores()
>> -                                              : get_n_pmd_threads(dp);
>> +    *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
>>  
>>      ovs_refcount_init(&pmd->ref_cnt);
>>      latch_init(&pmd->exit_latch);
>> @@ -3483,7 +3765,6 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
>>      } else {
>>          latch_set(&pmd->exit_latch);
>>          dp_netdev_reload_pmd__(pmd);
>> -        ovs_numa_unpin_core(pmd->core_id);
>>          xpthread_join(pmd->thread, NULL);
>>      }
>>  
>> @@ -3498,20 +3779,20 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
>>      dp_netdev_pmd_unref(pmd);
>>  }
>>  
>> -/* Destroys all pmd threads, but not the non pmd thread. */
>> +/* Destroys all pmd threads. If 'non_pmd' is true, it also destroys the non
>> + * pmd thread. */
>>  static void
>> -dp_netdev_stop_pmds(struct dp_netdev *dp)
>> +dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd)
>>  {
>>      struct dp_netdev_pmd_thread *pmd;
>>      struct dp_netdev_pmd_thread **pmd_list;
>>      size_t k = 0, n_pmds;
>>  
>> -    n_pmds = get_n_pmd_threads(dp);
>> +    n_pmds = cmap_count(&dp->poll_threads);
>>      pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
>>  
>>      CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>> -        /* We don't need to destroy the non pmd thread */
>> -        if (pmd->core_id == NON_PMD_CORE_ID) {
>> +        if (!non_pmd && pmd->core_id == NON_PMD_CORE_ID) {
>>              continue;
>>          }
>>          /* We cannot call dp_netdev_del_pmd(), since it alters
>> @@ -3527,32 +3808,6 @@ dp_netdev_stop_pmds(struct dp_netdev *dp)
>>      free(pmd_list);
>>  }
>>  
>> -/* Destroys all pmd threads, including the non pmd thread. */
>> -static void
>> -dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
>> -{
>> -    struct dp_netdev_pmd_thread *pmd;
>> -    struct dp_netdev_pmd_thread **pmd_list;
>> -    size_t k = 0, n_pmds;
>> -
>> -    n_pmds = cmap_count(&dp->poll_threads);
>> -    pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
>> -
>> -    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>> -        /* We cannot call dp_netdev_del_pmd(), since it alters
>> -         * 'dp->poll_threads' (while we're iterating it) and it
>> -         * might quiesce. */
>> -        ovs_assert(k < n_pmds);
>> -        pmd_list[k++] = pmd;
>> -    }
>> -
>> -    for (size_t i = 0; i < k; i++) {
>> -        dp_netdev_del_pmd(dp, pmd_list[i]);
>> -    }
>> -
>> -    free(pmd_list);
>> -}
>> -
>>  /* Deletes all rx queues from pmd->poll_list and all the ports from
>>   * pmd->tx_ports. */
>>  static void
>> @@ -3571,126 +3826,40 @@ dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
>>      ovs_mutex_unlock(&pmd->port_mutex);
>>  }
>>  
>> -static struct tx_port *
>> -tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
>> -{
>> -    struct tx_port *tx;
>> -
>> -    HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
>> -        if (tx->port->port_no == port_no) {
>> -            return tx;
>> -        }
>> -    }
>> -
>> -    return NULL;
>> -}
>> -
>> -/* Deletes all rx queues of 'port' from 'poll_list', and the 'port' from
>> - * 'tx_ports' of 'pmd' thread.  Returns true if 'port' was found in 'pmd'
>> - * (therefore a restart is required). */
>> -static bool
>> -dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port,
>> -                              struct dp_netdev_pmd_thread *pmd)
>> -{
>> -    struct rxq_poll *poll, *next;
>> -    struct tx_port *tx;
>> -    bool found = false;
>> -
>> -    ovs_mutex_lock(&pmd->port_mutex);
>> -    HMAP_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
>> -        if (poll->rxq->port == port) {
>> -            found = true;
>> -            hmap_remove(&pmd->poll_list, &poll->node);
>> -            free(poll);
>> -        }
>> -    }
>> -
>> -    tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
>> -    if (tx) {
>> -        hmap_remove(&pmd->tx_ports, &tx->node);
>> -        free(tx);
>> -        found = true;
>> -    }
>> -    ovs_mutex_unlock(&pmd->port_mutex);
>> -
>> -    return found;
>> -}
>> -
>> -/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
>> - * threads.  The pmd threads that need to be restarted are inserted in
>> - * 'to_reload'. */
>> +/* Adds rx queue to poll_list of PMD thread, if it's not there already. */
>>  static void
>> -dp_netdev_del_port_from_all_pmds__(struct dp_netdev *dp,
>> -                                   struct dp_netdev_port *port,
>> -                                   struct hmapx *to_reload)
>> +dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>> +                         struct dp_netdev_rxq *rxq)
>> +    OVS_REQUIRES(pmd->port_mutex)
>>  {
>> -    struct dp_netdev_pmd_thread *pmd;
>> -
>> -    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>> -        bool found;
>> -
>> -        found = dp_netdev_del_port_from_pmd__(port, pmd);
>> +    int qid = netdev_rxq_get_queue_id(rxq->rx);
>> +    uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
>> +    struct rxq_poll *poll;
>>  
>> -        if (found) {
>> -            hmapx_add(to_reload, pmd);
>> +    HMAP_FOR_EACH_WITH_HASH (poll, node, hash, &pmd->poll_list) {
>> +        if (poll->rxq == rxq) {
>> +            /* 'rxq' is already polled by this thread. Do nothing. */
>> +            return;
>>          }
>>      }
>> -}
>> -
>> -/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
>> - * threads. Reloads the threads if needed. */
>> -static void
>> -dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
>> -                                 struct dp_netdev_port *port)
>> -{
>> -    struct dp_netdev_pmd_thread *pmd;
>> -    struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
>> -    struct hmapx_node *node;
>>  
>> -    dp_netdev_del_port_from_all_pmds__(dp, port, &to_reload);
>> -
>> -    HMAPX_FOR_EACH (node, &to_reload) {
>> -        pmd = (struct dp_netdev_pmd_thread *) node->data;
>> -        dp_netdev_reload_pmd__(pmd);
>> -    }
>> -
>> -    hmapx_destroy(&to_reload);
>> -}
>> -
>> -
>> -/* Returns non-isolated PMD thread from this numa node with fewer
>> - * rx queues to poll. Returns NULL if there is no non-isolated  PMD threads
>> - * on this numa node. Can be called safely only by main thread. */
>> -static struct dp_netdev_pmd_thread *
>> -dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
>> -{
>> -    int min_cnt = -1;
>> -    struct dp_netdev_pmd_thread *pmd, *res = NULL;
>> -
>> -    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>> -        if (!pmd->isolated && pmd->numa_id == numa_id
>> -            && (min_cnt > hmap_count(&pmd->poll_list) || res == NULL)) {
>> -            min_cnt = hmap_count(&pmd->poll_list);
>> -            res = pmd;
>> -        }
>> -    }
>> +    poll = xmalloc(sizeof *poll);
>> +    poll->rxq = rxq;
>> +    hmap_insert(&pmd->poll_list, &poll->node, hash);
>>  
>> -    return res;
>> +    pmd->need_reload = true;
>>  }
>>  
>> -/* Adds rx queue to poll_list of PMD thread. */
>> +/* Deletes 'poll' from poll_list of PMD thread. */
>>  static void
>> -dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>> -                         struct dp_netdev_rxq *rxq)
>> +dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
>> +                           struct rxq_poll *poll)
>>      OVS_REQUIRES(pmd->port_mutex)
>>  {
>> -    int qid = netdev_rxq_get_queue_id(rxq->rx);
>> -    uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
>> -    struct rxq_poll *poll;
>> +    hmap_remove(&pmd->poll_list, &poll->node);
>> +    free(poll);
>>  
>> -    poll = xmalloc(sizeof *poll);
>> -    poll->rxq = rxq;
>> -    hmap_insert(&pmd->poll_list, &poll->node, hash);
>> +    pmd->need_reload = true;
>>  }
>>  
>>  /* Add 'port' to the tx port cache of 'pmd', which must be reloaded for the
>> @@ -3698,190 +3867,37 @@ dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>>  static void
>>  dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>>                               struct dp_netdev_port *port)
>> +    OVS_REQUIRES(pmd->port_mutex)
>>  {
>>      struct tx_port *tx;
>>  
>> +    tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
>> +    if (tx) {
>> +        /* 'port' is already on this thread's tx cache. Do nothing. */
>> +        return;
>> +    }
>> +
>>      tx = xzalloc(sizeof *tx);
>>  
>>      tx->port = port;
>>      tx->qid = -1;
>>  
>> -    ovs_mutex_lock(&pmd->port_mutex);
>>      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
>> -    ovs_mutex_unlock(&pmd->port_mutex);
>> -}
>> -
>> -/* Distribute all {pinned|non-pinned} rx queues of 'port' between PMD
>> - * threads in 'dp'. The pmd threads that need to be restarted are inserted
>> - * in 'to_reload'. PMD threads with pinned queues marked as isolated. */
>> -static void
>> -dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
>> -                              struct dp_netdev_port *port,
>> -                              struct hmapx *to_reload, bool pinned)
>> -{
>> -    int numa_id = netdev_get_numa_id(port->netdev);
>> -    struct dp_netdev_pmd_thread *pmd;
>> -    int i;
>> -
>> -    if (!netdev_is_pmd(port->netdev)) {
>> -        return;
>> -    }
>> -
>> -    for (i = 0; i < port->n_rxq; i++) {
>> -        if (pinned) {
>> -            if (port->rxqs[i].core_id == RXQ_CORE_UNPINNED) {
>> -                continue;
>> -            }
>> -            pmd = dp_netdev_get_pmd(dp, port->rxqs[i].core_id);
>> -            if (!pmd) {
>> -                VLOG_WARN("There is no PMD thread on core %d. "
>> -                          "Queue %d on port \'%s\' will not be polled.",
>> -                          port->rxqs[i].core_id, i,
>> -                          netdev_get_name(port->netdev));
>> -                continue;
>> -            }
>> -            pmd->isolated = true;
>> -            dp_netdev_pmd_unref(pmd);
>> -        } else {
>> -            if (port->rxqs[i].core_id != RXQ_CORE_UNPINNED) {
>> -                continue;
>> -            }
>> -            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
>> -            if (!pmd) {
>> -                VLOG_WARN("There's no available pmd thread on numa node %d",
>> -                          numa_id);
>> -                break;
>> -            }
>> -        }
>> -
>> -        ovs_mutex_lock(&pmd->port_mutex);
>> -        dp_netdev_add_rxq_to_pmd(pmd, &port->rxqs[i]);
>> -        ovs_mutex_unlock(&pmd->port_mutex);
>> -
>> -        hmapx_add(to_reload, pmd);
>> -    }
>> -}
>> -
>> -/* Distributes all non-pinned rx queues of 'port' between all PMD threads
>> - * in 'dp' and inserts 'port' in the PMD threads 'tx_ports'. The pmd threads
>> - * that need to be restarted are inserted in 'to_reload'. */
>> -static void
>> -dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
>> -                             struct hmapx *to_reload)
>> -{
>> -    struct dp_netdev_pmd_thread *pmd;
>> -
>> -    dp_netdev_add_port_rx_to_pmds(dp, port, to_reload, false);
>> -
>> -    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>> -        dp_netdev_add_port_tx_to_pmd(pmd, port);
>> -        hmapx_add(to_reload, pmd);
>> -    }
>> -}
>> -
>> -/* Distributes all non-pinned rx queues of 'port' between all PMD threads
>> - * in 'dp', inserts 'port' in the PMD threads 'tx_ports' and reloads them,
>> - * if needed. */
>> -static void
>> -dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
>> -{
>> -    struct dp_netdev_pmd_thread *pmd;
>> -    struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
>> -    struct hmapx_node *node;
>> -
>> -    dp_netdev_add_port_to_pmds__(dp, port, &to_reload);
>> -
>> -    HMAPX_FOR_EACH (node, &to_reload) {
>> -        pmd = (struct dp_netdev_pmd_thread *) node->data;
>> -        dp_netdev_reload_pmd__(pmd);
>> -    }
>> -
>> -    hmapx_destroy(&to_reload);
>> -}
>> -
>> -static void
>> -dp_netdev_start_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>> -{
>> -    int can_have, n_unpinned, i;
>> -
>> -    n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
>> -    if (!n_unpinned) {
>> -        VLOG_WARN("Cannot create pmd threads due to out of unpinned "
>> -                  "cores on numa node %d", numa_id);
>> -        return;
>> -    }
>> -
>> -    /* If cpu mask is specified, uses all unpinned cores, otherwise
>> -     * tries creating NR_PMD_THREADS pmd threads. */
>> -    can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
>> -    for (i = 0; i < can_have; i++) {
>> -        unsigned core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
>> -        struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
>> -        struct dp_netdev_port *port;
>> -
>> -        dp_netdev_configure_pmd(pmd, dp, core_id, numa_id);
>> -
>> -        HMAP_FOR_EACH (port, node, &dp->ports) {
>> -            dp_netdev_add_port_tx_to_pmd(pmd, port);
>> -        }
>> -
>> -        pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
>> -    }
>> -    VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
>> +    pmd->need_reload = true;
>>  }
>>  
>> -/* Starts pmd threads, if not already started. The function takes care of
>> - * filling the threads tx port cache. */
>> +/* Deletes 'tx' from the tx port cache of 'pmd', which must be reloaded for
>> + * the changes to take effect. */
>>  static void
>> -dp_netdev_start_pmds(struct dp_netdev *dp)
>> -    OVS_REQUIRES(dp->port_mutex)
>> +dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
>> +                               struct tx_port *tx)
>> +    OVS_REQUIRES(pmd->port_mutex)
>>  {
>> -    int n_pmds;
>> -
>> -    n_pmds = get_n_pmd_threads(dp);
>> -
>> -    /* If there are already pmd threads created for the datapath, do nothing.
>> -     * Else, creates the pmd threads. */
>> -    if (!n_pmds) {
>> -        int n_numas = ovs_numa_get_n_numas();
>> -
>> -        for (int numa_id = 0; numa_id < n_numas; numa_id++) {
>> -            dp_netdev_start_pmds_on_numa(dp, numa_id);
>> -        }
>> -    }
>> +    hmap_remove(&pmd->tx_ports, &tx->node);
>> +    free(tx);
>> +    pmd->need_reload = true;
>>  }
>> -
>>  ?
>> -/* Called after pmd threads config change.  Restarts pmd threads with
>> - * new configuration. */
>> -static void
>> -dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
>> -    OVS_REQUIRES(dp->port_mutex)
>> -{
>> -    struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
>> -    struct dp_netdev_pmd_thread *pmd;
>> -    struct dp_netdev_port *port;
>> -    struct hmapx_node *node;
>> -
>> -    dp_netdev_start_pmds(dp);
>> -    HMAP_FOR_EACH (port, node, &dp->ports) {
>> -        /* Distribute only pinned rx queues first to mark threads as isolated */
>> -        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, true);
>> -    }
>> -
>> -    /* Distribute remaining non-pinned rx queues to non-isolated PMD threads. */
>> -    HMAP_FOR_EACH (port, node, &dp->ports) {
>> -        dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, false);
>> -    }
>> -
>> -    HMAPX_FOR_EACH (node, &to_reload) {
>> -        pmd = (struct dp_netdev_pmd_thread *) node->data;
>> -        dp_netdev_reload_pmd__(pmd);
>> -    }
>> -
>> -    hmapx_destroy(&to_reload);
>> -}
>> -
>>  static char *
>>  dpif_netdev_get_datapath_version(void)
>>  {
>> @@ -4871,12 +4887,12 @@ dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED,
>>  
>>      /* Remove port. */
>>      hmap_remove(&dp->ports, &port->node);
>> -    dp_netdev_del_port_from_all_pmds(dp, port);
>> +    reconfigure_datapath(dp);
>>  
>>      /* Reinsert with new port number. */
>>      port->port_no = port_no;
>>      hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>> -    dp_netdev_add_port_to_pmds(dp, port);
>> +    reconfigure_datapath(dp);
>>  
>>      seq_change(dp->port_seq);
>>      unixctl_command_reply(conn, NULL);
>> diff --git a/tests/pmd.at b/tests/pmd.at
>> index 147dfda..d3dcf24 100644
>> --- a/tests/pmd.at
>> +++ b/tests/pmd.at
>> @@ -607,8 +607,7 @@ p2 0 0 0
>>  p2 1 0 0
>>  ])
>>  
>> -dnl During reconfiguration some packets will be dropped. This is expected
>> -OVS_VSWITCHD_STOP(["/dpif(monitor[[0-9]]\+)|WARN|dummy at ovs-dummy: execute [[0-9]]\+ failed/d"])
>> +OVS_VSWITCHD_STOP
>>  AT_CLEANUP
>>  
>>  AT_SETUP([PMD - dpctl])
>> 


More information about the dev mailing list