[ovs-dev] [PATCH 2/4] ovsdb: Introduce OVSDB replication feature

Cabrera Vega, Mario Alberto mario.cabrera at hpe.com
Tue Mar 29 21:30:08 UTC 2016


Replication is enabled by using the following option when starting the
database server:

--sync-from=server

Where 'server' can take any form described in the ovsdb-client(1)
manpage as an active connection. If this option is specified, the
replication process is immediately started.

Signed-off-by: Mario Cabrera <mario.cabrera at hpe.com>
---
ovsdb/automake.mk         |   6 +-
ovsdb/ovsdb-server.1.in   |   3 +
ovsdb/ovsdb-server.c      |  46 ++--
ovsdb/replication-syn.man |   2 +
ovsdb/replication.c       | 597 ++++++++++++++++++++++++++++++++++++++++++++++
ovsdb/replication.h       |  39 +++
ovsdb/replication.man     |   8 +
tests/ovsdb-server.at     |  51 ++++
8 files changed, 725 insertions(+), 27 deletions(-)
create mode 100644 ovsdb/replication-syn.man
create mode 100644 ovsdb/replication.c
create mode 100644 ovsdb/replication.h
create mode 100644 ovsdb/replication.man

diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk
index 7db6fea..099ed3c 100644
--- a/ovsdb/automake.mk
+++ b/ovsdb/automake.mk
@@ -24,6 +24,8 @@ ovsdb_libovsdb_la_SOURCES = \
              ovsdb/monitor.h \
              ovsdb/query.c \
              ovsdb/query.h \
+             ovsdb/replication.c \
+             ovsdb/replication.h \
              ovsdb/row.c \
              ovsdb/row.h \
              ovsdb/server.c \
@@ -42,7 +44,9 @@ pkgconfig_DATA += \
 MAN_FRAGMENTS += \
              ovsdb/remote-active.man \
-              ovsdb/remote-passive.man
+             ovsdb/remote-passive.man \
+             ovsdb/replication.man \
+             ovsdb/replication-syn.man
 # ovsdb-tool
bin_PROGRAMS += ovsdb/ovsdb-tool
diff --git a/ovsdb/ovsdb-server.1.in b/ovsdb/ovsdb-server.1.in
index 6c85729..1025ade 100644
--- a/ovsdb/ovsdb-server.1.in
+++ b/ovsdb/ovsdb-server.1.in
@@ -19,6 +19,7 @@ ovsdb\-server \- Open vSwitch database server
.so lib/daemon-syn.man
.so lib/service-syn.man
.so lib/vlog-syn.man
+.so ovsdb/replication-syn.man
.so lib/ssl-syn.man
.so lib/ssl-bootstrap-syn.man
.so lib/ssl-peer-ca-cert-syn.man
@@ -100,6 +101,8 @@ configured remotes.
.so lib/service.man
.SS "Logging Options"
.so lib/vlog.man
+.SS "Syncing Options"
+.so ovsdb/replication.man
.SS "Public Key Infrastructure Options"
The options described below for configuring the SSL public key
infrastructure accept a special syntax for obtaining their
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index fa662b1..63dd209 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -42,6 +42,7 @@
#include "ovsdb-error.h"
#include "poll-loop.h"
#include "process.h"
+#include "replication.h"
#include "row.h"
#include "simap.h"
#include "shash.h"
@@ -59,15 +60,7 @@
 VLOG_DEFINE_THIS_MODULE(ovsdb_server);
-struct db {
-    /* Initialized in main(). */
-    char *filename;
-    struct ovsdb_file *file;
-    struct ovsdb *db;
-
-    /* Only used by update_remote_status(). */
-    struct ovsdb_txn *txn;
-};
+struct db;
 /* SSL configuration. */
static char *private_key_file;
@@ -75,6 +68,9 @@ static char *certificate_file;
static char *ca_cert_file;
static bool bootstrap_ca_cert;
+/* Replication configuration. */
+static bool connect_to_remote_server;
+
static unixctl_cb_func ovsdb_server_exit;
static unixctl_cb_func ovsdb_server_compact;
static unixctl_cb_func ovsdb_server_reconnect;
@@ -159,6 +155,10 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
         report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error);
         ovsdb_jsonrpc_server_run(jsonrpc);
+        if (connect_to_remote_server) {
+             replication_run(all_dbs);
+        }
+
         SHASH_FOR_EACH(node, all_dbs) {
             struct db *db = node->data;
             ovsdb_trigger_run(db->db, time_msec());
@@ -170,9 +170,9 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
             }
         }
-        /* update Manager status(es) every 5 seconds */
+        /* update Manager status(es) every 2.5 seconds */
         if (time_msec() >= status_timer) {
-            status_timer = time_msec() + 5000;
+            status_timer = time_msec() + 2500;
             update_remote_status(jsonrpc, remotes, all_dbs);
         }
@@ -350,6 +350,7 @@ main(int argc, char *argv[])
     sset_destroy(&remotes);
     sset_destroy(&db_filenames);
     unixctl_server_destroy(unixctl);
+    disconnect_remote_server();
     if (run_process && process_exited(run_process)) {
         int status = process_status(run_process);
@@ -433,21 +434,6 @@ open_db(struct server_config *config, const char *filename)
     return error;
}
-static const struct db *
-find_db(const struct shash *all_dbs, const char *db_name)
-{
-    struct shash_node *node;
-
-    SHASH_FOR_EACH(node, all_dbs) {
-        struct db *db = node->data;
-        if (!strcmp(db->db->schema->name, db_name)) {
-            return db;
-        }
-    }
-
-    return NULL;
-}
-
static char * OVS_WARN_UNUSED_RESULT
parse_db_column__(const struct shash *all_dbs,
                   const char *name_, char *name,
@@ -1278,6 +1264,7 @@ parse_options(int *argcp, char **argvp[],
         OPT_RUN,
         OPT_BOOTSTRAP_CA_CERT,
         OPT_PEER_CA_CERT,
+        OPT_SYNC_FROM,
         VLOG_OPTION_ENUMS,
         DAEMON_OPTION_ENUMS
     };
@@ -1296,6 +1283,7 @@ parse_options(int *argcp, char **argvp[],
         {"private-key", required_argument, NULL, 'p'},
         {"certificate", required_argument, NULL, 'c'},
         {"ca-cert",     required_argument, NULL, 'C'},
+        {"sync-from",   required_argument, NULL, OPT_SYNC_FROM},
         {NULL, 0, NULL, 0},
     };
     char *short_options = ovs_cmdl_long_options_to_short_options(long_options);
@@ -1356,6 +1344,11 @@ parse_options(int *argcp, char **argvp[],
             stream_ssl_set_peer_ca_cert_file(optarg);
             break;
+        case OPT_SYNC_FROM:
+            set_remote_ovsdb_server(optarg);
+            connect_to_remote_server = true;
+            break;
+
         case '?':
             exit(EXIT_FAILURE);
@@ -1382,6 +1375,7 @@ usage(void)
     stream_usage("JSON-RPC", true, true, true);
     daemon_usage();
     vlog_usage();
+    replication_usage();
     printf("\nOther options:\n"
            "  --run COMMAND           run COMMAND as subprocess then exit\n"
            "  --unixctl=SOCKET        override default control socket name\n"
diff --git a/ovsdb/replication-syn.man b/ovsdb/replication-syn.man
new file mode 100644
index 0000000..adfd7c2
--- /dev/null
+++ b/ovsdb/replication-syn.man
@@ -0,0 +1,2 @@
+.IP "Syncing options:"
+[\fB\-\-sync\-from=\fIserver\fR]
diff --git a/ovsdb/replication.c b/ovsdb/replication.c
new file mode 100644
index 0000000..d9e609e
--- /dev/null
+++ b/ovsdb/replication.c
@@ -0,0 +1,597 @@
+/*
+ * (c) Copyright 2016 Hewlett Packard Enterprise Development LP
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "replication.h"
+
+#include "condition.h"
+#include "json.h"
+#include "jsonrpc.h"
+#include "ovsdb.h"
+#include "ovsdb-error.h"
+#include "query.h"
+#include "row.h"
+#include "stream.h"
+#include "sset.h"
+#include "svec.h"
+#include "table.h"
+#include "transaction.h"
+
+/* Connection method of the server to replicate from, as stored by
+ * set_remote_ovsdb_server().  NULL when no remote server is configured. */
+static char *remote_ovsdb_server;
+/* JSON-RPC connection to the remote server, opened by replication_run(). */
+static struct jsonrpc *rpc;
+/* Names of the tables added to a monitor request.  replication_run() treats
+ * an empty set as "not monitoring yet" and (re)connects in that case. */
+static struct sset monitored_tables = SSET_INITIALIZER(&monitored_tables);
+/* True until reset_databases() has succeeded once. */
+static bool reset_dbs = true;
+
+static struct jsonrpc *open_jsonrpc(const char *server);
+static struct ovsdb_error *check_jsonrpc_error(int error,
+                                               struct jsonrpc_msg **reply_);
+static void fetch_dbs(struct jsonrpc *rpc, struct svec *dbs);
+static struct ovsdb_schema *fetch_schema(struct jsonrpc *rpc,
+                                         const char *database);
+
+static void send_monitor_requests(struct shash *all_dbs);
+static void add_monitored_table(struct ovsdb_table_schema *table,
+                                struct json *monitor_requests);
+
+static void get_initial_db_state(const struct db *database);
+static void reset_database(struct ovsdb *db, struct ovsdb_txn *txn);
+static struct ovsdb_error *reset_databases(struct shash *all_dbs);
+
+static void check_for_notifications(struct shash *all_dbs);
+static void process_notification(struct json *table_updates,
+                                 struct ovsdb *database);
+static struct ovsdb_error *process_table_update(struct json *table_update,
+                                                const char *table_name,
+                                                struct ovsdb *database,
+                                                struct ovsdb_txn *txn);
+
+static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
+                                          const char *uuid,
+                                          struct ovsdb_table *table,
+                                          struct json *new);
+static struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
+                                          const char *uuid,
+                                          struct ovsdb_table *table);
+static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
+                                          const char *uuid,
+                                          struct ovsdb_table *table,
+                                          struct json *new);
+
+/* Runs one step of replication.  'all_dbs' maps to the local "struct db"s.
+ *
+ * While no table is monitored and a remote server is configured:
+ *
+ *   - If the local databases have not been emptied yet, tries to empty them
+ *     and returns, whether or not that succeeded.  A failed attempt is
+ *     retried on the next call.
+ *
+ *   - Otherwise closes any previous connection, opens a new one to the
+ *     remote server and sends the monitor requests.
+ *
+ * Whenever at least one table is monitored, polls the connection for
+ * notifications. */
+void
+replication_run(struct shash *all_dbs)
+{
+    if (sset_is_empty(&monitored_tables) && remote_ovsdb_server) {
+        /* Reset local databases. */
+        if (reset_dbs) {
+            struct ovsdb_error *error = reset_databases(all_dbs);
+            if (!error) {
+                reset_dbs = false;
+            }
+            /* The error is only used as a "try again" signal; free it so
+             * that repeated failures do not leak one error per call. */
+            ovsdb_error_destroy(error);
+
+            /* In case of success resetting the databases,
+             * return in order to notify monitors. */
+            return;
+        }
+
+        /* Open JSON-RPC. */
+        jsonrpc_close(rpc);
+        rpc = open_jsonrpc(remote_ovsdb_server);
+        if (!rpc) {
+            return;
+        }
+
+        /* Send monitor requests. */
+        send_monitor_requests(all_dbs);
+    }
+    if (!sset_is_empty(&monitored_tables)) {
+        check_for_notifications(all_dbs);
+    }
+}
+
+/* Records 'remote_server' as the server to replicate from, replacing any
+ * previously configured one.  The string is copied; passing NULL clears the
+ * setting. */
+void
+set_remote_ovsdb_server(const char *remote_server)
+{
+    /* Copy before freeing so that passing the current value is safe. */
+    char *copy = remote_server ? strdup(remote_server) : NULL;
+
+    /* Release the previous value; otherwise it leaks when the option is
+     * given more than once. */
+    free(remote_ovsdb_server);
+    remote_ovsdb_server = copy;
+}
+
+/* Closes the connection to the remote server and forgets all replication
+ * state: the monitored tables and the remote server's name.
+ *
+ * Safe to call more than once, e.g. from check_for_notifications() when a
+ * reconnection fails and again from main() on exit. */
+void
+disconnect_remote_server(void)
+{
+    jsonrpc_close(rpc);
+    rpc = NULL;                 /* Do not leave a dangling pointer behind. */
+
+    /* Re-initialize after destroying so that the set stays usable by
+     * replication_run() and by a later call to this function. */
+    sset_destroy(&monitored_tables);
+    sset_init(&monitored_tables);
+
+    free(remote_ovsdb_server);
+    remote_ovsdb_server = NULL;
+}
+
+/* Searches 'all_dbs' for the database whose schema is named 'db_name'.
+ * Returns the matching "struct db", or NULL if there is none. */
+const struct db *
+find_db(const struct shash *all_dbs, const char *db_name)
+{
+    const struct db *match = NULL;
+    struct shash_node *entry;
+
+    SHASH_FOR_EACH(entry, all_dbs) {
+        struct db *candidate = entry->data;
+
+        if (strcmp(candidate->db->schema->name, db_name) == 0) {
+            match = candidate;
+            break;
+        }
+    }
+
+    return match;
+}
+
+/* Deletes every row of every database in 'all_dbs', using one transaction
+ * per database.
+ *
+ * Returns NULL on success.  On the first commit failure, stops and returns
+ * that error, which the caller must free; databases that come later in the
+ * iteration are left untouched so the caller can retry. */
+static struct ovsdb_error *
+reset_databases(struct shash *all_dbs)
+{
+    struct shash_node *db_node;
+
+    SHASH_FOR_EACH(db_node, all_dbs) {
+        struct db *db = db_node->data;
+        struct ovsdb_txn *txn = ovsdb_txn_create(db->db);
+        struct ovsdb_error *error;
+
+        reset_database(db->db, txn);
+        error = ovsdb_txn_commit(txn, false);
+        if (error) {
+            /* Report this failure instead of letting a later database's
+             * result overwrite (and leak) it. */
+            return error;
+        }
+    }
+
+    return NULL;
+}
+
+/* Schedules, as part of 'txn', the deletion of every row in every table of
+ * 'db'.  The caller commits or aborts 'txn'. */
+static void
+reset_database(struct ovsdb *db, struct ovsdb_txn *txn)
+{
+    struct shash_node *table_node;
+
+    SHASH_FOR_EACH(table_node, &db->tables) {
+        struct ovsdb_table *table = table_node->data;
+        struct ovsdb_row *row, *next;
+
+        /* ovsdb_txn_row_delete() removes the row from 'table->rows', so the
+         * iteration must fetch the successor before the body runs. */
+        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &table->rows) {
+            ovsdb_txn_row_delete(txn, row);
+        }
+    }
+}
+
+/* Opens a blocking stream connection to 'server' and wraps it in a JSON-RPC
+ * session.  Returns the session, or NULL if the connection attempt
+ * failed. */
+static struct jsonrpc *
+open_jsonrpc(const char *server)
+{
+    struct stream *stream;
+    int open_status = jsonrpc_stream_open(server, &stream, DSCP_DEFAULT);
+
+    if (stream_open_block(open_status, &stream)) {
+        return NULL;
+    }
+    return jsonrpc_open(stream);
+}
+
+/* Translates the outcome of a JSON-RPC transaction into an ovsdb_error.
+ *
+ * 'error' is the status returned by the transaction call and '*reply_' the
+ * reply it produced.  '*reply_' is only examined when 'error' is 0.
+ *
+ * Returns NULL if the transaction succeeded and the reply carries no error,
+ * otherwise a newly allocated error that the caller must free.  '*reply_'
+ * is never freed here. */
+static struct ovsdb_error *
+check_jsonrpc_error(int error, struct jsonrpc_msg **reply_)
+{
+    struct jsonrpc_msg *reply = *reply_;
+
+    if (error) {
+        return ovsdb_error("transaction failed",
+                           "transaction returned error %d",
+                           error);
+    }
+
+    if (reply->error) {
+        /* json_to_string() returns a string owned by the caller; free it
+         * once it has been copied into the error message. */
+        char *reply_error = json_to_string(reply->error, 0);
+        struct ovsdb_error *ovsdb_err;
+
+        ovsdb_err = ovsdb_error("transaction failed",
+                                "transaction returned error: %s",
+                                reply_error);
+        free(reply_error);
+        return ovsdb_err;
+    }
+    return NULL;
+}
+
+/* Sends a "list_dbs" request over 'rpc' and appends the returned database
+ * names to 'dbs', which is sorted before returning on success.
+ *
+ * Failures are logged through ovsdb_error_assert() and leave 'dbs' without
+ * the affected names; entries of the reply that are not strings are
+ * skipped. */
+static void
+fetch_dbs(struct jsonrpc *rpc, struct svec *dbs)
+{
+    struct jsonrpc_msg *request, *reply;
+    struct ovsdb_error *error;
+    size_t i;
+
+    request = jsonrpc_create_request("list_dbs", json_array_create_empty(),
+                                     NULL);
+
+    error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
+                                &reply);
+    if (error) {
+        ovsdb_error_assert(error);
+        goto done;
+    }
+
+    if (reply->result->type != JSON_ARRAY) {
+        ovsdb_error_assert(ovsdb_error("list-dbs failed",
+                                       "list_dbs response is not array"));
+        goto done;
+    }
+
+    for (i = 0; i < reply->result->u.array.n; i++) {
+        const struct json *name = reply->result->u.array.elems[i];
+
+        if (name->type != JSON_STRING) {
+            ovsdb_error_assert(ovsdb_error(
+                               "list_dbs failed",
+                               "list_dbs response %"PRIuSIZE" is not string",
+                               i));
+            /* 'u.string' is not valid for this element; skip it. */
+            continue;
+        }
+        svec_add(dbs, name->u.string);
+    }
+    svec_sort(dbs);
+
+done:
+    /* Release the reply on every path, as fetch_schema() does. */
+    jsonrpc_msg_destroy(reply);
+}
+
+/* Requests the schema of 'database' over 'rpc' with "get_schema" and parses
+ * the reply.
+ *
+ * Returns the parsed schema, or NULL if the request failed or the reply
+ * could not be parsed; in that case the error is passed to
+ * ovsdb_error_assert(). */
+static struct ovsdb_schema *
+fetch_schema(struct jsonrpc *rpc, const char *database)
+{
+    struct ovsdb_schema *schema = NULL;
+    struct jsonrpc_msg *request, *reply;
+    struct ovsdb_error *error;
+    int status;
+
+    request = jsonrpc_create_request(
+        "get_schema", json_array_create_1(json_string_create(database)),
+        NULL);
+    status = jsonrpc_transact_block(rpc, request, &reply);
+
+    error = check_jsonrpc_error(status, &reply);
+    if (!error) {
+        error = ovsdb_schema_from_json(reply->result, &schema);
+    }
+    jsonrpc_msg_destroy(reply);
+
+    if (error) {
+        ovsdb_error_assert(error);
+        return NULL;
+    }
+    return schema;
+}
+
+/* Asks the remote server for its databases.  For each one that also exists
+ * in 'all_dbs' with a schema equal to the remote one, sends a "monitor"
+ * request that lists every table of the schema, then reads the reply
+ * through get_initial_db_state().
+ *
+ * The database name is used both as the database to monitor and as the
+ * monitor identifier, which lets check_for_notifications() map an "update"
+ * notification back to its database.
+ *
+ * Databases whose remote schema cannot be fetched or differs from the local
+ * one are skipped. */
+static void
+send_monitor_requests(struct shash *all_dbs)
+{
+    const char *db_name;
+    struct svec dbs;
+    size_t i;
+
+    svec_init(&dbs);
+    fetch_dbs(rpc, &dbs);
+    SVEC_FOR_EACH (i, db_name, &dbs) {
+        const struct db *database = find_db(all_dbs, db_name);
+        struct ovsdb_schema *local_schema, *remote_schema;
+
+        if (!database) {
+            continue;
+        }
+
+        local_schema = database->db->schema;
+        remote_schema = fetch_schema(rpc, db_name);
+
+        /* fetch_schema() returns NULL on failure; do not hand that to
+         * ovsdb_schema_equal(). */
+        if (remote_schema
+            && ovsdb_schema_equal(local_schema, remote_schema)) {
+            struct jsonrpc_msg *request;
+            struct json *monitor, *monitor_request;
+            size_t n = shash_count(&local_schema->tables);
+            const struct shash_node **nodes = shash_sort(
+                                                 &local_schema->tables);
+            size_t j;
+
+            monitor_request = json_object_create();
+            for (j = 0; j < n; j++) {
+                struct ovsdb_table_schema *table = nodes[j]->data;
+                add_monitored_table(table, monitor_request);
+            }
+            free(nodes);
+
+            /* Send monitor request. */
+            monitor = json_array_create_3(
+                         json_string_create(db_name),
+                         json_string_create(db_name),
+                         monitor_request);
+            request = jsonrpc_create_request("monitor", monitor, NULL);
+            jsonrpc_send(rpc, request);
+            get_initial_db_state(database);
+        }
+        ovsdb_schema_destroy(remote_schema);
+    }
+    svec_destroy(&dbs);
+}
+
+/* Blocks until the next message arrives on 'rpc'.  If it is a reply, applies
+ * its result to 'database' as a set of table updates.
+ *
+ * If receiving fails, clears the set of monitored tables so that
+ * replication_run() opens a new connection on a later call. */
+static void
+get_initial_db_state(const struct db *database)
+{
+    struct jsonrpc_msg *msg;
+    int error;
+
+    error = jsonrpc_recv_block(rpc, &msg);
+    if (error) {
+        /* No message was received, so 'msg' must not be dereferenced. */
+        sset_clear(&monitored_tables);
+        return;
+    }
+
+    if (msg->type == JSONRPC_REPLY) {
+        process_notification(msg->result, database->db);
+    }
+    jsonrpc_msg_destroy(msg);
+}
+
+/* Records 'table' as monitored and adds an entry for it to
+ * 'monitor_request': the table name mapped to an array that holds a single
+ * empty JSON object. */
+static void
+add_monitored_table(struct ovsdb_table_schema *table,
+                    struct json *monitor_request)
+{
+    struct json *requests = json_array_create_1(json_object_create());
+
+    sset_add(&monitored_tables, table->name);
+    json_object_put(monitor_request, table->name, requests);
+}
+
+/* Polls 'rpc' for one message without blocking and handles it:
+ *
+ *   - "echo" requests are answered with their own parameters.
+ *
+ *   - "update" notifications whose parameters are a two-element array
+ *     [<database name>, <table-updates>] are applied to the matching
+ *     database in 'all_dbs', if there is one.
+ *
+ * On a receive error other than EAGAIN, closes the connection and tries to
+ * open a new one; if that fails too, replication is shut down with
+ * disconnect_remote_server().
+ *
+ * NOTE(review): after a successful reconnect no monitor request is sent on
+ * the new connection, because 'monitored_tables' stays non-empty -- confirm
+ * whether that is intended. */
+static void
+check_for_notifications(struct shash *all_dbs)
+{
+    struct jsonrpc_msg *msg;
+    int error;
+
+    error = jsonrpc_recv(rpc, &msg);
+    if (error == EAGAIN) {
+        return;
+    } else if (error) {
+        /* Close the failed session before replacing it so that it is not
+         * leaked. */
+        jsonrpc_close(rpc);
+        rpc = open_jsonrpc(remote_ovsdb_server);
+        if (!rpc) {
+            /* Remote server went down. */
+            disconnect_remote_server();
+        }
+        jsonrpc_msg_destroy(msg);
+        return;
+    }
+    if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
+        jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
+                     msg->id));
+    } else if (msg->type == JSONRPC_NOTIFY
+               && !strcmp(msg->method, "update")) {
+        struct json *params = msg->params;
+
+        /* Only read 'u.string' from an element that really is a string. */
+        if (params->type == JSON_ARRAY
+            && params->u.array.n == 2
+            && params->u.array.elems[0]->type == JSON_STRING) {
+            const char *db_name = params->u.array.elems[0]->u.string;
+            const struct db *database = find_db(all_dbs, db_name);
+            if (database) {
+                process_notification(params->u.array.elems[1], database->db);
+            }
+        }
+    }
+    jsonrpc_msg_destroy(msg);
+    jsonrpc_run(rpc);
+}
+
+/* Applies 'table_updates', a JSON object that maps table names to table
+ * updates, to 'database' in a single transaction.
+ *
+ * If 'table_updates' is not an object, or if any update or the commit
+ * fails, clears the set of monitored tables, which makes replication_run()
+ * reconnect on a later call.  Failures are logged through
+ * ovsdb_error_assert(). */
+static void
+process_notification(struct json *table_updates, struct ovsdb *database)
+{
+    struct ovsdb_error *error = NULL;
+    struct ovsdb_txn *txn;
+    struct shash_node *node;
+
+    if (table_updates->type != JSON_OBJECT) {
+        sset_clear(&monitored_tables);
+        return;
+    }
+
+    txn = ovsdb_txn_create(database);
+
+    /* Process each table update. */
+    SHASH_FOR_EACH(node, json_object(table_updates)) {
+        struct json *table_update = node->data;
+        if (table_update) {
+            error = process_table_update(table_update, node->name, database,
+                                         txn);
+            if (error) {
+                break;
+            }
+        }
+    }
+
+    if (!error) {
+        /* Commit transaction. */
+        error = ovsdb_txn_commit(txn, false);
+    } else {
+        ovsdb_txn_abort(txn);
+    }
+
+    if (error) {
+        /* ovsdb_error_assert() logs the error and frees it, so it must not
+         * be destroyed a second time afterwards. */
+        ovsdb_error_assert(error);
+        sset_clear(&monitored_tables);
+    }
+}
+
+/* Applies 'table_update', a JSON object that maps row UUIDs to row updates,
+ * to the table named 'table_name' of 'database' as part of 'txn'.
+ *
+ * For each row update, the members present decide the operation:
+ *   - only "new":          insert the row;
+ *   - only "old":          delete the row;
+ *   - both "old" and "new": update the row with the contents of "new".
+ *
+ * Returns NULL on success.  Stops at the first failure and returns its
+ * error, which the caller must free. */
+static struct ovsdb_error *
+process_table_update(struct json *table_update, const char *table_name,
+                     struct ovsdb *database, struct ovsdb_txn *txn)
+{
+    struct shash_node *node;
+    struct ovsdb_table *table;
+    struct ovsdb_error *error = NULL;
+
+    if (table_update->type != JSON_OBJECT) {
+        return ovsdb_error("Not a JSON object",
+                           "<table-update> for table is not object");
+    }
+
+    table = ovsdb_get_table(database, table_name);
+    if (!table) {
+        return ovsdb_error("unknown table",
+                           "no table named %s in local database",
+                           table_name);
+    }
+
+    SHASH_FOR_EACH (node, json_object(table_update)) {
+        struct json *row_update = node->data;
+        struct json *old, *new;
+
+        if (row_update->type != JSON_OBJECT) {
+            error = ovsdb_error("Not a JSON object",
+                                "<row-update> is not object");
+            break;
+        }
+        old = shash_find_data(json_object(row_update), "old");
+        new = shash_find_data(json_object(row_update), "new");
+
+        if (!old && !new) {
+            error = ovsdb_error("syntax error",
+                                "<row-update> has neither \"old\" nor "
+                                "\"new\" member");
+        } else if (!old) {
+            error = execute_insert(txn, node->name, table, new);
+        } else if (!new) {
+            error = execute_delete(txn, node->name, table);
+        } else {
+            error = execute_update(txn, node->name, table, new);
+        }
+
+        if (error) {
+            /* Keep the first error; continuing would overwrite and leak
+             * it. */
+            break;
+        }
+    }
+    return error;
+}
+
+/* Inserts into 'table', as part of 'txn', a new row built from 'json_row'
+ * and identified by 'uuid', the UUID in string form.
+ *
+ * Returns NULL on success, otherwise an error that the caller must free; in
+ * that case nothing is added to 'txn'. */
+static struct ovsdb_error *
+execute_insert(struct ovsdb_txn *txn, const char *uuid,
+               struct ovsdb_table *table, struct json *json_row)
+{
+    struct ovsdb_row *row = NULL;
+    struct uuid row_uuid;
+    struct ovsdb_error *error;
+
+    if (!table) {
+        return OVSDB_BUG("null table");
+    }
+
+    /* Validate the UUID first so that the row never gets an unset one. */
+    if (!uuid_from_string(&row_uuid, uuid)) {
+        return ovsdb_error("syntax error", "\"%s\" is not a valid UUID",
+                           uuid);
+    }
+
+    row = ovsdb_row_create(table);
+    error = ovsdb_row_from_json(row, json_row, NULL, NULL);
+    if (!error) {
+        /* Add UUID to row. */
+        *ovsdb_row_get_uuid_rw(row) = row_uuid;
+        ovsdb_txn_row_insert(txn, row);
+    } else {
+        ovsdb_row_destroy(row);
+    }
+
+    return error;
+}
+
+/* Auxiliary data passed by execute_delete() to delete_row_cb(). */
+struct delete_row_cbdata {
+    size_t n_matches;                /* Rows seen by the callback so far. */
+    const struct ovsdb_table *table; /* Table being queried. */
+    struct ovsdb_txn *txn;           /* Transaction receiving the deletes. */
+};
+
+/* ovsdb_query() callback: deletes 'row' in the transaction carried by
+ * 'dr_', a "struct delete_row_cbdata", and counts the match.  Always
+ * returns true. */
+static bool
+delete_row_cb(const struct ovsdb_row *row, void *dr_)
+{
+    struct delete_row_cbdata *cbdata = dr_;
+
+    cbdata->n_matches += 1;
+    ovsdb_txn_row_delete(cbdata->txn, row);
+
+    return true;
+}
+
+/* Deletes from 'table', as part of 'txn', the rows whose "_uuid" column
+ * equals 'uuid', the UUID in string form.
+ *
+ * Returns NULL on success, otherwise an error that the caller must free. */
+static struct ovsdb_error *
+execute_delete(struct ovsdb_txn *txn, const char *uuid,
+               struct ovsdb_table *table)
+{
+    struct json *where;
+    struct ovsdb_error *error;
+    struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER;
+    char where_string[UUID_LEN+29];
+
+    if (!table) {
+        return OVSDB_BUG("null table");
+    }
+
+    /* Build the condition [["_uuid","==",["uuid","<uuid>"]]].  The buffer
+     * is sized for exactly UUID_LEN characters of 'uuid'. */
+    snprintf(where_string, sizeof where_string, "%s%s%s",
+             "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]");
+
+    where = json_from_string(where_string);
+    error = ovsdb_condition_from_json(table->schema, where, NULL, &condition);
+    if (!error) {
+        struct delete_row_cbdata dr;
+
+        dr.n_matches = 0;
+        dr.table = table;
+        dr.txn = txn;
+        ovsdb_query(table, &condition, delete_row_cb, &dr);
+    }
+
+    /* The parsed JSON is only needed to build 'condition'. */
+    json_destroy(where);
+    ovsdb_condition_destroy(&condition);
+    return error;
+}
+
+/* Auxiliary data passed by execute_update() to update_row_cb(). */
+struct update_row_cbdata {
+    size_t n_matches;               /* Rows seen by the callback so far. */
+    struct ovsdb_txn *txn;          /* Transaction receiving the changes. */
+    const struct ovsdb_row *row;    /* Row holding the new column values. */
+    const struct ovsdb_column_set *columns; /* Columns to compare and copy. */
+};
+
+/* ovsdb_query() callback: counts the match and, if 'row' differs from the
+ * reference row in 'ur_' (a "struct update_row_cbdata") on the selected
+ * columns, copies those columns into a modifiable version of 'row' obtained
+ * from the transaction.  Always returns true. */
+static bool
+update_row_cb(const struct ovsdb_row *row, void *ur_)
+{
+    struct update_row_cbdata *cbdata = ur_;
+
+    cbdata->n_matches++;
+    if (ovsdb_row_equal_columns(row, cbdata->row, cbdata->columns)) {
+        return true;
+    }
+
+    struct ovsdb_row *rw_row = ovsdb_txn_row_modify(cbdata->txn, row);
+    ovsdb_row_update_columns(rw_row, cbdata->row, cbdata->columns);
+
+    return true;
+}
+
+/* Updates in 'table', as part of 'txn', the rows whose "_uuid" column equals
+ * 'uuid' (the UUID in string form) with the column values found in
+ * 'json_row'.
+ *
+ * Returns NULL on success, otherwise an error that the caller must free. */
+static struct ovsdb_error *
+execute_update(struct ovsdb_txn *txn, const char *uuid,
+               struct ovsdb_table *table, struct json *json_row)
+{
+    struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
+    struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER;
+    struct update_row_cbdata ur;
+    struct ovsdb_row *row;
+    struct ovsdb_error *error;
+    struct json *where;
+    char where_string[UUID_LEN+29];
+
+    if (!table) {
+        return OVSDB_BUG("null table");
+    }
+
+    /* Build the condition [["_uuid","==",["uuid","<uuid>"]]].  The buffer
+     * is sized for exactly UUID_LEN characters of 'uuid'. */
+    snprintf(where_string, sizeof where_string, "%s%s%s",
+             "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]");
+    where = json_from_string(where_string);
+
+    /* 'row' is a scratch row that only carries the new column values. */
+    row = ovsdb_row_create(table);
+    error = ovsdb_row_from_json(row, json_row, NULL, &columns);
+    if (!error) {
+        error = ovsdb_condition_from_json(table->schema, where, NULL,
+                                          &condition);
+    }
+    if (!error) {
+        ur.n_matches = 0;
+        ur.txn = txn;
+        ur.row = row;
+        ur.columns = &columns;
+        ovsdb_query(table, &condition, update_row_cb, &ur);
+    }
+
+    /* The parsed JSON is only needed to build 'condition'. */
+    json_destroy(where);
+    ovsdb_row_destroy(row);
+    ovsdb_column_set_destroy(&columns);
+    ovsdb_condition_destroy(&condition);
+
+    return error;
+}
+
+/* Prints the help text for the replication command-line options to
+ * stdout.  Only options accepted by ovsdb-server's option parser are
+ * listed. */
+void
+replication_usage(void)
+{
+    printf("\n\
+Syncing options:\n\
+  --sync-from=SERVER      sync databases from remote SERVER\n");
+}
diff --git a/ovsdb/replication.h b/ovsdb/replication.h
new file mode 100644
index 0000000..f9b7d63
--- /dev/null
+++ b/ovsdb/replication.h
@@ -0,0 +1,39 @@
+/*
+ * (c) Copyright 2016 Hewlett Packard Enterprise Development LP
+ * Copyright (c) 2009, 2010, 2012, 2013 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef REPLICATION_H
+#define REPLICATION_H 1
+
+#include "shash.h"
+
+/* One database served by ovsdb-server, as stored in the 'all_dbs' shash.
+ * Moved here from ovsdb-server.c so that replication.c can also reach the
+ * underlying "struct ovsdb" through the 'db' member. */
+struct db {
+    /* Initialized in main(). */
+    char *filename;
+    struct ovsdb_file *file;
+    struct ovsdb *db;
+
+    /* Only used by update_remote_status(). */
+    struct ovsdb_txn *txn;
+};
+
+/* Runs one step of replication against the "struct db"s in 'dbs'; meant to
+ * be called on every iteration of the server's main loop. */
+void replication_run(struct shash *dbs);
+/* Sets the server to replicate from; the string is copied. */
+void set_remote_ovsdb_server(const char *remote_server);
+/* Closes the connection to the remote server and clears replication
+ * state. */
+void disconnect_remote_server(void);
+/* Returns the database in 'all_dbs' whose schema name is 'db_name', or NULL
+ * if there is none. */
+const struct db *find_db(const struct shash *all_dbs, const char *db_name);
+/* Prints the help text for the replication options to stdout. */
+void replication_usage(void);
+
+#endif /* ovsdb/replication.h */
diff --git a/ovsdb/replication.man b/ovsdb/replication.man
new file mode 100644
index 0000000..26420bc
--- /dev/null
+++ b/ovsdb/replication.man
@@ -0,0 +1,8 @@
+The following options allow \fBovsdb\-server\fR to synchronize its databases
+with another running \fBovsdb\-server\fR.
+.TP
+\fB\-\-sync\-from=\fIserver\fR
+Causes \fBovsdb\-server\fR to synchronize its databases with the databases in
+\fIserver\fR.  Every transaction committed by \fIserver\fR will be replicated
+to \fBovsdb\-server\fR.  \fIserver\fR is an active connection method in one of
+the forms documented in \fBovsdb\-client\fR(1).
diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
index c869d6f..81209af 100644
--- a/tests/ovsdb-server.at
+++ b/tests/ovsdb-server.at
@@ -5,6 +5,11 @@ m4_define([OVSDB_SERVER_SHUTDOWN],
    AT_CHECK([ovs-appctl -t "`pwd`"/unixctl -e exit], [0], [ignore], [ignore])
    OVS_WAIT_WHILE([kill -0 `cat savepid`], [kill `cat savepid`])])
+# OVSDB_SERVER_SHUTDOWN2
+#
+# Counterpart of OVSDB_SERVER_SHUTDOWN for a second ovsdb-server whose
+# pidfile is "pid2" and whose control socket is "unixctl2": saves the pid,
+# asks the server to exit through ovs-appctl, and waits while the process
+# is still alive.
+m4_define([OVSDB_SERVER_SHUTDOWN2],
+  [cp pid2 savepid2
+   AT_CHECK([ovs-appctl -t "`pwd`"/unixctl2 -e exit], [0], [ignore], [ignore])
+   OVS_WAIT_WHILE([kill -0 `cat savepid2`], [kill `cat savepid2`])])
+
# OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
#
# Creates a database with the given SCHEMA, starts an ovsdb-server on
@@ -963,3 +968,49 @@ m4_define([OVSDB_CHECK_EXECUTION],
    AT_CLEANUP])
 EXECUTION_EXAMPLES
+
+AT_BANNER([OVSDB -- ovsdb-server replication (TCP IPv4 sockets)])
+
+# OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
+#
+# Creates two databases with the given SCHEMA, and starts an ovsdb-server on
+# each database.
+# Runs each of the TRANSACTIONS (which should be a quoted list of
+# quoted strings) against one of the servers with ovsdb-client one at a
+# time. The server replicates its database to the other ovsdb-server.
+#
+# Checks that the dumps of both databases are the same.
+#
+# TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS.
+m4_define([OVSDB_CHECK_EXECUTION],
+  [AT_SETUP([$1])
+   AT_KEYWORDS([ovsdb server tcp replication $5])
+   $2 > schema
+   AT_CHECK([ovsdb-tool create db1 schema], [0], [stdout], [ignore])
+   AT_CHECK([ovsdb-tool create db2 schema], [0], [stdout], [ignore])
+
+   AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server1.log --pidfile="`pwd`"/pid --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl db1], [0], [ignore], [ignore])
+   PARSE_LISTENING_PORT([ovsdb-server1.log], [TCP_PORT1])
+
+   AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server2.log --pidfile="`pwd`"/pid2 --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl2 --sync-from=tcp:127.0.0.1:$TCP_PORT1 db2], [0], [ignore], [ignore])
+   PARSE_LISTENING_PORT([ovsdb-server2.log], [TCP_PORT2])
+
+   m4_foreach([txn], [$3],
+     [AT_CHECK([ovsdb-client transact tcp:127.0.0.1:$TCP_PORT1 'txn'; sleep 2], [0], [stdout], [ignore],
+     [test ! -e pid || kill `cat pid`; test ! -e pid2 || kill `cat pid2`])
+   ])
+
+   AT_CHECK([ovsdb-client dump tcp:127.0.0.1:$TCP_PORT1], [0], [stdout], [ignore],
+     [test ! -e pid || kill `cat pid`; test ! -e pid2 || kill `cat pid2`])
+   cat stdout >> dump1
+   AT_CHECK([ovsdb-client dump tcp:127.0.0.1:$TCP_PORT2], [0], [stdout], [ignore],
+     [test ! -e pid || kill `cat pid`; test ! -e pid2 || kill `cat pid2`])
+   cat stdout >> dump2
+
+   AT_CHECK([diff dump1 dump2], [0], [], [ignore],
+            [test ! -e pid || kill `cat pid`; test ! -e pid2 || kill `cat pid2`])
+   OVSDB_SERVER_SHUTDOWN
+   OVSDB_SERVER_SHUTDOWN2
+   AT_CLEANUP])
+
+EXECUTION_EXAMPLES
--
1.9.1



More information about the dev mailing list