[ovs-dev] [PATCH v2 3/9] lib: Introduce cntlock mechanism

Daniele Di Proietto diproiettod at vmware.com
Wed Mar 4 12:06:42 UTC 2015


Factor out the counter lock from the cmap implementation.

This commit introduces a writer-prioritizing, counter-based R/W spinlock.
The code is taken from the cmap implementation and will also be used in
subsequent commits.
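
Intended usage looks roughly like this (a sketch based on the API added
below; 'lock', 'value', and 'new_value' are hypothetical names):

    /* Shared state.  'lock' is assumed zero-initialized (counter starts
     * even) and protects 'value'. */
    struct cntlock lock;
    uint64_t value;

    /* Writer (single writer, or externally serialized): */
    uint32_t c = cntlock_write_begin(&lock);
    value = new_value;
    cntlock_write_end(&lock, c);

    /* Reader (retries if a write happens concurrently): */
    uint32_t c;
    uint64_t v;
    do {
        c = cntlock_read_begin(&lock);
        v = value;
    } while (!cntlock_read_correct(&lock, c));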

Signed-off-by: Daniele Di Proietto <diproiettod at vmware.com>
---
 lib/automake.mk |   1 +
 lib/cmap.c      |  80 +++++++------------------------
 lib/cntlock.h   | 146 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 165 insertions(+), 62 deletions(-)
 create mode 100644 lib/cntlock.h

diff --git a/lib/automake.mk b/lib/automake.mk
index 2acfe18..3f3b785 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -39,6 +39,7 @@ lib_libopenvswitch_la_SOURCES = \
 	lib/classifier-private.h \
 	lib/cmap.c \
 	lib/cmap.h \
+	lib/cntlock.h \
 	lib/command-line.c \
 	lib/command-line.h \
 	lib/compiler.h \
diff --git a/lib/cmap.c b/lib/cmap.c
index 8d11ae0..e480962 100644
--- a/lib/cmap.c
+++ b/lib/cmap.c
@@ -16,6 +16,7 @@
 
 #include <config.h>
 #include "cmap.h"
+#include "cntlock.h"
 #include "coverage.h"
 #include "bitmap.h"
 #include "hash.h"
@@ -129,13 +130,7 @@ COVERAGE_DEFINE(cmap_shrink);
 /* A cuckoo hash bucket.  Designed to be cache-aligned and exactly one cache
  * line long. */
 struct cmap_bucket {
-    /* Allows readers to track in-progress changes.  Initially zero, each
-     * writer increments this value just before and just after each change (see
-     * cmap_set_bucket()).  Thus, a reader can ensure that it gets a consistent
-     * snapshot by waiting for the counter to become even (see
-     * read_even_counter()), then checking that its value does not change while
-     * examining the bucket (see cmap_find()). */
-    atomic_uint32_t counter;
+    struct cntlock lock;
 
     /* (hash, node) slots.  They are parallel arrays instead of an array of
      * structs to reduce the amount of space lost to padding.
@@ -270,46 +265,6 @@ cmap_is_empty(const struct cmap *cmap)
     return cmap_count(cmap) == 0;
 }
 
-static inline uint32_t
-read_counter(const struct cmap_bucket *bucket_)
-{
-    struct cmap_bucket *bucket = CONST_CAST(struct cmap_bucket *, bucket_);
-    uint32_t counter;
-
-    atomic_read_explicit(&bucket->counter, &counter, memory_order_acquire);
-
-    return counter;
-}
-
-static inline uint32_t
-read_even_counter(const struct cmap_bucket *bucket)
-{
-    uint32_t counter;
-
-    do {
-        counter = read_counter(bucket);
-    } while (OVS_UNLIKELY(counter & 1));
-
-    return counter;
-}
-
-static inline bool
-counter_changed(const struct cmap_bucket *b_, uint32_t c)
-{
-    struct cmap_bucket *b = CONST_CAST(struct cmap_bucket *, b_);
-    uint32_t counter;
-
-    /* Need to make sure the counter read is not moved up, before the hash and
-     * cmap_node_next().  Using atomic_read_explicit with memory_order_acquire
-     * would allow prior reads to be moved after the barrier.
-     * atomic_thread_fence prevents all following memory accesses from moving
-     * prior to preceding loads. */
-    atomic_thread_fence(memory_order_acquire);
-    atomic_read_relaxed(&b->counter, &counter);
-
-    return OVS_UNLIKELY(counter != c);
-}
-
 static inline const struct cmap_node *
 cmap_find_in_bucket(const struct cmap_bucket *bucket, uint32_t hash)
 {
@@ -330,20 +285,20 @@ cmap_find__(const struct cmap_bucket *b1, const struct cmap_bucket *b2,
 
     do {
         do {
-            c1 = read_even_counter(b1);
+            c1 = cntlock_read_begin(&b1->lock);
             node = cmap_find_in_bucket(b1, hash);
-        } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+        } while (OVS_UNLIKELY(!cntlock_read_correct(&b1->lock, c1)));
         if (node) {
             break;
         }
         do {
-            c2 = read_even_counter(b2);
+            c2 = cntlock_read_begin(&b2->lock);
             node = cmap_find_in_bucket(b2, hash);
-        } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+        } while (OVS_UNLIKELY(!cntlock_read_correct(&b2->lock, c2)));
         if (node) {
             break;
         }
-    } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+    } while (OVS_UNLIKELY(!cntlock_read_correct(&b1->lock, c1)));
 
     return node;
 }
@@ -402,9 +357,9 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
         const struct cmap_node *node;
 
         do {
-            c1 = read_even_counter(b1);
+            c1 = cntlock_read_begin(&b1->lock);
             node = cmap_find_in_bucket(b1, hashes[i]);
-        } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+        } while (OVS_UNLIKELY(!cntlock_read_correct(&b1->lock, c1)));
 
         if (!node) {
             /* Not found (yet); Prefetch the 2nd bucket. */
@@ -425,9 +380,9 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
         const struct cmap_node *node;
 
         do {
-            c2 = read_even_counter(b2);
+            c2 = cntlock_read_begin(&b2->lock);
             node = cmap_find_in_bucket(b2, hashes[i]);
-        } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+        } while (OVS_UNLIKELY(!cntlock_read_correct(&b2->lock, c2)));
 
         if (!node) {
             /* Not found, but the node may have been moved from b2 to b1 right
@@ -438,7 +393,7 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
              * need to loop as long as it takes to get stable readings of
              * both buckets.  cmap_find__() does that, and now that we have
              * fetched both buckets we can just use it. */
-            if (OVS_UNLIKELY(counter_changed(b1s[i], c1s[i]))) {
+            if (OVS_UNLIKELY(!cntlock_read_correct(&b1s[i]->lock, c1s[i]))) {
                 node = cmap_find__(b1s[i], b2s[i], hashes[i]);
                 if (node) {
                     goto found;
@@ -519,11 +474,12 @@ cmap_set_bucket(struct cmap_bucket *b, int i,
 {
     uint32_t c;
 
-    atomic_read_explicit(&b->counter, &c, memory_order_acquire);
-    atomic_store_explicit(&b->counter, c + 1, memory_order_release);
-    ovsrcu_set(&b->nodes[i].next, node); /* Also atomic. */
+    c = cntlock_write_begin(&b->lock);
+
+    ovsrcu_set(&b->nodes[i].next, node);
     b->hashes[i] = hash;
-    atomic_store_explicit(&b->counter, c + 2, memory_order_release);
+
+    cntlock_write_end(&b->lock, c);
 }
 
 /* Searches 'b' for a node with the given 'hash'.  If it finds one, adds
@@ -570,7 +526,7 @@ cmap_insert_dup(struct cmap_node *new_node, uint32_t hash,
             }
 
             /* Change the bucket to point to 'new_node'.  This is a degenerate
-             * form of cmap_set_bucket() that doesn't update the counter since
+             * form of cmap_set_bucket() that doesn't use the cntlock since
              * we're only touching one field and in a way that doesn't change
              * the bucket's meaning for readers. */
             ovsrcu_set(&b->nodes[i].next, new_node);
diff --git a/lib/cntlock.h b/lib/cntlock.h
new file mode 100644
index 0000000..bb16932
--- /dev/null
+++ b/lib/cntlock.h
@@ -0,0 +1,146 @@
+/*
+ * Copyright (c) 2015 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef CNTLOCK_H
+#define CNTLOCK_H 1
+
+#include <stdbool.h>
+#include "ovs-atomic.h"
+
+/* Writer-prioritizing counter-based R/W spinlock
+ * ==============================================
+ *
+ * A single-writer consistency mechanism that never blocks the writer.
+ *
+ *
+ * Usage
+ * =====
+ *
+ * Writer:
+ *     uint32_t c;
+ *     c = cntlock_write_begin(&s);
+ *     ...
+ *     write to protected region
+ *     ...
+ *     cntlock_write_end(&s, c);
+ *
+ * Reader:
+ *     uint32_t c;
+ *     do {
+ *         c = cntlock_read_begin(&s);
+ *         ...
+ *         read from protected region
+ *         ...
+ *     } while (!cntlock_read_correct(&s, c));
+ *
+ * A reader might have to retry if there's a write in progress.
+ * A writer will never be blocked.
+ *
+ * Thread-safety
+ * =============
+ *
+ * The implementation does not support multiple concurrent writers.  If there
+ * may be more than one writer, an external form of mutual exclusion should be
+ * used.
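+ *
+ * For example, the writers can be serialized with an ordinary mutex (a
+ * sketch; 'mutex' is a hypothetical 'struct ovs_mutex', the readers remain
+ * lock-free):
+ *
+ *     ovs_mutex_lock(&mutex);
+ *     c = cntlock_write_begin(&s);
+ *     ... write to protected region ...
+ *     cntlock_write_end(&s, c);
+ *     ovs_mutex_unlock(&mutex);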
+ *
+ * Another appropriate use case is protecting data that is written only by
+ * its owning thread but may be read by other threads: since there is a
+ * single writer, no external synchronization is needed.
+ *
+ * Implementation
+ * ==============
+ *
+ * The mechanism is based on an atomic counter that is incremented by the
+ * writer (twice per write, without an atomic read-modify-write, since there
+ * is a single writer) and checked by the readers.
+ *
+ * A writer:
+ * - Reads the counter (c).  It will be an even number.
+ * - Writes c+1 to the counter to indicate that a write is in progress.
+ * - Writes to the protected region.
+ * - Writes c+2 to the counter to indicate that the write is no longer in
+ *   progress and that the protected region has changed.
+ *
+ * A reader:
+ * - Spins until the counter is even, waiting for the writer to finish.
+ * - Reads the protected region.
+ * - Checks that the counter has not changed.  If it has, a write has
+ *   happened in the meantime and the whole read sequence must be restarted.
+ *
+ * The idea and the code are taken from the cmap implementation.
+ */
+
+struct cntlock {
+    atomic_uint32_t counter;
+};
+
+static inline uint32_t
+read_counter__(const struct cntlock *s_)
+{
+    struct cntlock *s = CONST_CAST(struct cntlock *, s_);
+    uint32_t counter;
+
+    atomic_read_explicit(&s->counter, &counter, memory_order_acquire);
+
+    return counter;
+}
+
+static inline uint32_t
+cntlock_read_begin(const struct cntlock *s)
+{
+    uint32_t counter;
+
+    do {
+        counter = read_counter__(s);
+    } while (OVS_UNLIKELY(counter & 1));
+
+    return counter;
+}
+
+static inline bool
+cntlock_read_correct(const struct cntlock *s_, uint32_t c)
+{
+    struct cntlock *s = CONST_CAST(struct cntlock *, s_);
+    uint32_t counter;
+
+    /* Make sure the counter read is not moved before the reads of the
+     * protected region.  Using atomic_read_explicit() with
+     * memory_order_acquire would allow prior reads to be moved after the
+     * barrier; atomic_thread_fence() prevents all following memory accesses
+     * from moving prior to preceding loads. */
+    atomic_thread_fence(memory_order_acquire);
+    atomic_read_relaxed(&s->counter, &counter);
+
+    return OVS_LIKELY(counter == c);
+}
+
+static inline uint32_t
+cntlock_write_begin(struct cntlock *s)
+{
+    uint32_t c;
+
+    atomic_read_explicit(&s->counter, &c, memory_order_acquire);
+    atomic_store_explicit(&s->counter, c + 1, memory_order_release);
+
+    return c;
+}
+
+static inline void
+cntlock_write_end(struct cntlock *s, uint32_t c)
+{
+    atomic_store_explicit(&s->counter, c + 2, memory_order_release);
+}
+#endif /* cntlock.h */
-- 
2.1.4