[ovs-dev] [PATCH 3/4] ovsdb-server: Factor out complication by using jsonrpc_session.

Justin Pettit jpettit at nicira.com
Wed Dec 23 03:16:28 UTC 2009


Looks good.

--Justin


On Dec 18, 2009, at 1:47 PM, Ben Pfaff wrote:

> ---
> lib/jsonrpc.c          |   46 ++++++++++++++-
> lib/jsonrpc.h          |    2 +
> lib/ovsdb-idl.c        |    4 +-
> ovsdb/jsonrpc-server.c |  153 +++++++++++-------------------------------------
> 4 files changed, 81 insertions(+), 124 deletions(-)
> 
> diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
> index bd019f7..0f53515 100644
> --- a/lib/jsonrpc.c
> +++ b/lib/jsonrpc.c
> @@ -630,6 +630,9 @@ struct jsonrpc_session {
>     unsigned int seqno;
> };
> 
> +/* Creates and returns a jsonrpc_session that connects and reconnects, with
> + * back-off, to 'name', which should be a string acceptable to
> + * stream_open(). */
> struct jsonrpc_session *
> jsonrpc_session_open(const char *name)
> {
> @@ -646,6 +649,25 @@ jsonrpc_session_open(const char *name)
>     return s;
> }
> 
> +/* Creates and returns a jsonrpc_session that is initially connected to
> + * 'jsonrpc'.  If the connection is dropped, it will not be reconnected. */
> +struct jsonrpc_session *
> +jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc)
> +{
> +    struct jsonrpc_session *s;
> +
> +    s = xmalloc(sizeof *s);
> +    s->reconnect = reconnect_create(time_msec());
> +    reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
> +    reconnect_set_max_tries(s->reconnect, 0);
> +    reconnect_connected(s->reconnect, time_msec());
> +    s->rpc = jsonrpc;
> +    s->stream = NULL;
> +    s->seqno = 0;
> +
> +    return s;
> +}
> +
> void
> jsonrpc_session_close(struct jsonrpc_session *s)
> {
> @@ -767,14 +789,28 @@ jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
> struct jsonrpc_msg *
> jsonrpc_session_recv(struct jsonrpc_session *s)
> {
> -    struct jsonrpc_msg *msg = NULL;
>     if (s->rpc) {
> +        struct jsonrpc_msg *msg;
>         jsonrpc_recv(s->rpc, &msg);
>         if (msg) {
>             reconnect_received(s->reconnect, time_msec());
> +            if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
> +                /* Echo request.  Send reply. */
> +                struct jsonrpc_msg *reply;
> +
> +                reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
> +                jsonrpc_session_send(s, reply);
> +            } else if (msg->type == JSONRPC_REPLY
> +                && msg->id && msg->id->type == JSON_STRING
> +                && !strcmp(msg->id->u.string, "echo")) {
> +                /* It's a reply to our echo request.  Suppress it. */
> +            } else {
> +                return msg;
> +            }
> +            jsonrpc_msg_destroy(msg);
>         }
>     }
> -    return msg;
> +    return NULL;
> }
> 
> void
> @@ -786,6 +822,12 @@ jsonrpc_session_recv_wait(struct jsonrpc_session *s)
> }
> 
> bool
> +jsonrpc_session_is_alive(const struct jsonrpc_session *s)
> +{
> +    return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
> +}
> +
> +bool
> jsonrpc_session_is_connected(const struct jsonrpc_session *s)
> {
>     return s->rpc != NULL;
> diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
> index 93ac2e8..ae8b9de 100644
> --- a/lib/jsonrpc.h
> +++ b/lib/jsonrpc.h
> @@ -86,6 +86,7 @@ struct json *jsonrpc_msg_to_json(struct jsonrpc_msg *);
> /* A JSON-RPC session with reconnection. */
> 
> struct jsonrpc_session *jsonrpc_session_open(const char *name);
> +struct jsonrpc_session *jsonrpc_session_open_unreliably(struct jsonrpc *);
> void jsonrpc_session_close(struct jsonrpc_session *);
> 
> void jsonrpc_session_run(struct jsonrpc_session *);
> @@ -98,6 +99,7 @@ int jsonrpc_session_send(struct jsonrpc_session *, struct jsonrpc_msg *);
> struct jsonrpc_msg *jsonrpc_session_recv(struct jsonrpc_session *);
> void jsonrpc_session_recv_wait(struct jsonrpc_session *);
> 
> +bool jsonrpc_session_is_alive(const struct jsonrpc_session *);
> bool jsonrpc_session_is_connected(const struct jsonrpc_session *);
> unsigned int jsonrpc_session_get_seqno(const struct jsonrpc_session *);
> void jsonrpc_session_force_reconnect(struct jsonrpc_session *);
> diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c
> index 877fa3e..29d1d0c 100644
> --- a/lib/ovsdb-idl.c
> +++ b/lib/ovsdb-idl.c
> @@ -244,9 +244,7 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
>         }
> 
>         reply = NULL;
> -        if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
> -            reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
> -        } else if (msg->type == JSONRPC_NOTIFY
> +        if (msg->type == JSONRPC_NOTIFY
>                    && !strcmp(msg->method, "update")
>                    && msg->params->type == JSON_ARRAY
>                    && msg->params->u.array.n == 2
> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
> index a8e724a..0bf1137 100644
> --- a/ovsdb/jsonrpc-server.c
> +++ b/ovsdb/jsonrpc-server.c
> @@ -160,15 +160,12 @@ struct ovsdb_jsonrpc_session {
>     /* Monitors. */
>     struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
> 
> -    /* Connecting and reconnecting. */
> -    struct reconnect *reconnect; /* For back-off. */
> -    bool active;                /* Active or passive connection? */
> -    struct jsonrpc *rpc;
> -    struct stream *stream;      /* Only if active == false and rpc == NULL. */
> +    /* Network connectivity. */
> +    struct jsonrpc_session *js;  /* JSON-RPC session. */
> +    unsigned int js_seqno;       /* Last jsonrpc_session_get_seqno() value. */
> };
> 
> static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
> -static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
> static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
> static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
> static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
> @@ -178,7 +175,7 @@ static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
> 
> static struct ovsdb_jsonrpc_session *
> ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
> -                             const char *name, bool active)
> +                             struct jsonrpc_session *js)
> {
>     struct ovsdb_jsonrpc_session *s;
> 
> @@ -188,10 +185,8 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
>     hmap_init(&s->triggers);
>     hmap_init(&s->monitors);
>     list_init(&s->completions);
> -    s->reconnect = reconnect_create(time_msec());
> -    reconnect_set_name(s->reconnect, name);
> -    reconnect_enable(s->reconnect, time_msec());
> -    s->active = active;
> +    s->js = js;
> +    s->js_seqno = jsonrpc_session_get_seqno(js);
> 
>     svr->n_sessions++;
> 
> @@ -202,132 +197,54 @@ static void
> ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *svr,
>                                     const char *name)
> {
> -    ovsdb_jsonrpc_session_create(svr, name, true);
> +    ovsdb_jsonrpc_session_create(svr, jsonrpc_session_open(name));
> }
> 
> static void
> ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *svr,
>                                      struct stream *stream)
> {
> -    struct ovsdb_jsonrpc_session *s;
> -
> -    s = ovsdb_jsonrpc_session_create(svr, stream_get_name(stream), false);
> -    reconnect_connected(s->reconnect, time_msec());
> -    s->rpc = jsonrpc_open(stream);
> +    ovsdb_jsonrpc_session_create(
> +        svr, jsonrpc_session_open_unreliably(jsonrpc_open(stream)));
> }
> 
> static void
> ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
> {
> -    ovsdb_jsonrpc_session_disconnect(s);
> +    jsonrpc_session_close(s->js);
>     list_remove(&s->node);
>     s->server->n_sessions--;
> }
> 
> -static void
> -ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
> +static int
> +ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
> {
> -    reconnect_disconnected(s->reconnect, time_msec(), 0);
> -    if (s->rpc) {
> -        jsonrpc_error(s->rpc, EOF);
> +    jsonrpc_session_run(s->js);
> +    if (s->js_seqno != jsonrpc_session_get_seqno(s->js)) {
> +        s->js_seqno = jsonrpc_session_get_seqno(s->js);
>         ovsdb_jsonrpc_trigger_complete_all(s);
>         ovsdb_jsonrpc_monitor_remove_all(s);
> -        jsonrpc_close(s->rpc);
> -        s->rpc = NULL;
> -    } else if (s->stream) {
> -        stream_close(s->stream);
> -        s->stream = NULL;
>     }
> -}
> 
> -static void
> -ovsdb_jsonrpc_session_connect(struct ovsdb_jsonrpc_session *s)
> -{
> -    ovsdb_jsonrpc_session_disconnect(s);
> -    if (s->active) {
> -        int error = stream_open(reconnect_get_name(s->reconnect), &s->stream);
> -        if (error) {
> -            reconnect_connect_failed(s->reconnect, time_msec(), error);
> -        } else {
> -            reconnect_connecting(s->reconnect, time_msec());
> -        }
> -    }
> -}
> -
> -static int
> -ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
> -{
> -    if (s->rpc) {
> -        struct jsonrpc_msg *msg;
> -        int error;
> +    ovsdb_jsonrpc_trigger_complete_done(s);
> 
> -        jsonrpc_run(s->rpc);
> -
> -        ovsdb_jsonrpc_trigger_complete_done(s);
> -
> -        if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
> -            reconnect_received(s->reconnect, time_msec());
> +    if (!jsonrpc_session_get_backlog(s->js)) {
> +        struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
> +        if (msg) {
>             if (msg->type == JSONRPC_REQUEST) {
>                 ovsdb_jsonrpc_session_got_request(s, msg);
>             } else if (msg->type == JSONRPC_NOTIFY) {
>                 ovsdb_jsonrpc_session_got_notify(s, msg);
> -            } else if (msg->type == JSONRPC_REPLY
> -                       && msg->id && msg->id->type == JSON_STRING
> -                       && !strcmp(msg->id->u.string, "echo")) {
> -                /* It's a reply to our echo request.  Ignore it. */
>             } else {
>                 VLOG_WARN("%s: received unexpected %s message",
> -                          jsonrpc_get_name(s->rpc),
> +                          jsonrpc_session_get_name(s->js),
>                           jsonrpc_msg_type_to_string(msg->type));
> -                jsonrpc_error(s->rpc, EPROTO);
> +                jsonrpc_session_force_reconnect(s->js);
>                 jsonrpc_msg_destroy(msg);
>             }
>         }
> -
> -        error = jsonrpc_get_status(s->rpc);
> -        if (error) {
> -            if (s->active) {
> -                ovsdb_jsonrpc_session_disconnect(s);
> -            } else {
> -                return error;
> -            }
> -        }
> -    } else if (s->stream) {
> -        int error = stream_connect(s->stream);
> -        if (!error) {
> -            reconnect_connected(s->reconnect, time_msec());
> -            s->rpc = jsonrpc_open(s->stream);
> -            s->stream = NULL;
> -        } else if (error != EAGAIN) {
> -            reconnect_connect_failed(s->reconnect, time_msec(), error);
> -            stream_close(s->stream);
> -            s->stream = NULL;
> -        }
>     }
> -
> -    switch (reconnect_run(s->reconnect, time_msec())) {
> -    case RECONNECT_CONNECT:
> -        ovsdb_jsonrpc_session_connect(s);
> -        break;
> -
> -    case RECONNECT_DISCONNECT:
> -        ovsdb_jsonrpc_session_disconnect(s);
> -        break;
> -
> -    case RECONNECT_PROBE:
> -        if (s->rpc) {
> -            struct json *params;
> -            struct jsonrpc_msg *request;
> -
> -            params = json_array_create_empty();
> -            request = jsonrpc_create_request("echo", params, NULL);
> -            json_destroy(request->id);
> -            request->id = json_string_create("echo");
> -            jsonrpc_send(s->rpc, request);
> -        }
> -        break;
> -    }
> -    return s->active || s->rpc ? 0 : ETIMEDOUT;
> +    return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
> }
> 
> static void
> @@ -347,15 +264,10 @@ ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
> static void
> ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
> {
> -    if (s->rpc) {
> -        jsonrpc_wait(s->rpc);
> -        if (!jsonrpc_get_backlog(s->rpc)) {
> -            jsonrpc_recv_wait(s->rpc);
> -        }
> -    } else if (s->stream) {
> -        stream_connect_wait(s->stream);
> +    jsonrpc_session_wait(s->js);
> +    if (!jsonrpc_session_get_backlog(s->js)) {
> +        jsonrpc_session_recv_wait(s->js);
>     }
> -    reconnect_wait(s->reconnect, time_msec());
> }
> 
> static void
> @@ -404,7 +316,7 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
> 
>     if (reply) {
>         jsonrpc_msg_destroy(request);
> -        jsonrpc_send(s->rpc, reply);
> +        jsonrpc_session_send(s->js, reply);
>     }
> }
> 
> @@ -456,8 +368,11 @@ ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
>     hash = json_hash(id, 0);
>     t = ovsdb_jsonrpc_trigger_find(s, id, hash);
>     if (t) {
> -        jsonrpc_send(s->rpc, jsonrpc_create_error(
> -                         json_string_create("duplicate request ID"), id));
> +        struct jsonrpc_msg *msg;
> +
> +        msg = jsonrpc_create_error(json_string_create("duplicate request ID"),
> +                                   id);
> +        jsonrpc_session_send(s->js, msg);
>         json_destroy(id);
>         json_destroy(params);
>         return;
> @@ -499,7 +414,7 @@ ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
> {
>     struct ovsdb_jsonrpc_session *s = t->session;
> 
> -    if (s->rpc && !jsonrpc_get_status(s->rpc)) {
> +    if (jsonrpc_session_is_connected(s->js)) {
>         struct jsonrpc_msg *reply;
>         struct json *result;
> 
> @@ -510,7 +425,7 @@ ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
>             reply = jsonrpc_create_error(json_string_create("canceled"),
>                                          t->id);
>         }
> -        jsonrpc_send(s->rpc, reply);
> +        jsonrpc_session_send(s->js, reply);
>     }
> 
>     json_destroy(t->id);
> @@ -891,7 +806,7 @@ ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
>         params = json_array_create_2(json_clone(aux.monitor->monitor_id),
>                                      aux.json);
>         msg = jsonrpc_create_notify("update", params);
> -        jsonrpc_send(aux.monitor->session->rpc, msg);
> +        jsonrpc_session_send(aux.monitor->session->js, msg);
>     }
> 
>     return NULL;
> -- 
> 1.6.3.3
> 
> 
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev_openvswitch.org





More information about the dev mailing list