[ovs-dev] [PATCH 4/4] rconn: Make thread-safe.

Ben Pfaff blp at nicira.com
Fri Oct 11 07:23:34 UTC 2013


This should make sending OFPT_FLOW_REMOVED and NXST_FLOW_MONITOR safe from
miss handler threads.

Bug #20271.
Signed-off-by: Ben Pfaff <blp at nicira.com>
---
 lib/rconn.c |  286 +++++++++++++++++++++++++++++++++++++++++++++++++----------
 lib/rconn.h |    6 ++
 2 files changed, 247 insertions(+), 45 deletions(-)

diff --git a/lib/rconn.c b/lib/rconn.c
index 9273cff..bfa4752 100644
--- a/lib/rconn.c
+++ b/lib/rconn.c
@@ -66,6 +66,8 @@ state_name(enum state state)
  *
  * See the large comment in rconn.h for more information. */
 struct rconn {
+    struct ovs_mutex mutex;
+
     enum state state;
     time_t state_entered;
 
@@ -139,22 +141,71 @@ uint32_t rconn_get_allowed_versions(const struct rconn *rconn)
     return rconn->allowed_versions;
 }
 
-static unsigned int elapsed_in_this_state(const struct rconn *);
-static unsigned int timeout(const struct rconn *);
-static bool timed_out(const struct rconn *);
-static void state_transition(struct rconn *, enum state);
-static void rconn_set_target__(struct rconn *,
-                               const char *target, const char *name);
-static int try_send(struct rconn *);
-static void reconnect(struct rconn *);
-static void report_error(struct rconn *, int error);
-static void disconnect(struct rconn *, int error);
-static void flush_queue(struct rconn *);
-static void close_monitor(struct rconn *, size_t idx, int retval);
+static unsigned int elapsed_in_this_state(const struct rconn *rc)
+    OVS_REQUIRES(rc->mutex);
+static unsigned int timeout(const struct rconn *rc) OVS_REQUIRES(rc->mutex);
+static bool timed_out(const struct rconn *rc) OVS_REQUIRES(rc->mutex);
+static void state_transition(struct rconn *rc, enum state)
+    OVS_REQUIRES(rc->mutex);
+static void rconn_set_target__(struct rconn *rc,
+                               const char *target, const char *name)
+    OVS_REQUIRES(rc->mutex);
+static int rconn_send__(struct rconn *rc, struct ofpbuf *,
+                        struct rconn_packet_counter *)
+    OVS_REQUIRES(rc->mutex);
+static int try_send(struct rconn *rc) OVS_REQUIRES(rc->mutex);
+static void reconnect(struct rconn *rc) OVS_REQUIRES(rc->mutex);
+static void report_error(struct rconn *rc, int error) OVS_REQUIRES(rc->mutex);
+static void rconn_disconnect__(struct rconn *rc) OVS_REQUIRES(rc->mutex);
+static void disconnect(struct rconn *rc, int error) OVS_REQUIRES(rc->mutex);
+static void flush_queue(struct rconn *rc) OVS_REQUIRES(rc->mutex);
+static void close_monitor(struct rconn *rc, size_t idx, int retval)
+    OVS_REQUIRES(rc->mutex);
 static void copy_to_monitor(struct rconn *, const struct ofpbuf *);
 static bool is_connected_state(enum state);
 static bool is_admitted_msg(const struct ofpbuf *);
-static bool rconn_logging_connection_attempts__(const struct rconn *);
+static bool rconn_logging_connection_attempts__(const struct rconn *rc)
+    OVS_REQUIRES(rc->mutex);
+static int rconn_get_version__(const struct rconn *rconn)
+    OVS_REQUIRES(rconn->mutex);
+
+/* The following prototypes duplicate those in rconn.h, but there we weren't
+ * able to add the OVS_EXCLUDED annotations because the definition of struct
+ * rconn was not visible.   */
+
+void rconn_set_max_backoff(struct rconn *rc, int max_backoff)
+    OVS_EXCLUDED(rc->mutex);
+void rconn_connect(struct rconn *rc, const char *target, const char *name)
+    OVS_EXCLUDED(rc->mutex);
+void rconn_connect_unreliably(struct rconn *rc,
+                              struct vconn *vconn, const char *name)
+    OVS_EXCLUDED(rc->mutex);
+void rconn_reconnect(struct rconn *rc) OVS_EXCLUDED(rc->mutex);
+void rconn_disconnect(struct rconn *rc) OVS_EXCLUDED(rc->mutex);
+void rconn_run(struct rconn *rc) OVS_EXCLUDED(rc->mutex);
+void rconn_run_wait(struct rconn *rc) OVS_EXCLUDED(rc->mutex);
+struct ofpbuf *rconn_recv(struct rconn *rc) OVS_EXCLUDED(rc->mutex);
+void rconn_recv_wait(struct rconn *rc) OVS_EXCLUDED(rc->mutex);
+int rconn_send(struct rconn *rc, struct ofpbuf *b,
+               struct rconn_packet_counter *counter)
+    OVS_EXCLUDED(rc->mutex);
+int rconn_send_with_limit(struct rconn *rc, struct ofpbuf *b,
+                          struct rconn_packet_counter *counter,
+                          int queue_limit)
+    OVS_EXCLUDED(rc->mutex);
+void rconn_add_monitor(struct rconn *rc, struct vconn *vconn)
+    OVS_EXCLUDED(rc->mutex);
+void rconn_set_name(struct rconn *rc, const char *new_name)
+    OVS_EXCLUDED(rc->mutex);
+bool rconn_is_admitted(const struct rconn *rconn) OVS_EXCLUDED(rconn->mutex);
+int rconn_failure_duration(const struct rconn *rconn)
+    OVS_EXCLUDED(rconn->mutex);
+ovs_be16 rconn_get_local_port(const struct rconn *rconn)
+    OVS_EXCLUDED(rconn->mutex);
+int rconn_get_version(const struct rconn *rconn) OVS_EXCLUDED(rconn->mutex);
+unsigned int rconn_count_txqlen(const struct rconn *rc)
+    OVS_EXCLUDED(rc->mutex);
+
 
 /* Creates and returns a new rconn.
  *
@@ -184,6 +235,8 @@ rconn_create(int probe_interval, int max_backoff, uint8_t dscp,
 {
     struct rconn *rc = xzalloc(sizeof *rc);
 
+    ovs_mutex_init(&rc->mutex);
+
     rc->state = S_VOID;
     rc->state_entered = time_now();
 
@@ -225,7 +278,9 @@ rconn_create(int probe_interval, int max_backoff, uint8_t dscp,
 
 void
 rconn_set_max_backoff(struct rconn *rc, int max_backoff)
+    OVS_EXCLUDED(rc->mutex)
 {
+    ovs_mutex_lock(&rc->mutex);
     rc->max_backoff = MAX(1, max_backoff);
     if (rc->state == S_BACKOFF && rc->backoff > max_backoff) {
         rc->backoff = max_backoff;
@@ -233,6 +288,7 @@ rconn_set_max_backoff(struct rconn *rc, int max_backoff)
             rc->backoff_deadline = time_now() + max_backoff;
         }
     }
+    ovs_mutex_unlock(&rc->mutex);
 }
 
 int
@@ -274,11 +330,14 @@ rconn_get_probe_interval(const struct rconn *rc)
  * but it need not be acceptable to vconn_open(). */
 void
 rconn_connect(struct rconn *rc, const char *target, const char *name)
+    OVS_EXCLUDED(rc->mutex)
 {
-    rconn_disconnect(rc);
+    ovs_mutex_lock(&rc->mutex);
+    rconn_disconnect__(rc);
     rconn_set_target__(rc, target, name);
     rc->reliable = true;
     reconnect(rc);
+    ovs_mutex_unlock(&rc->mutex);
 }
 
 /* Drops any existing connection on 'rc', then configures 'rc' to use
@@ -292,28 +351,36 @@ rconn_connect(struct rconn *rc, const char *target, const char *name)
 void
 rconn_connect_unreliably(struct rconn *rc,
                          struct vconn *vconn, const char *name)
+    OVS_EXCLUDED(rc->mutex)
 {
     ovs_assert(vconn != NULL);
-    rconn_disconnect(rc);
+
+    ovs_mutex_lock(&rc->mutex);
+    rconn_disconnect__(rc);
     rconn_set_target__(rc, vconn_get_name(vconn), name);
     rc->reliable = false;
     rc->vconn = vconn;
     rc->last_connected = time_now();
     state_transition(rc, S_ACTIVE);
+    ovs_mutex_unlock(&rc->mutex);
 }
 
 /* If 'rc' is connected, forces it to drop the connection and reconnect. */
 void
 rconn_reconnect(struct rconn *rc)
+    OVS_EXCLUDED(rc->mutex)
 {
+    ovs_mutex_lock(&rc->mutex);
     if (rc->state & (S_ACTIVE | S_IDLE)) {
         VLOG_INFO("%s: disconnecting", rc->name);
         disconnect(rc, 0);
     }
+    ovs_mutex_unlock(&rc->mutex);
 }
 
-void
-rconn_disconnect(struct rconn *rc)
+static void
+rconn_disconnect__(struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     if (rc->state != S_VOID) {
         if (rc->vconn) {
@@ -330,6 +397,15 @@ rconn_disconnect(struct rconn *rc)
     }
 }
 
+void
+rconn_disconnect(struct rconn *rc)
+    OVS_EXCLUDED(rc->mutex)
+{
+    ovs_mutex_lock(&rc->mutex);
+    rconn_disconnect__(rc);
+    ovs_mutex_unlock(&rc->mutex);
+}
+
 /* Disconnects 'rc' and frees the underlying storage. */
 void
 rconn_destroy(struct rconn *rc)
@@ -337,6 +413,7 @@ rconn_destroy(struct rconn *rc)
     if (rc) {
         size_t i;
 
+        ovs_mutex_lock(&rc->mutex);
         free(rc->name);
         free(rc->target);
         vconn_close(rc->vconn);
@@ -345,24 +422,30 @@ rconn_destroy(struct rconn *rc)
         for (i = 0; i < rc->n_monitors; i++) {
             vconn_close(rc->monitors[i]);
         }
+        ovs_mutex_unlock(&rc->mutex);
+        ovs_mutex_destroy(&rc->mutex);
+
         free(rc);
     }
 }
 
 static unsigned int
 timeout_VOID(const struct rconn *rc OVS_UNUSED)
+    OVS_REQUIRES(rc->mutex)
 {
     return UINT_MAX;
 }
 
 static void
 run_VOID(struct rconn *rc OVS_UNUSED)
+    OVS_REQUIRES(rc->mutex)
 {
     /* Nothing to do. */
 }
 
 static void
 reconnect(struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     int retval;
 
@@ -388,12 +471,14 @@ reconnect(struct rconn *rc)
 
 static unsigned int
 timeout_BACKOFF(const struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     return rc->backoff;
 }
 
 static void
 run_BACKOFF(struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     if (timed_out(rc)) {
         reconnect(rc);
@@ -402,12 +487,14 @@ run_BACKOFF(struct rconn *rc)
 
 static unsigned int
 timeout_CONNECTING(const struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     return MAX(1, rc->backoff);
 }
 
 static void
 run_CONNECTING(struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     int retval = vconn_connect(rc->vconn);
     if (!retval) {
@@ -432,6 +519,7 @@ run_CONNECTING(struct rconn *rc)
 
 static void
 do_tx_work(struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     if (list_is_empty(&rc->txq)) {
         return;
@@ -450,6 +538,7 @@ do_tx_work(struct rconn *rc)
 
 static unsigned int
 timeout_ACTIVE(const struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     if (rc->probe_interval) {
         unsigned int base = MAX(rc->last_activity, rc->state_entered);
@@ -461,6 +550,7 @@ timeout_ACTIVE(const struct rconn *rc)
 
 static void
 run_ACTIVE(struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     if (timed_out(rc)) {
         unsigned int base = MAX(rc->last_activity, rc->state_entered);
@@ -469,14 +559,14 @@ run_ACTIVE(struct rconn *rc)
         VLOG_DBG("%s: idle %u seconds, sending inactivity probe",
                  rc->name, (unsigned int) (time_now() - base));
 
-        version = rconn_get_version(rc);
+        version = rconn_get_version__(rc);
         ovs_assert(version >= 0 && version <= 0xff);
 
         /* Ordering is important here: rconn_send() can transition to BACKOFF,
          * and we don't want to transition back to IDLE if so, because then we
          * can end up queuing a packet with vconn == NULL and then *boom*. */
         state_transition(rc, S_IDLE);
-        rconn_send(rc, make_echo_request(version), NULL);
+        rconn_send__(rc, make_echo_request(version), NULL);
         return;
     }
 
@@ -485,12 +575,14 @@ run_ACTIVE(struct rconn *rc)
 
 static unsigned int
 timeout_IDLE(const struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     return rc->probe_interval;
 }
 
 static void
 run_IDLE(struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     if (timed_out(rc)) {
         VLOG_ERR("%s: no response to inactivity probe after %u "
@@ -507,10 +599,12 @@ run_IDLE(struct rconn *rc)
  * connected, attempts to send packets in the send queue, if any. */
 void
 rconn_run(struct rconn *rc)
+    OVS_EXCLUDED(rc->mutex)
 {
     int old_state;
     size_t i;
 
+    ovs_mutex_lock(&rc->mutex);
     if (rc->vconn) {
         vconn_run(rc->vconn);
     }
@@ -541,16 +635,19 @@ rconn_run(struct rconn *rc)
             NOT_REACHED();
         }
     } while (rc->state != old_state);
+    ovs_mutex_unlock(&rc->mutex);
 }
 
 /* Causes the next call to poll_block() to wake up when rconn_run() should be
  * called on 'rc'. */
 void
 rconn_run_wait(struct rconn *rc)
+    OVS_EXCLUDED(rc->mutex)
 {
     unsigned int timeo;
     size_t i;
 
+    ovs_mutex_lock(&rc->mutex);
     if (rc->vconn) {
         vconn_run_wait(rc->vconn);
         if ((rc->state & (S_ACTIVE | S_IDLE)) && !list_is_empty(&rc->txq)) {
@@ -567,6 +664,7 @@ rconn_run_wait(struct rconn *rc)
         long long int expires = sat_add(rc->state_entered, timeo);
         poll_timer_wait_until(expires * 1000);
     }
+    ovs_mutex_unlock(&rc->mutex);
 }
 
 /* Attempts to receive a packet from 'rc'.  If successful, returns the packet;
@@ -574,9 +672,12 @@ rconn_run_wait(struct rconn *rc)
  * the packet (with ofpbuf_delete()). */
 struct ofpbuf *
 rconn_recv(struct rconn *rc)
+    OVS_EXCLUDED(rc->mutex)
 {
+    struct ofpbuf *buffer = NULL;
+
+    ovs_mutex_lock(&rc->mutex);
     if (rc->state & (S_ACTIVE | S_IDLE)) {
-        struct ofpbuf *buffer;
         int error = vconn_recv(rc->vconn, &buffer);
         if (!error) {
             copy_to_monitor(rc, buffer);
@@ -590,40 +691,33 @@ rconn_recv(struct rconn *rc)
             if (rc->state == S_IDLE) {
                 state_transition(rc, S_ACTIVE);
             }
-            return buffer;
         } else if (error != EAGAIN) {
             report_error(rc, error);
             disconnect(rc, error);
         }
     }
-    return NULL;
+    ovs_mutex_unlock(&rc->mutex);
+
+    return buffer;
 }
 
 /* Causes the next call to poll_block() to wake up when a packet may be ready
  * to be received by vconn_recv() on 'rc'.  */
 void
 rconn_recv_wait(struct rconn *rc)
+    OVS_EXCLUDED(rc->mutex)
 {
+    ovs_mutex_lock(&rc->mutex);
     if (rc->vconn) {
         vconn_wait(rc->vconn, WAIT_RECV);
     }
+    ovs_mutex_unlock(&rc->mutex);
 }
 
-/* Sends 'b' on 'rc'.  Returns 0 if successful, or ENOTCONN if 'rc' is not
- * currently connected.  Takes ownership of 'b'.
- *
- * If 'counter' is non-null, then 'counter' will be incremented while the
- * packet is in flight, then decremented when it has been sent (or discarded
- * due to disconnection).  Because 'b' may be sent (or discarded) before this
- * function returns, the caller may not be able to observe any change in
- * 'counter'.
- *
- * There is no rconn_send_wait() function: an rconn has a send queue that it
- * takes care of sending if you call rconn_run(), which will have the side
- * effect of waking up poll_block(). */
-int
-rconn_send(struct rconn *rc, struct ofpbuf *b,
+static int
+rconn_send__(struct rconn *rc, struct ofpbuf *b,
            struct rconn_packet_counter *counter)
+    OVS_REQUIRES(rc->mutex)
 {
     if (rconn_is_connected(rc)) {
         COVERAGE_INC(rconn_queued);
@@ -648,6 +742,32 @@ rconn_send(struct rconn *rc, struct ofpbuf *b,
     }
 }
 
+/* Sends 'b' on 'rc'.  Returns 0 if successful, or ENOTCONN if 'rc' is not
+ * currently connected.  Takes ownership of 'b'.
+ *
+ * If 'counter' is non-null, then 'counter' will be incremented while the
+ * packet is in flight, then decremented when it has been sent (or discarded
+ * due to disconnection).  Because 'b' may be sent (or discarded) before this
+ * function returns, the caller may not be able to observe any change in
+ * 'counter'.
+ *
+ * There is no rconn_send_wait() function: an rconn has a send queue that it
+ * takes care of sending if you call rconn_run(), which will have the side
+ * effect of waking up poll_block(). */
+int
+rconn_send(struct rconn *rc, struct ofpbuf *b,
+           struct rconn_packet_counter *counter)
+    OVS_EXCLUDED(rc->mutex)
+{
+    int error;
+
+    ovs_mutex_lock(&rc->mutex);
+    error = rconn_send__(rc, b, counter);
+    ovs_mutex_unlock(&rc->mutex);
+
+    return error;
+}
+
 /* Sends 'b' on 'rc'.  Increments 'counter' while the packet is in flight; it
  * will be decremented when it has been sent (or discarded due to
  * disconnection).  Returns 0 if successful, EAGAIN if 'counter->n' is already
@@ -663,14 +783,21 @@ rconn_send(struct rconn *rc, struct ofpbuf *b,
 int
 rconn_send_with_limit(struct rconn *rc, struct ofpbuf *b,
                       struct rconn_packet_counter *counter, int queue_limit)
+    OVS_EXCLUDED(rc->mutex)
 {
+    int error;
+
+    ovs_mutex_lock(&rc->mutex);
     if (rconn_packet_counter_n_packets(counter) < queue_limit) {
-        return rconn_send(rc, b, counter);
+        error = rconn_send__(rc, b, counter);
     } else {
         COVERAGE_INC(rconn_overflow);
         ofpbuf_delete(b);
-        return EAGAIN;
+        error = EAGAIN;
     }
+    ovs_mutex_unlock(&rc->mutex);
+
+    return error;
 }
 
 /* Returns the total number of packets successfully sent on the underlying
@@ -686,7 +813,9 @@ rconn_packets_sent(const struct rconn *rc)
  * and received on 'rconn' will be copied.  'rc' takes ownership of 'vconn'. */
 void
 rconn_add_monitor(struct rconn *rc, struct vconn *vconn)
+    OVS_EXCLUDED(rc->mutex)
 {
+    ovs_mutex_lock(&rc->mutex);
     if (rc->n_monitors < ARRAY_SIZE(rc->monitors)) {
         VLOG_INFO("new monitor connection from %s", vconn_get_name(vconn));
         rc->monitors[rc->n_monitors++] = vconn;
@@ -695,6 +824,7 @@ rconn_add_monitor(struct rconn *rc, struct vconn *vconn)
                  vconn_get_name(vconn));
         vconn_close(vconn);
     }
+    ovs_mutex_unlock(&rc->mutex);
 }
 
 /* Returns 'rc''s name.  This is a name for human consumption, appropriate for
@@ -709,9 +839,12 @@ rconn_get_name(const struct rconn *rc)
 /* Sets 'rc''s name to 'new_name'. */
 void
 rconn_set_name(struct rconn *rc, const char *new_name)
+    OVS_EXCLUDED(rc->mutex)
 {
+    ovs_mutex_lock(&rc->mutex);
     free(rc->name);
     rc->name = xstrdup(new_name);
+    ovs_mutex_unlock(&rc->mutex);
 }
 
 /* Returns 'rc''s target.  This is intended to be a string that may be passed
@@ -737,13 +870,27 @@ rconn_is_connected(const struct rconn *rconn)
     return is_connected_state(rconn->state);
 }
 
+static bool
+rconn_is_admitted__(const struct rconn *rconn)
+    OVS_REQUIRES(rconn->mutex)
+{
+    return (rconn_is_connected(rconn)
+            && rconn->last_admitted >= rconn->last_connected);
+}
+
 /* Returns true if 'rconn' is connected and thought to have been accepted by
  * the peer's admission-control policy. */
 bool
 rconn_is_admitted(const struct rconn *rconn)
+    OVS_EXCLUDED(rconn->mutex)
 {
-    return (rconn_is_connected(rconn)
-            && rconn->last_admitted >= rconn->last_connected);
+    bool admitted;
+
+    ovs_mutex_lock(&rconn->mutex);
+    admitted = rconn_is_admitted__(rconn);
+    ovs_mutex_unlock(&rconn->mutex);
+
+    return admitted;
 }
 
 /* Returns 0 if 'rconn' is currently connected and considered to have been
@@ -751,8 +898,17 @@ rconn_is_admitted(const struct rconn *rconn)
  * seconds since 'rconn' was last in such a state. */
 int
 rconn_failure_duration(const struct rconn *rconn)
+    OVS_EXCLUDED(rconn->mutex)
 {
-    return rconn_is_admitted(rconn) ? 0 : time_now() - rconn->last_admitted;
+    int duration;
+
+    ovs_mutex_lock(&rconn->mutex);
+    duration = (rconn_is_admitted__(rconn)
+                ? 0
+                : time_now() - rconn->last_admitted);
+    ovs_mutex_unlock(&rconn->mutex);
+
+    return duration;
 }
 
 /* Returns the IP address of the peer, or 0 if the peer's IP address is not
@@ -784,16 +940,37 @@ rconn_get_local_ip(const struct rconn *rconn)
  * connection does not contain a port or if the port is not known. */
 ovs_be16
 rconn_get_local_port(const struct rconn *rconn)
+    OVS_EXCLUDED(rconn->mutex)
 {
-    return rconn->vconn ? vconn_get_local_port(rconn->vconn) : 0;
+    ovs_be16 port;
+
+    ovs_mutex_lock(&rconn->mutex);
+    port = rconn->vconn ? vconn_get_local_port(rconn->vconn) : 0;
+    ovs_mutex_unlock(&rconn->mutex);
+
+    return port;
+}
+
+static int
+rconn_get_version__(const struct rconn *rconn)
+    OVS_REQUIRES(rconn->mutex)
+{
+    return rconn->vconn ? vconn_get_version(rconn->vconn) : -1;
 }
 
 /* Returns the OpenFlow version negotiated with the peer, or -1 if there is
  * currently no connection or if version negotiation is not yet complete. */
 int
 rconn_get_version(const struct rconn *rconn)
+    OVS_EXCLUDED(rconn->mutex)
 {
-    return rconn->vconn ? vconn_get_version(rconn->vconn) : -1;
+    int version;
+
+    ovs_mutex_lock(&rconn->mutex);
+    version = rconn_get_version__(rconn);
+    ovs_mutex_unlock(&rconn->mutex);
+
+    return version;
 }
 
 /* Returns the total number of packets successfully received by the underlying
@@ -855,8 +1032,15 @@ rconn_get_last_error(const struct rconn *rc)
 /* Returns the number of messages queued for transmission on 'rc'. */
 unsigned int
 rconn_count_txqlen(const struct rconn *rc)
+    OVS_EXCLUDED(rc->mutex)
 {
-    return list_size(&rc->txq);
+    unsigned int len;
+
+    ovs_mutex_lock(&rc->mutex);
+    len = list_size(&rc->txq);
+    ovs_mutex_unlock(&rc->mutex);
+
+    return len;
 }
 
 struct rconn_packet_counter *
@@ -949,6 +1133,7 @@ rconn_packet_counter_n_bytes(const struct rconn_packet_counter *c)
  * the target also likely changes these values. */
 static void
 rconn_set_target__(struct rconn *rc, const char *target, const char *name)
+    OVS_REQUIRES(rc->mutex)
 {
     free(rc->name);
     rc->name = xstrdup(name ? name : target);
@@ -963,6 +1148,7 @@ rconn_set_target__(struct rconn *rc, const char *target, const char *name)
  * otherwise a positive errno value. */
 static int
 try_send(struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     struct ofpbuf *msg = ofpbuf_from_list(rc->txq.next);
     unsigned int n_bytes = msg->size;
@@ -996,6 +1182,7 @@ try_send(struct rconn *rc)
  * normally. */
 static void
 report_error(struct rconn *rc, int error)
+    OVS_REQUIRES(rc->mutex)
 {
     if (error == EOF) {
         /* If 'rc' isn't reliable, then we don't really expect this connection
@@ -1021,6 +1208,7 @@ report_error(struct rconn *rc, int error)
  */
 static void
 disconnect(struct rconn *rc, int error)
+    OVS_REQUIRES(rc->mutex)
 {
     rc->last_error = error;
     if (rc->reliable) {
@@ -1051,7 +1239,7 @@ disconnect(struct rconn *rc, int error)
         state_transition(rc, S_BACKOFF);
     } else {
         rc->last_disconnected = time_now();
-        rconn_disconnect(rc);
+        rconn_disconnect__(rc);
     }
 }
 
@@ -1059,6 +1247,7 @@ disconnect(struct rconn *rc, int error)
  * counts. */
 static void
 flush_queue(struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     if (list_is_empty(&rc->txq)) {
         return;
@@ -1077,12 +1266,14 @@ flush_queue(struct rconn *rc)
 
 static unsigned int
 elapsed_in_this_state(const struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     return time_now() - rc->state_entered;
 }
 
 static unsigned int
 timeout(const struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     switch (rc->state) {
 #define STATE(NAME, VALUE) case S_##NAME: return timeout_##NAME(rc);
@@ -1095,12 +1286,14 @@ timeout(const struct rconn *rc)
 
 static bool
 timed_out(const struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     return time_now() >= sat_add(rc->state_entered, timeout(rc));
 }
 
 static void
 state_transition(struct rconn *rc, enum state state)
+    OVS_REQUIRES(rc->mutex)
 {
     rc->seqno += (rc->state == S_ACTIVE) != (state == S_ACTIVE);
     if (is_connected_state(state) && !is_connected_state(rc->state)) {
@@ -1116,6 +1309,7 @@ state_transition(struct rconn *rc, enum state state)
 
 static void
 close_monitor(struct rconn *rc, size_t idx, int retval)
+    OVS_REQUIRES(rc->mutex)
 {
     VLOG_DBG("%s: closing monitor connection to %s: %s",
              rconn_get_name(rc), vconn_get_name(rc->monitors[idx]),
@@ -1125,6 +1319,7 @@ close_monitor(struct rconn *rc, size_t idx, int retval)
 
 static void
 copy_to_monitor(struct rconn *rc, const struct ofpbuf *b)
+    OVS_REQUIRES(rc->mutex)
 {
     struct ofpbuf *clone = NULL;
     int retval;
@@ -1244,6 +1439,7 @@ is_admitted_msg(const struct ofpbuf *b)
  * successuflly connected in too long. */
 static bool
 rconn_logging_connection_attempts__(const struct rconn *rc)
+    OVS_REQUIRES(rc->mutex)
 {
     return rc->backoff < rc->max_backoff;
 }
diff --git a/lib/rconn.h b/lib/rconn.h
index d943203..408cec9 100644
--- a/lib/rconn.h
+++ b/lib/rconn.h
@@ -33,6 +33,12 @@
  * An rconn optionally provides reliable communication, in this sense: the
  * rconn will re-connect, with exponential backoff, when the underlying vconn
  * disconnects.
+ *
+ *
+ * Thread-safety
+ * =============
+ *
+ * Fully thread-safe.
  */
 
 struct vconn;
-- 
1.7.10.4




More information about the dev mailing list