[ovs-dev] [PATCH v3 3/3] dpif-netdev: XPS (Transmit Packet Steering) implementation.

Ilya Maximets i.maximets at samsung.com
Mon Jul 11 15:15:31 UTC 2016


If the number of CPUs in pmd-cpu-mask is not divisible by the number of
queues, and in a few more complex situations, TX queue-ids may be
distributed unfairly between PMD threads.

For example, with 2 ports of 4 queues each and 6 CPUs in pmd-cpu-mask,
the following distribution is possible:
<------------------------------------------------------------------------>
pmd thread numa_id 0 core_id 13:
        port: vhost-user1       queue-id: 1
        port: dpdk0     queue-id: 3
pmd thread numa_id 0 core_id 14:
        port: vhost-user1       queue-id: 2
pmd thread numa_id 0 core_id 16:
        port: dpdk0     queue-id: 0
pmd thread numa_id 0 core_id 17:
        port: dpdk0     queue-id: 1
pmd thread numa_id 0 core_id 12:
        port: vhost-user1       queue-id: 0
        port: dpdk0     queue-id: 2
pmd thread numa_id 0 core_id 15:
        port: vhost-user1       queue-id: 3
<------------------------------------------------------------------------>

As we can see above, the dpdk0 port is polled by threads on cores:
	12, 13, 16 and 17.

By design of dpif-netdev, there is only one TX queue-id assigned to each
pmd thread. These queue-ids are sequential, similar to core-ids, and a
thread will send packets to the queue with exactly this queue-id
regardless of port.

In the previous example:

	pmd thread on core 12 will send packets to tx queue 0
	pmd thread on core 13 will send packets to tx queue 1
	...
	pmd thread on core 17 will send packets to tx queue 5

So, for the dpdk0 port, after truncation in netdev-dpdk:

	core 12 --> TX queue-id 0 % 4 == 0
	core 13 --> TX queue-id 1 % 4 == 1
	core 16 --> TX queue-id 4 % 4 == 0
	core 17 --> TX queue-id 5 % 4 == 1

As a result, only 2 of the 4 queues are used.

To fix this issue, a form of XPS is implemented in the following way:

	* TX queue-ids are allocated dynamically.
	* When a PMD thread first tries to send packets to a new port,
	  it allocates the least used TX queue for this port.
	* PMD threads periodically perform revalidation of
	  allocated TX queue-ids. If a queue wasn't used in the last
	  XPS_CYCLES, it will be freed during revalidation.

Reported-by: Zhihong Wang <zhihong.wang at intel.com>
Signed-off-by: Ilya Maximets <i.maximets at samsung.com>
---
 lib/dpif-netdev.c | 130 +++++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 94 insertions(+), 36 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 3fb1942..5eed50c 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -248,6 +248,8 @@ enum pmd_cycles_counter_type {
     PMD_N_CYCLES
 };
 
+#define XPS_CYCLES 1000000000ULL
+
 /* A port in a netdev-based datapath. */
 struct dp_netdev_port {
     odp_port_t port_no;
@@ -256,6 +258,8 @@ struct dp_netdev_port {
     struct netdev_saved_flags *sf;
     unsigned n_rxq;             /* Number of elements in 'rxq' */
     struct netdev_rxq **rxq;
+    unsigned *txq_used;         /* Number of threads that uses each tx queue. */
+    struct ovs_mutex txq_used_mutex;
     char *type;                 /* Port type as requested by user. */
 };
 
@@ -385,6 +389,8 @@ struct rxq_poll {
 /* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'. */
 struct tx_port {
     odp_port_t port_no;
+    int qid;
+    unsigned long long last_cycles;
     struct netdev *netdev;
     struct hmap_node node;
 };
@@ -541,6 +547,11 @@ static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);
 static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
     OVS_REQUIRES(pmd->port_mutex);
 
+static void
+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd);
+static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
+                                      struct tx_port *tx);
+
 static inline bool emc_entry_alive(struct emc_entry *ce);
 static void emc_clear_entry(struct emc_entry *ce);
 
@@ -1185,7 +1196,9 @@ port_create(const char *devname, const char *open_type, const char *type,
     port->netdev = netdev;
     port->n_rxq = netdev_n_rxq(netdev);
     port->rxq = xcalloc(port->n_rxq, sizeof *port->rxq);
+    port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
     port->type = xstrdup(type);
+    ovs_mutex_init(&port->txq_used_mutex);
 
     for (i = 0; i < port->n_rxq; i++) {
         error = netdev_rxq_open(netdev, &port->rxq[i], i);
@@ -1211,7 +1224,9 @@ out_rxq_close:
     for (i = 0; i < n_open_rxqs; i++) {
         netdev_rxq_close(port->rxq[i]);
     }
+    ovs_mutex_destroy(&port->txq_used_mutex);
     free(port->type);
+    free(port->txq_used);
     free(port->rxq);
     free(port);
 
@@ -1353,7 +1368,8 @@ port_destroy(struct dp_netdev_port *port)
     for (unsigned i = 0; i < port->n_rxq; i++) {
         netdev_rxq_close(port->rxq[i]);
     }
-
+    ovs_mutex_destroy(&port->txq_used_mutex);
+    free(port->txq_used);
     free(port->rxq);
     free(port->type);
     free(port);
@@ -1376,13 +1392,6 @@ get_port_by_name(struct dp_netdev *dp,
 }
 
 static int
-get_n_pmd_threads(struct dp_netdev *dp)
-{
-    /* There is one non pmd thread in dp->poll_threads */
-    return cmap_count(&dp->poll_threads) - 1;
-}
-
-static int
 get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id)
 {
     struct dp_netdev_pmd_thread *pmd;
@@ -2664,6 +2673,10 @@ port_reconfigure(struct dp_netdev_port *port)
     }
     /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
     port->rxq = xrealloc(port->rxq, sizeof *port->rxq * netdev_n_rxq(netdev));
+    /* Realloc 'used' counters for tx queues. */
+    free(port->txq_used);
+    port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
+
     for (i = 0; i < netdev_n_rxq(netdev); i++) {
         err = netdev_rxq_open(netdev, &port->rxq[i], i);
         if (err) {
@@ -2743,6 +2756,7 @@ dpif_netdev_run(struct dpif *dpif)
             }
         }
     }
+    dpif_netdev_xps_revalidate_pmd(non_pmd);
     ovs_mutex_unlock(&dp->non_pmd_mutex);
 
     dp_netdev_pmd_unref(non_pmd);
@@ -3027,11 +3041,6 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->numa_id = numa_id;
     pmd->poll_cnt = 0;
 
-    atomic_init(&pmd->tx_qid,
-                (core_id == NON_PMD_CORE_ID)
-                ? ovs_numa_get_n_cores()
-                : get_n_pmd_threads(dp));
-
     ovs_refcount_init(&pmd->ref_cnt);
     latch_init(&pmd->exit_latch);
     atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ);
@@ -3122,18 +3131,16 @@ dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
     free(pmd_list);
 }
 
-/* Deletes all pmd threads on numa node 'numa_id' and
- * fixes tx_qids of other threads to keep them sequential. */
+/* Deletes all pmd threads on numa node 'numa_id'. */
 static void
 dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
 {
     struct dp_netdev_pmd_thread *pmd;
-    int n_pmds_on_numa, n_pmds;
-    int *free_idx, k = 0;
+    int n_pmds_on_numa;
+    int k = 0;
     struct dp_netdev_pmd_thread **pmd_list;
 
     n_pmds_on_numa = get_n_pmd_threads_on_numa(dp, numa_id);
-    free_idx = xcalloc(n_pmds_on_numa, sizeof *free_idx);
     pmd_list = xcalloc(n_pmds_on_numa, sizeof *pmd_list);
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
@@ -3141,7 +3148,6 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
          * 'dp->poll_threads' (while we're iterating it) and it
          * might quiesce. */
         if (pmd->numa_id == numa_id) {
-            atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]);
             pmd_list[k] = pmd;
             ovs_assert(k < n_pmds_on_numa);
             k++;
@@ -3152,21 +3158,7 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
         dp_netdev_del_pmd(dp, pmd_list[i]);
     }
 
-    n_pmds = get_n_pmd_threads(dp);
-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        int old_tx_qid;
-
-        atomic_read_relaxed(&pmd->tx_qid, &old_tx_qid);
-
-        if (old_tx_qid >= n_pmds) {
-            int new_tx_qid = free_idx[--k];
-
-            atomic_store_relaxed(&pmd->tx_qid, new_tx_qid);
-        }
-    }
-
     free(pmd_list);
-    free(free_idx);
 }
 
 /* Deletes all rx queues from pmd->poll_list and all the ports from
@@ -3321,6 +3313,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 
     tx->netdev = port->netdev;
     tx->port_no = port->port_no;
+    tx->qid = -1;
 
     ovs_mutex_lock(&pmd->port_mutex);
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port_no));
@@ -3980,6 +3973,73 @@ dpif_netdev_register_upcall_cb(struct dpif *dpif, upcall_callback *cb,
     dp->upcall_cb = cb;
 }
 
+static void
+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd)
+{
+    struct tx_port *tx;
+    struct dp_netdev_port *port;
+    struct dp_netdev *dp = pmd->dp;
+    unsigned long long interval;
+
+    HMAP_FOR_EACH (tx, node, &pmd->port_cache) {
+        interval = pmd->last_cycles - tx->last_cycles;
+        if (tx->qid >= 0 && interval >= XPS_CYCLES) {
+            fat_rwlock_rdlock(&dp->port_rwlock);
+            port = dp_netdev_lookup_port(dp, tx->port_no);
+            ovs_mutex_lock(&port->txq_used_mutex);
+            port->txq_used[tx->qid]--;
+            ovs_mutex_unlock(&port->txq_used_mutex);
+            fat_rwlock_unlock(&dp->port_rwlock);
+            tx->qid = -1;
+        }
+    }
+}
+
+static int
+dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
+                           struct tx_port *tx)
+{
+    struct dp_netdev_port *port;
+    struct dp_netdev *dp = pmd->dp;
+    unsigned long long interval = pmd->last_cycles - tx->last_cycles;
+    int i, min_cnt, min_qid;
+
+    tx->last_cycles = pmd->last_cycles;
+    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_CYCLES)) {
+        return tx->qid;
+    }
+
+    fat_rwlock_rdlock(&dp->port_rwlock);
+    port = dp_netdev_lookup_port(dp, tx->port_no);
+
+    ovs_mutex_lock(&port->txq_used_mutex);
+    if (tx->qid >= 0) {
+        port->txq_used[tx->qid]--;
+        tx->qid = -1;
+    }
+
+    min_cnt = -1;
+    min_qid = 0;
+    for (i = 0; i < netdev_n_txq(port->netdev); i++) {
+        if (port->txq_used[i] < min_cnt || min_cnt == -1) {
+            min_cnt = port->txq_used[i];
+            min_qid = i;
+        }
+    }
+
+    port->txq_used[min_qid]++;
+    tx->qid = min_qid;
+
+    ovs_mutex_unlock(&port->txq_used_mutex);
+    fat_rwlock_unlock(&dp->port_rwlock);
+
+    dpif_netdev_xps_revalidate_pmd(pmd);
+
+    VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.\n",
+             pmd->core_id, tx->qid, netdev_get_name(tx->netdev));
+    return min_qid;
+}
+
 static struct tx_port *
 pmd_tx_port_cache_lookup(const struct dp_netdev_pmd_thread *pmd,
                          odp_port_t port_no)
@@ -4051,9 +4111,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
     case OVS_ACTION_ATTR_OUTPUT:
         p = pmd_tx_port_cache_lookup(pmd, u32_to_odp(nl_attr_get_u32(a)));
         if (OVS_LIKELY(p)) {
-            int tx_qid;
-
-            atomic_read_relaxed(&pmd->tx_qid, &tx_qid);
+            int tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p);
 
             netdev_send(p->netdev, tx_qid, packets_, may_steal);
             return;
-- 
2.7.4




More information about the dev mailing list