[ovs-dev] [PATCH v4 05/17] conntrack: Periodically delete expired connections.

Joe Stringer joe at ovn.org
Sat Jun 25 00:18:04 UTC 2016


On 24 June 2016 at 16:42, Daniele Di Proietto <diproiettod at vmware.com> wrote:
> Thanks for your comments Joe, replies inline
>
>
>
> On 24/06/2016 15:29, "Joe Stringer" <joe at ovn.org> wrote:
>
>>On 10 June 2016 at 15:47, Daniele Di Proietto <diproiettod at vmware.com> wrote:
>>> This commit adds a thread that periodically removes expired connections.
>>>
>>> The expiration time of a connection can be expressed by:
>>>
>>> expiration = now + timeout
>>>
>>> For each possible 'timeout' value (there aren't many) we keep a list.
>>> When the expiration is updated, we move the connection to the back of the
>>> corresponding 'timeout' list. This way, the list is always ordered by
>>> 'expiration'.
>>>
>>> When the cleanup thread iterates through the lists for expired
>>> connections, it can stop at the first non expired connection.
>>>
>>> Suggested-by: Joe Stringer <joe at ovn.org>
>>> Signed-off-by: Daniele Di Proietto <diproiettod at vmware.com>
>>> ---
>>>  lib/conntrack-other.c   |  11 +--
>>>  lib/conntrack-private.h |  21 ++++--
>>>  lib/conntrack-tcp.c     |  20 +++---
>>>  lib/conntrack.c         | 184 ++++++++++++++++++++++++++++++++++++++++++++----
>>>  lib/conntrack.h         |  32 ++++++++-
>>>  5 files changed, 237 insertions(+), 31 deletions(-)
>>>
>>> diff --git a/lib/conntrack-other.c b/lib/conntrack-other.c
>>> index 295cb2c..2920889 100644
>>> --- a/lib/conntrack-other.c
>>> +++ b/lib/conntrack-other.c
>>> @@ -43,8 +43,8 @@ conn_other_cast(const struct conn *conn)
>>>  }
>>>
>>>  static enum ct_update_res
>>> -other_conn_update(struct conn *conn_, struct dp_packet *pkt OVS_UNUSED,
>>> -                  bool reply, long long now)
>>> +other_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
>>> +                  struct dp_packet *pkt OVS_UNUSED, bool reply, long long now)
>>>  {
>>>      struct conn_other *conn = conn_other_cast(conn_);
>>>
>>> @@ -54,7 +54,7 @@ other_conn_update(struct conn *conn_, struct dp_packet *pkt OVS_UNUSED,
>>>          conn->state = OTHERS_MULTIPLE;
>>>      }
>>>
>>> -    update_expiration(conn_, other_timeouts[conn->state], now);
>>> +    conn_update_expiration(ctb, &conn->up, other_timeouts[conn->state], now);
>>>
>>>      return CT_UPDATE_VALID;
>>>  }
>>> @@ -66,14 +66,15 @@ other_valid_new(struct dp_packet *pkt OVS_UNUSED)
>>>  }
>>>
>>>  static struct conn *
>>> -other_new_conn(struct dp_packet *pkt OVS_UNUSED, long long now)
>>> +other_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt OVS_UNUSED,
>>> +               long long now)
>>>  {
>>>      struct conn_other *conn;
>>>
>>>      conn = xzalloc(sizeof *conn);
>>>      conn->state = OTHERS_FIRST;
>>>
>>> -    update_expiration(&conn->up, other_timeouts[conn->state], now);
>>> +    conn_init_expiration(ctb, &conn->up, other_timeouts[conn->state], now);
>>>
>>>      return &conn->up;
>>>  }
>>> diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
>>> index d3e0099..4743dc6 100644
>>> --- a/lib/conntrack-private.h
>>> +++ b/lib/conntrack-private.h
>>> @@ -68,10 +68,13 @@ enum ct_update_res {
>>>  };
>>>
>>>  struct ct_l4_proto {
>>> -    struct conn *(*new_conn)(struct dp_packet *pkt, long long now);
>>> +    struct conn *(*new_conn)(struct conntrack_bucket *, struct dp_packet *pkt,
>>> +                             long long now);
>>>      bool (*valid_new)(struct dp_packet *pkt);
>>> -    enum ct_update_res (*conn_update)(struct conn *conn, struct dp_packet *pkt,
>>> -                                      bool reply, long long now);
>>> +    enum ct_update_res (*conn_update)(struct conn *conn,
>>> +                                      struct conntrack_bucket *,
>>> +                                      struct dp_packet *pkt, bool reply,
>>> +                                      long long now);
>>>  };
>>>
>>>  extern struct ct_l4_proto ct_proto_tcp;
>>> @@ -80,9 +83,19 @@ extern struct ct_l4_proto ct_proto_other;
>>>  extern long long ct_timeout_val[];
>>>
>>>  static inline void
>>> -update_expiration(struct conn *conn, enum ct_timeout tm, long long now)
>>> +conn_init_expiration(struct conntrack_bucket *ctb, struct conn *conn,
>>> +                        enum ct_timeout tm, long long now)
>>>  {
>>>      conn->expiration = now + ct_timeout_val[tm];
>>> +    ovs_list_push_back(&ctb->exp_lists[tm], &conn->exp_node);
>>> +}
>>> +
>>> +static inline void
>>> +conn_update_expiration(struct conntrack_bucket *ctb, struct conn *conn,
>>> +                       enum ct_timeout tm, long long now)
>>> +{
>>> +    ovs_list_remove(&conn->exp_node);
>>> +    conn_init_expiration(ctb, conn, tm, now);
>>>  }
>>>
>>>  #endif /* conntrack-private.h */
>>> diff --git a/lib/conntrack-tcp.c b/lib/conntrack-tcp.c
>>> index b574eeb..71eadc1 100644
>>> --- a/lib/conntrack-tcp.c
>>> +++ b/lib/conntrack-tcp.c
>>> @@ -152,8 +152,8 @@ tcp_payload_length(struct dp_packet *pkt)
>>>  }
>>>
>>>  static enum ct_update_res
>>> -tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply,
>>> -                long long now)
>>> +tcp_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
>>> +                struct dp_packet *pkt, bool reply, long long now)
>>>  {
>>>      struct conn_tcp *conn = conn_tcp_cast(conn_);
>>>      struct tcp_header *tcp = dp_packet_l4(pkt);
>>> @@ -319,18 +319,18 @@ tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply,
>>>
>>>          if (src->state >= CT_DPIF_TCPS_FIN_WAIT_2
>>>              && dst->state >= CT_DPIF_TCPS_FIN_WAIT_2) {
>>> -            update_expiration(conn_, CT_TM_TCP_CLOSED, now);
>>> +            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSED, now);
>>>          } else if (src->state >= CT_DPIF_TCPS_CLOSING
>>>                     && dst->state >= CT_DPIF_TCPS_CLOSING) {
>>> -            update_expiration(conn_, CT_TM_TCP_FIN_WAIT, now);
>>> +            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_FIN_WAIT, now);
>>>          } else if (src->state < CT_DPIF_TCPS_ESTABLISHED
>>>                     || dst->state < CT_DPIF_TCPS_ESTABLISHED) {
>>> -            update_expiration(conn_, now, CT_TM_TCP_OPENING);
>>> +            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_OPENING, now);
>>>          } else if (src->state >= CT_DPIF_TCPS_CLOSING
>>>                     || dst->state >= CT_DPIF_TCPS_CLOSING) {
>>> -            update_expiration(conn_, now, CT_TM_TCP_CLOSING);
>>> +            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSING, now);
>>>          } else {
>>> -            update_expiration(conn_, now, CT_TM_TCP_ESTABLISHED);
>>> +            conn_update_expiration(ctb, &conn->up, CT_TM_TCP_ESTABLISHED, now);
>>>          }
>>>      } else if ((dst->state < CT_DPIF_TCPS_SYN_SENT
>>>                  || dst->state >= CT_DPIF_TCPS_FIN_WAIT_2
>>> @@ -414,7 +414,8 @@ tcp_valid_new(struct dp_packet *pkt)
>>>  }
>>>
>>>  static struct conn *
>>> -tcp_new_conn(struct dp_packet *pkt, long long now)
>>> +tcp_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt,
>>> +             long long now)
>>>  {
>>>      struct conn_tcp* newconn = NULL;
>>>      struct tcp_header *tcp = dp_packet_l4(pkt);
>>> @@ -450,7 +451,8 @@ tcp_new_conn(struct dp_packet *pkt, long long now)
>>>      src->state = CT_DPIF_TCPS_SYN_SENT;
>>>      dst->state = CT_DPIF_TCPS_CLOSED;
>>>
>>> -    update_expiration(&newconn->up, now, CT_TM_TCP_FIRST_PACKET);
>>> +    conn_init_expiration(ctb, &newconn->up, CT_TM_TCP_FIRST_PACKET,
>>> +                         now);
>>>
>>>      return &newconn->up;
>>>  }
>>> diff --git a/lib/conntrack.c b/lib/conntrack.c
>>> index 96935bc..5376550 100644
>>> --- a/lib/conntrack.c
>>> +++ b/lib/conntrack.c
>>> @@ -33,6 +33,8 @@
>>>  #include "odp-netlink.h"
>>>  #include "openvswitch/vlog.h"
>>>  #include "ovs-rcu.h"
>>> +#include "ovs-thread.h"
>>> +#include "poll-loop.h"
>>>  #include "random.h"
>>>  #include "timeval.h"
>>>
>>> @@ -56,17 +58,20 @@ static void conn_key_lookup(struct conntrack_bucket *ctb,
>>>                              struct conn_lookup_ctx *ctx,
>>>                              long long now);
>>>  static bool valid_new(struct dp_packet *pkt, struct conn_key *);
>>> -static struct conn *new_conn(struct dp_packet *pkt, struct conn_key *,
>>> -                             long long now);
>>> +static struct conn *new_conn(struct conntrack_bucket *, struct dp_packet *pkt,
>>> +                             struct conn_key *, long long now);
>>>  static void delete_conn(struct conn *);
>>> -static enum ct_update_res conn_update(struct conn *, struct dp_packet*,
>>> -                                      bool reply, long long now);
>>> +static enum ct_update_res conn_update(struct conn *,
>>> +                                      struct conntrack_bucket *ctb,
>>> +                                      struct dp_packet *, bool reply,
>>> +                                      long long now);
>>>  static bool conn_expired(struct conn *, long long now);
>>>  static void set_mark(struct dp_packet *, struct conn *,
>>>                       uint32_t val, uint32_t mask);
>>>  static void set_label(struct dp_packet *, struct conn *,
>>>                        const struct ovs_key_ct_labels *val,
>>>                        const struct ovs_key_ct_labels *mask);
>>> +static void *clean_thread_main(void *f_);
>>>
>>>  static struct ct_l4_proto *l4_protos[] = {
>>>      [IPPROTO_TCP] = &ct_proto_tcp,
>>> @@ -90,7 +95,8 @@ long long ct_timeout_val[] = {
>>>  void
>>>  conntrack_init(struct conntrack *ct)
>>>  {
>>> -    unsigned i;
>>> +    unsigned i, j;
>>> +    long long now = time_msec();
>>>
>>>      for (i = 0; i < CONNTRACK_BUCKETS; i++) {
>>>          struct conntrack_bucket *ctb = &ct->buckets[i];
>>> @@ -98,11 +104,20 @@ conntrack_init(struct conntrack *ct)
>>>          ct_lock_init(&ctb->lock);
>>>          ct_lock_lock(&ctb->lock);
>>>          hmap_init(&ctb->connections);
>>> +        for (j = 0; j < ARRAY_SIZE(ctb->exp_lists); j++) {
>>> +            ovs_list_init(&ctb->exp_lists[j]);
>>> +        }
>>>          ct_lock_unlock(&ctb->lock);
>>> +        ovs_mutex_init(&ctb->cleanup_mutex);
>>> +        ovs_mutex_lock(&ctb->cleanup_mutex);
>>> +        ctb->next_cleanup = now + CT_TM_MIN;
>>> +        ovs_mutex_unlock(&ctb->cleanup_mutex);
>>>      }
>>>      ct->hash_basis = random_uint32();
>>>      atomic_count_init(&ct->n_conn, 0);
>>>      atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
>>> +    latch_init(&ct->clean_thread_exit);
>>> +    ct->clean_thread = ovs_thread_create("ct_clean", clean_thread_main, ct);
>>>  }
>>>
>>>  /* Destroys the connection tracker 'ct' and frees all the allocated memory. */
>>> @@ -111,10 +126,14 @@ conntrack_destroy(struct conntrack *ct)
>>>  {
>>>      unsigned i;
>>>
>>> +    latch_set(&ct->clean_thread_exit);
>>> +    pthread_join(ct->clean_thread, NULL);
>>> +    latch_destroy(&ct->clean_thread_exit);
>>>      for (i = 0; i < CONNTRACK_BUCKETS; i++) {
>>>          struct conntrack_bucket *ctb = &ct->buckets[i];
>>>          struct conn *conn;
>>>
>>> +        ovs_mutex_destroy(&ctb->cleanup_mutex);
>>>          ct_lock_lock(&ctb->lock);
>>
>>I'm confused, is the order to grab cleanup_mutex then ctb->lock, or
>>the other way around? (See also conntrack_clean() if this is the wrong
>>order)
>
> Well, this is not grabbing the mutex, just destroying it. Does it need to
> be in a particular order? I chose this order because it's the reverse of
> what conntrack_init() does.
>
> There's only one place where 'lock' and 'cleanup_mutex' nest, and this is
> conntrack_clean().
>
> I've actually tried to document the acquisition order in conntrack.h, but
> it's inverted. Sorry about this

Ah, okay. That's the source of my confusion.

> I'll fix the acquisition order in conntrack.h, agreed?

That sounds fine.

>>
>>>          HMAP_FOR_EACH_POP(conn, node, &ctb->connections) {
>>>              atomic_count_dec(&ct->n_conn);
>>> @@ -170,7 +189,7 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>>>              return nc;
>>>          }
>>>
>>> -        nc = new_conn(pkt, &ctx->key, now);
>>> +        nc = new_conn(&ct->buckets[bucket], pkt, &ctx->key, now);
>>>
>>>          memcpy(&nc->rev_key, &ctx->key, sizeof nc->rev_key);
>>>
>>> @@ -200,7 +219,8 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>>>          } else {
>>>              enum ct_update_res res;
>>>
>>> -            res = conn_update(conn, pkt, ctx->reply, now);
>>> +            res = conn_update(conn, &ct->buckets[bucket], pkt,
>>> +                              ctx->reply, now);
>>>
>>>              switch (res) {
>>>              case CT_UPDATE_VALID:
>>> @@ -213,6 +233,7 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>>>                  state |= CS_INVALID;
>>>                  break;
>>>              case CT_UPDATE_NEW:
>>> +                ovs_list_remove(&conn->exp_node);
>>>                  hmap_remove(&ct->buckets[bucket].connections, &conn->node);
>>>                  atomic_count_dec(&ct->n_conn);
>>>                  delete_conn(conn);
>>> @@ -345,6 +366,143 @@ set_label(struct dp_packet *pkt, struct conn *conn,
>>>      conn->label = pkt->md.ct_label;
>>>  }
>>>
>>> +/* Delete the expired connections from 'ctb', up to 'limit'. Returns the
>>> + * earliest expiration time among the remaining connections in 'ctb'.  Returns
>>> + * LLONG_MAX if 'ctb' is empty.  The return value might be smaller than 'now',
>>> + * if 'limit' is reached */
>>> +static long long
>>> +sweep_bucket(struct conntrack *ct, struct conntrack_bucket *ctb, long long now,
>>> +             size_t limit)
>>> +    OVS_REQUIRES(ctb->lock)
>>> +{
>>> +    struct conn *conn, *next;
>>> +    long long min_expiration = LLONG_MAX;
>>> +    unsigned i;
>>> +    size_t count = 0;
>>> +
>>> +    for (i = 0; i < N_CT_TM; i++) {
>>> +        LIST_FOR_EACH_SAFE (conn, next, exp_node, &ctb->exp_lists[i]) {
>>> +            if (!conn_expired(conn, now) || count >= limit) {
>>> +                min_expiration = MIN(min_expiration, conn->expiration);
>>> +                if (count >= limit) {
>>> +                    /* Do not check other lists. */
>>> +                    return min_expiration;
>>
>>Do you think we should have a coverage counter for this?
>
> Good idea, I'll add that, thanks.
>
>>
>>> +                }
>>> +                break;
>>> +            }
>>> +            ovs_list_remove(&conn->exp_node);
>>> +            hmap_remove(&ctb->connections, &conn->node);
>>> +            atomic_count_dec(&ct->n_conn);
>>> +            delete_conn(conn);
>>> +            count++;
>>> +        }
>>> +    }
>>> +
>>> +    return min_expiration;
>>> +}
>>> +
>>> +/* Cleans up old connection entries from 'ct'.  Returns the time when the
>>> + * next expiration might happen.  The return value might be smaller than
>>> + * 'now', meaning that an internal limit has been reached, and some expired
>>> + * connections have not been deleted. */
>>> +static long long
>>> +conntrack_clean(struct conntrack *ct, long long now)
>>> +{
>>> +    long long next_wakeup = now + CT_TM_MIN;
>>> +    unsigned int n_conn_limit;
>>> +    size_t clean_count = 0;
>>> +    unsigned i;
>>> +
>>> +    atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
>>> +
>>> +    for (i = 0; i < CONNTRACK_BUCKETS; i++) {
>>> +        struct conntrack_bucket *ctb = &ct->buckets[i];
>>> +        size_t prev_count;
>>> +        long long min_exp;
>>> +
>>> +        ovs_mutex_lock(&ctb->cleanup_mutex);
>>>
>>> +        if (ctb->next_cleanup > now) {
>>> +            goto next_bucket;
>>> +        }
>>> +
>>> +        ct_lock_lock(&ctb->lock);
>>> +        prev_count = hmap_count(&ctb->connections);
>>> +        /* If the connections are well distributed among buckets, we want to
>>> +         * limit to 10% of the global limit equally split among buckets. If
>>> +         * the bucket is busier than the others, we limit to 10% of its
>>> +         * current size. */
>>> +        min_exp = sweep_bucket(ct, ctb, now,
>>> +                MAX(prev_count/10, n_conn_limit/(CONNTRACK_BUCKETS*10)));
>>> +        clean_count += prev_count - hmap_count(&ctb->connections);
>>> +
>>> +        if (min_exp > now) {
>>> +            /* We call hmap_shrink() only if sweep_bucket() managed to delete
>>> +             * every expired connection. */
>>> +            hmap_shrink(&ctb->connections);
>>> +        }
>>> +
>>> +        ct_lock_unlock(&ctb->lock);
>>> +
>>> +        ctb->next_cleanup = MIN(min_exp, now + CT_TM_MIN);
>>> +
>>> +next_bucket:
>>> +        next_wakeup = MIN(next_wakeup, ctb->next_cleanup);
>>> +        ovs_mutex_unlock(&ctb->cleanup_mutex);
>>> +    }
>>> +
>>> +    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec",
>>> +             clean_count, time_msec() - now);
>>> +
>>> +    return next_wakeup;
>>> +}
>>> +
>>> +/* Cleanup:
>>> + *
>>> + *
>>> + * We must call conntrack_clean() periodically.  conntrack_clean()'s return
>>> + * value gives a hint on when the next cleanup must be done (either because
>>> + * there is an actual connection that expires, or because a new connection
>>> + * might be created with the minimum timeout).
>>> + *
>>> + * The logic below has two goals:
>>> + *
>>> + * - Avoid calling conntrack_clean() too often.  If we call conntrack_clean()
>>> + *   each time a connection expires, the thread will consume 100% CPU, so we
>>> + *   try to call the function _at most_ once every CT_CLEAN_INTERVAL, to batch
>>> + *   removal.
>>> + *
>>> + * - On the other hand, it's not a good idea to keep the buckets locked for
>>> + *   too long, as we might prevent traffic from flowing.  If conntrack_clean()
>>> + *   returns a value which is in the past, it means that the internal limit
>>> + *   has been reached and more cleanup is required.  In this case, just wait
>>> + *   CT_CLEAN_MIN_INTERVAL before the next call.
>>> + */
>>> +#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */
>>> +#define CT_CLEAN_MIN_INTERVAL 200  /* 0.2 seconds */
>>> +
>>> +static void *
>>> +clean_thread_main(void *f_)
>>> +{
>>> +    struct conntrack *ct = f_;
>>> +
>>> +    while (!latch_is_set(&ct->clean_thread_exit)) {
>>> +        long long next_wake;
>>> +        long long now = time_msec();
>>> +
>>> +        next_wake = conntrack_clean(ct, now);
>>> +
>>> +        if (next_wake < now) {
>>> +            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL);
>>> +        } else {
>>> +            poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL));
>>> +        }
>>> +        latch_wait(&ct->clean_thread_exit);
>>> +        poll_block();
>>> +    }
>>
>>Are the logs going to constantly complain that this thread sleeps for
>>more than a second?
>
> poll_timer_wait_until() arranges for the next poll_block to return after the
> interval.  The thread will be sleeping inside the poll_block, there should
> be no warning (unless conntrack_clean takes more than 1s, but in that case
> a warning is appropriate).

Ah, I was mistaken about when the "last_wakeup" was taken. OK.

>>>      struct hmap connections OVS_GUARDED;
>>> +    /* For each possible timeout we have a list of connections. When the
>>> +     * timeout of a connection is updated, we move it to the back of the list.
>>> +     * Since the connections in a list have the same relative timeout, the list
>>> +     * will be ordered, with the oldest connections to the front. */
>>> +    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
>>> +
>>> +    /* Protects 'next_cleanup'. Used to make sure that there's only one thread
>>> +     * performing the cleanup. */
>>> +    struct ovs_mutex cleanup_mutex;
>>
>>I guess cleanup_mutex is mostly to keep Clang threadsafety analysis
>>happy, because the main thread may destroy conns during
>>conntrack_destroy()?
>
> Yes, that mutex is acquired by the cleanup thread only (except for the main thread
> during init and destroy).  It's pretty much useless, except that I find it better
> to have a mutex, rather than documenting that "'next_cleanup' should only be accessed
> by the cleanup thread".
>
> I can remove it if you prefer

No, that's fine as-is. Sometimes I put a comment above things like
that to say "/* Appease clang threadsafety analyser */".



More information about the dev mailing list