[ovs-dev] [per-pmd ftb/cls 3/3] dpif-netdev: Add per-pmd flow-table/classifier.

Pravin Shelar pshelar at nicira.com
Fri Dec 19 01:12:04 UTC 2014


On Tue, Dec 2, 2014 at 11:12 PM, Alex Wang <alexw at nicira.com> wrote:
> This commit changes the per dpif-netdev datapath flow-table/
> classifier to per pmd-thread.  As direct benefit, datapath
> and flow statistics no longer need to be protected by mutex
> or be declared as per-thread variable, since they are only
> written by the owning pmd thread.
>
> As a side effect, the flow-dump output of the userspace datapath
> can contain overlapping flows.  To reduce confusion, the dumps
> from different pmd threads will be separated by a title line.
> In addition, the flow operations via 'ovs-appctl dpctl/*'
> are modified so that if the given flow's in_port corresponds
> to a dpdk interface, the operation will be applied to all
> pmd threads receiving from that interface.
>
> Signed-off-by: Alex Wang <alexw at nicira.com>

Thanks for doing this. I have a few comments.
> ---
>  lib/dpctl.c                   |  134 ++++++++++-
>  lib/dpif-netdev.c             |  506 +++++++++++++++++++++++------------------
>  lib/dpif-netlink.c            |    3 +-
>  lib/dpif.c                    |   10 +-
>  lib/dpif.h                    |   34 ++-
>  ofproto/ofproto-dpif-upcall.c |   38 ++--
>  ofproto/ofproto-dpif.c        |    8 +-
>  tests/ofproto-dpif.at         |    6 +
>  8 files changed, 481 insertions(+), 258 deletions(-)
>
> diff --git a/lib/dpctl.c b/lib/dpctl.c
> index 4e41fe4..d5cccdf 100644
> --- a/lib/dpctl.c
> +++ b/lib/dpctl.c
> @@ -26,11 +26,13 @@
>  #include <string.h>
>  #include <unistd.h>
>
> +#include "bitmap.h"
>  #include "command-line.h"
>  #include "compiler.h"
>  #include "dirs.h"
>  #include "dpctl.h"
>  #include "dpif.h"
> +#include "dpif-netdev.h"
>  #include "dynamic-string.h"
>  #include "flow.h"
>  #include "match.h"
> @@ -39,6 +41,7 @@
>  #include "odp-util.h"
>  #include "ofp-parse.h"
>  #include "ofpbuf.h"
> +#include "ovs-numa.h"
>  #include "packets.h"
>  #include "shash.h"
>  #include "simap.h"
> @@ -708,7 +711,7 @@ dpctl_dump_flows(int argc, const char *argv[], struct dpctl_params *dpctl_p)
>      struct dpif_flow_dump_thread *flow_dump_thread;
>      struct dpif_flow_dump *flow_dump;
>      struct dpif_flow f;
> -
> +    int poller_id = POLLER_ID_NULL;
>      int error;
>
>      if (argc > 1 && !strncmp(argv[argc - 1], "filter=", 7)) {
> @@ -771,6 +774,19 @@ dpctl_dump_flows(int argc, const char *argv[], struct dpctl_params *dpctl_p)
>              minimatch_destroy(&minimatch);
>          }
>          ds_clear(&ds);
> +        /* If 'poller_id' is specified, overlapping flows could be dumped from
> +         * different pmd threads.  So, separates dumps from different pmds
> +         * by printing a title line. */
> +        if (poller_id != f.poller_id) {
> +            if (f.poller_id == NON_PMD_CORE_ID) {
> +                ds_put_format(&ds, "flow-dump from non-dpdk interfaces:\n");
> +            } else {
> +                ds_put_format(&ds, "flow-dump from pmd on cpu core: %d\n",
> +                              f.poller_id);
> +            }
> +            poller_id = f.poller_id;
> +        }
> +

You have introduced a new term, 'poller'. It is the right term here, but we
already use 'pmd' in the same context. I would like to keep using the same
name rather than introducing another one for the same concept.

>          if (dpctl_p->verbosity) {
>              if (f.ufid_present) {
>                  odp_format_ufid(&f.ufid, &ds);
> @@ -806,12 +822,43 @@ out_freefilter:
>      return error;
>  }
>
> +/* Extracts the in_port from the parsed keys, and returns the reference
> + * to the 'struct netdev *' of the dpif port.  On error, returns NULL.
> + * Users must call 'netdev_close()' after finish using the returned
> + * reference. */
> +static struct netdev *
> +get_in_port_netdev_from_key(struct dpif *dpif, const struct ofpbuf *key)
> +{
> +    const struct nlattr *in_port_nla;
> +    struct netdev *dev = NULL;
> +
> +    in_port_nla = nl_attr_find(key, 0, OVS_KEY_ATTR_IN_PORT);
> +    if (in_port_nla) {
> +        struct dpif_port dpif_port;
> +        odp_port_t port_no;
> +        int error;
> +
> +        port_no = ODP_PORT_C(nl_attr_get_u32(in_port_nla));
> +        error = dpif_port_query_by_number(dpif, port_no, &dpif_port);
> +        if (error) {
> +            goto out;
> +        }
> +
> +        netdev_open(dpif_port.name, dpif_port.type, &dev);
> +        dpif_port_destroy(&dpif_port);
> +    }
> +
> +out:
> +    return dev;
> +}
> +
>  static int
>  dpctl_put_flow(int argc, const char *argv[], enum dpif_flow_put_flags flags,
>                 struct dpctl_params *dpctl_p)
>  {
>      const char *key_s = argv[argc - 2];
>      const char *actions_s = argv[argc - 1];
> +    struct netdev *in_port_netdev = NULL;
>      struct dpif_flow_stats stats;
>      struct dpif_port dpif_port;
>      struct dpif_port_dump port_dump;
> @@ -834,7 +881,6 @@ dpctl_put_flow(int argc, const char *argv[], enum dpif_flow_put_flags flags,
>          return error;
>      }
>
> -
>      simap_init(&port_names);
>      DPIF_PORT_FOR_EACH (&dpif_port, &port_dump, dpif) {
>          simap_put(&port_names, dpif_port.name, odp_to_u32(dpif_port.port_no));
> @@ -855,12 +901,49 @@ dpctl_put_flow(int argc, const char *argv[], enum dpif_flow_put_flags flags,
>          dpctl_error(dpctl_p, error, "parsing actions");
>          goto out_freeactions;
>      }
> -    error = dpif_flow_put(dpif, flags,
> -                          ofpbuf_data(&key), ofpbuf_size(&key),
> -                          ofpbuf_size(&mask) == 0 ? NULL : ofpbuf_data(&mask),
> -                          ofpbuf_size(&mask),
> -                          ofpbuf_data(&actions), ofpbuf_size(&actions),
> -                          NULL, dpctl_p->print_statistics ? &stats : NULL);
> +
> +    /* For DPDK interface, apply the operation to all pmd threads
> +     * on the same numa node. */
> +    in_port_netdev = get_in_port_netdev_from_key(dpif, &key);
> +    if (in_port_netdev && netdev_is_pmd(in_port_netdev)) {
> +        int numa_id;
> +
> +        numa_id = netdev_get_numa_id(in_port_netdev);
> +        if (ovs_numa_numa_id_is_valid(numa_id)) {
> +            unsigned long *bm;
> +            int n_cores = ovs_numa_get_n_cores();
> +            int idx;
> +
> +            bm = bitmap_allocate(n_cores);
> +            ovs_numa_get_pinned_cores_on_numa(numa_id, bm);
> +
> +            BITMAP_FOR_EACH_1 (idx, n_cores, bm) {

Rather than exporting the ovs_numa_get_pinned_cores_on_numa() API, we can
export an iterator API, for_each_cpu_on_numa(), that iterates over CPUs on
the given NUMA node.

> +                error = dpif_flow_put(dpif, flags,
> +                                      ofpbuf_data(&key), ofpbuf_size(&key),
> +                                      ofpbuf_size(&mask) == 0
> +                                          ? NULL : ofpbuf_data(&mask),
> +                                      ofpbuf_size(&mask),
> +                                      ofpbuf_data(&actions),
> +                                      ofpbuf_size(&actions),
> +                                      NULL, idx,
> +                                      dpctl_p->print_statistics
> +                                          ? &stats : NULL);
> +
> +            }
> +            bitmap_free(bm);
> +        }
> +    } else {
> +        error = dpif_flow_put(dpif, flags,
> +                              ofpbuf_data(&key), ofpbuf_size(&key),
> +                              ofpbuf_size(&mask) == 0
> +                                  ? NULL : ofpbuf_data(&mask),
> +                              ofpbuf_size(&mask),
> +                              ofpbuf_data(&actions),
> +                              ofpbuf_size(&actions),
> +                              NULL, POLLER_ID_NULL,
> +                              dpctl_p->print_statistics
> +                                  ? &stats : NULL);
> +    }
>      if (error) {
>          dpctl_error(dpctl_p, error, "updating flow table");
>          goto out_freeactions;
> @@ -881,6 +964,7 @@ out_freekeymask:
>      ofpbuf_uninit(&mask);
>      ofpbuf_uninit(&key);
>      dpif_close(dpif);
> +    netdev_close(in_port_netdev);
>      return error;
>  }
>
> @@ -910,6 +994,7 @@ static int
>  dpctl_del_flow(int argc, const char *argv[], struct dpctl_params *dpctl_p)
>  {
>      const char *key_s = argv[argc - 1];
> +    struct netdev *in_port_netdev = NULL;
>      struct dpif_flow_stats stats;
>      struct dpif_port dpif_port;
>      struct dpif_port_dump port_dump;
> @@ -945,9 +1030,35 @@ dpctl_del_flow(int argc, const char *argv[], struct dpctl_params *dpctl_p)
>          goto out;
>      }
>
> -    error = dpif_flow_del(dpif,
> -                          ofpbuf_data(&key), ofpbuf_size(&key), NULL,
> -                          dpctl_p->print_statistics ? &stats : NULL);
> +    /* For DPDK interface, apply the operation to all pmd threads
> +     * on the same numa node. */
> +    in_port_netdev = get_in_port_netdev_from_key(dpif, &key);
> +    if (in_port_netdev && netdev_is_pmd(in_port_netdev)) {
> +        int numa_id;
> +
> +        numa_id = netdev_get_numa_id(in_port_netdev);
> +        if (ovs_numa_numa_id_is_valid(numa_id)) {
> +            unsigned long *bm;
> +            int n_cores = ovs_numa_get_n_cores();
> +            int idx;
> +
> +            bm = bitmap_allocate(n_cores);
> +            ovs_numa_get_pinned_cores_on_numa(numa_id, bm);
> +
> +            BITMAP_FOR_EACH_1(idx, n_cores, bm) {
> +                error = dpif_flow_del(dpif,
> +                                      ofpbuf_data(&key), ofpbuf_size(&key),
> +                                      NULL, idx, dpctl_p->print_statistics
> +                                          ? &stats : NULL);
> +            }
> +            bitmap_free(bm);
> +        }
> +    } else {
> +        error = dpif_flow_del(dpif,
> +                              ofpbuf_data(&key), ofpbuf_size(&key), NULL,
> +                              POLLER_ID_NULL,
> +                              dpctl_p->print_statistics ? &stats : NULL);
> +    }
>      if (error) {
>          dpctl_error(dpctl_p, error, "deleting flow");
>          goto out;
> @@ -967,6 +1078,7 @@ out:
>      ofpbuf_uninit(&key);
>      simap_destroy(&port_names);
>      dpif_close(dpif);
> +    netdev_close(in_port_netdev);
>      return error;
>  }
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 9b047be..599ef6b 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -177,7 +177,6 @@ static bool dpcls_lookup(const struct dpcls *cls,
>   *
>   *    dp_netdev_mutex (global)
>   *    port_mutex
> - *    flow_mutex
>   */
>  struct dp_netdev {
>      const struct dpif_class *const class;
> @@ -186,20 +185,6 @@ struct dp_netdev {
>      struct ovs_refcount ref_cnt;
>      atomic_flag destroyed;
>
> -    /* Flows.
> -     *
> -     * Writers of 'flow_table' must take the 'flow_mutex'.  Corresponding
> -     * changes to 'cls' must be made while still holding the 'flow_mutex'.
> -     */
> -    struct ovs_mutex flow_mutex;
> -    struct dpcls cls;
> -    struct cmap flow_table OVS_GUARDED; /* Flow table. */
> -
> -    /* Statistics.
> -     *
> -     * ovsthread_stats is internally synchronized. */
> -    struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */
> -
>      /* Ports.
>       *
>       * Protected by RCU.  Take the mutex to add or remove ports. */
> @@ -241,15 +226,6 @@ enum dp_stat_type {
>      DP_N_STATS
>  };
>
> -/* Contained by struct dp_netdev's 'stats' member.  */
> -struct dp_netdev_stats {
> -    struct ovs_mutex mutex;          /* Protects 'n'. */
> -
> -    /* Indexed by DP_STAT_*, protected by 'mutex'. */
> -    unsigned long long int n[DP_N_STATS] OVS_GUARDED;
> -};
> -
> -
>  /* A port in a netdev-based datapath. */
>  struct dp_netdev_port {
>      struct cmap_node node;      /* Node in dp_netdev's 'ports'. */
> @@ -261,15 +237,22 @@ struct dp_netdev_port {
>      char *type;                 /* Port type as requested by user. */
>  };
>
> -
> -/* A flow in dp_netdev's 'flow_table'.
> +/* Contained by struct dp_netdev_flow's 'stats' member.  */
> +struct dp_netdev_flow_stats {
> +    long long int used;             /* Last used time, in monotonic msecs. */
> +    long long int packet_count;     /* Number of packets matched. */
> +    long long int byte_count;       /* Number of bytes matched. */
> +    uint16_t tcp_flags;             /* Bitwise-OR of seen tcp_flags values. */
> +};
> +
> +/* A flow in 'dp_netdev_pmd_thread's 'flow_table'.
>   *
>   *
>   * Thread-safety
>   * =============
>   *
>   * Except near the beginning or ending of its lifespan, rule 'rule' belongs to
> - * its dp_netdev's classifier.  The text below calls this classifier 'cls'.
> + * its pmd thread's classifier.  The text below calls this classifier 'cls'.
>   *
>   * Motivation
>   * ----------
> @@ -303,9 +286,12 @@ struct dp_netdev_flow {
>      bool dead;
>
>      /* Hash table index by unmasked flow. */
> -    const struct cmap_node node; /* In owning dp_netdev's 'flow_table'. */
> +    const struct cmap_node node; /* In owning dp_netdev_pmd_thread's */
> +                                 /* 'flow_table'. */
>      const ovs_u128 ufid;         /* Unique flow identifier. */
>      const struct flow flow;      /* Unmasked flow that created this entry. */
> +    const int pmd_id;            /* The 'core_id' of pmd thread owning this */
> +                                 /* flow. */
>
>      /* Number of references.
>       * The classifier owns one reference.
> @@ -313,10 +299,8 @@ struct dp_netdev_flow {
>       * reference. */
>      struct ovs_refcount ref_cnt;
>
> -    /* Statistics.
> -     *
> -     * Reading or writing these members requires 'mutex'. */
> -    struct ovsthread_stats stats; /* Contains "struct dp_netdev_flow_stats". */
> +    /* Statistics. */
> +    struct dp_netdev_flow_stats stats;
>
>      /* Actions. */
>      OVSRCU_TYPE(struct dp_netdev_actions *) actions;
> @@ -331,16 +315,6 @@ static bool dp_netdev_flow_ref(struct dp_netdev_flow *);
>  static int dpif_netdev_flow_from_nlattrs(const struct nlattr *, uint32_t,
>                                           struct flow *);
>
> -/* Contained by struct dp_netdev_flow's 'stats' member.  */
> -struct dp_netdev_flow_stats {
> -    struct ovs_mutex mutex;         /* Guards all the other members. */
> -
> -    long long int used OVS_GUARDED; /* Last used time, in monotonic msecs. */
> -    long long int packet_count OVS_GUARDED; /* Number of packets matched. */
> -    long long int byte_count OVS_GUARDED;   /* Number of bytes matched. */
> -    uint16_t tcp_flags OVS_GUARDED; /* Bitwise-OR of seen tcp_flags values. */
> -};
> -
>  /* A set of datapath actions within a "struct dp_netdev_flow".
>   *
>   *
> @@ -361,20 +335,31 @@ struct dp_netdev_actions *dp_netdev_flow_get_actions(
>      const struct dp_netdev_flow *);
>  static void dp_netdev_actions_free(struct dp_netdev_actions *);
>
> +/* Contained by struct dp_netdev_pmd_thread's 'stats' member.  */
> +struct dp_netdev_pmd_stats {
> +    /* Indexed by DP_STAT_*. */
> +    unsigned long long int n[DP_N_STATS];
> +};
> +
>  /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
>   * the performance overhead of interrupt processing.  Therefore netdev can
>   * not implement rx-wait for these devices.  dpif-netdev needs to poll
>   * these device to check for recv buffer.  pmd-thread does polling for
> - * devices assigned to itself thread.
> + * devices assigned to itself.
>   *
>   * DPDK used PMD for accessing NIC.
>   *
>   * Note, instance with cpu core id NON_PMD_CORE_ID will be reserved for
>   * I/O of all non-pmd threads.  There will be no actual thread created
>   * for the instance.
> - **/
> + *
> + * Each struct has its own flow table and classifier.  Packets received
> + * from managed ports are looked up in the corresponding pmd thread's
> + * flow table, and are executed with the found actions.
> + * */
>  struct dp_netdev_pmd_thread {
>      struct dp_netdev *dp;
> +    struct ovs_refcount ref_cnt;    /* Every reference must be refcount'ed. */
>      struct cmap_node node;          /* In 'dp->poll_threads'. */
>
>      pthread_cond_t cond;            /* For synchronizing pmd thread reload. */
> @@ -385,6 +370,19 @@ struct dp_netdev_pmd_thread {
>       * need to be protected (e.g. by 'dp_netdev_mutex').  All other
>       * instances will only be accessed by its own pmd thread. */
>      struct emc_cache flow_cache;
> +
> +    /* Classifier and Flow-Table.
> +     *
> +     * Writers of 'flow_table' must take the 'flow_mutex'.  Corresponding
> +     * changes to 'cls' must be made while still holding the 'flow_mutex'.
> +     */
> +    struct ovs_mutex flow_mutex;
> +    struct dpcls cls;
> +    struct cmap flow_table OVS_GUARDED; /* Flow table. */
> +
> +    /* Statistics. */
> +    struct dp_netdev_pmd_stats stats;
> +
>      struct latch exit_latch;        /* For terminating the pmd thread. */
>      atomic_uint change_seq;         /* For reloading pmd ports. */
>      pthread_t thread;
> @@ -409,7 +407,6 @@ static int get_port_by_name(struct dp_netdev *dp, const char *devname,
>                              struct dp_netdev_port **portp);
>  static void dp_netdev_free(struct dp_netdev *)
>      OVS_REQUIRES(dp_netdev_mutex);
> -static void dp_netdev_flow_flush(struct dp_netdev *);
>  static int do_add_port(struct dp_netdev *dp, const char *devname,
>                         const char *type, odp_port_t port_no)
>      OVS_REQUIRES(dp->port_mutex);
> @@ -433,10 +430,15 @@ static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
>  static void dp_netdev_set_nonpmd(struct dp_netdev *dp);
>  static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
>                                                        int core_id);
> +static struct dp_netdev_pmd_thread *
> +dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
>  static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
>  static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
>  static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
>  static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
> +static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
> +static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
> +static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);
>
>  static inline bool emc_entry_alive(struct emc_entry *ce);
>  static void emc_clear_entry(struct emc_entry *ce);
> @@ -604,12 +606,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
>      ovs_refcount_init(&dp->ref_cnt);
>      atomic_flag_clear(&dp->destroyed);
>
> -    ovs_mutex_init(&dp->flow_mutex);
> -    dpcls_init(&dp->cls);
> -    cmap_init(&dp->flow_table);
> -
> -    ovsthread_stats_init(&dp->stats);
> -
>      ovs_mutex_init(&dp->port_mutex);
>      cmap_init(&dp->ports);
>      dp->port_seq = seq_create();
> @@ -686,8 +682,6 @@ dp_netdev_free(struct dp_netdev *dp)
>      OVS_REQUIRES(dp_netdev_mutex)
>  {
>      struct dp_netdev_port *port;
> -    struct dp_netdev_stats *bucket;
> -    int i;
>
>      shash_find_and_delete(&dp_netdevs, dp->name);
>
> @@ -696,22 +690,12 @@ dp_netdev_free(struct dp_netdev *dp)
>      ovs_mutex_destroy(&dp->non_pmd_mutex);
>      ovsthread_key_delete(dp->per_pmd_key);
>
> -    dp_netdev_flow_flush(dp);
>      ovs_mutex_lock(&dp->port_mutex);
>      CMAP_FOR_EACH (port, node, &dp->ports) {
>          do_del_port(dp, port);
>      }
>      ovs_mutex_unlock(&dp->port_mutex);
>
> -    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
> -        ovs_mutex_destroy(&bucket->mutex);
> -        free_cacheline(bucket);
> -    }
> -    ovsthread_stats_destroy(&dp->stats);
> -
> -    dpcls_destroy(&dp->cls);
> -    cmap_destroy(&dp->flow_table);
> -    ovs_mutex_destroy(&dp->flow_mutex);
>      seq_destroy(dp->port_seq);
>      cmap_destroy(&dp->ports);
>
> @@ -765,18 +749,14 @@ static int
>  dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
> -    struct dp_netdev_stats *bucket;
> -    size_t i;
> -
> -    stats->n_flows = cmap_count(&dp->flow_table);
> +    struct dp_netdev_pmd_thread *pmd;
>
> -    stats->n_hit = stats->n_missed = stats->n_lost = 0;
> -    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
> -        ovs_mutex_lock(&bucket->mutex);
> -        stats->n_hit += bucket->n[DP_STAT_HIT];
> -        stats->n_missed += bucket->n[DP_STAT_MISS];
> -        stats->n_lost += bucket->n[DP_STAT_LOST];
> -        ovs_mutex_unlock(&bucket->mutex);
> +    stats->n_flows = stats->n_hit = stats->n_missed = stats->n_lost = 0;
> +    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> +        stats->n_flows += cmap_count(&pmd->flow_table);
> +        stats->n_hit += pmd->stats.n[DP_STAT_HIT];
> +        stats->n_missed += pmd->stats.n[DP_STAT_MISS];
> +        stats->n_lost += pmd->stats.n[DP_STAT_LOST];
>      }
>      stats->n_masks = UINT32_MAX;
>      stats->n_mask_hit = UINT64_MAX;
> @@ -1140,15 +1120,6 @@ dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname,
>  static void
>  dp_netdev_flow_free(struct dp_netdev_flow *flow)
>  {
> -    struct dp_netdev_flow_stats *bucket;
> -    size_t i;
> -
> -    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &flow->stats) {
> -        ovs_mutex_destroy(&bucket->mutex);
> -        free_cacheline(bucket);
> -    }
> -    ovsthread_stats_destroy(&flow->stats);
> -
>      dp_netdev_actions_free(dp_netdev_flow_get_actions(flow));
>      free(flow);
>  }
> @@ -1167,36 +1138,41 @@ dp_netdev_flow_hash(const ovs_u128 *ufid)
>  }
>
>  static void
> -dp_netdev_remove_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
> -    OVS_REQUIRES(dp->flow_mutex)
> +dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd,
> +                          struct dp_netdev_flow *flow)
> +    OVS_REQUIRES(pmd->flow_mutex)
>  {
>      struct cmap_node *node = CONST_CAST(struct cmap_node *, &flow->node);
>
> -    dpcls_remove(&dp->cls, &flow->cr);
> -    cmap_remove(&dp->flow_table, node, dp_netdev_flow_hash(&flow->ufid));
> +    dpcls_remove(&pmd->cls, &flow->cr);
> +    cmap_remove(&pmd->flow_table, node, dp_netdev_flow_hash(&flow->ufid));
>      flow->dead = true;
>
>      dp_netdev_flow_unref(flow);
>  }
>
>  static void
> -dp_netdev_flow_flush(struct dp_netdev *dp)
> +dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd)
>  {
>      struct dp_netdev_flow *netdev_flow;
>
> -    ovs_mutex_lock(&dp->flow_mutex);
> -    CMAP_FOR_EACH (netdev_flow, node, &dp->flow_table) {
> -        dp_netdev_remove_flow(dp, netdev_flow);
> +    ovs_mutex_lock(&pmd->flow_mutex);
> +    CMAP_FOR_EACH (netdev_flow, node, &pmd->flow_table) {
> +        dp_netdev_pmd_remove_flow(pmd, netdev_flow);
>      }
> -    ovs_mutex_unlock(&dp->flow_mutex);
> +    ovs_mutex_unlock(&pmd->flow_mutex);
>  }
>
>  static int
>  dpif_netdev_flow_flush(struct dpif *dpif)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
> +    struct dp_netdev_pmd_thread *pmd;
> +
> +    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> +        dp_netdev_pmd_flow_flush(pmd);
> +    }
>
> -    dp_netdev_flow_flush(dp);
>      return 0;
>  }
>
> @@ -1528,21 +1504,22 @@ emc_lookup(struct emc_cache *cache, const struct netdev_flow_key *key)
>  }
>
>  static struct dp_netdev_flow *
> -dp_netdev_lookup_flow(const struct dp_netdev *dp,
> -                      const struct netdev_flow_key *key)
> +dp_netdev_pmd_lookup_flow(const struct dp_netdev_pmd_thread *pmd,
> +                          const struct netdev_flow_key *key)
>  {
>      struct dp_netdev_flow *netdev_flow;
>      struct dpcls_rule *rule;
>
> -    dpcls_lookup(&dp->cls, key, &rule, 1);
> +    dpcls_lookup(&pmd->cls, key, &rule, 1);
>      netdev_flow = dp_netdev_flow_cast(rule);
>
>      return netdev_flow;
>  }
>
>  static struct dp_netdev_flow *
> -dp_netdev_find_flow(const struct dp_netdev *dp, const ovs_u128 *ufidp,
> -                    const struct nlattr *key, size_t key_len)
> +dp_netdev_pmd_find_flow(const struct dp_netdev_pmd_thread *pmd,
> +                        const ovs_u128 *ufidp, const struct nlattr *key,
> +                        size_t key_len)
>  {
>      struct dp_netdev_flow *netdev_flow;
>      struct flow flow;
> @@ -1551,13 +1528,13 @@ dp_netdev_find_flow(const struct dp_netdev *dp, const ovs_u128 *ufidp,
>      /* If a UFID is not provided, determine one based on the key. */
>      if (!ufidp && key && key_len
>          && !dpif_netdev_flow_from_nlattrs(key, key_len, &flow)) {
> -        dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid);
> +        dpif_flow_hash(pmd->dp->dpif, &flow, sizeof flow, &ufid);
>          ufidp = &ufid;
>      }
>
>      if (ufidp) {
>          CMAP_FOR_EACH_WITH_HASH (netdev_flow, node, dp_netdev_flow_hash(ufidp),
> -                                 &dp->flow_table) {
> +                                 &pmd->flow_table) {
>              if (ovs_u128_equal(&netdev_flow->ufid, ufidp)) {
>                  return netdev_flow;
>              }
> @@ -1571,18 +1548,10 @@ static void
>  get_dpif_flow_stats(const struct dp_netdev_flow *netdev_flow,
>                      struct dpif_flow_stats *stats)
>  {
> -    struct dp_netdev_flow_stats *bucket;
> -    size_t i;
> -
> -    memset(stats, 0, sizeof *stats);
> -    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
> -        ovs_mutex_lock(&bucket->mutex);
> -        stats->n_packets += bucket->packet_count;
> -        stats->n_bytes += bucket->byte_count;
> -        stats->used = MAX(stats->used, bucket->used);
> -        stats->tcp_flags |= bucket->tcp_flags;
> -        ovs_mutex_unlock(&bucket->mutex);
> -    }
> +    stats->n_packets = netdev_flow->stats.packet_count;
> +    stats->n_bytes = netdev_flow->stats.byte_count;
> +    stats->used = netdev_flow->stats.used;
> +    stats->tcp_flags = netdev_flow->stats.tcp_flags;
>  }
>
>  static bool
> @@ -1632,6 +1601,7 @@ dp_netdev_flow_to_dpif_flow(const struct dp_netdev_flow *netdev_flow,
>
>      flow->ufid = netdev_flow->ufid;
>      flow->ufid_present = true;
> +    flow->poller_id = netdev_flow->pmd_id;
>      get_dpif_flow_stats(netdev_flow, &flow->stats);
>  }
>
> @@ -1732,24 +1702,46 @@ dpif_netdev_flow_get(const struct dpif *dpif, const struct dpif_flow_get *get)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
>      struct dp_netdev_flow *netdev_flow;
> +    struct dp_netdev_pmd_thread *pmd;
> +    int poller_id = get->poller_id == POLLER_ID_NULL ? NON_PMD_CORE_ID
> +                                                     : get->poller_id;
>      int error = 0;
>
> -    netdev_flow = dp_netdev_find_flow(dp, get->ufid, get->key, get->key_len);
> +    /* Each call must provide valid poller_id. */
> +    if (!ovs_numa_core_id_is_valid(poller_id)) {
> +        return EINVAL;
> +    }
> +
Do we really need to check ovs_numa_core_id_is_valid()?
dp_netdev_get_pmd() will fail anyway if the core id is invalid.

> +    pmd = dp_netdev_get_pmd(dp, poller_id);
> +    if (!pmd) {
> +        return EINVAL;
> +    }
> +
> +    netdev_flow = dp_netdev_pmd_find_flow(pmd, get->ufid, get->key,
> +                                          get->key_len);
>      if (netdev_flow) {
>          dp_netdev_flow_to_dpif_flow(netdev_flow, get->buffer, get->buffer,
>                                      get->flow, false);
>      } else {
>          error = ENOENT;
>      }
> +    dp_netdev_pmd_unref(pmd);
> +
>
>      return error;
>  }
>
> +static void
> +clear_stats(struct dp_netdev_flow *netdev_flow)
> +{
> +    memset(&netdev_flow->stats, 0, sizeof netdev_flow->stats);
> +}
> +
I guess this can be inlined.

>  static struct dp_netdev_flow *
> -dp_netdev_flow_add(struct dp_netdev *dp, struct match *match,
> -                   const ovs_u128 *ufid,
> +dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
> +                   struct match *match, const ovs_u128 *ufid,
>                     const struct nlattr *actions, size_t actions_len)
> -    OVS_REQUIRES(dp->flow_mutex)
> +    OVS_REQUIRES(pmd->flow_mutex)
>  {
>      struct dp_netdev_flow *flow;
>      struct netdev_flow_key mask;
> @@ -1760,18 +1752,19 @@ dp_netdev_flow_add(struct dp_netdev *dp, struct match *match,
>
>      /* Do not allocate extra space. */
>      flow = xmalloc(sizeof *flow - sizeof flow->cr.flow.mf + mask.len);
> +    clear_stats(flow);
>      flow->dead = false;
> +    *CONST_CAST(int *, &flow->pmd_id) = pmd->core_id;
>      *CONST_CAST(struct flow *, &flow->flow) = match->flow;
>      *CONST_CAST(ovs_u128 *, &flow->ufid) = *ufid;
>      ovs_refcount_init(&flow->ref_cnt);
> -    ovsthread_stats_init(&flow->stats);
>      ovsrcu_set(&flow->actions, dp_netdev_actions_create(actions, actions_len));
>
> -    cmap_insert(&dp->flow_table,
> +    cmap_insert(&pmd->flow_table,
>                  CONST_CAST(struct cmap_node *, &flow->node),
>                  dp_netdev_flow_hash(&flow->ufid));
>      netdev_flow_key_init_masked(&flow->cr.flow, &match->flow, &mask);
> -    dpcls_insert(&dp->cls, &flow->cr, &mask);
> +    dpcls_insert(&pmd->cls, &flow->cr, &mask);
>
>      if (OVS_UNLIKELY(VLOG_IS_DBG_ENABLED())) {
>          struct match match;
> @@ -1795,30 +1788,17 @@ dp_netdev_flow_add(struct dp_netdev *dp, struct match *match,
>      return flow;
>  }
>
> -static void
> -clear_stats(struct dp_netdev_flow *netdev_flow)
> -{
> -    struct dp_netdev_flow_stats *bucket;
> -    size_t i;
> -
> -    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
> -        ovs_mutex_lock(&bucket->mutex);
> -        bucket->used = 0;
> -        bucket->packet_count = 0;
> -        bucket->byte_count = 0;
> -        bucket->tcp_flags = 0;
> -        ovs_mutex_unlock(&bucket->mutex);
> -    }
> -}
> -
>  static int
>  dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
>      struct dp_netdev_flow *netdev_flow;
>      struct netdev_flow_key key;
> +    struct dp_netdev_pmd_thread *pmd;
>      struct match match;
>      ovs_u128 ufid;
> +    int poller_id = put->poller_id == POLLER_ID_NULL ? NON_PMD_CORE_ID
> +                                                     : put->poller_id;
>      int error;
>
>      error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &match.flow);
> @@ -1832,6 +1812,16 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
>          return error;
>      }
>
> +    /* Each call must provide valid poller_id. */
> +    if (!ovs_numa_core_id_is_valid(poller_id)) {
> +        return EINVAL;
> +    }
> +
> +    pmd = dp_netdev_get_pmd(dp, poller_id);
> +    if (!pmd) {
> +        return EINVAL;
> +    }
> +
>      /* Must produce a netdev_flow_key for lookup.
>       * This interface is no longer performance critical, since it is not used
>       * for upcall processing any more. */
> @@ -1843,15 +1833,15 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
>          dpif_flow_hash(dpif, &match.flow, sizeof match.flow, &ufid);
>      }
>
> -    ovs_mutex_lock(&dp->flow_mutex);
> -    netdev_flow = dp_netdev_lookup_flow(dp, &key);
> +    ovs_mutex_lock(&pmd->flow_mutex);
> +    netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &key);
>      if (!netdev_flow) {
>          if (put->flags & DPIF_FP_CREATE) {
> -            if (cmap_count(&dp->flow_table) < MAX_FLOWS) {
> +            if (cmap_count(&pmd->flow_table) < MAX_FLOWS) {
>                  if (put->stats) {
>                      memset(put->stats, 0, sizeof *put->stats);
>                  }
> -                dp_netdev_flow_add(dp, &match, &ufid, put->actions,
> +                dp_netdev_flow_add(pmd, &match, &ufid, put->actions,
>                                     put->actions_len);
>                  error = 0;
>              } else {
> @@ -1887,7 +1877,8 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
>              error = EINVAL;
>          }
>      }
> -    ovs_mutex_unlock(&dp->flow_mutex);
> +    ovs_mutex_unlock(&pmd->flow_mutex);
> +    dp_netdev_pmd_unref(pmd);
>
>      return error;
>  }
> @@ -1897,26 +1888,43 @@ dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
>      struct dp_netdev_flow *netdev_flow;
> +    struct dp_netdev_pmd_thread *pmd;
> +    int poller_id = del->poller_id == POLLER_ID_NULL ? NON_PMD_CORE_ID
> +                                                     : del->poller_id;
>      int error = 0;
>
> -    ovs_mutex_lock(&dp->flow_mutex);
> -    netdev_flow = dp_netdev_find_flow(dp, del->ufid, del->key, del->key_len);
> +    /* Each call must provide a valid poller_id. */
> +    if (!ovs_numa_core_id_is_valid(poller_id)) {
> +        return EINVAL;
> +    }
> +
> +    pmd = dp_netdev_get_pmd(dp, poller_id);
> +    if (!pmd) {
> +        return EINVAL;
> +    }
> +
> +    ovs_mutex_lock(&pmd->flow_mutex);
> +    netdev_flow = dp_netdev_pmd_find_flow(pmd, del->ufid, del->key,
> +                                          del->key_len);
>      if (netdev_flow) {
>          if (del->stats) {
>              get_dpif_flow_stats(netdev_flow, del->stats);
>          }
> -        dp_netdev_remove_flow(dp, netdev_flow);
> +        dp_netdev_pmd_remove_flow(pmd, netdev_flow);
>      } else {
>          error = ENOENT;
>      }
> -    ovs_mutex_unlock(&dp->flow_mutex);
> +    ovs_mutex_unlock(&pmd->flow_mutex);
> +    dp_netdev_pmd_unref(pmd);
>
>      return error;
>  }
>
>  struct dpif_netdev_flow_dump {
>      struct dpif_flow_dump up;
> -    struct cmap_position pos;
> +    struct cmap_position poll_thread_pos;
> +    struct cmap_position flow_pos;
> +    struct dp_netdev_pmd_thread *cur_pmd;
>      int status;
>      struct ovs_mutex mutex;
>  };
> @@ -1932,10 +1940,8 @@ dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse)
>  {
>      struct dpif_netdev_flow_dump *dump;
>
> -    dump = xmalloc(sizeof *dump);
> +    dump = xzalloc(sizeof *dump);
>      dpif_flow_dump_init(&dump->up, dpif_);
> -    memset(&dump->pos, 0, sizeof dump->pos);
> -    dump->status = 0;
>      dump->up.terse = terse;
>      ovs_mutex_init(&dump->mutex);
>
> @@ -1993,26 +1999,58 @@ dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,
>      struct dpif_netdev_flow_dump_thread *thread
>          = dpif_netdev_flow_dump_thread_cast(thread_);
>      struct dpif_netdev_flow_dump *dump = thread->dump;
> -    struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif);
>      struct dp_netdev_flow *netdev_flows[FLOW_DUMP_MAX_BATCH];
> -    struct dp_netdev *dp = get_dp_netdev(&dpif->dpif);
>      int n_flows = 0;
>      int i;
>
>      ovs_mutex_lock(&dump->mutex);
>      if (!dump->status) {
> -        for (n_flows = 0; n_flows < MIN(max_flows, FLOW_DUMP_MAX_BATCH);
> -             n_flows++) {
> -            struct cmap_node *node;
> +        struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif);
> +        struct dp_netdev *dp = get_dp_netdev(&dpif->dpif);
> +        struct dp_netdev_pmd_thread *pmd = dump->cur_pmd;
> +        int flow_limit = MIN(max_flows, FLOW_DUMP_MAX_BATCH);
> +
> +        /* On the first call to dump_next(), extract the first pmd
> +         * thread.  If there is no pmd thread, return immediately. */
> +        if (!pmd) {
> +            pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos);
> +            if (!pmd) {
> +                ovs_mutex_unlock(&dump->mutex);
> +                return n_flows;
>
> -            node = cmap_next_position(&dp->flow_table, &dump->pos);
> -            if (!node) {
> -                dump->status = EOF;
> -                break;
>              }
> -            netdev_flows[n_flows] = CONTAINER_OF(node, struct dp_netdev_flow,
> -                                                 node);
>          }
> +
> +        do {
> +            for (n_flows = 0; n_flows < flow_limit; n_flows++) {
> +                struct cmap_node *node;
> +
> +                node = cmap_next_position(&pmd->flow_table, &dump->flow_pos);
> +                if (!node) {
> +                    break;
> +                }
> +                netdev_flows[n_flows] = CONTAINER_OF(node,
> +                                                     struct dp_netdev_flow,
> +                                                     node);
> +            }
> +            /* When we finish dumping the current pmd thread, move
> +             * on to the next one. */
> +            if (n_flows < flow_limit) {
> +                memset(&dump->flow_pos, 0, sizeof dump->flow_pos);
> +                dp_netdev_pmd_unref(pmd);
> +                pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos);
> +                if (!pmd) {
> +                    dump->status = EOF;
> +                    break;
> +                }
> +            }
> +            /* Keeps the reference to next caller. */
> +            dump->cur_pmd = pmd;
> +
> +            /* If the current dump is empty, do not exit the loop, since the
> +             * remaining pmds could have flows to be dumped.  Just dump again
> +             * on the new 'pmd'. */
> +        } while (!n_flows);
>      }
>      ovs_mutex_unlock(&dump->mutex);
>
> @@ -2063,9 +2101,11 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
>          ovs_mutex_lock(&dp->non_pmd_mutex);
>          ovs_mutex_lock(&dp->port_mutex);
>      }
> +
>      dp_netdev_execute_actions(pmd, &pp, 1, false, execute->actions,
>                                execute->actions_len);
>      if (pmd->core_id == NON_PMD_CORE_ID) {
> +        dp_netdev_pmd_unref(pmd);
>          ovs_mutex_unlock(&dp->port_mutex);
>          ovs_mutex_unlock(&dp->non_pmd_mutex);
>      }
> @@ -2263,6 +2303,8 @@ dpif_netdev_run(struct dpif *dpif)
>          }
>      }
>      ovs_mutex_unlock(&dp->non_pmd_mutex);
> +    dp_netdev_pmd_unref(non_pmd);
> +
>      tnl_arp_cache_run();
>      new_tnl_seq = seq_read(tnl_conf_seq);
>
> @@ -2446,7 +2488,10 @@ dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
>      ovs_mutex_unlock(&pmd->cond_mutex);
>  }
>
> -/* Returns the pointer to the dp_netdev_pmd_thread for non-pmd threads. */
> +/* Finds and refs the dp_netdev_pmd_thread on core 'core_id'.  Returns
> + * the pointer if succeeds, otherwise, NULL.
> + *
> + * Caller must unref the returned reference.  */
>  static struct dp_netdev_pmd_thread *
>  dp_netdev_get_pmd(struct dp_netdev *dp, int core_id)
>  {
> @@ -2454,10 +2499,12 @@ dp_netdev_get_pmd(struct dp_netdev *dp, int core_id)
>      const struct cmap_node *pnode;
>
>      pnode = cmap_find(&dp->poll_threads, hash_int(core_id, 0));
> -    ovs_assert(pnode);
> +    if (!pnode) {
> +        return NULL;
> +    }
>      pmd = CONTAINER_OF(pnode, struct dp_netdev_pmd_thread, node);
>
> -    return pmd;
> +    return dp_netdev_pmd_try_ref(pmd) ? pmd : NULL;
>  }
Since pnode is already checked for NULL, is it even possible to return
false there?

>
>  /* Sets the 'struct dp_netdev_pmd_thread' for non-pmd threads. */
> @@ -2471,6 +2518,53 @@ dp_netdev_set_nonpmd(struct dp_netdev *dp)
>                              OVS_NUMA_UNSPEC);
>  }
>
> +static bool
> +dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd)
> +{
> +    if (pmd) {
> +        return ovs_refcount_try_ref_rcu(&pmd->ref_cnt);
> +    }
> +
We can just drop this check, since all callers make sure that they have
a valid pmd pointer.

> +    return false;
> +}
> +
> +static void
> +dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd)
> +{
> +    if (pmd && ovs_refcount_unref(&pmd->ref_cnt) == 1) {
> +        /* Since every reference is refcount'ed, reaching here
> +         * means we are safe to destroy everything. */
> +        dp_netdev_pmd_flow_flush(pmd);
> +        dpcls_destroy(&pmd->cls);
> +        cmap_destroy(&pmd->flow_table);
> +        ovs_mutex_destroy(&pmd->flow_mutex);
> +        latch_destroy(&pmd->exit_latch);
> +        xpthread_cond_destroy(&pmd->cond);
> +        ovs_mutex_destroy(&pmd->cond_mutex);
> +        free(pmd);
> +    }
> +}
> +
> +/* Given cmap position 'pos', tries to ref the next node.  If try_ref()
> + * fails, keeps checking for next node until reaching the end of cmap.
> + *
> + * Caller must unref the returned reference. */
> +static struct dp_netdev_pmd_thread *
> +dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos)
> +{
> +    struct dp_netdev_pmd_thread *next;
> +
> +    do {
> +        struct cmap_node *node;
> +
> +        node = cmap_next_position(&dp->poll_threads, pos);
> +        next = node ? CONTAINER_OF(node, struct dp_netdev_pmd_thread, node)
> +            : NULL;
> +    } while (next && !dp_netdev_pmd_try_ref(next));
> +
> +    return next;
> +}
> +
>  /* Configures the 'pmd' based on the input argument. */
>  static void
>  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
> @@ -2480,10 +2574,15 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>      pmd->index = index;
>      pmd->core_id = core_id;
>      pmd->numa_id = numa_id;
> +
> +    ovs_refcount_init(&pmd->ref_cnt);
>      latch_init(&pmd->exit_latch);
>      atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ);
>      xpthread_cond_init(&pmd->cond, NULL);
>      ovs_mutex_init(&pmd->cond_mutex);
> +    ovs_mutex_init(&pmd->flow_mutex);
> +    dpcls_init(&pmd->cls);
> +    cmap_init(&pmd->flow_table);
>      /* init the 'flow_cache' since there is no
>       * actual thread created for NON_PMD_CORE_ID. */
>      if (core_id == NON_PMD_CORE_ID) {
> @@ -2493,13 +2592,13 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>                  hash_int(core_id, 0));
>  }
>
> -/* Stops the pmd thread, removes it from the 'dp->poll_threads'
> - * and destroys the struct. */
> +/* Stops the pmd thread, removes it from the 'dp->poll_threads',
> + * and unrefs the struct. */
>  static void
>  dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd)
>  {
>      /* Uninit the 'flow_cache' since there is
> -     * no actual thread uninit it. */
> +     * no actual thread to uninit it for NON_PMD_CORE_ID. */
>      if (pmd->core_id == NON_PMD_CORE_ID) {
>          emc_cache_uninit(&pmd->flow_cache);
>      } else {
> @@ -2509,10 +2608,7 @@ dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd)
>          xpthread_join(pmd->thread, NULL);
>      }
>      cmap_remove(&pmd->dp->poll_threads, &pmd->node, hash_int(pmd->core_id, 0));
> -    latch_destroy(&pmd->exit_latch);
> -    xpthread_cond_destroy(&pmd->cond);
> -    ovs_mutex_destroy(&pmd->cond_mutex);
> -    free(pmd);
> +    dp_netdev_pmd_unref(pmd);
>  }
>
>  /* Destroys all pmd threads. */
> @@ -2584,14 +2680,6 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>  }
>
>
> -static void *
> -dp_netdev_flow_stats_new_cb(void)
> -{
> -    struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket);
> -    ovs_mutex_init(&bucket->mutex);
> -    return bucket;
> -}
> -
>  /* Called after pmd threads config change.  Restarts pmd threads with
>   * new configuration. */
>  static void
> @@ -2615,53 +2703,35 @@ dpif_netdev_get_datapath_version(void)
>  }
>
>  static void
> -dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
> -                    int cnt, int size,
> +dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow, int cnt, int size,
>                      uint16_t tcp_flags)
>  {
>      long long int now = time_msec();
> -    struct dp_netdev_flow_stats *bucket;
> -
> -    bucket = ovsthread_stats_bucket_get(&netdev_flow->stats,
> -                                        dp_netdev_flow_stats_new_cb);
> -
> -    ovs_mutex_lock(&bucket->mutex);
> -    bucket->used = MAX(now, bucket->used);
> -    bucket->packet_count += cnt;
> -    bucket->byte_count += size;
> -    bucket->tcp_flags |= tcp_flags;
> -    ovs_mutex_unlock(&bucket->mutex);
> -}
>
> -static void *
> -dp_netdev_stats_new_cb(void)
> -{
> -    struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
> -    ovs_mutex_init(&bucket->mutex);
> -    return bucket;
> +    netdev_flow->stats.used = MAX(now, netdev_flow->stats.used);
> +    netdev_flow->stats.packet_count += cnt;
> +    netdev_flow->stats.byte_count += size;
> +    netdev_flow->stats.tcp_flags |= tcp_flags;
>  }
>
>  static void
> -dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type, int cnt)
> +dp_netdev_count_packet(struct dp_netdev_pmd_thread *pmd,
> +                       enum dp_stat_type type, int cnt)
>  {
> -    struct dp_netdev_stats *bucket;
> -
> -    bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
> -    ovs_mutex_lock(&bucket->mutex);
> -    bucket->n[type] += cnt;
> -    ovs_mutex_unlock(&bucket->mutex);
> +    pmd->stats.n[type] += cnt;
>  }
>
>  static int
> -dp_netdev_upcall(struct dp_netdev *dp, struct dpif_packet *packet_,
> +dp_netdev_upcall(struct dp_netdev_pmd_thread *pmd, struct dpif_packet *packet_,
>                   struct flow *flow, struct flow_wildcards *wc, ovs_u128 *ufid,
>                   enum dpif_upcall_type type, const struct nlattr *userdata,
>                   struct ofpbuf *actions, struct ofpbuf *put_actions)
>  {
> +    struct dp_netdev *dp = pmd->dp;
>      struct ofpbuf *packet = &packet_->ofpbuf;
>
>      if (type == DPIF_UC_MISS) {
> -        dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
> +        dp_netdev_count_packet(pmd, DP_STAT_MISS, 1);
>      }
>
>      if (OVS_UNLIKELY(!dp->upcall_cb)) {
> @@ -2690,8 +2760,8 @@ dp_netdev_upcall(struct dp_netdev *dp, struct dpif_packet *packet_,
>          ds_destroy(&ds);
>      }
>
> -    return dp->upcall_cb(packet, flow, ufid, type, userdata, actions, wc,
> -                         put_actions, dp->upcall_aux);
> +    return dp->upcall_cb(packet, flow, ufid, pmd->core_id, type, userdata,
> +                         actions, wc, put_actions, dp->upcall_aux);
>  }
>
>  static inline uint32_t
> @@ -2752,7 +2822,7 @@ packet_batch_execute(struct packet_batch *batch,
>      dp_netdev_execute_actions(pmd, batch->packets, batch->packet_count, true,
>                                actions->actions, actions->size);
>
> -    dp_netdev_count_packet(pmd->dp, DP_STAT_HIT, batch->packet_count);
> +    dp_netdev_count_packet(pmd, DP_STAT_HIT, batch->packet_count);
>  }
>
>  static inline bool
> @@ -2871,7 +2941,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
>          /* Key length is needed in all the cases, hash computed on demand. */
>          keys[i].len = netdev_flow_key_size(count_1bits(keys[i].mf.map));
>      }
> -    any_miss = !dpcls_lookup(&dp->cls, keys, rules, cnt);
> +    any_miss = !dpcls_lookup(&pmd->cls, keys, rules, cnt);
>      if (OVS_UNLIKELY(any_miss) && !fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
>          uint64_t actions_stub[512 / 8], slow_stub[512 / 8];
>          struct ofpbuf actions, put_actions;
> @@ -2893,7 +2963,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
>              /* It's possible that an earlier slow path execution installed
>               * a rule covering this flow.  In this case, it's a lot cheaper
>               * to catch it here than execute a miss. */
> -            netdev_flow = dp_netdev_lookup_flow(dp, &keys[i]);
> +            netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i]);
>              if (netdev_flow) {
>                  rules[i] = &netdev_flow->cr;
>                  continue;
> @@ -2905,7 +2975,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
>              ofpbuf_clear(&put_actions);
>
>              dpif_flow_hash(dp->dpif, &match.flow, sizeof match.flow, &ufid);
> -            error = dp_netdev_upcall(dp, packets[i], &match.flow, &match.wc,
> +            error = dp_netdev_upcall(pmd, packets[i], &match.flow, &match.wc,
>                                       &ufid, DPIF_UC_MISS, NULL, &actions,
>                                       &put_actions);
>              if (OVS_UNLIKELY(error && error != ENOSPC)) {
> @@ -2930,14 +3000,14 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
>                   * mutex lock outside the loop, but that's an awful long time
>                   * to be locking everyone out of making flow installs.  If we
>                   * move to a per-core classifier, it would be reasonable. */
> -                ovs_mutex_lock(&dp->flow_mutex);
> -                netdev_flow = dp_netdev_lookup_flow(dp, &keys[i]);
> +                ovs_mutex_lock(&pmd->flow_mutex);
> +                netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i]);
>                  if (OVS_LIKELY(!netdev_flow)) {
> -                    netdev_flow = dp_netdev_flow_add(dp, &match, &ufid,
> +                    netdev_flow = dp_netdev_flow_add(pmd, &match, &ufid,
>                                                       ofpbuf_data(add_actions),
>                                                       ofpbuf_size(add_actions));
>                  }
> -                ovs_mutex_unlock(&dp->flow_mutex);
> +                ovs_mutex_unlock(&pmd->flow_mutex);
>
>                  emc_insert(flow_cache, &keys[i], netdev_flow);
>              }
> @@ -2956,7 +3026,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
>              }
>          }
>
> -        dp_netdev_count_packet(dp, DP_STAT_LOST, dropped_cnt);
> +        dp_netdev_count_packet(pmd, DP_STAT_LOST, dropped_cnt);
>      }
>
>      n_batches = 0;
> @@ -3147,7 +3217,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
>
>                  flow_extract(&packets[i]->ofpbuf, &packets[i]->md, &flow);
>                  dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid);
> -                error = dp_netdev_upcall(dp, packets[i], &flow, NULL, &ufid,
> +                error = dp_netdev_upcall(pmd, packets[i], &flow, NULL, &ufid,
>                                           DPIF_UC_ACTION, userdata,&actions,
>                                           NULL);
>                  if (!error || error == ENOSPC) {
> diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
> index 3545290..15df30d 100644
> --- a/lib/dpif-netlink.c
> +++ b/lib/dpif-netlink.c
> @@ -1452,6 +1452,7 @@ dpif_netlink_flow_to_dpif_flow(struct dpif *dpif, struct dpif_flow *dpif_flow,
>      dpif_flow->ufid_present = datapath_flow->ufid_present;
>      if (datapath_flow->ufid_present) {
>          dpif_flow->ufid = datapath_flow->ufid;
> +        dpif_flow->poller_id = POLLER_ID_NULL;
>      } else {
>          ovs_assert(datapath_flow->key && datapath_flow->key_len);
>          dpif_flow_hash(dpif, datapath_flow->key, datapath_flow->key_len,
> @@ -1763,7 +1764,7 @@ dpif_netlink_check_ufid__(struct dpif *dpif_)
>      dpif_flow_hash(dpif_, ofpbuf_data(&key), ofpbuf_size(&key), &ufid);
>      error = dpif_flow_put(dpif_, DPIF_FP_CREATE | DPIF_FP_PROBE,
>                            ofpbuf_data(&key), ofpbuf_size(&key), NULL, 0, NULL,
> -                          0, &ufid, NULL);
> +                          0, &ufid, POLLER_ID_NULL, NULL);
>
>      if (error && error != EEXIST) {
>          VLOG_WARN("%s: UFID feature probe failed (%s).",
> diff --git a/lib/dpif.c b/lib/dpif.c
> index 50a7cc1..b729573 100644
> --- a/lib/dpif.c
> +++ b/lib/dpif.c
> @@ -878,7 +878,7 @@ dpif_get_enable_ufid(struct dpif *dpif)
>  int
>  dpif_flow_get(struct dpif *dpif,
>                const struct nlattr *key, size_t key_len, const ovs_u128 *ufid,
> -              struct ofpbuf *buf, struct dpif_flow *flow)
> +              const int poller_id, struct ofpbuf *buf, struct dpif_flow *flow)
>  {
>      struct dpif_op *opp;
>      struct dpif_op op;
> @@ -887,6 +887,7 @@ dpif_flow_get(struct dpif *dpif,
>      op.u.flow_get.key = key;
>      op.u.flow_get.key_len = key_len;
>      op.u.flow_get.ufid = ufid;
> +    op.u.flow_get.poller_id = poller_id;
>      op.u.flow_get.buffer = buf;
>
>      memset(flow, 0, sizeof *flow);
> @@ -907,7 +908,8 @@ dpif_flow_put(struct dpif *dpif, enum dpif_flow_put_flags flags,
>                const struct nlattr *key, size_t key_len,
>                const struct nlattr *mask, size_t mask_len,
>                const struct nlattr *actions, size_t actions_len,
> -              const ovs_u128 *ufid, struct dpif_flow_stats *stats)
> +              const ovs_u128 *ufid, const int poller_id,
> +              struct dpif_flow_stats *stats)
>  {
>      struct dpif_op *opp;
>      struct dpif_op op;
> @@ -921,6 +923,7 @@ dpif_flow_put(struct dpif *dpif, enum dpif_flow_put_flags flags,
>      op.u.flow_put.actions = actions;
>      op.u.flow_put.actions_len = actions_len;
>      op.u.flow_put.ufid = ufid;
> +    op.u.flow_put.poller_id = poller_id;
>      op.u.flow_put.stats = stats;
>
>      opp = &op;
> @@ -933,7 +936,7 @@ dpif_flow_put(struct dpif *dpif, enum dpif_flow_put_flags flags,
>  int
>  dpif_flow_del(struct dpif *dpif,
>                const struct nlattr *key, size_t key_len, const ovs_u128 *ufid,
> -              struct dpif_flow_stats *stats)
> +              const int poller_id, struct dpif_flow_stats *stats)
>  {
>      struct dpif_op *opp;
>      struct dpif_op op;
> @@ -942,6 +945,7 @@ dpif_flow_del(struct dpif *dpif,
>      op.u.flow_del.key = key;
>      op.u.flow_del.key_len = key_len;
>      op.u.flow_del.ufid = ufid;
> +    op.u.flow_del.poller_id = poller_id;
>      op.u.flow_del.stats = stats;
>
>      opp = &op;
> diff --git a/lib/dpif.h b/lib/dpif.h
> index 527cedc..c069c1c 100644
> --- a/lib/dpif.h
> +++ b/lib/dpif.h
> @@ -391,6 +391,7 @@
>  #include "netdev.h"
>  #include "ofpbuf.h"
>  #include "openflow/openflow.h"
> +#include "ovs-numa.h"
>  #include "packets.h"
>  #include "util.h"
>
> @@ -523,14 +524,15 @@ int dpif_flow_put(struct dpif *, enum dpif_flow_put_flags,
>                    const struct nlattr *key, size_t key_len,
>                    const struct nlattr *mask, size_t mask_len,
>                    const struct nlattr *actions, size_t actions_len,
> -                  const ovs_u128 *ufid, struct dpif_flow_stats *);
> -
> +                  const ovs_u128 *ufid, const int poller_id,
> +                  struct dpif_flow_stats *);
>  int dpif_flow_del(struct dpif *,
>                    const struct nlattr *key, size_t key_len,
> -                  const ovs_u128 *ufid, struct dpif_flow_stats *);
> +                  const ovs_u128 *ufid, const int poller_id,
> +                  struct dpif_flow_stats *);
>  int dpif_flow_get(struct dpif *,
>                    const struct nlattr *key, size_t key_len,
> -                  const ovs_u128 *ufid,
> +                  const ovs_u128 *ufid, const int poller_id,
>                    struct ofpbuf *, struct dpif_flow *);
>
>  /* Flow dumping interface
> @@ -568,6 +570,8 @@ struct dpif_flow_dump_thread *dpif_flow_dump_thread_create(
>      struct dpif_flow_dump *);
>  void dpif_flow_dump_thread_destroy(struct dpif_flow_dump_thread *);
>
> +#define POLLER_ID_NULL OVS_CORE_UNSPEC
> +
>  /* A datapath flow as dumped by dpif_flow_dump_next(). */
>  struct dpif_flow {
>      const struct nlattr *key;     /* Flow key, as OVS_KEY_ATTR_* attrs. */
> @@ -578,6 +582,7 @@ struct dpif_flow {
>      size_t actions_len;           /* 'actions' length in bytes. */
>      ovs_u128 ufid;                /* Unique flow identifier. */
>      bool ufid_present;            /* True if 'ufid' was provided by datapath.*/
> +    int poller_id;                /* Datapath polling thread id. */
>      struct dpif_flow_stats stats; /* Flow statistics. */
>  };
>  int dpif_flow_dump_next(struct dpif_flow_dump_thread *,
> @@ -619,6 +624,10 @@ enum dpif_op_type {
>   *
>   *     If the operation succeeds, then 'stats', if nonnull, will be set to the
>   *     flow's statistics before the update.
> + *
> + *   - If the datapath implements multiple polling threads, each with its
> + *     own flow table, 'poller_id' should be used to specify the particular
> + *     polling thread for the operation.
>   */
>  struct dpif_flow_put {
>      /* Input. */
> @@ -630,6 +639,7 @@ struct dpif_flow_put {
>      const struct nlattr *actions;   /* Actions to perform on flow. */
>      size_t actions_len;             /* Length of 'actions' in bytes. */
>      const ovs_u128 *ufid;           /* Optional unique flow identifier. */
> +    int poller_id;                  /* Datapath polling thread id. */
>
>      /* Output. */
>      struct dpif_flow_stats *stats;  /* Optional flow statistics. */
> @@ -647,6 +657,10 @@ struct dpif_flow_put {
>   * Callers should always provide the 'key' to improve dpif logging in the event
>   * of errors or unexpected behaviour.
>   *
> + * If the datapath implements multiple polling threads, each with its own
> + * flow table, 'poller_id' should be used to specify the particular polling
> + * thread for the operation.
> + *
>   * If the operation succeeds, then 'stats', if nonnull, will be set to the
>   * flow's statistics before its deletion. */
>  struct dpif_flow_del {
> @@ -654,6 +668,7 @@ struct dpif_flow_del {
>      const struct nlattr *key;       /* Flow to delete. */
>      size_t key_len;                 /* Length of 'key' in bytes. */
>      const ovs_u128 *ufid;           /* Unique identifier of flow to delete. */
> +    int poller_id;                  /* Datapath polling thread id. */
>
>      /* Output. */
>      struct dpif_flow_stats *stats;  /* Optional flow statistics. */
> @@ -703,6 +718,10 @@ struct dpif_execute {
>   * Callers should always provide 'key' to improve dpif logging in the event of
>   * errors or unexpected behaviour.
>   *
> + * If the datapath implements multiple polling threads, each with its own
> + * flow table, 'poller_id' should be used to specify the particular polling
> + * thread for the operation.
> + *
>   * Succeeds with status 0 if the flow is fetched, or fails with ENOENT if no
>   * such flow exists. Other failures are indicated with a positive errno value.
>   */
> @@ -711,6 +730,7 @@ struct dpif_flow_get {
>      const struct nlattr *key;       /* Flow to get. */
>      size_t key_len;                 /* Length of 'key' in bytes. */
>      const ovs_u128 *ufid;           /* Unique identifier of flow to get. */
> +    int poller_id;                  /* Datapath polling thread id. */
>      struct ofpbuf *buffer;          /* Storage for output parameters. */
>
>      /* Output. */
> @@ -767,8 +787,9 @@ struct dpif_upcall {
>  /* A callback to process an upcall, currently implemented only by dpif-netdev.
>   *
>   * The caller provides the 'packet' and 'flow' to process, the corresponding
> - * 'ufid' as generated by dpif_flow_hash(), the 'type' of the upcall, and if
> - * 'type' is DPIF_UC_ACTION then the 'userdata' attached to the action.
> + * 'ufid' as generated by dpif_flow_hash(), the polling thread id 'poller_id',
> + * the 'type' of the upcall, and if 'type' is DPIF_UC_ACTION then the
> + * 'userdata' attached to the action.
>   *
>   * The callback must fill in 'actions' with the datapath actions to apply to
>   * 'packet'.  'wc' and 'put_actions' will either be both null or both nonnull.
> @@ -784,6 +805,7 @@ struct dpif_upcall {
>  typedef int upcall_callback(const struct ofpbuf *packet,
>                              const struct flow *flow,
>                              ovs_u128 *ufid,
> +                            int poller_id,
>                              enum dpif_upcall_type type,
>                              const struct nlattr *userdata,
>                              struct ofpbuf *actions,
> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
> index 38e1aff..deaf667 100644
> --- a/ofproto/ofproto-dpif-upcall.c
> +++ b/ofproto/ofproto-dpif-upcall.c
> @@ -158,6 +158,7 @@ struct upcall {
>       * may be used with other datapaths. */
>      const struct flow *flow;       /* Parsed representation of the packet. */
>      const ovs_u128 *ufid;          /* Unique identifier for 'flow'. */
> +    int poller_id;                 /* Datapath polling thread id. */
>      const struct ofpbuf *packet;   /* Packet associated with this upcall. */
>      ofp_port_t in_port;            /* OpenFlow in port, or OFPP_NONE. */
>
> @@ -211,6 +212,7 @@ struct udpif_key {
>      ovs_u128 ufid;                 /* Unique flow identifier. */
>      bool ufid_present;             /* True if 'ufid' is in datapath. */
>      uint32_t hash;                 /* Pre-computed hash for 'key'. */
> +    int poller_id;                 /* Datapath polling thread id. */
>
>      struct ovs_mutex mutex;                   /* Guards the following. */
>      struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/
> @@ -288,7 +290,7 @@ static enum upcall_type classify_upcall(enum dpif_upcall_type type,
>  static int upcall_receive(struct upcall *, const struct dpif_backer *,
>                            const struct ofpbuf *packet, enum dpif_upcall_type,
>                            const struct nlattr *userdata, const struct flow *,
> -                          const ovs_u128 *ufid);
> +                          const ovs_u128 *ufid, const int poller_id);
>  static void upcall_uninit(struct upcall *);
>
>  static upcall_callback upcall_cb;
> @@ -650,7 +652,7 @@ recv_upcalls(struct handler *handler)
>
>          error = upcall_receive(upcall, udpif->backer, &dupcall->packet,
>                                 dupcall->type, dupcall->userdata, flow,
> -                               &dupcall->ufid);
> +                               &dupcall->ufid, POLLER_ID_NULL);
>          if (error) {
>              if (error == ENODEV) {
>                  /* Received packet on datapath port for which we couldn't
> @@ -659,7 +661,7 @@ recv_upcalls(struct handler *handler)
>                   * message in case it happens frequently. */
>                  dpif_flow_put(udpif->dpif, DPIF_FP_CREATE, dupcall->key,
>                                dupcall->key_len, NULL, 0, NULL, 0,
> -                              &dupcall->ufid, NULL);
> +                              &dupcall->ufid, POLLER_ID_NULL, NULL);
>                  VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
>                               "port %"PRIu32, flow->in_port.odp_port);
>              }
> @@ -880,7 +882,7 @@ static int
>  upcall_receive(struct upcall *upcall, const struct dpif_backer *backer,
>                 const struct ofpbuf *packet, enum dpif_upcall_type type,
>                 const struct nlattr *userdata, const struct flow *flow,
> -               const ovs_u128 *ufid)
> +               const ovs_u128 *ufid, const int poller_id)
>  {
>      int error;
>
> @@ -893,6 +895,7 @@ upcall_receive(struct upcall *upcall, const struct dpif_backer *backer,
>      upcall->flow = flow;
>      upcall->packet = packet;
>      upcall->ufid = ufid;
> +    upcall->poller_id = poller_id;
>      upcall->type = type;
>      upcall->userdata = userdata;
>      ofpbuf_init(&upcall->put_actions, 0);
> @@ -994,9 +997,9 @@ upcall_uninit(struct upcall *upcall)
>
>  static int
>  upcall_cb(const struct ofpbuf *packet, const struct flow *flow, ovs_u128 *ufid,
> -          enum dpif_upcall_type type, const struct nlattr *userdata,
> -          struct ofpbuf *actions, struct flow_wildcards *wc,
> -          struct ofpbuf *put_actions, void *aux)
> +          int poller_id, enum dpif_upcall_type type,
> +          const struct nlattr *userdata, struct ofpbuf *actions,
> +          struct flow_wildcards *wc, struct ofpbuf *put_actions, void *aux)
>  {
>      struct udpif *udpif = aux;
>      unsigned int flow_limit;
> @@ -1008,7 +1011,7 @@ upcall_cb(const struct ofpbuf *packet, const struct flow *flow, ovs_u128 *ufid,
>      atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
>
>      error = upcall_receive(&upcall, udpif->backer, packet, type, userdata,
> -                           flow, ufid);
> +                           flow, ufid, poller_id);
>      if (error) {
>          return error;
>      }
> @@ -1257,7 +1260,7 @@ static struct udpif_key *
>  ukey_create__(const struct nlattr *key, size_t key_len,
>                const struct nlattr *mask, size_t mask_len,
>                bool ufid_present, const ovs_u128 *ufid,
> -              const struct ofpbuf *actions,
> +              const int poller_id, const struct ofpbuf *actions,
>                uint64_t dump_seq, uint64_t reval_seq, long long int used)
>      OVS_NO_THREAD_SAFETY_ANALYSIS
>  {
> @@ -1271,6 +1274,7 @@ ukey_create__(const struct nlattr *key, size_t key_len,
>      ukey->mask_len = mask_len;
>      ukey->ufid_present = ufid_present;
>      ukey->ufid = *ufid;
> +    ukey->poller_id = poller_id;
>      ukey->hash = get_ufid_hash(&ukey->ufid);
>      ukey->actions = ofpbuf_clone(actions);
>
> @@ -1316,8 +1320,9 @@ ukey_create_from_upcall(const struct upcall *upcall)
>
>      return ukey_create__(ofpbuf_data(&keybuf), ofpbuf_size(&keybuf),
>                           ofpbuf_data(&maskbuf), ofpbuf_size(&maskbuf),
> -                         true, upcall->ufid, &upcall->put_actions,
> -                         upcall->dump_seq, upcall->reval_seq, 0);
> +                         true, upcall->ufid, upcall->poller_id,
> +                         &upcall->put_actions, upcall->dump_seq,
> +                         upcall->reval_seq, 0);
>  }
>
>  static int
> @@ -1336,8 +1341,8 @@ ukey_create_from_dpif_flow(const struct udpif *udpif,
>
>          /* If the key was not provided by the datapath, fetch the full flow. */
>          ofpbuf_use_stack(&buf, &stub, sizeof stub);
> -        err = dpif_flow_get(udpif->dpif, NULL, 0, &flow->ufid, &buf,
> -                            &full_flow);
> +        err = dpif_flow_get(udpif->dpif, NULL, 0, &flow->ufid,
> +                            flow->poller_id, &buf, &full_flow);
>          if (err) {
>              return err;
>          }
> @@ -1348,8 +1353,9 @@ ukey_create_from_dpif_flow(const struct udpif *udpif,
>      ofpbuf_use_const(&actions, &flow->actions, flow->actions_len);
>      *ukey = ukey_create__(flow->key, flow->key_len,
>                            flow->mask, flow->mask_len, flow->ufid_present,
> -                          &flow->ufid, &actions, dump_seq, reval_seq,
> -                          flow->stats.used);
> +                          &flow->ufid, flow->poller_id, &actions, dump_seq,
> +                          reval_seq, flow->stats.used);
> +
>      return 0;
>  }
>
> @@ -1679,6 +1685,7 @@ delete_op_init__(struct ukey_op *op, const struct dpif_flow *flow)
>      op->dop.u.flow_del.key = flow->key;
>      op->dop.u.flow_del.key_len = flow->key_len;
>      op->dop.u.flow_del.ufid = flow->ufid_present ? &flow->ufid : NULL;
> +    op->dop.u.flow_del.poller_id = flow->poller_id;
>      op->dop.u.flow_del.stats = &op->stats;
>  }
>
> @@ -1690,6 +1697,7 @@ delete_op_init(struct ukey_op *op, struct udpif_key *ukey)
>      op->dop.u.flow_del.key = ukey->key;
>      op->dop.u.flow_del.key_len = ukey->key_len;
>      op->dop.u.flow_del.ufid = ukey->ufid_present ? &ukey->ufid : NULL;
> +    op->dop.u.flow_del.poller_id = ukey->poller_id;
>      op->dop.u.flow_del.stats = &op->stats;
>  }
>
> diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
> index 5b3e64c..760a157 100644
> --- a/ofproto/ofproto-dpif.c
> +++ b/ofproto/ofproto-dpif.c
> @@ -1021,7 +1021,7 @@ check_recirc(struct dpif_backer *backer)
>
>      error = dpif_flow_put(backer->dpif, DPIF_FP_CREATE | DPIF_FP_PROBE,
>                            ofpbuf_data(&key), ofpbuf_size(&key), NULL, 0, NULL,
> -                          0, NULL, NULL);
> +                          0, NULL, POLLER_ID_NULL, NULL);
>      if (error && error != EEXIST) {
>          if (error != EINVAL) {
>              VLOG_WARN("%s: Reciculation flow probe failed (%s)",
> @@ -1031,7 +1031,7 @@ check_recirc(struct dpif_backer *backer)
>      }
>
>      error = dpif_flow_del(backer->dpif, ofpbuf_data(&key), ofpbuf_size(&key),
> -                          NULL, NULL);
> +                          NULL, POLLER_ID_NULL, NULL);
>      if (error) {
>          VLOG_WARN("%s: failed to delete recirculation feature probe flow",
>                    dpif_name(backer->dpif));
> @@ -1150,7 +1150,7 @@ check_max_mpls_depth(struct dpif_backer *backer)
>
>          error = dpif_flow_put(backer->dpif, DPIF_FP_CREATE | DPIF_FP_PROBE,
>                                ofpbuf_data(&key), ofpbuf_size(&key), NULL, 0,
> -                              NULL, 0, NULL, NULL);
> +                              NULL, 0, NULL, POLLER_ID_NULL, NULL);
>          if (error && error != EEXIST) {
>              if (error != EINVAL) {
>                  VLOG_WARN("%s: MPLS stack length feature probe failed (%s)",
> @@ -1160,7 +1160,7 @@ check_max_mpls_depth(struct dpif_backer *backer)
>          }
>
>          error = dpif_flow_del(backer->dpif, ofpbuf_data(&key),
> -                              ofpbuf_size(&key), NULL, NULL);
> +                              ofpbuf_size(&key), NULL, POLLER_ID_NULL, NULL);
>          if (error) {
>              VLOG_WARN("%s: failed to delete MPLS feature probe flow",
>                        dpif_name(backer->dpif));
> diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at
> index fbf35c6..4cbe9a8 100644
> --- a/tests/ofproto-dpif.at
> +++ b/tests/ofproto-dpif.at
> @@ -3509,6 +3509,7 @@ for type in no first later; do
>  done
>
>  AT_CHECK([ovs-appctl dpctl/dump-flows], [0], [dnl
> +flow-dump from non-dpdk interfaces:
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),1
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),5
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:0, bytes:0, used:never, actions:6
> @@ -3527,6 +3528,7 @@ for type in no first later; do
>  done
>
>  AT_CHECK([ovs-appctl dpctl/dump-flows], [0], [dnl
> +flow-dump from non-dpdk interfaces:
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),1
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(frag=first), packets:0, bytes:0, used:never, actions:drop
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(frag=later), packets:0, bytes:0, used:never, actions:drop
> @@ -3545,6 +3547,7 @@ for type in no first later; do
>  done
>
>  AT_CHECK([ovs-appctl dpctl/dump-flows], [0], [dnl
> +flow-dump from non-dpdk interfaces:
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),1
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(dst=80), packets:0, bytes:0, used:never, actions:set(tcp(dst=81)),2
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:0, bytes:0, used:never, actions:6
> @@ -3616,6 +3619,7 @@ for frag in 4000 6000 6008 4010; do
>  done
>
>  AT_CHECK([ovs-appctl dpctl/dump-flows | sed 's/used:[[0-9]].[[0-9]]*s/used:0.001s/'], [0], [dnl
> +flow-dump from non-dpdk interfaces:
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(src=33419), packets:0, bytes:0, used:never, actions:set(tcp(src=33322)),1
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(src=33419), packets:0, bytes:0, used:never, actions:set(tcp(src=33322)),1
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:1, bytes:74, used:0.001s, actions:1
> @@ -3631,6 +3635,7 @@ for frag in 4000 6000 6008 4010; do
>  done
>
>  AT_CHECK([ovs-appctl dpctl/dump-flows | sed 's/used:[[0-9]].[[0-9]]*s/used:0.001s/'], [0], [dnl
> +flow-dump from non-dpdk interfaces:
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(src=0), packets:0, bytes:0, used:never, actions:set(tcp(src=42)),1
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(src=0), packets:0, bytes:0, used:never, actions:set(tcp(src=42)),1
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:1, bytes:60, used:0.001s, actions:1
> @@ -3646,6 +3651,7 @@ for frag in 4000 6000 6001 4002; do
>  done
>
>  AT_CHECK([ovs-appctl dpctl/dump-flows | sed 's/used:[[0-9]].[[0-9]]*s/used:0.001s/'], [0], [dnl
> +flow-dump from non-dpdk interfaces:
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=no),tcp(src=0), packets:0, bytes:0, used:never, actions:set(tcp(src=42)),1
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=first),tcp(src=0), packets:0, bytes:0, used:never, actions:set(tcp(src=42)),1
>  recirc_id(0),in_port(90),eth_type(0x0800),ipv4(proto=6,frag=later), packets:1, bytes:60, used:0.001s, actions:1
> --
> 1.7.9.5
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev



More information about the dev mailing list