[ovs-dev] [PATCH v2 2/6] datapath: Use unicast Netlink sockets for upcalls.

Jesse Gross jesse at nicira.com
Fri Sep 23 21:20:13 UTC 2011


Currently we publish several multicast groups for upcalls and let
userspace sockets subscribe to them.  The benefit of this is mostly
that userspace is the one doing the subscription - the actual
multicast capability is not currently used and probably wouldn't be
even if we moved to a multiprocess model.  Despite the convenience,
multicast sockets have a number of disadvantages, primarily that
we only have a limited number of them so there could be collisions.
In addition, unicast sockets give additional flexibility to userspace
by allowing every object to potentially have a different socket
chosen by userspace for upcalls.  Finally, any future optimizations
for upcalls to reduce copying will likely not be compatible with
multicast anyways so disallowing it potentially simplifies things.

We also never unregistered the multicast groups registered for upcalls
and leaked them on module unload.  As a side effect, this solves that
problem.

Signed-off-by: Jesse Gross <jesse at nicira.com>
---
v2:
 - Fix typos.
 - Setting pid to 0 is now allowed to turn off upcalls.  This is used
   when listen_mask is 0.
 - Error handling is relaxed in dpif_linux_recv_set_mask().
---
 datapath/datapath.c                     |  103 +++++++++------------
 datapath/datapath.h                     |    2 +-
 datapath/flow.h                         |    1 +
 datapath/vport.c                        |    1 +
 datapath/vport.h                        |    4 +
 include/openvswitch/datapath-protocol.h |   22 ++++-
 lib/dpif-linux.c                        |  157 ++++++++++++++++--------------
 lib/dpif-linux.h                        |    1 +
 8 files changed, 153 insertions(+), 138 deletions(-)

diff --git a/datapath/datapath.c b/datapath/datapath.c
index 5fcf81b..31a1c18 100644
--- a/datapath/datapath.c
+++ b/datapath/datapath.c
@@ -361,45 +361,6 @@ static struct genl_family dp_packet_genl_family = {
 	.maxattr = OVS_PACKET_ATTR_MAX
 };
 
-/* Generic Netlink multicast groups for upcalls.
- *
- * We really want three unique multicast groups per datapath, but we can't even
- * get one, because genl_register_mc_group() takes genl_lock, which is also
- * held during Generic Netlink message processing, so trying to acquire
- * multicast groups during OVS_DP_NEW processing deadlocks.  Instead, we
- * preallocate a few groups and use them round-robin for datapaths.  Collision
- * isn't fatal--multicast listeners should check that the family is the one
- * that they want and discard others--but it wastes time and memory to receive
- * unwanted messages.
- */
-#define PACKET_N_MC_GROUPS 16
-static struct genl_multicast_group packet_mc_groups[PACKET_N_MC_GROUPS];
-
-static u32 packet_mc_group(int dp_ifindex, u8 cmd)
-{
-	u32 idx;
-	BUILD_BUG_ON_NOT_POWER_OF_2(PACKET_N_MC_GROUPS);
-
-	idx = jhash_2words(dp_ifindex, cmd, 0) & (PACKET_N_MC_GROUPS - 1);
-	return packet_mc_groups[idx].id;
-}
-
-static int packet_register_mc_groups(void)
-{
-	int i;
-
-	for (i = 0; i < PACKET_N_MC_GROUPS; i++) {
-		struct genl_multicast_group *group = &packet_mc_groups[i];
-		int error;
-
-		sprintf(group->name, "packet%d", i);
-		error = genl_register_mc_group(&dp_packet_genl_family, group);
-		if (error)
-			return error;
-	}
-	return 0;
-}
-
 int dp_upcall(struct datapath *dp, struct sk_buff *skb, const struct dp_upcall_info *upcall_info)
 {
 	struct dp_stats_percpu *stats;
@@ -448,8 +409,8 @@ static int queue_userspace_packets(struct datapath *dp, struct sk_buff *skb,
 				 const struct dp_upcall_info *upcall_info)
 {
 	int dp_ifindex;
-	u32 group;
 	struct sk_buff *nskb;
+	u32 pid;
 	int err;
 
 	dp_ifindex = get_dpifindex(dp);
@@ -459,7 +420,16 @@ static int queue_userspace_packets(struct datapath *dp, struct sk_buff *skb,
 		goto err_kfree_skbs;
 	}
 
-	group = packet_mc_group(dp_ifindex, upcall_info->cmd);
+	if (OVS_CB(skb)->flow)
+		pid = OVS_CB(skb)->flow->upcall_pid;
+	else
+		pid = OVS_CB(skb)->vport->upcall_pid;
+
+	if (pid == 0) {
+		err = -ENOTCONN;
+		nskb = skb->next;
+		goto err_kfree_skbs;
+	}
 
 	do {
 		struct ovs_header *upcall;
@@ -491,7 +461,6 @@ static int queue_userspace_packets(struct datapath *dp, struct sk_buff *skb,
 
 		user_skb = genlmsg_new(len, GFP_ATOMIC);
 		if (!user_skb) {
-			netlink_set_err(INIT_NET_GENL_SOCK, 0, group, -ENOBUFS);
 			err = -ENOMEM;
 			goto err_kfree_skbs;
 		}
@@ -522,7 +491,7 @@ static int queue_userspace_packets(struct datapath *dp, struct sk_buff *skb,
 		else
 			skb_copy_bits(skb, 0, nla_data(nla), skb->len);
 
-		err = genlmsg_multicast(user_skb, 0, group, GFP_ATOMIC);
+		err = genlmsg_unicast(&init_net, user_skb, pid);
 		if (err)
 			goto err_kfree_skbs;
 
@@ -702,6 +671,11 @@ static int ovs_packet_cmd_execute(struct sk_buff *skb, struct genl_info *info)
 
 	flow->hash = flow_hash(&flow->key, key_len);
 
+	if (a[OVS_PACKET_ATTR_UPCALL_PID])
+		flow->upcall_pid = nla_get_u32(a[OVS_PACKET_ATTR_UPCALL_PID]);
+	else
+		flow->upcall_pid = NETLINK_CB(skb).pid;
+
 	acts = flow_actions_alloc(a[OVS_PACKET_ATTR_ACTIONS]);
 	err = PTR_ERR(acts);
 	if (IS_ERR(acts))
@@ -740,6 +714,7 @@ static const struct nla_policy packet_policy[OVS_PACKET_ATTR_MAX + 1] = {
 	[OVS_PACKET_ATTR_PACKET] = { .type = NLA_UNSPEC },
 	[OVS_PACKET_ATTR_KEY] = { .type = NLA_NESTED },
 	[OVS_PACKET_ATTR_ACTIONS] = { .type = NLA_NESTED },
+	[OVS_PACKET_ATTR_UPCALL_PID] = { .type = NLA_U32 },
 };
 
 static struct genl_ops dp_packet_genl_ops[] = {
@@ -779,6 +754,7 @@ static void get_dp_stats(struct datapath *dp, struct ovs_dp_stats *stats)
 
 static const struct nla_policy flow_policy[OVS_FLOW_ATTR_MAX + 1] = {
 	[OVS_FLOW_ATTR_KEY] = { .type = NLA_NESTED },
+	[OVS_FLOW_ATTR_UPCALL_PID] = { .type = NLA_U32 },
 	[OVS_FLOW_ATTR_ACTIONS] = { .type = NLA_NESTED },
 	[OVS_FLOW_ATTR_CLEAR] = { .type = NLA_FLAG },
 };
@@ -825,6 +801,8 @@ static int ovs_flow_cmd_fill_info(struct sw_flow *flow, struct datapath *dp,
 		goto error;
 	nla_nest_end(skb, nla);
 
+	NLA_PUT_U32(skb, OVS_FLOW_ATTR_UPCALL_PID, flow->upcall_pid);
+
 	spin_lock_bh(&flow->lock);
 	used = flow->used;
 	stats.n_packets = flow->packet_count;
@@ -962,6 +940,11 @@ static int ovs_flow_cmd_new_or_set(struct sk_buff *skb, struct genl_info *info)
 		flow->key = key;
 		clear_stats(flow);
 
+		if (a[OVS_FLOW_ATTR_UPCALL_PID])
+			flow->upcall_pid = nla_get_u32(a[OVS_FLOW_ATTR_UPCALL_PID]);
+		else
+			flow->upcall_pid = NETLINK_CB(skb).pid;
+
 		/* Obtain actions. */
 		acts = flow_actions_alloc(a[OVS_FLOW_ATTR_ACTIONS]);
 		error = PTR_ERR(acts);
@@ -1011,6 +994,9 @@ static int ovs_flow_cmd_new_or_set(struct sk_buff *skb, struct genl_info *info)
 		reply = ovs_flow_cmd_build_info(flow, dp, info->snd_pid,
 						info->snd_seq, OVS_FLOW_CMD_NEW);
 
+		if (a[OVS_FLOW_ATTR_UPCALL_PID])
+			flow->upcall_pid = nla_get_u32(a[OVS_FLOW_ATTR_UPCALL_PID]);
+
 		/* Clear stats. */
 		if (a[OVS_FLOW_ATTR_CLEAR]) {
 			spin_lock_bh(&flow->lock);
@@ -1169,6 +1155,7 @@ static const struct nla_policy datapath_policy[OVS_DP_ATTR_MAX + 1] = {
 #ifdef HAVE_NLA_NUL_STRING
 	[OVS_DP_ATTR_NAME] = { .type = NLA_NUL_STRING, .len = IFNAMSIZ - 1 },
 #endif
+	[OVS_DP_ATTR_UPCALL_PID] = { .type = NLA_U32 },
 	[OVS_DP_ATTR_IPV4_FRAGS] = { .type = NLA_U32 },
 	[OVS_DP_ATTR_SAMPLING] = { .type = NLA_U32 },
 };
@@ -1191,14 +1178,13 @@ static int ovs_dp_cmd_fill_info(struct datapath *dp, struct sk_buff *skb,
 	struct ovs_header *ovs_header;
 	struct nlattr *nla;
 	int err;
-	int dp_ifindex = get_dpifindex(dp);
 
 	ovs_header = genlmsg_put(skb, pid, seq, &dp_datapath_genl_family,
 				   flags, cmd);
 	if (!ovs_header)
 		goto error;
 
-	ovs_header->dp_ifindex = dp_ifindex;
+	ovs_header->dp_ifindex = get_dpifindex(dp);
 
 	rcu_read_lock();
 	err = nla_put_string(skb, OVS_DP_ATTR_NAME, dp_name(dp));
@@ -1217,17 +1203,6 @@ static int ovs_dp_cmd_fill_info(struct datapath *dp, struct sk_buff *skb,
 	if (dp->sflow_probability)
 		NLA_PUT_U32(skb, OVS_DP_ATTR_SAMPLING, dp->sflow_probability);
 
-	nla = nla_nest_start(skb, OVS_DP_ATTR_MCGROUPS);
-	if (!nla)
-		goto nla_put_failure;
-	NLA_PUT_U32(skb, OVS_PACKET_CMD_MISS,
-			packet_mc_group(dp_ifindex, OVS_PACKET_CMD_MISS));
-	NLA_PUT_U32(skb, OVS_PACKET_CMD_ACTION,
-			packet_mc_group(dp_ifindex, OVS_PACKET_CMD_ACTION));
-	NLA_PUT_U32(skb, OVS_PACKET_CMD_SAMPLE,
-			packet_mc_group(dp_ifindex, OVS_PACKET_CMD_SAMPLE));
-	nla_nest_end(skb, nla);
-
 	return genlmsg_end(skb, ovs_header);
 
 nla_put_failure:
@@ -1347,6 +1322,11 @@ static int ovs_dp_cmd_new(struct sk_buff *skb, struct genl_info *info)
 	parms.options = NULL;
 	parms.dp = dp;
 	parms.port_no = OVSP_LOCAL;
+	if (a[OVS_DP_ATTR_UPCALL_PID])
+		parms.upcall_pid = nla_get_u32(a[OVS_DP_ATTR_UPCALL_PID]);
+	else
+		parms.upcall_pid = NETLINK_CB(skb).pid;
+
 	vport = new_vport(&parms);
 	if (IS_ERR(vport)) {
 		err = PTR_ERR(vport);
@@ -1543,6 +1523,7 @@ static const struct nla_policy vport_policy[OVS_VPORT_ATTR_MAX + 1] = {
 #endif
 	[OVS_VPORT_ATTR_PORT_NO] = { .type = NLA_U32 },
 	[OVS_VPORT_ATTR_TYPE] = { .type = NLA_U32 },
+	[OVS_VPORT_ATTR_UPCALL_PID] = { .type = NLA_U32 },
 	[OVS_VPORT_ATTR_OPTIONS] = { .type = NLA_NESTED },
 };
 
@@ -1577,6 +1558,7 @@ static int ovs_vport_cmd_fill_info(struct vport *vport, struct sk_buff *skb,
 	NLA_PUT_U32(skb, OVS_VPORT_ATTR_PORT_NO, vport->port_no);
 	NLA_PUT_U32(skb, OVS_VPORT_ATTR_TYPE, vport_get_type(vport));
 	NLA_PUT_STRING(skb, OVS_VPORT_ATTR_NAME, vport_get_name(vport));
+	NLA_PUT_U32(skb, OVS_VPORT_ATTR_UPCALL_PID, vport->upcall_pid);
 
 	nla = nla_reserve(skb, OVS_VPORT_ATTR_STATS, sizeof(struct ovs_vport_stats));
 	if (!nla)
@@ -1724,6 +1706,10 @@ static int ovs_vport_cmd_new(struct sk_buff *skb, struct genl_info *info)
 	parms.options = a[OVS_VPORT_ATTR_OPTIONS];
 	parms.dp = dp;
 	parms.port_no = port_no;
+	if (a[OVS_VPORT_ATTR_UPCALL_PID])
+		parms.upcall_pid = nla_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);
+	else
+		parms.upcall_pid = NETLINK_CB(skb).pid;
 
 	vport = new_vport(&parms);
 	err = PTR_ERR(vport);
@@ -1775,6 +1761,8 @@ static int ovs_vport_cmd_set(struct sk_buff *skb, struct genl_info *info)
 		err = vport_set_options(vport, a[OVS_VPORT_ATTR_OPTIONS]);
 	if (!err)
 		err = change_vport(vport, a);
+	if (!err && a[OVS_VPORT_ATTR_UPCALL_PID])
+		vport->upcall_pid = nla_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);
 
 	reply = ovs_vport_cmd_build_info(vport, info->snd_pid, info->snd_seq,
 					 OVS_VPORT_CMD_NEW);
@@ -1976,9 +1964,6 @@ static int dp_register_genl(void)
 		}
 	}
 
-	err = packet_register_mc_groups();
-	if (err)
-		goto error;
 	return 0;
 
 error:
diff --git a/datapath/datapath.h b/datapath/datapath.h
index f54d844..03bebd1 100644
--- a/datapath/datapath.h
+++ b/datapath/datapath.h
@@ -69,7 +69,7 @@ struct dp_stats_percpu {
  * to iterate or modify.
  * @stats_percpu: Per-CPU datapath statistics.
  * @sflow_probability: Number of packets out of UINT_MAX to sample to the
- * %OVS_PACKET_CMD_SAMPLE multicast group, e.g. (@sflow_probability/UINT_MAX)
+ * %OVS_PACKET_CMD_SAMPLE upcall, e.g. (@sflow_probability/UINT_MAX)
  * is the probability of sampling a given packet.
  *
  * Context: See the comment on locking at the top of datapath.c for additional
diff --git a/datapath/flow.h b/datapath/flow.h
index 3590a7d..ae12fe4 100644
--- a/datapath/flow.h
+++ b/datapath/flow.h
@@ -81,6 +81,7 @@ struct sw_flow {
 	struct rcu_head rcu;
 	struct hlist_node  hash_node;
 	u32 hash;
+	u32 upcall_pid;
 
 	struct sw_flow_key key;
 	struct sw_flow_actions __rcu *sf_acts;
diff --git a/datapath/vport.c b/datapath/vport.c
index 71fdd84..4e922e4 100644
--- a/datapath/vport.c
+++ b/datapath/vport.c
@@ -176,6 +176,7 @@ struct vport *vport_alloc(int priv_size, const struct vport_ops *ops, const stru
 
 	vport->dp = parms->dp;
 	vport->port_no = parms->port_no;
+	vport->upcall_pid = parms->upcall_pid;
 	atomic_set(&vport->sflow_pool, 0);
 	vport->ops = ops;
 
diff --git a/datapath/vport.h b/datapath/vport.h
index e7d2eb5..61da439 100644
--- a/datapath/vport.h
+++ b/datapath/vport.h
@@ -82,6 +82,8 @@ struct vport_err_stats {
  * @node: Element in @dp's @port_list.
  * @sflow_pool: Number of packets that were candidates for sFlow sampling,
  * regardless of whether they were actually chosen and sent down to userspace.
+ * @upcall_pid: The Netlink port to use for packets received on this port that
+ * miss the flow table.
  * @hash_node: Element in @dev_table hash table in vport.c.
  * @ops: Class structure.
  * @percpu_stats: Points to per-CPU statistics used and maintained by vport
@@ -98,6 +100,7 @@ struct vport {
 	char linkname[IFNAMSIZ];
 	struct list_head node;
 	atomic_t sflow_pool;
+	u32 upcall_pid;
 
 	struct hlist_node hash_node;
 	const struct vport_ops *ops;
@@ -131,6 +134,7 @@ struct vport_parms {
 	/* For vport_alloc(). */
 	struct datapath *dp;
 	u16 port_no;
+	u32 upcall_pid;
 };
 
 /**
diff --git a/include/openvswitch/datapath-protocol.h b/include/openvswitch/datapath-protocol.h
index fc7cc1f..98316ae 100644
--- a/include/openvswitch/datapath-protocol.h
+++ b/include/openvswitch/datapath-protocol.h
@@ -85,6 +85,9 @@ struct ovs_header {
  * the &struct ovs_header.  Always present in notifications.  Required in
  * %OVS_DP_NEW requests.  May be used as an alternative to specifying
  * dp_ifindex in other requests (with a dp_ifindex of 0).
+ * @OVS_DP_ATTR_UPCALL_PID: The Netlink socket in userspace that is initially
+ * set on the datapath port (for %OVS_PACKET_CMD_MISS).  Only valid on
+ * %OVS_DP_CMD_NEW requests.
  * @OVS_DP_ATTR_STATS: Statistics about packets that have passed through the
  * datapath.  Always present in notifications.
  * @OVS_DP_ATTR_IPV4_FRAGS: One of %OVS_DP_FRAG_*.  Always present in
@@ -94,10 +97,6 @@ struct ovs_header {
  * @OVS_PACKET_CMD_SAMPLE.  A value of 0 samples no packets, a value of
  * %UINT32_MAX samples all packets, and intermediate values sample intermediate
  * fractions of packets.
- * @OVS_DP_ATTR_MCGROUPS: Nested attributes with multicast groups.  Each nested
- * attribute has a %OVS_PACKET_CMD_* type with a 32-bit value giving the
- * Generic Netlink multicast group number used for sending this datapath's
- * messages with that command type up to userspace.
  *
  * These attributes follow the &struct ovs_header within the Generic Netlink
  * payload for %OVS_DP_* commands.
@@ -105,10 +104,10 @@ struct ovs_header {
 enum ovs_datapath_attr {
 	OVS_DP_ATTR_UNSPEC,
 	OVS_DP_ATTR_NAME,       /* name of dp_ifindex netdev */
+	OVS_DP_ATTR_UPCALL_PID, /* Netlink PID to receive upcalls */
 	OVS_DP_ATTR_STATS,      /* struct ovs_dp_stats */
 	OVS_DP_ATTR_IPV4_FRAGS,	/* 32-bit enum ovs_frag_handling */
 	OVS_DP_ATTR_SAMPLING,   /* 32-bit fraction of packets to sample. */
-	OVS_DP_ATTR_MCGROUPS,   /* Nested attributes with multicast groups. */
 	__OVS_DP_ATTR_MAX
 };
 
@@ -174,6 +173,10 @@ enum ovs_packet_cmd {
  * extracted from the packet as nested %OVS_KEY_ATTR_* attributes.  This allows
  * userspace to adapt its flow setup strategy by comparing its notion of the
  * flow key against the kernel's.
+ * @OVS_PACKET_ATTR_UPCALL_PID: Optionally present for OVS_PACKET_CMD_EXECUTE.
+ * The Netlink socket in userspace that OVS_PACKET_CMD_ACTION and
+ * OVS_PACKET_CMD_SAMPLE upcalls will be directed to for actions triggered by
+ * this packet.
  * @OVS_PACKET_ATTR_USERDATA: Present for an %OVS_PACKET_CMD_ACTION
  * notification if the %OVS_ACTION_ATTR_USERSPACE, action's argument was
  * nonzero.
@@ -190,6 +193,7 @@ enum ovs_packet_attr {
 	OVS_PACKET_ATTR_UNSPEC,
 	OVS_PACKET_ATTR_PACKET,      /* Packet data. */
 	OVS_PACKET_ATTR_KEY,         /* Nested OVS_KEY_ATTR_* attributes. */
+	OVS_PACKET_ATTR_UPCALL_PID,  /* Netlink PID to receive upcalls. */
 	OVS_PACKET_ATTR_USERDATA,    /* u64 OVS_ACTION_ATTR_USERSPACE arg. */
 	OVS_PACKET_ATTR_SAMPLE_POOL, /* # sampling candidate packets so far. */
 	OVS_PACKET_ATTR_ACTIONS,     /* Nested OVS_ACTION_ATTR_* attributes. */
@@ -229,6 +233,9 @@ enum ovs_vport_cmd {
  * @OVS_VPORT_ATTR_NAME: Name of vport.  For a vport based on a network device
  * this is the name of the network device.  Maximum length %IFNAMSIZ-1 bytes
  * plus a null terminator.
+ * @OVS_VPORT_ATTR_UPCALL_PID: The Netlink socket in userspace that
+ * OVS_PACKET_CMD_MISS upcalls will be directed to for packets received on
+ * this port.
  * @OVS_VPORT_ATTR_STATS: A &struct ovs_vport_stats giving statistics for
  * packets sent or received through the vport.
  * @OVS_VPORT_ATTR_ADDRESS: A 6-byte Ethernet address for the vport.
@@ -257,6 +264,7 @@ enum ovs_vport_attr {
 	OVS_VPORT_ATTR_PORT_NO,	/* port number within datapath */
 	OVS_VPORT_ATTR_TYPE,	/* 32-bit OVS_VPORT_TYPE_* constant. */
 	OVS_VPORT_ATTR_NAME,	/* string name, up to IFNAMSIZ bytes long */
+	OVS_VPORT_ATTR_UPCALL_PID, /* Netlink PID to receive upcalls */
 	OVS_VPORT_ATTR_STATS,	/* struct ovs_vport_stats */
 	OVS_VPORT_ATTR_ADDRESS, /* hardware address */
 	OVS_VPORT_ATTR_OPTIONS, /* nested attributes, varies by vport type */
@@ -379,6 +387,9 @@ struct ovs_key_nd {
  * @OVS_FLOW_ATTR_ACTIONS: Nested %OVS_ACTION_ATTR_* attributes specifying
  * the actions to take for packets that match the key.  Always present in
 * notifications.  Required for %OVS_FLOW_CMD_NEW requests, optional
 * on %OVS_FLOW_CMD_SET request to change the existing actions, ignored for
 * other requests.
+ * @OVS_FLOW_ATTR_UPCALL_PID: The Netlink socket in userspace that
+ * OVS_PACKET_CMD_ACTION and OVS_PACKET_CMD_SAMPLE upcalls will be
+ * directed to for packets that match this flow.
  * @OVS_FLOW_ATTR_STATS: &struct ovs_flow_stats giving statistics for this
@@ -402,6 +413,7 @@ enum ovs_flow_attr {
 	OVS_FLOW_ATTR_UNSPEC,
 	OVS_FLOW_ATTR_KEY,       /* Sequence of OVS_KEY_ATTR_* attributes. */
 	OVS_FLOW_ATTR_ACTIONS,   /* Nested OVS_ACTION_ATTR_* attributes. */
+	OVS_FLOW_ATTR_UPCALL_PID, /* Netlink PID to receive upcalls. */
 	OVS_FLOW_ATTR_STATS,     /* struct ovs_flow_stats. */
 	OVS_FLOW_ATTR_TCP_FLAGS, /* 8-bit OR'd TCP flags. */
 	OVS_FLOW_ATTR_USED,      /* u64 msecs last used in monotonic time. */
diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
index 15a21e6..ef8502f 100644
--- a/lib/dpif-linux.c
+++ b/lib/dpif-linux.c
@@ -73,10 +73,10 @@ struct dpif_linux_dp {
 
     /* Attributes. */
     const char *name;                  /* OVS_DP_ATTR_NAME. */
+    uint32_t upcall_pid;               /* OVS_DP_ATTR_UPCALL_PID. */
     struct ovs_dp_stats stats;         /* OVS_DP_ATTR_STATS. */
     enum ovs_frag_handling ipv4_frags; /* OVS_DP_ATTR_IPV4_FRAGS. */
     const uint32_t *sampling;          /* OVS_DP_ATTR_SAMPLING. */
-    uint32_t mcgroups[DPIF_N_UC_TYPES]; /* OVS_DP_ATTR_MCGROUPS. */
 };
 
 static void dpif_linux_dp_init(struct dpif_linux_dp *);
@@ -109,6 +109,7 @@ struct dpif_linux_flow {
     size_t key_len;
     const struct nlattr *actions;       /* OVS_FLOW_ATTR_ACTIONS. */
     size_t actions_len;
+    uint32_t upcall_pid;                /* OVS_FLOW_ATTR_UPCALL_PID. */
     const struct ovs_flow_stats *stats; /* OVS_FLOW_ATTR_STATS. */
     const uint8_t *tcp_flags;           /* OVS_FLOW_ATTR_TCP_FLAGS. */
     const uint64_t *used;               /* OVS_FLOW_ATTR_USED. */
@@ -131,9 +132,8 @@ struct dpif_linux {
     struct dpif dpif;
     int dp_ifindex;
 
-    /* Multicast group messages. */
-    struct nl_sock *mc_sock;
-    uint32_t mcgroups[DPIF_N_UC_TYPES];
+    /* Upcall messages. */
+    struct nl_sock *upcall_sock;
     unsigned int listen_mask;
 
     /* Change notification. */
@@ -263,10 +263,7 @@ open_dpif(const struct dpif_linux_dp *dp, struct dpif **dpifp)
     dpif_init(&dpif->dpif, &dpif_linux_class, dp->name,
               dp->dp_ifindex, dp->dp_ifindex);
 
-    dpif->mc_sock = NULL;
-    for (i = 0; i < DPIF_N_UC_TYPES; i++) {
-        dpif->mcgroups[i] = dp->mcgroups[i];
-    }
+    dpif->upcall_sock = NULL;
     dpif->listen_mask = 0;
     dpif->dp_ifindex = dp->dp_ifindex;
     sset_init(&dpif->changed_ports);
@@ -287,7 +284,7 @@ dpif_linux_close(struct dpif *dpif_)
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
 
     nln_notifier_destroy(dpif->port_notifier);
-    nl_sock_destroy(dpif->mc_sock);
+    nl_sock_destroy(dpif->upcall_sock);
     sset_destroy(&dpif->changed_ports);
     free(dpif->lru_bitmap);
     free(dpif);
@@ -401,6 +398,9 @@ dpif_linux_port_add(struct dpif *dpif_, struct netdev *netdev,
     /* Loop until we find a port that isn't used. */
     do {
         request.port_no = dpif_linux_pop_port(dpif);
+        if (dpif->upcall_sock) {
+            request.upcall_pid = nl_sock_pid(dpif->upcall_sock);
+        }
         error = dpif_linux_vport_transact(&request, &reply, &buf);
 
         if (!error) {
@@ -659,6 +659,9 @@ dpif_linux_flow_put(struct dpif *dpif_, enum dpif_flow_put_flags flags,
     /* Ensure that OVS_FLOW_ATTR_ACTIONS will always be included. */
     request.actions = actions ? actions : &dummy_action;
     request.actions_len = actions_len;
+    if (dpif->upcall_sock) {
+        request.upcall_pid = nl_sock_pid(dpif->upcall_sock);
+    }
     if (flags & DPIF_FP_ZERO_STATS) {
         request.clear = true;
     }
@@ -790,7 +793,7 @@ dpif_linux_flow_dump_done(const struct dpif *dpif OVS_UNUSED, void *state_)
 }
 
 static int
-dpif_linux_execute__(int dp_ifindex,
+dpif_linux_execute__(int dp_ifindex, uint32_t upcall_pid,
                      const struct nlattr *key, size_t key_len,
                      const struct nlattr *actions, size_t actions_len,
                      const struct ofpbuf *packet)
@@ -810,6 +813,7 @@ dpif_linux_execute__(int dp_ifindex,
     nl_msg_put_unspec(buf, OVS_PACKET_ATTR_PACKET, packet->data, packet->size);
     nl_msg_put_unspec(buf, OVS_PACKET_ATTR_KEY, key, key_len);
     nl_msg_put_unspec(buf, OVS_PACKET_ATTR_ACTIONS, actions, actions_len);
+    nl_msg_put_u32(buf, OVS_PACKET_ATTR_UPCALL_PID, upcall_pid);
 
     error = nl_sock_transact(genl_sock, buf, NULL);
     ofpbuf_delete(buf);
@@ -823,8 +827,13 @@ dpif_linux_execute(struct dpif *dpif_,
                    const struct ofpbuf *packet)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
+    uint32_t upcall_pid = 0;
+
+    if (dpif->upcall_sock) {
+        upcall_pid = nl_sock_pid(dpif->upcall_sock);
+    }
 
-    return dpif_linux_execute__(dpif->dp_ifindex, key, key_len,
+    return dpif_linux_execute__(dpif->dp_ifindex, upcall_pid, key, key_len,
                                 actions, actions_len, packet);
 }
 
@@ -841,45 +850,62 @@ dpif_linux_recv_set_mask(struct dpif *dpif_, int listen_mask)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     int error;
-    int i;
 
     if (listen_mask == dpif->listen_mask) {
         return 0;
     } else if (!listen_mask) {
-        nl_sock_destroy(dpif->mc_sock);
-        dpif->mc_sock = NULL;
-        dpif->listen_mask = 0;
-        return 0;
-    } else if (!dpif->mc_sock) {
-        error = nl_sock_create(NETLINK_GENERIC, &dpif->mc_sock);
+        nl_sock_destroy(dpif->upcall_sock);
+        dpif->upcall_sock = NULL;
+    } else if (!dpif->upcall_sock) {
+        struct dpif_port port;
+        struct dpif_port_dump port_dump;
+        struct dpif_flow_dump flow_dump;
+        const struct nlattr *key;
+        size_t key_len;
+        const struct nlattr *actions;
+        size_t actions_len;
+        const struct dpif_flow_stats *flow_stats;
+
+        error = nl_sock_create(NETLINK_GENERIC, &dpif->upcall_sock);
         if (error) {
             return error;
         }
-    }
 
-    /* Unsubscribe from old groups. */
-    for (i = 0; i < DPIF_N_UC_TYPES; i++) {
-        if (dpif->listen_mask & (1u << i)) {
-            nl_sock_leave_mcgroup(dpif->mc_sock, dpif->mcgroups[i]);
+        DPIF_PORT_FOR_EACH (&port, &port_dump, dpif_) {
+            struct dpif_linux_vport vport_request;
+
+            dpif_linux_vport_init(&vport_request);
+            vport_request.cmd = OVS_VPORT_CMD_SET;
+            vport_request.dp_ifindex = dpif->dp_ifindex;
+            vport_request.port_no = port.port_no;
+            vport_request.upcall_pid = nl_sock_pid(dpif->upcall_sock);
+            error = dpif_linux_vport_transact(&vport_request, NULL, NULL);
+            if (error) {
+                VLOG_WARN("%s: failed to set upcall pid on port: %s",
+                          dpif_name(dpif_), strerror(error));
+            }
         }
-    }
-
-    /* Update listen_mask. */
-    dpif->listen_mask = listen_mask;
 
-    /* Subscribe to new groups. */
-    error = 0;
-    for (i = 0; i < DPIF_N_UC_TYPES; i++) {
-        if (dpif->listen_mask & (1u << i)) {
-            int retval;
-
-            retval = nl_sock_join_mcgroup(dpif->mc_sock, dpif->mcgroups[i]);
-            if (retval) {
-                error = retval;
+        dpif_flow_dump_start(&flow_dump, dpif_);
+        while (dpif_flow_dump_next(&flow_dump, &key, &key_len,
+                                   &actions, &actions_len, &flow_stats)) {
+            struct dpif_linux_flow flow_request;
+
+            dpif_linux_flow_init(&flow_request);
+            flow_request.cmd = OVS_FLOW_CMD_SET;
+            flow_request.dp_ifindex = dpif->dp_ifindex;
+            flow_request.upcall_pid = nl_sock_pid(dpif->upcall_sock);
+            error = dpif_linux_flow_transact(&flow_request, NULL, NULL);
+            if (error) {
+                VLOG_WARN("%s: failed to set upcall pid on flow: %s",
+                          dpif_name(dpif_), strerror(error));
             }
         }
+        dpif_flow_dump_done(&flow_dump);
     }
-    return error;
+
+    dpif->listen_mask = listen_mask;
+    return 0;
 }
 
 static int
@@ -999,14 +1025,14 @@ dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall)
     int error;
     int i;
 
-    if (!dpif->mc_sock) {
+    if (!dpif->upcall_sock) {
         return EAGAIN;
     }
 
     for (i = 0; i < 50; i++) {
         int dp_ifindex;
 
-        error = nl_sock_recv(dpif->mc_sock, &buf, false);
+        error = nl_sock_recv(dpif->upcall_sock, &buf, false);
         if (error) {
             return error;
         }
@@ -1031,8 +1057,8 @@ static void
 dpif_linux_recv_wait(struct dpif *dpif_)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
-    if (dpif->mc_sock) {
-        nl_sock_wait(dpif->mc_sock, POLLIN);
+    if (dpif->upcall_sock) {
+        nl_sock_wait(dpif->upcall_sock, POLLIN);
     }
 }
 
@@ -1041,8 +1067,8 @@ dpif_linux_recv_purge(struct dpif *dpif_)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
 
-    if (dpif->mc_sock) {
-        nl_sock_drain(dpif->mc_sock);
+    if (dpif->upcall_sock) {
+        nl_sock_drain(dpif->upcall_sock);
     }
 }
 
@@ -1164,7 +1190,7 @@ dpif_linux_vport_send(int dp_ifindex, uint32_t port_no,
     ofpbuf_use_stack(&actions, &action, sizeof action);
     nl_msg_put_u32(&actions, OVS_ACTION_ATTR_OUTPUT, port_no);
 
-    return dpif_linux_execute__(dp_ifindex, key.data, key.size,
+    return dpif_linux_execute__(dp_ifindex, 0, key.data, key.size,
                                 actions.data, actions.size, &packet);
 }
 
@@ -1209,6 +1235,7 @@ dpif_linux_vport_from_ofpbuf(struct dpif_linux_vport *vport,
         [OVS_VPORT_ATTR_PORT_NO] = { .type = NL_A_U32 },
         [OVS_VPORT_ATTR_TYPE] = { .type = NL_A_U32 },
         [OVS_VPORT_ATTR_NAME] = { .type = NL_A_STRING, .max_len = IFNAMSIZ },
+        [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NL_A_U32 },
         [OVS_VPORT_ATTR_STATS] = { .type = NL_A_UNSPEC,
                                    .min_len = sizeof(struct ovs_vport_stats),
                                    .max_len = sizeof(struct ovs_vport_stats),
@@ -1245,6 +1272,9 @@ dpif_linux_vport_from_ofpbuf(struct dpif_linux_vport *vport,
     vport->port_no = nl_attr_get_u32(a[OVS_VPORT_ATTR_PORT_NO]);
     vport->type = nl_attr_get_u32(a[OVS_VPORT_ATTR_TYPE]);
     vport->name = nl_attr_get_string(a[OVS_VPORT_ATTR_NAME]);
+    if (a[OVS_VPORT_ATTR_UPCALL_PID]) {
+        vport->upcall_pid = nl_attr_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);
+    }
     if (a[OVS_VPORT_ATTR_STATS]) {
         vport->stats = nl_attr_get(a[OVS_VPORT_ATTR_STATS]);
     }
@@ -1287,6 +1317,8 @@ dpif_linux_vport_to_ofpbuf(const struct dpif_linux_vport *vport,
         nl_msg_put_string(buf, OVS_VPORT_ATTR_NAME, vport->name);
     }
 
+    nl_msg_put_u32(buf, OVS_VPORT_ATTR_UPCALL_PID, vport->upcall_pid);
+
     if (vport->stats) {
         nl_msg_put_unspec(buf, OVS_VPORT_ATTR_STATS,
                           vport->stats, sizeof *vport->stats);
@@ -1391,7 +1423,6 @@ dpif_linux_dp_from_ofpbuf(struct dpif_linux_dp *dp, const struct ofpbuf *buf)
                                 .optional = true },
         [OVS_DP_ATTR_IPV4_FRAGS] = { .type = NL_A_U32, .optional = true },
         [OVS_DP_ATTR_SAMPLING] = { .type = NL_A_U32, .optional = true },
-        [OVS_DP_ATTR_MCGROUPS] = { .type = NL_A_NESTED, .optional = true },
     };
 
     struct nlattr *a[ARRAY_SIZE(ovs_datapath_policy)];
@@ -1429,34 +1460,6 @@ dpif_linux_dp_from_ofpbuf(struct dpif_linux_dp *dp, const struct ofpbuf *buf)
         dp->sampling = nl_attr_get(a[OVS_DP_ATTR_SAMPLING]);
     }
 
-    if (a[OVS_DP_ATTR_MCGROUPS]) {
-        static const struct nl_policy ovs_mcgroup_policy[] = {
-            [OVS_PACKET_CMD_MISS] = { .type = NL_A_U32, .optional = true },
-            [OVS_PACKET_CMD_ACTION] = { .type = NL_A_U32, .optional = true },
-            [OVS_PACKET_CMD_SAMPLE] = { .type = NL_A_U32, .optional = true },
-        };
-
-        struct nlattr *mcgroups[ARRAY_SIZE(ovs_mcgroup_policy)];
-
-        if (!nl_parse_nested(a[OVS_DP_ATTR_MCGROUPS], ovs_mcgroup_policy,
-                             mcgroups, ARRAY_SIZE(ovs_mcgroup_policy))) {
-            return EINVAL;
-        }
-
-        if (mcgroups[OVS_PACKET_CMD_MISS]) {
-            dp->mcgroups[DPIF_UC_MISS]
-                = nl_attr_get_u32(mcgroups[OVS_PACKET_CMD_MISS]);
-        }
-        if (mcgroups[OVS_PACKET_CMD_ACTION]) {
-            dp->mcgroups[DPIF_UC_ACTION]
-                = nl_attr_get_u32(mcgroups[OVS_PACKET_CMD_ACTION]);
-        }
-        if (mcgroups[OVS_PACKET_CMD_SAMPLE]) {
-            dp->mcgroups[DPIF_UC_SAMPLE]
-                = nl_attr_get_u32(mcgroups[OVS_PACKET_CMD_SAMPLE]);
-        }
-    }
-
     return 0;
 }
 
@@ -1476,6 +1479,8 @@ dpif_linux_dp_to_ofpbuf(const struct dpif_linux_dp *dp, struct ofpbuf *buf)
         nl_msg_put_string(buf, OVS_DP_ATTR_NAME, dp->name);
     }
 
+    nl_msg_put_u32(buf, OVS_DP_ATTR_UPCALL_PID, dp->upcall_pid);
+
     /* Skip OVS_DP_ATTR_STATS since we never have a reason to serialize it. */
 
     if (dp->ipv4_frags) {
@@ -1572,6 +1577,7 @@ dpif_linux_flow_from_ofpbuf(struct dpif_linux_flow *flow,
     static const struct nl_policy ovs_flow_policy[] = {
         [OVS_FLOW_ATTR_KEY] = { .type = NL_A_NESTED },
         [OVS_FLOW_ATTR_ACTIONS] = { .type = NL_A_NESTED, .optional = true },
+        [OVS_FLOW_ATTR_UPCALL_PID] = { .type = NL_A_U32 },
         [OVS_FLOW_ATTR_STATS] = { .type = NL_A_UNSPEC,
                                   .min_len = sizeof(struct ovs_flow_stats),
                                   .max_len = sizeof(struct ovs_flow_stats),
@@ -1608,6 +1614,9 @@ dpif_linux_flow_from_ofpbuf(struct dpif_linux_flow *flow,
         flow->actions = nl_attr_get(a[OVS_FLOW_ATTR_ACTIONS]);
         flow->actions_len = nl_attr_get_size(a[OVS_FLOW_ATTR_ACTIONS]);
     }
+    if (a[OVS_FLOW_ATTR_UPCALL_PID]) {
+        flow->upcall_pid = nl_attr_get_u32(a[OVS_FLOW_ATTR_UPCALL_PID]);
+    }
     if (a[OVS_FLOW_ATTR_STATS]) {
         flow->stats = nl_attr_get(a[OVS_FLOW_ATTR_STATS]);
     }
@@ -1644,6 +1653,8 @@ dpif_linux_flow_to_ofpbuf(const struct dpif_linux_flow *flow,
                           flow->actions, flow->actions_len);
     }
 
+    nl_msg_put_u32(buf, OVS_FLOW_ATTR_UPCALL_PID, flow->upcall_pid);
+
     /* We never need to send these to the kernel. */
     assert(!flow->stats);
     assert(!flow->tcp_flags);
diff --git a/lib/dpif-linux.h b/lib/dpif-linux.h
index 727a9e5..41ede74 100644
--- a/lib/dpif-linux.h
+++ b/lib/dpif-linux.h
@@ -34,6 +34,7 @@ struct dpif_linux_vport {
 
     /* Attributes. */
     const char *name;                      /* OVS_VPORT_ATTR_NAME. */
+    uint32_t upcall_pid;                   /* OVS_VPORT_ATTR_UPCALL_PID. */
     const struct ovs_vport_stats *stats;   /* OVS_VPORT_ATTR_STATS. */
     const uint8_t *address;                /* OVS_VPORT_ATTR_ADDRESS. */
     const struct nlattr *options;          /* OVS_VPORT_ATTR_OPTIONS. */
-- 
1.7.4.1




More information about the dev mailing list