[ovs-dev] [PATCH v2 1/2] netdev-dpdk: Use intermediate queue during packet transmission.
Ilya Maximets
i.maximets at samsung.com
Wed Jan 18 07:37:13 UTC 2017
Not a complete review. This code is full of races.
See details inline.
Best regards, Ilya Maximets.
On 17.01.2017 18:37, antonio.fischetti at intel.com wrote:
> This patch implements the intermediate Tx queues on 'dpdk' type ports.
>
> Test results:
> * In the worst-case scenario with fewer packets per batch, a significant
> bottleneck is observed in the netdev_dpdk_eth_send() function due to
> expensive MMIO writes.
>
> * Also it's observed that the CPI (cycles per instruction) rate for the function
> stood between 3.15 and 4.1, which is significantly higher than the acceptable
> limit of 1.0 for HPC applications and the theoretical limit of 0.25 (as the backend
> pipeline can retire 4 micro-operations per cycle).
>
> * With this patch, CPI for netdev_dpdk_eth_send() is at 0.55 and the overall
> throughput improved significantly.
>
>
> Signed-off-by: Antonio Fischetti <antonio.fischetti at intel.com>
> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy at intel.com>
> Co-authored-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy at intel.com>
> Signed-off-by: Markus Magnusson <markus.magnusson at ericsson.com>
> Co-authored-by: Markus Magnusson <markus.magnusson at ericsson.com>
> ---
> lib/dpif-netdev.c | 53 +++++++++++++++++++++++++++++++--
> lib/netdev-bsd.c | 1 +
> lib/netdev-dpdk.c | 82 ++++++++++++++++++++++++++++++++++++++++++++++-----
> lib/netdev-dummy.c | 1 +
> lib/netdev-linux.c | 1 +
> lib/netdev-provider.h | 8 +++++
> lib/netdev-vport.c | 3 +-
> lib/netdev.c | 9 ++++++
> lib/netdev.h | 1 +
> 9 files changed, 149 insertions(+), 10 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 3901129..58ac429 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -289,6 +289,8 @@ struct dp_netdev_rxq {
> struct dp_netdev_pmd_thread *pmd; /* pmd thread that will poll this queue. */
> };
>
> +#define LAST_USED_QID_NONE -1
> +
> /* A port in a netdev-based datapath. */
> struct dp_netdev_port {
> odp_port_t port_no;
> @@ -303,6 +305,8 @@ struct dp_netdev_port {
> char *type; /* Port type as requested by user. */
> char *rxq_affinity_list; /* Requested affinity of rx queues. */
> bool need_reconfigure; /* True if we should reconfigure netdev. */
> + int last_used_qid; /* Last queue id where packets could be
> + enqueued. */
> };
>
> /* Contained by struct dp_netdev_flow's 'stats' member. */
> @@ -619,6 +623,9 @@ static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
> static inline bool emc_entry_alive(struct emc_entry *ce);
> static void emc_clear_entry(struct emc_entry *ce);
>
> +static struct tx_port *pmd_send_port_cache_lookup
> +(const struct dp_netdev_pmd_thread *pmd, odp_port_t port_no);
> +
> static void
> emc_cache_init(struct emc_cache *flow_cache)
> {
> @@ -3507,15 +3514,19 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
> return i;
> }
>
> +enum { DRAIN_TSC = 20000ULL };
> +
> static void *
> pmd_thread_main(void *f_)
> {
> struct dp_netdev_pmd_thread *pmd = f_;
> - unsigned int lc = 0;
> + unsigned int lc = 0, lc_drain = 0;
> struct polled_queue *poll_list;
> bool exiting;
> int poll_cnt;
> int i;
> + uint64_t prev = 0, now = 0;
> + struct tx_port *tx_port;
>
> poll_list = NULL;
>
> @@ -3548,6 +3559,26 @@ reload:
> poll_list[i].port_no);
> }
>
> +#define MAX_LOOP_TO_DRAIN 128
> + if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
> + lc_drain = 0;
> + prev = now;
> + now = pmd->last_cycles;
> + if ((now - prev) > DRAIN_TSC) {
> + HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {
'pmd->tx_ports' must be protected by 'pmd->port_mutex'. Also, it can be changed
while the pmd is still working. I think you wanted something like 'pmd->send_port_cache'.
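Just to illustrate what I mean (a rough sketch only, assuming the thread-local
'send_port_cache' is already populated at this point and is safe to walk
without 'port_mutex'):

    /* Walk only this thread's cached send ports, not the shared
     * 'pmd->tx_ports' map. */
    HMAP_FOR_EACH (tx_port, node, &pmd->send_port_cache) {
        if (tx_port->port->last_used_qid != LAST_USED_QID_NONE) {
            netdev_txq_drain(tx_port->port->netdev,
                             tx_port->port->last_used_qid,
                             tx_port->port->dynamic_txqs);
            tx_port->port->last_used_qid = LAST_USED_QID_NONE;
        }
    }

(But see the next comment about 'last_used_qid' itself.)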
> + if (tx_port->port->last_used_qid != LAST_USED_QID_NONE) {
> + /* This queue may contain some buffered packets waiting
> + * to be sent out. */
> + netdev_txq_drain(tx_port->port->netdev,
> + tx_port->port->last_used_qid,
> + tx_port->port->dynamic_txqs);
> + /* Mark it as empty. */
> + tx_port->port->last_used_qid = LAST_USED_QID_NONE;
'port' is a pointer to the common structure --> 'port->last_used_qid' will be
concurrently updated by all threads --> total mess.
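One possible way to avoid that (a rough sketch, not something I've tested):
keep the last used qid inside the per-thread 'struct tx_port' wrapper rather
than in the shared 'struct dp_netdev_port', e.g.:

    /* Per-pmd-thread wrapper around a dp_netdev_port. */
    struct tx_port {
        struct dp_netdev_port *port;
        int qid;
        int last_used_qid;          /* Last queue id this pmd thread
                                     * enqueued to, or LAST_USED_QID_NONE. */
        long long last_used;
        struct hmap_node node;
    };

That way each pmd thread tracks, and later drains, only the queues it actually
used itself.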
> + }
> + }
> + }
> + }
> +
> if (lc++ > 1024) {
> bool reload;
>
> @@ -3883,6 +3914,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>
> tx->port = port;
> tx->qid = -1;
> + port->last_used_qid = LAST_USED_QID_NONE;
>
> hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
> pmd->need_reload = true;
> @@ -4538,7 +4570,24 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
> } else {
> tx_qid = pmd->static_tx_qid;
> }
> +//TODO Add UNLIKELY to the 1st condition?
> + /* Is the current qid the same as the last one we used? */
> + if ((p->port->last_used_qid != LAST_USED_QID_NONE) &&
> + (p->port->last_used_qid != tx_qid)) {
> + /* The current assigned queue was changed, we need to drain
> + * packets from the previous queue. */
> + netdev_txq_drain(p->port->netdev,
> + p->port->last_used_qid,
> + p->port->dynamic_txqs);
> + p->port->last_used_qid = LAST_USED_QID_NONE;
> + }
>
> + /* In case these packets gets buffered into an intermediate
> + * queue and XPS is enabled the drain function could find a
> + * different Tx qid assigned to its thread. We keep track
> + * of the qid we're now using, that will trigger the drain
> + * function and will select the right queue to flush. */
> + p->port->last_used_qid = tx_qid;
> netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
> dynamic_txqs);
> return;
> @@ -4952,7 +5001,7 @@ dpif_dummy_register(enum dummy_level level)
> "dp port new-number",
> 3, 3, dpif_dummy_change_port_number, NULL);
> }
> -
> +
> /* Datapath Classifier. */
>
> /* A set of rules that all have the same fields wildcarded. */
> diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
> index 94c515d..00d5263 100644
> --- a/lib/netdev-bsd.c
> +++ b/lib/netdev-bsd.c
> @@ -1547,6 +1547,7 @@ netdev_bsd_update_flags(struct netdev *netdev_, enum netdev_flags off,
> netdev_bsd_rxq_recv, \
> netdev_bsd_rxq_wait, \
> netdev_bsd_rxq_drain, \
> + NULL, \
> }
>
> const struct netdev_class netdev_bsd_class =
> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> index 94568a1..d560bf6 100644
> --- a/lib/netdev-dpdk.c
> +++ b/lib/netdev-dpdk.c
> @@ -166,7 +166,6 @@ static const struct rte_eth_conf port_conf = {
>
> enum { DPDK_RING_SIZE = 256 };
> BUILD_ASSERT_DECL(IS_POW2(DPDK_RING_SIZE));
> -enum { DRAIN_TSC = 200000ULL };
>
> enum dpdk_dev_type {
> DPDK_DEV_ETH = 0,
> @@ -289,12 +288,18 @@ struct dpdk_mp {
> /* There should be one 'struct dpdk_tx_queue' created for
> * each cpu core. */
> struct dpdk_tx_queue {
> + int count; /* Number of buffered packets waiting to
> + be sent. */
> rte_spinlock_t tx_lock; /* Protects the members and the NIC queue
> * from concurrent access. It is used only
> * if the queue is shared among different
> * pmd threads (see 'concurrent_txq'). */
> int map; /* Mapping of configured vhost-user queues
> * to enabled by guest. */
> + struct rte_mbuf *burst_pkts[NETDEV_MAX_BURST];
> + /* Intermediate queues where packets can
> + * be buffered to amortize the cost of MMIO
> + * writes. */
> };
>
> /* dpdk has no way to remove dpdk ring ethernet devices
> @@ -1381,6 +1386,7 @@ static inline int
> netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
> struct rte_mbuf **pkts, int cnt)
> {
> + struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> uint32_t nb_tx = 0;
>
> while (nb_tx != cnt) {
> @@ -1404,6 +1410,8 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
> }
> }
>
> + txq->count = cnt - nb_tx;
> +
> return cnt - nb_tx;
> }
>
> @@ -1788,6 +1796,37 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
> }
> }
>
> +/* Enqueue packets in an intermediate queue and call the burst
> + * function when the queue is full. This way we can amortize the
> + * cost of MMIO writes. */
> +static inline int
> +netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid,
> + struct rte_mbuf **pkts, int cnt)
> +{
> + struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> +
> + int i = 0;
> + int dropped = 0;
> +
> + while (i < cnt) {
> + int freeslots = NETDEV_MAX_BURST - txq->count;
> + int tocopy = MIN(freeslots, cnt-i);
> +
> + memcpy(&txq->burst_pkts[txq->count], &pkts[i],
> + tocopy * sizeof (struct rte_mbuf *));
> +
> + txq->count += tocopy;
> + i += tocopy;
> +
> + /* Queue full, burst the packets */
> + if (txq->count >= NETDEV_MAX_BURST) {
> + dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts,
> + txq->count);
> + }
> + }
> + return dropped;
> +}
> +
> static int
> netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
> struct dp_packet_batch *batch,
> @@ -1836,7 +1875,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
> cnt = netdev_dpdk_qos_run(dev, pkts, cnt);
> dropped = batch->count - cnt;
>
> - dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt);
> + dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt);
>
> if (OVS_UNLIKELY(dropped)) {
> rte_spinlock_lock(&dev->stats_lock);
> @@ -1850,6 +1889,30 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
> }
> }
>
> +/* Drain tx queues, this is called periodically to empty the
> + * intermediate queue in case of few packets (< NETDEV_MAX_BURST)
> + * are buffered into the queue. */
> +static int
> +netdev_dpdk_txq_drain(struct netdev *netdev, int qid, bool concurrent_txq)
> +{
> + struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
> + struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> +
> + if (OVS_LIKELY(txq->count)) {
> + if (OVS_UNLIKELY(concurrent_txq)) {
> + qid = qid % dev->up.n_txq; // TODO: do we need this?
> + rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
> + }
> +
> + netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts, txq->count);
> +
> + if (OVS_UNLIKELY(concurrent_txq)) {
> + rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
> + }
> + }
> + return 0;
> +}
> +
> static int
> netdev_dpdk_eth_send(struct netdev *netdev, int qid,
> struct dp_packet_batch *batch, bool may_steal,
> @@ -3243,7 +3306,7 @@ unlock:
> SET_CONFIG, SET_TX_MULTIQ, SEND, \
> GET_CARRIER, GET_STATS, \
> GET_FEATURES, GET_STATUS, \
> - RECONFIGURE, RXQ_RECV) \
> + RECONFIGURE, RXQ_RECV, TXQ_DRAIN) \
> { \
> NAME, \
> true, /* is_pmd */ \
> @@ -3310,6 +3373,7 @@ unlock:
> RXQ_RECV, \
> NULL, /* rx_wait */ \
> NULL, /* rxq_drain */ \
> + TXQ_DRAIN, /* txq_drain */ \
> }
>
> static const struct netdev_class dpdk_class =
> @@ -3326,7 +3390,8 @@ static const struct netdev_class dpdk_class =
> netdev_dpdk_get_features,
> netdev_dpdk_get_status,
> netdev_dpdk_reconfigure,
> - netdev_dpdk_rxq_recv);
> + netdev_dpdk_rxq_recv,
> + netdev_dpdk_txq_drain);
>
> static const struct netdev_class dpdk_ring_class =
> NETDEV_DPDK_CLASS(
> @@ -3342,7 +3407,8 @@ static const struct netdev_class dpdk_ring_class =
> netdev_dpdk_get_features,
> netdev_dpdk_get_status,
> netdev_dpdk_reconfigure,
> - netdev_dpdk_rxq_recv);
> + netdev_dpdk_rxq_recv,
> + NULL);
>
> static const struct netdev_class dpdk_vhost_class =
> NETDEV_DPDK_CLASS(
> @@ -3358,7 +3424,8 @@ static const struct netdev_class dpdk_vhost_class =
> NULL,
> NULL,
> netdev_dpdk_vhost_reconfigure,
> - netdev_dpdk_vhost_rxq_recv);
> + netdev_dpdk_vhost_rxq_recv,
> + NULL);
> static const struct netdev_class dpdk_vhost_client_class =
> NETDEV_DPDK_CLASS(
> "dpdkvhostuserclient",
> @@ -3373,7 +3440,8 @@ static const struct netdev_class dpdk_vhost_client_class =
> NULL,
> NULL,
> netdev_dpdk_vhost_client_reconfigure,
> - netdev_dpdk_vhost_rxq_recv);
> + netdev_dpdk_vhost_rxq_recv,
> + NULL);
>
> void
> netdev_dpdk_register(void)
> diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
> index e6e36cd..6a8ad45 100644
> --- a/lib/netdev-dummy.c
> +++ b/lib/netdev-dummy.c
> @@ -1409,6 +1409,7 @@ netdev_dummy_update_flags(struct netdev *netdev_,
> netdev_dummy_rxq_recv, \
> netdev_dummy_rxq_wait, \
> netdev_dummy_rxq_drain, \
> + NULL, \
> }
>
> static const struct netdev_class dummy_class =
> diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
> index a5a9ec1..2499b3e 100644
> --- a/lib/netdev-linux.c
> +++ b/lib/netdev-linux.c
> @@ -2831,6 +2831,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
> netdev_linux_rxq_recv, \
> netdev_linux_rxq_wait, \
> netdev_linux_rxq_drain, \
> + NULL, \
> }
>
> const struct netdev_class netdev_linux_class =
> diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
> index 8346fc4..97e72c6 100644
> --- a/lib/netdev-provider.h
> +++ b/lib/netdev-provider.h
> @@ -335,6 +335,11 @@ struct netdev_class {
> * If the function returns a non-zero value, some of the packets might have
> * been sent anyway.
> *
> + * Some netdev provider - like in case of 'dpdk' - may buffer the batch
> + * of packets into an intermediate queue. Buffered packets will be sent
> + * out when their number will exceed a threshold or by the periodic call
> + * to the drain function.
> + *
> * If 'may_steal' is false, the caller retains ownership of all the
> * packets. If 'may_steal' is true, the caller transfers ownership of all
> * the packets to the network device, regardless of success.
> @@ -769,6 +774,9 @@ struct netdev_class {
>
> /* Discards all packets waiting to be received from 'rx'. */
> int (*rxq_drain)(struct netdev_rxq *rx);
> +
> + /* Drain all packets waiting to be sent on queue 'qid'. */
> + int (*txq_drain)(struct netdev *netdev, int qid, bool concurrent_txq);
> };
>
> int netdev_register_provider(const struct netdev_class *);
> diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
> index 4c2ced5..77225b8 100644
> --- a/lib/netdev-vport.c
> +++ b/lib/netdev-vport.c
> @@ -838,7 +838,8 @@ get_stats(const struct netdev *netdev, struct netdev_stats *stats)
> NULL, /* rx_dealloc */ \
> NULL, /* rx_recv */ \
> NULL, /* rx_wait */ \
> - NULL, /* rx_drain */
> + NULL, /* rx_drain */ \
> + NULL, /* tx_drain */
>
>
> #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER, PUSH_HEADER, POP_HEADER) \
> diff --git a/lib/netdev.c b/lib/netdev.c
> index 839b1f6..0d48e41 100644
> --- a/lib/netdev.c
> +++ b/lib/netdev.c
> @@ -670,6 +670,15 @@ netdev_rxq_drain(struct netdev_rxq *rx)
> : 0);
> }
>
> +/* Flush packets on the queue 'qid'. */
> +int
> +netdev_txq_drain(struct netdev *netdev, int qid, bool netdev_txq_drain)
> +{
> + return (netdev->netdev_class->txq_drain
> + ? netdev->netdev_class->txq_drain(netdev, qid, netdev_txq_drain)
> + : EOPNOTSUPP);
> +}
> +
> /* Configures the number of tx queues of 'netdev'. Returns 0 if successful,
> * otherwise a positive errno value.
> *
> diff --git a/lib/netdev.h b/lib/netdev.h
> index bef9cdd..49969a1 100644
> --- a/lib/netdev.h
> +++ b/lib/netdev.h
> @@ -153,6 +153,7 @@ int netdev_rxq_drain(struct netdev_rxq *);
> int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
> bool may_steal, bool concurrent_txq);
> void netdev_send_wait(struct netdev *, int qid);
> +int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
>
> /* native tunnel APIs */
> /* Structure to pass parameters required to build a tunnel header. */
>