[ovs-dev] [seq 4/5] seq: New module for race-free, pollable, thread-safe sequence number.
Ethan Jackson
ethan at nicira.com
Thu Aug 8 00:56:40 UTC 2013
I haven't read this code carefully yet, but in this patch you put the
OVS_EXCLUDED after the function return type instead of after the
parameters. Does that work? I had done it the other way.
Ethan
On Tue, Aug 6, 2013 at 4:56 PM, Ben Pfaff <blp at nicira.com> wrote:
> Signed-off-by: Ben Pfaff <blp at nicira.com>
> ---
> lib/automake.mk | 2 +
> lib/poll-loop.c | 3 +
> lib/seq.c | 255 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
> lib/seq.h | 89 +++++++++++++++++++
> 4 files changed, 349 insertions(+)
> create mode 100644 lib/seq.c
> create mode 100644 lib/seq.h
>
> diff --git a/lib/automake.mk b/lib/automake.mk
> index b46a868..f936897 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -159,6 +159,8 @@ lib_libopenvswitch_a_SOURCES = \
> lib/reconnect.c \
> lib/reconnect.h \
> lib/sat-math.h \
> + lib/seq.c \
> + lib/seq.h \
> lib/sha1.c \
> lib/sha1.h \
> lib/shash.c \
> diff --git a/lib/poll-loop.c b/lib/poll-loop.c
> index 5f9b9cd..4eb1187 100644
> --- a/lib/poll-loop.c
> +++ b/lib/poll-loop.c
> @@ -26,6 +26,7 @@
> #include "fatal-signal.h"
> #include "list.h"
> #include "ovs-thread.h"
> +#include "seq.h"
> #include "socket-util.h"
> #include "timeval.h"
> #include "vlog.h"
> @@ -248,6 +249,8 @@ poll_block(void)
>
> /* Handle any pending signals before doing anything else. */
> fatal_signal_run();
> +
> + seq_woke();
> }
>
> static void
> diff --git a/lib/seq.c b/lib/seq.c
> new file mode 100644
> index 0000000..70975f2
> --- /dev/null
> +++ b/lib/seq.c
> @@ -0,0 +1,255 @@
> +/*
> + * Copyright (c) 2013 Nicira, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +
> +#include "seq.h"
> +
> +#include <stdbool.h>
> +
> +#include "hash.h"
> +#include "hmap.h"
> +#include "latch.h"
> +#include "list.h"
> +#include "ovs-thread.h"
> +#include "poll-loop.h"
> +
> +/* A sequence number object. */
> +struct seq {
> + uint64_t value OVS_GUARDED;
> + struct hmap waiters OVS_GUARDED; /* Contains 'struct seq_waiter's. */
> +};
> +
> +/* A thread waiting on a particular seq. */
> +struct seq_waiter {
> + struct seq *seq OVS_GUARDED; /* Seq being waited for. */
> + struct hmap_node hmap_node OVS_GUARDED; /* In 'seq->waiters'. */
> + unsigned int ovsthread_id OVS_GUARDED; /* Key in 'waiters' hmap. */
> +
> + struct seq_thread *thread OVS_GUARDED; /* Thread preparing to wait. */
> + struct list list_node OVS_GUARDED; /* In 'thread->waiters'. */
> +
> + uint64_t value OVS_GUARDED; /* seq->value we're waiting to change. */
> +};
> +
> +/* A thread that might be waiting on one or more seqs. */
> +struct seq_thread {
> + struct list waiters OVS_GUARDED; /* Contains 'struct seq_waiter's. */
> + struct latch latch OVS_GUARDED; /* Wakeup latch for this thread. */
> + bool waiting OVS_GUARDED; /* True if latch_wait() already called. */
> +};
> +
> +static struct ovs_mutex seq_mutex = OVS_ADAPTIVE_MUTEX_INITIALIZER;
> +
> +static uint64_t seq_next OVS_GUARDED_BY(seq_mutex) = 1;
> +
> +static pthread_key_t seq_thread_key;
> +
> +static void seq_init(void);
> +static struct seq_thread *seq_thread_get(void) OVS_REQUIRES(seq_mutex);
> +static void seq_thread_exit(void *thread_) OVS_EXCLUDED(seq_mutex);
> +static void seq_thread_woke(struct seq_thread *) OVS_REQUIRES(seq_mutex);
> +static void seq_waiter_destroy(struct seq_waiter *) OVS_REQUIRES(seq_mutex);
> +static void seq_wake_waiters(struct seq *) OVS_REQUIRES(seq_mutex);
> +
> +/* Creates and returns a new 'seq' object. */
> +struct seq * OVS_EXCLUDED(seq_mutex)
> +seq_create(void)
> +{
> + struct seq *seq;
> +
> + seq_init();
> +
> + seq = xmalloc(sizeof *seq);
> + ovs_mutex_lock(&seq_mutex);
> + seq->value = seq_next++;
> + hmap_init(&seq->waiters);
> + ovs_mutex_unlock(&seq_mutex);
> +
> + return seq;
> +}
> +
> +/* Destroys 'seq', waking up threads that were waiting on it, if any. */
> +void OVS_EXCLUDED(seq_mutex)
> +seq_destroy(struct seq *seq)
> +{
> + ovs_mutex_lock(&seq_mutex);
> + seq_wake_waiters(seq);
> + hmap_destroy(&seq->waiters);
> + free(seq);
> + ovs_mutex_unlock(&seq_mutex);
> +}
> +
> +/* Increments 'seq''s sequence number, waking up any threads that are waiting
> + * on 'seq'. */
> +void OVS_EXCLUDED(seq_mutex)
> +seq_change(struct seq *seq)
> +{
> + ovs_mutex_lock(&seq_mutex);
> + seq->value = seq_next++;
> + seq_wake_waiters(seq);
> + ovs_mutex_unlock(&seq_mutex);
> +}
> +
> +/* Returns 'seq''s current sequence number (which could change immediately). */
> +uint64_t OVS_EXCLUDED(seq_mutex)
> +seq_read(const struct seq *seq)
> +{
> + uint64_t value;
> +
> + ovs_mutex_lock(&seq_mutex);
> + value = seq->value;
> + ovs_mutex_unlock(&seq_mutex);
> +
> + return value;
> +}
> +
> +static void OVS_REQUIRES(seq_mutex)
> +seq_wait__(struct seq *seq, uint64_t value)
> +{
> + unsigned int id = ovsthread_id_self();
> + uint32_t hash = hash_int(id, 0);
> + struct seq_waiter *waiter;
> +
> + HMAP_FOR_EACH_IN_BUCKET (waiter, hmap_node, hash, &seq->waiters) {
> + if (waiter->ovsthread_id == id) {
> + if (waiter->value != value) {
> + /* The current value is different from the value we've already
> + * waited for, */
> + poll_immediate_wake();
> + } else {
> + /* Already waiting on 'value', nothing more to do. */
> + }
> + return;
> + }
> + }
> +
> + waiter = xmalloc(sizeof *waiter);
> + waiter->seq = seq;
> + hmap_insert(&seq->waiters, &waiter->hmap_node, hash);
> + waiter->value = value;
> + waiter->thread = seq_thread_get();
> + list_push_back(&waiter->thread->waiters, &waiter->list_node);
> +
> + if (!waiter->thread->waiting) {
> + latch_wait(&waiter->thread->latch);
> + waiter->thread->waiting = true;
> + }
> +}
> +
> +/* Causes the following poll_block() to wake up when 'seq''s sequence number
> + * changes from 'value'. (If 'seq''s sequence number isn't 'value', then
> + * poll_block() won't block at all.) */
> +void OVS_EXCLUDED(seq_mutex)
> +seq_wait(const struct seq *seq_, uint64_t value)
> +{
> + struct seq *seq = CONST_CAST(struct seq *, seq_);
> +
> + ovs_mutex_lock(&seq_mutex);
> + if (value == seq->value) {
> + seq_wait__(seq, value);
> + } else {
> + poll_immediate_wake();
> + }
> + ovs_mutex_unlock(&seq_mutex);
> +}
> +
> +/* Called by poll_block() just before it returns, this function destroys any
> + * seq_waiter objects associated with the current thread. */
> +void OVS_EXCLUDED(seq_mutex)
> +seq_woke(void)
> +{
> + struct seq_thread *thread;
> +
> + seq_init();
> +
> + thread = pthread_getspecific(seq_thread_key);
> + if (thread) {
> + ovs_mutex_lock(&seq_mutex);
> + seq_thread_woke(thread);
> + thread->waiting = false;
> + ovs_mutex_unlock(&seq_mutex);
> + }
> +}
> +
> +static void
> +seq_init(void)
> +{
> + static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
> +
> + if (ovsthread_once_start(&once)) {
> + xpthread_key_create(&seq_thread_key, seq_thread_exit);
> + ovsthread_once_done(&once);
> + }
> +}
> +
> +static struct seq_thread * OVS_REQUIRES(seq_mutex)
> +seq_thread_get(void)
> +{
> + struct seq_thread *thread = pthread_getspecific(seq_thread_key);
> + if (!thread) {
> + thread = xmalloc(sizeof *thread);
> + list_init(&thread->waiters);
> + latch_init(&thread->latch);
> + thread->waiting = false;
> +
> + xpthread_setspecific(seq_thread_key, thread);
> + }
> + return thread;
> +}
> +
> +static void OVS_EXCLUDED(seq_mutex)
> +seq_thread_exit(void *thread_)
> +{
> + struct seq_thread *thread = thread_;
> +
> + ovs_mutex_lock(&seq_mutex);
> + seq_thread_woke(thread);
> + latch_destroy(&thread->latch);
> + free(thread);
> + ovs_mutex_unlock(&seq_mutex);
> +}
> +
> +static void OVS_REQUIRES(seq_mutex)
> +seq_thread_woke(struct seq_thread *thread)
> +{
> + struct seq_waiter *waiter, *next_waiter;
> +
> + LIST_FOR_EACH_SAFE (waiter, next_waiter, list_node, &thread->waiters) {
> + ovs_assert(waiter->thread == thread);
> + seq_waiter_destroy(waiter);
> + }
> + latch_poll(&thread->latch);
> +}
> +
> +static void OVS_REQUIRES(seq_mutex)
> +seq_waiter_destroy(struct seq_waiter *waiter)
> +{
> + hmap_remove(&waiter->seq->waiters, &waiter->hmap_node);
> + list_remove(&waiter->list_node);
> + free(waiter);
> +}
> +
> +static void OVS_REQUIRES(seq_mutex)
> +seq_wake_waiters(struct seq *seq)
> +{
> + struct seq_waiter *waiter, *next_waiter;
> +
> + HMAP_FOR_EACH_SAFE (waiter, next_waiter, hmap_node, &seq->waiters) {
> + latch_set(&waiter->thread->latch);
> + seq_waiter_destroy(waiter);
> + }
> +}
> diff --git a/lib/seq.h b/lib/seq.h
> new file mode 100644
> index 0000000..3423e21
> --- /dev/null
> +++ b/lib/seq.h
> @@ -0,0 +1,89 @@
> +/*
> + * Copyright (c) 2013 Nicira, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef SEQ_H
> +#define SEQ_H 1
> +
> +/* Thread-safe, pollable sequence number.
> + *
> + *
> + * Background
> + * ==========
> + *
> + * It is sometimes desirable to take an action whenever an object changes.
> + * Suppose we associate a sequence number with an object and increment the
> + * sequence number whenver we change the object. An observer can then record
> + * the sequence number it sees. Later on, if the current sequence number
> + * differs from the one it saw last, then the observer knows to examine the
> + * object for changes.
> + *
> + * Code that wants to run when a sequence number changes is challenging to
> + * implement in a multithreaded environment. A naive implementation, that
> + * simply checks whether the sequence number changed and, if so, calls
> + * poll_immediate_wake(), will fail when another thread increments the sequence
> + * number after the check (including during poll_block()).
> + *
> + * struct seq is a solution. It implements a sequence number along with enough
> + * internal infrastructure so that a thread waiting on a particular value will
> + * wake up if the sequence number changes, or even if the "struct seq" is
> + * destroyed.
> + *
> + *
> + * Usage
> + * =====
> + *
> + * The object that includes a sequence number should use seq_create() and
> + * seq_destroy() at creation and destruction, and seq_change() whenever the
> + * object's observable state changes.
> + *
> + * An observer may seq_read() to read the current sequence number and
> + * seq_wait() to cause poll_block() to wake up when the sequence number changes
> + * from a specified value.
> + *
> + * To avoid races, observers should use seq_read() to check for changes,
> + * process any changes, and then use seq_wait() to wait for a change from the
> + * previously read value. That is, a correct usage looks something like this:
> + *
> + * new_seq = seq_read(seq);
> + * if (new_seq != last_seq) {
> + * ...process changes...
> + * last_seq = new_seq;
> + * }
> + * seq_wait(seq, new_seq);
> + * poll_block();
> + *
> + *
> + * Thread-safety
> + * =============
> + *
> + * Fully thread safe.
> + */
> +
> +#include <stdint.h>
> +
> +/* For implementation of an object with a sequence number attached. */
> +struct seq *seq_create(void);
> +void seq_destroy(struct seq *);
> +void seq_change(struct seq *);
> +
> +/* For observers. */
> +uint64_t seq_read(const struct seq *);
> +void seq_wait(const struct seq *, uint64_t value);
> +
> +/* For poll_block() internal use. */
> +void seq_woke(void);
> +
> +#endif /* seq.h */
> --
> 1.7.10.4
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
X-CudaMail-Whitelist-To: dev at openvswitch.org
More information about the dev
mailing list