[ovs-dev] [PATCH v5 6/7] dpif-netdev: Time based output batching.
Ilya Maximets
i.maximets at samsung.com
Fri Oct 27 16:19:46 UTC 2017
This allows to collect packets from more than one RX burst
and send them together with a configurable intervals.
'other_config:tx-flush-interval' can be used to configure
time that a packet can wait in output batch for sending.
dpif-netdev turned to microsecond resolution for time
measuring to ensure desired resolution of 'tx-flush-interval'.
Signed-off-by: Ilya Maximets <i.maximets at samsung.com>
---
lib/dpif-netdev.c | 149 +++++++++++++++++++++++++++++++++++++++------------
vswitchd/vswitch.xml | 16 ++++++
2 files changed, 131 insertions(+), 34 deletions(-)
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index cf11352..2c05494 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -85,6 +85,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
#define MAX_RECIRC_DEPTH 5
DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
+/* Use instant packet send by default. */
+#define DEFAULT_TX_FLUSH_INTERVAL 0
+
/* Configuration parameters. */
enum { MAX_FLOWS = 65536 }; /* Maximum number of flows in flow table. */
enum { MAX_METERS = 65536 }; /* Maximum number of meters. */
@@ -178,12 +181,13 @@ struct emc_cache {
/* Simple non-wildcarding single-priority classifier. */
-/* Time in ms between successive optimizations of the dpcls subtable vector */
-#define DPCLS_OPTIMIZATION_INTERVAL 1000
+/* Time in microseconds between successive optimizations of the dpcls
+ * subtable vector */
+#define DPCLS_OPTIMIZATION_INTERVAL 1000000LL
-/* Time in ms of the interval in which rxq processing cycles used in
- * rxq to pmd assignments is measured and stored. */
-#define PMD_RXQ_INTERVAL_LEN 10000
+/* Time in microseconds of the interval in which rxq processing cycles used
+ * in rxq to pmd assignments is measured and stored. */
+#define PMD_RXQ_INTERVAL_LEN 10000000LL
/* Number of intervals for which cycles are stored
* and used during rxq to pmd assignment. */
@@ -270,6 +274,9 @@ struct dp_netdev {
struct hmap ports;
struct seq *port_seq; /* Incremented whenever a port changes. */
+ /* The time that a packet can wait in output batch for sending. */
+ atomic_uint32_t tx_flush_interval;
+
/* Meters. */
struct ovs_mutex meter_locks[N_METER_LOCKS];
struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
@@ -356,7 +363,7 @@ enum rxq_cycles_counter_type {
RXQ_N_CYCLES
};
-#define XPS_TIMEOUT_MS 500LL
+#define XPS_TIMEOUT 500000LL /* In microseconds. */
/* Contained by struct dp_netdev_port's 'rxqs' member. */
struct dp_netdev_rxq {
@@ -526,6 +533,7 @@ struct tx_port {
int qid;
long long last_used;
struct hmap_node node;
+ long long flush_time;
struct dp_packet_batch output_pkts;
};
@@ -614,6 +622,9 @@ struct dp_netdev_pmd_thread {
* than 'cmap_count(dp->poll_threads)'. */
uint32_t static_tx_qid;
+ /* Number of filled output batches. */
+ int n_output_batches;
+
struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. */
/* List of rx queues to poll. */
struct hmap poll_list OVS_GUARDED;
@@ -707,8 +718,9 @@ static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
struct rxq_poll *poll)
OVS_REQUIRES(pmd->port_mutex);
-static void
-dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd);
+static int
+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
+ bool force);
static void reconfigure_datapath(struct dp_netdev *dp)
OVS_REQUIRES(dp->port_mutex);
@@ -783,7 +795,7 @@ emc_cache_slow_sweep(struct emc_cache *flow_cache)
static inline void
pmd_thread_ctx_time_update(struct dp_netdev_pmd_thread *pmd)
{
- pmd->ctx.now = time_msec();
+ pmd->ctx.now = time_usec();
}
/* Returns true if 'dpif' is a netdev or dummy dpif, false otherwise. */
@@ -1283,6 +1295,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
conntrack_init(&dp->conntrack);
atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
+ atomic_init(&dp->tx_flush_interval, DEFAULT_TX_FLUSH_INTERVAL);
cmap_init(&dp->poll_threads);
@@ -2950,7 +2963,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
dp_packet_batch_init_packet(&pp, execute->packet);
dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
execute->actions, execute->actions_len);
- dp_netdev_pmd_flush_output_packets(pmd);
+ dp_netdev_pmd_flush_output_packets(pmd, true);
if (pmd->core_id == NON_PMD_CORE_ID) {
ovs_mutex_unlock(&dp->non_pmd_mutex);
@@ -2999,6 +3012,16 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
smap_get_ullong(other_config, "emc-insert-inv-prob",
DEFAULT_EM_FLOW_INSERT_INV_PROB);
uint32_t insert_min, cur_min;
+ uint32_t tx_flush_interval, cur_tx_flush_interval;
+
+ tx_flush_interval = smap_get_int(other_config, "tx-flush-interval",
+ DEFAULT_TX_FLUSH_INTERVAL);
+ atomic_read_relaxed(&dp->tx_flush_interval, &cur_tx_flush_interval);
+ if (tx_flush_interval != cur_tx_flush_interval) {
+ atomic_store_relaxed(&dp->tx_flush_interval, tx_flush_interval);
+ VLOG_INFO("Flushing interval for tx queues set to %"PRIu32" us",
+ tx_flush_interval);
+ }
if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
free(dp->pmd_cmask);
@@ -3237,12 +3260,14 @@ dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx)
return processing_cycles;
}
-static void
+static int
dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
struct tx_port *p)
{
int tx_qid;
+ int output_cnt;
bool dynamic_txqs;
+ uint32_t tx_flush_interval;
dynamic_txqs = p->port->dynamic_txqs;
if (dynamic_txqs) {
@@ -3251,20 +3276,40 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
tx_qid = pmd->static_tx_qid;
}
+ output_cnt = dp_packet_batch_size(&p->output_pkts);
+ ovs_assert(output_cnt > 0);
+
netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
dp_packet_batch_init(&p->output_pkts);
+
+ /* Update time of the next flush. */
+ atomic_read_relaxed(&pmd->dp->tx_flush_interval, &tx_flush_interval);
+ p->flush_time = pmd->ctx.now + tx_flush_interval;
+
+ ovs_assert(pmd->n_output_batches > 0);
+ pmd->n_output_batches--;
+
+ return output_cnt;
}
-static void
-dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd)
+static int
+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
+ bool force)
{
struct tx_port *p;
+ int output_cnt = 0;
+
+ if (!pmd->n_output_batches) {
+ return 0;
+ }
HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
- if (!dp_packet_batch_is_empty(&p->output_pkts)) {
- dp_netdev_pmd_flush_output_on_port(pmd, p);
+ if (!dp_packet_batch_is_empty(&p->output_pkts)
+ && (force || pmd->ctx.now >= p->flush_time)) {
+ output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p);
}
}
+ return output_cnt;
}
static int
@@ -3274,7 +3319,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
{
struct dp_packet_batch batch;
int error;
- int batch_cnt = 0;
+ int batch_cnt = 0, output_cnt = 0;
dp_packet_batch_init(&batch);
error = netdev_rxq_recv(rx, &batch);
@@ -3284,7 +3329,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
batch_cnt = batch.count;
dp_netdev_input(pmd, &batch, port_no);
- dp_netdev_pmd_flush_output_packets(pmd);
+ output_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
} else if (error != EAGAIN && error != EOPNOTSUPP) {
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
@@ -3292,7 +3337,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
netdev_rxq_get_name(rx), ovs_strerror(error));
}
- return batch_cnt;
+ return batch_cnt + output_cnt;
}
static struct tx_port *
@@ -3904,7 +3949,8 @@ dpif_netdev_run(struct dpif *dpif)
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_pmd_thread *non_pmd;
uint64_t new_tnl_seq;
- int process_packets = 0;
+ int process_packets;
+ bool need_to_flush = true;
ovs_mutex_lock(&dp->port_mutex);
non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
@@ -3924,11 +3970,25 @@ dpif_netdev_run(struct dpif *dpif)
process_packets
? PMD_CYCLES_PROCESSING
: PMD_CYCLES_IDLE);
+ if (process_packets) {
+ need_to_flush = false;
+ }
}
}
}
+ if (need_to_flush) {
+ /* We didn't receive anything in the process loop.
+ * Check if we need to send something.
+ * There was no time updates on current iteration. */
+ pmd_thread_ctx_time_update(non_pmd);
+ process_packets = dp_netdev_pmd_flush_output_packets(non_pmd,
+ false);
+ cycles_count_intermediate(non_pmd, NULL, process_packets
+ ? PMD_CYCLES_PROCESSING
+ : PMD_CYCLES_IDLE);
+ }
+
cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
- pmd_thread_ctx_time_update(non_pmd);
dpif_netdev_xps_revalidate_pmd(non_pmd, false);
ovs_mutex_unlock(&dp->non_pmd_mutex);
@@ -3979,6 +4039,8 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
{
struct tx_port *tx_port_cached;
+ /* Flush all the queued packets. */
+ dp_netdev_pmd_flush_output_packets(pmd, true);
/* Free all used tx queue ids. */
dpif_netdev_xps_revalidate_pmd(pmd, true);
@@ -4077,7 +4139,6 @@ pmd_thread_main(void *f_)
bool exiting;
int poll_cnt;
int i;
- int process_packets = 0;
poll_list = NULL;
@@ -4107,6 +4168,9 @@ reload:
cycles_count_start(pmd);
for (;;) {
+ int process_packets;
+ bool need_to_flush = true;
+
for (i = 0; i < poll_cnt; i++) {
process_packets =
dp_netdev_process_rxq_port(pmd, poll_list[i].rxq->rx,
@@ -4114,6 +4178,20 @@ reload:
cycles_count_intermediate(pmd, poll_list[i].rxq,
process_packets ? PMD_CYCLES_PROCESSING
: PMD_CYCLES_IDLE);
+ if (process_packets) {
+ need_to_flush = false;
+ }
+ }
+
+ if (need_to_flush) {
+ /* We didn't receive anything in the process loop.
+ * Check if we need to send something.
+ * There was no time updates on current iteration. */
+ pmd_thread_ctx_time_update(pmd);
+ process_packets = dp_netdev_pmd_flush_output_packets(pmd, false);
+ cycles_count_intermediate(pmd, NULL,
+ process_packets ? PMD_CYCLES_PROCESSING
+ : PMD_CYCLES_IDLE);
}
if (lc++ > 1024) {
@@ -4122,9 +4200,6 @@ reload:
lc = 0;
coverage_try_clear();
- /* It's possible that the time was not updated on current
- * iteration, if there were no received packets. */
- pmd_thread_ctx_time_update(pmd);
dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
if (!ovsrcu_try_quiesce()) {
emc_cache_slow_sweep(&pmd->flow_cache);
@@ -4210,7 +4285,7 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
/* All packets will hit the meter at the same time. */
- long_delta_t = (now - meter->used); /* msec */
+ long_delta_t = (now - meter->used) / 1000; /* msec */
/* Make sure delta_t will not be too large, so that bucket will not
* wrap around below. */
@@ -4366,7 +4441,7 @@ dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id *meter_id,
meter->flags = config->flags;
meter->n_bands = config->n_bands;
meter->max_delta_t = 0;
- meter->used = time_msec();
+ meter->used = time_usec();
/* set up bands */
for (i = 0; i < config->n_bands; ++i) {
@@ -4564,6 +4639,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
pmd->core_id = core_id;
pmd->numa_id = numa_id;
pmd->need_reload = false;
+ pmd->n_output_batches = 0;
ovs_refcount_init(&pmd->ref_cnt);
latch_init(&pmd->exit_latch);
@@ -4751,6 +4827,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
tx->port = port;
tx->qid = -1;
+ tx->flush_time = 0LL;
dp_packet_batch_init(&tx->output_pkts);
hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
@@ -4914,7 +4991,7 @@ packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,
struct dp_netdev_flow *flow = batch->flow;
dp_netdev_flow_used(flow, batch->array.count, batch->byte_count,
- batch->tcp_flags, pmd->ctx.now);
+ batch->tcp_flags, pmd->ctx.now / 1000);
actions = dp_netdev_flow_get_actions(flow);
@@ -5289,7 +5366,7 @@ dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
continue;
}
interval = pmd->ctx.now - tx->last_used;
- if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {
+ if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT)) {
port = tx->port;
ovs_mutex_lock(&port->txq_used_mutex);
port->txq_used[tx->qid]--;
@@ -5310,7 +5387,7 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
interval = pmd->ctx.now - tx->last_used;
tx->last_used = pmd->ctx.now;
- if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {
+ if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT)) {
return tx->qid;
}
@@ -5442,12 +5519,16 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
dp_netdev_pmd_flush_output_on_port(pmd, p);
}
#endif
- if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
- + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
- /* Some packets was generated while input batch processing.
- * Flush here to avoid overflow. */
+ if (dp_packet_batch_size(&p->output_pkts)
+ + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
+ /* Flush here to avoid overflow. */
dp_netdev_pmd_flush_output_on_port(pmd, p);
}
+
+ if (dp_packet_batch_is_empty(&p->output_pkts)) {
+ pmd->n_output_batches++;
+ }
+
DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
dp_packet_batch_add(&p->output_pkts, packet);
}
@@ -5688,7 +5769,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, force,
commit, zone, setmark, setlabel, helper,
- nat_action_info_ref, pmd->ctx.now);
+ nat_action_info_ref, pmd->ctx.now / 1000);
break;
}
diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
index d7f6839..9c3bddd 100644
--- a/vswitchd/vswitch.xml
+++ b/vswitchd/vswitch.xml
@@ -344,6 +344,22 @@
</p>
</column>
+ <column name="other_config" key="tx-flush-interval"
+ type='{"type": "integer",
+ "minInteger": 0, "maxInteger": 1000000}'>
+ <p>
+ Specifies the time in microseconds that a packet can wait in output
+ batch for sending i.e. amount of time that packet can spend in an
+ intermediate output queue before sending to netdev.
+ This option can be used to configure balance between throughput
+ and latency. Lower values decreases latency while higher values
+ may be useful to achieve higher performance.
+ </p>
+ <p>
+ Defaults to 0 i.e. instant packet sending (latency optimized).
+ </p>
+ </column>
+
<column name="other_config" key="n-handler-threads"
type='{"type": "integer", "minInteger": 1}'>
<p>
--
2.7.4
More information about the dev
mailing list