[ovs-dev] [PATCH v2 3/4] Add file descriptor persistence where possible

anton.ivanov at cambridgegreys.com anton.ivanov at cambridgegreys.com
Fri Feb 14 17:54:28 UTC 2020


From: Anton Ivanov <anton.ivanov at cambridgegreys.com>

1. Adds "persistent" behaviour where feasible (streams and signals).
These are waited upon in the same thread where they are created. This
allows them to be registered persistently with the OS (if possible)
as well as the OS to provide hints - is the FD ready, is it closed,
etc.

2. Removes unnecessary read attempts (which would only return EAGAIN)
on a fd which is not ready, if that fd has been registered as "private"
to the thread which waits upon it.

3. No longer breaks other parts of OVS which create the fd in one
thread and wait upon it in others.

4. Adds support for EPOLL on Linux and can be expanded to cover similar
poll++ frameworks in other OSes.

5. Sets up the necessary infrastructure to make IO/SSL multi-threaded
using a "central (e)poll dispatcher + IO threads" pattern.

Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
---
 include/openvswitch/poll-loop.h |  56 +++++-
 lib/dpif-netlink.c              |   6 +-
 lib/fatal-signal.c              |   7 +-
 lib/latch-unix.c                |   3 +-
 lib/netdev-afxdp.c              |   2 +-
 lib/poll-loop.c                 | 320 ++++++++++++++++++++++++--------
 lib/route-table-bsd.c           |   1 +
 lib/stream-fd.c                 |  62 ++++++-
 lib/stream-ssl.c                |  50 ++++-
 lib/timeval.c                   |  83 +++++++++
 lib/timeval.h                   |   7 +
 11 files changed, 508 insertions(+), 89 deletions(-)

diff --git a/include/openvswitch/poll-loop.h b/include/openvswitch/poll-loop.h
index 532d9caa6..6d0331f6d 100644
--- a/include/openvswitch/poll-loop.h
+++ b/include/openvswitch/poll-loop.h
@@ -41,11 +41,30 @@
 #include <windows.h>
 #endif
 
+#ifdef __linux__
+#define OVS_USE_EPOLL
+#endif
+
+#ifdef OVS_USE_EPOLL
+#include <sys/epoll.h>
+
+#define OVS_POLLIN EPOLLIN
+#define OVS_POLLOUT EPOLLOUT
+#define OVS_POLLERR EPOLLERR
+#define OVS_POLLHUP EPOLLHUP
+#define OVS_ONESHOT EPOLLONESHOT
+#define OVS_POLLNVAL 0
+
+#else
+
 #define OVS_POLLIN POLLIN
 #define OVS_POLLOUT POLLOUT
 #define OVS_POLLERR POLLERR
 #define OVS_POLLNVAL POLLNVAL
 #define OVS_POLLHUP POLLHUP
+#define OVS_ONESHOT (1U << 30)
+
+#endif 
 
 #ifdef  __cplusplus
 extern "C" {
@@ -60,10 +79,43 @@ extern "C" {
  * the source code location of the caller.  The function version allows the
  * caller to supply a location explicitly, which is useful if the caller's own
  * caller would be more useful in log output.  See timer_wait_at() for an
- * example. */
-void poll_fd_wait_at(int fd, short int events, const char *where);
+ * example.
+ * Note - using on fds registered using poll_fd_register() will generate a
+ * warning as this is not an intended use.
+ */
+void poll_fd_wait_at(int fd, int events, const char *where);
 #define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
 
+/* Register a fd with a persistence framework if available so it can be served
+ * "faster" and the caller can be provided with "hints" on what caused the IO
+ * event.
+ * If the "hint" argument is supplied, it is set to point to the pollfd
+ * structure containing the events passed by the OS in .revents.
+ * Note - as the frameworks are OS dependent, the events are limited to what
+ * can be passed in a .revents which is a short int.
+ * Limitations - MUST BE registered from the same thread as the one where 
+ * it will be waited upon.
+ */
+
+void poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where);
+#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR)
+
+/* De-register a fd which was registered as "private" with the persistence
+ * framework
+ */
+
+void poll_fd_deregister_at(int fd, const char *where);
+#define poll_fd_deregister(fd) poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR)
+
+/* Schedule events to wake up the following poll_block() - "private fds"
+ * Same as poll_fd_wait, but for fds which have been registered and are
+ * expected to persist. If a "fast" OS fd notification framework is used
+ * this version of wait may be a NOOP (e.g. for (E)POLLIN events).
+ */
+void private_poll_fd_wait_at(int fd, int events, const char *where);
+#define private_poll_fd_wait(fd, events) private_poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
+
+
 #ifdef _WIN32
 void poll_wevent_wait_at(HANDLE wevent, const char *where);
 #define poll_wevent_wait(wevent) poll_wevent_wait_at(wevent, OVS_SOURCE_LOCATOR)
diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
index 5b5c96d72..ad5db9452 100644
--- a/lib/dpif-netlink.c
+++ b/lib/dpif-netlink.c
@@ -1289,7 +1289,7 @@ dpif_netlink_port_poll_wait(const struct dpif *dpif_)
     const struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
 
     if (dpif->port_notifier) {
-        nl_sock_wait(dpif->port_notifier, POLLIN);
+        nl_sock_wait(dpif->port_notifier, OVS_POLLIN);
     } else {
         poll_immediate_wake();
     }
@@ -2756,13 +2756,13 @@ dpif_netlink_recv_wait__(struct dpif_netlink *dpif, uint32_t handler_id)
     }
 
     for (i = 0; i < VPORT_SOCK_POOL_SIZE; i++) {
-        nl_sock_wait(sock_pool[i].nl_sock, POLLIN);
+        nl_sock_wait(sock_pool[i].nl_sock, OVS_POLLIN);
     }
 #else
     if (dpif->handlers && handler_id < dpif->n_handlers) {
         struct dpif_handler *handler = &dpif->handlers[handler_id];
 
-        poll_fd_wait(handler->epoll_fd, POLLIN);
+        poll_fd_wait(handler->epoll_fd, OVS_POLLIN);
     }
 #endif
 }
diff --git a/lib/fatal-signal.c b/lib/fatal-signal.c
index 97d8d1dab..424636e07 100644
--- a/lib/fatal-signal.c
+++ b/lib/fatal-signal.c
@@ -96,6 +96,7 @@ fatal_signal_init(void)
         ovs_mutex_init_recursive(&mutex);
 #ifndef _WIN32
         xpipe_nonblocking(signal_fds);
+        poll_fd_register(signal_fds[0], OVS_POLLIN, NULL);
 #else
         wevent = CreateEvent(NULL, TRUE, FALSE, NULL);
         if (!wevent) {
@@ -236,9 +237,12 @@ void
 fatal_signal_run(void)
 {
     sig_atomic_t sig_nr;
+    char sigbuffer[_POSIX_PIPE_BUF];
 
     fatal_signal_init();
 
+    read(signal_fds[0], sigbuffer, sizeof(sigbuffer));
+
     sig_nr = stored_sig_nr;
     if (sig_nr != SIG_ATOMIC_MAX) {
         char namebuf[SIGNAL_NAME_BUFSIZE];
@@ -271,7 +275,8 @@ fatal_signal_wait(void)
 #ifdef _WIN32
     poll_wevent_wait(wevent);
 #else
-    poll_fd_wait(signal_fds[0], OVS_POLLIN);
+    /* a noop - schedule for removal */
+    private_poll_fd_wait(signal_fds[0], OVS_POLLIN);
 #endif
 }
 
diff --git a/lib/latch-unix.c b/lib/latch-unix.c
index fea61ab28..5f15b59fe 100644
--- a/lib/latch-unix.c
+++ b/lib/latch-unix.c
@@ -83,5 +83,6 @@ latch_is_set(const struct latch *latch)
 void
 latch_wait_at(const struct latch *latch, const char *where)
 {
-    poll_fd_wait_at(latch->fds[0], OVS_POLLIN, where);
+    /* Ask for wait and make it one-shot if persistence is in play */
+    poll_fd_wait_at(latch->fds[0], OVS_POLLIN | OVS_ONESHOT, where);
 }
diff --git a/lib/netdev-afxdp.c b/lib/netdev-afxdp.c
index ef367e5ea..482400d8d 100644
--- a/lib/netdev-afxdp.c
+++ b/lib/netdev-afxdp.c
@@ -184,7 +184,7 @@ xsk_rx_wakeup_if_needed(struct xsk_umem_info *umem,
 
     if (xsk_ring_prod__needs_wakeup(&umem->fq)) {
         pfd.fd = fd;
-        pfd.events = OVS_POLLIN;
+        pfd.events = POLLIN;
 
         ret = poll(&pfd, 1, 0);
         if (OVS_UNLIKELY(ret < 0)) {
diff --git a/lib/poll-loop.c b/lib/poll-loop.c
index 3902d6c1f..10a5b0c01 100644
--- a/lib/poll-loop.c
+++ b/lib/poll-loop.c
@@ -18,6 +18,12 @@
 #include "openvswitch/poll-loop.h"
 #include <errno.h>
 #include <inttypes.h>
+#ifdef OVS_USE_EPOLL
+#include <sys/epoll.h>
+#endif
+#ifndef _WIN32
+#include <unistd.h>
+#endif
 #include <poll.h>
 #include <stdlib.h>
 #include <string.h>
@@ -31,7 +37,9 @@
 #include "timeval.h"
 #include "openvswitch/vlog.h"
 #include "openvswitch/hmap.h"
+#include "openvswitch/list.h"
 #include "hash.h"
+#include "ovs-atomic.h"
 
 VLOG_DEFINE_THIS_MODULE(poll_loop);
 
@@ -43,21 +51,32 @@ struct poll_node {
     struct pollfd pollfd;       /* Events to pass to time_poll(). */
     HANDLE wevent;              /* Events for WaitForMultipleObjects(). */
     const char *where;          /* Where poll_node was created. */
+    bool valid;                 /* Can it be used? */
+    bool private;               /* Can we assume that it is only in this thread poll loop? */
 };
 
+#define MAX_EPOLL_EVENTS 64
+
 struct poll_loop {
-    /* All active poll waiters. */
+    /* Guards 'poll_nodes' and the nodes' epoll registrations. */
+    struct ovs_mutex loop_mutex;
+    /* All poll waiters for this poll loop, looked up by fd or wevent. */
     struct hmap poll_nodes;
 
     /* Time at which to wake up the next call to poll_block(), LLONG_MIN to
      * wake up immediately, or LLONG_MAX to wait forever. */
     long long int timeout_when; /* In msecs as returned by time_msec(). */
     const char *timeout_where;  /* Where 'timeout_when' was set. */
+#ifdef OVS_USE_EPOLL
+    int epoll_fd;               /* epoll instance created in poll_loop(). */
+    struct epoll_event epoll_events[MAX_EPOLL_EVENTS]; /* Filled in by time_epoll_wait(). */
+#endif
 };
 
+
 static struct poll_loop *poll_loop(void);
 
-/* Look up the node with same fd or wevent. */
+/* Look up the node with same fd or wevent - should be called with loop->loop_mutex held. */
 static struct poll_node *
 find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
 {
@@ -76,79 +95,142 @@ find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
     }
     return NULL;
 }
-
-/* On Unix based systems:
- *
- *     Registers 'fd' as waiting for the specified 'events' (which should be
- *     OVS_POLLIN or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT).  The following call to
- *     poll_block() will wake up when 'fd' becomes ready for one or more of the
- *     requested events. The 'fd's are given to poll() function later.
- *
- * On Windows system:
+/* Registers 'fd' as waiting for the specified 'events' (which should be OVS_POLLIN
+ * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT).  The following call to poll_block() will
+ * wake up when 'fd' becomes ready for one or more of the requested events.
  *
- *     If 'fd' is specified, create a new 'wevent'. Association of 'fd' and
- *     'wevent' for 'events' happens in poll_block(). If 'wevent' is specified,
- *     it is assumed that it is unrelated to any sockets and poll_block()
- *     will wake up on any event on that 'wevent'. It is an error to pass
- *     both 'wevent' and 'fd'.
+ * The event registration is PERSISTENT. This is intended for OSes which have a persistent
+ * event framework. For now it is implemented only for epoll and Linux, other
+ * implementations such as BSD kqueue and Solaris /dev/poll may follow.
  *
- * The event registration is one-shot: only the following call to
- * poll_block() is affected.  The event will need to be re-registered after
- * poll_block() is called if it is to persist.
+ * If the OS has no persistent event framework, no OS-level registration is done.
  *
  * ('where' is used in debug logging.  Commonly one would use poll_fd_wait() to
  * automatically provide the caller's source file and line number for
  * 'where'.) */
+
 static void
-poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
+poll_fd_subscribe_at(int fd, HANDLE wevent, int events, struct pollfd **hint, const char *where, bool private)
 {
     struct poll_loop *loop = poll_loop();
     struct poll_node *node;
+#ifdef OVS_USE_EPOLL
+    struct epoll_event event;
+#endif
 
-    COVERAGE_INC(poll_create_node);
-
-    /* Both 'fd' and 'wevent' cannot be set. */
     ovs_assert(!fd != !wevent);
 
+    /* This is mostly uncontended, so the thread should grab it straight away.
+     * We will reuse it later to introduce threading for IO and SSL
+     */
+    ovs_mutex_lock(&loop->loop_mutex);
+
     /* Check for duplicate.  If found, "or" the events. */
     node = find_poll_node(loop, fd, wevent);
-    if (node) {
-        node->pollfd.events |= events;
-    } else {
-        node = xzalloc(sizeof *node);
-        hmap_insert(&loop->poll_nodes, &node->hmap_node,
-                    hash_2words(fd, (uint32_t)wevent));
-        node->pollfd.fd = fd;
-        node->pollfd.events = events;
-#ifdef _WIN32
-        if (!wevent) {
-            wevent = CreateEvent(NULL, FALSE, FALSE, NULL);
+
+    if (node && node->valid) {
+#ifdef OVS_USE_EPOLL
+        int old_event_mask = node->pollfd.events;
+#endif
+        /* If there is an existing event mask we do not need to inc - this will be waited upon */
+        node->pollfd.events |= (events & 0x0000FFFF); /* or without epoll specific bits */
+
+#ifdef OVS_USE_EPOLL
+        /* modify existing epoll entry if there is an epoll specific ask or if the
+         * mask has changed
+         */
+        if ((events & 0xFFFF0000) || (old_event_mask != node->pollfd.events)) {
+            event.events = node->pollfd.events | events | EPOLLHUP | EPOLLRDHUP;
+            event.data.ptr = node;
+            epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, fd, &event);
         }
 #endif
+    } else {
+        if (!node) {
+            node = xzalloc(sizeof *node);
+            hmap_insert(&loop->poll_nodes, &node->hmap_node,
+                        hash_2words(fd, 0));
+        } else {
+            /* Stale node: the OS has reused this fd number.  Drop the old epoll entry first. */
+#ifdef OVS_USE_EPOLL
+            epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
+#endif
+        }
+        node->pollfd.fd = fd;
+        node->pollfd.events = (events & 0x0000FFFF);
+        node->wevent = wevent;
+        node->where = where;
+        node->valid = true;
+        node->private = private;
+#ifdef OVS_USE_EPOLL
+        event.events = node->pollfd.events | EPOLLHUP | EPOLLRDHUP; /* we always listen for fd close */
+        event.data.ptr = node;
+        epoll_ctl(loop->epoll_fd, EPOLL_CTL_ADD, fd, &event);
+#endif
+    }
+    if (hint) {
+        *hint = &node->pollfd;
     }
+    ovs_mutex_unlock(&loop->loop_mutex);
+}
+
+void
+poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where) {
+    poll_fd_subscribe_at(fd, 0, events, hint, where , true);
+}
+
+/* Deregisters a fd. Note - this looks like a memory leak (deallocating only private fds)
+ * but it is not.
+ * In order to be compatible with existing calling conventions while using fd persistence
+ * where supported we have to keep "legacy" fds around for the duration of the life of
+ * the thread because we have no idea if they have been reaped properly or not.
+ * The reason for this is that for some of them the close() is in a thread different from the
+ * poll loop.
+ * Thus, the only thing we can do in this case is mark them "invalid". Once the OS reuses the
+ * same fd number, we will reuse the existing hash entry.
+ */
+
+void
+poll_fd_deregister_at(int fd, const char *where) {
+    struct poll_loop *loop = poll_loop();
+    struct poll_node *node;
+
+    VLOG(VLL_DBG, "Deregister %d from %s", fd, where);
+    ovs_mutex_lock(&loop->loop_mutex);
+    node = find_poll_node(loop, fd, 0);
+    if (node) {
+        if (node->private) {
+#ifdef OVS_USE_EPOLL
+            epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL);
+#endif
+            hmap_remove(&loop->poll_nodes, &node->hmap_node);
+            free(node); /* Any "hint" pointer into this node is dead from here on. */
+        } else {
+            VLOG(VLL_WARN, "Trying to deregister a non-private %d from %s", fd, where);
+            node->valid = false;
+        }
+    }
+    ovs_mutex_unlock(&loop->loop_mutex);
+}
+
+void
+poll_fd_wait_at(int fd, int events, const char *where)
+{
+    poll_fd_subscribe_at(fd, 0, events, NULL, where, false);
 }
 
-/* Registers 'fd' as waiting for the specified 'events' (which should be OVS_POLLIN
- * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT).  The following call to poll_block() will
- * wake up when 'fd' becomes ready for one or more of the requested events.
- *
- * On Windows, 'fd' must be a socket.
- *
- * The event registration is one-shot: only the following call to poll_block()
- * is affected.  The event will need to be re-registered after poll_block() is
- * called if it is to persist.
- *
- * ('where' is used in debug logging.  Commonly one would use poll_fd_wait() to
- * automatically provide the caller's source file and line number for
- * 'where'.) */
 void
-poll_fd_wait_at(int fd, short int events, const char *where)
+private_poll_fd_wait_at(int fd, int events, const char *where)
 {
-    poll_create_node(fd, 0, events, where);
+    /* POLLIN persists on "private" fds - either emulated or at epoll
+     * or other persistence framework level
+     */
+    if (events & (~OVS_POLLIN)) {
+        poll_fd_subscribe_at(fd, 0, events, NULL, where, true);
+    }
 }
 
+
 #ifdef _WIN32
 /* Registers for the next call to poll_block() to wake up when 'wevent' is
  * signaled.
@@ -163,7 +245,7 @@ poll_fd_wait_at(int fd, short int events, const char *where)
 void
 poll_wevent_wait_at(HANDLE wevent, const char *where)
 {
-    poll_create_node(0, wevent, 0, where);
+    poll_fd_subscribe_at(0, wevent, 0, NULL, where, false);
 }
 #endif /* _WIN32 */
 
@@ -277,9 +359,12 @@ log_wakeup(const char *where, const struct pollfd *pollfd, int timeout)
         if (pollfd->revents & OVS_POLLHUP) {
             ds_put_cstr(&s, "[OVS_POLLHUP]");
         }
+#ifndef OVS_USE_EPOLL
+        /* epoll does not have NVAL - it uses RDHUP and HUP which we cannot actually get to here*/
         if (pollfd->revents & OVS_POLLNVAL) {
             ds_put_cstr(&s, "[OVS_POLLNVAL]");
         }
+#endif
         ds_put_format(&s, " on fd %d (%s)", pollfd->fd, description);
         free(description);
     } else {
@@ -295,12 +380,17 @@ log_wakeup(const char *where, const struct pollfd *pollfd, int timeout)
     ds_destroy(&s);
 }
 
+
 static void
 free_poll_nodes(struct poll_loop *loop)
 {
     struct poll_node *node, *next;
 
+    ovs_mutex_lock(&loop->loop_mutex);
     HMAP_FOR_EACH_SAFE (node, next, hmap_node, &loop->poll_nodes) {
+#ifdef OVS_USE_EPOLL
+        epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL);
+#endif
         hmap_remove(&loop->poll_nodes, &node->hmap_node);
 #ifdef _WIN32
         if (node->wevent && node->pollfd.fd) {
@@ -310,6 +400,7 @@ free_poll_nodes(struct poll_loop *loop)
 #endif
         free(node);
     }
+    ovs_mutex_unlock(&loop->loop_mutex);
 }
 
 /* Blocks until one or more of the events registered with poll_fd_wait()
@@ -320,8 +411,13 @@ poll_block(void)
 {
     struct poll_loop *loop = poll_loop();
     struct poll_node *node;
+#ifndef OVS_USE_EPOLL
     struct pollfd *pollfds;
+#endif
+#ifndef OVS_USE_EPOLL
     HANDLE *wevents = NULL;
+    int counter;
+#endif
     int elapsed;
     int retval;
     int i;
@@ -335,54 +431,126 @@ poll_block(void)
     }
 
     timewarp_run();
-    pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
 
+#ifdef OVS_USE_EPOLL
+    retval = time_epoll_wait(loop->epoll_fd,
+        (struct epoll_event *) &loop->epoll_events, MAX_EPOLL_EVENTS, loop->timeout_when, &elapsed);
+    if (retval < 0) {
+        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+        VLOG_ERR_RL(&rl, "epoll: %s", ovs_strerror(retval));
+    } else if (!retval) {
+        log_wakeup(loop->timeout_where, NULL, elapsed);
+    } else {
+        ovs_mutex_lock(&loop->loop_mutex);
+        if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
+            for (i = 0; i < retval; i++) {
+                node = (struct poll_node *) loop->epoll_events[i].data.ptr;
+                if (loop->epoll_events[i].events) {
+                    node->pollfd.revents = loop->epoll_events[i].events;
+                    log_wakeup(node->where, &node->pollfd, 0);
+                }
+            }
+        }
+        for (i = 0; i < retval; i++) {
+            node = (struct poll_node *) loop->epoll_events[i].data.ptr;
+            if (loop->epoll_events[i].events & EPOLLHUP) {
+                /* File descriptor closed already elsewhere
+                 * We have to make the assumption that whoever closed it has
+                 * ensured that anything which refers to IO event hints will not run
+                 * on this fd after we free it.
+                 */
+                node->valid = false;
+            }
+            if (loop->epoll_events[i].events) {
+                node->pollfd.revents |= (loop->epoll_events[i].events & 0x0000FFFF);
+            }
+            if (loop->epoll_events[i].events & OVS_POLLOUT) {
+                struct epoll_event event;
+                node->pollfd.events = OVS_POLLIN; /* reset back to defaults - write needs one shot */
+                event.events = node->pollfd.events;
+                event.data.ptr = node;
+                epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, node->pollfd.fd, &event);
+            }
+        }
+        ovs_mutex_unlock(&loop->loop_mutex);
+    }
+#else
+    pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
 #ifdef _WIN32
     wevents = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *wevents);
 #endif
 
+
     /* Populate with all the fds and events. */
-    i = 0;
+    counter = 0;
     HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
-        pollfds[i] = node->pollfd;
+        if ((node->valid) && (node->pollfd.events)) {
+            pollfds[counter] = node->pollfd;
 #ifdef _WIN32
-        wevents[i] = node->wevent;
-        if (node->pollfd.fd && node->wevent) {
-            short int wsa_events = 0;
-            if (node->pollfd.events & OVS_POLLIN) {
-                wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
+            wevents[counter] = node->wevent;
+            if (node->pollfd.fd && node->wevent) {
+                short int wsa_events = 0;
+                if (node->pollfd.events & OVS_POLLIN) {
+                    wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
+                }
+                if (node->pollfd.events & OVS_POLLOUT) {
+                    wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
+                }
+                WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
             }
-            if (node->pollfd.events & OVS_POLLOUT) {
-                wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
-            }
-            WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
-        }
 #endif
-        i++;
+            counter++;
+        }
     }
 
-    retval = time_poll(pollfds, hmap_count(&loop->poll_nodes), wevents,
+    retval = time_poll(pollfds, counter, wevents,
                        loop->timeout_when, &elapsed);
     if (retval < 0) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
         VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval));
-    } else if (!retval) {
+    } else if (retval == 0) {
         log_wakeup(loop->timeout_where, NULL, elapsed);
-    } else if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
-        i = 0;
-        HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
+    } else {
+        for (i = 0; i < counter; i++) {
             if (pollfds[i].revents) {
-                log_wakeup(node->where, &pollfds[i], 0);
+
+                node = find_poll_node(loop, pollfds[i].fd, 0);
+
+                if (!node) {
+                    VLOG_FATAL("poll: persistence state corrupted, no hash entry for %d", pollfds[i].fd);
+                }
+                if (pollfds[i].revents & (OVS_POLLHUP | OVS_POLLNVAL)) {
+                    node->valid = false;
+                }
+
+                if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
+                    log_wakeup(node->where, &pollfds[i], 0);
+                }
+                /* update "requested" events. 
+                 * Note - "private" fds always want POLLIN - that emulates EPOLL, /dev/poll, etc
+                 * behaviour which they should be using in real life instead of using poll()
+                 */
+                if (node->private) {
+                    node->pollfd.events &= ~(pollfds[i].revents & (~OVS_POLLIN));
+                } else {
+                    node->pollfd.events &= ~pollfds[i].revents;
+                }
+                /* update "occurred" events for use by streams and handlers. In case there
+                 * is an existing (but not consumed yet) event, we OR the events in the
+                 * stored record with the new ones - it is the job of the stream to clear
+                 * that.
+                 */
+                node->pollfd.revents |= pollfds[i].revents;
             }
-            i++;
         }
     }
 
-    free_poll_nodes(loop);
+    free(pollfds);
+    if (wevents)
+        free(wevents);
+#endif
     loop->timeout_when = LLONG_MAX;
     loop->timeout_where = NULL;
-    free(pollfds);
-    free(wevents);
 
     /* Handle any pending signals before doing anything else. */
     fatal_signal_run();
@@ -416,8 +584,12 @@ poll_loop(void)
     if (!loop) {
         loop = xzalloc(sizeof *loop);
         loop->timeout_when = LLONG_MAX;
+        ovs_mutex_init(&loop->loop_mutex);
         hmap_init(&loop->poll_nodes);
         xpthread_setspecific(key, loop);
+#ifdef OVS_USE_EPOLL
+        loop->epoll_fd = epoll_create(MAX_EPOLL_EVENTS);
+#endif
     }
     return loop;
 }
diff --git a/lib/route-table-bsd.c b/lib/route-table-bsd.c
index 3dfa80c7f..16d155989 100644
--- a/lib/route-table-bsd.c
+++ b/lib/route-table-bsd.c
@@ -34,6 +34,7 @@
 #include "ovs-router.h"
 #include "packets.h"
 #include "openvswitch/vlog.h"
+#include "openvswitch/poll-loop.h"
 #include "util.h"
 
 VLOG_DEFINE_THIS_MODULE(route_table_bsd);
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 62f768d45..6a80d6e05 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -40,6 +40,8 @@ struct stream_fd
     struct stream stream;
     int fd;
     int fd_type;
+    bool rx_ready, tx_ready;
+    struct pollfd *hint;
 };
 
 static const struct stream_class stream_fd_class;
@@ -67,7 +69,14 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
     stream_init(&s->stream, &stream_fd_class, connect_status, name);
     s->fd = fd;
     s->fd_type = fd_type;
+    s->rx_ready = true;
+    s->tx_ready = true;
+    s->hint = NULL;
     *streamp = &s->stream;
+    /* Persistent registration - we always get POLLINs from now on,
+     * POLLOUTs when we ask for them
+     */
+    poll_fd_register(s->fd, OVS_POLLIN, &s->hint);
     return 0;
 }
 
@@ -82,6 +91,8 @@ static void
 fd_close(struct stream *stream)
 {
     struct stream_fd *s = stream_fd_cast(stream);
+    /* Deregister the FD from any persistent registrations if supported */
+    poll_fd_deregister(s->fd);
     closesocket(s->fd);
     free(s);
 }
@@ -104,6 +115,24 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
     ssize_t retval;
     int error;
 
+    if (s->hint) {
+        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we clear it and
+         * skip straight to the read, which should return 0 if the HUP is a real one.
+         * For all other cases we believe what (e)poll has fed us.
+         */
+        if ((!(s->hint->revents & (OVS_POLLHUP|OVS_POLLNVAL))) && (!s->rx_ready)) {
+            if (!(s->hint->revents & OVS_POLLIN)) {
+                return -EAGAIN;
+            } else {
+                /* POLLIN event from poll loop, mark us as ready */
+                s->rx_ready = true;
+                s->hint->revents &= ~OVS_POLLIN;
+            }
+        } else {
+            s->hint->revents &= ~(OVS_POLLHUP|OVS_POLLNVAL);
+        }
+    }
+
     retval = recv(s->fd, buffer, n, 0);
     if (retval < 0) {
         error = sock_errno();
@@ -114,6 +143,8 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
 #endif
         if (error != EAGAIN) {
             VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
+        } else {
+            s->rx_ready = false;
         }
         return -error;
     }
@@ -127,9 +158,29 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
     ssize_t retval;
     int error;
 
+    if (s->hint) {
+        /* poll-loop is providing us with hints for IO */
+        if (!s->tx_ready) {
+            if (!(s->hint->revents & OVS_POLLOUT)) {
+                return -EAGAIN;
+            } else {
+                /* POLLOUT event from poll loop, mark us as ready */
+                s->tx_ready = true;
+                s->hint->revents &= ~OVS_POLLOUT;
+            }
+        }
+    }
     retval = send(s->fd, buffer, n, 0);
     if (retval < 0) {
         error = sock_errno();
+#ifdef __linux__
+        /* Linux will sometimes return ENOBUFS on sockets instead of EAGAIN. Usually seen
+         *  on unix domain sockets 
+         */
+        if (error == ENOBUFS) {
+           error = EAGAIN;
+        }
+#endif
 #ifdef _WIN32
         if (error == WSAEWOULDBLOCK) {
            error = EAGAIN;
@@ -137,6 +188,8 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
 #endif
         if (error != EAGAIN) {
             VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error));
+        } else {
+            s->tx_ready = false;
         }
         return -error;
     }
@@ -150,11 +203,11 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
     switch (wait) {
     case STREAM_CONNECT:
     case STREAM_SEND:
-        poll_fd_wait(s->fd, OVS_POLLOUT);
+        private_poll_fd_wait(s->fd, OVS_POLLOUT);
         break;
 
     case STREAM_RECV:
-        poll_fd_wait(s->fd, OVS_POLLIN);
+        private_poll_fd_wait(s->fd, OVS_POLLIN);
         break;
 
     default:
@@ -223,6 +276,8 @@ new_fd_pstream(char *name, int fd,
     ps->accept_cb = accept_cb;
     ps->unlink_path = unlink_path;
     *pstreamp = &ps->pstream;
+    /* persistent registration */
+    poll_fd_register(ps->fd, OVS_POLLIN, NULL);
     return 0;
 }
 
@@ -230,6 +285,7 @@ static void
 pfd_close(struct pstream *pstream)
 {
     struct fd_pstream *ps = fd_pstream_cast(pstream);
+    poll_fd_deregister(ps->fd);
     closesocket(ps->fd);
     maybe_unlink_and_free(ps->unlink_path);
     free(ps);
@@ -271,7 +327,7 @@ static void
 pfd_wait(struct pstream *pstream)
 {
     struct fd_pstream *ps = fd_pstream_cast(pstream);
-    poll_fd_wait(ps->fd, OVS_POLLIN);
+    private_poll_fd_wait(ps->fd, OVS_POLLIN);
 }
 
 static const struct pstream_class fd_pstream_class = {
diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
index 3b7f9865e..53ae51c1b 100644
--- a/lib/stream-ssl.c
+++ b/lib/stream-ssl.c
@@ -147,6 +147,7 @@ struct ssl_stream
     /* A few bytes of header data in case SSL negotiation fails. */
     uint8_t head[2];
     short int n_head;
+    struct pollfd *hint;
 };
 
 /* SSL context created by ssl_init(). */
@@ -310,6 +311,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
         SSL_set_msg_callback_arg(ssl, sslv);
     }
 
+
+    poll_fd_register(sslv->fd, OVS_POLLIN, &sslv->hint);
     *streamp = &sslv->stream;
     free(server_name);
     return 0;
@@ -604,6 +607,7 @@ ssl_close(struct stream *stream)
     ERR_clear_error();
 
     SSL_free(sslv->ssl);
+    poll_fd_deregister(sslv->fd);
     closesocket(sslv->fd);
     free(sslv);
 }
@@ -697,6 +701,27 @@ ssl_recv(struct stream *stream, void *buffer, size_t n)
     /* Behavior of zero-byte SSL_read is poorly defined. */
     ovs_assert(n > 0);
 
+     if (sslv->hint) {
+        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we clear it and
+         * skip straight to the read, which should return 0 if the HUP is a real one.
+         * For all other cases we believe what (e)poll has fed us.
+         */
+        if ((!(sslv->hint->revents & (OVS_POLLHUP|OVS_POLLNVAL))) && (sslv->rx_want == SSL_READING)) {
+            if (!(sslv->hint->revents & OVS_POLLIN)) {
+                return -EAGAIN;
+            } else {
+                /* POLLIN event from poll loop, mark us as ready 
+                 * rx_want is cleared further down by reading ssl fsm
+                 */
+                sslv->hint->revents &= ~OVS_POLLIN;
+            }
+        } else {
+            sslv->hint->revents &= ~(OVS_POLLHUP|OVS_POLLNVAL);
+        }
+    }
+
+
+
     old_state = SSL_get_state(sslv->ssl);
     ret = SSL_read(sslv->ssl, buffer, n);
     if (old_state != SSL_get_state(sslv->ssl)) {
@@ -729,6 +754,19 @@ ssl_do_tx(struct stream *stream)
 {
     struct ssl_stream *sslv = ssl_stream_cast(stream);
 
+     if (sslv->hint) {
+        /* poll-loop is providing us with hints for IO */
+        if (sslv->tx_want == SSL_WRITING) {
+            if (!(sslv->hint->revents & OVS_POLLOUT)) {
+                return EAGAIN;
+            } else {
+                /* POLLOUT event from poll loop, mark us as ready.
+                 * tx_want should then be reset by the SSL_write() loop below.
+                 */
+                sslv->hint->revents &= ~OVS_POLLOUT;
+            }
+        }
+    }
     for (;;) {
         int old_state = SSL_get_state(sslv->ssl);
         int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
@@ -771,6 +809,8 @@ ssl_send(struct stream *stream, const void *buffer, size_t n)
             ssl_clear_txbuf(sslv);
             return n;
         case EAGAIN:
+            /* we want to know when this fd will become available again */
+            stream_send_wait(stream);
             return n;
         default:
             ssl_clear_txbuf(sslv);
@@ -795,7 +835,7 @@ ssl_run_wait(struct stream *stream)
     struct ssl_stream *sslv = ssl_stream_cast(stream);
 
     if (sslv->tx_want != SSL_NOTHING) {
-        poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
+        private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
     }
 }
 
@@ -811,13 +851,13 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
         } else {
             switch (sslv->state) {
             case STATE_TCP_CONNECTING:
-                poll_fd_wait(sslv->fd, OVS_POLLOUT);
+                private_poll_fd_wait(sslv->fd, OVS_POLLOUT);
                 break;
 
             case STATE_SSL_CONNECTING:
                 /* ssl_connect() called SSL_accept() or SSL_connect(), which
                  * set up the status that we test here. */
-                poll_fd_wait(sslv->fd,
+                private_poll_fd_wait(sslv->fd,
                                want_to_poll_events(SSL_want(sslv->ssl)));
                 break;
 
@@ -829,7 +869,7 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
 
     case STREAM_RECV:
         if (sslv->rx_want != SSL_NOTHING) {
-            poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
+            private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
         } else {
             poll_immediate_wake();
         }
@@ -911,6 +951,7 @@ pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp,
                  ds_steal_cstr(&bound_name));
     pstream_set_bound_port(&pssl->pstream, htons(port));
     pssl->fd = fd;
+    poll_fd_register(fd, OVS_POLLIN, NULL);
     *pstreamp = &pssl->pstream;
 
     return 0;
@@ -920,6 +961,7 @@ static void
 pssl_close(struct pstream *pstream)
 {
     struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
+    poll_fd_deregister(pssl->fd);
     closesocket(pssl->fd);
     free(pssl);
 }
diff --git a/lib/timeval.c b/lib/timeval.c
index 193c7bab1..59a12414f 100644
--- a/lib/timeval.c
+++ b/lib/timeval.c
@@ -38,6 +38,7 @@
 #include "unixctl.h"
 #include "util.h"
 #include "openvswitch/vlog.h"
+#include "openvswitch/poll-loop.h"
 
 VLOG_DEFINE_THIS_MODULE(timeval);
 
@@ -369,6 +370,88 @@ time_poll(struct pollfd *pollfds, int n_pollfds, HANDLE *handles OVS_UNUSED,
     return retval;
 }
 
+#ifdef OVS_USE_EPOLL
+
+/* Like epoll_wait() on 'epoll_fd', 'events' and 'max', except:
+ *
+ *      - The timeout is specified as an absolute time 'timeout_when', as
+ *        defined by time_msec(), instead of a duration.
+ *
+ *      - On error, returns a negative error code (instead of setting errno).
+ *
+ *      - If interrupted by a signal, retries automatically until the original
+ *        timeout is reached.  (Because of this property, this function will
+ *        never return -EINTR.)
+ *
+ * Stores the number of milliseconds elapsed during the wait in '*elapsed'. */
+int
+time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
+          long long int timeout_when, int *elapsed)
+{
+    long long int *last_wakeup = last_wakeup_get();
+    long long int start;
+    bool quiescent;
+    int retval = 0;
+
+    time_init();
+    coverage_clear();
+    coverage_run();
+    if (*last_wakeup && !thread_is_pmd()) {
+        log_poll_interval(*last_wakeup);
+    }
+    start = time_msec();
+
+    timeout_when = MIN(timeout_when, deadline);
+    quiescent = ovsrcu_is_quiescent();
+
+    for (;;) {
+        long long int now = time_msec();
+        int time_left;
+
+        if (now >= timeout_when) {
+            time_left = 0;  /* Already due: do not block. */
+        } else if ((unsigned long long int) timeout_when - now > INT_MAX) {
+            time_left = INT_MAX;  /* Clamp to the int timeout range. */
+        } else {
+            time_left = timeout_when - now;
+        }
+
+        if (!quiescent) {
+            if (!time_left) {
+                ovsrcu_quiesce();
+            } else {
+                ovsrcu_quiesce_start();  /* Ended after the wait below. */
+            }
+        }
+
+        retval = epoll_wait(epoll_fd, events, max, time_left);
+        if (retval < 0) {
+            retval = -errno;  /* Negative errno instead of -1. */
+        }
+
+        if (!quiescent && time_left) {
+            ovsrcu_quiesce_end();
+        }
+
+        if (deadline <= time_msec()) {
+            fatal_signal_handler(SIGALRM);
+            if (retval < 0) {
+                retval = 0;  /* Deadline passed: suppress the error. */
+            }
+            break;
+        }
+
+        if (retval != -EINTR) {  /* Only an interrupted wait is retried. */
+            break;
+        }
+    }
+    *last_wakeup = time_msec();
+    refresh_rusage();
+    *elapsed = *last_wakeup - start;
+    return retval;
+}
+#endif
+
 long long int
 timespec_to_msec(const struct timespec *ts)
 {
diff --git a/lib/timeval.h b/lib/timeval.h
index 502f703d4..347a09d63 100644
--- a/lib/timeval.h
+++ b/lib/timeval.h
@@ -20,6 +20,9 @@
 #include <time.h>
 #include "openvswitch/type-props.h"
 #include "util.h"
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
 
 #ifdef  __cplusplus
 extern "C" {
@@ -61,6 +64,10 @@ void time_wall_timespec(struct timespec *);
 void time_alarm(unsigned int secs);
 int time_poll(struct pollfd *, int n_pollfds, HANDLE *handles,
               long long int timeout_when, int *elapsed);
+#ifdef __linux__
+int time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
+          long long int timeout_when, int *elapsed);
+#endif
 
 long long int timespec_to_msec(const struct timespec *);
 long long int timespec_to_usec(const struct timespec *);
-- 
2.20.1




More information about the dev mailing list