[ovs-dev] [PATCH v4 3/3] ovsdb-idl: Break into two layers.

Dumitru Ceara dceara at redhat.com
Fri Jan 8 13:59:24 UTC 2021


On 12/19/20 3:44 AM, Ben Pfaff wrote:
> This change breaks the IDL into two layers: the IDL proper, whose
> interface to its client is unchanged, and a low-level library called
> the OVSDB "client synchronization" (CS) library.  There are two
> reasons for this change.  First, the IDL is big and complicated and
> I think that this change factors out some of that complication into
> a simpler lower layer.  Second, the OVN northd implementation based
> on DDlog can benefit from the client synchronization library even
> though it would actually be made increasingly complicated by the IDL.
> 
> Signed-off-by: Ben Pfaff <blp at ovn.org>
> ---

Hi Ben,

Overall this looks OK to me.

OVS & OVN unit tests pass with this series applied.

There are, however, a couple of memory leaks, and I have another small
comment inline.

>  lib/ovsdb-cs.c           | 1955 ++++++++++++++++++++++++++++++++++
>  lib/ovsdb-cs.h           |  141 ++-
>  lib/ovsdb-idl-provider.h |    8 +-
>  lib/ovsdb-idl.c          | 2161 +++++++++-----------------------------
>  4 files changed, 2563 insertions(+), 1702 deletions(-)
> 
> diff --git a/lib/ovsdb-cs.c b/lib/ovsdb-cs.c
> index f37aa5b04414..285b164108b2 100644
> --- a/lib/ovsdb-cs.c
> +++ b/lib/ovsdb-cs.c
> @@ -39,6 +39,1947 @@
>  #include "uuid.h"
>  
>  VLOG_DEFINE_THIS_MODULE(ovsdb_cs);
> +
> +/* Connection state machine.
> + *
> + * When a JSON-RPC session connects, the CS layer sends a "get_schema"
> + * request for the _Server database and transitions to the
> + * CS_S_SERVER_SCHEMA_REQUESTED state.  If the session drops and
> + * reconnects, or if the CS receives a "monitor_canceled" notification for a
> + * table it is monitoring, the CS starts over again in the same way. */
> +#define OVSDB_CS_STATES                                                 \
> +    /* Waits for "get_schema" reply, then sends "monitor_cond"          \
> +     * request for the Database table in the _Server database, whose    \
> +     * details are informed by the schema, and transitions to           \
> +     * CS_S_SERVER_MONITOR_REQUESTED. */                                \
> +    OVSDB_CS_STATE(SERVER_SCHEMA_REQUESTED)                             \
> +                                                                        \
> +    /* Waits for "monitor_cond" reply for the Database table:           \
> +     *                                                                  \
> +     * - If the reply indicates success, and the Database table has a   \
> +     *   row for the CS database:                                       \
> +     *                                                                  \
> +     *   * If the row indicates that this is a clustered database       \
> +     *     that is not connected to the cluster, closes the             \
> +     *     connection.  The next connection attempt has a chance at     \
> +     *     picking a connected server.                                  \
> +     *                                                                  \
> +     *   * Otherwise, sends a monitoring request for the CS             \
> +     *     database whose details are informed by the schema            \
> +     *     (obtained from the row), and transitions to                  \
> +     *     CS_S_DATA_MONITOR_(COND_(SINCE_))REQUESTED.                  \
> +     *                                                                  \
> +     * - If the reply indicates success, but the Database table does    \
> +     *   not have a row for the CS database, transitions to             \
> +     *   CS_S_ERROR.                                                    \
> +     *                                                                  \
> +     * - If the reply indicates failure, sends a "get_schema" request   \
> +     *   for the CS database and transitions to                         \
> +     *   CS_S_DATA_SCHEMA_REQUESTED. */                                 \
> +    OVSDB_CS_STATE(SERVER_MONITOR_REQUESTED)                            \
> +                                                                        \
> +    /* Waits for "get_schema" reply, then sends "monitor_cond"          \
> +     * request whose details are informed by the schema, and            \
> +     * transitions to CS_S_DATA_MONITOR_COND_REQUESTED. */              \
> +    OVSDB_CS_STATE(DATA_SCHEMA_REQUESTED)                               \
> +                                                                        \
> +    /* Waits for "monitor_cond_since" reply.  If successful, replaces   \
> +     * the CS contents by the data carried in the reply and             \
> +     * transitions to CS_S_MONITORING.  On failure, sends a             \
> +     * "monitor_cond" request and transitions to                        \
> +     * CS_S_DATA_MONITOR_COND_REQUESTED. */                             \
> +    OVSDB_CS_STATE(DATA_MONITOR_COND_SINCE_REQUESTED)                   \
> +                                                                        \
> +    /* Waits for "monitor_cond" reply.  If successful, replaces the     \
> +     * CS contents by the data carried in the reply and transitions     \
> +     * to CS_S_MONITORING.  On failure, sends a "monitor" request       \
> +     * and transitions to CS_S_DATA_MONITOR_REQUESTED. */               \
> +    OVSDB_CS_STATE(DATA_MONITOR_COND_REQUESTED)                         \
> +                                                                        \
> +    /* Waits for "monitor" reply.  If successful, replaces the CS       \
> +     * contents by the data carried in the reply and transitions to     \
> +     * CS_S_MONITORING.  On failure, transitions to CS_S_ERROR. */      \
> +    OVSDB_CS_STATE(DATA_MONITOR_REQUESTED)                              \
> +                                                                        \
> +    /* State that processes "update", "update2" or "update3"            \
> +     * notifications for the main database (and the Database table      \
> +     * in _Server if available).                                        \
> +     *                                                                  \
> +     * If we're monitoring the Database table and we get notified       \
> +     * that the CS database has been deleted, we close the              \
> +     * connection (which will restart the state machine). */            \
> +    OVSDB_CS_STATE(MONITORING)                                          \
> +                                                                        \
> +    /* Terminal error state that indicates that nothing useful can be   \
> +     * done, for example because the database server doesn't actually   \
> +     * have the desired database.  We maintain the session with the     \
> +     * database server anyway.  If it starts serving the database       \
> +     * that we want, or if someone fixes and restarts the database,     \
> +     * then it will kill the session and we will automatically          \
> +     * reconnect and try again. */                                      \
> +    OVSDB_CS_STATE(ERROR)                                               \
> +                                                                        \
> +    /* Terminal state that indicates we connected to a useless server   \
> +     * in a cluster, e.g. one that is partitioned from the rest of      \
> +     * the cluster. We're waiting to retry. */                          \
> +    OVSDB_CS_STATE(RETRY)
> +
> +enum ovsdb_cs_state {
> +#define OVSDB_CS_STATE(NAME) CS_S_##NAME,
> +    OVSDB_CS_STATES
> +#undef OVSDB_CS_STATE
> +};
> +
> +static const char *
> +ovsdb_cs_state_to_string(enum ovsdb_cs_state state)
> +{
> +    switch (state) {
> +#define OVSDB_CS_STATE(NAME) case CS_S_##NAME: return #NAME;
> +        OVSDB_CS_STATES
> +#undef OVSDB_CS_STATE
> +    default: return "<unknown>";
> +    }
> +}
> +
> +/* A database being monitored.
> + *
> + * There are two instances of this data structure for each CS instance, one for
> + * the _Server database used for working with clusters, and the other one for
> + * the actual database that the client is interested in.  */
> +struct ovsdb_cs_db {
> +    struct ovsdb_cs *cs;
> +
> +    /* Data. */
> +    const char *db_name;        /* Database's name. */
> +    struct hmap tables;         /* Contains "struct ovsdb_cs_db_table *"s.*/
> +    struct json *monitor_id;
> +    struct json *schema;
> +
> +    /* Monitor version. */
> +    int max_version;            /* Maximum version of monitor request to use. */
> +    int monitor_version;        /* 0 if not monitoring, 1=monitor,
> +                                 * 2=monitor_cond, 3=monitor_cond_since. */
> +
> +    /* Condition changes. */
> +    bool cond_changed;          /* Change not yet sent to server? */
> +    unsigned int cond_seqno;    /* Increments when condition changes. */
> +
> +    /* Database locking. */
> +    char *lock_name;            /* Name of lock we need, NULL if none. */
> +    bool has_lock;              /* Has db server told us we have the lock? */
> +    bool is_lock_contended;     /* Has db server told us we can't get lock? */
> +    struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
> +
> +    /* Last db txn id, used for fast resync through monitor_cond_since */
> +    struct uuid last_id;
> +
> +    /* Client interface. */
> +    struct ovs_list events;
> +    const struct ovsdb_cs_ops *ops;
> +    void *ops_aux;
> +};
> +
> +static const struct ovsdb_cs_ops ovsdb_cs_server_ops;
> +
> +static void ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *);
> +static unsigned int ovsdb_cs_db_set_condition(
> +    struct ovsdb_cs_db *, const char *db_name, const struct json *condition);
> +
> +static void ovsdb_cs_send_schema_request(struct ovsdb_cs *,
> +                                          struct ovsdb_cs_db *);
> +static void ovsdb_cs_send_db_change_aware(struct ovsdb_cs *);
> +static bool ovsdb_cs_check_server_db(struct ovsdb_cs *);
> +static void ovsdb_cs_clear_server_rows(struct ovsdb_cs *);
> +static void ovsdb_cs_send_monitor_request(struct ovsdb_cs *,
> +                                          struct ovsdb_cs_db *, int version);
> +static void ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db);
> +static void ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db);
> +
> +struct ovsdb_cs {
> +    struct ovsdb_cs_db server;
> +    struct ovsdb_cs_db data;
> +
> +    /* Session state.
> +     *
> +     * 'state_seqno' is a snapshot of the session's sequence number as returned
> +     * by jsonrpc_session_get_seqno(session), so if it differs from the value that
> +     * function currently returns then the session has reconnected and the
> +     * state machine must restart.  */
> +    struct jsonrpc_session *session; /* Connection to the server. */
> +    char *remote;                    /* 'session' remote name. */
> +    enum ovsdb_cs_state state;       /* Current session state. */
> +    unsigned int state_seqno;        /* See above. */
> +    struct json *request_id;         /* JSON ID for request awaiting reply. */
> +
> +    /* IDs of outstanding transactions. */
> +    struct json **txns;
> +    size_t n_txns, allocated_txns;
> +
> +    /* Info for the _Server database. */
> +    struct uuid cid;
> +    struct hmap server_rows;
> +
> +    /* Clustered servers. */
> +    uint64_t min_index;      /* Minimum allowed index, to avoid regression. */
> +    bool leader_only;        /* If true, do not connect to Raft followers. */
> +    bool shuffle_remotes;    /* If true, connect to servers in random order. */
> +};
> +
> +static void ovsdb_cs_transition_at(struct ovsdb_cs *, enum ovsdb_cs_state,
> +                                    const char *where);
> +#define ovsdb_cs_transition(CS, STATE) \
> +    ovsdb_cs_transition_at(CS, STATE, OVS_SOURCE_LOCATOR)
> +
> +static void ovsdb_cs_retry_at(struct ovsdb_cs *, const char *where);
> +#define ovsdb_cs_retry(CS) ovsdb_cs_retry_at(CS, OVS_SOURCE_LOCATOR)
> +
> +static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
> +
> +static void ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *,
> +                                            const struct json *result,
> +                                            int version);
> +static bool ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *,
> +                                         const struct jsonrpc_msg *);
> +static bool ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *,
> +                                              struct ovsdb_cs_db *,
> +                                              const struct jsonrpc_msg *);
> +
> +static bool ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *,
> +                                              const struct jsonrpc_msg *);
> +static struct jsonrpc_msg *ovsdb_cs_db_compose_lock_request(
> +    struct ovsdb_cs_db *);
> +static struct jsonrpc_msg *ovsdb_cs_db_compose_unlock_request(
> +    struct ovsdb_cs_db *);
> +static void ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *,
> +                                          const struct json *);
> +static bool ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *,
> +                                           const struct json *params,
> +                                           bool new_has_lock);
> +static void ovsdb_cs_send_cond_change(struct ovsdb_cs *);
> +
> +static bool ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *,
> +                                          const struct jsonrpc_msg *reply);
> +

> +/* Events. */
> +
> +void
> +ovsdb_cs_event_destroy(struct ovsdb_cs_event *event)
> +{
> +    if (event) {
> +        switch (event->type) {
> +        case OVSDB_CS_EVENT_TYPE_RECONNECT:
> +        case OVSDB_CS_EVENT_TYPE_LOCKED:
> +            break;
> +
> +        case OVSDB_CS_EVENT_TYPE_UPDATE:
> +            json_destroy(event->update.table_updates);
> +            break;
> +
> +        case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
> +            jsonrpc_msg_destroy(event->txn_reply);
> +            break;
> +        }
> +        free(event);
> +    }
> +}
> +

> +/* Lifecycle. */
> +
> +static void
> +ovsdb_cs_db_init(struct ovsdb_cs_db *db, const char *db_name,
> +                 struct ovsdb_cs *parent, int max_version,
> +                 const struct ovsdb_cs_ops *ops, void *ops_aux)
> +{
> +    *db = (struct ovsdb_cs_db) {
> +        .cs = parent,
> +        .db_name = db_name,
> +        .tables = HMAP_INITIALIZER(&db->tables),
> +        .max_version = max_version,
> +        .monitor_id = json_array_create_2(json_string_create("monid"),
> +                                          json_string_create(db_name)),
> +        .events = OVS_LIST_INITIALIZER(&db->events),
> +        .ops = ops,
> +        .ops_aux = ops_aux,
> +    };
> +}
> +
> +/* Creates and returns a new client synchronization object.  The connection
> + * will monitor remote database 'db_name'.  (Whether to reconnect when the
> + * connection fails is controlled by the 'retry' argument to
> + * ovsdb_cs_set_remote(), not here.)
> + *
> + * XXX 'max_version' should ordinarily be 3, to allow use of the most efficient
> + * "monitor_cond_since" method with the database.  Currently there's some kind
> + * of bug in the DDlog Rust code that interfaces to that, so instead
> + * ovn-northd-ddlog passes 1 to use plain 'monitor' instead.  Once the DDlog
> + * Rust code gets fixed, we might as well just delete 'max_version'
> + * entirely.
> + *
> + * 'ops' is a struct for northd_cs_run() to use, and 'ops_aux' is a pointer
> + * that gets passed into each call.
> + *
> + * Use ovsdb_cs_set_remote() to configure the database to which to connect.
> + * Until a remote is configured, no data can be retrieved.
> + */
> +struct ovsdb_cs *
> +ovsdb_cs_create(const char *db_name, int max_version,
> +                const struct ovsdb_cs_ops *ops, void *ops_aux)
> +{
> +    struct ovsdb_cs *cs = xzalloc(sizeof *cs);
> +    ovsdb_cs_db_init(&cs->server, "_Server", cs, 2, &ovsdb_cs_server_ops, cs);
> +    ovsdb_cs_db_init(&cs->data, db_name, cs, max_version, ops, ops_aux);
> +    cs->state_seqno = UINT_MAX;
> +    cs->request_id = NULL;
> +    cs->leader_only = true;
> +    cs->shuffle_remotes = true;
> +    hmap_init(&cs->server_rows);
> +
> +    return cs;
> +}
> +
> +static void
> +ovsdb_cs_db_destroy(struct ovsdb_cs_db *db)
> +{
> +    ovsdb_cs_db_destroy_tables(db);
> +
> +    json_destroy(db->monitor_id);
> +    json_destroy(db->schema);
> +
> +    free(db->lock_name);
> +
> +    json_destroy(db->lock_request_id);
> +
> +    /* This list always gets flushed out at the end of ovsdb_cs_run(). */
> +    ovs_assert(ovs_list_is_empty(&db->events));
> +}
> +
> +/* Destroys 'cs' and all of the data structures that it manages. */
> +void
> +ovsdb_cs_destroy(struct ovsdb_cs *cs)
> +{
> +    if (cs) {
> +        ovsdb_cs_db_destroy(&cs->server);
> +        ovsdb_cs_db_destroy(&cs->data);
> +        jsonrpc_session_close(cs->session);
> +        free(cs->remote);
> +        json_destroy(cs->request_id);
> +
> +        for (size_t i = 0; i < cs->n_txns; i++) {
> +            json_destroy(cs->txns[i]);
> +        }
> +        free(cs->txns);
> +
> +        ovsdb_cs_clear_server_rows(cs);
> +        hmap_destroy(&cs->server_rows);
> +
> +        free(cs);
> +    }
> +}
> +
> +static void
> +ovsdb_cs_transition_at(struct ovsdb_cs *cs, enum ovsdb_cs_state new_state,
> +                        const char *where)
> +{
> +    VLOG_DBG("%s: %s -> %s at %s",
> +             cs->session ? jsonrpc_session_get_name(cs->session) : "void",
> +             ovsdb_cs_state_to_string(cs->state),
> +             ovsdb_cs_state_to_string(new_state),
> +             where);
> +    cs->state = new_state;
> +}
> +
> +static void
> +ovsdb_cs_send_request(struct ovsdb_cs *cs, struct jsonrpc_msg *request)
> +{
> +    json_destroy(cs->request_id);
> +    cs->request_id = json_clone(request->id);
> +    if (cs->session) {
> +        jsonrpc_session_send(cs->session, request);
> +    } else {
> +        jsonrpc_msg_destroy(request);
> +    }
> +}
> +
> +static void
> +ovsdb_cs_retry_at(struct ovsdb_cs *cs, const char *where)
> +{
> +    ovsdb_cs_force_reconnect(cs);
> +    ovsdb_cs_transition_at(cs, CS_S_RETRY, where);
> +}
> +
> +static void
> +ovsdb_cs_restart_fsm(struct ovsdb_cs *cs)
> +{
> +    /* Resync data DB table conditions to avoid missing updates due to
> +     * conditions that were in flight or changed locally while the connection
> +     * was down.
> +     */
> +    ovsdb_cs_db_sync_condition(&cs->data);
> +
> +    ovsdb_cs_send_schema_request(cs, &cs->server);
> +    ovsdb_cs_transition(cs, CS_S_SERVER_SCHEMA_REQUESTED);
> +    cs->data.monitor_version = 0;
> +    cs->server.monitor_version = 0;
> +}
> +
> +static void
> +ovsdb_cs_process_response(struct ovsdb_cs *cs, struct jsonrpc_msg *msg)
> +{
> +    bool ok = msg->type == JSONRPC_REPLY;
> +    if (!ok
> +        && cs->state != CS_S_SERVER_SCHEMA_REQUESTED
> +        && cs->state != CS_S_SERVER_MONITOR_REQUESTED
> +        && cs->state != CS_S_DATA_MONITOR_COND_REQUESTED
> +        && cs->state != CS_S_DATA_MONITOR_COND_SINCE_REQUESTED) {
> +        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
> +        char *s = jsonrpc_msg_to_string(msg);
> +        VLOG_INFO_RL(&rl, "%s: received unexpected %s response in "
> +                     "%s state: %s", jsonrpc_session_get_name(cs->session),
> +                     jsonrpc_msg_type_to_string(msg->type),
> +                     ovsdb_cs_state_to_string(cs->state),
> +                     s);
> +        free(s);
> +        ovsdb_cs_retry(cs);
> +        return;
> +    }
> +
> +    switch (cs->state) {
> +    case CS_S_SERVER_SCHEMA_REQUESTED:
> +        if (ok) {
> +            json_destroy(cs->server.schema);
> +            cs->server.schema = json_clone(msg->result);
> +            ovsdb_cs_send_monitor_request(cs, &cs->server,
> +                                          cs->server.max_version);
> +            ovsdb_cs_transition(cs, CS_S_SERVER_MONITOR_REQUESTED);
> +        } else {
> +            ovsdb_cs_send_schema_request(cs, &cs->data);
> +            ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED);
> +        }
> +        break;
> +
> +    case CS_S_SERVER_MONITOR_REQUESTED:
> +        if (ok) {
> +            cs->server.monitor_version = cs->server.max_version;
> +            ovsdb_cs_db_parse_monitor_reply(&cs->server, msg->result,
> +                                            cs->server.monitor_version);
> +            if (ovsdb_cs_check_server_db(cs)) {
> +                ovsdb_cs_send_db_change_aware(cs);
> +            }
> +        } else {
> +            ovsdb_cs_send_schema_request(cs, &cs->data);
> +            ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED);
> +        }
> +        break;
> +
> +    case CS_S_DATA_SCHEMA_REQUESTED:
> +        json_destroy(cs->data.schema);
> +        cs->data.schema = json_clone(msg->result);
> +        if (cs->data.max_version >= 2) {
> +            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
> +            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
> +        } else {
> +            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
> +            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
> +        }
> +        break;
> +
> +    case CS_S_DATA_MONITOR_COND_SINCE_REQUESTED:
> +        if (!ok) {
> +            /* "monitor_cond_since" not supported.  Try "monitor_cond". */
> +            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
> +            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
> +        } else {
> +            cs->data.monitor_version = 3;
> +            ovsdb_cs_transition(cs, CS_S_MONITORING);
> +            ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 3);
> +        }
> +        break;
> +
> +    case CS_S_DATA_MONITOR_COND_REQUESTED:
> +        if (!ok) {
> +            /* "monitor_cond" not supported.  Try "monitor". */
> +            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
> +            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
> +        } else {
> +            cs->data.monitor_version = 2;
> +            ovsdb_cs_transition(cs, CS_S_MONITORING);
> +            ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 2);
> +        }
> +        break;
> +
> +    case CS_S_DATA_MONITOR_REQUESTED:
> +        cs->data.monitor_version = 1;
> +        ovsdb_cs_transition(cs, CS_S_MONITORING);
> +        ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 1);
> +        break;
> +
> +    case CS_S_MONITORING:
> +        /* We don't normally have a request outstanding in this state.  If we
> +         * do, it's a "monitor_cond_change", which means that the conditional
> +         * monitor clauses were updated.
> +         *
> +         * Mark the last requested conditions as acked and if further
> +         * condition changes were pending, send them now. */
> +        ovsdb_cs_db_ack_condition(&cs->data);
> +        ovsdb_cs_send_cond_change(cs);
> +        cs->data.cond_seqno++;
> +        break;
> +
> +    case CS_S_ERROR:
> +    case CS_S_RETRY:
> +        /* Nothing to do in this state. */
> +        break;
> +
> +    default:
> +        OVS_NOT_REACHED();
> +    }
> +}
> +
> +static void
> +ovsdb_cs_process_msg(struct ovsdb_cs *cs, struct jsonrpc_msg *msg)
> +{
> +    bool is_response = (msg->type == JSONRPC_REPLY ||
> +                        msg->type == JSONRPC_ERROR);
> +
> +    /* Process a reply to an outstanding request. */
> +    if (is_response
> +        && cs->request_id && json_equal(cs->request_id, msg->id)) {
> +        json_destroy(cs->request_id);
> +        cs->request_id = NULL;
> +        ovsdb_cs_process_response(cs, msg);
> +        return;
> +    }
> +
> +    /* Process database contents updates. */
> +    if (ovsdb_cs_db_parse_update_rpc(&cs->data, msg)) {
> +        return;
> +    }
> +    if (cs->server.monitor_version
> +        && ovsdb_cs_db_parse_update_rpc(&cs->server, msg)) {
> +        ovsdb_cs_check_server_db(cs);
> +        return;
> +    }
> +
> +    if (ovsdb_cs_handle_monitor_canceled(cs, &cs->data, msg)
> +        || (cs->server.monitor_version
> +            && ovsdb_cs_handle_monitor_canceled(cs, &cs->server, msg))) {
> +        return;
> +    }
> +
> +    /* Process "lock" replies and related notifications. */
> +    if (ovsdb_cs_db_process_lock_replies(&cs->data, msg)) {
> +        return;
> +    }
> +
> +    /* Process response to a database transaction we submitted. */
> +    if (is_response && ovsdb_cs_db_txn_process_reply(cs, msg)) {
> +        return;
> +    }
> +
> +    /* Unknown message.  Log at a low level because this can happen if
> +     * ovsdb_cs_txn_destroy() is called to destroy a transaction
> +     * before we receive the reply.
> +     *
> +     * (We could sort those out from other kinds of unknown messages by
> +     * using distinctive IDs for transactions, if it seems valuable to
> +     * do so, and then it would be possible to use different log
> +     * levels. XXX?) */
> +    char *s = jsonrpc_msg_to_string(msg);
> +    VLOG_DBG("%s: received unexpected %s message: %s",
> +             jsonrpc_session_get_name(cs->session),
> +             jsonrpc_msg_type_to_string(msg->type), s);
> +    free(s);
> +}
> +
> +static struct ovsdb_cs_event *
> +ovsdb_cs_db_add_event(struct ovsdb_cs_db *db, enum ovsdb_cs_event_type type)
> +{
> +    struct ovsdb_cs_event *event = xmalloc(sizeof *event);
> +    event->type = type;
> +    ovs_list_push_back(&db->events, &event->list_node);
> +    return event;
> +}
> +
> +/* Processes a batch of messages from the database server on 'cs'.  This may
> + * cause the CS's contents to change.
> + *
> + * Initializes 'events' with a list of events that occurred on 'cs'.  The
> + * caller must process and destroy all of the events. */
> +void
> +ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
> +{
> +    ovs_list_init(events);
> +    if (!cs->session) {
> +        return;
> +    }
> +
> +    ovsdb_cs_send_cond_change(cs);
> +
> +    jsonrpc_session_run(cs->session);
> +
> +    unsigned int seqno = jsonrpc_session_get_seqno(cs->session);
> +    if (cs->state_seqno != seqno) {
> +        cs->state_seqno = seqno;
> +        ovsdb_cs_restart_fsm(cs);
> +
> +        for (size_t i = 0; i < cs->n_txns; i++) {
> +            json_destroy(cs->txns[i]);
> +        }
> +        cs->n_txns = 0;
> +
> +        ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_RECONNECT);
> +
> +        if (cs->data.lock_name) {
> +            jsonrpc_session_send(
> +                cs->session,
> +                ovsdb_cs_db_compose_lock_request(&cs->data));
> +        }
> +    }
> +
> +    for (int i = 0; i < 50; i++) {
> +        struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session);
> +        if (!msg) {
> +            break;
> +        }
> +        ovsdb_cs_process_msg(cs, msg);
> +        jsonrpc_msg_destroy(msg);
> +    }
> +    ovs_list_push_back_all(events, &cs->data.events);
> +}
> +
> +/* Arranges for poll_block() to wake up when ovsdb_cs_run() has something to
> + * do or when activity occurs on a transaction on 'cs'. */
> +void
> +ovsdb_cs_wait(struct ovsdb_cs *cs)
> +{
> +    if (!cs->session) {
> +        return;
> +    }
> +    jsonrpc_session_wait(cs->session);
> +    jsonrpc_session_recv_wait(cs->session);
> +}
> +

> +/* Network connection. */
> +
> +/* Changes the remote and creates a new session.
> + *
> + * If 'retry' is true, the connection to the remote will automatically retry
> + * when it fails.  If 'retry' is false, the connection is one-time. */
> +void
> +ovsdb_cs_set_remote(struct ovsdb_cs *cs, const char *remote, bool retry)
> +{
> +    if (cs
> +        && ((remote != NULL) != (cs->remote != NULL)
> +            || (remote && cs->remote && strcmp(remote, cs->remote)))) {
> +        /* Close the old session, if any. */
> +        if (cs->session) {
> +            jsonrpc_session_close(cs->session);
> +            cs->session = NULL;
> +
> +            free(cs->remote);
> +            cs->remote = NULL;
> +        }
> +
> +        /* Open new session, if any. */
> +        if (remote) {
> +            struct svec remotes = SVEC_EMPTY_INITIALIZER;
> +            ovsdb_session_parse_remote(remote, &remotes, &cs->cid);
> +            if (cs->shuffle_remotes) {
> +                svec_shuffle(&remotes);
> +            }
> +            cs->session = jsonrpc_session_open_multiple(&remotes, retry);
> +            svec_destroy(&remotes);
> +
> +            cs->state_seqno = UINT_MAX;
> +
> +            cs->remote = xstrdup(remote);
> +        }
> +    }
> +}
> +
> +/* Reconfigures 'cs' so that it will reconnect to the database if the
> + * connection is dropped. */
> +void
> +ovsdb_cs_enable_reconnect(struct ovsdb_cs *cs)
> +{
> +    if (cs->session) {
> +        jsonrpc_session_enable_reconnect(cs->session);
> +    }
> +}
> +
> +/* Forces 'cs' to drop its connection to the database and reconnect.  In the
> + * meantime, the contents of 'cs' will not change. */
> +void
> +ovsdb_cs_force_reconnect(struct ovsdb_cs *cs)
> +{
> +    if (cs->session) {
> +        jsonrpc_session_force_reconnect(cs->session);
> +    }
> +}
> +
> +/* Drops 'cs''s current connection and the cached session.  This is useful if
> + * the client notices some kind of inconsistency. */
> +void
> +ovsdb_cs_flag_inconsistency(struct ovsdb_cs *cs)
> +{
> +    cs->data.last_id = UUID_ZERO;
> +    ovsdb_cs_retry(cs);
> +}
> +
> +/* Returns true if 'cs' is currently connected or will eventually try to
> + * reconnect. */
> +bool
> +ovsdb_cs_is_alive(const struct ovsdb_cs *cs)
> +{
> +    return (cs->session
> +            && jsonrpc_session_is_alive(cs->session)
> +            && cs->state != CS_S_ERROR);
> +}
> +
> +/* Returns true if 'cs' is currently connected to a server. */
> +bool
> +ovsdb_cs_is_connected(const struct ovsdb_cs *cs)
> +{
> +    return cs->session && jsonrpc_session_is_connected(cs->session);
> +}
> +
> +/* Returns the last error reported on a connection by 'cs'.  The return value
> + * is 0 only if no connection made by 'cs' has ever encountered an error and
> + * a negative response to a schema request has never been received. See
> + * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value
> + * interpretation. */
> +int
> +ovsdb_cs_get_last_error(const struct ovsdb_cs *cs)
> +{
> +    int err = cs->session ? jsonrpc_session_get_last_error(cs->session) : 0;
> +    if (err) {
> +        return err;
> +    } else if (cs->state == CS_S_ERROR) {
> +        return ENOENT;
> +    } else {
> +        return 0;
> +    }
> +}
> +
> +/* Sets the "probe interval" for 'cs''s current session to 'probe_interval', in
> + * milliseconds. */
> +void
> +ovsdb_cs_set_probe_interval(const struct ovsdb_cs *cs, int probe_interval)
> +{
> +    if (cs->session) {
> +        jsonrpc_session_set_probe_interval(cs->session, probe_interval);
> +    }
> +}
> +

> +/* Conditional monitoring. */
> +
> +/* A table being monitored.
> + *
> + * At the CS layer, the only thing we care about, table-wise, is the conditions
> + * we're using for monitoring them, so there's little here.  We only create
> + * these table structures at all for tables that have conditions. */
> +struct ovsdb_cs_db_table {
> +    struct hmap_node hmap_node; /* Indexed by 'name'. */
> +    const char *name;      /* Table name. */
> +
> +    /* Each of these is a null pointer if it is empty, or JSON [<condition>*]
> +     * or [true] or [false] otherwise.  [true] could be represented as a null
> +     * pointer, but we want to distinguish "empty slot" from "a condition that
> +     * is always true" in a slot. */
> +    struct json *ack_cond; /* Last condition acked by the server. */
> +    struct json *req_cond; /* Last condition requested to the server. */
> +    struct json *new_cond; /* Latest condition set by the IDL client. */
> +};
> +
> +/* A kind of condition, so that we can treat equivalent JSON as equivalent. */
> +enum condition_type {
> +    COND_FALSE,                 /* [] or [false] */
> +    COND_TRUE,                  /* Null pointer or [true] */
> +    COND_OTHER                  /* Anything else. */
> +};
> +
> +/* Returns the condition_type for 'condition'. */
> +static enum condition_type
> +condition_classify(const struct json *condition)
> +{
> +    if (condition) {
> +        const struct json_array *a = json_array(condition);
> +        switch (a->n) {
> +        case 0:
> +            return COND_FALSE;
> +
> +        case 1:
> +            return (a->elems[0]->type == JSON_FALSE ? COND_FALSE
> +                    : a->elems[0]->type == JSON_TRUE ? COND_TRUE
> +                    : COND_OTHER);
> +
> +        default:
> +            return COND_OTHER;
> +        }
> +    } else {
> +        return COND_TRUE;
> +    }
> +}
> +
> +/* Returns true if 'a' and 'b' are the same condition (in an obvious way; we're
> + * not going to compare for boolean equivalence or anything). */
> +static bool
> +condition_equal(const struct json *a, const struct json *b)
> +{
> +    enum condition_type type = condition_classify(a);
> +    return (type == condition_classify(b)
> +            && (type != COND_OTHER || json_equal(a, b)));
> +}
> +
> +/* Returns a clone of 'condition', translating always-true and always-false to
> + * [true] and [false], respectively. */
> +static struct json *
> +condition_clone(const struct json *condition)
> +{
> +    switch (condition_classify(condition)) {
> +    case COND_TRUE:
> +        return json_array_create_1(json_boolean_create(true));
> +
> +    case COND_FALSE:
> +        return json_array_create_1(json_boolean_create(false));
> +
> +    case COND_OTHER:
> +        return json_clone(condition);
> +    }
> +
> +    OVS_NOT_REACHED();
> +}
> +
> +/* Returns the ovsdb_cs_db_table associated with 'table' in 'db', creating an
> + * empty one if necessary. */
> +static struct ovsdb_cs_db_table *
> +ovsdb_cs_db_get_table(struct ovsdb_cs_db *db, const char *table)
> +{
> +    uint32_t hash = hash_string(table, 0);
> +    struct ovsdb_cs_db_table *t;
> +
> +    HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &db->tables) {
> +        if (!strcmp(t->name, table)) {
> +            return t;
> +        }
> +    }
> +
> +    t = xzalloc(sizeof *t);
> +    t->name = xstrdup(table);
> +    t->new_cond = json_array_create_1(json_boolean_create(true));
> +    hmap_insert(&db->tables, &t->hmap_node, hash);
> +    return t;
> +}
> +
> +static void
> +ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *db)
> +{
> +    struct ovsdb_cs_db_table *table, *next;
> +    HMAP_FOR_EACH_SAFE (table, next, hmap_node, &db->tables) {
> +        json_destroy(table->ack_cond);
> +        json_destroy(table->req_cond);
> +        json_destroy(table->new_cond);

We leak both 'table' and 'table->name' because we don't free them here.

> +        hmap_remove(&db->tables, &table->hmap_node);
> +    }
> +    hmap_destroy(&db->tables);
> +}
> +
> +static unsigned int
> +ovsdb_cs_db_set_condition(struct ovsdb_cs_db *db, const char *table,
> +                          const struct json *condition)
> +{
> +    /* Compare the new condition to the last known condition which can be
> +     * either "new" (not sent yet), "requested" or "acked", in this order. */
> +    struct ovsdb_cs_db_table *t = ovsdb_cs_db_get_table(db, table);
> +    const struct json *table_cond = (t->new_cond ? t->new_cond
> +                                     : t->req_cond ? t->req_cond
> +                                     : t->ack_cond);
> +    if (!condition_equal(condition, table_cond)) {
> +        json_destroy(t->new_cond);
> +        t->new_cond = condition_clone(condition);
> +        db->cond_changed = true;
> +        poll_immediate_wake();
> +    }
> +
> +    /* Conditions will be up to date when we receive replies for already
> +     * requested and new conditions, if any. */
> +    return db->cond_seqno + (t->new_cond ? 1 : 0) + (t->req_cond ? 1 : 0);
> +}
> +
> +/* Sets the replication condition for 'table' in 'cs' to 'condition' and
> + * arranges to send the new condition to the database server.
> + *
> + * Returns the next conditional update sequence number.  When this value
> + * matches the value returned by ovsdb_cs_get_condition_seqno(), 'cs'
> + * contains rows that match the 'condition'. */
> +unsigned int
> +ovsdb_cs_set_condition(struct ovsdb_cs *cs, const char *table,
> +                       const struct json *condition)
> +{
> +    return ovsdb_cs_db_set_condition(&cs->data, table, condition);
> +}
> +
> +/* Returns a "sequence number" that represents the number of conditional
> + * monitoring updates successfully received by the OVSDB server of a CS
> + * connection.
> + *
> + * When ovsdb_cs_set_condition() sets a new condition that differs from the
> + * current condition, it returns the next expected "sequence number".
> + *
> + * Whenever ovsdb_cs_get_condition_seqno() returns a value that matches the
> + * return value of ovsdb_cs_set_condition(), the client is assured that:
> + *
> + *   - The ovsdb_cs_set_condition() change has been acknowledged by the OVSDB
> + *     server.
> + *
> + *   - 'cs' now contains content that matches the new conditions. */
> +unsigned int
> +ovsdb_cs_get_condition_seqno(const struct ovsdb_cs *cs)
> +{
> +    return cs->data.cond_seqno;
> +}
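
As a sanity check on the seqno arithmetic, here's a toy model of the
bookkeeping (hypothetical `toy_*` names, not the real API):

```c
#include <stdbool.h>

/* Toy model of the seqno bookkeeping in ovsdb_cs_db_set_condition(): the
 * expected seqno is the current one plus one tick for a condition that was
 * sent but not yet acked ('req_cond') and one more for a condition that was
 * set but not yet sent ('new_cond'). */
struct toy_cond_state {
    unsigned int cond_seqno;    /* Bumped on each monitor_cond_change ack. */
    bool have_new_cond;         /* Set by the client, not yet sent. */
    bool have_req_cond;         /* Sent to the server, not yet acked. */
};

static unsigned int
toy_expected_seqno(const struct toy_cond_state *s)
{
    return s->cond_seqno
           + (s->have_new_cond ? 1 : 0)
           + (s->have_req_cond ? 1 : 0);
}
```
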
> +
> +static struct json *
> +ovsdb_cs_create_cond_change_req(const struct json *cond)
> +{
> +    struct json *monitor_cond_change_request = json_object_create();
> +    json_object_put(monitor_cond_change_request, "where", json_clone(cond));
> +    return monitor_cond_change_request;
> +}
> +
> +static struct jsonrpc_msg *
> +ovsdb_cs_db_compose_cond_change(struct ovsdb_cs_db *db)
> +{
> +    if (!db->cond_changed) {
> +        return NULL;
> +    }
> +
> +    struct json *monitor_cond_change_requests = NULL;
> +    struct ovsdb_cs_db_table *table;
> +    HMAP_FOR_EACH (table, hmap_node, &db->tables) {
> +        /* Always use the most recent conditions set by the CS client when
> +         * requesting monitor_cond_change, i.e., table->new_cond.
> +         */
> +        if (table->new_cond) {
> +            struct json *req =
> +                ovsdb_cs_create_cond_change_req(table->new_cond);
> +            if (req) {
> +                if (!monitor_cond_change_requests) {
> +                    monitor_cond_change_requests = json_object_create();
> +                }
> +                json_object_put(monitor_cond_change_requests,
> +                                table->name,
> +                                json_array_create_1(req));
> +            }
> +            /* Mark the new condition as requested by moving it to req_cond.
> +             * If there's already a requested condition, that's a bug.
> +             */
> +            ovs_assert(table->req_cond == NULL);
> +            table->req_cond = table->new_cond;
> +            table->new_cond = NULL;
> +        }
> +    }
> +
> +    if (!monitor_cond_change_requests) {
> +        return NULL;
> +    }
> +
> +    db->cond_changed = false;
> +    struct json *params = json_array_create_3(json_clone(db->monitor_id),
> +                                              json_clone(db->monitor_id),
> +                                              monitor_cond_change_requests);
> +    return jsonrpc_create_request("monitor_cond_change", params, NULL);
> +}
> +
> +/* Marks all requested table conditions in 'db' as acked by the server.
> + * It should be called when the server replies to monitor_cond_change
> + * requests.
> + */
> +static void
> +ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db)
> +{
> +    struct ovsdb_cs_db_table *table;
> +    HMAP_FOR_EACH (table, hmap_node, &db->tables) {
> +        if (table->req_cond) {
> +            json_destroy(table->ack_cond);
> +            table->ack_cond = table->req_cond;
> +            table->req_cond = NULL;
> +        }
> +    }
> +}
> +
> +/* Should be called when the CS fsm is restarted and resyncs table conditions
> + * based on the state the DB is in:
> + * - if a non-zero last_id is available for the DB then upon reconnect
> + *   the CS should first request acked conditions to avoid missing updates
> + *   about records that were added before the transaction with
> + *   txn-id == last_id. If there were requested condition changes in flight
> + *   (i.e., req_cond not NULL) and the CS client didn't set new conditions
> + *   (i.e., new_cond is NULL) then move req_cond to new_cond to trigger a
> + *   follow up monitor_cond_change request.
> + * - if there's no last_id available for the DB then it's safe to use the
> + *   latest conditions set by the CS client even if they weren't acked yet.
> + */
> +static void
> +ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db)
> +{
> +    bool ack_all = uuid_is_zero(&db->last_id);
> +    if (ack_all) {
> +        db->cond_changed = false;
> +    }
> +
> +    struct ovsdb_cs_db_table *table;
> +    HMAP_FOR_EACH (table, hmap_node, &db->tables) {
> +        /* When monitor_cond_since requests are issued, the
> +         * table->ack_cond condition will be added to the "where" clause.
> +         * Follow up monitor_cond_change requests will use table->new_cond.
> +         */
> +        if (ack_all) {
> +            if (table->new_cond) {
> +                json_destroy(table->req_cond);
> +                table->req_cond = table->new_cond;
> +                table->new_cond = NULL;
> +            }
> +
> +            if (table->req_cond) {
> +                json_destroy(table->ack_cond);
> +                table->ack_cond = table->req_cond;
> +                table->req_cond = NULL;
> +            }
> +        } else {
> +            /* If there was no "unsent" condition but instead a
> +             * monitor_cond_change request was in flight, move table->req_cond
> +             * to table->new_cond and set db->cond_changed to trigger a new
> +             * monitor_cond_change request.
> +             *
> +             * However, if a new condition has been set by the CS client,
> +             * monitor_cond_change will be sent anyway and will use the most
> +             * recent table->new_cond so there's no need to update it here.
> +             */
> +            if (table->req_cond) {
> +                if (table->new_cond) {
> +                    json_destroy(table->req_cond);
> +                } else {
> +                    table->new_cond = table->req_cond;
> +                }
> +                table->req_cond = NULL;
> +                db->cond_changed = true;
> +            }

This used to be:

if (table->req_cond && !table->new_cond) {
    /* Move "req_cond" to "new_cond". */
    ovsdb_idl_condition_move(&table->new_cond, &table->req_cond);
    db->cond_changed = true;
}

Which is what the comment above tried to explain.

Is there a case that was missed in the old code?  If so, can we factor
out the fix in a separate patch to make it easier to track?

Thanks,
Dumitru



More information about the dev mailing list