[ovs-dev] [PATCH v2 3/4] Add file descriptor persistence where possible

Anton Ivanov anton.ivanov at cambridgegreys.com
Fri Feb 14 17:55:50 UTC 2020



On 14/02/2020 17:54, anton.ivanov at cambridgegreys.com wrote:
> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
> 
> 1. Adds "persistent" behaviour where feasible (streams and signals).
> These are waited upon in the same thread where they are created. This
> allows them to be registered persistently with the OS (if possible)
> as well as the OS to provide hints - is the FD ready, is it closed,
> etc.

I will amend the commit message in the next version - it is a bit misleading. Signals are delivered through a single global pipe, which allows them to be persistent too.

> 
> 2. Removes unnecessary read attempts which can only return EAGAIN on a
> fd which is not ready, if that fd has been registered as "private" to
> the thread which waits upon it.
> 
> 3. No longer breaks other parts of OVS which create the fd in one
> thread and wait upon it in others.
> 
> 4. Adds support for EPOLL on Linux and can be expanded to cover similar
> poll++ frameworks in other OSes.
> 
> 5. Sets up the necessary infrastructure to make IO/SSL multi-threaded
> using a "central (e)poll dispatcher + IO threads" pattern
> 
> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
> ---
>   include/openvswitch/poll-loop.h |  56 +++++-
>   lib/dpif-netlink.c              |   6 +-
>   lib/fatal-signal.c              |   7 +-
>   lib/latch-unix.c                |   3 +-
>   lib/netdev-afxdp.c              |   2 +-
>   lib/poll-loop.c                 | 320 ++++++++++++++++++++++++--------
>   lib/route-table-bsd.c           |   1 +
>   lib/stream-fd.c                 |  62 ++++++-
>   lib/stream-ssl.c                |  50 ++++-
>   lib/timeval.c                   |  83 +++++++++
>   lib/timeval.h                   |   7 +
>   11 files changed, 508 insertions(+), 89 deletions(-)
> 
> diff --git a/include/openvswitch/poll-loop.h b/include/openvswitch/poll-loop.h
> index 532d9caa6..6d0331f6d 100644
> --- a/include/openvswitch/poll-loop.h
> +++ b/include/openvswitch/poll-loop.h
> @@ -41,11 +41,30 @@
>   #include <windows.h>
>   #endif
>   
> +#ifdef __linux__
> +#define OVS_USE_EPOLL
> +#endif
> +
> +#ifdef OVS_USE_EPOLL
> +#include <sys/epoll.h>
> +
> +#define OVS_POLLIN EPOLLIN
> +#define OVS_POLLOUT EPOLLOUT
> +#define OVS_POLLERR EPOLLERR
> +#define OVS_POLLHUP EPOLLHUP
> +#define OVS_ONESHOT EPOLLONESHOT
> +#define OVS_POLLNVAL 0
> +
> +#else
> +
>   #define OVS_POLLIN POLLIN
>   #define OVS_POLLOUT POLLOUT
>   #define OVS_POLLERR POLLERR
>   #define OVS_POLLNVAL POLLNVAL
>   #define OVS_POLLHUP POLLHUP
> +#define OVS_ONESHOT (1U << 30)
> +
> +#endif
>   
>   #ifdef  __cplusplus
>   extern "C" {
> @@ -60,10 +79,43 @@ extern "C" {
>    * the source code location of the caller.  The function version allows the
>    * caller to supply a location explicitly, which is useful if the caller's own
>    * caller would be more useful in log output.  See timer_wait_at() for an
> - * example. */
> -void poll_fd_wait_at(int fd, short int events, const char *where);
> + * example.
> + * Note - using this on fds registered with poll_fd_register() will generate a
> + * warning as this is not an intended use.
> + */
> +void poll_fd_wait_at(int fd, int events, const char *where);
>   #define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
>   
> +/* Register a fd with a persistence framework if available so it can be served
> + * "faster" and the caller can be provided with "hints" on what caused the IO
> + * event.
> + * If the "hint" argument is supplied, it is set to point to the pollfd structure
> + * containing the events passed by the OS in .revents.
> + * Note - as the frameworks are OS dependent, the events are limited to what
> + * can be passed in a .revents which is a short int.
> + * Limitations - MUST BE registered from the same thread as the one where
> + * it will be waited upon.
> + */
> +
> +void poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where);
> +#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR)
> +
> +/* De-register a fd which was registered as "private" with the persistence
> + * framework
> + */
> +
> +void poll_fd_deregister_at(int fd, const char *where);
> +#define poll_fd_deregister(fd) poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR)
> +
> +/* Schedule events to wake up the following poll_block() - "private fds"
> + * Same as poll_fd_wait, but for fds which have been registered and are
> + * expected to persist. If a "fast" OS fd notification framework is used
> + * this version of wait may be a NOOP (e.g. for (E)POLLIN events).
> + */
> +void private_poll_fd_wait_at(int fd, int events, const char *where);
> +#define private_poll_fd_wait(fd, events) private_poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
> +
> +
>   #ifdef _WIN32
>   void poll_wevent_wait_at(HANDLE wevent, const char *where);
>   #define poll_wevent_wait(wevent) poll_wevent_wait_at(wevent, OVS_SOURCE_LOCATOR)
> diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
> index 5b5c96d72..ad5db9452 100644
> --- a/lib/dpif-netlink.c
> +++ b/lib/dpif-netlink.c
> @@ -1289,7 +1289,7 @@ dpif_netlink_port_poll_wait(const struct dpif *dpif_)
>       const struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
>   
>       if (dpif->port_notifier) {
> -        nl_sock_wait(dpif->port_notifier, POLLIN);
> +        nl_sock_wait(dpif->port_notifier, OVS_POLLIN);
>       } else {
>           poll_immediate_wake();
>       }
> @@ -2756,13 +2756,13 @@ dpif_netlink_recv_wait__(struct dpif_netlink *dpif, uint32_t handler_id)
>       }
>   
>       for (i = 0; i < VPORT_SOCK_POOL_SIZE; i++) {
> -        nl_sock_wait(sock_pool[i].nl_sock, POLLIN);
> +        nl_sock_wait(sock_pool[i].nl_sock, OVS_POLLIN);
>       }
>   #else
>       if (dpif->handlers && handler_id < dpif->n_handlers) {
>           struct dpif_handler *handler = &dpif->handlers[handler_id];
>   
> -        poll_fd_wait(handler->epoll_fd, POLLIN);
> +        poll_fd_wait(handler->epoll_fd, OVS_POLLIN);
>       }
>   #endif
>   }
> diff --git a/lib/fatal-signal.c b/lib/fatal-signal.c
> index 97d8d1dab..424636e07 100644
> --- a/lib/fatal-signal.c
> +++ b/lib/fatal-signal.c
> @@ -96,6 +96,7 @@ fatal_signal_init(void)
>           ovs_mutex_init_recursive(&mutex);
>   #ifndef _WIN32
>           xpipe_nonblocking(signal_fds);
> +        poll_fd_register(signal_fds[0], OVS_POLLIN, NULL);
>   #else
>           wevent = CreateEvent(NULL, TRUE, FALSE, NULL);
>           if (!wevent) {
> @@ -236,9 +237,12 @@ void
>   fatal_signal_run(void)
>   {
>       sig_atomic_t sig_nr;
> +    char sigbuffer[_POSIX_PIPE_BUF];
>   
>       fatal_signal_init();
>   
> +    read(signal_fds[0], sigbuffer, sizeof(sigbuffer));
> +
>       sig_nr = stored_sig_nr;
>       if (sig_nr != SIG_ATOMIC_MAX) {
>           char namebuf[SIGNAL_NAME_BUFSIZE];
> @@ -271,7 +275,8 @@ fatal_signal_wait(void)
>   #ifdef _WIN32
>       poll_wevent_wait(wevent);
>   #else
> -    poll_fd_wait(signal_fds[0], OVS_POLLIN);
> +    /* a noop - schedule for removal */
> +    private_poll_fd_wait(signal_fds[0], OVS_POLLIN);
>   #endif
>   }
>   
> diff --git a/lib/latch-unix.c b/lib/latch-unix.c
> index fea61ab28..5f15b59fe 100644
> --- a/lib/latch-unix.c
> +++ b/lib/latch-unix.c
> @@ -83,5 +83,6 @@ latch_is_set(const struct latch *latch)
>   void
>   latch_wait_at(const struct latch *latch, const char *where)
>   {
> -    poll_fd_wait_at(latch->fds[0], OVS_POLLIN, where);
> +    /* Ask for wait and make it one-shot if persistence is in play */
> +    poll_fd_wait_at(latch->fds[0], OVS_POLLIN | OVS_ONESHOT, where);
>   }
> diff --git a/lib/netdev-afxdp.c b/lib/netdev-afxdp.c
> index ef367e5ea..482400d8d 100644
> --- a/lib/netdev-afxdp.c
> +++ b/lib/netdev-afxdp.c
> @@ -184,7 +184,7 @@ xsk_rx_wakeup_if_needed(struct xsk_umem_info *umem,
>   
>       if (xsk_ring_prod__needs_wakeup(&umem->fq)) {
>           pfd.fd = fd;
> -        pfd.events = OVS_POLLIN;
> +        pfd.events = POLLIN;
>   
>           ret = poll(&pfd, 1, 0);
>           if (OVS_UNLIKELY(ret < 0)) {
> diff --git a/lib/poll-loop.c b/lib/poll-loop.c
> index 3902d6c1f..10a5b0c01 100644
> --- a/lib/poll-loop.c
> +++ b/lib/poll-loop.c
> @@ -18,6 +18,12 @@
>   #include "openvswitch/poll-loop.h"
>   #include <errno.h>
>   #include <inttypes.h>
> +#ifdef OVS_USE_EPOLL
> +#include <sys/epoll.h>
> +#endif
> +#ifndef _WIN32
> +#include <unistd.h>
> +#endif
>   #include <poll.h>
>   #include <stdlib.h>
>   #include <string.h>
> @@ -31,7 +37,9 @@
>   #include "timeval.h"
>   #include "openvswitch/vlog.h"
>   #include "openvswitch/hmap.h"
> +#include "openvswitch/list.h"
>   #include "hash.h"
> +#include "ovs-atomic.h"
>   
>   VLOG_DEFINE_THIS_MODULE(poll_loop);
>   
> @@ -43,21 +51,32 @@ struct poll_node {
>       struct pollfd pollfd;       /* Events to pass to time_poll(). */
>       HANDLE wevent;              /* Events for WaitForMultipleObjects(). */
>       const char *where;          /* Where poll_node was created. */
> +    bool valid;                 /* Can it be used? */
> +    bool private;               /* Can we assume that it is only in this thread poll loop? */
>   };
>   
> +#define MAX_EPOLL_EVENTS 64
> +
>   struct poll_loop {
> -    /* All active poll waiters. */
> +    /* Protects 'poll_nodes'; held while nodes are looked up, added or removed. */
> +    struct ovs_mutex loop_mutex;
> +    /* All poll waiters for this poll loop */
>       struct hmap poll_nodes;
>   
>       /* Time at which to wake up the next call to poll_block(), LLONG_MIN to
>        * wake up immediately, or LLONG_MAX to wait forever. */
>       long long int timeout_when; /* In msecs as returned by time_msec(). */
>       const char *timeout_where;  /* Where 'timeout_when' was set. */
> +#ifdef OVS_USE_EPOLL
> +    int epoll_fd;
> +    struct epoll_event epoll_events[MAX_EPOLL_EVENTS];
> +#endif
>   };
>   
> +
>   static struct poll_loop *poll_loop(void);
>   
> -/* Look up the node with same fd or wevent. */
> +/* Look up the node with same fd or wevent - should be accessed under &loop->loop_mutex. */
>   static struct poll_node *
>   find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
>   {
> @@ -76,79 +95,142 @@ find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
>       }
>       return NULL;
>   }
> -
> -/* On Unix based systems:
> - *
> - *     Registers 'fd' as waiting for the specified 'events' (which should be
> - *     OVS_POLLIN or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT).  The following call to
> - *     poll_block() will wake up when 'fd' becomes ready for one or more of the
> - *     requested events. The 'fd's are given to poll() function later.
> - *
> - * On Windows system:
> +/* Registers 'fd' as waiting for the specified 'events' (which should be OVS_POLLIN
> + * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT).  The following call to poll_block() will
> + * wake up when 'fd' becomes ready for one or more of the requested events.
>    *
> - *     If 'fd' is specified, create a new 'wevent'. Association of 'fd' and
> - *     'wevent' for 'events' happens in poll_block(). If 'wevent' is specified,
> - *     it is assumed that it is unrelated to any sockets and poll_block()
> - *     will wake up on any event on that 'wevent'. It is an error to pass
> - *     both 'wevent' and 'fd'.
> + * The event registration is PERSISTENT. This is intended for OSes which have a persistent
> + * event framework. For now it is implemented only for epoll and Linux, other
> + * implementations such as BSD kqueue and Solaris /dev/poll may follow.
>    *
> - * The event registration is one-shot: only the following call to
> - * poll_block() is affected.  The event will need to be re-registered after
> - * poll_block() is called if it is to persist.
> + * If the OS has no persistent event framework, this does nothing.
>    *
>    * ('where' is used in debug logging.  Commonly one would use poll_fd_wait() to
>    * automatically provide the caller's source file and line number for
>    * 'where'.) */
> +
>   static void
> -poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
> +poll_fd_subscribe_at(int fd, HANDLE wevent, int events, struct pollfd **hint, const char *where, bool private)
>   {
>       struct poll_loop *loop = poll_loop();
>       struct poll_node *node;
> +#ifdef OVS_USE_EPOLL
> +    struct epoll_event event;
> +#endif
>   
> -    COVERAGE_INC(poll_create_node);
> -
> -    /* Both 'fd' and 'wevent' cannot be set. */
>       ovs_assert(!fd != !wevent);
>   
> +    /* This is mostly uncontended, so the thread should grab it straight away.
> +     * We will reuse it later to introduce threading for IO and SSL
> +     */
> +    ovs_mutex_lock(&loop->loop_mutex);
> +
>       /* Check for duplicate.  If found, "or" the events. */
>       node = find_poll_node(loop, fd, wevent);
> -    if (node) {
> -        node->pollfd.events |= events;
> -    } else {
> -        node = xzalloc(sizeof *node);
> -        hmap_insert(&loop->poll_nodes, &node->hmap_node,
> -                    hash_2words(fd, (uint32_t)wevent));
> -        node->pollfd.fd = fd;
> -        node->pollfd.events = events;
> -#ifdef _WIN32
> -        if (!wevent) {
> -            wevent = CreateEvent(NULL, FALSE, FALSE, NULL);
> +
> +    if (node && node->valid) {
> +#ifdef OVS_USE_EPOLL
> +        int old_event_mask = node->pollfd.events;
> +#endif
> +        /* If there is an existing event mask we do not need to inc - this will be waited upon */
> +        node->pollfd.events |= (events & 0x0000FFFF); /* or without epoll specific bits */
> +
> +#ifdef OVS_USE_EPOLL
> +        /* modify existing epoll entry if there is an epoll specific ask or if the
> +         * mask has changed
> +         */
> +        if ((events & 0xFFFF0000) || (old_event_mask != node->pollfd.events)) {
> +            event.events = node->pollfd.events | events | EPOLLHUP | EPOLLRDHUP;
> +            event.data.ptr = node;
> +            epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, fd, &event);
>           }
>   #endif
> +    } else {
> +        if (!node) {
> +            node = xzalloc(sizeof *node);
> +            hmap_insert(&loop->poll_nodes, &node->hmap_node,
> +                        hash_2words(fd, 0));
> +        } else {
> +            /* node marked for reaping, OS has reused the fd number, valid is set to false */
> +#ifdef OVS_USE_EPOLl
> +            epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
> +#endif
> +        }
> +        node->pollfd.fd = fd;
> +        node->pollfd.events = (events & 0x0000FFFF);
>           node->wevent = wevent;
>           node->where = where;
> +        node->valid = true;
> +        node->private = private;
> +#ifdef OVS_USE_EPOLL
> +        event.events = node->pollfd.events | EPOLLHUP | EPOLLRDHUP; /* we always listen for fd close */
> +        event.data.ptr = node;
> +        epoll_ctl(loop->epoll_fd, EPOLL_CTL_ADD, fd, &event);
> +#endif
> +    }
> +    if (hint) {
> +        *hint = &node->pollfd;
>       }
> +    ovs_mutex_unlock(&loop->loop_mutex);
> +}
> +
> +void
> +poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where) {
> +    poll_fd_subscribe_at(fd, 0, events, hint, where , true);
> +}
> +
> +/* Deregisters a fd. Note - this looks like a memory leak (deallocating only private fds)
> + * but it is not.
> + * In order to be compatible with existing calling conventions while using fd persistence
> + * where supported we have to keep "legacy" fds around for the duration of the life of
> + * the thread because we have no idea if they have been reaped properly or not.
> + * The reason for this is that for some of them the close() is in a thread different from the
> + * poll loop.
> + * Thus, the only thing we can do in this case is mark them "invalid". Once the OS reuses the
> + * same fd number, we will reuse the existing hash entry.
> + */
> +
> +void
> +poll_fd_deregister_at(int fd, const char *where) {
> +    struct poll_loop *loop = poll_loop();
> +    struct poll_node *node;
> +
> +    VLOG(VLL_DBG, "Deregister %d from %s", fd, where);
> +    ovs_mutex_lock(&loop->loop_mutex);
> +    node = find_poll_node(loop, fd, 0);
> +    if (node && node->private) {
> +#ifdef OVS_USE_EPOLL
> +        /* Remove the kernel-side registration before the node is freed. */
> +        epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL);
> +#endif
> +        hmap_remove(&loop->poll_nodes, &node->hmap_node);
> +        free(node); /* Callers must drop any 'hint' pointer into this node. */
> +    } else if (node) {
> +        /* Not private: only mark it invalid so the entry is reused with the fd number. */
> +        VLOG(VLL_WARN, "Trying to deregister a non-private %d from %s", fd, where);
> +        node->valid = false;
> +    }
> +    ovs_mutex_unlock(&loop->loop_mutex);
> +}
> +
> +void
> +poll_fd_wait_at(int fd, int events, const char *where)
> +{
> +    poll_fd_subscribe_at(fd, 0, events, NULL, where, false);
>   }
>   
> -/* Registers 'fd' as waiting for the specified 'events' (which should be OVS_POLLIN
> - * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT).  The following call to poll_block() will
> - * wake up when 'fd' becomes ready for one or more of the requested events.
> - *
> - * On Windows, 'fd' must be a socket.
> - *
> - * The event registration is one-shot: only the following call to poll_block()
> - * is affected.  The event will need to be re-registered after poll_block() is
> - * called if it is to persist.
> - *
> - * ('where' is used in debug logging.  Commonly one would use poll_fd_wait() to
> - * automatically provide the caller's source file and line number for
> - * 'where'.) */
>   void
> -poll_fd_wait_at(int fd, short int events, const char *where)
> +private_poll_fd_wait_at(int fd, int events, const char *where)
>   {
> -    poll_create_node(fd, 0, events, where);
> +    /* POLLIN persists on "private" fds - either emulated or at epoll
> +     * or other persistence framework level
> +     */
> +    if (events & (~OVS_POLLIN)) {
> +        poll_fd_subscribe_at(fd, 0, events, NULL, where, true);
> +    }
>   }
>   
> +
>   #ifdef _WIN32
>   /* Registers for the next call to poll_block() to wake up when 'wevent' is
>    * signaled.
> @@ -163,7 +245,7 @@ poll_fd_wait_at(int fd, short int events, const char *where)
>   void
>   poll_wevent_wait_at(HANDLE wevent, const char *where)
>   {
> -    poll_create_node(0, wevent, 0, where);
> +    poll_fd_subscribe_at(0, wevent, 0, NULL, where);
>   }
>   #endif /* _WIN32 */
>   
> @@ -277,9 +359,12 @@ log_wakeup(const char *where, const struct pollfd *pollfd, int timeout)
>           if (pollfd->revents & OVS_POLLHUP) {
>               ds_put_cstr(&s, "[OVS_POLLHUP]");
>           }
> +#ifndef OVS_USE_EPOLL
> +        /* epoll does not have NVAL - it uses RDHUP and HUP which we cannot actually get to here*/
>           if (pollfd->revents & OVS_POLLNVAL) {
>               ds_put_cstr(&s, "[OVS_POLLNVAL]");
>           }
> +#endif
>           ds_put_format(&s, " on fd %d (%s)", pollfd->fd, description);
>           free(description);
>       } else {
> @@ -295,12 +380,17 @@ log_wakeup(const char *where, const struct pollfd *pollfd, int timeout)
>       ds_destroy(&s);
>   }
>   
> +
>   static void
>   free_poll_nodes(struct poll_loop *loop)
>   {
>       struct poll_node *node, *next;
>   
> +    ovs_mutex_lock(&loop->loop_mutex);
>       HMAP_FOR_EACH_SAFE (node, next, hmap_node, &loop->poll_nodes) {
> +#ifdef OVS_USE_EPOLL
> +        epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL);
> +#endif
>           hmap_remove(&loop->poll_nodes, &node->hmap_node);
>   #ifdef _WIN32
>           if (node->wevent && node->pollfd.fd) {
> @@ -310,6 +400,7 @@ free_poll_nodes(struct poll_loop *loop)
>   #endif
>           free(node);
>       }
> +    ovs_mutex_unlock(&loop->loop_mutex);
>   }
>   
>   /* Blocks until one or more of the events registered with poll_fd_wait()
> @@ -320,8 +411,13 @@ poll_block(void)
>   {
>       struct poll_loop *loop = poll_loop();
>       struct poll_node *node;
> +#ifndef OVS_USE_EPOLL
>       struct pollfd *pollfds;
> +#endif
> +#ifndef OVS_USE_EPOLL
>       HANDLE *wevents = NULL;
> +    int counter;
> +#endif
>       int elapsed;
>       int retval;
>       int i;
> @@ -335,54 +431,126 @@ poll_block(void)
>       }
>   
>       timewarp_run();
> -    pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
>   
> +#ifdef OVS_USE_EPOLL
> +    retval = time_epoll_wait(loop->epoll_fd,
> +        (struct epoll_event *) &loop->epoll_events, MAX_EPOLL_EVENTS, loop->timeout_when, &elapsed);
> +    if (retval < 0) {
> +        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> +        VLOG_ERR_RL(&rl, "epoll: %s", ovs_strerror(retval));
> +    } else if (!retval) {
> +        log_wakeup(loop->timeout_where, NULL, elapsed);
> +    } else {
> +        ovs_mutex_lock(&loop->loop_mutex);
> +        if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
> +            for (i = 0; i < retval; i++) {
> +                node = (struct poll_node *) loop->epoll_events[i].data.ptr;
> +                if (loop->epoll_events[i].events) {
> +                    node->pollfd.revents = loop->epoll_events[i].events;
> +                    log_wakeup(node->where, &node->pollfd, 0);
> +                }
> +            }
> +        }
> +        for (i = 0; i < retval; i++) {
> +            node = (struct poll_node *) loop->epoll_events[i].data.ptr;
> +            if (loop->epoll_events[i].events & EPOLLHUP) {
> +                /* File descriptor closed already elsewhere
> +                 * We have to make the assumption that whoever closed it has
> +                 * ensured that anything which refers to IO event hints will not run
> +                 * on this fd after we free it.
> +                 */
> +                node->valid = false;
> +            }
> +            if (loop->epoll_events[i].events) {
> +                node->pollfd.revents |= (loop->epoll_events[i].events & 0x0000FFFF);
> +            }
> +            if (loop->epoll_events[i].events & OVS_POLLOUT) {
> +                struct epoll_event event;
> +                node->pollfd.events = OVS_POLLIN; /* reset back to defaults - write needs one shot */
> +                event.events = node->pollfd.events;
> +                event.data.ptr = node;
> +                epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, node->pollfd.fd, &event);
> +            }
> +        }
> +        ovs_mutex_unlock(&loop->loop_mutex);
> +    }
> +#else
> +    pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
>   #ifdef _WIN32
>       wevents = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *wevents);
>   #endif
>   
> +
>       /* Populate with all the fds and events. */
> -    i = 0;
> +    counter = 0;
>       HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
> -        pollfds[i] = node->pollfd;
> +        if ((node->valid) && (node->pollfd.events)) {
> +            pollfds[counter] = node->pollfd;
>   #ifdef _WIN32
> -        wevents[i] = node->wevent;
> -        if (node->pollfd.fd && node->wevent) {
> -            short int wsa_events = 0;
> -            if (node->pollfd.events & OVS_POLLIN) {
> -                wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
> +            wevents[counter] = node->wevent;
> +            if (node->pollfd.fd && node->wevent) {
> +                short int wsa_events = 0;
> +                if (node->pollfd.events & OVS_POLLIN) {
> +                    wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
> +                }
> +                if (node->pollfd.events & OVS_POLLOUT) {
> +                    wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
> +                }
> +                WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
>               }
> -            if (node->pollfd.events & OVS_POLLOUT) {
> -                wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
> -            }
> -            WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
> -        }
>   #endif
> -        i++;
> +            counter++;
> +        }
>       }
>   
> -    retval = time_poll(pollfds, hmap_count(&loop->poll_nodes), wevents,
> +    retval = time_poll(pollfds, counter, wevents,
>                          loop->timeout_when, &elapsed);
>       if (retval < 0) {
>           static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>           VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval));
> -    } else if (!retval) {
> +    } else if (retval == 0) {
>           log_wakeup(loop->timeout_where, NULL, elapsed);
> -    } else if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
> -        i = 0;
> -        HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
> +    } else {
> +        for (i = 0; i < counter; i++) {
>               if (pollfds[i].revents) {
> -                log_wakeup(node->where, &pollfds[i], 0);
> +
> +                node = find_poll_node(loop, pollfds[i].fd, 0);
> +
> +                if (!node) {
> +                    VLOG_FATAL("poll: persistence state corrupted, no hash entry for %d", pollfds[i].fd);
> +                }
> +                if (pollfds[i].revents & (OVS_POLLHUP | OVS_POLLNVAL)) {
> +                    node->valid = false;
> +                }
> +
> +                if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
> +                    log_wakeup(node->where, &pollfds[i], 0);
> +                }
> +                /* update "requested" events.
> +                 * Note - "private" fds always want POLLIN - that emulates EPOLL, /dev/poll, etc
> +                 * behaviour which they should be using in real life instead of using poll()
> +                 */
> +                if (node->private) {
> +                    node->pollfd.events &= ~(pollfds[i].revents & (~OVS_POLLIN));
> +                } else {
> +                    node->pollfd.events &= ~pollfds[i].revents;
> +                }
> +                /* update "occured" events for use by streams and handlers. In case there
> +                 * is an existing (but not consumed yet) event, we OR the events in the
> +                 * stored record with the new ones - it is the job of the stream to clear
> +                 * that.
> +                 */
> +                node->pollfd.revents |= pollfds[i].revents;
>               }
> -            i++;
>           }
>       }
>   
> -    free_poll_nodes(loop);
> +    free(pollfds);
> +    if (wevents)
> +        free(wevents);
> +#endif
>       loop->timeout_when = LLONG_MAX;
>       loop->timeout_where = NULL;
> -    free(pollfds);
> -    free(wevents);
>   
>       /* Handle any pending signals before doing anything else. */
>       fatal_signal_run();
> @@ -416,8 +584,12 @@ poll_loop(void)
>       if (!loop) {
>           loop = xzalloc(sizeof *loop);
>           loop->timeout_when = LLONG_MAX;
> +        ovs_mutex_init(&loop->loop_mutex);
>           hmap_init(&loop->poll_nodes);
>           xpthread_setspecific(key, loop);
> +#ifdef OVS_USE_EPOLL
> +        loop->epoll_fd = epoll_create(MAX_EPOLL_EVENTS);
> +#endif
>       }
>       return loop;
>   }
> diff --git a/lib/route-table-bsd.c b/lib/route-table-bsd.c
> index 3dfa80c7f..16d155989 100644
> --- a/lib/route-table-bsd.c
> +++ b/lib/route-table-bsd.c
> @@ -34,6 +34,7 @@
>   #include "ovs-router.h"
>   #include "packets.h"
>   #include "openvswitch/vlog.h"
> +#include "openvswitch/poll-loop.h"
>   #include "util.h"
>   
>   VLOG_DEFINE_THIS_MODULE(route_table_bsd);
> diff --git a/lib/stream-fd.c b/lib/stream-fd.c
> index 62f768d45..6a80d6e05 100644
> --- a/lib/stream-fd.c
> +++ b/lib/stream-fd.c
> @@ -40,6 +40,8 @@ struct stream_fd
>       struct stream stream;
>       int fd;
>       int fd_type;
> +    bool rx_ready, tx_ready;
> +    struct pollfd *hint;
>   };
>   
>   static const struct stream_class stream_fd_class;
> @@ -67,7 +69,14 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
>       stream_init(&s->stream, &stream_fd_class, connect_status, name);
>       s->fd = fd;
>       s->fd_type = fd_type;
> +    s->rx_ready = true;
> +    s->tx_ready = true;
> +    s->hint = NULL;
>       *streamp = &s->stream;
> +    /* Persistent registration - we always get POLLINs from now on,
> +     * POLLOUTs when we ask for them
> +     */
> +    poll_fd_register(s->fd, OVS_POLLIN, &s->hint);
>       return 0;
>   }
>   
> @@ -82,6 +91,8 @@ static void
>   fd_close(struct stream *stream)
>   {
>       struct stream_fd *s = stream_fd_cast(stream);
> +    /* Deregister the FD from any persistent registrations if supported */
> +    poll_fd_deregister(s->fd);
>       closesocket(s->fd);
>       free(s);
>   }
> @@ -104,6 +115,24 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
>       ssize_t retval;
>       int error;
>   
> +    if (s->hint) {
> +        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight
> +         * to the read, which should return 0 if the HUP is a real one; if not, we clear it.
> +         * For all other cases we believe what (e)poll has fed us.
> +         */
> +        if ((!(s->hint->revents & (OVS_POLLHUP|OVS_POLLNVAL))) && (!s->rx_ready)) {
> +            if (!(s->hint->revents & OVS_POLLIN)) {
> +                return -EAGAIN;
> +            } else {
> +                /* POLLIN event from poll loop, mark us as ready */
> +                s->rx_ready = true;
> +                s->hint->revents &= ~OVS_POLLIN;
> +            }
> +        } else {
> +            s->hint->revents &= ~(OVS_POLLHUP|OVS_POLLNVAL);
> +        }
> +    }
> +
>       retval = recv(s->fd, buffer, n, 0);
>       if (retval < 0) {
>           error = sock_errno();
> @@ -114,6 +143,8 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
>   #endif
>           if (error != EAGAIN) {
>               VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
> +        } else {
> +            s->rx_ready = false;
>           }
>           return -error;
>       }
> @@ -127,9 +158,29 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
>       ssize_t retval;
>       int error;
>   
> +    if (s->hint) {
> +        /* poll-loop is providing us with hints for IO */
> +        if (!s->tx_ready) {
> +            if (!(s->hint->revents & OVS_POLLOUT)) {
> +                return -EAGAIN;
> +            } else {
> +                /* POLLOUT event from poll loop, mark us as ready */
> +                s->tx_ready = true;
> +                s->hint->revents &= ~OVS_POLLOUT;
> +            }
> +        }
> +    }
>       retval = send(s->fd, buffer, n, 0);
>       if (retval < 0) {
>           error = sock_errno();
> +#ifdef __linux__
> +        /* Linux will sometimes return ENOBUFS on sockets instead of EAGAIN. Usually seen
> +         *  on unix domain sockets
> +         */
> +        if (error == ENOBUFS) {
> +           error = EAGAIN;
> +        }
> +#endif
>   #ifdef _WIN32
>           if (error == WSAEWOULDBLOCK) {
>              error = EAGAIN;
> @@ -137,6 +188,8 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
>   #endif
>           if (error != EAGAIN) {
>               VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error));
> +        } else {
> +            s->tx_ready = false;
>           }
>           return -error;
>       }
> @@ -150,11 +203,11 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
>       switch (wait) {
>       case STREAM_CONNECT:
>       case STREAM_SEND:
> -        poll_fd_wait(s->fd, OVS_POLLOUT);
> +        private_poll_fd_wait(s->fd, OVS_POLLOUT);
>           break;
>   
>       case STREAM_RECV:
> -        poll_fd_wait(s->fd, OVS_POLLIN);
> +        private_poll_fd_wait(s->fd, OVS_POLLIN);
>           break;
>   
>       default:
> @@ -223,6 +276,8 @@ new_fd_pstream(char *name, int fd,
>       ps->accept_cb = accept_cb;
>       ps->unlink_path = unlink_path;
>       *pstreamp = &ps->pstream;
> +    /* persistent registration */
> +    poll_fd_register(ps->fd, OVS_POLLIN, NULL);
>       return 0;
>   }
>   
> @@ -230,6 +285,7 @@ static void
>   pfd_close(struct pstream *pstream)
>   {
>       struct fd_pstream *ps = fd_pstream_cast(pstream);
> +    poll_fd_deregister(ps->fd);
>       closesocket(ps->fd);
>       maybe_unlink_and_free(ps->unlink_path);
>       free(ps);
> @@ -271,7 +327,7 @@ static void
>   pfd_wait(struct pstream *pstream)
>   {
>       struct fd_pstream *ps = fd_pstream_cast(pstream);
> -    poll_fd_wait(ps->fd, OVS_POLLIN);
> +    private_poll_fd_wait(ps->fd, OVS_POLLIN);
>   }
>   
>   static const struct pstream_class fd_pstream_class = {
> diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
> index 3b7f9865e..53ae51c1b 100644
> --- a/lib/stream-ssl.c
> +++ b/lib/stream-ssl.c
> @@ -147,6 +147,7 @@ struct ssl_stream
>       /* A few bytes of header data in case SSL negotiation fails. */
>       uint8_t head[2];
>       short int n_head;
> +    struct pollfd *hint;
>   };
>   
>   /* SSL context created by ssl_init(). */
> @@ -310,6 +311,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
>           SSL_set_msg_callback_arg(ssl, sslv);
>       }
>   
> +
> +    poll_fd_register(sslv->fd, OVS_POLLIN, &sslv->hint);
>       *streamp = &sslv->stream;
>       free(server_name);
>       return 0;
> @@ -604,6 +607,7 @@ ssl_close(struct stream *stream)
>       ERR_clear_error();
>   
>       SSL_free(sslv->ssl);
> +    poll_fd_deregister(sslv->fd);
>       closesocket(sslv->fd);
>       free(sslv);
>   }
> @@ -697,6 +701,27 @@ ssl_recv(struct stream *stream, void *buffer, size_t n)
>       /* Behavior of zero-byte SSL_read is poorly defined. */
>       ovs_assert(n > 0);
>   
> +     if (sslv->hint) {
> +        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight
> +         * to the read, which should return 0 if the HUP is a real one; if not, we clear it.
> +         * For all other cases we believe what (e)poll has fed us.
> +         */
> +        if ((!(sslv->hint->revents & (OVS_POLLHUP|OVS_POLLNVAL))) && (sslv->rx_want == SSL_READING)) {
> +            if (!(sslv->hint->revents & OVS_POLLIN)) {
> +                return -EAGAIN;
> +            } else {
> +                /* POLLIN event from poll loop, mark us as ready
> +                 * rx_want is cleared further down by reading ssl fsm
> +                 */
> +                sslv->hint->revents &= ~OVS_POLLIN;
> +            }
> +        } else {
> +            sslv->hint->revents &= ~(OVS_POLLHUP|OVS_POLLNVAL);
> +        }
> +    }
> +
> +
> +
>       old_state = SSL_get_state(sslv->ssl);
>       ret = SSL_read(sslv->ssl, buffer, n);
>       if (old_state != SSL_get_state(sslv->ssl)) {
> @@ -729,6 +754,19 @@ ssl_do_tx(struct stream *stream)
>   {
>       struct ssl_stream *sslv = ssl_stream_cast(stream);
>   
> +     if (sslv->hint) {
> +        /* poll-loop is providing us with hints for IO */
> +        if (sslv->tx_want == SSL_WRITING) {
> +            if (!(sslv->hint->revents & OVS_POLLOUT)) {
> +                return EAGAIN;
> +            } else {
> +                /* POLLOUT event from poll loop, mark us as ready
> +                 * tx_want is reset further down by the SSL write loop
> +                 */
> +                sslv->hint->revents &= ~OVS_POLLOUT;
> +            }
> +        }
> +    }
>       for (;;) {
>           int old_state = SSL_get_state(sslv->ssl);
>           int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
> @@ -771,6 +809,8 @@ ssl_send(struct stream *stream, const void *buffer, size_t n)
>               ssl_clear_txbuf(sslv);
>               return n;
>           case EAGAIN:
> +            /* we want to know when this fd will become available again */
> +            stream_send_wait(stream);
>               return n;
>           default:
>               ssl_clear_txbuf(sslv);
> @@ -795,7 +835,7 @@ ssl_run_wait(struct stream *stream)
>       struct ssl_stream *sslv = ssl_stream_cast(stream);
>   
>       if (sslv->tx_want != SSL_NOTHING) {
> -        poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
> +        private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
>       }
>   }
>   
> @@ -811,13 +851,13 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
>           } else {
>               switch (sslv->state) {
>               case STATE_TCP_CONNECTING:
> -                poll_fd_wait(sslv->fd, OVS_POLLOUT);
> +                private_poll_fd_wait(sslv->fd, OVS_POLLOUT);
>                   break;
>   
>               case STATE_SSL_CONNECTING:
>                   /* ssl_connect() called SSL_accept() or SSL_connect(), which
>                    * set up the status that we test here. */
> -                poll_fd_wait(sslv->fd,
> +                private_poll_fd_wait(sslv->fd,
>                                  want_to_poll_events(SSL_want(sslv->ssl)));
>                   break;
>   
> @@ -829,7 +869,7 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
>   
>       case STREAM_RECV:
>           if (sslv->rx_want != SSL_NOTHING) {
> -            poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
> +            private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
>           } else {
>               poll_immediate_wake();
>           }
> @@ -911,6 +951,7 @@ pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp,
>                    ds_steal_cstr(&bound_name));
>       pstream_set_bound_port(&pssl->pstream, htons(port));
>       pssl->fd = fd;
> +    poll_fd_register(fd, OVS_POLLIN, NULL);
>       *pstreamp = &pssl->pstream;
>   
>       return 0;
> @@ -920,6 +961,7 @@ static void
>   pssl_close(struct pstream *pstream)
>   {
>       struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
> +    poll_fd_deregister(pssl->fd);
>       closesocket(pssl->fd);
>       free(pssl);
>   }
> diff --git a/lib/timeval.c b/lib/timeval.c
> index 193c7bab1..59a12414f 100644
> --- a/lib/timeval.c
> +++ b/lib/timeval.c
> @@ -38,6 +38,7 @@
>   #include "unixctl.h"
>   #include "util.h"
>   #include "openvswitch/vlog.h"
> +#include "openvswitch/poll-loop.h"
>   
>   VLOG_DEFINE_THIS_MODULE(timeval);
>   
> @@ -369,6 +370,88 @@ time_poll(struct pollfd *pollfds, int n_pollfds, HANDLE *handles OVS_UNUSED,
>       return retval;
>   }
>   
> +#ifdef OVS_USE_EPOLL
> +
> +/* Like epoll_wait(), except:
> + *
> + *      - The timeout is specified as an absolute time, as defined by
> + *        time_msec(), instead of a duration.
> + *
> + *      - On error, returns a negative error code (instead of setting errno).
> + *
> + *      - If interrupted by a signal, retries automatically until the original
> + *        timeout is reached.  (Because of this property, this function will
> + *        never return -EINTR.)
> + *
> + * Stores the number of milliseconds elapsed during poll in '*elapsed'. */
> +int
> +time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
> +          long long int timeout_when, int *elapsed)
> +{
> +    long long int *last_wakeup = last_wakeup_get();
> +    long long int start;
> +    bool quiescent;
> +    int retval = 0;
> +
> +    time_init();
> +    coverage_clear();
> +    coverage_run();
> +    if (*last_wakeup && !thread_is_pmd()) {
> +        log_poll_interval(*last_wakeup);
> +    }
> +    start = time_msec();
> +
> +    timeout_when = MIN(timeout_when, deadline);
> +    quiescent = ovsrcu_is_quiescent();
> +
> +    for (;;) {
> +        long long int now = time_msec();
> +        int time_left;
> +
> +        if (now >= timeout_when) {
> +            time_left = 0;
> +        } else if ((unsigned long long int) timeout_when - now > INT_MAX) {
> +            time_left = INT_MAX;
> +        } else {
> +            time_left = timeout_when - now;
> +        }
> +
> +        if (!quiescent) {
> +            if (!time_left) {
> +                ovsrcu_quiesce();
> +            } else {
> +                ovsrcu_quiesce_start();
> +            }
> +        }
> +
> +        retval = epoll_wait(epoll_fd, events, max, time_left);
> +        if (retval < 0) {
> +            retval = -errno;
> +        }
> +
> +        if (!quiescent && time_left) {
> +            ovsrcu_quiesce_end();
> +        }
> +
> +        if (deadline <= time_msec()) {
> +            fatal_signal_handler(SIGALRM);
> +            if (retval < 0) {
> +                retval = 0;
> +            }
> +            break;
> +        }
> +
> +        if (retval != -EINTR) {
> +            break;
> +        }
> +    }
> +    *last_wakeup = time_msec();
> +    refresh_rusage();
> +    *elapsed = *last_wakeup - start;
> +    return retval;
> +}
> +#endif
> +
>   long long int
>   timespec_to_msec(const struct timespec *ts)
>   {
> diff --git a/lib/timeval.h b/lib/timeval.h
> index 502f703d4..347a09d63 100644
> --- a/lib/timeval.h
> +++ b/lib/timeval.h
> @@ -20,6 +20,9 @@
>   #include <time.h>
>   #include "openvswitch/type-props.h"
>   #include "util.h"
> +#ifdef __linux__
> +#include <sys/epoll.h>
> +#endif
>   
>   #ifdef  __cplusplus
>   extern "C" {
> @@ -61,6 +64,10 @@ void time_wall_timespec(struct timespec *);
>   void time_alarm(unsigned int secs);
>   int time_poll(struct pollfd *, int n_pollfds, HANDLE *handles,
>                 long long int timeout_when, int *elapsed);
> +#ifdef __linux__
> +int time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
> +          long long int timeout_when, int *elapsed);
> +#endif
>   
>   long long int timespec_to_msec(const struct timespec *);
>   long long int timespec_to_usec(const struct timespec *);
> 

-- 
Anton R. Ivanov
Cambridgegreys Limited. Registered in England. Company Number 10273661
https://www.cambridgegreys.com/



More information about the dev mailing list