[ovs-dev] [PATCH v2 1/2] dpif-netdev: Rework of rx queue management.
Daniele Di Proietto
diproiettod at vmware.com
Sat Jan 23 01:21:00 UTC 2016
Hi Ilya,
Thank you very much for the patch.
I definitely like that the queue assignment is performed by the
main thread: not only is it less bug-prone, but the logic will also
be easier to customize.
I absolutely welcome the changes to do_add_port() and do_del_port()
that keep the queues on their currently assigned threads.
I think we can avoid pausing and resuming the threads each time and,
instead, leave the current reloading logic unaltered. Here's a way:
* pmd_thread_main() would be identical to master. pmd_load_queues(),
instead, would return a poll_list by copying the struct rxq_poll
entries from 'pmd->poll_list'.
* do_add_port() and do_del_port() would still write to the pmd-specific
lists while the threads are running. After updating a list for a pmd
thread, they would call dp_netdev_reload_pmd__().
This behaviour should still fix the bugs, but it requires less
synchronization. What do you think? I don't think this should
create any problems for the following patch, right?
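To be concrete about the reloading logic I'd keep unaltered, here is a
sketch of the handshake, abridged from master's dpif-netdev.c (take it
as illustration rather than the exact code):

    /* Main thread: bumps 'change_seq' and blocks on 'cond' until the
     * pmd thread signals that it has finished reloading. */
    static void
    dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
    {
        int old_seq;

        if (pmd->core_id == NON_PMD_CORE_ID) {
            return;
        }

        ovs_mutex_lock(&pmd->cond_mutex);
        atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
        ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);
        ovs_mutex_unlock(&pmd->cond_mutex);
    }

    /* PMD thread: every 1024 iterations of the polling loop in
     * pmd_thread_main() it re-reads 'change_seq'.  A new value sends it
     * back to the 'reload:' label, where it would copy its poll_list
     * again and call dp_netdev_pmd_reload_done(). */
    if (lc++ > 1024) {
        unsigned int seq;

        lc = 0;
        atomic_read_relaxed(&pmd->change_seq, &seq);
        if (seq != port_seq) {
            port_seq = seq;
            break;
        }
    }

Since pmd_load_queues() would hand the polling loop a private copy of
the list, do_add_port() and do_del_port() only need to hold
'pmd->poll_mutex' for the short list update before triggering the
reload.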
I've prepared an incremental on top of this patch to illustrate the
idea, but other ideas/implementations/fixes are welcome.
Thanks,
Daniele
----------------------------------------
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index fd6ac48..3f5cf42 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -480,7 +480,7 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread *,
struct dp_packet **, int cnt);
static void dp_netdev_disable_upcall(struct dp_netdev *);
-void dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd);
+void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev *dp, int index,
unsigned core_id, int numa_id);
@@ -1026,9 +1026,8 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
return 0;
}
-/* Causes pmd thread to break from infinite polling cycle. */
static void
-dp_netdev_break_pmd__(struct dp_netdev_pmd_thread *pmd)
+dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
{
int old_seq;
@@ -1042,39 +1041,6 @@ dp_netdev_break_pmd__(struct dp_netdev_pmd_thread *pmd)
ovs_mutex_unlock(&pmd->cond_mutex);
}
-/* Causes pmd thread to break from infinite polling cycle and
- * lock on poll_mutex. Not applicable for non-PMD threads. */
-static void
-dp_netdev_pause_pmd__(struct dp_netdev_pmd_thread *pmd)
- OVS_ACQUIRES(pmd->poll_mutex)
-{
- int old_seq;
-
- ovs_assert(pmd->core_id != NON_PMD_CORE_ID);
-
- /* Wait until pmd thread starts polling cycle to
- * avoid deadlock. */
- while (!ovs_mutex_trylock(&pmd->poll_mutex)) {
- ovs_mutex_unlock(&pmd->poll_mutex);
- }
-
- ovs_mutex_lock(&pmd->cond_mutex);
- atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
- ovs_mutex_lock(&pmd->poll_mutex);
- ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);
- ovs_mutex_unlock(&pmd->cond_mutex);
-}
-
-/* Unlocks pmd thread by unlocking poll_mutex.
- * Not applicable for non-PMD threads. */
-static void
-dp_netdev_resume_pmd__(struct dp_netdev_pmd_thread *pmd)
- OVS_RELEASES(pmd->poll_mutex)
-{
- ovs_assert(pmd->core_id != NON_PMD_CORE_ID);
- ovs_mutex_unlock(&pmd->poll_mutex);
-}
-
static uint32_t
hash_port_no(odp_port_t port_no)
{
@@ -1181,9 +1147,10 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
break;
}
- dp_netdev_pause_pmd__(pmd);
+ ovs_mutex_lock(&pmd->poll_mutex);
dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
- dp_netdev_resume_pmd__(pmd);
+ ovs_mutex_unlock(&pmd->poll_mutex);
+ dp_netdev_reload_pmd__(pmd);
}
}
seq_change(dp->port_seq);
@@ -1366,29 +1333,27 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
dp_netdev_del_pmds_on_numa(dp, numa_id);
}
else {
- bool found;
struct dp_netdev_pmd_thread *pmd;
struct rxq_poll *poll, *next;
CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
if (pmd->numa_id == numa_id) {
- found = false;
- dp_netdev_pause_pmd__(pmd);
+ bool found = false;
+
+ ovs_mutex_lock(&pmd->poll_mutex);
                LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
if (poll->port == port) {
+ found = true;
port_unref(poll->port);
list_remove(&poll->node);
pmd->poll_cnt--;
free(poll);
- found = true;
}
}
+ ovs_mutex_unlock(&pmd->poll_mutex);
if (found) {
- /* Clean up emc cache if poll_list modified. */
- emc_cache_uninit(&pmd->flow_cache);
- emc_cache_init(&pmd->flow_cache);
+ dp_netdev_reload_pmd__(pmd);
}
- dp_netdev_resume_pmd__(pmd);
}
}
}
@@ -2656,28 +2621,56 @@ dpif_netdev_wait(struct dpif *dpif)
seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
}
+static int
+pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
+ struct rxq_poll **ppoll_list)
+{
+ struct rxq_poll *poll_list = *ppoll_list;
+ struct rxq_poll *poll;
+ int i = 0;
+
+ poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list);
+
+ LIST_FOR_EACH (poll, node, &pmd->poll_list) {
+ poll_list[i++] = *poll;
+ }
+
+ *ppoll_list = poll_list;
+ return pmd->poll_cnt;
+}
+
static void *
pmd_thread_main(void *f_)
{
struct dp_netdev_pmd_thread *pmd = f_;
- struct rxq_poll *poll;
unsigned int lc = 0;
+ struct rxq_poll *poll_list;
unsigned int port_seq = PMD_INITIAL_SEQ;
+ int poll_cnt;
+ int i;
+
+ poll_cnt = 0;
+ poll_list = NULL;
/* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
pmd_thread_setaffinity_cpu(pmd->core_id);
reload:
- ovs_mutex_lock(&pmd->poll_mutex);
+ emc_cache_init(&pmd->flow_cache);
+ poll_cnt = pmd_load_queues(pmd, &poll_list);
+
/* List port/core affinity */
- LIST_FOR_EACH (poll, node, &pmd->poll_list) {
- VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
- netdev_get_name(poll->port->netdev));
+ for (i = 0; i < poll_cnt; i++) {
+        VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id, netdev_get_name(poll_list[i].port->netdev));
}
+ /* Signal here to make sure the pmd finishes
+ * reloading the updated configuration. */
+ dp_netdev_pmd_reload_done(pmd);
+
for (;;) {
- LIST_FOR_EACH (poll, node, &pmd->poll_list) {
- dp_netdev_process_rxq_port(pmd, poll->port, poll->rx);
+ for (i = 0; i < poll_cnt; i++) {
+            dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
}
if (lc++ > 1024) {
@@ -2696,15 +2689,16 @@ reload:
}
}
}
- ovs_mutex_unlock(&pmd->poll_mutex);
- /* Synchronize with breaker thread. */
- dp_netdev_pmd_break_done(pmd);
+ emc_cache_uninit(&pmd->flow_cache);
- if (!latch_is_set(&pmd->exit_latch)) {
+ if (!latch_is_set(&pmd->exit_latch)){
goto reload;
}
+ dp_netdev_pmd_reload_done(pmd);
+
+ free(poll_list);
return NULL;
}
@@ -2739,7 +2733,7 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
}
void
-dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd)
+dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
{
ovs_mutex_lock(&pmd->cond_mutex);
xpthread_cond_signal(&pmd->cond);
@@ -2843,8 +2837,11 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
dpcls_init(&pmd->cls);
cmap_init(&pmd->flow_table);
list_init(&pmd->poll_list);
- emc_cache_init(&pmd->flow_cache);
-
+ /* init the 'flow_cache' since there is no
+ * actual thread created for NON_PMD_CORE_ID. */
+ if (core_id == NON_PMD_CORE_ID) {
+ emc_cache_init(&pmd->flow_cache);
+ }
    cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
hash_int(core_id, 0));
}
@@ -2870,11 +2867,13 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
{
struct rxq_poll *poll;
- emc_cache_uninit(&pmd->flow_cache);
-
- if (pmd->core_id != NON_PMD_CORE_ID) {
+ /* Uninit the 'flow_cache' since there is
+ * no actual thread uninit it for NON_PMD_CORE_ID. */
+ if (pmd->core_id == NON_PMD_CORE_ID) {
+ emc_cache_uninit(&pmd->flow_cache);
+ } else {
latch_set(&pmd->exit_latch);
- dp_netdev_break_pmd__(pmd);
+ dp_netdev_reload_pmd__(pmd);
ovs_numa_unpin_core(pmd->core_id);
xpthread_join(pmd->thread, NULL);
}
On 14/01/2016 06:47, "Ilya Maximets" <i.maximets at samsung.com> wrote:
>The current rx queue management model is buggy and will not work
>properly without additional barriers and other synchronization
>between PMD threads and the main thread.
>
>Known BUGS of current model:
> * While reloading, two PMD threads, one already reloaded and
>   one not yet reloaded, can poll the same queue of the same port.
>   This behavior may lead to dpdk driver failures, because the
>   drivers are not thread-safe.
> * Same bug as fixed in commit e4e74c3a2b
>   ("dpif-netdev: Purge all ukeys when reconfigure pmd.") but
>   reproduced when only reconfiguring pmd threads without
>   restarting them, because an addition may change the sequence of
>   other ports, which is important at reconfiguration time.
>
>This patch introduces a new model, where the distribution of queues
>is done by the main thread with minimal synchronization and without
>data races between pmd threads. This model should also be faster,
>because only the affected threads are interrupted for reconfiguration
>and the total computational complexity of reconfiguration is lower.
>
>Signed-off-by: Ilya Maximets <i.maximets at samsung.com>
>---
> lib/dpif-netdev.c | 301 ++++++++++++++++++++++++++++++++----------------------
> 1 file changed, 180 insertions(+), 121 deletions(-)
>
>diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>index cd72e62..fd6ac48 100644
>--- a/lib/dpif-netdev.c
>+++ b/lib/dpif-netdev.c
>@@ -372,6 +372,13 @@ struct dp_netdev_pmd_cycles {
> atomic_ullong n[PMD_N_CYCLES];
> };
>
>+/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
>+struct rxq_poll {
>+ struct dp_netdev_port *port;
>+ struct netdev_rxq *rx;
>+ struct ovs_list node;
>+};
>+
> /* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate
> * the performance overhead of interrupt processing. Therefore netdev can
> * not implement rx-wait for these devices. dpif-netdev needs to poll
>@@ -430,6 +437,10 @@ struct dp_netdev_pmd_thread {
> int tx_qid; /* Queue id used by this pmd thread to
> * send packets on all netdevs */
>
>+ struct ovs_list poll_list; /* List of rx queues to poll. */
>+ int poll_cnt; /* Number of elements in poll_list. */
>+ struct ovs_mutex poll_mutex; /* Mutex for poll_list. */
>+
> /* Only a pmd thread can write on its own 'cycles' and 'stats'.
> * The main thread keeps 'stats_zero' and 'cycles_zero' as base
> * values and subtracts them from 'stats' and 'cycles' before
>@@ -469,7 +480,7 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread *,
> struct dp_packet **, int cnt);
>
> static void dp_netdev_disable_upcall(struct dp_netdev *);
>-void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
>+void dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd);
> static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
> struct dp_netdev *dp, int index,
> unsigned core_id, int numa_id);
>@@ -482,6 +493,11 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
> static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
> static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
> static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
>+static void
>+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>+                         struct dp_netdev_port *port, struct netdev_rxq *rx);
>+static struct dp_netdev_pmd_thread *
>+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
> static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
> static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
> static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
>@@ -1010,8 +1026,9 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
> return 0;
> }
>
>+/* Causes pmd thread to break from infinite polling cycle. */
> static void
>-dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
>+dp_netdev_break_pmd__(struct dp_netdev_pmd_thread *pmd)
> {
> int old_seq;
>
>@@ -1025,16 +1042,37 @@ dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
> ovs_mutex_unlock(&pmd->cond_mutex);
> }
>
>-/* Causes all pmd threads to reload its tx/rx devices.
>- * Must be called after adding/removing ports. */
>+/* Causes pmd thread to break from infinite polling cycle and
>+ * lock on poll_mutex. Not applicable for non-PMD threads. */
> static void
>-dp_netdev_reload_pmds(struct dp_netdev *dp)
>+dp_netdev_pause_pmd__(struct dp_netdev_pmd_thread *pmd)
>+ OVS_ACQUIRES(pmd->poll_mutex)
> {
>- struct dp_netdev_pmd_thread *pmd;
>+ int old_seq;
>
>- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>- dp_netdev_reload_pmd__(pmd);
>+ ovs_assert(pmd->core_id != NON_PMD_CORE_ID);
>+
>+ /* Wait until pmd thread starts polling cycle to
>+ * avoid deadlock. */
>+ while (!ovs_mutex_trylock(&pmd->poll_mutex)) {
>+ ovs_mutex_unlock(&pmd->poll_mutex);
> }
>+
>+ ovs_mutex_lock(&pmd->cond_mutex);
>+ atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
>+ ovs_mutex_lock(&pmd->poll_mutex);
>+ ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);
>+ ovs_mutex_unlock(&pmd->cond_mutex);
>+}
>+
>+/* Unlocks pmd thread by unlocking poll_mutex.
>+ * Not applicable for non-PMD threads. */
>+static void
>+dp_netdev_resume_pmd__(struct dp_netdev_pmd_thread *pmd)
>+ OVS_RELEASES(pmd->poll_mutex)
>+{
>+ ovs_assert(pmd->core_id != NON_PMD_CORE_ID);
>+ ovs_mutex_unlock(&pmd->poll_mutex);
> }
>
> static uint32_t
>@@ -1128,8 +1166,25 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
> cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>
> if (netdev_is_pmd(netdev)) {
>- dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
>- dp_netdev_reload_pmds(dp);
>+ int numa_id = netdev_get_numa_id(netdev);
>+ struct dp_netdev_pmd_thread *pmd;
>+
>+ /* Cannot create pmd threads for invalid numa node. */
>+ ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
>+
>+ for (i = 0; i < netdev_n_rxq(netdev); i++) {
>+ pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
>+ if (!pmd) {
>+                /* There are no pmd threads on this numa node. */
>+ dp_netdev_set_pmds_on_numa(dp, numa_id);
>+ /* Assigning of rx queues done. */
>+ break;
>+ }
>+
>+ dp_netdev_pause_pmd__(pmd);
>+ dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
>+ dp_netdev_resume_pmd__(pmd);
>+ }
> }
> seq_change(dp->port_seq);
>
>@@ -1226,16 +1281,6 @@ port_ref(struct dp_netdev_port *port)
> }
> }
>
>-static bool
>-port_try_ref(struct dp_netdev_port *port)
>-{
>- if (port) {
>- return ovs_refcount_try_ref_rcu(&port->ref_cnt);
>- }
>-
>- return false;
>-}
>-
> static void
> port_unref(struct dp_netdev_port *port)
> {
>@@ -1313,12 +1358,40 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
> if (netdev_is_pmd(port->netdev)) {
> int numa_id = netdev_get_numa_id(port->netdev);
>
>+ /* PMD threads can not be on invalid numa node. */
>+ ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
> /* If there is no netdev on the numa node, deletes the pmd threads
>- * for that numa. Else, just reloads the queues. */
>+ * for that numa. Else, deletes the queues from polling lists. */
> if (!has_pmd_port_for_numa(dp, numa_id)) {
> dp_netdev_del_pmds_on_numa(dp, numa_id);
> }
>- dp_netdev_reload_pmds(dp);
>+ else {
>+ bool found;
>+ struct dp_netdev_pmd_thread *pmd;
>+ struct rxq_poll *poll, *next;
>+
>+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>+ if (pmd->numa_id == numa_id) {
>+ found = false;
>+ dp_netdev_pause_pmd__(pmd);
>+                LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
>+ if (poll->port == port) {
>+ port_unref(poll->port);
>+ list_remove(&poll->node);
>+ pmd->poll_cnt--;
>+ free(poll);
>+ found = true;
>+ }
>+ }
>+ if (found) {
>+ /* Clean up emc cache if poll_list modified. */
>+ emc_cache_uninit(&pmd->flow_cache);
>+ emc_cache_init(&pmd->flow_cache);
>+ }
>+ dp_netdev_resume_pmd__(pmd);
>+ }
>+ }
>+ }
> }
>
> port_unref(port);
>@@ -2583,92 +2656,28 @@ dpif_netdev_wait(struct dpif *dpif)
> seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
> }
>
>-struct rxq_poll {
>- struct dp_netdev_port *port;
>- struct netdev_rxq *rx;
>-};
>-
>-static int
>-pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>- struct rxq_poll **ppoll_list, int poll_cnt)
>-{
>- struct rxq_poll *poll_list = *ppoll_list;
>- struct dp_netdev_port *port;
>- int n_pmds_on_numa, index, i;
>-
>- /* Simple scheduler for netdev rx polling. */
>- for (i = 0; i < poll_cnt; i++) {
>- port_unref(poll_list[i].port);
>- }
>-
>- poll_cnt = 0;
>- n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
>- index = 0;
>-
>- CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
>- /* Calls port_try_ref() to prevent the main thread
>- * from deleting the port. */
>- if (port_try_ref(port)) {
>- if (netdev_is_pmd(port->netdev)
>- && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
>- int i;
>-
>- for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
>- if ((index % n_pmds_on_numa) == pmd->index) {
>- poll_list = xrealloc(poll_list,
>-                                         sizeof *poll_list * (poll_cnt + 1));
>-
>- port_ref(port);
>- poll_list[poll_cnt].port = port;
>- poll_list[poll_cnt].rx = port->rxq[i];
>- poll_cnt++;
>- }
>- index++;
>- }
>- }
>- /* Unrefs the port_try_ref(). */
>- port_unref(port);
>- }
>- }
>-
>- *ppoll_list = poll_list;
>- return poll_cnt;
>-}
>-
> static void *
> pmd_thread_main(void *f_)
> {
> struct dp_netdev_pmd_thread *pmd = f_;
>+ struct rxq_poll *poll;
> unsigned int lc = 0;
>- struct rxq_poll *poll_list;
> unsigned int port_seq = PMD_INITIAL_SEQ;
>- int poll_cnt;
>- int i;
>-
>- poll_cnt = 0;
>- poll_list = NULL;
>
> /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
> ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
> pmd_thread_setaffinity_cpu(pmd->core_id);
> reload:
>- emc_cache_init(&pmd->flow_cache);
>- poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
>-
>+ ovs_mutex_lock(&pmd->poll_mutex);
> /* List port/core affinity */
>- for (i = 0; i < poll_cnt; i++) {
>-        VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id, netdev_get_name(poll_list[i].port->netdev));
>+ LIST_FOR_EACH (poll, node, &pmd->poll_list) {
>+ VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
>+ netdev_get_name(poll->port->netdev));
> }
>
>- /* Signal here to make sure the pmd finishes
>- * reloading the updated configuration. */
>- dp_netdev_pmd_reload_done(pmd);
>-
> for (;;) {
>- int i;
>-
>- for (i = 0; i < poll_cnt; i++) {
>-        dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
>+ LIST_FOR_EACH (poll, node, &pmd->poll_list) {
>+ dp_netdev_process_rxq_port(pmd, poll->port, poll->rx);
> }
>
> if (lc++ > 1024) {
>@@ -2687,20 +2696,15 @@ reload:
> }
> }
> }
>+ ovs_mutex_unlock(&pmd->poll_mutex);
>
>- emc_cache_uninit(&pmd->flow_cache);
>+ /* Synchronize with breaker thread. */
>+ dp_netdev_pmd_break_done(pmd);
>
>- if (!latch_is_set(&pmd->exit_latch)){
>+ if (!latch_is_set(&pmd->exit_latch)) {
> goto reload;
> }
>
>- for (i = 0; i < poll_cnt; i++) {
>- port_unref(poll_list[i].port);
>- }
>-
>- dp_netdev_pmd_reload_done(pmd);
>-
>- free(poll_list);
> return NULL;
> }
>
>@@ -2735,7 +2739,7 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
> }
>
> void
>-dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
>+dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd)
> {
> ovs_mutex_lock(&pmd->cond_mutex);
> xpthread_cond_signal(&pmd->cond);
>@@ -2827,6 +2831,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
> pmd->core_id = core_id;
> pmd->tx_qid = core_id_to_qid(core_id);
> pmd->numa_id = numa_id;
>+ pmd->poll_cnt = 0;
>
> ovs_refcount_init(&pmd->ref_cnt);
> latch_init(&pmd->exit_latch);
>@@ -2834,13 +2839,12 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
> xpthread_cond_init(&pmd->cond, NULL);
> ovs_mutex_init(&pmd->cond_mutex);
> ovs_mutex_init(&pmd->flow_mutex);
>+ ovs_mutex_init(&pmd->poll_mutex);
> dpcls_init(&pmd->cls);
> cmap_init(&pmd->flow_table);
>- /* init the 'flow_cache' since there is no
>- * actual thread created for NON_PMD_CORE_ID. */
>- if (core_id == NON_PMD_CORE_ID) {
>- emc_cache_init(&pmd->flow_cache);
>- }
>+ list_init(&pmd->poll_list);
>+ emc_cache_init(&pmd->flow_cache);
>+
>     cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
> hash_int(core_id, 0));
> }
>@@ -2855,6 +2859,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
> latch_destroy(&pmd->exit_latch);
> xpthread_cond_destroy(&pmd->cond);
> ovs_mutex_destroy(&pmd->cond_mutex);
>+ ovs_mutex_destroy(&pmd->poll_mutex);
> free(pmd);
> }
>
>@@ -2863,16 +2868,23 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
> static void
> dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
> {
>- /* Uninit the 'flow_cache' since there is
>- * no actual thread uninit it for NON_PMD_CORE_ID. */
>- if (pmd->core_id == NON_PMD_CORE_ID) {
>- emc_cache_uninit(&pmd->flow_cache);
>- } else {
>+ struct rxq_poll *poll;
>+
>+ emc_cache_uninit(&pmd->flow_cache);
>+
>+ if (pmd->core_id != NON_PMD_CORE_ID) {
> latch_set(&pmd->exit_latch);
>- dp_netdev_reload_pmd__(pmd);
>+ dp_netdev_break_pmd__(pmd);
> ovs_numa_unpin_core(pmd->core_id);
> xpthread_join(pmd->thread, NULL);
> }
>+
>+ /* Unref all ports and free poll_list. */
>+ LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
>+ port_unref(poll->port);
>+ free(poll);
>+ }
>+
> /* Purges the 'pmd''s flows after stopping the thread, but before
> * destroying the flows, so that the flow stats can be collected. */
> if (dp->dp_purge_cb) {
>@@ -2906,6 +2918,42 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
> }
> }
>
>+/* Returns the PMD thread on this numa node with the fewest rx queues
>+ * to poll. Returns NULL if there are no PMD threads on this numa node. */
>+static struct dp_netdev_pmd_thread *
>+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
>+{
>+ int min_cnt = -1;
>+ struct dp_netdev_pmd_thread *pmd, *res = NULL;
>+
>+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>+ if (pmd->numa_id == numa_id
>+ && (min_cnt > pmd->poll_cnt || res == NULL)) {
>+ min_cnt = pmd->poll_cnt;
>+ res = pmd;
>+ }
>+ }
>+
>+ return res;
>+}
>+
>+/* Adds rx queue to poll_list of PMD thread. May be called only
>+ * when PMD thread paused or not started yet. */
>+static void
>+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>+                         struct dp_netdev_port *port, struct netdev_rxq *rx)
>+ OVS_REQUIRES(pmd->poll_mutex)
>+{
>+ struct rxq_poll *poll = xmalloc(sizeof *poll);
>+
>+ port_ref(port);
>+ poll->port = port;
>+ poll->rx = rx;
>+
>+ list_push_back(&pmd->poll_list, &poll->node);
>+ pmd->poll_cnt++;
>+}
>+
> /* Checks the numa node id of 'netdev' and starts pmd threads for
> * the numa node. */
> static void
>@@ -2925,8 +2973,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
> * in which 'netdev' is on, do nothing. Else, creates the
> * pmd threads for the numa node. */
> if (!n_pmds) {
>- int can_have, n_unpinned, i;
>+ int can_have, n_unpinned, i, index = 0;
> struct dp_netdev_pmd_thread **pmds;
>+ struct dp_netdev_port *port;
>
> n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
> if (!n_unpinned) {
>@@ -2944,13 +2993,23 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
> pmds[i] = xzalloc(sizeof **pmds);
> dp_netdev_configure_pmd(pmds[i], dp, i, core_id, numa_id);
> }
>- /* The pmd thread code needs to see all the others configured pmd
>- * threads on the same numa node. That's why we call
>- * 'dp_netdev_configure_pmd()' on all the threads and then we actually
>- * start them. */
>+
>+ /* Distributes rx queues of this numa node between new pmd threads. */
>+ CMAP_FOR_EACH (port, node, &dp->ports) {
>+ if (netdev_is_pmd(port->netdev)
>+ && netdev_get_numa_id(port->netdev) == numa_id) {
>+ for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
>+ /* Make thread-safety analyser happy. */
>+ ovs_mutex_lock(&pmds[index]->poll_mutex);
>+                    dp_netdev_add_rxq_to_pmd(pmds[index], port, port->rxq[i]);
>+ ovs_mutex_unlock(&pmds[index]->poll_mutex);
>+ index = (index + 1) % can_have;
>+ }
>+ }
>+ }
>+
>+ /* Actual start of pmd threads. */
> for (i = 0; i < can_have; i++) {
>- /* Each thread will distribute all devices rx-queues among
>- * themselves. */
>+        pmds[i]->thread = ovs_thread_create("pmd", pmd_thread_main, pmds[i]);
> }
> free(pmds);
>--
>2.5.0
>