[ovs-dev] [PATCH v5 ovn 2/4] ovn-controller: Add per node states to I-P engine.

Dumitru Ceara dceara at redhat.com
Tue Nov 19 09:57:55 UTC 2019


On Tue, Nov 19, 2019 at 1:41 AM Han Zhou <hzhou at ovn.org> wrote:
>
> Thanks Dumitru. Please see my comments inline below.
>
> On Mon, Nov 18, 2019 at 6:07 AM Dumitru Ceara <dceara at redhat.com> wrote:
> >
> > This commit transforms the 'changed' field in struct engine_node into a
> > 'state' field. Possible node states are:
> > - "Stale": data in the node is not up to date with the DB.
> > - "Updated": data in the node is valid but was updated during
> >   the last run of the engine.
> > - "Valid": data in the node is valid and didn't change during
> >   the last run of the engine.
> > - "Aborted": during the last run, processing was aborted for
> >   this node.
> >
> > This commit also further refactors the I-P engine:
> > - instead of recursively performing all the engine processing a
> >   preprocessing stage is added (engine_get_nodes()) before the main processing
> >   loop is executed in order to topologically sort nodes in the engine such
> >   that all inputs of a given node appear in the sorted array before the node
> >   itself. This simplifies a bit the code in engine_run().
>
> Could you explain the reason for changing it to non-recursive? It seems to add more code rather than simplify, and effort is needed to ensure the correctness of the new code. Probably there is some benefit that makes the later patches easier, but it is not obvious to me. Could you help point out if that's the case?

My reasoning was that the engine graph is static (i.e., we build it
once at startup and it never changes afterwards) so all recursion
trees are always identical.

Moreover, by adding explicit states to the engine nodes we don't really
need to store a run_id in the nodes because we have the state of each
node after an engine run. I think removing the run_id is a good idea
because we minimize the external state the user of the engine should
manage (in this case engine_run_id and its incrementing logic).

However, if we keep the engine processing in a recursive fashion then
for each of the recursive operations (engine_run, engine_need_run,
engine_init, engine_cleanup) we need a way to avoid executing the
operation twice for a given node. This can be done by adding more
flags to the nodes (or more state values) but given that our DAG is
fixed, precomputing the processing order made more sense to me. What
do you think?
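
To illustrate the point, here is a minimal, self-contained sketch (not
the patch code). All names (sketch_node, sketch_topo_sort, sketch_run)
are hypothetical stand-ins; the sketch assumes a small fixed-size graph
and uses a per-node 'sorted' flag only while building the order, whereas
the patch walks the already sorted array instead. Once the order is
computed, each engine operation is a single loop and no node is visited
twice, even when it is an input of several other nodes:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_MAX_INPUTS 4
#define SKETCH_MAX_NODES 16

/* Hypothetical, simplified stand-in for struct engine_node. */
struct sketch_node {
    const char *name;
    struct sketch_node *inputs[SKETCH_MAX_INPUTS];
    size_t n_inputs;
    bool sorted;      /* Set once the node was appended to the order. */
    int n_runs;       /* How many times the node was processed. */
};

/* Post-order DFS: appends all inputs of 'node' before 'node' itself.
 * Returns the new number of entries in 'order'. */
static size_t
sketch_topo_sort(struct sketch_node *node, struct sketch_node **order,
                 size_t n)
{
    if (node->sorted) {
        return n;
    }
    for (size_t i = 0; i < node->n_inputs; i++) {
        n = sketch_topo_sort(node->inputs[i], order, n);
    }
    assert(n < SKETCH_MAX_NODES);
    node->sorted = true;
    order[n] = node;
    return n + 1;
}

/* One pass over the precomputed order; every node is processed exactly
 * once, without any run_id or "already visited" bookkeeping. */
static void
sketch_run(struct sketch_node **order, size_t n)
{
    for (size_t i = 0; i < n; i++) {
        order[i]->n_runs++;
    }
}
```

With a diamond-shaped graph (a feeds b and c, which both feed d) the
shared input 'a' lands first in the array and is processed once per run.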

>
> > - remove the need for using an engine_run_id by using the newly added states.
> >
> > Signed-off-by: Dumitru Ceara <dceara at redhat.com>
> > ---
> >  controller/ovn-controller.c |   88 ++++++++++-------
> >  lib/inc-proc-eng.c          |  219 ++++++++++++++++++++++++++++++++-----------
> >  lib/inc-proc-eng.h          |   75 +++++++++++----
> >  3 files changed, 271 insertions(+), 111 deletions(-)
> >
> > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> > index c56190f..033eff4 100644
> > --- a/controller/ovn-controller.c
> > +++ b/controller/ovn-controller.c
> > @@ -758,10 +758,10 @@ en_ofctrl_is_connected_run(struct engine_node *node)
> >          (struct ed_type_ofctrl_is_connected *)node->data;
> >      if (data->connected != ofctrl_is_connected()) {
> >          data->connected = !data->connected;
> > -        node->changed = true;
> > +        engine_set_node_state(node, EN_UPDATED);
> >          return;
> >      }
> > -    node->changed = false;
> > +    engine_set_node_state(node, EN_VALID);
> >  }
> >
> >  struct ed_type_addr_sets {
> > @@ -811,7 +811,7 @@ en_addr_sets_run(struct engine_node *node)
> >      addr_sets_init(as_table, &as->addr_sets);
> >
> >      as->change_tracked = false;
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >  }
> >
> >  static bool
> > @@ -830,11 +830,14 @@ addr_sets_sb_address_set_handler(struct engine_node *node)
> >      addr_sets_update(as_table, &as->addr_sets, &as->new,
> >                       &as->deleted, &as->updated);
> >
> > -    node->changed = !sset_is_empty(&as->new) || !sset_is_empty(&as->deleted)
> > -                    || !sset_is_empty(&as->updated);
> > +    if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) ||
> > +            !sset_is_empty(&as->updated)) {
> > +        engine_set_node_state(node, EN_UPDATED);
> > +    } else {
> > +        engine_set_node_state(node, EN_VALID);
> > +    }
> >
> >      as->change_tracked = true;
> > -    node->changed = true;
> >      return true;
> >  }
> >
> > @@ -885,7 +888,7 @@ en_port_groups_run(struct engine_node *node)
> >      port_groups_init(pg_table, &pg->port_groups);
> >
> >      pg->change_tracked = false;
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >  }
> >
> >  static bool
> > @@ -904,11 +907,14 @@ port_groups_sb_port_group_handler(struct engine_node *node)
> >      port_groups_update(pg_table, &pg->port_groups, &pg->new,
> >                       &pg->deleted, &pg->updated);
> >
> > -    node->changed = !sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted)
> > -                    || !sset_is_empty(&pg->updated);
> > +    if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) ||
> > +            !sset_is_empty(&pg->updated)) {
> > +        engine_set_node_state(node, EN_UPDATED);
> > +    } else {
> > +        engine_set_node_state(node, EN_VALID);
> > +    }
> >
> >      pg->change_tracked = true;
> > -    node->changed = true;
> >      return true;
> >  }
> >
> > @@ -1091,7 +1097,7 @@ en_runtime_data_run(struct engine_node *node)
> >      update_ct_zones(local_lports, local_datapaths, ct_zones,
> >                      ct_zone_bitmap, pending_ct_zones);
> >
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >  }
> >
> >  static bool
> > @@ -1157,10 +1163,10 @@ en_mff_ovn_geneve_run(struct engine_node *node)
> >      enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
> >      if (data->mff_ovn_geneve != mff_ovn_geneve) {
> >          data->mff_ovn_geneve = mff_ovn_geneve;
> > -        node->changed = true;
> > +        engine_set_node_state(node, EN_UPDATED);
> >          return;
> >      }
> > -    node->changed = false;
> > +    engine_set_node_state(node, EN_VALID);
> >  }
> >
> >  struct ed_type_flow_output {
> > @@ -1322,7 +1328,7 @@ en_flow_output_run(struct engine_node *node)
> >                   active_tunnels,
> >                   flow_table);
> >
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >  }
> >
> >  static bool
> > @@ -1404,7 +1410,7 @@ flow_output_sb_logical_flow_handler(struct engine_node *node)
> >                flow_table, group_table, meter_table, lfrr,
> >                conj_id_ofs);
> >
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >      return handled;
> >  }
> >
> > @@ -1427,7 +1433,7 @@ flow_output_sb_mac_binding_handler(struct engine_node *node)
> >      lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
> >              mac_binding_table, flow_table);
> >
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >      return true;
> >  }
> >
> > @@ -1531,7 +1537,7 @@ flow_output_sb_port_binding_handler(struct engine_node *node)
> >              chassis, ct_zones, local_datapaths,
> >              active_tunnels, flow_table);
> >
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >      return true;
> >  }
> >
> > @@ -1580,7 +1586,7 @@ flow_output_sb_multicast_group_handler(struct engine_node *node)
> >              mff_ovn_geneve, chassis, ct_zones, local_datapaths,
> >              flow_table);
> >
> > -    node->changed = true;
> > +    engine_set_node_state(node, EN_UPDATED);
> >      return true;
> >
> >  }
> > @@ -1694,7 +1700,9 @@ _flow_output_resource_ref_handler(struct engine_node *node,
> >                      conj_id_ofs, &changed)) {
> >              return false;
> >          }
> > -        node->changed = changed || node->changed;
> > +        if (changed) {
> > +            engine_set_node_state(node, EN_UPDATED);
> > +        }
> >      }
> >      SSET_FOR_EACH (ref_name, updated) {
> >          if (!lflow_handle_changed_ref(ref_type, ref_name,
> > @@ -1707,7 +1715,9 @@ _flow_output_resource_ref_handler(struct engine_node *node,
> >                      conj_id_ofs, &changed)) {
> >              return false;
> >          }
> > -        node->changed = changed || node->changed;
> > +        if (changed) {
> > +            engine_set_node_state(node, EN_UPDATED);
> > +        }
> >      }
> >      SSET_FOR_EACH (ref_name, new) {
> >          if (!lflow_handle_changed_ref(ref_type, ref_name,
> > @@ -1720,7 +1730,9 @@ _flow_output_resource_ref_handler(struct engine_node *node,
> >                      conj_id_ofs, &changed)) {
> >              return false;
> >          }
> > -        node->changed = changed || node->changed;
> > +        if (changed) {
> > +            engine_set_node_state(node, EN_UPDATED);
> > +        }
> >      }
> >
> >      return true;
> > @@ -1922,7 +1934,11 @@ main(int argc, char *argv[])
> >      engine_add_input(&en_runtime_data, &en_sb_port_binding,
> >                       runtime_data_sb_port_binding_handler);
> >
> > -    engine_init(&en_flow_output);
> > +    /* Get the sorted engine nodes to be used for every engine run. */
> > +    size_t en_count = 0;
> > +    struct engine_node **en_nodes = engine_get_nodes(&en_flow_output,
> > +                                                     &en_count);
> > +    engine_init(en_nodes, en_count);
>
> I think engine_get_nodes() and engine_init() can be combined. We only need to expose engine_init() interface, which can call engine_get_nodes() internally and store n_count and nodes internally in engine module.

Ok, but wouldn't it then make more sense to store all this in a
"struct inc_engine" structure which engine_init() would return? We'd
also need to move the globals engine_force_recompute,
engine_abort_recompute, engine_context inside the new inc_engine
structure.
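
Roughly, and only as a sketch of the idea (nothing below exists in the
patch; the field names, inc_engine_init_sketch() and
inc_engine_set_force_recompute() are made up for illustration), the
structure could look like this:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct engine_node;      /* Opaque here; defined in lib/inc-proc-eng.h. */
struct engine_context;   /* Likewise. */

/* Hypothetical container for what are currently file-level globals plus
 * the sorted node array. */
struct inc_engine {
    struct engine_node **nodes;   /* Topologically sorted nodes. */
    size_t n_nodes;
    bool force_recompute;         /* Was: engine_force_recompute. */
    bool abort_recompute;         /* Was: engine_abort_recompute. */
    const struct engine_context *context;   /* Was: engine_context. */
};

/* Hypothetical initializer; assumes 'nodes' was already sorted by the
 * caller (in a real version the sorting would happen in here). */
static struct inc_engine
inc_engine_init_sketch(struct engine_node **nodes, size_t n_nodes)
{
    assert(nodes || !n_nodes);
    struct inc_engine e = {
        .nodes = nodes,
        .n_nodes = n_nodes,
        .force_recompute = false,
        .abort_recompute = false,
        .context = NULL,
    };
    return e;
}

static void
inc_engine_set_force_recompute(struct inc_engine *e, bool val)
{
    e->force_recompute = val;
}
```

The trade-off is that every engine_* call would then take the engine
instance as an argument instead of relying on module-level state.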

>
> In addition, there can be more than 1 root node in the DAG. Originally we could just call engine_run(root_node) for each root node. Now, with the non-recursive approach, I think we can handle this by:
>   - in engine_get_nodes, we don't need to pass the root_node, but instead, the engine can just process all engine nodes.
>   - remove the root_node parameter from all interfaces.

In general, a DAG doesn't really have a root node, indeed. But even in
the original code everything was designed with one "ultimate" output
in mind, en_flow_output, and we would only call
engine_run(&en_flow_output, ..). I don't think we can remove the
root_node parameter from engine_get_nodes because we need to have a
starting point for precomputing the processing order of the nodes. We
were implicitly doing the same thing through recursion in the original
code.

>
> >
> >      ofctrl_init(&ed_flow_output.group_table,
> >                  &ed_flow_output.meter_table,
> > @@ -1941,9 +1957,6 @@ main(int argc, char *argv[])
> >      unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
> >                               &pending_pkt);
> >
> > -    uint64_t engine_run_id = 0;
> > -    bool engine_run_done = true;
> > -
> >      unsigned int ovs_cond_seqno = UINT_MAX;
> >      unsigned int ovnsb_cond_seqno = UINT_MAX;
> >
> > @@ -1951,7 +1964,7 @@ main(int argc, char *argv[])
> >      exiting = false;
> >      restart = false;
> >      while (!exiting) {
> > -        engine_run_id++;
> > +        engine_init_run(en_nodes, en_count, &en_flow_output);
> >
> >          update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
> >          update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
> > @@ -2044,15 +2057,13 @@ main(int argc, char *argv[])
> >                               * this round of engine_run and continue processing
> >                               * acculated changes incrementally later when
> >                               * ofctrl_can_put() returns true. */
> > -                            if (engine_run_done) {
> > +                            if (!engine_aborted(&en_flow_output)) {
> >                                  engine_set_abort_recompute(true);
> > -                                engine_run_done = engine_run(&en_flow_output,
> > -                                                             engine_run_id);
> > +                                engine_run(en_nodes, en_count);
> >                              }
> >                          } else {
> >                              engine_set_abort_recompute(false);
> > -                            engine_run_done = true;
> > -                            engine_run(&en_flow_output, engine_run_id);
> > +                            engine_run(en_nodes, en_count);
> >                          }
> >                      }
> >                      stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
> > @@ -2071,7 +2082,7 @@ main(int argc, char *argv[])
> >                                 sbrec_meter_table_get(ovnsb_idl_loop.idl),
> >                                 get_nb_cfg(sbrec_sb_global_table_get(
> >                                                ovnsb_idl_loop.idl)),
> > -                               en_flow_output.changed);
> > +                               engine_node_changed(&en_flow_output));
> >                      pinctrl_run(ovnsb_idl_txn,
> >                                  sbrec_datapath_binding_by_key,
> >                                  sbrec_port_binding_by_datapath,
> > @@ -2089,7 +2100,7 @@ main(int argc, char *argv[])
> >                                  &ed_runtime_data.local_datapaths,
> >                                  &ed_runtime_data.active_tunnels);
> >
> > -                    if (en_runtime_data.changed) {
> > +                    if (engine_node_changed(&en_runtime_data)) {
> >                          update_sb_monitors(ovnsb_idl_loop.idl, chassis,
> >                                             &ed_runtime_data.local_lports,
> >                                             &ed_runtime_data.local_datapaths);
> > @@ -2097,17 +2108,17 @@ main(int argc, char *argv[])
> >                  }
> >
> >              }
> > -            if (engine_need_run(&en_flow_output, engine_run_id)) {
> > +            if (engine_need_run(en_nodes, en_count, &en_flow_output)) {
> >                  VLOG_DBG("engine did not run, force recompute next time: "
> >                              "br_int %p, chassis %p", br_int, chassis);
> >                  engine_set_force_recompute(true);
> >                  poll_immediate_wake();
> > -            } else if (!engine_run_done) {
> > +            } else if (engine_aborted(&en_flow_output)) {
> >                  VLOG_DBG("engine was aborted, force recompute next time: "
> >                           "br_int %p, chassis %p", br_int, chassis);
> >                  engine_set_force_recompute(true);
> >                  poll_immediate_wake();
> > -            } else if (!engine_has_run(&en_flow_output, engine_run_id)) {
> > +            } else if (!engine_has_run(&en_flow_output)) {
> >                  VLOG_DBG("engine did not run, and it was not needed"
> >                           " either: br_int %p, chassis %p",
> >                           br_int, chassis);
> > @@ -2135,8 +2146,7 @@ main(int argc, char *argv[])
> >                      }
> >                  } else {
> >                      VLOG_DBG("Pending_pkt conn but br_int %p or chassis "
> > -                             "%p not ready. run-id: %"PRIu64, br_int,
> > -                             chassis, engine_run_id);
> > +                             "%p not ready.", br_int, chassis);
> >                      unixctl_command_reply_error(pending_pkt.conn,
> >                          "ovn-controller not ready.");
> >                  }
> > @@ -2185,7 +2195,7 @@ main(int argc, char *argv[])
> >      }
> >
> >      engine_set_context(NULL);
> > -    engine_cleanup(&en_flow_output);
> > +    engine_cleanup(en_nodes, en_count);
> >
> >      /* It's time to exit.  Clean up the databases if we are not restarting */
> >      if (!restart) {
> > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> > index 8a085e2..ee6afbe 100644
> > --- a/lib/inc-proc-eng.c
> > +++ b/lib/inc-proc-eng.c
> > @@ -34,6 +34,13 @@ static bool engine_force_recompute = false;
> >  static bool engine_abort_recompute = false;
> >  static const struct engine_context *engine_context;
> >
> > +static const char *engine_node_state_name[EN_STATE_MAX] = {
> > +    [EN_STALE]   = "Stale",
> > +    [EN_UPDATED] = "Updated",
> > +    [EN_VALID]   = "Valid",
> > +    [EN_ABORTED] = "Aborted",
> > +};
> > +
> >  void
> >  engine_set_force_recompute(bool val)
> >  {
> > @@ -58,26 +65,62 @@ engine_set_context(const struct engine_context *ctx)
> >      engine_context = ctx;
> >  }
> >
> > -void
> > -engine_init(struct engine_node *node)
> > +/* Builds the topologically sorted 'sorted_nodes' array starting from
> > + * 'node'.
> > + */
> > +static struct engine_node **
> > +engine_topo_sort(struct engine_node *node, struct engine_node **sorted_nodes,
> > +                 size_t *n_count, size_t *n_size)
> >  {
> > +    /* It's not so efficient to walk the array of already sorted nodes but
> > +     * we know that sorting is done only once at startup so it's ok for now.
> > +     */
> > +    for (size_t i = 0; i < *n_count; i++) {
> > +        if (sorted_nodes[i] == node) {
> > +            return sorted_nodes;
> > +        }
> > +    }
> > +
> >      for (size_t i = 0; i < node->n_inputs; i++) {
> > -        engine_init(node->inputs[i].node);
> > +        sorted_nodes = engine_topo_sort(node->inputs[i].node, sorted_nodes,
> > +                                        n_count, n_size);
> >      }
> > -    if (node->init) {
> > -        node->init(node);
> > +    if (*n_count == *n_size) {
> > +        sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof *sorted_nodes);
> >      }
> > +    sorted_nodes[(*n_count)] = node;
> > +    (*n_count)++;
> > +    return sorted_nodes;
> > +}
> > +
> > +struct engine_node **
> > +engine_get_nodes(struct engine_node *root_node, size_t *n_count)
> > +{
> > +    size_t n_size = 0;
> > +
> > +    *n_count = 0;
> > +    return engine_topo_sort(root_node, NULL, n_count, &n_size);
> >  }
> >
> >  void
> > -engine_cleanup(struct engine_node *node)
> > +engine_init(struct engine_node **nodes, size_t n_count)
> >  {
> > -    for (size_t i = 0; i < node->n_inputs; i++) {
> > -        engine_cleanup(node->inputs[i].node);
> > +    for (size_t i = 0; i < n_count; i++) {
> > +        if (nodes[i]->init) {
> > +            nodes[i]->init(nodes[i]);
> > +        }
> >      }
> > -    if (node->cleanup) {
> > -        node->cleanup(node);
> > +}
> > +
> > +void
> > +engine_cleanup(struct engine_node **nodes, size_t n_count)
> > +{
> > +    for (size_t i = 0; i < n_count; i++) {
> > +        if (nodes[i]->cleanup) {
> > +            nodes[i]->cleanup(nodes[i]);
> > +        }
> >      }
> > +    free(nodes);
> >  }
> >
> >  struct engine_node *
> > @@ -128,16 +171,66 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name,
> >      ed->n_indexes ++;
> >  }
> >
> > +void
> > +engine_set_node_state_at(struct engine_node *node,
> > +                         enum engine_node_state state,
> > +                         const char *where)
> > +{
> > +    if (node->state == state) {
> > +        return;
> > +    }
> > +
> > +    VLOG_DBG("%s: node: %s, old_state %s, new_state %s",
> > +             where, node->name,
> > +             engine_node_state_name[node->state],
> > +             engine_node_state_name[state]);
> > +
> > +    node->state = state;
> > +}
> > +
> > +static bool
> > +engine_node_valid(struct engine_node *node)
> > +{
> > +    return (node->state == EN_UPDATED || node->state == EN_VALID);
> > +}
> > +
> >  bool
> > -engine_has_run(struct engine_node *node, uint64_t run_id)
> > +engine_node_changed(struct engine_node *node)
> >  {
> > -    return node->run_id == run_id;
> > +    return node->state == EN_UPDATED;
> > +}
> > +
> > +bool
> > +engine_has_run(struct engine_node *node)
> > +{
>
> engine_has_run() should go through all nodes. If any node is NOT STALE, return true. The engine hasn't run only if all nodes are STALE. (Originally it was easier to tell by utilizing engine_run_id.)
> If some nodes are STALE and some are not, it means the engine has run but aborted.

Due to the way the code is written (and as you noticed in earlier
versions of the patches) the STALE state is transient. If the engine
ran, even if it aborted, then no node will be in state STALE. Please
see below regarding how ABORTED is propagated.

>
> > +    return node->state != EN_STALE;
> > +}
> > +
> > +bool
> > +engine_aborted(struct engine_node *node)
> > +{
> > +    return node->state == EN_ABORTED;
> > +}
> > +
> > +void
> > +engine_init_run(struct engine_node **nodes, size_t n_count,
> > +                struct engine_node *root_node)
> > +{
> > +    /* No need to reinitialize if last run didn't happen. */
> > +    if (!engine_has_run(root_node)) {
>
> I think there is a problem here. If, in the last round, the root node didn't run because processing was aborted at some intermediate node, many other nodes could still have run and had their state changed to VALID/UPDATED. Now if we don't do the init, those nodes may contain invalid data since it is a new round of iteration, but their state says they are valid.

If one of the inputs of a given node went to the ABORTED state then the
node itself will also move to ABORTED. And this will get propagated to
all successor nodes.
The idea of the check was to avoid walking all the nodes if the
root_node is in state STALE (engine_has_run() == false). STALE is
transient so if one node is STALE then all nodes are STALE and this
can only happen if the engine didn't run in the current iteration.
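
A stripped-down sketch of that behavior, assuming a simplified node
type (sk_node, sk_run_node and sk_run are hypothetical names, and the
'abort_here' flag only simulates a recompute that is not allowed):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum sk_state { SK_STALE, SK_UPDATED, SK_VALID, SK_ABORTED };

/* Hypothetical, simplified stand-in for struct engine_node. */
struct sk_node {
    struct sk_node *inputs[2];
    size_t n_inputs;
    bool abort_here;   /* Simulates a recompute that is not allowed. */
    enum sk_state state;
};

static void
sk_run_node(struct sk_node *node)
{
    /* An aborted input makes this node aborted too, so the state
     * travels down the sorted array to all successors. */
    for (size_t i = 0; i < node->n_inputs; i++) {
        if (node->inputs[i]->state == SK_ABORTED) {
            node->state = SK_ABORTED;
            return;
        }
    }
    node->state = node->abort_here ? SK_ABORTED : SK_UPDATED;
}

/* 'sorted' must list every input before the nodes that depend on it. */
static void
sk_run(struct sk_node **sorted, size_t n)
{
    assert(sorted || !n);
    for (size_t i = 0; i < n; i++) {
        sk_run_node(sorted[i]);
    }
}
```

In this simplified model every node leaves STALE once sk_run() was
called, so checking a single node is enough to tell whether the engine
ran.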

>
> We don't need to pass "root_node" for engine_init_run. We can just reset all nodes to STALE state.

I agree, we could decide if an engine needs init_run by checking that
the first node in 'nodes' is not in state STALE.

>
>
> > +        return;
> > +    }
> > +
> > +    VLOG_DBG("Initializing new run");
> > +    for (size_t i = 0; i < n_count; i++) {
> > +        engine_set_node_state(nodes[i], EN_STALE);
> > +    }
> >  }
> >
> >  /* Do a full recompute (or at least try). If we're not allowed then
> >   * mark the node as "aborted".
> >   */
> > -static bool
> > +static void
> >  engine_recompute(struct engine_node *node, bool forced, bool allowed)
> >  {
> >      VLOG_DBG("node: %s, recompute (%s)", node->name,
> > @@ -145,12 +238,12 @@ engine_recompute(struct engine_node *node, bool forced, bool allowed)
> >
> >      if (!allowed) {
> >          VLOG_DBG("node: %s, recompute aborted", node->name);
> > -        return false;
> > +        engine_set_node_state(node, EN_ABORTED);
> > +        return;
> >      }
> >
> > +    /* Run the node handler which might change state. */
> >      node->run(node);
> > -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> > -    return true;
> >  }
> >
> >  /* Return true if the node could be computed without triggerring a full
> > @@ -161,7 +254,7 @@ engine_compute(struct engine_node *node, bool recompute_allowed)
> >  {
> >      for (size_t i = 0; i < node->n_inputs; i++) {
> >          /* If the input node data changed call its change handler. */
> > -        if (node->inputs[i].node->changed) {
> > +        if (node->inputs[i].node->state == EN_UPDATED) {
> >              VLOG_DBG("node: %s, handle change for input %s",
> >                       node->name, node->inputs[i].node->name);
> >
> > @@ -172,57 +265,61 @@ engine_compute(struct engine_node *node, bool recompute_allowed)
> >                  VLOG_DBG("node: %s, can't handle change for input %s, "
> >                           "fall back to recompute",
> >                           node->name, node->inputs[i].node->name);
> > -                if (!engine_recompute(node, false, recompute_allowed)) {
> > +                engine_recompute(node, false, recompute_allowed);
> > +                if (engine_aborted(node)) {
>
> The aborted state was propagated through the recursive logic, but it is not the case in this new implementation. Is this on purpose?

With the new implementation the aborted state is also propagated, just
that it's done in engine_run_node().

>
> >                      return false;
> >                  }
> >              }
> >          }
> >      }
> > -
> >      return true;
> >  }
> >
> > -bool engine_run(struct engine_node *node, uint64_t run_id)
> > +static void
> > +engine_run_node(struct engine_node *node)
> >  {
> > -    if (node->run_id == run_id) {
> > -        /* The node was already updated in this run (could be input for
> > -         * multiple other nodes). Stop processing.
> > -         */
> > -        return true;
> > -    }
> > -
> > -    /* Initialize the node for this run. */
> > -    node->run_id = run_id;
> > -    node->changed = false;
> > -
> >      if (!node->n_inputs) {
> > +        /* Run the node handler which might change state. */
> >          node->run(node);
> > -        VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> > -        return true;
> > +        return;
> >      }
> >
> > +    bool input_stale = false;
> >      for (size_t i = 0; i < node->n_inputs; i++) {
> > -        if (!engine_run(node->inputs[i].node, run_id)) {
> > -            return false;
> > +        if (!engine_node_valid(node->inputs[i].node)) {
> > +            /* If the input node aborted computation, move to EN_ABORTED.
> > +             * This will be propagated to following nodes.
> > +             */
> > +            if (engine_aborted(node->inputs[i].node)) {
> > +                engine_set_node_state(node, EN_ABORTED);
> > +            }

Here we propagate the ABORTED state.

> > +
> > +            input_stale = true;
> >          }
> >      }
> >
> > -    bool need_compute = false;
> > +    /* If at least one input is stale, don't change state. */
> > +    if (input_stale) {
> > +        return;
> > +    }

With the current ovn-controller code, input_stale will never be true
unless one of the input nodes aborted computation. However, because of
the is_valid interface we're adding in the last patch of the series,
engine_node_valid(input) might return false in the future based on a
user defined condition even if the state is computed.

Maybe for better readability I can introduce a new state called
EN_INVALID and enter it every time engine_node_valid(input) returns
false. What do you think?
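
Something along these lines, as a rough sketch only. SKETCH_INVALID
stands for the proposed EN_INVALID; the is_valid callback signature and
all sketch_* names are assumptions made for this example and are not
taken from the series:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical: the four states from this patch plus the proposed one. */
enum sketch_state {
    SKETCH_STALE,
    SKETCH_UPDATED,
    SKETCH_VALID,
    SKETCH_ABORTED,
    SKETCH_INVALID,   /* Proposed: an input was not usable in this run. */
};

struct sketch_en {
    struct sketch_en *inputs[4];
    size_t n_inputs;
    enum sketch_state state;
    /* Hypothetical user-defined validity check; may be NULL. */
    bool (*is_valid)(struct sketch_en *);
};

/* Example user callback that always reports the data as unusable. */
static bool
sketch_never_valid(struct sketch_en *node)
{
    (void) node;
    return false;
}

static bool
sketch_en_valid(struct sketch_en *node)
{
    if (node->is_valid) {
        return node->is_valid(node);
    }
    return node->state == SKETCH_UPDATED || node->state == SKETCH_VALID;
}

/* Returns true if every input can be used. Otherwise records the reason
 * in node->state: ABORTED takes precedence, anything else is INVALID. */
static bool
sketch_check_inputs(struct sketch_en *node)
{
    assert(node->n_inputs <= 4);
    bool usable = true;
    for (size_t i = 0; i < node->n_inputs; i++) {
        struct sketch_en *in = node->inputs[i];
        if (sketch_en_valid(in)) {
            continue;
        }
        if (in->state == SKETCH_ABORTED) {
            node->state = SKETCH_ABORTED;
            return false;
        }
        node->state = SKETCH_INVALID;
        usable = false;
    }
    return usable;
}
```

This way a node would always end a run in an explicit state instead of
silently keeping its previous one when an input is not usable.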

> >
> >      if (engine_force_recompute) {
> > -        return engine_recompute(node, true, !engine_abort_recompute);
> > +        engine_recompute(node, true, !engine_abort_recompute);
> > +        return;
> >      }
> >
> >      /* If any of the inputs updated data but there is no change_handler, then
> >       * recompute the current node too.
> >       */
> > +    bool need_compute = false;
> >      for (size_t i = 0; i < node->n_inputs; i++) {
> > -        if (node->inputs[i].node->changed) {
> > +        if (node->inputs[i].node->state == EN_UPDATED) {
> >              need_compute = true;
> >
> >              /* Trigger a recompute if we don't have a change handler. */
> >              if (!node->inputs[i].change_handler) {
> > -                return engine_recompute(node, false, !engine_abort_recompute);
> > +                engine_recompute(node, false, !engine_abort_recompute);
> > +                return;
> >              }
> >          }
> >      }
> > @@ -231,33 +328,47 @@ bool engine_run(struct engine_node *node, uint64_t run_id)
> >          /* If we couldn't compute the node we either aborted or triggered
> >           * a full recompute. In any case, stop processing.
> >           */
> > -        return engine_compute(node, !engine_abort_recompute);
> > +        if (!engine_compute(node, !engine_abort_recompute)) {
> > +            return;
> > +        }
> >      }
> >
> > -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> > -    return true;
> > +    /* If we reached this point, either the node was updated or its state is
> > +     * still valid.
> > +     */
> > +    if (!engine_node_changed(node)) {
> > +        engine_set_node_state(node, EN_VALID);
> > +    }
> >  }
> >
> > -bool
> > -engine_need_run(struct engine_node *node, uint64_t run_id)
> > +void
> > +engine_run(struct engine_node **nodes, size_t n_count)
> >  {
> > -    size_t i;
> > +    for (size_t i = 0; i < n_count; i++) {
>
> If an input node didn't finish the run, e.g. aborted, then we shouldn't continue running the nodes that depend on it.

In this case we have to continue, so that we propagate the ABORTED
state (in the same way we were doing with all the recursive returns).

>
> > +        engine_run_node(nodes[i]);
> > +    }
> > +}
> >
> > -    if (node->run_id == run_id) {
> > +bool
> > +engine_need_run(struct engine_node **nodes, size_t n_count,
> > +                struct engine_node *root_node)
> > +{
> > +    if (engine_has_run(root_node)) {
> >          return false;
> >      }
> >
> > -    if (!node->n_inputs) {
> > -        node->run(node);
> > -        VLOG_DBG("input node: %s, changed: %d", node->name, node->changed);
> > -        return node->changed;
> > -    }
> > +    for (size_t i = 0; i < n_count; i++) {
> > +        /* Check only leaf nodes. */
> > +        if (nodes[i]->n_inputs) {
> > +            continue;
> > +        }
> >
> > -    for (i = 0; i < node->n_inputs; i++) {
> > -        if (engine_need_run(node->inputs[i].node, run_id)) {
> > +        nodes[i]->run(nodes[i]);
> > +        VLOG_DBG("input node: %s, state: %s", nodes[i]->name,
> > +                 engine_node_state_name[nodes[i]->state]);
> > +        if (nodes[i]->state == EN_UPDATED) {
> >              return true;
> >          }
> >      }
> > -
> >      return false;
> >  }
> > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> > index abd41b2..69eb9b6 100644
> > --- a/lib/inc-proc-eng.h
> > +++ b/lib/inc-proc-eng.h
> > @@ -82,10 +82,21 @@ struct engine_node_input {
> >      bool (*change_handler)(struct engine_node *node);
> >  };
> >
> > -struct engine_node {
> > -    /* A unique id to distinguish each iteration of the engine_run(). */
> > -    uint64_t run_id;
> > +enum engine_node_state {
> > +    EN_STALE,     /* Data in the node is not up to date with the DB. */
> > +    EN_UPDATED,   /* Data in the node is valid but was updated during the
> > +                   * last run.
> > +                   */
> > +    EN_VALID,     /* Data in the node is valid and didn't change during the
> > +                   * last run.
> > +                   */
> > +    EN_ABORTED,   /* During the last run, processing was aborted for
> > +                   * this node.
> > +                   */
> > +    EN_STATE_MAX,
> > +};
> >
> > +struct engine_node {
> >      /* A unique name for each node. */
> >      char *name;
> >
> > @@ -102,8 +113,8 @@ struct engine_node {
> >       * node. */
> >      void *data;
> >
> > -    /* Whether the data changed in the last engine run. */
> > -    bool changed;
> > +    /* State of the node after the last engine run. */
> > +    enum engine_node_state state;
> >
> >      /* Method to initialize data. It may be NULL. */
> >      void (*init)(struct engine_node *);
> > @@ -116,23 +127,36 @@ struct engine_node {
> >      void (*run)(struct engine_node *);
> >  };
> >
> > -/* Initialize the data for the engine nodes recursively. It calls each node's
> > +/* Return the array of topologically sorted nodes when starting from
> > + * 'root_node'. Stores the number of nodes in 'n_count'.
> > + * It should be called before the main loop.
> > + */
> > +struct engine_node **engine_get_nodes(struct engine_node *root_node,
> > +                                      size_t *n_count);
> > +
> > +/* Initialize the data for the engine nodes. It calls each node's
> >   * init() method if not NULL. It should be called before the main loop. */
> > -void engine_init(struct engine_node *);
> > +void engine_init(struct engine_node **nodes, size_t n_count);
> > +
> > +/* Initialize the engine nodes for a new run. It should be called in the
> > + * main processing loop before every potential engine_run().
> > + */
> > +void engine_init_run(struct engine_node **nodes, size_t n_count,
> > +                     struct engine_node *root_node);
> >
> >  /* Execute the processing recursively, which should be called in the main
>
> This comment should be updated since you changed it to be non-recursive.

Ack.

>
> > - * loop. Returns true if the execution is compelte, false if it is aborted,
> > - * which could happen when engine_abort_recompute is set. */
> > -bool engine_run(struct engine_node *, uint64_t run_id);
> > + * loop. Updates the engine node's states accordingly.
> > + */
> > +void engine_run(struct engine_node **nodes, size_t n_count);
> >
> > -/* Clean up the data for the engine nodes recursively. It calls each node's
> > +/* Clean up the data for the engine nodes. It calls each node's
> >   * cleanup() method if not NULL. It should be called before the program
> >   * terminates. */
> > -void engine_cleanup(struct engine_node *);
> > +void engine_cleanup(struct engine_node **nodes, size_t n_count);
> >
> >  /* Check if engine needs to run but didn't. */
> > -bool
> > -engine_need_run(struct engine_node *, uint64_t run_id);
> > +bool engine_need_run(struct engine_node **nodes, size_t n_count,
> > +                     struct engine_node *root_node);
> >
> >  /* Get the input node with <name> for <node> */
> >  struct engine_node * engine_get_input(const char *input_name,
> > @@ -159,8 +183,22 @@ const struct engine_context * engine_get_context(void);
> >
> >  void engine_set_context(const struct engine_context *);
> >
> > -/* Return true if the engine has run for 'node' in the 'run_id' iteration. */
> > -bool engine_has_run(struct engine_node *node, uint64_t run_id);
> > +void engine_set_node_state_at(struct engine_node *node,
> > +                              enum engine_node_state state,
> > +                              const char *where);
> > +
> > +/* Return true if during the last iteration the node's data was updated. */
> > +bool engine_node_changed(struct engine_node *node);
> > +
> > +/* Return true if the engine has run for 'node' in the last iteration. */
> > +bool engine_has_run(struct engine_node *node);
> > +
> > +/* Returns true if during the last engine run we had to abort processing. */
> > +bool engine_aborted(struct engine_node *node);
> > +
> > +/* Set the state of the node and log changes. */
> > +#define engine_set_node_state(node, state) \
> > +    engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
> >
> >  struct ed_ovsdb_index {
> >      const char *name;
> > @@ -187,6 +225,7 @@ void engine_ovsdb_node_add_index(struct engine_node *, const char *name,
> >      struct engine_node en_##NAME = { \
> >          .name = NAME_STR, \
> >          .data = &ed_##NAME, \
> > +        .state = EN_STALE, \
> >          .init = en_##NAME##_init, \
> >          .run = en_##NAME##_run, \
> >          .cleanup = en_##NAME##_cleanup, \
> > @@ -201,10 +240,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \
> >      const struct DB_NAME##rec_##TBL_NAME##_table *table = \
> >          EN_OVSDB_GET(node); \
> >      if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \
> > -        node->changed = true; \
> > +        engine_set_node_state(node, EN_UPDATED); \
> >          return; \
> >      } \
> > -    node->changed = false; \
> > +    engine_set_node_state(node, EN_VALID); \
> >  } \
> >  static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \
> >              = NULL; \
> >
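
To make the iterative flow declared in the header above more concrete,
here is a toy, self-contained sketch. It is not the patch code: every
toy_* name, the fixed-size arrays and the simplified "a node is updated
if any of its inputs was updated" rule are assumptions made for
illustration only. It shows a post-order DFS that places all inputs of a
node before the node itself, followed by a single linear pass that sets
per-node states.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_INPUTS 4
#define TOY_MAX_NODES  16

/* Toy counterpart of the node states proposed in the patch. */
enum toy_state { TOY_STALE, TOY_UPDATED, TOY_VALID, TOY_ABORTED };

struct toy_node {
    const char *name;
    struct toy_node *inputs[TOY_MAX_INPUTS];
    size_t n_inputs;
    enum toy_state state;
    bool visited;                    /* Used only while sorting. */
    bool (*run)(struct toy_node *);  /* Leaf nodes: true if data changed. */
};

/* Hypothetical leaf callbacks used to drive the example. */
bool toy_changed(struct toy_node *node) { (void) node; return true; }
bool toy_unchanged(struct toy_node *node) { (void) node; return false; }

/* Post-order DFS: a node is appended only after all of its inputs. */
static void
toy_visit(struct toy_node *node, struct toy_node **sorted, size_t *n)
{
    if (node->visited) {
        return;
    }
    node->visited = true;
    for (size_t i = 0; i < node->n_inputs; i++) {
        toy_visit(node->inputs[i], sorted, n);
    }
    assert(*n < TOY_MAX_NODES);
    sorted[(*n)++] = node;
}

/* Fills 'sorted' (capacity TOY_MAX_NODES) and returns the node count. */
size_t
toy_get_nodes(struct toy_node *root, struct toy_node **sorted)
{
    size_t n = 0;

    toy_visit(root, sorted, &n);
    return n;
}

/* Marks every node stale before a new run. */
void
toy_init_run(struct toy_node **nodes, size_t n)
{
    for (size_t i = 0; i < n; i++) {
        nodes[i]->state = TOY_STALE;
    }
}

/* One linear pass; inputs are always processed before their users. */
void
toy_run(struct toy_node **nodes, size_t n)
{
    for (size_t i = 0; i < n; i++) {
        struct toy_node *node = nodes[i];
        bool changed = false;

        if (!node->n_inputs) {
            changed = node->run && node->run(node);
        } else {
            for (size_t j = 0; j < node->n_inputs; j++) {
                if (node->inputs[j]->state == TOY_UPDATED) {
                    changed = true;
                }
            }
        }
        node->state = changed ? TOY_UPDATED : TOY_VALID;
    }
}
```

Because the graph is static, the sort only has to run once at startup;
each loop iteration then calls toy_init_run() followed by toy_run() on
the same array.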

Thanks again for the thorough review, Han. Before I start working on a
new version of the series, we should probably agree on the following
points:

1. Do we switch back to recursion, or are you OK with the iterative
approach?
2. Do we add a structure, e.g., inc_engine, to store an instance of
the incremental engine (sorted nodes and their count, the root node,
force_recompute, abort_recompute)?
3. Do we add a new state, EN_INVALID, to be set whenever
engine_node_valid(input) returns false and the input node didn't
abort? That can never happen in this patch series, but it could in
the future if more custom is_valid() handlers are added.
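
For point 2, a rough sketch of what such a structure might look like.
The field names and the inc_engine_init() helper are hypothetical and
not part of this series; struct engine_node is only forward-declared
here so that the snippet stands on its own.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

struct engine_node;  /* Defined in lib/inc-proc-eng.h; opaque here. */

/* Hypothetical container for one instance of the incremental engine. */
struct inc_engine {
    struct engine_node **nodes;     /* Topologically sorted nodes. */
    size_t n_nodes;                 /* Number of entries in 'nodes'. */
    struct engine_node *root_node;  /* Node whose state callers check. */
    bool force_recompute;           /* Skip change handlers next run. */
    bool abort_recompute;           /* Abort instead of recomputing. */
};

/* Hypothetical helper: stores the sorted array and clears the flags. */
void
inc_engine_init(struct inc_engine *engine, struct engine_node **nodes,
                size_t n_nodes, struct engine_node *root_node)
{
    assert(engine);
    assert(nodes || !n_nodes);

    engine->nodes = nodes;
    engine->n_nodes = n_nodes;
    engine->root_node = root_node;
    engine->force_recompute = false;
    engine->abort_recompute = false;
}
```

With something like this, engine_run() and friends could take a single
struct inc_engine pointer instead of the (nodes, n_count, root_node)
triple.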

Thanks,
Dumitru


