[ovs-dev] [PATCH v2 4/5] Introduce async IO in JSONRPC

anton.ivanov at cambridgegreys.com anton.ivanov at cambridgegreys.com
Tue Jun 2 07:21:51 UTC 2020


From: Anton Ivanov <anton.ivanov at cambridgegreys.com>

1. Pull out buffering and send/receive ops from json rpc

2. Make the SSL send zero-copy (it was needlessly creating a new
ofpbuf from the data of an existing ofpbuf).

3. Add vector IO to stream-fd to make flushing more
efficient. Also makes queueing for stream-fd and stream-ssl
roughly identical.

4. Unify backlog management

5. Make use of the full capacity of the incoming buffer: refill it
whenever it has room, not only when it is empty.

6. Allow for IO to be run in worker threads

7. Various minor fixes to enable async IO: make rx errors visible
to tx and vice versa, make activity tracking for reconnect
async-friendly, etc.

8. Enable Async IO in ovsdb

Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
---
 lib/async-io.c         | 523 +++++++++++++++++++++++++++++++++++++++++
 lib/async-io.h         |  86 +++++++
 lib/automake.mk        |   2 +
 lib/jsonrpc.c          | 151 +++++-------
 lib/stream-fd.c        |  82 +++++++
 lib/stream-provider.h  |  34 ++-
 lib/stream-ssl.c       |  64 ++++-
 lib/stream-tcp.c       |   2 +
 lib/stream-unix.c      |   2 +
 lib/stream-windows.c   |   2 +
 ovsdb/jsonrpc-server.c |  33 ++-
 ovsdb/ovsdb-server.c   |   2 +
 12 files changed, 872 insertions(+), 111 deletions(-)
 create mode 100644 lib/async-io.c
 create mode 100644 lib/async-io.h

diff --git a/lib/async-io.c b/lib/async-io.c
new file mode 100644
index 000000000..62725a653
--- /dev/null
+++ b/lib/async-io.c
@@ -0,0 +1,523 @@
+/*
+ * Copyright (c) 2020 Red Hat Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include "stream-provider.h"
+#include <errno.h>
+#include <unistd.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
+#include "coverage.h"
+#include "fatal-signal.h"
+#include "flow.h"
+#include "jsonrpc.h"
+#include "openflow/nicira-ext.h"
+#include "openflow/openflow.h"
+#include "openvswitch/dynamic-string.h"
+#include "openvswitch/ofp-print.h"
+#include "openvswitch/ofpbuf.h"
+#include "openvswitch/vlog.h"
+#include "ovs-thread.h"
+#include "ovs-atomic.h"
+#include "packets.h"
+#include "openvswitch/poll-loop.h"
+#include "random.h"
+#include "socket-util.h"
+#include "util.h"
+#include "timeval.h"
+#include "async-io.h"
+#include "ovs-numa.h"
+
+VLOG_DEFINE_THIS_MODULE(async_io);
+
+static bool allow_async_io = false;
+
+static bool async_io_setup = false;
+static bool kill_async_io = false;
+
+static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
+
+static struct ovs_list io_pools = OVS_LIST_INITIALIZER(&io_pools);
+
+static int pool_size;
+
+static struct async_io_pool *io_pool = NULL;
+
+static int do_async_recv(struct async_data *data);
+static int do_stream_flush(struct async_data *data);
+
+/* Returns true while 'data' is valid and neither direction has failed:
+ * rx must be positive or -EAGAIN, tx must be non-negative or -EAGAIN. */
+static inline bool not_in_error(struct async_data *data) {
+    int rx, tx;
+
+    if (!data->valid) {
+        return false;
+    }
+    atomic_read_relaxed(&data->rx_error, &rx);
+    atomic_read_relaxed(&data->tx_error, &tx);
+    if (rx <= 0 && rx != -EAGAIN) {
+        return false;
+    }
+    return tx >= 0 || tx == -EAGAIN;
+}
+
+/* Logical inverse of not_in_error(); also true once 'data' is invalid. */
+static inline bool in_error(struct async_data *data) {
+    return ! not_in_error(data);
+}
+
+/* Main loop of one helper thread; 'arg' is its struct async_io_control.
+ * Each pass takes every work item that is not in error through receive,
+ * stream_run() and flush, then sets the item's 'rx_notify' if input is
+ * buffered, a backlog existed before the flush, or an error appeared. */
+static void *default_async_io_helper(void *arg) {
+    struct async_io_control *io_control =
+        (struct async_io_control *) arg;
+    struct async_data *data;
+    int retval;
+
+    do {
+        ovs_mutex_lock(&io_control->mutex);
+        latch_poll(&io_control->async_latch);
+        LIST_FOR_EACH (data, list_node, &io_control->work_items) {
+            long backlog, oldbacklog;
+            ovs_mutex_lock(&data->mutex);
+            retval = -EAGAIN;
+            if (not_in_error(data)) {
+                /* We stop reading while the input queue is full. */
+                if (byteq_headroom(&data->input) != 0) {
+                    retval = do_async_recv(data);
+                } else {
+                    /* Come back in 1 ms to see if room was made. */
+                    poll_timer_wait(1);
+                    retval = 0;
+                }
+            }
+            if (not_in_error(data) && (retval > 0 || retval == -EAGAIN)) {
+                stream_recv_wait(data->stream);
+            }
+            atomic_read_relaxed(&data->backlog, &oldbacklog);
+            if (not_in_error(data)) {
+                stream_run(data->stream);
+                do_stream_flush(data);
+            }
+            atomic_read_relaxed(&data->backlog, &backlog);
+            if (not_in_error(data)) {
+                if (backlog) {
+                    /* Upper layers will refuse to process rx until the
+                     * tx is clear, so there is no point notifying them. */
+                    stream_send_wait(data->stream);
+                }
+                if (!byteq_is_empty(&data->input) || oldbacklog) {
+                        latch_set(&data->rx_notify);
+                }
+            }
+            if (data->valid && in_error(data)) {
+                /* Make sure that the other thread(s) notice any errors.
+                 * Not an else: errors may have changed in the ifs above. */
+                latch_set(&data->rx_notify);
+                data->valid = false;
+            }
+            if (not_in_error(data)) {
+                stream_run_wait(data->stream);
+            }
+            ovs_mutex_unlock(&data->mutex);
+        }
+        ovs_mutex_unlock(&io_control->mutex);
+        /* Sleep until kicked through 'async_latch' or a wait above fires. */
+        latch_wait(&io_control->async_latch);
+        poll_block();
+    } while (!kill_async_io);
+    return arg;
+}
+
+/* Fatal-signal hook: stops the helpers; kicks and destroys their latches. */
+static void async_io_hook(void *aux OVS_UNUSED) {
+    struct async_io_pool *pool;
+    kill_async_io = true;
+    LIST_FOR_EACH (pool, list_node, &io_pools) {
+        for (int i = 0; i < pool->size; i++) {
+            latch_set(&pool->controls[i].async_latch);
+            latch_destroy(&pool->controls[i].async_latch);
+        }
+    }
+}
+
+/* Picks 'pool_size' (never 0: it is a modulus); registers the exit hook. */
+static void setup_async_io(void) {
+    int nodes = ovs_numa_get_n_numas();
+    int cores = ovs_numa_get_n_cores();
+
+    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
+        nodes = 1;
+    }
+    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
+        pool_size = 4;
+    } else {
+        pool_size = cores >= nodes ? cores / nodes : 1;
+    }
+    fatal_signal_add_hook(async_io_hook, NULL, NULL, true);
+    async_io_setup = true;
+}
+
+/* Creates a pool of helper threads, each running 'start' on its own
+ * async_io_control.  The pool joins 'io_pools' only once its controls are
+ * initialized, because async_io_hook() walks them.  Returns the pool. */
+struct async_io_pool *add_pool(void *(*start)(void *)){
+    struct async_io_pool *new_pool;
+    int i;
+
+    ovs_mutex_lock(&init_mutex);
+
+    if (!async_io_setup) {
+        setup_async_io();
+    }
+
+    new_pool = xmalloc(sizeof *new_pool);
+    new_pool->size = pool_size; /* we may make this more dynamic later */
+    new_pool->controls = xmalloc(new_pool->size * sizeof *new_pool->controls);
+    for (i = 0; i < new_pool->size; i++) {
+        struct async_io_control *io_control = &new_pool->controls[i];
+
+        latch_init(&io_control->async_latch);
+        ovs_mutex_init(&io_control->mutex);
+        ovs_list_init(&io_control->work_items);
+    }
+    ovs_list_push_back(&io_pools, &new_pool->list_node);
+
+    for (i = 0; i < new_pool->size; i++) {
+        ovs_thread_create("async io helper", start, &new_pool->controls[i]);
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return new_pool;
+}
+
+/* Initializes 'data' for 'stream': allocates the input buffer and, when
+ * async IO is allowed, hands 'data' to one helper thread of 'io_pool'. */
+void
+async_init_data(struct async_data *data, struct stream *stream)
+{
+    struct async_io_control *target_control;
+    unsigned int buffer_size;
+
+    data->stream = stream;
+#ifdef __linux__
+    buffer_size = getpagesize();
+    if (!is_pow2(buffer_size)) {
+        buffer_size = ASYNC_BUFFER_SIZE;
+    }
+#else
+    buffer_size = ASYNC_BUFFER_SIZE;
+#endif
+#if (_POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600)
+    /* Prefer a buffer aligned to its own size (one page by default); if
+     * that fails, fall back to normal memory allocation. */
+    if (posix_memalign(
+            (void **) &data->input_buffer, buffer_size, buffer_size)) {
+        data->input_buffer = xmalloc(buffer_size);
+    }
+#else
+    data->input_buffer = xmalloc(buffer_size);
+#endif
+    byteq_init(&data->input, data->input_buffer, buffer_size);
+    ovs_list_init(&data->output);
+    data->output_count = 0;
+    atomic_init(&data->rx_error, -EAGAIN);
+    atomic_init(&data->tx_error, 0);
+    atomic_init(&data->active, false);
+    atomic_init(&data->backlog, 0);
+    ovs_mutex_init(&data->mutex);
+    data->async_mode = allow_async_io;
+    data->valid = true;
+    if (data->async_mode) {
+        if (!io_pool) {
+            io_pool = add_pool(default_async_io_helper);
+        }
+        data->async_id = random_uint32();
+        target_control = &io_pool->controls[data->async_id % io_pool->size];
+        /* Latches are just fd pairs, so the helper's latch can be copied
+         * by value instead of being referenced through a pointer. */
+        data->tx_run_notify = target_control->async_latch;
+        latch_init(&data->rx_notify);
+        ovs_mutex_lock(&target_control->mutex);
+        ovs_list_push_back(&target_control->work_items, &data->list_node);
+        ovs_mutex_unlock(&target_control->mutex);
+        latch_set(&target_control->async_latch);
+    }
+}
+
+void
+async_stream_enable(struct async_data *data)
+{
+    data->async_mode = allow_async_io;  /* follow async_io_enable()'s switch */
+}
+
+/* Takes 'data' out of async mode, if it was in it, and frees the input
+ * buffer.
+ *
+ * If output is still queued and no error is pending, first kicks the helper
+ * thread and waits, for a bounded time, for it to flush. */
+void
+async_stream_disable(struct async_data *data)
+{
+    if (data->async_mode) {
+        struct async_io_control *target_control;
+
+        if (not_in_error(data) && (async_get_backlog(data) > 0)) {
+            latch_poll(&data->rx_notify);
+            latch_wait(&data->rx_notify);
+            latch_set(&data->tx_run_notify);
+            /* Cap the wait at 50 ms: enough for a single flush, and
+             * we cannot get stuck here waiting for a send to
+             * complete.
+             */
+            poll_timer_wait(50);
+            poll_block();
+
+            /* Blocking locally discarded every registered poll wait,
+             * so force the upper layers to run again and reinstate
+             * the correct waits.
+             */
+            poll_immediate_wake();
+        }
+        target_control = &io_pool->controls[data->async_id % io_pool->size];
+        ovs_mutex_lock(&target_control->mutex);
+        ovs_list_remove(&data->list_node);
+        ovs_mutex_unlock(&target_control->mutex);
+        data->async_mode = false;
+        latch_destroy(&data->rx_notify);
+    }
+
+    free(data->input_buffer);   /* free(NULL) is a no-op */
+    data->input_buffer = NULL;
+}
+
+/* Frees all queued output buffers and zeroes the backlog accounting.  The
+ * list is emptied regardless of 'backlog', which counts bytes, not buffers. */
+void
+async_cleanup_data(struct async_data *data)
+{
+    ofpbuf_list_delete(&data->output);
+    atomic_store_relaxed(&data->backlog, 0);
+    data->output_count = 0;
+}
+
+/* Routines intended for async IO */
+
+/* Queues 'buf' (if nonnull) for output; returns the resulting backlog. */
+long async_stream_enqueue(struct async_data *data, struct ofpbuf *buf) {
+    long total = -EAGAIN, previous;
+
+    ovs_mutex_lock(&data->mutex);
+    if (buf) {
+        ovs_list_push_back(&data->output, &buf->list_node);
+        data->output_count++;
+        atomic_add_relaxed(&data->backlog, buf->size, &previous);
+        atomic_thread_fence(memory_order_release);
+    }
+    atomic_read_relaxed(&data->backlog, &total);
+    ovs_mutex_unlock(&data->mutex);
+    return total;
+}
+
+/* Runs up to 10 send/enqueue steps and any stream flush; sets 'tx_error'. */
+static int do_stream_flush(struct async_data *data) {
+    struct ofpbuf *buf;
+    int count = 0;
+    bool stamp = false;
+    int retval = -stream_connect(data->stream);
+    long discard;
+
+    if (!retval) {
+        while (!ovs_list_is_empty(&data->output) && count < 10) {
+            buf = ofpbuf_from_list(data->output.next);
+            if (data->stream->class->enqueue) {
+                ovs_list_remove(&buf->list_node);
+                retval = (data->stream->class->enqueue)(data->stream, buf);
+                if (retval > 0) {
+                    data->output_count--;   /* stream now owns buf */
+                } else {
+                    ovs_list_push_front(&data->output, &buf->list_node);
+                }
+            } else {
+                retval = stream_send(data->stream, buf->data, buf->size);
+                if (retval > 0) {
+                    stamp = true;
+                    atomic_sub_relaxed(&data->backlog, retval, &discard);
+                    ofpbuf_pull(buf, retval);
+                    if (!buf->size) {
+                        ovs_list_remove(&buf->list_node);   /* fully sent */
+                        data->output_count--;
+                        ofpbuf_delete(buf);
+                    }
+                }
+            }
+            if (retval <= 0) {
+                break;
+            }
+            count++;
+        }
+        if (data->stream->class->flush && (retval >= 0 || retval == -EAGAIN)) {
+            (data->stream->class->flush)(data->stream, &retval);
+            if (retval > 0) {
+                stamp = true;
+                atomic_sub_relaxed(&data->backlog, retval, &discard);
+            }
+        }
+        if (stamp) {
+            atomic_store_relaxed(&data->active, true);
+        }
+    }
+    atomic_store_relaxed(&data->tx_error, retval);
+    return retval;
+}
+
+/* Flushes output.  Synchronous mode flushes inline.  Async mode kicks the
+ * helper thread if there is a backlog and returns the recorded tx error, or
+ * -EAGAIN when there is none. */
+int async_stream_flush(struct async_data *data) {
+    int status;
+
+    if (!data->async_mode) {
+        return do_stream_flush(data);
+    }
+    atomic_read_relaxed(&data->tx_error, &status);
+    if (status >= 0) {
+        /* Report busy so callers do not retry; the helper will flush. */
+        status = -EAGAIN;
+    }
+    if (async_get_backlog(data)) {
+        latch_set(&data->tx_run_notify);
+    }
+    return status;
+}
+
+/* Receives into the input byteq (unless an earlier receive failed) and
+ * stores and returns the new receive state: the number of buffered bytes,
+ * -EAGAIN if nothing is buffered, or the failure (0 or negative). */
+static int do_async_recv(struct async_data *data) {
+    int state;
+
+    atomic_read_relaxed(&data->rx_error, &state);
+    if (state > 0 || state == -EAGAIN) {
+        size_t room = byteq_headroom(&data->input);
+
+        if (room > 0) {
+            state = stream_recv(data->stream, byteq_head(&data->input), room);
+            if (state > 0) {
+                byteq_advance_head(&data->input, state);
+            }
+        }
+    }
+    if (state > 0 || state == -EAGAIN) {
+        int used = byteq_used(&data->input);
+        state = used ? used : -EAGAIN;
+    }
+    atomic_store_relaxed(&data->rx_error, state);
+    return state;
+}
+
+
+/* Returns the buffered input byte count, -EAGAIN if there is none, or the
+ * receive failure (0 or negative).  Synchronous mode holds a failure back
+ * until buffered input is consumed, so data preceding EOF is not lost. */
+int async_stream_recv(struct async_data *data) {
+    int retval = -EAGAIN;
+
+    if (data->async_mode) {
+        atomic_read_relaxed(&data->rx_error, &retval);
+        latch_poll(&data->rx_notify);   /* clear RX notifications */
+    } else {
+        retval = do_async_recv(data);
+    }
+    if (retval > 0 || retval == -EAGAIN
+        || (!data->async_mode && !byteq_is_empty(&data->input))) {
+        int used = byteq_used(&data->input);
+        retval = used ? used : -EAGAIN;
+    }
+    return retval;
+}
+
+void async_stream_run(struct async_data *data) {
+    if (data->async_mode) {
+        latch_set(&data->tx_run_notify);    /* let the helper thread run it */
+    } else {
+        stream_run(data->stream);
+    }
+}
+
+void async_io_kick(struct async_data *data) {
+    if (data->async_mode) {
+        latch_set(&data->tx_run_notify);    /* wake the helper thread */
+    }
+}
+
+void async_recv_wait(struct async_data *data) {
+    if (data->async_mode) {
+        latch_poll(&data->rx_notify);   /* NOTE(review): may eat a wakeup */
+        latch_wait(&data->rx_notify);   /* set by the helper thread */
+    } else {
+        stream_recv_wait(data->stream);
+    }
+}
+
+void async_io_enable(void) {
+    allow_async_io = true;  /* read by async_init_data() from now on */
+}
+
+/* Accessors for JSON RPC */
+
+struct byteq *async_get_input(struct async_data *data) {
+    return &data->input;    /* filled at the head by do_async_recv() */
+}
+struct stream *async_get_stream(struct async_data *data) {
+    return data->stream;    /* as passed to async_init_data() */
+}
+
+/* Returns true if no output is pending.
+ *
+ * 'backlog' tracks output across the full stack all the way to the actual
+ * send, so it, not the 'output' list, is the source of truth here.  It is
+ * read through the atomic accessor like every other use of 'backlog'. */
+bool async_output_is_empty(struct async_data *data) {
+    long backlog;
+
+    ovs_mutex_lock(&data->mutex);
+    atomic_read_relaxed(&data->backlog, &backlog);
+    ovs_mutex_unlock(&data->mutex);
+    return backlog == 0;
+}
+
+long async_get_backlog(struct async_data *data) {
+    long retval;
+    /* Snapshot of the output bytes still to be sent, taken without
+     * 'data->mutex'; the value may change right after it is read.
+     */
+    atomic_read_relaxed(&data->backlog, &retval);
+    return retval;
+}
+
+/* Test-and-clear: returns true if output was sent since the last call. */
+bool async_get_active(struct async_data *data) {
+    bool expected = true;   /* strong CAS: a weak one may fail spuriously */
+    return atomic_compare_exchange_strong(&data->active, &expected, false);
+}
+
diff --git a/lib/async-io.h b/lib/async-io.h
new file mode 100644
index 000000000..dea070ee6
--- /dev/null
+++ b/lib/async-io.h
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2020 Red Hat, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ASYNC_IO_H
+#define ASYNC_IO_H 1
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <sys/types.h>
+#include "openvswitch/types.h"
+#include "openvswitch/ofpbuf.h"
+#include "socket-util.h"
+#include "ovs-atomic.h"
+#include "ovs-thread.h"
+#include "latch.h"
+#include "byteq.h"
+#include "util.h"
+
+#define ASYNC_BUFFER_SIZE (4096)
+
+struct stream;
+
+struct async_data {
+    struct stream *stream;
+    struct ovs_list output;         /* Contains "struct ofpbuf"s. */
+    struct ovs_list list_node;      /* In an async_io_control 'work_items'. */
+    atomic_long backlog;            /* Bytes queued and not yet sent. */
+    size_t output_count;            /* Number of elements in 'output'. */
+    atomic_bool active;             /* Set when output was actually sent. */
+    atomic_int rx_error, tx_error;  /* Last receive/flush results. */
+    uint32_t async_id;              /* Selects the helper thread. */
+    struct latch rx_notify, tx_run_notify;
+    struct ovs_mutex mutex;
+    bool async_mode, valid;
+    struct byteq input;
+    uint8_t *input_buffer;          /* Backing store of 'input'. */
+};
+
+struct async_io_control {
+    struct latch async_latch;       /* Wakes the helper thread. */
+    struct ovs_list work_items;     /* 'struct async_data's, by list_node. */
+    struct ovs_mutex mutex;         /* Held while 'work_items' is used. */
+};
+
+struct async_io_pool {
+    struct ovs_list list_node;          /* In async-io.c's 'io_pools'. */
+    struct async_io_control *controls;  /* 'size' elements, one per thread. */
+    int size;
+};
+
+struct async_io_pool *add_pool(void *(*start)(void *));
+
+long async_stream_enqueue(struct async_data *, struct ofpbuf *buf);
+int async_stream_flush(struct async_data *);
+int async_stream_recv(struct async_data *);
+struct byteq *async_get_input(struct async_data *);
+struct stream *async_get_stream(struct async_data *);
+bool async_output_is_empty(struct async_data *);
+long async_get_backlog(struct async_data *);
+bool async_get_active(struct async_data *);
+
+void async_stream_enable(struct async_data *);
+void async_stream_disable(struct async_data *);
+
+void async_init_data(struct async_data *, struct stream *);
+void async_cleanup_data(struct async_data *);
+void async_stream_run(struct async_data *data);
+void async_io_kick(struct async_data *data);
+void async_recv_wait(struct async_data *data);
+void async_io_enable(void);
+
+#endif /* async-io.h */
diff --git a/lib/automake.mk b/lib/automake.mk
index 86940ccd2..6f7870f26 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -24,6 +24,8 @@ lib_libopenvswitch_la_SOURCES = \
 	lib/aes128.c \
 	lib/aes128.h \
 	lib/async-append.h \
+	lib/async-io.h \
+	lib/async-io.c \
 	lib/backtrace.c \
 	lib/backtrace.h \
 	lib/bfd.c \
diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index 15e8d3527..0fc75691f 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -30,28 +30,23 @@
 #include "openvswitch/poll-loop.h"
 #include "reconnect.h"
 #include "stream.h"
+#include "stream-provider.h"
 #include "svec.h"
 #include "timeval.h"
+#include "async-io.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(jsonrpc);
 
 struct jsonrpc {
-    struct stream *stream;
     char *name;
     int status;
-
-    /* Input. */
-    struct byteq input;
-    uint8_t input_buffer[4096];
     struct json_parser *parser;
-
-    /* Output. */
-    struct ovs_list output;     /* Contains "struct ofpbuf"s. */
-    size_t output_count;        /* Number of elements in "output". */
-    size_t backlog;
+    struct async_data data;
 };
 
+#define MIN_IDLE_TIME 10
+
 /* Rate limit for error messages. */
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
 
@@ -59,6 +54,11 @@ static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *);
 static void jsonrpc_cleanup(struct jsonrpc *);
 static void jsonrpc_error(struct jsonrpc *, int error);
 
+/* Returns the async IO state embedded in 'rpc'. */
+static inline struct async_data *adata(struct jsonrpc *rpc) {
+    return &rpc->data;
+}
+
 /* This is just the same as stream_open() except that it uses the default
  * JSONRPC port if none is specified. */
 int
@@ -86,10 +86,8 @@ jsonrpc_open(struct stream *stream)
 
     rpc = xzalloc(sizeof *rpc);
     rpc->name = xstrdup(stream_get_name(stream));
-    rpc->stream = stream;
-    byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
-    ovs_list_init(&rpc->output);
-
+    async_init_data(adata(rpc), stream);
+    async_stream_enable(adata(rpc));
     return rpc;
 }
 
@@ -109,33 +107,22 @@ jsonrpc_close(struct jsonrpc *rpc)
 void
 jsonrpc_run(struct jsonrpc *rpc)
 {
+    int retval;
     if (rpc->status) {
         return;
     }
 
-    stream_run(rpc->stream);
-    while (!ovs_list_is_empty(&rpc->output)) {
-        struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
-        int retval;
-
-        retval = stream_send(rpc->stream, buf->data, buf->size);
-        if (retval >= 0) {
-            rpc->backlog -= retval;
-            ofpbuf_pull(buf, retval);
-            if (!buf->size) {
-                ovs_list_remove(&buf->list_node);
-                rpc->output_count--;
-                ofpbuf_delete(buf);
-            }
-        } else {
+    async_stream_run(adata(rpc));
+    do {
+        retval = async_stream_flush(&rpc->data);
+        if (retval < 0) {
             if (retval != -EAGAIN) {
                 VLOG_WARN_RL(&rl, "%s: send error: %s",
                              rpc->name, ovs_strerror(-retval));
                 jsonrpc_error(rpc, -retval);
             }
-            break;
         }
-    }
+    } while (retval > 0);
 }
 
 /* Arranges for the poll loop to wake up when 'rpc' needs to perform
@@ -144,9 +131,13 @@ void
 jsonrpc_wait(struct jsonrpc *rpc)
 {
     if (!rpc->status) {
-        stream_run_wait(rpc->stream);
-        if (!ovs_list_is_empty(&rpc->output)) {
-            stream_send_wait(rpc->stream);
+        if (adata(rpc)->async_mode) {
+            async_recv_wait(adata(rpc));
+        } else {
+            stream_run_wait(rpc->data.stream);
+            if (!async_output_is_empty(adata(rpc))) {
+                stream_send_wait(async_get_stream(adata(rpc)));
+            }
         }
     }
 }
@@ -175,7 +166,7 @@ jsonrpc_get_status(const struct jsonrpc *rpc)
 size_t
 jsonrpc_get_backlog(const struct jsonrpc *rpc)
 {
-    return rpc->status ? 0 : rpc->backlog;
+    return rpc->status ? 0 : async_get_backlog(adata((struct jsonrpc *) rpc));
 }
 
 /* Returns the number of bytes that have been received on 'rpc''s underlying
@@ -183,7 +174,7 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
 unsigned int
 jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
 {
-    return rpc->input.head;
+    return async_get_input(adata((struct jsonrpc *) rpc))->head;
 }
 
 /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
@@ -234,13 +225,13 @@ jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
  * buffered in 'rpc'.)
  *
  * Always takes ownership of 'msg', regardless of success. */
+
 int
 jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
 {
     struct ofpbuf *buf;
     struct json *json;
     struct ds ds = DS_EMPTY_INITIALIZER;
-    size_t length;
 
     if (rpc->status) {
         jsonrpc_msg_destroy(msg);
@@ -251,24 +242,13 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
 
     json = jsonrpc_msg_to_json(msg);
     json_to_ds(json, 0, &ds);
-    length = ds.length;
     json_destroy(json);
 
     buf = xmalloc(sizeof *buf);
     ofpbuf_use_ds(buf, &ds);
-    ovs_list_push_back(&rpc->output, &buf->list_node);
-    rpc->output_count++;
-    rpc->backlog += length;
-
-    if (rpc->output_count >= 50) {
-        VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
-                     " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
-                     rpc->output_count, rpc->backlog);
-    }
+    async_stream_enqueue(adata(rpc), buf);
 
-    if (rpc->backlog == length) {
-        jsonrpc_run(rpc);
-    }
+    jsonrpc_run(rpc);
     return rpc->status;
 }
 
@@ -291,7 +271,7 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
 int
 jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
 {
-    int i;
+    int i, retval;
 
     *msgp = NULL;
     if (rpc->status) {
@@ -302,36 +282,32 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
         size_t n, used;
 
         /* Fill our input buffer if it's empty. */
-        if (byteq_is_empty(&rpc->input)) {
-            size_t chunk;
-            int retval;
-
-            chunk = byteq_headroom(&rpc->input);
-            retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
-            if (retval < 0) {
-                if (retval == -EAGAIN) {
-                    return EAGAIN;
-                } else {
-                    VLOG_WARN_RL(&rl, "%s: receive error: %s",
-                                 rpc->name, ovs_strerror(-retval));
-                    jsonrpc_error(rpc, -retval);
-                    return rpc->status;
-                }
-            } else if (retval == 0) {
-                jsonrpc_error(rpc, EOF);
-                return EOF;
+        retval = async_stream_recv(adata(rpc));
+        if (retval < 0) {
+            if (retval == -EAGAIN) {
+                return EAGAIN;
+            } else {
+                VLOG_WARN_RL(&rl, "%s: receive error: %s",
+                             rpc->name, ovs_strerror(-retval));
+                jsonrpc_error(rpc, -retval);
+                return rpc->status;
             }
-            byteq_advance_head(&rpc->input, retval);
+        } else if (retval == 0) {
+            jsonrpc_error(rpc, EOF);
+            return EOF;
         }
 
         /* We have some input.  Feed it into the JSON parser. */
         if (!rpc->parser) {
             rpc->parser = json_parser_create(0);
         }
-        n = byteq_tailroom(&rpc->input);
+        n = byteq_tailroom(async_get_input(adata(rpc)));
+        if (n == 0) {
+            break;
+        }
         used = json_parser_feed(rpc->parser,
-                                (char *) byteq_tail(&rpc->input), n);
-        byteq_advance_tail(&rpc->input, used);
+                        (char *) byteq_tail(async_get_input(adata(rpc))), n);
+        byteq_advance_tail(async_get_input(adata(rpc)), used);
 
         /* If we have complete JSON, attempt to parse it as JSON-RPC. */
         if (json_parser_is_done(rpc->parser)) {
@@ -341,7 +317,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
             }
 
             if (rpc->status) {
-                const struct byteq *q = &rpc->input;
+                const struct byteq *q = async_get_input(adata(rpc));
                 if (q->head <= q->size) {
                     stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
                                           &this_module, rpc->name);
@@ -359,10 +335,10 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
 void
 jsonrpc_recv_wait(struct jsonrpc *rpc)
 {
-    if (rpc->status || !byteq_is_empty(&rpc->input)) {
+    if (rpc->status || !byteq_is_empty(async_get_input(adata(rpc)))) {
         poll_immediate_wake_at(rpc->name);
     } else {
-        stream_recv_wait(rpc->stream);
+        async_recv_wait(adata(rpc));
     }
 }
 
@@ -385,7 +361,7 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
 
     for (;;) {
         jsonrpc_run(rpc);
-        if (ovs_list_is_empty(&rpc->output) || rpc->status) {
+        if (async_output_is_empty(adata(rpc)) || rpc->status) {
             return rpc->status;
         }
         jsonrpc_wait(rpc);
@@ -495,15 +471,14 @@ jsonrpc_error(struct jsonrpc *rpc, int error)
 static void
 jsonrpc_cleanup(struct jsonrpc *rpc)
 {
-    stream_close(rpc->stream);
-    rpc->stream = NULL;
+    async_stream_disable(adata(rpc));
+    stream_close(rpc->data.stream);
+    rpc->data.stream = NULL;
 
     json_parser_abort(rpc->parser);
     rpc->parser = NULL;
 
-    ofpbuf_list_delete(&rpc->output);
-    rpc->backlog = 0;
-    rpc->output_count = 0;
+    async_cleanup_data(adata(rpc));
 }
 
 static struct jsonrpc_msg *
@@ -998,12 +973,14 @@ jsonrpc_session_run(struct jsonrpc_session *s)
     }
 
     if (s->rpc) {
-        size_t backlog;
         int error;
+        bool active = async_get_active(adata(s->rpc));
 
-        backlog = jsonrpc_get_backlog(s->rpc);
         jsonrpc_run(s->rpc);
-        if (jsonrpc_get_backlog(s->rpc) < backlog) {
+
+        active |= async_get_active(adata(s->rpc));
+
+        if (active) {
             /* Data previously caught in a queue was successfully sent (or
              * there's an error, which we'll catch below.)
              *
@@ -1103,8 +1080,8 @@ jsonrpc_session_get_name(const struct jsonrpc_session *s)
 const char *
 jsonrpc_session_get_id(const struct jsonrpc_session *s)
 {
-    if (s->rpc && s->rpc->stream) {
-        return stream_get_peer_id(s->rpc->stream);
+    if (s->rpc && async_get_stream(adata(s->rpc))) {
+        return stream_get_peer_id(adata(s->rpc)->stream);
     } else {
         return NULL;
     }
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 30622929b..1c5faf0af 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -30,6 +30,7 @@
 #include "stream-provider.h"
 #include "stream.h"
 #include "openvswitch/vlog.h"
+#include "openvswitch/list.h"
 
 VLOG_DEFINE_THIS_MODULE(stream_fd);
 
@@ -40,6 +41,8 @@ struct stream_fd
     struct stream stream;
     int fd;
     int fd_type;
+    struct ovs_list output;
+    int queue_depth;
 };
 
 static const struct stream_class stream_fd_class;
@@ -67,6 +70,8 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
     stream_init(&s->stream, &stream_fd_class, connect_status, name);
     s->fd = fd;
     s->fd_type = fd_type;
+    s->queue_depth = 0;
+    ovs_list_init(&s->output);
     *streamp = &s->stream;
     return 0;
 }
@@ -83,6 +88,7 @@ fd_close(struct stream *stream)
 {
     struct stream_fd *s = stream_fd_cast(stream);
     closesocket(s->fd);
+    ofpbuf_list_delete(&s->output);
     free(s);
 }
 
@@ -111,6 +117,11 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
         if (error == WSAEWOULDBLOCK) {
            error = EAGAIN;
         }
+#endif
+#ifdef __linux__
+        if (error == ENOBUFS) {
+           error = EAGAIN;
+        }
 #endif
         if (error != EAGAIN) {
             VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
@@ -170,6 +181,75 @@ static bool fd_set_probe_interval(struct stream *stream, int probe_interval) {
 
 
 
+#ifndef IOV_MAX
+#define IOV_MAX 16              /* _XOPEN_IOV_MAX, the POSIX lower bound. */
+#endif
+
+/* Appends 'buf' to the output queue of 'stream', where fd_flush() sends and
+ * then frees it.  Returns the size of 'buf'. */
+static int
+fd_enqueue(struct stream *stream, struct ofpbuf *buf)
+{
+    struct stream_fd *sfd = stream_fd_cast(stream);
+    ovs_list_push_back(&sfd->output, &buf->list_node);
+    sfd->queue_depth++;
+    return buf->size;
+}
+
+/* Sends as much of the output queue of 'stream' as one sendmsg() accepts.
+ * Returns true with '*retval' set to the number of bytes sent, or to -EAGAIN
+ * if the queue was empty.  Returns false with '*retval' set to the negated
+ * sock_errno() value if sendmsg() sent nothing. */
+static bool
+fd_flush(struct stream *stream, int *retval)
+{
+    struct stream_fd *sfd = stream_fd_cast(stream);
+    struct ofpbuf *buf;
+    ssize_t sent;
+    int error, i = 0;
+
+    if (sfd->queue_depth == 0) {
+        *retval = -EAGAIN;
+        return true;
+    }
+
+    /* sendmsg() fails with EMSGSIZE when given more than IOV_MAX entries, so
+     * a deep queue goes out in slices; the rest waits for the next call. */
+    int n_iov = MIN(sfd->queue_depth, IOV_MAX);
+    struct msghdr msg = { .msg_iovlen = n_iov };
+    msg.msg_iov = xmalloc(n_iov * sizeof *msg.msg_iov);
+    LIST_FOR_EACH (buf, list_node, &sfd->output) {
+        if (i >= n_iov) {
+            break;
+        }
+        msg.msg_iov[i].iov_base = buf->data;
+        msg.msg_iov[i].iov_len = buf->size;
+        i++;
+    }
+
+    sent = sendmsg(sfd->fd, &msg, 0);
+    error = sock_errno();       /* Read before free() can clobber errno. */
+    free(msg.msg_iov);
+    if (sent <= 0) {
+        *retval = -error;
+        return false;
+    }
+    *retval = sent;
+    while (sent > 0 && sfd->queue_depth > 0) {
+        buf = ofpbuf_from_list(sfd->output.next);
+        if (buf->size > sent) {
+            /* Partly sent: leave the unsent tail at the head of the queue. */
+            ofpbuf_pull(buf, sent);
+            break;
+        }
+        sent -= buf->size;
+        sfd->queue_depth--;
+        ovs_list_remove(&buf->list_node);
+        ofpbuf_delete(buf);
+    }
+    return true;
+}
+
 static const struct stream_class stream_fd_class = {
     "fd",                       /* name */
     false,                      /* needs_probes */
@@ -182,6 +262,8 @@ static const struct stream_class stream_fd_class = {
     NULL,                       /* run_wait */
     fd_wait,                    /* wait */
     fd_set_probe_interval,      /* set_probe_interval */
+    fd_enqueue,                 /* enqueue */
+    fd_flush,                   /* flush */
 };
 
 /* Passive file descriptor stream. */
diff --git a/lib/stream-provider.h b/lib/stream-provider.h
index 6c28cb50b..993c2f742 100644
--- a/lib/stream-provider.h
+++ b/lib/stream-provider.h
@@ -18,9 +18,13 @@
 #define STREAM_PROVIDER_H 1
 
 #include <sys/types.h>
+#include <poll.h>
+#include "openvswitch/list.h"
+#include "openvswitch/ofpbuf.h"
+#include "openvswitch/thread.h"
 #include "stream.h"
-
-/* Active stream connection. */
+#include "byteq.h"
+#include "latch.h"
 
 /* Active stream connection.
  *
@@ -130,6 +134,31 @@ struct stream_class {
      *
      */
     bool (*set_probe_interval)(struct stream *stream, int probe_interval);
+    /* Enqueues an ofpbuf and surrenders its ownership to the
+     * stream.
+     *
+     *     - If successful, the stream now owns the buffer; returns the
+     *       backlog size.
+     *
+     *     - On error, returns a negative value; the buffer is not claimed
+     *       by the stream.
+     *
+     * The enqueue function must not block.  If no bytes can be immediately
+     * accepted for transmission, it should return -EAGAIN immediately. */
+    int (*enqueue)(struct stream *stream, struct ofpbuf *buf);
+    /* Flushes any stream buffers.
+     *
+     *     - If successful, returns true and retval contains the backlog size.
+     *
+     *     - If partially successful (EAGAIN), returns false and retval is
+     *       a positive backlog size.
+     *
+     *     - If unsuccessful, returns false and retval contains a negative
+     *       error value.
+     *
+     * The flush function must not block.  If buffers cannot be flushed
+     * completely, it should return "partial success" immediately. */
+    bool (*flush)(struct stream *stream, int *retval);
 };
 
 /* Passive listener for incoming stream connections.
@@ -190,6 +219,7 @@ struct pstream_class {
     /* Arranges for the poll loop to wake up when a connection is ready to be
      * accepted on 'pstream'. */
     void (*wait)(struct pstream *pstream);
+
 };
 
 /* Active and passive stream classes. */
diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
index 575c55f5b..38a07bf6a 100644
--- a/lib/stream-ssl.c
+++ b/lib/stream-ssl.c
@@ -85,6 +85,8 @@ struct ssl_stream
     SSL *ssl;
     struct ofpbuf *txbuf;
     unsigned int session_nr;
+    int last_enqueued;
+    long backlog_to_report;
 
     /* rx_want and tx_want record the result of the last call to SSL_read()
      * and SSL_write(), respectively:
@@ -304,6 +306,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
     sslv->rx_want = sslv->tx_want = SSL_NOTHING;
     sslv->session_nr = next_session_nr++;
     sslv->n_head = 0;
+    sslv->last_enqueued = 0;
+    sslv->backlog_to_report = 0;
 
     if (VLOG_IS_DBG_ENABLED()) {
         SSL_set_msg_callback(ssl, ssl_protocol_cb);
@@ -784,8 +788,59 @@ ssl_run(struct stream *stream)
 {
     struct ssl_stream *sslv = ssl_stream_cast(stream);
 
-    if (sslv->txbuf && ssl_do_tx(stream) != EAGAIN) {
-        ssl_clear_txbuf(sslv);
+    if (sslv->txbuf) {
+        if (ssl_do_tx(stream) != EAGAIN) {
+            sslv->backlog_to_report += sslv->last_enqueued;
+            ssl_clear_txbuf(sslv);
+        }
+    }
+}
+
+/* Makes 'buf' the pending tx buffer of 'stream' and returns its size, or
+ * returns -EAGAIN, leaving 'buf' alone, while a buffer is already pending. */
+static int
+ssl_enqueue(struct stream *stream, struct ofpbuf *buf)
+{
+    struct ssl_stream *sslv = ssl_stream_cast(stream);
+    if (sslv->txbuf) {
+        return -EAGAIN;
+    }
+    sslv->txbuf = buf;
+    return sslv->last_enqueued = buf->size;
+}
+
+/* Tries to write out the pending tx buffer of 'stream'.  Returns true with
+ * '*retval' set to the byte count accumulated since the last report, or to
+ * -EAGAIN if nothing was pending and nothing accumulated.  Returns false with
+ * '*retval' 0 if the buffer stays pending, or a negative errno on failure. */
+static bool
+ssl_flush(struct stream *stream, int *retval)
+{
+    struct ssl_stream *sslv = ssl_stream_cast(stream);
+    int error;
+
+    if (!sslv->txbuf) {
+        /* Nothing pending: hand over what ssl_run() accumulated meanwhile. */
+        *retval = sslv->backlog_to_report ? sslv->backlog_to_report : -EAGAIN;
+        sslv->backlog_to_report = 0;
+        return true;
+    }
+
+    error = ssl_do_tx(stream);
+    switch (error) {
+    case 0:
+        ssl_clear_txbuf(sslv);
+        *retval = sslv->backlog_to_report + sslv->last_enqueued;
+        sslv->backlog_to_report = 0;
+        sslv->last_enqueued = 0;
+        return true;
+    case EAGAIN:
+        *retval = 0;            /* The buffer stays queued for a retry. */
+        return false;
+    default:
+        ssl_clear_txbuf(sslv);
+        *retval = -error;
+        return false;
     }
 }
 
@@ -847,8 +902,7 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
             /* We have room in our tx queue. */
             poll_immediate_wake();
         } else {
-            /* stream_run_wait() will do the right thing; don't bother with
-             * redundancy. */
+            poll_fd_wait(sslv->fd, POLLOUT);
         }
         break;
 
@@ -869,6 +923,8 @@ const struct stream_class ssl_stream_class = {
     ssl_run_wait,               /* run_wait */
     ssl_wait,                   /* wait */
     ssl_set_probe_interval,     /* set_probe_interval */
+    ssl_enqueue,                /* enqueue */
+    ssl_flush,                  /* flush */
 };
 
 /* Passive SSL. */
diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
index 67c912105..7821c1dd1 100644
--- a/lib/stream-tcp.c
+++ b/lib/stream-tcp.c
@@ -74,6 +74,8 @@ const struct stream_class tcp_stream_class = {
     NULL,                       /* run_wait */
     NULL,                       /* wait */
     NULL,
+    NULL,                       /* enqueue */
+    NULL,                       /* flush */
 };
 
 /* Passive TCP. */
diff --git a/lib/stream-unix.c b/lib/stream-unix.c
index 2322df955..7f3d7ab5c 100644
--- a/lib/stream-unix.c
+++ b/lib/stream-unix.c
@@ -74,6 +74,8 @@ const struct stream_class unix_stream_class = {
     NULL,                       /* run_wait */
     NULL,                       /* wait */
     NULL,
+    NULL,                       /* enqueue */
+    NULL,                       /* flush */
 };
 
 /* Passive UNIX socket. */
diff --git a/lib/stream-windows.c b/lib/stream-windows.c
index 836112f75..d40e55d5c 100644
--- a/lib/stream-windows.c
+++ b/lib/stream-windows.c
@@ -375,6 +375,8 @@ const struct stream_class windows_stream_class = {
     NULL,                       /* run_wait */
     windows_wait,               /* wait */
     NULL,
+    NULL,                       /* enqueue */
+    NULL,                       /* flush */
 };
 
 struct pwindows_pstream
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index fe8ffc317..3cf3df46d 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -540,6 +540,8 @@ ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
 static int
 ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
 {
+    struct jsonrpc_msg *msg;
+
     jsonrpc_session_run(s->js);
     if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
         s->js_seqno = jsonrpc_session_get_seqno(s->js);
@@ -549,25 +551,20 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
     }
 
     ovsdb_jsonrpc_trigger_complete_done(s);
+    ovsdb_jsonrpc_monitor_flush_all(s);
 
-    if (!jsonrpc_session_get_backlog(s->js)) {
-        struct jsonrpc_msg *msg;
-
-        ovsdb_jsonrpc_monitor_flush_all(s);
-
-        msg = jsonrpc_session_recv(s->js);
-        if (msg) {
-            if (msg->type == JSONRPC_REQUEST) {
-                ovsdb_jsonrpc_session_got_request(s, msg);
-            } else if (msg->type == JSONRPC_NOTIFY) {
-                ovsdb_jsonrpc_session_got_notify(s, msg);
-            } else {
-                VLOG_WARN("%s: received unexpected %s message",
-                          jsonrpc_session_get_name(s->js),
-                          jsonrpc_msg_type_to_string(msg->type));
-                jsonrpc_session_force_reconnect(s->js);
-                jsonrpc_msg_destroy(msg);
-            }
+    msg = jsonrpc_session_recv(s->js);
+    if (msg) {
+        if (msg->type == JSONRPC_REQUEST) {
+            ovsdb_jsonrpc_session_got_request(s, msg);
+        } else if (msg->type == JSONRPC_NOTIFY) {
+            ovsdb_jsonrpc_session_got_notify(s, msg);
+        } else {
+            VLOG_WARN("%s: received unexpected %s message",
+                      jsonrpc_session_get_name(s->js),
+                      jsonrpc_msg_type_to_string(msg->type));
+            jsonrpc_session_force_reconnect(s->js);
+            jsonrpc_msg_destroy(msg);
         }
     }
     return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index d416f1b60..83450beaa 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -59,6 +59,7 @@
 #include "perf-counter.h"
 #include "ovsdb-util.h"
 #include "openvswitch/vlog.h"
+#include "async-io.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_server);
 
@@ -398,6 +399,7 @@ main(int argc, char *argv[])
     }
 
     daemonize_complete();
+    async_io_enable();
 
     if (!run_command) {
         /* ovsdb-server is usually a long-running process, in which case it
-- 
2.20.1



More information about the dev mailing list