[ovs-dev] [PATCH v6] dpif-netdev: proper tx queue id
Ilya Maximets
i.maximets at samsung.com
Thu Nov 5 10:51:07 UTC 2015
There is a race between pmd threads over the total number of threads.
I'll prepare new version.
Best regards, Ilya Maximets.
On 23.10.2015 18:19, Ilya Maximets wrote:
> Currently tx_qid is equal to pmd->core_id. This leads to unexpected
> behavior if pmd-cpu-mask is different from '/(0*)(1|3|7)?(f*)/',
> e.g. if core_ids are not sequential, or don't start from 0, or both.
>
> Example:
> starting 2 pmd threads with 1 port, 2 rxqs per port,
> pmd-cpu-mask = 00000014 and let dev->real_n_txq = 2
>
> In that case pmd_1->tx_qid = 2, pmd_2->tx_qid = 4 and
> txq_needs_locking = true (if the device doesn't have
> ovs_numa_get_n_cores()+1 queues).
>
> In that case, after truncating in netdev_dpdk_send__():
> 'qid = qid % dev->real_n_txq;'
> pmd_1: qid = 2 % 2 = 0
> pmd_2: qid = 4 % 2 = 0
>
> So, both threads will call dpdk_queue_pkts() with the same qid = 0.
> This is unexpected behavior if there are 2 tx queues in the device.
> Queue #1 will not be used and both threads will lock queue #0
> on each send.
>
> Fix that by introducing a per-pmd-thread hash map 'tx_queues', which
> stores all tx queues available to that pmd thread, with
> port_no as the key (hash). All tx_qid-s will be unique per port and
> sequential, to prevent the unexpected mapping described above after
> truncation.
>
> The implemented infrastructure can be used in the future to choose
> between all tx queues available for that pmd thread.
>
> Signed-off-by: Ilya Maximets <i.maximets at samsung.com>
> ---
> Note: I will be on vacation until November 2nd.
>
> version 6:
> * fixed assigning same txq to multiple pmd threads on different
> NUMA sockets.
> * Changed logic of txq distribution in pmd_load_queues().
> version 5:
> * txqs 0 from ports of non-pmd netdevs added to all pmd threads
>
> version 4:
> * fixed distribution of tx queues if multiqueue is not supported
>
> version 3:
> * fixed failing of unit tests by adding tx queues of non
> pmd devices to non pmd thread. (they haven't been used by any thread)
> * pmd_flush_tx_queues --> dp_netdev_pmd_detach_tx_queues
> * function names changed to dp_netdev_*
> * dp_netdev_pmd_lookup_txq now looks by port_no.
> * removed unnecessary dp_netdev_lookup_port in dp_execute_cb
> for OVS_ACTION_ATTR_OUTPUT.
> * refactoring
>
> lib/dpif-netdev.c | 189 ++++++++++++++++++++++++++++++++++++++++++++++--------
> 1 file changed, 163 insertions(+), 26 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 47fa9e2..b268586 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -372,6 +372,13 @@ struct dp_netdev_pmd_cycles {
> atomic_ullong n[PMD_N_CYCLES];
> };
>
> +struct dp_netdev_pmd_txq {
> + struct cmap_node node; /* In owning dp_netdev_pmd_thread's */
> + /* 'tx_queues'. */
> + struct dp_netdev_port *port;
> + int tx_qid;
> +};
> +
> /* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate
> * the performance overhead of interrupt processing. Therefore netdev can
> * not implement rx-wait for these devices. dpif-netdev needs to poll
> @@ -425,10 +432,12 @@ struct dp_netdev_pmd_thread {
> pthread_t thread;
> int index; /* Idx of this pmd thread among pmd*/
> /* threads on same numa node. */
> + int global_index; /* Idx of this pmd thread among all*/
> + /* pmd threads in dp. */
> unsigned core_id; /* CPU core id of this pmd thread. */
> int numa_id; /* numa node id of this pmd thread. */
> - int tx_qid; /* Queue id used by this pmd thread to
> - * send packets on all netdevs */
> + struct cmap tx_queues; /* Queue ids used by this pmd thread to
> + * send packets to ports */
>
> /* Only a pmd thread can write on its own 'cycles' and 'stats'.
> * The main thread keeps 'stats_zero' and 'cycles_zero' as base
> @@ -470,6 +479,15 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread *,
>
> static void dp_netdev_disable_upcall(struct dp_netdev *);
> void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
> +static void dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd);
> +static void dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
> + struct dp_netdev_port *port, int queue_id);
> +static void dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
> + struct dp_netdev_pmd_txq *txq);
> +static void dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd);
> +static struct dp_netdev_pmd_txq *
> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
> + odp_port_t port_no);
> static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
> struct dp_netdev *dp, int index,
> unsigned core_id, int numa_id);
> @@ -1051,6 +1069,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
> struct netdev_saved_flags *sf;
> struct dp_netdev_port *port;
> struct netdev *netdev;
> + struct dp_netdev_pmd_thread *non_pmd;
> enum netdev_flags flags;
> const char *open_type;
> int error;
> @@ -1127,10 +1146,15 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
> ovs_refcount_init(&port->ref_cnt);
> cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>
> - if (netdev_is_pmd(netdev)) {
> - dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
> - dp_netdev_reload_pmds(dp);
> + non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
> + if (non_pmd) {
> + dp_netdev_pmd_add_txq(non_pmd, port, ovs_numa_get_n_cores());
> + dp_netdev_pmd_unref(non_pmd);
> }
> + if (netdev_is_pmd(netdev))
> + dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
> + dp_netdev_reload_pmds(dp);
> +
> seq_change(dp->port_seq);
>
> return 0;
> @@ -1272,6 +1296,13 @@ get_port_by_name(struct dp_netdev *dp,
> }
>
> static int
> +get_n_pmd_threads(struct dp_netdev *dp)
> +{
> + /* There is one non pmd thread in dp->poll_threads */
> + return cmap_count(&dp->poll_threads) - 1;
> +}
> +
> +static int
> get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id)
> {
> struct dp_netdev_pmd_thread *pmd;
> @@ -1308,18 +1339,32 @@ static void
> do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
> OVS_REQUIRES(dp->port_mutex)
> {
> + struct dp_netdev_pmd_thread *non_pmd;
> +
> cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
> seq_change(dp->port_seq);
> +
> + non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
> + if (non_pmd) {
> + /* There is only one txq for each port for non pmd thread */
> + struct dp_netdev_pmd_txq *txq;
> + txq = dp_netdev_pmd_lookup_txq(non_pmd, port->port_no);
> + if (OVS_LIKELY(txq))
> + dp_netdev_pmd_del_txq(non_pmd, txq);
> + dp_netdev_pmd_unref(non_pmd);
> + }
> +
> if (netdev_is_pmd(port->netdev)) {
> int numa_id = netdev_get_numa_id(port->netdev);
>
> /* If there is no netdev on the numa node, deletes the pmd threads
> - * for that numa. Else, just reloads the queues. */
> + * for that numa. */
> if (!has_pmd_port_for_numa(dp, numa_id)) {
> dp_netdev_del_pmds_on_numa(dp, numa_id);
> }
> - dp_netdev_reload_pmds(dp);
> }
> + /* Reload queues of pmd threads. */
> + dp_netdev_reload_pmds(dp);
>
> port_unref(port);
> }
> @@ -2586,6 +2631,80 @@ dpif_netdev_wait(struct dpif *dpif)
> seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
> }
>
> +static void
> +dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
> + struct dp_netdev_port *port, int queue_id)
> +{
> + if (port_try_ref(port)) {
> + struct dp_netdev_pmd_txq * txq = xmalloc(sizeof *txq);
> + txq->port = port;
> + txq->tx_qid = queue_id;
> + cmap_insert(&pmd->tx_queues, &txq->node,
> + hash_port_no(port->port_no));
> + }
> +}
> +
> +/* Configures tx_queues for non pmd thread. */
> +static void
> +dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd)
> +{
> + if (!cmap_is_empty(&pmd->tx_queues))
> + dp_netdev_pmd_detach_tx_queues(pmd);
> +
> + struct dp_netdev_port *port;
> + CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
> + dp_netdev_pmd_add_txq(pmd, port, ovs_numa_get_n_cores());
> + }
> +}
> +
> +static void
> +dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
> + struct dp_netdev_pmd_txq *txq)
> +{
> + cmap_remove(&pmd->tx_queues, &txq->node,
> + hash_port_no(txq->port->port_no));
> + port_unref(txq->port);
> + free(txq);
> +}
> +
> +/* Removes all queues from 'tx_queues' of pmd thread. */
> +static void
> +dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd)
> +{
> + struct dp_netdev_pmd_txq *txq;
> +
> + CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
> + dp_netdev_pmd_del_txq(pmd, txq);
> + }
> +}
> +
> +static void OVS_UNUSED
> +dp_netdev_pmd_tx_queues_print(struct dp_netdev_pmd_thread *pmd)
> +{
> + struct dp_netdev_pmd_txq *txq;
> +
> + CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
> + VLOG_INFO("Core_id: %d, Port: %s, tx_qid: %d\n",
> + pmd->core_id, netdev_get_name(txq->port->netdev),
> + txq->tx_qid);
> + }
> +}
> +
> +static struct dp_netdev_pmd_txq *
> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
> + odp_port_t port_no)
> +{
> + struct dp_netdev_pmd_txq *txq;
> +
> + CMAP_FOR_EACH_WITH_HASH (txq, node, hash_port_no(port_no),
> + &pmd->tx_queues) {
> + if (txq->port->port_no == port_no) {
> + return txq;
> + }
> + }
> + return NULL;
> +}
> +
> struct rxq_poll {
> struct dp_netdev_port *port;
> struct netdev_rxq *rx;
> @@ -2597,16 +2716,19 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
> {
> struct rxq_poll *poll_list = *ppoll_list;
> struct dp_netdev_port *port;
> - int n_pmds_on_numa, index, i;
> + int n_pmds, n_pmds_on_numa, rx_index, i, n_txq;
>
> /* Simple scheduler for netdev rx polling. */
> + dp_netdev_pmd_detach_tx_queues(pmd);
> +
> for (i = 0; i < poll_cnt; i++) {
> port_unref(poll_list[i].port);
> }
>
> poll_cnt = 0;
> + n_pmds = get_n_pmd_threads(pmd->dp);
> n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
> - index = 0;
> + rx_index = 0;
>
> CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
> /* Calls port_try_ref() to prevent the main thread
> @@ -2617,7 +2739,7 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
> int i;
>
> for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
> - if ((index % n_pmds_on_numa) == pmd->index) {
> + if ((rx_index % n_pmds_on_numa) == pmd->index) {
> poll_list = xrealloc(poll_list,
> sizeof *poll_list * (poll_cnt + 1));
>
> @@ -2626,9 +2748,26 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
> poll_list[poll_cnt].rx = port->rxq[i];
> poll_cnt++;
> }
> - index++;
> + rx_index++;
> + }
> +
> + }
> +
> + n_txq = netdev_n_txq(port->netdev);
> + /* If multi-queue isn't supported by netdev, add txq 0
> + * to that pmd thread. Otherwise, add all queues available
> + * for that thread. pmd->global_index used to prevent
> + * assigning same txq to multiple pmd threads on different
> + * NUMA sockets.*/
> + if (n_txq == 1) {
> + dp_netdev_pmd_add_txq(pmd, port, 0);
> + } else {
> + /* Last queue reserved for non pmd threads */
> + for (i = pmd->global_index; i < n_txq - 1; i += n_pmds) {
> + dp_netdev_pmd_add_txq(pmd, port, i);
> }
> }
> +
> /* Unrefs the port_try_ref(). */
> port_unref(port);
> }
> @@ -2697,6 +2836,8 @@ reload:
> goto reload;
> }
>
> + dp_netdev_pmd_detach_tx_queues(pmd);
> +
> for (i = 0; i < poll_cnt; i++) {
> port_unref(poll_list[i].port);
> }
> @@ -2810,16 +2951,6 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos)
> return next;
> }
>
> -static int
> -core_id_to_qid(unsigned core_id)
> -{
> - if (core_id != NON_PMD_CORE_ID) {
> - return core_id;
> - } else {
> - return ovs_numa_get_n_cores();
> - }
> -}
> -
> /* Configures the 'pmd' based on the input argument. */
> static void
> dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
> @@ -2827,8 +2958,8 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
> {
> pmd->dp = dp;
> pmd->index = index;
> + pmd->global_index = get_n_pmd_threads(dp);
> pmd->core_id = core_id;
> - pmd->tx_qid = core_id_to_qid(core_id);
> pmd->numa_id = numa_id;
>
> ovs_refcount_init(&pmd->ref_cnt);
> @@ -2839,9 +2970,11 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
> ovs_mutex_init(&pmd->flow_mutex);
> dpcls_init(&pmd->cls);
> cmap_init(&pmd->flow_table);
> - /* init the 'flow_cache' since there is no
> + cmap_init(&pmd->tx_queues);
> + /* init the 'flow_cache' and 'tx_queues' since there is no
> * actual thread created for NON_PMD_CORE_ID. */
> if (core_id == NON_PMD_CORE_ID) {
> + dp_netdev_configure_non_pmd_txqs(pmd);
> emc_cache_init(&pmd->flow_cache);
> }
> cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
> @@ -2854,6 +2987,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
> dp_netdev_pmd_flow_flush(pmd);
> dpcls_destroy(&pmd->cls);
> cmap_destroy(&pmd->flow_table);
> + cmap_destroy(&pmd->tx_queues);
> ovs_mutex_destroy(&pmd->flow_mutex);
> latch_destroy(&pmd->exit_latch);
> xpthread_cond_destroy(&pmd->cond);
> @@ -2870,6 +3004,7 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
> * no actual thread uninit it for NON_PMD_CORE_ID. */
> if (pmd->core_id == NON_PMD_CORE_ID) {
> emc_cache_uninit(&pmd->flow_cache);
> + dp_netdev_pmd_detach_tx_queues(pmd);
> } else {
> latch_set(&pmd->exit_latch);
> dp_netdev_reload_pmd__(pmd);
> @@ -3490,13 +3625,15 @@ dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
> struct dp_netdev *dp = pmd->dp;
> int type = nl_attr_type(a);
> struct dp_netdev_port *p;
> + struct dp_netdev_pmd_txq *txq;
> int i;
>
> switch ((enum ovs_action_attr)type) {
> case OVS_ACTION_ATTR_OUTPUT:
> - p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
> - if (OVS_LIKELY(p)) {
> - netdev_send(p->netdev, pmd->tx_qid, packets, cnt, may_steal);
> + txq = dp_netdev_pmd_lookup_txq(pmd, u32_to_odp(nl_attr_get_u32(a)));
> + if (OVS_LIKELY(txq)) {
> + netdev_send(txq->port->netdev, txq->tx_qid,
> + packets, cnt, may_steal);
> return;
> }
> break;
>
More information about the dev
mailing list