[ovs-dev] [PATCH v4 05/17] conntrack: Periodically delete expired connections.
Joe Stringer
joe at ovn.org
Sat Jun 25 00:18:04 UTC 2016
On 24 June 2016 at 16:42, Daniele Di Proietto <diproiettod at vmware.com> wrote:
> Thanks for your comments Joe, replies inline
>
>
>
> On 24/06/2016 15:29, "Joe Stringer" <joe at ovn.org> wrote:
>
>>On 10 June 2016 at 15:47, Daniele Di Proietto <diproiettod at vmware.com> wrote:
>>> This commit adds a thread that periodically removes expired connections.
>>>
>>> The expiration time of a connection can be expressed by:
>>>
>>> expiration = now + timeout
>>>
>>> For each possible 'timeout' value (there aren't many) we keep a list.
>>> When the expiration is updated, we move the connection to the back of the
>>> corresponding 'timeout' list. This way, the list is always ordered by
>>> 'expiration'.
>>>
>>> When the cleanup thread iterates through the lists for expired
>>> connections, it can stop at the first non expired connection.
>>>
>>> Suggested-by: Joe Stringer <joe at ovn.org>
>>> Signed-off-by: Daniele Di Proietto <diproiettod at vmware.com>
>>> ---
>>> lib/conntrack-other.c | 11 +--
>>> lib/conntrack-private.h | 21 ++++--
>>> lib/conntrack-tcp.c | 20 +++---
>>> lib/conntrack.c | 184 ++++++++++++++++++++++++++++++++++++++++++++----
>>> lib/conntrack.h | 32 ++++++++-
>>> 5 files changed, 237 insertions(+), 31 deletions(-)
>>>
>>> diff --git a/lib/conntrack-other.c b/lib/conntrack-other.c
>>> index 295cb2c..2920889 100644
>>> --- a/lib/conntrack-other.c
>>> +++ b/lib/conntrack-other.c
>>> @@ -43,8 +43,8 @@ conn_other_cast(const struct conn *conn)
>>> }
>>>
>>> static enum ct_update_res
>>> -other_conn_update(struct conn *conn_, struct dp_packet *pkt OVS_UNUSED,
>>> - bool reply, long long now)
>>> +other_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
>>> + struct dp_packet *pkt OVS_UNUSED, bool reply, long long now)
>>> {
>>> struct conn_other *conn = conn_other_cast(conn_);
>>>
>>> @@ -54,7 +54,7 @@ other_conn_update(struct conn *conn_, struct dp_packet *pkt OVS_UNUSED,
>>> conn->state = OTHERS_MULTIPLE;
>>> }
>>>
>>> - update_expiration(conn_, other_timeouts[conn->state], now);
>>> + conn_update_expiration(ctb, &conn->up, other_timeouts[conn->state], now);
>>>
>>> return CT_UPDATE_VALID;
>>> }
>>> @@ -66,14 +66,15 @@ other_valid_new(struct dp_packet *pkt OVS_UNUSED)
>>> }
>>>
>>> static struct conn *
>>> -other_new_conn(struct dp_packet *pkt OVS_UNUSED, long long now)
>>> +other_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt OVS_UNUSED,
>>> + long long now)
>>> {
>>> struct conn_other *conn;
>>>
>>> conn = xzalloc(sizeof *conn);
>>> conn->state = OTHERS_FIRST;
>>>
>>> - update_expiration(&conn->up, other_timeouts[conn->state], now);
>>> + conn_init_expiration(ctb, &conn->up, other_timeouts[conn->state], now);
>>>
>>> return &conn->up;
>>> }
>>> diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
>>> index d3e0099..4743dc6 100644
>>> --- a/lib/conntrack-private.h
>>> +++ b/lib/conntrack-private.h
>>> @@ -68,10 +68,13 @@ enum ct_update_res {
>>> };
>>>
>>> struct ct_l4_proto {
>>> - struct conn *(*new_conn)(struct dp_packet *pkt, long long now);
>>> + struct conn *(*new_conn)(struct conntrack_bucket *, struct dp_packet *pkt,
>>> + long long now);
>>> bool (*valid_new)(struct dp_packet *pkt);
>>> - enum ct_update_res (*conn_update)(struct conn *conn, struct dp_packet *pkt,
>>> - bool reply, long long now);
>>> + enum ct_update_res (*conn_update)(struct conn *conn,
>>> + struct conntrack_bucket *,
>>> + struct dp_packet *pkt, bool reply,
>>> + long long now);
>>> };
>>>
>>> extern struct ct_l4_proto ct_proto_tcp;
>>> @@ -80,9 +83,19 @@ extern struct ct_l4_proto ct_proto_other;
>>> extern long long ct_timeout_val[];
>>>
>>> static inline void
>>> -update_expiration(struct conn *conn, enum ct_timeout tm, long long now)
>>> +conn_init_expiration(struct conntrack_bucket *ctb, struct conn *conn,
>>> + enum ct_timeout tm, long long now)
>>> {
>>> conn->expiration = now + ct_timeout_val[tm];
>>> + ovs_list_push_back(&ctb->exp_lists[tm], &conn->exp_node);
>>> +}
>>> +
>>> +static inline void
>>> +conn_update_expiration(struct conntrack_bucket *ctb, struct conn *conn,
>>> + enum ct_timeout tm, long long now)
>>> +{
>>> + ovs_list_remove(&conn->exp_node);
>>> + conn_init_expiration(ctb, conn, tm, now);
>>> }
>>>
>>> #endif /* conntrack-private.h */
>>> diff --git a/lib/conntrack-tcp.c b/lib/conntrack-tcp.c
>>> index b574eeb..71eadc1 100644
>>> --- a/lib/conntrack-tcp.c
>>> +++ b/lib/conntrack-tcp.c
>>> @@ -152,8 +152,8 @@ tcp_payload_length(struct dp_packet *pkt)
>>> }
>>>
>>> static enum ct_update_res
>>> -tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply,
>>> - long long now)
>>> +tcp_conn_update(struct conn *conn_, struct conntrack_bucket *ctb,
>>> + struct dp_packet *pkt, bool reply, long long now)
>>> {
>>> struct conn_tcp *conn = conn_tcp_cast(conn_);
>>> struct tcp_header *tcp = dp_packet_l4(pkt);
>>> @@ -319,18 +319,18 @@ tcp_conn_update(struct conn* conn_, struct dp_packet *pkt, bool reply,
>>>
>>> if (src->state >= CT_DPIF_TCPS_FIN_WAIT_2
>>> && dst->state >= CT_DPIF_TCPS_FIN_WAIT_2) {
>>> - update_expiration(conn_, CT_TM_TCP_CLOSED, now);
>>> + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSED, now);
>>> } else if (src->state >= CT_DPIF_TCPS_CLOSING
>>> && dst->state >= CT_DPIF_TCPS_CLOSING) {
>>> - update_expiration(conn_, CT_TM_TCP_FIN_WAIT, now);
>>> + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_FIN_WAIT, now);
>>> } else if (src->state < CT_DPIF_TCPS_ESTABLISHED
>>> || dst->state < CT_DPIF_TCPS_ESTABLISHED) {
>>> - update_expiration(conn_, now, CT_TM_TCP_OPENING);
>>> + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_OPENING, now);
>>> } else if (src->state >= CT_DPIF_TCPS_CLOSING
>>> || dst->state >= CT_DPIF_TCPS_CLOSING) {
>>> - update_expiration(conn_, now, CT_TM_TCP_CLOSING);
>>> + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_CLOSING, now);
>>> } else {
>>> - update_expiration(conn_, now, CT_TM_TCP_ESTABLISHED);
>>> + conn_update_expiration(ctb, &conn->up, CT_TM_TCP_ESTABLISHED, now);
>>> }
>>> } else if ((dst->state < CT_DPIF_TCPS_SYN_SENT
>>> || dst->state >= CT_DPIF_TCPS_FIN_WAIT_2
>>> @@ -414,7 +414,8 @@ tcp_valid_new(struct dp_packet *pkt)
>>> }
>>>
>>> static struct conn *
>>> -tcp_new_conn(struct dp_packet *pkt, long long now)
>>> +tcp_new_conn(struct conntrack_bucket *ctb, struct dp_packet *pkt,
>>> + long long now)
>>> {
>>> struct conn_tcp* newconn = NULL;
>>> struct tcp_header *tcp = dp_packet_l4(pkt);
>>> @@ -450,7 +451,8 @@ tcp_new_conn(struct dp_packet *pkt, long long now)
>>> src->state = CT_DPIF_TCPS_SYN_SENT;
>>> dst->state = CT_DPIF_TCPS_CLOSED;
>>>
>>> - update_expiration(&newconn->up, now, CT_TM_TCP_FIRST_PACKET);
>>> + conn_init_expiration(ctb, &newconn->up, CT_TM_TCP_FIRST_PACKET,
>>> + now);
>>>
>>> return &newconn->up;
>>> }
>>> diff --git a/lib/conntrack.c b/lib/conntrack.c
>>> index 96935bc..5376550 100644
>>> --- a/lib/conntrack.c
>>> +++ b/lib/conntrack.c
>>> @@ -33,6 +33,8 @@
>>> #include "odp-netlink.h"
>>> #include "openvswitch/vlog.h"
>>> #include "ovs-rcu.h"
>>> +#include "ovs-thread.h"
>>> +#include "poll-loop.h"
>>> #include "random.h"
>>> #include "timeval.h"
>>>
>>> @@ -56,17 +58,20 @@ static void conn_key_lookup(struct conntrack_bucket *ctb,
>>> struct conn_lookup_ctx *ctx,
>>> long long now);
>>> static bool valid_new(struct dp_packet *pkt, struct conn_key *);
>>> -static struct conn *new_conn(struct dp_packet *pkt, struct conn_key *,
>>> - long long now);
>>> +static struct conn *new_conn(struct conntrack_bucket *, struct dp_packet *pkt,
>>> + struct conn_key *, long long now);
>>> static void delete_conn(struct conn *);
>>> -static enum ct_update_res conn_update(struct conn *, struct dp_packet*,
>>> - bool reply, long long now);
>>> +static enum ct_update_res conn_update(struct conn *,
>>> + struct conntrack_bucket *ctb,
>>> + struct dp_packet *, bool reply,
>>> + long long now);
>>> static bool conn_expired(struct conn *, long long now);
>>> static void set_mark(struct dp_packet *, struct conn *,
>>> uint32_t val, uint32_t mask);
>>> static void set_label(struct dp_packet *, struct conn *,
>>> const struct ovs_key_ct_labels *val,
>>> const struct ovs_key_ct_labels *mask);
>>> +static void *clean_thread_main(void *f_);
>>>
>>> static struct ct_l4_proto *l4_protos[] = {
>>> [IPPROTO_TCP] = &ct_proto_tcp,
>>> @@ -90,7 +95,8 @@ long long ct_timeout_val[] = {
>>> void
>>> conntrack_init(struct conntrack *ct)
>>> {
>>> - unsigned i;
>>> + unsigned i, j;
>>> + long long now = time_msec();
>>>
>>> for (i = 0; i < CONNTRACK_BUCKETS; i++) {
>>> struct conntrack_bucket *ctb = &ct->buckets[i];
>>> @@ -98,11 +104,20 @@ conntrack_init(struct conntrack *ct)
>>> ct_lock_init(&ctb->lock);
>>> ct_lock_lock(&ctb->lock);
>>> hmap_init(&ctb->connections);
>>> + for (j = 0; j < ARRAY_SIZE(ctb->exp_lists); j++) {
>>> + ovs_list_init(&ctb->exp_lists[j]);
>>> + }
>>> ct_lock_unlock(&ctb->lock);
>>> + ovs_mutex_init(&ctb->cleanup_mutex);
>>> + ovs_mutex_lock(&ctb->cleanup_mutex);
>>> + ctb->next_cleanup = now + CT_TM_MIN;
>>> + ovs_mutex_unlock(&ctb->cleanup_mutex);
>>> }
>>> ct->hash_basis = random_uint32();
>>> atomic_count_init(&ct->n_conn, 0);
>>> atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
>>> + latch_init(&ct->clean_thread_exit);
>>> + ct->clean_thread = ovs_thread_create("ct_clean", clean_thread_main, ct);
>>> }
>>>
>>> /* Destroys the connection tracker 'ct' and frees all the allocated memory. */
>>> @@ -111,10 +126,14 @@ conntrack_destroy(struct conntrack *ct)
>>> {
>>> unsigned i;
>>>
>>> + latch_set(&ct->clean_thread_exit);
>>> + pthread_join(ct->clean_thread, NULL);
>>> + latch_destroy(&ct->clean_thread_exit);
>>> for (i = 0; i < CONNTRACK_BUCKETS; i++) {
>>> struct conntrack_bucket *ctb = &ct->buckets[i];
>>> struct conn *conn;
>>>
>>> + ovs_mutex_destroy(&ctb->cleanup_mutex);
>>> ct_lock_lock(&ctb->lock);
>>
>>I'm confused, is the order to grab cleanup_mutex then ctb->lock, or
>>the other way around? (See also conntrack_clean() if this is the wrong
>>order)
>
> Well, this is not grabbing the mutex, just destroying it. Does it need to
> be in a particular order? I chose this order because it's the reverse of
> what conntrack_init() does.
>
> There's only one place where 'lock' and 'cleanup_mutex' nest, and this is
> conntrack_clean().
>
> I've actually tried to document the acquisition order in conntrack.h, but
> it's inverted. Sorry about this
Ah, okay. That's the source of my confusion.
> I'll fix the acquisition order in conntrack.h, agreed?
That sounds fine.
>>
>>> HMAP_FOR_EACH_POP(conn, node, &ctb->connections) {
>>> atomic_count_dec(&ct->n_conn);
>>> @@ -170,7 +189,7 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>>> return nc;
>>> }
>>>
>>> - nc = new_conn(pkt, &ctx->key, now);
>>> + nc = new_conn(&ct->buckets[bucket], pkt, &ctx->key, now);
>>>
>>> memcpy(&nc->rev_key, &ctx->key, sizeof nc->rev_key);
>>>
>>> @@ -200,7 +219,8 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>>> } else {
>>> enum ct_update_res res;
>>>
>>> - res = conn_update(conn, pkt, ctx->reply, now);
>>> + res = conn_update(conn, &ct->buckets[bucket], pkt,
>>> + ctx->reply, now);
>>>
>>> switch (res) {
>>> case CT_UPDATE_VALID:
>>> @@ -213,6 +233,7 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>>> state |= CS_INVALID;
>>> break;
>>> case CT_UPDATE_NEW:
>>> + ovs_list_remove(&conn->exp_node);
>>> hmap_remove(&ct->buckets[bucket].connections, &conn->node);
>>> atomic_count_dec(&ct->n_conn);
>>> delete_conn(conn);
>>> @@ -345,6 +366,143 @@ set_label(struct dp_packet *pkt, struct conn *conn,
>>> conn->label = pkt->md.ct_label;
>>> }
>>>
>>> +/* Delete the expired connections from 'ctb', up to 'limit'. Returns the
>>> + * earliest expiration time among the remaining connections in 'ctb'. Returns
>>> + * LLONG_MAX if 'ctb' is empty. The return value might be smaller than 'now',
>>> + * if 'limit' is reached */
>>> +static long long
>>> +sweep_bucket(struct conntrack *ct, struct conntrack_bucket *ctb, long long now,
>>> + size_t limit)
>>> + OVS_REQUIRES(ctb->lock)
>>> +{
>>> + struct conn *conn, *next;
>>> + long long min_expiration = LLONG_MAX;
>>> + unsigned i;
>>> + size_t count = 0;
>>> +
>>> + for (i = 0; i < N_CT_TM; i++) {
>>> + LIST_FOR_EACH_SAFE (conn, next, exp_node, &ctb->exp_lists[i]) {
>>> + if (!conn_expired(conn, now) || count >= limit) {
>>> + min_expiration = MIN(min_expiration, conn->expiration);
>>> + if (count >= limit) {
>>> + /* Do not check other lists. */
>>> + return min_expiration;
>>
>>Do you think we should have a coverage counter for this?
>
> Good idea, I'll add that, thanks.
>
>>
>>> + }
>>> + break;
>>> + }
>>> + ovs_list_remove(&conn->exp_node);
>>> + hmap_remove(&ctb->connections, &conn->node);
>>> + atomic_count_dec(&ct->n_conn);
>>> + delete_conn(conn);
>>> + count++;
>>> + }
>>> + }
>>> +
>>> + return min_expiration;
>>> +}
>>> +
>>> +/* Cleans up old connection entries from 'ct'. Returns the time when the
>>> + * next expiration might happen. The return value might be smaller than
>>> + * 'now', meaning that an internal limit has been reached, and some expired
>>> + * connections have not been deleted. */
>>> +static long long
>>> +conntrack_clean(struct conntrack *ct, long long now)
>>> +{
>>> + long long next_wakeup = now + CT_TM_MIN;
>>> + unsigned int n_conn_limit;
>>> + size_t clean_count = 0;
>>> + unsigned i;
>>> +
>>> + atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
>>> +
>>> + for (i = 0; i < CONNTRACK_BUCKETS; i++) {
>>> + struct conntrack_bucket *ctb = &ct->buckets[i];
>>> + size_t prev_count;
>>> + long long min_exp;
>>> +
>>> + ovs_mutex_lock(&ctb->cleanup_mutex);
>>>
>>> + if (ctb->next_cleanup > now) {
>>> + goto next_bucket;
>>> + }
>>> +
>>> + ct_lock_lock(&ctb->lock);
>>> + prev_count = hmap_count(&ctb->connections);
>>> + /* If the connections are well distributed among buckets, we want to
>>> + * limit to 10% of the global limit equally split among buckets. If
>>> + * the bucket is busier than the others, we limit to 10% of its
>>> + * current size. */
>>> + min_exp = sweep_bucket(ct, ctb, now,
>>> + MAX(prev_count/10, n_conn_limit/(CONNTRACK_BUCKETS*10)));
>>> + clean_count += prev_count - hmap_count(&ctb->connections);
>>> +
>>> + if (min_exp > now) {
>>> + /* We call hmap_shrink() only if sweep_bucket() managed to delete
>>> + * every expired connection. */
>>> + hmap_shrink(&ctb->connections);
>>> + }
>>> +
>>> + ct_lock_unlock(&ctb->lock);
>>> +
>>> + ctb->next_cleanup = MIN(min_exp, now + CT_TM_MIN);
>>> +
>>> +next_bucket:
>>> + next_wakeup = MIN(next_wakeup, ctb->next_cleanup);
>>> + ovs_mutex_unlock(&ctb->cleanup_mutex);
>>> + }
>>> +
>>> + VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec",
>>> + clean_count, time_msec() - now);
>>> +
>>> + return next_wakeup;
>>> +}
>>> +
>>> +/* Cleanup:
>>> + *
>>> + *
>>> + * We must call conntrack_clean() periodically. conntrack_clean() return
>>> + * value gives a hint on when the next cleanup must be done (either because
>>> + * there is an actual connection that expires, or because a new connection
>>> + * might be created with the minimum timeout).
>>> + *
>>> + * The logic below has two goals:
>>> + *
>>> + * - Avoid calling conntrack_clean() too often. If we call conntrack_clean()
>>> + * each time a connection expires, the thread will consume 100% CPU, so we
>>> + * try to call the function _at most_ once every CT_CLEAN_INTERVAL, to batch
>>> + * removal.
>>> + *
>>> + * - On the other hand, it's not a good idea to keep the buckets locked for
>>> + * too long, as we might prevent traffic from flowing. If conntrack_clean()
>>> + * returns a value which is in the past, it means that the internal limit
>>> + * has been reached and more cleanup is required. In this case, just wait
>>> + * CT_CLEAN_MIN_INTERVAL before the next call.
>>> + */
>>> +#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */
>>> +#define CT_CLEAN_MIN_INTERVAL 200 /* 0.2 seconds */
>>> +
>>> +static void *
>>> +clean_thread_main(void *f_)
>>> +{
>>> + struct conntrack *ct = f_;
>>> +
>>> + while (!latch_is_set(&ct->clean_thread_exit)) {
>>> + long long next_wake;
>>> + long long now = time_msec();
>>> +
>>> + next_wake = conntrack_clean(ct, now);
>>> +
>>> + if (next_wake < now) {
>>> + poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL);
>>> + } else {
>>> + poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL));
>>> + }
>>> + latch_wait(&ct->clean_thread_exit);
>>> + poll_block();
>>> + }
>>
>>Are the logs going to constantly complain that this thread sleeps for
>>more than a second?
>
> poll_timer_wait_until() arranges for the next poll_block to return after the
> interval. The thread will be sleeping inside the poll_block, there should
> be no warning (unless conntrack_clean takes more than 1s, but in that case
> a warning is appropriate).
Ah, I was mistaken about when the "last_wakeup" was taken. OK.
>>> struct hmap connections OVS_GUARDED;
>>> + /* For each possible timeout we have a list of connections. When the
>>> + * timeout of a connection is updated, we move it to the back of the list.
>>> + * Since the connections in a list have the same relative timeout, the list
>>> + * will be ordered, with the oldest connections to the front. */
>>> + struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
>>> +
>>> + /* Protects 'next_cleanup'. Used to make sure that there's only one thread
>>> + * performing the cleanup. */
>>> + struct ovs_mutex cleanup_mutex;
>>
>>I guess cleanup_mutex is mostly to keep Clang threadsafety analysis
>>happy, because the main thread may destroy conns during
>>conntrack_destroy()?
>
> Yes, that mutex is acquired by the cleanup thread only (except for the main thread
> during init and destroy). It's pretty much useless, except that I find it better
> to have a mutex, rather than documenting that "'next_cleanup' should only be accessed
> by the cleanup thread".
>
> I can remove it if you prefer
No, that's fine as-is. Sometimes I put a comment above things like
that to say "/* Appease clang threadsafety analyser */".
More information about the dev
mailing list