[ovs-dev] [PATCH v4] dpif-netdev: XPS (Transmit Packet Steering) implementation.
Ilya Maximets
i.maximets at samsung.com
Wed Jul 13 12:34:25 UTC 2016
If the number of CPUs in pmd-cpu-mask is not divisible by the number of
queues, and in a few more complex situations, there may be an unfair
distribution of TX queue-ids between PMD threads.
For example, if we have 2 ports with 4 queues each and 6 CPUs in
pmd-cpu-mask, the following distribution is possible:
<------------------------------------------------------------------------>
pmd thread numa_id 0 core_id 13:
port: vhost-user1 queue-id: 1
port: dpdk0 queue-id: 3
pmd thread numa_id 0 core_id 14:
port: vhost-user1 queue-id: 2
pmd thread numa_id 0 core_id 16:
port: dpdk0 queue-id: 0
pmd thread numa_id 0 core_id 17:
port: dpdk0 queue-id: 1
pmd thread numa_id 0 core_id 12:
port: vhost-user1 queue-id: 0
port: dpdk0 queue-id: 2
pmd thread numa_id 0 core_id 15:
port: vhost-user1 queue-id: 3
<------------------------------------------------------------------------>
As we can see above, the dpdk0 port is polled by threads on cores
12, 13, 16 and 17.
By design of dpif-netdev, there is only one TX queue-id assigned to each
pmd thread. These queue-ids are sequential, similar to core-ids, and a
thread will send packets to the queue with exactly this queue-id
regardless of the port.
In the previous example:
pmd thread on core 12 will send packets to tx queue 0
pmd thread on core 13 will send packets to tx queue 1
...
pmd thread on core 17 will send packets to tx queue 5
So, for the dpdk0 port, after truncation in netdev-dpdk:
core 12 --> TX queue-id 0 % 4 == 0
core 13 --> TX queue-id 1 % 4 == 1
core 16 --> TX queue-id 4 % 4 == 0
core 17 --> TX queue-id 5 % 4 == 1
As a result, only 2 of the 4 queues are used.
To fix this issue, a kind of XPS is implemented in the following way:
* TX queue-ids are allocated dynamically.
* When a PMD thread tries to send packets to a new port for the first
time, it allocates the least used TX queue for this port.
* PMD threads periodically perform revalidation of
allocated TX queue-ids. If a queue wasn't used in the last
XPS_TIMEOUT_MS milliseconds, it will be freed during revalidation.
Reported-by: Zhihong Wang <zhihong.wang at intel.com>
Signed-off-by: Ilya Maximets <i.maximets at samsung.com>
---
lib/dpif-netdev.c | 170 +++++++++++++++++++++++++++++++++++++-----------------
1 file changed, 117 insertions(+), 53 deletions(-)
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index e0107b7..6345944 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -248,6 +248,8 @@ enum pmd_cycles_counter_type {
PMD_N_CYCLES
};
+#define XPS_TIMEOUT_MS 500LL
+
/* A port in a netdev-based datapath. */
struct dp_netdev_port {
odp_port_t port_no;
@@ -256,6 +258,8 @@ struct dp_netdev_port {
struct netdev_saved_flags *sf;
unsigned n_rxq; /* Number of elements in 'rxq' */
struct netdev_rxq **rxq;
+ unsigned *txq_used; /* Number of threads that uses each tx queue. */
+ struct ovs_mutex txq_used_mutex;
char *type; /* Port type as requested by user. */
};
@@ -384,8 +388,9 @@ struct rxq_poll {
/* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'. */
struct tx_port {
- odp_port_t port_no;
- struct netdev *netdev;
+ struct dp_netdev_port *port;
+ int qid;
+ long long last_used;
struct hmap_node node;
};
@@ -498,7 +503,8 @@ static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch *,
bool may_steal,
const struct nlattr *actions,
- size_t actions_len);
+ size_t actions_len,
+ long long now);
static void dp_netdev_input(struct dp_netdev_pmd_thread *,
struct dp_packet_batch *, odp_port_t port_no);
static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,
@@ -541,6 +547,12 @@ static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);
static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
OVS_REQUIRES(pmd->port_mutex);
+static void
+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
+ long long now, bool purge);
+static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
+ struct tx_port *tx, long long now);
+
static inline bool emc_entry_alive(struct emc_entry *ce);
static void emc_clear_entry(struct emc_entry *ce);
@@ -1185,7 +1197,9 @@ port_create(const char *devname, const char *open_type, const char *type,
port->netdev = netdev;
port->n_rxq = netdev_n_rxq(netdev);
port->rxq = xcalloc(port->n_rxq, sizeof *port->rxq);
+ port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
port->type = xstrdup(type);
+ ovs_mutex_init(&port->txq_used_mutex);
for (i = 0; i < port->n_rxq; i++) {
error = netdev_rxq_open(netdev, &port->rxq[i], i);
@@ -1211,7 +1225,9 @@ out_rxq_close:
for (i = 0; i < n_open_rxqs; i++) {
netdev_rxq_close(port->rxq[i]);
}
+ ovs_mutex_destroy(&port->txq_used_mutex);
free(port->type);
+ free(port->txq_used);
free(port->rxq);
free(port);
@@ -1351,7 +1367,8 @@ port_destroy(struct dp_netdev_port *port)
for (unsigned i = 0; i < port->n_rxq; i++) {
netdev_rxq_close(port->rxq[i]);
}
-
+ ovs_mutex_destroy(&port->txq_used_mutex);
+ free(port->txq_used);
free(port->rxq);
free(port->type);
free(port);
@@ -1374,13 +1391,6 @@ get_port_by_name(struct dp_netdev *dp,
}
static int
-get_n_pmd_threads(struct dp_netdev *dp)
-{
- /* There is one non pmd thread in dp->poll_threads */
- return cmap_count(&dp->poll_threads) - 1;
-}
-
-static int
get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id)
{
struct dp_netdev_pmd_thread *pmd;
@@ -2476,7 +2486,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
packet_batch_init_packet(&pp, execute->packet);
dp_netdev_execute_actions(pmd, &pp, false, execute->actions,
- execute->actions_len);
+ execute->actions_len, 0);
if (pmd->core_id == NON_PMD_CORE_ID) {
ovs_mutex_unlock(&dp->non_pmd_mutex);
@@ -2660,6 +2670,10 @@ port_reconfigure(struct dp_netdev_port *port)
}
/* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
port->rxq = xrealloc(port->rxq, sizeof *port->rxq * netdev_n_rxq(netdev));
+ /* Realloc 'used' counters for tx queues. */
+ free(port->txq_used);
+ port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
+
for (i = 0; i < netdev_n_rxq(netdev); i++) {
err = netdev_rxq_open(netdev, &port->rxq[i], i);
if (err) {
@@ -2737,6 +2751,7 @@ dpif_netdev_run(struct dpif *dpif)
}
}
}
+ dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false);
ovs_mutex_unlock(&dp->non_pmd_mutex);
dp_netdev_pmd_unref(non_pmd);
@@ -2786,6 +2801,9 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
{
struct tx_port *tx_port_cached;
+ /* Free all used tx queue ids. */
+ dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
+
HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->port_cache) {
free(tx_port_cached);
}
@@ -2805,7 +2823,7 @@ pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {
tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached);
hmap_insert(&pmd->port_cache, &tx_port_cached->node,
- hash_port_no(tx_port_cached->port_no));
+ hash_port_no(tx_port_cached->port->port_no));
}
}
@@ -3021,11 +3039,6 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
pmd->numa_id = numa_id;
pmd->poll_cnt = 0;
- atomic_init(&pmd->tx_qid,
- (core_id == NON_PMD_CORE_ID)
- ? ovs_numa_get_n_cores()
- : get_n_pmd_threads(dp));
-
ovs_refcount_init(&pmd->ref_cnt);
latch_init(&pmd->exit_latch);
atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ);
@@ -3116,18 +3129,16 @@ dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
free(pmd_list);
}
-/* Deletes all pmd threads on numa node 'numa_id' and
- * fixes tx_qids of other threads to keep them sequential. */
+/* Deletes all pmd threads on numa node 'numa_id'. */
static void
dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
{
struct dp_netdev_pmd_thread *pmd;
- int n_pmds_on_numa, n_pmds;
- int *free_idx, k = 0;
+ int n_pmds_on_numa;
+ int k = 0;
struct dp_netdev_pmd_thread **pmd_list;
n_pmds_on_numa = get_n_pmd_threads_on_numa(dp, numa_id);
- free_idx = xcalloc(n_pmds_on_numa, sizeof *free_idx);
pmd_list = xcalloc(n_pmds_on_numa, sizeof *pmd_list);
CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
@@ -3135,7 +3146,6 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
* 'dp->poll_threads' (while we're iterating it) and it
* might quiesce. */
if (pmd->numa_id == numa_id) {
- atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]);
pmd_list[k] = pmd;
ovs_assert(k < n_pmds_on_numa);
k++;
@@ -3146,21 +3156,7 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
dp_netdev_del_pmd(dp, pmd_list[i]);
}
- n_pmds = get_n_pmd_threads(dp);
- CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
- int old_tx_qid;
-
- atomic_read_relaxed(&pmd->tx_qid, &old_tx_qid);
-
- if (old_tx_qid >= n_pmds) {
- int new_tx_qid = free_idx[--k];
-
- atomic_store_relaxed(&pmd->tx_qid, new_tx_qid);
- }
- }
-
free(pmd_list);
- free(free_idx);
}
/* Deletes all rx queues from pmd->poll_list and all the ports from
@@ -3188,7 +3184,7 @@ tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
struct tx_port *tx;
HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
- if (tx->port_no == port_no) {
+ if (tx->port->port_no == port_no) {
return tx;
}
}
@@ -3313,11 +3309,11 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
{
struct tx_port *tx = xzalloc(sizeof *tx);
- tx->netdev = port->netdev;
- tx->port_no = port->port_no;
+ tx->port = port;
+ tx->qid = -1;
ovs_mutex_lock(&pmd->port_mutex);
- hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port_no));
+ hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
ovs_mutex_unlock(&pmd->port_mutex);
}
@@ -3658,7 +3654,7 @@ packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,
actions = dp_netdev_flow_get_actions(flow);
dp_netdev_execute_actions(pmd, &batch->array, true,
- actions->actions, actions->size);
+ actions->actions, actions->size, now);
}
static inline void
@@ -3785,7 +3781,7 @@ handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet,
* we'll send the packet up twice. */
packet_batch_init_packet(&b, packet);
dp_netdev_execute_actions(pmd, &b, true,
- actions->data, actions->size);
+ actions->data, actions->size, 0);
add_actions = put_actions->size ? put_actions : actions;
if (OVS_LIKELY(error != ENOSPC)) {
@@ -3954,6 +3950,7 @@ dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,
struct dp_netdev_execute_aux {
struct dp_netdev_pmd_thread *pmd;
+ long long now;
};
static void
@@ -3974,6 +3971,74 @@ dpif_netdev_register_upcall_cb(struct dpif *dpif, upcall_callback *cb,
dp->upcall_cb = cb;
}
+static void
+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
+ long long now, bool purge)
+{
+ struct tx_port *tx;
+ struct dp_netdev_port *port;
+ long long interval;
+
+ HMAP_FOR_EACH (tx, node, &pmd->port_cache) {
+ interval = now - tx->last_used;
+ if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {
+ port = tx->port;
+ ovs_mutex_lock(&port->txq_used_mutex);
+ port->txq_used[tx->qid]--;
+ ovs_mutex_unlock(&port->txq_used_mutex);
+ tx->qid = -1;
+ }
+ }
+}
+
+static int
+dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
+ struct tx_port *tx, long long now)
+{
+ struct dp_netdev_port *port;
+ long long interval;
+ int i, min_cnt, min_qid;
+
+ if (OVS_UNLIKELY(!now)) {
+ now = time_msec();
+ }
+
+ interval = now - tx->last_used;
+ tx->last_used = now;
+
+ if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {
+ return tx->qid;
+ }
+
+ port = tx->port;
+
+ ovs_mutex_lock(&port->txq_used_mutex);
+ if (tx->qid >= 0) {
+ port->txq_used[tx->qid]--;
+ tx->qid = -1;
+ }
+
+ min_cnt = -1;
+ min_qid = 0;
+ for (i = 0; i < netdev_n_txq(port->netdev); i++) {
+ if (port->txq_used[i] < min_cnt || min_cnt == -1) {
+ min_cnt = port->txq_used[i];
+ min_qid = i;
+ }
+ }
+
+ port->txq_used[min_qid]++;
+ tx->qid = min_qid;
+
+ ovs_mutex_unlock(&port->txq_used_mutex);
+
+ dpif_netdev_xps_revalidate_pmd(pmd, now, false);
+
+ VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
+ pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
+ return min_qid;
+}
+
static struct tx_port *
pmd_tx_port_cache_lookup(const struct dp_netdev_pmd_thread *pmd,
odp_port_t port_no)
@@ -3997,7 +4062,7 @@ push_tnl_action(const struct dp_netdev_pmd_thread *pmd,
err = -EINVAL;
goto error;
}
- err = netdev_push_header(tun_port->netdev, batch, data);
+ err = netdev_push_header(tun_port->port->netdev, batch, data);
if (!err) {
return 0;
}
@@ -4024,7 +4089,7 @@ dp_execute_userspace_action(struct dp_netdev_pmd_thread *pmd,
if (!error || error == ENOSPC) {
packet_batch_init_packet(&b, packet);
dp_netdev_execute_actions(pmd, &b, may_steal,
- actions->data, actions->size);
+ actions->data, actions->size, 0);
} else if (may_steal) {
dp_packet_delete(packet);
}
@@ -4045,11 +4110,9 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
case OVS_ACTION_ATTR_OUTPUT:
p = pmd_tx_port_cache_lookup(pmd, u32_to_odp(nl_attr_get_u32(a)));
if (OVS_LIKELY(p)) {
- int tx_qid;
-
- atomic_read_relaxed(&pmd->tx_qid, &tx_qid);
+ int tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, aux->now);
- netdev_send(p->netdev, tx_qid, packets_, may_steal);
+ netdev_send(p->port->netdev, tx_qid, packets_, may_steal);
return;
}
break;
@@ -4096,7 +4159,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
dp_packet_batch_apply_cutlen(packets_);
- netdev_pop_header(p->netdev, packets_);
+ netdev_pop_header(p->port->netdev, packets_);
if (!packets_->count) {
return;
}
@@ -4210,9 +4273,10 @@ static void
dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch *packets,
bool may_steal,
- const struct nlattr *actions, size_t actions_len)
+ const struct nlattr *actions, size_t actions_len,
+ long long now)
{
- struct dp_netdev_execute_aux aux = { pmd };
+ struct dp_netdev_execute_aux aux = { pmd, now };
odp_execute_actions(&aux, packets, may_steal, actions,
actions_len, dp_execute_cb);
--
2.7.4
More information about the dev
mailing list