[ovs-dev] [OVN Patch v10 1/4] ovn-libs: Add support for parallel processing
Mark Michelson
mmichels at redhat.com
Thu Jan 14 13:46:48 UTC 2021
I see that Ilya has some comments, so those need addressing, but aside
from that, I reviewed and
Acked-by: Mark Michelson <mmichels at redhat.com>
On 1/11/21 4:51 AM, anton.ivanov at cambridgegreys.com wrote:
> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>
> This adds a set of functions and macros intended to process
> hashes in parallel.
>
> The principles of operation are documented in fasthmap.h.
>
> If these one day go into the OVS tree, the OVS tree versions
> would be used in preference.
>
> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
> ---
> lib/automake.mk | 2 +
> lib/fasthmap.c | 281 ++++++++++++++++++++++++++++++++++++++++++++++++
> lib/fasthmap.h | 206 +++++++++++++++++++++++++++++++++++
> 3 files changed, 489 insertions(+)
> create mode 100644 lib/fasthmap.c
> create mode 100644 lib/fasthmap.h
>
> diff --git a/lib/automake.mk b/lib/automake.mk
> index 250c7aefa..d7e4b20cf 100644
> --- a/lib/automake.mk
> +++ b/lib/automake.mk
> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
> lib/expr.c \
> lib/extend-table.h \
> lib/extend-table.c \
> + lib/fasthmap.h \
> + lib/fasthmap.c \
> lib/ip-mcast-index.c \
> lib/ip-mcast-index.h \
> lib/mcast-group-index.c \
> diff --git a/lib/fasthmap.c b/lib/fasthmap.c
> new file mode 100644
> index 000000000..3096c90d3
> --- /dev/null
> +++ b/lib/fasthmap.c
> @@ -0,0 +1,281 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc.
> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#include <config.h>
> +#include <stdint.h>
> +#include <string.h>
> +#include <stdlib.h>
> +#include <unistd.h>
> +#include <semaphore.h>
> +#include "fatal-signal.h"
> +#include "util.h"
> +#include "openvswitch/vlog.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/thread.h"
> +#include "fasthmap.h"
> +#include "ovs-atomic.h"
> +#include "ovs-thread.h"
> +#include "ovs-numa.h"
> +
> +VLOG_DEFINE_THIS_MODULE(fasthmap);
> +
> +
> +static bool worker_pool_setup = false; /* Set by setup_worker_pools(). */
> +static bool workers_must_exit = false; /* Set by worker_pool_hook(). */
> +static bool can_parallelize = false; /* True when pool_size >= 3. */
> +/* Every pool created by ovn_add_worker_pool(); walked by worker_pool_hook(). */
> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
> +/* Held for the whole of ovn_add_worker_pool(). */
> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
> +/* Number of threads per pool; computed once in setup_worker_pools(). */
> +static int pool_size;
> +
> +/* Fatal-signal hook registered by setup_worker_pools().
> + *
> + * Raises the 'workers_must_exit' flag and then posts the 'fire' (work start)
> + * semaphore of every worker in every pool on the 'worker_pools' list, so
> + * that each worker is released and gets a chance to observe the flag.
> + *
> + * 'aux' is unused. */
> +static void worker_pool_hook(void *aux OVS_UNUSED) {
> +    /* Plain automatic cursor: nothing here has to survive between calls, so
> +     * the list iterator does not need static storage. */
> +    struct worker_pool *pool;
> +    int i;
> +
> +    workers_must_exit = true; /* all workers must honour this flag */
> +    atomic_thread_fence(memory_order_release);
> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
> +        for (i = 0; i < pool->size; i++) {
> +            sem_post(&pool->controls[i].fire);
> +        }
> +    }
> +}
> +/* One-time computation of 'pool_size' and 'can_parallelize'; also registers
> + * worker_pool_hook() as a fatal-signal hook and marks the setup as done.
> + * Called from ovn_add_worker_pool() while 'init_mutex' is held. */
> +static void setup_worker_pools(void) {
> + int cores, nodes;
> +
> + nodes = ovs_numa_get_n_numas();
> + if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> + nodes = 1;
> + }
> + cores = ovs_numa_get_n_cores();
> +
> + /* Pool sizing:
> + * - if the NUMA code reports no usable core count, use every core
> + * reported by count_cpu_cores();
> + * - otherwise use the number of cores of one node (cores / nodes),
> + * with the intent that the OS does not start pushing threads to
> + * other nodes.
> + * In both cases the result is capped at 16 below.
> + */
> + if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> + /* If there is no NUMA we can try the ovs-threads routine.
> + * It falls back to sysconf and/or affinity mask.
> + */
> + cores = count_cpu_cores();
> + pool_size = cores;
> + } else {
> + pool_size = cores / nodes;
> + }
> + if (pool_size > 16) {
> + pool_size = 16;
> + }
> + can_parallelize = (pool_size >= 3); /* Pools are only created for 3+ threads. */
> + fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
> + worker_pool_setup = true;
> +}
> +/* Returns true once worker_pool_hook() has asked all workers to exit. */
> +bool ovn_cease_fire(void)
> +{
> + return workers_must_exit; /* NOTE(review): plain, non-atomic read of a flag set by the hook. */
> +}
> +
> +/* Creates a new worker pool whose threads all run 'start'.
> + *
> + * Each thread is started with a pointer to its own 'struct worker_control'
> + * as its argument.  The first call also performs the one-time sizing done
> + * by setup_worker_pools().  The whole function runs under 'init_mutex'.
> + *
> + * Returns the new pool, or NULL when parallel processing is not available
> + * ('can_parallelize' is false); in that case nothing is allocated and no
> + * thread is started. */
> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
> +
> +    struct worker_pool *new_pool = NULL;
> +    struct worker_control *new_control;
> +    int i;
> +
> +    ovs_mutex_lock(&init_mutex);
> +
> +    if (!worker_pool_setup) {
> +        setup_worker_pools();
> +    }
> +
> +    if (can_parallelize) {
> +        new_pool = xmalloc(sizeof *new_pool);
> +        new_pool->size = pool_size;
> +        sem_init(&new_pool->done, 0, 0);
> +
> +        new_pool->controls =
> +            xmalloc(sizeof *new_pool->controls * new_pool->size);
> +
> +        for (i = 0; i < new_pool->size; i++) {
> +            new_control = &new_pool->controls[i];
> +            sem_init(&new_control->fire, 0, 0);
> +            new_control->id = i;
> +            new_control->done = &new_pool->done;
> +            new_control->data = NULL;
> +            /* xmalloc() does not zero memory; do not hand out a garbage
> +             * pointer in a field the worker threads can read. */
> +            new_control->workload = NULL;
> +            ovs_mutex_init(&new_control->mutex);
> +            /* ATOMIC_VAR_INIT() is meant for initializers in declarations;
> +             * an already allocated atomic is initialized with
> +             * atomic_init(). */
> +            atomic_init(&new_control->finished, false);
> +        }
> +
> +        /* Publish the pool only once its controls are fully set up, so that
> +         * worker_pool_hook() never walks a half-built pool. */
> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
> +
> +        for (i = 0; i < new_pool->size; i++) {
> +            ovs_thread_create("worker pool helper", start,
> +                              &new_pool->controls[i]);
> +        }
> +    }
> +    ovs_mutex_unlock(&init_mutex);
> +    return new_pool;
> +}
> +
> +
> +/* Sets up 'hmap' as an empty hash table whose bucket mask is 'mask', i.e.
> + * with 'mask' + 1 freshly allocated buckets, every one of them empty. */
> +void
> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
> +{
> +    size_t bucket = 0;
> +
> +    hmap->n = 0;
> +    hmap->one = NULL;
> +    hmap->mask = mask;
> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
> +
> +    while (bucket <= hmap->mask) {
> +        hmap->buckets[bucket] = NULL;
> +        bucket++;
> +    }
> +}
> +
> +/* Initializes 'hmap' as an empty hash table sized for 'size' elements.
> + *
> + * Meant for parallel processing, where every fragment that collects the
> + * results of a parallel job must end up with the same number of buckets.
> + * The bucket mask is 'size' / 2 with every bit below its highest set bit
> + * filled in.
> + */
> +void
> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
> +{
> +    static const unsigned int smear[] = { 1, 2, 4, 8, 16 };
> +    size_t bits = size / 2;
> +    size_t step;
> +
> +    /* Propagate the highest set bit into all lower positions. */
> +    for (step = 0; step < sizeof smear / sizeof smear[0]; step++) {
> +        bits |= bits >> smear[step];
> +    }
> +#if SIZE_MAX > UINT32_MAX
> +    bits |= bits >> 32;
> +#endif
> +
> +    /* Buckets are allocated dynamically anyway, so never settle for fewer
> +     * than 4 of them once the mask is non-zero. */
> +    bits |= (bits & 1) << 1;
> +
> +    fast_hmap_init(hmap, bits);
> +}
> +
> +/* Runs one round of work on 'pool' and collects the results through a
> + * callback.
> + *
> + * Posts the 'fire' semaphore of every worker, then repeatedly waits on the
> + * pool's 'done' semaphore and scans all workers until 'pool->size' of them
> + * have been seen with their 'finished' flag set.  For each such worker the
> + * flag is reset to false, 'helper_func' (when non-NULL) is called in the
> + * calling thread as helper_func(pool, fin_result, result_frags, index),
> + * and the worker's 'data' pointer is cleared.
> + *
> + * 'fin_result' and 'result_frags' are opaque here; they are only passed on
> + * to 'helper_func'.
> + */
> +
> +void ovn_run_pool_callback(
> +    struct worker_pool *pool,
> +    void *fin_result,
> +    void *result_frags,
> +    void (*helper_func)(struct worker_pool *pool,
> +        void *fin_result, void *result_frags, int index))
> +{
> +    int index, completed;
> +
> +    atomic_thread_fence(memory_order_release);
> +
> +    for (index = 0; index < pool->size; index++) {
> +        sem_post(&pool->controls[index].fire);
> +    }
> +
> +    completed = 0;
> +
> +    do {
> +        bool test;
> +        sem_wait(&pool->done);
> +        for (index = 0; index < pool->size; index++) {
> +            test = true;
> +            /* The strong variant is required: a weak compare-exchange may
> +             * fail spuriously even though 'finished' is true.  The worker
> +             * would then stay uncounted and, with no further post on
> +             * 'done' pending, the sem_wait() above would never return. */
> +            if (atomic_compare_exchange_strong(
> +                    &pool->controls[index].finished,
> +                    &test,
> +                    false)) {
> +                if (helper_func) {
> +                    (helper_func)(pool, fin_result, result_frags, index);
> +                }
> +                completed++;
> +                pool->controls[index].data = NULL;
> +            }
> +        }
> +    } while (completed < pool->size);
> +}
> +
> +/* Run a thread pool - basic, does not do results processing: it calls
> + * ovn_run_pool_callback() with NULL results and a NULL callback, so it
> + * only returns once every worker in 'pool' has been seen as finished.
> + */
> +
> +void ovn_run_pool(struct worker_pool *pool)
> +{
> + ovn_run_pool_callback(pool, NULL, NULL, NULL);
> +}
> +
> +/* Brute force merge of hash map 'inc' into hash map 'dest', intended for
> + * use in parallel processing.  Both maps MUST have the same mask; this is
> + * asserted.
> + *
> + * Equal masks can be obtained by pre-allocating the maps to the correct
> + * size and filling them with hmap_insert_fast() instead of hmap_insert().
> + *
> + * For every bucket, the whole chain of 'inc' is placed in front of the
> + * chain of the same bucket in 'dest'.  On return 'inc' has no nodes left
> + * and its element count is zero.
> + */
> +
> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
> +{
> +    size_t bucket;
> +
> +    ovs_assert(inc->mask == dest->mask);
> +
> +    if (inc->n == 0) {
> +        /* Nothing to move out of an empty fragment. */
> +        return;
> +    }
> +
> +    for (bucket = 0; bucket <= dest->mask; bucket++) {
> +        struct hmap_node *head = inc->buckets[bucket];
> +        struct hmap_node *tail;
> +
> +        if (head == NULL) {
> +            continue;
> +        }
> +
> +        /* Walk to the end of the incoming chain. */
> +        for (tail = head; tail->next != NULL; tail = tail->next) {
> +            continue;
> +        }
> +
> +        tail->next = dest->buckets[bucket];
> +        dest->buckets[bucket] = head;
> +        inc->buckets[bucket] = NULL;
> +    }
> +    dest->n += inc->n;
> +    inc->n = 0;
> +}
> +
> +/* Run a thread pool which gathers results in an array
> + * of hashes. Merge results.
> + */
> +
> +/* Result callback used by ovn_run_pool_hash(): treats 'result_frags' as an
> + * array of hash maps, merges entry 'index' into the hash map 'fin_result'
> + * and then destroys that entry.  'pool' is unused. */
> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> +                               void *fin_result, void *result_frags, int index)
> +{
> +    struct hmap *merged = fin_result;
> +    struct hmap *frag = (struct hmap *) result_frags + index;
> +
> +    fast_hmap_merge(merged, frag);
> +    hmap_destroy(frag);
> +}
> +
> +/* Runs 'pool' through ovn_run_pool_callback() with merge_hash_results() as
> + * the callback: as each worker finishes, its entry in the 'result_frags'
> + * array is merged into 'result' and destroyed. */
> +void ovn_run_pool_hash(
> + struct worker_pool *pool,
> + struct hmap *result,
> + struct hmap *result_frags)
> +{
> + ovn_run_pool_callback(pool, result, result_frags, merge_hash_results);
> +}
> diff --git a/lib/fasthmap.h b/lib/fasthmap.h
> new file mode 100644
> index 000000000..2a28553d5
> --- /dev/null
> +++ b/lib/fasthmap.h
> @@ -0,0 +1,206 @@
> +/*
> + * Copyright (c) 2020 Red Hat, Inc.
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License");
> + * you may not use this file except in compliance with the License.
> + * You may obtain a copy of the License at:
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +#ifndef OVN_PARALLEL_HMAP
> +#define OVN_PARALLEL_HMAP 1
> +
> +/* Process this include only if OVS does not supply parallel definitions
> + */
> +
> +#ifndef OVS_HAS_PARALLEL_HMAP
> +
> +/* if the parallel macros are defined by hmap.h or any other ovs define
> + * we skip over the ovn specific definitions.
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdbool.h>
> +#include <stdlib.h>
> +#include <semaphore.h>
> +#include "openvswitch/util.h"
> +#include "openvswitch/hmap.h"
> +#include "openvswitch/thread.h"
> +#include "ovs-atomic.h"
> +
> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
> + * of parallel processing.
> + * Each worker thread has a different ThreadID in the range of
> + * 0..POOL_SIZE - 1 and will iterate hash buckets ThreadID, ThreadID + step,
> + * ThreadID + step * 2, etc. The actual macro accepts
> + * ThreadID + step * i as the JOBID parameter.
> + *
> + * One expansion walks the nodes of the single bucket number JOBID of HMAP;
> + * stepping JOBID from bucket to bucket is left to the caller. NODE is set
> + * to NULL when the walk ends. JOBID is used as a bucket index without any
> + * range check.
> + */
> +
> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
> + for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
> + (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
> + || ((NODE = NULL), false); \
> + ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
> +
> +/* We do not have a SAFE version of the macro, because the hash size is not
> + * atomic and hash removal operations would need to be wrapped with
> + * locks. This will defeat most of the benefits from doing anything in
> + * parallel.
> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
> + * each thread should store them in a temporary list result instead, merging
> + * the lists into a combined result at the end */
> +
> +/* Work "Handle": the per-thread control block. A pointer to one of these is
> + * the argument passed to each worker thread's start function. */
> +
> +struct worker_control {
> + int id; /* Index of the worker in its pool; used as a modulo when iterating over a hash. */
> + atomic_bool finished; /* Set to true after a chunk of work is complete. */
> + sem_t fire; /* Work start semaphore - sem_post starts the worker. */
> + sem_t *done; /* Work completion semaphore - sem_post on completion. */
> + struct ovs_mutex mutex; /* Guards the data. */
> + void *data; /* Pointer to data to be processed. */
> + void *workload; /* back-pointer to the worker pool structure. NOTE(review): not set by fasthmap.c - confirm who owns it. */
> +};
> +
> +struct worker_pool {
> + int size; /* Number of threads in the pool. */
> + struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
> + struct worker_control *controls; /* "Handles" in this pool, 'size' of them. */
> + sem_t done; /* Work completion semaphore, shared by all workers of the pool. */
> +};
> +
> +/* Add a worker pool for thread function start() which expects a pointer to
> + * a worker_control structure as an argument.
> + * Returns the new pool, or NULL if parallel processing is not available. */
> +
> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
> +
> +/* Returns true once the processing threads have been told to exit; all
> + * worker threads must honour this and exit when it is true. */
> +
> +bool ovn_cease_fire(void);
> +
> +/* Initialize an empty hmap pre-sized for 'size' elements. */
> +
> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
> +
> +/* Initialize an empty hmap with a mask equal to 'size', i.e. with
> + * 'size' + 1 buckets. */
> +
> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
> +
> +/* Brute-force merge of hmap 'inc' into hmap 'dest'.
> + * Dest and inc have to have the same mask. The merge is performed
> + * by joining, for every bucket N, the element list of bucket N in inc with
> + * the element list of bucket N in dest; inc is left empty.
> + */
> +
> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
> +
> +/* Run a pool, without any default processing of results.
> + */
> +
> +void ovn_run_pool(struct worker_pool *pool);
> +
> +/* Run a pool, merge results from hash frags into a final hash result.
> + * The hash frags must be pre-sized to the same size as the result, and
> + * each frag is destroyed after it has been merged.
> + */
> +
> +void ovn_run_pool_hash(struct worker_pool *pool,
> + struct hmap *result, struct hmap *result_frags);
> +
> +/* Run a pool, call a callback function to perform processing of results.
> + * helper_func may be NULL; otherwise it is called once per finished worker
> + * with that worker's index.
> + */
> +
> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
> + void *result_frags,
> + void (*helper_func)(struct worker_pool *pool,
> + void *fin_result, void *result_frags, int index));
> +
> +
> +/* Returns the first node in bucket number 'num' of 'hmap', or a null
> + * pointer if that bucket is empty. 'num' is a bucket index, not a hash,
> + * and is not range-checked against the mask of 'hmap'. */
> +
> +static inline struct hmap_node *
> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
> +{
> + return hmap->buckets[num];
> +}
> +
> +/* Looks at buckets 'start', 'start' + 'pool_size', 'start' + 2 * 'pool_size'
> + * and so on, up to the mask of 'hmap', and returns the first node of the
> + * first non-empty one, or a null pointer if all of them are empty. */
> +static inline struct hmap_node *
> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
> +{
> +    size_t bucket = start;
> +
> +    while (bucket <= hmap->mask) {
> +        struct hmap_node *head = hmap->buckets[bucket];
> +
> +        if (head != NULL) {
> +            return head;
> +        }
> +        bucket += pool_size;
> +    }
> +    return NULL;
> +}
> +
> +/* Returns the first node in 'hmap', as expected by thread with job_id
> + * for parallel processing in arbitrary order, or a null pointer if
> + * the slice of 'hmap' for that job_id is empty. The slice consists of
> + * buckets job_id, job_id + pool_size, job_id + 2 * pool_size, etc. */
> +static inline struct hmap_node *
> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
> +{
> + return parallel_hmap_next__(hmap, job_id, pool_size);
> +}
> +
> +/* Returns the node that follows 'node' in its slice of 'hmap', in arbitrary
> + * order, or a null pointer if 'node' is the last node of that slice.
> + *
> + * The rest of the bucket chain of 'node' is used up first; after that the
> + * search moves on to the bucket 'pool_size' positions further on.
> + */
> +static inline struct hmap_node *
> +parallel_hmap_next(const struct hmap *hmap,
> +                   const struct hmap_node *node, ssize_t pool_size)
> +{
> +    if (node->next) {
> +        return node->next;
> +    }
> +
> +    return parallel_hmap_next__(hmap,
> +                                (node->hash & hmap->mask) + pool_size,
> +                                pool_size);
> +}
> +
> +/* Use the OVN library functions for stuff which OVS has not defined
> + * If OVS has defined these, they will still compile using the OVN
> + * local names, but will be dropped by the linker in favour of the OVS
> + * supplied functions.
> + *
> + * Each macro maps a short, unprefixed name onto the ovn_-prefixed function
> + * with the same argument list.
> + */
> +
> +#define cease_fire() ovn_cease_fire()
> +
> +#define add_worker_pool(start) ovn_add_worker_pool(start)
> +
> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
> +
> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
> +
> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
> +
> +/* NOTE(review): no ovn_hmap_merge() is declared in this header - confirm
> + * that one exists before using this alias. */
> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
> +
> +/* Short alias for ovn_run_pool(), following the pattern of the other
> + * aliases in this list. */
> +#define run_pool(pool) ovn_run_pool(pool)
> +
> +/* Expands to itself, so it has no effect on calls; kept so that code which
> + * tests for this macro keeps working. */
> +#define ovn_run_pool(pool) ovn_run_pool(pool)
> +
> +#define run_pool_hash(pool, result, result_frags) \
> +    ovn_run_pool_hash(pool, result, result_frags)
> +
> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
> +    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif
> +
> +#endif /* lib/fasthmap.h */
>
More information about the dev
mailing list