[ovs-dev] [PATCH V2 2/5] Python tests: Ported UNIX sockets to Windows

Alin Balutoiu abalutoiu at cloudbasesolutions.com
Tue Jan 3 16:27:22 UTC 2017


Please ignore this patch; I will send another one soon.

Thanks,
Alin Balutoiu.

> -----Original Message-----
> From: Alin Balutoiu
> Sent: Tuesday, January 3, 2017 5:17 PM
> To: dev at openvswitch.org
> Cc: Alin Balutoiu <abalutoiu at cloudbasesolutions.com>; Paul Boca
> <pboca at cloudbasesolutions.com>
> Subject: [PATCH V2 2/5] Python tests: Ported UNIX sockets to Windows
> 
> From: Alin Balutoiu <abalutoiu at cloudbasesolutions.com>
> 
> Unix sockets (AF_UNIX) are not supported on Windows.
> On Windows, Unix sockets are replaced by an implementation based on
> named pipes, which tries to mimic the behaviour of Unix sockets.
> 
> Instead of using Unix sockets to communicate between components, named
> pipes are used. This makes the Python sockets compatible with the named
> pipes used in Windows applications.
> 
> Signed-off-by: Paul-Daniel Boca <pboca at cloudbasesolutions.com>
> Signed-off-by: Alin-Gheorghe Balutoiu <abalutoiu at cloudbasesolutions.com>
> ---
> V2: No changes.
> ---
>  python/ovs/jsonrpc.py        |   6 +
>  python/ovs/poller.py         |  72 ++++++---
>  python/ovs/socket_util.py    |  31 +++-
>  python/ovs/stream.py         | 351 ++++++++++++++++++++++++++++++++++++++++---
>  python/ovs/unixctl/server.py |   4 +
>  tests/test-jsonrpc.py        |  16 +-
>  6 files changed, 436 insertions(+), 44 deletions(-)
> 
> diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py
> index 6300c67..5a11500 100644
> --- a/python/ovs/jsonrpc.py
> +++ b/python/ovs/jsonrpc.py
> @@ -14,6 +14,7 @@
> 
>  import errno
>  import os
> +import sys
> 
>  import six
> 
> @@ -274,6 +275,11 @@ class Connection(object):
>                      except UnicodeError:
>                          error = errno.EILSEQ
>                  if error:
> +                    if (sys.platform == "win32" and
> +                            error == errno.WSAEWOULDBLOCK):
> +                        # WSAEWOULDBLOCK would be the equivalent on Windows
> +                        # for EAGAIN on Unix.
> +                        error = errno.EAGAIN
>                      if error == errno.EAGAIN:
>                          return error, None
>                      else:
> diff --git a/python/ovs/poller.py b/python/ovs/poller.py
> index d7cb7d3..d836483 100644
> --- a/python/ovs/poller.py
> +++ b/python/ovs/poller.py
> @@ -18,6 +18,10 @@ import ovs.vlog
>  import select
>  import socket
>  import os
> +import sys
> +
> +if sys.platform == "win32":
> +    import ovs.winutils as winutils
> 
>  try:
>      from OpenSSL import SSL
> @@ -62,7 +66,9 @@ class _SelectSelect(object):
>          if SSL and isinstance(fd, SSL.Connection):
>              fd = fd.fileno()
> 
> -        assert isinstance(fd, int)
> +        if sys.platform != 'win32':
> +            # Skip this on Windows, it also register events
> +            assert isinstance(fd, int)
>          if events & POLLIN:
>              self.rlist.append(fd)
>              events &= ~POLLIN
> @@ -73,28 +79,58 @@ class _SelectSelect(object):
>              self.xlist.append(fd)
> 
>      def poll(self, timeout):
> -        if timeout == -1:
> -            # epoll uses -1 for infinite timeout, select uses None.
> -            timeout = None
> -        else:
> -            timeout = float(timeout) / 1000
>          # XXX workaround a bug in eventlet
>          # see https://github.com/eventlet/eventlet/pull/25
>          if timeout == 0 and _using_eventlet_green_select():
>              timeout = 0.1
> +        if sys.platform == 'win32':
> +            events = self.rlist + self.wlist + self.xlist
> +            if not events:
> +                return []
> +            if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS:
> +                raise WindowsError("Cannot handle more than maximum wait"
> +                                   "objects\n")
> +
> +            # win32event.INFINITE timeout is -1
> +            # timeout must be an int number, expressed in ms
> +            if timeout == 0.1:
> +                timeout = 100
> +            else:
> +                timeout = int(timeout)
> +
> +            # Wait until any of the events is set to signaled
> +            try:
> +                retval = winutils.win32event.WaitForMultipleObjects(
> +                    events,
> +                    False,  # Wait all
> +                    timeout)
> +            except winutils.pywintypes.error:
> +                    return [(0, POLLERR)]
> 
> -        rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist,
> -                                            timeout)
> -        events_dict = {}
> -        for fd in rlist:
> -            events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> -        for fd in wlist:
> -            events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> -        for fd in xlist:
> -            events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> -                                                        POLLHUP |
> -                                                        POLLNVAL)
> -        return list(events_dict.items())
> +            if retval == winutils.winerror.WAIT_TIMEOUT:
> +                return []
> +
> +            return [(events[retval], 0)]
> +        else:
> +            if timeout == -1:
> +                # epoll uses -1 for infinite timeout, select uses None.
> +                timeout = None
> +            else:
> +                timeout = float(timeout) / 1000
> +            rlist, wlist, xlist = select.select(self.rlist,
> +                                                self.wlist,
> +                                                self.xlist,
> +                                                timeout)
> +            events_dict = {}
> +            for fd in rlist:
> +                events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> +            for fd in wlist:
> +                events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> +            for fd in xlist:
> +                events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> +                                                            POLLHUP |
> +                                                            POLLNVAL)
> +            return list(events_dict.items())
> 
> 
>  SelectPoll = _SelectSelect
> diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py
> index b358b05..fb6cee4 100644
> --- a/python/ovs/socket_util.py
> +++ b/python/ovs/socket_util.py
> @@ -17,6 +17,7 @@ import os
>  import os.path
>  import random
>  import socket
> +import sys
> 
>  import six
>  from six.moves import range
> @@ -25,6 +26,10 @@ import ovs.fatal_signal
>  import ovs.poller
>  import ovs.vlog
> 
> +if sys.platform == 'win32':
> +    import ovs.winutils as winutils
> +    import win32file
> +
>  vlog = ovs.vlog.Vlog("socket_util")
> 
> 
> @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path, connect_path, short=False):
> 
>  def check_connection_completion(sock):
>      p = ovs.poller.SelectPoll()
> -    p.register(sock, ovs.poller.POLLOUT)
> +    if sys.platform == "win32":
> +        event = winutils.get_new_event(None, False, True, None)
> +        # Receive notification of readiness for writing, of completed
> +        # connection or multipoint join operation, and of socket closure.
> +        win32file.WSAEventSelect(sock, event,
> +                                 win32file.FD_WRITE |
> +                                 win32file.FD_CONNECT |
> +                                 win32file.FD_CLOSE)
> +        p.register(event, ovs.poller.POLLOUT)
> +    else:
> +        p.register(sock, ovs.poller.POLLOUT)
>      pfds = p.poll(0)
>      if len(pfds) == 1:
>          revents = pfds[0][1]
> @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port, dscp):
>          try:
>              sock.connect(address)
>          except socket.error as e:
> -            if get_exception_errno(e) != errno.EINPROGRESS:
> +            error = get_exception_errno(e)
> +            if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> +                # WSAEWOULDBLOCK would be the equivalent on Windows
> +                # for EINPROGRESS on Unix.
> +                error = errno.EINPROGRESS
> +            if error != errno.EINPROGRESS:
>                  raise
>          return 0, sock
>      except socket.error as e:
> @@ -257,9 +277,12 @@ def get_null_fd():
>      global null_fd
>      if null_fd < 0:
>          try:
> -            null_fd = os.open("/dev/null", os.O_RDWR)
> +            # os.devnull ensures compatibility with Windows, returns
> +            # '/dev/null' for Unix and 'nul' for Windows
> +            null_fd = os.open(os.devnull, os.O_RDWR)
>          except OSError as e:
> -            vlog.err("could not open /dev/null: %s" % os.strerror(e.errno))
> +            vlog.err("could not open %s: %s" % (os.devnull,
> +                                                os.strerror(e.errno)))
>              return -e.errno
>      return null_fd
> 
> diff --git a/python/ovs/stream.py b/python/ovs/stream.py
> index cd57eb3..e8e5700 100644
> --- a/python/ovs/stream.py
> +++ b/python/ovs/stream.py
> @@ -15,6 +15,7 @@
>  import errno
>  import os
>  import socket
> +import sys
> 
>  import six
> 
> @@ -27,6 +28,13 @@ try:
>  except ImportError:
>      SSL = None
> 
> +if sys.platform == 'win32':
> +    import ovs.winutils as winutils
> +    import pywintypes
> +    import win32event
> +    import win32file
> +    import win32pipe
> +
>  vlog = ovs.vlog.Vlog("stream")
> 
> 
> @@ -63,6 +71,13 @@ class Stream(object):
>      _SSL_certificate_file = None
>      _SSL_ca_cert_file = None
> 
> +    # Windows only
> +    _write = None                # overlapped for write operation
> +    _read = None                 # overlapped for read operation
> +    _write_pending = False
> +    _read_pending = False
> +    _retry_connect = False
> +
>      @staticmethod
>      def register_method(method, cls):
>          Stream._SOCKET_METHODS[method + ":"] = cls
> @@ -81,8 +96,23 @@ class Stream(object):
>          otherwise False."""
>          return bool(Stream._find_method(name))
> 
> -    def __init__(self, socket, name, status):
> +    def __init__(self, socket, name, status, pipe=None, is_server=False):
>          self.socket = socket
> +        self.pipe = pipe
> +        if sys.platform == 'win32':
> +            self._read = pywintypes.OVERLAPPED()
> +            self._read.hEvent = winutils.get_new_event()
> +            self._write = pywintypes.OVERLAPPED()
> +            self._write.hEvent = winutils.get_new_event()
> +            if pipe is not None:
> +                # Flag to check if fd is a server HANDLE.  In the case of a
> +                # server handle we have to issue a disconnect before closing
> +                # the actual handle.
> +                self._server = is_server
> +                suffix = name.split(":", 1)[1]
> +                suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> +                self._pipename = winutils.get_pipe_name(suffix)
> +
>          self.name = name
>          if status == errno.EAGAIN:
>              self.state = Stream.__S_CONNECTING
> @@ -120,6 +150,38 @@ class Stream(object):
>          suffix = name.split(":", 1)[1]
>          if name.startswith("unix:"):
>              suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> +            if sys.platform == 'win32':
> +                pipename = winutils.get_pipe_name(suffix)
> +
> +                if len(suffix) > 255:
> +                    # Return invalid argument if the name is too long
> +                    return errno.ENOENT, None
> +
> +                try:
> +                    # In case of "unix:" argument, the assumption is that
> +                    # there is a file created in the path (suffix).
> +                    open(suffix, 'r').close()
> +                except:
> +                    return errno.ENOENT, None
> +
> +                try:
> +                    npipe = winutils.create_file(pipename)
> +                    try:
> +                        winutils.set_pipe_mode(npipe,
> +                                               win32pipe.PIPE_READMODE_BYTE)
> +                    except pywintypes.error as e:
> +                        return errno.ENOENT, None
> +                except pywintypes.error as e:
> +                    if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> +                        # Pipe is busy, set the retry flag to true and retry
> +                        # again during the connect function.
> +                        Stream.retry_connect = True
> +                        return 0, cls(None, name, errno.EAGAIN,
> +                                      pipe=win32file.INVALID_HANDLE_VALUE,
> +                                      is_server=False)
> +                    return errno.ENOENT, None
> +                return 0, cls(None, name, 0, pipe=npipe,
> +                              is_server=False)
> +
>          error, sock = cls._open(suffix, dscp)
>          if error:
>              return error, None
> @@ -145,6 +207,10 @@ class Stream(object):
>          if not error:
>              while True:
>                  error = stream.connect()
> +                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> +                    # for EAGAIN on Unix.
> +                    error = errno.EAGAIN
>                  if error != errno.EAGAIN:
>                      break
>                  stream.run()
> @@ -152,7 +218,8 @@ class Stream(object):
>                  stream.run_wait(poller)
>                  stream.connect_wait(poller)
>                  poller.block()
> -            assert error != errno.EINPROGRESS
> +            if stream.socket is not None:
> +                assert error != errno.EINPROGRESS
> 
>          if error and stream:
>              stream.close()
> @@ -160,11 +227,35 @@ class Stream(object):
>          return error, stream
> 
>      def close(self):
> -        self.socket.close()
> +        if self.socket is not None:
> +            self.socket.close()
> +        if self.pipe is not None:
> +            if self._server:
> +                win32pipe.DisconnectNamedPipe(self.pipe)
> +            winutils.close_handle(self.pipe, vlog.warn)
> +            winutils.close_handle(self._read.hEvent, vlog.warn)
> +            winutils.close_handle(self._write.hEvent, vlog.warn)
> 
>      def __scs_connecting(self):
> -        retval = ovs.socket_util.check_connection_completion(self.socket)
> -        assert retval != errno.EINPROGRESS
> +        if self.socket is not None:
> +            retval = ovs.socket_util.check_connection_completion(self.socket)
> +            assert retval != errno.EINPROGRESS
> +        elif sys.platform == 'win32' and self.retry_connect:
> +            try:
> +                self.pipe = winutils.create_file(self._pipename)
> +                self._retry_connect = False
> +                retval = 0
> +            except pywintypes.error as e:
> +                if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> +                    retval = errno.EAGAIN
> +                else:
> +                    self._retry_connect = False
> +                    retval = errno.ENOENT
> +        else:
> +            # Windows only, if retry_connect is false, it means it's already
> +            # connected so we can set the value of retval to 0
> +            retval = 0
> +
>          if retval == 0:
>              self.state = Stream.__S_CONNECTED
>          elif retval != errno.EAGAIN:
> @@ -209,11 +300,63 @@ class Stream(object):
>          elif n == 0:
>              return (0, "")
> 
> +        if sys.platform == 'win32' and self.socket is None:
> +            return self.__recv_windows(n)
> +
>          try:
>              return (0, self.socket.recv(n))
>          except socket.error as e:
>              return (ovs.socket_util.get_exception_errno(e), "")
> 
> +    def __recv_windows(self, n):
> +        if self._read_pending:
> +            try:
> +                nBytesRead = winutils.get_overlapped_result(self.pipe,
> +                                                            self._read,
> +                                                            False)
> +                self._read_pending = False
> +                recvBuffer = self._read_buffer[:nBytesRead]
> +
> +                return (0, winutils.get_decoded_buffer(recvBuffer))
> +            except pywintypes.error as e:
> +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> +                    # The operation is still pending, try again
> +                    self._read_pending = True
> +                    return (errno.EAGAIN, "")
> +                elif e.winerror in winutils.pipe_disconnected_errors:
> +                    # If the pipe was disconnected, return 0.
> +                    return (0, "")
> +                else:
> +                    return (errno.EINVAL, "")
> +
> +        (errCode, self._read_buffer) = winutils.read_file(self.pipe,
> +                                                          n,
> +                                                          self._read)
> +        if errCode:
> +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> +                self._read_pending = True
> +                return (errno.EAGAIN, "")
> +            elif errCode in winutils.pipe_disconnected_errors:
> +                # If the pipe was disconnected, return 0.
> +                return (0, "")
> +            else:
> +                return (errCode, "")
> +
> +        try:
> +            nBytesRead = winutils.get_overlapped_result(self.pipe,
> +                                                        self._read,
> +                                                        False)
> +            winutils.win32event.SetEvent(self._read.hEvent)
> +        except pywintypes.error as e:
> +            if e.winerror in winutils.pipe_disconnected_errors:
> +                # If the pipe was disconnected, return 0.
> +                return (0, "")
> +            else:
> +                return (e.winerror, "")
> +
> +        recvBuffer = self._read_buffer[:nBytesRead]
> +        return (0, winutils.get_decoded_buffer(recvBuffer))
> +
>      def send(self, buf):
>          """Tries to send 'buf' on this stream.
> 
> @@ -231,6 +374,9 @@ class Stream(object):
>          elif len(buf) == 0:
>              return 0
> 
> +        if sys.platform == 'win32' and self.socket is None:
> +            return self.__send_windows(buf)
> +
>          try:
>              # Python 3 has separate types for strings and bytes.  We must have
>              # bytes here.
> @@ -240,6 +386,40 @@ class Stream(object):
>          except socket.error as e:
>              return -ovs.socket_util.get_exception_errno(e)
> 
> +    def __send_windows(self, buf):
> +        if self._write_pending:
> +            try:
> +                nBytesWritten = winutils.get_overlapped_result(self.pipe,
> +                                                               self._write,
> +                                                               False)
> +                self._write_pending = False
> +                return nBytesWritten
> +            except pywintypes.error as e:
> +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> +                    # The operation is still pending, try again
> +                    self._read_pending = True
> +                    return -errno.EAGAIN
> +                elif e.winerror in winutils.pipe_disconnected_errors:
> +                    # If the pipe was disconnected, return connection reset.
> +                    return -errno.ECONNRESET
> +                else:
> +                    return -errno.EINVAL
> +
> +        buf = winutils.get_encoded_buffer(buf)
> +        self._write_pending = False
> +        (errCode, nBytesWritten) = winutils.write_file(self.pipe,
> +                                                       buf,
> +                                                       self._write)
> +        if errCode:
> +            if errCode == winutils.winerror.ERROR_IO_PENDING:
> +                self._write_pending = True
> +                return -errno.EAGAIN
> +            if (not nBytesWritten and
> +                    errCode in winutils.pipe_disconnected_errors):
> +                # If the pipe was disconnected, return connection reset.
> +                return -errno.ECONNRESET
> +        return nBytesWritten
> +
>      def run(self):
>          pass
> 
> @@ -255,11 +435,52 @@ class Stream(object):
> 
>          if self.state == Stream.__S_CONNECTING:
>              wait = Stream.W_CONNECT
> +
> +        if sys.platform == 'win32':
> +            self.__wait_windows(poller, wait)
> +            return
> +
>          if wait == Stream.W_RECV:
>              poller.fd_wait(self.socket, ovs.poller.POLLIN)
>          else:
>              poller.fd_wait(self.socket, ovs.poller.POLLOUT)
> 
> +    def __wait_windows(self, poller, wait):
> +        if self.socket is not None:
> +            if wait == Stream.W_RECV:
> +                read_flags = (win32file.FD_READ |
> +                              win32file.FD_ACCEPT |
> +                              win32file.FD_CLOSE)
> +                try:
> +                    win32file.WSAEventSelect(self.socket,
> +                                             self._read.hEvent,
> +                                             read_flags)
> +                except pywintypes.error as e:
> +                    vlog.err("failed to associate events with socket: %s"
> +                             % e.strerror)
> +                poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> +            else:
> +                write_flags = (win32file.FD_WRITE |
> +                               win32file.FD_CONNECT |
> +                               win32file.FD_CLOSE)
> +                try:
> +                    win32file.WSAEventSelect(self.socket,
> +                                             self._write.hEvent,
> +                                             write_flags)
> +                except pywintypes.error as e:
> +                    vlog.err("failed to associate events with socket: %s"
> +                             % e.strerror)
> +                poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> +        else:
> +            if wait == Stream.W_RECV:
> +                if self._read:
> +                    poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> +            elif wait == Stream.W_SEND:
> +                if self._write:
> +                    poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> +            elif wait == Stream.W_CONNECT:
> +                return
> +
>      def connect_wait(self, poller):
>          self.wait(poller, Stream.W_CONNECT)
> 
> @@ -267,11 +488,22 @@ class Stream(object):
>          self.wait(poller, Stream.W_RECV)
> 
>      def send_wait(self, poller):
> +        if sys.platform == 'win32':
> +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
>          self.wait(poller, Stream.W_SEND)
> 
>      def __del__(self):
>          # Don't delete the file: we might have forked.
> -        self.socket.close()
> +        if self.socket is not None:
> +            self.socket.close()
> +        if self.pipe is not None:
> +            # Check if there are any remaining valid handles and close them
> +            if self.pipe:
> +                winutils.close_handle(self.pipe)
> +            if self._read.hEvent:
> +                winutils.close_handle(self._read.hEvent)
> +            if self._write.hEvent:
> +                winutils.close_handle(self._write.hEvent)
> 
>      @staticmethod
>      def ssl_set_private_key_file(file_name):
> @@ -287,6 +519,10 @@ class Stream(object):
> 
> 
>  class PassiveStream(object):
> +    # Windows only
> +    connect = None                  # overlapped for read operation
> +    connect_pending = False
> +
>      @staticmethod
>      def is_valid_name(name):
>          """Returns True if 'name' is a passive stream name in the form
> @@ -294,9 +530,18 @@ class PassiveStream(object):
>          "punix:" or "ptcp"), otherwise False."""
>          return name.startswith("punix:") | name.startswith("ptcp:")
> 
> -    def __init__(self, sock, name, bind_path):
> +    def __init__(self, sock, name, bind_path, pipe=None):
>          self.name = name
> +        self.pipe = pipe
>          self.socket = sock
> +        if pipe is not None:
> +            self.connect = pywintypes.OVERLAPPED()
> +            self.connect.hEvent = winutils.get_new_event(bManualReset=True)
> +            self.connect_pending = False
> +            suffix = name.split(":", 1)[1]
> +            suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> +            self._pipename = winutils.get_pipe_name(suffix)
> +
>          self.bind_path = bind_path
> 
>      @staticmethod
> @@ -315,11 +560,27 @@ class PassiveStream(object):
>          bind_path = name[6:]
>          if name.startswith("punix:"):
>              bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
> -            error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> -                                                           True, bind_path,
> -                                                           None)
> -            if error:
> -                return error, None
> +            if sys.platform != 'win32':
> +                error, sock = ovs.socket_util.make_unix_socket(
> +                    socket.SOCK_STREAM, True, bind_path, None)
> +                if error:
> +                    return error, None
> +            else:
> +                # Branch used only on Windows
> +                try:
> +                    open(bind_path, 'w').close()
> +                except:
> +                    return errno.ENOENT, None
> +
> +                pipename = winutils.get_pipe_name(bind_path)
> +                if len(pipename) > 255:
> +                    # Return invalid argument if the name is too long
> +                    return errno.ENOENT, None
> +
> +                npipe = winutils.create_named_pipe(pipename)
> +                if not npipe:
> +                    return errno.ENOENT, None
> +                return 0, PassiveStream(None, name, bind_path,
> +                                        pipe=npipe)
> 
>          elif name.startswith("ptcp:"):
>              sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> @@ -341,7 +602,11 @@ class PassiveStream(object):
> 
>      def close(self):
>          """Closes this PassiveStream."""
> -        self.socket.close()
> +        if self.socket is not None:
> +            self.socket.close()
> +        if self.pipe is not None:
> +            winutils.close_handle(self.pipe, vlog.warn)
> +            winutils.close_handle(self.connect.hEvent, vlog.warn)
>          if self.bind_path is not None:
>              ovs.fatal_signal.unlink_file_now(self.bind_path)
>              self.bind_path = None
> @@ -354,28 +619,80 @@ class PassiveStream(object):
> 
>          Will not block waiting for a connection.  If no connection is ready to
>          be accepted, returns (errno.EAGAIN, None) immediately."""
> -
> +        if sys.platform == 'win32' and self.socket is None:
> +            return self.__accept_windows()
>          while True:
>              try:
>                  sock, addr = self.socket.accept()
>                  ovs.socket_util.set_nonblocking(sock)
> -                if (sock.family == socket.AF_UNIX):
> +                if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
>                      return 0, Stream(sock, "unix:%s" % addr, 0)
>                  return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
>                                                         str(addr[1])), 0)
>              except socket.error as e:
>                  error = ovs.socket_util.get_exception_errno(e)
> +                if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> +                    # WSAEWOULDBLOCK would be the equivalent on Windows
> +                    # for EAGAIN on Unix.
> +                    error = errno.EAGAIN
>                  if error != errno.EAGAIN:
>                      # XXX rate-limit
>                      vlog.dbg("accept: %s" % os.strerror(error))
>                  return error, None
> 
> +    def __accept_windows(self):
> +        if self.connect_pending:
> +            try:
> +                winutils.get_overlapped_result(self.pipe, self.connect, False)
> +            except pywintypes.error as e:
> +                if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> +                    # The operation is still pending, try again
> +                    self.connect_pending = True
> +                    return errno.EAGAIN, None
> +                else:
> +                    if self.pipe:
> +                        win32pipe.DisconnectNamedPipe(self.pipe)
> +                    return errno.EINVAL, None
> +            self.connect_pending = False
> +
> +        error = winutils.connect_named_pipe(self.pipe, self.connect)
> +        if error:
> +            if error == winutils.winerror.ERROR_IO_PENDING:
> +                self.connect_pending = True
> +                return errno.EAGAIN, None
> +            elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
> +                if self.pipe:
> +                    win32pipe.DisconnectNamedPipe(self.pipe)
> +                self.connect_pending = False
> +                return errno.EINVAL, None
> +            else:
> +                win32event.SetEvent(self.connect.hEvent)
> +
> +        npipe = winutils.create_named_pipe(self._pipename)
> +        if not npipe:
> +            return errno.ENOENT, None
> +
> +        old_pipe = self.pipe
> +        self.pipe = npipe
> +        winutils.win32event.ResetEvent(self.connect.hEvent)
> +        return 0, Stream(None, self.name, 0, pipe=old_pipe)
> +
>      def wait(self, poller):
> -        poller.fd_wait(self.socket, ovs.poller.POLLIN)
> +        if sys.platform != 'win32' or self.socket is not None:
> +            poller.fd_wait(self.socket, ovs.poller.POLLIN)
> +        else:
> +            poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> 
>      def __del__(self):
>          # Don't delete the file: we might have forked.
> -        self.socket.close()
> +        if self.socket is not None:
> +            self.socket.close()
> +        if self.pipe is not None:
> +            # Check if there are any remaining valid handles and close them
> +            if self.pipe:
> +                winutils.close_handle(self.pipe)
> +            if self._connect.hEvent:
> +                winutils.close_handle(self._read.hEvent)
> 
> 
>  def usage(name):
> diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py
> index 8595ed8..3f3e051 100644
> --- a/python/ovs/unixctl/server.py
> +++ b/python/ovs/unixctl/server.py
> @@ -148,6 +148,10 @@ class UnixctlServer(object):
>      def run(self):
>          for _ in range(10):
>              error, stream = self._listener.accept()
> +            if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
> +                # WSAEWOULDBLOCK would be the equivalent on Windows
> +                # for EAGAIN on Unix.
> +                error = errno.EAGAIN
>              if not error:
>                  rpc = ovs.jsonrpc.Connection(stream)
>                  self._conns.append(UnixctlConnection(rpc))
> diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py
> index 18634e6..3eabcd7 100644
> --- a/tests/test-jsonrpc.py
> +++ b/tests/test-jsonrpc.py
> @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg):
> 
> 
>  def do_listen(name):
> -    error, pstream = ovs.stream.PassiveStream.open(name)
> -    if error:
> -        sys.stderr.write("could not listen on \"%s\": %s\n"
> -                         % (name, os.strerror(error)))
> -        sys.exit(1)
> +    if sys.platform != 'win32' or (
> +            ovs.daemon._detach and ovs.daemon._detached):
> +        # On Windows the child is a new process created which should be the
> +        # one that creates the PassiveStream. Without this check, the new
> +        # child process will create a new PassiveStream overwriting the one
> +        # that the parent process created.
> +        error, pstream = ovs.stream.PassiveStream.open(name)
> +        if error:
> +            sys.stderr.write("could not listen on \"%s\": %s\n"
> +                             % (name, os.strerror(error)))
> +            sys.exit(1)
> 
>      ovs.daemon.daemonize()
> 
> --
> 2.10.0.windows.1


More information about the dev mailing list