[ovs-dev] [PATCH v2 6/9] ovsdb: relay: Add support for transaction forwarding.

Mark Gray mark.d.gray at redhat.com
Fri Jul 2 10:24:21 UTC 2021


On 12/06/2021 03:00, Ilya Maximets wrote:
> The current version of ovsdb relay allows scaling out read-only
> access to the primary database.  However, many clients are not
> read-only but read-mostly, for example, ovn-controller.
> 
> In order to scale out database access for this case, ovsdb-server
> needs to process transactions that are not read-only.  Relay is not
> allowed to do that, i.e. not allowed to modify the database, but it
> can act like a proxy and forward transactions that include database
> modifications to the primary server and forward replies back to a
> client.  At the same time it may serve read-only transactions and
> monitor requests by itself, greatly reducing the load on the primary
> server.
> 
> This configuration will slightly increase transaction latency, but
> it's not very important for read-mostly use cases.
> 
> Implementation details:
> With this change instead of creating a trigger to commit the
> transaction, ovsdb-server will create a trigger for transaction
> forwarding.  Later, ovsdb_relay_run() will send all new transactions
> to the relay source.  Once the transaction reply is received from the
> relay source, the ovsdb-relay module will update the state of the
> transaction forwarding with the reply.  After that, trigger_run()
> will complete the trigger and jsonrpc_server_run() will send the
> reply back to the client.  Since transaction reply from the relay
> source will be received after all the updates, client will receive
> all the updates before receiving the transaction reply as it is in
> a normal scenario with other database models.
> 
> Signed-off-by: Ilya Maximets <i.maximets at ovn.org>
> ---
>  ovsdb/automake.mk           |   2 +
>  ovsdb/execution.c           |  18 ++--
>  ovsdb/ovsdb.c               |   9 ++
>  ovsdb/ovsdb.h               |   8 +-
>  ovsdb/relay.c               |  10 +-
>  ovsdb/transaction-forward.c | 182 ++++++++++++++++++++++++++++++++++++
>  ovsdb/transaction-forward.h |  44 +++++++++
>  ovsdb/trigger.c             |  48 ++++++++--
>  ovsdb/trigger.h             |  41 ++++----
>  tests/ovsdb-server.at       |  85 ++++++++++++++++-
>  10 files changed, 409 insertions(+), 38 deletions(-)
>  create mode 100644 ovsdb/transaction-forward.c
>  create mode 100644 ovsdb/transaction-forward.h
> 
> diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk
> index 05c8ebbdf..62cc02686 100644
> --- a/ovsdb/automake.mk
> +++ b/ovsdb/automake.mk
> @@ -48,6 +48,8 @@ ovsdb_libovsdb_la_SOURCES = \
>  	ovsdb/trigger.h \
>  	ovsdb/transaction.c \
>  	ovsdb/transaction.h \
> +	ovsdb/transaction-forward.c \
> +	ovsdb/transaction-forward.h \
>  	ovsdb/ovsdb-util.c \
>  	ovsdb/ovsdb-util.h
>  ovsdb_libovsdb_la_CFLAGS = $(AM_CFLAGS)
> diff --git a/ovsdb/execution.c b/ovsdb/execution.c
> index dd2569055..f9b8067d0 100644
> --- a/ovsdb/execution.c
> +++ b/ovsdb/execution.c
> @@ -99,7 +99,8 @@ lookup_executor(const char *name, bool *read_only)
>  }
>  
>  /* On success, returns a transaction and stores the results to return to the
> - * client in '*resultsp'.
> + * client in '*resultsp'.  If 'forwarding_needed' is nonnull and transaction
> + * needs to be forwarded (in relay mode), sets '*forwarding_needed' to true.
>   *
>   * On failure, returns NULL.  If '*resultsp' is nonnull, then it is the results
>   * to return to the client.  If '*resultsp' is null, then the execution failed
> @@ -111,7 +112,8 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
>                        const struct json *params, bool read_only,
>                        const char *role, const char *id,
>                        long long int elapsed_msec, long long int *timeout_msec,
> -                      bool *durable, struct json **resultsp)
> +                      bool *durable, bool *forwarding_needed,
> +                      struct json **resultsp)
>  {
>      struct ovsdb_execution x;
>      struct ovsdb_error *error;
> @@ -120,6 +122,9 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
>      size_t i;
>  
>      *durable = false;
> +    if (forwarding_needed) {
> +        *forwarding_needed = false;
> +    }
>      if (params->type != JSON_ARRAY
>          || !params->array.n
>          || params->array.elems[0]->type != JSON_STRING
> @@ -196,11 +201,8 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
>                                      "%s operation not allowed on "
>                                      "table in reserved database %s",
>                                      op_name, db->schema->name);
> -            } else if (db->is_relay) {
> -                error = ovsdb_error("not allowed",
> -                                    "%s operation not allowed when "
> -                                    "database server is in relay mode",
> -                                    op_name);
> +            } else if (db->is_relay && forwarding_needed) {
> +                *forwarding_needed = true;
>              }
>          }
>          if (error) {
> @@ -245,7 +247,7 @@ ovsdb_execute(struct ovsdb *db, const struct ovsdb_session *session,
>      struct json *results;
>      struct ovsdb_txn *txn = ovsdb_execute_compose(
>          db, session, params, read_only, role, id, elapsed_msec, timeout_msec,
> -        &durable, &results);
> +        &durable, NULL, &results);
>      if (!txn) {
>          return results;
>      }
> diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
> index 999cd0d75..126d16a2f 100644
> --- a/ovsdb/ovsdb.c
> +++ b/ovsdb/ovsdb.c
> @@ -33,6 +33,7 @@
>  #include "table.h"
>  #include "timeval.h"
>  #include "transaction.h"
> +#include "transaction-forward.h"
>  #include "trigger.h"
>  
>  #include "openvswitch/vlog.h"
> @@ -422,6 +423,8 @@ ovsdb_create(struct ovsdb_schema *schema, struct ovsdb_storage *storage)
>      db->run_triggers_now = db->run_triggers = false;
>  
>      db->is_relay = false;
> +    ovs_list_init(&db->txn_forward_new);
> +    hmap_init(&db->txn_forward_sent);
>  
>      shash_init(&db->tables);
>      if (schema) {
> @@ -465,6 +468,12 @@ ovsdb_destroy(struct ovsdb *db)
>          /* Destroy txn history. */
>          ovsdb_txn_history_destroy(db);
>  
> +        /* Cancel all the forwarded transactions.  There should not be
> +         * any as all triggers should be already cancelled. */
> +        ovsdb_txn_forward_cancel_all(db, false);
> +        ovs_assert(hmap_is_empty(&db->txn_forward_sent));
> +        hmap_destroy(&db->txn_forward_sent);
> +
>          /* The caller must ensure that no triggers remain. */
>          ovs_assert(ovs_list_is_empty(&db->triggers));
>  
> diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
> index 16bd5f5ec..4a7bd0f0e 100644
> --- a/ovsdb/ovsdb.h
> +++ b/ovsdb/ovsdb.h
> @@ -93,7 +93,11 @@ struct ovsdb {
>      struct ovs_list txn_history; /* Contains "struct ovsdb_txn_history_node. */
>  
>      /* Relay mode. */
> -    bool is_relay;
> +    bool is_relay;  /* True, if database is in relay mode. */
> +    /* List that holds transactions waiting to be forwarded to the server. */
> +    struct ovs_list txn_forward_new;
> +    /* Hash map for transactions that are already sent and wait for reply. */
> +    struct hmap txn_forward_sent;
>  };
>  
>  struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
> @@ -107,7 +111,7 @@ struct ovsdb_txn *ovsdb_execute_compose(
>      struct ovsdb *, const struct ovsdb_session *, const struct json *params,
>      bool read_only, const char *role, const char *id,
>      long long int elapsed_msec, long long int *timeout_msec,
> -    bool *durable, struct json **);
> +    bool *durable, bool *forwarding_needed, struct json **);
>  
>  struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
>                             const struct json *params, bool read_only,
> diff --git a/ovsdb/relay.c b/ovsdb/relay.c
> index 5f423a0b9..ef689c649 100644
> --- a/ovsdb/relay.c
> +++ b/ovsdb/relay.c
> @@ -32,6 +32,7 @@
>  #include "row.h"
>  #include "table.h"
>  #include "transaction.h"
> +#include "transaction-forward.h"
>  #include "util.h"
>  
>  VLOG_DEFINE_THIS_MODULE(relay);
> @@ -298,6 +299,7 @@ ovsdb_relay_run(void)
>          struct relay_ctx *ctx = node->data;
>          struct ovs_list events;
>  
> +        ovsdb_txn_forward_run(ctx->db, ctx->cs);
>          ovsdb_cs_run(ctx->cs, &events);
>  
>          struct ovsdb_cs_event *event;
> @@ -309,7 +311,9 @@ ovsdb_relay_run(void)
>  
>              switch (event->type) {
>              case OVSDB_CS_EVENT_TYPE_RECONNECT:
> -                /* Nothing to do. */
> +                /* Cancelling all the transactions that were already sent but
> +                 * not replied yet as they might be lost. */
> +                ovsdb_txn_forward_cancel_all(ctx->db, true);
>                  break;
>  
>              case OVSDB_CS_EVENT_TYPE_UPDATE:
> @@ -317,6 +321,9 @@ ovsdb_relay_run(void)
>                  break;
>  
>              case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
> +                ovsdb_txn_forward_complete(ctx->db, event->txn_reply);
> +                break;
> +
>              case OVSDB_CS_EVENT_TYPE_LOCKED:
>                  /* Not expected. */
>                  break;
> @@ -335,5 +342,6 @@ ovsdb_relay_wait(void)
>          struct relay_ctx *ctx = node->data;
>  
>          ovsdb_cs_wait(ctx->cs);
> +        ovsdb_txn_forward_wait(ctx->db, ctx->cs);
>      }
>  }
> diff --git a/ovsdb/transaction-forward.c b/ovsdb/transaction-forward.c
> new file mode 100644
> index 000000000..8ff12ef4b
> --- /dev/null
> +++ b/ovsdb/transaction-forward.c
> @@ -0,0 +1,182 @@
> +/*
> + * Copyright (c) 2021, Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +
> +#include "transaction-forward.h"
> +
> +#include "coverage.h"
> +#include "jsonrpc.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/json.h"
> +#include "openvswitch/list.h"
> +#include "openvswitch/poll-loop.h"
> +#include "openvswitch/vlog.h"
> +#include "ovsdb.h"
> +#include "ovsdb-cs.h"
> +#include "util.h"
> +
> +VLOG_DEFINE_THIS_MODULE(transaction_forward);
> +
> +COVERAGE_DEFINE(txn_forward_cancel);
> +COVERAGE_DEFINE(txn_forward_complete);
> +COVERAGE_DEFINE(txn_forward_create);
> +COVERAGE_DEFINE(txn_forward_sent);
> +
> +struct ovsdb_txn_forward {
> +    struct ovs_list new_node;    /* In 'txn_forward_new' of struct ovsdb. */
> +    struct hmap_node sent_node;  /* In 'txn_forward_sent' of struct ovsdb. */
> +    struct json *id;             /* 'id' of the forwarded transaction. */
> +    struct jsonrpc_msg *request; /* Original request. */
> +    struct jsonrpc_msg *reply;   /* Reply from the server. */
> +};
> +
> +struct ovsdb_txn_forward *
> +ovsdb_txn_forward_create(struct ovsdb *db, const struct jsonrpc_msg *request)
> +{
> +    struct ovsdb_txn_forward *txn_fwd = xzalloc(sizeof *txn_fwd);
> +
> +    COVERAGE_INC(txn_forward_create);
> +    txn_fwd->request = jsonrpc_msg_clone(request);
> +    ovs_list_push_back(&db->txn_forward_new, &txn_fwd->new_node);
> +
> +    return txn_fwd;
> +}
> +
> +static void
> +ovsdb_txn_forward_unlist(struct ovsdb *db, struct ovsdb_txn_forward *txn_fwd)
> +{
> +    if (!ovs_list_is_empty(&txn_fwd->new_node)) {
> +        ovs_list_remove(&txn_fwd->new_node);
> +        ovs_list_init(&txn_fwd->new_node);
> +    }
> +    if (!hmap_node_is_null(&txn_fwd->sent_node)) {
> +        hmap_remove(&db->txn_forward_sent, &txn_fwd->sent_node);
> +        hmap_node_nullify(&txn_fwd->sent_node);
> +    }
> +}
> +
> +void
> +ovsdb_txn_forward_destroy(struct ovsdb *db, struct ovsdb_txn_forward *txn_fwd)
> +{
> +    if (!txn_fwd) {
> +        return;
> +    }
> +
> +    ovsdb_txn_forward_unlist(db, txn_fwd);
> +    json_destroy(txn_fwd->id);
> +    jsonrpc_msg_destroy(txn_fwd->request);
> +    jsonrpc_msg_destroy(txn_fwd->reply);
> +    free(txn_fwd);
> +}
> +
> +bool
> +ovsdb_txn_forward_is_complete(const struct ovsdb_txn_forward *txn_fwd)
> +{
> +    return txn_fwd->reply != NULL;
> +}
> +
> +void
> +ovsdb_txn_forward_complete(struct ovsdb *db, const struct jsonrpc_msg *reply)
> +{
> +    struct ovsdb_txn_forward *t;
> +    size_t hash = json_hash(reply->id, 0);
> +
> +    HMAP_FOR_EACH_WITH_HASH (t, sent_node, hash, &db->txn_forward_sent) {
> +        if (json_equal(reply->id, t->id)) {
> +            COVERAGE_INC(txn_forward_complete);
> +            t->reply = jsonrpc_msg_clone(reply);
> +
> +            /* Replacing id with the id of the original request. */
> +            json_destroy(t->reply->id);
> +            t->reply->id = json_clone(t->request->id);
> +
> +            hmap_remove(&db->txn_forward_sent, &t->sent_node);
> +            hmap_node_nullify(&t->sent_node);
> +
> +            db->run_triggers_now = db->run_triggers = true;
> +            return;
> +        }
> +    }
> +}
> +
> +struct jsonrpc_msg *
> +ovsdb_txn_forward_steal_reply(struct ovsdb_txn_forward *txn_fwd)
> +{
> +    struct jsonrpc_msg *reply = txn_fwd->reply;
> +
> +    txn_fwd->reply = NULL;
> +    return reply;
> +}
> +
> +void
> +ovsdb_txn_forward_run(struct ovsdb *db, struct ovsdb_cs *cs)
> +{
> +    struct ovsdb_txn_forward *t, *next;
> +
> +    /* Send all transactions that need to be forwarded. */
> +    LIST_FOR_EACH_SAFE (t, next, new_node, &db->txn_forward_new) {
> +        if (!ovsdb_cs_may_send_transaction(cs)) {
> +            break;
> +        }
> +        ovs_assert(!strcmp(t->request->method, "transact"));
> +        t->id = ovsdb_cs_send_transaction(cs, json_clone(t->request->params));
> +        if (t->id) {
> +            COVERAGE_INC(txn_forward_sent);
> +            ovs_list_remove(&t->new_node);
> +            ovs_list_init(&t->new_node);
> +            hmap_insert(&db->txn_forward_sent, &t->sent_node,
> +                        json_hash(t->id, 0));
> +        }
> +    }
> +}
> +
> +void
> +ovsdb_txn_forward_wait(struct ovsdb *db, struct ovsdb_cs *cs)
> +{
> +    if (ovsdb_cs_may_send_transaction(cs)
> +        && !ovs_list_is_empty(&db->txn_forward_new)) {
> +        poll_immediate_wake();
> +    }
> +}
> +
> +void
> +ovsdb_txn_forward_cancel(struct ovsdb *db, struct ovsdb_txn_forward *txn_fwd)
> +{
> +    COVERAGE_INC(txn_forward_cancel);
> +    jsonrpc_msg_destroy(txn_fwd->reply);
> +    txn_fwd->reply = jsonrpc_create_error(json_string_create("canceled"),
> +                                          txn_fwd->request->id);
> +    ovsdb_txn_forward_unlist(db, txn_fwd);
> +}
> +
> +void
> +ovsdb_txn_forward_cancel_all(struct ovsdb *db, bool sent_only)
> +{
> +    struct ovsdb_txn_forward *t, *next;
> +
> +    HMAP_FOR_EACH_SAFE (t, next, sent_node, &db->txn_forward_sent) {
> +        ovsdb_txn_forward_cancel(db, t);
> +    }
> +
> +    if (sent_only) {
> +        return;
> +    }
> +
> +    LIST_FOR_EACH_SAFE (t, next, new_node, &db->txn_forward_new) {
> +        ovsdb_txn_forward_cancel(db, t);
> +    }
> +}
> diff --git a/ovsdb/transaction-forward.h b/ovsdb/transaction-forward.h
> new file mode 100644
> index 000000000..6788d3824
> --- /dev/null
> +++ b/ovsdb/transaction-forward.h
> @@ -0,0 +1,44 @@
> +/*
> + * Copyright (c) 2021, Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef OVSDB_TXN_FORWARD_H
> +#define OVSDB_TXN_FORWARD_H 1
> +
> +#include <stdbool.h>
> +
> +struct ovsdb;
> +struct ovsdb_cs;
> +struct ovsdb_txn_forward;
> +struct jsonrpc_session;
> +struct jsonrpc_msg;
> +
> +struct ovsdb_txn_forward *ovsdb_txn_forward_create(
> +    struct ovsdb *, const struct jsonrpc_msg *request);
> +void ovsdb_txn_forward_destroy(struct ovsdb *, struct ovsdb_txn_forward *);
> +
> +bool ovsdb_txn_forward_is_complete(const struct ovsdb_txn_forward *);
> +void ovsdb_txn_forward_complete(struct ovsdb *,
> +                                const struct jsonrpc_msg *reply);
> +
> +struct jsonrpc_msg *ovsdb_txn_forward_steal_reply(struct ovsdb_txn_forward *);
> +
> +void ovsdb_txn_forward_run(struct ovsdb *, struct ovsdb_cs *);
> +void ovsdb_txn_forward_wait(struct ovsdb *, struct ovsdb_cs *);
> +
> +void ovsdb_txn_forward_cancel(struct ovsdb *, struct ovsdb_txn_forward *);
> +void ovsdb_txn_forward_cancel_all(struct ovsdb *, bool sent_only);
> +
> +#endif /* OVSDB_TXN_FORWARD_H */
> diff --git a/ovsdb/trigger.c b/ovsdb/trigger.c
> index 0372302af..1c38a94c1 100644
> --- a/ovsdb/trigger.c
> +++ b/ovsdb/trigger.c
> @@ -28,6 +28,7 @@
>  #include "openvswitch/poll-loop.h"
>  #include "server.h"
>  #include "transaction.h"
> +#include "transaction-forward.h"
>  #include "openvswitch/vlog.h"
>  #include "util.h"
>  
> @@ -53,6 +54,7 @@ ovsdb_trigger_init(struct ovsdb_session *session, struct ovsdb *db,
>      trigger->request = request;
>      trigger->reply = NULL;
>      trigger->progress = NULL;
> +    trigger->txn_forward = NULL;
>      trigger->created = now;
>      trigger->timeout_msec = LLONG_MAX;
>      trigger->read_only = read_only;
> @@ -65,6 +67,7 @@ void
>  ovsdb_trigger_destroy(struct ovsdb_trigger *trigger)
>  {
>      ovsdb_txn_progress_destroy(trigger->progress);
> +    ovsdb_txn_forward_destroy(trigger->db, trigger->txn_forward);
>      ovs_list_remove(&trigger->node);
>      jsonrpc_msg_destroy(trigger->request);
>      jsonrpc_msg_destroy(trigger->reply);
> @@ -75,7 +78,7 @@ ovsdb_trigger_destroy(struct ovsdb_trigger *trigger)
>  bool
>  ovsdb_trigger_is_complete(const struct ovsdb_trigger *trigger)
>  {
> -    return trigger->reply && !trigger->progress;
> +    return trigger->reply && !trigger->progress && !trigger->txn_forward;
>  }
>  
>  struct jsonrpc_msg *
> @@ -98,6 +101,11 @@ ovsdb_trigger_cancel(struct ovsdb_trigger *trigger, const char *reason)
>          trigger->progress = NULL;
>      }
>  
> +    if (trigger->txn_forward) {
> +        ovsdb_txn_forward_destroy(trigger->db, trigger->txn_forward);
> +        trigger->txn_forward = NULL;
> +    }
> +
>      jsonrpc_msg_destroy(trigger->reply);
>      trigger->reply = NULL;
>  
> @@ -148,7 +156,7 @@ ovsdb_trigger_run(struct ovsdb *db, long long int now)
>      LIST_FOR_EACH_SAFE (t, next, node, &db->triggers) {
>          if (run_triggers
>              || now - t->created >= t->timeout_msec
> -            || t->progress) {
> +            || t->progress || t->txn_forward) {
>              if (ovsdb_trigger_try(t, now)) {
>                  disconnect_all = true;
>              }
> @@ -188,7 +196,7 @@ static bool
>  ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
>  {
>      /* Handle "initialized" state. */
> -    if (!t->reply) {
> +    if (!t->reply && !t->txn_forward) {
>          ovs_assert(!t->progress);
>  
>          struct ovsdb_txn *txn = NULL;
> @@ -198,13 +206,14 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
>                  return false;
>              }
>  
> -            bool durable;
> +            bool durable, forwarding_needed;
>  
>              struct json *result;
> +            /* Trying to compose transaction. */
>              txn = ovsdb_execute_compose(
>                  t->db, t->session, t->request->params, t->read_only,
>                  t->role, t->id, now - t->created, &t->timeout_msec,
> -                &durable, &result);
> +                &durable, &forwarding_needed, &result);
>              if (!txn) {
>                  if (result) {
>                      /* Complete.  There was an error but we still represent it
> @@ -217,9 +226,20 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
>                  return false;
>              }
>  
> -            /* Transition to "committing" state. */
> -            t->reply = jsonrpc_create_reply(result, t->request->id);
> -            t->progress = ovsdb_txn_propose_commit(txn, durable);
> +            if (forwarding_needed) {
> +                /* Transaction is good, but we don't need it. */
> +                ovsdb_txn_abort(txn);
> +                json_destroy(result);
> +                /* Transition to "forwarding" state. */
> +                t->txn_forward = ovsdb_txn_forward_create(t->db, t->request);
> +                /* Forward will not be completed immediately.  Will check
> +                 * next time. */
> +                return false;
> +            } else {
> +                /* Transition to "committing" state. */
> +                t->reply = jsonrpc_create_reply(result, t->request->id);
> +                t->progress = ovsdb_txn_propose_commit(txn, durable);
> +            }
>          } else if (!strcmp(t->request->method, "convert")) {
>              /* Permission check. */
>              if (t->role && *t->role) {
> @@ -348,6 +368,18 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
>              ovsdb_trigger_complete(t);
>          }
>  
> +        return false;
> +    } else if (t->txn_forward) {
> +        /* Handle "forwarding" state. */

Should we assert that reply == NULL and progress == NULL?

> +        if (!ovsdb_txn_forward_is_complete(t->txn_forward)) {
> +            return false;
> +        }
> +
> +        /* Transition to "complete". */
> +        t->reply = ovsdb_txn_forward_steal_reply(t->txn_forward);
> +        ovsdb_txn_forward_destroy(t->db, t->txn_forward);
> +        t->txn_forward = NULL;
> +        ovsdb_trigger_complete(t);
>          return false;
>      }
>  
> diff --git a/ovsdb/trigger.h b/ovsdb/trigger.h
> index 79af7f6be..d060c72e5 100644
> --- a/ovsdb/trigger.h
> +++ b/ovsdb/trigger.h
> @@ -22,26 +22,34 @@ struct ovsdb;
>  
>  /* Triggers have the following states:
>   *
> - *    - Initialized (reply == NULL, progress == NULL): Executing the trigger
> - *      can keep it in the initialized state, if it has a "wait" condition that
> - *      isn't met.  Executing the trigger can also yield an error, in which
> - *      case it transitions to "complete".  Otherwise, execution yields a
> - *      transaction, which the database attempts to commit.  If the transaction
> - *      completes immediately and synchronously, then the trigger transitions
> - *      to the "complete" state.  If the transaction requires some time to
> - *      complete, it transitions to the "committing" state.
> + *    - Initialized (reply == NULL, progress == NULL, txn_forward == NULL):
> + *      Executing the trigger can keep it in the initialized state, if it has a
> + *      "wait" condition that isn't met.  Executing the trigger can also yield
> + *      an error, in which case it transitions to "complete".  Otherwise,
> + *      execution yields a transaction, which the database attempts to commit.
> + *      If the transaction completes immediately and synchronously, then the
> + *      trigger transitions to the "complete" state.  If the transaction
> + *      requires some time to complete, it transitions to the "committing"
> + *      state.  If the transaction can not be completed locally due to
> + *      read-only restrictions and transaction forwarding is enabled, starts
> + *      forwarding and transitions to the "forwarding" state.
>   *
> - *    - Committing (reply != NULL, progress != NULL): The transaction is
> - *      committing.  If it succeeds, or if it fails permanently, then the
> - *      trigger transitions to "complete".  If it fails temporarily
> - *      (e.g. because someone else committed to cluster-based storage before we
> - *      did), then we transition back to "initialized" to try again.
> + *    - Committing (reply != NULL, progress != NULL, txn_forward == NULL):
> + *      The transaction is committing.  If it succeeds, or if it fails
> + *      permanently, then the trigger transitions to "complete".  If it fails
> + *      temporarily (e.g. because someone else committed to cluster-based
> + *      storage before we did), then we transition back to "initialized" to
> + *      try again.
>   *
> - *    - Complete (reply != NULL, progress == NULL): The transaction is done
> - *      and either succeeded or failed.
> + *    - Forwarding (reply == NULL, progress == NULL, txn_forward != NULL):
> + *      Transaction is forwarded.  Whether it succeeds or fails, the trigger
> + *      transitions to "complete".
> + *
> + *    - Complete (reply != NULL, progress == NULL, txn_forward == NULL):
> + *      The transaction is done and either succeeded or failed.
>   */
>  struct ovsdb_trigger {
> -    /* In "initialized" or "committing" state, in db->triggers.
> +    /* In "initialized", "committing" or "forwarding" state, in db->triggers.
>       * In "complete", in session->completions. */
>      struct ovs_list node;
>      struct ovsdb_session *session; /* Session that owns this trigger. */
> @@ -49,6 +57,7 @@ struct ovsdb_trigger {
>      struct jsonrpc_msg *request; /* Database request. */
>      struct jsonrpc_msg *reply;   /* Result (null if none yet). */
>      struct ovsdb_txn_progress *progress;
> +    struct ovsdb_txn_forward *txn_forward; /* Tracks transaction forwarding. */
>      long long int created;      /* Time created. */
>      long long int timeout_msec; /* Max wait duration. */
>      bool read_only;             /* Database is in read only mode. */
> diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
> index ba1b369c1..ac243d6a7 100644
> --- a/tests/ovsdb-server.at
> +++ b/tests/ovsdb-server.at
> @@ -3,10 +3,13 @@ AT_BANNER([OVSDB -- ovsdb-server transactions (Unix sockets)])
>  m4_define([OVSDB_SERVER_SHUTDOWN],
>    [OVS_APP_EXIT_AND_WAIT_BY_TARGET([ovsdb-server], [ovsdb-server.pid])])
>  
> +m4_define([OVSDB_SERVER_SHUTDOWN_N],
> +  [cp pid$1 savepid$1
> +   AT_CHECK([ovs-appctl -t "`pwd`"/unixctl$1 -e exit], [0], [ignore], [ignore])
> +   OVS_WAIT_WHILE([kill -0 `cat savepid$1`], [kill `cat savepid$1`])])
> +
>  m4_define([OVSDB_SERVER_SHUTDOWN2],
> -  [cp pid2 savepid2
> -   AT_CHECK([ovs-appctl -t "`pwd`"/unixctl2 -e exit], [0], [ignore], [ignore])
> -   OVS_WAIT_WHILE([kill -0 `cat savepid2`], [kill `cat savepid2`])])
> +  [OVSDB_SERVER_SHUTDOWN_N([2])])
>  
>  # OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
>  #
> @@ -1412,6 +1415,82 @@ m4_define([OVSDB_CHECK_EXECUTION],
>  
>  EXECUTION_EXAMPLES
>  

> +AT_BANNER([OVSDB -- ovsdb-server relay])
> +
> +# OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
> +#
> +# Creates a database with the given SCHEMA and starts an ovsdb-server on
> +# it.  Also starts a daisy chain of ovsdb-servers in relay mode where the
> +# first relay server is connected to the main non-relay ovsdb-server.
> +#
> +# Runs each of the TRANSACTIONS (which should be a quoted list of
> +# quoted strings) against one of relay servers in the middle with
> +# ovsdb-client one at a time.  The server executes read-only transactions
> +# and forwards the rest of them to the previous ovsdb-server in the chain.
> +# The main ovsdb-server executes 'write' transactions.  Transaction
> +# reply with data updates propagates back through the chain to all
> +# the servers and the client.
> +#
> +#    main        relay       relay       relay       relay       relay
> +#   server1 <-- server2 <-- server3 <-- server4 <-- server5 <-- server6
> +#                                         ^
> +#                                         |
> +#                                     ovsdb-client
> +#
> +# Checks that the overall output is OUTPUT, but UUIDs in the output
> +# are replaced by markers of the form <N> where N is a number.  The
> +# first unique UUID is replaced by <0>, the next by <1>, and so on.
> +# If a given UUID appears more than once it is always replaced by the
> +# same marker.
> +#
> +# Checks that the dump of all databases is the same.
> +#
> +# TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS.
> +m4_define([OVSDB_CHECK_EXECUTION],
> +  [AT_SETUP([$1])
> +   AT_KEYWORDS([ovsdb server tcp relay $5])
> +   n_servers=6
> +   target=4
> +   $2 > schema
> +   schema_name=`ovsdb-tool schema-name schema`
> +   AT_CHECK([ovsdb-tool create db1 schema], [0], [stdout], [ignore])
> +
> +   on_exit 'kill `cat *.pid`'
> +   AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server1.log dnl
> +                          --pidfile --remote=punix:db1.sock db1
> +            ], [0], [ignore], [ignore])
> +
> +   for i in $(seq 2 ${n_servers}); do
> +     AT_CHECK([ovsdb-server --detach --no-chdir                           dnl
> +                            --log-file=ovsdb-server$i.log                 dnl
> +                            --pidfile=${i}.pid --remote=punix:db${i}.sock dnl
> +                            --unixctl=unixctl${i} -vjsonrpc:file:dbg      dnl
> +                            relay:${schema_name}:unix:db$((i-1)).sock
> +            ], [0], [ignore], [ignore])
> +   done
> +
> +   m4_foreach([txn], [$3],
> +     [AT_CHECK([ovsdb-client transact unix:db${target}.sock 'txn'], [0],
> +               [stdout], [ignore])
> +      cat stdout >> output
> +   ])
> +
> +   AT_CHECK([uuidfilt output], [0], [$4], [ignore])
> +
> +   AT_CHECK([ovsdb-client dump unix:db1.sock], [0], [stdout], [ignore])
> +   for i in $(seq 2 ${n_servers}); do
> +     OVS_WAIT_UNTIL([ovsdb-client dump unix:db${i}.sock > dump${i}; dnl
> +                     diff stdout dump${i}])
> +   done
> +
> +   OVSDB_SERVER_SHUTDOWN
> +   for i in $(seq 2 ${n_servers}); do
> +     OVSDB_SERVER_SHUTDOWN_N([$i])
> +   done
> +   AT_CLEANUP])
> +
> +EXECUTION_EXAMPLES
> +
>  AT_BANNER([OVSDB -- ovsdb-server replication])
>  
>  # OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
> 



More information about the dev mailing list