[ovs-dev] [OVN Patch v14 1/3] ovn-libs: Add support for parallel processing
Anton Ivanov
anton.ivanov at cambridgegreys.com
Fri Feb 19 07:34:30 UTC 2021
On 18/02/2021 22:36, Mark Michelson wrote:
> Hi Anton, in short:
>
> Acked-by: Mark Michelson <mmichels at redhat.com>
>
> I think this can go in as-is. My only question is with regards to how
> dp_groups are handled. The current patch makes it so that when
> dp_groups are used, we parallelize the work. However, instead of each
> thread creating discrete segments of lflows, each thread shares a
> reference to the same large lflow hmap. This means the threads have to
> lock the pertinent hash row each time they add a lflow to the hmap.
> This makes sense since the entire lflow hmap needs to be taken into
> account when determining if a flow is repeated between different
> datapaths.
>
> I'm curious if this contention might result in poorer parallelization
> speedup.
It does. According to Newman's results, the speedup with dp_groups is
roughly 2x, instead of the 4x or more seen without them.
> Without having done any profiling, my assumption is that this
> shouldn't perform worse than single-threaded code, but that it likely
> isn't performing that much better either.
See above.
> Because this isn't actively *harming* performance, that's why I went
> ahead and acked the patch series.
>
> Once we have run some performance tests, it would be worth determining
> if the algorithm used for dp_groups could be improved. I have a couple
> of thoughts:
>
> 1) It might just make sense to disable parallelization when dp_groups
> are enabled. This certainly wouldn't help performance, but it would
> simplify northd somewhat. This might be an OK tradeoff if the
> performance improvement from parallelization with dp_groups is
> insignificant.
Actually, it looks like it is still faster if you have at least 8
threads to throw at it.
>
> 2) We could fully parallelize lflow table creation just like we do
> when dp_groups are disabled: split the lflow hmap into segments for
> each worker to fill in, and then merge them together once all threads
> are complete. Then after merging, we could iterate over the completed
> lflow hmap and determine the dp_groups at that point, rather than when
> inserting the lflow into the hmap. This extra hmap traversal might
> slow things down some, but uncontended parallelization could more than
> offset that cost.
I would suggest leaving it as is. It does not lock the full table; it
locks individual "hash rows". So the contention is not as bad as one
would expect, and it still provides a reasonable gain.
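
To illustrate what locking a "hash row" means, here is a minimal standalone sketch. It uses plain pthreads and hypothetical names (striped_table, striped_table_insert); it is not the ovn-parallel-hmap API. It assumes a fixed-size table, with one mutex per bucket and the lock row chosen as hash & mask, so that it always matches the bucket row:

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical types for illustration only. */
struct node {
    struct node *next;
    uint32_t hash;
};

struct striped_table {
    size_t mask;                /* Number of buckets minus one (2^n - 1). */
    struct node **buckets;      /* Bucket heads, singly linked chains. */
    pthread_mutex_t *row_locks; /* One mutex per bucket ("hash row"). */
};

/* Allocates mask + 1 empty buckets and one lock for each of them.
 * Returns 0 on success, -1 if allocation fails. */
static int
striped_table_init(struct striped_table *t, size_t mask)
{
    assert((mask & (mask + 1)) == 0);   /* mask must be 2^n - 1. */

    t->mask = mask;
    t->buckets = calloc(mask + 1, sizeof *t->buckets);
    t->row_locks = malloc((mask + 1) * sizeof *t->row_locks);
    if (!t->buckets || !t->row_locks) {
        free(t->buckets);
        free(t->row_locks);
        return -1;
    }
    for (size_t i = 0; i <= mask; i++) {
        pthread_mutex_init(&t->row_locks[i], NULL);
    }
    return 0;
}

/* Inserts 'n' without ever resizing the table. Only the row that 'hash'
 * maps to is locked, so threads working on other rows do not block. */
static void
striped_table_insert(struct striped_table *t, struct node *n, uint32_t hash)
{
    size_t row = hash & t->mask;

    pthread_mutex_lock(&t->row_locks[row]);
    n->hash = hash;
    n->next = t->buckets[row];
    t->buckets[row] = n;
    pthread_mutex_unlock(&t->row_locks[row]);
}
```

Two threads only contend when their flows hash to the same row, which is why the cost is lower than with a single table-wide lock. The table must not be resized while the locks are in use.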
>
> What do you think about these ideas? I bring them up because OVN is
> trending towards eventually having dp_groups enabled and no longer
> being disable-able.
>
> On 2/12/21 9:49 AM, Anton Ivanov wrote:
>> This adds a set of functions and macros intended to process
>> hashes in parallel.
>>
>> The principles of operation are documented in the fasthmap.h
>>
>> If these one day go into the OVS tree, the OVS tree versions
>> would be used in preference.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>> ---
>> lib/automake.mk | 2 +
>> lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
>> lib/ovn-parallel-hmap.h | 285 +++++++++++++++++++++++++
>> 3 files changed, 742 insertions(+)
>> create mode 100644 lib/ovn-parallel-hmap.c
>> create mode 100644 lib/ovn-parallel-hmap.h
>>
>> diff --git a/lib/automake.mk b/lib/automake.mk
>> index 250c7aefa..781be2109 100644
>> --- a/lib/automake.mk
>> +++ b/lib/automake.mk
>> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
>> lib/expr.c \
>> lib/extend-table.h \
>> lib/extend-table.c \
>> + lib/ovn-parallel-hmap.h \
>> + lib/ovn-parallel-hmap.c \
>> lib/ip-mcast-index.c \
>> lib/ip-mcast-index.h \
>> lib/mcast-group-index.c \
>> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
>> new file mode 100644
>> index 000000000..06aa95aba
>> --- /dev/null
>> +++ b/lib/ovn-parallel-hmap.c
>> @@ -0,0 +1,455 @@
>> +/*
>> + * Copyright (c) 2020 Red Hat, Inc.
>> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
>> + *
>> + * Licensed under the Apache License, Version 2.0 (the "License");
>> + * you may not use this file except in compliance with the License.
>> + * You may obtain a copy of the License at:
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +#include <config.h>
>> +#include <stdint.h>
>> +#include <string.h>
>> +#include <stdlib.h>
>> +#include <fcntl.h>
>> +#include <unistd.h>
>> +#include <errno.h>
>> +#include <semaphore.h>
>> +#include "fatal-signal.h"
>> +#include "util.h"
>> +#include "openvswitch/vlog.h"
>> +#include "openvswitch/hmap.h"
>> +#include "openvswitch/thread.h"
>> +#include "ovn-parallel-hmap.h"
>> +#include "ovs-atomic.h"
>> +#include "ovs-thread.h"
>> +#include "ovs-numa.h"
>> +#include "random.h"
>> +
>> +VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>> +
>> +#ifndef OVS_HAS_PARALLEL_HMAP
>> +
>> +#define WORKER_SEM_NAME "%x-%p-%x"
>> +#define MAIN_SEM_NAME "%x-%p-main"
>> +
>> +/* These are accessed under mutex inside add_worker_pool().
>> + * They do not need to be atomic.
>> + */
>> +
>> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
>> +static bool can_parallelize = false;
>> +
>> +/* This is set only in the process of exit and the set is
>> + * accompanied by a fence. It does not need to be atomic or be
>> + * accessed under a lock.
>> + */
>> +
>> +static bool workers_must_exit = false;
>> +
>> +static struct ovs_list worker_pools =
>> OVS_LIST_INITIALIZER(&worker_pools);
>> +
>> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>> +
>> +static int pool_size;
>> +
>> +static int sembase;
>> +
>> +static void worker_pool_hook(void *aux OVS_UNUSED);
>> +static void setup_worker_pools(bool force);
>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>> + void *fin_result, void *result_frags,
>> + int index);
>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>> + void *fin_result, void *result_frags,
>> + int index);
>> +
>> +bool ovn_stop_parallel_processing(void)
>> +{
>> + return workers_must_exit;
>> +}
>> +
>> +bool ovn_can_parallelize_hashes(bool force_parallel)
>> +{
>> + bool test = false;
>> +
>> + if (atomic_compare_exchange_strong(
>> + &initial_pool_setup,
>> + &test,
>> + true)) {
>> + ovs_mutex_lock(&init_mutex);
>> + setup_worker_pools(force_parallel);
>> + ovs_mutex_unlock(&init_mutex);
>> + }
>> + return can_parallelize;
>> +}
>> +
>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
>> +
>> + struct worker_pool *new_pool = NULL;
>> + struct worker_control *new_control;
>> + bool test = false;
>> + int i;
>> + char sem_name[256];
>> +
>> +
>> + /* Belt and braces - initialize the pool system just in case
>> + * it is not yet initialized.
>> + */
>> +
>> + if (atomic_compare_exchange_strong(
>> + &initial_pool_setup,
>> + &test,
>> + true)) {
>> + ovs_mutex_lock(&init_mutex);
>> + setup_worker_pools(false);
>> + ovs_mutex_unlock(&init_mutex);
>> + }
>> +
>> + ovs_mutex_lock(&init_mutex);
>> + if (can_parallelize) {
>> + new_pool = xmalloc(sizeof(struct worker_pool));
>> + new_pool->size = pool_size;
>> + new_pool->controls = NULL;
>> + sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>> + new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>> + if (new_pool->done == SEM_FAILED) {
>> + goto cleanup;
>> + }
>> +
>> + new_pool->controls =
>> + xmalloc(sizeof(struct worker_control) * new_pool->size);
>> +
>> + for (i = 0; i < new_pool->size; i++) {
>> + new_control = &new_pool->controls[i];
>> + new_control->id = i;
>> + new_control->done = new_pool->done;
>> + new_control->data = NULL;
>> + ovs_mutex_init(&new_control->mutex);
>> + new_control->finished = ATOMIC_VAR_INIT(false);
>> + sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>> + new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU,
>> 0);
>> + if (new_control->fire == SEM_FAILED) {
>> + goto cleanup;
>> + }
>> + }
>> +
>> + for (i = 0; i < pool_size; i++) {
>> + ovs_thread_create("worker pool helper", start,
>> &new_pool->controls[i]);
>> + }
>> + ovs_list_push_back(&worker_pools, &new_pool->list_node);
>> + }
>> + ovs_mutex_unlock(&init_mutex);
>> + return new_pool;
>> +cleanup:
>> +
>> + /* Something went wrong when opening semaphores. In this case
>> + * it is better to shut off parallel processing altogether.
>> + */
>> +
>> + VLOG_INFO("Failed to initialize parallel processing, error %d",
>> errno);
>> + can_parallelize = false;
>> + if (new_pool->controls) {
>> + for (i = 0; i < new_pool->size; i++) {
>> + if (new_pool->controls[i].fire != SEM_FAILED) {
>> + sem_close(new_pool->controls[i].fire);
>> + sprintf(sem_name, WORKER_SEM_NAME, sembase,
>> new_pool, i);
>> + sem_unlink(sem_name);
>> + break; /* semaphores past this one are uninitialized */
>> + }
>> + }
>> + }
>> + if (new_pool->done != SEM_FAILED) {
>> + sem_close(new_pool->done);
>> + sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>> + sem_unlink(sem_name);
>> + }
>> + ovs_mutex_unlock(&init_mutex);
>> + return NULL;
>> +}
>> +
>> +
>> +/* Initializes 'hmap' as an empty hash table with mask N. */
>> +void
>> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
>> +{
>> + size_t i;
>> +
>> + hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
>> + hmap->one = NULL;
>> + hmap->mask = mask;
>> + hmap->n = 0;
>> + for (i = 0; i <= hmap->mask; i++) {
>> + hmap->buckets[i] = NULL;
>> + }
>> +}
>> +
>> +/* Initializes 'hmap' as an empty hash table of size X.
>> + * Intended for use in parallel processing so that all
>> + * fragments used to store results in a parallel job
>> + * are the same size.
>> + */
>> +void
>> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
>> +{
>> + size_t mask;
>> + mask = size / 2;
>> + mask |= mask >> 1;
>> + mask |= mask >> 2;
>> + mask |= mask >> 4;
>> + mask |= mask >> 8;
>> + mask |= mask >> 16;
>> +#if SIZE_MAX > UINT32_MAX
>> + mask |= mask >> 32;
>> +#endif
>> +
>> + /* If we need to dynamically allocate buckets we might as well
>> allocate at
>> + * least 4 of them. */
>> + mask |= (mask & 1) << 1;
>> +
>> + fast_hmap_init(hmap, mask);
>> +}
>> +
>> +/* Run a thread pool which uses a callback function to process results
>> + */
>> +
>> +void ovn_run_pool_callback(struct worker_pool *pool,
>> + void *fin_result, void *result_frags,
>> + void (*helper_func)(struct worker_pool
>> *pool,
>> + void *fin_result,
>> + void *result_frags,
>> int index))
>> +{
>> + int index, completed;
>> +
>> + /* Ensure that all worker threads see the same data as the
>> + * main thread.
>> + */
>> +
>> + atomic_thread_fence(memory_order_acq_rel);
>> +
>> + /* Start workers */
>> +
>> + for (index = 0; index < pool->size; index++) {
>> + sem_post(pool->controls[index].fire);
>> + }
>> +
>> + completed = 0;
>> +
>> + do {
>> + bool test;
>> + /* Note - we do not loop on semaphore until it reaches
>> + * zero, but on pool size/remaining workers.
>> + * This is by design. If the inner loop can handle
>> + * completion for more than one worker within an iteration
>> + * it will do so to ensure no additional iterations and
>> + * waits once all of them are done.
>> + *
>> + * This may result in us having an initial positive value
>> + * of the semaphore when the pool is invoked the next time.
>> + * This is harmless - the loop will spin up a couple of times
>> + * doing nothing while the workers are processing their data
>> + * slices.
>> + */
>> + sem_wait(pool->done);
>> + for (index = 0; index < pool->size; index++) {
>> + test = true;
>> + /* If the worker has marked its data chunk as complete,
>> + * invoke the helper function to combine the results of
>> + * this worker into the main result.
>> + *
>> + * The worker must invoke an appropriate memory fence
>> + * (most likely acq_rel) to ensure that the main thread
>> + * sees all of the results produced by the worker.
>> + */
>> + if (atomic_compare_exchange_weak(
>> + &pool->controls[index].finished,
>> + &test,
>> + false)) {
>> + if (helper_func) {
>> + (helper_func)(pool, fin_result, result_frags,
>> index);
>> + }
>> + completed++;
>> + pool->controls[index].data = NULL;
>> + }
>> + }
>> + } while (completed < pool->size);
>> +}
>> +
>> +/* Run a thread pool - basic, does not do results processing.
>> + */
>> +
>> +void ovn_run_pool(struct worker_pool *pool)
>> +{
>> + run_pool_callback(pool, NULL, NULL, NULL);
>> +}
>> +
>> +/* Brute force merge of a hashmap into another hashmap.
>> + * Intended for use in parallel processing. The destination
>> + * hashmap MUST be the same size as the one being merged.
>> + *
>> + * This can be achieved by pre-allocating them to correct size
>> + * and using hmap_insert_fast() instead of hmap_insert()
>> + */
>> +
>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
>> +{
>> + size_t i;
>> +
>> + ovs_assert(inc->mask == dest->mask);
>> +
>> + if (!inc->n) {
>> + /* Request to merge an empty frag, nothing to do */
>> + return;
>> + }
>> +
>> + for (i = 0; i <= dest->mask; i++) {
>> + struct hmap_node **dest_bucket = &dest->buckets[i];
>> + struct hmap_node **inc_bucket = &inc->buckets[i];
>> + if (*inc_bucket != NULL) {
>> + struct hmap_node *last_node = *inc_bucket;
>> + while (last_node->next != NULL) {
>> + last_node = last_node->next;
>> + }
>> + last_node->next = *dest_bucket;
>> + *dest_bucket = *inc_bucket;
>> + *inc_bucket = NULL;
>> + }
>> + }
>> + dest->n += inc->n;
>> + inc->n = 0;
>> +}
>> +
>> +/* Run a thread pool which gathers results in an array
>> + * of hashes. Merge results.
>> + */
>> +
>> +
>> +void ovn_run_pool_hash(
>> + struct worker_pool *pool,
>> + struct hmap *result,
>> + struct hmap *result_frags)
>> +{
>> + run_pool_callback(pool, result, result_frags, merge_hash_results);
>> +}
>> +
>> +/* Run a thread pool which gathers results in an array of lists.
>> + * Merge results.
>> + */
>> +void ovn_run_pool_list(
>> + struct worker_pool *pool,
>> + struct ovs_list *result,
>> + struct ovs_list *result_frags)
>> +{
>> + run_pool_callback(pool, result, result_frags, merge_list_results);
>> +}
>> +
>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct
>> hashrow_locks *hrl)
>> +{
>> + int i;
>> + if (hrl->mask != lflows->mask) {
>> + if (hrl->row_locks) {
>> + free(hrl->row_locks);
>> + }
>> + hrl->row_locks = xcalloc(sizeof(struct ovs_mutex),
>> lflows->mask + 1);
>> + hrl->mask = lflows->mask;
>> + for (i = 0; i <= lflows->mask; i++) {
>> + ovs_mutex_init(&hrl->row_locks[i]);
>> + }
>> + }
>> +}
>> +
>> +static void worker_pool_hook(void *aux OVS_UNUSED) {
>> + int i;
>> + static struct worker_pool *pool;
>> + char sem_name[256];
>> +
>> + workers_must_exit = true;
>> +
>> + /* All workers must honour the must_exit flag and check for it
>> regularly.
>> + * We can make it atomic and check it via atomics in workers,
>> but that
>> + * is not really necessary as it is set just once - when the
>> program
>> + * terminates. So we use a fence which is invoked before exiting
>> instead.
>> + */
>> + atomic_thread_fence(memory_order_acq_rel);
>> +
>> + /* Wake up the workers after the must_exit flag has been set */
>> +
>> + LIST_FOR_EACH (pool, list_node, &worker_pools) {
>> + for (i = 0; i < pool->size ; i++) {
>> + sem_post(pool->controls[i].fire);
>> + }
>> + for (i = 0; i < pool->size ; i++) {
>> + sem_close(pool->controls[i].fire);
>> + sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>> + sem_unlink(sem_name);
>> + }
>> + sem_close(pool->done);
>> + sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
>> + sem_unlink(sem_name);
>> + }
>> +}
>> +
>> +static void setup_worker_pools(bool force) {
>> + int cores, nodes;
>> +
>> + nodes = ovs_numa_get_n_numas();
>> + if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
>> + nodes = 1;
>> + }
>> + cores = ovs_numa_get_n_cores();
>> +
>> + /* If there is no NUMA config, use 4 cores.
>> + * If there is NUMA config use half the cores on
>> + * one node so that the OS does not start pushing
>> + * threads to other nodes.
>> + */
>> + if (cores == OVS_CORE_UNSPEC || cores <= 0) {
>> + /* If there is no NUMA we can try the ovs-threads routine.
>> + * It falls back to sysconf and/or affinity mask.
>> + */
>> + cores = count_cpu_cores();
>> + pool_size = cores;
>> + } else {
>> + pool_size = cores / nodes;
>> + }
>> + if ((pool_size < 4) && force) {
>> + pool_size = 4;
>> + }
>> + can_parallelize = (pool_size >= 3);
>> + fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>> + sembase = random_uint32();
>> +}
>> +
>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>> + void *fin_result, void *result_frags,
>> + int index)
>> +{
>> + struct ovs_list *result = (struct ovs_list *)fin_result;
>> + struct ovs_list *res_frags = (struct ovs_list *)result_frags;
>> +
>> + if (!ovs_list_is_empty(&res_frags[index])) {
>> + ovs_list_splice(result->next,
>> + ovs_list_front(&res_frags[index]), &res_frags[index]);
>> + }
>> +}
>> +
>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>> + void *fin_result, void *result_frags,
>> + int index)
>> +{
>> + struct hmap *result = (struct hmap *)fin_result;
>> + struct hmap *res_frags = (struct hmap *)result_frags;
>> +
>> + fast_hmap_merge(result, &res_frags[index]);
>> + hmap_destroy(&res_frags[index]);
>> +}
>> +
>> +#endif
>> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
>> new file mode 100644
>> index 000000000..71ad17fb0
>> --- /dev/null
>> +++ b/lib/ovn-parallel-hmap.h
>> @@ -0,0 +1,285 @@
>> +/*
>> + * Copyright (c) 2020 Red Hat, Inc.
>> + *
>> + * Licensed under the Apache License, Version 2.0 (the "License");
>> + * you may not use this file except in compliance with the License.
>> + * You may obtain a copy of the License at:
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +#ifndef OVN_PARALLEL_HMAP
>> +#define OVN_PARALLEL_HMAP 1
>> +
>> +/* if the parallel macros are defined by hmap.h or any other ovs define
>> + * we skip over the ovn specific definitions.
>> + */
>> +
>> +#ifdef __cplusplus
>> +extern "C" {
>> +#endif
>> +
>> +#include <stdbool.h>
>> +#include <stdlib.h>
>> +#include <semaphore.h>
>> +#include "openvswitch/util.h"
>> +#include "openvswitch/hmap.h"
>> +#include "openvswitch/thread.h"
>> +#include "ovs-atomic.h"
>> +
>> +/* Process this include only if OVS does not supply parallel
>> definitions
>> + */
>> +
>> +#ifdef OVS_HAS_PARALLEL_HMAP
>> +
>> +#include "parallel-hmap.h"
>> +
>> +#else
>> +
>> +
>> +#ifdef __clang__
>> +#pragma clang diagnostic push
>> +#pragma clang diagnostic ignored "-Wthread-safety"
>> +#endif
>> +
>> +
>> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
>> + * of parallel processing.
>> + * Each worker thread has a different ThreadID in the range of
>> 0..POOL_SIZE
>> + * and will iterate hash buckets ThreadID, ThreadID + step,
>> + * ThreadID + step * 2, etc. The actual macro accepts
>> + * ThreadID + step * i as the JOBID parameter.
>> + */
>> +
>> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
>> + for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID),
>> MEMBER); \
>> + (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
>> + || ((NODE = NULL), false); \
>> + ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER),
>> MEMBER))
>> +
>> +/* We do not have a SAFE version of the macro, because the hash size
>> is not
>> + * atomic and hash removal operations would need to be wrapped with
>> + * locks. This will defeat most of the benefits from doing anything in
>> + * parallel.
>> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove
>> elements,
>> + * each thread should store them in a temporary list result instead,
>> merging
>> + * the lists into a combined result at the end */
>> +
>> +/* Work "Handle" */
>> +
>> +struct worker_control {
>> + int id; /* Used as a modulo when iterating over a hash. */
>> + atomic_bool finished; /* Set to true after a chunk of work is
>> complete. */
>> + sem_t *fire; /* Work start semaphore - sem_post starts the
>> worker. */
>> + sem_t *done; /* Work completion semaphore - sem_post on
>> completion. */
>> + struct ovs_mutex mutex; /* Guards the data. */
>> + void *data; /* Pointer to data to be processed. */
>> + void *workload; /* back-pointer to the worker pool structure. */
>> +};
>> +
>> +struct worker_pool {
>> + int size; /* Number of threads in the pool. */
>> + struct ovs_list list_node; /* List of pools - used in
>> cleanup/exit. */
>> + struct worker_control *controls; /* "Handles" in this pool. */
>> + sem_t *done; /* Work completion semaphore. */
>> +};
>> +
>> +/* Add a worker pool for thread function start() which expects a
>> pointer to
>> + * a worker_control structure as an argument. */
>> +
>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
>> +
>> +/* Setting this to true will make all processing threads exit */
>> +
>> +bool ovn_stop_parallel_processing(void);
>> +
>> +/* Build a hmap pre-sized for size elements */
>> +
>> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
>> +
>> +/* Build a hmap with a mask equals to size */
>> +
>> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
>> +
>> +/* Brute-force merge a hmap into hmap.
>> + * Dest and inc have to have the same mask. The merge is performed
>> + * by extending the element list for bucket N in the dest hmap with
>> the list
>> + * from bucket N in inc.
>> + */
>> +
>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
>> +
>> +/* Run a pool, without any default processing of results.
>> + */
>> +
>> +void ovn_run_pool(struct worker_pool *pool);
>> +
>> +/* Run a pool, merge results from hash frags into a final hash result.
>> + * The hash frags must be pre-sized to the same size.
>> + */
>> +
>> +void ovn_run_pool_hash(struct worker_pool *pool,
>> + struct hmap *result, struct hmap *result_frags);
>> +/* Run a pool, merge results from list frags into a final list result.
>> + */
>> +
>> +void ovn_run_pool_list(struct worker_pool *pool,
>> + struct ovs_list *result, struct ovs_list
>> *result_frags);
>> +
>> +/* Run a pool, call a callback function to perform processing of
>> results.
>> + */
>> +
>> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
>> + void *result_frags,
>> + void (*helper_func)(struct worker_pool *pool,
>> + void *fin_result, void *result_frags, int
>> index));
>> +
>> +
>> +/* Returns the first node in 'hmap' in the bucket in which the given
>> 'hash'
>> + * would land, or a null pointer if that bucket is empty. */
>> +
>> +static inline struct hmap_node *
>> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
>> +{
>> + return hmap->buckets[num];
>> +}
>> +
>> +static inline struct hmap_node *
>> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t
>> pool_size)
>> +{
>> + size_t i;
>> + for (i = start; i <= hmap->mask; i+= pool_size) {
>> + struct hmap_node *node = hmap->buckets[i];
>> + if (node) {
>> + return node;
>> + }
>> + }
>> + return NULL;
>> +}
>> +
>> +/* Returns the first node in 'hmap', as expected by thread with job_id
>> + * for parallel processing in arbitrary order, or a null pointer if
>> + * the slice of 'hmap' for that job_id is empty. */
>> +static inline struct hmap_node *
>> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t
>> pool_size)
>> +{
>> + return parallel_hmap_next__(hmap, job_id, pool_size);
>> +}
>> +
>> +/* Returns the next node in the slice of 'hmap' following 'node',
>> + * in arbitrary order, or a * null pointer if 'node' is the last
>> node in
>> + * the 'hmap' slice.
>> + *
>> + */
>> +static inline struct hmap_node *
>> +parallel_hmap_next(const struct hmap *hmap,
>> + const struct hmap_node *node, ssize_t pool_size)
>> +{
>> + return (node->next
>> + ? node->next
>> + : parallel_hmap_next__(hmap,
>> + (node->hash & hmap->mask) + pool_size, pool_size));
>> +}
>> +
>> +static inline void post_completed_work(struct worker_control *control)
>> +{
>> + atomic_thread_fence(memory_order_acq_rel);
>> + atomic_store_relaxed(&control->finished, true);
>> + sem_post(control->done);
>> +}
>> +
>> +static inline void wait_for_work(struct worker_control *control)
>> +{
>> + sem_wait(control->fire);
>> +}
>> +
>> +/* Hash per-row locking support - to be used only in conjunction
>> + * with fast hash inserts. Normal hash inserts may resize the hash
>> + * rendering the locking invalid.
>> + */
>> +
>> +struct hashrow_locks {
>> + ssize_t mask;
>> + struct ovs_mutex *row_locks;
>> +};
>> +
>> +/* Update an hash row locks structure to match the current hash size */
>> +
>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct
>> hashrow_locks *hrl);
>> +
>> +/* Lock a hash row */
>> +
>> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t
>> hash)
>> +{
>> + ovs_mutex_lock(&hrl->row_locks[hash & hrl->mask]);
>> +}
>> +
>> +/* Unlock a hash row */
>> +
>> +static inline void unlock_hash_row(struct hashrow_locks *hrl,
>> uint32_t hash)
>> +{
>> + ovs_mutex_unlock(&hrl->row_locks[hash & hrl->mask]);
>> +}
>> +/* Init the row locks structure */
>> +
>> +static inline void init_hash_row_locks(struct hashrow_locks *hrl)
>> +{
>> + hrl->mask = 0;
>> + hrl->row_locks = NULL;
>> +}
>> +
>> +bool ovn_can_parallelize_hashes(bool force_parallel);
>> +
>> +/* Use the OVN library functions for stuff which OVS has not defined
>> + * If OVS has defined these, they will still compile using the OVN
>> + * local names, but will be dropped by the linker in favour of the OVS
>> + * supplied functions.
>> + */
>> +
>> +#define update_hashrow_locks(lflows, hrl)
>> ovn_update_hashrow_locks(lflows, hrl)
>> +
>> +#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
>> +
>> +#define stop_parallel_processing() ovn_stop_parallel_processing()
>> +
>> +#define add_worker_pool(start) ovn_add_worker_pool(start)
>> +
>> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap,
>> size)
>> +
>> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
>> +
>> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
>> +
>> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
>> +
>> +#define ovn_run_pool(pool) ovn_run_pool(pool)
>> +
>> +#define run_pool_hash(pool, result, result_frags) \
>> + ovn_run_pool_hash(pool, result, result_frags)
>> +
>> +#define run_pool_list(pool, result, result_frags) \
>> + ovn_run_pool_list(pool, result, result_frags)
>> +
>> +#define run_pool_callback(pool, fin_result, result_frags,
>> helper_func) \
>> + ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
>> +
>> +
>> +
>> +#ifdef __clang__
>> +#pragma clang diagnostic pop
>> +#endif
>> +
>> +#endif
>> +
>> +#ifdef __cplusplus
>> +}
>> +#endif
>> +
>> +
>> +#endif /* lib/fasthmap.h */
>>
>
>
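
For anyone skimming the quoted patch: the merge step used when each worker fills its own fragment (ovn_fast_hmap_merge() above) comes down to splicing bucket chains row by row. Here is a minimal standalone sketch of that idea. The names (frag_node, frag_table, frag_table_merge) are hypothetical and the types are simplified stand-ins, not the OVS struct hmap:

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-ins for illustration only. */
struct frag_node {
    struct frag_node *next;
};

struct frag_table {
    size_t mask;                 /* Number of buckets minus one. */
    size_t n;                    /* Number of nodes stored. */
    struct frag_node **buckets;  /* Bucket heads. */
};

/* Moves every node of 'inc' into 'dest'. Both tables must have the same
 * mask, so bucket i of 'inc' maps to bucket i of 'dest' and no node has
 * to be rehashed. 'inc' is left empty. */
static void
frag_table_merge(struct frag_table *dest, struct frag_table *inc)
{
    assert(dest->mask == inc->mask);

    for (size_t i = 0; i <= dest->mask; i++) {
        struct frag_node *head = inc->buckets[i];

        if (head) {
            /* Find the tail of the incoming chain. */
            struct frag_node *last = head;
            while (last->next) {
                last = last->next;
            }
            /* Prepend the whole incoming chain to the destination row. */
            last->next = dest->buckets[i];
            dest->buckets[i] = head;
            inc->buckets[i] = NULL;
        }
    }
    dest->n += inc->n;
    inc->n = 0;
}
```

The cost is one pass over the rows plus a walk of each incoming chain. This is why the fragments have to be pre-sized identically and filled with non-resizing inserts.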
--
Anton R. Ivanov
Cambridgegreys Limited. Registered in England. Company Number 10273661
https://www.cambridgegreys.com/