[ovs-dev] [seq 4/5] seq: New module for race-free, pollable, thread-safe sequence number.

Ethan Jackson ethan at nicira.com
Thu Aug 8 00:56:40 UTC 2013


I haven't read this code carefully yet, but in this patch you put the
OVS_EXCLUDED after the function return type instead of after the
parameters.  Does that work?  I had done it the other way.

Ethan

On Tue, Aug 6, 2013 at 4:56 PM, Ben Pfaff <blp at nicira.com> wrote:
> Signed-off-by: Ben Pfaff <blp at nicira.com>
> ---
>  lib/automake.mk |    2 +
>  lib/poll-loop.c |    3 +
>  lib/seq.c       |  255 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  lib/seq.h       |   89 +++++++++++++++++++
>  4 files changed, 349 insertions(+)
>  create mode 100644 lib/seq.c
>  create mode 100644 lib/seq.h
>
> diff --git a/lib/automake.mk b/lib/automake.mk
> index b46a868..f936897 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -159,6 +159,8 @@ lib_libopenvswitch_a_SOURCES = \
>         lib/reconnect.c \
>         lib/reconnect.h \
>         lib/sat-math.h \
> +       lib/seq.c \
> +       lib/seq.h \
>         lib/sha1.c \
>         lib/sha1.h \
>         lib/shash.c \
> diff --git a/lib/poll-loop.c b/lib/poll-loop.c
> index 5f9b9cd..4eb1187 100644
> --- a/lib/poll-loop.c
> +++ b/lib/poll-loop.c
> @@ -26,6 +26,7 @@
>  #include "fatal-signal.h"
>  #include "list.h"
>  #include "ovs-thread.h"
> +#include "seq.h"
>  #include "socket-util.h"
>  #include "timeval.h"
>  #include "vlog.h"
> @@ -248,6 +249,8 @@ poll_block(void)
>
>      /* Handle any pending signals before doing anything else. */
>      fatal_signal_run();
> +
> +    seq_woke();
>  }
>
>  static void
> diff --git a/lib/seq.c b/lib/seq.c
> new file mode 100644
> index 0000000..70975f2
> --- /dev/null
> +++ b/lib/seq.c
> @@ -0,0 +1,255 @@
> +/*
> + * Copyright (c) 2013 Nicira, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +
> +#include "seq.h"
> +
> +#include <stdbool.h>
> +
> +#include "hash.h"
> +#include "hmap.h"
> +#include "latch.h"
> +#include "list.h"
> +#include "ovs-thread.h"
> +#include "poll-loop.h"
> +
> +/* A sequence number object. */
> +struct seq {
> +    uint64_t value OVS_GUARDED;
> +    struct hmap waiters OVS_GUARDED; /* Contains 'struct seq_waiter's. */
> +};
> +
> +/* A thread waiting on a particular seq. */
> +struct seq_waiter {
> +    struct seq *seq OVS_GUARDED;            /* Seq being waited for. */
> +    struct hmap_node hmap_node OVS_GUARDED; /* In 'seq->waiters'. */
> +    unsigned int ovsthread_id OVS_GUARDED;  /* Key in 'waiters' hmap. */
> +
> +    struct seq_thread *thread OVS_GUARDED; /* Thread preparing to wait. */
> +    struct list list_node OVS_GUARDED;     /* In 'thread->waiters'. */
> +
> +    uint64_t value OVS_GUARDED; /* seq->value we're waiting to change. */
> +};
> +
> +/* A thread that might be waiting on one or more seqs. */
> +struct seq_thread {
> +    struct list waiters OVS_GUARDED; /* Contains 'struct seq_waiter's. */
> +    struct latch latch OVS_GUARDED;  /* Wakeup latch for this thread. */
> +    bool waiting OVS_GUARDED;        /* True if latch_wait() already called. */
> +};
> +
> +static struct ovs_mutex seq_mutex = OVS_ADAPTIVE_MUTEX_INITIALIZER;
> +
> +static uint64_t seq_next OVS_GUARDED_BY(seq_mutex) = 1;
> +
> +static pthread_key_t seq_thread_key;
> +
> +static void seq_init(void);
> +static struct seq_thread *seq_thread_get(void) OVS_REQUIRES(seq_mutex);
> +static void seq_thread_exit(void *thread_) OVS_EXCLUDED(seq_mutex);
> +static void seq_thread_woke(struct seq_thread *) OVS_REQUIRES(seq_mutex);
> +static void seq_waiter_destroy(struct seq_waiter *) OVS_REQUIRES(seq_mutex);
> +static void seq_wake_waiters(struct seq *) OVS_REQUIRES(seq_mutex);
> +
> +/* Creates and returns a new 'seq' object. */
> +struct seq * OVS_EXCLUDED(seq_mutex)
> +seq_create(void)
> +{
> +    struct seq *seq;
> +
> +    seq_init();
> +
> +    seq = xmalloc(sizeof *seq);
> +    ovs_mutex_lock(&seq_mutex);
> +    seq->value = seq_next++;
> +    hmap_init(&seq->waiters);
> +    ovs_mutex_unlock(&seq_mutex);
> +
> +    return seq;
> +}
> +
> +/* Destroys 'seq', waking up threads that were waiting on it, if any. */
> +void OVS_EXCLUDED(seq_mutex)
> +seq_destroy(struct seq *seq)
> +{
> +    ovs_mutex_lock(&seq_mutex);
> +    seq_wake_waiters(seq);
> +    hmap_destroy(&seq->waiters);
> +    free(seq);
> +    ovs_mutex_unlock(&seq_mutex);
> +}
> +
> +/* Increments 'seq''s sequence number, waking up any threads that are waiting
> + * on 'seq'. */
> +void OVS_EXCLUDED(seq_mutex)
> +seq_change(struct seq *seq)
> +{
> +    ovs_mutex_lock(&seq_mutex);
> +    seq->value = seq_next++;
> +    seq_wake_waiters(seq);
> +    ovs_mutex_unlock(&seq_mutex);
> +}
> +
> +/* Returns 'seq''s current sequence number (which could change immediately). */
> +uint64_t OVS_EXCLUDED(seq_mutex)
> +seq_read(const struct seq *seq)
> +{
> +    uint64_t value;
> +
> +    ovs_mutex_lock(&seq_mutex);
> +    value = seq->value;
> +    ovs_mutex_unlock(&seq_mutex);
> +
> +    return value;
> +}
> +
> +static void OVS_REQUIRES(seq_mutex)
> +seq_wait__(struct seq *seq, uint64_t value)
> +{
> +    unsigned int id = ovsthread_id_self();
> +    uint32_t hash = hash_int(id, 0);
> +    struct seq_waiter *waiter;
> +
> +    HMAP_FOR_EACH_IN_BUCKET (waiter, hmap_node, hash, &seq->waiters) {
> +        if (waiter->ovsthread_id == id) {
> +            if (waiter->value != value) {
> +                /* The current value is different from the value we've already
> +                 * waited for, so wake up immediately. */
> +                poll_immediate_wake();
> +            } else {
> +                /* Already waiting on 'value', nothing more to do. */
> +            }
> +            return;
> +        }
> +    }
> +
> +    waiter = xmalloc(sizeof *waiter);
> +    waiter->seq = seq;
> +    hmap_insert(&seq->waiters, &waiter->hmap_node, hash);
> +    waiter->value = value;
> +    waiter->thread = seq_thread_get();
> +    list_push_back(&waiter->thread->waiters, &waiter->list_node);
> +
> +    if (!waiter->thread->waiting) {
> +        latch_wait(&waiter->thread->latch);
> +        waiter->thread->waiting = true;
> +    }
> +}
> +
> +/* Causes the following poll_block() to wake up when 'seq''s sequence number
> + * changes from 'value'.  (If 'seq''s sequence number isn't 'value', then
> + * poll_block() won't block at all.) */
> +void OVS_EXCLUDED(seq_mutex)
> +seq_wait(const struct seq *seq_, uint64_t value)
> +{
> +    struct seq *seq = CONST_CAST(struct seq *, seq_);
> +
> +    ovs_mutex_lock(&seq_mutex);
> +    if (value == seq->value) {
> +        seq_wait__(seq, value);
> +    } else {
> +        poll_immediate_wake();
> +    }
> +    ovs_mutex_unlock(&seq_mutex);
> +}
> +
> +/* Called by poll_block() just before it returns, this function destroys any
> + * seq_waiter objects associated with the current thread. */
> +void OVS_EXCLUDED(seq_mutex)
> +seq_woke(void)
> +{
> +    struct seq_thread *thread;
> +
> +    seq_init();
> +
> +    thread = pthread_getspecific(seq_thread_key);
> +    if (thread) {
> +        ovs_mutex_lock(&seq_mutex);
> +        seq_thread_woke(thread);
> +        thread->waiting = false;
> +        ovs_mutex_unlock(&seq_mutex);
> +    }
> +}
> +
> +static void
> +seq_init(void)
> +{
> +    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
> +
> +    if (ovsthread_once_start(&once)) {
> +        xpthread_key_create(&seq_thread_key, seq_thread_exit);
> +        ovsthread_once_done(&once);
> +    }
> +}
> +
> +static struct seq_thread * OVS_REQUIRES(seq_mutex)
> +seq_thread_get(void)
> +{
> +    struct seq_thread *thread = pthread_getspecific(seq_thread_key);
> +    if (!thread) {
> +        thread = xmalloc(sizeof *thread);
> +        list_init(&thread->waiters);
> +        latch_init(&thread->latch);
> +        thread->waiting = false;
> +
> +        xpthread_setspecific(seq_thread_key, thread);
> +    }
> +    return thread;
> +}
> +
> +static void OVS_EXCLUDED(seq_mutex)
> +seq_thread_exit(void *thread_)
> +{
> +    struct seq_thread *thread = thread_;
> +
> +    ovs_mutex_lock(&seq_mutex);
> +    seq_thread_woke(thread);
> +    latch_destroy(&thread->latch);
> +    free(thread);
> +    ovs_mutex_unlock(&seq_mutex);
> +}
> +
> +static void OVS_REQUIRES(seq_mutex)
> +seq_thread_woke(struct seq_thread *thread)
> +{
> +    struct seq_waiter *waiter, *next_waiter;
> +
> +    LIST_FOR_EACH_SAFE (waiter, next_waiter, list_node, &thread->waiters) {
> +        ovs_assert(waiter->thread == thread);
> +        seq_waiter_destroy(waiter);
> +    }
> +    latch_poll(&thread->latch);
> +}
> +
> +static void OVS_REQUIRES(seq_mutex)
> +seq_waiter_destroy(struct seq_waiter *waiter)
> +{
> +    hmap_remove(&waiter->seq->waiters, &waiter->hmap_node);
> +    list_remove(&waiter->list_node);
> +    free(waiter);
> +}
> +
> +static void OVS_REQUIRES(seq_mutex)
> +seq_wake_waiters(struct seq *seq)
> +{
> +    struct seq_waiter *waiter, *next_waiter;
> +
> +    HMAP_FOR_EACH_SAFE (waiter, next_waiter, hmap_node, &seq->waiters) {
> +        latch_set(&waiter->thread->latch);
> +        seq_waiter_destroy(waiter);
> +    }
> +}
> diff --git a/lib/seq.h b/lib/seq.h
> new file mode 100644
> index 0000000..3423e21
> --- /dev/null
> +++ b/lib/seq.h
> @@ -0,0 +1,89 @@
> +/*
> + * Copyright (c) 2013 Nicira, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef SEQ_H
> +#define SEQ_H 1
> +
> +/* Thread-safe, pollable sequence number.
> + *
> + *
> + * Background
> + * ==========
> + *
> + * It is sometimes desirable to take an action whenever an object changes.
> + * Suppose we associate a sequence number with an object and increment the
> + * sequence number whenever we change the object.  An observer can then record
> + * the sequence number it sees.  Later on, if the current sequence number
> + * differs from the one it saw last, then the observer knows to examine the
> + * object for changes.
> + *
> + * Code that wants to run when a sequence number changes is challenging to
> + * implement in a multithreaded environment.  A naive implementation, that
> + * simply checks whether the sequence number changed and, if so, calls
> + * poll_immediate_wake(), will fail when another thread increments the sequence
> + * number after the check (including during poll_block()).
> + *
> + * struct seq is a solution.  It implements a sequence number along with enough
> + * internal infrastructure so that a thread waiting on a particular value will
> + * wake up if the sequence number changes, or even if the "struct seq" is
> + * destroyed.
> + *
> + *
> + * Usage
> + * =====
> + *
> + * The object that includes a sequence number should use seq_create() and
> + * seq_destroy() at creation and destruction, and seq_change() whenever the
> + * object's observable state changes.
> + *
> + * An observer may use seq_read() to read the current sequence number and
> + * seq_wait() to cause poll_block() to wake up when the sequence number changes
> + * from a specified value.
> + *
> + * To avoid races, observers should use seq_read() to check for changes,
> + * process any changes, and then use seq_wait() to wait for a change from the
> + * previously read value.  That is, a correct usage looks something like this:
> + *
> + *    new_seq = seq_read(seq);
> + *    if (new_seq != last_seq) {
> + *        ...process changes...
> + *        last_seq = new_seq;
> + *    }
> + *    seq_wait(seq, new_seq);
> + *    poll_block();
> + *
> + *
> + * Thread-safety
> + * =============
> + *
> + * Fully thread safe.
> + */
> +
> +#include <stdint.h>
> +
> +/* For implementation of an object with a sequence number attached. */
> +struct seq *seq_create(void);
> +void seq_destroy(struct seq *);
> +void seq_change(struct seq *);
> +
> +/* For observers. */
> +uint64_t seq_read(const struct seq *);
> +void seq_wait(const struct seq *, uint64_t value);
> +
> +/* For poll_block() internal use. */
> +void seq_woke(void);
> +
> +#endif /* seq.h */
> --
> 1.7.10.4
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
X-CudaMail-Whitelist-To: dev at openvswitch.org



More information about the dev mailing list