[ovs-dev] [PATCH V3] Add Passive TCP connection to IDL

Ofer Ben Yacov ofer.benyacov at gmail.com
Mon Dec 28 08:57:37 UTC 2015


Add unit test for passive mode.

---
 python/ovs/db/idl.py  | 18 +++++++++++++++---
 python/ovs/jsonrpc.py | 19 +++++++++++--------
 python/ovs/stream.py  | 47 +++++++++++++++++++++++++++++++----------------
 tests/ovsdb-idl.at    | 31 +++++++++++++++++++++++++++++++
 tests/test-ovsdb.py   | 49 ++++++++++++++++++++++++++++++++-----------------
 5 files changed, 120 insertions(+), 44 deletions(-)

diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index c8990c7..4b492fe 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -83,7 +83,7 @@ class Idl(object):
       currently being constructed, if there is one, or None otherwise.
 """
 
-    def __init__(self, remote, schema):
+    def __init__(self, remote, schema, session = None):
         """Creates and returns a connection to the database named 'db_name' on
         'remote', which should be in a form acceptable to
         ovs.jsonrpc.session.open().  The connection will maintain an in-memory
@@ -101,7 +101,16 @@ class Idl(object):
         As a convenience to users, 'schema' may also be an instance of the
         SchemaHelper class.
 
-        The IDL uses and modifies 'schema' directly."""
+        The IDL uses and modifies 'schema' directly.
+
+        In passive mode ( where the OVSDB connects to its manager ),
+        we first need to wait for the OVSDB to connect and then
+        pass the 'session' object (while the it is still open ) and
+        the schema we retrieved from the open session to the IDL to use it.
+
+        If in active mode, do not pass 'session' and it will be created
+        by IDL by using 'remote'.
+        """
 
         assert isinstance(schema, SchemaHelper)
         schema = schema.get_idl_schema()
@@ -109,7 +118,10 @@ class Idl(object):
         self.tables = schema.tables
         self.readonly = schema.readonly
         self._db = schema
-        self._session = ovs.jsonrpc.Session.open(remote)
+        if session:
+            self._session = session
+        else:
+            self._session = ovs.jsonrpc.Session.open(remote)
         self._monitor_request_id = None
         self._last_seqno = None
         self.change_seqno = 0
diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
index d54d74b..1b68d3f 100644
--- a/python/ovs/jsonrpc.py
+++ b/python/ovs/jsonrpc.py
@@ -418,23 +418,25 @@ class Session(object):
         self.__disconnect()
 
         name = self.reconnect.get_name()
-        if not self.reconnect.is_passive():
-            error, self.stream = ovs.stream.Stream.open(name)
+        if self.reconnect.is_passive():
+            if self.pstream is not None:
+                self.pstream.close()
+            error, self.pstream = ovs.stream.PassiveStream.open(name)
             if not error:
-                self.reconnect.connecting(ovs.timeval.msec())
+                self.reconnect.listening(ovs.timeval.msec())
             else:
                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
-        elif self.pstream is not None:
-            error, self.pstream = ovs.stream.PassiveStream.open(name)
+        else:
+            error, self.stream = ovs.stream.Stream.open(name)
             if not error:
-                self.reconnect.listening(ovs.timeval.msec())
+                self.reconnect.connecting(ovs.timeval.msec())
             else:
                 self.reconnect.connect_failed(ovs.timeval.msec(), error)
 
         self.seqno += 1
 
     def run(self):
-        if self.pstream is not None:
+        if self.pstream is not None and self.stream is None:
             error, stream = self.pstream.accept()
             if error == 0:
                 if self.rpc or self.stream:
@@ -444,11 +446,11 @@ class Session(object):
                     self.__disconnect()
                 self.reconnect.connected(ovs.timeval.msec())
                 self.rpc = Connection(stream)
+                self.stream = stream
             elif error != errno.EAGAIN:
                 self.reconnect.listen_error(ovs.timeval.msec(), error)
                 self.pstream.close()
                 self.pstream = None
-
         if self.rpc:
             backlog = self.rpc.get_backlog()
             self.rpc.run()
@@ -559,3 +561,4 @@ class Session(object):
 
     def force_reconnect(self):
         self.reconnect.force_reconnect(ovs.timeval.msec())
+
diff --git a/python/ovs/stream.py b/python/ovs/stream.py
index fb083ee..a8be6e0 100644
--- a/python/ovs/stream.py
+++ b/python/ovs/stream.py
@@ -15,7 +15,6 @@
 import errno
 import os
 import socket
-
 import ovs.poller
 import ovs.socket_util
 import ovs.vlog
@@ -261,11 +260,11 @@ class PassiveStream(object):
     @staticmethod
     def is_valid_name(name):
         """Returns True if 'name' is a passive stream name in the form
-        "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
-        "punix:"), otherwise False."""
-        return name.startswith("punix:")
+        "TYPE:ARGS" and TYPE is a supported passive stream type,
+        otherwise False."""
+        return name.startswith("punix:") or name.startswith("ptcp:")
 
-    def __init__(self, sock, name, bind_path):
+    def __init__(self, sock, name, bind_path=None):
         self.name = name
         self.socket = sock
         self.bind_path = bind_path
@@ -274,22 +273,31 @@ class PassiveStream(object):
     def open(name):
         """Attempts to start listening for remote stream connections.  'name'
         is a connection name in the form "TYPE:ARGS", where TYPE is an passive
-        stream class's name and ARGS are stream class-specific.  Currently the
-        only supported TYPE is "punix".
+        stream class's name and ARGS are stream class-specific.
 
         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
         new PassiveStream, on failure 'error' is a positive errno value and
         'pstream' is None."""
         if not PassiveStream.is_valid_name(name):
             return errno.EAFNOSUPPORT, None
-
-        bind_path = name[6:]
+        bind_path = None
         if name.startswith("punix:"):
+            bind_path = name[6:]
             bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
-        error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
-                                                       True, bind_path, None)
-        if error:
-            return error, None
+            error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
+                                                           True, bind_path,
+                                                           None)
+            if error:
+                return error, None
+
+        elif name.startswith("ptcp:"):
+            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            remote = name.split(':')
+            sock.bind((remote[1], int(remote[2])))
+
+        else:
+            raise Exception('Unknown connection string')
 
         try:
             sock.listen(10)
@@ -320,7 +328,10 @@ class PassiveStream(object):
             try:
                 sock, addr = self.socket.accept()
                 ovs.socket_util.set_nonblocking(sock)
-                return 0, Stream(sock, "unix:%s" % addr, 0)
+                if (sock.family == socket.AF_UNIX):
+                    return 0, Stream(sock, "unix:%s" % addr, 0)
+                return 0, Stream(sock, 'ptcp:' + addr[0] + ':' + str(addr[1]),
+                                 0)
             except socket.error, e:
                 error = ovs.socket_util.get_exception_errno(e)
                 if error != errno.EAGAIN:
@@ -350,8 +361,10 @@ class UnixStream(Stream):
     @staticmethod
     def _open(suffix, dscp):
         connect_path = suffix
-        return  ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
-                                                 True, None, connect_path)
+        return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
+                                                True, None, connect_path)
+
+
 Stream.register_method("unix", UnixStream)
 
 
@@ -363,4 +376,6 @@ class TCPStream(Stream):
         if not error:
             sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
         return error, sock
+
+
 Stream.register_method("tcp", TCPStream)
diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
index ad780af..effdef0 100644
--- a/tests/ovsdb-idl.at
+++ b/tests/ovsdb-idl.at
@@ -111,6 +111,37 @@ m4_define([OVSDB_CHECK_IDL],
    OVSDB_CHECK_IDL_TCP_PY($@)
    OVSDB_CHECK_IDL_TCP6_PY($@)])
 
+
+# This test uses the Python IDL implementation with passive tcp
+m4_define([OVSDB_CHECK_IDL_PASSIVE_TCP_PY],
+  [AT_SETUP([$1 - Python ptcp])
+   AT_SKIP_IF([test $HAVE_PYTHON = no])
+   AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5])
+   AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
+                  [0], [stdout], [ignore])
+   # find free TCP port
+   AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
+   PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
+   AT_CHECK([kill `cat pid`])
+
+   # start OVSDB server in passive mode
+   AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=tcp:127.0.0.1:$TCP_PORT --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
+   AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema ptcp:127.0.0.1:$TCP_PORT $3],
+      [0], [stdout], [ignore], [kill `cat pid`])
+   AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$6],,, [[| $6]]),
+            [0], [$4], [], [kill `cat pid`])
+   AT_CLEANUP
+   ])
+
+
+OVSDB_CHECK_IDL_PASSIVE_TCP_PY([simple passive idl, initially empty, select empty],
+  [],
+  [['["idltest",{"op":"select","table":"link1","where":[]}]']],
+  [[000: empty
+001: {"error":null,"result":[{"rows":[]}]}
+002: done
+]])
+
 OVSDB_CHECK_IDL([simple idl, initially empty, no ops],
   [],
   [],
diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
index a6897f3..dccaf77 100644
--- a/tests/test-ovsdb.py
+++ b/tests/test-ovsdb.py
@@ -403,17 +403,31 @@ def do_idl(schema_file, remote, *commands):
         commands = commands[1:]
     else:
         schema_helper.register_all()
-    idl = ovs.db.idl.Idl(remote, schema_helper)
 
-    if commands:
-        error, stream = ovs.stream.Stream.open_block(
-            ovs.stream.Stream.open(remote))
-        if error:
-            sys.stderr.write("failed to connect to \"%s\"" % remote)
-            sys.exit(1)
-        rpc = ovs.jsonrpc.Connection(stream)
+    if remote.startswith('ptcp'):
+        passive = True
+    else:
+        passive = False
+
+    if passive:
+        session = ovs.jsonrpc.Session.open(remote)
+        session.run()
+        session.run()
+
+        rpc = session.rpc
+        idl = ovs.db.idl.Idl(remote, schema_helper, session)
     else:
-        rpc = None
+        idl = ovs.db.idl.Idl(remote, schema_helper)
+
+        if commands:
+            error, stream = ovs.stream.Stream.open_block(
+                ovs.stream.Stream.open(remote))
+            if error:
+                sys.stderr.write("failed to connect to \"%s\"" % remote)
+                sys.exit(1)
+            rpc = ovs.jsonrpc.Connection(stream)
+        else:
+            rpc = None
 
     symtab = {}
     seqno = 0
@@ -471,14 +485,15 @@ def do_idl(schema_file, remote, *commands):
             sys.stdout.write("%s\n" % ovs.json.to_string(reply.to_json()))
             sys.stdout.flush()
 
-    if rpc:
-        rpc.close()
-    while idl.change_seqno == seqno and not idl.run():
-        poller = ovs.poller.Poller()
-        idl.wait(poller)
-        poller.block()
-    print_idl(idl, step)
-    step += 1
+    if not passive:
+        if rpc:
+            rpc.close()
+        while idl.change_seqno == seqno and not idl.run():
+            poller = ovs.poller.Poller()
+            idl.wait(poller)
+            poller.block()
+        print_idl(idl, step)
+        step += 1
     idl.close()
     print("%03d: done" % step)
 
-- 
2.1.4




More information about the dev mailing list