[ovs-dev] [ovs-dev, 3/6] netdev-dpdk: Add intermediate queue support.

Ilya Maximets i.maximets at samsung.com
Wed Jun 28 08:33:39 UTC 2017


First of all, this patch should be applied after the patch that
flushes the queues on reconfiguration, because we must not
reconfigure ports while there are unsent packets in the intermediate
queue.  Otherwise we may destroy the memory pool that contains those
packets and then try to send them afterwards, which may lead to
serious problems.
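
To make the ordering issue concrete: any reconfiguration has to drain
the intermediate queues before the mempool goes away.  A rough sketch,
using the fields this patch adds (the helper name is illustrative
only; the flushing patch mentioned above is the real fix):

    static void
    netdev_dpdk_txq_drain_all(struct netdev_dpdk *dev)
    {
        int qid;

        for (qid = 0; qid < dev->up.n_txq; qid++) {
            struct dpdk_tx_queue *txq = &dev->tx_q[qid];

            if (txq->dpdk_pkt_cnt) {
                /* These mbufs were allocated from the mempool that
                 * reconfiguration is about to destroy, so they must be
                 * sent (or freed) before it disappears. */
                netdev_dpdk_eth_tx_burst(dev, qid, txq->dpdk_burst_pkts,
                                         txq->dpdk_pkt_cnt);
            }
        }
    }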

The second thing is that you should also modify the 'dpdk_do_tx_copy'
function, otherwise there will be reordering issues, and flooded
traffic will accidentally get higher priority because it is not
buffered.
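
For example, the DPDK_DEV_ETH branch at the end of 'dpdk_do_tx_copy',
after the batch has been copied into the DPDK mbufs 'pkts[0..txcnt)',
would become something like this (an untested sketch against this
patch, not the exact change):

    /* Go through the intermediate queue instead of bursting directly,
     * so that the copied packets cannot overtake packets that are
     * still buffered: */
    dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, txcnt);

instead of the current direct call to netdev_dpdk_eth_tx_burst().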

Best regards, Ilya Maximets.


On 18.06.2017 22:56, Bhanuprakash Bodireddy wrote:
> This commit introduces the netdev_dpdk_eth_tx_queue() function that
> implements an intermediate queue and packet buffering.  The packets
> get buffered until the threshold 'INTERIM_QUEUE_BURST_THRESHOLD' (32)
> is reached and are eventually transmitted.
> 
> To handle the case (e.g. ping) where packets are sent at a low rate
> and could potentially get stuck in the queue, flush logic is
> implemented that gets invoked from dp_netdev_flush_txq_ports() as
> part of the PMD packet processing loop.
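
For reference, this means the flush should run once per iteration of
the PMD main loop, roughly like the sketch below (illustrative, not
the exact hunk from the patch):

    for (;;) {
        for (i = 0; i < poll_cnt; i++) {
            dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
                                       poll_list[i].port_no);
        }
        /* Drain the intermediate queues so that low-rate traffic,
         * e.g. ping, is not stuck waiting for a full burst. */
        dp_netdev_flush_txq_ports(pmd);
    }
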
> 
> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy at intel.com>
> Signed-off-by: Antonio Fischetti <antonio.fischetti at intel.com>
> Co-authored-by: Antonio Fischetti <antonio.fischetti at intel.com>
> Signed-off-by: Markus Magnusson <markus.magnusson at ericsson.com>
> Co-authored-by: Markus Magnusson <markus.magnusson at ericsson.com>
> Acked-by: Eelco Chaudron <echaudro at redhat.com>
> ---
>  lib/dpif-netdev.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
>  lib/netdev-dpdk.c | 35 ++++++++++++++++++++++++++++++++++-
>  2 files changed, 77 insertions(+), 2 deletions(-)
> 
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 2b65dc7..d59208e 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -332,6 +332,7 @@ enum pmd_cycles_counter_type {
>  };
>  
>  #define XPS_TIMEOUT_MS 500LL
> +#define LAST_USED_QID_NONE -1
>  
>  /* Contained by struct dp_netdev_port's 'rxqs' member.  */
>  struct dp_netdev_rxq {
> @@ -492,7 +493,13 @@ struct rxq_poll {
>  struct tx_port {
>      struct dp_netdev_port *port;
>      int qid;
> -    long long last_used;
> +    int last_used_qid;        /* Last queue id where packets got
> +                                 enqueued. */
> +    long long last_used;      /* In case XPS is enabled, it contains the
> +                               * timestamp of the last time the port was
> +                               * used by the thread to send data.  After
> +                               * XPS_TIMEOUT_MS elapses the qid will be
> +                               * marked as -1. */
>      struct hmap_node node;
>  };
>  
> @@ -3081,6 +3088,25 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
>  }
>  
>  static void
> +dp_netdev_flush_txq_ports(struct dp_netdev_pmd_thread *pmd)
> +{
> +    struct tx_port *cached_tx_port;
> +    int tx_qid;
> +
> +    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
> +        tx_qid = cached_tx_port->last_used_qid;
> +
> +        if (tx_qid != LAST_USED_QID_NONE) {
> +            netdev_txq_flush(cached_tx_port->port->netdev, tx_qid,
> +                             cached_tx_port->port->dynamic_txqs);
> +
> +            /* Queue flushed; mark it as empty. */
> +            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
> +        }
> +    }
> +}
> +
> +static void
>  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>                             struct netdev_rxq *rx,
>                             odp_port_t port_no)
> @@ -4356,6 +4382,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>  
>      tx->port = port;
>      tx->qid = -1;
> +    tx->last_used_qid = LAST_USED_QID_NONE;
>  
>      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
>      pmd->need_reload = true;
> @@ -4926,6 +4953,14 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
>  
>      dpif_netdev_xps_revalidate_pmd(pmd, now, false);
>  
> +    /* The tx queue can change in the XPS case; make sure the packets
> +     * in the previous queue are flushed properly. */
> +    if (tx->last_used_qid != LAST_USED_QID_NONE &&
> +               tx->qid != tx->last_used_qid) {
> +        netdev_txq_flush(port->netdev, tx->last_used_qid, port->dynamic_txqs);
> +        tx->last_used_qid = LAST_USED_QID_NONE;
> +    }
> +
>      VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
>               pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
>      return min_qid;
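
(For example: if XPS moves this thread from queue 2 to queue 5 while
packets are still buffered on queue 2, they must be flushed on queue 2
right here; otherwise 'last_used_qid' would be overwritten and those
packets would be stranded until this thread happens to get queue 2
assigned again.)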
> @@ -5021,6 +5056,13 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
>                  tx_qid = pmd->static_tx_qid;
>              }
>  
> +            /* In case these packets get buffered into an intermediate
> +             * queue and XPS is enabled, the flush function could find a
> +             * different tx qid assigned to its thread.  We keep track
> +             * of the qid we're now using so that the flush function
> +             * will select the right queue to flush. */
> +            p->last_used_qid = tx_qid;
> +
>              netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
>                          dynamic_txqs);
>              return;
> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> index 1e83116..50a9a2c 100644
> --- a/lib/netdev-dpdk.c
> +++ b/lib/netdev-dpdk.c
> @@ -1434,6 +1434,7 @@ static inline int
>  netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
>                           struct rte_mbuf **pkts, int cnt)
>  {
> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>      uint32_t nb_tx = 0;
>  
>      while (nb_tx != cnt) {
> @@ -1457,6 +1458,7 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
>          }
>      }
>  
> +    txq->dpdk_pkt_cnt = 0;
>      return cnt - nb_tx;
>  }
>  
> @@ -1841,6 +1843,37 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
>      }
>  }
>  
> +/* Enqueue packets in an intermediate queue and burst them out once
> + * the queue is full.  This way we can amortize the cost of the MMIO
> + * writes. */
> +static inline int
> +netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid,
> +                         struct rte_mbuf **pkts, int cnt)
> +{
> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> +
> +    int i = 0;
> +    int dropped = 0;
> +
> +    while (i < cnt) {
> +        int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->dpdk_pkt_cnt;
> +        int tocopy = MIN(freeslots, cnt - i);
> +
> +        memcpy(&txq->dpdk_burst_pkts[txq->dpdk_pkt_cnt], &pkts[i],
> +               tocopy * sizeof (struct rte_mbuf *));
> +
> +        txq->dpdk_pkt_cnt += tocopy;
> +        i += tocopy;
> +
> +        /* Queue full, burst the packets. */
> +        if (txq->dpdk_pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) {
> +            dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->dpdk_burst_pkts,
> +                                                txq->dpdk_pkt_cnt);
> +        }
> +    }
> +    return dropped;
> +}
> +
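
(To illustrate the arithmetic with INTERIM_QUEUE_BURST_THRESHOLD at 32:
a 35-packet batch copies and bursts the first 32 packets, with
netdev_dpdk_eth_tx_burst() resetting 'dpdk_pkt_cnt' to 0, and then
leaves the remaining 3 buffered; a lone ping packet simply stays queued
until dp_netdev_flush_txq_ports() runs on the next PMD loop iteration.)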
>  static int
>  netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>                         struct dp_packet_batch *batch,
> @@ -1889,7 +1922,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
>          cnt = netdev_dpdk_qos_run(dev, pkts, cnt);
>          dropped = batch->count - cnt;
>  
> -        dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt);
> +        dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt);
>  
>          if (OVS_UNLIKELY(dropped)) {
>              rte_spinlock_lock(&dev->stats_lock);
> 

