[ovs-dev] [PATCH 2/5] Python tests: Ported UNIX sockets to Windows

Alin Balutoiu abalutoiu at cloudbasesolutions.com
Thu Dec 29 17:40:43 UTC 2016


Unix sockets (AF_UNIX) are not supported on Windows.
The replacement of Unix sockets on Windows is implemented
using named pipes, we are trying to mimic the behaviour
of unix sockets.

Instead of using Unix sockets to communicate
between components Named Pipes are used. This
makes the python sockets compatible with the
Named Pipe used in Windows applications.

Signed-off-by: Paul-Daniel Boca <pboca at cloudbasesolutions.com>
Signed-off-by: Alin-Gheorghe Balutoiu <abalutoiu at cloudbasesolutions.com>
---
 python/ovs/jsonrpc.py        |   6 +
 python/ovs/poller.py         |  72 ++++++---
 python/ovs/socket_util.py    |  31 +++-
 python/ovs/stream.py         | 351 ++++++++++++++++++++++++++++++++++++++++---
 python/ovs/unixctl/server.py |   4 +
 tests/test-jsonrpc.py        |  16 +-
 6 files changed, 436 insertions(+), 44 deletions(-)

diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
index 6300c67..5a11500 100644
--- a/python/ovs/jsonrpc.py
+++ b/python/ovs/jsonrpc.py
@@ -14,6 +14,7 @@
 
 import errno
 import os
+import sys
 
 import six
 
@@ -274,6 +275,11 @@ class Connection(object):
                     except UnicodeError:
                         error = errno.EILSEQ
                 if error:
+                    if (sys.platform == "win32" and
+                            error == errno.WSAEWOULDBLOCK):
+                        # WSAEWOULDBLOCK would be the equivalent on Windows
+                        # for EAGAIN on Unix.
+                        error = errno.EAGAIN
                     if error == errno.EAGAIN:
                         return error, None
                     else:
diff --git a/python/ovs/poller.py b/python/ovs/poller.py
index d7cb7d3..d836483 100644
--- a/python/ovs/poller.py
+++ b/python/ovs/poller.py
@@ -18,6 +18,10 @@ import ovs.vlog
 import select
 import socket
 import os
+import sys
+
+if sys.platform == "win32":
+    import ovs.winutils as winutils
 
 try:
     from OpenSSL import SSL
@@ -62,7 +66,9 @@ class _SelectSelect(object):
         if SSL and isinstance(fd, SSL.Connection):
             fd = fd.fileno()
 
-        assert isinstance(fd, int)
+        if sys.platform != 'win32':
+            # Skip this on Windows, it also register events
+            assert isinstance(fd, int)
         if events & POLLIN:
             self.rlist.append(fd)
             events &= ~POLLIN
@@ -73,28 +79,58 @@ class _SelectSelect(object):
             self.xlist.append(fd)
 
     def poll(self, timeout):
-        if timeout == -1:
-            # epoll uses -1 for infinite timeout, select uses None.
-            timeout = None
-        else:
-            timeout = float(timeout) / 1000
         # XXX workaround a bug in eventlet
         # see https://github.com/eventlet/eventlet/pull/25
         if timeout == 0 and _using_eventlet_green_select():
             timeout = 0.1
+        if sys.platform == 'win32':
+            events = self.rlist + self.wlist + self.xlist
+            if not events:
+                return []
+            if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS:
+                raise WindowsError("Cannot handle more than maximum wait"
+                                   "objects\n")
+
+            # win32event.INFINITE timeout is -1
+            # timeout must be an int number, expressed in ms
+            if timeout == 0.1:
+                timeout = 100
+            else:
+                timeout = int(timeout)
+
+            # Wait until any of the events is set to signaled
+            try:
+                retval = winutils.win32event.WaitForMultipleObjects(
+                    events,
+                    False,  # Wait all
+                    timeout)
+            except winutils.pywintypes.error:
+                    return [(0, POLLERR)]
 
-        rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist,
-                                            timeout)
-        events_dict = {}
-        for fd in rlist:
-            events_dict[fd] = events_dict.get(fd, 0) | POLLIN
-        for fd in wlist:
-            events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
-        for fd in xlist:
-            events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
-                                                        POLLHUP |
-                                                        POLLNVAL)
-        return list(events_dict.items())
+            if retval == winutils.winerror.WAIT_TIMEOUT:
+                return []
+
+            return [(events[retval], 0)]
+        else:
+            if timeout == -1:
+                # epoll uses -1 for infinite timeout, select uses None.
+                timeout = None
+            else:
+                timeout = float(timeout) / 1000
+            rlist, wlist, xlist = select.select(self.rlist,
+                                                self.wlist,
+                                                self.xlist,
+                                                timeout)
+            events_dict = {}
+            for fd in rlist:
+                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
+            for fd in wlist:
+                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
+            for fd in xlist:
+                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
+                                                            POLLHUP |
+                                                            POLLNVAL)
+            return list(events_dict.items())
 
 
 SelectPoll = _SelectSelect
diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py
index b358b05..fb6cee4 100644
--- a/python/ovs/socket_util.py
+++ b/python/ovs/socket_util.py
@@ -17,6 +17,7 @@ import os
 import os.path
 import random
 import socket
+import sys
 
 import six
 from six.moves import range
@@ -25,6 +26,10 @@ import ovs.fatal_signal
 import ovs.poller
 import ovs.vlog
 
+if sys.platform == 'win32':
+    import ovs.winutils as winutils
+    import win32file
+
 vlog = ovs.vlog.Vlog("socket_util")
 
 
@@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path, connect_path, short=False):
 
 def check_connection_completion(sock):
     p = ovs.poller.SelectPoll()
-    p.register(sock, ovs.poller.POLLOUT)
+    if sys.platform == "win32":
+        event = winutils.get_new_event(None, False, True, None)
+        # Receive notification of readiness for writing, of completed
+        # connection or multipoint join operation, and of socket closure.
+        win32file.WSAEventSelect(sock, event,
+                                 win32file.FD_WRITE |
+                                 win32file.FD_CONNECT |
+                                 win32file.FD_CLOSE)
+        p.register(event, ovs.poller.POLLOUT)
+    else:
+        p.register(sock, ovs.poller.POLLOUT)
     pfds = p.poll(0)
     if len(pfds) == 1:
         revents = pfds[0][1]
@@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port, dscp):
         try:
             sock.connect(address)
         except socket.error as e:
-            if get_exception_errno(e) != errno.EINPROGRESS:
+            error = get_exception_errno(e)
+            if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
+                # WSAEWOULDBLOCK would be the equivalent on Windows
+                # for EINPROGRESS on Unix.
+                error = errno.EINPROGRESS
+            if error != errno.EINPROGRESS:
                 raise
         return 0, sock
     except socket.error as e:
@@ -257,9 +277,12 @@ def get_null_fd():
     global null_fd
     if null_fd < 0:
         try:
-            null_fd = os.open("/dev/null", os.O_RDWR)
+            # os.devnull ensures compatibility with Windows, returns
+            # '/dev/null' for Unix and 'nul' for Windows
+            null_fd = os.open(os.devnull, os.O_RDWR)
         except OSError as e:
-            vlog.err("could not open /dev/null: %s" % os.strerror(e.errno))
+            vlog.err("could not open %s: %s" % (os.devnull,
+                                                os.strerror(e.errno)))
             return -e.errno
     return null_fd
 
diff --git a/python/ovs/stream.py b/python/ovs/stream.py
index cd57eb3..e8e5700 100644
--- a/python/ovs/stream.py
+++ b/python/ovs/stream.py
@@ -15,6 +15,7 @@
 import errno
 import os
 import socket
+import sys
 
 import six
 
@@ -27,6 +28,13 @@ try:
 except ImportError:
     SSL = None
 
+if sys.platform == 'win32':
+    import ovs.winutils as winutils
+    import pywintypes
+    import win32event
+    import win32file
+    import win32pipe
+
 vlog = ovs.vlog.Vlog("stream")
 
 
@@ -63,6 +71,13 @@ class Stream(object):
     _SSL_certificate_file = None
     _SSL_ca_cert_file = None
 
+    # Windows only
+    _write = None                # overlapped for write operation
+    _read = None                 # overlapped for read operation
+    _write_pending = False
+    _read_pending = False
+    _retry_connect = False
+
     @staticmethod
     def register_method(method, cls):
         Stream._SOCKET_METHODS[method + ":"] = cls
@@ -81,8 +96,23 @@ class Stream(object):
         otherwise False."""
         return bool(Stream._find_method(name))
 
-    def __init__(self, socket, name, status):
+    def __init__(self, socket, name, status, pipe=None, is_server=False):
         self.socket = socket
+        self.pipe = pipe
+        if sys.platform == 'win32':
+            self._read = pywintypes.OVERLAPPED()
+            self._read.hEvent = winutils.get_new_event()
+            self._write = pywintypes.OVERLAPPED()
+            self._write.hEvent = winutils.get_new_event()
+            if pipe is not None:
+                # Flag to check if fd is a server HANDLE.  In the case of a
+                # server handle we have to issue a disconnect before closing
+                # the actual handle.
+                self._server = is_server
+                suffix = name.split(":", 1)[1]
+                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
+                self._pipename = winutils.get_pipe_name(suffix)
+
         self.name = name
         if status == errno.EAGAIN:
             self.state = Stream.__S_CONNECTING
@@ -120,6 +150,38 @@ class Stream(object):
         suffix = name.split(":", 1)[1]
         if name.startswith("unix:"):
             suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
+            if sys.platform == 'win32':
+                pipename = winutils.get_pipe_name(suffix)
+
+                if len(suffix) > 255:
+                    # Return invalid argument if the name is too long
+                    return errno.ENOENT, None
+
+                try:
+                    # In case of "unix:" argument, the assumption is that
+                    # there is a file created in the path (suffix).
+                    open(suffix, 'r').close()
+                except:
+                    return errno.ENOENT, None
+
+                try:
+                    npipe = winutils.create_file(pipename)
+                    try:
+                        winutils.set_pipe_mode(npipe,
+                                               win32pipe.PIPE_READMODE_BYTE)
+                    except pywintypes.error as e:
+                        return errno.ENOENT, None
+                except pywintypes.error as e:
+                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
+                        # Pipe is busy, set the retry flag to true and retry
+                        # again during the connect function.
+                        Stream.retry_connect = True
+                        return 0, cls(None, name, errno.EAGAIN,
+                                      pipe=win32file.INVALID_HANDLE_VALUE,
+                                      is_server=False)
+                    return errno.ENOENT, None
+                return 0, cls(None, name, 0, pipe=npipe, is_server=False)
+
         error, sock = cls._open(suffix, dscp)
         if error:
             return error, None
@@ -145,6 +207,10 @@ class Stream(object):
         if not error:
             while True:
                 error = stream.connect()
+                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
+                    # WSAEWOULDBLOCK would be the equivalent on Windows
+                    # for EAGAIN on Unix.
+                    error = errno.EAGAIN
                 if error != errno.EAGAIN:
                     break
                 stream.run()
@@ -152,7 +218,8 @@ class Stream(object):
                 stream.run_wait(poller)
                 stream.connect_wait(poller)
                 poller.block()
-            assert error != errno.EINPROGRESS
+            if stream.socket is not None:
+                assert error != errno.EINPROGRESS
 
         if error and stream:
             stream.close()
@@ -160,11 +227,35 @@ class Stream(object):
         return error, stream
 
     def close(self):
-        self.socket.close()
+        if self.socket is not None:
+            self.socket.close()
+        if self.pipe is not None:
+            if self._server:
+                win32pipe.DisconnectNamedPipe(self.pipe)
+            winutils.close_handle(self.pipe, vlog.warn)
+            winutils.close_handle(self._read.hEvent, vlog.warn)
+            winutils.close_handle(self._write.hEvent, vlog.warn)
 
     def __scs_connecting(self):
-        retval = ovs.socket_util.check_connection_completion(self.socket)
-        assert retval != errno.EINPROGRESS
+        if self.socket is not None:
+            retval = ovs.socket_util.check_connection_completion(self.socket)
+            assert retval != errno.EINPROGRESS
+        elif sys.platform == 'win32' and self.retry_connect:
+            try:
+                self.pipe = winutils.create_file(self._pipename)
+                self._retry_connect = False
+                retval = 0
+            except pywintypes.error as e:
+                if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
+                    retval = errno.EAGAIN
+                else:
+                    self._retry_connect = False
+                    retval = errno.ENOENT
+        else:
+            # Windows only, if retry_connect is false, it means it's already
+            # connected so we can set the value of retval to 0
+            retval = 0
+
         if retval == 0:
             self.state = Stream.__S_CONNECTED
         elif retval != errno.EAGAIN:
@@ -209,11 +300,63 @@ class Stream(object):
         elif n == 0:
             return (0, "")
 
+        if sys.platform == 'win32' and self.socket is None:
+            return self.__recv_windows(n)
+
         try:
             return (0, self.socket.recv(n))
         except socket.error as e:
             return (ovs.socket_util.get_exception_errno(e), "")
 
+    def __recv_windows(self, n):
+        if self._read_pending:
+            try:
+                nBytesRead = winutils.get_overlapped_result(self.pipe,
+                                                            self._read,
+                                                            False)
+                self._read_pending = False
+                recvBuffer = self._read_buffer[:nBytesRead]
+
+                return (0, winutils.get_decoded_buffer(recvBuffer))
+            except pywintypes.error as e:
+                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
+                    # The operation is still pending, try again
+                    self._read_pending = True
+                    return (errno.EAGAIN, "")
+                elif e.winerror in winutils.pipe_disconnected_errors:
+                    # If the pipe was disconnected, return 0.
+                    return (0, "")
+                else:
+                    return (errno.EINVAL, "")
+
+        (errCode, self._read_buffer) = winutils.read_file(self.pipe,
+                                                          n,
+                                                          self._read)
+        if errCode:
+            if errCode == winutils.winerror.ERROR_IO_PENDING:
+                self._read_pending = True
+                return (errno.EAGAIN, "")
+            elif errCode in winutils.pipe_disconnected_errors:
+                # If the pipe was disconnected, return 0.
+                return (0, "")
+            else:
+                return (errCode, "")
+
+        try:
+            nBytesRead = winutils.get_overlapped_result(self.pipe,
+                                                        self._read,
+                                                        False)
+            winutils.win32event.SetEvent(self._read.hEvent)
+        except pywintypes.error as e:
+            if e.winerror in winutils.pipe_disconnected_errors:
+                # If the pipe was disconnected, return 0.
+                return (0, "")
+            else:
+                return (e.winerror, "")
+
+        recvBuffer = self._read_buffer[:nBytesRead]
+        return (0, winutils.get_decoded_buffer(recvBuffer))
+
     def send(self, buf):
         """Tries to send 'buf' on this stream.
 
@@ -231,6 +374,9 @@ class Stream(object):
         elif len(buf) == 0:
             return 0
 
+        if sys.platform == 'win32' and self.socket is None:
+            return self.__send_windows(buf)
+
         try:
             # Python 3 has separate types for strings and bytes.  We must have
             # bytes here.
@@ -240,6 +386,40 @@ class Stream(object):
         except socket.error as e:
             return -ovs.socket_util.get_exception_errno(e)
 
+    def __send_windows(self, buf):
+        if self._write_pending:
+            try:
+                nBytesWritten = winutils.get_overlapped_result(self.pipe,
+                                                               self._write,
+                                                               False)
+                self._write_pending = False
+                return nBytesWritten
+            except pywintypes.error as e:
+                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
+                    # The operation is still pending, try again
+                    self._read_pending = True
+                    return -errno.EAGAIN
+                elif e.winerror in winutils.pipe_disconnected_errors:
+                    # If the pipe was disconnected, return connection reset.
+                    return -errno.ECONNRESET
+                else:
+                    return -errno.EINVAL
+
+        buf = winutils.get_encoded_buffer(buf)
+        self._write_pending = False
+        (errCode, nBytesWritten) = winutils.write_file(self.pipe,
+                                                       buf,
+                                                       self._write)
+        if errCode:
+            if errCode == winutils.winerror.ERROR_IO_PENDING:
+                self._write_pending = True
+                return -errno.EAGAIN
+            if (not nBytesWritten and
+                    errCode in winutils.pipe_disconnected_errors):
+                # If the pipe was disconnected, return connection reset.
+                return -errno.ECONNRESET
+        return nBytesWritten
+
     def run(self):
         pass
 
@@ -255,11 +435,52 @@ class Stream(object):
 
         if self.state == Stream.__S_CONNECTING:
             wait = Stream.W_CONNECT
+
+        if sys.platform == 'win32':
+            self.__wait_windows(poller, wait)
+            return
+
         if wait == Stream.W_RECV:
             poller.fd_wait(self.socket, ovs.poller.POLLIN)
         else:
             poller.fd_wait(self.socket, ovs.poller.POLLOUT)
 
+    def __wait_windows(self, poller, wait):
+        if self.socket is not None:
+            if wait == Stream.W_RECV:
+                read_flags = (win32file.FD_READ |
+                              win32file.FD_ACCEPT |
+                              win32file.FD_CLOSE)
+                try:
+                    win32file.WSAEventSelect(self.socket,
+                                             self._read.hEvent,
+                                             read_flags)
+                except pywintypes.error as e:
+                    vlog.err("failed to associate events with socket: %s"
+                             % e.strerror)
+                poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
+            else:
+                write_flags = (win32file.FD_WRITE |
+                               win32file.FD_CONNECT |
+                               win32file.FD_CLOSE)
+                try:
+                    win32file.WSAEventSelect(self.socket,
+                                             self._write.hEvent,
+                                             write_flags)
+                except pywintypes.error as e:
+                    vlog.err("failed to associate events with socket: %s"
+                             % e.strerror)
+                poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
+        else:
+            if wait == Stream.W_RECV:
+                if self._read:
+                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
+            elif wait == Stream.W_SEND:
+                if self._write:
+                    poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
+            elif wait == Stream.W_CONNECT:
+                return
+
     def connect_wait(self, poller):
         self.wait(poller, Stream.W_CONNECT)
 
@@ -267,11 +488,22 @@ class Stream(object):
         self.wait(poller, Stream.W_RECV)
 
     def send_wait(self, poller):
+        if sys.platform == 'win32':
+            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
         self.wait(poller, Stream.W_SEND)
 
     def __del__(self):
         # Don't delete the file: we might have forked.
-        self.socket.close()
+        if self.socket is not None:
+            self.socket.close()
+        if self.pipe is not None:
+            # Check if there are any remaining valid handles and close them
+            if self.pipe:
+                winutils.close_handle(self.pipe)
+            if self._read.hEvent:
+                winutils.close_handle(self._read.hEvent)
+            if self._write.hEvent:
+                winutils.close_handle(self._write.hEvent)
 
     @staticmethod
     def ssl_set_private_key_file(file_name):
@@ -287,6 +519,10 @@ class Stream(object):
 
 
 class PassiveStream(object):
+    # Windows only
+    connect = None                  # overlapped for read operation
+    connect_pending = False
+
     @staticmethod
     def is_valid_name(name):
         """Returns True if 'name' is a passive stream name in the form
@@ -294,9 +530,18 @@ class PassiveStream(object):
         "punix:" or "ptcp"), otherwise False."""
         return name.startswith("punix:") | name.startswith("ptcp:")
 
-    def __init__(self, sock, name, bind_path):
+    def __init__(self, sock, name, bind_path, pipe=None):
         self.name = name
+        self.pipe = pipe
         self.socket = sock
+        if pipe is not None:
+            self.connect = pywintypes.OVERLAPPED()
+            self.connect.hEvent = winutils.get_new_event(bManualReset=True)
+            self.connect_pending = False
+            suffix = name.split(":", 1)[1]
+            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
+            self._pipename = winutils.get_pipe_name(suffix)
+
         self.bind_path = bind_path
 
     @staticmethod
@@ -315,11 +560,27 @@ class PassiveStream(object):
         bind_path = name[6:]
         if name.startswith("punix:"):
             bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
-            error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
-                                                           True, bind_path,
-                                                           None)
-            if error:
-                return error, None
+            if sys.platform != 'win32':
+                error, sock = ovs.socket_util.make_unix_socket(
+                    socket.SOCK_STREAM, True, bind_path, None)
+                if error:
+                    return error, None
+            else:
+                # Branch used only on Windows
+                try:
+                    open(bind_path, 'w').close()
+                except:
+                    return errno.ENOENT, None
+
+                pipename = winutils.get_pipe_name(bind_path)
+                if len(pipename) > 255:
+                    # Return invalid argument if the name is too long
+                    return errno.ENOENT, None
+
+                npipe = winutils.create_named_pipe(pipename)
+                if not npipe:
+                    return errno.ENOENT, None
+                return 0, PassiveStream(None, name, bind_path, pipe=npipe)
 
         elif name.startswith("ptcp:"):
             sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -341,7 +602,11 @@ class PassiveStream(object):
 
     def close(self):
         """Closes this PassiveStream."""
-        self.socket.close()
+        if self.socket is not None:
+            self.socket.close()
+        if self.pipe is not None:
+            winutils.close_handle(self.pipe, vlog.warn)
+            winutils.close_handle(self.connect.hEvent, vlog.warn)
         if self.bind_path is not None:
             ovs.fatal_signal.unlink_file_now(self.bind_path)
             self.bind_path = None
@@ -354,28 +619,80 @@ class PassiveStream(object):
 
         Will not block waiting for a connection.  If no connection is ready to
         be accepted, returns (errno.EAGAIN, None) immediately."""
-
+        if sys.platform == 'win32' and self.socket is None:
+            return self.__accept_windows()
         while True:
             try:
                 sock, addr = self.socket.accept()
                 ovs.socket_util.set_nonblocking(sock)
-                if (sock.family == socket.AF_UNIX):
+                if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
                     return 0, Stream(sock, "unix:%s" % addr, 0)
                 return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
                                                        str(addr[1])), 0)
             except socket.error as e:
                 error = ovs.socket_util.get_exception_errno(e)
+                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
+                    # WSAEWOULDBLOCK would be the equivalent on Windows
+                    # for EAGAIN on Unix.
+                    error = errno.EAGAIN
                 if error != errno.EAGAIN:
                     # XXX rate-limit
                     vlog.dbg("accept: %s" % os.strerror(error))
                 return error, None
 
+    def __accept_windows(self):
+        if self.connect_pending:
+            try:
+                winutils.get_overlapped_result(self.pipe, self.connect, False)
+            except pywintypes.error as e:
+                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
+                    # The operation is still pending, try again
+                    self.connect_pending = True
+                    return errno.EAGAIN, None
+                else:
+                    if self.pipe:
+                        win32pipe.DisconnectNamedPipe(self.pipe)
+                    return errno.EINVAL, None
+            self.connect_pending = False
+
+        error = winutils.connect_named_pipe(self.pipe, self.connect)
+        if error:
+            if error == winutils.winerror.ERROR_IO_PENDING:
+                self.connect_pending = True
+                return errno.EAGAIN, None
+            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
+                if self.pipe:
+                    win32pipe.DisconnectNamedPipe(self.pipe)
+                self.connect_pending = False
+                return errno.EINVAL, None
+            else:
+                win32event.SetEvent(self.connect.hEvent)
+
+        npipe = winutils.create_named_pipe(self._pipename)
+        if not npipe:
+            return errno.ENOENT, None
+
+        old_pipe = self.pipe
+        self.pipe = npipe
+        winutils.win32event.ResetEvent(self.connect.hEvent)
+        return 0, Stream(None, self.name, 0, pipe=old_pipe)
+
     def wait(self, poller):
-        poller.fd_wait(self.socket, ovs.poller.POLLIN)
+        if sys.platform != 'win32' or self.socket is not None:
+            poller.fd_wait(self.socket, ovs.poller.POLLIN)
+        else:
+            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
 
     def __del__(self):
         # Don't delete the file: we might have forked.
-        self.socket.close()
+        if self.socket is not None:
+            self.socket.close()
+        if self.pipe is not None:
+            # Check if there are any remaining valid handles and close them
+            if self.pipe:
+                winutils.close_handle(self.pipe)
+            if self._connect.hEvent:
+                winutils.close_handle(self._read.hEvent)
 
 
 def usage(name):
diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py
index 8595ed8..3f3e051 100644
--- a/python/ovs/unixctl/server.py
+++ b/python/ovs/unixctl/server.py
@@ -148,6 +148,10 @@ class UnixctlServer(object):
     def run(self):
         for _ in range(10):
             error, stream = self._listener.accept()
+            if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
+                # WSAEWOULDBLOCK would be the equivalent on Windows
+                # for EAGAIN on Unix.
+                error = errno.EAGAIN
             if not error:
                 rpc = ovs.jsonrpc.Connection(stream)
                 self._conns.append(UnixctlConnection(rpc))
diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py
index 18634e6..3eabcd7 100644
--- a/tests/test-jsonrpc.py
+++ b/tests/test-jsonrpc.py
@@ -53,11 +53,17 @@ def handle_rpc(rpc, msg):
 
 
 def do_listen(name):
-    error, pstream = ovs.stream.PassiveStream.open(name)
-    if error:
-        sys.stderr.write("could not listen on \"%s\": %s\n"
-                         % (name, os.strerror(error)))
-        sys.exit(1)
+    if sys.platform != 'win32' or (
+            ovs.daemon._detach and ovs.daemon._detached):
+        # On Windows the child is a new process created which should be the
+        # one that creates the PassiveStream. Without this check, the new
+        # child process will create a new PassiveStream overwriting the one
+        # that the parent process created.
+        error, pstream = ovs.stream.PassiveStream.open(name)
+        if error:
+            sys.stderr.write("could not listen on \"%s\": %s\n"
+                             % (name, os.strerror(error)))
+            sys.exit(1)
 
     ovs.daemon.daemonize()
 
-- 
2.10.0.windows.1


More information about the dev mailing list