[ovs-dev] [PATCH 4/4] ovsdb: Parallel processing for monitor compose cond changes

anton.ivanov at cambridgegreys.com anton.ivanov at cambridgegreys.com
Tue Aug 3 17:02:18 UTC 2021


From: Anton Ivanov <anton.ivanov at cambridgegreys.com>

Processing monitor cond changes in parallel where applicable.

The cut-off at 32 entries is set based on experimentation
on a 6/12 Ryzen. It is reasonably conservative and should
be OK for other CPUs with higher cost of thread IPC.

Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
---
 ovsdb/monitor.c | 143 ++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 132 insertions(+), 11 deletions(-)

diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index 7c7a9faea..a7f6c1cf9 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -1254,6 +1254,109 @@ ovsdb_monitor_compose_update(
     return json;
 }
 
+/* Parallel processing thread for conditional updates.
+ * This parallelisation is significantly less agressive,
+ * because ovsdb_monitor_table_condition_updated has to
+ * be run after each table.
+ */
+
+struct compose_cond_change_info {
+    compose_row_update_cb_func row_update;
+    struct json *json;
+    struct ovsdb_monitor_table *mt;
+    struct ovsdb_monitor_session_condition *condition;
+    struct ovsdb_monitor *dbmon;
+};
+
+static void *
+compose_cond_change_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct worker_pool *workload;
+    struct compose_cond_change_info *cci;
+    int bnum;
+
+    while (!stop_parallel_processing()) {
+        wait_for_work(control);
+        workload = (struct worker_pool *) control->workload;
+        cci = (struct compose_cond_change_info *) control->data;
+        if (stop_parallel_processing()) {
+            return NULL;
+        }
+        if (cci && workload) {
+            size_t max_columns = ovsdb_monitor_max_columns(cci->dbmon);
+            unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
+            struct json *table_json = NULL;
+
+            for (bnum = control->id;
+                        bnum <= cci->mt->table->rows.mask;
+                        bnum += workload->size) {
+
+                /* Iterate over all rows in table */
+                struct ovsdb_row *row;
+                HMAP_FOR_EACH_IN_PARALLEL (row,
+                        hmap_node, bnum, &cci->mt->table->rows) {
+                    struct json *row_json;
+
+                    row_json =
+                        ovsdb_monitor_compose_row_update2(cci->mt,
+                                                          cci->condition,
+                                                          OVSDB_ROW, row,
+                                                          false, changed,
+                                                          cci->mt->n_columns);
+                    if (row_json) {
+                        ovsdb_monitor_add_json_row(&cci->json,
+                                                   cci->mt->table->schema->name,
+                                                   &table_json, row_json,
+                                                   ovsdb_row_get_uuid(row));
+                    }
+                }
+            }
+            free(changed);
+        }
+        post_completed_work(control);
+    }
+    return NULL;
+}
+
+static struct worker_pool *monitor_pool_b = NULL;
+static bool pool_init_b_done = false;
+
+static void
+init_monitor_pool_b(void)
+{
+    if (!pool_init_b_done) {
+        if (can_parallelize_hashes(false)) {
+            monitor_pool_b = add_worker_pool(compose_cond_change_thread);
+            if (monitor_pool_b) {
+                int i;
+                for (i = 0; i < monitor_pool_b->size; i++) {
+                   monitor_pool_b->controls[i].workload = monitor_pool_b;
+                }
+            }
+        }
+    }
+    pool_init_b_done = true;
+}
+
+static void
+pool_b_merge_cb(struct worker_pool *pool OVS_UNUSED,
+              void *fin_result,
+              void *result_frags,
+              int index)
+{
+    struct json **result = fin_result;
+    struct compose_cond_change_info *cci = result_frags;
+
+    parallel_json_merge_tables(result, cci[index].json);
+}
+
+/* Rather conservative cutoff. The JSON stack speed is such that > 8
+ * seems to perform better in parallel.
+ */
+
+#define POOL_B_CUTOFF 32
+
 static struct json*
 ovsdb_monitor_compose_cond_change_update(
                     struct ovsdb_monitor *dbmon,
@@ -1264,6 +1367,8 @@ ovsdb_monitor_compose_cond_change_update(
     size_t max_columns = ovsdb_monitor_max_columns(dbmon);
     unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
 
+    init_monitor_pool_b();
+
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
         struct ovsdb_row *row;
@@ -1280,17 +1385,33 @@ ovsdb_monitor_compose_cond_change_update(
         }
 
         /* Iterate over all rows in table */
-        HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
-            struct json *row_json;
-
-            row_json = ovsdb_monitor_compose_row_update2(mt, condition,
-                                                         OVSDB_ROW, row,
-                                                         false, changed,
-                                                         mt->n_columns);
-            if (row_json) {
-                ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
-                                           &table_json, row_json,
-                                           ovsdb_row_get_uuid(row));
+        if (monitor_pool_b && hmap_count(&mt->table->rows) > POOL_B_CUTOFF) {
+            struct compose_cond_change_info *cci =
+                xcalloc(sizeof(struct compose_cond_change_info),
+                        monitor_pool_b->size);
+            int i;
+            for (i = 0; i < monitor_pool_b->size; i++) {
+                cci[i].json = NULL;
+                cci[i].mt = mt;
+                cci[i].condition = condition;
+                cci[i].dbmon = dbmon;
+                monitor_pool_b->controls[i].data = &cci[i];
+            }
+            run_pool_callback(monitor_pool_b, &json, cci, pool_b_merge_cb);
+            free(cci);
+        } else {
+            HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
+                struct json *row_json;
+
+                row_json = ovsdb_monitor_compose_row_update2(mt, condition,
+                                                             OVSDB_ROW, row,
+                                                             false, changed,
+                                                             mt->n_columns);
+                if (row_json) {
+                    ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
+                                               &table_json, row_json,
+                                               ovsdb_row_get_uuid(row));
+                }
             }
         }
         ovsdb_monitor_table_condition_updated(mt, condition);
-- 
2.20.1



More information about the dev mailing list