[ovs-dev] [RFC v2 2/2] ingress scheduling: Provide per interface ingress priority
Billy O'Mahony
billy.o.mahony at intel.com
Wed Mar 28 22:11:58 UTC 2018
Allow the configuration to specify an ingress priority for interfaces.
Modify the ovs-netdev datapath to act on this configuration so that packets
on interfaces with a higher priority will tend to be processed ahead of
packets on lower-priority interfaces. This protects traffic on higher-priority
interfaces from loss and latency as PMDs become overloaded.
Signed-off-by: Billy O'Mahony <billy.o.mahony at intel.com>
---
include/openvswitch/ofp-parse.h | 3 ++
lib/dpif-netdev.c | 103 +++++++++++++++++++++++++++++++++++-----
lib/netdev-bsd.c | 1 +
lib/netdev-dpdk.c | 13 ++++-
lib/netdev-dummy.c | 1 +
lib/netdev-linux.c | 1 +
lib/netdev-provider.h | 11 ++++-
lib/netdev-vport.c | 1 +
lib/netdev.c | 42 ++++++++++++++++
lib/netdev.h | 2 +
vswitchd/bridge.c | 2 +
11 files changed, 165 insertions(+), 15 deletions(-)
diff --git a/include/openvswitch/ofp-parse.h b/include/openvswitch/ofp-parse.h
index 3fdd468..d77ab8f 100644
--- a/include/openvswitch/ofp-parse.h
+++ b/include/openvswitch/ofp-parse.h
@@ -33,6 +33,9 @@ extern "C" {
struct match;
struct mf_field;
struct ofputil_port_map;
+struct tun_table;
+struct flow_wildcards;
struct ofp_protocol {
const char *name;
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index b07fc6b..736d0b6 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -22,6 +22,7 @@
#include <fcntl.h>
#include <inttypes.h>
#include <net/if.h>
+#include <math.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <stdint.h>
@@ -42,6 +43,7 @@
#include "dpif.h"
#include "dpif-netdev-perf.h"
#include "dpif-provider.h"
+#include "netdev-provider.h"
#include "dummy.h"
#include "fat-rwlock.h"
#include "flow.h"
@@ -487,6 +489,7 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *);
struct polled_queue {
struct dp_netdev_rxq *rxq;
odp_port_t port_no;
+ uint8_t priority; /* Max consecutive reads of 'rxq' per PMD poll pass. */
};
/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
@@ -626,6 +629,12 @@ struct dpif_netdev {
uint64_t last_port_seq;
};
+static void
+dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
+ struct dp_netdev_rxq *rxq,
+ odp_port_t port_no,
+ unsigned int *rxd_cnt,
+ unsigned int *txd_cnt); /* NOTE(review): defined before all callers; likely redundant. */
static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no,
struct dp_netdev_port **portp)
OVS_REQUIRES(dp->port_mutex);
@@ -3259,15 +3268,16 @@ dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
return output_cnt;
}
-static int
+static void
dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_rxq *rxq,
- odp_port_t port_no)
+ odp_port_t port_no,
+ unsigned int *rxd_cnt,
+ unsigned int *txd_cnt)
{
struct dp_packet_batch batch;
struct cycle_timer timer;
int error;
- int batch_cnt = 0, output_cnt = 0;
uint64_t cycles;
/* Measure duration for polling and processing rx burst. */
@@ -3279,17 +3289,17 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
error = netdev_rxq_recv(rxq->rx, &batch);
if (!error) {
/* At least one packet received. */
+ *rxd_cnt = batch.count;
*recirc_depth_get() = 0;
pmd_thread_ctx_time_update(pmd);
- batch_cnt = batch.count;
dp_netdev_input(pmd, &batch, port_no);
/* Assign processing cycles to rx queue. */
cycles = cycle_timer_stop(&pmd->perf_stats, &timer);
dp_netdev_rxq_add_cycles(rxq, RXQ_CYCLES_PROC_CURR, cycles);
- output_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
+ *txd_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
} else {
/* Discard cycles. */
cycle_timer_stop(&pmd->perf_stats, &timer);
@@ -3299,11 +3309,11 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
netdev_rxq_get_name(rxq->rx), ovs_strerror(error));
}
+ *txd_cnt = 0;
}
pmd->ctx.last_rxq = NULL;
- return batch_cnt + output_cnt;
}
static struct tx_port *
@@ -3935,11 +3945,16 @@ dpif_netdev_run(struct dpif *dpif)
HMAP_FOR_EACH (port, node, &dp->ports) {
if (!netdev_is_pmd(port->netdev)) {
int i;
+ unsigned int rxd_cnt;
+ unsigned int txd_cnt;
for (i = 0; i < port->n_rxq; i++) {
- if (dp_netdev_process_rxq_port(non_pmd,
- &port->rxqs[i],
- port->port_no)) {
+ dp_netdev_process_rxq_port(non_pmd,
+ &port->rxqs[i],
+ port->port_no,
+ &rxd_cnt,
+ &txd_cnt);
+ if (rxd_cnt) {
need_to_flush = false;
}
}
@@ -4068,6 +4083,21 @@ pmd_free_static_tx_qid(struct dp_netdev_pmd_thread *pmd)
}
+/* Returns the rx queue length that 'netdev' reports through its
+ * "configured_rxq_descriptors" config key.  Falls back to a default when the
+ * config cannot be read, the key is missing, or its value is not a positive
+ * 16-bit integer, so the result is always safe to divide by. */
static int
+get_nb_rxqdesc(struct netdev *netdev)
+{
+    /* NOTE(review): 2048 is assumed to match the usual DPDK rx ring default
+     * for netdevs that do not report a descriptor count; confirm. */
+    enum { NB_RXQDESC_DEFAULT = 2048 };
+    struct smap config = SMAP_INITIALIZER(&config);
+    int nb_rxqdesc = NB_RXQDESC_DEFAULT;
+
+    if (!netdev_get_config(netdev, &config)) {
+        const char *n_desc_s = smap_get(&config,
+                                        "configured_rxq_descriptors");
+        long n_desc;
+
+        /* smap_get() returns NULL for netdevs without this key. */
+        if (n_desc_s && str_to_long(n_desc_s, 10, &n_desc)
+            && n_desc > 0 && n_desc <= UINT16_MAX) {
+            nb_rxqdesc = n_desc;
+        }
+    }
+    smap_destroy(&config);
+    return nb_rxqdesc;
+}
+
+/* Tuning for pmd_load_queues_and_ports(), which turns each rxq's ingress
+ * priority into a bound on consecutive reads per PMD poll iteration. */
+#define MAX_PRIO_READS (48)              /* Upper bound on reads per pass. */
+#define MIN_PRIO_READS (1)               /* Reads for a PMD's lowest prio. */
+#define RAW_TO_NORM_FN_EXP (-0.0187)     /* Decay rate of normalizing curve. */
+#define PRIO_TO_MAX_READS_SCALAR (10)    /* Raw reads per ingress_prio unit. */
+
+static int
pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
struct polled_queue **ppoll_list)
{
@@ -4079,13 +4109,54 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
poll_list = xrealloc(poll_list, hmap_count(&pmd->poll_list)
* sizeof *poll_list);
+    /* Record each rxq's descriptor count (temporarily, in 'raw_prio') and
+     * the largest count on this PMD.  Raw priorities are scaled by
+     * max_nb_rxqdesc / nb_rxqdesc to account for differing queue lengths;
+     * this has no effect on queues of netdevs whose ingress_prio is 0. */
+    size_t n_polls = hmap_count(&pmd->poll_list);
+    int *raw_prio = xmalloc((n_polls ? n_polls : 1) * sizeof *raw_prio);
+    int max_nb_rxqdesc = 0;
+    size_t j = 0;
+
+    HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+        raw_prio[j] = get_nb_rxqdesc(poll->rxq->rx->netdev);
+        if (raw_prio[j] > max_nb_rxqdesc) {
+            max_nb_rxqdesc = raw_prio[j];
+        }
+        j++;
+    }
+
+    /* Populate ppoll_list and replace each descriptor count with the queue's
+     * 'raw' priority.  Raw values are kept in 'raw_prio' rather than in the
+     * 8-bit 'priority' field because they can exceed UINT8_MAX before
+     * normalization.  The poll_list hmap is not modified between the two
+     * walks, so both visit the polls in the same order. */
+    int min_prio = 0;
i = 0;
HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
poll_list[i].rxq = poll->rxq;
poll_list[i].port_no = poll->rxq->port->port_no;
+
+        int nb_rxqdesc = raw_prio[i];
+        int prio = poll->rxq->rx->netdev->ingress_prio
+                   * PRIO_TO_MAX_READS_SCALAR;
+
+        /* Skip length weighting for unknown (non-positive) queue lengths
+         * instead of dividing by zero. */
+        if (nb_rxqdesc > 0 && max_nb_rxqdesc > 0) {
+            prio = prio * max_nb_rxqdesc / nb_rxqdesc;
+        }
+        raw_prio[i] = prio;
+        if (i == 0 || prio < min_prio) {
+            min_prio = prio;
+        }
i++;
}
+    /* Normalize the raw priorities into poll_list[].priority so that:
+     * 1. MAX_PRIO_READS is not exceeded.
+     * 2. The lowest-priority queue on this PMD gets MIN_PRIO_READS.
+     * 3. The ratio between small raw values is more or less maintained,
+     *    while larger values are compressed to meet criterion 1.
+     * The exponential curve
+     *     reads = MAX - (MAX - MIN) * e^(k * (raw - min_raw)),
+     * with k = RAW_TO_NORM_FN_EXP < 0, meets these requirements. */
+    int end_idx = i;
+    for (i = 0; i < end_idx; i++) {
+        int current = raw_prio[i] - min_prio;
+
+        poll_list[i].priority = MAX_PRIO_READS
+                                - (MAX_PRIO_READS - MIN_PRIO_READS)
+                                  * exp(RAW_TO_NORM_FN_EXP * current);
+    }
+    free(raw_prio);
+
pmd_load_cached_ports(pmd);
ovs_mutex_unlock(&pmd->port_mutex);
@@ -4104,7 +4175,6 @@ pmd_thread_main(void *f_)
bool exiting;
int poll_cnt;
int i;
- int process_packets = 0;
poll_list = NULL;
@@ -4142,10 +4212,17 @@ reload:
pmd_perf_start_iteration(s);
for (i = 0; i < poll_cnt; i++) {
- process_packets =
+ unsigned int priority_max_reads = poll_list[i].priority;
+ unsigned int rxd_cnt;
+ unsigned int txd_cnt;
+
+ do {
dp_netdev_process_rxq_port(pmd, poll_list[i].rxq,
- poll_list[i].port_no);
- iter_packets += process_packets;
+ poll_list[i].port_no,
+ &rxd_cnt, &txd_cnt);
+ iter_packets = iter_packets + rxd_cnt + txd_cnt;
+ priority_max_reads--;
+ } while (rxd_cnt >= NETDEV_MAX_BURST && priority_max_reads);
}
if (!iter_packets) {
diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
index 05974c1..ddfbdf2 100644
--- a/lib/netdev-bsd.c
+++ b/lib/netdev-bsd.c
@@ -1506,6 +1506,7 @@ netdev_bsd_update_flags(struct netdev *netdev_, enum netdev_flags off,
netdev_bsd_get_etheraddr, \
netdev_bsd_get_mtu, \
NULL, /* set_mtu */ \
+ NULL, /* set_ingress_sched */ \
netdev_bsd_get_ifindex, \
netdev_bsd_get_carrier, \
NULL, /* get_carrier_resets */ \
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index af9843a..f9915fa 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -39,6 +39,8 @@
#include <rte_vhost.h>
#include <rte_version.h>
+#include <openvswitch/ofp-parse.h>
+#include <openvswitch/ofp-util.h>
#include "dirs.h"
#include "dp-packet.h"
#include "dpdk.h"
@@ -88,6 +90,7 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
+ sizeof(struct dp_packet) \
+ RTE_PKTMBUF_HEADROOM), \
RTE_CACHE_LINE_SIZE)
+#define MAX_PORT_PRIO 3 /* NOTE(review): not referenced by this patch. */
#define NETDEV_DPDK_MBUF_ALIGN 1024
#define NETDEV_DPDK_MAX_PKT_LEN 9728
@@ -808,6 +811,7 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev)
return -diag;
}
+
diag = rte_eth_dev_start(dev->port_id);
if (diag) {
VLOG_ERR("Interface %s start error: %s", dev->up.name,
@@ -2213,6 +2217,13 @@ netdev_dpdk_set_mtu(struct netdev *netdev, int mtu)
}
+/* 'set_ingress_sched' hook for DPDK netdevs.  No DPDK-specific ingress
+ * scheduling options are implemented yet, so every policy is reported as
+ * unsupported. */
static int
+netdev_dpdk_set_ingress_sched(struct netdev *netdev OVS_UNUSED,
+                              const struct smap *ingress_sched_smap OVS_UNUSED)
+{
+    /* XXX placeholder for eventual ingress filtering.  Return EOPNOTSUPP,
+     * the code netdev_set_ingress_sched() checks for; POSIX does not
+     * guarantee that ENOTSUP has the same value. */
+    return EOPNOTSUPP;
+}
+
+static int
netdev_dpdk_get_carrier(const struct netdev *netdev, bool *carrier);
static int
@@ -3550,7 +3561,6 @@ netdev_dpdk_reconfigure(struct netdev *netdev)
&& dev->txq_size == dev->requested_txq_size
&& dev->socket_id == dev->requested_socket_id) {
/* Reconfiguration is unnecessary */
-
goto out;
}
@@ -3742,6 +3752,7 @@ unlock:
netdev_dpdk_get_etheraddr, \
netdev_dpdk_get_mtu, \
netdev_dpdk_set_mtu, \
+ netdev_dpdk_set_ingress_sched, \
netdev_dpdk_get_ifindex, \
GET_CARRIER, \
netdev_dpdk_get_carrier_resets, \
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index 8af9e1a..750ea7f 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -1415,6 +1415,7 @@ netdev_dummy_update_flags(struct netdev *netdev_,
netdev_dummy_get_etheraddr, \
netdev_dummy_get_mtu, \
netdev_dummy_set_mtu, \
+ NULL, /* set_ingress_sched */ \
netdev_dummy_get_ifindex, \
NULL, /* get_carrier */ \
NULL, /* get_carrier_resets */ \
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index 7ea40a8..ec48d24 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -2867,6 +2867,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
netdev_linux_get_etheraddr, \
netdev_linux_get_mtu, \
netdev_linux_set_mtu, \
+ NULL, /* set_ingress_sched */ \
netdev_linux_get_ifindex, \
netdev_linux_get_carrier, \
netdev_linux_get_carrier_resets, \
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 25bd671..5165ea9 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -78,7 +78,8 @@ struct netdev {
* modify them. */
int n_txq;
int n_rxq;
- struct shash_node *node; /* Pointer to element in global map. */
+ int ingress_prio; /* 0 lowest to 3 highest. Default 0. */
+ struct shash_node *node; /* Pointer to element in global map. */
struct ovs_list saved_flags_list; /* Contains "struct netdev_saved_flags". */
};
@@ -412,6 +413,14 @@ struct netdev_class {
* null if it would always return EOPNOTSUPP. */
int (*set_mtu)(struct netdev *netdev, int mtu);
+ /* Sets 'netdev''s ingress scheduling policy.
+ *
+ * If 'netdev' does not support the specified policy then this function
+ * should return EOPNOTSUPP. This function may be set to null if it would
+ * always return EOPNOTSUPP. */
+ int (*set_ingress_sched)(struct netdev *netdev,
+ const struct smap *ingress_sched_smap);
+
/* Returns the ifindex of 'netdev', if successful, as a positive number.
* On failure, returns a negative errno value.
*
diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
index 52aa12d..b77b28f 100644
--- a/lib/netdev-vport.c
+++ b/lib/netdev-vport.c
@@ -902,6 +902,7 @@ netdev_vport_get_ifindex(const struct netdev *netdev_)
netdev_vport_get_etheraddr, \
NULL, /* get_mtu */ \
NULL, /* set_mtu */ \
+ NULL, /* set_ingress_sched */ \
GET_IFINDEX, \
NULL, /* get_carrier */ \
NULL, /* get_carrier_resets */ \
diff --git a/lib/netdev.c b/lib/netdev.c
index b303a7d..8f5d129 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -42,6 +42,7 @@
#include "netdev-dpdk.h"
#include "netdev-provider.h"
#include "netdev-vport.h"
+#include "openvswitch/ofp-parse.h"
#include "odp-netlink.h"
#include "openflow/openflow.h"
#include "packets.h"
@@ -978,6 +979,47 @@ netdev_mtu_is_user_config(struct netdev *netdev)
return netdev->mtu_user_config;
}
+/* Sets the ingress scheduling policy of 'netdev' from 'ingress_sched_smap'.
+ *
+ * The "port_prio" key is common to all netdevs and is handled here.  It must
+ * parse as an integer from 0 to 3 (the documented range of
+ * 'netdev->ingress_prio') and is stored in 'netdev->ingress_prio'.  If the
+ * key is absent, the priority reverts to its default of 0, so deleting the
+ * key from the configuration takes effect.  The whole map is then passed to
+ * the netdev class's set_ingress_sched() hook, if it has one.
+ *
+ * Returns 0 if successful.  Returns EINVAL, leaving the priority unchanged,
+ * if "port_prio" is invalid.  Returns EOPNOTSUPP if the netdev class does not
+ * support the rest of the policy (the priority is still applied in that
+ * case).  Otherwise returns the error reported by the class's hook. */
+int
+netdev_set_ingress_sched(struct netdev *netdev,
+                         const struct smap *ingress_sched_smap)
+{
+    /* Highest priority allowed by the documented 0..3 range. */
+    enum { NETDEV_MAX_INGRESS_PRIO = 3 };
+    const char *port_prio_str = smap_get(ingress_sched_smap, "port_prio");
+    uint8_t port_prio = 0;
+
+    if (port_prio_str) {
+        /* str_to_u8() returns a malloc'd error string that we must free. */
+        char *error_str = str_to_u8(port_prio_str, "port_prio", &port_prio);
+
+        if (error_str) {
+            VLOG_ERR("%s while parsing ingress_sched:port_prio for %s",
+                     error_str, netdev_get_name(netdev));
+            free(error_str);
+            return EINVAL;
+        }
+        if (port_prio > NETDEV_MAX_INGRESS_PRIO) {
+            VLOG_ERR("ingress_sched:port_prio %d for %s exceeds maximum %d",
+                     port_prio, netdev_get_name(netdev),
+                     NETDEV_MAX_INGRESS_PRIO);
+            return EINVAL;
+        }
+    }
+    netdev->ingress_prio = port_prio;
+
+    /* Pass the rest of the config on to the specific netdev implementation. */
+    const struct netdev_class *class = netdev->netdev_class;
+    int error = (class->set_ingress_sched
+                 ? class->set_ingress_sched(netdev, ingress_sched_smap)
+                 : EOPNOTSUPP);
+
+    if (error && error != EOPNOTSUPP) {
+        VLOG_DBG_RL(&rl, "failed to set ingress scheduling for network "
+                    "device %s: %s",
+                    netdev_get_name(netdev), ovs_strerror(error));
+    }
+
+    return error;
+}
+
/* Returns the ifindex of 'netdev', if successful, as a positive number. On
* failure, returns a negative errno value.
*
diff --git a/lib/netdev.h b/lib/netdev.h
index ff1b604..d49ba91 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -164,6 +164,8 @@ int netdev_get_mtu(const struct netdev *, int *mtup);
int netdev_set_mtu(struct netdev *, int mtu);
void netdev_mtu_user_config(struct netdev *, bool);
bool netdev_mtu_is_user_config(struct netdev *);
+int netdev_set_ingress_sched(struct netdev *,
+ const struct smap *ingress_sched_smap);
int netdev_get_ifindex(const struct netdev *);
int netdev_set_tx_multiq(struct netdev *, unsigned int n_txq);
enum netdev_pt_mode netdev_get_pt_mode(const struct netdev *);
diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
index d90997e..643b104 100644
--- a/vswitchd/bridge.c
+++ b/vswitchd/bridge.c
@@ -831,6 +831,7 @@ bridge_delete_or_reconfigure_ports(struct bridge *br)
}
iface_set_netdev_mtu(iface->cfg, iface->netdev);
+ netdev_set_ingress_sched(iface->netdev, &iface->cfg->ingress_sched);
/* If the requested OpenFlow port for 'iface' changed, and it's not
* already the correct port, then we might want to temporarily delete
@@ -1794,6 +1795,7 @@ iface_do_create(const struct bridge *br,
}
iface_set_netdev_mtu(iface_cfg, netdev);
+ netdev_set_ingress_sched(netdev, &iface_cfg->ingress_sched);
*ofp_portp = iface_pick_ofport(iface_cfg);
error = ofproto_port_add(br->ofproto, netdev, ofp_portp);
--
2.7.4
More information about the dev
mailing list