[ovs-dev] [PATCH v6 2/8] dpif-netdev: Register packet processing cores to KA framework.
Fischetti, Antonio
antonio.fischetti at intel.com
Fri Dec 15 14:24:05 UTC 2017
LGTM,
Tested-by: Antonio Fischetti <antonio.fischetti at intel.com>
Acked-by: Antonio Fischetti <antonio.fischetti at intel.com>
> -----Original Message-----
> From: ovs-dev-bounces at openvswitch.org [mailto:ovs-dev-
> bounces at openvswitch.org] On Behalf Of Bhanuprakash Bodireddy
> Sent: Friday, December 8, 2017 12:04 PM
> To: dev at openvswitch.org
> Subject: [ovs-dev] [PATCH v6 2/8] dpif-netdev: Register packet
> processing cores to KA framework.
>
> This commit registers the packet processing PMD threads to the keepalive
> framework. Only PMDs that have rxqs mapped will be registered and
> actively monitored by the KA framework.
>
> This commit spawns a keepalive thread that will dispatch heartbeats to
> PMD threads. The PMD threads respond to heartbeats by marking themselves
> alive. As long as a PMD responds to heartbeats it is considered 'healthy'.
>
> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy at intel.com>
> ---
> lib/dpif-netdev.c | 79 ++++++++++++++++++++++
> lib/keepalive.c | 194
> ++++++++++++++++++++++++++++++++++++++++++++++++++++--
> lib/keepalive.h | 20 ++++++
> lib/ovs-thread.c | 6 ++
> lib/ovs-thread.h | 1 +
> lib/util.c | 22 +++++++
> lib/util.h | 1 +
> 7 files changed, 318 insertions(+), 5 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 07f6113..c978a76 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -49,6 +49,7 @@
> #include "flow.h"
> #include "hmapx.h"
> #include "id-pool.h"
> +#include "keepalive.h"
> #include "latch.h"
> #include "netdev.h"
> #include "netdev-vport.h"
> @@ -592,6 +593,7 @@ struct dp_netdev_pmd_thread {
> atomic_bool reload; /* Do we need to reload ports? */
> pthread_t thread;
> unsigned core_id; /* CPU core id of this pmd thread.
> */
> + pid_t tid; /* PMD thread tid. */
> int numa_id; /* numa node id of this pmd thread.
> */
> bool isolated;
>
> @@ -1018,6 +1020,72 @@ sorted_poll_thread_list(struct dp_netdev *dp,
> *n = k;
> }
>
> +/* Main function of the 'ovs_keepalive' thread.  The thread detaches itself
> + * and then loops forever, sleeping for one keepalive timer interval per
> + * iteration.  The thread argument is unused. */
> +static void *
> +ovs_keepalive(void *f_ OVS_UNUSED)
> +{
> + pthread_detach(pthread_self());
> +
> + for (;;) {
> + uint64_t interval;
> +
> + /* NOTE(review): the scaling by 1000 * 1000 suggests that the interval
> + * is in milliseconds and xnanosleep() takes nanoseconds -- confirm. */
> + interval = get_ka_interval();
> + xnanosleep(interval * 1000 * 1000);
> + }
> +
> + return NULL;
> +}
> +
> +/* Kickstart 'ovs_keepalive' thread.  The creation is wrapped in a static
> + * 'ovsthread_once', so repeated calls are expected to create the thread
> + * only once.  'dp' is passed to the thread as its argument. */
> +static void
> +ka_thread_start(struct dp_netdev *dp)
> +{
> + static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
> +
> + if (ovsthread_once_start(&once)) {
> + ovs_thread_create("ovs_keepalive", ovs_keepalive, dp);
> +
> + ovsthread_once_done(&once);
> + }
> +}
> +
> +/* Registers the datapath threads of 'dp' with the KA framework.  This gets
> + * invoked on every datapath reconfiguration and does nothing when
> + * keepalive is disabled.
> + *
> + * Starts the 'ovs_keepalive' thread if needed, then, between
> + * ka_reload_datapath_threads_begin() and ka_reload_datapath_threads_end(),
> + * registers every pmd thread that has rxq[s] mapped, unregisters every pmd
> + * thread that has none, and refreshes the cached thread list.
> + */
> +static void
> +ka_register_datapath_threads(struct dp_netdev *dp)
> +{
> + if (!ka_is_enabled()) {
> + return;
> + }
> +
> + ka_thread_start(dp);
> +
> + ka_reload_datapath_threads_begin();
> +
> + struct dp_netdev_pmd_thread *pmd;
> + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> + /* Register only PMD threads. */
> + if (pmd->core_id != NON_PMD_CORE_ID) {
> + /* Skip PMD thread with no rxqs mapping. */
> + if (OVS_UNLIKELY(!hmap_count(&pmd->poll_list))) {
> + /* The rxq mapping can change on datapath reconfiguration.
> + * If no rxqs are mapped to this PMD any more, unregister
> + * the pmd thread. */
> + ka_unregister_thread(pmd->tid);
> + continue;
> + }
> +
> + /* ka_register_thread() silently returns for a thread that is
> + * already registered, so the message below is logged on every
> + * reconfiguration.
> + * NOTE(review): 'core_id' is unsigned but is formatted with %d. */
> + ka_register_thread(pmd->tid);
> + VLOG_INFO("Registered PMD thread [%d] on Core[%d] to KA
> framework",
> + pmd->tid, pmd->core_id);
> + }
> + }
> + ka_cache_registered_threads();
> +
> + ka_reload_datapath_threads_end();
> +}
> +
> static void
> dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc,
> const char *argv[], void *aux OVS_UNUSED)
> @@ -3819,6 +3887,9 @@ reconfigure_datapath(struct dp_netdev *dp)
>
> /* Reload affected pmd threads. */
> reload_affected_pmds(dp);
> +
> + /* Register datapath threads to KA monitoring. */
> + ka_register_datapath_threads(dp);
> }
>
> /* Returns true if one of the netdevs in 'dp' requires a
> reconfiguration */
> @@ -4023,6 +4094,8 @@ pmd_thread_main(void *f_)
>
> /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
> ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
> + /* Stores tid in to 'pmd->tid'. */
> + ovsthread_set_tid(&pmd->tid);
> ovs_numa_thread_setaffinity_core(pmd->core_id);
> dpdk_set_lcore_id(pmd->core_id);
> poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
> @@ -4056,6 +4129,9 @@ reload:
> :
> PMD_CYCLES_IDLE);
> }
>
> + /* Mark PMD thread alive. */
> + ka_mark_pmd_thread_alive(pmd->tid);
> +
> if (lc++ > 1024) {
> bool reload;
>
> @@ -4089,6 +4165,9 @@ reload:
> }
>
> emc_cache_uninit(&pmd->flow_cache);
> +
> + ka_unregister_thread(pmd->tid);
> +
> free(poll_list);
> pmd_free_cached_ports(pmd);
> return NULL;
> diff --git a/lib/keepalive.c b/lib/keepalive.c
> index ca8dccb..b04877f 100644
> --- a/lib/keepalive.c
> +++ b/lib/keepalive.c
> @@ -19,6 +19,7 @@
> #include "keepalive.h"
> #include "lib/vswitch-idl.h"
> #include "openvswitch/vlog.h"
> +#include "process.h"
> #include "seq.h"
> #include "timeval.h"
>
> @@ -28,11 +29,19 @@ static bool keepalive_enable = false; /*
> Keepalive disabled by default. */
> static uint32_t keepalive_timer_interval; /* keepalive timer interval.
> */
> static struct keepalive_info ka_info;
>
> -/* Returns true if keepalive is enabled, false otherwise. */
> -bool
> -ka_is_enabled(void)
> +/* Returns true if a thread state update is allowed, false otherwise.
> + * An update is allowed only while keepalive is enabled and
> + * 'ka_info.reload_threads' is false. */
> +static bool
> +ka_can_update_state(void)
> {
> - return keepalive_enable;
> + bool reload_inprogress;
> + bool ka_enable;
> +
> + atomic_read_relaxed(&ka_info.reload_threads, &reload_inprogress);
> + ka_enable = ka_is_enabled();
> +
> + /* Return true if KA is enabled and no 'cached_process_list' map
> + * reload is in progress. */
> + return ka_enable && !reload_inprogress;
> }
>
> /* Finds the thread by 'tid' in 'process_list' map and update
> @@ -49,7 +58,7 @@ ka_set_thread_state_ts(pid_t tid, enum keepalive_state
> state,
> ovs_mutex_lock(&ka_info.proclist_mutex);
> HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
> &ka_info.process_list) {
> - if (pinfo->tid == tid) {
> + if (OVS_LIKELY(pinfo->tid == tid)) {
> pinfo->state = state;
> pinfo->last_seen_time = last_alive;
> }
> @@ -104,6 +113,177 @@ ka_register_relay_cb(ka_relay_cb cb, void *aux)
> ka_info.relay_cb_data = aux;
> }
>
> +/* Returns true if keepalive is enabled, false otherwise.  This is the
> + * current value of the file-level 'keepalive_enable' flag. */
> +bool
> +ka_is_enabled(void)
> +{
> + return keepalive_enable;
> +}
> +
> +/* Returns the keepalive timer interval, i.e. the current value of the
> + * file-level 'keepalive_timer_interval' variable. */
> +uint32_t
> +get_ka_interval(void)
> +{
> + return keepalive_timer_interval;
> +}
> +
> +/* Marks the 'cached_process_list' map reload as in progress by setting
> + * 'ka_info.reload_threads' to true.
> + *
> + * Should be called before 'ka_info.cached_process_list' is populated
> + * from 'ka_info.process_list'.  While the flag is set,
> + * ka_can_update_state() returns false, so the pmd threads do not update
> + * their state in the cached map. */
> +void
> +ka_reload_datapath_threads_begin(void)
> +{
> + atomic_store_relaxed(&ka_info.reload_threads, true);
> +}
> +
> +/* Marks the 'cached_process_list' map reload as finished by setting
> + * 'ka_info.reload_threads' back to false.
> + *
> + * Should be called after 'ka_info.cached_process_list' has been
> + * populated from 'ka_info.process_list'.  Once the flag is cleared,
> + * ka_can_update_state() can return true again, so the pmd threads can
> + * resume updating their state in the cached map. */
> +void
> +ka_reload_datapath_threads_end(void)
> +{
> + atomic_store_relaxed(&ka_info.reload_threads, false);
> +}
> +
> +/* Registers the thread 'tid' with the KA framework.  Does nothing if
> + * keepalive is disabled or if 'tid' is already in 'ka_info.process_list'.
> + *
> + * Otherwise a new entry is added to the map under 'proclist_mutex', in
> + * state KA_STATE_ALIVE and with the current wall-clock time as its last
> + * seen time.  If get_process_info() does not report success, the entry
> + * keeps core id -1 and the name "UNDEFINED". */
> +void
> +ka_register_thread(pid_t tid)
> +{
> + if (ka_is_enabled()) {
> + struct ka_process_info *ka_pinfo;
> + int core_id = -1;
> + /* NOTE(review): this buffer is 18 bytes but 'ka_pinfo->name' is
> + * char[16], so the second copy below presumably truncates longer
> + * names -- confirm that this is intended. */
> + char proc_name[18] = "UNDEFINED";
> +
> + /* The process information is looked up before the mutex is taken. */
> + struct process_info pinfo;
> + int success = get_process_info(tid, &pinfo);
> + if (success) {
> + core_id = pinfo.core_id;
> + ovs_strlcpy(proc_name, pinfo.name, sizeof proc_name);
> + }
> +
> + uint32_t hash = hash_int(tid, 0);
> + ovs_mutex_lock(&ka_info.proclist_mutex);
> + HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node,
> + hash, &ka_info.process_list) {
> + /* Thread is already registered. */
> + if (ka_pinfo->tid == tid) {
> + goto out;
> + }
> + }
> +
> + ka_pinfo = xmalloc(sizeof *ka_pinfo);
> + ka_pinfo->tid = tid;
> + ka_pinfo->core_id = core_id;
> + ovs_strlcpy(ka_pinfo->name, proc_name, sizeof ka_pinfo->name);
> +
> + hmap_insert(&ka_info.process_list, &ka_pinfo->node, hash);
> +
> + ka_pinfo->state = KA_STATE_ALIVE;
> + ka_pinfo->last_seen_time = time_wall_msec();
> + ka_info.thread_cnt++; /* Increment count of registered threads. */
> +out:
> + ovs_mutex_unlock(&ka_info.proclist_mutex);
> + }
> +}
> +
> +/* Unregisters the thread 'tid' from the KA framework.  Does nothing if
> + * keepalive is disabled or if 'tid' is not in 'ka_info.process_list'.
> + * Otherwise the entry is removed from the map and freed, under
> + * 'proclist_mutex', and the registered thread count is decremented. */
> +void
> +ka_unregister_thread(pid_t tid)
> +{
> + if (ka_is_enabled()) {
> + struct ka_process_info *ka_pinfo;
> +
> + ovs_mutex_lock(&ka_info.proclist_mutex);
> + HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node, hash_int(tid, 0),
> + &ka_info.process_list) {
> + /* If thread is registered, remove it from the list */
> + if (ka_pinfo->tid == tid) {
> + hmap_remove(&ka_info.process_list, &ka_pinfo->node);
> + ka_info.thread_cnt--; /* Decrement thread count. */
> +
> + /* Free the entry last: 'ka_pinfo' must not be accessed after
> + * this point, so leave the loop without advancing from it. */
> + free(ka_pinfo);
> + break;
> + }
> + }
> + ovs_mutex_unlock(&ka_info.proclist_mutex);
> + }
> +}
> +
> +/* Frees every entry of the 'ka_info.cached_process_list' map and then
> + * shrinks the emptied map.  The map itself is not destroyed here, and no
> + * lock is taken. */
> +void
> +ka_free_cached_threads(void)
> +{
> + struct ka_process_info *pinfo_cached;
> + /* Pop and free the threads in the cached list one by one. */
> + HMAP_FOR_EACH_POP (pinfo_cached, node,
> &ka_info.cached_process_list) {
> + free(pinfo_cached);
> + }
> + hmap_shrink(&ka_info.cached_process_list);
> +}
> +
> +/* Caches the registered threads from the 'ka_info.process_list' map in
> + * 'ka_info.cached_process_list'.
> + *
> + * 'cached_process_list' is a copy of 'process_list' that is meant to be
> + * updated by the 'pmd' and 'ovs_keepalive' threads as part of the
> + * heartbeat mechanism, so that heartbeats can be performed without
> + * acquiring locks.
> + *
> + * On datapath reconfiguration, both 'process_list' and its cached copy
> + * are updated after 'reload_threads' has been set to 'true', so that the
> + * pmd threads don't heartbeat while the maps are updated.
> + *
> + * 'process_list' is read under 'proclist_mutex', because a thread may
> + * concurrently be registered or unregistered (e.g. an exiting pmd
> + * thread unregisters itself).  The caller must not hold that mutex. */
> +void
> +ka_cache_registered_threads(void)
> +{
> + struct ka_process_info *pinfo;
> +
> + ka_free_cached_threads();
> +
> + ovs_mutex_lock(&ka_info.proclist_mutex);
> + /* Nothing is removed from 'process_list' while iterating, so the plain
> + * iterator is sufficient. */
> + HMAP_FOR_EACH (pinfo, node, &ka_info.process_list) {
> + struct ka_process_info *pinfo_cached;
> +
> + pinfo_cached = xmemdup(pinfo, sizeof *pinfo_cached);
> + hmap_insert(&ka_info.cached_process_list, &pinfo_cached->node,
> + hash_int(pinfo->tid, 0));
> + }
> + ovs_mutex_unlock(&ka_info.proclist_mutex);
> +}
> +
> +/* Marks the packet processing thread 'tid' alive by setting the state of
> + * its entry in 'ka_info.cached_process_list' to KA_STATE_ALIVE.
> + *
> + * Does nothing if ka_can_update_state() returns false or if 'tid' has no
> + * entry in the cached map.  No lock is taken. */
> +void
> +ka_mark_pmd_thread_alive(int tid)
> +{
> + if (ka_can_update_state()) {
> + struct ka_process_info *pinfo;
> + HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
> + &ka_info.cached_process_list) {
> + if (OVS_LIKELY(pinfo->tid == tid)) {
> + pinfo->state = KA_STATE_ALIVE;
> + }
> + }
> + }
> +}
> +
> +/* Marks the packet processing thread 'tid' as sleeping by setting the
> + * state of its entry in 'ka_info.cached_process_list' to KA_STATE_SLEEP.
> + *
> + * Does nothing if ka_can_update_state() returns false or if 'tid' has no
> + * entry in the cached map.  No lock is taken. */
> +void
> +ka_mark_pmd_thread_sleep(int tid)
> +{
> + if (ka_can_update_state()) {
> + struct ka_process_info *pinfo;
> +
> + HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
> + &ka_info.cached_process_list) {
> + if (pinfo->tid == tid) {
> + pinfo->state = KA_STATE_SLEEP;
> + }
> + }
> + }
> +}
> +
> void
> ka_init(const struct smap *ovs_other_config)
> {
> @@ -120,6 +300,7 @@ ka_init(const struct smap *ovs_other_config)
> ka_register_relay_cb(ka_update_thread_state, NULL);
> ovs_mutex_init(&ka_info.proclist_mutex);
> hmap_init(&ka_info.process_list);
> + hmap_init(&ka_info.cached_process_list);
>
> ka_info.init_time = time_wall_msec();
>
> @@ -143,5 +324,8 @@ ka_destroy(void)
> ovs_mutex_unlock(&ka_info.proclist_mutex);
>
> hmap_destroy(&ka_info.process_list);
> +
> + ka_free_cached_threads();
> + hmap_destroy(&ka_info.cached_process_list);
> ovs_mutex_destroy(&ka_info.proclist_mutex);
> }
> diff --git a/lib/keepalive.h b/lib/keepalive.h
> index a738daa..7674ea3 100644
> --- a/lib/keepalive.h
> +++ b/lib/keepalive.h
> @@ -48,6 +48,9 @@ enum keepalive_state {
> };
>
> struct ka_process_info {
> + /* Process name. */
> + char name[16];
> +
> /* Thread id of the process, retrieved using ovs_gettid(). */
> pid_t tid;
>
> @@ -71,15 +74,32 @@ struct keepalive_info {
> /* List of process/threads monitored by KA framework. */
> struct hmap process_list OVS_GUARDED;
>
> + /* cached copy of 'process_list' list. */
> + struct hmap cached_process_list;
> +
> + /* count of threads registered to KA framework. */
> + uint32_t thread_cnt;
> +
> /* Keepalive initialization time. */
> uint64_t init_time;
>
> /* keepalive relay handler. */
> ka_relay_cb relay_cb;
> void *relay_cb_data;
> +
> + atomic_bool reload_threads; /* Reload threads in to cached list.
> */
> };
>
> bool ka_is_enabled(void);
> +uint32_t get_ka_interval(void);
> +void ka_reload_datapath_threads_begin(void);
> +void ka_reload_datapath_threads_end(void);
> +void ka_register_thread(pid_t);
> +void ka_unregister_thread(pid_t);
> +void ka_free_cached_threads(void);
> +void ka_cache_registered_threads(void);
> +void ka_mark_pmd_thread_alive(int);
> +void ka_mark_pmd_thread_sleep(int);
> void ka_init(const struct smap *);
> void ka_destroy(void);
>
> diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
> index f8bc06d..ae8e450 100644
> --- a/lib/ovs-thread.c
> +++ b/lib/ovs-thread.c
> @@ -597,6 +597,12 @@ thread_is_pmd(void)
> return !strncmp(name, "pmd", 3);
> }
>
> +/* Stores the tid of the calling thread, as returned by ovs_get_tid(),
> + * in '*tid'. */
> +void
> +ovsthread_set_tid(pid_t *tid)
> +{
> + *tid = ovs_get_tid();
> +}
> +
>
> /* ovsthread_key. */
>
> diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h
> index 55e51a4..cfb4b04 100644
> --- a/lib/ovs-thread.h
> +++ b/lib/ovs-thread.h
> @@ -524,5 +524,6 @@ bool may_fork(void);
>
> int count_cpu_cores(void);
> bool thread_is_pmd(void);
> +void ovsthread_set_tid(pid_t *);
>
> #endif /* ovs-thread.h */
> diff --git a/lib/util.c b/lib/util.c
> index 2965656..927929b 100644
> --- a/lib/util.c
> +++ b/lib/util.c
> @@ -26,6 +26,12 @@
> #include <stdlib.h>
> #include <string.h>
> #include <sys/stat.h>
> +#ifdef __linux__
> +#include <sys/syscall.h>
> +#endif
> +#ifdef __FreeBSD__
> +#include <sys/thr.h>
> +#endif
> #include <unistd.h>
> #include "bitmap.h"
> #include "byte-order.h"
> @@ -575,6 +581,22 @@ get_page_size(void)
> return cached;
> }
>
> +/* Returns the tid of the calling thread if supported, -EINVAL otherwise
> + * (after logging an error).  Supported on Linux, through the gettid
> + * system call, and on FreeBSD, through thr_self(). */
> +pid_t
> +ovs_get_tid(void)
> +{
> +#if defined(__linux__)
> + return syscall(SYS_gettid);
> +#elif defined(__FreeBSD__)
> + /* <sys/thr.h>, which declares thr_self(), is only included on FreeBSD,
> + * so no other platform may take this branch. */
> + long tid;
> + thr_self(&tid);
> + return (pid_t)tid;
> +#else
> + VLOG_ERR("ovs_get_tid(): unsupported.");
> + return -EINVAL;
> +#endif
> +}
> +
> /* Returns the time at which the system booted, as the number of
> milliseconds
> * since the epoch, or 0 if the time of boot cannot be determined. */
> long long int
> diff --git a/lib/util.h b/lib/util.h
> index b01f421..259346d 100644
> --- a/lib/util.h
> +++ b/lib/util.h
> @@ -156,6 +156,7 @@ void free_cacheline(void *);
>
> void ovs_strlcpy(char *dst, const char *src, size_t size);
> void ovs_strzcpy(char *dst, const char *src, size_t size);
> +pid_t ovs_get_tid(void);
>
> /* The C standards say that neither the 'dst' nor 'src' argument to
> * memcpy() may be null, even if 'n' is zero. This wrapper tolerates
> --
> 2.4.11
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
More information about the dev
mailing list