[ovs-dev] [PATCH V5 5/5] ofproto-dpif-upcall: Remove the dispatcher thread.

Alex Wang alexw at nicira.com
Wed Mar 19 20:35:07 UTC 2014


With the foundation laid in previous commits, this commit
removes the 'dispatcher' thread by allowing 'handler'
threads to read upcalls directly from dpif.

This commit significantly simplifies the flow miss handling
code and brings a slight improvement to the flow setup rate.

Note:
- The flow setup rate improvement is more noticeable when this
  commit is applied on top of Jarno's "datapath: Reduce lock
  contention." patch series.

Signed-off-by: Alex Wang <alexw at nicira.com>

---
V4 -> V5:
- rebase.

V3 -> V4:
- rebase.

V2 -> V3:
- rebase.

PATCH -> V2:
- rebase.
---
 ofproto/ofproto-dpif-upcall.c |  284 ++++++++++++-----------------------------
 ofproto/ofproto-dpif-xlate.c  |    6 +-
 2 files changed, 84 insertions(+), 206 deletions(-)

diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index 25d7a84..0704abc 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -45,26 +45,13 @@
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
 
-COVERAGE_DEFINE(upcall_queue_overflow);
-
-/* A thread that processes each upcall handed to it by the dispatcher thread,
- * forwards the upcall's packet, and possibly sets up a kernel flow as a
- * cache. */
+/* A thread that reads upcalls from dpif, forwards each upcall's packet,
+ * and possibly sets up a kernel flow as a cache. */
 struct handler {
     struct udpif *udpif;               /* Parent udpif. */
     pthread_t thread;                  /* Thread ID. */
     char *name;                        /* Thread name. */
-
-    struct ovs_mutex mutex;            /* Mutex guarding the following. */
-
-    /* Atomic queue of unprocessed upcalls. */
-    struct list upcalls OVS_GUARDED;
-    size_t n_upcalls OVS_GUARDED;
-
-    bool need_signal;                  /* Only changed by the dispatcher. */
-
-    pthread_cond_t wake_cond;          /* Wakes 'thread' while holding
-                                          'mutex'. */
+    uint32_t handler_id;               /* Handler id. */
 };
 
 /* A thread that processes each kernel flow handed to it by the flow_dumper
@@ -87,12 +74,18 @@ struct revalidator {
 
 /* An upcall handler for ofproto_dpif.
  *
- * udpif has two logically separate pieces:
+ * udpif keeps records of two kinds of logically separate units:
+ *
+ *    - An array of 'struct handler's for upcall handling and flow
+ *      installation.
+ *
+ *    - An array of 'struct revalidator's for flow revalidation and
+ *      stats collection.
  *
- *    - A "dispatcher" thread that reads upcalls from the kernel and dispatches
- *      them to one of several "handler" threads (see struct handler).
+ * The following module is currently in use but is going to be removed
+ * in the near future:
  *
- *    - A "flow_dumper" thread that reads the kernel flow table and dispatches
+ *    - "flow_dumper" thread that reads the kernel flow table and dispatches
  *      flows to one of several "revalidator" threads (see struct
  *      revalidator). */
 struct udpif {
@@ -103,7 +96,6 @@ struct udpif {
 
     uint32_t secret;                   /* Random seed for upcall hash. */
 
-    pthread_t dispatcher;              /* Dispatcher thread ID. */
     pthread_t flow_dumper;             /* Flow dumper thread ID. */
 
     struct handler *handlers;          /* Upcall handlers. */
@@ -143,7 +135,7 @@ enum upcall_type {
 };
 
 struct upcall {
-    struct list list_node;          /* For queuing upcalls. */
+    bool is_valid;                  /* If the upcall can be used. */
     struct flow_miss *flow_miss;    /* This upcall's flow_miss. */
 
     /* Raw upcall plus data for keeping track of the memory backing it. */
@@ -216,15 +208,14 @@ struct flow_miss {
     bool put;
 };
 
-static void upcall_destroy(struct upcall *);
+static void upcall_destroy(struct upcall *, bool free_upcall);
 
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
 
-static void recv_upcalls(struct udpif *);
-static void handle_upcalls(struct handler *handler, struct list *upcalls);
+static void handle_upcalls(struct handler *handler, struct upcall *upcalls,
+                           size_t n_upcalls);
 static void *udpif_flow_dumper(void *);
-static void *udpif_dispatcher(void *);
 static void *udpif_upcall_handler(void *);
 static void *udpif_revalidator(void *);
 static uint64_t udpif_get_n_flows(struct udpif *);
@@ -312,9 +303,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         for (i = 0; i < udpif->n_handlers; i++) {
             struct handler *handler = &udpif->handlers[i];
 
-            ovs_mutex_lock(&handler->mutex);
-            xpthread_cond_signal(&handler->wake_cond);
-            ovs_mutex_unlock(&handler->mutex);
             xpthread_join(handler->thread, NULL);
         }
 
@@ -328,7 +316,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         }
 
         xpthread_join(udpif->flow_dumper, NULL);
-        xpthread_join(udpif->dispatcher, NULL);
 
         for (i = 0; i < udpif->n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
@@ -350,17 +337,7 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         }
 
         for (i = 0; i < udpif->n_handlers; i++) {
-            struct handler *handler = &udpif->handlers[i];
-            struct upcall *miss, *next;
-
-            LIST_FOR_EACH_SAFE (miss, next, list_node, &handler->upcalls) {
-                list_remove(&miss->list_node);
-                upcall_destroy(miss);
-            }
-            ovs_mutex_destroy(&handler->mutex);
-
-            xpthread_cond_destroy(&handler->wake_cond);
-            free(handler->name);
+            free(udpif->handlers[i].name);
         }
         latch_poll(&udpif->exit_latch);
 
@@ -373,7 +350,7 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         udpif->n_handlers = 0;
     }
 
-    error = dpif_handlers_set(udpif->dpif, 1);
+    error = dpif_handlers_set(udpif->dpif, n_handlers);
     if (error) {
         VLOG_ERR("failed to configure handlers in dpif %s: %s",
                  dpif_name(udpif->dpif), ovs_strerror(error));
@@ -392,10 +369,7 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
             struct handler *handler = &udpif->handlers[i];
 
             handler->udpif = udpif;
-            list_init(&handler->upcalls);
-            handler->need_signal = false;
-            xpthread_cond_init(&handler->wake_cond, NULL);
-            ovs_mutex_init(&handler->mutex);
+            handler->handler_id = i;
             xpthread_create(&handler->thread, NULL, udpif_upcall_handler,
                             handler);
         }
@@ -413,7 +387,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
             xpthread_create(&revalidator->thread, NULL, udpif_revalidator,
                             revalidator);
         }
-        xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
         xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, udpif);
     }
 
@@ -458,16 +431,9 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
 {
     size_t i;
 
-    simap_increase(usage, "dispatchers", 1);
     simap_increase(usage, "flow_dumpers", 1);
 
     simap_increase(usage, "handlers", udpif->n_handlers);
-    for (i = 0; i < udpif->n_handlers; i++) {
-        struct handler *handler = &udpif->handlers[i];
-        ovs_mutex_lock(&handler->mutex);
-        simap_increase(usage, "handler upcalls",  handler->n_upcalls);
-        ovs_mutex_unlock(&handler->mutex);
-    }
 
     simap_increase(usage, "revalidators", udpif->n_revalidators);
     for (i = 0; i < udpif->n_revalidators; i++) {
@@ -511,12 +477,16 @@ udpif_flush_all_datapaths(void)
 
 /* Destroys and deallocates 'upcall'. */
 static void
-upcall_destroy(struct upcall *upcall)
+upcall_destroy(struct upcall *upcall, bool free_upcall)
 {
     if (upcall) {
         ofpbuf_uninit(&upcall->dpif_upcall.packet);
         ofpbuf_uninit(&upcall->upcall_buf);
-        free(upcall);
+
+        upcall->is_valid = false;
+        if (free_upcall) {
+            free(upcall);
+        }
     }
 }
 
@@ -542,24 +512,6 @@ udpif_get_n_flows(struct udpif *udpif)
     return flow_count;
 }
 
-/* The dispatcher thread is responsible for receiving upcalls from the kernel,
- * assigning them to a upcall_handler thread. */
-static void *
-udpif_dispatcher(void *arg)
-{
-    struct udpif *udpif = arg;
-
-    set_subprogram_name("dispatcher");
-    while (!latch_is_set(&udpif->exit_latch)) {
-        recv_upcalls(udpif);
-        dpif_recv_wait(udpif->dpif, 0);
-        latch_wait(&udpif->exit_latch);
-        poll_block();
-    }
-
-    return NULL;
-}
-
 static void *
 udpif_flow_dumper(void *arg)
 {
@@ -679,38 +631,47 @@ skip:
     return NULL;
 }
 
-/* The miss handler thread is responsible for processing miss upcalls retrieved
- * by the dispatcher thread.  Once finished it passes the processed miss
- * upcalls to ofproto-dpif where they're installed in the datapath. */
+/* The upcall handler thread tries to read a batch of FLOW_MISS_MAX_BATCH
+ * upcalls from dpif, processes the batch and installs corresponding flows
+ * in dpif. */
 static void *
 udpif_upcall_handler(void *arg)
 {
     struct handler *handler = arg;
+    struct udpif *udpif = handler->udpif;
+    struct upcall upcalls[FLOW_MISS_MAX_BATCH];
 
     handler->name = xasprintf("handler_%u", ovsthread_id_self());
     set_subprogram_name("%s", handler->name);
 
     while (!latch_is_set(&handler->udpif->exit_latch)) {
-        struct list misses = LIST_INITIALIZER(&misses);
-        size_t i;
-
-        ovs_mutex_lock(&handler->mutex);
-        if (!handler->n_upcalls) {
-            ovs_mutex_cond_wait(&handler->wake_cond, &handler->mutex);
-        }
+        size_t i, n_upcalls;
 
         for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
-            if (handler->n_upcalls) {
-                handler->n_upcalls--;
-                list_push_back(&misses, list_pop_front(&handler->upcalls));
-            } else {
+            struct upcall *upcall = &upcalls[i];
+            int error;
+
+            ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
+                            sizeof upcall->upcall_stub);
+            error = dpif_recv(udpif->dpif, handler->handler_id,
+                              &upcall->dpif_upcall, &upcall->upcall_buf);
+            if (error) {
+                /* upcall_destroy() can only be called on successfully received
+                 * upcalls. */
+                ofpbuf_uninit(&upcall->upcall_buf);
                 break;
             }
+            upcall->is_valid = true;
         }
-        ovs_mutex_unlock(&handler->mutex);
-
-        handle_upcalls(handler, &misses);
 
+        n_upcalls = i;
+        if (!n_upcalls) {
+            dpif_recv_wait(udpif->dpif, handler->handler_id);
+            latch_wait(&udpif->exit_latch);
+            poll_block();
+        } else {
+            handle_upcalls(handler, upcalls, n_upcalls);
+        }
         coverage_clear();
     }
 
@@ -817,98 +778,12 @@ classify_upcall(const struct upcall *upcall)
     }
 }
 
-static void
-recv_upcalls(struct udpif *udpif)
-{
-    int n;
-
-    for (;;) {
-        uint32_t hash = udpif->secret;
-        struct handler *handler;
-        struct upcall *upcall;
-        size_t n_bytes, left;
-        struct nlattr *nla;
-        int error;
-
-        upcall = xmalloc(sizeof *upcall);
-        ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
-                        sizeof upcall->upcall_stub);
-        error = dpif_recv(udpif->dpif, 0, &upcall->dpif_upcall,
-                          &upcall->upcall_buf);
-        if (error) {
-            /* upcall_destroy() can only be called on successfully received
-             * upcalls. */
-            ofpbuf_uninit(&upcall->upcall_buf);
-            free(upcall);
-            break;
-        }
-
-        n_bytes = 0;
-        NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key,
-                          upcall->dpif_upcall.key_len) {
-            enum ovs_key_attr type = nl_attr_type(nla);
-            if (type == OVS_KEY_ATTR_IN_PORT
-                || type == OVS_KEY_ATTR_TCP
-                || type == OVS_KEY_ATTR_UDP) {
-                if (nl_attr_get_size(nla) == 4) {
-                    hash = mhash_add(hash, nl_attr_get_u32(nla));
-                    n_bytes += 4;
-                } else {
-                    VLOG_WARN_RL(&rl,
-                                 "Netlink attribute with incorrect size.");
-                }
-            }
-        }
-        hash =  mhash_finish(hash, n_bytes);
-
-        handler = &udpif->handlers[hash % udpif->n_handlers];
-
-        ovs_mutex_lock(&handler->mutex);
-        if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
-            list_push_back(&handler->upcalls, &upcall->list_node);
-            if (handler->n_upcalls == 0) {
-                handler->need_signal = true;
-            }
-            handler->n_upcalls++;
-            if (handler->need_signal &&
-                handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
-                handler->need_signal = false;
-                xpthread_cond_signal(&handler->wake_cond);
-            }
-            ovs_mutex_unlock(&handler->mutex);
-            if (!VLOG_DROP_DBG(&rl)) {
-                struct ds ds = DS_EMPTY_INITIALIZER;
-
-                odp_flow_key_format(upcall->dpif_upcall.key,
-                                    upcall->dpif_upcall.key_len,
-                                    &ds);
-                VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds));
-                ds_destroy(&ds);
-            }
-        } else {
-            ovs_mutex_unlock(&handler->mutex);
-            COVERAGE_INC(upcall_queue_overflow);
-            upcall_destroy(upcall);
-        }
-    }
-
-    for (n = 0; n < udpif->n_handlers; ++n) {
-        struct handler *handler = &udpif->handlers[n];
-
-        if (handler->need_signal) {
-            handler->need_signal = false;
-            ovs_mutex_lock(&handler->mutex);
-            xpthread_cond_signal(&handler->wake_cond);
-            ovs_mutex_unlock(&handler->mutex);
-        }
-    }
-}
-
 /* Calculates slow path actions for 'xout'.  'buf' must statically be
  * initialized with at least 128 bytes of space. */
 static void
 compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
-                  odp_port_t odp_in_port, struct ofpbuf *buf)
+                  struct flow *flow, odp_port_t odp_in_port,
+                  struct ofpbuf *buf)
 {
     union user_action_cookie cookie;
     odp_port_t port;
@@ -921,7 +796,7 @@ compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
     port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
         ? ODPP_NONE
         : odp_in_port;
-    pid = dpif_port_get_pid(udpif->dpif, port, 0);
+    pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
     odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
 }
 
@@ -941,7 +816,8 @@ flow_miss_find(struct hmap *todo, const struct ofproto_dpif *ofproto,
 }
 
 static void
-handle_upcalls(struct handler *handler, struct list *upcalls)
+handle_upcalls(struct handler *handler, struct upcall *upcalls,
+               size_t n_upcalls)
 {
     struct hmap misses = HMAP_INITIALIZER(&misses);
     struct udpif *udpif = handler->udpif;
@@ -950,7 +826,6 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
     struct dpif_op *opsp[FLOW_MISS_MAX_BATCH * 2];
     struct dpif_op ops[FLOW_MISS_MAX_BATCH * 2];
     struct flow_miss *miss, *next_miss;
-    struct upcall *upcall, *next;
     size_t n_misses, n_ops, i;
     unsigned int flow_limit;
     bool fail_open, may_put;
@@ -979,7 +854,8 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
      *     datapath flow.)
      */
     n_misses = 0;
-    LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
+    for (i = 0; i < n_upcalls; i++) {
+        struct upcall *upcall = &upcalls[i];
         struct dpif_upcall *dupcall = &upcall->dpif_upcall;
         struct flow_miss *miss = &miss_buf[n_misses];
         struct ofpbuf *packet = &dupcall->packet;
@@ -1008,8 +884,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
                               dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
                               NULL);
             }
-            list_remove(&upcall->list_node);
-            upcall_destroy(upcall);
+            upcall_destroy(upcall, false);
             continue;
         }
 
@@ -1092,8 +967,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
         dpif_ipfix_unref(ipfix);
         dpif_sflow_unref(sflow);
 
-        list_remove(&upcall->list_node);
-        upcall_destroy(upcall);
+        upcall_destroy(upcall, false);
     }
 
     /* Initialize each 'struct flow_miss's ->xout.
@@ -1136,12 +1010,17 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
      * The loop fills 'ops' with an array of operations to execute in the
      * datapath. */
     n_ops = 0;
-    LIST_FOR_EACH (upcall, list_node, upcalls) {
+    for (i = 0; i < n_upcalls; i++) {
+        struct upcall *upcall = &upcalls[i];
         struct flow_miss *miss = upcall->flow_miss;
         struct ofpbuf *packet = &upcall->dpif_upcall.packet;
         struct dpif_op *op;
         ovs_be16 flow_vlan_tci;
 
+        if (!upcall->is_valid) {
+            continue;
+        }
+
         /* Save a copy of flow.vlan_tci in case it is changed to
          * generate proper mega flow masks for VLAN splinter flows. */
         flow_vlan_tci = miss->flow.vlan_tci;
@@ -1215,7 +1094,8 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
 
                 ofpbuf_use_stack(&buf, miss->slow_path_buf,
                                  sizeof miss->slow_path_buf);
-                compose_slow_path(udpif, &miss->xout, miss->odp_in_port, &buf);
+                compose_slow_path(udpif, &miss->xout, &miss->flow,
+                                  miss->odp_in_port, &buf);
                 op->u.flow_put.actions = buf.data;
                 op->u.flow_put.actions_len = buf.size;
             }
@@ -1250,11 +1130,16 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
      *
      * Copy packets before they are modified by execution. */
     if (fail_open) {
-        LIST_FOR_EACH (upcall, list_node, upcalls) {
+        for (i = 0; i < n_upcalls; i++) {
+            struct upcall *upcall = &upcalls[i];
             struct flow_miss *miss = upcall->flow_miss;
             struct ofpbuf *packet = &upcall->dpif_upcall.packet;
             struct ofproto_packet_in *pin;
 
+            if (!upcall->is_valid) {
+                continue;
+            }
+
             pin = xmalloc(sizeof *pin);
             pin->up.packet = xmemdup(packet->data, packet->size);
             pin->up.packet_len = packet->size;
@@ -1280,9 +1165,10 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
     }
     hmap_destroy(&misses);
 
-    LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
-        list_remove(&upcall->list_node);
-        upcall_destroy(upcall);
+    for (i = 0; i < n_upcalls; i++) {
+        if (upcalls[i].is_valid) {
+            upcall_destroy(&upcalls[i], false);
+        }
     }
 }
 
@@ -1393,7 +1279,7 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
                          xout.odp_actions.size);
     } else {
         ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
-        compose_slow_path(udpif, &xout, odp_in_port, &xout_actions);
+        compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
     }
 
     if (!ofpbuf_equal(&xout_actions, actions)) {
@@ -1647,16 +1533,6 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
         ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
 
         ds_put_char(&ds, '\n');
-        for (i = 0; i < udpif->n_handlers; i++) {
-            struct handler *handler = &udpif->handlers[i];
-
-            ovs_mutex_lock(&handler->mutex);
-            ds_put_format(&ds, "\t%s: (upcall queue %"PRIuSIZE")\n",
-                          handler->name, handler->n_upcalls);
-            ovs_mutex_unlock(&handler->mutex);
-        }
-
-        ds_put_char(&ds, '\n');
         for (i = 0; i < n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
 
diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c
index 6b50ce8..10c83bf 100644
--- a/ofproto/ofproto-dpif-xlate.c
+++ b/ofproto/ofproto-dpif-xlate.c
@@ -1491,8 +1491,10 @@ compose_sample_action(const struct xbridge *xbridge,
     actions_offset = nl_msg_start_nested(odp_actions, OVS_SAMPLE_ATTR_ACTIONS);
 
     odp_port = ofp_port_to_odp_port(xbridge, flow->in_port.ofp_port);
-    pid = dpif_port_get_pid(xbridge->dpif, odp_port, 0);
-    cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size, odp_actions);
+    pid = dpif_port_get_pid(xbridge->dpif, odp_port,
+                            flow_hash_5tuple(flow, 0));
+    cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size,
+                                             odp_actions);
 
     nl_msg_end_nested(odp_actions, actions_offset);
     nl_msg_end_nested(odp_actions, sample_offset);
-- 
1.7.9.5




More information about the dev mailing list