[ovs-dev] [RFC PATCH] netdev-dpdk: Add Tx intermediate queue for vhost ports.

Eelco Chaudron echaudro at redhat.com
Tue May 9 15:25:07 UTC 2017


On 23/04/17 17:41, Bhanuprakash Bodireddy wrote:

 > This commit adds the intermediate queue for vHost-user ports. It
 > improves the throughput in multiple virtual machines deployments and
 > also in cases with VM doing packet forwarding in kernel stack.
 >
 > This patch is aligned with intermediate queue implementation for dpdk
 > ports that can be found here: https://patchwork.ozlabs.org/patch/723309/

This patch and the one above combined will increase throughput in general,
but at the cost of additional latency (see the numbers below).

However, I would still like to see both patches applied, with a flush every
tx batch. This still increases performance when the rx batch has overlapping
egress ports, but avoids the latency increase.

It would be nice if you could run your latency tests with this flush included,
to see whether you get the same results I got with this patch and the earlier
one.

 > Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy at intel.com>
 > Signed-off-by: Antonio Fischetti <antonio.fischetti at intel.com>
 > Co-authored-by: Antonio Fischetti <antonio.fischetti at intel.com>
 > ---
 > - Limited testing is done with this RFC patch, test scenarios includes
 >   VM doing ip forwarding(Linux stack) and running testpmd in the guest.
 > - Sanity testing is done with multiple VMs to check for any locking/crashes.
 > - Much of the testing is done with single queue, and very basic testing with MQ.
 > - No throughput/latency tests are done at this point.

I did some quick latency and throughput tests (with only this patch applied).
Same test setup as for the other patch set, i.e. two 82599ES 10G ports with
64-byte packets being sent at wire speed:

Physical to Virtual test:

Number      plain                patch +
of flows  git clone    patch      flush
========  =========  =========  =========
10          5945899    8006593    7833914
32          3872211    6596310    6530133
50          3283713    5861894    6618711
100         3132540    5953752    5857226
500         2964499    5612901    5273006
1000        2931952    5233089    5178038


Physical to Virtual to Physical test:

Number      plain                patch +
of flows  git clone    patch      flush
========  =========  =========  =========
10          3240647    2659526    3652217
32          2136872    2060313    2834941
50          1981795    1912476    2897763
100         1794678    1798084    2014881
500         1686756    1672014    1657513
1000        1677795    1628578    1612480

The results of the latency tests mimic your test case 2 from the previous
patch set, sending 10G traffic at wire speed:

===== GIT CLONE
Pkt size  min(ns)  avg(ns)  max(ns)
  512      10,011   12,100   281,915
1024       7,870    9,313   193,116
1280       7,862    9,036   194,439
1518       8,215    9,417   204,782

===== PATCH
Pkt size  min(ns)  avg(ns)  max(ns)
  512      25,044   28,244   774,921
1024      29,029   33,031   218,653
1280      26,464   30,097   203,083
1518      25,870   29,412   204,165

===== PATCH + FLUSH
Pkt size  min(ns)  avg(ns)  max(ns)
  512      10,492   13,655   281,538
1024       8,407    9,784   205,095
1280       8,399    9,750   194,888
1518       8,367    9,722   196,973


 > TODO:
 > - Retry logic in 'netdev_dpdk_vhost_tx_burst' should be handled appropriately to
 >   lessen the throughput impact when multiple vHost-user port serviced by same PMD.
 >   An option could be to allow configurable 'retries' option and the default being
 >   no retries. During testing it was found that the second retry couldn't add a single
 >   packet to RX queue most of the times with ip forwarding in kernel stack*.

Taking the above into consideration, see my review comments inline. Also,
I'll assume this patch will be applied together with the other patch set.

 >
 >  lib/dpif-netdev.c     |  51 +++++++++++++++++++++-
 >  lib/netdev-dpdk.c     | 117 ++++++++++++++++++++++++++++++++++++++------------
 >  lib/netdev-dummy.c    |   1 +
 >  lib/netdev-linux.c    |   1 +
 >  lib/netdev-provider.h |   3 ++
 >  lib/netdev-vport.c    |   3 +-
 >  lib/netdev.c          |   9 ++++
 >  lib/netdev.h          |   1 +
 >  8 files changed, 156 insertions(+), 30 deletions(-)
 >
 > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
 > index a14a2eb..4710985 100644
 > --- a/lib/dpif-netdev.c
 > +++ b/lib/dpif-netdev.c
 > @@ -344,6 +344,8 @@ struct dp_netdev_rxq {
 >      struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this queue. */
 >  };
 >
 > +#define LAST_USED_QID_NONE -1
 > +
 >  /* A port in a netdev-based datapath. */
 >  struct dp_netdev_port {
 >      odp_port_t port_no;
 > @@ -494,6 +496,8 @@ struct tx_port {
 >      int qid;
 >      long long last_used;
 >      struct hmap_node node;
 > +    int last_used_qid;    /* Last queue id where packets could be
 > +                             enqueued. */
 >  };
 >
 >  /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
 > @@ -3033,6 +3037,26 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
 >  }
 >
 >  static void
 > +dp_netdev_drain_txq_ports(struct dp_netdev_pmd_thread *pmd)
 > +{
 > +    struct tx_port *cached_tx_port;
 > +    int tx_qid;
 > +
 > +    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
 > +        tx_qid = cached_tx_port->last_used_qid;
 > +
 > +        if (tx_qid != LAST_USED_QID_NONE) {
 > +            netdev_txq_drain(cached_tx_port->port->netdev, tx_qid,
 > +                             cached_tx_port->port->dynamic_txqs);
 > +
 > +            /* Queue drained and mark it empty. */
 > +            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
 > +        }
 > +    }
 > +}
 > +
 > +
 > +static void
 >  dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
 >                             struct netdev_rxq *rx,
 >                             odp_port_t port_no)
 > @@ -3647,15 +3671,18 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
 >      return i;
 >  }
 >
 > +enum { DRAIN_TSC = 20000ULL };
 > +
 >  static void *
 >  pmd_thread_main(void *f_)
 >  {
 >      struct dp_netdev_pmd_thread *pmd = f_;
 > -    unsigned int lc = 0;
 > +    unsigned int lc = 0, lc_drain = 0;
 >      struct polled_queue *poll_list;
 >      bool exiting;
 >      int poll_cnt;
 >      int i;
 > +    uint64_t prev = 0, now = 0;
 >
 >      poll_list = NULL;
 >
 > @@ -3688,6 +3715,17 @@ reload:
 >                                         poll_list[i].port_no);
 >          }
 >
 > +#define MAX_LOOP_TO_DRAIN 128
 > +        if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
 > +            lc_drain = 0;
 > +            prev = now;
 > +            now = pmd->last_cycles;
 > +
 > +            if ((now - prev) > DRAIN_TSC) {
 > +                dp_netdev_drain_txq_ports(pmd);
 > +            }
 > +        }
 > +
 >          if (lc++ > 1024) {
 >              bool reload;
 >
 > @@ -4330,6 +4368,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 >
 >      tx->port = port;
 >      tx->qid = -1;
 > +    tx->last_used_qid = LAST_USED_QID_NONE;
 >

It looks like this could lead to a queue not being drained. However, this
does not happen if the other patch is applied, so this patch should only be
applied together with the other one!

 >      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
 >      pmd->need_reload = true;
 > @@ -4892,6 +4931,14 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
 >
 >      dpif_netdev_xps_revalidate_pmd(pmd, now, false);
 >
 > +    /* The tx queue can change in XPS case, make sure packets in previous
 > +     * queue is drained properly. */
 > +    if (tx->last_used_qid != LAST_USED_QID_NONE &&
 > +               tx->qid != tx->last_used_qid) {
 > +        netdev_txq_drain(port->netdev, tx->last_used_qid, port->dynamic_txqs);
 > +        tx->last_used_qid = LAST_USED_QID_NONE;
 > +    }
 > +
 >      VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
 >               pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
 >      return min_qid;
 > @@ -4987,6 +5034,8 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 >                  tx_qid = pmd->static_tx_qid;
 >              }
 >
 > +            p->last_used_qid = tx_qid;
 > +
 >              netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
 >                          dynamic_txqs);
 >              return;
 > diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
 > index ddc651b..26cfa85 100644
 > --- a/lib/netdev-dpdk.c
 > +++ b/lib/netdev-dpdk.c
 > @@ -286,6 +286,11 @@ struct dpdk_mp {
 >      struct ovs_list list_node OVS_GUARDED_BY(dpdk_mp_mutex);
 >  };
 >
 > +/* Queue 'INTERIM_QUEUE_BURST_THRESHOLD' packets before tranmitting.
 > + * Defaults to 'NETDEV_MAX_BURST'(32) now.
 > + */
 > +#define INTERIM_QUEUE_BURST_THRESHOLD NETDEV_MAX_BURST
 > +
 >  /* There should be one 'struct dpdk_tx_queue' created for
 >   * each cpu core. */
 >  struct dpdk_tx_queue {
 > @@ -295,6 +300,11 @@ struct dpdk_tx_queue {
 >                                      * pmd threads (see 'concurrent_txq'). */
 >      int map;                       /* Mapping of configured vhost-user queues
 >                                      * to enabled by guest. */
 > +    struct dp_packet *pkts[INTERIM_QUEUE_BURST_THRESHOLD];
 > +                                   /* Intermediate queue where packets can
 > +                                    * be buffered for vhost ports */
 > +    int pkt_cnt;                   /* Number of packets waiting to be sent on
 > +                                    * vhost port */
 >  };
 >
 >  /* dpdk has no way to remove dpdk ring ethernet devices
 > @@ -1666,6 +1676,61 @@ netdev_dpdk_vhost_update_tx_counters(struct netdev_stats *stats,
 >      }
 >  }
 >
 > +static int
 > +netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid,
 > +                           int dropped)

We should skip the 'dropped' argument here, see below.

 > +{
 > +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
 > +    struct rte_mbuf **cur_pkts = (struct rte_mbuf **)txq->pkts;
 > +
 > +    int tx_vid = netdev_dpdk_get_vid(dev);
 > +    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
 > +    uint32_t sent=0;
 > +    uint32_t retries=0;

I guess there should be spaces around '=' for the two assignments above?

 > +    uint32_t sum=0;
 > +    uint32_t total_pkts = 0;

No need to initialize 'sum' and 'total_pkts' to zero, as they are set below.

 > +
 > +    total_pkts = sum = txq->pkt_cnt;
 > +    do {
 > +        uint32_t ret;
 > +        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid, &cur_pkts[sent], sum);
 > +        if (!ret) {

Do we need an OVS_UNLIKELY() here?

 > +            /* No packets enqueued - do not retry. */
 > +            break;
 > +        } else {
 > +            /* Packet have been sent */
 > +            sent += ret;
 > +
 > +            /* 'sum; packet have to be retransmitted */
 > +            sum -= ret;
 > +        }
 > +    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
 > +
 > +    for (int i=0; i < total_pkts - dropped; i++) {
 > +        dp_packet_delete(txq->pkts[i]);

I guess here we should delete all the packets, not taking 'dropped' into
account. The 'dropped' handling has to do with the old function below, where
QoS would have dropped packets. Here the total number of packets we need to
free is 'total_pkts', i.e. the ones in the intermediate queue we're flushing.

 > +    }
 > +
 > +    /* Reset pkt count */
 > +    txq->pkt_cnt = 0;
 > +
 > +    /* 'sum' refers to packets dropped */
 > +    return sum;
 > +}
 > +
 > +static int
 > +netdev_dpdk_vhost_txq_drain(struct netdev *netdev, int qid,
 > +                            bool concurrent_txq OVS_UNUSED)
 > +{
 > +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
 > +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
 > +
 > +    if (OVS_LIKELY(txq->pkt_cnt)) {
 > +        netdev_dpdk_vhost_tx_burst(dev, qid, 0);
 > +    }
 > +
 > +    return 0;
 > +}
 > +

Why not keep the netdev_dpdk_txq_drain() function from your previous patch,
or maybe add a small comment explaining why concurrent_txq is not needed?


 >  static void
 >  __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
 >                           struct dp_packet **pkts, int cnt)
 > @@ -1674,16 +1739,20 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
 >      struct rte_mbuf **cur_pkts = (struct rte_mbuf **) pkts;
 >      unsigned int total_pkts = cnt;
 >      unsigned int dropped = 0;
 > -    int i, retries = 0;
 > +    int i;
 >
 >      qid = dev->tx_q[qid % netdev->n_txq].map;
 > +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
 >
 >      if (OVS_UNLIKELY(!is_vhost_running(dev) || qid < 0
 >                       || !(dev->flags & NETDEV_UP))) {
 >          rte_spinlock_lock(&dev->stats_lock);
 >          dev->stats.tx_dropped+= cnt;
 >          rte_spinlock_unlock(&dev->stats_lock);
 > -        goto out;
 > +
 > +        for (i = 0; i < total_pkts; i++) {
 > +            dp_packet_delete(pkts[i]);
 > +        }

Should we not bail out here?

 >      }
 >
 >      rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
 > @@ -1693,34 +1762,21 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
 >      cnt = netdev_dpdk_qos_run(dev, cur_pkts, cnt);
 >      dropped = total_pkts - cnt;
 >
 > -    do {
 > -        int vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
 > -        unsigned int tx_pkts;
 > -
 > -        tx_pkts = rte_vhost_enqueue_burst(netdev_dpdk_get_vid(dev),
 > -                                          vhost_qid, cur_pkts, cnt);
 > -        if (OVS_LIKELY(tx_pkts)) {
 > -            /* Packets have been sent.*/
 > -            cnt -= tx_pkts;
 > -            /* Prepare for possible retry.*/
 > -            cur_pkts = &cur_pkts[tx_pkts];
 > -        } else {
 > -            /* No packets sent - do not retry.*/
 > -            break;
 > +    int idx = 0;
 > +    while (idx < cnt) {
 > +        txq->pkts[txq->pkt_cnt++] = pkts[idx++];
 > +
 > +        if (txq->pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) {
 > +            dropped += netdev_dpdk_vhost_tx_burst(dev, qid, dropped);
 >          }
 > -    } while (cnt && (retries++ <= VHOST_ENQ_RETRY_NUM));
 > +    }
 >
 >      rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
 >
 >      rte_spinlock_lock(&dev->stats_lock);
 >      netdev_dpdk_vhost_update_tx_counters(&dev->stats, pkts, total_pkts,
 > -                                         cnt + dropped);
 > +                                         dropped);
 >      rte_spinlock_unlock(&dev->stats_lock);
 > -
 > -out:
 > -    for (i = 0; i < total_pkts - dropped; i++) {
 > -        dp_packet_delete(pkts[i]);
 > -    }
 >  }
 >
 >  /* Tx function. Transmit packets indefinitely */
 > @@ -3247,7 +3303,7 @@ unlock:
 >                            SET_CONFIG, SET_TX_MULTIQ, SEND,    \
 >                            GET_CARRIER, GET_STATS,             \
 >                            GET_FEATURES, GET_STATUS,           \
 > -                          RECONFIGURE, RXQ_RECV)              \
 > +                          RECONFIGURE, RXQ_RECV, TXQ_DRAIN)   \
 > {                                                             \
 >      NAME,                                                     \
 >      true,                       /* is_pmd */                  \
 > @@ -3314,6 +3370,7 @@ unlock:
 >      RXQ_RECV,                                                 \
 >      NULL,                       /* rx_wait */                 \
 >      NULL,                       /* rxq_drain */               \
 > +    TXQ_DRAIN,                  /* txq_drain */               \
 >  }
 >
 >  static const struct netdev_class dpdk_class =
 > @@ -3330,7 +3387,8 @@ static const struct netdev_class dpdk_class =
 >          netdev_dpdk_get_features,
 >          netdev_dpdk_get_status,
 >          netdev_dpdk_reconfigure,
 > -        netdev_dpdk_rxq_recv);
 > +        netdev_dpdk_rxq_recv,
 > +        NULL);
 >
 >  static const struct netdev_class dpdk_ring_class =
 >      NETDEV_DPDK_CLASS(
 > @@ -3346,7 +3404,8 @@ static const struct netdev_class dpdk_ring_class =
 >          netdev_dpdk_get_features,
 >          netdev_dpdk_get_status,
 >          netdev_dpdk_reconfigure,
 > -        netdev_dpdk_rxq_recv);
 > +        netdev_dpdk_rxq_recv,
 > +        NULL);
 >
 >  static const struct netdev_class dpdk_vhost_class =
 >      NETDEV_DPDK_CLASS(
 > @@ -3362,7 +3421,8 @@ static const struct netdev_class dpdk_vhost_class =
 >          NULL,
 >          NULL,
 >          netdev_dpdk_vhost_reconfigure,
 > -        netdev_dpdk_vhost_rxq_recv);
 > +        netdev_dpdk_vhost_rxq_recv,
 > +        netdev_dpdk_vhost_txq_drain);
 >  static const struct netdev_class dpdk_vhost_client_class =
 >      NETDEV_DPDK_CLASS(
 >          "dpdkvhostuserclient",
 > @@ -3377,7 +3437,8 @@ static const struct netdev_class dpdk_vhost_client_class =
 >          NULL,
 >          NULL,
 >          netdev_dpdk_vhost_client_reconfigure,
 > -        netdev_dpdk_vhost_rxq_recv);
 > +        netdev_dpdk_vhost_rxq_recv,
 > +        netdev_dpdk_vhost_txq_drain);
 >
 >  void
 >  netdev_dpdk_register(void)
 > diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
 > index 0657434..4ef659e 100644
 > --- a/lib/netdev-dummy.c
 > +++ b/lib/netdev-dummy.c
 > @@ -1409,6 +1409,7 @@ netdev_dummy_update_flags(struct netdev *netdev_,
 >      netdev_dummy_rxq_recv,                                      \
 >      netdev_dummy_rxq_wait,                                      \
 >      netdev_dummy_rxq_drain,                                     \
 > +    NULL,                                                       \
 >  }
 >
 >  static const struct netdev_class dummy_class =
 > diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
 > index 9ff1333..79478ee 100644
 > --- a/lib/netdev-linux.c
 > +++ b/lib/netdev-linux.c
 > @@ -2830,6 +2830,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
 >      netdev_linux_rxq_recv,                                      \
 >      netdev_linux_rxq_wait,                                      \
 >      netdev_linux_rxq_drain,                                     \
 > +    NULL,                                                       \
 >  }
 >
 >  const struct netdev_class netdev_linux_class =
 > diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
 > index 8346fc4..5dd68db 100644
 > --- a/lib/netdev-provider.h
 > +++ b/lib/netdev-provider.h
 > @@ -769,6 +769,9 @@ struct netdev_class {
 >
 >      /* Discards all packets waiting to be received from 'rx'. */
 >      int (*rxq_drain)(struct netdev_rxq *rx);
 > +
 > +    /* Drain all packets waiting to be sent on queue 'qid'. */
 > +    int (*txq_drain)(struct netdev *netdev, int qid, bool concurrent_txq);
 >  };
 >
 >  int netdev_register_provider(const struct netdev_class *);
 > diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
 > index 39093e8..eb4b7d2 100644
 > --- a/lib/netdev-vport.c
 > +++ b/lib/netdev-vport.c
 > @@ -847,7 +847,8 @@ get_stats(const struct netdev *netdev, struct netdev_stats *stats)
 >      NULL,                   /* rx_dealloc */ \
 >      NULL,                   /* rx_recv */ \
 >      NULL,                   /* rx_wait */ \
 > -    NULL,                   /* rx_drain */
 > +    NULL,                   /* rx_drain */ \
 > +    NULL,                   /* tx_drain */
 >
 >
 >  #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER, PUSH_HEADER, POP_HEADER)   \
 > diff --git a/lib/netdev.c b/lib/netdev.c
 > index a8d8eda..b486b5d 100644
 > --- a/lib/netdev.c
 > +++ b/lib/netdev.c
 > @@ -678,6 +678,15 @@ netdev_rxq_drain(struct netdev_rxq *rx)
 >              : 0);
 >  }
 >
 > +/* Flush packets on the queue 'qid'. */
 > +int
 > +netdev_txq_drain(struct netdev *netdev, int qid, bool netdev_txq_drain)
 > +{
 > +    return (netdev->netdev_class->txq_drain
 > +            ? netdev->netdev_class->txq_drain(netdev, qid, netdev_txq_drain)
 > +            : EOPNOTSUPP);
 > +}
 > +
 >  /* Configures the number of tx queues of 'netdev'. Returns 0 if successful,
 >   * otherwise a positive errno value.
 >   *
 > diff --git a/lib/netdev.h b/lib/netdev.h
 > index d6c07c1..7ddd790 100644
 > --- a/lib/netdev.h
 > +++ b/lib/netdev.h
 > @@ -155,6 +155,7 @@ int netdev_rxq_drain(struct netdev_rxq *);
 >  int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
 >                  bool may_steal, bool concurrent_txq);
 >  void netdev_send_wait(struct netdev *, int qid);
 > +int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
 >
 >  /* native tunnel APIs */
 >  /* Structure to pass parameters required to build a tunnel header. */
