[ovs-dev] [PATCH v6 ovn 2/4] ovn-controller: Add per node states to I-P engine.

Han Zhou hzhou at ovn.org
Wed Nov 27 01:24:07 UTC 2019


Thanks Dumitru, please see my comments inline.

On Fri, Nov 22, 2019 at 8:13 AM Dumitru Ceara <dceara at redhat.com> wrote:
>
> This commit transforms the 'changed' field in struct engine_node in a
> 'state' field. Possible node states are:
> - "Stale": data in the node is not up to date with the DB.
> - "Updated": data in the node is valid but was updated during
>   the last run of the engine.
> - "Valid": data in the node is valid and didn't change during
>   the last run of the engine.
> - "Aborted": during the last run, processing was aborted for
>   this node.
>
> This commit also further refactors the I-P engine:
> - instead of recursively performing all the engine processing a
>   preprocessing stage is added (engine_get_nodes()) before the main
processing
>   loop is executed in order to topologically sort nodes in the engine such
>   that all inputs of a given node appear in the sorted array before the
node
>   itself. This simplifies a bit the code in engine_run().
> - remove the need for using an engine_run_id by using the newly added
states.
> - turn the global 'engine_abort_recompute' into an argument to be passed
to
>   engine_run(). It's relevant only in the current run context anyway as
>   we reset it before every call to engine_run().
>
> Signed-off-by: Dumitru Ceara <dceara at redhat.com>
> ---
>  controller/ovn-controller.c |   84 ++++++++-------
>  lib/inc-proc-eng.c          |  242
++++++++++++++++++++++++++++++++-----------
>  lib/inc-proc-eng.h          |   74 +++++++++----
>  3 files changed, 276 insertions(+), 124 deletions(-)
>
> diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> index c56190f..a588531 100644
> --- a/controller/ovn-controller.c
> +++ b/controller/ovn-controller.c
> @@ -758,10 +758,10 @@ en_ofctrl_is_connected_run(struct engine_node *node)
>          (struct ed_type_ofctrl_is_connected *)node->data;
>      if (data->connected != ofctrl_is_connected()) {
>          data->connected = !data->connected;
> -        node->changed = true;
> +        engine_set_node_state(node, EN_UPDATED);
>          return;
>      }
> -    node->changed = false;
> +    engine_set_node_state(node, EN_VALID);
>  }
>
>  struct ed_type_addr_sets {
> @@ -811,7 +811,7 @@ en_addr_sets_run(struct engine_node *node)
>      addr_sets_init(as_table, &as->addr_sets);
>
>      as->change_tracked = false;
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> @@ -830,11 +830,14 @@ addr_sets_sb_address_set_handler(struct engine_node
*node)
>      addr_sets_update(as_table, &as->addr_sets, &as->new,
>                       &as->deleted, &as->updated);
>
> -    node->changed = !sset_is_empty(&as->new) ||
!sset_is_empty(&as->deleted)
> -                    || !sset_is_empty(&as->updated);
> +    if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) ||
> +            !sset_is_empty(&as->updated)) {
> +        engine_set_node_state(node, EN_UPDATED);
> +    } else {
> +        engine_set_node_state(node, EN_VALID);
> +    }
>
>      as->change_tracked = true;
> -    node->changed = true;
>      return true;
>  }
>
> @@ -885,7 +888,7 @@ en_port_groups_run(struct engine_node *node)
>      port_groups_init(pg_table, &pg->port_groups);
>
>      pg->change_tracked = false;
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> @@ -904,11 +907,14 @@ port_groups_sb_port_group_handler(struct
engine_node *node)
>      port_groups_update(pg_table, &pg->port_groups, &pg->new,
>                       &pg->deleted, &pg->updated);
>
> -    node->changed = !sset_is_empty(&pg->new) ||
!sset_is_empty(&pg->deleted)
> -                    || !sset_is_empty(&pg->updated);
> +    if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) ||
> +            !sset_is_empty(&pg->updated)) {
> +        engine_set_node_state(node, EN_UPDATED);
> +    } else {
> +        engine_set_node_state(node, EN_VALID);
> +    }
>
>      pg->change_tracked = true;
> -    node->changed = true;
>      return true;
>  }
>
> @@ -1091,7 +1097,7 @@ en_runtime_data_run(struct engine_node *node)
>      update_ct_zones(local_lports, local_datapaths, ct_zones,
>                      ct_zone_bitmap, pending_ct_zones);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> @@ -1157,10 +1163,10 @@ en_mff_ovn_geneve_run(struct engine_node *node)
>      enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
>      if (data->mff_ovn_geneve != mff_ovn_geneve) {
>          data->mff_ovn_geneve = mff_ovn_geneve;
> -        node->changed = true;
> +        engine_set_node_state(node, EN_UPDATED);
>          return;
>      }
> -    node->changed = false;
> +    engine_set_node_state(node, EN_VALID);
>  }
>
>  struct ed_type_flow_output {
> @@ -1322,7 +1328,7 @@ en_flow_output_run(struct engine_node *node)
>                   active_tunnels,
>                   flow_table);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>  }
>
>  static bool
> @@ -1404,7 +1410,7 @@ flow_output_sb_logical_flow_handler(struct
engine_node *node)
>                flow_table, group_table, meter_table, lfrr,
>                conj_id_ofs);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>      return handled;
>  }
>
> @@ -1427,7 +1433,7 @@ flow_output_sb_mac_binding_handler(struct
engine_node *node)
>      lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
>              mac_binding_table, flow_table);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>      return true;
>  }
>
> @@ -1531,7 +1537,7 @@ flow_output_sb_port_binding_handler(struct
engine_node *node)
>              chassis, ct_zones, local_datapaths,
>              active_tunnels, flow_table);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>      return true;
>  }
>
> @@ -1580,7 +1586,7 @@ flow_output_sb_multicast_group_handler(struct
engine_node *node)
>              mff_ovn_geneve, chassis, ct_zones, local_datapaths,
>              flow_table);
>
> -    node->changed = true;
> +    engine_set_node_state(node, EN_UPDATED);
>      return true;
>
>  }
> @@ -1694,7 +1700,9 @@ _flow_output_resource_ref_handler(struct
engine_node *node,
>                      conj_id_ofs, &changed)) {
>              return false;
>          }
> -        node->changed = changed || node->changed;
> +        if (changed) {
> +            engine_set_node_state(node, EN_UPDATED);
> +        }
>      }
>      SSET_FOR_EACH (ref_name, updated) {
>          if (!lflow_handle_changed_ref(ref_type, ref_name,
> @@ -1707,7 +1715,9 @@ _flow_output_resource_ref_handler(struct
engine_node *node,
>                      conj_id_ofs, &changed)) {
>              return false;
>          }
> -        node->changed = changed || node->changed;
> +        if (changed) {
> +            engine_set_node_state(node, EN_UPDATED);
> +        }
>      }
>      SSET_FOR_EACH (ref_name, new) {
>          if (!lflow_handle_changed_ref(ref_type, ref_name,
> @@ -1720,7 +1730,9 @@ _flow_output_resource_ref_handler(struct
engine_node *node,
>                      conj_id_ofs, &changed)) {
>              return false;
>          }
> -        node->changed = changed || node->changed;
> +        if (changed) {
> +            engine_set_node_state(node, EN_UPDATED);
> +        }
>      }
>
>      return true;
> @@ -1941,9 +1953,6 @@ main(int argc, char *argv[])
>      unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
>                               &pending_pkt);
>
> -    uint64_t engine_run_id = 0;
> -    bool engine_run_done = true;
> -
>      unsigned int ovs_cond_seqno = UINT_MAX;
>      unsigned int ovnsb_cond_seqno = UINT_MAX;
>
> @@ -1951,7 +1960,7 @@ main(int argc, char *argv[])
>      exiting = false;
>      restart = false;
>      while (!exiting) {
> -        engine_run_id++;
> +        engine_init_run();
>
>          update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
>          update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
> @@ -2044,15 +2053,11 @@ main(int argc, char *argv[])
>                               * this round of engine_run and continue
processing
>                               * acculated changes incrementally later when
>                               * ofctrl_can_put() returns true. */
> -                            if (engine_run_done) {
> -                                engine_set_abort_recompute(true);
> -                                engine_run_done =
engine_run(&en_flow_output,
> -
engine_run_id);
> +                            if (!engine_aborted()) {
> +                                engine_run(true);

Since we reset the nodes' state at the beginning of every iteration, this
check will always be true - engine_aborted() will never return true at
this point.
I think we may still need the "engine_run_done" variable to keep track of
whether the engine has aborted before.

>                              }
>                          } else {
> -                            engine_set_abort_recompute(false);
> -                            engine_run_done = true;
> -                            engine_run(&en_flow_output, engine_run_id);
> +                            engine_run(false);
>                          }
>                      }
>                      stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
> @@ -2071,7 +2076,7 @@ main(int argc, char *argv[])
>                                 sbrec_meter_table_get(ovnsb_idl_loop.idl),
>                                 get_nb_cfg(sbrec_sb_global_table_get(
>                                                ovnsb_idl_loop.idl)),
> -                               en_flow_output.changed);
> +                               engine_node_changed(&en_flow_output));
>                      pinctrl_run(ovnsb_idl_txn,
>                                  sbrec_datapath_binding_by_key,
>                                  sbrec_port_binding_by_datapath,
> @@ -2089,7 +2094,7 @@ main(int argc, char *argv[])
>                                  &ed_runtime_data.local_datapaths,
>                                  &ed_runtime_data.active_tunnels);
>
> -                    if (en_runtime_data.changed) {
> +                    if (engine_node_changed(&en_runtime_data)) {
>                          update_sb_monitors(ovnsb_idl_loop.idl, chassis,
>                                             &ed_runtime_data.local_lports,
>
&ed_runtime_data.local_datapaths);
> @@ -2097,17 +2102,17 @@ main(int argc, char *argv[])
>                  }
>
>              }
> -            if (engine_need_run(&en_flow_output, engine_run_id)) {
> +            if (engine_need_run()) {
>                  VLOG_DBG("engine did not run, force recompute next time:
"
>                              "br_int %p, chassis %p", br_int, chassis);
>                  engine_set_force_recompute(true);
>                  poll_immediate_wake();
> -            } else if (!engine_run_done) {
> +            } else if (engine_aborted()) {
>                  VLOG_DBG("engine was aborted, force recompute next time:
"
>                           "br_int %p, chassis %p", br_int, chassis);
>                  engine_set_force_recompute(true);
>                  poll_immediate_wake();
> -            } else if (!engine_has_run(&en_flow_output, engine_run_id)) {
> +            } else if (!engine_has_run()) {
>                  VLOG_DBG("engine did not run, and it was not needed"
>                           " either: br_int %p, chassis %p",
>                           br_int, chassis);
> @@ -2135,8 +2140,7 @@ main(int argc, char *argv[])
>                      }
>                  } else {
>                      VLOG_DBG("Pending_pkt conn but br_int %p or chassis "
> -                             "%p not ready. run-id: %"PRIu64, br_int,
> -                             chassis, engine_run_id);
> +                             "%p not ready.", br_int, chassis);
>                      unixctl_command_reply_error(pending_pkt.conn,
>                          "ovn-controller not ready.");
>                  }
> @@ -2185,7 +2189,7 @@ main(int argc, char *argv[])
>      }
>
>      engine_set_context(NULL);
> -    engine_cleanup(&en_flow_output);
> +    engine_cleanup();
>
>      /* It's time to exit.  Clean up the databases if we are not
restarting */
>      if (!restart) {
> diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> index ff07ad9..f88116f 100644
> --- a/lib/inc-proc-eng.c
> +++ b/lib/inc-proc-eng.c
> @@ -31,21 +31,24 @@
>  VLOG_DEFINE_THIS_MODULE(inc_proc_eng);
>
>  static bool engine_force_recompute = false;
> -static bool engine_abort_recompute = false;
>  static const struct engine_context *engine_context;
>
> +static struct engine_node **engine_nodes;
> +static size_t engine_n_nodes;
> +
> +static const char *engine_node_state_name[EN_STATE_MAX] = {
> +    [EN_STALE]   = "Stale",
> +    [EN_UPDATED] = "Updated",
> +    [EN_VALID]   = "Valid",
> +    [EN_ABORTED] = "Aborted",
> +};
> +
>  void
>  engine_set_force_recompute(bool val)
>  {
>      engine_force_recompute = val;
>  }
>
> -void
> -engine_set_abort_recompute(bool val)
> -{
> -    engine_abort_recompute = val;
> -}
> -
>  const struct engine_context *
>  engine_get_context(void)
>  {
> @@ -58,26 +61,69 @@ engine_set_context(const struct engine_context *ctx)
>      engine_context = ctx;
>  }
>
> -void
> -engine_init(struct engine_node *node)
> +/* Builds the topologically sorted 'sorted_nodes' array starting from
> + * 'node'.
> + */
> +static struct engine_node **
> +engine_topo_sort(struct engine_node *node, struct engine_node
**sorted_nodes,
> +                 size_t *n_count, size_t *n_size)
>  {
> +    /* It's not so efficient to walk the array of already sorted nodes
but
> +     * we know that sorting is done only once at startup so it's ok for
now.
> +     */
> +    for (size_t i = 0; i < *n_count; i++) {
> +        if (sorted_nodes[i] == node) {
> +            return sorted_nodes;
> +        }
> +    }
> +
>      for (size_t i = 0; i < node->n_inputs; i++) {
> -        engine_init(node->inputs[i].node);
> +        sorted_nodes = engine_topo_sort(node->inputs[i].node,
sorted_nodes,
> +                                        n_count, n_size);
>      }
> -    if (node->init) {
> -        node->init(node);
> +    if (*n_count == *n_size) {
> +        sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof
*sorted_nodes);
>      }
> +    sorted_nodes[(*n_count)] = node;
> +    (*n_count)++;
> +    return sorted_nodes;
> +}
> +
> +/* Return the array of topologically sorted nodes when starting from
> + * 'node'. Stores the number of nodes in 'n_count'.
> + */
> +static struct engine_node **
> +engine_get_nodes(struct engine_node *node, size_t *n_count)
> +{
> +    size_t n_size = 0;
> +
> +    *n_count = 0;
> +    return engine_topo_sort(node, NULL, n_count, &n_size);
>  }
>
>  void
> -engine_cleanup(struct engine_node *node)
> +engine_init(struct engine_node *node)
>  {
> -    for (size_t i = 0; i < node->n_inputs; i++) {
> -        engine_cleanup(node->inputs[i].node);
> +    engine_nodes = engine_get_nodes(node, &engine_n_nodes);
> +
> +    for (size_t i = 0; i < engine_n_nodes; i++) {
> +        if (engine_nodes[i]->init) {
> +            engine_nodes[i]->init(engine_nodes[i]);
> +        }
>      }
> -    if (node->cleanup) {
> -        node->cleanup(node);
> +}
> +
> +void
> +engine_cleanup(void)
> +{
> +    for (size_t i = 0; i < engine_n_nodes; i++) {
> +        if (engine_nodes[i]->cleanup) {
> +            engine_nodes[i]->cleanup(engine_nodes[i]);
> +        }
>      }
> +    free(engine_nodes);
> +    engine_nodes = NULL;
> +    engine_n_nodes = 0;
>  }
>
>  struct engine_node *
> @@ -128,16 +174,70 @@ engine_ovsdb_node_add_index(struct engine_node
*node, const char *name,
>      ed->n_indexes ++;
>  }
>
> +void
> +engine_set_node_state_at(struct engine_node *node,
> +                         enum engine_node_state state,
> +                         const char *where)
> +{
> +    if (node->state == state) {
> +        return;
> +    }
> +
> +    VLOG_DBG("%s: node: %s, old_state %s, new_state %s",
> +             where, node->name,
> +             engine_node_state_name[node->state],
> +             engine_node_state_name[state]);
> +
> +    node->state = state;
> +}
> +
> +static bool
> +engine_node_valid(struct engine_node *node)
> +{
> +    return (node->state == EN_UPDATED || node->state == EN_VALID);
> +}
> +
>  bool
> -engine_has_run(struct engine_node *node, uint64_t run_id)
> +engine_node_changed(struct engine_node *node)
>  {
> -    return node->run_id == run_id;
> +    return node->state == EN_UPDATED;
> +}
> +
> +bool
> +engine_has_run(void)
> +{
> +    for (size_t i = 0; i < engine_n_nodes; i++) {
> +        if (engine_nodes[i]->state == EN_STALE) {
> +            return false;
> +        }
> +    }
> +    return true;

I think engine_has_run() should return true if engine_run() was called in
the current iteration. We were using engine_run_id to track that. Now we
rely on this interface instead.
So it should return true if any node is NOT in the STALE state, and return
false only if all nodes are STALE.

> +}
> +
> +bool
> +engine_aborted(void)
> +{
> +    for (size_t i = 0; i < engine_n_nodes; i++) {
> +        if (engine_nodes[i]->state == EN_ABORTED) {
> +            return true;
> +        }
> +    }
> +    return false;
> +}
> +
> +void
> +engine_init_run(void)
> +{
> +    VLOG_DBG("Initializing new run");
> +    for (size_t i = 0; i < engine_n_nodes; i++) {
> +        engine_set_node_state(engine_nodes[i], EN_STALE);
> +    }
>  }
>
>  /* Do a full recompute (or at least try). If we're not allowed then
>   * mark the node as "aborted".
>   */
> -static bool
> +static void
>  engine_recompute(struct engine_node *node, bool forced, bool allowed)
>  {
>      VLOG_DBG("node: %s, recompute (%s)", node->name,
> @@ -145,12 +245,12 @@ engine_recompute(struct engine_node *node, bool
forced, bool allowed)
>
>      if (!allowed) {
>          VLOG_DBG("node: %s, recompute aborted", node->name);
> -        return false;
> +        engine_set_node_state(node, EN_ABORTED);
> +        return;
>      }
>
> +    /* Run the node handler which might change state. */
>      node->run(node);
> -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> -    return true;
>  }
>
>  /* Return true if the node could be computed, false otherwise. */
> @@ -159,7 +259,7 @@ engine_compute(struct engine_node *node, bool
recompute_allowed)
>  {
>      for (size_t i = 0; i < node->n_inputs; i++) {
>          /* If the input node data changed call its change handler. */
> -        if (node->inputs[i].node->changed) {
> +        if (node->inputs[i].node->state == EN_UPDATED) {
>              VLOG_DBG("node: %s, handle change for input %s",
>                       node->name, node->inputs[i].node->name);
>
> @@ -170,55 +270,62 @@ engine_compute(struct engine_node *node, bool
recompute_allowed)
>                  VLOG_DBG("node: %s, can't handle change for input %s, "
>                           "fall back to recompute",
>                           node->name, node->inputs[i].node->name);
> -                return engine_recompute(node, false, recompute_allowed);
> +                engine_recompute(node, false, recompute_allowed);
> +                if (node->state == EN_ABORTED) {
> +                    return false;
> +                }
> +                return true;
>              }
>          }
>      }
> -
>      return true;
>  }
>
> -bool engine_run(struct engine_node *node, uint64_t run_id)
> +static void
> +engine_run_node(struct engine_node *node, bool recompute_allowed)
>  {
> -    if (node->run_id == run_id) {
> -        /* The node was already updated in this run (could be input for
> -         * multiple other nodes). Stop processing.
> -         */
> -        return true;
> -    }
> -
> -    /* Initialize the node for this run. */
> -    node->run_id = run_id;
> -    node->changed = false;
> -
>      if (!node->n_inputs) {
> +        /* Run the node handler which might change state. */
>          node->run(node);
> -        VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> -        return true;
> +        return;
>      }
>
> +    bool input_stale = false;
>      for (size_t i = 0; i < node->n_inputs; i++) {
> -        if (!engine_run(node->inputs[i].node, run_id)) {
> -            return false;
> +        if (!engine_node_valid(node->inputs[i].node)) {

It seems we can simply return and leave the node in the STALE state, without
any further processing, if any of the inputs is invalid. In fact, I think we
can completely avoid this situation if we quit engine_run() whenever a
node is aborted. See my comment in engine_run().

> +            /* If the input node aborted computation, move to EN_ABORTED.
> +             * This will be propagated to following nodes.
> +             */
> +            if (node->inputs[i].node->state == EN_ABORTED) {
> +                engine_set_node_state(node, EN_ABORTED);
> +            }
> +
> +            input_stale = true;
>          }
>      }
>
> -    bool need_compute = false;
> +    /* If at least one input is stale, don't change state. */
> +    if (input_stale) {
> +        return;
> +    }
>
>      if (engine_force_recompute) {
> -        return engine_recompute(node, true, !engine_abort_recompute);
> +        engine_recompute(node, true, recompute_allowed);
> +        return;
>      }
>
>      /* If any of the inputs updated data but there is no change_handler,
then
>       * recompute the current node too.
>       */
> +    bool need_compute = false;
>      for (size_t i = 0; i < node->n_inputs; i++) {
> -        if (node->inputs[i].node->changed) {
> +        if (node->inputs[i].node->state == EN_UPDATED) {
>              need_compute = true;
>
>              /* Trigger a recompute if we don't have a change handler. */
>              if (!node->inputs[i].change_handler) {
> -                return engine_recompute(node, false,
!engine_abort_recompute);
> +                engine_recompute(node, false, recompute_allowed);
> +                return;
>              }
>          }
>      }
> @@ -227,33 +334,46 @@ bool engine_run(struct engine_node *node, uint64_t
run_id)
>          /* If we couldn't compute the node we either aborted or triggered
>           * a full recompute. In any case, stop processing.
>           */
> -        return engine_compute(node, !engine_abort_recompute);
> +        if (!engine_compute(node, recompute_allowed)) {
> +            return;
> +        }
>      }
>
> -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> -    return true;
> +    /* If we reached this point, either the node was updated or its
state is
> +     * still valid.
> +     */
> +    if (!engine_node_changed(node)) {
> +        engine_set_node_state(node, EN_VALID);
> +    }
>  }
>
> -bool
> -engine_need_run(struct engine_node *node, uint64_t run_id)
> +void
> +engine_run(bool abort_on_recompute)

It may be better to use the inverse of the "abort_on_recompute" parameter,
"recompute_allowed", to be consistent with all the other places; that would
make the logic slightly more straightforward.

>  {
> -    size_t i;
> +    for (size_t i = 0; i < engine_n_nodes; i++) {
> +        engine_run_node(engine_nodes[i], !abort_on_recompute);

Maybe we should break out of the loop if running a node resulted in an
abort, which would avoid wasted computation. It does not seem useful to
continue the loop once any node has aborted. Would it be ok to just leave
all the following nodes in the STALE state? I know we discussed propagating
the ABORTED state in the last version. However, with this new version, since
engine_init_run() is simpler and we don't pass any node to engine_aborted(),
etc., I think maybe we don't need to propagate the ABORTED state, right?

> +    }
> +}
>
> -    if (node->run_id == run_id) {
> +bool
> +engine_need_run(void)
> +{
> +    if (engine_has_run()) {
>          return false;
>      }
>
> -    if (!node->n_inputs) {
> -        node->run(node);
> -        VLOG_DBG("input node: %s, changed: %d", node->name,
node->changed);
> -        return node->changed;
> -    }
> +    for (size_t i = 0; i < engine_n_nodes; i++) {
> +        /* Check only leaf nodes for updates. */
> +        if (engine_nodes[i]->n_inputs) {
> +            continue;
> +        }
>
> -    for (i = 0; i < node->n_inputs; i++) {
> -        if (engine_need_run(node->inputs[i].node, run_id)) {
> +        engine_nodes[i]->run(engine_nodes[i]);
> +        VLOG_DBG("input node: %s, state: %s", engine_nodes[i]->name,
> +                 engine_node_state_name[engine_nodes[i]->state]);
> +        if (engine_nodes[i]->state == EN_UPDATED) {
>              return true;
>          }
>      }
> -
>      return false;
>  }
> diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> index abd41b2..5315649 100644
> --- a/lib/inc-proc-eng.h
> +++ b/lib/inc-proc-eng.h
> @@ -82,10 +82,21 @@ struct engine_node_input {
>      bool (*change_handler)(struct engine_node *node);
>  };
>
> -struct engine_node {
> -    /* A unique id to distinguish each iteration of the engine_run(). */
> -    uint64_t run_id;
> +enum engine_node_state {
> +    EN_STALE,     /* Data in the node is not up to date with the DB. */
> +    EN_UPDATED,   /* Data in the node is valid but was updated during the
> +                   * last run.
> +                   */
> +    EN_VALID,     /* Data in the node is valid and didn't change during
the
> +                   * last run.
> +                   */
> +    EN_ABORTED,   /* During the last run, processing was aborted for
> +                   * this node.
> +                   */
> +    EN_STATE_MAX,
> +};
>
> +struct engine_node {
>      /* A unique name for each node. */
>      char *name;
>
> @@ -102,8 +113,8 @@ struct engine_node {
>       * node. */
>      void *data;
>
> -    /* Whether the data changed in the last engine run. */
> -    bool changed;
> +    /* State of the node after the last engine run. */
> +    enum engine_node_state state;
>
>      /* Method to initialize data. It may be NULL. */
>      void (*init)(struct engine_node *);
> @@ -116,23 +127,29 @@ struct engine_node {
>      void (*run)(struct engine_node *);
>  };
>
> -/* Initialize the data for the engine nodes recursively. It calls each
node's
> +/* Initialize the data for the engine nodes. It calls each node's
>   * init() method if not NULL. It should be called before the main loop.
*/
> -void engine_init(struct engine_node *);
> +void engine_init(struct engine_node *node);
>
> -/* Execute the processing recursively, which should be called in the main
> - * loop. Returns true if the execution is compelte, false if it is
aborted,
> - * which could happen when engine_abort_recompute is set. */
> -bool engine_run(struct engine_node *, uint64_t run_id);
> +/* Initialize the engine nodes for a new run. It should be called in the
> + * main processing loop before every potential engine_run().
> + */
> +void engine_init_run(void);
> +
> +/* Execute the processing, which should be called in the main loop.
> + * Updates the engine node's states accordingly. If 'abort_on_recompute'
is
> + * true, if a recompute is required by the current engine run then the
engine
> + * aborts.
> + */
> +void engine_run(bool abort_on_recompute);
>
> -/* Clean up the data for the engine nodes recursively. It calls each
node's
> +/* Clean up the data for the engine nodes. It calls each node's
>   * cleanup() method if not NULL. It should be called before the program
>   * terminates. */
> -void engine_cleanup(struct engine_node *);
> +void engine_cleanup(void);
>
>  /* Check if engine needs to run but didn't. */
> -bool
> -engine_need_run(struct engine_node *, uint64_t run_id);
> +bool engine_need_run(void);
>
>  /* Get the input node with <name> for <node> */
>  struct engine_node * engine_get_input(const char *input_name,
> @@ -151,16 +168,26 @@ void engine_add_input(struct engine_node *node,
struct engine_node *input,
>   * iteration, and the change can't be tracked across iterations */
>  void engine_set_force_recompute(bool val);
>
> -/* Set the flag to cause engine execution to be aborted when there
> - * is any recompute to be triggered in any node. */
> -void engine_set_abort_recompute(bool val);
> -
>  const struct engine_context * engine_get_context(void);
>
>  void engine_set_context(const struct engine_context *);
>
> -/* Return true if the engine has run for 'node' in the 'run_id'
iteration. */
> -bool engine_has_run(struct engine_node *node, uint64_t run_id);
> +void engine_set_node_state_at(struct engine_node *node,
> +                              enum engine_node_state state,
> +                              const char *where);
> +
> +/* Return true if during the last iteration the node's data was updated.
*/
> +bool engine_node_changed(struct engine_node *node);
> +
> +/* Return true if the engine has run in the last iteration. */
> +bool engine_has_run(void);
> +
> +/* Returns true if during the last engine run we had to abort
processing. */
> +bool engine_aborted(void);
> +
> +/* Set the state of the node and log changes. */
> +#define engine_set_node_state(node, state) \
> +    engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
>
>  struct ed_ovsdb_index {
>      const char *name;
> @@ -187,6 +214,7 @@ void engine_ovsdb_node_add_index(struct engine_node
*, const char *name,
>      struct engine_node en_##NAME = { \
>          .name = NAME_STR, \
>          .data = &ed_##NAME, \
> +        .state = EN_STALE, \
>          .init = en_##NAME##_init, \
>          .run = en_##NAME##_run, \
>          .cleanup = en_##NAME##_cleanup, \
> @@ -201,10 +229,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node
*node) \
>      const struct DB_NAME##rec_##TBL_NAME##_table *table = \
>          EN_OVSDB_GET(node); \
>      if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \
> -        node->changed = true; \
> +        engine_set_node_state(node, EN_UPDATED); \
>          return; \
>      } \
> -    node->changed = false; \
> +    engine_set_node_state(node, EN_VALID); \
>  } \
>  static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node
*node) \
>              = NULL; \
>


More information about the dev mailing list