[ovs-dev] [urcu v2 13/15] ovs-thread: Replace ovsthread_counter by more general ovsthread_stats.
Ben Pfaff
blp at nicira.com
Tue Mar 18 23:42:02 UTC 2014
I'm not sure whether I've satisfied your concerns here or if you'd
still like to see some changes.
On Fri, Mar 14, 2014 at 11:19:11AM -0700, Ben Pfaff wrote:
> I think that using atomics and using per-thread or per-NUMA node
> buckets is orthogonal: one can use either technique without the other.
>
> I am reluctant to use atomics here because I think we need 64-bit
> stats (packet counters can definitely roll over past 2**32) and 64-bit
> atomics are internally implemented via a mutex on 32-bit systems.
>
> I agree that per-NUMA node buckets, as opposed to the per-thread
> buckets I have here, would also work well, but it is more difficult to
> determine the current NUMA node from userspace than it is in the
> kernel.
>
> On Fri, Mar 14, 2014 at 02:25:04AM -0700, Andy Zhou wrote:
> > How about using atomic operations to maintain stats instead of per thread
> > buckets? It seems to be a win at least within a NUMA node from kernel
> > datapath experience.
> >
> >
> > On Tue, Mar 11, 2014 at 1:56 PM, Ben Pfaff <blp at nicira.com> wrote:
> >
> > > This allows clients to do more than just increment a counter. The
> > > following commit will make the first use of that feature.
> > >
> > > Signed-off-by: Ben Pfaff <blp at nicira.com>
> > > ---
> > > lib/dpif-netdev.c | 75 +++++++++++++++++++++++++++++++++----------
> > > lib/ovs-thread.c | 91 ++++++++++++++++++-----------------------------------
> > > 3 files changed, 108 insertions(+), 81 deletions(-)
> > >
> > > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> > > index 54b8f50..268b04d 100644
> > > --- a/lib/dpif-netdev.c
> > > +++ b/lib/dpif-netdev.c
> > > @@ -147,10 +147,8 @@ struct dp_netdev {
> > >
> > > /* Statistics.
> > > *
> > > - * ovsthread_counter is internally synchronized. */
> > > - struct ovsthread_counter *n_hit; /* Number of flow table matches. */
> > > - struct ovsthread_counter *n_missed; /* Number of flow table misses. */
> > > - struct ovsthread_counter *n_lost; /* Number of misses not passed up. */
> > > + * ovsthread_stats is internally synchronized. */
> > > + struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */
> > >
> > > /* Ports.
> > > *
> > > @@ -170,6 +168,22 @@ static struct dp_netdev_port
> > > *dp_netdev_lookup_port(const struct dp_netdev *dp,
> > > odp_port_t)
> > > OVS_REQ_RDLOCK(dp->port_rwlock);
> > >
> > > +enum dp_stat_type {
> > > + DP_STAT_HIT, /* Packets that matched in the flow table. */
> > > + DP_STAT_MISS, /* Packets that did not match. */
> > > + DP_STAT_LOST, /* Packets not passed up to the client. */
> > > + DP_N_STATS
> > > +};
> > > +
> > > +/* Contained by struct dp_netdev's 'stats' member. */
> > > +struct dp_netdev_stats {
> > > + struct ovs_mutex mutex; /* Protects 'n'. */
> > > +
> > > + /* Indexed by DP_STAT_*, protected by 'mutex'. */
> > > + unsigned long long int n[DP_N_STATS] OVS_GUARDED;
> > > +};
> > > +
> > > +
> > > /* A port in a netdev-based datapath. */
> > > struct dp_netdev_port {
> > > struct hmap_node node; /* Node in dp_netdev's 'ports'. */
> > > @@ -461,9 +475,7 @@ create_dp_netdev(const char *name, const struct
> > > dpif_class *class,
> > > ovs_mutex_unlock(&dp->queue_mutex);
> > > dp->queue_seq = seq_create();
> > >
> > > - dp->n_hit = ovsthread_counter_create();
> > > - dp->n_missed = ovsthread_counter_create();
> > > - dp->n_lost = ovsthread_counter_create();
> > > + ovsthread_stats_init(&dp->stats);
> > >
> > > ovs_rwlock_init(&dp->port_rwlock);
> > > hmap_init(&dp->ports);
> > > @@ -532,6 +544,8 @@ dp_netdev_free(struct dp_netdev *dp)
> > > OVS_REQUIRES(dp_netdev_mutex)
> > > {
> > > struct dp_netdev_port *port, *next;
> > > + struct dp_netdev_stats *bucket;
> > > + int i;
> > >
> > > shash_find_and_delete(&dp_netdevs, dp->name);
> > >
> > > @@ -544,9 +558,12 @@ dp_netdev_free(struct dp_netdev *dp)
> > > do_del_port(dp, port->port_no);
> > > }
> > > ovs_rwlock_unlock(&dp->port_rwlock);
> > > - ovsthread_counter_destroy(dp->n_hit);
> > > - ovsthread_counter_destroy(dp->n_missed);
> > > - ovsthread_counter_destroy(dp->n_lost);
> > > +
> > > + OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
> > > + ovs_mutex_destroy(&bucket->mutex);
> > > + free_cacheline(bucket);
> > > + }
> > > + ovsthread_stats_destroy(&dp->stats);
> > >
> > > dp_netdev_purge_queues(dp);
> > > seq_destroy(dp->queue_seq);
> > > @@ -604,14 +621,21 @@ static int
> > > dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats
> > > *stats)
> > > {
> > > struct dp_netdev *dp = get_dp_netdev(dpif);
> > > + struct dp_netdev_stats *bucket;
> > > + size_t i;
> > >
> > > fat_rwlock_rdlock(&dp->cls.rwlock);
> > > stats->n_flows = hmap_count(&dp->flow_table);
> > > fat_rwlock_unlock(&dp->cls.rwlock);
> > >
> > > - stats->n_hit = ovsthread_counter_read(dp->n_hit);
> > > - stats->n_missed = ovsthread_counter_read(dp->n_missed);
> > > - stats->n_lost = ovsthread_counter_read(dp->n_lost);
> > > + stats->n_hit = stats->n_missed = stats->n_lost = 0;
> > > + OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
> > > + ovs_mutex_lock(&bucket->mutex);
> > > + stats->n_hit += bucket->n[DP_STAT_HIT];
> > > + stats->n_missed += bucket->n[DP_STAT_MISS];
> > > + stats->n_lost += bucket->n[DP_STAT_LOST];
> > > + ovs_mutex_unlock(&bucket->mutex);
> > > + }
> > > stats->n_masks = UINT32_MAX;
> > > stats->n_mask_hit = UINT64_MAX;
> > >
> > > @@ -1711,6 +1735,25 @@ dp_netdev_flow_used(struct dp_netdev_flow
> > > *netdev_flow,
> > > netdev_flow->tcp_flags |= packet_get_tcp_flags(packet,
> > > &netdev_flow->flow);
> > > }
> > >
> > > +static void *
> > > +dp_netdev_stats_new_cb(void)
> > > +{
> > > + struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
> > > + ovs_mutex_init(&bucket->mutex);
> > > + return bucket;
> > > +}
> > > +
> > > +static void
> > > +dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type)
> > > +{
> > > + struct dp_netdev_stats *bucket;
> > > +
> > > + bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
> > > + ovs_mutex_lock(&bucket->mutex);
> > > + bucket->n[type]++;
> > > + ovs_mutex_unlock(&bucket->mutex);
> > > +}
> > > +
> > > static void
> > > dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
> > > struct pkt_metadata *md)
> > > @@ -1736,9 +1779,9 @@ dp_netdev_port_input(struct dp_netdev *dp, struct
> > > ofpbuf *packet,
> > > actions->actions, actions->size);
> > > dp_netdev_actions_unref(actions);
> > > dp_netdev_flow_unref(netdev_flow);
> > > - ovsthread_counter_inc(dp->n_hit, 1);
> > > + dp_netdev_count_packet(dp, DP_STAT_HIT);
> > > } else {
> > > - ovsthread_counter_inc(dp->n_missed, 1);
> > > + dp_netdev_count_packet(dp, DP_STAT_MISS);
> > > dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, NULL);
> > > }
> > > }
> > > @@ -1788,7 +1831,7 @@ dp_netdev_output_userspace(struct dp_netdev *dp,
> > > struct ofpbuf *packet,
> > >
> > > error = 0;
> > > } else {
> > > - ovsthread_counter_inc(dp->n_lost, 1);
> > > + dp_netdev_count_packet(dp, DP_STAT_LOST);
> > > error = ENOBUFS;
> > > }
> > > ovs_mutex_unlock(&dp->queue_mutex);
> > > diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
> > > index 33b9e08..d313c5a 100644
> > > --- a/lib/ovs-thread.c
> > > +++ b/lib/ovs-thread.c
> > > @@ -377,83 +377,54 @@ may_fork(void)
> > > return !must_not_fork;
> > > }
> > >
> > > -/* ovsthread_counter.
> > > - *
> > > - * We implement the counter as an array of N_COUNTERS individual
> > > counters, each
> > > - * with its own lock. Each thread uses one of the counters chosen based
> > > on a
> > > - * hash of the thread's ID, the idea being that, statistically, different
> > > - * threads will tend to use different counters and therefore avoid
> > > - * interfering with each other.
> > > - *
> > > - * Undoubtedly, better implementations are possible. */
> > > -
> > > -/* Basic counter structure. */
> > > -struct ovsthread_counter__ {
> > > - struct ovs_mutex mutex;
> > > - unsigned long long int value;
> > > -};
> > > -
> > > -/* Pad the basic counter structure to 64 bytes to avoid cache line
> > > - * interference. */
> > > -struct ovsthread_counter {
> > > - struct ovsthread_counter__ c;
> > > - char pad[ROUND_UP(sizeof(struct ovsthread_counter__), 64)
> > > - - sizeof(struct ovsthread_counter__)];
> > > -};
> > > -
> > > -#define N_COUNTERS 16
> > > +/* ovsthread_stats. */
> > >
> > > -struct ovsthread_counter *
> > > -ovsthread_counter_create(void)
> > > +void
> > > +ovsthread_stats_init(struct ovsthread_stats *stats)
> > > {
> > > - struct ovsthread_counter *c;
> > > int i;
> > >
> > > - c = xmalloc(N_COUNTERS * sizeof *c);
> > > - for (i = 0; i < N_COUNTERS; i++) {
> > > - ovs_mutex_init(&c[i].c.mutex);
> > > - c[i].c.value = 0;
> > > + ovs_mutex_init(&stats->mutex);
> > > + for (i = 0; i < ARRAY_SIZE(stats->buckets); i++) {
> > > + stats->buckets[i] = NULL;
> > > }
> > > - return c;
> > > }
> > >
> > > void
> > > -ovsthread_counter_destroy(struct ovsthread_counter *c)
> > > +ovsthread_stats_destroy(struct ovsthread_stats *stats)
> > > {
> > > - if (c) {
> > > - int i;
> > > -
> > > - for (i = 0; i < N_COUNTERS; i++) {
> > > - ovs_mutex_destroy(&c[i].c.mutex);
> > > - }
> > > - free(c);
> > > - }
> > > + ovs_mutex_destroy(&stats->mutex);
> > > }
> > >
> > > -void
> > > -ovsthread_counter_inc(struct ovsthread_counter *c, unsigned long long int n)
> > > +void *
> > > +ovsthread_stats_bucket_get(struct ovsthread_stats *stats,
> > > + void *(*new_bucket)(void))
> > > {
> > > - c = &c[hash_int(ovsthread_id_self(), 0) % N_COUNTERS];
> > > -
> > > - ovs_mutex_lock(&c->c.mutex);
> > > - c->c.value += n;
> > > - ovs_mutex_unlock(&c->c.mutex);
> > > + unsigned int hash = hash_int(ovsthread_id_self(), 0);
> > > + unsigned int idx = hash & (ARRAY_SIZE(stats->buckets) - 1);
> > > + void *bucket = stats->buckets[idx];
> > > + if (!bucket) {
> > > + ovs_mutex_lock(&stats->mutex);
> > > + bucket = stats->buckets[idx];
> > > + if (!bucket) {
> > > + bucket = stats->buckets[idx] = new_bucket();
> > > + }
> > > + ovs_mutex_unlock(&stats->mutex);
> > > + }
> > > + return bucket;
> > > }
> > >
> > > -unsigned long long int
> > > -ovsthread_counter_read(const struct ovsthread_counter *c)
> > > +size_t
> > > +ovs_thread_stats_next_bucket(const struct ovsthread_stats *stats, size_t i)
> > > {
> > > - unsigned long long int sum;
> > > - int i;
> > > -
> > > - sum = 0;
> > > - for (i = 0; i < N_COUNTERS; i++) {
> > > - ovs_mutex_lock(&c[i].c.mutex);
> > > - sum += c[i].c.value;
> > > - ovs_mutex_unlock(&c[i].c.mutex);
> > > + for (; i < ARRAY_SIZE(stats->buckets); i++) {
> > > + if (stats->buckets[i]) {
> > > + break;
> > > + }
> > > }
> > > - return sum;
> > > + return i;
> > > }
> > > +
> > >
> > > /* Parses /proc/cpuinfo for the total number of physical cores on this
> > > system
> > > * across all CPU packages, not counting hyper-threads.
> > > diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h
> > > index f489308..cf1de64 100644
> > > --- a/lib/ovs-thread.h
> > > +++ b/lib/ovs-thread.h
> > > @@ -592,11 +592,24 @@ ovsthread_id_self(void)
> > > *
> > > * Fully thread-safe. */
> > >
> > > -struct ovsthread_counter *ovsthread_counter_create(void);
> > > -void ovsthread_counter_destroy(struct ovsthread_counter *);
> > > -void ovsthread_counter_inc(struct ovsthread_counter *, unsigned long long int);
> > > -unsigned long long int ovsthread_counter_read(
> > > - const struct ovsthread_counter *);
> > > +struct ovsthread_stats {
> > > + struct ovs_mutex mutex;
> > > + void *volatile buckets[16];
> > > +};
> > > +
> > > +void ovsthread_stats_init(struct ovsthread_stats *);
> > > +void ovsthread_stats_destroy(struct ovsthread_stats *);
> > > +
> > > +void *ovsthread_stats_bucket_get(struct ovsthread_stats *,
> > > + void *(*new_bucket)(void));
> > > +
> > > +#define OVSTHREAD_STATS_FOR_EACH_BUCKET(BUCKET, IDX, STATS) \
> > > + for ((IDX) = ovs_thread_stats_next_bucket(STATS, 0); \
> > > + ((IDX) < ARRAY_SIZE((STATS)->buckets) \
> > > + ? ((BUCKET) = (STATS)->buckets[IDX], true) \
> > > + : false); \
> > > + (IDX) = ovs_thread_stats_next_bucket(STATS, (IDX) + 1))
> > > +size_t ovs_thread_stats_next_bucket(const struct ovsthread_stats *, size_t);
> > >
> > > bool single_threaded(void);
> > >
> > > --
> > > 1.7.10.4
> > >
> > > _______________________________________________
> > > dev mailing list
> > > dev at openvswitch.org
> > > http://openvswitch.org/mailman/listinfo/dev
> > >
More information about the dev
mailing list