[ovs-dev] [dpdk patch 2/2] dpif-netdev: Allow multi-rx-queue, multi-pmd-thread configuration.

Pravin Shelar pshelar at nicira.com
Fri Sep 19 18:14:10 UTC 2014


On Mon, Sep 15, 2014 at 2:03 PM, Alex Wang <alexw at nicira.com> wrote:
> This commit adds multithreading functionality to the OVS dpdk
> module.  Users are able to create multiple pmd threads and set
> their cpu affinity by specifying a cpu mask string similar
> to the EAL '-c COREMASK' option.
>
> Also, the number of rx queues for each dpdk interface is made
> configurable to help distribute rx packets among multiple
> pmd threads.
>
> Signed-off-by: Alex Wang <alexw at nicira.com>
> ---
>  lib/dpif-linux.c           |    1 +
>  lib/dpif-netdev.c          |  122 +++++++++++++++++++++++++++++++++++++++++---
>  lib/dpif-provider.h        |    7 +++
>  lib/dpif.c                 |   18 +++++++
>  lib/dpif.h                 |    2 +
>  ofproto/ofproto-dpif.c     |    2 +
>  ofproto/ofproto-provider.h |    6 +++
>  ofproto/ofproto.c          |   16 ++++++
>  ofproto/ofproto.h          |    2 +
>  vswitchd/bridge.c          |    3 ++
>  vswitchd/vswitch.xml       |   27 ++++++++++
>  11 files changed, 198 insertions(+), 8 deletions(-)
>
> diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
> index 2c387ed..ed2058c 100644
> --- a/lib/dpif-linux.c
> +++ b/lib/dpif-linux.c
> @@ -1873,6 +1873,7 @@ const struct dpif_class dpif_linux_class = {
>      dpif_linux_operate,
>      dpif_linux_recv_set,
>      dpif_linux_handlers_set,
> +    NULL,                       /* poll_threads_set */
>      dpif_linux_queue_to_priority,
>      dpif_linux_recv,
>      dpif_linux_recv_wait,
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 3f69219..bfab78a 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -205,6 +205,11 @@ struct dp_netdev {
>      /* Each pmd thread will store its pointer to
>       * 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */
>      ovsthread_key_t per_pmd_key;
> +
> +    /* Number of rx queues for each dpdk interface and the cpu mask
> +     * for pinning pmd threads. */
> +    size_t n_dpdk_rxqs;
> +    char *pmd_cmask;
>  };
>
>  static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
> @@ -395,10 +400,12 @@ static void dp_netdev_disable_upcall(struct dp_netdev *);
>  static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
>                                      struct dp_netdev *dp, int index,
>                                      int core_id, int numa_id);
> +static void dp_netdev_set_nonpmd(struct dp_netdev *dp);
>  static struct dp_netdev_pmd_thread *dp_netdev_get_nonpmd(struct dp_netdev *dp);
>  static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
>  static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
>  static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
> +static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
>
>  static void emc_clear_entry(struct emc_entry *ce);
>
> @@ -537,7 +544,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
>      OVS_REQUIRES(dp_netdev_mutex)
>  {
>      struct dp_netdev *dp;
> -    struct dp_netdev_pmd_thread *non_pmd;
>      int error;
>
>      dp = xzalloc(sizeof *dp);
> @@ -570,9 +576,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
>
>      /* Reserves the core NON_PMD_CORE_ID for all non-pmd threads. */
>      ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID);
> -    non_pmd = xzalloc(sizeof *non_pmd);
> -    dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID,
> -                            OVS_NUMA_UNSPEC);
> +    dp_netdev_set_nonpmd(dp);
>
>      ovs_mutex_lock(&dp->port_mutex);
>      error = do_add_port(dp, name, "internal", ODPP_LOCAL);
> @@ -776,8 +780,10 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
>              return ENOENT;
>          }
>          /* There can only be ovs_numa_get_n_cores() pmd threads,
> -         * so creates a tx_q for each. */
> -        error = netdev_set_multiq(netdev, n_cores, NR_QUEUE);
> +         * so creates a txq for each. */
> +        error = netdev_set_multiq(netdev, n_cores,
> +                                  dp->n_dpdk_rxqs ? dp->n_dpdk_rxqs
> +                                                  : NR_QUEUE);
We can just initialize n_dpdk_rxqs to NR_QUEUE, so that we can avoid this check.
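
For instance (sketch, assuming create_dp_netdev() keeps its current shape):

    /* Default the rxq count up front so later code needs no fallback. */
    dp->n_dpdk_rxqs = NR_QUEUE;

and the netdev_set_multiq() call here can then pass dp->n_dpdk_rxqs
directly.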

>          if (error) {
>              VLOG_ERR("%s, cannot set multiq", devname);
>              return errno;
> @@ -1842,6 +1848,77 @@ dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
>      }
>  }
>
> +/* Returns true if the configuration for rx queues or cpu mask
> + * changed. */
> +static bool
> +pmd_config_changed(const struct dp_netdev *dp, size_t rxqs, const char *cmask)
> +{
> +    if (dp->n_dpdk_rxqs != rxqs) {
> +        return true;
> +    } else {
> +        if (dp->pmd_cmask != NULL && cmask != NULL) {
> +            return strcmp(dp->pmd_cmask, cmask);
> +        } else {
> +            return (dp->pmd_cmask != NULL || cmask != NULL);
> +        }
> +    }
> +}
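
The null handling here is fine; a slightly more compact form would be a
null-tolerant helper (sketch only, no such helper is assumed to exist in
the tree):

    static bool
    cmask_equals(const char *a, const char *b)
    {
        return a == b || (a && b && !strcmp(a, b));
    }

so that pmd_config_changed() reduces to
'dp->n_dpdk_rxqs != rxqs || !cmask_equals(dp->pmd_cmask, cmask)'.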
> +
> +/* Resets pmd threads if the configuration for 'rxq's or cpu mask changes. */
> +static int
> +dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char *cmask)
> +{
> +    struct dp_netdev *dp = get_dp_netdev(dpif);
> +
> +    if (pmd_config_changed(dp, n_rxqs, cmask)) {
> +        struct dp_netdev_port *port;
> +
> +        dp_netdev_destroy_all_pmds(dp);
> +
> +        CMAP_FOR_EACH (port, node, &dp->ports) {
> +            if (netdev_is_pmd(port->netdev)) {
> +                int i, err;
> +
> +                /* Closes the existing 'rxq's. */
> +                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
> +                    netdev_rxq_close(port->rxq[i]);
> +                    port->rxq[i] = NULL;
> +                }
> +
> +                /* Sets the new rx queue config.  */
> +                err = netdev_set_multiq(port->netdev, ovs_numa_get_n_cores(),
> +                                        n_rxqs);
> +                if (err) {
> +                    VLOG_ERR("Failed to set dpdk interface %s rx_queue to:"
> +                             " %u", netdev_get_name(port->netdev),
> +                             n_rxqs);
> +                    return err;
> +                }
> +
> +                /* If the set_multiq() above succeeds, reopens the 'rxq's. */
> +                port->rxq = xrealloc(port->rxq, sizeof *port->rxq
> +                                     * netdev_n_rxq(port->netdev));
> +                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
> +                    netdev_rxq_open(port->netdev, &port->rxq[i], i);
> +                }
> +            }
> +        }
> +        dp->n_dpdk_rxqs = n_rxqs;
> +
> +        /* Reconfigures the cpu mask. */
> +        ovs_numa_set_cpu_mask(cmask);
> +        free(dp->pmd_cmask);
> +        dp->pmd_cmask = cmask ? xstrdup(cmask) : NULL;
> +
> +        /* Restores the non-pmd. */
> +        dp_netdev_set_nonpmd(dp);
> +        /* Restores all pmd threads. */
> +        dp_netdev_reset_pmd_threads(dp);
> +    }
> +
> +    return 0;
> +}
> +
>  static int
>  dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
>                                uint32_t queue_id, uint32_t *priority)
> @@ -2093,6 +2170,17 @@ dp_netdev_get_nonpmd(struct dp_netdev *dp)
>      return pmd;
>  }
>
> +/* Sets the 'struct dp_netdev_pmd_thread' for non-pmd threads. */
> +static void
> +dp_netdev_set_nonpmd(struct dp_netdev *dp)
> +{
> +    struct dp_netdev_pmd_thread *non_pmd;
> +
> +    non_pmd = xzalloc(sizeof *non_pmd);
> +    dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID,
> +                            OVS_NUMA_UNSPEC);
> +}
> +
>  /* Configures the 'pmd' based on the input argument. */
>  static void
>  dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
> @@ -2185,8 +2273,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>              return;
>          }
>
> -        /* Tries creating NR_PMD_THREADS pmd threads on the numa node. */
> -        can_have = MIN(n_unpinned, NR_PMD_THREADS);
> +        /* If cpu mask is specified, uses all unpinned cores, otherwise
> +         * tries creating NR_PMD_THREADS pmd threads. */
> +        can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
>          for (i = 0; i < can_have; i++) {
>              struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
>              int core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
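
To spell out the new sizing policy: with pmd-cpu-mask set and, say, 4
unpinned cores on the numa node, this now starts 4 pmd threads there;
without a mask it stays capped at MIN(n_unpinned, NR_PMD_THREADS) as
before.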
> @@ -2209,6 +2298,22 @@ dp_netdev_flow_stats_new_cb(void)
>      return bucket;
>  }
>
> +/* Called after pmd threads config change.  Restarts pmd threads with
> + * new configuration. */
> +static void
> +dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
> +{
> +    struct dp_netdev_port *port;
> +
> +    CMAP_FOR_EACH (port, node, &dp->ports) {
> +        if (netdev_is_pmd(port->netdev)) {
> +            int numa_id = netdev_get_numa_id(port->netdev);
> +
> +            dp_netdev_set_pmds_on_numa(dp, numa_id);
> +        }
> +    }
> +}
> +
>  static void
>  dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
>                      int cnt, int size,
> @@ -2772,6 +2877,7 @@ const struct dpif_class dpif_netdev_class = {
>      dpif_netdev_operate,
>      NULL,                       /* recv_set */
>      NULL,                       /* handlers_set */
> +    dpif_netdev_pmd_set,
>      dpif_netdev_queue_to_priority,
>      NULL,                       /* recv */
>      NULL,                       /* recv_wait */
> diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
> index 89b32dd..e1136e1 100644
> --- a/lib/dpif-provider.h
> +++ b/lib/dpif-provider.h
> @@ -300,6 +300,13 @@ struct dpif_class {
>       * */
>      int (*handlers_set)(struct dpif *dpif, uint32_t n_handlers);
>
> +    /* If 'dpif' creates its own I/O polling threads, refreshes poll threads
> +     * configuration.  'n_rxqs' configures the number of rx_queues, which
> +     * are distributed among threads.  'cmask' configures the cpu mask
> +     * for setting the polling threads' cpu affinity. */
> +    int (*poll_threads_set)(struct dpif *dpif, unsigned int n_rxqs,
> +                            const char *cmask);
> +
>      /* Translates OpenFlow queue ID 'queue_id' (in host byte order) into a
>       * priority value used for setting packet priority. */
>      int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id,
> diff --git a/lib/dpif.c b/lib/dpif.c
> index bf2c5f9..91ccfd8 100644
> --- a/lib/dpif.c
> +++ b/lib/dpif.c
> @@ -1300,6 +1300,24 @@ dpif_print_packet(struct dpif *dpif, struct dpif_upcall *upcall)
>      }
>  }
>
> +/* If 'dpif' creates its own I/O polling threads, refreshes poll threads
> + * configuration. */
> +int
> +dpif_poll_threads_set(struct dpif *dpif, unsigned int n_rxqs,
> +                      const char *cmask)
> +{
> +    int error = 0;
> +
> +    if (dpif->dpif_class->poll_threads_set) {
> +        error = dpif->dpif_class->poll_threads_set(dpif, n_rxqs, cmask);
> +        if (error) {
> +            log_operation(dpif, "poll_threads_set", error);
> +        }
> +    }
> +
> +    return error;
> +}
> +
>  /* Polls for an upcall from 'dpif' for an upcall handler.  Since
>   * there can be multiple poll loops, 'handler_id' is needed as index to
>   * identify the corresponding poll loop.  If successful, stores the upcall
> diff --git a/lib/dpif.h b/lib/dpif.h
> index be1bc4f..c57c8b0 100644
> --- a/lib/dpif.h
> +++ b/lib/dpif.h
> @@ -769,6 +769,8 @@ void dpif_register_upcall_cb(struct dpif *, upcall_callback *, void *aux);
>
>  int dpif_recv_set(struct dpif *, bool enable);
>  int dpif_handlers_set(struct dpif *, uint32_t n_handlers);
> +int dpif_poll_threads_set(struct dpif *, unsigned int n_rxqs,
> +                          const char *cmask);
>  int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
>                struct ofpbuf *);
>  void dpif_recv_purge(struct dpif *);
> diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
> index 6a59098..6cc9789 100644
> --- a/ofproto/ofproto-dpif.c
> +++ b/ofproto/ofproto-dpif.c
> @@ -532,6 +532,8 @@ type_run(const char *type)
>          udpif_set_threads(backer->udpif, n_handlers, n_revalidators);
>      }
>
> +    dpif_poll_threads_set(backer->dpif, n_dpdk_rxqs, pmd_cpu_mask);
> +
>      if (backer->need_revalidate) {
>          struct ofproto_dpif *ofproto;
>          struct simap_node *node;
> diff --git a/ofproto/ofproto-provider.h b/ofproto/ofproto-provider.h
> index de354ec..158f86e 100644
> --- a/ofproto/ofproto-provider.h
> +++ b/ofproto/ofproto-provider.h
> @@ -451,6 +451,12 @@ extern unsigned ofproto_max_idle;
>   * ofproto-dpif implementation. */
>  extern size_t n_handlers, n_revalidators;
>
> +/* Number of rx queues to be created for each dpdk interface. */
> +extern size_t n_dpdk_rxqs;
> +
> +/* Cpu mask for pmd threads. */
> +extern char *pmd_cpu_mask;
> +
>  static inline struct rule *rule_from_cls_rule(const struct cls_rule *);
>
>  void ofproto_rule_expire(struct rule *rule, uint8_t reason)
> diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
> index 7b1d478..818e23f 100644
> --- a/ofproto/ofproto.c
> +++ b/ofproto/ofproto.c
> @@ -304,6 +304,8 @@ unsigned ofproto_flow_limit = OFPROTO_FLOW_LIMIT_DEFAULT;
>  unsigned ofproto_max_idle = OFPROTO_MAX_IDLE_DEFAULT;
>
>  size_t n_handlers, n_revalidators;
> +size_t n_dpdk_rxqs;
> +char *pmd_cpu_mask;
>
>  /* Map from datapath name to struct ofproto, for use by unixctl commands. */
>  static struct hmap all_ofprotos = HMAP_INITIALIZER(&all_ofprotos);
> @@ -731,6 +733,20 @@ ofproto_port_set_mcast_snooping(struct ofproto *ofproto, void *aux, bool flood)
>  }
>
>  void
> +ofproto_set_n_dpdk_rxqs(int n_rxqs)
> +{
> +    n_dpdk_rxqs = MAX(n_rxqs, 0);
> +}
> +
> +void
> +ofproto_set_cpu_mask(const char *cmask)
> +{
> +    free(pmd_cpu_mask);
> +
> +    pmd_cpu_mask = cmask ? xstrdup(cmask) : NULL;
> +}
> +
> +void
>  ofproto_set_threads(int n_handlers_, int n_revalidators_)
>  {
>      int threads = MAX(count_cpu_cores(), 2);
> diff --git a/ofproto/ofproto.h b/ofproto/ofproto.h
> index d60b198..40bb3b7 100644
> --- a/ofproto/ofproto.h
> +++ b/ofproto/ofproto.h
> @@ -299,6 +299,8 @@ int ofproto_set_mcast_snooping(struct ofproto *ofproto,
>  int ofproto_port_set_mcast_snooping(struct ofproto *ofproto, void *aux,
>                                      bool flood);
>  void ofproto_set_threads(int n_handlers, int n_revalidators);
> +void ofproto_set_n_dpdk_rxqs(int n_rxqs);
> +void ofproto_set_cpu_mask(const char *cmask);
>  void ofproto_set_dp_desc(struct ofproto *, const char *dp_desc);
>  int ofproto_set_snoops(struct ofproto *, const struct sset *snoops);
>  int ofproto_set_netflow(struct ofproto *,
> diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
> index 8f99d7d..045dd77 100644
> --- a/vswitchd/bridge.c
> +++ b/vswitchd/bridge.c
> @@ -537,6 +537,9 @@ bridge_reconfigure(const struct ovsrec_open_vswitch *ovs_cfg)
>                                          OFPROTO_FLOW_LIMIT_DEFAULT));
>      ofproto_set_max_idle(smap_get_int(&ovs_cfg->other_config, "max-idle",
>                                        OFPROTO_MAX_IDLE_DEFAULT));
> +    ofproto_set_n_dpdk_rxqs(smap_get_int(&ovs_cfg->other_config,
> +                                         "n-dpdk-rxqs", 0));
> +    ofproto_set_cpu_mask(smap_get(&ovs_cfg->other_config, "pmd-cpu-mask"));
>
>      ofproto_set_threads(
>          smap_get_int(&ovs_cfg->other_config, "n-handler-threads", 0),
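
For reference, with this in place the two knobs are set through
other_config in the usual way (usage sketch):

    ovs-vsctl set Open_vSwitch . other_config:n-dpdk-rxqs=2
    ovs-vsctl set Open_vSwitch . other_config:pmd-cpu-mask=6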
> diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
> index d07d54f..b00f74d 100644
> --- a/vswitchd/vswitch.xml
> +++ b/vswitchd/vswitch.xml
> @@ -152,6 +152,33 @@
>          </p>
>        </column>
>
> +      <column name="other_config" key="n-dpdk-rxqs"
> +              type='{"type": "integer", "minInteger": 1}'>
> +        <p>
> +          Specifies the number of rx queues to be created for each dpdk
> +          interface.  If not specified or set to 0, one rx queue will
> +          be created for each dpdk interface by default.
> +        </p>
> +      </column>
> +
> +      <column name="other_config" key="pmd-cpu-mask">
> +        <p>
> +          Specifies the CPU mask used to set the cpu affinity of PMD
> +          (Poll Mode Driver) threads.  The value should be a hex string,
> +          similar to the dpdk EAL '-c COREMASK' option input or the 'taskset'
> +          mask input.
> +        </p>
> +        <p>
> +          The lowest order bit corresponds to the first CPU core.  A set bit
> +          means the corresponding core is available.  If the input does not
> +          cover all cores, those uncovered cores are considered not set.
> +        </p>
> +        <p>
> +          If not specified, one pmd thread will be created for each numa node
> +          and pinned to any available core on the numa node by default.
> +        </p>
> +      </column>
> +
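
The bit semantics described in the pmd-cpu-mask text can be sanity-checked
with a tiny standalone program (illustrative only, not OVS code):

    #include <stdio.h>
    #include <stdlib.h>

    int
    main(void)
    {
        const char *cmask = "0x6";   /* example mask: bits 1 and 2 set */
        unsigned long long mask = strtoull(cmask, NULL, 16);
        int core;

        /* The lowest-order bit is core 0; each set bit marks a usable
         * core. */
        for (core = 0; mask; core++, mask >>= 1) {
            if (mask & 1) {
                printf("core %d available for pmd threads\n", core);
            }
        }
        return 0;
    }

With "0x6" this prints cores 1 and 2, matching the taskset-style
interpretation documented above.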
>        <column name="other_config" key="n-handler-threads"
>                type='{"type": "integer", "minInteger": 1}'>
>          <p>

Otherwise looks good.

Acked-by: Pravin B Shelar <pshelar at nicira.com>


