[ovs-dev] [PATCH 2/3] [RFC] ofproto-dpif-monitor: Make ofproto-dpif-monitor thread safe.
Alex Wang
alexw at nicira.com
Mon Sep 23 18:15:08 UTC 2013
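This commit makes the ofproto-dpif-monitor module thread-safe by
protecting the monitor's port map with a read-write lock.
Also, to guarantee thread safety, the body of ofproto_dpif_send_packet()
is moved into a new xlate_send_packet() function in the
ofproto-dpif-xlate module, where it runs under xlate_rwlock;
ofproto_dpif_send_packet() remains as a thin wrapper that updates the
tx statistics.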
Signed-off-by: Alex Wang <alexw at nicira.com>
---
ofproto/ofproto-dpif-monitor.c | 106 ++++++++++++++++++++++++++++++----------
ofproto/ofproto-dpif-xlate.c | 52 ++++++++++++++++++++
ofproto/ofproto-dpif-xlate.h | 1 +
ofproto/ofproto-dpif.c | 44 ++---------------
4 files changed, 136 insertions(+), 67 deletions(-)
diff --git a/ofproto/ofproto-dpif-monitor.c b/ofproto/ofproto-dpif-monitor.c
index 8f18440..ce7e657 100644
--- a/ofproto/ofproto-dpif-monitor.c
+++ b/ofproto/ofproto-dpif-monitor.c
@@ -50,23 +50,28 @@ struct mport {
static struct monitor monitor;
+static struct ovs_rwlock rwlock = OVS_RWLOCK_INITIALIZER;
+
static void ofproto_dpif_monitor_init(void);
-static struct mport *mport_create(const struct ofport_dpif *);
-static void mport_check_delete(struct mport *);
-static void mport_delete(struct mport *);
-static struct mport *mport_find(const struct ofport_dpif *);
+static struct mport *mport_create(const struct ofport_dpif *)
+ OVS_REQ_WRLOCK(rwlock);
+static void mport_check_delete(struct mport *) OVS_REQ_WRLOCK(rwlock);
+static void mport_delete(struct mport *) OVS_REQ_WRLOCK(rwlock);
+static struct mport *mport_find(const struct ofport_dpif *)
+ OVS_REQ_RDLOCK(rwlock);
/* Initializes the monitor struct. */
static void
ofproto_dpif_monitor_init(void) {
+ ovs_rwlock_wrlock(&rwlock);
hmap_init(&monitor.hmap);
+ ovs_rwlock_unlock(&rwlock);
}
-
/* Creates a new mport and inserts it into monitor->hmap. */
static struct mport *
-mport_create(const struct ofport_dpif *ofport)
+mport_create(const struct ofport_dpif *ofport) OVS_REQ_WRLOCK(rwlock)
{
struct mport *mport;
@@ -81,7 +86,7 @@ mport_create(const struct ofport_dpif *ofport)
/* Checkes 'mport', if both bfd and cfm are NULL, deletes the 'mport'. */
static void
-mport_check_delete(struct mport *mport)
+mport_check_delete(struct mport *mport) OVS_REQ_WRLOCK(rwlock)
{
if (!mport) {
return;
@@ -94,7 +99,7 @@ mport_check_delete(struct mport *mport)
/* Removes 'mport' from monitor's hmap and frees it. */
static void
-mport_delete(struct mport *mport)
+mport_delete(struct mport *mport) OVS_REQ_WRLOCK(rwlock)
{
hmap_remove(&monitor.hmap, &mport->hmap_node);
free(mport);
@@ -103,7 +108,7 @@ mport_delete(struct mport *mport)
/* Tries finding and returning the 'mport' from the monitor's hash map.
* If there is no such 'mport', returns NULL. */
static struct mport *
-mport_find(const struct ofport_dpif *ofport)
+mport_find(const struct ofport_dpif *ofport) OVS_REQ_RDLOCK(rwlock)
{
struct mport *mport = NULL;
struct mport *node;
@@ -125,12 +130,15 @@ mport_find(const struct ofport_dpif *ofport)
bool
ofproto_dpif_monitor_check_alive(const struct ofport_dpif *ofport)
{
- struct mport *mport = mport_find(ofport);
+ struct mport *mport;
bool cfm_enable = false;
bool bfd_enable = false;
+ ovs_rwlock_rdlock(&rwlock);
+ mport = mport_find(ofport);
if (!mport) {
- return true;
+ ovs_rwlock_unlock(&rwlock);
+ return true;
}
if (mport->cfm) {
@@ -145,6 +153,7 @@ ofproto_dpif_monitor_check_alive(const struct ofport_dpif *ofport)
if (mport->bfd) {
bfd_enable = bfd_forwarding(mport->bfd);
}
+ ovs_rwlock_unlock(&rwlock);
return cfm_enable || bfd_enable;
}
@@ -154,12 +163,18 @@ ofproto_dpif_monitor_should_process_bfd(const struct ofport_dpif *ofport,
const struct flow *flow,
struct flow_wildcards *wc)
{
- struct mport *mport = mport_find(ofport);
+ struct mport *mport;
+ bool retval = false;
+ ovs_rwlock_rdlock(&rwlock);
+ mport = mport_find(ofport);
if (mport && mport->bfd) {
- return bfd_should_process_flow(mport->bfd, flow, wc);
+ retval = bfd_should_process_flow(mport->bfd, flow, wc);
+ ovs_rwlock_unlock(&rwlock);
+ return retval;
}
- return false;
+ ovs_rwlock_unlock(&rwlock);
+ return retval;
}
/* Checks if the cfm flow should be processed. */
@@ -168,12 +183,18 @@ ofproto_dpif_monitor_should_process_cfm(const struct ofport_dpif *ofport,
const struct flow *flow,
struct flow_wildcards *wc)
{
- struct mport *mport = mport_find(ofport);
+ struct mport *mport;
+ bool retval = false;
+ ovs_rwlock_rdlock(&rwlock);
+ mport = mport_find(ofport);
if (mport && mport->cfm) {
- return cfm_should_process_flow(mport->cfm, flow, wc);
+ retval = cfm_should_process_flow(mport->cfm, flow, wc);
+ ovs_rwlock_unlock(&rwlock);
+ return retval;
}
- return false;
+ ovs_rwlock_unlock(&rwlock);
+ return retval;
}
/* Processes the bfd flow. */
@@ -182,10 +203,14 @@ ofproto_dpif_monitor_process_bfd(const struct ofport_dpif *ofport,
const struct flow *flow,
const struct ofpbuf *packet)
{
- struct mport *mport = mport_find(ofport);
+ struct mport *mport;
+
+ ovs_rwlock_rdlock(&rwlock);
+ mport = mport_find(ofport);
if (mport) {
bfd_process_packet(mport->bfd, flow, packet);
}
+ ovs_rwlock_unlock(&rwlock);
}
/* Processes the CFM control packet. */
@@ -193,10 +218,14 @@ void
ofproto_dpif_monitor_process_cfm(const struct ofport_dpif *ofport,
const struct ofpbuf *packet)
{
- struct mport *mport = mport_find(ofport);
+ struct mport *mport;
+
+ ovs_rwlock_rdlock(&rwlock);
+ mport = mport_find(ofport);
if (mport) {
cfm_process_heartbeat(mport->cfm, packet);
}
+ ovs_rwlock_unlock(&rwlock);
}
/* Starts the monitor, makes sure the init() function is called only once. */
@@ -218,10 +247,13 @@ ofproto_dpif_monitor_set_bfd(const struct ofport_dpif *ofport,
const struct smap *cfg,
struct netdev *netdev, uint8_t *hw_addr)
{
- struct mport *mport = mport_find(ofport);
+ struct mport *mport;
struct bfd *old_bfd, *new_bfd;
int retval = 0;
+ ovs_rwlock_wrlock(&rwlock);
+
+ mport = mport_find(ofport);
old_bfd = mport ? mport->bfd : NULL;
new_bfd = bfd_configure(old_bfd, netdev_get_name(netdev), cfg, netdev);
@@ -240,7 +272,7 @@ ofproto_dpif_monitor_set_bfd(const struct ofport_dpif *ofport,
}
retval = 1;
}
-
+ ovs_rwlock_unlock(&rwlock);
return retval;
}
@@ -251,9 +283,11 @@ ofproto_dpif_monitor_set_cfm(const struct ofport_dpif *ofport,
const struct cfm_settings *s,
struct netdev *netdev, uint8_t *hw_addr)
{
- struct mport *mport = mport_find(ofport);
+ struct mport *mport;
int retval = 0;
+ ovs_rwlock_wrlock(&rwlock);
+ mport = mport_find(ofport);
if (s) {
if (!mport) {
mport = mport_create(ofport);
@@ -264,6 +298,7 @@ ofproto_dpif_monitor_set_cfm(const struct ofport_dpif *ofport,
retval = 1;
}
if (cfm_configure(mport->cfm, s)) {
+ ovs_rwlock_unlock(&rwlock);
return retval;
}
}
@@ -275,7 +310,7 @@ ofproto_dpif_monitor_set_cfm(const struct ofport_dpif *ofport,
mport->cfm = NULL;
mport_check_delete(mport);
}
-
+ ovs_rwlock_unlock(&rwlock);
retval = -EINVAL;
return retval;
}
@@ -285,13 +320,16 @@ int
ofproto_dpif_monitor_get_bfd_status(const struct ofport_dpif *ofport,
struct smap *smap)
{
- struct mport *mport = mport_find(ofport);
+ struct mport *mport;
+ ovs_rwlock_rdlock(&rwlock);
+ mport = mport_find(ofport);
if (!mport || !mport->bfd) {
+ ovs_rwlock_unlock(&rwlock);
return ENOENT;
}
bfd_get_status(mport->bfd, smap);
-
+ ovs_rwlock_unlock(&rwlock);
return 0;
}
@@ -300,15 +338,19 @@ bool
ofproto_dpif_monitor_get_cfm_status(const struct ofport_dpif *ofport,
struct ofproto_cfm_status *status)
{
- struct mport *mport = mport_find(ofport);
+ struct mport *mport;
+ ovs_rwlock_rdlock(&rwlock);
+ mport = mport_find(ofport);
if (!mport || !mport->cfm) {
+ ovs_rwlock_unlock(&rwlock);
return false;
}
status->faults = cfm_get_fault(mport->cfm);
status->remote_opstate = cfm_get_opup(mport->cfm);
status->health = cfm_get_health(mport->cfm);
cfm_get_remote_mpids(mport->cfm, &status->rmps, &status->n_rmps);
+ ovs_rwlock_unlock(&rwlock);
return true;
}
@@ -319,9 +361,12 @@ ofproto_dpif_monitor_set_netdev(const struct ofport_dpif *ofport,
struct netdev *netdev,
uint8_t *hw_addr)
{
- struct mport *mport = mport_find(ofport);
+ struct mport *mport;
+ ovs_rwlock_wrlock(&rwlock);
+ mport = mport_find(ofport);
if (!mport) {
+ ovs_rwlock_unlock(&rwlock);
return;
} else {
mport->hw_addr = hw_addr;
@@ -332,6 +377,7 @@ ofproto_dpif_monitor_set_netdev(const struct ofport_dpif *ofport,
cfm_set_netdev(mport->cfm, netdev);
}
}
+ ovs_rwlock_unlock(&rwlock);
}
/* Checks the sending of control packets on all mports. Sends the control
@@ -341,6 +387,7 @@ ofproto_dpif_monitor_run_fast(void)
{
struct mport *mport;
+ ovs_rwlock_rdlock(&rwlock);
HMAP_FOR_EACH (mport, hmap_node, &monitor.hmap) {
if (mport->cfm && cfm_should_send_ccm(mport->cfm)) {
struct ofpbuf packet;
@@ -360,6 +407,7 @@ ofproto_dpif_monitor_run_fast(void)
ofpbuf_uninit(&packet);
}
}
+ ovs_rwlock_unlock(&rwlock);
}
/* Executes bfd_run(), cfm_run() on all mports. */
@@ -368,6 +416,7 @@ ofproto_dpif_monitor_run(void)
{
struct mport *mport;
+ ovs_rwlock_rdlock(&rwlock);
HMAP_FOR_EACH (mport, hmap_node, &monitor.hmap) {
if (mport->cfm) {
cfm_run(mport->cfm);
@@ -377,6 +426,7 @@ ofproto_dpif_monitor_run(void)
bfd_run(mport->bfd);
}
}
+ ovs_rwlock_unlock(&rwlock);
}
/* Executes the bfd_wait() and cfm_wait() functions on all mports. */
@@ -385,6 +435,7 @@ ofproto_dpif_monitor_wait(void)
{
struct mport *mport;
+ ovs_rwlock_rdlock(&rwlock);
HMAP_FOR_EACH (mport, hmap_node, &monitor.hmap) {
if (mport->cfm) {
cfm_wait(mport->cfm);
@@ -394,4 +445,5 @@ ofproto_dpif_monitor_wait(void)
bfd_wait(mport->bfd);
}
}
+ ovs_rwlock_unlock(&rwlock);
}
diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c
index 5d9297e..3225f2c 100644
--- a/ofproto/ofproto-dpif-xlate.c
+++ b/ofproto/ofproto-dpif-xlate.c
@@ -2677,3 +2677,55 @@ out:
rule_actions_unref(actions);
}
+
+int
+xlate_send_packet(const struct ofport_dpif *ofport, struct ofpbuf *packet)
+{
+ uint64_t odp_actions_stub[1024 / 8];
+ struct xport *xport;
+ struct ofpbuf key, odp_actions;
+ struct dpif_flow_stats stats;
+ struct odputil_keybuf keybuf;
+ struct ofpact_output output;
+ struct xlate_out xout;
+ struct xlate_in xin;
+ struct flow flow;
+ union flow_in_port in_port_;
+ int error;
+
+ ovs_rwlock_rdlock(&xlate_rwlock);
+ xport = xport_lookup(ofport);
+ if (!xport) {
+ error = EINVAL;
+ goto out;
+ }
+
+ ofpbuf_use_stub(&odp_actions, odp_actions_stub, sizeof odp_actions_stub);
+ ofpbuf_use_stack(&key, &keybuf, sizeof keybuf);
+
+ /* Use OFPP_NONE as the in_port to avoid special packet processing. */
+ in_port_.ofp_port = OFPP_NONE;
+ flow_extract(packet, 0, 0, NULL, &in_port_, &flow);
+ odp_flow_key_from_flow(&key, &flow, ofp_port_to_odp_port(xport->xbridge, OFPP_LOCAL));
+ dpif_flow_stats_extract(&flow, packet, time_msec(), &stats);
+
+ ofpact_init(&output.ofpact, OFPACT_OUTPUT, sizeof output);
+ output.port = xport->ofp_port;
+ output.max_len = 0;
+
+ xlate_in_init(&xin, xport->xbridge->ofproto, &flow, NULL, 0, packet);
+ xin.ofpacts_len = sizeof output;
+ xin.ofpacts = &output.ofpact;
+ xin.resubmit_stats = &stats;
+ xlate_actions(&xin, &xout);
+
+ error = dpif_execute(xport->xbridge->dpif,
+ key.data, key.size,
+ xout.odp_actions.data, xout.odp_actions.size,
+ packet);
+ xlate_out_uninit(&xout);
+
+out:
+ ovs_rwlock_unlock(&xlate_rwlock);
+ return error;
+}
diff --git a/ofproto/ofproto-dpif-xlate.h b/ofproto/ofproto-dpif-xlate.h
index 8768ad3..ffed74e 100644
--- a/ofproto/ofproto-dpif-xlate.h
+++ b/ofproto/ofproto-dpif-xlate.h
@@ -152,4 +152,5 @@ void xlate_in_init(struct xlate_in *, struct ofproto_dpif *,
void xlate_out_uninit(struct xlate_out *);
void xlate_actions_for_side_effects(struct xlate_in *);
void xlate_out_copy(struct xlate_out *dst, const struct xlate_out *src);
+int xlate_send_packet(const struct ofport_dpif *, struct ofpbuf *packet);
#endif /* ofproto-dpif-xlate.h */
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index ee107a9..2ebcdf9 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -4849,51 +4849,15 @@ int
ofproto_dpif_send_packet(const struct ofport_dpif *ofport, struct ofpbuf *packet)
{
struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofport->up.ofproto);
- uint64_t odp_actions_stub[1024 / 8];
- struct ofpbuf key, odp_actions;
- struct dpif_flow_stats stats;
- struct odputil_keybuf keybuf;
- struct ofpact_output output;
- struct xlate_out xout;
- struct xlate_in xin;
- struct flow flow;
- union flow_in_port in_port_;
int error;
- ofpbuf_use_stub(&odp_actions, odp_actions_stub, sizeof odp_actions_stub);
- ofpbuf_use_stack(&key, &keybuf, sizeof keybuf);
-
- /* Use OFPP_NONE as the in_port to avoid special packet processing. */
- in_port_.ofp_port = OFPP_NONE;
- flow_extract(packet, 0, 0, NULL, &in_port_, &flow);
- odp_flow_key_from_flow(&key, &flow, ofp_port_to_odp_port(ofproto,
- OFPP_LOCAL));
- dpif_flow_stats_extract(&flow, packet, time_msec(), &stats);
-
- ofpact_init(&output.ofpact, OFPACT_OUTPUT, sizeof output);
- output.port = ofport->up.ofp_port;
- output.max_len = 0;
+ error = xlate_send_packet(ofport, packet);
- xlate_in_init(&xin, ofproto, &flow, NULL, 0, packet);
- xin.ofpacts_len = sizeof output;
- xin.ofpacts = &output.ofpact;
- xin.resubmit_stats = &stats;
- xlate_actions(&xin, &xout);
-
- error = dpif_execute(ofproto->backer->dpif,
- key.data, key.size,
- xout.odp_actions.data, xout.odp_actions.size,
- packet);
- xlate_out_uninit(&xout);
-
- if (error) {
- VLOG_WARN_RL(&rl, "%s: failed to send packet on port %s (%s)",
- ofproto->up.name, netdev_get_name(ofport->up.netdev),
- ovs_strerror(error));
+ if (!error) {
+ ofproto->stats.tx_packets++;
+ ofproto->stats.tx_bytes += packet->size;
}
- ofproto->stats.tx_packets++;
- ofproto->stats.tx_bytes += packet->size;
return error;
}
--
1.7.9.5
More information about the dev
mailing list