[ovs-dev] [PATCH v8 5/6] dpif-netdev: Time based output batching.

Kevin Traynor ktraynor at redhat.com
Tue Dec 19 18:45:59 UTC 2017


hi Ilya,

On 12/14/2017 11:59 AM, Ilya Maximets wrote:
> This allows to collect packets from more than one RX burst
> and send them together with a configurable intervals.
> 
> 'other_config:tx-flush-interval' can be used to configure
> time that a packet can wait in output batch for sending.
> 
> dpif-netdev turned to microsecond resolution for time
> measuring to ensure desired resolution of 'tx-flush-interval'.
> 
> Acked-by: Eelco Chaudron <echaudro at redhat.com>
> Signed-off-by: Ilya Maximets <i.maximets at samsung.com>
> ---
>  lib/dpif-netdev.c    | 149 +++++++++++++++++++++++++++++++++++++++------------
>  vswitchd/vswitch.xml |  16 ++++++
>  2 files changed, 131 insertions(+), 34 deletions(-)
> 
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index d7f6171..f5a7793 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -85,6 +85,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
>  #define MAX_RECIRC_DEPTH 6
>  DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
>  
> +/* Use instant packet send by default. */
> +#define DEFAULT_TX_FLUSH_INTERVAL 0
> +
>  /* Configuration parameters. */
>  enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
>  enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
> @@ -178,12 +181,13 @@ struct emc_cache {
>  
>  /* Simple non-wildcarding single-priority classifier. */
>  
> -/* Time in ms between successive optimizations of the dpcls subtable vector */
> -#define DPCLS_OPTIMIZATION_INTERVAL 1000
> +/* Time in microseconds between successive optimizations of the dpcls
> + * subtable vector */
> +#define DPCLS_OPTIMIZATION_INTERVAL 1000000LL
>  
> -/* Time in ms of the interval in which rxq processing cycles used in
> - * rxq to pmd assignments is measured and stored. */
> -#define PMD_RXQ_INTERVAL_LEN 10000
> +/* Time in microseconds of the interval in which rxq processing cycles used
> + * in rxq to pmd assignments is measured and stored. */
> +#define PMD_RXQ_INTERVAL_LEN 10000000LL
>  
>  /* Number of intervals for which cycles are stored
>   * and used during rxq to pmd assignment. */
> @@ -270,6 +274,9 @@ struct dp_netdev {
>      struct hmap ports;
>      struct seq *port_seq;       /* Incremented whenever a port changes. */
>  
> +    /* The time that a packet can wait in output batch for sending. */
> +    atomic_uint32_t tx_flush_interval;
> +
>      /* Meters. */
>      struct ovs_mutex meter_locks[N_METER_LOCKS];
>      struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
> @@ -356,7 +363,7 @@ enum rxq_cycles_counter_type {
>      RXQ_N_CYCLES
>  };
>  
> -#define XPS_TIMEOUT_MS 500LL
> +#define XPS_TIMEOUT 500000LL    /* In microseconds. */
>  
>  /* Contained by struct dp_netdev_port's 'rxqs' member.  */
>  struct dp_netdev_rxq {
> @@ -526,6 +533,7 @@ struct tx_port {
>      int qid;
>      long long last_used;
>      struct hmap_node node;
> +    long long flush_time;
>      struct dp_packet_batch output_pkts;
>  };
>  
> @@ -612,6 +620,9 @@ struct dp_netdev_pmd_thread {
>       * than 'cmap_count(dp->poll_threads)'. */
>      uint32_t static_tx_qid;
>  
> +    /* Number of filled output batches. */
> +    int n_output_batches;
> +
>      struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
>      /* List of rx queues to poll. */
>      struct hmap poll_list OVS_GUARDED;
> @@ -705,8 +716,9 @@ static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
>  static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
>                                         struct rxq_poll *poll)
>      OVS_REQUIRES(pmd->port_mutex);
> -static void
> -dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd);
> +static int
> +dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
> +                                   bool force);
>  
>  static void reconfigure_datapath(struct dp_netdev *dp)
>      OVS_REQUIRES(dp->port_mutex);
> @@ -797,7 +809,7 @@ emc_cache_slow_sweep(struct emc_cache *flow_cache)
>  static inline void
>  pmd_thread_ctx_time_update(struct dp_netdev_pmd_thread *pmd)
>  {
> -    pmd->ctx.now = time_msec();
> +    pmd->ctx.now = time_usec();
>  }
>  
>  /* Returns true if 'dpif' is a netdev or dummy dpif, false otherwise. */
> @@ -1297,6 +1309,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
>      conntrack_init(&dp->conntrack);
>  
>      atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
> +    atomic_init(&dp->tx_flush_interval, DEFAULT_TX_FLUSH_INTERVAL);
>  
>      cmap_init(&dp->poll_threads);
>  
> @@ -2967,7 +2980,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
>      dp_packet_batch_init_packet(&pp, execute->packet);
>      dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
>                                execute->actions, execute->actions_len);
> -    dp_netdev_pmd_flush_output_packets(pmd);
> +    dp_netdev_pmd_flush_output_packets(pmd, true);
>  
>      if (pmd->core_id == NON_PMD_CORE_ID) {
>          ovs_mutex_unlock(&dp->non_pmd_mutex);
> @@ -3016,6 +3029,16 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
>          smap_get_ullong(other_config, "emc-insert-inv-prob",
>                          DEFAULT_EM_FLOW_INSERT_INV_PROB);
>      uint32_t insert_min, cur_min;
> +    uint32_t tx_flush_interval, cur_tx_flush_interval;
> +
> +    tx_flush_interval = smap_get_int(other_config, "tx-flush-interval",
> +                                     DEFAULT_TX_FLUSH_INTERVAL);
> +    atomic_read_relaxed(&dp->tx_flush_interval, &cur_tx_flush_interval);
> +    if (tx_flush_interval != cur_tx_flush_interval) {
> +        atomic_store_relaxed(&dp->tx_flush_interval, tx_flush_interval);
> +        VLOG_INFO("Flushing interval for tx queues set to %"PRIu32" us",
> +                  tx_flush_interval);
> +    }
>  
>      if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
>          free(dp->pmd_cmask);
> @@ -3254,12 +3277,14 @@ dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx)
>      return processing_cycles;
>  }
>  
> -static void
> +static int
>  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
>                                     struct tx_port *p)
>  {
>      int tx_qid;
> +    int output_cnt;
>      bool dynamic_txqs;
> +    uint32_t tx_flush_interval;
>  
>      dynamic_txqs = p->port->dynamic_txqs;
>      if (dynamic_txqs) {
> @@ -3268,20 +3293,40 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
>          tx_qid = pmd->static_tx_qid;
>      }
>  
> +    output_cnt = dp_packet_batch_size(&p->output_pkts);
> +    ovs_assert(output_cnt > 0);
> +
>      netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
>      dp_packet_batch_init(&p->output_pkts);
> +
> +    /* Update time of the next flush. */
> +    atomic_read_relaxed(&pmd->dp->tx_flush_interval, &tx_flush_interval);
> +    p->flush_time = pmd->ctx.now + tx_flush_interval;
> +
> +    ovs_assert(pmd->n_output_batches > 0);
> +    pmd->n_output_batches--;
> +
> +    return output_cnt;
>  }
>  
> -static void
> -dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd)
> +static int
> +dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
> +                                   bool force)
>  {
>      struct tx_port *p;
> +    int output_cnt = 0;
> +
> +    if (!pmd->n_output_batches) {
> +        return 0;
> +    }
>  
>      HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
> -        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
> -            dp_netdev_pmd_flush_output_on_port(pmd, p);
> +        if (!dp_packet_batch_is_empty(&p->output_pkts)
> +            && (force || pmd->ctx.now >= p->flush_time)) {
> +            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p);
>          }
>      }
> +    return output_cnt;
>  }
>  
>  static int
> @@ -3291,7 +3336,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>  {
>      struct dp_packet_batch batch;
>      int error;
> -    int batch_cnt = 0;
> +    int batch_cnt = 0, output_cnt = 0;
>  
>      dp_packet_batch_init(&batch);
>      error = netdev_rxq_recv(rx, &batch);
> @@ -3301,7 +3346,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>  
>          batch_cnt = batch.count;
>          dp_netdev_input(pmd, &batch, port_no);
> -        dp_netdev_pmd_flush_output_packets(pmd);
> +        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
>      } else if (error != EAGAIN && error != EOPNOTSUPP) {
>          static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>  
> @@ -3309,7 +3354,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>                      netdev_rxq_get_name(rx), ovs_strerror(error));
>      }
>  
> -    return batch_cnt;
> +    return batch_cnt + output_cnt;
>  }
>  
>  static struct tx_port *
> @@ -3932,7 +3977,8 @@ dpif_netdev_run(struct dpif *dpif)
>      struct dp_netdev *dp = get_dp_netdev(dpif);
>      struct dp_netdev_pmd_thread *non_pmd;
>      uint64_t new_tnl_seq;
> -    int process_packets = 0;
> +    int process_packets;
> +    bool need_to_flush = true;
>  
>      ovs_mutex_lock(&dp->port_mutex);
>      non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
> @@ -3952,11 +3998,25 @@ dpif_netdev_run(struct dpif *dpif)
>                                                process_packets
>                                                ? PMD_CYCLES_PROCESSING
>                                                : PMD_CYCLES_IDLE);
> +                    if (process_packets) {
> +                        need_to_flush = false;
> +                    }
>                  }
>              }
>          }
> +        if (need_to_flush) {
> +            /* We didn't receive anything in the process loop.
> +             * Check if we need to send something.
> +             * There was no time updates on current iteration. */
> +            pmd_thread_ctx_time_update(non_pmd);
> +            process_packets = dp_netdev_pmd_flush_output_packets(non_pmd,
> +                                                                 false);
> +            cycles_count_intermediate(non_pmd, NULL, process_packets
> +                                                     ? PMD_CYCLES_PROCESSING
> +                                                     : PMD_CYCLES_IDLE);
> +        }
> +
>          cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
> -        pmd_thread_ctx_time_update(non_pmd);
>          dpif_netdev_xps_revalidate_pmd(non_pmd, false);
>          ovs_mutex_unlock(&dp->non_pmd_mutex);
>  
> @@ -4007,6 +4067,8 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
>  {
>      struct tx_port *tx_port_cached;
>  
> +    /* Flush all the queued packets. */
> +    dp_netdev_pmd_flush_output_packets(pmd, true);
>      /* Free all used tx queue ids. */
>      dpif_netdev_xps_revalidate_pmd(pmd, true);
>  
> @@ -4105,7 +4167,6 @@ pmd_thread_main(void *f_)
>      bool exiting;
>      int poll_cnt;
>      int i;
> -    int process_packets = 0;
>  
>      poll_list = NULL;
>  
> @@ -4135,6 +4196,9 @@ reload:
>  
>      cycles_count_start(pmd);
>      for (;;) {
> +        int process_packets;
> +        bool need_to_flush = true;
> +
>          for (i = 0; i < poll_cnt; i++) {
>              process_packets =
>                  dp_netdev_process_rxq_port(pmd, poll_list[i].rxq->rx,
> @@ -4142,6 +4206,20 @@ reload:
>              cycles_count_intermediate(pmd, poll_list[i].rxq,
>                                        process_packets ? PMD_CYCLES_PROCESSING
>                                                        : PMD_CYCLES_IDLE);
> +            if (process_packets) {
> +                need_to_flush = false;
> +            }
> +        }
> +
> +        if (need_to_flush) {
> +            /* We didn't receive anything in the process loop.
> +             * Check if we need to send something.
> +             * There was no time updates on current iteration. */
> +            pmd_thread_ctx_time_update(pmd);
> +            process_packets = dp_netdev_pmd_flush_output_packets(pmd, false);
> +            cycles_count_intermediate(pmd, NULL,
> +                                      process_packets ? PMD_CYCLES_PROCESSING
> +                                                      : PMD_CYCLES_IDLE);

I noticed the processing cycles for an rxq are not counted here. It
means those processing cycles to tx pkts will no longer be reflected in
the rxq to pmd assignment (or any rxq stats). I realize the tx
processing cycles are now shared so we will have some inaccuracy anyway
but for an individual rxq that has to send to vhost, it could be a
significant % of it's measured cycles.

OTOH, this code seems like it would only hit when there are low rates
(no packets on any queue during the last poll)? so I'm not sure how
significant it would be in the overall rxq-pmd assignment. e.g. if the
rxq is measured as using 2% or 7% of a pmd it's probably fine for
rxq-pmd assignment, whereas 20% or 70% could create a very imbalanced
distribution.

If it was significant for rxq-pmd assignment, I'm thinking the best way
would be to add in the cycles required to tx flush each port to each rxq
that has packets in the flush. It's over counting rather than under
counting but we can't assume any shared batching after a new assignment.

Let me know what you think? Do you think it would only impact the rxq
measurements during low traffic rates?

Kevin.

>          }
>  
>          if (lc++ > 1024) {
> @@ -4150,9 +4228,6 @@ reload:
>              lc = 0;
>  
>              coverage_try_clear();
> -            /* It's possible that the time was not updated on current
> -             * iteration, if there were no received packets. */
> -            pmd_thread_ctx_time_update(pmd);
>              dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
>              if (!ovsrcu_try_quiesce()) {
>                  emc_cache_slow_sweep(&pmd->flow_cache);
> @@ -4238,7 +4313,7 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
>      memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
>  
>      /* All packets will hit the meter at the same time. */
> -    long_delta_t = (now - meter->used); /* msec */
> +    long_delta_t = (now - meter->used) / 1000; /* msec */
>  
>      /* Make sure delta_t will not be too large, so that bucket will not
>       * wrap around below. */
> @@ -4394,7 +4469,7 @@ dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id *meter_id,
>          meter->flags = config->flags;
>          meter->n_bands = config->n_bands;
>          meter->max_delta_t = 0;
> -        meter->used = time_msec();
> +        meter->used = time_usec();
>  
>          /* set up bands */
>          for (i = 0; i < config->n_bands; ++i) {
> @@ -4592,6 +4667,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>      pmd->core_id = core_id;
>      pmd->numa_id = numa_id;
>      pmd->need_reload = false;
> +    pmd->n_output_batches = 0;
>  
>      ovs_refcount_init(&pmd->ref_cnt);
>      latch_init(&pmd->exit_latch);
> @@ -4779,6 +4855,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
>  
>      tx->port = port;
>      tx->qid = -1;
> +    tx->flush_time = 0LL;
>      dp_packet_batch_init(&tx->output_pkts);
>  
>      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
> @@ -4942,7 +5019,7 @@ packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,
>      struct dp_netdev_flow *flow = batch->flow;
>  
>      dp_netdev_flow_used(flow, batch->array.count, batch->byte_count,
> -                        batch->tcp_flags, pmd->ctx.now);
> +                        batch->tcp_flags, pmd->ctx.now / 1000);
>  
>      actions = dp_netdev_flow_get_actions(flow);
>  
> @@ -5317,7 +5394,7 @@ dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
>              continue;
>          }
>          interval = pmd->ctx.now - tx->last_used;
> -        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {
> +        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT)) {
>              port = tx->port;
>              ovs_mutex_lock(&port->txq_used_mutex);
>              port->txq_used[tx->qid]--;
> @@ -5338,7 +5415,7 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
>      interval = pmd->ctx.now - tx->last_used;
>      tx->last_used = pmd->ctx.now;
>  
> -    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {
> +    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT)) {
>          return tx->qid;
>      }
>  
> @@ -5470,12 +5547,16 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
>                  dp_netdev_pmd_flush_output_on_port(pmd, p);
>              }
>  #endif
> -            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
> -                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
> -                /* Some packets was generated while input batch processing.
> -                 * Flush here to avoid overflow. */
> +            if (dp_packet_batch_size(&p->output_pkts)
> +                + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
> +                /* Flush here to avoid overflow. */
>                  dp_netdev_pmd_flush_output_on_port(pmd, p);
>              }
> +
> +            if (dp_packet_batch_is_empty(&p->output_pkts)) {
> +                pmd->n_output_batches++;
> +            }
> +
>              DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
>                  dp_packet_batch_add(&p->output_pkts, packet);
>              }
> @@ -5717,7 +5798,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
>          conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, force,
>                            commit, zone, setmark, setlabel, aux->flow->tp_src,
>                            aux->flow->tp_dst, helper, nat_action_info_ref,
> -                          pmd->ctx.now);
> +                          pmd->ctx.now / 1000);
>          break;
>      }
>  
> diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
> index 21ffaf5..bc6b1be 100644
> --- a/vswitchd/vswitch.xml
> +++ b/vswitchd/vswitch.xml
> @@ -359,6 +359,22 @@
>          </p>
>        </column>
>  
> +      <column name="other_config" key="tx-flush-interval"
> +              type='{"type": "integer",
> +                     "minInteger": 0, "maxInteger": 1000000}'>
> +        <p>
> +          Specifies the time in microseconds that a packet can wait in output
> +          batch for sending i.e. amount of time that packet can spend in an
> +          intermediate output queue before sending to netdev.
> +          This option can be used to configure balance between throughput
> +          and latency. Lower values decreases latency while higher values
> +          may be useful to achieve higher performance.
> +        </p>
> +        <p>
> +          Defaults to 0 i.e. instant packet sending (latency optimized).
> +        </p>
> +      </column>
> +
>        <column name="other_config" key="n-handler-threads"
>                type='{"type": "integer", "minInteger": 1}'>
>          <p>
> 



More information about the dev mailing list