[ovs-dev] [PATCH v4 05/17] conntrack: Periodically delete expired connections.

Daniele Di Proietto diproiettod at vmware.com
Fri Jun 10 22:47:31 UTC 2016


This commit adds a thread that periodically removes expired connections.

The expiration time of a connection can be expressed by:

expiration = now + timeout

For each possible 'timeout' value (there aren't many) we keep a list.
When the expiration is updated, we move the connection to the back of the
corresponding 'timeout' list. This way, the list is always ordered by
'expiration'.

When the cleanup thread iterates through the lists for expired
connections, it can stop at the first non-expired connection.

Suggested-by: Joe Stringer <joe at ovn.org>
Signed-off-by: Daniele Di Proietto <diproiettod at vmware.com>
---
 lib/conntrack-other.c   |  11 +--
 lib/conntrack-private.h |  21 ++++--
 lib/conntrack-tcp.c     |  20 +++---
 lib/conntrack.c         | 184 ++++++++++++++++++++++++++++++++++++++++++++----
 lib/conntrack.h         |  32 ++++++++-
 5 files changed, 237 insertions(+), 31 deletions(-)

diff --git a/lib/conntrack-other.c b/lib/conntrack-other.c
index 295cb2c..2920889 100644
--- a/lib/conntrack-other.c
+++ b/lib/conntrack-other.c
@@ -43,8 +43,8 @@ conn_other_cast(const struct conn *conn)
 }
 
 static enum ct_update_res
-other_conn_update(struct conn *conn_, struct dp_packet *pkt OVS_UNUSED,
-                  bool reply, long long now)
+other_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
+                  struct dp_packet *pkt OVS_UNUSED, bool reply, long long now)
 {
     struct conn_other *conn = conn_other_cast(conn_);
 
@@ -54,7 +54,7 @@ other_conn_update(struct conn *conn_, struct dp_packet *pkt OVS_UNUSED,
         conn->state = OTHERS_MULTIPLE;
     }
 
-    update_expiration(conn_, other_timeouts[conn->state], now);
+    conn_update_expiration(ctb, &conn->up, other_timeouts[conn->state], now);
 
     return CT_UPDATE_VALID;
 }
@@ -66,14 +66,15 @@ other_valid_new(struct dp_packet *pkt OVS_UNUSED)
 }
 
 static struct conn *
-other_new_conn(struct dp_packet *pkt OVS_UNUSED, long long now)
+other_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt OVS_UNUSED,
+               long long now)
 {
     struct conn_other *conn;
 
     conn = xzalloc(sizeof *conn);
     conn->state = OTHERS_FIRST;
 
-    update_expiration(&conn->up, other_timeouts[conn->state], now);
+    conn_init_expiration(ctb, &conn->up, other_timeouts[conn->state], now);
 
     return &conn->up;
 }
diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
index d3e0099..4743dc6 100644
--- a/lib/conntrack-private.h
+++ b/lib/conntrack-private.h
@@ -68,10 +68,13 @@ enum ct_update_res {
 };
 
 struct ct_l4_proto {
-    struct conn *(*new_conn)(struct dp_packet *pkt, long long now);
+    struct conn *(*new_conn)(struct conntrack_bucket *, struct dp_packet *pkt,
+                             long long now);
     bool (*valid_new)(struct dp_packet *pkt);
-    enum ct_update_res (*conn_update)(struct conn *conn, struct dp_packet *pkt,
-                                      bool reply, long long now);
+    enum ct_update_res (*conn_update)(struct conn *conn,
+                                      struct conntrack_bucket *,
+                                      struct dp_packet *pkt, bool reply,
+                                      long long now);
 };
 
 extern struct ct_l4_proto ct_proto_tcp;
@@ -80,9 +83,19 @@ extern struct ct_l4_proto ct_proto_other;
 extern long long ct_timeout_val[];
 
 static inline void
-update_expiration(struct conn *conn, enum ct_timeout tm, long long now)
+conn_init_expiration(struct conntrack_bucket *ctb, struct conn *conn,
+                     enum ct_timeout tm, long long now)
 {
     conn->expiration = now + ct_timeout_val[tm];
+    ovs_list_push_back(&ctb->exp_lists[tm], &conn->exp_node);
+}
+
+static inline void
+conn_update_expiration(struct conntrack_bucket *ctb, struct conn *conn,
+                       enum ct_timeout tm, long long now)
+{
+    ovs_list_remove(&conn->exp_node);
+    conn_init_expiration(ctb, conn, tm, now);
 }
 
 #endif /* conntrack-private.h */
diff --git a/lib/conntrack-tcp.c b/lib/conntrack-tcp.c
index b574eeb..71eadc1 100644
--- a/lib/conntrack-tcp.c
+++ b/lib/conntrack-tcp.c
@@ -152,8 +152,8 @@ tcp_payload_length(struct dp_packet *pkt)
 }
 
 static enum ct_update_res
-tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply,
-                long long now)
+tcp_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
+                struct dp_packet *pkt, bool reply, long long now)
 {
     struct conn_tcp *conn = conn_tcp_cast(conn_);
     struct tcp_header *tcp = dp_packet_l4(pkt);
@@ -319,18 +319,18 @@ tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply,
 
         if (src->state >= CT_DPIF_TCPS_FIN_WAIT_2
             && dst->state >= CT_DPIF_TCPS_FIN_WAIT_2) {
-            update_expiration(conn_, CT_TM_TCP_CLOSED, now);
+            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSED, now);
         } else if (src->state >= CT_DPIF_TCPS_CLOSING
                    && dst->state >= CT_DPIF_TCPS_CLOSING) {
-            update_expiration(conn_, CT_TM_TCP_FIN_WAIT, now);
+            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_FIN_WAIT, now);
         } else if (src->state < CT_DPIF_TCPS_ESTABLISHED
                    || dst->state < CT_DPIF_TCPS_ESTABLISHED) {
-            update_expiration(conn_, now, CT_TM_TCP_OPENING);
+            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_OPENING, now);
         } else if (src->state >= CT_DPIF_TCPS_CLOSING
                    || dst->state >= CT_DPIF_TCPS_CLOSING) {
-            update_expiration(conn_, now, CT_TM_TCP_CLOSING);
+            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSING, now);
         } else {
-            update_expiration(conn_, now, CT_TM_TCP_ESTABLISHED);
+            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_ESTABLISHED, now);
         }
     } else if ((dst->state < CT_DPIF_TCPS_SYN_SENT
                 || dst->state >= CT_DPIF_TCPS_FIN_WAIT_2
@@ -414,7 +414,8 @@ tcp_valid_new(struct dp_packet *pkt)
 }
 
 static struct conn *
-tcp_new_conn(struct dp_packet *pkt, long long now)
+tcp_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt,
+             long long now)
 {
     struct conn_tcp* newconn = NULL;
     struct tcp_header *tcp = dp_packet_l4(pkt);
@@ -450,7 +451,8 @@ tcp_new_conn(struct dp_packet *pkt, long long now)
     src->state = CT_DPIF_TCPS_SYN_SENT;
     dst->state = CT_DPIF_TCPS_CLOSED;
 
-    update_expiration(&newconn->up, now, CT_TM_TCP_FIRST_PACKET);
+    conn_init_expiration(ctb, &newconn->up, CT_TM_TCP_FIRST_PACKET,
+                         now);
 
     return &newconn->up;
 }
diff --git a/lib/conntrack.c b/lib/conntrack.c
index 96935bc..5376550 100644
--- a/lib/conntrack.c
+++ b/lib/conntrack.c
@@ -33,6 +33,8 @@
 #include "odp-netlink.h"
 #include "openvswitch/vlog.h"
 #include "ovs-rcu.h"
+#include "ovs-thread.h"
+#include "poll-loop.h"
 #include "random.h"
 #include "timeval.h"
 
@@ -56,17 +58,20 @@ static void conn_key_lookup(struct conntrack_bucket *ctb,
                             struct conn_lookup_ctx *ctx,
                             long long now);
 static bool valid_new(struct dp_packet *pkt, struct conn_key *);
-static struct conn *new_conn(struct dp_packet *pkt, struct conn_key *,
-                             long long now);
+static struct conn *new_conn(struct conntrack_bucket *, struct dp_packet *pkt,
+                             struct conn_key *, long long now);
 static void delete_conn(struct conn *);
-static enum ct_update_res conn_update(struct conn *, struct dp_packet*,
-                                      bool reply, long long now);
+static enum ct_update_res conn_update(struct conn *,
+                                      struct conntrack_bucket *ctb,
+                                      struct dp_packet *, bool reply,
+                                      long long now);
 static bool conn_expired(struct conn *, long long now);
 static void set_mark(struct dp_packet *, struct conn *,
                      uint32_t val, uint32_t mask);
 static void set_label(struct dp_packet *, struct conn *,
                       const struct ovs_key_ct_labels *val,
                       const struct ovs_key_ct_labels *mask);
+static void *clean_thread_main(void *f_);
 
 static struct ct_l4_proto *l4_protos[] = {
     [IPPROTO_TCP] = &ct_proto_tcp,
@@ -90,7 +95,8 @@ long long ct_timeout_val[] = {
 void
 conntrack_init(struct conntrack *ct)
 {
-    unsigned i;
+    unsigned i, j;
+    long long now = time_msec();
 
     for (i = 0; i < CONNTRACK_BUCKETS; i++) {
         struct conntrack_bucket *ctb = &ct->buckets[i];
@@ -98,11 +104,20 @@ conntrack_init(struct conntrack *ct)
         ct_lock_init(&ctb->lock);
         ct_lock_lock(&ctb->lock);
         hmap_init(&ctb->connections);
+        for (j = 0; j < ARRAY_SIZE(ctb->exp_lists); j++) {
+            ovs_list_init(&ctb->exp_lists[j]);
+        }
         ct_lock_unlock(&ctb->lock);
+        ovs_mutex_init(&ctb->cleanup_mutex);
+        ovs_mutex_lock(&ctb->cleanup_mutex);
+        ctb->next_cleanup = now + CT_TM_MIN;
+        ovs_mutex_unlock(&ctb->cleanup_mutex);
     }
     ct->hash_basis = random_uint32();
     atomic_count_init(&ct->n_conn, 0);
     atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
+    latch_init(&ct->clean_thread_exit);
+    ct->clean_thread = ovs_thread_create("ct_clean", clean_thread_main, ct);
 }
 
 /* Destroys the connection tracker 'ct' and frees all the allocated memory. */
@@ -111,10 +126,14 @@ conntrack_destroy(struct conntrack *ct)
 {
     unsigned i;
 
+    latch_set(&ct->clean_thread_exit);
+    pthread_join(ct->clean_thread, NULL);
+    latch_destroy(&ct->clean_thread_exit);
     for (i = 0; i < CONNTRACK_BUCKETS; i++) {
         struct conntrack_bucket *ctb = &ct->buckets[i];
         struct conn *conn;
 
+        ovs_mutex_destroy(&ctb->cleanup_mutex);
         ct_lock_lock(&ctb->lock);
         HMAP_FOR_EACH_POP(conn, node, &ctb->connections) {
             atomic_count_dec(&ct->n_conn);
@@ -170,7 +189,7 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
             return nc;
         }
 
-        nc = new_conn(pkt, &ctx->key, now);
+        nc = new_conn(&ct->buckets[bucket], pkt, &ctx->key, now);
 
         memcpy(&nc->rev_key, &ctx->key, sizeof nc->rev_key);
 
@@ -200,7 +219,8 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
         } else {
             enum ct_update_res res;
 
-            res = conn_update(conn, pkt, ctx->reply, now);
+            res = conn_update(conn, &ct->buckets[bucket], pkt,
+                              ctx->reply, now);
 
             switch (res) {
             case CT_UPDATE_VALID:
@@ -213,6 +233,7 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
                 state |= CS_INVALID;
                 break;
             case CT_UPDATE_NEW:
+                ovs_list_remove(&conn->exp_node);
                 hmap_remove(&ct->buckets[bucket].connections, &conn->node);
                 atomic_count_dec(&ct->n_conn);
                 delete_conn(conn);
@@ -345,6 +366,143 @@ set_label(struct dp_packet *pkt, struct conn *conn,
     conn->label = pkt->md.ct_label;
 }
 
+/* Deletes the expired connections from 'ctb', up to 'limit'.  Returns the
+ * earliest expiration time among the remaining connections in 'ctb', or
+ * LLONG_MAX if none remain.  The return value might be smaller than 'now'
+ * if 'limit' is reached. */
+static long long
+sweep_bucket(struct conntrack *ct, struct conntrack_bucket *ctb, long long now,
+             size_t limit)
+    OVS_REQUIRES(ctb->lock)
+{
+    struct conn *conn, *next;
+    long long min_expiration = LLONG_MAX;
+    unsigned i;
+    size_t count = 0;
+
+    for (i = 0; i < N_CT_TM; i++) {
+        LIST_FOR_EACH_SAFE (conn, next, exp_node, &ctb->exp_lists[i]) {
+            if (!conn_expired(conn, now) || count >= limit) {
+                min_expiration = MIN(min_expiration, conn->expiration);
+                if (count >= limit) {
+                    /* Do not check other lists. */
+                    return min_expiration;
+                }
+                break;
+            }
+            ovs_list_remove(&conn->exp_node);
+            hmap_remove(&ctb->connections, &conn->node);
+            atomic_count_dec(&ct->n_conn);
+            delete_conn(conn);
+            count++;
+        }
+    }
+
+    return min_expiration;
+}
+
+/* Cleans up old connection entries from 'ct'.  Returns the time when the
+ * next expiration might happen.  The return value might be smaller than
+ * 'now', meaning that an internal limit has been reached, and some expired
+ * connections have not been deleted. */
+static long long
+conntrack_clean(struct conntrack *ct, long long now)
+{
+    long long next_wakeup = now + CT_TM_MIN;
+    unsigned int n_conn_limit;
+    size_t clean_count = 0;
+    unsigned i;
+
+    atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
+
+    for (i = 0; i < CONNTRACK_BUCKETS; i++) {
+        struct conntrack_bucket *ctb = &ct->buckets[i];
+        size_t prev_count;
+        long long min_exp;
+
+        ovs_mutex_lock(&ctb->cleanup_mutex);
+        if (ctb->next_cleanup > now) {
+            goto next_bucket;
+        }
+
+        ct_lock_lock(&ctb->lock);
+        prev_count = hmap_count(&ctb->connections);
+        /* Delete at most the larger of 10% of the global limit equally split
+         * among buckets and 10% of this bucket's current size, so that a
+         * busier bucket gets a proportionally larger quota.  The '+ 1'
+         * ensures progress even when both quotients round down to zero. */
+        min_exp = sweep_bucket(ct, ctb, now,
+                MAX(prev_count/10, n_conn_limit/(CONNTRACK_BUCKETS*10)) + 1);
+        clean_count += prev_count - hmap_count(&ctb->connections);
+
+        if (min_exp > now) {
+            /* We call hmap_shrink() only if sweep_bucket() managed to delete
+             * every expired connection. */
+            hmap_shrink(&ctb->connections);
+        }
+
+        ct_lock_unlock(&ctb->lock);
+
+        ctb->next_cleanup = MIN(min_exp, now + CT_TM_MIN);
+
+next_bucket:
+        next_wakeup = MIN(next_wakeup, ctb->next_cleanup);
+        ovs_mutex_unlock(&ctb->cleanup_mutex);
+    }
+
+    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec",
+             clean_count, time_msec() - now);
+
+    return next_wakeup;
+}
+
+/* Cleanup:
+ *
+ * We must call conntrack_clean() periodically.  The return value of
+ * conntrack_clean() gives a hint on when the next cleanup must be done
+ * (either because there is an actual connection that expires, or because a
+ * new connection might be created with the minimum timeout, CT_TM_MIN,
+ * which also bounds the interval between two cleanups of a bucket).
+ *
+ * The logic below has two goals:
+ *
+ * - Avoid calling conntrack_clean() too often.  If we call conntrack_clean()
+ *   each time a connection expires, the thread will consume 100% CPU, so we
+ *   try to call the function _at most_ once every CT_CLEAN_INTERVAL, to batch
+ *   removal.
+ *
+ * - On the other hand, it's not a good idea to keep the buckets locked for
+ *   too long, as we might prevent traffic from flowing.  If conntrack_clean()
+ *   returns a value which is in the past, it means that the internal limit
+ *   has been reached and more cleanup is required.  In this case, just wait
+ *   CT_CLEAN_MIN_INTERVAL before the next call.
+ */
+#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */
+#define CT_CLEAN_MIN_INTERVAL 200  /* 0.2 seconds */
+
+static void *
+clean_thread_main(void *f_)
+{
+    struct conntrack *ct = f_;
+
+    while (!latch_is_set(&ct->clean_thread_exit)) {
+        long long next_wake;
+        long long now = time_msec();
+
+        next_wake = conntrack_clean(ct, now);
+
+        if (next_wake < now) {
+            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL);
+        } else {
+            poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL));
+        }
+        latch_wait(&ct->clean_thread_exit);
+        poll_block();
+    }
+
+    return NULL;
+}
+
 /* Key extraction */
 
 /* The function stores a pointer to the first byte after the header in
@@ -851,10 +1009,11 @@ conn_key_lookup(struct conntrack_bucket *ctb,
 }
 
 static enum ct_update_res
-conn_update(struct conn *conn, struct dp_packet *pkt, bool reply,
-            long long now)
+conn_update(struct conn *conn, struct conntrack_bucket *ctb,
+            struct dp_packet *pkt, bool reply, long long now)
 {
-    return l4_protos[conn->key.nw_proto]->conn_update(conn, pkt, reply, now);
+    return l4_protos[conn->key.nw_proto]->conn_update(conn, ctb, pkt,
+                                                      reply, now);
 }
 
 static bool
@@ -870,11 +1029,12 @@ valid_new(struct dp_packet *pkt, struct conn_key *key)
 }
 
 static struct conn *
-new_conn(struct dp_packet *pkt, struct conn_key *key, long long now)
+new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt,
+         struct conn_key *key, long long now)
 {
     struct conn *newconn;
 
-    newconn = l4_protos[key->nw_proto]->new_conn(pkt, now);
+    newconn = l4_protos[key->nw_proto]->new_conn(ctb, pkt, now);
 
     if (newconn) {
         newconn->key = *key;
diff --git a/lib/conntrack.h b/lib/conntrack.h
index 54731bd..29bf4b7 100644
--- a/lib/conntrack.h
+++ b/lib/conntrack.h
@@ -20,7 +20,9 @@
 #include <stdbool.h>
 
 #include "hmap.h"
+#include "latch.h"
 #include "odp-netlink.h"
+#include "openvswitch/list.h"
 #include "openvswitch/thread.h"
 #include "openvswitch/types.h"
 #include "ovs-atomic.h"
@@ -60,7 +62,6 @@ struct dp_packet_batch;
 struct conntrack;
 
 void conntrack_init(struct conntrack *);
-void conntrack_run(struct conntrack *);
 void conntrack_destroy(struct conntrack *);
 
 int conntrack_execute(struct conntrack *, struct dp_packet_batch *,
@@ -113,6 +114,10 @@ static inline void ct_lock_destroy(struct ct_lock *lock)
     CT_TIMEOUT(OTHER_MULTIPLE, 60 * 1000) \
     CT_TIMEOUT(OTHER_BIDIR, 30 * 1000) \
 
+/* The smallest of the above values: it is used as an upper bound for the
+ * interval between two rounds of cleanup of expired entries. */
+#define CT_TM_MIN (30 * 1000)
+
 enum ct_timeout {
 #define CT_TIMEOUT(NAME, VALUE) CT_TM_##NAME,
     CT_TIMEOUTS
@@ -124,10 +129,29 @@ enum ct_timeout {
  *
  * The connections are kept in different buckets, which are completely
  * independent. The connection bucket is determined by the hash of its key.
+ *
+ * Each bucket has two locks. Acquisition order is, from outermost to
+ * innermost:
+ *
+ *    cleanup_mutex
+ *    lock
+ *
  * */
 struct conntrack_bucket {
+    /* Protects 'connections' and 'exp_lists'.  Used in the fast path. */
     struct ct_lock lock;
+    /* Contains the connections in the bucket, indexed by key. */
     struct hmap connections OVS_GUARDED;
+    /* For each possible timeout we have a list of connections. When the
+     * timeout of a connection is updated, we move it to the back of the list.
+     * Since the connections in a list have the same relative timeout, the
+     * list is ordered, with the oldest connections at the front. */
+    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
+
+    /* Protects 'next_cleanup'.  Used to make sure that there's only one
+     * thread performing the cleanup of this bucket at a time. */
+    struct ovs_mutex cleanup_mutex;
+    long long next_cleanup OVS_GUARDED;
 };
 
 #define CONNTRACK_BUCKETS_SHIFT 8
@@ -140,6 +164,12 @@ struct conntrack {
     /* Salt for hashing a connection key. */
     uint32_t hash_basis;
 
+    /* The thread performing periodic cleanup of the connection
+     * tracker. */
+    pthread_t clean_thread;
+    /* Latch set by conntrack_destroy() to make 'clean_thread' exit. */
+    struct latch clean_thread_exit;
+
     /* Number of connections currently in the connection tracker. */
     atomic_count n_conn;
     /* Connections limit. When this limit is reached, no new connection
-- 
2.8.1




More information about the dev mailing list