[ovs-dev] [PATCH V2 6/10] ofproto-dpif-monitor: Run ofproto-dpif-monitor in a thread.

Alex Wang alexw at nicira.com
Wed Oct 9 01:17:22 UTC 2013


This commit moves the ofproto-dpif-monitor module into a
dedicated thread.  This relieves the main thread of the burden
of having to wake up very frequently for periodic interface
monitoring (bfd, cfm).  It also greatly increases the number
of bfd/cfm sessions that can be supported by ovs.

Signed-off-by: Alex Wang <alexw at nicira.com>

---

v1 -> v2:
- rebase to master.

---
 ofproto/ofproto-dpif-monitor.c |  149 ++++++++++++++++++++++++++++++----------
 ofproto/ofproto-dpif-monitor.h |    4 --
 ofproto/ofproto-dpif.c         |    5 --
 tests/bfd.at                   |   34 ++++-----
 tests/ofproto-dpif.at          |   70 +++++++++++++++++++
 5 files changed, 196 insertions(+), 66 deletions(-)

diff --git a/ofproto/ofproto-dpif-monitor.c b/ofproto/ofproto-dpif-monitor.c
index a0c3843..0eb8529 100644
--- a/ofproto/ofproto-dpif-monitor.c
+++ b/ofproto/ofproto-dpif-monitor.c
@@ -23,11 +23,17 @@
 #include "cfm.h"
 #include "hash.h"
 #include "hmap.h"
+#include "latch.h"
 #include "ofpbuf.h"
 #include "ofproto-dpif.h"
+#include "ovs-thread.h"
+#include "poll-loop.h"
+#include "seq.h"
 #include "util.h"
 #include "vlog.h"
 
+VLOG_DEFINE_THIS_MODULE(ofproto_dpif_monitor);
+
 /* Monitored port.  It owns references to ofport, bfd, cfm structs. */
 struct mport {
     struct hmap_node hmap_node;       /* In monitor_hmap. */
@@ -41,8 +47,24 @@ struct mport {
 /* hmap that contains "struct mport"s. */
 static struct hmap monitor_hmap = HMAP_INITIALIZER(&monitor_hmap);
 
+/* Latch that controls the exit of the monitor thread. */
+static struct latch monitor_exit_latch;
+
+/* NULL if monitor thread is not started. */
+static pthread_t *monitor_tid;
+
+/* "struct seq" to wakeup the monitor thread. */
+static struct seq *monitor_seq;
+
 static struct ovs_rwlock monitor_rwlock = OVS_RWLOCK_INITIALIZER;
 
+static void monitor_check_run(void) OVS_EXCLUDED(monitor_rwlock);
+static void monitor_init(void);
+static void *monitor_main(void *);
+static void monitor_run(void);
+static void monitor_start(void) OVS_EXCLUDED(monitor_rwlock);
+static void monitor_terminate(void) OVS_EXCLUDED(monitor_rwlock);
+
 static void mport_register(const struct ofport_dpif *, struct bfd *,
                            struct cfm *, uint8_t[ETH_ADDR_LEN])
     OVS_REQ_WRLOCK(monitor_rwlock);
@@ -118,29 +140,49 @@ mport_update(struct mport *mport, struct bfd *bfd, struct cfm *cfm,
     if (hw_addr && memcmp(mport->hw_addr, hw_addr, ETH_ADDR_LEN)) {
         memcpy(mport->hw_addr, hw_addr, ETH_ADDR_LEN);
     }
+    /* If bfd/cfm is added or reconfigured, wakes up the monitor thread. */
+    if (mport->bfd || mport->cfm) {
+        seq_change(monitor_seq);
+    }
 }
 
 
-/* Creates the mport in monitor module if either bfd or cfm
- * is configured.  Otherwise, deletes the mport. */
-void
-ofproto_dpif_monitor_port_update(const struct ofport_dpif *ofport,
-                                 struct bfd *bfd, struct cfm *cfm,
-                                 uint8_t hw_addr[ETH_ADDR_LEN])
+/* Initializes the global variables.  This will only run once. */
+static void
+monitor_init(void)
 {
-    ovs_rwlock_wrlock(&monitor_rwlock);
-    if (!cfm && !bfd) {
-        mport_unregister(ofport);
-    } else {
-        mport_register(ofport, bfd, cfm, hw_addr);
+    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
+
+    if (ovsthread_once_start(&once)) {
+        hmap_init(&monitor_hmap);
+        monitor_seq = seq_create();
+        ovsthread_once_done(&once);
     }
-    ovs_rwlock_unlock(&monitor_rwlock);
+}
+
+/* The 'main' function for the monitor thread. */
+static void *
+monitor_main(void * args OVS_UNUSED)
+{
+    set_subprogram_name("monitor");
+    VLOG_INFO("monitor thread created");
+    while (!latch_is_set(&monitor_exit_latch)) {
+        uint64_t seq = seq_read(monitor_seq);
+
+        monitor_run();
+        latch_wait(&monitor_exit_latch);
+        seq_wait(monitor_seq, seq);
+        poll_block();
+    }
+    VLOG_INFO("monitor thread terminated");
+    return NULL;
 }
 
 /* Checks the sending of control packets on all mports.  Sends the control
- * packets if needed. */
-void
-ofproto_dpif_monitor_run_fast(void)
+ * packets if needed.  Executes bfd and cfm periodic functions (run, wait)
+ * on all mports. */
+static void
+monitor_run(void)
 {
     struct mport *mport;
     static uint32_t buf_stub[128 / 4];
@@ -159,41 +201,74 @@ ofproto_dpif_monitor_run_fast(void)
             ofproto_dpif_send_packet(mport->ofport, &packet);
         }
     }
-    ovs_rwlock_unlock(&monitor_rwlock);
-}
-
-/* Executes bfd_run(), cfm_run() on all mports. */
-void
-ofproto_dpif_monitor_run(void)
-{
-    struct mport *mport;
-
-    ovs_rwlock_rdlock(&monitor_rwlock);
     HMAP_FOR_EACH (mport, hmap_node, &monitor_hmap) {
         if (mport->cfm) {
             cfm_run(mport->cfm);
+            cfm_wait(mport->cfm);
         }
         if (mport->bfd) {
             bfd_run(mport->bfd);
+            bfd_wait(mport->bfd);
         }
     }
     ovs_rwlock_unlock(&monitor_rwlock);
 }
 
-/* Executes the bfd_wait() and cfm_wait() functions on all mports. */
-void
-ofproto_dpif_monitor_wait(void)
+/* Checks if the monitor thread has been started.  If it hasn't,
+ * and the hmap is not empty, starts it.  If it has
+ * and the hmap is empty, terminates it.
+ * This function can only be called in the main thread.  */
+static void
+monitor_check_run(void) OVS_EXCLUDED(monitor_rwlock)
 {
-    struct mport *mport;
+    if (!monitor_tid && !hmap_is_empty(&monitor_hmap))  {
+        monitor_start();
+    } else if (monitor_tid && hmap_is_empty(&monitor_hmap))  {
+        monitor_terminate();
+    }
+}
 
-    ovs_rwlock_rdlock(&monitor_rwlock);
-    HMAP_FOR_EACH (mport, hmap_node, &monitor_hmap) {
-        if (mport->cfm) {
-            cfm_wait(mport->cfm);
-        }
-        if (mport->bfd) {
-            bfd_wait(mport->bfd);
-        }
+/* Starts the monitor thread. */
+static void
+monitor_start(void) OVS_EXCLUDED(monitor_rwlock)
+{
+    /* Allocates the zero-initialized tid. */
+    monitor_tid = xzalloc(sizeof *monitor_tid);
+    latch_init(&monitor_exit_latch);
+
+    xpthread_create(monitor_tid, NULL, monitor_main, NULL);
+}
+
+/* Terminates the monitor thread. */
+static void
+monitor_terminate(void) OVS_EXCLUDED(monitor_rwlock)
+{
+    latch_set(&monitor_exit_latch);
+    xpthread_join(*monitor_tid, NULL);
+
+    /* Destroys the latch and frees the tid. */
+    latch_destroy(&monitor_exit_latch);
+    free(monitor_tid);
+    monitor_tid = NULL;
+}
+
+
+/* Creates the mport in monitor module if either bfd or cfm
+ * is configured.  Otherwise, deletes the mport.
+ * Also checks whether the monitor thread should be started
+ * or terminated. */
+void
+ofproto_dpif_monitor_port_update(const struct ofport_dpif *ofport,
+                                 struct bfd *bfd, struct cfm *cfm,
+                                 uint8_t hw_addr[ETH_ADDR_LEN])
+{
+    monitor_init();
+    ovs_rwlock_wrlock(&monitor_rwlock);
+    if (!cfm && !bfd) {
+        mport_unregister(ofport);
+    } else {
+        mport_register(ofport, bfd, cfm, hw_addr);
     }
     ovs_rwlock_unlock(&monitor_rwlock);
+    monitor_check_run();
 }
diff --git a/ofproto/ofproto-dpif-monitor.h b/ofproto/ofproto-dpif-monitor.h
index 8e26814..f914fbe 100644
--- a/ofproto/ofproto-dpif-monitor.h
+++ b/ofproto/ofproto-dpif-monitor.h
@@ -23,10 +23,6 @@ struct bfd;
 struct cfm;
 struct ofport_dpif;
 
-void ofproto_dpif_monitor_run(void);
-void ofproto_dpif_monitor_run_fast(void);
-void ofproto_dpif_monitor_wait(void);
-
 void ofproto_dpif_monitor_port_update(const struct ofport_dpif *,
                                       struct bfd *, struct cfm *,
                                       uint8_t[OFP_ETH_ALEN]);
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index 7eff326..66c41bc 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -1445,7 +1445,6 @@ run_fast(struct ofproto *ofproto_)
         free(pin);
     }
 
-    ofproto_dpif_monitor_run_fast();
     return 0;
 }
 
@@ -1487,9 +1486,6 @@ run(struct ofproto *ofproto_)
         dpif_ipfix_run(ofproto->ipfix);
     }
 
-    ofproto_dpif_monitor_run_fast();
-    ofproto_dpif_monitor_run();
-
     HMAP_FOR_EACH (ofport, up.hmap_node, &ofproto->up.ports) {
         port_run(ofport);
     }
@@ -1546,7 +1542,6 @@ wait(struct ofproto *ofproto_)
     if (ofproto->ipfix) {
         dpif_ipfix_wait(ofproto->ipfix);
     }
-    ofproto_dpif_monitor_wait();
     HMAP_FOR_EACH (bundle, hmap_node, &ofproto->bundles) {
         bundle_wait(bundle);
     }
diff --git a/tests/bfd.at b/tests/bfd.at
index 3154909..cff0fce 100644
--- a/tests/bfd.at
+++ b/tests/bfd.at
@@ -255,32 +255,26 @@ OVS_VSWITCHD_START([add-br br1 -- set bridge br1 datapath-type=dummy -- \
                     options:peer=p0 ofport_request=2 -- \
                     add-port br0 p0 -- set Interface p0 type=patch \
                     options:peer=p1 ofport_request=1 -- \
-                    set Interface p0 bfd:enable=true bfd:min_tx=300 bfd:min_rx=300 bfd:decay_min_rx=3000 -- \
+                    set Interface p0 bfd:enable=true bfd:min_tx=300 bfd:min_rx=300 -- \
                     set Interface p1 bfd:enable=true bfd:min_tx=500 bfd:min_rx=500])
 
 ovs-appctl time/stop
-for i in `seq 0 1`; do ovs-appctl time/warp 500; done
-
-# figuring out which port initiates the bfd session is important,
-# since this whole unit test is based on exact timing sequence.
-# for example, if p0 starts the bfd session, the p0 should have gone
-# [up] now, and it will decay after 3000ms. if p1 starts the bfd
-# session, we should wait for another 1000ms for p0 to go [up], and
-# then 3000ms for it to decay.
-
-# check which port sends the first bfd control packet.
-if [ ovs-appctl bfd/show p0 | grep "Remote Session State: init" ]
-then
-# if p0 sends first, it should have gone up already.
-    BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], [init], [No Diagnostic])
-else
-# if p1 sends first, wait 1000ms for p0 to go up.
-    BFD_CHECK([p0], [false], [false], [none], [init], [No Diagnostic], [none], [down], [No Diagnostic])
-    for i in `seq 0 1`; do ovs-appctl time/warp 500; done
-fi
 
+# wait for a while to stabilize everything.
+for i in `seq 0 9`; do ovs-appctl time/warp 500; done
+BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic])
+BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic])
+BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
+BFD_CHECK_RX([p0], [500ms], [300ms], [500ms])
 
 # Test-1 BFD decay: decay to decay_min_rx
+AT_CHECK([ovs-vsctl set interface p0 bfd:decay_min_rx=3000])
+# set the bfd:decay_min_rx of p0 to 3000ms.  there should be a poll sequence.
+BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic])
+BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic])
+BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
+BFD_CHECK_RX([p0], [500ms], [300ms], [500ms])
+
 # bfd:decay_min_rx is set to 3000ms after the local state of p0 goes up,
 # so for the first 2500ms, there should be no change.
 for i in `seq 0 4`; do ovs-appctl time/warp 500; done
diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at
index 87e3d0e..ab61ad1 100644
--- a/tests/ofproto-dpif.at
+++ b/tests/ofproto-dpif.at
@@ -2841,6 +2841,76 @@ AT_CHECK([ovs-appctl bond/show | sed -n '/^.*may_enable:.*/p'], [0], [dnl
 
 OVS_VSWITCHD_STOP
 AT_CLEANUP
+
+AT_SETUP([ofproto-dpif - ofproto-dpif-monitor 1])
+OVS_VSWITCHD_START([add-port br0 p0 -- set interface p0 type=gre options:remote_ip=1.2.3.4])
+
+# enable bfd on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=true])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* created\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread created
+])
+# disable bfd on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=false])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread terminated
+])
+AT_CHECK([cat ovs-vswitchd.log | sed -e '/^.*ofproto_dpif_monitor.*$/d' > ovs-vswitchd.log])
+
+# enable cfm on p0.
+AT_CHECK([ovs-vsctl set interface p0 cfm_mpid=10])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* created\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread created
+])
+# disable cfm on p0.
+AT_CHECK([ovs-vsctl remove interface p0 cfm_mpid 10])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread terminated
+])
+AT_CHECK([cat ovs-vswitchd.log | sed -e '/^.*ofproto_dpif_monitor.*$/d' > ovs-vswitchd.log])
+
+# enable both bfd and cfm on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=true cfm_mpid=10])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* created\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread created
+])
+# disable bfd on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=false])
+# check log, there should be no log of the thread being terminated.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+])
+# reenable bfd on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=true])
+# check log, there should still be only one log of thread created.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* created\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread created
+])
+# disable bfd and cfm together.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=false -- remove interface p0 cfm_mpid 10])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(monitor)|INFO|\(.* terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+monitor thread terminated
+])
+
+OVS_VSWITCHD_STOP
+AT_CLEANUP
+
+# this test guards against deadlock between the main thread and the monitor thread.
+AT_SETUP([ofproto-dpif - ofproto-dpif-monitor 2])
+OVS_VSWITCHD_START
+
+for i in `seq 1 199`
+do
+    AT_CHECK([ovs-vsctl add-port br0 p$i -- set interface p$i type=gre options:remote_ip=1.2.3.4 options:key=$i bfd:enable=true])
+done
+
+OVS_VSWITCHD_STOP
+AT_CLEANUP
 
 AT_BANNER([ofproto-dpif - flow translation resource limits])
 
-- 
1.7.9.5




More information about the dev mailing list