[ovs-dev] [PATCH V2 2/5] Python tests: Ported UNIX sockets to Windows
Alin Balutoiu
abalutoiu at cloudbasesolutions.com
Tue Jan 3 16:27:22 UTC 2017
Please ignore this patch, I will send another one soon.
Thanks,
Alin Balutoiu.
> -----Original Message-----
> From: Alin Balutoiu
> Sent: Tuesday, January 3, 2017 5:17 PM
> To: dev at openvswitch.org
> Cc: Alin Balutoiu <abalutoiu at cloudbasesolutions.com>; Paul Boca
> <pboca at cloudbasesolutions.com>
> Subject: [PATCH V2 2/5] Python tests: Ported UNIX sockets to Windows
>
> From: Alin Balutoiu <abalutoiu at cloudbasesolutions.com>
>
> Unix sockets (AF_UNIX) are not supported on Windows.
> The replacement of Unix sockets on Windows is implemented using named
> pipes, we are trying to mimic the behaviour of unix sockets.
>
> Instead of using Unix sockets to communicate between components Named
> Pipes are used. This makes the python sockets compatible with the Named
> Pipe used in Windows applications.
>
> Signed-off-by: Paul-Daniel Boca <pboca at cloudbasesolutions.com>
> Signed-off-by: Alin-Gheorghe Balutoiu <abalutoiu at cloudbasesolutions.com>
> ---
> V2: No changes.
> ---
> python/ovs/jsonrpc.py | 6 +
> python/ovs/poller.py | 72 ++++++---
> python/ovs/socket_util.py | 31 +++-
> python/ovs/stream.py | 351
> ++++++++++++++++++++++++++++++++++++++++---
> python/ovs/unixctl/server.py | 4 +
> tests/test-jsonrpc.py | 16 +-
> 6 files changed, 436 insertions(+), 44 deletions(-)
>
> diff --git a/python/ovs/jsonrpc.py b/python/ovs/jsonrpc.py index
> 6300c67..5a11500 100644
> --- a/python/ovs/jsonrpc.py
> +++ b/python/ovs/jsonrpc.py
> @@ -14,6 +14,7 @@
>
> import errno
> import os
> +import sys
>
> import six
>
> @@ -274,6 +275,11 @@ class Connection(object):
> except UnicodeError:
> error = errno.EILSEQ
> if error:
> + if (sys.platform == "win32" and
> + error == errno.WSAEWOULDBLOCK):
> + # WSAEWOULDBLOCK would be the equivalent on Windows
> + # for EAGAIN on Unix.
> + error = errno.EAGAIN
> if error == errno.EAGAIN:
> return error, None
> else:
> diff --git a/python/ovs/poller.py b/python/ovs/poller.py index
> d7cb7d3..d836483 100644
> --- a/python/ovs/poller.py
> +++ b/python/ovs/poller.py
> @@ -18,6 +18,10 @@ import ovs.vlog
> import select
> import socket
> import os
> +import sys
> +
> +if sys.platform == "win32":
> + import ovs.winutils as winutils
>
> try:
> from OpenSSL import SSL
> @@ -62,7 +66,9 @@ class _SelectSelect(object):
> if SSL and isinstance(fd, SSL.Connection):
> fd = fd.fileno()
>
> - assert isinstance(fd, int)
> + if sys.platform != 'win32':
> + # Skip this on Windows, it also register events
> + assert isinstance(fd, int)
> if events & POLLIN:
> self.rlist.append(fd)
> events &= ~POLLIN
> @@ -73,28 +79,58 @@ class _SelectSelect(object):
> self.xlist.append(fd)
>
> def poll(self, timeout):
> - if timeout == -1:
> - # epoll uses -1 for infinite timeout, select uses None.
> - timeout = None
> - else:
> - timeout = float(timeout) / 1000
> # XXX workaround a bug in eventlet
> # see https://github.com/eventlet/eventlet/pull/25
> if timeout == 0 and _using_eventlet_green_select():
> timeout = 0.1
> + if sys.platform == 'win32':
> + events = self.rlist + self.wlist + self.xlist
> + if not events:
> + return []
> + if len(events) > winutils.win32event.MAXIMUM_WAIT_OBJECTS:
> + raise WindowsError("Cannot handle more than maximum wait"
> + "objects\n")
> +
> + # win32event.INFINITE timeout is -1
> + # timeout must be an int number, expressed in ms
> + if timeout == 0.1:
> + timeout = 100
> + else:
> + timeout = int(timeout)
> +
> + # Wait until any of the events is set to signaled
> + try:
> + retval = winutils.win32event.WaitForMultipleObjects(
> + events,
> + False, # Wait all
> + timeout)
> + except winutils.pywintypes.error:
> + return [(0, POLLERR)]
>
> - rlist, wlist, xlist = select.select(self.rlist, self.wlist, self.xlist,
> - timeout)
> - events_dict = {}
> - for fd in rlist:
> - events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> - for fd in wlist:
> - events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> - for fd in xlist:
> - events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> - POLLHUP |
> - POLLNVAL)
> - return list(events_dict.items())
> + if retval == winutils.winerror.WAIT_TIMEOUT:
> + return []
> +
> + return [(events[retval], 0)]
> + else:
> + if timeout == -1:
> + # epoll uses -1 for infinite timeout, select uses None.
> + timeout = None
> + else:
> + timeout = float(timeout) / 1000
> + rlist, wlist, xlist = select.select(self.rlist,
> + self.wlist,
> + self.xlist,
> + timeout)
> + events_dict = {}
> + for fd in rlist:
> + events_dict[fd] = events_dict.get(fd, 0) | POLLIN
> + for fd in wlist:
> + events_dict[fd] = events_dict.get(fd, 0) | POLLOUT
> + for fd in xlist:
> + events_dict[fd] = events_dict.get(fd, 0) | (POLLERR |
> + POLLHUP |
> + POLLNVAL)
> + return list(events_dict.items())
>
>
> SelectPoll = _SelectSelect
> diff --git a/python/ovs/socket_util.py b/python/ovs/socket_util.py index
> b358b05..fb6cee4 100644
> --- a/python/ovs/socket_util.py
> +++ b/python/ovs/socket_util.py
> @@ -17,6 +17,7 @@ import os
> import os.path
> import random
> import socket
> +import sys
>
> import six
> from six.moves import range
> @@ -25,6 +26,10 @@ import ovs.fatal_signal import ovs.poller import
> ovs.vlog
>
> +if sys.platform == 'win32':
> + import ovs.winutils as winutils
> + import win32file
> +
> vlog = ovs.vlog.Vlog("socket_util")
>
>
> @@ -158,7 +163,17 @@ def make_unix_socket(style, nonblock, bind_path,
> connect_path, short=False):
>
> def check_connection_completion(sock):
> p = ovs.poller.SelectPoll()
> - p.register(sock, ovs.poller.POLLOUT)
> + if sys.platform == "win32":
> + event = winutils.get_new_event(None, False, True, None)
> + # Receive notification of readiness for writing, of completed
> + # connection or multipoint join operation, and of socket closure.
> + win32file.WSAEventSelect(sock, event,
> + win32file.FD_WRITE |
> + win32file.FD_CONNECT |
> + win32file.FD_CLOSE)
> + p.register(event, ovs.poller.POLLOUT)
> + else:
> + p.register(sock, ovs.poller.POLLOUT)
> pfds = p.poll(0)
> if len(pfds) == 1:
> revents = pfds[0][1]
> @@ -228,7 +243,12 @@ def inet_open_active(style, target, default_port,
> dscp):
> try:
> sock.connect(address)
> except socket.error as e:
> - if get_exception_errno(e) != errno.EINPROGRESS:
> + error = get_exception_errno(e)
> + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> + # WSAEWOULDBLOCK would be the equivalent on Windows
> + # for EINPROGRESS on Unix.
> + error = errno.EINPROGRESS
> + if error != errno.EINPROGRESS:
> raise
> return 0, sock
> except socket.error as e:
> @@ -257,9 +277,12 @@ def get_null_fd():
> global null_fd
> if null_fd < 0:
> try:
> - null_fd = os.open("/dev/null", os.O_RDWR)
> + # os.devnull ensures compatibility with Windows, returns
> + # '/dev/null' for Unix and 'nul' for Windows
> + null_fd = os.open(os.devnull, os.O_RDWR)
> except OSError as e:
> - vlog.err("could not open /dev/null: %s" % os.strerror(e.errno))
> + vlog.err("could not open %s: %s" % (os.devnull,
> + os.strerror(e.errno)))
> return -e.errno
> return null_fd
>
> diff --git a/python/ovs/stream.py b/python/ovs/stream.py index
> cd57eb3..e8e5700 100644
> --- a/python/ovs/stream.py
> +++ b/python/ovs/stream.py
> @@ -15,6 +15,7 @@
> import errno
> import os
> import socket
> +import sys
>
> import six
>
> @@ -27,6 +28,13 @@ try:
> except ImportError:
> SSL = None
>
> +if sys.platform == 'win32':
> + import ovs.winutils as winutils
> + import pywintypes
> + import win32event
> + import win32file
> + import win32pipe
> +
> vlog = ovs.vlog.Vlog("stream")
>
>
> @@ -63,6 +71,13 @@ class Stream(object):
> _SSL_certificate_file = None
> _SSL_ca_cert_file = None
>
> + # Windows only
> + _write = None # overlapped for write operation
> + _read = None # overlapped for read operation
> + _write_pending = False
> + _read_pending = False
> + _retry_connect = False
> +
> @staticmethod
> def register_method(method, cls):
> Stream._SOCKET_METHODS[method + ":"] = cls @@ -81,8 +96,23 @@
> class Stream(object):
> otherwise False."""
> return bool(Stream._find_method(name))
>
> - def __init__(self, socket, name, status):
> + def __init__(self, socket, name, status, pipe=None, is_server=False):
> self.socket = socket
> + self.pipe = pipe
> + if sys.platform == 'win32':
> + self._read = pywintypes.OVERLAPPED()
> + self._read.hEvent = winutils.get_new_event()
> + self._write = pywintypes.OVERLAPPED()
> + self._write.hEvent = winutils.get_new_event()
> + if pipe is not None:
> + # Flag to check if fd is a server HANDLE. In the case of a
> + # server handle we have to issue a disconnect before closing
> + # the actual handle.
> + self._server = is_server
> + suffix = name.split(":", 1)[1]
> + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> + self._pipename = winutils.get_pipe_name(suffix)
> +
> self.name = name
> if status == errno.EAGAIN:
> self.state = Stream.__S_CONNECTING @@ -120,6 +150,38 @@ class
> Stream(object):
> suffix = name.split(":", 1)[1]
> if name.startswith("unix:"):
> suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> + if sys.platform == 'win32':
> + pipename = winutils.get_pipe_name(suffix)
> +
> + if len(suffix) > 255:
> + # Return invalid argument if the name is too long
> + return errno.ENOENT, None
> +
> + try:
> + # In case of "unix:" argument, the assumption is that
> + # there is a file created in the path (suffix).
> + open(suffix, 'r').close()
> + except:
> + return errno.ENOENT, None
> +
> + try:
> + npipe = winutils.create_file(pipename)
> + try:
> + winutils.set_pipe_mode(npipe,
> + win32pipe.PIPE_READMODE_BYTE)
> + except pywintypes.error as e:
> + return errno.ENOENT, None
> + except pywintypes.error as e:
> + if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> + # Pipe is busy, set the retry flag to true and retry
> + # again during the connect function.
> + Stream.retry_connect = True
> + return 0, cls(None, name, errno.EAGAIN,
> + pipe=win32file.INVALID_HANDLE_VALUE,
> + is_server=False)
> + return errno.ENOENT, None
> + return 0, cls(None, name, 0, pipe=npipe,
> + is_server=False)
> +
> error, sock = cls._open(suffix, dscp)
> if error:
> return error, None
> @@ -145,6 +207,10 @@ class Stream(object):
> if not error:
> while True:
> error = stream.connect()
> + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> + # WSAEWOULDBLOCK would be the equivalent on Windows
> + # for EAGAIN on Unix.
> + error = errno.EAGAIN
> if error != errno.EAGAIN:
> break
> stream.run()
> @@ -152,7 +218,8 @@ class Stream(object):
> stream.run_wait(poller)
> stream.connect_wait(poller)
> poller.block()
> - assert error != errno.EINPROGRESS
> + if stream.socket is not None:
> + assert error != errno.EINPROGRESS
>
> if error and stream:
> stream.close()
> @@ -160,11 +227,35 @@ class Stream(object):
> return error, stream
>
> def close(self):
> - self.socket.close()
> + if self.socket is not None:
> + self.socket.close()
> + if self.pipe is not None:
> + if self._server:
> + win32pipe.DisconnectNamedPipe(self.pipe)
> + winutils.close_handle(self.pipe, vlog.warn)
> + winutils.close_handle(self._read.hEvent, vlog.warn)
> + winutils.close_handle(self._write.hEvent, vlog.warn)
>
> def __scs_connecting(self):
> - retval = ovs.socket_util.check_connection_completion(self.socket)
> - assert retval != errno.EINPROGRESS
> + if self.socket is not None:
> + retval = ovs.socket_util.check_connection_completion(self.socket)
> + assert retval != errno.EINPROGRESS
> + elif sys.platform == 'win32' and self.retry_connect:
> + try:
> + self.pipe = winutils.create_file(self._pipename)
> + self._retry_connect = False
> + retval = 0
> + except pywintypes.error as e:
> + if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
> + retval = errno.EAGAIN
> + else:
> + self._retry_connect = False
> + retval = errno.ENOENT
> + else:
> + # Windows only, if retry_connect is false, it means it's already
> + # connected so we can set the value of retval to 0
> + retval = 0
> +
> if retval == 0:
> self.state = Stream.__S_CONNECTED
> elif retval != errno.EAGAIN:
> @@ -209,11 +300,63 @@ class Stream(object):
> elif n == 0:
> return (0, "")
>
> + if sys.platform == 'win32' and self.socket is None:
> + return self.__recv_windows(n)
> +
> try:
> return (0, self.socket.recv(n))
> except socket.error as e:
> return (ovs.socket_util.get_exception_errno(e), "")
>
> + def __recv_windows(self, n):
> + if self._read_pending:
> + try:
> + nBytesRead = winutils.get_overlapped_result(self.pipe,
> + self._read,
> + False)
> + self._read_pending = False
> + recvBuffer = self._read_buffer[:nBytesRead]
> +
> + return (0, winutils.get_decoded_buffer(recvBuffer))
> + except pywintypes.error as e:
> + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> + # The operation is still pending, try again
> + self._read_pending = True
> + return (errno.EAGAIN, "")
> + elif e.winerror in winutils.pipe_disconnected_errors:
> + # If the pipe was disconnected, return 0.
> + return (0, "")
> + else:
> + return (errno.EINVAL, "")
> +
> + (errCode, self._read_buffer) = winutils.read_file(self.pipe,
> + n,
> + self._read)
> + if errCode:
> + if errCode == winutils.winerror.ERROR_IO_PENDING:
> + self._read_pending = True
> + return (errno.EAGAIN, "")
> + elif errCode in winutils.pipe_disconnected_errors:
> + # If the pipe was disconnected, return 0.
> + return (0, "")
> + else:
> + return (errCode, "")
> +
> + try:
> + nBytesRead = winutils.get_overlapped_result(self.pipe,
> + self._read,
> + False)
> + winutils.win32event.SetEvent(self._read.hEvent)
> + except pywintypes.error as e:
> + if e.winerror in winutils.pipe_disconnected_errors:
> + # If the pipe was disconnected, return 0.
> + return (0, "")
> + else:
> + return (e.winerror, "")
> +
> + recvBuffer = self._read_buffer[:nBytesRead]
> + return (0, winutils.get_decoded_buffer(recvBuffer))
> +
> def send(self, buf):
> """Tries to send 'buf' on this stream.
>
> @@ -231,6 +374,9 @@ class Stream(object):
> elif len(buf) == 0:
> return 0
>
> + if sys.platform == 'win32' and self.socket is None:
> + return self.__send_windows(buf)
> +
> try:
> # Python 3 has separate types for strings and bytes. We must have
> # bytes here.
> @@ -240,6 +386,40 @@ class Stream(object):
> except socket.error as e:
> return -ovs.socket_util.get_exception_errno(e)
>
> + def __send_windows(self, buf):
> + if self._write_pending:
> + try:
> + nBytesWritten = winutils.get_overlapped_result(self.pipe,
> + self._write,
> + False)
> + self._write_pending = False
> + return nBytesWritten
> + except pywintypes.error as e:
> + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> + # The operation is still pending, try again
> + self._read_pending = True
> + return -errno.EAGAIN
> + elif e.winerror in winutils.pipe_disconnected_errors:
> + # If the pipe was disconnected, return connection reset.
> + return -errno.ECONNRESET
> + else:
> + return -errno.EINVAL
> +
> + buf = winutils.get_encoded_buffer(buf)
> + self._write_pending = False
> + (errCode, nBytesWritten) = winutils.write_file(self.pipe,
> + buf,
> + self._write)
> + if errCode:
> + if errCode == winutils.winerror.ERROR_IO_PENDING:
> + self._write_pending = True
> + return -errno.EAGAIN
> + if (not nBytesWritten and
> + errCode in winutils.pipe_disconnected_errors):
> + # If the pipe was disconnected, return connection reset.
> + return -errno.ECONNRESET
> + return nBytesWritten
> +
> def run(self):
> pass
>
> @@ -255,11 +435,52 @@ class Stream(object):
>
> if self.state == Stream.__S_CONNECTING:
> wait = Stream.W_CONNECT
> +
> + if sys.platform == 'win32':
> + self.__wait_windows(poller, wait)
> + return
> +
> if wait == Stream.W_RECV:
> poller.fd_wait(self.socket, ovs.poller.POLLIN)
> else:
> poller.fd_wait(self.socket, ovs.poller.POLLOUT)
>
> + def __wait_windows(self, poller, wait):
> + if self.socket is not None:
> + if wait == Stream.W_RECV:
> + read_flags = (win32file.FD_READ |
> + win32file.FD_ACCEPT |
> + win32file.FD_CLOSE)
> + try:
> + win32file.WSAEventSelect(self.socket,
> + self._read.hEvent,
> + read_flags)
> + except pywintypes.error as e:
> + vlog.err("failed to associate events with socket: %s"
> + % e.strerror)
> + poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> + else:
> + write_flags = (win32file.FD_WRITE |
> + win32file.FD_CONNECT |
> + win32file.FD_CLOSE)
> + try:
> + win32file.WSAEventSelect(self.socket,
> + self._write.hEvent,
> + write_flags)
> + except pywintypes.error as e:
> + vlog.err("failed to associate events with socket: %s"
> + % e.strerror)
> + poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> + else:
> + if wait == Stream.W_RECV:
> + if self._read:
> + poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
> + elif wait == Stream.W_SEND:
> + if self._write:
> + poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
> + elif wait == Stream.W_CONNECT:
> + return
> +
> def connect_wait(self, poller):
> self.wait(poller, Stream.W_CONNECT)
>
> @@ -267,11 +488,22 @@ class Stream(object):
> self.wait(poller, Stream.W_RECV)
>
> def send_wait(self, poller):
> + if sys.platform == 'win32':
> + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
> self.wait(poller, Stream.W_SEND)
>
> def __del__(self):
> # Don't delete the file: we might have forked.
> - self.socket.close()
> + if self.socket is not None:
> + self.socket.close()
> + if self.pipe is not None:
> + # Check if there are any remaining valid handles and close them
> + if self.pipe:
> + winutils.close_handle(self.pipe)
> + if self._read.hEvent:
> + winutils.close_handle(self._read.hEvent)
> + if self._write.hEvent:
> + winutils.close_handle(self._write.hEvent)
>
> @staticmethod
> def ssl_set_private_key_file(file_name):
> @@ -287,6 +519,10 @@ class Stream(object):
>
>
> class PassiveStream(object):
> + # Windows only
> + connect = None # overlapped for read operation
> + connect_pending = False
> +
> @staticmethod
> def is_valid_name(name):
> """Returns True if 'name' is a passive stream name in the form @@ -
> 294,9 +530,18 @@ class PassiveStream(object):
> "punix:" or "ptcp"), otherwise False."""
> return name.startswith("punix:") | name.startswith("ptcp:")
>
> - def __init__(self, sock, name, bind_path):
> + def __init__(self, sock, name, bind_path, pipe=None):
> self.name = name
> + self.pipe = pipe
> self.socket = sock
> + if pipe is not None:
> + self.connect = pywintypes.OVERLAPPED()
> + self.connect.hEvent = winutils.get_new_event(bManualReset=True)
> + self.connect_pending = False
> + suffix = name.split(":", 1)[1]
> + suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
> + self._pipename = winutils.get_pipe_name(suffix)
> +
> self.bind_path = bind_path
>
> @staticmethod
> @@ -315,11 +560,27 @@ class PassiveStream(object):
> bind_path = name[6:]
> if name.startswith("punix:"):
> bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
> - error, sock =
> ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
> - True, bind_path,
> - None)
> - if error:
> - return error, None
> + if sys.platform != 'win32':
> + error, sock = ovs.socket_util.make_unix_socket(
> + socket.SOCK_STREAM, True, bind_path, None)
> + if error:
> + return error, None
> + else:
> + # Branch used only on Windows
> + try:
> + open(bind_path, 'w').close()
> + except:
> + return errno.ENOENT, None
> +
> + pipename = winutils.get_pipe_name(bind_path)
> + if len(pipename) > 255:
> + # Return invalid argument if the name is too long
> + return errno.ENOENT, None
> +
> + npipe = winutils.create_named_pipe(pipename)
> + if not npipe:
> + return errno.ENOENT, None
> + return 0, PassiveStream(None, name, bind_path,
> + pipe=npipe)
>
> elif name.startswith("ptcp:"):
> sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -
> 341,7 +602,11 @@ class PassiveStream(object):
>
> def close(self):
> """Closes this PassiveStream."""
> - self.socket.close()
> + if self.socket is not None:
> + self.socket.close()
> + if self.pipe is not None:
> + winutils.close_handle(self.pipe, vlog.warn)
> + winutils.close_handle(self.connect.hEvent, vlog.warn)
> if self.bind_path is not None:
> ovs.fatal_signal.unlink_file_now(self.bind_path)
> self.bind_path = None
> @@ -354,28 +619,80 @@ class PassiveStream(object):
>
> Will not block waiting for a connection. If no connection is ready to
> be accepted, returns (errno.EAGAIN, None) immediately."""
> -
> + if sys.platform == 'win32' and self.socket is None:
> + return self.__accept_windows()
> while True:
> try:
> sock, addr = self.socket.accept()
> ovs.socket_util.set_nonblocking(sock)
> - if (sock.family == socket.AF_UNIX):
> + if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
> return 0, Stream(sock, "unix:%s" % addr, 0)
> return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
> str(addr[1])), 0)
> except socket.error as e:
> error = ovs.socket_util.get_exception_errno(e)
> + if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
> + # WSAEWOULDBLOCK would be the equivalent on Windows
> + # for EAGAIN on Unix.
> + error = errno.EAGAIN
> if error != errno.EAGAIN:
> # XXX rate-limit
> vlog.dbg("accept: %s" % os.strerror(error))
> return error, None
>
> + def __accept_windows(self):
> + if self.connect_pending:
> + try:
> + winutils.get_overlapped_result(self.pipe, self.connect, False)
> + except pywintypes.error as e:
> + if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
> + # The operation is still pending, try again
> + self.connect_pending = True
> + return errno.EAGAIN, None
> + else:
> + if self.pipe:
> + win32pipe.DisconnectNamedPipe(self.pipe)
> + return errno.EINVAL, None
> + self.connect_pending = False
> +
> + error = winutils.connect_named_pipe(self.pipe, self.connect)
> + if error:
> + if error == winutils.winerror.ERROR_IO_PENDING:
> + self.connect_pending = True
> + return errno.EAGAIN, None
> + elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
> + if self.pipe:
> + win32pipe.DisconnectNamedPipe(self.pipe)
> + self.connect_pending = False
> + return errno.EINVAL, None
> + else:
> + win32event.SetEvent(self.connect.hEvent)
> +
> + npipe = winutils.create_named_pipe(self._pipename)
> + if not npipe:
> + return errno.ENOENT, None
> +
> + old_pipe = self.pipe
> + self.pipe = npipe
> + winutils.win32event.ResetEvent(self.connect.hEvent)
> + return 0, Stream(None, self.name, 0, pipe=old_pipe)
> +
> def wait(self, poller):
> - poller.fd_wait(self.socket, ovs.poller.POLLIN)
> + if sys.platform != 'win32' or self.socket is not None:
> + poller.fd_wait(self.socket, ovs.poller.POLLIN)
> + else:
> + poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
>
> def __del__(self):
> # Don't delete the file: we might have forked.
> - self.socket.close()
> + if self.socket is not None:
> + self.socket.close()
> + if self.pipe is not None:
> + # Check if there are any remaining valid handles and close them
> + if self.pipe:
> + winutils.close_handle(self.pipe)
> + if self._connect.hEvent:
> + winutils.close_handle(self._read.hEvent)
>
>
> def usage(name):
> diff --git a/python/ovs/unixctl/server.py b/python/ovs/unixctl/server.py
> index 8595ed8..3f3e051 100644
> --- a/python/ovs/unixctl/server.py
> +++ b/python/ovs/unixctl/server.py
> @@ -148,6 +148,10 @@ class UnixctlServer(object):
> def run(self):
> for _ in range(10):
> error, stream = self._listener.accept()
> + if sys.platform == "win32" and error == errno.WSAEWOULDBLOCK:
> + # WSAEWOULDBLOCK would be the equivalent on Windows
> + # for EAGAIN on Unix.
> + error = errno.EAGAIN
> if not error:
> rpc = ovs.jsonrpc.Connection(stream)
> self._conns.append(UnixctlConnection(rpc))
> diff --git a/tests/test-jsonrpc.py b/tests/test-jsonrpc.py index
> 18634e6..3eabcd7 100644
> --- a/tests/test-jsonrpc.py
> +++ b/tests/test-jsonrpc.py
> @@ -53,11 +53,17 @@ def handle_rpc(rpc, msg):
>
>
> def do_listen(name):
> - error, pstream = ovs.stream.PassiveStream.open(name)
> - if error:
> - sys.stderr.write("could not listen on \"%s\": %s\n"
> - % (name, os.strerror(error)))
> - sys.exit(1)
> + if sys.platform != 'win32' or (
> + ovs.daemon._detach and ovs.daemon._detached):
> + # On Windows the child is a new process created which should be the
> + # one that creates the PassiveStream. Without this check, the new
> + # child process will create a new PassiveStream overwriting the one
> + # that the parent process created.
> + error, pstream = ovs.stream.PassiveStream.open(name)
> + if error:
> + sys.stderr.write("could not listen on \"%s\": %s\n"
> + % (name, os.strerror(error)))
> + sys.exit(1)
>
> ovs.daemon.daemonize()
>
> --
> 2.10.0.windows.1
More information about the dev
mailing list