[ovs-dev] [PATCH v2 2/3] raft: Don't keep full json objects in memory if no longer needed.

Ilya Maximets i.maximets at ovn.org
Tue Aug 24 19:00:38 UTC 2021


Raft log entries (and the raft database snapshot) contain json objects
with the data.  A follower receives append requests whose data is
parsed and added to the raft log.  The leader receives execution
requests, parses the data out of them and adds it to the log.  In both
cases, ovsdb-server later reads the log with ovsdb_storage_read(),
constructs a transaction and updates the database.  On followers, these
json objects are, in the common case, never used again.  The leader may
use them to send append requests or snapshot installation requests to
followers.  However, all these operations (except for
ovsdb_storage_read()) just serialize the json in order to send it over
the network.

Json objects are significantly larger than their serialized string
representation.  For example, the snapshot of the database from one of
the ovn-heater scale tests takes 270 MB as a string, but 1.6 GB as
a json object, out of the total 3.8 GB consumed by the ovsdb-server
process.
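
A quick check of these numbers shows how much larger the parsed form is.  This assumes 1 GB = 1000 MB, because the text does not say which unit is meant.  Under that assumption the parsed form is roughly six times the string and about 42% of the whole process:

```latex
\frac{1.6\ \text{GB}}{270\ \text{MB}} = \frac{1600}{270} \approx 5.9,
\qquad
\frac{1.6\ \text{GB}}{3.8\ \text{GB}} \approx 0.42
```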

ovsdb_storage_read() is called for a given raft entry only once in its
lifetime, so after this call we can serialize the json object, store
the string representation and free the actual json object that ovsdb
will never need again.  This can save a lot of memory and can also
save serialization time, because each raft entry used in append
requests and snapshot installation requests is serialized only once
instead of every time such a request needs to be sent.
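
The caching policy can be sketched outside of OVS.  In this sketch a plain integer array stands in for a json tree.  The types and functions below (toy_tree, toy_entry, toy_entry_get_serialized(), toy_entry_steal_parsed()) are hypothetical and are not part of the OVS code base.  The sketch only shows the policy: serialize at most once, and let the one-time reader take the parsed form away.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a parsed json tree: an array of integers. */
struct toy_tree {
    size_t n;
    long *elems;
};

/* Hypothetical stand-in for the data pair of a raft entry. */
struct toy_entry {
    struct toy_tree *full;   /* Parsed form, NULL once stolen. */
    char *serialized;        /* Cached string form, NULL until needed. */
};

static int n_serializations;  /* Counts real serialization work. */

/* Returns a malloc'd "[a,b,...]" rendering of 't'. */
static char *
toy_tree_to_string(const struct toy_tree *t)
{
    /* '[' + ']' + NUL, plus at most 21 chars (comma, sign, 19 digits)
     * per 64-bit long; 22 leaves some slack. */
    size_t cap = 3 + t->n * 22, len = 0;
    char *s = malloc(cap);

    if (!s) {
        abort();
    }
    s[len++] = '[';
    for (size_t i = 0; i < t->n; i++) {
        len += snprintf(s + len, cap - len, "%s%ld", i ? "," : "",
                        t->elems[i]);
    }
    s[len++] = ']';
    s[len] = '\0';
    return s;
}

/* Serializes at most once: later calls return the cached string. */
static const char *
toy_entry_get_serialized(struct toy_entry *e)
{
    if (!e->serialized && e->full) {
        n_serializations++;
        e->serialized = toy_tree_to_string(e->full);
    }
    return e->serialized;
}

/* The one-time read: make sure the string exists, then hand the parsed
 * tree to the caller, who owns (and must free) it from now on. */
static struct toy_tree *
toy_entry_steal_parsed(struct toy_entry *e)
{
    struct toy_tree *t;

    toy_entry_get_serialized(e);
    t = e->full;
    e->full = NULL;
    return t;
}
```

After toy_entry_steal_parsed(), the entry holds only the string.  Any number of later toy_entry_get_serialized() calls (one per follower, say) reuse that string, so n_serializations stays at 1.  In the patch, raft_entry_steal_parsed_data() and raft_entry_get_serialized_data() play these roles on struct json.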

JSON_SERIALIZED_OBJECT can be used to seamlessly integrate
pre-serialized data into raft_header and similar json objects.
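
The patch relies on OVS's JSON_SERIALIZED_OBJECT type.  The sketch below does not use the OVS json library.  Instead it models the idea with a hypothetical three-variant value type.  A TOY_RAW node holds an already-serialized string, and the serializer copies that string verbatim.  This way an enclosing header object can be emitted without walking or re-encoding the embedded data.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical miniature JSON model: only what the sketch needs. */
enum toy_type { TOY_INT, TOY_RAW, TOY_OBJECT };

struct toy_value {
    enum toy_type type;
    long i;                          /* TOY_INT. */
    const char *raw;                 /* TOY_RAW: already-serialized JSON. */
    size_t n;                        /* TOY_OBJECT: number of members. */
    const char *const *keys;         /* TOY_OBJECT: member names. */
    const struct toy_value *members; /* TOY_OBJECT: member values. */
};

/* Appends 's' to 'buf', keeping it NUL-terminated.  Returns false if
 * 'buf' (capacity 'cap') would overflow. */
static bool
append(char *buf, size_t cap, size_t *len, const char *s)
{
    size_t n = strlen(s);

    if (*len + n + 1 > cap) {
        return false;
    }
    memcpy(buf + *len, s, n + 1);
    *len += n;
    return true;
}

/* Appends the serialization of 'v' to 'buf'.  Returns false on overflow.
 * Keys are assumed not to need escaping. */
static bool
toy_serialize(const struct toy_value *v, char *buf, size_t cap, size_t *len)
{
    char tmp[32];

    switch (v->type) {
    case TOY_INT:
        snprintf(tmp, sizeof tmp, "%ld", v->i);
        return append(buf, cap, len, tmp);
    case TOY_RAW:
        /* Pre-serialized data is copied as-is: no tree walk needed. */
        return append(buf, cap, len, v->raw);
    case TOY_OBJECT:
        if (!append(buf, cap, len, "{")) {
            return false;
        }
        for (size_t k = 0; k < v->n; k++) {
            if ((k && !append(buf, cap, len, ","))
                || !append(buf, cap, len, "\"")
                || !append(buf, cap, len, v->keys[k])
                || !append(buf, cap, len, "\":")
                || !toy_serialize(&v->members[k], buf, cap, len)) {
                return false;
            }
        }
        return append(buf, cap, len, "}");
    }
    return false;
}
```

For example, a header built from the members "term" (TOY_INT 5) and "prev_data" (TOY_RAW `["schema",{"t":1}]`) serializes to `{"term":5,"prev_data":["schema",{"t":1}]}`.  The raw string costs no parsing work.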

One major special case is the creation of a database snapshot.
A snapshot installation request received over the network will be
parsed and read by ovsdb-server just like any other raft log entry.
However, snapshots created locally with raft_store_snapshot() will
never be read back, because they reflect the current state of the
database and hence are already applied.  In this case we can free the
json object right after writing the snapshot to disk.
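
The following is a hypothetical sketch of the locally-created-snapshot path.  It again uses a toy integer array instead of struct json, and a stdio FILE stands in for the raft log.  toy_store_local_snapshot() is not an OVS function.  The sketch writes the serialized form and keeps that string for future snapshot installation requests.  It then frees the parsed tree immediately, since the local database already reflects this state.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a parsed snapshot: an array of integers. */
struct toy_tree {
    size_t n;
    long *elems;
};

/* Hypothetical stand-in for the in-memory snapshot entry. */
struct toy_snapshot {
    struct toy_tree *full;   /* Parsed form. */
    char *serialized;        /* String form, needed for install requests. */
};

static void
toy_tree_destroy(struct toy_tree *t)
{
    if (t) {
        free(t->elems);
        free(t);
    }
}

/* Returns a malloc'd "[a,b,...]" rendering of 't'. */
static char *
toy_tree_to_string(const struct toy_tree *t)
{
    size_t cap = 3 + t->n * 22, len = 0;   /* Enough for 64-bit longs. */
    char *s = malloc(cap);

    if (!s) {
        abort();
    }
    s[len++] = '[';
    for (size_t i = 0; i < t->n; i++) {
        len += snprintf(s + len, cap - len, "%s%ld", i ? "," : "",
                        t->elems[i]);
    }
    s[len++] = ']';
    s[len] = '\0';
    return s;
}

/* Writes a locally created snapshot to 'disk', keeps only its string form
 * and frees the parsed tree, which will never be read back locally.
 * Takes ownership of 'tree'.  Returns 0 on success; on a write error it
 * returns -1 and leaves 'snap' untouched. */
static int
toy_store_local_snapshot(struct toy_snapshot *snap, struct toy_tree *tree,
                         FILE *disk)
{
    char *s = toy_tree_to_string(tree);

    if (fprintf(disk, "%s\n", s) < 0 || fflush(disk) != 0) {
        free(s);
        toy_tree_destroy(tree);
        return -1;
    }
    toy_tree_destroy(snap->full);
    free(snap->serialized);
    snap->serialized = s;
    snap->full = NULL;
    toy_tree_destroy(tree);
    return 0;
}
```

In the patch, the parsed data is dropped after raft_save_snapshot() succeeds, using json_destroy(raft_entry_steal_parsed_data(&raft->snap)).  The sketch folds the write and the release into one function for brevity.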

Tests performed with ovn-heater on a 60-node density-light scenario,
where the on-disk database grows up to 97 MB, show that the average
memory consumption of the ovsdb-server Southbound DB processes
decreased by 58% (from 602 MB to 256 MB per process) and the peak
memory consumption decreased by 40% (from 1288 MB to 771 MB).
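
The reported percentages follow from the measured values, which the text rounds:

```latex
\frac{602 - 256}{602} = \frac{346}{602} \approx 57.5\,\%,
\qquad
\frac{1288 - 771}{1288} = \frac{517}{1288} \approx 40.1\,\%
```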

A test with 120 nodes on the density-heavy scenario with a 270 MB
on-disk database shows a 1.5 GB decrease in memory consumption, as
expected.  Also, the total CPU time consumed by the Southbound DB
process was reduced from 296 to 256 minutes, and the number of
unreasonably long poll intervals dropped from 2896 to 1934.
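
The same results can be expressed as relative reductions.  These figures are derived from the numbers above and were not measured separately:

```latex
\frac{296 - 256}{296} = \frac{40}{296} \approx 13.5\,\%,
\qquad
\frac{2896 - 1934}{2896} = \frac{962}{2896} \approx 33.2\,\%
```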

Deserialization is also implemented, just in case.  I didn't see this
function being invoked in practice.
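
The lazy deserialization fallback can be sketched with the same hypothetical toy types used for illustration (not OVS code).  In this sketch, toy_entry_get_data() rebuilds the parsed form from the cached string only when it has been released, and then keeps it in the entry.  This mirrors the idea behind raft_entry_get_data() in the patch, which does the equivalent with json_from_serialized_object().  Integer overflow in strtol() is not checked here.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for a parsed json tree and a raft entry. */
struct toy_tree {
    size_t n;
    long *elems;
};

struct toy_entry {
    struct toy_tree *full;   /* Parsed form, may be NULL. */
    char *serialized;        /* String form, may be NULL. */
};

static int n_deserializations;  /* Counts real parsing work. */

static void
toy_tree_destroy(struct toy_tree *t)
{
    if (t) {
        free(t->elems);
        free(t);
    }
}

/* Parses "[a,b,...]" into a new tree.  Returns NULL if malformed. */
static struct toy_tree *
toy_tree_from_string(const char *s)
{
    struct toy_tree *t;
    size_t cap = 0;

    if (*s++ != '[') {
        return NULL;
    }
    t = calloc(1, sizeof *t);
    if (!t) {
        abort();
    }
    while (*s != ']') {
        char *end;
        long v = strtol(s, &end, 10);

        if (end == s) {
            goto error;                 /* Not a number. */
        }
        if (t->n == cap) {
            long *p;

            cap = cap ? 2 * cap : 4;
            p = realloc(t->elems, cap * sizeof *p);
            if (!p) {
                abort();
            }
            t->elems = p;
        }
        t->elems[t->n++] = v;
        s = end;
        if (*s == ',' && s[1] != ']') {
            s++;
        } else if (*s != ']') {
            goto error;                 /* Missing ',' or trailing ','. */
        }
    }
    return t;

error:
    toy_tree_destroy(t);
    return NULL;
}

/* Returns the parsed form, rebuilding it from the string only if it was
 * released.  The tree stays owned by the entry. */
static const struct toy_tree *
toy_entry_get_data(struct toy_entry *e)
{
    if (!e->full && e->serialized) {
        n_deserializations++;
        e->full = toy_tree_from_string(e->serialized);
    }
    return e->full;
}
```

A second toy_entry_get_data() call on the same entry returns the cached tree without parsing again.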

Signed-off-by: Ilya Maximets <i.maximets at ovn.org>
---
 ovsdb/ovsdb-tool.c   |  10 ++--
 ovsdb/raft-private.c | 111 ++++++++++++++++++++++++++++++++++++++-----
 ovsdb/raft-private.h |  12 ++++-
 ovsdb/raft.c         |  94 ++++++++++++++++++++----------------
 ovsdb/raft.h         |   3 +-
 ovsdb/storage.c      |   4 +-
 6 files changed, 170 insertions(+), 64 deletions(-)

diff --git a/ovsdb/ovsdb-tool.c b/ovsdb/ovsdb-tool.c
index 05a0223e7..903b2ebc9 100644
--- a/ovsdb/ovsdb-tool.c
+++ b/ovsdb/ovsdb-tool.c
@@ -919,7 +919,7 @@ print_raft_header(const struct raft_header *h,
         if (!uuid_is_zero(&h->snap.eid)) {
             printf(" prev_eid: %04x\n", uuid_prefix(&h->snap.eid, 4));
         }
-        print_data("prev_", h->snap.data, schemap, names);
+        print_data("prev_", raft_entry_get_data(&h->snap), schemap, names);
     }
 }
 
@@ -973,11 +973,13 @@ raft_header_to_standalone_log(const struct raft_header *h,
                               struct ovsdb_log *db_log_data)
 {
     if (h->snap_index) {
-        if (!h->snap.data || json_array(h->snap.data)->n != 2) {
+        const struct json *data = raft_entry_get_data(&h->snap);
+
+        if (!data || json_array(data)->n != 2) {
             ovs_fatal(0, "Incorrect raft header data array length");
         }
 
-        struct json_array *pa = json_array(h->snap.data);
+        struct json_array *pa = json_array(data);
         struct json *schema_json = pa->elems[0];
         struct ovsdb_error *error = NULL;
 
@@ -1373,7 +1375,7 @@ do_check_cluster(struct ovs_cmdl_context *ctx)
                 }
                 struct raft_entry *e = &s->entries[log_idx];
                 e->term = r->term;
-                e->data = r->entry.data;
+                raft_entry_set_data_nocopy(e, r->entry.data);
                 e->eid = r->entry.eid;
                 e->servers = r->entry.servers;
                 break;
diff --git a/ovsdb/raft-private.c b/ovsdb/raft-private.c
index 26d39a087..33c20e71b 100644
--- a/ovsdb/raft-private.c
+++ b/ovsdb/raft-private.c
@@ -18,11 +18,16 @@
 
 #include "raft-private.h"
 
+#include "coverage.h"
 #include "openvswitch/dynamic-string.h"
 #include "ovsdb-error.h"
 #include "ovsdb-parser.h"
 #include "socket-util.h"
 #include "sset.h"
+
+COVERAGE_DEFINE(raft_entry_deserialize);
+COVERAGE_DEFINE(raft_entry_serialize);
+
 
 /* Addresses of Raft servers. */
 
@@ -281,7 +286,8 @@ void
 raft_entry_clone(struct raft_entry *dst, const struct raft_entry *src)
 {
     dst->term = src->term;
-    dst->data = json_nullable_clone(src->data);
+    dst->data.full_json = json_nullable_clone(src->data.full_json);
+    dst->data.serialized = json_nullable_clone(src->data.serialized);
     dst->eid = src->eid;
     dst->servers = json_nullable_clone(src->servers);
     dst->election_timer = src->election_timer;
@@ -291,7 +297,8 @@ void
 raft_entry_uninit(struct raft_entry *e)
 {
     if (e) {
-        json_destroy(e->data);
+        json_destroy(e->data.full_json);
+        json_destroy(e->data.serialized);
         json_destroy(e->servers);
     }
 }
@@ -301,8 +308,9 @@ raft_entry_to_json(const struct raft_entry *e)
 {
     struct json *json = json_object_create();
     raft_put_uint64(json, "term", e->term);
-    if (e->data) {
-        json_object_put(json, "data", json_clone(e->data));
+    if (raft_entry_has_data(e)) {
+        json_object_put(json, "data",
+                        json_clone(raft_entry_get_serialized_data(e)));
         json_object_put_format(json, "eid", UUID_FMT, UUID_ARGS(&e->eid));
     }
     if (e->servers) {
@@ -323,9 +331,10 @@ raft_entry_from_json(struct json *json, struct raft_entry *e)
     struct ovsdb_parser p;
     ovsdb_parser_init(&p, json, "raft log entry");
     e->term = raft_parse_required_uint64(&p, "term");
-    e->data = json_nullable_clone(
+    raft_entry_set_data(e,
         ovsdb_parser_member(&p, "data", OP_OBJECT | OP_ARRAY | OP_OPTIONAL));
-    e->eid = e->data ? raft_parse_required_uuid(&p, "eid") : UUID_ZERO;
+    e->eid = raft_entry_has_data(e)
+             ? raft_parse_required_uuid(&p, "eid") : UUID_ZERO;
     e->servers = json_nullable_clone(
         ovsdb_parser_member(&p, "servers", OP_OBJECT | OP_OPTIONAL));
     if (e->servers) {
@@ -344,9 +353,86 @@ bool
 raft_entry_equals(const struct raft_entry *a, const struct raft_entry *b)
 {
     return (a->term == b->term
-            && json_equal(a->data, b->data)
             && uuid_equals(&a->eid, &b->eid)
-            && json_equal(a->servers, b->servers));
+            && json_equal(a->servers, b->servers)
+            && json_equal(raft_entry_get_data(a), raft_entry_get_data(b)));
+}
+
+bool
+raft_entry_has_data(const struct raft_entry *e)
+{
+    return e->data.full_json || e->data.serialized;
+}
+
+/* Triggers generation of a serialized copy of the data. */
+void
+raft_entry_data_serialize(struct raft_entry *e)
+{
+    if (!raft_entry_has_data(e) || e->data.serialized) {
+        return;
+    }
+    COVERAGE_INC(raft_entry_serialize);
+    e->data.serialized = json_serialized_object_create(e->data.full_json);
+}
+
+static void
+raft_entry_data_deserialize(struct raft_entry *e)
+{
+    if (!raft_entry_has_data(e) || e->data.full_json) {
+        return;
+    }
+    COVERAGE_INC(raft_entry_deserialize);
+    e->data.full_json = json_from_serialized_object(e->data.serialized);
+}
+
+
+void
+raft_entry_set_data_nocopy(struct raft_entry *e, struct json *json)
+{
+    ovs_assert(!json || json->type != JSON_SERIALIZED_OBJECT);
+    e->data.full_json = json;
+    e->data.serialized = NULL;
+}
+
+void
+raft_entry_set_data(struct raft_entry *e, const struct json *json)
+{
+    raft_entry_set_data_nocopy(e, json_nullable_clone(json));
+}
+
+/* Returns a pointer to the fully parsed json object of the data.
+ * Caller takes the ownership of the result.
+ *
+ * Performance notice: Subsequent call of this function for the same data
+ * object will lead to re-parsing of the serialized json, unless updated
+ * with raft_entry_set_data(). */
+struct json *
+raft_entry_steal_parsed_data(struct raft_entry *e)
+{
+    /* Ensure that both versions exist. */
+    raft_entry_data_serialize(e);
+    raft_entry_data_deserialize(e);
+
+    struct json *json = e->data.full_json;
+    e->data.full_json = NULL;
+
+    return json;
+}
+
+/* Returns a pointer to the fully parsed json object of the data. */
+const struct json *
+raft_entry_get_data(const struct raft_entry *e)
+{
+    raft_entry_data_deserialize(CONST_CAST(struct raft_entry *, e));
+    return e->data.full_json;
+}
+
+/* Returns a pointer to the JSON_SERIALIZED_OBJECT of the data. */
+const struct json *
+raft_entry_get_serialized_data(const struct raft_entry *e)
+{
+    raft_entry_data_serialize(CONST_CAST(struct raft_entry *, e));
+    return e->data.serialized;
 }
 
 void
@@ -402,8 +488,8 @@ raft_header_from_json__(struct raft_header *h, struct ovsdb_parser *p)
          * present, all of them must be. */
         h->snap_index = raft_parse_optional_uint64(p, "prev_index");
         if (h->snap_index) {
-            h->snap.data = json_nullable_clone(
-                ovsdb_parser_member(p, "prev_data", OP_ANY));
+            raft_entry_set_data(
+                &h->snap, ovsdb_parser_member(p, "prev_data", OP_ANY));
             h->snap.eid = raft_parse_required_uuid(p, "prev_eid");
             h->snap.term = raft_parse_required_uint64(p, "prev_term");
             h->snap.election_timer = raft_parse_optional_uint64(
@@ -455,8 +541,9 @@ raft_header_to_json(const struct raft_header *h)
     if (h->snap_index) {
         raft_put_uint64(json, "prev_index", h->snap_index);
         raft_put_uint64(json, "prev_term", h->snap.term);
-        if (h->snap.data) {
-            json_object_put(json, "prev_data", json_clone(h->snap.data));
+        if (raft_entry_has_data(&h->snap)) {
+            json_object_put(json, "prev_data",
+                json_clone(raft_entry_get_serialized_data(&h->snap)));
         }
         json_object_put_format(json, "prev_eid",
                                UUID_FMT, UUID_ARGS(&h->snap.eid));
diff --git a/ovsdb/raft-private.h b/ovsdb/raft-private.h
index a69e37e5c..c39842593 100644
--- a/ovsdb/raft-private.h
+++ b/ovsdb/raft-private.h
@@ -118,7 +118,10 @@ void raft_servers_format(const struct hmap *servers, struct ds *ds);
  * entry.  */
 struct raft_entry {
     uint64_t term;
-    struct json *data;
+    struct {
+        struct json *full_json;   /* Fully parsed JSON object. */
+        struct json *serialized;  /* JSON_SERIALIZED_OBJECT version of data. */
+    } data;
     struct uuid eid;
     struct json *servers;
     uint64_t election_timer;
@@ -130,6 +133,13 @@ struct json *raft_entry_to_json(const struct raft_entry *);
 struct ovsdb_error *raft_entry_from_json(struct json *, struct raft_entry *)
     OVS_WARN_UNUSED_RESULT;
 bool raft_entry_equals(const struct raft_entry *, const struct raft_entry *);
+bool raft_entry_has_data(const struct raft_entry *);
+void raft_entry_set_data(struct raft_entry *, const struct json *);
+void raft_entry_set_data_nocopy(struct raft_entry *, struct json *);
+void raft_entry_data_serialize(struct raft_entry *);
+struct json *raft_entry_steal_parsed_data(struct raft_entry *);
+const struct json *raft_entry_get_data(const struct raft_entry *);
+const struct json *raft_entry_get_serialized_data(const struct raft_entry *);
 
 /* On disk data serialization and deserialization. */
 
diff --git a/ovsdb/raft.c b/ovsdb/raft.c
index 2fb515651..cca551496 100644
--- a/ovsdb/raft.c
+++ b/ovsdb/raft.c
@@ -494,11 +494,11 @@ raft_create_cluster(const char *file_name, const char *name,
         .snap_index = index++,
         .snap = {
             .term = term,
-            .data = json_nullable_clone(data),
             .eid = uuid_random(),
             .servers = json_object_create(),
         },
     };
+    raft_entry_set_data(&h.snap, data);
     shash_add_nocopy(json_object(h.snap.servers),
                      xasprintf(UUID_FMT, UUID_ARGS(&h.sid)),
                      json_string_create(local_address));
@@ -727,10 +727,10 @@ raft_add_entry(struct raft *raft,
     uint64_t index = raft->log_end++;
     struct raft_entry *entry = &raft->entries[index - raft->log_start];
     entry->term = term;
-    entry->data = data;
     entry->eid = eid ? *eid : UUID_ZERO;
     entry->servers = servers;
     entry->election_timer = election_timer;
+    raft_entry_set_data_nocopy(entry, data);
     return index;
 }
 
@@ -741,13 +741,16 @@ raft_write_entry(struct raft *raft, uint64_t term, struct json *data,
                  const struct uuid *eid, struct json *servers,
                  uint64_t election_timer)
 {
+    uint64_t index = raft_add_entry(raft, term, data, eid, servers,
+                                    election_timer);
+    const struct json *entry_data = raft_entry_get_serialized_data(
+                                      &raft->entries[index - raft->log_start]);
     struct raft_record r = {
         .type = RAFT_REC_ENTRY,
         .term = term,
         .entry = {
-            .index = raft_add_entry(raft, term, data, eid, servers,
-                                    election_timer),
-            .data = data,
+            .index = index,
+            .data = CONST_CAST(struct json *, entry_data),
             .servers = servers,
             .election_timer = election_timer,
             .eid = eid ? *eid : UUID_ZERO,
@@ -2161,7 +2164,7 @@ raft_get_eid(const struct raft *raft, uint64_t index)
 {
     for (; index >= raft->log_start; index--) {
         const struct raft_entry *e = raft_get_entry(raft, index);
-        if (e->data) {
+        if (raft_entry_has_data(e)) {
             return &e->eid;
         }
     }
@@ -2826,8 +2829,8 @@ raft_truncate(struct raft *raft, uint64_t new_end)
     return servers_changed;
 }
 
-static const struct json *
-raft_peek_next_entry(struct raft *raft, struct uuid *eid)
+static const struct raft_entry *
+raft_peek_next_entry(struct raft *raft)
 {
     /* Invariant: log_start - 2 <= last_applied <= commit_index < log_end. */
     ovs_assert(raft->log_start <= raft->last_applied + 2);
@@ -2839,32 +2842,20 @@ raft_peek_next_entry(struct raft *raft, struct uuid *eid)
     }
 
     if (raft->log_start == raft->last_applied + 2) {
-        *eid = raft->snap.eid;
-        return raft->snap.data;
+        return &raft->snap;
     }
 
     while (raft->last_applied < raft->commit_index) {
         const struct raft_entry *e = raft_get_entry(raft,
                                                     raft->last_applied + 1);
-        if (e->data) {
-            *eid = e->eid;
-            return e->data;
+        if (raft_entry_has_data(e)) {
+            return e;
         }
         raft->last_applied++;
     }
     return NULL;
 }
 
-static const struct json *
-raft_get_next_entry(struct raft *raft, struct uuid *eid)
-{
-    const struct json *data = raft_peek_next_entry(raft, eid);
-    if (data) {
-        raft->last_applied++;
-    }
-    return data;
-}
-
 /* Updates commit index in raft log. If commit index is already up-to-date
  * it does nothing and return false, otherwise, returns true. */
 static bool
@@ -2878,7 +2869,7 @@ raft_update_commit_index(struct raft *raft, uint64_t new_commit_index)
         while (raft->commit_index < new_commit_index) {
             uint64_t index = ++raft->commit_index;
             const struct raft_entry *e = raft_get_entry(raft, index);
-            if (e->data) {
+            if (raft_entry_has_data(e)) {
                 struct raft_command *cmd
                     = raft_find_command_by_eid(raft, &e->eid);
                 if (cmd) {
@@ -3059,7 +3050,8 @@ raft_handle_append_entries(struct raft *raft,
     for (; i < n_entries; i++) {
         const struct raft_entry *e = &entries[i];
         error = raft_write_entry(raft, e->term,
-                                 json_nullable_clone(e->data), &e->eid,
+                                 json_nullable_clone(raft_entry_get_data(e)),
+                                 &e->eid,
                                  json_nullable_clone(e->servers),
                                  e->election_timer);
         if (error) {
@@ -3314,20 +3306,26 @@ bool
 raft_has_next_entry(const struct raft *raft_)
 {
     struct raft *raft = CONST_CAST(struct raft *, raft_);
-    struct uuid eid;
-    return raft_peek_next_entry(raft, &eid) != NULL;
+    return raft_peek_next_entry(raft) != NULL;
 }
 
 /* Returns the next log entry or snapshot from 'raft', or NULL if there are
- * none left to read.  Stores the entry ID of the log entry in '*eid'.  Stores
- * true in '*is_snapshot' if the returned data is a snapshot, false if it is a
- * log entry. */
-const struct json *
-raft_next_entry(struct raft *raft, struct uuid *eid, bool *is_snapshot)
+ * none left to read.  Stores the entry ID of the log entry in '*eid'.
+ *
+ * The caller takes ownership of the result. */
+struct json *
+raft_next_entry(struct raft *raft, struct uuid *eid)
 {
-    const struct json *data = raft_get_next_entry(raft, eid);
-    *is_snapshot = data == raft->snap.data;
-    return data;
+    const struct raft_entry *e = raft_peek_next_entry(raft);
+
+    if (!e) {
+        return NULL;
+    }
+
+    raft->last_applied++;
+    *eid = e->eid;
+
+    return raft_entry_steal_parsed_data(CONST_CAST(struct raft_entry *, e));
 }
 
 /* Returns the log index of the last-read snapshot or log entry. */
@@ -3420,6 +3418,7 @@ raft_send_install_snapshot_request(struct raft *raft,
                                    const struct raft_server *s,
                                    const char *comment)
 {
+    const struct json *data = raft_entry_get_serialized_data(&raft->snap);
     union raft_rpc rpc = {
         .install_snapshot_request = {
             .common = {
@@ -3432,7 +3431,7 @@ raft_send_install_snapshot_request(struct raft *raft,
             .last_term = raft->snap.term,
             .last_servers = raft->snap.servers,
             .last_eid = raft->snap.eid,
-            .data = raft->snap.data,
+            .data = CONST_CAST(struct json *, data),
             .election_timer = raft->election_timer, /* use latest value */
         }
     };
@@ -3998,12 +3997,13 @@ raft_write_snapshot(struct raft *raft, struct ovsdb_log *log,
     /* Write log records. */
     for (uint64_t index = new_log_start; index < raft->log_end; index++) {
         const struct raft_entry *e = &raft->entries[index - raft->log_start];
+        const struct json *log_data = raft_entry_get_serialized_data(e);
         struct raft_record r = {
             .type = RAFT_REC_ENTRY,
             .term = e->term,
             .entry = {
                 .index = index,
-                .data = e->data,
+                .data = CONST_CAST(struct json *, log_data),
                 .servers = e->servers,
                 .election_timer = e->election_timer,
                 .eid = e->eid,
@@ -4093,19 +4093,23 @@ raft_handle_install_snapshot_request__(
 
     /* Case 3: The new snapshot starts past the end of our current log, so
      * discard all of our current log. */
-    const struct raft_entry new_snapshot = {
+    struct raft_entry new_snapshot = {
         .term = rq->last_term,
-        .data = rq->data,
         .eid = rq->last_eid,
-        .servers = rq->last_servers,
+        .servers = json_clone(rq->last_servers),
         .election_timer = rq->election_timer,
     };
+    raft_entry_set_data(&new_snapshot, rq->data);
+    /* Ensure that the new snapshot contains a serialized data object. */
+    raft_entry_data_serialize(&new_snapshot);
+
     struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
                                                    &new_snapshot);
     if (error) {
         char *error_s = ovsdb_error_to_string_free(error);
         VLOG_WARN("could not save snapshot: %s", error_s);
         free(error_s);
+        raft_entry_uninit(&new_snapshot);
         return false;
     }
 
@@ -4120,7 +4124,7 @@ raft_handle_install_snapshot_request__(
     }
 
     raft_entry_uninit(&raft->snap);
-    raft_entry_clone(&raft->snap, &new_snapshot);
+    raft->snap = new_snapshot;
 
     raft_get_servers_from_log(raft, VLL_INFO);
     raft_get_election_timer_from_log(raft);
@@ -4265,11 +4269,14 @@ raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
     uint64_t new_log_start = raft->last_applied + 1;
     struct raft_entry new_snapshot = {
         .term = raft_get_term(raft, new_log_start - 1),
-        .data = json_clone(new_snapshot_data),
         .eid = *raft_get_eid(raft, new_log_start - 1),
         .servers = json_clone(raft_servers_for_index(raft, new_log_start - 1)),
         .election_timer = raft->election_timer,
     };
+    raft_entry_set_data(&new_snapshot, new_snapshot_data);
+    /* Ensure that the new snapshot contains a serialized data object. */
+    raft_entry_data_serialize(&new_snapshot);
+
     struct ovsdb_error *error = raft_save_snapshot(raft, new_log_start,
                                                    &new_snapshot);
     if (error) {
@@ -4286,6 +4293,9 @@ raft_store_snapshot(struct raft *raft, const struct json *new_snapshot_data)
     memmove(&raft->entries[0], &raft->entries[new_log_start - raft->log_start],
             (raft->log_end - new_log_start) * sizeof *raft->entries);
     raft->log_start = new_log_start;
+    /* It's a snapshot of the current database state; ovsdb-server will
+     * not read it back.  Destroy the parsed json object to save memory. */
+    json_destroy(raft_entry_steal_parsed_data(&raft->snap));
     return NULL;
 }
 
diff --git a/ovsdb/raft.h b/ovsdb/raft.h
index 3545c41c2..710862f94 100644
--- a/ovsdb/raft.h
+++ b/ovsdb/raft.h
@@ -132,8 +132,7 @@ bool raft_left(const struct raft *);
 bool raft_failed(const struct raft *);
 
 /* Reading snapshots and log entries. */
-const struct json *raft_next_entry(struct raft *, struct uuid *eid,
-                                   bool *is_snapshot);
+struct json *raft_next_entry(struct raft *, struct uuid *eid);
 bool raft_has_next_entry(const struct raft *);
 
 uint64_t raft_get_applied_index(const struct raft *);
diff --git a/ovsdb/storage.c b/ovsdb/storage.c
index d727b1eac..9e32efe58 100644
--- a/ovsdb/storage.c
+++ b/ovsdb/storage.c
@@ -268,9 +268,7 @@ ovsdb_storage_read(struct ovsdb_storage *storage,
     struct json *schema_json = NULL;
     struct json *txn_json = NULL;
     if (storage->raft) {
-        bool is_snapshot;
-        json = json_nullable_clone(
-            raft_next_entry(storage->raft, txnid, &is_snapshot));
+        json = raft_next_entry(storage->raft, txnid);
         if (!json) {
             return NULL;
         } else if (json->type != JSON_ARRAY || json->array.n != 2) {
-- 
2.31.1


