[ovs-dev] [PATCHv2] netlink-socket: Simplify multithreaded dumping to match Linux reality.

Ben Pfaff blp at nicira.com
Tue Jul 15 17:58:00 UTC 2014


Commit 0791315e4d (netlink-socket: Work around kernel Netlink dump thread
races.) introduced a simple workaround for Linux kernel races in Netlink
dumps.  However, the code remained more complicated than needed.  This
commit simplifies it.

The main reason for complication in the code was 'status_seq' in nl_dump.
This member was there to allow a thread to wait for some other thread to
refill the socket buffer with another dump message (although we did not
understand the reason at the time it was introduced).  Now that we know
that Netlink dumps need to be serialized to work properly in existing
Linux kernels, there's no additional value in having 'status_seq',
because serialized recvmsg() calls always refill the socket buffer
properly.

This commit updates nl_msg_next() to clear its buffer argument on error.
This is a more convenient interface for the new version of the Netlink
dump code.  nl_msg_next() doesn't have any other callers, so nothing else
is affected by this interface change.

Signed-off-by: Ben Pfaff <blp at nicira.com>
---
v1->v2: Remove __ from function name, fix nl_dump_refill() return values.

 lib/netlink-socket.c |  176 ++++++++++++++++++++++++++++----------------------
 lib/netlink-socket.h |   11 ++--
 lib/netlink.c        |    3 +-
 3 files changed, 106 insertions(+), 84 deletions(-)

diff --git a/lib/netlink-socket.c b/lib/netlink-socket.c
index e36100d..8f4ff9a 100644
--- a/lib/netlink-socket.c
+++ b/lib/netlink-socket.c
@@ -702,18 +702,63 @@ nl_sock_drain(struct nl_sock *sock)
 void
 nl_dump_start(struct nl_dump *dump, int protocol, const struct ofpbuf *request)
 {
-    int status;
-
     nl_msg_nlmsghdr(request)->nlmsg_flags |= NLM_F_DUMP | NLM_F_ACK;
-    status = nl_pool_alloc(protocol, &dump->sock);
-    if (!status) {
-        status = nl_sock_send__(dump->sock, request,
-                                nl_sock_allocate_seq(dump->sock, 1), true);
+
+    ovs_mutex_init(&dump->mutex);
+    ovs_mutex_lock(&dump->mutex);
+    dump->status = nl_pool_alloc(protocol, &dump->sock);
+    if (!dump->status) {
+        dump->status = nl_sock_send__(dump->sock, request,
+                                      nl_sock_allocate_seq(dump->sock, 1),
+                                      true);
     }
-    atomic_init(&dump->status, status << 1);
     dump->nl_seq = nl_msg_nlmsghdr(request)->nlmsg_seq;
-    dump->status_seq = seq_create();
-    ovs_mutex_init(&dump->mutex);
+    ovs_mutex_unlock(&dump->mutex);
+}
+
+static int
+nl_dump_refill(struct nl_dump *dump, struct ofpbuf *buffer)
+    OVS_REQUIRES(dump->mutex)
+{
+    struct nlmsghdr *nlmsghdr;
+    int error;
+
+    while (!ofpbuf_size(buffer)) {
+        error = nl_sock_recv__(dump->sock, buffer, true);
+        if (error) {
+            return error == EAGAIN ? EOF : error;
+        }
+
+        nlmsghdr = nl_msg_nlmsghdr(buffer);
+        if (dump->nl_seq != nlmsghdr->nlmsg_seq) {
+            VLOG_DBG_RL(&rl, "ignoring seq %#"PRIx32" != expected %#"PRIx32,
+                        nlmsghdr->nlmsg_seq, dump->nl_seq);
+            ofpbuf_clear(buffer);
+        }
+    }
+
+    if (nl_msg_nlmsgerr(buffer, &error) && error) {
+        VLOG_INFO_RL(&rl, "netlink dump request error (%s)",
+                     ovs_strerror(error));
+        ofpbuf_clear(buffer);
+        return error;
+    }
+
+    return 0;
+}
+
+static int
+nl_dump_next__(struct ofpbuf *reply, struct ofpbuf *buffer)
+{
+    struct nlmsghdr *nlmsghdr = nl_msg_next(buffer, reply);
+    if (!nlmsghdr) {
+        VLOG_WARN_RL(&rl, "netlink dump contains message fragment");
+        return EPROTO;
+    } else if (nlmsghdr->nlmsg_type == NLMSG_DONE) {
+        return EOF;
+    } else {
+        return 0;
+    }
 }
 
 /* Attempts to retrieve another reply from 'dump' into 'buffer'. 'dump' must
@@ -739,81 +784,47 @@ nl_dump_start(struct nl_dump *dump, int protocol, const struct ofpbuf *request)
 bool
 nl_dump_next(struct nl_dump *dump, struct ofpbuf *reply, struct ofpbuf *buffer)
 {
-    struct nlmsghdr *nlmsghdr;
-    int error = 0;
-
-    ofpbuf_set_data(reply, NULL);
-    ofpbuf_set_size(reply, 0);
-
-    /* If 'buffer' is empty, fetch another batch of nlmsgs. */
-    while (!ofpbuf_size(buffer)) {
-        unsigned int status;
-        int retval, seq;
-
-        seq = seq_read(dump->status_seq);
-        atomic_read(&dump->status, &status);
-        if (status) {
-            return false;
-        }
+    int retval = 0;
 
-        /* Take the mutex here to avoid an in-kernel race.  If two threads try
-         * to read from a Netlink dump socket at once, then the socket error
-         * can be set to EINVAL, which will be encountered on the next recv on
-         * that socket, which could be anywhere due to the way that we pool
-         * Netlink sockets.  Serializing the recv calls avoids the issue. */
+    /* If the buffer is empty, refill it.
+     *
+     * If the buffer is not empty, we don't check the dump's status.
+     * Otherwise, we could end up skipping some of the dump results if thread A
+     * hits EOF while thread B is in the midst of processing a batch. */
+    if (!ofpbuf_size(buffer)) {
         ovs_mutex_lock(&dump->mutex);
-        retval = nl_sock_recv__(dump->sock, buffer, false);
+        if (!dump->status) {
+            /* Take the mutex here to avoid an in-kernel race.  If two threads
+             * try to read from a Netlink dump socket at once, then the socket
+             * error can be set to EINVAL, which will be encountered on the
+             * next recv on that socket, which could be anywhere due to the way
+             * that we pool Netlink sockets.  Serializing the recv calls avoids
+             * the issue. */
+            dump->status = nl_dump_refill(dump, buffer);
+        }
+        retval = dump->status;
         ovs_mutex_unlock(&dump->mutex);
+    }
 
+    /* Fetch the next message from the buffer. */
+    if (!retval) {
+        retval = nl_dump_next__(reply, buffer);
         if (retval) {
-            ofpbuf_clear(buffer);
-            if (retval == EAGAIN) {
-                nl_sock_wait(dump->sock, POLLIN);
-                seq_wait(dump->status_seq, seq);
-                poll_block();
-                continue;
-            } else {
-                error = retval;
-                goto exit;
+            /* Record 'retval' as the dump status, but don't overwrite an error
+             * with EOF.  */
+            ovs_mutex_lock(&dump->mutex);
+            if (dump->status <= 0) {
+                dump->status = retval;
             }
-        }
-
-        nlmsghdr = nl_msg_nlmsghdr(buffer);
-        if (dump->nl_seq != nlmsghdr->nlmsg_seq) {
-            VLOG_DBG_RL(&rl, "ignoring seq %#"PRIx32" != expected %#"PRIx32,
-                        nlmsghdr->nlmsg_seq, dump->nl_seq);
-            ofpbuf_clear(buffer);
-            continue;
-        }
-
-        if (nl_msg_nlmsgerr(buffer, &retval) && retval) {
-            VLOG_INFO_RL(&rl, "netlink dump request error (%s)",
-                         ovs_strerror(retval));
-            error = retval == EAGAIN ? EPROTO : retval;
-            ofpbuf_clear(buffer);
-            goto exit;
+            ovs_mutex_unlock(&dump->mutex);
         }
     }
 
-    /* Fetch the next nlmsg in the current batch. */
-    nlmsghdr = nl_msg_next(buffer, reply);
-    if (!nlmsghdr) {
-        VLOG_WARN_RL(&rl, "netlink dump reply contains message fragment");
-        error = EPROTO;
-    } else if (nlmsghdr->nlmsg_type == NLMSG_DONE) {
-        error = EOF;
+    if (retval) {
+        ofpbuf_set_data(reply, NULL);
+        ofpbuf_set_size(reply, 0);
     }
-
-exit:
-    if (error == EOF) {
-        unsigned int old;
-        atomic_or(&dump->status, 1, &old);
-        seq_change(dump->status_seq);
-    } else if (error) {
-        atomic_store(&dump->status, error << 1);
-        seq_change(dump->status_seq);
-    }
-    return !error;
+    return !retval;
 }
 
 /* Completes Netlink dump operation 'dump', which must have been initialized
@@ -824,11 +835,14 @@ nl_dump_done(struct nl_dump *dump)
 {
     int status;
 
+    ovs_mutex_lock(&dump->mutex);
+    status = dump->status;
+    ovs_mutex_unlock(&dump->mutex);
+
     /* Drain any remaining messages that the client didn't read.  Otherwise the
      * kernel will continue to queue them up and waste buffer space.
      *
      * XXX We could just destroy and discard the socket in this case. */
-    atomic_read(&dump->status, &status);
     if (!status) {
         uint64_t tmp_reply_stub[NL_DUMP_BUFSIZE / 8];
         struct ofpbuf reply, buf;
@@ -837,14 +851,18 @@ nl_dump_done(struct nl_dump *dump)
         while (nl_dump_next(dump, &reply, &buf)) {
             /* Nothing to do. */
         }
-        atomic_read(&dump->status, &status);
-        ovs_assert(status);
         ofpbuf_uninit(&buf);
+
+        ovs_mutex_lock(&dump->mutex);
+        status = dump->status;
+        ovs_mutex_unlock(&dump->mutex);
+        ovs_assert(status);
     }
+
     nl_pool_release(dump->sock);
-    seq_destroy(dump->status_seq);
     ovs_mutex_destroy(&dump->mutex);
-    return status >> 1;
+
+    return status == EOF ? 0 : status;
 }
 
 /* Causes poll_block() to wake up when any of the specified 'events' (which is
diff --git a/lib/netlink-socket.h b/lib/netlink-socket.h
index 8ac201a..d53db4e 100644
--- a/lib/netlink-socket.h
+++ b/lib/netlink-socket.h
@@ -110,12 +110,15 @@ void nl_transact_multiple(int protocol, struct nl_transaction **, size_t n);
 #define NL_DUMP_BUFSIZE         4096
 
 struct nl_dump {
+    /* These members are immutable during the lifetime of the nl_dump. */
     struct nl_sock *sock;       /* Socket being dumped. */
     uint32_t nl_seq;            /* Expected nlmsg_seq for replies. */
-    atomic_uint status;         /* Low bit set if we read final message.
-                                 * Other bits hold an errno (0 for success). */
-    struct seq *status_seq;     /* Tracks changes to the above 'status'. */
-    struct ovs_mutex mutex;
+
+    /* 'mutex' protects 'status' and serializes access to 'sock'. */
+    struct ovs_mutex mutex;     /* Protects 'status', synchronizes recv(). */
+    int status OVS_GUARDED;     /* 0: dump in progress,
+                                 * positive errno: dump completed with error,
+                                 * EOF: dump completed successfully. */
 };
 
 void nl_dump_start(struct nl_dump *, int protocol,
diff --git a/lib/netlink.c b/lib/netlink.c
index c08a557..24b2168 100644
--- a/lib/netlink.c
+++ b/lib/netlink.c
@@ -462,7 +462,7 @@ nl_msg_put_nested(struct ofpbuf *msg,
  * 'ofpbuf_size(msg)', and returns a pointer to the header.
  *
  * If 'buffer' does not begin with a "struct nlmsghdr" or begins with one that
- * is invalid, returns NULL without modifying 'buffer'. */
+ * is invalid, returns NULL and clears 'buffer' and 'msg'. */
 struct nlmsghdr *
 nl_msg_next(struct ofpbuf *buffer, struct ofpbuf *msg)
 {
@@ -476,6 +476,7 @@ nl_msg_next(struct ofpbuf *buffer, struct ofpbuf *msg)
         }
     }
 
+    ofpbuf_clear(buffer);
     ofpbuf_set_data(msg, NULL);
     ofpbuf_set_size(msg, 0);
     return NULL;
-- 
1.7.10.4




More information about the dev mailing list