[ovs-dev] [PATCH v7] ovsdb: provide raft and command interfaces with priority

Ilya Maximets i.maximets at ovn.org
Wed Aug 11 10:52:47 UTC 2021


On 8/11/21 12:49 PM, Ilya Maximets wrote:
> On 8/11/21 12:06 AM, Ilya Maximets wrote:
>> On 7/27/21 10:21 AM, anton.ivanov at cambridgegreys.com wrote:
>>> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>>>
>>> Set a soft time limit of "raft election timer"/2 on ovsdb
>>> processing.
>>>
>>> This improves behaviour in large heavily loaded clusters.
>>> While it cannot fully eliminate spurious raft elections
>>> under heavy load, it significantly decreases their number.
>>>
>>> Processing is (to the extent possible) restarted where it
>>> stopped on the previous iteration to ensure that sessions
>>> towards the tail of the session list are not starved.
>>>
>>> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>>> ---
>>>  ovsdb/jsonrpc-server.c | 90 ++++++++++++++++++++++++++++++++++++++++--
>>>  ovsdb/jsonrpc-server.h |  2 +-
>>>  ovsdb/ovsdb-server.c   | 16 +++++++-
>>>  ovsdb/raft.c           |  6 +++
>>>  ovsdb/raft.h           |  3 ++
>>>  ovsdb/storage.c        | 12 ++++++
>>>  ovsdb/storage.h        |  2 +
>>>  7 files changed, 125 insertions(+), 6 deletions(-)
>>>
>>> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
>>> index 351c39d8a..ea82d4161 100644
>>> --- a/ovsdb/jsonrpc-server.c
>>> +++ b/ovsdb/jsonrpc-server.c
>>> @@ -60,7 +60,8 @@ static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
>>>      struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool);
>>>  static void ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote *,
>>>                                                 struct ovsdb *);
>>> -static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
>>> +static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *,
>>> +                                          uint64_t limit);
>>>  static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
>>>  static void ovsdb_jsonrpc_session_get_memory_usage_all(
>>>      const struct ovsdb_jsonrpc_remote *, struct simap *usage);
>>> @@ -128,6 +129,11 @@ struct ovsdb_jsonrpc_server {
>>>      bool read_only;            /* This server is does not accept any
>>>                                    transactions that can modify the database. */
>>>      struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
>>> +    struct ovsdb_jsonrpc_remote *skip_to; /* Pointer to remote where processing
>>> +                                             should restart after a time
>>> +                                             constraint interruption. */
>>> +    bool must_wake_up; /* The processing loop must be re-run. It was
>>> +                          interrupted due to exceeding a time constraint. */
>>>  };
>>>  
>>>  /* A configured remote.  This is either a passive stream listener plus a list
>>> @@ -137,6 +143,9 @@ struct ovsdb_jsonrpc_remote {
>>>      struct ovsdb_jsonrpc_server *server;
>>>      struct pstream *listener;   /* Listener, if passive. */
>>>      struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
>>> +    struct ovsdb_jsonrpc_session *skip_to; /* Session at which processing
>>> +                                              should restart after an
>>> +                                              interruption. */
>>>      uint8_t dscp;
>>>      bool read_only;
>>>      char *role;
>>> @@ -279,6 +288,7 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
>>>      remote->dscp = options->dscp;
>>>      remote->read_only = options->read_only;
>>>      remote->role = nullable_xstrdup(options->role);
>>> +    remote->skip_to = NULL;
>>>      shash_add(&svr->remotes, name, remote);
>>>  
>>>      if (!listener) {
>>> @@ -293,6 +303,9 @@ ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
>>>  {
>>>      struct ovsdb_jsonrpc_remote *remote = node->data;
>>>  
>>> +    /* The safest option is to rerun all remotes. */
>>> +    remote->server->skip_to = NULL;
>>> +
>>>      ovsdb_jsonrpc_session_close_all(remote);
>>>      pstream_close(remote->listener);
>>>      shash_delete(&remote->server->remotes, node);
>>> @@ -378,12 +391,24 @@ ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *svr,
>>>  }
>>>  
>>>  void
>>> -ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
>>> +ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr, uint64_t limit)
>>>  {
>>>      struct shash_node *node;
>>> +    uint64_t elapsed = 0;
>>> +    uint64_t start_time = time_msec();
>>> +
>>> +    svr->must_wake_up = false;
>>>  
>>>      SHASH_FOR_EACH (node, &svr->remotes) {
>>>          struct ovsdb_jsonrpc_remote *remote = node->data;
>>> +        if (svr->skip_to) {
>>> +            if (remote != svr->skip_to) {
>>> +                continue;
>>> +            } else {
>>> +                svr->skip_to = NULL;
>>> +                svr->must_wake_up = true;
>>> +            }
>>> +        }
>>>  
>>>          if (remote->listener) {
>>>              struct stream *stream;
>>> @@ -403,7 +428,17 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
>>>              }
>>>          }
>>>  
>>> -        ovsdb_jsonrpc_session_run_all(remote);
>>> +        /* We assume accept and session creation time to be
>>> +         * negligible for the purposes of computing timeouts.
>>> +         */
>>> +        ovsdb_jsonrpc_session_run_all(remote, limit - elapsed);
>>> +
>>> +        elapsed = time_msec() - start_time;
>>> +        if (elapsed > limit) {
>>> +            svr->must_wake_up = true;
>>> +            svr->skip_to = remote;
>>> +            break;
>>> +        }
>>>      }
>>>  }
>>>  
>>> @@ -412,6 +447,16 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
>>>  {
>>>      struct shash_node *node;
>>>  
>>> +    if (svr->must_wake_up) {
>>> +        /* We have stopped processing due to a time constraint.
>>> +         * In this case there is no point to walk all sessions
>>> +         * and rebuild the poll structure for the poll loop.
>>
>> The last sentence seems redundant.  We need to wake up because there
>> is work that we didn't finish.  This module should not reference
>> internals of the poll_loop.
>>
>>> +         */
>>> +        poll_immediate_wake();
>>> +        svr->must_wake_up = false;
>>> +        return;
>>> +    }
>>> +
>>>      SHASH_FOR_EACH (node, &svr->remotes) {
>>>          struct ovsdb_jsonrpc_remote *remote = node->data;
>>>  
>>> @@ -583,15 +628,52 @@ ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
>>>  }
>>>  
>>>  static void
>>> -ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
>>> +ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote,
>>> +                              uint64_t limit)
>>>  {
>>>      struct ovsdb_jsonrpc_session *s, *next;
>>> +    uint64_t start_time = time_msec();
>>>  
>>>      LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
>>> +        if (remote->skip_to && s != remote->skip_to) {
>>> +            /* Processing was interrupted, we skip to the point
>>> +             * where we had to interrupt it.
>>> +             * We cannot use the _CONTINUE macro as it is not safe
>>> +             * if the list has been changed in the meantime.
>>> +             */
>>> +            continue;
>>> +        }
>>> +
>>> +        /* Set ->next as skip point if we need to restart processing.
>>> +         * This way, even if current is removed, we always have a
>>> +         * valid pointer to continue processing.
>>
>> Hmm, but what if 'next' gets removed?  In that case we will not
>> have a valid pointer.
> 
> Clarification:  I mean the case where it gets removed outside of this loop,
> e.g. by ovsdb_jsonrpc_session_reconnect_all().
> 
>>
>>> +         */
>>> +        remote->skip_to =
>>> +            CONTAINER_OF(ovs_list_front(&s->node),
>>> +                         struct ovsdb_jsonrpc_session, node);
>>> +
>>> +        if (remote->skip_to == s) {
>>> +            /* The list is a singleton. This is a special case, where
>>> +             * deleting the node will invalidate the pointer.
>>> +             */
>>> +            remote->skip_to = NULL;
>>
>> Assume we have several remotes, and one of them has only one session,
>> but this session is so heavy that its processing always takes longer
>> than the limit (e.g. a session that spams monitor requests, or one that
>> constantly changes conditions and thereby triggers a walk across the
>> database, or something else).  Then, at the exit of
>> ovsdb_jsonrpc_session_run_all(), remote->skip_to will be NULL, and
>> ovsdb_jsonrpc_server_run() will have svr->skip_to set to that same
>> remote.  This way ovsdb-server will serve only this one hungry session
>> and will never serve any other clients on other remotes.
> 
> This thing is tricky and the whole 'skip_to' concept seems to be
> too fragile.  But since we have a list here, how about re-ordering
> it instead of skipping elements?  Something like this (not tested):
> 
> @@ -583,14 +628,36 @@ ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
>  }
>  
>  static void
> -ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
> -{
> -    struct ovsdb_jsonrpc_session *s, *next;
> +ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote,
> +                              uint64_t limit)
> +{
> +    struct ovsdb_jsonrpc_session *s, *first = NULL;
> +    uint64_t start_time = time_msec();
> +
> +    while (!ovs_list_is_empty(&remote->sessions)) {
> +        s = CONTAINER_OF(ovs_list_front(&remote->sessions),
> +                         struct ovsdb_jsonrpc_session, node);
> +        if (s == first) {
> +            /* All sessions are processed. */
> +            break;
> +        }
>  
> -    LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
>          int error = ovsdb_jsonrpc_session_run(s);
>          if (error) {
>              ovsdb_jsonrpc_session_close(s);
> +        } else {
> +            /* This session was processed, moving it to the end of a list. */
> +            ovs_list_remove(&s->node);
> +            ovs_list_push_back(&remote->sessions, &s->node);
> +            if (!first) {
> +                first = s;
> +            }
> +        }
> +
> +        if (time_msec() - start_time > limit) {
> +            /* Time is up.  Since the list is re-ordered, next time the loop
> +             * will start from where it was left.  */
> +            break;
>          }
>      }
>  }
> ---
> 
> This change alone will not help, but we can do the same for the 'remotes' loop.
> 'remotes' is an shash, so we can change the order there, but we can add

* Correction: the sentence above should read "'remotes' is an shash, so we can't change the order there, but ..."

> a list that holds all the same elements and iterate over it, e.g.:
> 
> @@ -128,6 +129,10 @@ struct ovsdb_jsonrpc_server {
>      bool read_only;            /* This server is does not accept any
>                                    transactions that can modify the database. */
>      struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
> +    struct ovs_list remotes_list; /* Same as 'remotes', but list.  To keep
> +                                   * the iteration order. */
> +    bool must_wake_up; /* The processing loop must be re-run. It was
> +                          interrupted due to exceeding a time constraint. */
>  };
>  
>  /* A configured remote.  This is either a passive stream listener plus a list
> @@ -137,6 +142,7 @@ struct ovsdb_jsonrpc_remote {
>      struct ovsdb_jsonrpc_server *server;
>      struct pstream *listener;   /* Listener, if passive. */
>      struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
> +    struct ovs_list node;       /* Element in servers's "remotes_list". */
>      uint8_t dscp;
>      bool read_only;
>      char *role;
> --
> 
> When something is added to the shash, add it to the list too; same for
> removal.  Using this list, exactly the same iteration scheme can be
> implemented inside ovsdb_jsonrpc_server_run(), i.e. move each processed
> remote to the end of the list.  We will need to move a partially processed
> remote to the end of the list too, but this should not be too bad in terms
> of processing fairness.  More importantly, I think this scheme will not
> allow any cases where we can get stuck processing the same remote or
> session forever.  Additionally, we will not need an extra wake-up to
> process skipped sessions, because the loop will always iterate over all
> the remotes and sessions.
> 
> What do you think?
> 
>>
>>> +        }
>>> +
>>>          int error = ovsdb_jsonrpc_session_run(s);
>>>          if (error) {
>>>              ovsdb_jsonrpc_session_close(s);
>>>          }
>>> +
>>> +        if (time_msec() - start_time > limit) {
>>> +            /* We bail leaving skip_to set. Next processing iteration
>>> +             * will skip everything up to skip_to which was set to
>>> +             * ->next earlier on.
>>> +             */
>>> +            break;
>>> +        } else {
>>> +            /* We are within time constraints, clear skip_to. */
>>> +            remote->skip_to = NULL;
>>> +        }
>>>      }
>>>  }
>>>  
>>> diff --git a/ovsdb/jsonrpc-server.h b/ovsdb/jsonrpc-server.h
>>> index e0653aa39..218152e9d 100644
>>> --- a/ovsdb/jsonrpc-server.h
>>> +++ b/ovsdb/jsonrpc-server.h
>>> @@ -67,7 +67,7 @@ void ovsdb_jsonrpc_server_free_remote_status(
>>>  void ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *, bool force,
>>>                                      char *comment);
>>>  
>>> -void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *);
>>> +void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *, uint64_t limit);
>>>  void ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *);
>>>  
>>>  void ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *,
>>> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
>>> index 0b3d2bb71..ed43da968 100644
>>> --- a/ovsdb/ovsdb-server.c
>>> +++ b/ovsdb/ovsdb-server.c
>>> @@ -216,7 +216,21 @@ main_loop(struct server_config *config,
>>>              reconfigure_remotes(jsonrpc, all_dbs, remotes),
>>>              &remotes_error);
>>>          report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error);
>>> -        ovsdb_jsonrpc_server_run(jsonrpc);
>>> +
>>> +        /* Figure out current processing time limit. */
>>> +        uint64_t limit = UINT64_MAX;
>>> +        SHASH_FOR_EACH (node, all_dbs) {
>>> +            struct db *db = node->data;
>>> +            uint64_t db_limit;
>>> +
>>> +            db_limit = ovsdb_storage_max_processing_time(db->db->storage);
>>> +            limit = MIN(db_limit, limit);
>>> +        }
>>> +        if (ovs_replay_is_active()) {
>>> +            limit = UINT64_MAX;
>>> +        }
>>
>> I suggest moving this code to a separate function, e.g.
>>
>> static uint64_t get_max_processing_time(const struct shash *all_dbs);
>>
>> And only leave a call here:
>>
>>            uint64_t limit = get_max_processing_time(all_dbs);
>>
>>> +
>>> +        ovsdb_jsonrpc_server_run(jsonrpc, limit);
>>>  
>>>          if (*is_backup) {
>>>              replication_run();
>>> diff --git a/ovsdb/raft.c b/ovsdb/raft.c
>>> index 2fb515651..183463aba 100644
>>> --- a/ovsdb/raft.c
>>> +++ b/ovsdb/raft.c
>>> @@ -407,6 +407,12 @@ raft_make_address_passive(const char *address_)
>>>      }
>>>  }
>>>  
>>> +uint64_t
>>> +raft_get_election_timer(const struct raft *raft)
>>> +{
>>> +    return raft->election_timer;
>>> +}
>>> +
>>>  static struct raft *
>>>  raft_alloc(void)
>>>  {
>>> diff --git a/ovsdb/raft.h b/ovsdb/raft.h
>>> index 3545c41c2..575e7f609 100644
>>> --- a/ovsdb/raft.h
>>> +++ b/ovsdb/raft.h
>>> @@ -188,4 +188,7 @@ void raft_take_leadership(struct raft *);
>>>  void raft_transfer_leadership(struct raft *, const char *reason);
>>>  
>>>  const struct uuid *raft_current_eid(const struct raft *);
>>> +
>>> +uint64_t raft_get_election_timer(const struct raft *);
>>> +
>>>  #endif /* lib/raft.h */
>>> diff --git a/ovsdb/storage.c b/ovsdb/storage.c
>>> index d727b1eac..58018fe6d 100644
>>> --- a/ovsdb/storage.c
>>> +++ b/ovsdb/storage.c
>>> @@ -647,3 +647,15 @@ ovsdb_storage_peek_last_eid(struct ovsdb_storage *storage)
>>>      }
>>>      return raft_current_eid(storage->raft);
>>>  }
>>> +
>>> +uint64_t
>>> +ovsdb_storage_max_processing_time(struct ovsdb_storage *storage)
>>
>> Can it be 'const struct ovsdb_storage *storage'?
>>
>>> +{
>>> +    if (!storage->raft) {
>>> +        return UINT64_MAX;
>>> +    }
>>> +    if (raft_get_election_timer(storage->raft) > 2) {
>>> +        return raft_get_election_timer(storage->raft) / 2;
>>> +    }
>>> +    return 1;
>>> +}
>>> diff --git a/ovsdb/storage.h b/ovsdb/storage.h
>>> index e120094d7..a8a02e0bd 100644
>>> --- a/ovsdb/storage.h
>>> +++ b/ovsdb/storage.h
>>> @@ -97,4 +97,6 @@ struct ovsdb_schema *ovsdb_storage_read_schema(struct ovsdb_storage *);
>>>  
>>>  const struct uuid *ovsdb_storage_peek_last_eid(struct ovsdb_storage *);
>>>  
>>> +uint64_t ovsdb_storage_max_processing_time(struct ovsdb_storage *);
>>> +
>>>  #endif /* ovsdb/storage.h */
>>>
>>
> 



More information about the dev mailing list