[ovs-dev] [PATCH] ofproto-dpif: Remove the flow_dumper thread.

Joe Stringer joestringer at nicira.com
Thu Mar 6 00:33:34 UTC 2014


From: Ethan Jackson <ethan at nicira.com>

Previously, we had a separate flow_dumper thread that fetched flows from
the datapath to distribute to revalidator threads. This patch takes the
logic for dumping and pushes it into the revalidator threads, resulting
in simpler code with similar performance to the current code.

One thread, the "leader", is responsible for beginning and ending each
flow dump, maintaining the flow_limit, and checking whether the
revalidator threads need to exit. All revalidator threads dump,
revalidate, delete datapath flows and garbage collect ukeys.

Co-authored-by: Joe Stringer <joestringer at nicira.com>
Signed-off-by: Joe Stringer <joestringer at nicira.com>
---
v6: Shift ukeys hmaps from revalidators into udpif.
    Documentation tidyups.
v5: Handle ukey creation race.
    Style fixes.
v4: Rebase.
v3: First post.
---
 ofproto/ofproto-dpif-upcall.c |  624 +++++++++++++++++++----------------------
 1 file changed, 293 insertions(+), 331 deletions(-)

diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index 7dbd7f7..f884d31 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -67,34 +67,27 @@ struct handler {
                                           'mutex'. */
 };
 
-/* A thread that processes each kernel flow handed to it by the flow_dumper
- * thread, updates OpenFlow statistics, and updates or removes the kernel flow
- * as necessary. */
+/* A thread that processes datapath flows, updates OpenFlow statistics, and
+ * updates or removes them if necessary. */
 struct revalidator {
     struct udpif *udpif;               /* Parent udpif. */
     char *name;                        /* Thread name. */
 
     pthread_t thread;                  /* Thread ID. */
-    struct hmap ukeys;                 /* Datapath flow keys. */
-
-    uint64_t dump_seq;
-
-    struct ovs_mutex mutex;            /* Mutex guarding the following. */
-    pthread_cond_t wake_cond;
-    struct list udumps OVS_GUARDED;    /* Unprocessed udumps. */
-    size_t n_udumps OVS_GUARDED;       /* Number of unprocessed udumps. */
+    struct hmap *ukeys;                /* Points into udpif->ukeys for this
+                                          revalidator. Used for GC phase. */
 };
 
 /* An upcall handler for ofproto_dpif.
  *
  * udpif has two logically separate pieces:
  *
- *    - A "dispatcher" thread that reads upcalls from the kernel and dispatches
- *      them to one of several "handler" threads (see struct handler).
+ *    - Miss handling threads led by a "dispatcher" thread that reads upcalls
+ *      from the kernel and dispatches them to one of several "handler" threads
+ *      (see struct handler).
  *
- *    - A "flow_dumper" thread that reads the kernel flow table and dispatches
- *      flows to one of several "revalidator" threads (see struct
- *      revalidator). */
+ *    - Revalidation threads which read the datapath flow table and maintain
+ *      it. */
 struct udpif {
     struct list list_node;             /* In all_udpifs list. */
 
@@ -104,7 +97,6 @@ struct udpif {
     uint32_t secret;                   /* Random seed for upcall hash. */
 
     pthread_t dispatcher;              /* Dispatcher thread ID. */
-    pthread_t flow_dumper;             /* Flow dumper thread ID. */
 
     struct handler *handlers;          /* Upcall handlers. */
     size_t n_handlers;
@@ -112,14 +104,24 @@ struct udpif {
     struct revalidator *revalidators;  /* Flow revalidators. */
     size_t n_revalidators;
 
-    uint64_t last_reval_seq;           /* 'reval_seq' at last revalidation. */
-    struct seq *reval_seq;             /* Incremented to force revalidation. */
-
-    struct seq *dump_seq;              /* Increments each dump iteration. */
-
     struct latch exit_latch;           /* Tells child threads to exit. */
 
+    /* Revalidation. */
+    struct seq *reval_seq;             /* Incremented to force revalidation. */
+    bool need_revalidate;              /* As indicated by 'reval_seq'. */
+    bool reval_exit;                   /* Set by leader on 'exit_latch'. */
+    pthread_barrier_t reval_barrier;   /* Barrier used by revalidators. */
+    struct dpif_flow_dump dump;        /* DPIF flow dump state. */
     long long int dump_duration;       /* Duration of the last flow dump. */
+    struct seq *dump_seq;              /* Increments each dump iteration. */
+
+    /* During the flow dump phase, revalidators insert into these with a random
+     * distribution. During the garbage collection phase, each revalidator
+     * takes care of garbage collecting one of these hmaps. */
+    struct {
+        struct ovs_mutex mutex;        /* Guards the following. */
+        struct hmap hmap OVS_GUARDED;  /* Datapath flow keys. */
+    } *ukeys;
 
     /* Datapath flow statistics. */
     unsigned int max_n_flows;
@@ -155,40 +157,28 @@ struct upcall {
 /* 'udpif_key's are responsible for tracking the little bit of state udpif
  * needs to do flow expiration which can't be pulled directly from the
  * datapath.  They are owned, created by, maintained, and destroyed by a single
- * revalidator making them easy to efficiently handle with multiple threads. */
+ * revalidator making them easy to efficiently handle with multiple threads.
+ *
+ * While some elements of a udpif_key are protected by a mutex, the ukey itself
+ * is not.  Therefore it is not safe to destroy a udpif_key except when all
+ * revalidators are in garbage collection phase, or they aren't running. */
 struct udpif_key {
     struct hmap_node hmap_node;     /* In parent revalidator 'ukeys' map. */
 
-    struct nlattr *key;            /* Datapath flow key. */
+    /* These elements are read only once created, and therefore aren't
+     * protected by a mutex. */
+    const struct nlattr *key;      /* Datapath flow key. */
     size_t key_len;                /* Length of 'key'. */
 
-    struct dpif_flow_stats stats;  /* Stats at most recent flow dump. */
-    long long int created;         /* Estimation of creation time. */
-
-    bool mark;                     /* Used by mark and sweep GC algorithm. */
+    struct ovs_mutex mutex;                   /* Guards the following. */
+    struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats. */
+    long long int created OVS_GUARDED;        /* Estimate of creation time. */
+    bool mark OVS_GUARDED;                    /* For mark and sweep garbage
+                                                 collection. */
+    bool flow_exists OVS_GUARDED;             /* Ensures flows are only deleted
+                                                 once. */
 
-    struct odputil_keybuf key_buf; /* Memory for 'key'. */
-};
-
-/* 'udpif_flow_dump's hold the state associated with one iteration in a flow
- * dump operation.  This is created by the flow_dumper thread and handed to the
- * appropriate revalidator thread to be processed. */
-struct udpif_flow_dump {
-    struct list list_node;
-
-    struct nlattr *key;            /* Datapath flow key. */
-    size_t key_len;                /* Length of 'key'. */
-    uint32_t key_hash;             /* Hash of 'key'. */
-
-    struct odputil_keybuf mask_buf;
-    struct nlattr *mask;           /* Datapath mask for 'key'. */
-    size_t mask_len;               /* Length of 'mask'. */
-
-    struct dpif_flow_stats stats;  /* Stats pulled from the datapath. */
-
-    bool need_revalidate;          /* Key needs revalidation? */
-
-    struct odputil_keybuf key_buf;
+    struct odputil_keybuf key_buf;            /* Memory for 'key'. */
 };
 
 /* Flow miss batching.
@@ -223,12 +213,11 @@ static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
 
 static void recv_upcalls(struct udpif *);
 static void handle_upcalls(struct handler *handler, struct list *upcalls);
-static void *udpif_flow_dumper(void *);
 static void *udpif_dispatcher(void *);
 static void *udpif_upcall_handler(void *);
 static void *udpif_revalidator(void *);
 static uint64_t udpif_get_n_flows(struct udpif *);
-static void revalidate_udumps(struct revalidator *, struct list *udumps);
+static void revalidate(struct revalidator *);
 static void revalidator_sweep(struct revalidator *);
 static void revalidator_purge(struct revalidator *);
 static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
@@ -239,6 +228,9 @@ static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
                                             const char *argv[], void *aux);
 static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
                                             const char *argv[], void *aux);
+
+static struct udpif_key *ukey_create(const struct nlattr *key, size_t key_len,
+                                     long long int used);
 static void ukey_delete(struct revalidator *, struct udpif_key *);
 
 static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
@@ -319,34 +311,21 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         }
 
         for (i = 0; i < udpif->n_revalidators; i++) {
-            struct revalidator *revalidator = &udpif->revalidators[i];
-
-            ovs_mutex_lock(&revalidator->mutex);
-            xpthread_cond_signal(&revalidator->wake_cond);
-            ovs_mutex_unlock(&revalidator->mutex);
-            xpthread_join(revalidator->thread, NULL);
+            xpthread_join(udpif->revalidators[i].thread, NULL);
         }
 
-        xpthread_join(udpif->flow_dumper, NULL);
         xpthread_join(udpif->dispatcher, NULL);
 
         for (i = 0; i < udpif->n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
-            struct udpif_flow_dump *udump, *next_udump;
-
-            LIST_FOR_EACH_SAFE (udump, next_udump, list_node,
-                                &revalidator->udumps) {
-                list_remove(&udump->list_node);
-                free(udump);
-            }
 
             /* Delete ukeys, and delete all flows from the datapath to prevent
              * double-counting stats. */
             revalidator_purge(revalidator);
-            hmap_destroy(&revalidator->ukeys);
-            ovs_mutex_destroy(&revalidator->mutex);
-
             free(revalidator->name);
+
+            hmap_destroy(&udpif->ukeys[i].hmap);
+            ovs_mutex_destroy(&udpif->ukeys[i].mutex);
         }
 
         for (i = 0; i < udpif->n_handlers; i++) {
@@ -364,6 +343,8 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         }
         latch_poll(&udpif->exit_latch);
 
+        xpthread_barrier_destroy(&udpif->reval_barrier);
+
         free(udpif->revalidators);
         udpif->revalidators = NULL;
         udpif->n_revalidators = 0;
@@ -371,6 +352,9 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         free(udpif->handlers);
         udpif->handlers = NULL;
         udpif->n_handlers = 0;
+
+        free(udpif->ukeys);
+        udpif->ukeys = NULL;
     }
 
     /* Start new threads (if necessary). */
@@ -393,21 +377,23 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
                             handler);
         }
 
+        xpthread_barrier_init(&udpif->reval_barrier, NULL,
+                              udpif->n_revalidators);
+        udpif->reval_exit = false;
         udpif->revalidators = xzalloc(udpif->n_revalidators
                                       * sizeof *udpif->revalidators);
+        udpif->ukeys = xmalloc(sizeof *udpif->ukeys * n_revalidators);
         for (i = 0; i < udpif->n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
 
             revalidator->udpif = udpif;
-            list_init(&revalidator->udumps);
-            hmap_init(&revalidator->ukeys);
-            ovs_mutex_init(&revalidator->mutex);
-            xpthread_cond_init(&revalidator->wake_cond, NULL);
+            hmap_init(&udpif->ukeys[i].hmap);
+            ovs_mutex_init(&udpif->ukeys[i].mutex);
+            revalidator->ukeys = &udpif->ukeys[i].hmap;
             xpthread_create(&revalidator->thread, NULL, udpif_revalidator,
                             revalidator);
         }
         xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
-        xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, udpif);
     }
 }
 
@@ -450,7 +436,6 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
     size_t i;
 
     simap_increase(usage, "dispatchers", 1);
-    simap_increase(usage, "flow_dumpers", 1);
 
     simap_increase(usage, "handlers", udpif->n_handlers);
     for (i = 0; i < udpif->n_handlers; i++) {
@@ -462,15 +447,9 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
 
     simap_increase(usage, "revalidators", udpif->n_revalidators);
     for (i = 0; i < udpif->n_revalidators; i++) {
-        struct revalidator *revalidator = &udpif->revalidators[i];
-        ovs_mutex_lock(&revalidator->mutex);
-        simap_increase(usage, "revalidator dumps", revalidator->n_udumps);
-
-        /* XXX: This isn't technically thread safe because the revalidator
-         * ukeys maps isn't protected by a mutex since it's per thread. */
-        simap_increase(usage, "revalidator keys",
-                       hmap_count(&revalidator->ukeys));
-        ovs_mutex_unlock(&revalidator->mutex);
+        ovs_mutex_lock(&udpif->ukeys[i].mutex);
+        simap_increase(usage, "udpif keys", hmap_count(&udpif->ukeys[i].hmap));
+        ovs_mutex_unlock(&udpif->ukeys[i].mutex);
     }
 }
 
@@ -536,125 +515,6 @@ udpif_dispatcher(void *arg)
     return NULL;
 }
 
-static void *
-udpif_flow_dumper(void *arg)
-{
-    struct udpif *udpif = arg;
-
-    set_subprogram_name("flow_dumper");
-    while (!latch_is_set(&udpif->exit_latch)) {
-        const struct dpif_flow_stats *stats;
-        long long int start_time, duration;
-        const struct nlattr *key, *mask;
-        struct dpif_flow_dump dump;
-        size_t key_len, mask_len;
-        unsigned int flow_limit;
-        bool need_revalidate;
-        uint64_t reval_seq;
-        size_t n_flows, i;
-        int error;
-        void *state = NULL;
-
-        reval_seq = seq_read(udpif->reval_seq);
-        need_revalidate = udpif->last_reval_seq != reval_seq;
-        udpif->last_reval_seq = reval_seq;
-
-        n_flows = udpif_get_n_flows(udpif);
-        udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
-        udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
-
-        start_time = time_msec();
-        error = dpif_flow_dump_start(&dump, udpif->dpif);
-        if (error) {
-            VLOG_INFO("Failed to start flow dump (%s)", ovs_strerror(error));
-            goto skip;
-        }
-        dpif_flow_dump_state_init(udpif->dpif, &state);
-        while (dpif_flow_dump_next(&dump, state, &key, &key_len,
-                                   &mask, &mask_len, NULL, NULL, &stats)
-               && !latch_is_set(&udpif->exit_latch)) {
-            struct udpif_flow_dump *udump = xmalloc(sizeof *udump);
-            struct revalidator *revalidator;
-
-            udump->key_hash = hash_bytes(key, key_len, udpif->secret);
-            memcpy(&udump->key_buf, key, key_len);
-            udump->key = (struct nlattr *) &udump->key_buf;
-            udump->key_len = key_len;
-
-            memcpy(&udump->mask_buf, mask, mask_len);
-            udump->mask = (struct nlattr *) &udump->mask_buf;
-            udump->mask_len = mask_len;
-
-            udump->stats = *stats;
-            udump->need_revalidate = need_revalidate;
-
-            revalidator = &udpif->revalidators[udump->key_hash
-                % udpif->n_revalidators];
-
-            ovs_mutex_lock(&revalidator->mutex);
-            while (revalidator->n_udumps >= REVALIDATE_MAX_BATCH * 3
-                   && !latch_is_set(&udpif->exit_latch)) {
-                ovs_mutex_cond_wait(&revalidator->wake_cond,
-                                    &revalidator->mutex);
-            }
-            list_push_back(&revalidator->udumps, &udump->list_node);
-            revalidator->n_udumps++;
-            xpthread_cond_signal(&revalidator->wake_cond);
-            ovs_mutex_unlock(&revalidator->mutex);
-        }
-        dpif_flow_dump_state_uninit(udpif->dpif, state);
-        dpif_flow_dump_done(&dump);
-
-        /* Let all the revalidators finish and garbage collect. */
-        seq_change(udpif->dump_seq);
-        for (i = 0; i < udpif->n_revalidators; i++) {
-            struct revalidator *revalidator = &udpif->revalidators[i];
-            ovs_mutex_lock(&revalidator->mutex);
-            xpthread_cond_signal(&revalidator->wake_cond);
-            ovs_mutex_unlock(&revalidator->mutex);
-        }
-
-        for (i = 0; i < udpif->n_revalidators; i++) {
-            struct revalidator *revalidator = &udpif->revalidators[i];
-
-            ovs_mutex_lock(&revalidator->mutex);
-            while (revalidator->dump_seq != seq_read(udpif->dump_seq)
-                   && !latch_is_set(&udpif->exit_latch)) {
-                ovs_mutex_cond_wait(&revalidator->wake_cond,
-                                    &revalidator->mutex);
-            }
-            ovs_mutex_unlock(&revalidator->mutex);
-        }
-
-        duration = MAX(time_msec() - start_time, 1);
-        udpif->dump_duration = duration;
-        atomic_read(&udpif->flow_limit, &flow_limit);
-        if (duration > 2000) {
-            flow_limit /= duration / 1000;
-        } else if (duration > 1300) {
-            flow_limit = flow_limit * 3 / 4;
-        } else if (duration < 1000 && n_flows > 2000
-                   && flow_limit < n_flows * 1000 / duration) {
-            flow_limit += 1000;
-        }
-        flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
-        atomic_store(&udpif->flow_limit, flow_limit);
-
-        if (duration > 2000) {
-            VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
-                      duration);
-        }
-
-skip:
-        poll_timer_wait_until(start_time + MIN(MAX_IDLE, 500));
-        seq_wait(udpif->reval_seq, udpif->last_reval_seq);
-        latch_wait(&udpif->exit_latch);
-        poll_block();
-    }
-
-    return NULL;
-}
-
 /* The miss handler thread is responsible for processing miss upcalls retrieved
  * by the dispatcher thread.  Once finished it passes the processed miss
  * upcalls to ofproto-dpif where they're installed in the datapath. */
@@ -696,42 +556,85 @@ udpif_upcall_handler(void *arg)
 static void *
 udpif_revalidator(void *arg)
 {
+    /* Used by all revalidators. */
     struct revalidator *revalidator = arg;
+    struct udpif *udpif = revalidator->udpif;
+    bool leader = revalidator == &udpif->revalidators[0];
+
+    /* Used only by the leader. */
+    long long int start_time = 0;
+    uint64_t last_reval_seq = 0;
+    unsigned int flow_limit = 0;
+    size_t n_flows = 0;
 
     revalidator->name = xasprintf("revalidator_%u", ovsthread_id_self());
     set_subprogram_name("%s", revalidator->name);
     for (;;) {
-        struct list udumps = LIST_INITIALIZER(&udumps);
-        struct udpif *udpif = revalidator->udpif;
-        size_t i;
+        if (leader) {
+            uint64_t reval_seq;
 
-        ovs_mutex_lock(&revalidator->mutex);
-        if (latch_is_set(&udpif->exit_latch)) {
-            ovs_mutex_unlock(&revalidator->mutex);
-            return NULL;
-        }
+            reval_seq = seq_read(udpif->reval_seq);
+            udpif->need_revalidate = last_reval_seq != reval_seq;
+            last_reval_seq = reval_seq;
 
-        if (!revalidator->n_udumps) {
-            if (revalidator->dump_seq != seq_read(udpif->dump_seq)) {
-                revalidator->dump_seq = seq_read(udpif->dump_seq);
-                revalidator_sweep(revalidator);
-            } else {
-                ovs_mutex_cond_wait(&revalidator->wake_cond,
-                                    &revalidator->mutex);
+            n_flows = udpif_get_n_flows(udpif);
+            udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
+            udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
+
+            /* Only the leader checks the exit latch to prevent a race where
+             * some threads think it's true and exit and others think it's
+             * false and block indefinitely on the reval_barrier. */
+            udpif->reval_exit = latch_is_set(&udpif->exit_latch);
+
+            start_time = time_msec();
+            if (!udpif->reval_exit) {
+                dpif_flow_dump_start(&udpif->dump, udpif->dpif);
             }
         }
 
-        for (i = 0; i < REVALIDATE_MAX_BATCH && revalidator->n_udumps; i++) {
-            list_push_back(&udumps, list_pop_front(&revalidator->udumps));
-            revalidator->n_udumps--;
+        /* Wait for the leader to start the flow dump. */
+        xpthread_barrier_wait(&udpif->reval_barrier);
+        if (udpif->reval_exit) {
+            break;
         }
+        revalidate(revalidator);
+
+        /* Wait for all flows to have been dumped before we garbage collect. */
+        xpthread_barrier_wait(&udpif->reval_barrier);
+        revalidator_sweep(revalidator);
+
+        /* Wait for all revalidators to finish garbage collection. */
+        xpthread_barrier_wait(&udpif->reval_barrier);
+
+        if (leader) {
+            long long int duration;
+
+            dpif_flow_dump_done(&udpif->dump);
+            seq_change(udpif->dump_seq);
+
+            duration = MAX(time_msec() - start_time, 1);
+            atomic_read(&udpif->flow_limit, &flow_limit);
+            udpif->dump_duration = duration;
+            if (duration > 2000) {
+                flow_limit /= duration / 1000;
+            } else if (duration > 1300) {
+                flow_limit = flow_limit * 3 / 4;
+            } else if (duration < 1000 && n_flows > 2000
+                       && flow_limit < n_flows * 1000 / duration) {
+                flow_limit += 1000;
+            }
+            flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
+            atomic_store(&udpif->flow_limit, flow_limit);
 
-        /* Wake up the flow dumper. */
-        xpthread_cond_signal(&revalidator->wake_cond);
-        ovs_mutex_unlock(&revalidator->mutex);
+            if (duration > 2000) {
+                VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
+                          duration);
+            }
 
-        if (!list_is_empty(&udumps)) {
-            revalidate_udumps(revalidator, &udumps);
+            poll_timer_wait_until(start_time + MIN(MAX_IDLE, 500));
+            seq_wait(udpif->reval_seq, last_reval_seq);
+            latch_wait(&udpif->exit_latch);
+            poll_block();
         }
     }
 
@@ -1262,15 +1165,16 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
     }
 }
 
+/* Caller must hold udpif->ukeys[hash % udpif->n_revalidators].mutex. */
 static struct udpif_key *
-ukey_lookup(struct revalidator *revalidator, struct udpif_flow_dump *udump)
+ukey_lookup__(struct udpif *udpif, const struct nlattr *key, size_t key_len,
+              uint32_t hash)
 {
     struct udpif_key *ukey;
+    struct hmap *hmap = &udpif->ukeys[hash % udpif->n_revalidators].hmap;
 
-    HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, udump->key_hash,
-                             &revalidator->ukeys) {
-        if (ukey->key_len == udump->key_len
-            && !memcmp(ukey->key, udump->key, udump->key_len)) {
+    HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, hash, hmap) {
+        if (ukey->key_len == key_len && !memcmp(ukey->key, key, key_len)) {
             return ukey;
         }
     }
@@ -1278,39 +1182,87 @@ ukey_lookup(struct revalidator *revalidator, struct udpif_flow_dump *udump)
 }
 
 static struct udpif_key *
+ukey_lookup(struct udpif *udpif, const struct nlattr *key, size_t key_len,
+            uint32_t hash)
+{
+    struct udpif_key *ukey;
+    uint32_t idx = hash % udpif->n_revalidators;
+
+    ovs_mutex_lock(&udpif->ukeys[idx].mutex);
+    ukey = ukey_lookup__(udpif, key, key_len, hash);
+    ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
+
+    return ukey;
+}
+
+static struct udpif_key *
 ukey_create(const struct nlattr *key, size_t key_len, long long int used)
 {
     struct udpif_key *ukey = xmalloc(sizeof *ukey);
+    ovs_mutex_init(&ukey->mutex);
 
     ukey->key = (struct nlattr *) &ukey->key_buf;
     memcpy(&ukey->key_buf, key, key_len);
     ukey->key_len = key_len;
 
+    ovs_mutex_lock(&ukey->mutex);
     ukey->mark = false;
+    ukey->flow_exists = true;
     ukey->created = used ? used : time_msec();
     memset(&ukey->stats, 0, sizeof ukey->stats);
+    ovs_mutex_unlock(&ukey->mutex);
 
     return ukey;
 }
 
+/* Checks for a ukey in 'udpif->ukeys' with the same 'ukey->key' and 'hash',
+ * and inserts 'ukey' if it does not exist.
+ *
+ * Returns true if 'ukey' was inserted into 'udpif->ukeys', false otherwise. */
+static bool
+udpif_insert_ukey(struct udpif *udpif, struct udpif_key *ukey, uint32_t hash)
+{
+    struct udpif_key *duplicate;
+    uint32_t idx = hash % udpif->n_revalidators;
+    bool ok;
+
+    ovs_mutex_lock(&udpif->ukeys[idx].mutex);
+    duplicate = ukey_lookup__(udpif, ukey->key, ukey->key_len, hash);
+    if (duplicate) {
+        ok = false;
+    } else {
+        hmap_insert(&udpif->ukeys[idx].hmap, &ukey->hmap_node, hash);
+        ok = true;
+    }
+    ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
+
+    return ok;
+}
+
 static void
 ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
 {
-    hmap_remove(&revalidator->ukeys, &ukey->hmap_node);
+    if (revalidator) {
+        hmap_remove(revalidator->ukeys, &ukey->hmap_node);
+    }
+    ovs_mutex_destroy(&ukey->mutex);
     free(ukey);
 }
 
 static bool
-revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
-                struct udpif_key *ukey)
+revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
+                const struct nlattr *mask, size_t mask_len,
+                const struct nlattr *actions, size_t actions_len,
+                const struct dpif_flow_stats *stats)
+
 {
-    struct ofpbuf xout_actions, *actions;
     uint64_t slow_path_buf[128 / 8];
     struct xlate_out xout, *xoutp;
-    struct flow flow, udump_mask;
     struct ofproto_dpif *ofproto;
     struct dpif_flow_stats push;
-    uint32_t *udump32, *xout32;
+    struct ofpbuf xout_actions;
+    struct flow flow, dp_mask;
+    uint32_t *dp32, *xout32;
     odp_port_t odp_in_port;
     struct xlate_in xin;
     int error;
@@ -1319,29 +1271,20 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
 
     ok = false;
     xoutp = NULL;
-    actions = NULL;
-
-    /* If we don't need to revalidate, we can simply push the stats contained
-     * in the udump, otherwise we'll have to get the actions so we can check
-     * them. */
-    if (udump->need_revalidate) {
-        if (dpif_flow_get(udpif->dpif, ukey->key, ukey->key_len, &actions,
-                          &udump->stats)) {
-            goto exit;
-        }
-    }
 
-    push.used = udump->stats.used;
-    push.tcp_flags = udump->stats.tcp_flags;
-    push.n_packets = udump->stats.n_packets > ukey->stats.n_packets
-        ? udump->stats.n_packets - ukey->stats.n_packets
+    ovs_mutex_lock(&ukey->mutex);
+    push.used = stats->used;
+    push.tcp_flags = stats->tcp_flags;
+    push.n_packets = stats->n_packets > ukey->stats.n_packets
+        ? stats->n_packets - ukey->stats.n_packets
         : 0;
-    push.n_bytes = udump->stats.n_bytes > ukey->stats.n_bytes
-        ? udump->stats.n_bytes - ukey->stats.n_bytes
+    push.n_bytes = stats->n_bytes > ukey->stats.n_bytes
+        ? stats->n_bytes - ukey->stats.n_bytes
         : 0;
-    ukey->stats = udump->stats;
+    ukey->stats = *stats;
+    ovs_mutex_unlock(&ukey->mutex);
 
-    if (!push.n_packets && !udump->need_revalidate) {
+    if (!push.n_packets && !udpif->need_revalidate) {
         ok = true;
         goto exit;
     }
@@ -1355,11 +1298,11 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
     xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags, NULL);
     xin.resubmit_stats = push.n_packets ? &push : NULL;
     xin.may_learn = push.n_packets > 0;
-    xin.skip_wildcards = !udump->need_revalidate;
+    xin.skip_wildcards = !udpif->need_revalidate;
     xlate_actions(&xin, &xout);
     xoutp = &xout;
 
-    if (!udump->need_revalidate) {
+    if (!udpif->need_revalidate) {
         ok = true;
         goto exit;
     }
@@ -1372,11 +1315,12 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
         compose_slow_path(udpif, &xout, odp_in_port, &xout_actions);
     }
 
-    if (!ofpbuf_equal(&xout_actions, actions)) {
+    if (actions_len != xout_actions.size
+        || memcmp(xout_actions.data, actions, actions_len)) {
         goto exit;
     }
 
-    if (odp_flow_key_to_mask(udump->mask, udump->mask_len, &udump_mask, &flow)
+    if (odp_flow_key_to_mask(mask, mask_len, &dp_mask, &flow)
         == ODP_FIT_ERROR) {
         goto exit;
     }
@@ -1386,34 +1330,31 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
      * mask in the kernel is more specific i.e. less wildcarded, than what
      * we've calculated here.  This guarantees we don't catch any packets we
      * shouldn't with the megaflow. */
-    udump32 = (uint32_t *) &udump_mask;
+    dp32 = (uint32_t *) &dp_mask;
     xout32 = (uint32_t *) &xout.wc.masks;
     for (i = 0; i < FLOW_U32S; i++) {
-        if ((udump32[i] | xout32[i]) != udump32[i]) {
+        if ((dp32[i] | xout32[i]) != dp32[i]) {
             goto exit;
         }
     }
     ok = true;
 
 exit:
-    ofpbuf_delete(actions);
     xlate_out_uninit(xoutp);
     return ok;
 }
 
 struct dump_op {
     struct udpif_key *ukey;
-    struct udpif_flow_dump *udump;
     struct dpif_flow_stats stats; /* Stats for 'op'. */
     struct dpif_op op;            /* Flow del operation. */
 };
 
 static void
 dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len,
-             struct udpif_key *ukey, struct udpif_flow_dump *udump)
+             struct udpif_key *ukey)
 {
     op->ukey = ukey;
-    op->udump = udump;
     op->op.type = DPIF_OP_FLOW_DEL;
     op->op.u.flow_del.key = key;
     op->op.u.flow_del.key_len = key_len;
@@ -1421,10 +1362,8 @@ dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len,
 }
 
 static void
-push_dump_ops(struct revalidator *revalidator,
-              struct dump_op *ops, size_t n_ops)
+push_dump_ops__(struct udpif *udpif, struct dump_op *ops, size_t n_ops)
 {
-    struct udpif *udpif = revalidator->udpif;
     struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
     size_t i;
 
@@ -1441,10 +1380,12 @@ push_dump_ops(struct revalidator *revalidator,
         stats = op->op.u.flow_del.stats;
         if (op->ukey) {
             push = &push_buf;
+            ovs_mutex_lock(&op->ukey->mutex);
             push->used = MAX(stats->used, op->ukey->stats.used);
             push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
             push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
             push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
+            ovs_mutex_unlock(&op->ukey->mutex);
         } else {
             push = stats;
         }
@@ -1474,94 +1415,114 @@ push_dump_ops(struct revalidator *revalidator,
             }
         }
     }
+}
 
-    for (i = 0; i < n_ops; i++) {
-        struct udpif_key *ukey;
+static void
+push_dump_ops(struct revalidator *revalidator,
+              struct dump_op *ops, size_t n_ops)
+{
+    int i;
 
-        /* If there's a udump, this ukey came directly from a datapath flow
-         * dump.  Sometimes a datapath can send duplicates in flow dumps, in
-         * which case we wouldn't want to double-free a ukey, so avoid that by
-         * looking up the ukey again.
-         *
-         * If there's no udump then we know what we're doing. */
-        ukey = (ops[i].udump
-                ? ukey_lookup(revalidator, ops[i].udump)
-                : ops[i].ukey);
-        if (ukey) {
-            ukey_delete(revalidator, ukey);
-        }
+    push_dump_ops__(revalidator->udpif, ops, n_ops);
+    for (i = 0; i < n_ops; i++) {
+        ukey_delete(revalidator, ops[i].ukey);
     }
 }
 
 static void
-revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
+revalidate(struct revalidator *revalidator)
 {
     struct udpif *udpif = revalidator->udpif;
 
     struct dump_op ops[REVALIDATE_MAX_BATCH];
-    struct udpif_flow_dump *udump, *next_udump;
-    size_t n_ops, n_flows;
+    const struct nlattr *key, *mask, *actions;
+    size_t key_len, mask_len, actions_len;
+    const struct dpif_flow_stats *stats;
+    long long int now;
     unsigned int flow_limit;
-    long long int max_idle;
-    bool must_del;
+    size_t n_ops;
+    void *state;
 
+    n_ops = 0;
+    now = time_msec();
     atomic_read(&udpif->flow_limit, &flow_limit);
 
-    n_flows = udpif_get_n_flows(udpif);
-
-    must_del = false;
-    max_idle = MAX_IDLE;
-    if (n_flows > flow_limit) {
-        must_del = n_flows > 2 * flow_limit;
-        max_idle = 100;
-    }
-
-    n_ops = 0;
-    LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
-        long long int used, now;
+    dpif_flow_dump_state_init(udpif->dpif, &state);
+    while (dpif_flow_dump_next(&udpif->dump, state, &key, &key_len, &mask,
+                               &mask_len, &actions, &actions_len, &stats)) {
         struct udpif_key *ukey;
+        bool mark, may_destroy;
+        long long int used;
+        uint32_t hash;
 
-        now = time_msec();
-        ukey = ukey_lookup(revalidator, udump);
+        hash = hash_bytes(key, key_len, udpif->secret);
+        ukey = ukey_lookup(udpif, key, key_len, hash);
 
-        used = udump->stats.used;
+        used = stats->used;
         if (!used && ukey) {
+            ovs_mutex_lock(&ukey->mutex);
             used = ukey->created;
+            ovs_mutex_unlock(&ukey->mutex);
         }
 
-        if (must_del || (used && used < now - max_idle)) {
-            struct dump_op *dop = &ops[n_ops++];
+        if ((used && used < now - MAX_IDLE)
+            || udpif_get_n_flows(udpif) > flow_limit * 2) {
+            mark = false;
+        } else {
+            if (!ukey) {
+                ukey = ukey_create(key, key_len, used);
+                if (!udpif_insert_ukey(udpif, ukey, hash)) {
+                    /* The same ukey has already been created. This means that
+                     * another revalidator is processing this flow
+                     * concurrently, so don't bother processing it. */
+                    ukey_delete(NULL, ukey);
+                    continue;
+                }
+            }
 
-            dump_op_init(dop, udump->key, udump->key_len, ukey, udump);
-            continue;
+            mark = revalidate_ukey(udpif, ukey, mask, mask_len, actions,
+                                   actions_len, stats);
         }
 
-        if (!ukey) {
-            ukey = ukey_create(udump->key, udump->key_len, used);
-            hmap_insert(&revalidator->ukeys, &ukey->hmap_node,
-                        udump->key_hash);
+        if (ukey) {
+            ovs_mutex_lock(&ukey->mutex);
+            ukey->mark = ukey->flow_exists = mark;
+            ovs_mutex_unlock(&ukey->mutex);
         }
-        ukey->mark = true;
 
-        if (!revalidate_ukey(udpif, udump, ukey)) {
-            dpif_flow_del(udpif->dpif, udump->key, udump->key_len, NULL);
-            ukey_delete(revalidator, ukey);
+        if (!mark) {
+            dump_op_init(&ops[n_ops++], key, key_len, ukey);
         }
 
-        list_remove(&udump->list_node);
-        free(udump);
-    }
+        may_destroy = dpif_flow_dump_next_may_destroy_keys(&udpif->dump,
+                                                           state);
+
+        /* Only update 'now' when the next dpif_flow_dump_next() call may
+         * refresh the dump's buffer.  This gives us the current time relative
+         * to the time the datapath will write into 'stats'. */
+        if (may_destroy) {
+            now = time_msec();
+        }
 
-    push_dump_ops(revalidator, ops, n_ops);
+        /* Only do a dpif_operate when we've hit our maximum batch, or when our
+         * memory is about to be clobbered by the next call to
+         * dpif_flow_dump_next(). */
+        if (n_ops == REVALIDATE_MAX_BATCH || (n_ops && may_destroy)) {
+            push_dump_ops__(udpif, ops, n_ops);
+            n_ops = 0;
+        }
+    }
 
-    LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
-        list_remove(&udump->list_node);
-        free(udump);
+    if (n_ops) {
+        push_dump_ops__(udpif, ops, n_ops);
     }
+
+    dpif_flow_dump_state_uninit(udpif->dpif, state);
 }
 
 static void
 revalidator_sweep__(struct revalidator *revalidator, bool purge)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct dump_op ops[REVALIDATE_MAX_BATCH];
     struct udpif_key *ukey, *next;
@@ -1569,16 +1530,20 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge)
 
     n_ops = 0;
 
-    HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, &revalidator->ukeys) {
+    /* During garbage collection, this revalidator completely owns its ukeys
+     * map, and therefore doesn't need to do any locking. */
+    HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, revalidator->ukeys) {
         if (!purge && ukey->mark) {
             ukey->mark = false;
+        } else if (!ukey->flow_exists) {
+            ukey_delete(revalidator, ukey);
         } else {
             struct dump_op *op = &ops[n_ops++];
 
             /* If we have previously seen a flow in the datapath, but didn't
              * see it during the most recent dump, delete it. This allows us
              * to clean up the ukey and keep the statistics consistent. */
-            dump_op_init(op, ukey->key, ukey->key_len, ukey, NULL);
+            dump_op_init(op, ukey->key, ukey->key_len, ukey);
             if (n_ops == REVALIDATE_MAX_BATCH) {
                 push_dump_ops(revalidator, ops, n_ops);
                 n_ops = 0;
@@ -1636,13 +1601,10 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
         for (i = 0; i < n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
 
-            /* XXX: The result of hmap_count(&revalidator->ukeys) may not be
-             * accurate because it's not protected by the revalidator mutex. */
-            ovs_mutex_lock(&revalidator->mutex);
-            ds_put_format(&ds, "\t%s: (dump queue %"PRIuSIZE") (keys %"PRIuSIZE
-                          ")\n", revalidator->name, revalidator->n_udumps,
-                          hmap_count(&revalidator->ukeys));
-            ovs_mutex_unlock(&revalidator->mutex);
+            ovs_mutex_lock(&udpif->ukeys[i].mutex);
+            ds_put_format(&ds, "\t%s: (keys %"PRIuSIZE")\n", revalidator->name,
+                          hmap_count(&udpif->ukeys[i].hmap));
+            ovs_mutex_unlock(&udpif->ukeys[i].mutex);
         }
     }
 
-- 
1.7.9.5




More information about the dev mailing list