[ovs-dev] [PATCH monitor_cond V2 07/12] ovsdb: enable jsonrpc-server to service "monitor_cond_update" request

Liran Schour LIRANS at il.ibm.com
Fri Jan 22 09:52:24 UTC 2016


Andy Zhou <azhou at ovn.org> wrote on 22/01/2016 04:43:26 AM:

> On Sat, Jan 16, 2016 at 12:16 AM, Liran Schour <lirans at il.ibm.com> 
wrote:
> ovsdb-server now accepts the "monitor_cond_update" request. On a
> condition update we record all rows of the table in a new changes list,
> OVSDB_MONITOR_ALL, which is indexed by the transaction id at the moment
> of recording.
> The JSON cache is used only for monitor sessions with an empty condition.
> See the ovsdb-server(1) man page for details of monitor_cond_update.
> 
> Signed-off-by: Liran Schour <lirans at il.ibm.com>
> ---
>  ovsdb/jsonrpc-server.c | 140 ++++++++++++++++++++++++++-
>  ovsdb/monitor.c        | 258 ++++++++++++++++++++++++++++++++++++++
> +++--------
>  ovsdb/monitor.h        |  19 ++--
>  3 files changed, 369 insertions(+), 48 deletions(-)
> 
> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
> index 3439f40..3d898bb 100644
> --- a/ovsdb/jsonrpc-server.c
> +++ b/ovsdb/jsonrpc-server.c
> @@ -87,6 +87,11 @@ static void ovsdb_jsonrpc_trigger_complete_done(
>  static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
>      struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json 
*params,
>      enum ovsdb_monitor_version, bool, const struct json *request_id);
> +static struct jsonrpc_msg *
>  
> This line break looks different from the surrounding code.
>

Will fix this.
  
> +ovsdb_jsonrpc_monitor_cond_update(
> +    struct ovsdb_jsonrpc_session *s,
> +    struct json *params,
> +    const struct json *request_id);
>  static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
>      struct ovsdb_jsonrpc_session *,
>      struct json_array *params,
> @@ -863,6 +868,9 @@ ovsdb_jsonrpc_session_got_request(struct 
> ovsdb_jsonrpc_session *s,
>                                                           
"monitor_cond"),
>                                                   request->id);
>          }
> +    } else if (!strcmp(request->method, "monitor_cond_update")) {
> +        reply = ovsdb_jsonrpc_monitor_cond_update(s, request->params,
> +                                                  request->id);
>      } else if (!strcmp(request->method, "monitor_cancel")) {
>          reply = ovsdb_jsonrpc_monitor_cancel(s, 
json_array(request->params),
>                                               request->id);
> @@ -1052,6 +1060,8 @@ struct ovsdb_jsonrpc_monitor {
>      struct ovsdb_monitor *dbmon;
>      uint64_t unflushed;         /* The first transaction that has not 
been
>                                         flushed to the jsonrpc 
> remote client. */
> +    bool all_rows;              /* Indicates if in the next flush 
> we request all
> +                                   rows (due to a condition 
> change)          */
> I don't think I understand the logic of 'all_rows'. Shouldn't we flush
> all changes using the old conditions, then reply to the cond_update
> message, and then start sending updates according to the new conditions?
>

After going over the code following the previous patch review, I noticed
that if we require the client to send the cond_update request over the
same JSON-RPC session as the existing monitor session, then we can
guarantee that all pending changes have already been flushed, and the code
becomes simpler. The server will then send an update of all_rows evaluated
against the old and new conditions.
I will fix the code accordingly.

>      enum ovsdb_monitor_version version;
>      struct ovsdb_monitor_session_condition *condition;/* Session's 
> condition */
>  };
> @@ -1219,6 +1229,7 @@ ovsdb_jsonrpc_monitor_create(struct 
> ovsdb_jsonrpc_session *s, struct ovsdb *db,
>          m->condition = ovsdb_monitor_session_condition_create();
>      }
>      m->unflushed = 0;
> +    m->all_rows = false;
>      m->version = version;
>      hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
>      m->monitor_id = json_clone(monitor_id);
> @@ -1301,6 +1312,125 @@ error:
>      return jsonrpc_create_error(json, request_id);
>  }
> 
> +static struct ovsdb_error *
> +ovsdb_jsonrpc_parse_monitor_cond_update_request(
> +                                struct ovsdb_jsonrpc_monitor *m,
> +                                const struct ovsdb_table *table,
> +                                const struct json *cond_update_req)
> +{
> +    const struct ovsdb_table_schema *ts = table->schema;
> +    const struct json *added, *removed, *columns;
> +    struct ovsdb_parser parser;
> +    struct ovsdb_error *error;
> +
> +    ovsdb_parser_init(&parser, cond_update_req, "table %s", ts->name);
> +    columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | 
> OP_OPTIONAL);
> +    added = ovsdb_parser_member(&parser, "added", OP_ARRAY | 
OP_OPTIONAL);
> +    removed = ovsdb_parser_member(&parser, "removed", OP_ARRAY | 
> OP_OPTIONAL);
> +
> +    error = ovsdb_parser_finish(&parser);
> +    if (error) {
> +        return error;
> +    }
> +
> +    if (columns) {
> +        error = ovsdb_syntax_error(cond_update_req, NULL, 
"changingcolumns "
> +                                   "is unsupported");
> +        return error;
> +    }
> +    error = ovsdb_monitor_table_condition_change(m->condition, table,
> +                                                 added, removed);
> +
> +    return error;
> +}
> +
> +static struct jsonrpc_msg *
> +ovsdb_jsonrpc_monitor_cond_update(struct ovsdb_jsonrpc_session *s,
> +                                  struct json *params,
> +                                  const struct json *request_id)
> +{
> +    struct ovsdb_error *error;
> +    struct ovsdb_jsonrpc_monitor *m;
> +    struct json *monitor_cond_update_reqs;
> +    struct shash_node *node;
> +    struct json *json;
> +
> +    if (json_array(params)->n != 3) {
> +        error = ovsdb_syntax_error(params, NULL, "invalid parameters");
> +        goto error;
> +    }
> +
> +    m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]);
> +    if (!m) {
> +        error = ovsdb_syntax_error(request_id, NULL,
> +                                   "unknown monitor session");
> +        goto error;
> +    }
> +
> +    monitor_cond_update_reqs = params->u.array.elems[2];
> +    if (monitor_cond_update_reqs->type != JSON_OBJECT) {
> +        error =
> +            ovsdb_syntax_error(NULL, NULL,
> +                               "monitor-cond-change-requests must 
> be object");
> +        goto error;
> +    }
> +
> +    SHASH_FOR_EACH (node, json_object(monitor_cond_update_reqs)) {
> +        const struct ovsdb_table *table;
> +        const struct json *mr_value;
> +        size_t i;
> +
> +        table = ovsdb_get_table(m->db, node->name);
> +        if (!table) {
> +            error = ovsdb_syntax_error(NULL, NULL,
> +                                       "no table named %s", 
node->name);
> +            goto error;
> +        }
> +        if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
> +            error = ovsdb_syntax_error(NULL, NULL,
> +                                       "no table named %s in 
> monitor session",
> +                                       node->name);
> +            goto error;
> +        }
> +
> +        mr_value = node->data;
> +        if (mr_value->type == JSON_ARRAY) {
> +            const struct json_array *array = &mr_value->u.array;
> +
> +            for (i = 0; i < array->n; i++) {
> +                error = 
ovsdb_jsonrpc_parse_monitor_cond_update_request(
> +                                            m, table, array->elems[i]);
> +                if (error) {
> +                    goto error;
> +                }
> +            }
> +        } else {
> +            error = ovsdb_syntax_error(
> +                       NULL, NULL,
> +                       "table %s no monitor-cond-change JSON array",
> +                       node->name);
> +            goto error;
> +        }
> +    }
> +
> +    ovsdb_monitor_get_all_rows(m->dbmon, m->unflushed);
> +    m->all_rows = true;
> +
> +    /* Change monitor id */
> +    hmap_remove(&s->monitors, &m->node);
> +    json_destroy(m->monitor_id);
> +    m->monitor_id = json_clone(params->u.array.elems[1]);
> +    hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
> +
> +    return jsonrpc_create_reply(json_object_create(), request_id);
> +
> +    error:
> The indenting of "error:" label looks odd.  

Will fix that.

> +
> +    json = ovsdb_error_to_json(error);
> +    ovsdb_error_destroy(error);
> +    return jsonrpc_create_error(json, request_id);
> +}
> +
>  static struct jsonrpc_msg *
>  ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
>                               struct json_array *params,
> @@ -1337,8 +1467,14 @@ static struct json *
>  ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
>                                       bool initial)
>  {
> -    return ovsdb_monitor_get_update(m->dbmon, initial, &m->unflushed,
> -                                    m->condition, m->version);
> +    struct json * json = ovsdb_monitor_get_update(m->dbmon, initial,
> +                                                  m->all_rows,
> +                                                  &m->unflushed,
> +                                                  m->condition,
> +                                                  m->version);
> +
> +    m->all_rows = false;
> +    return json;
>  }
> 
>  static bool
> diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
> index cf87b98..bddf20e 100644
> --- a/ovsdb/monitor.c
> +++ b/ovsdb/monitor.c
> @@ -121,6 +121,11 @@ struct ovsdb_monitor_changes {
>                                      hmap.  */
>  };
> 
> +enum ovsdb_monitor_changes_type {
> +    OVSDB_MONITOR_CHANGES,
> +    OVSDB_MONITOR_ALL
> +};
> +
>  /* A particular table being monitored. */
>  struct ovsdb_monitor_table {
>      const struct ovsdb_table *table;
> @@ -143,6 +148,11 @@ struct ovsdb_monitor_table {
>      /* Temp pointers to conditions for composing update */
>      struct ovsdb_condition *old_condition;
>      struct ovsdb_condition *new_condition;
> +
> +    /* Contains 'ovsdb_monitor_changes' of all rows on table at 
transaction
> +       point in time. indexed by 'transaction'. */
> +    struct hmap all;
> +
>  };
> 
>  typedef struct json *
> @@ -152,12 +162,15 @@ typedef struct json *
> 
>  static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
>  static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
> -    struct ovsdb_monitor_table *mt, uint64_t next_txn);
> +    struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type 
type,
> +    uint64_t next_txn);
>  static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
> -    struct ovsdb_monitor_table *mt, uint64_t unflushed);
> +    struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type 
type,
> +    uint64_t unflushed);
>  static void ovsdb_monitor_changes_destroy(
>                                    struct ovsdb_monitor_changes 
*changes);
>  static void ovsdb_monitor_table_track_changes(struct 
ovsdb_monitor_table *mt,
> +                                  enum ovsdb_monitor_changes_type type,
>                                    uint64_t unflushed);
> 
>  static uint32_t
> @@ -256,8 +269,8 @@ ovsdb_monitor_changes_row_find(const struct 
> ovsdb_monitor_changes *changes,
>   *
>   * If 'row' is NULL, returns NULL. */
>  static struct ovsdb_datum *
> -clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
> -                       const struct ovsdb_row *row)
> +clone_monitor_ovsdb_row_data(const struct ovsdb_monitor_table *mt,
> +                             const struct ovsdb_row *row)
>  {
>      struct ovsdb_datum *data;
>      size_t i;
> @@ -278,6 +291,44 @@ clone_monitor_row_data(const struct 
> ovsdb_monitor_table *mt,
>      return data;
>  }
> 
> +/* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes 
them as
> + * copies of the data in 'fields' drawn from the columns represented by
> + * mt->columns[].  Returns the array.
> + *
> + * If 'row' is NULL, returns NULL. */
> +static struct ovsdb_datum *
> +clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
> +                       const struct ovsdb_datum *fields)
> +{
> +    struct ovsdb_datum *data;
> +    size_t i;
> +
> +    if (!fields) {
> +        return NULL;
> +    }
> +
> +    data = xmalloc(mt->n_columns * sizeof *data);
> +    for (i = 0; i < mt->n_columns; i++) {
> +        const struct ovsdb_column *c = mt->columns[i].column;
> +        const struct ovsdb_datum *src = &fields[c->index];
> +        struct ovsdb_datum *dst = &data[i];
> +        const struct ovsdb_type *type = &c->type;
> +
> +        ovsdb_datum_clone(dst, src, type);
> +    }
> +    return data;
> +}
> +
> +static void
> +clone_monitor_row(const struct ovsdb_monitor_table *mt,
> +                  struct ovsdb_monitor_row *to,
> +                  const struct ovsdb_monitor_row *from)
> +{
> +    to->uuid = from->uuid;
> +    to->old = clone_monitor_row_data(mt, from->old);
> +    to->new = clone_monitor_row_data(mt,from->new);
> +}
> +
>  /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of 
> the data from
>   * in 'row' drawn from the columns represented by mt->columns[]. */
>  static void
> @@ -373,6 +424,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m,
>      mt->dbmon = m;
>      shash_add(&m->tables, table->schema->name, mt);
>      hmap_init(&mt->changes);
> +    hmap_init(&mt->all);
>      mt->columns_index_map =
>          xmalloc(sizeof(unsigned int) * 
shash_count(&table->schema->columns));
>  }
> @@ -400,6 +452,13 @@ ovsdb_monitor_add_column(struct ovsdb_monitor 
*dbmon,
>      c->select = select;
>  }
> 
> +bool
> +ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
> +                           const struct ovsdb_table *table)
> +{
> +    return shash_find_data(&m->tables, table->schema->name);
> +}
> +
>  /* Check for duplicated column names. Return the first
>   * duplicated column's name if found. Otherwise return
>   * NULL.  */
> @@ -428,9 +487,12 @@ ovsdb_monitor_table_check_duplicates(struct 
> ovsdb_monitor *m,
> 
>  static struct ovsdb_monitor_changes *
>  ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
> +                                enum ovsdb_monitor_changes_type type,
>                                  uint64_t next_txn)
>  {
>      struct ovsdb_monitor_changes *changes;
> +    struct hmap *changes_hmap =
> +        type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
> 
>      changes = xzalloc(sizeof *changes);
> 
> @@ -438,19 +500,22 @@ ovsdb_monitor_table_add_changes(struct 
> ovsdb_monitor_table *mt,
>      changes->mt = mt;
>      changes->n_refs = 1;
>      hmap_init(&changes->rows);
> -    hmap_insert(&mt->changes, &changes->hmap_node, 
hash_uint64(next_txn));
> +    hmap_insert(changes_hmap, &changes->hmap_node, 
hash_uint64(next_txn));
> 
>      return changes;
>  };
> 
>  static struct ovsdb_monitor_changes *
>  ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
> +                                 enum ovsdb_monitor_changes_type type,
>                                   uint64_t transaction)
>  {
>      struct ovsdb_monitor_changes *changes;
> +    struct hmap *changes_hmap =
> +        type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
>      size_t hash = hash_uint64(transaction);
> 
> -    HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
> +    HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, changes_hmap) {
>          if (changes->transaction == transaction) {
>              return changes;
>          }
> @@ -462,13 +527,16 @@ ovsdb_monitor_table_find_changes(struct 
> ovsdb_monitor_table *mt,
>  /* Stop currently tracking changes to table 'mt' since 'transaction'. 
*/
>  static void
>  ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
> +                                    enum ovsdb_monitor_changes_type 
type,
>                                      uint64_t transaction)
>  {
> +    struct hmap *changes_hmap =
> +        type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
>      struct ovsdb_monitor_changes *changes =
> -                ovsdb_monitor_table_find_changes(mt, transaction);
> +        ovsdb_monitor_table_find_changes(mt, type, transaction);
>      if (changes) {
>          if (--changes->n_refs == 0) {
> -            hmap_remove(&mt->changes, &changes->hmap_node);
> +            hmap_remove(changes_hmap, &changes->hmap_node);
>              ovsdb_monitor_changes_destroy(changes);
>          }
>      }
> @@ -478,15 +546,16 @@ ovsdb_monitor_table_untrack_changes(struct 
> ovsdb_monitor_table *mt,
>   */
>  static void
>  ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
> +                                  enum ovsdb_monitor_changes_type type,
>                                    uint64_t transaction)
>  {
>      struct ovsdb_monitor_changes *changes;
> 
> -    changes = ovsdb_monitor_table_find_changes(mt, transaction);
> +    changes = ovsdb_monitor_table_find_changes(mt, type, transaction);
>      if (changes) {
>          changes->n_refs++;
>      } else {
> -        ovsdb_monitor_table_add_changes(mt, transaction);
> +        ovsdb_monitor_table_add_changes(mt, type, transaction);
>      }
>  }
> 
> @@ -673,7 +742,7 @@ ovsdb_monitor_row_update_type_condition(const 
> struct ovsdb_monitor_table *mt,
>          ovsdb_monitor_row_update_type(initial, old, new);
> 
>      if (mt->dbmon->condition &&
> -        (!ovsdb_condition_empty(mt->old_condition) &&
> +        (!ovsdb_condition_empty(mt->old_condition) ||
>           !ovsdb_condition_empty(mt->new_condition))) {
>          bool old_cond = !old ? false
>              : ovsdb_condition_evaluate_or_datum(old,
> @@ -895,7 +964,8 @@ ovsdb_monitor_max_columns(struct ovsdb_monitor 
*dbmon)
>   * 'transaction'.  */
>  static struct json*
>  ovsdb_monitor_compose_update(struct ovsdb_monitor *dbmon,
> -                             bool initial, uint64_t transaction,
> +                             bool initial, bool all_rows,
> +                             uint64_t transaction,
>                               compose_row_update_cb_func row_update)
>  {
>      struct shash_node *node;
> @@ -910,7 +980,16 @@ ovsdb_monitor_compose_update(struct ovsdb_monitor 
*dbmon,
>          struct ovsdb_monitor_changes *changes;
>          struct json *table_json = NULL;
> 
> -        changes = ovsdb_monitor_table_find_changes(mt, transaction);
> +        if (!all_rows) {
> +            changes = ovsdb_monitor_table_find_changes(mt,
> +                                                      
 OVSDB_MONITOR_CHANGES,
> +                                                       transaction);
> +        } else {
> +            /* Get changes that includes all rows from all_txn 
> point in time */
> +            changes = ovsdb_monitor_table_find_changes(mt,
> +                                                      
 OVSDB_MONITOR_ALL,
> +                                                       transaction);
> +        }
>          if (!changes) {
>              continue;
>          }
> @@ -946,7 +1025,8 @@ ovsdb_monitor_compose_update(struct ovsdb_monitor 
*dbmon,
> 
>  /* Returns JSON for a <table-updates> object (as described in RFC 7047)
>   * for all the outstanding changes within 'monitor' that starts from
> - * '*unflushed' transaction id.
> + * '*unflushed'.
> + * If all_rows is true all_rows in the db that match conditions will be 
sent.
>   *
>   * The caller should specify 'initial' as true if the returned JSON
> is going to
>   * be used as part of the initial reply to a "monitor" request, 
> false if it is
> @@ -954,7 +1034,8 @@ ovsdb_monitor_compose_update(struct ovsdb_monitor 
*dbmon,
>  struct json *
>  ovsdb_monitor_get_update(
>               struct ovsdb_monitor *dbmon,
> -             bool initial, uint64_t *unflushed,
> +             bool initial, bool all_rows,
> +             uint64_t *unflushed,
>               const struct ovsdb_monitor_session_condition *condition,
>               enum ovsdb_monitor_version version)
>  {
> @@ -963,25 +1044,31 @@ ovsdb_monitor_get_update(
>      struct json *json;
>      uint64_t prev_txn = *unflushed;
>      uint64_t next_txn = dbmon->n_transactions + 1;
> +    /* We use the cache only for empty condition sessions */
> +    bool can_cache = condition && condition->can_cache && !all_rows ?
> +        true : false;
> 
>      ovsdb_monitor_set_condition(dbmon, condition);
> +
>      /* Return a clone of cached json if one exists. Otherwise,
>       * generate a new one and add it to the cache.  */
> -    if (condition && condition->can_cache) {
> +    if (can_cache) {
>          cache_node = ovsdb_monitor_json_cache_search(dbmon, 
> version, prev_txn);
>      }
>      if (cache_node) {
>          json = cache_node->json ? json_clone(cache_node->json) : NULL;
>      } else {
>          if (version == OVSDB_MONITOR_V1) {
> -            json = ovsdb_monitor_compose_update(dbmon, initial, 
prev_txn,
> +            json = ovsdb_monitor_compose_update(dbmon, initial, 
all_rows,
> +                                                prev_txn,
>                                         
 ovsdb_monitor_compose_row_update);
>          } else {
>              ovs_assert(version == OVSDB_MONITOR_V2);
> -            json = ovsdb_monitor_compose_update(dbmon, initial, 
prev_txn,
> +            json = ovsdb_monitor_compose_update(dbmon, initial, 
all_rows,
> +                                                prev_txn,
>                                         
 ovsdb_monitor_compose_row_update2);
>          }
> -        if (condition && condition->can_cache) {
> +        if (can_cache) {
>              ovsdb_monitor_json_cache_insert(dbmon, version, prev_txn, 
json);
>          }
>      }
> @@ -990,8 +1077,16 @@ ovsdb_monitor_get_update(
>      SHASH_FOR_EACH (node, &dbmon->tables) {
>          struct ovsdb_monitor_table *mt = node->data;
> 
> -        ovsdb_monitor_table_untrack_changes(mt, prev_txn);
> -        ovsdb_monitor_table_track_changes(mt, next_txn);
> +        if (!all_rows) {
> +            ovsdb_monitor_table_untrack_changes(mt,
> +                                                OVSDB_MONITOR_CHANGES,
> +                                                prev_txn);
> +        } else {
> +            ovsdb_monitor_table_untrack_changes(mt,
> +                                                OVSDB_MONITOR_ALL,
> +                                                prev_txn);
> +        }
> +        ovsdb_monitor_table_track_changes(mt, 
> OVSDB_MONITOR_CHANGES, next_txn);
> 
>          if (dbmon->condition && ovsdb_condition_cmp(mt->old_condition,
>                                                      mt->new_condition)) 
{
> @@ -1079,8 +1174,8 @@ ovsdb_monitor_changes_update(const struct 
> ovsdb_row *old,
>          change = xzalloc(sizeof *change);
>          hmap_insert(&changes->rows, &change->hmap_node, 
uuid_hash(uuid));
>          change->uuid = *uuid;
> -        change->old = clone_monitor_row_data(mt, old);
> -        change->new = clone_monitor_row_data(mt, new);
> +        change->old = clone_monitor_ovsdb_row_data(mt, old);
> +        change->new = clone_monitor_ovsdb_row_data(mt, new);
>      } else {
>          if (new) {
>              update_monitor_row_data(mt, new, change->new);
> @@ -1097,6 +1192,20 @@ ovsdb_monitor_changes_update(const struct 
> ovsdb_row *old,
>      }
>  }
> 
> +static void
> +ovsdb_monitor_changes_clone_insert_row(const struct ovsdb_monitor_row 
*row,
> +                                       const struct ovsdb_monitor_table 
*mt,
> +                                       struct 
ovsdb_monitor_changes*changes)
> +{
> +    struct ovsdb_monitor_row *change;
> +
> +    ovs_assert(ovsdb_monitor_changes_row_find(changes, &row->uuid) == 
NULL);
> +
> +    change = xzalloc(sizeof *change);
> +    hmap_insert(&changes->rows, &change->hmap_node, 
uuid_hash(&row->uuid));
> +    clone_monitor_row(mt, change, row);
> +}
> +
>  static bool
>  ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt,
>                                const unsigned long int *changed)
> @@ -1138,6 +1247,29 @@ ovsdb_monitor_changes_classify(enum 
> ovsdb_monitor_selection type,
>                  :  OVSDB_CHANGES_REQUIRE_INTERNAL_UPDATE;
>  }
> 
> +static void
> +ovsdb_monitor_record_change(struct ovsdb_monitor_table *mt,
> +                            const struct ovsdb_row *old,
> +                            const struct ovsdb_row *new,
> +                            const unsigned long int *changed,
> +                            struct ovsdb_monitor_changes *changes,
> +                            struct ovsdb_monitor_aux *aux)
> +{
> +    enum ovsdb_monitor_changes_efficacy efficacy;
> +    enum ovsdb_monitor_selection type;
> +
> +    type = ovsdb_monitor_row_update_type(false, old, new);
> +
> +    efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
> +    if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
> +        ovsdb_monitor_changes_update(old, new, mt, changes);
> +    }
> +
> +    if (aux->efficacy < efficacy) {
> +        aux->efficacy = efficacy;
> +    }
> +}
> +
>  static bool
>  ovsdb_monitor_change_cb(const struct ovsdb_row *old,
>                          const struct ovsdb_row *new,
> @@ -1161,19 +1293,10 @@ ovsdb_monitor_change_cb(const struct ovsdb_row 
*old,
>      mt = aux->mt;
> 
>      HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
> -        enum ovsdb_monitor_changes_efficacy efficacy;
> -        enum ovsdb_monitor_selection type;
> -
> -        type = ovsdb_monitor_row_update_type(false, old, new);
> -
> -        efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
> -        if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
> -            ovsdb_monitor_changes_update(old, new, mt, changes);
> -        }
> -
> -        if (aux->efficacy < efficacy) {
> -            aux->efficacy = efficacy;
> -        }
> +        ovsdb_monitor_record_change(mt, old, new, changed, changes, 
aux);
> +    }
> +    HMAP_FOR_EACH(changes, hmap_node, &mt->all) {
> +        ovsdb_monitor_record_change(mt, old, new, changed, changes, 
aux);
>      }
> 
>      return true;
> @@ -1182,10 +1305,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row 
*old,
>  void
>  ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
>  {
> -    struct ovsdb_monitor_aux aux;
>      struct shash_node *node;
> 
> -    ovsdb_monitor_init_aux(&aux, dbmon);
>      SHASH_FOR_EACH (node, &dbmon->tables) {
>          struct ovsdb_monitor_table *mt = node->data;
> 
> @@ -1193,9 +1314,14 @@ ovsdb_monitor_get_initial(const struct 
> ovsdb_monitor *dbmon)
>              struct ovsdb_row *row;
>              struct ovsdb_monitor_changes *changes;
> 
> -            changes = ovsdb_monitor_table_find_changes(mt, 0);
> +            changes = ovsdb_monitor_table_find_changes(mt,
> +                                                      
 OVSDB_MONITOR_CHANGES,
> +                                                       0);
>              if (!changes) {
> -                changes = ovsdb_monitor_table_add_changes(mt, 0);
> +                changes =
> +                    ovsdb_monitor_table_add_changes(mt,
> +                                                    
OVSDB_MONITOR_CHANGES,
> +                                                    0);
>                  HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
>                      ovsdb_monitor_changes_update(NULL, row, mt, 
changes);
>                  }
> @@ -1206,6 +1332,53 @@ ovsdb_monitor_get_initial(const struct 
> ovsdb_monitor *dbmon)
>      }
>  }
> 
> +/* Record all rows in DB and mark this changes at unflushed tranaction 
id */
> +void
> +ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon,
> +                           uint64_t unflushed)
> +{
> +    struct shash_node *node;
> +
> +    SHASH_FOR_EACH (node, &dbmon->tables) {
> +        struct ovsdb_monitor_table *mt = node->data;
> +        struct ovsdb_row *new;
> +        struct ovsdb_monitor_changes *changes, *all_changes;
> +
> +        all_changes = ovsdb_monitor_table_find_changes(mt,
> +                                                      
 OVSDB_MONITOR_ALL,
> +                                                       unflushed);
> +        if (!all_changes) {
> +            all_changes = ovsdb_monitor_table_add_changes(mt,
> +                                                          
OVSDB_MONITOR_ALL,
> +                                                          unflushed);
> +            changes = ovsdb_monitor_table_find_changes(mt,
> +                                                      
 OVSDB_MONITOR_CHANGES,
> +                                                       unflushed);
> +            HMAP_FOR_EACH (new, hmap_node, &mt->table->rows) {
> +                struct ovsdb_monitor_row *row = NULL;
> +                if (changes) {
> +                    /* Check if we have a change record for this row */
> +                    row = ovsdb_monitor_changes_row_find(
> +                                                   changes,
> +                                                  
 ovsdb_row_get_uuid(new));
> +                }
> +                if (row) {
> +                    ovsdb_monitor_changes_clone_insert_row(row, mt,
> +                                                          
 all_changes);
> +                } else {
> +                    ovsdb_monitor_changes_update(new, new, mt, 
all_changes);
> +                }
> +            }
> +        } else {
> +            all_changes->n_refs++;
> +        }
> +
> +        ovsdb_monitor_table_untrack_changes(mt,
> +                                            OVSDB_MONITOR_CHANGES,
> +                                            unflushed);
> +    }
> +}
> +
>  void
>  ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
>                     struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
> @@ -1364,7 +1537,12 @@ ovsdb_monitor_destroy(struct ovsdb_monitor 
*dbmon)
>              hmap_remove(&mt->changes, &changes->hmap_node);
>              ovsdb_monitor_changes_destroy(changes);
>          }
> +        HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->all) {
> +            hmap_remove(&mt->changes, &changes->hmap_node);
> +            ovsdb_monitor_changes_destroy(changes);
> +        }
>          hmap_destroy(&mt->changes);
> +        hmap_destroy(&mt->all);
>          free(mt->columns);
>          free(mt->columns_index_map);
>          free(mt);
> diff --git a/ovsdb/monitor.h b/ovsdb/monitor.h
> index 0f6c360..55d07a3 100644
> --- a/ovsdb/monitor.h
> +++ b/ovsdb/monitor.h
> @@ -57,6 +57,10 @@ void ovsdb_monitor_remove_jsonrpc_monitor(struct 
> ovsdb_monitor *dbmon,
>  void ovsdb_monitor_add_table(struct ovsdb_monitor *m,
>                               const struct ovsdb_table *table);
> 
> +bool
> +ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
> +                           const struct ovsdb_table *table);
> +
>  void ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
>                                const struct ovsdb_table *table,
>                                const struct ovsdb_column *column,
> @@ -67,12 +71,12 @@ const char * OVS_WARN_UNUSED_RESULT
>  ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *,
>                            const struct ovsdb_table *);
> 
> -struct json *ovsdb_monitor_get_update(
> -               struct ovsdb_monitor *dbmon,
> -               bool initial,
> -               uint64_t *unflushed_transaction,
> -               const struct ovsdb_monitor_session_condition *condition,
> -               enum ovsdb_monitor_version version);
> +struct json *ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon,
> +                                      bool initial,
> +                                      bool all_rows,
> +                                      uint64_t *unflushed_transaction,
> +                                      const struct 
> ovsdb_monitor_session_condition *condition,
> +                                      enum ovsdb_monitor_version 
version);
> 
>  void ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
>                                      const struct ovsdb_table *table,
> @@ -102,6 +106,9 @@ ovsdb_monitor_table_condition_add(
>                            const struct ovsdb_table *table,
>                            const struct json *json_cnd);
> 
> +void ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon,
> +                                uint64_t unflushed);
> +
>  void ovsdb_monitor_session_condition_bind(
>                             const struct ovsdb_monitor_session_condition 
*,
>                             const struct ovsdb_monitor *);
> --
> 2.1.4
> 
> 
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev




More information about the dev mailing list