[ovs-dev] [PATCH RFC] dpif-netdev: Rework of rx queue management.

Ilya Maximets i.maximets at samsung.com
Thu Jan 14 14:54:19 UTC 2016


Superseded by the version from the patch set:
"dpif-netdev: Rework of queue management."
http://openvswitch.org/pipermail/dev/2016-January/064478.html

On 31.12.2015 14:29, Ilya Maximets wrote:
> The current rx queue management model is buggy and will not
> work properly without additional barriers and other
> synchronization between PMD threads and the main thread.
> 
> This patch introduces a new model in which the distribution of
> queues is done by the main thread, with minimal synchronization
> and without data races between pmd threads.  This model should
> also be faster, because only the affected threads are
> interrupted for reconfiguration, and the total computational
> complexity of reconfiguration is lower.
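> 
> In outline, adding a port now does the following for each rx
> queue (an illustrative condensation of the code below, not a
> separate API; error handling omitted):
> 
>     pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
>     dp_netdev_pause_pmd__(pmd);   /* Locks 'cond_mutex' and bumps
>                                    * 'change_seq'; the pmd leaves
>                                    * its poll loop and parks on
>                                    * 'cond_mutex'. */
>     dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
>     dp_netdev_resume_pmd__(pmd);  /* cond_wait() releases the
>                                    * mutex; the pmd signals 'cond'
>                                    * from dp_netdev_pmd_break_done()
>                                    * and resumes polling. */
> 
> Only that one pmd thread is interrupted; all others keep polling.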
> 
> Signed-off-by: Ilya Maximets <i.maximets at samsung.com>
> ---
> 
> This patch supersedes "[PATCH 0/2] Per numa node barriers for pmd threads".
> I will be on vacation until Jan 11, so *my other patches will be
> rebased on top of this one* after Jan 11.
> 
>  lib/dpif-netdev.c | 272 ++++++++++++++++++++++++++++++------------------------
>  1 file changed, 151 insertions(+), 121 deletions(-)
> 
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index cd72e62..fdd6f2a 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -372,6 +372,13 @@ struct dp_netdev_pmd_cycles {
>      atomic_ullong n[PMD_N_CYCLES];
>  };
>  
> +/* Contained by struct dp_netdev_pmd_thread's 'poll_list' member. */
> +struct rxq_poll {
> +    struct dp_netdev_port *port;
> +    struct netdev_rxq *rx;
> +    struct ovs_list node;
> +};
> +
>  /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
>   * the performance overhead of interrupt processing.  Therefore netdev can
>   * not implement rx-wait for these devices.  dpif-netdev needs to poll
> @@ -429,6 +436,9 @@ struct dp_netdev_pmd_thread {
>      int numa_id;                    /* numa node id of this pmd thread. */
>      int tx_qid;                     /* Queue id used by this pmd thread to
>                                       * send packets on all netdevs */
> +    /* List of rx queues to poll. */
> +    struct ovs_list poll_list;
> +    int poll_cnt;
>  
>      /* Only a pmd thread can write on its own 'cycles' and 'stats'.
>       * The main thread keeps 'stats_zero' and 'cycles_zero' as base
> @@ -469,7 +479,7 @@ static void dp_netdev_input(struct dp_netdev_pmd_thread *,
>                              struct dp_packet **, int cnt);
>  
>  static void dp_netdev_disable_upcall(struct dp_netdev *);
> -void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
> +void dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd);
>  static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
>                                      struct dp_netdev *dp, int index,
>                                      unsigned core_id, int numa_id);
> @@ -482,6 +492,11 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
>  static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
>  static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
>  static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
> +static void
> +dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
> +                         struct dp_netdev_port *port, struct netdev_rxq *rx);
> +static struct dp_netdev_pmd_thread *
> +dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id);
>  static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
>  static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
>  static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
> @@ -1011,7 +1026,7 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
>  }
>  
>  static void
> -dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
> +dp_netdev_break_pmd__(struct dp_netdev_pmd_thread *pmd)
>  {
>      int old_seq;
>  
> @@ -1025,16 +1040,28 @@ dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
>      ovs_mutex_unlock(&pmd->cond_mutex);
>  }
>  
> -/* Causes all pmd threads to reload its tx/rx devices.
> - * Must be called after adding/removing ports. */
>  static void
> -dp_netdev_reload_pmds(struct dp_netdev *dp)
> +dp_netdev_pause_pmd__(struct dp_netdev_pmd_thread *pmd)
>  {
> -    struct dp_netdev_pmd_thread *pmd;
> +    int old_seq;
>  
> -    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> -        dp_netdev_reload_pmd__(pmd);
> +    if (pmd->core_id == NON_PMD_CORE_ID) {
> +        return;
>      }
> +
> +    ovs_mutex_lock(&pmd->cond_mutex);
> +    atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
> +}
> +
> +static void
> +dp_netdev_resume_pmd__(struct dp_netdev_pmd_thread *pmd)
> +{
> +    if (pmd->core_id == NON_PMD_CORE_ID) {
> +        return;
> +    }
> +
> +    ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);
> +    ovs_mutex_unlock(&pmd->cond_mutex);
>  }
>  
>  static uint32_t
> @@ -1128,8 +1155,22 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
>      cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
>  
>      if (netdev_is_pmd(netdev)) {
> -        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
> -        dp_netdev_reload_pmds(dp);
> +        int numa_id = netdev_get_numa_id(netdev);
> +        struct dp_netdev_pmd_thread *pmd;
> +
> +        for (i = 0; i < netdev_n_rxq(netdev); i++) {
> +            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
> +            if (!pmd) {
> +                /* There are no pmd threads on this numa node. */
> +                dp_netdev_set_pmds_on_numa(dp, numa_id);
> +                /* Rx queue assignment was done there. */
> +                break;
> +            }
> +
> +            dp_netdev_pause_pmd__(pmd);
> +            dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
> +            dp_netdev_resume_pmd__(pmd);
> +        }
>      }
>      seq_change(dp->port_seq);
>  
> @@ -1226,16 +1267,6 @@ port_ref(struct dp_netdev_port *port)
>      }
>  }
>  
> -static bool
> -port_try_ref(struct dp_netdev_port *port)
> -{
> -    if (port) {
> -        return ovs_refcount_try_ref_rcu(&port->ref_cnt);
> -    }
> -
> -    return false;
> -}
> -
>  static void
>  port_unref(struct dp_netdev_port *port)
>  {
> @@ -1314,11 +1345,36 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
>          int numa_id = netdev_get_numa_id(port->netdev);
>  
>          /* If there is no netdev on the numa node, deletes the pmd threads
> -         * for that numa.  Else, just reloads the queues.  */
> +         * for that numa.  Otherwise, deletes the queues from polling lists. */
>          if (!has_pmd_port_for_numa(dp, numa_id)) {
>              dp_netdev_del_pmds_on_numa(dp, numa_id);
>          }
> -        dp_netdev_reload_pmds(dp);
> +        else {
> +            bool found;
> +            struct dp_netdev_pmd_thread *pmd;
> +            struct rxq_poll *poll, *next;
> +
> +            CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> +                if (pmd->numa_id == numa_id) {
> +                    found = false;
> +                    dp_netdev_pause_pmd__(pmd);
> +                    LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
> +                        if (poll->port == port) {
> +                            port_unref(poll->port);
> +                            list_remove(&poll->node);
> +                            free(poll);
> +                            found = true;
> +                        }
> +                    }
> +                    if (found) {
> +                        /* Clean up emc cache if poll_list modified. */
> +                        emc_cache_uninit(&pmd->flow_cache);
> +                        emc_cache_init(&pmd->flow_cache);
> +                    }
> +                    dp_netdev_resume_pmd__(pmd);
> +                }
> +            }
> +        }
>      }
>  
>      port_unref(port);
> @@ -2583,92 +2639,27 @@ dpif_netdev_wait(struct dpif *dpif)
>      seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
>  }
>  
> -struct rxq_poll {
> -    struct dp_netdev_port *port;
> -    struct netdev_rxq *rx;
> -};
> -
> -static int
> -pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
> -                struct rxq_poll **ppoll_list, int poll_cnt)
> -{
> -    struct rxq_poll *poll_list = *ppoll_list;
> -    struct dp_netdev_port *port;
> -    int n_pmds_on_numa, index, i;
> -
> -    /* Simple scheduler for netdev rx polling. */
> -    for (i = 0; i < poll_cnt; i++) {
> -        port_unref(poll_list[i].port);
> -    }
> -
> -    poll_cnt = 0;
> -    n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
> -    index = 0;
> -
> -    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
> -        /* Calls port_try_ref() to prevent the main thread
> -         * from deleting the port. */
> -        if (port_try_ref(port)) {
> -            if (netdev_is_pmd(port->netdev)
> -                && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
> -                int i;
> -
> -                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
> -                    if ((index % n_pmds_on_numa) == pmd->index) {
> -                        poll_list = xrealloc(poll_list,
> -                                        sizeof *poll_list * (poll_cnt + 1));
> -
> -                        port_ref(port);
> -                        poll_list[poll_cnt].port = port;
> -                        poll_list[poll_cnt].rx = port->rxq[i];
> -                        poll_cnt++;
> -                    }
> -                    index++;
> -                }
> -            }
> -            /* Unrefs the port_try_ref(). */
> -            port_unref(port);
> -        }
> -    }
> -
> -    *ppoll_list = poll_list;
> -    return poll_cnt;
> -}
> -
>  static void *
>  pmd_thread_main(void *f_)
>  {
>      struct dp_netdev_pmd_thread *pmd = f_;
> +    struct rxq_poll *poll;
>      unsigned int lc = 0;
> -    struct rxq_poll *poll_list;
>      unsigned int port_seq = PMD_INITIAL_SEQ;
> -    int poll_cnt;
> -    int i;
> -
> -    poll_cnt = 0;
> -    poll_list = NULL;
>  
>      /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
>      ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
>      pmd_thread_setaffinity_cpu(pmd->core_id);
>  reload:
> -    emc_cache_init(&pmd->flow_cache);
> -    poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
> -
>      /* List port/core affinity */
> -    for (i = 0; i < poll_cnt; i++) {
> -       VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id, netdev_get_name(poll_list[i].port->netdev));
> +    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
> +       VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
> +                 netdev_get_name(poll->port->netdev));
>      }
>  
> -    /* Signal here to make sure the pmd finishes
> -     * reloading the updated configuration. */
> -    dp_netdev_pmd_reload_done(pmd);
> -
>      for (;;) {
> -        int i;
> -
> -        for (i = 0; i < poll_cnt; i++) {
> -            dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
> +        LIST_FOR_EACH (poll, node, &pmd->poll_list) {
> +            dp_netdev_process_rxq_port(pmd, poll->port, poll->rx);
>          }
>  
>          if (lc++ > 1024) {
> @@ -2688,19 +2679,13 @@ reload:
>          }
>      }
>  
> -    emc_cache_uninit(&pmd->flow_cache);
> +    /* Synchronize with breaker thread. */
> +    dp_netdev_pmd_break_done(pmd);
>  
> -    if (!latch_is_set(&pmd->exit_latch)){
> +    if (!latch_is_set(&pmd->exit_latch)) {
>          goto reload;
>      }
>  
> -    for (i = 0; i < poll_cnt; i++) {
> -         port_unref(poll_list[i].port);
> -    }
> -
> -    dp_netdev_pmd_reload_done(pmd);
> -
> -    free(poll_list);
>      return NULL;
>  }
>  
> @@ -2735,7 +2720,7 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
>  }
>  
>  void
> -dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
> +dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd)
>  {
>      ovs_mutex_lock(&pmd->cond_mutex);
>      xpthread_cond_signal(&pmd->cond);
> @@ -2827,6 +2812,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>      pmd->core_id = core_id;
>      pmd->tx_qid = core_id_to_qid(core_id);
>      pmd->numa_id = numa_id;
> +    pmd->poll_cnt = 0;
>  
>      ovs_refcount_init(&pmd->ref_cnt);
>      latch_init(&pmd->exit_latch);
> @@ -2836,11 +2822,9 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>      ovs_mutex_init(&pmd->flow_mutex);
>      dpcls_init(&pmd->cls);
>      cmap_init(&pmd->flow_table);
> -    /* init the 'flow_cache' since there is no
> -     * actual thread created for NON_PMD_CORE_ID. */
> -    if (core_id == NON_PMD_CORE_ID) {
> -        emc_cache_init(&pmd->flow_cache);
> -    }
> +    list_init(&pmd->poll_list);
> +    emc_cache_init(&pmd->flow_cache);
> +
>      cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
>                  hash_int(core_id, 0));
>  }
> @@ -2863,16 +2847,23 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
>  static void
>  dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
>  {
> -    /* Uninit the 'flow_cache' since there is
> -     * no actual thread uninit it for NON_PMD_CORE_ID. */
> -    if (pmd->core_id == NON_PMD_CORE_ID) {
> -        emc_cache_uninit(&pmd->flow_cache);
> -    } else {
> +    struct rxq_poll *poll;
> +
> +    emc_cache_uninit(&pmd->flow_cache);
> +
> +    if (pmd->core_id != NON_PMD_CORE_ID) {
>          latch_set(&pmd->exit_latch);
> -        dp_netdev_reload_pmd__(pmd);
> +        dp_netdev_break_pmd__(pmd);
>          ovs_numa_unpin_core(pmd->core_id);
>          xpthread_join(pmd->thread, NULL);
>      }
> +
> +    /* Unref all ports and free poll_list. */
> +    LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
> +        port_unref(poll->port);
> +        free(poll);
> +    }
> +
>      /* Purges the 'pmd''s flows after stopping the thread, but before
>       * destroying the flows, so that the flow stats can be collected. */
>      if (dp->dp_purge_cb) {
> @@ -2906,6 +2897,37 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>      }
>  }
>  
> +static struct dp_netdev_pmd_thread *
> +dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
> +{
> +    int min_cnt = -1;
> +    struct dp_netdev_pmd_thread *pmd, *res = NULL;
> +
> +    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> +        if (pmd->numa_id == numa_id
> +            && (min_cnt > pmd->poll_cnt || res == NULL)) {
> +            min_cnt = pmd->poll_cnt;
> +            res = pmd;
> +        }
> +    }
> +
> +    return res;
> +}
> +
> +static void
> +dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
> +                         struct dp_netdev_port *port, struct netdev_rxq *rx)
> +{
> +    struct rxq_poll *poll = xmalloc(sizeof *poll);
> +
> +    port_ref(port);
> +    poll->port = port;
> +    poll->rx = rx;
> +
> +    list_push_back(&pmd->poll_list, &poll->node);
> +    pmd->poll_cnt++;
> +}
> +
>  /* Checks the numa node id of 'netdev' and starts pmd threads for
>   * the numa node. */
>  static void
> @@ -2925,8 +2947,9 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>       * in which 'netdev' is on, do nothing.  Else, creates the
>       * pmd threads for the numa node. */
>      if (!n_pmds) {
> -        int can_have, n_unpinned, i;
> +        int can_have, n_unpinned, i, index = 0;
>          struct dp_netdev_pmd_thread **pmds;
> +        struct dp_netdev_port *port;
>  
>          n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
>          if (!n_unpinned) {
> @@ -2944,13 +2967,20 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
>              pmds[i] = xzalloc(sizeof **pmds);
>              dp_netdev_configure_pmd(pmds[i], dp, i, core_id, numa_id);
>          }
> -        /* The pmd thread code needs to see all the others configured pmd
> -         * threads on the same numa node.  That's why we call
> -         * 'dp_netdev_configure_pmd()' on all the threads and then we actually
> -         * start them. */
> +
> +        /* Distributes rx queues of this numa node among new pmd threads. */
> +        CMAP_FOR_EACH (port, node, &dp->ports) {
> +            if (netdev_is_pmd(port->netdev)
> +                && netdev_get_numa_id(port->netdev) == numa_id) {
> +                for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
> +                    dp_netdev_add_rxq_to_pmd(pmds[index], port, port->rxq[i]);
> +                    index = (index + 1) % can_have;
> +                }
> +            }
> +        }
> +
> +        /* Actual start of pmd threads. */
>          for (i = 0; i < can_have; i++) {
> -            /* Each thread will distribute all devices rx-queues among
> -             * themselves. */
>              pmds[i]->thread = ovs_thread_create("pmd", pmd_thread_main, pmds[i]);
>          }
>          free(pmds);
> 
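A worked example of the round-robin distribution above (an
illustrative scenario, not output from a real run): with two pmd
threads created on a numa node and a port with four rx queues,
dp_netdev_set_pmds_on_numa() assigns queues 0 and 2 to the first
pmd and queues 1 and 3 to the second, since 'index' advances
modulo 'can_have'.  The resulting assignment then shows up in the
log through the VLOG_INFO() in pmd_thread_main(), as lines like
(port name hypothetical):

    Core 1 processing port 'dpdk0'
    Core 2 processing port 'dpdk0'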


