[ovs-dev] [PATCH 15/17] dpif-netdev: Use hmap for poll_list in pmd threads.

Daniele Di Proietto diproiettod at vmware.com
Wed Nov 16 00:46:10 UTC 2016


A future commit will use the hmap to determine whether a queue is already
assigned to a pmd thread (a rough sketch of such a lookup is included after
the '---' separator below).

Since iteration order over an hmap is not deterministic, we now have to
sort the queues before printing them in pmd_info_show_rxq(), to keep the
output unaltered (an example of the expected output follows the diffstat
below).

This commit also introduces 'struct polled_queue', which is used
exclusively in the fast path, makes 'struct rxq_poll' point to a 'struct
dp_netdev_rxq', and standardizes the naming: 'rx' for a 'struct
netdev_rxq', 'rxq' for a 'struct dp_netdev_rxq'.

Signed-off-by: Daniele Di Proietto <diproiettod at vmware.com>
---
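(Note for reviewers, not part of the commit message: the membership check
mentioned above could look roughly like the sketch below.  The helper name
is invented here for illustration; the hash computation must match the one
used in dp_netdev_add_rxq_to_pmd() in this patch.)

    /* Hypothetical helper: returns true if 'rxq' is already in 'pmd''s
     * poll_list.  The hash must be computed exactly as at insertion
     * time in dp_netdev_add_rxq_to_pmd(). */
    static bool
    dp_netdev_pmd_polls_rxq(struct dp_netdev_pmd_thread *pmd,
                            struct dp_netdev_rxq *rxq)
        OVS_REQUIRES(pmd->port_mutex)
    {
        int qid = netdev_rxq_get_queue_id(rxq->rx);
        uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
        struct rxq_poll *poll;

        HMAP_FOR_EACH_WITH_HASH (poll, node, hash, &pmd->poll_list) {
            if (poll->rxq == rxq) {
                return true;
            }
        }
        return false;
    }
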
 lib/dpif-netdev.c | 164 ++++++++++++++++++++++++++++++++++++------------------
 1 file changed, 109 insertions(+), 55 deletions(-)
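
(Also for reviewers: with the sorting in place, the output of
'ovs-appctl dpif-netdev/pmd-rxq-show' keeps its per-port grouping, with
ports ordered by name and the queue ids of each port in ascending order.
With two hypothetical DPDK ports it would look like this:)

    pmd thread numa_id 0 core_id 1:
        isolated : false
        port: dpdk0     queue-id: 0 2
        port: dpdk1     queue-id: 1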

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 72eb5b6..0a88df3 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -282,7 +282,9 @@ enum pmd_cycles_counter_type {
 
 /* Contained by struct dp_netdev_port's 'rxqs' member.  */
 struct dp_netdev_rxq {
-    struct netdev_rxq *rxq;
+    struct dp_netdev_port *port;
+    struct netdev_rxq *rx;
+
     unsigned core_id;                  /* Core to which this queue should be
                                           pinned. RXQ_CORE_UNPINNED if the
                                           queue doesn't need to be pinned to a
@@ -420,11 +422,15 @@ struct dp_netdev_pmd_cycles {
     atomic_ullong n[PMD_N_CYCLES];
 };
 
+struct polled_queue {
+    struct netdev_rxq *rx;
+    odp_port_t port_no;
+};
+
 /* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
 struct rxq_poll {
-    struct dp_netdev_port *port;
-    struct netdev_rxq *rx;
-    struct ovs_list node;
+    struct dp_netdev_rxq *rxq;
+    struct hmap_node node;
 };
 
 /* Contained by struct dp_netdev_pmd_thread's 'send_port_cache',
@@ -505,9 +511,7 @@ struct dp_netdev_pmd_thread {
 
     struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
     /* List of rx queues to poll. */
-    struct ovs_list poll_list OVS_GUARDED;
-    /* Number of elements in 'poll_list' */
-    int poll_cnt;
+    struct hmap poll_list OVS_GUARDED;
     /* Map of 'tx_port's used for transmission.  Written by the main thread,
      * read by the pmd thread. */
     struct hmap tx_ports OVS_GUARDED;
@@ -592,8 +596,8 @@ static void dp_netdev_add_port_to_pmds(struct dp_netdev *dp,
 static void dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
                                          struct dp_netdev_port *port);
 static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
-                                     struct dp_netdev_port *port,
-                                     struct netdev_rxq *rx);
+                                     struct dp_netdev_rxq *rxq)
+    OVS_REQUIRES(pmd->port_mutex);
 static struct dp_netdev_pmd_thread *
 dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
@@ -789,12 +793,56 @@ pmd_info_clear_stats(struct ds *reply OVS_UNUSED,
     }
 }
 
+static int
+compare_poll_list(const void *a_, const void *b_)
+{
+    const struct rxq_poll *a = a_;
+    const struct rxq_poll *b = b_;
+
+    const char *namea = netdev_rxq_get_name(a->rxq->rx);
+    const char *nameb = netdev_rxq_get_name(b->rxq->rx);
+
+    int cmp = strcmp(namea, nameb);
+    if (!cmp) {
+        return netdev_rxq_get_queue_id(a->rxq->rx)
+               - netdev_rxq_get_queue_id(b->rxq->rx);
+    } else {
+        return cmp;
+    }
+}
+
+static void
+sorted_poll_list(struct dp_netdev_pmd_thread *pmd, struct rxq_poll **list,
+                 size_t *n)
+{
+    struct rxq_poll *ret, *poll;
+    size_t i;
+
+    *n = hmap_count(&pmd->poll_list);
+    if (!*n) {
+        ret = NULL;
+    } else {
+        ret = xcalloc(*n, sizeof *ret);
+        i = 0;
+        HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+            ret[i] = *poll;
+            i++;
+        }
+        ovs_assert(i == *n);
+    }
+
+    qsort(ret, *n, sizeof *ret, compare_poll_list);
+
+    *list = ret;
+}
+
 static void
 pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
 {
     if (pmd->core_id != NON_PMD_CORE_ID) {
-        struct rxq_poll *poll;
         const char *prev_name = NULL;
+        struct rxq_poll *list;
+        size_t i, n;
 
         ds_put_format(reply,
                       "pmd thread numa_id %d core_id %u:\n\tisolated : %s\n",
@@ -802,21 +850,23 @@ pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
                                                   ? "true" : "false");
 
         ovs_mutex_lock(&pmd->port_mutex);
-        LIST_FOR_EACH (poll, node, &pmd->poll_list) {
-            const char *name = netdev_get_name(poll->port->netdev);
+        sorted_poll_list(pmd, &list, &n);
+        for (i = 0; i < n; i++) {
+            const char *name = netdev_rxq_get_name(list[i].rxq->rx);
 
             if (!prev_name || strcmp(name, prev_name)) {
                 if (prev_name) {
                     ds_put_cstr(reply, "\n");
                 }
-                ds_put_format(reply, "\tport: %s\tqueue-id:",
-                              netdev_get_name(poll->port->netdev));
+                ds_put_format(reply, "\tport: %s\tqueue-id:", name);
             }
-            ds_put_format(reply, " %d", netdev_rxq_get_queue_id(poll->rx));
+            ds_put_format(reply, " %d",
+                          netdev_rxq_get_queue_id(list[i].rxq->rx));
             prev_name = name;
         }
         ovs_mutex_unlock(&pmd->port_mutex);
         ds_put_cstr(reply, "\n");
+        free(list);
     }
 }
 
@@ -1301,7 +1351,8 @@ port_create(const char *devname, const char *type,
     port->dynamic_txqs = dynamic_txqs;
 
     for (i = 0; i < port->n_rxq; i++) {
-        error = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
+        port->rxqs[i].port = port;
+        error = netdev_rxq_open(netdev, &port->rxqs[i].rx, i);
         if (error) {
             VLOG_ERR("%s: cannot receive packets on this network device (%s)",
                      devname, ovs_strerror(errno));
@@ -1323,7 +1374,7 @@ port_create(const char *devname, const char *type,
 
 out_rxq_close:
     for (i = 0; i < n_open_rxqs; i++) {
-        netdev_rxq_close(port->rxqs[i].rxq);
+        netdev_rxq_close(port->rxqs[i].rx);
     }
     ovs_mutex_destroy(&port->txq_used_mutex);
     free(port->type);
@@ -1461,7 +1512,7 @@ port_destroy(struct dp_netdev_port *port)
     netdev_restore_flags(port->sf);
 
     for (unsigned i = 0; i < port->n_rxq; i++) {
-        netdev_rxq_close(port->rxqs[i].rxq);
+        netdev_rxq_close(port->rxqs[i].rx);
     }
     ovs_mutex_destroy(&port->txq_used_mutex);
     free(port->rxq_affinity_list);
@@ -2903,27 +2954,27 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
 
 static void
 dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
-                           struct dp_netdev_port *port,
-                           struct netdev_rxq *rxq)
+                           struct netdev_rxq *rx,
+                           odp_port_t port_no)
 {
     struct dp_packet_batch batch;
     int error;
 
     dp_packet_batch_init(&batch);
     cycles_count_start(pmd);
-    error = netdev_rxq_recv(rxq, &batch);
+    error = netdev_rxq_recv(rx, &batch);
     cycles_count_end(pmd, PMD_CYCLES_POLLING);
     if (!error) {
         *recirc_depth_get() = 0;
 
         cycles_count_start(pmd);
-        dp_netdev_input(pmd, &batch, port->port_no);
+        dp_netdev_input(pmd, &batch, port_no);
         cycles_count_end(pmd, PMD_CYCLES_PROCESSING);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
         VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
-                    netdev_get_name(port->netdev), ovs_strerror(error));
+                    netdev_rxq_get_name(rx), ovs_strerror(error));
     }
 }
 
@@ -2939,8 +2990,8 @@ port_reconfigure(struct dp_netdev_port *port)
 
     /* Closes the existing 'rxq's. */
     for (i = 0; i < port->n_rxq; i++) {
-        netdev_rxq_close(port->rxqs[i].rxq);
-        port->rxqs[i].rxq = NULL;
+        netdev_rxq_close(port->rxqs[i].rx);
+        port->rxqs[i].rx = NULL;
     }
     port->n_rxq = 0;
 
@@ -2959,7 +3010,8 @@ port_reconfigure(struct dp_netdev_port *port)
     port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
 
     for (i = 0; i < netdev_n_rxq(netdev); i++) {
-        err = netdev_rxq_open(netdev, &port->rxqs[i].rxq, i);
+        port->rxqs[i].port = port;
+        err = netdev_rxq_open(netdev, &port->rxqs[i].rx, i);
         if (err) {
             return err;
         }
@@ -3044,8 +3096,8 @@ dpif_netdev_run(struct dpif *dpif)
                 int i;
 
                 for (i = 0; i < port->n_rxq; i++) {
-                    dp_netdev_process_rxq_port(non_pmd, port,
-                                               port->rxqs[i].rxq);
+                    dp_netdev_process_rxq_port(non_pmd, port->rxqs[i].rx,
+                                               port->port_no);
                 }
             }
         }
@@ -3085,7 +3137,7 @@ dpif_netdev_wait(struct dpif *dpif)
             int i;
 
             for (i = 0; i < port->n_rxq; i++) {
-                netdev_rxq_wait(port->rxqs[i].rxq);
+                netdev_rxq_wait(port->rxqs[i].rx);
             }
         }
     }
@@ -3141,18 +3193,21 @@ pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
 
 static int
 pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
-                          struct rxq_poll **ppoll_list)
+                          struct polled_queue **ppoll_list)
 {
-    struct rxq_poll *poll_list = *ppoll_list;
+    struct polled_queue *poll_list = *ppoll_list;
     struct rxq_poll *poll;
     int i;
 
     ovs_mutex_lock(&pmd->port_mutex);
-    poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list);
+    poll_list = xrealloc(poll_list, hmap_count(&pmd->poll_list)
+                                    * sizeof *poll_list);
 
     i = 0;
-    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
-        poll_list[i++] = *poll;
+    HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+        poll_list[i].rx = poll->rxq->rx;
+        poll_list[i].port_no = poll->rxq->port->port_no;
+        i++;
     }
 
     pmd_load_cached_ports(pmd);
@@ -3168,7 +3223,7 @@ pmd_thread_main(void *f_)
 {
     struct dp_netdev_pmd_thread *pmd = f_;
     unsigned int lc = 0;
-    struct rxq_poll *poll_list;
+    struct polled_queue *poll_list;
     bool exiting;
     int poll_cnt;
     int i;
@@ -3186,7 +3241,7 @@ reload:
     /* List port/core affinity */
     for (i = 0; i < poll_cnt; i++) {
        VLOG_DBG("Core %d processing port \'%s\' with queue-id %d\n",
-                pmd->core_id, netdev_get_name(poll_list[i].port->netdev),
+                pmd->core_id, netdev_rxq_get_name(poll_list[i].rx),
                 netdev_rxq_get_queue_id(poll_list[i].rx));
     }
 
@@ -3199,7 +3254,8 @@ reload:
 
     for (;;) {
         for (i = 0; i < poll_cnt; i++) {
-            dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
+            dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
+                                       poll_list[i].port_no);
         }
 
         if (lc++ > 1024) {
@@ -3358,7 +3414,6 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->dp = dp;
     pmd->core_id = core_id;
     pmd->numa_id = numa_id;
-    pmd->poll_cnt = 0;
 
     *CONST_CAST(int *, &pmd->static_tx_qid) = (core_id == NON_PMD_CORE_ID)
                                               ? ovs_numa_get_n_cores()
@@ -3376,7 +3431,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     cmap_init(&pmd->flow_table);
     cmap_init(&pmd->classifiers);
     pmd->next_optimization = time_msec() + DPCLS_OPTIMIZATION_INTERVAL;
-    ovs_list_init(&pmd->poll_list);
+    hmap_init(&pmd->poll_list);
     hmap_init(&pmd->tx_ports);
     hmap_init(&pmd->tnl_port_cache);
     hmap_init(&pmd->send_port_cache);
@@ -3398,6 +3453,7 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
     hmap_destroy(&pmd->send_port_cache);
     hmap_destroy(&pmd->tnl_port_cache);
     hmap_destroy(&pmd->tx_ports);
+    hmap_destroy(&pmd->poll_list);
     /* All flows (including their dpcls_rules) have been deleted already */
     CMAP_FOR_EACH (cls, node, &pmd->classifiers) {
         dpcls_destroy(cls);
@@ -3508,10 +3564,9 @@ dp_netdev_pmd_clear_ports(struct dp_netdev_pmd_thread *pmd)
     struct tx_port *port;
 
     ovs_mutex_lock(&pmd->port_mutex);
-    LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
+    HMAP_FOR_EACH_POP (poll, node, &pmd->poll_list) {
         free(poll);
     }
-    pmd->poll_cnt = 0;
     HMAP_FOR_EACH_POP (port, node, &pmd->tx_ports) {
         free(port);
     }
@@ -3544,11 +3599,10 @@ dp_netdev_del_port_from_pmd__(struct dp_netdev_port *port,
     bool found = false;
 
     ovs_mutex_lock(&pmd->port_mutex);
-    LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
-        if (poll->port == port) {
+    HMAP_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
+        if (poll->rxq->port == port) {
             found = true;
-            ovs_list_remove(&poll->node);
-            pmd->poll_cnt--;
+            hmap_remove(&pmd->poll_list, &poll->node);
             free(poll);
         }
     }
@@ -3617,8 +3671,8 @@ dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
         if (!pmd->isolated && pmd->numa_id == numa_id
-            && (min_cnt > pmd->poll_cnt || res == NULL)) {
-            min_cnt = pmd->poll_cnt;
+            && (min_cnt > hmap_count(&pmd->poll_list) || res == NULL)) {
+            min_cnt = hmap_count(&pmd->poll_list);
             res = pmd;
         }
     }
@@ -3629,16 +3683,16 @@ dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
 /* Adds rx queue to poll_list of PMD thread. */
 static void
 dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
-                         struct dp_netdev_port *port, struct netdev_rxq *rx)
+                         struct dp_netdev_rxq *rxq)
     OVS_REQUIRES(pmd->port_mutex)
 {
-    struct rxq_poll *poll = xmalloc(sizeof *poll);
-
-    poll->port = port;
-    poll->rx = rx;
+    int qid = netdev_rxq_get_queue_id(rxq->rx);
+    uint32_t hash = hash_2words(odp_to_u32(rxq->port->port_no), qid);
+    struct rxq_poll *poll;
 
-    ovs_list_push_back(&pmd->poll_list, &poll->node);
-    pmd->poll_cnt++;
+    poll = xmalloc(sizeof *poll);
+    poll->rxq = rxq;
+    hmap_insert(&pmd->poll_list, &poll->node, hash);
 }
 
 /* Add 'port' to the tx port cache of 'pmd', which must be reloaded for the
@@ -3703,7 +3757,7 @@ dp_netdev_add_port_rx_to_pmds(struct dp_netdev *dp,
         }
 
         ovs_mutex_lock(&pmd->port_mutex);
-        dp_netdev_add_rxq_to_pmd(pmd, port, port->rxqs[i].rxq);
+        dp_netdev_add_rxq_to_pmd(pmd, &port->rxqs[i]);
         ovs_mutex_unlock(&pmd->port_mutex);
 
         hmapx_add(to_reload, pmd);
-- 
2.10.2


