[ovs-dev] [PATCH v5 ovn 2/4] ovn-controller: Add per node states to I-P engine.

Han Zhou hzhou at ovn.org
Tue Nov 19 18:15:41 UTC 2019


On Tue, Nov 19, 2019 at 1:58 AM Dumitru Ceara <dceara at redhat.com> wrote:
>
> On Tue, Nov 19, 2019 at 1:41 AM Han Zhou <hzhou at ovn.org> wrote:
> >
> > Thanks Dumitru. Please see my comments inline below.
> >
> > On Mon, Nov 18, 2019 at 6:07 AM Dumitru Ceara <dceara at redhat.com> wrote:
> > >
> > > This commit transforms the 'changed' field in struct engine_node into a
> > > 'state' field. Possible node states are:
> > > - "Stale": data in the node is not up to date with the DB.
> > > - "Updated": data in the node is valid but was updated during
> > >   the last run of the engine.
> > > - "Valid": data in the node is valid and didn't change during
> > >   the last run of the engine.
> > > - "Aborted": during the last run, processing was aborted for
> > >   this node.
> > >
> > > This commit also further refactors the I-P engine:
> > > - instead of recursively performing all the engine processing, a
> > >   preprocessing stage is added (engine_get_nodes()) before the main
> > >   processing loop is executed in order to topologically sort nodes in the
> > >   engine such that all inputs of a given node appear in the sorted array
> > >   before the node itself. This simplifies the code in engine_run() a bit.
> >
> > Could you explain the reason for changing it to non-recursive? It seems to
> > add more code rather than simplify it, and effort is needed to ensure the
> > correctness of the new code. There is probably some benefit that makes the
> > later patches easier, but it is not obvious to me. Could you point it out
> > if that's the case?
>
> My reasoning was that the engine graph is static (i.e., we build it
> once at startup and it never changes afterwards) so all recursion
> trees are always identical.
>
I agree that the graph is static, which makes the non-recursive approach a
good option, but that doesn't necessarily mean the code is simpler than the
recursive version :)

> Moreover, with adding engine node explicit states we don't really need
> to store a run_id in the nodes because we have the state of each node
> after an engine run. I think removing the run_id is a good idea
> because we minimize the external state the user of the engine should
> manage (in this case engine_run_id and its incrementing logic).
>
> However, if we keep the engine processing in a recursive fashion then
> for each of the recursive operations (engine_run, engine_need_run,
> engine_init, engine_cleanup) we need a way to avoid executing the
> operation twice for a given node. This can be done by adding more
> flags to the nodes (or more state values) but given that our DAG is
> fixed, precomputing the processing order made more sense to me. What
> do you think?
>
It is true that run_id can be completely avoided with the non-recursive
version. However, it can be kept as an internal variable in the recursive
version, to skip repeated visits to a node, if we use engine_init_run() to
increment it internally. Other than this internal run_id, I don't think the
recursive version needs any more flags than the non-recursive one, and we can
remove the external engine_run_id incrementing logic for the recursive
version, too.
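
To illustrate, here is a minimal sketch of the recursive variant with a
module-private run counter. The sketch_* names and the simplified node
structure are hypothetical and only meant to show the idea; this is not the
actual inc-proc-eng code, and the n_runs counter exists purely to make the
"visited once per iteration" behavior observable.

```c
#include <stddef.h>
#include <stdint.h>

#define SKETCH_MAX_INPUTS 4

/* Hypothetical, simplified node; not the real struct engine_node. */
struct sketch_node {
    const char *name;
    struct sketch_node *inputs[SKETCH_MAX_INPUTS];
    size_t n_inputs;
    uint64_t run_id;      /* Last iteration in which the node was visited. */
    unsigned int n_runs;  /* Illustration only: times the node was processed. */
};

/* Module-private counter: the user of the engine never sees it. */
static uint64_t sketch_run_id = 0;

/* Called once at the start of every main loop iteration.  It must be called
 * before the first sketch_run(), so that the counter differs from the
 * zero-initialized run_id stored in the nodes. */
void
sketch_init_run(void)
{
    sketch_run_id++;
}

/* Recursive run that processes every node at most once per iteration, even
 * if the node is an input of several other nodes. */
void
sketch_run(struct sketch_node *node)
{
    if (node->run_id == sketch_run_id) {
        /* Already visited in this iteration. */
        return;
    }
    node->run_id = sketch_run_id;

    for (size_t i = 0; i < node->n_inputs; i++) {
        sketch_run(node->inputs[i]);
    }

    /* The real engine would call node->run() or the change handlers here. */
    node->n_runs++;
}
```

With this, the main loop only calls sketch_init_run() and sketch_run(), and
no run id has to be maintained outside the engine module.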

> >
> > > - remove the need for using an engine_run_id by using the newly added
> > >   states.
> > >
> > > Signed-off-by: Dumitru Ceara <dceara at redhat.com>
> > > ---
> > >  controller/ovn-controller.c |   88 ++++++++++-------
> > >  lib/inc-proc-eng.c          |  219
++++++++++++++++++++++++++++++++-----------
> > >  lib/inc-proc-eng.h          |   75 +++++++++++----
> > >  3 files changed, 271 insertions(+), 111 deletions(-)
> > >
> > > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> > > index c56190f..033eff4 100644
> > > --- a/controller/ovn-controller.c
> > > +++ b/controller/ovn-controller.c
> > > @@ -758,10 +758,10 @@ en_ofctrl_is_connected_run(struct engine_node
*node)
> > >          (struct ed_type_ofctrl_is_connected *)node->data;
> > >      if (data->connected != ofctrl_is_connected()) {
> > >          data->connected = !data->connected;
> > > -        node->changed = true;
> > > +        engine_set_node_state(node, EN_UPDATED);
> > >          return;
> > >      }
> > > -    node->changed = false;
> > > +    engine_set_node_state(node, EN_VALID);
> > >  }
> > >
> > >  struct ed_type_addr_sets {
> > > @@ -811,7 +811,7 @@ en_addr_sets_run(struct engine_node *node)
> > >      addr_sets_init(as_table, &as->addr_sets);
> > >
> > >      as->change_tracked = false;
> > > -    node->changed = true;
> > > +    engine_set_node_state(node, EN_UPDATED);
> > >  }
> > >
> > >  static bool
> > > @@ -830,11 +830,14 @@ addr_sets_sb_address_set_handler(struct
engine_node *node)
> > >      addr_sets_update(as_table, &as->addr_sets, &as->new,
> > >                       &as->deleted, &as->updated);
> > >
> > > -    node->changed = !sset_is_empty(&as->new) ||
!sset_is_empty(&as->deleted)
> > > -                    || !sset_is_empty(&as->updated);
> > > +    if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) ||
> > > +            !sset_is_empty(&as->updated)) {
> > > +        engine_set_node_state(node, EN_UPDATED);
> > > +    } else {
> > > +        engine_set_node_state(node, EN_VALID);
> > > +    }
> > >
> > >      as->change_tracked = true;
> > > -    node->changed = true;
> > >      return true;
> > >  }
> > >
> > > @@ -885,7 +888,7 @@ en_port_groups_run(struct engine_node *node)
> > >      port_groups_init(pg_table, &pg->port_groups);
> > >
> > >      pg->change_tracked = false;
> > > -    node->changed = true;
> > > +    engine_set_node_state(node, EN_UPDATED);
> > >  }
> > >
> > >  static bool
> > > @@ -904,11 +907,14 @@ port_groups_sb_port_group_handler(struct
engine_node *node)
> > >      port_groups_update(pg_table, &pg->port_groups, &pg->new,
> > >                       &pg->deleted, &pg->updated);
> > >
> > > -    node->changed = !sset_is_empty(&pg->new) ||
!sset_is_empty(&pg->deleted)
> > > -                    || !sset_is_empty(&pg->updated);
> > > +    if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) ||
> > > +            !sset_is_empty(&pg->updated)) {
> > > +        engine_set_node_state(node, EN_UPDATED);
> > > +    } else {
> > > +        engine_set_node_state(node, EN_VALID);
> > > +    }
> > >
> > >      pg->change_tracked = true;
> > > -    node->changed = true;
> > >      return true;
> > >  }
> > >
> > > @@ -1091,7 +1097,7 @@ en_runtime_data_run(struct engine_node *node)
> > >      update_ct_zones(local_lports, local_datapaths, ct_zones,
> > >                      ct_zone_bitmap, pending_ct_zones);
> > >
> > > -    node->changed = true;
> > > +    engine_set_node_state(node, EN_UPDATED);
> > >  }
> > >
> > >  static bool
> > > @@ -1157,10 +1163,10 @@ en_mff_ovn_geneve_run(struct engine_node
*node)
> > >      enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
> > >      if (data->mff_ovn_geneve != mff_ovn_geneve) {
> > >          data->mff_ovn_geneve = mff_ovn_geneve;
> > > -        node->changed = true;
> > > +        engine_set_node_state(node, EN_UPDATED);
> > >          return;
> > >      }
> > > -    node->changed = false;
> > > +    engine_set_node_state(node, EN_VALID);
> > >  }
> > >
> > >  struct ed_type_flow_output {
> > > @@ -1322,7 +1328,7 @@ en_flow_output_run(struct engine_node *node)
> > >                   active_tunnels,
> > >                   flow_table);
> > >
> > > -    node->changed = true;
> > > +    engine_set_node_state(node, EN_UPDATED);
> > >  }
> > >
> > >  static bool
> > > @@ -1404,7 +1410,7 @@ flow_output_sb_logical_flow_handler(struct
engine_node *node)
> > >                flow_table, group_table, meter_table, lfrr,
> > >                conj_id_ofs);
> > >
> > > -    node->changed = true;
> > > +    engine_set_node_state(node, EN_UPDATED);
> > >      return handled;
> > >  }
> > >
> > > @@ -1427,7 +1433,7 @@ flow_output_sb_mac_binding_handler(struct
engine_node *node)
> > >      lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
> > >              mac_binding_table, flow_table);
> > >
> > > -    node->changed = true;
> > > +    engine_set_node_state(node, EN_UPDATED);
> > >      return true;
> > >  }
> > >
> > > @@ -1531,7 +1537,7 @@ flow_output_sb_port_binding_handler(struct
engine_node *node)
> > >              chassis, ct_zones, local_datapaths,
> > >              active_tunnels, flow_table);
> > >
> > > -    node->changed = true;
> > > +    engine_set_node_state(node, EN_UPDATED);
> > >      return true;
> > >  }
> > >
> > > @@ -1580,7 +1586,7 @@ flow_output_sb_multicast_group_handler(struct
engine_node *node)
> > >              mff_ovn_geneve, chassis, ct_zones, local_datapaths,
> > >              flow_table);
> > >
> > > -    node->changed = true;
> > > +    engine_set_node_state(node, EN_UPDATED);
> > >      return true;
> > >
> > >  }
> > > @@ -1694,7 +1700,9 @@ _flow_output_resource_ref_handler(struct
engine_node *node,
> > >                      conj_id_ofs, &changed)) {
> > >              return false;
> > >          }
> > > -        node->changed = changed || node->changed;
> > > +        if (changed) {
> > > +            engine_set_node_state(node, EN_UPDATED);
> > > +        }
> > >      }
> > >      SSET_FOR_EACH (ref_name, updated) {
> > >          if (!lflow_handle_changed_ref(ref_type, ref_name,
> > > @@ -1707,7 +1715,9 @@ _flow_output_resource_ref_handler(struct
engine_node *node,
> > >                      conj_id_ofs, &changed)) {
> > >              return false;
> > >          }
> > > -        node->changed = changed || node->changed;
> > > +        if (changed) {
> > > +            engine_set_node_state(node, EN_UPDATED);
> > > +        }
> > >      }
> > >      SSET_FOR_EACH (ref_name, new) {
> > >          if (!lflow_handle_changed_ref(ref_type, ref_name,
> > > @@ -1720,7 +1730,9 @@ _flow_output_resource_ref_handler(struct
engine_node *node,
> > >                      conj_id_ofs, &changed)) {
> > >              return false;
> > >          }
> > > -        node->changed = changed || node->changed;
> > > +        if (changed) {
> > > +            engine_set_node_state(node, EN_UPDATED);
> > > +        }
> > >      }
> > >
> > >      return true;
> > > @@ -1922,7 +1934,11 @@ main(int argc, char *argv[])
> > >      engine_add_input(&en_runtime_data, &en_sb_port_binding,
> > >                       runtime_data_sb_port_binding_handler);
> > >
> > > -    engine_init(&en_flow_output);
> > > +    /* Get the sorted engine nodes to be used for every engine run.
*/
> > > +    size_t en_count = 0;
> > > +    struct engine_node **en_nodes = engine_get_nodes(&en_flow_output,
> > > +                                                     &en_count);
> > > +    engine_init(en_nodes, en_count);
> >
> > I think engine_get_nodes() and engine_init() can be combined. We only need
> > to expose the engine_init() interface, which can call engine_get_nodes()
> > internally and store n_count and nodes inside the engine module.
>
> Ok, but wouldn't it then make more sense to store all this in a
> "struct inc_engine" structure which engine_init() would return? We'd
> also need to move the globals engine_force_recompute,
> engine_abort_recompute, engine_context inside the new inc_engine
> structure then.
>

It is ok to have a "struct inc_engine" structure, but it is only necessary if
we want to support multiple instances of the inc-engine, which doesn't seem to
be needed at the moment. I think it is ok to just keep the globals as
module-private (static) variables.
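
Roughly, I mean something like the sketch below: the sorted node array lives
in static variables of the module, and engine_init() is the only entry point
that fills it. All mp_* names, the fixed-size array and the simplified node
structure are assumptions made for the example, not the existing code.

```c
#include <stddef.h>
#include <stdlib.h>

#define MP_MAX_INPUTS 4
#define MP_MAX_NODES 16

/* Hypothetical, simplified node; not the real struct engine_node. */
struct mp_node {
    const char *name;
    struct mp_node *inputs[MP_MAX_INPUTS];
    size_t n_inputs;
    void (*init)(struct mp_node *);
    unsigned int n_init;  /* Illustration only: times init() was called. */
};

/* Module-private state: invisible to the user of the engine. */
static struct mp_node *mp_nodes[MP_MAX_NODES];
static size_t mp_n_nodes;

/* Appends 'node' after all of its inputs, skipping nodes already stored. */
static void
mp_collect(struct mp_node *node)
{
    for (size_t i = 0; i < mp_n_nodes; i++) {
        if (mp_nodes[i] == node) {
            return;
        }
    }
    for (size_t i = 0; i < node->n_inputs; i++) {
        mp_collect(node->inputs[i]);
    }
    if (mp_n_nodes == MP_MAX_NODES) {
        abort();  /* The sketch uses a fixed capacity for brevity. */
    }
    mp_nodes[mp_n_nodes++] = node;
}

/* The only interface the caller needs: sorts the nodes, remembers them
 * internally and initializes each one exactly once. */
void
mp_engine_init(struct mp_node *root)
{
    mp_n_nodes = 0;
    mp_collect(root);
    for (size_t i = 0; i < mp_n_nodes; i++) {
        if (mp_nodes[i]->init) {
            mp_nodes[i]->init(mp_nodes[i]);
        }
    }
}

/* Accessor used only to observe the result in this example. */
size_t
mp_engine_node_count(void)
{
    return mp_n_nodes;
}

/* Example init callback. */
void
mp_count_init(struct mp_node *node)
{
    node->n_init++;
}
```

The later engine_run()/engine_cleanup() calls would then take no node array
arguments at all, since the module already knows the nodes.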

> >
> > In addition, there can be more than one root node in the DAG. Originally
> > we could just call engine_run(root_node) for each root node. Now, with the
> > non-recursive version, I think we can handle this by:
> >   - in engine_get_nodes, not passing the root_node; instead, the engine
> >     can just process all engine nodes.
> >   - removing the root_node parameter from all interfaces.
>
> In general, a DAG doesn't really have a root node, indeed. But even in
> the original code everything was designed with one "ultimate" output
> in mind, en_flow_output, and we would only call
> engine_run(&en_flow_output, ..). I don't think we can remove the
> root_node parameter from engine_get_nodes because we need to have a
> starting point for precomputing the processing order of the nodes. We
> were implicitly doing the same thing through recursion in the original
> code.
>

In fact, the recursive version didn't have such a constraint. The idea was
that if there is any node that needs to be computed but is not an offspring
of flow_output, we could just call engine_run(&en_xxx) in the same iteration
as engine_run(&en_flow_output). (Some places might need updating, since it
was never tested this way, but in general this design should work.)
With the non-recursive version, I think we'd better not add any extra
constraint here. With the current patch, we can no longer call engine_run
with another *root node* because the interface changed. But I think this can
be solved easily in the topo_sort logic, by processing all nodes in the graph
instead of starting from a given root node.
In fact, even for the recursive version, it would be better to let the
engine figure out by itself which nodes are the roots and execute from
there. It just wasn't that important; it would only be needed if there were
more root nodes.
Removing the root node from all the interfaces would also ensure that the
engine is always executed/evaluated as a whole, and rule out any misuse of
the engine by passing a middle node, which would result in partial execution
of the engine and unpredictable consequences.
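
As a rough sketch of the topo_sort change, assuming the engine keeps a list
of all nodes somewhere (how the nodes get registered is left open here, and
the topo_* names and the 'visited' flag are made up for the example):

```c
#include <stdbool.h>
#include <stddef.h>

#define TOPO_MAX_INPUTS 4

/* Hypothetical, simplified node; not the real struct engine_node. */
struct topo_node {
    const char *name;
    struct topo_node *inputs[TOPO_MAX_INPUTS];
    size_t n_inputs;
    bool visited;
};

/* Depth-first visit: emits all inputs of 'node' before 'node' itself.
 * Returns the new number of entries in 'sorted'. */
static size_t
topo_visit(struct topo_node *node, struct topo_node **sorted, size_t n_sorted)
{
    if (node->visited) {
        return n_sorted;
    }
    node->visited = true;

    for (size_t i = 0; i < node->n_inputs; i++) {
        n_sorted = topo_visit(node->inputs[i], sorted, n_sorted);
    }
    sorted[n_sorted++] = node;
    return n_sorted;
}

/* Sorts every node in 'all', no matter how many roots the graph has.
 * Assumes that every input of a listed node is itself listed in 'all' and
 * that 'sorted' has room for 'n_all' entries. */
size_t
topo_sort_all(struct topo_node **all, size_t n_all, struct topo_node **sorted)
{
    size_t n_sorted = 0;

    for (size_t i = 0; i < n_all; i++) {
        all[i]->visited = false;
    }
    for (size_t i = 0; i < n_all; i++) {
        n_sorted = topo_visit(all[i], sorted, n_sorted);
    }
    return n_sorted;
}
```

Since the walk starts from every node, a second root that is not an input of
flow_output ends up in the sorted array as well, and no interface needs a
root_node argument.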

> >
> > >
> > >      ofctrl_init(&ed_flow_output.group_table,
> > >                  &ed_flow_output.meter_table,
> > > @@ -1941,9 +1957,6 @@ main(int argc, char *argv[])
> > >      unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1,
inject_pkt,
> > >                               &pending_pkt);
> > >
> > > -    uint64_t engine_run_id = 0;
> > > -    bool engine_run_done = true;
> > > -
> > >      unsigned int ovs_cond_seqno = UINT_MAX;
> > >      unsigned int ovnsb_cond_seqno = UINT_MAX;
> > >
> > > @@ -1951,7 +1964,7 @@ main(int argc, char *argv[])
> > >      exiting = false;
> > >      restart = false;
> > >      while (!exiting) {
> > > -        engine_run_id++;
> > > +        engine_init_run(en_nodes, en_count, &en_flow_output);
> > >
> > >          update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
> > >          update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
> > > @@ -2044,15 +2057,13 @@ main(int argc, char *argv[])
> > >                               * this round of engine_run and continue
processing
> > >                               * acculated changes incrementally later
when
> > >                               * ofctrl_can_put() returns true. */
> > > -                            if (engine_run_done) {
> > > +                            if (!engine_aborted(&en_flow_output)) {
> > >                                  engine_set_abort_recompute(true);
> > > -                                engine_run_done =
engine_run(&en_flow_output,
> > > -
engine_run_id);
> > > +                                engine_run(en_nodes, en_count);
> > >                              }
> > >                          } else {
> > >                              engine_set_abort_recompute(false);
> > > -                            engine_run_done = true;
> > > -                            engine_run(&en_flow_output,
engine_run_id);
> > > +                            engine_run(en_nodes, en_count);
> > >                          }
> > >                      }
> > >                      stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
> > > @@ -2071,7 +2082,7 @@ main(int argc, char *argv[])
> > >
sbrec_meter_table_get(ovnsb_idl_loop.idl),
> > >                                 get_nb_cfg(sbrec_sb_global_table_get(
> > >                                                ovnsb_idl_loop.idl)),
> > > -                               en_flow_output.changed);
> > > +                               engine_node_changed(&en_flow_output));
> > >                      pinctrl_run(ovnsb_idl_txn,
> > >                                  sbrec_datapath_binding_by_key,
> > >                                  sbrec_port_binding_by_datapath,
> > > @@ -2089,7 +2100,7 @@ main(int argc, char *argv[])
> > >                                  &ed_runtime_data.local_datapaths,
> > >                                  &ed_runtime_data.active_tunnels);
> > >
> > > -                    if (en_runtime_data.changed) {
> > > +                    if (engine_node_changed(&en_runtime_data)) {
> > >                          update_sb_monitors(ovnsb_idl_loop.idl,
chassis,
> > >
&ed_runtime_data.local_lports,
> > >
&ed_runtime_data.local_datapaths);
> > > @@ -2097,17 +2108,17 @@ main(int argc, char *argv[])
> > >                  }
> > >
> > >              }
> > > -            if (engine_need_run(&en_flow_output, engine_run_id)) {
> > > +            if (engine_need_run(en_nodes, en_count,
&en_flow_output)) {
> > >                  VLOG_DBG("engine did not run, force recompute next
time: "
> > >                              "br_int %p, chassis %p", br_int,
chassis);
> > >                  engine_set_force_recompute(true);
> > >                  poll_immediate_wake();
> > > -            } else if (!engine_run_done) {
> > > +            } else if (engine_aborted(&en_flow_output)) {
> > >                  VLOG_DBG("engine was aborted, force recompute next
time: "
> > >                           "br_int %p, chassis %p", br_int, chassis);
> > >                  engine_set_force_recompute(true);
> > >                  poll_immediate_wake();
> > > -            } else if (!engine_has_run(&en_flow_output,
engine_run_id)) {
> > > +            } else if (!engine_has_run(&en_flow_output)) {
> > >                  VLOG_DBG("engine did not run, and it was not needed"
> > >                           " either: br_int %p, chassis %p",
> > >                           br_int, chassis);
> > > @@ -2135,8 +2146,7 @@ main(int argc, char *argv[])
> > >                      }
> > >                  } else {
> > >                      VLOG_DBG("Pending_pkt conn but br_int %p or
chassis "
> > > -                             "%p not ready. run-id: %"PRIu64, br_int,
> > > -                             chassis, engine_run_id);
> > > +                             "%p not ready.", br_int, chassis);
> > >                      unixctl_command_reply_error(pending_pkt.conn,
> > >                          "ovn-controller not ready.");
> > >                  }
> > > @@ -2185,7 +2195,7 @@ main(int argc, char *argv[])
> > >      }
> > >
> > >      engine_set_context(NULL);
> > > -    engine_cleanup(&en_flow_output);
> > > +    engine_cleanup(en_nodes, en_count);
> > >
> > >      /* It's time to exit.  Clean up the databases if we are not
restarting */
> > >      if (!restart) {
> > > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> > > index 8a085e2..ee6afbe 100644
> > > --- a/lib/inc-proc-eng.c
> > > +++ b/lib/inc-proc-eng.c
> > > @@ -34,6 +34,13 @@ static bool engine_force_recompute = false;
> > >  static bool engine_abort_recompute = false;
> > >  static const struct engine_context *engine_context;
> > >
> > > +static const char *engine_node_state_name[EN_STATE_MAX] = {
> > > +    [EN_STALE]   = "Stale",
> > > +    [EN_UPDATED] = "Updated",
> > > +    [EN_VALID]   = "Valid",
> > > +    [EN_ABORTED] = "Aborted",
> > > +};
> > > +
> > >  void
> > >  engine_set_force_recompute(bool val)
> > >  {
> > > @@ -58,26 +65,62 @@ engine_set_context(const struct engine_context
*ctx)
> > >      engine_context = ctx;
> > >  }
> > >
> > > -void
> > > -engine_init(struct engine_node *node)
> > > +/* Builds the topologically sorted 'sorted_nodes' array starting from
> > > + * 'node'.
> > > + */
> > > +static struct engine_node **
> > > +engine_topo_sort(struct engine_node *node, struct engine_node
**sorted_nodes,
> > > +                 size_t *n_count, size_t *n_size)
> > >  {
> > > +    /* It's not so efficient to walk the array of already sorted
nodes but
> > > +     * we know that sorting is done only once at startup so it's ok
for now.
> > > +     */
> > > +    for (size_t i = 0; i < *n_count; i++) {
> > > +        if (sorted_nodes[i] == node) {
> > > +            return sorted_nodes;
> > > +        }
> > > +    }
> > > +
> > >      for (size_t i = 0; i < node->n_inputs; i++) {
> > > -        engine_init(node->inputs[i].node);
> > > +        sorted_nodes = engine_topo_sort(node->inputs[i].node,
sorted_nodes,
> > > +                                        n_count, n_size);
> > >      }
> > > -    if (node->init) {
> > > -        node->init(node);
> > > +    if (*n_count == *n_size) {
> > > +        sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof
*sorted_nodes);
> > >      }
> > > +    sorted_nodes[(*n_count)] = node;
> > > +    (*n_count)++;
> > > +    return sorted_nodes;
> > > +}
> > > +
> > > +struct engine_node **
> > > +engine_get_nodes(struct engine_node *root_node, size_t *n_count)
> > > +{
> > > +    size_t n_size = 0;
> > > +
> > > +    *n_count = 0;
> > > +    return engine_topo_sort(root_node, NULL, n_count, &n_size);
> > >  }
> > >
> > >  void
> > > -engine_cleanup(struct engine_node *node)
> > > +engine_init(struct engine_node **nodes, size_t n_count)
> > >  {
> > > -    for (size_t i = 0; i < node->n_inputs; i++) {
> > > -        engine_cleanup(node->inputs[i].node);
> > > +    for (size_t i = 0; i < n_count; i++) {
> > > +        if (nodes[i]->init) {
> > > +            nodes[i]->init(nodes[i]);
> > > +        }
> > >      }
> > > -    if (node->cleanup) {
> > > -        node->cleanup(node);
> > > +}
> > > +
> > > +void
> > > +engine_cleanup(struct engine_node **nodes, size_t n_count)
> > > +{
> > > +    for (size_t i = 0; i < n_count; i++) {
> > > +        if (nodes[i]->cleanup) {
> > > +            nodes[i]->cleanup(nodes[i]);
> > > +        }
> > >      }
> > > +    free(nodes);
> > >  }
> > >
> > >  struct engine_node *
> > > @@ -128,16 +171,66 @@ engine_ovsdb_node_add_index(struct engine_node
*node, const char *name,
> > >      ed->n_indexes ++;
> > >  }
> > >
> > > +void
> > > +engine_set_node_state_at(struct engine_node *node,
> > > +                         enum engine_node_state state,
> > > +                         const char *where)
> > > +{
> > > +    if (node->state == state) {
> > > +        return;
> > > +    }
> > > +
> > > +    VLOG_DBG("%s: node: %s, old_state %s, new_state %s",
> > > +             where, node->name,
> > > +             engine_node_state_name[node->state],
> > > +             engine_node_state_name[state]);
> > > +
> > > +    node->state = state;
> > > +}
> > > +
> > > +static bool
> > > +engine_node_valid(struct engine_node *node)
> > > +{
> > > +    return (node->state == EN_UPDATED || node->state == EN_VALID);
> > > +}
> > > +
> > >  bool
> > > -engine_has_run(struct engine_node *node, uint64_t run_id)
> > > +engine_node_changed(struct engine_node *node)
> > >  {
> > > -    return node->run_id == run_id;
> > > +    return node->state == EN_UPDATED;
> > > +}
> > > +
> > > +bool
> > > +engine_has_run(struct engine_node *node)
> > > +{
> >
> > engine_has_run() should go through all nodes. If any node is NOT STALE,
> > return true. The engine hasn't run only if all nodes are STALE. (Originally
> > it was easier to tell by using engine_run_id.)
> > If some nodes are STALE and some are not, it means the engine has run but
> > aborted.
>
> Due to the way the code is written (and as you noticed in earlier
> versions of the patches) the STALE state is transient. If the engine
> ran, even if it aborted, then no node will be in state STALE. Please
> see below regarding how ABORTED is propagated.
>

Ok, sorry that I misread the code earlier. However, after walking through the
code again with your explanation, it seems there is still a chance (in
theory) that some nodes are ABORTED while others are STALE. For example, if A
is an input of B, and A is UPDATED but is_valid(A) returns false because of a
custom implementation of is_valid(), then B will stay STALE.
Would it be more generic and correct to go through all nodes and tell whether
the engine has run as a whole, instead of basing it on the input node? (This
suggests not having the node arg.)
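
Something along these lines, as a sketch only. The hr_* names and the reduced
node structure are hypothetical; the states mirror the ones in the patch.

```c
#include <stdbool.h>
#include <stddef.h>

/* Mirrors the states from the patch under hypothetical names. */
enum hr_state {
    HR_STALE,
    HR_UPDATED,
    HR_VALID,
    HR_ABORTED,
};

/* Hypothetical node reduced to the only field needed here. */
struct hr_node {
    enum hr_state state;
};

/* The engine has run if at least one node left the STALE state. */
bool
hr_engine_has_run(struct hr_node **nodes, size_t n_count)
{
    for (size_t i = 0; i < n_count; i++) {
        if (nodes[i]->state != HR_STALE) {
            return true;
        }
    }
    return false;
}

/* The engine aborted if any node is in the ABORTED state. */
bool
hr_engine_aborted(struct hr_node **nodes, size_t n_count)
{
    for (size_t i = 0; i < n_count; i++) {
        if (nodes[i]->state == HR_ABORTED) {
            return true;
        }
    }
    return false;
}
```

In the example above (A is UPDATED, B stays STALE), checking only B would
report that the engine did not run, while the walk over all nodes reports
that it did.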

> >
> > > +    return node->state != EN_STALE;
> > > +}
> > > +
> > > +bool
> > > +engine_aborted(struct engine_node *node)
> > > +{
> > > +    return node->state == EN_ABORTED;
> > > +}
> > > +
> > > +void
> > > +engine_init_run(struct engine_node **nodes, size_t n_count,
> > > +                struct engine_node *root_node)
> > > +{
> > > +    /* No need to reinitialize if last run didn't happen. */
> > > +    if (!engine_has_run(root_node)) {
> >
> > I think there is a problem here. Suppose that in the last round the root
> > node didn't run because processing was aborted at some intermediate node,
> > but many other nodes had already run and changed state to VALID/UPDATED.
> > If we don't do the init now, those nodes may contain invalid data, since
> > this is a new iteration, while their state says they are valid.
>
> If one of the inputs of a given node went to ABORT state then the node
> itself will also move to ABORT. And this will get propagated to all
> successor nodes.
> The idea of the check was to avoid walking all the nodes if the
> root_node is in state STALE (engine_has_run() == false). STALE is
> transient so if one node is STALE then all nodes are STALE and this
> can only happen if the engine didn't run in the current iteration.
>
Sorry, I misunderstood the ABORT propagation logic.

> >
> > We don't need to pass "root_node" to engine_init_run. We can just reset
> > all nodes to STALE state.
>
> I agree, we could decide if an engine needs init_run by checking that
> the first node in 'nodes' is not in state STALE.
>
> >
> >
> > > +        return;
> > > +    }
> > > +
> > > +    VLOG_DBG("Initializing new run");
> > > +    for (size_t i = 0; i < n_count; i++) {
> > > +        engine_set_node_state(nodes[i], EN_STALE);
> > > +    }
> > >  }
> > >
> > >  /* Do a full recompute (or at least try). If we're not allowed then
> > >   * mark the node as "aborted".
> > >   */
> > > -static bool
> > > +static void
> > >  engine_recompute(struct engine_node *node, bool forced, bool allowed)
> > >  {
> > >      VLOG_DBG("node: %s, recompute (%s)", node->name,
> > > @@ -145,12 +238,12 @@ engine_recompute(struct engine_node *node, bool
forced, bool allowed)
> > >
> > >      if (!allowed) {
> > >          VLOG_DBG("node: %s, recompute aborted", node->name);
> > > -        return false;
> > > +        engine_set_node_state(node, EN_ABORTED);
> > > +        return;
> > >      }
> > >
> > > +    /* Run the node handler which might change state. */
> > >      node->run(node);
> > > -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> > > -    return true;
> > >  }
> > >
> > >  /* Return true if the node could be computed without triggerring a
full
> > > @@ -161,7 +254,7 @@ engine_compute(struct engine_node *node, bool
recompute_allowed)
> > >  {
> > >      for (size_t i = 0; i < node->n_inputs; i++) {
> > >          /* If the input node data changed call its change handler. */
> > > -        if (node->inputs[i].node->changed) {
> > > +        if (node->inputs[i].node->state == EN_UPDATED) {
> > >              VLOG_DBG("node: %s, handle change for input %s",
> > >                       node->name, node->inputs[i].node->name);
> > >
> > > @@ -172,57 +265,61 @@ engine_compute(struct engine_node *node, bool
recompute_allowed)
> > >                  VLOG_DBG("node: %s, can't handle change for input
%s, "
> > >                           "fall back to recompute",
> > >                           node->name, node->inputs[i].node->name);
> > > -                if (!engine_recompute(node, false,
recompute_allowed)) {
> > > +                engine_recompute(node, false, recompute_allowed);
> > > +                if (engine_aborted(node)) {
> >
> > The aborted state was propagated through the recursive logic, but that is
> > not the case in this new implementation. Is this on purpose?
>
> With the new implementation the aborted state is also propagated, just
> that it's done in engine_run_node().
>
Ack.

> >
> > >                      return false;
> > >                  }
> > >              }
> > >          }
> > >      }
> > > -
> > >      return true;
> > >  }
> > >
> > > -bool engine_run(struct engine_node *node, uint64_t run_id)
> > > +static void
> > > +engine_run_node(struct engine_node *node)
> > >  {
> > > -    if (node->run_id == run_id) {
> > > -        /* The node was already updated in this run (could be input
for
> > > -         * multiple other nodes). Stop processing.
> > > -         */
> > > -        return true;
> > > -    }
> > > -
> > > -    /* Initialize the node for this run. */
> > > -    node->run_id = run_id;
> > > -    node->changed = false;
> > > -
> > >      if (!node->n_inputs) {
> > > +        /* Run the node handler which might change state. */
> > >          node->run(node);
> > > -        VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> > > -        return true;
> > > +        return;
> > >      }
> > >
> > > +    bool input_stale = false;
> > >      for (size_t i = 0; i < node->n_inputs; i++) {
> > > -        if (!engine_run(node->inputs[i].node, run_id)) {
> > > -            return false;
> > > +        if (!engine_node_valid(node->inputs[i].node)) {
> > > +            /* If the input node aborted computation, move to
EN_ABORTED.
> > > +             * This will be propagated to following nodes.
> > > +             */
> > > +            if (engine_aborted(node->inputs[i].node)) {
> > > +                engine_set_node_state(node, EN_ABORTED);
> > > +            }
>
> Here we propagate the ABORTED state.
>
Ack.

> > > +
> > > +            input_stale = true;
> > >          }
> > >      }
> > >
> > > -    bool need_compute = false;
> > > +    /* If at least one input is stale, don't change state. */
> > > +    if (input_stale) {
> > > +        return;
> > > +    }
>
> With the current ovn-controller code, input_stale will never be true
> unless one of the input nodes aborted computation. However, because of
> the is_valid interface we're adding in the last patch of the series,
> engine_node_valid(input) might return false in the future based on a
> user defined condition even if the state is computed.
>
> Maybe for better readability I can introduce a new state called
> EN_INVALID and enter it every time engine_node_valid(input) returns
> false. What do you think?
>
I think we have two options here, each based on a different assumption.

Option 1: Assume there is no use case in which a custom is_valid() would
return false after the node is computed. We can assert this assumption in
engine_node_valid() by checking the node state first, and only calling
node->is_valid() when the node state is not valid (i.e. ABORTED/STALE).

Option 2: Assume there is a real use case in which a custom is_valid() would
return false after the node is computed, probably because some of the inputs
don't meet the node's expectations. In this case, instead of relying on the
call to is_valid(), I think it is more straightforward for node->run() or
node->change_handler_xxx() to detect this unexpected situation and set the
node state directly, the same way the UPDATED state is set. It is better not
to set the state to INVALID only when engine_node_valid() happens to be
called.

I tend to go with option 1 first, because the more states, the more
complexity, and I don't see a use case for option 2 yet.
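
For option 1, the check could look roughly like the sketch below. The v_*
names are hypothetical, and the is_valid() signature is my assumption about
the callback added in the last patch of the series.

```c
#include <stdbool.h>
#include <stddef.h>

/* Mirrors the states from the patch under hypothetical names. */
enum v_state {
    V_STALE,
    V_UPDATED,
    V_VALID,
    V_ABORTED,
};

/* Hypothetical node; the is_valid() signature is an assumption. */
struct v_node {
    enum v_state state;
    bool (*is_valid)(struct v_node *);
};

/* Option 1: a computed node is always valid.  The custom is_valid() is only
 * consulted when the state is STALE or ABORTED. */
bool
v_node_valid(struct v_node *node)
{
    if (node->state == V_UPDATED || node->state == V_VALID) {
        return true;
    }
    if (node->is_valid) {
        return node->is_valid(node);
    }
    return false;
}

/* Example callbacks used to exercise the sketch. */
bool
v_always_valid(struct v_node *node)
{
    (void) node;
    return true;
}

bool
v_never_valid(struct v_node *node)
{
    (void) node;
    return false;
}
```

This way a custom is_valid() can only extend validity to a node that was not
computed; it can never invalidate a node that is UPDATED or VALID, so B in my
earlier example could not get stuck in STALE.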

> > >
> > >      if (engine_force_recompute) {
> > > -        return engine_recompute(node, true, !engine_abort_recompute);
> > > +        engine_recompute(node, true, !engine_abort_recompute);
> > > +        return;
> > >      }
> > >
> > >      /* If any of the inputs updated data but there is no
change_handler, then
> > >       * recompute the current node too.
> > >       */
> > > +    bool need_compute = false;
> > >      for (size_t i = 0; i < node->n_inputs; i++) {
> > > -        if (node->inputs[i].node->changed) {
> > > +        if (node->inputs[i].node->state == EN_UPDATED) {
> > >              need_compute = true;
> > >
> > >              /* Trigger a recompute if we don't have a change handler. */
> > >              if (!node->inputs[i].change_handler) {
> > > -                return engine_recompute(node, false, !engine_abort_recompute);
> > > +                engine_recompute(node, false, !engine_abort_recompute);
> > > +                return;
> > >              }
> > >          }
> > >      }
> > > @@ -231,33 +328,47 @@ bool engine_run(struct engine_node *node, uint64_t run_id)
> > >          /* If we couldn't compute the node we either aborted or triggered
> > >           * a full recompute. In any case, stop processing.
> > >           */
> > > -        return engine_compute(node, !engine_abort_recompute);
> > > +        if (!engine_compute(node, !engine_abort_recompute)) {
> > > +            return;
> > > +        }
> > >      }
> > >
> > > -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> > > -    return true;
> > > +    /* If we reached this point, either the node was updated or its state is
> > > +     * still valid.
> > > +     */
> > > +    if (!engine_node_changed(node)) {
> > > +        engine_set_node_state(node, EN_VALID);
> > > +    }
> > >  }
> > >
> > > -bool
> > > -engine_need_run(struct engine_node *node, uint64_t run_id)
> > > +void
> > > +engine_run(struct engine_node **nodes, size_t n_count)
> > >  {
> > > -    size_t i;
> > > +    for (size_t i = 0; i < n_count; i++) {
> >
> > If an input node didn't finish the run, e.g. aborted, then we shouldn't
> > continue running the nodes that depend on it.
>
> In this case we have to, so that we propagate the ABORTED state (in the
> same way we were doing with all the recursive returns).
>
Ack.
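
For reference, this is roughly how I understand the propagation over the
sorted array. It is a simplified sketch, not the patch itself: the node
struct is reduced, inputs are plain pointers, SKETCH_MAX_INPUTS and the
sketch_*/example_* names are made up for the illustration, and the real
engine_run_node() handles more cases (change handlers, forced recompute).

```c
#include <stddef.h>

enum engine_node_state { EN_STALE, EN_UPDATED, EN_VALID, EN_ABORTED };

#define SKETCH_MAX_INPUTS 4   /* Hypothetical limit for this sketch. */

/* Reduced node: the real struct engine_node has more fields. */
struct engine_node {
    const char *name;
    enum engine_node_state state;
    size_t n_inputs;
    struct engine_node *inputs[SKETCH_MAX_INPUTS];
    void (*run)(struct engine_node *);
};

/* Visit one node. If any input aborted, mark this node aborted too
 * instead of running it, so the state reaches every dependent node. */
void
sketch_run_node(struct engine_node *node)
{
    for (size_t i = 0; i < node->n_inputs; i++) {
        if (node->inputs[i]->state == EN_ABORTED) {
            node->state = EN_ABORTED;
            return;
        }
    }
    node->run(node);
}

/* 'nodes' must be topologically sorted: inputs before their users. */
void
sketch_engine_run(struct engine_node **nodes, size_t n_count)
{
    for (size_t i = 0; i < n_count; i++) {
        sketch_run_node(nodes[i]);
    }
}

/* Hypothetical run() callbacks, for illustration only. */
void
example_run_abort(struct engine_node *node)
{
    node->state = EN_ABORTED;
}

void
example_run_update(struct engine_node *node)
{
    node->state = EN_UPDATED;
}
```

So the loop still visits every node, but a node downstream of an aborted
input is never actually computed.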

> >
> > > +        engine_run_node(nodes[i]);
> > > +    }
> > > +}
> > >
> > > -    if (node->run_id == run_id) {
> > > +bool
> > > +engine_need_run(struct engine_node **nodes, size_t n_count,
> > > +                struct engine_node *root_node)
> > > +{
> > > +    if (engine_has_run(root_node)) {
> > >          return false;
> > >      }
> > >
> > > -    if (!node->n_inputs) {
> > > -        node->run(node);
> > > -        VLOG_DBG("input node: %s, changed: %d", node->name, node->changed);
> > > -        return node->changed;
> > > -    }
> > > +    for (size_t i = 0; i < n_count; i++) {
> > > +        /* Check only leaf nodes. */
> > > +        if (nodes[i]->n_inputs) {
> > > +            continue;
> > > +        }
> > >
> > > -    for (i = 0; i < node->n_inputs; i++) {
> > > -        if (engine_need_run(node->inputs[i].node, run_id)) {
> > > +        nodes[i]->run(nodes[i]);
> > > +        VLOG_DBG("input node: %s, state: %s", nodes[i]->name,
> > > +                 engine_node_state_name[nodes[i]->state]);
> > > +        if (nodes[i]->state == EN_UPDATED) {
> > >              return true;
> > >          }
> > >      }
> > > -
> > >      return false;
> > >  }
> > > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> > > index abd41b2..69eb9b6 100644
> > > --- a/lib/inc-proc-eng.h
> > > +++ b/lib/inc-proc-eng.h
> > > @@ -82,10 +82,21 @@ struct engine_node_input {
> > >      bool (*change_handler)(struct engine_node *node);
> > >  };
> > >
> > > -struct engine_node {
> > > -    /* A unique id to distinguish each iteration of the engine_run(). */
> > > -    uint64_t run_id;
> > > +enum engine_node_state {
> > > +    EN_STALE,     /* Data in the node is not up to date with the DB. */
> > > +    EN_UPDATED,   /* Data in the node is valid but was updated during the
> > > +                   * last run.
> > > +                   */
> > > +    EN_VALID,     /* Data in the node is valid and didn't change during the
> > > +                   * last run.
> > > +                   */
> > > +    EN_ABORTED,   /* During the last run, processing was aborted for
> > > +                   * this node.
> > > +                   */
> > > +    EN_STATE_MAX,
> > > +};
> > >
> > > +struct engine_node {
> > >      /* A unique name for each node. */
> > >      char *name;
> > >
> > > @@ -102,8 +113,8 @@ struct engine_node {
> > >       * node. */
> > >      void *data;
> > >
> > > -    /* Whether the data changed in the last engine run. */
> > > -    bool changed;
> > > +    /* State of the node after the last engine run. */
> > > +    enum engine_node_state state;
> > >
> > >      /* Method to initialize data. It may be NULL. */
> > >      void (*init)(struct engine_node *);
> > > @@ -116,23 +127,36 @@ struct engine_node {
> > >      void (*run)(struct engine_node *);
> > >  };
> > >
> > > -/* Initialize the data for the engine nodes recursively. It calls each node's
> > > +/* Return the array of topologically sorted nodes when starting from
> > > + * 'root_node'. Stores the number of nodes in 'n_count'.
> > > + * It should be called before the main loop.
> > > + */
> > > +struct engine_node **engine_get_nodes(struct engine_node *root_node,
> > > +                                      size_t *n_count);
> > > +
> > > +/* Initialize the data for the engine nodes. It calls each node's
> > >   * init() method if not NULL. It should be called before the main loop. */
> > > -void engine_init(struct engine_node *);
> > > +void engine_init(struct engine_node **nodes, size_t n_count);
> > > +
> > > +/* Initialize the engine nodes for a new run. It should be called in the
> > > + * main processing loop before every potential engine_run().
> > > + */
> > > +void engine_init_run(struct engine_node **nodes, size_t n_count,
> > > +                     struct engine_node *root_node);
> > >
> > >  /* Execute the processing recursively, which should be called in the main
> >
> > This comment should be updated since you changed it to be non-recursive.
>
> Ack.
>
> >
> > > - * loop. Returns true if the execution is compelte, false if it is aborted,
> > > - * which could happen when engine_abort_recompute is set. */
> > > -bool engine_run(struct engine_node *, uint64_t run_id);
> > > + * loop. Updates the engine node's states accordingly.
> > > + */
> > > +void engine_run(struct engine_node **nodes, size_t n_count);
> > >
> > > -/* Clean up the data for the engine nodes recursively. It calls each node's
> > > +/* Clean up the data for the engine nodes. It calls each node's
> > >   * cleanup() method if not NULL. It should be called before the program
> > >   * terminates. */
> > > -void engine_cleanup(struct engine_node *);
> > > +void engine_cleanup(struct engine_node **nodes, size_t n_count);
> > >
> > >  /* Check if engine needs to run but didn't. */
> > > -bool
> > > -engine_need_run(struct engine_node *, uint64_t run_id);
> > > +bool engine_need_run(struct engine_node **nodes, size_t n_count,
> > > +                     struct engine_node *root_node);
> > >
> > >  /* Get the input node with <name> for <node> */
> > >  struct engine_node * engine_get_input(const char *input_name,
> > > @@ -159,8 +183,22 @@ const struct engine_context * engine_get_context(void);
> > >
> > >  void engine_set_context(const struct engine_context *);
> > >
> > > -/* Return true if the engine has run for 'node' in the 'run_id' iteration. */
> > > -bool engine_has_run(struct engine_node *node, uint64_t run_id);
> > > +void engine_set_node_state_at(struct engine_node *node,
> > > +                              enum engine_node_state state,
> > > +                              const char *where);
> > > +
> > > +/* Return true if during the last iteration the node's data was updated. */
> > > +bool engine_node_changed(struct engine_node *node);
> > > +
> > > +/* Return true if the engine has run for 'node' in the last iteration. */
> > > +bool engine_has_run(struct engine_node *node);
> > > +
> > > +/* Returns true if during the last engine run we had to abort processing. */
> > > +bool engine_aborted(struct engine_node *node);
> > > +
> > > +/* Set the state of the node and log changes. */
> > > +#define engine_set_node_state(node, state) \
> > > +    engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
> > >
> > >  struct ed_ovsdb_index {
> > >      const char *name;
> > > @@ -187,6 +225,7 @@ void engine_ovsdb_node_add_index(struct engine_node *, const char *name,
> > >      struct engine_node en_##NAME = { \
> > >          .name = NAME_STR, \
> > >          .data = &ed_##NAME, \
> > > +        .state = EN_STALE, \
> > >          .init = en_##NAME##_init, \
> > >          .run = en_##NAME##_run, \
> > >          .cleanup = en_##NAME##_cleanup, \
> > > @@ -201,10 +240,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \
> > >      const struct DB_NAME##rec_##TBL_NAME##_table *table = \
> > >          EN_OVSDB_GET(node); \
> > >      if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \
> > > -        node->changed = true; \
> > > +        engine_set_node_state(node, EN_UPDATED); \
> > >          return; \
> > >      } \
> > > -    node->changed = false; \
> > > +    engine_set_node_state(node, EN_VALID); \
> > >  } \
> > >  static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \
> > >              = NULL; \
> > >
>
> Thanks again for the thorough review Han. Before I start working on a
> new version of the series maybe we need to agree on some of these
> points:
>
> 1. Do we switch back to recursion or are you ok with the iterative
> approach?
In general I'd prefer the one with simpler logic, and if they are similar
then I prefer the one with fewer changes. In the current case I've already
become familiar with the new approach, and I am not sure which one would
require more effort for v4 at this point, so please pick whichever one
you are comfortable with.

> 2. Do we add a structure, e.g., inc_engine, to store an instance of
> the incremental engine (sorted nodes and count, "root node",
> force_recompute, abort_recompute)?
We should keep the sorted nodes and the count as module static variables,
but a structure is not needed.
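
Something along these lines is what I mean, as a rough sketch only. The
node struct is reduced, the 'visited' flag and all sketch_* names are
hypothetical, and the real code would build the array from
node->inputs[i].node rather than from a plain pointer array.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

#define SKETCH_MAX_INPUTS 4   /* Hypothetical limit for this sketch. */

/* Reduced node: the real struct engine_node has more fields. */
struct engine_node {
    const char *name;
    size_t n_inputs;
    struct engine_node *inputs[SKETCH_MAX_INPUTS];
    bool visited;             /* Hypothetical helper flag for the sort. */
};

/* Engine state private to the module; callers never pass it around. */
static struct engine_node **engine_nodes;
static size_t engine_n_nodes;
static size_t engine_allocated;

/* Post-order DFS: every input is appended before the node itself. */
static void
sketch_sort_from(struct engine_node *node)
{
    if (node->visited) {
        return;
    }
    node->visited = true;
    for (size_t i = 0; i < node->n_inputs; i++) {
        sketch_sort_from(node->inputs[i]);
    }
    if (engine_n_nodes == engine_allocated) {
        engine_allocated = engine_allocated ? 2 * engine_allocated : 8;
        engine_nodes = realloc(engine_nodes,
                               engine_allocated * sizeof *engine_nodes);
        if (!engine_nodes) {
            abort();
        }
    }
    engine_nodes[engine_n_nodes++] = node;
}

/* Called once at startup; only the root node is needed as an argument. */
void
sketch_engine_init(struct engine_node *root)
{
    sketch_sort_from(root);
}

size_t
sketch_engine_node_count(void)
{
    return engine_n_nodes;
}

struct engine_node *
sketch_engine_node_at(size_t i)
{
    return engine_nodes[i];
}

void
sketch_engine_cleanup(void)
{
    free(engine_nodes);
    engine_nodes = NULL;
    engine_n_nodes = engine_allocated = 0;
}
```

The point is only that the sorted array and its length stay inside the
module, so the other engine functions would not need extra parameters for
them.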

> 3. Do we add a new state EN_INVALID to be reached whenever
> engine_node_valid(input) returns false and the input node didn't
> abort? It would never be the case in this patch series but it could
> happen in the future if more custom is_valid() handlers are added.

I tend to choose option 1 from the inline comment above.

>
> Thanks,
> Dumitru
>

