[ovs-dev] [ovsdb-server multithreading RFC 1/9] ovsdb: Do not group sessions by remote.

Andy Zhou azhou at ovn.org
Thu Mar 3 08:13:20 UTC 2016


Currently, 'ovsdb_jsonrpc_session's are grouped together in a linked
list within 'ovsdb_jsonrpc_remote'. This makes sense, since most
session operations apply to the sessions within a remote.

However, in order to scale up ovsdb-server with multi-threading, it is
more convenient to distribute a session to any available thread,
regardless of which remote it is associated with.

This patch introduces a set of APIs that provide operations on
a list of sessions. Instead of being grouped by remote, sessions
are linked together through a new ovs_list field, 'all_sessions', in
the ovsdb_jsonrpc_server struct.

With multi-threading, the design is that all sessions managed
by a thread will be linked together on a thread-private
linked list. At that time, the 'all_sessions' field in the
ovsdb_jsonrpc_server struct will hold all sessions managed by
the main process.

Signed-off-by: Andy Zhou <azhou at ovn.org>
---
 ovsdb/jsonrpc-server.c | 242 ++++++++++++++++++++++++++++---------------------
 1 file changed, 140 insertions(+), 102 deletions(-)

diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 15dbc4e..56dddc6 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -1,4 +1,4 @@
-/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -52,23 +52,30 @@ static bool monitor2_enable__ = true;
 /* Message rate-limiting. */
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
+/* Session lists.  Functions taking 'remote' act on its sessions only. */
+static void ovsdb_jsonrpc_sessions_run(struct ovs_list *);
+static void ovsdb_jsonrpc_sessions_wait(struct ovs_list *);
+static void ovsdb_jsonrpc_sessions_close(struct ovs_list *,
+                                   const struct ovsdb_jsonrpc_remote *remote);
+static void ovsdb_jsonrpc_sessions_reconnect(struct ovs_list *,
+                                   const struct ovsdb_jsonrpc_remote *remote);
+static void ovsdb_jsonrpc_sessions_set_options(struct ovs_list *,
+                                   const struct ovsdb_jsonrpc_remote *remote,
+                                   const struct ovsdb_jsonrpc_options *);
+static size_t ovsdb_jsonrpc_sessions_count(const struct ovs_list *,
+                                   const struct ovsdb_jsonrpc_remote *remote);
+
 /* Sessions. */
 static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
     struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
-static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
-static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
-static void ovsdb_jsonrpc_session_get_memory_usage_all(
-    const struct ovsdb_jsonrpc_remote *, struct simap *usage);
-static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
-static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
-static void ovsdb_jsonrpc_session_set_all_options(
-    struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
 static bool ovsdb_jsonrpc_active_session_get_status(
-    const struct ovsdb_jsonrpc_remote *,
+    const struct ovsdb_jsonrpc_remote *remote,
     struct ovsdb_jsonrpc_remote_status *);
 static void ovsdb_jsonrpc_session_get_status(
     const struct ovsdb_jsonrpc_session *,
     struct ovsdb_jsonrpc_remote_status *);
+static void ovsdb_jsonrpc_session_get_memory_usage_all(
+    const struct ovsdb_jsonrpc_server *, struct simap *usage);
 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
 static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
@@ -106,15 +113,22 @@ struct ovsdb_jsonrpc_server {
     struct ovsdb_server up;
     unsigned int n_sessions;
     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
+    struct ovs_list all_sessions; /* All 'ovsdb_jsonrpc_session's.   */
 };
 
+/* Returns the 'struct ovsdb_jsonrpc_server' that embeds 's' as its 'up'
+ * member.  The caller must ensure that 's' really is embedded in one. */
+static struct ovsdb_jsonrpc_server *
+ovsdb_jsonrpc_server_cast(struct ovsdb_server *s) {
+    return CONTAINER_OF(s, struct ovsdb_jsonrpc_server, up);
+}
+
 /* A configured remote.  This is either a passive stream listener plus a list
  * of the currently connected sessions, or a list of exactly one active
  * session. */
 struct ovsdb_jsonrpc_remote {
     struct ovsdb_jsonrpc_server *server;
     struct pstream *listener;   /* Listener, if passive. */
-    struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
     uint8_t dscp;
 };
 
@@ -134,6 +148,7 @@ ovsdb_jsonrpc_server_create(void)
     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
     ovsdb_server_init(&server->up);
     shash_init(&server->remotes);
+    list_init(&server->all_sessions);
     return server;
 }
 
@@ -232,7 +247,8 @@ ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
             }
         }
 
-        ovsdb_jsonrpc_session_set_all_options(remote, options);
+        ovsdb_jsonrpc_sessions_set_options(&svr->all_sessions, remote,
+                                           options);
     }
 }
 
@@ -254,7 +270,6 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
     remote = xmalloc(sizeof *remote);
     remote->server = svr;
     remote->listener = listener;
-    list_init(&remote->sessions);
     remote->dscp = options->dscp;
     shash_add(&svr->remotes, name, remote);
 
@@ -268,10 +283,11 @@ static void
 ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
 {
     struct ovsdb_jsonrpc_remote *remote = node->data;
+    struct ovsdb_jsonrpc_server *server = remote->server;
 
-    ovsdb_jsonrpc_session_close_all(remote);
+    ovsdb_jsonrpc_sessions_close(&server->all_sessions, remote);
     pstream_close(remote->listener);
-    shash_delete(&remote->server->remotes, node);
+    shash_delete(&server->remotes, node);
     free(remote);
 }
 
@@ -297,9 +313,12 @@ ovsdb_jsonrpc_server_get_remote_status(
     }
 
     if (remote->listener) {
+        int n_connections = ovsdb_jsonrpc_sessions_count(&svr->all_sessions,
+                                                         remote);
+
         status->bound_port = pstream_get_bound_port(remote->listener);
-        status->is_connected = !list_is_empty(&remote->sessions);
-        status->n_connections = list_size(&remote->sessions);
+        status->n_connections = n_connections;
+        status->is_connected = (n_connections != 0);
         return true;
     }
 
@@ -325,7 +344,7 @@ ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
     SHASH_FOR_EACH (node, &svr->remotes) {
         struct ovsdb_jsonrpc_remote *remote = node->data;
 
-        ovsdb_jsonrpc_session_reconnect_all(remote);
+        ovsdb_jsonrpc_sessions_reconnect(&svr->all_sessions, remote);
     }
 }
 
@@ -354,7 +373,9 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
             }
         }
-
-        ovsdb_jsonrpc_session_run_all(remote);
     }
+
+    /* All sessions share one server-wide list, so run it once per call
+     * instead of once per remote. */
+    ovsdb_jsonrpc_sessions_run(&svr->all_sessions);
 }
 
@@ -370,7 +389,9 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
             pstream_wait(remote->listener);
         }
-
-        ovsdb_jsonrpc_session_wait_all(remote);
     }
+
+    /* All sessions share one server-wide list, so wait on it once per call
+     * instead of once per remote. */
+    ovsdb_jsonrpc_sessions_wait(&svr->all_sessions);
 }
 
@@ -380,14 +399,8 @@ void
 ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
                                       struct simap *usage)
 {
-    struct shash_node *node;
-
     simap_increase(usage, "sessions", svr->n_sessions);
-    SHASH_FOR_EACH (node, &svr->remotes) {
-        struct ovsdb_jsonrpc_remote *remote = node->data;
-
-        ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage);
-    }
+    ovsdb_jsonrpc_session_get_memory_usage_all(svr, usage);
 }
 
 /* JSON-RPC database server session. */
@@ -423,17 +436,18 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
                              struct jsonrpc_session *js)
 {
     struct ovsdb_jsonrpc_session *s;
+    struct ovsdb_jsonrpc_server *server = remote->server;
 
     s = xzalloc(sizeof *s);
-    ovsdb_session_init(&s->up, &remote->server->up);
+    ovsdb_session_init(&s->up, &server->up);
     s->remote = remote;
-    list_push_back(&remote->sessions, &s->node);
+    list_push_back(&server->all_sessions, &s->node);
     hmap_init(&s->triggers);
     hmap_init(&s->monitors);
     s->js = js;
     s->js_seqno = jsonrpc_session_get_seqno(js);
 
-    remote->server->n_sessions++;
+    server->n_sessions++;
 
     return s;
 }
@@ -441,6 +455,8 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
 static void
 ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
 {
+    struct ovsdb_jsonrpc_server *server;
+
     ovsdb_jsonrpc_monitor_remove_all(s);
     ovsdb_jsonrpc_session_unlock_all(s);
     ovsdb_jsonrpc_trigger_complete_all(s);
@@ -450,7 +466,8 @@ ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
 
     jsonrpc_session_close(s->js);
     list_remove(&s->node);
-    s->remote->server->n_sessions--;
+    server = ovsdb_jsonrpc_server_cast(s->up.server);
+    server->n_sessions--;
     ovsdb_session_destroy(&s->up);
     free(s);
 }
@@ -501,19 +518,6 @@ ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
 }
 
 static void
-ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
-{
-    struct ovsdb_jsonrpc_session *s, *next;
-
-    LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
-        int error = ovsdb_jsonrpc_session_run(s);
-        if (error) {
-            ovsdb_jsonrpc_session_close(s);
-        }
-    }
-}
-
-static void
 ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
 {
     jsonrpc_session_wait(s->js);
@@ -527,16 +531,6 @@ ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
 }
 
 static void
-ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
-{
-    struct ovsdb_jsonrpc_session *s;
-
-    LIST_FOR_EACH (s, node, &remote->sessions) {
-        ovsdb_jsonrpc_session_wait(s);
-    }
-}
-
-static void
 ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
                                        struct simap *usage)
 {
@@ -546,65 +540,22 @@ ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
 
 static void
 ovsdb_jsonrpc_session_get_memory_usage_all(
-    const struct ovsdb_jsonrpc_remote *remote,
-    struct simap *usage)
+    const struct ovsdb_jsonrpc_server *svr, struct simap *usage)
 {
     struct ovsdb_jsonrpc_session *s;
 
-    LIST_FOR_EACH (s, node, &remote->sessions) {
+    LIST_FOR_EACH (s, node, &svr->all_sessions) {
         ovsdb_jsonrpc_session_get_memory_usage(s, usage);
     }
 }
 
-static void
-ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
-{
-    struct ovsdb_jsonrpc_session *s, *next;
-
-    LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
-        ovsdb_jsonrpc_session_close(s);
-    }
-}
-
-/* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
- * reconnect. */
-static void
-ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
-{
-    struct ovsdb_jsonrpc_session *s, *next;
-
-    LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
-        jsonrpc_session_force_reconnect(s->js);
-        if (!jsonrpc_session_is_alive(s->js)) {
-            ovsdb_jsonrpc_session_close(s);
-        }
-    }
-}
-
-/* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
- * 'options'.
- *
- * (The dscp value can't be changed directly; the caller must instead close and
- * re-open the session.) */
-static void
-ovsdb_jsonrpc_session_set_all_options(
-    struct ovsdb_jsonrpc_remote *remote,
-    const struct ovsdb_jsonrpc_options *options)
-{
-    struct ovsdb_jsonrpc_session *s;
-
-    LIST_FOR_EACH (s, node, &remote->sessions) {
-        ovsdb_jsonrpc_session_set_options(s, options);
-    }
-}
-
 /* Sets the 'status' of for the 'remote' with an outgoing connection.   */
 static bool
 ovsdb_jsonrpc_active_session_get_status(
     const struct ovsdb_jsonrpc_remote *remote,
     struct ovsdb_jsonrpc_remote_status *status)
 {
-    const struct ovs_list *sessions = &remote->sessions;
+    const struct ovs_list *sessions = &remote->server->all_sessions;
     const struct ovsdb_jsonrpc_session *s;
 
     if (list_is_empty(sessions)) {
@@ -764,8 +715,7 @@ ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
     }
 
     /* Get the lock, add us as a waiter. */
-    waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
-                               &victim);
+    waiter = ovsdb_server_lock(s->up.server, &s->up, lock_name, mode, &victim);
     if (victim) {
         ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
     }
@@ -1399,3 +1349,98 @@ ovsdb_jsonrpc_disable_monitor2(void)
     /* Once disabled, it is not possible to re-enable it. */
     monitor2_enable__ = false;
 }
+
+/* Runs each session on 'sessions'.  A session whose run reports an error is
+ * closed, which also unlinks it from the list, hence the _SAFE iteration. */
+static void
+ovsdb_jsonrpc_sessions_run(struct ovs_list *sessions)
+{
+    struct ovsdb_jsonrpc_session *s, *next;
+
+    LIST_FOR_EACH_SAFE (s, next, node, sessions) {
+        int error = ovsdb_jsonrpc_session_run(s);
+        if (error) {
+            ovsdb_jsonrpc_session_close(s);
+        }
+    }
+}
+
+/* Calls ovsdb_jsonrpc_session_wait() on each session on 'sessions'. */
+static void
+ovsdb_jsonrpc_sessions_wait(struct ovs_list *sessions)
+{
+    struct ovsdb_jsonrpc_session *s;
+
+    LIST_FOR_EACH (s, node, sessions) {
+        ovsdb_jsonrpc_session_wait(s);
+    }
+}
+
+/* Closes each session on 'sessions' that belongs to 'remote'; sessions of
+ * other remotes are left alone. */
+static void
+ovsdb_jsonrpc_sessions_close(struct ovs_list *sessions,
+                             const struct ovsdb_jsonrpc_remote *remote)
+{
+    struct ovsdb_jsonrpc_session *s, *next;
+
+    LIST_FOR_EACH_SAFE (s, next, node, sessions) {
+        if (s->remote == remote) {
+            ovsdb_jsonrpc_session_close(s);
+        }
+    }
+}
+
+/* Forces each JSON-RPC session on 'sessions' that belongs to 'remote' to
+ * disconnect and reconnect.  A session that is no longer alive afterward is
+ * closed. */
+static void
+ovsdb_jsonrpc_sessions_reconnect(struct ovs_list *sessions,
+                                 const struct ovsdb_jsonrpc_remote *remote)
+{
+    struct ovsdb_jsonrpc_session *s, *next;
+
+    LIST_FOR_EACH_SAFE (s, next, node, sessions) {
+        if (s->remote == remote) {
+            jsonrpc_session_force_reconnect(s->js);
+            if (!jsonrpc_session_is_alive(s->js)) {
+                ovsdb_jsonrpc_session_close(s);
+            }
+        }
+    }
+}
+
+/* Returns the number of sessions on 'sessions' that belong to 'remote'. */
+static size_t
+ovsdb_jsonrpc_sessions_count(const struct ovs_list *sessions,
+                             const struct ovsdb_jsonrpc_remote *remote)
+{
+    struct ovsdb_jsonrpc_session *s;
+    size_t count = 0;
+
+    LIST_FOR_EACH (s, node, sessions) {
+        if (s->remote == remote) {
+            count++;
+        }
+    }
+    return count;
+}
+
+/* Sets the options for each JSON-RPC session on 'sessions' that belongs to
+ * 'remote' to 'options'.
+ *
+ * (The dscp value can't be changed directly; the caller must instead close and
+ * re-open the session.) */
+static void
+ovsdb_jsonrpc_sessions_set_options(struct ovs_list *sessions,
+                                   const struct ovsdb_jsonrpc_remote *remote,
+                                   const struct ovsdb_jsonrpc_options *options)
+{
+    struct ovsdb_jsonrpc_session *s;
+
+    LIST_FOR_EACH (s, node, sessions) {
+        if (s->remote == remote) {
+            ovsdb_jsonrpc_session_set_options(s, options);
+        }
+    }
+}
-- 
1.9.1




More information about the dev mailing list