[ovs-dev] [PATCH 3/4] ovsdb-server: Factor out complication by using jsonrpc_session.
Justin Pettit
jpettit at nicira.com
Wed Dec 23 03:16:28 UTC 2009
Looks good.
--Justin
On Dec 18, 2009, at 1:47 PM, Ben Pfaff wrote:
> ---
> lib/jsonrpc.c | 46 ++++++++++++++-
> lib/jsonrpc.h | 2 +
> lib/ovsdb-idl.c | 4 +-
> ovsdb/jsonrpc-server.c | 153 +++++++++++-------------------------------------
> 4 files changed, 81 insertions(+), 124 deletions(-)
>
> diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
> index bd019f7..0f53515 100644
> --- a/lib/jsonrpc.c
> +++ b/lib/jsonrpc.c
> @@ -630,6 +630,9 @@ struct jsonrpc_session {
> unsigned int seqno;
> };
>
> +/* Creates and returns a jsonrpc_session that connects and reconnects, with
> + * back-off, to 'name', which should be a string acceptable to
> + * stream_open(). */
> struct jsonrpc_session *
> jsonrpc_session_open(const char *name)
> {
> @@ -646,6 +649,25 @@ jsonrpc_session_open(const char *name)
> return s;
> }
>
> +/* Creates and returns a jsonrpc_session that is initially connected to
> + * 'jsonrpc'. If the connection is dropped, it will not be reconnected. */
> +struct jsonrpc_session *
> +jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
> +{
> + struct jsonrpc_session *s;
> +
> + s = xmalloc(sizeof *s);
> + s->reconnect = reconnect_create(time_msec());
> + reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
> + reconnect_set_max_tries(s->reconnect, 0);
> + reconnect_connected(s->reconnect, time_msec());
> + s->rpc = jsonrpc;
> + s->stream = NULL;
> + s->seqno = 0;
> +
> + return s;
> +}
> +
> void
> jsonrpc_session_close(struct jsonrpc_session *s)
> {
> @@ -767,14 +789,28 @@ jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
> struct jsonrpc_msg *
> jsonrpc_session_recv(struct jsonrpc_session *s)
> {
> - struct jsonrpc_msg *msg = NULL;
> if (s->rpc) {
> + struct jsonrpc_msg *msg;
> jsonrpc_recv(s->rpc, &msg);
> if (msg) {
> reconnect_received(s->reconnect, time_msec());
> + if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
> + /* Echo request. Send reply. */
> + struct jsonrpc_msg *reply;
> +
> + reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
> + jsonrpc_session_send(s, reply);
> + } else if (msg->type == JSONRPC_REPLY
> + && msg->id && msg->id->type == JSON_STRING
> + && !strcmp(msg->id->u.string, "echo")) {
> + /* It's a reply to our echo request. Suppress it. */
> + } else {
> + return msg;
> + }
> + jsonrpc_msg_destroy(msg);
> }
> }
> - return msg;
> + return NULL;
> }
>
> void
> @@ -786,6 +822,12 @@ jsonrpc_session_recv_wait(struct jsonrpc_session *s)
> }
>
> bool
> +jsonrpc_session_is_alive(const struct jsonrpc_session *s)
> +{
> + return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
> +}
> +
> +bool
> jsonrpc_session_is_connected(const struct jsonrpc_session *s)
> {
> return s->rpc != NULL;
> diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
> index 93ac2e8..ae8b9de 100644
> --- a/lib/jsonrpc.h
> +++ b/lib/jsonrpc.h
> @@ -86,6 +86,7 @@ struct json *jsonrpc_msg_to_json(struct jsonrpc_msg *);
> /* A JSON-RPC session with reconnection. */
>
> struct jsonrpc_session *jsonrpc_session_open(const char *name);
> +struct jsonrpc_session *jsonrpc_session_open_unreliably(struct jsonrpc *);
> void jsonrpc_session_close(struct jsonrpc_session *);
>
> void jsonrpc_session_run(struct jsonrpc_session *);
> @@ -98,6 +99,7 @@ int jsonrpc_session_send(struct jsonrpc_session *, struct jsonrpc_msg *);
> struct jsonrpc_msg *jsonrpc_session_recv(struct jsonrpc_session *);
> void jsonrpc_session_recv_wait(struct jsonrpc_session *);
>
> +bool jsonrpc_session_is_alive(const struct jsonrpc_session *);
> bool jsonrpc_session_is_connected(const struct jsonrpc_session *);
> unsigned int jsonrpc_session_get_seqno(const struct jsonrpc_session *);
> void jsonrpc_session_force_reconnect(struct jsonrpc_session *);
> diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c
> index 877fa3e..29d1d0c 100644
> --- a/lib/ovsdb-idl.c
> +++ b/lib/ovsdb-idl.c
> @@ -244,9 +244,7 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
> }
>
> reply = NULL;
> - if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
> - reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
> - } else if (msg->type == JSONRPC_NOTIFY
> + if (msg->type == JSONRPC_NOTIFY
> && !strcmp(msg->method, "update")
> && msg->params->type == JSON_ARRAY
> && msg->params->u.array.n == 2
> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
> index a8e724a..0bf1137 100644
> --- a/ovsdb/jsonrpc-server.c
> +++ b/ovsdb/jsonrpc-server.c
> @@ -160,15 +160,12 @@ struct ovsdb_jsonrpc_session {
> /* Monitors. */
> struct hmap monitors; /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
>
> - /* Connecting and reconnecting. */
> - struct reconnect *reconnect; /* For back-off. */
> - bool active; /* Active or passive connection? */
> - struct jsonrpc *rpc;
> - struct stream *stream; /* Only if active == false and rpc == NULL. */
> + /* Network connectivity. */
> + struct jsonrpc_session *js; /* JSON-RPC session. */
> + unsigned int js_seqno; /* Last jsonrpc_session_get_seqno() value. */
> };
>
> static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
> -static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
> static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
> static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
> static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
> @@ -178,7 +175,7 @@ static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
>
> static struct ovsdb_jsonrpc_session *
> ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
> - const char *name, bool active)
> + struct jsonrpc_session *js)
> {
> struct ovsdb_jsonrpc_session *s;
>
> @@ -188,10 +185,8 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
> hmap_init(&s->triggers);
> hmap_init(&s->monitors);
> list_init(&s->completions);
> - s->reconnect = reconnect_create(time_msec());
> - reconnect_set_name(s->reconnect, name);
> - reconnect_enable(s->reconnect, time_msec());
> - s->active = active;
> + s->js = js;
> + s->js_seqno = jsonrpc_session_get_seqno(js);
>
> svr->n_sessions++;
>
> @@ -202,132 +197,54 @@ static void
> ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
> const char *name)
> {
> - ovsdb_jsonrpc_session_create(svr, name, true);
> + ovsdb_jsonrpc_session_create(svr, jsonrpc_session_open(name));
> }
>
> static void
> ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
> struct stream *stream)
> {
> - struct ovsdb_jsonrpc_session *s;
> -
> - s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false);
> - reconnect_connected(s->reconnect, time_msec());
> - s->rpc = jsonrpc_open(stream);
> + ovsdb_jsonrpc_session_create(
> + svr, jsonrpc_session_open_unreliably(jsonrpc_open(stream)));
> }
>
> static void
> ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
> {
> - ovsdb_jsonrpc_session_disconnect(s);
> + jsonrpc_session_close(s->js);
> list_remove(&s->node);
> s->server->n_sessions--;
> }
>
> -static void
> -ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
> +static int
> +ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
> {
> - reconnect_disconnected(s->reconnect, time_msec(), 0);
> - if (s->rpc) {
> - jsonrpc_error(s->rpc, EOF);
> + jsonrpc_session_run(s->js);
> + if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
> + s->js_seqno = jsonrpc_session_get_seqno(s->js);
> ovsdb_jsonrpc_trigger_complete_all(s);
> ovsdb_jsonrpc_monitor_remove_all(s);
> - jsonrpc_close(s->rpc);
> - s->rpc = NULL;
> - } else if (s->stream) {
> - stream_close(s->stream);
> - s->stream = NULL;
> }
> -}
>
> -static void
> -ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
> -{
> - ovsdb_jsonrpc_session_disconnect(s);
> - if (s->active) {
> - int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
> - if (error) {
> - reconnect_connect_failed(s->reconnect, time_msec(), error);
> - } else {
> - reconnect_connecting(s->reconnect, time_msec());
> - }
> - }
> -}
> -
> -static int
> -ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
> -{
> - if (s->rpc) {
> - struct jsonrpc_msg *msg;
> - int error;
> + ovsdb_jsonrpc_trigger_complete_done(s);
>
> - jsonrpc_run(s->rpc);
> -
> - ovsdb_jsonrpc_trigger_complete_done(s);
> -
> - if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
> - reconnect_received(s->reconnect, time_msec());
> + if (!jsonrpc_session_get_backlog(s->js)) {
> + struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
> + if (msg) {
> if (msg->type == JSONRPC_REQUEST) {
> ovsdb_jsonrpc_session_got_request(s, msg);
> } else if (msg->type == JSONRPC_NOTIFY) {
> ovsdb_jsonrpc_session_got_notify(s, msg);
> - } else if (msg->type == JSONRPC_REPLY
> - && msg->id && msg->id->type == JSON_STRING
> - && !strcmp(msg->id->u.string, "echo")) {
> - /* It's a reply to our echo request. Ignore it. */
> } else {
> VLOG_WARN("%s: received unexpected %s message",
> - jsonrpc_get_name(s->rpc),
> + jsonrpc_session_get_name(s->js),
> jsonrpc_msg_type_to_string(msg->type));
> - jsonrpc_error(s->rpc, EPROTO);
> + jsonrpc_session_force_reconnect(s->js);
> jsonrpc_msg_destroy(msg);
> }
> }
> -
> - error = jsonrpc_get_status(s->rpc);
> - if (error) {
> - if (s->active) {
> - ovsdb_jsonrpc_session_disconnect(s);
> - } else {
> - return error;
> - }
> - }
> - } else if (s->stream) {
> - int error = stream_connect(s->stream);
> - if (!error) {
> - reconnect_connected(s->reconnect, time_msec());
> - s->rpc = jsonrpc_open(s->stream);
> - s->stream = NULL;
> - } else if (error != EAGAIN) {
> - reconnect_connect_failed(s->reconnect, time_msec(), error);
> - stream_close(s->stream);
> - s->stream = NULL;
> - }
> }
> -
> - switch (reconnect_run(s->reconnect, time_msec())) {
> - case RECONNECT_CONNECT:
> - ovsdb_jsonrpc_session_connect(s);
> - break;
> -
> - case RECONNECT_DISCONNECT:
> - ovsdb_jsonrpc_session_disconnect(s);
> - break;
> -
> - case RECONNECT_PROBE:
> - if (s->rpc) {
> - struct json *params;
> - struct jsonrpc_msg *request;
> -
> - params = json_array_create_empty();
> - request = jsonrpc_create_request("echo", params, NULL);
> - json_destroy(request->id);
> - request->id = json_string_create("echo");
> - jsonrpc_send(s->rpc, request);
> - }
> - break;
> - }
> - return s->active || s->rpc ? 0 : ETIMEDOUT;
> + return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
> }
>
> static void
> @@ -347,15 +264,10 @@ ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
> static void
> ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
> {
> - if (s->rpc) {
> - jsonrpc_wait(s->rpc);
> - if (!jsonrpc_get_backlog(s->rpc)) {
> - jsonrpc_recv_wait(s->rpc);
> - }
> - } else if (s->stream) {
> - stream_connect_wait(s->stream);
> + jsonrpc_session_wait(s->js);
> + if (!jsonrpc_session_get_backlog(s->js)) {
> + jsonrpc_session_recv_wait(s->js);
> }
> - reconnect_wait(s->reconnect, time_msec());
> }
>
> static void
> @@ -404,7 +316,7 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
>
> if (reply) {
> jsonrpc_msg_destroy(request);
> - jsonrpc_send(s->rpc, reply);
> + jsonrpc_session_send(s->js, reply);
> }
> }
>
> @@ -456,8 +368,11 @@ ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
> hash = json_hash(id, 0);
> t = ovsdb_jsonrpc_trigger_find(s, id, hash);
> if (t) {
> - jsonrpc_send(s->rpc, jsonrpc_create_error(
> - json_string_create("duplicate request ID"), id));
> + struct jsonrpc_msg *msg;
> +
> + msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
> + id);
> + jsonrpc_session_send(s->js, msg);
> json_destroy(id);
> json_destroy(params);
> return;
> @@ -499,7 +414,7 @@ ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
> {
> struct ovsdb_jsonrpc_session *s = t->session;
>
> - if (s->rpc && !jsonrpc_get_status(s->rpc)) {
> + if (jsonrpc_session_is_connected(s->js)) {
> struct jsonrpc_msg *reply;
> struct json *result;
>
> @@ -510,7 +425,7 @@ ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
> reply = jsonrpc_create_error(json_string_create("canceled"),
> t->id);
> }
> - jsonrpc_send(s->rpc, reply);
> + jsonrpc_session_send(s->js, reply);
> }
>
> json_destroy(t->id);
> @@ -891,7 +806,7 @@ ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
> params = json_array_create_2(json_clone(aux.monitor->monitor_id),
> aux.json);
> msg = jsonrpc_create_notify("update", params);
> - jsonrpc_send(aux.monitor->session->rpc, msg);
> + jsonrpc_session_send(aux.monitor->session->js, msg);
> }
>
> return NULL;
> --
> 1.6.3.3
>
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev_openvswitch.org
More information about the dev mailing list