[PATCH] Threaded userspace datapath

Giuseppe Lettieri g.lettieri at iet.unipi.it
Fri Sep 28 10:05:18 UTC 2012


This patch refactors the userlevel datapath (i.e., datapath_type=netdev)
into two threads: one only forwards the packets for which a flow is found,
and the other does all other processing. This arrangement can speed up
forwarding performance, measured in pps, by a factor of 5-10.

To enable compilation of the threaded datapath, pass '--enable-threaded'
to configure.

Signed-off-by: Gaetano Catalli <gaetano.catalli at gmai.com>
Signed-off-by: Ed Maste <emaste at adaranet.com>
Signed-off-by: Giuseppe Lettieri <g.lettieri at iet.unipi.it>
---
 configure.ac          |    1 +
 lib/automake.mk       |    1 +
 lib/dispatch.h        |    9 +
 lib/dpif-netdev.c     |  473 ++++++++++++++++++++++++++++++++++++++++++++++++-
 lib/fatal-signal.c    |    2 +-
 lib/netdev-bsd.c      |   91 ++++++++++
 lib/netdev-dummy.c    |  123 ++++++++++++-
 lib/netdev-linux.c    |   44 +++++
 lib/netdev-provider.h |   19 ++
 lib/netdev-vport.c    |    8 +
 lib/netdev.c          |   22 +++
 lib/netdev.h          |    7 +
 lib/vlog.c            |   16 ++
 m4/openvswitch.m4     |   19 ++
 14 files changed, 827 insertions(+), 8 deletions(-)
 create mode 100644 lib/dispatch.h

diff --git a/configure.ac b/configure.ac
index 9bdffea..a70232f 100644
--- a/configure.ac
+++ b/configure.ac
@@ -43,6 +43,7 @@ AC_SEARCH_LIBS([clock_gettime], [rt])
 AC_SEARCH_LIBS([timer_create], [rt])
 AC_SEARCH_LIBS([pcap_open_live], [pcap])
 
+OVS_CHECK_THREADED
 OVS_CHECK_COVERAGE
 OVS_CHECK_NDEBUG
 OVS_CHECK_NETLINK
diff --git a/lib/automake.mk b/lib/automake.mk
index 94b86f6..3479ec0 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -37,6 +37,7 @@ lib_libopenvswitch_a_SOURCES = \
 	lib/daemon.c \
 	lib/daemon.h \
 	lib/dhcp.h \
+	lib/dispatch.h \
 	lib/dummy.c \
 	lib/dummy.h \
 	lib/dhparams.h \
diff --git a/lib/dispatch.h b/lib/dispatch.h
new file mode 100644
index 0000000..80ac9c7
--- /dev/null
+++ b/lib/dispatch.h
@@ -0,0 +1,9 @@
+#include <sys/types.h>
+#include "ofpbuf.h"
+
+#ifndef DISPATCH_H
+#define DISPATCH_H 1
+
+typedef void (*pkt_handler)(u_char *user, struct ofpbuf* buf);
+
+#endif /* DISPATCH_H */
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index dc4479e..e7013db 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -32,6 +32,15 @@
 #include <sys/stat.h>
 #include <unistd.h>
 
+#ifdef THREADED
+#include <signal.h>
+#include <pthread.h>
+
+#include "socket-util.h"
+#include "fatal-signal.h"
+#include "dispatch.h"
+#endif
+
 #include "csum.h"
 #include "dpif.h"
 #include "dpif-provider.h"
@@ -55,6 +64,18 @@
 #include "vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(dpif_netdev);
+/* Pthread lock macros, nops in the non-threaded case. */
+#ifdef THREADED
+#define INIT_MUTEX(mutex) pthread_mutex_init(mutex, NULL)
+#define DESTROY_MUTEX(mutex) pthread_mutex_destroy(mutex)
+#define LOCK(mutex) pthread_mutex_lock(mutex)
+#define UNLOCK(mutex) pthread_mutex_unlock(mutex)
+#else
+#define INIT_MUTEX(mutex)
+#define DESTROY_MUTEX(mutex)
+#define LOCK(mutex)
+#define UNLOCK(mutex)
+#endif
 
 /* Configuration parameters. */
 enum { MAX_PORTS = 256 };       /* Maximum number of ports. */
@@ -80,6 +101,49 @@ struct dp_netdev_queue {
     unsigned int head, tail;
 };
 
+#ifdef THREADED
+struct dp_netdev_notifier {
+    int pipe[2];
+};
+
+static int dp_netdev_notifier_init(struct dp_netdev_notifier *);
+static int dp_netdev_notifier_poll(struct dp_netdev_notifier *dn, struct pollfd *pfd);
+static int dp_netdev_notifier_notify(struct dp_netdev_notifier *);
+static int dp_netdev_notifier_ack(struct dp_netdev_notifier *);
+static int dp_netdev_notifier_ack1(struct dp_netdev_notifier *);
+#else
+struct dp_netdev_notifier {
+    /* nothing */
+};
+
+static int dp_netdev_notifier_init(struct dp_netdev_notifier *dn OVS_UNUSED)
+{
+    return 0;
+}
+/* unused
+static int dp_netdev_notifier_poll(struct dp_netdev_notifier *dn OVS_UNUSED,
+    struct pollfd *pfd OVS_UNUSED)
+{
+    return 0;
+}
+*/
+static int dp_netdev_notifier_notify(struct dp_netdev_notifier *dn OVS_UNUSED)
+{
+    return 0;
+}
+/*
+static int dp_netdev_notifier_ack(struct dp_netdev_notifier *dn OVS_UNUSED)
+{
+    return 0;
+}
+*/
+static int dp_netdev_notifier_ack1(struct dp_netdev_notifier *dn OVS_UNUSED)
+{
+    return 0;
+}
+#endif
+
+
 /* Datapath based on the network device interface from netdev.h. */
 struct dp_netdev {
     const struct dpif_class *class;
@@ -87,6 +151,16 @@ struct dp_netdev {
     int open_cnt;
     bool destroyed;
 
+    struct dp_netdev_notifier packet_notifier;    /* signal a packet on the queue */
+    struct dp_netdev_notifier notifier;
+#ifdef THREADED
+    struct pollfd *notifier_fd;
+
+    pthread_mutex_t table_mutex;    /* mutex for the flow table */
+    pthread_mutex_t port_list_mutex;    /* port list mutex */
+
+    /* The access to this queue is protected by the table_mutex mutex */
+#endif
     struct dp_netdev_queue queues[N_QUEUES];
     struct hmap flow_table;     /* Flow table. */
 
@@ -107,6 +181,9 @@ struct dp_netdev_port {
     struct list node;           /* Element in dp_netdev's 'port_list'. */
     struct netdev *netdev;
     char *type;                 /* Port type as requested by user. */
+#ifdef THREADED
+    struct pollfd *poll_fd;     /* To manage the poll loop in the thread. */
+#endif
 };
 
 /* A flow in dp_netdev's 'flow_table'. */
@@ -132,6 +209,11 @@ struct dpif_netdev {
     unsigned int dp_serial;
 };
 
+#ifdef THREADED
+/* XXX global Descriptor of the thread that manages the datapaths. */
+pthread_t thread_p;
+#endif
+
 /* All netdev-based datapaths. */
 static struct shash dp_netdevs = SHASH_INITIALIZER(&dp_netdevs);
 
@@ -209,6 +291,13 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     dp->class = class;
     dp->name = xstrdup(name);
     dp->open_cnt = 0;
+    dp_netdev_notifier_init(&dp->packet_notifier);
+    dp_netdev_notifier_init(&dp->notifier);
+    INIT_MUTEX(&dp->table_mutex);
+    INIT_MUTEX(&dp->port_list_mutex);
+#ifdef THREADED
+    dp->notifier_fd = NULL;
+#endif
     for (i = 0; i < N_QUEUES; i++) {
         dp->queues[i].head = dp->queues[i].tail = 0;
     }
@@ -226,6 +315,124 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     return 0;
 }
 
+#ifdef THREADED
+
+static int
+dp_netdev_notifier_init(struct dp_netdev_notifier *dn)
+{
+    int error = pipe(dn->pipe);
+    if (error) {
+        VLOG_ERR("Unable to create notifier: %s", strerror(errno));
+        return errno;
+    }
+    if (set_nonblocking(dn->pipe[0]) || set_nonblocking(dn->pipe[1])) {
+        VLOG_ERR("Unable to set nonblocking on notifier pipe: %s",
+                 strerror(errno));
+        return errno;
+    }
+    VLOG_DBG("Notifier pipes created (%d, %d)", dn->pipe[0], dn->pipe[1]);
+    return 0;
+}
+
+static int
+dp_netdev_notifier_poll(struct dp_netdev_notifier *dn, struct pollfd *pfd)
+{
+    pfd->fd = dn->pipe[0];
+    pfd->events = POLLIN;
+    return 1;
+}
+
+static int
+dp_netdev_notifier_ack(struct dp_netdev_notifier *dn)
+{
+    int error;
+    char readbuf[1024];
+
+    while ((error = read(dn->pipe[0], readbuf, sizeof(readbuf))) > 0)
+            ;
+    if (error < 0 && errno != EAGAIN) {
+        VLOG_ERR("Pipe read error: %s", strerror(errno));
+        return error;
+    }
+    return 0;
+}
+
+static int
+dp_netdev_notifier_ack1(struct dp_netdev_notifier *dn)
+{
+    int error;
+    char c;
+
+    error = read(dn->pipe[0], &c, 1);
+    if (error < 0 && errno != EAGAIN) {
+        VLOG_ERR("Pipe read error: %s", strerror(errno));
+        return error;
+    }
+    return 0;
+}
+
+/* Wakes up the reader of 'dn'.  Returns 0 on success, otherwise errno. */
+static int
+dp_netdev_notifier_notify(struct dp_netdev_notifier *dn)
+{
+    char c = 0;
+    /* EAGAIN: the nonblocking pipe is full, so a wakeup is already pending. */
+    if (write(dn->pipe[1], &c, 1) < 0 && errno != EAGAIN) {
+        VLOG_ERR("Pipe write error (to datapath): %s", strerror(errno));
+        return errno;
+    }
+    return 0;
+}
+
+static void * dp_thread_body(void *args OVS_UNUSED);
+
+/* This is the function that is called in response to a fatal signal (e.g.
+ * SIGTERM). */
+static void
+dpif_netdev_exit_hook(void *aux OVS_UNUSED)
+{
+    if (pthread_cancel(thread_p) == 0) {
+        /*
+         * POSIX specifies that poll is a thread cancellation point, but it
+         * appears that (at least on FreeBSD) we can wait indefinitely in the
+         * poll() in dp_thread_body.  As a workaround force a notify to exit
+         * the poll().
+         */
+        struct shash_node *node;
+        struct dp_netdev *dp;
+        SHASH_FOR_EACH(node, &dp_netdevs) {
+            dp = (struct dp_netdev *)node->data;
+            dp_netdev_notifier_notify(&dp->notifier);
+        }
+        pthread_join(thread_p, NULL);
+    }
+}
+/* Starts the datapath thread on first use.  Returns 0 or an error number. */
+static int
+dpif_netdev_init(void)
+{
+    static int error = -1;      /* < 0 until the first attempt has been made. */
+
+    if (error < 0) {
+        fatal_signal_add_hook(dpif_netdev_exit_hook, NULL, NULL, true);
+        error = pthread_create(&thread_p, NULL, dp_thread_body, NULL);
+        if (error != 0) {
+            /* pthread_create() returns the error number; errno is not set. */
+            VLOG_ERR("Unable to create datapath thread: %s", strerror(error));
+        } else {
+            VLOG_DBG("Datapath thread started");
+        }
+    }
+    return error;
+}
+#else
+static int
+dpif_netdev_init(void)
+{
+    return 0;
+}
+#endif
+
 static int
 dpif_netdev_open(const struct dpif_class *class, const char *name,
                  bool create, struct dpif **dpifp)
@@ -252,9 +459,12 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
     }
 
     *dpifp = create_dpif_netdev(dp);
+    dpif_netdev_init(); /* XXX check error */
     return 0;
 }
 
+/* table_mutex must be locked in THREADED mode.
+ */
 static void
 dp_netdev_purge_queues(struct dp_netdev *dp)
 {
@@ -276,11 +486,17 @@ dp_netdev_free(struct dp_netdev *dp)
     struct dp_netdev_port *port, *next;
 
     dp_netdev_flow_flush(dp);
+    LOCK(&dp->port_list_mutex);
     LIST_FOR_EACH_SAFE (port, next, node, &dp->port_list) {
         do_del_port(dp, port->port_no);
     }
+    UNLOCK(&dp->port_list_mutex);
+    LOCK(&dp->table_mutex);
     dp_netdev_purge_queues(dp);
     hmap_destroy(&dp->flow_table);
+    UNLOCK(&dp->table_mutex);
+    DESTROY_MUTEX(&dp->table_mutex);
+    DESTROY_MUTEX(&dp->port_list_mutex);
     free(dp->name);
     free(dp);
 }
@@ -309,7 +525,9 @@ static int
 dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
+    LOCK(&dp->table_mutex);
     stats->n_flows = hmap_count(&dp->flow_table);
+    UNLOCK(&dp->table_mutex);
     stats->n_hit = dp->n_hit;
     stats->n_missed = dp->n_missed;
     stats->n_lost = dp->n_lost;
@@ -357,13 +575,18 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     port->port_no = port_no;
     port->netdev = netdev;
     port->type = xstrdup(type);
+#ifdef THREADED
+    port->poll_fd = NULL;
+#endif
 
     error = netdev_get_mtu(netdev, &mtu);
     if (!error) {
         max_mtu = mtu;
     }
 
+    LOCK(&dp->port_list_mutex);
     list_push_back(&dp->port_list, &port->node);
+    UNLOCK(&dp->port_list_mutex);
     dp->ports[port_no] = port;
     dp->serial++;
 
@@ -432,7 +655,16 @@ static int
 dpif_netdev_port_del(struct dpif *dpif, uint16_t port_no)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
-    return port_no == OVSP_LOCAL ? EINVAL : do_del_port(dp, port_no);
+    int error;
+
+    if (port_no == OVSP_LOCAL) {
+        return EINVAL;
+    } else {
+        LOCK(&dp->port_list_mutex);
+        error = do_del_port(dp, port_no);
+        UNLOCK(&dp->port_list_mutex);
+    }
+    return error;
 }
 
 static bool
@@ -460,15 +692,19 @@ get_port_by_name(struct dp_netdev *dp,
 {
     struct dp_netdev_port *port;
 
+    LOCK(&dp->port_list_mutex);
     LIST_FOR_EACH (port, node, &dp->port_list) {
         if (!strcmp(netdev_get_name(port->netdev), devname)) {
             *portp = port;
+            UNLOCK(&dp->port_list_mutex);
             return 0;
         }
     }
+    UNLOCK(&dp->port_list_mutex);
     return ENOENT;
 }
 
+/* In THREADED mode, must be called with port_list_mutex held. */
 static int
 do_del_port(struct dp_netdev *dp, uint16_t port_no)
 {
@@ -543,7 +779,9 @@ dpif_netdev_get_max_ports(const struct dpif *dpif OVS_UNUSED)
 static void
 dp_netdev_free_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
 {
+    LOCK(&dp->table_mutex);
     hmap_remove(&dp->flow_table, &flow->node);
+    UNLOCK(&dp->table_mutex);
     free(flow->actions);
     free(flow);
 }
@@ -632,7 +870,11 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_)
 }
 
 static struct dp_netdev_flow *
-dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
+#ifdef THREADED
+dp_netdev_lookup_flow_locked(struct dp_netdev *dp, const struct flow *key)
+#else
+dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key)
+#endif
 {
     struct dp_netdev_flow *flow;
 
@@ -644,6 +886,19 @@ dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct flow *key)
     return NULL;
 }
 
+#ifdef THREADED
+static struct dp_netdev_flow *
+dp_netdev_lookup_flow(struct dp_netdev *dp, const struct flow *key)
+{
+    struct dp_netdev_flow *flow;
+
+    LOCK(&dp->table_mutex);
+    flow = dp_netdev_lookup_flow_locked(dp, key);
+    UNLOCK(&dp->table_mutex);
+    return flow;
+}
+#endif
+
 static void
 get_dpif_flow_stats(struct dp_netdev_flow *flow, struct dpif_flow_stats *stats)
 {
@@ -740,7 +995,9 @@ dp_netdev_flow_add(struct dp_netdev *dp, const struct flow *key,
         return error;
     }
 
+    LOCK(&dp->table_mutex);
     hmap_insert(&dp->flow_table, &flow->node, flow_hash(&flow->key, 0));
+    UNLOCK(&dp->table_mutex);
     return 0;
 }
 
@@ -760,6 +1017,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
     struct dp_netdev_flow *flow;
     struct flow key;
     int error;
+    int n_flows;
 
     error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &key);
     if (error) {
@@ -769,7 +1027,10 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
     flow = dp_netdev_lookup_flow(dp, &key);
     if (!flow) {
         if (put->flags & DPIF_FP_CREATE) {
-            if (hmap_count(&dp->flow_table) < MAX_FLOWS) {
+            LOCK(&dp->table_mutex);
+            n_flows = hmap_count(&dp->flow_table);
+            UNLOCK(&dp->table_mutex);
+            if (n_flows < MAX_FLOWS) {
                 if (put->stats) {
                     memset(put->stats, 0, sizeof *put->stats);
                 }
@@ -855,7 +1116,9 @@ dpif_netdev_flow_dump_next(const struct dpif *dpif, void *state_,
     struct dp_netdev_flow *flow;
     struct hmap_node *node;
 
+    LOCK(&dp->table_mutex);
     node = hmap_at_position(&dp->flow_table, &state->bucket, &state->offset);
+    UNLOCK(&dp->table_mutex);
     if (!node) {
         return EOF;
     }
@@ -961,7 +1224,10 @@ static int
 dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
                  struct ofpbuf *buf)
 {
-    struct dp_netdev_queue *q = find_nonempty_queue(dpif);
+    struct dp_netdev_queue *q;
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    LOCK(&dp->table_mutex);
+    q = find_nonempty_queue(dpif);
     if (q) {
         struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
 
@@ -971,8 +1237,11 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
         ofpbuf_uninit(buf);
         *buf = u->buf;
 
+        dp_netdev_notifier_ack1(&dp->packet_notifier);
+        UNLOCK(&dp->table_mutex);
         return 0;
     } else {
+        UNLOCK(&dp->table_mutex);
         return EAGAIN;
     }
 }
@@ -980,19 +1249,31 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
 static void
 dpif_netdev_recv_wait(struct dpif *dpif)
 {
+#ifdef THREADED
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct pollfd pfd;
+
+    if (dp_netdev_notifier_poll(&dp->packet_notifier, &pfd)) {
+        poll_fd_wait(pfd.fd, pfd.events);
+    }
+#else
     if (find_nonempty_queue(dpif)) {
         poll_immediate_wake();
     } else {
         /* No messages ready to be received, and dp_wait() will ensure that we
          * wake up to queue new messages, so there is nothing to do. */
     }
+#endif
 }
 
 static void
 dpif_netdev_recv_purge(struct dpif *dpif)
 {
     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+    LOCK(&dp->table_mutex);
     dp_netdev_purge_queues(dpif_netdev->dp);
+    UNLOCK(&dp->table_mutex);
 }
 
 static void
@@ -1010,23 +1291,64 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
 {
     struct dp_netdev_flow *flow;
     struct flow key;
+    struct nlattr *actions;
+    size_t actions_len;
+#ifdef THREADED
+    uint8_t actions_buf[128];
+#endif
 
     if (packet->size < ETH_HEADER_LEN) {
         return;
     }
     flow_extract(packet, 0, 0, odp_port_to_ofp_port(port->port_no), &key);
+#ifdef THREADED
+    LOCK(&dp->table_mutex);
+    flow = dp_netdev_lookup_flow_locked(dp, &key);
+#else
     flow = dp_netdev_lookup_flow(dp, &key);
+#endif
     if (flow) {
         dp_netdev_flow_used(flow, packet);
-        dp_netdev_execute_actions(dp, packet, &key,
-                                  flow->actions, flow->actions_len);
+        actions_len = flow->actions_len;
+#ifdef THREADED
+        if (actions_len <= sizeof(actions_buf)) {
+            actions = (struct nlattr*)actions_buf;
+        } else {
+            actions = xmalloc(actions_len);
+        }
+        memcpy(actions, flow->actions, actions_len);
+#else
+        actions = flow->actions;
+#endif
+        UNLOCK(&dp->table_mutex);
+        dp_netdev_execute_actions(dp, packet, &key, actions, actions_len);
+#ifdef THREADED
+        if (actions_len > sizeof(actions_buf)) {
+            free(actions);
+        }
+#endif
         dp->n_hit++;
     } else {
         dp->n_missed++;
         dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, 0);
+        UNLOCK(&dp->table_mutex);
     }
 }
 
+#ifdef THREADED
+static void
+dpif_netdev_run(struct dpif *dpif)
+{
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+
+    dp_netdev_notifier_notify(&dp->notifier);
+}
+
+static void
+dpif_netdev_wait(struct dpif *dpif OVS_UNUSED)
+{
+}
+#else
 static void
 dpif_netdev_run(struct dpif *dpif)
 {
@@ -1065,6 +1387,138 @@ dpif_netdev_wait(struct dpif *dpif)
         netdev_recv_wait(port->netdev);
     }
 }
+#endif
+
+#ifdef THREADED
+/*
+ * pcap callback argument
+ */
+struct dispatch_arg {
+    struct dp_netdev *dp;   /* update statistics */
+    struct dp_netdev_port *port;    /* argument to flow identifier function */
+};
+
+/* Process a packet.
+ *
+ * The port_input function will send immediately if it finds a flow match and
+ * the associated action is ODPAT_OUTPUT or ODPAT_OUTPUT_GROUP.
+ * If a flow is not found or for the other actions, the packet is copied.
+ */
+static void
+process_pkt(u_char *user, struct ofpbuf *buf)
+{
+    struct dispatch_arg *arg = (struct dispatch_arg *)user;
+
+    ofpbuf_padto(buf, ETH_TOTAL_MIN);
+    dp_netdev_port_input(arg->dp, arg->port, buf);
+}
+
+/* Datapath thread body: polls every port and dispatches received packets. */
+static void*
+dp_thread_body(void *args OVS_UNUSED)
+{
+    struct dp_netdev *dp;
+    struct dp_netdev_port *port;
+    struct dispatch_arg arg;
+    int error;
+    int n_fds;
+    uint32_t batch = 50; /* max number of pkts processed by the dispatch */
+    int processed;     /* actual number of pkts processed by the dispatch */
+
+    sigset_t sigmask;
+
+    /*XXX Since the poll involves all ports of all datapaths, the right fds
+     * size should be MAX_PORTS * max_number_of_datapaths */
+    struct pollfd fds[MAX_PORTS + 1];
+
+    /* Block the fatal signals in this thread, leaving the main thread to
+     * handle them. */
+    sigemptyset(&sigmask);
+    sigaddset(&sigmask, SIGTERM);
+    sigaddset(&sigmask, SIGALRM);
+    sigaddset(&sigmask, SIGINT);
+    sigaddset(&sigmask, SIGHUP);
+    error = pthread_sigmask(SIG_BLOCK, &sigmask, NULL);
+    if (error != 0) {
+        VLOG_ERR("Error setting thread sigmask: %s", strerror(error));
+    }
+
+    for(;;) {
+        struct shash_node *node;
+        n_fds = 0;
+        /* build the structure for poll */
+        SHASH_FOR_EACH(node, &dp_netdevs) {
+            dp = (struct dp_netdev *)node->data;
+            if (n_fds >= sizeof(fds) / sizeof(fds[0])) {
+                VLOG_ERR("Too many fds for poll adding notifier");
+                break;
+            }
+            if (dp_netdev_notifier_poll(&dp->notifier, &fds[n_fds])) {
+                dp->notifier_fd = &fds[n_fds];
+                n_fds++;
+            }
+            LOCK(&dp->port_list_mutex);
+            LIST_FOR_EACH (port, node, &dp->port_list) {
+                /* Check for room before writing, so 'fds' never overflows. */
+                if (n_fds >= sizeof(fds) / sizeof(fds[0])) {
+                    VLOG_ERR("Too many fds for poll adding port fd");
+                    break;
+                }
+                fds[n_fds].fd = netdev_get_fd(port->netdev);
+                fds[n_fds].events = POLLIN;
+                port->poll_fd = &fds[n_fds];
+                n_fds++;
+            }
+            UNLOCK(&dp->port_list_mutex);
+        }
+
+        error = poll(fds, n_fds, -1);
+
+        if (error < 0) {
+            if (errno == EINTR) {
+                /* XXX get this case in detach mode */
+                continue;
+            }
+            VLOG_ERR("Datapath thread poll() error: %s\n", strerror(errno));
+            /* XXX terminating the thread is probably not right */
+            break;
+        }
+        pthread_testcancel();
+
+        SHASH_FOR_EACH (node, &dp_netdevs) {
+            dp = (struct dp_netdev *)node->data;
+            if (dp->notifier_fd && (dp->notifier_fd->revents & POLLIN)) {
+                VLOG_DBG("Signalled from main thread");
+                dp_netdev_notifier_ack(&dp->notifier);
+            }
+            arg.dp = dp;
+            LOCK(&dp->port_list_mutex);
+            LIST_FOR_EACH (port, node, &dp->port_list) {
+                arg.port = port;
+                if (port->poll_fd) {
+                    VLOG_DBG("fd %d revents 0x%x", port->poll_fd->fd, port->poll_fd->revents);
+                }
+                if (port->poll_fd && (port->poll_fd->revents & POLLIN)) {
+                    /* call the dispatch and process the packet into
+                     * its callback. We process 'batch' packets at time */
+                    processed = netdev_dispatch(port->netdev, batch,
+                                         process_pkt, (u_char *)&arg);
+                    if (processed < 0) { /* pcap returns error */
+                        static struct vlog_rate_limit rl =
+                            VLOG_RATE_LIMIT_INIT(1, 5);
+                        VLOG_ERR_RL(&rl,
+                                "error receiving data from XXX \n");
+                    }
+                } /* end of if poll */
+            } /* end of port loop */
+            UNLOCK(&dp->port_list_mutex);
+        } /* end of dp loop */
+    } /* for ;; */
+
+    return NULL;
+}
+
+#endif /* THREADED */
 
 static void
 dp_netdev_set_dl(struct ofpbuf *packet, const struct ovs_key_ethernet *eth_key)
@@ -1080,11 +1534,14 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet,
                       uint16_t out_port)
 {
     struct dp_netdev_port *p = dp->ports[out_port];
+
     if (p) {
         netdev_send(p->netdev, packet);
+        dp_netdev_notifier_notify(&dp->notifier);
     }
 }
 
+/* In THREADED mode, must be called with table_mutex held. */
 static int
 dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
                          int queue_no, const struct flow *flow, uint64_t arg)
@@ -1117,6 +1574,8 @@ dp_netdev_output_userspace(struct dp_netdev *dp, const struct ofpbuf *packet,
     upcall->key_len = key_len;
     upcall->userdata = arg;
 
+    dp_netdev_notifier_notify(&dp->packet_notifier);
+
     return 0;
 }
 
@@ -1164,7 +1623,9 @@ dp_netdev_action_userspace(struct dp_netdev *dp,
 
     userdata_attr = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
     userdata = userdata_attr ? nl_attr_get_u64(userdata_attr) : 0;
+    LOCK(&dp->table_mutex);
     dp_netdev_output_userspace(dp, packet, DPIF_UC_ACTION, key, userdata);
+    UNLOCK(&dp->table_mutex);
 }
 
 static void
diff --git a/lib/fatal-signal.c b/lib/fatal-signal.c
index 21ebb5a..add0f80 100644
--- a/lib/fatal-signal.c
+++ b/lib/fatal-signal.c
@@ -192,7 +192,7 @@ call_hooks(int sig_nr)
         recurse = 1;
 
         for (i = 0; i < n_hooks; i++) {
-            struct hook *h = &hooks[i];
+            struct hook *h = &hooks[n_hooks - i - 1];
             if (sig_nr || h->run_at_exit) {
                 h->hook_cb(h->aux);
             }
diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
index f8b1188..e62e220 100644
--- a/lib/netdev-bsd.c
+++ b/lib/netdev-bsd.c
@@ -667,6 +667,89 @@ netdev_bsd_recv_wait(struct netdev *netdev_)
     }
 }
 
+#ifdef THREADED
+
+struct dispatch_arg {
+    pkt_handler h;
+    u_char *user;
+};
+
+static void
+dispatch_handler(u_char *user, const struct pcap_pkthdr *phdr, const u_char *pdata)
+{
+    struct ofpbuf buf;
+    struct dispatch_arg *parg = (struct dispatch_arg*)user;
+
+    ofpbuf_use_stub(&buf, (void*)pdata, phdr->caplen);
+    buf.size = phdr->caplen;
+    (*parg->h)(parg->user, &buf);
+    ofpbuf_uninit(&buf);
+}
+
+static int
+netdev_bsd_dispatch_system(struct netdev_bsd *netdev, int batch, pkt_handler h,
+                           u_char *user)
+{
+    int ret;
+    struct dispatch_arg arg;
+
+    arg.h = h;
+    arg.user = user;
+    ret = pcap_dispatch(netdev->pcap_handle, batch, dispatch_handler, (u_char*)&arg);
+    return ret;
+}
+
+static int
+netdev_bsd_dispatch_tap(struct netdev_bsd *netdev, int batch, pkt_handler h,
+                        u_char *user)
+{
+    int ret;
+    int i;
+    const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX;
+    OFPBUF_STACK_BUFFER(buf_, size);
+
+    struct ofpbuf buf;
+    ofpbuf_use_stub(&buf, buf_, size);
+    for (i = 0; i < batch; i++) {
+        ret = netdev_bsd_recv_tap(netdev, buf.data, ofpbuf_tailroom(&buf));
+        if (ret >= 0) {
+            buf.size += ret;
+            h(user, &buf);
+        } else if (ret != -EAGAIN) {
+            return -1;
+        } else { /* ret = EAGAIN */
+            break;
+        }
+        ofpbuf_clear(&buf);
+    }
+    ofpbuf_uninit(&buf);
+    return i;
+}
+
+static int
+netdev_bsd_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
+                    u_char *user)
+{
+    struct netdev_bsd *netdev = netdev_bsd_cast(netdev_);
+    struct netdev_dev_bsd * netdev_dev =
+        netdev_dev_bsd_cast(netdev_get_dev(netdev_));
+
+    if (!strcmp(netdev_get_type(netdev_), "tap") &&
+            netdev->netdev_fd == netdev_dev->tap_fd) {
+        return netdev_bsd_dispatch_tap(netdev, batch, h, user);
+    } else {
+        return netdev_bsd_dispatch_system(netdev, batch, h, user);
+    }
+}
+
+static int
+netdev_bsd_get_fd(struct netdev *netdev_)
+{
+    struct netdev_bsd *netdev = netdev_bsd_cast(netdev_);
+    return netdev->netdev_fd;
+}
+#endif
+
 /* Discards all packets waiting to be received from 'netdev'. */
 static int
 netdev_bsd_drain(struct netdev *netdev_)
@@ -1263,6 +1346,10 @@ const struct netdev_class netdev_bsd_class = {
 
     netdev_bsd_recv,
     netdev_bsd_recv_wait,
+#ifdef THREADED
+    netdev_bsd_dispatch,
+    netdev_bsd_get_fd,
+#endif
     netdev_bsd_drain,
 
     netdev_bsd_send,
@@ -1323,6 +1410,10 @@ const struct netdev_class netdev_tap_class = {
 
     netdev_bsd_recv,
     netdev_bsd_recv_wait,
+#ifdef THREADED
+    netdev_bsd_dispatch,
+    netdev_bsd_get_fd,
+#endif
     netdev_bsd_drain,
 
     netdev_bsd_send,
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index 6aa4084..e0d4448 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -20,6 +20,12 @@
 
 #include <errno.h>
 
+#ifdef THREADED
+#include <pthread.h>
+#include <unistd.h>
+#include "socket-util.h"
+#endif
+
 #include "flow.h"
 #include "list.h"
 #include "netdev-provider.h"
@@ -51,6 +57,10 @@ struct netdev_dummy {
     struct list node;           /* In netdev_dev_dummy's "devs" list. */
     struct list recv_queue;
     bool listening;
+#ifdef THREADED
+    pthread_mutex_t queue_mutex;
+    int s_pipe[2];              /* used to signal packet arrivals */
+#endif
 };
 
 static struct shash dummy_netdev_devs = SHASH_INITIALIZER(&dummy_netdev_devs);
@@ -146,6 +156,13 @@ netdev_dummy_close(struct netdev *netdev_)
     struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
     list_remove(&netdev->node);
     ofpbuf_list_delete(&netdev->recv_queue);
+#ifdef THREADED
+    if (netdev->listening) {
+        close(netdev->s_pipe[0]);
+        close(netdev->s_pipe[1]);
+        pthread_mutex_destroy(&netdev->queue_mutex);
+    }
+#endif
     free(netdev);
 }
 
@@ -153,6 +170,27 @@ static int
 netdev_dummy_listen(struct netdev *netdev_)
 {
     struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+#ifdef THREADED
+    int error;
+
+    if (netdev->listening)
+        return 0;
+
+    error = pipe(netdev->s_pipe);
+    if (error) {
+        VLOG_ERR("Unable to create dummy pipe: %s", strerror(errno));
+        return errno;
+    }
+    if (set_nonblocking(netdev->s_pipe[0]) ||
+        set_nonblocking(netdev->s_pipe[1])) {
+        VLOG_ERR("Unable to set nonblocking on dummy pipe: %s",
+                 strerror(errno));
+        close(netdev->s_pipe[0]);
+        close(netdev->s_pipe[1]);
+        return errno;
+    }
+    pthread_mutex_init(&netdev->queue_mutex, NULL);
+#endif
     netdev->listening = true;
     return 0;
 }
@@ -163,12 +201,29 @@ netdev_dummy_recv(struct netdev *netdev_, void *buffer, size_t size)
     struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
     struct ofpbuf *packet;
     size_t packet_size;
+#ifdef THREADED
+    char c;
+#endif
 
+#ifdef THREADED
+    pthread_mutex_lock(&netdev->queue_mutex);
+#endif
     if (list_is_empty(&netdev->recv_queue)) {
+#ifdef THREADED
+        pthread_mutex_unlock(&netdev->queue_mutex);
+#endif
         return -EAGAIN;
     }
+#ifdef THREADED
+    if (read(netdev->s_pipe[0], &c, 1) < 0) {
+        VLOG_ERR("Error reading dummy pipe: %s", strerror(errno));
+    }
+#endif
 
     packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue));
+#ifdef THREADED
+    pthread_mutex_unlock(&netdev->queue_mutex);
+#endif
     if (packet->size > size) {
         return -EMSGSIZE;
     }
@@ -184,11 +239,60 @@ static void
 netdev_dummy_recv_wait(struct netdev *netdev_)
 {
     struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
-    if (!list_is_empty(&netdev->recv_queue)) {
+    int empty;
+
+#ifdef THREADED
+    pthread_mutex_lock(&netdev->queue_mutex);
+#endif
+    empty = list_is_empty(&netdev->recv_queue);
+#ifdef THREADED
+    pthread_mutex_unlock(&netdev->queue_mutex);
+#endif
+    if (!empty) {
         poll_immediate_wake();
     }
 }
 
+#ifdef THREADED
+static int
+netdev_dummy_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
+                      u_char *user)
+{
+    int i;
+    struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+    struct ofpbuf *packet;
+    VLOG_DBG("dispatch %d", batch);
+    /* Returns the number of packets passed to 'h', or -1 on a read error. */
+    for (i = 0; i < batch; i++) {
+        char c;
+        if (read(netdev->s_pipe[0], &c, 1) < 0) {
+            if (errno == EAGAIN)
+                break;
+            VLOG_ERR("%s: error reading from the pipe: %s",
+                netdev_get_name(netdev_), strerror(errno));
+            return -1;
+        }
+        pthread_mutex_lock(&netdev->queue_mutex);
+        if (list_is_empty(&netdev->recv_queue)) {
+            pthread_mutex_unlock(&netdev->queue_mutex);
+            break;      /* Nothing queued: stop early, this is not an error. */
+        }
+        packet = ofpbuf_from_list(list_pop_front(&netdev->recv_queue));
+        pthread_mutex_unlock(&netdev->queue_mutex);
+        h(user, packet);
+        ofpbuf_delete(packet);
+    }
+    return i;
+}
+
+static int
+netdev_dummy_get_fd(struct netdev *netdev_)
+{
+    struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
+    return netdev->s_pipe[0];
+}
+#endif
+
 static int
 netdev_dummy_drain(struct netdev *netdev_)
 {
@@ -326,6 +430,10 @@ static const struct netdev_class dummy_class = {
     netdev_dummy_listen,
     netdev_dummy_recv,
     netdev_dummy_recv_wait,
+#ifdef THREADED
+    netdev_dummy_dispatch,      /* dispatch */
+    netdev_dummy_get_fd,        /* get_fd */
+#endif
     netdev_dummy_drain,
 
     NULL,                       /* send */
@@ -417,6 +525,9 @@ netdev_dummy_receive(struct unixctl_conn *conn,
     struct netdev_dev_dummy *dummy_dev;
     int n_listeners;
     int i;
+#ifdef THREADED
+    char c = 0;
+#endif
 
     dummy_dev = shash_find_data(&dummy_netdev_devs, argv[1]);
     if (!dummy_dev) {
@@ -424,6 +535,7 @@ netdev_dummy_receive(struct unixctl_conn *conn,
         return;
     }
 
+
     n_listeners = 0;
     for (i = 2; i < argc; i++) {
         struct netdev_dummy *dev;
@@ -439,7 +551,16 @@ netdev_dummy_receive(struct unixctl_conn *conn,
         LIST_FOR_EACH (dev, node, &dummy_dev->devs) {
             if (dev->listening) {
                 struct ofpbuf *copy = ofpbuf_clone(packet);
+#ifdef THREADED
+                pthread_mutex_lock(&dev->queue_mutex);
+#endif
                 list_push_back(&dev->recv_queue, &copy->list_node);
+#ifdef THREADED
+                pthread_mutex_unlock(&dev->queue_mutex);
+                if (write(dev->s_pipe[1], &c, 1) < 0) {
+                    VLOG_ERR("Error writing dummy pipe: %s", strerror(errno));
+                }
+#endif
                 n_listeners++;
             }
         }
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index 412a92d..7d25ce5 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -893,6 +893,55 @@ netdev_linux_recv_wait(struct netdev *netdev_)
     }
 }
 
+#ifdef THREADED
+/* Implements the 'dispatch' member of struct netdev_class for Linux devices.
+ *
+ * Receives up to 'batch' packets with netdev_linux_recv() into a single
+ * buffer declared with OFPBUF_STACK_BUFFER and passes each one to 'h' as
+ * h(user, &buf).  The buffer is cleared and reused after 'h' returns, and the
+ * batch ends early when netdev_linux_recv() returns -EAGAIN.
+ *
+ * Returns the number of packets passed to 'h', which may be 0, or -1 if
+ * netdev_linux_recv() fails with any other error. */
+static int
+netdev_linux_dispatch(struct netdev *netdev_, int batch, pkt_handler h,
+                      u_char *user)
+{
+    const size_t size = VLAN_HEADER_LEN + ETH_HEADER_LEN + ETH_PAYLOAD_MAX;
+    OFPBUF_STACK_BUFFER(buf_, size);
+    struct ofpbuf buf;
+    int error = 0;
+    int i;
+
+    VLOG_DBG("dispatch %d", batch);
+
+    ofpbuf_use_stub(&buf, buf_, size);
+    for (i = 0; i < batch; i++) {
+        int ret = netdev_linux_recv(netdev_, buf.data, ofpbuf_tailroom(&buf));
+
+        if (ret < 0) {
+            /* -EAGAIN ends the batch normally; anything else is an error. */
+            error = ret != -EAGAIN;
+            break;
+        }
+        buf.size += ret;
+        h(user, &buf);
+        ofpbuf_clear(&buf);
+    }
+
+    /* Single exit, so that ofpbuf_uninit() also runs when reception fails. */
+    ofpbuf_uninit(&buf);
+    return error ? -1 : i;
+}
+
+static int
+netdev_linux_get_fd(struct netdev *netdev_)
+{
+    /* 'get_fd' member of struct netdev_class: expose the device's own fd. */
+    return netdev_linux_cast(netdev_)->fd;
+}
+#endif
+
 /* Discards all packets waiting to be received from 'netdev'. */
 static int
 netdev_linux_drain(struct netdev *netdev_)
@@ -2383,6 +2420,12 @@ netdev_linux_change_seq(const struct netdev *netdev)
     return netdev_dev_linux_cast(netdev_get_dev(netdev))->change_seq;
 }
 
+#ifdef THREADED /* Initializers for the 'dispatch' and 'get_fd' members. */
+#	define THREADED_METHODS netdev_linux_dispatch, netdev_linux_get_fd,
+#else           /* Those members are not compiled in: expand to nothing. */
+#	define THREADED_METHODS
+#endif
+
 #define NETDEV_LINUX_CLASS(NAME, CREATE, GET_STATS, SET_STATS,  \
                            GET_FEATURES, GET_STATUS)            \
 {                                                               \
@@ -2403,6 +2446,7 @@ netdev_linux_change_seq(const struct netdev *netdev)
     netdev_linux_listen,                                        \
     netdev_linux_recv,                                          \
     netdev_linux_recv_wait,                                     \
+    THREADED_METHODS						\
     netdev_linux_drain,                                         \
                                                                 \
     netdev_linux_send,                                          \
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 94f60af..f0cdedf 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -25,6 +25,9 @@
 #include "list.h"
 #include "shash.h"
 #include "smap.h"
+#ifdef THREADED
+#include "dispatch.h"
+#endif
 
 #ifdef  __cplusplus
 extern "C" {
@@ -191,6 +194,22 @@ struct netdev_class {
      * implement packet reception through the 'recv' member function. */
     void (*recv_wait)(struct netdev *netdev);
 
+#ifdef THREADED
+    /* Attempts to receive up to 'batch' packets from 'netdev', passing each
+     * one to the 'handler' callback together with 'user'.  This function is
+     * used in the 'THREADED' version to speed up forwarding, since it allows
+     * packets to be processed directly in the netdev's memory.
+     *
+     * Returns the number of packets processed on success; this can be 0 if no
+     * packets are available to be read. Returns -1 if an error occurred.
+     */
+    int (*dispatch)(struct netdev *netdev, int batch, pkt_handler handler,
+                    u_char *user);
+
+    /* Returns the file descriptor of the device. */
+    int (*get_fd)(struct netdev *netdev);
+#endif
+
     /* Discards all packets waiting to be received from 'netdev'.
      *
      * May be null if not needed, such as for a network device that does not
diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
index d5c288f..7fcbebc 100644
--- a/lib/netdev-vport.c
+++ b/lib/netdev-vport.c
@@ -899,6 +899,13 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED,
     return 0;
 }
 
+
+#ifdef THREADED /* NULL 'dispatch' and 'get_fd' for VPORT_FUNCTIONS. */
+#	define THREADED_NULL NULL, NULL,
+#else
+#	define THREADED_NULL
+#endif
+
 #define VPORT_FUNCTIONS(GET_STATUS)                         \
     NULL,                                                   \
     netdev_vport_run,                                       \
@@ -915,6 +922,7 @@ unparse_patch_config(const char *name OVS_UNUSED, const char *type OVS_UNUSED,
     NULL,                       /* listen */                \
     NULL,                       /* recv */                  \
     NULL,                       /* recv_wait */             \
+    THREADED_NULL					    \
     NULL,                       /* drain */                 \
                                                             \
     netdev_vport_send,          /* send */                  \
diff --git a/lib/netdev.c b/lib/netdev.c
index 394d895..3359b4c 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -424,6 +424,28 @@ netdev_recv_wait(struct netdev *netdev)
     }
 }
 
+#ifdef THREADED
+/* Has 'netdev' receive up to 'batch' packets and hand them to 'h'. */
+int
+netdev_dispatch(struct netdev *netdev, int batch, pkt_handler h, u_char *user)
+{
+    const struct netdev_class *class = netdev_get_dev(netdev)->netdev_class;
+
+    /* A class without 'dispatch' processes no packets: report 0. */
+    return class->dispatch ? class->dispatch(netdev, batch, h, user) : 0;
+}
+
+/* Returns the fd of 'netdev', or -1 if its class does not provide one. */
+int
+netdev_get_fd(struct netdev *netdev)
+{
+    const struct netdev_class *class = netdev_get_dev(netdev)->netdev_class;
+
+    /* Not 0 as a fallback: 0 is itself a valid descriptor (stdin). */
+    return class->get_fd ? class->get_fd(netdev) : -1;
+}
+#endif
+
 /* Discards all packets waiting to be received from 'netdev'. */
 int
 netdev_drain(struct netdev *netdev)
diff --git a/lib/netdev.h b/lib/netdev.h
index d2cc8b5..f55a286 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -21,6 +21,9 @@
 #include <stddef.h>
 #include <stdint.h>
 #include "openvswitch/types.h"
+#ifdef THREADED
+#include "dispatch.h"
+#endif
 
 #ifdef  __cplusplus
 extern "C" {
@@ -107,6 +110,10 @@ int netdev_get_ifindex(const struct netdev *);
 int netdev_listen(struct netdev *);
 int netdev_recv(struct netdev *, struct ofpbuf *);
 void netdev_recv_wait(struct netdev *);
+#ifdef THREADED
+int netdev_dispatch(struct netdev *, int, pkt_handler, u_char *);
+int netdev_get_fd(struct netdev *);
+#endif
 int netdev_drain(struct netdev *);
 
 int netdev_send(struct netdev *, const struct ofpbuf *);
diff --git a/lib/vlog.c b/lib/vlog.c
index 0bd9bd1..affd09e 100644
--- a/lib/vlog.c
+++ b/lib/vlog.c
@@ -37,6 +37,9 @@
 #include "unixctl.h"
 #include "util.h"
 #include "worker.h"
+#ifdef THREADED
+#include <pthread.h>
+#endif
 
 VLOG_DEFINE_THIS_MODULE(vlog);
 
@@ -92,6 +95,10 @@ static int log_fd = -1;
 /* vlog initialized? */
 static bool vlog_inited;
 
+#ifdef THREADED
+static pthread_mutex_t vlog_mutex;
+#endif
+
 static void format_log_message(const struct vlog_module *, enum vlog_level,
                                enum vlog_facility, unsigned int msg_num,
                                const char *message, va_list, struct ds *)
@@ -492,6 +499,9 @@ vlog_init(void)
         return;
     }
     vlog_inited = true;
+#ifdef THREADED
+    pthread_mutex_init(&vlog_mutex, NULL);
+#endif
 
     /* openlog() is allowed to keep the pointer passed in, without making a
      * copy.  The daemonize code sometimes frees and replaces 'program_name',
@@ -707,6 +717,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level,
 
         ds_init(&s);
         ds_reserve(&s, 1024);
+#ifdef THREADED
+        pthread_mutex_lock(&vlog_mutex);
+#endif
         msg_num++;
 
         if (log_to_console) {
@@ -736,6 +749,9 @@ vlog_valist(const struct vlog_module *module, enum vlog_level level,
             vlog_write_file(&s);
         }
 
+#ifdef THREADED
+        pthread_mutex_unlock(&vlog_mutex);
+#endif
         ds_destroy(&s);
         errno = save_errno;
     }
diff --git a/m4/openvswitch.m4 b/m4/openvswitch.m4
index 939f296..f4cad2c 100644
--- a/m4/openvswitch.m4
+++ b/m4/openvswitch.m4
@@ -14,6 +14,25 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+dnl Checks for --enable-threaded and, if it is enabled, defines THREADED to 1.
+AC_DEFUN([OVS_CHECK_THREADED],
+  [AC_REQUIRE([AC_PROG_CC])
+   AC_ARG_ENABLE(
+     [threaded],
+     [AC_HELP_STRING([--enable-threaded],
+                     [Enable threaded version of userspace implementation])],
+     [case "${enableval}" in
+        (yes) threaded=true ;;
+        (no)  threaded=false ;;
+        (*) AC_MSG_ERROR([bad value ${enableval} for --enable-threaded]) ;;
+      esac],
+     [threaded=false])
+   if $threaded; then
+      AC_DEFINE([THREADED], [1],
+                [Define to 1 if the threaded version of userspace
+                implementation is enabled.])
+   fi])
+
 dnl Checks for --enable-coverage and updates CFLAGS and LDFLAGS appropriately.
 AC_DEFUN([OVS_CHECK_COVERAGE],
   [AC_REQUIRE([AC_PROG_CC])
-- 
1.7.9.5


--------------020206010503040608000104--


More information about the dev mailing list