[ovs-dev] [PATCH 2/3] [RFC] ofproto-dpif-monitor: Make ofproto-dpif-monitor thread safe.

Alex Wang alexw at nicira.com
Mon Sep 23 18:15:08 UTC 2013


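This commit makes the ofproto-dpif-monitor module thread-safe by guarding
all accesses to the monitor's hmap with a module-level read-write lock.
Also, to guarantee thread-safety, the packet-sending logic of
ofproto_dpif_send_packet() is moved into a new xlate_send_packet()
function in the ofproto-dpif-xlate module; ofproto_dpif_send_packet()
now calls it and only updates the tx statistics on success.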

Signed-off-by: Alex Wang <alexw at nicira.com>
---
 ofproto/ofproto-dpif-monitor.c |  106 ++++++++++++++++++++++++++++++----------
 ofproto/ofproto-dpif-xlate.c   |   52 ++++++++++++++++++++
 ofproto/ofproto-dpif-xlate.h   |    1 +
 ofproto/ofproto-dpif.c         |   44 ++---------------
 4 files changed, 136 insertions(+), 67 deletions(-)

diff --git a/ofproto/ofproto-dpif-monitor.c b/ofproto/ofproto-dpif-monitor.c
index 8f18440..ce7e657 100644
--- a/ofproto/ofproto-dpif-monitor.c
+++ b/ofproto/ofproto-dpif-monitor.c
@@ -50,23 +50,28 @@ struct mport {
 
 static struct monitor monitor;
 
+static struct ovs_rwlock rwlock = OVS_RWLOCK_INITIALIZER;
+
 static void ofproto_dpif_monitor_init(void);
 
-static struct mport *mport_create(const struct ofport_dpif *);
-static void mport_check_delete(struct mport *);
-static void mport_delete(struct mport *);
-static struct mport *mport_find(const struct ofport_dpif *);
+static struct mport *mport_create(const struct ofport_dpif *)
+    OVS_REQ_WRLOCK(rwlock);
+static void mport_check_delete(struct mport *) OVS_REQ_WRLOCK(rwlock);
+static void mport_delete(struct mport *) OVS_REQ_WRLOCK(rwlock);
+static struct mport *mport_find(const struct ofport_dpif *)
+    OVS_REQ_RDLOCK(rwlock);
 
 /* Initializes the monitor struct. */
 static void
 ofproto_dpif_monitor_init(void) {
+    ovs_rwlock_wrlock(&rwlock);
     hmap_init(&monitor.hmap);
+    ovs_rwlock_unlock(&rwlock);
 }
 
-
 /* Creates a new mport and inserts it into monitor->hmap. */
 static struct mport *
-mport_create(const struct ofport_dpif *ofport)
+mport_create(const struct ofport_dpif *ofport) OVS_REQ_WRLOCK(rwlock)
 {
     struct mport *mport;
 
@@ -81,7 +86,7 @@ mport_create(const struct ofport_dpif *ofport)
 
 /* Checkes 'mport', if both bfd and cfm are NULL, deletes the 'mport'. */
 static void
-mport_check_delete(struct mport *mport)
+mport_check_delete(struct mport *mport) OVS_REQ_WRLOCK(rwlock)
 {
     if (!mport) {
         return;
@@ -94,7 +99,7 @@ mport_check_delete(struct mport *mport)
 
 /* Removes 'mport' from monitor's hmap and frees it. */
 static void
-mport_delete(struct mport *mport)
+mport_delete(struct mport *mport) OVS_REQ_WRLOCK(rwlock)
 {
     hmap_remove(&monitor.hmap, &mport->hmap_node);
     free(mport);
@@ -103,7 +108,7 @@ mport_delete(struct mport *mport)
 /* Tries finding and returning the 'mport' from the monitor's hash map.
  * If there is no such 'mport', returns NULL. */
 static struct mport *
-mport_find(const struct ofport_dpif *ofport)
+mport_find(const struct ofport_dpif *ofport) OVS_REQ_RDLOCK(rwlock)
 {
     struct mport *mport = NULL;
     struct mport *node;
@@ -125,12 +130,15 @@ mport_find(const struct ofport_dpif *ofport)
 bool
 ofproto_dpif_monitor_check_alive(const struct ofport_dpif *ofport)
 {
-    struct mport *mport = mport_find(ofport);
+    struct mport *mport;
     bool cfm_enable = false;
     bool bfd_enable = false;
 
+    ovs_rwlock_rdlock(&rwlock);
+    mport = mport_find(ofport);
     if (!mport) {
-        return true;
+      ovs_rwlock_unlock(&rwlock);
+      return true;
     }
 
     if (mport->cfm) {
@@ -145,6 +153,7 @@ ofproto_dpif_monitor_check_alive(const struct ofport_dpif *ofport)
     if (mport->bfd) {
         bfd_enable = bfd_forwarding(mport->bfd);
     }
+    ovs_rwlock_unlock(&rwlock);
     return cfm_enable || bfd_enable;
 }
 
@@ -154,12 +163,18 @@ ofproto_dpif_monitor_should_process_bfd(const struct ofport_dpif *ofport,
                                         const struct flow *flow,
                                         struct flow_wildcards *wc)
 {
-    struct mport *mport = mport_find(ofport);
+    struct mport *mport;
+    bool retval = false;
 
+    ovs_rwlock_rdlock(&rwlock);
+    mport = mport_find(ofport);
     if (mport && mport->bfd) {
-        return bfd_should_process_flow(mport->bfd, flow, wc);
+        retval = bfd_should_process_flow(mport->bfd, flow, wc);
+        ovs_rwlock_unlock(&rwlock);
+        return retval;
     }
-    return false;
+    ovs_rwlock_unlock(&rwlock);
+    return retval;
 }
 
 /* Checks if the cfm flow should be processed. */
@@ -168,12 +183,18 @@ ofproto_dpif_monitor_should_process_cfm(const struct ofport_dpif *ofport,
                                         const struct flow *flow,
                                         struct flow_wildcards *wc)
 {
-    struct mport *mport = mport_find(ofport);
+    struct mport *mport;
+    bool retval = false;
 
+    ovs_rwlock_rdlock(&rwlock);
+    mport = mport_find(ofport);
     if (mport && mport->cfm) {
-        return cfm_should_process_flow(mport->cfm, flow, wc);
+        retval = cfm_should_process_flow(mport->cfm, flow, wc);
+        ovs_rwlock_unlock(&rwlock);
+        return retval;
     }
-    return false;
+    ovs_rwlock_unlock(&rwlock);
+    return retval;
 }
 
 /* Processes the bfd flow. */
@@ -182,10 +203,14 @@ ofproto_dpif_monitor_process_bfd(const struct ofport_dpif *ofport,
                                  const struct flow *flow,
                                  const struct ofpbuf *packet)
 {
-    struct mport *mport = mport_find(ofport);
+    struct mport *mport;
+
+    ovs_rwlock_rdlock(&rwlock);
+    mport = mport_find(ofport);
     if (mport) {
         bfd_process_packet(mport->bfd, flow, packet);
     }
+    ovs_rwlock_unlock(&rwlock);
 }
 
 /* Processes the CFM control packet. */
@@ -193,10 +218,14 @@ void
 ofproto_dpif_monitor_process_cfm(const struct ofport_dpif *ofport,
                                  const struct ofpbuf *packet)
 {
-    struct mport *mport = mport_find(ofport);
+    struct mport *mport;
+
+    ovs_rwlock_rdlock(&rwlock);
+    mport = mport_find(ofport);
     if (mport) {
         cfm_process_heartbeat(mport->cfm, packet);
     }
+    ovs_rwlock_unlock(&rwlock);
 }
 
 /* Starts the monitor, makes sure the init() function is called only once. */
@@ -218,10 +247,13 @@ ofproto_dpif_monitor_set_bfd(const struct ofport_dpif *ofport,
                              const struct smap *cfg,
                              struct netdev *netdev,  uint8_t *hw_addr)
 {
-    struct mport *mport = mport_find(ofport);
+    struct mport *mport;
     struct bfd *old_bfd, *new_bfd;
     int retval = 0;
 
+    ovs_rwlock_wrlock(&rwlock);
+
+    mport = mport_find(ofport);
     old_bfd  = mport ? mport->bfd : NULL;
     new_bfd = bfd_configure(old_bfd, netdev_get_name(netdev), cfg, netdev);
 
@@ -240,7 +272,7 @@ ofproto_dpif_monitor_set_bfd(const struct ofport_dpif *ofport,
         }
         retval = 1;
     }
-
+    ovs_rwlock_unlock(&rwlock);
     return retval;
 }
 
@@ -251,9 +283,11 @@ ofproto_dpif_monitor_set_cfm(const struct ofport_dpif *ofport,
                              const struct cfm_settings *s,
                              struct netdev *netdev, uint8_t *hw_addr)
 {
-    struct mport *mport = mport_find(ofport);
+    struct mport *mport;
     int retval = 0;
 
+    ovs_rwlock_wrlock(&rwlock);
+    mport = mport_find(ofport);
     if (s) {
         if (!mport) {
             mport = mport_create(ofport);
@@ -264,6 +298,7 @@ ofproto_dpif_monitor_set_cfm(const struct ofport_dpif *ofport,
             retval = 1;
         }
         if (cfm_configure(mport->cfm, s)) {
+            ovs_rwlock_unlock(&rwlock);
             return retval;
         }
     }
@@ -275,7 +310,7 @@ ofproto_dpif_monitor_set_cfm(const struct ofport_dpif *ofport,
         mport->cfm = NULL;
         mport_check_delete(mport);
     }
-
+    ovs_rwlock_unlock(&rwlock);
     retval = -EINVAL;
     return retval;
 }
@@ -285,13 +320,16 @@ int
 ofproto_dpif_monitor_get_bfd_status(const struct ofport_dpif *ofport,
                                     struct smap *smap)
 {
-    struct mport *mport = mport_find(ofport);
+    struct mport *mport;
 
+    ovs_rwlock_rdlock(&rwlock);
+    mport = mport_find(ofport);
     if (!mport || !mport->bfd) {
+        ovs_rwlock_unlock(&rwlock);
         return ENOENT;
     }
     bfd_get_status(mport->bfd, smap);
-
+    ovs_rwlock_unlock(&rwlock);
     return 0;
 }
 
@@ -300,15 +338,19 @@ bool
 ofproto_dpif_monitor_get_cfm_status(const struct ofport_dpif *ofport,
                                     struct ofproto_cfm_status *status)
 {
-    struct mport *mport = mport_find(ofport);
+    struct mport *mport;
 
+    ovs_rwlock_rdlock(&rwlock);
+    mport = mport_find(ofport);
     if (!mport || !mport->cfm) {
+        ovs_rwlock_unlock(&rwlock);
         return false;
     }
     status->faults = cfm_get_fault(mport->cfm);
     status->remote_opstate = cfm_get_opup(mport->cfm);
     status->health = cfm_get_health(mport->cfm);
     cfm_get_remote_mpids(mport->cfm, &status->rmps, &status->n_rmps);
+    ovs_rwlock_unlock(&rwlock);
     return true;
 }
 
@@ -319,9 +361,12 @@ ofproto_dpif_monitor_set_netdev(const struct ofport_dpif *ofport,
                                 struct netdev *netdev,
                                 uint8_t *hw_addr)
 {
-    struct mport *mport = mport_find(ofport);
+    struct mport *mport;
 
+    ovs_rwlock_wrlock(&rwlock);
+    mport = mport_find(ofport);
     if (!mport) {
+        ovs_rwlock_unlock(&rwlock);
         return;
     } else {
         mport->hw_addr = hw_addr;
@@ -332,6 +377,7 @@ ofproto_dpif_monitor_set_netdev(const struct ofport_dpif *ofport,
             cfm_set_netdev(mport->cfm, netdev);
         }
     }
+    ovs_rwlock_unlock(&rwlock);
 }
 
 /* Checks the sending of control packets on all mports.  Sends the control
@@ -341,6 +387,7 @@ ofproto_dpif_monitor_run_fast(void)
 {
     struct mport *mport;
 
+    ovs_rwlock_rdlock(&rwlock);
     HMAP_FOR_EACH (mport, hmap_node, &monitor.hmap) {
         if (mport->cfm && cfm_should_send_ccm(mport->cfm)) {
             struct ofpbuf packet;
@@ -360,6 +407,7 @@ ofproto_dpif_monitor_run_fast(void)
             ofpbuf_uninit(&packet);
         }
     }
+    ovs_rwlock_unlock(&rwlock);
 }
 
 /* Executes bfd_run(), cfm_run() on all mports. */
@@ -368,6 +416,7 @@ ofproto_dpif_monitor_run(void)
 {
     struct mport *mport;
 
+    ovs_rwlock_rdlock(&rwlock);
     HMAP_FOR_EACH (mport, hmap_node, &monitor.hmap) {
         if (mport->cfm) {
             cfm_run(mport->cfm);
@@ -377,6 +426,7 @@ ofproto_dpif_monitor_run(void)
             bfd_run(mport->bfd);
         }
     }
+    ovs_rwlock_unlock(&rwlock);
 }
 
 /* Executes the bfd_wait() and cfm_wait() functions on all mports. */
@@ -385,6 +435,7 @@ ofproto_dpif_monitor_wait(void)
 {
     struct mport *mport;
 
+    ovs_rwlock_rdlock(&rwlock);
     HMAP_FOR_EACH (mport, hmap_node, &monitor.hmap) {
         if (mport->cfm) {
             cfm_wait(mport->cfm);
@@ -394,4 +445,5 @@ ofproto_dpif_monitor_wait(void)
             bfd_wait(mport->bfd);
         }
     }
+    ovs_rwlock_unlock(&rwlock);
 }
diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c
index 5d9297e..3225f2c 100644
--- a/ofproto/ofproto-dpif-xlate.c
+++ b/ofproto/ofproto-dpif-xlate.c
@@ -2677,3 +2677,55 @@ out:
 
     rule_actions_unref(actions);
 }
+
+/* Translates an output of 'packet' to 'ofport' and executes it on the
+ * datapath, holding 'xlate_rwlock' for reading throughout.  Returns 0 if
+ * successful, EINVAL if 'ofport' has no xport, else dpif_execute()'s error. */
+int
+xlate_send_packet(const struct ofport_dpif *ofport, struct ofpbuf *packet)
+{
+    struct xport *xport;
+    struct ofpbuf key;
+    struct dpif_flow_stats stats;
+    struct odputil_keybuf keybuf;
+    struct ofpact_output output;
+    struct xlate_out xout;
+    struct xlate_in xin;
+    struct flow flow;
+    union flow_in_port in_port_;
+    int error;
+
+    ovs_rwlock_rdlock(&xlate_rwlock);
+    xport = xport_lookup(ofport);
+    if (!xport) {
+        error = EINVAL;
+        goto out;
+    }
+
+    ofpbuf_use_stack(&key, &keybuf, sizeof keybuf);
+
+    /* Use OFPP_NONE as the in_port to avoid special packet processing. */
+    in_port_.ofp_port = OFPP_NONE;
+    flow_extract(packet, 0, 0, NULL, &in_port_, &flow);
+    odp_flow_key_from_flow(&key, &flow,
+                           ofp_port_to_odp_port(xport->xbridge, OFPP_LOCAL));
+    dpif_flow_stats_extract(&flow, packet, time_msec(), &stats);
+
+    ofpact_init(&output.ofpact, OFPACT_OUTPUT, sizeof output);
+    output.port = xport->ofp_port;
+    output.max_len = 0;
+
+    xlate_in_init(&xin, xport->xbridge->ofproto, &flow, NULL, 0, packet);
+    xin.ofpacts_len = sizeof output;
+    xin.ofpacts = &output.ofpact;
+    xin.resubmit_stats = &stats;
+    xlate_actions(&xin, &xout);
+
+    error = dpif_execute(xport->xbridge->dpif, key.data, key.size,
+                         xout.odp_actions.data, xout.odp_actions.size, packet);
+    xlate_out_uninit(&xout);
+
+out:
+    ovs_rwlock_unlock(&xlate_rwlock);
+    return error;
+}
diff --git a/ofproto/ofproto-dpif-xlate.h b/ofproto/ofproto-dpif-xlate.h
index 8768ad3..ffed74e 100644
--- a/ofproto/ofproto-dpif-xlate.h
+++ b/ofproto/ofproto-dpif-xlate.h
@@ -152,4 +152,5 @@ void xlate_in_init(struct xlate_in *, struct ofproto_dpif *,
 void xlate_out_uninit(struct xlate_out *);
 void xlate_actions_for_side_effects(struct xlate_in *);
 void xlate_out_copy(struct xlate_out *dst, const struct xlate_out *src);
+int xlate_send_packet(const struct ofport_dpif *, struct ofpbuf *packet);
 #endif /* ofproto-dpif-xlate.h */
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index ee107a9..2ebcdf9 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -4849,51 +4849,15 @@ int
 ofproto_dpif_send_packet(const struct ofport_dpif *ofport, struct ofpbuf *packet)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofport->up.ofproto);
-    uint64_t odp_actions_stub[1024 / 8];
-    struct ofpbuf key, odp_actions;
-    struct dpif_flow_stats stats;
-    struct odputil_keybuf keybuf;
-    struct ofpact_output output;
-    struct xlate_out xout;
-    struct xlate_in xin;
-    struct flow flow;
-    union flow_in_port in_port_;
     int error;
 
-    ofpbuf_use_stub(&odp_actions, odp_actions_stub, sizeof odp_actions_stub);
-    ofpbuf_use_stack(&key, &keybuf, sizeof keybuf);
-
-    /* Use OFPP_NONE as the in_port to avoid special packet processing. */
-    in_port_.ofp_port = OFPP_NONE;
-    flow_extract(packet, 0, 0, NULL, &in_port_, &flow);
-    odp_flow_key_from_flow(&key, &flow, ofp_port_to_odp_port(ofproto,
-                                                             OFPP_LOCAL));
-    dpif_flow_stats_extract(&flow, packet, time_msec(), &stats);
-
-    ofpact_init(&output.ofpact, OFPACT_OUTPUT, sizeof output);
-    output.port = ofport->up.ofp_port;
-    output.max_len = 0;
+    error = xlate_send_packet(ofport, packet);
 
-    xlate_in_init(&xin, ofproto, &flow, NULL, 0, packet);
-    xin.ofpacts_len = sizeof output;
-    xin.ofpacts = &output.ofpact;
-    xin.resubmit_stats = &stats;
-    xlate_actions(&xin, &xout);
-
-    error = dpif_execute(ofproto->backer->dpif,
-                         key.data, key.size,
-                         xout.odp_actions.data, xout.odp_actions.size,
-                         packet);
-    xlate_out_uninit(&xout);
-
-    if (error) {
-        VLOG_WARN_RL(&rl, "%s: failed to send packet on port %s (%s)",
-                     ofproto->up.name, netdev_get_name(ofport->up.netdev),
-                     ovs_strerror(error));
+    if (!error) {
+        ofproto->stats.tx_packets++;
+        ofproto->stats.tx_bytes += packet->size;
     }
 
-    ofproto->stats.tx_packets++;
-    ofproto->stats.tx_bytes += packet->size;
     return error;
 }
 
-- 
1.7.9.5




More information about the dev mailing list