[ovs-dev] [PATCH] ofproto-dpif-upcall: Remove the dispatcher thread.

Alex Wang alexw at nicira.com
Thu Feb 6 07:01:01 UTC 2014


This commit removes the 'dispatcher' thread by allowing the 'handler'
threads to read upcalls directly from the dpif.  Each vport in the
dpif now opens one netlink socket per handler and uses the 5-tuple
hash of the missed packet to choose which socket (and therefore which
handler) the upcall is sent to.

This patch also significantly simplifies the flow miss handling
code and brings a slight improvement to the flow setup rate.
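
The per-packet handler selection (ovs_vport_find_pid() in the kernel
and flow_hash_5tuple() in userspace, both in the patch below) boils
down to the sketch here; choose_upcall_pid() and its flattened
parameters are illustrative only, not code from the patch:

    /* Sketch only: XOR the 5-tuple fields into one word, hash it, and use
     * the result to index the per-vport array of per-handler Netlink socket
     * pids.  jhash_bytes() is the userspace helper from lib/jhash.h; the
     * kernel side uses jhash() the same way. */
    static uint32_t
    choose_upcall_pid(uint32_t nw_src, uint32_t nw_dst, uint8_t nw_proto,
                      uint16_t tp_src, uint16_t tp_dst,
                      const uint32_t *pids, uint32_t n_pids)
    {
        uint32_t hash = nw_src ^ nw_dst ^ nw_proto ^ tp_src ^ tp_dst;

        return n_pids ? pids[jhash_bytes(&hash, sizeof hash, 0) % n_pids] : 0;
    }

Since the hash depends only on the 5-tuple, all packets of a given
flow are delivered to the same handler thread.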

Signed-off-by: Alex Wang <alexw at nicira.com>

---
RFC->PATCH
- Use XOR to calculate the 5-tuple hash.  This fixes the flow setup
  performance variation issue.
- Replace the malloc of 'struct upcall *' in udpif_upcall_handler()
  with a local 'struct upcall' array (see the sketch below).
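
For reviewers, here is a rough sketch of the per-handler main loop
after this change.  The batch size, the UPCALL_BATCH name and minor
details are illustrative rather than the exact code, and it assumes
struct upcall keeps its 'dpif_upcall', 'upcall_buf' and 'upcall_stub'
members:

    /* Sketch of the post-patch handler loop. */
    #define UPCALL_BATCH 50             /* Hypothetical batch size. */

    static void *
    udpif_upcall_handler(void *arg)
    {
        struct handler *handler = arg;
        struct udpif *udpif = handler->udpif;

        while (!latch_is_set(&udpif->exit_latch)) {
            struct upcall upcalls[UPCALL_BATCH]; /* Local array, no malloc(). */
            size_t n_upcalls = 0;

            /* Read a batch of upcalls from this handler's own sockets. */
            while (n_upcalls < UPCALL_BATCH) {
                struct upcall *upcall = &upcalls[n_upcalls];

                ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
                                sizeof upcall->upcall_stub);
                if (dpif_recv(udpif->dpif, handler->handler_id,
                              &upcall->dpif_upcall, &upcall->upcall_buf)) {
                    break;              /* EAGAIN: nothing more for now. */
                }
                n_upcalls++;
            }

            if (n_upcalls) {
                handle_upcalls(handler, upcalls, n_upcalls);
            } else {
                dpif_recv_wait(udpif->dpif, handler->handler_id);
                latch_wait(&udpif->exit_latch);
                poll_block();
            }
        }
        return NULL;
    }

Each handler polls only its own epoll set and netlink sockets, so no
queueing or signaling between a dispatcher and the handlers is needed.
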
---
 datapath/datapath.c           |   22 +-
 datapath/vport.c              |  128 +++++++++++-
 datapath/vport.h              |   25 ++-
 include/linux/openvswitch.h   |    9 +-
 lib/dpif-linux.c              |  458 +++++++++++++++++++++++------------------
 lib/dpif-linux.h              |    3 +-
 lib/dpif-netdev.c             |    9 +-
 lib/dpif-provider.h           |   26 ++-
 lib/dpif.c                    |   38 ++--
 lib/dpif.h                    |   10 +-
 lib/flow.c                    |   18 ++
 lib/flow.h                    |    3 +-
 ofproto/ofproto-dpif-upcall.c |  265 ++++++------------------
 ofproto/ofproto-dpif-xlate.c  |    5 +-
 ofproto/ofproto-dpif.c        |    8 +-
 15 files changed, 560 insertions(+), 467 deletions(-)

diff --git a/datapath/datapath.c b/datapath/datapath.c
index 5f1b34c..3e08e02 100644
--- a/datapath/datapath.c
+++ b/datapath/datapath.c
@@ -242,7 +242,7 @@ void ovs_dp_process_received_packet(struct vport *p, struct sk_buff *skb)
 		upcall.cmd = OVS_PACKET_CMD_MISS;
 		upcall.key = &key;
 		upcall.userdata = NULL;
-		upcall.portid = p->upcall_portid;
+		upcall.portid = ovs_vport_find_pid(p, &key);
 		ovs_dp_upcall(dp, skb, &upcall);
 		consume_skb(skb);
 		stats_counter = &stats->n_missed;
@@ -1239,7 +1239,7 @@ static int ovs_dp_cmd_new(struct sk_buff *skb, struct genl_info *info)
 	parms.options = NULL;
 	parms.dp = dp;
 	parms.port_no = OVSP_LOCAL;
-	parms.upcall_portid = nla_get_u32(a[OVS_DP_ATTR_UPCALL_PID]);
+	parms.upcall_pids = a[OVS_DP_ATTR_UPCALL_PID];
 
 	ovs_dp_change(dp, a);
 
@@ -1457,7 +1457,7 @@ static const struct nla_policy vport_policy[OVS_VPORT_ATTR_MAX + 1] = {
 	[OVS_VPORT_ATTR_STATS] = { .len = sizeof(struct ovs_vport_stats) },
 	[OVS_VPORT_ATTR_PORT_NO] = { .type = NLA_U32 },
 	[OVS_VPORT_ATTR_TYPE] = { .type = NLA_U32 },
-	[OVS_VPORT_ATTR_UPCALL_PID] = { .type = NLA_U32 },
+	[OVS_VPORT_ATTR_UPCALL_PIDS] = { .type = NLA_UNSPEC },
 	[OVS_VPORT_ATTR_OPTIONS] = { .type = NLA_NESTED },
 };
 
@@ -1492,8 +1492,7 @@ static int ovs_vport_cmd_fill_info(struct vport *vport, struct sk_buff *skb,
 
 	if (nla_put_u32(skb, OVS_VPORT_ATTR_PORT_NO, vport->port_no) ||
 	    nla_put_u32(skb, OVS_VPORT_ATTR_TYPE, vport->ops->type) ||
-	    nla_put_string(skb, OVS_VPORT_ATTR_NAME, vport->ops->get_name(vport)) ||
-	    nla_put_u32(skb, OVS_VPORT_ATTR_UPCALL_PID, vport->upcall_portid))
+	    nla_put_string(skb, OVS_VPORT_ATTR_NAME, vport->ops->get_name(vport)))
 		goto nla_put_failure;
 
 	ovs_vport_get_stats(vport, &vport_stats);
@@ -1501,6 +1500,9 @@ static int ovs_vport_cmd_fill_info(struct vport *vport, struct sk_buff *skb,
 		    &vport_stats))
 		goto nla_put_failure;
 
+	if (ovs_vport_get_upcall_pids(vport, skb))
+		goto nla_put_failure;
+
 	err = ovs_vport_get_options(vport, skb);
 	if (err == -EMSGSIZE)
 		goto error;
@@ -1577,8 +1579,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb, struct genl_info *info)
 	int err;
 
 	err = -EINVAL;
-	if (!a[OVS_VPORT_ATTR_NAME] || !a[OVS_VPORT_ATTR_TYPE] ||
-	    !a[OVS_VPORT_ATTR_UPCALL_PID])
+	if (!a[OVS_VPORT_ATTR_NAME] || !a[OVS_VPORT_ATTR_TYPE])
 		goto exit;
 
 	ovs_lock();
@@ -1615,7 +1616,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb, struct genl_info *info)
 	parms.options = a[OVS_VPORT_ATTR_OPTIONS];
 	parms.dp = dp;
 	parms.port_no = port_no;
-	parms.upcall_portid = nla_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);
+	parms.upcall_pids = a[OVS_VPORT_ATTR_UPCALL_PIDS];
 
 	vport = new_vport(&parms);
 	err = PTR_ERR(vport);
@@ -1676,8 +1677,9 @@ static int ovs_vport_cmd_set(struct sk_buff *skb, struct genl_info *info)
 	if (a[OVS_VPORT_ATTR_STATS])
 		ovs_vport_set_stats(vport, nla_data(a[OVS_VPORT_ATTR_STATS]));
 
-	if (a[OVS_VPORT_ATTR_UPCALL_PID])
-		vport->upcall_portid = nla_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);
+	err = ovs_vport_set_upcall_pids(vport, a[OVS_VPORT_ATTR_UPCALL_PIDS]);
+	if (err)
+		goto exit_free;
 
 	err = ovs_vport_cmd_fill_info(vport, reply, info->snd_portid,
 				      info->snd_seq, 0, OVS_VPORT_CMD_NEW);
diff --git a/datapath/vport.c b/datapath/vport.c
index 7f12acc..3e83ba7 100644
--- a/datapath/vport.c
+++ b/datapath/vport.c
@@ -135,10 +135,12 @@ struct vport *ovs_vport_alloc(int priv_size, const struct vport_ops *ops,
 
 	vport->dp = parms->dp;
 	vport->port_no = parms->port_no;
-	vport->upcall_portid = parms->upcall_portid;
 	vport->ops = ops;
 	INIT_HLIST_NODE(&vport->dp_hash_node);
 
+	if (ovs_vport_set_upcall_pids(vport, parms->upcall_pids))
+		return ERR_PTR(-EINVAL);
+
 	vport->percpu_stats = alloc_percpu(struct pcpu_tstats);
 	if (!vport->percpu_stats) {
 		kfree(vport);
@@ -162,6 +164,7 @@ struct vport *ovs_vport_alloc(int priv_size, const struct vport_ops *ops,
  */
 void ovs_vport_free(struct vport *vport)
 {
+	ovs_vport_set_upcall_pids(vport, NULL);
 	free_percpu(vport->percpu_stats);
 	kfree(vport);
 }
@@ -348,6 +351,129 @@ int ovs_vport_get_options(const struct vport *vport, struct sk_buff *skb)
 	return 0;
 }
 
+static void __vport_pids_destroy(struct vport_pids *pids)
+{
+	if (pids->pids)
+		kfree(pids->pids);
+
+	kfree(pids);
+}
+
+static void vport_pids_destroy_rcu_cb(struct rcu_head *rcu)
+{
+	struct vport_pids *pids = container_of(rcu, struct vport_pids, rcu);
+
+	__vport_pids_destroy(pids);
+}
+
+/**
+ *	ovs_vport_set_upcall_pids - set upcall pids for sending upcall.
+ *
+ * @vport: vport to modify.
+ * @pids: new configuration.
+ *
+ * If the pids is non-null, sets the vport's upcall_pids pointer.  If the
+ * pids is null, frees the vport's upcall_pids.
+ *
+ * Returns 0 if successful, -EINVAL if @pids cannot be parsed as an array
+ * of U32.
+ */
+int ovs_vport_set_upcall_pids(struct vport *vport,  struct nlattr *pids)
+{
+	struct vport_pids *old;
+
+	if (pids && nla_len(pids) % sizeof(u32))
+		return -EINVAL;
+
+	rcu_read_lock();
+	old = vport->upcall_pids ? ovsl_dereference(vport->upcall_pids)
+		: NULL;
+
+	if (pids) {
+		struct vport_pids *vport_pids;
+
+		vport_pids = kmalloc(sizeof *vport_pids, GFP_KERNEL);
+		vport_pids->pids = kmalloc(nla_len(pids), GFP_KERNEL);
+		vport_pids->n_pids = nla_len(pids)
+			/ (sizeof *vport_pids->pids);
+		memcpy(vport_pids->pids, nla_data(pids), nla_len(pids));
+
+		rcu_assign_pointer(vport->upcall_pids, vport_pids);
+	} else if (old) {
+		rcu_assign_pointer(vport->upcall_pids, NULL);
+	}
+
+	if (old)
+		call_rcu(&old->rcu, vport_pids_destroy_rcu_cb);
+
+	rcu_read_unlock();
+	return 0;
+}
+
+/**
+ *	ovs_vport_get_upcall_pids - get the upcall_pids value.
+ *
+ * @vport: vport from which to retrieve the pids.
+ * @skb: sk_buff where pids should be appended.
+ *
+ * Retrieves the configuration of the given vport, appending the
+ * %OVS_VPORT_ATTR_UPCALL_PIDS attribute which is the array of upcall
+ * pids to @skb.
+ *
+ * Returns 0 if successful, -EMSGSIZE if @skb has insufficient room.
+ * If an error occurs, @skb is left unmodified.
+ */
+int ovs_vport_get_upcall_pids(const struct vport *vport, struct sk_buff *skb)
+{
+	struct vport_pids *pids;
+	int err = 0;
+
+	rcu_read_lock();
+	pids = ovsl_dereference(vport->upcall_pids);
+
+	if (!pids)
+		goto exit;
+
+	if (nla_put(skb, OVS_VPORT_ATTR_UPCALL_PIDS,
+		    pids->n_pids * sizeof *pids->pids,
+		    (void *) pids->pids)) {
+		err = -EMSGSIZE;
+		goto exit;
+	}
+
+exit:
+	rcu_read_unlock();
+	return err;
+}
+
+/**
+ *	ovs_vport_find_pid - find the upcall pid to send upcall.
+ *
+ * @vport: vport from which the missed packet is received.
+ * @key: flow keys.
+ *
+ * Calculates the 5-tuple hash from the flow key and finds the upcall pid to
+ * send the upcall to.
+ *
+ * Returns the pid of the target socket.  Must be called with rcu_read_lock.
+ */
+u32 ovs_vport_find_pid(const struct vport *p, const struct sw_flow_key *key)
+{
+	struct vport_pids *pids;
+	u32 hash;
+
+	pids = ovsl_dereference(p->upcall_pids);
+
+	if (!pids)
+		return 0;
+
+	hash = key->ipv4.addr.src ^ key->ipv4.addr.dst
+		^ key->ip.proto ^ key->ipv4.tp.src
+		^ key->ipv4.tp.dst;
+
+	return pids->pids[jhash((void *) &hash, 4, 0) % pids->n_pids];
+}
+
 /**
  *	ovs_vport_receive - pass up received packet to the datapath for processing
  *
diff --git a/datapath/vport.h b/datapath/vport.h
index 18b723e..f11faa9 100644
--- a/datapath/vport.h
+++ b/datapath/vport.h
@@ -50,6 +50,11 @@ void ovs_vport_get_stats(struct vport *, struct ovs_vport_stats *);
 int ovs_vport_set_options(struct vport *, struct nlattr *options);
 int ovs_vport_get_options(const struct vport *, struct sk_buff *);
 
+int ovs_vport_set_upcall_pids(struct vport *, struct nlattr *pids);
+int ovs_vport_get_upcall_pids(const struct vport *, struct sk_buff *);
+
+u32 ovs_vport_find_pid(const struct vport *, const struct sw_flow_key *);
+
 int ovs_vport_send(struct vport *, struct sk_buff *);
 
 /* The following definitions are for implementers of vport devices: */
@@ -60,13 +65,25 @@ struct vport_err_stats {
 	u64 tx_dropped;
 	u64 tx_errors;
 };
+/**
+ * struct vport_pids - array of netlink pids for a vport.
+ *                     must be protected by rcu.
+ * @rcu: RCU callback head for deferred destruction.
+ * @n_pids: Size of @pids array.
+ * @pids: Array storing the Netlink socket pids to use for packets received
+ * on this port that miss the flow table.
+ */
+struct vport_pids {
+	struct rcu_head rcu;
+	u32 n_pids;
+	u32 *pids;
+};
 
 /**
  * struct vport - one port within a datapath
  * @rcu: RCU callback head for deferred destruction.
  * @dp: Datapath to which this port belongs.
- * @upcall_portid: The Netlink port to use for packets received on this port that
- * miss the flow table.
+ * @upcall_pids: RCU protected vport_pids array.
  * @port_no: Index into @dp's @ports array.
  * @hash_node: Element in @dev_table hash table in vport.c.
  * @dp_hash_node: Element in @datapath->ports hash table in datapath.c.
@@ -80,7 +97,7 @@ struct vport_err_stats {
 struct vport {
 	struct rcu_head rcu;
 	struct datapath	*dp;
-	u32 upcall_portid;
+	struct vport_pids __rcu *upcall_pids;
 	u16 port_no;
 
 	struct hlist_node hash_node;
@@ -112,7 +129,7 @@ struct vport_parms {
 	/* For ovs_vport_alloc(). */
 	struct datapath *dp;
 	u16 port_no;
-	u32 upcall_portid;
+	struct nlattr *upcall_pids;
 };
 
 /**
diff --git a/include/linux/openvswitch.h b/include/linux/openvswitch.h
index 5137c2f..e1fe92d 100644
--- a/include/linux/openvswitch.h
+++ b/include/linux/openvswitch.h
@@ -225,9 +225,9 @@ enum ovs_vport_type {
  * this is the name of the network device.  Maximum length %IFNAMSIZ-1 bytes
  * plus a null terminator.
  * @OVS_VPORT_ATTR_OPTIONS: Vport-specific configuration information.
- * @OVS_VPORT_ATTR_UPCALL_PID: The Netlink socket in userspace that
- * OVS_PACKET_CMD_MISS upcalls will be directed to for packets received on
- * this port.  A value of zero indicates that upcalls should not be sent.
+ * @OVS_VPORT_ATTR_UPCALL_PIDS: The array of Netlink socket pids in userspace
+ * that OVS_PACKET_CMD_MISS upcalls will be directed to for packets received on
+ * this port.  If this is not specified, upcalls should not be sent.
  * @OVS_VPORT_ATTR_STATS: A &struct ovs_vport_stats giving statistics for
  * packets sent or received through the vport.
  *
@@ -251,7 +251,8 @@ enum ovs_vport_attr {
 	OVS_VPORT_ATTR_TYPE,	/* u32 OVS_VPORT_TYPE_* constant. */
 	OVS_VPORT_ATTR_NAME,	/* string name, up to IFNAMSIZ bytes long */
 	OVS_VPORT_ATTR_OPTIONS, /* nested attributes, varies by vport type */
-	OVS_VPORT_ATTR_UPCALL_PID, /* u32 Netlink PID to receive upcalls */
+	OVS_VPORT_ATTR_UPCALL_PIDS, /* array of u32 Netlink socket PIDs for */
+				/* receiving upcalls */
 	OVS_VPORT_ATTR_STATS,	/* struct ovs_vport_stats */
 	__OVS_VPORT_ATTR_MAX
 };
diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
index f7f5292..86e0e97 100644
--- a/lib/dpif-linux.c
+++ b/lib/dpif-linux.c
@@ -35,6 +35,7 @@
 #include "bitmap.h"
 #include "dpif-provider.h"
 #include "dynamic-string.h"
+#include "fat-rwlock.h"
 #include "flow.h"
 #include "netdev.h"
 #include "netdev-linux.h"
@@ -132,7 +133,15 @@ struct dpif_channel {
     long long int last_poll;    /* Last time this channel was polled. */
 };
 
-static void report_loss(struct dpif *, struct dpif_channel *);
+static void report_loss(struct dpif *, struct dpif_channel *, uint32_t ch_idx,
+                        uint32_t handler_id);
+
+struct dpif_epoll {
+    struct epoll_event *epoll_events;
+    int epoll_fd;               /* epoll fd that includes channel socks. */
+    int n_events;               /* Num events returned by epoll_wait(). */
+    int event_offset;           /* Offset into 'epoll_events'. */
+};
 
 /* Datapath interface for the openvswitch Linux kernel module. */
 struct dpif_linux {
@@ -140,13 +149,11 @@ struct dpif_linux {
     int dp_ifindex;
 
     /* Upcall messages. */
-    struct ovs_mutex upcall_lock;
+    struct fat_rwlock upcall_lock;
     int uc_array_size;          /* Size of 'channels' and 'epoll_events'. */
-    struct dpif_channel *channels;
-    struct epoll_event *epoll_events;
-    int epoll_fd;               /* epoll fd that includes channel socks. */
-    int n_events;               /* Num events returned by epoll_wait(). */
-    int event_offset;           /* Offset into 'epoll_events'. */
+    struct dpif_epoll *epolls;
+    struct dpif_channel **channels; /* Array of channel arrays per vport. */
+    uint32_t n_handlers;           /* Num of upcall receivers (handlers). */
 
     /* Change notification. */
     struct nl_sock *port_notifier; /* vport multicast group subscriber. */
@@ -171,8 +178,8 @@ static unsigned int ovs_vport_mcgroup;
 static int dpif_linux_init(void);
 static int open_dpif(const struct dpif_linux_dp *, struct dpif **);
 static uint32_t dpif_linux_port_get_pid(const struct dpif *,
-                                        odp_port_t port_no);
-static int dpif_linux_refresh_channels(struct dpif *);
+                                        odp_port_t port_no, uint32_t hash);
+static int dpif_linux_refresh_channels(struct dpif *, uint32_t n_handlers);
 
 static void dpif_linux_vport_to_ofpbuf(const struct dpif_linux_vport *,
                                        struct ofpbuf *);
@@ -252,8 +259,7 @@ open_dpif(const struct dpif_linux_dp *dp, struct dpif **dpifp)
 
     dpif = xzalloc(sizeof *dpif);
     dpif->port_notifier = NULL;
-    ovs_mutex_init(&dpif->upcall_lock);
-    dpif->epoll_fd = -1;
+    fat_rwlock_init(&dpif->upcall_lock);
 
     dpif_init(&dpif->dpif, &dpif_linux_class, dp->name,
               dp->dp_ifindex, dp->dp_ifindex);
@@ -265,63 +271,43 @@ open_dpif(const struct dpif_linux_dp *dp, struct dpif **dpifp)
 }
 
 static void
-destroy_channels(struct dpif_linux *dpif)
+channels_to_pids(struct dpif_channel *ch, uint32_t len, uint32_t **upcall_pids)
 {
-    unsigned int i;
+    size_t i;
+    uint32_t *pids;
 
-    if (dpif->epoll_fd < 0) {
+    if (!upcall_pids) {
         return;
     }
 
-    for (i = 0; i < dpif->uc_array_size; i++ ) {
-        struct dpif_linux_vport vport_request;
-        struct dpif_channel *ch = &dpif->channels[i];
-        uint32_t upcall_pid = 0;
-
-        if (!ch->sock) {
-            continue;
-        }
-
-        epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock), NULL);
-
-        /* Turn off upcalls. */
-        dpif_linux_vport_init(&vport_request);
-        vport_request.cmd = OVS_VPORT_CMD_SET;
-        vport_request.dp_ifindex = dpif->dp_ifindex;
-        vport_request.port_no = u32_to_odp(i);
-        vport_request.upcall_pid = &upcall_pid;
-        dpif_linux_vport_transact(&vport_request, NULL, NULL);
-
-        nl_sock_destroy(ch->sock);
+    pids = xmalloc(len * sizeof *pids);
+    for (i = 0; i < len; i++) {
+        pids[i] = nl_sock_pid(ch[i].sock);
     }
-
-    free(dpif->channels);
-    dpif->channels = NULL;
-    dpif->uc_array_size = 0;
-
-    free(dpif->epoll_events);
-    dpif->epoll_events = NULL;
-    dpif->n_events = dpif->event_offset = 0;
-
-    /* Don't close dpif->epoll_fd since that would cause other threads that
-     * call dpif_recv_wait(dpif) to wait on an arbitrary fd or a closed fd. */
+    *upcall_pids = pids;
 }
 
+/* Adds 'dpif->n_handlers' channels to vport.  If upcall_pids is non-NULL,
+ * makes it point to the array of channel socket pids. */
 static int
-add_channel(struct dpif_linux *dpif, odp_port_t port_no, struct nl_sock *sock)
+add_vport_channels(struct dpif_linux *dpif, odp_port_t port_no,
+                   uint32_t **upcall_pids)
 {
     struct epoll_event event;
+    struct dpif_epoll *epolls = dpif->epolls;
     uint32_t port_idx = odp_to_u32(port_no);
+    uint32_t *pids = NULL;
+    int error = 0;
+    size_t i, j;
 
-    if (dpif->epoll_fd < 0) {
+    if (epolls == NULL) {
         return 0;
     }
 
     /* We assume that the datapath densely chooses port numbers, which
-     * can therefore be used as an index into an array of channels. */
+     * can therefore be used as an index into dpif->channels. */
     if (port_idx >= dpif->uc_array_size) {
         uint32_t new_size = port_idx + 1;
-        uint32_t i;
 
         if (new_size > MAX_PORTS) {
             VLOG_WARN_RL(&error_rl, "%s: datapath port %"PRIu32" too big",
@@ -332,49 +318,117 @@ add_channel(struct dpif_linux *dpif, odp_port_t port_no, struct nl_sock *sock)
         dpif->channels = xrealloc(dpif->channels,
                                   new_size * sizeof *dpif->channels);
         for (i = dpif->uc_array_size; i < new_size; i++) {
-            dpif->channels[i].sock = NULL;
+            dpif->channels[i] = NULL;
+        }
+        for (i = 0; i < dpif->n_handlers; i++) {
+            epolls[i].epoll_events = xrealloc(epolls[i].epoll_events,
+                                              new_size * sizeof
+                                              *epolls[i].epoll_events);
         }
-
-        dpif->epoll_events = xrealloc(dpif->epoll_events,
-                                      new_size * sizeof *dpif->epoll_events);
         dpif->uc_array_size = new_size;
     }
 
     memset(&event, 0, sizeof event);
     event.events = EPOLLIN;
     event.data.u32 = port_idx;
-    if (epoll_ctl(dpif->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
-                  &event) < 0) {
-        return errno;
-    }
 
-    nl_sock_destroy(dpif->channels[port_idx].sock);
-    dpif->channels[port_idx].sock = sock;
-    dpif->channels[port_idx].last_poll = LLONG_MIN;
+    /* Creates channel for each upcall handler. */
+    dpif->channels[port_idx] = xzalloc(dpif->n_handlers
+                                       * sizeof *dpif->channels[port_idx]);
+    for (i = 0; i < dpif->n_handlers; i++) {
+        struct nl_sock *sock = NULL;
+
+        error = nl_sock_create(NETLINK_GENERIC, &sock);
+        if (error) {
+            goto error;
+        }
+
+        if (epoll_ctl(epolls[i].epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
+                      &event) < 0) {
+            error = errno;
+            goto error;
+        }
+        dpif->channels[port_idx][i].sock = sock;
+        dpif->channels[port_idx][i].last_poll = LLONG_MIN;
+    }
+    channels_to_pids(dpif->channels[port_idx], dpif->n_handlers, upcall_pids);
 
     return 0;
+
+error:
+    /* Cleans up the already created channels and socks. */
+    for (j = 0; j < i; j++) {
+        nl_sock_destroy(dpif->channels[port_idx][j].sock);
+    }
+    free(pids);
+    free(dpif->channels[port_idx]);
+    dpif->channels[port_idx] = NULL;
+
+    return error;
 }
 
 static void
-del_channel(struct dpif_linux *dpif, odp_port_t port_no)
+del_vport_channels(struct dpif_linux *dpif, odp_port_t port_no)
 {
     struct dpif_channel *ch;
     uint32_t port_idx = odp_to_u32(port_no);
+    size_t i;
+
+    if (!dpif->epolls || port_idx >= dpif->uc_array_size) {
+        return;
+    }
 
-    if (dpif->epoll_fd < 0 || port_idx >= dpif->uc_array_size) {
+    ch = dpif->channels[port_idx];
+    if (!ch) {
         return;
     }
 
-    ch = &dpif->channels[port_idx];
-    if (!ch->sock) {
+    for (i = 0; i < dpif->n_handlers; i++) {
+        epoll_ctl(dpif->epolls[i].epoll_fd, EPOLL_CTL_DEL,
+                  nl_sock_fd(ch[i].sock), NULL);
+        nl_sock_destroy(ch[i].sock);
+        dpif->epolls[i].event_offset = dpif->epolls[i].n_events = 0;
+    }
+    free(ch);
+    dpif->channels[port_idx] = NULL;
+}
+
+static void
+destroy_all_channels(struct dpif_linux *dpif)
+{
+    unsigned int i;
+
+    if (!dpif->epolls) {
         return;
     }
 
-    epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock), NULL);
-    dpif->event_offset = dpif->n_events = 0;
+    for (i = 0; i < dpif->uc_array_size; i++ ) {
+        struct dpif_linux_vport vport_request;
+        struct dpif_channel *ch = dpif->channels[i];
+
+        if (!ch) {
+            continue;
+        }
+
+        /* Turn off upcalls. */
+        dpif_linux_vport_init(&vport_request);
+        vport_request.cmd = OVS_VPORT_CMD_SET;
+        vport_request.dp_ifindex = dpif->dp_ifindex;
+        vport_request.port_no = u32_to_odp(i);
+        vport_request.upcall_pids = NULL;
+        dpif_linux_vport_transact(&vport_request, NULL, NULL);
+
+        del_vport_channels(dpif, u32_to_odp(i));
+    }
+
+    free(dpif->channels);
+    dpif->channels = NULL;
+    dpif->uc_array_size = 0;
+
+    free(dpif->epolls);
 
-    nl_sock_destroy(ch->sock);
-    ch->sock = NULL;
+    /* Don't close the epoll fds since that would cause other threads that
+     * call dpif_recv_wait(dpif) to wait on an arbitrary fd or a closed fd. */
 }
 
 static void
@@ -383,11 +437,8 @@ dpif_linux_close(struct dpif *dpif_)
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
 
     nl_sock_destroy(dpif->port_notifier);
-    destroy_channels(dpif);
-    if (dpif->epoll_fd >= 0) {
-        close(dpif->epoll_fd);
-    }
-    ovs_mutex_destroy(&dpif->upcall_lock);
+    destroy_all_channels(dpif);
+    fat_rwlock_destroy(&dpif->upcall_lock);
     free(dpif);
 }
 
@@ -409,7 +460,7 @@ dpif_linux_run(struct dpif *dpif_)
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     if (dpif->refresh_channels) {
         dpif->refresh_channels = false;
-        dpif_linux_refresh_channels(dpif_);
+        dpif_linux_refresh_channels(dpif_, dpif->n_handlers);
     }
 }
 
@@ -500,20 +551,12 @@ dpif_linux_port_add__(struct dpif *dpif_, struct netdev *netdev,
                                                   namebuf, sizeof namebuf);
     const char *type = netdev_get_type(netdev);
     struct dpif_linux_vport request, reply;
-    struct nl_sock *sock = NULL;
-    uint32_t upcall_pid;
     struct ofpbuf *buf;
     uint64_t options_stub[64 / 8];
     struct ofpbuf options;
+    uint32_t *upcall_pids = NULL;
     int error;
 
-    if (dpif->epoll_fd >= 0) {
-        error = nl_sock_create(NETLINK_GENERIC, &sock);
-        if (error) {
-            return error;
-        }
-    }
-
     dpif_linux_vport_init(&request);
     request.cmd = OVS_VPORT_CMD_NEW;
     request.dp_ifindex = dpif->dp_ifindex;
@@ -522,7 +565,6 @@ dpif_linux_port_add__(struct dpif *dpif_, struct netdev *netdev,
         VLOG_WARN_RL(&error_rl, "%s: cannot create port `%s' because it has "
                      "unsupported type `%s'",
                      dpif_name(dpif_), name, type);
-        nl_sock_destroy(sock);
         return EINVAL;
     }
     request.name = name;
@@ -541,41 +583,42 @@ dpif_linux_port_add__(struct dpif *dpif_, struct netdev *netdev,
     }
 
     request.port_no = *port_nop;
-    upcall_pid = sock ? nl_sock_pid(sock) : 0;
-    request.upcall_pid = &upcall_pid;
+    request.upcall_pids = NULL;
 
     error = dpif_linux_vport_transact(&request, &reply, &buf);
     if (!error) {
         *port_nop = reply.port_no;
-        VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid %"PRIu32,
-                 dpif_name(dpif_), reply.port_no, upcall_pid);
     } else {
         if (error == EBUSY && *port_nop != ODPP_NONE) {
             VLOG_INFO("%s: requested port %"PRIu32" is in use",
                       dpif_name(dpif_), *port_nop);
         }
-        nl_sock_destroy(sock);
         ofpbuf_delete(buf);
         return error;
     }
     ofpbuf_delete(buf);
 
-    if (sock) {
-        error = add_channel(dpif, *port_nop, sock);
-        if (error) {
-            VLOG_INFO("%s: could not add channel for port %s",
-                      dpif_name(dpif_), name);
+    if (add_vport_channels(dpif, *port_nop, &upcall_pids)) {
+        VLOG_INFO("%s: could not add channel for port %s",
+                  dpif_name(dpif_), name);
 
-            /* Delete the port. */
-            dpif_linux_vport_init(&request);
-            request.cmd = OVS_VPORT_CMD_DEL;
-            request.dp_ifindex = dpif->dp_ifindex;
-            request.port_no = *port_nop;
-            dpif_linux_vport_transact(&request, NULL, NULL);
+        /* Delete the port. */
+        dpif_linux_vport_init(&request);
+        request.cmd = OVS_VPORT_CMD_DEL;
+        request.dp_ifindex = dpif->dp_ifindex;
+        request.port_no = *port_nop;
+        dpif_linux_vport_transact(&request, NULL, NULL);
 
-            nl_sock_destroy(sock);
-            return error;
-        }
+        return error;
+    } else {
+        dpif_linux_vport_init(&request);
+        request.cmd = OVS_VPORT_CMD_SET;
+        request.dp_ifindex = dpif->dp_ifindex;
+        request.port_no = *port_nop;
+        request.n_pids = dpif->n_handlers;
+        request.upcall_pids = upcall_pids;
+        dpif_linux_vport_transact(&request, NULL, NULL);
+        free(upcall_pids);
     }
 
     return 0;
@@ -588,9 +631,9 @@ dpif_linux_port_add(struct dpif *dpif_, struct netdev *netdev,
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     int error;
 
-    ovs_mutex_lock(&dpif->upcall_lock);
+    fat_rwlock_wrlock(&dpif->upcall_lock);
     error = dpif_linux_port_add__(dpif_, netdev, port_nop);
-    ovs_mutex_unlock(&dpif->upcall_lock);
+    fat_rwlock_unlock(&dpif->upcall_lock);
 
     return error;
 }
@@ -608,7 +651,7 @@ dpif_linux_port_del__(struct dpif *dpif_, odp_port_t port_no)
     vport.port_no = port_no;
     error = dpif_linux_vport_transact(&vport, NULL, NULL);
 
-    del_channel(dpif, port_no);
+    del_vport_channels(dpif, port_no);
 
     return error;
 }
@@ -619,9 +662,9 @@ dpif_linux_port_del(struct dpif *dpif_, odp_port_t port_no)
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     int error;
 
-    ovs_mutex_lock(&dpif->upcall_lock);
+    fat_rwlock_wrlock(&dpif->upcall_lock);
     error = dpif_linux_port_del__(dpif_, port_no);
-    ovs_mutex_unlock(&dpif->upcall_lock);
+    fat_rwlock_unlock(&dpif->upcall_lock);
 
     return error;
 }
@@ -672,21 +715,26 @@ dpif_linux_port_query_by_name(const struct dpif *dpif, const char *devname,
 }
 
 static uint32_t
-dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no)
+dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no,
+                        uint32_t hash)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     uint32_t port_idx = odp_to_u32(port_no);
     uint32_t pid = 0;
 
-    ovs_mutex_lock(&dpif->upcall_lock);
-    if (dpif->epoll_fd >= 0) {
+    fat_rwlock_wrlock(&dpif->upcall_lock);
+    if (dpif->epolls) {
         /* The ODPP_NONE "reserved" port number uses the "ovs-system"'s
          * channel, since it is not heavily loaded. */
         uint32_t idx = port_idx >= dpif->uc_array_size ? 0 : port_idx;
-        const struct nl_sock *sock = dpif->channels[idx].sock;
-        pid = sock ? nl_sock_pid(sock) : 0;
+        const struct dpif_channel *ch = dpif->channels[idx];
+
+        if (ch) {
+            pid = ch[hash % dpif->n_handlers].sock
+                  ? nl_sock_pid(ch[hash % dpif->n_handlers].sock) : 0;
+        }
     }
-    ovs_mutex_unlock(&dpif->upcall_lock);
+    fat_rwlock_unlock(&dpif->upcall_lock);
 
     return pid;
 }
@@ -1274,10 +1322,10 @@ dpif_linux_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
 }
 
 /* Synchronizes 'dpif->channels' with the set of vports currently in 'dpif' in
- * the kernel, by adding a new channel for any kernel vport that lacks one and
- * deleting any channels that have no backing kernel vports. */
+ * the kernel, by adding a new set of channels for any kernel vport that lacks
+ * one and deleting any channels that have no backing kernel vports. */
 static int
-dpif_linux_refresh_channels(struct dpif *dpif_)
+dpif_linux_refresh_channels(struct dpif *dpif_, uint32_t n_handlers)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     unsigned long int *keep_channels;
@@ -1287,52 +1335,63 @@ dpif_linux_refresh_channels(struct dpif *dpif_)
     int retval = 0;
     size_t i;
 
-    /* To start with, we need an epoll fd. */
-    if (dpif->epoll_fd < 0) {
-        dpif->epoll_fd = epoll_create(10);
-        if (dpif->epoll_fd < 0) {
-            return errno;
+    if (dpif->n_handlers != n_handlers) {
+        destroy_all_channels(dpif);
+        dpif->epolls = xzalloc(n_handlers * sizeof *dpif->epolls);
+
+        for (i = 0; i < n_handlers; i++) {
+            dpif->epolls[i].epoll_fd = epoll_create(dpif->uc_array_size ?
+                                                    dpif->uc_array_size : 10);
+            if (dpif->epolls[i].epoll_fd < 0) {
+                return errno;
+            }
         }
+        dpif->n_handlers = n_handlers;
+    }
+
+    for (i = 0; i < n_handlers; i++) {
+        dpif->epolls[i].event_offset = dpif->epolls[i].n_events = 0;
     }
 
     keep_channels_nbits = dpif->uc_array_size;
     keep_channels = bitmap_allocate(keep_channels_nbits);
 
-    dpif->n_events = dpif->event_offset = 0;
-
     dpif_linux_port_dump_start__(dpif_, &dump);
     while (!dpif_linux_port_dump_next__(dpif_, &dump, &vport)) {
         uint32_t port_no = odp_to_u32(vport.port_no);
-        struct nl_sock *sock = (port_no < dpif->uc_array_size
-                                ? dpif->channels[port_no].sock
-                                : NULL);
-        bool new_sock = !sock;
+        struct dpif_channel *ch = (port_no < dpif->uc_array_size
+                                   ? dpif->channels[port_no]
+                                   : NULL);
+        uint32_t *upcall_pids = NULL;
         int error;
 
-        if (new_sock) {
-            error = nl_sock_create(NETLINK_GENERIC, &sock);
+        if (!ch) {
+            error = add_vport_channels(dpif, vport.port_no, &upcall_pids);
             if (error) {
+                VLOG_INFO("%s: could not add channels for port %s",
+                          dpif_name(dpif_), vport.name);
                 retval = error;
                 goto error;
             }
+        } else {
+            channels_to_pids(ch, dpif->n_handlers, &upcall_pids);
         }
 
         /* Configure the vport to deliver misses to 'sock'. */
-        if (!vport.upcall_pid || *vport.upcall_pid != nl_sock_pid(sock)) {
-            uint32_t upcall_pid = nl_sock_pid(sock);
+        if (!vport.upcall_pids
+            || vport.n_pids != dpif->n_handlers
+            || memcmp(upcall_pids, vport.upcall_pids, n_handlers * sizeof
+                      *upcall_pids)) {
             struct dpif_linux_vport vport_request;
 
             dpif_linux_vport_init(&vport_request);
             vport_request.cmd = OVS_VPORT_CMD_SET;
             vport_request.dp_ifindex = dpif->dp_ifindex;
             vport_request.port_no = vport.port_no;
-            vport_request.upcall_pid = &upcall_pid;
+            vport_request.n_pids = dpif->n_handlers;
+            vport_request.upcall_pids = upcall_pids;
             error = dpif_linux_vport_transact(&vport_request, NULL, NULL);
-            if (!error) {
-                VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid %"PRIu32,
-                         dpif_name(&dpif->dpif), vport_request.port_no,
-                         upcall_pid);
-            } else {
+            if (error) {
                 VLOG_WARN_RL(&error_rl,
                              "%s: failed to set upcall pid on port: %s",
                              dpif_name(&dpif->dpif), ovs_strerror(error));
@@ -1348,31 +1407,22 @@ dpif_linux_refresh_channels(struct dpif *dpif_)
             }
         }
 
-        if (new_sock) {
-            error = add_channel(dpif, vport.port_no, sock);
-            if (error) {
-                VLOG_INFO("%s: could not add channel for port %s",
-                          dpif_name(dpif_), vport.name);
-                retval = error;
-                goto error;
-            }
-        }
-
         if (port_no < keep_channels_nbits) {
             bitmap_set1(keep_channels, port_no);
         }
         continue;
 
     error:
-        nl_sock_destroy(sock);
+        del_vport_channels(dpif, vport.port_no);
     }
     nl_dump_done(&dump);
 
     /* Discard any saved channels that we didn't reuse. */
     for (i = 0; i < keep_channels_nbits; i++) {
         if (!bitmap_is_set(keep_channels, i)) {
-            nl_sock_destroy(dpif->channels[i].sock);
-            dpif->channels[i].sock = NULL;
+            del_vport_channels(dpif, u32_to_odp(i));
+            free(dpif->channels[i]);
+            dpif->channels[i] = NULL;
         }
     }
     free(keep_channels);
@@ -1381,29 +1431,32 @@ dpif_linux_refresh_channels(struct dpif *dpif_)
 }
 
 static int
-dpif_linux_recv_set__(struct dpif *dpif_, bool enable)
+dpif_linux_recv_set__(struct dpif *dpif_, bool enable, uint32_t n_handlers)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
 
-    if ((dpif->epoll_fd >= 0) == enable) {
+    if ((dpif->epolls != NULL) == enable) {
+        if (enable && dpif->n_handlers != n_handlers) {
+            dpif_linux_refresh_channels(dpif_, n_handlers);
+        }
         return 0;
     } else if (!enable) {
-        destroy_channels(dpif);
+        destroy_all_channels(dpif);
         return 0;
     } else {
-        return dpif_linux_refresh_channels(dpif_);
+        return dpif_linux_refresh_channels(dpif_, n_handlers);
     }
 }
 
 static int
-dpif_linux_recv_set(struct dpif *dpif_, bool enable)
+dpif_linux_recv_set(struct dpif *dpif_, bool enable, uint32_t n_handlers)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     int error;
 
-    ovs_mutex_lock(&dpif->upcall_lock);
-    error = dpif_linux_recv_set__(dpif_, enable);
-    ovs_mutex_unlock(&dpif->upcall_lock);
+    fat_rwlock_wrlock(&dpif->upcall_lock);
+    error = dpif_linux_recv_set__(dpif_, enable, n_handlers);
+    fat_rwlock_unlock(&dpif->upcall_lock);
 
     return error;
 }
@@ -1482,38 +1535,39 @@ parse_odp_packet(struct ofpbuf *buf, struct dpif_upcall *upcall,
 }
 
 static int
-dpif_linux_recv__(struct dpif *dpif_, struct dpif_upcall *upcall,
-                  struct ofpbuf *buf)
+dpif_linux_recv__(struct dpif *dpif_, uint32_t handler_id,
+                  struct dpif_upcall *upcall, struct ofpbuf *buf)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
+    struct dpif_epoll *epoll = dpif->epolls ? &dpif->epolls[handler_id] : NULL;
     int read_tries = 0;
 
-    if (dpif->epoll_fd < 0) {
-       return EAGAIN;
+    if (!epoll) {
+        return EAGAIN;
     }
 
-    if (dpif->event_offset >= dpif->n_events) {
+    if (epoll->event_offset >= epoll->n_events) {
         int retval;
 
-        dpif->event_offset = dpif->n_events = 0;
+        epoll->event_offset = epoll->n_events = 0;
 
         do {
-            retval = epoll_wait(dpif->epoll_fd, dpif->epoll_events,
+            retval = epoll_wait(epoll->epoll_fd, epoll->epoll_events,
                                 dpif->uc_array_size, 0);
         } while (retval < 0 && errno == EINTR);
         if (retval < 0) {
             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
             VLOG_WARN_RL(&rl, "epoll_wait failed (%s)", ovs_strerror(errno));
         } else if (retval > 0) {
-            dpif->n_events = retval;
+            epoll->n_events = retval;
         }
     }
 
-    while (dpif->event_offset < dpif->n_events) {
-        int idx = dpif->epoll_events[dpif->event_offset].data.u32;
-        struct dpif_channel *ch = &dpif->channels[idx];
+    while (epoll->event_offset < epoll->n_events) {
+        int idx = epoll->epoll_events[epoll->event_offset].data.u32;
+        struct dpif_channel *ch = &dpif->channels[idx][handler_id];
 
-        dpif->event_offset++;
+        epoll->event_offset++;
 
         for (;;) {
             int dp_ifindex;
@@ -1529,7 +1583,7 @@ dpif_linux_recv__(struct dpif *dpif_, struct dpif_upcall *upcall,
                  * packets that the buffer overflowed.  Try again
                  * immediately because there's almost certainly a packet
                  * waiting for us. */
-                report_loss(dpif_, ch);
+                report_loss(dpif_, ch, idx, handler_id);
                 continue;
             }
 
@@ -1554,29 +1608,29 @@ dpif_linux_recv__(struct dpif *dpif_, struct dpif_upcall *upcall,
 }
 
 static int
-dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall,
-                struct ofpbuf *buf)
+dpif_linux_recv(struct dpif *dpif_, uint32_t handler_id,
+                struct dpif_upcall *upcall, struct ofpbuf *buf)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     int error;
 
-    ovs_mutex_lock(&dpif->upcall_lock);
-    error = dpif_linux_recv__(dpif_, upcall, buf);
-    ovs_mutex_unlock(&dpif->upcall_lock);
+    fat_rwlock_rdlock(&dpif->upcall_lock);
+    error = dpif_linux_recv__(dpif_, handler_id, upcall, buf);
+    fat_rwlock_unlock(&dpif->upcall_lock);
 
     return error;
 }
 
 static void
-dpif_linux_recv_wait(struct dpif *dpif_)
+dpif_linux_recv_wait(struct dpif *dpif_, uint32_t handler_id)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
 
-    ovs_mutex_lock(&dpif->upcall_lock);
-    if (dpif->epoll_fd >= 0) {
-        poll_fd_wait(dpif->epoll_fd, POLLIN);
+    fat_rwlock_rdlock(&dpif->upcall_lock);
+    if (dpif->epolls) {
+        poll_fd_wait(dpif->epolls[handler_id].epoll_fd, POLLIN);
     }
-    ovs_mutex_unlock(&dpif->upcall_lock);
+    fat_rwlock_unlock(&dpif->upcall_lock);
 }
 
 static void
@@ -1584,18 +1638,19 @@ dpif_linux_recv_purge(struct dpif *dpif_)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
 
-    ovs_mutex_lock(&dpif->upcall_lock);
-    if (dpif->epoll_fd >= 0) {
-        struct dpif_channel *ch;
+    fat_rwlock_rdlock(&dpif->upcall_lock);
+    if (dpif->epolls) {
+        size_t i, j;
 
-        for (ch = dpif->channels; ch < &dpif->channels[dpif->uc_array_size];
-             ch++) {
-            if (ch->sock) {
-                nl_sock_drain(ch->sock);
+        for (i = 0; i < dpif->uc_array_size; i++ ) {
+            if (dpif->channels[i]) {
+                for (j = 0; j < dpif->n_handlers; j++) {
+                    nl_sock_drain(dpif->channels[i][j].sock);
+                }
             }
         }
     }
-    ovs_mutex_unlock(&dpif->upcall_lock);
+    fat_rwlock_unlock(&dpif->upcall_lock);
 }
 
 const struct dpif_class dpif_linux_class = {
@@ -1701,7 +1756,7 @@ dpif_linux_vport_from_ofpbuf(struct dpif_linux_vport *vport,
         [OVS_VPORT_ATTR_PORT_NO] = { .type = NL_A_U32 },
         [OVS_VPORT_ATTR_TYPE] = { .type = NL_A_U32 },
         [OVS_VPORT_ATTR_NAME] = { .type = NL_A_STRING, .max_len = IFNAMSIZ },
-        [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NL_A_U32 },
+        [OVS_VPORT_ATTR_UPCALL_PIDS] = { .type = NL_A_UNSPEC, .optional = true },
         [OVS_VPORT_ATTR_STATS] = { NL_POLICY_FOR(struct ovs_vport_stats),
                                    .optional = true },
         [OVS_VPORT_ATTR_OPTIONS] = { .type = NL_A_NESTED, .optional = true },
@@ -1731,9 +1786,12 @@ dpif_linux_vport_from_ofpbuf(struct dpif_linux_vport *vport,
     vport->port_no = nl_attr_get_odp_port(a[OVS_VPORT_ATTR_PORT_NO]);
     vport->type = nl_attr_get_u32(a[OVS_VPORT_ATTR_TYPE]);
     vport->name = nl_attr_get_string(a[OVS_VPORT_ATTR_NAME]);
-    if (a[OVS_VPORT_ATTR_UPCALL_PID]) {
-        vport->upcall_pid = nl_attr_get(a[OVS_VPORT_ATTR_UPCALL_PID]);
+    if (a[OVS_VPORT_ATTR_UPCALL_PIDS]) {
+        vport->n_pids = nl_attr_get_size(a[OVS_VPORT_ATTR_UPCALL_PIDS])
+                        / (sizeof *vport->upcall_pids);
+        vport->upcall_pids = nl_attr_get(a[OVS_VPORT_ATTR_UPCALL_PIDS]);
     }
+
     if (a[OVS_VPORT_ATTR_STATS]) {
         vport->stats = nl_attr_get(a[OVS_VPORT_ATTR_STATS]);
     }
@@ -1770,8 +1828,10 @@ dpif_linux_vport_to_ofpbuf(const struct dpif_linux_vport *vport,
         nl_msg_put_string(buf, OVS_VPORT_ATTR_NAME, vport->name);
     }
 
-    if (vport->upcall_pid) {
-        nl_msg_put_u32(buf, OVS_VPORT_ATTR_UPCALL_PID, *vport->upcall_pid);
+    if (vport->upcall_pids) {
+        nl_msg_put_unspec(buf, OVS_VPORT_ATTR_UPCALL_PIDS,
+                          vport->upcall_pids,
+                          vport->n_pids * sizeof *vport->upcall_pids);
     }
 
     if (vport->stats) {
@@ -2176,9 +2236,9 @@ dpif_linux_flow_get_stats(const struct dpif_linux_flow *flow,
 /* Logs information about a packet that was recently lost in 'ch' (in
  * 'dpif_'). */
 static void
-report_loss(struct dpif *dpif_, struct dpif_channel *ch)
+report_loss(struct dpif *dpif_, struct dpif_channel *ch, uint32_t ch_idx,
+            uint32_t handler_id)
 {
-    struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
     struct ds s;
 
@@ -2192,7 +2252,7 @@ report_loss(struct dpif *dpif_, struct dpif_channel *ch)
                       time_msec() - ch->last_poll);
     }
 
-    VLOG_WARN("%s: lost packet on channel %"PRIdPTR"%s",
-              dpif_name(dpif_), ch - dpif->channels, ds_cstr(&s));
+    VLOG_WARN("%s: lost packet on channel %u of handler %u%s",
+              dpif_name(dpif_), ch_idx, handler_id, ds_cstr(&s));
     ds_destroy(&s);
 }
diff --git a/lib/dpif-linux.h b/lib/dpif-linux.h
index ec94ccf..02c7b03 100644
--- a/lib/dpif-linux.h
+++ b/lib/dpif-linux.h
@@ -32,6 +32,7 @@ struct dpif_linux_vport {
 
     /* ovs_vport header. */
     int dp_ifindex;
+    uint32_t n_pids;
     odp_port_t port_no;                    /* ODPP_NONE if unknown. */
     enum ovs_vport_type type;
 
@@ -41,7 +42,7 @@ struct dpif_linux_vport {
      * 32-bit boundaries, so use get_unaligned_u64() to access its values.
      */
     const char *name;                      /* OVS_VPORT_ATTR_NAME. */
-    const uint32_t *upcall_pid;            /* OVS_VPORT_ATTR_UPCALL_PID. */
+    const uint32_t *upcall_pids;           /* OVS_VPORT_ATTR_UPCALL_PIDS. */
     const struct ovs_vport_stats *stats;   /* OVS_VPORT_ATTR_STATS. */
     const struct nlattr *options;          /* OVS_VPORT_ATTR_OPTIONS. */
     size_t options_len;
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 73eb99d..9033fba 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -1429,7 +1429,8 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
 }
 
 static int
-dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED)
+dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED,
+                     uint32_t n_handlers OVS_UNUSED)
 {
     return 0;
 }
@@ -1458,8 +1459,8 @@ find_nonempty_queue(struct dp_netdev *dp)
 }
 
 static int
-dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
-                 struct ofpbuf *buf)
+dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id OVS_UNUSED,
+                 struct dpif_upcall *upcall, struct ofpbuf *buf)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_queue *q;
@@ -1485,7 +1486,7 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
 }
 
 static void
-dpif_netdev_recv_wait(struct dpif *dpif)
+dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id OVS_UNUSED)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     uint64_t seq;
diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
index adc5242..7ef10bb 100644
--- a/lib/dpif-provider.h
+++ b/lib/dpif-provider.h
@@ -146,7 +146,8 @@ struct dpif_class {
 
     /* Returns the Netlink PID value to supply in OVS_ACTION_ATTR_USERSPACE
      * actions as the OVS_USERSPACE_ATTR_PID attribute's value, for use in
-     * flows whose packets arrived on port 'port_no'.
+     * flows whose packets arrived on port 'port_no' and 5-tuple hash is
+     * 'hash'.
      *
      * A 'port_no' of UINT32_MAX should be treated as a special case.  The
      * implementation should return a reserved PID, not allocated to any port,
@@ -158,7 +159,8 @@ struct dpif_class {
      *
      * A dpif provider that doesn't have meaningful Netlink PIDs can use NULL
      * for this function.  This is equivalent to always returning 0. */
-    uint32_t (*port_get_pid)(const struct dpif *dpif, odp_port_t port_no);
+    uint32_t (*port_get_pid)(const struct dpif *dpif, odp_port_t port_no,
+                             uint32_t hash);
 
     /* Attempts to begin dumping the ports in a dpif.  On success, returns 0
      * and initializes '*statep' with any data needed for iteration.  On
@@ -324,16 +326,17 @@ struct dpif_class {
      * Turning packet receive off and then back on is allowed to change Netlink
      * PID assignments (see ->port_get_pid()).  The client is responsible for
      * updating flows as necessary if it does this. */
-    int (*recv_set)(struct dpif *dpif, bool enable);
+    int (*recv_set)(struct dpif *dpif, bool enable, uint32_t n_handlers);
 
     /* Translates OpenFlow queue ID 'queue_id' (in host byte order) into a
      * priority value used for setting packet priority. */
     int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id,
                              uint32_t *priority);
 
-    /* Polls for an upcall from 'dpif'.  If successful, stores the upcall into
-     * '*upcall', using 'buf' for storage.  Should only be called if 'recv_set'
-     * has been used to enable receiving packets from 'dpif'.
+    /* Polls for an upcall from 'dpif' for handler with 'handler_id'.  If
+     * successful, stores the upcall into '*upcall', using 'buf' for storage.
+     * Should only be called if 'recv_set' has been used to enable receiving
+     * packets from 'dpif'.
      *
      * The implementation should point 'upcall->key' and 'upcall->userdata'
      * (if any) into data in the caller-provided 'buf'.  The implementation may
@@ -349,12 +352,13 @@ struct dpif_class {
      *
      * This function must not block.  If no upcall is pending when it is
      * called, it should return EAGAIN without blocking. */
-    int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall,
-                struct ofpbuf *buf);
+    int (*recv)(struct dpif *dpif, uint32_t handler_id,
+                struct dpif_upcall *upcall, struct ofpbuf *buf);
 
-    /* Arranges for the poll loop to wake up when 'dpif' has a message queued
-     * to be received with the recv member function. */
-    void (*recv_wait)(struct dpif *dpif);
+    /* Arranges for the poll loop for handler with 'handler_id' to wake up when
+     * 'dpif' has a message queued to be received with the recv member
+     * function by the handler. */
+    void (*recv_wait)(struct dpif *dpif, uint32_t handler_id);
 
     /* Throws away any queued upcalls that 'dpif' currently has ready to
      * return. */
diff --git a/lib/dpif.c b/lib/dpif.c
index 2b79a6e..72897b3 100644
--- a/lib/dpif.c
+++ b/lib/dpif.c
@@ -634,7 +634,7 @@ dpif_port_query_by_name(const struct dpif *dpif, const char *devname,
 
 /* Returns the Netlink PID value to supply in OVS_ACTION_ATTR_USERSPACE actions
  * as the OVS_USERSPACE_ATTR_PID attribute's value, for use in flows whose
- * packets arrived on port 'port_no'.
+ * packets arrived on port 'port_no' and 5-tuple hash is 'hash'.
  *
  * A 'port_no' of ODPP_NONE is a special case: it returns a reserved PID, not
  * allocated to any port, that the client may use for special purposes.
@@ -645,10 +645,10 @@ dpif_port_query_by_name(const struct dpif *dpif, const char *devname,
  * update all of the flows that it installed that contain
  * OVS_ACTION_ATTR_USERSPACE actions. */
 uint32_t
-dpif_port_get_pid(const struct dpif *dpif, odp_port_t port_no)
+dpif_port_get_pid(const struct dpif *dpif, odp_port_t port_no, uint32_t hash)
 {
     return (dpif->dpif_class->port_get_pid
-            ? (dpif->dpif_class->port_get_pid)(dpif, port_no)
+            ? (dpif->dpif_class->port_get_pid)(dpif, port_no, hash)
             : 0);
 }
 
@@ -1248,24 +1248,18 @@ dpif_upcall_type_to_string(enum dpif_upcall_type type)
     }
 }
 
-/* Enables or disables receiving packets with dpif_recv() on 'dpif'.  Returns 0
- * if successful, otherwise a positive errno value.
- *
- * Turning packet receive off and then back on may change the Netlink PID
- * assignments returned by dpif_port_get_pid().  If the client does this, it
- * must update all of the flows that have OVS_ACTION_ATTR_USERSPACE actions
- * using the new PID assignment. */
 int
-dpif_recv_set(struct dpif *dpif, bool enable)
+dpif_recv_set(struct dpif *dpif, bool enable, uint32_t n_handlers)
 {
-    int error = dpif->dpif_class->recv_set(dpif, enable);
+    int error = dpif->dpif_class->recv_set(dpif, enable, n_handlers);
     log_operation(dpif, "recv_set", error);
     return error;
 }
 
-/* Polls for an upcall from 'dpif'.  If successful, stores the upcall into
- * '*upcall', using 'buf' for storage.  Should only be called if
- * dpif_recv_set() has been used to enable receiving packets on 'dpif'.
+/* Polls for an upcall from 'dpif' for handler with 'handler_id'.  If
+ * successful, stores the upcall into '*upcall', using 'buf' for storage.
+ * Should only be called if dpif_recv_set() has been used to enable receiving
+ * packets on 'dpif'.
  *
  * 'upcall->key' and 'upcall->userdata' point into data in the caller-provided
  * 'buf', so their memory cannot be freed separately from 'buf'.
@@ -1280,9 +1274,10 @@ dpif_recv_set(struct dpif *dpif, bool enable)
  * Returns 0 if successful, otherwise a positive errno value.  Returns EAGAIN
  * if no upcall is immediately available. */
 int
-dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf *buf)
+dpif_recv(struct dpif *dpif, uint32_t handler_id, struct dpif_upcall *upcall,
+          struct ofpbuf *buf)
 {
-    int error = dpif->dpif_class->recv(dpif, upcall, buf);
+    int error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
     if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
         struct ds flow;
         char *packet;
@@ -1316,12 +1311,13 @@ dpif_recv_purge(struct dpif *dpif)
     }
 }
 
-/* Arranges for the poll loop to wake up when 'dpif' has a message queued to be
- * received with dpif_recv(). */
+/* Arranges for the poll loop for handler with 'handler_id' to wake up when
+ * 'dpif' has a message queued to be received with dpif_recv() by the handler.
+ */
 void
-dpif_recv_wait(struct dpif *dpif)
+dpif_recv_wait(struct dpif *dpif, uint32_t handler_id)
 {
-    dpif->dpif_class->recv_wait(dpif);
+	dpif->dpif_class->recv_wait(dpif, handler_id);
 }
 
 /* Obtains the NetFlow engine type and engine ID for 'dpif' into '*engine_type'
diff --git a/lib/dpif.h b/lib/dpif.h
index 7f986f9..b6e3fbd 100644
--- a/lib/dpif.h
+++ b/lib/dpif.h
@@ -445,7 +445,8 @@ int dpif_port_query_by_name(const struct dpif *, const char *devname,
                             struct dpif_port *);
 int dpif_port_get_name(struct dpif *, odp_port_t port_no,
                        char *name, size_t name_size);
-uint32_t dpif_port_get_pid(const struct dpif *, odp_port_t port_no);
+uint32_t dpif_port_get_pid(const struct dpif *, odp_port_t port_no,
+                           uint32_t hash);
 
 struct dpif_port_dump {
     const struct dpif *dpif;
@@ -613,10 +614,11 @@ struct dpif_upcall {
     struct nlattr *userdata;    /* Argument to OVS_ACTION_ATTR_USERSPACE. */
 };
 
-int dpif_recv_set(struct dpif *, bool enable);
-int dpif_recv(struct dpif *, struct dpif_upcall *, struct ofpbuf *);
+int dpif_recv_set(struct dpif *, bool enable, uint32_t n_handlers);
+int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
+              struct ofpbuf *);
 void dpif_recv_purge(struct dpif *);
-void dpif_recv_wait(struct dpif *);
+void dpif_recv_wait(struct dpif *, uint32_t handler_id);
 
 /* Miscellaneous. */
 
diff --git a/lib/flow.c b/lib/flow.c
index 06ba036..db29db0 100644
--- a/lib/flow.c
+++ b/lib/flow.c
@@ -822,6 +822,24 @@ flow_wildcards_set_reg_mask(struct flow_wildcards *wc, int idx, uint32_t mask)
     wc->masks.regs[idx] = mask;
 }
 
+/* Calculates the 5-tuple hash from the given flow. */
+uint32_t
+flow_hash_5tuple(const struct flow *flow, uint32_t basis)
+{
+    uint32_t hash;
+
+    if (!flow) {
+        return 0;
+    }
+
+    hash = (OVS_FORCE int) flow->nw_src
+           ^ (OVS_FORCE int) flow->nw_dst
+           ^ flow->nw_proto ^ (OVS_FORCE int) flow->tp_src
+           ^ (OVS_FORCE int) flow->tp_dst;
+
+    return jhash_bytes((void *) &hash, sizeof hash, basis);
+}
+
 /* Hashes 'flow' based on its L2 through L4 protocol information. */
 uint32_t
 flow_hash_symmetric_l4(const struct flow *flow, uint32_t basis)
diff --git a/lib/flow.h b/lib/flow.h
index 3109a84..26871a2 100644
--- a/lib/flow.h
+++ b/lib/flow.h
@@ -323,7 +323,8 @@ void flow_wildcards_fold_minimask_range(struct flow_wildcards *,
 uint32_t flow_wildcards_hash(const struct flow_wildcards *, uint32_t basis);
 bool flow_wildcards_equal(const struct flow_wildcards *,
                           const struct flow_wildcards *);
-uint32_t flow_hash_symmetric_l4(const struct flow *flow, uint32_t basis);
+uint32_t flow_hash_5tuple(const struct flow *, uint32_t basis);
+uint32_t flow_hash_symmetric_l4(const struct flow *, uint32_t basis);
 
 /* Initialize a flow with random fields that matter for nx_hash_fields. */
 void flow_random_hash_fields(struct flow *);
diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index 489012a..e6df302 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -45,26 +45,11 @@
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
 
-COVERAGE_DEFINE(upcall_queue_overflow);
-
-/* A thread that processes each upcall handed to it by the dispatcher thread,
- * forwards the upcall's packet, and possibly sets up a kernel flow as a
- * cache. */
 struct handler {
     struct udpif *udpif;               /* Parent udpif. */
     pthread_t thread;                  /* Thread ID. */
     char *name;                        /* Thread name. */
-
-    struct ovs_mutex mutex;            /* Mutex guarding the following. */
-
-    /* Atomic queue of unprocessed upcalls. */
-    struct list upcalls OVS_GUARDED;
-    size_t n_upcalls OVS_GUARDED;
-
-    bool need_signal;                  /* Only changed by the dispatcher. */
-
-    pthread_cond_t wake_cond;          /* Wakes 'thread' while holding
-                                          'mutex'. */
+    uint32_t handler_id;               /* Handler id. */
 };
 
 /* A thread that processes each kernel flow handed to it by the flow_dumper
@@ -89,9 +74,6 @@ struct revalidator {
  *
  * udpif has two logically separate pieces:
  *
- *    - A "dispatcher" thread that reads upcalls from the kernel and dispatches
- *      them to one of several "handler" threads (see struct handler).
- *
  *    - A "flow_dumper" thread that reads the kernel flow table and dispatches
  *      flows to one of several "revalidator" threads (see struct
  *      revalidator). */
@@ -103,7 +85,6 @@ struct udpif {
 
     uint32_t secret;                   /* Random seed for upcall hash. */
 
-    pthread_t dispatcher;              /* Dispatcher thread ID. */
     pthread_t flow_dumper;             /* Flow dumper thread ID. */
 
     struct handler *handlers;          /* Upcall handlers. */
@@ -143,7 +124,7 @@ enum upcall_type {
 };
 
 struct upcall {
-    struct list list_node;          /* For queuing upcalls. */
+    bool is_valid;
     struct flow_miss *flow_miss;    /* This upcall's flow_miss. */
 
     /* Raw upcall plus data for keeping track of the memory backing it. */
@@ -216,15 +197,14 @@ struct flow_miss {
     bool put;
 };
 
-static void upcall_destroy(struct upcall *);
+static void upcall_destroy(struct upcall *, bool free_upcall);
 
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
 
-static void recv_upcalls(struct udpif *);
-static void handle_upcalls(struct handler *handler, struct list *upcalls);
+static void handle_upcalls(struct handler *handler, struct upcall *upcalls,
+                           size_t n_upcalls);
 static void *udpif_flow_dumper(void *);
-static void *udpif_dispatcher(void *);
 static void *udpif_upcall_handler(void *);
 static void *udpif_revalidator(void *);
 static uint64_t udpif_get_n_flows(struct udpif *);
@@ -307,9 +287,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         for (i = 0; i < udpif->n_handlers; i++) {
             struct handler *handler = &udpif->handlers[i];
 
-            ovs_mutex_lock(&handler->mutex);
-            xpthread_cond_signal(&handler->wake_cond);
-            ovs_mutex_unlock(&handler->mutex);
             xpthread_join(handler->thread, NULL);
         }
 
@@ -323,7 +300,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         }
 
         xpthread_join(udpif->flow_dumper, NULL);
-        xpthread_join(udpif->dispatcher, NULL);
 
         for (i = 0; i < udpif->n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
@@ -347,17 +323,7 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         }
 
         for (i = 0; i < udpif->n_handlers; i++) {
-            struct handler *handler = &udpif->handlers[i];
-            struct upcall *miss, *next;
-
-            LIST_FOR_EACH_SAFE (miss, next, list_node, &handler->upcalls) {
-                list_remove(&miss->list_node);
-                upcall_destroy(miss);
-            }
-            ovs_mutex_destroy(&handler->mutex);
-
-            xpthread_cond_destroy(&handler->wake_cond);
-            free(handler->name);
+            free(udpif->handlers[i].name);
         }
         latch_poll(&udpif->exit_latch);
 
@@ -382,10 +348,7 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
             struct handler *handler = &udpif->handlers[i];
 
             handler->udpif = udpif;
-            list_init(&handler->upcalls);
-            handler->need_signal = false;
-            xpthread_cond_init(&handler->wake_cond, NULL);
-            ovs_mutex_init(&handler->mutex);
+            handler->handler_id = i;
             xpthread_create(&handler->thread, NULL, udpif_upcall_handler,
                             handler);
         }
@@ -403,7 +366,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
             xpthread_create(&revalidator->thread, NULL, udpif_revalidator,
                             revalidator);
         }
-        xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
         xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, udpif);
     }
 }
@@ -430,16 +392,9 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
 {
     size_t i;
 
-    simap_increase(usage, "dispatchers", 1);
     simap_increase(usage, "flow_dumpers", 1);
 
     simap_increase(usage, "handlers", udpif->n_handlers);
-    for (i = 0; i < udpif->n_handlers; i++) {
-        struct handler *handler = &udpif->handlers[i];
-        ovs_mutex_lock(&handler->mutex);
-        simap_increase(usage, "handler upcalls",  handler->n_upcalls);
-        ovs_mutex_unlock(&handler->mutex);
-    }
 
     simap_increase(usage, "revalidators", udpif->n_revalidators);
     for (i = 0; i < udpif->n_revalidators; i++) {
@@ -468,12 +423,16 @@ udpif_flush(void)
 
 /* Destroys and deallocates 'upcall'. */
 static void
-upcall_destroy(struct upcall *upcall)
+upcall_destroy(struct upcall *upcall, bool free_upcall)
 {
     if (upcall) {
         ofpbuf_uninit(&upcall->dpif_upcall.packet);
         ofpbuf_uninit(&upcall->upcall_buf);
-        free(upcall);
+
+        upcall->is_valid = false;
+        if (free_upcall) {
+            free(upcall);
+        }
     }
 }
 
@@ -499,24 +458,6 @@ udpif_get_n_flows(struct udpif *udpif)
     return flow_count;
 }
 
-/* The dispatcher thread is responsible for receiving upcalls from the kernel,
- * assigning them to a upcall_handler thread. */
-static void *
-udpif_dispatcher(void *arg)
-{
-    struct udpif *udpif = arg;
-
-    set_subprogram_name("dispatcher");
-    while (!latch_is_set(&udpif->exit_latch)) {
-        recv_upcalls(udpif);
-        dpif_recv_wait(udpif->dpif);
-        latch_wait(&udpif->exit_latch);
-        poll_block();
-    }
-
-    return NULL;
-}
-
 static void *
 udpif_flow_dumper(void *arg)
 {
@@ -627,38 +568,44 @@ udpif_flow_dumper(void *arg)
     return NULL;
 }
 
-/* The miss handler thread is responsible for processing miss upcalls retrieved
- * by the dispatcher thread.  Once finished it passes the processed miss
- * upcalls to ofproto-dpif where they're installed in the datapath. */
 static void *
 udpif_upcall_handler(void *arg)
 {
     struct handler *handler = arg;
+    struct udpif *udpif = handler->udpif;
+    struct upcall upcalls[FLOW_MISS_MAX_BATCH];
 
     handler->name = xasprintf("handler_%u", ovsthread_id_self());
     set_subprogram_name("%s", handler->name);
 
     while (!latch_is_set(&handler->udpif->exit_latch)) {
-        struct list misses = LIST_INITIALIZER(&misses);
-        size_t i;
-
-        ovs_mutex_lock(&handler->mutex);
-        if (!handler->n_upcalls) {
-            ovs_mutex_cond_wait(&handler->wake_cond, &handler->mutex);
-        }
+        size_t i, n_upcalls;
 
         for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
-            if (handler->n_upcalls) {
-                handler->n_upcalls--;
-                list_push_back(&misses, list_pop_front(&handler->upcalls));
-            } else {
+            struct upcall *upcall = &upcalls[i];
+            int error;
+
+            ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
+                            sizeof upcall->upcall_stub);
+            error = dpif_recv(udpif->dpif, handler->handler_id,
+                              &upcall->dpif_upcall, &upcall->upcall_buf);
+            if (error) {
+                /* upcall_destroy() can only be called on successfully received
+                 * upcalls. */
+                ofpbuf_uninit(&upcall->upcall_buf);
                 break;
             }
+            upcall->is_valid = true;
         }
-        ovs_mutex_unlock(&handler->mutex);
-
-        handle_upcalls(handler, &misses);
 
+        n_upcalls = i;
+        if (!n_upcalls) {
+            dpif_recv_wait(udpif->dpif, handler->handler_id);
+            latch_wait(&udpif->exit_latch);
+            poll_block();
+        } else {
+            handle_upcalls(handler, upcalls, n_upcalls);
+        }
         coverage_clear();
     }
 
@@ -765,98 +712,11 @@ classify_upcall(const struct upcall *upcall)
     }
 }
 
-static void
-recv_upcalls(struct udpif *udpif)
-{
-    int n;
-
-    for (;;) {
-        uint32_t hash = udpif->secret;
-        struct handler *handler;
-        struct upcall *upcall;
-        size_t n_bytes, left;
-        struct nlattr *nla;
-        int error;
-
-        upcall = xmalloc(sizeof *upcall);
-        ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
-                        sizeof upcall->upcall_stub);
-        error = dpif_recv(udpif->dpif, &upcall->dpif_upcall,
-                          &upcall->upcall_buf);
-        if (error) {
-            /* upcall_destroy() can only be called on successfully received
-             * upcalls. */
-            ofpbuf_uninit(&upcall->upcall_buf);
-            free(upcall);
-            break;
-        }
-
-        n_bytes = 0;
-        NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key,
-                          upcall->dpif_upcall.key_len) {
-            enum ovs_key_attr type = nl_attr_type(nla);
-            if (type == OVS_KEY_ATTR_IN_PORT
-                || type == OVS_KEY_ATTR_TCP
-                || type == OVS_KEY_ATTR_UDP) {
-                if (nl_attr_get_size(nla) == 4) {
-                    hash = mhash_add(hash, nl_attr_get_u32(nla));
-                    n_bytes += 4;
-                } else {
-                    VLOG_WARN_RL(&rl,
-                                 "Netlink attribute with incorrect size.");
-                }
-            }
-        }
-        hash =  mhash_finish(hash, n_bytes);
-
-        handler = &udpif->handlers[hash % udpif->n_handlers];
-
-        ovs_mutex_lock(&handler->mutex);
-        if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
-            list_push_back(&handler->upcalls, &upcall->list_node);
-            if (handler->n_upcalls == 0) {
-                handler->need_signal = true;
-            }
-            handler->n_upcalls++;
-            if (handler->need_signal &&
-                handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
-                handler->need_signal = false;
-                xpthread_cond_signal(&handler->wake_cond);
-            }
-            ovs_mutex_unlock(&handler->mutex);
-            if (!VLOG_DROP_DBG(&rl)) {
-                struct ds ds = DS_EMPTY_INITIALIZER;
-
-                odp_flow_key_format(upcall->dpif_upcall.key,
-                                    upcall->dpif_upcall.key_len,
-                                    &ds);
-                VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds));
-                ds_destroy(&ds);
-            }
-        } else {
-            ovs_mutex_unlock(&handler->mutex);
-            COVERAGE_INC(upcall_queue_overflow);
-            upcall_destroy(upcall);
-        }
-    }
-
-    for (n = 0; n < udpif->n_handlers; ++n) {
-        struct handler *handler = &udpif->handlers[n];
-
-        if (handler->need_signal) {
-            handler->need_signal = false;
-            ovs_mutex_lock(&handler->mutex);
-            xpthread_cond_signal(&handler->wake_cond);
-            ovs_mutex_unlock(&handler->mutex);
-        }
-    }
-}
-
 /* Calculates slow path actions for 'xout'.  'buf' must statically be
  * initialized with at least 128 bytes of space. */
 static void
 compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
-                  odp_port_t odp_in_port, struct ofpbuf *buf)
+                  struct flow *flow, odp_port_t odp_in_port, struct ofpbuf *buf)
 {
     union user_action_cookie cookie;
     odp_port_t port;
@@ -869,7 +729,7 @@ compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
     port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
         ? ODPP_NONE
         : odp_in_port;
-    pid = dpif_port_get_pid(udpif->dpif, port);
+    pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
     odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
 }
 
@@ -889,7 +749,8 @@ flow_miss_find(struct hmap *todo, const struct ofproto_dpif *ofproto,
 }
 
 static void
-handle_upcalls(struct handler *handler, struct list *upcalls)
+handle_upcalls(struct handler *handler, struct upcall *upcalls,
+               size_t n_upcalls)
 {
     struct hmap misses = HMAP_INITIALIZER(&misses);
     struct udpif *udpif = handler->udpif;
@@ -898,7 +759,6 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
     struct dpif_op *opsp[FLOW_MISS_MAX_BATCH * 2];
     struct dpif_op ops[FLOW_MISS_MAX_BATCH * 2];
     struct flow_miss *miss, *next_miss;
-    struct upcall *upcall, *next;
     size_t n_misses, n_ops, i;
     unsigned int flow_limit;
     bool fail_open, may_put;
@@ -927,7 +787,8 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
      *     datapath flow.)
      */
     n_misses = 0;
-    LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
+    for (i = 0; i < n_upcalls; i++) {
+        struct upcall *upcall = &upcalls[i];
         struct dpif_upcall *dupcall = &upcall->dpif_upcall;
         struct flow_miss *miss = &miss_buf[n_misses];
         struct ofpbuf *packet = &dupcall->packet;
@@ -956,8 +817,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
                               dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
                               NULL);
             }
-            list_remove(&upcall->list_node);
-            upcall_destroy(upcall);
+            upcall_destroy(upcall, false);
             continue;
         }
 
@@ -1039,8 +899,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
         dpif_ipfix_unref(ipfix);
         dpif_sflow_unref(sflow);
 
-        list_remove(&upcall->list_node);
-        upcall_destroy(upcall);
+        upcall_destroy(upcall, false);
     }
 
     /* Initialize each 'struct flow_miss's ->xout.
@@ -1083,12 +942,17 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
      * The loop fills 'ops' with an array of operations to execute in the
      * datapath. */
     n_ops = 0;
-    LIST_FOR_EACH (upcall, list_node, upcalls) {
+    for (i = 0; i < n_upcalls; i++) {
+        struct upcall *upcall = &upcalls[i];
         struct flow_miss *miss = upcall->flow_miss;
         struct ofpbuf *packet = &upcall->dpif_upcall.packet;
         struct dpif_op *op;
         ovs_be16 flow_vlan_tci;
 
+        if (!upcall->is_valid) {
+            continue;
+        }
+
         /* Save a copy of flow.vlan_tci in case it is changed to
          * generate proper mega flow masks for VLAN splinter flows. */
         flow_vlan_tci = miss->flow.vlan_tci;
@@ -1162,7 +1026,8 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
 
                 ofpbuf_use_stack(&buf, miss->slow_path_buf,
                                  sizeof miss->slow_path_buf);
-                compose_slow_path(udpif, &miss->xout, miss->odp_in_port, &buf);
+                compose_slow_path(udpif, &miss->xout, &miss->flow,
+                                  miss->odp_in_port, &buf);
                 op->u.flow_put.actions = buf.data;
                 op->u.flow_put.actions_len = buf.size;
             }
@@ -1197,11 +1062,16 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
      *
      * Copy packets before they are modified by execution. */
     if (fail_open) {
-        LIST_FOR_EACH (upcall, list_node, upcalls) {
+        for (i = 0; i < n_upcalls; i++) {
+            struct upcall *upcall = &upcalls[i];
             struct flow_miss *miss = upcall->flow_miss;
             struct ofpbuf *packet = &upcall->dpif_upcall.packet;
             struct ofproto_packet_in *pin;
 
+            if (!upcall->is_valid) {
+                continue;
+            }
+
             pin = xmalloc(sizeof *pin);
             pin->up.packet = xmemdup(packet->data, packet->size);
             pin->up.packet_len = packet->size;
@@ -1227,9 +1097,10 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
     }
     hmap_destroy(&misses);
 
-    LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
-        list_remove(&upcall->list_node);
-        upcall_destroy(upcall);
+    for (i = 0; i < n_upcalls; i++) {
+        if (upcalls[i].is_valid) {
+            upcall_destroy(&upcalls[i], false);
+        }
     }
 }
 
@@ -1324,7 +1195,7 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
                          xout.odp_actions.size);
     } else {
         ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
-        compose_slow_path(udpif, &xout, odp_in_port, &xout_actions);
+        compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
     }
 
     if (!ofpbuf_equal(&xout_actions, actions)) {
@@ -1525,16 +1396,6 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
         ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
 
         ds_put_char(&ds, '\n');
-        for (i = 0; i < udpif->n_handlers; i++) {
-            struct handler *handler = &udpif->handlers[i];
-
-            ovs_mutex_lock(&handler->mutex);
-            ds_put_format(&ds, "\t%s: (upcall queue %"PRIuSIZE")\n",
-                          handler->name, handler->n_upcalls);
-            ovs_mutex_unlock(&handler->mutex);
-        }
-
-        ds_put_char(&ds, '\n');
         for (i = 0; i < n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
 
diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c
index ad44582..e85aa7a 100644
--- a/ofproto/ofproto-dpif-xlate.c
+++ b/ofproto/ofproto-dpif-xlate.c
@@ -1483,8 +1483,9 @@ compose_sample_action(const struct xbridge *xbridge,
     actions_offset = nl_msg_start_nested(odp_actions, OVS_SAMPLE_ATTR_ACTIONS);
 
     odp_port = ofp_port_to_odp_port(xbridge, flow->in_port.ofp_port);
-    pid = dpif_port_get_pid(xbridge->dpif, odp_port);
-    cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size, odp_actions);
+    pid = dpif_port_get_pid(xbridge->dpif, odp_port, flow_hash_5tuple(flow, 0));
+    cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size,
+                                             odp_actions);
 
     nl_msg_end_nested(odp_actions, actions_offset);
     nl_msg_end_nested(odp_actions, sample_offset);
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index 7b3e1eb..a53b3fd 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -470,7 +470,8 @@ type_run(const char *type)
 
         backer->recv_set_enable = true;
 
-        error = dpif_recv_set(backer->dpif, backer->recv_set_enable);
+        error = dpif_recv_set(backer->dpif, backer->recv_set_enable,
+                              n_handlers);
         if (error) {
             VLOG_ERR("Failed to enable receiving packets in dpif.");
             return error;
@@ -480,6 +481,7 @@ type_run(const char *type)
     }
 
     if (backer->recv_set_enable) {
+        dpif_recv_set(backer->dpif, backer->recv_set_enable, n_handlers);
         udpif_set_threads(backer->udpif, n_handlers, n_revalidators);
     }
 
@@ -885,7 +887,7 @@ open_dpif_backer(const char *type, struct dpif_backer **backerp)
 
     shash_add(&all_dpif_backers, type, backer);
 
-    error = dpif_recv_set(backer->dpif, backer->recv_set_enable);
+    error = dpif_recv_set(backer->dpif, backer->recv_set_enable, n_handlers);
     if (error) {
         VLOG_ERR("failed to listen on datapath of type %s: %s",
                  type, ovs_strerror(error));
@@ -930,7 +932,7 @@ check_variable_length_userdata(struct dpif_backer *backer)
     ofpbuf_init(&actions, 64);
     start = nl_msg_start_nested(&actions, OVS_ACTION_ATTR_USERSPACE);
     nl_msg_put_u32(&actions, OVS_USERSPACE_ATTR_PID,
-                   dpif_port_get_pid(backer->dpif, ODPP_NONE));
+                   dpif_port_get_pid(backer->dpif, ODPP_NONE, 0));
     nl_msg_put_unspec_zero(&actions, OVS_USERSPACE_ATTR_USERDATA, 4);
     nl_msg_end_nested(&actions, start);
 
-- 
1.7.9.5