[ovs-dev] [OVN Patch v15 1/3] ovn-libs: Add support for parallel processing

Anton Ivanov anton.ivanov at cambridgegreys.com
Fri Mar 26 17:12:01 UTC 2021


On 26/03/2021 03:25, Numan Siddique wrote:
> On Thu, Mar 25, 2021 at 3:01 PM Anton Ivanov
> <anton.ivanov at cambridgegreys.com> wrote:
>>
>>
>> On 24/03/2021 15:31, Numan Siddique wrote:
>>> On Mon, Mar 1, 2021 at 6:35 PM <anton.ivanov at cambridgegreys.com> wrote:
>>>> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>>>>
>>>> This adds a set of functions and macros intended to process
>>>> hashes in parallel.
>>>>
>>>> The principles of operation are documented in the ovn-parallel-hmap.h
>>>>
>>>> If these one day go into the OVS tree, the OVS tree versions
>>>> would be used in preference.
>>>>
>>>> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>>> Hi Anton,
>>>
>>> I tested the first 2 patches of this series and it crashes again for me.
>>>
>>> This time I ran tests on a 4 core  machine - Intel(R) Xeon(R) CPU
>>> E3-1220 v5 @ 3.00GHz
>>>
>>> The below trace is seen for both gcc and clang.
>>>
>>> ----
>>> [Thread debugging using libthread_db enabled]
>>> Using host libthread_db library "/lib64/libthread_db.so.1".
>>> Core was generated by `ovn-northd -vjsonrpc
>>> --ovnnb-db=unix:/mnt/mydisk/myhome/numan_alt/work/ovs_ovn/'.
>>> Program terminated with signal SIGSEGV, Segmentation fault.
>>> #0  0x00007f27594ae212 in __new_sem_wait_slow.constprop.0 () from
>>> /lib64/libpthread.so.0
>>> [Current thread is 1 (Thread 0x7f2758c68640 (LWP 347378))]
>>> Missing separate debuginfos, use: dnf debuginfo-install
>>> glibc-2.32-3.fc33.x86_64 libcap-ng-0.8-1.fc33.x86_64
>>> libevent-2.1.8-10.fc33.x86_64 openssl-libs-1.1.1i-1.fc33.x86_64
>>> python3-libs-3.9.1-2.fc33.x86_64 unbound-libs-1.10.1-4.fc33.x86_64
>>> zlib-1.2.11-23.fc33.x86_64
>>> (gdb) bt
>>> #0  0x00007f27594ae212 in __new_sem_wait_slow.constprop.0 () from
>>> /lib64/libpthread.so.0
>>> #1  0x0000000000422184 in wait_for_work (control=<optimized out>) at
>>> ../lib/ovn-parallel-hmap.h:203
>>> #2  build_lflows_thread (arg=0x2538420) at ../northd/ovn-northd.c:11855
>>> #3  0x000000000049cd12 in ovsthread_wrapper (aux_=<optimized out>) at
>>> ../lib/ovs-thread.c:383
>>> #4  0x00007f27594a53f9 in start_thread () from /lib64/libpthread.so.0
>>> #5  0x00007f2759142903 in clone () from /lib64/libc.so.6
>>> -----
>>>
>>> I'm not sure why you're not able to reproduce this issue.
>> I can't. I have run it for days in a loop.
>>
>> One possibility is that for whatever reason your machine has slower IPC speeds compared to linear execution speeds. Thread debugging? AMD vs Intel? No idea.
>>
>> There is a race on-exit in the current code which I have found by inspection and which I have never been able to trigger. On my machines the workers always exit in time before the main thread has finished, so I cannot trigger this.
>>
>> Can you try this incremental fix to see if it fixes the problem for you. If that works, I will incorporate it and reissue the patch. If not - I will continue digging.
>>
>> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
>> index e83ae23cb..3597f896f 100644
>> --- a/lib/ovn-parallel-hmap.c
>> +++ b/lib/ovn-parallel-hmap.c
>> @@ -143,7 +143,8 @@ struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
>>            }
>>
>>            for (i = 0; i < pool_size; i++) {
>> -            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
>> +            new_pool->controls[i].worker =
>> +                ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
>>            }
>>            ovs_list_push_back(&worker_pools, &new_pool->list_node);
>>        }
>> @@ -386,6 +387,9 @@ static void worker_pool_hook(void *aux OVS_UNUSED) {
>>            for (i = 0; i < pool->size ; i++) {
>>                sem_post(pool->controls[i].fire);
>>            }
>> +        for (i = 0; i < pool->size ; i++) {
>> +            pthread_join(pool->controls[i].worker, NULL);
>> +        }
>>            for (i = 0; i < pool->size ; i++) {
>>                sem_close(pool->controls[i].fire);
>>                sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
>> index 8db61eaba..d62ca3da5 100644
>> --- a/lib/ovn-parallel-hmap.h
>> +++ b/lib/ovn-parallel-hmap.h
>> @@ -82,6 +82,7 @@ struct worker_control {
>>        struct ovs_mutex mutex; /* Guards the data. */
>>        void *data; /* Pointer to data to be processed. */
>>        void *workload; /* back-pointer to the worker pool structure. */
>> +    pthread_t worker;
>>    };
>>
>>    struct worker_pool {
>>
> I applied the above diff on top of patch 2  and did some tests.  I see
> a big improvement
> with this.  On my "Intel(R) Xeon(R) CPU E3-1220 v5 @ 3.00GHz"  server,
> I saw just one
> crash only once when I ran the test suite multiple times.
>
> On my work laptop (in which the tests used to hang earlier), all the
> tests are passing now.
> But I see a lot more consistent crashes here.  For all single run of
> whole testsuite (with make check -j5)
> I observed around 7 crashes.  Definitely an improvement when compared
> to my previous runs with v14.
>
> Here are the back traces details of the core dumps I observed -
> https://gist.github.com/numansiddique/5cab90ec4a1ee6e1adbfd3cd90eccf5a
>
> Crash 1 and Crash 2 are frequent.  Let me know in case you want the core files.

The only suspect I can find so far is the bfd code. There is one place where it may end up as a ds operation with a string re-alloc and/or dereferencing pointers being changed in-flight.

I would not expect that to be invoked in all tests though and I still cannot reproduce it here. Do you have the crashes on all tests or in specific tests only?

A.

> Thanks
> Numan
>
>>> All the test cases passed for me. So maybe something's wrong when
>>> ovn-northd exits.
>>> IMHO, these crashes should be addressed before these patches can be considered.
>>>
>>> Thanks
>>> Numan
>>>
>>>> ---
>>>>    lib/automake.mk         |   2 +
>>>>    lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
>>>>    lib/ovn-parallel-hmap.h | 301 ++++++++++++++++++++++++++
>>>>    3 files changed, 758 insertions(+)
>>>>    create mode 100644 lib/ovn-parallel-hmap.c
>>>>    create mode 100644 lib/ovn-parallel-hmap.h
>>>>
>>>> diff --git a/lib/automake.mk b/lib/automake.mk
>>>> index 250c7aefa..781be2109 100644
>>>> --- a/lib/automake.mk
>>>> +++ b/lib/automake.mk
>>>> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>>>>           lib/expr.c \
>>>>           lib/extend-table.h \
>>>>           lib/extend-table.c \
>>>> +       lib/ovn-parallel-hmap.h \
>>>> +       lib/ovn-parallel-hmap.c \
>>>>           lib/ip-mcast-index.c \
>>>>           lib/ip-mcast-index.h \
>>>>           lib/mcast-group-index.c \
>>>> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
>>>> new file mode 100644
>>>> index 000000000..e83ae23cb
>>>> --- /dev/null
>>>> +++ b/lib/ovn-parallel-hmap.c
>>>> @@ -0,0 +1,455 @@
>>>> +/*
>>>> + * Copyright (c) 2020 Red Hat, Inc.
>>>> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
>>>> + *
>>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>>> + * you may not use this file except in compliance with the License.
>>>> + * You may obtain a copy of the License at:
>>>> + *
>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>> + *
>>>> + * Unless required by applicable law or agreed to in writing, software
>>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>> + * See the License for the specific language governing permissions and
>>>> + * limitations under the License.
>>>> + */
>>>> +
>>>> +#include <config.h>
>>>> +#include <stdint.h>
>>>> +#include <string.h>
>>>> +#include <stdlib.h>
>>>> +#include <fcntl.h>
>>>> +#include <unistd.h>
>>>> +#include <errno.h>
>>>> +#include <semaphore.h>
>>>> +#include "fatal-signal.h"
>>>> +#include "util.h"
>>>> +#include "openvswitch/vlog.h"
>>>> +#include "openvswitch/hmap.h"
>>>> +#include "openvswitch/thread.h"
>>>> +#include "ovn-parallel-hmap.h"
>>>> +#include "ovs-atomic.h"
>>>> +#include "ovs-thread.h"
>>>> +#include "ovs-numa.h"
>>>> +#include "random.h"
>>>> +
>>>> +VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>>>> +
>>>> +#ifndef OVS_HAS_PARALLEL_HMAP
>>>> +
>>>> +#define WORKER_SEM_NAME "%x-%p-%x"
>>>> +#define MAIN_SEM_NAME "%x-%p-main"
>>>> +
>>>> +/* These are accessed under mutex inside add_worker_pool().
>>>> + * They do not need to be atomic.
>>>> + */
>>>> +
>>>> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
>>>> +static bool can_parallelize = false;
>>>> +
>>>> +/* This is set only in the process of exit and the set is
>>>> + * accompanied by a fence. It does not need to be atomic or be
>>>> + * accessed under a lock.
>>>> + */
>>>> +
>>>> +static bool workers_must_exit = false;
>>>> +
>>>> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
>>>> +
>>>> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>>>> +
>>>> +static int pool_size;
>>>> +
>>>> +static int sembase;
>>>> +
>>>> +static void worker_pool_hook(void *aux OVS_UNUSED);
>>>> +static void setup_worker_pools(bool force);
>>>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>>> +                               void *fin_result, void *result_frags,
>>>> +                               int index);
>>>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>>>> +                               void *fin_result, void *result_frags,
>>>> +                               int index);
>>>> +
>>>> +bool ovn_stop_parallel_processing(void)
>>>> +{
>>>> +    return workers_must_exit;
>>>> +}
>>>> +
>>>> +bool ovn_can_parallelize_hashes(bool force_parallel)
>>>> +{
>>>> +    bool test = false;
>>>> +
>>>> +    if (atomic_compare_exchange_strong(
>>>> +            &initial_pool_setup,
>>>> +            &test,
>>>> +            true)) {
>>>> +        ovs_mutex_lock(&init_mutex);
>>>> +        setup_worker_pools(force_parallel);
>>>> +        ovs_mutex_unlock(&init_mutex);
>>>> +    }
>>>> +    return can_parallelize;
>>>> +}
>>>> +
>>>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
>>>> +
>>>> +    struct worker_pool *new_pool = NULL;
>>>> +    struct worker_control *new_control;
>>>> +    bool test = false;
>>>> +    int i;
>>>> +    char sem_name[256];
>>>> +
>>>> +
>>>> +    /* Belt and braces - initialize the pool system just in case if
>>>> +     * if it is not yet initialized.
>>>> +     */
>>>> +
>>>> +    if (atomic_compare_exchange_strong(
>>>> +            &initial_pool_setup,
>>>> +            &test,
>>>> +            true)) {
>>>> +        ovs_mutex_lock(&init_mutex);
>>>> +        setup_worker_pools(false);
>>>> +        ovs_mutex_unlock(&init_mutex);
>>>> +    }
>>>> +
>>>> +    ovs_mutex_lock(&init_mutex);
>>>> +    if (can_parallelize) {
>>>> +        new_pool = xmalloc(sizeof(struct worker_pool));
>>>> +        new_pool->size = pool_size;
>>>> +        new_pool->controls = NULL;
>>>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>>>> +        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>>>> +        if (new_pool->done == SEM_FAILED) {
>>>> +            goto cleanup;
>>>> +        }
>>>> +
>>>> +        new_pool->controls =
>>>> +            xmalloc(sizeof(struct worker_control) * new_pool->size);
>>>> +
>>>> +        for (i = 0; i < new_pool->size; i++) {
>>>> +            new_control = &new_pool->controls[i];
>>>> +            new_control->id = i;
>>>> +            new_control->done = new_pool->done;
>>>> +            new_control->data = NULL;
>>>> +            ovs_mutex_init(&new_control->mutex);
>>>> +            new_control->finished = ATOMIC_VAR_INIT(false);
>>>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>>>> +            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>>>> +            if (new_control->fire == SEM_FAILED) {
>>>> +                goto cleanup;
>>>> +            }
>>>> +        }
>>>> +
>>>> +        for (i = 0; i < pool_size; i++) {
>>>> +            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
>>>> +        }
>>>> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
>>>> +    }
>>>> +    ovs_mutex_unlock(&init_mutex);
>>>> +    return new_pool;
>>>> +cleanup:
>>>> +
>>>> +    /* Something went wrong when opening semaphores. In this case
>>>> +     * it is better to shut off parallel procesing altogether
>>>> +     */
>>>> +
>>>> +    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
>>>> +    can_parallelize = false;
>>>> +    if (new_pool->controls) {
>>>> +        for (i = 0; i < new_pool->size; i++) {
>>>> +            if (new_pool->controls[i].fire != SEM_FAILED) {
>>>> +                sem_close(new_pool->controls[i].fire);
>>>> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>>>> +                sem_unlink(sem_name);
>>>> +                break; /* semaphores past this one are uninitialized */
>>>> +            }
>>>> +        }
>>>> +    }
>>>> +    if (new_pool->done != SEM_FAILED) {
>>>> +        sem_close(new_pool->done);
>>>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>>>> +        sem_unlink(sem_name);
>>>> +    }
>>>> +    ovs_mutex_unlock(&init_mutex);
>>>> +    return NULL;
>>>> +}
>>>> +
>>>> +
>>>> +/* Initializes 'hmap' as an empty hash table with mask N. */
>>>> +void
>>>> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
>>>> +{
>>>> +    size_t i;
>>>> +
>>>> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
>>>> +    hmap->one = NULL;
>>>> +    hmap->mask = mask;
>>>> +    hmap->n = 0;
>>>> +    for (i = 0; i <= hmap->mask; i++) {
>>>> +        hmap->buckets[i] = NULL;
>>>> +    }
>>>> +}
>>>> +
>>>> +/* Initializes 'hmap' as an empty hash table of size X.
>>>> + * Intended for use in parallel processing so that all
>>>> + * fragments used to store results in a parallel job
>>>> + * are the same size.
>>>> + */
>>>> +void
>>>> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
>>>> +{
>>>> +    size_t mask;
>>>> +    mask = size / 2;
>>>> +    mask |= mask >> 1;
>>>> +    mask |= mask >> 2;
>>>> +    mask |= mask >> 4;
>>>> +    mask |= mask >> 8;
>>>> +    mask |= mask >> 16;
>>>> +#if SIZE_MAX > UINT32_MAX
>>>> +    mask |= mask >> 32;
>>>> +#endif
>>>> +
>>>> +    /* If we need to dynamically allocate buckets we might as well allocate at
>>>> +     * least 4 of them. */
>>>> +    mask |= (mask & 1) << 1;
>>>> +
>>>> +    fast_hmap_init(hmap, mask);
>>>> +}
>>>> +
>>>> +/* Run a thread pool which uses a callback function to process results
>>>> + */
>>>> +
>>>> +void ovn_run_pool_callback(struct worker_pool *pool,
>>>> +                           void *fin_result, void *result_frags,
>>>> +                           void (*helper_func)(struct worker_pool *pool,
>>>> +                                               void *fin_result,
>>>> +                                               void *result_frags, int index))
>>>> +{
>>>> +    int index, completed;
>>>> +
>>>> +    /* Ensure that all worker threads see the same data as the
>>>> +     * main thread.
>>>> +     */
>>>> +
>>>> +    atomic_thread_fence(memory_order_acq_rel);
>>>> +
>>>> +    /* Start workers */
>>>> +
>>>> +    for (index = 0; index < pool->size; index++) {
>>>> +        sem_post(pool->controls[index].fire);
>>>> +    }
>>>> +
>>>> +    completed = 0;
>>>> +
>>>> +    do {
>>>> +        bool test;
>>>> +        /* Note - we do not loop on semaphore until it reaches
>>>> +         * zero, but on pool size/remaining workers.
>>>> +         * This is by design. If the inner loop can handle
>>>> +         * completion for more than one worker within an iteration
>>>> +         * it will do so to ensure no additional iterations and
>>>> +         * waits once all of them are done.
>>>> +         *
>>>> +         * This may result in us having an initial positive value
>>>> +         * of the semaphore when the pool is invoked the next time.
>>>> +         * This is harmless - the loop will spin up a couple of times
>>>> +         * doing nothing while the workers are processing their data
>>>> +         * slices.
>>>> +         */
>>>> +        wait_for_work_completion(pool);
>>>> +        for (index = 0; index < pool->size; index++) {
>>>> +            test = true;
>>>> +            /* If the worker has marked its data chunk as complete,
>>>> +             * invoke the helper function to combine the results of
>>>> +             * this worker into the main result.
>>>> +             *
>>>> +             * The worker must invoke an appropriate memory fence
>>>> +             * (most likely acq_rel) to ensure that the main thread
>>>> +             * sees all of the results produced by the worker.
>>>> +             */
>>>> +            if (atomic_compare_exchange_weak(
>>>> +                    &pool->controls[index].finished,
>>>> +                    &test,
>>>> +                    false)) {
>>>> +                if (helper_func) {
>>>> +                    (helper_func)(pool, fin_result, result_frags, index);
>>>> +                }
>>>> +                completed++;
>>>> +                pool->controls[index].data = NULL;
>>>> +            }
>>>> +        }
>>>> +    } while (completed < pool->size);
>>>> +}
>>>> +
>>>> +/* Run a thread pool - basic, does not do results processing.
>>>> + */
>>>> +
>>>> +void ovn_run_pool(struct worker_pool *pool)
>>>> +{
>>>> +    run_pool_callback(pool, NULL, NULL, NULL);
>>>> +}
>>>> +
>>>> +/* Brute force merge of a hashmap into another hashmap.
>>>> + * Intended for use in parallel processing. The destination
>>>> + * hashmap MUST be the same size as the one being merged.
>>>> + *
>>>> + * This can be achieved by pre-allocating them to correct size
>>>> + * and using hmap_insert_fast() instead of hmap_insert()
>>>> + */
>>>> +
>>>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
>>>> +{
>>>> +    size_t i;
>>>> +
>>>> +    ovs_assert(inc->mask == dest->mask);
>>>> +
>>>> +    if (!inc->n) {
>>>> +        /* Request to merge an empty frag, nothing to do */
>>>> +        return;
>>>> +    }
>>>> +
>>>> +    for (i = 0; i <= dest->mask; i++) {
>>>> +        struct hmap_node **dest_bucket = &dest->buckets[i];
>>>> +        struct hmap_node **inc_bucket = &inc->buckets[i];
>>>> +        if (*inc_bucket != NULL) {
>>>> +            struct hmap_node *last_node = *inc_bucket;
>>>> +            while (last_node->next != NULL) {
>>>> +                last_node = last_node->next;
>>>> +            }
>>>> +            last_node->next = *dest_bucket;
>>>> +            *dest_bucket = *inc_bucket;
>>>> +            *inc_bucket = NULL;
>>>> +        }
>>>> +    }
>>>> +    dest->n += inc->n;
>>>> +    inc->n = 0;
>>>> +}
>>>> +
>>>> +/* Run a thread pool which gathers results in an array
>>>> + * of hashes. Merge results.
>>>> + */
>>>> +
>>>> +
>>>> +void ovn_run_pool_hash(
>>>> +        struct worker_pool *pool,
>>>> +        struct hmap *result,
>>>> +        struct hmap *result_frags)
>>>> +{
>>>> +    run_pool_callback(pool, result, result_frags, merge_hash_results);
>>>> +}
>>>> +
>>>> +/* Run a thread pool which gathers results in an array of lists.
>>>> + * Merge results.
>>>> + */
>>>> +void ovn_run_pool_list(
>>>> +        struct worker_pool *pool,
>>>> +        struct ovs_list *result,
>>>> +        struct ovs_list *result_frags)
>>>> +{
>>>> +    run_pool_callback(pool, result, result_frags, merge_list_results);
>>>> +}
>>>> +
>>>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
>>>> +{
>>>> +    int i;
>>>> +    if (hrl->mask != lflows->mask) {
>>>> +        if (hrl->row_locks) {
>>>> +            free(hrl->row_locks);
>>>> +        }
>>>> +        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
>>>> +        hrl->mask = lflows->mask;
>>>> +        for (i = 0; i <= lflows->mask; i++) {
>>>> +            ovs_mutex_init(&hrl->row_locks[i]);
>>>> +        }
>>>> +    }
>>>> +}
>>>> +
>>>> +static void worker_pool_hook(void *aux OVS_UNUSED) {
>>>> +    int i;
>>>> +    static struct worker_pool *pool;
>>>> +    char sem_name[256];
>>>> +
>>>> +    workers_must_exit = true;
>>>> +
>>>> +    /* All workers must honour the must_exit flag and check for it regularly.
>>>> +     * We can make it atomic and check it via atomics in workers, but that
>>>> +     * is not really necessary as it is set just once - when the program
>>>> +     * terminates. So we use a fence which is invoked before exiting instead.
>>>> +     */
>>>> +    atomic_thread_fence(memory_order_acq_rel);
>>>> +
>>>> +    /* Wake up the workers after the must_exit flag has been set */
>>>> +
>>>> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
>>>> +        for (i = 0; i < pool->size ; i++) {
>>>> +            sem_post(pool->controls[i].fire);
>>>> +        }
>>>> +        for (i = 0; i < pool->size ; i++) {
>>>> +            sem_close(pool->controls[i].fire);
>>>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>>>> +            sem_unlink(sem_name);
>>>> +        }
>>>> +        sem_close(pool->done);
>>>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
>>>> +        sem_unlink(sem_name);
>>>> +    }
>>>> +}
>>>> +
>>>> +static void setup_worker_pools(bool force) {
>>>> +    int cores, nodes;
>>>> +
>>>> +    nodes = ovs_numa_get_n_numas();
>>>> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
>>>> +        nodes = 1;
>>>> +    }
>>>> +    cores = ovs_numa_get_n_cores();
>>>> +
>>>> +    /* If there is no NUMA config, use 4 cores.
>>>> +     * If there is NUMA config use half the cores on
>>>> +     * one node so that the OS does not start pushing
>>>> +     * threads to other nodes.
>>>> +     */
>>>> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
>>>> +        /* If there is no NUMA we can try the ovs-threads routine.
>>>> +         * It falls back to sysconf and/or affinity mask.
>>>> +         */
>>>> +        cores = count_cpu_cores();
>>>> +        pool_size = cores;
>>>> +    } else {
>>>> +        pool_size = cores / nodes;
>>>> +    }
>>>> +    if ((pool_size < 4) && force) {
>>>> +        pool_size = 4;
>>>> +    }
>>>> +    can_parallelize = (pool_size >= 3);
>>>> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>>>> +    sembase = random_uint32();
>>>> +}
>>>> +
>>>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>>> +                               void *fin_result, void *result_frags,
>>>> +                               int index)
>>>> +{
>>>> +    struct ovs_list *result = (struct ovs_list *)fin_result;
>>>> +    struct ovs_list *res_frags = (struct ovs_list *)result_frags;
>>>> +
>>>> +    if (!ovs_list_is_empty(&res_frags[index])) {
>>>> +        ovs_list_splice(result->next,
>>>> +                ovs_list_front(&res_frags[index]), &res_frags[index]);
>>>> +    }
>>>> +}
>>>> +
>>>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>>>> +                               void *fin_result, void *result_frags,
>>>> +                               int index)
>>>> +{
>>>> +    struct hmap *result = (struct hmap *)fin_result;
>>>> +    struct hmap *res_frags = (struct hmap *)result_frags;
>>>> +
>>>> +    fast_hmap_merge(result, &res_frags[index]);
>>>> +    hmap_destroy(&res_frags[index]);
>>>> +}
>>>> +
>>>> +#endif
>>>> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
>>>> new file mode 100644
>>>> index 000000000..8db61eaba
>>>> --- /dev/null
>>>> +++ b/lib/ovn-parallel-hmap.h
>>>> @@ -0,0 +1,301 @@
>>>> +/*
>>>> + * Copyright (c) 2020 Red Hat, Inc.
>>>> + *
>>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>>> + * you may not use this file except in compliance with the License.
>>>> + * You may obtain a copy of the License at:
>>>> + *
>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>> + *
>>>> + * Unless required by applicable law or agreed to in writing, software
>>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>> + * See the License for the specific language governing permissions and
>>>> + * limitations under the License.
>>>> + */
>>>> +
>>>> +#ifndef OVN_PARALLEL_HMAP
>>>> +#define OVN_PARALLEL_HMAP 1
>>>> +
>>>> +/* if the parallel macros are defined by hmap.h or any other ovs define
>>>> + * we skip over the ovn specific definitions.
>>>> + */
>>>> +
>>>> +#ifdef  __cplusplus
>>>> +extern "C" {
>>>> +#endif
>>>> +
>>>> +#include <stdbool.h>
>>>> +#include <stdlib.h>
>>>> +#include <semaphore.h>
>>>> +#include <errno.h>
>>>> +#include "openvswitch/util.h"
>>>> +#include "openvswitch/hmap.h"
>>>> +#include "openvswitch/thread.h"
>>>> +#include "ovs-atomic.h"
>>>> +
>>>> +/* Process this include only if OVS does not supply parallel definitions
>>>> + */
>>>> +
>>>> +#ifdef OVS_HAS_PARALLEL_HMAP
>>>> +
>>>> +#include "parallel-hmap.h"
>>>> +
>>>> +#else
>>>> +
>>>> +
>>>> +#ifdef __clang__
>>>> +#pragma clang diagnostic push
>>>> +#pragma clang diagnostic ignored "-Wthread-safety"
>>>> +#endif
>>>> +
>>>> +
>>>> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
>>>> + * of parallel processing.
>>>> + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE
>>>> + * and will iterate hash buckets ThreadID, ThreadID + step,
>>>> + * ThreadID + step * 2, etc. The actual macro accepts
>>>> + * ThreadID + step * i as the JOBID parameter.
>>>> + */
>>>> +
>>>> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
>>>> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
>>>> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
>>>> +       || ((NODE = NULL), false); \
>>>> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
>>>> +
>>>> +/* We do not have a SAFE version of the macro, because the hash size is not
>>>> + * atomic and hash removal operations would need to be wrapped with
>>>> + * locks. This will defeat most of the benefits from doing anything in
>>>> + * parallel.
>>>> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
>>>> + * each thread should store them in a temporary list result instead, merging
>>>> + * the lists into a combined result at the end */
>>>> +
>>>> +/* Work "Handle" */
>>>> +
>>>> +struct worker_control {
>>>> +    int id; /* Used as a modulo when iterating over a hash. */
>>>> +    atomic_bool finished; /* Set to true after achunk of work is complete. */
>>>> +    sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
>>>> +    sem_t *done; /* Work completion semaphore - sem_post on completion. */
>>>> +    struct ovs_mutex mutex; /* Guards the data. */
>>>> +    void *data; /* Pointer to data to be processed. */
>>>> +    void *workload; /* back-pointer to the worker pool structure. */
>>>> +};
>>>> +
>>>> +struct worker_pool {
>>>> +    int size;   /* Number of threads in the pool. */
>>>> +    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
>>>> +    struct worker_control *controls; /* "Handles" in this pool. */
>>>> +    sem_t *done; /* Work completion semaphorew. */
>>>> +};
>>>> +
>>>> +/* Add a worker pool for thread function start() which expects a pointer to
>>>> + * a worker_control structure as an argument. */
>>>> +
>>>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
>>>> +
>>>> +/* Setting this to true will make all processing threads exit */
>>>> +
>>>> +bool ovn_stop_parallel_processing(void);
>>>> +
>>>> +/* Build a hmap pre-sized for size elements */
>>>> +
>>>> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
>>>> +
>>>> +/* Build a hmap with a mask equals to size */
>>>> +
>>>> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
>>>> +
>>>> +/* Brute-force merge a hmap into hmap.
>>>> + * Dest and inc have to have the same mask. The merge is performed
>>>> + * by extending the element list for bucket N in the dest hmap with the list
>>>> + * from bucket N in inc.
>>>> + */
>>>> +
>>>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
>>>> +
>>>> +/* Run a pool, without any default processing of results.
>>>> + */
>>>> +
>>>> +void ovn_run_pool(struct worker_pool *pool);
>>>> +
>>>> +/* Run a pool, merge results from hash frags into a final hash result.
>>>> + * The hash frags must be pre-sized to the same size.
>>>> + */
>>>> +
>>>> +void ovn_run_pool_hash(struct worker_pool *pool,
>>>> +                       struct hmap *result, struct hmap *result_frags);
>>>> +/* Run a pool, merge results from list frags into a final list result.
>>>> + */
>>>> +
>>>> +void ovn_run_pool_list(struct worker_pool *pool,
>>>> +                       struct ovs_list *result, struct ovs_list *result_frags);
>>>> +
>>>> +/* Run a pool, call a callback function to perform processing of results.
>>>> + */
>>>> +
>>>> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
>>>> +                    void *result_frags,
>>>> +                    void (*helper_func)(struct worker_pool *pool,
>>>> +                        void *fin_result, void *result_frags, int index));
>>>> +
>>>> +
>>>> +/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
>>>> + * would land, or a null pointer if that bucket is empty. */
>>>> +
>>>> +static inline struct hmap_node *
>>>> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
>>>> +{
>>>> +    return hmap->buckets[num];
>>>> +}
>>>> +
>>>> +static inline struct hmap_node *
>>>> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
>>>> +{
>>>> +    size_t i;
>>>> +    for (i = start; i <= hmap->mask; i+= pool_size) {
>>>> +        struct hmap_node *node = hmap->buckets[i];
>>>> +        if (node) {
>>>> +            return node;
>>>> +        }
>>>> +    }
>>>> +    return NULL;
>>>> +}
>>>> +
>>>> +/* Returns the first node in 'hmap', as expected by thread with job_id
>>>> + * for parallel processing in arbitrary order, or a null pointer if
>>>> + * the slice of 'hmap' for that job_id is empty. */
>>>> +static inline struct hmap_node *
>>>> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
>>>> +{
>>>> +    return parallel_hmap_next__(hmap, job_id, pool_size);
>>>> +}
>>>> +
>>>> +/* Returns the next node in the slice of 'hmap' following 'node',
>>>> + * in arbitrary order, or a * null pointer if 'node' is the last node in
>>>> + * the 'hmap' slice.
>>>> + *
>>>> + */
>>>> +static inline struct hmap_node *
>>>> +parallel_hmap_next(const struct hmap *hmap,
>>>> +                   const struct hmap_node *node, ssize_t pool_size)
>>>> +{
>>>> +    return (node->next
>>>> +            ? node->next
>>>> +            : parallel_hmap_next__(hmap,
>>>> +                (node->hash & hmap->mask) + pool_size, pool_size));
>>>> +}
>>>> +
>>>> +static inline void post_completed_work(struct worker_control *control)
>>>> +{
>>>> +    atomic_thread_fence(memory_order_acq_rel);
>>>> +    atomic_store_relaxed(&control->finished, true);
>>>> +    sem_post(control->done);
>>>> +}
>>>> +
>>>> +static inline void wait_for_work(struct worker_control *control)
>>>> +{
>>>> +    int ret;
>>>> +
>>>> +    do {
>>>> +        ret = sem_wait(control->fire);
>>>> +    } while ((ret == -1) && (errno == EINTR));
>>>> +    ovs_assert(ret == 0);
>>>> +}
>>>> +static inline void wait_for_work_completion(struct worker_pool *pool)
>>>> +{
>>>> +    int ret;
>>>> +
>>>> +    do {
>>>> +        ret = sem_wait(pool->done);
>>>> +    } while ((ret == -1) && (errno == EINTR));
>>>> +    ovs_assert(ret == 0);
>>>> +}
>>>> +
>>>> +
>>>> +/* Hash per-row locking support - to be used only in conjunction
>>>> + * with fast hash inserts. Normal hash inserts may resize the hash
>>>> + * rendering the locking invalid.
>>>> + */
>>>> +
>>>> +struct hashrow_locks {
>>>> +    ssize_t mask;
>>>> +    struct ovs_mutex *row_locks;
>>>> +};
>>>> +
>>>> +/* Update an hash row locks structure to match the current hash size */
>>>> +
>>>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
>>>> +
>>>> +/* Lock a hash row */
>>>> +
>>>> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
>>>> +{
>>>> +    ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]);
>>>> +}
>>>> +
>>>> +/* Unlock a hash row */
>>>> +
>>>> +static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
>>>> +{
>>>> +    ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]);
>>>> +}
>>>> +/* Init the row locks structure */
>>>> +
>>>> +static inline void init_hash_row_locks(struct hashrow_locks *hrl)
>>>> +{
>>>> +    hrl->mask = 0;
>>>> +    hrl->row_locks = NULL;
>>>> +}
>>>> +
>>>> +bool ovn_can_parallelize_hashes(bool force_parallel);
>>>> +
>>>> +/* Use the OVN library functions for stuff which OVS has not defined
>>>> + * If OVS has defined these, they will still compile using the OVN
>>>> + * local names, but will be dropped by the linker in favour of the OVS
>>>> + * supplied functions.
>>>> + */
>>>> +
>>>> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
>>>> +
>>>> +#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
>>>> +
>>>> +#define stop_parallel_processing() ovn_stop_parallel_processing()
>>>> +
>>>> +#define add_worker_pool(start) ovn_add_worker_pool(start)
>>>> +
>>>> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
>>>> +
>>>> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
>>>> +
>>>> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
>>>> +
>>>> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
>>>> +
>>>> +#define ovn_run_pool(pool) ovn_run_pool(pool)
>>>> +
>>>> +#define run_pool_hash(pool, result, result_frags) \
>>>> +    ovn_run_pool_hash(pool, result, result_frags)
>>>> +
>>>> +#define run_pool_list(pool, result, result_frags) \
>>>> +    ovn_run_pool_list(pool, result, result_frags)
>>>> +
>>>> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
>>>> +    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
>>>> +
>>>> +
>>>> +
>>>> +#ifdef __clang__
>>>> +#pragma clang diagnostic pop
>>>> +#endif
>>>> +
>>>> +#endif
>>>> +
>>>> +#ifdef  __cplusplus
>>>> +}
>>>> +#endif
>>>> +
>>>> +
>>>> +#endif /* lib/fasthmap.h */
>>>> --
>>>> 2.20.1
>>>>
>>>> _______________________________________________
>>>> dev mailing list
>>>> dev at openvswitch.org
>>>> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>>>>
>> --
>> Anton R. Ivanov
>> Cambridgegreys Limited. Registered in England. Company Number 10273661
>> https://www.cambridgegreys.com/
>> _______________________________________________
>> dev mailing list
>> dev at openvswitch.org
>> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>>
-- 
Anton R. Ivanov
Cambridgegreys Limited. Registered in England. Company Number 10273661
https://www.cambridgegreys.com/



More information about the dev mailing list