[ovs-dev] [netlink v3 07/16] datapath: Report kernel's flow key when passing packets up to userspace.

Ben Pfaff blp at nicira.com
Thu Dec 30 00:56:28 UTC 2010


One of the goals for Open vSwitch is to decouple kernel and userspace
software, so that either one can be upgraded or rolled back independent of
the other.  To do this in full generality, it must be possible to change
the kernel's idea of the flow key separately from the userspace version.

This commit takes one step in that direction by making the kernel report
its idea of the flow that a packet belongs to whenever it passes a packet
up to userspace.  This means that userspace can intelligently figure out
what to do:

   - If userspace's notion of the flow for the packet matches the kernel's,
     then nothing special is necessary.

   - If the kernel has a more specific notion for the flow than userspace,
     for example if the kernel decoded IPv6 headers but userspace stopped
     at the Ethernet type (because it does not understand IPv6), then again
     nothing special is necessary: userspace can still set up the flow in
     the usual way.

   - If userspace has a more specific notion for the flow than the kernel,
     for example if userspace decoded an IPv6 header but the kernel
     stopped at the Ethernet type, then userspace can forward the packet
     manually, without setting up a flow in the kernel.  (This case is
     bad from a performance point of view, but at least it is correct.)

This commit does not actually make userspace flexible enough to handle
changes in the kernel flow key structure, although userspace does now
have enough information to do that intelligently.  This will have to wait
for later commits.

This commit is bigger than it would otherwise be because it is rolled
together with changing "struct odp_msg" to a sequence of Netlink
attributes.  The alternative, to do each of those changes in a separate
patch, seemed like overkill because it meant that either we would have to
introduce and then kill off Netlink attributes for in_port and tun_id, if
Netlink conversion went first, or shove yet another variable-length header
into the stuff already after odp_msg, if adding the flow key to odp_msg
went first.

This commit will slow down performance of checksumming packets sent up to
userspace.  I'm not entirely pleased with how I did it.  I considered a
couple of alternatives, but none of them seemed that much better.
Suggestions welcome.  Not changing anything wasn't an option,
unfortunately.  At any rate some slowdown will become unavoidable when OVS
actually starts using Netlink instead of just Netlink framing.

(Actually, I thought of one option where we could avoid that: make
userspace do the checksum instead, by passing csum_start and csum_offset as
part of what goes to userspace.  But that's not perfect either.)

Signed-off-by: Ben Pfaff <blp at nicira.com>
---
 datapath/actions.c                      |   60 +++++----
 datapath/datapath.c                     |  232 +++++++++++--------------------
 datapath/datapath.h                     |   20 +++-
 datapath/flow.c                         |    2 +-
 datapath/flow.h                         |    1 +
 include/openvswitch/datapath-protocol.h |   61 ++++-----
 lib/dpif-linux.c                        |   73 ++++++++--
 lib/dpif-netdev.c                       |   96 ++++++++-----
 lib/dpif-provider.h                     |   24 ++--
 lib/dpif.c                              |   61 ++++----
 lib/dpif.h                              |   32 +++-
 ofproto/ofproto-sflow.c                 |   47 ++-----
 ofproto/ofproto-sflow.h                 |    6 +-
 ofproto/ofproto.c                       |  225 ++++++++++++------------------
 14 files changed, 457 insertions(+), 483 deletions(-)

diff --git a/datapath/actions.c b/datapath/actions.c
index 511937e..c61e5ff 100644
--- a/datapath/actions.c
+++ b/datapath/actions.c
@@ -360,12 +360,22 @@ error:
 	kfree_skb(skb);
 }
 
-static int output_control(struct datapath *dp, struct sk_buff *skb, u64 arg)
+static int output_control(struct datapath *dp, struct sk_buff *skb, u64 arg,
+			  const struct sw_flow_key *key)
 {
+	struct dp_upcall_info upcall;
+
 	skb = skb_clone(skb, GFP_ATOMIC);
 	if (!skb)
 		return -ENOMEM;
-	return dp_output_control(dp, skb, _ODPL_ACTION_NR, arg);
+
+	upcall.type = _ODPL_ACTION_NR;
+	upcall.key = key;
+	upcall.userdata = arg;
+	upcall.sample_pool = 0;
+	upcall.actions = NULL;
+	upcall.actions_len = 0;
+	return dp_upcall(dp, skb, &upcall);
 }
 
 /* Execute a list of actions against 'skb'. */
@@ -394,7 +404,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
 			break;
 
 		case ODPAT_CONTROLLER:
-			err = output_control(dp, skb, nla_get_u64(a));
+			err = output_control(dp, skb, nla_get_u64(a), key);
 			if (err) {
 				kfree_skb(skb);
 				return err;
@@ -467,25 +477,32 @@ exit:
 	return 0;
 }
 
-/* Send a copy of this packet up to the sFlow agent, along with extra
- * information about what happened to it. */
 static void sflow_sample(struct datapath *dp, struct sk_buff *skb,
-			 const struct nlattr *a, u32 actions_len,
-			 struct vport *vport)
+			 const struct sw_flow_key *key,
+			 const struct nlattr *a, u32 actions_len)
 {
-	struct odp_sflow_sample_header *hdr;
-	unsigned int hdrlen = sizeof(struct odp_sflow_sample_header);
 	struct sk_buff *nskb;
+	struct vport *p = OVS_CB(skb)->vport;
+	struct dp_upcall_info upcall;
+
+	if (unlikely(!p))
+		return;
 
-	nskb = skb_copy_expand(skb, actions_len + hdrlen, 0, GFP_ATOMIC);
-	if (!nskb)
+	atomic_inc(&p->sflow_pool);
+	if (net_random() >= dp->sflow_probability)
 		return;
 
-	memcpy(__skb_push(nskb, actions_len), a, actions_len);
-	hdr = (struct odp_sflow_sample_header*)__skb_push(nskb, hdrlen);
-	hdr->actions_len = actions_len;
-	hdr->sample_pool = atomic_read(&vport->sflow_pool);
-	dp_output_control(dp, nskb, _ODPL_SFLOW_NR, 0);
+	nskb = skb_clone(skb, GFP_ATOMIC);
+	if (unlikely(!nskb))
+		return;
+
+	upcall.type = _ODPL_SFLOW_NR;
+	upcall.key = key;
+	upcall.userdata = 0;
+	upcall.sample_pool = atomic_read(&p->sflow_pool);
+	upcall.actions = a;
+	upcall.actions_len = actions_len;
+	dp_upcall(dp, nskb, &upcall);
 }
 
 /* Execute a list of actions against 'skb'. */
@@ -493,15 +510,8 @@ int execute_actions(struct datapath *dp, struct sk_buff *skb,
 		    const struct sw_flow_key *key,
 		    const struct nlattr *actions, u32 actions_len)
 {
-	if (dp->sflow_probability) {
-		struct vport *p = OVS_CB(skb)->vport;
-		if (p) {
-			atomic_inc(&p->sflow_pool);
-			if (dp->sflow_probability == UINT_MAX ||
-			    net_random() < dp->sflow_probability)
-				sflow_sample(dp, skb, actions, actions_len, p);
-		}
-	}
+	if (dp->sflow_probability)
+		sflow_sample(dp, skb, key, actions, actions_len);
 
 	OVS_CB(skb)->tun_id = 0;
 
diff --git a/datapath/datapath.c b/datapath/datapath.c
index 3fd7666..64b5193 100644
--- a/datapath/datapath.c
+++ b/datapath/datapath.c
@@ -510,8 +510,15 @@ void dp_process_received_packet(struct vport *p, struct sk_buff *skb)
 		flow_node = tbl_lookup(rcu_dereference(dp->table), &key,
 					flow_hash(&key), flow_cmp);
 		if (unlikely(!flow_node)) {
-			dp_output_control(dp, skb, _ODPL_MISS_NR,
-					 (__force u64)OVS_CB(skb)->tun_id);
+			struct dp_upcall_info upcall;
+
+			upcall.type = _ODPL_MISS_NR;
+			upcall.key = &key;
+			upcall.userdata = 0;
+			upcall.sample_pool = 0;
+			upcall.actions = NULL;
+			upcall.actions_len = 0;
+			dp_upcall(dp, skb, &upcall);
 			stats_counter_off = offsetof(struct dp_stats_percpu, n_missed);
 			goto out;
 		}
@@ -560,10 +567,26 @@ out:
 	local_bh_enable();
 }
 
+static void copy_and_csum_skb(struct sk_buff *skb, void *to)
+{
+	u16 csum_start, csum_offset;
+	__wsum csum;
+
+	get_skb_csum_pointers(skb, &csum_start, &csum_offset);
+	csum_start -= skb_headroom(skb);
+	BUG_ON(csum_start >= skb_headlen(skb));
+
+	skb_copy_bits(skb, 0, to, csum_start);
+
+	csum = skb_copy_and_csum_bits(skb, csum_start, to + csum_start,
+				      skb->len - csum_start, 0);
+	*(__sum16 *)(to + csum_start + csum_offset) = csum_fold(csum);
+}
+
 /* Append each packet in 'skb' list to 'queue'.  There will be only one packet
  * unless we broke up a GSO packet. */
-static int queue_control_packets(struct sk_buff *skb, struct sk_buff_head *queue,
-				 int queue_no, u64 arg)
+static int queue_control_packets(struct datapath *dp, struct sk_buff *skb,
+				 const struct dp_upcall_info *upcall_info)
 {
 	struct sk_buff *nskb;
 	int port_no;
@@ -575,22 +598,58 @@ static int queue_control_packets(struct sk_buff *skb, struct sk_buff_head *queue
 		port_no = ODPP_LOCAL;
 
 	do {
-		struct odp_msg *header;
+		struct odp_upcall *upcall;
+		struct sk_buff *user_skb; /* to be queued to userspace */
+		struct nlattr *nla;
+		unsigned int len;
 
 		nskb = skb->next;
 		skb->next = NULL;
 
-		err = skb_cow(skb, sizeof *header);
-		if (err)
+		len = sizeof(struct odp_upcall);
+		len += nla_total_size(skb->len);
+		len += nla_total_size(FLOW_BUFSIZE);
+		if (upcall_info->userdata)
+			len += nla_total_size(8);
+		if (upcall_info->sample_pool)
+			len += nla_total_size(4);
+		if (upcall_info->actions_len)
+			len += nla_total_size(upcall_info->actions_len);
+
+		user_skb = alloc_skb(len, GFP_ATOMIC);
+		if (!user_skb)
 			goto err_kfree_skbs;
 
-		header = (struct odp_msg*)__skb_push(skb, sizeof *header);
-		header->type = queue_no;
-		header->length = skb->len;
-		header->port = port_no;
-		header->arg = arg;
-		skb_queue_tail(queue, skb);
+		upcall = (struct odp_upcall *)__skb_put(user_skb, sizeof(*upcall));
+		upcall->type = upcall_info->type;
+
+		nla = nla_nest_start(user_skb, ODPUT_KEY);
+		flow_to_nlattrs(upcall_info->key, user_skb);
+		nla_nest_end(user_skb, nla);
+
+		if (upcall_info->userdata)
+			nla_put_u64(user_skb, ODPUT_USERDATA, upcall_info->userdata);
+		if (upcall_info->sample_pool)
+			nla_put_u32(user_skb, ODPUT_SAMPLE_POOL, upcall_info->sample_pool);
+		if (upcall_info->actions_len) {
+			const struct nlattr *actions = upcall_info->actions;
+			u32 actions_len = upcall_info->actions_len;
+
+			nla = nla_nest_start(user_skb, ODPUT_ACTIONS);
+			memcpy(__skb_put(user_skb, actions_len), actions, actions_len);
+			nla_nest_end(user_skb, nla);
+		}
 
+		nla = __nla_reserve(user_skb, ODPUT_PACKET, skb->len);
+		if (skb->ip_summed == CHECKSUM_PARTIAL)
+			copy_and_csum_skb(skb, nla_data(nla));
+		else
+			skb_copy_bits(skb, 0, nla_data(nla), skb->len);
+
+		upcall->length = user_skb->len;
+		skb_queue_tail(&dp->queues[upcall_info->type], user_skb);
+
+		kfree_skb(skb);
 		skb = nskb;
 	} while (skb);
 	return 0;
@@ -604,16 +663,16 @@ err_kfree_skbs:
 	return err;
 }
 
-int dp_output_control(struct datapath *dp, struct sk_buff *skb, int queue_no,
-		      u64 arg)
+int dp_upcall(struct datapath *dp, struct sk_buff *skb, const struct dp_upcall_info *upcall_info)
 {
 	struct dp_stats_percpu *stats;
 	struct sk_buff_head *queue;
 	int err;
 
 	WARN_ON_ONCE(skb_shared(skb));
-	BUG_ON(queue_no != _ODPL_MISS_NR && queue_no != _ODPL_ACTION_NR && queue_no != _ODPL_SFLOW_NR);
-	queue = &dp->queues[queue_no];
+	BUG_ON(upcall_info->type >= DP_N_QUEUES);
+
+	queue = &dp->queues[upcall_info->type];
 	err = -ENOBUFS;
 	if (skb_queue_len(queue) >= DP_MAX_QUEUE_LEN)
 		goto err_kfree_skb;
@@ -637,7 +696,7 @@ int dp_output_control(struct datapath *dp, struct sk_buff *skb, int queue_no,
 		}
 	}
 
-	err = queue_control_packets(skb, queue, queue_no, arg);
+	err = queue_control_packets(dp, skb, upcall_info);
 	wake_up_interruptible(&dp->waitqueue);
 	return err;
 
@@ -1806,100 +1865,6 @@ exit:
 }
 #endif
 
-/* Unfortunately this function is not exported so this is a verbatim copy
- * from net/core/datagram.c in 2.6.30. */
-static int skb_copy_and_csum_datagram(const struct sk_buff *skb, int offset,
-				      u8 __user *to, int len,
-				      __wsum *csump)
-{
-	int start = skb_headlen(skb);
-	int pos = 0;
-	int i, copy = start - offset;
-
-	/* Copy header. */
-	if (copy > 0) {
-		int err = 0;
-		if (copy > len)
-			copy = len;
-		*csump = csum_and_copy_to_user(skb->data + offset, to, copy,
-					       *csump, &err);
-		if (err)
-			goto fault;
-		if ((len -= copy) == 0)
-			return 0;
-		offset += copy;
-		to += copy;
-		pos = copy;
-	}
-
-	for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
-		int end;
-
-		WARN_ON(start > offset + len);
-
-		end = start + skb_shinfo(skb)->frags[i].size;
-		if ((copy = end - offset) > 0) {
-			__wsum csum2;
-			int err = 0;
-			u8  *vaddr;
-			skb_frag_t *frag = &skb_shinfo(skb)->frags[i];
-			struct page *page = frag->page;
-
-			if (copy > len)
-				copy = len;
-			vaddr = kmap(page);
-			csum2 = csum_and_copy_to_user(vaddr +
-							frag->page_offset +
-							offset - start,
-						      to, copy, 0, &err);
-			kunmap(page);
-			if (err)
-				goto fault;
-			*csump = csum_block_add(*csump, csum2, pos);
-			if (!(len -= copy))
-				return 0;
-			offset += copy;
-			to += copy;
-			pos += copy;
-		}
-		start = end;
-	}
-
-	if (skb_shinfo(skb)->frag_list) {
-		struct sk_buff *list = skb_shinfo(skb)->frag_list;
-
-		for (; list; list=list->next) {
-			int end;
-
-			WARN_ON(start > offset + len);
-
-			end = start + list->len;
-			if ((copy = end - offset) > 0) {
-				__wsum csum2 = 0;
-				if (copy > len)
-					copy = len;
-				if (skb_copy_and_csum_datagram(list,
-							       offset - start,
-							       to, copy,
-							       &csum2))
-					goto fault;
-				*csump = csum_block_add(*csump, csum2, pos);
-				if ((len -= copy) == 0)
-					return 0;
-				offset += copy;
-				to += copy;
-				pos += copy;
-			}
-			start = end;
-		}
-	}
-	if (!len)
-		return 0;
-
-fault:
-	return -EFAULT;
-}
-
 static ssize_t openvswitch_read(struct file *f, char __user *buf,
 				size_t nbytes, loff_t *ppos)
 {
@@ -1907,7 +1872,7 @@ static ssize_t openvswitch_read(struct file *f, char __user *buf,
 	int dp_idx = iminor(f->f_dentry->d_inode);
 	struct datapath *dp = get_dp_locked(dp_idx);
 	struct sk_buff *skb;
-	size_t copy_bytes, tot_copy_bytes;
+	struct iovec iov;
 	int retval;
 
 	if (!dp)
@@ -1944,44 +1909,11 @@ static ssize_t openvswitch_read(struct file *f, char __user *buf,
 success:
 	mutex_unlock(&dp->mutex);
 
-	copy_bytes = tot_copy_bytes = min_t(size_t, skb->len, nbytes);
-
-	retval = 0;
-	if (skb->ip_summed == CHECKSUM_PARTIAL) {
-		if (copy_bytes == skb->len) {
-			__wsum csum = 0;
-			u16 csum_start, csum_offset;
-
-			get_skb_csum_pointers(skb, &csum_start, &csum_offset);
-			csum_start -= skb_headroom(skb);
-
-			BUG_ON(csum_start >= skb_headlen(skb));
-			retval = skb_copy_and_csum_datagram(skb, csum_start, buf + csum_start,
-							    copy_bytes - csum_start, &csum);
-			if (!retval) {
-				__sum16 __user *csump;
-
-				copy_bytes = csum_start;
-				csump = (__sum16 __user *)(buf + csum_start + csum_offset);
-
-				BUG_ON((char __user *)csump + sizeof(__sum16) >
-					buf + nbytes);
-				put_user(csum_fold(csum), csump);
-			}
-		} else
-			retval = skb_checksum_help(skb);
-	}
-
-	if (!retval) {
-		struct iovec iov;
-
-		iov.iov_base = buf;
-		iov.iov_len = copy_bytes;
-		retval = skb_copy_datagram_iovec(skb, 0, &iov, iov.iov_len);
-	}
-
+	iov.iov_base = buf;
+	iov.iov_len = min_t(size_t, skb->len, nbytes);
+	retval = skb_copy_datagram_iovec(skb, 0, &iov, iov.iov_len);
 	if (!retval)
-		retval = tot_copy_bytes;
+		retval = skb->len;
 
 	kfree_skb(skb);
 	return retval;
diff --git a/datapath/datapath.h b/datapath/datapath.h
index e4c6534..b370caa 100644
--- a/datapath/datapath.h
+++ b/datapath/datapath.h
@@ -121,12 +121,30 @@ struct ovs_skb_cb {
 };
 #define OVS_CB(skb) ((struct ovs_skb_cb *)(skb)->cb)
 
+/**
+ * struct dp_upcall - metadata to include with a packet to send to userspace
+ * @type: One of %_ODPL_*_NR.
+ * @key: Becomes %ODPUT_KEY.  Must be nonnull.
+ * @userdata: Becomes %ODPUT_USERDATA if nonzero.
+ * @sample_pool: Becomes %ODPUT_SAMPLE_POOL if nonzero.
+ * @actions: Becomes %ODPUT_ACTIONS if nonnull.
+ * @actions_len: Number of bytes in @actions.
+*/
+struct dp_upcall_info {
+	u32 type;
+	const struct sw_flow_key *key;
+	u64 userdata;
+	u32 sample_pool;
+	const struct nlattr *actions;
+	u32 actions_len;
+};
+
 extern struct notifier_block dp_device_notifier;
 extern int (*dp_ioctl_hook)(struct net_device *dev, struct ifreq *rq, int cmd);
 
 void dp_process_received_packet(struct vport *, struct sk_buff *);
 int dp_detach_port(struct vport *);
-int dp_output_control(struct datapath *, struct sk_buff *, int, u64 arg);
+int dp_upcall(struct datapath *, struct sk_buff *, const struct dp_upcall_info *);
 int dp_min_mtu(const struct datapath *dp);
 void set_internal_devs_mtu(const struct datapath *dp);
 
diff --git a/datapath/flow.c b/datapath/flow.c
index 1c2b677..2552c0f 100644
--- a/datapath/flow.c
+++ b/datapath/flow.c
@@ -573,7 +573,7 @@ static int flow_from_nlattrs(struct sw_flow_key *swkey, const struct nlattr *key
 	return -EINVAL;
 }
 
-static u32 flow_to_nlattrs(const struct sw_flow_key *swkey, struct sk_buff *skb)
+u32 flow_to_nlattrs(const struct sw_flow_key *swkey, struct sk_buff *skb)
 {
 	struct odp_key_ethernet *eth_key;
 
diff --git a/datapath/flow.h b/datapath/flow.h
index f09f76c..5e5ae5e 100644
--- a/datapath/flow.h
+++ b/datapath/flow.h
@@ -100,6 +100,7 @@ int flow_cmp(const struct tbl_node *, void *target);
  */
 #define FLOW_BUFSIZE 96
 
+u32 flow_to_nlattrs(const struct sw_flow_key *, struct sk_buff *);
 int flow_copy_from_user(struct sw_flow_key *, const struct nlattr __user *ukey, u32 key_len);
 int flow_copy_to_user(struct nlattr __user *ukey, const struct sw_flow_key *, u32 key_len);
 
diff --git a/include/openvswitch/datapath-protocol.h b/include/openvswitch/datapath-protocol.h
index 273ca98..aee83c5 100644
--- a/include/openvswitch/datapath-protocol.h
+++ b/include/openvswitch/datapath-protocol.h
@@ -141,47 +141,36 @@ struct odp_stats {
 #define ODPL_SFLOW      (1 << _ODPL_SFLOW_NR)
 #define ODPL_ALL        (ODPL_MISS | ODPL_ACTION | ODPL_SFLOW)
 
-/**
- * struct odp_msg - format of messages read from datapath fd.
- * @length: Total length of message, including this header.
- * @type: One of the %_ODPL_* constants.
- * @port: Port that received the packet embedded in this message.
- * @arg: Argument value whose meaning depends on @type.
- *
- * For @type == %_ODPL_MISS_NR, the header is followed by packet data.  The
- * @arg member is the ID (in network byte order) of the tunnel that
- * encapsulated this packet. It is 0 if the packet was not received on a tunnel.
- *
- * For @type == %_ODPL_ACTION_NR, the header is followed by packet data.  The
- * @arg member is copied from the %ODPAT_CONTROLLER action that caused the
- * &struct odp_msg to be composed.
- *
- * For @type == %_ODPL_SFLOW_NR, the header is followed by &struct
- * odp_sflow_sample_header, then by a series of Netlink attributes (whose
- * length is specified in &struct odp_sflow_sample_header), then by packet
- * data.
- */
-struct odp_msg {
-    uint32_t length;
-    uint16_t type;
-    uint16_t port;
-    __aligned_u64 arg;
+enum odp_upcall_type {
+	ODPUT_UNSPEC,
+	ODPUT_PACKET,		/* Packet data. */
+	ODPUT_KEY,		/* Nested ODPKT_* attributes with flow key. */
+	ODPUT_USERDATA,		/* 64-bit data from ODPAT_CONTROLLER. */
+	ODPUT_SAMPLE_POOL,	/* # of sampling candidate packets so far. */
+	ODPUT_ACTIONS,		/* Nested ODPAT_* attributes with actions. */
+	__ODPUT_MAX,
 };
 
+#define ODPUT_MAX (__ODPUT_MAX - 1)
+
 /**
- * struct odp_sflow_sample_header - header added to sFlow sampled packet.
- * @sample_pool: Number of packets that were candidates for sFlow sampling,
- * regardless of whether they were actually chosen and sent down to userspace.
- * @actions_len: Number of bytes of actions immediately following this header.
+ * struct odp_upcall - header for packets passed up from kernel to userspace.
+ * @length: Length of complete message, including this header.
+ * @type: One of the %_ODPL_* constants.
+ *
+ * The header is followed by a sequence of Netlink attributes.  The
+ * %ODPUT_PACKET and %ODPUT_KEY attributes are always present.  When @type ==
+ * %_ODPL_ACTION_NR, the %ODPUT_USERDATA attribute is included if it would be
+ * nonzero.  When @type == %_ODPL_SFLOW_NR, the %ODPUT_SAMPLE_POOL and
+ * %ODPUT_ACTIONS attributes are included.
  *
- * This header follows &struct odp_msg when that structure's @type is
- * %_ODPL_SFLOW_NR, and it is itself followed by a series of Netlink attributes
- * (the number of bytes of which is specified in @actions_len) and then by
- * packet data.
+ * For @type of %_ODPL_ACTION_NR, %ODPUT_PACKET reflects changes made by
+ * actions preceding %ODPAT_CONTROLLER, but %ODPUT_KEY is the flow key
+ * extracted from the packet as originally received.
  */
-struct odp_sflow_sample_header {
-    uint32_t sample_pool;
-    uint32_t actions_len;
+struct odp_upcall {
+	uint32_t length;
+	uint32_t type;
 };
 
 #define VPORT_TYPE_SIZE     16
diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
index 6664145..ea4cb75 100644
--- a/lib/dpif-linux.c
+++ b/lib/dpif-linux.c
@@ -36,8 +36,10 @@
 #include "dpif-provider.h"
 #include "netdev.h"
 #include "netdev-vport.h"
+#include "netlink.h"
 #include "ofpbuf.h"
 #include "openvswitch/tunnel.h"
+#include "packets.h"
 #include "poll-loop.h"
 #include "rtnetlink.h"
 #include "shash.h"
@@ -470,14 +472,59 @@ dpif_linux_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
 }
 
 static int
-dpif_linux_recv(struct dpif *dpif_, struct ofpbuf **bufp)
+parse_odp_upcall(struct ofpbuf *buf, struct dpif_upcall *upcall)
+{
+    static const struct nl_policy odp_upcall_policy[] = {
+        /* Always present. */
+        [ODPUT_PACKET] = { .type = NL_A_UNSPEC, .min_len = ETH_HEADER_LEN },
+        [ODPUT_KEY] = { .type = NL_A_NESTED },
+
+        /* _ODPL_ACTION_NR only. */
+        [ODPUT_USERDATA] = { .type = NL_A_U64, .optional = true },
+
+        /* _ODPL_SFLOW_NR only. */
+        [ODPUT_SAMPLE_POOL] = { .type = NL_A_U32, .optional = true },
+        [ODPUT_ACTIONS] = { .type = NL_A_NESTED, .optional = true },
+    };
+
+    struct odp_upcall *odp_upcall = buf->data;
+    struct nlattr *a[ARRAY_SIZE(odp_upcall_policy)];
+
+    if (!nl_policy_parse(buf, sizeof *odp_upcall, odp_upcall_policy,
+                         a, ARRAY_SIZE(odp_upcall_policy))) {
+        return EINVAL;
+    }
+
+    memset(upcall, 0, sizeof *upcall);
+    upcall->type = odp_upcall->type;
+    upcall->packet = buf;
+    upcall->packet->data = (void *) nl_attr_get(a[ODPUT_PACKET]);
+    upcall->packet->size = nl_attr_get_size(a[ODPUT_PACKET]);
+    upcall->key = (void *) nl_attr_get(a[ODPUT_KEY]);
+    upcall->key_len = nl_attr_get_size(a[ODPUT_KEY]);
+    upcall->userdata = (a[ODPUT_USERDATA]
+                        ? nl_attr_get_u64(a[ODPUT_USERDATA])
+                        : 0);
+    upcall->sample_pool = (a[ODPUT_SAMPLE_POOL]
+                        ? nl_attr_get_u32(a[ODPUT_SAMPLE_POOL])
+                           : 0);
+    if (a[ODPUT_ACTIONS]) {
+        upcall->actions = (void *) nl_attr_get(a[ODPUT_ACTIONS]);
+        upcall->actions_len = nl_attr_get_size(a[ODPUT_ACTIONS]);
+    }
+
+    return 0;
+}
+
+static int
+dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     struct ofpbuf *buf;
     int retval;
     int error;
 
-    buf = ofpbuf_new_with_headroom(65536, DPIF_RECV_MSG_PADDING);
+    buf = ofpbuf_new(65536);
     retval = read(dpif->fd, ofpbuf_tail(buf), ofpbuf_tailroom(buf));
     if (retval < 0) {
         error = errno;
@@ -485,30 +532,30 @@ dpif_linux_recv(struct dpif *dpif_, struct ofpbuf **bufp)
             VLOG_WARN_RL(&error_rl, "%s: read failed: %s",
                          dpif_name(dpif_), strerror(error));
         }
-    } else if (retval >= sizeof(struct odp_msg)) {
-        struct odp_msg *msg = buf->data;
-        if (msg->length <= retval) {
-            buf->size += retval;
-            *bufp = buf;
-            return 0;
+    } else if (retval >= sizeof(struct odp_upcall)) {
+        struct odp_upcall *odp_upcall = buf->data;
+        buf->size += retval;
+
+        if (odp_upcall->length <= retval) {
+            error = parse_odp_upcall(buf, upcall);
         } else {
             VLOG_WARN_RL(&error_rl, "%s: discarding message truncated "
                          "from %"PRIu32" bytes to %d",
-                         dpif_name(dpif_), msg->length, retval);
+                         dpif_name(dpif_), odp_upcall->length, retval);
             error = ERANGE;
         }
     } else if (!retval) {
         VLOG_WARN_RL(&error_rl, "%s: unexpected end of file", dpif_name(dpif_));
         error = EPROTO;
     } else {
-        VLOG_WARN_RL(&error_rl,
-                     "%s: discarding too-short message (%d bytes)",
+        VLOG_WARN_RL(&error_rl, "%s: discarding too-short message (%d bytes)",
                      dpif_name(dpif_), retval);
         error = ERANGE;
     }
 
-    *bufp = NULL;
-    ofpbuf_delete(buf);
+    if (error) {
+        ofpbuf_delete(buf);
+    }
     return error;
 }
 
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 7e9e36c..788eac2 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -55,8 +55,6 @@
 VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 
 /* Configuration parameters. */
-enum { N_QUEUES = 2 };          /* Number of queues for dpif_recv(). */
-enum { MAX_QUEUE_LEN = 100 };   /* Maximum number of packets per queue. */
 enum { MAX_PORTS = 256 };       /* Maximum number of ports. */
 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 
@@ -64,6 +62,17 @@ enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
  * headers to be aligned on a 4-byte boundary.  */
 enum { DP_NETDEV_HEADROOM = 2 + VLAN_HEADER_LEN };
 
+/* Queues. */
+enum { N_QUEUES = 2 };          /* Number of queues for dpif_recv(). */
+enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
+enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
+BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
+
+struct dp_netdev_queue {
+    struct dpif_upcall *upcalls[MAX_QUEUE_LEN];
+    unsigned int head, tail;
+};
+
 /* Datapath based on the network device interface from netdev.h. */
 struct dp_netdev {
     const struct dpif_class *class;
@@ -72,8 +81,7 @@ struct dp_netdev {
     bool destroyed;
 
     bool drop_frags;            /* Drop all IP fragments, if true. */
-    struct list queues[N_QUEUES]; /* Contain ofpbufs queued for dpif_recv(). */
-    size_t queue_len[N_QUEUES]; /* Number of packets in each queue. */
+    struct dp_netdev_queue queues[N_QUEUES];
     struct hmap flow_table;     /* Flow table. */
 
     /* Statistics. */
@@ -139,7 +147,8 @@ static int do_del_port(struct dp_netdev *, uint16_t port_no);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
 static int dp_netdev_output_control(struct dp_netdev *, const struct ofpbuf *,
-                                    int queue_no, int port_no, uint64_t arg);
+                                    int queue_no, const struct flow *,
+                                    uint64_t arg);
 static int dp_netdev_execute_actions(struct dp_netdev *,
                                      struct ofpbuf *, struct flow *,
                                      const struct nlattr *actions,
@@ -191,7 +200,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     dp->open_cnt = 0;
     dp->drop_frags = false;
     for (i = 0; i < N_QUEUES; i++) {
-        list_init(&dp->queues[i]);
+        dp->queues[i].head = dp->queues[i].tail = 0;
     }
     hmap_init(&dp->flow_table);
     list_init(&dp->port_list);
@@ -248,7 +257,15 @@ dp_netdev_free(struct dp_netdev *dp)
         do_del_port(dp, port->port_no);
     }
     for (i = 0; i < N_QUEUES; i++) {
-        ofpbuf_list_delete(&dp->queues[i]);
+        struct dp_netdev_queue *q = &dp->queues[i];
+        unsigned int j;
+
+        for (j = q->tail; j != q->head; j++) {
+            struct dpif_upcall *upcall = q->upcalls[j & QUEUE_MASK];
+
+            ofpbuf_delete(upcall->packet);
+            free(upcall);
+        }
     }
     hmap_destroy(&dp->flow_table);
     free(dp->name);
@@ -931,7 +948,7 @@ dpif_netdev_recv_set_mask(struct dpif *dpif, int listen_mask)
     }
 }
 
-static int
+static struct dp_netdev_queue *
 find_nonempty_queue(struct dpif *dpif)
 {
     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
@@ -940,23 +957,22 @@ find_nonempty_queue(struct dpif *dpif)
     int i;
 
     for (i = 0; i < N_QUEUES; i++) {
-        struct list *queue = &dp->queues[i];
-        if (!list_is_empty(queue) && mask & (1u << i)) {
-            return i;
+        struct dp_netdev_queue *q = &dp->queues[i];
+        if (q->head != q->tail && mask & (1u << i)) {
+            return q;
         }
     }
-    return -1;
+    return NULL;
 }
 
 static int
-dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp)
+dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall)
 {
-    int queue_idx = find_nonempty_queue(dpif);
-    if (queue_idx >= 0) {
-        struct dp_netdev *dp = get_dp_netdev(dpif);
-
-        *bufp = ofpbuf_from_list(list_pop_front(&dp->queues[queue_idx]));
-        dp->queue_len[queue_idx]--;
+    struct dp_netdev_queue *q = find_nonempty_queue(dpif);
+    if (q) {
+        struct dpif_upcall *u = q->upcalls[q->tail++ & QUEUE_MASK];
+        *upcall = *u;
+        free(u);
 
         return 0;
     } else {
@@ -967,7 +983,7 @@ dpif_netdev_recv(struct dpif *dpif, struct ofpbuf **bufp)
 static void
 dpif_netdev_recv_wait(struct dpif *dpif)
 {
-    if (find_nonempty_queue(dpif) >= 0) {
+    if (find_nonempty_queue(dpif)) {
         poll_immediate_wake();
     } else {
         /* No messages ready to be received, and dp_wait() will ensure that we
@@ -1011,7 +1027,7 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
         dp->n_hit++;
     } else {
         dp->n_missed++;
-        dp_netdev_output_control(dp, packet, _ODPL_MISS_NR, port->port_no, 0);
+        dp_netdev_output_control(dp, packet, _ODPL_MISS_NR, &key, 0);
     }
 }
 
@@ -1212,27 +1228,33 @@ dp_netdev_output_port(struct dp_netdev *dp, struct ofpbuf *packet,
 
 static int
 dp_netdev_output_control(struct dp_netdev *dp, const struct ofpbuf *packet,
-                         int queue_no, int port_no, uint64_t arg)
+                         int queue_no, const struct flow *flow, uint64_t arg)
 {
-    struct odp_msg *header;
-    struct ofpbuf *msg;
-    size_t msg_size;
+    struct dp_netdev_queue *q = &dp->queues[queue_no];
+    struct dpif_upcall *upcall;
+    struct ofpbuf *buf;
+    size_t key_len;
 
-    if (dp->queue_len[queue_no] >= MAX_QUEUE_LEN) {
+    if (q->head - q->tail >= MAX_QUEUE_LEN) {
         dp->n_lost++;
         return ENOBUFS;
     }
 
-    msg_size = sizeof *header + packet->size;
-    msg = ofpbuf_new_with_headroom(msg_size, DPIF_RECV_MSG_PADDING);
-    header = ofpbuf_put_uninit(msg, sizeof *header);
-    header->type = queue_no;
-    header->length = msg_size;
-    header->port = port_no;
-    header->arg = arg;
-    ofpbuf_put(msg, packet->data, packet->size);
-    list_push_back(&dp->queues[queue_no], &msg->list_node);
-    dp->queue_len[queue_no]++;
+    buf = ofpbuf_new(ODPUTIL_FLOW_KEY_BYTES + 2 + packet->size);
+    odp_flow_key_from_flow(buf, flow);
+    key_len = buf->size;
+    ofpbuf_pull(buf, key_len);
+    ofpbuf_reserve(buf, 2);
+    ofpbuf_put(buf, packet->data, packet->size);
+
+    upcall = xzalloc(sizeof *upcall);
+    upcall->type = queue_no;
+    upcall->packet = buf;
+    upcall->key = buf->base;
+    upcall->key_len = key_len;
+    upcall->userdata = arg;
+
+    q->upcalls[++q->head & QUEUE_MASK] = upcall;
 
     return 0;
 }
@@ -1282,7 +1304,7 @@ dp_netdev_execute_actions(struct dp_netdev *dp,
 
         case ODPAT_CONTROLLER:
             dp_netdev_output_control(dp, packet, _ODPL_ACTION_NR,
-                                     key->in_port, nl_attr_get_u64(a));
+                                     key, nl_attr_get_u64(a));
             break;
 
         case ODPAT_SET_DL_TCI:
diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
index 6b66a5d..bb7b2a8 100644
--- a/lib/dpif-provider.h
+++ b/lib/dpif-provider.h
@@ -308,16 +308,22 @@ struct dpif_class {
     int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id,
                              uint32_t *priority);
 
-    /* Attempts to receive a message from 'dpif'.  If successful, stores the
-     * message into '*packetp'.  The message, if one is received, must begin
-     * with 'struct odp_msg' as a header, and must have at least
-     * DPIF_RECV_MSG_PADDING bytes of headroom (allocated using
-     * e.g. ofpbuf_reserve()).  Only messages of the types selected with the
-     * set_listen_mask member function should be received.
+    /* Polls for an upcall from 'dpif'.  If successful, stores the upcall into
+     * '*upcall'.  Only upcalls of the types selected with the set_listen_mask
+     * member function should be received.
      *
-     * This function must not block.  If no message is ready to be received
-     * when it is called, it should return EAGAIN without blocking. */
-    int (*recv)(struct dpif *dpif, struct ofpbuf **packetp);
+     * The caller takes ownership of the data that 'upcall' points to.
+     * 'upcall->key' and 'upcall->actions' (if nonnull) point into data owned
+     * by 'upcall->packet', so their memory cannot be freed separately.  (This
+     * is hardly a great way to do things but it works out OK for the dpif
+     * providers that exist so far.)
+     *
+     * For greatest efficiency, 'upcall->packet' should have at least
+     * offsetof(struct ofp_packet_in, data) bytes of headroom.
+     *
+     * This function must not block.  If no upcall is pending when it is
+     * called, it should return EAGAIN without blocking. */
+    int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall);
 
     /* Arranges for the poll loop to wake up when 'dpif' has a message queued
      * to be received with the recv member function. */
diff --git a/lib/dpif.c b/lib/dpif.c
index 73c92cc..b54ecaa 100644
--- a/lib/dpif.c
+++ b/lib/dpif.c
@@ -959,39 +959,38 @@ dpif_set_sflow_probability(struct dpif *dpif, uint32_t probability)
     return error;
 }
 
-/* Attempts to receive a message from 'dpif'.  If successful, stores the
- * message into '*packetp'.  The message, if one is received, will begin with
- * 'struct odp_msg' as a header, and will have at least DPIF_RECV_MSG_PADDING
- * bytes of headroom.  Only messages of the types selected with
- * dpif_set_listen_mask() will ordinarily be received (but if a message type is
+/* Polls for an upcall from 'dpif'.  If successful, stores the upcall into
+ * '*upcall'.  Only upcalls of the types selected with the set_listen_mask
+ * member function will ordinarily be received (but if a message type is
  * enabled and then later disabled, some stragglers might pop up).
  *
+ * The caller takes ownership of the data that 'upcall' points to.
+ * 'upcall->key' and 'upcall->actions' (if nonnull) point into data owned by
+ * 'upcall->packet', so their memory cannot be freed separately.  (This is
+ * hardly a great way to do things but it works out OK for the dpif providers
+ * and clients that exist so far.)
+ *
  * Returns 0 if successful, otherwise a positive errno value.  Returns EAGAIN
- * if no message is immediately available. */
+ * if no upcall is immediately available. */
 int
-dpif_recv(struct dpif *dpif, struct ofpbuf **packetp)
+dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall)
 {
-    int error = dpif->dpif_class->recv(dpif, packetp);
-    if (!error) {
-        struct ofpbuf *buf = *packetp;
-
-        assert(ofpbuf_headroom(buf) >= DPIF_RECV_MSG_PADDING);
-        if (VLOG_IS_DBG_ENABLED()) {
-            struct odp_msg *msg = buf->data;
-            void *payload = msg + 1;
-            size_t payload_len = buf->size - sizeof *msg;
-            char *s = ofp_packet_to_string(payload, payload_len, payload_len);
-            VLOG_DBG_RL(&dpmsg_rl, "%s: received %s message of length "
-                        "%zu on port %"PRIu16": %s", dpif_name(dpif),
-                        (msg->type == _ODPL_MISS_NR ? "miss"
-                         : msg->type == _ODPL_ACTION_NR ? "action"
-                         : msg->type == _ODPL_SFLOW_NR ? "sFlow"
-                         : "<unknown>"),
-                        payload_len, msg->port, s);
-            free(s);
-        }
-    } else {
-        *packetp = NULL;
+    int error = dpif->dpif_class->recv(dpif, upcall);
+    if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
+        struct flow flow;
+        char *s;
+
+        s = ofp_packet_to_string(upcall->packet->data,
+                                 upcall->packet->size, upcall->packet->size);
+        odp_flow_key_to_flow(upcall->key, upcall->key_len, &flow);
+
+        VLOG_DBG("%s: %s upcall on port %"PRIu16": %s", dpif_name(dpif),
+                 (upcall->type == _ODPL_MISS_NR ? "miss"
+                  : upcall->type == _ODPL_ACTION_NR ? "action"
+                  : upcall->type == _ODPL_SFLOW_NR ? "sFlow"
+                  : "<unknown>"),
+                 flow.in_port, s);
+        free(s);
     }
     return error;
 }
@@ -1013,12 +1012,12 @@ dpif_recv_purge(struct dpif *dpif)
     }
 
     for (i = 0; i < stats.max_miss_queue + stats.max_action_queue + stats.max_sflow_queue; i++) {
-        struct ofpbuf *buf;
-        error = dpif_recv(dpif, &buf);
+        struct dpif_upcall upcall;
+        error = dpif_recv(dpif, &upcall);
         if (error) {
             return error == EAGAIN ? 0 : error;
         }
-        ofpbuf_delete(buf);
+        ofpbuf_delete(upcall.packet);
     }
     return 0;
 }
diff --git a/lib/dpif.h b/lib/dpif.h
index 0a41b77..3e72539 100644
--- a/lib/dpif.h
+++ b/lib/dpif.h
@@ -92,19 +92,35 @@ int dpif_flow_dump_done(struct dpif_flow_dump *);
 int dpif_execute(struct dpif *, const struct nlattr *actions,
                  size_t actions_len, const struct ofpbuf *);
 
-/* Minimum number of bytes of headroom for a packet returned by dpif_recv()
- * member function.  This headroom allows "struct odp_msg" to be replaced by
- * "struct ofp_packet_in" without copying the buffer. */
-#define DPIF_RECV_MSG_PADDING \
-        ROUND_UP(sizeof(struct ofp_packet_in) - sizeof(struct odp_msg), 8)
-BUILD_ASSERT_DECL(sizeof(struct ofp_packet_in) > sizeof(struct odp_msg));
-BUILD_ASSERT_DECL(DPIF_RECV_MSG_PADDING % 8 == 0);
+/* A packet passed up from the datapath to userspace.
+ *
+ * If 'key' or 'actions' is nonnull, then it points into data owned by
+ * 'packet', so their memory cannot be freed separately.  (This is hardly a
+ * great way to do things but it works out OK for the dpif providers and
+ * clients that exist so far.)
+ */
+struct dpif_upcall {
+    uint32_t type;              /* One of _ODPL_*_NR. */
+
+    /* All types. */
+    struct ofpbuf *packet;      /* Packet data. */
+    struct nlattr *key;         /* Flow key. */
+    size_t key_len;             /* Length of 'key' in bytes. */
+
+    /* _ODPL_ACTION_NR only. */
+    uint64_t userdata;          /* Argument to ODPAT_CONTROLLER. */
+
+    /* _ODPL_SFLOW_NR only. */
+    uint32_t sample_pool;       /* # of sampling candidate packets so far. */
+    struct nlattr *actions;     /* Associated flow actions. */
+    size_t actions_len;
+};
 
 int dpif_recv_get_mask(const struct dpif *, int *listen_mask);
 int dpif_recv_set_mask(struct dpif *, int listen_mask);
 int dpif_get_sflow_probability(const struct dpif *, uint32_t *probability);
 int dpif_set_sflow_probability(struct dpif *, uint32_t probability);
-int dpif_recv(struct dpif *, struct ofpbuf **);
+int dpif_recv(struct dpif *, struct dpif_upcall *);
 int dpif_recv_purge(struct dpif *);
 void dpif_recv_wait(struct dpif *);
 
diff --git a/ofproto/ofproto-sflow.c b/ofproto/ofproto-sflow.c
index 77119fd..b3206c1 100644
--- a/ofproto/ofproto-sflow.c
+++ b/ofproto/ofproto-sflow.c
@@ -477,46 +477,24 @@ ofproto_sflow_odp_port_to_ifindex(const struct ofproto_sflow *os,
 }
 
 void
-ofproto_sflow_received(struct ofproto_sflow *os, struct odp_msg *msg)
+ofproto_sflow_received(struct ofproto_sflow *os,
+                       const struct dpif_upcall *upcall,
+                       const struct flow *flow)
 {
     SFL_FLOW_SAMPLE_TYPE fs;
     SFLFlow_sample_element hdrElem;
     SFLSampled_header *header;
     SFLFlow_sample_element switchElem;
     SFLSampler *sampler;
-    const struct odp_sflow_sample_header *hdr;
-    const struct nlattr *actions, *a;
     unsigned int left;
-    struct ofpbuf b;
+    struct nlattr *a;
     size_t n_outputs;
-    struct flow flow;
-
-    /* Pull odp_msg header. */
-    ofpbuf_use_const(&b, msg, msg->length);
-    ofpbuf_pull(&b, sizeof *msg);
-
-    /* Pull odp_sflow_sample_header. */
-    hdr = ofpbuf_try_pull(&b, sizeof *hdr);
-    if (!hdr) {
-        VLOG_WARN_RL(&rl, "missing odp_sflow_sample_header");
-        return;
-    }
-
-    /* Pull actions. */
-    actions = ofpbuf_try_pull(&b, hdr->actions_len);
-    if (!actions) {
-        VLOG_WARN_RL(&rl, "missing odp actions");
-        return;
-    }
-
-    /* Now only the payload is left. */
-    flow_extract(&b, 0, msg->port, &flow);
 
     /* Build a flow sample */
     memset(&fs, 0, sizeof fs);
-    fs.input = ofproto_sflow_odp_port_to_ifindex(os, msg->port);
+    fs.input = ofproto_sflow_odp_port_to_ifindex(os, flow->in_port);
     fs.output = 0;              /* Filled in correctly below. */
-    fs.sample_pool = hdr->sample_pool;
+    fs.sample_pool = upcall->sample_pool;
 
     /* We are going to give it to the sampler that represents this input port.
      * By implementing "ingress-only" sampling like this we ensure that we
@@ -535,17 +513,18 @@ ofproto_sflow_received(struct ofproto_sflow *os, struct odp_msg *msg)
     header->header_protocol = SFLHEADER_ETHERNET_ISO8023;
     /* The frame_length should include the Ethernet FCS (4 bytes),
        but it has already been stripped,  so we need to add 4 here. */
-    header->frame_length = b.size + 4;
+    header->frame_length = upcall->packet->size + 4;
     /* Ethernet FCS stripped off. */
     header->stripped = 4;
-    header->header_length = MIN(b.size, sampler->sFlowFsMaximumHeaderSize);
-    header->header_bytes = b.data;
+    header->header_length = MIN(upcall->packet->size,
+                                sampler->sFlowFsMaximumHeaderSize);
+    header->header_bytes = upcall->packet->data;
 
     /* Add extended switch element. */
     memset(&switchElem, 0, sizeof(switchElem));
     switchElem.tag = SFLFLOW_EX_SWITCH;
-    switchElem.flowType.sw.src_vlan = vlan_tci_to_vid(flow.vlan_tci);
-    switchElem.flowType.sw.src_priority = vlan_tci_to_pcp(flow.vlan_tci);
+    switchElem.flowType.sw.src_vlan = vlan_tci_to_vid(flow->vlan_tci);
+    switchElem.flowType.sw.src_priority = vlan_tci_to_pcp(flow->vlan_tci);
      /* Initialize the output VLAN and priority to be the same as the input,
         but these fields can be overriden below if affected by an action. */
     switchElem.flowType.sw.dst_vlan = switchElem.flowType.sw.src_vlan;
@@ -553,7 +532,7 @@ ofproto_sflow_received(struct ofproto_sflow *os, struct odp_msg *msg)
 
     /* Figure out the output ports. */
     n_outputs = 0;
-    NL_ATTR_FOR_EACH_UNSAFE (a, left, actions, hdr->actions_len) {
+    NL_ATTR_FOR_EACH_UNSAFE (a, left, upcall->actions, upcall->actions_len) {
         ovs_be16 tci;
 
         switch (nl_attr_type(a)) {
diff --git a/ofproto/ofproto-sflow.h b/ofproto/ofproto-sflow.h
index df89765..6f9a93d 100644
--- a/ofproto/ofproto-sflow.h
+++ b/ofproto/ofproto-sflow.h
@@ -22,7 +22,8 @@
 #include "svec.h"
 
 struct dpif;
-struct odp_msg;
+struct dpif_upcall;
+struct flow;
 struct ofproto_sflow_options;
 
 struct ofproto_sflow *ofproto_sflow_create(struct dpif *);
@@ -39,6 +40,7 @@ void ofproto_sflow_del_port(struct ofproto_sflow *, uint16_t odp_port);
 void ofproto_sflow_run(struct ofproto_sflow *);
 void ofproto_sflow_wait(struct ofproto_sflow *);
 
-void ofproto_sflow_received(struct ofproto_sflow *, struct odp_msg *);
+void ofproto_sflow_received(struct ofproto_sflow *,
+                            const struct dpif_upcall *, const struct flow *);
 
 #endif /* ofproto/ofproto-sflow.h */
diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
index f9b86f7..87e9d46 100644
--- a/ofproto/ofproto.c
+++ b/ofproto/ofproto.c
@@ -340,8 +340,9 @@ static void ofconn_set_rate_limit(struct ofconn *, int rate, int burst);
 static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
                      struct rconn_packet_counter *counter);
 
-static void send_packet_in(struct ofproto *, struct ofpbuf *odp_msg);
-static void do_send_packet_in(struct ofpbuf *odp_msg, void *ofconn);
+static void send_packet_in(struct ofproto *, struct dpif_upcall *,
+                           const struct flow *, bool clone);
+static void do_send_packet_in(struct ofpbuf *ofp_packet_in, void *ofconn);
 
 struct ofproto {
     /* Settings. */
@@ -412,7 +413,7 @@ static uint64_t pick_fallback_dpid(void);
 
 static int ofproto_expire(struct ofproto *);
 
-static void handle_odp_msg(struct ofproto *, struct ofpbuf *);
+static void handle_upcall(struct ofproto *, struct dpif_upcall *);
 
 static void handle_openflow(struct ofconn *, struct ofpbuf *);
 
@@ -1173,9 +1174,9 @@ ofproto_run1(struct ofproto *p)
     }
 
     for (i = 0; i < 50; i++) {
-        struct ofpbuf *buf;
+        struct dpif_upcall packet;
 
-        error = dpif_recv(p->dpif, &buf);
+        error = dpif_recv(p->dpif, &packet);
         if (error) {
             if (error == ENODEV) {
                 /* Someone destroyed the datapath behind our back.  The caller
@@ -1189,7 +1190,7 @@ ofproto_run1(struct ofproto *p)
             break;
         }
 
-        handle_odp_msg(p, buf);
+        handle_upcall(p, &packet);
     }
 
     while ((error = dpif_port_poll(p->dpif, &devname)) != EAGAIN) {
@@ -2050,7 +2051,7 @@ rule_has_out_port(const struct rule *rule, ovs_be16 out_port)
  *
  * Takes ownership of 'packet'. */
 static bool
-execute_odp_actions(struct ofproto *ofproto, uint16_t in_port,
+execute_odp_actions(struct ofproto *ofproto, const struct flow *flow,
                     const struct nlattr *odp_actions, size_t actions_len,
                     struct ofpbuf *packet)
 {
@@ -2059,15 +2060,18 @@ execute_odp_actions(struct ofproto *ofproto, uint16_t in_port,
         /* As an optimization, avoid a round-trip from userspace to kernel to
          * userspace.  This also avoids possibly filling up kernel packet
          * buffers along the way. */
-        struct odp_msg *msg;
+        struct dpif_upcall upcall;
 
-        msg = ofpbuf_push_uninit(packet, sizeof *msg);
-        msg->type = _ODPL_ACTION_NR;
-        msg->length = sizeof(struct odp_msg) + packet->size;
-        msg->port = in_port;
-        msg->arg = nl_attr_get_u64(odp_actions);
+        upcall.type = _ODPL_ACTION_NR;
+        upcall.packet = packet;
+        upcall.key = NULL;
+        upcall.key_len = 0;
+        upcall.userdata = nl_attr_get_u64(odp_actions);
+        upcall.sample_pool = 0;
+        upcall.actions = NULL;
+        upcall.actions_len = 0;
 
-        send_packet_in(ofproto, packet);
+        send_packet_in(ofproto, &upcall, flow, false);
 
         return true;
     } else {
@@ -2100,7 +2104,7 @@ facet_execute(struct ofproto *ofproto, struct facet *facet,
     assert(ofpbuf_headroom(packet) >= sizeof(struct ofp_packet_in));
 
     flow_extract_stats(&facet->flow, packet, &stats);
-    if (execute_odp_actions(ofproto, facet->flow.in_port,
+    if (execute_odp_actions(ofproto, &facet->flow,
                             facet->actions, facet->actions_len, packet)) {
         facet_update_stats(ofproto, facet, &stats);
         facet->used = time_msec();
@@ -2152,7 +2156,7 @@ rule_execute(struct ofproto *ofproto, struct rule *rule, uint16_t in_port,
     action_xlate_ctx_init(&ctx, ofproto, &flow, packet);
     odp_actions = xlate_actions(&ctx, rule->actions, rule->n_actions);
     size = packet->size;
-    if (execute_odp_actions(ofproto, in_port, odp_actions->data,
+    if (execute_odp_actions(ofproto, &flow, odp_actions->data,
                             odp_actions->size, packet)) {
         rule->used = time_msec();
         rule->packet_count++;
@@ -4340,29 +4344,26 @@ handle_openflow(struct ofconn *ofconn, struct ofpbuf *ofp_msg)
 }
 
 static void
-handle_odp_miss_msg(struct ofproto *p, struct ofpbuf *packet)
+handle_miss_upcall(struct ofproto *p, struct dpif_upcall *upcall)
 {
-    struct odp_msg *msg = packet->data;
-    struct ofpbuf payload;
     struct facet *facet;
     struct flow flow;
 
-    ofpbuf_use_const(&payload, msg + 1, msg->length - sizeof *msg);
-    flow_extract(&payload, msg->arg, msg->port, &flow);
+    /* Obtain in_port and tun_id, at least. */
+    odp_flow_key_to_flow(upcall->key, upcall->key_len, &flow);
 
-    packet->l2 = payload.l2;
-    packet->l3 = payload.l3;
-    packet->l4 = payload.l4;
-    packet->l7 = payload.l7;
+    /* Set header pointers in 'flow'. */
+    flow_extract(upcall->packet, flow.tun_id, flow.in_port, &flow);
 
     /* Check with in-band control to see if this packet should be sent
      * to the local port regardless of the flow table. */
-    if (in_band_msg_in_hook(p->in_band, &flow, &payload)) {
+    if (in_band_msg_in_hook(p->in_band, &flow, upcall->packet)) {
         struct ofpbuf odp_actions;
 
         ofpbuf_init(&odp_actions, 32);
         nl_msg_put_u32(&odp_actions, ODPAT_OUTPUT, ODPP_LOCAL);
-        dpif_execute(p->dpif, odp_actions.data, odp_actions.size, &payload);
+        dpif_execute(p->dpif, odp_actions.data, odp_actions.size,
+                     upcall->packet);
         ofpbuf_uninit(&odp_actions);
     }
 
@@ -4371,29 +4372,29 @@ handle_odp_miss_msg(struct ofproto *p, struct ofpbuf *packet)
         struct rule *rule = rule_lookup(p, &flow);
         if (!rule) {
             /* Don't send a packet-in if OFPPC_NO_PACKET_IN asserted. */
-            struct ofport *port = get_port(p, msg->port);
+            struct ofport *port = get_port(p, flow.in_port);
             if (port) {
                 if (port->opp.config & OFPPC_NO_PACKET_IN) {
                     COVERAGE_INC(ofproto_no_packet_in);
                     /* XXX install 'drop' flow entry */
-                    ofpbuf_delete(packet);
+                    ofpbuf_delete(upcall->packet);
                     return;
                 }
             } else {
                 VLOG_WARN_RL(&rl, "packet-in on unknown port %"PRIu16,
-                             msg->port);
+                             flow.in_port);
             }
 
             COVERAGE_INC(ofproto_packet_in);
-            send_packet_in(p, packet);
+            send_packet_in(p, upcall, &flow, false);
             return;
         }
 
-        facet = facet_create(p, rule, &flow, packet);
+        facet = facet_create(p, rule, &flow, upcall->packet);
     } else if (!facet->may_install) {
         /* The facet is not installable, that is, we need to process every
          * packet, so process the current packet's actions into 'facet'. */
-        facet_make_actions(p, facet, packet);
+        facet_make_actions(p, facet, upcall->packet);
     }
 
     if (facet->rule->cr.priority == FAIL_OPEN_PRIORITY) {
@@ -4407,40 +4408,39 @@ handle_odp_miss_msg(struct ofproto *p, struct ofpbuf *packet)
          *
          * See the top-level comment in fail-open.c for more information.
          */
-        send_packet_in(p, ofpbuf_clone_with_headroom(packet,
-                                                     DPIF_RECV_MSG_PADDING));
+        send_packet_in(p, upcall, &flow, true);
     }
 
-    ofpbuf_pull(packet, sizeof *msg);
-    facet_execute(p, facet, packet);
+    facet_execute(p, facet, upcall->packet);
     facet_install(p, facet, false);
 }
 
 static void
-handle_odp_msg(struct ofproto *p, struct ofpbuf *packet)
+handle_upcall(struct ofproto *p, struct dpif_upcall *upcall)
 {
-    struct odp_msg *msg = packet->data;
+    struct flow flow;
 
-    switch (msg->type) {
+    switch (upcall->type) {
     case _ODPL_ACTION_NR:
         COVERAGE_INC(ofproto_ctlr_action);
-        send_packet_in(p, packet);
+        odp_flow_key_to_flow(upcall->key, upcall->key_len, &flow);
+        send_packet_in(p, upcall, &flow, false);
         break;
 
     case _ODPL_SFLOW_NR:
         if (p->sflow) {
-            ofproto_sflow_received(p->sflow, msg);
+            odp_flow_key_to_flow(upcall->key, upcall->key_len, &flow);
+            ofproto_sflow_received(p->sflow, upcall, &flow);
         }
-        ofpbuf_delete(packet);
+        ofpbuf_delete(upcall->packet);
         break;
 
     case _ODPL_MISS_NR:
-        handle_odp_miss_msg(p, packet);
+        handle_miss_upcall(p, upcall);
         break;
 
     default:
-        VLOG_WARN_RL(&rl, "received ODP message of unexpected type %"PRIu32,
-                     msg->type);
+        VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32, upcall->type);
         break;
     }
 }
@@ -4795,150 +4795,103 @@ rule_send_removed(struct ofproto *p, struct rule *rule, uint8_t reason)
     }
 }
 
-/* pinsched callback for sending 'packet' on 'ofconn'. */
+/* pinsched callback for sending 'ofp_packet_in' on 'ofconn'. */
 static void
-do_send_packet_in(struct ofpbuf *packet, void *ofconn_)
+do_send_packet_in(struct ofpbuf *ofp_packet_in, void *ofconn_)
 {
     struct ofconn *ofconn = ofconn_;
 
-    rconn_send_with_limit(ofconn->rconn, packet,
+    rconn_send_with_limit(ofconn->rconn, ofp_packet_in,
                           ofconn->packet_in_counter, 100);
 }
 
-/* Takes 'packet', which has been converted with do_convert_to_packet_in(), and
- * finalizes its content for sending on 'ofconn', and passes it to 'ofconn''s
- * packet scheduler for sending.
+/* Takes 'upcall', whose packet has the flow specified by 'flow', composes an
+ * OpenFlow packet-in message from it, and passes it to 'ofconn''s packet
+ * scheduler for sending.
  *
- * 'max_len' specifies the maximum number of bytes of the packet to send on
- * 'ofconn' (INT_MAX specifies no limit).
- *
- * If 'clone' is true, the caller retains ownership of 'packet'.  Otherwise,
- * ownership is transferred to this function. */
+ * If 'clone' is true, the caller retains ownership of 'upcall->packet'.
+ * Otherwise, ownership is transferred to this function. */
 static void
-schedule_packet_in(struct ofconn *ofconn, struct ofpbuf *packet, int max_len,
-                   bool clone)
+schedule_packet_in(struct ofconn *ofconn, struct dpif_upcall *upcall,
+                   const struct flow *flow, bool clone)
 {
+    enum { OPI_SIZE = offsetof(struct ofp_packet_in, data) };
     struct ofproto *ofproto = ofconn->ofproto;
-    struct ofp_packet_in *opi = packet->data;
-    uint16_t in_port = ofp_port_to_odp_port(ntohs(opi->in_port));
-    int send_len, trim_size;
+    struct ofp_packet_in *opi;
+    int total_len, send_len;
+    struct ofpbuf *packet;
     uint32_t buffer_id;
 
-    /* Get buffer. */
-    if (opi->reason == OFPR_ACTION) {
+    /* Get OpenFlow buffer_id. */
+    if (upcall->type == _ODPL_ACTION_NR) {
         buffer_id = UINT32_MAX;
     } else if (ofproto->fail_open && fail_open_is_active(ofproto->fail_open)) {
         buffer_id = pktbuf_get_null();
     } else if (!ofconn->pktbuf) {
         buffer_id = UINT32_MAX;
     } else {
-        struct ofpbuf payload;
-
-        ofpbuf_use_const(&payload, opi->data,
-                         packet->size - offsetof(struct ofp_packet_in, data));
-        buffer_id = pktbuf_save(ofconn->pktbuf, &payload, in_port);
+        buffer_id = pktbuf_save(ofconn->pktbuf, upcall->packet, flow->in_port);
     }
 
     /* Figure out how much of the packet to send. */
-    send_len = ntohs(opi->total_len);
+    total_len = send_len = upcall->packet->size;
     if (buffer_id != UINT32_MAX) {
         send_len = MIN(send_len, ofconn->miss_send_len);
     }
-    send_len = MIN(send_len, max_len);
+    if (upcall->type == _ODPL_ACTION_NR) {
+        send_len = MIN(send_len, upcall->userdata);
+    }
 
-    /* Adjust packet length and clone if necessary. */
-    trim_size = offsetof(struct ofp_packet_in, data) + send_len;
+    /* Copy or steal buffer for OFPT_PACKET_IN. */
     if (clone) {
-        packet = ofpbuf_clone_data(packet->data, trim_size);
-        opi = packet->data;
+        packet = ofpbuf_clone_data_with_headroom(upcall->packet->data,
+                                                 send_len, OPI_SIZE);
     } else {
-        packet->size = trim_size;
+        packet = upcall->packet;
+        packet->size = send_len;
     }
 
-    /* Update packet headers. */
+    /* Add OFPT_PACKET_IN. */
+    opi = ofpbuf_push_zeros(packet, OPI_SIZE);
+    opi->header.version = OFP_VERSION;
+    opi->header.type = OFPT_PACKET_IN;
+    opi->total_len = htons(total_len);
+    opi->in_port = htons(odp_port_to_ofp_port(flow->in_port));
+    opi->reason = upcall->type == _ODPL_MISS_NR ? OFPR_NO_MATCH : OFPR_ACTION;
     opi->buffer_id = htonl(buffer_id);
     update_openflow_length(packet);
 
     /* Hand over to packet scheduler.  It might immediately call into
      * do_send_packet_in() or it might buffer it for a while (until a later
      * call to pinsched_run()). */
-    pinsched_send(ofconn->schedulers[opi->reason], in_port,
+    pinsched_send(ofconn->schedulers[opi->reason], flow->in_port,
                   packet, do_send_packet_in, ofconn);
 }
 
-/* Replace struct odp_msg header in 'packet' by equivalent struct
- * ofp_packet_in.  The odp_msg must have sufficient headroom to do so (e.g. as
- * returned by dpif_recv()).
- *
- * The conversion is not complete: the caller still needs to trim any unneeded
- * payload off the end of the buffer, set the length in the OpenFlow header,
- * and set buffer_id.  Those require us to know the controller settings and so
- * must be done on a per-controller basis.
- *
- * Returns the maximum number of bytes of the packet that should be sent to
- * the controller (INT_MAX if no limit). */
-static int
-do_convert_to_packet_in(struct ofpbuf *packet)
-{
-    struct odp_msg *msg = packet->data;
-    struct ofp_packet_in *opi;
-    uint8_t reason;
-    uint16_t total_len;
-    uint16_t in_port;
-    int max_len;
-
-    /* Extract relevant header fields */
-    if (msg->type == _ODPL_ACTION_NR) {
-        reason = OFPR_ACTION;
-        max_len = msg->arg;
-    } else {
-        reason = OFPR_NO_MATCH;
-        max_len = INT_MAX;
-    }
-    total_len = msg->length - sizeof *msg;
-    in_port = odp_port_to_ofp_port(msg->port);
-
-    /* Repurpose packet buffer by overwriting header. */
-    ofpbuf_pull(packet, sizeof(struct odp_msg));
-    opi = ofpbuf_push_zeros(packet, offsetof(struct ofp_packet_in, data));
-    opi->header.version = OFP_VERSION;
-    opi->header.type = OFPT_PACKET_IN;
-    opi->total_len = htons(total_len);
-    opi->in_port = htons(in_port);
-    opi->reason = reason;
-
-    return max_len;
-}
-
-/* Given 'packet' containing an odp_msg of type _ODPL_ACTION_NR or
- * _ODPL_MISS_NR, sends an OFPT_PACKET_IN message to each OpenFlow controller
- * as necessary according to their individual configurations.
- *
- * 'packet' must have sufficient headroom to convert it into a struct
- * ofp_packet_in (e.g. as returned by dpif_recv()).
+/* Given 'upcall', of type _ODPL_ACTION_NR or _ODPL_MISS_NR, sends an
+ * OFPT_PACKET_IN message to each OpenFlow controller as necessary according to
+ * their individual configurations.
  *
  * Takes ownership of 'packet'. */
 static void
-send_packet_in(struct ofproto *ofproto, struct ofpbuf *packet)
+send_packet_in(struct ofproto *ofproto, struct dpif_upcall *upcall,
+               const struct flow *flow, bool clone)
 {
     struct ofconn *ofconn, *prev;
-    int max_len;
-
-    max_len = do_convert_to_packet_in(packet);
 
     prev = NULL;
     LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
         if (ofconn_receives_async_msgs(ofconn)) {
             if (prev) {
-                schedule_packet_in(prev, packet, max_len, true);
+                schedule_packet_in(prev, upcall, flow, true);
             }
             prev = ofconn;
         }
     }
     if (prev) {
-        schedule_packet_in(prev, packet, max_len, false);
-    } else {
-        ofpbuf_delete(packet);
+        schedule_packet_in(prev, upcall, flow, clone);
+    } else if (!clone) {
+        ofpbuf_delete(upcall->packet);
     }
 }
 
-- 
1.7.1





More information about the dev mailing list