[ovs-dev] [PATCH ovn v2] ovn-controller: Fix use of dangling pointers in I-P runtime_data.

Dumitru Ceara dceara at redhat.com
Mon Nov 4 12:53:46 UTC 2019


The incremental processing engine might stop a run before the
en_runtime_data node is processed. In such cases the ed_runtime_data
fields might contain pointers to already deleted SB records. For
example, if a port binding corresponding to a patch port is removed from
the SB database and the incremental processing engine aborts before the
en_runtime_data node is processed then the corresponding local_datapath
hashtable entry in ed_runtime_data is stale and will store a pointer to
the already freed sbrec_port_binding record.

This causes invalid memory accesses in various places (e.g.,
pinctrl_run() -> prepare_ipv6_ras()).

To fix the issue we need a way to track how each node was processed
during an engine run. This commit transforms the 'changed' field in
struct engine_node into a 'state' field. Possible node states are:
- "New": the node is not yet initialized.
- "Stale": data in the node is not up to date with the DB.
- "Updated": data in the node is valid but was updated during
  the last run of the engine.
- "Valid": data in the node is valid and didn't change during
  the last run of the engine.
- "Aborted": during the last run, processing was aborted for
  this node.
- "Destroyed": the node was already cleaned up.

We also separate engine node data that can be accessed at any time
(regardless of whether the last engine run was successful) from data
that may be accessed only if the nodes are up to date. This helps avoid
custom "engine_node_valid" handlers for different nodes.

The commit also simplifies the logic of calling engine_run and
engine_need_run in order to reduce the number of external variables
required to track the result of the last engine execution.

Functions that need to be called from the main loop and depend on
various data contents of the engine's nodes are now called only if
the data is up to date.

CC: Han Zhou <hzhou8 at ebay.com>
Fixes: ca278d98a4f5 ("ovn-controller: Initial use of incremental engine - quiet mode.")
Signed-off-by: Dumitru Ceara <dceara at redhat.com>

---
v2: Address Han's comments:
- call engine_node_valid() in all the places where node local data is
  used.
- move out "global" data outside the engine nodes. Make a clear
  separation between data that can be safely used at any time and data
  that can be used only when the engine run was successful.
- add a debug log for iterations when the engine didn't run.
- refactor the incremental engine code a bit more.
---
 controller/ovn-controller.c | 312 +++++++++++++++++++++++++++-----------------
 lib/inc-proc-eng.c          | 233 ++++++++++++++++++++++++++-------
 lib/inc-proc-eng.h          |  72 ++++++++--
 3 files changed, 433 insertions(+), 184 deletions(-)

diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index 9ab98be..6d2cbea 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -90,6 +90,21 @@ struct pending_pkt {
     char *flow_s;
 };
 
+/* Structure to hold global engine data. This is data that can be safely
+ * accessed at any time after engine_init(), regardless of whether the
+ * incremental engine has updated it. The incremental engine is responsible
+ * for managing the memory (i.e., allocating/destroying hashtables and maps).
+ */
+struct engine_storage {
+    struct shash pending_ct_zones;
+    struct simap ct_zones;
+
+    struct ovn_desired_flow_table flow_table;
+    struct ovn_extend_table group_table;
+    struct ovn_extend_table meter_table;
+    struct lflow_resource_ref lflow_resource_ref;
+};
+
 struct local_datapath *
 get_local_datapath(const struct hmap *local_datapaths, uint32_t tunnel_key)
 {
@@ -739,7 +754,7 @@ struct ed_type_ofctrl_is_connected {
 };
 
 static void
-en_ofctrl_is_connected_init(struct engine_node *node)
+en_ofctrl_is_connected_init(struct engine_node *node, void *arg OVS_UNUSED)
 {
     struct ed_type_ofctrl_is_connected *data =
         (struct ed_type_ofctrl_is_connected *)node->data;
@@ -758,10 +773,10 @@ en_ofctrl_is_connected_run(struct engine_node *node)
         (struct ed_type_ofctrl_is_connected *)node->data;
     if (data->connected != ofctrl_is_connected()) {
         data->connected = !data->connected;
-        node->changed = true;
+        engine_set_node_state(node, EN_UPDATED);
         return;
     }
-    node->changed = false;
+    engine_set_node_state(node, EN_VALID);
 }
 
 struct ed_type_addr_sets {
@@ -773,7 +788,7 @@ struct ed_type_addr_sets {
 };
 
 static void
-en_addr_sets_init(struct engine_node *node)
+en_addr_sets_init(struct engine_node *node, void *arg OVS_UNUSED)
 {
     struct ed_type_addr_sets *as = (struct ed_type_addr_sets *)node->data;
     shash_init(&as->addr_sets);
@@ -811,7 +826,7 @@ en_addr_sets_run(struct engine_node *node)
     addr_sets_init(as_table, &as->addr_sets);
 
     as->change_tracked = false;
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
@@ -830,11 +845,14 @@ addr_sets_sb_address_set_handler(struct engine_node *node)
     addr_sets_update(as_table, &as->addr_sets, &as->new,
                      &as->deleted, &as->updated);
 
-    node->changed = !sset_is_empty(&as->new) || !sset_is_empty(&as->deleted)
-                    || !sset_is_empty(&as->updated);
+    if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) ||
+            !sset_is_empty(&as->updated)) {
+        engine_set_node_state(node, EN_UPDATED);
+    } else {
+        engine_set_node_state(node, EN_VALID);
+    }
 
     as->change_tracked = true;
-    node->changed = true;
     return true;
 }
 
@@ -847,7 +865,7 @@ struct ed_type_port_groups{
 };
 
 static void
-en_port_groups_init(struct engine_node *node)
+en_port_groups_init(struct engine_node *node, void *arg OVS_UNUSED)
 {
     struct ed_type_port_groups *pg = (struct ed_type_port_groups *)node->data;
     shash_init(&pg->port_groups);
@@ -885,7 +903,7 @@ en_port_groups_run(struct engine_node *node)
     port_groups_init(pg_table, &pg->port_groups);
 
     pg->change_tracked = false;
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
@@ -904,11 +922,14 @@ port_groups_sb_port_group_handler(struct engine_node *node)
     port_groups_update(pg_table, &pg->port_groups, &pg->new,
                      &pg->deleted, &pg->updated);
 
-    node->changed = !sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted)
-                    || !sset_is_empty(&pg->updated);
+    if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) ||
+            !sset_is_empty(&pg->updated)) {
+        engine_set_node_state(node, EN_UPDATED);
+    } else {
+        engine_set_node_state(node, EN_VALID);
+    }
 
     pg->change_tracked = true;
-    node->changed = true;
     return true;
 }
 
@@ -930,13 +951,15 @@ struct ed_type_runtime_data {
 
     /* connection tracking zones. */
     unsigned long ct_zone_bitmap[BITMAP_N_LONGS(MAX_CT_ZONES)];
-    struct shash pending_ct_zones;
-    struct simap ct_zones;
+    struct shash *pending_ct_zones;
+    struct simap *ct_zones;
 };
 
 static void
-en_runtime_data_init(struct engine_node *node)
+en_runtime_data_init(struct engine_node *node, void *arg)
 {
+    struct engine_storage *storage = arg;
+
     struct ed_type_runtime_data *data =
         (struct ed_type_runtime_data *)node->data;
     struct ovsrec_open_vswitch_table *ovs_table =
@@ -949,14 +972,18 @@ en_runtime_data_init(struct engine_node *node)
     sset_init(&data->local_lports);
     sset_init(&data->local_lport_ids);
     sset_init(&data->active_tunnels);
-    shash_init(&data->pending_ct_zones);
-    simap_init(&data->ct_zones);
+
+    data->pending_ct_zones = &storage->pending_ct_zones;
+    data->ct_zones         = &storage->ct_zones;
+
+    shash_init(data->pending_ct_zones);
+    simap_init(data->ct_zones);
 
     /* Initialize connection tracking zones. */
     memset(data->ct_zone_bitmap, 0, sizeof data->ct_zone_bitmap);
     bitmap_set1(data->ct_zone_bitmap, 0); /* Zone 0 is reserved. */
     restore_ct_zones(bridge_table, ovs_table,
-                     &data->ct_zones, data->ct_zone_bitmap);
+                     data->ct_zones, data->ct_zone_bitmap);
 }
 
 static void
@@ -978,8 +1005,8 @@ en_runtime_data_cleanup(struct engine_node *node)
     }
     hmap_destroy(&data->local_datapaths);
 
-    simap_destroy(&data->ct_zones);
-    shash_destroy(&data->pending_ct_zones);
+    simap_destroy(data->ct_zones);
+    shash_destroy(data->pending_ct_zones);
 }
 
 static void
@@ -992,8 +1019,8 @@ en_runtime_data_run(struct engine_node *node)
     struct sset *local_lport_ids = &data->local_lport_ids;
     struct sset *active_tunnels = &data->active_tunnels;
     unsigned long *ct_zone_bitmap = data->ct_zone_bitmap;
-    struct shash *pending_ct_zones = &data->pending_ct_zones;
-    struct simap *ct_zones = &data->ct_zones;
+    struct shash *pending_ct_zones = data->pending_ct_zones;
+    struct simap *ct_zones = data->ct_zones;
 
     static bool first_run = true;
     if (first_run) {
@@ -1091,7 +1118,7 @@ en_runtime_data_run(struct engine_node *node)
     update_ct_zones(local_lports, local_datapaths, ct_zones,
                     ct_zone_bitmap, pending_ct_zones);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
@@ -1137,7 +1164,7 @@ struct ed_type_mff_ovn_geneve {
 };
 
 static void
-en_mff_ovn_geneve_init(struct engine_node *node)
+en_mff_ovn_geneve_init(struct engine_node *node, void *arg OVS_UNUSED)
 {
     struct ed_type_mff_ovn_geneve *data =
         (struct ed_type_mff_ovn_geneve *)node->data;
@@ -1157,35 +1184,43 @@ en_mff_ovn_geneve_run(struct engine_node *node)
     enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
     if (data->mff_ovn_geneve != mff_ovn_geneve) {
         data->mff_ovn_geneve = mff_ovn_geneve;
-        node->changed = true;
+        engine_set_node_state(node, EN_UPDATED);
         return;
     }
-    node->changed = false;
+    engine_set_node_state(node, EN_VALID);
 }
 
 struct ed_type_flow_output {
     /* desired flows */
-    struct ovn_desired_flow_table flow_table;
+    struct ovn_desired_flow_table *flow_table;
     /* group ids for load balancing */
-    struct ovn_extend_table group_table;
+    struct ovn_extend_table *group_table;
     /* meter ids for QoS */
-    struct ovn_extend_table meter_table;
+    struct ovn_extend_table *meter_table;
+    /* lflow resource cross reference */
+    struct lflow_resource_ref *lflow_resource_ref;
+
     /* conjunction id offset */
     uint32_t conj_id_ofs;
-    /* lflow resource cross reference */
-    struct lflow_resource_ref lflow_resource_ref;
 };
 
 static void
-en_flow_output_init(struct engine_node *node)
+en_flow_output_init(struct engine_node *node, void *arg OVS_UNUSED)
 {
     struct ed_type_flow_output *data =
         (struct ed_type_flow_output *)node->data;
-    ovn_desired_flow_table_init(&data->flow_table);
-    ovn_extend_table_init(&data->group_table);
-    ovn_extend_table_init(&data->meter_table);
+    struct engine_storage *storage = arg;
+
+    data->flow_table         = &storage->flow_table;
+    data->group_table        = &storage->group_table;
+    data->meter_table        = &storage->meter_table;
+    data->lflow_resource_ref = &storage->lflow_resource_ref;
+
+    ovn_desired_flow_table_init(data->flow_table);
+    ovn_extend_table_init(data->group_table);
+    ovn_extend_table_init(data->meter_table);
     data->conj_id_ofs = 1;
-    lflow_resource_init(&data->lflow_resource_ref);
+    lflow_resource_init(data->lflow_resource_ref);
 }
 
 static void
@@ -1193,10 +1228,10 @@ en_flow_output_cleanup(struct engine_node *node)
 {
     struct ed_type_flow_output *data =
         (struct ed_type_flow_output *)node->data;
-    ovn_desired_flow_table_destroy(&data->flow_table);
-    ovn_extend_table_destroy(&data->group_table);
-    ovn_extend_table_destroy(&data->meter_table);
-    lflow_resource_destroy(&data->lflow_resource_ref);
+    ovn_desired_flow_table_destroy(data->flow_table);
+    ovn_extend_table_destroy(data->group_table);
+    ovn_extend_table_destroy(data->meter_table);
+    lflow_resource_destroy(data->lflow_resource_ref);
 }
 
 static void
@@ -1209,7 +1244,7 @@ en_flow_output_run(struct engine_node *node)
     struct sset *local_lports = &rt_data->local_lports;
     struct sset *local_lport_ids = &rt_data->local_lport_ids;
     struct sset *active_tunnels = &rt_data->active_tunnels;
-    struct simap *ct_zones = &rt_data->ct_zones;
+    struct simap *ct_zones = rt_data->ct_zones;
 
     struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
         (struct ed_type_mff_ovn_geneve *)engine_get_input(
@@ -1247,11 +1282,11 @@ en_flow_output_run(struct engine_node *node)
 
     struct ed_type_flow_output *fo =
         (struct ed_type_flow_output *)node->data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
-    struct ovn_extend_table *group_table = &fo->group_table;
-    struct ovn_extend_table *meter_table = &fo->meter_table;
+    struct ovn_desired_flow_table *flow_table = fo->flow_table;
+    struct ovn_extend_table *group_table = fo->group_table;
+    struct ovn_extend_table *meter_table = fo->meter_table;
+    struct lflow_resource_ref *lfrr = fo->lflow_resource_ref;
     uint32_t *conj_id_ofs = &fo->conj_id_ofs;
-    struct lflow_resource_ref *lfrr = &fo->lflow_resource_ref;
 
     static bool first_run = true;
     if (first_run) {
@@ -1322,7 +1357,7 @@ en_flow_output_run(struct engine_node *node)
                  active_tunnels,
                  flow_table);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
 }
 
 static bool
@@ -1366,11 +1401,11 @@ flow_output_sb_logical_flow_handler(struct engine_node *node)
 
     struct ed_type_flow_output *fo =
         (struct ed_type_flow_output *)node->data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
-    struct ovn_extend_table *group_table = &fo->group_table;
-    struct ovn_extend_table *meter_table = &fo->meter_table;
+    struct ovn_desired_flow_table *flow_table = fo->flow_table;
+    struct ovn_extend_table *group_table = fo->group_table;
+    struct ovn_extend_table *meter_table = fo->meter_table;
+    struct lflow_resource_ref *lfrr = fo->lflow_resource_ref;
     uint32_t *conj_id_ofs = &fo->conj_id_ofs;
-    struct lflow_resource_ref *lfrr = &fo->lflow_resource_ref;
 
     struct ovsdb_idl_index *sbrec_multicast_group_by_name_datapath =
         engine_ovsdb_node_get_index(
@@ -1404,7 +1439,7 @@ flow_output_sb_logical_flow_handler(struct engine_node *node)
               flow_table, group_table, meter_table, lfrr,
               conj_id_ofs);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
     return handled;
 }
 
@@ -1422,12 +1457,12 @@ flow_output_sb_mac_binding_handler(struct engine_node *node)
 
     struct ed_type_flow_output *fo =
         (struct ed_type_flow_output *)node->data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
+    struct ovn_desired_flow_table *flow_table = fo->flow_table;
 
     lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
             mac_binding_table, flow_table);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
     return true;
 }
 
@@ -1439,7 +1474,7 @@ flow_output_sb_port_binding_handler(struct engine_node *node)
                 "runtime_data", node)->data;
     struct hmap *local_datapaths = &data->local_datapaths;
     struct sset *active_tunnels = &data->active_tunnels;
-    struct simap *ct_zones = &data->ct_zones;
+    struct simap *ct_zones = data->ct_zones;
 
     struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
         (struct ed_type_mff_ovn_geneve *)engine_get_input(
@@ -1467,7 +1502,7 @@ flow_output_sb_port_binding_handler(struct engine_node *node)
 
     struct ed_type_flow_output *fo =
         (struct ed_type_flow_output *)node->data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
+    struct ovn_desired_flow_table *flow_table = fo->flow_table;
 
     struct ovsdb_idl_index *sbrec_port_binding_by_name =
         engine_ovsdb_node_get_index(
@@ -1531,7 +1566,7 @@ flow_output_sb_port_binding_handler(struct engine_node *node)
             chassis, ct_zones, local_datapaths,
             active_tunnels, flow_table);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
     return true;
 }
 
@@ -1542,7 +1577,7 @@ flow_output_sb_multicast_group_handler(struct engine_node *node)
         (struct ed_type_runtime_data *)engine_get_input(
                 "runtime_data", node)->data;
     struct hmap *local_datapaths = &data->local_datapaths;
-    struct simap *ct_zones = &data->ct_zones;
+    struct simap *ct_zones = data->ct_zones;
 
     struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
         (struct ed_type_mff_ovn_geneve *)engine_get_input(
@@ -1570,7 +1605,7 @@ flow_output_sb_multicast_group_handler(struct engine_node *node)
 
     struct ed_type_flow_output *fo =
         (struct ed_type_flow_output *)node->data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
+    struct ovn_desired_flow_table *flow_table = fo->flow_table;
 
     struct sbrec_multicast_group_table *multicast_group_table =
         (struct sbrec_multicast_group_table *)EN_OVSDB_GET(
@@ -1580,7 +1615,7 @@ flow_output_sb_multicast_group_handler(struct engine_node *node)
             mff_ovn_geneve, chassis, ct_zones, local_datapaths,
             flow_table);
 
-    node->changed = true;
+    engine_set_node_state(node, EN_UPDATED);
     return true;
 
 }
@@ -1627,11 +1662,11 @@ _flow_output_resource_ref_handler(struct engine_node *node,
 
     struct ed_type_flow_output *fo =
         (struct ed_type_flow_output *)node->data;
-    struct ovn_desired_flow_table *flow_table = &fo->flow_table;
-    struct ovn_extend_table *group_table = &fo->group_table;
-    struct ovn_extend_table *meter_table = &fo->meter_table;
+    struct ovn_desired_flow_table *flow_table = fo->flow_table;
+    struct ovn_extend_table *group_table = fo->group_table;
+    struct ovn_extend_table *meter_table = fo->meter_table;
+    struct lflow_resource_ref *lfrr = fo->lflow_resource_ref;
     uint32_t *conj_id_ofs = &fo->conj_id_ofs;
-    struct lflow_resource_ref *lfrr = &fo->lflow_resource_ref;
 
     struct ovsdb_idl_index *sbrec_multicast_group_by_name_datapath =
         engine_ovsdb_node_get_index(
@@ -1694,7 +1729,9 @@ _flow_output_resource_ref_handler(struct engine_node *node,
                     conj_id_ofs, &changed)) {
             return false;
         }
-        node->changed = changed || node->changed;
+        if (changed) {
+            engine_set_node_state(node, EN_UPDATED);
+        }
     }
     SSET_FOR_EACH (ref_name, updated) {
         if (!lflow_handle_changed_ref(ref_type, ref_name,
@@ -1707,7 +1744,9 @@ _flow_output_resource_ref_handler(struct engine_node *node,
                     conj_id_ofs, &changed)) {
             return false;
         }
-        node->changed = changed || node->changed;
+        if (changed) {
+            engine_set_node_state(node, EN_UPDATED);
+        }
     }
     SSET_FOR_EACH (ref_name, new) {
         if (!lflow_handle_changed_ref(ref_type, ref_name,
@@ -1720,7 +1759,9 @@ _flow_output_resource_ref_handler(struct engine_node *node,
                     conj_id_ofs, &changed)) {
             return false;
         }
-        node->changed = changed || node->changed;
+        if (changed) {
+            engine_set_node_state(node, EN_UPDATED);
+        }
     }
 
     return true;
@@ -1922,28 +1963,33 @@ main(int argc, char *argv[])
     engine_add_input(&en_runtime_data, &en_sb_port_binding,
                      runtime_data_sb_port_binding_handler);
 
-    engine_init(&en_flow_output);
+    /* Storage space for engine data that can be accessed at any moment
+     * after engine_init(). */
+    struct engine_storage storage;
+
+    /* Initialize the incremental engine and point it to the global engine
+     * data. 'storage' itself lives on main()'s stack; the engine is only
+     * responsible for initializing and destroying the tables and maps in it.
+     */
+    engine_init(&en_flow_output, &storage);
 
-    ofctrl_init(&ed_flow_output.group_table,
-                &ed_flow_output.meter_table,
+    ofctrl_init(&storage.group_table, &storage.meter_table,
                 get_ofctrl_probe_interval(ovs_idl_loop.idl));
 
     unixctl_command_register("group-table-list", "", 0, 0,
-                             group_table_list, &ed_flow_output.group_table);
+                             group_table_list, &storage.group_table);
 
     unixctl_command_register("meter-table-list", "", 0, 0,
-                             meter_table_list, &ed_flow_output.meter_table);
+                             meter_table_list, &storage.meter_table);
 
     unixctl_command_register("ct-zone-list", "", 0, 0,
-                             ct_zone_list, &ed_runtime_data.ct_zones);
+                             ct_zone_list, &storage.ct_zones);
 
     struct pending_pkt pending_pkt = { .conn = NULL };
     unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
                              &pending_pkt);
 
     uint64_t engine_run_id = 0;
-    uint64_t old_engine_run_id = 0;
-    bool engine_run_done = true;
 
     unsigned int ovs_cond_seqno = UINT_MAX;
     unsigned int ovnsb_cond_seqno = UINT_MAX;
@@ -1952,10 +1998,11 @@ main(int argc, char *argv[])
     exiting = false;
     restart = false;
     while (!exiting) {
+        engine_run_id++;
+
         update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
         update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
         ofctrl_set_probe_interval(get_ofctrl_probe_interval(ovs_idl_loop.idl));
-        old_engine_run_id = engine_run_id;
 
         struct ovsdb_idl_txn *ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop);
         unsigned int new_ovs_cond_seqno
@@ -2011,7 +2058,7 @@ main(int argc, char *argv[])
             }
 
             if (br_int) {
-                ofctrl_run(br_int, &ed_runtime_data.pending_ct_zones);
+                ofctrl_run(br_int, &storage.pending_ct_zones);
 
                 if (chassis) {
                     patch_run(ovs_idl_txn,
@@ -2044,50 +2091,66 @@ main(int argc, char *argv[])
                              * this round of engine_run and continue processing
                              * acculated changes incrementally later when
                              * ofctrl_can_put() returns true. */
-                            if (engine_run_done) {
+                            if (!engine_aborted(&en_flow_output)) {
                                 engine_set_abort_recompute(true);
-                                engine_run_done = engine_run(&en_flow_output,
-                                                             ++engine_run_id);
+                                engine_run(&en_flow_output, engine_run_id);
                             }
                         } else {
                             engine_set_abort_recompute(false);
-                            engine_run_done = true;
-                            engine_run(&en_flow_output, ++engine_run_id);
+                            engine_run(&en_flow_output, engine_run_id);
                         }
                     }
                     stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
                                    time_msec());
                     if (ovs_idl_txn) {
-                        commit_ct_zones(br_int,
-                                        &ed_runtime_data.pending_ct_zones);
+                        commit_ct_zones(br_int, &storage.pending_ct_zones);
                         bfd_run(ovsrec_interface_table_get(ovs_idl_loop.idl),
                                 br_int, chassis,
                                 sbrec_ha_chassis_group_table_get(
                                     ovnsb_idl_loop.idl),
                                 sbrec_sb_global_table_get(ovnsb_idl_loop.idl));
                     }
-                    ofctrl_put(&ed_flow_output.flow_table,
-                               &ed_runtime_data.pending_ct_zones,
-                               sbrec_meter_table_get(ovnsb_idl_loop.idl),
-                               get_nb_cfg(sbrec_sb_global_table_get(
-                                              ovnsb_idl_loop.idl)),
-                               en_flow_output.changed);
-                    pinctrl_run(ovnsb_idl_txn,
-                                sbrec_datapath_binding_by_key,
-                                sbrec_port_binding_by_datapath,
-                                sbrec_port_binding_by_key,
-                                sbrec_port_binding_by_name,
-                                sbrec_mac_binding_by_lport_ip,
-                                sbrec_igmp_group,
-                                sbrec_ip_multicast,
-                                sbrec_dns_table_get(ovnsb_idl_loop.idl),
-                                sbrec_controller_event_table_get(
-                                    ovnsb_idl_loop.idl),
-                                br_int, chassis,
-                                &ed_runtime_data.local_datapaths,
-                                &ed_runtime_data.active_tunnels);
 
-                    if (en_runtime_data.changed) {
+                    /* We need to make sure the en_flow_output node was
+                     * properly computed before trying to install OF flows.
+                     */
+                    if (engine_node_valid(&en_flow_output, engine_run_id)) {
+                        ofctrl_put(&storage.flow_table,
+                                   &storage.pending_ct_zones,
+                                   sbrec_meter_table_get(ovnsb_idl_loop.idl),
+                                   get_nb_cfg(sbrec_sb_global_table_get(
+                                                   ovnsb_idl_loop.idl)),
+                                   engine_node_changed(&en_flow_output,
+                                                       engine_run_id));
+                    }
+
+                    /* We need to make sure that at least the runtime data
+                     * (e.g., local datapaths, local lports) are fresh before
+                     * calling pinctrl_run to avoid using potentially freed
+                     * references to database records.
+                     */
+                    if (engine_node_valid(&en_runtime_data, engine_run_id)) {
+                        pinctrl_run(ovnsb_idl_txn,
+                                    sbrec_datapath_binding_by_key,
+                                    sbrec_port_binding_by_datapath,
+                                    sbrec_port_binding_by_key,
+                                    sbrec_port_binding_by_name,
+                                    sbrec_mac_binding_by_lport_ip,
+                                    sbrec_igmp_group,
+                                    sbrec_ip_multicast,
+                                    sbrec_dns_table_get(ovnsb_idl_loop.idl),
+                                    sbrec_controller_event_table_get(
+                                        ovnsb_idl_loop.idl),
+                                    br_int, chassis,
+                                    &ed_runtime_data.local_datapaths,
+                                    &ed_runtime_data.active_tunnels);
+                    }
+
+                    /* If the runtime data changed we might need to update
+                     * the tables we need to monitor from the SB DB.
+                     */
+                    if (engine_node_changed(&en_runtime_data,
+                                            engine_run_id)) {
                         update_sb_monitors(ovnsb_idl_loop.idl, chassis,
                                            &ed_runtime_data.local_lports,
                                            &ed_runtime_data.local_datapaths);
@@ -2095,17 +2158,20 @@ main(int argc, char *argv[])
                 }
 
             }
-            if (old_engine_run_id == engine_run_id || !engine_run_done) {
-                if (!engine_run_done || engine_need_run(&en_flow_output)) {
-                    VLOG_DBG("engine did not run, force recompute next time: "
-                             "br_int %p, chassis %p", br_int, chassis);
-                    engine_set_force_recompute(true);
-                    poll_immediate_wake();
-                } else {
-                    VLOG_DBG("engine did not run, and it was not needed"
-                             " either: br_int %p, chassis %p",
-                             br_int, chassis);
-                }
+            if (engine_need_run(&en_flow_output, engine_run_id)) {
+                VLOG_DBG("engine did not run, force recompute next time: "
+                            "br_int %p, chassis %p", br_int, chassis);
+                engine_set_force_recompute(true);
+                poll_immediate_wake();
+            } else if (engine_aborted(&en_flow_output)) {
+                VLOG_DBG("engine was aborted, force recompute next time: "
+                         "br_int %p, chassis %p", br_int, chassis);
+                engine_set_force_recompute(true);
+                poll_immediate_wake();
+            } else if (!engine_has_run(&en_flow_output, engine_run_id)) {
+                VLOG_DBG("engine did not run, and it was not needed"
+                         " either: br_int %p, chassis %p",
+                         br_int, chassis);
             } else {
                 engine_set_force_recompute(false);
             }
@@ -2117,9 +2183,14 @@ main(int argc, char *argv[])
                 }
             }
 
-
             if (pending_pkt.conn) {
-                if (br_int && chassis) {
+                /* We need to make sure that en_addr_sets and en_port_group
+                 * nodes contain valid data before trying to inject the
+                 * packet.
+                 */
+                if (br_int && chassis &&
+                        engine_node_valid(&en_addr_sets, engine_run_id) &&
+                        engine_node_valid(&en_port_groups, engine_run_id)) {
                     char *error = ofctrl_inject_pkt(br_int, pending_pkt.flow_s,
                         &ed_addr_sets.addr_sets, &ed_port_groups.port_groups);
                     if (error) {
@@ -2161,11 +2232,10 @@ main(int argc, char *argv[])
 
         if (ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop) == 1) {
             struct shash_node *iter, *iter_next;
-            SHASH_FOR_EACH_SAFE (iter, iter_next,
-                                 &ed_runtime_data.pending_ct_zones) {
+            SHASH_FOR_EACH_SAFE (iter, iter_next, &storage.pending_ct_zones) {
                 struct ct_zone_pending_entry *ctzpe = iter->data;
                 if (ctzpe->state == CT_ZONE_DB_SENT) {
-                    shash_delete(&ed_runtime_data.pending_ct_zones, iter);
+                    shash_delete(&storage.pending_ct_zones, iter);
                     free(ctzpe);
                 }
             }
diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
index 1064a08..cbb9c39 100644
--- a/lib/inc-proc-eng.c
+++ b/lib/inc-proc-eng.c
@@ -34,6 +34,15 @@ static bool engine_force_recompute = false;
 static bool engine_abort_recompute = false;
 static const struct engine_context *engine_context;
 
+static const char *const engine_node_state_name[EN_STATE_MAX] = {
+    [EN_NEW]       = "New",
+    [EN_STALE]     = "Stale",
+    [EN_UPDATED]   = "Updated",
+    [EN_VALID]     = "Valid",
+    [EN_ABORTED]   = "Aborted",
+    [EN_DESTROYED] = "Destroyed",
+};
+
 void
 engine_set_force_recompute(bool val)
 {
@@ -59,19 +68,35 @@ engine_set_context(const struct engine_context *ctx)
 }
 
 void
-engine_init(struct engine_node *node)
+engine_init(struct engine_node *node, void *arg)
 {
+    if (!engine_node_new(node)) {
+        /* Already initialized: the node is an input of multiple nodes and
+         * was reached through another one first.  Skip it so that its
+         * init() handler runs only once. */
+        return;
+    }
+
+    engine_set_node_state(node, EN_STALE);
     for (size_t i = 0; i < node->n_inputs; i++) {
-        engine_init(node->inputs[i].node);
+        engine_init(node->inputs[i].node, arg);
     }
     if (node->init) {
-        node->init(node);
+        node->init(node, arg);
     }
 }
 
 void
 engine_cleanup(struct engine_node *node)
 {
+    /* The node was already destroyed (could be input for multiple nodes).
+     * Nothing to do then.
+     */
+    if (engine_node_destroyed(node)) {
+        return;
+    }
+
+    engine_set_node_state(node, EN_DESTROYED);
     for (size_t i = 0; i < node->n_inputs; i++) {
         engine_cleanup(node->inputs[i].node);
     }
@@ -128,89 +153,197 @@ engine_ovsdb_node_add_index(struct engine_node *node, const char *name,
     ed->n_indexes ++;
 }
 
+void
+engine_set_node_state_at(struct engine_node *node,
+                         enum engine_node_state state,
+                         const char *where)
+{
+    if (node->state == state) {
+        return;
+    }
+    /* Log the transition. 'run_id' is a uint64_t, hence PRIu64. */
+    VLOG_DBG("%s: node: %s (run-id %"PRIu64"), old_state %s, new_state %s",
+             where, node->name, node->run_id,
+             engine_node_state_name[node->state],
+             engine_node_state_name[state]);
+
+    node->state = state;
+}
+
 bool
+engine_node_new(struct engine_node *node)
+{
+    return node->state == EN_NEW;  /* engine_init() not run yet. */
+}
+
+bool
+engine_node_destroyed(struct engine_node *node)
+{
+    return node->state == EN_DESTROYED;  /* Set by engine_cleanup(). */
+}
+
+bool
+engine_node_valid(struct engine_node *node, uint64_t run_id)
+{
+    return node->run_id == run_id &&
+        (node->state == EN_UPDATED || node->state == EN_VALID);
+}
+
+bool
+engine_node_changed(struct engine_node *node, uint64_t run_id)
+{
+    return node->run_id == run_id && node->state == EN_UPDATED;
+}
+
+bool
+engine_has_run(struct engine_node *node, uint64_t run_id)
+{
+    return node->run_id == run_id;  /* Regardless of the resulting state. */
+}
+
+bool
+engine_aborted(struct engine_node *node)
+{
+    return node->state == EN_ABORTED;  /* Not tied to a run_id. */
+}
+
+/* Do a full recompute of 'node' by calling its run() handler.  If that is
+ * not 'allowed', mark the node as "aborted" instead.  'forced' is only used
+ * for logging. */
+static void
+engine_recompute(struct engine_node *node, bool forced, bool allowed)
+{
+    VLOG_DBG("node: %s, recompute (%s)", node->name,
+             forced ? "forced" : "triggered");
+
+    if (!allowed) {
+        VLOG_DBG("node: %s, recompute aborted", node->name);
+        engine_set_node_state(node, EN_ABORTED);
+        return;
+    }
+
+    /* The run() handler is expected to set the node's new state. */
+    node->run(node);
+}
+
+/* Call the change handler of every EN_UPDATED input of 'node'.  Returns
+ * true if all changes were handled incrementally; otherwise falls back to
+ * engine_recompute() (which may abort) and returns false. */
+static bool
+engine_compute(struct engine_node *node, bool recompute_allowed)
+{
+    for (size_t i = 0; i < node->n_inputs; i++) {
+        /* engine_run() guarantees updated inputs have a change handler. */
+        if (node->inputs[i].node->state == EN_UPDATED) {
+            VLOG_DBG("node: %s, handle change for input %s",
+                     node->name, node->inputs[i].node->name);
+
+            /* If the input change can't be handled incrementally, run
+             * the node handler.
+             */
+            if (!node->inputs[i].change_handler(node)) {
+                VLOG_DBG("node: %s, can't handle change for input %s, "
+                         "fall back to recompute",
+                         node->name, node->inputs[i].node->name);
+                engine_recompute(node, false, recompute_allowed);
+                return false;
+            }
+        }
+    }
+
+    return true;
+}
+
+void
 engine_run(struct engine_node *node, uint64_t run_id)
 {
     if (node->run_id == run_id) {
-        return true;
+        /* The node was already processed in this run (it could be an input
+         * of multiple other nodes). Stop processing.
+         */
+        return;
     }
+
+    /* Initialize the node for this run. */
     node->run_id = run_id;
+    engine_set_node_state(node, EN_STALE);
 
-    node->changed = false;
     if (!node->n_inputs) {
+        /* Run the node handler which might change state. */
         node->run(node);
-        VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
-        return true;
+        return;
     }
 
     for (size_t i = 0; i < node->n_inputs; i++) {
-        if (!engine_run(node->inputs[i].node, run_id)) {
-            return false;
+        engine_run(node->inputs[i].node, run_id);
+        if (!engine_node_valid(node->inputs[i].node, run_id)) {
+            /* If the input node aborted computation, move to EN_ABORTED to
+             * propagate the result, otherwise stay in EN_STALE.
+             */
+            if (engine_aborted(node->inputs[i].node)) {
+                engine_set_node_state(node, EN_ABORTED);
+            }
+            return;
         }
     }
 
     bool need_compute = false;
-    bool need_recompute = false;
 
     if (engine_force_recompute) {
-        need_recompute = true;
-    } else {
-        for (size_t i = 0; i < node->n_inputs; i++) {
-            if (node->inputs[i].node->changed) {
-                need_compute = true;
-                if (!node->inputs[i].change_handler) {
-                    need_recompute = true;
-                    break;
-                }
+        engine_recompute(node, true, !engine_abort_recompute);
+        return;
+    }
+
+    /* If one of the inputs updated its data then the current node must be
+     * computed too (incrementally if possible).
+     */
+    for (size_t i = 0; i < node->n_inputs; i++) {
+        if (node->inputs[i].node->state == EN_UPDATED) {
+            need_compute = true;
+
+            /* Trigger a recompute if we don't have a change handler. */
+            if (!node->inputs[i].change_handler) {
+                engine_recompute(node, false, !engine_abort_recompute);
+                return;
             }
         }
     }
 
-    if (need_recompute) {
-        VLOG_DBG("node: %s, recompute (%s)", node->name,
-                 engine_force_recompute ? "forced" : "triggered");
-        if (engine_abort_recompute) {
-            VLOG_DBG("node: %s, recompute aborted", node->name);
-            return false;
-        }
-        node->run(node);
-    } else if (need_compute) {
-        for (size_t i = 0; i < node->n_inputs; i++) {
-            if (node->inputs[i].node->changed) {
-                VLOG_DBG("node: %s, handle change for input %s",
-                         node->name, node->inputs[i].node->name);
-                if (!node->inputs[i].change_handler(node)) {
-                    VLOG_DBG("node: %s, can't handle change for input %s, "
-                             "fall back to recompute",
-                             node->name, node->inputs[i].node->name);
-                    if (engine_abort_recompute) {
-                        VLOG_DBG("node: %s, recompute aborted", node->name);
-                        return false;
-                    }
-                    node->run(node);
-                    break;
-                }
-            }
+    if (need_compute) {
+        /* If we couldn't compute the node we either aborted or triggered
+         * a full recompute. In any case, stop processing.
+         */
+        if (!engine_compute(node, !engine_abort_recompute)) {
+            return;
         }
     }
 
-    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
-    return true;
+    /* If we reached this point all input changes (if any) were handled
+     * incrementally: keep EN_UPDATED if a change handler set it, otherwise
+     * mark the node EN_VALID. */
+    if (!engine_node_changed(node, run_id)) {
+        engine_set_node_state(node, EN_VALID);
+    }
 }
 
 bool
-engine_need_run(struct engine_node *node)
+engine_need_run(struct engine_node *node, uint64_t run_id)
 {
     size_t i;
 
+    if (node->run_id == run_id) {
+        return false;
+    }
+
     if (!node->n_inputs) {
         node->run(node);
-        VLOG_DBG("input node: %s, changed: %d", node->name, node->changed);
-        return node->changed;
+        VLOG_DBG("input node: %s, state: %s", node->name,
+                 engine_node_state_name[node->state]);
+        return node->state == EN_UPDATED;
     }
 
     for (i = 0; i < node->n_inputs; i++) {
-        if (engine_need_run(node->inputs[i].node)) {
+        if (engine_need_run(node->inputs[i].node, run_id)) {
             return true;
         }
     }
diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
index 3a69dc2..43ef82a 100644
--- a/lib/inc-proc-eng.h
+++ b/lib/inc-proc-eng.h
@@ -82,6 +82,22 @@ struct engine_node_input {
     bool (*change_handler)(struct engine_node *node);
 };
 
+enum engine_node_state {
+    EN_NEW,       /* Node is not initialized yet. */
+    EN_STALE,     /* Data in the node is not up to date with the DB. */
+    EN_UPDATED,   /* Data in the node is valid but was updated during the
+                   * last run.
+                   */
+    EN_VALID,     /* Data in the node is valid and didn't change during the
+                   * last run.
+                   */
+    EN_ABORTED,   /* During the last run, processing was aborted for
+                   * this node.
+                   */
+    EN_DESTROYED, /* The node was cleaned up. */
+    EN_STATE_MAX, /* Number of states; keep it last. */
+};
+
 struct engine_node {
     /* A unique id to distinguish each iteration of the engine_run(). */
     uint64_t run_id;
@@ -102,11 +118,11 @@ struct engine_node {
      * node. */
     void *data;
 
-    /* Whether the data changed in the last engine run. */
-    bool changed;
+    /* State of the node after the last engine run. */
+    enum engine_node_state state;
 
     /* Method to initialize data. It may be NULL. */
-    void (*init)(struct engine_node *);
+    void (*init)(struct engine_node *, void *arg);
 
     /* Method to clean up data. It may be NULL. */
     void (*cleanup)(struct engine_node *);
@@ -117,22 +133,24 @@ struct engine_node {
 };
 
 /* Initialize the data for the engine nodes recursively. It calls each node's
- * init() method if not NULL. It should be called before the main loop. */
-void engine_init(struct engine_node *);
+ * init() method if not NULL. It should be called before the main loop.
+ * 'arg' is user provided and is passed to every node's init() handler.
+ */
+void engine_init(struct engine_node *, void *arg);
 
 /* Execute the processing recursively, which should be called in the main
- * loop. Returns true if the execution is compelte, false if it is aborted,
- * which could happen when engine_abort_recompute is set. */
-bool engine_run(struct engine_node *, uint64_t run_id);
+ * loop. Updates node states; check them with engine_node_valid().
+ */
+void engine_run(struct engine_node *, uint64_t run_id);
 
 /* Clean up the data for the engine nodes recursively. It calls each node's
  * cleanup() method if not NULL. It should be called before the program
  * terminates. */
 void engine_cleanup(struct engine_node *);
 
-/* Check if engine needs to run, i.e. any change to be processed. */
+/* Check if the engine didn't run for 'run_id' but has changes to process. */
 bool
-engine_need_run(struct engine_node *);
+engine_need_run(struct engine_node *, uint64_t run_id);
 
 /* Get the input node with <name> for <node> */
 struct engine_node * engine_get_input(const char *input_name,
@@ -159,6 +177,32 @@ const struct engine_context * engine_get_context(void);
 
 void engine_set_context(const struct engine_context *);
 
+void engine_set_node_state_at(struct engine_node *node,
+                              enum engine_node_state state,
+                              const char *where);
+
+/* Return true if the node is "new" (i.e., uninitialized). */
+bool engine_node_new(struct engine_node *node);
+
+/* Return true if the node was already destroyed. */
+bool engine_node_destroyed(struct engine_node *node);
+
+/* Return true if after the 'run_id' iteration the node's data is valid. */
+bool engine_node_valid(struct engine_node *node, uint64_t run_id);
+
+/* Return true if during the 'run_id' iteration the node's data was updated. */
+bool engine_node_changed(struct engine_node *node, uint64_t run_id);
+
+/* Return true if the engine has run for 'node' in the 'run_id' iteration. */
+bool engine_has_run(struct engine_node *node, uint64_t run_id);
+
+/* Return true if processing of the node was aborted in the last run. */
+bool engine_aborted(struct engine_node *node);
+
+/* Set the node's state; changes are logged with the caller's location. */
+#define engine_set_node_state(node, state) \
+    engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
+
 struct ed_ovsdb_index {
     const char *name;
     struct ovsdb_idl_index *index;
@@ -184,6 +228,7 @@ void engine_ovsdb_node_add_index(struct engine_node *, const char *name,
     struct engine_node en_##NAME = { \
         .name = NAME_STR, \
         .data = &ed_##NAME, \
+        .state = EN_NEW, \
         .init = en_##NAME##_init, \
         .run = en_##NAME##_run, \
         .cleanup = en_##NAME##_cleanup, \
@@ -198,12 +243,13 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \
     const struct DB_NAME##rec_##TBL_NAME##_table *table = \
         EN_OVSDB_GET(node); \
     if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \
-        node->changed = true; \
+        engine_set_node_state(node, EN_UPDATED); \
         return; \
     } \
-    node->changed = false; \
+    engine_set_node_state(node, EN_VALID); \
 } \
-static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \
+static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node, \
+                                                void *arg) \
             = NULL; \
 static void (*en_##DB_NAME##_##TBL_NAME##_cleanup)(struct engine_node *node) \
             = NULL;
-- 
1.8.3.1



More information about the dev mailing list