[ovs-dev] [Replication SM 6/7] OVSDB: Reimplement replication using a state machine.

Andy Zhou azhou at ovn.org
Thu Aug 25 00:07:26 UTC 2016


The current replication implementation uses blocking transactions, which
are error-prone in practice, especially when the RPC connection to the
active server flaps.  Reimplement replication as a non-blocking state
machine driven by a jsonrpc session.

Signed-off-by: Andy Zhou <azhou at ovn.org>
---
 ovsdb/ovsdb-server.c |  17 +-
 ovsdb/replication.c  | 608 ++++++++++++++++++++++++++-------------------------
 ovsdb/replication.h  |   4 +-
 3 files changed, 326 insertions(+), 303 deletions(-)

diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index d4bc80b..ed73559 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -137,7 +137,7 @@ ovsdb_replication_init(struct shash *all_dbs)
     struct shash_node *node;
     SHASH_FOR_EACH (node, all_dbs) {
         struct db *db = node->data;
-        replication_add_db(db->db->schema->name, db->db);
+        replication_add_local_db(db->db->schema->name, db->db);
     }
 }
 
@@ -188,7 +188,12 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
         ovsdb_jsonrpc_server_run(jsonrpc);
 
         if (is_backup_server) {
-             replication_run();
+            replication_run();
+            if (!replication_is_alive()) {
+                int retval = replication_get_last_error();
+                ovs_fatal(0, "replication connection failed (%s)",
+                          ovs_retval_to_string(retval));
+            }
         }
 
         SHASH_FOR_EACH(node, all_dbs) {
@@ -212,6 +217,7 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
         if (is_backup_server) {
             replication_wait();
         }
+
         ovsdb_jsonrpc_server_wait(jsonrpc);
         unixctl_server_wait(unixctl);
         SHASH_FOR_EACH(node, all_dbs) {
@@ -231,7 +237,6 @@ main_loop(struct ovsdb_jsonrpc_server *jsonrpc, struct shash *all_dbs,
         }
     }
 
-    disconnect_active_server();
     free(remotes_error);
 }
 
@@ -1345,6 +1350,9 @@ ovsdb_server_add_database(struct unixctl_conn *conn, int argc OVS_UNUSED,
     error = open_db(config, filename);
     if (!error) {
         save_config(config);
+        if (is_backup_server) {
+            ovsdb_replication_init(config->all_dbs);
+        }
         unixctl_command_reply(conn, NULL);
     } else {
         unixctl_command_reply_error(conn, error);
@@ -1375,6 +1383,9 @@ ovsdb_server_remove_database(struct unixctl_conn *conn, int argc OVS_UNUSED,
     shash_delete(config->all_dbs, node);
 
     save_config(config);
+    if (is_backup_server) {
+        ovsdb_replication_init(config->all_dbs);
+    }
     unixctl_command_reply(conn, NULL);
 }
 
diff --git a/ovsdb/replication.c b/ovsdb/replication.c
index 9658145..16fd323 100644
--- a/ovsdb/replication.c
+++ b/ovsdb/replication.c
@@ -38,27 +38,16 @@
 VLOG_DEFINE_THIS_MODULE(replication)
 
 static char *active_ovsdb_server;
-static struct jsonrpc *rpc;
-static struct sset monitored_tables = SSET_INITIALIZER(&monitored_tables);
-static bool reset_dbs = true;
-
-static struct jsonrpc *open_jsonrpc(const char *server);
-static struct ovsdb_error *check_jsonrpc_error(int error,
-                                               struct jsonrpc_msg **reply_);
-static void fetch_dbs(struct jsonrpc *rpc, struct svec *dbs);
-static struct ovsdb_schema *fetch_schema(struct jsonrpc *rpc,
-                                         const char *database);
-
-static void send_monitor_requests(void);
+static struct jsonrpc_session *session = NULL;
+static unsigned int session_seqno = UINT_MAX;
+
+static struct jsonrpc_msg *create_monitor_request(struct ovsdb *db);
 static void add_monitored_table(struct ovsdb_table_schema *table,
                                 struct json *monitor_requests);
 
-static void get_initial_db_state(struct ovsdb *db);
-static void reset_database(struct ovsdb *db, struct ovsdb_txn *txn);
-static struct ovsdb_error *reset_databases(void);
+static struct ovsdb_error *reset_database(struct ovsdb *db);
 
-static void check_for_notifications(void);
-static void process_notification(struct json *table_updates, struct ovsdb *db);
+static struct ovsdb_error *process_notification(struct json *, struct ovsdb *);
 static struct ovsdb_error *process_table_update(struct json *table_update,
                                                 const char *table_name,
                                                 struct ovsdb *database,
@@ -97,11 +86,23 @@ bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
 static void request_ids_destroy(void);
 void request_ids_clear(void);
 
+enum ovsdb_replication_state {
+    RPL_S_DB_REQUESTED,
+    RPL_S_SCHEMA_REQUESTED,
+    RPL_S_MONITOR_REQUESTED,
+    RPL_S_REPLICATING,
+    RPL_S_ERR /* Error, no longer replicating. */
+};
+static enum ovsdb_replication_state state;
+
+
+/* All DBs known to ovsdb-server.  The DBs actually being replicated are
+ * stored in 'replication_dbs', which is meant to hold the subset of local
+ * DBs whose schema matches that of the same-named DB on the remote.  */
+static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
+static struct shash *replication_dbs = NULL;
 
-/* Currently replicating DBs.
- *  replication_dbs is an shash of 'struct ovsdb *'s that stores the
- *  replicating  dbs.  */
-static struct shash replication_dbs = SHASH_INITIALIZER(&replication_dbs);
+static struct shash *replication_db_clone(struct shash *dbs);
 /* Find 'struct ovsdb' by name within 'replication_dbs' */
 static struct ovsdb* find_db(const char *db_name);
 
@@ -109,57 +110,206 @@ static struct ovsdb* find_db(const char *db_name);
 void
 replication_init(void)
 {
-    shash_clear(&replication_dbs);
-    if (rpc) {
-        disconnect_active_server();
+    shash_destroy(replication_dbs);
+    replication_dbs = NULL;
+
+    shash_clear(&local_dbs);
+    if (session) {
+        jsonrpc_session_close(session);
     }
-    reset_dbs = true;
+
+    session = jsonrpc_session_open(active_ovsdb_server, true);
+    session_seqno = UINT_MAX;
 }
 
 void
-replication_add_db(const char *database, struct ovsdb *db)
+replication_add_local_db(const char *database, struct ovsdb *db)
 {
     struct shash_node *node = xmalloc(sizeof *node);
-    shash_add_assert(&replication_dbs, database, db);
+    shash_add_assert(&local_dbs, database, db);
 }
 
 void
 replication_run(void)
 {
-    if (sset_is_empty(&monitored_tables) && active_ovsdb_server) {
-        /* Reset local databases. */
-        if (reset_dbs) {
-            struct ovsdb_error *error = reset_databases();
-            if (error) {
-                /* In case reset DB fails, log the error before exiting.  */
-                char *msg = ovsdb_error_to_string(error);
-                ovsdb_error_destroy(error);
-                VLOG_FATAL("Failed to reset DB, (%s)", msg);
-            } else {
-                reset_dbs = false;
-            }
+    struct ovsdb *db;
+    if (!session) {
+        return;
+    }
+
+    jsonrpc_session_run(session);
+
+    int i;
+    for (i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
+        struct jsonrpc_msg *msg;
+        unsigned int seqno;
+
+        seqno = jsonrpc_session_get_seqno(session);
+        if (seqno != session_seqno) {
+            session_seqno = seqno;
+            request_ids_clear();
+            struct jsonrpc_msg *request;
+            request = jsonrpc_create_request("list_dbs",
+                                             json_array_create_empty(), NULL);
+            request_ids_add(request->id, NULL);
+            jsonrpc_session_send(session, request);
+
+            shash_destroy(replication_dbs);
+            replication_dbs = replication_db_clone(&local_dbs);
+
+            state = RPL_S_DB_REQUESTED;
         }
 
-        /* Open JSON-RPC. */
-        jsonrpc_close(rpc);
-        rpc = open_jsonrpc(active_ovsdb_server);
-        if (!rpc) {
-            return;
+        msg = jsonrpc_session_recv(session);
+        if (!msg) {
+            continue;
         }
 
-        /* Send monitor requests. */
-        send_monitor_requests();
-    }
-    if (!sset_is_empty(&monitored_tables)) {
-        check_for_notifications();
+        if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
+            && !strcmp(msg->method, "update")) {
+            if (msg->params->type == JSON_ARRAY
+                && msg->params->u.array.n == 2
+                && msg->params->u.array.elems[0]->type == JSON_STRING) {
+                char *db_name = msg->params->u.array.elems[0]->u.string;
+                db = find_db(db_name);
+                if (db) {
+                    struct ovsdb_error *error;
+                    error = process_notification(msg->params->u.array.elems[1],
+                                                 db);
+                    if (error) {
+                        ovsdb_error_assert(error);
+                        state = RPL_S_ERR;
+                    }
+                }
+            }
+        } else if (msg->type == JSONRPC_REPLY &&
+                   request_ids_lookup_and_free(msg->id, &db)) {
+            switch (state) {
+            case RPL_S_DB_REQUESTED:
+                if (msg->result->type != JSON_ARRAY) {
+                    struct ovsdb_error *error;
+                    error = ovsdb_error("list-dbs failed",
+                                        "list_dbs response is not array");
+                    ovsdb_error_assert(error);
+                    state = RPL_S_ERR;
+                } else {
+                    size_t i;
+                    for (i = 0; i < msg->result->u.array.n; i++) {
+                        const struct json *name = msg->result->u.array.elems[i];
+                        if (name->type == JSON_STRING) {
+                            /* Send one schema request for each remote DB. */
+                            const char *db_name = json_string(name);
+                            db = find_db(db_name);
+                            if (db) {
+                                struct jsonrpc_msg *request =
+                                    jsonrpc_create_request(
+                                        "get_schema",
+                                        json_array_create_1(
+                                            json_string_create(db_name)),
+                                        NULL);
+
+                                request_ids_add(request->id, db);
+                                jsonrpc_session_send(session, request);
+                            }
+                        }
+                    }
+                    state = RPL_S_SCHEMA_REQUESTED;
+                }
+                break;
+
+            case RPL_S_SCHEMA_REQUESTED: {
+                struct ovsdb_schema *schema;
+                struct ovsdb_error *error;
+
+                error = ovsdb_schema_from_json(msg->result, &schema);
+                if (error) {
+                    /* Parsing failed, so there is no schema to inspect. */
+                    ovsdb_error_assert(error);
+                    state = RPL_S_ERR;
+                    break;
+                }
+
+                if (db != find_db(schema->name)) {
+                    VLOG_WARN("unexpected schema %s", schema->name);
+                    state = RPL_S_ERR;
+                } else if (!ovsdb_schema_equal(schema, db->schema)) {
+                    VLOG_INFO("Schema version mismatch, %s not replicated",
+                              schema->name);
+                    shash_find_and_delete(replication_dbs, schema->name);
+                }
+                ovsdb_schema_destroy(schema);
+
+                /* After receiving all schemas without error, reset the local
+                 * databases to replicate and send their monitor requests. */
+                if (state != RPL_S_ERR && hmap_is_empty(&request_ids)) {
+                    struct shash_node *node, *next;
+
+                    SHASH_FOR_EACH_SAFE(node, next, replication_dbs) {
+                        db = node->data;
+                        error = reset_database(db);
+                        if (error) {
+                            const char *db_name = db->schema->name;
+                            shash_find_and_delete(replication_dbs, db_name);
+                            ovsdb_error_assert(error);
+                            VLOG_WARN("Failed to reset database, "
+                                      "%s not replicated", db_name);
+                        }
+                    }
+
+                    if (shash_is_empty(replication_dbs)) {
+                        VLOG_WARN("Nothing to replicate");
+                        state = RPL_S_ERR;
+                    } else {
+                        SHASH_FOR_EACH (node, replication_dbs) {
+                            db = node->data;
+                            struct jsonrpc_msg *request =
+                                create_monitor_request(db);
+
+                            request_ids_add(request->id, db);
+                            jsonrpc_session_send(session, request);
+                            state = RPL_S_MONITOR_REQUESTED;
+                        }
+                    }
+                }
+                break;
+            }
+
+            case RPL_S_MONITOR_REQUESTED: {
+                /* Reply to monitor requests. */
+                struct ovsdb_error *error;
+                error = process_notification(msg->result, db);
+                if (error) {
+                    ovsdb_error_assert(error);
+                    state = RPL_S_ERR;
+                } else {
+                    /* Transition to replicating state after receiving
+                     * all replies to "monitor" requests. */
+                    if (hmap_is_empty(&request_ids)) {
+                        state = RPL_S_REPLICATING;
+                    }
+                }
+                break;
+            }
+
+            case RPL_S_ERR:
+                /* Ignore all messages */
+                break;
+
+            case RPL_S_REPLICATING:
+            default:
+                OVS_NOT_REACHED();
+            }
+        }
+    jsonrpc_msg_destroy(msg);
     }
 }
 
 void
 replication_wait(void)
 {
-    if (rpc) {
-        jsonrpc_wait(rpc);
+    if (session) {
+        jsonrpc_session_wait(session);
+        jsonrpc_session_recv_wait(session);
     }
 }
 
@@ -296,17 +446,13 @@ blacklist_tables_find(const char *database, const char *table)
 void
 disconnect_active_server(void)
 {
-    jsonrpc_close(rpc);
-    rpc = NULL;
-    sset_clear(&monitored_tables);
-    shash_clear(&replication_dbs);
+    jsonrpc_session_close(session);
+    session = NULL;
 }
 
 void
 replication_destroy(void)
 {
-    disconnect_active_server();
-    sset_destroy(&monitored_tables);
     blacklist_tables_clear();
     shash_destroy(&blacklist_tables);
 
@@ -316,34 +462,22 @@ replication_destroy(void)
     }
 
     request_ids_destroy();
-    shash_destroy(&replication_dbs);
+    shash_destroy(replication_dbs);
+    replication_dbs = NULL;
+
+    shash_destroy(&local_dbs);
 }
 
 static struct ovsdb *
 find_db(const char *db_name)
 {
-    return shash_find_data(&replication_dbs, db_name);
+    return shash_find_data(replication_dbs, db_name);
 }
 
 static struct ovsdb_error *
-reset_databases()
-{
-    struct shash_node *db_node;
-    struct ovsdb_error *error = NULL;
-
-    SHASH_FOR_EACH (db_node, &replication_dbs) {
-        struct ovsdb *db = db_node->data;
-        struct ovsdb_txn *txn = ovsdb_txn_create(db);
-        reset_database(db, txn);
-        error = ovsdb_txn_commit(txn, false);
-    }
-
-    return error;
-}
-
-static void
-reset_database(struct ovsdb *db, struct ovsdb_txn *txn)
+reset_database(struct ovsdb *db)
 {
+    struct ovsdb_txn *txn = ovsdb_txn_create(db);
     struct shash_node *table_node;
 
     SHASH_FOR_EACH (table_node, &db->tables) {
@@ -356,169 +490,45 @@ reset_database(struct ovsdb *db, struct ovsdb_txn *txn)
             }
         }
     }
-}
-
-static struct jsonrpc *
-open_jsonrpc(const char *server)
-{
-    struct stream *stream;
-    int error;
-
-    error = jsonrpc_stream_open(server, &stream, DSCP_DEFAULT);
-
-    return error ? NULL : jsonrpc_open(stream);
-}
-
-static struct ovsdb_error *
-check_jsonrpc_error(int error, struct jsonrpc_msg **reply_)
-{
-    struct jsonrpc_msg *reply = *reply_;
-
-    if (error) {
-        return ovsdb_error("transaction failed",
-                           "transaction returned error %d",
-                           error);
-    }
-
-    if (reply->error) {
-        return ovsdb_error("transaction failed",
-                           "transaction returned error: %s",
-                           json_to_string(reply->error, 0));
-    }
-    return NULL;
-}
-
-static void
-fetch_dbs(struct jsonrpc *rpc, struct svec *dbs)
-{
-    struct jsonrpc_msg *request, *reply;
-    struct ovsdb_error *error;
-    size_t i;
-
-    request = jsonrpc_create_request("list_dbs", json_array_create_empty(),
-                                     NULL);
-
-    error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
-                                &reply);
-    if (error) {
-        ovsdb_error_assert(error);
-        return;
-    }
-
-    if (reply->result->type != JSON_ARRAY) {
-        ovsdb_error_assert(ovsdb_error("list-dbs failed",
-                                       "list_dbs response is not array"));
-        return;
-    }
-
-    for (i = 0; i < reply->result->u.array.n; i++) {
-        const struct json *name = reply->result->u.array.elems[i];
-
-        if (name->type != JSON_STRING) {
-            ovsdb_error_assert(ovsdb_error(
-                                   "list_dbs failed",
-                                   "list_dbs response %"PRIuSIZE" is not string",
-                                   i));
-        }
-        svec_add(dbs, name->u.string);
-    }
-    jsonrpc_msg_destroy(reply);
-    svec_sort(dbs);
-}
-
-static struct ovsdb_schema *
-fetch_schema(struct jsonrpc *rpc, const char *database)
-{
-    struct jsonrpc_msg *request, *reply;
-    struct ovsdb_schema *schema;
-    struct ovsdb_error *error;
 
-    request = jsonrpc_create_request("get_schema",
-                                     json_array_create_1(
-                                         json_string_create(database)),
-                                     NULL);
-    error = check_jsonrpc_error(jsonrpc_transact_block(rpc, request, &reply),
-                                &reply);
-    if (error) {
-        jsonrpc_msg_destroy(reply);
-        ovsdb_error_assert(error);
-        return NULL;
-    }
-
-    error = ovsdb_schema_from_json(reply->result, &schema);
-    if (error) {
-        jsonrpc_msg_destroy(reply);
-        ovsdb_error_assert(error);
-        return NULL;
-    }
-    jsonrpc_msg_destroy(reply);
-
-    return schema;
+    return ovsdb_txn_commit(txn, false);
 }
 
-static void
-send_monitor_requests(void)
+/* Create a monitor request for 'db'.  The monitor request covers every
+ * table in the schema of 'db' except those found in 'blacklist_tables'.
+ *
+ * The caller is responsible for disposing of the returned request.
+ */
+static struct jsonrpc_msg *
+create_monitor_request(struct ovsdb *db)
 {
-    const char *db_name;
-    struct svec dbs;
-    size_t i;
-
-    svec_init(&dbs);
-    fetch_dbs(rpc, &dbs);
-    SVEC_FOR_EACH (i, db_name, &dbs) {
-        struct ovsdb *db = find_db(db_name);
-
-        if (db) {
-            struct ovsdb_schema *local_schema, *remote_schema;
+    struct jsonrpc_msg *request;
+    struct json *monitor;
+    struct ovsdb_schema *schema = db->schema;
+    const char *db_name = schema->name;
 
-            local_schema = db->schema;
-            remote_schema = fetch_schema(rpc, db_name);
-            if (ovsdb_schema_equal(local_schema, remote_schema)) {
-                struct jsonrpc_msg *request;
-                struct json *monitor, *monitor_request;
+    struct json *monitor_request = json_object_create();
+    size_t n = shash_count(&schema->tables);
+    const struct shash_node **nodes = shash_sort(&schema->tables);
 
-                monitor_request = json_object_create();
-                size_t n = shash_count(&local_schema->tables);
-                const struct shash_node **nodes = shash_sort(
-                    &local_schema->tables);
+    for (int j = 0; j < n; j++) {
+        struct ovsdb_table_schema *table = nodes[j]->data;
 
-                for (int j = 0; j < n; j++) {
-                    struct ovsdb_table_schema *table = nodes[j]->data;
-
-                    /* Monitor all tables not blacklisted. */
-                    if (!blacklist_tables_find(db_name, table->name)) {
-                        add_monitored_table(table, monitor_request);
-                    }
-                }
-                free(nodes);
-
-                /* Send monitor request. */
-                monitor = json_array_create_3(
-                    json_string_create(db_name),
-                    json_string_create(db_name),
-                    monitor_request);
-                request = jsonrpc_create_request("monitor", monitor, NULL);
-                jsonrpc_send(rpc, request);
-                get_initial_db_state(db);
-            }
-            ovsdb_schema_destroy(remote_schema);
+        /* Monitor all tables not blacklisted. */
+        if (!blacklist_tables_find(db_name, table->name)) {
+            add_monitored_table(table, monitor_request);
         }
     }
-    svec_destroy(&dbs);
-}
+    free(nodes);
 
-static void
-get_initial_db_state(struct ovsdb *db)
-{
-    struct jsonrpc_msg *msg;
-
-    jsonrpc_recv_block(rpc, &msg);
-
-    if (msg->type == JSONRPC_REPLY) {
-        process_notification(msg->result, db);
-    }
+    /* Create a monitor request. */
+    monitor = json_array_create_3(
+        json_string_create(db_name),
+        json_string_create(db_name),
+        monitor_request);
+    request = jsonrpc_create_request("monitor", monitor, NULL);
 
-    jsonrpc_msg_destroy(msg);
+    return request;
 }
 
 static void
@@ -527,91 +537,44 @@ add_monitored_table(struct ovsdb_table_schema *table,
 {
     struct json *monitor_request_array;
 
-    sset_add(&monitored_tables, table->name);
-
     monitor_request_array = json_array_create_empty();
     json_array_add(monitor_request_array, json_object_create());
 
     json_object_put(monitor_request, table->name, monitor_request_array);
 }
-
-static void
-check_for_notifications()
-{
-    struct jsonrpc_msg *msg;
-    int error;
-
-    error = jsonrpc_recv(rpc, &msg);
-    if (error == EAGAIN) {
-        return;
-    } else if (error) {
-        jsonrpc_close(rpc);
-        rpc = open_jsonrpc(active_ovsdb_server);
-        if (!rpc) {
-            /* Active server went down. */
-            disconnect_active_server();
-        }
-        jsonrpc_msg_destroy(msg);
-        return;
-    }
-    if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
-        jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
-                                               msg->id));
-    } else if (msg->type == JSONRPC_NOTIFY
-               && !strcmp(msg->method, "update")) {
-        struct json *params = msg->params;
-        if (params->type == JSON_ARRAY
-            && params->u.array.n == 2) {
-            char *db_name = params->u.array.elems[0]->u.string;
-            struct ovsdb *db = find_db(db_name);
-            if (db) {
-                process_notification(params->u.array.elems[1], db);
-            }
-        }
-    }
-    jsonrpc_msg_destroy(msg);
-    jsonrpc_run(rpc);
-}
 
-static void
+
+static struct ovsdb_error *
 process_notification(struct json *table_updates, struct ovsdb *db)
 {
-    struct ovsdb_error *error;
+    struct ovsdb_error *error = NULL;
     struct ovsdb_txn *txn;
 
-    if (table_updates->type != JSON_OBJECT) {
-        sset_clear(&monitored_tables);
-        return;
-    }
-
-    txn = ovsdb_txn_create(db);
-    error = NULL;
-
-    /* Process each table update. */
-    struct shash_node *node;
-    SHASH_FOR_EACH (node, json_object(table_updates)) {
-        struct json *table_update = node->data;
-        if (table_update) {
-            error = process_table_update(table_update, node->name, db, txn);
-            if (error) {
-                break;
+    if (table_updates->type == JSON_OBJECT) {
+        txn = ovsdb_txn_create(db);
+
+        /* Process each table update. */
+        struct shash_node *node;
+        SHASH_FOR_EACH (node, json_object(table_updates)) {
+            struct json *table_update = node->data;
+            if (table_update) {
+                error = process_table_update(table_update, node->name, db, txn);
+                if (error) {
+                    break;
+                }
             }
         }
-    }
 
-    if (error) {
-        ovsdb_txn_abort(txn);
-        goto error;
+        if (error) {
+            ovsdb_txn_abort(txn);
+            return error;
+        } else {
+            /* Commit transaction. */
+            error = ovsdb_txn_commit(txn, false);
+        }
     }
 
-    /* Commit transaction. */
-    error = ovsdb_txn_commit(txn, false);
-
-error:
-    if (error) {
-        ovsdb_error_assert(error);
-        disconnect_active_server();
-    }
+    return error;
 }
 
 static struct ovsdb_error *
@@ -803,7 +766,7 @@ request_ids_add(const struct json *id, struct ovsdb *db)
  * 'request_ids' and free its memory. If not found, 'request_ids' does
  * not change.  '*db' is only valid when return true.
  *
- * Return ture if 'id' is found. False otherwise.
+ * Return true if 'id' is found. False otherwise.
  */
 bool
 request_ids_lookup_and_free(const struct json *id, struct ovsdb **db)
@@ -848,6 +811,53 @@ request_ids_clear(void)
     hmap_init(&request_ids);
 }
 
+static struct shash *
+replication_db_clone(struct shash *dbs)
+{
+    struct shash *new = xmalloc(sizeof *new);
+    shash_init(new);
+
+    struct shash_node *node;
+    SHASH_FOR_EACH (node, dbs) {
+        shash_add(new, node->name, node->data);
+    }
+
+    return new;
+}
+
+/* Return true if replication just started or is ongoing.
+ * Return false if the connection failed, or the replication
+ * was not able to start. */
+bool
+replication_is_alive(void)
+{
+    if (session) {
+        return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
+    }
+    return false;
+}
+
+/* Return the last error reported on a connection by 'session'.  The
+ * return value is 0 if replication is not currently running, or
+ * if the replication session has not encountered any error.
+ *
+ * Return a nonzero value if the replication session has an error, or
+ * ENOENT if replication entered the error state without one.  */
+int
+replication_get_last_error(void)
+{
+    int err = 0;
+
+    if (session) {
+        err = jsonrpc_session_get_last_error(session);
+        if (!err) {
+            err = (state == RPL_S_ERR) ?  ENOENT : 0;
+        }
+    }
+
+    return err;
+}
+
 void
 replication_usage(void)
 {
diff --git a/ovsdb/replication.h b/ovsdb/replication.h
index 7745b47..9c1aac3 100644
--- a/ovsdb/replication.h
+++ b/ovsdb/replication.h
@@ -26,7 +26,9 @@ void replication_run(void);
 void replication_wait(void);
 void replication_destroy(void);
 void replication_usage(void);
-void replication_add_db(const char *databse, struct ovsdb *db);
+void replication_add_local_db(const char *databse, struct ovsdb *db);
+bool replication_is_alive(void);
+int replication_get_last_error(void);
 
 void set_active_ovsdb_server(const char *remote_server);
 const char *get_active_ovsdb_server(void);
-- 
1.9.1




More information about the dev mailing list