[ovs-dev] [PATCH v2 1/2] raft: Set threshold on backlog for raft connections.

Ilya Maximets i.maximets at ovn.org
Tue Nov 3 15:53:49 UTC 2020


RAFT messages could be fairly big.  If something abnormal happens to
one of the servers in a cluster it may not be able to process all the
incoming messages in a timely manner.  This results in jsonrpc backlog
growth on the sender's side.  For example, this can happen if a follower
gets many new clients at once that it needs to serve, or if it decides to
take a snapshot during a period with a high number of database changes.
If the backlog grows large enough, it becomes harder and harder for the
follower to process incoming raft messages: it sends outdated replies and
starts receiving snapshots and the whole raft log from the leader.
Sometimes the backlog grows too high (60GB in this example):

      jsonrpc|INFO|excessive sending backlog, jsonrpc: ssl:<ip>,
                   num of msgs: 15370, backlog: 61731060773.

In this case the OS might actually decide to kill the sender to free some
memory.  In any case, it could take a lot of time for such a server to
catch up with the rest of the cluster if it has so much data to receive
and process.

This patch introduces backlog thresholds for jsonrpc connections.
If the sending backlog exceeds particular values (500 messages or
4GB in size), the connection will be dropped and re-created.  This
allows dropping all of the current backlog and starting over, increasing
the chances of cluster recovery.

Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=1888829
Signed-off-by: Ilya Maximets <i.maximets at ovn.org>
---
 NEWS          |  2 ++
 lib/jsonrpc.c | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 lib/jsonrpc.h |  6 ++++++
 ovsdb/raft.c  |  5 +++++
 4 files changed, 72 insertions(+), 1 deletion(-)

diff --git a/NEWS b/NEWS
index 2860a8e9c..ebdf8758b 100644
--- a/NEWS
+++ b/NEWS
@@ -6,6 +6,8 @@ Post-v2.14.0
      * New unixctl command 'ovsdb-server/memory-trim-on-compaction on|off'.
        If turned on, ovsdb-server will try to reclaim all the unused memory
        after every DB compaction back to OS.  Disabled by default.
+     * Maximum backlog on RAFT connections limited to 500 messages or 4GB.
+       Once threshold reached, connection is dropped (and re-established).
    - DPDK:
      * Removed support for vhost-user dequeue zero-copy.
    - The environment variable OVS_UNBOUND_CONF, if set, is now used
diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index ecbc939fe..eb611018d 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -50,6 +50,10 @@ struct jsonrpc {
     struct ovs_list output;     /* Contains "struct ofpbuf"s. */
     size_t output_count;        /* Number of elements in "output". */
     size_t backlog;
+
+    /* Limits. */
+    size_t max_output;          /* 'output_count' disconnection threshold. */
+    size_t max_backlog;         /* 'backlog' disconnection threshold. */
 };
 
 /* Rate limit for error messages. */
@@ -178,6 +182,17 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
     return rpc->status ? 0 : rpc->backlog;
 }
 
+/* Sets send backlog thresholds for 'rpc': 'max_n_msgs' queued messages and
+ * 'max_backlog_bytes' queued bytes.  jsonrpc_send() drops the connection
+ * when it finds either exceeded; a threshold of 0 is never enforced. */
+void
+jsonrpc_set_backlog_threshold(struct jsonrpc *rpc,
+                              size_t max_n_msgs, size_t max_backlog_bytes)
+{
+    rpc->max_output = max_n_msgs;
+    rpc->max_backlog = max_backlog_bytes;
+}
+
 /* Returns the number of bytes that have been received on 'rpc''s underlying
  * stream.  (The value wraps around if it exceeds UINT_MAX.) */
 unsigned int
@@ -261,9 +276,26 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
     rpc->backlog += length;
 
     if (rpc->output_count >= 50) {
-        VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
+        static struct vlog_rate_limit bl_rl = VLOG_RATE_LIMIT_INIT(5, 5);
+        bool disconnect = false;
+
+        VLOG_INFO_RL(&bl_rl, "excessive sending backlog, jsonrpc: %s, num of"
                      " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
                      rpc->output_count, rpc->backlog);
+        if (rpc->max_output && rpc->output_count > rpc->max_output) {
+            disconnect = true;
+            VLOG_WARN("sending backlog exceeded maximum number of messages (%"
+                      PRIuSIZE" > %"PRIuSIZE"), disconnecting, jsonrpc: %s.",
+                      rpc->output_count, rpc->max_output, rpc->name);
+        } else if (rpc->max_backlog && rpc->backlog > rpc->max_backlog) {
+            disconnect = true;
+            VLOG_WARN("sending backlog exceeded maximum size (%"PRIuSIZE" > %"
+                      PRIuSIZE" bytes), disconnecting, jsonrpc: %s.",
+                      rpc->backlog, rpc->max_backlog, rpc->name);
+        }
+        if (disconnect) {
+            jsonrpc_error(rpc, E2BIG);
+        }
     }
 
     if (rpc->backlog == length) {
@@ -787,6 +819,10 @@ struct jsonrpc_session {
     int last_error;
     unsigned int seqno;
     uint8_t dscp;
+
+    /* Limits for jsonrpc. */
+    size_t max_n_msgs;
+    size_t max_backlog_bytes;
 };
 
 static void
@@ -842,6 +878,8 @@ jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
     s->dscp = 0;
     s->last_error = 0;
 
+    jsonrpc_session_set_backlog_threshold(s, 0, 0);
+
     const char *name = reconnect_get_name(s->reconnect);
     if (!pstream_verify_name(name)) {
         reconnect_set_passive(s->reconnect, true, time_msec());
@@ -882,6 +920,7 @@ jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
     s->pstream = NULL;
     s->seqno = 1;
 
+    jsonrpc_session_set_backlog_threshold(s, 0, 0);
     return s;
 }
 
@@ -970,6 +1009,8 @@ jsonrpc_session_run(struct jsonrpc_session *s)
             }
             reconnect_connected(s->reconnect, time_msec());
             s->rpc = jsonrpc_open(stream);
+            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
+                                                  s->max_backlog_bytes);
             s->seqno++;
         } else if (error != EAGAIN) {
             reconnect_listen_error(s->reconnect, time_msec(), error);
@@ -1010,6 +1051,8 @@ jsonrpc_session_run(struct jsonrpc_session *s)
         if (!error) {
             reconnect_connected(s->reconnect, time_msec());
             s->rpc = jsonrpc_open(s->stream);
+            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
+                                                  s->max_backlog_bytes);
             s->stream = NULL;
             s->seqno++;
         } else if (error != EAGAIN) {
@@ -1250,3 +1293,18 @@ jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp)
         jsonrpc_session_force_reconnect(s);
     }
 }
+
+/* Sets send backlog thresholds for 's' and for its current connection, if
+ * any.  If the backlog exceeds 'max_n_msgs' messages or 'max_backlog_bytes'
+ * bytes, connection is closed (then reconnected, if that is enabled). */
+void
+jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *s,
+                                      size_t max_n_msgs,
+                                      size_t max_backlog_bytes)
+{
+    s->max_n_msgs = max_n_msgs;
+    s->max_backlog_bytes = max_backlog_bytes;
+    if (s->rpc) {
+        jsonrpc_set_backlog_threshold(s->rpc, max_n_msgs, max_backlog_bytes);
+    }
+}
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index a44114e8d..d75d66b86 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -51,6 +51,9 @@ void jsonrpc_wait(struct jsonrpc *);
 
 int jsonrpc_get_status(const struct jsonrpc *);
 size_t jsonrpc_get_backlog(const struct jsonrpc *);
+void jsonrpc_set_backlog_threshold(struct jsonrpc *, size_t max_n_msgs,
+                                                     size_t max_backlog_bytes);
+
 unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *);
 const char *jsonrpc_get_name(const struct jsonrpc *);
 
@@ -140,6 +143,9 @@ void jsonrpc_session_set_probe_interval(struct jsonrpc_session *,
                                         int probe_interval);
 void jsonrpc_session_set_dscp(struct jsonrpc_session *,
                               uint8_t dscp);
+void jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *,
+                                           size_t max_n_msgs,
+                                           size_t max_backlog_bytes);
 const char *jsonrpc_session_get_id(const struct jsonrpc_session *);
 
 #endif /* jsonrpc.h */
diff --git a/ovsdb/raft.c b/ovsdb/raft.c
index f94a3eed8..67c714ff4 100644
--- a/ovsdb/raft.c
+++ b/ovsdb/raft.c
@@ -925,6 +925,9 @@ raft_reset_ping_timer(struct raft *raft)
     raft->ping_timeout = time_msec() + raft->election_timer / 3;
 }
 
+#define RAFT_MAX_BACKLOG_N_MSGS    500         /* Queued messages limit. */
+#define RAFT_MAX_BACKLOG_BYTES     UINT32_MAX  /* Queued bytes limit (~4GB). */
+
 static void
 raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
               const struct uuid *sid, bool incoming)
@@ -940,6 +943,8 @@ raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
     conn->incoming = incoming;
     conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
     jsonrpc_session_set_probe_interval(js, 0);
+    jsonrpc_session_set_backlog_threshold(js, RAFT_MAX_BACKLOG_N_MSGS,
+                                              RAFT_MAX_BACKLOG_BYTES);
 }
 
 /* Starts the local server in an existing Raft cluster, using the local copy of
-- 
2.25.4



More information about the dev mailing list