[ovs-dev] [PATCH 4/4] datapath: Implement recirc action without recursion
Andy Zhou
azhou at nicira.com
Mon Aug 18 20:44:21 UTC 2014
On Sun, Aug 17, 2014 at 7:43 PM, Pravin Shelar <pshelar at nicira.com> wrote:
> On Fri, Aug 15, 2014 at 3:12 AM, Andy Zhou <azhou at nicira.com> wrote:
>> Since the kernel stack is limited in size, it is not wise to use
>> recursive functions with large stack frames.
>>
>> This patch provides an alternative implementation of the recirc action
>> without using recursion.
>>
>> A recirc action can be embedded in the middle of an actions list.
>> Instead of immediately (and recursively) processing the recirc action,
>> the remainder of the actions list is pushed onto a per-cpu fixed-size stack,
>> and the skb that needs recirc is then returned.
>>
>> ovs_execute_actions now only finishes when both the per-cpu stack is
>> empty and the actions list does not have a recirc action. The recirc skb
>> is always processed before checking the stack, to preserve the correct
>> execution order.
>>
>> The depth of the private stack limits the depth of recirc actions.
>>
>> Signed-off-by: Andy Zhou <azhou at nicira.com>
>> ---
>> datapath/actions.c | 175 ++++++++++++++++++++++++++++++++++++----------------
>> datapath/actions.h | 7 ++-
>> datapath/datapath.c | 4 +-
>> 3 files changed, 130 insertions(+), 56 deletions(-)
>>
>> diff --git a/datapath/actions.c b/datapath/actions.c
>> index 9dd9063..da3ac89 100644
>> --- a/datapath/actions.c
>> +++ b/datapath/actions.c
>> @@ -143,8 +143,8 @@ static bool is_skb_flow_key_valid(struct sk_buff *skb)
>> return !!OVS_CB(skb)->pkt_key->eth.type;
>> }
>>
>> -static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>> - const struct nlattr *attr, int len);
>> +static struct sk_buff *do_execute_actions(struct datapath *dp,
>> + struct sk_buff *skb, const struct nlattr *attr, int len);
>>
>> static int make_writable(struct sk_buff *skb, int write_len)
>> {
>> @@ -154,6 +154,19 @@ static int make_writable(struct sk_buff *skb, int write_len)
>> return pskb_expand_head(skb, 0, 0, GFP_ATOMIC);
>> }
>>
>> +static inline struct ovs_deferred_action *
>> +push_next_action(struct sk_buff *skb, const struct nlattr *attr, int len)
>> +{
>> + struct ovs_deferred_action *da;
>> + struct ovs_action_stack *stack;
>> + struct nlattr *next_action = nla_next(attr, &len);
>> +
>> + stack = &__get_cpu_var(ovs_action_stacks);
>> + da = ovs_action_stack_push(stack, skb, next_action, len);
>> +
>> + return da;
>> +}
>> +
>> /* The end of the mac header.
>> *
>> * For non-MPLS skbs this will correspond to the network header.
>> @@ -669,10 +682,8 @@ static bool last_action(const struct nlattr *a, int rem)
>> static int sample(struct datapath *dp, struct sk_buff *skb,
>> const struct nlattr *attr)
>> {
>> - struct sw_flow_key sample_key;
>> const struct nlattr *acts_list = NULL;
>> const struct nlattr *a;
>> - struct sk_buff *sample_skb;
>> int rem;
>>
>> for (a = nla_data(attr), rem = nla_len(attr); rem > 0;
>> @@ -701,24 +712,28 @@ static int sample(struct datapath *dp, struct sk_buff *skb,
>> if (likely(rem == 0 ||
>> (nla_type(a) == OVS_ACTION_ATTR_USERSPACE &&
>> last_action(a, rem)))) {
>> - sample_skb = skb;
>> - skb_get(skb);
>> + output_userspace(dp, skb, a);
>> } else {
>> - sample_skb = skb_clone(skb, GFP_ATOMIC);
>> - if (!sample_skb)
>> - /* Skip the sample action when out of memory. */
>> + struct ovs_deferred_action *da;
>> +
>> + skb = skb_clone(skb, GFP_ATOMIC);
>> + if (!skb)
>> + /* Out of memory, skip the sample action. */
>> + return 0;
>> +
>> + da = push_next_action(skb, a, rem);
>> + if (!da) {
>> + if (net_ratelimit())
>> + pr_warn("%s: stack limit reached, drop sample action\n",
>> + ovs_dp_name(dp));
>> + kfree_skb(skb);
>> return 0;
>> + }
>>
>> - flow_key_clone(skb, &sample_key);
>> + flow_key_clone(skb, &da->pkt_key);
>> }
>>
>> - /* Note that do_execute_actions() never consumes skb.
>> - * In the case where skb has been cloned above it is the clone that
>> - * is consumed. Otherwise the skb_get(skb) call prevents
>> - * consumption by do_execute_actions(). Thus, it is safe to simply
>> - * return the error code and let the caller (also
>> - * do_execute_actions()) free skb on error. */
>> - return do_execute_actions(dp, sample_skb, a, rem);
>> + return 0;
>> }
>>
>> static void execute_hash(struct sk_buff *skb, const struct nlattr *attr)
>> @@ -789,43 +804,57 @@ static int execute_set_action(struct sk_buff *skb,
>> }
>>
>>
>> -static int execute_recirc(struct datapath *dp, struct sk_buff *skb,
>> - const struct nlattr *a, int rem)
>> +static struct sk_buff *execute_recirc(struct datapath *dp,
>> + struct sk_buff *skb,
>> + const struct nlattr *a, int rem)
>> {
>> - struct sw_flow_key recirc_key;
>> int err;
>>
>> - if (!last_action(a, rem)) {
>> - /* Recirc action is the not the last action
>> - * of the action list. */
>> - skb = skb_clone(skb, GFP_ATOMIC);
>> -
>> - /* Skip the recirc action when out of memory, but
>> - * continue on with the rest of the action list. */
>> - if (!skb)
>> - return 0;
>> - }
>> -
>> if (!is_skb_flow_key_valid(skb)) {
>> err = ovs_flow_key_update(skb, OVS_CB(skb)->pkt_key);
>> if (err) {
>> kfree_skb(skb);
>> - return err;
>> + return NULL;
>> }
>> }
>> BUG_ON(!is_skb_flow_key_valid(skb));
>>
>> - if (!last_action(a, rem))
>> - flow_key_clone(skb, &recirc_key);
>> + if (!last_action(a, rem)) {
>> + struct ovs_deferred_action *da;
>> + /* Recirc action is the not the last action,
>> + * Push the remaining actions on to the stack,
>> + * clone skb and clone pkt key.
>> + *
>> + * Skip the recirc action if the stack is full, or
>> + * we are out of memory to clone the skb.
>> + */
>> +
>> + da = push_next_action(skb, a, rem);
>> + if (!da) {
>> + /* Stack is full, drop the skb. */
>> + if (net_ratelimit())
>> + pr_warn("%s: stack limit reached, drop recirc action\n",
>> + ovs_dp_name(dp));
>> + consume_skb(skb);
>> + return NULL;
>> + }
>> +
>> + skb = skb_clone(skb, GFP_ATOMIC);
>> + if (skb)
>> + /* Out of memory, do not recurc*/
>> + return NULL;
>> +
>> + flow_key_clone(skb, &da->pkt_key);
>> + }
>>
>> flow_key_set_recirc_id(skb, nla_get_u32(a));
>> - ovs_dp_process_packet(skb);
>> - return 0;
>> + return skb; /* Recirc skb */
>> }
>>
>> /* Execute a list of actions against 'skb'. */
>> -static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>> - const struct nlattr *attr, int len)
>> +static struct sk_buff *do_execute_actions(struct datapath *dp,
>> + struct sk_buff *skb,
>> + const struct nlattr *attr, int len)
>> {
>> /* Every output action needs a separate clone of 'skb', but the common
>> * case is just a single output action, so that doing a clone and
>> @@ -872,7 +901,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>> case OVS_ACTION_ATTR_PUSH_VLAN:
>> err = push_vlan(skb, nla_data(a));
>> if (unlikely(err)) /* skb already freed. */
>> - return err;
>> + return NULL;
>> break;
>>
>> case OVS_ACTION_ATTR_POP_VLAN:
>> @@ -880,8 +909,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>> break;
>>
>> case OVS_ACTION_ATTR_RECIRC:
>> - err = execute_recirc(dp, skb, a, rem);
>> - break;
>> + return execute_recirc(dp, skb, a, rem);
>>
>> case OVS_ACTION_ATTR_SET:
>> err = execute_set_action(skb, nla_data(a));
>> @@ -894,7 +922,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>>
>> if (unlikely(err)) {
>> kfree_skb(skb);
>> - return err;
>> + return NULL;
>> }
>> }
>>
>> @@ -903,7 +931,47 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
>> else
>> consume_skb(skb);
>>
>> - return 0;
>> + return NULL;
>> +}
>> +
>> +static struct sk_buff *recirc_packet_process(struct datapath *dp,
>> + struct sk_buff *skb)
>> +{
>> + if (ovs_dp_packet_flow_lookup(dp, skb)) {
>> + struct sw_flow_actions *acts;
>> +
>> + acts = rcu_dereference(OVS_CB(skb)->flow->sf_acts);
>> +
>> + return do_execute_actions(dp, skb, acts->actions,
>> + acts->actions_len);
>> + }
>> +
>> + return NULL;
>> +}
>> +
>> +static struct sk_buff *execute_deferred_actions(struct datapath *dp,
>> + struct ovs_deferred_action *da)
>> +{
>> + return do_execute_actions(dp, da->skb, da->actions, da->rem);
>> +}
>> +
>> +static void ovs_recirc_packet(struct datapath *dp, struct sk_buff *rskb)
>> +{
>> + struct ovs_action_stack *stack;
>> +
>> + stack = &__get_cpu_var(ovs_action_stacks);
>> +
>> + /* Finishing executing all deferred actions. */
>> + while (rskb || !ovs_action_stack_is_empty(stack)) {
>> + if (rskb)
>> + rskb = recirc_packet_process(dp, rskb);
>> + else {
>> + struct ovs_deferred_action *da;
>> +
>> + da = ovs_action_stack_pop(stack);
>> + rskb = execute_deferred_actions(dp, da);
>> + }
>> + }
>> }
>>
> I have a couple of comments.
> 1. We can handle sample and recirc in the same way. So rather than calling
> do_execute_actions() recursively, it can just push the cloned skb onto the
> stack. We can check this stack at the end of do_execute_actions(). The sample
> action can set the action pointer; recirc does not. So on pop, if there is
> no action pointer, we can look up the flow.
O.K. I will make the change. It may be cleaner and more efficient for
the recirc packet to look up the flow immediately, and push the packet with
the looked-up actions onto the stack.
> 2. This can execute a sample action, so we cannot call this ovs_recirc_packet().
>
How about ovs_process_deferred_packet()?
>> /* We limit the number of times that we pass into execute_actions()
>> @@ -917,40 +985,41 @@ struct loop_counter {
>>
>> static DEFINE_PER_CPU(struct loop_counter, loop_counters);
>>
>> -static int loop_suppress(struct datapath *dp, struct sw_flow_actions *actions)
>> +static void loop_suppress(struct datapath *dp)
>> {
>> if (net_ratelimit())
>> pr_warn("%s: flow looped %d times, dropping\n",
>> ovs_dp_name(dp), MAX_LOOPS);
>> - actions->actions_len = 0;
>> - return -ELOOP;
>> }
>>
>> /* Execute a list of actions against 'skb'. */
>> -int ovs_execute_actions(struct datapath *dp, struct sk_buff *skb)
>> +void ovs_execute_actions(struct datapath *dp, struct sk_buff *skb)
>> {
>> - struct sw_flow_actions *acts = rcu_dereference(OVS_CB(skb)->flow->sf_acts);
>> + struct sw_flow_actions *acts;
>> + struct sk_buff *rskb;
>> struct loop_counter *loop;
>> - int error;
>>
>> /* Check whether we've looped too much. */
>> loop = &__get_cpu_var(loop_counters);
>> if (unlikely(++loop->count > MAX_LOOPS))
>> loop->looping = true;
>> if (unlikely(loop->looping)) {
>> - error = loop_suppress(dp, acts);
>> + loop_suppress(dp);
>> kfree_skb(skb);
>> goto out_loop;
>> }
>>
>> - error = do_execute_actions(dp, skb, acts->actions, acts->actions_len);
>> + acts = rcu_dereference(OVS_CB(skb)->flow->sf_acts);
>> + rskb = do_execute_actions(dp, skb, acts->actions,
>> + acts->actions_len);
>> +
>> + if (rskb)
>> + ovs_recirc_packet(dp, rskb);
>>
>> out_loop:
>> /* Decrement loop counter. */
>> if (!--loop->count)
>> loop->looping = false;
>> -
>> - return error;
>> }
>>
>> void ovs_action_stacks_init(void)
>> diff --git a/datapath/actions.h b/datapath/actions.h
>> index e4601a6..de8a0dc 100644
>> --- a/datapath/actions.h
>> +++ b/datapath/actions.h
>> @@ -48,7 +48,7 @@ struct ovs_action_stack {
>>
>> void ovs_action_stacks_init(void);
>> void ovs_action_stacks_exit(void);
>> -int ovs_execute_actions(struct datapath *dp, struct sk_buff *skb);
>> +void ovs_execute_actions(struct datapath *dp, struct sk_buff *skb);
>> void ovs_process_action_stack(void);
>>
>> static inline void ovs_action_stack_init(struct ovs_action_stack *stack)
>> @@ -56,6 +56,11 @@ static inline void ovs_action_stack_init(struct ovs_action_stack *stack)
>> stack->top = 0;
>> }
>>
>> +static inline bool ovs_action_stack_is_empty(struct ovs_action_stack *stack)
>> +{
>> + return (stack->top == 0);
>> +}
>> +
>> static inline struct ovs_deferred_action *
>> ovs_action_stack_pop(struct ovs_action_stack *stack)
>> {
>> diff --git a/datapath/datapath.c b/datapath/datapath.c
>> index bcfdd62..981862f 100644
>> --- a/datapath/datapath.c
>> +++ b/datapath/datapath.c
>> @@ -580,12 +580,12 @@ static int ovs_packet_cmd_execute(struct sk_buff *skb, struct genl_info *info)
>> OVS_CB(packet)->input_vport = input_vport;
>>
>> local_bh_disable();
>> - err = ovs_execute_actions(dp, packet);
>> + ovs_execute_actions(dp, packet);
>> local_bh_enable();
>> rcu_read_unlock();
>>
>> ovs_flow_free(flow, false);
>> - return err;
>> + return 0;
>>
>> err_unlock:
>> rcu_read_unlock();
>> --
>> 1.9.1
>>
>> _______________________________________________
>> dev mailing list
>> dev at openvswitch.org
>> http://openvswitch.org/mailman/listinfo/dev
More information about the dev
mailing list