[ovs-dev] [PATCH v3] ovsdb: provide raft and command interfaces with priority

Anton Ivanov anton.ivanov at cambridgegreys.com
Fri Jul 9 12:42:17 UTC 2021


On 09/07/2021 13:14, Dumitru Ceara wrote:
> On 6/23/21 12:28 PM, anton.ivanov at cambridgegreys.com wrote:
>> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>>
>> Set a soft time limit of "raft election timer"/2 on ovsdb
>> processing.
>>
>> This improves behaviour in large, heavily loaded clusters.
>> While it cannot fully eliminate spurious raft elections
>> under heavy load, it significantly decreases their number.
>>
>> Processing is (to the extent possible) restarted where it
>> stopped on the previous iteration to ensure that sessions
>> towards the tail of the session list are not starved.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>> ---
> Hi Anton,
>
> Thanks for the patch, I left some comments below.
>
>>   ovsdb/jsonrpc-server.c | 86 ++++++++++++++++++++++++++++++++++++++++--
>>   ovsdb/jsonrpc-server.h |  2 +-
>>   ovsdb/ovsdb-server.c   | 24 +++++++++++-
>>   ovsdb/raft.c           |  5 +++
>>   ovsdb/raft.h           |  3 ++
>>   ovsdb/storage.c        | 11 ++++++
>>   ovsdb/storage.h        |  2 +
>>   7 files changed, 127 insertions(+), 6 deletions(-)
>>
>> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
>> index 4e2dfc3d7..a2f47aae7 100644
>> --- a/ovsdb/jsonrpc-server.c
>> +++ b/ovsdb/jsonrpc-server.c
>> @@ -60,7 +60,8 @@ static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
>>       struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool);
>>   static void ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote *,
>>                                                  struct ovsdb *);
>> -static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
>> +static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *,
>> +                                          uint64_t limit);
>>   static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
>>   static void ovsdb_jsonrpc_session_get_memory_usage_all(
>>       const struct ovsdb_jsonrpc_remote *, struct simap *usage);
>> @@ -128,6 +129,8 @@ struct ovsdb_jsonrpc_server {
>>       bool read_only;            /* This server is does not accept any
>>                                     transactions that can modify the database. */
>>       struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
>> +    struct ovsdb_jsonrpc_remote *skip_to;
>> +    bool must_wake_up;
> A comment for each of these would be nice to have in my opinion.
>
> Regarding 'must_wake_up', Ilya had a comment on v2, and I think he's
> right, shouldn't ovsdb_jsonrpc_server_wait(), which calls
> ovsdb_jsonrpc_session_wait_all(remote), wake us up if we skipped a
> session?

That only wakes us up if there is outstanding I/O.

Imagine the following situation:

1. There are N sessions. The loop runs, and a transaction is performed on session 1.

2. The result of the transaction is a change which would trigger one or more monitors on the remaining sessions 2 to N.

3. The loop is exited early after session 1 to service raft.

4. wait_all walks all sessions. At that point there is no outstanding incoming I/O: the clients are all waiting.

5. poll_wait() goes to sleep.

But we have not serviced the monitors for sessions 2 to N.

So we need to call poll_immediate_wake(), and that is what happens when must_wake_up is true.

We can actually skip wait_all after that. After an interruption it is wait_all that is redundant, not must_wake_up.

> In that case we don't need 'must_wake_up' at all.
>
>>   };
>>   
>>   /* A configured remote.  This is either a passive stream listener plus a list
>> @@ -137,6 +140,7 @@ struct ovsdb_jsonrpc_remote {
>>       struct ovsdb_jsonrpc_server *server;
>>       struct pstream *listener;   /* Listener, if passive. */
>>       struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
>> +    struct ovsdb_jsonrpc_session *skip_to;
>>       uint8_t dscp;
>>       bool read_only;
>>       char *role;
>> @@ -159,6 +163,8 @@ ovsdb_jsonrpc_server_create(bool read_only)
>>       ovsdb_server_init(&server->up);
>>       shash_init(&server->remotes);
>>       server->read_only = read_only;
>> +    server->must_wake_up = false;
>> +    server->skip_to = NULL;
> We should either initialize all fields or none.  'server' is zalloc-ed
> just above so all fields are already 0.  For that reason the code wasn't
> explicitly setting 'n_sessions' to 0.  I think it's fine to skip setting
> 'must_wake_up' and 'skip_to' as well.

Ack.

>
>>       return server;
>>   }
>>   
>> @@ -279,6 +285,7 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
>>       remote->dscp = options->dscp;
>>       remote->read_only = options->read_only;
>>       remote->role = nullable_xstrdup(options->role);
>> +    remote->skip_to = NULL;
>>       shash_add(&svr->remotes, name, remote);
>>   
>>       if (!listener) {
>> @@ -293,6 +300,13 @@ ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
>>   {
>>       struct ovsdb_jsonrpc_remote *remote = node->data;
>>   
>> +    /* safest option - rerun all remotes */
> Nit: "Safest option - rerun all remotes."
>
>> +    if (remote->server->skip_to) {
>> +        remote->server->skip_to = NULL;
>> +    }
>> +
>> +    remote->skip_to = NULL;
> We free 'remote' just below and we don't use remote->skip_to anywhere
> until then.  It's probably fine to remove this line.
>
>> +
>>       ovsdb_jsonrpc_session_close_all(remote);
>>       pstream_close(remote->listener);
>>       shash_delete(&remote->server->remotes, node);
>> @@ -378,12 +392,25 @@ ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *svr,
>>   }
>>   
>>   void
>> -ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
>> +ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr, uint64_t limit)
>>   {
>>       struct shash_node *node;
>> +    uint64_t elapsed = 0, start_time = 0;
>> +
>> +    start_time = time_msec();
> We just set 'start_time' to 0 above.  I'd rewrite this to:
>
> uint64_t start_time = time_msec();
> uint64_t elapsed = 0;

Ack

>
>> +
>> +    svr->must_wake_up = false;
>>   
>>       SHASH_FOR_EACH (node, &svr->remotes) {
>>           struct ovsdb_jsonrpc_remote *remote = node->data;
>> +        if (svr->skip_to) {
>> +            if (remote != svr->skip_to) {
>> +                continue;
>> +            } else {
>> +                svr->skip_to = NULL;
>> +                svr->must_wake_up = true;
>> +            }
>> +        }
>>   
>>           if (remote->listener) {
>>               struct stream *stream;
>> @@ -403,7 +430,14 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
>>               }
>>           }
>>   
>> -        ovsdb_jsonrpc_session_run_all(remote);
>> +        ovsdb_jsonrpc_session_run_all(remote, limit - elapsed);
> Kind of a nit: "elapsed" is not 100% accurate here but I guess that's
> acceptable, right?  Should we add a comment?
>
>> +
>> +        elapsed = time_msec() - start_time;
>> +        if (elapsed > limit) {
>> +            svr->must_wake_up = true;
>> +            svr->skip_to = remote;
>> +            break;
>> +        }
>>       }
>>   }
>>   
>> @@ -412,6 +446,11 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
>>   {
>>       struct shash_node *node;
>>   
>> +    if (svr->must_wake_up) {
>> +        poll_immediate_wake();
>> +        svr->must_wake_up = false;
>> +    }
>> +
>>       SHASH_FOR_EACH (node, &svr->remotes) {
>>           struct ovsdb_jsonrpc_remote *remote = node->data;
>>   
>> @@ -583,15 +622,54 @@ ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
>>   }
>>   
>>   static void
>> -ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
>> +ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote,
>> +                              uint64_t limit)
>>   {
>>       struct ovsdb_jsonrpc_session *s, *next;
>> +    uint64_t start_time;
>> +
>> +    start_time = time_msec();
> This can be initialized at declaration time, above.
>
>> +
> This empty line can be removed.
>
>>   
>>       LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
>> +        if ((remote->skip_to) && (s != remote->skip_to)) {
> No need for the additional (), this can just be:
>
> if (remote->skip_to && s != remote->skip_to) {
>
>> +            /* processing was interrupted, we skip to the point
>> +             * where we had to interrupt it
>> +             * we cannot use the _CONTINUE macro as it is not safe
>> +             * if the list has been changed in the meantime.
>> +             */
> Nit: s/processing/Processing.
>
> And the comment needs some text wrapping.
>
>> +            continue;
>> +        }
>> +
>> +        /* set NEXT as skip point if we need to restart processing
> Nit: s/set NEXT/Set 'skip_to'/
>
>> +         * This way, even if current is removed we always have a
>> +         * valid pointer.
>> +         */
>> +        remote->skip_to =
>> +            CONTAINER_OF(s->node.next, struct ovsdb_jsonrpc_session, node);
>> +        if (remote->skip_to == s) {
>> +            remote->skip_to = NULL;
>> +        }
>> +
>>           int error = ovsdb_jsonrpc_session_run(s);
>> +
>>           if (error) {
>>               ovsdb_jsonrpc_session_close(s);
> [0] If 's' is the only entry in 'remote->sessions' then, at this point,
> remote->skip_to == s, and ovsdb_jsonrpc_session_close(s) removes 's'
> from remote->sessions.
>
>>           }
>> +
>> +        if (time_msec() - start_time > limit) {
>> +            /* we bail leaving skip_to set. Next processing iteration
> Nit: s/we/We
>
>> +             * will skip everything up to skip_to which was set to
>> +             * next a few lines above from here.
>> +             */
>> +            break;
> If we hit [0] and we happen to break here then remote->skip_to points
> to a freed session and will likely cause the if in the beginning of
> this loop to skip all sessions next times ovsdb_jsonrpc_session_run_all()
> is called.  Or am I missing something?

It should never point to a freed session. It is either NULL, which results in running all sessions for this remote, or the session following the one being worked on. The latter cannot have been freed, because we have not reached it yet.

A corner case is bailing out on the time limit right after running the last session. Propagating that up so that the code skips to the next remote is too difficult. In this case we accept the cost and rerun all sessions for this remote.

As you noted above, an extra "only one session" check is needed.

>
>> +        } else {
>> +            /* We are within time constraints, zero
>> +             * the session we need to skip to in order to
>> +             * restart processing.
>> +             */
>> +            remote->skip_to = NULL;
>> +        }
>>       }
>>   }
>>   
>> diff --git a/ovsdb/jsonrpc-server.h b/ovsdb/jsonrpc-server.h
>> index e0653aa39..218152e9d 100644
>> --- a/ovsdb/jsonrpc-server.h
>> +++ b/ovsdb/jsonrpc-server.h
>> @@ -67,7 +67,7 @@ void ovsdb_jsonrpc_server_free_remote_status(
>>   void ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *, bool force,
>>                                       char *comment);
>>   
>> -void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *);
>> +void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *, uint64_t limit);
>>   void ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *);
>>   
>>   void ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *,
>> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
>> index b09232c65..3f0b825a3 100644
>> --- a/ovsdb/ovsdb-server.c
>> +++ b/ovsdb/ovsdb-server.c
>> @@ -184,6 +184,7 @@ main_loop(struct server_config *config,
>>       char *remotes_error, *ssl_error;
>>       struct shash_node *node;
>>       long long int status_timer = LLONG_MIN;
>> +    uint64_t limit = UINT64_MAX;
>>   
>>       *exiting = false;
>>       ssl_error = NULL;
>> @@ -215,7 +216,7 @@ main_loop(struct server_config *config,
>>               reconfigure_remotes(jsonrpc, all_dbs, remotes),
>>               &remotes_error);
>>           report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error);
>> -        ovsdb_jsonrpc_server_run(jsonrpc);
>> +        ovsdb_jsonrpc_server_run(jsonrpc, limit);
>>   
>>           if (*is_backup) {
>>               replication_run();
>> @@ -225,11 +226,32 @@ main_loop(struct server_config *config,
>>               }
>>           }
>>   
>> +        /* figure out current processing time limit */
> Nit: This should start with capital letters and end with '.'.
>
>> +
>> +        bool first_db = true;
>> +        SHASH_FOR_EACH (node, all_dbs) {
>> +            struct db *db = node->data;
>> +            uint64_t db_limit;
>> +
>> +            db_limit = max_processing_time(db->db->storage);
>> +            if (first_db) {
>> +                /* reset the limit */
>> +                limit = db_limit;
>> +                first_db = false;
>> +            }
>> +            limit = MIN(db_limit, limit);
>> +        }
> Unless I'm missing something this can be simplified to:
>
> limit = UINT64_MAX;
> SHASH_FOR_EACH (node, all_dbs) {
>      struct db *db = node->data;
>      limit = MIN(max_processing_time(db->db->storage), limit);
> }
>
>> +        if (ovs_replay_is_active()) {
>> +            limit = UINT64_MAX;
>> +        }
>> +
>>           struct shash_node *next;
>>           SHASH_FOR_EACH_SAFE (node, next, all_dbs) {
>>               struct db *db = node->data;
>> +
> This is unrelated.
>
>>               ovsdb_txn_history_run(db->db);
>>               ovsdb_storage_run(db->db->storage);
>> +
> Same here.
>
>>               read_db(config, db);
>>               /* Run triggers after storage_run and read_db to make sure new raft
>>                * updates are utilized in current iteration. */
>> diff --git a/ovsdb/raft.c b/ovsdb/raft.c
>> index 5bb901fd4..e33c09390 100644
>> --- a/ovsdb/raft.c
>> +++ b/ovsdb/raft.c
>> @@ -407,6 +407,11 @@ raft_make_address_passive(const char *address_)
>>       }
>>   }
>>   
>> +uint64_t raft_election_timer_value(const struct raft *raft)
>> +{
>> +    return raft->election_timer;
>> +}
>> +
>>   static struct raft *
>>   raft_alloc(void)
>>   {
>> diff --git a/ovsdb/raft.h b/ovsdb/raft.h
>> index 3545c41c2..1d270ec0c 100644
>> --- a/ovsdb/raft.h
>> +++ b/ovsdb/raft.h
>> @@ -188,4 +188,7 @@ void raft_take_leadership(struct raft *);
>>   void raft_transfer_leadership(struct raft *, const char *reason);
>>   
>>   const struct uuid *raft_current_eid(const struct raft *);
>> +
>> +uint64_t raft_election_timer_value(const struct raft *);
>> +
>>   #endif /* lib/raft.h */
>> diff --git a/ovsdb/storage.c b/ovsdb/storage.c
>> index 40415fcf6..9b4f4c7eb 100644
>> --- a/ovsdb/storage.c
>> +++ b/ovsdb/storage.c
>> @@ -640,3 +640,14 @@ ovsdb_storage_peek_last_eid(struct ovsdb_storage *storage)
>>       }
>>       return raft_current_eid(storage->raft);
>>   }
>> +
>> +uint64_t max_processing_time(struct ovsdb_storage *storage)
> Ilya already commented on this on v2:
>
> "Name of the function should start from a new line since it's not
> just a prototype."
>
>> +{
>> +    if (!storage->raft) {
>> +        return UINT64_MAX;
>> +    }
>> +    if (raft_election_timer_value(storage->raft) > 2) {
>> +        return raft_election_timer_value(storage->raft) / 2;
>> +    }
>> +    return 1;
>> +}
>> diff --git a/ovsdb/storage.h b/ovsdb/storage.h
>> index 02b6e7e6c..9e195bbe8 100644
>> --- a/ovsdb/storage.h
>> +++ b/ovsdb/storage.h
>> @@ -97,4 +97,6 @@ struct ovsdb_schema *ovsdb_storage_read_schema(struct ovsdb_storage *);
>>   
>>   const struct uuid *ovsdb_storage_peek_last_eid(struct ovsdb_storage *);
>>   
>> +uint64_t max_processing_time(struct ovsdb_storage *);
> To be aligned with the current style this should probably be:
>
> uint64_t ovsdb_storage_get_max_processing_time(struct ovsdb_storage *);
>
>> +
>>   #endif /* ovsdb/storage.h */
>>
> Thanks,
> Dumitru
>
>
-- 
Anton R. Ivanov
Cambridgegreys Limited. Registered in England. Company Number 10273661
https://www.cambridgegreys.com/



More information about the dev mailing list