[ovs-dev] [PATCH 01/15] log: Add async commit support.
Yifeng Sun
pkusunyifeng at gmail.com
Thu Jan 11 22:45:31 UTC 2018
After further study, I understand the "seq" code and think it is
well-designed.
Thank you for this patch and the previous reply.
This patch looks good to me.
Reviewed-by: Yifeng Sun <pkusunyifeng at gmail.com>
On Wed, Jan 10, 2018 at 4:21 PM, Ben Pfaff <blp at ovn.org> wrote:
> Thanks for the review.
>
> The "seq" code introduces kind of a weird concept. I might have
> invented it, not sure, but it's probably some kind of distortion of a
> more standard idea. I'd like it to be easy for people to understand.
> Are there any questions that I could help to answer? Maybe I could
> answer them directly, or maybe I could update the documentation in seq.h
> (which tries to be comprehensive but maybe it isn't?).
>
> On Wed, Jan 10, 2018 at 04:04:53PM -0800, Yifeng Sun wrote:
> > I read through lib/seq.c to learn how this patch works.
> > Looks good to me, but I'm not very confident in my review.
> >
> > Reviewed-by: Yifeng Sun <pkusunyifeng at gmail.com>
> >
> > On Sun, Dec 31, 2017 at 9:16 PM, Ben Pfaff <blp at ovn.org> wrote:
> >
> > > The OVSDB log code has always had the ability to commit the log to
> > > disk and wait for the commit to finish. This patch introduces a new
> > > feature that allows the client to start a commit in the background
> > > and then to determine asynchronously that the commit has completed.
> > > This will be especially useful later for the distributed database
> > > feature.
> > >
> > > Signed-off-by: Ben Pfaff <blp at ovn.org>
> > > ---
> > > ovsdb/file.c | 4 +-
> > > ovsdb/log.c | 152 +++++++++++++++++++++++++++++++++++++++++++++++++++--
> > > ovsdb/log.h | 7 ++-
> > > ovsdb/ovsdb-tool.c | 2 +-
> > > tests/test-ovsdb.c | 2 +-
> > > 5 files changed, 158 insertions(+), 9 deletions(-)
> > >
> > > diff --git a/ovsdb/file.c b/ovsdb/file.c
> > > index 90c2b9d20a9a..fdd5f8e35a44 100644
> > > --- a/ovsdb/file.c
> > > +++ b/ovsdb/file.c
> > > @@ -661,7 +661,7 @@ ovsdb_file_compact(struct ovsdb_file *file)
> > >
> > > /* Commit the old version, so that we can be assured that we'll
> > > eventually
> > > * have either the old or the new version. */
> > > - error = ovsdb_log_commit(file->log);
> > > + error = ovsdb_log_commit_block(file->log);
> > > if (error) {
> > > goto exit;
> > > }
> > > @@ -857,7 +857,7 @@ ovsdb_file_txn_commit(struct json *json, const char
> > > *comment,
> > > }
> > >
> > > if (durable) {
> > > - error = ovsdb_log_commit(log);
> > > + error = ovsdb_log_commit_block(log);
> > > if (error) {
> > > return ovsdb_wrap_error(error, "committing transaction
> > > failed");
> > > }
> > > diff --git a/ovsdb/log.c b/ovsdb/log.c
> > > index 0f8dafa30a8f..cc4bc2c6243e 100644
> > > --- a/ovsdb/log.c
> > > +++ b/ovsdb/log.c
> > > @@ -24,12 +24,17 @@
> > > #include <sys/stat.h>
> > > #include <unistd.h>
> > >
> > > +#include "lockfile.h"
> > > #include "openvswitch/dynamic-string.h"
> > > #include "openvswitch/json.h"
> > > #include "openvswitch/vlog.h"
> > > -#include "lockfile.h"
> > > -#include "ovsdb.h"
> > > +#include "ovs-atomic.h"
> > > +#include "ovs-rcu.h"
> > > +#include "ovs-thread.h"
> > > #include "ovsdb-error.h"
> > > +#include "ovsdb.h"
> > > +#include "openvswitch/poll-loop.h"
> > > +#include "seq.h"
> > > #include "sha1.h"
> > > #include "socket-util.h"
> > > #include "transaction.h"
> > > @@ -78,6 +83,7 @@ struct ovsdb_log {
> > > struct lockfile *lockfile;
> > > FILE *stream;
> > > off_t base;
> > > + struct afsync *afsync;
> > > };
> > >
> > > /* Whether the OS supports renaming open files.
> > > @@ -95,6 +101,9 @@ static bool parse_header(char *header, const char
> > > **magicp,
> > > uint8_t sha1[SHA1_DIGEST_SIZE]);
> > > static bool is_magic_ok(const char *needle, const char *haystack);
> > >
> > > +static struct afsync *afsync_create(int fd, uint64_t initial_ticket);
> > > +static uint64_t afsync_destroy(struct afsync *);
> > > +
> > > /* Attempts to open 'name' with the specified 'open_mode'. On success,
> > > * stores the new log into '*filep' and returns NULL; otherwise returns
> > > * an error and stores NULL into '*filep'.
> > > @@ -269,6 +278,7 @@ ovsdb_log_open(const char *name, const char *magic,
> > > file->prev_offset = 0;
> > > file->offset = 0;
> > > file->base = 0;
> > > + file->afsync = NULL;
> > > *filep = file;
> > > return NULL;
> > >
> > > @@ -308,6 +318,7 @@ ovsdb_log_close(struct ovsdb_log *file)
> > > {
> > > if (file) {
> > > ovsdb_error_destroy(file->error);
> > > + afsync_destroy(file->afsync);
> > > free(file->name);
> > > free(file->display_name);
> > > free(file->magic);
> > > @@ -634,8 +645,10 @@ ovsdb_log_write(struct ovsdb_log *file, const
> struct
> > > json *json)
> > > return NULL;
> > > }
> > >
> > > +/* Attempts to commit 'file' to disk. Waits for the commit to succeed or
> > > + * fail. Returns NULL if successful, otherwise the error that occurred. */
> > > struct ovsdb_error *
> > > -ovsdb_log_commit(struct ovsdb_log *file)
> > > +ovsdb_log_commit_block(struct ovsdb_log *file)
> > > {
> > > if (file->stream && fsync(fileno(file->stream))) {
> > > return ovsdb_io_error(errno, "%s: fsync failed",
> > > file->display_name);
> > > @@ -740,7 +753,7 @@ ovsdb_rename(const char *old, const char *new)
> > > struct ovsdb_error * OVS_WARN_UNUSED_RESULT
> > > ovsdb_log_replace_commit(struct ovsdb_log *old, struct ovsdb_log
> *new)
> > > {
> > > - struct ovsdb_error *error = ovsdb_log_commit(new);
> > > + struct ovsdb_error *error = ovsdb_log_commit_block(new);
> > > if (error) {
> > > ovsdb_log_replace_abort(new);
> > > return error;
> > > @@ -812,6 +825,10 @@ ovsdb_log_replace_commit(struct ovsdb_log *old,
> > > struct ovsdb_log *new)
> > > ovsdb_error_destroy(old->error);
> > > old->error = NULL;
> > > /* prev_offset only matters for OVSDB_LOG_READ. */
> > > + if (old->afsync) {
> > > + uint64_t ticket = afsync_destroy(old->afsync);
> > > + old->afsync = afsync_create(fileno(old->stream), ticket + 1);
> > > + }
> > > old->offset = new->offset;
> > > /* Keep old->name. */
> > > free(old->magic);
> > > @@ -844,3 +861,130 @@ ovsdb_log_disable_renaming_open_files(void)
> > > {
> > > rename_open_files = false;
> > > }
> > > +
> > > +struct afsync {
> > > + pthread_t thread;
> > > + atomic_uint64_t cur, next;
> > > + struct seq *request, *complete;
> > > + int fd;
> > > +};
> > > +
> > > +static void *
> > > +afsync_thread(void *afsync_)
> > > +{
> > > + struct afsync *afsync = afsync_;
> > > + uint64_t cur = 0;
> > > + for (;;) {
> > > + ovsrcu_quiesce_start();
> > > +
> > > + uint64_t request_seq = seq_read(afsync->request);
> > > +
> > > + uint64_t next;
> > > + atomic_read_explicit(&afsync->next, &next,
> memory_order_acquire);
> > > + if (next == UINT64_MAX) {
> > > + break;
> > > + }
> > > +
> > > + if (cur != next && afsync->fd != -1) {
> > > + int error = fsync(afsync->fd) ? errno : 0;
> > > + if (!error) {
> > > + cur = next;
> > > + atomic_store_explicit(&afsync->cur, cur,
> > > memory_order_release);
> > > + seq_change(afsync->complete);
> > > + } else {
> > > + VLOG_WARN("fsync failed (%s)", ovs_strerror(error));
> > > + }
> > > + }
> > > +
> > > + seq_wait(afsync->request, request_seq);
> > > + poll_block();
> > > + }
> > > + return NULL;
> > > +}
> > > +
> > > +static struct afsync *
> > > +afsync_create(int fd, uint64_t initial_ticket)
> > > +{
> > > + struct afsync *afsync = xzalloc(sizeof *afsync);
> > > + atomic_init(&afsync->cur, initial_ticket);
> > > + atomic_init(&afsync->next, initial_ticket);
> > > + afsync->request = seq_create();
> > > + afsync->complete = seq_create();
> > > + afsync->thread = ovs_thread_create("log_fsync", afsync_thread,
> > > afsync);
> > > + afsync->fd = fd;
> > > + return afsync;
> > > +}
> > > +
> > > +static uint64_t
> > > +afsync_destroy(struct afsync *afsync)
> > > +{
> > > + if (!afsync) {
> > > + return 0;
> > > + }
> > > +
> > > + uint64_t next;
> > > + atomic_read(&afsync->next, &next);
> > > + atomic_store(&afsync->next, UINT64_MAX);
> > > + seq_change(afsync->request);
> > > + xpthread_join(afsync->thread, NULL);
> > > +
> > > + seq_destroy(afsync->request);
> > > + seq_destroy(afsync->complete);
> > > +
> > > + free(afsync);
> > > +
> > > + return next;
> > > +}
> > > +
> > > +static struct afsync *
> > > +ovsdb_log_get_afsync(struct ovsdb_log *log)
> > > +{
> > > + if (!log->afsync) {
> > > + log->afsync = afsync_create(log->stream ? fileno(log->stream)
> :
> > > -1, 0);
> > > + }
> > > + return log->afsync;
> > > +}
> > > +
> > > +/* Starts committing 'log' to disk. Returns a ticket that can be
> passed
> > > to
> > > + * ovsdb_log_commit_wait() or compared against the return value of
> > > + * ovsdb_log_commit_progress() later. */
> > > +uint64_t
> > > +ovsdb_log_commit_start(struct ovsdb_log *log)
> > > +{
> > > + struct afsync *afsync = ovsdb_log_get_afsync(log);
> > > +
> > > + uint64_t orig;
> > > + atomic_add_explicit(&afsync->next, 1, &orig,
> memory_order_acq_rel);
> > > +
> > > + seq_change(afsync->request);
> > > +
> > > + return orig + 1;
> > > +}
> > > +
> > > +/* Returns a ticket value that represents the current progress of
> commits
> > > to
> > > + * 'log'. Suppose that some call to ovsdb_log_commit_start() returns
> X
> > > and any
> > > + * call ovsdb_log_commit_progress() returns Y, for the same 'log'.
> Then
> > > commit
> > > + * X is complete if and only if X <= Y. */
> > > +uint64_t
> > > +ovsdb_log_commit_progress(struct ovsdb_log *log)
> > > +{
> > > + struct afsync *afsync = ovsdb_log_get_afsync(log);
> > > + uint64_t cur;
> > > + atomic_read_explicit(&afsync->cur, &cur, memory_order_acquire);
> > > + return cur;
> > > +}
> > > +
> > > +/* Causes poll_block() to wake up if and when
> > > ovsdb_log_commit_progress(log)
> > > + * would return at least 'goal'. */
> > > +void
> > > +ovsdb_log_commit_wait(struct ovsdb_log *log, uint64_t goal)
> > > +{
> > > + struct afsync *afsync = ovsdb_log_get_afsync(log);
> > > + uint64_t complete = seq_read(afsync->complete);
> > > + uint64_t cur = ovsdb_log_commit_progress(log);
> > > + if (cur < goal) {
> > > + seq_wait(afsync->complete, complete);
> > > + } else {
> > > + poll_immediate_wake();
> > > + }
> > > +}
> > > diff --git a/ovsdb/log.h b/ovsdb/log.h
> > > index 18900fa50f44..bd0396f27ea8 100644
> > > --- a/ovsdb/log.h
> > > +++ b/ovsdb/log.h
> > > @@ -35,6 +35,7 @@
> > > * that compacting is advised.
> > > */
> > >
> > > +#include <stdint.h>
> > > #include <sys/types.h>
> > > #include "compiler.h"
> > >
> > > @@ -70,7 +71,11 @@ void ovsdb_log_compose_record(const struct json *,
> > > const char *magic,
> > >
> > > struct ovsdb_error *ovsdb_log_write(struct ovsdb_log *, const struct
> json
> > > *)
> > > OVS_WARN_UNUSED_RESULT;
> > > -struct ovsdb_error *ovsdb_log_commit(struct ovsdb_log *)
> > > +
> > > +uint64_t ovsdb_log_commit_start(struct ovsdb_log *);
> > > +uint64_t ovsdb_log_commit_progress(struct ovsdb_log *);
> > > +void ovsdb_log_commit_wait(struct ovsdb_log *, uint64_t);
> > > +struct ovsdb_error *ovsdb_log_commit_block(struct ovsdb_log *)
> > > OVS_WARN_UNUSED_RESULT;
> > >
> > > void ovsdb_log_mark_base(struct ovsdb_log *);
> > > diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c
> > > index 4343e3ce5b22..cec64152f079 100644
> > > --- a/ovsdb/ovsdb-tool.c
> > > +++ b/ovsdb/ovsdb-tool.c
> > > @@ -222,7 +222,7 @@ do_create(struct ovs_cmdl_context *ctx)
> > > check_ovsdb_error(ovsdb_log_open(db_file_name, OVSDB_MAGIC,
> > > OVSDB_LOG_CREATE_EXCL, -1,
> &log));
> > > check_ovsdb_error(ovsdb_log_write(log, json));
> > > - check_ovsdb_error(ovsdb_log_commit(log));
> > > + check_ovsdb_error(ovsdb_log_commit_block(log));
> > > ovsdb_log_close(log);
> > >
> > > json_destroy(json);
> > > diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
> > > index 6b2cde863aba..c0c5a4df51af 100644
> > > --- a/tests/test-ovsdb.c
> > > +++ b/tests/test-ovsdb.c
> > > @@ -380,7 +380,7 @@ do_log_io(struct ovs_cmdl_context *ctx)
> > > error = ovsdb_log_write(target, json);
> > > json_destroy(json);
> > > } else if (!strcmp(command, "commit")) {
> > > - error = ovsdb_log_commit(target);
> > > + error = ovsdb_log_commit_block(target);
> > > } else if (!strcmp(command, "replace_start")) {
> > > ovs_assert(!replacement);
> > > error = ovsdb_log_replace_start(log, &replacement);
> > > --
> > > 2.10.2
> > >
> > > _______________________________________________
> > > dev mailing list
> > > dev at openvswitch.org
> > > https://mail.openvswitch.org/mailman/listinfo/ovs-dev
> > >
>
More information about the dev
mailing list