[ovs-dev] [threads 16/28] worker: Delete library.
Ben Pfaff
blp at nicira.com
Wed Jul 10 23:03:58 UTC 2013
The worker library had no remaining users.
Signed-off-by: Ben Pfaff <blp at nicira.com>
---
Makefile.am | 2 +-
lib/automake.mk | 4 +-
lib/util.c | 3 +-
lib/worker.c | 472 -----------------------------------------------
lib/worker.h | 68 -------
vswitchd/ovs-vswitchd.c | 5 -
6 files changed, 3 insertions(+), 551 deletions(-)
delete mode 100644 lib/worker.c
delete mode 100644 lib/worker.h
diff --git a/Makefile.am b/Makefile.am
index 08aea0f..5b9e0ac 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -202,7 +202,7 @@ ALL_LOCAL += check-assert-h-usage
check-assert-h-usage:
@if test -e $(srcdir)/.git && (git --version) >/dev/null 2>&1 && \
(cd $(srcdir) && git --no-pager grep -l -E '[<]assert.h[>]') | \
- $(EGREP) -v '^lib/(sflow_receiver|vlog|worker).c$$|^tests/'; \
+ $(EGREP) -v '^lib/(sflow_receiver|vlog).c$$|^tests/'; \
then \
echo "Files listed above unexpectedly #include <""assert.h"">."; \
echo "Please use ovs_assert (from util.h) instead of assert."; \
diff --git a/lib/automake.mk b/lib/automake.mk
index 83ec520..280fc8b 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -218,9 +218,7 @@ lib_libopenvswitch_a_SOURCES = \
lib/vlog.c \
lib/vlog.h \
lib/vswitch-idl.c \
- lib/vswitch-idl.h \
- lib/worker.c \
- lib/worker.h
+ lib/vswitch-idl.h
nodist_lib_libopenvswitch_a_SOURCES = \
lib/dirs.c
diff --git a/lib/util.c b/lib/util.c
index 0ba1ed5..8e628cc 100644
--- a/lib/util.c
+++ b/lib/util.c
@@ -38,8 +38,7 @@ COVERAGE_DEFINE(util_xalloc);
/* argv[0] without directory names. */
const char *program_name;
-/* Ordinarily "" but set to "monitor" for a monitor process or "worker" for a
- * worker process. */
+/* Ordinarily "" but set to "monitor" for a monitor process. */
const char *subprogram_name = "";
/* --version option output. */
diff --git a/lib/worker.c b/lib/worker.c
deleted file mode 100644
index 6904fdd..0000000
--- a/lib/worker.c
+++ /dev/null
@@ -1,472 +0,0 @@
-/* Copyright (c) 2012, 2013 Nicira, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <config.h>
-
-#include "worker.h"
-
-#include <assert.h>
-#include <errno.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <sys/wait.h>
-#include <unistd.h>
-
-#include "command-line.h"
-#include "daemon.h"
-#include "ofpbuf.h"
-#include "poll-loop.h"
-#include "socket-util.h"
-#include "util.h"
-#include "vlog.h"
-
-VLOG_DEFINE_THIS_MODULE(worker);
-
-/* ovs_assert() logs the assertion message and logging sometimes goes through a
- * worker, so using ovs_assert() in this source file could cause recursion. */
-#undef ovs_assert
-#define ovs_assert use_assert_instead_of_ovs_assert_in_this_module
-
-/* Header for an RPC request. */
-struct worker_request {
- size_t request_len; /* Length of the payload in bytes. */
- worker_request_func *request_cb; /* Function to call in worker process. */
- worker_reply_func *reply_cb; /* Function to call in main process. */
- void *reply_aux; /* Auxiliary data for 'reply_cb'. */
-};
-
-/* Header for an RPC reply. */
-struct worker_reply {
- size_t reply_len; /* Length of the payload in bytes. */
- worker_reply_func *reply_cb; /* Function to call in main process. */
- void *reply_aux; /* Auxiliary data for 'reply_cb'. */
-};
-
-/* Receive buffer for a RPC request or reply. */
-struct rxbuf {
- /* Header. */
- struct ofpbuf header; /* Header data. */
- int fds[SOUTIL_MAX_FDS]; /* File descriptors. */
- size_t n_fds;
-
- /* Payload. */
- struct ofpbuf payload; /* Payload data. */
-};
-
-static int client_sock = -1;
-static struct rxbuf client_rx;
-
-static void rxbuf_init(struct rxbuf *);
-static void rxbuf_clear(struct rxbuf *);
-static int rxbuf_run(struct rxbuf *, int sock, size_t header_len);
-
-static struct iovec *prefix_iov(void *data, size_t len,
- const struct iovec *iovs, size_t n_iovs);
-
-static void worker_broke(void);
-
-static void worker_main(int fd) NO_RETURN;
-
-/* Starts a worker process as a subprocess of the current process. Currently
- * only a single worker process is supported, so this function may only be
- * called once.
- *
- * The client should call worker_run() and worker_wait() from its main loop.
- *
- * Call this function between daemonize_start() and daemonize_complete(). */
-void
-worker_start(void)
-{
- int work_fds[2];
-
- assert(client_sock < 0);
-
- /* Create non-blocking socket pair. */
- xsocketpair(AF_UNIX, SOCK_STREAM, 0, work_fds);
- xset_nonblocking(work_fds[0]);
- xset_nonblocking(work_fds[1]);
-
- /* Don't let the worker process own the responsibility to delete
- * the pidfile. Register it again after the fork. */
- remove_pidfile_from_unlink();
- if (!fork_and_clean_up()) {
- /* In child (worker) process. */
- daemonize_post_detach();
- close(work_fds[0]);
- worker_main(work_fds[1]);
- NOT_REACHED();
- }
-
- /* In parent (main) process. */
- add_pidfile_to_unlink();
- close(work_fds[1]);
- client_sock = work_fds[0];
- rxbuf_init(&client_rx);
-}
-
-/* Returns true if this process has started a worker and the worker is not
- * known to have malfunctioned. */
-bool
-worker_is_running(void)
-{
- return client_sock >= 0;
-}
-
-/* If a worker process was started, processes RPC replies from it, calling the
- * registered 'reply_cb' callbacks.
- *
- * If the worker process died or malfunctioned, aborts. */
-void
-worker_run(void)
-{
- if (worker_is_running()) {
- int error;
-
- error = rxbuf_run(&client_rx, client_sock,
- sizeof(struct worker_reply));
- if (!error) {
- struct worker_reply *reply = client_rx.header.data;
- reply->reply_cb(&client_rx.payload, client_rx.fds,
- client_rx.n_fds, reply->reply_aux);
- rxbuf_clear(&client_rx);
- } else if (error != EAGAIN) {
- worker_broke();
- VLOG_ABORT("receive from worker failed (%s)",
- ovs_retval_to_string(error));
- }
- }
-}
-
-/* Causes the poll loop to wake up if we need to process RPC replies. */
-void
-worker_wait(void)
-{
- if (worker_is_running()) {
- poll_fd_wait(client_sock, POLLIN);
- }
-}
-
-/* Interface for main process to interact with the worker. */
-
-/* Sends an RPC request to the worker process. The worker process will call
- * 'request_cb' passing the 'size' (zero or more) bytes of data in 'data' as
- * arguments as well as the 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors
- * in 'fds'.
- *
- * If and only if 'reply_cb' is nonnull, 'request_cb' must call worker_reply()
- * or worker_reply_iovec() with a reply. The main process will later call
- * 'reply_cb' with the reply data (if any) and file descriptors (if any).
- *
- * 'request_cb' receives copies (as if by dup()) of the file descriptors in
- * fds[]. 'request_cb' takes ownership of these copies, and the caller of
- * worker_request() retains its ownership of the originals.
- *
- * This function may block until the RPC request has been sent (if the socket
- * buffer fills up) but it does not wait for the reply (if any). If this
- * function blocks, it may invoke reply callbacks for previous requests.
- *
- * The worker process executes RPC requests in strict order of submission and
- * runs each request to completion before beginning the next request. The main
- * process invokes reply callbacks in strict order of request submission. */
-void
-worker_request(const void *data, size_t size,
- const int fds[], size_t n_fds,
- worker_request_func *request_cb,
- worker_reply_func *reply_cb, void *aux)
-{
- if (size > 0) {
- struct iovec iov;
-
- iov.iov_base = (void *) data;
- iov.iov_len = size;
- worker_request_iovec(&iov, 1, fds, n_fds, request_cb, reply_cb, aux);
- } else {
- worker_request_iovec(NULL, 0, fds, n_fds, request_cb, reply_cb, aux);
- }
-}
-
-static int
-worker_send_iovec(const struct iovec iovs[], size_t n_iovs,
- const int fds[], size_t n_fds)
-{
- size_t sent = 0;
-
- for (;;) {
- struct pollfd pfd;
- int error;
-
- /* Try to send the rest of the request. */
- error = send_iovec_and_fds_fully(client_sock, iovs, n_iovs,
- fds, n_fds, sent, &sent);
- if (error != EAGAIN) {
- return error;
- }
-
- /* Process replies to avoid deadlock. */
- worker_run();
-
- /* Wait for 'client_sock' to become ready before trying again. We
- * can't use poll_block() because it sometimes calls into vlog, which
- * calls indirectly into worker_send_iovec(). To be usable here,
- * poll_block() would therefore need to be reentrant, but it isn't
- * (calling it recursively causes memory corruption and an eventual
- * crash). */
- pfd.fd = client_sock;
- pfd.events = POLLIN | POLLOUT;
- do {
- error = poll(&pfd, 1, -1) < 0 ? errno : 0;
- } while (error == EINTR);
- if (error) {
- worker_broke();
- VLOG_ABORT("poll failed (%s)", ovs_strerror(error));
- }
- }
-}
-
-/* Same as worker_request() except that the data to send is specified as an
- * array of iovecs. */
-void
-worker_request_iovec(const struct iovec iovs[], size_t n_iovs,
- const int fds[], size_t n_fds,
- worker_request_func *request_cb,
- worker_reply_func *reply_cb, void *aux)
-{
- static bool recursing = false;
- struct worker_request rq;
- struct iovec *all_iovs;
- int error;
-
- assert(worker_is_running());
- assert(!recursing);
- recursing = true;
-
- rq.request_len = iovec_len(iovs, n_iovs);
- rq.request_cb = request_cb;
- rq.reply_cb = reply_cb;
- rq.reply_aux = aux;
-
- all_iovs = prefix_iov(&rq, sizeof rq, iovs, n_iovs);
- error = worker_send_iovec(all_iovs, n_iovs + 1, fds, n_fds);
- if (error) {
- worker_broke();
- VLOG_ABORT("send failed (%s)", ovs_strerror(error));
- }
- free(all_iovs);
-
- recursing = false;
-}
-
-/* Closes the client socket, if any, so that worker_is_running() will return
- * false.
- *
- * The client does this just before aborting if the worker process dies or
- * malfunctions, to prevent the logging subsystem from trying to use the
- * worker to log the failure. */
-static void
-worker_broke(void)
-{
- if (client_sock >= 0) {
- close(client_sock);
- client_sock = -1;
- }
-}
-
-/* Interfaces for RPC implementations (running in the worker process). */
-
-static int server_sock = -1;
-static bool expect_reply;
-static struct worker_request request;
-
-/* When a call to worker_request() or worker_request_iovec() provides a
- * 'reply_cb' callback, the 'request_cb' implementation must call this function
- * to send its reply. The main process will call 'reply_cb' passing the
- * 'size' (zero or more) bytes of data in 'data' as arguments as well as the
- * 'n_fds' (SOUTIL_MAX_FDS or fewer) file descriptors in 'fds'.
- *
- * If a call to worker_request() or worker_request_iovec() provides no
- * 'reply_cb' callback, the 'request_cb' implementation must not call this
- * function.
- *
- * 'reply_cb' receives copies (as if by dup()) of the file descriptors in
- * fds[]. 'reply_cb' takes ownership of these copies, and the caller of
- * worker_reply() retains its ownership of the originals.
- *
- * This function blocks until the RPC reply has been sent (if the socket buffer
- * fills up) but it does not wait for the main process to receive or to process
- * the reply. */
-void
-worker_reply(const void *data, size_t size, const int fds[], size_t n_fds)
-{
- if (size > 0) {
- struct iovec iov;
-
- iov.iov_base = (void *) data;
- iov.iov_len = size;
- worker_reply_iovec(&iov, 1, fds, n_fds);
- } else {
- worker_reply_iovec(NULL, 0, fds, n_fds);
- }
-}
-
-/* Same as worker_reply() except that the data to send is specified as an array
- * of iovecs. */
-void
-worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
- const int fds[], size_t n_fds)
-{
- struct worker_reply reply;
- struct iovec *all_iovs;
- int error;
-
- assert(expect_reply);
- expect_reply = false;
-
- reply.reply_len = iovec_len(iovs, n_iovs);
- reply.reply_cb = request.reply_cb;
- reply.reply_aux = request.reply_aux;
-
- all_iovs = prefix_iov(&reply, sizeof reply, iovs, n_iovs);
-
- error = send_iovec_and_fds_fully_block(server_sock, all_iovs, n_iovs + 1,
- fds, n_fds);
- if (error == EPIPE) {
- /* Parent probably died. Continue processing any RPCs still buffered,
- * to avoid missing log messages. */
- VLOG_INFO("send failed (%s)", ovs_strerror(error));
- } else if (error) {
- VLOG_FATAL("send failed (%s)", ovs_strerror(error));
- }
-
- free(all_iovs);
-}
-
-static void
-worker_main(int fd)
-{
- struct rxbuf rx;
-
- server_sock = fd;
-
- subprogram_name = "worker";
- proctitle_set("worker process for pid %lu", (unsigned long int) getppid());
- VLOG_INFO("worker process started");
-
- rxbuf_init(&rx);
- for (;;) {
- int error;
-
- error = rxbuf_run(&rx, server_sock, sizeof(struct worker_request));
- if (!error) {
- request = *(struct worker_request *) rx.header.data;
-
- expect_reply = request.reply_cb != NULL;
- request.request_cb(&rx.payload, rx.fds, rx.n_fds);
- assert(!expect_reply);
-
- rxbuf_clear(&rx);
- } else if (error == EOF && !rx.header.size) {
- /* Main process closed the IPC socket. Exit cleanly. */
- break;
- } else if (error != EAGAIN) {
- VLOG_FATAL("RPC receive failed (%s)", ovs_retval_to_string(error));
- }
-
- poll_fd_wait(server_sock, POLLIN);
- poll_block();
- }
-
- VLOG_INFO("worker process exiting");
- exit(0);
-}
-
-static void
-rxbuf_init(struct rxbuf *rx)
-{
- ofpbuf_init(&rx->header, 0);
- rx->n_fds = 0;
- ofpbuf_init(&rx->payload, 0);
-}
-
-static void
-rxbuf_clear(struct rxbuf *rx)
-{
- ofpbuf_clear(&rx->header);
- rx->n_fds = 0;
- ofpbuf_clear(&rx->payload);
-}
-
-static int
-rxbuf_run(struct rxbuf *rx, int sock, size_t header_len)
-{
- for (;;) {
- if (!rx->header.size) {
- int retval;
-
- ofpbuf_clear(&rx->header);
- ofpbuf_prealloc_tailroom(&rx->header, header_len);
-
- retval = recv_data_and_fds(sock, rx->header.data, header_len,
- rx->fds, &rx->n_fds);
- if (retval <= 0) {
- return retval ? -retval : EOF;
- }
- rx->header.size += retval;
- } else if (rx->header.size < header_len) {
- size_t bytes_read;
- int error;
-
- error = read_fully(sock, ofpbuf_tail(&rx->header),
- header_len - rx->header.size, &bytes_read);
- rx->header.size += bytes_read;
- if (error) {
- return error;
- }
- } else {
- size_t payload_len = *(size_t *) rx->header.data;
-
- if (rx->payload.size < payload_len) {
- size_t left = payload_len - rx->payload.size;
- size_t bytes_read;
- int error;
-
- ofpbuf_prealloc_tailroom(&rx->payload, left);
- error = read_fully(sock, ofpbuf_tail(&rx->payload), left,
- &bytes_read);
- rx->payload.size += bytes_read;
- if (error) {
- return error;
- }
- } else {
- return 0;
- }
- }
- }
-}
-
-static struct iovec *
-prefix_iov(void *data, size_t len, const struct iovec *iovs, size_t n_iovs)
-{
- struct iovec *dst;
-
- dst = xmalloc((n_iovs + 1) * sizeof *dst);
- dst[0].iov_base = data;
- dst[0].iov_len = len;
- memcpy(dst + 1, iovs, n_iovs * sizeof *iovs);
-
- return dst;
-}
diff --git a/lib/worker.h b/lib/worker.h
deleted file mode 100644
index 135d50d..0000000
--- a/lib/worker.h
+++ /dev/null
@@ -1,68 +0,0 @@
-/* Copyright (c) 2012 Nicira, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at:
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef WORKER_H
-#define WORKER_H 1
-
-/* Worker processes.
- *
- * Thes functions allow an OVS daemon to fork off a "worker process" to do
- * tasks that may unavoidably block in the kernel. The worker executes remote
- * procedure calls on behalf of the main process.
- *
- * Tasks that may unavoidably block in the kernel include writes to regular
- * files, sends to Generic Netlink sockets (which as of this writing use a
- * global lock), and other unusual operations.
- *
- * The worker functions *will* block if the finite buffer between a main
- * process and its worker process fills up.
- */
-
-#include <stdbool.h>
-#include <stddef.h>
-#include "compiler.h"
-
-struct iovec;
-struct ofpbuf;
-
-/* The main process calls this function to start a worker. */
-void worker_start(void);
-
-/* Interface for main process to interact with the worker. */
-typedef void worker_request_func(struct ofpbuf *request,
- const int fds[], size_t n_fds);
-typedef void worker_reply_func(struct ofpbuf *reply,
- const int fds[], size_t n_fds, void *aux);
-
-bool worker_is_running(void);
-void worker_run(void);
-void worker_wait(void);
-
-void worker_request(const void *data, size_t size,
- const int fds[], size_t n_fds,
- worker_request_func *request_cb,
- worker_reply_func *reply_cb, void *aux);
-void worker_request_iovec(const struct iovec *iovs, size_t n_iovs,
- const int fds[], size_t n_fds,
- worker_request_func *request_cb,
- worker_reply_func *reply_cb, void *aux);
-
-/* Interfaces for RPC implementations (running in the worker process). */
-void worker_reply(const void *data, size_t size,
- const int fds[], size_t n_fds);
-void worker_reply_iovec(const struct iovec *iovs, size_t n_iovs,
- const int fds[], size_t n_fds);
-
-#endif /* worker.h */
diff --git a/vswitchd/ovs-vswitchd.c b/vswitchd/ovs-vswitchd.c
index d216a48..bc45dac 100644
--- a/vswitchd/ovs-vswitchd.c
+++ b/vswitchd/ovs-vswitchd.c
@@ -49,7 +49,6 @@
#include "vconn.h"
#include "vlog.h"
#include "lib/vswitch-idl.h"
-#include "worker.h"
VLOG_DEFINE_THIS_MODULE(vswitchd);
@@ -92,8 +91,6 @@ main(int argc, char *argv[])
#endif
}
- worker_start();
-
retval = unixctl_server_create(unixctl_path, &unixctl);
if (retval) {
exit(EXIT_FAILURE);
@@ -105,7 +102,6 @@ main(int argc, char *argv[])
exiting = false;
while (!exiting) {
- worker_run();
if (signal_poll(sighup)) {
vlog_reopen_log_file();
}
@@ -124,7 +120,6 @@ main(int argc, char *argv[])
unixctl_server_run(unixctl);
netdev_run();
- worker_wait();
signal_wait(sighup);
memory_wait();
bridge_wait();
--
1.7.2.5
More information about the dev mailing list