[ovs-dev] [PATCH v5 04/10] dpif-netdev: Register packet processing cores to KA framework.

Bhanuprakash Bodireddy bhanuprakash.bodireddy at intel.com
Fri Sep 15 16:40:24 UTC 2017


This commit registers the packet processing PMD threads with the keepalive
(KA) framework. Only PMDs that have rxqs mapped will be registered and
actively monitored by the KA framework.

This commit also spawns a keepalive thread that will dispatch heartbeats to
PMD threads. The PMD threads respond to heartbeats by marking themselves
alive. As long as a PMD responds to heartbeats, it is considered 'healthy'.

Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy at intel.com>
---
 lib/dpif-netdev.c |  79 ++++++++++++++++++++++
 lib/keepalive.c   | 191 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 lib/keepalive.h   |  20 ++++++
 lib/ovs-thread.c  |   6 ++
 lib/ovs-thread.h  |   1 +
 lib/util.c        |  22 +++++++
 lib/util.h        |   1 +
 7 files changed, 316 insertions(+), 4 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index ca74df8..da419d5 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -49,6 +49,7 @@
 #include "flow.h"
 #include "hmapx.h"
 #include "id-pool.h"
+#include "keepalive.h"
 #include "latch.h"
 #include "netdev.h"
 #include "netdev-vport.h"
@@ -591,6 +592,7 @@ struct dp_netdev_pmd_thread {
     uint64_t last_reload_seq;
     atomic_bool reload;             /* Do we need to reload ports? */
     pthread_t thread;
+    pid_t tid;                      /* Thread id of this pmd thread. */
     unsigned core_id;               /* CPU core id of this pmd thread. */
     int numa_id;                    /* numa node id of this pmd thread. */
     bool isolated;
@@ -1018,6 +1020,72 @@ sorted_poll_thread_list(struct dp_netdev *dp,
     *n = k;
 }
 
+static void *
+ovs_keepalive(void *f_ OVS_UNUSED)
+{
+    /* Detach so the thread's resources are released without a join. */
+    pthread_detach(pthread_self());
+
+    /* NOTE(review): xnanosleep() presumably takes nanoseconds; confirm the
+     * unit of get_ka_interval() -- a millisecond value would need scaling. */
+    for (;;) {
+        int sleep_for = get_ka_interval();
+        xnanosleep(sleep_for);
+    }
+    return NULL;
+}
+
+/* Spawns the 'ovs_keepalive' thread; only the first call has any effect. */
+static void
+ka_thread_start(struct dp_netdev *dp)
+{
+    static struct ovsthread_once ka_once = OVSTHREAD_ONCE_INITIALIZER;
+
+    if (!ovsthread_once_start(&ka_once)) {
+        return;         /* Thread already launched. */
+    }
+    ovs_thread_create("ovs_keepalive", ovs_keepalive, dp);
+    ovsthread_once_done(&ka_once);
+}
+
+/* Registers the datapath threads with the KA framework.  Invoked on every
+ * datapath reconfiguration: each PMD thread that has at least one rxq mapped
+ * is registered, and each PMD thread left with no rxqs is unregistered. */
+static void
+ka_register_datapath_threads(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+
+    if (!ka_is_enabled()) {
+        return;
+    }
+
+    ka_thread_start(dp);
+
+    /* Suspend heartbeat updates while the cached thread list is rebuilt. */
+    ka_reload_datapath_threads_begin();
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        /* The non-PMD thread is not monitored. */
+        if (pmd->core_id == NON_PMD_CORE_ID) {
+            continue;
+        }
+
+        /* Reconfiguration may have taken every rxq away from this PMD;
+         * such a thread is no longer monitored. */
+        if (OVS_UNLIKELY(!hmap_count(&pmd->poll_list))) {
+            ka_unregister_thread(pmd->tid);
+            continue;
+        }
+
+        ka_register_thread(pmd->tid);
+        VLOG_INFO("Registered PMD thread [%d] on Core[%u] to KA framework",
+                  pmd->tid, pmd->core_id);
+    }
+    ka_cache_registered_threads();
+    ka_reload_datapath_threads_end();
+}
+
 static void
 dpif_netdev_pmd_rebalance(struct unixctl_conn *conn, int argc,
                           const char *argv[], void *aux OVS_UNUSED)
@@ -3821,6 +3889,9 @@ reconfigure_datapath(struct dp_netdev *dp)
 
     /* Reload affected pmd threads. */
     reload_affected_pmds(dp);
+
+    /* Register datapath threads to KA monitoring. */
+    ka_register_datapath_threads(dp);
 }
 
 /* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
@@ -4023,6 +4094,8 @@ pmd_thread_main(void *f_)
 
     /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
     ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
+    /* Stores tid in to 'pmd->tid'. */
+    ovsthread_settid(&pmd->tid);
     ovs_numa_thread_setaffinity_core(pmd->core_id);
     dpdk_set_lcore_id(pmd->core_id);
     poll_cnt = pmd_load_queues_and_ports(pmd, &poll_list);
@@ -4056,6 +4129,9 @@ reload:
                                                       : PMD_CYCLES_IDLE);
         }
 
+        /* Mark PMD thread alive. */
+        ka_mark_pmd_thread_alive(pmd->tid);
+
         if (lc++ > 1024) {
             bool reload;
 
@@ -4089,6 +4165,9 @@ reload:
     }
 
     emc_cache_uninit(&pmd->flow_cache);
+
+    ka_unregister_thread(pmd->tid);
+
     free(poll_list);
     pmd_free_cached_ports(pmd);
     return NULL;
diff --git a/lib/keepalive.c b/lib/keepalive.c
index 1f151f6..da4defd 100644
--- a/lib/keepalive.c
+++ b/lib/keepalive.c
@@ -19,6 +19,7 @@
 #include "keepalive.h"
 #include "lib/vswitch-idl.h"
 #include "openvswitch/vlog.h"
+#include "process.h"
 #include "seq.h"
 #include "timeval.h"
 
@@ -28,11 +29,18 @@ static bool keepalive_enable = false;      /* Keepalive disabled by default. */
 static uint32_t keepalive_timer_interval;  /* keepalive timer interval. */
 static struct keepalive_info ka_info;
 
-/* Returns true if keepalive is enabled, false otherwise. */
-bool
-ka_is_enabled(void)
+/* Returns true if heartbeat state updates are allowed: keepalive is enabled
+ * and no reload of 'cached_process_list' is in progress. */
+static bool
+ka_can_update_state(void)
 {
-    return keepalive_enable;
+    bool reloading;
+
+    /* Acquire ordering: paired with a release store at the end of a reload,
+     * this makes the rebuilt 'cached_process_list' visible to the caller. */
+    atomic_read_explicit(&ka_info.reload_threads, &reloading,
+                         memory_order_acquire);
+    return ka_is_enabled() && !reloading;
 }
 
 /* Finds the thread by 'tid' in 'process_list' map and update
@@ -105,6 +113,177 @@ keepalive_register_relay_cb(ka_relay_cb cb, void *aux)
     ka_info.relay_cb_data = aux;
 }
 
+/* Reports whether keepalive monitoring is turned on (off by default). */
+bool
+ka_is_enabled(void)
+{
+    return keepalive_enable;
+}
+
+/* Returns the keepalive timer interval ('keepalive_timer_interval'). */
+uint32_t
+get_ka_interval(void)
+{
+    return keepalive_timer_interval;
+}
+
+/* Announces that 'ka_info.cached_process_list' is about to be rebuilt from
+ * 'ka_info.process_list'.  Until ka_reload_datapath_threads_end() is called,
+ * ka_can_update_state() reports false, so heartbeat updates to the cached
+ * list are suppressed. */
+void
+ka_reload_datapath_threads_begin(void)
+{
+    bool reloading = true;
+    atomic_store_relaxed(&ka_info.reload_threads, reloading);
+}
+
+/* Announces that 'ka_info.cached_process_list' has been rebuilt, so pmd
+ * threads may resume heartbeat updates.  A release store is used so that
+ * the writes that rebuilt the map are visible to any thread that reads
+ * 'false' here with acquire semantics. */
+void
+ka_reload_datapath_threads_end(void)
+{
+    atomic_store_explicit(&ka_info.reload_threads, false,
+                          memory_order_release);
+}
+
+/* Registers thread 'tid' with the KA framework and marks it alive.  Does
+ * nothing if keepalive is disabled or 'tid' is already registered. */
+void
+ka_register_thread(pid_t tid)
+{
+    struct ka_process_info *ka_pinfo;
+    struct process_info pinfo;
+    char proc_name[18] = "UNDEFINED";
+    int core_id = -1;
+    uint32_t hash;
+
+    if (!ka_is_enabled()) {
+        return;
+    }
+
+    /* Keep the placeholder core id and name if the lookup fails. */
+    if (get_process_info(tid, &pinfo)) {
+        core_id = pinfo.core_id;
+        ovs_strlcpy(proc_name, pinfo.name, sizeof proc_name);
+    }
+
+    hash = hash_int(tid, 0);
+    ovs_mutex_lock(&ka_info.proclist_mutex);
+    HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node, hash, &ka_info.process_list) {
+        if (ka_pinfo->tid == tid) {
+            goto out;           /* Already registered. */
+        }
+    }
+
+    ka_pinfo = xmalloc(sizeof *ka_pinfo);
+    ka_pinfo->tid = tid;
+    ka_pinfo->core_id = core_id;
+    ovs_strlcpy(ka_pinfo->name, proc_name, sizeof ka_pinfo->name);
+    ka_pinfo->state = KA_STATE_ALIVE;
+    ka_pinfo->last_seen_time = time_wall_msec();
+    hmap_insert(&ka_info.process_list, &ka_pinfo->node, hash);
+    ka_info.thread_cnt++;
+out:
+    ovs_mutex_unlock(&ka_info.proclist_mutex);
+}
+
+/* Removes thread 'tid' from 'ka_info.process_list' and frees its entry.
+ * Does nothing if keepalive is disabled or 'tid' is not registered. */
+void
+ka_unregister_thread(pid_t tid)
+{
+    struct ka_process_info *ka_pinfo;
+
+    if (!ka_is_enabled()) {
+        return;
+    }
+    ovs_mutex_lock(&ka_info.proclist_mutex);
+    HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node, hash_int(tid, 0),
+                             &ka_info.process_list) {
+        if (ka_pinfo->tid == tid) {
+            /* Unlink and free; 'ka_pinfo' must not be touched after this. */
+            hmap_remove(&ka_info.process_list, &ka_pinfo->node);
+            free(ka_pinfo);
+            ka_info.thread_cnt--;
+            break;
+        }
+    }
+    ovs_mutex_unlock(&ka_info.proclist_mutex);
+}
+
+/* Empties 'ka_info.cached_process_list', freeing every cached entry. */
+void
+ka_free_cached_threads(void)
+{
+    struct ka_process_info *entry;
+
+    HMAP_FOR_EACH_POP (entry, node, &ka_info.cached_process_list) {
+        free(entry);
+    }
+    hmap_shrink(&ka_info.cached_process_list);
+}
+
+/* Rebuilds 'ka_info.cached_process_list' as an exact copy of
+ * 'ka_info.process_list'.
+ *
+ * The cached copy is updated by the 'pmd' and 'ovs_keepalive' threads as
+ * part of the heartbeat mechanism, so heartbeats can be performed without
+ * acquiring locks.  'process_list' itself is OVS_GUARDED and may change
+ * concurrently (e.g. an exiting pmd thread unregistering itself), so it is
+ * read here under 'proclist_mutex'.
+ *
+ * On datapath reconfiguration this is called while 'reload_threads' is
+ * 'true', so pmd threads do not heartbeat while the cache is rebuilt.
+ */
+void
+ka_cache_registered_threads(void)
+{
+    struct ka_process_info *pinfo, *pinfo_cached;
+
+    ka_free_cached_threads();
+    ovs_mutex_lock(&ka_info.proclist_mutex);
+    HMAP_FOR_EACH (pinfo, node, &ka_info.process_list) {
+        pinfo_cached = xmemdup(pinfo, sizeof *pinfo_cached);
+        hmap_insert(&ka_info.cached_process_list, &pinfo_cached->node,
+                    hash_int(pinfo->tid, 0));
+    }
+    ovs_mutex_unlock(&ka_info.proclist_mutex);
+}
+
+/* Sets the cached state of pmd thread 'tid' if ka_can_update_state(). */
+static void
+ka_set_cached_thread_state(int tid, enum keepalive_state state)
+{
+    struct ka_process_info *pinfo;
+
+    if (!ka_can_update_state()) {
+        return;
+    }
+    HMAP_FOR_EACH_WITH_HASH (pinfo, node, hash_int(tid, 0),
+                             &ka_info.cached_process_list) {
+        if (pinfo->tid == tid) {
+            pinfo->state = state;
+        }
+    }
+}
+
+/* Mark packet processing thread alive. */
+void
+ka_mark_pmd_thread_alive(int tid)
+{
+    ka_set_cached_thread_state(tid, KA_STATE_ALIVE);
+}
+
+/* Mark packet processing thread as idle. */
+void
+ka_mark_pmd_thread_sleep(int tid)
+{
+    ka_set_cached_thread_state(tid, KA_STATE_DOZING);
+}
+
 void
 ka_init(const struct smap *ovs_other_config)
 {
@@ -121,6 +300,7 @@ ka_init(const struct smap *ovs_other_config)
             keepalive_register_relay_cb(ka_update_thread_state, NULL);
             ovs_mutex_init(&ka_info.proclist_mutex);
             hmap_init(&ka_info.process_list);
+            hmap_init(&ka_info.cached_process_list);
 
             ka_info.init_time = time_wall_msec();
 
@@ -141,5 +321,8 @@ ka_destroy(void)
     ovs_mutex_unlock(&ka_info.proclist_mutex);
 
     hmap_destroy(&ka_info.process_list);
+
+    ka_free_cached_threads();
+    hmap_destroy(&ka_info.cached_process_list);
     ovs_mutex_destroy(&ka_info.proclist_mutex);
 }
diff --git a/lib/keepalive.h b/lib/keepalive.h
index 6b33243..9e8bfdf 100644
--- a/lib/keepalive.h
+++ b/lib/keepalive.h
@@ -50,6 +50,9 @@ enum keepalive_state {
 };
 
 struct ka_process_info {
+    /* Process name. */
+    char name[16];
+
     /* Thread id of the process, retrieved using ovs_gettid(). */
     pid_t tid;
 
@@ -73,15 +76,32 @@ struct keepalive_info {
     /* List of process/threads monitored by KA framework. */
     struct hmap process_list OVS_GUARDED;
 
+    /* cached copy of 'process_list' list. */
+    struct hmap cached_process_list;
+
+    /* count of threads registered to KA framework. */
+    uint32_t thread_cnt;
+
     /* Keepalive initialization time. */
     uint64_t init_time;
 
     /* keepalive relay handler. */
     ka_relay_cb relay_cb;
     void *relay_cb_data;
+
+    atomic_bool reload_threads;   /* Reload threads in to cached list. */
 };
 
 bool ka_is_enabled(void);
+uint32_t get_ka_interval(void);
+void ka_reload_datapath_threads_begin(void);
+void ka_reload_datapath_threads_end(void);
+void ka_register_thread(pid_t);
+void ka_unregister_thread(pid_t);
+void ka_free_cached_threads(void);
+void ka_cache_registered_threads(void);
+void ka_mark_pmd_thread_alive(int);
+void ka_mark_pmd_thread_sleep(int);
 void ka_init(const struct smap *);
 void ka_destroy(void);
 
diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c
index 1f4995b..ad1d1b0 100644
--- a/lib/ovs-thread.c
+++ b/lib/ovs-thread.c
@@ -597,6 +597,12 @@ thread_is_pmd(void)
     return !strncmp(name, "pmd", 3);
 }
 
+/* Stores the calling thread's id, as given by ovs_gettid(), in '*tid'. */
+void
+ovsthread_settid(pid_t *tid)
+{
+    *tid = ovs_gettid();
+}
 
 /* ovsthread_key. */
 
diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h
index 55e51a4..018bbb2 100644
--- a/lib/ovs-thread.h
+++ b/lib/ovs-thread.h
@@ -524,5 +524,6 @@ bool may_fork(void);
 
 int count_cpu_cores(void);
 bool thread_is_pmd(void);
+void ovsthread_settid(pid_t *);
 
 #endif /* ovs-thread.h */
diff --git a/lib/util.c b/lib/util.c
index 4ad7eea..c94969a 100644
--- a/lib/util.c
+++ b/lib/util.c
@@ -26,6 +26,12 @@
 #include <stdlib.h>
 #include <string.h>
 #include <sys/stat.h>
+#ifdef __linux__
+#include <sys/syscall.h>
+#endif
+#ifdef __FreeBSD__
+#include <sys/thr.h>
+#endif
 #include <unistd.h>
 #include "bitmap.h"
 #include "byte-order.h"
@@ -568,6 +574,22 @@ get_page_size(void)
     return cached;
 }
 
+/* Returns the calling thread's id on Linux and FreeBSD, -EINVAL elsewhere. */
+pid_t
+ovs_gettid(void)
+{
+#if defined(__linux__)
+    return (pid_t) syscall(SYS_gettid);
+#elif defined(__FreeBSD__)   /* thr_self() is FreeBSD-specific. */
+    long tid;
+    thr_self(&tid);
+    return (pid_t) tid;
+#else
+    VLOG_ERR("ovs_gettid(): unsupported.");
+    return -EINVAL;
+#endif
+}
+
 /* Returns the time at which the system booted, as the number of milliseconds
  * since the epoch, or 0 if the time of boot cannot be determined. */
 long long int
diff --git a/lib/util.h b/lib/util.h
index 0449fa1..844dd34 100644
--- a/lib/util.h
+++ b/lib/util.h
@@ -143,6 +143,7 @@ void free_cacheline(void *);
 
 void ovs_strlcpy(char *dst, const char *src, size_t size);
 void ovs_strzcpy(char *dst, const char *src, size_t size);
+pid_t ovs_gettid(void);
 
 /* The C standards say that neither the 'dst' nor 'src' argument to
  * memcpy() may be null, even if 'n' is zero.  This wrapper tolerates
-- 
2.4.11



More information about the dev mailing list