[ovs-dev] [RFC PATCH 13/26] llring: Add lockless MPMC bounded queue structure
Gaetan Rivet
grive at u256.net
Sat Dec 5 14:22:08 UTC 2020
Add a lockless multi-producer/multi-consumer, array-based,
non-intrusive, bounded queue that will fail on overflow.
Each operation (enqueue, dequeue) uses a CAS(). As such, both producer
and consumer sides guarantee lock-free forward progress. If the queue
is full, enqueuing will fail. Conversely, if the queue is empty,
dequeueing will fail.
The size of the queue is restricted to powers of two, to allow simpler
wraparound of the unsigned position markers.
Signed-off-by: Gaetan Rivet <grive at u256.net>
---
lib/automake.mk | 2 +
lib/llring.c | 153 ++++++++++++++++++++++++++++++++++++++++++++++++
lib/llring.h | 76 ++++++++++++++++++++++++
3 files changed, 231 insertions(+)
create mode 100644 lib/llring.c
create mode 100644 lib/llring.h
diff --git a/lib/automake.mk b/lib/automake.mk
index 3012d4700..c67c01779 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -156,6 +156,8 @@ lib_libopenvswitch_la_SOURCES = \
lib/learn.h \
lib/learning-switch.c \
lib/learning-switch.h \
+ lib/llring.c \
+ lib/llring.h \
lib/lockfile.c \
lib/lockfile.h \
lib/mac-learning.c \
diff --git a/lib/llring.c b/lib/llring.c
new file mode 100644
index 000000000..66fb22a1b
--- /dev/null
+++ b/lib/llring.c
@@ -0,0 +1,153 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "llring.h"
+
+#include "ovs-atomic.h"
+#include "util.h"
+
+/* A queue element.
+ * Calling 'llring_create' will allocate an array of such elements,
+ * that will hold the inserted data.
+ *
+ * 'seq' doubles as a generation counter and a ready flag: a slot whose
+ * 'seq' equals the current head position is free for a producer to claim;
+ * a slot whose 'seq' equals a tail position + 1 holds data ready for a
+ * consumer.  Dequeueing advances 'seq' by the ring size, moving the slot
+ * to the next generation.
+ */
+struct llring_node {
+ atomic_uint32_t seq; /* Slot sequence number, see above. */
+ uint32_t data; /* User payload, shallow-copied in and out. */
+};
+
+/* A ring description.
+ * The head and tail of the ring are padded to avoid false-sharing,
+ * which improves slightly multi-thread performance, at the cost
+ * of some memory.
+ */
+struct llring {
+    PADDED_MEMBERS(CACHE_LINE_SIZE, atomic_uint32_t head;);
+    PADDED_MEMBERS(CACHE_LINE_SIZE, atomic_uint32_t tail;);
+    uint32_t mask;              /* Ring size minus 1; positions are ANDed
+                                 * with it to index 'nodes'. */
+    struct llring_node nodes[]; /* C99 flexible array member; the slots are
+                                 * allocated past the header by
+                                 * llring_create().  (A standard FAM, rather
+                                 * than the zero-length-array GNU
+                                 * extension.) */
+};
+
+/* Allocates and initializes a ring of 'size' slots.
+ *
+ * 'size' must be a power of two of at least 2, otherwise NULL is returned.
+ * NULL is also returned if 'size' is so large that computing the allocation
+ * size would overflow a 32-bit size_t.  The caller owns the returned ring
+ * and must release it with llring_destroy(). */
+struct llring *
+llring_create(uint32_t size)
+{
+    struct llring *r;
+    uint32_t i;
+
+    if (size < 2 || !IS_POW2(size)) {
+        return NULL;
+    }
+
+    /* Guard the allocation size computation against size_t overflow,
+     * which is reachable on 32-bit platforms with large 'size'. */
+    if (size > (SIZE_MAX - sizeof *r) / sizeof r->nodes[0]) {
+        return NULL;
+    }
+
+    r = xmalloc(sizeof *r + size * sizeof r->nodes[0]);
+
+    r->mask = size - 1;
+    /* Each slot starts in generation 0: slot i is free for the producer
+     * that claims head position i. */
+    for (i = 0; i < size; i++) {
+        atomic_store_relaxed(&r->nodes[i].seq, i);
+    }
+    atomic_store_relaxed(&r->head, 0);
+    atomic_store_relaxed(&r->tail, 0);
+
+    return r;
+}
+
+/* Frees a ring allocated with llring_create().  No-op if 'r' is NULL.
+ * The caller must ensure no other thread is still using the ring. */
+void
+llring_destroy(struct llring *r)
+{
+ free(r);
+}
+
+/* Attempts to push 'data' onto the ring.  Returns true on success, false
+ * if the ring is full.
+ *
+ * Producer side of a Vyukov-style bounded MPMC queue: a producer claims
+ * the slot at the current head with a single CAS on 'head', then
+ * publishes the payload by bumping the slot's sequence number. */
+bool
+llring_enqueue(struct llring *r, uint32_t data)
+{
+    struct llring_node *node;
+    uint32_t pos;
+
+    atomic_read_relaxed(&r->head, &pos);
+    while (true) {
+        int32_t diff;
+        uint32_t seq;
+
+        node = &r->nodes[pos & r->mask];
+        /* Acquire pairs with the release in llring_dequeue(): the slot's
+         * previous payload has been consumed before the slot is reused. */
+        atomic_read_explicit(&node->seq, &seq, memory_order_acquire);
+        /* Compute the distance in modular 32-bit arithmetic so the
+         * comparison stays correct once 'pos' wraps around UINT32_MAX.
+         * Widening both operands to int64_t before subtracting would
+         * misclassify a full ring as 'diff > 0' near the wrap point,
+         * spinning on an unchanged head instead of failing. */
+        diff = (int32_t)(seq - pos);
+
+        if (diff < 0) {
+            /* ring[head].seq is from the previous ring generation,
+             * the ring is full and enqueue fails. */
+            return false;
+        }
+
+        if (diff == 0) {
+            /* If head == ring[head].seq, then the slot is free,
+             * attempt to take it by moving the head, if no one moved it
+             * since.  (A failed weak CAS refreshes 'pos' with the current
+             * head value.)
+             */
+            if (atomic_compare_exchange_weak_explicit(&r->head, &pos, pos + 1,
+                                                      memory_order_relaxed,
+                                                      memory_order_relaxed)) {
+                break;
+            }
+        } else {
+            /* Another producer claimed this slot already; catch up with
+             * the current head and retry. */
+            atomic_read_relaxed(&r->head, &pos);
+        }
+    }
+
+    node->data = data;
+    /* Marking seq == claimed position + 1 publishes 'data' to consumers. */
+    atomic_store_explicit(&node->seq, pos + 1, memory_order_release);
+    return true;
+}
+
+/* Copies the oldest enqueued value to '*data' and frees its slot.
+ * Returns true on success, false if the ring is empty.
+ *
+ * Consumer side of a Vyukov-style bounded MPMC queue: a consumer claims
+ * the slot at the current tail with a single CAS on 'tail', then releases
+ * the slot to the next generation by advancing its sequence number. */
+bool
+llring_dequeue(struct llring *r, uint32_t *data)
+{
+    struct llring_node *node;
+    uint32_t pos;
+
+    atomic_read_relaxed(&r->tail, &pos);
+    while (true) {
+        int32_t diff;
+        uint32_t seq;
+
+        node = &r->nodes[pos & r->mask];
+        /* Acquire pairs with the release in llring_enqueue(): once the
+         * slot is seen as ready, its 'data' is visible too. */
+        atomic_read_explicit(&node->seq, &seq, memory_order_acquire);
+        /* Compute the distance in modular 32-bit arithmetic so the
+         * comparison stays correct once 'pos' wraps around UINT32_MAX;
+         * a ready slot has seq == pos + 1.  Widening to int64_t before
+         * subtracting would break this near the wrap point. */
+        diff = (int32_t)(seq - (pos + 1));
+
+        if (diff < 0) {
+            /* ring[tail].seq still equals the tail position: the slot has
+             * not been filled yet, the ring is empty and dequeue fails. */
+            return false;
+        }
+
+        if (diff == 0) {
+            /* If tail + 1 == ring[tail].seq, then the slot holds data,
+             * attempt to take it by moving the tail, if no one moved it
+             * since.  (A failed weak CAS refreshes 'pos' with the current
+             * tail value.)
+             */
+            if (atomic_compare_exchange_weak_explicit(&r->tail, &pos, pos + 1,
+                                                      memory_order_relaxed,
+                                                      memory_order_relaxed)) {
+                break;
+            }
+        } else {
+            /* Another consumer took this slot already; catch up with the
+             * current tail and retry. */
+            atomic_read_relaxed(&r->tail, &pos);
+        }
+    }
+
+    *data = node->data;
+    /* Advance the slot to next gen by adding r->mask + 1 (the ring size)
+     * to its sequence, releasing the slot to future producers. */
+    atomic_store_explicit(&node->seq, pos + r->mask + 1, memory_order_release);
+    return true;
+}
diff --git a/lib/llring.h b/lib/llring.h
new file mode 100644
index 000000000..f97baa343
--- /dev/null
+++ b/lib/llring.h
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LLRING_H
+#define LLRING_H 1
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#include "ovs-atomic.h"
+
+/* Bounded lockless queue
+ * ======================
+ *
+ * A lockless FIFO queue bounded to a known size.
+ * Each operation (insert, remove) uses one CAS().
+ *
+ * The structure is:
+ *
+ * Multi-producer: multiple threads can write to it
+ * concurrently.
+ *
+ * Multi-consumer: multiple threads can read from it
+ * concurrently.
+ *
+ * Bounded: the queue is backed by external memory.
+ * No new allocation is made on insertion, only the
+ * used elements in the queue are marked as such.
+ * The boundary of the queue is defined as the size given
+ * at init, which must be a power of two.
+ *
+ * Failing: when an operation (enqueue, dequeue) cannot
+ * be performed due to the queue being full/empty, the
+ * operation immediately fails, instead of waiting on
+ * a state change.
+ *
+ * Non-intrusive: queue elements are allocated prior to
+ * initialization. Data is shallow-copied to those
+ * allocated elements.
+ *
+ * Thread safety
+ * =============
+ *
+ * The queue is thread-safe in the MPMC case.
+ * No lock is taken by the queue. The queue guarantees
+ * lock-free forward progress for each of its operations.
+ *
+ */
+
+/* Creates a circular lockless ring.
+ * The 'size' parameter must be a power of two of at least 2,
+ * otherwise creation will fail and NULL is returned.
+ */
+struct llring;
+struct llring *llring_create(uint32_t size);
+
+/* Frees a lockless ring.  'r' may be NULL. */
+void llring_destroy(struct llring *r);
+
+/* Copies 'data' to the next free slot in the queue.
+ * Returns true on success, false if the queue is full. */
+bool llring_enqueue(struct llring *r, uint32_t data);
+
+/* Copies the value within the oldest taken slot in the queue to the
+ * address pointed to by 'data'.
+ * Returns true on success, false if the queue is empty. */
+bool llring_dequeue(struct llring *r, uint32_t *data);
+
+#endif /* llring.h */
--
2.29.2
More information about the dev
mailing list