[ovs-dev] [v3 3/4] datapath: Implement recirc action without recursion

Pravin Shelar pshelar at nicira.com
Thu Aug 28 22:40:23 UTC 2014


On Wed, Aug 27, 2014 at 4:13 AM, Andy Zhou <azhou at nicira.com> wrote:
> Since the kernel stack is limited in size, it is not wise to use
> recursive functions with large stack frames.
>
> This patch provides an alternative implementation of recirc action
> without using recursion.
>
> A per-CPU, fixed-size 'deferred action stack' is used to store both
> recirc and sample actions when executing actions. These actions become
> 'deferred actions'.
>
> Deferred actions are only executed after all other actions have been
> executed, including the ones triggered by loopback from the kernel
> network stack.
>
> The depth of the private stack, currently set to 10, limits the number
> of 'deferred actions' that can be accumulated for each packet.
>
> Signed-off-by: Andy Zhou <azhou at nicira.com>

The patch looks good; I have a few comments.

> ---
>  datapath/actions.c  | 150 +++++++++++++++++++++++++++++++++++++---------------
>  datapath/actions.h  |   4 +-
>  datapath/datapath.c |   4 +-
>  3 files changed, 112 insertions(+), 46 deletions(-)
>
> diff --git a/datapath/actions.c b/datapath/actions.c
> index 31fb57d..662de9c 100644
> --- a/datapath/actions.c
> +++ b/datapath/actions.c
> @@ -42,6 +42,7 @@
>  #include "actions.h"
>
>  static DEFINE_PER_CPU(struct ovs_action_stack, action_stacks);
> +DEFINE_PER_CPU(int, ovs_exec_actions_count);
>
....

> @@ -741,33 +758,39 @@ static int sample(struct datapath *dp, struct sk_buff *skb,
>         rem = nla_len(acts_list);
>         a = nla_data(acts_list);
>
> +       if (unlikely(rem == 0))
> +               return 0;
> +
>         /* Actions list is either empty or only contains a single user-space
>          * action, the latter being a special case as it is the only known
>          * usage of the sample action.
>          * In these special cases don't clone the skb as there are no
>          * side-effects in the nested actions.
>          * Otherwise, clone in case the nested actions have side effects. */
> -       if (likely(rem == 0 ||
> -                  (nla_type(a) == OVS_ACTION_ATTR_USERSPACE &&
> -                   last_action(a, rem)))) {
> -               sample_skb = skb;
> -               skb_get(skb);
> +       if (nla_type(a) == OVS_ACTION_ATTR_USERSPACE &&
> +           last_action(a, rem)) {
> +               return output_userspace(dp, skb, a);
This is an optimization; it should be kept in a separate patch.

>         } else {
> -               sample_skb = skb_clone(skb, GFP_ATOMIC);
> -               if (!sample_skb)
> -                       /* Skip the sample action when out of memory. */
> +               struct ovs_deferred_action *da;
> +
> +               skb = skb_clone(skb, GFP_ATOMIC);
> +               if (!skb)
> +                       /* Out of memory, skip the sample action. */
> +                       return 0;
> +
> +               da = push_actions(skb, a, rem);
> +               if (!da) {
> +                       if (net_ratelimit())
> +                               pr_warn("%s: stack limit reached, drop sample action\n",
> +                                       ovs_dp_name(dp));
> +                       kfree_skb(skb);
>                         return 0;
> +               }
>
> -               flow_key_clone(skb, &sample_key);
> +               flow_key_clone(skb, &da->pkt_key);
>         }
>
> -       /* Note that do_execute_actions() never consumes skb.
> -        * In the case where skb has been cloned above it is the clone that
> -        * is consumed.  Otherwise the skb_get(skb) call prevents
> -        * consumption by do_execute_actions(). Thus, it is safe to simply
> -        * return the error code and let the caller (also
> -        * do_execute_actions()) free skb on error. */
> -       return do_execute_actions(dp, sample_skb, a, rem);
> +       return 0;
>  }
>
>  static void execute_hash(struct sk_buff *skb, const struct nlattr *attr)
> @@ -837,11 +860,10 @@ static int execute_set_action(struct sk_buff *skb,
>         return err;
>  }
>
> -
>  static int execute_recirc(struct datapath *dp, struct sk_buff *skb,
>                           const struct nlattr *a, int rem)
>  {
> -       struct sw_flow_key recirc_key;
> +       struct ovs_deferred_action *da;
>
>         if (!is_skb_flow_key_valid(skb)) {
>                 int err;
> @@ -854,26 +876,40 @@ static int execute_recirc(struct datapath *dp, struct sk_buff *skb,
>         BUG_ON(!is_skb_flow_key_valid(skb));
>
>         if (!last_action(a, rem)) {
> -               /* Recirc action is the not the last action
> -                * of the action list. */
> -               skb = skb_clone(skb, GFP_ATOMIC);
> +               /* Recirc action is the not the last action,
> +                * We will need to push skb clone instead of skb. */
>
> -               /* Skip the recirc action when out of memory, but
> -                * continue on with the rest of the action list. */
> +               skb = skb_clone(skb, GFP_ATOMIC);
>                 if (!skb)
> +                       /* Out of memory, skip recirc. */
>                         return 0;
>
> -               flow_key_clone(skb, &recirc_key);
> +               /* Push the skb clone for recirc. */
> +               da = push_actions(skb, NULL, 0);
> +               if (!da)
> +                       goto stack_full;
> +
> +               flow_key_clone(skb, &da->pkt_key);
> +       } else {
> +               skb_get(skb);
Why do we need skb_get()?

> +               if (!push_actions(skb, NULL, 0))
> +                       goto stack_full;

>         }
>
>         flow_key_set_recirc_id(skb, nla_get_u32(a));
> -       ovs_dp_process_packet(skb);
> +       return 0;
> +
> +stack_full:
> +       if (net_ratelimit())
> +               pr_warn("%s: stack limit reached, drop recirc action\n",
> +                       ovs_dp_name(dp));
> +
>         return 0;
>  }
>
>  /* Execute a list of actions against 'skb'. */
> -static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
> -                       const struct nlattr *attr, int len)
> +static void do_execute_actions(struct datapath *dp, struct sk_buff *skb,
> +                              const struct nlattr *attr, int len)
>  {
What is the reason for changing the function prototype?

>         /* Every output action needs a separate clone of 'skb', but the common
>          * case is just a single output action, so that doing a clone and
> @@ -920,7 +956,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>                 case OVS_ACTION_ATTR_PUSH_VLAN:
>                         err = push_vlan(skb, nla_data(a));
>                         if (unlikely(err)) /* skb already freed. */
> -                               return err;
> +                               return;
>                         break;
>
>                 case OVS_ACTION_ATTR_POP_VLAN:
> @@ -929,12 +965,6 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>
>                 case OVS_ACTION_ATTR_RECIRC:
>                         err = execute_recirc(dp, skb, a, rem);
> -                       if (last_action(a, rem)) {
> -                               /* If this is the last action, the skb has
> -                                * been consumed or freed.
> -                                * Return immediately. */
> -                               return err;
> -                       }
>                         break;
>
>                 case OVS_ACTION_ATTR_SET:
> @@ -948,7 +978,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>
>                 if (unlikely(err)) {
>                         kfree_skb(skb);
> -                       return err;
> +                       return;
>                 }
>         }
>
> @@ -956,14 +986,50 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>                 do_output(dp, skb, prev_port);
>         else
>                 consume_skb(skb);
> +}
>
> -       return 0;
> +static void ovs_process_deferred_packets(struct datapath *dp)
> +{
> +       struct ovs_action_stack *stack;
> +
> +       stack = this_cpu_ptr(&(action_stacks));
> +
> +       /* Finishing executing all deferred actions. */
> +       while (!action_stack_is_empty(stack)) {
> +               struct ovs_deferred_action *da = action_stack_pop(stack);
> +               struct sk_buff *skb = da->skb;
> +               const struct nlattr *actions = da->actions;
> +               int rem = da->rem;
> +
> +               if (!actions) {
> +                       struct sw_flow *flow = ovs_dp_packet_flow_lookup(dp, skb);
> +
> +                       if (flow) {
> +                               struct sw_flow_actions *sf_acts;
> +
> +                               sf_acts = rcu_dereference(flow->sf_acts);
> +                               actions = sf_acts->actions;
> +                               rem = sf_acts->actions_len;
> +                       }
> +               }
> +
> +               if (actions)
> +                       do_execute_actions(dp, skb, actions, rem);
> +       }

We also need a limit on the number of recirc actions executed for a given packet.

>  }
>
>  /* Execute a list of actions against 'skb'. */
> -int ovs_execute_actions(struct datapath *dp, struct sk_buff *skb, struct sw_flow_actions *acts)
> +void ovs_execute_actions(struct datapath *dp, struct sk_buff *skb,
> +                        struct sw_flow_actions *acts)
>  {
> -       return do_execute_actions(dp, skb, acts->actions, acts->actions_len);
> +       this_cpu_inc(ovs_exec_actions_count);
> +
> +       do_execute_actions(dp, skb, acts->actions, acts->actions_len);
> +
> +       if (this_cpu_read(ovs_exec_actions_count) == 1)
> +               ovs_process_deferred_packets(dp);
> +
> +       this_cpu_dec(ovs_exec_actions_count);
>  }
>
>  void ovs_action_stacks_init(void)
> diff --git a/datapath/actions.h b/datapath/actions.h
> index 5f98ad4..84801fa 100644
> --- a/datapath/actions.h
> +++ b/datapath/actions.h
> @@ -33,7 +33,7 @@
>
>  struct ovs_deferred_action {
>         struct sk_buff *skb;
> -       struct nlattr *actions;
> +       const struct nlattr *actions;
>         int rem;
>
This should be part of the first patch.

>         /* Store pkt_key clone during recirc */
> @@ -48,7 +48,7 @@ struct ovs_action_stack {
>
>  void ovs_action_stacks_init(void);
>  void ovs_action_stacks_exit(void);;
> -int ovs_execute_actions(struct datapath *dp, struct sk_buff *skb, struct sw_flow_actions *acts);
> +void ovs_execute_actions(struct datapath *dp, struct sk_buff *skb, struct sw_flow_actions *acts);
>  void ovs_process_action_stack(void);
>
>  #endif /* actions.h */
> diff --git a/datapath/datapath.c b/datapath/datapath.c
> index 5bae23e..2424a62 100644
> --- a/datapath/datapath.c
> +++ b/datapath/datapath.c
> @@ -595,12 +595,12 @@ static int ovs_packet_cmd_execute(struct sk_buff *skb, struct genl_info *info)
>         sf_acts = rcu_dereference(flow->sf_acts);
>
>         local_bh_disable();
> -       err = ovs_execute_actions(dp, packet, sf_acts);
> +       ovs_execute_actions(dp, packet, sf_acts);
>         local_bh_enable();
>         rcu_read_unlock();
>
>         ovs_flow_free(flow, false);
> -       return err;
> +       return 0;
>
>  err_unlock:
>         rcu_read_unlock();
> --
> 1.9.1
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev



More information about the dev mailing list