[ovs-dev] [PATCH v2 3/4] Add file descriptor persistence where possible
Anton Ivanov
anton.ivanov at cambridgegreys.com
Fri Feb 14 17:55:50 UTC 2020
On 14/02/2020 17:54, anton.ivanov at cambridgegreys.com wrote:
> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>
> 1. Adds "persistent" behaviour where feasible (streams and signals).
> Streams are waited upon in the same thread where they are created, and
> signals use a single global pipe. This allows both to be registered
> persistently with the OS (if possible) and lets the OS provide hints -
> is the fd ready, has it been closed, etc.
I will amend the commit message in the next version - it is a bit misleading. Signals use a global pipe, which allows them to be persistent too.
>
> 2. Removes unnecessary read attempts that can only fail with EAGAIN on
> an fd which is not ready, if that fd has been registered as "private"
> to the thread which waits upon it.
>
> 3. No longer breaks other parts of OVS which create the fd in one
> thread and wait upon it in others.
>
> 4. Adds support for EPOLL on Linux and can be expanded to cover similar
> persistent poll frameworks in other OSes (e.g. BSD kqueue, Solaris /dev/poll).
>
> 5. Sets up the necessary infrastructure to make IO/SSL multi-threaded
> using a "central (e)poll dispatcher + IO threads" pattern.
>
> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
> ---
> include/openvswitch/poll-loop.h | 56 +++++-
> lib/dpif-netlink.c | 6 +-
> lib/fatal-signal.c | 7 +-
> lib/latch-unix.c | 3 +-
> lib/netdev-afxdp.c | 2 +-
> lib/poll-loop.c | 320 ++++++++++++++++++++++++--------
> lib/route-table-bsd.c | 1 +
> lib/stream-fd.c | 62 ++++++-
> lib/stream-ssl.c | 50 ++++-
> lib/timeval.c | 83 +++++++++
> lib/timeval.h | 7 +
> 11 files changed, 508 insertions(+), 89 deletions(-)
>
> diff --git a/include/openvswitch/poll-loop.h b/include/openvswitch/poll-loop.h
> index 532d9caa6..6d0331f6d 100644
> --- a/include/openvswitch/poll-loop.h
> +++ b/include/openvswitch/poll-loop.h
> @@ -41,11 +41,30 @@
> #include <windows.h>
> #endif
>
> +#ifdef __linux__
> +#define OVS_USE_EPOLL
> +#endif
> +
> +#ifdef OVS_USE_EPOLL
> +#include <sys/epoll.h>
> +
> +#define OVS_POLLIN EPOLLIN
> +#define OVS_POLLOUT EPOLLOUT
> +#define OVS_POLLERR EPOLLERR
> +#define OVS_POLLHUP EPOLLHUP
> +#define OVS_ONESHOT EPOLLONESHOT
> +#define OVS_POLLNVAL 0
> +
> +#else
> +
> #define OVS_POLLIN POLLIN
> #define OVS_POLLOUT POLLOUT
> #define OVS_POLLERR POLLERR
> #define OVS_POLLNVAL POLLNVAL
> #define OVS_POLLHUP POLLHUP
> +#define OVS_ONESHOT (1U << 30)
> +
> +#endif
>
> #ifdef __cplusplus
> extern "C" {
> @@ -60,10 +79,43 @@ extern "C" {
> * the source code location of the caller. The function version allows the
> * caller to supply a location explicitly, which is useful if the caller's own
> * caller would be more useful in log output. See timer_wait_at() for an
> - * example. */
> -void poll_fd_wait_at(int fd, short int events, const char *where);
> + * example.
> + * Note - using on fds registered using poll_fd_register() will generate a
> + * warning as this is not an intended use.
> + */
> +void poll_fd_wait_at(int fd, int events, const char *where);
> #define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
>
> +/* Register a fd with a persistence framework if available so it can be served
> + * "faster" and the caller can be provided with "hints" on what caused the IO
> + * event.
> + * If the "hint" argument is supplied, it is set to point to the pollfd structure
> + * containing the events passed by the OS in .revents.
> + * Note - as the frameworks are OS dependent, the events are limited to what
> + * can be passed in a .revents which is a short int.
> + * Limitations - MUST BE registered from the same thread as the one where
> + * it will be waited upon.
> + */
> +
> +void poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where);
> +#define poll_fd_register(fd, events, hint) poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR)
> +
> +/* De-register a fd which was registered as "private" with the persistence
> + * framework
> + */
> +
> +void poll_fd_deregister_at(int fd, const char *where);
> +#define poll_fd_deregister(fd) poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR)
> +
> +/* Schedule events to wake up the following poll_block() - "private fds"
> + * Same as poll_fd_wait, but for fds which have been registered and are
> + * expected to persist. If a "fast" OS fd notification framework is used
> + * this version of wait may be a NOOP (e.g. for (E)POLLIN events).
> + */
> +void private_poll_fd_wait_at(int fd, int events, const char *where);
> +#define private_poll_fd_wait(fd, events) private_poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
> +
> +
> #ifdef _WIN32
> void poll_wevent_wait_at(HANDLE wevent, const char *where);
> #define poll_wevent_wait(wevent) poll_wevent_wait_at(wevent, OVS_SOURCE_LOCATOR)
> diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
> index 5b5c96d72..ad5db9452 100644
> --- a/lib/dpif-netlink.c
> +++ b/lib/dpif-netlink.c
> @@ -1289,7 +1289,7 @@ dpif_netlink_port_poll_wait(const struct dpif *dpif_)
> const struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
>
> if (dpif->port_notifier) {
> - nl_sock_wait(dpif->port_notifier, POLLIN);
> + nl_sock_wait(dpif->port_notifier, OVS_POLLIN);
> } else {
> poll_immediate_wake();
> }
> @@ -2756,13 +2756,13 @@ dpif_netlink_recv_wait__(struct dpif_netlink *dpif, uint32_t handler_id)
> }
>
> for (i = 0; i < VPORT_SOCK_POOL_SIZE; i++) {
> - nl_sock_wait(sock_pool[i].nl_sock, POLLIN);
> + nl_sock_wait(sock_pool[i].nl_sock, OVS_POLLIN);
> }
> #else
> if (dpif->handlers && handler_id < dpif->n_handlers) {
> struct dpif_handler *handler = &dpif->handlers[handler_id];
>
> - poll_fd_wait(handler->epoll_fd, POLLIN);
> + poll_fd_wait(handler->epoll_fd, OVS_POLLIN);
> }
> #endif
> }
> diff --git a/lib/fatal-signal.c b/lib/fatal-signal.c
> index 97d8d1dab..424636e07 100644
> --- a/lib/fatal-signal.c
> +++ b/lib/fatal-signal.c
> @@ -96,6 +96,7 @@ fatal_signal_init(void)
> ovs_mutex_init_recursive(&mutex);
> #ifndef _WIN32
> xpipe_nonblocking(signal_fds);
> + poll_fd_register(signal_fds[0], OVS_POLLIN, NULL);
> #else
> wevent = CreateEvent(NULL, TRUE, FALSE, NULL);
> if (!wevent) {
> @@ -236,9 +237,12 @@ void
> fatal_signal_run(void)
> {
> sig_atomic_t sig_nr;
> + char sigbuffer[_POSIX_PIPE_BUF];
>
> fatal_signal_init();
>
> + read(signal_fds[0], sigbuffer, sizeof(sigbuffer));
> +
> sig_nr = stored_sig_nr;
> if (sig_nr != SIG_ATOMIC_MAX) {
> char namebuf[SIGNAL_NAME_BUFSIZE];
> @@ -271,7 +275,8 @@ fatal_signal_wait(void)
> #ifdef _WIN32
> poll_wevent_wait(wevent);
> #else
> - poll_fd_wait(signal_fds[0], OVS_POLLIN);
> + /* a noop - schedule for removal */
> + private_poll_fd_wait(signal_fds[0], OVS_POLLIN);
> #endif
> }
>
> diff --git a/lib/latch-unix.c b/lib/latch-unix.c
> index fea61ab28..5f15b59fe 100644
> --- a/lib/latch-unix.c
> +++ b/lib/latch-unix.c
> @@ -83,5 +83,6 @@ latch_is_set(const struct latch *latch)
> void
> latch_wait_at(const struct latch *latch, const char *where)
> {
> - poll_fd_wait_at(latch->fds[0], OVS_POLLIN, where);
> + /* Ask for wait and make it one-shot if persistence is in play */
> + poll_fd_wait_at(latch->fds[0], OVS_POLLIN | OVS_ONESHOT, where);
> }
> diff --git a/lib/netdev-afxdp.c b/lib/netdev-afxdp.c
> index ef367e5ea..482400d8d 100644
> --- a/lib/netdev-afxdp.c
> +++ b/lib/netdev-afxdp.c
> @@ -184,7 +184,7 @@ xsk_rx_wakeup_if_needed(struct xsk_umem_info *umem,
>
> if (xsk_ring_prod__needs_wakeup(&umem->fq)) {
> pfd.fd = fd;
> - pfd.events = OVS_POLLIN;
> + pfd.events = POLLIN;
>
> ret = poll(&pfd, 1, 0);
> if (OVS_UNLIKELY(ret < 0)) {
> diff --git a/lib/poll-loop.c b/lib/poll-loop.c
> index 3902d6c1f..10a5b0c01 100644
> --- a/lib/poll-loop.c
> +++ b/lib/poll-loop.c
> @@ -18,6 +18,12 @@
> #include "openvswitch/poll-loop.h"
> #include <errno.h>
> #include <inttypes.h>
> +#ifdef OVS_USE_EPOLL
> +#include <sys/epoll.h>
> +#endif
> +#ifndef _WIN32
> +#include <unistd.h>
> +#endif
> #include <poll.h>
> #include <stdlib.h>
> #include <string.h>
> @@ -31,7 +37,9 @@
> #include "timeval.h"
> #include "openvswitch/vlog.h"
> #include "openvswitch/hmap.h"
> +#include "openvswitch/list.h"
> #include "hash.h"
> +#include "ovs-atomic.h"
>
> VLOG_DEFINE_THIS_MODULE(poll_loop);
>
> @@ -43,21 +51,32 @@ struct poll_node {
> struct pollfd pollfd; /* Events to pass to time_poll(). */
> HANDLE wevent; /* Events for WaitForMultipleObjects(). */
> const char *where; /* Where poll_node was created. */
> + bool valid; /* Can it be used? */
> + bool private; /* Can we assume that it is only in this thread poll loop? */
> };
>
> +#define MAX_EPOLL_EVENTS 64
> +
> struct poll_loop {
> - /* All active poll waiters. */
> + /* Protects 'poll_nodes' (and the epoll set, where used). */
> + struct ovs_mutex loop_mutex;
> + /* All poll waiters for this poll loop */
> struct hmap poll_nodes;
>
> /* Time at which to wake up the next call to poll_block(), LLONG_MIN to
> * wake up immediately, or LLONG_MAX to wait forever. */
> long long int timeout_when; /* In msecs as returned by time_msec(). */
> const char *timeout_where; /* Where 'timeout_when' was set. */
> +#ifdef OVS_USE_EPOLL
> + int epoll_fd;
> + struct epoll_event epoll_events[MAX_EPOLL_EVENTS];
> +#endif
> };
>
> +
> static struct poll_loop *poll_loop(void);
>
> -/* Look up the node with same fd or wevent. */
> +/* Look up the node with same fd or wevent - should be called with loop->loop_mutex held. */
> static struct poll_node *
> find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
> {
> @@ -76,79 +95,142 @@ find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
> }
> return NULL;
> }
> -
> -/* On Unix based systems:
> - *
> - * Registers 'fd' as waiting for the specified 'events' (which should be
> - * OVS_POLLIN or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT). The following call to
> - * poll_block() will wake up when 'fd' becomes ready for one or more of the
> - * requested events. The 'fd's are given to poll() function later.
> - *
> - * On Windows system:
> +/* Registers 'fd' as waiting for the specified 'events' (which should be OVS_POLLIN
> + * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT). The following call to poll_block() will
> + * wake up when 'fd' becomes ready for one or more of the requested events.
> *
> - * If 'fd' is specified, create a new 'wevent'. Association of 'fd' and
> - * 'wevent' for 'events' happens in poll_block(). If 'wevent' is specified,
> - * it is assumed that it is unrelated to any sockets and poll_block()
> - * will wake up on any event on that 'wevent'. It is an error to pass
> - * both 'wevent' and 'fd'.
> + * The event registration is PERSISTENT. This is intended for OSes which have a persistent
> + * event framework. For now it is implemented only for epoll and Linux, other
> + * implementations such as BSD kqueue and Solaris /dev/poll may follow.
> *
> - * The event registration is one-shot: only the following call to
> - * poll_block() is affected. The event will need to be re-registered after
> - * poll_block() is called if it is to persist.
> + * If the OS has no persistent event framework, poll_block() emulates persistence with poll().
> *
> * ('where' is used in debug logging. Commonly one would use poll_fd_wait() to
> * automatically provide the caller's source file and line number for
> * 'where'.) */
> +
> static void
> -poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
> +poll_fd_subscribe_at(int fd, HANDLE wevent, int events, struct pollfd **hint, const char *where, bool private)
> {
> struct poll_loop *loop = poll_loop();
> struct poll_node *node;
> +#ifdef OVS_USE_EPOLL
> + struct epoll_event event;
> +#endif
>
> - COVERAGE_INC(poll_create_node);
> -
> - /* Both 'fd' and 'wevent' cannot be set. */
> ovs_assert(!fd != !wevent);
>
> + /* This is mostly uncontended, so the thread should grab it straight away.
> + * We will reuse it later to introduce threading for IO and SSL
> + */
> + ovs_mutex_lock(&loop->loop_mutex);
> +
> /* Check for duplicate. If found, "or" the events. */
> node = find_poll_node(loop, fd, wevent);
> - if (node) {
> - node->pollfd.events |= events;
> - } else {
> - node = xzalloc(sizeof *node);
> - hmap_insert(&loop->poll_nodes, &node->hmap_node,
> - hash_2words(fd, (uint32_t)wevent));
> - node->pollfd.fd = fd;
> - node->pollfd.events = events;
> -#ifdef _WIN32
> - if (!wevent) {
> - wevent = CreateEvent(NULL, FALSE, FALSE, NULL);
> +
> + if (node && node->valid) {
> +#ifdef OVS_USE_EPOLL
> + int old_event_mask = node->pollfd.events;
> +#endif
> + /* If there is an existing event mask we do not need to inc - this will be waited upon */
> + node->pollfd.events |= (events & 0x0000FFFF); /* or without epoll specific bits */
> +
> +#ifdef OVS_USE_EPOLL
> + /* modify existing epoll entry if there is an epoll specific ask or if the
> + * mask has changed
> + */
> + if ((events & 0xFFFF0000) || (old_event_mask != node->pollfd.events)) {
> + event.events = node->pollfd.events | events | EPOLLHUP | EPOLLRDHUP;
> + event.data.ptr = node;
> + epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, fd, &event);
> }
> #endif
> + } else {
> + if (!node) {
> + node = xzalloc(sizeof *node);
> + hmap_insert(&loop->poll_nodes, &node->hmap_node,
> + hash_2words(fd, 0));
> + } else {
> + /* node marked for reaping, OS has reused the fd number, valid is set to false */
> +#ifdef OVS_USE_EPOLl
> + epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
> +#endif
> + }
> + node->pollfd.fd = fd;
> + node->pollfd.events = (events & 0x0000FFFF);
> node->wevent = wevent;
> node->where = where;
> + node->valid = true;
> + node->private = private;
> +#ifdef OVS_USE_EPOLL
> + event.events = node->pollfd.events | EPOLLHUP | EPOLLRDHUP; /* we always listen for fd close */
> + event.data.ptr = node;
> + epoll_ctl(loop->epoll_fd, EPOLL_CTL_ADD, fd, &event);
> +#endif
> + }
> + if (hint) {
> + *hint = &node->pollfd;
> }
> + ovs_mutex_unlock(&loop->loop_mutex);
> +}
> +
> +void
> +poll_fd_register_at(int fd, int events, struct pollfd **hint, const char *where) {
> + poll_fd_subscribe_at(fd, 0, events, hint, where , true);
> +}
> +
> +/* Deregisters a fd. Note - this looks like a memory leak (deallocating only private fds)
> + * but it is not.
> + * In order to be compatible with existing calling conventions while using fd persistence
> + * where supported we have to keep "legacy" fds around for the duration of the life of
> + * the thread because we have no idea if they have been reaped properly or not.
> + * The reason for this is that for some of them the close() is in a thread different from the
> + * poll loop.
> + * Thus, the only thing we can do in this case is mark them "invalid". Once the OS reuses the
> + * same fd number, we will reuse the existing hash entry.
> + */
> +
> +void
> +poll_fd_deregister_at(int fd, const char *where) {
> + struct poll_loop *loop = poll_loop();
> +
> + VLOG(VLL_DBG, "Deregister %d from %s", fd, where);
> + struct poll_node *node;
> +
> + ovs_mutex_lock(&loop->loop_mutex);
> + node = find_poll_node(loop, fd, 0);
> + if (node) {
> + if (node->private) {
> +#ifdef OVN_USE_EPOLL
> + epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL);
> +#endif
> + hmap_remove(&loop->poll_nodes, &node->hmap_node);
> + } else {
> + VLOG(VLL_WARN, "Trying to deregister a non-private %d from %s", fd, where);
> + node->valid = false;
> + }
> + }
> + ovs_mutex_unlock(&loop->loop_mutex);
> +}
> +
> +void
> +poll_fd_wait_at(int fd, int events, const char *where)
> +{
> + poll_fd_subscribe_at(fd, 0, events, NULL, where, false);
> }
>
> -/* Registers 'fd' as waiting for the specified 'events' (which should be OVS_POLLIN
> - * or OVS_POLLOUT or OVS_POLLIN | OVS_POLLOUT). The following call to poll_block() will
> - * wake up when 'fd' becomes ready for one or more of the requested events.
> - *
> - * On Windows, 'fd' must be a socket.
> - *
> - * The event registration is one-shot: only the following call to poll_block()
> - * is affected. The event will need to be re-registered after poll_block() is
> - * called if it is to persist.
> - *
> - * ('where' is used in debug logging. Commonly one would use poll_fd_wait() to
> - * automatically provide the caller's source file and line number for
> - * 'where'.) */
> void
> -poll_fd_wait_at(int fd, short int events, const char *where)
> +private_poll_fd_wait_at(int fd, int events, const char *where)
> {
> - poll_create_node(fd, 0, events, where);
> + /* POLLIN persists on "private" fds - either emulated or at epoll
> + * or other persistence framework level
> + */
> + if (events & (~OVS_POLLIN)) {
> + poll_fd_subscribe_at(fd, 0, events, NULL, where, true);
> + }
> }
>
> +
> #ifdef _WIN32
> /* Registers for the next call to poll_block() to wake up when 'wevent' is
> * signaled.
> @@ -163,7 +245,7 @@ poll_fd_wait_at(int fd, short int events, const char *where)
> void
> poll_wevent_wait_at(HANDLE wevent, const char *where)
> {
> - poll_create_node(0, wevent, 0, where);
> + poll_fd_subscribe_at(0, wevent, 0, NULL, where);
> }
> #endif /* _WIN32 */
>
> @@ -277,9 +359,12 @@ log_wakeup(const char *where, const struct pollfd *pollfd, int timeout)
> if (pollfd->revents & OVS_POLLHUP) {
> ds_put_cstr(&s, "[OVS_POLLHUP]");
> }
> +#ifndef OVS_USE_EPOLL
> + /* epoll does not have NVAL - it uses RDHUP and HUP which we cannot actually get to here */
> if (pollfd->revents & OVS_POLLNVAL) {
> ds_put_cstr(&s, "[OVS_POLLNVAL]");
> }
> +#endif
> ds_put_format(&s, " on fd %d (%s)", pollfd->fd, description);
> free(description);
> } else {
> @@ -295,12 +380,17 @@ log_wakeup(const char *where, const struct pollfd *pollfd, int timeout)
> ds_destroy(&s);
> }
>
> +
> static void
> free_poll_nodes(struct poll_loop *loop)
> {
> struct poll_node *node, *next;
>
> + ovs_mutex_lock(&loop->loop_mutex);
> HMAP_FOR_EACH_SAFE (node, next, hmap_node, &loop->poll_nodes) {
> +#ifdef OVS_USE_EPOLL
> + epoll_ctl(loop->epoll_fd, EPOLL_CTL_DEL, node->pollfd.fd, NULL);
> +#endif
> hmap_remove(&loop->poll_nodes, &node->hmap_node);
> #ifdef _WIN32
> if (node->wevent && node->pollfd.fd) {
> @@ -310,6 +400,7 @@ free_poll_nodes(struct poll_loop *loop)
> #endif
> free(node);
> }
> + ovs_mutex_unlock(&loop->loop_mutex);
> }
>
> /* Blocks until one or more of the events registered with poll_fd_wait()
> @@ -320,8 +411,13 @@ poll_block(void)
> {
> struct poll_loop *loop = poll_loop();
> struct poll_node *node;
> +#ifndef OVS_USE_EPOLL
> struct pollfd *pollfds;
> +#endif
> +#ifndef OVS_USE_EPOLL
> HANDLE *wevents = NULL;
> + int counter;
> +#endif
> int elapsed;
> int retval;
> int i;
> @@ -335,54 +431,126 @@ poll_block(void)
> }
>
> timewarp_run();
> - pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
>
> +#ifdef OVS_USE_EPOLL
> + retval = time_epoll_wait(loop->epoll_fd,
> + (struct epoll_event *) &loop->epoll_events, MAX_EPOLL_EVENTS, loop->timeout_when, &elapsed);
> + if (retval < 0) {
> + static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> + VLOG_ERR_RL(&rl, "epoll: %s", ovs_strerror(retval));
> + } else if (!retval) {
> + log_wakeup(loop->timeout_where, NULL, elapsed);
> + } else {
> + ovs_mutex_lock(&loop->loop_mutex);
> + if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
> + for (i = 0; i < retval; i++) {
> + node = (struct poll_node *) loop->epoll_events[i].data.ptr;
> + if (loop->epoll_events[i].events) {
> + node->pollfd.revents = loop->epoll_events[i].events;
> + log_wakeup(node->where, &node->pollfd, 0);
> + }
> + }
> + }
> + for (i = 0; i < retval; i++) {
> + node = (struct poll_node *) loop->epoll_events[i].data.ptr;
> + if (loop->epoll_events[i].events & EPOLLHUP) {
> + /* File descriptor closed already elsewhere
> + * We have to make the assumption that whoever closed it has
> + * ensured that anything which refers to IO event hints will not run
> + * on this fd after we free it.
> + */
> + node->valid = false;
> + }
> + if (loop->epoll_events[i].events) {
> + node->pollfd.revents |= (loop->epoll_events[i].events & 0x0000FFFF);
> + }
> + if (loop->epoll_events[i].events & OVS_POLLOUT) {
> + struct epoll_event event;
> + node->pollfd.events = OVS_POLLIN; /* reset back to defaults - write needs one shot */
> + event.events = node->pollfd.events;
> + event.data.ptr = node;
> + epoll_ctl(loop->epoll_fd, EPOLL_CTL_MOD, node->pollfd.fd, &event);
> + }
> + }
> + ovs_mutex_unlock(&loop->loop_mutex);
> + }
> +#else
> + pollfds = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *pollfds);
> #ifdef _WIN32
> wevents = xmalloc(hmap_count(&loop->poll_nodes) * sizeof *wevents);
> #endif
>
> +
> /* Populate with all the fds and events. */
> - i = 0;
> + counter = 0;
> HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
> - pollfds[i] = node->pollfd;
> + if ((node->valid) && (node->pollfd.events)) {
> + pollfds[counter] = node->pollfd;
> #ifdef _WIN32
> - wevents[i] = node->wevent;
> - if (node->pollfd.fd && node->wevent) {
> - short int wsa_events = 0;
> - if (node->pollfd.events & OVS_POLLIN) {
> - wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
> + wevents[counter] = node->wevent;
> + if (node->pollfd.fd && node->wevent) {
> + short int wsa_events = 0;
> + if (node->pollfd.events & OVS_POLLIN) {
> + wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
> + }
> + if (node->pollfd.events & OVS_POLLOUT) {
> + wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
> + }
> + WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
> }
> - if (node->pollfd.events & OVS_POLLOUT) {
> - wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
> - }
> - WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
> - }
> #endif
> - i++;
> + counter++;
> + }
> }
>
> - retval = time_poll(pollfds, hmap_count(&loop->poll_nodes), wevents,
> + retval = time_poll(pollfds, counter, wevents,
> loop->timeout_when, &elapsed);
> if (retval < 0) {
> static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval));
> - } else if (!retval) {
> + } else if (retval == 0) {
> log_wakeup(loop->timeout_where, NULL, elapsed);
> - } else if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
> - i = 0;
> - HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
> + } else {
> + for (i = 0; i < counter; i++) {
> if (pollfds[i].revents) {
> - log_wakeup(node->where, &pollfds[i], 0);
> +
> + node = find_poll_node(loop, pollfds[i].fd, 0);
> +
> + if (!node) {
> + VLOG_FATAL("poll: persistence state corrupted, no hash entry for %d", pollfds[i].fd);
> + }
> + if (pollfds[i].revents & (OVS_POLLHUP | OVS_POLLNVAL)) {
> + node->valid = false;
> + }
> +
> + if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
> + log_wakeup(node->where, &pollfds[i], 0);
> + }
> + /* update "requested" events.
> + * Note - "private" fds always want POLLIN - that emulates EPOLL, /dev/poll, etc
> + * behaviour which they should be using in real life instead of using poll()
> + */
> + if (node->private) {
> + node->pollfd.events &= ~(pollfds[i].revents & (~OVS_POLLIN));
> + } else {
> + node->pollfd.events &= ~pollfds[i].revents;
> + }
> + /* update "occurred" events for use by streams and handlers. In case there
> + * is an existing (but not consumed yet) event, we OR the events in the
> + * stored record with the new ones - it is the job of the stream to clear
> + * that.
> + */
> + node->pollfd.revents |= pollfds[i].revents;
> }
> - i++;
> }
> }
>
> - free_poll_nodes(loop);
> + free(pollfds);
> + if (wevents)
> + free(wevents);
> +#endif
> loop->timeout_when = LLONG_MAX;
> loop->timeout_where = NULL;
> - free(pollfds);
> - free(wevents);
>
> /* Handle any pending signals before doing anything else. */
> fatal_signal_run();
> @@ -416,8 +584,12 @@ poll_loop(void)
> if (!loop) {
> loop = xzalloc(sizeof *loop);
> loop->timeout_when = LLONG_MAX;
> + ovs_mutex_init(&loop->loop_mutex);
> hmap_init(&loop->poll_nodes);
> xpthread_setspecific(key, loop);
> +#ifdef OVS_USE_EPOLL
> + loop->epoll_fd = epoll_create(MAX_EPOLL_EVENTS);
> +#endif
> }
> return loop;
> }
> diff --git a/lib/route-table-bsd.c b/lib/route-table-bsd.c
> index 3dfa80c7f..16d155989 100644
> --- a/lib/route-table-bsd.c
> +++ b/lib/route-table-bsd.c
> @@ -34,6 +34,7 @@
> #include "ovs-router.h"
> #include "packets.h"
> #include "openvswitch/vlog.h"
> +#include "openvswitch/poll-loop.h"
> #include "util.h"
>
> VLOG_DEFINE_THIS_MODULE(route_table_bsd);
> diff --git a/lib/stream-fd.c b/lib/stream-fd.c
> index 62f768d45..6a80d6e05 100644
> --- a/lib/stream-fd.c
> +++ b/lib/stream-fd.c
> @@ -40,6 +40,8 @@ struct stream_fd
> struct stream stream;
> int fd;
> int fd_type;
> + bool rx_ready, tx_ready;
> + struct pollfd *hint;
> };
>
> static const struct stream_class stream_fd_class;
> @@ -67,7 +69,14 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
> stream_init(&s->stream, &stream_fd_class, connect_status, name);
> s->fd = fd;
> s->fd_type = fd_type;
> + s->rx_ready = true;
> + s->tx_ready = true;
> + s->hint = NULL;
> *streamp = &s->stream;
> + /* Persistent registration - we always get POLLINs from now on,
> + * POLLOUTs when we ask for them
> + */
> + poll_fd_register(s->fd, OVS_POLLIN, &s->hint);
> return 0;
> }
>
> @@ -82,6 +91,8 @@ static void
> fd_close(struct stream *stream)
> {
> struct stream_fd *s = stream_fd_cast(stream);
> + /* Deregister the FD from any persistent registrations if supported */
> + poll_fd_deregister(s->fd);
> closesocket(s->fd);
> free(s);
> }
> @@ -104,6 +115,24 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
> ssize_t retval;
> int error;
>
> + if (s->hint) {
> + /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL, we skip straight
> + * to the read, which should return 0 if the HUP is a real one; otherwise we clear it.
> + * In all other cases we believe what (e)poll has fed us.
> + */
> + if ((!(s->hint->revents & (OVS_POLLHUP|OVS_POLLNVAL))) && (!s->rx_ready)) {
> + if (!(s->hint->revents & OVS_POLLIN)) {
> + return -EAGAIN;
> + } else {
> + /* POLLIN event from poll loop, mark us as ready */
> + s->rx_ready = true;
> + s->hint->revents &= ~OVS_POLLIN;
> + }
> + } else {
> + s->hint->revents &= ~(OVS_POLLHUP|OVS_POLLNVAL);
> + }
> + }
> +
> retval = recv(s->fd, buffer, n, 0);
> if (retval < 0) {
> error = sock_errno();
> @@ -114,6 +143,8 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
> #endif
> if (error != EAGAIN) {
> VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
> + } else {
> + s->rx_ready = false;
> }
> return -error;
> }
> @@ -127,9 +158,29 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
> ssize_t retval;
> int error;
>
> + if (s->hint) {
> + /* poll-loop is providing us with hints for IO */
> + if (!s->tx_ready) {
> + if (!(s->hint->revents & OVS_POLLOUT)) {
> + return -EAGAIN;
> + } else {
> + /* POLLOUT event from poll loop, mark us as ready */
> + s->tx_ready = true;
> + s->hint->revents &= ~OVS_POLLOUT;
> + }
> + }
> + }
> retval = send(s->fd, buffer, n, 0);
> if (retval < 0) {
> error = sock_errno();
> +#ifdef __linux__
> + /* Linux will sometimes return ENOBUFS on sockets instead of EAGAIN. Usually seen
> + * on unix domain sockets
> + */
> + if (error == ENOBUFS) {
> + error = EAGAIN;
> + }
> +#endif
> #ifdef _WIN32
> if (error == WSAEWOULDBLOCK) {
> error = EAGAIN;
> @@ -137,6 +188,8 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
> #endif
> if (error != EAGAIN) {
> VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error));
> + } else {
> + s->tx_ready = false;
> }
> return -error;
> }
> @@ -150,11 +203,11 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
> switch (wait) {
> case STREAM_CONNECT:
> case STREAM_SEND:
> - poll_fd_wait(s->fd, OVS_POLLOUT);
> + private_poll_fd_wait(s->fd, OVS_POLLOUT);
> break;
>
> case STREAM_RECV:
> - poll_fd_wait(s->fd, OVS_POLLIN);
> + private_poll_fd_wait(s->fd, OVS_POLLIN);
> break;
>
> default:
> @@ -223,6 +276,8 @@ new_fd_pstream(char *name, int fd,
> ps->accept_cb = accept_cb;
> ps->unlink_path = unlink_path;
> *pstreamp = &ps->pstream;
> + /* persistent registration */
> + poll_fd_register(ps->fd, OVS_POLLIN, NULL);
> return 0;
> }
>
> @@ -230,6 +285,7 @@ static void
> pfd_close(struct pstream *pstream)
> {
> struct fd_pstream *ps = fd_pstream_cast(pstream);
> + poll_fd_deregister(ps->fd);
> closesocket(ps->fd);
> maybe_unlink_and_free(ps->unlink_path);
> free(ps);
> @@ -271,7 +327,7 @@ static void
> pfd_wait(struct pstream *pstream)
> {
> struct fd_pstream *ps = fd_pstream_cast(pstream);
> - poll_fd_wait(ps->fd, OVS_POLLIN);
> + private_poll_fd_wait(ps->fd, OVS_POLLIN);
> }
>
> static const struct pstream_class fd_pstream_class = {
> diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
> index 3b7f9865e..53ae51c1b 100644
> --- a/lib/stream-ssl.c
> +++ b/lib/stream-ssl.c
> @@ -147,6 +147,7 @@ struct ssl_stream
> /* A few bytes of header data in case SSL negotiation fails. */
> uint8_t head[2];
> short int n_head;
> + struct pollfd *hint;
> };
>
> /* SSL context created by ssl_init(). */
> @@ -310,6 +311,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
> SSL_set_msg_callback_arg(ssl, sslv);
> }
>
> +
> + poll_fd_register(sslv->fd, OVS_POLLIN, &sslv->hint);
> *streamp = &sslv->stream;
> free(server_name);
> return 0;
> @@ -604,6 +607,7 @@ ssl_close(struct stream *stream)
> ERR_clear_error();
>
> SSL_free(sslv->ssl);
> + poll_fd_deregister(sslv->fd);
> closesocket(sslv->fd);
> free(sslv);
> }
> @@ -697,6 +701,27 @@ ssl_recv(struct stream *stream, void *buffer, size_t n)
> /* Behavior of zero-byte SSL_read is poorly defined. */
> ovs_assert(n > 0);
>
> + if (sslv->hint) {
> + /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL, we skip straight
> + * to the read, which should return 0 if the HUP is a real one; otherwise we clear it.
> + * In all other cases we believe what (e)poll has fed us.
> + */
> + if ((!(sslv->hint->revents & (OVS_POLLHUP|OVS_POLLNVAL))) && (sslv->rx_want == SSL_READING)) {
> + if (!(sslv->hint->revents & OVS_POLLIN)) {
> + return -EAGAIN;
> + } else {
> + /* POLLIN event from poll loop, mark us as ready
> + * rx_want is cleared further down by reading ssl fsm
> + */
> + sslv->hint->revents &= ~OVS_POLLIN;
> + }
> + } else {
> + sslv->hint->revents &= ~(OVS_POLLHUP|OVS_POLLNVAL);
> + }
> + }
> +
> +
> +
> old_state = SSL_get_state(sslv->ssl);
> ret = SSL_read(sslv->ssl, buffer, n);
> if (old_state != SSL_get_state(sslv->ssl)) {
> @@ -729,6 +754,19 @@ ssl_do_tx(struct stream *stream)
> {
> struct ssl_stream *sslv = ssl_stream_cast(stream);
>
> + if (sslv->hint) {
> + /* poll-loop is providing us with hints for IO */
> + if (sslv->tx_want == SSL_WRITING) {
> + if (!(sslv->hint->revents & OVS_POLLOUT)) {
> + return EAGAIN;
> + } else {
> + /* POLLOUT event from poll loop: consume it and let
> + * SSL_write() below retry the pending data.
> + */
> + sslv->hint->revents &= ~OVS_POLLOUT;
> + }
> + }
> + }
> for (;;) {
> int old_state = SSL_get_state(sslv->ssl);
> int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
> @@ -771,6 +809,8 @@ ssl_send(struct stream *stream, const void *buffer, size_t n)
> ssl_clear_txbuf(sslv);
> return n;
> case EAGAIN:
> + /* we want to know when this fd will become available again */
> + stream_send_wait(stream);
> return n;
> default:
> ssl_clear_txbuf(sslv);
> @@ -795,7 +835,7 @@ ssl_run_wait(struct stream *stream)
> struct ssl_stream *sslv = ssl_stream_cast(stream);
>
> if (sslv->tx_want != SSL_NOTHING) {
> - poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
> + private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
> }
> }
>
> @@ -811,13 +851,13 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
> } else {
> switch (sslv->state) {
> case STATE_TCP_CONNECTING:
> - poll_fd_wait(sslv->fd, OVS_POLLOUT);
> + private_poll_fd_wait(sslv->fd, OVS_POLLOUT);
> break;
>
> case STATE_SSL_CONNECTING:
> /* ssl_connect() called SSL_accept() or SSL_connect(), which
> * set up the status that we test here. */
> - poll_fd_wait(sslv->fd,
> + private_poll_fd_wait(sslv->fd,
> want_to_poll_events(SSL_want(sslv->ssl)));
> break;
>
> @@ -829,7 +869,7 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
>
> case STREAM_RECV:
> if (sslv->rx_want != SSL_NOTHING) {
> - poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
> + private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
> } else {
> poll_immediate_wake();
> }
> @@ -911,6 +951,7 @@ pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp,
> ds_steal_cstr(&bound_name));
> pstream_set_bound_port(&pssl->pstream, htons(port));
> pssl->fd = fd;
> + poll_fd_register(fd, OVS_POLLIN, NULL);
> *pstreamp = &pssl->pstream;
>
> return 0;
> @@ -920,6 +961,7 @@ static void
> pssl_close(struct pstream *pstream)
> {
> struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
> + poll_fd_deregister(pssl->fd);
> closesocket(pssl->fd);
> free(pssl);
> }
> diff --git a/lib/timeval.c b/lib/timeval.c
> index 193c7bab1..59a12414f 100644
> --- a/lib/timeval.c
> +++ b/lib/timeval.c
> @@ -38,6 +38,7 @@
> #include "unixctl.h"
> #include "util.h"
> #include "openvswitch/vlog.h"
> +#include "openvswitch/poll-loop.h"
>
> VLOG_DEFINE_THIS_MODULE(timeval);
>
> @@ -369,6 +370,88 @@ time_poll(struct pollfd *pollfds, int n_pollfds, HANDLE *handles OVS_UNUSED,
> return retval;
> }
>
> +#ifdef OVS_USE_EPOLL
> +
> +/* Like epoll_wait(), except:
> + *
> + * - The timeout is specified as an absolute time, as defined by
> + * time_msec(), instead of a duration.
> + *
> + * - On error, returns a negative error code (instead of setting errno).
> + *
> + * - If interrupted by a signal, retries automatically until the original
> + * timeout is reached. (Because of this property, this function will
> + * never return -EINTR.)
> + *
> + * Stores the number of milliseconds elapsed during the wait in '*elapsed'. */
> +int
> +time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
> + long long int timeout_when, int *elapsed)
> +{
> + long long int *last_wakeup = last_wakeup_get();
> + long long int start;
> + bool quiescent;
> + int retval = 0;
> +
> + time_init();
> + coverage_clear();
> + coverage_run();
> + if (*last_wakeup && !thread_is_pmd()) {
> + log_poll_interval(*last_wakeup);
> + }
> + start = time_msec();
> +
> + timeout_when = MIN(timeout_when, deadline);
> + quiescent = ovsrcu_is_quiescent();
> +
> + for (;;) {
> + long long int now = time_msec();
> + int time_left;
> +
> + if (now >= timeout_when) {
> + time_left = 0;
> + } else if ((unsigned long long int) timeout_when - now > INT_MAX) {
> + time_left = INT_MAX;
> + } else {
> + time_left = timeout_when - now;
> + }
> +
> + if (!quiescent) {
> + if (!time_left) {
> + ovsrcu_quiesce();
> + } else {
> + ovsrcu_quiesce_start();
> + }
> + }
> +
> + retval = epoll_wait(epoll_fd, events, max, time_left);
> + if (retval < 0) {
> + retval = -errno;
> + }
> +
> + if (!quiescent && time_left) {
> + ovsrcu_quiesce_end();
> + }
> +
> + if (deadline <= time_msec()) {
> + fatal_signal_handler(SIGALRM);
> + if (retval < 0) {
> + retval = 0;
> + }
> + break;
> + }
> +
> + if (retval != -EINTR) {
> + break;
> + }
> + }
> + *last_wakeup = time_msec();
> + refresh_rusage();
> + *elapsed = *last_wakeup - start;
> + return retval;
> +}
> +#endif
> +
> long long int
> timespec_to_msec(const struct timespec *ts)
> {
> diff --git a/lib/timeval.h b/lib/timeval.h
> index 502f703d4..347a09d63 100644
> --- a/lib/timeval.h
> +++ b/lib/timeval.h
> @@ -20,6 +20,9 @@
> #include <time.h>
> #include "openvswitch/type-props.h"
> #include "util.h"
> +#ifdef __linux__
> +#include <sys/epoll.h>
> +#endif
>
> #ifdef __cplusplus
> extern "C" {
> @@ -61,6 +64,10 @@ void time_wall_timespec(struct timespec *);
> void time_alarm(unsigned int secs);
> int time_poll(struct pollfd *, int n_pollfds, HANDLE *handles,
> long long int timeout_when, int *elapsed);
> +#ifdef __linux__
> +int time_epoll_wait(int epoll_fd, struct epoll_event *events, int max,
> + long long int timeout_when, int *elapsed);
> +#endif
>
> long long int timespec_to_msec(const struct timespec *);
> long long int timespec_to_usec(const struct timespec *);
>
--
Anton R. Ivanov
Cambridgegreys Limited. Registered in England. Company Number 10273661
https://www.cambridgegreys.com/
More information about the dev mailing list