[ovs-dev] [PATCH monitor_cond V2 07/12] ovsdb: enable jsonrpc-server to service "monitor_cond_update" request
Andy Zhou
azhou at ovn.org
Fri Jan 22 02:43:26 UTC 2016
On Sat, Jan 16, 2016 at 12:16 AM, Liran Schour <lirans at il.ibm.com> wrote:
> ovsdb-server now accepts "monitor_cond_update" request. On conditions
> update
> we record all rows of table in a new changes list - OVSDB_MONITOR_ALL that
> are
> being indexed by the transaction-id at the moment of record.
> JSON cache is being used only for empty condition monitor sessions.
> Sees ovsdb-server (1) man page for details of monitor_cond_update.
>
> Signed-off-by: Liran Schour <lirans at il.ibm.com>
> ---
> ovsdb/jsonrpc-server.c | 140 ++++++++++++++++++++++++++-
> ovsdb/monitor.c | 258
> +++++++++++++++++++++++++++++++++++++++++--------
> ovsdb/monitor.h | 19 ++--
> 3 files changed, 369 insertions(+), 48 deletions(-)
>
> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
> index 3439f40..3d898bb 100644
> --- a/ovsdb/jsonrpc-server.c
> +++ b/ovsdb/jsonrpc-server.c
> @@ -87,6 +87,11 @@ static void ovsdb_jsonrpc_trigger_complete_done(
> static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
> struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
> enum ovsdb_monitor_version, bool, const struct json *request_id);
> +static struct jsonrpc_msg *
>
This line break looks different from the surrounding code.
> +ovsdb_jsonrpc_monitor_cond_update(
> + struct ovsdb_jsonrpc_session *s,
> + struct json *params,
> + const struct json *request_id);
> static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
> struct ovsdb_jsonrpc_session *,
> struct json_array *params,
> @@ -863,6 +868,9 @@ ovsdb_jsonrpc_session_got_request(struct
> ovsdb_jsonrpc_session *s,
> "monitor_cond"),
> request->id);
> }
> + } else if (!strcmp(request->method, "monitor_cond_update")) {
> + reply = ovsdb_jsonrpc_monitor_cond_update(s, request->params,
> + request->id);
> } else if (!strcmp(request->method, "monitor_cancel")) {
> reply = ovsdb_jsonrpc_monitor_cancel(s,
> json_array(request->params),
> request->id);
> @@ -1052,6 +1060,8 @@ struct ovsdb_jsonrpc_monitor {
> struct ovsdb_monitor *dbmon;
> uint64_t unflushed; /* The first transaction that has not been
> flushed to the jsonrpc remote
> client. */
> + bool all_rows; /* Indicates if in the next flush we
> request all
> + rows (due to a condition change)
> */
>
I don't think I understand the logic of 'all_rows', Should flush all
changes using the old conditions, then reply
to the cond_update message, then start to send updates according to the new
conditions?
> enum ovsdb_monitor_version version;
> struct ovsdb_monitor_session_condition *condition;/* Session's
> condition */
> };
> @@ -1219,6 +1229,7 @@ ovsdb_jsonrpc_monitor_create(struct
> ovsdb_jsonrpc_session *s, struct ovsdb *db,
> m->condition = ovsdb_monitor_session_condition_create();
> }
> m->unflushed = 0;
> + m->all_rows = false;
> m->version = version;
> hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
> m->monitor_id = json_clone(monitor_id);
> @@ -1301,6 +1312,125 @@ error:
> return jsonrpc_create_error(json, request_id);
> }
>
> +static struct ovsdb_error *
> +ovsdb_jsonrpc_parse_monitor_cond_update_request(
> + struct ovsdb_jsonrpc_monitor *m,
> + const struct ovsdb_table *table,
> + const struct json *cond_update_req)
> +{
> + const struct ovsdb_table_schema *ts = table->schema;
> + const struct json *added, *removed, *columns;
> + struct ovsdb_parser parser;
> + struct ovsdb_error *error;
> +
> + ovsdb_parser_init(&parser, cond_update_req, "table %s", ts->name);
> + columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY |
> OP_OPTIONAL);
> + added = ovsdb_parser_member(&parser, "added", OP_ARRAY | OP_OPTIONAL);
> + removed = ovsdb_parser_member(&parser, "removed", OP_ARRAY |
> OP_OPTIONAL);
> +
> + error = ovsdb_parser_finish(&parser);
> + if (error) {
> + return error;
> + }
> +
> + if (columns) {
> + error = ovsdb_syntax_error(cond_update_req, NULL, "changing
> columns "
> + "is unsupported");
> + return error;
> + }
> + error = ovsdb_monitor_table_condition_change(m->condition, table,
> + added, removed);
> +
> + return error;
> +}
> +
> +static struct jsonrpc_msg *
> +ovsdb_jsonrpc_monitor_cond_update(struct ovsdb_jsonrpc_session *s,
> + struct json *params,
> + const struct json *request_id)
> +{
> + struct ovsdb_error *error;
> + struct ovsdb_jsonrpc_monitor *m;
> + struct json *monitor_cond_update_reqs;
> + struct shash_node *node;
> + struct json *json;
> +
> + if (json_array(params)->n != 3) {
> + error = ovsdb_syntax_error(params, NULL, "invalid parameters");
> + goto error;
> + }
> +
> + m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]);
> + if (!m) {
> + error = ovsdb_syntax_error(request_id, NULL,
> + "unknown monitor session");
> + goto error;
> + }
> +
> + monitor_cond_update_reqs = params->u.array.elems[2];
> + if (monitor_cond_update_reqs->type != JSON_OBJECT) {
> + error =
> + ovsdb_syntax_error(NULL, NULL,
> + "monitor-cond-change-requests must be
> object");
> + goto error;
> + }
> +
> + SHASH_FOR_EACH (node, json_object(monitor_cond_update_reqs)) {
> + const struct ovsdb_table *table;
> + const struct json *mr_value;
> + size_t i;
> +
> + table = ovsdb_get_table(m->db, node->name);
> + if (!table) {
> + error = ovsdb_syntax_error(NULL, NULL,
> + "no table named %s", node->name);
> + goto error;
> + }
> + if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
> + error = ovsdb_syntax_error(NULL, NULL,
> + "no table named %s in monitor
> session",
> + node->name);
> + goto error;
> + }
> +
> + mr_value = node->data;
> + if (mr_value->type == JSON_ARRAY) {
> + const struct json_array *array = &mr_value->u.array;
> +
> + for (i = 0; i < array->n; i++) {
> + error = ovsdb_jsonrpc_parse_monitor_cond_update_request(
> + m, table, array->elems[i]);
> + if (error) {
> + goto error;
> + }
> + }
> + } else {
> + error = ovsdb_syntax_error(
> + NULL, NULL,
> + "table %s no monitor-cond-change JSON array",
> + node->name);
> + goto error;
> + }
> + }
> +
> + ovsdb_monitor_get_all_rows(m->dbmon, m->unflushed);
> + m->all_rows = true;
> +
> + /* Change monitor id */
> + hmap_remove(&s->monitors, &m->node);
> + json_destroy(m->monitor_id);
> + m->monitor_id = json_clone(params->u.array.elems[1]);
> + hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
> +
> + return jsonrpc_create_reply(json_object_create(), request_id);
> +
> + error:
>
The indenting of "error:" label looks odd.
> +
> + json = ovsdb_error_to_json(error);
> + ovsdb_error_destroy(error);
> + return jsonrpc_create_error(json, request_id);
> +}
> +
> static struct jsonrpc_msg *
> ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
> struct json_array *params,
> @@ -1337,8 +1467,14 @@ static struct json *
> ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
> bool initial)
> {
> - return ovsdb_monitor_get_update(m->dbmon, initial, &m->unflushed,
> - m->condition, m->version);
> + struct json * json = ovsdb_monitor_get_update(m->dbmon, initial,
> + m->all_rows,
> + &m->unflushed,
> + m->condition,
> + m->version);
> +
> + m->all_rows = false;
> + return json;
> }
>
> static bool
> diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
> index cf87b98..bddf20e 100644
> --- a/ovsdb/monitor.c
> +++ b/ovsdb/monitor.c
> @@ -121,6 +121,11 @@ struct ovsdb_monitor_changes {
> hmap. */
> };
>
> +enum ovsdb_monitor_changes_type {
> + OVSDB_MONITOR_CHANGES,
> + OVSDB_MONITOR_ALL
> +};
> +
> /* A particular table being monitored. */
> struct ovsdb_monitor_table {
> const struct ovsdb_table *table;
> @@ -143,6 +148,11 @@ struct ovsdb_monitor_table {
> /* Temp pointers to conditions for composing update */
> struct ovsdb_condition *old_condition;
> struct ovsdb_condition *new_condition;
> +
> + /* Contains 'ovsdb_monitor_changes' of all rows on table at
> transaction
> + point in time. indexed by 'transaction'. */
> + struct hmap all;
> +
> };
>
> typedef struct json *
> @@ -152,12 +162,15 @@ typedef struct json *
>
> static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
> static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
> - struct ovsdb_monitor_table *mt, uint64_t next_txn);
> + struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type type,
> + uint64_t next_txn);
> static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
> - struct ovsdb_monitor_table *mt, uint64_t unflushed);
> + struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type type,
> + uint64_t unflushed);
> static void ovsdb_monitor_changes_destroy(
> struct ovsdb_monitor_changes *changes);
> static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table
> *mt,
> + enum ovsdb_monitor_changes_type type,
> uint64_t unflushed);
>
> static uint32_t
> @@ -256,8 +269,8 @@ ovsdb_monitor_changes_row_find(const struct
> ovsdb_monitor_changes *changes,
> *
> * If 'row' is NULL, returns NULL. */
> static struct ovsdb_datum *
> -clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
> - const struct ovsdb_row *row)
> +clone_monitor_ovsdb_row_data(const struct ovsdb_monitor_table *mt,
> + const struct ovsdb_row *row)
> {
> struct ovsdb_datum *data;
> size_t i;
> @@ -278,6 +291,44 @@ clone_monitor_row_data(const struct
> ovsdb_monitor_table *mt,
> return data;
> }
>
> +/* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes
> them as
> + * copies of the data in 'fields' drawn from the columns represented by
> + * mt->columns[]. Returns the array.
> + *
> + * If 'row' is NULL, returns NULL. */
> +static struct ovsdb_datum *
> +clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
> + const struct ovsdb_datum *fields)
> +{
> + struct ovsdb_datum *data;
> + size_t i;
> +
> + if (!fields) {
> + return NULL;
> + }
> +
> + data = xmalloc(mt->n_columns * sizeof *data);
> + for (i = 0; i < mt->n_columns; i++) {
> + const struct ovsdb_column *c = mt->columns[i].column;
> + const struct ovsdb_datum *src = &fields[c->index];
> + struct ovsdb_datum *dst = &data[i];
> + const struct ovsdb_type *type = &c->type;
> +
> + ovsdb_datum_clone(dst, src, type);
> + }
> + return data;
> +}
> +
> +static void
> +clone_monitor_row(const struct ovsdb_monitor_table *mt,
> + struct ovsdb_monitor_row *to,
> + const struct ovsdb_monitor_row *from)
> +{
> + to->uuid = from->uuid;
> + to->old = clone_monitor_row_data(mt, from->old);
> + to->new = clone_monitor_row_data(mt,from->new);
> +}
> +
> /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data
> from
> * in 'row' drawn from the columns represented by mt->columns[]. */
> static void
> @@ -373,6 +424,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m,
> mt->dbmon = m;
> shash_add(&m->tables, table->schema->name, mt);
> hmap_init(&mt->changes);
> + hmap_init(&mt->all);
> mt->columns_index_map =
> xmalloc(sizeof(unsigned int) *
> shash_count(&table->schema->columns));
> }
> @@ -400,6 +452,13 @@ ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
> c->select = select;
> }
>
> +bool
> +ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
> + const struct ovsdb_table *table)
> +{
> + return shash_find_data(&m->tables, table->schema->name);
> +}
> +
> /* Check for duplicated column names. Return the first
> * duplicated column's name if found. Otherwise return
> * NULL. */
> @@ -428,9 +487,12 @@ ovsdb_monitor_table_check_duplicates(struct
> ovsdb_monitor *m,
>
> static struct ovsdb_monitor_changes *
> ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
> + enum ovsdb_monitor_changes_type type,
> uint64_t next_txn)
> {
> struct ovsdb_monitor_changes *changes;
> + struct hmap *changes_hmap =
> + type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
>
> changes = xzalloc(sizeof *changes);
>
> @@ -438,19 +500,22 @@ ovsdb_monitor_table_add_changes(struct
> ovsdb_monitor_table *mt,
> changes->mt = mt;
> changes->n_refs = 1;
> hmap_init(&changes->rows);
> - hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
> + hmap_insert(changes_hmap, &changes->hmap_node, hash_uint64(next_txn));
>
> return changes;
> };
>
> static struct ovsdb_monitor_changes *
> ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
> + enum ovsdb_monitor_changes_type type,
> uint64_t transaction)
> {
> struct ovsdb_monitor_changes *changes;
> + struct hmap *changes_hmap =
> + type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
> size_t hash = hash_uint64(transaction);
>
> - HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
> + HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, changes_hmap) {
> if (changes->transaction == transaction) {
> return changes;
> }
> @@ -462,13 +527,16 @@ ovsdb_monitor_table_find_changes(struct
> ovsdb_monitor_table *mt,
> /* Stop currently tracking changes to table 'mt' since 'transaction'. */
> static void
> ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
> + enum ovsdb_monitor_changes_type type,
> uint64_t transaction)
> {
> + struct hmap *changes_hmap =
> + type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
> struct ovsdb_monitor_changes *changes =
> - ovsdb_monitor_table_find_changes(mt, transaction);
> + ovsdb_monitor_table_find_changes(mt, type, transaction);
> if (changes) {
> if (--changes->n_refs == 0) {
> - hmap_remove(&mt->changes, &changes->hmap_node);
> + hmap_remove(changes_hmap, &changes->hmap_node);
> ovsdb_monitor_changes_destroy(changes);
> }
> }
> @@ -478,15 +546,16 @@ ovsdb_monitor_table_untrack_changes(struct
> ovsdb_monitor_table *mt,
> */
> static void
> ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
> + enum ovsdb_monitor_changes_type type,
> uint64_t transaction)
> {
> struct ovsdb_monitor_changes *changes;
>
> - changes = ovsdb_monitor_table_find_changes(mt, transaction);
> + changes = ovsdb_monitor_table_find_changes(mt, type, transaction);
> if (changes) {
> changes->n_refs++;
> } else {
> - ovsdb_monitor_table_add_changes(mt, transaction);
> + ovsdb_monitor_table_add_changes(mt, type, transaction);
> }
> }
>
> @@ -673,7 +742,7 @@ ovsdb_monitor_row_update_type_condition(const struct
> ovsdb_monitor_table *mt,
> ovsdb_monitor_row_update_type(initial, old, new);
>
> if (mt->dbmon->condition &&
> - (!ovsdb_condition_empty(mt->old_condition) &&
> + (!ovsdb_condition_empty(mt->old_condition) ||
> !ovsdb_condition_empty(mt->new_condition))) {
> bool old_cond = !old ? false
> : ovsdb_condition_evaluate_or_datum(old,
> @@ -895,7 +964,8 @@ ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
> * 'transaction'. */
> static struct json*
> ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
> - bool initial, uint64_t transaction,
> + bool initial, bool all_rows,
> + uint64_t transaction,
> compose_row_update_cb_func row_update)
> {
> struct shash_node *node;
> @@ -910,7 +980,16 @@ ovsdb_monitor_compose_update(struct ovsdb_monitor
> *dbmon,
> struct ovsdb_monitor_changes *changes;
> struct json *table_json = NULL;
>
> - changes = ovsdb_monitor_table_find_changes(mt, transaction);
> + if (!all_rows) {
> + changes = ovsdb_monitor_table_find_changes(mt,
> +
> OVSDB_MONITOR_CHANGES,
> + transaction);
> + } else {
> + /* Get changes that includes all rows from all_txn point in
> time */
> + changes = ovsdb_monitor_table_find_changes(mt,
> + OVSDB_MONITOR_ALL,
> + transaction);
> + }
> if (!changes) {
> continue;
> }
> @@ -946,7 +1025,8 @@ ovsdb_monitor_compose_update(struct ovsdb_monitor
> *dbmon,
>
> /* Returns JSON for a <table-updates> object (as described in RFC 7047)
> * for all the outstanding changes within 'monitor' that starts from
> - * '*unflushed' transaction id.
> + * '*unflushed'.
> + * If all_rows is true all_rows in the db that match conditions will be
> sent.
> *
> * The caller should specify 'initial' as true if the returned JSON is
> going to
> * be used as part of the initial reply to a "monitor" request, false if
> it is
> @@ -954,7 +1034,8 @@ ovsdb_monitor_compose_update(struct ovsdb_monitor
> *dbmon,
> struct json *
> ovsdb_monitor_get_update(
> struct ovsdb_monitor *dbmon,
> - bool initial, uint64_t *unflushed,
> + bool initial, bool all_rows,
> + uint64_t *unflushed,
> const struct ovsdb_monitor_session_condition *condition,
> enum ovsdb_monitor_version version)
> {
> @@ -963,25 +1044,31 @@ ovsdb_monitor_get_update(
> struct json *json;
> uint64_t prev_txn = *unflushed;
> uint64_t next_txn = dbmon->n_transactions + 1;
> + /* We use the cache only for empty condition sessions */
> + bool can_cache = condition && condition->can_cache && !all_rows ?
> + true : false;
>
> ovsdb_monitor_set_condition(dbmon, condition);
> +
> /* Return a clone of cached json if one exists. Otherwise,
> * generate a new one and add it to the cache. */
> - if (condition && condition->can_cache) {
> + if (can_cache) {
> cache_node = ovsdb_monitor_json_cache_search(dbmon, version,
> prev_txn);
> }
> if (cache_node) {
> json = cache_node->json ? json_clone(cache_node->json) : NULL;
> } else {
> if (version == OVSDB_MONITOR_V1) {
> - json = ovsdb_monitor_compose_update(dbmon, initial, prev_txn,
> + json = ovsdb_monitor_compose_update(dbmon, initial, all_rows,
> + prev_txn,
> ovsdb_monitor_compose_row_update);
> } else {
> ovs_assert(version == OVSDB_MONITOR_V2);
> - json = ovsdb_monitor_compose_update(dbmon, initial, prev_txn,
> + json = ovsdb_monitor_compose_update(dbmon, initial, all_rows,
> + prev_txn,
>
> ovsdb_monitor_compose_row_update2);
> }
> - if (condition && condition->can_cache) {
> + if (can_cache) {
> ovsdb_monitor_json_cache_insert(dbmon, version, prev_txn,
> json);
> }
> }
> @@ -990,8 +1077,16 @@ ovsdb_monitor_get_update(
> SHASH_FOR_EACH (node, &dbmon->tables) {
> struct ovsdb_monitor_table *mt = node->data;
>
> - ovsdb_monitor_table_untrack_changes(mt, prev_txn);
> - ovsdb_monitor_table_track_changes(mt, next_txn);
> + if (!all_rows) {
> + ovsdb_monitor_table_untrack_changes(mt,
> + OVSDB_MONITOR_CHANGES,
> + prev_txn);
> + } else {
> + ovsdb_monitor_table_untrack_changes(mt,
> + OVSDB_MONITOR_ALL,
> + prev_txn);
> + }
> + ovsdb_monitor_table_track_changes(mt, OVSDB_MONITOR_CHANGES,
> next_txn);
>
> if (dbmon->condition && ovsdb_condition_cmp(mt->old_condition,
> mt->new_condition)) {
> @@ -1079,8 +1174,8 @@ ovsdb_monitor_changes_update(const struct ovsdb_row
> *old,
> change = xzalloc(sizeof *change);
> hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
> change->uuid = *uuid;
> - change->old = clone_monitor_row_data(mt, old);
> - change->new = clone_monitor_row_data(mt, new);
> + change->old = clone_monitor_ovsdb_row_data(mt, old);
> + change->new = clone_monitor_ovsdb_row_data(mt, new);
> } else {
> if (new) {
> update_monitor_row_data(mt, new, change->new);
> @@ -1097,6 +1192,20 @@ ovsdb_monitor_changes_update(const struct ovsdb_row
> *old,
> }
> }
>
> +static void
> +ovsdb_monitor_changes_clone_insert_row(const struct ovsdb_monitor_row
> *row,
> + const struct ovsdb_monitor_table
> *mt,
> + struct ovsdb_monitor_changes
> *changes)
> +{
> + struct ovsdb_monitor_row *change;
> +
> + ovs_assert(ovsdb_monitor_changes_row_find(changes, &row->uuid) ==
> NULL);
> +
> + change = xzalloc(sizeof *change);
> + hmap_insert(&changes->rows, &change->hmap_node,
> uuid_hash(&row->uuid));
> + clone_monitor_row(mt, change, row);
> +}
> +
> static bool
> ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt,
> const unsigned long int *changed)
> @@ -1138,6 +1247,29 @@ ovsdb_monitor_changes_classify(enum
> ovsdb_monitor_selection type,
> : OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE;
> }
>
> +static void
> +ovsdb_monitor_record_change(struct ovsdb_monitor_table *mt,
> + const struct ovsdb_row *old,
> + const struct ovsdb_row *new,
> + const unsigned long int *changed,
> + struct ovsdb_monitor_changes *changes,
> + struct ovsdb_monitor_aux *aux)
> +{
> + enum ovsdb_monitor_changes_efficacy efficacy;
> + enum ovsdb_monitor_selection type;
> +
> + type = ovsdb_monitor_row_update_type(false, old, new);
> +
> + efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
> + if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
> + ovsdb_monitor_changes_update(old, new, mt, changes);
> + }
> +
> + if (aux->efficacy < efficacy) {
> + aux->efficacy = efficacy;
> + }
> +}
> +
> static bool
> ovsdb_monitor_change_cb(const struct ovsdb_row *old,
> const struct ovsdb_row *new,
> @@ -1161,19 +1293,10 @@ ovsdb_monitor_change_cb(const struct ovsdb_row
> *old,
> mt = aux->mt;
>
> HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
> - enum ovsdb_monitor_changes_efficacy efficacy;
> - enum ovsdb_monitor_selection type;
> -
> - type = ovsdb_monitor_row_update_type(false, old, new);
> -
> - efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
> - if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
> - ovsdb_monitor_changes_update(old, new, mt, changes);
> - }
> -
> - if (aux->efficacy < efficacy) {
> - aux->efficacy = efficacy;
> - }
> + ovsdb_monitor_record_change(mt, old, new, changed, changes, aux);
> + }
> + HMAP_FOR_EACH(changes, hmap_node, &mt->all) {
> + ovsdb_monitor_record_change(mt, old, new, changed, changes, aux);
> }
>
> return true;
> @@ -1182,10 +1305,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
> void
> ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
> {
> - struct ovsdb_monitor_aux aux;
> struct shash_node *node;
>
> - ovsdb_monitor_init_aux(&aux, dbmon);
> SHASH_FOR_EACH (node, &dbmon->tables) {
> struct ovsdb_monitor_table *mt = node->data;
>
> @@ -1193,9 +1314,14 @@ ovsdb_monitor_get_initial(const struct
> ovsdb_monitor *dbmon)
> struct ovsdb_row *row;
> struct ovsdb_monitor_changes *changes;
>
> - changes = ovsdb_monitor_table_find_changes(mt, 0);
> + changes = ovsdb_monitor_table_find_changes(mt,
> +
> OVSDB_MONITOR_CHANGES,
> + 0);
> if (!changes) {
> - changes = ovsdb_monitor_table_add_changes(mt, 0);
> + changes =
> + ovsdb_monitor_table_add_changes(mt,
> + OVSDB_MONITOR_CHANGES,
> + 0);
> HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
> ovsdb_monitor_changes_update(NULL, row, mt, changes);
> }
> @@ -1206,6 +1332,53 @@ ovsdb_monitor_get_initial(const struct
> ovsdb_monitor *dbmon)
> }
> }
>
> +/* Record all rows in DB and mark this changes at unflushed tranaction id
> */
> +void
> +ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon,
> + uint64_t unflushed)
> +{
> + struct shash_node *node;
> +
> + SHASH_FOR_EACH (node, &dbmon->tables) {
> + struct ovsdb_monitor_table *mt = node->data;
> + struct ovsdb_row *new;
> + struct ovsdb_monitor_changes *changes, *all_changes;
> +
> + all_changes = ovsdb_monitor_table_find_changes(mt,
> + OVSDB_MONITOR_ALL,
> + unflushed);
> + if (!all_changes) {
> + all_changes = ovsdb_monitor_table_add_changes(mt,
> +
> OVSDB_MONITOR_ALL,
> + unflushed);
> + changes = ovsdb_monitor_table_find_changes(mt,
> +
> OVSDB_MONITOR_CHANGES,
> + unflushed);
> + HMAP_FOR_EACH (new, hmap_node, &mt->table->rows) {
> + struct ovsdb_monitor_row *row = NULL;
> + if (changes) {
> + /* Check if we have a change record for this row */
> + row = ovsdb_monitor_changes_row_find(
> + changes,
> +
> ovsdb_row_get_uuid(new));
> + }
> + if (row) {
> + ovsdb_monitor_changes_clone_insert_row(row, mt,
> + all_changes);
> + } else {
> + ovsdb_monitor_changes_update(new, new, mt,
> all_changes);
> + }
> + }
> + } else {
> + all_changes->n_refs++;
> + }
> +
> + ovsdb_monitor_table_untrack_changes(mt,
> + OVSDB_MONITOR_CHANGES,
> + unflushed);
> + }
> +}
> +
> void
> ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
> struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
> @@ -1364,7 +1537,12 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
> hmap_remove(&mt->changes, &changes->hmap_node);
> ovsdb_monitor_changes_destroy(changes);
> }
> + HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->all) {
> + hmap_remove(&mt->changes, &changes->hmap_node);
> + ovsdb_monitor_changes_destroy(changes);
> + }
> hmap_destroy(&mt->changes);
> + hmap_destroy(&mt->all);
> free(mt->columns);
> free(mt->columns_index_map);
> free(mt);
> diff --git a/ovsdb/monitor.h b/ovsdb/monitor.h
> index 0f6c360..55d07a3 100644
> --- a/ovsdb/monitor.h
> +++ b/ovsdb/monitor.h
> @@ -57,6 +57,10 @@ void ovsdb_monitor_remove_jsonrpc_monitor(struct
> ovsdb_monitor *dbmon,
> void ovsdb_monitor_add_table(struct ovsdb_monitor *m,
> const struct ovsdb_table *table);
>
> +bool
> +ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
> + const struct ovsdb_table *table);
> +
> void ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
> const struct ovsdb_table *table,
> const struct ovsdb_column *column,
> @@ -67,12 +71,12 @@ const char * OVS_WARN_UNUSED_RESULT
> ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *,
> const struct ovsdb_table *);
>
> -struct json *ovsdb_monitor_get_update(
> - struct ovsdb_monitor *dbmon,
> - bool initial,
> - uint64_t *unflushed_transaction,
> - const struct ovsdb_monitor_session_condition *condition,
> - enum ovsdb_monitor_version version);
> +struct json *ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon,
> + bool initial,
> + bool all_rows,
> + uint64_t *unflushed_transaction,
> + const struct
> ovsdb_monitor_session_condition *condition,
> + enum ovsdb_monitor_version version);
>
> void ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
> const struct ovsdb_table *table,
> @@ -102,6 +106,9 @@ ovsdb_monitor_table_condition_add(
> const struct ovsdb_table *table,
> const struct json *json_cnd);
>
> +void ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon,
> + uint64_t unflushed);
> +
> void ovsdb_monitor_session_condition_bind(
> const struct ovsdb_monitor_session_condition *,
> const struct ovsdb_monitor *);
> --
> 2.1.4
>
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
>
More information about the dev
mailing list