[ovs-dev] [urcu v2 13/15] ovs-thread: Replace ovsthread_counter by more general ovsthread_stats.

Ben Pfaff blp at nicira.com
Tue Mar 18 23:42:02 UTC 2014


I'm not sure whether I've satisfied your concerns here or if you'd
still like to see some changes.

On Fri, Mar 14, 2014 at 11:19:11AM -0700, Ben Pfaff wrote:
> I think that using atomics and using per-thread or per-NUMA node
> buckets is orthogonal: one can use either technique without the other.
> 
> I am reluctant to use atomics here because I think we need 64-bit
> stats (packet counters can definitely roll over past 2**32) and 64-bit
> atomics are internally implemented via a mutex on 32-bit systems.
> 
> I agree that per-NUMA node buckets, as opposed to the per-thread
> buckets I have here, would also work well, but it is more difficult to
> determine the current NUMA node from userspace than it is in the
> kernel.
> 
> On Fri, Mar 14, 2014 at 02:25:04AM -0700, Andy Zhou wrote:
> > How about using atomic operations to maintain stats instead of per thread
> > buckets? It seems to be a win at least within a NUMA node from kernel
> > datapath experience.
> > 
> > 
> > On Tue, Mar 11, 2014 at 1:56 PM, Ben Pfaff <blp at nicira.com> wrote:
> > 
> > > This allows clients to do more than just increment a counter.  The
> > > following commit will make the first use of that feature.
> > >
> > > Signed-off-by: Ben Pfaff <blp at nicira.com>
> > > ---
> > >  lib/dpif-netdev.c |   75 +++++++++++++++++++++++++++++++++----------
> > >  lib/ovs-thread.c  |   91 ++++++++++++++++++-----------------------------------
> > >  lib/ovs-thread.h  |   23 +++++++++++---
> > >  3 files changed, 108 insertions(+), 81 deletions(-)
> > >
> > > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> > > index 54b8f50..268b04d 100644
> > > --- a/lib/dpif-netdev.c
> > > +++ b/lib/dpif-netdev.c
> > > @@ -147,10 +147,8 @@ struct dp_netdev {
> > >
> > >      /* Statistics.
> > >       *
> > > -     * ovsthread_counter is internally synchronized. */
> > > -    struct ovsthread_counter *n_hit;    /* Number of flow table matches. */
> > > -    struct ovsthread_counter *n_missed; /* Number of flow table misses. */
> > > -    struct ovsthread_counter *n_lost;   /* Number of misses not passed up. */
> > > +     * ovsthread_stats is internally synchronized. */
> > > +    struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */
> > >
> > >      /* Ports.
> > >       *
> > > @@ -170,6 +168,22 @@ static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
> > >                                                      odp_port_t)
> > >      OVS_REQ_RDLOCK(dp->port_rwlock);
> > >
> > > +enum dp_stat_type {
> > > +    DP_STAT_HIT,                /* Packets that matched in the flow table. */
> > > +    DP_STAT_MISS,               /* Packets that did not match. */
> > > +    DP_STAT_LOST,               /* Packets not passed up to the client. */
> > > +    DP_N_STATS
> > > +};
> > > +
> > > +/* Contained by struct dp_netdev's 'stats' member.  */
> > > +struct dp_netdev_stats {
> > > +    struct ovs_mutex mutex;          /* Protects 'n'. */
> > > +
> > > +    /* Indexed by DP_STAT_*, protected by 'mutex'. */
> > > +    unsigned long long int n[DP_N_STATS] OVS_GUARDED;
> > > +};
> > > +
> > > +
> > >  /* A port in a netdev-based datapath. */
> > >  struct dp_netdev_port {
> > >      struct hmap_node node;      /* Node in dp_netdev's 'ports'. */
> > > @@ -461,9 +475,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
> > >      ovs_mutex_unlock(&dp->queue_mutex);
> > >      dp->queue_seq = seq_create();
> > >
> > > -    dp->n_hit = ovsthread_counter_create();
> > > -    dp->n_missed = ovsthread_counter_create();
> > > -    dp->n_lost = ovsthread_counter_create();
> > > +    ovsthread_stats_init(&dp->stats);
> > >
> > >      ovs_rwlock_init(&dp->port_rwlock);
> > >      hmap_init(&dp->ports);
> > > @@ -532,6 +544,8 @@ dp_netdev_free(struct dp_netdev *dp)
> > >      OVS_REQUIRES(dp_netdev_mutex)
> > >  {
> > >      struct dp_netdev_port *port, *next;
> > > +    struct dp_netdev_stats *bucket;
> > > +    int i;
> > >
> > >      shash_find_and_delete(&dp_netdevs, dp->name);
> > >
> > > @@ -544,9 +558,12 @@ dp_netdev_free(struct dp_netdev *dp)
> > >          do_del_port(dp, port->port_no);
> > >      }
> > >      ovs_rwlock_unlock(&dp->port_rwlock);
> > > -    ovsthread_counter_destroy(dp->n_hit);
> > > -    ovsthread_counter_destroy(dp->n_missed);
> > > -    ovsthread_counter_destroy(dp->n_lost);
> > > +
> > > +    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
> > > +        ovs_mutex_destroy(&bucket->mutex);
> > > +        free_cacheline(bucket);
> > > +    }
> > > +    ovsthread_stats_destroy(&dp->stats);
> > >
> > >      dp_netdev_purge_queues(dp);
> > >      seq_destroy(dp->queue_seq);
> > > @@ -604,14 +621,21 @@ static int
> > >  dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
> > >  {
> > >      struct dp_netdev *dp = get_dp_netdev(dpif);
> > > +    struct dp_netdev_stats *bucket;
> > > +    size_t i;
> > >
> > >      fat_rwlock_rdlock(&dp->cls.rwlock);
> > >      stats->n_flows = hmap_count(&dp->flow_table);
> > >      fat_rwlock_unlock(&dp->cls.rwlock);
> > >
> > > -    stats->n_hit = ovsthread_counter_read(dp->n_hit);
> > > -    stats->n_missed = ovsthread_counter_read(dp->n_missed);
> > > -    stats->n_lost = ovsthread_counter_read(dp->n_lost);
> > > +    stats->n_hit = stats->n_missed = stats->n_lost = 0;
> > > +    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
> > > +        ovs_mutex_lock(&bucket->mutex);
> > > +        stats->n_hit += bucket->n[DP_STAT_HIT];
> > > +        stats->n_missed += bucket->n[DP_STAT_MISS];
> > > +        stats->n_lost += bucket->n[DP_STAT_LOST];
> > > +        ovs_mutex_unlock(&bucket->mutex);
> > > +    }
> > >      stats->n_masks = UINT32_MAX;
> > >      stats->n_mask_hit = UINT64_MAX;
> > >
> > > @@ -1711,6 +1735,25 @@ dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
> > >      netdev_flow->tcp_flags |= packet_get_tcp_flags(packet, &netdev_flow->flow);
> > >  }
> > >
> > > +static void *
> > > +dp_netdev_stats_new_cb(void)
> > > +{
> > > +    struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
> > > +    ovs_mutex_init(&bucket->mutex);
> > > +    return bucket;
> > > +}
> > > +
> > > +static void
> > > +dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type)
> > > +{
> > > +    struct dp_netdev_stats *bucket;
> > > +
> > > +    bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
> > > +    ovs_mutex_lock(&bucket->mutex);
> > > +    bucket->n[type]++;
> > > +    ovs_mutex_unlock(&bucket->mutex);
> > > +}
> > > +
> > >  static void
> > >  dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
> > >                       struct pkt_metadata *md)
> > > @@ -1736,9 +1779,9 @@ dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
> > >                                    actions->actions, actions->size);
> > >          dp_netdev_actions_unref(actions);
> > >          dp_netdev_flow_unref(netdev_flow);
> > > -        ovsthread_counter_inc(dp->n_hit, 1);
> > > +        dp_netdev_count_packet(dp, DP_STAT_HIT);
> > >      } else {
> > > -        ovsthread_counter_inc(dp->n_missed, 1);
> > > +        dp_netdev_count_packet(dp, DP_STAT_MISS);
> > >          dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, NULL);
> > >      }
> > >  }
> > > @@ -1788,7 +1831,7 @@ dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
> > >
> > >          error = 0;
> > >      } else {
> > > -        ovsthread_counter_inc(dp->n_lost, 1);
> > > +        dp_netdev_count_packet(dp, DP_STAT_LOST);
> > >          error = ENOBUFS;
> > >      }
> > >      ovs_mutex_unlock(&dp->queue_mutex);
> > > diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
> > > index 33b9e08..d313c5a 100644
> > > --- a/lib/ovs-thread.c
> > > +++ b/lib/ovs-thread.c
> > > @@ -377,83 +377,54 @@ may_fork(void)
> > >      return !must_not_fork;
> > >  }
> > >
> > > -/* ovsthread_counter.
> > > - *
> > > - * We implement the counter as an array of N_COUNTERS individual counters, each
> > > - * with its own lock.  Each thread uses one of the counters chosen based on a
> > > - * hash of the thread's ID, the idea being that, statistically, different
> > > - * threads will tend to use different counters and therefore avoid
> > > - * interfering with each other.
> > > - *
> > > - * Undoubtedly, better implementations are possible. */
> > > -
> > > -/* Basic counter structure. */
> > > -struct ovsthread_counter__ {
> > > -    struct ovs_mutex mutex;
> > > -    unsigned long long int value;
> > > -};
> > > -
> > > -/* Pad the basic counter structure to 64 bytes to avoid cache line
> > > - * interference. */
> > > -struct ovsthread_counter {
> > > -    struct ovsthread_counter__ c;
> > > -    char pad[ROUND_UP(sizeof(struct ovsthread_counter__), 64)
> > > -             - sizeof(struct ovsthread_counter__)];
> > > -};
> > > -
> > > -#define N_COUNTERS 16
> > > +/* ovsthread_stats. */
> > >
> > > -struct ovsthread_counter *
> > > -ovsthread_counter_create(void)
> > > +void
> > > +ovsthread_stats_init(struct ovsthread_stats *stats)
> > >  {
> > > -    struct ovsthread_counter *c;
> > >      int i;
> > >
> > > -    c = xmalloc(N_COUNTERS * sizeof *c);
> > > -    for (i = 0; i < N_COUNTERS; i++) {
> > > -        ovs_mutex_init(&c[i].c.mutex);
> > > -        c[i].c.value = 0;
> > > +    ovs_mutex_init(&stats->mutex);
> > > +    for (i = 0; i < ARRAY_SIZE(stats->buckets); i++) {
> > > +        stats->buckets[i] = NULL;
> > >      }
> > > -    return c;
> > >  }
> > >
> > >  void
> > > -ovsthread_counter_destroy(struct ovsthread_counter *c)
> > > +ovsthread_stats_destroy(struct ovsthread_stats *stats)
> > >  {
> > > -    if (c) {
> > > -        int i;
> > > -
> > > -        for (i = 0; i < N_COUNTERS; i++) {
> > > -            ovs_mutex_destroy(&c[i].c.mutex);
> > > -        }
> > > -        free(c);
> > > -    }
> > > +    ovs_mutex_destroy(&stats->mutex);
> > >  }
> > >
> > > -void
> > > -ovsthread_counter_inc(struct ovsthread_counter *c, unsigned long long int n)
> > > +void *
> > > +ovsthread_stats_bucket_get(struct ovsthread_stats *stats,
> > > +                           void *(*new_bucket)(void))
> > >  {
> > > -    c = &c[hash_int(ovsthread_id_self(), 0) % N_COUNTERS];
> > > -
> > > -    ovs_mutex_lock(&c->c.mutex);
> > > -    c->c.value += n;
> > > -    ovs_mutex_unlock(&c->c.mutex);
> > > +    unsigned int hash = hash_int(ovsthread_id_self(), 0);
> > > +    unsigned int idx = hash & (ARRAY_SIZE(stats->buckets) - 1);
> > > +    void *bucket = stats->buckets[idx];
> > > +    if (!bucket) {
> > > +        ovs_mutex_lock(&stats->mutex);
> > > +        bucket = stats->buckets[idx];
> > > +        if (!bucket) {
> > > +            bucket = stats->buckets[idx] = new_bucket();
> > > +        }
> > > +        ovs_mutex_unlock(&stats->mutex);
> > > +    }
> > > +    return bucket;
> > >  }
> > >
> > > -unsigned long long int
> > > -ovsthread_counter_read(const struct ovsthread_counter *c)
> > > +size_t
> > > +ovs_thread_stats_next_bucket(const struct ovsthread_stats *stats, size_t i)
> > >  {
> > > -    unsigned long long int sum;
> > > -    int i;
> > > -
> > > -    sum = 0;
> > > -    for (i = 0; i < N_COUNTERS; i++) {
> > > -        ovs_mutex_lock(&c[i].c.mutex);
> > > -        sum += c[i].c.value;
> > > -        ovs_mutex_unlock(&c[i].c.mutex);
> > > +    for (; i < ARRAY_SIZE(stats->buckets); i++) {
> > > +        if (stats->buckets[i]) {
> > > +            break;
> > > +        }
> > >      }
> > > -    return sum;
> > > +    return i;
> > >  }
> > > +
> > >
> > >  /* Parses /proc/cpuinfo for the total number of physical cores on this system
> > >   * across all CPU packages, not counting hyper-threads.
> > > diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h
> > > index f489308..cf1de64 100644
> > > --- a/lib/ovs-thread.h
> > > +++ b/lib/ovs-thread.h
> > > @@ -592,11 +592,24 @@ ovsthread_id_self(void)
> > >   *
> > >   * Fully thread-safe. */
> > >
> > > -struct ovsthread_counter *ovsthread_counter_create(void);
> > > -void ovsthread_counter_destroy(struct ovsthread_counter *);
> > > -void ovsthread_counter_inc(struct ovsthread_counter *, unsigned long long int);
> > > -unsigned long long int ovsthread_counter_read(
> > > -    const struct ovsthread_counter *);
> > > +struct ovsthread_stats {
> > > +    struct ovs_mutex mutex;
> > > +    void *volatile buckets[16];
> > > +};
> > > +
> > > +void ovsthread_stats_init(struct ovsthread_stats *);
> > > +void ovsthread_stats_destroy(struct ovsthread_stats *);
> > > +
> > > +void *ovsthread_stats_bucket_get(struct ovsthread_stats *,
> > > +                                 void *(*new_bucket)(void));
> > > +
> > > +#define OVSTHREAD_STATS_FOR_EACH_BUCKET(BUCKET, IDX, STATS)             \
> > > +    for ((IDX) = ovs_thread_stats_next_bucket(STATS, 0);                \
> > > +         ((IDX) < ARRAY_SIZE((STATS)->buckets)                          \
> > > +          ? ((BUCKET) = (STATS)->buckets[IDX], true)                    \
> > > +          : false);                                                     \
> > > +         (IDX) = ovs_thread_stats_next_bucket(STATS, (IDX) + 1))
> > > +size_t ovs_thread_stats_next_bucket(const struct ovsthread_stats *, size_t);
> > >
> > >  bool single_threaded(void);
> > >
> > > --
> > > 1.7.10.4
> > >
> > > _______________________________________________
> > > dev mailing list
> > > dev at openvswitch.org
> > > http://openvswitch.org/mailman/listinfo/dev
> > >



More information about the dev mailing list