[ovs-dev] [OVN Patch v13 3/4] ovn-northd: Introduce parallel lflow build

Anton Ivanov anton.ivanov at cambridgegreys.com
Wed Feb 10 17:12:37 UTC 2021


OK, found it.

For some reason you have a sem_open() failing and returning NULL.

I did not add a check in the semaphore initialization for this case after switching to named from anonymous semaphores. Anonymous semaphores always succeed, so the check was unnecessary. Unfortunately, Apple in their infinite wisdom has marked anonymous semaphores as deprecated, so I had to switch to named ones.

I cannot reproduce it locally with CLANG, and I cannot think of a case where it will happen with CLANG and not with GCC. In fact, I cannot think of a case where a uniquely named semaphore opened with O_CREAT will fail on a standard Linux setup.

I will add some error handling to that + a suitable WARN message and prohibit parallel processing if the initialization has failed.

A.


I, frankly, cannot

On 10/02/2021 15:34, Numan Siddique wrote:
> On Fri, Jan 29, 2021 at 4:52 PM <anton.ivanov at cambridgegreys.com> wrote:
>> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>>
>> Datapaths, ports, igmp groups and load balancers can now
>> be iterated over in parallel in order to speed up the lflow
>> generation. This decreases the time needed to generate the
>> logical flows by a factor of 4+ on a 6 core/12 thread CPU
>> without datapath groups - from 0.8-1 microseconds per flow
>> down to 0.2-0.3 microseconds per flow on average.
>>
>> The decrease in time to compute lflows with datapath groups
>> enabled is ~2 times for the same hardware - from an average of
>> 2.4 microseconds per flow to 1.2 microseconds per flow.
>>
>> Tested on an 8 node, 400 pod K8s simulation resulting
>> in > 6K flows.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
> Hi Anton,
>
> I tested the first 3 patches applying  on top of the latest master.
> (The patch series
> needs a rebase though). I noticed a few issues with clang.
>
> ovn-northd is crashing with the below stack trace
>
> ----
> (gdb) bt
> #0  0x00007fb5f7cca212 in __new_sem_wait_slow.constprop.0 () from
> /lib64/libpthread.so.0
> #1  0x0000000000421724 in wait_for_work (control=0x1962740) at
> ./lib/ovn-parallel-hmap.h:199
> #2  build_lflows_thread (arg=0x1962740) at northd/ovn-northd.c:11545
> #3  0x000000000049bb22 in ovsthread_wrapper (aux_=<optimized out>) at
> ../lib/ovs-thread.c:383
> #4  0x00007fb5f7cc13f9 in start_thread () from /lib64/libpthread.so.0
> #5  0x00007fb5f795e903 in clone () from /lib64/libc.so.6
> -----
>
> I pushed a commit to my github repo force enabling parallel
> computation and I see many tests fail
> and the reason is this crash. You can download the test log tar file
> and inspect it if you want.
>
> But the crash is seen 100% of the time on my local machine.
>
> Please see below for a few comments.
>
> Thanks
> Numan
>
>> ---
>>   northd/ovn-northd.c | 319 ++++++++++++++++++++++++++++++++++++--------
>>   1 file changed, 266 insertions(+), 53 deletions(-)
>>
>> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
>> index f36640061..992292562 100644
>> --- a/northd/ovn-northd.c
>> +++ b/northd/ovn-northd.c
>> @@ -39,6 +39,7 @@
>>   #include "lib/ovn-util.h"
>>   #include "lib/lb.h"
>>   #include "memory.h"
>> +#include "lib/ovn-parallel-hmap.h"
>>   #include "ovn/actions.h"
>>   #include "ovn/logical-fields.h"
>>   #include "packets.h"
>> @@ -3962,6 +3963,34 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
>>   /* If this option is 'true' northd will combine logical flows that differ by
>>    * logical datapath only by creating a datapath group. */
>>   static bool use_logical_dp_groups = false;
>> +static bool use_parallel_build = true;
>> +
>> +static struct hashrow_locks lflow_locks;
>> +
>> +/* Adds a row with the specified contents to the Logical_Flow table.
>> + * Version to use when locking is required.
>> + */
>> +static void
>> +do_ovn_lflow_add(struct hmap *lflow_map, bool shared,
>> +                        struct ovn_datapath *od,
>> +                        uint32_t hash, struct ovn_lflow *lflow)
> Please fix the indentation here.
>
>> +{
>> +
>> +    struct ovn_lflow *old_lflow;
>> +
>> +    if (shared && use_logical_dp_groups) {
>> +        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
>> +        if (old_lflow) {
>> +            ovn_lflow_destroy(NULL, lflow);
>> +            hmapx_add(&old_lflow->od_group, od);
>> +            return;
>> +        }
>> +    }
>> +
>> +    hmapx_add(&lflow->od_group, od);
>> +    hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
>> +}
>> +
>>
>>   /* Adds a row with the specified contents to the Logical_Flow table. */
>>   static void
>> @@ -3972,7 +4001,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
>>   {
>>       ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
>>
>> -    struct ovn_lflow *old_lflow, *lflow;
>> +    struct ovn_lflow *lflow;
>>       uint32_t hash;
>>
>>       lflow = xmalloc(sizeof *lflow);
>> @@ -3984,17 +4013,14 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
>>                      ovn_lflow_hint(stage_hint), where);
>>
>>       hash = ovn_lflow_hash(lflow);
>> -    if (shared && use_logical_dp_groups) {
>> -        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
>> -        if (old_lflow) {
>> -            ovn_lflow_destroy(NULL, lflow);
>> -            hmapx_add(&old_lflow->od_group, od);
>> -            return;
>> -        }
>> -    }
>>
>> -    hmapx_add(&lflow->od_group, od);
>> -    hmap_insert(lflow_map, &lflow->hmap_node, hash);
>> +    if (use_logical_dp_groups && use_parallel_build) {
>> +        lock_hash_row(&lflow_locks, hash);
>> +        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
>> +        unlock_hash_row(&lflow_locks, hash);
>> +    } else {
>> +        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
>> +    }
>>   }
>>
>>   /* Adds a row with the specified contents to the Logical_Flow table. */
>> @@ -7136,6 +7162,8 @@ build_lswitch_ip_mcast_igmp_mld(struct ovn_igmp_group *igmp_group,
>>       }
>>   }
>>
>> +static struct ovs_mutex mcgroup_mutex = OVS_MUTEX_INITIALIZER;
>> +
>>   /* Ingress table 19: Destination lookup, unicast handling (priority 50), */
>>   static void
>>   build_lswitch_ip_unicast_lookup(struct ovn_port *op,
>> @@ -7174,7 +7202,9 @@ build_lswitch_ip_unicast_lookup(struct ovn_port *op,
>>                                           &op->nbsp->header_);
>>               } else if (!strcmp(op->nbsp->addresses[i], "unknown")) {
>>                   if (lsp_is_enabled(op->nbsp)) {
>> +                    ovs_mutex_lock(&mcgroup_mutex);
>>                       ovn_multicast_add(mcgroups, &mc_unknown, op);
>> +                    ovs_mutex_unlock(&mcgroup_mutex);
>>                       op->od->has_unknown = true;
>>                   }
>>               } else if (is_dynamic_lsp_address(op->nbsp->addresses[i])) {
>> @@ -11488,6 +11518,120 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
>>                                   &lsi->match, &lsi->actions);
>>   }
>>
>> +struct lflows_thread_pool {
>> +    struct worker_pool *pool;
>> +};
>> +
>> +static void *build_lflows_thread(void *arg)
>> +{
>> +    struct worker_control *control = (struct worker_control *) arg;
>> +    struct lflows_thread_pool *workload;
>> +    struct lswitch_flow_build_info *lsi;
>> +
>> +    struct ovn_datapath *od;
>> +    struct ovn_port *op;
>> +    struct ovn_northd_lb *lb;
>> +    struct ovn_igmp_group *igmp_group;
>> +    int bnum;
>> +
>> +    while (!stop_parallel_processing()) {
>> +        wait_for_work(control);
> The crash seems to be here.
>
>> +        workload = (struct lflows_thread_pool *) control->workload;
>> +        lsi = (struct lswitch_flow_build_info *) control->data;
>> +        if (lsi && workload) {
>> +            /* Iterate over bucket ThreadID, ThreadID+size, ... */
>> +            for (bnum = control->id;
>> +                    bnum <= lsi->datapaths->mask;
>> +                    bnum += workload->pool->size)
>> +            {
>> +                HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
>> +                    if (stop_parallel_processing()) {
>> +                        return NULL;
>> +                    }
>> +                    build_lswitch_and_lrouter_iterate_by_od(od, lsi);
>> +                }
>> +            }
>> +            for (bnum = control->id;
>> +                    bnum <= lsi->ports->mask;
>> +                    bnum += workload->pool->size)
>> +            {
>> +                HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
>> +                    if (stop_parallel_processing()) {
>> +                        return NULL;
>> +                    }
>> +                    build_lswitch_and_lrouter_iterate_by_op(op, lsi);
>> +                }
>> +            }
>> +            for (bnum = control->id;
>> +                    bnum <= lsi->lbs->mask;
>> +                    bnum += workload->pool->size)
>> +            {
>> +                HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
>> +                    if (stop_parallel_processing()) {
>> +                        return NULL;
>> +                    }
>> +                    build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
>> +                                                         &lsi->match,
>> +                                                         &lsi->actions);
>> +                }
>> +            }
>> +            for (bnum = control->id;
>> +                    bnum <= lsi->igmp_groups->mask;
>> +                    bnum += workload->pool->size)
>> +            {
>> +                HMAP_FOR_EACH_IN_PARALLEL (
>> +                        igmp_group, hmap_node, bnum, lsi->igmp_groups) {
>> +                    if (stop_parallel_processing()) {
>> +                        return NULL;
>> +                    }
>> +                    build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
>> +                                                    &lsi->match,
>> +                                                    &lsi->actions);
>> +                }
>> +            }
>> +        }
>> +        post_completed_work(control);
>> +    }
>> +    return NULL;
>> +}
>> +
>> +static bool pool_init_done = false;
>> +static struct lflows_thread_pool *build_lflows_pool = NULL;
>> +
>> +static void init_lflows_thread_pool(void)
>> +{
>> +    int index;
>> +
>> +    if (!pool_init_done) {
>> +        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
>> +        pool_init_done = true;
> I see a problem here. Lets say add_worker_pool() returns NULL for some
> reason. Then we don't allocate memory for 'build_lflows_pool', but the
> function build_lswitch_and_lrouter_flows() after calling
> init_lflows_thread_pool()
> accesses 'build_lflows_pool'.
>
> Since init_lflows_thread_pool() is called only if 'use_parallel_build' is set,
> I'd expect add_worker_pool() to not return NULL.
>
> So I'd suggest adding
> ovs_assert(pool) here.
>
>> +        if (pool) {
>> +            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
>> +            build_lflows_pool->pool = pool;
>> +            for (index = 0; index < build_lflows_pool->pool->size; index++) {
>> +                build_lflows_pool->pool->controls[index].workload =
>> +                    build_lflows_pool;
>> +            }
>> +        }
>> +    }
>> +}
>> +
>> +/* TODO: replace hard cutoffs by configurable via commands. These are
>> + * temporary defines to determine single-thread to multi-thread processing
>> + * cutoff.
>> + * Setting to 1 forces "all parallel" lflow build.
>> + */
>> +
>> +static void
>> +noop_callback(struct worker_pool *pool OVS_UNUSED,
>> +              void *fin_result OVS_UNUSED,
>> +              void *result_frags OVS_UNUSED,
>> +              int index OVS_UNUSED)
>> +{
>> +    /* Do nothing */
>> +}
>> +
>> +
>>   static void
>>   build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
>>                                   struct hmap *port_groups, struct hmap *lflows,
>> @@ -11496,53 +11640,108 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
>>                                   struct shash *meter_groups, struct hmap *lbs,
>>                                   struct hmap *bfd_connections)
>>   {
>> -    struct ovn_datapath *od;
>> -    struct ovn_port *op;
>> -    struct ovn_northd_lb *lb;
>> -    struct ovn_igmp_group *igmp_group;
>>
>>       char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
>>
>> -    struct lswitch_flow_build_info lsi = {
>> -        .datapaths = datapaths,
>> -        .ports = ports,
>> -        .port_groups = port_groups,
>> -        .lflows = lflows,
>> -        .mcgroups = mcgroups,
>> -        .igmp_groups = igmp_groups,
>> -        .meter_groups = meter_groups,
>> -        .lbs = lbs,
>> -        .bfd_connections = bfd_connections,
>> -        .svc_check_match = svc_check_match,
>> -        .match = DS_EMPTY_INITIALIZER,
>> -        .actions = DS_EMPTY_INITIALIZER,
>> -    };
>> +    if (use_parallel_build) {
>> +        init_lflows_thread_pool();
>> +        struct hmap *lflow_segs;
>> +        struct lswitch_flow_build_info *lsiv;
>> +        int index;
>>
>> -    /* Combined build - all lflow generation from lswitch and lrouter
>> -     * will move here and will be reorganized by iterator type.
>> -     */
>> -    HMAP_FOR_EACH (od, key_node, datapaths) {
>> -        build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
>> -    }
>> -    HMAP_FOR_EACH (op, key_node, ports) {
>> -        build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
>> -    }
>> -    HMAP_FOR_EACH (lb, hmap_node, lbs) {
>> -        build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
>> -                                             &lsi.actions,
>> -                                             &lsi.match);
>> -    }
>> -    HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
>> -        build_lswitch_ip_mcast_igmp_mld(igmp_group,
>> -                                        lsi.lflows,
>> -                                        &lsi.actions,
>> -                                        &lsi.match);
>> -    }
>> -    free(svc_check_match);
>> +        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
>> +        if (use_logical_dp_groups) {
>> +            lflow_segs = NULL;
>> +        } else {
>> +            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
>> +        }
>> +
>> +        /* Set up "work chunks" for each thread to work on. */
>> +
>> +        for (index = 0; index < build_lflows_pool->pool->size; index++) {
>> +            if (use_logical_dp_groups) {
>> +                /* if dp_groups are in use we lock a shared lflows hash
>> +                 * on a per-bucket level instead of merging hash frags */
>> +                lsiv[index].lflows = lflows;
>> +            } else {
>> +                fast_hmap_init(&lflow_segs[index], lflows->mask);
>> +                lsiv[index].lflows = &lflow_segs[index];
>> +            }
>> +
>> +            lsiv[index].datapaths = datapaths;
>> +            lsiv[index].ports = ports;
>> +            lsiv[index].port_groups = port_groups;
>> +            lsiv[index].mcgroups = mcgroups;
>> +            lsiv[index].igmp_groups = igmp_groups;
>> +            lsiv[index].meter_groups = meter_groups;
>> +            lsiv[index].lbs = lbs;
>> +            lsiv[index].bfd_connections = bfd_connections;
>> +            lsiv[index].svc_check_match = svc_check_match;
>> +            ds_init(&lsiv[index].match);
>> +            ds_init(&lsiv[index].actions);
>> +
>> +            build_lflows_pool->pool->controls[index].data = &lsiv[index];
>> +        }
>> +
>> +        /* Run thread pool. */
>> +        if (use_logical_dp_groups) {
>> +            run_pool_callback(build_lflows_pool->pool, NULL, NULL, noop_callback);
> Since noop_callback() does nothing, I think it's better to call -
> ovn_run_pool() directly.
>
>
>> +        } else {
>> +            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
>> +        }
>> +
>> +        for (index = 0; index < build_lflows_pool->pool->size; index++) {
>> +            ds_destroy(&lsiv[index].match);
>> +            ds_destroy(&lsiv[index].actions);
>> +        }
>> +        free(lflow_segs);
>> +        free(lsiv);
>> +    } else {
>> +        struct ovn_datapath *od;
>> +        struct ovn_port *op;
>> +        struct ovn_northd_lb *lb;
>> +        struct ovn_igmp_group *igmp_group;
>> +        struct lswitch_flow_build_info lsi = {
>> +            .datapaths = datapaths,
>> +            .ports = ports,
>> +            .port_groups = port_groups,
>> +            .lflows = lflows,
>> +            .mcgroups = mcgroups,
>> +            .igmp_groups = igmp_groups,
>> +            .meter_groups = meter_groups,
>> +            .lbs = lbs,
>> +            .bfd_connections = bfd_connections,
>> +            .svc_check_match = svc_check_match,
>> +            .match = DS_EMPTY_INITIALIZER,
>> +            .actions = DS_EMPTY_INITIALIZER,
>> +        };
>> +
>> +        /* Combined build - all lflow generation from lswitch and lrouter
>> +         * will move here and will be reorganized by iterator type.
>> +         */
>> +        HMAP_FOR_EACH (od, key_node, datapaths) {
>> +            build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
>> +        }
>> +        HMAP_FOR_EACH (op, key_node, ports) {
>> +            build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
>> +        }
>> +        HMAP_FOR_EACH (lb, hmap_node, lbs) {
>> +            build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
>> +                                                 &lsi.actions,
>> +                                                 &lsi.match);
>> +        }
>> +        HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
>> +            build_lswitch_ip_mcast_igmp_mld(igmp_group,
>> +                                            lsi.lflows,
>> +                                            &lsi.actions,
>> +                                            &lsi.match);
>> +        }
>>
>> -    ds_destroy(&lsi.match);
>> -    ds_destroy(&lsi.actions);
>> +        ds_destroy(&lsi.match);
>> +        ds_destroy(&lsi.actions);
>> +    }
>>
>> +    free(svc_check_match);
>>       build_lswitch_flows(datapaths, lflows);
>>   }
>>
>> @@ -11613,6 +11812,8 @@ ovn_sb_set_lflow_logical_dp_group(
>>       sbrec_logical_flow_set_logical_dp_group(sbflow, dpg->dp_group);
>>   }
>>
>> +static ssize_t max_seen_lflow_size = 128;
>> +
>>   /* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
>>    * constructing their contents based on the OVN_NB database. */
>>   static void
>> @@ -11622,13 +11823,21 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
>>                struct shash *meter_groups,
>>                struct hmap *lbs, struct hmap *bfd_connections)
>>   {
>> -    struct hmap lflows = HMAP_INITIALIZER(&lflows);
>> +    struct hmap lflows;
>>
>> +    fast_hmap_size_for(&lflows, max_seen_lflow_size);
>> +    if (use_parallel_build) {
>> +        update_hashrow_locks(&lflows, &lflow_locks);
>> +    }
>>       build_lswitch_and_lrouter_flows(datapaths, ports,
>>                                       port_groups, &lflows, mcgroups,
>>                                       igmp_groups, meter_groups, lbs,
>>                                       bfd_connections);
>>
>> +    if (hmap_count(&lflows) > max_seen_lflow_size) {
>> +        max_seen_lflow_size = hmap_count(&lflows);
>> +    }
>> +
>>       /* Collecting all unique datapath groups. */
>>       struct hmap dp_groups = HMAP_INITIALIZER(&dp_groups);
>>       struct hmapx single_dp_lflows = HMAPX_INITIALIZER(&single_dp_lflows);
>> @@ -13394,6 +13603,9 @@ main(int argc, char *argv[])
>>
>>       daemonize_complete();
>>
>> +    init_hash_row_locks(&lflow_locks);
> Looks like lflow_locks is not freed during exit. To make Valgrind
> / AddressSanitizer happy,
> I would suggest freeing the lflow_locks during exit.
>
>> +    use_parallel_build = can_parallelize_hashes();
>> +
>>       /* We want to detect (almost) all changes to the ovn-nb db. */
>>       struct ovsdb_idl_loop ovnnb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
>>           ovsdb_idl_create(ovnnb_db, &nbrec_idl_class, true, true));
>> @@ -13656,6 +13868,7 @@ main(int argc, char *argv[])
>>       exiting = false;
>>       state.had_lock = false;
>>       state.paused = false;
>> +
>>       while (!exiting) {
>>           memory_run();
>>           if (memory_should_report()) {
>> --
>> 2.20.1
>>
>> _______________________________________________
>> dev mailing list
>> dev at openvswitch.org
>> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>>
-- 
Anton R. Ivanov
Cambridgegreys Limited. Registered in England. Company Number 10273661
https://www.cambridgegreys.com/



More information about the dev mailing list