[ovs-dev] [OVN Patch v14 1/3] ovn-libs: Add support for parallel processing

Anton Ivanov anton.ivanov at cambridgegreys.com
Thu Feb 25 14:33:01 UTC 2021


On 25/02/2021 14:29, Numan Siddique wrote:
> On Thu, Feb 25, 2021 at 7:33 PM Anton Ivanov
> <anton.ivanov at cambridgegreys.com> wrote:
>> Found the most likely culprit.
>>
>> This is similar to this: https://bugzilla.redhat.com/show_bug.cgi?id=663584
>> this: https://bugzilla.redhat.com/show_bug.cgi?id=1554955
>> and god knows how many others.
>>
>> Selinux is "securing" your semaphores.
> I disabled selinux (permissive) and I still see the same behavior

I have set up a couple of VMs to run it with different CPU counts/settings and it always passes, with no core dumps.
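
For anyone who wants to rule out the semaphore layer on an affected machine, a minimal standalone check could look like the sketch below. It is not part of the patch: check_named_semaphore() and the name passed to it are hypothetical. It only uses the same POSIX calls the patch uses (sem_open() with O_CREAT and S_IRWXU, sem_post(), sem_wait(), sem_close(), sem_unlink()).

```c
#include <errno.h>
#include <fcntl.h>
#include <semaphore.h>
#include <stdio.h>
#include <sys/stat.h>

/* Hypothetical helper, not part of the patch: create one named semaphore,
 * post and wait on it once, then remove it.  Returns 0 on success or the
 * errno value of the first call that failed. */
static int
check_named_semaphore(const char *name)
{
    int err = 0;
    sem_t *sem = sem_open(name, O_CREAT, S_IRWXU, 0);

    if (sem == SEM_FAILED) {
        err = errno;
        fprintf(stderr, "sem_open(%s) failed: errno %d\n", name, err);
        return err;
    }
    /* The post makes the value 1, so the wait returns without blocking. */
    if (sem_post(sem) || sem_wait(sem)) {
        err = errno;
    }
    sem_close(sem);
    sem_unlink(name);
    return err;
}
```

Calling check_named_semaphore("/ovn-sem-check") and getting a non-zero result would point at the environment rather than at the pool logic.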

A.

>
>
>> A.
>>
>> On 25/02/2021 13:27, Anton Ivanov wrote:
>>> On 25/02/2021 12:41, Numan Siddique wrote:
>>>> On Fri, Feb 12, 2021 at 8:20 PM Anton Ivanov
>>>> <anton.ivanov at cambridgegreys.com> wrote:
>>>>> This adds a set of functions and macros intended to process
>>>>> hashes in parallel.
>>>>>
>>>>> The principles of operation are documented in the fasthmap.h
>>>>>
>>>>> If these one day go into the OVS tree, the OVS tree versions
>>>>> would be used in preference.
>>>>>
>>>>> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>>>> Hi Anton,
>>>>
>>>> I still see problems with this patch set.
>>>>
>>>> If I apply the first 2 patches and run the tests, most of the test
>>>> cases fail or hang.
>>>>
>>>> When configured with gcc and address sanitizer,  the test case fails -
>>>> 45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple
>>>> localnet ports with same tags (ovn.at:2970): FAILED
>>>> (ovs-macros.at:219) and I see the below in the testsuite.log
>>>>
>>>> *********
>>>> clean up OVN
>>>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
>>>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
>>>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>>>> ovn.at:3091: wait succeeded immediately
>>>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
>>>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
>>>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>>>> ovn.at:3091: wait succeeded immediately
>>>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid
>>>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit
>>>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>>>> ovn.at:3091: wait succeeded after 1 seconds
>>>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid
>>>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit
>>>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>>>> ovn.at:3091: wait succeeded quickly
>>>>
>>>> main: clean up vswitch
>>>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovs-vswitchd.pid
>>>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovs-vswitchd exit --cleanup
>>>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>>>> ovn.at:3091: wait succeeded quickly
>>>> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid
>>>> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit
>>>> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null...
>>>> ovn.at:3091: wait succeeded immediately
>>>> Address Sanitizer reported errors in: asan.2986645
>>>> =================================================================
>>>> ==2986645==ERROR: AddressSanitizer: SEGV on unknown address
>>>> 0x14f45be57000 (pc 0x14f45f424212 bp 0x000000000000 sp 0x14f45b1fda80
>>>> T2)
>>>> ==2986645==The signal is caused by a READ memory access.
>>>> ../../tests/ovs-macros.at:219: hard failure
>>>> 45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple
>>>> localnet ports with same tags (ovn.at:2970): FAILED
>>>> (ovs-macros.at:219)
>>>>
>>>> ******
>>>>
>>>> The same is observed for many test cases.
>>>>
>>>> When I configure clang and run the tests, all the tests pass, but I
>>>> see a lot of coredumps.
>>>> I think I had reported this earlier too.
>>> I have absolutely no idea how you get this. Seriously.
>>>
>>> I have been unable to reproduce this
>>>
>>> The only thing that comes to mind is to "fuzz" the semaphore creation and introduce random failures there to see if the error logic works. I can't see any logical problems in it via code inspection.
>>>
>>>> Here is the backtrace
>>>> -----
>>>> #0  0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from
>>>> /lib64/libpthread.so.0
>>>> [Current thread is 1 (Thread 0x1528ac732640 (LWP 2907214))]
>>>> Missing separate debuginfos, use: dnf debuginfo-install
>>>> glibc-2.32-3.fc33.x86_64 libcap-ng-0.8-1.fc33.x86_64
>>>> libevent-2.1.8-10.fc33.x86_64 openssl-libs-1.1.1i-1.fc33.x86_64
>>>> python3-libs-3.9.1-2.fc33.x86_64 unbound-libs-1.10.1-4.fc33.x86_64
>>>> zlib-1.2.11-23.fc33.x86_64
>>>> (gdb) bt
>>>> #0  0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from
>>>> /lib64/libpthread.so.0
>>>> #1  0x0000000000421d84 in wait_for_work (control=0xf5c0e0) at
>>>> ../lib/ovn-parallel-hmap.h:193
>>>> #2  build_lflows_thread (arg=0xf5c0e0) at ../northd/ovn-northd.c:11807
>>>> #3  0x000000000049c8b2 in ovsthread_wrapper (aux_=<optimized out>) at
>>>> ../lib/ovs-thread.c:383
>>>> #4  0x00001528ad3713f9 in start_thread () from /lib64/libpthread.so.0
>>>> #5  0x00001528ad00e903 in clone () from /lib64/libc.so.6
>>>> ----
>>>>
>>>>
>>>> I'm sorry but something is not right. Instead of using semaphores, why
>>>> can't we use
>>>> 'struct latch' for each worker and use it to synchronize between the main thread
>>>> and the workers ?
>>> We can't because this will conflict with the main poll loop.
>>>
>>> You need to rewrite the entire northd processing logic and the IO logic to use latch here. Or establish parallel logic - part of the joy of using "thread once" to do poll magic.
>>>
>>> None of these are a realistic option. I'd rather try to understand exactly what happens on your setup and what makes a sem_open() fail and why that is not handled properly by the error checking code.
>>>
>>> Can you send the results of "sysctl kernel.sem" please? That's the only way I know to limit semaphores and the usual limits on Linux are in the 1G range.
> Here's the output
>
> sysctl kernel.sem
> kernel.sem = 32000 1024000000 500 32000
>
> I ran the same tests on the rhel 8 system, I don't see any crashes.
>
>
> Thanks
> Numan
>>>> The usage of the function - can_parallelize_hashes() in ovn-northd.c is very
>>>> confusing to me.
>>>>
>>>> I see that ovn_can_parallelize_hashes() calls setup_worker_pools() only once
>>>> and for the subsequent calls, this function will be a no-op (due to
>>>> atomic_compare_exchange_strong()).
>>>>
>>>> Since this function is called before the workers are started, what is
>>>> the need to use
>>>> atomic_compare_exchange_strong() ?
>>> We have ended up there over time. There was a point where there was no option and no initial invocation, and we were invoking the set-up on first processing.
>>>
>>> In fact that is how it is invoked in the OVS codebase port for the parallelized monitors. I would actually like to keep this as an option.
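
To make the behaviour under discussion concrete, here is a minimal sketch of the "set up on first call" pattern using C11 <stdatomic.h> rather than ovs-atomic.h. All names (setup_done, setup_calls, forced, can_parallelize_sketch) are hypothetical. It shows both points raised above: only the first caller runs the set-up, and a different argument on a later call has no effect.

```c
#include <stdatomic.h>
#include <stdbool.h>

static atomic_bool setup_done = false;  /* Hypothetical one-shot flag. */
static int setup_calls;                 /* Counts how often set-up ran. */
static bool forced;                     /* Value captured by the set-up. */

/* Sketch only: runs the set-up for the first caller and returns the
 * value captured at that time on every call. */
static bool
can_parallelize_sketch(bool force)
{
    bool expected = false;

    /* Succeeds exactly once: when setup_done is still false. */
    if (atomic_compare_exchange_strong(&setup_done, &expected, true)) {
        setup_calls++;
        forced = force;
    }
    return forced;
}
```

Calling can_parallelize_sketch(false) and then can_parallelize_sketch(true) returns false both times, which is the run-time toggle limitation described in the review.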
>>>
>>>> Let's say we want to add another config option -
>>>> force_northd_parallel, and if the user
>>>> toggles the value between the runs, then the below code in
>>>> ovnnb_db_run() in patch 3
>>>>
>>>> ---
>>>> use_parallel_build = smap_get_bool(&nb->options,
>>>> "use_parallel_build", false) &&
>>>> ovn_can_parallelize_hashes(false);
>>>> --
>>>>    will have no effect since setup_worker_pools() is not called later
>>>> once the atomic
>>>> bool initial_pool_setup is set to true.
>>>>
>>>> I think we should provide the option to toggle this value at run time.
>>>> Also the patch 3 should
>>>> add an option to configure the force_parallel.
>>>>
>>>>> ---
>>>>>    lib/automake.mk         |   2 +
>>>>>    lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
>>>>>    lib/ovn-parallel-hmap.h | 285 +++++++++++++++++++++++++
>>>>>    3 files changed, 742 insertions(+)
>>>>>    create mode 100644 lib/ovn-parallel-hmap.c
>>>>>    create mode 100644 lib/ovn-parallel-hmap.h
>>>>>
>>>>> diff --git a/lib/automake.mk b/lib/automake.mk
>>>>> index 250c7aefa..781be2109 100644
>>>>> --- a/lib/automake.mk
>>>>> +++ b/lib/automake.mk
>>>>> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>>>>>           lib/expr.c \
>>>>>           lib/extend-table.h \
>>>>>           lib/extend-table.c \
>>>>> +       lib/ovn-parallel-hmap.h \
>>>>> +       lib/ovn-parallel-hmap.c \
>>>>>           lib/ip-mcast-index.c \
>>>>>           lib/ip-mcast-index.h \
>>>>>           lib/mcast-group-index.c \
>>>>> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
>>>>> new file mode 100644
>>>>> index 000000000..06aa95aba
>>>>> --- /dev/null
>>>>> +++ b/lib/ovn-parallel-hmap.c
>>>>> @@ -0,0 +1,455 @@
>>>>> +/*
>>>>> + * Copyright (c) 2020 Red Hat, Inc.
>>>>> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
>>>>> + *
>>>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>>>> + * you may not use this file except in compliance with the License.
>>>>> + * You may obtain a copy of the License at:
>>>>> + *
>>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>>> + *
>>>>> + * Unless required by applicable law or agreed to in writing, software
>>>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>>> + * See the License for the specific language governing permissions and
>>>>> + * limitations under the License.
>>>>> + */
>>>>> +
>>>>> +#include <config.h>
>>>>> +#include <stdint.h>
>>>>> +#include <string.h>
>>>>> +#include <stdlib.h>
>>>>> +#include <fcntl.h>
>>>>> +#include <unistd.h>
>>>>> +#include <errno.h>
>>>>> +#include <semaphore.h>
>>>>> +#include "fatal-signal.h"
>>>>> +#include "util.h"
>>>>> +#include "openvswitch/vlog.h"
>>>>> +#include "openvswitch/hmap.h"
>>>>> +#include "openvswitch/thread.h"
>>>>> +#include "ovn-parallel-hmap.h"
>>>>> +#include "ovs-atomic.h"
>>>>> +#include "ovs-thread.h"
>>>>> +#include "ovs-numa.h"
>>>>> +#include "random.h"
>>>>> +
>>>>> +VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>>>>> +
>>>>> +#ifndef OVS_HAS_PARALLEL_HMAP
>>>>> +
>>>>> +#define WORKER_SEM_NAME "%x-%p-%x"
>>>>> +#define MAIN_SEM_NAME "%x-%p-main"
>>>>> +
>>>>> +/* These are accessed under mutex inside add_worker_pool().
>>>>> + * They do not need to be atomic.
>>>>> + */
>>>>> +
>>>>> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
>>>>> +static bool can_parallelize = false;
>>>>> +
>>>>> +/* This is set only in the process of exit and the set is
>>>>> + * accompanied by a fence. It does not need to be atomic or be
>>>>> + * accessed under a lock.
>>>>> + */
>>>>> +
>>>>> +static bool workers_must_exit = false;
>>>>> +
>>>>> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
>>>>> +
>>>>> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>>>>> +
>>>>> +static int pool_size;
>>>>> +
>>>>> +static int sembase;
>>>>> +
>>>>> +static void worker_pool_hook(void *aux OVS_UNUSED);
>>>>> +static void setup_worker_pools(bool force);
>>>>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>>>> +                               void *fin_result, void *result_frags,
>>>>> +                               int index);
>>>>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>>>>> +                               void *fin_result, void *result_frags,
>>>>> +                               int index);
>>>>> +
>>>>> +bool ovn_stop_parallel_processing(void)
>>>>> +{
>>>>> +    return workers_must_exit;
>>>>> +}
>>>>> +
>>>>> +bool ovn_can_parallelize_hashes(bool force_parallel)
>>>>> +{
>>>>> +    bool test = false;
>>>>> +
>>>>> +    if (atomic_compare_exchange_strong(
>>>>> +            &initial_pool_setup,
>>>>> +            &test,
>>>>> +            true)) {
>>>>> +        ovs_mutex_lock(&init_mutex);
>>>>> +        setup_worker_pools(force_parallel);
>>>>> +        ovs_mutex_unlock(&init_mutex);
>>>>> +    }
>>>>> +    return can_parallelize;
>>>>> +}
>>>>> +
>>>>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
>>>>> +
>>>>> +    struct worker_pool *new_pool = NULL;
>>>>> +    struct worker_control *new_control;
>>>>> +    bool test = false;
>>>>> +    int i;
>>>>> +    char sem_name[256];
>>>>> +
>>>>> +
>>>>> +    /* Belt and braces - initialize the pool system just in case
>>>>> +     * it is not yet initialized.
>>>>> +     */
>>>>> +
>>>>> +    if (atomic_compare_exchange_strong(
>>>>> +            &initial_pool_setup,
>>>>> +            &test,
>>>>> +            true)) {
>>>>> +        ovs_mutex_lock(&init_mutex);
>>>>> +        setup_worker_pools(false);
>>>>> +        ovs_mutex_unlock(&init_mutex);
>>>>> +    }
>>>>> +
>>>>> +    ovs_mutex_lock(&init_mutex);
>>>>> +    if (can_parallelize) {
>>>>> +        new_pool = xmalloc(sizeof(struct worker_pool));
>>>>> +        new_pool->size = pool_size;
>>>>> +        new_pool->controls = NULL;
>>>>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>>>>> +        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>>>>> +        if (new_pool->done == SEM_FAILED) {
>>>>> +            goto cleanup;
>>>>> +        }
>>>>> +
>>>>> +        new_pool->controls =
>>>>> +            xmalloc(sizeof(struct worker_control) * new_pool->size);
>>>>> +
>>>>> +        for (i = 0; i < new_pool->size; i++) {
>>>>> +            new_control = &new_pool->controls[i];
>>>>> +            new_control->id = i;
>>>>> +            new_control->done = new_pool->done;
>>>>> +            new_control->data = NULL;
>>>>> +            ovs_mutex_init(&new_control->mutex);
>>>>> +            new_control->finished = ATOMIC_VAR_INIT(false);
>>>>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>>>>> +            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>>>>> +            if (new_control->fire == SEM_FAILED) {
>>>>> +                goto cleanup;
>>>>> +            }
>>>>> +        }
>>>>> +
>>>>> +        for (i = 0; i < pool_size; i++) {
>>>>> +            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
>>>>> +        }
>>>>> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
>>>>> +    }
>>>>> +    ovs_mutex_unlock(&init_mutex);
>>>>> +    return new_pool;
>>>>> +cleanup:
>>>>> +
>>>>> +    /* Something went wrong when opening semaphores. In this case
>>>>> +     * it is better to shut off parallel processing altogether.
>>>>> +     */
>>>>> +
>>>>> +    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
>>>>> +    can_parallelize = false;
>>>>> +    if (new_pool->controls) {
>>>>> +        for (i = 0; i < new_pool->size; i++) {
>>>>> +            if (new_pool->controls[i].fire != SEM_FAILED) {
>>>>> +                sem_close(new_pool->controls[i].fire);
>>>>> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>>>>> +                sem_unlink(sem_name);
>>>>> +                break; /* semaphores past this one are uninitialized */
>>>>> +            }
>>>>> +        }
>>>>> +    }
>>>>> +    if (new_pool->done != SEM_FAILED) {
>>>>> +        sem_close(new_pool->done);
>>>>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>>>>> +        sem_unlink(sem_name);
>>>>> +    }
>>>>> +    ovs_mutex_unlock(&init_mutex);
>>>>> +    return NULL;
>>>>> +}
>>>>> +
>>>>> +
>>>>> +/* Initializes 'hmap' as an empty hash table with mask N. */
>>>>> +void
>>>>> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
>>>>> +{
>>>>> +    size_t i;
>>>>> +
>>>>> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
>>>>> +    hmap->one = NULL;
>>>>> +    hmap->mask = mask;
>>>>> +    hmap->n = 0;
>>>>> +    for (i = 0; i <= hmap->mask; i++) {
>>>>> +        hmap->buckets[i] = NULL;
>>>>> +    }
>>>>> +}
>>>>> +
>>>>> +/* Initializes 'hmap' as an empty hash table of size X.
>>>>> + * Intended for use in parallel processing so that all
>>>>> + * fragments used to store results in a parallel job
>>>>> + * are the same size.
>>>>> + */
>>>>> +void
>>>>> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
>>>>> +{
>>>>> +    size_t mask;
>>>>> +    mask = size / 2;
>>>>> +    mask |= mask >> 1;
>>>>> +    mask |= mask >> 2;
>>>>> +    mask |= mask >> 4;
>>>>> +    mask |= mask >> 8;
>>>>> +    mask |= mask >> 16;
>>>>> +#if SIZE_MAX > UINT32_MAX
>>>>> +    mask |= mask >> 32;
>>>>> +#endif
>>>>> +
>>>>> +    /* If we need to dynamically allocate buckets we might as well allocate at
>>>>> +     * least 4 of them. */
>>>>> +    mask |= (mask & 1) << 1;
>>>>> +
>>>>> +    fast_hmap_init(hmap, mask);
>>>>> +}
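
As a worked example of the sizing arithmetic above, the same bit-smearing can be tried in isolation. mask_for_size() is a hypothetical standalone copy of the computation that returns the mask instead of initializing an hmap.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical standalone copy of the mask computation: returns the
 * bucket mask (number of buckets minus one) chosen for 'size' elements. */
static size_t
mask_for_size(int size)
{
    size_t mask = size / 2;

    /* Copy the highest set bit into every lower bit position. */
    mask |= mask >> 1;
    mask |= mask >> 2;
    mask |= mask >> 4;
    mask |= mask >> 8;
    mask |= mask >> 16;
#if SIZE_MAX > UINT32_MAX
    mask |= mask >> 32;
#endif

    /* A mask of 1 (two buckets) is bumped to 3 (four buckets). */
    mask |= (mask & 1) << 1;
    return mask;
}
```

For size 100 this gives 50 -> 63, i.e. 64 buckets; for size 2 it gives 1 -> 3, i.e. 4 buckets.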
>>>>> +
>>>>> +/* Run a thread pool which uses a callback function to process results
>>>>> + */
>>>>> +
>>>>> +void ovn_run_pool_callback(struct worker_pool *pool,
>>>>> +                           void *fin_result, void *result_frags,
>>>>> +                           void (*helper_func)(struct worker_pool *pool,
>>>>> +                                               void *fin_result,
>>>>> +                                               void *result_frags, int index))
>>>>> +{
>>>>> +    int index, completed;
>>>>> +
>>>>> +    /* Ensure that all worker threads see the same data as the
>>>>> +     * main thread.
>>>>> +     */
>>>>> +
>>>>> +    atomic_thread_fence(memory_order_acq_rel);
>>>>> +
>>>>> +    /* Start workers */
>>>>> +
>>>>> +    for (index = 0; index < pool->size; index++) {
>>>>> +        sem_post(pool->controls[index].fire);
>>>>> +    }
>>>>> +
>>>>> +    completed = 0;
>>>>> +
>>>>> +    do {
>>>>> +        bool test;
>>>>> +        /* Note - we do not loop on semaphore until it reaches
>>>>> +         * zero, but on pool size/remaining workers.
>>>>> +         * This is by design. If the inner loop can handle
>>>>> +         * completion for more than one worker within an iteration
>>>>> +         * it will do so to ensure no additional iterations and
>>>>> +         * waits once all of them are done.
>>>>> +         *
>>>>> +         * This may result in us having an initial positive value
>>>>> +         * of the semaphore when the pool is invoked the next time.
>>>>> +         * This is harmless - the loop will spin up a couple of times
>>>>> +         * doing nothing while the workers are processing their data
>>>>> +         * slices.
>>>>> +         */
>>>>> +        sem_wait(pool->done);
>>>>> +        for (index = 0; index < pool->size; index++) {
>>>>> +            test = true;
>>>>> +            /* If the worker has marked its data chunk as complete,
>>>>> +             * invoke the helper function to combine the results of
>>>>> +             * this worker into the main result.
>>>>> +             *
>>>>> +             * The worker must invoke an appropriate memory fence
>>>>> +             * (most likely acq_rel) to ensure that the main thread
>>>>> +             * sees all of the results produced by the worker.
>>>>> +             */
>>>>> +            if (atomic_compare_exchange_weak(
>>>>> +                    &pool->controls[index].finished,
>>>>> +                    &test,
>>>>> +                    false)) {
>>>>> +                if (helper_func) {
>>>>> +                    (helper_func)(pool, fin_result, result_frags, index);
>>>>> +                }
>>>>> +                completed++;
>>>>> +                pool->controls[index].data = NULL;
>>>>> +            }
>>>>> +        }
>>>>> +    } while (completed < pool->size);
>>>>> +}
>>>>> +
>>>>> +/* Run a thread pool - basic, does not do results processing.
>>>>> + */
>>>>> +
>>>>> +void ovn_run_pool(struct worker_pool *pool)
>>>>> +{
>>>>> +    run_pool_callback(pool, NULL, NULL, NULL);
>>>>> +}
>>>>> +
>>>>> +/* Brute force merge of a hashmap into another hashmap.
>>>>> + * Intended for use in parallel processing. The destination
>>>>> + * hashmap MUST be the same size as the one being merged.
>>>>> + *
>>>>> + * This can be achieved by pre-allocating them to correct size
>>>>> + * and using hmap_insert_fast() instead of hmap_insert()
>>>>> + */
>>>>> +
>>>>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
>>>>> +{
>>>>> +    size_t i;
>>>>> +
>>>>> +    ovs_assert(inc->mask == dest->mask);
>>>>> +
>>>>> +    if (!inc->n) {
>>>>> +        /* Request to merge an empty frag, nothing to do */
>>>>> +        return;
>>>>> +    }
>>>>> +
>>>>> +    for (i = 0; i <= dest->mask; i++) {
>>>>> +        struct hmap_node **dest_bucket = &dest->buckets[i];
>>>>> +        struct hmap_node **inc_bucket = &inc->buckets[i];
>>>>> +        if (*inc_bucket != NULL) {
>>>>> +            struct hmap_node *last_node = *inc_bucket;
>>>>> +            while (last_node->next != NULL) {
>>>>> +                last_node = last_node->next;
>>>>> +            }
>>>>> +            last_node->next = *dest_bucket;
>>>>> +            *dest_bucket = *inc_bucket;
>>>>> +            *inc_bucket = NULL;
>>>>> +        }
>>>>> +    }
>>>>> +    dest->n += inc->n;
>>>>> +    inc->n = 0;
>>>>> +}
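
The merge above can be illustrated without the OVS headers. The sketch below uses hypothetical stand-in types (struct node, struct table) and a hypothetical table_merge(); unlike the patch, it returns early on a mask mismatch instead of asserting.

```c
#include <stddef.h>

/* Hypothetical stand-ins for struct hmap_node and struct hmap. */
struct node {
    struct node *next;
    int value;
};

struct table {
    struct node **buckets;
    size_t mask;    /* Number of buckets minus one. */
    size_t n;       /* Number of nodes. */
};

/* Sketch of a bucket-by-bucket merge: the chain in bucket i of 'inc' is
 * prepended to the chain in bucket i of 'dest'.  Does nothing if the
 * masks differ or 'inc' is empty. */
static void
table_merge(struct table *dest, struct table *inc)
{
    if (dest->mask != inc->mask || !inc->n) {
        return;
    }
    for (size_t i = 0; i <= dest->mask; i++) {
        struct node *head = inc->buckets[i];

        if (head) {
            struct node *last = head;

            while (last->next) {
                last = last->next;
            }
            last->next = dest->buckets[i];
            dest->buckets[i] = head;
            inc->buckets[i] = NULL;
        }
    }
    dest->n += inc->n;
    inc->n = 0;
}
```

No node is rehashed, which is why both tables have to use the same mask: a node in bucket i of one table must also belong in bucket i of the other.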
>>>>> +
>>>>> +/* Run a thread pool which gathers results in an array
>>>>> + * of hashes. Merge results.
>>>>> + */
>>>>> +
>>>>> +
>>>>> +void ovn_run_pool_hash(
>>>>> +        struct worker_pool *pool,
>>>>> +        struct hmap *result,
>>>>> +        struct hmap *result_frags)
>>>>> +{
>>>>> +    run_pool_callback(pool, result, result_frags, merge_hash_results);
>>>>> +}
>>>>> +
>>>>> +/* Run a thread pool which gathers results in an array of lists.
>>>>> + * Merge results.
>>>>> + */
>>>>> +void ovn_run_pool_list(
>>>>> +        struct worker_pool *pool,
>>>>> +        struct ovs_list *result,
>>>>> +        struct ovs_list *result_frags)
>>>>> +{
>>>>> +    run_pool_callback(pool, result, result_frags, merge_list_results);
>>>>> +}
>>>>> +
>>>>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
>>>>> +{
>>>>> +    int i;
>>>>> +    if (hrl->mask != lflows->mask) {
>>>>> +        if (hrl->row_locks) {
>>>>> +            free(hrl->row_locks);
>>>>> +        }
>>>>> +        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
>>>>> +        hrl->mask = lflows->mask;
>>>>> +        for (i = 0; i <= lflows->mask; i++) {
>>>>> +            ovs_mutex_init(&hrl->row_locks[i]);
>>>>> +        }
>>>>> +    }
>>>>> +}
>>>>> +
>>>>> +static void worker_pool_hook(void *aux OVS_UNUSED) {
>>>>> +    int i;
>>>>> +    static struct worker_pool *pool;
>>>>> +    char sem_name[256];
>>>>> +
>>>>> +    workers_must_exit = true;
>>>>> +
>>>>> +    /* All workers must honour the must_exit flag and check for it regularly.
>>>>> +     * We can make it atomic and check it via atomics in workers, but that
>>>>> +     * is not really necessary as it is set just once - when the program
>>>>> +     * terminates. So we use a fence which is invoked before exiting instead.
>>>>> +     */
>>>>> +    atomic_thread_fence(memory_order_acq_rel);
>>>>> +
>>>>> +    /* Wake up the workers after the must_exit flag has been set */
>>>>> +
>>>>> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
>>>>> +        for (i = 0; i < pool->size ; i++) {
>>>>> +            sem_post(pool->controls[i].fire);
>>>>> +        }
>>>>> +        for (i = 0; i < pool->size ; i++) {
>>>>> +            sem_close(pool->controls[i].fire);
>>>>> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>>>>> +            sem_unlink(sem_name);
>>>>> +        }
>>>>> +        sem_close(pool->done);
>>>>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
>>>>> +        sem_unlink(sem_name);
>>>>> +    }
>>>>> +}
>>>>> +
>>>>> +static void setup_worker_pools(bool force) {
>>>>> +    int cores, nodes;
>>>>> +
>>>>> +    nodes = ovs_numa_get_n_numas();
>>>>> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
>>>>> +        nodes = 1;
>>>>> +    }
>>>>> +    cores = ovs_numa_get_n_cores();
>>>>> +
>>>>> +    /* If there is no NUMA config, use 4 cores.
>>>>> +     * If there is NUMA config use half the cores on
>>>>> +     * one node so that the OS does not start pushing
>>>>> +     * threads to other nodes.
>>>>> +     */
>>>>> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
>>>>> +        /* If there is no NUMA we can try the ovs-threads routine.
>>>>> +         * It falls back to sysconf and/or affinity mask.
>>>>> +         */
>>>>> +        cores = count_cpu_cores();
>>>>> +        pool_size = cores;
>>>>> +    } else {
>>>>> +        pool_size = cores / nodes;
>>>>> +    }
>>>>> +    if ((pool_size < 4) && force) {
>>>>> +        pool_size = 4;
>>>>> +    }
>>>>> +    can_parallelize = (pool_size >= 3);
>>>>> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>>>>> +    sembase = random_uint32();
>>>>> +}
>>>>> +
>>>>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>>>> +                               void *fin_result, void *result_frags,
>>>>> +                               int index)
>>>>> +{
>>>>> +    struct ovs_list *result = (struct ovs_list *)fin_result;
>>>>> +    struct ovs_list *res_frags = (struct ovs_list *)result_frags;
>>>>> +
>>>>> +    if (!ovs_list_is_empty(&res_frags[index])) {
>>>>> +        ovs_list_splice(result->next,
>>>>> +                ovs_list_front(&res_frags[index]), &res_frags[index]);
>>>>> +    }
>>>>> +}
>>>>> +
>>>>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>>>>> +                               void *fin_result, void *result_frags,
>>>>> +                               int index)
>>>>> +{
>>>>> +    struct hmap *result = (struct hmap *)fin_result;
>>>>> +    struct hmap *res_frags = (struct hmap *)result_frags;
>>>>> +
>>>>> +    fast_hmap_merge(result, &res_frags[index]);
>>>>> +    hmap_destroy(&res_frags[index]);
>>>>> +}
>>>>> +
>>>>> +#endif
>>>>> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
>>>>> new file mode 100644
>>>>> index 000000000..71ad17fb0
>>>>> --- /dev/null
>>>>> +++ b/lib/ovn-parallel-hmap.h
>>>>> @@ -0,0 +1,285 @@
>>>>> +/*
>>>>> + * Copyright (c) 2020 Red Hat, Inc.
>>>>> + *
>>>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>>>> + * you may not use this file except in compliance with the License.
>>>>> + * You may obtain a copy of the License at:
>>>>> + *
>>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>>> + *
>>>>> + * Unless required by applicable law or agreed to in writing, software
>>>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>>>> + * See the License for the specific language governing permissions and
>>>>> + * limitations under the License.
>>>>> + */
>>>>> +
>>>>> +#ifndef OVN_PARALLEL_HMAP
>>>>> +#define OVN_PARALLEL_HMAP 1
>>>>> +
>>>>> +/* if the parallel macros are defined by hmap.h or any other ovs define
>>>>> + * we skip over the ovn specific definitions.
>>>>> + */
>>>>> +
>>>>> +#ifdef  __cplusplus
>>>>> +extern "C" {
>>>>> +#endif
>>>>> +
>>>>> +#include <stdbool.h>
>>>>> +#include <stdlib.h>
>>>>> +#include <semaphore.h>
>>>>> +#include "openvswitch/util.h"
>>>>> +#include "openvswitch/hmap.h"
>>>>> +#include "openvswitch/thread.h"
>>>>> +#include "ovs-atomic.h"
>>>>> +
>>>>> +/* Process this include only if OVS does not supply parallel definitions
>>>>> + */
>>>>> +
>>>>> +#ifdef OVS_HAS_PARALLEL_HMAP
>>>>> +
>>>>> +#include "parallel-hmap.h"
>>>>> +
>>>>> +#else
>>>>> +
>>>>> +
>>>>> +#ifdef __clang__
>>>>> +#pragma clang diagnostic push
>>>>> +#pragma clang diagnostic ignored "-Wthread-safety"
>>>>> +#endif
>>>> I think you missed addressing my comment I provided in v13 to
>>>> add some comments on why this is required.
>>>>
>>>>
>>>>> +
>>>>> +
>>>>> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
>>>>> + * of parallel processing.
>>>>> + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE
>>>>> + * and will iterate hash buckets ThreadID, ThreadID + step,
>>>>> + * ThreadID + step * 2, etc. The actual macro accepts
>>>>> + * ThreadID + step * i as the JOBID parameter.
>>>>> + */
>>>>> +
>>>>> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
>>>>> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
>>>>> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
>>>>> +       || ((NODE = NULL), false); \
>>>>> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
>>>>> +
>>>>> +/* We do not have a SAFE version of the macro, because the hash size is not
>>>>> + * atomic and hash removal operations would need to be wrapped with
>>>>> + * locks. This will defeat most of the benefits from doing anything in
>>>>> + * parallel.
>>>>> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
>>>>> + * each thread should store them in a temporary list result instead, merging
>>>>> + * the lists into a combined result at the end */
>>>>> +
>>>>> +/* Work "Handle" */
>>>>> +
>>>>> +struct worker_control {
>>>>> +    int id; /* Used as a modulo when iterating over a hash. */
>>>>> +    atomic_bool finished; /* Set to true after a chunk of work is complete. */
>>>>> +    sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
>>>>> +    sem_t *done; /* Work completion semaphore - sem_post on completion. */
>>>>> +    struct ovs_mutex mutex; /* Guards the data. */
>>>>> +    void *data; /* Pointer to data to be processed. */
>>>>> +    void *workload; /* back-pointer to the worker pool structure. */
>>>>> +};
>>>>> +
>>>>> +struct worker_pool {
>>>>> +    int size;   /* Number of threads in the pool. */
>>>>> +    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
>>>>> +    struct worker_control *controls; /* "Handles" in this pool. */
>>>>> +    sem_t *done; /* Work completion semaphore. */
>>>>> +};
>>>>> +
>>>>> +/* Add a worker pool for thread function start() which expects a pointer to
>>>>> + * a worker_control structure as an argument. */
>>>>> +
>>>>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
>>>>> +
>>>>> +/* Setting this to true will make all processing threads exit */
>>>>> +
>>>>> +bool ovn_stop_parallel_processing(void);
>>>>> +
>>>>> +/* Build a hmap pre-sized for size elements */
>>>>> +
>>>>> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
>>>>> +
>>>>> +/* Build a hmap with a mask equal to size */
>>>>> +
>>>>> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
>>>>> +
>>>>> +/* Brute-force merge a hmap into hmap.
>>>>> + * Dest and inc have to have the same mask. The merge is performed
>>>>> + * by extending the element list for bucket N in the dest hmap with the list
>>>>> + * from bucket N in inc.
>>>>> + */
>>>>> +
>>>>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
>>>>> +
>>>>> +/* Run a pool, without any default processing of results.
>>>>> + */
>>>>> +
>>>>> +void ovn_run_pool(struct worker_pool *pool);
>>>>> +
>>>>> +/* Run a pool, merge results from hash frags into a final hash result.
>>>>> + * The hash frags must be pre-sized to the same size.
>>>>> + */
>>>>> +
>>>>> +void ovn_run_pool_hash(struct worker_pool *pool,
>>>>> +                       struct hmap *result, struct hmap *result_frags);
>>>>> +/* Run a pool, merge results from list frags into a final list result.
>>>>> + */
>>>>> +
>>>>> +void ovn_run_pool_list(struct worker_pool *pool,
>>>>> +                       struct ovs_list *result, struct ovs_list *result_frags);
>>>>> +
>>>>> +/* Run a pool, call a callback function to perform processing of results.
>>>>> + */
>>>>> +
>>>>> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
>>>>> +                    void *result_frags,
>>>>> +                    void (*helper_func)(struct worker_pool *pool,
>>>>> +                        void *fin_result, void *result_frags, int index));
>>>>> +
>>>>> +
>>>>> +/* Returns the first node in bucket number 'num' of 'hmap', or a null
>>>>> + * pointer if that bucket is empty. */
>>>>> +
>>>>> +static inline struct hmap_node *
>>>>> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
>>>>> +{
>>>>> +    return hmap->buckets[num];
>>>>> +}
>>>>> +
>>>>> +static inline struct hmap_node *
>>>>> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
>>>>> +{
>>>>> +    size_t i;
>>>>> +    for (i = start; i <= hmap->mask; i+= pool_size) {
>>>>> +        struct hmap_node *node = hmap->buckets[i];
>>>>> +        if (node) {
>>>>> +            return node;
>>>>> +        }
>>>>> +    }
>>>>> +    return NULL;
>>>>> +}
>>>>> +
>>>>> +/* Returns the first node in 'hmap', as expected by thread with job_id
>>>>> + * for parallel processing in arbitrary order, or a null pointer if
>>>>> + * the slice of 'hmap' for that job_id is empty. */
>>>>> +static inline struct hmap_node *
>>>>> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
>>>>> +{
>>>>> +    return parallel_hmap_next__(hmap, job_id, pool_size);
>>>>> +}
>>>>> +
>>>>> +/* Returns the next node in the slice of 'hmap' following 'node',
>>>>> + * in arbitrary order, or a null pointer if 'node' is the last node in
>>>>> + * the 'hmap' slice.
>>>>> + */
>>>>> +static inline struct hmap_node *
>>>>> +parallel_hmap_next(const struct hmap *hmap,
>>>>> +                   const struct hmap_node *node, ssize_t pool_size)
>>>>> +{
>>>>> +    return (node->next
>>>>> +            ? node->next
>>>>> +            : parallel_hmap_next__(hmap,
>>>>> +                (node->hash & hmap->mask) + pool_size, pool_size));
>>>>> +}
>>>>> +
>>>>> +static inline void post_completed_work(struct worker_control *control)
>>>>> +{
>>>>> +    atomic_thread_fence(memory_order_acq_rel);
>>>>> +    atomic_store_relaxed(&control->finished, true);
>>>>> +    sem_post(control->done);
>>>>> +}
>>>>> +
>>>>> +static inline void wait_for_work(struct worker_control *control)
>>>>> +{
>>>>> +    sem_wait(control->fire);
>>>>> +}
>>>>> +
>>>>> +/* Hash per-row locking support - to be used only in conjunction
>>>>> + * with fast hash inserts. Normal hash inserts may resize the hash
>>>>> + * rendering the locking invalid.
>>>>> + */
>>>>> +
>>>>> +struct hashrow_locks {
>>>>> +    ssize_t mask;
>>>>> +    struct ovs_mutex *row_locks;
>>>>> +};
>>>>> +
>>>>> +/* Update a hash row locks structure to match the current hash size */
>>>>> +
>>>>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
>>>>> +
>>>>> +/* Lock a hash row */
>>>>> +
>>>>> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
>>>>> +{
>>>>> +    ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]);
>>>>> +}
>>>>> +
>>>>> +/* Unlock a hash row */
>>>>> +
>>>>> +static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
>>>>> +{
>>>>> +    ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]);
>>>>> +}
>>>>> +/* Init the row locks structure */
>>>>> +
>>>>> +static inline void init_hash_row_locks(struct hashrow_locks *hrl)
>>>>> +{
>>>>> +    hrl->mask = 0;
>>>>> +    hrl->row_locks = NULL;
>>>>> +}
>>>>> +
>>>>> +bool ovn_can_parallelize_hashes(bool force_parallel);
>>>>> +
>>>>> +/* Use the OVN library functions for stuff which OVS has not defined.
>>>>> + * If OVS has defined these, they will still compile using the OVN
>>>>> + * local names, but will be dropped by the linker in favour of the OVS
>>>>> + * supplied functions.
>>>>> + */
>>>>> +
>>>>> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
>>>>> +
>>>>> +#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
>>>>> +
>>>>> +#define stop_parallel_processing() ovn_stop_parallel_processing()
>>>>> +
>>>>> +#define add_worker_pool(start) ovn_add_worker_pool(start)
>>>>> +
>>>>> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
>>>>> +
>>>>> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
>>>>> +
>>>>> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
>>>>> +
>>>>> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
>>>>> +
>>>>> +#define run_pool(pool) ovn_run_pool(pool)
>>>>> +
>>>>> +#define run_pool_hash(pool, result, result_frags) \
>>>>> +    ovn_run_pool_hash(pool, result, result_frags)
>>>>> +
>>>>> +#define run_pool_list(pool, result, result_frags) \
>>>>> +    ovn_run_pool_list(pool, result, result_frags)
>>>>> +
>>>>> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
>>>>> +    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
>>>>> +
>>>>> +
>>>> In my opinion, we can switch over to making use of OVS APIs for fast_hmap
>>>> once the patches in OVS are merged.  Until then I think we should just assume
>>>> that these functions are part of OVN lib and consume them directly.
>>>>
>>>> It's possible that the function names could change when those patches
>>>> land in OVS.
>>>>
>>>> Thanks
>>>> Numan
>>>>
>>>>> +
>>>>> +#ifdef __clang__
>>>>> +#pragma clang diagnostic pop
>>>>> +#endif
>>>>> +
>>>>> +#endif
>>>>> +
>>>>> +#ifdef  __cplusplus
>>>>> +}
>>>>> +#endif
>>>>> +
>>>>> +
>>>>> +#endif /* lib/fasthmap.h */
>>>>> --
>>>>> 2.20.1
>>>>>
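To make the slice iteration in parallel_hmap_first() and parallel_hmap_next() concrete: the thread with a given job_id visits buckets job_id, job_id + pool_size, job_id + 2 * pool_size and so on, so no two threads walk the same bucket. Below is a minimal, self-contained sketch of that walk. The struct definitions are simplified stand-ins for the OVS hmap types, and slice_first(), slice_next(), slice_count() and demo_slice_count() are hypothetical names used only for illustration; they are not part of the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for the OVS types (assumption: only the fields
 * used by the quoted helpers are modelled here). */
struct hmap_node {
    size_t hash;
    struct hmap_node *next;
};

struct hmap {
    struct hmap_node **buckets;
    size_t mask;                /* Number of buckets minus one. */
};

/* First node found in buckets start, start + pool_size, ... */
static struct hmap_node *
slice_next__(const struct hmap *hmap, size_t start, size_t pool_size)
{
    for (size_t i = start; i <= hmap->mask; i += pool_size) {
        if (hmap->buckets[i]) {
            return hmap->buckets[i];
        }
    }
    return NULL;
}

static struct hmap_node *
slice_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
{
    return slice_next__(hmap, job_id, pool_size);
}

/* Rest of the current bucket first, then the next bucket of the slice. */
static struct hmap_node *
slice_next(const struct hmap *hmap, const struct hmap_node *node,
           size_t pool_size)
{
    return node->next
           ? node->next
           : slice_next__(hmap, (node->hash & hmap->mask) + pool_size,
                          pool_size);
}

/* Number of nodes the thread with 'job_id' would visit. */
static size_t
slice_count(const struct hmap *hmap, size_t job_id, size_t pool_size)
{
    size_t n = 0;
    struct hmap_node *node;

    for (node = slice_first(hmap, job_id, pool_size); node;
         node = slice_next(hmap, node, pool_size)) {
        n++;
    }
    return n;
}

/* Hypothetical demo: 8 buckets holding 12 nodes with hashes 0..11. */
size_t
demo_slice_count(size_t job_id, size_t pool_size)
{
    struct hmap_node *buckets[8] = { NULL };
    struct hmap_node nodes[12];
    struct hmap hmap = { buckets, 7 };

    assert(pool_size > 0);
    for (size_t h = 0; h < 12; h++) {
        nodes[h].hash = h;
        nodes[h].next = buckets[h & hmap.mask];
        buckets[h & hmap.mask] = &nodes[h];
    }
    return slice_count(&hmap, job_id, pool_size);
}
```

With a pool of 3, demo_slice_count() gives 5, 4 and 3 nodes for jobs 0, 1 and 2, which adds up to all 12 nodes with no overlap.
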
>>>>> _______________________________________________
>>>>> dev mailing list
>>>>> dev at openvswitch.org
>>>>> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>>>>>
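The comment on ovn_fast_hmap_merge() describes the merge as extending the element list of bucket N in dest with the list from bucket N in inc, which only works when both maps have the same mask. A rough sketch of that idea follows, under the assumption that buckets are singly linked lists as in the OVS hmap. The types are simplified stand-ins, and merge_same_mask(), insert_fast() and demo_merged_bucket_len() are hypothetical helpers, not the implementation from the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for the OVS types (assumption). */
struct hmap_node {
    size_t hash;
    struct hmap_node *next;
};

struct hmap {
    struct hmap_node **buckets;
    size_t mask;                /* Number of buckets minus one. */
    size_t n;                   /* Number of nodes. */
};

/* Insert at the head of the bucket without ever resizing the map. */
static void
insert_fast(struct hmap *hmap, struct hmap_node *node, size_t hash)
{
    node->hash = hash;
    node->next = hmap->buckets[hash & hmap->mask];
    hmap->buckets[hash & hmap->mask] = node;
    hmap->n++;
}

/* Append every bucket list of 'inc' to the same bucket of 'dest'.
 * No rehashing is needed because equal masks mean equal bucket indexes.
 * 'inc' is left empty. */
static void
merge_same_mask(struct hmap *dest, struct hmap *inc)
{
    assert(dest->mask == inc->mask);
    for (size_t i = 0; i <= dest->mask; i++) {
        struct hmap_node **tail = &dest->buckets[i];

        while (*tail) {
            tail = &(*tail)->next;
        }
        *tail = inc->buckets[i];
        inc->buckets[i] = NULL;
    }
    dest->n += inc->n;
    inc->n = 0;
}

/* Hypothetical demo: dest holds hashes 0, 1, 2 and inc holds 4, 5, 7,
 * both with 4 buckets. Returns the length of 'bucket' after the merge. */
size_t
demo_merged_bucket_len(size_t bucket)
{
    struct hmap_node *dest_buckets[4] = { NULL };
    struct hmap_node *inc_buckets[4] = { NULL };
    struct hmap dest = { dest_buckets, 3, 0 };
    struct hmap inc = { inc_buckets, 3, 0 };
    struct hmap_node nodes[6];
    const size_t dest_hashes[] = { 0, 1, 2 };
    const size_t inc_hashes[] = { 4, 5, 7 };
    size_t len = 0;

    for (size_t i = 0; i < 3; i++) {
        insert_fast(&dest, &nodes[i], dest_hashes[i]);
        insert_fast(&inc, &nodes[3 + i], inc_hashes[i]);
    }
    merge_same_mask(&dest, &inc);
    assert(dest.n == 6 && inc.n == 0);

    for (struct hmap_node *node = dest.buckets[bucket & dest.mask]; node;
         node = node->next) {
        len++;
    }
    return len;
}
```

This also shows why the result fragments have to be pre-sized to the same size before the pool runs: if one fragment were resized, its bucket indexes would no longer line up with those of the destination.
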
>> --
>> Anton R. Ivanov
>> Cambridgegreys Limited. Registered in England. Company Number 10273661
>> https://www.cambridgegreys.com/
>>

-- 
Anton R. Ivanov
Cambridgegreys Limited. Registered in England. Company Number 10273661
https://www.cambridgegreys.com/



