[ovs-dev] [RFCv3 05/13] upcall: Create ukeys in handler threads.

Joe Stringer joestringer at nicira.com
Tue Sep 2 10:48:43 UTC 2014


Currently, when a revalidator thread first dumps a flow, it creates a
'udpif_key' object and caches a copy of the flow's kernel flow key in it.
This allows us to perform lookups in the classifier to attribute stats and
to validate the correctness of the datapath flow.

This patch instead sets up this cache from the handler threads, during flow
setup, so that the 'udpif_key' already exists by the time the flow is first
dumped. This allows future patches to reduce the cost of flow dumping.

Signed-off-by: Joe Stringer <joestringer at nicira.com>
---
v3: Rebase.
v2: Rebase.
RFC: First post.
---
 ofproto/ofproto-dpif-upcall.c |  334 ++++++++++++++++++++++++++++-------------
 1 file changed, 227 insertions(+), 107 deletions(-)

diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index 4791193..76a6a70 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -46,7 +46,9 @@
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
 
-COVERAGE_DEFINE(upcall_duplicate_flow);
+COVERAGE_DEFINE(handler_install_conflict);
+COVERAGE_DEFINE(upcall_ukey_contention);
+COVERAGE_DEFINE(revalidate_duplicate_flow);
 COVERAGE_DEFINE(revalidate_missed_dp_flow);
 
 /* A thread that reads upcalls from dpif, forwards each upcall's packet,
@@ -108,7 +110,6 @@ struct udpif {
 
     /* Revalidation. */
     struct seq *reval_seq;             /* Incremented to force revalidation. */
-    bool need_revalidate;              /* As indicated by 'reval_seq'. */
     bool reval_exit;                   /* Set by leader on 'exit_latch. */
     struct ovs_barrier reval_barrier;  /* Barrier used by revalidators. */
     struct dpif_flow_dump *dump;       /* DPIF flow dump state. */
@@ -172,6 +173,10 @@ struct upcall {
     bool vsp_adjusted;             /* 'packet' and 'flow' were adjusted for
                                       VLAN splinters if true. */
 
+    struct udpif_key *ukey;        /* Revalidator flow cache. */
+    uint64_t dump_seq;             /* udpif->dump_seq at translation time. */
+    uint64_t reval_seq;            /* udpif->reval_seq at translation time. */
+
     /* Not used by the upcall callback interface. */
     const struct nlattr *key;      /* Datapath flow key. */
     size_t key_len;                /* Datapath flow key length. */
@@ -180,9 +185,10 @@ struct upcall {
 
 /* 'udpif_key's are responsible for tracking the little bit of state udpif
  * needs to do flow expiration which can't be pulled directly from the
- * datapath.  They may be created or maintained by any revalidator during
- * the dump phase, but are owned by a single revalidator, and are destroyed
- * by that revalidator during the garbage-collection phase.
+ * datapath.  They may be created by any handler thread at any time, and read
+ * by any revalidator during the dump phase. They are however each owned by a
+ * single revalidator which takes care of destroying them during the
+ * garbage-collection phase.
  *
  * The mutex within the ukey protects some members of the ukey. The ukey
  * itself is protected by RCU and is held within a umap in the parent udpif.
@@ -201,12 +207,14 @@ struct udpif_key {
     struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/
     long long int created OVS_GUARDED;        /* Estimate of creation time. */
     uint64_t dump_seq OVS_GUARDED;            /* Tracks udpif->dump_seq. */
+    uint64_t reval_seq OVS_GUARDED;           /* Tracks udpif->reval_seq. */
     bool flow_exists OVS_GUARDED;             /* Ensures flows are only deleted
                                                  once. */
 
     struct xlate_cache *xcache OVS_GUARDED;   /* Cache for xlate entries that
                                                * are affected by this ukey.
                                                * Used for stats and learning.*/
+
     union {
         struct odputil_keybuf key_buf;        /* Memory for 'key'. */
         struct nlattr key_buf_nla;
@@ -247,14 +255,14 @@ static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
 static void upcall_unixctl_dump_wait(struct unixctl_conn *conn, int argc,
                                      const char *argv[], void *aux);
 
-static struct udpif_key *ukey_create(const struct nlattr *key, size_t key_len,
-                                     long long int used, uint32_t hash);
-static struct udpif_key *ukey_lookup(struct udpif *udpif,
-                                     const struct nlattr *key, size_t key_len,
-                                     uint32_t hash);
-static bool ukey_acquire(struct udpif *udpif, const struct nlattr *key,
-                         size_t key_len, long long int used,
-                         struct udpif_key **result);
+static struct udpif_key *ukey_new(const struct udpif *, struct upcall *);
+static bool ukey_install_start(struct udpif *, struct udpif_key *ukey);
+static void ukey_install_finish(struct udpif_key *ukey, int error);
+static bool ukey_install(struct udpif *udpif, struct udpif_key *ukey);
+static struct udpif_key *ukey_lookup(struct udpif *udpif, uint32_t hash,
+                                     const struct nlattr *key, size_t key_len);
+static int ukey_acquire(struct udpif *, const struct dpif_flow *,
+                        struct udpif_key **result, int *error);
 static void ukey_delete__(struct udpif_key *);
 static void ukey_delete(struct umap *, struct udpif_key *);
 static enum upcall_type classify_upcall(enum dpif_upcall_type type,
@@ -571,7 +579,9 @@ udpif_upcall_handler(void *arg)
     struct udpif *udpif = handler->udpif;
 
     while (!latch_is_set(&handler->udpif->exit_latch)) {
-        if (!recv_upcalls(handler)) {
+        if (recv_upcalls(handler)) {
+            ovsrcu_quiesce();
+        } else {
             dpif_recv_wait(udpif->dpif, handler->handler_id);
             latch_wait(&udpif->exit_latch);
             poll_block();
@@ -687,7 +697,6 @@ udpif_revalidator(void *arg)
             uint64_t reval_seq;
 
             reval_seq = seq_read(udpif->reval_seq);
-            udpif->need_revalidate = last_reval_seq != reval_seq;
             last_reval_seq = reval_seq;
 
             n_flows = udpif_get_n_flows(udpif);
@@ -842,6 +851,7 @@ upcall_receive(struct upcall *upcall, const struct dpif_backer *backer,
 {
     int error;
 
+    memset(upcall, 0, sizeof *upcall);
     error = xlate_lookup(backer, flow, &upcall->ofproto, &upcall->ipfix,
                          &upcall->sflow, NULL, &upcall->in_port);
     if (error) {
@@ -854,14 +864,6 @@ upcall_receive(struct upcall *upcall, const struct dpif_backer *backer,
     upcall->userdata = userdata;
     ofpbuf_init(&upcall->put_actions, 0);
 
-    upcall->xout_initialized = false;
-    upcall->vsp_adjusted = false;
-
-    upcall->key = NULL;
-    upcall->key_len = 0;
-
-    upcall->out_tun_key = NULL;
-
     return 0;
 }
 
@@ -889,6 +891,8 @@ upcall_xlate(struct udpif *udpif, struct upcall *upcall,
          * with pushing its stats eventually. */
     }
 
+    upcall->dump_seq = seq_read(udpif->dump_seq);
+    upcall->reval_seq = seq_read(udpif->reval_seq);
     xlate_actions(&xin, &upcall->xout);
     upcall->xout_initialized = true;
 
@@ -927,6 +931,8 @@ upcall_xlate(struct udpif *udpif, struct upcall *upcall,
                           upcall->flow->in_port.odp_port,
                           &upcall->put_actions);
     }
+
+    upcall->ukey = ukey_new(udpif, upcall);
 }
 
 static void
@@ -937,6 +943,7 @@ upcall_uninit(struct upcall *upcall)
             xlate_out_uninit(&upcall->xout);
         }
         ofpbuf_uninit(&upcall->put_actions);
+        ukey_delete__(upcall->ukey);
     }
 }
 
@@ -983,9 +990,17 @@ upcall_cb(const struct ofpbuf *packet, const struct flow *flow,
 
     if (udpif_get_n_flows(udpif) >= flow_limit) {
         error = ENOSPC;
+        goto out;
+    }
+
+    if (!ukey_install(udpif, upcall.ukey)) {
+        error = ENOSPC;
     }
 
 out:
+    if (!error) {
+        upcall.ukey = NULL;
+    }
     upcall_uninit(&upcall);
     return error;
 }
@@ -1067,7 +1082,7 @@ handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
     struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
     struct ukey_op ops[UPCALL_MAX_BATCH * 2];
     unsigned int flow_limit;
-    size_t n_ops, i;
+    size_t n_ops, n_opsp, i;
     bool may_put;
     bool megaflow;
 
@@ -1130,6 +1145,8 @@ handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
             }
 
             op = &ops[n_ops++];
+            op->ukey = upcall->ukey;
+            upcall->ukey = NULL;
             op->dop.type = DPIF_OP_FLOW_PUT;
             op->dop.u.flow_put.flags = DPIF_FP_CREATE;
             op->dop.u.flow_put.key = upcall->key;
@@ -1143,6 +1160,7 @@ handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
 
         if (ofpbuf_size(upcall->xout.odp_actions)) {
             op = &ops[n_ops++];
+            op->ukey = NULL;
             op->dop.type = DPIF_OP_EXECUTE;
             op->dop.u.execute.packet = CONST_CAST(struct ofpbuf *, packet);
             odp_key_to_pkt_metadata(upcall->key, upcall->key_len,
@@ -1153,16 +1171,37 @@ handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
         }
     }
 
-    /* Execute batch. */
+    /* Execute batch.
+     *
+     * We install ukeys before installing the flows, locking them for exclusive
+     * access by this thread for the period of installation. This ensures that
+     * other threads won't attempt to delete the flows as we are creating them.
+     */
+    n_opsp = 0;
     for (i = 0; i < n_ops; i++) {
-        opsp[i] = &ops[i].dop;
+        struct udpif_key *ukey = ops[i].ukey;
+
+        if (ukey) {
+            /* If we can't install the ukey, don't install the flow. */
+            if (!ukey_install_start(udpif, ukey)) {
+                ukey_delete__(ukey);
+                ops[i].ukey = NULL;
+                continue;
+            }
+        }
+        opsp[n_opsp++] = &ops[i].dop;
+    }
+    dpif_operate(udpif->dpif, opsp, n_opsp);
+    for (i = 0; i < n_ops; i++) {
+        if (ops[i].ukey) {
+            ukey_install_finish(ops[i].ukey, ops[i].dop.error);
+        }
     }
-    dpif_operate(udpif->dpif, opsp, n_ops);
 }
 
 static struct udpif_key *
-ukey_lookup(struct udpif *udpif, const struct nlattr *key, size_t key_len,
-            uint32_t hash)
+ukey_lookup(struct udpif *udpif, uint32_t hash, const struct nlattr *key,
+            size_t key_len)
 {
     struct udpif_key *ukey;
     struct cmap *cmap = &udpif->ukeys[hash % N_UMAPS].cmap;
@@ -1175,74 +1214,132 @@ ukey_lookup(struct udpif *udpif, const struct nlattr *key, size_t key_len,
     return NULL;
 }
 
-/* Creates a ukey for 'key' and 'key_len', returning it with ukey->mutex in
- * a locked state. */
 static struct udpif_key *
-ukey_create(const struct nlattr *key, size_t key_len, long long int used,
-            uint32_t hash)
+ukey_new(const struct udpif *udpif, struct upcall *upcall)
     OVS_NO_THREAD_SAFETY_ANALYSIS
 {
-    struct udpif_key *ukey = xmalloc(sizeof *ukey);
+    struct udpif_key *ukey = xzalloc(sizeof *ukey);
+    struct ofpbuf key;
+    bool recirc;
+
+    recirc = ofproto_dpif_get_enable_recirc(upcall->ofproto);
+    ofpbuf_use_stack(&key, &ukey->key_buf, sizeof ukey->key_buf);
+    if (upcall->key_len) {
+        ofpbuf_put(&key, upcall->key, upcall->key_len);
+    } else {
+        odp_flow_key_from_flow(&key, upcall->flow, &upcall->xout.wc.masks,
+                               upcall->flow->in_port.odp_port, recirc);
+    }
+
+    ukey->key = ofpbuf_data(&key);
+    ukey->key_len = ofpbuf_size(&key);
+    ukey->hash = hash_bytes(ukey->key, ukey->key_len, udpif->secret);
 
     ovs_mutex_init(&ukey->mutex);
-    ukey->key = &ukey->key_buf_nla;
-    memcpy(&ukey->key_buf, key, key_len);
-    ukey->key_len = key_len;
-
-    ovs_mutex_lock(&ukey->mutex);
-    ukey->hash = hash;
-    ukey->dump_seq = 0;
-    ukey->flow_exists = true;
-    ukey->created = used ? used : time_msec();
-    memset(&ukey->stats, 0, sizeof ukey->stats);
-    ukey->xcache = NULL;
+    ukey->dump_seq = upcall->dump_seq;
+    ukey->reval_seq = upcall->reval_seq;
+    ukey->flow_exists = false;
+    ukey->created = time_msec();
 
     return ukey;
 }
 
-/* Searches for a ukey in 'udpif->ukeys' that matches 'key' and 'key_len' and
- * attempts to lock the ukey. If the ukey does not exist, create it.
+/* Attempts to insert a ukey into the shared ukey maps.
  *
- * Returns true on success, setting *result to the matching ukey and returning
- * it in a locked state. Otherwise, returns false and clears *result. */
+ * On success, installs the ukey, leaves it in a locked state and returns
+ * true. Otherwise, returns false. */
 static bool
-ukey_acquire(struct udpif *udpif, const struct nlattr *key, size_t key_len,
-             long long int used, struct udpif_key **result)
-    OVS_TRY_LOCK(true, (*result)->mutex)
+ukey_install_start(struct udpif *udpif, struct udpif_key *ukey)
+    OVS_TRY_LOCK(true, ukey->mutex)
 {
-    struct udpif_key *ukey;
-    uint32_t hash, idx;
+    struct umap *umap;
+    uint32_t idx;
     bool locked = false;
 
-    hash = hash_bytes(key, key_len, udpif->secret);
-    idx = hash % N_UMAPS;
-
-    ovs_mutex_lock(&udpif->ukeys[idx].mutex);
-    ukey = ukey_lookup(udpif, key, key_len, hash);
-    if (!ukey) {
-        ukey = ukey_create(key, key_len, used, hash);
-        cmap_insert(&udpif->ukeys[idx].cmap, &ukey->cmap_node, hash);
-        locked = true;
-    } else if (!ovs_mutex_trylock(&ukey->mutex)) {
+    idx = ukey->hash % N_UMAPS;
+    umap = &udpif->ukeys[idx];
+    ovs_mutex_lock(&umap->mutex);
+    if (!ukey_lookup(udpif, ukey->hash, ukey->key, ukey->key_len)) {
+        ovs_mutex_lock(&ukey->mutex);
+        cmap_insert(&umap->cmap, &ukey->cmap_node, ukey->hash);
         locked = true;
+    } else {
+        COVERAGE_INC(handler_install_conflict);
+        VLOG_DBG("Failed to ukey_install_start(): 0x%"PRIx32, ukey->hash);
     }
-    ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
+    ovs_mutex_unlock(&umap->mutex);
 
-    if (locked) {
-        *result = ukey;
+    return locked;
+}
+
+static void
+ukey_install_finish(struct udpif_key *ukey, int error)
+    OVS_RELEASES(ukey->mutex)
+{
+    if (!error) {
+        ukey->flow_exists = true;
+    }
+    ovs_mutex_unlock(&ukey->mutex);
+}
+
+static bool
+ukey_install(struct udpif *udpif, struct udpif_key *ukey)
+{
+    if (!ukey_install_start(udpif, ukey)) {
+        return false;
+    }
+
+    ukey_install_finish(ukey, 0);
+    return true;
+}
+
+/* Searches for a ukey in 'udpif->ukeys' that matches 'flow' and attempts to
+ * lock the ukey.
+ *
+ * Returns 0 on success, setting *result to the matching ukey and returning it
+ * in a locked state. Otherwise, returns an errno and clears *result.
+ * + EBUSY indicates that another thread is handling this flow.
+ * + ENOENT indicates that a corresponding ukey cannot be found.
+ *
+ * *error is an output parameter provided to appease the threadsafety analyser,
+ * and its value matches the return value. */
+static int
+ukey_acquire(struct udpif *udpif, const struct dpif_flow *flow,
+             struct udpif_key **result, int *error)
+    OVS_TRY_LOCK(0, (*result)->mutex)
+{
+    struct udpif_key *ukey;
+    int retval;
+    uint32_t hash;
+
+    hash = hash_bytes(flow->key, flow->key_len, udpif->secret);
+    ukey = ukey_lookup(udpif, hash, flow->key, flow->key_len);
+    if (ukey) {
+        retval = ovs_mutex_trylock(&ukey->mutex);
     } else {
+        VLOG_INFO("Dumped flow from datapath with no corresponding ukey: 0x%"
+                  PRIx32, hash);
+        retval = ENOENT;
+    }
+
+    *error = retval;
+    if (retval) {
         *result = NULL;
+    } else {
+        *result = ukey;
     }
-    return locked;
+    return retval;
 }
 
 static void
 ukey_delete__(struct udpif_key *ukey)
     OVS_NO_THREAD_SAFETY_ANALYSIS
 {
-    xlate_cache_delete(ukey->xcache);
-    ovs_mutex_destroy(&ukey->mutex);
-    free(ukey);
+    if (ukey) {
+        xlate_cache_delete(ukey->xcache);
+        ovs_mutex_destroy(&ukey->mutex);
+        free(ukey);
+    }
 }
 
 static void
@@ -1288,7 +1385,7 @@ should_revalidate(const struct udpif *udpif, uint64_t packets,
 
 static bool
 revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
-                const struct dpif_flow *f)
+                const struct dpif_flow *f, uint64_t reval_seq)
     OVS_REQUIRES(ukey->mutex)
 {
     uint64_t slow_path_buf[128 / 8];
@@ -1305,11 +1402,13 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
     int error;
     size_t i;
     bool ok;
+    bool need_revalidate;
 
     ok = false;
     xoutp = NULL;
     netflow = NULL;
 
+    need_revalidate = (ukey->reval_seq != reval_seq);
     last_used = ukey->stats.used;
     push.used = f->stats.used;
     push.tcp_flags = f->stats.tcp_flags;
@@ -1320,7 +1419,7 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
                     ? f->stats.n_bytes - ukey->stats.n_bytes
                     : 0);
 
-    if (udpif->need_revalidate && last_used
+    if (need_revalidate && last_used
         && !should_revalidate(udpif, push.n_packets, last_used)) {
         ok = false;
         goto exit;
@@ -1328,12 +1427,12 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
 
     /* We will push the stats, so update the ukey stats cache. */
     ukey->stats = f->stats;
-    if (!push.n_packets && !udpif->need_revalidate) {
+    if (!push.n_packets && !need_revalidate) {
         ok = true;
         goto exit;
     }
 
-    if (ukey->xcache && !udpif->need_revalidate) {
+    if (ukey->xcache && !need_revalidate) {
         xlate_push_stats(ukey->xcache, &push);
         ok = true;
         goto exit;
@@ -1350,7 +1449,7 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
         goto exit;
     }
 
-    if (udpif->need_revalidate) {
+    if (need_revalidate) {
         xlate_cache_clear(ukey->xcache);
     }
     if (!ukey->xcache) {
@@ -1364,11 +1463,11 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
         xin.may_learn = true;
     }
     xin.xcache = ukey->xcache;
-    xin.skip_wildcards = !udpif->need_revalidate;
+    xin.skip_wildcards = !need_revalidate;
     xlate_actions(&xin, &xout);
     xoutp = &xout;
 
-    if (!udpif->need_revalidate) {
+    if (!need_revalidate) {
         ok = true;
         goto exit;
     }
@@ -1407,8 +1506,14 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
     ok = true;
 
 exit:
-    if (netflow && !ok) {
-        netflow_flow_clear(netflow, &flow);
+    if (ok) {
+        ukey->reval_seq = reval_seq;
+    }
+    if (netflow) {
+        if (!ok) {
+            netflow_flow_clear(netflow, &flow);
+        }
+        netflow_unref(netflow);
     }
     xlate_out_uninit(xoutp);
     return ok;
@@ -1444,12 +1549,16 @@ push_ukey_ops__(struct udpif *udpif, struct ukey_op *ops, size_t n_ops)
         stats = op->dop.u.flow_del.stats;
         push = &push_buf;
 
-        ovs_mutex_lock(&op->ukey->mutex);
-        push->used = MAX(stats->used, op->ukey->stats.used);
-        push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
-        push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
-        push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
-        ovs_mutex_unlock(&op->ukey->mutex);
+        if (op->ukey) {
+            ovs_mutex_lock(&op->ukey->mutex);
+            push->used = MAX(stats->used, op->ukey->stats.used);
+            push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
+            push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
+            push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
+            ovs_mutex_unlock(&op->ukey->mutex);
+        } else {
+            push = stats;
+        }
 
         if (push->n_packets || netflow_exists()) {
             struct ofproto_dpif *ofproto;
@@ -1458,13 +1567,15 @@ push_ukey_ops__(struct udpif *udpif, struct ukey_op *ops, size_t n_ops)
             struct flow flow;
             int error;
 
-            ovs_mutex_lock(&op->ukey->mutex);
-            if (op->ukey->xcache) {
-                xlate_push_stats(op->ukey->xcache, push);
+            if (op->ukey) {
+                ovs_mutex_lock(&op->ukey->mutex);
+                if (op->ukey->xcache) {
+                    xlate_push_stats(op->ukey->xcache, push);
+                    ovs_mutex_unlock(&op->ukey->mutex);
+                    continue;
+                }
                 ovs_mutex_unlock(&op->ukey->mutex);
-                continue;
             }
-            ovs_mutex_unlock(&op->ukey->mutex);
 
             if (odp_flow_key_to_flow(op->dop.u.flow_del.key,
                                      op->dop.u.flow_del.key_len, &flow)
@@ -1511,10 +1622,11 @@ revalidate(struct revalidator *revalidator)
 {
     struct udpif *udpif = revalidator->udpif;
     struct dpif_flow_dump_thread *dump_thread;
-    uint64_t dump_seq;
+    uint64_t dump_seq, reval_seq;
     unsigned int flow_limit;
 
     dump_seq = seq_read(udpif->dump_seq);
+    reval_seq = seq_read(udpif->reval_seq);
     atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
     dump_thread = dpif_flow_dump_thread_create(udpif->dump);
     for (;;) {
@@ -1557,12 +1669,16 @@ revalidate(struct revalidator *revalidator)
             long long int used = f->stats.used;
             struct udpif_key *ukey;
             bool already_dumped, keep;
+            int error;
 
-            if (!ukey_acquire(udpif, f->key, f->key_len, used, &ukey)) {
-                /* We couldn't acquire the ukey. This means that
-                 * another revalidator is processing this flow
-                 * concurrently, so don't bother processing it. */
-                COVERAGE_INC(upcall_duplicate_flow);
+            if (ukey_acquire(udpif, f, &ukey, &error)) {
+                if (error == EBUSY) {
+                    /* Another thread is processing this flow, so don't bother
+                     * processing it. */
+                    COVERAGE_INC(upcall_ukey_contention);
+                } else {
+                    delete_op_init(&ops[n_ops++], f->key, f->key_len, ukey);
+                }
                 continue;
             }
 
@@ -1570,7 +1686,7 @@ revalidate(struct revalidator *revalidator)
             if (already_dumped) {
                 /* The flow has already been dumped and handled by another
                  * revalidator during this flow dump operation. Skip it. */
-                COVERAGE_INC(upcall_duplicate_flow);
+                COVERAGE_INC(revalidate_duplicate_flow);
                 ovs_mutex_unlock(&ukey->mutex);
                 continue;
             }
@@ -1581,7 +1697,7 @@ revalidate(struct revalidator *revalidator)
             if (kill_them_all || (used && used < now - max_idle)) {
                 keep = false;
             } else {
-                keep = revalidate_ukey(udpif, ukey, f);
+                keep = revalidate_ukey(udpif, ukey, f, reval_seq);
             }
             ukey->dump_seq = dump_seq;
             ukey->flow_exists = keep;
@@ -1600,12 +1716,10 @@ revalidate(struct revalidator *revalidator)
     dpif_flow_dump_thread_destroy(dump_thread);
 }
 
-/* Called with exclusive access to 'revalidator' and 'ukey'. */
 static bool
-handle_missed_revalidation(struct revalidator *revalidator,
+handle_missed_revalidation(struct udpif *udpif, uint64_t reval_seq,
                            struct udpif_key *ukey)
 {
-    struct udpif *udpif = revalidator->udpif;
     struct dpif_flow flow;
     struct ofpbuf buf;
     uint64_t stub[DPIF_FLOW_BUFSIZE / 8];
@@ -1616,7 +1730,7 @@ handle_missed_revalidation(struct revalidator *revalidator,
     ofpbuf_use_stub(&buf, &stub, sizeof stub);
     if (!dpif_flow_get(udpif->dpif, ukey->key, ukey->key_len, &buf, &flow)) {
         ovs_mutex_lock(&ukey->mutex);
-        keep = revalidate_ukey(udpif, ukey, &flow);
+        keep = revalidate_ukey(udpif, ukey, &flow, reval_seq);
         ovs_mutex_unlock(&ukey->mutex);
     }
     ofpbuf_uninit(&buf);
@@ -1628,11 +1742,12 @@ static void
 revalidator_sweep__(struct revalidator *revalidator, bool purge)
 {
     struct udpif *udpif;
-    uint64_t dump_seq;
+    uint64_t dump_seq, reval_seq;
     int slice;
 
     udpif = revalidator->udpif;
     dump_seq = seq_read(udpif->dump_seq);
+    reval_seq = seq_read(udpif->reval_seq);
     slice = revalidator - udpif->revalidators;
     ovs_assert(slice < udpif->n_revalidators);
 
@@ -1645,16 +1760,21 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge)
         CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) {
             bool flow_exists, seq_mismatch;
 
-            ovs_mutex_lock(&ukey->mutex);
+            /* Handler threads could be holding a ukey lock while they install
+             * a new flow, so don't hang around waiting for access to it. */
+            if (ovs_mutex_trylock(&ukey->mutex)) {
+                continue;
+            }
             flow_exists = ukey->flow_exists;
             seq_mismatch = (ukey->dump_seq != dump_seq
-                            && revalidator->udpif->need_revalidate);
+                            && ukey->reval_seq != reval_seq);
             ovs_mutex_unlock(&ukey->mutex);
 
             if (flow_exists
                 && (purge
                     || (seq_mismatch
-                        && !handle_missed_revalidation(revalidator, ukey)))) {
+                        && !handle_missed_revalidation(udpif, reval_seq,
+                                                       ukey)))) {
                 struct ukey_op *op = &ops[n_ops++];
 
                 delete_op_init(op, ukey->key, ukey->key_len, ukey);
-- 
1.7.10.4




More information about the dev mailing list