[ovs-dev] [PATCHv2 4/7] netlink: Make nl_dump_next() thread-safe.

Joe Stringer joestringer at nicira.com
Tue Jan 21 19:29:27 UTC 2014


This patch modifies 'struct nl_dump' and nl_dump_next() to allow
multiple threads to share the same nl_dump. The changes synchronize the
dump status between multiple callers, and allow each caller to fully
process its existing buffer before deciding whether to stop fetching
more messages.

The 'status' field of 'struct nl_dump' becomes atomic, so that multiple
threads may check and/or update it to communicate when there is an error
or the netlink dump is finished. The low bit holds whether the final
message was seen, while the higher bits hold an errno value.

nl_dump_next() now reads all messages from the given buffer before
checking the shared dump status and attempting to fetch more. Multiple
threads may call it with the same nl_dump, but each must provide an
independent buffer. As before, the final dump status is obtained by
calling nl_dump_done() from a single thread.

Signed-off-by: Joe Stringer <joestringer at nicira.com>
---
v2: Prevent spinning if the kernel sends EAGAIN.
    Separate finish success (bit 0 of status) from errors (bits 1+).
---
 lib/netlink-socket.c |  131 ++++++++++++++++++++++++++++++--------------------
 lib/netlink-socket.h |   16 +++++-
 2 files changed, 92 insertions(+), 55 deletions(-)

diff --git a/lib/netlink-socket.c b/lib/netlink-socket.c
index b5eac0f..80d1f4c 100644
--- a/lib/netlink-socket.c
+++ b/lib/netlink-socket.c
@@ -31,6 +31,7 @@
 #include "ofpbuf.h"
 #include "ovs-thread.h"
 #include "poll-loop.h"
+#include "seq.h"
 #include "socket-util.h"
 #include "util.h"
 #include "vlog.h"
@@ -690,43 +691,21 @@ nl_sock_drain(struct nl_sock *sock)
 void
 nl_dump_start(struct nl_dump *dump, int protocol, const struct ofpbuf *request)
 {
-    dump->status = nl_pool_alloc(protocol, &dump->sock);
-    if (dump->status) {
+    int status = nl_pool_alloc(protocol, &dump->sock);
+
+    /* nl_dump_next() and nl_dump_done() use 'status' and 'status_seq' even
+     * when the dump fails to start, so initialize both before returning. */
+    dump->status_seq = seq_create();
+    if (status) {
+        atomic_init(&dump->status, status << 1);
         return;
     }
 
     nl_msg_nlmsghdr(request)->nlmsg_flags |= NLM_F_DUMP | NLM_F_ACK;
-    dump->status = nl_sock_send__(dump->sock, request,
-                                  nl_sock_allocate_seq(dump->sock, 1), true);
+    status = nl_sock_send__(dump->sock, request,
+                            nl_sock_allocate_seq(dump->sock, 1), true);
+    atomic_init(&dump->status, status << 1);
     dump->nl_seq = nl_msg_nlmsghdr(request)->nlmsg_seq;
-}
-
-/* Helper function for nl_dump_next(). */
-static int
-nl_dump_recv(struct nl_dump *dump, struct ofpbuf *buffer)
-{
-    struct nlmsghdr *nlmsghdr;
-    int retval;
-
-    retval = nl_sock_recv__(dump->sock, buffer, true);
-    if (retval) {
-        return retval == EINTR ? EAGAIN : retval;
-    }
-
-    nlmsghdr = nl_msg_nlmsghdr(buffer);
-    if (dump->nl_seq != nlmsghdr->nlmsg_seq) {
-        VLOG_DBG_RL(&rl, "ignoring seq %#"PRIx32" != expected %#"PRIx32,
-                    nlmsghdr->nlmsg_seq, dump->nl_seq);
-        return EAGAIN;
-    }
-
-    if (nl_msg_nlmsgerr(buffer, &retval)) {
-        VLOG_INFO_RL(&rl, "netlink dump request error (%s)",
-                     ovs_strerror(retval));
-        return retval && retval != EAGAIN ? retval : EPROTO;
-    }
-
-    return 0;
 }
 
 /* Attempts to retrieve another reply from 'dump' into 'buffer'. 'dump' must
@@ -742,40 +718,89 @@ nl_dump_recv(struct nl_dump *dump, struct ofpbuf *buffer)
  * to 0.  Failure might indicate an actual error or merely the end of replies.
  * An error status for the entire dump operation is provided when it is
  * completed by calling nl_dump_done().
+ *
+ * Multiple threads may call this function, passing the same nl_dump, however
+ * each must provide independent buffers. This function may cache multiple
+ * replies in the buffer, and these will be processed before more replies are
+ * fetched. When this function returns false, other threads may continue to
+ * process replies in their buffers, but they will not fetch more replies.
  */
 bool
 nl_dump_next(struct nl_dump *dump, struct ofpbuf *reply, struct ofpbuf *buffer)
 {
     struct nlmsghdr *nlmsghdr;
+    int error = 0;
 
     reply->data = NULL;
     reply->size = 0;
-    if (dump->status) {
-        return false;
-    }
 
+    /* If 'buffer' is empty, fetch another batch of nlmsgs. */
     while (!buffer->size) {
-        int retval = nl_dump_recv(dump, buffer);
+        unsigned int status;
+        uint64_t seq;
+        int retval;
+
+        /* Read 'status_seq' before 'status', so that a status change made by
+         * another thread after this point also wakes up seq_wait() below. */
+        seq = seq_read(dump->status_seq);
+        atomic_read(&dump->status, &status);
+        if (status) {
+            return false;
+        }
+
+        retval = nl_sock_recv__(dump->sock, buffer, false);
         if (retval) {
             ofpbuf_clear(buffer);
-            if (retval != EAGAIN) {
-                dump->status = retval;
-                return false;
+            if (retval == EAGAIN) {
+                nl_sock_wait(dump->sock, POLLIN);
+                seq_wait(dump->status_seq, seq);
+                poll_block();
+                continue;
+            } else {
+                error = retval;
+                goto exit;
             }
         }
+
+        nlmsghdr = nl_msg_nlmsghdr(buffer);
+        if (dump->nl_seq != nlmsghdr->nlmsg_seq) {
+            VLOG_DBG_RL(&rl, "ignoring seq %#"PRIx32" != expected %#"PRIx32,
+                        nlmsghdr->nlmsg_seq, dump->nl_seq);
+            ofpbuf_clear(buffer);
+            continue;
+        }
+
+        /* Any NLMSG_ERROR ends the dump; a 0 or EAGAIN code becomes EPROTO. */
+        if (nl_msg_nlmsgerr(buffer, &retval)) {
+            VLOG_INFO_RL(&rl, "netlink dump request error (%s)",
+                         ovs_strerror(retval));
+            error = retval && retval != EAGAIN ? retval : EPROTO;
+            ofpbuf_clear(buffer);
+            goto exit;
+        }
     }
 
+    /* Fetch the next nlmsg in the current batch. */
     nlmsghdr = nl_msg_next(buffer, reply);
     if (!nlmsghdr) {
         VLOG_WARN_RL(&rl, "netlink dump reply contains message fragment");
-        dump->status = EPROTO;
-        return false;
+        error = EPROTO;
     } else if (nlmsghdr->nlmsg_type == NLMSG_DONE) {
-        dump->status = EOF;
-        return false;
+        error = EOF;
     }
 
-    return true;
+exit:
+    /* Publish the outcome so that other threads stop fetching: bit 0 of
+     * 'status' marks a normal finish, bits 1 and up hold an errno value. */
+    if (error == EOF) {
+        unsigned int old;
+        atomic_or(&dump->status, 1, &old);
+        seq_change(dump->status_seq);
+    } else if (error) {
+        atomic_store(&dump->status, error << 1);
+        seq_change(dump->status_seq);
+    }
+    return !error;
 }
 
 /* Completes Netlink dump operation 'dump', which must have been initialized
@@ -784,23 +803,30 @@ nl_dump_next(struct nl_dump *dump, struct ofpbuf *reply, struct ofpbuf *buffer)
 int
 nl_dump_done(struct nl_dump *dump)
 {
-    uint64_t tmp_reply_stub[NL_DUMP_BUFSIZE / 8];
-    struct ofpbuf buf;
+    unsigned int status;
 
     /* Drain any remaining messages that the client didn't read.  Otherwise the
      * kernel will continue to queue them up and waste buffer space.
      *
      * XXX We could just destroy and discard the socket in this case. */
-    ofpbuf_use_stub(&buf, tmp_reply_stub, sizeof tmp_reply_stub);
-    while (!dump->status) {
-        struct ofpbuf reply;
-        if (!nl_dump_next(dump, &reply, &buf)) {
-            ovs_assert(dump->status);
+    atomic_read(&dump->status, &status);
+    if (!status) {
+        uint64_t tmp_reply_stub[NL_DUMP_BUFSIZE / 8];
+        struct ofpbuf reply, buf;
+
+        ofpbuf_use_stub(&buf, tmp_reply_stub, sizeof tmp_reply_stub);
+        while (nl_dump_next(dump, &reply, &buf)) {
+            /* Nothing to do. */
         }
+        atomic_read(&dump->status, &status);
+        ovs_assert(status);
+        ofpbuf_uninit(&buf);
     }
+    atomic_destroy(&dump->status);
     nl_pool_release(dump->sock);
-    ofpbuf_uninit(&buf);
-    return dump->status == EOF ? 0 : dump->status;
+    seq_destroy(dump->status_seq);
+    /* Drop bit 0 (dump finished); the remaining bits hold the errno. */
+    return status >> 1;
 }
 
 /* Causes poll_block() to wake up when any of the specified 'events' (which is
diff --git a/lib/netlink-socket.h b/lib/netlink-socket.h
index 1d733c3..7700a4a 100644
--- a/lib/netlink-socket.h
+++ b/lib/netlink-socket.h
@@ -35,13 +35,24 @@
  * Thread-safety
  * =============
  *
- * Only a single thread may use a given nl_sock or nl_dump at one time.
+ * Most of the netlink functions are not fully thread-safe: Only a single
+ * thread may use a given nl_sock or nl_dump at one time. The exceptions are:
+ *
+ *    - nl_sock_recv() is conditionally thread-safe: it may be called from
+ *      different threads with the same nl_sock, but each caller must provide
+ *      an independent receive buffer.
+ *
+ *    - nl_dump_next() is conditionally thread-safe: it may be called from
+ *      different threads with the same nl_dump, but each caller must provide
+ *      independent buffers.  nl_dump_done() must still be called from only
+ *      one thread, after all other threads have stopped using the nl_dump.
  */
 
 #include <stdbool.h>
 #include <stddef.h>
 #include <stdint.h>
 #include "ofpbuf.h"
+#include "ovs-atomic.h"
 
 struct nl_sock;
 
@@ -101,7 +111,9 @@ void nl_transact_multiple(int protocol, struct nl_transaction **, size_t n);
 struct nl_dump {
     struct nl_sock *sock;       /* Socket being dumped. */
     uint32_t nl_seq;            /* Expected nlmsg_seq for replies. */
-    int status;                 /* 0=OK, EOF=done, or positive errno value. */
+    atomic_uint status;         /* Bit 0 set once the final message is read.
+                                 * Bits 1 and up hold an errno (0 if none). */
+    struct seq *status_seq;     /* Changed on each 'status' update. */
 };
 
 void nl_dump_start(struct nl_dump *, int protocol,
-- 
1.7.9.5




More information about the dev mailing list