[ovs-dev] [PATCH v2] ovsdb-idl: Support for readonly columns that are fetched on-demand
Ben Pfaff
blp at ovn.org
Mon Nov 23 16:39:25 UTC 2015
I applied this to master. Thank you!
On Tue, Nov 10, 2015 at 05:21:04PM +0000, Ansari, Shad wrote:
> (resubmitting this patch for comments/review)
>
> There is currently no mechanism in IDL to fetch specific column values
> on-demand without having to register them for monitoring. In the case
> where the column represents a frequently changing entity (e.g. a counter),
> and the reads are relatively infrequent (e.g. a CLI client), there is a
> significant overhead in replication.
>
> This patch adds support in the Python IDL to register a subset of the
> columns of a table as "readonly". Readonly columns are not replicated.
> Users may "fetch" the readonly columns of a row on-demand. Once fetched,
> the columns are not updated until the next fetch by the user. Writes by
> the user to readonly columns do not change the value (either locally or
> on the server).
>
> The two main user-visible changes in this patch are:
> - The SchemaHelper.register_columns() method now takes an optional
> argument to specify the subset of readonly column(s)
> - A new Row.fetch(columns) method to fetch values of readonly column(s)
>
> Usage:
> ------
>
> # Schema file includes all columns, including readonly
> schema_helper = ovs.db.idl.SchemaHelper(schema_file)
>
> # Register interest in columns with 'r' and 's' as readonly
> schema_helper.register_columns("simple", [i, r, s], [r, s])
>
> # Create Idl and jsonrpc, and wait for update, as usual
> ...
>
> # Fetch value of column 'r' for a specific row
> row.fetch('r')
> txn.commit_block()
>
> print row.r
> print getattr(row, 'r')
>
> Signed-off-by: Shad Ansari <shad.ansari at hp.com>
> ---
> python/ovs/db/idl.py | 86 ++++++++++++++++++++++++++++++++++++--
> tests/ovsdb-idl.at | 47 +++++++++++++++++++++
> tests/test-ovsdb.py | 114 +++++++++++++++++++++++++++++++++------------------
> 3 files changed, 204 insertions(+), 43 deletions(-)
>
> diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> index f074dbf..c8990c7 100644
> --- a/python/ovs/db/idl.py
> +++ b/python/ovs/db/idl.py
> @@ -107,6 +107,7 @@ class Idl(object):
> schema = schema.get_idl_schema()
>
> self.tables = schema.tables
> + self.readonly = schema.readonly
> self._db = schema
> self._session = ovs.jsonrpc.Session.open(remote)
> self._monitor_request_id = None
> @@ -338,7 +339,13 @@ class Idl(object):
> def __send_monitor_request(self):
> monitor_requests = {}
> for table in self.tables.itervalues():
> - monitor_requests[table.name] = {"columns": table.columns.keys()}
> + columns = []
> + for column in table.columns.keys():
> + if ((table.name not in self.readonly) or
> + (table.name in self.readonly) and
> + (column not in self.readonly[table.name])):
> + columns.append(column)
> + monitor_requests[table.name] = {"columns": columns}
> msg = ovs.jsonrpc.Message.create_request(
> "monitor", [self._db.name, None, monitor_requests])
> self._monitor_request_id = msg.id
> @@ -571,7 +578,11 @@ class Row(object):
> if self._data is None:
> raise AttributeError("%s instance has no attribute '%s'" %
> (self.__class__.__name__, column_name))
> - datum = self._data[column_name]
> + if column_name in self._data:
> + datum = self._data[column_name]
> + else:
> + raise AttributeError("%s instance has no attribute '%s'" %
> + (self.__class__.__name__, column_name))
>
> return datum.to_python(_uuid_to_row)
>
> @@ -579,6 +590,11 @@ class Row(object):
> assert self._changes is not None
> assert self._idl.txn
>
> + if ((self._table.name in self._idl.readonly) and
> + (column_name in self._idl.readonly[self._table.name])):
> + vlog.warn("attempting to write to readonly column %s" % column_name)
> + return
> +
> column = self._table.columns[column_name]
> try:
> datum = ovs.db.data.Datum.from_python(column.type, value,
> @@ -655,6 +671,9 @@ class Row(object):
> self.__dict__["_changes"] = None
> del self._table.rows[self.uuid]
>
> + def fetch(self, column_name):
> + self._idl.txn._fetch(self, column_name)
> +
> def increment(self, column_name):
> """Causes the transaction, when committed, to increment the value of
> 'column_name' within this row by 1. 'column_name' must have an integer
> @@ -777,10 +796,12 @@ class Transaction(object):
> self._inc_row = None
> self._inc_column = None
>
> + self._fetch_requests = []
> +
> self._inserted_rows = {} # Map from UUID to _InsertedRow
>
> def add_comment(self, comment):
> - """Appens 'comment' to the comments that will be passed to the OVSDB
> + """Appends 'comment' to the comments that will be passed to the OVSDB
> server when this transaction is committed. (The comment will be
> committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
> relatively human-readable form.)"""
> @@ -947,6 +968,16 @@ class Transaction(object):
> if row._data is None or row_json:
> operations.append(op)
>
> + if self._fetch_requests:
> + for fetch in self._fetch_requests:
> + fetch["index"] = len(operations) - 1
> + operations.append({"op": "select",
> + "table": fetch["row"]._table.name,
> + "where": self._substitute_uuids(
> + _where_uuid_equals(fetch["row"].uuid)),
> + "columns": [fetch["column_name"]]})
> + any_updates = True
> +
> # Add increment.
> if self._inc_row and any_updates:
> self._inc_index = len(operations) - 1
> @@ -1057,6 +1088,9 @@ class Transaction(object):
> self._inc_row = row
> self._inc_column = column
>
> + def _fetch(self, row, column_name):
> + self._fetch_requests.append({"row":row, "column_name":column_name})
> +
> def _write(self, row, column, datum):
> assert row._changes is not None
>
> @@ -1139,6 +1173,11 @@ class Transaction(object):
> if not soft_errors and not hard_errors and not lock_errors:
> if self._inc_row and not self.__process_inc_reply(ops):
> hard_errors = True
> + if self._fetch_requests:
> + if self.__process_fetch_reply(ops):
> + self.idl.change_seqno += 1
> + else:
> + hard_errors = True
>
> for insert in self._inserted_rows.itervalues():
> if not self.__process_insert_reply(insert, ops):
> @@ -1166,6 +1205,38 @@ class Transaction(object):
> else:
> return True
>
> + def __process_fetch_reply(self, ops):
> + update = False
> + for fetch_request in self._fetch_requests:
> + row = fetch_request["row"]
> + column_name = fetch_request["column_name"]
> + index = fetch_request["index"]
> + table = row._table
> +
> + select = ops[index]
> + fetched_rows = select.get("rows")
> + if not Transaction.__check_json_type(fetched_rows, (list, tuple),
> + '"select" reply "rows"'):
> + return False
> + if len(fetched_rows) != 1:
> + # XXX rate-limit
> + vlog.warn('"select" reply "rows" has %d elements '
> + 'instead of 1' % len(rows))
> + continue
> + fetched_row = fetched_rows[0]
> + if not Transaction.__check_json_type(fetched_row, (dict,),
> + '"select" reply row'):
> + continue
> +
> + column = table.columns.get(column_name)
> + datum_json = fetched_row.get(column_name)
> + datum = ovs.db.data.Datum.from_json(column.type, datum_json)
> +
> + row._data[column_name] = datum
> + update = True
> +
> + return update
> +
> def __process_inc_reply(self, ops):
> if self._inc_index + 2 > len(ops):
> # XXX rate-limit
> @@ -1261,16 +1332,21 @@ class SchemaHelper(object):
>
> self.schema_json = schema_json
> self._tables = {}
> + self._readonly = {}
> self._all = False
>
> - def register_columns(self, table, columns):
> + def register_columns(self, table, columns, readonly=[]):
> """Registers interest in the given 'columns' of 'table'. Future calls
> to get_idl_schema() will include 'table':column for each column in
> 'columns'. This function automatically avoids adding duplicate entries
> to the schema.
> + A subset of 'columns' can be specified as 'readonly'. The readonly
> + columns are not replicated but can be fetched on-demand by the user
> + with Row.fetch().
>
> 'table' must be a string.
> 'columns' must be a list of strings.
> + 'readonly' must be a list of strings.
> """
>
> assert type(table) is str
> @@ -1278,6 +1354,7 @@ class SchemaHelper(object):
>
> columns = set(columns) | self._tables.get(table, set())
> self._tables[table] = columns
> + self._readonly[table] = readonly
>
> def register_table(self, table):
> """Registers interest in the given all columns of 'table'. Future calls
> @@ -1307,6 +1384,7 @@ class SchemaHelper(object):
> self._keep_table_columns(schema, table, columns))
>
> schema.tables = schema_tables
> + schema.readonly = self._readonly
> return schema
>
> def _keep_table_columns(self, schema, table_name, columns):
> diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
> index d3d2aeb..c7b2582 100644
> --- a/tests/ovsdb-idl.at
> +++ b/tests/ovsdb-idl.at
> @@ -598,3 +598,50 @@ AT_CHECK([grep '"monitor"' stderr | grep -c '"ua"'], [0], [1
> ])
> OVSDB_SERVER_SHUTDOWN
> AT_CLEANUP
> +
> +m4_define([OVSDB_CHECK_IDL_FETCH_COLUMNS_PY],
> + [AT_SETUP([$1 - Python fetch])
> + AT_SKIP_IF([test $HAVE_PYTHON = no])
> + AT_KEYWORDS([ovsdb server idl positive Python increment fetch $6])
> + AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
> + [0], [stdout], [ignore])
> + AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> + m4_if([$2], [], [],
> + [AT_CHECK([ovsdb-client transact unix:socket $2], [0], [ignore], [ignore], [kill `cat pid`])])
> + AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema unix:socket [$3] $4],
> + [0], [stdout], [ignore], [kill `cat pid`])
> + AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$7],,, [[| $7]]),
> + [0], [$5], [], [kill `cat pid`])
> + OVSDB_SERVER_SHUTDOWN
> + AT_CLEANUP])
> +
> +m4_define([OVSDB_CHECK_IDL_FETCH_COLUMNS],
> + [OVSDB_CHECK_IDL_FETCH_COLUMNS_PY($@)])
> +
> +OVSDB_CHECK_IDL_FETCH_COLUMNS([simple idl, initially populated],
> + [['["idltest",
> + {"op": "insert",
> + "table": "simple",
> + "row": {"i": 1,
> + "r": 2.0,
> + "b": true,
> + "s": "mystring",
> + "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"],
> + "ia": ["set", [1, 2, 3]],
> + "ra": ["set", [-0.5]],
> + "ba": ["set", [true]],
> + "sa": ["set", ["abc", "def"]],
> + "ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"],
> + ["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
> + {"op": "insert",
> + "table": "simple",
> + "row": {}}]']],
> + [?simple:i,r!],
> + ['fetch 0 r'],
> + [[000: i=0 uuid=<0>
> +000: i=1 uuid=<1>
> +001: commit, status=success
> +002: i=0 r=0 uuid=<0>
> +002: i=1 uuid=<1>
> +003: done
> +]])
> diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
> index ab951f9..a6897f3 100644
> --- a/tests/test-ovsdb.py
> +++ b/tests/test-ovsdb.py
> @@ -146,44 +146,53 @@ def do_parse_schema(schema_string):
>
> def print_idl(idl, step):
> - simple = idl.tables["simple"].rows
> - l1 = idl.tables["link1"].rows
> - l2 = idl.tables["link2"].rows
> -
> n = 0
> - for row in simple.itervalues():
> - s = ("%03d: i=%s r=%s b=%s s=%s u=%s "
> - "ia=%s ra=%s ba=%s sa=%s ua=%s uuid=%s"
> - % (step, row.i, row.r, row.b, row.s, row.u,
> - row.ia, row.ra, row.ba, row.sa, row.ua, row.uuid))
> - s = re.sub('""|,|u?\'', "", s)
> - s = re.sub('UUID\(([^)]+)\)', r'\1', s)
> - s = re.sub('False', 'false', s)
> - s = re.sub('True', 'true', s)
> - s = re.sub(r'(ba)=([^[][^ ]*) ', r'\1=[\2] ', s)
> - print(s)
> - n += 1
> -
> - for row in l1.itervalues():
> - s = ["%03d: i=%s k=" % (step, row.i)]
> - if row.k:
> - s.append(str(row.k.i))
> - s.append(" ka=[")
> - s.append(' '.join(sorted(str(ka.i) for ka in row.ka)))
> - s.append("] l2=")
> - if row.l2:
> - s.append(str(row.l2[0].i))
> - s.append(" uuid=%s" % row.uuid)
> - print(''.join(s))
> - n += 1
> -
> - for row in l2.itervalues():
> - s = ["%03d: i=%s l1=" % (step, row.i)]
> - if row.l1:
> - s.append(str(row.l1[0].i))
> - s.append(" uuid=%s" % row.uuid)
> - print(''.join(s))
> - n += 1
> + if "simple" in idl.tables:
> + simple_columns = ["i", "r", "b", "s", "u", "ia",
> + "ra", "ba", "sa", "ua", "uuid"]
> + simple = idl.tables["simple"].rows
> + for row in simple.itervalues():
> + s = "%03d:" % step
> + for column in simple_columns:
> + if hasattr(row, column) and not (type(getattr(row, column))
> + is ovs.db.data.Atom):
> + s += " %s=%s" % (column, getattr(row, column))
> + s = re.sub('""|,|u?\'', "", s)
> + s = re.sub('UUID\(([^)]+)\)', r'\1', s)
> + s = re.sub('False', 'false', s)
> + s = re.sub('True', 'true', s)
> + s = re.sub(r'(ba)=([^[][^ ]*) ', r'\1=[\2] ', s)
> + print(s)
> + n += 1
> +
> + if "link1" in idl.tables:
> + l1 = idl.tables["link1"].rows
> + for row in l1.itervalues():
> + s = ["%03d: i=%s k=" % (step, row.i)]
> + if hasattr(row, "k") and row.k:
> + s.append(str(row.k.i))
> + if hasattr(row, "ka"):
> + s.append(" ka=[")
> + s.append(' '.join(sorted(str(ka.i) for ka in row.ka)))
> + s.append("] l2=")
> + if hasattr(row, "l2") and row.l2:
> + s.append(str(row.l2[0].i))
> + if hasattr(row, "uuid"):
> + s.append(" uuid=%s" % row.uuid)
> + print(''.join(s))
> + n += 1
> +
> + if "link2" in idl.tables:
> + l2 = idl.tables["link2"].rows
> + for row in l2.itervalues():
> + s = ["%03d:" % step]
> + s.append(" i=%s l1=" % row.i)
> + if hasattr(row, "l1") and row.l1:
> + s.append(str(row.l1[0].i))
> + if hasattr(row, "uuid"):
> + s.append(" uuid=%s" % row.uuid)
> + print(''.join(s))
> + n += 1
>
> if not n:
> print("%03d: empty" % step)
> @@ -228,6 +237,7 @@ def idltest_find_simple(idl, i):
> def idl_set(idl, commands, step):
> txn = ovs.db.idl.Transaction(idl)
> increment = False
> + fetch_cmds = []
> events = []
> for command in commands.split(','):
> words = command.split()
> @@ -307,6 +317,20 @@ def idl_set(idl, commands, step):
> sys.stderr.write('"verify" command asks for unknown column '
> '"%s"\n' % args[1])
> sys.exit(1)
> + elif name == "fetch":
> + if len(args) != 2:
> + sys.stderr.write('"fetch" command requires 2 argument\n')
> + sys.exit(1)
> +
> + row = idltest_find_simple(idl, int(args[0]))
> + if not row:
> + sys.stderr.write('"fetch" command asks for nonexistent i=%d\n'
> + % int(args[0]))
> + sys.exit(1)
> +
> + column = args[1]
> + row.fetch(column)
> + fetch_cmds.append([row, column])
> elif name == "increment":
> if len(args) != 1:
> sys.stderr.write('"increment" command requires 1 argument\n')
> @@ -366,10 +390,16 @@ def do_idl(schema_file, remote, *commands):
> schema_helper = ovs.db.idl.SchemaHelper(schema_file)
> if commands and commands[0].startswith("?"):
> monitor = {}
> + readonly = {}
> for x in commands[0][1:].split("?"):
> + readonly = []
> table, columns = x.split(":")
> - monitor[table] = columns.split(",")
> - schema_helper.register_columns(table, monitor[table])
> + columns = columns.split(",")
> + for index, column in enumerate(columns):
> + if column[-1] == '!':
> + columns[index] = columns[index][:-1]
> + readonly.append(columns[index])
> + schema_helper.register_columns(table, columns, readonly)
> commands = commands[1:]
> else:
> schema_helper.register_all()
> @@ -499,6 +529,12 @@ idl SCHEMA SERVER [?T1:C1,C2...[?T2:C1,C2,...]...] [TRANSACTION...]
> e.g.:
> ?simple:b?link1:i,k - Monitor column "b" in table "simple",
> and column "i", "k" in table "link1"
> + Readonly columns: Suffixing a "!" after a column indicates that the
> + column is to be registered "readonly".
> + e.g.:
> + ?simple:i,b! - Register interest in column "i" (monitoring) and
> + column "b" (readonly).
> +
>
> The following options are also available:
> -t, --timeout=SECS give up after SECS seconds
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
More information about the dev
mailing list