[ovs-dev] [PATCH monitor_cond 07/12] ovsdb: enable jsonrpc-server to service "monitor_cond_change" request
Liran Schour
lirans at il.ibm.com
Tue Jan 5 13:14:00 UTC 2016
ovsdb-server now accepts "monitor_cond_change" request. On condition change
we record all rows of table in a new changes list - OVSDB_MONITOR_ALL that are
being indexed by the transaction-id at the moment of record.
JSON cache is being used only for empty condition monitor sessions.
Sees ovsdb-server (1) man page for details of monitor_cond_change.
Signed-off-by: Liran Schour <lirans at il.ibm.com>
---
ovsdb/jsonrpc-server.c | 132 +++++++++++++++++++++++++-
ovsdb/monitor.c | 252 ++++++++++++++++++++++++++++++++++++++++++-------
ovsdb/monitor.h | 19 ++--
3 files changed, 359 insertions(+), 44 deletions(-)
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 3439f40..ecaf379 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -87,6 +87,11 @@ static void ovsdb_jsonrpc_trigger_complete_done(
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
enum ovsdb_monitor_version, bool, const struct json *request_id);
+static struct jsonrpc_msg *
+ovsdb_jsonrpc_monitor_cond_change(
+ struct ovsdb_jsonrpc_session *s,
+ struct json *params,
+ const struct json *request_id);
static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
struct ovsdb_jsonrpc_session *,
struct json_array *params,
@@ -863,6 +868,9 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
"monitor_cond"),
request->id);
}
+ } else if (!strcmp(request->method, "monitor_cond_change")) {
+ reply = ovsdb_jsonrpc_monitor_cond_change(s, request->params,
+ request->id);
} else if (!strcmp(request->method, "monitor_cancel")) {
reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
request->id);
@@ -1052,6 +1060,9 @@ struct ovsdb_jsonrpc_monitor {
struct ovsdb_monitor *dbmon;
uint64_t unflushed; /* The first transaction that has not been
flushed to the jsonrpc remote client. */
+ uint64_t unflushed_all; /* The first transaction that has not been
+ flushed to the jsonrpc remote client
+ while all rows was recorded. */
enum ovsdb_monitor_version version;
struct ovsdb_monitor_session_condition *condition;/* Session's condition */
};
@@ -1219,6 +1230,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
m->condition = ovsdb_monitor_session_condition_create();
}
m->unflushed = 0;
+ m->unflushed_all = 0;
m->version = version;
hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
m->monitor_id = json_clone(monitor_id);
@@ -1301,6 +1313,115 @@ error:
return jsonrpc_create_error(json, request_id);
}
+static struct ovsdb_error *
+ovsdb_jsonrpc_parse_monitor_cond_change_request(
+ struct ovsdb_jsonrpc_monitor *m,
+ const struct ovsdb_table *table,
+ const struct json *cond_change_req)
+{
+ const struct ovsdb_table_schema *ts = table->schema;
+ const struct json *added = NULL;
+ const struct json *removed = NULL;
+ struct ovsdb_parser parser;
+ struct ovsdb_error *error;
+
+ ovsdb_parser_init(&parser, cond_change_req, "table %s", ts->name);
+ added = ovsdb_parser_member(&parser, "added", OP_ARRAY | OP_OPTIONAL);
+ removed = ovsdb_parser_member(&parser, "removed", OP_ARRAY | OP_OPTIONAL);
+
+ error = ovsdb_parser_finish(&parser);
+ if (error) {
+ return error;
+ }
+
+ error = ovsdb_monitor_table_condition_change(m->condition, table,
+ added, removed);
+
+ return error;
+}
+
+static struct jsonrpc_msg *
+ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
+ struct json *params,
+ const struct json *request_id)
+{
+ struct ovsdb_error *error;
+ struct ovsdb_jsonrpc_monitor *m;
+ struct json *monitor_cond_change_reqs;
+ struct shash_node *node;
+ struct json *json;
+
+ if (json_array(params)->n != 2) {
+ error = ovsdb_syntax_error(params, NULL, "invalid parameters");
+ goto error;
+ }
+
+ m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]);
+ if (!m) {
+ error = ovsdb_syntax_error(request_id, NULL,
+ "unknown monitor session");
+ goto error;
+ }
+
+ monitor_cond_change_reqs = params->u.array.elems[1];
+ if (monitor_cond_change_reqs->type != JSON_OBJECT) {
+ error =
+ ovsdb_syntax_error(NULL, NULL,
+ "monitor-cond-change-requests must be object");
+ goto error;
+ }
+
+ SHASH_FOR_EACH (node, json_object(monitor_cond_change_reqs)) {
+ const struct ovsdb_table *table;
+ const struct json *mr_value;
+ size_t i;
+
+ table = ovsdb_get_table(m->db, node->name);
+ if (!table) {
+ error = ovsdb_syntax_error(NULL, NULL,
+ "no table named %s", node->name);
+ goto error;
+ }
+ if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
+ error = ovsdb_syntax_error(NULL, NULL,
+ "no table named %s in monitor session",
+ node->name);
+ goto error;
+ }
+
+ mr_value = node->data;
+ if (mr_value->type == JSON_ARRAY) {
+ const struct json_array *array = &mr_value->u.array;
+
+ for (i = 0; i < array->n; i++) {
+ error = ovsdb_jsonrpc_parse_monitor_cond_change_request(
+ m, table, array->elems[i]);
+ if (error) {
+ goto error;
+ }
+ }
+ } else {
+ error = ovsdb_syntax_error(
+ NULL, NULL,
+ "table %s no monitor-cond-change JSON array",
+ node->name);
+ goto error;
+ }
+ }
+
+ ovsdb_monitor_get_all_rows(m->dbmon, m->unflushed);
+ m->unflushed_all = m->unflushed;
+ m->unflushed = 0;
+
+ return jsonrpc_create_reply(json_object_create(), request_id);
+
+ error:
+
+ json = ovsdb_error_to_json(error);
+ ovsdb_error_destroy(error);
+ return jsonrpc_create_error(json, request_id);
+}
+
static struct jsonrpc_msg *
ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
struct json_array *params,
@@ -1337,8 +1458,15 @@ static struct json *
ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
bool initial)
{
- return ovsdb_monitor_get_update(m->dbmon, initial, &m->unflushed,
- m->condition, m->version);
+ struct json * json = ovsdb_monitor_get_update(m->dbmon, initial,
+ &m->unflushed,
+ m->unflushed_all,
+ m->condition,
+ m->version);
+
+ m->unflushed_all = 0;
+
+ return json;
}
static bool
diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index cf87b98..00ebdfc 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -121,6 +121,11 @@ struct ovsdb_monitor_changes {
hmap. */
};
+enum ovsdb_monitor_changes_type {
+ OVSDB_MONITOR_CHANGES,
+ OVSDB_MONITOR_ALL
+};
+
/* A particular table being monitored. */
struct ovsdb_monitor_table {
const struct ovsdb_table *table;
@@ -143,6 +148,11 @@ struct ovsdb_monitor_table {
/* Temp pointers to conditions for composing update */
struct ovsdb_condition *old_condition;
struct ovsdb_condition *new_condition;
+
+ /* Contains 'ovsdb_monitor_changes' of all rows on table at transaction
+ point in time. indexed by 'transaction'. */
+ struct hmap all;
+
};
typedef struct json *
@@ -152,12 +162,15 @@ typedef struct json *
static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
- struct ovsdb_monitor_table *mt, uint64_t next_txn);
+ struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type type,
+ uint64_t next_txn);
static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
- struct ovsdb_monitor_table *mt, uint64_t unflushed);
+ struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type type,
+ uint64_t unflushed);
static void ovsdb_monitor_changes_destroy(
struct ovsdb_monitor_changes *changes);
static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
+ enum ovsdb_monitor_changes_type type,
uint64_t unflushed);
static uint32_t
@@ -256,8 +269,8 @@ ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
*
* If 'row' is NULL, returns NULL. */
static struct ovsdb_datum *
-clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
- const struct ovsdb_row *row)
+clone_monitor_ovsdb_row_data(const struct ovsdb_monitor_table *mt,
+ const struct ovsdb_row *row)
{
struct ovsdb_datum *data;
size_t i;
@@ -278,6 +291,44 @@ clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
return data;
}
+/* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes them as
+ * copies of the data in 'fields' drawn from the columns represented by
+ * mt->columns[]. Returns the array.
+ *
+ * If 'row' is NULL, returns NULL. */
+static struct ovsdb_datum *
+clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
+ const struct ovsdb_datum *fields)
+{
+ struct ovsdb_datum *data;
+ size_t i;
+
+ if (!fields) {
+ return NULL;
+ }
+
+ data = xmalloc(mt->n_columns * sizeof *data);
+ for (i = 0; i < mt->n_columns; i++) {
+ const struct ovsdb_column *c = mt->columns[i].column;
+ const struct ovsdb_datum *src = &fields[c->index];
+ struct ovsdb_datum *dst = &data[i];
+ const struct ovsdb_type *type = &c->type;
+
+ ovsdb_datum_clone(dst, src, type);
+ }
+ return data;
+}
+
+static void
+clone_monitor_row(const struct ovsdb_monitor_table *mt,
+ struct ovsdb_monitor_row *to,
+ const struct ovsdb_monitor_row *from)
+{
+ to->uuid = from->uuid;
+ to->old = clone_monitor_row_data(mt, from->old);
+ to->new = clone_monitor_row_data(mt,from->new);
+}
+
/* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
* in 'row' drawn from the columns represented by mt->columns[]. */
static void
@@ -373,6 +424,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m,
mt->dbmon = m;
shash_add(&m->tables, table->schema->name, mt);
hmap_init(&mt->changes);
+ hmap_init(&mt->all);
mt->columns_index_map =
xmalloc(sizeof(unsigned int) * shash_count(&table->schema->columns));
}
@@ -400,6 +452,13 @@ ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
c->select = select;
}
+bool
+ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
+ const struct ovsdb_table *table)
+{
+ return shash_find_data(&m->tables, table->schema->name);
+}
+
/* Check for duplicated column names. Return the first
* duplicated column's name if found. Otherwise return
* NULL. */
@@ -428,9 +487,12 @@ ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
static struct ovsdb_monitor_changes *
ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
+ enum ovsdb_monitor_changes_type type,
uint64_t next_txn)
{
struct ovsdb_monitor_changes *changes;
+ struct hmap *changes_hmap =
+ type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
changes = xzalloc(sizeof *changes);
@@ -438,19 +500,22 @@ ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
changes->mt = mt;
changes->n_refs = 1;
hmap_init(&changes->rows);
- hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
+ hmap_insert(changes_hmap, &changes->hmap_node, hash_uint64(next_txn));
return changes;
};
static struct ovsdb_monitor_changes *
ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
+ enum ovsdb_monitor_changes_type type,
uint64_t transaction)
{
struct ovsdb_monitor_changes *changes;
+ struct hmap *changes_hmap =
+ type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
size_t hash = hash_uint64(transaction);
- HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
+ HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, changes_hmap) {
if (changes->transaction == transaction) {
return changes;
}
@@ -462,13 +527,16 @@ ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
/* Stop currently tracking changes to table 'mt' since 'transaction'. */
static void
ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
+ enum ovsdb_monitor_changes_type type,
uint64_t transaction)
{
+ struct hmap *changes_hmap =
+ type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
struct ovsdb_monitor_changes *changes =
- ovsdb_monitor_table_find_changes(mt, transaction);
+ ovsdb_monitor_table_find_changes(mt, type, transaction);
if (changes) {
if (--changes->n_refs == 0) {
- hmap_remove(&mt->changes, &changes->hmap_node);
+ hmap_remove(changes_hmap, &changes->hmap_node);
ovsdb_monitor_changes_destroy(changes);
}
}
@@ -478,15 +546,41 @@ ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
*/
static void
ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
+ enum ovsdb_monitor_changes_type type,
uint64_t transaction)
{
struct ovsdb_monitor_changes *changes;
- changes = ovsdb_monitor_table_find_changes(mt, transaction);
+ changes = ovsdb_monitor_table_find_changes(mt, type, transaction);
if (changes) {
changes->n_refs++;
} else {
- ovsdb_monitor_table_add_changes(mt, transaction);
+ ovsdb_monitor_table_add_changes(mt, type, transaction);
+ }
+}
+
+static void
+ovsdb_monitor_copy_rows(const struct ovsdb_monitor_table *mt,
+ struct ovsdb_monitor_changes *to,
+ const struct ovsdb_monitor_changes *from)
+{
+ struct ovsdb_monitor_row *row, *change;
+
+ HMAP_FOR_EACH(row, hmap_node, &from->rows) {
+ change = ovsdb_monitor_changes_row_find(to, &row->uuid);
+ if (!change) {
+ change = xzalloc(sizeof *change);
+ hmap_insert(&to->rows, &change->hmap_node, uuid_hash(&row->uuid));
+ clone_monitor_row(mt, change, row);
+ } else {
+ free_monitor_row_data(mt, change->new);
+ change->new = clone_monitor_row_data(mt,row->new);
+ }
+ if (!change->old && !change->new) {
+ /* This row was added then deleted. Forget about it. */
+ hmap_remove(&to->rows, &change->hmap_node);
+ free(change);
+ }
}
}
@@ -673,7 +767,7 @@ ovsdb_monitor_row_update_type_condition(const struct ovsdb_monitor_table *mt,
ovsdb_monitor_row_update_type(initial, old, new);
if (mt->dbmon->condition &&
- (!ovsdb_condition_empty(mt->old_condition) &&
+ (!ovsdb_condition_empty(mt->old_condition) ||
!ovsdb_condition_empty(mt->new_condition))) {
bool old_cond = !old ? false
: ovsdb_condition_evaluate_or_datum(old,
@@ -896,6 +990,7 @@ ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
static struct json*
ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
bool initial, uint64_t transaction,
+ uint64_t all_txn,
compose_row_update_cb_func row_update)
{
struct shash_node *node;
@@ -910,7 +1005,16 @@ ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
struct ovsdb_monitor_changes *changes;
struct json *table_json = NULL;
- changes = ovsdb_monitor_table_find_changes(mt, transaction);
+ if (!all_txn) {
+ changes = ovsdb_monitor_table_find_changes(mt,
+ OVSDB_MONITOR_CHANGES,
+ transaction);
+ } else {
+ /* Get changes that includes all rows from all_txn point in time */
+ changes = ovsdb_monitor_table_find_changes(mt,
+ OVSDB_MONITOR_ALL,
+ all_txn);
+ }
if (!changes) {
continue;
}
@@ -946,7 +1050,9 @@ ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
/* Returns JSON for a <table-updates> object (as described in RFC 7047)
* for all the outstanding changes within 'monitor' that starts from
- * '*unflushed' transaction id.
+ * '*unflushed' or all_unflushed transaction id.
+ * If all_unflushed is not zero it means that we return all rows in db
+ * since all_unflushed transaction id.
*
* The caller should specify 'initial' as true if the returned JSON is going to
* be used as part of the initial reply to a "monitor" request, false if it is
@@ -955,6 +1061,7 @@ struct json *
ovsdb_monitor_get_update(
struct ovsdb_monitor *dbmon,
bool initial, uint64_t *unflushed,
+ uint64_t all_unflushed,
const struct ovsdb_monitor_session_condition *condition,
enum ovsdb_monitor_version version)
{
@@ -963,11 +1070,18 @@ ovsdb_monitor_get_update(
struct json *json;
uint64_t prev_txn = *unflushed;
uint64_t next_txn = dbmon->n_transactions + 1;
+ /* We use the cache only for empty condition sessions */
+ bool can_cache = condition && condition->can_cache && !all_unflushed ?
+ true : false;
ovsdb_monitor_set_condition(dbmon, condition);
+ if (all_unflushed) {
+ ovs_assert(*unflushed == 0);
+ }
+
/* Return a clone of cached json if one exists. Otherwise,
* generate a new one and add it to the cache. */
- if (condition && condition->can_cache) {
+ if (can_cache) {
cache_node = ovsdb_monitor_json_cache_search(dbmon, version, prev_txn);
}
if (cache_node) {
@@ -975,13 +1089,15 @@ ovsdb_monitor_get_update(
} else {
if (version == OVSDB_MONITOR_V1) {
json = ovsdb_monitor_compose_update(dbmon, initial, prev_txn,
+ all_unflushed,
ovsdb_monitor_compose_row_update);
} else {
ovs_assert(version == OVSDB_MONITOR_V2);
json = ovsdb_monitor_compose_update(dbmon, initial, prev_txn,
+ all_unflushed,
ovsdb_monitor_compose_row_update2);
}
- if (condition && condition->can_cache) {
+ if (can_cache) {
ovsdb_monitor_json_cache_insert(dbmon, version, prev_txn, json);
}
}
@@ -990,8 +1106,16 @@ ovsdb_monitor_get_update(
SHASH_FOR_EACH (node, &dbmon->tables) {
struct ovsdb_monitor_table *mt = node->data;
- ovsdb_monitor_table_untrack_changes(mt, prev_txn);
- ovsdb_monitor_table_track_changes(mt, next_txn);
+ if (!all_unflushed) {
+ ovsdb_monitor_table_untrack_changes(mt,
+ OVSDB_MONITOR_CHANGES,
+ prev_txn);
+ } else {
+ ovsdb_monitor_table_untrack_changes(mt,
+ OVSDB_MONITOR_ALL,
+ all_unflushed);
+ }
+ ovsdb_monitor_table_track_changes(mt, OVSDB_MONITOR_CHANGES, next_txn);
if (dbmon->condition && ovsdb_condition_cmp(mt->old_condition,
mt->new_condition)) {
@@ -1079,8 +1203,8 @@ ovsdb_monitor_changes_update(const struct ovsdb_row *old,
change = xzalloc(sizeof *change);
hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
change->uuid = *uuid;
- change->old = clone_monitor_row_data(mt, old);
- change->new = clone_monitor_row_data(mt, new);
+ change->old = clone_monitor_ovsdb_row_data(mt, old);
+ change->new = clone_monitor_ovsdb_row_data(mt, new);
} else {
if (new) {
update_monitor_row_data(mt, new, change->new);
@@ -1138,6 +1262,29 @@ ovsdb_monitor_changes_classify(enum ovsdb_monitor_selection type,
: OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE;
}
+static void
+ovsdb_monitor_record_change(struct ovsdb_monitor_table *mt,
+ const struct ovsdb_row *old,
+ const struct ovsdb_row *new,
+ const unsigned long int *changed,
+ struct ovsdb_monitor_changes *changes,
+ struct ovsdb_monitor_aux *aux)
+{
+ enum ovsdb_monitor_changes_efficacy efficacy;
+ enum ovsdb_monitor_selection type;
+
+ type = ovsdb_monitor_row_update_type(false, old, new);
+
+ efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
+ if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
+ ovsdb_monitor_changes_update(old, new, mt, changes);
+ }
+
+ if (aux->efficacy < efficacy) {
+ aux->efficacy = efficacy;
+ }
+}
+
static bool
ovsdb_monitor_change_cb(const struct ovsdb_row *old,
const struct ovsdb_row *new,
@@ -1161,19 +1308,10 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
mt = aux->mt;
HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
- enum ovsdb_monitor_changes_efficacy efficacy;
- enum ovsdb_monitor_selection type;
-
- type = ovsdb_monitor_row_update_type(false, old, new);
-
- efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
- if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
- ovsdb_monitor_changes_update(old, new, mt, changes);
- }
-
- if (aux->efficacy < efficacy) {
- aux->efficacy = efficacy;
- }
+ ovsdb_monitor_record_change(mt, old, new, changed, changes, aux);
+ }
+ HMAP_FOR_EACH(changes, hmap_node, &mt->all) {
+ ovsdb_monitor_record_change(mt, old, new, changed, changes, aux);
}
return true;
@@ -1182,10 +1320,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
void
ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
{
- struct ovsdb_monitor_aux aux;
struct shash_node *node;
- ovsdb_monitor_init_aux(&aux, dbmon);
SHASH_FOR_EACH (node, &dbmon->tables) {
struct ovsdb_monitor_table *mt = node->data;
@@ -1193,9 +1329,14 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
struct ovsdb_row *row;
struct ovsdb_monitor_changes *changes;
- changes = ovsdb_monitor_table_find_changes(mt, 0);
+ changes = ovsdb_monitor_table_find_changes(mt,
+ OVSDB_MONITOR_CHANGES,
+ 0);
if (!changes) {
- changes = ovsdb_monitor_table_add_changes(mt, 0);
+ changes =
+ ovsdb_monitor_table_add_changes(mt,
+ OVSDB_MONITOR_CHANGES,
+ 0);
HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
ovsdb_monitor_changes_update(NULL, row, mt, changes);
}
@@ -1206,6 +1347,40 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
}
}
+/* Record all rows in DB and mark this changes at unflushed tranaction id */
+void
+ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon,
+ uint64_t unflushed)
+{
+ struct shash_node *node;
+
+ SHASH_FOR_EACH (node, &dbmon->tables) {
+ struct ovsdb_monitor_table *mt = node->data;
+ struct ovsdb_row *row;
+ struct ovsdb_monitor_changes *changes, *all_changes;
+
+ all_changes = ovsdb_monitor_table_find_changes(mt,
+ OVSDB_MONITOR_ALL,
+ unflushed);
+ if (!all_changes) {
+ all_changes = ovsdb_monitor_table_add_changes(mt,
+ OVSDB_MONITOR_ALL,
+ unflushed);
+ HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
+ ovsdb_monitor_changes_update(row, row, mt, all_changes);
+ }
+ } else {
+ changes = ovsdb_monitor_table_find_changes(mt,
+ OVSDB_MONITOR_CHANGES,
+ unflushed);
+ if (changes) {
+ ovsdb_monitor_copy_rows(mt, all_changes, changes);
+ }
+ all_changes->n_refs++;
+ }
+ }
+}
+
void
ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
@@ -1364,7 +1539,12 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
hmap_remove(&mt->changes, &changes->hmap_node);
ovsdb_monitor_changes_destroy(changes);
}
+ HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->all) {
+ hmap_remove(&mt->changes, &changes->hmap_node);
+ ovsdb_monitor_changes_destroy(changes);
+ }
hmap_destroy(&mt->changes);
+ hmap_destroy(&mt->all);
free(mt->columns);
free(mt->columns_index_map);
free(mt);
diff --git a/ovsdb/monitor.h b/ovsdb/monitor.h
index 0f6c360..4b54c6c 100644
--- a/ovsdb/monitor.h
+++ b/ovsdb/monitor.h
@@ -57,6 +57,10 @@ void ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
void ovsdb_monitor_add_table(struct ovsdb_monitor *m,
const struct ovsdb_table *table);
+bool
+ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
+ const struct ovsdb_table *table);
+
void ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
const struct ovsdb_table *table,
const struct ovsdb_column *column,
@@ -67,12 +71,12 @@ const char * OVS_WARN_UNUSED_RESULT
ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *,
const struct ovsdb_table *);
-struct json *ovsdb_monitor_get_update(
- struct ovsdb_monitor *dbmon,
- bool initial,
- uint64_t *unflushed_transaction,
- const struct ovsdb_monitor_session_condition *condition,
- enum ovsdb_monitor_version version);
+struct json *ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon,
+ bool initial,
+ uint64_t *unflushed_transaction,
+ uint64_t all_txn,
+ const struct ovsdb_monitor_session_condition *condition,
+ enum ovsdb_monitor_version version);
void ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
const struct ovsdb_table *table,
@@ -102,6 +106,9 @@ ovsdb_monitor_table_condition_add(
const struct ovsdb_table *table,
const struct json *json_cnd);
+void ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon,
+ uint64_t unflushed);
+
void ovsdb_monitor_session_condition_bind(
const struct ovsdb_monitor_session_condition *,
const struct ovsdb_monitor *);
--
2.1.4
More information about the dev
mailing list