[ovs-dev] [PATCH 3/6] Make ByteQ safe for simultaneous producer/consumer

anton.ivanov at cambridgegreys.com anton.ivanov at cambridgegreys.com
Mon Jul 6 08:20:11 UTC 2020


From: Anton Ivanov <anton.ivanov at cambridgegreys.com>

A ByteQ with unlocked head and tail is unsafe for simultaneous
consume/produce.

If simultaneous use is desired, these either need to be locked
or there needs to be a third atomic or lock guarded variable
"used".

An atomic "used" allows the producer to enqueue safely because
it "owns" the head and even if the consumer changes the head
it will only increase the space available versus the value in
"used".

Once the data has been written and the enqueued should be
made visible it fenced and the used is updated.

Similar for "consumer" - it can safely consume now as it
"owns" tail and never reads beyond tail + used (wrapped
around as needed).

Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
---
 lib/byteq.c | 17 ++++++++++++++++-
 lib/byteq.h |  7 ++++++-
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/lib/byteq.c b/lib/byteq.c
index 3f865cf9e..da40c2530 100644
--- a/lib/byteq.c
+++ b/lib/byteq.c
@@ -19,6 +19,7 @@
 #include <string.h>
 #include <unistd.h>
 #include "util.h"
+#include "ovs-atomic.h"
 
 /* Initializes 'q' as an empty byteq that uses the 'size' bytes of 'buffer' to
  * store data.  'size' must be a power of 2.
@@ -32,13 +33,16 @@ byteq_init(struct byteq *q, uint8_t *buffer, size_t size)
     q->buffer = buffer;
     q->size = size;
     q->head = q->tail = 0;
+    q->used = ATOMIC_VAR_INIT(0);
 }
 
 /* Returns the number of bytes current queued in 'q'. */
 int
 byteq_used(const struct byteq *q)
 {
-    return q->head - q->tail;
+    int retval;
+    atomic_read_relaxed(&q->used, &retval);
+    return retval;
 }
 
 /* Returns the number of bytes that can be added to 'q' without overflow. */
@@ -68,9 +72,11 @@ byteq_is_full(const struct byteq *q)
 void
 byteq_put(struct byteq *q, uint8_t c)
 {
+    int discard;
     ovs_assert(!byteq_is_full(q));
     *byteq_head(q) = c;
     q->head++;
+    atomic_add(&q->used, 1, &discard);
 }
 
 /* Adds the 'n' bytes in 'p' at the head of 'q', which must have at least 'n'
@@ -79,6 +85,7 @@ void
 byteq_putn(struct byteq *q, const void *p_, size_t n)
 {
     const uint8_t *p = p_;
+    int discard;
     ovs_assert(byteq_avail(q) >= n);
     while (n > 0) {
         size_t chunk = MIN(n, byteq_headroom(q));
@@ -86,6 +93,7 @@ byteq_putn(struct byteq *q, const void *p_, size_t n)
         byteq_advance_head(q, chunk);
         p += chunk;
         n -= chunk;
+        atomic_add(&q->used, chunk, &discard);
     }
 }
 
@@ -103,9 +111,11 @@ uint8_t
 byteq_get(struct byteq *q)
 {
     uint8_t c;
+    int discard;
     ovs_assert(!byteq_is_empty(q));
     c = *byteq_tail(q);
     q->tail++;
+    atomic_sub(&q->used, 1, &discard);
     return c;
 }
 
@@ -168,8 +178,10 @@ byteq_tail(const struct byteq *q)
 void
 byteq_advance_tail(struct byteq *q, unsigned int n)
 {
+    int discard;
     ovs_assert(byteq_tailroom(q) >= n);
     q->tail += n;
+    atomic_sub_relaxed(&q->used, n, &discard);
 }
 
 /* Returns the byte after the last in-use byte of 'q', the point at which new
@@ -195,6 +207,9 @@ byteq_headroom(const struct byteq *q)
 void
 byteq_advance_head(struct byteq *q, unsigned int n)
 {
+    int discard;
     ovs_assert(byteq_headroom(q) >= n);
     q->head += n;
+    atomic_thread_fence(memory_order_release);
+    atomic_add_relaxed(&q->used, n, &discard);
 }
diff --git a/lib/byteq.h b/lib/byteq.h
index d73e3684e..5078dd7a4 100644
--- a/lib/byteq.h
+++ b/lib/byteq.h
@@ -19,13 +19,18 @@
 #include <stdbool.h>
 #include <stddef.h>
 #include <stdint.h>
+#include "ovs-atomic.h"
 
-/* General-purpose circular queue of bytes. */
+/* General-purpose circular queue of bytes.
+ * Thread safe for simultaneous use by a SINGLE producer and SINGLE
+ * consumer (1:1).
+ */
 struct byteq {
     uint8_t *buffer;            /* Circular queue. */
     unsigned int size;          /* Number of bytes allocated for 'buffer'. */
     unsigned int head;          /* Head of queue. */
     unsigned int tail;          /* Chases the head. */
+    atomic_int used;
 };
 
 void byteq_init(struct byteq *, uint8_t *buffer, size_t size);
-- 
2.20.1



More information about the dev mailing list