[ovs-dev] [threaded-learning 11/25] ofproto: Add a ref_count to "struct rule" to protect it from being freed.

Ben Pfaff blp at nicira.com
Wed Sep 11 05:27:11 UTC 2013


Taking a read-lock on the 'rwlock' member of struct rule prevents members
of the rule from changing.  This is a short-term use of the 'rwlock': one
takes the lock, reads some members, and releases the lock.

Taking a read-lock on the 'rwlock' also prevents the rule from being freed.
This is often a relatively long-term need.  For example, until now flow
translation has held the rwlock in xlate_table_action() across the entire
recursive translation, which can reach a great deal of code spread across
multiple files.

This commit switches to using a reference count for this second purpose
of struct rule's rwlock.  All of the code that previously held a
read-lock on the rwlock across deep call stacks can now simply hold a
reference instead.
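
Concretely, the caller-side pattern after this change looks roughly like
the sketch below.  (This is an illustrative sketch only; "example_use"
is a made-up wrapper, not a function added by this patch, and unrelated
details are omitted.)

    /* Illustrative sketch only -- "example_use" is hypothetical. */
    static void
    example_use(struct ofproto_dpif *ofproto, struct xlate_ctx *ctx,
                const struct flow *flow, struct flow_wildcards *wc,
                uint8_t table_id)
    {
        struct rule_dpif *rule;

        /* The lookup now takes a reference on the rule (rule_dpif_ref()
         * increments the atomic ref_count) instead of read-locking
         * rule->up.rwlock. */
        rule_dpif_lookup_in_table(ofproto, flow, wc, table_id, &rule);

        if (rule) {
            /* Arbitrarily deep, long-running work with no rwlock held.
             * xlate_recursively() drops the reference with
             * rule_dpif_unref() when it finishes; once the ref_count
             * reaches zero the rule is destructed and freed. */
            xlate_recursively(ctx, rule);
        }
    }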

Signed-off-by: Ben Pfaff <blp at nicira.com>
---
 ofproto/ofproto-dpif-upcall.c |    2 +-
 ofproto/ofproto-dpif-xlate.c  |   16 +++++++---------
 ofproto/ofproto-dpif.c        |   36 +++++++++++++++++++-----------------
 ofproto/ofproto-dpif.h        |   12 +++++-------
 ofproto/ofproto-provider.h    |   23 +++++++++++++++--------
 ofproto/ofproto.c             |   32 +++++++++++++++++++++++++-------
 6 files changed, 72 insertions(+), 49 deletions(-)

diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
index 54f441b..ae856a4 100644
--- a/ofproto/ofproto-dpif-upcall.c
+++ b/ofproto/ofproto-dpif-upcall.c
@@ -725,7 +725,7 @@ execute_flow_miss(struct flow_miss *miss, struct dpif_op *ops, size_t *n_ops)
             xlate_actions_for_side_effects(&xin);
         }
     }
-    rule_dpif_release(rule);
+    rule_dpif_unref(rule);
 
     if (miss->xout.odp_actions.size) {
         LIST_FOR_EACH (packet, list_node, &miss->packets) {
diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c
index eb6a1f9..611861c 100644
--- a/ofproto/ofproto-dpif-xlate.c
+++ b/ofproto/ofproto-dpif-xlate.c
@@ -1669,7 +1669,6 @@ compose_output_action(struct xlate_ctx *ctx, ofp_port_t ofp_port)
 
 static void
 xlate_recursively(struct xlate_ctx *ctx, struct rule_dpif *rule)
-    OVS_RELEASES(rule)
 {
     struct rule_dpif *old_rule = ctx->rule;
     struct rule_actions *actions;
@@ -1686,7 +1685,7 @@ xlate_recursively(struct xlate_ctx *ctx, struct rule_dpif *rule)
     ctx->rule = old_rule;
     ctx->recurse--;
 
-    rule_dpif_release(rule);
+    rule_dpif_unref(rule);
 }
 
 static void
@@ -1697,7 +1696,6 @@ xlate_table_action(struct xlate_ctx *ctx,
         struct rule_dpif *rule;
         ofp_port_t old_in_port = ctx->xin->flow.in_port.ofp_port;
         uint8_t old_table_id = ctx->table_id;
-        bool got_rule;
 
         ctx->table_id = table_id;
 
@@ -1705,18 +1703,16 @@ xlate_table_action(struct xlate_ctx *ctx,
          * original input port (otherwise OFPP_NORMAL and OFPP_IN_PORT will
          * have surprising behavior). */
         ctx->xin->flow.in_port.ofp_port = in_port;
-        got_rule = rule_dpif_lookup_in_table(ctx->xbridge->ofproto,
-                                             &ctx->xin->flow, &ctx->xout->wc,
-                                             table_id, &rule);
+        rule_dpif_lookup_in_table(ctx->xbridge->ofproto,
+                                  &ctx->xin->flow, &ctx->xout->wc,
+                                  table_id, &rule);
         ctx->xin->flow.in_port.ofp_port = old_in_port;
 
         if (ctx->xin->resubmit_hook) {
             ctx->xin->resubmit_hook(ctx->xin, rule, ctx->recurse);
         }
 
-        if (got_rule) {
-            xlate_recursively(ctx, rule);
-        } else if (may_packet_in) {
+        if (!rule && may_packet_in) {
             struct xport *xport;
 
             /* XXX
@@ -1729,6 +1725,8 @@ xlate_table_action(struct xlate_ctx *ctx,
             choose_miss_rule(xport ? xport->config : 0,
                              ctx->xbridge->miss_rule,
                              ctx->xbridge->no_packet_in_rule, &rule);
+        }
+        if (rule) {
             xlate_recursively(ctx, rule);
         }
 
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index cbaae5a..2b2fe62 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -1371,7 +1371,7 @@ add_internal_flow(struct ofproto_dpif *ofproto, int id,
 
     if (rule_dpif_lookup_in_table(ofproto, &fm.match.flow, NULL, TBL_INTERNAL,
                                   rulep)) {
-        rule_dpif_release(*rulep);
+        rule_dpif_unref(*rulep);
     } else {
         NOT_REACHED();
     }
@@ -4167,7 +4167,7 @@ facet_is_controller_flow(struct facet *facet)
         is_controller = ofpacts_len > 0
             && ofpacts->type == OFPACT_CONTROLLER
             && ofpact_next(ofpacts) >= ofpact_end(ofpacts, ofpacts_len);
-        rule_dpif_release(rule);
+        rule_dpif_unref(rule);
 
         return is_controller;
     }
@@ -4262,7 +4262,7 @@ facet_check_consistency(struct facet *facet)
     rule_dpif_lookup(facet->ofproto, &facet->flow, NULL, &rule);
     xlate_in_init(&xin, facet->ofproto, &facet->flow, rule, 0, NULL);
     xlate_actions(&xin, &xout);
-    rule_dpif_release(rule);
+    rule_dpif_unref(rule);
 
     ok = ofpbuf_equal(&facet->xout.odp_actions, &xout.odp_actions)
         && facet->xout.slow == xout.slow;
@@ -4360,7 +4360,7 @@ facet_revalidate(struct facet *facet)
         || memcmp(&facet->xout.wc, &xout.wc, sizeof xout.wc)) {
         facet_remove(facet);
         xlate_out_uninit(&xout);
-        rule_dpif_release(new_rule);
+        rule_dpif_unref(new_rule);
         return false;
     }
 
@@ -4392,7 +4392,7 @@ facet_revalidate(struct facet *facet)
     facet->used = MAX(facet->used, new_rule->up.created);
 
     xlate_out_uninit(&xout);
-    rule_dpif_release(new_rule);
+    rule_dpif_unref(new_rule);
     return true;
 }
 
@@ -4425,7 +4425,7 @@ flow_push_stats(struct ofproto_dpif *ofproto, struct flow *flow,
     xin.resubmit_stats = stats;
     xin.may_learn = may_learn;
     xlate_actions_for_side_effects(&xin);
-    rule_dpif_release(rule);
+    rule_dpif_unref(rule);
 }
 
 static void
@@ -4811,7 +4811,6 @@ bool
 rule_dpif_lookup_in_table(struct ofproto_dpif *ofproto,
                           const struct flow *flow, struct flow_wildcards *wc,
                           uint8_t table_id, struct rule_dpif **rule)
-    OVS_TRY_RDLOCK(true, (*rule)->up.rwlock)
 {
     struct cls_rule *cls_rule;
     struct classifier *cls;
@@ -4846,11 +4845,7 @@ rule_dpif_lookup_in_table(struct ofproto_dpif *ofproto,
     }
 
     *rule = rule_dpif_cast(rule_from_cls_rule(cls_rule));
-    if (*rule && ovs_rwlock_tryrdlock(&(*rule)->up.rwlock)) {
-        /* The rule is in the process of being removed.  Best we can do is
-         * pretend it isn't there. */
-        *rule = NULL;
-    }
+    rule_dpif_ref(*rule);
     ovs_rwlock_unlock(&cls->rwlock);
 
     return *rule != NULL;
@@ -4862,18 +4857,25 @@ rule_dpif_lookup_in_table(struct ofproto_dpif *ofproto,
 void
 choose_miss_rule(enum ofputil_port_config config, struct rule_dpif *miss_rule,
                  struct rule_dpif *no_packet_in_rule, struct rule_dpif **rule)
-    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     *rule = config & OFPUTIL_PC_NO_PACKET_IN ? no_packet_in_rule : miss_rule;
-    ovs_rwlock_rdlock(&(*rule)->up.rwlock);
+    rule_dpif_ref(*rule);
+}
+
+void
+rule_dpif_ref(struct rule_dpif *rule)
+{
+    if (rule) {
+        ofproto_rule_ref(&rule->up);
+    }
 }
 
 void
-rule_dpif_release(struct rule_dpif *rule)
+rule_dpif_unref(struct rule_dpif *rule)
     OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     if (rule) {
-        ovs_rwlock_unlock(&rule->up.rwlock);
+        ofproto_rule_unref(&rule->up);
     }
 }
 
@@ -5589,7 +5591,7 @@ ofproto_trace(struct ofproto_dpif *ofproto, const struct flow *flow,
         xlate_out_uninit(&trace.xout);
     }
 
-    rule_dpif_release(rule);
+    rule_dpif_unref(rule);
 }
 
 /* Runs a self-check of flow translations in 'ofproto'.  Appends a message to
diff --git a/ofproto/ofproto-dpif.h b/ofproto/ofproto-dpif.h
index befd458..14a9669 100644
--- a/ofproto/ofproto-dpif.h
+++ b/ofproto/ofproto-dpif.h
@@ -61,15 +61,14 @@ struct OVS_LOCKABLE rule_dpif;
  *   actions into datapath actions. */
 
 void rule_dpif_lookup(struct ofproto_dpif *, const struct flow *,
-                      struct flow_wildcards *, struct rule_dpif **rule)
-    OVS_ACQ_RDLOCK(*rule);
+                      struct flow_wildcards *, struct rule_dpif **rule);
 
 bool rule_dpif_lookup_in_table(struct ofproto_dpif *, const struct flow *,
                                struct flow_wildcards *, uint8_t table_id,
-                               struct rule_dpif **rule)
-    OVS_TRY_RDLOCK(true, *rule);
+                               struct rule_dpif **rule);
 
-void rule_dpif_release(struct rule_dpif *rule) OVS_RELEASES(rule);
+void rule_dpif_ref(struct rule_dpif *);
+void rule_dpif_unref(struct rule_dpif *);
 
 void rule_dpif_credit_stats(struct rule_dpif *rule ,
                             const struct dpif_flow_stats *);
@@ -86,8 +85,7 @@ void rule_dpif_reduce_timeouts(struct rule_dpif *rule, uint16_t idle_timeout,
 void choose_miss_rule(enum ofputil_port_config,
                       struct rule_dpif *miss_rule,
                       struct rule_dpif *no_packet_in_rule,
-                      struct rule_dpif **rule)
-    OVS_ACQ_RDLOCK(*rule);
+                      struct rule_dpif **rule);
 
 bool ofproto_has_vlan_splinters(const struct ofproto_dpif *);
 ofp_port_t vsp_realdev_to_vlandev(const struct ofproto_dpif *,
diff --git a/ofproto/ofproto-provider.h b/ofproto/ofproto-provider.h
index f8b856e..7c71aad 100644
--- a/ofproto/ofproto-provider.h
+++ b/ofproto/ofproto-provider.h
@@ -27,6 +27,7 @@
 #include "ofp-errors.h"
 #include "ofp-util.h"
 #include "ofproto/ofproto.h"
+#include "ovs-atomic.h"
 #include "ovs-thread.h"
 #include "shash.h"
 #include "simap.h"
@@ -220,6 +221,7 @@ struct rule {
     struct list ofproto_node;    /* Owned by ofproto base code. */
     struct ofproto *ofproto;     /* The ofproto that contains this rule. */
     struct cls_rule cr;          /* In owning ofproto's classifier. */
+    atomic_uint ref_count;
 
     struct ofoperation *pending; /* Operation now in progress, if nonnull. */
 
@@ -242,14 +244,16 @@ struct rule {
     struct eviction_group *eviction_group; /* NULL if not in any group. */
 
     /* The rwlock is used to protect those elements in struct rule which are
-     * accessed by multiple threads.  While maintaining a pointer to struct
-     * rule, threads are required to hold a readlock.  The main ofproto code is
-     * guaranteed not to evict the rule, or change any of the elements "Guarded
-     * by rwlock" without holding the writelock.
-     *
-     * A rule will not be evicted unless both its own and its classifier's
-     * write locks are held.  Therefore, while holding a classifier readlock,
-     * one can be assured that write locked rules are safe to reference. */
+     * accessed by multiple threads.  The main ofproto code is guaranteed not
+     * to change any of the elements "Guarded by rwlock" without holding the
+     * writelock.
+     *
+     * While maintaining a pointer to struct rule, threads are required to hold
+     * a readlock on the classifier that holds the rule or increment the rule's
+     * ref_count.
+     *
+     * A rule will not be evicted unless its classifier's write lock is
+     * held. */
     struct ovs_rwlock rwlock;
 
     /* Guarded by rwlock. */
@@ -267,6 +271,9 @@ struct rule {
                                  * is expirable, otherwise empty. */
 };
 
+void ofproto_rule_ref(struct rule *);
+void ofproto_rule_unref(struct rule *);
+
 /* A set of actions within a "struct rule".
  *
  *
diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
index 9399d41..7cdca26 100644
--- a/ofproto/ofproto.c
+++ b/ofproto/ofproto.c
@@ -197,7 +197,6 @@ static int init_ports(struct ofproto *);
 static void reinit_ports(struct ofproto *);
 
 /* rule. */
-static void ofproto_rule_destroy(struct rule *);
 static void ofproto_rule_destroy__(struct rule *);
 static void ofproto_rule_send_removed(struct rule *, uint8_t reason);
 static bool rule_is_modifiable(const struct rule *);
@@ -2294,12 +2293,30 @@ update_mtu(struct ofproto *p, struct ofport *port)
     }
 }
 
-static void
-ofproto_rule_destroy(struct rule *rule)
+void
+ofproto_rule_ref(struct rule *rule)
 {
     if (rule) {
-        rule->ofproto->ofproto_class->rule_destruct(rule);
-        ofproto_rule_destroy__(rule);
+        unsigned int orig;
+
+        atomic_add(&rule->ref_count, 1, &orig);
+        ovs_assert(orig != 0);
+    }
+}
+
+void
+ofproto_rule_unref(struct rule *rule)
+{
+    if (rule) {
+        unsigned int orig;
+
+        atomic_sub(&rule->ref_count, 1, &orig);
+        if (orig == 1) {
+            rule->ofproto->ofproto_class->rule_destruct(rule);
+            ofproto_rule_destroy__(rule);
+        } else {
+            ovs_assert(orig != 0);
+        }
     }
 }
 
@@ -3596,6 +3613,7 @@ add_flow(struct ofproto *ofproto, struct ofconn *ofconn,
     /* Initialize base state. */
     rule->ofproto = ofproto;
     cls_rule_move(&rule->cr, &cr);
+    atomic_init(&rule->ref_count, 1);
     rule->pending = NULL;
     rule->flow_cookie = fm->new_cookie;
     rule->created = rule->modified = rule->used = time_msec();
@@ -5541,13 +5559,13 @@ ofopgroup_complete(struct ofopgroup *group)
             } else {
                 ovs_rwlock_wrlock(&rule->rwlock);
                 oftable_remove_rule(rule);
-                ofproto_rule_destroy(rule);
+                ofproto_rule_unref(rule);
             }
             break;
 
         case OFOPERATION_DELETE:
             ovs_assert(!op->error);
-            ofproto_rule_destroy(rule);
+            ofproto_rule_unref(rule);
             op->rule = NULL;
             break;
 
-- 
1.7.10.4