[ovs-dev] [PATCH V2 3/3] ofproto-dpif-monitor: Move ofproto-dpif-monitor to a single thread.
Alex Wang
alexw at nicira.com
Fri Sep 27 22:41:18 UTC 2013
This commit moves the ofproto-dpif-monitor module into a
dedicated thread.
Signed-off-by: Alex Wang <alexw at nicira.com>
---
v1 -> v2:
- re-adjust the code base on changes made to previous patches.
---
lib/bfd.c | 4 +-
lib/timeval.c | 20 ++++++
lib/timeval.h | 4 ++
ofproto/ofproto-dpif-monitor.c | 149 +++++++++++++++++++++++++++++++---------
ofproto/ofproto-dpif-monitor.h | 7 +-
ofproto/ofproto-dpif.c | 7 +-
tests/bfd.at | 33 +++++----
tests/ofproto-dpif.at | 58 ++++++++++++++++
8 files changed, 225 insertions(+), 57 deletions(-)
diff --git a/lib/bfd.c b/lib/bfd.c
index 6c9e920..c106983 100644
--- a/lib/bfd.c
+++ b/lib/bfd.c
@@ -721,8 +721,10 @@ bfd_process_packet(struct bfd *bfd, const struct flow *flow,
rmt_min_rx = MAX(ntohl(msg->min_rx) / 1000, 1);
if (bfd->rmt_min_rx != rmt_min_rx) {
bfd->rmt_min_rx = rmt_min_rx;
- bfd_set_next_tx(bfd);
log_msg(VLL_INFO, msg, "New remote min_rx", bfd);
+ if (bfd->last_tx) {
+ bfd_set_next_tx(bfd);
+ }
}
bfd->rmt_min_tx = MAX(ntohl(msg->min_tx) / 1000, 1);
diff --git a/lib/timeval.c b/lib/timeval.c
index 223ed30..befba1c 100644
--- a/lib/timeval.c
+++ b/lib/timeval.c
@@ -33,6 +33,7 @@
#include "hmap.h"
#include "ovs-thread.h"
#include "signals.h"
+#include "seq.h"
#include "unixctl.h"
#include "util.h"
#include "vlog.h"
@@ -57,6 +58,9 @@ static struct clock wall_clock; /* CLOCK_REALTIME. */
/* The monotonic time at which the time module was initialized. */
static long long int boot_time;
+/* Reference to the seq struct of monitor thread. */
+static struct seq *monitor_seq;
+
/* Monotonic time in milliseconds at which to die with SIGALRM (if not
* LLONG_MAX). */
static long long int deadline = LLONG_MAX;
@@ -294,6 +298,18 @@ time_boot_msec(void)
return boot_time;
}
+/* Sets monitor_seq to 'seq'. */
+void
+time_set_monitor_seq(struct seq *seq) {
+ monitor_seq = seq;
+}
+
+/* Clears monitor_seq. */
+void
+time_clear_monitor_seq(void) {
+ monitor_seq = NULL;
+}
+
void
xgettimeofday(struct timeval *tv)
{
@@ -509,6 +525,10 @@ timeval_warp_cb(struct unixctl_conn *conn,
ovs_mutex_lock(&monotonic_clock.mutex);
atomic_store(&monotonic_clock.slow_path, true);
timespec_add(&monotonic_clock.warp, &monotonic_clock.warp, &ts);
+ /* Changes 'monitor_seq' to wakeup monitor thread. */
+ if (monitor_seq) {
+ seq_change(monitor_seq);
+ }
ovs_mutex_unlock(&monotonic_clock.mutex);
unixctl_command_reply(conn, "warped");
diff --git a/lib/timeval.h b/lib/timeval.h
index 99b3af0..1273dd8 100644
--- a/lib/timeval.h
+++ b/lib/timeval.h
@@ -27,6 +27,7 @@ extern "C" {
struct ds;
struct pollfd;
+struct seq;
struct timespec;
struct timeval;
@@ -69,6 +70,9 @@ int get_cpu_usage(void);
long long int time_boot_msec(void);
+void time_set_monitor_seq(struct seq *);
+void time_clear_monitor_seq(void);
+
#ifdef __cplusplus
}
#endif
diff --git a/ofproto/ofproto-dpif-monitor.c b/ofproto/ofproto-dpif-monitor.c
index 97c6e40..5847b08 100644
--- a/ofproto/ofproto-dpif-monitor.c
+++ b/ofproto/ofproto-dpif-monitor.c
@@ -21,11 +21,18 @@
#include "cfm.h"
#include "hash.h"
#include "hmap.h"
+#include "latch.h"
#include "ofpbuf.h"
#include "ofproto-dpif.h"
+#include "ovs-thread.h"
+#include "poll-loop.h"
+#include "seq.h"
+#include "timeval.h"
#include "util.h"
#include "vlog.h"
+VLOG_DEFINE_THIS_MODULE(ofproto_dpif_monitor);
+
/* Monitored port. It contains references to ofport, bfd, cfm structs. */
struct mport {
struct hmap_node hmap_node; /* In monitor's hmap. */
@@ -36,11 +43,26 @@ struct mport {
uint8_t *hw_addr; /* Hardware address. */
};
+/* Monitor thread related variables. */
/* hmap that contains all port monitors. */
static struct hmap monitor_hmap = HMAP_INITIALIZER(&monitor_hmap);
+/* latch that controls the exit of monitor thread. */
+static struct latch monitor_exit_latch;
+/* seq that controls the wakeup of monitor thread. */
+static struct seq *monitor_wait_seq;
+static uint64_t monitor_last_seq;
+/* NULL if monitor thread is not started. */
+static pthread_t *monitor_tid;
static struct ovs_rwlock monitor_rwlock = OVS_RWLOCK_INITIALIZER;
+static void monitor_init(void);
+static void *monitor_handler(void *);
+static void monitor_run(void);
+static void monitor_wait(void);
+static void monitor_start(void);
+static void monitor_terminate(void);
+
static void mport_register(const struct ofport_dpif *, struct bfd *,
struct cfm *, uint8_t *)
OVS_REQ_WRLOCK(monitor_rwlock);
@@ -51,6 +73,20 @@ static void mport_update(struct mport *, struct bfd *, struct cfm *, uint8_t *)
static struct mport *mport_find(const struct ofport_dpif *)
OVS_REQ_WRLOCK(monitor_rwlock);
+/* Initializes the monitor struct. The init function can only be
+ * called once. */
+static void
+monitor_init(void) {
+ static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
+
+ if (ovsthread_once_start(&once)) {
+ monitor_wait_seq = seq_create();
+ monitor_last_seq = seq_read(monitor_wait_seq);
+ monitor_tid = NULL;
+ ovsthread_once_done(&once);
+ }
+}
+
/* Tries finding and returning the 'mport' from the monitor's hash map.
* If there is no such 'mport', returns NULL. */
static struct mport *
@@ -117,28 +153,27 @@ mport_update(struct mport *mport, struct bfd *bfd, struct cfm *cfm,
mport->hw_addr = hw_addr;
}
}
-
-/* Creates the mport in monitor module if either bfd or cfm
- * is configured. Otherwise, deletes the mport. */
-void
-ofproto_dpif_monitor_mport_update(const struct ofport_dpif *ofport,
- struct bfd *bfd, struct cfm *cfm,
- uint8_t *hw_addr)
+/* The handler function for the monitor thread. */
+static void *
+monitor_handler(void * args OVS_UNUSED)
{
- ovs_rwlock_wrlock(&monitor_rwlock);
- if (!cfm && !bfd) {
- mport_unregister(ofport);
- } else {
- mport_register(ofport, bfd, cfm, hw_addr);
+ set_subprogram_name("ofproto_dpif_monitor");
+ VLOG_INFO("ofproto_dpif_monitor thread created");
+ while (!latch_is_set(&monitor_exit_latch)) {
+ monitor_run();
+ monitor_wait();
+ latch_wait(&monitor_exit_latch);
+ poll_block();
}
- ovs_rwlock_unlock(&monitor_rwlock);
+ VLOG_INFO("ofproto_dpif_monitor thread terminated");
+ return NULL;
}
/* Checks the sending of control packets on all mports. Sends the control
- * packets if needed. */
-void
-ofproto_dpif_monitor_run_fast(void)
+ * packets if needed. Executes bfd_run(), cfm_run() on all mports. */
+static void
+monitor_run(void)
{
struct mport *mport;
@@ -160,18 +195,6 @@ ofproto_dpif_monitor_run_fast(void)
ofproto_dpif_send_packet(mport->ofport, &packet);
ofpbuf_uninit(&packet);
}
- }
- ovs_rwlock_unlock(&monitor_rwlock);
-}
-
-/* Executes bfd_run(), cfm_run() on all mports. */
-void
-ofproto_dpif_monitor_run(void)
-{
- struct mport *mport;
-
- ovs_rwlock_rdlock(&monitor_rwlock);
- HMAP_FOR_EACH (mport, hmap_node, &monitor_hmap) {
if (mport->cfm) {
cfm_run(mport->cfm);
}
@@ -182,9 +205,10 @@ ofproto_dpif_monitor_run(void)
ovs_rwlock_unlock(&monitor_rwlock);
}
-/* Executes the bfd_wait() and cfm_wait() functions on all mports. */
-void
-ofproto_dpif_monitor_wait(void)
+/* Executes the bfd_wait() and cfm_wait() functions on all mports,
+ * and wait on the sequence number. */
+static void
+monitor_wait(void)
{
struct mport *mport;
@@ -197,5 +221,68 @@ ofproto_dpif_monitor_wait(void)
bfd_wait(mport->bfd);
}
}
+ monitor_last_seq = seq_read(monitor_wait_seq);
+ seq_wait(monitor_wait_seq, monitor_last_seq);
ovs_rwlock_unlock(&monitor_rwlock);
}
+
+/* Starts the monitor thread. */
+static void
+monitor_start(void)
+{
+ /* zalloc tid. */
+ monitor_tid = xzalloc(sizeof *monitor_tid);
+ latch_init(&monitor_exit_latch);
+
+ xpthread_create(monitor_tid, NULL, monitor_handler, NULL);
+ time_set_monitor_seq(monitor_wait_seq);
+}
+
+/* Terminates the monitor thread. */
+static void
+monitor_terminate(void)
+{
+ time_clear_monitor_seq();
+ latch_set(&monitor_exit_latch);
+ xpthread_join(*monitor_tid, NULL);
+
+ /* frees the tid. */
+ latch_destroy(&monitor_exit_latch);
+ free(monitor_tid);
+ monitor_tid = NULL;
+}
+
+
+/* Creates the mport in monitor module if either bfd or cfm
+ * is configured. Otherwise, deletes the mport. */
+void
+ofproto_dpif_monitor_mport_update(const struct ofport_dpif *ofport,
+ struct bfd *bfd, struct cfm *cfm,
+ uint8_t *hw_addr)
+{
+ monitor_init();
+
+ ovs_rwlock_wrlock(&monitor_rwlock);
+ if (!cfm && !bfd) {
+ mport_unregister(ofport);
+ } else {
+ mport_register(ofport, bfd, cfm, hw_addr);
+ }
+ ovs_rwlock_unlock(&monitor_rwlock);
+}
+
+/* Checks if the monitor thread has been started. If it hasn't,
+ * and the hmap is not empty, starts it. If it has
+ * and the hmap is empty, terminates it.
+ * This function can only be called by the main thread. */
+void
+ofproto_dpif_monitor_check(void)
+{
+ monitor_init();
+
+ if (!monitor_tid && !hmap_is_empty(&monitor_hmap)) {
+ monitor_start();
+ } else if (monitor_tid && hmap_is_empty(&monitor_hmap)) {
+ monitor_terminate();
+ }
+}
diff --git a/ofproto/ofproto-dpif-monitor.h b/ofproto/ofproto-dpif-monitor.h
index 5a03883..97e7663 100644
--- a/ofproto/ofproto-dpif-monitor.h
+++ b/ofproto/ofproto-dpif-monitor.h
@@ -17,16 +17,11 @@
#include <stdint.h>
-#include "compiler.h"
-
struct bfd;
struct cfm;
struct ofport_dpif;
-void ofproto_dpif_monitor_run(void);
-void ofproto_dpif_monitor_run_fast(void);
-void ofproto_dpif_monitor_wait(void);
-
+void ofproto_dpif_monitor_check(void);
void ofproto_dpif_monitor_mport_update(const struct ofport_dpif *,
struct bfd *, struct cfm *,
uint8_t *);
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index b90815a..3b6e602 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -835,6 +835,8 @@ type_run(const char *type)
udpif_revalidate(backer->udpif);
}
+ ofproto_dpif_monitor_check();
+
if (!backer->recv_set_enable) {
/* Wake up before a max of 1000ms. */
timer_set_duration(&backer->next_expiration, 1000);
@@ -1467,7 +1469,6 @@ run_fast(struct ofproto *ofproto_)
free(pin);
}
- ofproto_dpif_monitor_run_fast();
return 0;
}
@@ -1509,9 +1510,6 @@ run(struct ofproto *ofproto_)
dpif_ipfix_run(ofproto->ipfix);
}
- ofproto_dpif_monitor_run_fast();
- ofproto_dpif_monitor_run();
-
HMAP_FOR_EACH (ofport, up.hmap_node, &ofproto->up.ports) {
port_run(ofport);
}
@@ -1568,7 +1566,6 @@ wait(struct ofproto *ofproto_)
if (ofproto->ipfix) {
dpif_ipfix_wait(ofproto->ipfix);
}
- ofproto_dpif_monitor_wait();
HMAP_FOR_EACH (bundle, hmap_node, &ofproto->bundles) {
bundle_wait(bundle);
}
diff --git a/tests/bfd.at b/tests/bfd.at
index cc6755c..b83440a 100644
--- a/tests/bfd.at
+++ b/tests/bfd.at
@@ -271,6 +271,7 @@ else
for i in `seq 0 1`; do ovs-appctl time/warp 500; done
fi
+
# Test-1 BFD decay: decay to decay_min_rx
# bfd:decay_min_rx is set to 3000ms after the local state of p0 goes up,
# so for the first 2500ms, there should be no change.
@@ -345,15 +346,16 @@ BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [none], [up], [N
# change decay_min_rx to 1000ms.
# for decay_min_rx < 2000ms, the decay detection time is set to 2000ms.
# this should firstly reset the min_rx and start poll sequence.
-AT_CHECK([ovs-vsctl set Interface p0 bfd:decay_min_rx=1000])
+AT_CHECK([ovs-vsctl set Interface p0 bfd:decay_min_rx=1000])o
+ovs-appctl time/warp 500
BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic])
BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic])
BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
BFD_CHECK_RX([p0], [500ms], [300ms], [500ms])
-# for the following 1500ms, there should be no decay,
+# for the following 1000ms, there should be no decay,
# since the decay_detect_time is set to 2000ms.
-for i in `seq 0 2`
+for i in `seq 0 1`
do
ovs-appctl time/warp 500
BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic])
@@ -376,21 +378,19 @@ for i in `seq 0 4`; do ovs-appctl time/warp 500; done
# Test-4 BFD decay: set min_rx to 800ms.
# this should firstly reset the min_rx and then re-decay to 1000ms.
AT_CHECK([ovs-vsctl set Interface p0 bfd:min_rx=800])
+ovs-appctl time/warp 800
BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic])
BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic])
BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
BFD_CHECK_RX([p0], [800ms], [800ms], [500ms])
-# for the following 1600ms, there should be no decay,
+# for the following 800ms, there should be no decay,
# since the decay detection time is set to 2000ms.
-for i in `seq 0 1`
-do
- ovs-appctl time/warp 800
- BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic])
- BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic])
- BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
- BFD_CHECK_RX([p0], [800ms], [800ms], [500ms])
-done
+ovs-appctl time/warp 800
+BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic])
+BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic])
+BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
+BFD_CHECK_RX([p0], [800ms], [800ms], [500ms])
ovs-appctl time/warp 400
# at 2000ms, decay should happen and there should be the poll sequence flags.
@@ -405,6 +405,7 @@ for i in `seq 0 4`; do ovs-appctl time/warp 500; done
# Test-5 BFD decay: set min_rx to 300ms and decay_min_rx to 5000ms together.
AT_CHECK([ovs-vsctl set Interface p0 bfd:min_rx=300 bfd:decay_min_rx=5000])
+ovs-appctl time/warp 500
BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic])
BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic])
BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
@@ -412,9 +413,9 @@ BFD_CHECK_RX([p0], [500ms], [300ms], [500ms])
# for decay_min_rx > 2000ms, the decay detection time is set to
# decay_min_rx (5000ms).
-# for the following 4500ms, there should be no decay,
+# for the following 4000ms, there should be no decay,
# since the decay detection time is set to 5000ms.
-for i in `seq 0 8`
+for i in `seq 1 8`
do
ovs-appctl time/warp 500
BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [none], [up], [No Diagnostic])
@@ -436,6 +437,7 @@ for i in `seq 0 9`; do ovs-appctl time/warp 500; done
# Test-6 BFD decay: set decay_min_rx to 0 to disable bfd decay.
AT_CHECK([ovs-vsctl set Interface p0 bfd:decay_min_rx=0])
+ovs-appctl time/warp 500
# min_rx is reset, and there should be the poll sequence flags.
BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic])
BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic])
@@ -472,6 +474,7 @@ do
done
# reset the p1's min_tx to 500ms.
AT_CHECK([ovs-vsctl set Interface p1 bfd:min_tx=500])
+ovs-appctl time/warp 500
# check the poll sequence. since p0 has been in decay, now the RX will show 3000ms.
BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic])
BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic])
@@ -686,6 +689,8 @@ done
# reconfigure the decay_min_rx to 1000ms. check the poll sequence.
AT_CHECK([ovs-vsctl set interface p0 bfd:decay_min_rx=1000])
+# advance the clock by tiny little bit to wakeup the monitor thread.
+ovs-appctl time/warp 1
BFD_CHECK([p0], [true], [false], [none], [up], [No Diagnostic], [final], [up], [No Diagnostic])
BFD_CHECK([p1], [true], [false], [none], [up], [No Diagnostic], [poll], [up], [No Diagnostic])
BFD_CHECK_TX([p0], [500ms], [300ms], [500ms])
diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at
index f67c3ab..c2c0b24 100644
--- a/tests/ofproto-dpif.at
+++ b/tests/ofproto-dpif.at
@@ -2816,3 +2816,61 @@ AT_CHECK([ovs-appctl bond/show | sed -n '/^.*may_enable:.*/p'], [0], [dnl
OVS_VSWITCHD_STOP
AT_CLEANUP
+
+AT_SETUP([ofproto-dpif - ofproto-dpif-monitor])
+OVS_VSWITCHD_START([add-port br0 p0 -- set interface p0 type=gre options:remote_ip=1.2.3.4])
+
+# enable bfd on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=true])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* created\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+ofproto_dpif_monitor thread created
+])
+# disable bfd on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=false])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+ofproto_dpif_monitor thread terminated
+])
+AT_CHECK([cat ovs-vswitchd.log | sed -e '/^.*ofproto_dpif_monitor.*$/d' > ovs-vswitchd.log])
+
+# enable cfm on p0.
+AT_CHECK([ovs-vsctl set interface p0 cfm_mpid=10])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* created\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+ofproto_dpif_monitor thread created
+])
+# disable cfm on p0.
+AT_CHECK([ovs-vsctl remove interface p0 cfm_mpid 10])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+ofproto_dpif_monitor thread terminated
+])
+AT_CHECK([cat ovs-vswitchd.log | sed -e '/^.*ofproto_dpif_monitor.*$/d' > ovs-vswitchd.log])
+
+# enable both bfd and cfm on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=true cfm_mpid=10])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* created\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+ofproto_dpif_monitor thread created
+])
+# disable bfd on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=false])
+# check log, there should not be the log of thread terminated.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+])
+# reenable bfd on p0.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=true])
+# check log, should still be on log of thread created.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* created\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+ofproto_dpif_monitor thread created
+])
+# disable bfd and cfm together.
+AT_CHECK([ovs-vsctl set interface p0 bfd:enable=false -- remove interface p0 cfm_mpid 10])
+# check log.
+AT_CHECK([sed -n "s/^.*|ofproto_dpif_monitor(ofproto_dpif_monitor)|INFO|\(.* terminated\)$/\1/p" ovs-vswitchd.log], [0], [dnl
+ofproto_dpif_monitor thread terminated
+])
+
+OVS_VSWITCHD_STOP
+AT_CLEANUP
\ No newline at end of file
--
1.7.9.5
More information about the dev
mailing list