[ovs-dev] [replication SMv2 6/7] OVSDB: Reimplement replication. Using a state machine.
Ben Pfaff
blp at ovn.org
Tue Aug 30 21:35:43 UTC 2016
On Fri, Aug 26, 2016 at 04:15:53PM -0700, Andy Zhou wrote:
> Current replication uses blocking transactions, which are error prone
> in practice, especially in handling RPC connection flapping to the
> active server.
>
> Signed-off-by: Andy Zhou <azhou at ovn.org>
Much better. Thank you.
Acked-by: Ben Pfaff <blp at ovn.org>
Some suggestions below.
--8<--------------------------cut here-------------------------->8--
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index ed73559..0c879fe 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -1,4 +1,4 @@
-/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -191,8 +191,7 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
replication_run();
if (!replication_is_alive()) {
int retval = replication_get_last_error();
- ovs_fatal(0, "replication connection failed (%s)",
- ovs_retval_to_string(retval));
+ ovs_fatal(retval, "replication connection failed");
}
}
diff --git a/ovsdb/replication.c b/ovsdb/replication.c
index 16fd323..b9759b5 100644
--- a/ovsdb/replication.c
+++ b/ovsdb/replication.c
@@ -132,15 +132,13 @@ replication_add_local_db(const char *database, struct ovsdb *db)
void
replication_run(void)
{
- struct ovsdb *db;
if (!session) {
return;
}
jsonrpc_session_run(session);
- int i;
- for (i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
+ for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
struct jsonrpc_msg *msg;
unsigned int seqno;
@@ -171,7 +169,7 @@ replication_run(void)
&& msg->params->u.array.n == 2
&& msg->params->u.array.elems[0]->type == JSON_STRING) {
char *db_name = msg->params->u.array.elems[0]->u.string;
- db = find_db(db_name);
+ struct ovsdb *db = find_db(db_name);
if (db) {
struct ovsdb_error *error;
error = process_notification(msg->params->u.array.elems[1],
@@ -182,8 +180,13 @@ replication_run(void)
}
}
}
- } else if (msg->type == JSONRPC_REPLY &&
- request_ids_lookup_and_free(msg->id, &db)) {
+ } else if (msg->type == JSONRPC_REPLY) {
+ struct ovsdb *db;
+ if (!request_ids_lookup_and_free(msg->id, &db)) {
+ VLOG_WARN("received unexpected reply");
+ goto next;
+ }
+
switch (state) {
case RPL_S_DB_REQUESTED:
if (msg->result->type != JSON_ARRAY) {
@@ -199,7 +202,7 @@ replication_run(void)
if (name->type == JSON_STRING) {
/* Send one schema request for each remote DB. */
const char *db_name = json_string(name);
- db = find_db(db_name);
+ struct ovsdb *db = find_db(db_name);
if (db) {
struct jsonrpc_msg *request =
jsonrpc_create_request(
@@ -228,8 +231,8 @@ replication_run(void)
}
if (db != find_db(schema->name)) {
- /* Unexpceted schema */
- VLOG_WARN("unexpcted schema %s", schema->name);
+ /* Unexpected schema. */
+ VLOG_WARN("unexpected schema %s", schema->name);
state = RPL_S_ERR;
} else if (!ovsdb_schema_equal(schema, db->schema)) {
/* Schmea version mismatch. */
@@ -239,29 +242,29 @@ replication_run(void)
}
ovsdb_schema_destroy(schema);
- /* After receving schemas, reset the local databases that
+ /* After receiving schemas, reset the local databases that
* will be monitored and send out monitor requests for them. */
if (hmap_is_empty(&request_ids)) {
struct shash_node *node, *next;
- SHASH_FOR_EACH_SAFE(node, next, replication_dbs) {
- db = node->data;
+ SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
+ struct ovsdb *db = node->data;
struct ovsdb_error *error = reset_database(db);
if (error) {
const char *db_name = db->schema->name;
shash_find_and_delete(replication_dbs, db_name);
ovsdb_error_assert(error);
- VLOG_WARN("Failed to reset databse,"
- "%s not replicated", db_name);
+ VLOG_WARN("Failed to reset database, "
+ "%s not replicated.", db_name);
}
}
if (shash_is_empty(replication_dbs)) {
- VLOG_WARN("Nothing to replicate");
+ VLOG_WARN("Nothing to replicate.");
state = RPL_S_ERR;
} else {
SHASH_FOR_EACH (node, replication_dbs) {
- db = node->data;
+ struct ovsdb *db = node->data;
struct jsonrpc_msg *request =
create_monitor_request(db);
@@ -282,7 +285,7 @@ replication_run(void)
ovsdb_error_assert(error);
state = RPL_S_ERR;
} else {
- /* Transition to replicating state after receving
+ /* Transition to replicating state after receiving
* all replies of "monitor" requests. */
if (hmap_is_empty(&request_ids)) {
state = RPL_S_REPLICATING;
@@ -300,7 +303,8 @@ replication_run(void)
OVS_NOT_REACHED();
}
}
- jsonrpc_msg_destroy(msg);
+ next:
+ jsonrpc_msg_destroy(msg);
}
}
@@ -825,7 +829,7 @@ replication_db_clone(struct shash *dbs)
return new;
}
-/* Return true if replication just stared or is on going.
+/* Return true if replication just started or is ongoing.
* Return false if the connection failed, or the replication
* was not able to start. */
bool
@@ -851,7 +855,7 @@ replication_get_last_error(void)
if (session) {
err = jsonrpc_session_get_last_error(session);
if (!err) {
- err = (state == RPL_S_ERR) ? ENOENT : 0;
+ err = (state == RPL_S_ERR) ? ENOENT : 0;
}
}
More information about the dev mailing list