[ovs-dev] [OVN Patch] Make changes to the parallel processing API to allow pool sizing

Numan Siddique numans at ovn.org
Thu Sep 2 18:30:12 UTC 2021


On Mon, Aug 16, 2021 at 1:31 PM <anton.ivanov at cambridgegreys.com> wrote:
>
> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>
> 1. Make pool size user-definable.
> 2. Expose pool destruction.
> 3. Make pools resizeable at runtime.
>
> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>

Hi Anton,

Thanks for the patch.  If I understand correctly, this patch adds the
ability to override the pool size.  What is missing is usage of the
API from ovn-northd.  Is there a plan for a follow-up patch that makes
use of this?

I'd expect a config option for the user to specify the pool size, with
ovn-northd calling add_worker_pool() using the configured size.  Right
now I see it is called with 0.
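
Something along these lines is what I have in mind -- a rough sketch
only; the option name "n_threads" and the 'nb' row lookup are
assumptions on my part, not part of this patch:

    /* In ovn-northd.c: read a pool-size knob from the NB_Global
     * options column and pass it to add_worker_pool().  Passing 0
     * keeps the library's default sizing. */
    const struct nbrec_nb_global *nb =
        nbrec_nb_global_first(ctx->ovnnb_idl);
    int requested_size =
        nb ? smap_get_int(&nb->options, "n_threads", 0) : 0;
    build_lflows_pool = add_worker_pool(build_lflows_thread,
                                        requested_size);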

Also, in order for us to be more confident and adopt parallel
processing, we need to enable parallel runs in CI.

For this to happen, there should be a config option in NB_Global to
force parallel processing.  The parallel processing library function
setup_worker_pools() takes a 'bool' param to force-enable parallel
processing, but it is never used by ovn-northd.c.

So I'd suggest adding this support so that we can test the parallel
processing patches in CI.
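
Again just as a sketch (the option name "use_parallel_build" is an
assumption, and 'use_parallel_build' below stands for whatever flag
gates the parallel path in northd), CI could then flip a single NB
option:

    /* Force-enable parallelization when the NB option requests it,
     * regardless of how many cores the test machine has; this is
     * what would let CI exercise the parallel code paths. */
    bool force = nb ? smap_get_bool(&nb->options, "use_parallel_build",
                                    false)
                    : false;
    use_parallel_build = can_parallelize_hashes(force);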

One comment below.


> ---
>  lib/ovn-parallel-hmap.c | 202 ++++++++++++++++++++++++++++++----------
>  lib/ovn-parallel-hmap.h |  23 ++++-
>  northd/ovn-northd.c     |  58 +++++-------
>  ovs                     |   2 +-
>  4 files changed, 194 insertions(+), 91 deletions(-)
>
> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
> index b8c7ac786..6c5199fb3 100644
> --- a/lib/ovn-parallel-hmap.c
> +++ b/lib/ovn-parallel-hmap.c
> @@ -51,7 +51,6 @@ static bool can_parallelize = false;
>   * accompanied by a fence. It does not need to be atomic or be
>   * accessed under a lock.
>   */
> -static bool workers_must_exit = false;
>
>  static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
>
> @@ -70,10 +69,20 @@ static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>                                 void *fin_result, void *result_frags,
>                                 int index);
>
> +
> +static bool init_control(struct worker_control *control, int id,
> +                         struct worker_pool *pool);
> +
> +static void cleanup_control(struct worker_pool *pool, int id);
> +
> +static void free_controls(struct worker_pool *pool);
> +
> +static struct worker_control *alloc_controls(int size);
> +
>  bool
> -ovn_stop_parallel_processing(void)
> +ovn_stop_parallel_processing(struct worker_pool *pool)
>  {
> -    return workers_must_exit;
> +    return pool->workers_must_exit;
>  }
>
>  bool
> @@ -92,11 +101,67 @@ ovn_can_parallelize_hashes(bool force_parallel)
>      return can_parallelize;
>  }
>
> +
> +void
> +destroy_pool(struct worker_pool *pool) {
> +    char sem_name[256];
> +
> +    free_controls(pool);
> +    sem_close(pool->done);
> +    sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> +    sem_unlink(sem_name);
> +    free(pool);
> +}
> +
> +bool
> +ovn_resize_pool(struct worker_pool *pool, int size)
> +{
> +    int i;
> +
> +    ovs_assert(pool != NULL);
> +
> +    if (!size) {
> +        size = pool_size;
> +    }
> +
> +    ovs_mutex_lock(&init_mutex);
> +
> +    if (can_parallelize) {
> +        free_controls(pool);
> +        pool->size = size;
> +
> +        /* Allocate new control structures. */
> +
> +        pool->controls = alloc_controls(size);
> +        pool->workers_must_exit = false;
> +
> +        for (i = 0; i < pool->size; i++) {
> +            if (! init_control(&pool->controls[i], i, pool)) {
> +                goto cleanup;
> +            }
> +        }
> +    }
> +    ovs_mutex_unlock(&init_mutex);
> +    return true;
> +cleanup:
> +
> +    /* Something went wrong when opening semaphores. In this case
> +     * it is better to shut off parallel procesing altogether
> +     */
> +
> +    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
> +    can_parallelize = false;
> +    free_controls(pool);
> +
> +    ovs_mutex_unlock(&init_mutex);
> +    return false;
> +}
> +
> +
>  struct worker_pool *
> -ovn_add_worker_pool(void *(*start)(void *))
> +ovn_add_worker_pool(void *(*start)(void *), int size)
>  {
>      struct worker_pool *new_pool = NULL;
> -    struct worker_control *new_control;
>      bool test = false;
>      int i;
>      char sem_name[256];
> @@ -113,38 +178,29 @@ ovn_add_worker_pool(void *(*start)(void *))
>          ovs_mutex_unlock(&init_mutex);
>      }
>
> +    if (!size) {
> +        size = pool_size;
> +    }
> +
>      ovs_mutex_lock(&init_mutex);
>      if (can_parallelize) {
>          new_pool = xmalloc(sizeof(struct worker_pool));
> -        new_pool->size = pool_size;
> -        new_pool->controls = NULL;
> +        new_pool->size = size;
> +        new_pool->start = start;
>          sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>          new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>          if (new_pool->done == SEM_FAILED) {
>              goto cleanup;
>          }
>
> -        new_pool->controls =
> -            xmalloc(sizeof(struct worker_control) * new_pool->size);
> +        new_pool->controls = alloc_controls(size);
> +        new_pool->workers_must_exit = false;
>
>          for (i = 0; i < new_pool->size; i++) {
> -            new_control = &new_pool->controls[i];
> -            new_control->id = i;
> -            new_control->done = new_pool->done;
> -            new_control->data = NULL;
> -            ovs_mutex_init(&new_control->mutex);
> -            new_control->finished = ATOMIC_VAR_INIT(false);
> -            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> -            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> -            if (new_control->fire == SEM_FAILED) {
> +            if (!init_control(&new_pool->controls[i], i, new_pool)) {
>                  goto cleanup;
>              }
>          }
> -
> -        for (i = 0; i < pool_size; i++) {
> -            new_pool->controls[i].worker =
> -                ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
> -        }
>          ovs_list_push_back(&worker_pools, &new_pool->list_node);
>      }
>      ovs_mutex_unlock(&init_mutex);
> @@ -157,16 +213,7 @@ cleanup:
>
>      VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
>      can_parallelize = false;
> -    if (new_pool->controls) {
> -        for (i = 0; i < new_pool->size; i++) {
> -            if (new_pool->controls[i].fire != SEM_FAILED) {
> -                sem_close(new_pool->controls[i].fire);
> -                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> -                sem_unlink(sem_name);
> -                break; /* semaphores past this one are uninitialized */
> -            }
> -        }
> -    }
> +    free_controls(new_pool);
>      if (new_pool->done != SEM_FAILED) {
>          sem_close(new_pool->done);
>          sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> @@ -176,7 +223,6 @@ cleanup:
>      return NULL;
>  }
>
> -
>  /* Initializes 'hmap' as an empty hash table with mask N. */
>  void
>  ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
> @@ -365,14 +411,84 @@ ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
>      }
>  }
>
> +static bool
> +init_control(struct worker_control *control, int id,
> +             struct worker_pool *pool)
> +{
> +    char sem_name[256];
> +    control->id = id;
> +    control->done = pool->done;
> +    control->data = NULL;
> +    ovs_mutex_init(&control->mutex);
> +    control->finished = ATOMIC_VAR_INIT(false);
> +    sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, id);
> +    control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +    control->pool = pool;
> +    control->worker = 0;
> +    if (control->fire == SEM_FAILED) {
> +        return false;
> +    }
> +    control->worker =
> +        ovs_thread_create("worker pool helper", pool->start, control);
> +    return true;
> +}
> +
>  static void
> -worker_pool_hook(void *aux OVS_UNUSED) {
> +cleanup_control(struct worker_pool *pool, int id)
> +{
> +    char sem_name[256];
> +    struct worker_control *control = &pool->controls[id];
> +
> +    if (control->fire != SEM_FAILED) {
> +        sem_close(control->fire);
> +        sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, id);
> +        sem_unlink(sem_name);
> +    }
> +}
> +
> +static void
> +free_controls(struct worker_pool *pool)
> +{
>      int i;
> +    if (!pool->controls) {

Do you mean 'if (pool->controls)'?  As written, if pool->controls is
NULL the code will crash, and if pool->controls is not NULL there will
be a memory leak, which I observed in my run (report below; a
suggested fix follows the report).

********************
=================================================================
==396==ERROR: LeakSanitizer: detected memory leaks

Direct leak of 5280 byte(s) in 44 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x42bc1a in build_lswitch_output_port_sec_op ../northd/ovn-northd.c:5276
    #6 0x45d514 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12806
    #7 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #8 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 4800 byte(s) in 40 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x428d13 in build_port_security_ip ../northd/ovn-northd.c:4819
    #6 0x42bd6b in build_lswitch_output_port_sec_op ../northd/ovn-northd.c:5288
    #7 0x45d514 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12806
    #8 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #9 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 4560 byte(s) in 38 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x428d13 in build_port_security_ip ../northd/ovn-northd.c:4819
    #6 0x42b38a in build_lswitch_input_port_sec_op ../northd/ovn-northd.c:5184
    #7 0x45d273 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12790
    #8 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #9 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 3960 byte(s) in 33 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x42825a in build_port_security_nd ../northd/ovn-northd.c:4723
    #6 0x42b3cd in build_lswitch_input_port_sec_op ../northd/ovn-northd.c:5185
    #7 0x45d273 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12790
    #8 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #9 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 3960 byte(s) in 33 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x42934f in build_port_security_ip ../northd/ovn-northd.c:4857
    #6 0x42bd6b in build_lswitch_output_port_sec_op ../northd/ovn-northd.c:5288
    #7 0x45d514 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12806
    #8 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #9 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 3720 byte(s) in 31 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x427e9f in build_port_security_nd ../northd/ovn-northd.c:4702
    #6 0x42b3cd in build_lswitch_input_port_sec_op ../northd/ovn-northd.c:5185
    #7 0x45d273 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12790
    #8 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #9 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 3600 byte(s) in 30 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x428760 in build_port_security_ip ../northd/ovn-northd.c:4777
    #6 0x42b38a in build_lswitch_input_port_sec_op ../northd/ovn-northd.c:5184
    #7 0x45d273 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12790
    #8 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #9 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 3480 byte(s) in 29 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x42934f in build_port_security_ip ../northd/ovn-northd.c:4857
    #6 0x42b38a in build_lswitch_input_port_sec_op ../northd/ovn-northd.c:5184
    #7 0x45d273 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12790
    #8 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #9 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 3360 byte(s) in 28 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x450d7a in build_arp_resolve_flows_for_lrouter_port
../northd/ovn-northd.c:10954
    #6 0x45d705 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12817
    #7 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #8 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 3120 byte(s) in 26 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x42b309 in build_lswitch_input_port_sec_op ../northd/ovn-northd.c:5179
    #6 0x45d273 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12790
    #7 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #8 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 2160 byte(s) in 18 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x43c3d7 in build_lswitch_ip_unicast_lookup ../northd/ovn-northd.c:7929
    #6 0x45d4ca in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12804
    #7 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #8 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 1800 byte(s) in 15 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x452197 in build_arp_resolve_flows_for_lrouter_port
../northd/ovn-northd.c:11127
    #6 0x45d705 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12817
    #7 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #8 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 720 byte(s) in 6 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x452333 in build_arp_resolve_flows_for_lrouter_port
../northd/ovn-northd.c:11143
    #6 0x45d705 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12817
    #7 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #8 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 120 byte(s) in 1 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4485a3 in build_lrouter_defrag_flows_for_lb ../northd/ovn-northd.c:9482
    #5 0x45de4f in build_lflows_thread ../northd/ovn-northd.c:12885
    #6 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 120 byte(s) in 1 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x456bff in build_lrouter_ipv4_ip_input ../northd/ovn-northd.c:11982
    #6 0x45d887 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12825
    #7 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #8 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 120 byte(s) in 1 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x439cfb in build_lswitch_arp_nd_responder_known_ips
../northd/ovn-northd.c:7486
    #6 0x45d3ab in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12796
    #7 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #8 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 120 byte(s) in 1 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x4456ca in add_route ../northd/ovn-northd.c:9066
    #6 0x44e898 in build_ip_routing_flows_for_lrouter_port
../northd/ovn-northd.c:10580
    #7 0x45d611 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12814
    #8 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #9 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 120 byte(s) in 1 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x4355c1 in build_lswitch_rport_arp_req_flow_for_reachable_ip
../northd/ovn-northd.c:6863
    #6 0x4364f3 in build_lswitch_rport_arp_req_flows ../northd/ovn-northd.c:7002
    #7 0x43c018 in build_lswitch_ip_unicast_lookup ../northd/ovn-northd.c:7912
    #8 0x45d4ca in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12804
    #9 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #10 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 120 byte(s) in 1 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x435519 in build_lswitch_rport_arp_req_flow_for_reachable_ip
../northd/ovn-northd.c:6858
    #6 0x4364f3 in build_lswitch_rport_arp_req_flows ../northd/ovn-northd.c:7002
    #7 0x43c018 in build_lswitch_ip_unicast_lookup ../northd/ovn-northd.c:7912
    #8 0x45d4ca in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12804
    #9 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #10 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 120 byte(s) in 1 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x44cecd in build_adm_ctrl_flows_for_lrouter_port
../northd/ovn-northd.c:10240
    #6 0x45d55e in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12810
    #7 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #8 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Direct leak of 120 byte(s) in 1 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172
    #3 0x426909 in ovn_lflow_add_at_with_hash ../northd/ovn-northd.c:4419
    #4 0x4269fd in ovn_lflow_add_at ../northd/ovn-northd.c:4444
    #5 0x455a87 in build_lrouter_ipv4_ip_input ../northd/ovn-northd.c:11846
    #6 0x45d887 in build_lswitch_and_lrouter_iterate_by_op
../northd/ovn-northd.c:12825
    #7 0x45dc0f in build_lflows_thread ../northd/ovn-northd.c:12871
    #8 0x5b1fd4 in ovsthread_wrapper ../lib/ovs-thread.c:383

Indirect leak of 36133 byte(s) in 1062 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d2b22 in xmalloc__ ../lib/util.c:137
    #2 0x5d2b22 in xmalloc ../lib/util.c:172
    #3 0x5d2b22 in xmemdup0 ../lib/util.c:193
    #4 0x5d2b22 in xstrdup ../lib/util.c:202

Indirect leak of 12464 byte(s) in 499 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d270d in xmalloc__ ../lib/util.c:137
    #2 0x5d270d in xmalloc ../lib/util.c:172

Indirect leak of 3411 byte(s) in 379 object(s) allocated from:
    #0 0x7fd2403dd93f in __interceptor_malloc (/lib64/libasan.so.6+0xae93f)
    #1 0x5d282d in xmalloc__ ../lib/util.c:137
    #2 0x5d282d in xmalloc ../lib/util.c:172
    #3 0x5d282d in xvasprintf ../lib/util.c:227

SUMMARY: AddressSanitizer: 97488 byte(s) leaked in 2319 allocation(s).

--------------------
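
For reference, here is free_controls() with just that test inverted --
a sketch of the fix I'd expect, mirroring the rest of the function
body from the patch:

    static void
    free_controls(struct worker_pool *pool)
    {
        int i;
        if (pool->controls) {   /* was: if (!pool->controls) */
            /* Signal the workers to exit and wake them all up. */
            pool->workers_must_exit = true;
            for (i = 0; i < pool->size; i++) {
                if (pool->controls[i].fire != SEM_FAILED) {
                    sem_post(pool->controls[i].fire);
                }
            }
            /* Join the workers, then release their semaphores. */
            for (i = 0; i < pool->size; i++) {
                if (pool->controls[i].worker) {
                    pthread_join(pool->controls[i].worker, NULL);
                    pool->controls[i].worker = 0;
                }
            }
            for (i = 0; i < pool->size; i++) {
                cleanup_control(pool, i);
            }
            free(pool->controls);
            pool->controls = NULL;
            pool->workers_must_exit = false;
        }
    }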

Thanks
Numan


> +        pool->workers_must_exit = true;
> +        for (i = 0; i < pool->size ; i++) {
> +            if (pool->controls[i].fire != SEM_FAILED) {
> +                sem_post(pool->controls[i].fire);
> +            }
> +        }
> +        for (i = 0; i < pool->size ; i++) {
> +            if (pool->controls[i].worker) {
> +                pthread_join(pool->controls[i].worker, NULL);
> +                pool->controls[i].worker = 0;
> +            }
> +        }
> +        for (i = 0; i < pool->size; i++) {
> +                cleanup_control(pool, i);
> +            }
> +        free(pool->controls);
> +        pool->controls = NULL;
> +        pool->workers_must_exit = false;
> +    }
> +}
> +
> +static struct worker_control *alloc_controls(int size)
> +{
> +    int i;
> +    struct worker_control *controls =
> +        xcalloc(sizeof(struct worker_control), size);
> +
> +    for (i = 0; i < size ; i++) {
> +        controls[i].fire = SEM_FAILED;
> +    }
> +    return controls;
> +}
> +
> +static void
> +worker_pool_hook(void *aux OVS_UNUSED) {
>      static struct worker_pool *pool;
>      char sem_name[256];
>
> -    workers_must_exit = true;
> -
>      /* All workers must honour the must_exit flag and check for it regularly.
>       * We can make it atomic and check it via atomics in workers, but that
>       * is not really necessary as it is set just once - when the program
> @@ -383,17 +499,7 @@ worker_pool_hook(void *aux OVS_UNUSED) {
>      /* Wake up the workers after the must_exit flag has been set */
>
>      LIST_FOR_EACH (pool, list_node, &worker_pools) {
> -        for (i = 0; i < pool->size ; i++) {
> -            sem_post(pool->controls[i].fire);
> -        }
> -        for (i = 0; i < pool->size ; i++) {
> -            pthread_join(pool->controls[i].worker, NULL);
> -        }
> -        for (i = 0; i < pool->size ; i++) {
> -            sem_close(pool->controls[i].fire);
> -            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> -            sem_unlink(sem_name);
> -        }
> +        free_controls(pool);
>          sem_close(pool->done);
>          sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
>          sem_unlink(sem_name);
> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
> index 2df132ea8..4708f41f2 100644
> --- a/lib/ovn-parallel-hmap.h
> +++ b/lib/ovn-parallel-hmap.h
> @@ -83,6 +83,7 @@ struct worker_control {
>      void *data; /* Pointer to data to be processed. */
>      void *workload; /* back-pointer to the worker pool structure. */
>      pthread_t worker;
> +    struct worker_pool *pool;
>  };
>
>  struct worker_pool {
> @@ -90,16 +91,21 @@ struct worker_pool {
>      struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
>      struct worker_control *controls; /* "Handles" in this pool. */
>      sem_t *done; /* Work completion semaphorew. */
> +    void *(*start)(void *); /* Work function. */
> +    bool workers_must_exit; /* Pool to be destroyed flag. */
>  };
>
>  /* Add a worker pool for thread function start() which expects a pointer to
> - * a worker_control structure as an argument. */
> + * a worker_control structure as an argument.
> + * If size is non-zero, it is used for pool sizing. If size is zero, pool
> + * size uses system defaults.
> + */
>
> -struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *), int size);
>
>  /* Setting this to true will make all processing threads exit */
>
> -bool ovn_stop_parallel_processing(void);
> +bool ovn_stop_parallel_processing(struct worker_pool *pool);
>
>  /* Build a hmap pre-sized for size elements */
>
> @@ -253,6 +259,10 @@ static inline void init_hash_row_locks(struct hashrow_locks *hrl)
>
>  bool ovn_can_parallelize_hashes(bool force_parallel);
>
> +void ovn_destroy_pool(struct worker_pool *pool);
> +
> +bool ovn_resize_pool(struct worker_pool *pool, int size);
> +
>  /* Use the OVN library functions for stuff which OVS has not defined
>   * If OVS has defined these, they will still compile using the OVN
>   * local names, but will be dropped by the linker in favour of the OVS
> @@ -263,9 +273,9 @@ bool ovn_can_parallelize_hashes(bool force_parallel);
>
>  #define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
>
> -#define stop_parallel_processing() ovn_stop_parallel_processing()
> +#define stop_parallel_processing(pool) ovn_stop_parallel_processing(pool)
>
> -#define add_worker_pool(start) ovn_add_worker_pool(start)
> +#define add_worker_pool(start, size) ovn_add_worker_pool(start, size)
>
>  #define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
>
> @@ -286,6 +296,9 @@ bool ovn_can_parallelize_hashes(bool force_parallel);
>  #define run_pool_callback(pool, fin_result, result_frags, helper_func) \
>      ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
>
> +#define destroy_pool(pool) ovn_destroy_pool(pool)
> +
> +#define resize_pool(pool, size) ovn_resize_pool(pool, size)
>
>
>  #ifdef __clang__
> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
> index ebe12cace..94c720866 100644
> --- a/northd/ovn-northd.c
> +++ b/northd/ovn-northd.c
> @@ -12537,16 +12537,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
>                                        &lsi->actions);
>  }
>
> -struct lflows_thread_pool {
> -    struct worker_pool *pool;
> -};
> -
> -
>  static void *
>  build_lflows_thread(void *arg)
>  {
>      struct worker_control *control = (struct worker_control *) arg;
> -    struct lflows_thread_pool *workload;
>      struct lswitch_flow_build_info *lsi;
>
>      struct ovn_datapath *od;
> @@ -12555,21 +12549,21 @@ build_lflows_thread(void *arg)
>      struct ovn_igmp_group *igmp_group;
>      int bnum;
>
> -    while (!stop_parallel_processing()) {
> +
> +    while (!stop_parallel_processing(control->pool)) {
>          wait_for_work(control);
> -        workload = (struct lflows_thread_pool *) control->workload;
>          lsi = (struct lswitch_flow_build_info *) control->data;
> -        if (stop_parallel_processing()) {
> +        if (stop_parallel_processing(control->pool)) {
>              return NULL;
>          }
> -        if (lsi && workload) {
> +        if (lsi) {
>              /* Iterate over bucket ThreadID, ThreadID+size, ... */
>              for (bnum = control->id;
>                      bnum <= lsi->datapaths->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
> -                    if (stop_parallel_processing()) {
> +                    if (stop_parallel_processing(control->pool)) {
>                          return NULL;
>                      }
>                      build_lswitch_and_lrouter_iterate_by_od(od, lsi);
> @@ -12577,10 +12571,10 @@ build_lflows_thread(void *arg)
>              }
>              for (bnum = control->id;
>                      bnum <= lsi->ports->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
> -                    if (stop_parallel_processing()) {
> +                    if (stop_parallel_processing(control->pool)) {
>                          return NULL;
>                      }
>                      build_lswitch_and_lrouter_iterate_by_op(op, lsi);
> @@ -12588,10 +12582,10 @@ build_lflows_thread(void *arg)
>              }
>              for (bnum = control->id;
>                      bnum <= lsi->lbs->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
> -                    if (stop_parallel_processing()) {
> +                    if (stop_parallel_processing(control->pool)) {
>                          return NULL;
>                      }
>                      build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
> @@ -12609,11 +12603,11 @@ build_lflows_thread(void *arg)
>              }
>              for (bnum = control->id;
>                      bnum <= lsi->igmp_groups->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (
>                          igmp_group, hmap_node, bnum, lsi->igmp_groups) {
> -                    if (stop_parallel_processing()) {
> +                    if (stop_parallel_processing(control->pool)) {
>                          return NULL;
>                      }
>                      build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
> @@ -12628,24 +12622,14 @@ build_lflows_thread(void *arg)
>  }
>
>  static bool pool_init_done = false;
> -static struct lflows_thread_pool *build_lflows_pool = NULL;
> +static struct worker_pool *build_lflows_pool = NULL;
>
>  static void
>  init_lflows_thread_pool(void)
>  {
> -    int index;
> -
>      if (!pool_init_done) {
> -        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
> +        build_lflows_pool = add_worker_pool(build_lflows_thread, 0);
>          pool_init_done = true;
> -        if (pool) {
> -            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
> -            build_lflows_pool->pool = pool;
> -            for (index = 0; index < build_lflows_pool->pool->size; index++) {
> -                build_lflows_pool->pool->controls[index].workload =
> -                    build_lflows_pool;
> -            }
> -        }
>      }
>  }
>
> @@ -12688,16 +12672,16 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
>          struct lswitch_flow_build_info *lsiv;
>          int index;
>
> -        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
> +        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
>          if (use_logical_dp_groups) {
>              lflow_segs = NULL;
>          } else {
> -            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
> +            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->size);
>          }
>
>          /* Set up "work chunks" for each thread to work on. */
>
> -        for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +        for (index = 0; index < build_lflows_pool->size; index++) {
>              if (use_logical_dp_groups) {
>                  /* if dp_groups are in use we lock a shared lflows hash
>                   * on a per-bucket level instead of merging hash frags */
> @@ -12719,17 +12703,17 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
>              ds_init(&lsiv[index].match);
>              ds_init(&lsiv[index].actions);
>
> -            build_lflows_pool->pool->controls[index].data = &lsiv[index];
> +            build_lflows_pool->controls[index].data = &lsiv[index];
>          }
>
>          /* Run thread pool. */
>          if (use_logical_dp_groups) {
> -            run_pool_callback(build_lflows_pool->pool, NULL, NULL, noop_callback);
> +            run_pool_callback(build_lflows_pool, NULL, NULL, noop_callback);
>          } else {
> -            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
> +            run_pool_hash(build_lflows_pool, lflows, lflow_segs);
>          }
>
> -        for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +        for (index = 0; index < build_lflows_pool->size; index++) {
>              ds_destroy(&lsiv[index].match);
>              ds_destroy(&lsiv[index].actions);
>          }
> --
> 2.20.1
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>

