[ovs-dev] [OVN Patch v11 3/4] ovn-northd: Introduce parallel lflow build
Numan Siddique
numans at ovn.org
Fri Jan 15 10:01:33 UTC 2021
On Thu, Jan 14, 2021 at 10:12 PM <anton.ivanov at cambridgegreys.com> wrote:
> From: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>
> Datapaths, ports, igmp groups and load balancers can now
> be iterated over in parallel in order to speed up the lflow
> generation. This decreases the time needed to generate the
> logical flows by a factor of 4+ on a 6 core/12 thread CPU
> without datapath groups - from 0.8-1 microseconds per flow
> down to 0.2-0.3 microseconds per flow on average.
>
> The decrease in time to compute lflows with datapath groups
> enabled is ~2 times for the same hardware - from an average of
> 2.4 microseconds per flow to 1.2 microseconds per flow.
>
> Tested on an 8 node, 400 pod K8 simulation resulting
> in > 6K flows.
>
> Signed-off-by: Anton Ivanov <anton.ivanov at cambridgegreys.com>
>
Please see below a few minor comments.
Thanks
Numan
> ---
> lib/fasthmap.c | 3 +
> northd/ovn-northd.c | 340 +++++++++++++++++++++++++++++++++++++-------
> 2 files changed, 290 insertions(+), 53 deletions(-)
>
> diff --git a/lib/fasthmap.c b/lib/fasthmap.c
> index 2be93e6a8..24acb278b 100644
> --- a/lib/fasthmap.c
> +++ b/lib/fasthmap.c
> @@ -33,6 +33,7 @@
>
> VLOG_DEFINE_THIS_MODULE(fasthmap);
>
> +#ifndef OVS_HAS_PARALLEL_HMAP
>
> /* These are accessed under mutex inside ovn_add_worker_pool().
> * They do not need to be atomic.
> @@ -324,3 +325,5 @@ void ovn_run_pool_hash(
> {
> ovn_run_pool_callback(pool, result, result_frags, merge_hash_results);
> }
> +
> +#endif
> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
> index dda033543..5e1d129d5 100644
> --- a/northd/ovn-northd.c
> +++ b/northd/ovn-northd.c
> @@ -37,6 +37,7 @@
> #include "lib/ovn-sb-idl.h"
> #include "lib/ovn-util.h"
> #include "lib/lb.h"
> +#include "lib/fasthmap.h"
> #include "ovn/actions.h"
> #include "ovn/logical-fields.h"
> #include "packets.h"
> @@ -4174,6 +4175,34 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct
> ovn_datapath *od,
> /* If this option is 'true' northd will combine logical flows that
> differ by
> * logical datapath only by creating a datapath group. */
> static bool use_logical_dp_groups = false;
> +static bool use_parallel_build = true;
> +
> +static struct ovs_mutex *slice_locks = NULL;
> +
> +/* Adds a row with the specified contents to the Logical_Flow table.
> + * Version to use when locking is required.
> + */
> +static void
> +do_ovn_lflow_add_locked(struct hmap *lflow_map, bool shared,
> + struct ovn_datapath *od,
> + uint32_t hash, struct ovn_lflow *lflow)
> +{
> +
> + struct ovn_lflow *old_lflow;
> +
> + if (shared && use_logical_dp_groups) {
> + old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
> + if (old_lflow) {
> + ovn_lflow_destroy(NULL, lflow);
> + hmapx_add(&old_lflow->od_group, od);
> + return;
> + }
> + }
> +
> + hmapx_add(&lflow->od_group, od);
> + hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
> +}
> +
>
> /* Adds a row with the specified contents to the Logical_Flow table. */
> static void
> @@ -4184,7 +4213,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct
> ovn_datapath *od,
> {
> ovs_assert(ovn_stage_to_datapath_type(stage) ==
> ovn_datapath_get_type(od));
>
> - struct ovn_lflow *old_lflow, *lflow;
> + struct ovn_lflow *lflow;
> uint32_t hash;
>
> lflow = xmalloc(sizeof *lflow);
> @@ -4196,17 +4225,14 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct
> ovn_datapath *od,
> ovn_lflow_hint(stage_hint), where);
>
> hash = ovn_lflow_hash(lflow);
> - if (shared && use_logical_dp_groups) {
> - old_lflow = ovn_lflow_find_by_lflow(lflow_map, lflow, hash);
> - if (old_lflow) {
> - ovn_lflow_destroy(NULL, lflow);
> - hmapx_add(&old_lflow->od_group, od);
> - return;
> - }
> - }
>
> - hmapx_add(&lflow->od_group, od);
> - hmap_insert(lflow_map, &lflow->hmap_node, hash);
> + if (use_logical_dp_groups && use_parallel_build) {
> + ovs_mutex_lock(&slice_locks[hash % lflow_map->mask]);
> + do_ovn_lflow_add_locked(lflow_map, shared, od, hash, lflow);
> + ovs_mutex_unlock(&slice_locks[hash % lflow_map->mask]);
> + } else {
> + do_ovn_lflow_add_locked(lflow_map, shared, od, hash, lflow);
> + }
> }
>
> /* Adds a row with the specified contents to the Logical_Flow table. */
> @@ -7348,6 +7374,8 @@ build_lswitch_ip_mcast_igmp_mld(struct
> ovn_igmp_group *igmp_group,
> }
> }
>
> +static struct ovs_mutex mcgroup_mutex = OVS_MUTEX_INITIALIZER;
> +
> /* Ingress table 19: Destination lookup, unicast handling (priority 50),
> */
> static void
> build_lswitch_ip_unicast_lookup(struct ovn_port *op,
> @@ -7386,7 +7414,9 @@ build_lswitch_ip_unicast_lookup(struct ovn_port *op,
> &op->nbsp->header_);
> } else if (!strcmp(op->nbsp->addresses[i], "unknown")) {
> if (lsp_is_enabled(op->nbsp)) {
> + ovs_mutex_lock(&mcgroup_mutex);
> ovn_multicast_add(mcgroups, &mc_unknown, op);
> + ovs_mutex_unlock(&mcgroup_mutex);
> op->od->has_unknown = true;
> }
> } else if (is_dynamic_lsp_address(op->nbsp->addresses[i])) {
> @@ -11676,6 +11706,125 @@ build_lswitch_and_lrouter_iterate_by_op(struct
> ovn_port *op,
> &lsi->match, &lsi->actions);
> }
>
> +struct lflows_thread_pool {
> + struct worker_pool *pool;
> +};
> +
> +static void *build_lflows_thread(void *arg) {
> + struct worker_control *control = (struct worker_control *) arg;
> + struct lflows_thread_pool *workload;
> + struct lswitch_flow_build_info *lsi;
> +
> + struct ovn_datapath *od;
> + struct ovn_port *op;
> + struct ovn_northd_lb *lb;
> + struct ovn_igmp_group *igmp_group;
> + int bnum;
> +
> + while (!cease_fire()) {
> + sem_wait(&control->fire);
> + workload = (struct lflows_thread_pool *) control->workload;
> + lsi = (struct lswitch_flow_build_info *) control->data;
> + if (lsi && workload) {
> + /* Iterate over bucket ThreadID, ThreadID+size, ... */
> + for (bnum = control->id;
> + bnum <= lsi->datapaths->mask;
> + bnum += workload->pool->size)
> + {
> + HMAP_FOR_EACH_IN_PARALLEL (
> + od, key_node, bnum, lsi->datapaths) {
> + if (cease_fire()) {
> + return NULL;
> + }
> + build_lswitch_and_lrouter_iterate_by_od(od, lsi);
> + }
> + }
> + for (bnum = control->id;
> + bnum <= lsi->ports->mask;
> + bnum += workload->pool->size)
> + {
> + HMAP_FOR_EACH_IN_PARALLEL (
> + op, key_node, bnum, lsi->ports) {
> + if (cease_fire()) {
> + return NULL;
> + }
> + build_lswitch_and_lrouter_iterate_by_op(op, lsi);
> + }
> + }
> + for (bnum = control->id;
> + bnum <= lsi->lbs->mask;
> + bnum += workload->pool->size)
> + {
> + HMAP_FOR_EACH_IN_PARALLEL (
> + lb, hmap_node, bnum, lsi->lbs) {
> + if (cease_fire()) {
> + return NULL;
> + }
> + build_lswitch_arp_nd_service_monitor(lb, lsi->lflows,
> + &lsi->match,
> + &lsi->actions);
> + }
> + }
> + for (bnum = control->id;
> + bnum <= lsi->igmp_groups->mask;
> + bnum += workload->pool->size)
> + {
> + HMAP_FOR_EACH_IN_PARALLEL (
> + igmp_group, hmap_node, bnum, lsi->igmp_groups) {
> + if (cease_fire()) {
> + return NULL;
> + }
> + build_lswitch_ip_mcast_igmp_mld(igmp_group,
> lsi->lflows,
> + &lsi->match,
> + &lsi->actions);
> + }
> + }
> + atomic_store_relaxed(&control->finished, true);
> + atomic_thread_fence(memory_order_acq_rel);
> + }
> + sem_post(control->done);
> + }
> + return NULL;
> +}
> +
> +static bool pool_init_done = false;
> +static struct lflows_thread_pool *build_lflows_pool = NULL;
> +
> +static void init_lflows_thread_pool(void)
> +{
> + int index;
> +
> + if (!pool_init_done) {
> + struct worker_pool *pool = add_worker_pool(build_lflows_thread);
> + pool_init_done = true;
> + if (pool) {
> + build_lflows_pool =
> + xmalloc(sizeof(struct lflows_thread_pool));
>
As per the guidelines, using expression is preferred over data type in
sizeof operator -
https://github.com/ovn-org/ovn/blob/master/Documentation/internals/contributing/coding-style.rst
I would suggest to change the above to -
build_lflows_pool = xmalloc(sizeof *build_lflows_pool);
I would suggest doing the same for few other places in the patch series.
+ build_lflows_pool->pool = pool;
> + for (index = 0; index < build_lflows_pool->pool->size;
> index++) {
> + build_lflows_pool->pool->controls[index].workload =
> + build_lflows_pool;
> + }
> + }
> + }
> +}
> +
> +/* TODO: replace hard cutoffs by configurable via commands. These are
> + * temporary defines to determine single-thread to multi-thread processing
> + * cutoff.
> + * Setting to 1 forces "all parallel" lflow build.
> + */
> +
> +static void
> +noop_callback(struct worker_pool *pool OVS_UNUSED,
> + void *fin_result OVS_UNUSED,
> + void *result_frags OVS_UNUSED,
> + int index OVS_UNUSED)
> +{
> + /* Do nothing */
> +}
> +
> +
> static void
> build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap
> *ports,
> struct hmap *port_groups, struct hmap
> *lflows,
> @@ -11684,53 +11833,111 @@ build_lswitch_and_lrouter_flows(struct hmap
> *datapaths, struct hmap *ports,
> struct shash *meter_groups, struct hmap
> *lbs,
> struct hmap *bfd_connections)
> {
> - struct ovn_datapath *od;
> - struct ovn_port *op;
> - struct ovn_northd_lb *lb;
> - struct ovn_igmp_group *igmp_group;
>
> char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
>
> - struct lswitch_flow_build_info lsi = {
> - .datapaths = datapaths,
> - .ports = ports,
> - .port_groups = port_groups,
> - .lflows = lflows,
> - .mcgroups = mcgroups,
> - .igmp_groups = igmp_groups,
> - .meter_groups = meter_groups,
> - .lbs = lbs,
> - .bfd_connections = bfd_connections,
> - .svc_check_match = svc_check_match,
> - .match = DS_EMPTY_INITIALIZER,
> - .actions = DS_EMPTY_INITIALIZER,
> - };
> + if (use_parallel_build) {
> + init_lflows_thread_pool();
> + struct hmap *lflow_segs;
> + struct lswitch_flow_build_info *lsiv;
> + int index;
> +
> + lsiv = xmalloc(
> + sizeof(struct lswitch_flow_build_info) *
> + build_lflows_pool->pool->size);
> + if (use_logical_dp_groups) {
> + lflow_segs = NULL;
> + } else {
> + lflow_segs = xmalloc(
> + sizeof(struct hmap) * build_lflows_pool->pool->size);
>
Same here - lflow_segs = xmalloc(
+ sizeof *lflow_segs * build_lflows_pool->pool->size);
+ }
>
> - /* Combined build - all lflow generation from lswitch and lrouter
> - * will move here and will be reorganized by iterator type.
> - */
> - HMAP_FOR_EACH (od, key_node, datapaths) {
> - build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
> - }
> - HMAP_FOR_EACH (op, key_node, ports) {
> - build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
> - }
> - HMAP_FOR_EACH (lb, hmap_node, lbs) {
> - build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
> - &lsi.actions,
> - &lsi.match);
> - }
> - HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
> - build_lswitch_ip_mcast_igmp_mld(igmp_group,
> - lsi.lflows,
> - &lsi.actions,
> - &lsi.match);
> - }
> - free(svc_check_match);
> + /* Set up "work chunks" for each thread to work on. */
>
> - ds_destroy(&lsi.match);
> - ds_destroy(&lsi.actions);
> + for (index = 0; index < build_lflows_pool->pool->size; index++) {
> + if (use_logical_dp_groups) {
> + /* if dp_groups are in use we lock a shared lflows hash
> + * on a per-bucket level instead of merging hash frags */
> + lsiv[index].lflows = lflows;
> + } else {
> + fast_hmap_init(&lflow_segs[index], lflows->mask);
> + lsiv[index].lflows = &lflow_segs[index];
> + }
> +
> + lsiv[index].datapaths = datapaths;
> + lsiv[index].ports = ports;
> + lsiv[index].port_groups = port_groups;
> + lsiv[index].mcgroups = mcgroups;
> + lsiv[index].igmp_groups = igmp_groups;
> + lsiv[index].meter_groups = meter_groups;
> + lsiv[index].lbs = lbs;
> + lsiv[index].bfd_connections = bfd_connections;
> + lsiv[index].svc_check_match = svc_check_match;
> + ds_init(&lsiv[index].match);
> + ds_init(&lsiv[index].actions);
> +
> + build_lflows_pool->pool->controls[index].data = &lsiv[index];
> + }
> +
> + /* Run thread pool. */
> + if (use_logical_dp_groups) {
> + run_pool_callback(build_lflows_pool->pool, NULL, NULL,
> noop_callback);
> + } else {
> + run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
> + }
> +
> + for (index = 0; index < build_lflows_pool->pool->size; index++) {
> + ds_destroy(&lsiv[index].match);
> + ds_destroy(&lsiv[index].actions);
> + }
> + free(lflow_segs);
> + free(lsiv);
> + } else {
> + struct ovn_datapath *od;
> + struct ovn_port *op;
> + struct ovn_northd_lb *lb;
> + struct ovn_igmp_group *igmp_group;
> + struct lswitch_flow_build_info lsi = {
> + .datapaths = datapaths,
> + .ports = ports,
> + .port_groups = port_groups,
> + .lflows = lflows,
> + .mcgroups = mcgroups,
> + .igmp_groups = igmp_groups,
> + .meter_groups = meter_groups,
> + .lbs = lbs,
> + .bfd_connections = bfd_connections,
> + .svc_check_match = svc_check_match,
> + .match = DS_EMPTY_INITIALIZER,
> + .actions = DS_EMPTY_INITIALIZER,
> + };
> +
> + /* Combined build - all lflow generation from lswitch and lrouter
> + * will move here and will be reorganized by iterator type.
> + */
> + HMAP_FOR_EACH (od, key_node, datapaths) {
> + build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
> + }
> + HMAP_FOR_EACH (op, key_node, ports) {
> + build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
> + }
> + HMAP_FOR_EACH (lb, hmap_node, lbs) {
> + build_lswitch_arp_nd_service_monitor(lb, lsi.lflows,
> + &lsi.actions,
> + &lsi.match);
> + }
> + HMAP_FOR_EACH (igmp_group, hmap_node, igmp_groups) {
> + build_lswitch_ip_mcast_igmp_mld(igmp_group,
> + lsi.lflows,
> + &lsi.actions,
> + &lsi.match);
> + }
> +
> + ds_destroy(&lsi.match);
> + ds_destroy(&lsi.actions);
> + }
>
> + free(svc_check_match);
> build_lswitch_flows(datapaths, lflows);
> }
>
> @@ -11801,6 +12008,25 @@ ovn_sb_set_lflow_logical_dp_group(
> sbrec_logical_flow_set_logical_dp_group(sbflow, dpg->dp_group);
> }
>
> +static ssize_t max_seen_lflow_size = 128;
> +
> +static ssize_t recent_lflow_map_mask = 0;
> +
> +static void update_lock_array(struct hmap *lflows)
> +{
> + int i;
> + if (recent_lflow_map_mask != lflows->mask) {
> + if (slice_locks) {
> + free(slice_locks);
> + }
> + slice_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
> + recent_lflow_map_mask = lflows->mask;
> + for (i = 0; i <= lflows->mask; i++) {
> + ovs_mutex_init(&slice_locks[i]);
> + }
> + }
> +}
> +
> /* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB
> database,
> * constructing their contents based on the OVN_NB database. */
> static void
> @@ -11810,13 +12036,21 @@ build_lflows(struct northd_context *ctx, struct
> hmap *datapaths,
> struct shash *meter_groups,
> struct hmap *lbs, struct hmap *bfd_connections)
> {
> - struct hmap lflows = HMAP_INITIALIZER(&lflows);
> + struct hmap lflows;
>
> + fast_hmap_size_for(&lflows, max_seen_lflow_size);
> + if (use_parallel_build) {
> + update_lock_array(&lflows);
> + }
> build_lswitch_and_lrouter_flows(datapaths, ports,
> port_groups, &lflows, mcgroups,
> igmp_groups, meter_groups, lbs,
> bfd_connections);
>
> + if (hmap_count(&lflows) > max_seen_lflow_size) {
> + max_seen_lflow_size = hmap_count(&lflows);
> + }
> +
> /* Collecting all unique datapath groups. */
> struct hmap dp_groups = HMAP_INITIALIZER(&dp_groups);
> struct hmapx single_dp_lflows = HMAPX_INITIALIZER(&single_dp_lflows);
> --
> 2.20.1
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
>
>
More information about the dev
mailing list