[ovs-dev] [PATCH v4 1/2] Add file descriptor persistence where possible

Aaron Conole aconole at redhat.com
Thu Mar 12 18:49:30 UTC 2020


anton.ivanov at cambridgegreys.com writes:

> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>
> 1. Adds "persistent" behaviour to streams where feasible.
> These are waited upon in the same thread where they are created. This
> allows them to be registered persistently with the OS (if possible)
> and allows the OS to provide hints: is the fd ready, is it closed,
> etc.
>
> 2. Avoids unnecessary read attempts that would only return EAGAIN on an
> fd that is not ready, when the thread has a persistent poll loop.
>
> 3. Enables fd persistence for the ovsdb-server main thread only.
>
> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
> ---

Hi Anton,

Just a really cursory review.  The robot flagged a bunch of things to
address.

>  include/openvswitch/poll-loop.h |  33 ++++++
>  lib/poll-loop.c                 | 173 ++++++++++++++++++++++++++++++--
>  lib/stream-fd.c                 |  63 +++++++++++-
>  lib/stream-provider.h           |   4 +
>  lib/stream-ssl.c                |  78 +++++++++++++-
>  lib/stream-windows.c            |   2 +
>  ovsdb/ovsdb-server.c            |   1 +
>  tests/ovsdb-cluster.at          |   6 +-
>  8 files changed, 340 insertions(+), 20 deletions(-)
>
> diff --git a/include/openvswitch/poll-loop.h b/include/openvswitch/poll-loop.h
> index 532640497..50d49a8ed 100644
> --- a/include/openvswitch/poll-loop.h
> +++ b/include/openvswitch/poll-loop.h
> @@ -33,6 +33,8 @@
>  #ifndef POLL_LOOP_H
>  #define POLL_LOOP_H 1
>  
> +#include <stdbool.h>
> +
>  #ifndef _WIN32
>  #include <poll.h>
>  #endif
> @@ -45,6 +47,24 @@
>  extern "C" {
>  #endif
>  
> +/* Registers 'fd' for 'events' with the calling thread's poll loop, if
> + * persistence has been enabled with poll_enable_persist(), so that the fd
> + * does not have to be re-added before every poll_block().
> + *
> + * If 'hint' is nonnull and registration succeeds, '*hint' is set to point to
> + * the poll loop's pollfd for 'fd'.  Its 'revents' member accumulates the
> + * events reported by the OS until the caller clears them.
> + *
> + * Returns true if 'fd' was registered, false if persistence is not enabled
> + * for this thread (in which case '*hint' is left unchanged). */
> +bool poll_fd_register_at(int fd, int events, struct pollfd **hint,
> +                         const char *where);
> +#define poll_fd_register(fd, events, hint) \
> +    poll_fd_register_at(fd, events, hint, OVS_SOURCE_LOCATOR)
> +
> +/* Removes 'fd' from the calling thread's persistent poll loop.  Call this
> + * before closing an fd for which poll_fd_register() returned true. */
> +void poll_fd_deregister_at(int fd, const char *where);
> +#define poll_fd_deregister(fd) \
> +    poll_fd_deregister_at(fd, OVS_SOURCE_LOCATOR)
> +
>  
>  /* Schedule events to wake up the following poll_block().
>   *
> @@ -58,6 +78,15 @@ extern "C" {
>  void poll_fd_wait_at(int fd, short int events, const char *where);
>  #define poll_fd_wait(fd, events) poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
>  
> +/* Like poll_fd_wait(), but for an fd registered with poll_fd_register() for
> + * POLLIN.  The persistent poll loop keeps POLLIN armed for such an fd across
> + * poll_block() calls, so a request for POLLIN alone is a no-op.  A request
> + * that includes any other event (e.g. POLLOUT) is scheduled for the following
> + * poll_block() just as poll_fd_wait() would schedule it. */
> +void private_poll_fd_wait_at(int fd, int events, const char *where);
> +#define private_poll_fd_wait(fd, events) \
> +    private_poll_fd_wait_at(fd, events, OVS_SOURCE_LOCATOR)
> +
> +
>  #ifdef _WIN32
>  void poll_wevent_wait_at(HANDLE wevent, const char *where);
>  #define poll_wevent_wait(wevent) poll_wevent_wait_at(wevent, OVS_SOURCE_LOCATOR)
> @@ -76,6 +105,10 @@ void poll_immediate_wake_at(const char *where);
>  /* Wait until an event occurs. */
>  void poll_block(void);
>  
> +/* Switches the calling thread's poll loop to persistent mode, in which
> + * poll_fd_register() succeeds and poll_block() keeps registered fds armed. */
> +void poll_enable_persist(void);
> +
> +
>  #ifdef  __cplusplus
>  }
>  #endif
> diff --git a/lib/poll-loop.c b/lib/poll-loop.c
> index 4e751ff2c..ed8ad16b1 100644
> --- a/lib/poll-loop.c
> +++ b/lib/poll-loop.c
> @@ -43,6 +43,7 @@ struct poll_node {
>      struct pollfd pollfd;       /* Events to pass to time_poll(). */
>      HANDLE wevent;              /* Events for WaitForMultipleObjects(). */
>      const char *where;          /* Where poll_node was created. */
> +    bool valid;                 /* Marked invalid if we got a HUP/NVAL from poll */
>  };
>  
>  struct poll_loop {
> @@ -53,6 +54,7 @@ struct poll_loop {
>       * wake up immediately, or LLONG_MAX to wait forever. */
>      long long int timeout_when; /* In msecs as returned by time_msec(). */
>      const char *timeout_where;  /* Where 'timeout_when' was set. */
> +    bool persist;
>  };
>  
>  static struct poll_loop *poll_loop(void);
> @@ -99,8 +101,8 @@ find_poll_node(struct poll_loop *loop, int fd, HANDLE wevent)
>   * ('where' is used in debug logging.  Commonly one would use poll_fd_wait() to
>   * automatically provide the caller's source file and line number for
>   * 'where'.) */
> -static void
> -poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
> +static struct poll_node
> +*poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
>  {
>      struct poll_loop *loop = poll_loop();
>      struct poll_node *node;
> @@ -127,7 +129,9 @@ poll_create_node(int fd, HANDLE wevent, short int events, const char *where)
>  #endif
>          node->wevent = wevent;
>          node->where = where;
> +        node->valid = true;
>      }
> +    return node;
>  }
>  
>  /* Registers 'fd' as waiting for the specified 'events' (which should be POLLIN
> @@ -149,6 +153,44 @@ poll_fd_wait_at(int fd, short int events, const char *where)
>      poll_create_node(fd, 0, events, where);
>  }
>  
> +/* Persistent-mode counterpart of poll_fd_wait_at().  Registered fds keep
> + * POLLIN armed across poll_block() calls, so a poll node is only created (or
> + * updated) when 'events' asks for something beyond POLLIN, such as POLLOUT. */
> +void
> +private_poll_fd_wait_at(int fd, int events, const char *where)
> +{
> +    int extra_events = events & ~POLLIN;
> +
> +    if (!extra_events) {
> +        return;
> +    }
> +    poll_create_node(fd, 0, events, where);
> +}
> +
> +
> +/* Registers 'fd' for 'events' with this thread's poll loop when persistence
> + * is enabled, optionally returning the node's pollfd through 'hint'.
> + * Returns false, without touching '*hint', when persistence is disabled. */
> +bool
> +poll_fd_register_at(int fd, int events, struct pollfd **hint,
> +                    const char *where)
> +{
> +    struct poll_node *node;
> +
> +    if (!poll_loop()->persist) {
> +        return false;
> +    }
> +
> +    node = poll_create_node(fd, 0, events, where);
> +    if (hint) {
> +        *hint = &node->pollfd;
> +    }
> +    return true;
> +}
> +
> +
> +void
> +poll_fd_deregister_at(int fd, const char *where) {
> +    struct poll_loop *loop = poll_loop();
> +    struct poll_node *node;
> +
> +    VLOG(VLL_DBG, "Deregister %d from %s", fd, where);
> +
> +    node = find_poll_node(loop, fd, 0);
> +    if (node) {
> +        hmap_remove(&loop->poll_nodes, &node->hmap_node);
> +    }
> +}
> +
> +
>  #ifdef _WIN32
>  /* Registers for the next call to poll_block() to wake up when 'wevent' is
>   * signaled.
> @@ -312,13 +354,9 @@ free_poll_nodes(struct poll_loop *loop)
>      }
>  }
>  
> -/* Blocks until one or more of the events registered with poll_fd_wait()
> - * occurs, or until the minimum duration registered with poll_timer_wait()
> - * elapses, or not at all if poll_immediate_wake() has been called. */
> -void
> -poll_block(void)
> +static void
> +non_persist_poll_block(struct poll_loop *loop)
>  {
> -    struct poll_loop *loop = poll_loop();
>      struct poll_node *node;
>      struct pollfd *pollfds;
>      HANDLE *wevents = NULL;
> @@ -389,7 +427,117 @@ poll_block(void)
>  
>      seq_woke();
>  }
> -
> +
> +/* poll_block() implementation for a thread that called
> + * poll_enable_persist().
> + *
> + * This function never frees poll nodes, so fds registered with
> + * poll_fd_register() stay armed across calls.  After the poll:
> + *   - each reported event is OR'ed into the node's 'revents', where a stream
> + *     holding a 'hint' to that pollfd consumes (clears) it;
> + *   - reported events other than POLLIN are dropped from the node's
> + *     requested 'events';
> + *   - a node that reports POLLHUP or POLLNVAL is marked invalid and is no
> + *     longer passed to the OS.
> + *
> + * NOTE(review): nodes added with plain poll_fd_wait() are not freed in this
> + * mode either, and an invalid node presumably stays invalid if its fd
> + * number is later reused.  Confirm this is acceptable for every user. */
> +static void
> +persist_poll_block(struct poll_loop *loop)
> +{
> +    struct poll_node *node;
> +    struct pollfd *pollfds;
> +    HANDLE *wevents = NULL;
> +    size_t n_nodes;
> +    int elapsed;
> +    int retval;
> +    int i, counter;

Please initialize 'i' here.

> +
> +    /* Register fatal signal events before actually doing any real work for
> +     * poll_block. */
> +    fatal_signal_wait();
> +
> +    if (loop->timeout_when == LLONG_MIN) {
> +        COVERAGE_INC(poll_zero_timeout);
> +    }
> +
> +    timewarp_run();
> +    /* Sized after fatal_signal_wait(), which may add a node. */
> +    n_nodes = hmap_count(&loop->poll_nodes);
> +    pollfds = xmalloc(n_nodes * sizeof *pollfds);
> +
> +#ifdef _WIN32
> +    wevents = xmalloc(n_nodes * sizeof *wevents);
> +#endif
> +
> +    /* Populate with the fds that still request events and have not reported
> +     * POLLHUP/POLLNVAL.  'pollfds' and (on Windows) 'wevents' are both
> +     * indexed by 'counter', so entry N of each describes the same node.
> +     *
> +     * NOTE(review): on Windows, nodes from poll_wevent_wait() presumably
> +     * carry no pollfd events and are skipped here.  Confirm. */
> +    counter = 0;
> +    HMAP_FOR_EACH (node, hmap_node, &loop->poll_nodes) {
> +        if (!node->pollfd.events || !node->valid) {
> +            continue;
> +        }
> +        pollfds[counter] = node->pollfd;
> +#ifdef _WIN32
> +        wevents[counter] = node->wevent;
> +        if (node->pollfd.fd && node->wevent) {
> +            short int wsa_events = 0;
> +            if (node->pollfd.events & POLLIN) {
> +                wsa_events |= FD_READ | FD_ACCEPT | FD_CLOSE;
> +            }
> +            if (node->pollfd.events & POLLOUT) {
> +                wsa_events |= FD_WRITE | FD_CONNECT | FD_CLOSE;
> +            }
> +            WSAEventSelect(node->pollfd.fd, node->wevent, wsa_events);
> +        }
> +#endif
> +        counter++;

I think 'i' goes in the #define above.

> +    }
> +
> +    /* Only the first 'counter' entries were filled in above.  Passing the
> +     * hmap count here would hand uninitialized pollfds to the OS. */
> +    retval = time_poll(pollfds, counter, wevents,
> +                       loop->timeout_when, &elapsed);
> +    if (retval < 0) {
> +        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> +        VLOG_ERR_RL(&rl, "poll: %s", ovs_strerror(-retval));
> +    } else if (retval == 0) {
> +        log_wakeup(loop->timeout_where, NULL, elapsed);
> +    } else {
> +        for (i = 0; i < counter; i++) {
> +            node = find_poll_node(loop, pollfds[i].fd, 0);
> +            if (!node) {
> +                VLOG_FATAL("poll: persistence state corrupted, "
> +                           "no hash entry for %d", pollfds[i].fd);
> +            }
> +            if (pollfds[i].revents & (POLLHUP | POLLNVAL)) {
> +                node->valid = false;
> +            }
> +            if (get_cpu_usage() > 50 || VLOG_IS_DBG_ENABLED()) {
> +                if (pollfds[i].revents) {
> +                    log_wakeup(node->where, &pollfds[i], 0);
> +                }
> +            }
> +            /* Drop the reported events from the requested set, except
> +             * POLLIN.  Registered ("private") fds keep POLLIN armed
> +             * permanently, emulating the persistent interest of epoll,
> +             * /dev/poll and similar interfaces. */
> +            node->pollfd.events &= ~(pollfds[i].revents & ~POLLIN);
> +            /* Accumulate the reported events for the streams and handlers
> +             * that consume them.  An earlier event that has not been
> +             * consumed yet is kept; clearing bits is the consumer's job. */
> +            node->pollfd.revents |= pollfds[i].revents;
> +        }
> +    }
> +
> +    loop->timeout_when = LLONG_MAX;
> +    loop->timeout_where = NULL;
> +    free(pollfds);
> +    free(wevents);
> +
> +    /* Handle any pending signals before doing anything else. */
> +    fatal_signal_run();
> +
> +    seq_woke();
> +}
> +
> +/* Blocks until one or more of the events registered with poll_fd_wait() (or,
> + * in persistent mode, poll_fd_register() and private_poll_fd_wait()) occurs,
> + * until the minimum duration registered with poll_timer_wait() elapses, or
> + * not at all if poll_immediate_wake() has been called.
> + *
> + * Dispatches on whether poll_enable_persist() was called for this thread. */
> +void
> +poll_block(void)
> +{
> +    struct poll_loop *loop = poll_loop();
> +
> +    if (!loop->persist) {
> +        non_persist_poll_block(loop);
> +        return;
> +    }
> +    persist_poll_block(loop);
> +}
> +
> +
>  static void
>  free_poll_loop(void *loop_)
>  {
> @@ -400,6 +548,12 @@ free_poll_loop(void *loop_)
>      free(loop);
>  }
>  
> +/* Puts the calling thread's poll loop into persistent mode, in which
> + * poll_fd_register() succeeds and poll_block() uses persist_poll_block(). */
> +void
> +poll_enable_persist(void)
> +{
> +    poll_loop()->persist = true;
> +}
> +
>  static struct poll_loop *
>  poll_loop(void)
>  {
> @@ -418,6 +572,7 @@ poll_loop(void)
>          loop->timeout_when = LLONG_MAX;
>          hmap_init(&loop->poll_nodes);
>          xpthread_setspecific(key, loop);
> +        loop->persist = false;
>      }
>      return loop;
>  }
> diff --git a/lib/stream-fd.c b/lib/stream-fd.c
> index 46ee7ae27..22261fedb 100644
> --- a/lib/stream-fd.c
> +++ b/lib/stream-fd.c
> @@ -65,6 +65,9 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
>  
>      s = xmalloc(sizeof *s);
>      stream_init(&s->stream, &stream_fd_class, connect_status, name);
> +    s->stream.persist = poll_fd_register(fd, POLLIN, &s->stream.hint);
> +    s->stream.rx_ready = true;
> +    s->stream.tx_ready = true;
>      s->fd = fd;
>      s->fd_type = fd_type;
>      *streamp = &s->stream;
> @@ -82,6 +85,9 @@ static void
>  fd_close(struct stream *stream)
>  {
>      struct stream_fd *s = stream_fd_cast(stream);
> +    /* Deregister while 's->fd' is still open, so the poll loop never keeps a
> +     * node for a closed (and possibly reused) fd number. */
> +    if (s->stream.persist) {
> +        poll_fd_deregister(s->fd);
> +    }
>      closesocket(s->fd);
>      free(s);
>  }
> @@ -104,6 +110,26 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
>      ssize_t retval;
>      int error;
>  
> +    if (stream->persist && stream->hint) {
> +        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight
> +         * to the read which should return 0 if the HUP is a real one, if not we clear it
> +         * for all other cases we belive what (e)poll has fed us.
> +         */
> +        if ((!(stream->hint->revents & (POLLHUP | POLLNVAL))) && (!stream->rx_ready)) {
> +            if (!(stream->hint->revents & POLLIN)) {
> +                return -EAGAIN;
> +            } else {
> +                /* POLLIN event from poll loop, mark us as ready */
> +                stream->rx_ready = true;
> +                stream->hint->revents &= ~POLLIN;
> +            }
> +        } else {
> +            stream->hint->revents &= ~(POLLHUP | POLLNVAL);
> +        }
> +    }
> +
> +
> +
>      retval = recv(s->fd, buffer, n, 0);
>      if (retval < 0) {
>          error = sock_errno();
> @@ -127,6 +153,19 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
>      ssize_t retval;
>      int error;
>  
> +    if (stream->persist && stream->hint) {
> +        /* poll-loop is providing us with hints for IO */
> +        if (!stream->tx_ready) {
> +            if (!(stream->hint->revents & POLLOUT)) {
> +                return -EAGAIN;
> +            } else {
> +                /* POLLOUT event from poll loop, mark us as ready */
> +                stream->tx_ready = true;
> +                stream->hint->revents &= ~POLLOUT;
> +            }
> +        }
> +    }
> +
>      retval = send(s->fd, buffer, n, 0);
>      if (retval < 0) {
>          error = sock_errno();
> @@ -137,6 +176,8 @@ fd_send(struct stream *stream, const void *buffer, size_t n)
>  #endif
>          if (error != EAGAIN) {
>              VLOG_DBG_RL(&rl, "send: %s", sock_strerror(error));
> +        } else {
> +            stream->tx_ready = false;
>          }
>          return -error;
>      }
> @@ -150,11 +191,19 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
>      switch (wait) {
>      case STREAM_CONNECT:
>      case STREAM_SEND:
> -        poll_fd_wait(s->fd, POLLOUT);
> +        if (stream->persist) {
> +            private_poll_fd_wait(s->fd, POLLOUT);
> +        } else {
> +            poll_fd_wait(s->fd, POLLOUT);
> +        }
>          break;
>  
>      case STREAM_RECV:
> -        poll_fd_wait(s->fd, POLLIN);
> +        if (stream->persist) {
> +            private_poll_fd_wait(s->fd, POLLIN);
> +        } else {
> +            poll_fd_wait(s->fd, POLLIN);
> +        }
>          break;
>  
>      default:
> @@ -219,6 +268,7 @@ new_fd_pstream(char *name, int fd,
>  {
>      struct fd_pstream *ps = xmalloc(sizeof *ps);
>      pstream_init(&ps->pstream, &fd_pstream_class, name);
> +    ps->pstream.persist = poll_fd_register(fd, POLLIN, NULL);
>      ps->fd = fd;
>      ps->accept_cb = accept_cb;
>      ps->unlink_path = unlink_path;
> @@ -230,6 +280,9 @@ static void
>  pfd_close(struct pstream *pstream)
>  {
>      struct fd_pstream *ps = fd_pstream_cast(pstream);
> +    if (pstream->persist) {
> +        poll_fd_deregister(ps->fd);
> +    }
>      closesocket(ps->fd);
>      maybe_unlink_and_free(ps->unlink_path);
>      free(ps);
> @@ -271,7 +324,11 @@ static void
>  pfd_wait(struct pstream *pstream)
>  {
>      struct fd_pstream *ps = fd_pstream_cast(pstream);
> -    poll_fd_wait(ps->fd, POLLIN);
> +    /* With persistence on, this pstream's fd was registered for POLLIN, and
> +     * private_poll_fd_wait() treats a POLLIN-only request as already armed. */
> +    if (pstream->persist) {
> +        private_poll_fd_wait(ps->fd, POLLIN);
> +    } else {
> +        poll_fd_wait(ps->fd, POLLIN);
> +    }
>  }
>  
>  static const struct pstream_class fd_pstream_class = {
> diff --git a/lib/stream-provider.h b/lib/stream-provider.h
> index 75f4f059b..53de231dc 100644
> --- a/lib/stream-provider.h
> +++ b/lib/stream-provider.h
> @@ -18,6 +18,7 @@
>  #define STREAM_PROVIDER_H 1
>  
>  #include <sys/types.h>
> +#include <poll.h>
>  #include "stream.h"
>  
>  /* Active stream connection. */
> @@ -31,6 +32,8 @@ struct stream {
>      int error;
>      char *name;
>      char *peer_id;
> +    bool persist, rx_ready, tx_ready;
> +    struct pollfd *hint;
>  };
>  
>  void stream_init(struct stream *, const struct stream_class *,
> @@ -133,6 +136,7 @@ struct pstream {
>      const struct pstream_class *class;
>      char *name;
>      ovs_be16 bound_port;
> +    bool persist;
>  };
>  
>  void pstream_init(struct pstream *, const struct pstream_class *, char *name);
> diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
> index 078fcbc3a..056bb2023 100644
> --- a/lib/stream-ssl.c
> +++ b/lib/stream-ssl.c
> @@ -310,6 +310,10 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
>          SSL_set_msg_callback_arg(ssl, sslv);
>      }
>  
> +    sslv->stream.persist = poll_fd_register(fd, POLLIN, &sslv->stream.hint);
> +    sslv->stream.rx_ready = true;
> +    sslv->stream.tx_ready = true;
> +
>      *streamp = &sslv->stream;
>      free(server_name);
>      return 0;
> @@ -604,6 +608,7 @@ ssl_close(struct stream *stream)
>      ERR_clear_error();
>  
>      SSL_free(sslv->ssl);
> +    poll_fd_deregister(sslv->fd);
>      closesocket(sslv->fd);
>      free(sslv);
>  }
> @@ -697,6 +702,26 @@ ssl_recv(struct stream *stream, void *buffer, size_t n)
>      /* Behavior of zero-byte SSL_read is poorly defined. */
>      ovs_assert(n > 0);
>  
> +    if (stream->persist && stream->hint) {
> +        /* poll-loop is providing us with hints for IO. If we got a HUP/NVAL we skip straight
> +         * to the read which should return 0 if the HUP is a real one, if not we clear it
> +         * for all other cases we belive what (e)poll has fed us.
> +         */
> +        if ((!(stream->hint->revents & (POLLHUP | POLLNVAL))) && (sslv->rx_want == SSL_READING)) {
> +            if (!(stream->hint->revents & POLLIN)) {
> +                return -EAGAIN;
> +            } else {
> +                /* POLLIN event from poll loop, mark us as ready 
> +                 * rx_want is cleared further down by reading ssl fsm
> +                 */
> +                stream->hint->revents &= ~POLLIN;
> +            }
> +        } else {
> +            stream->hint->revents &= ~(POLLHUP | POLLNVAL);
> +        }
> +    }
> +
> +
>      old_state = SSL_get_state(sslv->ssl);
>      ret = SSL_read(sslv->ssl, buffer, n);
>      if (old_state != SSL_get_state(sslv->ssl)) {
> @@ -729,6 +754,21 @@ ssl_do_tx(struct stream *stream)
>  {
>      struct ssl_stream *sslv = ssl_stream_cast(stream);
>  
> +     if (stream->persist && stream->hint) {
> +        /* poll-loop is providing us with hints for IO */
> +        if (sslv->tx_want == SSL_WRITING) {
> +            if (!(stream->hint->revents & POLLOUT)) {
> +                return EAGAIN;
> +            } else {
> +                /* POLLIN event from poll loop, mark us as ready
> +                 * rx_want is cleared further down by reading ssl fsm
> +                 */
> +                stream->hint->revents &= ~POLLOUT;
> +            }
> +        }
> +    }
> +
> +
>      for (;;) {
>          int old_state = SSL_get_state(sslv->ssl);
>          int ret = SSL_write(sslv->ssl, sslv->txbuf->data, sslv->txbuf->size);
> @@ -771,6 +811,9 @@ ssl_send(struct stream *stream, const void *buffer, size_t n)
>              ssl_clear_txbuf(sslv);
>              return n;
>          case EAGAIN:
> +            if (stream->persist) {
> +                stream_send_wait(stream);
> +            }
>              return n;
>          default:
>              ssl_clear_txbuf(sslv);
> @@ -795,7 +838,11 @@ ssl_run_wait(struct stream *stream)
>      struct ssl_stream *sslv = ssl_stream_cast(stream);
>  
>      if (sslv->tx_want != SSL_NOTHING) {
> -        poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
> +        if (stream->persist) {
> +            private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
> +        } else {
> +            poll_fd_wait(sslv->fd, want_to_poll_events(sslv->tx_want));
> +        }
>      }
>  }
>  
> @@ -811,14 +858,23 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
>          } else {
>              switch (sslv->state) {
>              case STATE_TCP_CONNECTING:
> -                poll_fd_wait(sslv->fd, POLLOUT);
> +                if (stream->persist) {
> +                    private_poll_fd_wait(sslv->fd, POLLOUT);
> +                } else {
> +                    poll_fd_wait(sslv->fd, POLLOUT);
> +                }
>                  break;
>  
>              case STATE_SSL_CONNECTING:
>                  /* ssl_connect() called SSL_accept() or SSL_connect(), which
>                   * set up the status that we test here. */
> -                poll_fd_wait(sslv->fd,
> +                if (stream->persist) {
> +                    private_poll_fd_wait(sslv->fd,
> +                               want_to_poll_events(SSL_want(sslv->ssl)));
> +                } else {
> +                    poll_fd_wait(sslv->fd,
>                                 want_to_poll_events(SSL_want(sslv->ssl)));
> +                }
>                  break;
>  
>              default:
> @@ -829,7 +885,11 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
>  
>      case STREAM_RECV:
>          if (sslv->rx_want != SSL_NOTHING) {
> -            poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
> +            if (stream->persist) {
> +                private_poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
> +            } else {
> +                poll_fd_wait(sslv->fd, want_to_poll_events(sslv->rx_want));
> +            }
>          } else {
>              poll_immediate_wake();
>          }
> @@ -911,6 +971,7 @@ pssl_open(const char *name OVS_UNUSED, char *suffix, struct pstream **pstreamp,
>                   ds_steal_cstr(&bound_name));
>      pstream_set_bound_port(&pssl->pstream, htons(port));
>      pssl->fd = fd;
> +    pssl->pstream.persist = poll_fd_register(fd, POLLIN, NULL);
>      *pstreamp = &pssl->pstream;
>  
>      return 0;
> @@ -921,6 +982,9 @@ pssl_close(struct pstream *pstream)
>  {
>      struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
>      closesocket(pssl->fd);
> +    if (pstream->persist) {
> +        poll_fd_deregister(pssl->fd);
> +    }
>      free(pssl);
>  }
>  
> @@ -965,7 +1029,11 @@ static void
>  pssl_wait(struct pstream *pstream)
>  {
>      struct pssl_pstream *pssl = pssl_pstream_cast(pstream);
> -    poll_fd_wait(pssl->fd, POLLIN);
> +    /* With persistence on, this pstream's fd was registered for POLLIN, and
> +     * private_poll_fd_wait() treats a POLLIN-only request as already armed. */
> +    if (pstream->persist) {
> +        private_poll_fd_wait(pssl->fd, POLLIN);
> +    } else {
> +        poll_fd_wait(pssl->fd, POLLIN);
> +    }
>  }
>  
>  const struct pstream_class pssl_pstream_class = {
> diff --git a/lib/stream-windows.c b/lib/stream-windows.c
> index 34bc610b6..b845911b6 100644
> --- a/lib/stream-windows.c
> +++ b/lib/stream-windows.c
> @@ -175,6 +175,7 @@ windows_open(const char *name, char *suffix, struct stream **streamp,
>      s->read_pending = false;
>      s->write_pending = false;
>      s->retry_connect = retry;
> +    s->stream.persist = false;
>      *streamp = &s->stream;
>      return 0;
>  }
> @@ -649,6 +650,7 @@ pwindows_open(const char *name OVS_UNUSED, char *suffix,
>      p->connect.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
>      p->pending = false;
>      p->pipe_path = bind_path;
> +    p->stream.persist = false;
>      *pstreamp = &p->pstream;
>      return 0;
>  }
> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
> index b6957d730..295a62afd 100644
> --- a/ovsdb/ovsdb-server.c
> +++ b/ovsdb/ovsdb-server.c
> @@ -182,6 +182,7 @@ main_loop(struct server_config *config,
>      *exiting = false;
>      ssl_error = NULL;
>      remotes_error = NULL;
> +    poll_enable_persist();
>      while (!*exiting) {
>          memory_run();
>          if (memory_should_report()) {
> diff --git a/tests/ovsdb-cluster.at b/tests/ovsdb-cluster.at
> index 3a0bd4579..1f71d4fe8 100644
> --- a/tests/ovsdb-cluster.at
> +++ b/tests/ovsdb-cluster.at
> @@ -15,7 +15,7 @@ ovsdb_check_cluster () {
>  
>      on_exit 'kill `cat *.pid`'
>      for i in `seq $n`; do
> -        AT_CHECK([ovsdb-server -vraft -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
> +        AT_CHECK([ovsdb-server -vpoll_loop -vraft -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
>      done
>      for i in `seq $n`; do
>          AT_CHECK([ovsdb_client_wait unix:s$i.ovsdb $schema connected])
> @@ -307,7 +307,7 @@ ovsdb|WARN|schema: changed 30 columns in 'Open_vSwitch' database from ephemeral
>      start_server() {
>          local i=$1
>          printf "\ns$i: starting\n"
> -        AT_CHECK([ovsdb-server -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
> +        # NOTE(review): -vpoll_loop presumably turns on debug logging for the
> +        # poll_loop module; confirm it is still wanted once debugging is done.
> +        AT_CHECK([ovsdb-server -vjsonrpc -vpoll_loop -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
>      }
>      connect_server() {
>          local i=$1
> @@ -465,7 +465,7 @@ ovsdb|WARN|schema: changed 30 columns in 'Open_vSwitch' database from ephemeral
>      start_server() {
>          local i=$1
>          printf "\ns$i: starting\n"
> -        AT_CHECK([ovsdb-server -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
> +        # NOTE(review): -vpoll_loop presumably turns on debug logging for the
> +        # poll_loop module; confirm it is still wanted once debugging is done.
> +        AT_CHECK([ovsdb-server -vpoll_loop -vjsonrpc -vconsole:off -vsyslog:off --detach --no-chdir --log-file=s$i.log --pidfile=s$i.pid --unixctl=s$i --remote=punix:s$i.ovsdb s$i.db])
>      }
>      stop_server() {
>          local i=$1



More information about the dev mailing list