[ovs-dev] [PATCHv4 3/9] lib: Introduce seqlock mechanism.

Daniele Di Proietto diproiettod at vmware.com
Fri Mar 27 16:29:52 UTC 2015


Factor out the counter lock from the cmap implementation.

A writer-prioritizing counter-based R/W spinlock is introduced with this
commit. The code is taken from the cmap implementation and will also be
used in subsequent commits.
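
A minimal usage sketch of the new interface (mirroring the comment block in
seqlock.h; 's' is a hypothetical seqlock protecting some data):

    /* Writer (single writer, or writers serialized externally). */
    uint32_t c = seqlock_write_begin(&s);
    /* ... modify the protected data ... */
    seqlock_write_end(&s, c);

    /* Reader: retry until a consistent snapshot has been read. */
    uint32_t c;
    do {
        c = seqlock_read_begin(&s);
        /* ... read the protected data ... */
    } while (!seqlock_read_correct(&s, c));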

Signed-off-by: Daniele Di Proietto <diproiettod at vmware.com>
Acked-by: Ben Pfaff <blp at nicira.com>
---
 lib/automake.mk |   1 +
 lib/cmap.c      |  80 +++++++------------------------
 lib/seqlock.h   | 142 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 161 insertions(+), 62 deletions(-)
 create mode 100644 lib/seqlock.h

diff --git a/lib/automake.mk b/lib/automake.mk
index 3629079..abe08a8 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -207,6 +207,7 @@ lib_libopenvswitch_la_SOURCES = \
 	lib/sat-math.h \
 	lib/seq.c \
 	lib/seq.h \
+	lib/seqlock.h \
 	lib/sha1.c \
 	lib/sha1.h \
 	lib/shash.c \
diff --git a/lib/cmap.c b/lib/cmap.c
index 8d11ae0..612e9fb 100644
--- a/lib/cmap.c
+++ b/lib/cmap.c
@@ -21,6 +21,7 @@
 #include "hash.h"
 #include "ovs-rcu.h"
 #include "random.h"
+#include "seqlock.h"
 #include "util.h"
 
 COVERAGE_DEFINE(cmap_expand);
@@ -129,13 +130,7 @@ COVERAGE_DEFINE(cmap_shrink);
 /* A cuckoo hash bucket.  Designed to be cache-aligned and exactly one cache
  * line long. */
 struct cmap_bucket {
-    /* Allows readers to track in-progress changes.  Initially zero, each
-     * writer increments this value just before and just after each change (see
-     * cmap_set_bucket()).  Thus, a reader can ensure that it gets a consistent
-     * snapshot by waiting for the counter to become even (see
-     * read_even_counter()), then checking that its value does not change while
-     * examining the bucket (see cmap_find()). */
-    atomic_uint32_t counter;
+    struct seqlock lock;
 
     /* (hash, node) slots.  They are parallel arrays instead of an array of
      * structs to reduce the amount of space lost to padding.
@@ -270,46 +265,6 @@ cmap_is_empty(const struct cmap *cmap)
     return cmap_count(cmap) == 0;
 }
 
-static inline uint32_t
-read_counter(const struct cmap_bucket *bucket_)
-{
-    struct cmap_bucket *bucket = CONST_CAST(struct cmap_bucket *, bucket_);
-    uint32_t counter;
-
-    atomic_read_explicit(&bucket->counter, &counter, memory_order_acquire);
-
-    return counter;
-}
-
-static inline uint32_t
-read_even_counter(const struct cmap_bucket *bucket)
-{
-    uint32_t counter;
-
-    do {
-        counter = read_counter(bucket);
-    } while (OVS_UNLIKELY(counter & 1));
-
-    return counter;
-}
-
-static inline bool
-counter_changed(const struct cmap_bucket *b_, uint32_t c)
-{
-    struct cmap_bucket *b = CONST_CAST(struct cmap_bucket *, b_);
-    uint32_t counter;
-
-    /* Need to make sure the counter read is not moved up, before the hash and
-     * cmap_node_next().  Using atomic_read_explicit with memory_order_acquire
-     * would allow prior reads to be moved after the barrier.
-     * atomic_thread_fence prevents all following memory accesses from moving
-     * prior to preceding loads. */
-    atomic_thread_fence(memory_order_acquire);
-    atomic_read_relaxed(&b->counter, &counter);
-
-    return OVS_UNLIKELY(counter != c);
-}
-
 static inline const struct cmap_node *
 cmap_find_in_bucket(const struct cmap_bucket *bucket, uint32_t hash)
 {
@@ -330,20 +285,20 @@ cmap_find__(const struct cmap_bucket *b1, const struct cmap_bucket *b2,
 
     do {
         do {
-            c1 = read_even_counter(b1);
+            c1 = seqlock_read_begin(&b1->lock);
             node = cmap_find_in_bucket(b1, hash);
-        } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+        } while (OVS_UNLIKELY(!seqlock_read_correct(&b1->lock, c1)));
         if (node) {
             break;
         }
         do {
-            c2 = read_even_counter(b2);
+            c2 = seqlock_read_begin(&b2->lock);
             node = cmap_find_in_bucket(b2, hash);
-        } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+        } while (OVS_UNLIKELY(!seqlock_read_correct(&b2->lock, c2)));
         if (node) {
             break;
         }
-    } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+    } while (OVS_UNLIKELY(!seqlock_read_correct(&b1->lock, c1)));
 
     return node;
 }
@@ -402,9 +357,9 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
         const struct cmap_node *node;
 
         do {
-            c1 = read_even_counter(b1);
+            c1 = seqlock_read_begin(&b1->lock);
             node = cmap_find_in_bucket(b1, hashes[i]);
-        } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+        } while (OVS_UNLIKELY(!seqlock_read_correct(&b1->lock, c1)));
 
         if (!node) {
             /* Not found (yet); Prefetch the 2nd bucket. */
@@ -425,9 +380,9 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
         const struct cmap_node *node;
 
         do {
-            c2 = read_even_counter(b2);
+            c2 = seqlock_read_begin(&b2->lock);
             node = cmap_find_in_bucket(b2, hashes[i]);
-        } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+        } while (OVS_UNLIKELY(!seqlock_read_correct(&b2->lock, c2)));
 
         if (!node) {
             /* Not found, but the node may have been moved from b2 to b1 right
@@ -438,7 +393,7 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
              * need to loop as long as it takes to get stable readings of
              * both buckets.  cmap_find__() does that, and now that we have
              * fetched both buckets we can just use it. */
-            if (OVS_UNLIKELY(counter_changed(b1s[i], c1s[i]))) {
+            if (OVS_UNLIKELY(!seqlock_read_correct(&b1s[i]->lock, c1s[i]))) {
                 node = cmap_find__(b1s[i], b2s[i], hashes[i]);
                 if (node) {
                     goto found;
@@ -519,11 +474,12 @@ cmap_set_bucket(struct cmap_bucket *b, int i,
 {
     uint32_t c;
 
-    atomic_read_explicit(&b->counter, &c, memory_order_acquire);
-    atomic_store_explicit(&b->counter, c + 1, memory_order_release);
-    ovsrcu_set(&b->nodes[i].next, node); /* Also atomic. */
+    c = seqlock_write_begin(&b->lock);
+
+    ovsrcu_set(&b->nodes[i].next, node);
     b->hashes[i] = hash;
-    atomic_store_explicit(&b->counter, c + 2, memory_order_release);
+
+    seqlock_write_end(&b->lock, c);
 }
 
 /* Searches 'b' for a node with the given 'hash'.  If it finds one, adds
@@ -570,7 +526,7 @@ cmap_insert_dup(struct cmap_node *new_node, uint32_t hash,
             }
 
             /* Change the bucket to point to 'new_node'.  This is a degenerate
-             * form of cmap_set_bucket() that doesn't update the counter since
+             * form of cmap_set_bucket() that doesn't use the seqlock since
              * we're only touching one field and in a way that doesn't change
              * the bucket's meaning for readers. */
             ovsrcu_set(&b->nodes[i].next, new_node);
diff --git a/lib/seqlock.h b/lib/seqlock.h
new file mode 100644
index 0000000..bfd3996
--- /dev/null
+++ b/lib/seqlock.h
@@ -0,0 +1,142 @@
+/*
+ * Copyright (c) 2015 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SEQLOCK_H
+#define SEQLOCK_H 1
+
+#include <stdbool.h>
+#include "ovs-atomic.h"
+
+/* Writer-prioritizing counter-based R/W spinlock
+ * ==============================================
+ *
+ * A single-writer consistency mechanism that doesn't block the writer.
+ *
+ *
+ * Usage
+ * =====
+ *
+ * Writer:
+ *    uint32_t c;
+ *    c = seqlock_write_begin(&s);
+ *    ...
+ *    write to protected region
+ *    ...
+ *    seqlock_write_end(&s, c);
+ *
+ * Reader:
+ *     uint32_t c;
+ *     do {
+ *         c = seqlock_read_begin(&s);
+ *         ...
+ *         read from protected region
+ *         ...
+ *     } while (!seqlock_read_correct(&s, c));
+ *
+ * A reader might have to retry if there's a write in progress.
+ * A writer will never be blocked.
+ *
+ * Thread-safety
+ * =============
+ *
+ * The implementation does not support multiple concurrent writers.  If there
+ * might be more than one writer, an external form of mutual exclusion should
+ * be used.
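+ *
+ * For example, one possible way to serialize multiple writers (a sketch;
+ * 'mutex' is an assumed external 'struct ovs_mutex'):
+ *
+ *     uint32_t c;
+ *
+ *     ovs_mutex_lock(&mutex);
+ *     c = seqlock_write_begin(&s);
+ *     ...
+ *     write to protected region
+ *     ...
+ *     seqlock_write_end(&s, c);
+ *     ovs_mutex_unlock(&mutex);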
+ *
+ * Implementation
+ * ==============
+ *
+ * The mechanism is based on an atomic counter that the writer increments
+ * twice per write (with a plain read and store rather than an atomic
+ * read-modify-write, which is safe because there is a single writer) and
+ * that the readers check.
+ *
+ * A writer:
+ * - Reads the counter (c).  It will be an even number.
+ * - Writes c+1 to the counter to indicate that a write is in progress.
+ * - Writes to the protected region.
+ * - Writes c+2 to the counter to indicate that the write is no longer in
+ *   progress and that the protected region has been changed.
+ *
+ * A reader:
+ * - Spins until the counter is even, waiting for the writer to finish.
+ * - Reads the protected region.
+ * - Checks that the counter has not changed.  If the counter has changed,
+ *   it means that a write operation has happened in the meantime and the
+ *   whole read sequence must be restarted.
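+ *
+ * A possible interleaving (counter values are hypothetical):
+ *
+ *     counter == 4   even: no write in progress, readers may proceed
+ *     4 -> 5         writer begins: new readers spin; a reader that began
+ *                    at 4 will fail the seqlock_read_correct() check
+ *     5 -> 6         writer done: readers retry and observe a consistent
+ *                    snapshot at 6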
+ *
+ * The idea and the code have been taken from the cmap implementation.
+ */
+
+struct seqlock {
+    atomic_uint32_t counter;
+};
+
+static inline uint32_t
+read_counter__(const struct seqlock *s_)
+{
+    struct seqlock *s = CONST_CAST(struct seqlock *, s_);
+    uint32_t counter;
+
+    atomic_read_explicit(&s->counter, &counter, memory_order_acquire);
+
+    return counter;
+}
+
+static inline uint32_t
+seqlock_read_begin(const struct seqlock *s)
+{
+    uint32_t counter;
+
+    do {
+        counter = read_counter__(s);
+    } while (OVS_UNLIKELY(counter & 1));
+
+    return counter;
+}
+
+static inline bool
+seqlock_read_correct(const struct seqlock *s_, uint32_t c)
+{
+    struct seqlock *s = CONST_CAST(struct seqlock *, s_);
+    uint32_t counter;
+
+    /* Need to make sure the counter read is not moved up.  Using
+     * atomic_read_explicit with memory_order_acquire would allow prior reads
+     * to be moved after the barrier.  atomic_thread_fence prevents all
+     * following memory accesses from moving prior to preceding loads. */
+    atomic_thread_fence(memory_order_acquire);
+    atomic_read_relaxed(&s->counter, &counter);
+
+    return OVS_LIKELY(counter == c);
+}
+
+static inline uint32_t
+seqlock_write_begin(struct seqlock *s)
+{
+    uint32_t c;
+
+    atomic_read_explicit(&s->counter, &c, memory_order_acquire);
+    atomic_store_explicit(&s->counter, c + 1, memory_order_release);
+
+    return c;
+}
+
+static inline void
+seqlock_write_end(struct seqlock *s, uint32_t c)
+{
+    atomic_store_explicit(&s->counter, c + 2, memory_order_release);
+}
+#endif /* seqlock.h */
-- 
2.1.4



