[ovs-dev] [PATCH 16/20] ovsdb: add ovsdb_monitor_changes

Andy Zhou azhou at nicira.com
Thu Mar 19 07:08:38 UTC 2015


Currently, each monitor table contains a single hmap 'changes' to
track updates. This patch introduces a new data structure
'ovsdb_monitor_changes' that stores the updates 'rows' tagged by
its first commit transaction id. Each 'ovsdb_monitor_changes' is
refenece counted allowing multiple jsonrpc_monitors to share them.

The next patch will allow each ovsdb monitor table to store a list
of 'ovsdb_monitor_changes'. This patch stores only one, same as
before.

Signed-off-by: Andy Zhou <azhou at nicira.com>
---
 ovsdb/jsonrpc-server.c |  16 ++++--
 ovsdb/ovsdb-monitor.c  | 133 +++++++++++++++++++++++++++++++++++++++++++------
 ovsdb/ovsdb-monitor.h  |   3 ++
 3 files changed, 133 insertions(+), 19 deletions(-)

diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index ddcacbb..c9d84a5 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -1280,11 +1280,19 @@ ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
 }
 
 static struct json *
-ovsdb_jsonrpc_monitor_compose_update(
-    struct ovsdb_jsonrpc_monitor *monitor, bool initial)
+ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
+                                     bool initial)
 {
-    return ovsdb_monitor_compose_update(monitor->dbmon, initial,
-                                        &monitor->unflushed);
+    uint64_t unflushed;
+    struct json *json;
+
+    unflushed = initial ? 0 : m->unflushed;
+
+    json = ovsdb_monitor_compose_update(m->dbmon, initial,
+                                        &m->unflushed);
+    ovsdb_monitor_renew_tracking_changes(m->dbmon, unflushed, m->unflushed);
+
+    return json;
 }
 
 static bool
diff --git a/ovsdb/ovsdb-monitor.c b/ovsdb/ovsdb-monitor.c
index 397aa2a..a1efa66 100644
--- a/ovsdb/ovsdb-monitor.c
+++ b/ovsdb/ovsdb-monitor.c
@@ -73,6 +73,22 @@ struct ovsdb_monitor_row {
     struct ovsdb_datum *new;    /* New data, NULL for a deleted row. */
 };
 
+/* Contains 'struct ovsdb_monitor_row's for rows that have been
+ * updated but not yet flushed to all the jsonrpc connection.
+ *
+ * 'n_refs' represent the number of jsonrpc connections that have
+ * not received updates. Generate the update for the last jsonprc
+ * connection will also remove rows contained in 'changes'.
+ *
+ * 'transaction' stores the first update's transaction id.
+ * */
+struct ovsdb_monitor_changes {
+    struct ovsdb_monitor_table *mt;
+    struct hmap rows;
+    int n_refs;
+    uint64_t transaction;
+};
+
 /* A particular table being monitored. */
 struct ovsdb_monitor_table {
     const struct ovsdb_table *table;
@@ -87,10 +103,16 @@ struct ovsdb_monitor_table {
 
     /* Contains 'struct ovsdb_monitor_row's for rows that have been
      * updated but not yet flushed to the jsonrpc connection. */
-    struct hmap changes;
+    struct ovsdb_monitor_changes *changes;
 };
 
 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
+static void ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
+                                            uint64_t next_txn);
+static void ovsdb_monitor_changes_destroy_rows(
+                                  struct ovsdb_monitor_changes *changes);
+static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
+                                  uint64_t transaction);
 
 static int
 compare_ovsdb_monitor_column(const void *a_, const void *b_)
@@ -108,7 +130,7 @@ ovsdb_monitor_cast(struct ovsdb_replica *replica)
     return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
 }
 
-/* Finds and returns the ovsdb_monitor_row in 'mt->changes' for the
+/* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
  * given 'uuid', or NULL if there is no such row. */
 static struct ovsdb_monitor_row *
 ovsdb_monitor_row_find(const struct ovsdb_monitor_table *mt,
@@ -116,7 +138,8 @@ ovsdb_monitor_row_find(const struct ovsdb_monitor_table *mt,
 {
     struct ovsdb_monitor_row *row;
 
-    HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &mt->changes) {
+    HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
+                             &mt->changes->rows) {
         if (uuid_equals(uuid, &row->uuid)) {
             return row;
         }
@@ -235,7 +258,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m,
 
     mt = xzalloc(sizeof *mt);
     mt->table = table;
-    hmap_init(&mt->changes);
+    mt->changes = NULL;
     shash_add(&m->tables, table->schema->name, mt);
 }
 
@@ -288,6 +311,70 @@ ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
     return NULL;
 }
 
+static void
+ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
+                                uint64_t next_txn)
+{
+    struct ovsdb_monitor_changes *changes;
+
+    changes = xzalloc(sizeof *changes);
+
+    changes->transaction = next_txn;
+    changes->mt = mt;
+    changes->n_refs = 1;
+    hmap_init(&changes->rows);
+    mt->changes = changes;
+};
+
+/* Stop currently tracking changes to table 'mt' since 'transaction'.
+ *
+ * Return 'true' if the 'transaction' is being tracked. 'false' otherwise. */
+static void
+ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
+                                    uint64_t transaction)
+{
+    struct ovsdb_monitor_changes *changes;
+
+    changes = mt->changes;
+
+    if (changes) {
+        ovs_assert(changes->transaction == transaction);
+        if (--changes->n_refs == 0) {
+            ovsdb_monitor_changes_destroy_rows(changes);
+            free(changes);
+            mt->changes = NULL;
+        }
+    }
+}
+
+/* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
+ */
+static void
+ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
+                                  uint64_t transaction)
+{
+    struct ovsdb_monitor_changes *changes;
+
+    changes = mt->changes;
+    if (changes) {
+        ovs_assert(false);
+    } else {
+        ovsdb_monitor_table_add_changes(mt, transaction);
+    }
+}
+
+static void
+ovsdb_monitor_changes_destroy_rows(struct ovsdb_monitor_changes *changes)
+{
+    struct ovsdb_monitor_row *row, *next;
+
+    HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
+        hmap_remove(&changes->rows, &row->hmap_node);
+        ovsdb_monitor_row_destroy(changes->mt, row);
+    }
+    hmap_destroy(&changes->rows);
+}
+
 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
  * 'mt', or NULL if no row update should be sent.
  *
@@ -404,7 +491,11 @@ ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon,
         struct ovsdb_monitor_row *row, *next;
         struct json *table_json = NULL;
 
-        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
+        if (!mt->changes) {
+            continue;
+        }
+
+        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes->rows) {
             struct json *row_json;
 
             row_json = ovsdb_monitor_compose_row_update(
@@ -428,7 +519,7 @@ ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon,
                 json_object_put(table_json, uuid, row_json);
             }
 
-            hmap_remove(&mt->changes, &row->hmap_node);
+            hmap_remove(&mt->changes->rows, &row->hmap_node);
             ovsdb_monitor_row_destroy(mt, row);
         }
     }
@@ -495,7 +586,7 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
     change = ovsdb_monitor_row_find(mt, uuid);
     if (!change) {
         change = xmalloc(sizeof *change);
-        hmap_insert(&mt->changes, &change->hmap_node, uuid_hash(uuid));
+        hmap_insert(&mt->changes->rows, &change->hmap_node, uuid_hash(uuid));
         change->uuid = *uuid;
         change->old = clone_monitor_row_data(mt, old);
         change->new = clone_monitor_row_data(mt, new);
@@ -508,7 +599,7 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
 
             if (!change->old) {
                 /* This row was added then deleted.  Forget about it. */
-                hmap_remove(&mt->changes, &change->hmap_node);
+                hmap_remove(&mt->changes->rows, &change->hmap_node);
                 free(change);
             }
         }
@@ -529,6 +620,10 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
         if (mt->select & OJMS_INITIAL) {
             struct ovsdb_row *row;
 
+            if (!mt->changes) {
+                ovsdb_monitor_table_add_changes(mt, 0);
+            }
+
             HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
                 ovsdb_monitor_change_cb(NULL, row, NULL, &aux);
             }
@@ -563,6 +658,20 @@ ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
     ovs_assert(true);
 }
 
+void
+ovsdb_monitor_renew_tracking_changes(struct ovsdb_monitor *dbmon,
+                                     uint64_t prev_txn, uint64_t next_txn)
+{
+    struct shash_node *node;
+
+    SHASH_FOR_EACH (node, &dbmon->tables) {
+        struct ovsdb_monitor_table *mt = node->data;
+
+        ovsdb_monitor_table_untrack_changes(mt, prev_txn);
+        ovsdb_monitor_table_track_changes(mt, next_txn);
+    }
+}
+
 static void
 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
 {
@@ -572,14 +681,8 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
 
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
-        struct ovsdb_monitor_row *row, *next;
-
-        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
-            hmap_remove(&mt->changes, &row->hmap_node);
-            ovsdb_monitor_row_destroy(mt, row);
-        }
-        hmap_destroy(&mt->changes);
 
+        ovsdb_monitor_changes_destroy_rows(mt->changes);
         free(mt->columns);
         free(mt);
     }
diff --git a/ovsdb/ovsdb-monitor.h b/ovsdb/ovsdb-monitor.h
index ea2a7aa..c003184 100644
--- a/ovsdb/ovsdb-monitor.h
+++ b/ovsdb/ovsdb-monitor.h
@@ -57,4 +57,7 @@ bool ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
                                uint64_t next_transaction);
 
 void ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon);
+
+void ovsdb_monitor_renew_tracking_changes(struct ovsdb_monitor *dbmon,
+                                     uint64_t prev_txn, uint64_t next_txn);
 #endif
-- 
1.9.1




More information about the dev mailing list