[ovs-dev] [PATCH V4] Add Passive TCP connection to IDL.
Russell Bryant
russell at ovn.org
Tue Jan 26 18:53:00 UTC 2016
On 01/25/2016 04:09 AM, ofer.benyacov at gmail.com wrote:
> From: Ofer Ben-Yacov <ofer.benyacov at gmail.com>
>
> Currently the IDL does not support passive TCP connection,
> i.e. when the OVSDB connects to its manager.
>
> This patch enables IDL to use an already-open session
> (the one which was previously used for retrieving the db schema).
> In addition, it enables IDL to go back to "listen mode" in case the connection
> is lost.
>
> LIMITATIONS:
> ----------------------
>
> This patch enables a **SINGLE** TCP connection from an OVSDB server to an
> agent that uses IDL with {IP,PORT} pair. Therefore, the agent will support
> only **ONE** OVSDB server using {IP,PORT} pair.
>
> Future development may add multi-session server capability that will allow
> an agent to use single {IP,PORT} pair to connect to multiple OVSDB servers.
>
>
> CAVEAT:
> --------------
>
> When a database first connects to the agent, the agent gets the schema and
> data and builds its tables. If the session disconnects, the agent goes back
> to "listen mode" and accepts **ANY** TCP connection, which means that if
> another database will try to connect to the agent using the same {IP,PORT}
> pair, it will be connected to the IDL that has the schema and data from
> the first database.
>
> A future patch can resolve this problem.
>
> USAGE:
> -------------
>
> To use IDL in passive mode, the following example code can be use:
>
> (snippet)
>
> from ovs.jsonrpc import Session
> ...
>
> from neutron.agent.ovsdb.native import idlutils
>
> ...
>
> session = Session.open('ptcp:192.168.10.10:6640')
>
> # first call to session.run creates the PassiveStream object and second one
> # accept incoming connection
> session.run()
> session.run()
>
> # this static method is similar to the original neutron method but the
> # rpc.close() command that would result closing the socket.
> helper = idlutils.get_schema_helper_from_stream_no_close(session.stream,
> 'hardware_vtep')
> helper.register_all()
> self.idl = idl.Idl(self.connection, helper, session)
> idlutils.wait_for_change(self.idl, self.timeout)
>
> self.poller = poller.Poller()
> self.thread = threading.Thread(target=self.run)
> self.thread.setDaemon(True)
> self.thread.start()
It's a little odd to show example usage that depends on neutron code. I
actually don't even see get_schema_helper_from_stream_no_close() in
idlutils right now. Is that a pending patch?
Maybe this usage information could be incorporated into a docstring
somewhere in idl.py? That would make it more useful to future users, as
they likely won't find the details in the commit message.
>
>
> TESTING:
> ---------------
> Added unit test for passive mode. See ovsdb-idl.at file.
>
>
> Signed-off-by: "Ofer Ben-Yacov" <ofer.benyacov at gmail.com>
>
> Tested-by: "Ofer Ben-Yacov" <ofer.benyacov at gmail.com>
>
> Requested-by: Ben Pfaff <blp at nicira.com>,
> "D M, Vikas" <vikas.d-m at hpe.com>,
> "Kamat, Maruti Haridas" <maruti.kamat at hpe.com>,
> "Sukhdev Kapur" <sukhdev at arista.com>,
> "Migliaccio, Armando" <armando.migliaccio at hpe.com>
Each email address should have it's own "Requested-by" prefix.
>
> ---
> python/ovs/db/idl.py | 18 +++++++++++++++---
> python/ovs/jsonrpc.py | 18 ++++++++++--------
> python/ovs/stream.py | 32 ++++++++++++++++++++++----------
> tests/ovsdb-idl.at | 31 +++++++++++++++++++++++++++++++
> tests/test-ovsdb.py | 49 ++++++++++++++++++++++++++++++++-----------------
> 5 files changed, 110 insertions(+), 38 deletions(-)
>
> diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> index 3187db9..8f55d0d 100644
> --- a/python/ovs/db/idl.py
> +++ b/python/ovs/db/idl.py
> @@ -85,7 +85,7 @@ class Idl(object):
> currently being constructed, if there is one, or None otherwise.
> """
>
> - def __init__(self, remote, schema):
> + def __init__(self, remote, schema, session = None):
You need this patch to get the full set of flake8 errors:
https://patchwork.ozlabs.org/patch/571788/
with that, I get the following (which includes the need to remove spaces
around '=' above.
python/ovs/db/idl.py:88:47: E251 unexpected spaces around keyword /
parameter equals
python/ovs/db/idl.py:88:49: E251 unexpected spaces around keyword /
parameter equals
> """Creates and returns a connection to the database named 'db_name' on
> 'remote', which should be in a form acceptable to
> ovs.jsonrpc.session.open(). The connection will maintain an in-memory
> @@ -103,7 +103,16 @@ class Idl(object):
> As a convenience to users, 'schema' may also be an instance of the
> SchemaHelper class.
>
> - The IDL uses and modifies 'schema' directly."""
> + The IDL uses and modifies 'schema' directly.
> +
> + In passive mode ( where the OVSDB connects to its manager ),
> + we first need to wait for the OVSDB to connect and then
saying "the OVSDB server" would read a little clearer to me than just
"the OVSDB".
> + pass the 'session' object (while the it is still open ) and
> + the schema we retrieved from the open session to the IDL to use it.
> +
> + If in active mode, do not pass 'session' and it will be created
> + by IDL by using 'remote'.
> + """
>
> assert isinstance(schema, SchemaHelper)
> schema = schema.get_idl_schema()
> @@ -111,7 +120,10 @@ class Idl(object):
> self.tables = schema.tables
> self.readonly = schema.readonly
> self._db = schema
> - self._session = ovs.jsonrpc.Session.open(remote)
> + if session:
> + self._session = session
> + else:
> + self._session = ovs.jsonrpc.Session.open(remote)
> self._monitor_request_id = None
> self._last_seqno = None
> self.change_seqno = 0
> diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> index 99aa27c..de1125b 100644
> --- a/python/ovs/jsonrpc.py
> +++ b/python/ovs/jsonrpc.py
> @@ -418,23 +418,25 @@ class Session(object):
> self.__disconnect()
>
> name = self.reconnect.get_name()
> - if not self.reconnect.is_passive():
> - error, self.stream = ovs.stream.Stream.open(name)
> + if self.reconnect.is_passive():
> + if self.pstream is not None:
> + self.pstream.close()
> + error, self.pstream = ovs.stream.PassiveStream.open(name)
> if not error:
> - self.reconnect.connecting(ovs.timeval.msec())
> + self.reconnect.listening(ovs.timeval.msec())
> else:
> self.reconnect.connect_failed(ovs.timeval.msec(), error)
> - elif self.pstream is not None:
> - error, self.pstream = ovs.stream.PassiveStream.open(name)
> + else:
> + error, self.stream = ovs.stream.Stream.open(name)
> if not error:
> - self.reconnect.listening(ovs.timeval.msec())
> + self.reconnect.connecting(ovs.timeval.msec())
> else:
> self.reconnect.connect_failed(ovs.timeval.msec(), error)
>
> self.seqno += 1
>
> def run(self):
> - if self.pstream is not None:
> + if self.pstream is not None and self.stream is None:
> error, stream = self.pstream.accept()
> if error == 0:
> if self.rpc or self.stream:
> @@ -444,11 +446,11 @@ class Session(object):
> self.__disconnect()
> self.reconnect.connected(ovs.timeval.msec())
> self.rpc = Connection(stream)
> + self.stream = stream
> elif error != errno.EAGAIN:
> self.reconnect.listen_error(ovs.timeval.msec(), error)
> self.pstream.close()
> self.pstream = None
> -
Unrelated formatting change here.
> if self.rpc:
> backlog = self.rpc.get_backlog()
> self.rpc.run()
> diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> index a555a76..46f6a8d 100644
> --- a/python/ovs/stream.py
> +++ b/python/ovs/stream.py
> @@ -278,26 +278,35 @@ class PassiveStream(object):
> def open(name):
> """Attempts to start listening for remote stream connections. 'name'
> is a connection name in the form "TYPE:ARGS", where TYPE is an passive
> - stream class's name and ARGS are stream class-specific. Currently the
> - only supported TYPE is "punix".
> + stream class's name and ARGS are stream class-specific.
How about updating this to explicitly say that it's now "punix" and "ptcp" ?
>
> Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
> new PassiveStream, on failure 'error' is a positive errno value and
> 'pstream' is None."""
> if not PassiveStream.is_valid_name(name):
> return errno.EAFNOSUPPORT, None
> -
> - bind_path = name[6:]
> + bind_path = None
> if name.startswith("punix:"):
> + bind_path = name[6:]
> bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
> - error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> - True, bind_path, None)
> - if error:
> - return error, None
> + error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> + True, bind_path,
> + None)
> + if error:
> + return error, None
> +
> + elif name.startswith("ptcp:"):
> + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
> + remote = name.split(':')
> + sock.bind((remote[1], int(remote[2])))
> +
> + else:
> + raise Exception('Unknown connection string')
>
> try:
> sock.listen(10)
> - except socket.error as e:
> + except socket.error, e:
This change is reverting a Python 3 compatibility fix. I'm sure it was
just an accident while resolving conflicts in a rebase, but it should be
restored to "except socket.error as e".
If you install flake8 as well as the hacking flake8 plugin, you will get
an error at build-time about this.
python/ovs/stream.py:309:9: H231 Python 3.x incompatible 'except x,y:'
construct
> vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
> sock.close()
> return e.error, None
> @@ -324,7 +333,10 @@ class PassiveStream(object):
> try:
> sock, addr = self.socket.accept()
> ovs.socket_util.set_nonblocking(sock)
> - return 0, Stream(sock, "unix:%s" % addr, 0)
> + if (sock.family == socket.AF_UNIX):
> + return 0, Stream(sock, "unix:%s" % addr, 0)
> + return 0, Stream(sock, 'ptcp:' + addr[0] + ':' + str(addr[1]),
This is a minor style thing, but I'd prefer:
'ptcp:%s:%s' % (addr[0], addr[1])
> + 0)
> except socket.error as e:
> error = ovs.socket_util.get_exception_errno(e)
> if error != errno.EAGAIN:
> diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
> index ebf82a5..813812e 100644
> --- a/tests/ovsdb-idl.at
> +++ b/tests/ovsdb-idl.at
> @@ -111,6 +111,37 @@ m4_define([OVSDB_CHECK_IDL],
> OVSDB_CHECK_IDL_TCP_PY($@)
> OVSDB_CHECK_IDL_TCP6_PY($@)])
>
> +
> +# This test uses the Python IDL implementation with passive tcp
> +m4_define([OVSDB_CHECK_IDL_PASSIVE_TCP_PY],
> + [AT_SETUP([$1 - Python ptcp])
> + AT_SKIP_IF([test $HAVE_PYTHON = no])
> + AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5])
> + AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
> + [0], [stdout], [ignore])
> + # find free TCP port
> + AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> + PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
> + AT_CHECK([kill `cat pid`])
> +
> + # start OVSDB server in passive mode
> + AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=tcp:127.0.0.1:$TCP_PORT --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> + AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema ptcp:127.0.0.1:$TCP_PORT $3],
> + [0], [stdout], [ignore], [kill `cat pid`])
> + AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$6],,, [[| $6]]),
> + [0], [$4], [], [kill `cat pid`])
> + AT_CLEANUP
> + ])
> +
> +
> +OVSDB_CHECK_IDL_PASSIVE_TCP_PY([simple passive idl, initially empty, select empty],
> + [],
> + [['["idltest",{"op":"select","table":"link1","where":[]}]']],
> + [[000: empty
> +001: {"error":null,"result":[{"rows":[]}]}
> +002: done
> +]])
> +
> OVSDB_CHECK_IDL([simple idl, initially empty, no ops],
> [],
> [],
Several other tests in here test with both IPv4 and IPv6.
OVSDB_CHECK_IDL_TCP_PY vs OVSDB_CHECK_IDL_TCP6_PY.
Can you update this to test with both IPv4 and IPv6? I actually don't
think IPv6 is working currently.
It would also be great if you could come up with a test that ran against
both the C and Python implementations, like all the tests that call
OVSDB_CHECK_IDL. That may require some work in the C code though, and
if you're not comfortable with that, don't worry about it. You could
leave it as a TODO in here for now.
> diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
> index 4690722..ac28d76 100644
> --- a/tests/test-ovsdb.py
> +++ b/tests/test-ovsdb.py
> @@ -406,17 +406,31 @@ def do_idl(schema_file, remote, *commands):
> commands = commands[1:]
> else:
> schema_helper.register_all()
> - idl = ovs.db.idl.Idl(remote, schema_helper)
>
> - if commands:
> - error, stream = ovs.stream.Stream.open_block(
> - ovs.stream.Stream.open(remote))
> - if error:
> - sys.stderr.write("failed to connect to \"%s\"" % remote)
> - sys.exit(1)
> - rpc = ovs.jsonrpc.Connection(stream)
> + if remote.startswith('ptcp'):
> + passive = True
> + else:
> + passive = False
minor simplification:
passive = remove.startswith('ptcp:')
> +
> + if passive:
> + session = ovs.jsonrpc.Session.open(remote)
> + session.run()
> + session.run()
A comment would be helpful in the code here to explain why you're
calling session.run() twice here. Otherwise it looks like an accident.
> +
> + rpc = session.rpc
> + idl = ovs.db.idl.Idl(remote, schema_helper, session)
> else:
> - rpc = None
> + idl = ovs.db.idl.Idl(remote, schema_helper)
> +
> + if commands:
> + error, stream = ovs.stream.Stream.open_block(
> + ovs.stream.Stream.open(remote))
> + if error:
> + sys.stderr.write("failed to connect to \"%s\"" % remote)
> + sys.exit(1)
> + rpc = ovs.jsonrpc.Connection(stream)
> + else:
> + rpc = None
>
> symtab = {}
> seqno = 0
> @@ -474,14 +488,15 @@ def do_idl(schema_file, remote, *commands):
> sys.stdout.write("%s\n" % ovs.json.to_string(reply.to_json()))
> sys.stdout.flush()
>
> - if rpc:
> - rpc.close()
> - while idl.change_seqno == seqno and not idl.run():
> - poller = ovs.poller.Poller()
> - idl.wait(poller)
> - poller.block()
> - print_idl(idl, step)
> - step += 1
> + if not passive:
> + if rpc:
> + rpc.close()
> + while idl.change_seqno == seqno and not idl.run():
> + poller = ovs.poller.Poller()
> + idl.wait(poller)
> + poller.block()
> + print_idl(idl, step)
> + step += 1
> idl.close()
> print("%03d: done" % step)
>
>
--
Russell Bryant
More information about the dev
mailing list