[ovs-dev] [PATCH RFC net-next] openvswitch: Queue upcalls to userspace in per-port round-robin order

Matteo Croce mcroce at redhat.com
Wed Jul 4 14:23:42 UTC 2018


From: Stefano Brivio <sbrivio at redhat.com>

Open vSwitch sends to userspace all received packets that have
no matching flow (thus performing an "upcall"). The userspace
program then creates a new flow and determines the actions to
apply based on its configuration.

When a single port generates a high rate of upcalls, it can
prevent other ports from dispatching their own upcalls. vswitchd
works around this problem by creating several netlink sockets for
each port, but this quickly exceeds any reasonable limit on open
files when dealing with a large number of ports.

This patch queues all upcalls on a list, ordered in a per-port
round-robin fashion, and schedules deferred work to send them to
userspace.

The algorithm to queue upcalls in a round-robin fashion, provided
by Stefano, is based on these two rules (a standalone simulation
of the policy is sketched after the list):
 - upcalls for a given port must be inserted after all the other
   occurrences of upcalls for the same port already in the queue,
   in order to avoid out-of-order upcalls for a given port
 - insertion happens as soon as the occurrence count of some other
   port in the queue reaches the count for the port we're queuing
   for -- if this condition is never met, the upcall is queued at
   the tail. This results in a per-port round-robin order.
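
For illustration, the following standalone userspace sketch (not
part of the patch; all names and sizes in it are made up) mirrors
this insertion policy on a plain array of port numbers:

  /* Illustrative, standalone userspace sketch -- NOT part of the patch.
   * It mirrors the insertion policy of ovs_dp_upcall_queue_roundrobin()
   * on a plain array of port numbers; names and sizes are arbitrary.
   */
  #include <stdio.h>

  #define QUEUE_MAX 64
  #define PORT_MAX  16

  static int queue[QUEUE_MAX];
  static int queue_len;

  static void enqueue_rr(int port)
  {
          int count[PORT_MAX] = { 0 };    /* per-port upcalls seen so far */
          int here = -1;                  /* insert after this index */
          int find_next = 1;
          int i;

          for (i = 0; i < queue_len; i++) {
                  count[queue[i]]++;

                  if (queue[i] == port) {
                          /* Same port: never insert before this entry. */
                          find_next = 1;
                          here = i;
                          continue;
                  }

                  if (find_next && count[queue[i]] >= count[port]) {
                          /* Another port caught up with ours: insertion
                           * point found, unless our port shows up again.
                           */
                          find_next = 0;
                          here = i;
                  }
          }

          for (i = queue_len; i > here + 1; i--)  /* shift the tail right */
                  queue[i] = queue[i - 1];
          queue[here + 1] = port;
          queue_len++;
  }

  int main(void)
  {
          int arrivals[] = { 0, 0, 0, 1, 2, 1, 0 };
          int i;

          for (i = 0; i < (int)(sizeof(arrivals) / sizeof(arrivals[0])); i++)
                  enqueue_rr(arrivals[i]);

          for (i = 0; i < queue_len; i++)
                  printf("%d ", queue[i]);
          printf("\n");   /* prints: 0 2 1 0 1 0 0 */
          return 0;
  }

With upcalls arriving for ports 0 0 0 1 2 1 0, the resulting queue
is 0 2 1 0 1 0 0: the head of the queue cycles through all the
backlogged ports before any of them gets a second slot.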

In order to implement a fair round-robin behaviour, a variable
queueing delay is introduced. The delay is zero if the upcall rate
is below a given threshold (i.e. the queue was last drained more
than a timeout ago); otherwise it starts at a maximum value and
shrinks linearly as the queue fills up, so that a fuller queue is
drained sooner.
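
For example, with the values used below (UPCALL_QUEUE_MAX_DELAY
and UPCALL_QUEUE_TIMEOUT of 10ms, UPCALL_QUEUE_MAX_LEN of 200) and
assuming HZ=1000, so that one jiffy is one millisecond: an almost
empty queue is drained after the full 10ms, 100 queued upcalls
wait 10 - 10 * 100 / 200 = 5ms, and a queue at the 200-entry limit
is drained immediately.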

This ensures fairness among ports under load and with few
netlink sockets.

Signed-off-by: Matteo Croce <mcroce at redhat.com>
Co-developed-by: Stefano Brivio <sbrivio at redhat.com>
---
 net/openvswitch/datapath.c | 156 ++++++++++++++++++++++++++++++++++++----
 net/openvswitch/datapath.h |  27 ++++++-
 2 files changed, 172 insertions(+), 11 deletions(-)

diff --git a/net/openvswitch/datapath.c b/net/openvswitch/datapath.c
index 0f5ce77460d4..2cfd504562d8 100644
--- a/net/openvswitch/datapath.c
+++ b/net/openvswitch/datapath.c
@@ -59,6 +59,10 @@
 #include "vport-internal_dev.h"
 #include "vport-netdev.h"
 
+#define UPCALL_QUEUE_TIMEOUT	msecs_to_jiffies(10)
+#define UPCALL_QUEUE_MAX_DELAY	msecs_to_jiffies(10)
+#define UPCALL_QUEUE_MAX_LEN	200
+
 unsigned int ovs_net_id __read_mostly;
 
 static struct genl_family dp_packet_genl_family;
@@ -225,6 +229,124 @@ void ovs_dp_detach_port(struct vport *p)
 	ovs_vport_del(p);
 }
 
+static void ovs_dp_upcall_dequeue(struct work_struct *work)
+{
+	struct datapath *dp = container_of(work, struct datapath,
+					   upcalls.work.work);
+	struct dp_upcall_info *u, *n;
+	LIST_HEAD(todo);
+
+	/* Splice the queued upcalls out under the lock, then send them
+	 * to userspace without blocking new insertions.
+	 */
+	spin_lock_bh(&dp->upcalls.lock);
+	list_splice_init(&dp->upcalls.list, &todo);
+	dp->upcalls.len = 0;
+	dp->upcalls.last_run = jiffies;
+	spin_unlock_bh(&dp->upcalls.lock);
+
+	rcu_read_lock();
+	list_for_each_entry_safe(u, n, &todo, list) {
+		if (unlikely(ovs_dp_upcall(dp, u->skb, &u->key, u, 0)))
+			kfree_skb(u->skb);
+		else
+			consume_skb(u->skb);
+		kfree(u);
+	}
+	rcu_read_unlock();
+}
+
+/* Calculate the delay for the deferred work which sends the upcalls. If the
+ * work last ran more than UPCALL_QUEUE_TIMEOUT ago, schedule it immediately.
+ * Otherwise return a delay between 0 and UPCALL_QUEUE_MAX_DELAY which shrinks
+ * linearly as the queue fills up, so that a fuller queue is drained sooner.
+ */
+static unsigned long ovs_dp_upcall_delay(int queue_len, unsigned long last_run)
+{
+	if (jiffies - last_run >= UPCALL_QUEUE_TIMEOUT)
+		return 0;
+
+	return UPCALL_QUEUE_MAX_DELAY -
+	       UPCALL_QUEUE_MAX_DELAY * queue_len / UPCALL_QUEUE_MAX_LEN;
+}
+
+static int ovs_dp_upcall_queue_roundrobin(struct datapath *dp,
+					  struct dp_upcall_info *upcall)
+{
+	struct list_head *head = &dp->upcalls.list;
+	struct dp_upcall_info *here = NULL, *pos;
+	bool find_next = true;
+	unsigned long delay;
+	int err = 0;
+	u8 count;
+
+	spin_lock_bh(&dp->upcalls.lock);
+	if (dp->upcalls.len >= UPCALL_QUEUE_MAX_LEN) {
+		err = -ENOSPC;
+		goto out;
+	}
+
+	/* Insert upcalls in the list in a per-port round-robin fashion, look
+	 * for insertion point:
+	 * - to avoid out-of-order per-port upcalls, we can insert only after
+	 *   the last occurrence of upcalls for the same port
+	 * - insert upcall only after we reach a count of occurrences for a
+	 *   given port greater than the one we're inserting this upcall for
+	 */
+	list_for_each_entry(pos, head, list) {
+		/* Count per-port upcalls. */
+		if (dp->upcalls.count[pos->port_no] == U8_MAX - 1) {
+			err = -ENOSPC;
+			goto out_clear;
+		}
+		dp->upcalls.count[pos->port_no]++;
+
+		if (pos->port_no == upcall->port_no) {
+			/* Another upcall for the same port: move insertion
+			 * point here, keep looking for insertion condition to
+			 * be still met further on.
+			 */
+			find_next = true;
+			here = pos;
+			continue;
+		}
+
+		count = dp->upcalls.count[upcall->port_no];
+		if (find_next && dp->upcalls.count[pos->port_no] >= count) {
+			/* Insertion condition met: no need to look further,
+			 * unless another upcall for the same port occurs later.
+			 */
+			find_next = false;
+			here = pos;
+		}
+	}
+
+	if (here)
+		list_add(&upcall->list, &here->list);
+	else
+		list_add_tail(&upcall->list, head);
+
+	dp->upcalls.len++;
+
+out_clear:
+	/* Clear the per-port counters we used, so that we don't need to zero
+	 * out the counters array on every insertion.
+	 */
+	list_for_each_entry_reverse(pos, head, list)
+		dp->upcalls.count[pos->port_no] = 0;
+
+out:
+	spin_unlock_bh(&dp->upcalls.lock);
+
+	if (!err) {
+		delay = ovs_dp_upcall_delay(dp->upcalls.len,
+					    dp->upcalls.last_run);
+		mod_delayed_work(system_wq, &dp->upcalls.work, delay);
+	}
+
+	return err;
+}
+
 /* Must be called with rcu_read_lock. */
 void ovs_dp_process_packet(struct sk_buff *skb, struct sw_flow_key *key)
 {
@@ -241,18 +363,28 @@ void ovs_dp_process_packet(struct sk_buff *skb, struct sw_flow_key *key)
 	/* Look up flow. */
 	flow = ovs_flow_tbl_lookup_stats(&dp->table, key, &n_mask_hit);
 	if (unlikely(!flow)) {
-		struct dp_upcall_info upcall;
+		struct dp_upcall_info *upcall;
 		int error;
 
-		memset(&upcall, 0, sizeof(upcall));
-		upcall.cmd = OVS_PACKET_CMD_MISS;
-		upcall.portid = ovs_vport_find_upcall_portid(p, skb);
-		upcall.mru = OVS_CB(skb)->mru;
-		error = ovs_dp_upcall(dp, skb, key, &upcall, 0);
+		upcall = kzalloc(sizeof(*upcall), GFP_ATOMIC);
+		if (!upcall) {
+			kfree_skb(skb);
+			stats_counter = &stats->n_missed;
+			goto out;
+		}
+
+		upcall->cmd = OVS_PACKET_CMD_MISS;
+		upcall->portid = ovs_vport_find_upcall_portid(p, skb);
+		upcall->port_no = p->port_no;
+		upcall->mru = OVS_CB(skb)->mru;
+		upcall->skb = skb;
+		upcall->key = *key;
+		error = ovs_dp_upcall_queue_roundrobin(dp, upcall);
-		if (unlikely(error))
-			kfree_skb(skb);
-		else
-			consume_skb(skb);
+		if (unlikely(error)) {
+			kfree_skb(skb);
+			/* The upcall was never queued, don't leak it. */
+			kfree(upcall);
+		}
 		stats_counter = &stats->n_missed;
 		goto out;
 	}
@@ -1589,6 +1721,10 @@ static int ovs_dp_cmd_new(struct sk_buff *skb, struct genl_info *info)
 	for (i = 0; i < DP_VPORT_HASH_BUCKETS; i++)
 		INIT_HLIST_HEAD(&dp->ports[i]);
 
+	INIT_LIST_HEAD(&dp->upcalls.list);
+	spin_lock_init(&dp->upcalls.lock);
+	INIT_DELAYED_WORK(&dp->upcalls.work, ovs_dp_upcall_dequeue);
+
 	err = ovs_meters_init(dp);
 	if (err)
 		goto err_destroy_ports_array;
@@ -1658,6 +1794,8 @@ static void __dp_destroy(struct datapath *dp)
 {
 	int i;
 
+	cancel_delayed_work_sync(&dp->upcalls.work);
+
 	for (i = 0; i < DP_VPORT_HASH_BUCKETS; i++) {
 		struct vport *vport;
 		struct hlist_node *n;
diff --git a/net/openvswitch/datapath.h b/net/openvswitch/datapath.h
index c9eb267c6f7e..f8b8bb679929 100644
--- a/net/openvswitch/datapath.h
+++ b/net/openvswitch/datapath.h
@@ -24,6 +24,7 @@
 #include <linux/mutex.h>
 #include <linux/netdevice.h>
 #include <linux/skbuff.h>
+#include <linux/workqueue.h>
 #include <linux/u64_stats_sync.h>
 #include <net/ip_tunnels.h>
 
@@ -70,6 +71,12 @@ struct dp_stats_percpu {
  * @net: Reference to net namespace.
  * @max_headroom: the maximum headroom of all vports in this datapath; it will
  * be used by all the internal vports in this dp.
+ * @upcalls.work: deferred work sending queued upcalls to userspace.
+ * @upcalls.list: list of queued upcalls.
+ * @upcalls.len: number of elements in @upcalls.list.
+ * @upcalls.lock: lock protecting the upcall queue.
+ * @upcalls.count: per-port counters used to sort upcalls in round-robin order.
+ * @upcalls.last_run: time (in jiffies) when the work last ran.
  *
  * Context: See the comment on locking at the top of datapath.c for additional
  * locking information.
@@ -96,6 +103,16 @@ struct datapath {
 
 	/* Switch meters. */
 	struct hlist_head *meters;
+
+	/* Upcalls queue handling. */
+	struct {
+		struct delayed_work work;
+		struct list_head list;
+		int len;
+		spinlock_t lock;	/* Protects list, len, count, last_run. */
+		u8 count[DP_MAX_PORTS];
+		unsigned long last_run;
+	} upcalls;
 };
 
 /**
@@ -116,7 +133,7 @@ struct ovs_skb_cb {
 #define OVS_CB(skb) ((struct ovs_skb_cb *)(skb)->cb)
 
 /**
- * struct dp_upcall - metadata to include with a packet to send to userspace
+ * struct dp_upcall_info - upcall bound for userspace, with associated metadata
  * @cmd: One of %OVS_PACKET_CMD_*.
  * @userdata: If nonnull, its variable-length value is passed to userspace as
  * %OVS_PACKET_ATTR_USERDATA.
@@ -125,6 +142,10 @@ struct ovs_skb_cb {
  * counter.
  * @egress_tun_info: If nonnull, becomes %OVS_PACKET_ATTR_EGRESS_TUN_KEY.
  * @mru: If not zero, Maximum received IP fragment size.
+ * @list: entry in the per-datapath upcall queue.
+ * @skb: the packet that triggered the upcall.
+ * @key: flow key of the packet.
+ * @port_no: number of the input port within the datapath.
  */
 struct dp_upcall_info {
 	struct ip_tunnel_info *egress_tun_info;
@@ -134,6 +155,10 @@ struct dp_upcall_info {
 	u32 portid;
 	u8 cmd;
 	u16 mru;
+	struct list_head list;
+	struct sk_buff *skb;
+	struct sw_flow_key key;
+	u16 port_no;
 };
 
 /**
-- 
2.17.1


