[ovs-dev] [PATCH ovn RFC v3 28/29] Parallel reconciliation of southdb flows

anton.ivanov at cambridgegreys.com anton.ivanov at cambridgegreys.com
Thu Jul 16 13:19:26 UTC 2020


From: Anton Ivanov <anton.ivanov at cambridgegreys.com>

Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
---
 northd/ovn-northd.c | 196 ++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 178 insertions(+), 18 deletions(-)

diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index a53b31906..ef188fb88 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -4146,7 +4146,9 @@ static void
 ovn_lflow_destroy(struct hmap *lflows, struct ovn_lflow *lflow)
 {
     if (lflow) {
-        hmap_remove(lflows, &lflow->hmap_node);
+        if (lflows) {
+            hmap_remove(lflows, &lflow->hmap_node);
+        }
         free(lflow->match);
         free(lflow->actions);
         free(lflow->stage_hint);
@@ -11071,6 +11073,120 @@ build_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
     }
  
 }
+
+struct sbrec_result {           /* Outcome for one SB Logical_Flow row. */
+    struct ovs_list list_node;  /* Links into a reconcile_info 'results'. */
+    const struct sbrec_logical_flow *sbflow; /* Row that was examined. */
+    struct ovn_lflow *lflow;    /* Matching lflow, or NULL if none. */
+    size_t lflow_hash;          /* lflow->hmap_node.hash; valid if 'lflow'. */
+};
+
+struct reconcile_info {         /* Per-worker reconciliation input/output. */
+    struct northd_context *ctx; /* Supplies ctx->ovnsb_idl to iterate. */
+    struct hmap *lflows;        /* Searched with ovn_lflow_find(). */
+    struct hmap *datapaths;     /* Passed to ovn_datapath_from_sbrec(). */
+    struct ovs_list results;    /* sbrec_result entries from the worker. */
+};
+
+struct lflow_reconciliation_pool {
+    struct worker_pool *pool;   /* Set from add_worker_pool(). */
+};
+
+/* Worker thread for parallel Logical_Flow reconciliation.  On each wakeup it
+ * appends one sbrec_result to ri->results for every southbound row that the
+ * PARALLEL_FOR_EACH macro yields for control->id; it modifies no row. */
+static void *reconciliation_thread(void *arg) {
+    struct worker_control *control = (struct worker_control *) arg;
+    struct lflow_reconciliation_pool *workload;
+    struct reconcile_info *ri;
+
+    while (!seize_fire()) {
+        sem_wait(&control->fire);
+        workload = (struct lflow_reconciliation_pool *) control->workload;
+        ri = (struct reconcile_info *) control->data;
+        if (ri && workload) {
+            const struct sbrec_logical_flow *sbflow;
+            SBREC_LOGICAL_FLOW_PARALLEL_FOR_EACH(sbflow, ri->ctx->ovnsb_idl,
+                    control->id, workload->pool->size) {
+                struct ovn_datapath *od = ovn_datapath_from_sbrec(
+                    ri->datapaths, sbflow->logical_datapath);
+                /* 'lflow' stays NULL if the datapath is missing or stale or
+                 * no lflow matches; the hash is zeroed, never left unset. */
+                struct sbrec_result *res = xmalloc(sizeof *res);
+                res->sbflow = sbflow;
+                res->lflow = NULL;
+                res->lflow_hash = 0;
+                if (od && !ovn_datapath_is_stale(od)) {
+                    enum ovn_datapath_type dp_type
+                        = od->nbs ? DP_SWITCH : DP_ROUTER;
+                    enum ovn_pipeline pipeline
+                        = !strcmp(sbflow->pipeline, "ingress") ? P_IN : P_OUT;
+                    res->lflow = ovn_lflow_find(
+                        ri->lflows, od,
+                        ovn_stage_build(dp_type, pipeline, sbflow->table_id),
+                        sbflow->priority, sbflow->match, sbflow->actions,
+                        sbflow->hash);
+                    if (res->lflow) {
+                        res->lflow_hash = res->lflow->hmap_node.hash;
+                    }
+                }
+                ovs_list_push_back(&ri->results, &res->list_node);
+            }
+            /* Release fence first, so results are visible with the flag. */
+            atomic_thread_fence(memory_order_release);
+            atomic_store_relaxed(&control->finished, true);
+        }
+        sem_post(control->done);
+    }
+    return NULL;
+}
+
+/* Worker pool for Logical_Flow reconciliation; created on first use. */
+static struct lflow_reconciliation_pool *reconcile_pool = NULL;
+
+/* Creates 'reconcile_pool' on the first call and points every worker
+ * control's 'workload' at it.  Later calls are no-ops. */
+static void init_reconciliation_pool(void) {
+    if (reconcile_pool) {
+        return;
+    }
+
+    reconcile_pool = xmalloc(sizeof *reconcile_pool);
+    reconcile_pool->pool = add_worker_pool(reconciliation_thread);
+
+    struct worker_pool *pool = reconcile_pool->pool;
+    for (int i = 0; i < pool->size; i++) {
+        pool->controls[i].workload = reconcile_pool;
+    }
+}
+
+/* Unlinks 'node' from 'hmap' if it is found on the bucket chain selected by
+ * 'hash'.  Does not shrink the hash table; call hmap_shrink() directly if
+ * desired.
+ * Returns true if the node was found and removed, false otherwise (including
+ * when 'node' is NULL).
+ * 'hash' is supplied by the caller instead of being read from 'node', so
+ * 'node' is dereferenced only once it has been found on the chain. */
+static bool
+hmap_safe_remove(struct hmap *hmap, struct hmap_node *node, size_t hash)
+{
+    struct hmap_node **linkp = &hmap->buckets[hash & hmap->mask];
+
+    if (!node) {
+        return false;
+    }
+
+    for (; *linkp; linkp = &(*linkp)->next) {
+        if (*linkp == node) {
+            *linkp = node->next;
+            hmap->n--;
+            return true;
+        }
+    }
+    return false;
+}
+
+#define RECONCILE_CUTOFF 1 /* Fewer lflows than this: serial reconcile. */
  
 static ssize_t max_seen_lflow_size = 128;
 
@@ -11084,6 +11200,7 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
              struct hmap *lbs)
 {
     struct hmap lflows;
+    const struct sbrec_logical_flow *sbflow;
 
     fast_hmap_size_for(&lflows, max_seen_lflow_size);
 
@@ -11096,27 +11213,70 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
     }
 
     /* Push changes to the Logical_Flow table to database. */
-    const struct sbrec_logical_flow *sbflow, *next_sbflow;
-    SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) {
-        struct ovn_datapath *od
-            = ovn_datapath_from_sbrec(datapaths, sbflow->logical_datapath);
 
-        if (!od || ovn_datapath_is_stale(od)) {
-            sbrec_logical_flow_delete(sbflow);
-            continue;
+    if (hmap_count(&lflows) < RECONCILE_CUTOFF) {
+        /* Push changes to the Logical_Flow table to database. */
+        const struct sbrec_logical_flow *next_sbflow;
+        SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) {
+            struct ovn_datapath *od
+                = ovn_datapath_from_sbrec(datapaths, sbflow->logical_datapath);
+            if (!od || ovn_datapath_is_stale(od)) {
+                sbrec_logical_flow_delete(sbflow);
+                continue;
+            }
+
+            enum ovn_datapath_type dp_type = od->nbs ? DP_SWITCH : DP_ROUTER;
+            enum ovn_pipeline pipeline
+                = !strcmp(sbflow->pipeline, "ingress") ? P_IN : P_OUT;
+            struct ovn_lflow *lflow = ovn_lflow_find(
+                &lflows, od, ovn_stage_build(dp_type, pipeline, sbflow->table_id),
+                sbflow->priority, sbflow->match, sbflow->actions, sbflow->hash);
+            if (lflow) {
+                ovn_lflow_destroy(&lflows, lflow);
+            } else {
+                sbrec_logical_flow_delete(sbflow);
+            }
         }
+    } else {
+        struct reconcile_info *ri;
+        struct ovs_list *combined_result = NULL;
+        struct ovs_list **results = NULL;
+        int index;
+        init_reconciliation_pool();
 
-        enum ovn_datapath_type dp_type = od->nbs ? DP_SWITCH : DP_ROUTER;
-        enum ovn_pipeline pipeline
-            = !strcmp(sbflow->pipeline, "ingress") ? P_IN : P_OUT;
-        struct ovn_lflow *lflow = ovn_lflow_find(
-            &lflows, od, ovn_stage_build(dp_type, pipeline, sbflow->table_id),
-            sbflow->priority, sbflow->match, sbflow->actions, sbflow->hash);
-        if (lflow) {
-            ovn_lflow_destroy(&lflows, lflow);
-        } else {
-            sbrec_logical_flow_delete(sbflow);
+        ri = xmalloc(sizeof(struct reconcile_info) *
+                reconcile_pool->pool->size);
+        results = xmalloc(sizeof(struct ovs_list *) *
+                reconcile_pool->pool->size);
+
+        for (index = 0;
+                index < reconcile_pool->pool->size; index++) {
+
+            ri[index].lflows = &lflows;
+            ri[index].datapaths = datapaths;
+            ri[index].ctx = ctx;
+            ovs_list_init(&ri[index].results);
+            results[index] = &ri[index].results;
+            reconcile_pool->pool->controls[index].data = &ri[index];
         }
+
+        run_pool_list(
+            reconcile_pool->pool,
+            &combined_result,
+            results);
+
+        struct sbrec_result *res;
+        LIST_FOR_EACH_POP (res, list_node, combined_result) {
+            if (hmap_safe_remove(&lflows, &res->lflow->hmap_node, res->lflow_hash)) {
+                ovn_lflow_destroy(NULL, res->lflow);
+            } else {
+                 sbrec_logical_flow_delete(res->sbflow);
+            }
+            free(res);
+         }
+        free(results);
+        free(ri);
+
     }
     struct ovn_lflow *lflow, *next_lflow;
     HMAP_FOR_EACH_SAFE (lflow, next_lflow, hmap_node, &lflows) {
-- 
2.20.1



More information about the dev mailing list