[ovs-dev] [PATCH ovn RFC 3/5] Parallelise lrouter_flow generation

anton.ivanov at cambridgegreys.com
Fri Jul 10 08:39:52 UTC 2020


From: Anton Ivanov <anton.ivanov at cambridgegreys.com>

Make the lrouter flow generation run in parallel.

When the number of datapaths or ports exceeds a cutoff, the per-datapath
and per-port flow generation is spread across a worker pool: each thread
walks its own stripe of hash buckets and emits flows into a private hmap
segment, and the segments are merged into the main lflow table once all
threads have finished.  The lflow table itself is pre-sized from a
high-water mark of previous runs to avoid rehashing during the build.

Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
---
 northd/ovn-northd.c | 268 +++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 241 insertions(+), 27 deletions(-)
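
Notes for reviewers (not intended for the commit message): the diff
leans on the worker-pool API added earlier in this series.  Below is a
minimal sketch of the dispatch cycle as this patch uses it; the function
name "dispatch_sketch" is illustrative only, and the fire/wait/merge
semantics of run_pool_hash() are assumed from the fasthmap patch:

    /* Sketch only -- not part of the patch. */
    static void
    dispatch_sketch(struct worker_pool *pool, struct hmap *lflows,
                    struct lrouter_flow_build_info *args,
                    struct hmap *segs)
    {
        int i;

        for (i = 0; i < pool->size; i++) {
            /* Size each private segment like the target table so the
             * workers do not rehash while inserting. */
            fast_hmap_init(&segs[i], lflows->mask);
            args[i].lflows = &segs[i];
            pool->controls[i].data = &args[i];
        }
        /* Fires every worker, waits for their "done" posts, then folds
         * each segs[i] into lflows. */
        run_pool_hash(pool, lflows, segs);
    }

One private segment per worker keeps the shared lflow map lock-free
during generation; the only serial step is the final merge.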

diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index b41c523f2..53ea35de7 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -47,6 +47,7 @@
 #include "unixctl.h"
 #include "util.h"
 #include "uuid.h"
+#include "fasthmap.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovn_northd);
@@ -3964,7 +3965,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
     ovn_lflow_init(lflow, od, stage, priority,
                    xstrdup(match), xstrdup(actions),
                    ovn_lflow_hint(stage_hint), where);
-    hmap_insert(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow));
+    hmap_insert_fast(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow));
 }
 
 /* Adds a row with the specified contents to the Logical_Flow table. */
@@ -10380,43 +10381,250 @@ build_lrouter_egress_delivery(struct ovn_port *op, struct hmap *lflows)
     ds_destroy(&actions);
     ds_destroy(&match);
 }
+
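+/* Bundle of the build_lrouter_flows() arguments, handed to worker
+ * threads through the per-thread worker_control->data pointer. */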
+struct lrouter_flow_build_info {
+    struct hmap *datapaths;
+    struct hmap *ports;
+    struct hmap *lflows;
+    struct shash *meter_groups;
+    struct hmap *lbs;
+};
+
+static void
+build_lrouter_flows_od(struct ovn_datapath *od,
+        struct lrouter_flow_build_info *lfbi)
+{
+    build_lrouter_flow_table_0_od(od, lfbi->lflows);
+    build_lrouter_flow_table_1_and_2_od(od, lfbi->lflows);
+    build_lrouter_flow_table_3_od(od, lfbi->lflows);
+    build_lrouter_flow_nat_defrag_lb_od(od, lfbi->lflows,
+                                        lfbi->meter_groups, lfbi->lbs);
+    build_lrouter_flow_ingress_ND_RA_od(od, lfbi->lflows);
+    build_lrouter_flow_datapath_to_static_route(od, lfbi->lflows, lfbi->ports);
+    build_lrouter_flow_multicast_lookup(od, lfbi->lflows);
+    build_lrouter_flow_ingress_policy(od, lfbi->lflows, lfbi->ports);
+    build_lrouter_flow_dest_unreachable(od, lfbi->lflows);
+    build_lrouter_arp_resolve_od(od, lfbi->lflows);
+    build_lrouter_check_pck_len_od(od, lfbi->lflows, lfbi->ports);
+    build_lrouter_gw_redirect_od(od, lfbi->lflows);
+    build_lrouter_arp_request_od(od, lfbi->lflows);
+}
+
+static void
+build_lrouter_flows_op(struct ovn_port *op,
+        struct lrouter_flow_build_info *lfbi)
+{
+    build_lrouter_flow_table_0_op(op, lfbi->lflows);
+    build_lrouter_flow_table_1_and_2_op(op, lfbi->lflows);
+    build_lrouter_flow_table_3_op(op, lfbi->lflows);
+    build_lrouter_flow_DHCP_v6_op(op, lfbi->lflows);
+    build_lrouter_flow_inpit_for_v6_op(op, lfbi->lflows);
+    build_lrouter_flow_ingress_ND_RA_op(op, lfbi->lflows);
+    build_lrouter_flow_ingress_ip_routing_ecmp(op, lfbi->lflows);
+    build_lrouter_arp_resolve_op(op, lfbi->lflows, lfbi->ports);
+    build_lrouter_egress_delivery(op, lfbi->lflows);
+}
+
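+/* Workload descriptor for datapath processing: the per-datapath
+ * helper to run and the pool that runs it. */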
+struct lrouter_thread_od_pool {
+    void (*od_helper_func)(struct ovn_datapath *od,
+            struct lrouter_flow_build_info *lfbi);
+    struct worker_pool *pool;
+};
+
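+/* Worker thread body: block on "fire", walk every pool->size'th
+ * bucket of the datapaths hmap starting at this thread's id, emit
+ * flows into this thread's private lflow segment, then post "done". */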
+static void *
+build_lrouter_flows_od_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct lrouter_thread_od_pool *workload;
+    struct lrouter_flow_build_info *lfbi;
+    struct ovn_datapath *od;
+    int bnum;
+
+    while (!seize_fire()) {
+        sem_wait(&control->fire);
+        workload = (struct lrouter_thread_od_pool *) control->workload;
+        lfbi = (struct lrouter_flow_build_info *) control->data;
+        if (lfbi && workload) {
+            for (bnum = control->id;
+                    bnum <= lfbi->datapaths->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                        od, key_node, bnum, lfbi->datapaths) {
+                    if (seize_fire()) {
+                        return NULL;
+                    }
+                    (workload->od_helper_func)(od, lfbi);
+                }
+            }
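+            /* Make this thread's writes to its lflow segment visible
+             * before "finished" can be observed as true. */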
+            atomic_thread_fence(memory_order_release);
+            atomic_store_relaxed(&control->finished, true);
+        }
+        sem_post(control->done);
+    }
+    return NULL;
+}
+
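+/* Port-side mirror of the datapath machinery above. */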
+struct lrouter_thread_op_pool {
+    void (*op_helper_func)(struct ovn_port *op,
+            struct lrouter_flow_build_info *lfbi);
+    struct worker_pool *pool;
+};
+
+static void *
+build_lrouter_flows_op_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct lrouter_thread_op_pool *workload;
+    struct lrouter_flow_build_info *lfbi;
+    struct ovn_port *op;
+    int bnum;
+
+    while (!seize_fire()) {
+        sem_wait(&control->fire);
+        workload = (struct lrouter_thread_op_pool *) control->workload;
+        lfbi = (struct lrouter_flow_build_info *) control->data;
+        if (lfbi && workload) {
+            for (bnum = control->id;
+                    bnum <= lfbi->ports->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                        op, key_node, bnum, lfbi->ports) {
+                    if (seize_fire()) {
+                        return NULL;
+                    }
+                    (workload->op_helper_func)(op, lfbi);
+                }
+            }
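+            /* As above: publish the segment before "finished". */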
+            atomic_thread_fence(memory_order_release);
+            atomic_store_relaxed(&control->finished, true);
+        }
+        sem_post(control->done);
+    }
+    return NULL;
+}
+
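+/* Lazily created, process-lifetime worker pools: one walks the
+ * datapaths hmap, the other the ports hmap. */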
+static struct lrouter_thread_od_pool *lrouter_od_pool = NULL;
+
+static void
+init_od_pool(void)
+{
+    int index;
+
+    if (!lrouter_od_pool) {
+        lrouter_od_pool =
+            xmalloc(sizeof(struct lrouter_thread_od_pool));
+        lrouter_od_pool->pool =
+            add_worker_pool(build_lrouter_flows_od_thread);
+        lrouter_od_pool->od_helper_func =
+            build_lrouter_flows_od;
+
+        for (index = 0; index < lrouter_od_pool->pool->size; index++) {
+            lrouter_od_pool->pool->controls[index].workload =
+                lrouter_od_pool;
+        }
+    }
+}
+
+static struct lrouter_thread_op_pool *lrouter_op_pool = NULL;
+
+static void
+init_op_pool(void)
+{
+    int index;
+
+    if (!lrouter_op_pool) {
+        lrouter_op_pool =
+            xmalloc(sizeof(struct lrouter_thread_op_pool));
+        lrouter_op_pool->pool =
+            add_worker_pool(build_lrouter_flows_op_thread);
+        lrouter_op_pool->op_helper_func =
+            build_lrouter_flows_op;
+
+        for (index = 0; index < lrouter_op_pool->pool->size; index++) {
+            lrouter_op_pool->pool->controls[index].workload =
+                lrouter_op_pool;
+        }
+    }
+}
+
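+/* Input sizes below which flow generation stays single-threaded. */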
+#define OD_CUTOFF 64
+/* This is probably still too low for ports; it is not clear whether
+ * parallelising the port walk is worth it at all, or at what size it
+ * should kick in. */
+#define OP_CUTOFF 16
+
 static void
 build_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                     struct hmap *lflows, struct shash *meter_groups,
                     struct hmap *lbs)
 {
     struct ovn_datapath *od;
-    HMAP_FOR_EACH (od, key_node, datapaths) {
-        build_lrouter_flow_table_0_od(od, lflows);
-        build_lrouter_flow_table_1_and_2_od(od, lflows);
-        build_lrouter_flow_table_3_od(od, lflows);
-        build_lrouter_flow_nat_defrag_lb_od(od, lflows, meter_groups, lbs);
-        build_lrouter_flow_ingress_ND_RA_od(od, lflows);
-        build_lrouter_flow_datapath_to_static_route(od, lflows, ports);
-        build_lrouter_flow_multicast_lookup(od, lflows);
-        build_lrouter_flow_ingress_policy(od, lflows, ports);
-        build_lrouter_flow_dest_unreachable(od, lflows);
-        build_lrouter_arp_resolve_od(od, lflows);
-        build_lrouter_check_pck_len_od(od, lflows, ports);
-        build_lrouter_gw_redirect_od(od, lflows);
-        build_lrouter_arp_request_od(od, lflows);
+    struct ovn_port *op;
+    struct lrouter_flow_build_info *lfbi;
+    int index;
+
+    if (hmap_count(datapaths) > OD_CUTOFF || hmap_count(ports) > OP_CUTOFF) {
+        init_od_pool();
+        init_op_pool();
+        /* Both pools normally have one thread per core, but size the
+         * shared lfbi array for the larger of the two so that neither
+         * dispatch loop can index past its end. */
+        int pool_size = MAX(lrouter_od_pool->pool->size,
+                            lrouter_op_pool->pool->size);
+        lfbi = xmalloc(sizeof(struct lrouter_flow_build_info) * pool_size);
+
+        for (index = 0; index < pool_size; index++) {
+            lfbi[index].datapaths = datapaths;
+            lfbi[index].ports = ports;
+            lfbi[index].meter_groups = meter_groups;
+            lfbi[index].lbs = lbs;
+        }
+    } else {
+        lfbi = xmalloc(sizeof(struct lrouter_flow_build_info));
+
+        lfbi[0].datapaths = datapaths;
+        lfbi[0].ports = ports;
+        lfbi[0].meter_groups = meter_groups;
+        lfbi[0].lbs = lbs;
     }
 
-    struct ovn_port *op;
-    HMAP_FOR_EACH (op, key_node, ports) {
-        build_lrouter_flow_table_0_op(op, lflows);
-        build_lrouter_flow_table_1_and_2_op(op, lflows);
-        build_lrouter_flow_table_3_op(op, lflows);
-        build_lrouter_flow_DHCP_v6_op(op, lflows);
-        build_lrouter_flow_inpit_for_v6_op(op, lflows);
-        build_lrouter_flow_ingress_ND_RA_op(op, lflows);
-        build_lrouter_flow_ingress_ip_routing_ecmp(op, lflows);
-        build_lrouter_arp_resolve_op(op, lflows, ports);
-        build_lrouter_egress_delivery(op, lflows);
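+    /* Parallel path: each worker gets a private lflow segment sized
+     * like the final table; run_pool_hash() fires the pool and merges
+     * the segments into "lflows". */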
+    if (hmap_count(datapaths) > OD_CUTOFF) {
+        struct hmap *lflow_segs = xmalloc(
+            sizeof(struct hmap) * lrouter_od_pool->pool->size);
+        for (index = 0; index < lrouter_od_pool->pool->size; index++) {
+            fast_hmap_init(&lflow_segs[index], lflows->mask);
+            lfbi[index].lflows = &lflow_segs[index];
+            lrouter_od_pool->pool->controls[index].data = &lfbi[index];
+        }
+        run_pool_hash(lrouter_od_pool->pool, lflows, lflow_segs);
+        free(lflow_segs);
+    } else {
+        lfbi[0].lflows = lflows;
+        HMAP_FOR_EACH (od, key_node, datapaths) {
+            build_lrouter_flows_od(od, &lfbi[0]);
+        }
     }
 
+    if (hmap_count(ports) > OP_CUTOFF) {
+        struct hmap *lflow_segs = xmalloc(
+            sizeof(struct hmap) * lrouter_op_pool->pool->size);
+        for (index = 0; index < lrouter_op_pool->pool->size; index++) {
+            fast_hmap_init(&lflow_segs[index], lflows->mask);
+            lfbi[index].lflows = &lflow_segs[index];
+            lrouter_op_pool->pool->controls[index].data = &lfbi[index];
+        }
+        run_pool_hash(lrouter_op_pool->pool, lflows, lflow_segs);
+        free(lflow_segs);
+    } else {
+        lfbi[0].lflows = lflows;
+        HMAP_FOR_EACH (op, key_node, ports) {
+            build_lrouter_flows_op(op, &lfbi[0]);
+        }
+    }
+    free(lfbi);
 }
 
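+/* High-water mark of the lflow table size; used to pre-size the hmap
+ * on the next run so flow generation does not rehash mid-build. */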
+static ssize_t max_seen_lflow_size = 128;
+
 /* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
  * constructing their contents based on the OVN_NB database. */
 static void
@@ -10426,12 +10634,18 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
              struct shash *meter_groups,
              struct hmap *lbs)
 {
-    struct hmap lflows = HMAP_INITIALIZER(&lflows);
+    struct hmap lflows;
+
+    fast_hmap_size_for(&lflows, max_seen_lflow_size);
 
     build_lswitch_flows(datapaths, ports, port_groups, &lflows, mcgroups,
                         igmp_groups, meter_groups, lbs);
     build_lrouter_flows(datapaths, ports, &lflows, meter_groups, lbs);
 
+    if (hmap_count(&lflows) > max_seen_lflow_size) {
+        max_seen_lflow_size = hmap_count(&lflows);
+    }
+
     /* Push changes to the Logical_Flow table to database. */
     const struct sbrec_logical_flow *sbflow, *next_sbflow;
     SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) {
-- 
2.20.1