[ovs-dev] [PATCH 13/19] ovsdb-monitor: add transaction ids

Andy Zhou azhou at nicira.com
Fri Apr 10 00:11:32 UTC 2015


With N:1 mappings, multiple jsonrpc servers may be servicing their rpc
connections at different paces. ovsdb-monitor thus needs to maintain
different change sets, depending on the speed of each rpc connection.
Connections that are serviced at the same speed can share the same
change set.

Transaction ID is a concept added to describe the change set. One
 possible view of the database state is a sequence of changes, or more
 precisely, commits applied to it in order, starting from an initial
 state, with commit 0. The same logic can also be applied to the
 jsonrpc monitor; each change it pushes corresponds to the commits
 between two transaction IDs.

This patch introduces transaction IDs. ovsdb-monitor maintains
'n_transactions', starting from 0. Each commit adds 1 to the number.
jsonrpc maintains an 'unflushed' transaction number, corresponding to
the next commit the remote has not seen. jsonrpc's job is simply to
notice there are changes in the ovsdb-monitor that it is interested in,
i.e. 'n_transactions' >= 'unflushed', get the changes in json format,
and push them to the remote site.

Signed-off-by: Andy Zhou <azhou at nicira.com>
Acked-by: Ben Pfaff <blp at nicira.com>

---
v1->v2: style fixes
---
 ovsdb/jsonrpc-server.c | 12 ++++++++----
 ovsdb/monitor.c        | 24 +++++++++++-------------
 ovsdb/monitor.h        |  7 ++++---
 3 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index c7d404f..9809a8f 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -90,7 +90,7 @@ static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
 static struct json *ovsdb_jsonrpc_monitor_compose_table_update(
-    const struct ovsdb_jsonrpc_monitor *monitor, bool initial);
+    struct ovsdb_jsonrpc_monitor *monitor, bool initial);
 
 
 /* JSON-RPC database server. */
@@ -1037,6 +1037,8 @@ struct ovsdb_jsonrpc_monitor {
     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
     struct json *monitor_id;
     struct ovsdb_monitor *dbmon;
+    uint64_t unflushed;         /* The first transaction that has not been
+                                       flushed to the jsonrpc remote client. */
 };
 
 static struct ovsdb_jsonrpc_monitor *
@@ -1181,6 +1183,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
     m->session = s;
     m->db = db;
     m->dbmon = ovsdb_monitor_create(db, m);
+    m->unflushed = 0;
     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
     m->monitor_id = json_clone(monitor_id);
 
@@ -1280,9 +1283,10 @@ ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
 
 static struct json *
 ovsdb_jsonrpc_monitor_compose_table_update(
-    const struct ovsdb_jsonrpc_monitor *monitor, bool initial)
+    struct ovsdb_jsonrpc_monitor *monitor, bool initial)
 {
-    return ovsdb_monitor_compose_table_update(monitor->dbmon, initial);
+    return ovsdb_monitor_compose_table_update(monitor->dbmon, initial,
+                                      &monitor->unflushed);
 }
 
 static bool
@@ -1291,7 +1295,7 @@ ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
     struct ovsdb_jsonrpc_monitor *m;
 
     HMAP_FOR_EACH (m, node, &s->monitors) {
-        if (ovsdb_monitor_needs_flush(m->dbmon)) {
+        if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
             return true;
         }
     }
diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index 415e5c8..ff7ffa4 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -49,6 +49,7 @@ struct ovsdb_monitor {
     struct shash tables;     /* Holds "struct ovsdb_monitor_table"s. */
     struct ovs_list jsonrpc_monitors;  /* Contains "jsonrpc_monitor_node"s. */
     struct ovsdb *db;
+    uint64_t n_transactions;      /* Count number of committed transactions. */
 };
 
 struct jsonrpc_monitor_node {
@@ -214,6 +215,7 @@ ovsdb_monitor_create(struct ovsdb *db,
     ovsdb_add_replica(db, &dbmon->replica);
     list_init(&dbmon->jsonrpc_monitors);
     dbmon->db = db;
+    dbmon->n_transactions = 0;
     shash_init(&dbmon->tables);
 
     jm = xzalloc(sizeof *jm);
@@ -376,14 +378,16 @@ ovsdb_monitor_compose_row_update(
  * be used as part of the initial reply to a "monitor" request, false if it is
  * going to be used as part of an "update" notification. */
 struct json *
-ovsdb_monitor_compose_table_update(
-    const struct ovsdb_monitor *dbmon, bool initial)
+ovsdb_monitor_compose_table_update(const struct ovsdb_monitor *dbmon,
+                                   bool initial, uint64_t *unflushed)
 {
     struct shash_node *node;
     unsigned long int *changed;
     struct json *json;
     size_t max_columns;
 
+    *unflushed = dbmon->n_transactions + 1;
+
     max_columns = 0;
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
@@ -432,18 +436,11 @@ ovsdb_monitor_compose_table_update(
 }
 
 bool
-ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon)
+ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
+                          uint64_t next_transaction)
 {
-    struct shash_node *node;
-
-    SHASH_FOR_EACH (node, &dbmon->tables) {
-        struct ovsdb_monitor_table *mt = node->data;
-
-        if (!hmap_is_empty(&mt->changes)) {
-            return true;
-        }
-    }
-    return false;
+    ovs_assert(next_transaction <= dbmon->n_transactions + 1);
+    return (next_transaction <= dbmon->n_transactions);
 }
 
 void
@@ -596,6 +593,7 @@ ovsdb_monitor_commit(struct ovsdb_replica *replica,
 
     ovsdb_monitor_init_aux(&aux, m);
     ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
+    m->n_transactions++;
 
     return NULL;
 }
diff --git a/ovsdb/monitor.h b/ovsdb/monitor.h
index 829a135..5cb55dc 100644
--- a/ovsdb/monitor.h
+++ b/ovsdb/monitor.h
@@ -46,14 +46,15 @@ const char * OVS_WARN_UNUSED_RESULT
 ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *,
                           const struct ovsdb_table *);
 
-struct json *ovsdb_monitor_compose_table_update(
-    const struct ovsdb_monitor *dbmon, bool initial);
+struct json *ovsdb_monitor_compose_table_update(const struct ovsdb_monitor *dbmon,
+                          bool initial, uint64_t *unflushed_transaction);
 
 void ovsdb_monitor_table_set_select(struct ovsdb_monitor *dbmon,
                                     const struct ovsdb_table *table,
                                     enum ovsdb_monitor_selection select);
 
-bool ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon);
+bool ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
+                               uint64_t next_transaction);
 
 void ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon);
 #endif
-- 
1.9.1




More information about the dev mailing list