[ovs-dev] [PATCH v2 1/2] netdev-dpdk: Use intermediate queue during packet transmission.

Fischetti, Antonio antonio.fischetti at intel.com
Wed Jan 18 11:30:52 UTC 2017


Thanks Ilya for your review, please see my comments inline.

Regards,
Antonio

> -----Original Message-----
> From: Ilya Maximets [mailto:i.maximets at samsung.com]
> Sent: Wednesday, January 18, 2017 7:37 AM
> To: Fischetti, Antonio <antonio.fischetti at intel.com>; dev at openvswitch.org
> Cc: aconole at redhat.com; diproiettod at vmware.com; Bodireddy, Bhanuprakash
> <bhanuprakash.bodireddy at intel.com>; markus.magnusson at ericsson.com
> Subject: Re: [PATCH v2 1/2] netdev-dpdk: Use intermediate queue during
> packet transmission.
> 
> Not a complete review. This code is full of races.
> See details inline.
> 
> Best regards, Ilya Maximets.
> 
> On 17.01.2017 18:37, antonio.fischetti at intel.com wrote:
> > This patch implements the intermediate Tx queues on 'dpdk' type ports.
> >
> > Test results:
> >  * In worst case scenario with fewer packets per batch, a significant
> >    bottleneck is observed for netdev_dpdk_eth_send() function due to
> >    expensive MMIO writes.
> >
> >  * Also its observed that CPI(cycles per instruction) Rate for the
> function
> >    stood between 3.15 and 4.1 which is significantly higher than
> acceptable
> >    limit of 1.0 for HPC applications and theoretical limit of 0.25 (As
> Backend
> >    pipeline can retire 4 micro-operations in a cycle).
> >
> >  * With this patch, CPI for netdev_dpdk_eth_send() is at 0.55 and the
> overall
> >    throughput improved significantly.
> >
> >
> > Signed-off-by: Antonio Fischetti <antonio.fischetti at intel.com>
> > Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy at intel.com>
> > Co-authored-by: Bhanuprakash Bodireddy
> <bhanuprakash.bodireddy at intel.com>
> > Signed-off-by: Markus Magnusson <markus.magnusson at ericsson.com>
> > Co-authored-by: Markus Magnusson <markus.magnusson at ericsson.com>
> > ---
> >  lib/dpif-netdev.c     | 53 +++++++++++++++++++++++++++++++--
> >  lib/netdev-bsd.c      |  1 +
> >  lib/netdev-dpdk.c     | 82
> ++++++++++++++++++++++++++++++++++++++++++++++-----
> >  lib/netdev-dummy.c    |  1 +
> >  lib/netdev-linux.c    |  1 +
> >  lib/netdev-provider.h |  8 +++++
> >  lib/netdev-vport.c    |  3 +-
> >  lib/netdev.c          |  9 ++++++
> >  lib/netdev.h          |  1 +
> >  9 files changed, 149 insertions(+), 10 deletions(-)
> >
> > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> > index 3901129..58ac429 100644
> > --- a/lib/dpif-netdev.c
> > +++ b/lib/dpif-netdev.c
> > @@ -289,6 +289,8 @@ struct dp_netdev_rxq {
> >      struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll
> this queue. */
> >  };
> >
> > +#define LAST_USED_QID_NONE -1
> > +
> >  /* A port in a netdev-based datapath. */
> >  struct dp_netdev_port {
> >      odp_port_t port_no;
> > @@ -303,6 +305,8 @@ struct dp_netdev_port {
> >      char *type;                 /* Port type as requested by user. */
> >      char *rxq_affinity_list;    /* Requested affinity of rx queues. */
> >      bool need_reconfigure;      /* True if we should reconfigure
> netdev. */
> > +    int last_used_qid;          /* Last queue id where packets could be
> > +                                   enqueued. */
> >  };
> >
> >  /* Contained by struct dp_netdev_flow's 'stats' member.  */
> > @@ -619,6 +623,9 @@ static int dpif_netdev_xps_get_tx_qid(const struct
> dp_netdev_pmd_thread *pmd,
> >  static inline bool emc_entry_alive(struct emc_entry *ce);
> >  static void emc_clear_entry(struct emc_entry *ce);
> >
> > +static struct tx_port *pmd_send_port_cache_lookup
> > +(const struct dp_netdev_pmd_thread *pmd, odp_port_t port_no);
> > +
> >  static void
> >  emc_cache_init(struct emc_cache *flow_cache)
> >  {
> > @@ -3507,15 +3514,19 @@ pmd_load_queues_and_ports(struct
> dp_netdev_pmd_thread *pmd,
> >      return i;
> >  }
> >
> > +enum { DRAIN_TSC = 20000ULL };
> > +
> >  static void *
> >  pmd_thread_main(void *f_)
> >  {
> >      struct dp_netdev_pmd_thread *pmd = f_;
> > -    unsigned int lc = 0;
> > +    unsigned int lc = 0, lc_drain = 0;
> >      struct polled_queue *poll_list;
> >      bool exiting;
> >      int poll_cnt;
> >      int i;
> > +    uint64_t prev = 0, now = 0;
> > +    struct tx_port *tx_port;
> >
> >      poll_list = NULL;
> >
> > @@ -3548,6 +3559,26 @@ reload:
> >                                         poll_list[i].port_no);
> >          }
> >
> > +#define MAX_LOOP_TO_DRAIN 128
> > +        if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
> > +            lc_drain = 0;
> > +            prev = now;
> > +            now = pmd->last_cycles;
> > +            if ((now - prev) > DRAIN_TSC) {
> > +                HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {
> 
> 'pmd->tx_ports' must be protected by 'pmd->port_mutex'. Also it can be
> changed
> while pmd still working. I think you wanted something like 'pmd-
> >send_port_cache'.

Ok, I will replace it with
HMAP_FOR_EACH (tx_port, node, &pmd->send_port_cache)


> 
> > +                    if (tx_port->port->last_used_qid !=
> LAST_USED_QID_NONE) {
> > +                        /* This queue may contain some buffered packets
> waiting
> > +                         * to be sent out. */
> > +                        netdev_txq_drain(tx_port->port->netdev,
> > +                                tx_port->port->last_used_qid,
> > +                                tx_port->port->dynamic_txqs);
> > +                        /* Mark it as empty. */
> > +                        tx_port->port->last_used_qid =
> LAST_USED_QID_NONE;
> 
> 'port' is a pointer to the common structure --> 'port->last_used_qid' will
> be
> concurrently updated by all threads --> total mess.

Ok, will fix this. What about moving 'last_used_qid' inside 'struct tx_port'?
This way, if I loop on send_port_cache like

    HMAP_FOR_EACH (tx_port, node, &pmd->send_port_cache)

then I could refer to
    tx_port->last_used_qid

with no concurrency issue.
Do you think this could be ok? Any suggestion will help, thanks.



> 
> > +                    }
> > +                }
> > +            }
> > +        }
> > +
> >          if (lc++ > 1024) {
> >              bool reload;
> >
> > @@ -3883,6 +3914,7 @@ dp_netdev_add_port_tx_to_pmd(struct
> dp_netdev_pmd_thread *pmd,
> >
> >      tx->port = port;
> >      tx->qid = -1;
> > +    port->last_used_qid = LAST_USED_QID_NONE;
> >
> >      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port-
> >port_no));
> >      pmd->need_reload = true;
> > @@ -4538,7 +4570,24 @@ dp_execute_cb(void *aux_, struct dp_packet_batch
> *packets_,
> >              } else {
> >                  tx_qid = pmd->static_tx_qid;
> >              }
> > +//TODO Add UNLIKELY to the 1st condition?
> > +            /* Is the current qid the same as the last one we used? */
> > +            if ((p->port->last_used_qid != LAST_USED_QID_NONE) &&
> > +                (p->port->last_used_qid != tx_qid)) {
> > +                /* The current assigned queue was changed, we need to
> drain
> > +                 * packets from the previous queue. */
> > +                netdev_txq_drain(p->port->netdev,
> > +                        p->port->last_used_qid,
> > +                        p->port->dynamic_txqs);
> > +                p->port->last_used_qid = LAST_USED_QID_NONE;
> > +            }
> >
> > +            /* In case these packets gets buffered into an intermediate
> > +             * queue and XPS is enabled the drain function could find a
> > +             * different Tx qid assigned to its thread.  We keep track
> > +             * of the qid we're now using, that will trigger the drain
> > +             * function and will select the right queue to flush. */
> > +            p->port->last_used_qid = tx_qid;
> >              netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
> >                          dynamic_txqs);
> >              return;
> > @@ -4952,7 +5001,7 @@ dpif_dummy_register(enum dummy_level level)
> >                               "dp port new-number",
> >                               3, 3, dpif_dummy_change_port_number,
> NULL);
> >  }
> > -

> > +
> >  /* Datapath Classifier. */
> >
> >  /* A set of rules that all have the same fields wildcarded. */
> > diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
> > index 94c515d..00d5263 100644
> > --- a/lib/netdev-bsd.c
> > +++ b/lib/netdev-bsd.c
> > @@ -1547,6 +1547,7 @@ netdev_bsd_update_flags(struct netdev *netdev_,
> enum netdev_flags off,
> >      netdev_bsd_rxq_recv,                             \
> >      netdev_bsd_rxq_wait,                             \
> >      netdev_bsd_rxq_drain,                            \
> > +    NULL,                                            \
> >  }
> >
> >  const struct netdev_class netdev_bsd_class =
> > diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> > index 94568a1..d560bf6 100644
> > --- a/lib/netdev-dpdk.c
> > +++ b/lib/netdev-dpdk.c
> > @@ -166,7 +166,6 @@ static const struct rte_eth_conf port_conf = {
> >
> >  enum { DPDK_RING_SIZE = 256 };
> >  BUILD_ASSERT_DECL(IS_POW2(DPDK_RING_SIZE));
> > -enum { DRAIN_TSC = 200000ULL };
> >
> >  enum dpdk_dev_type {
> >      DPDK_DEV_ETH = 0,
> > @@ -289,12 +288,18 @@ struct dpdk_mp {
> >  /* There should be one 'struct dpdk_tx_queue' created for
> >   * each cpu core. */
> >  struct dpdk_tx_queue {
> > +    int count;                     /* Number of buffered packets
> waiting to
> > +                                      be sent. */
> >      rte_spinlock_t tx_lock;        /* Protects the members and the NIC
> queue
> >                                      * from concurrent access.  It is
> used only
> >                                      * if the queue is shared among
> different
> >                                      * pmd threads (see
> 'concurrent_txq'). */
> >      int map;                       /* Mapping of configured vhost-user
> queues
> >                                      * to enabled by guest. */
> > +    struct rte_mbuf *burst_pkts[NETDEV_MAX_BURST];
> > +                                   /* Intermediate queues where packets
> can
> > +                                    * be buffered to amortize the cost
> of MMIO
> > +                                    * writes. */
> >  };
> >
> >  /* dpdk has no way to remove dpdk ring ethernet devices
> > @@ -1381,6 +1386,7 @@ static inline int
> >  netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
> >                           struct rte_mbuf **pkts, int cnt)
> >  {
> > +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> >      uint32_t nb_tx = 0;
> >
> >      while (nb_tx != cnt) {
> > @@ -1404,6 +1410,8 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev,
> int qid,
> >          }
> >      }
> >
> > +    txq->count = cnt - nb_tx;
> > +
> >      return cnt - nb_tx;
> >  }
> >
> > @@ -1788,6 +1796,37 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid,
> struct dp_packet_batch *batch)
> >      }
> >  }
> >
> > +/* Enqueue packets in an intermediate queue and call the burst
> > + * function when the queue is full.  This way we can amortize the
> > + * cost of MMIO writes. */
> > +static inline int
> > +netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid,
> > +                            struct rte_mbuf **pkts, int cnt)
> > +{
> > +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> > +
> > +    int i = 0;
> > +    int dropped = 0;
> > +
> > +    while (i < cnt) {
> > +        int freeslots = NETDEV_MAX_BURST - txq->count;
> > +        int tocopy = MIN(freeslots, cnt-i);
> > +
> > +        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
> > +               tocopy * sizeof (struct rte_mbuf *));
> > +
> > +        txq->count += tocopy;
> > +        i += tocopy;
> > +
> > +        /* Queue full, burst the packets */
> > +        if (txq->count >= NETDEV_MAX_BURST) {
> > +           dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq-
> >burst_pkts,
> > +                   txq->count);
> > +        }
> > +    }
> > +    return dropped;
> > +}
> > +
> >  static int
> >  netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
> >                         struct dp_packet_batch *batch,
> > @@ -1836,7 +1875,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int
> qid,
> >          cnt = netdev_dpdk_qos_run(dev, pkts, cnt);
> >          dropped = batch->count - cnt;
> >
> > -        dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt);
> > +        dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt);
> >
> >          if (OVS_UNLIKELY(dropped)) {
> >              rte_spinlock_lock(&dev->stats_lock);
> > @@ -1850,6 +1889,30 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int
> qid,
> >      }
> >  }
> >
> > +/* Drain tx queues, this is called periodically to empty the
> > + * intermediate queue in case of few packets (< NETDEV_MAX_BURST)
> > + * are buffered into the queue. */
> > +static int
> > +netdev_dpdk_txq_drain(struct netdev *netdev, int qid, bool
> concurrent_txq)
> > +{
> > +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
> > +    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
> > +
> > +    if (OVS_LIKELY(txq->count)) {
> > +        if (OVS_UNLIKELY(concurrent_txq)) {
> > +            qid = qid % dev->up.n_txq;  // TODO: do we need this?
> > +            rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
> > +        }
> > +
> > +        netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts, txq-
> >count);
> > +
> > +        if (OVS_UNLIKELY(concurrent_txq)) {
> > +            rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
> > +        }
> > +    }
> > +    return 0;
> > +}
> > +
> >  static int
> >  netdev_dpdk_eth_send(struct netdev *netdev, int qid,
> >                       struct dp_packet_batch *batch, bool may_steal,
> > @@ -3243,7 +3306,7 @@ unlock:
> >                            SET_CONFIG, SET_TX_MULTIQ, SEND,    \
> >                            GET_CARRIER, GET_STATS,             \
> >                            GET_FEATURES, GET_STATUS,           \
> > -                          RECONFIGURE, RXQ_RECV)              \
> > +                          RECONFIGURE, RXQ_RECV, TXQ_DRAIN)   \
> >  {                                                             \
> >      NAME,                                                     \
> >      true,                       /* is_pmd */                  \
> > @@ -3310,6 +3373,7 @@ unlock:
> >      RXQ_RECV,                                                 \
> >      NULL,                       /* rx_wait */                 \
> >      NULL,                       /* rxq_drain */               \
> > +    TXQ_DRAIN,      /* txq_drain */               \
> >  }
> >
> >  static const struct netdev_class dpdk_class =
> > @@ -3326,7 +3390,8 @@ static const struct netdev_class dpdk_class =
> >          netdev_dpdk_get_features,
> >          netdev_dpdk_get_status,
> >          netdev_dpdk_reconfigure,
> > -        netdev_dpdk_rxq_recv);
> > +        netdev_dpdk_rxq_recv,
> > +        netdev_dpdk_txq_drain);
> >
> >  static const struct netdev_class dpdk_ring_class =
> >      NETDEV_DPDK_CLASS(
> > @@ -3342,7 +3407,8 @@ static const struct netdev_class dpdk_ring_class =
> >          netdev_dpdk_get_features,
> >          netdev_dpdk_get_status,
> >          netdev_dpdk_reconfigure,
> > -        netdev_dpdk_rxq_recv);
> > +        netdev_dpdk_rxq_recv,
> > +        NULL);
> >
> >  static const struct netdev_class dpdk_vhost_class =
> >      NETDEV_DPDK_CLASS(
> > @@ -3358,7 +3424,8 @@ static const struct netdev_class dpdk_vhost_class
> =
> >          NULL,
> >          NULL,
> >          netdev_dpdk_vhost_reconfigure,
> > -        netdev_dpdk_vhost_rxq_recv);
> > +        netdev_dpdk_vhost_rxq_recv,
> > +        NULL);
> >  static const struct netdev_class dpdk_vhost_client_class =
> >      NETDEV_DPDK_CLASS(
> >          "dpdkvhostuserclient",
> > @@ -3373,7 +3440,8 @@ static const struct netdev_class
> dpdk_vhost_client_class =
> >          NULL,
> >          NULL,
> >          netdev_dpdk_vhost_client_reconfigure,
> > -        netdev_dpdk_vhost_rxq_recv);
> > +        netdev_dpdk_vhost_rxq_recv,
> > +        NULL);
> >
> >  void
> >  netdev_dpdk_register(void)
> > diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
> > index e6e36cd..6a8ad45 100644
> > --- a/lib/netdev-dummy.c
> > +++ b/lib/netdev-dummy.c
> > @@ -1409,6 +1409,7 @@ netdev_dummy_update_flags(struct netdev *netdev_,
> >      netdev_dummy_rxq_recv,                                      \
> >      netdev_dummy_rxq_wait,                                      \
> >      netdev_dummy_rxq_drain,                                     \
> > +    NULL,                                                       \
> >  }
> >
> >  static const struct netdev_class dummy_class =
> > diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
> > index a5a9ec1..2499b3e 100644
> > --- a/lib/netdev-linux.c
> > +++ b/lib/netdev-linux.c
> > @@ -2831,6 +2831,7 @@ netdev_linux_update_flags(struct netdev *netdev_,
> enum netdev_flags off,
> >      netdev_linux_rxq_recv,                                      \
> >      netdev_linux_rxq_wait,                                      \
> >      netdev_linux_rxq_drain,                                     \
> > +    NULL,                                                       \
> >  }
> >
> >  const struct netdev_class netdev_linux_class =
> > diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
> > index 8346fc4..97e72c6 100644
> > --- a/lib/netdev-provider.h
> > +++ b/lib/netdev-provider.h
> > @@ -335,6 +335,11 @@ struct netdev_class {
> >       * If the function returns a non-zero value, some of the packets
> might have
> >       * been sent anyway.
> >       *
> > +     * Some netdev provider - like in case of 'dpdk' - may buffer the
> batch
> > +     * of packets into an intermediate queue.  Buffered packets will be
> sent
> > +     * out when their number will exceed a threshold or by the periodic
> call
> > +     * to the drain function.
> > +     *
> >       * If 'may_steal' is false, the caller retains ownership of all the
> >       * packets.  If 'may_steal' is true, the caller transfers ownership
> of all
> >       * the packets to the network device, regardless of success.
> > @@ -769,6 +774,9 @@ struct netdev_class {
> >
> >      /* Discards all packets waiting to be received from 'rx'. */
> >      int (*rxq_drain)(struct netdev_rxq *rx);
> > +
> > +    /* Drain all packets waiting to be sent on queue 'qid'. */
> > +    int (*txq_drain)(struct netdev *netdev, int qid, bool
> concurrent_txq);
> >  };
> >
> >  int netdev_register_provider(const struct netdev_class *);
> > diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
> > index 4c2ced5..77225b8 100644
> > --- a/lib/netdev-vport.c
> > +++ b/lib/netdev-vport.c
> > @@ -838,7 +838,8 @@ get_stats(const struct netdev *netdev, struct
> netdev_stats *stats)
> >      NULL,                   /* rx_dealloc */                \
> >      NULL,                   /* rx_recv */                   \
> >      NULL,                   /* rx_wait */                   \
> > -    NULL,                   /* rx_drain */
> > +    NULL,                   /* rx_drain */                  \
> > +    NULL,                   /* tx_drain */
> >
> >
> >  #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER, PUSH_HEADER,
> POP_HEADER)   \
> > diff --git a/lib/netdev.c b/lib/netdev.c
> > index 839b1f6..0d48e41 100644
> > --- a/lib/netdev.c
> > +++ b/lib/netdev.c
> > @@ -670,6 +670,15 @@ netdev_rxq_drain(struct netdev_rxq *rx)
> >              : 0);
> >  }
> >
> > +/* Flush packets on the queue 'qid'. */
> > +int
> > +netdev_txq_drain(struct netdev *netdev, int qid, bool netdev_txq_drain)
> > +{
> > +    return (netdev->netdev_class->txq_drain
> > +            ? netdev->netdev_class->txq_drain(netdev, qid,
> netdev_txq_drain)
> > +            : EOPNOTSUPP);
> > +}
> > +
> >  /* Configures the number of tx queues of 'netdev'. Returns 0 if
> successful,
> >   * otherwise a positive errno value.
> >   *
> > diff --git a/lib/netdev.h b/lib/netdev.h
> > index bef9cdd..49969a1 100644
> > --- a/lib/netdev.h
> > +++ b/lib/netdev.h
> > @@ -153,6 +153,7 @@ int netdev_rxq_drain(struct netdev_rxq *);
> >  int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
> >                  bool may_steal, bool concurrent_txq);
> >  void netdev_send_wait(struct netdev *, int qid);
> > +int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
> >
> >  /* native tunnel APIs */
> >  /* Structure to pass parameters required to build a tunnel header. */
> >


More information about the dev mailing list