[ovs-dev] [PATCH RFC 4/5] conntrack: Split single cmap to multiple buckets.

Paolo Valerio pvalerio at redhat.com
Mon Nov 29 18:06:05 UTC 2021


The purpose of this commit is to split the current way of storing the
conn nodes. Before this patch the nodes were stored into a single cmap
using ct->lock to avoid concurrent write access.
With this commit a single connection can be stored into one or two (at
most) CONNTRACK_BUCKETS available based on the outcome of the function
hash_scale() on the key.
Every bucket has its local lock that needs to be acquired every time a
node has to be removed/inserted from/to the cmap.
This means that, in case the hash of the CT_DIR_FWD key differs from
the one of the CT_DIR_REV, we can end up having the reference of the
two key nodes in different buckets, and consequently acquiring two locks
(one per bucket).
This approach may be handy in different ways, depending on the way the
stale connection removal gets designed. The attempt of this patch is
to remove the expiration lists, removing the stale entries mostly in
two ways:

- during the key lookup
- when the sweeper task wakes up

the first case is not very strict, as we remove only expired entries
with the same hash. To increase its effectiveness, we should probably
increase the number of buckets and replace the cmaps with other data
structures like rcu lists.
The sweeper task instead takes charge of the remaining stale entries
removal. The heuristics used in the sweeper task are mostly an
example, but could be modified to match any possible uncovered use
case.

Signed-off-by: Paolo Valerio <pvalerio at redhat.com>
---
The cover letter includes further details.
---
 lib/conntrack-private.h |   34 +++
 lib/conntrack-tp.c      |   42 ----
 lib/conntrack.c         |  461 +++++++++++++++++++++++++++++++----------------
 tests/system-traffic.at |    5 -
 4 files changed, 331 insertions(+), 211 deletions(-)

diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
index ea5ba3d9e..a89ff96fa 100644
--- a/lib/conntrack-private.h
+++ b/lib/conntrack-private.h
@@ -95,6 +95,7 @@ struct alg_exp_node {
 
 struct conn_key_node {
     struct conn_key key;
+    uint32_t key_hash;
     struct cmap_node cm_node;
 };
 
@@ -102,7 +103,6 @@ struct conn {
     /* Immutable data. */
     struct conn_key_node key_node[CT_DIR_MAX];
     struct conn_key parent_key; /* Only used for orig_tuple support. */
-    struct ovs_list exp_node;
 
     uint16_t nat_action;
     char *alg;
@@ -121,7 +121,9 @@ struct conn {
     /* Mutable data. */
     bool seq_skew_dir; /* TCP sequence skew direction due to NATTing of FTP
                         * control messages; true if reply direction. */
-    bool cleaned; /* True if cleaned from expiry lists. */
+    atomic_flag cleaned; /* True if the entry was stale and one of the
+                          * cleaner (i.e. packet path or sweeper) took
+                          * charge of it. */
 
     /* Immutable data. */
     bool alg_related; /* True if alg data connection. */
@@ -192,10 +194,25 @@ enum ct_timeout {
     N_CT_TM
 };
 
-struct conntrack {
-    struct ovs_mutex ct_lock; /* Protects 2 following fields. */
+#define CONNTRACK_BUCKETS_SHIFT 10
+#define CONNTRACK_BUCKETS (1 << CONNTRACK_BUCKETS_SHIFT)
+
+struct ct_bucket {
+    /* Protects 'conns'. In case of natted conns, there's a high
+     * chance that the forward and the reverse key stand in different
+     * buckets. buckets_lock() should be the preferred way to acquire
+     * these locks (unless otherwise needed), as it deals with the
+     * acquisition order. */
+    struct ovs_mutex lock;
+    /* Contains the connections in the bucket, indexed by
+     * 'struct conn_key'. */
     struct cmap conns OVS_GUARDED;
-    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
+};
+
+struct conntrack {
+    struct ct_bucket buckets[CONNTRACK_BUCKETS];
+    unsigned int next_bucket;
+    struct ovs_mutex ct_lock;
     struct cmap zone_limits OVS_GUARDED;
     struct cmap timeout_policies OVS_GUARDED;
     uint32_t hash_basis; /* Salt for hashing a connection key. */
@@ -220,9 +237,10 @@ struct conntrack {
 };
 
 /* Lock acquisition order:
- *    1. 'ct_lock'
- *    2. 'conn->lock'
- *    3. 'resources_lock'
+ *    1. 'buckets[p1]->lock'
+ *    2  'buckets[p2]->lock' (with p1 < p2)
+ *    3. 'conn->lock'
+ *    4. 'resources_lock'
  */
 
 extern struct ct_l4_proto ct_proto_tcp;
diff --git a/lib/conntrack-tp.c b/lib/conntrack-tp.c
index 9ecb06978..117810528 100644
--- a/lib/conntrack-tp.c
+++ b/lib/conntrack-tp.c
@@ -236,27 +236,6 @@ tm_to_ct_dpif_tp(enum ct_timeout tm)
     return CT_DPIF_TP_ATTR_MAX;
 }
 
-static void
-conn_update_expiration__(struct conntrack *ct, struct conn *conn,
-                         enum ct_timeout tm, long long now,
-                         uint32_t tp_value)
-    OVS_REQUIRES(conn->lock)
-{
-    ovs_mutex_unlock(&conn->lock);
-
-    ovs_mutex_lock(&ct->ct_lock);
-    ovs_mutex_lock(&conn->lock);
-    if (!conn->cleaned) {
-        conn->expiration = now + tp_value * 1000;
-        ovs_list_remove(&conn->exp_node);
-        ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
-    }
-    ovs_mutex_unlock(&conn->lock);
-    ovs_mutex_unlock(&ct->ct_lock);
-
-    ovs_mutex_lock(&conn->lock);
-}
-
 /* The conn entry lock must be held on entry and exit. */
 void
 conn_update_expiration(struct conntrack *ct, struct conn *conn,
@@ -266,42 +245,25 @@ conn_update_expiration(struct conntrack *ct, struct conn *conn,
     struct timeout_policy *tp;
     uint32_t val;
 
-    ovs_mutex_unlock(&conn->lock);
-
-    ovs_mutex_lock(&ct->ct_lock);
-    ovs_mutex_lock(&conn->lock);
     tp = timeout_policy_lookup(ct, conn->tp_id);
     if (tp) {
         val = tp->policy.attrs[tm_to_ct_dpif_tp(tm)];
     } else {
         val = ct_dpif_netdev_tp_def[tm_to_ct_dpif_tp(tm)];
     }
-    ovs_mutex_unlock(&conn->lock);
-    ovs_mutex_unlock(&ct->ct_lock);
 
-    ovs_mutex_lock(&conn->lock);
     VLOG_DBG_RL(&rl, "Update timeout %s zone=%u with policy id=%d "
                 "val=%u sec.",
                 ct_timeout_str[tm], conn->key_node[CT_DIR_FWD].key.zone,
                 conn->tp_id, val);
 
-    conn_update_expiration__(ct, conn, tm, now, val);
-}
-
-static void
-conn_init_expiration__(struct conntrack *ct, struct conn *conn,
-                       enum ct_timeout tm, long long now,
-                       uint32_t tp_value)
-{
-    conn->expiration = now + tp_value * 1000;
-    ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
+    conn->expiration = now + val * 1000;
 }
 
 /* ct_lock must be held. */
 void
 conn_init_expiration(struct conntrack *ct, struct conn *conn,
                      enum ct_timeout tm, long long now)
-    OVS_REQUIRES(ct->ct_lock)
 {
     struct timeout_policy *tp;
     uint32_t val;
@@ -317,5 +279,5 @@ conn_init_expiration(struct conntrack *ct, struct conn *conn,
                 ct_timeout_str[tm], conn->key_node[CT_DIR_FWD].key.zone,
                 conn->tp_id, val);
 
-    conn_init_expiration__(ct, conn, tm, now, val);
+    conn->expiration = now + val * 1000;
 }
diff --git a/lib/conntrack.c b/lib/conntrack.c
index a284c57c0..1c019af29 100644
--- a/lib/conntrack.c
+++ b/lib/conntrack.c
@@ -85,9 +85,12 @@ struct zone_limit {
     struct conntrack_zone_limit czl;
 };
 
+static unsigned hash_scale(uint32_t hash);
+static void conn_clean(struct conntrack *ct, struct conn *conn);
 static bool conn_key_extract(struct conntrack *, struct dp_packet *,
                              ovs_be16 dl_type, struct conn_lookup_ctx *,
                              uint16_t zone);
+static uint32_t cached_key_hash(struct conn_key_node *n);
 static uint32_t conn_key_hash(const struct conn_key *, uint32_t basis);
 static void conn_key_reverse(struct conn_key *);
 static bool valid_new(struct dp_packet *pkt, struct conn_key *);
@@ -109,8 +112,9 @@ static void set_label(struct dp_packet *, struct conn *,
 static void *clean_thread_main(void *f_);
 
 static bool
-nat_get_unique_tuple(struct conntrack *ct, struct conn *conn,
-                     const struct nat_action_info_t *nat_info);
+nat_get_unique_tuple_lock(struct conntrack *ct, struct conn *conn,
+                          const struct nat_action_info_t *nat_info,
+                          uint32_t *rev_hash);
 
 static uint8_t
 reverse_icmp_type(uint8_t type);
@@ -249,16 +253,17 @@ conntrack_init(void)
     ovs_rwlock_unlock(&ct->resources_lock);
 
     ovs_mutex_init_adaptive(&ct->ct_lock);
-    ovs_mutex_lock(&ct->ct_lock);
-    cmap_init(&ct->conns);
-    for (unsigned i = 0; i < ARRAY_SIZE(ct->exp_lists); i++) {
-        ovs_list_init(&ct->exp_lists[i]);
+
+    ct->next_bucket = 0;
+    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
+        struct ct_bucket *bucket = &ct->buckets[i];
+        cmap_init(&bucket->conns);
+        ovs_mutex_init_recursive(&bucket->lock);
     }
+
     cmap_init(&ct->zone_limits);
     ct->zone_limit_seq = 0;
     timeout_policy_init(ct);
-    ovs_mutex_unlock(&ct->ct_lock);
-
     atomic_count_init(&ct->n_conn, 0);
     atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
     atomic_init(&ct->tcp_seq_chk, true);
@@ -410,9 +415,9 @@ zone_limit_delete(struct conntrack *ct, uint16_t zone)
 }
 
 static void
-conn_clean(struct conntrack *ct, struct conn *conn)
-    OVS_REQUIRES(ct->ct_lock)
+conn_clean__(struct conntrack *ct, struct conn *conn)
 {
+    struct ct_bucket *bucket;
     struct zone_limit *zl;
     uint32_t hash;
 
@@ -420,8 +425,9 @@ conn_clean(struct conntrack *ct, struct conn *conn)
         expectation_clean(ct, &conn->key_node[CT_DIR_FWD].key);
     }
 
-    hash = conn_key_hash(&conn->key_node[CT_DIR_FWD].key, ct->hash_basis);
-    cmap_remove(&ct->conns, &conn->key_node[CT_DIR_FWD].cm_node, hash);
+    hash = cached_key_hash(&conn->key_node[CT_DIR_FWD]);
+    bucket = &ct->buckets[hash_scale(hash)];
+    cmap_remove(&bucket->conns, &conn->key_node[CT_DIR_FWD].cm_node, hash);
 
     zl = zone_limit_lookup(ct, conn->admit_zone);
     if (zl && zl->czl.zone_limit_seq == conn->zone_limit_seq) {
@@ -429,12 +435,10 @@ conn_clean(struct conntrack *ct, struct conn *conn)
     }
 
     if (conn->nat_action) {
-        hash = conn_key_hash(&conn->key_node[CT_DIR_REV].key,
-                             ct->hash_basis);
-        cmap_remove(&ct->conns, &conn->key_node[CT_DIR_REV].cm_node, hash);
+        hash = cached_key_hash(&conn->key_node[CT_DIR_REV]);
+        bucket = &ct->buckets[hash_scale(hash)];
+        cmap_remove(&bucket->conns, &conn->key_node[CT_DIR_REV].cm_node, hash);
     }
-    ovs_list_remove(&conn->exp_node);
-    conn->cleaned = true;
     ovsrcu_postpone(delete_conn, conn);
     atomic_count_dec(&ct->n_conn);
 }
@@ -446,22 +450,35 @@ void
 conntrack_destroy(struct conntrack *ct)
 {
     struct conn_key_node *keyn;
+    struct ct_bucket *bucket;
     struct conn *conn;
+    int i;
 
     latch_set(&ct->clean_thread_exit);
     pthread_join(ct->clean_thread, NULL);
     latch_destroy(&ct->clean_thread_exit);
 
-    ovs_mutex_lock(&ct->ct_lock);
-    CMAP_FOR_EACH (keyn, cm_node, &ct->conns) {
-        if (keyn->key.dir != CT_DIR_FWD) {
-            continue;
+    for (i = 0; i < CONNTRACK_BUCKETS; i++) {
+        bucket = &ct->buckets[i];
+        CMAP_FOR_EACH (keyn, cm_node, &bucket->conns) {
+            if (keyn->key.dir != CT_DIR_FWD) {
+                continue;
+            }
+            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
+            conn_clean(ct, conn);
         }
-        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
-        conn_clean(ct, conn);
     }
-    cmap_destroy(&ct->conns);
 
+    /* XXX: we need this loop because connections may be in multiple
+     * buckets.  The former loop should probably use conn_clean__()
+     * or an unlocked version of conn_clean(). */
+    for (i = 0; i < CONNTRACK_BUCKETS; i++) {
+        bucket = &ct->buckets[i];
+        ovs_mutex_destroy(&bucket->lock);
+        cmap_destroy(&ct->buckets[i].conns);
+    }
+
+    ovs_mutex_lock(&ct->ct_lock);
     struct zone_limit *zl;
     CMAP_FOR_EACH (zl, node, &ct->zone_limits) {
         uint32_t hash = zone_key_hash(zl->czl.zone, ct->hash_basis);
@@ -498,45 +515,108 @@ conntrack_destroy(struct conntrack *ct)
 }
 

 
+static unsigned hash_scale(uint32_t hash)
+{
+    return (hash >> (32 - CONNTRACK_BUCKETS_SHIFT)) % CONNTRACK_BUCKETS;
+}
+
 static bool
-conn_key_lookup(struct conntrack *ct, const struct conn_key *key,
-                uint32_t hash, long long now, struct conn **conn_out,
+conn_key_lookup(struct conntrack *ct, unsigned bucket,
+                const struct conn_key *key, uint32_t hash,
+                long long now, struct conn **conn_out,
                 bool *reply)
 {
+    struct ct_bucket *ctb = &ct->buckets[bucket];
     struct conn_key_node *keyn;
-    struct conn *conn = NULL;
     bool found = false;
+    struct conn *conn;
 
-    CMAP_FOR_EACH_WITH_HASH (keyn, cm_node, hash, &ct->conns) {
+    CMAP_FOR_EACH_WITH_HASH (keyn, cm_node, hash, &ctb->conns) {
         conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
+        if (conn_expired(conn, now)) {
+            conn_clean(ct, conn);
+            continue;
+        }
+
         for (int i = CT_DIR_FWD; i < CT_DIR_MAX; i++) {
-            if (!conn_key_cmp(&conn->key_node[i].key, key) &&
-                !conn_expired(conn, now)) {
+            if (!conn_key_cmp(&conn->key_node[i].key, key)) {
                 found = true;
                 if (reply) {
                     *reply = i;
                 }
-                goto out_found;
+
+                goto conn_out_found;
             }
         }
     }
 
-out_found:
-    if (found && conn_out) {
-        *conn_out = conn;
-    } else if (conn_out) {
-        *conn_out = NULL;
+conn_out_found:
+    if (conn_out) {
+        *conn_out = found ? conn : NULL;
     }
 
     return found;
 }
 
+static void
+buckets_unlock(struct conntrack *ct, uint32_t h1, uint32_t h2)
+{
+    unsigned p1 = hash_scale(h1),
+        p2 = hash_scale(h2);
+
+    if (p1 > p2) {
+        ovs_mutex_unlock(&ct->buckets[p1].lock);
+        ovs_mutex_unlock(&ct->buckets[p2].lock);
+    } else if (p1 < p2) {
+        ovs_mutex_unlock(&ct->buckets[p2].lock);
+        ovs_mutex_unlock(&ct->buckets[p1].lock);
+    } else {
+        ovs_mutex_unlock(&ct->buckets[p1].lock);
+    }
+}
+
+/* Acquires both locks in an ordered way. */
+static void
+buckets_lock(struct conntrack *ct, uint32_t h1, uint32_t h2)
+{
+    unsigned p1 = hash_scale(h1),
+        p2 = hash_scale(h2);
+
+    if (p1 < p2) {
+        ovs_mutex_lock(&ct->buckets[p1].lock);
+        ovs_mutex_lock(&ct->buckets[p2].lock);
+    } else if (p1 > p2) {
+        ovs_mutex_lock(&ct->buckets[p2].lock);
+        ovs_mutex_lock(&ct->buckets[p1].lock);
+    } else {
+        ovs_mutex_lock(&ct->buckets[p1].lock);
+    }
+}
+
+static void
+conn_clean(struct conntrack *ct, struct conn *conn)
+{
+    uint32_t h1, h2;
+
+    if (atomic_flag_test_and_set(&conn->cleaned)) {
+        return;
+    }
+
+    h1 = cached_key_hash(&conn->key_node[CT_DIR_FWD]);
+    h2 = cached_key_hash(&conn->key_node[CT_DIR_REV]);
+    buckets_lock(ct, h1, h2);
+    conn_clean__(ct, conn);
+    buckets_unlock(ct, h1, h2);
+}
+
 static bool
 conn_lookup(struct conntrack *ct, const struct conn_key *key,
             long long now, struct conn **conn_out, bool *reply)
 {
     uint32_t hash = conn_key_hash(key, ct->hash_basis);
-    return conn_key_lookup(ct, key, hash, now, conn_out, reply);
+    unsigned bucket = hash_scale(hash);
+
+    return conn_key_lookup(ct, bucket, key, hash, now, conn_out, reply);
 }
 
 static void
@@ -944,7 +1024,6 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
                const struct nat_action_info_t *nat_action_info,
                const char *helper, const struct alg_exp_node *alg_exp,
                enum ct_alg_ctl_type ct_alg_ctl, uint32_t tp_id)
-    OVS_REQUIRES(ct->ct_lock)
 {
     struct conn *nc = NULL;
     uint32_t rev_hash = ctx->hash;
@@ -954,6 +1033,8 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
         return nc;
     }
 
+    /* XXX: We are unlocked here, so we don't know
+     * if the tuple already exists in the table. */
     pkt->md.ct_state = CS_NEW;
 
     if (alg_exp) {
@@ -961,10 +1042,11 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
     }
 
     if (commit) {
-        struct conn_key_node *fwd_key_node, *rev_key_node;
-
         struct zone_limit *zl = zone_limit_lookup_or_default(ct,
                                                              ctx->key.zone);
+        struct conn_key_node *fwd_key_node, *rev_key_node;
+        bool handle_tuple = false;
+
         if (zl && atomic_count_get(&zl->czl.count) >= zl->czl.limit) {
             return nc;
         }
@@ -1007,22 +1089,40 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
                     nc->nat_action = NAT_ACTION_DST;
                 }
             } else {
-                bool nat_res = nat_get_unique_tuple(ct, nc, nat_action_info);
-
-                if (!nat_res) {
-                    goto nat_res_exhaustion;
-                }
+                handle_tuple = true;
             }
-
-            nat_packet(pkt, nc, ctx->icmp_related);
-            rev_hash = conn_key_hash(&rev_key_node->key, ct->hash_basis);
-            rev_key_node->key.dir = CT_DIR_REV;
-            cmap_insert(&ct->conns, &rev_key_node->cm_node, rev_hash);
         }
 
         ovs_mutex_init_adaptive(&nc->lock);
+        atomic_flag_clear(&nc->cleaned);
         fwd_key_node->key.dir = CT_DIR_FWD;
-        cmap_insert(&ct->conns, &fwd_key_node->cm_node, ctx->hash);
+        rev_key_node->key.dir = CT_DIR_REV;
+
+        if (handle_tuple) {
+            bool nat_res = nat_get_unique_tuple_lock(ct, nc, nat_action_info,
+                                                     &rev_hash);
+
+            if (!nat_res) {
+                goto out_error;
+            }
+        } else {
+            rev_hash = conn_key_hash(&rev_key_node->key, ct->hash_basis);
+            buckets_lock(ct, ctx->hash, rev_hash);
+        }
+
+        if (conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
+            goto out_error_unlock;
+        }
+
+        fwd_key_node->key_hash = ctx->hash;
+        cmap_insert(&ct->buckets[hash_scale(ctx->hash)].conns,
+                    &fwd_key_node->cm_node, ctx->hash);
+        if (nat_action_info) {
+            rev_key_node->key_hash = rev_hash;
+            cmap_insert(&ct->buckets[hash_scale(rev_hash)].conns,
+                        &rev_key_node->cm_node, rev_hash);
+            nat_packet(pkt, nc, ctx->icmp_related);
+        }
 
         atomic_count_inc(&ct->n_conn);
         ctx->conn = nc; /* For completeness. */
@@ -1033,21 +1133,23 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
         } else {
             nc->admit_zone = INVALID_ZONE;
         }
+        buckets_unlock(ct, ctx->hash, rev_hash);
     }
 
     return nc;
 
+out_error_unlock:
+    buckets_unlock(ct, ctx->hash, rev_hash);
     /* This would be a user error or a DOS attack.  A user error is prevented
      * by allocating enough combinations of NAT addresses when combined with
      * ephemeral ports.  A DOS attack should be protected against with
      * firewall rules or a separate firewall.  Also using zone partitioning
      * can limit DoS impact. */
-nat_res_exhaustion:
-    ovs_list_remove(&nc->exp_node);
+out_error:
+    ovs_mutex_destroy(&nc->lock);
     delete_conn_cmn(nc);
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
-    VLOG_WARN_RL(&rl, "Unable to NAT due to tuple space exhaustion - "
-                 "if DoS attack, use firewalling and/or zone partitioning.");
+    VLOG_WARN_RL(&rl, "Unable to insert a new connection.");
     return NULL;
 }
 
@@ -1082,12 +1184,7 @@ conn_update_state(struct conntrack *ct, struct dp_packet *pkt,
             pkt->md.ct_state = CS_INVALID;
             break;
         case CT_UPDATE_NEW:
-            ovs_mutex_lock(&ct->ct_lock);
-            if (conn_lookup(ct, &conn->key_node[CT_DIR_FWD].key,
-                            now, NULL, NULL)) {
-                conn_clean(ct, conn);
-            }
-            ovs_mutex_unlock(&ct->ct_lock);
+            conn_clean(ct, conn);
             create_new_conn = true;
             break;
         case CT_UPDATE_VALID_NEW:
@@ -1253,6 +1350,8 @@ static void
 initial_conn_lookup(struct conntrack *ct, struct conn_lookup_ctx *ctx,
                     long long now, bool natted)
 {
+    unsigned bucket = hash_scale(ctx->hash);
+
     if (natted) {
         /* If the packet has been already natted (e.g. a previous
          * action took place), retrieve it performing a lookup of its
@@ -1260,7 +1359,8 @@ initial_conn_lookup(struct conntrack *ct, struct conn_lookup_ctx *ctx,
         conn_key_reverse(&ctx->key);
     }
 
-    conn_key_lookup(ct, &ctx->key, ctx->hash, now, &ctx->conn, &ctx->reply);
+    conn_key_lookup(ct, bucket, &ctx->key, ctx->hash,
+                    now, &ctx->conn, &ctx->reply);
 
     if (natted) {
         if (OVS_LIKELY(ctx->conn)) {
@@ -1287,24 +1387,20 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
             ovs_be16 tp_src, ovs_be16 tp_dst, const char *helper,
             uint32_t tp_id)
 {
+    bool create_new_conn = false;
+
     /* Reset ct_state whenever entering a new zone. */
     if (pkt->md.ct_state && pkt->md.ct_zone != zone) {
         pkt->md.ct_state = 0;
     }
 
-    bool create_new_conn = false;
     initial_conn_lookup(ct, ctx, now, !!(pkt->md.ct_state &
                                          (CS_SRC_NAT | CS_DST_NAT)));
     struct conn *conn = ctx->conn;
 
     /* Delete found entry if in wrong direction. 'force' implies commit. */
     if (OVS_UNLIKELY(force && ctx->reply && conn)) {
-        ovs_mutex_lock(&ct->ct_lock);
-        if (conn_lookup(ct, &conn->key_node[CT_DIR_FWD].key,
-                        now, NULL, NULL)) {
-            conn_clean(ct, conn);
-        }
-        ovs_mutex_unlock(&ct->ct_lock);
+        conn_clean(ct, conn);
         conn = NULL;
     }
 
@@ -1338,7 +1434,6 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
     struct alg_exp_node alg_exp_entry;
 
     if (OVS_UNLIKELY(create_new_conn)) {
-
         ovs_rwlock_rdlock(&ct->resources_lock);
         alg_exp = expectation_lookup(&ct->alg_expectations, &ctx->key,
                                      ct->hash_basis,
@@ -1349,12 +1444,9 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
         }
         ovs_rwlock_unlock(&ct->resources_lock);
 
-        ovs_mutex_lock(&ct->ct_lock);
-        if (!conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
-            conn = conn_not_found(ct, pkt, ctx, commit, now, nat_action_info,
-                                  helper, alg_exp, ct_alg_ctl, tp_id);
-        }
-        ovs_mutex_unlock(&ct->ct_lock);
+        conn = conn_not_found(ct, pkt, ctx, commit, now,
+                              nat_action_info, helper, alg_exp,
+                              ct_alg_ctl, tp_id);
     }
 
     write_ct_md(pkt, zone, conn, &ctx->key, alg_exp);
@@ -1467,83 +1559,92 @@ set_label(struct dp_packet *pkt, struct conn *conn,
 }
 
 

-/* Delete the expired connections from 'ctb', up to 'limit'. Returns the
- * earliest expiration time among the remaining connections in 'ctb'.  Returns
- * LLONG_MAX if 'ctb' is empty.  The return value might be smaller than 'now',
- * if 'limit' is reached */
+/* Delete the expired connections from 'bucket', up to 'limit'.
+ * Returns the earliest expiration time among the remaining
+ * connections in 'bucket'.  Returns LLONG_MAX if 'bucket' is empty.
+ * The return value might be smaller than 'now', if 'limit' is
+ * reached. */
 static long long
-ct_sweep(struct conntrack *ct, long long now, size_t limit)
+sweep_bucket(struct conntrack *ct, struct ct_bucket *bucket,
+             long long now)
 {
-    struct conn *conn, *next;
-    long long min_expiration = LLONG_MAX;
-    size_t count = 0;
+    struct conn_key_node *keyn;
+    unsigned int conn_count = 0;
+    struct conn *conn;
+    long long expiration;
 
-    ovs_mutex_lock(&ct->ct_lock);
+    CMAP_FOR_EACH (keyn, cm_node, &bucket->conns) {
+        if (keyn->key.dir != CT_DIR_FWD) {
+            continue;
+        }
 
-    for (unsigned i = 0; i < N_CT_TM; i++) {
-        LIST_FOR_EACH_SAFE (conn, next, exp_node, &ct->exp_lists[i]) {
-            ovs_mutex_lock(&conn->lock);
-            if (now < conn->expiration || count >= limit) {
-                min_expiration = MIN(min_expiration, conn->expiration);
-                ovs_mutex_unlock(&conn->lock);
-                if (count >= limit) {
-                    /* Do not check other lists. */
-                    COVERAGE_INC(conntrack_long_cleanup);
-                    goto out;
-                }
-                break;
-            } else {
-                ovs_mutex_unlock(&conn->lock);
-                conn_clean(ct, conn);
-            }
-            count++;
+        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
+        ovs_mutex_lock(&conn->lock);
+        expiration = conn->expiration;
+        ovs_mutex_unlock(&conn->lock);
+
+        if (now >= expiration) {
+            conn_clean(ct, conn);
         }
+
+        conn_count++;
     }
 
-out:
-    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", count,
-             time_msec() - now);
-    ovs_mutex_unlock(&ct->ct_lock);
-    return min_expiration;
+    return conn_count;
 }
 
-/* Cleans up old connection entries from 'ct'.  Returns the time when the
- * next expiration might happen.  The return value might be smaller than
- * 'now', meaning that an internal limit has been reached, and some expired
- * connections have not been deleted. */
+/* Cleans up old connection entries from 'ct'.  Returns the the next
+ * wake up time.  The return value might be smaller than 'now', meaning
+ * that an internal limit has been reached, that is, the table
+ * hasn't been entirely scanned. */
 static long long
 conntrack_clean(struct conntrack *ct, long long now)
 {
-    unsigned int n_conn_limit;
+    long long next_wakeup = now + 90 * 1000;
+    unsigned int n_conn_limit, i, count = 0;
+    size_t clean_end;
+
     atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
-    size_t clean_max = n_conn_limit > 10 ? n_conn_limit / 10 : 1;
-    long long min_exp = ct_sweep(ct, now, clean_max);
-    long long next_wakeup = MIN(min_exp, now + CT_DPIF_NETDEV_TP_MIN);
+    clean_end = n_conn_limit / 64;
+
+    for (i = ct->next_bucket; i < CONNTRACK_BUCKETS; i++) {
+        struct ct_bucket *bucket = &ct->buckets[i];
+
+        count += sweep_bucket(ct, bucket, now);
+
+        if (count > clean_end) {
+            next_wakeup = 0;
+            break;
+        }
+    }
+
+    ct->next_bucket = (i < CONNTRACK_BUCKETS) ? i : 0;
 
     return next_wakeup;
 }
 
 /* Cleanup:
  *
- * We must call conntrack_clean() periodically.  conntrack_clean() return
- * value gives an hint on when the next cleanup must be done (either because
- * there is an actual connection that expires, or because a new connection
- * might be created with the minimum timeout).
+ * We must call conntrack_clean() periodically.  conntrack_clean()
+ * return value gives an hint on when the next cleanup must be done
+ * (either because there is still work to do, or because a new
+ * connection might be created).
  *
  * The logic below has two goals:
  *
- * - We want to reduce the number of wakeups and batch connection cleanup
- *   when the load is not very high.  CT_CLEAN_INTERVAL ensures that if we
- *   are coping with the current cleanup tasks, then we wait at least
- *   5 seconds to do further cleanup.
+ * - When the load is high, we want to avoid to hog the CPU scanning
+ *   all the buckets and their respective CMAPs "at once". For this
+ *   reason, every batch cleanup aims to scan at most n_conn_limit /
+ *   64 entries (more if the buckets contains many entrie) before
+ *   yielding the CPU. In this case, the next wake up will happen in
+ *   CT_CLEAN_MIN_INTERVAL_MS and the scan will resume starting from
+ *   the first bucket not scanned.
  *
- * - We don't want to keep the map locked too long, as we might prevent
- *   traffic from flowing.  CT_CLEAN_MIN_INTERVAL ensures that if cleanup is
- *   behind, there is at least some 200ms blocks of time when the map will be
- *   left alone, so the datapath can operate unhindered.
- */
-#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */
-#define CT_CLEAN_MIN_INTERVAL 200  /* 0.2 seconds */
+ * - We also don't want to scan the buckets so frequently, as going
+ *   through all the connections, during high loads, may be costly in
+ *   terms of CPU time. In this case the next wake up is set to 90
+ *   seconds. */
+#define CT_CLEAN_MIN_INTERVAL_MS 100  /* 0.1 seconds */
 
 static void *
 clean_thread_main(void *f_)
@@ -1556,9 +1657,9 @@ clean_thread_main(void *f_)
         next_wake = conntrack_clean(ct, now);
 
         if (next_wake < now) {
-            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL);
+            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL_MS);
         } else {
-            poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL));
+            poll_timer_wait_until(next_wake);
         }
         latch_wait(&ct->clean_thread_exit);
         poll_block();
@@ -2088,6 +2189,12 @@ ct_endpoint_hash_add(uint32_t hash, const struct ct_endpoint *ep)
     return hash_add_bytes32(hash, (const uint32_t *) ep, sizeof *ep);
 }
 

+static uint32_t
+cached_key_hash(struct conn_key_node *n)
+{
+    return n->key_hash;
+}
+
 /* Symmetric */
 static uint32_t
 conn_key_hash(const struct conn_key *key, uint32_t basis)
@@ -2357,8 +2464,9 @@ next_addr_in_range_guarded(union ct_addr *curr, union ct_addr *min,
  *
  * If none can be found, return exhaustion to the caller. */
 static bool
-nat_get_unique_tuple(struct conntrack *ct, struct conn *conn,
-                     const struct nat_action_info_t *nat_info)
+nat_get_unique_tuple_lock(struct conntrack *ct, struct conn *conn,
+                          const struct nat_action_info_t *nat_info,
+                          uint32_t *rev_hash)
 {
     union ct_addr min_addr = {0}, max_addr = {0}, curr_addr = {0},
                   guard_addr = {0};
@@ -2392,10 +2500,15 @@ another_round:
                       nat_info->nat_action);
 
     if (!pat_proto) {
+        uint32_t key_hash = conn_key_hash(fwd_key, ct->hash_basis);
+        *rev_hash = conn_key_hash(rev_key, ct->hash_basis);
+
+        buckets_lock(ct, key_hash, *rev_hash);
         if (!conn_lookup(ct, rev_key,
                          time_msec(), NULL, NULL)) {
             return true;
         }
+        buckets_unlock(ct, key_hash, *rev_hash);
 
         goto next_addr;
     }
@@ -2404,10 +2517,15 @@ another_round:
         rev_key->src.port = htons(curr_dport);
         FOR_EACH_PORT_IN_RANGE(curr_sport, min_sport, max_sport) {
             rev_key->dst.port = htons(curr_sport);
+            uint32_t key_hash = conn_key_hash(fwd_key, ct->hash_basis);
+            *rev_hash = conn_key_hash(rev_key, ct->hash_basis);
+
+            buckets_lock(ct, key_hash, *rev_hash);
             if (!conn_lookup(ct, rev_key,
                              time_msec(), NULL, NULL)) {
                 return true;
             }
+            buckets_unlock(ct, key_hash, *rev_hash);
         }
     }
 
@@ -2615,20 +2733,39 @@ conntrack_dump_next(struct conntrack_dump *dump, struct ct_dpif_entry *entry)
 {
     struct conntrack *ct = dump->ct;
     long long now = time_msec();
+    struct ct_bucket *bucket;
 
-    for (;;) {
-        struct cmap_node *cm_node = cmap_next_position(&ct->conns,
-                                                       &dump->cm_pos);
-        if (!cm_node) {
-            break;
+    while (dump->bucket < CONNTRACK_BUCKETS) {
+        struct cmap_node *cm_node;
+        bucket = &ct->buckets[dump->bucket];
+
+        for (;;) {
+            cm_node = cmap_next_position(&bucket->conns,
+                                         &dump->cm_pos);
+            if (!cm_node) {
+                break;
+            }
+            struct conn_key_node *keyn;
+            struct conn *conn;
+            INIT_CONTAINER(keyn, cm_node, cm_node);
+            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
+
+            if (conn_expired(conn, now)) {
+                /* XXX: ideally this should call conn_clean(). */
+                continue;
+            }
+
+            if ((!dump->filter_zone || keyn->key.zone == dump->zone) &&
+                (keyn->key.dir == CT_DIR_FWD)) {
+                conn_to_ct_dpif_entry(conn, entry, now);
+                break;
+            }
         }
-        struct conn_key_node *keyn;
-        struct conn *conn;
-        INIT_CONTAINER(keyn, cm_node, cm_node);
-        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
-        if ((!dump->filter_zone || keyn->key.zone == dump->zone) &&
-            (keyn->key.dir == CT_DIR_FWD)) {
-            conn_to_ct_dpif_entry(conn, entry, now);
+
+        if (!cm_node) {
+            memset(&dump->cm_pos, 0, sizeof dump->cm_pos);
+            dump->bucket++;
+        } else {
             return 0;
         }
     }
@@ -2648,17 +2785,18 @@ conntrack_flush(struct conntrack *ct, const uint16_t *zone)
     struct conn_key_node *keyn;
     struct conn *conn;
 
-    ovs_mutex_lock(&ct->ct_lock);
-    CMAP_FOR_EACH (keyn, cm_node, &ct->conns) {
-        if (keyn->key.dir != CT_DIR_FWD) {
-            continue;
-        }
-        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
-        if (!zone || *zone == keyn->key.zone) {
-            conn_clean(ct, conn);
+    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
+        CMAP_FOR_EACH (keyn, cm_node, &ct->buckets[i].conns) {
+            if (keyn->key.dir != CT_DIR_FWD) {
+                continue;
+            }
+
+            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
+            if (!zone || *zone == keyn->key.zone) {
+                conn_clean(ct, conn);
+            }
         }
     }
-    ovs_mutex_unlock(&ct->ct_lock);
 
     return 0;
 }
@@ -2667,15 +2805,19 @@ int
 conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
                       uint16_t zone)
 {
-    int error = 0;
     struct conn_key key;
-    struct conn *conn;
+    struct conn *conn = NULL;
+    unsigned bucket;
+    uint32_t hash;
+    int error = 0;
 
     memset(&key, 0, sizeof(key));
     tuple_to_conn_key(tuple, zone, &key);
-    ovs_mutex_lock(&ct->ct_lock);
-    conn_lookup(ct, &key, time_msec(), &conn, NULL);
 
+    hash = conn_key_hash(&key, ct->hash_basis);
+    bucket = hash_scale(hash);
+
+    conn_key_lookup(ct, bucket, &key, hash, time_msec(), &conn, NULL);
     if (conn) {
         conn_clean(ct, conn);
     } else {
@@ -2683,7 +2825,6 @@ conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
         error = ENOENT;
     }
 
-    ovs_mutex_unlock(&ct->ct_lock);
     return error;
 }
 
diff --git a/tests/system-traffic.at b/tests/system-traffic.at
index d79753a99..9a48a9a7f 100644
--- a/tests/system-traffic.at
+++ b/tests/system-traffic.at
@@ -4611,9 +4611,8 @@ AT_CHECK([ovs-appctl dpctl/dump-conntrack | FORMAT_CT(10.1.1.2) | sed -e 's/dst=
 tcp,orig=(src=10.1.1.1,dst=10.1.1.2,sport=<cleared>,dport=<cleared>),reply=(src=10.1.1.2,dst=10.1.1.2XX,sport=<cleared>,dport=<cleared>),zone=1,protoinfo=(state=<cleared>)
 ])
 
-OVS_TRAFFIC_VSWITCHD_STOP(["dnl
-/Unable to NAT due to tuple space exhaustion - if DoS attack, use firewalling and\/or zone partitioning./d
-/Dropped .* log messages in last .* seconds \(most recently, .* seconds ago\) due to excessive rate/d"])
+OVS_TRAFFIC_VSWITCHD_STOP(["/dnl
+Unable to insert a new connection./d"])
 AT_CLEANUP
 
 AT_SETUP([conntrack - more complex SNAT])




More information about the dev mailing list