[ovs-dev] [PATCH V6] Add Passive TCP connection to IDL.

Ben Pfaff blp at ovn.org
Wed Feb 24 00:48:25 UTC 2016


I spent some time looking this over, just now, and I'm a bit confused.

It's clear that the python implementation of PassiveStream did not
support TCP.  Great, the patch adds that.  If the patch just added that,
I'd be happy, I'd just commit it.
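
For reference, the listening side of a "ptcp:" stream amounts to roughly
the following. This is a minimal standalone sketch using only the standard
socket module, not the ovs.stream code. The helper names open_passive_tcp
and accept_nonblocking are hypothetical, the "ptcp:IP:PORT" ordering follows
the usage example in the commit message, and returning bind failures as
errno values is an assumption about the (error, pstream) contract.

```python
import errno
import socket


def open_passive_tcp(name):
    """Parse 'ptcp:IP:PORT' and start listening (hypothetical helper).

    Returns (error, sock): error is 0 on success; otherwise error is a
    positive errno value and sock is None.
    """
    parts = name.split(":")
    if len(parts) != 3 or parts[0] != "ptcp":
        return errno.EAFNOSUPPORT, None
    try:
        port = int(parts[2])
    except ValueError:
        return errno.EINVAL, None

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((parts[1], port))
        sock.listen(10)
        sock.setblocking(False)
    except OSError as e:
        sock.close()
        # Resolver errors can carry non-positive codes; map those to EINVAL.
        code = e.errno if e.errno and e.errno > 0 else errno.EINVAL
        return code, None
    return 0, sock


def accept_nonblocking(listener):
    """Returns (error, conn); error is errno.EAGAIN if no peer is waiting."""
    try:
        conn, _addr = listener.accept()
    except BlockingIOError:
        return errno.EAGAIN, None
    except OSError as e:
        return e.errno or errno.EINVAL, None
    conn.setblocking(False)
    return 0, conn
```

A caller would poll accept_nonblocking() from its main loop and treat
EAGAIN as "nobody has connected yet", which is how the two session.run()
calls in the usage example behave.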

What I don't understand is why other changes are needed to support
passive streams in the IDL.  The python implementation of
jsonrpc.Session seems to support passive connections just as well as
active ones; the code is basically the same as the C implementation.  So
why are other changes needed?

The CAVEAT in the commit message worries me too.  Why doesn't the IDL
work just the same in this regard for active and passive connections?
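
To make the concern concrete, here is a toy model of the behavior the
CAVEAT describes. It is only a sketch under stated assumptions: the class
and method names are hypothetical, it does no I/O, and it models nothing
beyond the fact that the cached schema and data survive a return to
"listen mode".

```python
class PassiveIdlModel:
    """Toy model of a single-session passive listener with a cached database."""

    def __init__(self):
        self.peer = None         # database currently connected, if any
        self.schema_from = None  # database whose schema/data filled the cache

    def accept(self, peer):
        """Accept 'peer' unless a session is already active."""
        if self.peer is not None:
            return False  # single session: not accepted while connected
        self.peer = peer
        if self.schema_from is None:
            self.schema_from = peer  # first connection populates the cache
        return True

    def disconnect(self):
        """Go back to listen mode; the cache is deliberately left in place."""
        self.peer = None

    def cache_is_stale(self):
        """True if the connected peer is not the one the cache came from."""
        return self.peer is not None and self.peer != self.schema_from
```

With this model, connecting "db-A", disconnecting, and then connecting
"db-B" leaves cache_is_stale() returning True, which is the situation the
CAVEAT warns about.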

Thanks,

Ben.

On Wed, Feb 17, 2016 at 05:22:19PM +0200, Ofer Ben Yacov wrote:
> From: Ofer Ben-Yacov <ofer.benyacov at gmail.com>
> 
> Currently the IDL does not support passive TCP connections,
> i.e. the case where the OVSDB server connects to its manager.
> 
> This patch enables IDL to use an already-open session
> (the one which was previously used for retrieving the db schema).
> In addition, it enables IDL to go back to "listen mode" in case the connection
> is lost.
> 
> LIMITATIONS:
> ----------------------
> 
> This patch enables a **SINGLE** TCP connection from an OVSDB server to an
> agent that uses IDL with {IP,PORT} pair. Therefore, the agent will support
> only **ONE** OVSDB server using {IP,PORT} pair.
> 
> Future development may add multi-session server capability that will allow
> an agent to use a single {IP,PORT} pair to connect to multiple OVSDB servers.
> 
> 
> CAVEAT:
> --------------
> 
> When a database first connects to the agent, the agent gets the schema and
> data and builds its tables. If the session disconnects, the agent goes back
> to "listen mode" and accepts **ANY** TCP connection, which means that if
> another database tries to connect to the agent using the same {IP,PORT}
> pair, it will be connected to the IDL that has the schema and data from
> the first database.
> 
> A future patch can resolve this problem.
> 
> USAGE:
> -------------
> 
> To use IDL in passive mode, the following example code can be used:
> 
> (snippet)
> 
> from ovs.jsonrpc import Session
> ...
> 
> from neutron.agent.ovsdb.native import idlutils
> 
> ...
> 
> session = Session.open('ptcp:192.168.10.10:6640')
> 
> # the first call to session.run() creates the PassiveStream object and the
> # second one accepts the incoming connection
> session.run()
> session.run()
> 
> # this static method is similar to the original neutron method, but it omits
> # the rpc.close() call, which would close the socket.
> helper = idlutils.get_schema_helper_from_stream_no_close(session.stream,
>         'hardware_vtep')
> helper.register_all()
> self.idl = idl.Idl(self.connection, helper, session)
> idlutils.wait_for_change(self.idl, self.timeout)
> 
> self.poller = poller.Poller()
> self.thread = threading.Thread(target=self.run)
> self.thread.setDaemon(True)
> self.thread.start()
> 
> 
> TESTING:
> ---------------
> Added unit test for passive mode. See ovsdb-idl.at file.
> 
> TODO
> ----
> Test this patch against C implementation
> 
> 
> Signed-off-by: "Ofer Ben-Yacov" <ofer.benyacov at gmail.com>
> 
> Tested-by: "Ofer Ben-Yacov" <ofer.benyacov at gmail.com>
> 
> Requested-by: Ben Pfaff <blp at nicira.com>
> Requested-by: "D M, Vikas" <vikas.d-m at hpe.com>
> Requested-by: "Kamat, Maruti Haridas" <maruti.kamat at hpe.com>
> Requested-by: "Sukhdev Kapur" <sukhdev at arista.com>
> Requested-by: "Migliaccio, Armando" <armando.migliaccio at hpe.com>
> 
> ---
>  python/ovs/db/idl.py  | 18 +++++++++++++++---
>  python/ovs/jsonrpc.py | 18 ++++++++++--------
>  python/ovs/stream.py  | 37 +++++++++++++++++++++++++------------
>  tests/ovsdb-idl.at    | 31 +++++++++++++++++++++++++++++++
>  tests/test-ovsdb.py   | 47 ++++++++++++++++++++++++++++++-----------------
>  5 files changed, 111 insertions(+), 40 deletions(-)
> 
> diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
> index e69d35e..24e9f11 100644
> --- a/python/ovs/db/idl.py
> +++ b/python/ovs/db/idl.py
> @@ -86,7 +86,7 @@ class Idl(object):
>        currently being constructed, if there is one, or None otherwise.
>  """
>  
> -    def __init__(self, remote, schema):
> +    def __init__(self, remote, schema, session=None):
>          """Creates and returns a connection to the database named 'db_name' on
>          'remote', which should be in a form acceptable to
>          ovs.jsonrpc.session.open().  The connection will maintain an in-memory
> @@ -104,7 +104,16 @@ class Idl(object):
>          As a convenience to users, 'schema' may also be an instance of the
>          SchemaHelper class.
>  
> -        The IDL uses and modifies 'schema' directly."""
> +        The IDL uses and modifies 'schema' directly.
> +
> +        In passive mode (where the OVSDB server connects to its manager),
> +        we first need to wait for the OVSDB server to connect and then
> +        pass the 'session' object (while it is still open) and
> +        the schema we retrieved from the open session to the IDL to use it.
> +
> +        If in active mode, do not pass 'session' and it will be created
> +        by IDL by using 'remote'.
> +        """
>  
>          assert isinstance(schema, SchemaHelper)
>          schema = schema.get_idl_schema()
> @@ -112,7 +121,10 @@ class Idl(object):
>          self.tables = schema.tables
>          self.readonly = schema.readonly
>          self._db = schema
> -        self._session = ovs.jsonrpc.Session.open(remote)
> +        if session:
> +            self._session = session
> +        else:
> +            self._session = ovs.jsonrpc.Session.open(remote)
>          self._monitor_request_id = None
>          self._last_seqno = None
>          self.change_seqno = 0
> diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> index e3ef6db..8f63a75 100644
> --- a/python/ovs/jsonrpc.py
> +++ b/python/ovs/jsonrpc.py
> @@ -429,23 +429,25 @@ class Session(object):
>          self.__disconnect()
>  
>          name = self.reconnect.get_name()
> -        if not self.reconnect.is_passive():
> -            error, self.stream = ovs.stream.Stream.open(name)
> +        if self.reconnect.is_passive():
> +            if self.pstream is not None:
> +                self.pstream.close()
> +            error, self.pstream = ovs.stream.PassiveStream.open(name)
>              if not error:
> -                self.reconnect.connecting(ovs.timeval.msec())
> +                self.reconnect.listening(ovs.timeval.msec())
>              else:
>                  self.reconnect.connect_failed(ovs.timeval.msec(), error)
> -        elif self.pstream is not None:
> -            error, self.pstream = ovs.stream.PassiveStream.open(name)
> +        else:
> +            error, self.stream = ovs.stream.Stream.open(name)
>              if not error:
> -                self.reconnect.listening(ovs.timeval.msec())
> +                self.reconnect.connecting(ovs.timeval.msec())
>              else:
>                  self.reconnect.connect_failed(ovs.timeval.msec(), error)
>  
>          self.seqno += 1
>  
>      def run(self):
> -        if self.pstream is not None:
> +        if self.pstream is not None and self.stream is None:
>              error, stream = self.pstream.accept()
>              if error == 0:
>                  if self.rpc or self.stream:
> @@ -455,11 +457,11 @@ class Session(object):
>                      self.__disconnect()
>                  self.reconnect.connected(ovs.timeval.msec())
>                  self.rpc = Connection(stream)
> +                self.stream = stream
>              elif error != errno.EAGAIN:
>                  self.reconnect.listen_error(ovs.timeval.msec(), error)
>                  self.pstream.close()
>                  self.pstream = None
> -
>          if self.rpc:
>              backlog = self.rpc.get_backlog()
>              self.rpc.run()
> diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> index bc14836..656ba74 100644
> --- a/python/ovs/stream.py
> +++ b/python/ovs/stream.py
> @@ -271,9 +271,9 @@ class PassiveStream(object):
>      @staticmethod
>      def is_valid_name(name):
>          """Returns True if 'name' is a passive stream name in the form
> -        "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
> -        "punix:"), otherwise False."""
> -        return name.startswith("punix:")
> +        "TYPE:ARGS" and TYPE is a supported passive stream type (currently
> +        "punix:" or "ptcp:"), otherwise False."""
> +        return name.startswith("punix:") or name.startswith("ptcp:")
>  
>      def __init__(self, sock, name, bind_path):
>          self.name = name
> @@ -284,22 +284,32 @@ class PassiveStream(object):
>      def open(name):
>          """Attempts to start listening for remote stream connections.  'name'
>          is a connection name in the form "TYPE:ARGS", where TYPE is an passive
> -        stream class's name and ARGS are stream class-specific.  Currently the
> -        only supported TYPE is "punix".
> +        stream class's name and ARGS are stream class-specific. Currently the
> +        supported values for TYPE are "punix" and "ptcp".
>  
>          Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
>          new PassiveStream, on failure 'error' is a positive errno value and
>          'pstream' is None."""
>          if not PassiveStream.is_valid_name(name):
>              return errno.EAFNOSUPPORT, None
> -
> -        bind_path = name[6:]
> +        bind_path = None
>          if name.startswith("punix:"):
> +            bind_path = name[6:]
>              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
> -        error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> -                                                       True, bind_path, None)
> -        if error:
> -            return error, None
> +            error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> +                                                           True, bind_path,
> +                                                           None)
> +            if error:
> +                return error, None
> +
> +        elif name.startswith("ptcp:"):
> +            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> +            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
> +            remote = name.split(':')
> +            sock.bind((remote[1], int(remote[2])))
> +
> +        else:
> +            raise Exception('Unknown connection string')
>  
>          try:
>              sock.listen(10)
> @@ -330,7 +340,10 @@ class PassiveStream(object):
>              try:
>                  sock, addr = self.socket.accept()
>                  ovs.socket_util.set_nonblocking(sock)
> -                return 0, Stream(sock, "unix:%s" % addr, 0)
> +                if (sock.family == socket.AF_UNIX):
> +                    return 0, Stream(sock, "unix:%s" % addr, 0)
> +                return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
> +                                                       str(addr[1])), 0)
>              except socket.error as e:
>                  error = ovs.socket_util.get_exception_errno(e)
>                  if error != errno.EAGAIN:
> diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
> index ebf82a5..813812e 100644
> --- a/tests/ovsdb-idl.at
> +++ b/tests/ovsdb-idl.at
> @@ -111,6 +111,37 @@ m4_define([OVSDB_CHECK_IDL],
>     OVSDB_CHECK_IDL_TCP_PY($@)
>     OVSDB_CHECK_IDL_TCP6_PY($@)])
>  
> +
> +# This test uses the Python IDL implementation with passive tcp
> +m4_define([OVSDB_CHECK_IDL_PASSIVE_TCP_PY],
> +  [AT_SETUP([$1 - Python ptcp])
> +   AT_SKIP_IF([test $HAVE_PYTHON = no])
> +   AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5])
> +   AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
> +                  [0], [stdout], [ignore])
> +   # find free TCP port
> +   AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> +   PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
> +   AT_CHECK([kill `cat pid`])
> +
> +   # start OVSDB server in passive mode
> +   AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=tcp:127.0.0.1:$TCP_PORT --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
> +   AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema ptcp:127.0.0.1:$TCP_PORT $3],
> +      [0], [stdout], [ignore], [kill `cat pid`])
> +   AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$6],,, [[| $6]]),
> +            [0], [$4], [], [kill `cat pid`])
> +   AT_CLEANUP
> +   ])
> +
> +
> +OVSDB_CHECK_IDL_PASSIVE_TCP_PY([simple passive idl, initially empty, select empty],
> +  [],
> +  [['["idltest",{"op":"select","table":"link1","where":[]}]']],
> +  [[000: empty
> +001: {"error":null,"result":[{"rows":[]}]}
> +002: done
> +]])
> +
>  OVSDB_CHECK_IDL([simple idl, initially empty, no ops],
>    [],
>    [],
> diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
> index 73c3048..c28ed6b 100644
> --- a/tests/test-ovsdb.py
> +++ b/tests/test-ovsdb.py
> @@ -407,17 +407,29 @@ def do_idl(schema_file, remote, *commands):
>          commands = commands[1:]
>      else:
>          schema_helper.register_all()
> -    idl = ovs.db.idl.Idl(remote, schema_helper)
>  
> -    if commands:
> -        error, stream = ovs.stream.Stream.open_block(
> -            ovs.stream.Stream.open(remote))
> -        if error:
> -            sys.stderr.write("failed to connect to \"%s\"" % remote)
> -            sys.exit(1)
> -        rpc = ovs.jsonrpc.Connection(stream)
> +    passive = remote.startswith('ptcp')
> +    if passive:
> +        session = ovs.jsonrpc.Session.open(remote)
> +        # the first call to session.run() creates the PassiveStream object
> +        # and the second one accepts the incoming connection
> +        session.run()
> +        session.run()
> +
> +        rpc = session.rpc
> +        idl = ovs.db.idl.Idl(remote, schema_helper, session)
>      else:
> -        rpc = None
> +        idl = ovs.db.idl.Idl(remote, schema_helper)
> +
> +        if commands:
> +            error, stream = ovs.stream.Stream.open_block(
> +                ovs.stream.Stream.open(remote))
> +            if error:
> +                sys.stderr.write("failed to connect to \"%s\"" % remote)
> +                sys.exit(1)
> +            rpc = ovs.jsonrpc.Connection(stream)
> +        else:
> +            rpc = None
>  
>      symtab = {}
>      seqno = 0
> @@ -475,14 +487,15 @@ def do_idl(schema_file, remote, *commands):
>              sys.stdout.write("%s\n" % ovs.json.to_string(reply.to_json()))
>              sys.stdout.flush()
>  
> -    if rpc:
> -        rpc.close()
> -    while idl.change_seqno == seqno and not idl.run():
> -        poller = ovs.poller.Poller()
> -        idl.wait(poller)
> -        poller.block()
> -    print_idl(idl, step)
> -    step += 1
> +    if not passive:
> +        if rpc:
> +            rpc.close()
> +        while idl.change_seqno == seqno and not idl.run():
> +            poller = ovs.poller.Poller()
> +            idl.wait(poller)
> +            poller.block()
> +        print_idl(idl, step)
> +        step += 1
>      idl.close()
>      print("%03d: done" % step)
>  
> -- 
> 2.1.4
> 


