[ovs-dev] [PATCH v2 6/9] ovsdb: relay: Add support for transaction forwarding.
Mark Gray
mark.d.gray at redhat.com
Fri Jul 2 10:24:21 UTC 2021
On 12/06/2021 03:00, Ilya Maximets wrote:
> The current version of ovsdb relay allows scaling out read-only
> access to the primary database. However, many clients, for example
> ovn-controller, are not read-only but read-mostly.
>
> In order to scale out database access for this case, ovsdb-server
> needs to process transactions that are not read-only. A relay is not
> allowed to do that, i.e. not allowed to modify the database, but it
> can act like a proxy and forward transactions that include database
> modifications to the primary server and forward replies back to the
> client. At the same time, it may serve read-only transactions and
> monitor requests by itself, greatly reducing the load on the primary
> server.
>
> This configuration will slightly increase transaction latency, but
> it's not very important for read-mostly use cases.
>
> Implementation details:
> With this change, instead of creating a trigger to commit the
> transaction, ovsdb-server will create a trigger for transaction
> forwarding. Later, ovsdb_relay_run() will send all new transactions
> to the relay source. Once a transaction reply is received from the
> relay source, the ovsdb-relay module will update the state of the
> transaction forwarding with the reply. After that, trigger_run()
> will complete the trigger and jsonrpc_server_run() will send the
> reply back to the client. Since the transaction reply from the relay
> source will be received after all the updates, the client will receive
> all the updates before receiving the transaction reply, as it does in
> a normal scenario with other database models.
>
> Signed-off-by: Ilya Maximets <i.maximets at ovn.org>
> ---
> ovsdb/automake.mk | 2 +
> ovsdb/execution.c | 18 ++--
> ovsdb/ovsdb.c | 9 ++
> ovsdb/ovsdb.h | 8 +-
> ovsdb/relay.c | 10 +-
> ovsdb/transaction-forward.c | 182 ++++++++++++++++++++++++++++++++++++
> ovsdb/transaction-forward.h | 44 +++++++++
> ovsdb/trigger.c | 48 ++++++++--
> ovsdb/trigger.h | 41 ++++----
> tests/ovsdb-server.at | 85 ++++++++++++++++-
> 10 files changed, 409 insertions(+), 38 deletions(-)
> create mode 100644 ovsdb/transaction-forward.c
> create mode 100644 ovsdb/transaction-forward.h
>
> diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk
> index 05c8ebbdf..62cc02686 100644
> --- a/ovsdb/automake.mk
> +++ b/ovsdb/automake.mk
> @@ -48,6 +48,8 @@ ovsdb_libovsdb_la_SOURCES = \
> ovsdb/trigger.h \
> ovsdb/transaction.c \
> ovsdb/transaction.h \
> + ovsdb/transaction-forward.c \
> + ovsdb/transaction-forward.h \
> ovsdb/ovsdb-util.c \
> ovsdb/ovsdb-util.h
> ovsdb_libovsdb_la_CFLAGS = $(AM_CFLAGS)
> diff --git a/ovsdb/execution.c b/ovsdb/execution.c
> index dd2569055..f9b8067d0 100644
> --- a/ovsdb/execution.c
> +++ b/ovsdb/execution.c
> @@ -99,7 +99,8 @@ lookup_executor(const char *name, bool *read_only)
> }
>
> /* On success, returns a transaction and stores the results to return to the
> - * client in '*resultsp'.
> + * client in '*resultsp'. If 'forwarding_needed' is nonnull and transaction
> + * needs to be forwarded (in relay mode), sets '*forwarding_needed' to true.
> *
> * On failure, returns NULL. If '*resultsp' is nonnull, then it is the results
> * to return to the client. If '*resultsp' is null, then the execution failed
> @@ -111,7 +112,8 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
> const struct json *params, bool read_only,
> const char *role, const char *id,
> long long int elapsed_msec, long long int *timeout_msec,
> - bool *durable, struct json **resultsp)
> + bool *durable, bool *forwarding_needed,
> + struct json **resultsp)
> {
> struct ovsdb_execution x;
> struct ovsdb_error *error;
> @@ -120,6 +122,9 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
> size_t i;
>
> *durable = false;
> + if (forwarding_needed) {
> + *forwarding_needed = false;
> + }
> if (params->type != JSON_ARRAY
> || !params->array.n
> || params->array.elems[0]->type != JSON_STRING
> @@ -196,11 +201,8 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
> "%s operation not allowed on "
> "table in reserved database %s",
> op_name, db->schema->name);
> - } else if (db->is_relay) {
> - error = ovsdb_error("not allowed",
> - "%s operation not allowed when "
> - "database server is in relay mode",
> - op_name);
> + } else if (db->is_relay && forwarding_needed) {
> + *forwarding_needed = true;
> }
> }
> if (error) {
> @@ -245,7 +247,7 @@ ovsdb_execute(struct ovsdb *db, const struct ovsdb_session *session,
> struct json *results;
> struct ovsdb_txn *txn = ovsdb_execute_compose(
> db, session, params, read_only, role, id, elapsed_msec, timeout_msec,
> - &durable, &results);
> + &durable, NULL, &results);
> if (!txn) {
> return results;
> }
> diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
> index 999cd0d75..126d16a2f 100644
> --- a/ovsdb/ovsdb.c
> +++ b/ovsdb/ovsdb.c
> @@ -33,6 +33,7 @@
> #include "table.h"
> #include "timeval.h"
> #include "transaction.h"
> +#include "transaction-forward.h"
> #include "trigger.h"
>
> #include "openvswitch/vlog.h"
> @@ -422,6 +423,8 @@ ovsdb_create(struct ovsdb_schema *schema, struct ovsdb_storage *storage)
> db->run_triggers_now = db->run_triggers = false;
>
> db->is_relay = false;
> + ovs_list_init(&db->txn_forward_new);
> + hmap_init(&db->txn_forward_sent);
>
> shash_init(&db->tables);
> if (schema) {
> @@ -465,6 +468,12 @@ ovsdb_destroy(struct ovsdb *db)
> /* Destroy txn history. */
> ovsdb_txn_history_destroy(db);
>
> + /* Cancel all the forwarded transactions. There should not be
> + * any as all triggers should be already cancelled. */
> + ovsdb_txn_forward_cancel_all(db, false);
> + ovs_assert(hmap_is_empty(&db->txn_forward_sent));
> + hmap_destroy(&db->txn_forward_sent);
> +
> /* The caller must ensure that no triggers remain. */
> ovs_assert(ovs_list_is_empty(&db->triggers));
>
> diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
> index 16bd5f5ec..4a7bd0f0e 100644
> --- a/ovsdb/ovsdb.h
> +++ b/ovsdb/ovsdb.h
> @@ -93,7 +93,11 @@ struct ovsdb {
> struct ovs_list txn_history; /* Contains "struct ovsdb_txn_history_node. */
>
> /* Relay mode. */
> - bool is_relay;
> + bool is_relay; /* True, if database is in relay mode. */
> + /* List that holds transactions waiting to be forwarded to the server. */
> + struct ovs_list txn_forward_new;
> + /* Hash map for transactions that are already sent and waits for reply. */
> + struct hmap txn_forward_sent;
> };
>
> struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
> @@ -107,7 +111,7 @@ struct ovsdb_txn *ovsdb_execute_compose(
> struct ovsdb *, const struct ovsdb_session *, const struct json *params,
> bool read_only, const char *role, const char *id,
> long long int elapsed_msec, long long int *timeout_msec,
> - bool *durable, struct json **);
> + bool *durable, bool *forwarding_needed, struct json **);
>
> struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
> const struct json *params, bool read_only,
> diff --git a/ovsdb/relay.c b/ovsdb/relay.c
> index 5f423a0b9..ef689c649 100644
> --- a/ovsdb/relay.c
> +++ b/ovsdb/relay.c
> @@ -32,6 +32,7 @@
> #include "row.h"
> #include "table.h"
> #include "transaction.h"
> +#include "transaction-forward.h"
> #include "util.h"
>
> VLOG_DEFINE_THIS_MODULE(relay);
> @@ -298,6 +299,7 @@ ovsdb_relay_run(void)
> struct relay_ctx *ctx = node->data;
> struct ovs_list events;
>
> + ovsdb_txn_forward_run(ctx->db, ctx->cs);
> ovsdb_cs_run(ctx->cs, &events);
>
> struct ovsdb_cs_event *event;
> @@ -309,7 +311,9 @@ ovsdb_relay_run(void)
>
> switch (event->type) {
> case OVSDB_CS_EVENT_TYPE_RECONNECT:
> - /* Nothing to do. */
> + /* Cancel all the transactions that were already sent but
> + * not yet replied to, as they might be lost. */
> + ovsdb_txn_forward_cancel_all(ctx->db, true);
> break;
>
> case OVSDB_CS_EVENT_TYPE_UPDATE:
> @@ -317,6 +321,9 @@ ovsdb_relay_run(void)
> break;
>
> case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
> + ovsdb_txn_forward_complete(ctx->db, event->txn_reply);
> + break;
> +
> case OVSDB_CS_EVENT_TYPE_LOCKED:
> /* Not expected. */
> break;
> @@ -335,5 +342,6 @@ ovsdb_relay_wait(void)
> struct relay_ctx *ctx = node->data;
>
> ovsdb_cs_wait(ctx->cs);
> + ovsdb_txn_forward_wait(ctx->db, ctx->cs);
> }
> }
> diff --git a/ovsdb/transaction-forward.c b/ovsdb/transaction-forward.c
> new file mode 100644
> index 000000000..8ff12ef4b
> --- /dev/null
> +++ b/ovsdb/transaction-forward.c
> @@ -0,0 +1,182 @@
> +/*
> + * Copyright (c) 2021, Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +
> +#include "transaction-forward.h"
> +
> +#include "coverage.h"
> +#include "jsonrpc.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/json.h"
> +#include "openvswitch/list.h"
> +#include "openvswitch/poll-loop.h"
> +#include "openvswitch/vlog.h"
> +#include "ovsdb.h"
> +#include "ovsdb-cs.h"
> +#include "util.h"
> +
> +VLOG_DEFINE_THIS_MODULE(transaction_forward);
> +
> +COVERAGE_DEFINE(txn_forward_cancel);
> +COVERAGE_DEFINE(txn_forward_complete);
> +COVERAGE_DEFINE(txn_forward_create);
> +COVERAGE_DEFINE(txn_forward_sent);
> +
> +struct ovsdb_txn_forward {
> + struct ovs_list new_node; /* In 'txn_forward_new' of struct ovsdb. */
> + struct hmap_node sent_node; /* In 'txn_forward_sent' of struct ovsdb. */
> + struct json *id; /* 'id' of the forwarded transaction. */
> + struct jsonrpc_msg *request; /* Original request. */
> + struct jsonrpc_msg *reply; /* Reply from the server. */
> +};
> +
> +struct ovsdb_txn_forward *
> +ovsdb_txn_forward_create(struct ovsdb *db, const struct jsonrpc_msg *request)
> +{
> + struct ovsdb_txn_forward *txn_fwd = xzalloc(sizeof *txn_fwd);
> +
> + COVERAGE_INC(txn_forward_create);
> + txn_fwd->request = jsonrpc_msg_clone(request);
> + ovs_list_push_back(&db->txn_forward_new, &txn_fwd->new_node);
> +
> + return txn_fwd;
> +}
> +
> +static void
> +ovsdb_txn_forward_unlist(struct ovsdb *db, struct ovsdb_txn_forward *txn_fwd)
> +{
> + if (!ovs_list_is_empty(&txn_fwd->new_node)) {
> + ovs_list_remove(&txn_fwd->new_node);
> + ovs_list_init(&txn_fwd->new_node);
> + }
> + if (!hmap_node_is_null(&txn_fwd->sent_node)) {
> + hmap_remove(&db->txn_forward_sent, &txn_fwd->sent_node);
> + hmap_node_nullify(&txn_fwd->sent_node);
> + }
> +}
> +
> +void
> +ovsdb_txn_forward_destroy(struct ovsdb *db, struct ovsdb_txn_forward *txn_fwd)
> +{
> + if (!txn_fwd) {
> + return;
> + }
> +
> + ovsdb_txn_forward_unlist(db, txn_fwd);
> + json_destroy(txn_fwd->id);
> + jsonrpc_msg_destroy(txn_fwd->request);
> + jsonrpc_msg_destroy(txn_fwd->reply);
> + free(txn_fwd);
> +}
> +
> +bool
> +ovsdb_txn_forward_is_complete(const struct ovsdb_txn_forward *txn_fwd)
> +{
> + return txn_fwd->reply != NULL;
> +}
> +
> +void
> +ovsdb_txn_forward_complete(struct ovsdb *db, const struct jsonrpc_msg *reply)
> +{
> + struct ovsdb_txn_forward *t;
> + size_t hash = json_hash(reply->id, 0);
> +
> + HMAP_FOR_EACH_WITH_HASH (t, sent_node, hash, &db->txn_forward_sent) {
> + if (json_equal(reply->id, t->id)) {
> + COVERAGE_INC(txn_forward_complete);
> + t->reply = jsonrpc_msg_clone(reply);
> +
> + /* Replacing id with the id of the original request. */
> + json_destroy(t->reply->id);
> + t->reply->id = json_clone(t->request->id);
> +
> + hmap_remove(&db->txn_forward_sent, &t->sent_node);
> + hmap_node_nullify(&t->sent_node);
> +
> + db->run_triggers_now = db->run_triggers = true;
> + return;
> + }
> + }
> +}
> +
> +struct jsonrpc_msg *
> +ovsdb_txn_forward_steal_reply(struct ovsdb_txn_forward *txn_fwd)
> +{
> + struct jsonrpc_msg *reply = txn_fwd->reply;
> +
> + txn_fwd->reply = NULL;
> + return reply;
> +}
> +
> +void
> +ovsdb_txn_forward_run(struct ovsdb *db, struct ovsdb_cs *cs)
> +{
> + struct ovsdb_txn_forward *t, *next;
> +
> + /* Send all transactions that needs to be forwarded. */
> + LIST_FOR_EACH_SAFE (t, next, new_node, &db->txn_forward_new) {
> + if (!ovsdb_cs_may_send_transaction(cs)) {
> + break;
> + }
> + ovs_assert(!strcmp(t->request->method, "transact"));
> + t->id = ovsdb_cs_send_transaction(cs, json_clone(t->request->params));
> + if (t->id) {
> + COVERAGE_INC(txn_forward_sent);
> + ovs_list_remove(&t->new_node);
> + ovs_list_init(&t->new_node);
> + hmap_insert(&db->txn_forward_sent, &t->sent_node,
> + json_hash(t->id, 0));
> + }
> + }
> +}
> +
> +void
> +ovsdb_txn_forward_wait(struct ovsdb *db, struct ovsdb_cs *cs)
> +{
> + if (ovsdb_cs_may_send_transaction(cs)
> + && !ovs_list_is_empty(&db->txn_forward_new)) {
> + poll_immediate_wake();
> + }
> +}
> +
> +void
> +ovsdb_txn_forward_cancel(struct ovsdb *db, struct ovsdb_txn_forward *txn_fwd)
> +{
> + COVERAGE_INC(txn_forward_cancel);
> + jsonrpc_msg_destroy(txn_fwd->reply);
> + txn_fwd->reply = jsonrpc_create_error(json_string_create("canceled"),
> + txn_fwd->request->id);
> + ovsdb_txn_forward_unlist(db, txn_fwd);
> +}
> +
> +void
> +ovsdb_txn_forward_cancel_all(struct ovsdb *db, bool sent_only)
> +{
> + struct ovsdb_txn_forward *t, *next;
> +
> + HMAP_FOR_EACH_SAFE (t, next, sent_node, &db->txn_forward_sent) {
> + ovsdb_txn_forward_cancel(db, t);
> + }
> +
> + if (sent_only) {
> + return;
> + }
> +
> + LIST_FOR_EACH_SAFE (t, next, new_node, &db->txn_forward_new) {
> + ovsdb_txn_forward_cancel(db, t);
> + }
> +}
> diff --git a/ovsdb/transaction-forward.h b/ovsdb/transaction-forward.h
> new file mode 100644
> index 000000000..6788d3824
> --- /dev/null
> +++ b/ovsdb/transaction-forward.h
> @@ -0,0 +1,44 @@
> +/*
> + * Copyright (c) 2021, Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef OVSDB_TXN_FORWARD_H
> +#define OVSDB_TXN_FORWARD_H 1
> +
> +#include <stdbool.h>
> +
> +struct ovsdb;
> +struct ovsdb_cs;
> +struct ovsdb_txn_forward;
> +struct jsonrpc_session;
> +struct jsonrpc_msg;
> +
> +struct ovsdb_txn_forward *ovsdb_txn_forward_create(
> + struct ovsdb *, const struct jsonrpc_msg *request);
> +void ovsdb_txn_forward_destroy(struct ovsdb *, struct ovsdb_txn_forward *);
> +
> +bool ovsdb_txn_forward_is_complete(const struct ovsdb_txn_forward *);
> +void ovsdb_txn_forward_complete(struct ovsdb *,
> + const struct jsonrpc_msg *reply);
> +
> +struct jsonrpc_msg *ovsdb_txn_forward_steal_reply(struct ovsdb_txn_forward *);
> +
> +void ovsdb_txn_forward_run(struct ovsdb *, struct ovsdb_cs *);
> +void ovsdb_txn_forward_wait(struct ovsdb *, struct ovsdb_cs *);
> +
> +void ovsdb_txn_forward_cancel(struct ovsdb *, struct ovsdb_txn_forward *);
> +void ovsdb_txn_forward_cancel_all(struct ovsdb *, bool sent_only);
> +
> +#endif /* OVSDB_TXN_FORWARD_H */
> diff --git a/ovsdb/trigger.c b/ovsdb/trigger.c
> index 0372302af..1c38a94c1 100644
> --- a/ovsdb/trigger.c
> +++ b/ovsdb/trigger.c
> @@ -28,6 +28,7 @@
> #include "openvswitch/poll-loop.h"
> #include "server.h"
> #include "transaction.h"
> +#include "transaction-forward.h"
> #include "openvswitch/vlog.h"
> #include "util.h"
>
> @@ -53,6 +54,7 @@ ovsdb_trigger_init(struct ovsdb_session *session, struct ovsdb *db,
> trigger->request = request;
> trigger->reply = NULL;
> trigger->progress = NULL;
> + trigger->txn_forward = NULL;
> trigger->created = now;
> trigger->timeout_msec = LLONG_MAX;
> trigger->read_only = read_only;
> @@ -65,6 +67,7 @@ void
> ovsdb_trigger_destroy(struct ovsdb_trigger *trigger)
> {
> ovsdb_txn_progress_destroy(trigger->progress);
> + ovsdb_txn_forward_destroy(trigger->db, trigger->txn_forward);
> ovs_list_remove(&trigger->node);
> jsonrpc_msg_destroy(trigger->request);
> jsonrpc_msg_destroy(trigger->reply);
> @@ -75,7 +78,7 @@ ovsdb_trigger_destroy(struct ovsdb_trigger *trigger)
> bool
> ovsdb_trigger_is_complete(const struct ovsdb_trigger *trigger)
> {
> - return trigger->reply && !trigger->progress;
> + return trigger->reply && !trigger->progress && !trigger->txn_forward;
> }
>
> struct jsonrpc_msg *
> @@ -98,6 +101,11 @@ ovsdb_trigger_cancel(struct ovsdb_trigger *trigger, const char *reason)
> trigger->progress = NULL;
> }
>
> + if (trigger->txn_forward) {
> + ovsdb_txn_forward_destroy(trigger->db, trigger->txn_forward);
> + trigger->txn_forward = NULL;
> + }
> +
> jsonrpc_msg_destroy(trigger->reply);
> trigger->reply = NULL;
>
> @@ -148,7 +156,7 @@ ovsdb_trigger_run(struct ovsdb *db, long long int now)
> LIST_FOR_EACH_SAFE (t, next, node, &db->triggers) {
> if (run_triggers
> || now - t->created >= t->timeout_msec
> - || t->progress) {
> + || t->progress || t->txn_forward) {
> if (ovsdb_trigger_try(t, now)) {
> disconnect_all = true;
> }
> @@ -188,7 +196,7 @@ static bool
> ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
> {
> /* Handle "initialized" state. */
> - if (!t->reply) {
> + if (!t->reply && !t->txn_forward) {
> ovs_assert(!t->progress);
>
> struct ovsdb_txn *txn = NULL;
> @@ -198,13 +206,14 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
> return false;
> }
>
> - bool durable;
> + bool durable, forwarding_needed;
>
> struct json *result;
> + /* Trying to compose transaction. */
> txn = ovsdb_execute_compose(
> t->db, t->session, t->request->params, t->read_only,
> t->role, t->id, now - t->created, &t->timeout_msec,
> - &durable, &result);
> + &durable, &forwarding_needed, &result);
> if (!txn) {
> if (result) {
> /* Complete. There was an error but we still represent it
> @@ -217,9 +226,20 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
> return false;
> }
>
> - /* Transition to "committing" state. */
> - t->reply = jsonrpc_create_reply(result, t->request->id);
> - t->progress = ovsdb_txn_propose_commit(txn, durable);
> + if (forwarding_needed) {
> + /* Transaction is good, but we don't need it. */
> + ovsdb_txn_abort(txn);
> + json_destroy(result);
> + /* Transition to "forwarding" state. */
> + t->txn_forward = ovsdb_txn_forward_create(t->db, t->request);
> + /* Forward will not be completed immediately. Will check
> + * next time. */
> + return false;
> + } else {
> + /* Transition to "committing" state. */
> + t->reply = jsonrpc_create_reply(result, t->request->id);
> + t->progress = ovsdb_txn_propose_commit(txn, durable);
> + }
> } else if (!strcmp(t->request->method, "convert")) {
> /* Permission check. */
> if (t->role && *t->role) {
> @@ -348,6 +368,18 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
> ovsdb_trigger_complete(t);
> }
>
> + return false;
> + } else if (t->txn_forward) {
> + /* Handle "forwarding" state. */
Should we assert that reply == NULL and progress == NULL?
> + if (!ovsdb_txn_forward_is_complete(t->txn_forward)) {
> + return false;
> + }
> +
> + /* Transition to "complete". */
> + t->reply = ovsdb_txn_forward_steal_reply(t->txn_forward);
> + ovsdb_txn_forward_destroy(t->db, t->txn_forward);
> + t->txn_forward = NULL;
> + ovsdb_trigger_complete(t);
> return false;
> }
>
> diff --git a/ovsdb/trigger.h b/ovsdb/trigger.h
> index 79af7f6be..d060c72e5 100644
> --- a/ovsdb/trigger.h
> +++ b/ovsdb/trigger.h
> @@ -22,26 +22,34 @@ struct ovsdb;
>
> /* Triggers have the following states:
> *
> - * - Initialized (reply == NULL, progress == NULL): Executing the trigger
> - * can keep it in the initialized state, if it has a "wait" condition that
> - * isn't met. Executing the trigger can also yield an error, in which
> - * case it transitions to "complete". Otherwise, execution yields a
> - * transaction, which the database attempts to commit. If the transaction
> - * completes immediately and synchronously, then the trigger transitions
> - * to the "complete" state. If the transaction requires some time to
> - * complete, it transitions to the "committing" state.
> + * - Initialized (reply == NULL, progress == NULL, txn_forward == NULL):
> + * Executing the trigger can keep it in the initialized state, if it has a
> + * "wait" condition that isn't met. Executing the trigger can also yield
> + * an error, in which case it transitions to "complete". Otherwise,
> + * execution yields a transaction, which the database attempts to commit.
> + * If the transaction completes immediately and synchronously, then the
> + * trigger transitions to the "complete" state. If the transaction
> + * requires some time to complete, it transitions to the "committing"
> + * state. If the transaction can not be completed locally due to
> + * read-only restrictions and transaction forwarding is enabled, starts
> + * forwarding and transitions to the "forwarding" state.
> *
> - * - Committing (reply != NULL, progress != NULL): The transaction is
> - * committing. If it succeeds, or if it fails permanently, then the
> - * trigger transitions to "complete". If it fails temporarily
> - * (e.g. because someone else committed to cluster-based storage before we
> - * did), then we transition back to "initialized" to try again.
> + * - Committing (reply != NULL, progress != NULL, txn_forward == NULL):
> + * The transaction is committing. If it succeeds, or if it fails
> + * permanently, then the trigger transitions to "complete". If it fails
> + * temporarily (e.g. because someone else committed to cluster-based
> + * storage before we did), then we transition back to "initialized" to
> + * try again.
> *
> - * - Complete (reply != NULL, progress == NULL): The transaction is done
> - * and either succeeded or failed.
> + * - Forwarding (reply == NULL, progress == NULL, txn_forward != NULL):
> + * Transaction is forwarded. Either it succeeds or it fails, the trigger
> + * transitions to "complete".
> + *
> + * - Complete (reply != NULL, progress == NULL, txn_forward == NULL):
> + * The transaction is done and either succeeded or failed.
> */
> struct ovsdb_trigger {
> - /* In "initialized" or "committing" state, in db->triggers.
> + /* In "initialized", "committing" or "forwarding" state, in db->triggers.
> * In "complete", in session->completions. */
> struct ovs_list node;
> struct ovsdb_session *session; /* Session that owns this trigger. */
> @@ -49,6 +57,7 @@ struct ovsdb_trigger {
> struct jsonrpc_msg *request; /* Database request. */
> struct jsonrpc_msg *reply; /* Result (null if none yet). */
> struct ovsdb_txn_progress *progress;
> + struct ovsdb_txn_forward *txn_forward; /* Tracks transaction forwarding. */
> long long int created; /* Time created. */
> long long int timeout_msec; /* Max wait duration. */
> bool read_only; /* Database is in read only mode. */
> diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
> index ba1b369c1..ac243d6a7 100644
> --- a/tests/ovsdb-server.at
> +++ b/tests/ovsdb-server.at
> @@ -3,10 +3,13 @@ AT_BANNER([OVSDB -- ovsdb-server transactions (Unix sockets)])
> m4_define([OVSDB_SERVER_SHUTDOWN],
> [OVS_APP_EXIT_AND_WAIT_BY_TARGET([ovsdb-server], [ovsdb-server.pid])])
>
> +m4_define([OVSDB_SERVER_SHUTDOWN_N],
> + [cp pid$1 savepid$1
> + AT_CHECK([ovs-appctl -t "`pwd`"/unixctl$1 -e exit], [0], [ignore], [ignore])
> + OVS_WAIT_WHILE([kill -0 `cat savepid$1`], [kill `cat savepid$1`])])
> +
> m4_define([OVSDB_SERVER_SHUTDOWN2],
> - [cp pid2 savepid2
> - AT_CHECK([ovs-appctl -t "`pwd`"/unixctl2 -e exit], [0], [ignore], [ignore])
> - OVS_WAIT_WHILE([kill -0 `cat savepid2`], [kill `cat savepid2`])])
> + [OVSDB_SERVER_SHUTDOWN_N([2])])
>
> # OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
> #
> @@ -1412,6 +1415,82 @@ m4_define([OVSDB_CHECK_EXECUTION],
>
> EXECUTION_EXAMPLES
>
> +AT_BANNER([OVSDB -- ovsdb-server relay])
> +
> +# OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
> +#
> +# Creates a database with the given SCHEMA and starts an ovsdb-server on
> +# it. Also starts a daisy chain of ovsdb-servers in relay mode where the
> +# first relay server is connected to the main non-relay ovsdb-server.
> +#
> +# Runs each of the TRANSACTIONS (which should be a quoted list of
> +# quoted strings) against one of relay servers in the middle with
> +# ovsdb-client one at a time. The server executes read-only transactions
> +# and forwards rest of them to the previous ovsdb-server in a chain.
> +# The main ovsdb-server executes 'write' transactions. Transaction
> +# reply with data updates propagates back through the chain to all
> +# the servers and the client.
> +#
> +# main relay relay relay relay relay
> +# server1 <-- server2 <-- server3 <-- server4 <-- server5 <-- server6
> +# ^
> +# |
> +# ovsdb-client
> +#
> +# Checks that the overall output is OUTPUT, but UUIDs in the output
> +# are replaced by markers of the form <N> where N is a number. The
> +# first unique UUID is replaced by <0>, the next by <1>, and so on.
> +# If a given UUID appears more than once it is always replaced by the
> +# same marker.
> +#
> +# Checks that the dump of all databases is the same.
> +#
> +# TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS.
> +m4_define([OVSDB_CHECK_EXECUTION],
> + [AT_SETUP([$1])
> + AT_KEYWORDS([ovsdb server tcp relay $5])
> + n_servers=6
> + target=4
> + $2 > schema
> + schema_name=`ovsdb-tool schema-name schema`
> + AT_CHECK([ovsdb-tool create db1 schema], [0], [stdout], [ignore])
> +
> + on_exit 'kill `cat *.pid`'
> + AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server1.log dnl
> + --pidfile --remote=punix:db1.sock db1
> + ], [0], [ignore], [ignore])
> +
> + for i in $(seq 2 ${n_servers}); do
> + AT_CHECK([ovsdb-server --detach --no-chdir dnl
> + --log-file=ovsdb-server$i.log dnl
> + --pidfile=${i}.pid --remote=punix:db${i}.sock dnl
> + --unixctl=unixctl${i} -vjsonrpc:file:dbg dnl
> + relay:${schema_name}:unix:db$((i-1)).sock
> + ], [0], [ignore], [ignore])
> + done
> +
> + m4_foreach([txn], [$3],
> + [AT_CHECK([ovsdb-client transact unix:db${target}.sock 'txn'], [0],
> + [stdout], [ignore])
> + cat stdout >> output
> + ])
> +
> + AT_CHECK([uuidfilt output], [0], [$4], [ignore])
> +
> + AT_CHECK([ovsdb-client dump unix:db1.sock], [0], [stdout], [ignore])
> + for i in $(seq 2 ${n_servers}); do
> + OVS_WAIT_UNTIL([ovsdb-client dump unix:db${i}.sock > dump${i}; dnl
> + diff stdout dump${i}])
> + done
> +
> + OVSDB_SERVER_SHUTDOWN
> + for i in $(seq 2 ${n_servers}); do
> + OVSDB_SERVER_SHUTDOWN_N([$i])
> + done
> + AT_CLEANUP])
> +
> +EXECUTION_EXAMPLES
> +
> AT_BANNER([OVSDB -- ovsdb-server replication])
>
> # OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
>
More information about the dev mailing list