[ovs-dev] [OVN Patch v3 1/2] Make changes to the parallel processing API to allow pool sizing

anton.ivanov at cambridgegreys.com
Mon Sep 6 14:59:46 UTC 2021


From: Anton Ivanov <anton.ivanov at cambridgegreys.com>

1. Make pool size user definable.
2. Expose pool destruction.
3. Make pools resizable at runtime (a usage sketch follows below).

Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
---
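A minimal usage sketch of the reworked API (illustrative only:
"my_worker" and the size values are placeholders, and completion
signalling on the pool's "done" semaphore is elided - see
build_lflows_thread in the diff for a real worker):

    static void *
    my_worker(void *arg)
    {
        struct worker_control *control = arg;

        /* Workers now poll a per-pool exit flag through their
         * back-pointer instead of a global flag. */
        while (!stop_parallel_processing(control->pool)) {
            wait_for_work(control);
            if (stop_parallel_processing(control->pool)) {
                return NULL;
            }
            /* ... process control->data ... */
        }
        return NULL;
    }

    /* Zero size selects the default pool size; a non-zero value
     * overrides it. */
    struct worker_pool *pool = add_worker_pool(my_worker, 0);

    /* Stop and join the old workers, then restart with 8 threads. */
    resize_pool(pool, 8);

    /* Join the workers and release the pool's semaphores. */
    destroy_pool(pool);
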
 lib/ovn-parallel-hmap.c | 202 ++++++++++++++++++++++++++++++----------
 lib/ovn-parallel-hmap.h |  23 ++++-
 northd/ovn-northd.c     |  58 +++++-------
 ovs                     |   2 +-
 4 files changed, 194 insertions(+), 91 deletions(-)

diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
index b8c7ac786..30de457b5 100644
--- a/lib/ovn-parallel-hmap.c
+++ b/lib/ovn-parallel-hmap.c
@@ -51,7 +51,6 @@ static bool can_parallelize = false;
  * accompanied by a fence. It does not need to be atomic or be
  * accessed under a lock.
  */
-static bool workers_must_exit = false;
 
 static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
 
@@ -70,10 +69,20 @@ static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
                                void *fin_result, void *result_frags,
                                int index);
 
+
+static bool init_control(struct worker_control *control, int id,
+                         struct worker_pool *pool);
+
+static void cleanup_control(struct worker_pool *pool, int id);
+
+static void free_controls(struct worker_pool *pool);
+
+static struct worker_control *alloc_controls(int size);
+
 bool
-ovn_stop_parallel_processing(void)
+ovn_stop_parallel_processing(struct worker_pool *pool)
 {
-    return workers_must_exit;
+    return pool->workers_must_exit;
 }
 
 bool
@@ -92,11 +101,67 @@ ovn_can_parallelize_hashes(bool force_parallel)
     return can_parallelize;
 }
 
+
+void
+ovn_destroy_pool(struct worker_pool *pool)
+{
+    char sem_name[256];
+
+    free_controls(pool);
+    sem_close(pool->done);
+    sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
+    sem_unlink(sem_name);
+    free(pool);
+}
+
+bool
+ovn_resize_pool(struct worker_pool *pool, int size)
+{
+    int i;
+
+    ovs_assert(pool != NULL);
+
+    if (!size) {
+        size = pool_size;
+    }
+
+    ovs_mutex_lock(&init_mutex);
+
+    if (can_parallelize) {
+        free_controls(pool);
+        pool->size = size;
+
+        /* Allocate new control structures. */
+
+        pool->controls = alloc_controls(size);
+        pool->workers_must_exit = false;
+
+        for (i = 0; i < pool->size; i++) {
+            if (!init_control(&pool->controls[i], i, pool)) {
+                goto cleanup;
+            }
+        }
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return true;
+cleanup:
+
+    /* Something went wrong when opening semaphores. In this case
+     * it is better to shut off parallel processing altogether. */
+
+    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
+    can_parallelize = false;
+    free_controls(pool);
+
+    ovs_mutex_unlock(&init_mutex);
+    return false;
+}
+
+
 struct worker_pool *
-ovn_add_worker_pool(void *(*start)(void *))
+ovn_add_worker_pool(void *(*start)(void *), int size)
 {
     struct worker_pool *new_pool = NULL;
-    struct worker_control *new_control;
     bool test = false;
     int i;
     char sem_name[256];
@@ -113,38 +178,29 @@ ovn_add_worker_pool(void *(*start)(void *))
         ovs_mutex_unlock(&init_mutex);
     }
 
+    if (!size) {
+        size = pool_size;
+    }
+
     ovs_mutex_lock(&init_mutex);
     if (can_parallelize) {
         new_pool = xmalloc(sizeof(struct worker_pool));
-        new_pool->size = pool_size;
-        new_pool->controls = NULL;
+        new_pool->size = size;
+        new_pool->start = start;
+        new_pool->controls = NULL; /* For the error path in cleanup. */
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
         new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
         if (new_pool->done == SEM_FAILED) {
             goto cleanup;
         }
 
-        new_pool->controls =
-            xmalloc(sizeof(struct worker_control) * new_pool->size);
+        new_pool->controls = alloc_controls(size);
+        new_pool->workers_must_exit = false;
 
         for (i = 0; i < new_pool->size; i++) {
-            new_control = &new_pool->controls[i];
-            new_control->id = i;
-            new_control->done = new_pool->done;
-            new_control->data = NULL;
-            ovs_mutex_init(&new_control->mutex);
-            new_control->finished = ATOMIC_VAR_INIT(false);
-            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
-            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
-            if (new_control->fire == SEM_FAILED) {
+            if (!init_control(&new_pool->controls[i], i, new_pool)) {
                 goto cleanup;
             }
         }
-
-        for (i = 0; i < pool_size; i++) {
-            new_pool->controls[i].worker =
-                ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
-        }
         ovs_list_push_back(&worker_pools, &new_pool->list_node);
     }
     ovs_mutex_unlock(&init_mutex);
@@ -157,16 +213,7 @@ cleanup:
 
     VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
     can_parallelize = false;
-    if (new_pool->controls) {
-        for (i = 0; i < new_pool->size; i++) {
-            if (new_pool->controls[i].fire != SEM_FAILED) {
-                sem_close(new_pool->controls[i].fire);
-                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
-                sem_unlink(sem_name);
-                break; /* semaphores past this one are uninitialized */
-            }
-        }
-    }
+    free_controls(new_pool);
     if (new_pool->done != SEM_FAILED) {
         sem_close(new_pool->done);
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
@@ -176,7 +223,6 @@ cleanup:
     return NULL;
 }
 
-
 /* Initializes 'hmap' as an empty hash table with mask N. */
 void
 ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
@@ -365,14 +411,84 @@ ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
     }
 }
 
+static bool
+init_control(struct worker_control *control, int id,
+             struct worker_pool *pool)
+{
+    char sem_name[256];
+    control->id = id;
+    control->done = pool->done;
+    control->data = NULL;
+    ovs_mutex_init(&control->mutex);
+    control->finished = ATOMIC_VAR_INIT(false);
+    sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, id);
+    control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
+    control->pool = pool;
+    control->worker = 0;
+    if (control->fire == SEM_FAILED) {
+        return false;
+    }
+    control->worker =
+        ovs_thread_create("worker pool helper", pool->start, control);
+    return true;
+}
+
 static void
-worker_pool_hook(void *aux OVS_UNUSED) {
+cleanup_control(struct worker_pool *pool, int id)
+{
+    char sem_name[256];
+    struct worker_control *control = &pool->controls[id];
+
+    if (control->fire != SEM_FAILED) {
+        sem_close(control->fire);
+        sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, id);
+        sem_unlink(sem_name);
+    }
+}
+
+static void
+free_controls(struct worker_pool *pool)
+{
     int i;
+    if (pool->controls) {
+        pool->workers_must_exit = true;
+        for (i = 0; i < pool->size; i++) {
+            if (pool->controls[i].fire != SEM_FAILED) {
+                sem_post(pool->controls[i].fire);
+            }
+        }
+        for (i = 0; i < pool->size; i++) {
+            if (pool->controls[i].worker) {
+                pthread_join(pool->controls[i].worker, NULL);
+                pool->controls[i].worker = 0;
+            }
+        }
+        for (i = 0; i < pool->size; i++) {
+            cleanup_control(pool, i);
+        }
+        free(pool->controls);
+        pool->controls = NULL;
+        pool->workers_must_exit = false;
+    }
+}
+
+static struct worker_control *
+alloc_controls(int size)
+{
+    int i;
+    struct worker_control *controls =
+        xcalloc(size, sizeof(struct worker_control));
+
+    for (i = 0; i < size; i++) {
+        controls[i].fire = SEM_FAILED;
+    }
+    return controls;
+}
+
+static void
+worker_pool_hook(void *aux OVS_UNUSED)
+{
     static struct worker_pool *pool;
     char sem_name[256];
 
-    workers_must_exit = true;
-
     /* All workers must honour the must_exit flag and check for it regularly.
      * We can make it atomic and check it via atomics in workers, but that
      * is not really necessary as it is set just once - when the program
@@ -383,17 +499,7 @@ worker_pool_hook(void *aux OVS_UNUSED) {
     /* Wake up the workers after the must_exit flag has been set */
 
     LIST_FOR_EACH (pool, list_node, &worker_pools) {
-        for (i = 0; i < pool->size ; i++) {
-            sem_post(pool->controls[i].fire);
-        }
-        for (i = 0; i < pool->size ; i++) {
-            pthread_join(pool->controls[i].worker, NULL);
-        }
-        for (i = 0; i < pool->size ; i++) {
-            sem_close(pool->controls[i].fire);
-            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
-            sem_unlink(sem_name);
-        }
+        free_controls(pool);
         sem_close(pool->done);
         sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
         sem_unlink(sem_name);
diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
index 2df132ea8..4708f41f2 100644
--- a/lib/ovn-parallel-hmap.h
+++ b/lib/ovn-parallel-hmap.h
@@ -83,6 +83,7 @@ struct worker_control {
     void *data; /* Pointer to data to be processed. */
     void *workload; /* back-pointer to the worker pool structure. */
     pthread_t worker;
+    struct worker_pool *pool; /* Back-pointer to the owning pool. */
 };
 
 struct worker_pool {
@@ -90,16 +91,21 @@ struct worker_pool {
     struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
     struct worker_control *controls; /* "Handles" in this pool. */
     sem_t *done; /* Work completion semaphore. */
+    void *(*start)(void *); /* Work function. */
+    bool workers_must_exit; /* Set when the pool is being destroyed. */
 };
 
 /* Add a worker pool for thread function start() which expects a pointer to
- * a worker_control structure as an argument. */
+ * a worker_control structure as an argument.
+ * If size is non-zero, it is used as the pool size. If size is zero, the
+ * system default pool size is used.
+ */
 
-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *), int size);
 
-/* Setting this to true will make all processing threads exit */
+/* Returns true if the worker threads in 'pool' must exit. */
 
-bool ovn_stop_parallel_processing(void);
+bool ovn_stop_parallel_processing(struct worker_pool *pool);
 
 /* Build a hmap pre-sized for size elements */
 
@@ -253,6 +259,10 @@ static inline void init_hash_row_locks(struct hashrow_locks *hrl)
 
 bool ovn_can_parallelize_hashes(bool force_parallel);
 
+void ovn_destroy_pool(struct worker_pool *pool);
+
+bool ovn_resize_pool(struct worker_pool *pool, int size);
+
 /* Use the OVN library functions for stuff which OVS has not defined
  * If OVS has defined these, they will still compile using the OVN
  * local names, but will be dropped by the linker in favour of the OVS
@@ -263,9 +273,9 @@ bool ovn_can_parallelize_hashes(bool force_parallel);
 
 #define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
 
-#define stop_parallel_processing() ovn_stop_parallel_processing()
+#define stop_parallel_processing(pool) ovn_stop_parallel_processing(pool)
 
-#define add_worker_pool(start) ovn_add_worker_pool(start)
+#define add_worker_pool(start, size) ovn_add_worker_pool(start, size)
 
 #define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
 
@@ -286,6 +296,9 @@ bool ovn_can_parallelize_hashes(bool force_parallel);
 #define run_pool_callback(pool, fin_result, result_frags, helper_func) \
     ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
 
+#define destroy_pool(pool) ovn_destroy_pool(pool)
+
+#define resize_pool(pool, size) ovn_resize_pool(pool, size)
 
 
 #ifdef __clang__
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index ee761cef0..324800c32 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -12828,16 +12828,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
                                       &lsi->actions);
 }
 
-struct lflows_thread_pool {
-    struct worker_pool *pool;
-};
-
-
 static void *
 build_lflows_thread(void *arg)
 {
     struct worker_control *control = (struct worker_control *) arg;
-    struct lflows_thread_pool *workload;
     struct lswitch_flow_build_info *lsi;
 
     struct ovn_datapath *od;
@@ -12846,21 +12840,21 @@ build_lflows_thread(void *arg)
     struct ovn_igmp_group *igmp_group;
     int bnum;
 
-    while (!stop_parallel_processing()) {
+    while (!stop_parallel_processing(control->pool)) {
         wait_for_work(control);
-        workload = (struct lflows_thread_pool *) control->workload;
         lsi = (struct lswitch_flow_build_info *) control->data;
-        if (stop_parallel_processing()) {
+        if (stop_parallel_processing(control->pool)) {
             return NULL;
         }
-        if (lsi && workload) {
+        if (lsi) {
             /* Iterate over bucket ThreadID, ThreadID+size, ... */
             for (bnum = control->id;
                     bnum <= lsi->datapaths->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_and_lrouter_iterate_by_od(od, lsi);
@@ -12868,10 +12862,10 @@ build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                     bnum <= lsi->ports->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_and_lrouter_iterate_by_op(op, lsi);
@@ -12879,10 +12873,10 @@ build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                     bnum <= lsi->lbs->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
@@ -12900,11 +12894,11 @@ build_lflows_thread(void *arg)
             }
             for (bnum = control->id;
                     bnum <= lsi->igmp_groups->mask;
-                    bnum += workload->pool->size)
+                    bnum += control->pool->size)
             {
                 HMAP_FOR_EACH_IN_PARALLEL (
                         igmp_group, hmap_node, bnum, lsi->igmp_groups) {
-                    if (stop_parallel_processing()) {
+                    if (stop_parallel_processing(control->pool)) {
                         return NULL;
                     }
                     build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
@@ -12919,24 +12913,14 @@ build_lflows_thread(void *arg)
 }
 
 static bool pool_init_done = false;
-static struct lflows_thread_pool *build_lflows_pool = NULL;
+static struct worker_pool *build_lflows_pool = NULL;
 
 static void
 init_lflows_thread_pool(void)
 {
-    int index;
-
     if (!pool_init_done) {
-        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
+        build_lflows_pool = add_worker_pool(build_lflows_thread, 0);
         pool_init_done = true;
-        if (pool) {
-            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
-            build_lflows_pool->pool = pool;
-            for (index = 0; index < build_lflows_pool->pool->size; index++) {
-                build_lflows_pool->pool->controls[index].workload =
-                    build_lflows_pool;
-            }
-        }
     }
 }
 
@@ -12979,16 +12963,16 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
         struct lswitch_flow_build_info *lsiv;
         int index;
 
-        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
+        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
         if (use_logical_dp_groups) {
             lflow_segs = NULL;
         } else {
-            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
+            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->size);
         }
 
         /* Set up "work chunks" for each thread to work on. */
 
-        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+        for (index = 0; index < build_lflows_pool->size; index++) {
             if (use_logical_dp_groups) {
                 /* if dp_groups are in use we lock a shared lflows hash
                  * on a per-bucket level instead of merging hash frags */
@@ -13010,17 +12994,17 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
             ds_init(&lsiv[index].match);
             ds_init(&lsiv[index].actions);
 
-            build_lflows_pool->pool->controls[index].data = &lsiv[index];
+            build_lflows_pool->controls[index].data = &lsiv[index];
         }
 
         /* Run thread pool. */
         if (use_logical_dp_groups) {
-            run_pool_callback(build_lflows_pool->pool, NULL, NULL, noop_callback);
+            run_pool_callback(build_lflows_pool, NULL, NULL, noop_callback);
         } else {
-            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+            run_pool_hash(build_lflows_pool, lflows, lflow_segs);
         }
 
-        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+        for (index = 0; index < build_lflows_pool->size; index++) {
             ds_destroy(&lsiv[index].match);
             ds_destroy(&lsiv[index].actions);
         }
diff --git a/ovs b/ovs
index 748010ff3..50e5523b9 160000
--- a/ovs
+++ b/ovs
@@ -1 +1 @@
-Subproject commit 748010ff304b7cd2c43f4eb98a554433f0df07f9
+Subproject commit 50e5523b9b2b154e5fafc5acdcdec85e9cc5a330
-- 
2.20.1


