[ovs-dev] [PATCH V4] Add Passive TCP connection to IDL.

Russell Bryant russell at ovn.org
Tue Jan 26 18:53:00 UTC 2016


On 01/25/2016 04:09 AM, ofer.benyacov at gmail.com wrote:
> From: Ofer Ben-Yacov <ofer.benyacov at gmail.com>
> 
> Currently the IDL does not support passive TCP connection,
> i.e. when the OVSDB connects to its manager.
> 
> This patch enables IDL to use an already-open session
> (the one which was previously used for retrieving the db schema).
> In addition, it enables IDL to go back to "listen mode" in case the connection
> is lost.
> 
> LIMITATIONS:
> ----------------------
> 
> This patch enables a **SINGLE** TCP connection from an OVSDB server to an
> agent that uses IDL with {IP,PORT} pair. Therefore, the agent will support
> only **ONE** OVSDB server using {IP,PORT} pair.
> 
> Future development may add multi-session server capability that will allow
> an agent to use single {IP,PORT} pair to connect to multiple OVSDB servers.
> 
> 
> CAVEAT:
> --------------
> 
> When a database first connects to the agent, the agent gets the schema and
> data and builds its tables. If the session disconnects, the agent goes back
> to "listen mode" and accepts **ANY** TCP connection, which means that if
> another database will try to connect to the agent using the same {IP,PORT}
> pair, it will be connected to the IDL that has the schema and data from
> the first database.
> 
> A future patch can resolve this problem.
> 
> USAGE:
> -------------
> 
> To use IDL in passive mode, the following example code can be use:
> 
> (snippet)
> 
> from ovs.jsonrpc import Session
> ...
> 
> from neutron.agent.ovsdb.native import idlutils
> 
> ...
> 
> session = Session.open('ptcp:192.168.10.10:6640')
> 
> # first call to session.run creates the PassiveStream object and second one
> # accept incoming connection
> session.run()
> session.run()
> 
> # this static method is similar to the original neutron method but the
> # rpc.close() command that would result closing the socket.
> helper = idlutils.get_schema_helper_from_stream_no_close(session.stream,
>         'hardware_vtep')
> helper.register_all()
> self.idl = idl.Idl(self.connection, helper, session)
> idlutils.wait_for_change(self.idl, self.timeout)
> 
> self.poller = poller.Poller()
> self.thread = threading.Thread(target=self.run)
> self.thread.setDaemon(True)
> self.thread.start()

It's a little odd to show example usage that depends on neutron code.  I
actually don't even see get_schema_helper_from_stream_no_close() in
idlutils right now.  Is that a pending patch?

Maybe this usage information could be incorporated into a docstring
somewhere in idl.py?  That would make it more useful to future users, as
they likely won't find the details in the commit message.

> 
> 
> TESTING:
> ---------------
> Added unit test for passive mode. See ovsdb-idl.at file.
> 
> 
> Signed-off-by: "Ofer Ben-Yacov" <ofer.benyacov at gmail.com>
> 
> Tested-by: "Ofer Ben-Yacov" <ofer.benyacov at gmail.com>
> 
> Requested-by: Ben Pfaff <blp at nicira.com>, 
>         "D M, Vikas" <vikas.d-m at hpe.com>,
>         "Kamat, Maruti Haridas" <maruti.kamat at hpe.com>,
>         "Sukhdev Kapur" <sukhdev at arista.com>,
>         "Migliaccio, Armando" <armando.migliaccio at hpe.com>

Each email address should have it's own "Requested-by" prefix.

> 
> ---
>  python/ovs/db/idl.py  | 18 +++++++++++++++---
>  python/ovs/jsonrpc.py | 18 ++++++++++--------
>  python/ovs/stream.py  | 32 ++++++++++++++++++++++----------
>  tests/ovsdb-idl.at    | 31 +++++++++++++++++++++++++++++++
>  tests/test-ovsdb.py   | 49 ++++++++++++++++++++++++++++++++-----------------
>  5 files changed, 110 insertions(+), 38 deletions(-)
> 
> diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> index 3187db9..8f55d0d 100644
> --- a/python/ovs/db/idl.py
> +++ b/python/ovs/db/idl.py
> @@ -85,7 +85,7 @@ class Idl(object):
>        currently being constructed, if there is one, or None otherwise.
>  """
>  
> -    def __init__(self, remote, schema):
> +    def __init__(self, remote, schema, session = None):

You need this patch to get the full set of flake8 errors:

https://patchwork.ozlabs.org/patch/571788/

with that, I get the following (which includes the need to remove spaces
around '=' above.

python/ovs/db/idl.py:88:47: E251 unexpected spaces around keyword /
parameter equals
python/ovs/db/idl.py:88:49: E251 unexpected spaces around keyword /
parameter equals

>          """Creates and returns a connection to the database named 'db_name' on
>          'remote', which should be in a form acceptable to
>          ovs.jsonrpc.session.open().  The connection will maintain an in-memory
> @@ -103,7 +103,16 @@ class Idl(object):
>          As a convenience to users, 'schema' may also be an instance of the
>          SchemaHelper class.
>  
> -        The IDL uses and modifies 'schema' directly."""
> +        The IDL uses and modifies 'schema' directly.
> +
> +        In passive mode ( where the OVSDB connects to its manager ),
> +        we first need to wait for the OVSDB to connect and then

saying "the OVSDB server" would read a little clearer to me than just
"the OVSDB".

> +        pass the 'session' object (while the it is still open ) and
> +        the schema we retrieved from the open session to the IDL to use it.
> +
> +        If in active mode, do not pass 'session' and it will be created
> +        by IDL by using 'remote'.
> +        """
>  
>          assert isinstance(schema, SchemaHelper)
>          schema = schema.get_idl_schema()
> @@ -111,7 +120,10 @@ class Idl(object):
>          self.tables = schema.tables
>          self.readonly = schema.readonly
>          self._db = schema
> -        self._session = ovs.jsonrpc.Session.open(remote)
> +        if session:
> +            self._session = session
> +        else:
> +            self._session = ovs.jsonrpc.Session.open(remote)
>          self._monitor_request_id = None
>          self._last_seqno = None
>          self.change_seqno = 0
> diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> index 99aa27c..de1125b 100644
> --- a/python/ovs/jsonrpc.py
> +++ b/python/ovs/jsonrpc.py
> @@ -418,23 +418,25 @@ class Session(object):
>          self.__disconnect()
>  
>          name = self.reconnect.get_name()
> -        if not self.reconnect.is_passive():
> -            error, self.stream = ovs.stream.Stream.open(name)
> +        if self.reconnect.is_passive():
> +            if self.pstream is not None:
> +                self.pstream.close()
> +            error, self.pstream = ovs.stream.PassiveStream.open(name)
>              if not error:
> -                self.reconnect.connecting(ovs.timeval.msec())
> +                self.reconnect.listening(ovs.timeval.msec())
>              else:
>                  self.reconnect.connect_failed(ovs.timeval.msec(), error)
> -        elif self.pstream is not None:
> -            error, self.pstream = ovs.stream.PassiveStream.open(name)
> +        else:
> +            error, self.stream = ovs.stream.Stream.open(name)
>              if not error:
> -                self.reconnect.listening(ovs.timeval.msec())
> +                self.reconnect.connecting(ovs.timeval.msec())
>              else:
>                  self.reconnect.connect_failed(ovs.timeval.msec(), error)
>  
>          self.seqno += 1
>  
>      def run(self):
> -        if self.pstream is not None:
> +        if self.pstream is not None and self.stream is None:
>              error, stream = self.pstream.accept()
>              if error == 0:
>                  if self.rpc or self.stream:
> @@ -444,11 +446,11 @@ class Session(object):
>                      self.__disconnect()
>                  self.reconnect.connected(ovs.timeval.msec())
>                  self.rpc = Connection(stream)
> +                self.stream = stream
>              elif error != errno.EAGAIN:
>                  self.reconnect.listen_error(ovs.timeval.msec(), error)
>                  self.pstream.close()
>                  self.pstream = None
> -

Unrelated formatting change here.

>          if self.rpc:
>              backlog = self.rpc.get_backlog()
>              self.rpc.run()
> diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> index a555a76..46f6a8d 100644
> --- a/python/ovs/stream.py
> +++ b/python/ovs/stream.py
> @@ -278,26 +278,35 @@ class PassiveStream(object):
>      def open(name):
>          """Attempts to start listening for remote stream connections.  'name'
>          is a connection name in the form "TYPE:ARGS", where TYPE is an passive
> -        stream class's name and ARGS are stream class-specific.  Currently the
> -        only supported TYPE is "punix".
> +        stream class's name and ARGS are stream class-specific.

How about updating this to explicitly say that it's now "punix" and "ptcp" ?

>  
>          Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
>          new PassiveStream, on failure 'error' is a positive errno value and
>          'pstream' is None."""
>          if not PassiveStream.is_valid_name(name):
>              return errno.EAFNOSUPPORT, None
> -
> -        bind_path = name[6:]
> +        bind_path = None
>          if name.startswith("punix:"):
> +            bind_path = name[6:]
>              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
> -        error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> -                                                       True, bind_path, None)
> -        if error:
> -            return error, None
> +            error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> +                                                           True, bind_path,
> +                                                           None)
> +            if error:
> +                return error, None
> +
> +        elif name.startswith("ptcp:"):
> +            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> +            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
> +            remote = name.split(':')
> +            sock.bind((remote[1], int(remote[2])))
> +
> +        else:
> +            raise Exception('Unknown connection string')
>  
>          try:
>              sock.listen(10)
> -        except socket.error as e:
> +        except socket.error, e:

This change is reverting a Python 3 compatibility fix.  I'm sure it was
just an accident while resolving conflicts in a rebase, but it should be
restored to "except socket.error as e".

If you install flake8 as well as the hacking flake8 plugin, you will get
an error at build-time about this.

python/ovs/stream.py:309:9: H231  Python 3.x incompatible 'except x,y:'
construct


>              vlog.err("%s: listen: %s" % (name, os.strerror(e.error)))
>              sock.close()
>              return e.error, None
> @@ -324,7 +333,10 @@ class PassiveStream(object):
>              try:
>                  sock, addr = self.socket.accept()
>                  ovs.socket_util.set_nonblocking(sock)
> -                return 0, Stream(sock, "unix:%s" % addr, 0)
> +                if (sock.family == socket.AF_UNIX):
> +                    return 0, Stream(sock, "unix:%s" % addr, 0)
> +                return 0, Stream(sock, 'ptcp:' + addr[0] + ':' + str(addr[1]),

This is a minor style thing, but I'd prefer:

    'ptcp:%s:%s' % (addr[0], addr[1])

> +                                 0)
>              except socket.error as e:
>                  error = ovs.socket_util.get_exception_errno(e)
>                  if error != errno.EAGAIN:
> diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
> index ebf82a5..813812e 100644
> --- a/tests/ovsdb-idl.at
> +++ b/tests/ovsdb-idl.at
> @@ -111,6 +111,37 @@ m4_define([OVSDB_CHECK_IDL],
>     OVSDB_CHECK_IDL_TCP_PY($@)
>     OVSDB_CHECK_IDL_TCP6_PY($@)])
>  
> +
> +# This test uses the Python IDL implementation with passive tcp
> +m4_define([OVSDB_CHECK_IDL_PASSIVE_TCP_PY],
> +  [AT_SETUP([$1 - Python ptcp])
> +   AT_SKIP_IF([test $HAVE_PYTHON = no])
> +   AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5])
> +   AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
> +                  [0], [stdout], [ignore])
> +   # find free TCP port
> +   AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> +   PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
> +   AT_CHECK([kill `cat pid`])
> +
> +   # start OVSDB server in passive mode
> +   AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=tcp:127.0.0.1:$TCP_PORT --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> +   AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema ptcp:127.0.0.1:$TCP_PORT $3],
> +      [0], [stdout], [ignore], [kill `cat pid`])
> +   AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$6],,, [[| $6]]),
> +            [0], [$4], [], [kill `cat pid`])
> +   AT_CLEANUP
> +   ])
> +
> +
> +OVSDB_CHECK_IDL_PASSIVE_TCP_PY([simple passive idl, initially empty, select empty],
> +  [],
> +  [['["idltest",{"op":"select","table":"link1","where":[]}]']],
> +  [[000: empty
> +001: {"error":null,"result":[{"rows":[]}]}
> +002: done
> +]])
> +
>  OVSDB_CHECK_IDL([simple idl, initially empty, no ops],
>    [],
>    [],

Several other tests in here test with both IPv4 and IPv6.
OVSDB_CHECK_IDL_TCP_PY vs OVSDB_CHECK_IDL_TCP6_PY.

Can you update this to test with both IPv4 and IPv6?  I actually don't
think IPv6 is working currently.

It would also be great if you could come up with a test that ran against
both the C and Python implementations, like all the tests that call
OVSDB_CHECK_IDL.  That may require some work in the C code though, and
if you're not comfortable with that, don't worry about it.  You could
leave it as a TODO in here for now.

> diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
> index 4690722..ac28d76 100644
> --- a/tests/test-ovsdb.py
> +++ b/tests/test-ovsdb.py
> @@ -406,17 +406,31 @@ def do_idl(schema_file, remote, *commands):
>          commands = commands[1:]
>      else:
>          schema_helper.register_all()
> -    idl = ovs.db.idl.Idl(remote, schema_helper)
>  
> -    if commands:
> -        error, stream = ovs.stream.Stream.open_block(
> -            ovs.stream.Stream.open(remote))
> -        if error:
> -            sys.stderr.write("failed to connect to \"%s\"" % remote)
> -            sys.exit(1)
> -        rpc = ovs.jsonrpc.Connection(stream)
> +    if remote.startswith('ptcp'):
> +        passive = True
> +    else:
> +        passive = False

minor simplification:

    passive = remove.startswith('ptcp:')

> +
> +    if passive:
> +        session = ovs.jsonrpc.Session.open(remote)
> +        session.run()
> +        session.run()

A comment would be helpful in the code here to explain why you're
calling session.run() twice here.  Otherwise it looks like an accident.

> +
> +        rpc = session.rpc
> +        idl = ovs.db.idl.Idl(remote, schema_helper, session)
>      else:
> -        rpc = None
> +        idl = ovs.db.idl.Idl(remote, schema_helper)
> +
> +        if commands:
> +            error, stream = ovs.stream.Stream.open_block(
> +                ovs.stream.Stream.open(remote))
> +            if error:
> +                sys.stderr.write("failed to connect to \"%s\"" % remote)
> +                sys.exit(1)
> +            rpc = ovs.jsonrpc.Connection(stream)
> +        else:
> +            rpc = None
>  
>      symtab = {}
>      seqno = 0
> @@ -474,14 +488,15 @@ def do_idl(schema_file, remote, *commands):
>              sys.stdout.write("%s\n" % ovs.json.to_string(reply.to_json()))
>              sys.stdout.flush()
>  
> -    if rpc:
> -        rpc.close()
> -    while idl.change_seqno == seqno and not idl.run():
> -        poller = ovs.poller.Poller()
> -        idl.wait(poller)
> -        poller.block()
> -    print_idl(idl, step)
> -    step += 1
> +    if not passive:
> +        if rpc:
> +            rpc.close()
> +        while idl.change_seqno == seqno and not idl.run():
> +            poller = ovs.poller.Poller()
> +            idl.wait(poller)
> +            poller.block()
> +        print_idl(idl, step)
> +        step += 1
>      idl.close()
>      print("%03d: done" % step)
>  
> 


-- 
Russell Bryant



More information about the dev mailing list