[ovs-dev] [PATCH 2/4] ovsdb: Introduce OVSDB replication feature
Cabrera Vega, Mario Alberto
mario.cabrera at hpe.com
Wed Apr 20 00:07:38 UTC 2016
Hi,
As part of this patch the following scenario takes place:
An ovsdb-server acts as a client of another ovsdb-server: Server A is monitoring
Server B tables, so Server B constantly sends update notifications to Server A.
When an update notification arrives at Server A, I'd like to process that notification
right away, as if it were a client request.
In order to wake up Server A from poll block when an update notification arrives,
how can I add the file descriptor corresponding to the jsonrpc receiving notifications
to the set of file descriptors used to wake up the server during a poll block?
Currently, Server A processes the incoming update notifications only when status_timer
in the server main loop has elapsed.
Could someone guide me on how to implement that?
Thanks,
-Mario
-----Original Message-----
From: dev [mailto:dev-bounces at openvswitch.org] On Behalf Of Cabrera Vega, Mario Alberto
Sent: Tuesday, March 29, 2016 3:30 PM
To: dev at openvswitch.org
Subject: [ovs-dev] [PATCH 2/4] ovsdb: Introduce OVSDB replication feature
Replication is enabled by using the following option when starting the database server:
--sync-from=server
Where 'server' can take any form described in the ovsdb-client(1) manpage as an active connection. If this option is specified, the replication process is immediately started.
Signed-off-by: Mario Cabrera <mario.cabrera at hpe.com>
---
ovsdb/automake.mk | 6 +-
ovsdb/ovsdb-server.1.in | 3 +
ovsdb/ovsdb-server.c | 46 ++--
ovsdb/replication-syn.man | 2 +
ovsdb/replication.c | 597 ++++++++++++++++++++++++++++++++++++++++++++++
ovsdb/replication.h | 39 +++
ovsdb/replication.man | 8 +
tests/ovsdb-server.at | 51 ++++
8 files changed, 725 insertions(+), 27 deletions(-) create mode 100644 ovsdb/replication-syn.man create mode 100644 ovsdb/replication.c create mode 100644 ovsdb/replication.h create mode 100644 ovsdb/replication.man
diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk index 7db6fea..099ed3c 100644
--- a/ovsdb/automake.mk
+++ b/ovsdb/automake.mk
@@ -24,6 +24,8 @@ ovsdb_libovsdb_la_SOURCES = \
ovsdb/monitor.h \
ovsdb/query.c \
ovsdb/query.h \
+ ovsdb/replication.c \
+ ovsdb/replication.h \
ovsdb/row.c \
ovsdb/row.h \
ovsdb/server.c \
@@ -42,7 +44,9 @@ pkgconfig_DATA += \
MAN_FRAGMENTS += \
ovsdb/remote-active.man \
- ovsdb/remote-passive.man
+ ovsdb/remote-passive.man \
+ ovsdb/replication.man \
+ ovsdb/replication-syn.man
# ovsdb-tool
bin_PROGRAMS += ovsdb/ovsdb-tool
diff --git a/ovsdb/ovsdb-server.1.in b/ovsdb/ovsdb-server.1.in index 6c85729..1025ade 100644
--- a/ovsdb/ovsdb-server.1.in
+++ b/ovsdb/ovsdb-server.1.in
@@ -19,6 +19,7 @@ ovsdb\-server \- Open vSwitch database server .so lib/daemon-syn.man .so lib/service-syn.man .so lib/vlog-syn.man
+.so ovsdb/replication-syn.man
.so lib/ssl-syn.man
.so lib/ssl-bootstrap-syn.man
.so lib/ssl-peer-ca-cert-syn.man
@@ -100,6 +101,8 @@ configured remotes.
.so lib/service.man
.SS "Logging Options"
.so lib/vlog.man
+.SS "Syncing Options"
+.so ovsdb/replication.man
.SS "Public Key Infrastructure Options"
The options described below for configuring the SSL public key infrastructure accept a special syntax for obtaining their diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c index fa662b1..63dd209 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -42,6 +42,7 @@
#include "ovsdb-error.h"
#include "poll-loop.h"
#include "process.h"
+#include "replication.h"
#include "row.h"
#include "simap.h"
#include "shash.h"
@@ -59,15 +60,7 @@
VLOG_DEFINE_THIS_MODULE(ovsdb_server);
-struct db {
- /* Initialized in main(). */
- char *filename;
- struct ovsdb_file *file;
- struct ovsdb *db;
-
- /* Only used by update_remote_status(). */
- struct ovsdb_txn *txn;
-};
+struct db;
/* SSL configuration. */
static char *private_key_file;
@@ -75,6 +68,9 @@ static char *certificate_file; static char *ca_cert_file; static bool bootstrap_ca_cert;
+/* Replication configuration. */
+static bool connect_to_remote_server;
+
static unixctl_cb_func ovsdb_server_exit; static unixctl_cb_func ovsdb_server_compact; static unixctl_cb_func ovsdb_server_reconnect; @@ -159,6 +155,10 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error);
ovsdb_jsonrpc_server_run(jsonrpc);
+ if (connect_to_remote_server) {
+ replication_run(all_dbs);
+ }
+
SHASH_FOR_EACH(node, all_dbs) {
struct db *db = node->data;
ovsdb_trigger_run(db->db, time_msec()); @@ -170,9 +170,9 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
}
}
- /* update Manager status(es) every 5 seconds */
+ /* update Manager status(es) every 2.5 seconds */
if (time_msec() >= status_timer) {
- status_timer = time_msec() + 5000;
+ status_timer = time_msec() + 2500;
update_remote_status(jsonrpc, remotes, all_dbs);
}
@@ -350,6 +350,7 @@ main(int argc, char *argv[])
sset_destroy(&remotes);
sset_destroy(&db_filenames);
unixctl_server_destroy(unixctl);
+ disconnect_remote_server();
if (run_process && process_exited(run_process)) {
int status = process_status(run_process); @@ -433,21 +434,6 @@ open_db(struct server_config *config, const char *filename)
return error;
}
-static const struct db *
-find_db(const struct shash *all_dbs, const char *db_name) -{
- struct shash_node *node;
-
- SHASH_FOR_EACH(node, all_dbs) {
- struct db *db = node->data;
- if (!strcmp(db->db->schema->name, db_name)) {
- return db;
- }
- }
-
- return NULL;
-}
-
static char * OVS_WARN_UNUSED_RESULT
parse_db_column__(const struct shash *all_dbs,
const char *name_, char *name, @@ -1278,6 +1264,7 @@ parse_options(int *argcp, char **argvp[],
OPT_RUN,
OPT_BOOTSTRAP_CA_CERT,
OPT_PEER_CA_CERT,
+ OPT_SYNC_FROM,
VLOG_OPTION_ENUMS,
DAEMON_OPTION_ENUMS
};
@@ -1296,6 +1283,7 @@ parse_options(int *argcp, char **argvp[],
{"private-key", required_argument, NULL, 'p'},
{"certificate", required_argument, NULL, 'c'},
{"ca-cert", required_argument, NULL, 'C'},
+ {"sync-from", required_argument, NULL, OPT_SYNC_FROM},
{NULL, 0, NULL, 0},
};
char *short_options = ovs_cmdl_long_options_to_short_options(long_options);
@@ -1356,6 +1344,11 @@ parse_options(int *argcp, char **argvp[],
stream_ssl_set_peer_ca_cert_file(optarg);
break;
+ case OPT_SYNC_FROM:
+ set_remote_ovsdb_server(optarg);
+ connect_to_remote_server = true;
+ break;
+
case '?':
exit(EXIT_FAILURE);
@@ -1382,6 +1375,7 @@ usage(void)
stream_usage("JSON-RPC", true, true, true);
daemon_usage();
vlog_usage();
+ replication_usage();
printf("\nOther options:\n"
" --run COMMAND run COMMAND as subprocess then exit\n"
" --unixctl=SOCKET override default control socket name\n"
diff --git a/ovsdb/replication-syn.man b/ovsdb/replication-syn.man new file mode 100644 index 0000000..adfd7c2
--- /dev/null
+++ b/ovsdb/replication-syn.man
@@ -0,0 +1,2 @@
+.IP "Syncing options:"
+[\fB\-\-sync\-from=\fIserver\fR]
diff --git a/ovsdb/replication.c b/ovsdb/replication.c new file mode 100644 index 0000000..d9e609e
--- /dev/null
+++ b/ovsdb/replication.c
@@ -0,0 +1,597 @@
+/*
+ * (c) Copyright 2016 Hewlett Packard Enterprise Development LP
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "replication.h"
+
+#include "condition.h"
+#include "json.h"
+#include "jsonrpc.h"
+#include "ovsdb.h"
+#include "ovsdb-error.h"
+#include "query.h"
+#include "row.h"
+#include "stream.h"
+#include "sset.h"
+#include "svec.h"
+#include "table.h"
+#include "transaction.h"
+#include "util.h"
+
+static char *remote_ovsdb_server;
+static struct jsonrpc *rpc;
+static struct sset monitored_tables =
+SSET_INITIALIZER(&monitored_tables);
+static bool reset_dbs = true;
+
+static struct jsonrpc *open_jsonrpc(const char *server); static struct
+ovsdb_error *check_jsonrpc_error(int error,
+ struct jsonrpc_msg
+**reply_); static void fetch_dbs(struct jsonrpc *rpc, struct svec
+*dbs); static struct ovsdb_schema *fetch_schema(struct jsonrpc *rpc,
+ const char *database);
+
+static void send_monitor_requests(struct shash *all_dbs); static void
+add_monitored_table(struct ovsdb_table_schema *table,
+ struct json *monitor_requests);
+
+static void get_initial_db_state(const struct db *database); static
+void reset_database(struct ovsdb *db, struct ovsdb_txn *txn); static
+struct ovsdb_error *reset_databases(struct shash *all_dbs);
+
+static void check_for_notifications(struct shash *all_dbs); static void
+process_notification(struct json *table_updates,
+ struct ovsdb *database); static struct
+ovsdb_error *process_table_update(struct json *table_update,
+ const char *table_name,
+ struct ovsdb *database,
+ struct ovsdb_txn *txn);
+
+static struct ovsdb_error *execute_insert(struct ovsdb_txn *txn,
+ const char *uuid,
+ struct ovsdb_table *table,
+ struct json *new); static
+struct ovsdb_error *execute_delete(struct ovsdb_txn *txn,
+ const char *uuid,
+ struct ovsdb_table *table);
+static struct ovsdb_error *execute_update(struct ovsdb_txn *txn,
+ const char *uuid,
+ struct ovsdb_table *table,
+ struct json *new);
+
+/* Runs one iteration of replication for the databases in 'all_dbs'.
+ *
+ * While no table is being monitored and a remote server is configured, the
+ * first call(s) empty the local databases; a later call opens the JSON-RPC
+ * connection to the remote server and sends the monitor requests.  Once
+ * tables are being monitored, each call polls the connection for one
+ * message. */
+void
+replication_run(struct shash *all_dbs)
+{
+    if (sset_is_empty(&monitored_tables) && remote_ovsdb_server) {
+        /* Reset local databases. */
+        if (reset_dbs) {
+            struct ovsdb_error *error = reset_databases(all_dbs);
+            if (!error) {
+                reset_dbs = false;
+            }
+            /* The error only serves as a success indicator here.  Free it
+             * so that a failed reset, which is retried on the next call,
+             * does not leak. */
+            ovsdb_error_destroy(error);
+
+            /* Return in either case: after a successful reset this gives
+             * the caller a chance to notify monitors; after a failure the
+             * reset is attempted again on the next call. */
+            return;
+        }
+
+        /* Open JSON-RPC. */
+        jsonrpc_close(rpc);
+        rpc = open_jsonrpc(remote_ovsdb_server);
+        if (!rpc) {
+            return;
+        }
+
+        /* Send monitor requests. */
+        send_monitor_requests(all_dbs);
+    }
+    if (!sset_is_empty(&monitored_tables)) {
+        check_for_notifications(all_dbs);
+    }
+}
+
+/* Records 'remote_server' as the server to replicate from, replacing any
+ * previously configured value.  The string is copied, so the caller keeps
+ * ownership of 'remote_server'.  Passing NULL clears the setting. */
+void
+set_remote_ovsdb_server(const char *remote_server)
+{
+    /* Release an earlier value so that setting the server twice does not
+     * leak the first copy. */
+    free(remote_ovsdb_server);
+    remote_ovsdb_server = remote_server ? xstrdup(remote_server) : NULL;
+}
+
+/* Tears down replication: closes the JSON-RPC connection, forgets the
+ * monitored tables and the configured remote server.
+ *
+ * This may be called more than once (it is invoked both when the remote
+ * server goes away and at server shutdown), so every piece of state is left
+ * in a form that is safe to tear down again. */
+void
+disconnect_remote_server(void)
+{
+    jsonrpc_close(rpc);
+    rpc = NULL;                 /* Avoid closing the same session twice. */
+
+    sset_destroy(&monitored_tables);
+    sset_init(&monitored_tables);   /* Keep the set usable after destroy. */
+
+    free(remote_ovsdb_server);
+    remote_ovsdb_server = NULL;
+}
+
+/* Searches 'all_dbs' for the database whose schema is named 'db_name'.
+ * Returns the matching entry, or NULL if no database has that name. */
+const struct db *
+find_db(const struct shash *all_dbs, const char *db_name)
+{
+    struct shash_node *entry;
+
+    SHASH_FOR_EACH(entry, all_dbs) {
+        struct db *candidate = entry->data;
+        const char *schema_name = candidate->db->schema->name;
+
+        if (strcmp(schema_name, db_name) != 0) {
+            continue;
+        }
+        return candidate;
+    }
+
+    return NULL;
+}
+
+/* Deletes every row of every database in 'all_dbs', committing one
+ * transaction per database.
+ *
+ * Returns NULL if all databases were emptied.  Otherwise stops at the first
+ * database whose transaction fails to commit and returns that error, which
+ * the caller must free. */
+static struct ovsdb_error *
+reset_databases(struct shash *all_dbs)
+{
+    struct shash_node *db_node;
+
+    SHASH_FOR_EACH(db_node, all_dbs) {
+        struct db *db = db_node->data;
+        struct ovsdb_txn *txn = ovsdb_txn_create(db->db);
+        struct ovsdb_error *error;
+
+        reset_database(db->db, txn);
+        error = ovsdb_txn_commit(txn, false);
+        if (error) {
+            /* Report the failure instead of letting a later, successful
+             * commit overwrite (and leak) it. */
+            return error;
+        }
+    }
+
+    return NULL;
+}
+
+/* Schedules, within 'txn', the deletion of every row in every table of
+ * 'db'.  The caller is responsible for committing 'txn'. */
+static void
+reset_database(struct ovsdb *db, struct ovsdb_txn *txn)
+{
+    struct shash_node *entry;
+
+    SHASH_FOR_EACH(entry, &db->tables) {
+        struct ovsdb_table *tbl = entry->data;
+        struct ovsdb_row *victim;
+
+        HMAP_FOR_EACH (victim, hmap_node, &tbl->rows) {
+            ovsdb_txn_row_delete(txn, victim);
+        }
+    }
+}
+
+/* Opens a blocking stream connection to 'server' and wraps it in a JSON-RPC
+ * session.  Returns the new session, or NULL if the connection could not be
+ * established. */
+static struct jsonrpc *
+open_jsonrpc(const char *server)
+{
+    struct stream *conn;
+    int open_status = jsonrpc_stream_open(server, &conn, DSCP_DEFAULT);
+
+    if (stream_open_block(open_status, &conn)) {
+        return NULL;
+    }
+    return jsonrpc_open(conn);
+}
+
+/* Converts the outcome of a JSON-RPC transaction into an ovsdb_error.
+ *
+ * 'error' is the status returned by the transport and '*reply_' the reply
+ * message, which is only examined when 'error' is 0.  Returns NULL if the
+ * transaction succeeded, otherwise a newly allocated error that the caller
+ * must free.  '*reply_' is not modified or destroyed. */
+static struct ovsdb_error *
+check_jsonrpc_error(int error, struct jsonrpc_msg **reply_)
+{
+    struct jsonrpc_msg *reply = *reply_;
+
+    if (error) {
+        return ovsdb_error("transaction failed",
+                           "transaction returned error %d",
+                           error);
+    }
+
+    if (reply->error) {
+        /* json_to_string() returns a malloc'ed string; free it once it has
+         * been formatted into the error. */
+        char *details = json_to_string(reply->error, 0);
+        struct ovsdb_error *rpc_error;
+
+        rpc_error = ovsdb_error("transaction failed",
+                                "transaction returned error: %s",
+                                details);
+        free(details);
+        return rpc_error;
+    }
+    return NULL;
+}
+
+/* Asks the server behind 'rpc' for its database names ("list_dbs") and adds
+ * them, sorted, to 'dbs'.
+ *
+ * Failures are logged through ovsdb_error_assert().  If the request fails or
+ * the reply is not an array, 'dbs' is left unchanged; array elements that
+ * are not strings are reported and skipped. */
+static void
+fetch_dbs(struct jsonrpc *rpc, struct svec *dbs)
+{
+    struct jsonrpc_msg *request, *reply;
+    struct ovsdb_error *error;
+    size_t i;
+
+    request = jsonrpc_create_request("list_dbs", json_array_create_empty(),
+                                     NULL);
+
+    error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
+                                &reply);
+    if (error) {
+        /* The reply may still exist when the server answered with an
+         * error; do not leak it. */
+        jsonrpc_msg_destroy(reply);
+        ovsdb_error_assert(error);
+        return;
+    }
+
+    if (reply->result->type != JSON_ARRAY) {
+        ovsdb_error_assert(ovsdb_error("list-dbs failed",
+                                       "list_dbs response is not array"));
+        jsonrpc_msg_destroy(reply);
+        return;
+    }
+
+    for (i = 0; i < reply->result->u.array.n; i++) {
+        const struct json *name = reply->result->u.array.elems[i];
+
+        if (name->type != JSON_STRING) {
+            ovsdb_error_assert(ovsdb_error(
+                                   "list_dbs failed",
+                                   "list_dbs response %"PRIuSIZE" is not string",
+                                   i));
+            /* Reading u.string from a non-string value would be invalid. */
+            continue;
+        }
+        svec_add(dbs, name->u.string);
+    }
+    jsonrpc_msg_destroy(reply);
+    svec_sort(dbs);
+}
+
+/* Requests the schema of 'database' from the server behind 'rpc'
+ * ("get_schema") and parses it.
+ *
+ * Returns the parsed schema, or NULL if the request or the parsing failed;
+ * failures are reported through ovsdb_error_assert(). */
+static struct ovsdb_schema *
+fetch_schema(struct jsonrpc *rpc, const char *database)
+{
+    struct ovsdb_schema *schema = NULL;
+    struct jsonrpc_msg *reply;
+    struct ovsdb_error *err;
+    struct json *params = json_array_create_1(json_string_create(database));
+    struct jsonrpc_msg *req = jsonrpc_create_request("get_schema", params,
+                                                     NULL);
+
+    err = check_jsonrpc_error(jsonrpc_transact_block(rpc, req, &reply),
+                              &reply);
+    if (!err) {
+        err = ovsdb_schema_from_json(reply->result, &schema);
+    }
+
+    /* The reply is no longer needed on any path from here on. */
+    jsonrpc_msg_destroy(reply);
+    if (err) {
+        ovsdb_error_assert(err);
+        return NULL;
+    }
+
+    return schema;
+}
+
+/* For each database offered by the remote server that also exists locally
+ * in 'all_dbs' with an identical schema, sends a "monitor" request covering
+ * all of its tables and then loads the initial contents from the reply.
+ *
+ * Databases that are unknown locally, whose remote schema cannot be fetched,
+ * or whose schema differs are skipped. */
+static void
+send_monitor_requests(struct shash *all_dbs)
+{
+    const char *db_name;
+    struct svec dbs;
+    size_t i;
+
+    svec_init(&dbs);
+    fetch_dbs(rpc, &dbs);
+    SVEC_FOR_EACH (i, db_name, &dbs) {
+        const struct db *database = find_db(all_dbs, db_name);
+        struct ovsdb_schema *local_schema, *remote_schema;
+
+        if (!database) {
+            continue;
+        }
+
+        local_schema = database->db->schema;
+        remote_schema = fetch_schema(rpc, db_name);
+        if (!remote_schema) {
+            /* fetch_schema() already reported the failure; comparing
+             * against a null schema would crash. */
+            continue;
+        }
+
+        if (ovsdb_schema_equal(local_schema, remote_schema)) {
+            struct jsonrpc_msg *request;
+            struct json *monitor, *monitor_request;
+            size_t n = shash_count(&local_schema->tables);
+            const struct shash_node **nodes = shash_sort(
+                &local_schema->tables);
+            size_t j;
+
+            monitor_request = json_object_create();
+            for (j = 0; j < n; j++) {
+                struct ovsdb_table_schema *table = nodes[j]->data;
+                add_monitored_table(table, monitor_request);
+            }
+            free(nodes);
+
+            /* Send monitor request.  The database name doubles as the
+             * monitor identifier, which is how "update" notifications are
+             * later matched to a local database. */
+            monitor = json_array_create_3(
+                json_string_create(db_name),
+                json_string_create(db_name),
+                monitor_request);
+            request = jsonrpc_create_request("monitor", monitor, NULL);
+            jsonrpc_send(rpc, request);
+            get_initial_db_state(database);
+        }
+        ovsdb_schema_destroy(remote_schema);
+    }
+    svec_destroy(&dbs);
+}
+
+/* Waits for the next message on 'rpc' after a "monitor" request and, if it
+ * is a reply, applies its result (the initial table contents) to
+ * 'database'.
+ *
+ * Does nothing if receiving fails.  NOTE(review): the message is assumed to
+ * be the reply to the monitor request just sent; its id is not checked. */
+static void
+get_initial_db_state(const struct db *database)
+{
+    struct jsonrpc_msg *msg;
+
+    if (jsonrpc_recv_block(rpc, &msg)) {
+        /* No message was received, so there is nothing to process. */
+        return;
+    }
+
+    if (msg->type == JSONRPC_REPLY) {
+        process_notification(msg->result, database->db);
+    }
+    jsonrpc_msg_destroy(msg);
+}
+
+/* Registers 'table' as monitored and adds an entry for it to
+ * 'monitor_request': the table name mapped to an array holding a single
+ * empty JSON object. */
+static void
+add_monitored_table(struct ovsdb_table_schema *table,
+                    struct json *monitor_request)
+{
+    const char *name = table->name;
+    struct json *requests = json_array_create_1(json_object_create());
+
+    sset_add(&monitored_tables, name);
+    json_object_put(monitor_request, name, requests);
+}
+
+/* Polls 'rpc' for one message without blocking and handles it:
+ *
+ *   - "echo" requests are answered with their own parameters.
+ *   - "update" notifications are applied to the local database in 'all_dbs'
+ *     named by the first parameter.
+ *   - Anything else is ignored.
+ *
+ * If receiving fails for a reason other than EAGAIN, the connection is
+ * closed and reopened; if reopening fails too, replication is torn down. */
+static void
+check_for_notifications(struct shash *all_dbs)
+{
+    struct jsonrpc_msg *msg;
+    int error;
+
+    error = jsonrpc_recv(rpc, &msg);
+    if (error == EAGAIN) {
+        return;
+    } else if (error) {
+        /* Close the failed session before replacing it so that it is not
+         * leaked.
+         *
+         * NOTE(review): the new session does not re-send the monitor
+         * requests, so no further updates will arrive on it -- confirm
+         * whether re-subscription is intended here. */
+        jsonrpc_close(rpc);
+        rpc = open_jsonrpc(remote_ovsdb_server);
+        if (!rpc) {
+            /* Remote server went down. */
+            disconnect_remote_server();
+        }
+        return;
+    }
+
+    if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
+        jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
+                                               msg->id));
+    } else if (msg->type == JSONRPC_NOTIFY
+               && !strcmp(msg->method, "update")) {
+        struct json *params = msg->params;
+
+        /* Only read the database name once it is known to be a string. */
+        if (params->type == JSON_ARRAY
+            && params->u.array.n == 2
+            && params->u.array.elems[0]->type == JSON_STRING) {
+            const char *db_name = params->u.array.elems[0]->u.string;
+            const struct db *database = find_db(all_dbs, db_name);
+            if (database) {
+                process_notification(params->u.array.elems[1], database->db);
+            }
+        }
+    }
+    jsonrpc_msg_destroy(msg);
+    jsonrpc_run(rpc);
+}
+
+/* Applies 'table_updates', a JSON object mapping table names to
+ * <table-update> objects, to 'database' in a single transaction.
+ *
+ * If 'table_updates' is not an object, or if applying or committing the
+ * updates fails, the set of monitored tables is cleared; failures while
+ * applying or committing are also reported through ovsdb_error_assert().
+ * 'table_updates' is not destroyed. */
+static void
+process_notification(struct json *table_updates, struct ovsdb *database)
+{
+    struct ovsdb_error *error = NULL;
+    struct ovsdb_txn *txn;
+    struct shash_node *node;
+
+    if (table_updates->type != JSON_OBJECT) {
+        sset_clear(&monitored_tables);
+        return;
+    }
+
+    txn = ovsdb_txn_create(database);
+
+    /* Process each table update. */
+    SHASH_FOR_EACH(node, json_object(table_updates)) {
+        struct json *table_update = node->data;
+        if (table_update) {
+            error = process_table_update(table_update, node->name, database,
+                                         txn);
+            if (error) {
+                break;
+            }
+        }
+    }
+
+    if (error) {
+        ovsdb_txn_abort(txn);
+    } else {
+        /* Commit transaction. */
+        error = ovsdb_txn_commit(txn, false);
+    }
+
+    if (error) {
+        /* ovsdb_error_assert() logs and frees the error, so it must not be
+         * destroyed a second time afterwards. */
+        ovsdb_error_assert(error);
+        sset_clear(&monitored_tables);
+    }
+}
+
+/* Applies 'table_update', a JSON object mapping row UUIDs to <row-update>
+ * objects, to the table named 'table_name' of 'database' within 'txn'.
+ *
+ * For each row: without "old" the row is inserted, with "old" but without
+ * "new" it is deleted, and with both it is updated.
+ *
+ * Returns NULL on success.  On the first failure, stops and returns an
+ * error that the caller must free. */
+static struct ovsdb_error *
+process_table_update(struct json *table_update, const char *table_name,
+                     struct ovsdb *database, struct ovsdb_txn *txn)
+{
+    struct shash_node *node;
+    struct ovsdb_table *table;
+    struct ovsdb_error *error = NULL;
+
+    if (table_update->type != JSON_OBJECT) {
+        /* Return immediately: json_object() below requires an object. */
+        return ovsdb_error("Not a JSON object",
+                           "<table-update> for table is not object");
+    }
+
+    table = ovsdb_get_table(database, table_name);
+    if (!table) {
+        return ovsdb_error("unknown table",
+                           "table \"%s\" is not in the local database",
+                           table_name);
+    }
+
+    SHASH_FOR_EACH (node, json_object(table_update)) {
+        struct json *row_update = node->data;
+        struct json *old, *new;
+
+        if (row_update->type != JSON_OBJECT) {
+            error = ovsdb_error("Not a JSON object",
+                                "<row-update> is not object");
+            break;
+        }
+        old = shash_find_data(json_object(row_update), "old");
+        new = shash_find_data(json_object(row_update), "new");
+
+        if (!old) {
+            error = execute_insert(txn, node->name, table, new);
+        } else if (!new) {
+            error = execute_delete(txn, node->name, table);
+        } else {
+            error = execute_update(txn, node->name, table, new);
+        }
+
+        if (error) {
+            /* Stop here so that the error is neither overwritten nor
+             * leaked by a later row. */
+            break;
+        }
+    }
+    return error;
+}
+
+/* Inserts into 'table', within 'txn', a row with UUID 'uuid' (in string
+ * form) whose columns are taken from 'json_row'.
+ *
+ * Returns NULL on success, otherwise an error that the caller must free.
+ * Nothing is inserted on failure. */
+static struct ovsdb_error *
+execute_insert(struct ovsdb_txn *txn, const char *uuid,
+               struct ovsdb_table *table, struct json *json_row)
+{
+    struct ovsdb_row *row;
+    struct uuid row_uuid;
+    struct ovsdb_error *error;
+
+    if (!table) {
+        return OVSDB_BUG("null table");
+    }
+    if (!json_row) {
+        /* A <row-update> with neither "old" nor "new" ends up here. */
+        return ovsdb_error("bad row update",
+                           "<row-update> has neither \"old\" nor \"new\"");
+    }
+    if (!uuid_from_string(&row_uuid, uuid)) {
+        return ovsdb_error("bad UUID", "\"%s\" is not a valid UUID", uuid);
+    }
+
+    row = ovsdb_row_create(table);
+    error = ovsdb_row_from_json(row, json_row, NULL, NULL);
+    if (!error) {
+        /* Add UUID to row. */
+        *ovsdb_row_get_uuid_rw(row) = row_uuid;
+        ovsdb_txn_row_insert(txn, row);
+    } else {
+        ovsdb_row_destroy(row);
+    }
+
+    return error;
+}
+
+/* Auxiliary data passed to delete_row_cb() through ovsdb_query(). */
+struct delete_row_cbdata {
+    size_t n_matches;                   /* Rows visited by the callback. */
+    const struct ovsdb_table *table;
+    struct ovsdb_txn *txn;              /* Transaction receiving deletes. */
+};
+
+/* ovsdb_query() callback: schedules 'row' for deletion in the transaction
+ * carried by 'dr_' and counts the match.  Always returns true. */
+static bool
+delete_row_cb(const struct ovsdb_row *row, void *dr_)
+{
+    struct delete_row_cbdata *cbdata = dr_;
+
+    ovsdb_txn_row_delete(cbdata->txn, row);
+    cbdata->n_matches += 1;
+
+    return true;
+}
+
+/* Deletes from 'table', within 'txn', the row whose UUID is 'uuid' (in
+ * string form).  The row is located with a ["_uuid", "==", ...] condition.
+ *
+ * Returns NULL on success (including when no row matches), otherwise an
+ * error that the caller must free. */
+static struct ovsdb_error *
+execute_delete(struct ovsdb_txn *txn, const char *uuid,
+               struct ovsdb_table *table)
+{
+    struct json *where;
+    struct ovsdb_error *error;
+    struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER;
+    /* UUID text plus the 28 characters of fixed JSON around it and the
+     * terminating null byte. */
+    char where_string[UUID_LEN+29];
+
+    if (!table) {
+        return OVSDB_BUG("null table");
+    }
+
+    snprintf(where_string, sizeof where_string, "%s%s%s",
+             "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]");
+
+    where = json_from_string(where_string);
+    error = ovsdb_condition_from_json(table->schema, where, NULL, &condition);
+    if (!error) {
+        struct delete_row_cbdata dr;
+
+        dr.n_matches = 0;
+        dr.table = table;
+        dr.txn = txn;
+        ovsdb_query(table, &condition, delete_row_cb, &dr);
+    }
+
+    /* The parsed condition JSON is owned by this function. */
+    json_destroy(where);
+    ovsdb_condition_destroy(&condition);
+    return error;
+}
+
+/* Auxiliary data passed to update_row_cb() through ovsdb_query(). */
+struct update_row_cbdata {
+    size_t n_matches;                       /* Rows visited so far. */
+    struct ovsdb_txn *txn;                  /* Transaction to modify in. */
+    const struct ovsdb_row *row;            /* Source of the new values. */
+    const struct ovsdb_column_set *columns; /* Columns to copy. */
+};
+
+/* ovsdb_query() callback: counts the match and, when 'row' differs from the
+ * replacement row in the selected columns, copies those columns into a
+ * modifiable version of 'row' obtained from the transaction.  Always returns
+ * true. */
+static bool
+update_row_cb(const struct ovsdb_row *row, void *ur_)
+{
+    struct update_row_cbdata *cbdata = ur_;
+    struct ovsdb_row *writable;
+
+    cbdata->n_matches++;
+    if (ovsdb_row_equal_columns(row, cbdata->row, cbdata->columns)) {
+        return true;
+    }
+
+    writable = ovsdb_txn_row_modify(cbdata->txn, row);
+    ovsdb_row_update_columns(writable, cbdata->row, cbdata->columns);
+    return true;
+}
+
+/* Updates, within 'txn', the row of 'table' whose UUID is 'uuid' (in string
+ * form) with the column values found in 'json_row'.  The row is located
+ * with a ["_uuid", "==", ...] condition.
+ *
+ * Returns NULL on success (including when no row matches), otherwise an
+ * error that the caller must free. */
+static struct ovsdb_error *
+execute_update(struct ovsdb_txn *txn, const char *uuid,
+               struct ovsdb_table *table, struct json *json_row)
+{
+    struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
+    struct ovsdb_condition condition = OVSDB_CONDITION_INITIALIZER;
+    struct update_row_cbdata ur;
+    struct ovsdb_row *row;
+    struct ovsdb_error *error;
+    struct json *where;
+    /* UUID text plus the 28 characters of fixed JSON around it and the
+     * terminating null byte. */
+    char where_string[UUID_LEN+29];
+
+    if (!table) {
+        return OVSDB_BUG("null table");
+    }
+
+    snprintf(where_string, sizeof where_string, "%s%s%s",
+             "[[\"_uuid\",\"==\",[\"uuid\",\"",uuid,"\"]]]");
+    where = json_from_string(where_string);
+
+    row = ovsdb_row_create(table);
+    error = ovsdb_row_from_json(row, json_row, NULL, &columns);
+    if (!error) {
+        error = ovsdb_condition_from_json(table->schema, where, NULL,
+                                          &condition);
+    }
+    if (!error) {
+        ur.n_matches = 0;
+        ur.txn = txn;
+        ur.row = row;
+        ur.columns = &columns;
+        ovsdb_query(table, &condition, update_row_cb, &ur);
+    }
+
+    /* The parsed condition JSON is owned by this function. */
+    json_destroy(where);
+    ovsdb_row_destroy(row);
+    ovsdb_column_set_destroy(&columns);
+    ovsdb_condition_destroy(&condition);
+
+    return error;
+}
+
+/* Prints the command-line help for the replication options.  Only options
+ * that the option parser actually accepts are listed. */
+void
+replication_usage(void)
+{
+    printf("\n\
+Syncing options:\n\
+  --sync-from=SERVER      sync databases from remote SERVER\n");
+}
diff --git a/ovsdb/replication.h b/ovsdb/replication.h new file mode 100644 index 0000000..f9b7d63
--- /dev/null
+++ b/ovsdb/replication.h
@@ -0,0 +1,39 @@
+/*
+ * (c) Copyright 2016 Hewlett Packard Enterprise Development LP
+ * Copyright (c) 2009, 2010, 2012, 2013 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef REPLICATION_H
+#define REPLICATION_H 1
+
+#include "shash.h"
+
+struct db {
+    /* State of one database served by ovsdb-server.  The definition lives
+     * in this header so that replication.c can reach the underlying
+     * 'struct ovsdb' through the 'db' member.
+     *
+     * Initialized in main(). */
+    char *filename;
+    struct ovsdb_file *file;
+    struct ovsdb *db;
+
+    /* Only used by update_remote_status(). */
+    struct ovsdb_txn *txn;
+};
+
+void replication_run(struct shash *dbs); void
+set_remote_ovsdb_server(const char *remote_server); void
+disconnect_remote_server(void); const struct db *find_db(const struct
+shash *all_dbs, const char *db_name); void replication_usage(void);
+
+#endif /* ovsdb/replication.h */
diff --git a/ovsdb/replication.man b/ovsdb/replication.man new file mode 100644 index 0000000..26420bc
--- /dev/null
+++ b/ovsdb/replication.man
@@ -0,0 +1,8 @@
+The following options allow \fBovsdb\-server\fR to synchronize its
+databases with another running \fBovsdb\-server\fR.
+.TP
+\fB\-\-sync\-from=\fIserver\fR
+Causes \fBovsdb\-server\fR to synchronize its databases with the
+databases in \fIserver\fR. Every transaction committed by \fIserver\fR
+will be replicated to \fBovsdb\-server\fR. \fIserver\fR is an active
+connection method in one of the forms documented in \fBovsdb\-client\fR(1).
diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at index c869d6f..81209af 100644
--- a/tests/ovsdb-server.at
+++ b/tests/ovsdb-server.at
@@ -5,6 +5,11 @@ m4_define([OVSDB_SERVER_SHUTDOWN],
AT_CHECK([ovs-appctl -t "`pwd`"/unixctl -e exit], [0], [ignore], [ignore])
OVS_WAIT_WHILE([kill -0 `cat savepid`], [kill `cat savepid`])])
+m4_define([OVSDB_SERVER_SHUTDOWN2],
+ [cp pid2 savepid2
+ AT_CHECK([ovs-appctl -t "`pwd`"/unixctl2 -e exit], [0], [ignore], [ignore])
+ OVS_WAIT_WHILE([kill -0 `cat savepid2`], [kill `cat savepid2`])])
+
# OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS]) # # Creates a database with the given SCHEMA, starts an ovsdb-server on @@ -963,3 +968,49 @@ m4_define([OVSDB_CHECK_EXECUTION],
AT_CLEANUP])
EXECUTION_EXAMPLES
+
+AT_BANNER([OVSDB -- ovsdb-server replication (TCP IPv4 sockets)])
+
+# OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS])
+#
+# Creates two databases with the given SCHEMA, and starts an ovsdb-server on
+# each database.
+# Runs each of the TRANSACTIONS (which should be a quoted list of
+# quoted strings) against one of the servers with ovsdb-client one at a
+# time.  The server replicates its database to the other ovsdb-server.
+#
+# Checks that the dumps of both databases are the same.
+#
+# TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS.
+m4_define([OVSDB_CHECK_EXECUTION],
+ [AT_SETUP([$1])
+ AT_KEYWORDS([ovsdb server tcp replication $5])
+ $2 > schema
+ AT_CHECK([ovsdb-tool create db1 schema], [0], [stdout], [ignore])
+ AT_CHECK([ovsdb-tool create db2 schema], [0], [stdout], [ignore])
+
+ AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server1.log --pidfile="`pwd`"/pid --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl db1], [0], [ignore], [ignore])
+ PARSE_LISTENING_PORT([ovsdb-server1.log], [TCP_PORT1])
+
+ AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server2.log --pidfile="`pwd`"/pid2 --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl2 --sync-from=tcp:127.0.0.1:$TCP_PORT1 db2], [0], [ignore], [ignore])
+ PARSE_LISTENING_PORT([ovsdb-server2.log], [TCP_PORT2])
+
+ m4_foreach([txn], [$3],
+ [AT_CHECK([ovsdb-client transact tcp:127.0.0.1:$TCP_PORT1 'txn'; sleep 2], [0], [stdout], [ignore],
+ [test ! -e pid || kill `cat pid`; test ! -e pid2 || kill `cat pid2`])
+ ])
+
+ AT_CHECK([ovsdb-client dump tcp:127.0.0.1:$TCP_PORT1], [0], [stdout], [ignore],
+ [test ! -e pid || kill `cat pid`; test ! -e pid2 || kill `cat pid2`])
+ cat stdout >> dump1
+ AT_CHECK([ovsdb-client dump tcp:127.0.0.1:$TCP_PORT2], [0], [stdout], [ignore],
+ [test ! -e pid || kill `cat pid`; test ! -e pid2 || kill `cat pid2`])
+ cat stdout >> dump2
+
+ AT_CHECK([diff dump1 dump2], [0], [], [ignore],
+ [test ! -e pid || kill `cat pid`; test ! -e pid2 || kill `cat pid2`])
+ OVSDB_SERVER_SHUTDOWN
+ OVSDB_SERVER_SHUTDOWN2
+ AT_CLEANUP])
+
+EXECUTION_EXAMPLES
--
1.9.1
_______________________________________________
dev mailing list
dev at openvswitch.org
http://openvswitch.org/mailman/listinfo/dev
More information about the dev
mailing list