[ovs-dev] [ovsdb 7/7] jsonrpc-server: Combine notifications when connection becomes backlogged.

Ben Pfaff blp at nicira.com
Wed Apr 2 21:27:51 UTC 2014


Connections that queue up too much data, because they are monitoring a
table that is changing quickly and failing to keep up with the updates,
cause problems with buffer management.  Since commit 60533a405b2e
(jsonrpc-server: Disconnect connections that queue too much data.),
ovsdb-server has dealt with them by disconnecting the connection and
letting them start up again with a fresh copy of the database.  However,
this is not ideal, because in some situations disconnection happens
repeatedly.  For example:

     - A manager toggles a column back and forth between two or more values
       quickly (in which case the data transmitted over the monitoring
       connections grows quickly and without bound).

     - A manager repeatedly extends the contents of some column in some row
       (in which case the data transmitted over the monitoring connection
       grows as O(n**2) in the length of the column's contents).

A better way to deal with this problem is to combine updates when they are
sent to the monitoring connection, if that connection is not keeping up.
In both of the above cases, this reduces the data that must be sent to a
manageable amount.  This commit implements that approach.

Bug #1211786.
Bug #1221378.
Signed-off-by: Ben Pfaff <blp at nicira.com>
---
 ovsdb/jsonrpc-server.c |   84 ++++++++++++++++++++++++++++--------
 tests/ovsdb-server.at  |  112 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 179 insertions(+), 17 deletions(-)

diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 3e4e71e..692830c 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -27,6 +27,7 @@
 #include "ovsdb-error.h"
 #include "ovsdb-parser.h"
 #include "ovsdb.h"
+#include "poll-loop.h"
 #include "reconnect.h"
 #include "row.h"
 #include "server.h"
@@ -62,6 +63,8 @@ static bool ovsdb_jsonrpc_session_get_status(
     struct ovsdb_jsonrpc_remote_status *);
 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
+static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
+                                       struct jsonrpc_msg *);
 
 /* Triggers. */
 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
@@ -82,6 +85,8 @@ static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
     struct json_array *params,
     const struct json *request_id);
 static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
+static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
 
 /* JSON-RPC database server. */
 
@@ -437,7 +442,11 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
     ovsdb_jsonrpc_trigger_complete_done(s);
 
     if (!jsonrpc_session_get_backlog(s->js)) {
-        struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
+        struct jsonrpc_msg *msg;
+
+        ovsdb_jsonrpc_monitor_flush_all(s);
+
+        msg = jsonrpc_session_recv(s->js);
         if (msg) {
             if (msg->type == JSONRPC_REQUEST) {
                 ovsdb_jsonrpc_session_got_request(s, msg);
@@ -482,7 +491,11 @@ ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
 {
     jsonrpc_session_wait(s->js);
     if (!jsonrpc_session_get_backlog(s->js)) {
-        jsonrpc_session_recv_wait(s->js);
+        if (ovsdb_jsonrpc_monitor_needs_flush(s)) {
+            poll_immediate_wake();
+        } else {
+            jsonrpc_session_recv_wait(s->js);
+        }
     }
 }
 
@@ -698,7 +711,7 @@ ovsdb_jsonrpc_session_notify(struct ovsdb_session *session,
 
     s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
     params = json_array_create_1(json_string_create(lock_name));
-    jsonrpc_session_send(s->js, jsonrpc_create_notify(method, params));
+    ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params));
 }
 
 static struct jsonrpc_msg *
@@ -873,7 +886,7 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
 
     if (reply) {
         jsonrpc_msg_destroy(request);
-        jsonrpc_session_send(s->js, reply);
+        ovsdb_jsonrpc_session_send(s, reply);
     }
 }
 
@@ -901,6 +914,14 @@ ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
     }
     jsonrpc_msg_destroy(request);
 }
+
+static void
+ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s,
+                           struct jsonrpc_msg *msg)
+{
+    ovsdb_jsonrpc_monitor_flush_all(s); /* Monitor updates precede 'msg'. */
+    jsonrpc_session_send(s->js, msg);
+}
 
 /* JSON-RPC database server triggers.
  *
@@ -928,7 +949,7 @@ ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
 
         msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
                                    id);
-        jsonrpc_session_send(s->js, msg);
+        ovsdb_jsonrpc_session_send(s, msg);
         json_destroy(id);
         json_destroy(params);
         return;
@@ -979,7 +1000,7 @@ ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
             reply = jsonrpc_create_error(json_string_create("canceled"),
                                          t->id);
         }
-        jsonrpc_session_send(s->js, reply);
+        ovsdb_jsonrpc_session_send(s, reply);
     }
 
     json_destroy(t->id);
@@ -1639,6 +1660,46 @@ ovsdb_jsonrpc_monitor_compose_table_update(
     return json;
 }
 
+static bool
+ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
+{
+    struct ovsdb_jsonrpc_monitor *m;
+
+    HMAP_FOR_EACH (m, node, &s->monitors) {
+        struct shash_node *node;
+
+        SHASH_FOR_EACH (node, &m->tables) {
+            struct ovsdb_jsonrpc_monitor_table *mt = node->data;
+
+            if (!hmap_is_empty(&mt->changes)) {
+                return true; /* Some table has changes queued. */
+            }
+        }
+    }
+
+    return false; /* No monitor table has queued changes. */
+}
+
+static void
+ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
+{
+    struct ovsdb_jsonrpc_monitor *m;
+
+    HMAP_FOR_EACH (m, node, &s->monitors) {
+        struct json *json;
+
+        json = ovsdb_jsonrpc_monitor_compose_table_update(m, false);
+        if (json) { /* Skip 'm' if nothing was composed. */
+            struct jsonrpc_msg *msg;
+            struct json *params;
+
+            params = json_array_create_2(json_clone(m->monitor_id), json);
+            msg = jsonrpc_create_notify("update", params);
+            jsonrpc_session_send(s->js, msg); /* Direct: avoids recursion. */
+        }
+    }
+}
+
 static void
 ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
                                const struct ovsdb_jsonrpc_monitor *m)
@@ -1654,20 +1715,9 @@ ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
 {
     struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
     struct ovsdb_jsonrpc_monitor_aux aux;
-    struct json *json;
 
     ovsdb_jsonrpc_monitor_init_aux(&aux, m);
     ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
-    json = ovsdb_jsonrpc_monitor_compose_table_update(m, false);
-    if (json) {
-        struct jsonrpc_msg *msg;
-        struct json *params;
-
-        params = json_array_create_2(json_clone(aux.monitor->monitor_id),
-                                     json);
-        msg = jsonrpc_create_notify("update", params);
-        jsonrpc_session_send(aux.monitor->session->js, msg);
-    }
 
     return NULL;
 }
diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
index aee6f77..3393b94 100644
--- a/tests/ovsdb-server.at
+++ b/tests/ovsdb-server.at
@@ -38,6 +38,8 @@ cat stdout >> output
 
 EXECUTION_EXAMPLES
 
+AT_BANNER([ovsdb-server miscellaneous features])
+
 AT_SETUP([truncating corrupted database log])
 AT_KEYWORDS([ovsdb server positive unix])
 OVS_RUNDIR=`pwd`; export OVS_RUNDIR
@@ -662,6 +664,116 @@ _uuid                                name  number
 ], [], [test ! -e pid || kill `cat pid`])
 OVSDB_SERVER_SHUTDOWN
 AT_CLEANUP
+
+AT_SETUP([ovsdb-server combines updates on backlogged connections])
+OVS_LOGDIR=`pwd`; export OVS_LOGDIR
+OVS_RUNDIR=`pwd`; export OVS_RUNDIR
+ON_EXIT([kill `cat *.pid`])
+
+# The maximum socket receive buffer size is important for this test, which
+# tests behavior when the receive buffer overflows.
+if test -e /proc/sys/net/core/rmem_max; then
+    # Linux
+    rmem_max=`cat /proc/sys/net/core/rmem_max`
+elif rmem_max=`sysctl -n net.inet.tcp.recvbuf_max 2>/dev/null`; then
+    : # FreeBSD
+else
+    # Don't know how to get maximum socket receive buffer on this OS
+    AT_SKIP_IF([:])
+fi
+
+# Calculate the number of iterations we need to queue.  Each iteration,
+# by itself, yields monitor updates totaling about 25 kB, so queue enough
+# iterations to fill $rmem_max bytes, plus a few more for luck.
+n_iterations=`expr $rmem_max / 25000 + 5`
+echo rmem_max=$rmem_max n_iterations=$n_iterations
+
+# Calculate the exact number of monitor updates expected for $n_iterations,
+# assuming no updates are combined.  The "extra" update is for the initial
+# contents of the database.
+n_updates=`expr $n_iterations \* 3 + 1`
+
+# Start an ovsdb-server with the vswitchd schema.
+OVSDB_INIT([db])
+AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --log-file --remote=punix:db.sock db],
+  [0], [ignore], [ignore])
+
+# Executes a set of transactions that add a bridge with 100 ports, and
+# then deletes that bridge.  This yields three monitor updates that
+# add up to about 25 kB in size.
+#
+# The update also increments a counter held in the database so that we can
+# verify that all of the transactions took effect (e.g.
+# monitor updates at the end weren't just dropped).  We add an arbitrary
+# string to the counter to make grepping for it more reliable.
+counter=0
+trigger_big_update () {
+    counter=`expr $counter + 1`
+    ovs-vsctl --no-wait -- set open_vswitch . system_version=xyzzy$counter
+    ovs-vsctl --no-wait -- add-br br0 $add
+    ovs-vsctl --no-wait -- del-br br0
+}
+add_ports () {
+    for j in `seq 1 100`; do
+        printf " -- add-port br0 p%d" $j
+    done
+}
+add=`add_ports`
+
+AT_CAPTURE_FILE([ovsdb-client.err])
+
+# Start an ovsdb-client monitoring all changes to the database,
+# make it block to force the buffers to fill up, and then execute
+# enough iterations that ovsdb-server starts combining updates.
+AT_CHECK([ovsdb-client --detach --no-chdir --pidfile monitor ALL >ovsdb-client.out 2>ovsdb-client.err])
+AT_CHECK([ovs-appctl -t ovsdb-client ovsdb-client/block])
+for i in `seq 1 $n_iterations`; do
+    echo "blocked update ($i of $n_iterations)"
+    trigger_big_update
+done
+AT_CHECK([ovs-appctl -t ovsdb-client ovsdb-client/unblock])
+OVS_WAIT_UNTIL([grep "\"xyzzy$counter\"" ovsdb-client.out])
+AT_CHECK([ovs-appctl -t ovsdb-client exit])
+OVS_WAIT_WHILE([test -e ovsdb-client.pid])
+
+# Count the number of updates in the ovsdb-client output, by counting
+# the number of changes to the Open_vSwitch table.  (All of our
+# transactions modify the Open_vSwitch table.)  It should be less than
+# $n_updates updates.
+#
+# Check that the counter is what we expect.
+logged_updates=`grep -c '^Open_vSwitch' ovsdb-client.out`
+echo "logged_updates=$logged_updates (expected less than $n_updates)"
+AT_CHECK([test $logged_updates -lt $n_updates])
+AT_CHECK_UNQUOTED([ovs-vsctl get open_vswitch . system_version], [0],
+  ["xyzzy$counter"
+])
+
+# Start an ovsdb-client monitoring all changes to the database,
+# without making it block, and then execute the same transactions that
+# we did before.
+AT_CHECK([ovsdb-client --detach --no-chdir --pidfile monitor ALL >ovsdb-client.out 2>ovsdb-client.err])
+for i in `seq 1 $n_iterations`; do
+    echo "unblocked update ($i of $n_iterations)"
+    trigger_big_update
+
+    # Make sure that ovsdb-client gets enough CPU time to process the updates.
+    ovs-appctl -t ovsdb-client version > /dev/null
+done
+OVS_WAIT_UNTIL([grep "\"xyzzy$counter\"" ovsdb-client.out])
+AT_CHECK([ovs-appctl -t ovsdb-client exit])
+OVS_WAIT_WHILE([test -e ovsdb-client.pid])
+
+# The ovsdb-client output should have exactly $n_updates updates.
+#
+# Also check that the counter is what we expect.
+logged_updates=`grep -c '^Open_vSwitch' ovsdb-client.out`
+echo "logged_updates=$logged_updates (expected $n_updates)"
+AT_CHECK([test $logged_updates -eq $n_updates])
+AT_CHECK_UNQUOTED([ovs-vsctl get open_vswitch . system_version], [0],
+  ["xyzzy$counter"
+])
+AT_CLEANUP
 
 AT_BANNER([OVSDB -- ovsdb-server transactions (SSL IPv4 sockets)])
 
-- 
1.7.10.4




More information about the dev mailing list