[ovs-dev] [PATCH V3] Add Passive TCP connection to IDL
Ofer Ben Yacov
ofer.benyacov at gmail.com
Mon Dec 28 08:57:37 UTC 2015
Add unit test for passive mode.
---
python/ovs/db/idl.py | 18 +++++++++++++++---
python/ovs/jsonrpc.py | 19 +++++++++++--------
python/ovs/stream.py | 47 +++++++++++++++++++++++++++++++----------------
tests/ovsdb-idl.at | 31 +++++++++++++++++++++++++++++++
tests/test-ovsdb.py | 49 ++++++++++++++++++++++++++++++++-----------------
5 files changed, 120 insertions(+), 44 deletions(-)
diff --git a/python/ovs/db/idl.py b/python/ovs/db/idl.py
index c8990c7..4b492fe 100644
--- a/python/ovs/db/idl.py
+++ b/python/ovs/db/idl.py
@@ -83,7 +83,7 @@ class Idl(object):
currently being constructed, if there is one, or None otherwise.
"""
- def __init__(self, remote, schema):
+ def __init__(self, remote, schema, session = None):
"""Creates and returns a connection to the database named 'db_name' on
'remote', which should be in a form acceptable to
ovs.jsonrpc.session.open(). The connection will maintain an in-memory
@@ -101,7 +101,16 @@ class Idl(object):
As a convenience to users, 'schema' may also be an instance of the
SchemaHelper class.
- The IDL uses and modifies 'schema' directly."""
+ The IDL uses and modifies 'schema' directly.
+
+ In passive mode ( where the OVSDB connects to its manager ),
+ we first need to wait for the OVSDB to connect and then
+ pass the 'session' object (while the it is still open ) and
+ the schema we retrieved from the open session to the IDL to use it.
+
+ If in active mode, do not pass 'session' and it will be created
+ by IDL by using 'remote'.
+ """
assert isinstance(schema, SchemaHelper)
schema = schema.get_idl_schema()
@@ -109,7 +118,10 @@ class Idl(object):
self.tables = schema.tables
self.readonly = schema.readonly
self._db = schema
- self._session = ovs.jsonrpc.Session.open(remote)
+ if session:
+ self._session = session
+ else:
+ self._session = ovs.jsonrpc.Session.open(remote)
self._monitor_request_id = None
self._last_seqno = None
self.change_seqno = 0
diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
index d54d74b..1b68d3f 100644
--- a/python/ovs/jsonrpc.py
+++ b/python/ovs/jsonrpc.py
@@ -418,23 +418,25 @@ class Session(object):
self.__disconnect()
name = self.reconnect.get_name()
- if not self.reconnect.is_passive():
- error, self.stream = ovs.stream.Stream.open(name)
+ if self.reconnect.is_passive():
+ if self.pstream is not None:
+ self.pstream.close()
+ error, self.pstream = ovs.stream.PassiveStream.open(name)
if not error:
- self.reconnect.connecting(ovs.timeval.msec())
+ self.reconnect.listening(ovs.timeval.msec())
else:
self.reconnect.connect_failed(ovs.timeval.msec(), error)
- elif self.pstream is not None:
- error, self.pstream = ovs.stream.PassiveStream.open(name)
+ else:
+ error, self.stream = ovs.stream.Stream.open(name)
if not error:
- self.reconnect.listening(ovs.timeval.msec())
+ self.reconnect.connecting(ovs.timeval.msec())
else:
self.reconnect.connect_failed(ovs.timeval.msec(), error)
self.seqno += 1
def run(self):
- if self.pstream is not None:
+ if self.pstream is not None and self.stream is None:
error, stream = self.pstream.accept()
if error == 0:
if self.rpc or self.stream:
@@ -444,11 +446,11 @@ class Session(object):
self.__disconnect()
self.reconnect.connected(ovs.timeval.msec())
self.rpc = Connection(stream)
+ self.stream = stream
elif error != errno.EAGAIN:
self.reconnect.listen_error(ovs.timeval.msec(), error)
self.pstream.close()
self.pstream = None
-
if self.rpc:
backlog = self.rpc.get_backlog()
self.rpc.run()
@@ -559,3 +561,4 @@ class Session(object):
def force_reconnect(self):
self.reconnect.force_reconnect(ovs.timeval.msec())
+
diff --git a/python/ovs/stream.py b/python/ovs/stream.py
index fb083ee..a8be6e0 100644
--- a/python/ovs/stream.py
+++ b/python/ovs/stream.py
@@ -15,7 +15,6 @@
import errno
import os
import socket
-
import ovs.poller
import ovs.socket_util
import ovs.vlog
@@ -261,11 +260,11 @@ class PassiveStream(object):
@staticmethod
def is_valid_name(name):
"""Returns True if 'name' is a passive stream name in the form
- "TYPE:ARGS" and TYPE is a supported passive stream type (currently only
- "punix:"), otherwise False."""
- return name.startswith("punix:")
+ "TYPE:ARGS" and TYPE is a supported passive stream type,
+ otherwise False."""
+ return name.startswith("punix:") or name.startswith("ptcp:")
- def __init__(self, sock, name, bind_path):
+ def __init__(self, sock, name, bind_path=None):
self.name = name
self.socket = sock
self.bind_path = bind_path
@@ -274,22 +273,31 @@ class PassiveStream(object):
def open(name):
"""Attempts to start listening for remote stream connections. 'name'
is a connection name in the form "TYPE:ARGS", where TYPE is an passive
- stream class's name and ARGS are stream class-specific. Currently the
- only supported TYPE is "punix".
+ stream class's name and ARGS are stream class-specific.
Returns (error, pstream): on success 'error' is 0 and 'pstream' is the
new PassiveStream, on failure 'error' is a positive errno value and
'pstream' is None."""
if not PassiveStream.is_valid_name(name):
return errno.EAFNOSUPPORT, None
-
- bind_path = name[6:]
+ bind_path = None
if name.startswith("punix:"):
+ bind_path = name[6:]
bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
- error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
- True, bind_path, None)
- if error:
- return error, None
+ error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
+ True, bind_path,
+ None)
+ if error:
+ return error, None
+
+ elif name.startswith("ptcp:"):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ remote = name.split(':')
+ sock.bind((remote[1], int(remote[2])))
+
+ else:
+ raise Exception('Unknown connection string')
try:
sock.listen(10)
@@ -320,7 +328,10 @@ class PassiveStream(object):
try:
sock, addr = self.socket.accept()
ovs.socket_util.set_nonblocking(sock)
- return 0, Stream(sock, "unix:%s" % addr, 0)
+ if (sock.family == socket.AF_UNIX):
+ return 0, Stream(sock, "unix:%s" % addr, 0)
+ return 0, Stream(sock, 'ptcp:' + addr[0] + ':' + str(addr[1]),
+ 0)
except socket.error, e:
error = ovs.socket_util.get_exception_errno(e)
if error != errno.EAGAIN:
@@ -350,8 +361,10 @@ class UnixStream(Stream):
@staticmethod
def _open(suffix, dscp):
connect_path = suffix
- return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
- True, None, connect_path)
+ return ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
+ True, None, connect_path)
+
+
Stream.register_method("unix", UnixStream)
@@ -363,4 +376,6 @@ class TCPStream(Stream):
if not error:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
return error, sock
+
+
Stream.register_method("tcp", TCPStream)
diff --git a/tests/ovsdb-idl.at b/tests/ovsdb-idl.at
index ad780af..effdef0 100644
--- a/tests/ovsdb-idl.at
+++ b/tests/ovsdb-idl.at
@@ -111,6 +111,37 @@ m4_define([OVSDB_CHECK_IDL],
OVSDB_CHECK_IDL_TCP_PY($@)
OVSDB_CHECK_IDL_TCP6_PY($@)])
+
+# This test uses the Python IDL implementation with passive tcp
+m4_define([OVSDB_CHECK_IDL_PASSIVE_TCP_PY],
+ [AT_SETUP([$1 - Python ptcp])
+ AT_SKIP_IF([test $HAVE_PYTHON = no])
+ AT_KEYWORDS([ovsdb server idl positive Python with tcp socket $5])
+ AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
+ [0], [stdout], [ignore])
+ # find free TCP port
+ AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=ptcp:0:127.0.0.1 --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
+ PARSE_LISTENING_PORT([ovsdb-server.log], [TCP_PORT])
+ AT_CHECK([kill `cat pid`])
+
+ # start OVSDB server in passive mode
+ AT_CHECK([ovsdb-server --log-file '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --remote=tcp:127.0.0.1:$TCP_PORT --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
+ AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema ptcp:127.0.0.1:$TCP_PORT $3],
+ [0], [stdout], [ignore], [kill `cat pid`])
+ AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$6],,, [[| $6]]),
+ [0], [$4], [], [kill `cat pid`])
+ AT_CLEANUP
+ ])
+
+
+OVSDB_CHECK_IDL_PASSIVE_TCP_PY([simple passive idl, initially empty, select empty],
+ [],
+ [['["idltest",{"op":"select","table":"link1","where":[]}]']],
+ [[000: empty
+001: {"error":null,"result":[{"rows":[]}]}
+002: done
+]])
+
OVSDB_CHECK_IDL([simple idl, initially empty, no ops],
[],
[],
diff --git a/tests/test-ovsdb.py b/tests/test-ovsdb.py
index a6897f3..dccaf77 100644
--- a/tests/test-ovsdb.py
+++ b/tests/test-ovsdb.py
@@ -403,17 +403,31 @@ def do_idl(schema_file, remote, *commands):
commands = commands[1:]
else:
schema_helper.register_all()
- idl = ovs.db.idl.Idl(remote, schema_helper)
- if commands:
- error, stream = ovs.stream.Stream.open_block(
- ovs.stream.Stream.open(remote))
- if error:
- sys.stderr.write("failed to connect to \"%s\"" % remote)
- sys.exit(1)
- rpc = ovs.jsonrpc.Connection(stream)
+ if remote.startswith('ptcp'):
+ passive = True
+ else:
+ passive = False
+
+ if passive:
+ session = ovs.jsonrpc.Session.open(remote)
+ session.run()
+ session.run()
+
+ rpc = session.rpc
+ idl = ovs.db.idl.Idl(remote, schema_helper, session)
else:
- rpc = None
+ idl = ovs.db.idl.Idl(remote, schema_helper)
+
+ if commands:
+ error, stream = ovs.stream.Stream.open_block(
+ ovs.stream.Stream.open(remote))
+ if error:
+ sys.stderr.write("failed to connect to \"%s\"" % remote)
+ sys.exit(1)
+ rpc = ovs.jsonrpc.Connection(stream)
+ else:
+ rpc = None
symtab = {}
seqno = 0
@@ -471,14 +485,15 @@ def do_idl(schema_file, remote, *commands):
sys.stdout.write("%s\n" % ovs.json.to_string(reply.to_json()))
sys.stdout.flush()
- if rpc:
- rpc.close()
- while idl.change_seqno == seqno and not idl.run():
- poller = ovs.poller.Poller()
- idl.wait(poller)
- poller.block()
- print_idl(idl, step)
- step += 1
+ if not passive:
+ if rpc:
+ rpc.close()
+ while idl.change_seqno == seqno and not idl.run():
+ poller = ovs.poller.Poller()
+ idl.wait(poller)
+ poller.block()
+ print_idl(idl, step)
+ step += 1
idl.close()
print("%03d: done" % step)
--
2.1.4
More information about the dev
mailing list