[ovs-dev] [PATCH 2/2] Introduce async IO in JSONRPC
Mark Michelson
mmichels at redhat.com
Fri May 22 15:19:54 UTC 2020
Hi Anton,
One general note about this submission is that I see some coding-guideline
violations. One concerns the formatting of function definitions.
The guidelines say to write these as
return_type
func_name(params)
{
}
But some of your functions are written as:
return_type func_name(params) {
}
Notice the return_type and opening curly brace on the same line as the
function name.
Another I see is with regards to spacing around prefix and postfix
operators. For instance, I see pointer dereferencing being done as:
* pointer
instead of
*pointer
In general, I'd say to have a look at the guidelines. Unfortunately,
checkpatch.py doesn't catch these issues.
I have another observation below inline:
On 5/12/20 3:41 AM, anton.ivanov at cambridgegreys.com wrote:
> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>
> 1. Pull out buffering and send/receive ops from json rpc
>
> 2. Make the SSL send zero copy (it was creating an ofpbuf
> out of an existing ofpbuf data without necessity).
>
> 3. Add vector IO to stream-fd to make flushing more
> efficient. Also makes queueing for stream-fd and stream-ssl
> roughly identical.
>
> 4. Unify backlog management
>
> 5. Make use of the full capacity of the incoming buffer and
> not only when it is empty.
>
> 6. Allow for IO to be run in worker threads
>
> 7. Various minor fixes to enable async io - make rx errors
> visible to tx and vice versa. Make activity tracking for
> reconnect async friendly, etc.
>
> 8. Enable Async IO in ovsdb
>
> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
> ---
> lib/async-io.c | 530 ++++++++++++++++++++++++++++++++++++++++++
> lib/async-io.h | 86 +++++++
> lib/automake.mk | 2 +
> lib/jsonrpc.c | 151 +++++-------
> lib/stream-fd.c | 82 +++++++
> lib/stream-provider.h | 34 ++-
> lib/stream-ssl.c | 64 ++++-
> lib/stream-tcp.c | 2 +
> lib/stream-unix.c | 2 +
> lib/stream-windows.c | 2 +
> ovsdb/ovsdb-server.c | 2 +
> 11 files changed, 864 insertions(+), 93 deletions(-)
> create mode 100644 lib/async-io.c
> create mode 100644 lib/async-io.h
>
> diff --git a/lib/async-io.c b/lib/async-io.c
> new file mode 100644
> index 000000000..16d568a50
> --- /dev/null
> +++ b/lib/async-io.c
> @@ -0,0 +1,530 @@
> +/*
> + * Copyright (c) 2020 Red Hat Inc
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +#include "stream-provider.h"
> +#include <errno.h>
> +#include <unistd.h>
> +#include <inttypes.h>
> +#include <sys/types.h>
> +#include <netinet/in.h>
> +#include <poll.h>
> +#include <stdlib.h>
> +#include <string.h>
> +#include "coverage.h"
> +#include "fatal-signal.h"
> +#include "flow.h"
> +#include "jsonrpc.h"
> +#include "openflow/nicira-ext.h"
> +#include "openflow/openflow.h"
> +#include "openvswitch/dynamic-string.h"
> +#include "openvswitch/ofp-print.h"
> +#include "openvswitch/ofpbuf.h"
> +#include "openvswitch/vlog.h"
> +#include "ovs-thread.h"
> +#include "ovs-atomic.h"
> +#include "packets.h"
> +#include "openvswitch/poll-loop.h"
> +#include "random.h"
> +#include "socket-util.h"
> +#include "util.h"
> +#include "timeval.h"
> +#include "async-io.h"
> +#include "ovs-numa.h"
> +
> +VLOG_DEFINE_THIS_MODULE(async_io);
> +
> +static bool allow_async_io = false;
> +
> +static bool async_io_setup = false;
> +static bool kill_async_io = false;
> +
> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
> +
> +static struct ovs_list io_pools = OVS_LIST_INITIALIZER(&io_pools);
> +
> +static int pool_size;
> +
> +static struct async_io_pool *io_pool = NULL;
> +
> +static int do_async_recv(struct async_data *data);
> +static int do_stream_flush(struct async_data *data);
> +
> +static inline bool not_in_error(struct async_data *data) {
> + int rx_error, tx_error;
> +
> + if (!data->valid) {
> + return false;
> + }
> +
> + atomic_read_relaxed(&data->rx_error, &rx_error);
> + atomic_read_relaxed(&data->tx_error, &tx_error);
> +
> + return (
> + ((rx_error > 0) || (rx_error == -EAGAIN)) &&
> + ((tx_error >= 0) || (tx_error == -EAGAIN))
> + );
> +}
> +
> +static inline bool in_error(struct async_data *data) {
> + return ! not_in_error(data);
> +}
> +
> +
> +static void *default_async_io_helper(void *arg) {
> + struct async_io_control *io_control =
> + (struct async_io_control *) arg;
> + struct async_data *data;
> + int retval;
> +
> + do {
> + ovs_mutex_lock(&io_control->mutex);
> + latch_poll(&io_control->async_latch);
> + LIST_FOR_EACH (data, list_node, &io_control->work_items) {
> + long backlog, oldbacklog;
> + ovs_mutex_lock(&data->mutex);
> + retval = -EAGAIN;
> + if (not_in_error(data)) {
> + /*
> + * We stop reading if the input queue is full
> + */
> + if (byteq_headroom(&data->input) != 0) {
> + retval = do_async_recv(data);
> + } else {
> + poll_timer_wait(1);
> + retval = 0;
> + }
> + }
> + if (not_in_error(data) && (retval > 0 || retval == -EAGAIN)) {
> + stream_recv_wait(data->stream);
> + }
> + atomic_read_relaxed(&data->backlog, &oldbacklog);
> + if (not_in_error(data)) {
> + stream_run(data->stream);
> + do_stream_flush(data);
> + }
> + atomic_read_relaxed(&data->backlog, &backlog);
> + if (not_in_error(data)) {
> + if (backlog) {
> + /* upper layers will refuse to process rx
> + * until the tx is clear, so no point
> + * notifying them
> + */
> + stream_send_wait(data->stream);
> + } else {
> + /* There is no backlog, so the rpc layer will
> + * actually pay attention to our notifications
> + * We issue a notification for both pending
> + * input and what is the equivalent of
> + * "IO Completion"
> + */
> + if (!byteq_is_empty(&data->input) || oldbacklog) {
> + latch_set(&data->rx_notify);
> + }
> + }
> + }
> + if (data->valid && in_error(data)) {
> + /* make sure that the other thread(s) notice any errors.
> + * this should not be an else because errors may have
> + * changed inside the ifs above.
> + */
> + latch_set(&data->rx_notify);
> + data->valid = false;
> + }
> + if (not_in_error(data)) {
> + stream_run_wait(data->stream);
> + }
> + ovs_mutex_unlock(&data->mutex);
> + }
> + ovs_mutex_unlock(&io_control->mutex);
> + latch_wait(&io_control->async_latch);
> + poll_block();
> + } while (!kill_async_io);
> + return arg;
> +}
> +
> +static void async_io_hook(void *aux OVS_UNUSED) {
> + int i;
> + static struct async_io_pool *pool;
> + kill_async_io = true;
> + LIST_FOR_EACH (pool, list_node, &io_pools) {
> + for (i = 0; i < pool->size ; i++) {
> + latch_set(&pool->controls[i].async_latch);
> + latch_destroy(&pool->controls[i].async_latch);
> + }
> + }
> +}
> +
> +static void setup_async_io(void) {
> + int cores, nodes;
> +
> + nodes = ovs_numa_get_n_numas();
> + if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> + nodes = 1;
> + }
> + cores = ovs_numa_get_n_cores();
> + if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> + pool_size = 4;
> + } else {
> + pool_size = cores / nodes;
> + }
> + fatal_signal_add_hook(async_io_hook, NULL, NULL, true);
> + async_io_setup = true;
> +}
> +
> +struct async_io_pool *add_pool(void *(*start)(void *)){
> +
> + struct async_io_pool *new_pool = NULL;
> + struct async_io_control *io_control;
> + int i;
> +
> + ovs_mutex_lock(&init_mutex);
> +
> + if (!async_io_setup) {
> + setup_async_io();
> + }
> +
> + new_pool = xmalloc(sizeof(struct async_io_pool));
> + new_pool->size = pool_size; /* we may make this more dynamic later */
> +
> + ovs_list_push_back(&io_pools, &new_pool->list_node);
> +
> + new_pool->controls =
> + xmalloc(sizeof(struct async_io_control) * new_pool->size);
> + for (i = 0; i < new_pool->size; i++) {
> + io_control = &new_pool->controls[i];
> + latch_init(&io_control->async_latch);
> + ovs_mutex_init(&io_control->mutex);
> + ovs_list_init(&io_control->work_items);
> + }
> + for (i = 0; i < pool_size; i++) {
> + ovs_thread_create("async io helper", start, &new_pool->controls[i]);
> + }
> + ovs_mutex_unlock(&init_mutex);
> + return new_pool;
> +}
> +
> +void
> +async_init_data(struct async_data *data, struct stream *stream)
> +{
> + struct async_io_control *target_control;
> + unsigned int buffer_size;
> +
> + data->stream = stream;
> +#ifdef __linux__
> + buffer_size = getpagesize();
> + if (!is_pow2(buffer_size)) {
> + buffer_size = ASYNC_BUFFER_SIZE;
> + }
> +#else
> + buffer_size = ASYNC_BUFFER_SIZE;
> +#endif
> +#if (_POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600)
> + /* try to allocate a buffer_size as aligned, that by default is one page
> + * if that fails, fall back to normal memory allocation.
> + */
> + if (posix_memalign(
> + (void **) &data->input_buffer, buffer_size, buffer_size)) {
> + data->input_buffer = xmalloc(buffer_size);
> + }
> +#else
> + data->input_buffer = xmalloc(buffer_size);
> +#endif
> + byteq_init(&data->input, data->input_buffer, buffer_size);
> + ovs_list_init(&data->output);
> + data->output_count = 0;
> + data->rx_error = ATOMIC_VAR_INIT(-EAGAIN);
> + data->tx_error = ATOMIC_VAR_INIT(0);
> + data->active = ATOMIC_VAR_INIT(false);
> + data->backlog = ATOMIC_VAR_INIT(0);
> + ovs_mutex_init(&data->mutex);
> + data->async_mode = allow_async_io;
> + data->valid = true;
> + if (data->async_mode) {
> + if (!io_pool) {
> + io_pool = add_pool(default_async_io_helper);
> + }
There is a race condition here: this check-then-set of io_pool is not
protected by init_mutex (or any other lock), so two threads calling
async_init_data() concurrently could both observe io_pool == NULL and
each call add_pool(). Only one pool pointer ends up retained in
io_pool; the other pool is leaked — one async_io_pool structure, its
controls array, and the pool_size helper threads it spawned.
> + data->async_id = random_uint32();
> + target_control = &io_pool->controls[data->async_id % io_pool->size];
> + /* these are just fd pairs, no need to play with pointers, we
> + * can pass them around
> + */
> + data->tx_run_notify = target_control->async_latch;
> + latch_init(&data->rx_notify);
> + ovs_mutex_lock(&target_control->mutex);
> + ovs_list_push_back(&target_control->work_items, &data->list_node);
> + ovs_mutex_unlock(&target_control->mutex);
> + latch_set(&target_control->async_latch);
> + }
> +}
> +
> +void
> +async_stream_enable(struct async_data *data)
> +{
> + data->async_mode = allow_async_io;
> +}
> +
> +void
> +async_stream_disable(struct async_data *data)
> +{
> + struct async_io_control *target_control;
> + bool needs_wake = false;
> +
> +
> + if (data->async_mode) {
> + if (not_in_error(data) && (async_get_backlog(data) > 0)) {
> + needs_wake = true;
> + latch_poll(&data->rx_notify);
> + latch_wait(&data->rx_notify);
> + latch_set(&data->tx_run_notify);
> + /* limit this to 50ms - should be enough for
> + * a single flush and we will not get stuck here
> + * waiting for a send to complete
> + */
> + poll_timer_wait(50);
> + poll_block();
> + }
> + if (needs_wake) {
> + /* we have lost all poll-wait info because we block()-ed
> + * locally, we need to force the upper layers to rerun so
> + * that they reinstate the correct waits
> + */
> + poll_immediate_wake();
> + }
> + target_control = &io_pool->controls[data->async_id % io_pool->size];
> + ovs_mutex_lock(&target_control->mutex);
> + ovs_list_remove(&data->list_node);
> + ovs_mutex_unlock(&target_control->mutex);
> + data->async_mode = false;
> + latch_destroy(&data->rx_notify);
> + }
> + if (data->input_buffer) {
> + free(data->input_buffer);
> + data->input_buffer = NULL;
> + }
> +}
> +
> +void
> +async_cleanup_data(struct async_data *data)
> +{
> + if (async_get_backlog(data)) {
> + ofpbuf_list_delete(&data->output);
> + }
> + atomic_store_relaxed(&data->backlog, 0);
> + data->output_count = 0;
> +}
> +
> +/* Routines intended for async IO */
> +
> +long async_stream_enqueue(struct async_data *data, struct ofpbuf *buf) {
> + long retval = -EAGAIN;
> + long discard;
> +
> + ovs_mutex_lock(&data->mutex);
> + if (buf) {
> + ovs_list_push_back(&data->output, &buf->list_node);
> + data->output_count ++;
> + atomic_add_relaxed(&data->backlog, buf->size, &discard);
> + atomic_thread_fence(memory_order_release);
> + }
> + atomic_read_relaxed(&data->backlog, &retval);
> + ovs_mutex_unlock(&data->mutex);
> + return retval;
> +}
> +
> +static int do_stream_flush(struct async_data *data) {
> + struct ofpbuf *buf;
> + int count = 0;
> + bool stamp = false;
> + int retval = -stream_connect(data->stream);
> + long discard;
> +
> + if (!retval) {
> + while (!ovs_list_is_empty(&data->output) && count < 10) {
> + buf = ofpbuf_from_list(data->output.next);
> + if (data->stream->class->enqueue) {
> + ovs_list_remove(&buf->list_node);
> + retval = (data->stream->class->enqueue)(data->stream, buf);
> + if (retval > 0) {
> + data->output_count--;
> + } else {
> + ovs_list_push_front(&data->output, &buf->list_node);
> + }
> + } else {
> + retval = stream_send(data->stream, buf->data, buf->size);
> + if (retval > 0) {
> + stamp = true;
> + atomic_sub_relaxed(&data->backlog, retval, &discard);
> + ofpbuf_pull(buf, retval);
> + if (!buf->size) {
> + /* stream now owns buf */
> + ovs_list_remove(&buf->list_node);
> + data->output_count--;
> + ofpbuf_delete(buf);
> + }
> + }
> + }
> + if (retval <= 0) {
> + break;
> + }
> + count++;
> + }
> + if (data->stream->class->flush && (retval >= 0 || retval == -EAGAIN)) {
> + (data->stream->class->flush)(data->stream, &retval);
> + if (retval > 0) {
> + stamp = true;
> + atomic_sub_relaxed(&data->backlog, retval, &discard);
> + }
> + }
> + if (stamp) {
> + atomic_store_relaxed(&data->active, true);
> + }
> + }
> + atomic_store_relaxed(&data->tx_error, retval);
> + return retval;
> +}
> +
> +int async_stream_flush(struct async_data *data) {
> + int retval;
> +
> + if (data->async_mode) {
> + atomic_read_relaxed(&data->tx_error, &retval);
> + if (retval >= 0) {
> + retval = -EAGAIN; /* fake a busy so that upper layers do not
> + * retry, we will flush the backlog in the
> + * background
> + */
> + }
> + if (async_get_backlog(data)) {
> + latch_set(&data->tx_run_notify);
> + }
> + } else {
> + retval = do_stream_flush(data);
> + }
> + return retval;
> +}
> +
> +static int do_async_recv(struct async_data *data) {
> + size_t chunk;
> + int retval;
> +
> + atomic_read_relaxed(&data->rx_error, &retval);
> + if (retval > 0 || retval == -EAGAIN) {
> + chunk = byteq_headroom(&data->input);
> + if (chunk > 0) {
> + retval = stream_recv(
> + data->stream, byteq_head(&data->input), chunk);
> + if (retval > 0) {
> + byteq_advance_head(&data->input, retval);
> + }
> + }
> + }
> + if (retval > 0 || retval == -EAGAIN) {
> + retval = byteq_used(&data->input);
> + if (retval == 0) {
> + retval = -EAGAIN;
> + }
> + }
> + atomic_store_relaxed(&data->rx_error, retval);
> + return retval;
> +}
> +
> +
> +int async_stream_recv(struct async_data *data) {
> + int retval = -EAGAIN;
> +
> + if (data->async_mode) {
> + atomic_read_relaxed(&data->rx_error, &retval);
> + /* clear RX notifications */
> + latch_poll(&data->rx_notify);
> + /* fake a retval from byteq usage */
> + if (retval > 0 || retval == -EAGAIN) {
> + retval = byteq_used(&data->input);
> + if (retval == 0) {
> + retval = -EAGAIN;
> + }
> + }
> + } else {
> + retval = do_async_recv(data);
> + }
> + return retval;
> +}
> +
> +void async_stream_run(struct async_data *data) {
> + if (!data->async_mode) {
> + stream_run(data->stream);
> + } else {
> + latch_set(&data->tx_run_notify);
> + }
> + }
> +
> +void async_io_kick(struct async_data *data) {
> + if (data->async_mode) {
> + latch_set(&data->tx_run_notify);
> + }
> +}
> +
> +void async_recv_wait(struct async_data *data) {
> + if (data->async_mode) {
> + latch_poll(&data->rx_notify);
> + latch_wait(&data->rx_notify);
> + } else {
> + stream_recv_wait(data->stream);
> + }
> +}
> +
> +void async_io_enable(void) {
> + allow_async_io = true;
> +}
> +
> +/* Accessors for JSON RPC */
> +
> +struct byteq *async_get_input(struct async_data *data) {
> + return &data->input;
> +}
> +struct stream *async_get_stream(struct async_data *data) {
> + return data->stream;
> +}
> +
> +bool async_output_is_empty(struct async_data *data) {
> + bool retval;
> + ovs_mutex_lock(&data->mutex);
> + /* backlog tracks backlog across the full stack all the
> + * way to the actual send. It is the source of truth
> + * if we have output or not so anybody asking if we
> + * have output should be told if we have backlog
> + * instead.
> + */
> + retval = (data->backlog == 0);
> + ovs_mutex_unlock(&data->mutex);
> + return retval;
> +}
> +
> +long async_get_backlog(struct async_data *data) {
> + long retval;
> + /* This is used only by the unixctl connection
> + * so not worth it to convert backlog to atomics
> + */
> + atomic_read_relaxed(&data->backlog, &retval);
> + return retval;
> +}
> +
> +bool async_get_active(struct async_data *data) {
> + bool test = true;
> + return atomic_compare_exchange_weak(&data->active, &test, false);
> +}
> +
> +
> diff --git a/lib/async-io.h b/lib/async-io.h
> new file mode 100644
> index 000000000..dea070ee6
> --- /dev/null
> +++ b/lib/async-io.h
> @@ -0,0 +1,86 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef ASYNC_IO_H
> +#define ASYNC_IO_H 1
> +
> +#include <stdbool.h>
> +#include <stddef.h>
> +#include <stdint.h>
> +#include <sys/types.h>
> +#include "openvswitch/types.h"
> +#include "openvswitch/ofpbuf.h"
> +#include "socket-util.h"
> +#include "ovs-atomic.h"
> +#include "ovs-thread.h"
> +#include "latch.h"
> +#include "byteq.h"
> +#include "util.h"
> +
> +#define ASYNC_BUFFER_SIZE (4096)
> +
> +struct stream;
> +
> +struct async_data {
> + struct stream *stream;
> + struct ovs_list output;
> + struct ovs_list list_node;
> + long backlog;
> + size_t output_count;
> + atomic_bool active;
> + atomic_int rx_error, tx_error;
> + uint32_t async_id;
> + struct latch rx_notify, tx_run_notify;
> + struct ovs_mutex mutex;
> + bool async_mode, valid;
> + struct byteq input;
> + uint8_t *input_buffer;
> +};
> +
> +struct async_io_control {
> + struct latch async_latch;
> + struct ovs_list work_items;
> + struct ovs_mutex mutex;
> +};
> +
> +struct async_io_pool {
> + struct ovs_list list_node;
> + struct async_io_control *controls;
> + int size;
> +};
> +
> +struct async_io_pool *add_pool(void *(*start)(void *));
> +
> +long async_stream_enqueue(struct async_data *, struct ofpbuf *buf);
> +int async_stream_flush(struct async_data *);
> +int async_stream_recv(struct async_data *);
> +struct byteq *async_get_input(struct async_data *);
> +struct stream *async_get_stream(struct async_data *);
> +bool async_output_is_empty(struct async_data *);
> +long async_get_backlog(struct async_data *);
> +bool async_get_active(struct async_data *);
> +
> +void async_stream_enable(struct async_data *);
> +void async_stream_disable(struct async_data *);
> +
> +void async_init_data(struct async_data *, struct stream *);
> +void async_cleanup_data(struct async_data *);
> +void async_stream_run(struct async_data *data);
> +void async_io_kick(struct async_data *data);
> +void async_recv_wait(struct async_data *data);
> +void async_io_enable(void);
> +
> +#endif /* async-io.h */
> diff --git a/lib/automake.mk b/lib/automake.mk
> index 86940ccd2..6f7870f26 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -24,6 +24,8 @@ lib_libopenvswitch_la_SOURCES = \
> lib/aes128.c \
> lib/aes128.h \
> lib/async-append.h \
> + lib/async-io.h \
> + lib/async-io.c \
> lib/backtrace.c \
> lib/backtrace.h \
> lib/bfd.c \
> diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
> index ed748dbde..f831bc2dd 100644
> --- a/lib/jsonrpc.c
> +++ b/lib/jsonrpc.c
> @@ -30,28 +30,23 @@
> #include "openvswitch/poll-loop.h"
> #include "reconnect.h"
> #include "stream.h"
> +#include "stream-provider.h"
> #include "svec.h"
> #include "timeval.h"
> +#include "async-io.h"
> #include "openvswitch/vlog.h"
>
> VLOG_DEFINE_THIS_MODULE(jsonrpc);
>
> struct jsonrpc {
> - struct stream *stream;
> char *name;
> int status;
> -
> - /* Input. */
> - struct byteq input;
> - uint8_t input_buffer[4096];
> struct json_parser *parser;
> -
> - /* Output. */
> - struct ovs_list output; /* Contains "struct ofpbuf"s. */
> - size_t output_count; /* Number of elements in "output". */
> - size_t backlog;
> + struct async_data data;
> };
>
> +#define MIN_IDLE_TIME 10
> +
> /* Rate limit for error messages. */
> static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
>
> @@ -59,6 +54,11 @@ static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *);
> static void jsonrpc_cleanup(struct jsonrpc *);
> static void jsonrpc_error(struct jsonrpc *, int error);
>
> +static inline struct async_data *adata(struct jsonrpc *rpc) {
> + return &rpc->data;
> +}
> +
> +
> /* This is just the same as stream_open() except that it uses the default
> * JSONRPC port if none is specified. */
> int
> @@ -86,10 +86,8 @@ jsonrpc_open(struct stream *stream)
>
> rpc = xzalloc(sizeof *rpc);
> rpc->name = xstrdup(stream_get_name(stream));
> - rpc->stream = stream;
> - byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
> - ovs_list_init(&rpc->output);
> -
> + async_init_data(adata(rpc), stream);
> + async_stream_enable(adata(rpc));
> return rpc;
> }
>
> @@ -109,33 +107,22 @@ jsonrpc_close(struct jsonrpc *rpc)
> void
> jsonrpc_run(struct jsonrpc *rpc)
> {
> + int retval;
> if (rpc->status) {
> return;
> }
>
> - stream_run(rpc->stream);
> - while (!ovs_list_is_empty(&rpc->output)) {
> - struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
> - int retval;
> -
> - retval = stream_send(rpc->stream, buf->data, buf->size);
> - if (retval >= 0) {
> - rpc->backlog -= retval;
> - ofpbuf_pull(buf, retval);
> - if (!buf->size) {
> - ovs_list_remove(&buf->list_node);
> - rpc->output_count--;
> - ofpbuf_delete(buf);
> - }
> - } else {
> + async_stream_run(adata(rpc));
> + do {
> + retval = async_stream_flush(&rpc->data);
> + if (retval < 0) {
> if (retval != -EAGAIN) {
> VLOG_WARN_RL(&rl, "%s: send error: %s",
> rpc->name, ovs_strerror(-retval));
> jsonrpc_error(rpc, -retval);
> }
> - break;
> }
> - }
> + } while (retval > 0);
> }
>
> /* Arranges for the poll loop to wake up when 'rpc' needs to perform
> @@ -144,9 +131,13 @@ void
> jsonrpc_wait(struct jsonrpc *rpc)
> {
> if (!rpc->status) {
> - stream_run_wait(rpc->stream);
> - if (!ovs_list_is_empty(&rpc->output)) {
> - stream_send_wait(rpc->stream);
> + if (adata(rpc)->async_mode) {
> + async_recv_wait(adata(rpc));
> + } else {
> + stream_run_wait(rpc->data.stream);
> + if (!async_output_is_empty(adata(rpc))) {
> + stream_send_wait(async_get_stream(adata(rpc)));
> + }
> }
> }
> }
> @@ -175,7 +166,7 @@ jsonrpc_get_status(const struct jsonrpc *rpc)
> size_t
> jsonrpc_get_backlog(const struct jsonrpc *rpc)
> {
> - return rpc->status ? 0 : rpc->backlog;
> + return rpc->status ? 0 : async_get_backlog(adata((struct jsonrpc *) rpc));
> }
>
> /* Returns the number of bytes that have been received on 'rpc''s underlying
> @@ -183,7 +174,7 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
> unsigned int
> jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
> {
> - return rpc->input.head;
> + return async_get_input(adata((struct jsonrpc *) rpc))->head;
> }
>
> /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
> @@ -234,13 +225,13 @@ jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
> * buffered in 'rpc'.)
> *
> * Always takes ownership of 'msg', regardless of success. */
> +
> int
> jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
> {
> struct ofpbuf *buf;
> struct json *json;
> struct ds ds = DS_EMPTY_INITIALIZER;
> - size_t length;
>
> if (rpc->status) {
> jsonrpc_msg_destroy(msg);
> @@ -251,24 +242,13 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
>
> json = jsonrpc_msg_to_json(msg);
> json_to_ds(json, 0, &ds);
> - length = ds.length;
> json_destroy(json);
>
> buf = xmalloc(sizeof *buf);
> ofpbuf_use_ds(buf, &ds);
> - ovs_list_push_back(&rpc->output, &buf->list_node);
> - rpc->output_count++;
> - rpc->backlog += length;
> -
> - if (rpc->output_count >= 50) {
> - VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
> - " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
> - rpc->output_count, rpc->backlog);
> - }
> + async_stream_enqueue(adata(rpc), buf);
>
> - if (rpc->backlog == length) {
> - jsonrpc_run(rpc);
> - }
> + jsonrpc_run(rpc);
> return rpc->status;
> }
>
> @@ -291,7 +271,7 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
> int
> jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
> {
> - int i;
> + int i, retval;
>
> *msgp = NULL;
> if (rpc->status) {
> @@ -302,36 +282,32 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
> size_t n, used;
>
> /* Fill our input buffer if it's empty. */
> - if (byteq_is_empty(&rpc->input)) {
> - size_t chunk;
> - int retval;
> -
> - chunk = byteq_headroom(&rpc->input);
> - retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
> - if (retval < 0) {
> - if (retval == -EAGAIN) {
> - return EAGAIN;
> - } else {
> - VLOG_WARN_RL(&rl, "%s: receive error: %s",
> - rpc->name, ovs_strerror(-retval));
> - jsonrpc_error(rpc, -retval);
> - return rpc->status;
> - }
> - } else if (retval == 0) {
> - jsonrpc_error(rpc, EOF);
> - return EOF;
> + retval = async_stream_recv(adata(rpc));
> + if (retval < 0) {
> + if (retval == -EAGAIN) {
> + return EAGAIN;
> + } else {
> + VLOG_WARN_RL(&rl, "%s: receive error: %s",
> + rpc->name, ovs_strerror(-retval));
> + jsonrpc_error(rpc, -retval);
> + return rpc->status;
> }
> - byteq_advance_head(&rpc->input, retval);
> + } else if (retval == 0) {
> + jsonrpc_error(rpc, EOF);
> + return EOF;
> }
>
> /* We have some input. Feed it into the JSON parser. */
> if (!rpc->parser) {
> rpc->parser = json_parser_create(0);
> }
> - n = byteq_tailroom(&rpc->input);
> + n = byteq_tailroom(async_get_input(adata(rpc)));
> + if (n == 0) {
> + break;
> + }
> used = json_parser_feed(rpc->parser,
> - (char *) byteq_tail(&rpc->input), n);
> - byteq_advance_tail(&rpc->input, used);
> + (char *) byteq_tail(async_get_input(adata(rpc))), n);
> + byteq_advance_tail(async_get_input(adata(rpc)), used);
>
> /* If we have complete JSON, attempt to parse it as JSON-RPC. */
> if (json_parser_is_done(rpc->parser)) {
> @@ -341,7 +317,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
> }
>
> if (rpc->status) {
> - const struct byteq *q = &rpc->input;
> + const struct byteq *q = async_get_input(adata(rpc));
> if (q->head <= q->size) {
> stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
> &this_module, rpc->name);
> @@ -359,10 +335,10 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
> void
> jsonrpc_recv_wait(struct jsonrpc *rpc)
> {
> - if (rpc->status || !byteq_is_empty(&rpc->input)) {
> + if (rpc->status || !byteq_is_empty(async_get_input(adata(rpc)))) {
> poll_immediate_wake_at(rpc->name);
> } else {
> - stream_recv_wait(rpc->stream);
> + async_recv_wait(adata(rpc));
> }
> }
>
> @@ -385,7 +361,7 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
>
> for (;;) {
> jsonrpc_run(rpc);
> - if (ovs_list_is_empty(&rpc->output) || rpc->status) {
> + if (async_output_is_empty(adata(rpc)) || rpc->status) {
> return rpc->status;
> }
> jsonrpc_wait(rpc);
> @@ -495,15 +471,14 @@ jsonrpc_error(struct jsonrpc *rpc, int error)
> static void
> jsonrpc_cleanup(struct jsonrpc *rpc)
> {
> - stream_close(rpc->stream);
> - rpc->stream = NULL;
> + async_stream_disable(adata(rpc));
> + stream_close(rpc->data.stream);
> + rpc->data.stream = NULL;
>
> json_parser_abort(rpc->parser);
> rpc->parser = NULL;
>
> - ofpbuf_list_delete(&rpc->output);
> - rpc->backlog = 0;
> - rpc->output_count = 0;
> + async_cleanup_data(adata(rpc));
> }
>
> static struct jsonrpc_msg *
> @@ -977,12 +952,14 @@ jsonrpc_session_run(struct jsonrpc_session *s)
> }
>
> if (s->rpc) {
> - size_t backlog;
> int error;
> + bool active = async_get_active(adata(s->rpc));
>
> - backlog = jsonrpc_get_backlog(s->rpc);
> jsonrpc_run(s->rpc);
> - if (jsonrpc_get_backlog(s->rpc) < backlog) {
> +
> + active |= async_get_active(adata(s->rpc));
> +
> + if (active) {
> /* Data previously caught in a queue was successfully sent (or
> * there's an error, which we'll catch below.)
> *
> @@ -1076,8 +1053,8 @@ jsonrpc_session_get_name(const struct jsonrpc_session *s)
> const char *
> jsonrpc_session_get_id(const struct jsonrpc_session *s)
> {
> - if (s->rpc && s->rpc->stream) {
> - return stream_get_peer_id(s->rpc->stream);
> + if (s->rpc && async_get_stream(adata(s->rpc))) {
> + return stream_get_peer_id(adata(s->rpc)->stream);
> } else {
> return NULL;
> }
> diff --git a/lib/stream-fd.c b/lib/stream-fd.c
> index 46ee7ae27..747d543cf 100644
> --- a/lib/stream-fd.c
> +++ b/lib/stream-fd.c
> @@ -30,6 +30,7 @@
> #include "stream-provider.h"
> #include "stream.h"
> #include "openvswitch/vlog.h"
> +#include "openvswitch/list.h"
>
> VLOG_DEFINE_THIS_MODULE(stream_fd);
>
> @@ -40,6 +41,8 @@ struct stream_fd
> struct stream stream;
> int fd;
> int fd_type;
> + struct ovs_list output;
> + int queue_depth;
> };
>
> static const struct stream_class stream_fd_class;
> @@ -67,6 +70,8 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
> stream_init(&s->stream, &stream_fd_class, connect_status, name);
> s->fd = fd;
> s->fd_type = fd_type;
> + s->queue_depth = 0;
> + ovs_list_init(&s->output);
> *streamp = &s->stream;
> return 0;
> }
> @@ -83,6 +88,7 @@ fd_close(struct stream *stream)
> {
> struct stream_fd *s = stream_fd_cast(stream);
> closesocket(s->fd);
> + ofpbuf_list_delete(&s->output);
> free(s);
> }
>
> @@ -111,6 +117,11 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
> if (error == WSAEWOULDBLOCK) {
> error = EAGAIN;
> }
> +#endif
> +#ifdef __linux__
> + if (error == ENOBUFS) {
> + error = EAGAIN;
> + }
> #endif
> if (error != EAGAIN) {
> VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
> @@ -162,6 +173,75 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
> }
> }
>
> +static int
> +fd_enqueue(struct stream *stream, struct ofpbuf *buf)
> +{
> + struct stream_fd *sfd = stream_fd_cast(stream);
> + ovs_list_push_back(&sfd->output, &buf->list_node);
> + sfd->queue_depth ++;
> + return buf->size;
> +}
> +
> +static bool
> +fd_flush(struct stream *stream, int *retval)
> +{
> + struct stream_fd *sfd = stream_fd_cast(stream);
> + int old_q_depth;
> +
> + if (sfd->queue_depth == 0) {
> + * retval = -EAGAIN;
> + return true;
> + } else {
> + int sent, i = 0;
> + struct msghdr msg;
> + struct ofpbuf *buf;
> +
> + msg.msg_name = NULL;
> + msg.msg_namelen = 0;
> + msg.msg_iov = xmalloc(sizeof(struct iovec) * sfd->queue_depth);
> + msg.msg_iovlen = sfd->queue_depth;
> + msg.msg_control = NULL;
> + msg.msg_controllen = 0;
> + msg.msg_flags = 0;
> +
> + LIST_FOR_EACH (buf, list_node, &sfd->output) {
> + msg.msg_iov[i].iov_base = buf->data;
> + msg.msg_iov[i].iov_len = buf->size;
> + i++;
> + }
> +
> + sent = sendmsg(sfd->fd, &msg, 0);
> +
> + free(msg.msg_iov);
> +
> + if (sent > 0) {
> + * retval = sent;
> + old_q_depth = sfd->queue_depth;
> + for (i = 0; i < old_q_depth ; i++) {
> + buf = ofpbuf_from_list(sfd->output.next);
> + if (buf->size > sent) {
> + ofpbuf_pull(buf, sent);
> + sent = 0;
> + } else {
> + sent -= buf->size;
> + sfd->queue_depth --;
> + ovs_list_remove(&buf->list_node);
> + ofpbuf_delete(buf);
> + }
> + if (sent == 0) {
> + break;
> + }
> + }
> + return true;
> + } else {
> + *retval = -sock_errno();
> + return false;
> + }
> + }
> +}
> +
> +
> +
> static const struct stream_class stream_fd_class = {
> "fd", /* name */
> false, /* needs_probes */
> @@ -173,6 +253,8 @@ static const struct stream_class stream_fd_class = {
> NULL, /* run */
> NULL, /* run_wait */
> fd_wait, /* wait */
> + fd_enqueue, /* enqueue */
> + fd_flush, /* flush */
> };
>
> /* Passive file descriptor stream. */
> diff --git a/lib/stream-provider.h b/lib/stream-provider.h
> index 75f4f059b..b5161bd04 100644
> --- a/lib/stream-provider.h
> +++ b/lib/stream-provider.h
> @@ -18,9 +18,13 @@
> #define STREAM_PROVIDER_H 1
>
> #include <sys/types.h>
> +#include <poll.h>
> +#include "openvswitch/list.h"
> +#include "openvswitch/ofpbuf.h"
> +#include "openvswitch/thread.h"
> #include "stream.h"
> -
> -/* Active stream connection. */
> +#include "byteq.h"
> +#include "latch.h"
>
> /* Active stream connection.
> *
> @@ -124,6 +128,31 @@ struct stream_class {
> /* Arranges for the poll loop to wake up when 'stream' is ready to take an
> * action of the given 'type'. */
> void (*wait)(struct stream *stream, enum stream_wait_type type);
> + /* Enqueues an ofpbuf and surrenders its ownership to the
> + * stream
> + *
> + * - If successful - stream now owns the buffer, returns
> + * backlog size
> + *
> + * - On error, negative value, buffer is not claimed by
> + * the stream.
> + *
> + * The enqueue function must not block. If no bytes can be immediately
> + * accepted for transmission, it should return -EAGAIN immediately. */
> + int (*enqueue)(struct stream *stream, struct ofpbuf *buf);
> + /* Flushes any stream buffers
> + *
> + * - If successful returns true and retval contains the backlog size
> + *
> + * - If partially successful (EAGAIN), returns false and retval is
> + * a positive backlog size
> + *
> + * - If unsuccessful, returns false and retval contains a negative
> + * error value
> + *
> + * The flush function must not block. If buffers cannot be flushed
> + * completely it should return "partial success" immediately. */
> + bool (*flush)(struct stream *stream, int *retval);
> };
>
> /* Passive listener for incoming stream connections.
> @@ -184,6 +213,7 @@ struct pstream_class {
> /* Arranges for the poll loop to wake up when a connection is ready to be
> * accepted on 'pstream'. */
> void (*wait)(struct pstream *pstream);
> +
> };
>
> /* Active and passive stream classes. */
> diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
> index 078fcbc3a..0046e383e 100644
> --- a/lib/stream-ssl.c
> +++ b/lib/stream-ssl.c
> @@ -85,6 +85,8 @@ struct ssl_stream
> SSL *ssl;
> struct ofpbuf *txbuf;
> unsigned int session_nr;
> + int last_enqueued;
> + long backlog_to_report;
>
> /* rx_want and tx_want record the result of the last call to SSL_read()
> * and SSL_write(), respectively:
> @@ -304,6 +306,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
> sslv->rx_want = sslv->tx_want = SSL_NOTHING;
> sslv->session_nr = next_session_nr++;
> sslv->n_head = 0;
> + sslv->last_enqueued = 0;
> + sslv->backlog_to_report = 0;
>
> if (VLOG_IS_DBG_ENABLED()) {
> SSL_set_msg_callback(ssl, ssl_protocol_cb);
> @@ -784,8 +788,59 @@ ssl_run(struct stream *stream)
> {
> struct ssl_stream *sslv = ssl_stream_cast(stream);
>
> - if (sslv->txbuf && ssl_do_tx(stream) != EAGAIN) {
> - ssl_clear_txbuf(sslv);
> + if (sslv->txbuf) {
> + if (ssl_do_tx(stream) != EAGAIN) {
> + sslv->backlog_to_report += sslv->last_enqueued;
> + ssl_clear_txbuf(sslv);
> + }
> + }
> +}
> +
> +static int
> +ssl_enqueue(struct stream *stream, struct ofpbuf *buf)
> +{
> + int n = buf->size;
> + struct ssl_stream *sslv = ssl_stream_cast(stream);
> + if (sslv->txbuf) {
> + return -EAGAIN;
> + }
> + sslv->txbuf = buf;
> + sslv->last_enqueued = n;
> + return n;
> +}
> +
> +static bool
> +ssl_flush(struct stream *stream, int *retval)
> +{
> + struct ssl_stream *sslv = ssl_stream_cast(stream);
> +
> + if (!sslv->txbuf) {
> + if (sslv->backlog_to_report) {
> + * retval = sslv->backlog_to_report;
> + sslv->backlog_to_report = 0;
> + } else {
> + * retval = -EAGAIN;
> + }
> + return true;
> + } else {
> + int error;
> +
> + error = ssl_do_tx(stream);
> + switch (error) {
> + case 0:
> + ssl_clear_txbuf(sslv);
> + * retval = sslv->backlog_to_report + sslv->last_enqueued;
> + sslv->backlog_to_report = 0;
> + sslv->last_enqueued = 0;
> + return true;
> + case EAGAIN:
> + * retval = 0;
> + return false;
> + default:
> + ssl_clear_txbuf(sslv);
> + * retval = -error;
> + return false;
> + }
> }
> }
>
> @@ -840,8 +895,7 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
> /* We have room in our tx queue. */
> poll_immediate_wake();
> } else {
> - /* stream_run_wait() will do the right thing; don't bother with
> - * redundancy. */
> + poll_fd_wait(sslv->fd, POLLOUT);
> }
> break;
>
> @@ -861,6 +915,8 @@ const struct stream_class ssl_stream_class = {
> ssl_run, /* run */
> ssl_run_wait, /* run_wait */
> ssl_wait, /* wait */
> + ssl_enqueue, /* send_buf */
The comment should surely be "/* enqueue */", correct?
> + ssl_flush,
> };
>
> /* Passive SSL. */
> diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
> index e8dc2bfaa..63632e989 100644
> --- a/lib/stream-tcp.c
> +++ b/lib/stream-tcp.c
> @@ -73,6 +73,8 @@ const struct stream_class tcp_stream_class = {
> NULL, /* run */
> NULL, /* run_wait */
> NULL, /* wait */
> + NULL, /* enqueue */
> + NULL, /* flush */
> };
>
> /* Passive TCP. */
> diff --git a/lib/stream-unix.c b/lib/stream-unix.c
> index d265efb83..4fd7573f2 100644
> --- a/lib/stream-unix.c
> +++ b/lib/stream-unix.c
> @@ -73,6 +73,8 @@ const struct stream_class unix_stream_class = {
> NULL, /* run */
> NULL, /* run_wait */
> NULL, /* wait */
> + NULL, /* enqueue */
> + NULL, /* flush */
> };
>
> /* Passive UNIX socket. */
> diff --git a/lib/stream-windows.c b/lib/stream-windows.c
> index 5c4c55e5d..cb8c2d3c3 100644
> --- a/lib/stream-windows.c
> +++ b/lib/stream-windows.c
> @@ -374,6 +374,8 @@ const struct stream_class windows_stream_class = {
> NULL, /* run */
> NULL, /* run_wait */
> windows_wait, /* wait */
> + NULL, /* enqueue */
> + NULL, /* flush */
> };
>
> struct pwindows_pstream
> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
> index d416f1b60..83450beaa 100644
> --- a/ovsdb/ovsdb-server.c
> +++ b/ovsdb/ovsdb-server.c
> @@ -59,6 +59,7 @@
> #include "perf-counter.h"
> #include "ovsdb-util.h"
> #include "openvswitch/vlog.h"
> +#include "async-io.h"
>
> VLOG_DEFINE_THIS_MODULE(ovsdb_server);
>
> @@ -398,6 +399,7 @@ main(int argc, char *argv[])
> }
>
> daemonize_complete();
> + async_io_enable();
>
> if (!run_command) {
> /* ovsdb-server is usually a long-running process, in which case it
>
More information about the ovs-dev mailing list