[ovs-dev] [ovsdb-server multithreading RFC 1/9] ovsdb: Do not group sessions by remote.
Andy Zhou
azhou at ovn.org
Thu Mar 3 08:13:20 UTC 2016
Currently ovsdb_jsonrpc_session are grouped together in a linked
list within 'ovsdb_jsonrpc_remote'. This makes sense since most
session operations applies to sessions within a remote.
However, in order to scale up ovsdb-server with multi-threading, it is
more convenient to distribute a sessions to any thread available,
regardless which remote it is associated with.
This patch introduces a set of APIs that provide operations on
a list of sessions. Instead of group sessions by remote, they
are linked together in a new ovs_list field 'all_sessions' in the
ovsdb_jsonrpc_server struct.
With multi-threading, the design is that all sessions managed
by a thread will have them linked together on a thread private
linked list. At that time, the 'all_sessions' field in
ovsdb_jsonrpc_server struct will have all session managed
the main process.
Signed-off-by: Andy Zhou <azhou at ovn.org>
---
ovsdb/jsonrpc-server.c | 242 ++++++++++++++++++++++++++++---------------------
1 file changed, 140 insertions(+), 102 deletions(-)
diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
index 15dbc4e..56dddc6 100644
--- a/ovsdb/jsonrpc-server.c
+++ b/ovsdb/jsonrpc-server.c
@@ -1,4 +1,4 @@
-/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -52,23 +52,30 @@ static bool monitor2_enable__ = true;
/* Message rate-limiting. */
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+/* Session set. */
+static void ovsdb_jsonrpc_sessions_run(struct ovs_list *);
+static void ovsdb_jsonrpc_sessions_wait(struct ovs_list *);
+static void ovsdb_jsonrpc_sessions_close(struct ovs_list *,
+ const struct ovsdb_jsonrpc_remote *remote);
+static void ovsdb_jsonrpc_sessions_reconnect( struct ovs_list *,
+ const struct ovsdb_jsonrpc_remote *remote);
+static void ovsdb_jsonrpc_sessions_set_options(struct ovs_list *,
+ const struct ovsdb_jsonrpc_remote *remote,
+ const struct ovsdb_jsonrpc_options *);
+static size_t ovsdb_jsonrpc_sessions_count(const struct ovs_list *,
+ const struct ovsdb_jsonrpc_remote *remote);
+
/* Sessions. */
static struct ovsdb_jsonrpc_session *ovsdb_jsonrpc_session_create(
struct ovsdb_jsonrpc_remote *, struct jsonrpc_session *);
-static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *);
-static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *);
-static void ovsdb_jsonrpc_session_get_memory_usage_all(
- const struct ovsdb_jsonrpc_remote *, struct simap *usage);
-static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
-static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
-static void ovsdb_jsonrpc_session_set_all_options(
- struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
static bool ovsdb_jsonrpc_active_session_get_status(
- const struct ovsdb_jsonrpc_remote *,
+ const struct ovsdb_jsonrpc_remote *remote,
struct ovsdb_jsonrpc_remote_status *);
static void ovsdb_jsonrpc_session_get_status(
const struct ovsdb_jsonrpc_session *,
struct ovsdb_jsonrpc_remote_status *);
+static void ovsdb_jsonrpc_session_get_memory_usage_all(
+ const struct ovsdb_jsonrpc_server *, struct simap *usage);
static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
@@ -106,15 +113,22 @@ struct ovsdb_jsonrpc_server {
struct ovsdb_server up;
unsigned int n_sessions;
struct shash remotes; /* Contains "struct ovsdb_jsonrpc_remote *"s. */
+ struct ovs_list all_sessions; /* All 'ovsdb_jsonrpc_session's. */
};
+/* Cast an 'ovsdb_server' pointer down into an ovsdb_jsonrpc_server pinter.
+ * Caller needs to make sure this conversion is valid. */
+static struct ovsdb_jsonrpc_server *
+ovsdb_jsonrpc_server_cast(struct ovsdb_server *s) {
+ return CONTAINER_OF(s, struct ovsdb_jsonrpc_server, up);
+}
+
/* A configured remote. This is either a passive stream listener plus a list
* of the currently connected sessions, or a list of exactly one active
* session. */
struct ovsdb_jsonrpc_remote {
struct ovsdb_jsonrpc_server *server;
struct pstream *listener; /* Listener, if passive. */
- struct ovs_list sessions; /* List of "struct ovsdb_jsonrpc_session"s. */
uint8_t dscp;
};
@@ -134,6 +148,7 @@ ovsdb_jsonrpc_server_create(void)
struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
ovsdb_server_init(&server->up);
shash_init(&server->remotes);
+ list_init(&server->all_sessions);
return server;
}
@@ -232,7 +247,8 @@ ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
}
}
- ovsdb_jsonrpc_session_set_all_options(remote, options);
+ ovsdb_jsonrpc_sessions_set_options(&svr->all_sessions, remote,
+ options);
}
}
@@ -254,7 +270,6 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
remote = xmalloc(sizeof *remote);
remote->server = svr;
remote->listener = listener;
- list_init(&remote->sessions);
remote->dscp = options->dscp;
shash_add(&svr->remotes, name, remote);
@@ -268,10 +283,11 @@ static void
ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
{
struct ovsdb_jsonrpc_remote *remote = node->data;
+ struct ovsdb_jsonrpc_server *server = remote->server;
- ovsdb_jsonrpc_session_close_all(remote);
+ ovsdb_jsonrpc_sessions_close(&server->all_sessions, remote);
pstream_close(remote->listener);
- shash_delete(&remote->server->remotes, node);
+ shash_delete(&server->remotes, node);
free(remote);
}
@@ -297,9 +313,12 @@ ovsdb_jsonrpc_server_get_remote_status(
}
if (remote->listener) {
+ int n_connections = ovsdb_jsonrpc_sessions_count(&svr->all_sessions,
+ remote);
+
status->bound_port = pstream_get_bound_port(remote->listener);
- status->is_connected = !list_is_empty(&remote->sessions);
- status->n_connections = list_size(&remote->sessions);
+ status->n_connections = n_connections;
+ status->is_connected = (n_connections != 0);
return true;
}
@@ -325,7 +344,7 @@ ovsdb_jsonrpc_server_reconnect(struct ovsdb_jsonrpc_server *svr)
SHASH_FOR_EACH (node, &svr->remotes) {
struct ovsdb_jsonrpc_remote *remote = node->data;
- ovsdb_jsonrpc_session_reconnect_all(remote);
+ ovsdb_jsonrpc_sessions_reconnect(&svr->all_sessions, remote);
}
}
@@ -354,7 +373,7 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
}
}
- ovsdb_jsonrpc_session_run_all(remote);
+ ovsdb_jsonrpc_sessions_run(&svr->all_sessions);
}
}
@@ -370,7 +389,7 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
pstream_wait(remote->listener);
}
- ovsdb_jsonrpc_session_wait_all(remote);
+ ovsdb_jsonrpc_sessions_wait(&svr->all_sessions);
}
}
@@ -380,14 +399,8 @@ void
ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
struct simap *usage)
{
- struct shash_node *node;
-
simap_increase(usage, "sessions", svr->n_sessions);
- SHASH_FOR_EACH (node, &svr->remotes) {
- struct ovsdb_jsonrpc_remote *remote = node->data;
-
- ovsdb_jsonrpc_session_get_memory_usage_all(remote, usage);
- }
+ ovsdb_jsonrpc_session_get_memory_usage_all(svr, usage);
}
/* JSON-RPC database server session. */
@@ -423,17 +436,18 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
struct jsonrpc_session *js)
{
struct ovsdb_jsonrpc_session *s;
+ struct ovsdb_jsonrpc_server *server = remote->server;
s = xzalloc(sizeof *s);
- ovsdb_session_init(&s->up, &remote->server->up);
+ ovsdb_session_init(&s->up, &server->up);
s->remote = remote;
- list_push_back(&remote->sessions, &s->node);
+ list_push_back(&server->all_sessions, &s->node);
hmap_init(&s->triggers);
hmap_init(&s->monitors);
s->js = js;
s->js_seqno = jsonrpc_session_get_seqno(js);
- remote->server->n_sessions++;
+ server->n_sessions++;
return s;
}
@@ -441,6 +455,8 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
static void
ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
{
+ struct ovsdb_jsonrpc_server *server;
+
ovsdb_jsonrpc_monitor_remove_all(s);
ovsdb_jsonrpc_session_unlock_all(s);
ovsdb_jsonrpc_trigger_complete_all(s);
@@ -450,7 +466,8 @@ ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
jsonrpc_session_close(s->js);
list_remove(&s->node);
- s->remote->server->n_sessions--;
+ server = ovsdb_jsonrpc_server_cast(s->up.server);
+ server->n_sessions--;
ovsdb_session_destroy(&s->up);
free(s);
}
@@ -501,19 +518,6 @@ ovsdb_jsonrpc_session_set_options(struct ovsdb_jsonrpc_session *session,
}
static void
-ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_remote *remote)
-{
- struct ovsdb_jsonrpc_session *s, *next;
-
- LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
- int error = ovsdb_jsonrpc_session_run(s);
- if (error) {
- ovsdb_jsonrpc_session_close(s);
- }
- }
-}
-
-static void
ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
{
jsonrpc_session_wait(s->js);
@@ -527,16 +531,6 @@ ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
}
static void
-ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_remote *remote)
-{
- struct ovsdb_jsonrpc_session *s;
-
- LIST_FOR_EACH (s, node, &remote->sessions) {
- ovsdb_jsonrpc_session_wait(s);
- }
-}
-
-static void
ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
struct simap *usage)
{
@@ -546,65 +540,22 @@ ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
static void
ovsdb_jsonrpc_session_get_memory_usage_all(
- const struct ovsdb_jsonrpc_remote *remote,
- struct simap *usage)
+ const struct ovsdb_jsonrpc_server *svr, struct simap *usage)
{
struct ovsdb_jsonrpc_session *s;
- LIST_FOR_EACH (s, node, &remote->sessions) {
+ LIST_FOR_EACH (s, node, &svr->all_sessions) {
ovsdb_jsonrpc_session_get_memory_usage(s, usage);
}
}
-static void
-ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *remote)
-{
- struct ovsdb_jsonrpc_session *s, *next;
-
- LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
- ovsdb_jsonrpc_session_close(s);
- }
-}
-
-/* Forces all of the JSON-RPC sessions managed by 'remote' to disconnect and
- * reconnect. */
-static void
-ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
-{
- struct ovsdb_jsonrpc_session *s, *next;
-
- LIST_FOR_EACH_SAFE (s, next, node, &remote->sessions) {
- jsonrpc_session_force_reconnect(s->js);
- if (!jsonrpc_session_is_alive(s->js)) {
- ovsdb_jsonrpc_session_close(s);
- }
- }
-}
-
-/* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
- * 'options'.
- *
- * (The dscp value can't be changed directly; the caller must instead close and
- * re-open the session.) */
-static void
-ovsdb_jsonrpc_session_set_all_options(
- struct ovsdb_jsonrpc_remote *remote,
- const struct ovsdb_jsonrpc_options *options)
-{
- struct ovsdb_jsonrpc_session *s;
-
- LIST_FOR_EACH (s, node, &remote->sessions) {
- ovsdb_jsonrpc_session_set_options(s, options);
- }
-}
-
/* Sets the 'status' of for the 'remote' with an outgoing connection. */
static bool
ovsdb_jsonrpc_active_session_get_status(
const struct ovsdb_jsonrpc_remote *remote,
struct ovsdb_jsonrpc_remote_status *status)
{
- const struct ovs_list *sessions = &remote->sessions;
+ const struct ovs_list *sessions = &remote->server->all_sessions;
const struct ovsdb_jsonrpc_session *s;
if (list_is_empty(sessions)) {
@@ -764,8 +715,7 @@ ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
}
/* Get the lock, add us as a waiter. */
- waiter = ovsdb_server_lock(&s->remote->server->up, &s->up, lock_name, mode,
- &victim);
+ waiter = ovsdb_server_lock(s->up.server, &s->up, lock_name, mode, &victim);
if (victim) {
ovsdb_jsonrpc_session_notify(victim, lock_name, "stolen");
}
@@ -1399,3 +1349,91 @@ ovsdb_jsonrpc_disable_monitor2(void)
/* Once disabled, it is not possible to re-enable it. */
monitor2_enable__ = false;
}
+
+static void
+ovsdb_jsonrpc_sessions_run(struct ovs_list *sessions)
+{
+ struct ovsdb_jsonrpc_session *s, *next;
+
+ LIST_FOR_EACH_SAFE (s, next, node, sessions) {
+ int error = ovsdb_jsonrpc_session_run(s);
+ if (error) {
+ ovsdb_jsonrpc_session_close(s);
+ }
+ }
+}
+
+static void
+ovsdb_jsonrpc_sessions_wait(struct ovs_list *sessions)
+{
+ struct ovsdb_jsonrpc_session *s;
+
+ LIST_FOR_EACH (s, node, sessions) {
+ ovsdb_jsonrpc_session_wait(s);
+ }
+}
+
+static void
+ovsdb_jsonrpc_sessions_close(struct ovs_list *sessions,
+ const struct ovsdb_jsonrpc_remote *remote)
+{
+ struct ovsdb_jsonrpc_session *s, *next;
+
+ LIST_FOR_EACH_SAFE (s, next, node, sessions) {
+ if (s->remote == remote) {
+ ovsdb_jsonrpc_session_close(s);
+ }
+ }
+}
+
+/* Forces all of the JSON-RPC sessions to disconnect and
+ * reconnect. */
+static void
+ovsdb_jsonrpc_sessions_reconnect(struct ovs_list *sessions,
+ const struct ovsdb_jsonrpc_remote *remote)
+{
+ struct ovsdb_jsonrpc_session *s, *next;
+
+ LIST_FOR_EACH_SAFE (s, next, node, sessions) {
+ if (s->remote == remote) {
+ jsonrpc_session_force_reconnect(s->js);
+ if (!jsonrpc_session_is_alive(s->js)) {
+ ovsdb_jsonrpc_session_close(s);
+ }
+ }
+ }
+}
+
+static size_t
+ovsdb_jsonrpc_sessions_count(const struct ovs_list *sessions,
+ const struct ovsdb_jsonrpc_remote *remote)
+{
+ struct ovsdb_jsonrpc_session *s = NULL;
+ size_t count = 0;
+
+ LIST_FOR_EACH (s, node, sessions) {
+ if (s->remote == remote) {
+ count++;
+ }
+ }
+ return count;
+}
+
+/* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
+ * 'options'.
+ *
+ * (The dscp value can't be changed directly; the caller must instead close and
+ * re-open the session.) */
+static void
+ovsdb_jsonrpc_sessions_set_options(struct ovs_list *sessions,
+ const struct ovsdb_jsonrpc_remote *remote,
+ const struct ovsdb_jsonrpc_options *options)
+{
+ struct ovsdb_jsonrpc_session *s;
+
+ LIST_FOR_EACH (s, node, sessions) {
+ if (s->remote == remote) {
+ ovsdb_jsonrpc_session_set_options(s, options);
+ }
+ }
+}
--
1.9.1
More information about the dev
mailing list