[ovs-dev] [PATCH 4/4] datapath: Implement recirc action without recursion

Pravin Shelar pshelar at nicira.com
Mon Aug 18 02:43:49 UTC 2014


On Fri, Aug 15, 2014 at 3:12 AM, Andy Zhou <azhou at nicira.com> wrote:
> Since the kernel stack is limited in size, it is not wise to use
> recursive functions with large stack frames.
>
> This patch provides an alternative implementation of the recirc action
> without using recursion.
>
> A recirc action can be embedded in the middle of an actions list.
> Instead of immediately (and recursively) processing the recirc action,
> the remainder of the actions list is pushed onto a per-cpu fixed-size
> stack, and the skb that needs recirc is then returned.
>
> ovs_execute_actions now only finishes when both the per-cpu stack is
> empty and the actions list does not have a recirc action. The recirc skb
> is always processed before checking the stack, to preserve the correct
> execution order.
>
> The depth of the private stack limits the depth of recirc actions.
>
> Signed-off-by: Andy Zhou <azhou at nicira.com>
> ---
>  datapath/actions.c  | 175 ++++++++++++++++++++++++++++++++++++----------------
>  datapath/actions.h  |   7 ++-
>  datapath/datapath.c |   4 +-
>  3 files changed, 130 insertions(+), 56 deletions(-)
>
> diff --git a/datapath/actions.c b/datapath/actions.c
> index 9dd9063..da3ac89 100644
> --- a/datapath/actions.c
> +++ b/datapath/actions.c
> @@ -143,8 +143,8 @@ static bool is_skb_flow_key_valid(struct sk_buff *skb)
>         return !!OVS_CB(skb)->pkt_key->eth.type;
>  }
>
> -static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
> -                             const struct nlattr *attr, int len);
> +static struct sk_buff *do_execute_actions(struct datapath *dp,
> +               struct sk_buff *skb, const struct nlattr *attr, int len);
>
>  static int make_writable(struct sk_buff *skb, int write_len)
>  {
> @@ -154,6 +154,19 @@ static int make_writable(struct sk_buff *skb, int write_len)
>         return pskb_expand_head(skb, 0, 0, GFP_ATOMIC);
>  }
>
> +static inline struct ovs_deferred_action *
> +push_next_action(struct sk_buff *skb, const struct nlattr *attr, int len)
> +{
> +       struct ovs_deferred_action *da;
> +       struct ovs_action_stack *stack;
> +       struct nlattr *next_action = nla_next(attr, &len);
> +
> +       stack = &__get_cpu_var(ovs_action_stacks);
> +       da = ovs_action_stack_push(stack, skb, next_action, len);
> +
> +       return da;
> +}
> +
>  /* The end of the mac header.
>   *
>   * For non-MPLS skbs this will correspond to the network header.
> @@ -669,10 +682,8 @@ static bool last_action(const struct nlattr *a, int rem)
>  static int sample(struct datapath *dp, struct sk_buff *skb,
>                   const struct nlattr *attr)
>  {
> -       struct sw_flow_key sample_key;
>         const struct nlattr *acts_list = NULL;
>         const struct nlattr *a;
> -       struct sk_buff *sample_skb;
>         int rem;
>
>         for (a = nla_data(attr), rem = nla_len(attr); rem > 0;
> @@ -701,24 +712,28 @@ static int sample(struct datapath *dp, struct sk_buff *skb,
>         if (likely(rem == 0 ||
>                    (nla_type(a) == OVS_ACTION_ATTR_USERSPACE &&
>                     last_action(a, rem)))) {
> -               sample_skb = skb;
> -               skb_get(skb);
> +               output_userspace(dp, skb, a);
>         } else {
> -               sample_skb = skb_clone(skb, GFP_ATOMIC);
> -               if (!sample_skb)
> -                       /* Skip the sample action when out of memory. */
> +               struct ovs_deferred_action *da;
> +
> +               skb = skb_clone(skb, GFP_ATOMIC);
> +               if (!skb)
> +                       /* Out of memory, skip the sample action. */
> +                       return 0;
> +
> +               da = push_next_action(skb, a, rem);
> +               if (!da) {
> +                       if (net_ratelimit())
> +                               pr_warn("%s: stack limit reached, drop sample action\n",
> +                                       ovs_dp_name(dp));
> +                       kfree_skb(skb);
>                         return 0;
> +               }
>
> -               flow_key_clone(skb, &sample_key);
> +               flow_key_clone(skb, &da->pkt_key);
>         }
>
> -       /* Note that do_execute_actions() never consumes skb.
> -        * In the case where skb has been cloned above it is the clone that
> -        * is consumed.  Otherwise the skb_get(skb) call prevents
> -        * consumption by do_execute_actions(). Thus, it is safe to simply
> -        * return the error code and let the caller (also
> -        * do_execute_actions()) free skb on error. */
> -       return do_execute_actions(dp, sample_skb, a, rem);
> +       return 0;
>  }
>
>  static void execute_hash(struct sk_buff *skb, const struct nlattr *attr)
> @@ -789,43 +804,57 @@ static int execute_set_action(struct sk_buff *skb,
>  }
>
>
> -static int execute_recirc(struct datapath *dp, struct sk_buff *skb,
> -                         const struct nlattr *a, int rem)
> +static struct sk_buff *execute_recirc(struct datapath *dp,
> +                                     struct sk_buff *skb,
> +                                     const struct nlattr *a, int rem)
>  {
> -       struct sw_flow_key recirc_key;
>         int err;
>
> -       if (!last_action(a, rem)) {
> -               /* Recirc action is the not the last action
> -                * of the action list. */
> -               skb = skb_clone(skb, GFP_ATOMIC);
> -
> -               /* Skip the recirc action when out of memory, but
> -                * continue on with the rest of the action list. */
> -               if (!skb)
> -                       return 0;
> -       }
> -
>         if (!is_skb_flow_key_valid(skb)) {
>                 err = ovs_flow_key_update(skb, OVS_CB(skb)->pkt_key);
>                 if (err) {
>                         kfree_skb(skb);
> -                       return err;
> +                       return NULL;
>                 }
>         }
>         BUG_ON(!is_skb_flow_key_valid(skb));
>
> -       if (!last_action(a, rem))
> -               flow_key_clone(skb, &recirc_key);
> +       if (!last_action(a, rem)) {
> +               struct ovs_deferred_action *da;
> +               /* Recirc action is the not the last action,
> +                * Push the remaining actions on to the stack,
> +                * clone skb and clone pkt key.
> +                *
> +                * Skip the recirc action if the stack is full, or
> +                * we are out of memory to clone the skb.
> +                */
> +
> +               da = push_next_action(skb, a, rem);
> +               if (!da) {
> +                       /* Stack is full, drop the skb. */
> +                       if (net_ratelimit())
> +                               pr_warn("%s: stack limit reached, drop recirc action\n",
> +                                       ovs_dp_name(dp));
> +                       consume_skb(skb);
> +                       return NULL;
> +               }
> +
> +               skb = skb_clone(skb, GFP_ATOMIC);
> +               if (skb)
> +                       /* Out of memory, do not recurc*/
> +                       return NULL;
> +
> +               flow_key_clone(skb, &da->pkt_key);
> +       }
>
>         flow_key_set_recirc_id(skb, nla_get_u32(a));
> -       ovs_dp_process_packet(skb);
> -       return 0;
> +       return skb; /* Recirc skb */
>  }
>
>  /* Execute a list of actions against 'skb'. */
> -static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
> -                       const struct nlattr *attr, int len)
> +static struct sk_buff *do_execute_actions(struct datapath *dp,
> +                                         struct sk_buff *skb,
> +                                         const struct nlattr *attr, int len)
>  {
>         /* Every output action needs a separate clone of 'skb', but the common
>          * case is just a single output action, so that doing a clone and
> @@ -872,7 +901,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>                 case OVS_ACTION_ATTR_PUSH_VLAN:
>                         err = push_vlan(skb, nla_data(a));
>                         if (unlikely(err)) /* skb already freed. */
> -                               return err;
> +                               return NULL;
>                         break;
>
>                 case OVS_ACTION_ATTR_POP_VLAN:
> @@ -880,8 +909,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>                         break;
>
>                 case OVS_ACTION_ATTR_RECIRC:
> -                       err = execute_recirc(dp, skb, a, rem);
> -                       break;
> +                       return execute_recirc(dp, skb, a, rem);
>
>                 case OVS_ACTION_ATTR_SET:
>                         err = execute_set_action(skb, nla_data(a));
> @@ -894,7 +922,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>
>                 if (unlikely(err)) {
>                         kfree_skb(skb);
> -                       return err;
> +                       return NULL;
>                 }
>         }
>
> @@ -903,7 +931,47 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>         else
>                 consume_skb(skb);
>
> -       return 0;
> +       return NULL;
> +}
> +
> +static struct sk_buff *recirc_packet_process(struct datapath *dp,
> +                                            struct sk_buff *skb)
> +{
> +       if (ovs_dp_packet_flow_lookup(dp, skb)) {
> +               struct sw_flow_actions *acts;
> +
> +               acts = rcu_dereference(OVS_CB(skb)->flow->sf_acts);
> +
> +               return do_execute_actions(dp, skb, acts->actions,
> +                                         acts->actions_len);
> +       }
> +
> +       return NULL;
> +}
> +
> +static struct sk_buff *execute_deferred_actions(struct datapath *dp,
> +                                              struct ovs_deferred_action *da)
> +{
> +       return do_execute_actions(dp, da->skb, da->actions, da->rem);
> +}
> +
> +static void ovs_recirc_packet(struct datapath *dp, struct sk_buff *rskb)
> +{
> +       struct ovs_action_stack *stack;
> +
> +       stack = &__get_cpu_var(ovs_action_stacks);
> +
> +       /* Finishing executing all deferred actions. */
> +       while (rskb || !ovs_action_stack_is_empty(stack)) {
> +               if (rskb)
> +                       rskb = recirc_packet_process(dp, rskb);
> +               else {
> +                       struct ovs_deferred_action *da;
> +
> +                       da = ovs_action_stack_pop(stack);
> +                       rskb = execute_deferred_actions(dp, da);
> +               }
> +       }
>  }
>
I have a couple of comments.
1. We can handle sample and recirc in the same way. Rather than calling
do_execute_actions() recursively, sample can just push the cloned skb onto
the stack, and we can check this stack at the end of do_execute_actions().
The sample action can set the action pointer; recirc does not. So on pop,
if there is no action pointer, we can look up the flow.
2. This function can also execute sample actions, so it should not be
named ovs_recirc_packet().

>  /* We limit the number of times that we pass into execute_actions()
> @@ -917,40 +985,41 @@ struct loop_counter {
>
>  static DEFINE_PER_CPU(struct loop_counter, loop_counters);
>
> -static int loop_suppress(struct datapath *dp, struct sw_flow_actions *actions)
> +static void loop_suppress(struct datapath *dp)
>  {
>         if (net_ratelimit())
>                 pr_warn("%s: flow looped %d times, dropping\n",
>                                 ovs_dp_name(dp), MAX_LOOPS);
> -       actions->actions_len = 0;
> -       return -ELOOP;
>  }
>
>  /* Execute a list of actions against 'skb'. */
> -int ovs_execute_actions(struct datapath *dp, struct sk_buff *skb)
> +void ovs_execute_actions(struct datapath *dp, struct sk_buff *skb)
>  {
> -       struct sw_flow_actions *acts = rcu_dereference(OVS_CB(skb)->flow->sf_acts);
> +       struct sw_flow_actions *acts;
> +       struct sk_buff *rskb;
>         struct loop_counter *loop;
> -       int error;
>
>         /* Check whether we've looped too much. */
>         loop = &__get_cpu_var(loop_counters);
>         if (unlikely(++loop->count > MAX_LOOPS))
>                 loop->looping = true;
>         if (unlikely(loop->looping)) {
> -               error = loop_suppress(dp, acts);
> +               loop_suppress(dp);
>                 kfree_skb(skb);
>                 goto out_loop;
>         }
>
> -       error = do_execute_actions(dp, skb, acts->actions, acts->actions_len);
> +       acts = rcu_dereference(OVS_CB(skb)->flow->sf_acts);
> +       rskb = do_execute_actions(dp, skb, acts->actions,
> +                                       acts->actions_len);
> +
> +       if (rskb)
> +               ovs_recirc_packet(dp, rskb);
>
>  out_loop:
>         /* Decrement loop counter. */
>         if (!--loop->count)
>                 loop->looping = false;
> -
> -       return error;
>  }
>
>  void ovs_action_stacks_init(void)
> diff --git a/datapath/actions.h b/datapath/actions.h
> index e4601a6..de8a0dc 100644
> --- a/datapath/actions.h
> +++ b/datapath/actions.h
> @@ -48,7 +48,7 @@ struct ovs_action_stack {
>
>  void ovs_action_stacks_init(void);
>  void ovs_action_stacks_exit(void);
> -int ovs_execute_actions(struct datapath *dp, struct sk_buff *skb);
> +void ovs_execute_actions(struct datapath *dp, struct sk_buff *skb);
>  void ovs_process_action_stack(void);
>
>  static inline void ovs_action_stack_init(struct ovs_action_stack *stack)
> @@ -56,6 +56,11 @@ static inline void ovs_action_stack_init(struct ovs_action_stack *stack)
>         stack->top = 0;
>  }
>
> +static inline bool ovs_action_stack_is_empty(struct ovs_action_stack *stack)
> +{
> +       return (stack->top == 0);
> +}
> +
>  static inline struct ovs_deferred_action *
>  ovs_action_stack_pop(struct ovs_action_stack *stack)
>  {
> diff --git a/datapath/datapath.c b/datapath/datapath.c
> index bcfdd62..981862f 100644
> --- a/datapath/datapath.c
> +++ b/datapath/datapath.c
> @@ -580,12 +580,12 @@ static int ovs_packet_cmd_execute(struct sk_buff *skb, struct genl_info *info)
>         OVS_CB(packet)->input_vport = input_vport;
>
>         local_bh_disable();
> -       err = ovs_execute_actions(dp, packet);
> +       ovs_execute_actions(dp, packet);
>         local_bh_enable();
>         rcu_read_unlock();
>
>         ovs_flow_free(flow, false);
> -       return err;
> +       return 0;
>
>  err_unlock:
>         rcu_read_unlock();
> --
> 1.9.1
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev



More information about the dev mailing list