[ovs-dev] [PATCH v2 3/9] lib: Introduce cntlock mechanism
Daniele Di Proietto
diproiettod at vmware.com
Wed Mar 4 12:06:42 UTC 2015
Factor out the counter lock from the cmap implementation.
A writer-prioritizing counter-based R/W spinlock is introduced with this
commit. The code is taken from the cmap implementation. It will also be
used in subsequent commits.
Signed-off-by: Daniele Di Proietto <diproiettod at vmware.com>
---
lib/automake.mk | 1 +
lib/cmap.c | 80 +++++++------------------------
lib/cntlock.h | 146 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 165 insertions(+), 62 deletions(-)
create mode 100644 lib/cntlock.h
diff --git a/lib/automake.mk b/lib/automake.mk
index 2acfe18..3f3b785 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -39,6 +39,7 @@ lib_libopenvswitch_la_SOURCES = \
lib/classifier-private.h \
lib/cmap.c \
lib/cmap.h \
+ lib/cntlock.h \
lib/command-line.c \
lib/command-line.h \
lib/compiler.h \
diff --git a/lib/cmap.c b/lib/cmap.c
index 8d11ae0..e480962 100644
--- a/lib/cmap.c
+++ b/lib/cmap.c
@@ -16,6 +16,7 @@
#include <config.h>
#include "cmap.h"
+#include "cntlock.h"
#include "coverage.h"
#include "bitmap.h"
#include "hash.h"
@@ -129,13 +130,7 @@ COVERAGE_DEFINE(cmap_shrink);
/* A cuckoo hash bucket. Designed to be cache-aligned and exactly one cache
* line long. */
struct cmap_bucket {
- /* Allows readers to track in-progress changes. Initially zero, each
- * writer increments this value just before and just after each change (see
- * cmap_set_bucket()). Thus, a reader can ensure that it gets a consistent
- * snapshot by waiting for the counter to become even (see
- * read_even_counter()), then checking that its value does not change while
- * examining the bucket (see cmap_find()). */
- atomic_uint32_t counter;
+ struct cntlock lock;
/* (hash, node) slots. They are parallel arrays instead of an array of
* structs to reduce the amount of space lost to padding.
@@ -270,46 +265,6 @@ cmap_is_empty(const struct cmap *cmap)
return cmap_count(cmap) == 0;
}
-static inline uint32_t
-read_counter(const struct cmap_bucket *bucket_)
-{
- struct cmap_bucket *bucket = CONST_CAST(struct cmap_bucket *, bucket_);
- uint32_t counter;
-
- atomic_read_explicit(&bucket->counter, &counter, memory_order_acquire);
-
- return counter;
-}
-
-static inline uint32_t
-read_even_counter(const struct cmap_bucket *bucket)
-{
- uint32_t counter;
-
- do {
- counter = read_counter(bucket);
- } while (OVS_UNLIKELY(counter & 1));
-
- return counter;
-}
-
-static inline bool
-counter_changed(const struct cmap_bucket *b_, uint32_t c)
-{
- struct cmap_bucket *b = CONST_CAST(struct cmap_bucket *, b_);
- uint32_t counter;
-
- /* Need to make sure the counter read is not moved up, before the hash and
- * cmap_node_next(). Using atomic_read_explicit with memory_order_acquire
- * would allow prior reads to be moved after the barrier.
- * atomic_thread_fence prevents all following memory accesses from moving
- * prior to preceding loads. */
- atomic_thread_fence(memory_order_acquire);
- atomic_read_relaxed(&b->counter, &counter);
-
- return OVS_UNLIKELY(counter != c);
-}
-
static inline const struct cmap_node *
cmap_find_in_bucket(const struct cmap_bucket *bucket, uint32_t hash)
{
@@ -330,20 +285,20 @@ cmap_find__(const struct cmap_bucket *b1, const struct cmap_bucket *b2,
do {
do {
- c1 = read_even_counter(b1);
+ c1 = cntlock_read_begin(&b1->lock);
node = cmap_find_in_bucket(b1, hash);
- } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+ } while (OVS_UNLIKELY(!cntlock_read_correct(&b1->lock, c1)));
if (node) {
break;
}
do {
- c2 = read_even_counter(b2);
+ c2 = cntlock_read_begin(&b2->lock);
node = cmap_find_in_bucket(b2, hash);
- } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+ } while (OVS_UNLIKELY(!cntlock_read_correct(&b2->lock, c2)));
if (node) {
break;
}
- } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+ } while (OVS_UNLIKELY(!cntlock_read_correct(&b1->lock, c1)));
return node;
}
@@ -402,9 +357,9 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
const struct cmap_node *node;
do {
- c1 = read_even_counter(b1);
+ c1 = cntlock_read_begin(&b1->lock);
node = cmap_find_in_bucket(b1, hashes[i]);
- } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+ } while (OVS_UNLIKELY(!cntlock_read_correct(&b1->lock, c1)));
if (!node) {
/* Not found (yet); Prefetch the 2nd bucket. */
@@ -425,9 +380,9 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
const struct cmap_node *node;
do {
- c2 = read_even_counter(b2);
+ c2 = cntlock_read_begin(&b2->lock);
node = cmap_find_in_bucket(b2, hashes[i]);
- } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+ } while (OVS_UNLIKELY(!cntlock_read_correct(&b2->lock, c2)));
if (!node) {
/* Not found, but the node may have been moved from b2 to b1 right
@@ -438,7 +393,7 @@ cmap_find_batch(const struct cmap *cmap, unsigned long map,
* need to loop as long as it takes to get stable readings of
* both buckets. cmap_find__() does that, and now that we have
* fetched both buckets we can just use it. */
- if (OVS_UNLIKELY(counter_changed(b1s[i], c1s[i]))) {
+ if (OVS_UNLIKELY(!cntlock_read_correct(&b1s[i]->lock, c1s[i]))) {
node = cmap_find__(b1s[i], b2s[i], hashes[i]);
if (node) {
goto found;
@@ -519,11 +474,12 @@ cmap_set_bucket(struct cmap_bucket *b, int i,
{
uint32_t c;
- atomic_read_explicit(&b->counter, &c, memory_order_acquire);
- atomic_store_explicit(&b->counter, c + 1, memory_order_release);
- ovsrcu_set(&b->nodes[i].next, node); /* Also atomic. */
+ c = cntlock_write_begin(&b->lock);
+
+ ovsrcu_set(&b->nodes[i].next, node);
b->hashes[i] = hash;
- atomic_store_explicit(&b->counter, c + 2, memory_order_release);
+
+ cntlock_write_end(&b->lock, c);
}
/* Searches 'b' for a node with the given 'hash'. If it finds one, adds
@@ -570,7 +526,7 @@ cmap_insert_dup(struct cmap_node *new_node, uint32_t hash,
}
/* Change the bucket to point to 'new_node'. This is a degenerate
- * form of cmap_set_bucket() that doesn't update the counter since
+ * form of cmap_set_bucket() that doesn't use the cntlock since
* we're only touching one field and in a way that doesn't change
* the bucket's meaning for readers. */
ovsrcu_set(&b->nodes[i].next, new_node);
diff --git a/lib/cntlock.h b/lib/cntlock.h
new file mode 100644
index 0000000..bb16932
--- /dev/null
+++ b/lib/cntlock.h
@@ -0,0 +1,146 @@
+/*
+ * Copyright (c) 2015 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef CNTLOCK_H
+#define CNTLOCK_H 1
+
+#include <stdbool.h>
+#include "ovs-atomic.h"
+
+/* Writer-prioritizing counter-based R/W spinlock
+ * ==============================================
+ *
+ * A single-writer consistency mechanism that never blocks the writer.
+ *
+ *
+ * Usage
+ * =====
+ *
+ * Writer:
+ * uint32_t c;
+ * c = cntlock_write_begin(&s);
+ * ...
+ * write to protected region
+ * ...
+ * cntlock_write_end(&s, c);
+ *
+ * Reader:
+ * uint32_t c;
+ * do {
+ * c = cntlock_read_begin(&s);
+ * ...
+ * read from protected region
+ * ...
+ * } while (!cntlock_read_correct(&s, c));
+ *
+ * A reader might have to retry if there's a write in progress.
+ * A writer will never be blocked.
+ *
+ * Thread-safety
+ * =============
+ *
+ * The implementation does not support multiple concurrent writers. If there
+ * may be more than one writer, an external form of mutual exclusion should be
+ * used.
+ *
+ * Another appropriate use case is to protect thread-local data from
+ * reader/writer concurrent access: if the data is written only by a single
+ * thread, external synchronization is unnecessary.
+ *
+ * Implementation
+ * ==============
+ *
+ * The mechanism is based on an atomic counter which is incremented
+ * (non-atomically) by the writer (twice per write) and checked by the
+ * readers.
+ *
+ * A writer:
+ * - Reads the counter (c). It will be an even number.
+ * - Writes c+1 in the counter to point out that a write is in progress
+ * - Writes in the protected region.
+ * - Writes c+2 in the counter to point out that a write is not in progress
+ * and that the protected region has been changed.
+ *
+ * A reader:
+ * - Spins until the counter is even, waiting for the writer to finish.
+ * - Reads the protected region
+ * - Checks that the counter has not changed. If the counter has changed,
+ * it means that a write operation has happened in the meantime and the
+ * whole read sequence must be restarted.
+ *
+ * The idea and the code have been taken from the cmap implementation.
+ */
+
+struct cntlock {
+ atomic_uint32_t counter;
+};
+
+static inline uint32_t
+read_counter__(const struct cntlock *s_)
+{
+ struct cntlock *s = CONST_CAST(struct cntlock *, s_);
+ uint32_t counter;
+
+ atomic_read_explicit(&s->counter, &counter, memory_order_acquire);
+
+ return counter;
+}
+
+static inline uint32_t
+cntlock_read_begin(const struct cntlock *s)
+{
+ uint32_t counter;
+
+ do {
+ counter = read_counter__(s);
+ } while (OVS_UNLIKELY(counter & 1));
+
+ return counter;
+}
+
+static inline bool
+cntlock_read_correct(const struct cntlock *s_, uint32_t c)
+{
+ struct cntlock *s = CONST_CAST(struct cntlock *, s_);
+ uint32_t counter;
+
+ /* Need to make sure the counter read is not moved up. Using
+ * atomic_read_explicit with memory_order_acquire would allow prior reads
+ * to be moved after the barrier. atomic_thread_fence prevents all
+ * following memory accesses from moving prior to preceding loads. */
+ atomic_thread_fence(memory_order_acquire);
+ atomic_read_relaxed(&s->counter, &counter);
+
+ return OVS_LIKELY(counter == c);
+}
+
+static inline uint32_t
+cntlock_write_begin(struct cntlock *s)
+{
+ uint32_t c;
+
+ atomic_read_explicit(&s->counter, &c, memory_order_acquire);
+ atomic_store_explicit(&s->counter, c + 1, memory_order_release);
+
+ return c;
+}
+
+static inline void
+cntlock_write_end(struct cntlock *s, uint32_t c)
+{
+ atomic_store_explicit(&s->counter, c + 2, memory_order_release);
+}
+#endif /* CNTLOCK_H */
--
2.1.4
More information about the dev
mailing list