[ovs-dev] [PATCH 3/8] netdev-dpdk: Add intermediate queue support.

Bhanuprakash Bodireddy bhanuprakash.bodireddy at intel.com
Wed Jun 7 09:21:00 UTC 2017


This commit introduces the netdev_dpdk_eth_tx_queue() function, which
implements an intermediate queue with packet buffering.  Packets are
buffered until the threshold 'INTERIM_QUEUE_BURST_THRESHOLD' (32) is
reached and are then transmitted in a burst.
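
For reference, the buffering relies on a small per-queue staging array and
counter in struct dpdk_tx_queue, presumably added earlier in this series.
A rough sketch of the assumed fields (names inferred from their use in this
patch; other members of the real struct are omitted):

    #define INTERIM_QUEUE_BURST_THRESHOLD 32

    struct dpdk_tx_queue {
        int count;                /* Number of packets currently buffered. */
        struct rte_mbuf *burst_pkts[INTERIM_QUEUE_BURST_THRESHOLD];
                                  /* Intermediate queue where packets are
                                   * buffered before the burst to the NIC. */
    };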

To handle the case (e.g. ping) where packets could otherwise get stuck
in the queue at low packet rates, drain logic is implemented and invoked
from dp_netdev_drain_txq_ports() as part of the PMD packet processing
loop.
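
For context, dp_netdev_drain_txq_ports() relies on a netdev-layer drain hook
that is added elsewhere in this series; the prototype assumed by the calls in
this patch is roughly:

    /* Flush any packets buffered for tx queue 'qid' of 'netdev', taking the
     * per-queue lock when 'concurrent_txq' is true.  (Assumed prototype; the
     * implementation is not part of this patch.) */
    int netdev_txq_drain(struct netdev *netdev, int qid, bool concurrent_txq);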

Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy at intel.com>
Signed-off-by: Antonio Fischetti <antonio.fischetti at intel.com>
Co-authored-by: Antonio Fischetti <antonio.fischetti at intel.com>
Signed-off-by: Markus Magnusson <markus.magnusson at ericsson.com>
Co-authored-by: Markus Magnusson <markus.magnusson at ericsson.com>
---
 lib/dpif-netdev.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
 lib/netdev-dpdk.c | 35 ++++++++++++++++++++++++++++++++++-
 2 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 2f224db..e1c43fe 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -332,6 +332,7 @@ enum pmd_cycles_counter_type {
 };
 
 #define XPS_TIMEOUT_MS 500LL
+#define LAST_USED_QID_NONE -1
 
 /* Contained by struct dp_netdev_port's 'rxqs' member.  */
 struct dp_netdev_rxq {
@@ -492,7 +493,13 @@ struct rxq_poll {
 struct tx_port {
     struct dp_netdev_port *port;
     int qid;
-    long long last_used;
+    int last_used_qid;        /* Last queue id where packets got
+                                 enqueued. */
+    long long last_used;      /* In case XPS is enabled, it contains the
+                               * timestamp of the last time the port was
+                               * used by the thread to send data.  After
+                               * XPS_TIMEOUT_MS elapses the qid will be
+                               * marked as -1. */
     struct hmap_node node;
 };
 
@@ -3080,6 +3087,25 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
 }
 
 static void
+dp_netdev_drain_txq_ports(struct dp_netdev_pmd_thread *pmd)
+{
+    struct tx_port *cached_tx_port;
+    int tx_qid;
+
+    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
+        tx_qid = cached_tx_port->last_used_qid;
+
+        if (tx_qid != LAST_USED_QID_NONE) {
+            netdev_txq_drain(cached_tx_port->port->netdev, tx_qid,
+                             cached_tx_port->port->dynamic_txqs);
+
+            /* Queue drained; mark it as empty. */
+            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
+        }
+    }
+}
+
+static void
 dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct netdev_rxq *rx,
                            odp_port_t port_no)
@@ -4355,6 +4381,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 
     tx->port = port;
     tx->qid = -1;
+    tx->last_used_qid = LAST_USED_QID_NONE;
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
     pmd->need_reload = true;
@@ -4925,6 +4952,14 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
 
     dpif_netdev_xps_revalidate_pmd(pmd, now, false);
 
+    /* The tx queue can change in the XPS case; make sure the packets in the
+     * previous queue are drained properly. */
+    if (tx->last_used_qid != LAST_USED_QID_NONE &&
+        tx->qid != tx->last_used_qid) {
+        netdev_txq_drain(port->netdev, tx->last_used_qid, port->dynamic_txqs);
+        tx->last_used_qid = LAST_USED_QID_NONE;
+    }
+
     VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
              pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
     return min_qid;
@@ -5020,6 +5055,13 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
                 tx_qid = pmd->static_tx_qid;
             }
 
+            /* In case these packets get buffered into an intermediate
+             * queue and XPS is enabled, the drain function could find a
+             * different tx qid assigned to its thread.  Keep track of
+             * the qid we are using now, so that the drain function can
+             * select the right queue to flush. */
+            p->last_used_qid = tx_qid;
+
             netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
                         dynamic_txqs);
             return;
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 13b4487..4a9d9aa 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -1429,6 +1429,7 @@ static inline int
 netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
                          struct rte_mbuf **pkts, int cnt)
 {
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
     uint32_t nb_tx = 0;
 
     while (nb_tx != cnt) {
@@ -1452,6 +1453,7 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
         }
     }
 
+    txq->count = 0;
     return cnt - nb_tx;
 }
 
@@ -1836,6 +1838,37 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
     }
 }
 
+/* Enqueue packets in an intermediate queue and burst them out once the
+ * INTERIM_QUEUE_BURST_THRESHOLD is reached.  This way the cost of the
+ * MMIO writes to the NIC is amortized over several packets. */
+static inline int
+netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid,
+                         struct rte_mbuf **pkts, int cnt)
+{
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
+
+    int i = 0;
+    int dropped = 0;
+
+    while (i < cnt) {
+        int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq->count;
+        int tocopy = MIN(freeslots, cnt - i);
+
+        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
+               tocopy * sizeof (struct rte_mbuf *));
+
+        txq->count += tocopy;
+        i += tocopy;
+
+        /* Queue full; burst the packets. */
+        if (txq->count >= INTERIM_QUEUE_BURST_THRESHOLD) {
+            dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts,
+                                                txq->count);
+        }
+    }
+    return dropped;
+}
+
 static int
 netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                        struct dp_packet_batch *batch,
@@ -1884,7 +1917,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
         cnt = netdev_dpdk_qos_run(dev, pkts, cnt);
         dropped = batch->count - cnt;
 
-        dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt);
+        dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt);
 
         if (OVS_UNLIKELY(dropped)) {
             rte_spinlock_lock(&dev->stats_lock);
-- 
2.4.11


