[ovs-dev] [PATCH v5] dpif-netdev: proper tx queue id

Ilya Maximets i.maximets at samsung.com
Fri Oct 2 09:23:40 UTC 2015


Ping v2

On 23.09.2015 15:22, Ilya Maximets wrote:
> Ping.
> 
> On 11.09.2015 14:38, Ilya Maximets wrote:
>> Currently tx_qid is equal to pmd->core_id. This leads to unexpected
>> behavior if pmd-cpu-mask is different from '/(0*)(1|3|7)?(f*)/',
>> e.g. if core_ids are not sequential, or don't start from 0, or both.
>>
>> Example:
>> 	starting 2 pmd threads with 1 port, 2 rxqs per port,
>> 	pmd-cpu-mask = 00000014 and let dev->real_n_txq = 2
>>
>> 	In that case pmd_1->tx_qid = 2, pmd_2->tx_qid = 4 and
>> 	txq_needs_locking = true (if the device doesn't have
>> 	ovs_numa_get_n_cores()+1 queues).
>>
>> 	In that case, after truncating in netdev_dpdk_send__():
>> 		'qid = qid % dev->real_n_txq;'
>> 	pmd_1: qid = 2 % 2 = 0
>> 	pmd_2: qid = 4 % 2 = 0
>>
>> 	So, both threads will call dpdk_queue_pkts() with the same qid = 0.
>> 	This is unexpected behavior if there are 2 tx queues in the device.
>> 	Queue #1 will not be used and both threads will lock queue #0
>> 	on each send.
>>
>> Fix that by introducing a per-pmd-thread hash map 'tx_queues', which
>> stores all tx queues available to that pmd thread, with
>> port_no as the key (hash). All tx_qid-s will be unique per port and
>> sequential, to prevent the unexpected mapping described above after
>> truncating.
>>
>> The implemented infrastructure can be used in the future to choose
>> between all tx queues available for that pmd thread.
>>
>> Signed-off-by: Ilya Maximets <i.maximets at samsung.com>
>> ---
>> version 5:
>> 	* txqs 0 from ports of non-pmd netdevs added to all pmd threads
>>
>> version 4:
>> 	* fixed distribution of tx queues if multiqueue is not supported
>>
>> version 3:
>> 	* fixed failing of unit tests by adding tx queues of non
>> 	  pmd devices to non pmd thread. (they haven't been used by any thread)
>> 	* pmd_flush_tx_queues --> dp_netdev_pmd_detach_tx_queues
>> 	* function names changed to dp_netdev_*
>> 	* dp_netdev_pmd_lookup_txq now looks by port_no.
>> 	* removed unnecessary dp_netdev_lookup_port in dp_execute_cb
>> 	  for OVS_ACTION_ATTR_OUTPUT.
>> 	* refactoring
>>
>>  lib/dpif-netdev.c | 173 ++++++++++++++++++++++++++++++++++++++++++++++--------
>>  1 file changed, 147 insertions(+), 26 deletions(-)
>>
>> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>> index db76290..65cd533 100644
>> --- a/lib/dpif-netdev.c
>> +++ b/lib/dpif-netdev.c
>> @@ -372,6 +372,13 @@ struct dp_netdev_pmd_cycles {
>>      atomic_ullong n[PMD_N_CYCLES];
>>  };
>>  
>> +struct dp_netdev_pmd_txq {
>> +    struct cmap_node node;        /* In owning dp_netdev_pmd_thread's */
>> +                                  /* 'tx_queues'. */
>> +    struct dp_netdev_port *port;
>> +    int tx_qid;
>> +};
>> +
>>  /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
>>   * the performance overhead of interrupt processing.  Therefore netdev can
>>   * not implement rx-wait for these devices.  dpif-netdev needs to poll
>> @@ -427,8 +434,8 @@ struct dp_netdev_pmd_thread {
>>                                      /* threads on same numa node. */
>>      unsigned core_id;               /* CPU core id of this pmd thread. */
>>      int numa_id;                    /* numa node id of this pmd thread. */
>> -    int tx_qid;                     /* Queue id used by this pmd thread to
>> -                                     * send packets on all netdevs */
>> +    struct cmap tx_queues;          /* Queue ids used by this pmd thread to
>> +                                     * send packets to ports */
>>  
>>      /* Only a pmd thread can write on its own 'cycles' and 'stats'.
>>       * The main thread keeps 'stats_zero' and 'cycles_zero' as base
>> @@ -470,6 +477,15 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread *,
>>  
>>  static void dp_netdev_disable_upcall(struct dp_netdev *);
>>  void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
>> +static void dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd);
>> +static void dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
>> +                                  struct dp_netdev_port *port, int queue_id);
>> +static void dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
>> +                                  struct dp_netdev_pmd_txq *txq);
>> +static void dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd);
>> +static struct dp_netdev_pmd_txq *
>> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
>> +                         odp_port_t port_no);
>>  static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
>>                                      struct dp_netdev *dp, int index,
>>                                      unsigned core_id, int numa_id);
>> @@ -1051,6 +1067,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
>>      struct netdev_saved_flags *sf;
>>      struct dp_netdev_port *port;
>>      struct netdev *netdev;
>> +    struct dp_netdev_pmd_thread *non_pmd;
>>      enum netdev_flags flags;
>>      const char *open_type;
>>      int error;
>> @@ -1127,10 +1144,15 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
>>      ovs_refcount_init(&port->ref_cnt);
>>      cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>>  
>> -    if (netdev_is_pmd(netdev)) {
>> -        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
>> -        dp_netdev_reload_pmds(dp);
>> +    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
>> +    if (non_pmd) {
>> +        dp_netdev_pmd_add_txq(non_pmd, port, ovs_numa_get_n_cores());
>> +        dp_netdev_pmd_unref(non_pmd);
>>      }
>> +    if (netdev_is_pmd(netdev))
>> +        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
>> +    dp_netdev_reload_pmds(dp);
>> +
>>      seq_change(dp->port_seq);
>>  
>>      return 0;
>> @@ -1308,18 +1330,32 @@ static void
>>  do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
>>      OVS_REQUIRES(dp->port_mutex)
>>  {
>> +    struct dp_netdev_pmd_thread *non_pmd;
>> +
>>      cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
>>      seq_change(dp->port_seq);
>> +
>> +    non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
>> +    if (non_pmd) {
>> +        /* There is only one txq for each port for non pmd thread */
>> +        struct dp_netdev_pmd_txq *txq;
>> +        txq = dp_netdev_pmd_lookup_txq(non_pmd, port->port_no);
>> +        if (OVS_LIKELY(txq))
>> +            dp_netdev_pmd_del_txq(non_pmd, txq);
>> +        dp_netdev_pmd_unref(non_pmd);
>> +    }
>> +
>>      if (netdev_is_pmd(port->netdev)) {
>>          int numa_id = netdev_get_numa_id(port->netdev);
>>  
>>          /* If there is no netdev on the numa node, deletes the pmd threads
>> -         * for that numa.  Else, just reloads the queues.  */
>> +         * for that numa. */
>>          if (!has_pmd_port_for_numa(dp, numa_id)) {
>>              dp_netdev_del_pmds_on_numa(dp, numa_id);
>>          }
>> -        dp_netdev_reload_pmds(dp);
>>      }
>> +    /* Reload queues of pmd threads. */
>> +    dp_netdev_reload_pmds(dp);
>>  
>>      port_unref(port);
>>  }
>> @@ -2580,6 +2616,80 @@ dpif_netdev_wait(struct dpif *dpif)
>>      seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
>>  }
>>  
>> +static void
>> +dp_netdev_pmd_add_txq(struct dp_netdev_pmd_thread *pmd,
>> +                      struct dp_netdev_port *port, int queue_id)
>> +{
>> +    if (port_try_ref(port)) {
>> +        struct dp_netdev_pmd_txq * txq = xmalloc(sizeof *txq);
>> +        txq->port = port;
>> +        txq->tx_qid = queue_id;
>> +        cmap_insert(&pmd->tx_queues, &txq->node,
>> +                        hash_port_no(port->port_no));
>> +    }
>> +}
>> +
>> +/* Configures tx_queues for non pmd thread. */
>> +static void
>> +dp_netdev_configure_non_pmd_txqs(struct dp_netdev_pmd_thread *pmd)
>> +{
>> +    if (!cmap_is_empty(&pmd->tx_queues))
>> +        dp_netdev_pmd_detach_tx_queues(pmd);
>> +
>> +    struct dp_netdev_port *port;
>> +    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
>> +        dp_netdev_pmd_add_txq(pmd, port, ovs_numa_get_n_cores());
>> +    }
>> +}
>> +
>> +static void
>> +dp_netdev_pmd_del_txq(struct dp_netdev_pmd_thread *pmd,
>> +                      struct dp_netdev_pmd_txq *txq)
>> +{
>> +    cmap_remove(&pmd->tx_queues, &txq->node,
>> +                hash_port_no(txq->port->port_no));
>> +    port_unref(txq->port);
>> +    free(txq);
>> +}
>> +
>> +/* Removes all queues from 'tx_queues' of pmd thread. */
>> +static void
>> +dp_netdev_pmd_detach_tx_queues(struct dp_netdev_pmd_thread *pmd)
>> +{
>> +    struct dp_netdev_pmd_txq *txq;
>> +
>> +    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
>> +        dp_netdev_pmd_del_txq(pmd, txq);
>> +    }
>> +}
>> +
>> +static void OVS_UNUSED
>> +dp_netdev_pmd_tx_queues_print(struct dp_netdev_pmd_thread *pmd)
>> +{
>> +    struct dp_netdev_pmd_txq *txq;
>> +
>> +    CMAP_FOR_EACH (txq, node, &pmd->tx_queues) {
>> +        VLOG_INFO("Core_id: %d, Port: %s, tx_qid: %d\n",
>> +                   pmd->core_id, netdev_get_name(txq->port->netdev),
>> +                   txq->tx_qid);
>> +    }
>> +}
>> +
>> +static struct dp_netdev_pmd_txq *
>> +dp_netdev_pmd_lookup_txq(const struct dp_netdev_pmd_thread *pmd,
>> +                         odp_port_t port_no)
>> +{
>> +    struct dp_netdev_pmd_txq *txq;
>> +
>> +    CMAP_FOR_EACH_WITH_HASH (txq, node, hash_port_no(port_no),
>> +                             &pmd->tx_queues) {
>> +        if (txq->port->port_no == port_no) {
>> +            return txq;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>>  struct rxq_poll {
>>      struct dp_netdev_port *port;
>>      struct netdev_rxq *rx;
>> @@ -2591,16 +2701,19 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>>  {
>>      struct rxq_poll *poll_list = *ppoll_list;
>>      struct dp_netdev_port *port;
>> -    int n_pmds_on_numa, index, i;
>> +    int n_pmds_on_numa, rx_index, tx_index, i, n_txq;
>>  
>>      /* Simple scheduler for netdev rx polling. */
>> +    dp_netdev_pmd_detach_tx_queues(pmd);
>> +
>>      for (i = 0; i < poll_cnt; i++) {
>>          port_unref(poll_list[i].port);
>>      }
>>  
>>      poll_cnt = 0;
>>      n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
>> -    index = 0;
>> +    rx_index = 0;
>> +    tx_index = 0;
>>  
>>      CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
>>          /* Calls port_try_ref() to prevent the main thread
>> @@ -2611,7 +2724,7 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>>                  int i;
>>  
>>                  for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
>> -                    if ((index % n_pmds_on_numa) == pmd->index) {
>> +                    if ((rx_index % n_pmds_on_numa) == pmd->index) {
>>                          poll_list = xrealloc(poll_list,
>>                                          sizeof *poll_list * (poll_cnt + 1));
>>  
>> @@ -2620,9 +2733,20 @@ pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
>>                          poll_list[poll_cnt].rx = port->rxq[i];
>>                          poll_cnt++;
>>                      }
>> -                    index++;
>> +                    rx_index++;
>>                  }
>> +
>> +            }
>> +
>> +            n_txq = netdev_n_txq(port->netdev);
>> +            /* Last queue reserved for non pmd threads */
>> +            n_txq = n_txq == 1 ? 1 : n_txq - 1;
>> +            for (i = 0; i < n_txq; i++) {
>> +                if ((tx_index % n_pmds_on_numa) == pmd->index || n_txq == 1)
>> +                    dp_netdev_pmd_add_txq(pmd, port, i);
>> +                tx_index++;
>>              }
>> +
>>              /* Unrefs the port_try_ref(). */
>>              port_unref(port);
>>          }
>> @@ -2691,6 +2815,8 @@ reload:
>>          goto reload;
>>      }
>>  
>> +    dp_netdev_pmd_detach_tx_queues(pmd);
>> +
>>      for (i = 0; i < poll_cnt; i++) {
>>           port_unref(poll_list[i].port);
>>      }
>> @@ -2804,16 +2930,6 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos)
>>      return next;
>>  }
>>  
>> -static int
>> -core_id_to_qid(unsigned core_id)
>> -{
>> -    if (core_id != NON_PMD_CORE_ID) {
>> -        return core_id;
>> -    } else {
>> -        return ovs_numa_get_n_cores();
>> -    }
>> -}
>> -
>>  /* Configures the 'pmd' based on the input argument. */
>>  static void
>>  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>> @@ -2822,7 +2938,6 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>>      pmd->dp = dp;
>>      pmd->index = index;
>>      pmd->core_id = core_id;
>> -    pmd->tx_qid = core_id_to_qid(core_id);
>>      pmd->numa_id = numa_id;
>>  
>>      ovs_refcount_init(&pmd->ref_cnt);
>> @@ -2833,9 +2948,11 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>>      ovs_mutex_init(&pmd->flow_mutex);
>>      dpcls_init(&pmd->cls);
>>      cmap_init(&pmd->flow_table);
>> -    /* init the 'flow_cache' since there is no
>> +    cmap_init(&pmd->tx_queues);
>> +    /* init the 'flow_cache' and 'tx_queues' since there is no
>>       * actual thread created for NON_PMD_CORE_ID. */
>>      if (core_id == NON_PMD_CORE_ID) {
>> +        dp_netdev_configure_non_pmd_txqs(pmd);
>>          emc_cache_init(&pmd->flow_cache);
>>      }
>>      cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
>> @@ -2848,6 +2965,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
>>      dp_netdev_pmd_flow_flush(pmd);
>>      dpcls_destroy(&pmd->cls);
>>      cmap_destroy(&pmd->flow_table);
>> +    cmap_destroy(&pmd->tx_queues);
>>      ovs_mutex_destroy(&pmd->flow_mutex);
>>      latch_destroy(&pmd->exit_latch);
>>      xpthread_cond_destroy(&pmd->cond);
>> @@ -2864,6 +2982,7 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
>>       * no actual thread uninit it for NON_PMD_CORE_ID. */
>>      if (pmd->core_id == NON_PMD_CORE_ID) {
>>          emc_cache_uninit(&pmd->flow_cache);
>> +        dp_netdev_pmd_detach_tx_queues(pmd);
>>      } else {
>>          latch_set(&pmd->exit_latch);
>>          dp_netdev_reload_pmd__(pmd);
>> @@ -3473,13 +3592,15 @@ dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
>>      struct dp_netdev *dp = pmd->dp;
>>      int type = nl_attr_type(a);
>>      struct dp_netdev_port *p;
>> +    struct dp_netdev_pmd_txq *txq;
>>      int i;
>>  
>>      switch ((enum ovs_action_attr)type) {
>>      case OVS_ACTION_ATTR_OUTPUT:
>> -        p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
>> -        if (OVS_LIKELY(p)) {
>> -            netdev_send(p->netdev, pmd->tx_qid, packets, cnt, may_steal);
>> +        txq = dp_netdev_pmd_lookup_txq(pmd, u32_to_odp(nl_attr_get_u32(a)));
>> +        if (OVS_LIKELY(txq)) {
>> +            netdev_send(txq->port->netdev, txq->tx_qid,
>> +                        packets, cnt, may_steal);
>>              return;
>>          }
>>          break;
>>



More information about the dev mailing list