[ovs-dev] [PATCH 1/7] ovsdb: Add support for transaction forwarding to the replication mode.

Ilya Maximets i.maximets at ovn.org
Sat May 1 00:55:42 UTC 2021


The current version of ovsdb replication allows scaling out read-only
access to the primary database.  However, many clients are not
read-only but read-mostly, for example, ovn-controller.

In order to scale out database access for this case, ovsdb-server
needs to process transactions that are not read-only.  A replica is
not allowed to do that, i.e. it is not allowed to modify the database,
but it can act as a proxy: forward transactions that include
database modifications to the primary server and forward the replies
back to the client.  At the same time it can serve read-only
transactions and monitor requests by itself, greatly reducing the load
on the primary server.

To support that, a new command line option '--enable-txn-forward' is
added to ovsdb-server.

This configuration will slightly increase transaction latency, but
it's not very important for read-mostly use cases.

With this change, instead of creating a trigger to commit the
transaction, ovsdb-server will create a trigger for transaction
forwarding.  Later, replication_run() will send all new transactions
to the replication source.  Once the transaction reply is received from
the replication source, the replication module will update the state of
the transaction forwarding with the reply.  After that, trigger_run()
will complete the trigger and jsonrpc_server_run() will send the
reply back to the client.  Since the transaction reply from the
replication source will be received after all the updates, the client
will receive all the updates before receiving the transaction reply, as
it does in a normal scenario without transaction forwarding.

Signed-off-by: Ilya Maximets <i.maximets at ovn.org>
---
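
Note for readers: the split between locally served and forwarded
transactions described in the commit message can be sketched as below.
This is only an illustration under stated assumptions: the enum and
both helper functions are hypothetical names that do not exist in the
patch, and the real decision is made in ovsdb/trigger.c.  The only
parts taken directly from the diff are the three inputs (the session's
'read_only' and 'txn_forward' flags and the 'all_ops_read_only' result
of ovsdb_execute_compose()) and the "server && remote" combination used
when a session is created.

```c
#include <stdbool.h>

/* Hypothetical outcome of handling one client transaction. */
enum txn_action {
    TXN_EXECUTE_LOCALLY,  /* Serve the transaction from the local database. */
    TXN_FORWARD,          /* Proxy it to the replication source. */
    TXN_REJECT            /* Read-only session, forwarding not allowed. */
};

/* Per-session forwarding flag: both the server-wide setting and the
 * per-remote option have to allow forwarding (mirrors the
 * 'svr->txn_forward && remote->txn_forward' expression in the diff). */
bool
session_txn_forward(bool server_txn_forward, bool remote_txn_forward)
{
    return server_txn_forward && remote_txn_forward;
}

/* Assumed decision rule, not copied from the patch: only a transaction
 * that would modify the database on a read-only session is a candidate
 * for forwarding. */
enum txn_action
choose_txn_action(bool session_read_only, bool txn_forward,
                  bool all_ops_read_only)
{
    if (!session_read_only || all_ops_read_only) {
        return TXN_EXECUTE_LOCALLY;
    }
    return txn_forward ? TXN_FORWARD : TXN_REJECT;
}
```

Under these assumptions, a replica started with '--enable-txn-forward'
answers a pure "select" locally and forwards an "insert", while a
replica without the option keeps rejecting the "insert".
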
 Documentation/ref/ovsdb.7.rst |   8 ++
 NEWS                          |   4 +
 ovsdb/automake.mk             |   2 +
 ovsdb/execution.c             |  14 ++-
 ovsdb/jsonrpc-server.c        |  61 +++++++++--
 ovsdb/jsonrpc-server.h        |   6 +-
 ovsdb/ovsdb-server.c          |  54 +++++++---
 ovsdb/ovsdb.h                 |   2 +-
 ovsdb/replication.c           |  34 ++++++-
 ovsdb/transaction-forward.c   | 187 ++++++++++++++++++++++++++++++++++
 ovsdb/transaction-forward.h   |  42 ++++++++
 ovsdb/trigger.c               |  62 +++++++++--
 ovsdb/trigger.h               |  45 ++++----
 tests/ovsdb-server.at         |  64 +++++++++++-
 tests/test-ovsdb.c            |   2 +-
 15 files changed, 526 insertions(+), 61 deletions(-)
 create mode 100644 ovsdb/transaction-forward.c
 create mode 100644 ovsdb/transaction-forward.h
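
The lifecycle of a forwarded transaction (queued, sent under a new id,
completed with the client's original id restored, or canceled when the
connection to the replication source is reset) can be modeled with the
minimal sketch below.  It is a simplification, not the code from
ovsdb/transaction-forward.c: ids are plain integers instead of JSON
values, a fixed array replaces the list and the hash map, and every
name in it ('struct fwd', 'fwd_create', 'fwd_run', 'fwd_complete',
'fwd_cancel_all', the 'sent_only' parameter) is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum fwd_state { FWD_NEW, FWD_SENT, FWD_DONE, FWD_CANCELED };

struct fwd {
    int client_id;         /* 'id' the client used in its request. */
    int upstream_id;       /* 'id' used towards the replication source. */
    int reply_id;          /* 'id' of the reply handed back to the client. */
    enum fwd_state state;
};

#define MAX_FWD 8
static struct fwd table[MAX_FWD];
static size_t n_fwd;
static int next_upstream_id = 1000;

/* Queues a client transaction for forwarding. */
struct fwd *
fwd_create(int client_id)
{
    assert(n_fwd < MAX_FWD);
    struct fwd *f = &table[n_fwd++];
    f->client_id = client_id;
    f->upstream_id = -1;
    f->reply_id = -1;
    f->state = FWD_NEW;
    return f;
}

/* "Sends" every queued transaction under a fresh upstream id and returns
 * the number of transactions sent. */
size_t
fwd_run(void)
{
    size_t n = 0;
    for (size_t i = 0; i < n_fwd; i++) {
        if (table[i].state == FWD_NEW) {
            table[i].upstream_id = next_upstream_id++;
            table[i].state = FWD_SENT;
            n++;
        }
    }
    return n;
}

/* Matches a reply by its upstream id and rewrites the reply id back to the
 * id of the original client request.  Returns false for unknown replies. */
bool
fwd_complete(int upstream_id)
{
    for (size_t i = 0; i < n_fwd; i++) {
        struct fwd *f = &table[i];
        if (f->state == FWD_SENT && f->upstream_id == upstream_id) {
            f->reply_id = f->client_id;
            f->state = FWD_DONE;
            return true;
        }
    }
    return false;
}

/* Cancels pending transactions: only the already sent ones if 'sent_only',
 * otherwise the queued ones as well.  Returns the number canceled. */
size_t
fwd_cancel_all(bool sent_only)
{
    size_t n = 0;
    for (size_t i = 0; i < n_fwd; i++) {
        struct fwd *f = &table[i];
        bool pending = f->state == FWD_SENT
                       || (!sent_only && f->state == FWD_NEW);
        if (pending) {
            f->state = FWD_CANCELED;
            n++;
        }
    }
    return n;
}
```

In this model a reconnect corresponds to fwd_cancel_all(true): replies
for transactions that were already sent might be lost, while
transactions that are still queued can be sent on the new connection.
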

diff --git a/Documentation/ref/ovsdb.7.rst b/Documentation/ref/ovsdb.7.rst
index e4f1bf766..164f40b7b 100644
--- a/Documentation/ref/ovsdb.7.rst
+++ b/Documentation/ref/ovsdb.7.rst
@@ -424,6 +424,14 @@ A database can have multiple replicas.
 
 Open vSwitch 2.6 introduced support for database replication.
 
+Open vSwitch 2.16 introduced transaction forwarding support for database
+replication.  It can be enabled by passing the ``--enable-txn-forward``
+command line argument to ``ovsdb-server``.  A replica with transaction
+forwarding enabled can be used to scale out read-mostly access to the primary
+database.  In this case the replica works as a proxy between the client and
+the primary database for transactions that need to modify the database, while
+executing read-only transactions and serving monitor requests by itself.
+
 Connection Methods
 ==================
 
diff --git a/NEWS b/NEWS
index 95cf922aa..790f93af9 100644
--- a/NEWS
+++ b/NEWS
@@ -1,5 +1,9 @@
 Post-v2.15.0
 ---------------------
+   - OVSDB:
+     * New command line argument '--enable-txn-forward' for ovsdb-server in
+       replication mode that allows forwarding transactions that include
+       database modifications to the primary database server.
    - In ovs-vsctl and vtep-ctl, the "find" command now accept new
      operators {in} and {not-in}.
    - Userspace datapath:
diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk
index 446d6c136..5f5da26d4 100644
--- a/ovsdb/automake.mk
+++ b/ovsdb/automake.mk
@@ -46,6 +46,8 @@ ovsdb_libovsdb_la_SOURCES = \
 	ovsdb/trigger.h \
 	ovsdb/transaction.c \
 	ovsdb/transaction.h \
+	ovsdb/transaction-forward.c \
+	ovsdb/transaction-forward.h \
 	ovsdb/ovsdb-util.c \
 	ovsdb/ovsdb-util.h
 ovsdb_libovsdb_la_CFLAGS = $(AM_CFLAGS)
diff --git a/ovsdb/execution.c b/ovsdb/execution.c
index 3a0dad5d0..fb94c2071 100644
--- a/ovsdb/execution.c
+++ b/ovsdb/execution.c
@@ -99,7 +99,8 @@ lookup_executor(const char *name, bool *read_only)
 }
 
 /* On success, returns a transaction and stores the results to return to the
- * client in '*resultsp'.
+ * client in '*resultsp'.  If 'all_ops_read_only' is nonnull and all operations
+ * in the transaction are read-only, sets '*all_ops_read_only' to true.
  *
  * On failure, returns NULL.  If '*resultsp' is nonnull, then it is the results
  * to return to the client.  If '*resultsp' is null, then the execution failed
@@ -111,7 +112,8 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
                       const struct json *params, bool read_only,
                       const char *role, const char *id,
                       long long int elapsed_msec, long long int *timeout_msec,
-                      bool *durable, struct json **resultsp)
+                      bool *durable, bool *all_ops_read_only,
+                      struct json **resultsp)
 {
     struct ovsdb_execution x;
     struct ovsdb_error *error;
@@ -120,6 +122,9 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
     size_t i;
 
     *durable = false;
+    if (all_ops_read_only) {
+        *all_ops_read_only = true;
+    }
     if (params->type != JSON_ARRAY
         || !params->array.n
         || params->array.elems[0]->type != JSON_STRING
@@ -210,6 +215,9 @@ ovsdb_execute_compose(struct ovsdb *db, const struct ovsdb_session *session,
             }
             break;
         }
+        if (!ro && all_ops_read_only) {
+            *all_ops_read_only = false;
+        }
         json_array_add(results, result);
     }
     while (json_array(results)->n < n_operations) {
@@ -240,7 +248,7 @@ ovsdb_execute(struct ovsdb *db, const struct ovsdb_session *session,
     struct json *results;
     struct ovsdb_txn *txn = ovsdb_execute_compose(
         db, session, params, read_only, role, id, elapsed_msec, timeout_msec,
-        &durable, &results);
+        &durable, NULL, &results);
     if (!txn) {
         return results;
     }
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 4e2dfc3d7..0ff4f7c32 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -57,7 +57,7 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
 /* Sessions. */
 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
-    struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool);
+    struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool, bool);
 static void ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote *,
                                                struct ovsdb *);
 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
@@ -82,6 +82,8 @@ static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
                                        struct jsonrpc_msg *);
 static void ovsdb_jsonrpc_session_set_readonly_all(
     struct ovsdb_jsonrpc_remote *remote, bool read_only);
+static void ovsdb_jsonrpc_session_set_txn_forward_all(
+    struct ovsdb_jsonrpc_remote *remote, bool txn_forward);
 
 /* Triggers. */
 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
@@ -127,6 +129,8 @@ struct ovsdb_jsonrpc_server {
     unsigned int n_sessions;
     bool read_only;            /* This server is does not accept any
                                   transactions that can modify the database. */
+    bool txn_forward;          /* This server is able to forward transactions
+                                  to another server. */
     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
 };
 
@@ -139,6 +143,7 @@ struct ovsdb_jsonrpc_remote {
     struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
     uint8_t dscp;
     bool read_only;
+    bool txn_forward;
     char *role;
 };
 
@@ -153,12 +158,13 @@ static void ovsdb_jsonrpc_server_del_remote(struct shash_node *);
  * The caller must call ovsdb_jsonrpc_server_add_db() for each database to
  * which 'server' should provide access. */
 struct ovsdb_jsonrpc_server *
-ovsdb_jsonrpc_server_create(bool read_only)
+ovsdb_jsonrpc_server_create(bool read_only, bool txn_forward)
 {
     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
     ovsdb_server_init(&server->up);
     shash_init(&server->remotes);
     server->read_only = read_only;
+    server->txn_forward = txn_forward;
     return server;
 }
 
@@ -215,6 +221,7 @@ ovsdb_jsonrpc_default_options(const char *target)
     options->probe_interval = (stream_or_pstream_needs_probes(target)
                                ? RECONNECT_DEFAULT_PROBE_INTERVAL
                                : 0);
+    options->txn_forward = true;
     return options;
 }
 
@@ -278,12 +285,14 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
     ovs_list_init(&remote->sessions);
     remote->dscp = options->dscp;
     remote->read_only = options->read_only;
+    remote->txn_forward = options->txn_forward;
     remote->role = nullable_xstrdup(options->role);
     shash_add(&svr->remotes, name, remote);
 
     if (!listener) {
         ovsdb_jsonrpc_session_create(remote, jsonrpc_session_open(name, true),
-                                      svr->read_only || remote->read_only);
+                                     svr->read_only || remote->read_only,
+                                     svr->txn_forward && remote->txn_forward);
     }
     return remote;
 }
@@ -377,6 +386,22 @@ ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *svr,
     }
 }
 
+void
+ovsdb_jsonrpc_server_set_txn_forward(struct ovsdb_jsonrpc_server *svr,
+                                     bool txn_forward)
+{
+    if (svr->txn_forward != txn_forward) {
+        svr->txn_forward = txn_forward;
+
+        struct shash_node *node;
+        SHASH_FOR_EACH (node, &svr->remotes) {
+            struct ovsdb_jsonrpc_remote *remote = node->data;
+
+            ovsdb_jsonrpc_session_set_txn_forward_all(remote, txn_forward);
+        }
+    }
+}
+
 void
 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
 {
@@ -394,8 +419,9 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
                 struct jsonrpc_session *js;
                 js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
                                                      remote->dscp);
-                ovsdb_jsonrpc_session_create(remote, js, svr->read_only ||
-                                                         remote->read_only);
+                ovsdb_jsonrpc_session_create(remote, js,
+                    svr->read_only || remote->read_only,
+                    svr->txn_forward && remote->txn_forward);
             } else if (error != EAGAIN) {
                 VLOG_WARN_RL(&rl, "%s: accept failed: %s",
                              pstream_get_name(remote->listener),
@@ -473,6 +499,10 @@ struct ovsdb_jsonrpc_session {
     /* Read only. */
     bool read_only;             /*  When true, not allow to modify the
                                     database. */
+
+    /* Transaction forwarding. */
+    bool txn_forward;           /*  When true, forwarding of incoming
+                                    transactions is allowed. */
 };
 
 static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
@@ -487,7 +517,8 @@ static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
 
 static struct ovsdb_jsonrpc_session *
 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
-                             struct jsonrpc_session *js, bool read_only)
+                             struct jsonrpc_session *js,
+                             bool read_only, bool txn_forward)
 {
     struct ovsdb_jsonrpc_session *s;
 
@@ -500,6 +531,7 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
     s->js = js;
     s->js_seqno = jsonrpc_session_get_seqno(js);
     s->read_only = read_only;
+    s->txn_forward = txn_forward;
 
     remote->server->n_sessions++;
 
@@ -599,8 +631,10 @@ static void
 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
 {
     jsonrpc_session_wait(s->js);
+
     if (!jsonrpc_session_get_backlog(s->js)) {
-        if (ovsdb_jsonrpc_monitor_needs_flush(s)) {
+        if (ovsdb_jsonrpc_monitor_needs_flush(s)
+            || !ovs_list_is_empty(&s->up.completions)) {
             poll_immediate_wake();
         } else {
             jsonrpc_session_recv_wait(s->js);
@@ -686,6 +720,17 @@ ovsdb_jsonrpc_session_set_readonly_all(struct ovsdb_jsonrpc_remote *remote,
     }
 }
 
+static void
+ovsdb_jsonrpc_session_set_txn_forward_all(struct ovsdb_jsonrpc_remote *remote,
+                                          bool txn_forward)
+{
+    struct ovsdb_jsonrpc_session *s;
+
+    LIST_FOR_EACH (s, node, &remote->sessions) {
+        s->txn_forward = txn_forward;
+    }
+}
+
 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
  * 'options'.
  *
@@ -1134,7 +1179,7 @@ ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
     t = xmalloc(sizeof *t);
     bool disconnect_all = ovsdb_trigger_init(
         &s->up, db, &t->trigger, request, time_msec(), s->read_only,
-        s->remote->role, jsonrpc_session_get_id(s->js));
+        s->txn_forward, s->remote->role, jsonrpc_session_get_id(s->js));
     t->id = json_clone(request->id);
     hmap_insert(&s->triggers, &t->hmap_node, hash);
 
diff --git a/ovsdb/jsonrpc-server.h b/ovsdb/jsonrpc-server.h
index e0653aa39..0a8ec63b1 100644
--- a/ovsdb/jsonrpc-server.h
+++ b/ovsdb/jsonrpc-server.h
@@ -24,7 +24,8 @@ struct shash;
 struct simap;
 struct uuid;
 
-struct ovsdb_jsonrpc_server *ovsdb_jsonrpc_server_create(bool read_only);
+struct ovsdb_jsonrpc_server *ovsdb_jsonrpc_server_create(bool read_only,
+                                                         bool txn_forward);
 bool ovsdb_jsonrpc_server_add_db(struct ovsdb_jsonrpc_server *,
                                  struct ovsdb *);
 void ovsdb_jsonrpc_server_remove_db(struct ovsdb_jsonrpc_server *,
@@ -36,6 +37,7 @@ struct ovsdb_jsonrpc_options {
     int max_backoff;            /* Maximum reconnection backoff, in msec. */
     int probe_interval;         /* Max idle time before probing, in msec. */
     bool read_only;             /* Only read-only transactions are allowed. */
+    bool txn_forward;           /* Transaction forwarding is allowed. */
     int dscp;                   /* Dscp value for manager connections */
     char *role;                 /* Role, for role-based access controls */
 };
@@ -72,6 +74,8 @@ void ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *);
 
 void ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *,
                                         bool read_only);
+void ovsdb_jsonrpc_server_set_txn_forward(struct ovsdb_jsonrpc_server *,
+                                          bool txn_forward);
 
 void ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *,
                                            struct simap *usage);
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index 29a2bace8..c9d21af1c 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -103,6 +103,7 @@ struct server_config {
     char **sync_from;
     char **sync_exclude;
     bool *is_backup;
+    bool *txn_forward;
     int *replication_probe_interval;
     struct ovsdb_jsonrpc_server *jsonrpc;
 };
@@ -126,7 +127,7 @@ static void parse_options(int argc, char *argvp[],
                           struct sset *db_filenames, struct sset *remotes,
                           char **unixctl_pathp, char **run_command,
                           char **sync_from, char **sync_exclude,
-                          bool *is_backup);
+                          bool *is_backup, bool *txn_forward);
 OVS_NO_RETURN static void usage(void);
 
 static char *reconfigure_remotes(struct ovsdb_jsonrpc_server *,
@@ -143,11 +144,12 @@ static void update_server_status(struct shash *all_dbs);
 static void save_config__(FILE *config_file, const struct sset *remotes,
                           const struct sset *db_filenames,
                           const char *sync_from, const char *sync_exclude,
-                          bool is_backup);
+                          bool is_backup, bool txn_forward);
 static void save_config(struct server_config *);
 static void load_config(FILE *config_file, struct sset *remotes,
                         struct sset *db_filenames, char **sync_from,
-                        char **sync_exclude, bool *is_backup);
+                        char **sync_exclude, bool *is_backup,
+                        bool *txn_forward);
 
 static void
 ovsdb_replication_init(const char *sync_from, const char *exclude,
@@ -178,7 +180,8 @@ static void
 main_loop(struct server_config *config,
           struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
           struct unixctl_server *unixctl, struct sset *remotes,
-          struct process *run_process, bool *exiting, bool *is_backup)
+          struct process *run_process, bool *exiting, bool *is_backup,
+          bool *txn_forward)
 {
     char *remotes_error, *ssl_error;
     struct shash_node *node;
@@ -209,6 +212,7 @@ main_loop(struct server_config *config,
         unixctl_server_run(unixctl);
 
         ovsdb_jsonrpc_server_set_read_only(jsonrpc, *is_backup);
+        ovsdb_jsonrpc_server_set_txn_forward(jsonrpc, *txn_forward);
 
         report_error_if_changed(
             reconfigure_remotes(jsonrpc, all_dbs, remotes),
@@ -321,8 +325,10 @@ main(int argc, char *argv[])
     process_init();
 
     bool active = false;
+    bool txn_forward = false;
     parse_options(argc, argv, &db_filenames, &remotes, &unixctl_path,
-                  &run_command, &sync_from, &sync_exclude, &active);
+                  &run_command, &sync_from, &sync_exclude, &active,
+                  &txn_forward);
     is_backup = sync_from && !active;
 
     daemon_become_new_user(false);
@@ -341,18 +347,18 @@ main(int argc, char *argv[])
     server_config.config_tmpfile = config_tmpfile;
 
     save_config__(config_tmpfile, &remotes, &db_filenames, sync_from,
-                  sync_exclude, is_backup);
+                  sync_exclude, is_backup, txn_forward);
 
     daemonize_start(false);
 
     /* Load the saved config. */
     load_config(config_tmpfile, &remotes, &db_filenames, &sync_from,
-                &sync_exclude, &is_backup);
+                &sync_exclude, &is_backup, &txn_forward);
 
     /* Start ovsdb jsonrpc server. When running as a backup server,
      * jsonrpc connections are read only. Otherwise, both read
      * and write transactions are allowed.  */
-    jsonrpc = ovsdb_jsonrpc_server_create(is_backup);
+    jsonrpc = ovsdb_jsonrpc_server_create(is_backup, txn_forward);
 
     shash_init(&all_dbs);
     server_config.all_dbs = &all_dbs;
@@ -360,6 +366,7 @@ main(int argc, char *argv[])
     server_config.sync_from = &sync_from;
     server_config.sync_exclude = &sync_exclude;
     server_config.is_backup = &is_backup;
+    server_config.txn_forward = &txn_forward;
     server_config.replication_probe_interval = &replication_probe_interval;
 
     perf_counters_init();
@@ -478,7 +485,7 @@ main(int argc, char *argv[])
     }
 
     main_loop(&server_config, jsonrpc, &all_dbs, unixctl, &remotes,
-              run_process, &exiting, &is_backup);
+              run_process, &exiting, &is_backup, &txn_forward);
 
     SHASH_FOR_EACH_SAFE(node, next, &all_dbs) {
         struct db *db = node->data;
@@ -905,7 +912,7 @@ add_manager_options(struct shash *remotes, const struct ovsdb_row *row)
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
     struct ovsdb_jsonrpc_options *options;
     long long int max_backoff, probe_interval;
-    bool read_only;
+    bool read_only, txn_forward;
     const char *target, *dscp_string, *role;
 
     if (!ovsdb_util_read_string_column(row, "target", &target) || !target) {
@@ -925,6 +932,9 @@ add_manager_options(struct shash *remotes, const struct ovsdb_row *row)
     if (ovsdb_util_read_bool_column(row, "read_only", &read_only)) {
         options->read_only = read_only;
     }
+    if (ovsdb_util_read_bool_column(row, "txn_forward", &txn_forward)) {
+        options->txn_forward = txn_forward;
+    }
 
     free(options->role);
     options->role = NULL;
@@ -1733,7 +1743,8 @@ ovsdb_server_get_sync_status(struct unixctl_conn *conn, int argc OVS_UNUSED,
     struct ds ds = DS_EMPTY_INITIALIZER;
 
     ds_put_format(&ds, "state: %s\n", is_backup ? "backup" : "active");
-
+    ds_put_format(&ds, "transaction forwarding: %s\n",
+                       *config->txn_forward ? "enabled" : "disabled");
     if (is_backup) {
         ds_put_and_free_cstr(&ds, replication_status());
     }
@@ -1781,7 +1792,8 @@ static void
 parse_options(int argc, char *argv[],
               struct sset *db_filenames, struct sset *remotes,
               char **unixctl_pathp, char **run_command,
-              char **sync_from, char **sync_exclude, bool *active)
+              char **sync_from, char **sync_exclude,
+              bool *active, bool *txn_forward)
 {
     enum {
         OPT_REMOTE = UCHAR_MAX + 1,
@@ -1791,6 +1803,7 @@ parse_options(int argc, char *argv[],
         OPT_PEER_CA_CERT,
         OPT_SYNC_FROM,
         OPT_SYNC_EXCLUDE,
+        OPT_TXN_FORWARD,
         OPT_ACTIVE,
         OPT_NO_DBS,
         OPT_FILE_COLUMN_DIFF,
@@ -1814,6 +1827,7 @@ parse_options(int argc, char *argv[],
         STREAM_SSL_LONG_OPTIONS,
         {"sync-from",   required_argument, NULL, OPT_SYNC_FROM},
         {"sync-exclude-tables", required_argument, NULL, OPT_SYNC_EXCLUDE},
+        {"enable-txn-forward", no_argument, NULL, OPT_TXN_FORWARD},
         {"active", no_argument, NULL, OPT_ACTIVE},
         {"no-dbs", no_argument, NULL, OPT_NO_DBS},
         {"disable-file-column-diff", no_argument, NULL, OPT_FILE_COLUMN_DIFF},
@@ -1899,6 +1913,11 @@ parse_options(int argc, char *argv[],
             *sync_exclude = xstrdup(optarg);
             break;
         }
+
+        case OPT_TXN_FORWARD:
+            *txn_forward = true;
+            break;
+
         case OPT_ACTIVE:
             *active = true;
             break;
@@ -1973,7 +1992,7 @@ sset_to_json(const struct sset *sset)
 static void
 save_config__(FILE *config_file, const struct sset *remotes,
               const struct sset *db_filenames, const char *sync_from,
-              const char *sync_exclude, bool is_backup)
+              const char *sync_exclude, bool is_backup, bool txn_forward)
 {
     struct json *obj;
     char *s;
@@ -1994,6 +2013,7 @@ save_config__(FILE *config_file, const struct sset *remotes,
                         json_string_create(sync_exclude));
     }
     json_object_put(obj, "is_backup", json_boolean_create(is_backup));
+    json_object_put(obj, "txn_forward", json_boolean_create(txn_forward));
 
     s = json_to_string(obj, 0);
     json_destroy(obj);
@@ -2024,7 +2044,7 @@ save_config(struct server_config *config)
 
     save_config__(config->config_tmpfile, config->remotes, &db_filenames,
                   *config->sync_from, *config->sync_exclude,
-                  *config->is_backup);
+                  *config->is_backup, *config->txn_forward);
 
     sset_destroy(&db_filenames);
 }
@@ -2047,7 +2067,8 @@ sset_from_json(struct sset *sset, const struct json *array)
  * 'config_file', which must have been previously written by save_config(). */
 static void
 load_config(FILE *config_file, struct sset *remotes, struct sset *db_filenames,
-            char **sync_from, char **sync_exclude, bool *is_backup)
+            char **sync_from, char **sync_exclude, bool *is_backup,
+            bool *txn_forward)
 {
     struct json *json;
 
@@ -2074,6 +2095,7 @@ load_config(FILE *config_file, struct sset *remotes, struct sset *db_filenames,
     *sync_exclude = string ? xstrdup(json_string(string)) : NULL;
 
     *is_backup = json_boolean(shash_find_data(json_object(json), "is_backup"));
-
+    *txn_forward = json_boolean(shash_find_data(json_object(json),
+                                                "txn_forward"));
     json_destroy(json);
 }
diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
index 72e127c84..db86f6da5 100644
--- a/ovsdb/ovsdb.h
+++ b/ovsdb/ovsdb.h
@@ -104,7 +104,7 @@ struct ovsdb_txn *ovsdb_execute_compose(
     struct ovsdb *, const struct ovsdb_session *, const struct json *params,
     bool read_only, const char *role, const char *id,
     long long int elapsed_msec, long long int *timeout_msec,
-    bool *durable, struct json **);
+    bool *durable, bool *all_ops_read_only, struct json **);
 
 struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *,
                            const struct json *params, bool read_only,
diff --git a/ovsdb/replication.c b/ovsdb/replication.c
index bb1bd4250..543881843 100644
--- a/ovsdb/replication.c
+++ b/ovsdb/replication.c
@@ -34,6 +34,7 @@
 #include "svec.h"
 #include "table.h"
 #include "transaction.h"
+#include "transaction-forward.h"
 #include "uuid.h"
 
 VLOG_DEFINE_THIS_MODULE(replication);
@@ -140,6 +141,9 @@ replication_init(const char *sync_from_, const char *exclude_tables,
         jsonrpc_session_close(session);
     }
 
+    /* Cancel all pending transactions. */
+    ovsdb_txn_forward_cancel_all(false);
+
     session = jsonrpc_session_open(sync_from, true);
     session_seqno = UINT_MAX;
 
@@ -198,6 +202,9 @@ replication_run(void)
         if (seqno != session_seqno || state == RPL_S_INIT) {
             session_seqno = seqno;
             request_ids_clear();
+            /* Canceling all the transactions that have already been forwarded,
+             * as they might be lost. */
+            ovsdb_txn_forward_cancel_all(true);
             struct jsonrpc_msg *request;
             request = jsonrpc_create_request("get_server_id",
                                              json_array_create_empty(), NULL);
@@ -208,6 +215,12 @@ replication_run(void)
             VLOG_DBG("send server ID request.");
         }
 
+        if (state == RPL_S_REPLICATING) {
+            /* Replication is active.  Trying to forward client transactions
+             * to the replication source. */
+            ovsdb_txn_forward_run(session);
+        }
+
         msg = jsonrpc_session_recv(session);
         if (!msg) {
             continue;
@@ -233,7 +246,8 @@ replication_run(void)
         } else if (msg->type == JSONRPC_REPLY) {
             struct replication_db *rdb;
             struct ovsdb *db;
-            if (!request_ids_lookup_and_free(msg->id, &db)) {
+            if (!request_ids_lookup_and_free(msg->id, &db)
+                && state != RPL_S_REPLICATING) {
                 VLOG_WARN("received unexpected reply");
                 goto next;
             }
@@ -383,12 +397,17 @@ replication_run(void)
                 break;
             }
 
+            case RPL_S_REPLICATING:
+                /* We're not expecting any replies in this state.  Assuming
+                 * this is a reply for a forwarded transaction. */
+                ovsdb_txn_forward_complete(msg);
+                break;
+
             case RPL_S_ERR:
                 /* Ignore all messages */
                 break;
 
             case RPL_S_INIT:
-            case RPL_S_REPLICATING:
             default:
                 OVS_NOT_REACHED();
             }
@@ -404,6 +423,11 @@ replication_wait(void)
     if (session) {
         jsonrpc_session_wait(session);
         jsonrpc_session_recv_wait(session);
+
+        if (jsonrpc_session_is_connected(session)
+            && state == RPL_S_REPLICATING) {
+            ovsdb_txn_forward_wait();
+        }
     }
 }
 
@@ -526,6 +550,8 @@ disconnect_active_server(void)
 {
     jsonrpc_session_close(session);
     session = NULL;
+    /* Cancel all pending transactions. */
+    ovsdb_txn_forward_cancel_all(false);
 }
 
 void
@@ -542,6 +568,8 @@ replication_destroy(void)
     request_ids_destroy();
     replication_dbs_destroy();
 
+    ovsdb_txn_forward_cancel_all(false);
+
     shash_destroy(&local_dbs);
 }
 
@@ -998,5 +1026,7 @@ Syncing options:\n\
                           backup mode (except with --active)\n\
   --sync-exclude-tables=DB:TABLE,...\n\
                           exclude the TABLE in DB from syncing\n\
+  --enable-txn-forward    allow transaction forwarding to the\n\
+                          replication source.\n\
   --active                with --sync-from, start in active mode\n");
 }
diff --git a/ovsdb/transaction-forward.c b/ovsdb/transaction-forward.c
new file mode 100644
index 000000000..f865ff066
--- /dev/null
+++ b/ovsdb/transaction-forward.c
@@ -0,0 +1,187 @@
+/*
+ * Copyright (c) 2021, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "transaction-forward.h"
+
+#include "coverage.h"
+#include "jsonrpc.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/json.h"
+#include "openvswitch/list.h"
+#include "openvswitch/poll-loop.h"
+#include "openvswitch/vlog.h"
+#include "ovsdb.h"
+#include "util.h"
+
+VLOG_DEFINE_THIS_MODULE(transaction_forward);
+
+COVERAGE_DEFINE(txn_forward_cancel);
+COVERAGE_DEFINE(txn_forward_complete);
+COVERAGE_DEFINE(txn_forward_create);
+COVERAGE_DEFINE(txn_forward_sent);
+
+struct ovsdb_txn_forward {
+    struct ovs_list new_node;    /* In 'new_transactions'. */
+    struct hmap_node sent_node;  /* In 'sent_transactions'. */
+    struct ovsdb *db;            /* Database of this transaction. */
+    struct json *id;             /* 'id' of the forwarded transaction. */
+    struct jsonrpc_msg *request; /* Original request. */
+    struct jsonrpc_msg *reply;   /* Reply from the server. */
+};
+
+/* List that holds transactions waiting to be forwarded to the server. */
+static struct ovs_list new_transactions =
+    OVS_LIST_INITIALIZER(&new_transactions);
+/* Hash map for transactions that are already sent and wait for a reply. */
+static struct hmap sent_transactions = HMAP_INITIALIZER(&sent_transactions);
+
+struct ovsdb_txn_forward *
+ovsdb_txn_forward_create(struct ovsdb *db, const struct jsonrpc_msg *request)
+{
+    struct ovsdb_txn_forward *txn_fwd = xzalloc(sizeof *txn_fwd);
+
+    COVERAGE_INC(txn_forward_create);
+    txn_fwd->db = db;
+    txn_fwd->request = jsonrpc_msg_clone(request);
+    ovs_list_push_back(&new_transactions, &txn_fwd->new_node);
+
+    return txn_fwd;
+}
+
+static void
+ovsdb_txn_forward_unlist(struct ovsdb_txn_forward *txn_fwd)
+{
+    if (!ovs_list_is_empty(&txn_fwd->new_node)) {
+        ovs_list_remove(&txn_fwd->new_node);
+        ovs_list_init(&txn_fwd->new_node);
+    }
+    if (!hmap_node_is_null(&txn_fwd->sent_node)) {
+        hmap_remove(&sent_transactions, &txn_fwd->sent_node);
+        hmap_node_nullify(&txn_fwd->sent_node);
+    }
+}
+
+void
+ovsdb_txn_forward_destroy(struct ovsdb_txn_forward *txn_fwd)
+{
+    if (!txn_fwd) {
+        return;
+    }
+
+    ovsdb_txn_forward_unlist(txn_fwd);
+    json_destroy(txn_fwd->id);
+    jsonrpc_msg_destroy(txn_fwd->request);
+    jsonrpc_msg_destroy(txn_fwd->reply);
+    free(txn_fwd);
+}
+
+bool
+ovsdb_txn_forward_is_complete(const struct ovsdb_txn_forward *txn_fwd)
+{
+    return txn_fwd->reply != NULL;
+}
+
+void
+ovsdb_txn_forward_complete(const struct jsonrpc_msg *reply)
+{
+    struct ovsdb_txn_forward *t;
+    size_t hash = json_hash(reply->id, 0);
+
+    HMAP_FOR_EACH_WITH_HASH (t, sent_node, hash, &sent_transactions) {
+        if (json_equal(reply->id, t->id)) {
+            COVERAGE_INC(txn_forward_complete);
+            t->reply = jsonrpc_msg_clone(reply);
+
+            /* Replacing id with the id of the original request. */
+            json_destroy(t->reply->id);
+            t->reply->id = json_clone(t->request->id);
+
+            hmap_remove(&sent_transactions, &t->sent_node);
+            hmap_node_nullify(&t->sent_node);
+
+            t->db->run_triggers_now = t->db->run_triggers = true;
+            return;
+        }
+    }
+}
+
+struct jsonrpc_msg *
+ovsdb_txn_forward_steal_reply(struct ovsdb_txn_forward *txn_fwd)
+{
+    struct jsonrpc_msg *reply = txn_fwd->reply;
+
+    txn_fwd->reply = NULL;
+    return reply;
+}
+
+void
+ovsdb_txn_forward_run(struct jsonrpc_session *session)
+{
+    struct ovsdb_txn_forward *t, *next;
+    struct jsonrpc_msg *request;
+
+    /* Send all transactions that need to be forwarded. */
+    LIST_FOR_EACH_SAFE (t, next, new_node, &new_transactions) {
+        request = jsonrpc_create_request(t->request->method,
+                                         json_clone(t->request->params),
+                                         &t->id);
+        if (!jsonrpc_session_send(session, request)) {
+            COVERAGE_INC(txn_forward_sent);
+            ovs_list_remove(&t->new_node);
+            ovs_list_init(&t->new_node);
+            hmap_insert(&sent_transactions, &t->sent_node,
+                        json_hash(t->id, 0));
+        }
+    }
+}
+
+void
+ovsdb_txn_forward_wait(void)
+{
+    if (!ovs_list_is_empty(&new_transactions)) {
+        poll_immediate_wake();
+    }
+}
+
+void
+ovsdb_txn_forward_cancel(struct ovsdb_txn_forward *txn_fwd)
+{
+    COVERAGE_INC(txn_forward_cancel);
+    jsonrpc_msg_destroy(txn_fwd->reply);
+    txn_fwd->reply = jsonrpc_create_error(json_string_create("canceled"),
+                                          txn_fwd->request->id);
+    ovsdb_txn_forward_unlist(txn_fwd);
+}
+
+void
+ovsdb_txn_forward_cancel_all(bool sent_only)
+{
+    struct ovsdb_txn_forward *t, *next;
+
+    HMAP_FOR_EACH_SAFE (t, next, sent_node, &sent_transactions) {
+        ovsdb_txn_forward_cancel(t);
+    }
+
+    if (sent_only) {
+        return;
+    }
+
+    LIST_FOR_EACH_SAFE (t, next, new_node, &new_transactions) {
+        ovsdb_txn_forward_cancel(t);
+    }
+}
diff --git a/ovsdb/transaction-forward.h b/ovsdb/transaction-forward.h
new file mode 100644
index 000000000..6bd0aed64
--- /dev/null
+++ b/ovsdb/transaction-forward.h
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2021, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef OVSDB_TXN_FORWARD_H
+#define OVSDB_TXN_FORWARD_H 1
+
+#include <stdbool.h>
+
+struct ovsdb;
+struct ovsdb_txn_forward;
+struct jsonrpc_session;
+struct jsonrpc_msg;
+
+struct ovsdb_txn_forward *ovsdb_txn_forward_create(
+    struct ovsdb *, const struct jsonrpc_msg *request);
+void ovsdb_txn_forward_destroy(struct ovsdb_txn_forward *);
+
+bool ovsdb_txn_forward_is_complete(const struct ovsdb_txn_forward *);
+void ovsdb_txn_forward_complete(const struct jsonrpc_msg *reply);
+
+struct jsonrpc_msg *ovsdb_txn_forward_steal_reply(struct ovsdb_txn_forward *);
+
+void ovsdb_txn_forward_run(struct jsonrpc_session *);
+void ovsdb_txn_forward_wait(void);
+
+void ovsdb_txn_forward_cancel(struct ovsdb_txn_forward *);
+void ovsdb_txn_forward_cancel_all(bool sent_only);
+
+#endif /* OVSDB_TXN_FORWARD_H */
diff --git a/ovsdb/trigger.c b/ovsdb/trigger.c
index 0372302af..7067f84cc 100644
--- a/ovsdb/trigger.c
+++ b/ovsdb/trigger.c
@@ -28,6 +28,7 @@
 #include "openvswitch/poll-loop.h"
 #include "server.h"
 #include "transaction.h"
+#include "transaction-forward.h"
 #include "openvswitch/vlog.h"
 #include "util.h"
 
@@ -43,7 +44,8 @@ bool
 ovsdb_trigger_init(struct ovsdb_session *session, struct ovsdb *db,
                    struct ovsdb_trigger *trigger,
                    struct jsonrpc_msg *request, long long int now,
-                   bool read_only, const char *role, const char *id)
+                   bool read_only, bool txn_forward_enabled,
+                   const char *role, const char *id)
 {
     ovs_assert(!strcmp(request->method, "transact") ||
                !strcmp(request->method, "convert"));
@@ -53,9 +55,11 @@ ovsdb_trigger_init(struct ovsdb_session *session, struct ovsdb *db,
     trigger->request = request;
     trigger->reply = NULL;
     trigger->progress = NULL;
+    trigger->txn_forward = NULL;
     trigger->created = now;
     trigger->timeout_msec = LLONG_MAX;
     trigger->read_only = read_only;
+    trigger->txn_forward_enabled = txn_forward_enabled;
     trigger->role = nullable_xstrdup(role);
     trigger->id = nullable_xstrdup(id);
     return ovsdb_trigger_try(trigger, now);
@@ -65,6 +69,7 @@ void
 ovsdb_trigger_destroy(struct ovsdb_trigger *trigger)
 {
     ovsdb_txn_progress_destroy(trigger->progress);
+    ovsdb_txn_forward_destroy(trigger->txn_forward);
     ovs_list_remove(&trigger->node);
     jsonrpc_msg_destroy(trigger->request);
     jsonrpc_msg_destroy(trigger->reply);
@@ -75,7 +80,7 @@ ovsdb_trigger_destroy(struct ovsdb_trigger *trigger)
 bool
 ovsdb_trigger_is_complete(const struct ovsdb_trigger *trigger)
 {
-    return trigger->reply && !trigger->progress;
+    return trigger->reply && !trigger->progress && !trigger->txn_forward;
 }
 
 struct jsonrpc_msg *
@@ -98,6 +103,11 @@ ovsdb_trigger_cancel(struct ovsdb_trigger *trigger, const char *reason)
         trigger->progress = NULL;
     }
 
+    if (trigger->txn_forward) {
+        ovsdb_txn_forward_destroy(trigger->txn_forward);
+        trigger->txn_forward = NULL;
+    }
+
     jsonrpc_msg_destroy(trigger->reply);
     trigger->reply = NULL;
 
@@ -148,7 +158,7 @@ ovsdb_trigger_run(struct ovsdb *db, long long int now)
     LIST_FOR_EACH_SAFE (t, next, node, &db->triggers) {
         if (run_triggers
             || now - t->created >= t->timeout_msec
-            || t->progress) {
+            || t->progress || t->txn_forward) {
             if (ovsdb_trigger_try(t, now)) {
                 disconnect_all = true;
             }
@@ -188,23 +198,32 @@ static bool
 ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
 {
     /* Handle "initialized" state. */
-    if (!t->reply) {
+    if (!t->reply && !t->txn_forward) {
         ovs_assert(!t->progress);
 
         struct ovsdb_txn *txn = NULL;
         struct ovsdb *newdb = NULL;
+        bool txn_forwarding_needed = (t->read_only && t->txn_forward_enabled);
         if (!strcmp(t->request->method, "transact")) {
             if (!ovsdb_txn_precheck_prereq(t->db)) {
                 return false;
             }
 
-            bool durable;
+            bool durable, all_ops_read_only;
 
             struct json *result;
+            /* Try to compose the transaction.  If transaction forwarding is
+             * needed, compose it as if there were no read-only restriction.
+             * This avoids forwarding invalid requests.  It is also a security
+             * check: transactions are forwarded from this server, so we have
+             * to be sure that the client passes the RBAC check.  The
+             * transaction is executed locally if all operations are
+             * read-only. */
             txn = ovsdb_execute_compose(
-                t->db, t->session, t->request->params, t->read_only,
+                t->db, t->session, t->request->params,
+                txn_forwarding_needed ? false : t->read_only,
                 t->role, t->id, now - t->created, &t->timeout_msec,
-                &durable, &result);
+                &durable, &all_ops_read_only, &result);
             if (!txn) {
                 if (result) {
                     /* Complete.  There was an error but we still represent it
@@ -217,9 +236,20 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
                 return false;
             }
 
-            /* Transition to "committing" state. */
-            t->reply = jsonrpc_create_reply(result, t->request->id);
-            t->progress = ovsdb_txn_propose_commit(txn, durable);
+            if (txn_forwarding_needed && !all_ops_read_only) {
+                /* Transaction is good, but we don't need it. */
+                ovsdb_txn_abort(txn);
+                json_destroy(result);
+                /* Transition to "forwarding" state. */
+                t->txn_forward = ovsdb_txn_forward_create(t->db, t->request);
+                /* Forwarding will not complete immediately.  Check again
+                 * on the next run. */
+                return false;
+            } else {
+                /* Transition to "committing" state. */
+                t->reply = jsonrpc_create_reply(result, t->request->id);
+                t->progress = ovsdb_txn_propose_commit(txn, durable);
+            }
         } else if (!strcmp(t->request->method, "convert")) {
             /* Permission check. */
             if (t->role && *t->role) {
@@ -348,6 +378,18 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now)
             ovsdb_trigger_complete(t);
         }
 
+        return false;
+    } else if (t->txn_forward) {
+        /* Handle "forwarding" state. */
+        if (!ovsdb_txn_forward_is_complete(t->txn_forward)) {
+            return false;
+        }
+
+        /* Transition to "complete". */
+        t->reply = ovsdb_txn_forward_steal_reply(t->txn_forward);
+        ovsdb_txn_forward_destroy(t->txn_forward);
+        t->txn_forward = NULL;
+        ovsdb_trigger_complete(t);
         return false;
     }
 
diff --git a/ovsdb/trigger.h b/ovsdb/trigger.h
index 79af7f6be..4da0ff2e1 100644
--- a/ovsdb/trigger.h
+++ b/ovsdb/trigger.h
@@ -22,26 +22,34 @@ struct ovsdb;
 
 /* Triggers have the following states:
  *
- *    - Initialized (reply == NULL, progress == NULL): Executing the trigger
- *      can keep it in the initialized state, if it has a "wait" condition that
- *      isn't met.  Executing the trigger can also yield an error, in which
- *      case it transitions to "complete".  Otherwise, execution yields a
- *      transaction, which the database attempts to commit.  If the transaction
- *      completes immediately and synchronously, then the trigger transitions
- *      to the "complete" state.  If the transaction requires some time to
- *      complete, it transitions to the "committing" state.
+ *    - Initialized (reply == NULL, progress == NULL, txn_forward == NULL):
+ *      Executing the trigger can keep it in the initialized state, if it has a
+ *      "wait" condition that isn't met.  Executing the trigger can also yield
+ *      an error, in which case it transitions to "complete".  Otherwise,
+ *      execution yields a transaction, which the database attempts to commit.
+ *      If the transaction completes immediately and synchronously, then the
+ *      trigger transitions to the "complete" state.  If the transaction
+ *      requires some time to complete, it transitions to the "committing"
+ *      state.  If the transaction cannot be completed locally due to
+ *      read-only restrictions and transaction forwarding is enabled, the
+ *      trigger starts forwarding and transitions to the "forwarding" state.
  *
- *    - Committing (reply != NULL, progress != NULL): The transaction is
- *      committing.  If it succeeds, or if it fails permanently, then the
- *      trigger transitions to "complete".  If it fails temporarily
- *      (e.g. because someone else committed to cluster-based storage before we
- *      did), then we transition back to "initialized" to try again.
+ *    - Committing (reply != NULL, progress != NULL, txn_forward == NULL):
+ *      The transaction is committing.  If it succeeds, or if it fails
+ *      permanently, then the trigger transitions to "complete".  If it fails
+ *      temporarily (e.g. because someone else committed to cluster-based
+ *      storage before we did), then we transition back to "initialized" to
+ *      try again.
  *
- *    - Complete (reply != NULL, progress == NULL): The transaction is done
- *      and either succeeded or failed.
+ *    - Forwarding (reply == NULL, progress == NULL, txn_forward != NULL):
+ *      The transaction is being forwarded.  Whether it succeeds or fails,
+ *      the trigger transitions to "complete".
+ *
+ *    - Complete (reply != NULL, progress == NULL, txn_forward == NULL):
+ *      The transaction is done and either succeeded or failed.
  */
 struct ovsdb_trigger {
-    /* In "initialized" or "committing" state, in db->triggers.
+    /* In "initialized", "committing" or "forwarding" state, in db->triggers.
      * In "complete", in session->completions. */
     struct ovs_list node;
     struct ovsdb_session *session; /* Session that owns this trigger. */
@@ -49,9 +57,11 @@ struct ovsdb_trigger {
     struct jsonrpc_msg *request; /* Database request. */
     struct jsonrpc_msg *reply;   /* Result (null if none yet). */
     struct ovsdb_txn_progress *progress;
+    struct ovsdb_txn_forward *txn_forward; /* Tracks transaction forwarding. */
     long long int created;      /* Time created. */
     long long int timeout_msec; /* Max wait duration. */
     bool read_only;             /* Database is in read only mode. */
+    bool txn_forward_enabled;   /* Server configured to forward transactions.*/
     char *role;                 /* Role, for role-based access controls. */
     char *id;                   /* ID, for role-based access controls. */
 };
@@ -59,7 +69,8 @@ struct ovsdb_trigger {
 bool ovsdb_trigger_init(struct ovsdb_session *, struct ovsdb *,
                         struct ovsdb_trigger *,
                         struct jsonrpc_msg *request, long long int now,
-                        bool read_only, const char *role, const char *id);
+                        bool read_only, bool txn_forward_enabled,
+                        const char *role, const char *id);
 void ovsdb_trigger_destroy(struct ovsdb_trigger *);
 
 bool ovsdb_trigger_is_complete(const struct ovsdb_trigger *);
diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
index 926abce3a..c718f7e2a 100644
--- a/tests/ovsdb-server.at
+++ b/tests/ovsdb-server.at
@@ -1452,6 +1452,62 @@ m4_define([OVSDB_CHECK_EXECUTION],
 
 EXECUTION_EXAMPLES
 
+AT_BANNER([OVSDB -- ovsdb-server replication with transaction forwarding])
+
+# OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
+#
+# Creates two databases with the given SCHEMA, and starts an ovsdb-server on
+# each database.
+# Runs each of the TRANSACTIONS (which should be a quoted list of
+# quoted strings) against the replication server with ovsdb-client one at a
+# time.  The server executes read-only transactions and forwards the rest of
+# them to the other ovsdb-server.
+#
+# Checks that the overall output is OUTPUT, but UUIDs in the output
+# are replaced by markers of the form <N> where N is a number.  The
+# first unique UUID is replaced by <0>, the next by <1>, and so on.
+# If a given UUID appears more than once it is always replaced by the
+# same marker.
+#
+# Checks that the dumps of both databases are the same.
+#
+# TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS.
+m4_define([OVSDB_CHECK_EXECUTION],
+  [AT_SETUP([$1])
+   AT_KEYWORDS([ovsdb server tcp replication transaction forward $5])
+   $2 > schema
+   AT_CHECK([ovsdb-tool create db1 schema], [0], [stdout], [ignore])
+   AT_CHECK([ovsdb-tool create db2 schema], [0], [stdout], [ignore])
+
+   on_exit 'kill `cat *.pid`'
+   AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server1.log dnl
+                          --pidfile --remote=punix:db.sock db1
+            ], [0], [ignore], [ignore])
+
+   AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server2.log dnl
+                          --pidfile=pid2 --remote=punix:db2.sock           dnl
+                          --unixctl=unixctl2 -vjsonrpc:file:dbg            dnl
+                          --sync-from=unix:db.sock --enable-txn-forward db2
+            ], [0], [ignore], [ignore])
+   on_exit 'test ! -e pid2 || kill `cat pid2`'
+
+   m4_foreach([txn], [$3],
+     [AT_CHECK([ovsdb-client transact unix:db2.sock 'txn'], [0],
+               [stdout], [ignore])
+      cat stdout >> output
+   ])
+
+   AT_CHECK([uuidfilt output], [0], [$4], [ignore])
+
+   AT_CHECK([ovsdb-client dump], [0], [stdout], [ignore])
+   OVS_WAIT_UNTIL([ ovsdb-client dump unix:db2.sock > dump2; diff stdout dump2])
+
+   OVSDB_SERVER_SHUTDOWN
+   OVSDB_SERVER_SHUTDOWN2
+   AT_CLEANUP])
+
+EXECUTION_EXAMPLES
+
 AT_BANNER([OVSDB -- ovsdb-server replication table-exclusion])
 
 # OVSDB_CHECK_REPLICATION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
@@ -1762,7 +1818,9 @@ OVS_WAIT_UNTIL([ovs-appctl -t "`pwd`"/unixctl2 ovsdb-server/sync-status |grep re
 
 dnl Switch the 'db1' to active
 AT_CHECK([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/disconnect-active-ovsdb-server])
-AT_CHECK([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/sync-status], [0], [state: active
+AT_CHECK([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/sync-status], [0], [dnl
+state: active
+transaction forwarding: disabled
 ])
 
 dnl Issue a transaction to 'db1'
@@ -1781,7 +1839,9 @@ AT_CHECK([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/connect-active-ovsdb-server
 
 dnl Verify the change happend
 OVS_WAIT_UNTIL([ovs-appctl -t "`pwd`"/unixctl ovsdb-server/sync-status |grep replicating])
-AT_CHECK([ovs-appctl -t "`pwd`"/unixctl2 ovsdb-server/sync-status], [0], [state: active
+AT_CHECK([ovs-appctl -t "`pwd`"/unixctl2 ovsdb-server/sync-status], [0], [dnl
+state: active
+transaction forwarding: disabled
 ])
 
 dnl Issue an transaction to 'db2' which is now active.
diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
index a886f971e..7dc634d3e 100644
--- a/tests/test-ovsdb.c
+++ b/tests/test-ovsdb.c
@@ -1573,7 +1573,7 @@ do_trigger(struct ovs_cmdl_context *ctx)
             ovsdb_trigger_init(&session, db, &t->trigger,
                                jsonrpc_create_request("transact", params,
                                                       NULL),
-                               now, false, NULL, NULL);
+                               now, false, false, NULL, NULL);
             t->number = number++;
             if (ovsdb_trigger_is_complete(&t->trigger)) {
                 do_trigger_dump(t, now, "immediate");
-- 
2.26.3
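
For reviewers: the trigger states documented in ovsdb/trigger.h above are
encoded by which of 'reply', 'progress' and 'txn_forward' are non-NULL.  The
standalone sketch below restates that mapping so it can be checked in
isolation.  It is not part of the patch.  The enum and the trigger_state_*()
helpers are hypothetical names, and the three pointers are modelled as plain
"is non-NULL" flags.  Combinations that trigger.h does not list are reported
as invalid here, which is an assumption about triggers at rest, not about
transient values in the middle of ovsdb_trigger_try().

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical names for illustration; the patch has no such enum. */
enum trigger_state {
    TRIGGER_INITIALIZED,
    TRIGGER_COMMITTING,
    TRIGGER_FORWARDING,
    TRIGGER_COMPLETE,
    TRIGGER_INVALID
};

/* Maps the three fields of 'struct ovsdb_trigger' (given as "non-NULL"
 * flags) to the state listed in the trigger.h comment. */
static enum trigger_state
trigger_state_classify(bool reply, bool progress, bool txn_forward)
{
    if (txn_forward) {
        /* Forwarding: reply == NULL, progress == NULL. */
        return !reply && !progress ? TRIGGER_FORWARDING : TRIGGER_INVALID;
    }
    if (progress) {
        /* Committing: reply != NULL, progress != NULL. */
        return reply ? TRIGGER_COMMITTING : TRIGGER_INVALID;
    }
    /* Neither progress nor txn_forward: complete if there is a reply. */
    return reply ? TRIGGER_COMPLETE : TRIGGER_INITIALIZED;
}

static const char *
trigger_state_name(enum trigger_state state)
{
    switch (state) {
    case TRIGGER_INITIALIZED: return "initialized";
    case TRIGGER_COMMITTING:  return "committing";
    case TRIGGER_FORWARDING:  return "forwarding";
    case TRIGGER_COMPLETE:    return "complete";
    case TRIGGER_INVALID:     break;
    }
    return "invalid";
}

/* Same condition as ovsdb_trigger_is_complete() in the patch. */
static bool
trigger_state_is_complete(bool reply, bool progress, bool txn_forward)
{
    return reply && !progress && !txn_forward;
}

/* Checks that the "complete" predicate agrees with the classification for
 * all eight flag combinations. */
static void
trigger_state_self_check(void)
{
    for (int i = 0; i < 8; i++) {
        bool reply = i & 1, progress = i & 2, txn_forward = i & 4;
        enum trigger_state s = trigger_state_classify(reply, progress,
                                                      txn_forward);

        assert(trigger_state_is_complete(reply, progress, txn_forward)
               == (s == TRIGGER_COMPLETE));
        assert(strlen(trigger_state_name(s)) > 0);
    }
}
```

For example, trigger_state_classify(false, false, true) yields
TRIGGER_FORWARDING, which is the state a trigger enters right after
ovsdb_txn_forward_create() is called in ovsdb_trigger_try().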


