[ovs-dev] [PATCH v2] ovsdb-idl: Support for readonly columns that are fetched on-demand

Ben Pfaff blp at ovn.org
Mon Nov 23 16:39:25 UTC 2015


I applied this to master.  Thank you!

On Tue, Nov 10, 2015 at 05:21:04PM +0000, Ansari, Shad wrote:
> (resubmitting this patch for comments/review)
> 
> There is currently no mechanism in IDL to fetch specific column values
> on-demand without having to register them for monitoring. In the case
> where a column represents a frequently changing entity (e.g. a counter),
> and the reads are relatively infrequent (e.g. a CLI client), there is a
> significant overhead in replication.
> 
> This patch adds support in the Python IDL to register a subset of the
> columns of a table as "readonly". Readonly columns are not replicated.
> Users may "fetch" the readonly columns of a row on-demand. Once fetched,
> the columns are not updated until the next fetch by the user. Writes by
> the user to readonly columns do not change the value (either locally or
> on the server).
> 
> The two main user visible changes in this patch are:
>   - The SchemaHelper.register_columns() method now takes an optional
>     argument to specify the subset of readonly column(s)
>   - A new Row.fetch(columns) method to fetch values of readonly column(s)
> 
> Usage:
> ------
> 
>     # Schema file includes all columns, including readonly
>     schema_helper = ovs.db.idl.SchemaHelper(schema_file)
> 
>     # Register interest in columns with 'r' and 's' as readonly
>     schema_helper.register_columns("simple", [i, r, s], [r, s])
> 
>     # Create Idl and jsonrpc, and wait for update, as usual
>     ...
> 
>     # Fetch value of column 'r' for a specific row
>     row.fetch('r')
>     txn.commit_block()
> 
>     print row.r
>     print getattr(row, 'r')
> 
>     Signed-off-by: Shad Ansari <shad.ansari at hp.com>
> ---
> python/ovs/db/idl.py |  86 ++++++++++++++++++++++++++++++++++++--
> tests/ovsdb-idl.at   |  47 +++++++++++++++++++++
> tests/test-ovsdb.py  | 114 +++++++++++++++++++++++++++++++++------------------
> 3 files changed, 204 insertions(+), 43 deletions(-)
> 
> diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> index f074dbf..c8990c7 100644
> --- a/python/ovs/db/idl.py
> +++ b/python/ovs/db/idl.py
> @@ -107,6 +107,7 @@ class Idl(object):
>          schema = schema.get_idl_schema()
> 
>          self.tables = schema.tables
> +        self.readonly = schema.readonly
>          self._db = schema
>          self._session = ovs.jsonrpc.Session.open(remote)
>          self._monitor_request_id = None
> @@ -338,7 +339,13 @@ class Idl(object):
>      def __send_monitor_request(self):
>          monitor_requests = {}
>          for table in self.tables.itervalues():
> -            monitor_requests[table.name] = {"columns": table.columns.keys()}
> +            columns = []
> +            for column in table.columns.keys():
> +                if ((table.name not in self.readonly) or
> +                    (table.name in self.readonly) and
> +                    (column not in self.readonly[table.name])):
> +                    columns.append(column)
> +            monitor_requests[table.name] = {"columns": columns}
>          msg = ovs.jsonrpc.Message.create_request(
>              "monitor", [self._db.name, None, monitor_requests])
>          self._monitor_request_id = msg.id
> @@ -571,7 +578,11 @@ class Row(object):
>              if self._data is None:
>                  raise AttributeError("%s instance has no attribute '%s'" %
>                                       (self.__class__.__name__, column_name))
> -            datum = self._data[column_name]
> +            if column_name in self._data:
> +                datum = self._data[column_name]
> +            else:
> +                raise AttributeError("%s instance has no attribute '%s'" %
> +                                     (self.__class__.__name__, column_name))
> 
>          return datum.to_python(_uuid_to_row)
> 
> @@ -579,6 +590,11 @@ class Row(object):
>          assert self._changes is not None
>          assert self._idl.txn
> 
> +        if ((self._table.name in self._idl.readonly) and
> +            (column_name in self._idl.readonly[self._table.name])):
> +            vlog.warn("attempting to write to readonly column %s" % column_name)
> +            return
> +
>          column = self._table.columns[column_name]
>          try:
>              datum = ovs.db.data.Datum.from_python(column.type, value,
> @@ -655,6 +671,9 @@ class Row(object):
>          self.__dict__["_changes"] = None
>          del self._table.rows[self.uuid]
> 
> +    def fetch(self, column_name):
> +        self._idl.txn._fetch(self, column_name)
> +
>      def increment(self, column_name):
>          """Causes the transaction, when committed, to increment the value of
>          'column_name' within this row by 1.  'column_name' must have an integer
> @@ -777,10 +796,12 @@ class Transaction(object):
>          self._inc_row = None
>          self._inc_column = None
> 
> +        self._fetch_requests = []
> +
>          self._inserted_rows = {}  # Map from UUID to _InsertedRow
> 
>      def add_comment(self, comment):
> -        """Appens 'comment' to the comments that will be passed to the OVSDB
> +        """Appends 'comment' to the comments that will be passed to the OVSDB
>          server when this transaction is committed.  (The comment will be
>          committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
>          relatively human-readable form.)"""
> @@ -947,6 +968,16 @@ class Transaction(object):
>                  if row._data is None or row_json:
>                      operations.append(op)
> 
> +        if self._fetch_requests:
> +            for fetch in self._fetch_requests:
> +                fetch["index"] = len(operations) - 1
> +                operations.append({"op": "select",
> +                                   "table": fetch["row"]._table.name,
> +                                   "where": self._substitute_uuids(
> +                                       _where_uuid_equals(fetch["row"].uuid)),
> +                                   "columns": [fetch["column_name"]]})
> +            any_updates = True
> +
>          # Add increment.
>          if self._inc_row and any_updates:
>              self._inc_index = len(operations) - 1
> @@ -1057,6 +1088,9 @@ class Transaction(object):
>          self._inc_row = row
>          self._inc_column = column
> 
> +    def _fetch(self, row, column_name):
> +        self._fetch_requests.append({"row":row, "column_name":column_name})
> +
>      def _write(self, row, column, datum):
>          assert row._changes is not None
> 
> @@ -1139,6 +1173,11 @@ class Transaction(object):
>              if not soft_errors and not hard_errors and not lock_errors:
>                  if self._inc_row and not self.__process_inc_reply(ops):
>                      hard_errors = True
> +                if self._fetch_requests:
> +                    if self.__process_fetch_reply(ops):
> +                        self.idl.change_seqno += 1
> +                    else:
> +                        hard_errors = True
> 
>                  for insert in self._inserted_rows.itervalues():
>                      if not self.__process_insert_reply(insert, ops):
> @@ -1166,6 +1205,38 @@ class Transaction(object):
>          else:
>              return True
> 
> +    def __process_fetch_reply(self, ops):
> +        update = False
> +        for fetch_request in self._fetch_requests:
> +            row = fetch_request["row"]
> +            column_name = fetch_request["column_name"]
> +            index = fetch_request["index"]
> +            table = row._table
> +
> +            select = ops[index]
> +            fetched_rows = select.get("rows")
> +            if not Transaction.__check_json_type(fetched_rows, (list, tuple),
> +                                                 '"select" reply "rows"'):
> +                return False
> +            if len(fetched_rows) != 1:
> +                # XXX rate-limit
> +                vlog.warn('"select" reply "rows" has %d elements '
> +                          'instead of 1' % len(rows))
> +                continue
> +            fetched_row = fetched_rows[0]
> +            if not Transaction.__check_json_type(fetched_row, (dict,),
> +                                                 '"select" reply row'):
> +                continue
> +
> +            column = table.columns.get(column_name)
> +            datum_json = fetched_row.get(column_name)
> +            datum = ovs.db.data.Datum.from_json(column.type, datum_json)
> +
> +            row._data[column_name] = datum
> +            update = True
> +
> +        return update
> +
>      def __process_inc_reply(self, ops):
>          if self._inc_index + 2 > len(ops):
>              # XXX rate-limit
> @@ -1261,16 +1332,21 @@ class SchemaHelper(object):
> 
>          self.schema_json = schema_json
>          self._tables = {}
> +        self._readonly = {}
>          self._all = False
> 
> -    def register_columns(self, table, columns):
> +    def register_columns(self, table, columns, readonly=[]):
>          """Registers interest in the given 'columns' of 'table'.  Future calls
>          to get_idl_schema() will include 'table':column for each column in
>          'columns'. This function automatically avoids adding duplicate entries
>          to the schema.
> +        A subset of 'columns' can be specified as 'readonly'. The readonly
> +        columns are not replicated but can be fetched on-demand by the user
> +        with Row.fetch().
> 
>          'table' must be a string.
>          'columns' must be a list of strings.
> +        'readonly' must be a list of strings.
>          """
> 
>          assert type(table) is str
> @@ -1278,6 +1354,7 @@ class SchemaHelper(object):
> 
>          columns = set(columns) | self._tables.get(table, set())
>          self._tables[table] = columns
> +        self._readonly[table] = readonly
> 
>      def register_table(self, table):
>          """Registers interest in the given all columns of 'table'. Future calls
> @@ -1307,6 +1384,7 @@ class SchemaHelper(object):
>                      self._keep_table_columns(schema, table, columns))
> 
>              schema.tables = schema_tables
> +        schema.readonly = self._readonly
>          return schema
> 
>      def _keep_table_columns(self, schema, table_name, columns):
> diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
> index d3d2aeb..c7b2582 100644
> --- a/tests/ovsdb-idl.at
> +++ b/tests/ovsdb-idl.at
> @@ -598,3 +598,50 @@ AT_CHECK([grep '"monitor"' stderr | grep -c '"ua"'], [0], [1
> ])
> OVSDB_SERVER_SHUTDOWN
> AT_CLEANUP
> +
> +m4_define([OVSDB_CHECK_IDL_FETCH_COLUMNS_PY],
> +  [AT_SETUP([$1 - Python fetch])
> +   AT_SKIP_IF([test $HAVE_PYTHON = no])
> +   AT_KEYWORDS([ovsdb server idl positive Python increment fetch $6])
> +   AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
> +                  [0], [stdout], [ignore])
> +   AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> +   m4_if([$2], [], [],
> +     [AT_CHECK([ovsdb-client transact unix:socket $2], [0], [ignore], [ignore], [kill `cat pid`])])
> +   AT_CHECK([$PYTHON $srcdir/test-ovsdb.py  -t10 idl $srcdir/idltest.ovsschema unix:socket [$3] $4],
> +            [0], [stdout], [ignore], [kill `cat pid`])
> +   AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$7],,, [[| $7]]),
> +            [0], [$5], [], [kill `cat pid`])
> +   OVSDB_SERVER_SHUTDOWN
> +   AT_CLEANUP])
> +
> +m4_define([OVSDB_CHECK_IDL_FETCH_COLUMNS],
> +   [OVSDB_CHECK_IDL_FETCH_COLUMNS_PY($@)])
> +
> +OVSDB_CHECK_IDL_FETCH_COLUMNS([simple idl, initially populated],
> +  [['["idltest",
> +      {"op": "insert",
> +       "table": "simple",
> +       "row": {"i": 1,
> +               "r": 2.0,
> +               "b": true,
> +               "s": "mystring",
> +               "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"],
> +               "ia": ["set", [1, 2, 3]],
> +               "ra": ["set", [-0.5]],
> +               "ba": ["set", [true]],
> +               "sa": ["set", ["abc", "def"]],
> +               "ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"],
> +                              ["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
> +      {"op": "insert",
> +       "table": "simple",
> +       "row": {}}]']],
> +  [?simple:i,r!],
> +  ['fetch 0 r'],
> +  [[000: i=0 uuid=<0>
> +000: i=1 uuid=<1>
> +001: commit, status=success
> +002: i=0 r=0 uuid=<0>
> +002: i=1 uuid=<1>
> +003: done
> +]])
> diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
> index ab951f9..a6897f3 100644
> --- a/tests/test-ovsdb.py
> +++ b/tests/test-ovsdb.py
> @@ -146,44 +146,53 @@ def do_parse_schema(schema_string):
> 
>  def print_idl(idl, step):
> -    simple = idl.tables["simple"].rows
> -    l1 = idl.tables["link1"].rows
> -    l2 = idl.tables["link2"].rows
> -
>      n = 0
> -    for row in simple.itervalues():
> -        s = ("%03d: i=%s r=%s b=%s s=%s u=%s "
> -             "ia=%s ra=%s ba=%s sa=%s ua=%s uuid=%s"
> -             % (step, row.i, row.r, row.b, row.s, row.u,
> -                row.ia, row.ra, row.ba, row.sa, row.ua, row.uuid))
> -        s = re.sub('""|,|u?\'', "", s)
> -        s = re.sub('UUID\(([^)]+)\)', r'\1', s)
> -        s = re.sub('False', 'false', s)
> -        s = re.sub('True', 'true', s)
> -        s = re.sub(r'(ba)=([^[][^ ]*) ', r'\1=[\2] ', s)
> -        print(s)
> -        n += 1
> -
> -    for row in l1.itervalues():
> -        s = ["%03d: i=%s k=" % (step, row.i)]
> -        if row.k:
> -            s.append(str(row.k.i))
> -        s.append(" ka=[")
> -        s.append(' '.join(sorted(str(ka.i) for ka in row.ka)))
> -        s.append("] l2=")
> -        if row.l2:
> -            s.append(str(row.l2[0].i))
> -        s.append(" uuid=%s" % row.uuid)
> -        print(''.join(s))
> -        n += 1
> -
> -    for row in l2.itervalues():
> -        s = ["%03d: i=%s l1=" % (step, row.i)]
> -        if row.l1:
> -            s.append(str(row.l1[0].i))
> -        s.append(" uuid=%s" % row.uuid)
> -        print(''.join(s))
> -        n += 1
> +    if "simple" in idl.tables:
> +        simple_columns = ["i", "r", "b", "s", "u", "ia",
> +                          "ra", "ba", "sa", "ua", "uuid"]
> +        simple = idl.tables["simple"].rows
> +        for row in simple.itervalues():
> +            s = "%03d:" % step
> +            for column in simple_columns:
> +                if hasattr(row, column) and not (type(getattr(row, column))
> +                                                 is ovs.db.data.Atom):
> +                    s += " %s=%s" % (column, getattr(row, column))
> +            s = re.sub('""|,|u?\'', "", s)
> +            s = re.sub('UUID\(([^)]+)\)', r'\1', s)
> +            s = re.sub('False', 'false', s)
> +            s = re.sub('True', 'true', s)
> +            s = re.sub(r'(ba)=([^[][^ ]*) ', r'\1=[\2] ', s)
> +            print(s)
> +            n += 1
> +
> +    if "link1" in idl.tables:
> +        l1 = idl.tables["link1"].rows
> +        for row in l1.itervalues():
> +            s = ["%03d: i=%s k=" % (step, row.i)]
> +            if hasattr(row, "k") and row.k:
> +                s.append(str(row.k.i))
> +            if hasattr(row, "ka"):
> +                s.append(" ka=[")
> +                s.append(' '.join(sorted(str(ka.i) for ka in row.ka)))
> +                s.append("] l2=")
> +            if hasattr(row, "l2") and row.l2:
> +                s.append(str(row.l2[0].i))
> +            if hasattr(row, "uuid"):
> +                s.append(" uuid=%s" % row.uuid)
> +            print(''.join(s))
> +            n += 1
> +
> +    if "link2" in idl.tables:
> +        l2 = idl.tables["link2"].rows
> +        for row in l2.itervalues():
> +            s = ["%03d:" % step]
> +            s.append(" i=%s l1=" % row.i)
> +            if hasattr(row, "l1") and row.l1:
> +                s.append(str(row.l1[0].i))
> +            if hasattr(row, "uuid"):
> +                s.append(" uuid=%s" % row.uuid)
> +            print(''.join(s))
> +            n += 1
> 
>      if not n:
>          print("%03d: empty" % step)
> @@ -228,6 +237,7 @@ def idltest_find_simple(idl, i):
> def idl_set(idl, commands, step):
>      txn = ovs.db.idl.Transaction(idl)
>      increment = False
> +    fetch_cmds = []
>      events = []
>      for command in commands.split(','):
>          words = command.split()
> @@ -307,6 +317,20 @@ def idl_set(idl, commands, step):
>                  sys.stderr.write('"verify" command asks for unknown column '
>                                   '"%s"\n' % args[1])
>                  sys.exit(1)
> +        elif name == "fetch":
> +            if len(args) != 2:
> +                sys.stderr.write('"fetch" command requires 2 argument\n')
> +                sys.exit(1)
> +
> +            row = idltest_find_simple(idl, int(args[0]))
> +            if not row:
> +                sys.stderr.write('"fetch" command asks for nonexistent i=%d\n'
> +                                 % int(args[0]))
> +                sys.exit(1)
> +
> +            column = args[1]
> +            row.fetch(column)
> +            fetch_cmds.append([row, column])
>          elif name == "increment":
>              if len(args) != 1:
>                  sys.stderr.write('"increment" command requires 1 argument\n')
> @@ -366,10 +390,16 @@ def do_idl(schema_file, remote, *commands):
>      schema_helper = ovs.db.idl.SchemaHelper(schema_file)
>      if commands and commands[0].startswith("?"):
>          monitor = {}
> +        readonly = {}
>          for x in commands[0][1:].split("?"):
> +            readonly = []
>              table, columns = x.split(":")
> -            monitor[table] = columns.split(",")
> -            schema_helper.register_columns(table, monitor[table])
> +            columns = columns.split(",")
> +            for index, column in enumerate(columns):
> +                if column[-1] == '!':
> +                    columns[index] = columns[index][:-1]
> +                    readonly.append(columns[index])
> +            schema_helper.register_columns(table, columns, readonly)
>          commands = commands[1:]
>      else:
>          schema_helper.register_all()
> @@ -499,6 +529,12 @@ idl SCHEMA SERVER [?T1:C1,C2...[?T2:C1,C2,...]...] [TRANSACTION...]
>    e.g.:
>        ?simple:b?link1:i,k - Monitor column "b" in table "simple",
>                              and column "i", "k" in table "link1"
> +  Readonly columns: Suffixing a "!" after a column indicates that the
> +  column is to be registered "readonly".
> +  e.g.:
> +      ?simple:i,b!  - Register interest in column "i" (monitoring) and
> +                      column "b" (readonly).
> +
> 
> The following options are also available:
>    -t, --timeout=SECS          give up after SECS seconds
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev



More information about the dev mailing list