[ovs-dev] [PATCH RFC V9] ofproto-dpif-upcall: Remove the dispatcher thread.

Alex Wang alexw@nicira.com
Fri Apr 18 21:13:09 UTC 2014


With the foundation laid in previous commits, this commit
removes the 'dispatcher' thread by allowing 'handler'
threads to read upcalls directly from dpif.

This commit significantly simplifies the flow miss handling
code and brings a slight improvement to the flow setup rate.

Signed-off-by: Alex Wang <alexw@nicira.com>

---
NOTE:
- Somehow this patch brings back a race in the unit tests.  A few tests
  fail with:
  "WARN|dummy@ovs-dummy: failed to flow_get (No such file or directory)"
  "WARN|dummy@ovs-dummy: failed to flow_del (No such file or directory)"

  So, I'm looking for a high-level review while I keep tracking down the
  race.  A condensed sketch of the new per-handler receive loop follows
  for reference.
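
  The sketch below is condensed from udpif_upcall_handler() in the diff;
  it only shows the shape of the new loop, with the cleanup of the miss
  hmap and upcall buffers omitted:

    /* Condensed from udpif_upcall_handler(): each handler thread now reads
     * its own batch of upcalls directly from the dpif channel identified by
     * handler->handler_id, instead of waiting on a queue filled by the
     * dispatcher thread. */
    while (!latch_is_set(&udpif->exit_latch)) {
        struct upcall upcalls[FLOW_MISS_MAX_BATCH];
        struct flow_miss miss_buf[FLOW_MISS_MAX_BATCH];
        size_t n_upcalls;

        n_upcalls = read_upcalls(handler, upcalls, miss_buf, &misses);
        if (!n_upcalls) {
            /* Nothing to read: sleep until the dpif channel or the exit
             * latch becomes readable. */
            dpif_recv_wait(udpif->dpif, handler->handler_id);
            latch_wait(&udpif->exit_latch);
            poll_block();
        } else {
            handle_upcalls(handler, &misses, upcalls, n_upcalls);
            /* ...then uninit each miss's xout and each upcall's buffers. */
        }
    }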
---
V8 -> V9:
- remove a comment that did not read like a comment.
- inline the code of upcall_destroy() at its call sites.
- add a new function, read_upcalls(), for reading and classifying upcalls
  from dpif.

V7 -> V8:
- rebase.

V6 -> V7:
- rebase.

V5 -> V6:
- remove the input argument 'free_upcall' from upcall_destroy().
- destroy all upcalls at the end of handle_upcalls().

V4 -> V5:
- rebase.

V3 -> V4:
- rebase.

V2 -> V3:
- rebase.

PATCH -> V2:
- rebase.
---
 ofproto/ofproto-dpif-upcall.c |  372 +++++++++++++----------------------------
 ofproto/ofproto-dpif-xlate.c  |    6 +-
 2 files changed, 123 insertions(+), 255 deletions(-)

diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index 0d7dd8e..987f1ad 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -45,26 +45,13 @@
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
 
-COVERAGE_DEFINE(upcall_queue_overflow);
-
-/* A thread that processes each upcall handed to it by the dispatcher thread,
- * forwards the upcall's packet, and possibly sets up a kernel flow as a
- * cache. */
+/* A thread that reads upcalls from dpif, forwards each upcall's packet,
+ * and possibly sets up a kernel flow as a cache. */
 struct handler {
     struct udpif *udpif;               /* Parent udpif. */
     pthread_t thread;                  /* Thread ID. */
     char *name;                        /* Thread name. */
-
-    struct ovs_mutex mutex;            /* Mutex guarding the following. */
-
-    /* Atomic queue of unprocessed upcalls. */
-    struct list upcalls OVS_GUARDED;
-    size_t n_upcalls OVS_GUARDED;
-
-    bool need_signal;                  /* Only changed by the dispatcher. */
-
-    pthread_cond_t wake_cond;          /* Wakes 'thread' while holding
-                                          'mutex'. */
+    uint32_t handler_id;               /* Handler id. */
 };
 
 /* A thread that processes each kernel flow handed to it by the flow_dumper
@@ -87,14 +74,24 @@ struct revalidator {
 
 /* An upcall handler for ofproto_dpif.
  *
- * udpif has two logically separate pieces:
+ * udpif keeps records of two kinds of logically separate units:
+ *
+ * upcall handling
+ * ---------------
+ *
+ *    - An array of 'struct handler's for upcall handling and flow
+ *      installation.
  *
- *    - A "dispatcher" thread that reads upcalls from the kernel and dispatches
- *      them to one of several "handler" threads (see struct handler).
+ * flow revalidation
+ * -----------------
  *
- *    - A "flow_dumper" thread that reads the kernel flow table and dispatches
+ *    - An array of 'struct revalidator's for flow revalidation and
+ *      stats collection.
+ *
+ *    - "flow_dumper" thread that reads the kernel flow table and dispatches
  *      flows to one of several "revalidator" threads (see struct
- *      revalidator). */
+ *      revalidator).
+ */
 struct udpif {
     struct list list_node;             /* In all_udpifs list. */
 
@@ -103,7 +100,6 @@ struct udpif {
 
     uint32_t secret;                   /* Random seed for upcall hash. */
 
-    pthread_t dispatcher;              /* Dispatcher thread ID. */
     pthread_t flow_dumper;             /* Flow dumper thread ID. */
 
     struct handler *handlers;          /* Upcall handlers. */
@@ -143,7 +139,7 @@ enum upcall_type {
 };
 
 struct upcall {
-    struct list list_node;          /* For queuing upcalls. */
+    bool is_valid;                  /* If the upcall can be used. */
     struct flow_miss *flow_miss;    /* This upcall's flow_miss. */
 
     /* Raw upcall plus data for keeping track of the memory backing it. */
@@ -219,15 +215,16 @@ struct flow_miss {
     bool put;
 };
 
-static void upcall_destroy(struct upcall *);
-
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
 
-static void recv_upcalls(struct udpif *);
-static void handle_upcalls(struct handler *handler, struct list *upcalls);
+static size_t read_upcalls(struct handler *,
+                           struct upcall upcalls[FLOW_MISS_MAX_BATCH],
+                           struct flow_miss miss_buf[FLOW_MISS_MAX_BATCH],
+                           struct hmap *);
+static void handle_upcalls(struct handler *, struct hmap *, struct upcall *,
+                           size_t n_upcalls);
 static void *udpif_flow_dumper(void *);
-static void *udpif_dispatcher(void *);
 static void *udpif_upcall_handler(void *);
 static void *udpif_revalidator(void *);
 static uint64_t udpif_get_n_flows(struct udpif *);
@@ -315,9 +312,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         for (i = 0; i < udpif->n_handlers; i++) {
             struct handler *handler = &udpif->handlers[i];
 
-            ovs_mutex_lock(&handler->mutex);
-            xpthread_cond_signal(&handler->wake_cond);
-            ovs_mutex_unlock(&handler->mutex);
             xpthread_join(handler->thread, NULL);
         }
 
@@ -331,7 +325,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         }
 
         xpthread_join(udpif->flow_dumper, NULL);
-        xpthread_join(udpif->dispatcher, NULL);
 
         for (i = 0; i < udpif->n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
@@ -353,17 +346,7 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         }
 
         for (i = 0; i < udpif->n_handlers; i++) {
-            struct handler *handler = &udpif->handlers[i];
-            struct upcall *miss, *next;
-
-            LIST_FOR_EACH_SAFE (miss, next, list_node, &handler->upcalls) {
-                list_remove(&miss->list_node);
-                upcall_destroy(miss);
-            }
-            ovs_mutex_destroy(&handler->mutex);
-
-            xpthread_cond_destroy(&handler->wake_cond);
-            free(handler->name);
+            free(udpif->handlers[i].name);
         }
         latch_poll(&udpif->exit_latch);
 
@@ -376,7 +359,7 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         udpif->n_handlers = 0;
     }
 
-    error = dpif_handlers_set(udpif->dpif, 1);
+    error = dpif_handlers_set(udpif->dpif, n_handlers);
     if (error) {
         VLOG_ERR("failed to configure handlers in dpif %s: %s",
                  dpif_name(udpif->dpif), ovs_strerror(error));
@@ -395,10 +378,7 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
             struct handler *handler = &udpif->handlers[i];
 
             handler->udpif = udpif;
-            list_init(&handler->upcalls);
-            handler->need_signal = false;
-            xpthread_cond_init(&handler->wake_cond, NULL);
-            ovs_mutex_init(&handler->mutex);
+            handler->handler_id = i;
             xpthread_create(&handler->thread, NULL, udpif_upcall_handler,
                             handler);
         }
@@ -416,7 +396,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
             xpthread_create(&revalidator->thread, NULL, udpif_revalidator,
                             revalidator);
         }
-        xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
         xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, udpif);
     }
 
@@ -461,16 +440,9 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
 {
     size_t i;
 
-    simap_increase(usage, "dispatchers", 1);
     simap_increase(usage, "flow_dumpers", 1);
 
     simap_increase(usage, "handlers", udpif->n_handlers);
-    for (i = 0; i < udpif->n_handlers; i++) {
-        struct handler *handler = &udpif->handlers[i];
-        ovs_mutex_lock(&handler->mutex);
-        simap_increase(usage, "handler upcalls",  handler->n_upcalls);
-        ovs_mutex_unlock(&handler->mutex);
-    }
 
     simap_increase(usage, "revalidators", udpif->n_revalidators);
     for (i = 0; i < udpif->n_revalidators; i++) {
@@ -512,17 +484,6 @@ udpif_flush_all_datapaths(void)
 }
 
 
-/* Destroys and deallocates 'upcall'. */
-static void
-upcall_destroy(struct upcall *upcall)
-{
-    if (upcall) {
-        ofpbuf_uninit(&upcall->dpif_upcall.packet);
-        ofpbuf_uninit(&upcall->upcall_buf);
-        free(upcall);
-    }
-}
-
 static uint64_t
 udpif_get_n_flows(struct udpif *udpif)
 {
@@ -545,24 +506,6 @@ udpif_get_n_flows(struct udpif *udpif)
     return flow_count;
 }
 
-/* The dispatcher thread is responsible for receiving upcalls from the kernel,
- * assigning them to a upcall_handler thread. */
-static void *
-udpif_dispatcher(void *arg)
-{
-    struct udpif *udpif = arg;
-
-    set_subprogram_name("dispatcher");
-    while (!latch_is_set(&udpif->exit_latch)) {
-        recv_upcalls(udpif);
-        dpif_recv_wait(udpif->dpif, 0);
-        latch_wait(&udpif->exit_latch);
-        poll_block();
-    }
-
-    return NULL;
-}
-
 static void *
 udpif_flow_dumper(void *arg)
 {
@@ -682,43 +625,45 @@ skip:
     return NULL;
 }
 
-/* The miss handler thread is responsible for processing miss upcalls retrieved
- * by the dispatcher thread.  Once finished it passes the processed miss
- * upcalls to ofproto-dpif where they're installed in the datapath. */
+/* The upcall handler thread tries to read a batch of FLOW_MISS_MAX_BATCH
+ * upcalls from dpif, processes the batch and installs corresponding flows
+ * in dpif. */
 static void *
 udpif_upcall_handler(void *arg)
 {
     struct handler *handler = arg;
+    struct udpif *udpif = handler->udpif;
+    struct hmap misses = HMAP_INITIALIZER(&misses);
 
     handler->name = xasprintf("handler_%u", ovsthread_id_self());
     set_subprogram_name("%s", handler->name);
 
     while (!latch_is_set(&handler->udpif->exit_latch)) {
-        struct list misses = LIST_INITIALIZER(&misses);
-        size_t i;
-
-        ovs_mutex_lock(&handler->mutex);
-        /* Must check the 'exit_latch' again to make sure the main thread is
-         * not joining on the handler thread. */
-        if (!handler->n_upcalls
-            && !latch_is_set(&handler->udpif->exit_latch)) {
-            ovs_mutex_cond_wait(&handler->wake_cond, &handler->mutex);
-        }
+        struct upcall upcalls[FLOW_MISS_MAX_BATCH];
+        struct flow_miss miss_buf[FLOW_MISS_MAX_BATCH];
+        struct flow_miss *miss;
+        size_t n_upcalls, i;
+
+        n_upcalls = read_upcalls(handler, upcalls, miss_buf, &misses);
+        if (!n_upcalls) {
+            dpif_recv_wait(udpif->dpif, handler->handler_id);
+            latch_wait(&udpif->exit_latch);
+            poll_block();
+        } else {
+            handle_upcalls(handler, &misses, upcalls, n_upcalls);
 
-        for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
-            if (handler->n_upcalls) {
-                handler->n_upcalls--;
-                list_push_back(&misses, list_pop_front(&handler->upcalls));
-            } else {
-                break;
+            HMAP_FOR_EACH (miss, hmap_node, &misses) {
+                xlate_out_uninit(&miss->xout);
+            }
+            hmap_clear(&misses);
+            for (i = 0; i < n_upcalls; i++) {
+                ofpbuf_uninit(&upcalls[i].dpif_upcall.packet);
+                ofpbuf_uninit(&upcalls[i].upcall_buf);
             }
         }
-        ovs_mutex_unlock(&handler->mutex);
-
-        handle_upcalls(handler, &misses);
-
         coverage_clear();
     }
+    hmap_destroy(&misses);
 
     return NULL;
 }
@@ -823,98 +768,12 @@ classify_upcall(const struct upcall *upcall)
     }
 }
 
-static void
-recv_upcalls(struct udpif *udpif)
-{
-    int n;
-
-    for (;;) {
-        uint32_t hash = udpif->secret;
-        struct handler *handler;
-        struct upcall *upcall;
-        size_t n_bytes, left;
-        struct nlattr *nla;
-        int error;
-
-        upcall = xmalloc(sizeof *upcall);
-        ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
-                        sizeof upcall->upcall_stub);
-        error = dpif_recv(udpif->dpif, 0, &upcall->dpif_upcall,
-                          &upcall->upcall_buf);
-        if (error) {
-            /* upcall_destroy() can only be called on successfully received
-             * upcalls. */
-            ofpbuf_uninit(&upcall->upcall_buf);
-            free(upcall);
-            break;
-        }
-
-        n_bytes = 0;
-        NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key,
-                          upcall->dpif_upcall.key_len) {
-            enum ovs_key_attr type = nl_attr_type(nla);
-            if (type == OVS_KEY_ATTR_IN_PORT
-                || type == OVS_KEY_ATTR_TCP
-                || type == OVS_KEY_ATTR_UDP) {
-                if (nl_attr_get_size(nla) == 4) {
-                    hash = mhash_add(hash, nl_attr_get_u32(nla));
-                    n_bytes += 4;
-                } else {
-                    VLOG_WARN_RL(&rl,
-                                 "Netlink attribute with incorrect size.");
-                }
-            }
-        }
-        hash =  mhash_finish(hash, n_bytes);
-
-        handler = &udpif->handlers[hash % udpif->n_handlers];
-
-        ovs_mutex_lock(&handler->mutex);
-        if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
-            list_push_back(&handler->upcalls, &upcall->list_node);
-            if (handler->n_upcalls == 0) {
-                handler->need_signal = true;
-            }
-            handler->n_upcalls++;
-            if (handler->need_signal &&
-                handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
-                handler->need_signal = false;
-                xpthread_cond_signal(&handler->wake_cond);
-            }
-            ovs_mutex_unlock(&handler->mutex);
-            if (!VLOG_DROP_DBG(&rl)) {
-                struct ds ds = DS_EMPTY_INITIALIZER;
-
-                odp_flow_key_format(upcall->dpif_upcall.key,
-                                    upcall->dpif_upcall.key_len,
-                                    &ds);
-                VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds));
-                ds_destroy(&ds);
-            }
-        } else {
-            ovs_mutex_unlock(&handler->mutex);
-            COVERAGE_INC(upcall_queue_overflow);
-            upcall_destroy(upcall);
-        }
-    }
-
-    for (n = 0; n < udpif->n_handlers; ++n) {
-        struct handler *handler = &udpif->handlers[n];
-
-        if (handler->need_signal) {
-            handler->need_signal = false;
-            ovs_mutex_lock(&handler->mutex);
-            xpthread_cond_signal(&handler->wake_cond);
-            ovs_mutex_unlock(&handler->mutex);
-        }
-    }
-}
-
 /* Calculates slow path actions for 'xout'.  'buf' must statically be
  * initialized with at least 128 bytes of space. */
 static void
 compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
-                  odp_port_t odp_in_port, struct ofpbuf *buf)
+                  struct flow *flow, odp_port_t odp_in_port,
+                  struct ofpbuf *buf)
 {
     union user_action_cookie cookie;
     odp_port_t port;
@@ -927,7 +786,7 @@ compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
     port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
         ? ODPP_NONE
         : odp_in_port;
-    pid = dpif_port_get_pid(udpif->dpif, port, 0);
+    pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
     odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
 }
 
@@ -946,26 +805,22 @@ flow_miss_find(struct hmap *todo, const struct ofproto_dpif *ofproto,
     return NULL;
 }
 
-static void
-handle_upcalls(struct handler *handler, struct list *upcalls)
+/* Reads and classifies upcalls. */
+static size_t
+read_upcalls(struct handler *handler,
+             struct upcall upcalls[FLOW_MISS_MAX_BATCH],
+             struct flow_miss miss_buf[FLOW_MISS_MAX_BATCH],
+             struct hmap *misses)
 {
-    struct hmap misses = HMAP_INITIALIZER(&misses);
     struct udpif *udpif = handler->udpif;
+    size_t i;
+    size_t n_misses = 0;
+    size_t n_upcalls = 0;
 
-    struct flow_miss miss_buf[FLOW_MISS_MAX_BATCH];
-    struct dpif_op *opsp[FLOW_MISS_MAX_BATCH * 2];
-    struct dpif_op ops[FLOW_MISS_MAX_BATCH * 2];
-    struct flow_miss *miss, *next_miss;
-    struct upcall *upcall, *next;
-    size_t n_misses, n_ops, i;
-    unsigned int flow_limit;
-    bool fail_open, may_put;
-    enum upcall_type type;
-
-    atomic_read(&udpif->flow_limit, &flow_limit);
-    may_put = udpif_get_n_flows(udpif) < flow_limit;
-
-    /* Extract the flow from each upcall.  Construct in 'misses' a hash table
+    /*
+     * Try reading FLOW_MISS_MAX_BATCH upcalls from dpif.
+     *
+     * Extract the flow from each upcall.  Construct in 'misses' a hash table
      * that maps each unique flow to a 'struct flow_miss'.
      *
      * Most commonly there is a single packet per flow_miss, but there are
@@ -984,19 +839,31 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
      *     other end of the connection, which gives OVS a chance to set up a
      *     datapath flow.)
      */
-    n_misses = 0;
-    LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
-        struct dpif_upcall *dupcall = &upcall->dpif_upcall;
+    for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
+        struct upcall *upcall = &upcalls[n_upcalls];
         struct flow_miss *miss = &miss_buf[n_misses];
-        struct ofpbuf *packet = &dupcall->packet;
+        struct dpif_upcall *dupcall;
+        struct ofpbuf *packet;
         struct flow_miss *existing_miss;
         struct ofproto_dpif *ofproto;
         struct dpif_sflow *sflow;
         struct dpif_ipfix *ipfix;
-        odp_port_t odp_in_port;
         struct flow flow;
+        enum upcall_type type;
+        odp_port_t odp_in_port;
         int error;
 
+        ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
+                        sizeof upcall->upcall_stub);
+        error = dpif_recv(udpif->dpif, handler->handler_id,
+                          &upcall->dpif_upcall, &upcall->upcall_buf);
+        if (error) {
+            ofpbuf_uninit(&upcall->upcall_buf);
+            break;
+        }
+
+        dupcall = &upcall->dpif_upcall;
+        packet = &dupcall->packet;
         error = xlate_receive(udpif->backer, packet, dupcall->key,
                               dupcall->key_len, &flow,
                               &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
@@ -1014,9 +881,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
                               dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
                               NULL);
             }
-            list_remove(&upcall->list_node);
-            upcall_destroy(upcall);
-            continue;
+            goto destroy_upcall;
         }
 
         type = classify_upcall(upcall);
@@ -1026,10 +891,10 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
 
             flow_extract(packet, &md, &miss->flow);
             hash = flow_hash(&miss->flow, 0);
-            existing_miss = flow_miss_find(&misses, ofproto, &miss->flow,
+            existing_miss = flow_miss_find(misses, ofproto, &miss->flow,
                                            hash);
             if (!existing_miss) {
-                hmap_insert(&misses, &miss->hmap_node, hash);
+                hmap_insert(misses, &miss->hmap_node, hash);
                 miss->ofproto = ofproto;
                 miss->key = dupcall->key;
                 miss->key_len = dupcall->key_len;
@@ -1040,7 +905,6 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
                 miss->stats.tcp_flags = 0;
                 miss->odp_in_port = odp_in_port;
                 miss->put = false;
-
                 n_misses++;
             } else {
                 miss = existing_miss;
@@ -1050,6 +914,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
             miss->stats.n_packets++;
 
             upcall->flow_miss = miss;
+            n_upcalls++;
             continue;
         }
 
@@ -1096,10 +961,29 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
         dpif_ipfix_unref(ipfix);
         dpif_sflow_unref(sflow);
 
-        list_remove(&upcall->list_node);
-        upcall_destroy(upcall);
+destroy_upcall:
+        ofpbuf_uninit(&upcall->dpif_upcall.packet);
+        ofpbuf_uninit(&upcall->upcall_buf);
     }
 
+    return n_upcalls;
+}
+
+static void
+handle_upcalls(struct handler *handler, struct hmap *misses,
+               struct upcall *upcalls, size_t n_upcalls)
+{
+    struct udpif *udpif = handler->udpif;
+    struct dpif_op *opsp[FLOW_MISS_MAX_BATCH * 2];
+    struct dpif_op ops[FLOW_MISS_MAX_BATCH * 2];
+    struct flow_miss *miss;
+    size_t n_ops, i;
+    unsigned int flow_limit;
+    bool fail_open, may_put;
+
+    atomic_read(&udpif->flow_limit, &flow_limit);
+    may_put = udpif_get_n_flows(udpif) < flow_limit;
+
     /* Initialize each 'struct flow_miss's ->xout.
      *
      * We do this per-flow_miss rather than per-packet because, most commonly,
@@ -1108,7 +992,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
      * We can't do this in the previous loop because we need the TCP flags for
      * all the packets in each miss. */
     fail_open = false;
-    HMAP_FOR_EACH (miss, hmap_node, &misses) {
+    HMAP_FOR_EACH (miss, hmap_node, misses) {
         struct xlate_in xin;
 
         xlate_in_init(&xin, miss->ofproto, &miss->flow, NULL,
@@ -1140,7 +1024,8 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
      * The loop fills 'ops' with an array of operations to execute in the
      * datapath. */
     n_ops = 0;
-    LIST_FOR_EACH (upcall, list_node, upcalls) {
+    for (i = 0; i < n_upcalls; i++) {
+        struct upcall *upcall = &upcalls[i];
         struct flow_miss *miss = upcall->flow_miss;
         struct ofpbuf *packet = &upcall->dpif_upcall.packet;
         struct dpif_op *op;
@@ -1219,7 +1104,8 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
 
                 ofpbuf_use_stack(&buf, miss->slow_path_buf,
                                  sizeof miss->slow_path_buf);
-                compose_slow_path(udpif, &miss->xout, miss->odp_in_port, &buf);
+                compose_slow_path(udpif, &miss->xout, &miss->flow,
+                                  miss->odp_in_port, &buf);
                 op->u.flow_put.actions = ofpbuf_data(&buf);
                 op->u.flow_put.actions_len = ofpbuf_size(&buf);
             }
@@ -1254,7 +1140,8 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
      *
      * Copy packets before they are modified by execution. */
     if (fail_open) {
-        LIST_FOR_EACH (upcall, list_node, upcalls) {
+        for (i = 0; i < n_upcalls; i++) {
+            struct upcall *upcall = &upcalls[i];
             struct flow_miss *miss = upcall->flow_miss;
             struct ofpbuf *packet = &upcall->dpif_upcall.packet;
             struct ofproto_packet_in *pin;
@@ -1277,17 +1164,6 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
         opsp[i] = &ops[i];
     }
     dpif_operate(udpif->dpif, opsp, n_ops);
-
-    HMAP_FOR_EACH_SAFE (miss, next_miss, hmap_node, &misses) {
-        hmap_remove(&misses, &miss->hmap_node);
-        xlate_out_uninit(&miss->xout);
-    }
-    hmap_destroy(&misses);
-
-    LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
-        list_remove(&upcall->list_node);
-        upcall_destroy(upcall);
-    }
 }
 
 static struct udpif_key *
@@ -1450,7 +1326,7 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
                          ofpbuf_size(&xout.odp_actions));
     } else {
         ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
-        compose_slow_path(udpif, &xout, odp_in_port, &xout_actions);
+        compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
     }
 
     if (!ofpbuf_equal(&xout_actions, actions)) {
@@ -1718,16 +1594,6 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
         ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
 
         ds_put_char(&ds, '\n');
-        for (i = 0; i < udpif->n_handlers; i++) {
-            struct handler *handler = &udpif->handlers[i];
-
-            ovs_mutex_lock(&handler->mutex);
-            ds_put_format(&ds, "\t%s: (upcall queue %"PRIuSIZE")\n",
-                          handler->name, handler->n_upcalls);
-            ovs_mutex_unlock(&handler->mutex);
-        }
-
-        ds_put_char(&ds, '\n');
         for (i = 0; i < n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
 
diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c
index 78813dd..e4e8246 100644
--- a/ofproto/ofproto-dpif-xlate.c
+++ b/ofproto/ofproto-dpif-xlate.c
@@ -1608,8 +1608,10 @@ compose_sample_action(const struct xbridge *xbridge,
     actions_offset = nl_msg_start_nested(odp_actions, OVS_SAMPLE_ATTR_ACTIONS);
 
     odp_port = ofp_port_to_odp_port(xbridge, flow->in_port.ofp_port);
-    pid = dpif_port_get_pid(xbridge->dpif, odp_port, 0);
-    cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size, odp_actions);
+    pid = dpif_port_get_pid(xbridge->dpif, odp_port,
+                            flow_hash_5tuple(flow, 0));
+    cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size,
+                                             odp_actions);
 
     nl_msg_end_nested(odp_actions, actions_offset);
     nl_msg_end_nested(odp_actions, sample_offset);
-- 
1.7.9.5