[ovs-dev] [PATCH monitor_cond V3 09/10] python: move Python idl to work with monitor_cond
Andy Zhou
azhou at ovn.org
Fri Feb 5 11:46:45 UTC 2016
On Wed, Feb 3, 2016 at 5:53 AM, Liran Schour <lirans at il.ibm.com> wrote:
> Python idl works now with "monitor_cond" method. Add test
> for backward compatibility with old "monitor" method.
>
> Signed-off-by: Liran Schour <lirans at il.ibm.com>
>
I glanced through the logic of most of the changes. They look fine in
general. I have the same comment as for the C IDL implementation:
cond_update() seems to support only one table at a time. Maybe that's an
OK restriction in practice?
>
> ---
> v3->v4:
> * Change 3rd parameter of notify to old row, update API documentation
>
> v2->v3:
> * cond_update() receives a single condition
> ---
> python/ovs/db/data.py | 18 ++++-
> python/ovs/db/idl.py | 182
> +++++++++++++++++++++++++++++++++++++++++++-------
> tests/ovsdb-idl.at | 97 +++++++++++++++++++++++++++
> 3 files changed, 271 insertions(+), 26 deletions(-)
>
> diff --git a/python/ovs/db/data.py b/python/ovs/db/data.py
> index 3075ee6..162ab19 100644
> --- a/python/ovs/db/data.py
> +++ b/python/ovs/db/data.py
> @@ -146,7 +146,7 @@ class Atom(object):
> % (self.to_string(), base.enum.to_string()))
> elif base.type in [ovs.db.types.IntegerType,
> ovs.db.types.RealType]:
> if ((base.min is None or self.value >= base.min) and
> - (base.max is None or self.value <= base.max)):
> + (base.max is None or self.value <= base.max)):
> pass
> elif base.min is not None and base.max is not None:
> raise ConstraintViolation(
> @@ -155,7 +155,7 @@ class Atom(object):
> elif base.min is not None:
> raise ConstraintViolation(
> "%s is less than minimum allowed value %.15g"
> - % (self.to_string(), base.min))
> + % (self.to_string(), base.min))
> else:
> raise ConstraintViolation(
> "%s is greater than maximum allowed value %.15g"
> @@ -313,7 +313,7 @@ class Datum(object):
> that this function accepts."""
> is_map = type_.is_map()
> if (is_map or
> - (type(json) == list and len(json) > 0 and json[0] == "set")):
> + (type(json) == list and len(json) > 0 and json[0] ==
> "set")):
> if is_map:
> class_ = "map"
> else:
> @@ -388,6 +388,18 @@ class Datum(object):
> s.append(tail)
> return ''.join(s)
>
> + def diff(self, datum):
> + if self.type.n_max > 1 or len(self.values) == 0:
> + for k, v in six.iteritems(datum.values):
> + if k in self.values and v == self.values[k]:
> + del self.values[k]
> + else:
> + self.values[k] = v
> + else:
> + return datum
> +
> + return self
> +
> def as_list(self):
> if self.type.is_map():
> return [[k.value, v.value] for k, v in
> six.iteritems(self.values)]
> diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> index 3187db9..65d408e 100644
> --- a/python/ovs/db/idl.py
> +++ b/python/ovs/db/idl.py
> @@ -32,6 +32,9 @@ ROW_CREATE = "create"
> ROW_UPDATE = "update"
> ROW_DELETE = "delete"
>
> +OVSDB_UPDATE = 0
> +OVSDB_UPDATE2 = 1
> +
>
> class Idl(object):
> """Open vSwitch Database Interface Definition Language (OVSDB IDL).
> @@ -85,6 +88,10 @@ class Idl(object):
> currently being constructed, if there is one, or None otherwise.
> """
>
> + IDL_S_INITIAL = 0
> + IDL_S_MONITOR_REQUESTED = 1
> + IDL_S_MONITOR_COND_REQUESTED = 2
> +
> def __init__(self, remote, schema):
> """Creates and returns a connection to the database named
> 'db_name' on
> 'remote', which should be in a form acceptable to
> @@ -115,6 +122,8 @@ class Idl(object):
> self._monitor_request_id = None
> self._last_seqno = None
> self.change_seqno = 0
> + self.uuid = uuid.uuid1()
> + self.state = self.IDL_S_INITIAL
>
> # Database locking.
> self.lock_name = None # Name of lock we need, None if
> none.
> @@ -133,6 +142,7 @@ class Idl(object):
> table.need_table = False
> table.rows = {}
> table.idl = self
> + table.condition = []
>
> def close(self):
> """Closes the connection to the database. The IDL will no longer
> @@ -179,11 +189,15 @@ class Idl(object):
> if msg is None:
> break
> if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
> - and msg.method == "update"
> - and len(msg.params) == 2
> - and msg.params[0] is None):
> + and msg.method == "update2"
> + and len(msg.params) == 2):
> + # Database contents changed.
> + self.__parse_update(msg.params[1], OVSDB_UPDATE2)
> + elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
> + and msg.method == "update"
> + and len(msg.params) == 2):
> # Database contents changed.
> - self.__parse_update(msg.params[1])
> + self.__parse_update(msg.params[1], OVSDB_UPDATE)
> elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> and self._monitor_request_id is not None
> and self._monitor_request_id == msg.id):
> @@ -192,10 +206,15 @@ class Idl(object):
> self.change_seqno += 1
> self._monitor_request_id = None
> self.__clear()
> - self.__parse_update(msg.result)
> - except error.Error as e:
> + if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
> + self.__parse_update(msg.result, OVSDB_UPDATE2)
> + else:
> + assert self.state == self.IDL_S_MONITOR_REQUESTED
> + self.__parse_update(msg.result, OVSDB_UPDATE)
> +
> + except error.Error, e:
> vlog.err("%s: parse error in received schema: %s"
> - % (self._session.get_name(), e))
> + % (self._session.get_name(), e))
> self.__error()
> elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> and self._lock_request_id is not None
> @@ -213,6 +232,11 @@ class Idl(object):
> elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id ==
> "echo":
> # Reply to our echo request. Ignore it.
> pass
> + elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
> + self.state == self.IDL_S_MONITOR_COND_REQUESTED and
> + self._monitor_request_id == msg.id):
> + if msg.error == "unknown method":
> + self.__send_monitor_request()
> elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
> ovs.jsonrpc.Message.T_REPLY)
> and self.__txn_process_reply(msg)):
> @@ -227,6 +251,19 @@ class Idl(object):
>
> return initial_change_seqno != self.change_seqno
>
> + def cond_update(self, table_name, cond):
> + """Change conditions for this IDL session. If session is not
> already
> + connected, add condtion to table and submit it on
> send_monitor_request.
> + Otherwise send monitor_cond_update method with the requested
> + changes."""
> + table = self.tables.get(table_name)
> + if not table:
> + raise error.Error('Unknown table "%s"' % table_name)
> + if self._session.is_connected():
> + self.__send_cond_update(table, cond)
> + else:
> + table.condition = cond
> +
> def wait(self, poller):
> """Arranges for poller.block() to wake up when self.run() has
> something
> to do or when activity occurs on a transaction on 'self'."""
> @@ -278,10 +315,18 @@ class Idl(object):
> :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
> :param row: The row as it is after the operation has occured
> :type row: Row
> - :param updates: For updates, a Row object with just the changed
> columns
> + :param updates: For updates, old row
> :type updates: Row
> """
>
> + def __send_cond_update(self, table, cond):
> + monitor_cond_update = {table.name: [{"where": cond}]}
> + old_uuid = str(self.uuid)
> + self.uuid = uuid.uuid1()
> + params = [old_uuid, str(self.uuid), monitor_cond_update]
> + msg = ovs.jsonrpc.Message.create_request("monitor_cond_update",
> params)
> + self._session.send(msg)
> +
> def __clear(self):
> changed = False
>
> @@ -331,36 +376,47 @@ class Idl(object):
>
> def __parse_lock_notify(self, params, new_has_lock):
> if (self.lock_name is not None
> - and type(params) in (list, tuple)
> - and params
> - and params[0] == self.lock_name):
> + and type(params) in (list, tuple)
> + and params
> + and params[0] == self.lock_name):
> self.__update_has_lock(new_has_lock)
> if not new_has_lock:
> self.is_lock_contended = True
>
> def __send_monitor_request(self):
> + if self.state == self.IDL_S_INITIAL:
> + self.state = self.IDL_S_MONITOR_COND_REQUESTED
> + method = "monitor_cond"
> + else:
> + self.state = self.IDL_S_MONITOR_REQUESTED
> + method = "monitor"
> +
> monitor_requests = {}
> for table in six.itervalues(self.tables):
> columns = []
> for column in six.iterkeys(table.columns):
> if ((table.name not in self.readonly) or
> - (table.name in self.readonly) and
> - (column not in self.readonly[table.name])):
> + (table.name in self.readonly) and
> + (column not in self.readonly[table.name])):
> columns.append(column)
> monitor_requests[table.name] = {"columns": columns}
> + if method == "monitor_cond" and table.condition:
> + monitor_requests[table.name]["where"] = table.condition
> + table.condition = None
> +
> msg = ovs.jsonrpc.Message.create_request(
> - "monitor", [self._db.name, None, monitor_requests])
> + method, [self._db.name, str(self.uuid), monitor_requests])
> self._monitor_request_id = msg.id
> self._session.send(msg)
>
> - def __parse_update(self, update):
> + def __parse_update(self, update, version):
> try:
> - self.__do_parse_update(update)
> - except error.Error as e:
> + self.__do_parse_update(update, version)
> + except error.Error, e:
> vlog.err("%s: error parsing update: %s"
> % (self._session.get_name(), e))
>
> - def __do_parse_update(self, table_updates):
> + def __do_parse_update(self, table_updates, version):
> if type(table_updates) != dict:
> raise error.Error("<table-updates> is not an object",
> table_updates)
> @@ -389,6 +445,11 @@ class Idl(object):
> 'is not an object'
> % (table_name, uuid_string))
>
> + if version == OVSDB_UPDATE2:
> + if self.__process_update2(table, uuid, row_update):
> + self.change_seqno += 1
> + continue
> +
> parser = ovs.db.parser.Parser(row_update, "row-update")
> old = parser.get_optional("old", [dict])
> new = parser.get_optional("new", [dict])
> @@ -401,6 +462,45 @@ class Idl(object):
> if self.__process_update(table, uuid, old, new):
> self.change_seqno += 1
>
> + def __process_update2(self, table, uuid, row_update):
> + row = table.rows.get(uuid)
> + changed = False
> + if "delete" in row_update:
> + if row:
> + del table.rows[uuid]
> + self.notify(ROW_DELETE, row)
> + changed = True
> + else:
> + # XXX rate-limit
> + vlog.warn("cannot delete missing row %s from table"
> + "%s" % (uuid, table.name))
> + elif "insert" in row_update or "initial" in row_update:
> + if row:
> + vlog.warn("cannot add existing row %s from table"
> + " %s" % (uuid, table.name))
> + del table.rows[uuid]
> + row = self.__create_row(table, uuid)
> + if "insert" in row_update:
> + row_update = row_update['insert']
> + else:
> + row_update = row_update['initial']
> + self.__add_default(table, row_update)
> + if self.__row_update(table, row, row_update):
> + changed = True
> + self.notify(ROW_CREATE, row)
> + elif "modify" in row_update:
> + if not row:
> + raise error.Error('Modify non-existing row')
> +
> + old = row
> + self.__apply_diff(table, row, row_update['modify'])
> + self.notify(ROW_UPDATE, row, old)
> + changed = True
> + else:
> + raise error.Error('<row-update> unknown operation',
> + row_update)
> + return changed
> +
> def __process_update(self, table, uuid, old, new):
> """Returns True if a column changed, False otherwise."""
> row = table.rows.get(uuid)
> @@ -441,6 +541,42 @@ class Idl(object):
> self.notify(op, row, Row.from_json(self, table, uuid,
> old))
> return changed
>
> + def __column_name(self, column):
> + if column.type.key.type == ovs.db.types.UuidType:
> + return ovs.ovsuuid.to_json(column.type.key.type.default)
> + else:
> + return column.type.key.type.default
> +
> + def __add_default(self, table, row_update):
> + for column in table.columns.itervalues():
> + if column.name not in row_update:
> + if ((table.name not in self.readonly) or
> + (table.name in self.readonly) and
> + (column.name not in self.readonly[table.name])):
> + if column.type.n_min != 0 and not
> column.type.is_map():
> + row_update[column.name] =
> self.__column_name(column)
> +
> + def __apply_diff(self, table, row, row_diff):
> + for column_name, datum_json in row_diff.iteritems():
> + column = table.columns.get(column_name)
> + if not column:
> + # XXX rate-limit
> + vlog.warn("unknown column %s updating table %s"
> + % (column_name, table.name))
> + continue
> +
> + try:
> + datum = ovs.db.data.Datum.from_json(column.type,
> datum_json)
> + except error.Error, e:
> + # XXX rate-limit
> + vlog.warn("error parsing column %s in table %s: %s"
> + % (column_name, table.name, e))
> + continue
> +
> + datum = row._data[column_name].diff(datum)
> + if datum != row._data[column_name]:
> + row._data[column_name] = datum
> +
> def __row_update(self, table, row, row_json):
> changed = False
> for column_name, datum_json in six.iteritems(row_json):
> @@ -593,7 +729,7 @@ class Row(object):
> assert self._idl.txn
>
> if ((self._table.name in self._idl.readonly) and
> - (column_name in self._idl.readonly[self._table.name])):
> + (column_name in self._idl.readonly[self._table.name])):
> vlog.warn("attempting to write to readonly column %s"
> % column_name)
> return
> @@ -829,8 +965,8 @@ class Transaction(object):
> def _substitute_uuids(self, json):
> if type(json) in (list, tuple):
> if (len(json) == 2
> - and json[0] == 'uuid'
> - and ovs.ovsuuid.is_valid_string(json[1])):
> + and json[0] == 'uuid'
> + and ovs.ovsuuid.is_valid_string(json[1])):
> uuid = ovs.ovsuuid.from_string(json[1])
> row = self._txn_rows.get(uuid, None)
> if row and row._data is None:
> @@ -967,14 +1103,14 @@ class Transaction(object):
> for column_name, datum in six.iteritems(row._changes):
> if row._data is not None or not datum.is_default():
> row_json[column_name] = (
> - self._substitute_uuids(datum.to_json()))
> + self._substitute_uuids(datum.to_json()))
>
> # If anything really changed, consider it an
> update.
> # We can't suppress not-really-changed values
> earlier
> # or transactions would become nonatomic (see the
> big
> # comment inside Transaction._write()).
> if (not any_updates and row._data is not None and
> - row._data[column_name] != datum):
> + row._data[column_name] != datum):
> any_updates = True
>
> if row._data is None or row_json:
> diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
> index 4baac46..2238653 100644
> --- a/tests/ovsdb-idl.at
> +++ b/tests/ovsdb-idl.at
> @@ -646,6 +646,103 @@ OVSDB_CHECK_IDL_FETCH_COLUMNS([simple idl, initially
> populated],
> 003: done
> ]])
>
> +m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND_PY],
> + [AT_SETUP([$1 - Python])
> + AT_SKIP_IF([test $HAVE_PYTHON = no])
> + AT_KEYWORDS([ovsdb server idl Python monitor $4])
> + AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
> + [0], [stdout], [ignore])
> + AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach
> --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket
> --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> + AT_CHECK([ovs-appctl -t "`pwd`"/unixctl
> ovsdb-server/disable-monitor-cond])
> + AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl
> $srcdir/idltest.ovsschema unix:socket $2],
> + [0], [stdout], [ignore], [kill `cat pid`])
> + AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$5],,, [[|
> $5]]),
> + [0], [$3], [], [kill `cat pid`])
> + OVSDB_SERVER_SHUTDOWN
> + AT_CLEANUP])
> +
> +
> +m4_define([OVSDB_CHECK_IDL_WO_MONITOR_COND],
> + [OVSDB_CHECK_IDL_WO_MONITOR_COND_PY($@)])
> +
> +
> +OVSDB_CHECK_IDL_WO_MONITOR_COND([simple idl disable monitor-cond],
> + [['["idltest",
> + {"op": "insert",
> + "table": "simple",
> + "row": {"i": 1,
> + "r": 2.0,
> + "b": true,
> + "s": "mystring",
> + "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"],
> + "ia": ["set", [1, 2, 3]],
> + "ra": ["set", [-0.5]],
> + "ba": ["set", [true]],
> + "sa": ["set", ["abc", "def"]],
> + "ua": ["set", [["uuid",
> "69443985-7806-45e2-b35f-574a04e720f9"],
> + ["uuid",
> "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
> + {"op": "insert",
> + "table": "simple",
> + "row": {}}]' \
> + '["idltest",
> + {"op": "update",
> + "table": "simple",
> + "where": [],
> + "row": {"b": true}}]' \
> + '["idltest",
> + {"op": "update",
> + "table": "simple",
> + "where": [],
> + "row": {"r": 123.5}}]' \
> + '["idltest",
> + {"op": "insert",
> + "table": "simple",
> + "row": {"i": -1,
> + "r": 125,
> + "b": false,
> + "s": "",
> + "ia": ["set", [1]],
> + "ra": ["set", [1.5]],
> + "ba": ["set", [false]],
> + "sa": ["set", []],
> + "ua": ["set", []]}}]' \
> + '["idltest",
> + {"op": "update",
> + "table": "simple",
> + "where": [["i", "<", 1]],
> + "row": {"s": "newstring"}}]' \
> + '["idltest",
> + {"op": "delete",
> + "table": "simple",
> + "where": [["i", "==", 0]]}]' \
> + 'reconnect']],
> + [[000: empty
> +001:
> {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
> +002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> +002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +003: {"error":null,"result":[{"count":2}]}
> +004: i=0 r=0 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> +004: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +005: {"error":null,"result":[{"count":2}]}
> +006: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> +006: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
> +008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[]
> uuid=<6>
> +008: i=0 r=123.5 b=true s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> +008: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +009: {"error":null,"result":[{"count":2}]}
> +010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<6>
> +010: i=0 r=123.5 b=true s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[]
> uuid=<1>
> +010: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +011: {"error":null,"result":[{"count":1}]}
> +012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<6>
> +012: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +013: reconnect
> +014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<6>
> +014: i=1 r=123.5 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +015: done
> +]])
> +
> m4_define([OVSDB_CHECK_IDL_TRACK_C],
> [AT_SETUP([$1 - C])
> AT_KEYWORDS([ovsdb server idl tracking positive $5])
> --
> 2.1.4
>
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
>
More information about the dev
mailing list