[ovs-dev] [dpdk patch v3 6/6] dpif-netdev: Create multiple pmd threads by default.

Alex Wang alexw at nicira.com
Fri Sep 12 05:43:45 UTC 2014


One example is when STP config pkts are received from the dpdk port,

when processing the pkts in xlate's process_special(), if the stp state
machine need to send a config immediately via the dpdk interface, the state
machine will directly invoke the stp->send() callback function and sends the
pkt immediately (via calling dpif_netdev_execute()).

On Thu, Sep 11, 2014 at 10:39 PM, Pravin Shelar <pshelar at nicira.com> wrote:

> On Tue, Sep 9, 2014 at 5:00 PM, Alex Wang <alexw at nicira.com> wrote:
> > With this commit, ovs by default will try creating 'number of
> > dpdk interfaces on numa node' pmd threads for each numa node
> > and pin the pmd threads to available cpu cores on the numa node.
> >
> > NON_PMD_CORE_ID (currently 0) is used to reserve a particular
> > cpu core for the I/O of all non-pmd threads.  No pmd thread
> > can be pinned to this reserved core.
> >
> > As side-effects of this commit:
> >
> > - the exact-match cache for non-pmd threads is removed from
> >   'struct dp_netdev'.  Instead, all non-pmd threads will use
> >   the exact-match cache defined in the 'struct dp_netdev_pmd_thread'
> >   for NON_PMD_CORE_ID.
> >
> > - the received packet processing functions are refactored to use
> >   'struct dp_netdev_pmd_thread' as input.
> >
> > - the 'netdev_send()' function will be called with the proper
> >   queue id.
> >
> > Signed-off-by: Alex Wang <alexw at nicira.com>
> >
> > ---
> > PATCH -> V2
> > - rebase and refactor the code.
> >
> > V2 -> V3:
> > - both pmd and non-pmd threads can call the dpif_netdev_execute().
> >   so, use a per-thread variable to help recognize the calling thread.
> > ---
> >  lib/dpif-netdev.c |  407
> +++++++++++++++++++++++++++++++++++++----------------
> >  lib/dpif-netdev.h |    4 +-
> >  lib/netdev-dpdk.c |   17 ++-
> >  lib/netdev-dpdk.h |    7 +
> >  4 files changed, 302 insertions(+), 133 deletions(-)
> >
> > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> > index dcce02e..29a92b3 100644
> > --- a/lib/dpif-netdev.c
> > +++ b/lib/dpif-netdev.c
> > @@ -52,6 +52,7 @@
> >  #include "odp-util.h"
> >  #include "ofp-print.h"
> >  #include "ofpbuf.h"
> > +#include "ovs-numa.h"
> >  #include "ovs-rcu.h"
> >  #include "packet-dpif.h"
> >  #include "packets.h"
> > @@ -158,7 +159,6 @@ struct emc_cache {
> >   *
> >   *    dp_netdev_mutex (global)
> >   *    port_mutex
> > - *    emc_mutex
> >   *    flow_mutex
> >   */
> >  struct dp_netdev {
> > @@ -195,17 +195,16 @@ struct dp_netdev {
> >      upcall_callback *upcall_cb;  /* Callback function for executing
> upcalls. */
> >      void *upcall_aux;
> >
> > -    /* Forwarding threads. */
> > -    struct latch exit_latch;
> > -    struct pmd_thread *pmd_threads;
> > -    size_t n_pmd_threads;
> > -    int pmd_count;
> > -
> > -    /* Exact match cache for non-pmd devices.
> > -     * Pmd devices use instead each thread's flow_cache for this
> purpose.
> > -     * Protected by emc_mutex */
> > -    struct emc_cache flow_cache OVS_GUARDED;
> > -    struct ovs_mutex emc_mutex;
> > +    /* Stores all 'struct dp_netdev_pmd_thread's. */
> > +    struct cmap poll_threads;
> > +
> > +    /* Protects the access of the 'struct dp_netdev_pmd_thread'
> > +     * instance for non-pmd thread. */
> > +    struct ovs_mutex non_pmd_mutex;
> > +
> > +    /* Each pmd thread will store its pointer to
> > +     * 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */
> > +    ovsthread_key_t per_pmd_key;
> >  };
> >
> >  static struct dp_netdev_port *dp_netdev_lookup_port(const struct
> dp_netdev *dp,
> > @@ -340,15 +339,25 @@ static void dp_netdev_actions_free(struct
> dp_netdev_actions *);
> >   *
> >   * DPDK used PMD for accessing NIC.
> >   *
> > - * A thread that receives packets from PMD ports, looks them up in the
> flow
> > - * table, and executes the actions it finds.
> > + * Note, instance with cpu core id NON_PMD_CORE_ID will be reserved for
> > + * I/O of all non-pmd threads.  There will be no actual thread created
> > + * for the instance.
> >   **/
> > -struct pmd_thread {
> > +struct dp_netdev_pmd_thread {
> >      struct dp_netdev *dp;
> > +    struct cmap_node node;          /* In 'dp->poll_threads'. */
> > +    /* Per thread exact-match cache.  Note, the instance for cpu core
> > +     * NON_PMD_CORE_ID can be accessed by multiple threads, and thusly
> > +     * need to be protected (e.g. by 'dp_netdev_mutex').  All other
> > +     * instances will only be accessed by its own pmd thread. */
> >      struct emc_cache flow_cache;
> > +    struct latch exit_latch;        /* For terminating the pmd thread.
> */
> > +    atomic_uint change_seq;         /* For reloading pmd ports. */
> >      pthread_t thread;
> > -    int id;
> > -    atomic_uint change_seq;
> > +    int index;                      /* Idx of this pmd thread among
> pmd*/
> > +                                    /* threads on same numa node. */
> > +    int core_id;                    /* CPU core id of this pmd thread.
> */
> > +    int numa_id;                    /* numa node id of this pmd thread.
> */
> >  };
> >
> >  #define PMD_INITIAL_SEQ 1
> > @@ -374,18 +383,22 @@ static void do_del_port(struct dp_netdev *dp,
> struct dp_netdev_port *)
> >      OVS_REQUIRES(dp->port_mutex);
> >  static int dpif_netdev_open(const struct dpif_class *, const char *name,
> >                              bool create, struct dpif **);
> > -static void dp_netdev_execute_actions(struct dp_netdev *dp,
> > +static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
> >                                        struct dpif_packet **, int c,
> >                                        bool may_steal, struct
> pkt_metadata *,
> > -                                      struct emc_cache *flow_cache,
> >                                        const struct nlattr *actions,
> >                                        size_t actions_len);
> > -static void dp_netdev_input(struct dp_netdev *, struct emc_cache *,
> > +static void dp_netdev_input(struct dp_netdev_pmd_thread *,
> >                              struct dpif_packet **, int cnt,
> >                              struct pkt_metadata *);
> > -
> > -static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
> >  static void dp_netdev_disable_upcall(struct dp_netdev *);
> > +static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
> > +                                    struct dp_netdev *dp, int index,
> > +                                    int core_id, int numa_id);
> > +static struct dp_netdev_pmd_thread *dp_netdev_get_nonpmd(struct
> dp_netdev *dp);
> > +static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
> > +static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int
> numa_id);
> > +static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int
> numa_id);
> >
> >  static void emc_clear_entry(struct emc_entry *ce);
> >
> > @@ -524,6 +537,7 @@ create_dp_netdev(const char *name, const struct
> dpif_class *class,
> >      OVS_REQUIRES(dp_netdev_mutex)
> >  {
> >      struct dp_netdev *dp;
> > +    struct dp_netdev_pmd_thread *non_pmd;
> >      int error;
> >
> >      dp = xzalloc(sizeof *dp);
> > @@ -543,7 +557,6 @@ create_dp_netdev(const char *name, const struct
> dpif_class *class,
> >      ovs_mutex_init(&dp->port_mutex);
> >      cmap_init(&dp->ports);
> >      dp->port_seq = seq_create();
> > -    latch_init(&dp->exit_latch);
> >      fat_rwlock_init(&dp->upcall_rwlock);
> >
> >      /* Disable upcalls by default. */
> > @@ -551,6 +564,16 @@ create_dp_netdev(const char *name, const struct
> dpif_class *class,
> >      dp->upcall_aux = NULL;
> >      dp->upcall_cb = NULL;
> >
> > +    cmap_init(&dp->poll_threads);
> > +    ovs_mutex_init_recursive(&dp->non_pmd_mutex);
> > +    ovsthread_key_create(&dp->per_pmd_key, NULL);
> > +
> > +    /* Reserves the core NON_PMD_CORE_ID for all non-pmd threads. */
> > +    ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID);
> > +    non_pmd = xzalloc(sizeof *non_pmd);
> > +    dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID,
> > +                            OVS_NUMA_UNSPEC);
> > +
> >      ovs_mutex_lock(&dp->port_mutex);
> >      error = do_add_port(dp, name, "internal", ODPP_LOCAL);
> >      ovs_mutex_unlock(&dp->port_mutex);
> > @@ -559,9 +582,6 @@ create_dp_netdev(const char *name, const struct
> dpif_class *class,
> >          return error;
> >      }
> >
> > -    ovs_mutex_init_recursive(&dp->emc_mutex);
> > -    emc_cache_init(&dp->flow_cache);
> > -
> >      *dpp = dp;
> >      return 0;
> >  }
> > @@ -603,8 +623,9 @@ dp_netdev_free(struct dp_netdev *dp)
> >
> >      shash_find_and_delete(&dp_netdevs, dp->name);
> >
> > -    dp_netdev_set_pmd_threads(dp, 0);
> > -    free(dp->pmd_threads);
> > +    dp_netdev_destroy_all_pmds(dp);
> > +    ovs_mutex_destroy(&dp->non_pmd_mutex);
> > +    ovsthread_key_delete(dp->per_pmd_key);
> >
> >      dp_netdev_flow_flush(dp);
> >      ovs_mutex_lock(&dp->port_mutex);
> > @@ -625,10 +646,6 @@ dp_netdev_free(struct dp_netdev *dp)
> >      seq_destroy(dp->port_seq);
> >      cmap_destroy(&dp->ports);
> >      fat_rwlock_destroy(&dp->upcall_rwlock);
> > -    latch_destroy(&dp->exit_latch);
> > -
> > -    emc_cache_uninit(&dp->flow_cache);
> > -    ovs_mutex_destroy(&dp->emc_mutex);
> >
> >      free(CONST_CAST(char *, dp->name));
> >      free(dp);
> > @@ -696,15 +713,22 @@ dpif_netdev_get_stats(const struct dpif *dpif,
> struct dpif_dp_stats *stats)
> >  }
> >
> >  static void
> > -dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
> > +dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
> >  {
> > -    int i;
> > +    int old_seq;
> > +
> > +    atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
> > +}
> >
> > -    for (i = 0; i < dp->n_pmd_threads; i++) {
> > -        struct pmd_thread *f = &dp->pmd_threads[i];
> > -        int old_seq;
> > +/* Causes all pmd threads to reload its tx/rx devices.
> > + * Must be called after adding/removing ports. */
> > +static void
> > +dp_netdev_reload_pmds(struct dp_netdev *dp)
> > +{
> > +    struct dp_netdev_pmd_thread *pmd;
> >
> > -        atomic_add_relaxed(&f->change_seq, 1, &old_seq);
> > +    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> > +        dp_netdev_reload_pmd__(pmd);
> >      }
> >  }
> >
> > @@ -777,9 +801,8 @@ do_add_port(struct dp_netdev *dp, const char
> *devname, const char *type,
> >      port->sf = sf;
> >
> >      if (netdev_is_pmd(netdev)) {
> > -        dp->pmd_count++;
> > -        dp_netdev_set_pmd_threads(dp, NR_PMD_THREADS);
> > -        dp_netdev_reload_pmd_threads(dp);
> > +        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
> > +        dp_netdev_reload_pmds(dp);
> >      }
> >      ovs_refcount_init(&port->ref_cnt);
> >
> > @@ -930,6 +953,39 @@ get_port_by_name(struct dp_netdev *dp,
> >      return ENOENT;
> >  }
> >
> > +static int
> > +get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id)
> > +{
> > +    struct dp_netdev_pmd_thread *pmd;
> > +    int n_pmds = 0;
> > +
> > +    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> > +        if (pmd->numa_id == numa_id) {
> > +            n_pmds++;
> > +        }
> > +    }
> > +
> > +    return n_pmds;
> > +}
> > +
> > +/* Returns 'true' if there is a port with pmd netdev and the netdev
> > + * is on numa node 'numa_id'. */
> > +static bool
> > +has_pmd_port_for_numa(struct dp_netdev *dp, int numa_id)
> > +{
> > +    struct dp_netdev_port *port;
> > +
> > +    CMAP_FOR_EACH (port, node, &dp->ports) {
> > +        if (netdev_is_pmd(port->netdev)
> > +            && netdev_get_numa_id(port->netdev) == numa_id) {
> > +            return true;
> > +        }
> > +    }
> > +
> > +    return false;
> > +}
> > +
> > +
> >  static void
> >  do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
> >      OVS_REQUIRES(dp->port_mutex)
> > @@ -937,7 +993,14 @@ do_del_port(struct dp_netdev *dp, struct
> dp_netdev_port *port)
> >      cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
> >      seq_change(dp->port_seq);
> >      if (netdev_is_pmd(port->netdev)) {
> > -        dp_netdev_reload_pmd_threads(dp);
> > +        int numa_id = netdev_get_numa_id(port->netdev);
> > +
> > +        /* If there is no netdev on the numa node, deletes the pmd
> threads
> > +         * for that numa.  Else, just reloads the queues.  */
> > +        if (!has_pmd_port_for_numa(dp, numa_id)) {
> > +            dp_netdev_del_pmds_on_numa(dp, numa_id);
> > +        }
> > +        dp_netdev_reload_pmds(dp);
> >      }
> >
> >      port_unref(port);
> > @@ -1694,8 +1757,10 @@ dpif_netdev_flow_dump_next(struct
> dpif_flow_dump_thread *thread_,
> >
> >  static int
> >  dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
> > +    OVS_NO_THREAD_SAFETY_ANALYSIS
> >  {
> >      struct dp_netdev *dp = get_dp_netdev(dpif);
> > +    struct dp_netdev_pmd_thread *pmd;
> >      struct dpif_packet packet, *pp;
> >      struct pkt_metadata *md = &execute->md;
> >
> > @@ -1707,11 +1772,24 @@ dpif_netdev_execute(struct dpif *dpif, struct
> dpif_execute *execute)
> >      packet.ofpbuf = *execute->packet;
> >      pp = &packet;
> >
> > -    ovs_mutex_lock(&dp->emc_mutex);
> > -    dp_netdev_execute_actions(dp, &pp, 1, false, md,
> > -                              &dp->flow_cache, execute->actions,
> > +    /* Tries finding the 'pmd'.  If NULL is returned, that means
> > +     * the current thread is a non-pmd thread and should use
> > +     * dp_netdev_get_nonpmd(). */
> > +    pmd = ovsthread_getspecific(dp->per_pmd_key);
> > +    if (!pmd) {
> > +        pmd = dp_netdev_get_nonpmd(dp);
> > +    }
> > +
> In which case execute is called in PMD thread?
>
> > +    /* If the current thread is non-pmd thread, acquires
> > +     * the 'non_pmd_mutex'. */
> > +    if (pmd->core_id == NON_PMD_CORE_ID) {
> > +        ovs_mutex_lock(&dp->non_pmd_mutex);
> > +    }
> > +    dp_netdev_execute_actions(pmd, &pp, 1, false, md, execute->actions,
> >                                execute->actions_len);
> > -    ovs_mutex_unlock(&dp->emc_mutex);
> > +    if (pmd->core_id == NON_PMD_CORE_ID) {
> > +        ovs_mutex_unlock(&dp->non_pmd_mutex);
> > +    }
> >
> >      /* Even though may_steal is set to false, some actions could modify
> or
> >       * reallocate the ofpbuf memory. We need to pass those changes to
> the
> > @@ -1788,8 +1866,7 @@ dp_netdev_actions_free(struct dp_netdev_actions
> *actions)
> >
> >
> >  static void
> > -dp_netdev_process_rxq_port(struct dp_netdev *dp,
> > -                           struct emc_cache *flow_cache,
> > +dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
> >                             struct dp_netdev_port *port,
> >                             struct netdev_rxq *rxq)
> >  {
> > @@ -1801,7 +1878,7 @@ dp_netdev_process_rxq_port(struct dp_netdev *dp,
> >          struct pkt_metadata md =
> PKT_METADATA_INITIALIZER(port->port_no);
> >
> >          *recirc_depth_get() = 0;
> > -        dp_netdev_input(dp, flow_cache, packets, cnt, &md);
> > +        dp_netdev_input(pmd, packets, cnt, &md);
> >      } else if (error != EAGAIN && error != EOPNOTSUPP) {
> >          static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> >
> > @@ -1815,19 +1892,19 @@ dpif_netdev_run(struct dpif *dpif)
> >  {
> >      struct dp_netdev_port *port;
> >      struct dp_netdev *dp = get_dp_netdev(dpif);
> > +    struct dp_netdev_pmd_thread *non_pmd = dp_netdev_get_nonpmd(dp);
> >
> > -    ovs_mutex_lock(&dp->emc_mutex);
> > +    ovs_mutex_lock(&dp->non_pmd_mutex);
> >      CMAP_FOR_EACH (port, node, &dp->ports) {
> >          if (!netdev_is_pmd(port->netdev)) {
> >              int i;
> >
> >              for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
> > -                dp_netdev_process_rxq_port(dp, &dp->flow_cache, port,
> > -                                           port->rxq[i]);
> > +                dp_netdev_process_rxq_port(non_pmd, port, port->rxq[i]);
> >              }
> >          }
> >      }
> > -    ovs_mutex_unlock(&dp->emc_mutex);
> > +    ovs_mutex_unlock(&dp->non_pmd_mutex);
> >  }
> >
> >  static void
> > @@ -1855,33 +1932,32 @@ struct rxq_poll {
> >  };
> >
> >  static int
> > -pmd_load_queues(struct pmd_thread *f,
> > +pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
> >                  struct rxq_poll **ppoll_list, int poll_cnt)
> >  {
> > -    struct dp_netdev *dp = f->dp;
> >      struct rxq_poll *poll_list = *ppoll_list;
> >      struct dp_netdev_port *port;
> > -    int id = f->id;
> > -    int index;
> > -    int i;
> > +    int n_pmds_on_numa, index, i;
> >
> >      /* Simple scheduler for netdev rx polling. */
> >      for (i = 0; i < poll_cnt; i++) {
> > -         port_unref(poll_list[i].port);
> > +        port_unref(poll_list[i].port);
> >      }
> >
> >      poll_cnt = 0;
> > +    n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
> >      index = 0;
> >
> > -    CMAP_FOR_EACH (port, node, &f->dp->ports) {
> > +    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
> >          /* Calls port_try_ref() to prevent the main thread
> >           * from deleting the port. */
> >          if (port_try_ref(port)) {
> > -            if (netdev_is_pmd(port->netdev)) {
> > +            if (netdev_is_pmd(port->netdev)
> > +                && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
> >                  int i;
> >
> >                  for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
> > -                    if ((index % dp->n_pmd_threads) == id) {
> > +                    if ((index % n_pmds_on_numa) == pmd->index) {
> >                          poll_list = xrealloc(poll_list,
> >                                          sizeof *poll_list * (poll_cnt +
> 1));
> >
> > @@ -1905,8 +1981,7 @@ pmd_load_queues(struct pmd_thread *f,
> >  static void *
> >  pmd_thread_main(void *f_)
> >  {
> > -    struct pmd_thread *f = f_;
> > -    struct dp_netdev *dp = f->dp;
> > +    struct dp_netdev_pmd_thread *pmd = f_;
> >      unsigned int lc = 0;
> >      struct rxq_poll *poll_list;
> >      unsigned int port_seq = PMD_INITIAL_SEQ;
> > @@ -1916,17 +1991,18 @@ pmd_thread_main(void *f_)
> >      poll_cnt = 0;
> >      poll_list = NULL;
> >
> > -    pmd_thread_setaffinity_cpu(f->id);
> > +    /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
> > +    ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
> > +    pmd_thread_setaffinity_cpu(pmd->core_id);
> >  reload:
> > -    emc_cache_init(&f->flow_cache);
> > -    poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
> > +    emc_cache_init(&pmd->flow_cache);
> > +    poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
> >
> >      for (;;) {
> >          int i;
> >
> >          for (i = 0; i < poll_cnt; i++) {
> > -            dp_netdev_process_rxq_port(dp, &f->flow_cache,
> poll_list[i].port,
> > -                                       poll_list[i].rx);
> > +            dp_netdev_process_rxq_port(pmd, poll_list[i].port,
> poll_list[i].rx);
> >          }
> >
> >          if (lc++ > 1024) {
> > @@ -1936,7 +2012,7 @@ reload:
> >
> >              ovsrcu_quiesce();
> >
> > -            atomic_read_relaxed(&f->change_seq, &seq);
> > +            atomic_read_relaxed(&pmd->change_seq, &seq);
> >              if (seq != port_seq) {
> >                  port_seq = seq;
> >                  break;
> > @@ -1944,9 +2020,9 @@ reload:
> >          }
> >      }
> >
> > -    emc_cache_uninit(&f->flow_cache);
> > +    emc_cache_uninit(&pmd->flow_cache);
> >
> > -    if (!latch_is_set(&f->dp->exit_latch)){
> > +    if (!latch_is_set(&pmd->exit_latch)){
> >          goto reload;
> >      }
> >
> > @@ -1988,40 +2064,124 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
> >      dp_netdev_enable_upcall(dp);
> >  }
> >
> > +/* Returns the pointer to the dp_netdev_pmd_thread for non-pmd threads.
> */
> > +static struct dp_netdev_pmd_thread *
> > +dp_netdev_get_nonpmd(struct dp_netdev *dp)
> > +{
> > +    struct dp_netdev_pmd_thread *pmd;
> > +    struct cmap_node *pnode;
> > +
> > +    pnode = cmap_find(&dp->poll_threads, hash_int(NON_PMD_CORE_ID, 0));
> > +    ovs_assert(pnode);
> > +    pmd = CONTAINER_OF(pnode, struct dp_netdev_pmd_thread, node);
> > +
> > +    return pmd;
> > +}
> > +
> > +/* Configures the 'pmd' based on the input argument. */
> > +static void
> > +dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct
> dp_netdev *dp,
> > +                        int index, int core_id, int numa_id)
> > +{
> > +    pmd->dp = dp;
> > +    pmd->index = index;
> > +    pmd->core_id = core_id;
> > +    pmd->numa_id = numa_id;
> > +    latch_init(&pmd->exit_latch);
> > +    atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ);
> > +    /* init the 'flow_cache' since there is no
> > +     * actual thread created for NON_PMD_CORE_ID. */
> > +    if (core_id == NON_PMD_CORE_ID) {
> > +        emc_cache_init(&pmd->flow_cache);
> > +    }
> > +    cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *,
> &pmd->node),
> > +                hash_int(core_id, 0));
> > +}
> > +
> > +/* Stops the pmd thread, removes it from the 'dp->poll_threads'
> > + * and destroys the struct. */
> >  static void
> > -dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
> > +dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd)
> >  {
> > -    int i;
> > +    /* Uninit the 'flow_cache' since there is
> > +     * no actual thread uninit it. */
> > +    if (pmd->core_id == NON_PMD_CORE_ID) {
> > +        emc_cache_uninit(&pmd->flow_cache);
> > +    } else {
> > +        latch_set(&pmd->exit_latch);
> > +        dp_netdev_reload_pmd__(pmd);
> > +        ovs_numa_unpin_core(pmd->core_id);
> > +        xpthread_join(pmd->thread, NULL);
> > +    }
> > +    cmap_remove(&pmd->dp->poll_threads, &pmd->node,
> hash_int(pmd->core_id, 0));
> > +    latch_destroy(&pmd->exit_latch);
> > +    free(pmd);
> > +}
> >
> > -    if (n == dp->n_pmd_threads) {
> > -        return;
> > +/* Destroys all pmd threads. */
> > +static void
> > +dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
> > +{
> > +    struct dp_netdev_pmd_thread *pmd;
> > +
> > +    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> > +        dp_netdev_del_pmd(pmd);
> >      }
> > +}
> >
> > -    /* Stop existing threads. */
> > -    latch_set(&dp->exit_latch);
> > -    dp_netdev_reload_pmd_threads(dp);
> > -    for (i = 0; i < dp->n_pmd_threads; i++) {
> > -        struct pmd_thread *f = &dp->pmd_threads[i];
> > +/* Deletes all pmd threads on numa node 'numa_id'. */
> > +static void
> > +dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
> > +{
> > +    struct dp_netdev_pmd_thread *pmd;
> >
> > -        xpthread_join(f->thread, NULL);
> > +    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> > +        if (pmd->numa_id == numa_id) {
> > +            dp_netdev_del_pmd(pmd);
> > +        }
> >      }
> > -    latch_poll(&dp->exit_latch);
> > -    free(dp->pmd_threads);
> > +}
> >
> > -    /* Start new threads. */
> > -    dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
> > -    dp->n_pmd_threads = n;
> > +/* Checks the numa node id of 'netdev' and starts pmd threads for
> > + * the numa node. */
> > +static void
> > +dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
> > +{
> > +    int n_pmds;
> >
> > -    for (i = 0; i < n; i++) {
> > -        struct pmd_thread *f = &dp->pmd_threads[i];
> > +    if (!ovs_numa_numa_id_is_valid(numa_id)) {
> > +        VLOG_ERR("Cannot create pmd threads due to numa id (%d)"
> > +                 "invalid", numa_id);
> > +        return ;
> > +    }
> > +
> > +    n_pmds = get_n_pmd_threads_on_numa(dp, numa_id);
> > +
> > +    /* If there are already pmd threads created for the numa node
> > +     * in which 'netdev' is on, do nothing.  Else, creates the
> > +     * pmd threads for the numa node. */
> > +    if (!n_pmds) {
> > +        int can_have, n_unpinned, i;
> > +
> > +        n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
> > +        if (!n_unpinned) {
> > +            VLOG_ERR("Cannot create pmd threads due to out of unpinned "
> > +                     "cores on numa node");
> > +            return;
> > +        }
> >
> > -        f->dp = dp;
> > -        f->id = i;
> > -        atomic_init(&f->change_seq, PMD_INITIAL_SEQ);
> > +        /* Tries creating 'number of dpdk ifaces on numa node' pmd
> threads. */
> > +        can_have = MIN(n_unpinned, netdev_dpdk_n_devs_on_numa(numa_id));
> > +        for (i = 0; i < can_have; i++) {
> > +            struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
> > +            int core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
> >
> > -        /* Each thread will distribute all devices rx-queues among
> > -         * themselves. */
> > -        f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
> > +            dp_netdev_configure_pmd(pmd, dp, i, core_id, numa_id);
> > +            /* Each thread will distribute all devices rx-queues among
> > +             * themselves. */
> > +            pmd->thread = ovs_thread_create("pmd", pmd_thread_main,
> pmd);
> > +        }
> > +        VLOG_INFO("Created %d pmd threads on numa node %d", can_have,
> numa_id);
> >      }
> >  }
> >
> > @@ -2161,8 +2321,8 @@ packet_batch_init(struct packet_batch *batch,
> struct dp_netdev_flow *flow,
> >  }
> >
> >  static inline void
> > -packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp,
> > -                     struct emc_cache *flow_cache)
> > +packet_batch_execute(struct packet_batch *batch,
> > +                     struct dp_netdev_pmd_thread *pmd)
> >  {
> >      struct dp_netdev_actions *actions;
> >      struct dp_netdev_flow *flow = batch->flow;
> > @@ -2172,11 +2332,10 @@ packet_batch_execute(struct packet_batch *batch,
> struct dp_netdev *dp,
> >
> >      actions = dp_netdev_flow_get_actions(flow);
> >
> > -    dp_netdev_execute_actions(dp, batch->packets, batch->packet_count,
> true,
> > -                              &batch->md, flow_cache,
> > -                              actions->actions, actions->size);
> > +    dp_netdev_execute_actions(pmd, batch->packets, batch->packet_count,
> true,
> > +                              &batch->md, actions->actions,
> actions->size);
> >
> > -    dp_netdev_count_packet(dp, DP_STAT_HIT, batch->packet_count);
> > +    dp_netdev_count_packet(pmd->dp, DP_STAT_HIT, batch->packet_count);
> >  }
> >
> >  static inline bool
> > @@ -2231,12 +2390,13 @@ dpif_packet_swap(struct dpif_packet **a, struct
> dpif_packet **b)
> >   * 'packets' array (they have been moved to the beginning of the
> vector).
> >   */
> >  static inline size_t
> > -emc_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
> > -               struct dpif_packet **packets, size_t cnt,
> > -               struct pkt_metadata *md, struct netdev_flow_key *keys)
> > +emc_processing(struct dp_netdev_pmd_thread *pmd, struct dpif_packet
> **packets,
> > +               size_t cnt, struct pkt_metadata *md,
> > +               struct netdev_flow_key *keys)
> >  {
> >      struct netdev_flow_key key;
> >      struct packet_batch batches[4];
> > +    struct emc_cache *flow_cache = &pmd->flow_cache;
> >      size_t n_batches, i;
> >      size_t notfound_cnt = 0;
> >
> > @@ -2269,14 +2429,14 @@ emc_processing(struct dp_netdev *dp, struct
> emc_cache *flow_cache,
> >      }
> >
> >      for (i = 0; i < n_batches; i++) {
> > -        packet_batch_execute(&batches[i], dp, flow_cache);
> > +        packet_batch_execute(&batches[i], pmd);
> >      }
> >
> >      return notfound_cnt;
> >  }
> >
> >  static inline void
> > -fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
> > +fast_path_processing(struct dp_netdev_pmd_thread *pmd,
> >                       struct dpif_packet **packets, size_t cnt,
> >                       struct pkt_metadata *md, struct netdev_flow_key
> *keys)
> >  {
> > @@ -2289,6 +2449,8 @@ fast_path_processing(struct dp_netdev *dp, struct
> emc_cache *flow_cache,
> >      struct packet_batch batches[PKT_ARRAY_SIZE];
> >      const struct miniflow *mfs[PKT_ARRAY_SIZE]; /* NULL at bad packets.
> */
> >      struct cls_rule *rules[PKT_ARRAY_SIZE];
> > +    struct dp_netdev *dp = pmd->dp;
> > +    struct emc_cache *flow_cache = &pmd->flow_cache;
> >      size_t n_batches, i;
> >      bool any_miss;
> >
> > @@ -2337,8 +2499,8 @@ fast_path_processing(struct dp_netdev *dp, struct
> emc_cache *flow_cache,
> >              /* We can't allow the packet batching in the next loop to
> execute
> >               * the actions.  Otherwise, if there are any slow path
> actions,
> >               * we'll send the packet up twice. */
> > -            dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
> > -                                      flow_cache, ofpbuf_data(&actions),
> > +            dp_netdev_execute_actions(pmd, &packets[i], 1, false, md,
> > +                                      ofpbuf_data(&actions),
> >                                        ofpbuf_size(&actions));
> >
> >              add_actions = ofpbuf_size(&put_actions)
> > @@ -2375,18 +2537,19 @@ fast_path_processing(struct dp_netdev *dp,
> struct emc_cache *flow_cache,
> >          }
> >
> >          flow = dp_netdev_flow_cast(rules[i]);
> > -        emc_insert(flow_cache, mfs[i], dpif_packet_get_dp_hash(packet),
> flow);
> > +        emc_insert(flow_cache, mfs[i], dpif_packet_get_dp_hash(packet),
> > +                   flow);
> >          dp_netdev_queue_batches(packet, md, flow, mfs[i], batches,
> &n_batches,
> >                                  ARRAY_SIZE(batches));
> >      }
> >
> >      for (i = 0; i < n_batches; i++) {
> > -        packet_batch_execute(&batches[i], dp, flow_cache);
> > +        packet_batch_execute(&batches[i], pmd);
> >      }
> >  }
> >
> >  static void
> > -dp_netdev_input(struct dp_netdev *dp, struct emc_cache *flow_cache,
> > +dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
> >                  struct dpif_packet **packets, int cnt, struct
> pkt_metadata *md)
> >  {
> >  #if !defined(__CHECKER__) && !defined(_WIN32)
> > @@ -2398,15 +2561,14 @@ dp_netdev_input(struct dp_netdev *dp, struct
> emc_cache *flow_cache,
> >      struct netdev_flow_key keys[PKT_ARRAY_SIZE];
> >      size_t newcnt;
> >
> > -    newcnt = emc_processing(dp, flow_cache, packets, cnt, md, keys);
> > +    newcnt = emc_processing(pmd, packets, cnt, md, keys);
> >      if (OVS_UNLIKELY(newcnt)) {
> > -        fast_path_processing(dp, flow_cache, packets, newcnt, md, keys);
> > +        fast_path_processing(pmd, packets, newcnt, md, keys);
> >      }
> >  }
> >
> >  struct dp_netdev_execute_aux {
> > -    struct dp_netdev *dp;
> > -    struct emc_cache *flow_cache;
> > +    struct dp_netdev_pmd_thread *pmd;
> >  };
> >
> >  static void
> > @@ -2426,7 +2588,8 @@ dp_execute_cb(void *aux_, struct dpif_packet
> **packets, int cnt,
> >  {
> >      struct dp_netdev_execute_aux *aux = aux_;
> >      uint32_t *depth = recirc_depth_get();
> > -    struct dp_netdev *dp = aux->dp;
> > +    struct dp_netdev_pmd_thread *pmd= aux->pmd;
> > +    struct dp_netdev *dp= pmd->dp;
> >      int type = nl_attr_type(a);
> >      struct dp_netdev_port *p;
> >      int i;
> > @@ -2435,7 +2598,7 @@ dp_execute_cb(void *aux_, struct dpif_packet
> **packets, int cnt,
> >      case OVS_ACTION_ATTR_OUTPUT:
> >          p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
> >          if (OVS_LIKELY(p)) {
> > -            netdev_send(p->netdev, NETDEV_QID_NONE, packets, cnt,
> may_steal);
> > +            netdev_send(p->netdev, pmd->core_id, packets, cnt,
> may_steal);
> >          } else if (may_steal) {
> >              for (i = 0; i < cnt; i++) {
> >                  dpif_packet_delete(packets[i]);
> > @@ -2462,8 +2625,7 @@ dp_execute_cb(void *aux_, struct dpif_packet
> **packets, int cnt,
> >                                           DPIF_UC_ACTION, userdata,
> &actions,
> >                                           NULL);
> >                  if (!error || error == ENOSPC) {
> > -                    dp_netdev_execute_actions(dp, &packets[i], 1,
> false, md,
> > -                                              aux->flow_cache,
> > +                    dp_netdev_execute_actions(pmd, &packets[i], 1,
> false, md,
> >                                                ofpbuf_data(&actions),
> >                                                ofpbuf_size(&actions));
> >                  }
> > @@ -2525,7 +2687,7 @@ dp_execute_cb(void *aux_, struct dpif_packet
> **packets, int cnt,
> >                  /* Hash is private to each packet */
> >                  recirc_md.dp_hash = dpif_packet_get_dp_hash(packets[i]);
> >
> > -                dp_netdev_input(dp, aux->flow_cache, &recirc_pkt, 1,
> > +                dp_netdev_input(pmd, &recirc_pkt, 1,
> >                                  &recirc_md);
> >              }
> >              (*depth)--;
> > @@ -2555,13 +2717,12 @@ dp_execute_cb(void *aux_, struct dpif_packet
> **packets, int cnt,
> >  }
> >
> >  static void
> > -dp_netdev_execute_actions(struct dp_netdev *dp,
> > +dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
> >                            struct dpif_packet **packets, int cnt,
> >                            bool may_steal, struct pkt_metadata *md,
> > -                          struct emc_cache *flow_cache,
> >                            const struct nlattr *actions, size_t
> actions_len)
> >  {
> > -    struct dp_netdev_execute_aux aux = {dp, flow_cache};
> > +    struct dp_netdev_execute_aux aux = {pmd};
> >
> >      odp_execute_actions(&aux, packets, cnt, may_steal, md, actions,
> >                          actions_len, dp_execute_cb);
> > diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h
> > index adbbf87..f501f7c 100644
> > --- a/lib/dpif-netdev.h
> > +++ b/lib/dpif-netdev.h
> > @@ -40,9 +40,7 @@ static inline void dp_packet_pad(struct ofpbuf *b)
> >      }
> >  }
> >
> > -#define NETDEV_QID_NONE INT_MAX
> > -
> > -#define NR_PMD_THREADS 1
> > +#define NON_PMD_CORE_ID 0
> >
> >  #ifdef  __cplusplus
> >  }
> > diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> > index 26b1591..928ca3f 100644
> > --- a/lib/netdev-dpdk.c
> > +++ b/lib/netdev-dpdk.c
> > @@ -404,7 +404,6 @@ dpdk_get_n_devs(int numa_id)
> >              count++;
> >          }
> >      }
> > -    ovs_assert(count);
> >
> >      return count;
> >  }
> > @@ -495,8 +494,7 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned
> int port_no) OVS_REQUIRES(dpdk
> >
> >      ovs_mutex_lock(&netdev->mutex);
> >
> > -    /* XXX: need to discover device node at run time. */
> > -    netdev->socket_id = SOCKET0;
> > +    netdev->socket_id = rte_eth_dev_socket_id(port_no);
> >
> >      /* There can only be ovs_numa_get_n_cores() pmd threads, so creates
> a tx_q
> >       * for each of them. */
> > @@ -1503,6 +1501,13 @@ netdev_dpdk_register(void)
> >      }
> >  }
> >
> > +/* Returns the number of dpdk interfaces on numa node 'numa_id'. */
> > +int
> > +netdev_dpdk_n_devs_on_numa(int numa_id)
> > +{
> > +    return dpdk_get_n_devs(numa_id);
> > +}
> > +
> >  int
> >  pmd_thread_setaffinity_cpu(int cpu)
> >  {
> > @@ -1517,7 +1522,8 @@ pmd_thread_setaffinity_cpu(int cpu)
> >          return err;
> >      }
> >      /* lcore_id 0 is reseved for use by non pmd threads. */
> > -    RTE_PER_LCORE(_lcore_id) = cpu + 1;
> > +    ovs_assert(cpu);
> > +    RTE_PER_LCORE(_lcore_id) = cpu;
> >
> >      return 0;
> >  }
> > @@ -1525,9 +1531,6 @@ pmd_thread_setaffinity_cpu(int cpu)
> >  void
> >  thread_set_nonpmd(void)
> >  {
> > -    /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is
> reserved
> > -     * for non pmd threads */
> > -    BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE);
> >      /* We have to use 0 to allow non pmd threads to perform certain DPDK
> >       * operations, like rte_eth_dev_configure(). */
> >      RTE_PER_LCORE(_lcore_id) = 0;
> > diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
> > index e4ba6fc..8fe5a42 100644
> > --- a/lib/netdev-dpdk.h
> > +++ b/lib/netdev-dpdk.h
> > @@ -22,6 +22,7 @@ struct dpif_packet;
> >
> >  int dpdk_init(int argc, char **argv);
> >  void netdev_dpdk_register(void);
> > +int netdev_dpdk_n_devs_on_numa(int numa_id);
> >  void free_dpdk_buf(struct dpif_packet *);
> >  int pmd_thread_setaffinity_cpu(int cpu);
> >  void thread_set_nonpmd(void);
> > @@ -40,6 +41,12 @@ netdev_dpdk_register(void)
> >      /* Nothing */
> >  }
> >
> > +static inline int
> > +netdev_dpdk_n_devs_on_numa(int numa_id OVS_UNUSED)
> > +{
> > +    return -1;
> > +}
> > +
> >  static inline void
> >  free_dpdk_buf(struct dpif_packet *buf OVS_UNUSED)
> >  {
> > --
> > 1.7.9.5
> >
>



More information about the dev mailing list