[ovs-dev] [PATCHv12] ofproto-dpif-upcall: Remove the flow_dumper thread.

Joe Stringer joe at wand.net.nz
Mon Apr 21 22:44:15 UTC 2014


From: Ethan Jackson <ethan at nicira.com>

Previously, we had a separate flow_dumper thread that fetched flows from
the datapath to distribute to revalidator threads. This patch takes the
logic for dumping and pushes it into the revalidator threads, resulting
in simpler code with similar performance to the current code.

One thread, the "leader", is responsible for beginning and ending each
flow dump, maintaining the flow_limit, and checking whether the
revalidator threads need to exit. All revalidator threads dump,
revalidate, delete datapath flows and garbage collect ukeys.

Co-authored-by: Joe Stringer <joestringer at nicira.com>
Signed-off-by: Joe Stringer <joestringer at nicira.com>
---
v12: Rebase.
v11: Rebase.
v10: Minor whitespace and documentation fixups.
v9: Update testsuite for also printing actions on flow_dump.
v8: Rebase.
v7: Add back logic (present in master) that deletes all flows older than
     100ms if we are currently exceeding the flow limit.
    Rebase.
v6: Shift ukeys hmaps from revalidators into udpif.
    Documentation tidyups.
v5: Handle ukey creation race.
    Style fixes.
v4: Rebase.
v3: First post.
---
 ofproto/ofproto-dpif-upcall.c |  649 +++++++++++++++++++----------------------
 tests/ofproto-dpif.at         |    8 +-
 2 files changed, 309 insertions(+), 348 deletions(-)

diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index 4ee5bf5..28a1f27 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -54,22 +54,15 @@ struct handler {
     uint32_t handler_id;               /* Handler id. */
 };
 
-/* A thread that processes each kernel flow handed to it by the flow_dumper
- * thread, updates OpenFlow statistics, and updates or removes the kernel flow
- * as necessary. */
+/* A thread that processes datapath flows, updates OpenFlow statistics, and
+ * updates or removes them if necessary. */
 struct revalidator {
     struct udpif *udpif;               /* Parent udpif. */
     char *name;                        /* Thread name. */
 
     pthread_t thread;                  /* Thread ID. */
-    struct hmap ukeys;                 /* Datapath flow keys. */
-
-    uint64_t dump_seq;
-
-    struct ovs_mutex mutex;            /* Mutex guarding the following. */
-    pthread_cond_t wake_cond;
-    struct list udumps OVS_GUARDED;    /* Unprocessed udumps. */
-    size_t n_udumps OVS_GUARDED;       /* Number of unprocessed udumps. */
+    struct hmap *ukeys;                /* Points into udpif->ukeys for this
+                                          revalidator. Used for GC phase. */
 };
 
 /* An upcall handler for ofproto_dpif.
@@ -85,13 +78,9 @@ struct revalidator {
  * flow revalidation
  * -----------------
  *
- *    - An array of 'struct revalidator's for flow revalidation and
- *      stats collection.
- *
- *    - A "flow_dumper" thread that reads the kernel flow table and dispatches
- *      flows to one of several "revalidator" threads (see struct
- *      revalidator).
- * */
+ *    - Revalidation threads which read the datapath flow table and maintain
+ *      the flows in it.
+ */
 struct udpif {
     struct list list_node;             /* In all_udpifs list. */
 
@@ -100,22 +89,30 @@ struct udpif {
 
     uint32_t secret;                   /* Random seed for upcall hash. */
 
-    pthread_t flow_dumper;             /* Flow dumper thread ID. */
-
     struct handler *handlers;          /* Upcall handlers. */
     size_t n_handlers;
 
     struct revalidator *revalidators;  /* Flow revalidators. */
     size_t n_revalidators;
 
-    uint64_t last_reval_seq;           /* 'reval_seq' at last revalidation. */
-    struct seq *reval_seq;             /* Incremented to force revalidation. */
-
-    struct seq *dump_seq;              /* Increments each dump iteration. */
-
     struct latch exit_latch;           /* Tells child threads to exit. */
 
+    /* Revalidation. */
+    struct seq *reval_seq;             /* Incremented to force revalidation. */
+    bool need_revalidate;              /* As indicated by 'reval_seq'. */
+    bool reval_exit;                   /* Set by leader on 'exit_latch'. */
+    pthread_barrier_t reval_barrier;   /* Barrier used by revalidators. */
+    struct dpif_flow_dump dump;        /* DPIF flow dump state. */
     long long int dump_duration;       /* Duration of the last flow dump. */
+    struct seq *dump_seq;              /* Increments each dump iteration. */
+
+    /* During the flow dump phase, revalidators insert into these with a random
+     * distribution. During the garbage collection phase, each revalidator
+     * takes care of garbage collecting one of these hmaps. */
+    struct {
+        struct ovs_mutex mutex;        /* Guards the following. */
+        struct hmap hmap OVS_GUARDED;  /* Datapath flow keys. */
+    } *ukeys;
 
     /* Datapath flow statistics. */
     unsigned int max_n_flows;
@@ -149,44 +146,33 @@ struct upcall {
 
 /* 'udpif_key's are responsible for tracking the little bit of state udpif
  * needs to do flow expiration which can't be pulled directly from the
- * datapath.  They are owned, created by, maintained, and destroyed by a single
- * revalidator making them easy to efficiently handle with multiple threads. */
+ * datapath.  They may be created or maintained by any revalidator during
+ * the dump phase, but are owned by a single revalidator, and are destroyed
+ * by that revalidator during the garbage-collection phase.
+ *
+ * While some elements of a udpif_key are protected by a mutex, the ukey itself
+ * is not.  Therefore it is not safe to destroy a udpif_key except when all
+ * revalidators are in garbage collection phase, or they aren't running. */
 struct udpif_key {
     struct hmap_node hmap_node;     /* In parent revalidator 'ukeys' map. */
 
-    struct nlattr *key;            /* Datapath flow key. */
+    /* These elements are read only once created, and therefore aren't
+     * protected by a mutex. */
+    const struct nlattr *key;      /* Datapath flow key. */
     size_t key_len;                /* Length of 'key'. */
 
-    struct dpif_flow_stats stats;  /* Stats at most recent flow dump. */
-    long long int created;         /* Estimation of creation time. */
-
-    bool mark;                     /* Used by mark and sweep GC algorithm. */
-
-    struct odputil_keybuf key_buf; /* Memory for 'key'. */
-    struct xlate_cache *xcache;    /* Cache for xlate entries that
-                                    * are affected by this ukey.
-                                    * Used for stats and learning.*/
-};
-
-/* 'udpif_flow_dump's hold the state associated with one iteration in a flow
- * dump operation.  This is created by the flow_dumper thread and handed to the
- * appropriate revalidator thread to be processed. */
-struct udpif_flow_dump {
-    struct list list_node;
-
-    struct nlattr *key;            /* Datapath flow key. */
-    size_t key_len;                /* Length of 'key'. */
-    uint32_t key_hash;             /* Hash of 'key'. */
-
-    struct odputil_keybuf mask_buf;
-    struct nlattr *mask;           /* Datapath mask for 'key'. */
-    size_t mask_len;               /* Length of 'mask'. */
-
-    struct dpif_flow_stats stats;  /* Stats pulled from the datapath. */
-
-    bool need_revalidate;          /* Key needs revalidation? */
-
-    struct odputil_keybuf key_buf;
+    struct ovs_mutex mutex;                   /* Guards the following. */
+    struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats. */
+    long long int created OVS_GUARDED;        /* Estimate of creation time. */
+    bool mark OVS_GUARDED;                    /* For mark and sweep garbage
+                                                 collection. */
+    bool flow_exists OVS_GUARDED;             /* Ensures flows are only deleted
+                                                 once. */
+
+    struct xlate_cache *xcache;               /* Cache for xlate entries that
+                                               * are affected by this ukey.
+                                               * Used for stats and learning.*/
+    struct odputil_keybuf key_buf;            /* Memory for 'key'. */
 };
 
 /* Flow miss batching.
@@ -223,11 +209,10 @@ static size_t read_upcalls(struct handler *,
                            struct hmap *);
 static void handle_upcalls(struct handler *, struct hmap *, struct upcall *,
                            size_t n_upcalls);
-static void *udpif_flow_dumper(void *);
 static void *udpif_upcall_handler(void *);
 static void *udpif_revalidator(void *);
 static uint64_t udpif_get_n_flows(struct udpif *);
-static void revalidate_udumps(struct revalidator *, struct list *udumps);
+static void revalidate(struct revalidator *);
 static void revalidator_sweep(struct revalidator *);
 static void revalidator_purge(struct revalidator *);
 static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
@@ -238,6 +223,9 @@ static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
                                             const char *argv[], void *aux);
 static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
                                             const char *argv[], void *aux);
+
+static struct udpif_key *ukey_create(const struct nlattr *key, size_t key_len,
+                                     long long int used);
 static void ukey_delete(struct revalidator *, struct udpif_key *);
 
 static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
@@ -315,33 +303,19 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         }
 
         for (i = 0; i < udpif->n_revalidators; i++) {
-            struct revalidator *revalidator = &udpif->revalidators[i];
-
-            ovs_mutex_lock(&revalidator->mutex);
-            xpthread_cond_signal(&revalidator->wake_cond);
-            ovs_mutex_unlock(&revalidator->mutex);
-            xpthread_join(revalidator->thread, NULL);
+            xpthread_join(udpif->revalidators[i].thread, NULL);
         }
 
-        xpthread_join(udpif->flow_dumper, NULL);
-
         for (i = 0; i < udpif->n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
-            struct udpif_flow_dump *udump, *next_udump;
-
-            LIST_FOR_EACH_SAFE (udump, next_udump, list_node,
-                                &revalidator->udumps) {
-                list_remove(&udump->list_node);
-                free(udump);
-            }
 
             /* Delete ukeys, and delete all flows from the datapath to prevent
              * double-counting stats. */
             revalidator_purge(revalidator);
-            hmap_destroy(&revalidator->ukeys);
-            ovs_mutex_destroy(&revalidator->mutex);
-
             free(revalidator->name);
+
+            hmap_destroy(&udpif->ukeys[i].hmap);
+            ovs_mutex_destroy(&udpif->ukeys[i].mutex);
         }
 
         for (i = 0; i < udpif->n_handlers; i++) {
@@ -349,6 +323,8 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         }
         latch_poll(&udpif->exit_latch);
 
+        xpthread_barrier_destroy(&udpif->reval_barrier);
+
         free(udpif->revalidators);
         udpif->revalidators = NULL;
         udpif->n_revalidators = 0;
@@ -356,6 +332,9 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         free(udpif->handlers);
         udpif->handlers = NULL;
         udpif->n_handlers = 0;
+
+        free(udpif->ukeys);
+        udpif->ukeys = NULL;
     }
 
     error = dpif_handlers_set(udpif->dpif, n_handlers);
@@ -382,20 +361,22 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
                             handler);
         }
 
+        xpthread_barrier_init(&udpif->reval_barrier, NULL,
+                              udpif->n_revalidators);
+        udpif->reval_exit = false;
         udpif->revalidators = xzalloc(udpif->n_revalidators
                                       * sizeof *udpif->revalidators);
+        udpif->ukeys = xmalloc(sizeof *udpif->ukeys * n_revalidators);
         for (i = 0; i < udpif->n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
 
             revalidator->udpif = udpif;
-            list_init(&revalidator->udumps);
-            hmap_init(&revalidator->ukeys);
-            ovs_mutex_init(&revalidator->mutex);
-            xpthread_cond_init(&revalidator->wake_cond, NULL);
+            hmap_init(&udpif->ukeys[i].hmap);
+            ovs_mutex_init(&udpif->ukeys[i].mutex);
+            revalidator->ukeys = &udpif->ukeys[i].hmap;
             xpthread_create(&revalidator->thread, NULL, udpif_revalidator,
                             revalidator);
         }
-        xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, udpif);
     }
 
     ovsrcu_quiesce_end();
@@ -439,21 +420,13 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
 {
     size_t i;
 
-    simap_increase(usage, "flow_dumpers", 1);
-
     simap_increase(usage, "handlers", udpif->n_handlers);
 
     simap_increase(usage, "revalidators", udpif->n_revalidators);
     for (i = 0; i < udpif->n_revalidators; i++) {
-        struct revalidator *revalidator = &udpif->revalidators[i];
-        ovs_mutex_lock(&revalidator->mutex);
-        simap_increase(usage, "revalidator dumps", revalidator->n_udumps);
-
-        /* XXX: This isn't technically thread safe because the revalidator
-         * ukeys maps isn't protected by a mutex since it's per thread. */
-        simap_increase(usage, "revalidator keys",
-                       hmap_count(&revalidator->ukeys));
-        ovs_mutex_unlock(&revalidator->mutex);
+        ovs_mutex_lock(&udpif->ukeys[i].mutex);
+        simap_increase(usage, "udpif keys", hmap_count(&udpif->ukeys[i].hmap));
+        ovs_mutex_unlock(&udpif->ukeys[i].mutex);
     }
 }
 
@@ -505,125 +478,6 @@ udpif_get_n_flows(struct udpif *udpif)
     return flow_count;
 }
 
-static void *
-udpif_flow_dumper(void *arg)
-{
-    struct udpif *udpif = arg;
-
-    set_subprogram_name("flow_dumper");
-    while (!latch_is_set(&udpif->exit_latch)) {
-        const struct dpif_flow_stats *stats;
-        long long int start_time, duration;
-        const struct nlattr *key, *mask;
-        struct dpif_flow_dump dump;
-        size_t key_len, mask_len;
-        unsigned int flow_limit;
-        bool need_revalidate;
-        uint64_t reval_seq;
-        size_t n_flows, i;
-        int error;
-        void *state = NULL;
-
-        reval_seq = seq_read(udpif->reval_seq);
-        need_revalidate = udpif->last_reval_seq != reval_seq;
-        udpif->last_reval_seq = reval_seq;
-
-        n_flows = udpif_get_n_flows(udpif);
-        udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
-        udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
-
-        start_time = time_msec();
-        error = dpif_flow_dump_start(&dump, udpif->dpif);
-        if (error) {
-            VLOG_INFO("Failed to start flow dump (%s)", ovs_strerror(error));
-            goto skip;
-        }
-        dpif_flow_dump_state_init(udpif->dpif, &state);
-        while (dpif_flow_dump_next(&dump, state, &key, &key_len,
-                                   &mask, &mask_len, NULL, NULL, &stats)
-               && !latch_is_set(&udpif->exit_latch)) {
-            struct udpif_flow_dump *udump = xmalloc(sizeof *udump);
-            struct revalidator *revalidator;
-
-            udump->key_hash = hash_bytes(key, key_len, udpif->secret);
-            memcpy(&udump->key_buf, key, key_len);
-            udump->key = (struct nlattr *) &udump->key_buf;
-            udump->key_len = key_len;
-
-            memcpy(&udump->mask_buf, mask, mask_len);
-            udump->mask = (struct nlattr *) &udump->mask_buf;
-            udump->mask_len = mask_len;
-
-            udump->stats = *stats;
-            udump->need_revalidate = need_revalidate;
-
-            revalidator = &udpif->revalidators[udump->key_hash
-                % udpif->n_revalidators];
-
-            ovs_mutex_lock(&revalidator->mutex);
-            while (revalidator->n_udumps >= REVALIDATE_MAX_BATCH * 3
-                   && !latch_is_set(&udpif->exit_latch)) {
-                ovs_mutex_cond_wait(&revalidator->wake_cond,
-                                    &revalidator->mutex);
-            }
-            list_push_back(&revalidator->udumps, &udump->list_node);
-            revalidator->n_udumps++;
-            xpthread_cond_signal(&revalidator->wake_cond);
-            ovs_mutex_unlock(&revalidator->mutex);
-        }
-        dpif_flow_dump_state_uninit(udpif->dpif, state);
-        dpif_flow_dump_done(&dump);
-
-        /* Let all the revalidators finish and garbage collect. */
-        seq_change(udpif->dump_seq);
-        for (i = 0; i < udpif->n_revalidators; i++) {
-            struct revalidator *revalidator = &udpif->revalidators[i];
-            ovs_mutex_lock(&revalidator->mutex);
-            xpthread_cond_signal(&revalidator->wake_cond);
-            ovs_mutex_unlock(&revalidator->mutex);
-        }
-
-        for (i = 0; i < udpif->n_revalidators; i++) {
-            struct revalidator *revalidator = &udpif->revalidators[i];
-
-            ovs_mutex_lock(&revalidator->mutex);
-            while (revalidator->dump_seq != seq_read(udpif->dump_seq)
-                   && !latch_is_set(&udpif->exit_latch)) {
-                ovs_mutex_cond_wait(&revalidator->wake_cond,
-                                    &revalidator->mutex);
-            }
-            ovs_mutex_unlock(&revalidator->mutex);
-        }
-
-        duration = MAX(time_msec() - start_time, 1);
-        udpif->dump_duration = duration;
-        atomic_read(&udpif->flow_limit, &flow_limit);
-        if (duration > 2000) {
-            flow_limit /= duration / 1000;
-        } else if (duration > 1300) {
-            flow_limit = flow_limit * 3 / 4;
-        } else if (duration < 1000 && n_flows > 2000
-                   && flow_limit < n_flows * 1000 / duration) {
-            flow_limit += 1000;
-        }
-        flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
-        atomic_store(&udpif->flow_limit, flow_limit);
-
-        if (duration > 2000) {
-            VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
-                      duration);
-        }
-
-skip:
-        poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
-        seq_wait(udpif->reval_seq, udpif->last_reval_seq);
-        latch_wait(&udpif->exit_latch);
-        poll_block();
-    }
-
-    return NULL;
-}
-
 /* The upcall handler thread tries to read a batch of FLOW_MISS_MAX_BATCH
  * upcalls from dpif, processes the batch and installs corresponding flows
  * in dpif. */
@@ -670,42 +524,85 @@ udpif_upcall_handler(void *arg)
 static void *
 udpif_revalidator(void *arg)
 {
+    /* Used by all revalidators. */
     struct revalidator *revalidator = arg;
+    struct udpif *udpif = revalidator->udpif;
+    bool leader = revalidator == &udpif->revalidators[0];
+
+    /* Used only by the leader. */
+    long long int start_time = 0;
+    uint64_t last_reval_seq = 0;
+    unsigned int flow_limit = 0;
+    size_t n_flows = 0;
 
     revalidator->name = xasprintf("revalidator_%u", ovsthread_id_self());
     set_subprogram_name("%s", revalidator->name);
     for (;;) {
-        struct list udumps = LIST_INITIALIZER(&udumps);
-        struct udpif *udpif = revalidator->udpif;
-        size_t i;
+        if (leader) {
+            uint64_t reval_seq;
 
-        ovs_mutex_lock(&revalidator->mutex);
-        if (latch_is_set(&udpif->exit_latch)) {
-            ovs_mutex_unlock(&revalidator->mutex);
-            return NULL;
-        }
+            reval_seq = seq_read(udpif->reval_seq);
+            udpif->need_revalidate = last_reval_seq != reval_seq;
+            last_reval_seq = reval_seq;
 
-        if (!revalidator->n_udumps) {
-            if (revalidator->dump_seq != seq_read(udpif->dump_seq)) {
-                revalidator->dump_seq = seq_read(udpif->dump_seq);
-                revalidator_sweep(revalidator);
-            } else {
-                ovs_mutex_cond_wait(&revalidator->wake_cond,
-                                    &revalidator->mutex);
+            n_flows = udpif_get_n_flows(udpif);
+            udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
+            udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
+
+            /* Only the leader checks the exit latch to prevent a race where
+             * some threads see it set and exit while others see it unset
+             * and block indefinitely on the reval_barrier. */
+            udpif->reval_exit = latch_is_set(&udpif->exit_latch);
+
+            start_time = time_msec();
+            if (!udpif->reval_exit) {
+                dpif_flow_dump_start(&udpif->dump, udpif->dpif);
             }
         }
 
-        for (i = 0; i < REVALIDATE_MAX_BATCH && revalidator->n_udumps; i++) {
-            list_push_back(&udumps, list_pop_front(&revalidator->udumps));
-            revalidator->n_udumps--;
+        /* Wait for the leader to start the flow dump. */
+        xpthread_barrier_wait(&udpif->reval_barrier);
+        if (udpif->reval_exit) {
+            break;
         }
+        revalidate(revalidator);
+
+        /* Wait for all flows to have been dumped before we garbage collect. */
+        xpthread_barrier_wait(&udpif->reval_barrier);
+        revalidator_sweep(revalidator);
+
+        /* Wait for all revalidators to finish garbage collection. */
+        xpthread_barrier_wait(&udpif->reval_barrier);
+
+        if (leader) {
+            long long int duration;
+
+            dpif_flow_dump_done(&udpif->dump);
+            seq_change(udpif->dump_seq);
+
+            duration = MAX(time_msec() - start_time, 1);
+            atomic_read(&udpif->flow_limit, &flow_limit);
+            udpif->dump_duration = duration;
+            if (duration > 2000) {
+                flow_limit /= duration / 1000;
+            } else if (duration > 1300) {
+                flow_limit = flow_limit * 3 / 4;
+            } else if (duration < 1000 && n_flows > 2000
+                       && flow_limit < n_flows * 1000 / duration) {
+                flow_limit += 1000;
+            }
+            flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
+            atomic_store(&udpif->flow_limit, flow_limit);
 
-        /* Wake up the flow dumper. */
-        xpthread_cond_signal(&revalidator->wake_cond);
-        ovs_mutex_unlock(&revalidator->mutex);
+            if (duration > 2000) {
+                VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
+                          duration);
+            }
 
-        if (!list_is_empty(&udumps)) {
-            revalidate_udumps(revalidator, &udumps);
+            poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
+            seq_wait(udpif->reval_seq, last_reval_seq);
+            latch_wait(&udpif->exit_latch);
+            poll_block();
         }
     }
 
@@ -1166,15 +1063,16 @@ handle_upcalls(struct handler *handler, struct hmap *misses,
     dpif_operate(udpif->dpif, opsp, n_ops);
 }
 
+/* Caller must hold udpif->ukeys[hash % udpif->n_revalidators].mutex. */
 static struct udpif_key *
-ukey_lookup(struct revalidator *revalidator, struct udpif_flow_dump *udump)
+ukey_lookup__(struct udpif *udpif, const struct nlattr *key, size_t key_len,
+              uint32_t hash)
 {
     struct udpif_key *ukey;
+    struct hmap *hmap = &udpif->ukeys[hash % udpif->n_revalidators].hmap;
 
-    HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, udump->key_hash,
-                             &revalidator->ukeys) {
-        if (ukey->key_len == udump->key_len
-            && !memcmp(ukey->key, udump->key, udump->key_len)) {
+    HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, hash, hmap) {
+        if (ukey->key_len == key_len && !memcmp(ukey->key, key, key_len)) {
             return ukey;
         }
     }
@@ -1182,27 +1080,72 @@ ukey_lookup(struct revalidator *revalidator, struct udpif_flow_dump *udump)
 }
 
 static struct udpif_key *
+ukey_lookup(struct udpif *udpif, const struct nlattr *key, size_t key_len,
+            uint32_t hash)
+{
+    struct udpif_key *ukey;
+    uint32_t idx = hash % udpif->n_revalidators;
+
+    ovs_mutex_lock(&udpif->ukeys[idx].mutex);
+    ukey = ukey_lookup__(udpif, key, key_len, hash);
+    ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
+
+    return ukey;
+}
+
+static struct udpif_key *
 ukey_create(const struct nlattr *key, size_t key_len, long long int used)
 {
     struct udpif_key *ukey = xmalloc(sizeof *ukey);
+    ovs_mutex_init(&ukey->mutex);
 
     ukey->key = (struct nlattr *) &ukey->key_buf;
     memcpy(&ukey->key_buf, key, key_len);
     ukey->key_len = key_len;
 
+    ovs_mutex_lock(&ukey->mutex);
     ukey->mark = false;
+    ukey->flow_exists = true;
     ukey->created = used ? used : time_msec();
     memset(&ukey->stats, 0, sizeof ukey->stats);
     ukey->xcache = NULL;
+    ovs_mutex_unlock(&ukey->mutex);
 
     return ukey;
 }
 
+/* Checks for a ukey in 'udpif->ukeys' with the same 'ukey->key' and 'hash',
+ * and inserts 'ukey' if it does not exist.
+ *
+ * Returns true if 'ukey' was inserted into 'udpif->ukeys', false otherwise. */
+static bool
+udpif_insert_ukey(struct udpif *udpif, struct udpif_key *ukey, uint32_t hash)
+{
+    struct udpif_key *duplicate;
+    uint32_t idx = hash % udpif->n_revalidators;
+    bool ok;
+
+    ovs_mutex_lock(&udpif->ukeys[idx].mutex);
+    duplicate = ukey_lookup__(udpif, ukey->key, ukey->key_len, hash);
+    if (duplicate) {
+        ok = false;
+    } else {
+        hmap_insert(&udpif->ukeys[idx].hmap, &ukey->hmap_node, hash);
+        ok = true;
+    }
+    ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
+
+    return ok;
+}
+
 static void
 ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
 {
-    hmap_remove(&revalidator->ukeys, &ukey->hmap_node);
+    if (revalidator) {
+        hmap_remove(revalidator->ukeys, &ukey->hmap_node);
+    }
     xlate_cache_delete(ukey->xcache);
+    ovs_mutex_destroy(&ukey->mutex);
     free(ukey);
 }
 
@@ -1233,17 +1176,19 @@ should_revalidate(uint64_t packets, long long int used)
 }
 
 static bool
-revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
-                struct udpif_key *ukey)
+revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
+                const struct nlattr *mask, size_t mask_len,
+                const struct nlattr *actions, size_t actions_len,
+                const struct dpif_flow_stats *stats)
 {
-    struct ofpbuf xout_actions, *actions;
     uint64_t slow_path_buf[128 / 8];
     struct xlate_out xout, *xoutp;
     struct netflow *netflow;
-    struct flow flow, udump_mask;
     struct ofproto_dpif *ofproto;
     struct dpif_flow_stats push;
-    uint32_t *udump32, *xout32;
+    struct ofpbuf xout_actions;
+    struct flow flow, dp_mask;
+    uint32_t *dp32, *xout32;
     odp_port_t odp_in_port;
     struct xlate_in xin;
     long long int last_used;
@@ -1253,43 +1198,34 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
 
     ok = false;
     xoutp = NULL;
-    actions = NULL;
     netflow = NULL;
 
-    /* If we don't need to revalidate, we can simply push the stats contained
-     * in the udump, otherwise we'll have to get the actions so we can check
-     * them. */
-    if (udump->need_revalidate) {
-        if (dpif_flow_get(udpif->dpif, ukey->key, ukey->key_len, &actions,
-                          &udump->stats)) {
-            goto exit;
-        }
-    }
-
+    ovs_mutex_lock(&ukey->mutex);
     last_used = ukey->stats.used;
-    push.used = udump->stats.used;
-    push.tcp_flags = udump->stats.tcp_flags;
-    push.n_packets = udump->stats.n_packets > ukey->stats.n_packets
-        ? udump->stats.n_packets - ukey->stats.n_packets
+    push.used = stats->used;
+    push.tcp_flags = stats->tcp_flags;
+    push.n_packets = stats->n_packets > ukey->stats.n_packets
+        ? stats->n_packets - ukey->stats.n_packets
         : 0;
-    push.n_bytes = udump->stats.n_bytes > ukey->stats.n_bytes
-        ? udump->stats.n_bytes - ukey->stats.n_bytes
+    push.n_bytes = stats->n_bytes > ukey->stats.n_bytes
+        ? stats->n_bytes - ukey->stats.n_bytes
         : 0;
-    ukey->stats = udump->stats;
+    ukey->stats = *stats;
+    ovs_mutex_unlock(&ukey->mutex);
 
-    if (udump->need_revalidate && last_used
+    if (udpif->need_revalidate && last_used
         && !should_revalidate(push.n_packets, last_used)) {
         ok = false;
         goto exit;
     }
 
-    if (!push.n_packets && !udump->need_revalidate) {
+    if (!push.n_packets && !udpif->need_revalidate) {
         ok = true;
         goto exit;
     }
 
     may_learn = push.n_packets > 0;
-    if (ukey->xcache && !udump->need_revalidate) {
+    if (ukey->xcache && !udpif->need_revalidate) {
         xlate_push_stats(ukey->xcache, may_learn, &push);
         ok = true;
         goto exit;
@@ -1301,7 +1237,7 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
         goto exit;
     }
 
-    if (udump->need_revalidate) {
+    if (udpif->need_revalidate) {
         xlate_cache_clear(ukey->xcache);
     }
     if (!ukey->xcache) {
@@ -1312,11 +1248,11 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
     xin.resubmit_stats = push.n_packets ? &push : NULL;
     xin.xcache = ukey->xcache;
     xin.may_learn = may_learn;
-    xin.skip_wildcards = !udump->need_revalidate;
+    xin.skip_wildcards = !udpif->need_revalidate;
     xlate_actions(&xin, &xout);
     xoutp = &xout;
 
-    if (!udump->need_revalidate) {
+    if (!udpif->need_revalidate) {
         ok = true;
         goto exit;
     }
@@ -1329,11 +1265,12 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
         compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
     }
 
-    if (!ofpbuf_equal(&xout_actions, actions)) {
+    if (actions_len != ofpbuf_size(&xout_actions)
+        || memcmp(ofpbuf_data(&xout_actions), actions, actions_len)) {
         goto exit;
     }
 
-    if (odp_flow_key_to_mask(udump->mask, udump->mask_len, &udump_mask, &flow)
+    if (odp_flow_key_to_mask(mask, mask_len, &dp_mask, &flow)
         == ODP_FIT_ERROR) {
         goto exit;
     }
@@ -1343,10 +1280,10 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
      * mask in the kernel is more specific i.e. less wildcarded, than what
      * we've calculated here.  This guarantees we don't catch any packets we
      * shouldn't with the megaflow. */
-    udump32 = (uint32_t *) &udump_mask;
+    dp32 = (uint32_t *) &dp_mask;
     xout32 = (uint32_t *) &xout.wc.masks;
     for (i = 0; i < FLOW_U32S; i++) {
-        if ((udump32[i] | xout32[i]) != udump32[i]) {
+        if ((dp32[i] | xout32[i]) != dp32[i]) {
             goto exit;
         }
     }
@@ -1360,24 +1297,21 @@ exit:
         }
         netflow_unref(netflow);
     }
-    ofpbuf_delete(actions);
     xlate_out_uninit(xoutp);
     return ok;
 }
 
 struct dump_op {
     struct udpif_key *ukey;
-    struct udpif_flow_dump *udump;
     struct dpif_flow_stats stats; /* Stats for 'op'. */
     struct dpif_op op;            /* Flow del operation. */
 };
 
 static void
 dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len,
-             struct udpif_key *ukey, struct udpif_flow_dump *udump)
+             struct udpif_key *ukey)
 {
     op->ukey = ukey;
-    op->udump = udump;
     op->op.type = DPIF_OP_FLOW_DEL;
     op->op.u.flow_del.key = key;
     op->op.u.flow_del.key_len = key_len;
@@ -1385,10 +1319,8 @@ dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len,
 }
 
 static void
-push_dump_ops(struct revalidator *revalidator,
-              struct dump_op *ops, size_t n_ops)
+push_dump_ops__(struct udpif *udpif, struct dump_op *ops, size_t n_ops)
 {
-    struct udpif *udpif = revalidator->udpif;
     struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
     size_t i;
 
@@ -1405,10 +1337,12 @@ push_dump_ops(struct revalidator *revalidator,
         stats = op->op.u.flow_del.stats;
         if (op->ukey) {
             push = &push_buf;
+            ovs_mutex_lock(&op->ukey->mutex);
             push->used = MAX(stats->used, op->ukey->stats.used);
             push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
             push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
             push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
+            ovs_mutex_unlock(&op->ukey->mutex);
         } else {
             push = stats;
         }
@@ -1445,94 +1379,120 @@ push_dump_ops(struct revalidator *revalidator,
             }
         }
     }
+}
 
-    for (i = 0; i < n_ops; i++) {
-        struct udpif_key *ukey;
+static void
+push_dump_ops(struct revalidator *revalidator,
+              struct dump_op *ops, size_t n_ops)
+{
+    int i;
 
-        /* If there's a udump, this ukey came directly from a datapath flow
-         * dump.  Sometimes a datapath can send duplicates in flow dumps, in
-         * which case we wouldn't want to double-free a ukey, so avoid that by
-         * looking up the ukey again.
-         *
-         * If there's no udump then we know what we're doing. */
-        ukey = (ops[i].udump
-                ? ukey_lookup(revalidator, ops[i].udump)
-                : ops[i].ukey);
-        if (ukey) {
-            ukey_delete(revalidator, ukey);
-        }
+    push_dump_ops__(revalidator->udpif, ops, n_ops);
+    for (i = 0; i < n_ops; i++) {
+        ukey_delete(revalidator, ops[i].ukey);
     }
 }
 
 static void
-revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
+revalidate(struct revalidator *revalidator)
 {
     struct udpif *udpif = revalidator->udpif;
 
     struct dump_op ops[REVALIDATE_MAX_BATCH];
-    struct udpif_flow_dump *udump, *next_udump;
-    size_t n_ops, n_flows;
+    const struct nlattr *key, *mask, *actions;
+    size_t key_len, mask_len, actions_len;
+    const struct dpif_flow_stats *stats;
+    long long int now;
     unsigned int flow_limit;
-    long long int max_idle;
-    bool must_del;
+    size_t n_ops;
+    void *state;
 
+    n_ops = 0;
+    now = time_msec();
     atomic_read(&udpif->flow_limit, &flow_limit);
 
-    n_flows = udpif_get_n_flows(udpif);
-
-    must_del = false;
-    max_idle = ofproto_max_idle;
-    if (n_flows > flow_limit) {
-        must_del = n_flows > 2 * flow_limit;
-        max_idle = 100;
-    }
-
-    n_ops = 0;
-    LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
-        long long int used, now;
+    dpif_flow_dump_state_init(udpif->dpif, &state);
+    while (dpif_flow_dump_next(&udpif->dump, state, &key, &key_len, &mask,
+                               &mask_len, &actions, &actions_len, &stats)) {
         struct udpif_key *ukey;
+        bool mark, may_destroy;
+        long long int used, max_idle;
+        uint32_t hash;
+        size_t n_flows;
 
-        now = time_msec();
-        ukey = ukey_lookup(revalidator, udump);
+        hash = hash_bytes(key, key_len, udpif->secret);
+        ukey = ukey_lookup(udpif, key, key_len, hash);
 
-        used = udump->stats.used;
+        used = stats->used;
         if (!used && ukey) {
+            ovs_mutex_lock(&ukey->mutex);
             used = ukey->created;
+            ovs_mutex_unlock(&ukey->mutex);
         }
 
-        if (must_del || (used && used < now - max_idle)) {
-            struct dump_op *dop = &ops[n_ops++];
+        n_flows = udpif_get_n_flows(udpif);
+        max_idle = ofproto_max_idle;
+        if (n_flows > flow_limit) {
+            max_idle = 100;
+        }
 
-            dump_op_init(dop, udump->key, udump->key_len, ukey, udump);
-            continue;
+        if ((used && used < now - max_idle) || n_flows > flow_limit * 2) {
+            mark = false;
+        } else {
+            if (!ukey) {
+                ukey = ukey_create(key, key_len, used);
+                if (!udpif_insert_ukey(udpif, ukey, hash)) {
+                    /* The same ukey has already been created. This means that
+                     * another revalidator is processing this flow
+                     * concurrently, so don't bother processing it. */
+                    ukey_delete(NULL, ukey);
+                    continue;
+                }
+            }
+
+            mark = revalidate_ukey(udpif, ukey, mask, mask_len, actions,
+                                   actions_len, stats);
+        }
+
+        if (ukey) {
+            ovs_mutex_lock(&ukey->mutex);
+            ukey->mark = ukey->flow_exists = mark;
+            ovs_mutex_unlock(&ukey->mutex);
         }
 
-        if (!ukey) {
-            ukey = ukey_create(udump->key, udump->key_len, used);
-            hmap_insert(&revalidator->ukeys, &ukey->hmap_node,
-                        udump->key_hash);
+        if (!mark) {
+            dump_op_init(&ops[n_ops++], key, key_len, ukey);
         }
-        ukey->mark = true;
 
-        if (!revalidate_ukey(udpif, udump, ukey)) {
-            dpif_flow_del(udpif->dpif, udump->key, udump->key_len, NULL);
-            ukey_delete(revalidator, ukey);
+        may_destroy = dpif_flow_dump_next_may_destroy_keys(&udpif->dump,
+                                                           state);
+
+        /* Only update 'now' right before dpif_flow_dump_next() clobbers its
+         * memory.  This gives us the current time relative to the time the
+         * datapath will write into 'stats'. */
+        if (may_destroy) {
+            now = time_msec();
         }
 
-        list_remove(&udump->list_node);
-        free(udump);
+        /* Only do a dpif_operate when we've hit our maximum batch, or when our
+         * memory is about to be clobbered by the next call to
+         * dpif_flow_dump_next(). */
+        if (n_ops == REVALIDATE_MAX_BATCH || (n_ops && may_destroy)) {
+            push_dump_ops__(udpif, ops, n_ops);
+            n_ops = 0;
+        }
     }
 
-    push_dump_ops(revalidator, ops, n_ops);
-
-    LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
-        list_remove(&udump->list_node);
-        free(udump);
+    if (n_ops) {
+        push_dump_ops__(udpif, ops, n_ops);
     }
+
+    dpif_flow_dump_state_uninit(udpif->dpif, state);
 }
 
 static void
 revalidator_sweep__(struct revalidator *revalidator, bool purge)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct dump_op ops[REVALIDATE_MAX_BATCH];
     struct udpif_key *ukey, *next;
@@ -1540,16 +1500,20 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge)
 
     n_ops = 0;
 
-    HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, &revalidator->ukeys) {
+    /* During garbage collection, this revalidator completely owns its ukeys
+     * map, and therefore doesn't need to do any locking. */
+    HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, revalidator->ukeys) {
         if (!purge && ukey->mark) {
             ukey->mark = false;
+        } else if (!ukey->flow_exists) {
+            ukey_delete(revalidator, ukey);
         } else {
             struct dump_op *op = &ops[n_ops++];
 
             /* If we have previously seen a flow in the datapath, but didn't
              * see it during the most recent dump, delete it. This allows us
              * to clean up the ukey and keep the statistics consistent. */
-            dump_op_init(op, ukey->key, ukey->key_len, ukey, NULL);
+            dump_op_init(op, ukey->key, ukey->key_len, ukey);
             if (n_ops == REVALIDATE_MAX_BATCH) {
                 push_dump_ops(revalidator, ops, n_ops);
                 n_ops = 0;
@@ -1597,13 +1561,10 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
         for (i = 0; i < n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
 
-            /* XXX: The result of hmap_count(&revalidator->ukeys) may not be
-             * accurate because it's not protected by the revalidator mutex. */
-            ovs_mutex_lock(&revalidator->mutex);
-            ds_put_format(&ds, "\t%s: (dump queue %"PRIuSIZE") (keys %"PRIuSIZE
-                          ")\n", revalidator->name, revalidator->n_udumps,
-                          hmap_count(&revalidator->ukeys));
-            ovs_mutex_unlock(&revalidator->mutex);
+            ovs_mutex_lock(&udpif->ukeys[i].mutex);
+            ds_put_format(&ds, "\t%s: (keys %"PRIuSIZE")\n", revalidator->name,
+                          hmap_count(&udpif->ukeys[i].hmap));
+            ovs_mutex_unlock(&udpif->ukeys[i].mutex);
         }
     }
 
diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at
index c5a01a1..89c8ad7 100644
--- a/tests/ofproto-dpif.at
+++ b/tests/ofproto-dpif.at
@@ -3833,10 +3833,10 @@ skb_priority(0),skb_mark(0/0),in_port(101),eth(src=50:54:00:00:00:07/00:00:00:00
 ])
 
 AT_CHECK([cat ovs-vswitchd.log | grep -e 'in_port(100).*packets:9' | FILTER_FLOW_DUMP], [0], [dnl
-skb_priority(0),skb_mark(0/0),recirc_id(0),in_port(100),eth(src=50:54:00:00:00:05/00:00:00:00:00:00,dst=50:54:00:00:00:07/00:00:00:00:00:00),eth_type(0x0800),ipv4(src=192.168.0.1/0.0.0.0,dst=192.168.0.2/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff),icmp(type=8/0,code=0/0), packets:9, bytes:540, used:0.0s
+skb_priority(0),skb_mark(0/0),recirc_id(0),in_port(100),eth(src=50:54:00:00:00:05/00:00:00:00:00:00,dst=50:54:00:00:00:07/00:00:00:00:00:00),eth_type(0x0800),ipv4(src=192.168.0.1/0.0.0.0,dst=192.168.0.2/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff),icmp(type=8/0,code=0/0), packets:9, bytes:540, used:0.0s, actions:101,3,2
 ])
 AT_CHECK([cat ovs-vswitchd.log | grep -e 'in_port(101).*packets:4' | FILTER_FLOW_DUMP], [0], [dnl
-skb_priority(0),skb_mark(0/0),recirc_id(0),in_port(101),eth(src=50:54:00:00:00:07/00:00:00:00:00:00,dst=50:54:00:00:00:05/00:00:00:00:00:00),eth_type(0x0800),ipv4(src=192.168.0.2/0.0.0.0,dst=192.168.0.1/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff),icmp(type=8/0,code=0/0), packets:4, bytes:240, used:0.0s
+skb_priority(0),skb_mark(0/0),recirc_id(0),in_port(101),eth(src=50:54:00:00:00:07/00:00:00:00:00:00,dst=50:54:00:00:00:05/00:00:00:00:00:00),eth_type(0x0800),ipv4(src=192.168.0.2/0.0.0.0,dst=192.168.0.1/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff),icmp(type=8/0,code=0/0), packets:4, bytes:240, used:0.0s, actions:100,2,3
 ])
 
 AT_CHECK([ovs-ofctl dump-ports br0 pbr0], [0], [dnl
@@ -4378,10 +4378,10 @@ skb_priority(0),skb_mark(0),in_port(1),eth(src=50:54:00:00:00:09,dst=50:54:00:00
 skb_priority(0),skb_mark(0),in_port(1),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.4,dst=10.0.0.3,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0), actions:drop
 ])
 AT_CHECK([cat ovs-vswitchd.log | grep '00:09.*packets:3' | FILTER_FLOW_DUMP], [0], [dnl
-skb_priority(0),skb_mark(0),recirc_id(0),dp_hash(0),in_port(1),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0), packets:3, bytes:180, used:0.0s
+skb_priority(0),skb_mark(0),recirc_id(0),dp_hash(0),in_port(1),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0), packets:3, bytes:180, used:0.0s, actions:2
 ])
 AT_CHECK([cat ovs-vswitchd.log | grep '00:0b.*packets:3' | FILTER_FLOW_DUMP], [0], [dnl
-skb_priority(0),skb_mark(0),recirc_id(0),dp_hash(0),in_port(1),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.4,dst=10.0.0.3,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0), packets:3, bytes:180, used:0.0s
+skb_priority(0),skb_mark(0),recirc_id(0),dp_hash(0),in_port(1),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.4,dst=10.0.0.3,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0), packets:3, bytes:180, used:0.0s, actions:drop
 ])
 OVS_VSWITCHD_STOP
 AT_CLEANUP
-- 
1.7.10.4




More information about the dev mailing list