[ovs-dev] [ovs-dev, 3/6] netdev-dpdk: Add intermediate queue support.

Bodireddy, Bhanuprakash bhanuprakash.bodireddy at intel.com
Wed Jun 28 21:12:35 UTC 2017


>At first, this patch should be applied after the patch with flushing on
>reconfiguration because we must not reconfigure ports while there are
>unsent packets in the intermediate queue.
>Otherwise we may destroy the memory pool that contains those packets and
>then try to send them afterwards. This may lead to serious problems.

This is a good point. Will handle this appropriately in the next version.
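
The ordering I have in mind is roughly the below: drain every PMD thread's
intermediate queues before the ports are reconfigured, so no buffered mbufs
outlive the mempool they came from. This is only a rough sketch against the
names used in this series; flush_before_reconfigure() is a hypothetical
caller, the exact hook point is still to be decided, and in practice the
flush probably has to run on the PMD threads themselves since the send port
cache is thread-local.

static void
flush_before_reconfigure(struct dp_netdev *dp)
{
    struct dp_netdev_pmd_thread *pmd;

    /* Hypothetical helper: walk all PMD threads and flush any packets
     * still sitting in their intermediate tx queues before the ports
     * (and their mempools) are torn down and rebuilt.
     * dp_netdev_flush_txq_ports() is the function added by this patch. */
    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
        dp_netdev_flush_txq_ports(pmd);
    }
}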

>
>The second thing is that you should also modify the 'dpdk_do_tx_copy'
>function, otherwise there will be reordering issues and flood traffic will
>accidentally have higher priority because it is not buffered.

I presume you are referring to netdev_dpdk_eth_tx_burst(), where we burst the packets. Do you think we should queue the packets here and flush them?
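
If the intention is that dpdk_do_tx_copy() should also go through the
intermediate queue instead of bursting straight away, I would imagine
something like the below at its tail (sketch only, variable names
approximate; netdev_dpdk_eth_tx_queue() is the function added by this
patch):

    /* Sketch: for ETH devices, enqueue the copied mbufs on the
     * intermediate queue instead of bursting them immediately, so this
     * path cannot overtake packets already buffered for the same txq.
     * 'pkts' and 'txcnt' stand for the copied mbufs built earlier in
     * dpdk_do_tx_copy(). */
    dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, txcnt);

That would keep the copy path and the fast path ordered through the same
per-txq buffer.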

- Bhanuprakash.

>On 18.06.2017 22:56, Bhanuprakash Bodireddy wrote:
>> This commit introduces the netdev_dpdk_eth_tx_queue() function, which
>> implements an intermediate queue for packet buffering. Packets get
>> buffered until the threshold 'INTERIM_QUEUE_BURST_THRESHOLD' (32) is
>> reached and are eventually transmitted.
>>
>> To handle the case (e.g. ping) where packets are sent at a low rate and
>> can potentially get stuck in the queue, flush logic is implemented that
>> gets invoked from dp_netdev_flush_txq_ports() as part of the PMD packet
>> processing loop.
>>
>> Signed-off-by: Bhanuprakash Bodireddy
>> <bhanuprakash.bodireddy at intel.com>
>> Signed-off-by: Antonio Fischetti <antonio.fischetti at intel.com>
>> Co-authored-by: Antonio Fischetti <antonio.fischetti at intel.com>
>> Signed-off-by: Markus Magnusson <markus.magnusson at ericsson.com>
>> Co-authored-by: Markus Magnusson <markus.magnusson at ericsson.com>
>> Acked-by: Eelco Chaudron <echaudro at redhat.com>
>> ---
>>  lib/dpif-netdev.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
>>  lib/netdev-dpdk.c | 35 ++++++++++++++++++++++++++++++++++-
>>  2 files changed, 77 insertions(+), 2 deletions(-)
>>
>> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>> index 2b65dc7..d59208e 100644
>> --- a/lib/dpif-netdev.c
>> +++ b/lib/dpif-netdev.c
>> @@ -332,6 +332,7 @@ enum pmd_cycles_counter_type {
>>  };
>>
>>  #define XPS_TIMEOUT_MS 500LL
>> +#define LAST_USED_QID_NONE -1
>>
>>  /* Contained by struct dp_netdev_port's 'rxqs' member.  */
>>  struct dp_netdev_rxq {
>> @@ -492,7 +493,13 @@ struct rxq_poll {
>>  struct tx_port {
>>      struct dp_netdev_port *port;
>>      int qid;
>> -    long long last_used;
>> +    int last_used_qid;        /* Last queue id where packets got
>> +                                 enqueued. */
>> +    long long last_used;      /* In case XPS is enabled, it contains the
>> +                               * timestamp of the last time the port was
>> +                               * used by the thread to send data.  After
>> +                               * XPS_TIMEOUT_MS elapses the qid will be
>> +                               * marked as -1. */
>>      struct hmap_node node;
>>  };
>>
>> @@ -3081,6 +3088,25 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
>>  }
>>
>>  static void
>> +dp_netdev_flush_txq_ports(struct dp_netdev_pmd_thread *pmd)
>> +{
>> +    struct tx_port *cached_tx_port;
>> +    int tx_qid;
>> +
>> +    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
>> +        tx_qid = cached_tx_port->last_used_qid;
>> +
>> +        if (tx_qid != LAST_USED_QID_NONE) {
>> +            netdev_txq_flush(cached_tx_port->port->netdev, tx_qid,
>> +                         cached_tx_port->port->dynamic_txqs);
>> +
>> +            /* The queue is flushed; mark it as empty. */
>> +            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
>> +        }
>> +    }
>> +}
>> +
>> +static void
>>  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>>                             struct netdev_rxq *rx,
>>                             odp_port_t port_no)
>> @@ -4356,6 +4382,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>>
>>      tx->port = port;
>>      tx->qid = -1;
>> +    tx->last_used_qid = LAST_USED_QID_NONE;
>>
>>      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
>>      pmd->need_reload = true;
>> @@ -4926,6 +4953,14 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
>>
>>      dpif_netdev_xps_revalidate_pmd(pmd, now, false);
>>
>> +    /* The tx queue can change in the XPS case; make sure packets in the
>> +     * previous queue are flushed properly. */
>> +    if (tx->last_used_qid != LAST_USED_QID_NONE &&
>> +               tx->qid != tx->last_used_qid) {
>> +        netdev_txq_flush(port->netdev, tx->last_used_qid, port->dynamic_txqs);
>> +        tx->last_used_qid = LAST_USED_QID_NONE;
>> +    }
>> +
>>      VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
>>               pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
>>      return min_qid;
>> @@ -5021,6 +5056,13 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
>>                  tx_qid = pmd->static_tx_qid;
>>              }
>>
>> +            /* In case these packets get buffered into an intermediate
>> +             * queue and XPS is enabled the flush function could find a
>> +             * different tx qid assigned to its thread.  We keep track
>> +             * of the qid we're now using, that will trigger the flush
>> +             * function and will select the right queue to flush. */
>> +            p->last_used_qid = tx_qid;
>> +
>>              netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
>>                          dynamic_txqs);
>>              return;
>> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
>> index 1e83116..50a9a2c 100644
>> --- a/lib/netdev-dpdk.c
>> +++ b/lib/netdev-dpdk.c
>> @@ -1434,6 +1434,7 @@ static inline int
>> netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
>>                           struct rte_mbuf **pkts, int cnt)
>>  {
>> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>>      uint32_t nb_tx = 0;
>>
>>      while (nb_tx != cnt) {
>> @@ -1457,6 +1458,7 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
>>          }
>>      }
>>
>> +    txq->dpdk_pkt_cnt = 0;
>>      return cnt - nb_tx;
>>  }
>>
>> @@ -1841,6 +1843,37 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
>>      }
>>  }
>>
>> +/* Enqueue packets in an intermediate queue and call the flush
>> + * function when the queue is full.  This way we can amortize the
>> + * cost of MMIO writes. */
>> +static inline int
>> +netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid,
>> +                            struct rte_mbuf **pkts, int cnt)
>> +{
>> +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
>> +
>> +    int i = 0;
>> +    int dropped = 0;
>> +
>> +    while (i < cnt) {
>> +        int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->dpdk_pkt_cnt;
>> +        int tocopy = MIN(freeslots, cnt-i);
>> +
>> +        memcpy(&txq->dpdk_burst_pkts[txq->dpdk_pkt_cnt], &pkts[i],
>> +               tocopy * sizeof (struct rte_mbuf *));
>> +
>> +        txq->dpdk_pkt_cnt += tocopy;
>> +        i += tocopy;
>> +
>> +        /* Queue full, burst the packets. */
>> +        if (txq->dpdk_pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) {
>> +            dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->dpdk_burst_pkts,
>> +                                                txq->dpdk_pkt_cnt);
>> +        }
>> +    }
>> +    return dropped;
>> +}
>> +
>>  static int
>>  netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>>                         struct dp_packet_batch *batch,
>> @@ -1889,7 +1922,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
>>          cnt = netdev_dpdk_qos_run(dev, pkts, cnt);
>>          dropped = batch->count - cnt;
>>
>> -        dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt);
>> +        dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt);
>>
>>          if (OVS_UNLIKELY(dropped)) {
>>              rte_spinlock_lock(&dev->stats_lock);
>>

