[ovs-dev] [OVN Patch v12 3/4] ovn-northd: Introduce parallel lflow build
Anton Ivanov
anton.ivanov at cambridgegreys.com
Mon Jan 25 21:10:36 UTC 2021
On 25/01/2021 20:56, Dumitru Ceara wrote:
> On 1/15/21 4:30 PM, anton.ivanov at cambridgegreys.com wrote:
>> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>>
>> Datapaths, ports, igmp groups and load balancers can now
>> be iterated over in parallel in order to speed up the lflow
>> generation. This decreases the time needed to generate the
>> logical flows by a factor of 4+ on a 6 core/12 thread CPU
>> without datapath groups - from 0.8-1 microseconds per flow
>> down to 0.2-0.3 microseconds per flow on average.
>>
>> The decrease in time to compute lflows with datapath groups
>> enabled is ~2 times for the same hardware - from an average of
>> 2.4 microseconds per flow to 1.2 microseconds per flow.
>>
>> Tested on an 8 node, 400 pod K8 simulation resulting
>> in > 6K flows.
>>
>> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>> ---
>
> Hi Anton,
>
> Again, not a full review, but some initial remarks, most of them I think
> related to the fact that we're leaking the fast-hmap abstraction details.
>
>> northd/ovn-northd.c | 334 +++++++++++++++++++++++++++++++++++++-------
>> 1 file changed, 281 insertions(+), 53 deletions(-)
>>
>> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
>> index dda033543..53e0cc50d 100644
>> --- a/northd/ovn-northd.c
>> +++ b/northd/ovn-northd.c
>> @@ -37,6 +37,7 @@
>> #include "lib/ovn-sb-idl.h"
>> #include "lib/ovn-util.h"
>> #include "lib/lb.h"
>> +#include "lib/fasthmap.h"
>> #include "ovn/actions.h"
>> #include "ovn/logical-fields.h"
>> #include "packets.h"
>> @@ -4174,6 +4175,34 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct
>> ovn_datapath *od,
>> /* If this option is 'true' northd will combine logical flows that
>> differ by
>> * logical datapath only by creating a datapath group. */
>> static bool use_logical_dp_groups = false;
>> +static bool use_parallel_build = true;
>> +
>> +static struct ovs_mutex *slice_locks = NULL;
>
> I'm not very sure why the locking details have to be known by the user
> of the fasthmap. Why doesn't the fasthmap library take care of
> (re)allocating just the right amount of locks it needs?
>
>> +
>> +/* Adds a row with the specified contents to the Logical_Flow table.
>> + * Version to use when locking is required.
>> + */
>> +static void
>> +do_ovn_lflow_add(struct hmap *lflow_map, bool shared,
>> + struct ovn_datapath *od,
>> + uint32_t hash, struct ovn_lflow *lflow)
>> +{
>> +
>> + struct ovn_lflow *old_lflow;
>> +
>> + if (shared && use_logical_dp_groups) {
>> + old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
>> + if (old_lflow) {
>> + ovn_lflow_destroy(NULL, lflow);
>> + hmapx_add(&old_lflow->od_group, od);
>> + return;
>> + }
>> + }
>> +
>> + hmapx_add(&lflow->od_group, od);
>> + hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
>> +}
>> +
>> /* Adds a row with the specified contents to the Logical_Flow table. */
>> static void
>> @@ -4184,7 +4213,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct
>> ovn_datapath *od,
>> {
>> ovs_assert(ovn_stage_to_datapath_type(stage) ==
>> ovn_datapath_get_type(od));
>> - struct ovn_lflow *old_lflow, *lflow;
>> + struct ovn_lflow *lflow;
>> uint32_t hash;
>> lflow = xmalloc(sizeof *lflow);
>> @@ -4196,17 +4225,14 @@ ovn_lflow_add_at(struct hmap *lflow_map,
>> struct ovn_datapath *od,
>> ovn_lflow_hint(stage_hint), where);
>> hash = ovn_lflow_hash(lflow);
>> - if (shared && use_logical_dp_groups) {
>> - old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
>> - if (old_lflow) {
>> - ovn_lflow_destroy(NULL, lflow);
>> - hmapx_add(&old_lflow->od_group, od);
>> - return;
>> - }
>> - }
>> - hmapx_add(&lflow->od_group, od);
>> - hmap_insert(lflow_map, &lflow->hmap_node, hash);
>> + if (use_logical_dp_groups && use_parallel_build) {
>> + ovs_mutex_lock(&slice_locks[hash % lflow_map->mask]);
>
> We're using hmap internals and the fasthmap user has to make sure it
> locked the correct slice lock.
>
> If slice_locks were something internal to the fasthmap, an API to get
> the corresponding lock, e.g., based on 'hash' would hide these
> implementation details.
I can add the lock-on-slice facility there.
Though for now it has only one use case - datapath groups. The idea
elsewhere is to use a lockless "produce a result fragment" strategy in
each worker thread and have the main thread merge the fragments.
>
>> + do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
>> + ovs_mutex_unlock(&slice_locks[hash % lflow_map->mask]);
>> + } else {
>> + do_ovn_lflow_add(lflow_map, shared, od, hash, lflow);
>> + }
>> }
>> /* Adds a row with the specified contents to the Logical_Flow table. */
>> @@ -7348,6 +7374,8 @@ build_lswitch_ip_mcast_igmp_mld(struct
>> ovn_igmp_group *igmp_group,
>> }
>> }
>> +static struct ovs_mutex mcgroup_mutex = OVS_MUTEX_INITIALIZER;
>> +
>> /* Ingress table 19: Destination lookup, unicast handling (priority
>> 50), */
>> static void
>> build_lswitch_ip_unicast_lookup(struct ovn_port *op,
>> @@ -7386,7 +7414,9 @@ build_lswitch_ip_unicast_lookup(struct ovn_port
>> *op,
>> &op->nbsp->header_);
>> } else if (!strcmp(op->nbsp->addresses[i], "unknown")) {
>> if (lsp_is_enabled(op->nbsp)) {
>> + ovs_mutex_lock(&mcgroup_mutex);
>> ovn_multicast_add(mcgroups, &mc_unknown, op);
>> + ovs_mutex_unlock(&mcgroup_mutex);
>> op->od->has_unknown = true;
>> }
>> } else if
>> (is_dynamic_lsp_address(op->nbsp->addresses[i])) {
>> @@ -11676,6 +11706,122 @@
>> build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
>> &lsi->match, &lsi->actions);
>> }
>> +struct lflows_thread_pool {
>> + struct worker_pool *pool;
>> +};
>> +
>> +static void *build_lflows_thread(void *arg)
>> +{
>> + struct worker_control *control = (struct worker_control *) arg;
>> + struct lflows_thread_pool *workload;
>> + struct lswitch_flow_build_info *lsi;
>> +
>> + struct ovn_datapath *od;
>> + struct ovn_port *op;
>> + struct ovn_northd_lb *lb;
>> + struct ovn_igmp_group *igmp_group;
>> + int bnum;
>> +
>> + while (!stop_parallel_processing()) {
>> + sem_wait(&control->fire);
>
> This feels a bit error prone. Can all this be abstracted away in a
> fasthmap API that executes callbacks for all fast-hmap inputs
> (lsi->datapaths, lsi->ports, lsi->lbs, etc.) that have been defined by
> the user code?
I need to get my head around this. The stop part can.
The rest, not so much - they are iterators similar to HMAP_FOR_EACH
>
>> + workload = (struct lflows_thread_pool *) control->workload;
>> + lsi = (struct lswitch_flow_build_info *) control->data;
>> + if (lsi && workload) {
>> + /* Iterate over bucket ThreadID, ThreadID+size, ... */
>> + for (bnum = control->id;
>> + bnum <= lsi->datapaths->mask;
>> + bnum += workload->pool->size)
>> + {
>> + HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum,
>> lsi->datapaths) {
>> + if (stop_parallel_processing()) {
>> + return NULL;
>> + }
>> + build_lswitch_and_lrouter_iterate_by_od(od, lsi);
>> + }
>> + }
>> + for (bnum = control->id;
>> + bnum <= lsi->ports->mask;
>> + bnum += workload->pool->size)
>> + {
>> + HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum,
>> lsi->ports) {
>> + if (stop_parallel_processing()) {
>> + return NULL;
>> + }
>> + build_lswitch_and_lrouter_iterate_by_op(op, lsi);
>> + }
>> + }
>> + for (bnum = control->id;
>> + bnum <= lsi->lbs->mask;
>> + bnum += workload->pool->size)
>> + {
>> + HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum,
>> lsi->lbs) {
>> + if (stop_parallel_processing()) {
>> + return NULL;
>> + }
>> + build_lswitch_arp_nd_service_monitor(lb,
>> lsi->lflows,
>> + &lsi->match,
>> + &lsi->actions);
>> + }
>> + }
>> + for (bnum = control->id;
>> + bnum <= lsi->igmp_groups->mask;
>> + bnum += workload->pool->size)
>> + {
>> + HMAP_FOR_EACH_IN_PARALLEL (
>> + igmp_group, hmap_node, bnum, lsi->igmp_groups) {
>> + if (stop_parallel_processing()) {
>> + return NULL;
>> + }
>> + build_lswitch_ip_mcast_igmp_mld(igmp_group,
>> lsi->lflows,
>> + &lsi->match,
>> + &lsi->actions);
>> + }
>> + }
>> + atomic_store_relaxed(&control->finished, true);
>> + atomic_thread_fence(memory_order_acq_rel);
>
> Especially this seems error prone and it would be better if it's
> implemented inside fasthmap.c where we actually read &control->finished.
That's a different thread :) You cannot read it there.
You read control->finished in the master thread where you collect
results, you set control->finished and do the barrier in the worker
threads.
>
>> + }
>> + sem_post(control->done);
>
> Same here.
>
>> + }
>> + return NULL;
>> +}
>> +
>> +static bool pool_init_done = false;
>> +static struct lflows_thread_pool *build_lflows_pool = NULL;
>> +
>> +static void init_lflows_thread_pool(void)
>> +{
>> + int index;
>> +
>> + if (!pool_init_done) {
>> + struct worker_pool *pool = add_worker_pool(build_lflows_thread);
>> + pool_init_done = true;
>> + if (pool) {
>> + build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
>> + build_lflows_pool->pool = pool;
>> + for (index = 0; index < build_lflows_pool->pool->size;
>> index++) {
>> + build_lflows_pool->pool->controls[index].workload =
>> + build_lflows_pool;
>> + }
>> + }
>
> As mentioned on patch 1/4, if pool == NULL, build_lflows_pool is always
> NULL and ovn-northd crashes when use_parallel_build == true.
>
>> + }
>> +}
>> +
>> +/* TODO: replace hard cutoffs by configurable via commands. These are
>> + * temporary defines to determine single-thread to multi-thread
>> processing
>> + * cutoff.
>> + * Setting to 1 forces "all parallel" lflow build.
>> + */
>> +
>> +static void
>> +noop_callback(struct worker_pool *pool OVS_UNUSED,
>> + void *fin_result OVS_UNUSED,
>> + void *result_frags OVS_UNUSED,
>> + int index OVS_UNUSED)
>> +{
>> + /* Do nothing */
>> +}
>> +
>> +
>> static void
>> build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap
>> *ports,
>> struct hmap *port_groups, struct
>> hmap *lflows,
>> @@ -11684,53 +11830,108 @@ build_lswitch_and_lrouter_flows(struct hmap
>> *datapaths, struct hmap *ports,
>> struct shash *meter_groups, struct
>> hmap *lbs,
>> struct hmap *bfd_connections)
>> {
>> - struct ovn_datapath *od;
>> - struct ovn_port *op;
>> - struct ovn_northd_lb *lb;
>> - struct ovn_igmp_group *igmp_group;
>> char *svc_check_match = xasprintf("eth.dst == %s",
>> svc_monitor_mac);
>> - struct lswitch_flow_build_info lsi = {
>> - .datapaths = datapaths,
>> - .ports = ports,
>> - .port_groups = port_groups,
>> - .lflows = lflows,
>> - .mcgroups = mcgroups,
>> - .igmp_groups = igmp_groups,
>> - .meter_groups = meter_groups,
>> - .lbs = lbs,
>> - .bfd_connections = bfd_connections,
>> - .svc_check_match = svc_check_match,
>> - .match = DS_EMPTY_INITIALIZER,
>> - .actions = DS_EMPTY_INITIALIZER,
>> - };
>> + if (use_parallel_build) {
>> + init_lflows_thread_pool();
>> + struct hmap *lflow_segs;
>> + struct lswitch_flow_build_info *lsiv;
>> + int index;
>> - /* Combined build - all lflow generation from lswitch and lrouter
>> - * will move here and will be reogranized by iterator type.
>> - */
>> - HMAP_FOR_EACH (od, key_node, datapaths) {
>> - build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
>> - }
>> - HMAP_FOR_EACH (op, key_node, ports) {
>> - build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
>> - }
>> - HMAP_FOR_EACH (lb, hmap_node, lbs) {
>> - build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
>> - &lsi.actions,
>> - &lsi.match);
>> - }
>> - HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
>> - build_lswitch_ip_mcast_igmp_mld(igmp_group,
>> - lsi.lflows,
>> - &lsi.actions,
>> - &lsi.match);
>> - }
>> - free(svc_check_match);
>> + lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
>> + if (use_logical_dp_groups) {
>> + lflow_segs = NULL;
>> + } else {
>> + lflow_segs = xcalloc(sizeof(*lflow_segs),
>> build_lflows_pool->pool->size);
>> + }
>> +
>> + /* Set up "work chunks" for each thread to work on. */
>> - ds_destroy(&lsi.match);
>> - ds_destroy(&lsi.actions);
>> + for (index = 0; index < build_lflows_pool->pool->size;
>> index++) {
>> + if (use_logical_dp_groups) {
>> + /* if dp_groups are in use we lock a shared lflows hash
>> + * on a per-bucket level instead of merging hash
>> frags */
>> + lsiv[index].lflows = lflows;
>> + } else {
>> + fast_hmap_init(&lflow_segs[index], lflows->mask);
>
> I guess I should've mentioned this on patch 1/4 but now it became more
> obvious that fast_hmap_*() APIs work on 'struct hmap' objects directly.
> It makes it very easy to end up mixing thread safe (i.e., fast_hmap*)
> and regular hmap_*() APIs which might lead to unexpected results.
>
>> + lsiv[index].lflows = &lflow_segs[index];
>> + }
>> +
>> + lsiv[index].datapaths = datapaths;
>> + lsiv[index].ports = ports;
>> + lsiv[index].port_groups = port_groups;
>> + lsiv[index].mcgroups = mcgroups;
>> + lsiv[index].igmp_groups = igmp_groups;
>> + lsiv[index].meter_groups = meter_groups;
>> + lsiv[index].lbs = lbs;
>> + lsiv[index].bfd_connections = bfd_connections;
>> + lsiv[index].svc_check_match = svc_check_match;
>> + ds_init(&lsiv[index].match);
>> + ds_init(&lsiv[index].actions);
>> +
>> + build_lflows_pool->pool->controls[index].data =
>> &lsiv[index];
>> + }
>> +
>> + /* Run thread pool. */
>> + if (use_logical_dp_groups) {
>> + run_pool_callback(build_lflows_pool->pool, NULL, NULL,
>> noop_callback);
>> + } else {
>> + run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
>> + }
>> + for (index = 0; index < build_lflows_pool->pool->size;
>> index++) {
>> + ds_destroy(&lsiv[index].match);
>> + ds_destroy(&lsiv[index].actions);
>> + }
>> + free(lflow_segs);
>> + free(lsiv);
>> + } else {
>> + struct ovn_datapath *od;
>> + struct ovn_port *op;
>> + struct ovn_northd_lb *lb;
>> + struct ovn_igmp_group *igmp_group;
>> + struct lswitch_flow_build_info lsi = {
>> + .datapaths = datapaths,
>> + .ports = ports,
>> + .port_groups = port_groups,
>> + .lflows = lflows,
>> + .mcgroups = mcgroups,
>> + .igmp_groups = igmp_groups,
>> + .meter_groups = meter_groups,
>> + .lbs = lbs,
>> + .bfd_connections = bfd_connections,
>> + .svc_check_match = svc_check_match,
>> + .match = DS_EMPTY_INITIALIZER,
>> + .actions = DS_EMPTY_INITIALIZER,
>> + };
>> +
>> + /* Combined build - all lflow generation from lswitch and
>> lrouter
>> + * will move here and will be reorganized by iterator type.
>> + */
>> + HMAP_FOR_EACH (od, key_node, datapaths) {
>> + build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
>> + }
>> + HMAP_FOR_EACH (op, key_node, ports) {
>> + build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
>> + }
>> + HMAP_FOR_EACH (lb, hmap_node, lbs) {
>> + build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
>> + &lsi.actions,
>> + &lsi.match);
>> + }
>> + HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
>> + build_lswitch_ip_mcast_igmp_mld(igmp_group,
>> + lsi.lflows,
>> + &lsi.actions,
>> + &lsi.match);
>> + }
>> +
>> + ds_destroy(&lsi.match);
>> + ds_destroy(&lsi.actions);
>> + }
>> +
>> + free(svc_check_match);
>> build_lswitch_flows(datapaths, lflows);
>> }
>> @@ -11801,6 +12002,25 @@ ovn_sb_set_lflow_logical_dp_group(
>> sbrec_logical_flow_set_logical_dp_group(sbflow, dpg->dp_group);
>> }
>> +static ssize_t max_seen_lflow_size = 128;
>> +
>> +static ssize_t recent_lflow_map_mask = 0;
>
> I think this can also be part of the fast-hmap implementation.
>
>> +
>> +static void update_lock_array(struct hmap *lflows)
>> +{
>> + int i;
>> + if (recent_lflow_map_mask != lflows->mask) {
>> + if (slice_locks) {
>> + free(slice_locks);
>> + }
>> + slice_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask
>> + 1);
>> + recent_lflow_map_mask = lflows->mask;
>> + for (i = 0; i <= lflows->mask; i++) {
>> + ovs_mutex_init(&slice_locks[i]);
>> + }
>> + }
>> +}
>
> Same here.
>
> Regards,
> Dumitru
>
>
--
Anton R. Ivanov
Cambridgegreys Limited. Registered in England. Company Number 10273661
https://www.cambridgegreys.com/
More information about the dev
mailing list