[ovs-dev] [RFC v2 03/10] ovn-controller: Initial use of incremental engine in main

Han Zhou zhouhan at gmail.com
Thu Mar 22 18:42:20 UTC 2018


The incremental processing engine is used to compute flows. In this
patch we only create 2 engine nodes with a simple dependency:
    runtime_data -> flow_output

In each iteration everything is still recomputed.
---
 ovn/controller/ofctrl.c         |  21 ++-
 ovn/controller/ofctrl.h         |   5 +-
 ovn/controller/ovn-controller.c | 370 +++++++++++++++++++++++++++-------------
 ovn/controller/ovn-controller.h |   5 +
 4 files changed, 269 insertions(+), 132 deletions(-)

diff --git a/ovn/controller/ofctrl.c b/ovn/controller/ofctrl.c
index 8d6d1b6..4466d1a 100644
--- a/ovn/controller/ofctrl.c
+++ b/ovn/controller/ofctrl.c
@@ -475,11 +475,21 @@ recv_S_UPDATE_FLOWS(const struct ofp_header *oh, enum ofptype type,
     }
 }
 
+
+enum mf_field_id
+ofctrl_get_mf_field_id(void)
+{
+    if (!rconn_is_connected(swconn)) {
+        return 0;
+    }
+    return (state == S_CLEAR_FLOWS || state == S_UPDATE_FLOWS
+            ? mff_ovn_geneve : 0);
+}
+
 /* Runs the OpenFlow state machine against 'br_int', which is local to the
  * hypervisor on which we are running.  Attempts to negotiate a Geneve option
- * field for class OVN_GENEVE_CLASS, type OVN_GENEVE_TYPE.  If successful,
- * returns the MFF_* field ID for the option, otherwise returns 0. */
-enum mf_field_id
+ * field for class OVN_GENEVE_CLASS, type OVN_GENEVE_TYPE. */
+void
 ofctrl_run(const struct ovsrec_bridge *br_int, struct shash *pending_ct_zones)
 {
     char *target = xasprintf("unix:%s/%s.mgmt", ovs_rundir(), br_int->name);
@@ -492,7 +502,7 @@ ofctrl_run(const struct ovsrec_bridge *br_int, struct shash *pending_ct_zones)
     rconn_run(swconn);
 
     if (!rconn_is_connected(swconn)) {
-        return 0;
+        return;
     }
     if (seqno != rconn_get_connection_seqno(swconn)) {
         seqno = rconn_get_connection_seqno(swconn);
@@ -555,9 +565,6 @@ ofctrl_run(const struct ovsrec_bridge *br_int, struct shash *pending_ct_zones)
          * point, so ensure that we come back again without waiting. */
         poll_immediate_wake();
     }
-
-    return (state == S_CLEAR_FLOWS || state == S_UPDATE_FLOWS
-            ? mff_ovn_geneve : 0);
 }
 
 void
diff --git a/ovn/controller/ofctrl.h b/ovn/controller/ofctrl.h
index d53bc68..e25f7d1 100644
--- a/ovn/controller/ofctrl.h
+++ b/ovn/controller/ofctrl.h
@@ -33,8 +33,9 @@ struct shash;
 /* Interface for OVN main loop. */
 void ofctrl_init(struct ovn_extend_table *group_table,
                  struct ovn_extend_table *meter_table);
-enum mf_field_id ofctrl_run(const struct ovsrec_bridge *br_int,
-                            struct shash *pending_ct_zones);
+void ofctrl_run(const struct ovsrec_bridge *br_int,
+                struct shash *pending_ct_zones);
+enum mf_field_id ofctrl_get_mf_field_id(void);
 bool ofctrl_can_put(void);
 void ofctrl_put(struct hmap *flow_table, struct shash *pending_ct_zones,
                 int64_t nb_cfg);
diff --git a/ovn/controller/ovn-controller.c b/ovn/controller/ovn-controller.c
index 18468cf..b3a0ebf 100644
--- a/ovn/controller/ovn-controller.c
+++ b/ovn/controller/ovn-controller.c
@@ -57,6 +57,7 @@
 #include "stream.h"
 #include "unixctl.h"
 #include "util.h"
+#include "ovn/lib/inc-proc-eng.h"
 
 VLOG_DEFINE_THIS_MODULE(main);
 
@@ -205,15 +206,26 @@ update_sb_monitors(struct ovsdb_idl *ovnsb_idl,
     ovsdb_idl_condition_destroy(&dns);
 }
 
+static const char *
+br_int_name(const struct ovsrec_open_vswitch *cfg)
+{
+    return smap_get_def(&cfg->external_ids, "ovn-bridge", DEFAULT_BRIDGE_NAME);
+}
+
 static const struct ovsrec_bridge *
-create_br_int(struct controller_ctx *ctx,
-              const struct ovsrec_open_vswitch *cfg,
-              const char *bridge_name)
+create_br_int(struct controller_ctx *ctx)
 {
     if (!ctx->ovs_idl_txn) {
         return NULL;
     }
 
+    const struct ovsrec_open_vswitch *cfg;
+    cfg = ovsrec_open_vswitch_first(ctx->ovs_idl);
+    if (!cfg) {
+        return NULL;
+    }
+    const char *bridge_name = br_int_name(cfg);
+
     ovsdb_idl_txn_add_comment(ctx->ovs_idl_txn,
             "ovn-controller: creating integration bridge '%s'", bridge_name);
 
@@ -256,15 +268,7 @@ get_br_int(struct controller_ctx *ctx)
         return NULL;
     }
 
-    const char *br_int_name = smap_get_def(&cfg->external_ids, "ovn-bridge",
-                                           DEFAULT_BRIDGE_NAME);
-
-    const struct ovsrec_bridge *br;
-    br = get_bridge(ctx->ovs_idl, br_int_name);
-    if (!br) {
-        return create_br_int(ctx, cfg, br_int_name);
-    }
-    return br;
+    return get_bridge(ctx->ovs_idl, br_int_name(cfg));
 }
 
 static const char *
@@ -460,11 +464,8 @@ restore_ct_zones(struct ovsdb_idl *ovs_idl,
         return;
     }
 
-    const char *br_int_name = smap_get_def(&cfg->external_ids, "ovn-bridge",
-                                           DEFAULT_BRIDGE_NAME);
-
     const struct ovsrec_bridge *br_int;
-    br_int = get_bridge(ovs_idl, br_int_name);
+    br_int = get_bridge(ovs_idl, br_int_name(cfg));
     if (!br_int) {
         /* If the integration bridge hasn't been defined, assume that
          * any existing ct-zone definitions aren't valid. */
@@ -572,6 +573,137 @@ create_ovnsb_indexes(struct ovsdb_idl *ovnsb_idl)
                                OVSDB_INDEX_ASC, NULL);
 }
 
+struct ed_type_runtime_data {
+    struct chassis_index *chassis_index;
+    struct hmap *local_datapaths;
+    struct sset *local_lports;
+    struct sset *local_lport_ids;
+    struct sset *active_tunnels;
+    struct shash *addr_sets;
+};
+
+static void
+runtime_data_run(struct engine_node *node)
+{
+    struct controller_ctx *ctx = (struct controller_ctx *)node->context;
+    struct ed_type_runtime_data *data = (struct ed_type_runtime_data *)node->data;
+    struct hmap *local_datapaths = data->local_datapaths;
+    struct sset *local_lports = data->local_lports;
+    struct sset *local_lport_ids = data->local_lport_ids;
+    struct sset *active_tunnels = data->active_tunnels;
+    struct chassis_index *chassis_index = data->chassis_index;
+    struct shash *addr_sets = data->addr_sets;
+
+    static bool first_run = true;
+    if (first_run) {
+        /* don't cleanup since there is no data yet */
+        // TODO: add a cleanup function for engine for final cleanup upon exit.
+        first_run = false;
+    } else {
+        struct local_datapath *cur_node, *next_node;
+        HMAP_FOR_EACH_SAFE (cur_node, next_node, hmap_node, local_datapaths) {
+            free(cur_node->peer_dps);
+            hmap_remove(local_datapaths, &cur_node->hmap_node);
+            free(cur_node);
+        }
+        hmap_clear(local_datapaths);
+        sset_destroy(local_lports);
+        sset_destroy(local_lport_ids);
+        sset_destroy(active_tunnels);
+        chassis_index_destroy(chassis_index);
+        expr_addr_sets_destroy(addr_sets);
+        shash_destroy(addr_sets);
+    }
+
+    chassis_index_init(chassis_index, ctx->ovnsb_idl);
+    sset_init(local_lports);
+    sset_init(local_lport_ids);
+    sset_init(active_tunnels);
+    const char *chassis_id = get_chassis_id(ctx->ovs_idl);
+    const struct ovsrec_bridge *br_int = get_br_int(ctx);
+
+    ovs_assert(br_int && chassis_id);
+    const struct sbrec_chassis *chassis = NULL;
+    chassis = get_chassis(ctx->ovnsb_idl, chassis_id);
+    ovs_assert(chassis);
+
+    bfd_calculate_active_tunnels(br_int, active_tunnels);
+    binding_run(ctx, br_int, chassis,
+                chassis_index, active_tunnels, local_datapaths,
+                local_lports, local_lport_ids);
+
+    addr_sets_init(ctx, addr_sets);
+    update_ct_zones(local_lports, local_datapaths, ctx->ct_zones,
+                    ctx->ct_zone_bitmap, ctx->pending_ct_zones);
+    update_sb_monitors(ctx->ovnsb_idl, chassis,
+                       local_lports, local_datapaths);
+
+    node->changed = true;
+}
+
+struct ed_type_flow_output {
+    struct hmap *flow_table;
+    struct ovn_extend_table *group_table;
+    struct ovn_extend_table *meter_table;
+};
+
+static void
+flow_output_run(struct engine_node *node)
+{
+    struct controller_ctx *ctx = (struct controller_ctx *)node->context;
+    struct ed_type_runtime_data *data =
+        (struct ed_type_runtime_data *)engine_get_input(
+            "runtime_data", node)->data;
+    struct hmap *local_datapaths = data->local_datapaths;
+    struct sset *local_lports = data->local_lports;
+    struct sset *local_lport_ids = data->local_lport_ids;
+    struct sset *active_tunnels = data->active_tunnels;
+    struct chassis_index *chassis_index = data->chassis_index;
+    struct shash *addr_sets = data->addr_sets;
+    const struct ovsrec_bridge *br_int = get_br_int(ctx);
+
+    const char *chassis_id = get_chassis_id(ctx->ovs_idl);
+
+    const struct sbrec_chassis *chassis = NULL;
+    if (chassis_id) {
+        chassis = get_chassis(ctx->ovnsb_idl, chassis_id);
+    }
+
+    ovs_assert(br_int && chassis);
+
+    struct hmap *flow_table =
+        ((struct ed_type_flow_output *)node->data)->flow_table;
+    struct ovn_extend_table *group_table =
+        ((struct ed_type_flow_output *)node->data)->group_table;
+    struct ovn_extend_table *meter_table =
+        ((struct ed_type_flow_output *)node->data)->meter_table;
+
+    if (ctx->ovs_idl_txn) {
+        static bool first_run = true;
+        if (first_run) {
+            first_run = false;
+        } else {
+            hmap_clear(flow_table);
+        }
+        commit_ct_zones(br_int, ctx->pending_ct_zones);
+
+        lflow_run(ctx, chassis,
+                  chassis_index, local_datapaths, group_table,
+                  meter_table, addr_sets, flow_table, active_tunnels,
+                  local_lport_ids);
+
+        bfd_run(ctx, br_int, chassis, local_datapaths,
+                chassis_index);
+        enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
+
+        physical_run(ctx, mff_ovn_geneve,
+                     br_int, chassis, ctx->ct_zones,
+                     flow_table, local_datapaths, local_lports,
+                     chassis_index, active_tunnels);
+    }
+    node->changed = true;
+}
+
 int
 main(int argc, char *argv[])
 {
@@ -642,6 +774,50 @@ main(int argc, char *argv[])
     unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
                              &pending_pkt);
 
+    struct controller_ctx ctx;
+    ctx.ct_zone_bitmap = ct_zone_bitmap;
+    ctx.pending_ct_zones = &pending_ct_zones;
+    ctx.ct_zones = &ct_zones;
+
+    /* Contains "struct local_datapath" nodes. */
+    struct hmap local_datapaths = HMAP_INITIALIZER(&local_datapaths);
+
+    /* Contains the name of each logical port resident on the local
+     * hypervisor.  These logical ports include the VIFs (and their child
+     * logical ports, if any) that belong to VMs running on the hypervisor,
+     * l2gateway ports for which options:l2gateway-chassis designates the
+     * local hypervisor, and localnet ports. */
+    struct sset local_lports = SSET_INITIALIZER(&local_lports);
+    /* Contains the same ports as local_lports, but in the format:
+     * <datapath-tunnel-key>_<port-tunnel-key> */
+    struct sset local_lport_ids = SSET_INITIALIZER(&local_lport_ids);
+    struct sset active_tunnels = SSET_INITIALIZER(&active_tunnels);
+    struct chassis_index chassis_index;
+    struct shash addr_sets = SHASH_INITIALIZER(&addr_sets);
+
+    struct ed_type_runtime_data ed_runtime_data = {
+        .chassis_index = &chassis_index,
+        .local_datapaths = &local_datapaths,
+        .local_lports = &local_lports,
+        .local_lport_ids = &local_lport_ids,
+        .active_tunnels = &active_tunnels,
+        .addr_sets = &addr_sets
+    };
+
+    struct hmap flow_table = HMAP_INITIALIZER(&flow_table);
+
+    struct ed_type_flow_output ed_flow_output = {
+        .flow_table = &flow_table,
+        .group_table = &group_table,
+        .meter_table = &meter_table
+    };
+
+    ENGINE_NODE(runtime_data, "runtime_data");
+    ENGINE_NODE(flow_output, "flow_output");
+
+    engine_add_input(&en_flow_output, &en_runtime_data, NULL);
+
+    uint64_t engine_run_id = 0;
     /* Main loop. */
     exiting = false;
     while (!exiting) {
@@ -655,93 +831,49 @@ main(int argc, char *argv[])
             free(new_ovnsb_remote);
         }
 
-        struct controller_ctx ctx = {
-            .ovs_idl = ovs_idl_loop.idl,
-            .ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop),
-            .ovnsb_idl = ovnsb_idl_loop.idl,
-            .ovnsb_idl_txn = ovsdb_idl_loop_run(&ovnsb_idl_loop),
-        };
+        ctx.ovs_idl = ovs_idl_loop.idl;
+        ctx.ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop);
+        ctx.ovnsb_idl = ovnsb_idl_loop.idl;
+        ctx.ovnsb_idl_txn = ovsdb_idl_loop_run(&ovnsb_idl_loop);
 
         update_probe_interval(&ctx, ovnsb_remote);
 
         update_ssl_config(ctx.ovs_idl);
 
-        /* Contains "struct local_datapath" nodes. */
-        struct hmap local_datapaths = HMAP_INITIALIZER(&local_datapaths);
-
-        /* Contains the name of each logical port resident on the local
-         * hypervisor.  These logical ports include the VIFs (and their child
-         * logical ports, if any) that belong to VMs running on the hypervisor,
-         * l2gateway ports for which options:l2gateway-chassis designates the
-         * local hypervisor, and localnet ports. */
-        struct sset local_lports = SSET_INITIALIZER(&local_lports);
-        /* Contains the same ports as local_lports, but in the format:
-         * <datapath-tunnel-key>_<port-tunnel-key> */
-        struct sset local_lport_ids = SSET_INITIALIZER(&local_lport_ids);
-        struct sset active_tunnels = SSET_INITIALIZER(&active_tunnels);
-
         const struct ovsrec_bridge *br_int = get_br_int(&ctx);
+        if (!br_int) {
+            br_int = create_br_int(&ctx);
+        }
         const char *chassis_id = get_chassis_id(ctx.ovs_idl);
+        const struct sbrec_chassis *chassis
+            = chassis_id ? chassis_run(&ctx, chassis_id, br_int) : NULL;
 
-        struct chassis_index chassis_index;
-
-        chassis_index_init(&chassis_index, ctx.ovnsb_idl);
-
-        const struct sbrec_chassis *chassis = NULL;
-        if (chassis_id) {
-            chassis = chassis_run(&ctx, chassis_id, br_int);
-            encaps_run(&ctx, br_int, chassis_id);
-            bfd_calculate_active_tunnels(br_int, &active_tunnels);
-            binding_run(&ctx, br_int, chassis,
-                        &chassis_index, &active_tunnels, &local_datapaths,
-                        &local_lports, &local_lport_ids);
-        }
         if (br_int && chassis) {
-            struct shash addr_sets = SHASH_INITIALIZER(&addr_sets);
-            addr_sets_init(&ctx, &addr_sets);
-
+            ofctrl_run(br_int, &pending_ct_zones);
             patch_run(&ctx, br_int, chassis);
+            encaps_run(&ctx, br_int, chassis_id);
 
-            enum mf_field_id mff_ovn_geneve = ofctrl_run(br_int,
-                                                         &pending_ct_zones);
+            if (ofctrl_can_put()) {
+                engine_run(&en_flow_output, ++engine_run_id);
 
+                ofctrl_put(&flow_table, &pending_ct_zones,
+                           get_nb_cfg(ctx.ovnsb_idl));
+            }
             pinctrl_run(&ctx, br_int, chassis, &chassis_index,
                         &local_datapaths, &active_tunnels);
-            update_ct_zones(&local_lports, &local_datapaths, &ct_zones,
-                            ct_zone_bitmap, &pending_ct_zones);
-            if (ctx.ovs_idl_txn) {
-                if (ofctrl_can_put()) {
-                    commit_ct_zones(br_int, &pending_ct_zones);
-
-                    struct hmap flow_table = HMAP_INITIALIZER(&flow_table);
-                    lflow_run(&ctx, chassis,
-                              &chassis_index, &local_datapaths, &group_table,
-                              &meter_table, &addr_sets, &flow_table,
-                              &active_tunnels, &local_lport_ids);
-
-                    if (chassis_id) {
-                        bfd_run(&ctx, br_int, chassis, &local_datapaths,
-                                &chassis_index);
-                    }
-                    physical_run(&ctx, mff_ovn_geneve,
-                                 br_int, chassis, &ct_zones,
-                                 &flow_table, &local_datapaths, &local_lports,
-                                 &chassis_index, &active_tunnels);
-
-                    ofctrl_put(&flow_table, &pending_ct_zones,
-                               get_nb_cfg(ctx.ovnsb_idl));
-
-                    hmap_destroy(&flow_table);
-                }
-                if (ctx.ovnsb_idl_txn) {
-                    int64_t cur_cfg = ofctrl_get_cur_cfg();
-                    if (cur_cfg && cur_cfg != chassis->nb_cfg) {
-                        sbrec_chassis_set_nb_cfg(chassis, cur_cfg);
-                    }
-                }
+
+        }
+
+        if (ctx.ovnsb_idl_txn) {
+            int64_t cur_cfg = ofctrl_get_cur_cfg();
+            if (cur_cfg && cur_cfg != chassis->nb_cfg) {
+                sbrec_chassis_set_nb_cfg(chassis, cur_cfg);
             }
+        }
+
 
-            if (pending_pkt.conn) {
+        if (pending_pkt.conn) {
+            if (br_int && chassis) {
                 char *error = ofctrl_inject_pkt(br_int, pending_pkt.flow_s,
                                                 &addr_sets);
                 if (error) {
@@ -750,40 +882,14 @@ main(int argc, char *argv[])
                 } else {
                     unixctl_command_reply(pending_pkt.conn, NULL);
                 }
-                pending_pkt.conn = NULL;
-                free(pending_pkt.flow_s);
+            } else {
+                unixctl_command_reply_error(pending_pkt.conn,
+                                            "ovn-controller not ready.");
             }
-
-            update_sb_monitors(ctx.ovnsb_idl, chassis,
-                               &local_lports, &local_datapaths);
-
-            expr_addr_sets_destroy(&addr_sets);
-            shash_destroy(&addr_sets);
-        }
-
-        /* If we haven't handled the pending packet insertion
-         * request, the system is not ready. */
-        if (pending_pkt.conn) {
-            unixctl_command_reply_error(pending_pkt.conn,
-                                        "ovn-controller not ready.");
             pending_pkt.conn = NULL;
             free(pending_pkt.flow_s);
         }
 
-        chassis_index_destroy(&chassis_index);
-
-        sset_destroy(&local_lports);
-        sset_destroy(&local_lport_ids);
-        sset_destroy(&active_tunnels);
-
-        struct local_datapath *cur_node, *next_node;
-        HMAP_FOR_EACH_SAFE (cur_node, next_node, hmap_node, &local_datapaths) {
-            free(cur_node->peer_dps);
-            hmap_remove(&local_datapaths, &cur_node->hmap_node);
-            free(cur_node);
-        }
-        hmap_destroy(&local_datapaths);
-
         unixctl_server_run(unixctl);
 
         unixctl_server_wait(unixctl);
@@ -816,26 +922,44 @@ main(int argc, char *argv[])
         }
     }
 
+    expr_addr_sets_destroy(&addr_sets);
+    shash_destroy(&addr_sets);
+
+    chassis_index_destroy(&chassis_index);
+
+    sset_destroy(&local_lports);
+    sset_destroy(&local_lport_ids);
+    sset_destroy(&active_tunnels);
+    struct local_datapath *cur_node, *next_node;
+    HMAP_FOR_EACH_SAFE (cur_node, next_node, hmap_node, &local_datapaths) {
+        free(cur_node->peer_dps);
+        hmap_remove(&local_datapaths, &cur_node->hmap_node);
+        free(cur_node);
+    }
+    hmap_destroy(&local_datapaths);
+
+    hmap_destroy(&flow_table);
+
     /* It's time to exit.  Clean up the databases. */
     bool done = false;
     while (!done) {
-        struct controller_ctx ctx = {
+        struct controller_ctx ctx_ = {
             .ovs_idl = ovs_idl_loop.idl,
             .ovs_idl_txn = ovsdb_idl_loop_run(&ovs_idl_loop),
             .ovnsb_idl = ovnsb_idl_loop.idl,
             .ovnsb_idl_txn = ovsdb_idl_loop_run(&ovnsb_idl_loop),
         };
 
-        const struct ovsrec_bridge *br_int = get_br_int(&ctx);
-        const char *chassis_id = get_chassis_id(ctx.ovs_idl);
+        const struct ovsrec_bridge *br_int = get_br_int(&ctx_);
+        const char *chassis_id = get_chassis_id(ctx_.ovs_idl);
         const struct sbrec_chassis *chassis
-            = chassis_id ? get_chassis(ctx.ovnsb_idl, chassis_id) : NULL;
+            = chassis_id ? get_chassis(ctx_.ovnsb_idl, chassis_id) : NULL;
 
         /* Run all of the cleanup functions, even if one of them returns false.
          * We're done if all of them return true. */
-        done = binding_cleanup(&ctx, chassis);
-        done = chassis_cleanup(&ctx, chassis) && done;
-        done = encaps_cleanup(&ctx, br_int) && done;
+        done = binding_cleanup(&ctx_, chassis);
+        done = chassis_cleanup(&ctx_, chassis) && done;
+        done = encaps_cleanup(&ctx_, br_int) && done;
         if (done) {
             poll_immediate_wake();
         }
diff --git a/ovn/controller/ovn-controller.h b/ovn/controller/ovn-controller.h
index 6617b0c..ce8f010 100644
--- a/ovn/controller/ovn-controller.h
+++ b/ovn/controller/ovn-controller.h
@@ -29,6 +29,11 @@ struct controller_ctx {
 
     struct ovsdb_idl *ovs_idl;
     struct ovsdb_idl_txn *ovs_idl_txn;
+
+    // TODO: these should be output of runtime_data and input of flow_output?
+    unsigned long *ct_zone_bitmap;
+    struct shash *pending_ct_zones;
+    struct simap *ct_zones;
 };
 
 /* States to move through when a new conntrack zone has been allocated. */
-- 
2.1.0



More information about the dev mailing list