[ovs-dev] [PATCH v2 18/19] dpif-netdev: Centralized threads and queues handling code.
Daniele Di Proietto
diproiettod at vmware.com
Mon Dec 5 19:46:29 UTC 2016
Thanks again for looking at this, please let me know if you have other comments.
On 05/12/2016 05:39, "Ilya Maximets" <i.maximets at samsung.com> wrote:
>Thanks for v2.
>
>Global comment:
>Could you please add a space between '{C,H}MAP_FOR_EACH[_WITH_HASH]' and
>the expressions that follow them. (Not only in that patch)
Good point, I fixed that in every commit of the series.
>
>Few more comments inline.
>
>Best regards, Ilya Maximets.
>
>On 03.12.2016 05:14, Daniele Di Proietto wrote:
>> Currently we have three different code paths that deal with pmd threads
>> and queues, in response to different input
>>
>> 1. When a port is added
>> 2. When a port is deleted
>> 3. When the cpumask changes or a port must be reconfigured.
>>
>> 1. and 2. are carefully written to minimize disruption to the running
>> datapath, while 3. brings down all the threads reconfigure all the ports
>> and restarts everything.
>>
>> This commit removes the three separate code paths by introducing the
>> reconfigure_datapath() function, that takes care of adapting the pmd
>> threads and queues to the current datapath configuration, no matter how
>> we got there.
>>
>> This aims at simplifying mantenaince and introduces a long overdue
>
>s/mantenaince/maintenance
changed, thanks
>
>> improvement: port reconfiguration (can happen quite frequently for
>> dpdkvhost ports) is now done without shutting down the whole datapath,
>> but just by temporarily removing the port that needs to be reconfigured
>> (while the rest of the datapath is running).
>>
>> We now also recompute the rxq scheduling from scratch every time a port
>> is added or deleted. This means that the queues will be more balanced,
>> especially when dealing with explicit rxq-affinity from the user
>> (without shutting down the threads and restarting them), but it also
>> means that adding or deleting a port might cause existing queues to be
>> moved between pmd threads. This negative effect can be avoided by
>> taking into account the existing distribution when computing the new
>> scheduling, but I considered code clarity and fast reconfiguration more
>> important than optimizing port addition or removal (a port is added and
>> removed only once, but can be reconfigured many times)
>>
>> Lastly, this commit moves the pmd threads state away from ovs-numa. Now
>> the pmd threads state is kept only in dpif-netdev.
>>
>> Signed-off-by: Daniele Di Proietto <diproiettod at vmware.com>
>> Co-authored-by: Ilya Maximets <i.maximets at samsung.com>
>> Signed-off-by: Ilya Maximets <i.maximets at samsung.com>
>> ---
>> lib/dpif-netdev.c | 910 +++++++++++++++++++++++++++---------------------------
>> tests/pmd.at | 3 +-
>> 2 files changed, 464 insertions(+), 449 deletions(-)
>>
>> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>> index 37479b8..3509493 100644
>> --- a/lib/dpif-netdev.c
>> +++ b/lib/dpif-netdev.c
>> @@ -289,6 +289,7 @@ struct dp_netdev_rxq {
>> pinned. RXQ_CORE_UNPINNED if the
>> queue doesn't need to be pinned to a
>> particular core. */
>> + struct dp_netdev_pmd_thread *pmd; /* pmd thread that will poll this queue. */
>> };
>>
>> /* A port in a netdev-based datapath. */
>> @@ -304,6 +305,7 @@ struct dp_netdev_port {
>> struct ovs_mutex txq_used_mutex;
>> char *type; /* Port type as requested by user. */
>> char *rxq_affinity_list; /* Requested affinity of rx queues. */
>> + bool need_reconfigure; /* True if we should reconfigure netdev. */
>> };
>>
>> /* Contained by struct dp_netdev_flow's 'stats' member. */
>> @@ -506,7 +508,7 @@ struct dp_netdev_pmd_thread {
>>
>> /* Queue id used by this pmd thread to send packets on all netdevs if
>> * XPS disabled for this netdev. All static_tx_qid's are unique and less
>> - * than 'ovs_numa_get_n_cores() + 1'. */
>> + * than 'cmap_count(dp->poll_threads)'. */
>> const int static_tx_qid;
>>
>> struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. */
>> @@ -535,6 +537,9 @@ struct dp_netdev_pmd_thread {
>> * reporting to the user */
>> unsigned long long stats_zero[DP_N_STATS];
>> uint64_t cycles_zero[PMD_N_CYCLES];
>> +
>> + /* Set to true if the pmd thread needs to be reloaded. */
>> + bool need_reload;
>> };
>>
>> /* Interface to netdev-based datapath. */
>> @@ -579,29 +584,26 @@ static void dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd);
>> static void dp_netdev_set_nonpmd(struct dp_netdev *dp)
>> OVS_REQUIRES(dp->port_mutex);
>>
>> +static void *pmd_thread_main(void *);
>> static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
>> unsigned core_id);
>> static struct dp_netdev_pmd_thread *
>> dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
>> -static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
>> -static void dp_netdev_stop_pmds(struct dp_netdev *dp);
>> -static void dp_netdev_start_pmds(struct dp_netdev *dp)
>> - OVS_REQUIRES(dp->port_mutex);
>> +static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd);
>> static void dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd);
>> -static void dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
>> - struct dp_netdev_port *port);
>> -static void dp_netdev_add_port_to_pmds(struct dp_netdev *dp,
>> - struct dp_netdev_port *port);
>> static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>> - struct dp_netdev_port *port);
>> + struct dp_netdev_port *port)
>> + OVS_REQUIRES(pmd->port_mutex);
>> +static void dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
>> + struct tx_port *tx)
>> + OVS_REQUIRES(pmd->port_mutex);
>> static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>> struct dp_netdev_rxq *rxq)
>> OVS_REQUIRES(pmd->port_mutex);
>> -static struct dp_netdev_pmd_thread *
>> -dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
>> -static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
>> - OVS_REQUIRES(dp->port_mutex);
>> -static void reconfigure_pmd_threads(struct dp_netdev *dp)
>> +static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
>> + struct rxq_poll *poll)
>> + OVS_REQUIRES(pmd->port_mutex);
>> +static void reconfigure_datapath(struct dp_netdev *dp)
>> OVS_REQUIRES(dp->port_mutex);
>> static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
>> static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
>> @@ -1152,7 +1154,7 @@ dp_netdev_free(struct dp_netdev *dp)
>> do_del_port(dp, port);
>> }
>> ovs_mutex_unlock(&dp->port_mutex);
>> - dp_netdev_destroy_all_pmds(dp);
>> + dp_netdev_destroy_all_pmds(dp, true);
>> cmap_destroy(&dp->poll_threads);
>>
>> ovs_mutex_destroy(&dp->non_pmd_mutex);
>> @@ -1287,10 +1289,7 @@ port_create(const char *devname, const char *type,
>> struct dp_netdev_port *port;
>> enum netdev_flags flags;
>> struct netdev *netdev;
>> - int n_open_rxqs = 0;
>> - int n_cores = 0;
>> - int i, error;
>> - bool dynamic_txqs = false;
>> + int error;
>>
>> *portp = NULL;
>>
>> @@ -1308,79 +1307,24 @@ port_create(const char *devname, const char *type,
>> goto out;
>> }
>>
>> - if (netdev_is_pmd(netdev)) {
>> - n_cores = ovs_numa_get_n_cores();
>> -
>> - if (n_cores == OVS_CORE_UNSPEC) {
>> - VLOG_ERR("%s, cannot get cpu core info", devname);
>> - error = ENOENT;
>> - goto out;
>> - }
>> - /* There can only be ovs_numa_get_n_cores() pmd threads,
>> - * so creates a txq for each, and one extra for the non
>> - * pmd threads. */
>> - error = netdev_set_tx_multiq(netdev, n_cores + 1);
>> - if (error && (error != EOPNOTSUPP)) {
>> - VLOG_ERR("%s, cannot set multiq", devname);
>> - goto out;
>> - }
>> - }
>> -
>> - if (netdev_is_reconf_required(netdev)) {
>> - error = netdev_reconfigure(netdev);
>> - if (error) {
>> - goto out;
>> - }
>> - }
>> -
>> - if (netdev_is_pmd(netdev)) {
>> - if (netdev_n_txq(netdev) < n_cores + 1) {
>> - dynamic_txqs = true;
>> - }
>> + error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
>> + if (error) {
>> + VLOG_ERR("%s: cannot set promisc flag", devname);
>> + goto out;
>> }
>>
>> port = xzalloc(sizeof *port);
>> port->port_no = port_no;
>> port->netdev = netdev;
>> - port->n_rxq = netdev_n_rxq(netdev);
>> - port->rxqs = xcalloc(port->n_rxq, sizeof *port->rxqs);
>> - port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
>> port->type = xstrdup(type);
>> - ovs_mutex_init(&port->txq_used_mutex);
>> - port->dynamic_txqs = dynamic_txqs;
>> -
>> - for (i = 0; i < port->n_rxq; i++) {
>> - port->rxqs[i].port = port;
>> - error = netdev_rxq_open(netdev, &port->rxqs[i].rx, i);
>> - if (error) {
>> - VLOG_ERR("%s: cannot receive packets on this network device (%s)",
>> - devname, ovs_strerror(errno));
>> - goto out_rxq_close;
>> - }
>> - port->rxqs[i].core_id = RXQ_CORE_UNPINNED;
>> - n_open_rxqs++;
>> - }
>> -
>> - error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
>> - if (error) {
>> - goto out_rxq_close;
>> - }
>> port->sf = sf;
>> + port->need_reconfigure = true;
>> + ovs_mutex_init(&port->txq_used_mutex);
>>
>> *portp = port;
>>
>> return 0;
>>
>> -out_rxq_close:
>> - for (i = 0; i < n_open_rxqs; i++) {
>> - netdev_rxq_close(port->rxqs[i].rx);
>> - }
>> - ovs_mutex_destroy(&port->txq_used_mutex);
>> - free(port->type);
>> - free(port->txq_used);
>> - free(port->rxqs);
>> - free(port);
>> -
>> out:
>> netdev_close(netdev);
>> return error;
>> @@ -1404,15 +1348,11 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
>> return error;
>> }
>>
>> - if (netdev_is_pmd(port->netdev)) {
>> - dp_netdev_start_pmds(dp);
>> - }
>> -
>> - dp_netdev_add_port_to_pmds(dp, port);
>> -
>> hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>> seq_change(dp->port_seq);
>>
>> + reconfigure_datapath(dp);
>> +
>> return 0;
>> }
>>
>> @@ -1537,13 +1477,6 @@ get_port_by_name(struct dp_netdev *dp,
>> return ENOENT;
>> }
>>
>> -static int
>> -get_n_pmd_threads(struct dp_netdev *dp)
>> -{
>> - /* There is one non pmd thread in dp->poll_threads */
>> - return cmap_count(&dp->poll_threads) - 1;
>> -}
>> -
>> /* Returns 'true' if there is a port with pmd netdev. */
>> static bool
>> has_pmd_port(struct dp_netdev *dp)
>> @@ -1560,7 +1493,6 @@ has_pmd_port(struct dp_netdev *dp)
>> return false;
>> }
>>
>> -
>> static void
>> do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
>> OVS_REQUIRES(dp->port_mutex)
>> @@ -1568,14 +1500,7 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
>> hmap_remove(&dp->ports, &port->node);
>> seq_change(dp->port_seq);
>>
>> - dp_netdev_del_port_from_all_pmds(dp, port);
>> -
>> - if (netdev_is_pmd(port->netdev)) {
>> - /* If there is no pmd netdev, delete the pmd threads */
>> - if (!has_pmd_port(dp)) {
>> - dp_netdev_stop_pmds(dp);
>> - }
>> - }
>> + reconfigure_datapath(dp);
>>
>> port_destroy(port);
>> }
>> @@ -2977,15 +2902,27 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>> }
>> }
>>
>> +static struct tx_port *
>> +tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
>> +{
>> + struct tx_port *tx;
>> +
>> + HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
>> + if (tx->port->port_no == port_no) {
>> + return tx;
>> + }
>> + }
>> +
>> + return NULL;
>> +}
>> +
>> static int
>> port_reconfigure(struct dp_netdev_port *port)
>> {
>> struct netdev *netdev = port->netdev;
>> int i, err;
>>
>> - if (!netdev_is_reconf_required(netdev)) {
>> - return 0;
>> - }
>> + port->need_reconfigure = false;
>>
>> /* Closes the existing 'rxq's. */
>> for (i = 0; i < port->n_rxq; i++) {
>> @@ -2995,11 +2932,13 @@ port_reconfigure(struct dp_netdev_port *port)
>> port->n_rxq = 0;
>>
>> /* Allows 'netdev' to apply the pending configuration changes. */
>> - err = netdev_reconfigure(netdev);
>> - if (err && (err != EOPNOTSUPP)) {
>> - VLOG_ERR("Failed to set interface %s new configuration",
>> - netdev_get_name(netdev));
>> - return err;
>> + if (netdev_is_reconf_required(netdev)) {
>> + err = netdev_reconfigure(netdev);
>> + if (err && (err != EOPNOTSUPP)) {
>> + VLOG_ERR("Failed to set interface %s new configuration",
>> + netdev_get_name(netdev));
>> + return err;
>> + }
>> }
>> /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
>> port->rxqs = xrealloc(port->rxqs,
>> @@ -3023,42 +2962,393 @@ port_reconfigure(struct dp_netdev_port *port)
>> return 0;
>> }
>>
>> +struct rr_numa_list {
>> + struct hmap numas; /* Contains 'struct rr_numa' */
>> +};
>> +
>> +struct rr_numa {
>> + struct hmap_node node;
>> +
>> + int numa_id;
>> +
>> + /* Non isolated pmds on numa node 'numa_id' */
>> + struct dp_netdev_pmd_thread **pmds;
>> + int n_pmds;
>> +
>> + int cur_index;
>> +};
>> +
>> +static struct rr_numa *
>> +rr_numa_list_lookup(struct rr_numa_list *rr, int numa_id)
>> +{
>> + struct rr_numa *numa;
>> +
>> + HMAP_FOR_EACH_WITH_HASH(numa, node, hash_int(numa_id, 0), &rr->numas) {
>> + if (numa->numa_id == numa_id) {
>> + return numa;
>> + }
>> + }
>> +
>> + return NULL;
>> +}
>> +
>> +static void
>> +rr_numa_list_populate(struct dp_netdev *dp, struct rr_numa_list *rr)
>> +{
>> + struct dp_netdev_pmd_thread *pmd;
>> + struct rr_numa *numa;
>> +
>> + hmap_init(&rr->numas);
>> +
>> + CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> + if (pmd->core_id == NON_PMD_CORE_ID || pmd->isolated) {
>> + continue;
>> + }
>> +
>> + numa = rr_numa_list_lookup(rr, pmd->numa_id);
>> + if (!numa) {
>> + numa = xzalloc(sizeof *numa);
>> + numa->numa_id = pmd->numa_id;
>> + hmap_insert(&rr->numas, &numa->node, hash_int(pmd->numa_id, 0));
>> + }
>> + numa->n_pmds++;
>> + numa->pmds = xrealloc(numa->pmds, numa->n_pmds * sizeof *numa->pmds);
>> + numa->pmds[numa->n_pmds - 1] = pmd;
>> + }
>> +}
>> +
>> +static struct dp_netdev_pmd_thread *
>> +rr_numa_get_pmd(struct rr_numa *numa)
>> +{
>> + return numa->pmds[numa->cur_index++ % numa->n_pmds];
>> +}
>> +
>> +static void
>> +rr_numa_list_destroy(struct rr_numa_list *rr)
>> +{
>> + struct rr_numa *numa;
>> +
>> + HMAP_FOR_EACH_POP(numa, node, &rr->numas) {
>> + free(numa->pmds);
>> + free(numa);
>> + }
>> + hmap_destroy(&rr->numas);
>> +}
>> +
>> +/* Assign pmds to queues. If 'pinned' is true, assign pmds to pinned
>> + * queues and marks the pmds as isolated. Otherwise, assign non isolated
>> + * pmds to unpinned queues.
>> + *
>> + * The function doesn't touch the pmd threads, it just stores the assignment
>> + * in the 'pmd' member of each rxq. */
>> +static void
>> +rxq_scheduling(struct dp_netdev *dp, bool pinned) OVS_REQUIRES(dp->port_mutex)
>> +{
>> + struct dp_netdev_port *port;
>> + struct rr_numa_list rr;
>> +
>> + rr_numa_list_populate(dp, &rr);
>> +
>> + HMAP_FOR_EACH(port, node, &dp->ports) {
>> + struct rr_numa *numa;
>> + int numa_id;
>> +
>> + if (!netdev_is_pmd(port->netdev)) {
>> + continue;
>> + }
>> +
>> + numa_id = netdev_get_numa_id(port->netdev);
>> + numa = rr_numa_list_lookup(&rr, numa_id);
>> +
>> + for (int qid = 0; qid < port->n_rxq; qid++) {
>> + struct dp_netdev_rxq *q = &port->rxqs[qid];
>> +
>> + if (pinned && q->core_id != RXQ_CORE_UNPINNED) {
>> + struct dp_netdev_pmd_thread *pmd;
>> +
>> + pmd = dp_netdev_get_pmd(dp, q->core_id);
>> + if (!pmd) {
>> + VLOG_WARN("There is no PMD thread on core %d. Queue "
>> + "%d on port \'%s\' will not be polled.",
>> + q->core_id, qid, netdev_get_name(port->netdev));
>> + } else {
>> + q->pmd = pmd;
>> + pmd->isolated = true;
>> + dp_netdev_pmd_unref(pmd);
>> + }
>> + } else if (!pinned && q->core_id == RXQ_CORE_UNPINNED) {
>> + if (!numa) {
>> +                VLOG_WARN("There's no available (non isolated) pmd thread "
>> +                          "on numa node %d. Queue %d on port \'%s\' will "
>> +                          "not be polled.",
>> + numa_id, qid, netdev_get_name(port->netdev));
>> + } else {
>> + q->pmd = rr_numa_get_pmd(numa);
>> + }
>> + }
>> + }
>> + }
>> +
>> + rr_numa_list_destroy(&rr);
>> +}
>> +
>> static void
>> reconfigure_pmd_threads(struct dp_netdev *dp)
>> OVS_REQUIRES(dp->port_mutex)
>> {
>> - struct dp_netdev_port *port, *next;
>> - int n_cores;
>> + struct dp_netdev_pmd_thread *pmd;
>> + struct ovs_numa_dump *pmd_cores;
>> + bool changed = false;
>> +
>> + /* The pmd threads should be started only if there's a pmd port in the
>> + * datapath. If the user didn't provide any "pmd-cpu-mask", we start
>> + * NR_PMD_THREADS per numa node. */
>> + if (!has_pmd_port(dp)) {
>> + pmd_cores = ovs_numa_dump_n_cores_per_numa(0);
>> + } else if (dp->pmd_cmask && dp->pmd_cmask[0]) {
>> + pmd_cores = ovs_numa_dump_cores_with_cmask(dp->pmd_cmask);
>> + } else {
>> + pmd_cores = ovs_numa_dump_n_cores_per_numa(NR_PMD_THREADS);
>> + }
>> +
>> + /* Check for changed configuration */
>> + if (ovs_numa_dump_count(pmd_cores) != cmap_count(&dp->poll_threads) - 1) {
>> + changed = true;
>> + } else {
>> + CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> + if (pmd->core_id != NON_PMD_CORE_ID
>> + && !ovs_numa_dump_contains_core(pmd_cores,
>> + pmd->numa_id,
>> + pmd->core_id)) {
>> + changed = true;
>> + break;
>> + }
>> + }
>> + }
>> +
>> + /* Destroy the old and recreate the new pmd threads. We don't perform an
>> + * incremental update because we would have to adjust 'static_tx_qid'. */
>> + if (changed) {
>> + struct ovs_numa_dump *all_numas;
>> + struct ovs_numa_info_core *core;
>> + struct ovs_numa_info_core *numa;
>> +
>> + /* Do not destroy the non pmd thread. */
>> + dp_netdev_destroy_all_pmds(dp, false);
>> + FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
>> + struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
>> +
>> + dp_netdev_configure_pmd(pmd, dp, core->core_id, core->numa_id);
>> +
>> + pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
>> + }
>> +
>> + /* Log the number of pmd threads per numa node. */
>> + all_numas = ovs_numa_dump_n_cores_per_numa(1);
>> +
>> + FOR_EACH_CORE_ON_DUMP(numa, all_numas) {
>> + int n = 0;
>> +
>> + FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
>> + if (core->numa_id == numa->numa_id) {
>> + n++;
>> + }
>> + }
>> +
>> + if (n) {
>> + VLOG_INFO("Created %d pmd threads on numa node %d",
>> + n, numa->numa_id);
>> + }
>
>Hmm, 'FOR_EACH_NUMA_ON_DUMP()' implemented but still the same code here.
Oops, I forgot to squash the new code.
Please consider the following incremental applied:
---8<---
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 3509493..70a082e 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -3129,9 +3129,8 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
/* Destroy the old and recreate the new pmd threads. We don't perform an
* incremental update because we would have to adjust 'static_tx_qid'. */
if (changed) {
- struct ovs_numa_dump *all_numas;
struct ovs_numa_info_core *core;
- struct ovs_numa_info_core *numa;
+ struct ovs_numa_info_numa *numa;
/* Do not destroy the non pmd thread. */
dp_netdev_destroy_all_pmds(dp, false);
@@ -3144,23 +3143,10 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
}
/* Log the number of pmd threads per numa node. */
- all_numas = ovs_numa_dump_n_cores_per_numa(1);
-
- FOR_EACH_CORE_ON_DUMP(numa, all_numas) {
- int n = 0;
-
- FOR_EACH_CORE_ON_DUMP(core, pmd_cores) {
- if (core->numa_id == numa->numa_id) {
- n++;
- }
- }
-
- if (n) {
- VLOG_INFO("Created %d pmd threads on numa node %d",
- n, numa->numa_id);
- }
+ FOR_EACH_NUMA_ON_DUMP (numa, pmd_cores) {
+ VLOG_INFO("Created %"PRIuSIZE" pmd threads on numa node %d",
+ numa->n_cores, numa->numa_id);
}
- ovs_numa_dump_destroy(all_numas);
}
ovs_numa_dump_destroy(pmd_cores);
---8<---
>
>> + }
>> + ovs_numa_dump_destroy(all_numas);
>> + }
>> +
>> + ovs_numa_dump_destroy(pmd_cores);
>> +}
>> +
>> +static void
>> +reload_affected_pmds(struct dp_netdev *dp)
>> +{
>> + struct dp_netdev_pmd_thread *pmd;
>> +
>> + CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> + if (pmd->need_reload) {
>> + dp_netdev_reload_pmd__(pmd);
>> + pmd->need_reload = false;
>> + }
>> + }
>> +}
>> +
>> +static void
>> +pmd_remove_stale_ports(struct dp_netdev *dp,
>> + struct dp_netdev_pmd_thread *pmd)
>> + OVS_EXCLUDED(pmd->port_mutex)
>> + OVS_REQUIRES(dp->port_mutex)
>> +{
>> + struct rxq_poll *poll, *poll_next;
>> + struct tx_port *tx, *tx_next;
>> +
>> + ovs_mutex_lock(&pmd->port_mutex);
>> + HMAP_FOR_EACH_SAFE(poll, poll_next, node, &pmd->poll_list) {
>> + struct dp_netdev_port *port = poll->rxq->port;
>> +
>> + if (port->need_reconfigure
>> + || dp_netdev_lookup_port(dp, port->port_no) != port) {
>
>- || dp_netdev_lookup_port(dp, port->port_no) != port) {
>+ || !hmap_contains(&dp->ports, &port->node)) {
>
>
>> + dp_netdev_del_rxq_from_pmd(pmd, poll);
>> + }
>> + }
>> + HMAP_FOR_EACH_SAFE(tx, tx_next, node, &pmd->tx_ports) {
>> + struct dp_netdev_port *port = tx->port;
>> +
>> + if (port->need_reconfigure
>> + || dp_netdev_lookup_port(dp, port->port_no) != port) {
>
>- || dp_netdev_lookup_port(dp, port->port_no) != port) {
>+ || !hmap_contains(&dp->ports, &port->node)) {
It looks better, I squashed the two changes, thanks
>
>> + dp_netdev_del_port_tx_from_pmd(pmd, tx);
>> + }
>> + }
>> + ovs_mutex_unlock(&pmd->port_mutex);
>> +}
>> +
>> +/* Must be called each time a port is added/removed or the cmask changes.
>> + * This creates and destroys pmd threads, reconfigures ports, opens their
>> + * rxqs and assigns all rxqs/txqs to pmd threads. */
>> +static void
>> +reconfigure_datapath(struct dp_netdev *dp)
>> + OVS_REQUIRES(dp->port_mutex)
>> +{
>> + struct dp_netdev_pmd_thread *pmd;
>> + struct dp_netdev_port *port;
>> + int wanted_txqs;
>>
>> dp->last_reconfigure_seq = seq_read(dp->reconfigure_seq);
>>
>> - dp_netdev_destroy_all_pmds(dp);
>> + /* Step 1: Adjust the pmd threads based on the datapath ports, the cores
>> + * on the system and the user configuration. */
>> + reconfigure_pmd_threads(dp);
>>
>> - /* Reconfigures the cpu mask. */
>> - ovs_numa_set_cpu_mask(dp->pmd_cmask);
>> + wanted_txqs = cmap_count(&dp->poll_threads);
>>
>> - n_cores = ovs_numa_get_n_cores();
>> - if (n_cores == OVS_CORE_UNSPEC) {
>> - VLOG_ERR("Cannot get cpu core info");
>> - return;
>> + /* The number of pmd threads might have changed, or a port can be new:
>> + * adjust the txqs. */
>> + HMAP_FOR_EACH(port, node, &dp->ports) {
>> + netdev_set_tx_multiq(port->netdev, wanted_txqs);
>> }
>>
>> - HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
>> + /* Step 2: Remove from the pmd threads ports that have been removed or
>> + * need reconfiguration. */
>> +
>> + /* Check for all the ports that need reconfiguration. We cache this in
>> + * 'port->need_reconfigure', because netdev_is_reconf_required() can change at
>> + * any time. */
>> + HMAP_FOR_EACH(port, node, &dp->ports) {
>> + if (netdev_is_reconf_required(port->netdev)) {
>> + port->need_reconfigure = true;
>> + }
>> + }
>> +
>> + /* Remove from the pmd threads all the ports that have been deleted or
>> + * need reconfiguration. */
>> + CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> + pmd_remove_stale_ports(dp, pmd);
>> + }
>> +
>> + /* Reload affected pmd threads. We must wait for the pmd threads before
>> + * reconfiguring the ports, because a port cannot be reconfigured while
>> + * it's being used. */
>> + reload_affected_pmds(dp);
>> +
>> + /* Step 3: Reconfigure ports. */
>> +
>> + /* We only reconfigure the ports that we determined above, because they're
>> + * not being used by any pmd thread at the moment. If a port fails to
>> + * reconfigure we remove it from the datapath. */
>> + HMAP_FOR_EACH(port, node, &dp->ports) {
>> int err;
>>
>> + if (!port->need_reconfigure) {
>> + continue;
>> + }
>> +
>> err = port_reconfigure(port);
>> if (err) {
>> hmap_remove(&dp->ports, &port->node);
>> seq_change(dp->port_seq);
>> port_destroy(port);
>> } else {
>> - port->dynamic_txqs = netdev_n_txq(port->netdev) < n_cores + 1;
>> + port->dynamic_txqs = netdev_n_txq(port->netdev) < wanted_txqs;
>> }
>> }
>> - /* Restores the non-pmd. */
>> - dp_netdev_set_nonpmd(dp);
>> - /* Restores all pmd threads. */
>> - dp_netdev_reset_pmd_threads(dp);
>> +
>> + /* Step 4: Compute new rxq scheduling. We don't touch the pmd threads
>> + * for now, we just update the 'pmd' pointer in each rxq to point to the
>> + * wanted thread according to the scheduling policy. */
>> +
>> + /* Reset all the pmd threads to non isolated. */
>> + CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> + pmd->isolated = false;
>> + }
>> +
>> + /* Reset all the queues to unassigned */
>> + HMAP_FOR_EACH(port, node, &dp->ports) {
>> + for (int i = 0; i < port->n_rxq; i++) {
>> + port->rxqs[i].pmd = NULL;
>> + }
>> + }
>> +
>> + /* Add pinned queues and mark pmd threads isolated. */
>> + rxq_scheduling(dp, true);
>> +
>> + /* Add non-pinned queues. */
>> + rxq_scheduling(dp, false);
>> +
>> + /* Step 5: Remove queues not compliant with new scheduling. */
>> + CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> + struct rxq_poll *poll, *poll_next;
>> +
>> + ovs_mutex_lock(&pmd->port_mutex);
>> + HMAP_FOR_EACH_SAFE(poll, poll_next, node, &pmd->poll_list) {
>> + if (poll->rxq->pmd != pmd) {
>> + dp_netdev_del_rxq_from_pmd(pmd, poll);
>> + }
>> + }
>> + ovs_mutex_unlock(&pmd->port_mutex);
>> + }
>> +
>> + /* Reload affected pmd threads. We must wait for the pmd threads to remove
>> + * the old queues before readding them, otherwise a queue can be polled by
>> + * two threads at the same time. */
>> + reload_affected_pmds(dp);
>> +
>> + /* Step 6: Add queues from scheduling, if they're not there already. */
>> + HMAP_FOR_EACH(port, node, &dp->ports) {
>> + if (!netdev_is_pmd(port->netdev)) {
>> + continue;
>> + }
>> +
>> + for (int qid = 0; qid < port->n_rxq; qid++) {
>> + struct dp_netdev_rxq *q = &port->rxqs[qid];
>> +
>> + if (q->pmd) {
>> + ovs_mutex_lock(&q->pmd->port_mutex);
>> + dp_netdev_add_rxq_to_pmd(q->pmd, q);
>> + ovs_mutex_unlock(&q->pmd->port_mutex);
>> + }
>> + }
>> + }
>> +
>> + /* Add every port to the tx cache of every pmd thread, if it's not
>> + * there already and if this pmd has at least one rxq to poll. */
>> + CMAP_FOR_EACH(pmd, node, &dp->poll_threads) {
>> + ovs_mutex_lock(&pmd->port_mutex);
>> + if (hmap_count(&pmd->poll_list) || pmd->core_id == NON_PMD_CORE_ID) {
>> + HMAP_FOR_EACH(port, node, &dp->ports) {
>> + dp_netdev_add_port_tx_to_pmd(pmd, port);
>> + }
>> + }
>> + ovs_mutex_unlock(&pmd->port_mutex);
>> + }
>> +
>> + /* Reload affected pmd threads. */
>> + reload_affected_pmds(dp);
>> }
>>
>> /* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
>> @@ -3107,7 +3397,7 @@ dpif_netdev_run(struct dpif *dpif)
>> }
>>
>> if (dp_netdev_is_reconf_required(dp) || ports_require_restart(dp)) {
>> - reconfigure_pmd_threads(dp);
>> + reconfigure_datapath(dp);
>> }
>> ovs_mutex_unlock(&dp->port_mutex);
>>
>> @@ -3357,16 +3647,9 @@ dp_netdev_set_nonpmd(struct dp_netdev *dp)
>> OVS_REQUIRES(dp->port_mutex)
>> {
>> struct dp_netdev_pmd_thread *non_pmd;
>> - struct dp_netdev_port *port;
>>
>> non_pmd = xzalloc(sizeof *non_pmd);
>> dp_netdev_configure_pmd(non_pmd, dp, NON_PMD_CORE_ID, OVS_NUMA_UNSPEC);
>> -
>> - HMAP_FOR_EACH (port, node, &dp->ports) {
>> - dp_netdev_add_port_tx_to_pmd(non_pmd, port);
>> - }
>> -
>> - dp_netdev_reload_pmd__(non_pmd);
>> }
>>
>> /* Caller must have valid pointer to 'pmd'. */
>> @@ -3412,10 +3695,9 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>> pmd->dp = dp;
>> pmd->core_id = core_id;
>> pmd->numa_id = numa_id;
>> + pmd->need_reload = false;
>>
>> - *CONST_CAST(int *, &pmd->static_tx_qid) = (core_id == NON_PMD_CORE_ID)
>> - ? ovs_numa_get_n_cores()
>> - : get_n_pmd_threads(dp);
>> + *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
>>
>> ovs_refcount_init(&pmd->ref_cnt);
>> latch_init(&pmd->exit_latch);
>> @@ -3483,7 +3765,6 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
>> } else {
>> latch_set(&pmd->exit_latch);
>> dp_netdev_reload_pmd__(pmd);
>> - ovs_numa_unpin_core(pmd->core_id);
>> xpthread_join(pmd->thread, NULL);
>> }
>>
>> @@ -3498,20 +3779,20 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
>> dp_netdev_pmd_unref(pmd);
>> }
>>
>> -/* Destroys all pmd threads, but not the non pmd thread. */
>> +/* Destroys all pmd threads. If 'non_pmd' is true it also destroys the non pmd
>> + * thread. */
>> static void
>> -dp_netdev_stop_pmds(struct dp_netdev *dp)
>> +dp_netdev_destroy_all_pmds(struct dp_netdev *dp, bool non_pmd)
>> {
>> struct dp_netdev_pmd_thread *pmd;
>> struct dp_netdev_pmd_thread **pmd_list;
>> size_t k = 0, n_pmds;
>>
>> - n_pmds = get_n_pmd_threads(dp);
>> + n_pmds = cmap_count(&dp->poll_threads);
>> pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
>>
>> CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>> - /* We don't need to destroy the non pmd thread */
>> - if (pmd->core_id == NON_PMD_CORE_ID) {
>> + if (!non_pmd && pmd->core_id == NON_PMD_CORE_ID) {
>> continue;
>> }
>> /* We cannot call dp_netdev_del_pmd(), since it alters
>> @@ -3527,32 +3808,6 @@ dp_netdev_stop_pmds(struct dp_netdev *dp)
>> free(pmd_list);
>> }
>>
>> -/* Destroys all pmd threads, including the non pmd thread. */
>> -static void
>> -dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
>> -{
>> - struct dp_netdev_pmd_thread *pmd;
>> - struct dp_netdev_pmd_thread **pmd_list;
>> - size_t k = 0, n_pmds;
>> -
>> - n_pmds = cmap_count(&dp->poll_threads);
>> - pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
>> -
>> - CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>> - /* We cannot call dp_netdev_del_pmd(), since it alters
>> - * 'dp->poll_threads' (while we're iterating it) and it
>> - * might quiesce. */
>> - ovs_assert(k < n_pmds);
>> - pmd_list[k++] = pmd;
>> - }
>> -
>> - for (size_t i = 0; i < k; i++) {
>> - dp_netdev_del_pmd(dp, pmd_list[i]);
>> - }
>> -
>> - free(pmd_list);
>> -}
>> -
>> /* Deletes all rx queues from pmd->poll_list and all the ports from
>> * pmd->tx_ports. */
>> static void
>> @@ -3571,126 +3826,40 @@ dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
>> ovs_mutex_unlock(&pmd->port_mutex);
>> }
>>
>> -static struct tx_port *
>> -tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
>> -{
>> - struct tx_port *tx;
>> -
>> - HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
>> - if (tx->port->port_no == port_no) {
>> - return tx;
>> - }
>> - }
>> -
>> - return NULL;
>> -}
>> -
>> -/* Deletes all rx queues of 'port' from 'poll_list', and the 'port' from
>> - * 'tx_ports' of 'pmd' thread. Returns true if 'port' was found in 'pmd'
>> - * (therefore a restart is required). */
>> -static bool
>> -dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port,
>> - struct dp_netdev_pmd_thread *pmd)
>> -{
>> - struct rxq_poll *poll, *next;
>> - struct tx_port *tx;
>> - bool found = false;
>> -
>> - ovs_mutex_lock(&pmd->port_mutex);
>> - HMAP_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
>> - if (poll->rxq->port == port) {
>> - found = true;
>> - hmap_remove(&pmd->poll_list, &poll->node);
>> - free(poll);
>> - }
>> - }
>> -
>> - tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
>> - if (tx) {
>> - hmap_remove(&pmd->tx_ports, &tx->node);
>> - free(tx);
>> - found = true;
>> - }
>> - ovs_mutex_unlock(&pmd->port_mutex);
>> -
>> - return found;
>> -}
>> -
>> -/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
>> - * threads. The pmd threads that need to be restarted are inserted in
>> - * 'to_reload'. */
>> +/* Adds rx queue to poll_list of PMD thread, if it's not there already. */
>> static void
>> -dp_netdev_del_port_from_all_pmds__(struct dp_netdev *dp,
>> - struct dp_netdev_port *port,
>> - struct hmapx *to_reload)
>> +dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>> + struct dp_netdev_rxq *rxq)
>> + OVS_REQUIRES(pmd->port_mutex)
>> {
>> - struct dp_netdev_pmd_thread *pmd;
>> -
>> - CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>> - bool found;
>> -
>> - found = dp_netdev_del_port_from_pmd__(port, pmd);
>> + int qid = netdev_rxq_get_queue_id(rxq->rx);
>> + uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
>> + struct rxq_poll *poll;
>>
>> - if (found) {
>> - hmapx_add(to_reload, pmd);
>> + HMAP_FOR_EACH_WITH_HASH (poll, node, hash, &pmd->poll_list) {
>> + if (poll->rxq == rxq) {
>> + /* 'rxq' is already polled by this thread. Do nothing. */
>> + return;
>> }
>> }
>> -}
>> -
>> -/* Deletes 'port' from the 'poll_list' and from the 'tx_ports' of all the pmd
>> - * threads. Reloads the threads if needed. */
>> -static void
>> -dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
>> - struct dp_netdev_port *port)
>> -{
>> - struct dp_netdev_pmd_thread *pmd;
>> - struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
>> - struct hmapx_node *node;
>>
>> - dp_netdev_del_port_from_all_pmds__(dp, port, &to_reload);
>> -
>> - HMAPX_FOR_EACH (node, &to_reload) {
>> - pmd = (struct dp_netdev_pmd_thread *) node->data;
>> - dp_netdev_reload_pmd__(pmd);
>> - }
>> -
>> - hmapx_destroy(&to_reload);
>> -}
>> -
>> -
>> -/* Returns non-isolated PMD thread from this numa node with fewer
>> - * rx queues to poll. Returns NULL if there is no non-isolated PMD threads
>> - * on this numa node. Can be called safely only by main thread. */
>> -static struct dp_netdev_pmd_thread *
>> -dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
>> -{
>> - int min_cnt = -1;
>> - struct dp_netdev_pmd_thread *pmd, *res = NULL;
>> -
>> - CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>> - if (!pmd->isolated && pmd->numa_id == numa_id
>> - && (min_cnt > hmap_count(&pmd->poll_list) || res == NULL)) {
>> - min_cnt = hmap_count(&pmd->poll_list);
>> - res = pmd;
>> - }
>> - }
>> + poll = xmalloc(sizeof *poll);
>> + poll->rxq = rxq;
>> + hmap_insert(&pmd->poll_list, &poll->node, hash);
>>
>> - return res;
>> + pmd->need_reload = true;
>> }
>>
>> -/* Adds rx queue to poll_list of PMD thread. */
>> +/* Delete 'poll' from poll_list of PMD thread. */
>> static void
>> -dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>> - struct dp_netdev_rxq *rxq)
>> +dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
>> + struct rxq_poll *poll)
>> OVS_REQUIRES(pmd->port_mutex)
>> {
>> - int qid = netdev_rxq_get_queue_id(rxq->rx);
>> - uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
>> - struct rxq_poll *poll;
>> + hmap_remove(&pmd->poll_list, &poll->node);
>> + free(poll);
>>
>> - poll = xmalloc(sizeof *poll);
>> - poll->rxq = rxq;
>> - hmap_insert(&pmd->poll_list, &poll->node, hash);
>> + pmd->need_reload = true;
>> }
>>
>> /* Add 'port' to the tx port cache of 'pmd', which must be reloaded for the
>> @@ -3698,190 +3867,37 @@ dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>> static void
>> dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>> struct dp_netdev_port *port)
>> + OVS_REQUIRES(pmd->port_mutex)
>> {
>> struct tx_port *tx;
>>
>> + tx = tx_port_lookup(&pmd->tx_ports, port->port_no);
>> + if (tx) {
>> + /* 'port' is already on this thread tx cache. Do nothing. */
>> + return;
>> + }
>> +
>> tx = xzalloc(sizeof *tx);
>>
>> tx->port = port;
>> tx->qid = -1;
>>
>> - ovs_mutex_lock(&pmd->port_mutex);
>> hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
>> - ovs_mutex_unlock(&pmd->port_mutex);
>> -}
>> -
>> -/* Distribute all {pinned|non-pinned} rx queues of 'port' between PMD
>> - * threads in 'dp'. The pmd threads that need to be restarted are inserted
>> - * in 'to_reload'. PMD threads with pinned queues marked as isolated. */
>> -static void
>> -dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
>> - struct dp_netdev_port *port,
>> - struct hmapx *to_reload, bool pinned)
>> -{
>> - int numa_id = netdev_get_numa_id(port->netdev);
>> - struct dp_netdev_pmd_thread *pmd;
>> - int i;
>> -
>> - if (!netdev_is_pmd(port->netdev)) {
>> - return;
>> - }
>> -
>> - for (i = 0; i < port->n_rxq; i++) {
>> - if (pinned) {
>> - if (port->rxqs[i].core_id == RXQ_CORE_UNPINNED) {
>> - continue;
>> - }
>> - pmd = dp_netdev_get_pmd(dp, port->rxqs[i].core_id);
>> - if (!pmd) {
>> - VLOG_WARN("There is no PMD thread on core %d. "
>> - "Queue %d on port \'%s\' will not be polled.",
>> - port->rxqs[i].core_id, i,
>> - netdev_get_name(port->netdev));
>> - continue;
>> - }
>> - pmd->isolated = true;
>> - dp_netdev_pmd_unref(pmd);
>> - } else {
>> - if (port->rxqs[i].core_id != RXQ_CORE_UNPINNED) {
>> - continue;
>> - }
>> - pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
>> - if (!pmd) {
>> - VLOG_WARN("There's no available pmd thread on numa node %d",
>> - numa_id);
>> - break;
>> - }
>> - }
>> -
>> - ovs_mutex_lock(&pmd->port_mutex);
>> - dp_netdev_add_rxq_to_pmd(pmd, &port->rxqs[i]);
>> - ovs_mutex_unlock(&pmd->port_mutex);
>> -
>> - hmapx_add(to_reload, pmd);
>> - }
>> -}
>> -
>> -/* Distributes all non-pinned rx queues of 'port' between all PMD threads
>> - * in 'dp' and inserts 'port' in the PMD threads 'tx_ports'. The pmd threads
>> - * that need to be restarted are inserted in 'to_reload'. */
>> -static void
>> -dp_netdev_add_port_to_pmds__(struct dp_netdev *dp, struct dp_netdev_port *port,
>> - struct hmapx *to_reload)
>> -{
>> - struct dp_netdev_pmd_thread *pmd;
>> -
>> - dp_netdev_add_port_rx_to_pmds(dp, port, to_reload, false);
>> -
>> - CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>> - dp_netdev_add_port_tx_to_pmd(pmd, port);
>> - hmapx_add(to_reload, pmd);
>> - }
>> -}
>> -
>> -/* Distributes all non-pinned rx queues of 'port' between all PMD threads
>> - * in 'dp', inserts 'port' in the PMD threads 'tx_ports' and reloads them,
>> - * if needed. */
>> -static void
>> -dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
>> -{
>> - struct dp_netdev_pmd_thread *pmd;
>> - struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
>> - struct hmapx_node *node;
>> -
>> - dp_netdev_add_port_to_pmds__(dp, port, &to_reload);
>> -
>> - HMAPX_FOR_EACH (node, &to_reload) {
>> - pmd = (struct dp_netdev_pmd_thread *) node->data;
>> - dp_netdev_reload_pmd__(pmd);
>> - }
>> -
>> - hmapx_destroy(&to_reload);
>> -}
>> -
>> -static void
>> -dp_netdev_start_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>> -{
>> - int can_have, n_unpinned, i;
>> -
>> - n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
>> - if (!n_unpinned) {
>> - VLOG_WARN("Cannot create pmd threads due to out of unpinned "
>> - "cores on numa node %d", numa_id);
>> - return;
>> - }
>> -
>> - /* If cpu mask is specified, uses all unpinned cores, otherwise
>> - * tries creating NR_PMD_THREADS pmd threads. */
>> - can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
>> - for (i = 0; i < can_have; i++) {
>> - unsigned core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
>> - struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
>> - struct dp_netdev_port *port;
>> -
>> - dp_netdev_configure_pmd(pmd, dp, core_id, numa_id);
>> -
>> - HMAP_FOR_EACH (port, node, &dp->ports) {
>> - dp_netdev_add_port_tx_to_pmd(pmd, port);
>> - }
>> -
>> - pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
>> - }
>> - VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
>> + pmd->need_reload = true;
>> }
>>
>> -/* Starts pmd threads, if not already started. The function takes care of
>> - * filling the threads tx port cache. */
>> +/* Del 'tx' from the tx port cache of 'pmd', which must be reloaded for the
>> + * changes to take effect. */
>> static void
>> -dp_netdev_start_pmds(struct dp_netdev *dp)
>> - OVS_REQUIRES(dp->port_mutex)
>> +dp_netdev_del_port_tx_from_pmd(struct dp_netdev_pmd_thread *pmd,
>> + struct tx_port *tx)
>> + OVS_REQUIRES(pmd->port_mutex)
>> {
>> - int n_pmds;
>> -
>> - n_pmds = get_n_pmd_threads(dp);
>> -
>> - /* If there are already pmd threads created for the datapath, do nothing.
>> - * Else, creates the pmd threads. */
>> - if (!n_pmds) {
>> - int n_numas = ovs_numa_get_n_numas();
>> -
>> - for (int numa_id = 0; numa_id < n_numas; numa_id++) {
>> - dp_netdev_start_pmds_on_numa(dp, numa_id);
>> - }
>> - }
>> + hmap_remove(&pmd->tx_ports, &tx->node);
>> + free(tx);
>> + pmd->need_reload = true;
>> }
>> -
>> ?
>> -/* Called after pmd threads config change. Restarts pmd threads with
>> - * new configuration. */
>> -static void
>> -dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
>> - OVS_REQUIRES(dp->port_mutex)
>> -{
>> - struct hmapx to_reload = HMAPX_INITIALIZER(&to_reload);
>> - struct dp_netdev_pmd_thread *pmd;
>> - struct dp_netdev_port *port;
>> - struct hmapx_node *node;
>> -
>> - dp_netdev_start_pmds(dp);
>> - HMAP_FOR_EACH (port, node, &dp->ports) {
>> - /* Distribute only pinned rx queues first to mark threads as isolated */
>> - dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, true);
>> - }
>> -
>> - /* Distribute remaining non-pinned rx queues to non-isolated PMD threads. */
>> - HMAP_FOR_EACH (port, node, &dp->ports) {
>> - dp_netdev_add_port_rx_to_pmds(dp, port, &to_reload, false);
>> - }
>> -
>> - HMAPX_FOR_EACH (node, &to_reload) {
>> - pmd = (struct dp_netdev_pmd_thread *) node->data;
>> - dp_netdev_reload_pmd__(pmd);
>> - }
>> -
>> - hmapx_destroy(&to_reload);
>> -}
>> -
>> static char *
>> dpif_netdev_get_datapath_version(void)
>> {
>> @@ -4871,12 +4887,12 @@ dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED,
>>
>> /* Remove port. */
>> hmap_remove(&dp->ports, &port->node);
>> - dp_netdev_del_port_from_all_pmds(dp, port);
>> + reconfigure_datapath(dp);
>>
>> /* Reinsert with new port number. */
>> port->port_no = port_no;
>> hmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>> - dp_netdev_add_port_to_pmds(dp, port);
>> + reconfigure_datapath(dp);
>>
>> seq_change(dp->port_seq);
>> unixctl_command_reply(conn, NULL);
>> diff --git a/tests/pmd.at b/tests/pmd.at
>> index 147dfda..d3dcf24 100644
>> --- a/tests/pmd.at
>> +++ b/tests/pmd.at
>> @@ -607,8 +607,7 @@ p2 0 0 0
>> p2 1 0 0
>> ])
>>
>> -dnl During reconfiguration some packets will be dropped. This is expected
>> -OVS_VSWITCHD_STOP(["/dpif(monitor[[0-9]]\+)|WARN|dummy at ovs-dummy: execute [[0-9]]\+ failed/d"])
>> +OVS_VSWITCHD_STOP
>> AT_CLEANUP
>>
>> AT_SETUP([PMD - dpctl])
>>
More information about the dev
mailing list