[ovs-dev] [PATCH 4/4] datapath: Implement recirc action without recursion

Andy Zhou azhou at nicira.com
Mon Aug 18 20:44:21 UTC 2014


On Sun, Aug 17, 2014 at 7:43 PM, Pravin Shelar <pshelar at nicira.com> wrote:
> On Fri, Aug 15, 2014 at 3:12 AM, Andy Zhou <azhou at nicira.com> wrote:
>> Since kernel stack is limited in size, it is not wise to using
>> recursive function with large stack frames.
>>
>> This patch provides an alternative implementation of recirc action with
>> out using recursion.
>>
>> A recirc action can be embedded in the middle of an actions list.
>> Instead of immediately (and recursively) process the recirc action,
>> the remaining the actions list is pushed on a per-cpu fixed size stack,
>> the skb needs recirc is then returned.
>>
>> ovs_execute_actions now only finishes whenever both per-cpu stack is
>> empty, and the actions list does not have recirc action. Recirc skb
>> is always process before check the stack to preserve the correct
>> execution order.a
>>
>> The depth of the private stack limit the depth of recirc action.
>>
>> Signed-off-by: Andy Zhou <azhou at nicira.com>
>> ---
>>  datapath/actions.c  | 175 ++++++++++++++++++++++++++++++++++++----------------
>>  datapath/actions.h  |   7 ++-
>>  datapath/datapath.c |   4 +-
>>  3 files changed, 130 insertions(+), 56 deletions(-)
>>
>> diff --git a/datapath/actions.c b/datapath/actions.c
>> index 9dd9063..da3ac89 100644
>> --- a/datapath/actions.c
>> +++ b/datapath/actions.c
>> @@ -143,8 +143,8 @@ static bool is_skb_flow_key_valid(struct sk_buff *skb)
>>         return !!OVS_CB(skb)->pkt_key->eth.type;
>>  }
>>
>> -static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>> -                             const struct nlattr *attr, int len);
>> +static struct sk_buff *do_execute_actions(struct datapath *dp,
>> +               struct sk_buff *skb, const struct nlattr *attr, int len);
>>
>>  static int make_writable(struct sk_buff *skb, int write_len)
>>  {
>> @@ -154,6 +154,19 @@ static int make_writable(struct sk_buff *skb, int write_len)
>>         return pskb_expand_head(skb, 0, 0, GFP_ATOMIC);
>>  }
>>
>> +static inline struct ovs_deferred_action *
>> +push_next_action(struct sk_buff *skb, const struct nlattr *attr, int len)
>> +{
>> +       struct ovs_deferred_action *da;
>> +       struct ovs_action_stack *stack;
>> +       struct nlattr *next_action = nla_next(attr, &len);
>> +
>> +       stack = &__get_cpu_var(ovs_action_stacks);
>> +       da = ovs_action_stack_push(stack, skb, next_action, len);
>> +
>> +       return da;
>> +}
>> +
>>  /* The end of the mac header.
>>   *
>>   * For non-MPLS skbs this will correspond to the network header.
>> @@ -669,10 +682,8 @@ static bool last_action(const struct nlattr *a, int rem)
>>  static int sample(struct datapath *dp, struct sk_buff *skb,
>>                   const struct nlattr *attr)
>>  {
>> -       struct sw_flow_key sample_key;
>>         const struct nlattr *acts_list = NULL;
>>         const struct nlattr *a;
>> -       struct sk_buff *sample_skb;
>>         int rem;
>>
>>         for (a = nla_data(attr), rem = nla_len(attr); rem > 0;
>> @@ -701,24 +712,28 @@ static int sample(struct datapath *dp, struct sk_buff *skb,
>>         if (likely(rem == 0 ||
>>                    (nla_type(a) == OVS_ACTION_ATTR_USERSPACE &&
>>                     last_action(a, rem)))) {
>> -               sample_skb = skb;
>> -               skb_get(skb);
>> +               output_userspace(dp, skb, a);
>>         } else {
>> -               sample_skb = skb_clone(skb, GFP_ATOMIC);
>> -               if (!sample_skb)
>> -                       /* Skip the sample action when out of memory. */
>> +               struct ovs_deferred_action *da;
>> +
>> +               skb = skb_clone(skb, GFP_ATOMIC);
>> +               if (!skb)
>> +                       /* Out of memory, skip the sample action. */
>> +                       return 0;
>> +
>> +               da = push_next_action(skb, a, rem);
>> +               if (!da) {
>> +                       if (net_ratelimit())
>> +                               pr_warn("%s: stack limit reached, drop sample action\n",
>> +                                       ovs_dp_name(dp));
>> +                       kfree_skb(skb);
>>                         return 0;
>> +               }
>>
>> -               flow_key_clone(skb, &sample_key);
>> +               flow_key_clone(skb, &da->pkt_key);
>>         }
>>
>> -       /* Note that do_execute_actions() never consumes skb.
>> -        * In the case where skb has been cloned above it is the clone that
>> -        * is consumed.  Otherwise the skb_get(skb) call prevents
>> -        * consumption by do_execute_actions(). Thus, it is safe to simply
>> -        * return the error code and let the caller (also
>> -        * do_execute_actions()) free skb on error. */
>> -       return do_execute_actions(dp, sample_skb, a, rem);
>> +       return 0;
>>  }
>>
>>  static void execute_hash(struct sk_buff *skb, const struct nlattr *attr)
>> @@ -789,43 +804,57 @@ static int execute_set_action(struct sk_buff *skb,
>>  }
>>
>>
>> -static int execute_recirc(struct datapath *dp, struct sk_buff *skb,
>> -                         const struct nlattr *a, int rem)
>> +static struct sk_buff *execute_recirc(struct datapath *dp,
>> +                                     struct sk_buff *skb,
>> +                                     const struct nlattr *a, int rem)
>>  {
>> -       struct sw_flow_key recirc_key;
>>         int err;
>>
>> -       if (!last_action(a, rem)) {
>> -               /* Recirc action is the not the last action
>> -                * of the action list. */
>> -               skb = skb_clone(skb, GFP_ATOMIC);
>> -
>> -               /* Skip the recirc action when out of memory, but
>> -                * continue on with the rest of the action list. */
>> -               if (!skb)
>> -                       return 0;
>> -       }
>> -
>>         if (!is_skb_flow_key_valid(skb)) {
>>                 err = ovs_flow_key_update(skb, OVS_CB(skb)->pkt_key);
>>                 if (err) {
>>                         kfree_skb(skb);
>> -                       return err;
>> +                       return NULL;
>>                 }
>>         }
>>         BUG_ON(!is_skb_flow_key_valid(skb));
>>
>> -       if (!last_action(a, rem))
>> -               flow_key_clone(skb, &recirc_key);
>> +       if (!last_action(a, rem)) {
>> +               struct ovs_deferred_action *da;
>> +               /* Recirc action is the not the last action,
>> +                * Push the remaining actions on to the stack,
>> +                * clone skb and clone pkt key.
>> +                *
>> +                * Skip the recirc action if the stack is full, or
>> +                * we are out of memory to clone the skb.
>> +                */
>> +
>> +               da = push_next_action(skb, a, rem);
>> +               if (!da) {
>> +                       /* Stack is full, drop the skb. */
>> +                       if (net_ratelimit())
>> +                               pr_warn("%s: stack limit reached, drop recirc action\n",
>> +                                       ovs_dp_name(dp));
>> +                       consume_skb(skb);
>> +                       return NULL;
>> +               }
>> +
>> +               skb = skb_clone(skb, GFP_ATOMIC);
>> +               if (skb)
>> +                       /* Out of memory, do not recurc*/
>> +                       return NULL;
>> +
>> +               flow_key_clone(skb, &da->pkt_key);
>> +       }
>>
>>         flow_key_set_recirc_id(skb, nla_get_u32(a));
>> -       ovs_dp_process_packet(skb);
>> -       return 0;
>> +       return skb; /* Recirc skb */
>>  }
>>
>>  /* Execute a list of actions against 'skb'. */
>> -static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>> -                       const struct nlattr *attr, int len)
>> +static struct sk_buff *do_execute_actions(struct datapath *dp,
>> +                                         struct sk_buff *skb,
>> +                                         const struct nlattr *attr, int len)
>>  {
>>         /* Every output action needs a separate clone of 'skb', but the common
>>          * case is just a single output action, so that doing a clone and
>> @@ -872,7 +901,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>>                 case OVS_ACTION_ATTR_PUSH_VLAN:
>>                         err = push_vlan(skb, nla_data(a));
>>                         if (unlikely(err)) /* skb already freed. */
>> -                               return err;
>> +                               return NULL;
>>                         break;
>>
>>                 case OVS_ACTION_ATTR_POP_VLAN:
>> @@ -880,8 +909,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>>                         break;
>>
>>                 case OVS_ACTION_ATTR_RECIRC:
>> -                       err = execute_recirc(dp, skb, a, rem);
>> -                       break;
>> +                       return execute_recirc(dp, skb, a, rem);
>>
>>                 case OVS_ACTION_ATTR_SET:
>>                         err = execute_set_action(skb, nla_data(a));
>> @@ -894,7 +922,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>>
>>                 if (unlikely(err)) {
>>                         kfree_skb(skb);
>> -                       return err;
>> +                       return NULL;
>>                 }
>>         }
>>
>> @@ -903,7 +931,47 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>>         else
>>                 consume_skb(skb);
>>
>> -       return 0;
>> +       return NULL;
>> +}
>> +
>> +static struct sk_buff *recirc_packet_process(struct datapath *dp,
>> +                                            struct sk_buff *skb)
>> +{
>> +       if (ovs_dp_packet_flow_lookup(dp, skb)) {
>> +               struct sw_flow_actions *acts;
>> +
>> +               acts = rcu_dereference(OVS_CB(skb)->flow->sf_acts);
>> +
>> +               return do_execute_actions(dp, skb, acts->actions,
>> +                                         acts->actions_len);
>> +       }
>> +
>> +       return NULL;
>> +}
>> +
>> +static struct sk_buff *execute_deferred_actions(struct datapath *dp,
>> +                                              struct ovs_deferred_action *da)
>> +{
>> +       return do_execute_actions(dp, da->skb, da->actions, da->rem);
>> +}
>> +
>> +static void ovs_recirc_packet(struct datapath *dp, struct sk_buff *rskb)
>> +{
>> +       struct ovs_action_stack *stack;
>> +
>> +       stack = &__get_cpu_var(ovs_action_stacks);
>> +
>> +       /* Finishing executing all deferred actions. */
>> +       while (rskb || !ovs_action_stack_is_empty(stack)) {
>> +               if (rskb)
>> +                       rskb = recirc_packet_process(dp, rskb);
>> +               else {
>> +                       struct ovs_deferred_action *da;
>> +
>> +                       da = ovs_action_stack_pop(stack);
>> +                       rskb = execute_deferred_actions(dp, da);
>> +               }
>> +       }
>>  }
>>
> I have couple comments.
> 1. We can handle sample and recirc in same way. So rather than calling
> do_execute_actions() recursively, it can just push cloned skb to
> stack. we can check this stack at end of do_execute_actions(). sample
> action can set action pointer, recirc does not. So on pop if there is
> no action pointer we can lookup flow.
O.K. I will make the change. It may be cleaner and more efficient for
recirc packet to lookup flow immediately, and push packet with looked
up actions on the stack.

> 2. This can execute sample action, so we can not call this ovs_recirc_packet().
>
how about ovs_process_deferred_packet() ?
>>  /* We limit the number of times that we pass into execute_actions()
>> @@ -917,40 +985,41 @@ struct loop_counter {
>>
>>  static DEFINE_PER_CPU(struct loop_counter, loop_counters);
>>
>> -static int loop_suppress(struct datapath *dp, struct sw_flow_actions *actions)
>> +static void loop_suppress(struct datapath *dp)
>>  {
>>         if (net_ratelimit())
>>                 pr_warn("%s: flow looped %d times, dropping\n",
>>                                 ovs_dp_name(dp), MAX_LOOPS);
>> -       actions->actions_len = 0;
>> -       return -ELOOP;
>>  }
>>
>>  /* Execute a list of actions against 'skb'. */
>> -int ovs_execute_actions(struct datapath *dp, struct sk_buff *skb)
>> +void ovs_execute_actions(struct datapath *dp, struct sk_buff *skb)
>>  {
>> -       struct sw_flow_actions *acts = rcu_dereference(OVS_CB(skb)->flow->sf_acts);
>> +       struct sw_flow_actions *acts;
>> +       struct sk_buff *rskb;
>>         struct loop_counter *loop;
>> -       int error;
>>
>>         /* Check whether we've looped too much. */
>>         loop = &__get_cpu_var(loop_counters);
>>         if (unlikely(++loop->count > MAX_LOOPS))
>>                 loop->looping = true;
>>         if (unlikely(loop->looping)) {
>> -               error = loop_suppress(dp, acts);
>> +               loop_suppress(dp);
>>                 kfree_skb(skb);
>>                 goto out_loop;
>>         }
>>
>> -       error = do_execute_actions(dp, skb, acts->actions, acts->actions_len);
>> +       acts = rcu_dereference(OVS_CB(skb)->flow->sf_acts);
>> +       rskb = do_execute_actions(dp, skb, acts->actions,
>> +                                       acts->actions_len);
>> +
>> +       if (rskb)
>> +               ovs_recirc_packet(dp, rskb);
>>
>>  out_loop:
>>         /* Decrement loop counter. */
>>         if (!--loop->count)
>>                 loop->looping = false;
>> -
>> -       return error;
>>  }
>>
>>  void ovs_action_stacks_init(void)
>> diff --git a/datapath/actions.h b/datapath/actions.h
>> index e4601a6..de8a0dc 100644
>> --- a/datapath/actions.h
>> +++ b/datapath/actions.h
>> @@ -48,7 +48,7 @@ struct ovs_action_stack {
>>
>>  void ovs_action_stacks_init(void);
>>  void ovs_action_stacks_exit(void);
>> -int ovs_execute_actions(struct datapath *dp, struct sk_buff *skb);
>> +void ovs_execute_actions(struct datapath *dp, struct sk_buff *skb);
>>  void ovs_process_action_stack(void);
>>
>>  static inline void ovs_action_stack_init(struct ovs_action_stack *stack)
>> @@ -56,6 +56,11 @@ static inline void ovs_action_stack_init(struct ovs_action_stack *stack)
>>         stack->top = 0;
>>  }
>>
>> +static inline bool ovs_action_stack_is_empty(struct ovs_action_stack *stack)
>> +{
>> +       return (stack->top == 0);
>> +}
>> +
>>  static inline struct ovs_deferred_action *
>>  ovs_action_stack_pop(struct ovs_action_stack *stack)
>>  {
>> diff --git a/datapath/datapath.c b/datapath/datapath.c
>> index bcfdd62..981862f 100644
>> --- a/datapath/datapath.c
>> +++ b/datapath/datapath.c
>> @@ -580,12 +580,12 @@ static int ovs_packet_cmd_execute(struct sk_buff *skb, struct genl_info *info)
>>         OVS_CB(packet)->input_vport = input_vport;
>>
>>         local_bh_disable();
>> -       err = ovs_execute_actions(dp, packet);
>> +       ovs_execute_actions(dp, packet);
>>         local_bh_enable();
>>         rcu_read_unlock();
>>
>>         ovs_flow_free(flow, false);
>> -       return err;
>> +       return 0;
>>
>>  err_unlock:
>>         rcu_read_unlock();
>> --
>> 1.9.1
>>
>> _______________________________________________
>> dev mailing list
>> dev at openvswitch.org
>> http://openvswitch.org/mailman/listinfo/dev



More information about the dev mailing list