[ovs-dev] [OVN Patch v12 3/4] ovn-northd: Introduce parallel lflow build

Anton Ivanov anton.ivanov at cambridgegreys.com
Mon Jan 25 21:10:36 UTC 2021


On 25/01/2021 20:56, Dumitru Ceara wrote:
> On 1/15/21 4:30 PM, anton.ivanov at cambridgegreys.com wrote:
>> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>>
>> Datapaths, ports, igmp groups and load balancers can now
>> be iterated over in parallel in order to speed up the lflow
>> generation. This decreases the time needed to generate the
>> logical flows by a factor of 4+ on a 6 core/12 thread CPU
>> without datapath groups - from 0.8-1 microseconds per flow
>> down to 0.2-0.3 microseconds per flow on average.
>>
>> The decrease in time to compute lflows with datapath groups
>> enabled is ~2 times for the same hardware - from an average of
>> 2.4 microseconds per flow to 1.2 microseconds per flow.
>>
>> Tested on an 8 node, 400 pod Kubernetes simulation resulting
>> in > 6K flows.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>> ---
> 
> Hi Anton,
> 
> Again, not a full review, but some initial remarks, most of them, I think,
> related to the fact that we're leaking the fast-hmap abstraction details.
> 
>>   northd/ovn-northd.c | 334 +++++++++++++++++++++++++++++++++++++-------
>>   1 file changed, 281 insertions(+), 53 deletions(-)
>>
>> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
>> index dda033543..53e0cc50d 100644
>> --- a/northd/ovn-northd.c
>> +++ b/northd/ovn-northd.c
>> @@ -37,6 +37,7 @@
>>   #include "lib/ovn-sb-idl.h"
>>   #include "lib/ovn-util.h"
>>   #include "lib/lb.h"
>> +#include "lib/fasthmap.h"
>>   #include "ovn/actions.h"
>>   #include "ovn/logical-fields.h"
>>   #include "packets.h"
>> @@ -4174,6 +4175,34 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
>>   /* If this option is 'true' northd will combine logical flows that differs by
>>    * logical datapath only by creating a datapah group. */
>>   static bool use_logical_dp_groups = false;
>> +static bool use_parallel_build = true;
>> +
>> +static struct ovs_mutex *slice_locks = NULL;
> 
> I'm not very sure why the locking details have to be known by the user
> of the fasthmap.  Why doesn't the fasthmap library take care of
> (re)allocating just the right number of locks it needs?
> 
>> +
>> +/* Adds a row with the specified contents to the Logical_Flow table.
>> + * Version to use when locking is required.
>> + */
>> +static void
>> +do_ovn_lflow_add(struct hmap *lflow_map, bool shared,
>> +                        struct ovn_datapath *od,
>> +                        uint32_t hash, struct ovn_lflow *lflow)
>> +{
>> +
>> +    struct ovn_lflow *old_lflow;
>> +
>> +    if (shared && use_logical_dp_groups) {
>> +        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
>> +        if (old_lflow) {
>> +            ovn_lflow_destroy(NULL, lflow);
>> +            hmapx_add(&old_lflow->od_group, od);
>> +            return;
>> +        }
>> +    }
>> +
>> +    hmapx_add(&lflow->od_group, od);
>> +    hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
>> +}
>> +
>>   /* Adds a row with the specified contents to the Logical_Flow table. */
>>   static void
>> @@ -4184,7 +4213,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
>>   {
>>       ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
>> -    struct ovn_lflow *old_lflow, *lflow;
>> +    struct ovn_lflow *lflow;
>>       uint32_t hash;
>>       lflow = xmalloc(sizeof *lflow);
>> @@ -4196,17 +4225,14 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
>>                      ovn_lflow_hint(stage_hint), where);
>>       hash = ovn_lflow_hash(lflow);
>> -    if (shared && use_logical_dp_groups) {
>> -        old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
>> -        if (old_lflow) {
>> -            ovn_lflow_destroy(NULL, lflow);
>> -            hmapx_add(&old_lflow->od_group, od);
>> -            return;
>> -        }
>> -    }
>> -    hmapx_add(&lflow->od_group, od);
>> -    hmap_insert(lflow_map, &lflow->hmap_node, hash);
>> +    if (use_logical_dp_groups && use_parallel_build) {
>> +        ovs_mutex_lock(&slice_locks[hash % lflow_map->mask]);
> 
> We're using hmap internals and the fasthmap user has to make sure it 
> locked the correct slice lock.
> 
> If slice_locks were something internal to the fasthmap, an API to get 
> the corresponding lock, e.g., based on 'hash' would hide these 
> implementation details.

I can add the lock-on-slice facility there.

Though for now it has only one use case - datapath groups. The idea
elsewhere is to use a lockless "produce a result fragment" strategy in
each worker thread and have the main thread merge the fragments.
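
Something along these lines, as a rough sketch - the function names here
are made up for illustration, not the final fasthmap API:

    /* Hypothetical lock-on-slice helpers owned by fasthmap.  The library
     * would (re)allocate one mutex per bucket whenever the map is (re)sized,
     * so the caller never touches lflow_map->mask or a lock array. */
    void fast_hmap_lock_bucket(struct hmap *hm, uint32_t hash);
    void fast_hmap_unlock_bucket(struct hmap *hm, uint32_t hash);

    /* ovn_lflow_add_at() would then reduce to: */
    if (use_logical_dp_groups && use_parallel_build) {
        fast_hmap_lock_bucket(lflow_map, hash);
        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
        fast_hmap_unlock_bucket(lflow_map, hash);
    } else {
        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
    }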

> 
>> +        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
>> +        ovs_mutex_unlock(&slice_locks[hash % lflow_map->mask]);
>> +    } else {
>> +        do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
>> +    }
>>   }
>>   /* Adds a row with the specified contents to the Logical_Flow table. */
>> @@ -7348,6 +7374,8 @@ build_lswitch_ip_mcast_igmp_mld(struct ovn_igmp_group *igmp_group,
>>       }
>>   }
>> +static struct ovs_mutex mcgroup_mutex = OVS_MUTEX_INITIALIZER;
>> +
>>   /* Ingress table 19: Destination lookup, unicast handling (priority 50), */
>>   static void
>>   build_lswitch_ip_unicast_lookup(struct ovn_port *op,
>> @@ -7386,7 +7414,9 @@ build_lswitch_ip_unicast_lookup(struct ovn_port *op,
>>                                           &op->nbsp->header_);
>>               } else if (!strcmp(op->nbsp->addresses[i], "unknown")) {
>>                   if (lsp_is_enabled(op->nbsp)) {
>> +                    ovs_mutex_lock(&mcgroup_mutex);
>>                       ovn_multicast_add(mcgroups, &mc_unknown, op);
>> +                    ovs_mutex_unlock(&mcgroup_mutex);
>>                       op->od->has_unknown = true;
>>                   }
>>               } else if (is_dynamic_lsp_address(op->nbsp->addresses[i])) {
>> @@ -11676,6 +11706,122 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
>>                                   &lsi->match, &lsi->actions);
>>   }
>> +struct lflows_thread_pool {
>> +    struct worker_pool *pool;
>> +};
>> +
>> +static void *build_lflows_thread(void *arg)
>> +{
>> +    struct worker_control *control = (struct worker_control *) arg;
>> +    struct lflows_thread_pool *workload;
>> +    struct lswitch_flow_build_info *lsi;
>> +
>> +    struct ovn_datapath *od;
>> +    struct ovn_port *op;
>> +    struct ovn_northd_lb *lb;
>> +    struct ovn_igmp_group *igmp_group;
>> +    int bnum;
>> +
>> +    while (!stop_parallel_processing()) {
>> +        sem_wait(&control->fire);
> 
> This feels a bit error prone.  Can all this be abstracted away in a 
> fasthmap API that executes callbacks for all fast-hmap inputs 
> (lsi->datapaths, lsi->ports, lsi->lbs, etc.) that have been defined by 
> the user code?

I need to get my head around it. The stop part can be abstracted away.

The rest, not so much - they are iterators similar to HMAP_FOR_EACH.
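
For reference, the pattern each worker runs is basically the following
(simplified from the loops in the patch below; 'input' stands for any of
lsi->datapaths, lsi->ports, lsi->lbs, lsi->igmp_groups):

    /* Each worker walks every pool->size-th bucket starting at its own
     * thread id, so the workers cover disjoint bucket sets and need no
     * locking on the input hmap. */
    for (bnum = control->id; bnum <= input->mask; bnum += pool->size) {
        HMAP_FOR_EACH_IN_PARALLEL (node, hmap_node, bnum, input) {
            if (stop_parallel_processing()) {
                return NULL;
            }
            /* Per-node work goes here, e.g.
             * build_lswitch_and_lrouter_iterate_by_od(node, lsi); */
        }
    }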

> 
>> +        workload = (struct lflows_thread_pool *) control->workload;
>> +        lsi = (struct lswitch_flow_build_info *) control->data;
>> +        if (lsi && workload) {
>> +            /* Iterate over bucket ThreadID, ThreadID+size, ... */
>> +            for (bnum = control->id;
>> +                    bnum <= lsi->datapaths->mask;
>> +                    bnum += workload->pool->size)
>> +            {
>> +                HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
>> +                    if (stop_parallel_processing()) {
>> +                        return NULL;
>> +                    }
>> +                    build_lswitch_and_lrouter_iterate_by_od(od, lsi);
>> +                }
>> +            }
>> +            for (bnum = control->id;
>> +                    bnum <= lsi->ports->mask;
>> +                    bnum += workload->pool->size)
>> +            {
>> +                HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
>> +                    if (stop_parallel_processing()) {
>> +                        return NULL;
>> +                    }
>> +                    build_lswitch_and_lrouter_iterate_by_op(op, lsi);
>> +                }
>> +            }
>> +            for (bnum = control->id;
>> +                    bnum <= lsi->lbs->mask;
>> +                    bnum += workload->pool->size)
>> +            {
>> +                HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
>> +                    if (stop_parallel_processing()) {
>> +                        return NULL;
>> +                    }
>> +                    build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
>> +                                                         &lsi->match,
>> +                                                         &lsi->actions);
>> +                }
>> +            }
>> +            for (bnum = control->id;
>> +                    bnum <= lsi->igmp_groups->mask;
>> +                    bnum += workload->pool->size)
>> +            {
>> +                HMAP_FOR_EACH_IN_PARALLEL (
>> +                        igmp_group, hmap_node, bnum, lsi->igmp_groups) {
>> +                    if (stop_parallel_processing()) {
>> +                        return NULL;
>> +                    }
>> +                    build_lswitch_ip_mcast_igmp_mld(igmp_group, lsi->lflows,
>> +                                                    &lsi->match,
>> +                                                    &lsi->actions);
>> +                }
>> +            }
>> +            atomic_store_relaxed(&control->finished, true);
>> +            atomic_thread_fence(memory_order_acq_rel);
> 
> Especially this seems error prone and it would be better if it's 
> implemented inside fasthmap.c where we actually read &control->finished.

That's a different thread :) You cannot read it there.

You read control->finished in the master thread, where you collect the
results; you set control->finished and do the barrier in the worker
threads.
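
Roughly, the pairing looks like this - a sketch only, the real collection
loop lives in fasthmap.c and the helper below is illustrative, assuming the
pool keeps one shared 'done' semaphore that every control->done points at:

    static void
    wait_for_work_completion(struct worker_pool *pool)
    {
        int pending = pool->size;
        bool finished;
        int i;

        while (pending > 0) {
            sem_wait(&pool->done);   /* a worker posted control->done */
            for (i = 0; i < pool->size; i++) {
                atomic_read_relaxed(&pool->controls[i].finished, &finished);
                if (finished) {
                    /* The release barrier in the worker pairs with this
                     * read: worker i's result fragment is now visible and
                     * can be merged safely in the master thread. */
                    atomic_store_relaxed(&pool->controls[i].finished, false);
                    pending--;
                }
            }
        }
    }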


> 
>> +        }
>> +        sem_post(control->done);
> 
> Same here.
> 
>> +     }
>> +    return NULL;
>> +}
>> +
>> +static bool pool_init_done = false;
>> +static struct lflows_thread_pool *build_lflows_pool = NULL;
>> +
>> +static void init_lflows_thread_pool(void)
>> +{
>> +    int index;
>> +
>> +    if (!pool_init_done) {
>> +        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
>> +        pool_init_done = true;
>> +        if (pool) {
>> +            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
>> +            build_lflows_pool->pool = pool;
>> +            for (index = 0; index < build_lflows_pool->pool->size; index++) {
>> +                build_lflows_pool->pool->controls[index].workload =
>> +                    build_lflows_pool;
>> +            }
>> +        }
> 
> As mentioned on patch 1/4, if pool == NULL, build_lflows_pool is always 
> NULL and ovn-northd crashes when use_parallel_build == true.
> 
>> +    }
>> +}
>> +
>> +/* TODO: replace hard cutoffs by configurable via commands. These are
>> + * temporary defines to determine single-thread to multi-thread processing
>> + * cutoff.
>> + * Setting to 1 forces "all parallel" lflow build.
>> + */
>> +
>> +static void
>> +noop_callback(struct worker_pool *pool OVS_UNUSED,
>> +              void *fin_result OVS_UNUSED,
>> +              void *result_frags OVS_UNUSED,
>> +              int index OVS_UNUSED)
>> +{
>> +    /* Do nothing */
>> +}
>> +
>> +
>>   static void
>>   build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
>>                                   struct hmap *port_groups, struct hmap *lflows,
>> @@ -11684,53 +11830,108 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
>>                                   struct shash *meter_groups, struct hmap *lbs,
>>                                   struct hmap *bfd_connections)
>>   {
>> -    struct ovn_datapath *od;
>> -    struct ovn_port *op;
>> -    struct ovn_northd_lb *lb;
>> -    struct ovn_igmp_group *igmp_group;
>>       char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
>> -    struct lswitch_flow_build_info lsi = {
>> -        .datapaths = datapaths,
>> -        .ports = ports,
>> -        .port_groups = port_groups,
>> -        .lflows = lflows,
>> -        .mcgroups = mcgroups,
>> -        .igmp_groups = igmp_groups,
>> -        .meter_groups = meter_groups,
>> -        .lbs = lbs,
>> -        .bfd_connections = bfd_connections,
>> -        .svc_check_match = svc_check_match,
>> -        .match = DS_EMPTY_INITIALIZER,
>> -        .actions = DS_EMPTY_INITIALIZER,
>> -    };
>> +    if (use_parallel_build) {
>> +        init_lflows_thread_pool();
>> +        struct hmap *lflow_segs;
>> +        struct lswitch_flow_build_info *lsiv;
>> +        int index;
>> -    /* Combined build - all lflow generation from lswitch and lrouter
>> -     * will move here and will be reogranized by iterator type.
>> -     */
>> -    HMAP_FOR_EACH (od, key_node, datapaths) {
>> -        build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
>> -    }
>> -    HMAP_FOR_EACH (op, key_node, ports) {
>> -        build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
>> -    }
>> -    HMAP_FOR_EACH (lb, hmap_node, lbs) {
>> -        build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
>> -                                             &lsi.actions,
>> -                                             &lsi.match);
>> -    }
>> -    HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
>> -        build_lswitch_ip_mcast_igmp_mld(igmp_group,
>> -                                        lsi.lflows,
>> -                                        &lsi.actions,
>> -                                        &lsi.match);
>> -    }
>> -    free(svc_check_match);
>> +        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
>> +        if (use_logical_dp_groups) {
>> +            lflow_segs = NULL;
>> +        } else {
>> +            lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
>> +        }
>> +
>> +        /* Set up "work chunks" for each thread to work on. */
>> -    ds_destroy(&lsi.match);
>> -    ds_destroy(&lsi.actions);
>> +        for (index = 0; index < build_lflows_pool->pool->size; index++) {
>> +            if (use_logical_dp_groups) {
>> +                /* if dp_groups are in use we lock a shared lflows hash
>> +                 * on a per-bucket level instead of merging hash frags */
>> +                lsiv[index].lflows = lflows;
>> +            } else {
>> +                fast_hmap_init(&lflow_segs[index], lflows->mask);
> 
> I guess I should've mentioned this on patch 1/4 but now it became more 
> obvious that fast_hmap_*() APIs work on 'struct hmap' objects directly. 
>   It makes it very easy to end up mixing thread-safe (i.e., fast_hmap_*())
> and regular hmap_*() APIs, which might lead to unexpected results.
> 
>> +                lsiv[index].lflows = &lflow_segs[index];
>> +            }
>> +
>> +            lsiv[index].datapaths = datapaths;
>> +            lsiv[index].ports = ports;
>> +            lsiv[index].port_groups = port_groups;
>> +            lsiv[index].mcgroups = mcgroups;
>> +            lsiv[index].igmp_groups = igmp_groups;
>> +            lsiv[index].meter_groups = meter_groups;
>> +            lsiv[index].lbs = lbs;
>> +            lsiv[index].bfd_connections = bfd_connections;
>> +            lsiv[index].svc_check_match = svc_check_match;
>> +            ds_init(&lsiv[index].match);
>> +            ds_init(&lsiv[index].actions);
>> +
>> +            build_lflows_pool->pool->controls[index].data = &lsiv[index];
>> +        }
>> +
>> +        /* Run thread pool. */
>> +        if (use_logical_dp_groups) {
>> +            run_pool_callback(build_lflows_pool->pool, NULL, NULL, noop_callback);
>> +        } else {
>> +            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
>> +        }
>> +        for (index = 0; index < build_lflows_pool->pool->size; index++) {
>> +            ds_destroy(&lsiv[index].match);
>> +            ds_destroy(&lsiv[index].actions);
>> +        }
>> +        free(lflow_segs);
>> +        free(lsiv);
>> +    } else {
>> +        struct ovn_datapath *od;
>> +        struct ovn_port *op;
>> +        struct ovn_northd_lb *lb;
>> +        struct ovn_igmp_group *igmp_group;
>> +        struct lswitch_flow_build_info lsi = {
>> +            .datapaths = datapaths,
>> +            .ports = ports,
>> +            .port_groups = port_groups,
>> +            .lflows = lflows,
>> +            .mcgroups = mcgroups,
>> +            .igmp_groups = igmp_groups,
>> +            .meter_groups = meter_groups,
>> +            .lbs = lbs,
>> +            .bfd_connections = bfd_connections,
>> +            .svc_check_match = svc_check_match,
>> +            .match = DS_EMPTY_INITIALIZER,
>> +            .actions = DS_EMPTY_INITIALIZER,
>> +        };
>> +
>> +        /* Combined build - all lflow generation from lswitch and lrouter
>> +         * will move here and will be reogranized by iterator type.
>> +         */
>> +        HMAP_FOR_EACH (od, key_node, datapaths) {
>> +            build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
>> +        }
>> +        HMAP_FOR_EACH (op, key_node, ports) {
>> +            build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
>> +        }
>> +        HMAP_FOR_EACH (lb, hmap_node, lbs) {
>> +            build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
>> +                                                 &lsi.actions,
>> +                                                 &lsi.match);
>> +        }
>> +        HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
>> +            build_lswitch_ip_mcast_igmp_mld(igmp_group,
>> +                                            lsi.lflows,
>> +                                            &lsi.actions,
>> +                                            &lsi.match);
>> +        }
>> +
>> +        ds_destroy(&lsi.match);
>> +        ds_destroy(&lsi.actions);
>> +    }
>> +
>> +    free(svc_check_match);
>>       build_lswitch_flows(datapaths, lflows);
>>   }
>> @@ -11801,6 +12002,25 @@ ovn_sb_set_lflow_logical_dp_group(
>>       sbrec_logical_flow_set_logical_dp_group(sbflow, dpg->dp_group);
>>   }
>> +static ssize_t max_seen_lflow_size = 128;
>> +
>> +static ssize_t recent_lflow_map_mask = 0;
> 
> I think this can also be part of the fast-hmap implementation.
> 
>> +
>> +static void update_lock_array(struct hmap *lflows)
>> +{
>> +    int i;
>> +    if (recent_lflow_map_mask != lflows->mask) {
>> +        if (slice_locks) {
>> +            free(slice_locks);
>> +        }
>> +        slice_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
>> +        recent_lflow_map_mask = lflows->mask;
>> +        for (i = 0; i <= lflows->mask; i++) {
>> +            ovs_mutex_init(&slice_locks[i]);
>> +        }
>> +    }
>> +}
> 
> Same here.
> 
> Regards,
> Dumitru
> 
> 


-- 
Anton R. Ivanov
Cambridgegreys Limited. Registered in England. Company Number 10273661
https://www.cambridgegreys.com/

