[ovs-dev] [PATCH v5] Monitor Database table to manage lifecycle of IDL client.
Numan Siddique
nusiddiq at redhat.com
Thu Mar 21 06:55:23 UTC 2019
On Sat, Jan 26, 2019 at 12:41 AM Ted Elhourani <ted.elhourani at nutanix.com>
wrote:
> The Python IDL implementation supports ovsdb cluster connections.
> This patch is a follow-up to commit 31e434fc98; it adds the option of
> connecting to the leader (the default) in the Raft-based cluster. It mimics
> the existing C IDL support for clusters introduced in commit 1b1d2e6daa.
>
> The _Server database schema is requested first, followed by a monitor of the
> Database table in the _Server database. Method __check_server_db verifies
> the eligibility of the server. If the attempt to obtain a monitor of the
> _Server database fails and a cluster id was not provided, this implementation
> proceeds to request the data monitor. If a cluster id was provided via the
> set_cluster_id method then the connection is aborted and a connection to a
> different node is instead attempted, until a valid cluster node is found.
> Thus, when supplied, cluster id is interpreted as the intention to only
> allow connections to a clustered database. If not supplied, connections to
> standalone nodes, or nodes that do not have the _Server database are
> allowed. change_seqno is not incremented in the case of Database table
> updates.
>
> Signed-off-by: Ted Elhourani <ted.elhourani at nutanix.com>
>
Acked-by: Numan Siddique <nusiddiq at redhat.com>
Numan
---
>
> v4 -> v5
> --------
> * Increase test timeout.
> * Spell out list of files to cat for shell compatibility.
>
> v3 -> v4
> --------
> * export -f is not compatible with FreeBSD, modify tests to avoid shell
> function export.
> * Re-add a line that was removed by mistake.
>
> v2 -> v3
> --------
> * Add 2 tests, treat cluster_id as a string, mv arg till end, pep8 fixes.
>
> v1 -> v2
> --------
> * Modify for backward compatibility with _Server-less ovsdb servers.
>
> python/ovs/db/idl.py | 219
> ++++++++++++++++++++++++++++++++++++++++++++----
> python/ovs/reconnect.py | 3 +
> tests/ovsdb-idl.at | 129 +++++++++++++++++++++-------
> tests/test-ovsdb.py | 67 ++++++++++++++-
> 4 files changed, 372 insertions(+), 46 deletions(-)
>
> diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> index 250e897..84af978 100644
> --- a/python/ovs/db/idl.py
> +++ b/python/ovs/db/idl.py
> @@ -38,6 +38,8 @@ ROW_DELETE = "delete"
> OVSDB_UPDATE = 0
> OVSDB_UPDATE2 = 1
>
> +CLUSTERED = "clustered"
> +
>
> class Idl(object):
> """Open vSwitch Database Interface Definition Language (OVSDB IDL).
> @@ -92,10 +94,13 @@ class Idl(object):
> """
>
> IDL_S_INITIAL = 0
> - IDL_S_MONITOR_REQUESTED = 1
> - IDL_S_MONITOR_COND_REQUESTED = 2
> + IDL_S_SERVER_SCHEMA_REQUESTED = 1
> + IDL_S_SERVER_MONITOR_REQUESTED = 2
> + IDL_S_DATA_MONITOR_REQUESTED = 3
> + IDL_S_DATA_MONITOR_COND_REQUESTED = 4
>
> - def __init__(self, remote, schema_helper, probe_interval=None):
> + def __init__(self, remote, schema_helper, probe_interval=None,
> + leader_only=True):
> """Creates and returns a connection to the database named
> 'db_name' on
> 'remote', which should be in a form acceptable to
> ovs.jsonrpc.session.open(). The connection will maintain an
> in-memory
> @@ -119,6 +124,9 @@ class Idl(object):
>
> The IDL uses and modifies 'schema' directly.
>
> + If 'leader_only' is set to True (default value) the IDL will only
> + monitor and transact with the leader of the cluster.
> +
> If "probe_interval" is zero it disables the connection keepalive
> feature. If non-zero the value will be forced to at least 1000
> milliseconds. If None it will just use the default value in OVS.
> @@ -137,6 +145,20 @@ class Idl(object):
> self._last_seqno = None
> self.change_seqno = 0
> self.uuid = uuid.uuid1()
> +
> + # Server monitor.
> + self._server_schema_request_id = None
> + self._server_monitor_request_id = None
> + self._db_change_aware_request_id = None
> + self._server_db_name = '_Server'
> + self._server_db_table = 'Database'
> + self.server_tables = None
> + self._server_db = None
> + self.server_monitor_uuid = uuid.uuid1()
> + self.leader_only = leader_only
> + self.cluster_id = None
> + self._min_index = 0
> +
> self.state = self.IDL_S_INITIAL
>
> # Database locking.
> @@ -172,6 +194,12 @@ class Idl(object):
> remotes.append(r)
> return remotes
>
> + def set_cluster_id(self, cluster_id):
> + """Set the id of the cluster that this idl must connect to."""
> + self.cluster_id = cluster_id
> + if self.state != self.IDL_S_INITIAL:
> + self.force_reconnect()
> +
> def index_create(self, table, name):
> """Create a named multi-column index on a table"""
> return self.tables[table].rows.index_create(name)
> @@ -222,7 +250,7 @@ class Idl(object):
> if seqno != self._last_seqno:
> self._last_seqno = seqno
> self.__txn_abort_all()
> - self.__send_monitor_request()
> + self.__send_server_schema_request()
> if self.lock_name:
> self.__send_lock_request()
> break
> @@ -230,6 +258,7 @@ class Idl(object):
> msg = self._session.recv()
> if msg is None:
> break
> +
> if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
> and msg.method == "update2"
> and len(msg.params) == 2):
> @@ -239,7 +268,15 @@ class Idl(object):
> and msg.method == "update"
> and len(msg.params) == 2):
> # Database contents changed.
> - self.__parse_update(msg.params[1], OVSDB_UPDATE)
> + if msg.params[0] == str(self.server_monitor_uuid):
> + self.__parse_update(msg.params[1], OVSDB_UPDATE,
> + tables=self.server_tables)
> + self.change_seqno = initial_change_seqno
> + if not self.__check_server_db():
> + self.force_reconnect()
> + break
> + else:
> + self.__parse_update(msg.params[1], OVSDB_UPDATE)
> elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> and self._monitor_request_id is not None
> and self._monitor_request_id == msg.id):
> @@ -248,10 +285,10 @@ class Idl(object):
> self.change_seqno += 1
> self._monitor_request_id = None
> self.__clear()
> - if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
> + if self.state ==
> self.IDL_S_DATA_MONITOR_COND_REQUESTED:
> self.__parse_update(msg.result, OVSDB_UPDATE2)
> else:
> - assert self.state == self.IDL_S_MONITOR_REQUESTED
> + assert self.state ==
> self.IDL_S_DATA_MONITOR_REQUESTED
> self.__parse_update(msg.result, OVSDB_UPDATE)
>
> except error.Error as e:
> @@ -259,6 +296,56 @@ class Idl(object):
> % (self._session.get_name(), e))
> self.__error()
> elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> + and self._server_schema_request_id is not None
> + and self._server_schema_request_id == msg.id):
> + # Reply to our "get_schema" of _Server request.
> + try:
> + self._server_schema_request_id = None
> + sh = SchemaHelper(None, msg.result)
> + sh.register_table(self._server_db_table)
> + schema = sh.get_idl_schema()
> + self._server_db = schema
> + self.server_tables = schema.tables
> + self.__send_server_monitor_request()
> + except error.Error as e:
> + vlog.err("%s: error receiving server schema: %s"
> + % (self._session.get_name(), e))
> + if self.cluster_id:
> + self.__error()
> + break
> + else:
> + self.change_seqno = initial_change_seqno
> + self.__send_monitor_request()
> + elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> + and self._server_monitor_request_id is not None
> + and self._server_monitor_request_id == msg.id):
> + # Reply to our "monitor" of _Server request.
> + try:
> + self._server_monitor_request_id = None
> + self.__parse_update(msg.result, OVSDB_UPDATE,
> + tables=self.server_tables)
> + self.change_seqno = initial_change_seqno
> + if self.__check_server_db():
> + self.__send_monitor_request()
> + self.__send_db_change_aware()
> + else:
> + self.force_reconnect()
> + break
> + except error.Error as e:
> + vlog.err("%s: parse error in received schema: %s"
> + % (self._session.get_name(), e))
> + if self.cluster_id:
> + self.__error()
> + break
> + else:
> + self.change_seqno = initial_change_seqno
> + self.__send_monitor_request()
> + elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> + and self._db_change_aware_request_id is not None
> + and self._db_change_aware_request_id == msg.id):
> + # Reply to us notifying the server of our change awarness.
> + self._db_change_aware_request_id = None
> + elif (msg.type == ovs.jsonrpc.Message.T_REPLY
> and self._lock_request_id is not None
> and self._lock_request_id == msg.id):
> # Reply to our "lock" request.
> @@ -275,10 +362,20 @@ class Idl(object):
> # Reply to our echo request. Ignore it.
> pass
> elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
> - self.state == self.IDL_S_MONITOR_COND_REQUESTED and
> + self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
> self._monitor_request_id == msg.id):
> if msg.error == "unknown method":
> self.__send_monitor_request()
> + elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
> + self._server_schema_request_id is not None and
> + self._server_schema_request_id == msg.id):
> + self._server_schema_request_id = None
> + if self.cluster_id:
> + self.force_reconnect()
> + break
> + else:
> + self.change_seqno = initial_change_seqno
> + self.__send_monitor_request()
> elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
> ovs.jsonrpc.Message.T_REPLY)
> and self.__txn_process_reply(msg)):
> @@ -342,6 +439,9 @@ class Idl(object):
> In the meantime, the contents of the IDL will not change."""
> self._session.force_reconnect()
>
> + def session_name(self):
> + return self._session.get_name()
> +
> def set_lock(self, lock_name):
> """If 'lock_name' is not None, configures the IDL to obtain the
> named
> lock from the database server and to avoid modifying the database
> when
> @@ -440,12 +540,19 @@ class Idl(object):
> if not new_has_lock:
> self.is_lock_contended = True
>
> + def __send_db_change_aware(self):
> + msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
> + [True])
> + self._db_change_aware_request_id = msg.id
> + self._session.send(msg)
> +
> def __send_monitor_request(self):
> - if self.state == self.IDL_S_INITIAL:
> - self.state = self.IDL_S_MONITOR_COND_REQUESTED
> + if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
> + self.IDL_S_INITIAL]):
> + self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
> method = "monitor_cond"
> else:
> - self.state = self.IDL_S_MONITOR_REQUESTED
> + self.state = self.IDL_S_DATA_MONITOR_REQUESTED
> method = "monitor"
>
> monitor_requests = {}
> @@ -467,20 +574,50 @@ class Idl(object):
> self._monitor_request_id = msg.id
> self._session.send(msg)
>
> - def __parse_update(self, update, version):
> + def __send_server_schema_request(self):
> + self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
> + msg = ovs.jsonrpc.Message.create_request(
> + "get_schema", [self._server_db_name, str(self.uuid)])
> + self._server_schema_request_id = msg.id
> + self._session.send(msg)
> +
> + def __send_server_monitor_request(self):
> + self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
> + monitor_requests = {}
> + table = self.server_tables[self._server_db_table]
> + columns = [column for column in six.iterkeys(table.columns)]
> + for column in six.itervalues(table.columns):
> + if not hasattr(column, 'alert'):
> + column.alert = True
> + table.rows = custom_index.IndexedRows(table)
> + table.need_table = False
> + table.idl = self
> + monitor_request = {"columns": columns}
> + monitor_requests[table.name] = [monitor_request]
> + msg = ovs.jsonrpc.Message.create_request(
> + 'monitor', [self._server_db.name,
> + str(self.server_monitor_uuid),
> + monitor_requests])
> + self._server_monitor_request_id = msg.id
> + self._session.send(msg)
> +
> + def __parse_update(self, update, version, tables=None):
> try:
> - self.__do_parse_update(update, version)
> + if not tables:
> + self.__do_parse_update(update, version, self.tables)
> + else:
> + self.__do_parse_update(update, version, tables)
> except error.Error as e:
> vlog.err("%s: error parsing update: %s"
> % (self._session.get_name(), e))
>
> - def __do_parse_update(self, table_updates, version):
> + def __do_parse_update(self, table_updates, version, tables):
> if not isinstance(table_updates, dict):
> raise error.Error("<table-updates> is not an object",
> table_updates)
>
> for table_name, table_update in six.iteritems(table_updates):
> - table = self.tables.get(table_name)
> + table = tables.get(table_name)
> if not table:
> raise error.Error('<table-updates> includes unknown '
> 'table "%s"' % table_name)
> @@ -605,6 +742,58 @@ class Idl(object):
> self.notify(op, row, Row.from_json(self, table, uuid,
> old))
> return changed
>
> + def __check_server_db(self):
> + """Returns True if this is a valid server database, False
> otherwise."""
> + session_name = self.session_name()
> +
> + if self._server_db_table not in self.server_tables:
> + vlog.info("%s: server does not have %s table in its %s
> database"
> + % (session_name, self._server_db_table,
> + self._server_db_name))
> + return False
> +
> + rows = self.server_tables[self._server_db_table].rows
> +
> + database = None
> + for row in six.itervalues(rows):
> + if self.cluster_id:
> + if self.cluster_id in \
> + map(lambda x: str(x)[:4], row.cid):
> + database = row
> + break
> + elif row.name == self._db.name:
> + database = row
> + break
> +
> + if not database:
> + vlog.info("%s: server does not have %s database"
> + % (session_name, self._db.name))
> + return False
> +
> + if (database.model == CLUSTERED and
> + self._session.get_num_of_remotes() > 1):
> + if not database.schema:
> + vlog.info('%s: clustered database server has not yet
> joined '
> + 'cluster; trying another server' % session_name)
> + return False
> + if not database.connected:
> + vlog.info('%s: clustered database server is disconnected
> '
> + 'from cluster; trying another server' %
> session_name)
> + return False
> + if (self.leader_only and
> + not database.leader):
> + vlog.info('%s: clustered database server is not cluster '
> + 'leader; trying another server' % session_name)
> + return False
> + if database.index:
> + if database.index[0] < self._min_index:
> + vlog.warn('%s: clustered database server has stale
> data; '
> + 'trying another server' % session_name)
> + return False
> + self._min_index = database.index[0]
> +
> + return True
> +
> def __column_name(self, column):
> if column.type.key.type == ovs.db.types.UuidType:
> return ovs.ovsuuid.to_json(column.type.key.type.default)
> diff --git a/python/ovs/reconnect.py b/python/ovs/reconnect.py
> index 34cc769..afbe445 100644
> --- a/python/ovs/reconnect.py
> +++ b/python/ovs/reconnect.py
> @@ -344,6 +344,9 @@ class Reconnect(object):
> else:
> self.info_level("%s: error listening for connections"
> % self.name)
> + elif self.state == Reconnect.Reconnect:
> + self.info_level("%s: connection closed by client"
> + % self.name)
> elif self.backoff < self.max_backoff:
> if self.passive:
> type_ = "listen"
> diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
> index 8981b5e..7c937f7 100644
> --- a/tests/ovsdb-idl.at
> +++ b/tests/ovsdb-idl.at
> @@ -11,7 +11,42 @@ ovsdb_start_idltest () {
> ovsdb-server -vconsole:warn --log-file --detach --no-chdir --pidfile
> --remote=punix:socket ${1:+--remote=$1} db || return $?
> on_exit 'kill `cat ovsdb-server.pid`'
> }
> -])
> +
> +# ovsdb_cluster_start_idltest [REMOTE] [SCHEMA]
> +#
> +# Creates a database using SCHEMA (default: idltest.ovsschema) and
> +# starts a database cluster listening on punix:socket and REMOTE (if
> +# specified).
> +ovsdb_cluster_start_idltest () {
> + local n=$1
> + ovsdb-tool create-cluster s1.db $abs_srcdir/idltest.ovsschema
> unix:s1.raft || return $?
> + cid=`ovsdb-tool db-cid s1.db`
> + schema_name=`ovsdb-tool schema-name $abs_srcdir/idltest.ovsschema`
> + for i in `seq 2 $n`; do
> + ovsdb-tool join-cluster s$i.db $schema_name unix:s$i.raft
> unix:s1.raft || return $?
> + done
> + for i in `seq $n`; do
> + ovsdb-server -vraft -vconsole:warn --detach --no-chdir
> --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb
> ${2:+--remote=$2} s$i.db || return $?
> + done
> + on_exit 'kill `cat s*.pid`'
> +}
> +
> +# ovsdb_cluster_leader [REMOTES] [DATABASE]
> +#
> +# Returns the leader of the DATABASE cluster.
> +ovsdb_cluster_leader () {
> + remotes=$(echo $1 | tr "," "\n")
> + for remote in $remotes; do
> + ovsdb-client dump $remote _Server Database name leader | grep $2 |
> grep -q true
> + if [[ $? == 0 ]]; then
> + port=$(echo $remote | cut -d':' -f 3)
> + log=$(grep --include=s\*.log -rlnw -e "listening on port $port"
> ./)
> + pid=$(echo $log | sed 's/\(.*\.\)log/\1pid/' )
> + echo "${remote}|${pid}"
> + return
> + fi
> + done
> +}])
>
> # OVSDB_CHECK_IDL_C(TITLE, [PRE-IDL-TXN], TRANSACTIONS, OUTPUT,
> [KEYWORDS],
> # [FILTER])
> @@ -1466,40 +1501,44 @@ OVSDB_CHECK_IDL_NOTIFY([simple idl verify notify],
> "where": [["i", "==", 0]]}]' \
> 'reconnect']],
> [[000: empty
> -001:
> {"error":null,"result":[{"uuid":["uuid","<0>"]},{"uuid":["uuid","<1>"]}]}
> -002: event:create, row={i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[]
> ua=[] uuid=<1>}, updates=None
> -002: event:create, row={i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
> -002: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -002: i=1 r=2 b=true s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +000: event:create, row={uuid=<0>}, updates=None
> +000: event:create, row={uuid=<1>}, updates=None
> +001:
> {"error":null,"result":[{"uuid":["uuid","<2>"]},{"uuid":["uuid","<3>"]}]}
> +002: event:create, row={i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[]
> ua=[] uuid=<3>}, updates=None
> +002: event:create, row={i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
> +002: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +002: i=1 r=2 b=true s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
> 003: {"error":null,"result":[{"count":2}]}
> -004: event:update, row={i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={b=true
> uuid=<0>}
> -004: i=0 r=0 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -004: i=1 r=2 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +004: event:update, row={i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={b=true
> uuid=<2>}
> +004: i=0 r=0 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +004: i=1 r=2 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
> 005: {"error":null,"result":[{"count":2}]}
> -006: event:update, row={i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[]
> sa=[] ua=[] uuid=<1>}, updates={r=0 uuid=<1>}
> -006: event:update, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates={r=2
> uuid=<0>}
> -006: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -006: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> -007: {"error":null,"result":[{"uuid":["uuid","<6>"]}]}
> -008: event:create, row={i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5]
> ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
> -008: i=-1 r=125 b=false s= u=<2> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[]
> uuid=<6>
> -008: i=0 r=123.5 b=false s= u=<2> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<1>
> -008: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +006: event:update, row={i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[]
> sa=[] ua=[] uuid=<3>}, updates={r=0 uuid=<3>}
> +006: event:update, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates={r=2
> uuid=<2>}
> +006: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +006: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
> +007: {"error":null,"result":[{"uuid":["uuid","<8>"]}]}
> +008: event:create, row={i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5]
> ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
> +008: i=-1 r=125 b=false s= u=<4> ia=[1] ra=[1.5] ba=[false] sa=[] ua=[]
> uuid=<8>
> +008: i=0 r=123.5 b=false s= u=<4> ia=[] ra=[] ba=[] sa=[] ua=[] uuid=<3>
> +008: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
> 009: {"error":null,"result":[{"count":2}]}
> -010: event:update, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1]
> ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates={s= uuid=<6>}
> -010: event:update, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[]
> ba=[] sa=[] ua=[] uuid=<1>}, updates={s= uuid=<1>}
> -010: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<6>
> -010: i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[] ba=[] sa=[] ua=[]
> uuid=<1>
> -010: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +010: event:update, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1]
> ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates={s= uuid=<8>}
> +010: event:update, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[]
> ba=[] sa=[] ua=[] uuid=<3>}, updates={s= uuid=<3>}
> +010: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<8>
> +010: i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[] ba=[] sa=[] ua=[]
> uuid=<3>
> +010: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
> 011: {"error":null,"result":[{"count":1}]}
> -012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<2> ia=[] ra=[]
> ba=[] sa=[] ua=[] uuid=<1>}, updates=None
> -012: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<6>
> -012: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +012: event:delete, row={i=0 r=123.5 b=false s=newstring u=<4> ia=[] ra=[]
> ba=[] sa=[] ua=[] uuid=<3>}, updates=None
> +012: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<8>
> +012: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
> 013: reconnect
> -014: event:create, row={i=-1 r=125 b=false s=newstring u=<2> ia=[1]
> ra=[1.5] ba=[false] sa=[] ua=[] uuid=<6>}, updates=None
> -014: event:create, row={i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<4> <5>] uuid=<0>}, updates=None
> -014: i=-1 r=125 b=false s=newstring u=<2> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<6>
> -014: i=1 r=123.5 b=false s=mystring u=<3> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<4> <5>] uuid=<0>
> +014: event:create, row={i=-1 r=125 b=false s=newstring u=<4> ia=[1]
> ra=[1.5] ba=[false] sa=[] ua=[] uuid=<8>}, updates=None
> +014: event:create, row={i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3]
> ra=[-0.5] ba=[true] sa=[abc def] ua=[<6> <7>] uuid=<2>}, updates=None
> +014: event:create, row={uuid=<0>}, updates=None
> +014: event:create, row={uuid=<1>}, updates=None
> +014: i=-1 r=125 b=false s=newstring u=<4> ia=[1] ra=[1.5] ba=[false]
> sa=[] ua=[] uuid=<8>
> +014: i=1 r=123.5 b=false s=mystring u=<5> ia=[1 2 3] ra=[-0.5] ba=[true]
> sa=[abc def] ua=[<6> <7>] uuid=<2>
> 015: done
> ]])
>
> @@ -1853,3 +1892,33 @@ m4_define([CHECK_STREAM_OPEN_BLOCK_PY],
>
> CHECK_STREAM_OPEN_BLOCK_PY([Check PY2 Stream open block],
> [$HAVE_PYTHON2], [$PYTHON2])
> CHECK_STREAM_OPEN_BLOCK_PY([Check PY3 Stream open block],
> [$HAVE_PYTHON3], [$PYTHON3])
> +
> +# same as OVSDB_CHECK_IDL but uses Python IDL implementation with tcp
> +# with multiple remotes to assert the idl connects to the leader of the
> Raft cluster
> +m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PYN],
> + [AT_SETUP([$1])
> + AT_SKIP_IF([test $7 = no])
> + AT_KEYWORDS([ovsdb server idl Python leader_only with tcp socket])
> + m4_define([LPBK],[127.0.0.1])
> + AT_CHECK([ovsdb_cluster_start_idltest $2 "ptcp:0:"LPBK])
> + PARSE_LISTENING_PORT([s2.log], [TCP_PORT_1])
> + PARSE_LISTENING_PORT([s3.log], [TCP_PORT_2])
> + PARSE_LISTENING_PORT([s1.log], [TCP_PORT_3])
> + remotes=tcp:LPBK:$TCP_PORT_1,tcp:LPBK:$TCP_PORT_2,tcp:LPBK:$TCP_PORT_3
> + pids=$(cat s2.pid s3.pid s1.pid | tr '\n' ',')
> + echo $pids
> + AT_CHECK([$8 $srcdir/test-ovsdb.py -t30 idl-cluster
> $srcdir/idltest.ovsschema $remotes $pids $3],
> + [0], [stdout], [ignore])
> + remote=$(ovsdb_cluster_leader $remotes "idltest")
> + leader=$(echo $remote | cut -d'|' -f 1)
> + AT_CHECK([grep -F -- "${leader}" stdout], [0], [ignore])
> + AT_CLEANUP])
> +
> +m4_define([OVSDB_CHECK_IDL_LEADER_ONLY_PY],
> + [OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python2 (leader only)], [$2],
> [$3], [$4], [$5], [$6],
> + [$HAVE_PYTHON], [$PYTHON])
> + OVSDB_CHECK_IDL_LEADER_ONLY_PYN([$1 - Python3 (leader only)], [$2],
> [$3], [$4], [$5], [$6],
> + [$HAVE_PYTHON3], [$PYTHON3])])
> +
> +OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL connects to leader], 3,
> ['remote'])
> +OVSDB_CHECK_IDL_LEADER_ONLY_PY([Check Python IDL reconnects to leader],
> 3, ['remote' '+remotestop' 'remote'])
> diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
> index 1d7c023..422321a 100644
> --- a/tests/test-ovsdb.py
> +++ b/tests/test-ovsdb.py
> @@ -758,6 +758,70 @@ def do_idl_passive(schema_file, remote, *commands):
> print("%03d: done" % step)
>
>
> +def do_idl_cluster(schema_file, remote, pid, *commands):
> + schema_helper = ovs.db.idl.SchemaHelper(schema_file)
> +
> + if remote.startswith("ssl:"):
> + if len(commands) < 3:
> + sys.stderr.write("SSL connection requires private key, "
> + "certificate for private key, and peer CA "
> + "certificate as arguments\n")
> + sys.exit(1)
> + ovs.stream.Stream.ssl_set_private_key_file(commands[0])
> + ovs.stream.Stream.ssl_set_certificate_file(commands[1])
> + ovs.stream.Stream.ssl_set_ca_cert_file(commands[2])
> + commands = commands[3:]
> +
> + schema_helper.register_all()
> + idl = ovs.db.idl.Idl(remote, schema_helper)
> +
> + step = 0
> + seqno = 0
> + commands = list(commands)
> + for command in commands:
> + if command.startswith("+"):
> + # The previous transaction didn't change anything.
> + command = command[1:]
> + else:
> + # Wait for update.
> + while idl.change_seqno == seqno and not idl.run():
> + poller = ovs.poller.Poller()
> + idl.wait(poller)
> + poller.block()
> + step += 1
> +
> + seqno = idl.change_seqno
> +
> + if command == "reconnect":
> + print("%03d: reconnect" % step)
> + sys.stdout.flush()
> + step += 1
> + idl.force_reconnect()
> + elif command == "remote":
> + print("%03d: %s" % (step, idl.session_name()))
> + sys.stdout.flush()
> + step += 1
> + elif command == "remotestop":
> + r = idl.session_name()
> + remotes = remote.split(',')
> + i = remotes.index(r)
> + pids = pid.split(',')
> + command = None
> + try:
> + command = "kill %s" % pids[i]
> + except ValueError as error:
> + sys.stderr.write("Cannot find pid of remote: %s\n"
> + % os.strerror(error))
> + sys.exit(1)
> + os.popen(command)
> + print("%03d: stop %s" % (step, pids[i]))
> + sys.stdout.flush()
> + step += 1
> +
> + idl.close()
> + print("%03d: done" % step)
> +
> +
> def usage():
> print("""\
> %(program_name)s: test utility for Open vSwitch database Python bindings
> @@ -861,7 +925,8 @@ def main(argv):
> "parse-table": (do_parse_table, (2, 3)),
> "parse-schema": (do_parse_schema, 1),
> "idl": (do_idl, (2,)),
> - "idl_passive": (do_idl_passive, (2,))}
> + "idl_passive": (do_idl_passive, (2,)),
> + "idl-cluster": (do_idl_cluster, (3,))}
>
> command_name = args[0]
> args = args[1:]
> --
> 2.7.4
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>
More information about the dev
mailing list