[ovs-dev] [worker 08/12] worker: New library for breaking a daemon into multiple processes.

Ben Pfaff blp at nicira.com
Wed May 23 00:19:11 UTC 2012


ovs-vswitchd is effectively a "soft real-time" process, because flows that
do not get set up quickly lead to packet loss or retransmission.  We've
done our best to keep it from blocking unnecessarily, but some operations
unavoidably block.  This new library allows a daemon to break itself up
into a main process and a worker process, connected by an RPC channel,
with the idea being that the main process will delegate any possibly
blocking operations to the worker.

This commit also modifies ovs-vswitchd to start a worker process, but it
does not actually introduce any uses for the worker process.  Upcoming
commits will add those.

Signed-off-by: Ben Pfaff <blp at nicira.com>
---
 lib/automake.mk         |    4 +-
 lib/worker.c            |  447 +++++++++++++++++++++++++++++++++++++++++++++++
 lib/worker.h            |   68 +++++++
 vswitchd/ovs-vswitchd.c |    5 +
 4 files changed, 523 insertions(+), 1 deletions(-)
 create mode 100644 lib/worker.c
 create mode 100644 lib/worker.h

diff --git a/lib/automake.mk b/lib/automake.mk
index 1d404c2..1bd87cc 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -193,7 +193,9 @@ lib_libopenvswitch_a_SOURCES = \
 	lib/vlog.c \
 	lib/vlog.h \
 	lib/vswitch-idl.c \
-	lib/vswitch-idl.h
+	lib/vswitch-idl.h \
+	lib/worker.c \
+	lib/worker.h
 
 nodist_lib_libopenvswitch_a_SOURCES = \
 	lib/dirs.c
diff --git a/lib/worker.c b/lib/worker.c
new file mode 100644
index 0000000..4661d02
--- /dev/null
+++ b/lib/worker.c
@@ -0,0 +1,447 @@
+/* Copyright (c) 2012 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "worker.h"
+
+#include <assert.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include "command-line.h"
+#include "daemon.h"
+#include "ofpbuf.h"
+#include "poll-loop.h"
+#include "socket-util.h"
+#include "util.h"
+#include "vlog.h"
+
+VLOG_DEFINE_THIS_MODULE(worker);
+
+/* Header for an RPC request. */
+struct worker_request {
+    size_t request_len;              /* Length of the payload in bytes. */
+    worker_request_func *request_cb; /* Function to call in worker process. */
+    worker_reply_func *reply_cb;     /* Function to call in main process. */
+    void *reply_aux;                 /* Auxiliary data for 'reply_cb'. */
+};
+
+/* Header for an RPC reply. */
+struct worker_reply {
+    size_t reply_len;            /* Length of the payload in bytes. */
+    worker_reply_func *reply_cb; /* Function to call in main process. */
+    void *reply_aux;             /* Auxiliary data for 'reply_cb'. */
+};
+
+/* Receive buffer for a RPC request or reply. */
+struct rxbuf {
+    /* Header. */
+    struct ofpbuf header;       /* Header data. */
+    int fds[SOUTIL_MAX_FDS];    /* File descriptors. */
+    size_t n_fds;
+
+    /* Payload. */
+    struct ofpbuf payload;      /* Payload data. */
+};
+
+static int client_sock = -1;
+static struct rxbuf client_rx;
+
+static void rxbuf_init(struct rxbuf *);
+static void rxbuf_clear(struct rxbuf *);
+static int rxbuf_run(struct rxbuf *, int sock, size_t header_len);
+
+static struct iovec *prefix_iov(void *data, size_t len,
+                                const struct iovec *iovs, size_t n_iovs);
+
+static void worker_broke(void);
+
+static void worker_main(int fd) NO_RETURN;
+
+/* Starts a worker process as a subprocess of the current process.  Currently
+ * only a single worker process is supported, so this function may only be
+ * called once.
+ *
+ * The client should call worker_run() and worker_wait() from its main loop.
+ *
+ * Call this function between daemonize_start() and daemonize_complete(). */
+void
+worker_start(void)
+{
+    int work_fds[2];
+
+    assert(client_sock < 0);
+
+    /* Create non-blocking socket pair. */
+    xsocketpair(AF_UNIX, SOCK_STREAM, 0, work_fds);
+    xset_nonblocking(work_fds[0]);
+    xset_nonblocking(work_fds[1]);
+
+    if (!fork_and_clean_up()) {
+        /* In child (worker) process. */
+        daemonize_post_detach();
+        close(work_fds[0]);
+        worker_main(work_fds[1]);
+        NOT_REACHED();
+    }
+
+    /* In parent (main) process. */
+    close(work_fds[1]);
+    client_sock = work_fds[0];
+    rxbuf_init(&client_rx);
+}
+
+/* Returns true if this process has started a worker and the worker is not
+ * known to have malfunctioned. */
+bool
+worker_is_running(void)
+{
+    return client_sock >= 0;
+}
+
+/* If a worker process was started, processes RPC replies from it, calling the
+ * registered 'reply_cb' callbacks.
+ *
+ * If the worker process died or malfunctioned, aborts. */
+void
+worker_run(void)
+{
+    if (worker_is_running()) {
+        int error;
+
+        error = rxbuf_run(&client_rx, client_sock,
+                          sizeof(struct worker_reply));
+        if (!error) {
+            struct worker_reply *reply = client_rx.header.data;
+            reply->reply_cb(&client_rx.payload, client_rx.fds,
+                            client_rx.n_fds, reply->reply_aux);
+            rxbuf_clear(&client_rx);
+        } else if (error != EAGAIN) {
+            worker_broke();
+            VLOG_ABORT("receive from worker failed (%s)",
+                       ovs_retval_to_string(error));
+        }
+    }
+}
+
+/* Causes the poll loop to wake up if we need to process RPC replies. */
+void
+worker_wait(void)
+{
+    if (worker_is_running()) {
+        poll_fd_wait(client_sock, POLLIN);
+    }
+}
+
+/* Interface for main process to interact with the worker. */
+
+/* Sends an RPC request to the worker process.  The worker process will call
+ * 'request_cb' passing the 'size' (zero or more) bytes of data in 'data' as
+ * arguments as well as the 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors
+ * in 'fds'.
+ *
+ * If and only if 'reply_cb' is nonnull, 'request_cb' must call worker_reply()
+ * or worker_reply_iovec() with a reply.  The main process will later call
+ * 'reply_cb' with the reply data (if any) and file descriptors (if any).
+ *
+ * 'request_cb' receives copies (as if by dup()) of the file descriptors in
+ * fds[].  'request_cb' takes ownership of these copies, and the caller of
+ * worker_request() retains its ownership of the originals.
+ *
+ * This function may block until the RPC request has been sent (if the socket
+ * buffer fills up) but it does not wait for the reply (if any).  If this
+ * function blocks, it may invoke reply callbacks for previous requests.
+ *
+ * The worker process executes RPC requests in strict order of submission and
+ * runs each request to completion before beginning the next request.  The main
+ * process invokes reply callbacks in strict order of request submission. */
+void
+worker_request(const void *data, size_t size,
+               const int fds[], size_t n_fds,
+               worker_request_func *request_cb,
+               worker_reply_func *reply_cb, void *aux)
+{
+    if (size > 0) {
+        struct iovec iov;
+
+        iov.iov_base = (void *) data;
+        iov.iov_len = size;
+        worker_request_iovec(&iov, 1, fds, n_fds, request_cb, reply_cb, aux);
+    } else {
+        worker_request_iovec(NULL, 0, fds, n_fds, request_cb, reply_cb, aux);
+    }
+}
+
+static int
+worker_send_iovec(const struct iovec iovs[], size_t n_iovs,
+                  const int fds[], size_t n_fds)
+{
+    size_t sent = 0;
+
+    for (;;) {
+        int error;
+
+        /* Try to send the rest of the request. */
+        error = send_iovec_and_fds_fully(client_sock, iovs, n_iovs,
+                                         fds, n_fds, sent, &sent);
+        if (error != EAGAIN) {
+            return error;
+        }
+
+        /* Process replies to avoid deadlock. */
+        worker_run();
+
+        poll_fd_wait(client_sock, POLLIN | POLLOUT);
+        poll_block();
+    }
+}
+
+/* Same as worker_request() except that the data to send is specified as an
+ * array of iovecs. */
+void
+worker_request_iovec(const struct iovec iovs[], size_t n_iovs,
+                     const int fds[], size_t n_fds,
+                     worker_request_func *request_cb,
+                     worker_reply_func *reply_cb, void *aux)
+{
+    struct worker_request rq;
+    struct iovec *all_iovs;
+    int error;
+
+    assert(worker_is_running());
+
+    rq.request_len = iovec_len(iovs, n_iovs);
+    rq.request_cb = request_cb;
+    rq.reply_cb = reply_cb;
+    rq.reply_aux = aux;
+
+    all_iovs = prefix_iov(&rq, sizeof rq, iovs, n_iovs);
+    error = worker_send_iovec(all_iovs, n_iovs + 1, fds, n_fds);
+    if (error) {
+        worker_broke();
+        VLOG_ABORT("send failed (%s)", strerror(error));
+    }
+    free(all_iovs);
+}
+
+/* Closes the client socket, if any, so that worker_is_running() will return
+ * false.
+ *
+ * The client does this just before aborting if the worker process dies or
+ * malfunctions, to prevent the logging subsystem from trying to use the
+ * worker to log the failure. */
+static void
+worker_broke(void)
+{
+    if (client_sock >= 0) {
+        close(client_sock);
+        client_sock = -1;
+    }
+}
+
+/* Interfaces for RPC implementations (running in the worker process). */
+
+static int server_sock = -1;
+static bool expect_reply;
+static struct worker_request request;
+
+/* When a call to worker_request() or worker_request_iovec() provides a
+ * 'reply_cb' callback, the 'request_cb' implementation must call this function
+ * to send its reply.  The main process will call 'reply_cb' passing the
+ * 'size' (zero or more) bytes of data in 'data' as arguments as well as the
+ * 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors in 'fds'.
+ *
+ * If a call to worker_request() or worker_request_iovec() provides no
+ * 'reply_cb' callback, the 'request_cb' implementation must not call this
+ * function.
+ *
+ * 'reply_cb' receives copies (as if by dup()) of the file descriptors in
+ * fds[].  'reply_cb' takes ownership of these copies, and the caller of
+ * worker_reply() retains its ownership of the originals.
+ *
+ * This function blocks until the RPC reply has been sent (if the socket buffer
+ * fills up) but it does not wait for the main process to receive or to process
+ * the reply. */
+void
+worker_reply(const void *data, size_t size, const int fds[], size_t n_fds)
+{
+    if (size > 0) {
+        struct iovec iov;
+
+        iov.iov_base = (void *) data;
+        iov.iov_len = size;
+        worker_reply_iovec(&iov, 1, fds, n_fds);
+    } else {
+        worker_reply_iovec(NULL, 0, fds, n_fds);
+    }
+}
+
+/* Same as worker_reply() except that the data to send is specified as an array
+ * of iovecs. */
+void
+worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
+                       const int fds[], size_t n_fds)
+{
+    struct worker_reply reply;
+    struct iovec *all_iovs;
+    int error;
+
+    assert(expect_reply);
+    expect_reply = false;
+
+    reply.reply_len = iovec_len(iovs, n_iovs);
+    reply.reply_cb = request.reply_cb;
+    reply.reply_aux = request.reply_aux;
+
+    all_iovs = prefix_iov(&reply, sizeof reply, iovs, n_iovs);
+
+    error = send_iovec_and_fds_fully_block(server_sock, all_iovs, n_iovs + 1,
+                                           fds, n_fds);
+    if (error == EPIPE) {
+        /* Parent probably died.  Continue processing any RPCs still buffered,
+         * to avoid missing log messages. */
+        VLOG_INFO("send failed (%s)", strerror(error));
+    } else if (error) {
+        VLOG_ABORT("send failed (%s)", strerror(error));
+    }
+
+    free(all_iovs);
+}
+
+static void
+worker_main(int fd)
+{
+    struct rxbuf rx;
+
+    server_sock = fd;
+
+    subprogram_name = "worker";
+    proctitle_set("%s: worker process for pid %lu",
+                  program_name, (unsigned long int) getppid());
+    VLOG_INFO("worker process started");
+
+    rxbuf_init(&rx);
+    for (;;) {
+        int error;
+
+        error = rxbuf_run(&rx, server_sock,
+                                   sizeof(struct worker_request));
+        if (!error) {
+            request = *(struct worker_request *) rx.header.data;
+
+            expect_reply = request.reply_cb != NULL;
+            request.request_cb(&rx.payload, rx.fds, rx.n_fds);
+            assert(!expect_reply);
+
+            rxbuf_clear(&rx);
+        } else if (error == EOF && !rx.header.size) {
+            break;
+        } else if (error != EAGAIN) {
+            ovs_abort(error, "RPC receive failed");
+        }
+
+        poll_fd_wait(server_sock, POLLIN);
+        poll_block();
+    }
+
+    VLOG_INFO("worker process exiting");
+    exit(0);
+}
+
+static void
+rxbuf_init(struct rxbuf *rx)
+{
+    ofpbuf_init(&rx->header, 0);
+    rx->n_fds = 0;
+    ofpbuf_init(&rx->payload, 0);
+}
+
+static void
+rxbuf_clear(struct rxbuf *rx)
+{
+    ofpbuf_clear(&rx->header);
+    rx->n_fds = 0;
+    ofpbuf_clear(&rx->payload);
+}
+
+static int
+rxbuf_run(struct rxbuf *rx, int sock, size_t header_len)
+{
+    for (;;) {
+        if (!rx->header.size) {
+            int retval;
+
+            ofpbuf_clear(&rx->header);
+            ofpbuf_prealloc_tailroom(&rx->header, header_len);
+
+            retval = recv_data_and_fds(sock, rx->header.data, header_len,
+                                       rx->fds, &rx->n_fds);
+            if (retval <= 0) {
+                return retval ? -retval : EOF;
+            }
+            rx->header.size += retval;
+        } else if (rx->header.size < header_len) {
+            size_t bytes_read;
+            int error;
+
+            error = read_fully(sock, ofpbuf_tail(&rx->header),
+                               header_len - rx->header.size, &bytes_read);
+            rx->header.size += bytes_read;
+            if (error) {
+                return error;
+            }
+        } else {
+            size_t payload_len = *(size_t *) rx->header.data;
+
+            if (rx->payload.size < payload_len) {
+                size_t left = payload_len - rx->payload.size;
+                size_t bytes_read;
+                int error;
+
+                ofpbuf_prealloc_tailroom(&rx->payload, left);
+                error = read_fully(sock, ofpbuf_tail(&rx->payload), left,
+                                   &bytes_read);
+                rx->payload.size += bytes_read;
+                if (error) {
+                    return error;
+                }
+            } else {
+                return 0;
+            }
+        }
+    }
+
+    return EAGAIN;
+}
+
+static struct iovec *
+prefix_iov(void *data, size_t len, const struct iovec *iovs, size_t n_iovs)
+{
+    struct iovec *dst;
+
+    dst = xmalloc((n_iovs + 1) * sizeof *dst);
+    dst[0].iov_base = data;
+    dst[0].iov_len = len;
+    memcpy(dst + 1, iovs, n_iovs * sizeof *iovs);
+
+    return dst;
+}
diff --git a/lib/worker.h b/lib/worker.h
new file mode 100644
index 0000000..135d50d
--- /dev/null
+++ b/lib/worker.h
@@ -0,0 +1,68 @@
+/* Copyright (c) 2012 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef WORKER_H
+#define WORKER_H 1
+
+/* Worker processes.
+ *
+ * Thes functions allow an OVS daemon to fork off a "worker process" to do
+ * tasks that may unavoidably block in the kernel.  The worker executes remote
+ * procedure calls on behalf of the main process.
+ *
+ * Tasks that may unavoidably block in the kernel include writes to regular
+ * files, sends to Generic Netlink sockets (which as of this writing use a
+ * global lock), and other unusual operations.
+ *
+ * The worker functions *will* block if the finite buffer between a main
+ * process and its worker process fills up.
+ */
+
+#include <stdbool.h>
+#include <stddef.h>
+#include "compiler.h"
+
+struct iovec;
+struct ofpbuf;
+
+/* The main process calls this function to start a worker. */
+void worker_start(void);
+
+/* Interface for main process to interact with the worker. */
+typedef void worker_request_func(struct ofpbuf *request,
+                                 const int fds[], size_t n_fds);
+typedef void worker_reply_func(struct ofpbuf *reply,
+                               const int fds[], size_t n_fds, void *aux);
+
+bool worker_is_running(void);
+void worker_run(void);
+void worker_wait(void);
+
+void worker_request(const void *data, size_t size,
+                    const int fds[], size_t n_fds,
+                    worker_request_func *request_cb,
+                    worker_reply_func *reply_cb, void *aux);
+void worker_request_iovec(const struct iovec *iovs, size_t n_iovs,
+                          const int fds[], size_t n_fds,
+                          worker_request_func *request_cb,
+                          worker_reply_func *reply_cb, void *aux);
+
+/* Interfaces for RPC implementations (running in the worker process). */
+void worker_reply(const void *data, size_t size,
+                  const int fds[], size_t n_fds);
+void worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
+                        const int fds[], size_t n_fds);
+
+#endif /* worker.h */
diff --git a/vswitchd/ovs-vswitchd.c b/vswitchd/ovs-vswitchd.c
index 8ef3b10..e9a7b07 100644
--- a/vswitchd/ovs-vswitchd.c
+++ b/vswitchd/ovs-vswitchd.c
@@ -52,6 +52,7 @@
 #include "vconn.h"
 #include "vlog.h"
 #include "lib/vswitch-idl.h"
+#include "worker.h"
 
 VLOG_DEFINE_THIS_MODULE(vswitchd);
 
@@ -81,6 +82,8 @@ main(int argc, char *argv[])
 
     daemonize_start();
 
+    worker_start();
+
     retval = unixctl_server_create(unixctl_path, &unixctl);
     if (retval) {
         exit(EXIT_FAILURE);
@@ -92,6 +95,7 @@ main(int argc, char *argv[])
 
     exiting = false;
     while (!exiting) {
+        worker_run();
         if (signal_poll(sighup)) {
             vlog_reopen_log_file();
         }
@@ -110,6 +114,7 @@ main(int argc, char *argv[])
         unixctl_server_run(unixctl);
         netdev_run();
 
+        worker_wait();
         signal_wait(sighup);
         memory_wait();
         bridge_wait();
-- 
1.7.2.5




More information about the dev mailing list