[ovs-dev] [PATCH v7] ovsdb: provide raft and command interfaces with priority

Ilya Maximets i.maximets at ovn.org
Wed Aug 11 10:49:50 UTC 2021


On 8/11/21 12:06 AM, Ilya Maximets wrote:
> On 7/27/21 10:21 AM, anton.ivanov at cambridgegreys.com wrote:
>> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>>
>> Set a soft time limit of "raft election timer"/2 on ovsdb
>> processing.
>>
>> This improves behaviour in large heavily loaded clusters.
>> While it cannot fully eliminate spurious raft elections
>> under heavy load, it significantly decreases their number.
>>
>> Processing is (to the extent possible) restarted where it
>> stopped on the previous iteration to ensure that sessions
>> towards the tail of the session list are not starved.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>> ---
>>  ovsdb/jsonrpc-server.c | 90 ++++++++++++++++++++++++++++++++++++++++--
>>  ovsdb/jsonrpc-server.h |  2 +-
>>  ovsdb/ovsdb-server.c   | 16 +++++++-
>>  ovsdb/raft.c           |  6 +++
>>  ovsdb/raft.h           |  3 ++
>>  ovsdb/storage.c        | 12 ++++++
>>  ovsdb/storage.h        |  2 +
>>  7 files changed, 125 insertions(+), 6 deletions(-)
>>
>> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
>> index 351c39d8a..ea82d4161 100644
>> --- a/ovsdb/jsonrpc-server.c
>> +++ b/ovsdb/jsonrpc-server.c
>> @@ -60,7 +60,8 @@ static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
>>      struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *, bool);
>>  static void ovsdb_jsonrpc_session_preremove_db(struct ovsdb_jsonrpc_remote *,
>>                                                 struct ovsdb *);
>> -static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
>> +static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *,
>> +                                          uint64_t limit);
>>  static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
>>  static void ovsdb_jsonrpc_session_get_memory_usage_all(
>>      const struct ovsdb_jsonrpc_remote *, struct simap *usage);
>> @@ -128,6 +129,11 @@ struct ovsdb_jsonrpc_server {
>>      bool read_only;            /* This server is does not accept any
>>                                    transactions that can modify the database. */
>>      struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
>> +    struct ovsdb_jsonrpc_remote *skip_to; /* Pointer to remote where processing
>> +                                             should restart after a time
>> +                                             constraint interruption. */
>> +    bool must_wake_up; /* The processing loop must be re-run. It was
>> +                          interrupted due to exceeding a time constraint. */
>>  };
>>  
>>  /* A configured remote.  This is either a passive stream listener plus a list
>> @@ -137,6 +143,9 @@ struct ovsdb_jsonrpc_remote {
>>      struct ovsdb_jsonrpc_server *server;
>>      struct pstream *listener;   /* Listener, if passive. */
>>      struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
>> +    struct ovsdb_jsonrpc_session *skip_to; /* Session at which processing
>> +                                              should restart after an
>> +                                              interruption. */
>>      uint8_t dscp;
>>      bool read_only;
>>      char *role;
>> @@ -279,6 +288,7 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
>>      remote->dscp = options->dscp;
>>      remote->read_only = options->read_only;
>>      remote->role = nullable_xstrdup(options->role);
>> +    remote->skip_to = NULL;
>>      shash_add(&svr->remotes, name, remote);
>>  
>>      if (!listener) {
>> @@ -293,6 +303,9 @@ ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
>>  {
>>      struct ovsdb_jsonrpc_remote *remote = node->data;
>>  
>> +    /* The safest option is to rerun all remotes. */
>> +    remote->server->skip_to = NULL;
>> +
>>      ovsdb_jsonrpc_session_close_all(remote);
>>      pstream_close(remote->listener);
>>      shash_delete(&remote->server->remotes, node);
>> @@ -378,12 +391,24 @@ ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *svr,
>>  }
>>  
>>  void
>> -ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
>> +ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr, uint64_t limit)
>>  {
>>      struct shash_node *node;
>> +    uint64_t elapsed = 0;
>> +    uint64_t start_time = time_msec();
>> +
>> +    svr->must_wake_up = false;
>>  
>>      SHASH_FOR_EACH (node, &svr->remotes) {
>>          struct ovsdb_jsonrpc_remote *remote = node->data;
>> +        if (svr->skip_to) {
>> +            if (remote != svr->skip_to) {
>> +                continue;
>> +            } else {
>> +                svr->skip_to = NULL;
>> +                svr->must_wake_up = true;
>> +            }
>> +        }
>>  
>>          if (remote->listener) {
>>              struct stream *stream;
>> @@ -403,7 +428,17 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
>>              }
>>          }
>>  
>> -        ovsdb_jsonrpc_session_run_all(remote);
>> +        /* We assume accept and session creation time to be
>> +         * negligible for the purposes of computing timeouts.
>> +         */
>> +        ovsdb_jsonrpc_session_run_all(remote, limit - elapsed);
>> +
>> +        elapsed = time_msec() - start_time;
>> +        if (elapsed > limit) {
>> +            svr->must_wake_up = true;
>> +            svr->skip_to = remote;
>> +            break;
>> +        }
>>      }
>>  }
>>  
>> @@ -412,6 +447,16 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
>>  {
>>      struct shash_node *node;
>>  
>> +    if (svr->must_wake_up) {
>> +        /* We have stopped processing due to a time constraint.
>> +         * In this case there is no point to walk all sessions
>> +         * and rebuild the poll structure for the poll loop.
> 
> The last sentence seems redundant.  We need to wake up because there
> is work that we didn't finish.  This module should not reference
> internals of the poll_loop.
> 
>> +         */
>> +        poll_immediate_wake();
>> +        svr->must_wake_up = false;
>> +        return;
>> +    }
>> +
>>      SHASH_FOR_EACH (node, &svr->remotes) {
>>          struct ovsdb_jsonrpc_remote *remote = node->data;
>>  
>> @@ -583,15 +628,52 @@ ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
>>  }
>>  
>>  static void
>> -ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
>> +ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote,
>> +                              uint64_t limit)
>>  {
>>      struct ovsdb_jsonrpc_session *s, *next;
>> +    uint64_t start_time = time_msec();
>>  
>>      LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
>> +        if (remote->skip_to && s != remote->skip_to) {
>> +            /* Processing was interrupted, we skip to the point
>> +             * where we had to interrupt it.
>> +             * We cannot use the _CONTINUE macro as it is not safe
>> +             * if the list has been changed in the meantime.
>> +             */
>> +            continue;
>> +        }
>> +
>> +        /* Set ->next as skip point if we need to restart processing.
>> +         * This way, even if current is removed, we always have a
>> +         * valid pointer to continue processing.
> 
> Hmm, but what if the 'next' will be removed?  In this case we will not
> have a valid pointer.

Clarification:  I mean the case where it gets removed outside of this loop,
e.g. by the ovsdb_jsonrpc_session_reconnect_all().

> 
>> +         */
>> +        remote->skip_to =
>> +            CONTAINER_OF(ovs_list_front(&s->node),
>> +                         struct ovsdb_jsonrpc_session, node);
>> +
>> +        if (remote->skip_to == s) {
>> +            /* The list is a singleton. This is a special case, where
>> +             * deleting the node will invalidate the pointer.
>> +             */
>> +            remote->skip_to = NULL;
> 
> Assume we have several remotes and one of them has only one session,
> but this session is so heavy that its processing
> always takes longer than the limit (this can be a session that spams
> with monitor requests or constantly changes conditions that trigger
> a walk across the database, or something else).  So, at the exit
> of ovsdb_jsonrpc_session_run_all(), remote->skip_to will be NULL.
> And ovsdb_jsonrpc_server_run() will have svr->skip_to set to the
> same remote.  This way ovsdb-server will serve only this one hungry
> session and will never serve any other clients on other remotes.

This thing is tricky and the whole 'skip_to' concept seems to be
too fragile.  But since we have a list here, how about re-ordering
it instead of skipping elements?  Something like this (not tested):

@@ -583,14 +628,36 @@ ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
 }
 
 static void
-ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
-{
-    struct ovsdb_jsonrpc_session *s, *next;
+ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote,
+                              uint64_t limit)
+{
+    struct ovsdb_jsonrpc_session *s, *first = NULL;
+    uint64_t start_time = time_msec();
+
+    while (!ovs_list_is_empty(&remote->sessions)) {
+        s = CONTAINER_OF(ovs_list_front(&remote->sessions),
+                         struct ovsdb_jsonrpc_session, node);
+        if (s == first) {
+            /* All sessions are processed. */
+            break;
+        }
 
-    LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
         int error = ovsdb_jsonrpc_session_run(s);
         if (error) {
             ovsdb_jsonrpc_session_close(s);
+        } else {
+            /* This session was processed, moving it to the end of a list. */
+            ovs_list_remove(&s->node);
+            ovs_list_push_back(&remote->sessions, &s->node);
+            if (!first) {
+                first = s;
+            }
+        }
+
+        if (time_msec() - start_time > limit) {
+            /* Time is up.  Since the list is re-ordered, next time the loop
+             * will start from where it was left.  */
+            break;
         }
     }
 }
---

This change alone will not help, but we can do the same for the 'remotes' loop.
'remotes' is an shash, so we can't change the order there, but we can add
a list that holds all the same elements and iterate over it, e.g.:

@@ -128,6 +129,10 @@ struct ovsdb_jsonrpc_server {
     bool read_only;            /* This server is does not accept any
                                   transactions that can modify the database. */
     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
+    struct ovs_list remotes_list; /* Same as 'remotes', but list.  To keep
+                                   * the iteration order. */
+    bool must_wake_up; /* The processing loop must be re-run. It was
+                          interrupted due to exceeding a time constraint. */
 };
 
 /* A configured remote.  This is either a passive stream listener plus a list
@@ -137,6 +142,7 @@ struct ovsdb_jsonrpc_remote {
     struct ovsdb_jsonrpc_server *server;
     struct pstream *listener;   /* Listener, if passive. */
     struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
+    struct ovs_list node;       /* Element in server's "remotes_list". */
     uint8_t dscp;
     bool read_only;
     char *role;
--

When something is added to the shash, add it to the list too.  Same for removing.
Using this list, exactly the same iteration scheme can be implemented inside
ovsdb_jsonrpc_server_run(), i.e. move each processed remote to the end of the list.
We will need to move a partially processed remote to the end of the list
too, but this should not be too bad in terms of processing fairness.
More importantly, I think, this scheme will not allow any cases where we
get stuck processing the same remote or session forever.  Additionally,
we will not need an extra wake up to process skipped sessions,
because the loop will always iterate over all the remotes and sessions.

What do you think?

> 
>> +        }
>> +
>>          int error = ovsdb_jsonrpc_session_run(s);
>>          if (error) {
>>              ovsdb_jsonrpc_session_close(s);
>>          }
>> +
>> +        if (time_msec() - start_time > limit) {
>> +            /* We bail leaving skip_to set. Next processing iteration
>> +             * will skip everything up to skip_to which was set to
>> +             * ->next earlier on.
>> +             */
>> +            break;
>> +        } else {
>> +            /* We are within time constraints, clear skip_to. */
>> +            remote->skip_to = NULL;
>> +        }
>>      }
>>  }
>>  
>> diff --git a/ovsdb/jsonrpc-server.h b/ovsdb/jsonrpc-server.h
>> index e0653aa39..218152e9d 100644
>> --- a/ovsdb/jsonrpc-server.h
>> +++ b/ovsdb/jsonrpc-server.h
>> @@ -67,7 +67,7 @@ void ovsdb_jsonrpc_server_free_remote_status(
>>  void ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *, bool force,
>>                                      char *comment);
>>  
>> -void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *);
>> +void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *, uint64_t limit);
>>  void ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *);
>>  
>>  void ovsdb_jsonrpc_server_set_read_only(struct ovsdb_jsonrpc_server *,
>> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
>> index 0b3d2bb71..ed43da968 100644
>> --- a/ovsdb/ovsdb-server.c
>> +++ b/ovsdb/ovsdb-server.c
>> @@ -216,7 +216,21 @@ main_loop(struct server_config *config,
>>              reconfigure_remotes(jsonrpc, all_dbs, remotes),
>>              &remotes_error);
>>          report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error);
>> -        ovsdb_jsonrpc_server_run(jsonrpc);
>> +
>> +        /* Figure out current processing time limit. */
>> +        uint64_t limit = UINT64_MAX;
>> +        SHASH_FOR_EACH (node, all_dbs) {
>> +            struct db *db = node->data;
>> +            uint64_t db_limit;
>> +
>> +            db_limit = ovsdb_storage_max_processing_time(db->db->storage);
>> +            limit = MIN(db_limit, limit);
>> +        }
>> +        if (ovs_replay_is_active()) {
>> +            limit = UINT64_MAX;
>> +        }
> 
> Suggesting to move this code to a separate function, e.g.
> 
> static uint64_t get_max_processing_time(const struct shash *all_dbs);
> 
> And only leave a call here:
> 
>            uint64_t limit = get_max_processing_time(all_dbs);
> 
>> +
>> +        ovsdb_jsonrpc_server_run(jsonrpc, limit);
>>  
>>          if (*is_backup) {
>>              replication_run();
>> diff --git a/ovsdb/raft.c b/ovsdb/raft.c
>> index 2fb515651..183463aba 100644
>> --- a/ovsdb/raft.c
>> +++ b/ovsdb/raft.c
>> @@ -407,6 +407,12 @@ raft_make_address_passive(const char *address_)
>>      }
>>  }
>>  
>> +uint64_t
>> +raft_get_election_timer(const struct raft *raft)
>> +{
>> +    return raft->election_timer;
>> +}
>> +
>>  static struct raft *
>>  raft_alloc(void)
>>  {
>> diff --git a/ovsdb/raft.h b/ovsdb/raft.h
>> index 3545c41c2..575e7f609 100644
>> --- a/ovsdb/raft.h
>> +++ b/ovsdb/raft.h
>> @@ -188,4 +188,7 @@ void raft_take_leadership(struct raft *);
>>  void raft_transfer_leadership(struct raft *, const char *reason);
>>  
>>  const struct uuid *raft_current_eid(const struct raft *);
>> +
>> +uint64_t raft_get_election_timer(const struct raft *);
>> +
>>  #endif /* lib/raft.h */
>> diff --git a/ovsdb/storage.c b/ovsdb/storage.c
>> index d727b1eac..58018fe6d 100644
>> --- a/ovsdb/storage.c
>> +++ b/ovsdb/storage.c
>> @@ -647,3 +647,15 @@ ovsdb_storage_peek_last_eid(struct ovsdb_storage *storage)
>>      }
>>      return raft_current_eid(storage->raft);
>>  }
>> +
>> +uint64_t
>> +ovsdb_storage_max_processing_time(struct ovsdb_storage *storage)
> 
> Can it be 'const struct ovsdb_storage *storage' ?
> 
>> +{
>> +    if (!storage->raft) {
>> +        return UINT64_MAX;
>> +    }
>> +    if (raft_get_election_timer(storage->raft) > 2) {
>> +        return raft_get_election_timer(storage->raft) / 2;
>> +    }
>> +    return 1;
>> +}
>> diff --git a/ovsdb/storage.h b/ovsdb/storage.h
>> index e120094d7..a8a02e0bd 100644
>> --- a/ovsdb/storage.h
>> +++ b/ovsdb/storage.h
>> @@ -97,4 +97,6 @@ struct ovsdb_schema *ovsdb_storage_read_schema(struct ovsdb_storage *);
>>  
>>  const struct uuid *ovsdb_storage_peek_last_eid(struct ovsdb_storage *);
>>  
>> +uint64_t ovsdb_storage_max_processing_time(struct ovsdb_storage *);
>> +
>>  #endif /* ovsdb/storage.h */
>>
> 



More information about the dev mailing list