[ovs-dev] [netlink v4 47/52] datapath: Adopt Generic Netlink-compatible locking.

Ben Pfaff blp at nicira.com
Thu Jan 20 23:12:13 UTC 2011


On Wed, Jan 19, 2011 at 07:00:01PM -0800, Jesse Gross wrote:
> On Tue, Jan 11, 2011 at 9:49 PM, Ben Pfaff <blp at nicira.com> wrote:
> > It might be possible to drop dp_mutex given a little bit of cleverness but
> > I haven't gone to the trouble.
> 
> We had talked about doing this before.  Did you run into a problem
> with it or simply decide that it was better to wait for the future?

The latter.  I had enough problems.

> Some other notes on locking issues that I ran across while reviewing this:
> * dp->port_list uses the *_rcu variants of the list functions but it
> is not actually protected by RCU.  It is actually protected by
> dp_mutex.

OK, I changed the two instances of list_for_each_entry_rcu() in
datapath.c that were iterating through port_list to use
list_for_each_entry() instead.

> * dump_flow() directly accesses dp->table instead of using
> get_table_protected().

OK, fixed.

> * We need to more consistently document the locking expectations of
> the functions in datapath.c
> * We don't consistently use dp_mutex_(un)lock, frequently preferring
> the direct version.

OK, I've dropped those functions entirely now.  I think I had this idea
that brcompat_mod needed to take dp_mutex, and so I wanted to export
that to it via functions, but it didn't do that.

> Did you run this with lockdep?

I always run with lockdep.  (I also always use sparse.  It points out
its own share of my locking bugs.)

I might not have tested this particular commit in its current form,
though.  Certainly you have pointed out enough bugs that it doesn't look
like it.

> > diff --git a/datapath/datapath.c b/datapath/datapath.c
> > index bc6b7be..19f2eaf 100644
> > --- a/datapath/datapath.c
> > +++ b/datapath/datapath.c
> >  static struct datapath *lookup_datapath(struct odp_datapath *odp_datapath, struct nlattr *a[ODPDT_MAX + 1])
> 
> The comment above this function still refers to dp->mutex.

OK, I fixed this up, thanks.

> > @@ -1456,10 +1445,8 @@ static int modify_datapath(unsigned int cmd, struct odp_datapath __user *uodp_da
> >
> >  	if (cmd == ODP_DP_DEL)
> >  		destroy_dp(dp);
> > -	else {
> > +	else
> >  		change_datapath(dp, a);
> > -		mutex_unlock(&dp->mutex);
> > -	}
> 
> del_datapath() is now the only caller of destroy_dp().  Is it worth
> merging these two functions together?

OK, done.

> > @@ -1657,43 +1638,21 @@ static struct vport *lookup_vport(struct odp_vport *odp_vport, struct nlattr *a[
> 
> The comments above this function still refer to dp->mutex.  It also
> says that it should be called without any locks but it seems like
> dp_mutex is required.

Oops.  I fixed it:

/* Called with dp_mutex and optionally with RTNL lock also. */

> > @@ -1798,10 +1753,11 @@ static int modify_vport(unsigned int cmd, struct odp_vport __user *uodp_vport)
> >  		goto exit;
> >
> >  	rtnl_lock();
> > +	mutex_lock(&dp_mutex);
> >  	vport = lookup_vport((struct odp_vport *)skb->data, a);
> >  	err = PTR_ERR(vport);
> >  	if (IS_ERR(vport))
> > -		goto exit_free;
> > +		goto exit_unlock;
> >  	dp = vport->dp;
> >
> >  	if (cmd == ODP_VPORT_DEL)
> 
> In set_vport() (the successor to this function) it looks like we are
> missing a call to lock dp_mutex after rtnl_lock().

Oops, fixed.

> > @@ -1893,6 +1848,8 @@ static long openvswitch_ioctl(struct file *f, unsigned int cmd,
> > -	dp = get_dp_locked(dp_idx);
> > +	dp = get_dp(dp_idx);
> 
> I don't see what actually uses dp here.  If something does though,
> get_dp() should complain because we're not holding dp_mutex at this
> point.

You're right, we can delete this get_dp() call entirely now.

> > @@ -2032,7 +1989,7 @@ static ssize_t openvswitch_read(struct file *f, char __user *buf,
> >  {
> >  	int listeners = get_listen_mask(f);
> >  	int dp_idx = iminor(f->f_dentry->d_inode);
> > -	struct datapath *dp = get_dp_locked(dp_idx);
> > +	struct datapath *dp = get_dp(dp_idx);
> 
> I think we need to take dp_mutex by hand here.

You're right.  Oops.  Fixed.

> >  static unsigned int openvswitch_poll(struct file *file, poll_table *wait)
> >  {
> >  	int dp_idx = iminor(file->f_dentry->d_inode);
> > -	struct datapath *dp = get_dp_locked(dp_idx);
> > +	struct datapath *dp = get_dp(dp_idx);
> 
> We need to take dp_mutex here too.

Drat.  Fixed.

> > diff --git a/datapath/vport.c b/datapath/vport.c
> > index 4db1f01..5c048c6 100644
> > --- a/datapath/vport.c
> > +++ b/datapath/vport.c
> > @@ -142,7 +95,7 @@ static void vport_del_all(void)
> >  	int i;
> >
> >  	rtnl_lock();
> > -	vport_lock();
> > +	dp_mutex_lock();
> >
> >  	for (i = 0; i < VPORT_HASH_BUCKETS; i++) {
> >  		struct hlist_head *bucket = &dev_table[i];
> > @@ -153,7 +106,7 @@ static void vport_del_all(void)
> >  			vport_del(vport);
> >  	}
> >
> > -	vport_unlock();
> > +	dp_mutex_unlock();
> >  	rtnl_unlock();
> 
> This function is actually completely unnecessary now.  It's left over
> from the time when vports could be created without being attached to
> datapaths and therefore needed to be cleaned up before unloading the
> module.  However, we can't leak memory in this way any more, so we
> should just get rid of the function.

Good catch.  I'll add a commit to do that.

> >  *	vport_locate - find a port that has already been created
> >  *
> >  * @name: name of port to find
> > - *
> > - * Either RTNL or vport lock must be acquired before calling this function
> > - * and held while using the found port.  See the locking comments at the
> > - * top of the file.
> >  */
> >  struct vport *vport_locate(const char *name)
> >  {
> > @@ -199,11 +148,6 @@ struct vport *vport_locate(const char *name)
> >  	struct vport *vport;
> >  	struct hlist_node *node;
> >
> > -	if (unlikely(!mutex_is_locked(&vport_mutex) && !rtnl_is_locked())) {
> > -		pr_err("neither RTNL nor vport lock held in vport_locate\n");
> > -		dump_stack();
> > -	}
> > -
> >  	rcu_read_lock();
> 
> It's not clear to me what is handling the locking for vports.  It
> appears to be dp_mutex but none of the vport functions are documented
> as requiring it.  The add/del functions are protected by RTNL but
> locate can't be because it is called by get_datapath() which does not
> (and should not) hold it.
> 
> At the very least the vport functions should be documented as to the
> locking that they expect.  Perhaps a better way would be to convert
> the vport hash table to use RCU and then there would be one less lock
> to worry about in the vport layer.

All of them are protected by dp_mutex, including get_datapath().  Shall
I add comments to that effect?

Here's the incremental.  It does include the vport_del_all() deletion
but I'll break that out separately.

I also added use of rcu_dereference_{check,protected}() in table.c.  Now
a basic run with a simple GRE tunnel setup is lockdep-clean for me.

diff --git a/datapath/datapath.c b/datapath/datapath.c
index ddf71cd..341d3d6 100644
--- a/datapath/datapath.c
+++ b/datapath/datapath.c
@@ -80,20 +80,10 @@ EXPORT_SYMBOL(dp_ioctl_hook);
  * notifier is running.
  */
 static struct datapath __rcu *dps[256];
-static DEFINE_MUTEX(dp_mutex);
+DEFINE_MUTEX(dp_mutex);
 
 static struct vport *new_vport(const struct vport_parms *);
 
-void dp_mutex_lock(void)
-{
-	mutex_lock(&dp_mutex);
-}
-
-void dp_mutex_unlock(void)
-{
-	mutex_unlock(&dp_mutex);
-}
-
 /* Must be called with rcu_read_lock or dp_mutex. */
 struct datapath *get_dp(int dp_idx)
 {
@@ -230,23 +220,6 @@ static void destroy_dp_rcu(struct rcu_head *rcu)
 	kobject_put(&dp->ifobj);
 }
 
-/* Caller must hold RTNL and dp_mutex. */
-static void destroy_dp(struct datapath *dp)
-{
-	struct vport *p, *n;
-
-	list_for_each_entry_safe (p, n, &dp->port_list, node)
-		if (p->port_no != ODPP_LOCAL)
-			dp_detach_port(p);
-
-	dp_sysfs_del_dp(dp);
-	rcu_assign_pointer(dps[dp->dp_idx], NULL);
-	dp_detach_port(get_vport_protected(dp, ODPP_LOCAL));
-
-	call_rcu(&dp->rcu, destroy_dp_rcu);
-	module_put(THIS_MODULE);
-}
-
 /* Called with RTNL lock and dp_mutex. */
 static struct vport *new_vport(const struct vport_parms *parms)
 {
@@ -793,7 +766,7 @@ int dp_min_mtu(const struct datapath *dp)
 
 	ASSERT_RTNL();
 
-	list_for_each_entry_rcu (p, &dp->port_list, node) {
+	list_for_each_entry (p, &dp->port_list, node) {
 		int dev_mtu;
 
 		/* Skip any internal ports, since that's what we're trying to
@@ -820,7 +793,7 @@ void set_internal_devs_mtu(const struct datapath *dp)
 
 	mtu = dp_min_mtu(dp);
 
-	list_for_each_entry_rcu (p, &dp->port_list, node) {
+	list_for_each_entry (p, &dp->port_list, node) {
 		if (is_internal_vport(p))
 			vport_set_mtu(p, mtu);
 	}
@@ -1175,7 +1148,7 @@ static int dump_flow(struct odp_flow __user *uodp_flow)
 
 	bucket = flowcmd.state >> 32;
 	obj = flowcmd.state;
-	flow_node = tbl_next(dp->table, &bucket, &obj);
+	flow_node = tbl_next(get_table_protected(dp), &bucket, &obj);
 	err = -ENODEV;
 	if (!flow_node)
 		goto exit_unlock_dp;
@@ -1294,7 +1267,6 @@ error_free_skb:
 }
 
 /* Called with dp_mutex and optionally with RTNL lock also.
- * Holds the returned datapath's mutex on return.
  */
 static struct datapath *lookup_datapath(struct odp_datapath *odp_datapath, struct nlattr *a[ODPDT_MAX + 1])
 {
@@ -1437,6 +1409,7 @@ err:
 static int del_datapath(struct odp_datapath __user *uodp_datapath)
 {
 	struct nlattr *a[ODPDT_MAX + 1];
+	struct vport *vport, *next_vport;
 	struct datapath *dp;
 	struct sk_buff *skb;
 	int err;
@@ -1453,7 +1426,17 @@ static int del_datapath(struct odp_datapath __user *uodp_datapath)
 	if (IS_ERR(dp))
 		goto exit_free;
 
-	destroy_dp(dp);
+	list_for_each_entry_safe (vport, next_vport, &dp->port_list, node)
+		if (vport->port_no != ODPP_LOCAL)
+			dp_detach_port(vport);
+
+	dp_sysfs_del_dp(dp);
+	rcu_assign_pointer(dps[dp->dp_idx], NULL);
+	dp_detach_port(get_vport_protected(dp, ODPP_LOCAL));
+
+	call_rcu(&dp->rcu, destroy_dp_rcu);
+	module_put(THIS_MODULE);
+
 	err = 0;
 
 exit_free:
@@ -1662,10 +1645,7 @@ error_free_skb:
 	return ERR_PTR(err);
 }
 
-
-/* Called without any locks (or with RTNL lock).
- * Returns holding vport->dp->mutex.
- */
+/* Called with dp_mutex and optionally with RTNL lock also. */
 static struct vport *lookup_vport(struct odp_vport *odp_vport,
 				  struct nlattr *a[ODPPT_MAX + 1])
 {
@@ -1793,6 +1773,7 @@ static int set_vport(unsigned int cmd, struct odp_vport __user *uodp_vport)
 		goto exit;
 
 	rtnl_lock();
+	mutex_lock(&dp_mutex);
 	vport = lookup_vport((struct odp_vport *)skb->data, a);
 	err = PTR_ERR(vport);
 	if (IS_ERR(vport))
@@ -1804,9 +1785,9 @@ static int set_vport(unsigned int cmd, struct odp_vport __user *uodp_vport)
 	if (!err)
 		err = change_vport(vport, a);
 
-	mutex_unlock(&dp_mutex);
 exit_free:
 	kfree_skb(skb);
+	mutex_unlock(&dp_mutex);
 	rtnl_unlock();
 exit:
 	return err;
@@ -1904,8 +1885,6 @@ exit:
 static long openvswitch_ioctl(struct file *f, unsigned int cmd,
 			   unsigned long argp)
 {
-	int dp_idx = iminor(f->f_dentry->d_inode);
-	struct datapath *dp;
 	int listeners;
 	int err;
 
@@ -1976,11 +1955,6 @@ static long openvswitch_ioctl(struct file *f, unsigned int cmd,
 		goto exit;
 	}
 
-	dp = get_dp(dp_idx);
-	err = -ENODEV;
-	if (!dp)
-		goto exit;
-
 	switch (cmd) {
 	case ODP_GET_LISTEN_MASK:
 		err = put_user(get_listen_mask(f), (int __user *)argp);
@@ -2051,49 +2025,58 @@ static long openvswitch_compat_ioctl(struct file *f, unsigned int cmd, unsigned
 }
 #endif
 
+static struct sk_buff *openvswitch_try_read(struct file *f, struct datapath *dp)
+{
+	int listeners = get_listen_mask(f);
+	int i;
+
+	for (i = 0; i < DP_N_QUEUES; i++) {
+		if (listeners & (1 << i)) {
+			struct sk_buff *skb = skb_dequeue(&dp->queues[i]);
+			if (skb)
+				return skb;
+		}
+	}
+
+	if (f->f_flags & O_NONBLOCK)
+		return ERR_PTR(-EAGAIN);
+
+	wait_event_interruptible(dp->waitqueue,
+				 dp_has_packet_of_interest(dp, listeners));
+
+	if (signal_pending(current))
+		return ERR_PTR(-ERESTARTSYS);
+
+	return NULL;
+}
+
 static ssize_t openvswitch_read(struct file *f, char __user *buf,
 				size_t nbytes, loff_t *ppos)
 {
-	int listeners = get_listen_mask(f);
 	int dp_idx = iminor(f->f_dentry->d_inode);
-	struct datapath *dp = get_dp(dp_idx);
+	struct datapath *dp;
 	struct sk_buff *skb;
 	struct iovec iov;
 	int retval;
 
-	if (!dp)
-		return -ENODEV;
-
-	if (nbytes == 0 || !listeners)
-		return 0;
-
-	for (;;) {
-		int i;
+	mutex_lock(&dp_mutex);
 
-		for (i = 0; i < DP_N_QUEUES; i++) {
-			if (listeners & (1 << i)) {
-				skb = skb_dequeue(&dp->queues[i]);
-				if (skb)
-					goto success;
-			}
-		}
+	dp = get_dp(dp_idx);
+	retval = -ENODEV;
+	if (!dp)
+		goto error;
 
-		if (f->f_flags & O_NONBLOCK) {
-			retval = -EAGAIN;
-			goto error;
-		}
+	retval = 0;
+	if (nbytes == 0 || !get_listen_mask(f))
+		goto error;
 
-		wait_event_interruptible(dp->waitqueue,
-					 dp_has_packet_of_interest(dp,
-								   listeners));
+	do {
+		skb = openvswitch_try_read(f, dp);
+	} while (!skb);
 
-		if (signal_pending(current)) {
-			retval = -ERESTARTSYS;
-			goto error;
-		}
-	}
-success:
-	dp_mutex_unlock();
+	mutex_unlock(&dp_mutex);
+	if (IS_ERR(skb))
+		return PTR_ERR(skb);
 
 	iov.iov_base = buf;
 	iov.iov_len = min_t(size_t, skb->len, nbytes);
@@ -2105,25 +2088,28 @@ success:
 	return retval;
 
 error:
-	dp_mutex_unlock();
+	mutex_unlock(&dp_mutex);
 	return retval;
 }
 
 static unsigned int openvswitch_poll(struct file *file, poll_table *wait)
 {
 	int dp_idx = iminor(file->f_dentry->d_inode);
-	struct datapath *dp = get_dp(dp_idx);
+	struct datapath *dp;
 	unsigned int mask;
 
+	mutex_lock(&dp_mutex);
+	dp = get_dp(dp_idx);
 	if (dp) {
 		mask = 0;
 		poll_wait(file, &dp->waitqueue, wait);
 		if (dp_has_packet_of_interest(dp, get_listen_mask(file)))
 			mask |= POLLIN | POLLRDNORM;
-		dp_mutex_unlock();
 	} else {
 		mask = POLLIN | POLLRDNORM | POLLHUP;
 	}
+	mutex_unlock(&dp_mutex);
+
 	return mask;
 }
 
diff --git a/datapath/datapath.h b/datapath/datapath.h
index d1c5679..fc6cdf4 100644
--- a/datapath/datapath.h
+++ b/datapath/datapath.h
@@ -145,8 +145,7 @@ struct dp_upcall_info {
 extern struct notifier_block dp_device_notifier;
 extern int (*dp_ioctl_hook)(struct net_device *dev, struct ifreq *rq, int cmd);
 
-void dp_mutex_lock(void);
-void dp_mutex_unlock(void);
+extern struct mutex dp_mutex;
 
 void dp_process_received_packet(struct vport *, struct sk_buff *);
 int dp_detach_port(struct vport *);
diff --git a/datapath/dp_notify.c b/datapath/dp_notify.c
index de63989..7602a08 100644
--- a/datapath/dp_notify.c
+++ b/datapath/dp_notify.c
@@ -1,6 +1,6 @@
 /*
  * Distributed under the terms of the GNU GPL version 2.
- * Copyright (c) 2007, 2008, 2009, 2010 Nicira Networks.
+ * Copyright (c) 2007, 2008, 2009, 2010, 2011 Nicira Networks.
  *
  * Significant portions of this file may be copied from parts of the Linux
  * kernel, by Linus Torvalds and others.
@@ -34,18 +34,18 @@ static int dp_device_event(struct notifier_block *unused, unsigned long event,
 	switch (event) {
 	case NETDEV_UNREGISTER:
 		if (!is_internal_dev(dev)) {
-			dp_mutex_lock();
+			mutex_lock(&dp_mutex);
 			dp_detach_port(vport);
-			dp_mutex_unlock();
+			mutex_unlock(&dp_mutex);
 		}
 		break;
 
 	case NETDEV_CHANGENAME:
 		if (vport->port_no != ODPP_LOCAL) {
-			dp_mutex_lock();
+			mutex_lock(&dp_mutex);
 			dp_sysfs_del_if(vport);
 			dp_sysfs_add_if(vport);
-			dp_mutex_unlock();
+			mutex_unlock(&dp_mutex);
 		}
 		break;
 
diff --git a/datapath/table.c b/datapath/table.c
index 35a532e..f6b1de8 100644
--- a/datapath/table.c
+++ b/datapath/table.c
@@ -30,6 +30,17 @@ struct tbl_bucket {
 	struct tbl_node *objs[];
 };
 
+static struct tbl_bucket *get_bucket(struct tbl_bucket __rcu *bucket)
+{
+	return rcu_dereference_check(bucket, rcu_read_lock_held() ||
+				     lockdep_is_held(&dp_mutex));
+}
+
+static struct tbl_bucket *get_bucket_protected(struct tbl_bucket __rcu *bucket)
+{
+	return rcu_dereference_protected(bucket, lockdep_is_held(&dp_mutex));
+}
+
 static inline int bucket_size(int n_objs)
 {
 	return sizeof(struct tbl_bucket) + sizeof(struct tbl_node *) * n_objs;
@@ -196,7 +207,7 @@ struct tbl_node *tbl_lookup(struct tbl *table, void *target, u32 hash,
 			    int (*cmp)(const struct tbl_node *, void *))
 {
 	struct tbl_bucket __rcu **bucketp = find_bucket(table, hash);
-	struct tbl_bucket *bucket = rcu_dereference(*bucketp);
+	struct tbl_bucket *bucket = get_bucket(*bucketp);
 	int index;
 
 	if (!bucket)
@@ -237,7 +248,7 @@ int tbl_foreach(struct tbl *table,
 			struct tbl_bucket *bucket;
 			unsigned int i;
 
-			bucket = rcu_dereference(l2[l2_idx]);
+			bucket = get_bucket(l2[l2_idx]);
 			if (!bucket)
 				continue;
 
@@ -282,7 +293,7 @@ struct tbl_node *tbl_next(struct tbl *table, u32 *bucketp, u32 *objp)
 		for (l2_idx = s_l2_idx; l2_idx < TBL_L2_SIZE; l2_idx++) {
 			struct tbl_bucket *bucket;
 
-			bucket = rcu_dereference(l2[l2_idx]);
+			bucket = get_bucket_protected(l2[l2_idx]);
 			if (bucket && s_obj < bucket->n_objs) {
 				*bucketp = (l1_idx << TBL_L1_SHIFT) + l2_idx;
 				*objp = s_obj + 1;
@@ -371,7 +382,7 @@ static void free_bucket_rcu(struct rcu_head *rcu)
 int tbl_insert(struct tbl *table, struct tbl_node *target, u32 hash)
 {
 	struct tbl_bucket __rcu **oldp = find_bucket(table, hash);
-	struct tbl_bucket *old = rcu_dereference(*oldp);
+	struct tbl_bucket *old = get_bucket_protected(*oldp);
 	unsigned int n = old ? old->n_objs : 0;
 	struct tbl_bucket *new = bucket_alloc(n + 1);
 
@@ -409,7 +420,7 @@ int tbl_insert(struct tbl *table, struct tbl_node *target, u32 hash)
 int tbl_remove(struct tbl *table, struct tbl_node *target)
 {
 	struct tbl_bucket __rcu **oldp = find_bucket(table, target->hash);
-	struct tbl_bucket *old = rcu_dereference(*oldp);
+	struct tbl_bucket *old = get_bucket_protected(*oldp);
 	unsigned int n = old->n_objs;
 	struct tbl_bucket *new;
 
diff --git a/datapath/vport.c b/datapath/vport.c
index 3d506d8..4b0e1ea 100644
--- a/datapath/vport.c
+++ b/datapath/vport.c
@@ -90,26 +90,6 @@ error:
 	return err;
 }
 
-static void vport_del_all(void)
-{
-	int i;
-
-	rtnl_lock();
-	dp_mutex_lock();
-
-	for (i = 0; i < VPORT_HASH_BUCKETS; i++) {
-		struct hlist_head *bucket = &dev_table[i];
-		struct vport *vport;
-		struct hlist_node *node, *next;
-
-		hlist_for_each_entry_safe(vport, node, next, bucket, hash_node)
-			vport_del(vport);
-	}
-
-	dp_mutex_unlock();
-	rtnl_unlock();
-}
-
 /**
  *	vport_exit - shutdown vport subsystem
  *
@@ -120,8 +100,6 @@ void vport_exit(void)
 {
 	int i;
 
-	vport_del_all();
-
 	for (i = 0; i < n_vport_types; i++) {
 		if (vport_ops_list[i]->exit)
 			vport_ops_list[i]->exit();





More information about the dev mailing list