[ovs-dev] [RFC] ofproto-dpif-upcall: Improve upcall handling fairness.

Alex Wang alexw at nicira.com
Wed Nov 20 02:09:14 UTC 2013


This commit improves upcall dispatching fairness by introducing a
two-stage scheme.  The two stages run in separate threads, 'dispatcher'
and 'distributor', respectively.

In the first stage, the dispatcher thread reads upcalls from the
kernel and places them into fair queues based on the L2 header
information.  In the second stage, the distributor thread iterates over
the fair queues in a round-robin fashion, each time moving the upcall
at the front of a fair queue into the corresponding upcall handler
thread's queue for processing.
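
As an illustration, here is a minimal, self-contained sketch of the
scheme (hypothetical names and a plain ring buffer; not the actual
structures used in this patch):

    #include <stdint.h>
    #include <stdio.h>

    #define N_QUEUES 4          /* Stands in for UPCALL_QUEUES. */
    #define QUEUE_LEN 8         /* Stands in for UPCALL_QUEUE_LENGTH. */

    struct fair_queue {
        int items[QUEUE_LEN];   /* Ring buffer of queued "upcalls". */
        int head, count;
    };

    static struct fair_queue queues[N_QUEUES];

    /* Stage 1: the hash selects a fair queue; drop when it is full. */
    static void
    dispatch(uint32_t l2_hash, int upcall)
    {
        struct fair_queue *q = &queues[l2_hash % N_QUEUES];

        if (q->count < QUEUE_LEN) {
            q->items[(q->head + q->count++) % QUEUE_LEN] = upcall;
        }
    }

    /* Stage 2: one pass takes at most one upcall from each fair queue,
     * so a queue flooded by a single sender cannot monopolize the
     * handler threads. */
    static void
    distribute_round(void)
    {
        int i;

        for (i = 0; i < N_QUEUES; i++) {
            struct fair_queue *q = &queues[i];

            if (q->count) {
                printf("queue %d -> handler: upcall %d\n",
                       i, q->items[q->head]);
                q->head = (q->head + 1) % QUEUE_LEN;
                q->count--;
            }
        }
    }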

Experiments show a big improvement in handling fairness and a slight
improvement in flow setup rate.

Signed-off-by: Alex Wang <alexw at nicira.com>
---
 lib/guarded-list.c            |   12 ++
 lib/guarded-list.h            |    1 +
 ofproto/ofproto-dpif-upcall.c |  267 +++++++++++++++++++++++++++++++++--------
 3 files changed, 232 insertions(+), 48 deletions(-)
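
Note on the hashing split in the hunks below: 'hash_1' is computed from
the Ethernet source/destination addresses and selects the fair queue,
while 'hash_2' is computed from in_port and the L4 ports and selects the
handler thread.  A rough, self-contained sketch of that split, using a
stand-in FNV-1a hash rather than OVS's hash_bytes()/mhash helpers:

    #include <stddef.h>
    #include <stdint.h>

    #define UPCALL_QUEUES 512

    /* Stand-in hash; the patch itself uses hash_bytes()/mhash_*(). */
    static uint32_t
    fnv1a(const void *p_, size_t n, uint32_t basis)
    {
        const uint8_t *p = p_;
        uint32_t hash = basis ^ 2166136261u;

        while (n--) {
            hash = (hash ^ *p++) * 16777619u;
        }
        return hash;
    }

    /* Stage 1 key: L2 addresses only, so every packet of one L2
     * conversation lands in the same fair queue. */
    static uint32_t
    queue_index(const uint8_t eth_addrs[12], uint32_t secret)
    {
        return fnv1a(eth_addrs, 12, secret) % UPCALL_QUEUES;
    }

    /* Stage 2 key: in_port plus L4 ports, so flows that share a fair
     * queue still spread across the handler threads. */
    static uint32_t
    handler_index(uint32_t in_port, uint32_t l4_ports, uint32_t secret,
                  size_t n_handlers)
    {
        uint32_t hash = fnv1a(&in_port, sizeof in_port, secret);

        hash = fnv1a(&l4_ports, sizeof l4_ports, hash);
        return hash % n_handlers;
    }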

diff --git a/lib/guarded-list.c b/lib/guarded-list.c
index cbb2030..2dc0c48 100644
--- a/lib/guarded-list.c
+++ b/lib/guarded-list.c
@@ -44,6 +44,18 @@ guarded_list_is_empty(const struct guarded_list *list)
     return empty;
 }
 
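+/* Returns the node at the front of 'list' without removing it.  'list'
+ * must not be empty.  Because the mutex is released before returning, the
+ * caller must ensure no other thread removes the node in the meantime (in
+ * this patch, only the distributor removes nodes from the fair queues). */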
+struct list *
+guarded_list_front(struct guarded_list *list)
+{
+    struct list *ret;
+
+    ovs_mutex_lock(&list->mutex);
+    ret = list_front(&list->list);
+    ovs_mutex_unlock(&list->mutex);
+
+    return ret;
+}
+
 /* If 'list' has fewer than 'max' elements, adds 'node' at the end of the list
  * and returns the number of elements now on the list.
  *
diff --git a/lib/guarded-list.h b/lib/guarded-list.h
index 625865d..1a26dc8 100644
--- a/lib/guarded-list.h
+++ b/lib/guarded-list.h
@@ -33,6 +33,7 @@ void guarded_list_destroy(struct guarded_list *);
 
 bool guarded_list_is_empty(const struct guarded_list *);
 
+struct list *guarded_list_front(struct guarded_list *);
 size_t guarded_list_push_back(struct guarded_list *, struct list *,
                               size_t max);
 struct list *guarded_list_pop_front(struct guarded_list *);
diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index dba3d3b..97e9d22 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -36,7 +36,9 @@
 #include "poll-loop.h"
 #include "vlog.h"
 
-#define MAX_QUEUE_LENGTH 512
+#define MAX_QUEUE_LENGTH 128    /* Max upcalls in a handler's queue. */
+#define UPCALL_QUEUES 512       /* Number of fair queues. */
+#define UPCALL_QUEUE_LENGTH 64  /* Max upcalls in each fair queue. */
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
 
@@ -45,7 +47,7 @@ COVERAGE_DEFINE(upcall_queue_overflow);
 COVERAGE_DEFINE(fmb_queue_overflow);
 COVERAGE_DEFINE(fmb_queue_revalidated);
 
-/* A thread that processes each upcall handed to it by the dispatcher thread,
+/* A thread that processes each upcall handed to it by the distributor thread,
  * forwards the upcall's packet, and then queues it to the main ofproto_dpif
  * to possibly set up a kernel flow as a cache. */
 struct handler {
@@ -58,8 +60,8 @@ struct handler {
     struct list upcalls OVS_GUARDED;
     size_t n_upcalls OVS_GUARDED;
 
-    size_t n_new_upcalls;              /* Only changed by the dispatcher. */
-    bool need_signal;                  /* Only changed by the dispatcher. */
+    size_t n_new_upcalls;              /* Only changed by the distributor. */
+    bool need_signal;                  /* Only changed by the distributor. */
 
     pthread_cond_t wake_cond;          /* Wakes 'thread' while holding
                                           'mutex'. */
@@ -67,11 +69,12 @@ struct handler {
 
 /* An upcall handler for ofproto_dpif.
  *
- * udpif is implemented as a "dispatcher" thread that reads upcalls from the
- * kernel.  It processes each upcall just enough to figure out its next
- * destination.  For a "miss" upcall (MISS_UPCALL), this is one of several
- * "handler" threads (see struct handler).  Other upcalls are queued to the
- * main ofproto_dpif. */
+ * udpif is implemented as two threads, "dispatcher" and "distributor".
+ * The "dispatcher" thread reads upcalls from the kernel and places each
+ * one into a fair queue chosen by hashing its L2 header.  The
+ * "distributor" thread reads upcalls from the fair queues in a
+ * round-robin fashion and places each one into an upcall handler's
+ * queue chosen by hashing its L4 header. */
 struct udpif {
     struct dpif *dpif;                 /* Datapath handle. */
     struct dpif_backer *backer;        /* Opaque dpif_backer pointer. */
@@ -79,6 +82,7 @@ struct udpif {
     uint32_t secret;                   /* Random seed for upcall hash. */
 
     pthread_t dispatcher;              /* Dispatcher thread ID. */
+    pthread_t distributor;             /* Distributor thread ID. */
 
     struct handler *handlers;          /* Upcall handlers. */
     size_t n_handlers;
@@ -93,6 +97,33 @@ struct udpif {
     struct seq *wait_seq;
 
     struct latch exit_latch; /* Tells child threads to exit. */
+
+    /* Fair queues for upcalls. */
+    struct guarded_list upcall_queues[UPCALL_QUEUES];
+
+    /* For waking up the distributor thread when there are upcalls
+     * to distribute. */
+    struct seq *distributor_seq;
+    /* For waking up the distributor thread when a handler's queue
+     * has more room. */
+    struct seq *hol_block_seq;
+
+    struct ovs_mutex mutex;        /* Mutex guarding the following. */
+
+    /* Contains the indexes of the non-empty "upcall_queues". */
+    struct list non_empty_list;
+    /* Indicates whether the upcall_queue at each index is already
+     * recorded in non_empty_list. */
+    int non_empty_list_map[UPCALL_QUEUES];
+    /* Indicates whether the upcall_queue at each index is blocked by its
+     * front upcall (head-of-line blocking). */
+    int hol_block_map[UPCALL_QUEUES];
+};
+
+/* Node in udpif's "non_empty_list".  Stores the index of a non-empty
+ * upcall_queue. */
+struct non_empty_list_entry {
+    struct list list_node;
+    int value;
 };
 
 enum upcall_type {
@@ -107,6 +138,9 @@ struct upcall {
     struct list list_node;          /* For queuing upcalls. */
     struct flow_miss *flow_miss;    /* This upcall's flow_miss. */
 
+    uint32_t hash_1;                /* L2 hash; the dispatcher uses it
+                                     * to pick a fair queue. */
+    uint32_t hash_2;                /* L4 hash; the distributor uses it
+                                     * to pick a handler. */
+
     /* Raw upcall plus data for keeping track of the memory backing it. */
     struct dpif_upcall dpif_upcall; /* As returned by dpif_recv() */
     struct ofpbuf upcall_buf;       /* Owns some data in 'dpif_upcall'. */
@@ -118,24 +152,34 @@ static void upcall_destroy(struct upcall *);
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
 static void recv_upcalls(struct udpif *);
+static void distribute_upcalls(struct udpif *);
 static void handle_upcalls(struct udpif *, struct list *upcalls);
 static void miss_destroy(struct flow_miss *);
 static void *udpif_dispatcher(void *);
+static void *udpif_distributor(void *);
 static void *udpif_upcall_handler(void *);
 
 struct udpif *
 udpif_create(struct dpif_backer *backer, struct dpif *dpif)
 {
     struct udpif *udpif = xzalloc(sizeof *udpif);
+    size_t i;
 
     udpif->dpif = dpif;
     udpif->backer = backer;
     udpif->secret = random_uint32();
     udpif->wait_seq = seq_create();
+    udpif->distributor_seq = seq_create();
+    udpif->hol_block_seq = seq_create();
     latch_init(&udpif->exit_latch);
+    list_init(&udpif->non_empty_list);
     guarded_list_init(&udpif->drop_keys);
     guarded_list_init(&udpif->fmbs);
+    ovs_mutex_init(&udpif->mutex);
     atomic_init(&udpif->reval_seq, 0);
+    for (i = 0; i < UPCALL_QUEUES; i++) {
+        guarded_list_init(&udpif->upcall_queues[i]);
+    }
 
     return udpif;
 }
@@ -145,6 +189,7 @@ udpif_destroy(struct udpif *udpif)
 {
     struct flow_miss_batch *fmb;
     struct drop_key *drop_key;
+    size_t i;
 
     udpif_recv_set(udpif, 0, false);
 
@@ -160,6 +205,12 @@ udpif_destroy(struct udpif *udpif)
     guarded_list_destroy(&udpif->fmbs);
     latch_destroy(&udpif->exit_latch);
     seq_destroy(udpif->wait_seq);
+    seq_destroy(udpif->distributor_seq);
+    seq_destroy(udpif->hol_block_seq);
+    ovs_mutex_destroy(&udpif->mutex);
+    for (i = 0; i < UPCALL_QUEUES; i++) {
+        guarded_list_destroy(&udpif->upcall_queues[i]);
+    }
     free(udpif);
 }
 
@@ -189,6 +240,7 @@ udpif_recv_set(struct udpif *udpif, size_t n_handlers, bool enable)
         }
 
         xpthread_join(udpif->dispatcher, NULL);
+        xpthread_join(udpif->distributor, NULL);
         for (i = 0; i < udpif->n_handlers; i++) {
             struct handler *handler = &udpif->handlers[i];
             struct upcall *miss, *next;
@@ -230,6 +282,7 @@ udpif_recv_set(struct udpif *udpif, size_t n_handlers, bool enable)
                             handler);
         }
         xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
+        xpthread_create(&udpif->distributor, NULL, udpif_distributor, udpif);
     }
 }
 
@@ -368,8 +421,8 @@ udpif_drop_key_clear(struct udpif *udpif)
     }
 }
 
-/* The dispatcher thread is responsible for receiving upcalls from the kernel,
- * assigning them to a upcall_handler thread. */
+/* The dispatcher thread is responsible for reading upcalls from
+ * the kernel and placing them into udpif's "upcall_queues". */
 static void *
 udpif_dispatcher(void *arg)
 {
@@ -386,9 +439,36 @@ udpif_dispatcher(void *arg)
     return NULL;
 }
 
-/* The miss handler thread is responsible for processing miss upcalls retrieved
- * by the dispatcher thread.  Once finished it passes the processed miss
- * upcalls to ofproto-dpif where they're installed in the datapath. */
+/* The distributor thread is responsible for reading upcalls from
+ * udpif's "upcall_queues" and distributing them to the corresponding
+ * upcall handlers' queues.
+ *
+ * If all of the front upcalls in "upcall_queues" are blocked (a form of
+ * head-of-line blocking), the distributor thread waits on hol_block_seq
+ * until a handler drains part of its queue. */
+static void *
+udpif_distributor(void *arg)
+{
+    struct udpif *udpif = arg;
+
+    set_subprogram_name("distributor");
+    while (!latch_is_set(&udpif->exit_latch)) {
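+        /* Sample both sequence numbers before doing any work, so that a
+         * change made while distributing causes poll_block() to return
+         * immediately instead of sleeping. */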
+        uint64_t distributor_seq = seq_read(udpif->distributor_seq);
+        uint64_t hol_block_seq = seq_read(udpif->hol_block_seq);
+
+        distribute_upcalls(udpif);
+        seq_wait(udpif->distributor_seq, distributor_seq);
+        seq_wait(udpif->hol_block_seq, hol_block_seq);
+        latch_wait(&udpif->exit_latch);
+        poll_block();
+    }
+
+    return NULL;
+}
+
+/* The miss handler thread is responsible for processing miss upcalls handed
+ * to it by the distributor thread.  Once finished it passes the processed
+ * miss upcalls to ofproto-dpif where they're installed in the datapath. */
 static void *
 udpif_upcall_handler(void *arg)
 {
@@ -420,6 +500,9 @@ udpif_upcall_handler(void *arg)
         }
         ovs_mutex_unlock(&handler->mutex);
 
+        /* Change udpif->hol_block_seq every time a batch of upcalls is
+         * read, so that a distributor blocked on this handler's full
+         * queue can retry. */
+        seq_change(handler->udpif->hol_block_seq);
         handle_upcalls(handler->udpif, &misses);
 
         coverage_clear();
@@ -490,19 +573,21 @@ classify_upcall(const struct upcall *upcall)
 static void
 recv_upcalls(struct udpif *udpif)
 {
-    int n;
-
+    /* Receive upcalls from the dpif and put them into the upcall_queues. */
     for (;;) {
-        uint32_t hash = udpif->secret;
-        struct handler *handler;
         struct upcall *upcall;
+        struct guarded_list *queue;
         size_t n_bytes, left;
         struct nlattr *nla;
-        int error;
+        int idx, error;
 
         upcall = xmalloc(sizeof *upcall);
-        ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
+        upcall->hash_1 = udpif->secret;
+        upcall->hash_2 = udpif->secret;
+        ofpbuf_use_stub(&upcall->upcall_buf,
+                        upcall->upcall_stub,
                         sizeof upcall->upcall_stub);
+
         error = dpif_recv(udpif->dpif, &upcall->dpif_upcall,
                           &upcall->upcall_buf);
         if (error) {
@@ -514,11 +599,20 @@ recv_upcalls(struct udpif *udpif)
         NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key,
                           upcall->dpif_upcall.key_len) {
             enum ovs_key_attr type = nl_attr_type(nla);
-            if (type == OVS_KEY_ATTR_IN_PORT
-                || type == OVS_KEY_ATTR_TCP
-                || type == OVS_KEY_ATTR_UDP) {
+            if (type == OVS_KEY_ATTR_ETHERNET) {
+                if (nl_attr_get_size(nla) == 12) {
+                    upcall->hash_1 = hash_bytes(nl_attr_get_unspec(nla, 12),
+                                                2 * ETH_ADDR_LEN,
+                                                upcall->hash_1);
+                } else {
+                    VLOG_WARN_RL(&rl,
+                                 "Netlink attribute with incorrect size.");
+                }
+            } else if (type == OVS_KEY_ATTR_IN_PORT
+                       || type == OVS_KEY_ATTR_TCP
+                       || type == OVS_KEY_ATTR_UDP) {
                 if (nl_attr_get_size(nla) == 4) {
-                    hash = mhash_add(hash, nl_attr_get_u32(nla));
+                    upcall->hash_2 = mhash_add(upcall->hash_2,
+                                               nl_attr_get_u32(nla));
                     n_bytes += 4;
                 } else {
                     VLOG_WARN_RL(&rl,
@@ -526,37 +620,112 @@ recv_upcalls(struct udpif *udpif)
                 }
             }
         }
-        hash =  mhash_finish(hash, n_bytes);
+        upcall->hash_2 = mhash_finish(upcall->hash_2, n_bytes);
+
+        idx = upcall->hash_1 % UPCALL_QUEUES;
+        /* Select the fair queue into which to insert the upcall. */
+        queue = &udpif->upcall_queues[idx];
+        /* Drop the upcall if the queue already holds UPCALL_QUEUE_LENGTH
+         * upcalls. */
+        if (!guarded_list_push_back(queue, &upcall->list_node,
+                                    UPCALL_QUEUE_LENGTH)) {
+            upcall_destroy(upcall);
+        } else {
+            /* If successfully inserted, check whether the queue needs to
+             * be recorded in non_empty_list. */
+            ovs_mutex_lock(&udpif->mutex);
+            if (!udpif->non_empty_list_map[idx]
+                && !guarded_list_is_empty(queue)) {
+                struct non_empty_list_entry *entry = xmalloc(sizeof *entry);
+
+                entry->value = idx;
+                udpif->non_empty_list_map[idx] = 1;
+                list_push_back(&udpif->non_empty_list, &entry->list_node);
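+                /* The distributor only sleeps on distributor_seq when
+                 * non_empty_list is empty, so a wakeup is needed just
+                 * when this entry makes the list non-empty. */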
+                if (list_is_singleton(&udpif->non_empty_list)) {
+                    seq_change(udpif->distributor_seq);
+                }
+            }
+            ovs_mutex_unlock(&udpif->mutex);
+        }
+    }
+}
+
+static void
+distribute_upcalls(struct udpif *udpif)
+{
+    int n;
 
-        handler = &udpif->handlers[hash % udpif->n_handlers];
+    while (!list_is_empty(&udpif->non_empty_list)) {
+        struct non_empty_list_entry *front;
+        struct guarded_list *queue;
+        struct upcall *upcall;
+        struct handler *handler;
+
+        front = CONTAINER_OF(list_front(&udpif->non_empty_list),
+                             struct non_empty_list_entry, list_node);
+
+        queue = &udpif->upcall_queues[front->value];
+        upcall = CONTAINER_OF(guarded_list_front(queue),
+                              struct upcall, list_node);
+        handler = &udpif->handlers[upcall->hash_2 % udpif->n_handlers];
 
         ovs_mutex_lock(&handler->mutex);
-        if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
-            list_push_back(&handler->upcalls, &upcall->list_node);
-            if (handler->n_upcalls == 0) {
-                handler->need_signal = true;
-            }
-            handler->n_upcalls++;
-            if (handler->need_signal &&
-                handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
-                handler->need_signal = false;
-                xpthread_cond_signal(&handler->wake_cond);
-            }
+        if (handler->n_upcalls >= MAX_QUEUE_LENGTH) {
             ovs_mutex_unlock(&handler->mutex);
-            if (!VLOG_DROP_DBG(&rl)) {
-                struct ds ds = DS_EMPTY_INITIALIZER;
-
-                odp_flow_key_format(upcall->dpif_upcall.key,
-                                    upcall->dpif_upcall.key_len,
-                                    &ds);
-                VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds));
-                ds_destroy(&ds);
+
+            /* The handler's queue is full: rotate this entry to the back
+             * of non_empty_list. */
+            ovs_mutex_lock(&udpif->mutex);
+            list_pop_front(&udpif->non_empty_list);
+            list_push_back(&udpif->non_empty_list, &front->list_node);
+
+            /* Marks this queue as head-of-line blocked. */
+            if (!udpif->hol_block_map[front->value]) {
+                udpif->hol_block_map[front->value] = 1;
             }
+
+            /* If every non-empty queue is blocked by its head upcall,
+             * jump out of the loop. */
+            if (!memcmp(udpif->hol_block_map, udpif->non_empty_list_map,
+                        sizeof udpif->hol_block_map)) {
+                ovs_mutex_unlock(&udpif->mutex);
+                break;
+            }
+            ovs_mutex_unlock(&udpif->mutex);
+            continue;
+        }
+
+        guarded_list_pop_front(queue);
+        list_push_back(&handler->upcalls, &upcall->list_node);
+        if (handler->n_upcalls == 0) {
+            handler->need_signal = true;
+        }
+        handler->n_upcalls++;
+        if (handler->need_signal &&
+            handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
+            handler->need_signal = false;
+            xpthread_cond_signal(&handler->wake_cond);
+        }
+        ovs_mutex_unlock(&handler->mutex);
+        if (!VLOG_DROP_DBG(&rl)) {
+            struct ds ds = DS_EMPTY_INITIALIZER;
+
+            odp_flow_key_format(upcall->dpif_upcall.key,
+                                upcall->dpif_upcall.key_len,
+                                &ds);
+            VLOG_DBG("distributor: enqueue (%s)", ds_cstr(&ds));
+            ds_destroy(&ds);
+        }
+
+        /* Do not insert 'front' back into non_empty_list if the fair
+         * queue has become empty. */
+        ovs_mutex_lock(&udpif->mutex);
+        list_pop_front(&udpif->non_empty_list);
+        if (guarded_list_is_empty(queue)) {
+            udpif->non_empty_list_map[front->value] = 0;
+            free(front);
         } else {
-            ovs_mutex_unlock(&handler->mutex);
-            COVERAGE_INC(upcall_queue_overflow);
-            upcall_destroy(upcall);
+            list_push_back(&udpif->non_empty_list, &front->list_node);
         }
+        ovs_mutex_unlock(&udpif->mutex);
     }
 
     for (n = 0; n < udpif->n_handlers; ++n) {
@@ -569,6 +738,8 @@ recv_upcalls(struct udpif *udpif)
             ovs_mutex_unlock(&handler->mutex);
         }
     }
+
+    /* Clears head-of-line blocking state before the next pass. */
+    memset(udpif->hol_block_map, '\0', sizeof udpif->hol_block_map);
 }
 
 static struct flow_miss *
-- 
1.7.9.5



