[ovs-dev] [PATCH monitor_cond V3 06/10] ovsdb: enable jsonrpc-server to service "monitor_cond_update" request

Liran Schour lirans at il.ibm.com
Wed Feb 3 13:53:30 UTC 2016


ovsdb-server now accepts "monitor_cond_update" request. On conditions update
we insert all rows of table in a new changes list - OVSDB_MONITOR_ALL that are
being indexed by the transaction-id at the moment of insertion.
JSON cache is being used only for empty condition monitor sessions.
Sees ovsdb-server (1) man page for details of monitor_cond_update.

Signed-off-by: Liran Schour <lirans at il.ibm.com>

---
v2->v3:
* ovsdb_monitor_table_condition_update() accepts only single json condition
* Allow non-monitored columns in cond_update.
* Flush monitor session after monitor_cond_update to guarantee empty changes list
* Bug fix: use json cache when all condition are empty
* Simplify inserting row change to changes lists
---
 ovsdb/jsonrpc-server.c | 154 +++++++++++++++++++++++++--
 ovsdb/monitor.c        | 283 ++++++++++++++++++++++++++++++++++++++++++-------
 ovsdb/monitor.h        |  26 +++--
 3 files changed, 415 insertions(+), 48 deletions(-)

diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index cd6a70a..15dc406 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -87,6 +87,10 @@ static void ovsdb_jsonrpc_trigger_complete_done(
 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
     struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
     enum ovsdb_monitor_version, const struct json *request_id);
+static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_update(
+    struct ovsdb_jsonrpc_session *s,
+    struct json *params,
+    const struct json *request_id);
 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
     struct ovsdb_jsonrpc_session *,
     struct json_array *params,
@@ -407,7 +411,8 @@ static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_session_get_memory_usage(
     const struct ovsdb_jsonrpc_session *, struct simap *usage);
 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
-                                             struct jsonrpc_msg *);
+                                              struct jsonrpc_msg *,
+                                              bool *);
 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
                                              struct jsonrpc_msg *);
 
@@ -463,13 +468,14 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
 
     if (!jsonrpc_session_get_backlog(s->js)) {
         struct jsonrpc_msg *msg;
+        bool needs_flush = false;
 
         ovsdb_jsonrpc_monitor_flush_all(s);
 
         msg = jsonrpc_session_recv(s->js);
         if (msg) {
             if (msg->type == JSONRPC_REQUEST) {
-                ovsdb_jsonrpc_session_got_request(s, msg);
+                ovsdb_jsonrpc_session_got_request(s, msg, &needs_flush);
             } else if (msg->type == JSONRPC_NOTIFY) {
                 ovsdb_jsonrpc_session_got_notify(s, msg);
             } else {
@@ -480,6 +486,9 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
                 jsonrpc_msg_destroy(msg);
             }
         }
+        if (needs_flush) {
+            ovsdb_jsonrpc_monitor_flush_all(s);
+        }
     }
     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
 }
@@ -840,10 +849,12 @@ execute_transaction(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
 
 static void
 ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
-                                  struct jsonrpc_msg *request)
+                                  struct jsonrpc_msg *request,
+                                  bool *needs_flush)
 {
     struct jsonrpc_msg *reply;
 
+    *needs_flush = false;
     if (!strcmp(request->method, "transact")) {
         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
         if (!reply) {
@@ -861,6 +872,10 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
                                                  version,
                                                  request->id);
         }
+    } else if (!strcmp(request->method, "monitor_cond_update")) {
+        reply = ovsdb_jsonrpc_monitor_cond_update(s, request->params,
+                                                  request->id);
+        *needs_flush = true;
     } else if (!strcmp(request->method, "monitor_cancel")) {
         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
                                              request->id);
@@ -1050,6 +1065,8 @@ struct ovsdb_jsonrpc_monitor {
     struct ovsdb_monitor *dbmon;
     uint64_t unflushed;         /* The first transaction that has not been
                                        flushed to the jsonrpc remote client. */
+    bool all_rows;              /* Indicates if in the next flush we request all
+                                   rows (due to a condition change)          */
     enum ovsdb_monitor_version version;
     struct ovsdb_monitor_session_condition *condition;/* Session's condition */
 };
@@ -1213,6 +1230,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
         m->condition = ovsdb_monitor_session_condition_create();
     }
     m->unflushed = 0;
+    m->all_rows = false;
     m->version = version;
     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
     m->monitor_id = json_clone(monitor_id);
@@ -1294,6 +1312,124 @@ error:
     return jsonrpc_create_error(json, request_id);
 }
 
+static struct ovsdb_error *
+ovsdb_jsonrpc_parse_monitor_cond_update_request(
+                                struct ovsdb_jsonrpc_monitor *m,
+                                const struct ovsdb_table *table,
+                                const struct json *cond_update_req)
+{
+    const struct ovsdb_table_schema *ts = table->schema;
+    const struct json *condition, *columns;
+    struct ovsdb_parser parser;
+    struct ovsdb_error *error;
+
+    ovsdb_parser_init(&parser, cond_update_req, "table %s", ts->name);
+    columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
+    condition = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
+
+    error = ovsdb_parser_finish(&parser);
+    if (error) {
+        return error;
+    }
+
+    if (columns) {
+        error = ovsdb_syntax_error(cond_update_req, NULL, "changing columns "
+                                   "is unsupported");
+        return error;
+    }
+    error = ovsdb_monitor_table_condition_update(m->dbmon, m->condition, table,
+                                                 condition);
+
+    return error;
+}
+
+static struct jsonrpc_msg *
+ovsdb_jsonrpc_monitor_cond_update(struct ovsdb_jsonrpc_session *s,
+                                  struct json *params,
+                                  const struct json *request_id)
+{
+    struct ovsdb_error *error;
+    struct ovsdb_jsonrpc_monitor *m;
+    struct json *monitor_cond_update_reqs;
+    struct shash_node *node;
+    struct json *json;
+
+    if (json_array(params)->n != 3) {
+        error = ovsdb_syntax_error(params, NULL, "invalid parameters");
+        goto error;
+    }
+
+    m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]);
+    if (!m) {
+        error = ovsdb_syntax_error(request_id, NULL,
+                                   "unknown monitor session");
+        goto error;
+    }
+
+    monitor_cond_update_reqs = params->u.array.elems[2];
+    if (monitor_cond_update_reqs->type != JSON_OBJECT) {
+        error =
+            ovsdb_syntax_error(NULL, NULL,
+                               "monitor-cond-change-requests must be object");
+        goto error;
+    }
+
+    SHASH_FOR_EACH (node, json_object(monitor_cond_update_reqs)) {
+        const struct ovsdb_table *table;
+        const struct json *mr_value;
+        size_t i;
+
+        table = ovsdb_get_table(m->db, node->name);
+        if (!table) {
+            error = ovsdb_syntax_error(NULL, NULL,
+                                       "no table named %s", node->name);
+            goto error;
+        }
+        if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
+            error = ovsdb_syntax_error(NULL, NULL,
+                                       "no table named %s in monitor session",
+                                       node->name);
+            goto error;
+        }
+
+        mr_value = node->data;
+        if (mr_value->type == JSON_ARRAY) {
+            const struct json_array *array = &mr_value->u.array;
+
+            for (i = 0; i < array->n; i++) {
+                error = ovsdb_jsonrpc_parse_monitor_cond_update_request(
+                                            m, table, array->elems[i]);
+                if (error) {
+                    goto error;
+                }
+            }
+        } else {
+            error = ovsdb_syntax_error(
+                       NULL, NULL,
+                       "table %s no monitor-cond-change JSON array",
+                       node->name);
+            goto error;
+        }
+    }
+
+    ovsdb_monitor_get_all_rows(m->dbmon, m->unflushed);
+    m->all_rows = true;
+
+    /* Change monitor id */
+    hmap_remove(&s->monitors, &m->node);
+    json_destroy(m->monitor_id);
+    m->monitor_id = json_clone(params->u.array.elems[1]);
+    hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
+
+    return jsonrpc_create_reply(json_object_create(), request_id);
+
+error:
+
+    json = ovsdb_error_to_json(error);
+    ovsdb_error_destroy(error);
+    return jsonrpc_create_error(json, request_id);
+}
+
 static struct jsonrpc_msg *
 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
                              struct json_array *params,
@@ -1330,8 +1466,14 @@ static struct json *
 ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
                                      bool initial)
 {
-    return ovsdb_monitor_get_update(m->dbmon, initial, &m->unflushed,
-                                    m->condition, m->version);
+    struct json * json = ovsdb_monitor_get_update(m->dbmon, initial,
+                                                  m->all_rows,
+                                                  &m->unflushed,
+                                                  m->condition,
+                                                  m->version);
+
+    m->all_rows = false;
+    return json;
 }
 
 static bool
@@ -1340,7 +1482,7 @@ ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
     struct ovsdb_jsonrpc_monitor *m;
 
     HMAP_FOR_EACH (m, node, &s->monitors) {
-        if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
+        if (m->all_rows || ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
             return true;
         }
     }
diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index 1614d67..d087a5a 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -120,6 +120,11 @@ struct ovsdb_monitor_changes {
                                     hmap.  */
 };
 
+enum ovsdb_monitor_changes_type {
+    OVSDB_MONITOR_CHANGES,
+    OVSDB_MONITOR_ALL
+};
+
 /* A particular table being monitored. */
 struct ovsdb_monitor_table {
     const struct ovsdb_table *table;
@@ -142,6 +147,9 @@ struct ovsdb_monitor_table {
 
     /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
     struct hmap changes;
+    /* Contains 'ovsdb_monitor_changes' of all rows in table at transaction
+       point in time. indexed by 'transaction'. */
+    struct hmap all;
 };
 
 typedef struct json *
@@ -153,12 +161,15 @@ typedef struct json *
 
 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
 static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
-    struct ovsdb_monitor_table *mt, uint64_t next_txn);
+    struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type type,
+    uint64_t next_txn);
 static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
-    struct ovsdb_monitor_table *mt, uint64_t unflushed);
+    struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type type,
+    uint64_t unflushed);
 static void ovsdb_monitor_changes_destroy(
                                   struct ovsdb_monitor_changes *changes);
 static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
+                                  enum ovsdb_monitor_changes_type type,
                                   uint64_t unflushed);
 
 static uint32_t
@@ -262,8 +273,8 @@ ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
  *
  * If 'row' is NULL, returns NULL. */
 static struct ovsdb_datum *
-clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
-                       const struct ovsdb_row *row)
+clone_monitor_ovsdb_row_data(const struct ovsdb_monitor_table *mt,
+                             const struct ovsdb_row *row)
 {
     struct ovsdb_datum *data;
     size_t i;
@@ -284,6 +295,44 @@ clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
     return data;
 }
 
+/* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
+ * copies of the data in 'fields' drawn from the columns represented by
+ * mt->columns[].  Returns the array.
+ *
+ * If 'row' is NULL, returns NULL. */
+static struct ovsdb_datum *
+clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
+                       const struct ovsdb_datum *fields)
+{
+    struct ovsdb_datum *data;
+    size_t i;
+
+    if (!fields) {
+        return NULL;
+    }
+
+    data = xmalloc(mt->n_columns * sizeof *data);
+    for (i = 0; i < mt->n_columns; i++) {
+        const struct ovsdb_column *c = mt->columns[i].column;
+        const struct ovsdb_datum *src = &fields[c->index];
+        struct ovsdb_datum *dst = &data[i];
+        const struct ovsdb_type *type = &c->type;
+
+        ovsdb_datum_clone(dst, src, type);
+    }
+    return data;
+}
+
+static void
+clone_monitor_row(const struct ovsdb_monitor_table *mt,
+                  struct ovsdb_monitor_row *to,
+                  const struct ovsdb_monitor_row *from)
+{
+    to->uuid = from->uuid;
+    to->old = clone_monitor_row_data(mt, from->old);
+    to->new = clone_monitor_row_data(mt,from->new);
+}
+
 /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
  * in 'row' drawn from the columns represented by mt->columns[]. */
 static void
@@ -393,6 +442,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m,
     mt->dbmon = m;
     shash_add(&m->tables, table->schema->name, mt);
     hmap_init(&mt->changes);
+    hmap_init(&mt->all);
     mt->columns_index_map =
         xmalloc(sizeof(unsigned int) * shash_count(&table->schema->columns));
     for (i = 0; i < shash_count(&table->schema->columns); i++) {
@@ -472,6 +522,13 @@ ovsdb_monitor_add_all_condition_columns(
     }
 }
 
+bool
+ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
+                           const struct ovsdb_table *table)
+{
+    return shash_find_data(&m->tables, table->schema->name);
+}
+
 /* Check for duplicated column names. Return the first
  * duplicated column's name if found. Otherwise return
  * NULL.  */
@@ -499,9 +556,12 @@ ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
 
 static struct ovsdb_monitor_changes *
 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
+                                enum ovsdb_monitor_changes_type type,
                                 uint64_t next_txn)
 {
     struct ovsdb_monitor_changes *changes;
+    struct hmap *changes_hmap =
+        type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
 
     changes = xzalloc(sizeof *changes);
 
@@ -509,19 +569,22 @@ ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
     changes->mt = mt;
     changes->n_refs = 1;
     hmap_init(&changes->rows);
-    hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
+    hmap_insert(changes_hmap, &changes->hmap_node, hash_uint64(next_txn));
 
     return changes;
 };
 
 static struct ovsdb_monitor_changes *
 ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
+                                 enum ovsdb_monitor_changes_type type,
                                  uint64_t transaction)
 {
     struct ovsdb_monitor_changes *changes;
+    struct hmap *changes_hmap =
+        type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
     size_t hash = hash_uint64(transaction);
 
-    HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
+    HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, changes_hmap) {
         if (changes->transaction == transaction) {
             return changes;
         }
@@ -533,13 +596,16 @@ ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
 /* Stop currently tracking changes to table 'mt' since 'transaction'. */
 static void
 ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
+                                    enum ovsdb_monitor_changes_type type,
                                     uint64_t transaction)
 {
+    struct hmap *changes_hmap =
+        type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
     struct ovsdb_monitor_changes *changes =
-                ovsdb_monitor_table_find_changes(mt, transaction);
+        ovsdb_monitor_table_find_changes(mt, type, transaction);
     if (changes) {
         if (--changes->n_refs == 0) {
-            hmap_remove(&mt->changes, &changes->hmap_node);
+            hmap_remove(changes_hmap, &changes->hmap_node);
             ovsdb_monitor_changes_destroy(changes);
         }
     }
@@ -549,15 +615,16 @@ ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
  */
 static void
 ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
+                                  enum ovsdb_monitor_changes_type type,
                                   uint64_t transaction)
 {
     struct ovsdb_monitor_changes *changes;
 
-    changes = ovsdb_monitor_table_find_changes(mt, transaction);
+    changes = ovsdb_monitor_table_find_changes(mt, type, transaction);
     if (changes) {
         changes->n_refs++;
     } else {
-        ovsdb_monitor_table_add_changes(mt, transaction);
+        ovsdb_monitor_table_add_changes(mt, type, transaction);
     }
 }
 
@@ -674,6 +741,46 @@ ovsdb_monitor_get_table_conditions(
     return true;
 }
 
+struct ovsdb_error *
+ovsdb_monitor_table_condition_update(
+                            struct ovsdb_monitor *dbmon,
+                            struct ovsdb_monitor_session_condition *condition,
+                            const struct ovsdb_table *table,
+                            const struct json *cond_json)
+{
+    struct ovsdb_monitor_table_condition *mtc =
+        shash_find_data(&condition->tables, table->schema->name);
+    struct ovsdb_error *error;
+    struct ovsdb_condition cond = OVSDB_CONDITION_INITIALIZER;
+    bool empty = ovsdb_condition_empty(&mtc->new_condition);
+
+    if (!condition) {
+        return NULL;
+    }
+
+    error = ovsdb_condition_from_json(table->schema, cond_json,
+                                      NULL, &cond);
+    if (error) {
+        return error;
+    }
+
+    ovsdb_condition_destroy(&mtc->new_condition);
+    ovsdb_condition_clone(&mtc->new_condition, &cond);
+
+    if (empty && !ovsdb_condition_empty(&mtc->new_condition)) {
+        condition->n_empty_cnd--;
+    }
+    if (!empty && ovsdb_condition_empty(&mtc->new_condition)) {
+        condition->n_empty_cnd++;
+    }
+
+    ovsdb_monitor_condition_add_columns(dbmon,
+                                        table,
+                                        &mtc->new_condition);
+
+    return NULL;
+}
+
 static enum ovsdb_monitor_selection
 ovsdb_monitor_row_update_type_condition(
                       const struct ovsdb_monitor_table *mt,
@@ -827,7 +934,7 @@ ovsdb_monitor_compose_row_update(
  * for 'row' within * 'mt', or NULL if no row update should be sent.
  *
  * The caller should specify 'initial' as true if the returned JSON is
- * going to be used as part of the initial reply to a "monitor2" request,
+ * going to be used as part of the initial reply to a "monitor_cond" request,
  * false if it is going to be used as part of an "update2" notification.
  *
  * 'changed' must be a scratch buffer for internal use that is at least
@@ -916,7 +1023,7 @@ ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
 static struct json*
 ovsdb_monitor_compose_update(
                       struct ovsdb_monitor *dbmon,
-                      bool initial, uint64_t transaction,
+                      bool initial,  bool all_rows, uint64_t transaction,
                       const struct ovsdb_monitor_session_condition *condition,
                       compose_row_update_cb_func row_update)
 {
@@ -932,7 +1039,16 @@ ovsdb_monitor_compose_update(
         struct ovsdb_monitor_changes *changes;
         struct json *table_json = NULL;
 
-        changes = ovsdb_monitor_table_find_changes(mt, transaction);
+        if (!all_rows) {
+            changes = ovsdb_monitor_table_find_changes(mt,
+                                                       OVSDB_MONITOR_CHANGES,
+                                                       transaction);
+        } else {
+            /* Get changes that includes all rows from all_txn point in time */
+            changes = ovsdb_monitor_table_find_changes(mt,
+                                                       OVSDB_MONITOR_ALL,
+                                                       transaction);
+        }
         if (!changes) {
             continue;
         }
@@ -968,7 +1084,8 @@ ovsdb_monitor_compose_update(
 
 /* Returns JSON for a <table-updates> object (as described in RFC 7047)
  * for all the outstanding changes within 'monitor' that starts from
- * '*unflushed' transaction id.
+ * '*unflushed'.
+ * If all_rows is true all_rows in the db that match conditions will be sent.
  *
  * The caller should specify 'initial' as true if the returned JSON is going to
  * be used as part of the initial reply to a "monitor" request, false if it is
@@ -976,7 +1093,8 @@ ovsdb_monitor_compose_update(
 struct json *
 ovsdb_monitor_get_update(
              struct ovsdb_monitor *dbmon,
-             bool initial, uint64_t *unflushed,
+             bool initial, bool all_rows,
+             uint64_t *unflushed,
              const struct ovsdb_monitor_session_condition *condition,
              enum ovsdb_monitor_version version)
 {
@@ -988,7 +1106,7 @@ ovsdb_monitor_get_update(
 
     /* Return a clone of cached json if one exists. Otherwise,
      * generate a new one and add it to the cache.  */
-    if (!condition || (condition && ovsdb_can_cache(condition))) {
+    if (!condition || (!all_rows && condition && ovsdb_can_cache(condition))) {
         cache_node = ovsdb_monitor_json_cache_search(dbmon, version, prev_txn);
     }
     if (cache_node) {
@@ -996,17 +1114,19 @@ ovsdb_monitor_get_update(
     } else {
         if (version == OVSDB_MONITOR_V1) {
             json =
-               ovsdb_monitor_compose_update(dbmon, initial, prev_txn,
-                                            condition,
+               ovsdb_monitor_compose_update(dbmon, initial, all_rows,
+                                            prev_txn, condition,
                                             ovsdb_monitor_compose_row_update);
         } else {
             ovs_assert(version == OVSDB_MONITOR_V2);
             json =
-               ovsdb_monitor_compose_update(dbmon, initial, prev_txn,
-                                            condition,
+               ovsdb_monitor_compose_update(dbmon, initial, all_rows,
+                                            prev_txn, condition,
                                             ovsdb_monitor_compose_row_update2);
         }
-        if (!condition || (condition && ovsdb_can_cache(condition))) {
+
+        if (!condition ||
+            (!all_rows && condition && ovsdb_can_cache(condition))) {
             ovsdb_monitor_json_cache_insert(dbmon, version, prev_txn, json);
         }
     }
@@ -1014,9 +1134,28 @@ ovsdb_monitor_get_update(
     /* Maintain transaction id of 'changes'. */
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
+        struct ovsdb_condition *old_condition, *new_condition;
 
-        ovsdb_monitor_table_untrack_changes(mt, prev_txn);
-        ovsdb_monitor_table_track_changes(mt, next_txn);
+        if (!all_rows) {
+            ovsdb_monitor_table_untrack_changes(mt,
+                                                OVSDB_MONITOR_CHANGES,
+                                                prev_txn);
+        } else {
+            ovsdb_monitor_table_untrack_changes(mt,
+                                                OVSDB_MONITOR_ALL,
+                                                prev_txn);
+        }
+        ovsdb_monitor_table_track_changes(mt, OVSDB_MONITOR_CHANGES, next_txn);
+
+        if (ovsdb_monitor_get_table_conditions(mt,
+                                               condition,
+                                               &old_condition,
+                                               &new_condition)) {
+            if (ovsdb_condition_cmp(old_condition, new_condition)) {
+                ovsdb_condition_destroy(old_condition);
+                ovsdb_condition_clone(old_condition, new_condition);
+            }
+        }
     }
     *unflushed = next_txn;
 
@@ -1097,8 +1236,8 @@ ovsdb_monitor_changes_update(const struct ovsdb_row *old,
         change = xzalloc(sizeof *change);
         hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
         change->uuid = *uuid;
-        change->old = clone_monitor_row_data(mt, old);
-        change->new = clone_monitor_row_data(mt, new);
+        change->old = clone_monitor_ovsdb_row_data(mt, old);
+        change->new = clone_monitor_ovsdb_row_data(mt, new);
     } else {
         if (new) {
             update_monitor_row_data(mt, new, change->new);
@@ -1115,6 +1254,20 @@ ovsdb_monitor_changes_update(const struct ovsdb_row *old,
     }
 }
 
+static void
+ovsdb_monitor_changes_clone_insert_row(const struct ovsdb_monitor_row *row,
+                                       const struct ovsdb_monitor_table *mt,
+                                       struct ovsdb_monitor_changes *changes)
+{
+    struct ovsdb_monitor_row *change;
+
+    ovs_assert(ovsdb_monitor_changes_row_find(changes, &row->uuid) == NULL);
+
+    change = xzalloc(sizeof *change);
+    hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(&row->uuid));
+    clone_monitor_row(mt, change, row);
+}
+
 static bool
 ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt,
                               const unsigned long int *changed)
@@ -1162,6 +1315,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
     struct ovsdb_table *table = new ? new->table : old->table;
     struct ovsdb_monitor_table *mt;
     struct ovsdb_monitor_changes *changes;
+    enum ovsdb_monitor_changes_efficacy efficacy;
+    enum ovsdb_monitor_selection type;
 
     if (!aux->mt || table != aux->mt->table) {
         aux->mt = shash_find_data(&m->tables, table->schema->name);
@@ -1173,16 +1328,17 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
     }
     mt = aux->mt;
 
-    HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
-        enum ovsdb_monitor_changes_efficacy efficacy;
-        enum ovsdb_monitor_selection type;
+    type = ovsdb_monitor_row_update_type(false, old, new);
+    efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
 
-        type = ovsdb_monitor_row_update_type(false, old, new);
-        efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
-        if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
+    if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
+        /* insert row change to changes lists */
+        HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
+            ovsdb_monitor_changes_update(old, new, mt, changes);
+        }
+        HMAP_FOR_EACH(changes, hmap_node, &mt->all) {
             ovsdb_monitor_changes_update(old, new, mt, changes);
         }
-
         if (aux->efficacy < efficacy) {
             aux->efficacy = efficacy;
         }
@@ -1194,10 +1350,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
 void
 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
 {
-    struct ovsdb_monitor_aux aux;
     struct shash_node *node;
 
-    ovsdb_monitor_init_aux(&aux, dbmon);
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
 
@@ -1205,9 +1359,14 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
             struct ovsdb_row *row;
             struct ovsdb_monitor_changes *changes;
 
-            changes = ovsdb_monitor_table_find_changes(mt, 0);
+            changes = ovsdb_monitor_table_find_changes(mt,
+                                                       OVSDB_MONITOR_CHANGES,
+                                                       0);
             if (!changes) {
-                changes = ovsdb_monitor_table_add_changes(mt, 0);
+                changes =
+                    ovsdb_monitor_table_add_changes(mt,
+                                                    OVSDB_MONITOR_CHANGES,
+                                                    0);
                 HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
                     ovsdb_monitor_changes_update(NULL, row, mt, changes);
                 }
@@ -1218,6 +1377,53 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
     }
 }
 
+/* Record all rows in DB and mark this changes at unflushed tranaction id */
+void
+ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon,
+                           uint64_t unflushed)
+{
+    struct shash_node *node;
+
+    SHASH_FOR_EACH (node, &dbmon->tables) {
+        struct ovsdb_monitor_table *mt = node->data;
+        struct ovsdb_row *new;
+        struct ovsdb_monitor_changes *changes, *all_changes;
+
+        all_changes = ovsdb_monitor_table_find_changes(mt,
+                                                       OVSDB_MONITOR_ALL,
+                                                       unflushed);
+        if (!all_changes) {
+            all_changes = ovsdb_monitor_table_add_changes(mt,
+                                                          OVSDB_MONITOR_ALL,
+                                                          unflushed);
+            changes = ovsdb_monitor_table_find_changes(mt,
+                                                       OVSDB_MONITOR_CHANGES,
+                                                       unflushed);
+            HMAP_FOR_EACH (new, hmap_node, &mt->table->rows) {
+                struct ovsdb_monitor_row *row = NULL;
+                if (changes) {
+                    /* Check if we have a change record for this row */
+                    row = ovsdb_monitor_changes_row_find(
+                                                   changes,
+                                                   ovsdb_row_get_uuid(new));
+                }
+                if (row) {
+                    ovsdb_monitor_changes_clone_insert_row(row, mt,
+                                                           all_changes);
+                } else {
+                    ovsdb_monitor_changes_update(new, new, mt, all_changes);
+                }
+            }
+        } else {
+            all_changes->n_refs++;
+        }
+
+        ovsdb_monitor_table_untrack_changes(mt,
+                                            OVSDB_MONITOR_CHANGES,
+                                            unflushed);
+    }
+}
+
 void
 ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
                    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
@@ -1367,7 +1573,12 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
             hmap_remove(&mt->changes, &changes->hmap_node);
             ovsdb_monitor_changes_destroy(changes);
         }
+        HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->all) {
+            hmap_remove(&mt->changes, &changes->hmap_node);
+            ovsdb_monitor_changes_destroy(changes);
+        }
         hmap_destroy(&mt->changes);
+        hmap_destroy(&mt->all);
         free(mt->columns);
         free(mt->columns_index_map);
         free(mt);
diff --git a/ovsdb/monitor.h b/ovsdb/monitor.h
index 0529e5a..935c65f 100644
--- a/ovsdb/monitor.h
+++ b/ovsdb/monitor.h
@@ -57,6 +57,10 @@ void ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
 void ovsdb_monitor_add_table(struct ovsdb_monitor *m,
                              const struct ovsdb_table *table);
 
+bool
+ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
+                           const struct ovsdb_table *table);
+
 void ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
                               const struct ovsdb_table *table,
                               const struct ovsdb_column *column,
@@ -70,12 +74,12 @@ const char * OVS_WARN_UNUSED_RESULT
 ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *,
                           const struct ovsdb_table *);
 
-struct json *ovsdb_monitor_get_update(
-               struct ovsdb_monitor *dbmon,
-               bool initial,
-               uint64_t *unflushed_transaction,
-               const struct ovsdb_monitor_session_condition *condition,
-               enum ovsdb_monitor_version version);
+struct json *ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon,
+                                      bool initial,
+                                      bool all_rows,
+                                      uint64_t *unflushed_transaction,
+                                      const struct ovsdb_monitor_session_condition *condition,
+                                      enum ovsdb_monitor_version version);
 
 void ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
                                     const struct ovsdb_table *table,
@@ -105,8 +109,18 @@ ovsdb_monitor_table_condition_add(
                           const struct ovsdb_table *table,
                           const struct json *json_cnd);
 
+void ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon,
+                                uint64_t unflushed);
+
 void ovsdb_monitor_session_condition_bind(
                            const struct ovsdb_monitor_session_condition *,
                            const struct ovsdb_monitor *);
 
+struct ovsdb_error *
+ovsdb_monitor_table_condition_update(
+                           struct ovsdb_monitor *dbmon,
+                           struct ovsdb_monitor_session_condition *condition,
+                           const struct ovsdb_table *table,
+                           const struct json *cond_json);
+
 #endif
-- 
2.1.4





More information about the dev mailing list