[ovs-dev] [PATCH v3 07/19] dpif-netdev: Register packet processing cores to KA framework.

Bhanuprakash Bodireddy bhanuprakash.bodireddy at intel.com
Fri Aug 4 08:07:54 UTC 2017


This commit registers the packet processing PMD cores to keepalive
framework. Only PMDs that have rxqs mapped will be registered and
actively monitored by KA framework.

This commit spawns a keepalive thread that will dispatch heartbeats to
PMD cores. The pmd threads respond to heartbeats by marking themselves
alive. As long as PMD responds to heartbeats it is considered 'healthy'.

Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodireddy at intel.com>
---
 lib/dpif-netdev.c |  99 +++++++++++++++++++++++++++++++++++++++
 lib/keepalive.c   | 138 +++++++++++++++++++++++++++++++++++++++++++++++-------
 lib/keepalive.h   |  25 +++++++++-
 lib/util.c        |  10 ++++
 lib/util.h        |   1 +
 5 files changed, 254 insertions(+), 19 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index b51674f..5737223 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -73,6 +73,7 @@
 #include "seq.h"
 #include "smap.h"
 #include "sset.h"
+#include "svec.h"
 #include "timeval.h"
 #include "tnl-neigh-cache.h"
 #include "tnl-ports.h"
@@ -979,6 +980,94 @@ sorted_poll_thread_list(struct dp_netdev *dp,
     *n = k;
 }
 
+static void *
+ovs_keepalive(void *f_ OVS_UNUSED)
+{
+    pthread_detach(pthread_self());
+
+    for (;;) {
+        xusleep(get_ka_interval() * 1000);
+    }
+
+    return NULL;
+}
+
+static void
+ka_thread_start(struct dp_netdev *dp)
+{
+    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
+
+    if (ovsthread_once_start(&once)) {
+        ovs_thread_create("ovs_keepalive", ovs_keepalive, dp);
+
+        ovsthread_once_done(&once);
+    }
+}
+
+static void
+pmd_num_poll_ports(struct dp_netdev_pmd_thread *pmd, int *num_poll_ports)
+{
+    struct svec pmd_port_poll_list;
+    svec_init(&pmd_port_poll_list);
+
+    struct rxq_poll *poll;
+    const char *port_name;
+    int i = 0;
+
+    HMAP_FOR_EACH (poll, node, &pmd->poll_list) {
+        svec_add(&pmd_port_poll_list, netdev_rxq_get_name(poll->rxq->rx));
+    }
+    /* With MQ enabled, remove the duplicates. */
+    svec_sort_unique(&pmd_port_poll_list);
+    SVEC_FOR_EACH (i, port_name, &pmd_port_poll_list) {
+        VLOG_DBG("%d Port:%s", i, port_name);
+    }
+    svec_destroy(&pmd_port_poll_list);
+
+    *num_poll_ports = i;
+    VLOG_DBG("PMD thread [%d] polling [%d] ports",
+                 pmd->core_id, *num_poll_ports);
+}
+
+static void
+ka_register_datapath_threads(struct dp_netdev *dp)
+{
+    int ka_init = get_ka_init_status();
+    VLOG_DBG("Keepalive: Was initialization successful? [%s]",
+                ka_init ? "Success" : "Failure");
+    if (!ka_init) {
+        return;
+    }
+
+    ka_thread_start(dp);
+
+    struct dp_netdev_pmd_thread *pmd;
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        /* Skip PMD thread with no rxqs mapping. */
+        if (OVS_UNLIKELY(!hmap_count(&pmd->poll_list))) {
+            continue;
+        }
+
+        /*  Register only PMD threads. */
+        if (pmd->core_id != NON_PMD_CORE_ID) {
+            int err;
+            int nports;
+            pmd_num_poll_ports(pmd, &nports);
+            err = ka_alloc_portstats(pmd->core_id, nports);
+            if (err) {
+                VLOG_FATAL("Unable to allocate memory for PMD core %d",
+                            pmd->core_id);
+                return;
+            }
+
+            int tid = ka_get_pmd_tid(pmd->core_id);
+            ka_register_thread(tid, true);
+            VLOG_DBG("Registered PMD thread [%d] on Core [%d] to KA framework",
+                      tid, pmd->core_id);
+        }
+    }
+}
+
 static void
 dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
                      void *aux)
@@ -3626,6 +3715,9 @@ reconfigure_datapath(struct dp_netdev *dp)
 
     /* Reload affected pmd threads. */
     reload_affected_pmds(dp);
+
+    /* Register datapath threads to KA monitoring. */
+    ka_register_datapath_threads(dp);
 }
 
 /* Returns true if one of the netdevs in 'dp' requires a reconfiguration */
@@ -3862,6 +3954,9 @@ reload:
                                                       : PMD_CYCLES_IDLE);
         }
 
+        /* Mark PMD thread alive. */
+        ka_mark_pmd_thread_alive();
+
         if (lc++ > 1024) {
             bool reload;
 
@@ -3895,6 +3990,10 @@ reload:
     }
 
     emc_cache_uninit(&pmd->flow_cache);
+
+    int tid = ka_get_pmd_tid(pmd->core_id);
+    ka_unregister_thread(tid, true);
+
     free(poll_list);
     pmd_free_cached_ports(pmd);
     return NULL;
diff --git a/lib/keepalive.c b/lib/keepalive.c
index 0087e5c..eeaa25a 100644
--- a/lib/keepalive.c
+++ b/lib/keepalive.c
@@ -24,6 +24,7 @@
 #include "keepalive.h"
 #include "lib/vswitch-idl.h"
 #include "openvswitch/vlog.h"
+#include "process.h"
 
 VLOG_DEFINE_THIS_MODULE(keepalive);
 
@@ -77,21 +78,85 @@ ka_store_pmd_id(unsigned core_idx)
 
 /* Register thread to KA framework. */
 void
-ka_register_pmd_thread(int tid OVS_UNUSED, bool thread_is_pmd OVS_UNUSED,
-                       unsigned core_id)
+ka_register_thread(int tid, bool thread_is_pmd)
 {
     if (ka_is_enabled()) {
-        dpdk_register_pmd_core(core_id);
+        struct ka_process_info *ka_pinfo;
+        int core_num = -1;
+        char proc_name[18] = "UNDEFINED";
+
+        struct process_info pinfo;
+        int success = get_process_info(tid, &pinfo);
+        if (success) {
+            core_num = pinfo.core_id;
+            ovs_strlcpy(proc_name, pinfo.name, sizeof proc_name);
+        }
+
+        ovs_assert(core_num >= 0);
+
+        uint32_t hash = hash_int(tid, 0);
+        ovs_mutex_lock(&ka_info->proclist_mutex);
+        HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node, hash, &ka_info->process_list) {
+            /* PMD thread is already registered. */
+            if (ka_pinfo->tid == tid) {
+                goto out;
+            }
+        }
+
+        ka_pinfo = xmalloc(sizeof *ka_pinfo);
+        ka_pinfo->tid = tid;
+        ka_pinfo->heartbeats = true;
+        ka_pinfo->core_id = core_num;
+        ovs_strlcpy(ka_pinfo->name, proc_name, sizeof ka_pinfo->name);
+
+        hmap_insert(&ka_info->process_list, &ka_pinfo->node, hash);
+
+        if (thread_is_pmd) {
+            dpdk_register_pmd_core(core_num);
+            ka_info->pmd_cnt++;  /* Increment PMD count. */
+        } else {
+            ka_info->nonpmd_cnt++;  /* Increment non-pmd thread count. */
+        }
+out:
+        ovs_mutex_unlock(&ka_info->proclist_mutex);
     }
 }
 
 /* Unregister thread from KA framework. */
 void
-ka_unregister_pmd_thread(int tid OVS_UNUSED, bool thread_is_pmd OVS_UNUSED,
-                         unsigned core_id)
+ka_unregister_thread(int tid, bool thread_is_pmd)
 {
     if (ka_is_enabled()) {
-        dpdk_unregister_pmd_core(core_id);
+        struct ka_process_info *ka_pinfo;
+
+        int core_num = -1;
+        struct process_info pinfo;
+        if (get_process_info(tid, &pinfo)) {
+            core_num = pinfo.core_id;
+        }
+
+        ovs_assert(core_num >= 0);
+
+        ovs_mutex_lock(&ka_info->proclist_mutex);
+        HMAP_FOR_EACH_WITH_HASH (ka_pinfo, node, hash_int(tid, 0),
+                                 &ka_info->process_list) {
+            /* If PMD thread is registered, remove it from the list */
+            if (ka_pinfo->tid == tid) {
+                hmap_remove(&ka_info->process_list, &ka_pinfo->node);
+                free(ka_pinfo);
+
+                if (thread_is_pmd) {
+                    dpdk_unregister_pmd_core(core_num);
+                    ka_info->pmd_cnt--;  /* Decrement PMD count. */
+                } else {
+                    ka_info->nonpmd_cnt--;  /* Decrement non-pmd thread cnt. */
+                }
+
+                break;
+            }
+        }
+
+        ovs_mutex_unlock(&ka_info->proclist_mutex);
     }
 }
 
@@ -147,6 +212,42 @@ get_ka_timer_interval(const struct smap *ovs_other_config OVS_UNUSED)
     return ka_interval;
 }
 
+/* This gets called during every datapath reconfiguration.
+ * Don't allocate memory every time, allocate memory only if
+ * this is invoked first time and use realloc all other times.
+ */
+int
+ka_alloc_portstats(unsigned core_id, int nports)
+{
+    struct poll_port_stats *port_stats =
+        ka_info->ext_stats[core_id].port_stats;
+
+    if (!port_stats) {
+        port_stats = xmalloc(nports * sizeof *port_stats);
+        if (!port_stats) {
+            return ENOMEM;
+        }
+    } else {
+        port_stats = xrealloc(port_stats, nports * sizeof *port_stats);
+    }
+
+    ka_info->ext_stats[core_id].port_stats = port_stats;
+    ka_info->ext_stats[core_id].num_poll_ports = nports;
+    return 0;
+}
+
+void
+ka_destroy_portstats(void)
+{
+    struct poll_port_stats *port_stats;
+    for (int coreid = 0; coreid < KA_DP_MAXCORES; coreid++) {
+        port_stats = ka_info->ext_stats[coreid].port_stats;
+        if (port_stats) {
+            free(port_stats);
+        }
+    }
+}
+
 static struct keepalive_info *
 keepalive_info_create(void)
 {
@@ -205,17 +306,20 @@ ka_init(const struct smap *ovs_other_config)
 void
 ka_destroy(void)
 {
-    if (ka_info) {
-        struct ka_process_info *pinfo;
-        ovs_mutex_lock(&ka_info->proclist_mutex);
-        HMAP_FOR_EACH_POP (pinfo, node, &ka_info->process_list) {
-            free(pinfo);
-        }
-        ovs_mutex_unlock(&ka_info->proclist_mutex);
-        hmap_destroy(&ka_info->process_list);
-
-        ovs_mutex_destroy(&ka_info->proclist_mutex);
+    if (!ka_info) {
+        return;
+    }
 
-        free(ka_info);
+    struct ka_process_info *pinfo;
+    ovs_mutex_lock(&ka_info->proclist_mutex);
+    HMAP_FOR_EACH_POP (pinfo, node, &ka_info->process_list) {
+        free(pinfo);
     }
+    ovs_mutex_unlock(&ka_info->proclist_mutex);
+    hmap_destroy(&ka_info->process_list);
+
+    ovs_mutex_destroy(&ka_info->proclist_mutex);
+    ka_destroy_portstats();
+
+    free(ka_info);
 }
diff --git a/lib/keepalive.h b/lib/keepalive.h
index f74b23a..605e41c 100644
--- a/lib/keepalive.h
+++ b/lib/keepalive.h
@@ -41,13 +41,25 @@ enum keepalive_state {
 };
 
 struct ka_process_info {
+    char name[16];
     int tid;
     int core_id;
+    bool heartbeats;
     enum keepalive_state core_state;
     uint64_t core_last_seen_times;
     struct hmap_node node;
 };
 
+struct poll_port_stats {
+    const char *port;
+    int qid;
+};
+
+struct pmd_extended_stats {
+    struct poll_port_stats *port_stats;
+    int num_poll_ports;
+};
+
 struct keepalive_info {
     /* Mutex for 'process_list'. */
     struct ovs_mutex proclist_mutex;
@@ -55,9 +67,16 @@ struct keepalive_info {
     /* List of process/threads monitored by KA framework. */
     struct hmap process_list OVS_GUARDED;
 
+    /* count of threads registered to KA framework. */
+    uint32_t pmd_cnt;
+    uint32_t nonpmd_cnt;
+
     /* Store Datapath threads 'tid'.
      * In case of DPDK there can be max of KA_DP_MAXCORES threads. */
     pid_t thread_id[KA_DP_MAXCORES];
+
+    /* Additional statistics to monitor health. */
+    struct pmd_extended_stats ext_stats[KA_DP_MAXCORES];
 };
 
 enum keepalive_status {
@@ -71,13 +90,15 @@ void ka_set_pmd_state_ts(unsigned, enum keepalive_state, uint64_t);
 
 int ka_get_pmd_tid(unsigned core);
 bool ka_is_enabled(void);
-void ka_register_pmd_thread(int, bool, unsigned);
-void ka_unregister_pmd_thread(int, bool, unsigned);
+void ka_register_thread(int, bool);
+void ka_unregister_thread(int, bool);
 void ka_mark_pmd_thread_alive(void);
 void ka_mark_pmd_thread_sleep(void);
 
 void ka_store_pmd_id(unsigned core);
 uint32_t get_ka_interval(void);
 int get_ka_init_status(void);
+int ka_alloc_portstats(unsigned, int);
+void ka_destroy_portstats(void);
 
 #endif /* keepalive.h */
diff --git a/lib/util.c b/lib/util.c
index 36e3731..2477ba6 100644
--- a/lib/util.c
+++ b/lib/util.c
@@ -2197,6 +2197,16 @@ xsleep(unsigned int seconds)
     ovsrcu_quiesce_end();
 }
 
+void
+xusleep(unsigned int microseconds)
+{
+    ovsrcu_quiesce_start();
+#ifdef __linux__
+    usleep(microseconds);
+#endif
+    ovsrcu_quiesce_end();
+}
+
 /* Determine whether standard output is a tty or not. This is useful to decide
  * whether to use color output or not when --color option for utilities is set
  * to `auto`.
diff --git a/lib/util.h b/lib/util.h
index 764e0a0..00bbea4 100644
--- a/lib/util.h
+++ b/lib/util.h
@@ -489,6 +489,7 @@ ovs_u128_and(const ovs_u128 a, const ovs_u128 b)
 }
 
 void xsleep(unsigned int seconds);
+void xusleep(unsigned int microseconds);
 
 bool is_stdout_a_tty(void);
 
-- 
2.4.11



More information about the dev mailing list