[ovs-dev] [PATCH 2/2] Run monitor processing in parallel
anton.ivanov at cambridgegreys.com
anton.ivanov at cambridgegreys.com
Mon Jul 6 08:36:50 UTC 2020
From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
Monitor processing performs a full walk of all rows in all
tables referenced in a monitor.
The rows are internally represented as a hash map.
This operation can be run in parallel, where thread M out of N
walks hash buckets M, N+M, etc.
Running inter-thread IPC for only a handful of datum values
is not likely to be efficient, hence there is a cut-off. Tables
smaller than the cut-off value are run in the main thread.
Larger tables are run via multiple threads.
Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
---
lib/ovsdb-idl.c | 31 ++++
lib/ovsdb-idl.h | 6 +
ovsdb/monitor.c | 358 ++++++++++++++++++++++++++++++++++++++++----
ovsdb/ovsdb-idlc.in | 26 ++++
4 files changed, 394 insertions(+), 27 deletions(-)
diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c
index 0a18261fc..0720e7a68 100644
--- a/lib/ovsdb-idl.c
+++ b/lib/ovsdb-idl.c
@@ -45,6 +45,7 @@
#include "svec.h"
#include "util.h"
#include "uuid.h"
+#include "fasthmap.h"
#include "openvswitch/vlog.h"
VLOG_DEFINE_THIS_MODULE(ovsdb_idl);
@@ -3543,6 +3544,24 @@ ovsdb_idl_first_row(const struct ovsdb_idl *idl,
return next_real_row(table, hmap_first(&table->rows));
}
+/* Returns a row in 'table_class''s table slice in 'idl', or a null pointer
+ * if that table slice is empty.
+ *
+ * Database tables are internally maintained as hash tables, so adding or
+ * removing rows while traversing the same table can cause some rows to be
+ * visited twice or not at all. */
+const struct ovsdb_idl_row *
+parallel_ovsdb_idl_first_row(const struct ovsdb_idl *idl,
+ const struct ovsdb_idl_table_class *table_class,
+ ssize_t job_id,
+ ssize_t pool_size)
+{
+ struct ovsdb_idl_table *table = ovsdb_idl_table_from_class(idl,
+ table_class);
+ return next_real_row(
+ table, parallel_hmap_first(&table->rows, job_id, pool_size));
+}
+
/* Returns a row following 'row' within its table, or a null pointer if 'row'
* is the last row in its table. */
const struct ovsdb_idl_row *
@@ -3553,6 +3572,18 @@ ovsdb_idl_next_row(const struct ovsdb_idl_row *row)
return next_real_row(table, hmap_next(&table->rows, &row->hmap_node));
}
+/* Returns a row following 'row' within its table slice, or a null pointer
+ * if 'row' is the last row in its table slice. */
+const struct ovsdb_idl_row *
+parallel_ovsdb_idl_next_row(
+ const struct ovsdb_idl_row *row, ssize_t pool_size)
+{
+ struct ovsdb_idl_table *table = row->table;
+
+ return next_real_row(table,
+ parallel_hmap_next(&table->rows, &row->hmap_node, pool_size));
+}
+
/* Reads and returns the value of 'column' within 'row'. If an ongoing
* transaction has changed 'column''s value, the modified value is returned.
*
diff --git a/lib/ovsdb-idl.h b/lib/ovsdb-idl.h
index c56cd19b1..935db0dbe 100644
--- a/lib/ovsdb-idl.h
+++ b/lib/ovsdb-idl.h
@@ -235,6 +235,12 @@ const struct ovsdb_idl_row *ovsdb_idl_first_row(
const struct ovsdb_idl *, const struct ovsdb_idl_table_class *);
const struct ovsdb_idl_row *ovsdb_idl_next_row(const struct ovsdb_idl_row *);
+const struct ovsdb_idl_row *parallel_ovsdb_idl_first_row(
+ const struct ovsdb_idl *, const struct ovsdb_idl_table_class *,
+ ssize_t job_id, ssize_t pool);
+const struct ovsdb_idl_row *parallel_ovsdb_idl_next_row(
+ const struct ovsdb_idl_row *, ssize_t pool);
+
const struct ovsdb_datum *ovsdb_idl_read(const struct ovsdb_idl_row *,
const struct ovsdb_idl_column *);
const struct ovsdb_datum *ovsdb_idl_get(const struct ovsdb_idl_row *,
diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
index 1c66b428e..d950aef38 100644
--- a/ovsdb/monitor.c
+++ b/ovsdb/monitor.c
@@ -37,6 +37,7 @@
#include "jsonrpc-server.h"
#include "monitor.h"
#include "util.h"
+#include "fasthmap.h"
#include "openvswitch/vlog.h"
VLOG_DEFINE_THIS_MODULE(ovsdb_monitor);
@@ -1085,7 +1086,6 @@ ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
const struct uuid *row_uuid)
{
char uuid[UUID_LEN + 1];
-
/* Create JSON object for transaction overall. */
if (!*json) {
*json = json_object_create();
@@ -1102,6 +1102,140 @@ ovsdb_monitor_add_json_row(struct json **json, const char *table_name,
json_object_put(*table_json, uuid, row_json);
}
+/* We cannot reuse the same multi-threaded conventions as for the cond
+ * update (further down) as the calling conventions and data supplied
+ * to update generation are quite different.
+ *
+ * In addition to this, key parts of the code are dependent on container
+ * macros making it impossible to reuse processing code between the two.
+ *
+ * The only part which we can reuse is json result processing.
+ */
+
+/* To debug parallel processing set this to zero */
+
+#define PARALLEL_CUT_OFF 64
+
+struct mon_json_result {
+ struct ovs_list list_node;
+ const char *table_name;
+ const struct uuid *row_uuid;
+ struct json *row_json;
+};
+
+struct monitor_compose_update_info {
+ struct ovsdb_monitor_change_set_for_table *mcst;
+ const struct ovsdb_monitor_session_condition *condition;
+ unsigned long int *changed;
+ struct ovs_list results;
+ compose_row_update_cb_func row_update;
+ bool initial;
+};
+
+struct monitor_compose_update_pool {
+ void (*row_helper_func)(struct ovsdb_monitor_row *row,
+ struct monitor_compose_update_info *mci);
+ struct worker_pool *pool;
+};
+
+
+static void ovsdb_monitor_compose_update_process_one_row (
+ struct ovsdb_monitor_row *row,
+ struct monitor_compose_update_info *mci)
+{
+ struct json *row_json;
+ struct mon_json_result *temp;
+
+ row_json = (*mci->row_update)(
+ mci->mcst->mt, mci->condition, OVSDB_MONITOR_ROW, row,
+ mci->initial, mci->changed, mci->mcst->n_columns);
+ if (row_json) {
+ temp = xmalloc(sizeof(struct mon_json_result));
+ temp->table_name = mci->mcst->mt->table->schema->name;
+ temp->row_uuid = &row->uuid;
+ temp->row_json = row_json;
+ ovs_list_push_back(&mci->results, &temp->list_node);
+ }
+}
+
+
+/* Pool worker: runs 'row_helper_func' on buckets id, id + pool size, ... */
+static void *monitor_compose_update_thread(void *arg) {
+    struct worker_control *control = (struct worker_control *) arg;
+    struct monitor_compose_update_pool *workload;
+    struct monitor_compose_update_info *mci;
+    struct ovsdb_monitor_row *row, *next;
+    struct hmap *table_rows;
+    int bnum;
+
+    while (!seize_fire()) {
+        sem_wait(&control->fire);
+        if (seize_fire()) {
+            return NULL;
+        }
+        workload = (struct monitor_compose_update_pool *) control->workload;
+        mci = (struct monitor_compose_update_info *) control->data;
+        if (mci && workload) {
+            table_rows = (struct hmap *) &mci->mcst->rows;
+            for (bnum = control->id;
+                 bnum <= table_rows->mask;
+                 bnum += workload->pool->size) {
+                HMAP_FOR_EACH_IN_PARALLEL_SAFE (
+                    row, next, hmap_node, bnum, table_rows) {
+                    if (seize_fire()) {
+                        return NULL;
+                    }
+                    (workload->row_helper_func)(row, control->data);
+                }
+            }
+            /* Fence first: it only orders writes before a later store. */
+            atomic_thread_fence(memory_order_release);
+            atomic_store_relaxed(&control->finished, true);
+        }
+        sem_post(control->done);
+    }
+    return NULL;
+}
+
+
+static void
+ ovsdb_monitor_compose_cond_change_update_generate_json(
+ struct json **json, struct json **table_json,
+ struct ovs_list *temp_result) {
+ struct mon_json_result * temp;
+
+ LIST_FOR_EACH_POP (temp, list_node, temp_result) {
+ ovsdb_monitor_add_json_row(json,
+ temp->table_name,
+ table_json,
+ temp->row_json,
+ temp->row_uuid);
+ free(temp);
+ }
+}
+
+
+static struct monitor_compose_update_pool *monitor_compose_pool = NULL;
+
+static void init_compose_pool(void) {
+
+ int index;
+
+ if (!monitor_compose_pool) {
+ monitor_compose_pool =
+ xmalloc(sizeof (struct monitor_compose_update_pool));
+ monitor_compose_pool->pool =
+ add_worker_pool(monitor_compose_update_thread);
+ monitor_compose_pool->row_helper_func =
+ ovsdb_monitor_compose_update_process_one_row;
+
+ for (index = 0; index < monitor_compose_pool->pool->size; index++) {
+ monitor_compose_pool->pool->controls[index].workload =
+ monitor_compose_pool;
+ }
+ }
+}
+
/* Constructs and returns JSON for a <table-updates> object (as described in
* RFC 7047) for all the outstanding changes within 'monitor', starting from
* 'transaction'. */
@@ -1114,31 +1248,174 @@ ovsdb_monitor_compose_update(
{
struct json *json;
size_t max_columns = ovsdb_monitor_max_columns(dbmon);
- unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
json = NULL;
struct ovsdb_monitor_change_set_for_table *mcst;
LIST_FOR_EACH (mcst, list_in_change_set, &mcs->change_set_for_tables) {
struct ovsdb_monitor_row *row, *next;
struct json *table_json = NULL;
- struct ovsdb_monitor_table *mt = mcst->mt;
- HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
- struct json *row_json;
- row_json = (*row_update)(mt, condition, OVSDB_MONITOR_ROW, row,
- initial, changed, mcst->n_columns);
- if (row_json) {
- ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
- &table_json, row_json,
- &row->uuid);
+ if (hmap_count(&mcst->rows) < PARALLEL_CUT_OFF) {
+ struct monitor_compose_update_info mci;
+ mci.mcst = mcst;
+ mci.condition = condition;
+ mci.initial = initial;
+ mci.changed = xmalloc(bitmap_n_bytes(max_columns));
+ mci.row_update = row_update;
+ ovs_list_init(&mci.results);
+
+ HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mcst->rows) {
+ ovsdb_monitor_compose_update_process_one_row(
+ row, &mci);
+ }
+ if (!ovs_list_is_empty(&mci.results)) {
+ ovsdb_monitor_compose_cond_change_update_generate_json(
+ &json, &table_json, &mci.results);
+ }
+ free(mci.changed);
+ } else {
+ struct monitor_compose_update_info *mci;
+ struct ovs_list *combined_result = NULL;
+ struct ovs_list **results = NULL;
+ int index;
+
+ init_compose_pool();
+
+ mci = xmalloc(sizeof(struct monitor_compose_update_info) *
+ monitor_compose_pool->pool->size);
+ results = xmalloc(sizeof(struct ovs_list *) *
+ monitor_compose_pool->pool->size);
+
+ for (index = 0;
+ index < monitor_compose_pool->pool->size; index++) {
+
+ mci[index].mcst = mcst;
+ mci[index].condition = condition;
+ mci[index].initial = initial;
+ mci[index].changed = xmalloc(bitmap_n_bytes(max_columns));
+ mci[index].row_update = row_update;
+ ovs_list_init(&mci[index].results);
+ results[index] = &mci[index].results;
+ monitor_compose_pool->pool->controls[index].data = &mci[index];
+ }
+
+ run_pool_list(
+ monitor_compose_pool->pool,
+ &combined_result,
+ results);
+ ovsdb_monitor_compose_cond_change_update_generate_json(
+ &json, &table_json, combined_result);
+
+ free(results);
+ for (index = 0;
+ index < monitor_compose_pool->pool->size; index++) {
+ free(mci[index].changed);
}
+ free(mci);
+
}
}
- free(changed);
return json;
}
+
+struct monitor_cond_change_info {
+ struct ovsdb_monitor_table *mt;
+ struct ovsdb_monitor_session_condition *condition;
+ unsigned long int *changed;
+ struct ovs_list results;
+};
+
+struct monitor_cond_change_pool {
+ void (*row_helper_func)(struct ovsdb_row *row,
+ struct monitor_cond_change_info *mi);
+ struct worker_pool *pool;
+};
+
+/* Pool worker: runs 'row_helper_func' on buckets id, id + pool size, ... */
+static void *monitor_cond_change_thread(void *arg) {
+    struct worker_control *control = (struct worker_control *) arg;
+    struct monitor_cond_change_pool *workload;
+    struct monitor_cond_change_info *mi;
+    struct ovsdb_row *row;
+    struct hmap *table_rows;
+    int bnum;
+
+    while (!seize_fire()) {
+        sem_wait(&control->fire);
+        if (seize_fire()) {
+            return NULL;
+        }
+        workload = (struct monitor_cond_change_pool *) control->workload;
+        mi = (struct monitor_cond_change_info *) control->data;
+        if (mi && workload) {
+            table_rows = (struct hmap *) &mi->mt->table->rows;
+            for (bnum = control->id;
+                 bnum <= mi->mt->table->rows.mask;
+                 bnum += workload->pool->size) {
+                HMAP_FOR_EACH_IN_PARALLEL (row, hmap_node, bnum, table_rows) {
+                    if (seize_fire()) {
+                        return NULL;
+                    }
+                    (workload->row_helper_func)(row, control->data);
+                }
+            }
+            /* Fence first: it only orders writes before a later store. */
+            atomic_thread_fence(memory_order_release);
+            atomic_store_relaxed(&control->finished, true);
+        }
+        sem_post(control->done);
+    }
+    return NULL;
+}
+
+static void ovsdb_monitor_cond_change_process_one_row (
+ struct ovsdb_row *row,
+ struct monitor_cond_change_info *mi)
+{
+ struct json *row_json;
+ struct mon_json_result *temp;
+
+ row_json = ovsdb_monitor_compose_row_update2(
+ mi->mt,
+ mi->condition,
+ OVSDB_ROW,
+ row,
+ false,
+ mi->changed,
+ mi->mt->n_columns);
+ if (row_json) {
+ temp = xmalloc(sizeof(struct mon_json_result));
+ temp->table_name = mi->mt->table->schema->name;
+ temp->row_uuid = ovsdb_row_get_uuid(row);
+ temp->row_json = row_json;
+ ovs_list_push_back(&mi->results, &temp->list_node);
+ }
+}
+
+static struct monitor_cond_change_pool *monitor_cond_pool = NULL;
+
+static void init_cond_pool(void) {
+
+ int index;
+
+ if (!monitor_cond_pool) {
+ monitor_cond_pool =
+ xmalloc(sizeof (struct monitor_cond_change_pool));
+ monitor_cond_pool->pool =
+ add_worker_pool(monitor_cond_change_thread);
+ monitor_cond_pool->row_helper_func =
+ ovsdb_monitor_cond_change_process_one_row;
+
+ for (index = 0; index < monitor_cond_pool->pool->size; index++) {
+ monitor_cond_pool->pool->controls[index].workload =
+ monitor_cond_pool;
+ }
+ }
+}
+
+
static struct json*
ovsdb_monitor_compose_cond_change_update(
struct ovsdb_monitor *dbmon,
@@ -1147,13 +1424,33 @@ ovsdb_monitor_compose_cond_change_update(
struct shash_node *node;
struct json *json = NULL;
size_t max_columns = ovsdb_monitor_max_columns(dbmon);
- unsigned long int *changed = xmalloc(bitmap_n_bytes(max_columns));
+ int index;
+
+ init_cond_pool();
SHASH_FOR_EACH (node, &dbmon->tables) {
struct ovsdb_monitor_table *mt = node->data;
struct ovsdb_row *row;
struct json *table_json = NULL;
struct ovsdb_condition *old_condition, *new_condition;
+ struct monitor_cond_change_info *mi;
+ struct ovs_list *combined_result = NULL;
+ struct ovs_list **results = NULL;
+
+
+ mi = xmalloc(sizeof(struct monitor_cond_change_info) *
+ monitor_cond_pool->pool->size);
+ results = xmalloc(sizeof(struct ovs_list *) *
+ monitor_cond_pool->pool->size);
+
+ for (index = 0; index < monitor_cond_pool->pool->size; index++) {
+ mi[index].changed = xmalloc(bitmap_n_bytes(max_columns));
+ mi[index].condition = condition;
+ mi[index].mt = mt;
+ ovs_list_init(&mi[index].results);
+ results[index] = &mi[index].results;
+ monitor_cond_pool->pool->controls[index].data = &mi[index];
+ }
if (!ovsdb_monitor_get_table_conditions(mt,
condition,
@@ -1163,24 +1460,31 @@ ovsdb_monitor_compose_cond_change_update(
/* Nothing to update on this table */
continue;
}
-
- /* Iterate over all rows in table */
- HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
- struct json *row_json;
-
- row_json = ovsdb_monitor_compose_row_update2(mt, condition,
- OVSDB_ROW, row,
- false, changed,
- mt->n_columns);
- if (row_json) {
- ovsdb_monitor_add_json_row(&json, mt->table->schema->name,
- &table_json, row_json,
- ovsdb_row_get_uuid(row));
+ if (hmap_count(&mt->table->rows) < PARALLEL_CUT_OFF) {
+ /* Iterate over all rows in table - single threaded */
+ HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
+ ovsdb_monitor_cond_change_process_one_row(row, &mi[0]);
+ }
+ if (!ovs_list_is_empty(&mi[0].results)) {
+ ovsdb_monitor_compose_cond_change_update_generate_json(
+ &json, &table_json, &mi[0].results);
}
+ } else {
+ run_pool_list(
+ monitor_cond_pool->pool,
+ &combined_result,
+ results);
+ ovsdb_monitor_compose_cond_change_update_generate_json(
+ &json, &table_json, combined_result);
}
ovsdb_monitor_table_condition_updated(mt, condition);
+
+ free(results);
+ for (index = 0; index < monitor_cond_pool->pool->size; index++) {
+ free(mi[index].changed);
+ }
+ free(mi);
}
- free(changed);
return json;
}
diff --git a/ovsdb/ovsdb-idlc.in b/ovsdb/ovsdb-idlc.in
index 698fe25f3..db3ff5a60 100755
--- a/ovsdb/ovsdb-idlc.in
+++ b/ovsdb/ovsdb-idlc.in
@@ -254,6 +254,8 @@ const struct %(s)s *%(s)s_get_for_uuid(const struct ovsdb_idl *, const struct uu
const struct %(s)s *%(s)s_table_get_for_uuid(const struct %(s)s_table *, const struct uuid *);
const struct %(s)s *%(s)s_first(const struct ovsdb_idl *);
const struct %(s)s *%(s)s_next(const struct %(s)s *);
+const struct %(s)s *%(s)s_parallel_first(const struct ovsdb_idl *, ssize_t job_id, ssize_t pool);
+const struct %(s)s *%(s)s_parallel_next(const struct %(s)s *, ssize_t pool);
#define %(S)s_FOR_EACH(ROW, IDL) \\
for ((ROW) = %(s)s_first(IDL); \\
(ROW); \\
@@ -262,6 +264,10 @@ const struct %(s)s *%(s)s_next(const struct %(s)s *);
for ((ROW) = %(s)s_first(IDL); \\
(ROW) ? ((NEXT) = %(s)s_next(ROW), 1) : 0; \\
(ROW) = (NEXT))
+#define %(S)s_PARALLEL_FOR_EACH(ROW, IDL, JOBID, POOL) \\
+ for ((ROW) = %(s)s_parallel_first(IDL, JOBID, POOL); \\
+ (ROW); \\
+ (ROW) = %(s)s_parallel_next(ROW, POOL))
unsigned int %(s)s_get_seqno(const struct ovsdb_idl *);
unsigned int %(s)s_row_get_seqno(const struct %(s)s *row, enum ovsdb_idl_change change);
@@ -692,6 +698,18 @@ const struct %(s)s *
return %(s)s_cast(ovsdb_idl_first_row(idl, &%(p)stable_%(tl)s));
}
+/* Returns a row in table "%(t)s" in 'idl', or a null pointer if that
+ * table is empty.
+ *
+ * Database tables are internally maintained as hash tables, so adding or
+ * removing rows while traversing the same table can cause some rows to be
+ * visited twice or not at all. */
+const struct %(s)s *
+%(s)s_parallel_first(const struct ovsdb_idl *idl, ssize_t job_id, ssize_t pool)
+{
+ return %(s)s_cast(parallel_ovsdb_idl_first_row(idl, &%(p)stable_%(tl)s, job_id, pool));
+}
+
/* Returns a row following 'row' within its table, or a null pointer if 'row'
* is the last row in its table. */
const struct %(s)s *
@@ -700,6 +718,14 @@ const struct %(s)s *
return %(s)s_cast(ovsdb_idl_next_row(&row->header_));
}
+/* Returns a row following 'row' within its table slice, or a null pointer if 'row'
+ * is the last row in its table slice. */
+const struct %(s)s *
+%(s)s_parallel_next(const struct %(s)s *row, ssize_t pool)
+{
+ return %(s)s_cast(parallel_ovsdb_idl_next_row(&row->header_, pool));
+}
+
unsigned int %(s)s_get_seqno(const struct ovsdb_idl *idl)
{
return ovsdb_idl_table_get_seqno(idl, &%(p)stable_%(tl)s);
--
2.20.1
More information about the dev
mailing list