[ovs-dev] [PATCH 2/2] Introduce async IO in JSONRPC

Mark Michelson mmichels at redhat.com
Fri May 22 15:19:54 UTC 2020


Hi Anton,

One general note about this submission is that I see some coding 
guidelines violations. One is when it comes to definitions of functions. 
The guidelines say to write these as

return_type
func_name(params)
{
}

But some of your functions are written as:

return_type func_name(params) {
}

Notice the return_type and opening parenthesis on the same line as the 
func name.

Another I see is with regards to spacing around prefix and postfix 
operators. For instance, I see pointer dereferencing being done as:

* pointer

instead of

*pointer

In general, I'd say to have a look at the guidelines. Unfortunately, 
checkpatch.py doesn't catch these issues.

I have another observation below inline:

On 5/12/20 3:41 AM, anton.ivanov at cambridgegreys.com wrote:
> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
> 
> 1. Pull out buffering and send/receive ops from json rpc
> 
> 2. Make the SSL send zero copy (it was creating an ofpbuf
> out of an existing ofpbuf data without necessity).
> 
> 3. Add vector IO to stream-fd to make flushing more
> efficient. Also makes queueing for stream-fd and stream-ssl
> roughly identical.
> 
> 4. Unify backlog management
> 
> 5. Make use of the full capacity of the incoming buffer and
> not only when it is empty.
> 
> 6. Allow for IO to be run in worker threads
> 
> 7. Various minor fixes to enable async io - make rx errors
> visible to tx and vice versa. Make activity tracking for
> reconnect async friendly, etc.
> 
> 8. Enable Async IO in ovsdb
> 
> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
> ---
>   lib/async-io.c        | 530 ++++++++++++++++++++++++++++++++++++++++++
>   lib/async-io.h        |  86 +++++++
>   lib/automake.mk       |   2 +
>   lib/jsonrpc.c         | 151 +++++-------
>   lib/stream-fd.c       |  82 +++++++
>   lib/stream-provider.h |  34 ++-
>   lib/stream-ssl.c      |  64 ++++-
>   lib/stream-tcp.c      |   2 +
>   lib/stream-unix.c     |   2 +
>   lib/stream-windows.c  |   2 +
>   ovsdb/ovsdb-server.c  |   2 +
>   11 files changed, 864 insertions(+), 93 deletions(-)
>   create mode 100644 lib/async-io.c
>   create mode 100644 lib/async-io.h
> 
> diff --git a/lib/async-io.c b/lib/async-io.c
> new file mode 100644
> index 000000000..16d568a50
> --- /dev/null
> +++ b/lib/async-io.c
> @@ -0,0 +1,530 @@
> +/*
> + * Copyright (c) 2020 Red Hat Inc
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +#include "stream-provider.h"
> +#include <errno.h>
> +#include <unistd.h>
> +#include <inttypes.h>
> +#include <sys/types.h>
> +#include <netinet/in.h>
> +#include <poll.h>
> +#include <stdlib.h>
> +#include <string.h>
> +#include "coverage.h"
> +#include "fatal-signal.h"
> +#include "flow.h"
> +#include "jsonrpc.h"
> +#include "openflow/nicira-ext.h"
> +#include "openflow/openflow.h"
> +#include "openvswitch/dynamic-string.h"
> +#include "openvswitch/ofp-print.h"
> +#include "openvswitch/ofpbuf.h"
> +#include "openvswitch/vlog.h"
> +#include "ovs-thread.h"
> +#include "ovs-atomic.h"
> +#include "packets.h"
> +#include "openvswitch/poll-loop.h"
> +#include "random.h"
> +#include "socket-util.h"
> +#include "util.h"
> +#include "timeval.h"
> +#include "async-io.h"
> +#include "ovs-numa.h"
> +
> +VLOG_DEFINE_THIS_MODULE(async_io);
> +
> +static bool allow_async_io = false;
> +
> +static bool async_io_setup = false;
> +static bool kill_async_io = false;
> +
> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
> +
> +static struct ovs_list io_pools = OVS_LIST_INITIALIZER(&io_pools);
> +
> +static int pool_size;
> +
> +static struct async_io_pool *io_pool = NULL;
> +
> +static int do_async_recv(struct async_data *data);
> +static int do_stream_flush(struct async_data *data);
> +
> +static inline bool not_in_error(struct async_data *data) {
> +    int rx_error, tx_error;
> +
> +    if (!data->valid) {
> +        return false;
> +    }
> +
> +    atomic_read_relaxed(&data->rx_error, &rx_error);
> +    atomic_read_relaxed(&data->tx_error, &tx_error);
> +
> +    return (
> +        ((rx_error > 0) || (rx_error == -EAGAIN)) &&
> +        ((tx_error >= 0) || (tx_error == -EAGAIN))
> +    );
> +}
> +
> +static inline bool in_error(struct async_data *data) {
> +    return ! not_in_error(data);
> +}
> +
> +
> +static void *default_async_io_helper(void *arg) {
> +    struct async_io_control *io_control =
> +        (struct async_io_control *) arg;
> +    struct async_data *data;
> +    int retval;
> +
> +    do {
> +        ovs_mutex_lock(&io_control->mutex);
> +        latch_poll(&io_control->async_latch);
> +        LIST_FOR_EACH (data, list_node, &io_control->work_items) {
> +            long backlog, oldbacklog;
> +            ovs_mutex_lock(&data->mutex);
> +            retval = -EAGAIN;
> +            if (not_in_error(data)) {
> +                /*
> +                 * We stop reading if the input queue is full
> +                 */
> +                if (byteq_headroom(&data->input) != 0) {
> +                    retval = do_async_recv(data);
> +                } else {
> +                    poll_timer_wait(1);
> +                    retval = 0;
> +                }
> +            }
> +            if (not_in_error(data) && (retval > 0 || retval == -EAGAIN)) {
> +                stream_recv_wait(data->stream);
> +            }
> +            atomic_read_relaxed(&data->backlog, &oldbacklog);
> +            if (not_in_error(data)) {
> +                stream_run(data->stream);
> +                do_stream_flush(data);
> +            }
> +            atomic_read_relaxed(&data->backlog, &backlog);
> +            if (not_in_error(data)) {
> +                if (backlog) {
> +                    /* upper layers will refuse to process rx
> +                     * until the tx is clear, so no point
> +                     * notifying them
> +                     */
> +                    stream_send_wait(data->stream);
> +                } else {
> +                    /* There is no backlog, so the rpc layer will
> +                     * actually pay attention to our notifications
> +                     * We issue a notification for both pending
> +                     * input and what is the equivalent of
> +                     * "IO Completion"
> +                     */
> +                    if (!byteq_is_empty(&data->input) || oldbacklog) {
> +                        latch_set(&data->rx_notify);
> +                    }
> +                }
> +            }
> +            if (data->valid && in_error(data)) {
> +                /* make sure that the other thread(s) notice any errors.
> +                 * this should not be an else because errors may have
> +                 * changed inside the ifs above.
> +                 */
> +                latch_set(&data->rx_notify);
> +                data->valid = false;
> +            }
> +            if (not_in_error(data)) {
> +                stream_run_wait(data->stream);
> +            }
> +            ovs_mutex_unlock(&data->mutex);
> +        }
> +        ovs_mutex_unlock(&io_control->mutex);
> +        latch_wait(&io_control->async_latch);
> +        poll_block();
> +    } while (!kill_async_io);
> +    return arg;
> +}
> +
> +static void async_io_hook(void *aux OVS_UNUSED) {
> +    int i;
> +    static struct async_io_pool *pool;
> +    kill_async_io = true;
> +    LIST_FOR_EACH (pool, list_node, &io_pools) {
> +        for (i = 0; i < pool->size ; i++) {
> +            latch_set(&pool->controls[i].async_latch);
> +            latch_destroy(&pool->controls[i].async_latch);
> +        }
> +    }
> +}
> +
> +static void setup_async_io(void) {
> +    int cores, nodes;
> +
> +    nodes = ovs_numa_get_n_numas();
> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> +        nodes = 1;
> +    }
> +    cores = ovs_numa_get_n_cores();
> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> +        pool_size = 4;
> +    } else {
> +        pool_size = cores / nodes;
> +    }
> +    fatal_signal_add_hook(async_io_hook, NULL, NULL, true);
> +    async_io_setup = true;
> +}
> +
> +struct async_io_pool *add_pool(void *(*start)(void *)){
> +
> +    struct async_io_pool *new_pool = NULL;
> +    struct async_io_control *io_control;
> +    int i;
> +
> +    ovs_mutex_lock(&init_mutex);
> +
> +    if (!async_io_setup) {
> +         setup_async_io();
> +    }
> +
> +    new_pool = xmalloc(sizeof(struct async_io_pool));
> +    new_pool->size = pool_size; /* we may make this more dynamic later */
> +
> +    ovs_list_push_back(&io_pools, &new_pool->list_node);
> +
> +    new_pool->controls =
> +        xmalloc(sizeof(struct async_io_control) * new_pool->size);
> +    for (i = 0; i < new_pool->size; i++) {
> +        io_control = &new_pool->controls[i];
> +        latch_init(&io_control->async_latch);
> +        ovs_mutex_init(&io_control->mutex);
> +        ovs_list_init(&io_control->work_items);
> +    }
> +    for (i = 0; i < pool_size; i++) {
> +        ovs_thread_create("async io helper", start, &new_pool->controls[i]);
> +    }
> +    ovs_mutex_unlock(&init_mutex);
> +    return new_pool;
> +}
> +
> +void
> +async_init_data(struct async_data *data, struct stream *stream)
> +{
> +    struct async_io_control *target_control;
> +    unsigned int buffer_size;
> +
> +    data->stream = stream;
> +#ifdef __linux__
> +    buffer_size = getpagesize();
> +    if (!is_pow2(buffer_size)) {
> +        buffer_size = ASYNC_BUFFER_SIZE;
> +    }
> +#else
> +    buffer_size = ASYNC_BUFFER_SIZE;
> +#endif
> +#if (_POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600)
> +    /* try to allocate a buffer_size as aligned, that by default is one page
> +     * if that fails, fall back to normal memory allocation.
> +     */
> +    if (posix_memalign(
> +            (void **) &data->input_buffer, buffer_size, buffer_size)) {
> +        data->input_buffer = xmalloc(buffer_size);
> +    }
> +#else
> +    data->input_buffer = xmalloc(buffer_size);
> +#endif
> +    byteq_init(&data->input, data->input_buffer, buffer_size);
> +    ovs_list_init(&data->output);
> +    data->output_count = 0;
> +    data->rx_error = ATOMIC_VAR_INIT(-EAGAIN);
> +    data->tx_error = ATOMIC_VAR_INIT(0);
> +    data->active = ATOMIC_VAR_INIT(false);
> +    data->backlog = ATOMIC_VAR_INIT(0);
> +    ovs_mutex_init(&data->mutex);
> +    data->async_mode = allow_async_io;
> +    data->valid = true;
> +    if (data->async_mode) {
> +        if (!io_pool) {
> +            io_pool = add_pool(default_async_io_helper);
> +        }

There is a theoretical race condition here since the read of io_pool is 
not mutex-protected. Multiple threads could potentially call add_pool() 
here. The result would be a memory leak of an async_io_pool and a thread 
leak of pool_size threads.

> +        data->async_id = random_uint32();
> +        target_control = &io_pool->controls[data->async_id % io_pool->size];
> +        /* these are just fd pairs, no need to play with pointers, we
> +         * can pass them around
> +         */
> +        data->tx_run_notify = target_control->async_latch;
> +        latch_init(&data->rx_notify);
> +        ovs_mutex_lock(&target_control->mutex);
> +        ovs_list_push_back(&target_control->work_items, &data->list_node);
> +        ovs_mutex_unlock(&target_control->mutex);
> +        latch_set(&target_control->async_latch);
> +    }
> +}
> +
> +void
> +async_stream_enable(struct async_data *data)
> +{
> +    data->async_mode = allow_async_io;
> +}
> +
> +void
> +async_stream_disable(struct async_data *data)
> +{
> +    struct async_io_control *target_control;
> +    bool needs_wake = false;
> +
> +
> +    if (data->async_mode) {
> +        if (not_in_error(data) && (async_get_backlog(data) > 0)) {
> +            needs_wake = true;
> +            latch_poll(&data->rx_notify);
> +            latch_wait(&data->rx_notify);
> +            latch_set(&data->tx_run_notify);
> +            /* limit this to 50ms - should be enough for
> +             * a single flush and we will not get stuck here
> +             * waiting for a send to complete
> +             */
> +            poll_timer_wait(50);
> +            poll_block();
> +        }
> +        if (needs_wake) {
> +            /* we have lost all poll-wait info because we block()-ed
> +             * locally, we need to force the upper layers to rerun so
> +             * that they reinstate the correct waits
> +             */
> +            poll_immediate_wake();
> +        }
> +        target_control = &io_pool->controls[data->async_id % io_pool->size];
> +        ovs_mutex_lock(&target_control->mutex);
> +        ovs_list_remove(&data->list_node);
> +        ovs_mutex_unlock(&target_control->mutex);
> +        data->async_mode = false;
> +        latch_destroy(&data->rx_notify);
> +    }
> +    if (data->input_buffer) {
> +        free(data->input_buffer);
> +        data->input_buffer = NULL;
> +    }
> +}
> +
> +void
> +async_cleanup_data(struct async_data *data)
> +{
> +    if (async_get_backlog(data)) {
> +        ofpbuf_list_delete(&data->output);
> +    }
> +    atomic_store_relaxed(&data->backlog, 0);
> +    data->output_count = 0;
> +}
> +
> +/* Routines intended for async IO */
> +
> +long async_stream_enqueue(struct async_data *data, struct ofpbuf *buf) {
> +    long retval = -EAGAIN;
> +    long discard;
> +
> +    ovs_mutex_lock(&data->mutex);
> +    if (buf) {
> +        ovs_list_push_back(&data->output, &buf->list_node);
> +        data->output_count ++;
> +        atomic_add_relaxed(&data->backlog, buf->size, &discard);
> +        atomic_thread_fence(memory_order_release);
> +    }
> +    atomic_read_relaxed(&data->backlog, &retval);
> +    ovs_mutex_unlock(&data->mutex);
> +    return retval;
> +}
> +
> +static int do_stream_flush(struct async_data *data) {
> +    struct ofpbuf *buf;
> +    int count = 0;
> +    bool stamp = false;
> +    int retval = -stream_connect(data->stream);
> +    long discard;
> +
> +    if (!retval) {
> +        while (!ovs_list_is_empty(&data->output) && count < 10) {
> +            buf = ofpbuf_from_list(data->output.next);
> +            if (data->stream->class->enqueue) {
> +                ovs_list_remove(&buf->list_node);
> +                retval = (data->stream->class->enqueue)(data->stream, buf);
> +                if (retval > 0) {
> +                    data->output_count--;
> +                } else {
> +                    ovs_list_push_front(&data->output, &buf->list_node);
> +                }
> +            } else {
> +                retval = stream_send(data->stream, buf->data, buf->size);
> +                if (retval > 0) {
> +                    stamp = true;
> +                    atomic_sub_relaxed(&data->backlog, retval, &discard);
> +                    ofpbuf_pull(buf, retval);
> +                    if (!buf->size) {
> +                        /* stream now owns buf */
> +                        ovs_list_remove(&buf->list_node);
> +                        data->output_count--;
> +                        ofpbuf_delete(buf);
> +                    }
> +                }
> +            }
> +            if (retval <= 0) {
> +                break;
> +            }
> +            count++;
> +        }
> +        if (data->stream->class->flush && (retval >= 0 || retval == -EAGAIN)) {
> +            (data->stream->class->flush)(data->stream, &retval);
> +            if (retval > 0) {
> +                stamp = true;
> +                atomic_sub_relaxed(&data->backlog, retval, &discard);
> +            }
> +        }
> +        if (stamp) {
> +            atomic_store_relaxed(&data->active, true);
> +        }
> +    }
> +    atomic_store_relaxed(&data->tx_error, retval);
> +    return retval;
> +}
> +
> +int async_stream_flush(struct async_data *data) {
> +    int retval;
> +
> +    if (data->async_mode) {
> +        atomic_read_relaxed(&data->tx_error, &retval);
> +        if (retval >= 0) {
> +            retval = -EAGAIN; /* fake a busy so that upper layers do not
> +                               * retry, we will flush the backlog in the
> +                               * background
> +                               */
> +        }
> +        if (async_get_backlog(data)) {
> +            latch_set(&data->tx_run_notify);
> +        }
> +    } else {
> +        retval = do_stream_flush(data);
> +    }
> +    return retval;
> +}
> +
> +static int do_async_recv(struct async_data *data) {
> +    size_t chunk;
> +    int retval;
> +
> +    atomic_read_relaxed(&data->rx_error, &retval);
> +    if (retval > 0 || retval == -EAGAIN) {
> +        chunk = byteq_headroom(&data->input);
> +        if (chunk > 0) {
> +            retval = stream_recv(
> +                    data->stream, byteq_head(&data->input), chunk);
> +            if (retval > 0) {
> +                byteq_advance_head(&data->input, retval);
> +            }
> +        }
> +    }
> +    if (retval > 0 || retval == -EAGAIN) {
> +        retval = byteq_used(&data->input);
> +        if (retval == 0) {
> +            retval = -EAGAIN;
> +        }
> +    }
> +    atomic_store_relaxed(&data->rx_error, retval);
> +    return retval;
> +}
> +
> +
> +int async_stream_recv(struct async_data *data) {
> +    int retval = -EAGAIN;
> +
> +    if (data->async_mode) {
> +        atomic_read_relaxed(&data->rx_error, &retval);
> +        /* clear RX notifications */
> +        latch_poll(&data->rx_notify);
> +        /* fake a retval from byteq usage */
> +        if (retval > 0 || retval == -EAGAIN) {
> +            retval = byteq_used(&data->input);
> +            if (retval == 0) {
> +                retval = -EAGAIN;
> +            }
> +        }
> +    } else {
> +        retval = do_async_recv(data);
> +    }
> +    return retval;
> +}
> +
> +void async_stream_run(struct async_data *data) {
> +    if (!data->async_mode) {
> +        stream_run(data->stream);
> +    } else {
> +        latch_set(&data->tx_run_notify);
> +    }
> + }
> +
> +void async_io_kick(struct async_data *data) {
> +    if (data->async_mode) {
> +        latch_set(&data->tx_run_notify);
> +    }
> +}
> +
> +void async_recv_wait(struct async_data *data) {
> +    if (data->async_mode) {
> +        latch_poll(&data->rx_notify);
> +        latch_wait(&data->rx_notify);
> +    } else {
> +        stream_recv_wait(data->stream);
> +    }
> +}
> +
> +void async_io_enable(void) {
> +    allow_async_io = true;
> +}
> +
> +/* Accessors for JSON RPC */
> +
> +struct byteq *async_get_input(struct async_data *data) {
> +    return &data->input;
> +}
> +struct stream *async_get_stream(struct async_data *data) {
> +    return data->stream;
> +}
> +
> +bool async_output_is_empty(struct async_data *data) {
> +    bool retval;
> +    ovs_mutex_lock(&data->mutex);
> +    /* backlog tracks backlog across the full stack all the
> +     * way to the actual send. It is the source of truth
> +     * if we have output or not so anybody asking if we
> +     * have output should be told if we have backlog
> +     * instead.
> +     */
> +    retval = (data->backlog == 0);
> +    ovs_mutex_unlock(&data->mutex);
> +    return retval;
> +}
> +
> +long async_get_backlog(struct async_data *data) {
> +    long retval;
> +    /* This is used only by the unixctl connection
> +     * so not worth it to convert backlog to atomics
> +     */
> +    atomic_read_relaxed(&data->backlog, &retval);
> +    return retval;
> +}
> +
> +bool async_get_active(struct async_data *data) {
> +    bool test = true;
> +    return atomic_compare_exchange_weak(&data->active, &test, false);
> +}
> +
> +
> diff --git a/lib/async-io.h b/lib/async-io.h
> new file mode 100644
> index 000000000..dea070ee6
> --- /dev/null
> +++ b/lib/async-io.h
> @@ -0,0 +1,86 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef ASYNC_IO_H
> +#define ASYNC_IO_H 1
> +
> +#include <stdbool.h>
> +#include <stddef.h>
> +#include <stdint.h>
> +#include <sys/types.h>
> +#include "openvswitch/types.h"
> +#include "openvswitch/ofpbuf.h"
> +#include "socket-util.h"
> +#include "ovs-atomic.h"
> +#include "ovs-thread.h"
> +#include "latch.h"
> +#include "byteq.h"
> +#include "util.h"
> +
> +#define ASYNC_BUFFER_SIZE (4096)
> +
> +struct stream;
> +
> +struct async_data {
> +    struct stream *stream;
> +    struct ovs_list output;
> +    struct ovs_list list_node;
> +    long backlog;
> +    size_t output_count;
> +    atomic_bool active;
> +    atomic_int rx_error, tx_error;
> +    uint32_t async_id;
> +    struct latch rx_notify, tx_run_notify;
> +    struct ovs_mutex mutex;
> +    bool async_mode, valid;
> +    struct byteq input;
> +    uint8_t *input_buffer;
> +};
> +
> +struct async_io_control {
> +    struct latch async_latch;
> +    struct ovs_list work_items;
> +    struct ovs_mutex mutex;
> +};
> +
> +struct async_io_pool {
> +    struct ovs_list list_node;
> +    struct async_io_control *controls;
> +    int size;
> +};
> +
> +struct async_io_pool *add_pool(void *(*start)(void *));
> +
> +long async_stream_enqueue(struct async_data *, struct ofpbuf *buf);
> +int async_stream_flush(struct async_data *);
> +int async_stream_recv(struct async_data *);
> +struct byteq *async_get_input(struct async_data *);
> +struct stream *async_get_stream(struct async_data *);
> +bool async_output_is_empty(struct async_data *);
> +long async_get_backlog(struct async_data *);
> +bool async_get_active(struct async_data *);
> +
> +void async_stream_enable(struct async_data *);
> +void async_stream_disable(struct async_data *);
> +
> +void async_init_data(struct async_data *, struct stream *);
> +void async_cleanup_data(struct async_data *);
> +void async_stream_run(struct async_data *data);
> +void async_io_kick(struct async_data *data);
> +void async_recv_wait(struct async_data *data);
> +void async_io_enable(void);
> +
> +#endif /* async-io.h */
> diff --git a/lib/automake.mk b/lib/automake.mk
> index 86940ccd2..6f7870f26 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -24,6 +24,8 @@ lib_libopenvswitch_la_SOURCES = \
>   	lib/aes128.c \
>   	lib/aes128.h \
>   	lib/async-append.h \
> +	lib/async-io.h \
> +	lib/async-io.c \
>   	lib/backtrace.c \
>   	lib/backtrace.h \
>   	lib/bfd.c \
> diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
> index ed748dbde..f831bc2dd 100644
> --- a/lib/jsonrpc.c
> +++ b/lib/jsonrpc.c
> @@ -30,28 +30,23 @@
>   #include "openvswitch/poll-loop.h"
>   #include "reconnect.h"
>   #include "stream.h"
> +#include "stream-provider.h"
>   #include "svec.h"
>   #include "timeval.h"
> +#include "async-io.h"
>   #include "openvswitch/vlog.h"
>   
>   VLOG_DEFINE_THIS_MODULE(jsonrpc);
>   

>   struct jsonrpc {
> -    struct stream *stream;
>       char *name;
>       int status;
> -
> -    /* Input. */
> -    struct byteq input;
> -    uint8_t input_buffer[4096];
>       struct json_parser *parser;
> -
> -    /* Output. */
> -    struct ovs_list output;     /* Contains "struct ofpbuf"s. */
> -    size_t output_count;        /* Number of elements in "output". */
> -    size_t backlog;
> +    struct async_data data;
>   };
>   
> +#define MIN_IDLE_TIME 10
> +
>   /* Rate limit for error messages. */
>   static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
>   
> @@ -59,6 +54,11 @@ static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *);
>   static void jsonrpc_cleanup(struct jsonrpc *);
>   static void jsonrpc_error(struct jsonrpc *, int error);
>   
> +static inline struct async_data *adata(struct jsonrpc *rpc) {
> +    return &rpc->data;
> +}
> +
> +
>   /* This is just the same as stream_open() except that it uses the default
>    * JSONRPC port if none is specified. */
>   int
> @@ -86,10 +86,8 @@ jsonrpc_open(struct stream *stream)
>   
>       rpc = xzalloc(sizeof *rpc);
>       rpc->name = xstrdup(stream_get_name(stream));
> -    rpc->stream = stream;
> -    byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
> -    ovs_list_init(&rpc->output);
> -
> +    async_init_data(adata(rpc), stream);
> +    async_stream_enable(adata(rpc));
>       return rpc;
>   }
>   
> @@ -109,33 +107,22 @@ jsonrpc_close(struct jsonrpc *rpc)
>   void
>   jsonrpc_run(struct jsonrpc *rpc)
>   {
> +    int retval;
>       if (rpc->status) {
>           return;
>       }
>   
> -    stream_run(rpc->stream);
> -    while (!ovs_list_is_empty(&rpc->output)) {
> -        struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
> -        int retval;
> -
> -        retval = stream_send(rpc->stream, buf->data, buf->size);
> -        if (retval >= 0) {
> -            rpc->backlog -= retval;
> -            ofpbuf_pull(buf, retval);
> -            if (!buf->size) {
> -                ovs_list_remove(&buf->list_node);
> -                rpc->output_count--;
> -                ofpbuf_delete(buf);
> -            }
> -        } else {
> +    async_stream_run(adata(rpc));
> +    do {
> +        retval = async_stream_flush(&rpc->data);
> +        if (retval < 0) {
>               if (retval != -EAGAIN) {
>                   VLOG_WARN_RL(&rl, "%s: send error: %s",
>                                rpc->name, ovs_strerror(-retval));
>                   jsonrpc_error(rpc, -retval);
>               }
> -            break;
>           }
> -    }
> +    } while (retval > 0);
>   }
>   
>   /* Arranges for the poll loop to wake up when 'rpc' needs to perform
> @@ -144,9 +131,13 @@ void
>   jsonrpc_wait(struct jsonrpc *rpc)
>   {
>       if (!rpc->status) {
> -        stream_run_wait(rpc->stream);
> -        if (!ovs_list_is_empty(&rpc->output)) {
> -            stream_send_wait(rpc->stream);
> +        if (adata(rpc)->async_mode) {
> +            async_recv_wait(adata(rpc));
> +        } else {
> +            stream_run_wait(rpc->data.stream);
> +            if (!async_output_is_empty(adata(rpc))) {
> +                stream_send_wait(async_get_stream(adata(rpc)));
> +            }
>           }
>       }
>   }
> @@ -175,7 +166,7 @@ jsonrpc_get_status(const struct jsonrpc *rpc)
>   size_t
>   jsonrpc_get_backlog(const struct jsonrpc *rpc)
>   {
> -    return rpc->status ? 0 : rpc->backlog;
> +    return rpc->status ? 0 : async_get_backlog(adata((struct jsonrpc *) rpc));
>   }
>   
>   /* Returns the number of bytes that have been received on 'rpc''s underlying
> @@ -183,7 +174,7 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
>   unsigned int
>   jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
>   {
> -    return rpc->input.head;
> +    return async_get_input(adata((struct jsonrpc *) rpc))->head;
>   }
>   
>   /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
> @@ -234,13 +225,13 @@ jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
>    * buffered in 'rpc'.)
>    *
>    * Always takes ownership of 'msg', regardless of success. */
> +
>   int
>   jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
>   {
>       struct ofpbuf *buf;
>       struct json *json;
>       struct ds ds = DS_EMPTY_INITIALIZER;
> -    size_t length;
>   
>       if (rpc->status) {
>           jsonrpc_msg_destroy(msg);
> @@ -251,24 +242,13 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
>   
>       json = jsonrpc_msg_to_json(msg);
>       json_to_ds(json, 0, &ds);
> -    length = ds.length;
>       json_destroy(json);
>   
>       buf = xmalloc(sizeof *buf);
>       ofpbuf_use_ds(buf, &ds);
> -    ovs_list_push_back(&rpc->output, &buf->list_node);
> -    rpc->output_count++;
> -    rpc->backlog += length;
> -
> -    if (rpc->output_count >= 50) {
> -        VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
> -                     " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
> -                     rpc->output_count, rpc->backlog);
> -    }
> +    async_stream_enqueue(adata(rpc), buf);
>   
> -    if (rpc->backlog == length) {
> -        jsonrpc_run(rpc);
> -    }
> +    jsonrpc_run(rpc);
>       return rpc->status;
>   }
>   
> @@ -291,7 +271,7 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
>   int
>   jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
>   {
> -    int i;
> +    int i, retval;
>   
>       *msgp = NULL;
>       if (rpc->status) {
> @@ -302,36 +282,32 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
>           size_t n, used;
>   
>           /* Fill our input buffer if it's empty. */
> -        if (byteq_is_empty(&rpc->input)) {
> -            size_t chunk;
> -            int retval;
> -
> -            chunk = byteq_headroom(&rpc->input);
> -            retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
> -            if (retval < 0) {
> -                if (retval == -EAGAIN) {
> -                    return EAGAIN;
> -                } else {
> -                    VLOG_WARN_RL(&rl, "%s: receive error: %s",
> -                                 rpc->name, ovs_strerror(-retval));
> -                    jsonrpc_error(rpc, -retval);
> -                    return rpc->status;
> -                }
> -            } else if (retval == 0) {
> -                jsonrpc_error(rpc, EOF);
> -                return EOF;
> +        retval = async_stream_recv(adata(rpc));
> +        if (retval < 0) {
> +            if (retval == -EAGAIN) {
> +                return EAGAIN;
> +            } else {
> +                VLOG_WARN_RL(&rl, "%s: receive error: %s",
> +                             rpc->name, ovs_strerror(-retval));
> +                jsonrpc_error(rpc, -retval);
> +                return rpc->status;
>               }
> -            byteq_advance_head(&rpc->input, retval);
> +        } else if (retval == 0) {
> +            jsonrpc_error(rpc, EOF);
> +            return EOF;
>           }
>   
>           /* We have some input.  Feed it into the JSON parser. */
>           if (!rpc->parser) {
>               rpc->parser = json_parser_create(0);
>           }
> -        n = byteq_tailroom(&rpc->input);
> +        n = byteq_tailroom(async_get_input(adata(rpc)));
> +        if (n == 0) {
> +            break;
> +        }
>           used = json_parser_feed(rpc->parser,
> -                                (char *) byteq_tail(&rpc->input), n);
> -        byteq_advance_tail(&rpc->input, used);
> +                        (char *) byteq_tail(async_get_input(adata(rpc))), n);
> +        byteq_advance_tail(async_get_input(adata(rpc)), used);
>   
>           /* If we have complete JSON, attempt to parse it as JSON-RPC. */
>           if (json_parser_is_done(rpc->parser)) {
> @@ -341,7 +317,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
>               }
>   
>               if (rpc->status) {
> -                const struct byteq *q = &rpc->input;
> +                const struct byteq *q = async_get_input(adata(rpc));
>                   if (q->head <= q->size) {
>                       stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
>                                             &this_module, rpc->name);
> @@ -359,10 +335,10 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
>   void
>   jsonrpc_recv_wait(struct jsonrpc *rpc)
>   {
> -    if (rpc->status || !byteq_is_empty(&rpc->input)) {
> +    if (rpc->status || !byteq_is_empty(async_get_input(adata(rpc)))) {
>           poll_immediate_wake_at(rpc->name);
>       } else {
> -        stream_recv_wait(rpc->stream);
> +        async_recv_wait(adata(rpc));
>       }
>   }
>   
> @@ -385,7 +361,7 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
>   
>       for (;;) {
>           jsonrpc_run(rpc);
> -        if (ovs_list_is_empty(&rpc->output) || rpc->status) {
> +        if (async_output_is_empty(adata(rpc)) || rpc->status) {
>               return rpc->status;
>           }
>           jsonrpc_wait(rpc);
> @@ -495,15 +471,14 @@ jsonrpc_error(struct jsonrpc *rpc, int error)
>   static void
>   jsonrpc_cleanup(struct jsonrpc *rpc)
>   {
> -    stream_close(rpc->stream);
> -    rpc->stream = NULL;
> +    async_stream_disable(adata(rpc));
> +    stream_close(rpc->data.stream);
> +    rpc->data.stream = NULL;
>   
>       json_parser_abort(rpc->parser);
>       rpc->parser = NULL;
>   
> -    ofpbuf_list_delete(&rpc->output);
> -    rpc->backlog = 0;
> -    rpc->output_count = 0;
> +    async_cleanup_data(adata(rpc));
>   }
>   

>   static struct jsonrpc_msg *
> @@ -977,12 +952,14 @@ jsonrpc_session_run(struct jsonrpc_session *s)
>       }
>   
>       if (s->rpc) {
> -        size_t backlog;
>           int error;
> +        bool active = async_get_active(adata(s->rpc));
>   
> -        backlog = jsonrpc_get_backlog(s->rpc);
>           jsonrpc_run(s->rpc);
> -        if (jsonrpc_get_backlog(s->rpc) < backlog) {
> +
> +        active |= async_get_active(adata(s->rpc));
> +
> +        if (active) {
>               /* Data previously caught in a queue was successfully sent (or
>                * there's an error, which we'll catch below.)
>                *
> @@ -1076,8 +1053,8 @@ jsonrpc_session_get_name(const struct jsonrpc_session *s)
>   const char *
>   jsonrpc_session_get_id(const struct jsonrpc_session *s)
>   {
> -    if (s->rpc && s->rpc->stream) {
> -        return stream_get_peer_id(s->rpc->stream);
> +    if (s->rpc && async_get_stream(adata(s->rpc))) {
> +        return stream_get_peer_id(adata(s->rpc)->stream);
>       } else {
>           return NULL;
>       }
> diff --git a/lib/stream-fd.c b/lib/stream-fd.c
> index 46ee7ae27..747d543cf 100644
> --- a/lib/stream-fd.c
> +++ b/lib/stream-fd.c
> @@ -30,6 +30,7 @@
>   #include "stream-provider.h"
>   #include "stream.h"
>   #include "openvswitch/vlog.h"
> +#include "openvswitch/list.h"
>   
>   VLOG_DEFINE_THIS_MODULE(stream_fd);
>   
> @@ -40,6 +41,8 @@ struct stream_fd
>       struct stream stream;
>       int fd;
>       int fd_type;
> +    struct ovs_list output;
> +    int queue_depth;
>   };
>   
>   static const struct stream_class stream_fd_class;
> @@ -67,6 +70,8 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
>       stream_init(&s->stream, &stream_fd_class, connect_status, name);
>       s->fd = fd;
>       s->fd_type = fd_type;
> +    s->queue_depth = 0;
> +    ovs_list_init(&s->output);
>       *streamp = &s->stream;
>       return 0;
>   }
> @@ -83,6 +88,7 @@ fd_close(struct stream *stream)
>   {
>       struct stream_fd *s = stream_fd_cast(stream);
>       closesocket(s->fd);
> +    ofpbuf_list_delete(&s->output);
>       free(s);
>   }
>   
> @@ -111,6 +117,11 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
>           if (error == WSAEWOULDBLOCK) {
>              error = EAGAIN;
>           }
> +#endif
> +#ifdef __linux__
> +        if (error == ENOBUFS) {
> +           error = EAGAIN;
> +        }
>   #endif
>           if (error != EAGAIN) {
>               VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
> @@ -162,6 +173,75 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
>       }
>   }
>   
> +static int
> +fd_enqueue(struct stream *stream, struct ofpbuf *buf)
> +{
> +    struct stream_fd *sfd = stream_fd_cast(stream);
> +    ovs_list_push_back(&sfd->output, &buf->list_node);
> +    sfd->queue_depth ++;
> +    return buf->size;
> +}
> +
> +static bool
> +fd_flush(struct stream *stream, int *retval)
> +{
> +    struct stream_fd *sfd = stream_fd_cast(stream);
> +    int old_q_depth;
> +
> +    if (sfd->queue_depth == 0) {
> +        * retval = -EAGAIN;
> +        return true;
> +    } else {
> +        int sent, i = 0;
> +        struct msghdr msg;
> +        struct ofpbuf *buf;
> +
> +        msg.msg_name = NULL;
> +        msg.msg_namelen = 0;
> +        msg.msg_iov = xmalloc(sizeof(struct iovec) * sfd->queue_depth);
> +        msg.msg_iovlen = sfd->queue_depth;
> +        msg.msg_control = NULL;
> +        msg.msg_controllen = 0;
> +        msg.msg_flags = 0;
> +
> +        LIST_FOR_EACH (buf, list_node, &sfd->output) {
> +            msg.msg_iov[i].iov_base = buf->data;
> +            msg.msg_iov[i].iov_len = buf->size;
> +            i++;
> +        }
> +
> +        sent = sendmsg(sfd->fd, &msg, 0);
> +
> +        free(msg.msg_iov);
> +
> +        if (sent > 0) {
> +            * retval = sent;
> +            old_q_depth = sfd->queue_depth;
> +            for (i = 0; i < old_q_depth ; i++) {
> +                buf = ofpbuf_from_list(sfd->output.next);
> +                if (buf->size > sent) {
> +                    ofpbuf_pull(buf, sent);
> +                    sent = 0;
> +                } else {
> +                    sent -= buf->size;
> +                    sfd->queue_depth --;
> +                    ovs_list_remove(&buf->list_node);
> +                    ofpbuf_delete(buf);
> +                }
> +                if (sent == 0) {
> +                    break;
> +                }
> +            }
> +            return true;
> +        } else {
> +            *retval = -sock_errno();
> +            return false;
> +        }
> +    }
> +}
> +
> +
> +
>   static const struct stream_class stream_fd_class = {
>       "fd",                       /* name */
>       false,                      /* needs_probes */
> @@ -173,6 +253,8 @@ static const struct stream_class stream_fd_class = {
>       NULL,                       /* run */
>       NULL,                       /* run_wait */
>       fd_wait,                    /* wait */
> +    fd_enqueue,                 /* enqueue */
> +    fd_flush,                   /* flush */
>   };
>   

>   /* Passive file descriptor stream. */
> diff --git a/lib/stream-provider.h b/lib/stream-provider.h
> index 75f4f059b..b5161bd04 100644
> --- a/lib/stream-provider.h
> +++ b/lib/stream-provider.h
> @@ -18,9 +18,13 @@
>   #define STREAM_PROVIDER_H 1
>   
>   #include <sys/types.h>
> +#include <poll.h>
> +#include "openvswitch/list.h"
> +#include "openvswitch/ofpbuf.h"
> +#include "openvswitch/thread.h"
>   #include "stream.h"
> -
> -/* Active stream connection. */
> +#include "byteq.h"
> +#include "latch.h"
>   
>   /* Active stream connection.
>    *
> @@ -124,6 +128,31 @@ struct stream_class {
>       /* Arranges for the poll loop to wake up when 'stream' is ready to take an
>        * action of the given 'type'. */
>       void (*wait)(struct stream *stream, enum stream_wait_type type);
> +    /* Enqueues an ofpbuf and surrenders its ownership to the
> +     * stream
> +     *
> +     *     - If successful - stream now owns the buffer, returns
> +     *     backlog size
> +     *
> +     *     - On error, negative value, buffer is not claimed by
> +     *     the stream.
> +     *
> +     * The enqueue function must not block.  If no bytes can be immediately
> +     * accepted for transmission, it should return -EAGAIN immediately. */
> +    int (*enqueue)(struct stream *stream, struct ofpbuf *buf);
> +    /* Flushes any stream buffers
> +     *
> +     *     - If successful returns true and retval contains the backlog size
> +     *
> +     *     - If partially successful (EAGAIN), returns false and retval is
> +     *       a positive backlog size
> +     *
> +     *     - If unsuccessful, returns false and retval contains a negative
> +     *       error value
> +     *
> +     * The flush function must not block.  If buffers cannot be flushed
> +     * completely it should return "partial success" immediately. */
> +    bool (*flush)(struct stream *stream, int *retval);
>   };
>   

>   /* Passive listener for incoming stream connections.
> @@ -184,6 +213,7 @@ struct pstream_class {
>       /* Arranges for the poll loop to wake up when a connection is ready to be
>        * accepted on 'pstream'. */
>       void (*wait)(struct pstream *pstream);
> +
>   };
>   
>   /* Active and passive stream classes. */
> diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
> index 078fcbc3a..0046e383e 100644
> --- a/lib/stream-ssl.c
> +++ b/lib/stream-ssl.c
> @@ -85,6 +85,8 @@ struct ssl_stream
>       SSL *ssl;
>       struct ofpbuf *txbuf;
>       unsigned int session_nr;
> +    int last_enqueued;
> +    long backlog_to_report;
>   
>       /* rx_want and tx_want record the result of the last call to SSL_read()
>        * and SSL_write(), respectively:
> @@ -304,6 +306,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
>       sslv->rx_want = sslv->tx_want = SSL_NOTHING;
>       sslv->session_nr = next_session_nr++;
>       sslv->n_head = 0;
> +    sslv->last_enqueued = 0;
> +    sslv->backlog_to_report = 0;
>   
>       if (VLOG_IS_DBG_ENABLED()) {
>           SSL_set_msg_callback(ssl, ssl_protocol_cb);
> @@ -784,8 +788,59 @@ ssl_run(struct stream *stream)
>   {
>       struct ssl_stream *sslv = ssl_stream_cast(stream);
>   
> -    if (sslv->txbuf && ssl_do_tx(stream) != EAGAIN) {
> -        ssl_clear_txbuf(sslv);
> +    if (sslv->txbuf) {
> +        if (ssl_do_tx(stream) != EAGAIN) {
> +            sslv->backlog_to_report += sslv->last_enqueued;
> +            ssl_clear_txbuf(sslv);
> +        }
> +    }
> +}
> +
> +static int
> +ssl_enqueue(struct stream *stream, struct ofpbuf *buf)
> +{
> +    int n = buf->size;
> +    struct ssl_stream *sslv = ssl_stream_cast(stream);
> +    if (sslv->txbuf) {
> +        return -EAGAIN;
> +    }
> +    sslv->txbuf = buf;
> +    sslv->last_enqueued = n;
> +    return n;
> +}
> +
> +static bool
> +ssl_flush(struct stream *stream, int *retval)
> +{
> +    struct ssl_stream *sslv = ssl_stream_cast(stream);
> +
> +    if (!sslv->txbuf) {
> +        if (sslv->backlog_to_report) {
> +            * retval = sslv->backlog_to_report;
> +            sslv->backlog_to_report = 0;
> +        } else {
> +            * retval = -EAGAIN;
> +        }
> +        return true;
> +    } else {
> +        int error;
> +
> +        error = ssl_do_tx(stream);
> +        switch (error) {
> +        case 0:
> +            ssl_clear_txbuf(sslv);
> +            * retval = sslv->backlog_to_report + sslv->last_enqueued;
> +            sslv->backlog_to_report = 0;
> +            sslv->last_enqueued = 0;
> +            return true;
> +        case EAGAIN:
> +            * retval = 0;
> +            return false;
> +        default:
> +            ssl_clear_txbuf(sslv);
> +            * retval = -error;
> +            return false;
> +        }
>       }
>   }
>   
> @@ -840,8 +895,7 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
>               /* We have room in our tx queue. */
>               poll_immediate_wake();
>           } else {
> -            /* stream_run_wait() will do the right thing; don't bother with
> -             * redundancy. */
> +            poll_fd_wait(sslv->fd, POLLOUT);
>           }
>           break;
>   
> @@ -861,6 +915,8 @@ const struct stream_class ssl_stream_class = {
>       ssl_run,                    /* run */
>       ssl_run_wait,               /* run_wait */
>       ssl_wait,                   /* wait */
> +    ssl_enqueue,                /* send_buf */

The comment should surely be "/* enqueue */", correct?

> +    ssl_flush,
>   };
>   

>   /* Passive SSL. */
> diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
> index e8dc2bfaa..63632e989 100644
> --- a/lib/stream-tcp.c
> +++ b/lib/stream-tcp.c
> @@ -73,6 +73,8 @@ const struct stream_class tcp_stream_class = {
>       NULL,                       /* run */
>       NULL,                       /* run_wait */
>       NULL,                       /* wait */
> +    NULL,                       /* enqueue */
> +    NULL,                       /* flush */
>   };
>   

>   /* Passive TCP. */
> diff --git a/lib/stream-unix.c b/lib/stream-unix.c
> index d265efb83..4fd7573f2 100644
> --- a/lib/stream-unix.c
> +++ b/lib/stream-unix.c
> @@ -73,6 +73,8 @@ const struct stream_class unix_stream_class = {
>       NULL,                       /* run */
>       NULL,                       /* run_wait */
>       NULL,                       /* wait */
> +    NULL,                       /* enqueue */
> +    NULL,                       /* flush */
>   };
>   

>   /* Passive UNIX socket. */
> diff --git a/lib/stream-windows.c b/lib/stream-windows.c
> index 5c4c55e5d..cb8c2d3c3 100644
> --- a/lib/stream-windows.c
> +++ b/lib/stream-windows.c
> @@ -374,6 +374,8 @@ const struct stream_class windows_stream_class = {
>       NULL,                       /* run */
>       NULL,                       /* run_wait */
>       windows_wait,               /* wait */
> +    NULL,                       /* enqueue */
> +    NULL,                       /* flush */
>   };
>   
>   struct pwindows_pstream
> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
> index d416f1b60..83450beaa 100644
> --- a/ovsdb/ovsdb-server.c
> +++ b/ovsdb/ovsdb-server.c
> @@ -59,6 +59,7 @@
>   #include "perf-counter.h"
>   #include "ovsdb-util.h"
>   #include "openvswitch/vlog.h"
> +#include "async-io.h"
>   
>   VLOG_DEFINE_THIS_MODULE(ovsdb_server);
>   
> @@ -398,6 +399,7 @@ main(int argc, char *argv[])
>       }
>   
>       daemonize_complete();
> +    async_io_enable();
>   
>       if (!run_command) {
>           /* ovsdb-server is usually a long-running process, in which case it
> 



More information about the dev mailing list