[ovs-dev] [PATCH 4/4] ovsdb: Parallel processing for monitor compose cond changes
anton.ivanov at cambridgegreys.com
anton.ivanov at cambridgegreys.com
Tue Aug 3 17:02:18 UTC 2021
From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
Processing monitor cond changes in parallel where applicable.
The cut-off at 32 entries is set based on experimentation
on a 6/12 Ryzen. It is reasonably conservative and should
be OK for other CPUs with higher cost of thread IPC.
Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
---
ovsdb/monitor.c | 143 ++++++++++++++++++++++++++++++++++++++++++++----
1 file changed, 132 insertions(+), 11 deletions(-)
diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index 7c7a9faea..a7f6c1cf9 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -1254,6 +1254,109 @@ ovsdb_monitor_compose_update(
return json;
}
+/* Parallel processing thread for conditional updates.
+ * This parallelisation is significantly less aggressive,
+ * because ovsdb_monitor_table_condition_updated has to
+ * be run after each table.
+ */
+
+struct compose_cond_change_info { /* Per-worker job description and result slot. */
+ compose_row_update_cb_func row_update; /* NOTE(review): never set or read in this patch. */
+ struct json *json; /* Partial result built by one worker; read by pool_b_merge_cb(). */
+ struct ovsdb_monitor_table *mt; /* Monitored table whose rows are scanned. */
+ struct ovsdb_monitor_session_condition *condition; /* Passed to ovsdb_monitor_compose_row_update2(). */
+ struct ovsdb_monitor *dbmon; /* Used to size the 'changed' bitmap. */
+};
+
+static void *
+compose_cond_change_thread(void *arg) /* Pool worker: composes row updates for its share of cci->mt's hash buckets. */
+{
+ struct worker_control *control = (struct worker_control *) arg;
+ struct worker_pool *workload;
+ struct compose_cond_change_info *cci;
+ int bnum; /* Index of the hash bucket currently being scanned. */
+
+ while (!stop_parallel_processing()) {
+ wait_for_work(control); /* NOTE(review): presumably blocks until work is posted - confirm. */
+ workload = (struct worker_pool *) control->workload;
+ cci = (struct compose_cond_change_info *) control->data;
+ if (stop_parallel_processing()) { /* Re-checked after waking; exits without posting completion. */
+ return NULL;
+ }
+ if (cci && workload) {
+ size_t max_columns = ovsdb_monitor_max_columns(cci->dbmon);
+ unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns)); /* Allocated and freed per work item. */
+ struct json *table_json = NULL; /* Local to this work item; shared by all buckets it scans. */
+
+ for (bnum = control->id; /* Buckets id, id + size, id + 2 * size, ... up to rows.mask. */
+ bnum <= cci->mt->table->rows.mask;
+ bnum += workload->size) {
+
+ /* Iterate over the rows selected by bucket 'bnum' */
+ struct ovsdb_row *row;
+ HMAP_FOR_EACH_IN_PARALLEL (row,
+ hmap_node, bnum, &cci->mt->table->rows) {
+ struct json *row_json;
+
+ row_json =
+ ovsdb_monitor_compose_row_update2(cci->mt,
+ cci->condition,
+ OVSDB_ROW, row,
+ false, changed,
+ cci->mt->n_columns);
+ if (row_json) { /* Only rows that produced an update are added to this worker's json. */
+ ovsdb_monitor_add_json_row(&cci->json,
+ cci->mt->table->schema->name,
+ &table_json, row_json,
+ ovsdb_row_get_uuid(row));
+ }
+ }
+ }
+ free(changed);
+ }
+ post_completed_work(control); /* Posted for every wake-up, including when cci or workload is NULL. */
+ }
+ return NULL;
+}
+
+static struct worker_pool *monitor_pool_b = NULL; /* Stays NULL if no pool was created; the caller then takes the serial path. */
+static bool pool_init_b_done = false; /* Set after the first init attempt, successful or not. */
+
+static void
+init_monitor_pool_b(void) /* One-shot, lazy creation of the cond-change worker pool. */
+{
+ if (!pool_init_b_done) {
+ if (can_parallelize_hashes(false)) { /* NOTE(review): meaning of the 'false' argument is not visible here. */
+ monitor_pool_b = add_worker_pool(compose_cond_change_thread);
+ if (monitor_pool_b) {
+ int i;
+ for (i = 0; i < monitor_pool_b->size; i++) {
+ monitor_pool_b->controls[i].workload = monitor_pool_b; /* Lets each worker read the pool size as its bucket stride. */
+ }
+ }
+ }
+ }
+ pool_init_b_done = true; /* Plain bool with no locking visible - presumably single-threaded caller; TODO confirm. */
+}
+
+static void
+pool_b_merge_cb(struct worker_pool *pool OVS_UNUSED, /* Merge callback handed to run_pool_callback(). */
+ void *fin_result, /* struct json ** receiving the merged result. */
+ void *result_frags, /* Array of struct compose_cond_change_info. */
+ int index) /* Element of 'result_frags' to merge. */
+{
+ struct json **result = fin_result;
+ struct compose_cond_change_info *cci = result_frags;
+
+ parallel_json_merge_tables(result, cci[index].json); /* NOTE(review): ownership of cci[index].json after the merge is not visible here. */
+}
+
+/* Rather conservative cutoff: the JSON stack speed is such that > 8
+ * already seems to perform better in parallel, so 32 leaves a margin.
+ */
+
+#define POOL_B_CUTOFF 32 /* Tables with more rows than this use the worker pool. */
+
static struct json*
ovsdb_monitor_compose_cond_change_update(
struct ovsdb_monitor *dbmon,
@@ -1264,6 +1367,8 @@ ovsdb_monitor_compose_cond_change_update(
size_t max_columns = ovsdb_monitor_max_columns(dbmon);
unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
+ init_monitor_pool_b();
+
SHASH_FOR_EACH (node, &dbmon->tables) {
struct ovsdb_monitor_table *mt = node->data;
struct ovsdb_row *row;
@@ -1280,17 +1385,33 @@ ovsdb_monitor_compose_cond_change_update(
}
/* Iterate over all rows in table */
- HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
- struct json *row_json;
-
- row_json = ovsdb_monitor_compose_row_update2(mt, condition,
- OVSDB_ROW, row,
- false, changed,
- mt->n_columns);
- if (row_json) {
- ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
- &table_json, row_json,
- ovsdb_row_get_uuid(row));
+ if (monitor_pool_b && hmap_count(&mt->table->rows) > POOL_B_CUTOFF) {
+ struct compose_cond_change_info *cci =
+ xcalloc(sizeof(struct compose_cond_change_info),
+ monitor_pool_b->size);
+ int i;
+ for (i = 0; i < monitor_pool_b->size; i++) {
+ cci[i].json = NULL;
+ cci[i].mt = mt;
+ cci[i].condition = condition;
+ cci[i].dbmon = dbmon;
+ monitor_pool_b->controls[i].data = &cci[i];
+ }
+ run_pool_callback(monitor_pool_b, &json, cci, pool_b_merge_cb);
+ free(cci);
+ } else {
+ HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
+ struct json *row_json;
+
+ row_json = ovsdb_monitor_compose_row_update2(mt, condition,
+ OVSDB_ROW, row,
+ false, changed,
+ mt->n_columns);
+ if (row_json) {
+ ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
+ &table_json, row_json,
+ ovsdb_row_get_uuid(row));
+ }
}
}
ovsdb_monitor_table_condition_updated(mt, condition);
--
2.20.1
More information about the dev
mailing list