[ovs-dev] [PATCH v2 03/11] conntrack: Use mpsc-queue to store conn expirations

Gaetan Rivet grive at u256.net
Wed Apr 21 22:11:17 UTC 2021


Change the connection expiration lists from ovs_list to mpsc-queue.
This is a pre-step towards reducing the granularity of 'ct_lock'.

It simplifies the responsibility for updating the expiration queues.
The dataplane threads now append a new conn to its expiration queue only
once, at creation.  Any further update consists only of writing the conn
expiration limit and marking the conn for expiration rescheduling.

The ageing thread 'ct_clean' is the only consumer of the expiration
lists.  If a conn was marked for rescheduling by a dataplane thread,
'ct_clean' will move the conn to the end of the queue.

Once the locks have been reworked, neither the dataplane threads nor
'ct_clean' will have to take a lock to update the expiration lists
(assuming the consumer lock is perpetually held by 'ct_clean').

Signed-off-by: Gaetan Rivet <grive at u256.net>
Reviewed-by: Eli Britstein <elibr at nvidia.com>
---
 lib/conntrack-private.h |  84 +++++++++++++++++++---------
 lib/conntrack-tp.c      |  28 +++++-----
 lib/conntrack.c         | 118 ++++++++++++++++++++++++++++++++++------
 3 files changed, 173 insertions(+), 57 deletions(-)

diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
index e8332bdba..7db249fdb 100644
--- a/lib/conntrack-private.h
+++ b/lib/conntrack-private.h
@@ -29,6 +29,7 @@
 #include "openvswitch/list.h"
 #include "openvswitch/types.h"
 #include "packets.h"
+#include "mpsc-queue.h"
 #include "unaligned.h"
 #include "dp-packet.h"
 
@@ -86,22 +87,57 @@ struct alg_exp_node {
     bool nat_rpl_dst;
 };
 
+/* Timeouts: all the possible timeout states passed to update_expiration()
+ * are listed here. The name will be prefix by CT_TM_ and the value is in
+ * milliseconds */
+#define CT_TIMEOUTS \
+    CT_TIMEOUT(TCP_FIRST_PACKET) \
+    CT_TIMEOUT(TCP_OPENING) \
+    CT_TIMEOUT(TCP_ESTABLISHED) \
+    CT_TIMEOUT(TCP_CLOSING) \
+    CT_TIMEOUT(TCP_FIN_WAIT) \
+    CT_TIMEOUT(TCP_CLOSED) \
+    CT_TIMEOUT(OTHER_FIRST) \
+    CT_TIMEOUT(OTHER_MULTIPLE) \
+    CT_TIMEOUT(OTHER_BIDIR) \
+    CT_TIMEOUT(ICMP_FIRST) \
+    CT_TIMEOUT(ICMP_REPLY)
+
+enum ct_timeout {
+#define CT_TIMEOUT(NAME) CT_TM_##NAME,
+    CT_TIMEOUTS
+#undef CT_TIMEOUT
+    N_CT_TM
+};
+
 enum OVS_PACKED_ENUM ct_conn_type {
     CT_CONN_TYPE_DEFAULT,
     CT_CONN_TYPE_UN_NAT,
 };
 
+struct conn_expire {
+    struct mpsc_queue_node node;
+    /* Timeout state of the connection.
+     * It follows the connection state updates.
+     */
+    enum ct_timeout tm;
+    atomic_flag reschedule;
+    struct ovs_refcount refcount;
+};
+
 struct conn {
     /* Immutable data. */
     struct conn_key key;
     struct conn_key rev_key;
     struct conn_key parent_key; /* Only used for orig_tuple support. */
-    struct ovs_list exp_node;
     struct cmap_node cm_node;
     struct nat_action_info_t *nat_info;
     char *alg;
     struct conn *nat_conn; /* The NAT 'conn' context, if there is one. */
 
+    /* Inserted once by a PMD, then managed by the 'ct_clean' thread. */
+    struct conn_expire exp;
+
     /* Mutable data. */
     struct ovs_mutex lock; /* Guards all mutable fields. */
     ovs_u128 label;
@@ -132,33 +168,10 @@ enum ct_update_res {
     CT_UPDATE_VALID_NEW,
 };
 
-/* Timeouts: all the possible timeout states passed to update_expiration()
- * are listed here. The name will be prefix by CT_TM_ and the value is in
- * milliseconds */
-#define CT_TIMEOUTS \
-    CT_TIMEOUT(TCP_FIRST_PACKET) \
-    CT_TIMEOUT(TCP_OPENING) \
-    CT_TIMEOUT(TCP_ESTABLISHED) \
-    CT_TIMEOUT(TCP_CLOSING) \
-    CT_TIMEOUT(TCP_FIN_WAIT) \
-    CT_TIMEOUT(TCP_CLOSED) \
-    CT_TIMEOUT(OTHER_FIRST) \
-    CT_TIMEOUT(OTHER_MULTIPLE) \
-    CT_TIMEOUT(OTHER_BIDIR) \
-    CT_TIMEOUT(ICMP_FIRST) \
-    CT_TIMEOUT(ICMP_REPLY)
-
-enum ct_timeout {
-#define CT_TIMEOUT(NAME) CT_TM_##NAME,
-    CT_TIMEOUTS
-#undef CT_TIMEOUT
-    N_CT_TM
-};
-
 struct conntrack {
     struct ovs_mutex ct_lock; /* Protects 2 following fields. */
     struct cmap conns OVS_GUARDED;
-    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
+    struct mpsc_queue exp_lists[N_CT_TM];
     struct hmap zone_limits OVS_GUARDED;
     struct hmap timeout_policies OVS_GUARDED;
     uint32_t hash_basis; /* Salt for hashing a connection key. */
@@ -204,4 +217,25 @@ struct ct_l4_proto {
                                struct ct_dpif_protoinfo *);
 };
 
+static inline void
+conn_expire_append(struct conntrack *ct, struct conn *conn)
+{
+    if (ovs_refcount_try_ref_rcu(&conn->exp.refcount)) {
+        atomic_flag_clear(&conn->exp.reschedule);
+        mpsc_queue_insert(&ct->exp_lists[conn->exp.tm], &conn->exp.node);
+    }
+}
+
+static inline void
+conn_expire_prepend(struct conntrack *ct, struct conn *conn)
+    OVS_REQUIRES(ct->exp_lists[conn->exp.tm].read_lock)
+{
+    if (ovs_refcount_try_ref_rcu(&conn->exp.refcount)) {
+        /* Do not change 'reschedule' state, if this expire node is put
+         * at the tail of the list, it will be re-examined next sweep.
+         */
+        mpsc_queue_push_back(&ct->exp_lists[conn->exp.tm], &conn->exp.node);
+    }
+}
+
 #endif /* conntrack-private.h */
diff --git a/lib/conntrack-tp.c b/lib/conntrack-tp.c
index a586d3a8d..6de2354c0 100644
--- a/lib/conntrack-tp.c
+++ b/lib/conntrack-tp.c
@@ -230,6 +230,15 @@ tm_to_ct_dpif_tp(enum ct_timeout tm)
     return CT_DPIF_TP_ATTR_MAX;
 }
 
+static void
+conn_schedule_expiration(struct conn *conn, enum ct_timeout tm, long long now,
+                         uint32_t tp_value)
+{
+    conn->expiration = now + tp_value * 1000;
+    conn->exp.tm = tm;
+    ignore(atomic_flag_test_and_set(&conn->exp.reschedule));
+}
+
 static void
 conn_update_expiration__(struct conntrack *ct, struct conn *conn,
                          enum ct_timeout tm, long long now,
@@ -240,11 +249,7 @@ conn_update_expiration__(struct conntrack *ct, struct conn *conn,
 
     ovs_mutex_lock(&ct->ct_lock);
     ovs_mutex_lock(&conn->lock);
-    if (!conn->cleaned) {
-        conn->expiration = now + tp_value * 1000;
-        ovs_list_remove(&conn->exp_node);
-        ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
-    }
+    conn_schedule_expiration(conn, tm, now, tp_value);
     ovs_mutex_unlock(&conn->lock);
     ovs_mutex_unlock(&ct->ct_lock);
 
@@ -281,15 +286,6 @@ conn_update_expiration(struct conntrack *ct, struct conn *conn,
     conn_update_expiration__(ct, conn, tm, now, val);
 }
 
-static void
-conn_init_expiration__(struct conntrack *ct, struct conn *conn,
-                       enum ct_timeout tm, long long now,
-                       uint32_t tp_value)
-{
-    conn->expiration = now + tp_value * 1000;
-    ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
-}
-
 /* ct_lock must be held. */
 void
 conn_init_expiration(struct conntrack *ct, struct conn *conn,
@@ -309,5 +305,7 @@ conn_init_expiration(struct conntrack *ct, struct conn *conn,
     VLOG_DBG_RL(&rl, "Init timeout %s zone=%u with policy id=%d val=%u sec.",
                 ct_timeout_str[tm], conn->key.zone, conn->tp_id, val);
 
-    conn_init_expiration__(ct, conn, tm, now, val);
+    atomic_flag_clear(&conn->exp.reschedule);
+    ovs_refcount_init(&conn->exp.refcount);
+    conn_schedule_expiration(conn, tm, now, val);
 }
diff --git a/lib/conntrack.c b/lib/conntrack.c
index 99198a601..4243541da 100644
--- a/lib/conntrack.c
+++ b/lib/conntrack.c
@@ -301,7 +301,7 @@ conntrack_init(void)
     ovs_mutex_lock(&ct->ct_lock);
     cmap_init(&ct->conns);
     for (unsigned i = 0; i < ARRAY_SIZE(ct->exp_lists); i++) {
-        ovs_list_init(&ct->exp_lists[i]);
+        mpsc_queue_init(&ct->exp_lists[i]);
     }
     hmap_init(&ct->zone_limits);
     ct->zone_limit_seq = 0;
@@ -453,6 +453,17 @@ conn_clean_cmn(struct conntrack *ct, struct conn *conn)
     }
 }
 
+static inline bool
+conn_unref(struct conn *conn)
+{
+    ovs_assert(conn->conn_type == CT_CONN_TYPE_DEFAULT);
+    if (ovs_refcount_unref(&conn->exp.refcount) == 1) {
+        ovsrcu_postpone(delete_conn, conn);
+        return true;
+    }
+    return false;
+}
+
 /* Must be called with 'conn' of 'conn_type' CT_CONN_TYPE_DEFAULT.  Also
  * removes the associated nat 'conn' from the lookup datastructures. */
 static void
@@ -466,23 +477,31 @@ conn_clean(struct conntrack *ct, struct conn *conn)
         uint32_t hash = conn_key_hash(&conn->nat_conn->key, ct->hash_basis);
         cmap_remove(&ct->conns, &conn->nat_conn->cm_node, hash);
     }
-    ovs_list_remove(&conn->exp_node);
     conn->cleaned = true;
-    ovsrcu_postpone(delete_conn, conn);
+    conn_unref(conn);
     atomic_count_dec(&ct->n_conn);
 }
 
+static inline bool
+conn_unref_one(struct conn *conn)
+{
+    if (ovs_refcount_unref(&conn->exp.refcount) == 1) {
+        ovsrcu_postpone(delete_conn_one, conn);
+        return true;
+    }
+    return false;
+}
+
 static void
 conn_clean_one(struct conntrack *ct, struct conn *conn)
     OVS_REQUIRES(ct->ct_lock)
 {
     conn_clean_cmn(ct, conn);
     if (conn->conn_type == CT_CONN_TYPE_DEFAULT) {
-        ovs_list_remove(&conn->exp_node);
         conn->cleaned = true;
         atomic_count_dec(&ct->n_conn);
     }
-    ovsrcu_postpone(delete_conn_one, conn);
+    conn_unref_one(conn);
 }
 
 /* Destroys the connection tracker 'ct' and frees all the allocated memory.
@@ -497,6 +516,19 @@ conntrack_destroy(struct conntrack *ct)
     latch_destroy(&ct->clean_thread_exit);
 
     ovs_mutex_lock(&ct->ct_lock);
+
+    for (unsigned i = 0; i < N_CT_TM; i++) {
+        struct mpsc_queue_node *node;
+
+        mpsc_queue_acquire(&ct->exp_lists[i]);
+        MPSC_QUEUE_FOR_EACH_POP (node, &ct->exp_lists[i]) {
+            conn = CONTAINER_OF(node, struct conn, exp.node);
+            conn_unref(conn);
+        }
+        mpsc_queue_release(&ct->exp_lists[i]);
+        mpsc_queue_destroy(&ct->exp_lists[i]);
+    }
+
     CMAP_FOR_EACH (conn, cm_node, &ct->conns) {
         conn_clean_one(ct, conn);
     }
@@ -1055,6 +1087,7 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
         ovs_mutex_init_adaptive(&nc->lock);
         nc->conn_type = CT_CONN_TYPE_DEFAULT;
         cmap_insert(&ct->conns, &nc->cm_node, ctx->hash);
+        conn_expire_append(ct, nc);
         atomic_count_inc(&ct->n_conn);
         ctx->conn = nc; /* For completeness. */
         if (zl) {
@@ -1075,7 +1108,6 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
      * can limit DoS impact. */
 nat_res_exhaustion:
     free(nat_conn);
-    ovs_list_remove(&nc->exp_node);
     delete_conn_cmn(nc);
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
     VLOG_WARN_RL(&rl, "Unable to NAT due to tuple space exhaustion - "
@@ -1492,29 +1524,72 @@ set_label(struct dp_packet *pkt, struct conn *conn,
  * if 'limit' is reached */
 static long long
 ct_sweep(struct conntrack *ct, long long now, size_t limit)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
-    struct conn *conn, *next;
+    struct conn *conn;
     long long min_expiration = LLONG_MAX;
+    struct mpsc_queue_node *node;
     size_t count = 0;
 
     ovs_mutex_lock(&ct->ct_lock);
 
     for (unsigned i = 0; i < N_CT_TM; i++) {
-        LIST_FOR_EACH_SAFE (conn, next, exp_node, &ct->exp_lists[i]) {
+        struct conn *end_of_queue = NULL;
+
+        MPSC_QUEUE_FOR_EACH_POP (node, &ct->exp_lists[i]) {
+            long long int expiration;
+
+            conn = CONTAINER_OF(node, struct conn, exp.node);
+            if (conn_unref(conn)) {
+                continue;
+            }
+
             ovs_mutex_lock(&conn->lock);
-            if (now < conn->expiration || count >= limit) {
-                min_expiration = MIN(min_expiration, conn->expiration);
-                ovs_mutex_unlock(&conn->lock);
-                if (count >= limit) {
-                    /* Do not check other lists. */
-                    COVERAGE_INC(conntrack_long_cleanup);
-                    goto out;
-                }
+            expiration = conn->expiration;
+            ovs_mutex_unlock(&conn->lock);
+
+            if (conn == end_of_queue) {
+                /* If we already re-enqueued this conn during this sweep,
+                 * stop iterating this list and skip to the next.
+                 */
+                min_expiration = MIN(min_expiration, expiration);
+                conn_expire_prepend(ct, conn);
                 break;
+            }
+
+            if (count >= limit) {
+                min_expiration = MIN(min_expiration, expiration);
+                conn_expire_prepend(ct, conn);
+                COVERAGE_INC(conntrack_long_cleanup);
+                /* Do not check other lists. */
+                goto out;
+            }
+
+            if (now < expiration) {
+                if (atomic_flag_test_and_set(&conn->exp.reschedule)) {
+                    /* Reschedule was true, another thread marked
+                     * this conn to be enqueued again.
+                     * The conn is not yet expired, still valid, and
+                     * this list should still be iterated.
+                     */
+                    conn_expire_append(ct, conn);
+                    if (end_of_queue == NULL) {
+                        end_of_queue = conn;
+                    }
+                } else {
+                    /* This connection is still valid, while no other thread
+                     * modified it: it means this list iteration is finished
+                     * for now. Put back the connection within the list.
+                     */
+                    atomic_flag_clear(&conn->exp.reschedule);
+                    conn_expire_prepend(ct, conn);
+                    min_expiration = MIN(min_expiration, expiration);
+                    break;
+                }
             } else {
-                ovs_mutex_unlock(&conn->lock);
                 conn_clean(ct, conn);
             }
+
             count++;
         }
     }
@@ -1566,9 +1641,14 @@ conntrack_clean(struct conntrack *ct, long long now)
 
 static void *
 clean_thread_main(void *f_)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct conntrack *ct = f_;
 
+    for (unsigned i = 0; i < N_CT_TM; i++) {
+        mpsc_queue_acquire(&ct->exp_lists[i]);
+    }
+
     while (!latch_is_set(&ct->clean_thread_exit)) {
         long long next_wake;
         long long now = time_msec();
@@ -1583,6 +1663,10 @@ clean_thread_main(void *f_)
         poll_block();
     }
 
+    for (unsigned i = 0; i < N_CT_TM; i++) {
+        mpsc_queue_release(&ct->exp_lists[i]);
+    }
+
     return NULL;
 }
 
-- 
2.31.1



More information about the dev mailing list