[ovs-dev] [replication SMv2 6/7] OVSDB: Reimplement replication using a state machine.

Ben Pfaff blp at ovn.org
Tue Aug 30 21:35:43 UTC 2016


On Fri, Aug 26, 2016 at 04:15:53PM -0700, Andy Zhou wrote:
> Current replication uses blocking transactions, which are error-prone
> in practice, especially in handling RPC connection flapping to the
> active server.
> 
> Signed-off-by: Andy Zhou <azhou at ovn.org>

Much better.  Thank you.

Acked-by: Ben Pfaff <blp at ovn.org>

Some suggestions below.

--8<--------------------------cut here-------------------------->8--

diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index ed73559..0c879fe 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -1,4 +1,4 @@
-/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -191,8 +191,7 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
             replication_run();
             if (!replication_is_alive()) {
                 int retval = replication_get_last_error();
-                ovs_fatal(0, "replication connection failed (%s)",
-                          ovs_retval_to_string(retval));
+                ovs_fatal(retval, "replication connection failed");
             }
         }
 
diff --git a/ovsdb/replication.c b/ovsdb/replication.c
index 16fd323..b9759b5 100644
--- a/ovsdb/replication.c
+++ b/ovsdb/replication.c
@@ -132,15 +132,13 @@ replication_add_local_db(const char *database, struct ovsdb *db)
 void
 replication_run(void)
 {
-    struct ovsdb *db;
     if (!session) {
         return;
     }
 
     jsonrpc_session_run(session);
 
-    int i;
-    for (i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
+    for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
         struct jsonrpc_msg *msg;
         unsigned int seqno;
 
@@ -171,7 +169,7 @@ replication_run(void)
                 && msg->params->u.array.n == 2
                 && msg->params->u.array.elems[0]->type == JSON_STRING) {
                 char *db_name = msg->params->u.array.elems[0]->u.string;
-                db = find_db(db_name);
+                struct ovsdb *db = find_db(db_name);
                 if (db) {
                     struct ovsdb_error *error;
                     error = process_notification(msg->params->u.array.elems[1],
@@ -182,8 +180,13 @@ replication_run(void)
                     }
                 }
             }
-        } else if (msg->type == JSONRPC_REPLY &&
-                   request_ids_lookup_and_free(msg->id, &db)) {
+        } else if (msg->type == JSONRPC_REPLY) {
+            struct ovsdb *db;
+            if (!request_ids_lookup_and_free(msg->id, &db)) {
+                VLOG_WARN("received unexpected reply");
+                goto next;
+            }
+
             switch (state) {
             case RPL_S_DB_REQUESTED:
                 if (msg->result->type != JSON_ARRAY) {
@@ -199,7 +202,7 @@ replication_run(void)
                         if (name->type == JSON_STRING) {
                             /* Send one schema request for each remote DB. */
                             const char *db_name = json_string(name);
-                            db = find_db(db_name);
+                            struct ovsdb *db = find_db(db_name);
                             if (db) {
                                 struct jsonrpc_msg *request =
                                     jsonrpc_create_request(
@@ -228,8 +231,8 @@ replication_run(void)
                 }
 
                 if (db != find_db(schema->name)) {
-                    /* Unexpceted schema */
-                    VLOG_WARN("unexpcted schema %s", schema->name);
+                    /* Unexpected schema. */
+                    VLOG_WARN("unexpected schema %s", schema->name);
                     state = RPL_S_ERR;
                 } else if (!ovsdb_schema_equal(schema, db->schema)) {
                     /* Schmea version mismatch. */
@@ -239,29 +242,29 @@ replication_run(void)
                 }
                 ovsdb_schema_destroy(schema);
 
-                /* After receving schemas, reset the local databases that
+                /* After receiving schemas, reset the local databases that
                  * will be monitored and send out monitor requests for them. */
                 if (hmap_is_empty(&request_ids)) {
                     struct shash_node *node, *next;
 
-                    SHASH_FOR_EACH_SAFE(node, next, replication_dbs) {
-                        db = node->data;
+                    SHASH_FOR_EACH_SAFE (node, next, replication_dbs) {
+                        struct ovsdb *db = node->data;
                         struct ovsdb_error *error = reset_database(db);
                         if (error) {
                             const char *db_name = db->schema->name;
                             shash_find_and_delete(replication_dbs, db_name);
                             ovsdb_error_assert(error);
-                            VLOG_WARN("Failed to reset databse,"
-                                      "%s not replicated", db_name);
+                            VLOG_WARN("Failed to reset database, "
+                                      "%s not replicated.", db_name);
                         }
                     }
 
                     if (shash_is_empty(replication_dbs)) {
-                        VLOG_WARN("Nothing to replicate");
+                        VLOG_WARN("Nothing to replicate.");
                         state = RPL_S_ERR;
                     } else {
                         SHASH_FOR_EACH (node, replication_dbs) {
-                            db = node->data;
+                            struct ovsdb *db = node->data;
                             struct jsonrpc_msg *request =
                                 create_monitor_request(db);
 
@@ -282,7 +285,7 @@ replication_run(void)
                     ovsdb_error_assert(error);
                     state = RPL_S_ERR;
                 } else {
-                    /* Transition to replicating state after receving
+                    /* Transition to replicating state after receiving
                      * all replies of "monitor" requests. */
                     if (hmap_is_empty(&request_ids)) {
                         state = RPL_S_REPLICATING;
@@ -300,7 +303,8 @@ replication_run(void)
                 OVS_NOT_REACHED();
             }
         }
-    jsonrpc_msg_destroy(msg);
+    next:
+        jsonrpc_msg_destroy(msg);
     }
 }
 
@@ -825,7 +829,7 @@ replication_db_clone(struct shash *dbs)
     return new;
 }
 
-/* Return true if replication just stared or is on going.
+/* Return true if replication just started or is ongoing.
  * Return false if the connection failed, or the replication
  * was not able to start. */
 bool
@@ -851,7 +855,7 @@ replication_get_last_error(void)
     if (session) {
         err = jsonrpc_session_get_last_error(session);
         if (!err) {
-            err = (state == RPL_S_ERR) ?  ENOENT : 0;
+            err = (state == RPL_S_ERR) ? ENOENT : 0;
         }
     }
 



More information about the dev mailing list