[ovs-dev] [v2 4/4] datapath: Implement recirc action without recursion

Andy Zhou azhou at nicira.com
Tue Aug 19 06:41:43 UTC 2014


Since kernel stack is limited in size, it is not wise to using
recursive function with large stack frames.

This patch provides an alternative implementation of recirc action with
out using recursion.

A recirc action can be embedded in the middle of an actions list.
Instead of immediately (and recursively) process the recirc action,
the remaining the actions list is pushed on a per-cpu fixed size stack,
the skb needs recirc is then returned.

ovs_execute_actions now only finishes whenever both per-cpu stack is
empty, and the actions list does not have recirc action. Recirc skb
is always process before check the stack to preserve the correct
execution order.a

The depth of the private stack limit the depth of recirc action.

Signed-off-by: Andy Zhou <azhou at nicira.com>
---
 datapath/actions.c  | 172 +++++++++++++++++++++++++++++++++++++---------------
 datapath/actions.h  |   4 +-
 datapath/datapath.c |   4 +-
 3 files changed, 126 insertions(+), 54 deletions(-)

diff --git a/datapath/actions.c b/datapath/actions.c
index 6e40e66..54b7abc 100644
--- a/datapath/actions.c
+++ b/datapath/actions.c
@@ -73,6 +73,24 @@ action_stack_push(struct ovs_action_stack *stack)
 	return &stack->stack[stack->top++];
 }
 
+static inline struct ovs_deferred_action *
+push_actions(struct sk_buff *skb, const struct nlattr *attr, int len)
+{
+	struct ovs_action_stack *stack;
+	struct ovs_deferred_action *da;
+
+	stack = &__get_cpu_var(action_stacks);
+	da = action_stack_push(stack);
+
+	if (da) {
+		da->skb = skb;
+		da->actions = attr;
+		da->rem = len;
+	}
+
+	return da;
+}
+
 static void flow_key_clone(struct sk_buff *skb, struct sw_flow_key *new_key)
 {
 	*new_key = *OVS_CB(skb)->pkt_key;
@@ -173,8 +191,8 @@ static bool is_skb_flow_key_valid(struct sk_buff *skb)
 	return !!OVS_CB(skb)->pkt_key->eth.type;
 }
 
-static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
-			      const struct nlattr *attr, int len);
+static bool do_execute_actions(struct datapath *dp, struct sk_buff *skb,
+			       const struct nlattr *attr, int len);
 
 static int make_writable(struct sk_buff *skb, int write_len)
 {
@@ -716,12 +734,10 @@ static bool last_action(const struct nlattr *a, int rem)
 }
 
 static int sample(struct datapath *dp, struct sk_buff *skb,
-		  const struct nlattr *attr)
+		  const struct nlattr *attr, bool *deferred)
 {
-	struct sw_flow_key sample_key;
 	const struct nlattr *acts_list = NULL;
 	const struct nlattr *a;
-	struct sk_buff *sample_skb;
 	int rem;
 
 	for (a = nla_data(attr), rem = nla_len(attr); rem > 0;
@@ -750,24 +766,30 @@ static int sample(struct datapath *dp, struct sk_buff *skb,
 	if (likely(rem == 0 ||
 		   (nla_type(a) == OVS_ACTION_ATTR_USERSPACE &&
 		    last_action(a, rem)))) {
-		sample_skb = skb;
-		skb_get(skb);
+		if (rem)
+			return output_userspace(dp, skb, a);
 	} else {
-		sample_skb = skb_clone(skb, GFP_ATOMIC);
-		if (!sample_skb)
-			/* Skip the sample action when out of memory. */
-			return 0;
+		struct ovs_deferred_action *da;
+
+		skb = skb_clone(skb, GFP_ATOMIC);
+		if (!skb)
+			/* Out of memory, skip the sample action. */
+			return -ENOMEM;
+
+		da = push_actions(skb, a, rem);
+		if (!da) {
+			if (net_ratelimit())
+				pr_warn("%s: stack limit reached, drop sample action\n",
+					ovs_dp_name(dp));
+			consume_skb(skb);
+			return -ELOOP;
+		}
 
-		flow_key_clone(skb, &sample_key);
+		flow_key_clone(skb, &da->pkt_key);
+		*deferred = true;
 	}
 
-	/* Note that do_execute_actions() never consumes skb.
-	 * In the case where skb has been cloned above it is the clone that
-	 * is consumed.  Otherwise the skb_get(skb) call prevents
-	 * consumption by do_execute_actions(). Thus, it is safe to simply
-	 * return the error code and let the caller (also
-	 * do_execute_actions()) free skb on error. */
-	return do_execute_actions(dp, sample_skb, a, rem);
+	return 0;
 }
 
 static void execute_hash(struct sk_buff *skb, const struct nlattr *attr)
@@ -839,48 +861,66 @@ static int execute_set_action(struct sk_buff *skb,
 
 
 static int execute_recirc(struct datapath *dp, struct sk_buff *skb,
-			  const struct nlattr *a, int rem)
+			  const struct nlattr *a, int rem, bool *deferred)
 {
-	struct sw_flow_key recirc_key;
+	struct ovs_action_stack *stack;
+	struct ovs_deferred_action *da;
 	int err;
 
-	if (!last_action(a, rem)) {
-		/* Recirc action is the not the last action
-		 * of the action list. */
-		skb = skb_clone(skb, GFP_ATOMIC);
-
-		/* Skip the recirc action when out of memory, but
-		 * continue on with the rest of the action list. */
-		if (!skb)
-			return 0;
-	}
-
 	if (!is_skb_flow_key_valid(skb)) {
 		err = ovs_flow_key_update(skb, OVS_CB(skb)->pkt_key);
-		if (err) {
-			kfree_skb(skb);
+		if (err)
 			return err;
-		}
-	}
+	};
 	BUG_ON(!is_skb_flow_key_valid(skb));
 
-	if (!last_action(a, rem))
-		flow_key_clone(skb, &recirc_key);
+	stack = &__get_cpu_var(action_stacks);
+	da = action_stack_push(stack);
+	if (!da) {
+		/* Stack is full, drop the skb. */
+		if (net_ratelimit())
+			pr_warn("%s: stack limit reached, drop recirc action\n",
+				ovs_dp_name(dp));
+		return -ELOOP;
+	}
+
+	if (!last_action(a, rem)) {
+		/* Recirc action is the not the last action,
+		 * Push the remaining actions on to the stack,
+		 * clone skb and clone pkt key.
+		 *
+		 * Skip the recirc action if the stack is full, or
+		 * we are out of memory to clone the skb.
+		 */
+		skb = skb_clone(skb, GFP_ATOMIC);
+		if (!skb) {
+			/* Out of memory, do not recirc*/
+			action_stack_pop(stack);
+			return -ENOMEM;
+		}
+
+		flow_key_clone(skb, &da->pkt_key);
+	} else {
+		skb_get(skb);
+	}
 
 	flow_key_set_recirc_id(skb, nla_get_u32(a));
-	ovs_dp_process_packet(skb);
+	push_actions(skb, NULL, 0);
+	*deferred = true;
+
 	return 0;
 }
 
 /* Execute a list of actions against 'skb'. */
-static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
-			const struct nlattr *attr, int len)
+static bool do_execute_actions(struct datapath *dp, struct sk_buff *skb,
+			       const struct nlattr *attr, int len)
 {
 	/* Every output action needs a separate clone of 'skb', but the common
 	 * case is just a single output action, so that doing a clone and
 	 * then freeing the original skbuff is wasteful.  So the following code
 	 * is slightly obscure just to avoid that. */
 	int prev_port = -1;
+	bool deferred = false;
 	const struct nlattr *a;
 	int rem;
 
@@ -921,7 +961,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
 		case OVS_ACTION_ATTR_PUSH_VLAN:
 			err = push_vlan(skb, nla_data(a));
 			if (unlikely(err)) /* skb already freed. */
-				return err;
+				return NULL;
 			break;
 
 		case OVS_ACTION_ATTR_POP_VLAN:
@@ -929,7 +969,7 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
 			break;
 
 		case OVS_ACTION_ATTR_RECIRC:
-			err = execute_recirc(dp, skb, a, rem);
+			err = execute_recirc(dp, skb, a, rem, &deferred);
 			break;
 
 		case OVS_ACTION_ATTR_SET:
@@ -937,13 +977,13 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
 			break;
 
 		case OVS_ACTION_ATTR_SAMPLE:
-			err = sample(dp, skb, a);
+			err = sample(dp, skb, a, &deferred);
 			break;
 		}
 
 		if (unlikely(err)) {
-			kfree_skb(skb);
-			return err;
+			consume_skb(skb);
+			return NULL;
 		}
 	}
 
@@ -952,16 +992,48 @@ static int do_execute_actions(struct datapath *dp, struct sk_buff *skb,
 	else
 		consume_skb(skb);
 
-	return 0;
+	return deferred;
+}
+
+static void ovs_process_deferred_packets(struct datapath *dp)
+{
+	struct ovs_action_stack *stack;
+
+	stack = &__get_cpu_var(action_stacks);
+
+	/* Finishing executing all deferred actions. */
+	while (!action_stack_is_empty(stack)) {
+		struct ovs_deferred_action *da = action_stack_pop(stack);
+		struct sk_buff *skb = da->skb;
+		const struct nlattr *actions = da->actions;
+		int rem = da->rem;
+
+		if (!actions && ovs_dp_packet_flow_lookup(dp, skb)) {
+			struct sw_flow_actions *acts;
+
+			acts = rcu_dereference(OVS_CB(skb)->flow->sf_acts);
+
+			actions = acts->actions;
+			rem = acts->actions_len;
+		}
+
+		if (actions)
+			do_execute_actions(dp, skb, actions, rem);
+	}
 }
 
 /* Execute a list of actions against 'skb'. */
-int ovs_execute_actions(struct datapath *dp, struct sk_buff *skb)
+void ovs_execute_actions(struct datapath *dp, struct sk_buff *skb)
 {
-	struct sw_flow_actions *acts = rcu_dereference(OVS_CB(skb)->flow->sf_acts);
+	struct sw_flow_actions *acts;
+	bool deferred;
 
-	return do_execute_actions(dp, skb, acts->actions, acts->actions_len);
+	acts = rcu_dereference(OVS_CB(skb)->flow->sf_acts);
+	deferred = do_execute_actions(dp, skb, acts->actions,
+				      acts->actions_len);
 
+	if (deferred)
+		ovs_process_deferred_packets(dp);
 }
 
 void ovs_action_stacks_init(void)
diff --git a/datapath/actions.h b/datapath/actions.h
index 19b3091..ca53374 100644
--- a/datapath/actions.h
+++ b/datapath/actions.h
@@ -33,7 +33,7 @@
 
 struct ovs_deferred_action {
 	struct sk_buff *skb;
-	struct nlattr *actions;
+	const struct nlattr *actions;
 	int rem;
 
 	/* Store pkt_key clone during recirc */
@@ -48,7 +48,7 @@ struct ovs_action_stack {
 
 void ovs_action_stacks_init(void);
 void ovs_action_stacks_exit(void);
-int ovs_execute_actions(struct datapath *dp, struct sk_buff *skb);
+void ovs_execute_actions(struct datapath *dp, struct sk_buff *skb);
 void ovs_process_action_stack(void);
 
 #endif /* actions.h */
diff --git a/datapath/datapath.c b/datapath/datapath.c
index f3c23ee..b623304 100644
--- a/datapath/datapath.c
+++ b/datapath/datapath.c
@@ -593,12 +593,12 @@ static int ovs_packet_cmd_execute(struct sk_buff *skb, struct genl_info *info)
 	OVS_CB(packet)->input_vport = input_vport;
 
 	local_bh_disable();
-	err = ovs_execute_actions(dp, packet);
+	ovs_execute_actions(dp, packet);
 	local_bh_enable();
 	rcu_read_unlock();
 
 	ovs_flow_free(flow, false);
-	return err;
+	return 0;
 
 err_unlock:
 	rcu_read_unlock();
-- 
1.9.1




More information about the dev mailing list