[ovs-dev] [PATCH] ofproto-dpif-upcall: Improve upcall handling fairness.

Alex Wang alexw at nicira.com
Fri Nov 22 01:54:39 UTC 2013


This commit improves upcall dispatching fairness by introducing
a two-stage scheme.  The two stages are run by two threads,
'dispatcher' and 'distributor', respectively.

In the first stage, the dispatcher thread reads upcalls from
the kernel and puts them into fair queues based on the L2
and L3 header information.  In the second stage, the distributor
thread iterates over the fair queues in a round-robin fashion.
On each iteration, it moves the upcall at the front of a queue
into the corresponding upcall handler thread's queue for processing.
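
To illustrate the idea, below is a minimal standalone C sketch of the
scheme (a toy model only, not code from this patch: the queue depths,
hash values, and helpers such as 'toy_enqueue' are invented for the
example).  Stage 1 files upcalls into fair queues by a flow hash;
stage 2 drains the non-empty queues round-robin into per-handler
queues by a second hash, so a heavy flow can only fill its own fair
queue:

    #include <stdio.h>
    #include <string.h>

    #define N_FAIR_QUEUES 4
    #define N_HANDLERS 2
    #define QUEUE_DEPTH 8

    struct toy_upcall {
        unsigned int hash_1;  /* Flow (L2+L3) hash: picks the fair queue. */
        unsigned int hash_2;  /* Port/L4 hash: picks the handler. */
    };

    /* A fixed-size ring buffer standing in for one queue. */
    struct toy_queue {
        struct toy_upcall items[QUEUE_DEPTH];
        int head, count;
    };

    /* Appends 'u' to 'q'.  Returns 0 if 'q' is full, 1 otherwise. */
    static int
    toy_enqueue(struct toy_queue *q, struct toy_upcall u)
    {
        if (q->count == QUEUE_DEPTH) {
            return 0;
        }
        q->items[(q->head + q->count++) % QUEUE_DEPTH] = u;
        return 1;
    }

    /* Removes and returns the front of 'q', which must be non-empty. */
    static struct toy_upcall
    toy_dequeue(struct toy_queue *q)
    {
        struct toy_upcall u = q->items[q->head];

        q->head = (q->head + 1) % QUEUE_DEPTH;
        q->count--;
        return u;
    }

    int
    main(void)
    {
        struct toy_queue fair[N_FAIR_QUEUES];
        struct toy_queue handlers[N_HANDLERS];
        int i;

        memset(fair, 0, sizeof fair);
        memset(handlers, 0, sizeof handlers);

        /* Stage 1, "dispatcher": 12 upcalls from 3 flows.  Each flow maps
         * to one fair queue, so if it overflows, only its own upcalls are
         * dropped. */
        for (i = 0; i < 12; i++) {
            struct toy_upcall u;

            u.hash_1 = i % 3;       /* Pretend L2+L3 hash. */
            u.hash_2 = i % 3 + 7;   /* Pretend IN_PORT+L4 hash. */
            toy_enqueue(&fair[u.hash_1 % N_FAIR_QUEUES], u);
        }

        /* Stage 2, "distributor": moves at most one upcall per queue per
         * pass, so every queue gets an even share of handler capacity. */
        for (;;) {
            int moved = 0;

            for (i = 0; i < N_FAIR_QUEUES; i++) {
                if (fair[i].count) {
                    struct toy_upcall u = toy_dequeue(&fair[i]);

                    toy_enqueue(&handlers[u.hash_2 % N_HANDLERS], u);
                    moved = 1;
                }
            }
            if (!moved) {
                break;
            }
        }

        for (i = 0; i < N_HANDLERS; i++) {
            printf("handler %d queued %d upcalls\n", i, handlers[i].count);
        }
        return 0;
    }

In the patch itself the fair queues are guarded_lists, the second hash
covers IN_PORT plus the L4 header, and a queue whose handler is full is
rotated to the back of the non-empty list instead of being busy-polled.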

Experiments show a big improvement in dispatching fairness and a
slight improvement in flow setup rate.

Signed-off-by: Alex Wang <alexw at nicira.com>

---

RFC -> PATCH
- Fix a bug caused by a typo.
- Use L2 and L3 header information in the first stage.

---
 lib/guarded-list.c            |   12 ++
 lib/guarded-list.h            |    1 +
 ofproto/ofproto-dpif-upcall.c |  308 +++++++++++++++++++++++++++++++++--------
 3 files changed, 267 insertions(+), 54 deletions(-)

diff --git a/lib/guarded-list.c b/lib/guarded-list.c
index cbb2030..2dc0c48 100644
--- a/lib/guarded-list.c
+++ b/lib/guarded-list.c
@@ -44,6 +44,18 @@ guarded_list_is_empty(const struct guarded_list *list)
     return empty;
 }

+struct list *
+guarded_list_front(struct guarded_list *list)
+{
+    struct list *ret;
+
+    ovs_mutex_lock(&list->mutex);
+    ret = list_front(&list->list);
+    ovs_mutex_unlock(&list->mutex);
+
+    return ret;
+}
+
 /* If 'list' has fewer than 'max' elements, adds 'node' at the end of the list
  * and returns the number of elements now on the list.
  *
diff --git a/lib/guarded-list.h b/lib/guarded-list.h
index 625865d..1a26dc8 100644
--- a/lib/guarded-list.h
+++ b/lib/guarded-list.h
@@ -33,6 +33,7 @@ void guarded_list_destroy(struct guarded_list *);

 bool guarded_list_is_empty(const struct guarded_list *);

+struct list *guarded_list_front(struct guarded_list *);
 size_t guarded_list_push_back(struct guarded_list *, struct list *,
                               size_t max);
 struct list *guarded_list_pop_front(struct guarded_list *);
diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index dba3d3b..36a731c 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -36,7 +36,9 @@
 #include "poll-loop.h"
 #include "vlog.h"

-#define MAX_QUEUE_LENGTH 512
+#define MAX_QUEUE_LENGTH 128
+#define UPCALL_QUEUES 512
+#define UPCALL_QUEUE_LENGTH 8192

 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);

@@ -45,7 +47,7 @@ COVERAGE_DEFINE(upcall_queue_overflow);
 COVERAGE_DEFINE(fmb_queue_overflow);
 COVERAGE_DEFINE(fmb_queue_revalidated);

-/* A thread that processes each upcall handed to it by the dispatcher thread,
+/* A thread that processes each upcall handed to it by the distributor thread,
  * forwards the upcall's packet, and then queues it to the main ofproto_dpif
  * to possibly set up a kernel flow as a cache. */
 struct handler {
@@ -58,8 +60,8 @@ struct handler {
     struct list upcalls OVS_GUARDED;
     size_t n_upcalls OVS_GUARDED;

-    size_t n_new_upcalls;              /* Only changed by the dispatcher. */
-    bool need_signal;                  /* Only changed by the dispatcher. */
+    size_t n_new_upcalls;              /* Only changed by the distributor. */
+    bool need_signal;                  /* Only changed by the distributor. */

     pthread_cond_t wake_cond;          /* Wakes 'thread' while holding
                                           'mutex'. */
@@ -67,11 +69,12 @@ struct handler {

 /* An upcall handler for ofproto_dpif.
  *
- * udpif is implemented as a "dispatcher" thread that reads upcalls from the
- * kernel.  It processes each upcall just enough to figure out its next
- * destination.  For a "miss" upcall (MISS_UPCALL), this is one of several
- * "handler" threads (see struct handler).  Other upcalls are queued to the
- * main ofproto_dpif. */
+ * udpif is implemented as two threads, "dispatcher" and "distributor".
+ * The "dispatcher" thread reads upcalls from the kernel and places them
+ * into the corresponding fair queues based on the L2+L3 header.  The
+ * "distributor" thread reads upcalls from the fair queues in a round-robin
+ * fashion and places them into the corresponding upcall handler's queue
+ * based on the L4 header. */
 struct udpif {
     struct dpif *dpif;                 /* Datapath handle. */
     struct dpif_backer *backer;        /* Opaque dpif_backer pointer. */
@@ -79,6 +82,7 @@ struct udpif {
     uint32_t secret;                   /* Random seed for upcall hash. */

     pthread_t dispatcher;              /* Dispatcher thread ID. */
+    pthread_t distributor;             /* Distributor thread ID. */

     struct handler *handlers;          /* Upcall handlers. */
     size_t n_handlers;
@@ -93,6 +97,33 @@ struct udpif {
     struct seq *wait_seq;

     struct latch exit_latch; /* Tells child threads to exit. */
+
+    /* Fair queues for upcalls. */
+    struct guarded_list upcall_queues[UPCALL_QUEUES];
+
+    /* For waking up the distributor thread when there are upcalls
+     * to distribute. */
+    struct seq *distributor_seq;
+    /* For waking up the distributor thread when a handler's queue
+     * has more room. */
+    struct seq *hol_block_seq;
+
+    struct ovs_mutex mutex;        /* Mutex guarding the following. */
+
+    /* Contains the indexes of the non-empty "upcall_queues". */
+    struct list non_empty_list;
+    /* Indicates whether the upcall_queue at each index is already
+     * in non_empty_list. */
+    int non_empty_list_map[UPCALL_QUEUES];
+    /* Indicates whether each upcall_queue is blocked by its front upcall. */
+    int hol_block_map[UPCALL_QUEUES];
+};
+
+/* Node in udpif's "non_empty_list".  Stores the index of a non-empty
+ * upcall_queue. */
+struct non_empty_list_entry {
+    struct list list_node;
+    int value;
 };

 enum upcall_type {
@@ -107,6 +138,9 @@ struct upcall {
     struct list list_node;          /* For queuing upcalls. */
     struct flow_miss *flow_miss;    /* This upcall's flow_miss. */

+    uint32_t hash_1;                /* L2+L3 hash, used by dispatcher. */
+    uint32_t hash_2;                /* IN_PORT+L4 hash, used by distributor. */
+
     /* Raw upcall plus data for keeping track of the memory backing it. */
     struct dpif_upcall dpif_upcall; /* As returned by dpif_recv() */
     struct ofpbuf upcall_buf;       /* Owns some data in 'dpif_upcall'. */
@@ -118,24 +152,34 @@ static void upcall_destroy(struct upcall *);
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);

 static void recv_upcalls(struct udpif *);
+static void distribute_upcalls(struct udpif *);
 static void handle_upcalls(struct udpif *, struct list *upcalls);
 static void miss_destroy(struct flow_miss *);
 static void *udpif_dispatcher(void *);
+static void *udpif_distributor(void *);
 static void *udpif_upcall_handler(void *);

 struct udpif *
 udpif_create(struct dpif_backer *backer, struct dpif *dpif)
 {
     struct udpif *udpif = xzalloc(sizeof *udpif);
+    size_t i;

     udpif->dpif = dpif;
     udpif->backer = backer;
     udpif->secret = random_uint32();
     udpif->wait_seq = seq_create();
+    udpif->distributor_seq = seq_create();
+    udpif->hol_block_seq = seq_create();
     latch_init(&udpif->exit_latch);
+    list_init(&udpif->non_empty_list);
     guarded_list_init(&udpif->drop_keys);
     guarded_list_init(&udpif->fmbs);
+    ovs_mutex_init(&udpif->mutex);
     atomic_init(&udpif->reval_seq, 0);
+    for (i = 0; i < UPCALL_QUEUES; i++) {
+        guarded_list_init(&udpif->upcall_queues[i]);
+    }

     return udpif;
 }
@@ -145,6 +189,7 @@ udpif_destroy(struct udpif *udpif)
 {
     struct flow_miss_batch *fmb;
     struct drop_key *drop_key;
+    size_t i;

     udpif_recv_set(udpif, 0, false);

@@ -160,6 +205,12 @@ udpif_destroy(struct udpif *udpif)
     guarded_list_destroy(&udpif->fmbs);
     latch_destroy(&udpif->exit_latch);
     seq_destroy(udpif->wait_seq);
+    seq_destroy(udpif->distributor_seq);
+    seq_destroy(udpif->hol_block_seq);
+    ovs_mutex_destroy(&udpif->mutex);
+    for (i = 0; i < UPCALL_QUEUES; i++) {
+        guarded_list_destroy(&udpif->upcall_queues[i]);
+    }
     free(udpif);
 }

@@ -189,6 +240,7 @@ udpif_recv_set(struct udpif *udpif, size_t n_handlers, bool enable)
         }

         xpthread_join(udpif->dispatcher, NULL);
+        xpthread_join(udpif->distributor, NULL);
         for (i = 0; i < udpif->n_handlers; i++) {
             struct handler *handler = &udpif->handlers[i];
             struct upcall *miss, *next;
@@ -230,6 +282,7 @@ udpif_recv_set(struct udpif *udpif, size_t n_handlers, bool enable)
                             handler);
         }
         xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
+        xpthread_create(&udpif->distributor, NULL, udpif_distributor, udpif);
     }
 }

@@ -368,8 +421,8 @@ udpif_drop_key_clear(struct udpif *udpif)
     }
 }
 
-/* The dispatcher thread is responsible for receiving upcalls from the kernel,
- * assigning them to a upcall_handler thread. */
+/* The dispatcher thread is responsible for reading upcalls from
+ * the kernel and assigning them to udpif's "upcall_queues". */
 static void *
 udpif_dispatcher(void *arg)
 {
@@ -386,9 +439,36 @@ udpif_dispatcher(void *arg)
     return NULL;
 }

-/* The miss handler thread is responsible for processing miss upcalls retrieved
- * by the dispatcher thread.  Once finished it passes the processed miss
- * upcalls to ofproto-dpif where they're installed in the datapath. */
+/* The distributor thread is responsible for reading upcalls from
+ * udpif's "upcall_queues" and distributing them to the corresponding
+ * upcall handler's queue.
+ *
+ * If all of the front upcalls in "upcall_queues" are blocked (similar
+ * to head-of-line blocking), the distributor thread will wait on the
+ * hol_block_seq. */
+static void *
+udpif_distributor(void *arg)
+{
+    struct udpif *udpif = arg;
+
+    set_subprogram_name("distributor");
+    while (!latch_is_set(&udpif->exit_latch)) {
+        uint64_t distributor_seq = seq_read(udpif->distributor_seq);
+        uint64_t hol_block_seq = seq_read(udpif->hol_block_seq);
+
+        distribute_upcalls(udpif);
+        seq_wait(udpif->distributor_seq, distributor_seq);
+        seq_wait(udpif->hol_block_seq, hol_block_seq);
+        latch_wait(&udpif->exit_latch);
+        poll_block();
+    }
+
+    return NULL;
+}
+
+/* The miss handler thread is responsible for processing miss upcalls handed
+ * to it by the distributor thread.  Once finished it passes the processed
+ * miss upcalls to ofproto-dpif where they're installed in the datapath. */
 static void *
 udpif_upcall_handler(void *arg)
 {
@@ -420,6 +500,9 @@ udpif_upcall_handler(void *arg)
         }
         ovs_mutex_unlock(&handler->mutex);

+        /* Changes udpif->hol_block_seq every time a batch of upcalls
+         * is read. */
+        seq_change(handler->udpif->hol_block_seq);
         handle_upcalls(handler->udpif, &misses);

         coverage_clear();
@@ -490,19 +573,21 @@ classify_upcall(const struct upcall *upcall)
 static void
 recv_upcalls(struct udpif *udpif)
 {
-    int n;
-
+    /* Receives upcalls from the dpif and puts them into the upcall_queues. */
     for (;;) {
-        uint32_t hash = udpif->secret;
-        struct handler *handler;
         struct upcall *upcall;
-        size_t n_bytes, left;
+        struct guarded_list *queue;
+        size_t n_bytes_1, n_bytes_2, left;
         struct nlattr *nla;
-        int error;
+        int idx, error;

         upcall = xmalloc(sizeof *upcall);
-        ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
+        upcall->hash_1 = udpif->secret;
+        upcall->hash_2 = udpif->secret;
+        ofpbuf_use_stub(&upcall->upcall_buf,
+                        upcall->upcall_stub,
                         sizeof upcall->upcall_stub);
+
         error = dpif_recv(udpif->dpif, &upcall->dpif_upcall,
                           &upcall->upcall_buf);
         if (error) {
@@ -510,53 +595,166 @@ recv_upcalls(struct udpif *udpif)
             break;
         }

-        n_bytes = 0;
+        n_bytes_1 = 0;
+        n_bytes_2 = 0;
         NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key,
                           upcall->dpif_upcall.key_len) {
             enum ovs_key_attr type = nl_attr_type(nla);
-            if (type == OVS_KEY_ATTR_IN_PORT
-                || type == OVS_KEY_ATTR_TCP
-                || type == OVS_KEY_ATTR_UDP) {
+
+            if (type == OVS_KEY_ATTR_ETHERNET) {
+                if (nl_attr_get_size(nla) == 12) {
+                    uint32_t hash = hash_bytes(nl_attr_get_unspec(nla, 12),
+                                               12, 0);
+                    upcall->hash_1 = mhash_add(upcall->hash_1, hash);
+                    n_bytes_1 += 4;
+                    continue;
+                }
+            } else if (type == OVS_KEY_ATTR_IPV4) {
+                if (nl_attr_get_size(nla) == 12) {
+                    uint32_t hash = hash_bytes(nl_attr_get_unspec(nla, 8),
+                                               8, 0);
+
+                    upcall->hash_1 = mhash_add(upcall->hash_1, hash);
+                    n_bytes_1 += 4;
+                    continue;
+                }
+            } else if (type == OVS_KEY_ATTR_IPV6) {
+                if (nl_attr_get_size(nla) == 40) {
+                    uint32_t hash = hash_bytes(nl_attr_get_unspec(nla, 32),
+                                               32, 0);
+                    upcall->hash_1 = mhash_add(upcall->hash_1, hash);
+                    n_bytes_1 += 4;
+                    continue;
+                }
+            } else if (type == OVS_KEY_ATTR_IN_PORT
+                       || type == OVS_KEY_ATTR_TCP
+                       || type == OVS_KEY_ATTR_UDP) {
                 if (nl_attr_get_size(nla) == 4) {
-                    hash = mhash_add(hash, nl_attr_get_u32(nla));
-                    n_bytes += 4;
-                } else {
-                    VLOG_WARN_RL(&rl,
-                                 "Netlink attribute with incorrect size.");
+                    upcall->hash_2 = mhash_add(upcall->hash_2,
+                                               nl_attr_get_u32(nla));
+                    n_bytes_2 += 4;
+                    continue;
+                }
+            } else {
+                continue;
+            }
+            VLOG_WARN_RL(&rl, "Netlink attribute with incorrect size.");
+        }
+        upcall->hash_1 = mhash_finish(upcall->hash_1, n_bytes_1);
+        upcall->hash_2 = mhash_finish(upcall->hash_2, n_bytes_2);
+
+        /* Selects the upcall_queue based on L2 and L3 header hash. */
+        idx = upcall->hash_1 % UPCALL_QUEUES;
+        queue = &udpif->upcall_queues[idx];
+        /* Drops the upcall if the queue is full (UPCALL_QUEUE_LENGTH). */
+        if (!guarded_list_push_back(queue, &upcall->list_node,
+                                    UPCALL_QUEUE_LENGTH)) {
+            upcall_destroy(upcall);
+        } else {
+            /* If successfully inserted, checks if the queue should be recorded
+             * in the non_empty_list. */
+            ovs_mutex_lock(&udpif->mutex);
+            if (!udpif->non_empty_list_map[idx]
+                && !guarded_list_is_empty(queue)) {
+                struct non_empty_list_entry *entry = xmalloc(sizeof *entry);
+
+                entry->value = idx;
+                udpif->non_empty_list_map[idx] = 1;
+                list_push_back(&udpif->non_empty_list, &entry->list_node);
+                if (list_is_singleton(&udpif->non_empty_list)) {
+                    seq_change(udpif->distributor_seq);
                 }
             }
+            ovs_mutex_unlock(&udpif->mutex);
         }
-        hash =  mhash_finish(hash, n_bytes);
+    }
+}

-        handler = &udpif->handlers[hash % udpif->n_handlers];
+static void
+distribute_upcalls(struct udpif *udpif)
+{
+    int n;
+
+    while (!list_is_empty(&udpif->non_empty_list)) {
+        struct non_empty_list_entry *front;
+        struct guarded_list *queue;
+        struct upcall *upcall;
+        struct handler *handler;
+
+        front = CONTAINER_OF(list_front(&udpif->non_empty_list),
+                             struct non_empty_list_entry, list_node);
+
+        queue = &udpif->upcall_queues[front->value];
+        upcall = CONTAINER_OF(guarded_list_front(queue),
+                              struct upcall, list_node);
+
+        /* Selects the handler based on the IN_PORT and L4 header hash. */
+        handler = &udpif->handlers[upcall->hash_2 % udpif->n_handlers];

         ovs_mutex_lock(&handler->mutex);
-        if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
-            list_push_back(&handler->upcalls, &upcall->list_node);
-            if (handler->n_upcalls == 0) {
-                handler->need_signal = true;
-            }
-            handler->n_upcalls++;
-            if (handler->need_signal &&
-                handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
-                handler->need_signal = false;
-                xpthread_cond_signal(&handler->wake_cond);
-            }
+        if (handler->n_upcalls >= MAX_QUEUE_LENGTH) {
             ovs_mutex_unlock(&handler->mutex);
-            if (!VLOG_DROP_DBG(&rl)) {
-                struct ds ds = DS_EMPTY_INITIALIZER;
-
-                odp_flow_key_format(upcall->dpif_upcall.key,
-                                    upcall->dpif_upcall.key_len,
-                                    &ds);
-                VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds));
-                ds_destroy(&ds);
+
+            /* Handler queue full: moves the front node to the back. */
+            ovs_mutex_lock(&udpif->mutex);
+            list_pop_front(&udpif->non_empty_list);
+            list_push_back(&udpif->non_empty_list, &front->list_node);
+
+            /* Updates the hol_block_map. */
+            if (!udpif->hol_block_map[front->value]) {
+                udpif->hol_block_map[front->value] = 1;
             }
+
+            /* If all non-empty queues are blocked by their head upcalls,
+             * breaks out of the loop. */
+            if (!memcmp(udpif->hol_block_map, udpif->non_empty_list_map,
+                        sizeof udpif->hol_block_map)) {
+                ovs_mutex_unlock(&udpif->mutex);
+                break;
+            }
+            ovs_mutex_unlock(&udpif->mutex);
+            continue;
+        }
+
+        /* The queue is not blocked, so clears its flag in hol_block_map. */
+        if (udpif->hol_block_map[front->value]) {
+            udpif->hol_block_map[front->value] = 0;
+        }
+
+        /* Pops off the front upcall, since the handler can accept it. */
+        guarded_list_pop_front(queue);
+        list_push_back(&handler->upcalls, &upcall->list_node);
+        if (handler->n_upcalls == 0) {
+            handler->need_signal = true;
+        }
+        handler->n_upcalls++;
+        if (handler->need_signal &&
+            handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
+            handler->need_signal = false;
+            xpthread_cond_signal(&handler->wake_cond);
+        }
+        ovs_mutex_unlock(&handler->mutex);
+        if (!VLOG_DROP_DBG(&rl)) {
+            struct ds ds = DS_EMPTY_INITIALIZER;
+
+            odp_flow_key_format(upcall->dpif_upcall.key,
+                                upcall->dpif_upcall.key_len,
+                                &ds);
+            VLOG_DBG("distributor: enqueue (%s)", ds_cstr(&ds));
+            ds_destroy(&ds);
+        }
+
+        /* Does not insert 'front' back into non_empty_list if the
+         * queue has become empty. */
+        ovs_mutex_lock(&udpif->mutex);
+        list_pop_front(&udpif->non_empty_list);
+        if (guarded_list_is_empty(queue)) {
+            udpif->non_empty_list_map[front->value] = 0;
+            free(front);
         } else {
-            ovs_mutex_unlock(&handler->mutex);
-            COVERAGE_INC(upcall_queue_overflow);
-            upcall_destroy(upcall);
+            list_push_back(&udpif->non_empty_list, &front->list_node);
         }
+        ovs_mutex_unlock(&udpif->mutex);
     }

     for (n = 0; n < udpif->n_handlers; ++n) {
@@ -569,6 +767,8 @@ recv_upcalls(struct udpif *udpif)
             ovs_mutex_unlock(&handler->mutex);
         }
     }
+
+    memset(udpif->hol_block_map, 0, sizeof udpif->hol_block_map);
 }

 static struct flow_miss *
--
1.7.9.5