[ovs-dev] [PATCH 6/7] lib/classifier: Lockless and robust classifier iteration.

Jarno Rajahalme jrajahalme at nicira.com
Fri Oct 24 20:36:40 UTC 2014


Previously, accurate iteration required writers to be excluded during
iteration.  This patch changes the structure of the classifier by
moving the list of rules from struct cls_match to struct cls_subtable.
The list element is also moved from the struct cls_match to struct
cls_rule, which makes iteration more straightforward, and allows the
iterators to remain ignorant of the internals of the cls_match.  These
changes allow iteration of rules in the classifier by traversing the
RCU-friendly subtables vector, and the rculist of rules in each
subtable.  Classifier modifications may be performed concurrently, but
whether or not a concurrent iterator sees those changes depends on the
timing of the changes.  This is similar to having writers excluded by a
mutex, where visibility of changes depends on the timing of mutex
acquisition.

The subtable's rculist also makes it possible to make
classifier_find_rule_exactly() and classifier_rule_overlaps()
lockless.

Signed-off-by: Jarno Rajahalme <jrajahalme at nicira.com>
---
 lib/classifier-private.h   |   72 +++--
 lib/classifier.c           |  622 +++++++++++++++++++++-----------------------
 lib/classifier.h           |   83 +++---
 lib/flow.c                 |    4 +-
 lib/pvector.h              |    7 +
 ofproto/connmgr.c          |    2 +-
 ofproto/fail-open.h        |    2 +-
 ofproto/ofproto-dpif.c     |    2 +-
 ofproto/ofproto-provider.h |    4 +-
 ofproto/ofproto.c          |    8 +-
 tests/test-classifier.c    |   38 +--
 utilities/ovs-ofctl.c      |   12 +-
 12 files changed, 442 insertions(+), 414 deletions(-)

diff --git a/lib/classifier-private.h b/lib/classifier-private.h
index 10d29a5..1ab76fc 100644
--- a/lib/classifier-private.h
+++ b/lib/classifier-private.h
@@ -19,23 +19,23 @@
 
 #include "flow.h"
 #include "hash.h"
-#include "cmap.h"
-#include "list.h"
 #include "tag.h"
 
 /* Classifier internal definitions, subject to change at any time. */
 
 /* A set of rules that all have the same fields wildcarded. */
 struct cls_subtable {
-    /* The fields are only used by writers and iterators. */
-    struct cmap_node cmap_node; /* Within struct classifier 'subtables_map'. */
-
-    /* The fields are only used by writers. */
-    int n_rules OVS_GUARDED;                /* Number of rules, including
-                                             * duplicates. */
+    /* These fields are only used by writers. */
+    struct cmap_node cmap_node OVS_GUARDED; /* Within struct classifier's
+                                             * 'subtables_map'. */
+    unsigned int max_count OVS_GUARDED;     /* Count of max_priority rules. */
     unsigned int max_priority OVS_GUARDED;  /* Max priority of any rule in
                                              * the subtable. */
-    unsigned int max_count OVS_GUARDED;     /* Count of max_priority rules. */
+    /* Accessed by iterators. */
+    struct cls_list rules_list;             /* Identical rules are adjacent. */
+
+    /* Identical, but lower-priority rules are not inserted into any of the
+     * following data structures. */
 
     /* These fields are accessed by readers who care about wildcarding. */
     tag_type tag;       /* Tag generated from mask for partitioning (const). */
@@ -65,20 +65,17 @@ struct cls_partition {
 
 /* Internal representation of a rule in a "struct cls_subtable". */
 struct cls_match {
-    /* Accessed only by writers and iterators. */
-    struct list list OVS_GUARDED; /* List of identical, lower-priority rules. */
-
     /* Accessed only by writers. */
     struct cls_partition *partition OVS_GUARDED;
 
     /* Accessed by readers interested in wildcarding. */
-    unsigned int priority;      /* Larger numbers are higher priorities. */
+    const unsigned int priority; /* Larger numbers are higher priorities. */
     struct cmap_node index_nodes[CLS_MAX_INDICES]; /* Within subtable's
                                                     * 'indices'. */
     /* Accessed by all readers. */
     struct cmap_node cmap_node; /* Within struct cls_subtable 'rules'. */
-    struct cls_rule *cls_rule;
-    struct miniflow flow;       /* Matching rule. Mask is in the subtable. */
+    const struct cls_rule *cls_rule;
+    const struct miniflow flow; /* Matching rule. Mask is in the subtable. */
     /* 'flow' must be the last field. */
 };
 
@@ -93,6 +90,51 @@ struct trie_node {
 /* Max bits per node.  Must fit in struct trie_node's 'prefix'.
  * Also tested with 16, 8, and 5 to stress the implementation. */
 #define TRIE_PREFIX_BITS 32
+
+
+static inline const struct cls_rule *
+next_rule_in_list__(const struct cls_rule *rule)
+{
+    return CONTAINER_OF(rculist_next(&rule->node.list), struct cls_rule,
+                        node.list);
+}
+
+/* 'prev' must not be the list head, but 'rule' may be.  The list head is not
+ * really a struct cls_rule, so it does not have a 'match'. */
+static inline bool
+rule_lower_and_equal(const struct cls_rule *prev, const struct cls_rule *rule)
+{
+    /* When 'rule' is the list head, the priority comparison will fail and
+     * no attempt is made at comparing a non-existent match. */
+    return rule->node.priority < prev->node.priority
+        && minimatch_equal(&rule->match, &prev->match);
+}
+
+/* Iterates RULE over HEAD's cls_rule and all the following identical,
+ * lower-priority cls_rules.  Iteration is RCU protected.
+ * At the end of the iteration 'RULE' points to the next rule (or list head)
+ * after all the equal ones. */
+#define FOR_EACH_RULE_IN_LIST(RULE, HEAD)   \
+    for (const struct cls_rule *prev__ = (RULE) = (HEAD)->cls_rule;   \
+         prev__ == (RULE) || rule_lower_and_equal(prev__, RULE);      \
+         prev__ = (RULE), (RULE) = next_rule_in_list__(RULE))
+
+static inline struct cls_rule *
+next_rule_in_list_protected(const struct cls_rule *rule)
+{
+    return CONTAINER_OF(rculist_next_protected(&rule->node.list),
+                        struct cls_rule, node.list);
+}
+
+/* Iterates RULE over HEAD's cls_rule and all the following identical,
+ * lower-priority cls_rules.  For writers only: the caller must hold the
+ * classifier's mutex.  At the end of the iteration 'RULE' points to the next
+ * rule (or list head) after all the equal ones. */
+#define FOR_EACH_RULE_IN_LIST_PROTECTED(RULE, HEAD)                     \
+    for (const struct cls_rule *prev__ = (RULE) =                       \
+             CONST_CAST(struct cls_rule *, (HEAD)->cls_rule);           \
+         prev__ == (RULE) || rule_lower_and_equal(prev__, RULE);        \
+         prev__ = (RULE), (RULE) = next_rule_in_list_protected(RULE))
 
 /* flow/miniflow/minimask/minimatch utilities.
  * These are only used by the classifier, so place them here to allow
diff --git a/lib/classifier.c b/lib/classifier.c
index 05e7b2b..c6391c8 100644
--- a/lib/classifier.c
+++ b/lib/classifier.c
@@ -43,47 +43,29 @@ cls_match_alloc(struct cls_rule *rule)
     struct cls_match *cls_match
         = xmalloc(sizeof *cls_match - sizeof cls_match->flow.inline_values
                   + MINIFLOW_VALUES_SIZE(count));
-
-    cls_match->cls_rule = rule;
-    miniflow_clone_inline(&cls_match->flow, &rule->match.flow, count);
-    cls_match->priority = rule->priority;
-    rule->cls_match = cls_match;
-
+    *CONST_CAST(const struct cls_rule **, &cls_match->cls_rule) = rule;
+    *CONST_CAST(unsigned int *, &cls_match->priority) = rule->node.priority;
+    miniflow_clone_inline(CONST_CAST(struct miniflow *, &cls_match->flow),
+                          &rule->match.flow, count);
     return cls_match;
 }
 
 static struct cls_subtable *find_subtable(const struct classifier *cls,
-                                          const struct minimask *)
-    OVS_REQUIRES(cls->mutex);
+                                          const struct minimask *);
 static struct cls_subtable *insert_subtable(struct classifier *cls,
                                             const struct minimask *)
     OVS_REQUIRES(cls->mutex);
 static void destroy_subtable(struct classifier *cls, struct cls_subtable *)
     OVS_REQUIRES(cls->mutex);
-static struct cls_match *insert_rule(struct classifier *cls,
-                                     struct cls_subtable *, struct cls_rule *)
-    OVS_REQUIRES(cls->mutex);
 
-static struct cls_match *find_match_wc(const struct cls_subtable *,
-                                       const struct flow *, struct trie_ctx *,
-                                       unsigned int n_tries,
-                                       struct flow_wildcards *);
+static const struct cls_match *find_match_wc(const struct cls_subtable *,
+                                             const struct flow *,
+                                             struct trie_ctx *,
+                                             unsigned int n_tries,
+                                             struct flow_wildcards *);
 static struct cls_match *find_equal(struct cls_subtable *,
                                     const struct miniflow *, uint32_t hash);
 
-/* Iterates RULE over HEAD and all of the cls_rules on HEAD->list.
- * Classifier's mutex must be held while iterating, as the list is
- * protoceted by it. */
-#define FOR_EACH_RULE_IN_LIST(RULE, HEAD)                               \
-    for ((RULE) = (HEAD); (RULE) != NULL; (RULE) = next_rule_in_list(RULE))
-#define FOR_EACH_RULE_IN_LIST_SAFE(RULE, NEXT, HEAD)                    \
-    for ((RULE) = (HEAD);                                               \
-         (RULE) != NULL && ((NEXT) = next_rule_in_list(RULE), true);    \
-         (RULE) = (NEXT))
-
-static struct cls_match *next_rule_in_list__(struct cls_match *);
-static struct cls_match *next_rule_in_list(struct cls_match *);
-
 static unsigned int minimask_get_prefix_len(const struct minimask *,
                                             const struct mf_field *);
 static void trie_init(struct classifier *cls, int trie_idx,
@@ -108,6 +90,15 @@ static bool mask_prefix_bits_set(const struct flow_wildcards *,
 
 /* cls_rule. */
 
+static inline void
+cls_rule_init__(struct cls_rule *rule, unsigned int priority)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+    rculist_init(&rule->node.list);
+    rule->node.priority = priority;
+    rule->cls_match = NULL;
+}
+
 /* Initializes 'rule' to match packets specified by 'match' at the given
  * 'priority'.  'match' must satisfy the invariant described in the comment at
  * the definition of struct match.
@@ -120,9 +111,8 @@ void
 cls_rule_init(struct cls_rule *rule,
               const struct match *match, unsigned int priority)
 {
+    cls_rule_init__(rule, priority);
     minimatch_init(&rule->match, match);
-    rule->priority = priority;
-    rule->cls_match = NULL;
 }
 
 /* Same as cls_rule_init() for initialization from a "struct minimatch". */
@@ -131,9 +121,8 @@ cls_rule_init_from_minimatch(struct cls_rule *rule,
                              const struct minimatch *match,
                              unsigned int priority)
 {
+    cls_rule_init__(rule, priority);
     minimatch_clone(&rule->match, match);
-    rule->priority = priority;
-    rule->cls_match = NULL;
 }
 
 /* Initializes 'dst' as a copy of 'src'.
@@ -142,20 +131,23 @@ cls_rule_init_from_minimatch(struct cls_rule *rule,
 void
 cls_rule_clone(struct cls_rule *dst, const struct cls_rule *src)
 {
+    cls_rule_init__(dst, src->node.priority);
     minimatch_clone(&dst->match, &src->match);
-    dst->priority = src->priority;
-    dst->cls_match = NULL;
 }
 
 /* Initializes 'dst' with the data in 'src', destroying 'src'.
+ * 'src' must be a cls_rule NOT in a classifier.
  *
  * The caller must eventually destroy 'dst' with cls_rule_destroy(). */
 void
 cls_rule_move(struct cls_rule *dst, struct cls_rule *src)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
+#ifndef NDEBUG
+    ovs_assert(!src->cls_match);   /* Must not be in a classifier. */
+#endif
+    cls_rule_init__(dst, src->node.priority);
     minimatch_move(&dst->match, &src->match);
-    dst->priority = src->priority;
-    dst->cls_match = NULL;
 }
 
 /* Frees memory referenced by 'rule'.  Doesn't free 'rule' itself (it's
@@ -164,8 +156,17 @@ cls_rule_move(struct cls_rule *dst, struct cls_rule *src)
  * ('rule' must not currently be in a classifier.) */
 void
 cls_rule_destroy(struct cls_rule *rule)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
-    ovs_assert(!rule->cls_match);
+#ifndef NDEBUG
+    ovs_assert(!rule->cls_match);   /* Must not be in a classifier. */
+
+    /* Check that the rule has been properly removed from the classifier and
+     * that the destruction only happens after the RCU grace period, or that
+     * the rule was never inserted into the classifier in the first place. */
+    ovs_assert(rculist_next_protected(&rule->node.list) == RCULIST_POISON
+               || rculist_is_empty(&rule->node.list));
+#endif
     minimatch_destroy(&rule->match);
 }
 
@@ -174,21 +175,22 @@ cls_rule_destroy(struct cls_rule *rule)
 bool
 cls_rule_equal(const struct cls_rule *a, const struct cls_rule *b)
 {
-    return a->priority == b->priority && minimatch_equal(&a->match, &b->match);
+    return a->node.priority == b->node.priority
+        && minimatch_equal(&a->match, &b->match);
 }
 
 /* Returns a hash value for 'rule', folding in 'basis'. */
 uint32_t
 cls_rule_hash(const struct cls_rule *rule, uint32_t basis)
 {
-    return minimatch_hash(&rule->match, hash_int(rule->priority, basis));
+    return minimatch_hash(&rule->match, hash_int(rule->node.priority, basis));
 }
 
 /* Appends a string describing 'rule' to 's'. */
 void
 cls_rule_format(const struct cls_rule *rule, struct ds *s)
 {
-    minimatch_format(&rule->match, s, rule->priority);
+    minimatch_format(&rule->match, s, rule->node.priority);
 }
 
 /* Returns true if 'rule' matches every packet, false otherwise. */
@@ -363,11 +365,7 @@ trie_init(struct classifier *cls, int trie_idx, const struct mf_field *field)
             struct cls_match *head;
 
             CMAP_FOR_EACH (head, cmap_node, &subtable->rules) {
-                struct cls_match *match;
-
-                FOR_EACH_RULE_IN_LIST (match, head) {
-                    trie_insert(trie, match->cls_rule, plen);
-                }
+                trie_insert(trie, head->cls_rule, plen);
             }
         }
         /* Initialize subtable's prefix length on this field.  This will
@@ -441,52 +439,92 @@ static inline ovs_be32 minimatch_get_ports(const struct minimatch *match)
         & MINIFLOW_GET_BE32(&match->mask.masks, tp_src);
 }
 
+static void
+subtable_replace_head_rule(struct classifier *cls OVS_UNUSED,
+                           struct cls_subtable *subtable,
+                           struct cls_match *head, struct cls_match *new,
+                           uint32_t hash, uint32_t ihash[CLS_MAX_INDICES])
+    OVS_REQUIRES(cls->mutex)
+{
+    /* Rule's data is already in the tries. */
+
+    new->partition = head->partition; /* Steal partition, if any. */
+    head->partition = NULL;
+
+    for (int i = 0; i < subtable->n_indices; i++) {
+        cmap_replace(&subtable->indices[i], &head->index_nodes[i],
+                     &new->index_nodes[i], ihash[i]);
+    }
+    cmap_replace(&subtable->rules, &head->cmap_node, &new->cmap_node,
+                 hash);
+}
+
 /* Inserts 'rule' into 'cls'.  Until 'rule' is removed from 'cls', the caller
  * must not modify or free it.
  *
  * If 'cls' already contains an identical rule (including wildcards, values of
  * fixed fields, and priority), replaces the old rule by 'rule' and returns the
  * rule that was replaced.  The caller takes ownership of the returned rule and
- * is thus responsible for destroying it with cls_rule_destroy(), freeing the
- * memory block in which it resides, etc., as necessary.
+ * is thus responsible for destroying it with cls_rule_destroy(), after the
+ * RCU grace period has passed (see ovsrcu_postpone()).
  *
  * Returns NULL if 'cls' does not contain a rule with an identical key, after
  * inserting the new rule.  In this case, no rules are displaced by the new
  * rule, even rules that cannot have any effect because the new rule matches a
- * superset of their flows and has higher priority. */
+ * superset of their flows and has higher priority.
+ *
+ * As the readers are operating concurrently with the modifications, a
+ * concurrent reader may or may not see the new rule, depending on how the
+ * concurrent events overlap with each other.
+ *
+ * The new rule is first added to the segment indices, so the readers may find
+ * the rule in the indices before the rule is visible in the subtable's 'rules'
+ * map.  This may result in us losing the opportunity to quit lookups earlier,
+ * resulting in sub-optimal wildcarding.  This will be fixed by forthcoming
+ * revalidation always scheduled after flow table changes.
+ *
+ * The subtable's max priority is updated only after the rule is inserted, so
+ * the concurrent readers may not see the rule, as the updated priority ordered
+ * subtable list will only be visible after the subtable's max priority is
+ * updated. */
 struct cls_rule *
 classifier_replace(struct classifier *cls, struct cls_rule *rule)
     OVS_EXCLUDED(cls->mutex)
 {
-    struct cls_match *old_rule;
     struct cls_subtable *subtable;
-    struct cls_rule *old_cls_rule = NULL;
+    struct cls_match *head;
+    int i;
+    uint32_t basis = 0, hash, ihash[CLS_MAX_INDICES];
+    uint8_t prev_be32ofs = 0;
+    size_t n_rules = 0;
+    struct cls_match *new = cls_match_alloc(rule);
+    struct cls_rule *iter = NULL;
 
     ovs_mutex_lock(&cls->mutex);
+    rule->cls_match = new;
+
     subtable = find_subtable(cls, &rule->match.mask);
     if (!subtable) {
         subtable = insert_subtable(cls, &rule->match.mask);
     }
 
-    old_rule = insert_rule(cls, subtable, rule);
-    if (!old_rule) {
-        old_cls_rule = NULL;
-
-        rule->cls_match->partition = NULL;
-        if (minimask_get_metadata_mask(&rule->match.mask) == OVS_BE64_MAX) {
-            ovs_be64 metadata = miniflow_get_metadata(&rule->match.flow);
-            rule->cls_match->partition = create_partition(cls, subtable,
-                                                          metadata);
-        }
-
-        cls->n_rules++;
+    /* Compute hashes in segments. */
+    for (i = 0; i < subtable->n_indices; i++) {
+        ihash[i] = minimatch_hash_range(&rule->match, prev_be32ofs,
+                                        subtable->index_ofs[i], &basis);
+        prev_be32ofs = subtable->index_ofs[i];
+    }
+    hash = minimatch_hash_range(&rule->match, prev_be32ofs, FLOW_U32S,
+                                &basis);
 
-        for (int i = 0; i < cls->n_tries; i++) {
+    head = find_equal(subtable, &rule->match.flow, hash);
+    if (!head) {
+        /* Tries. */
+        for (i = 0; i < cls->n_tries; i++) {
             if (subtable->trie_plen[i]) {
                 trie_insert(&cls->tries[i], rule, subtable->trie_plen[i]);
             }
         }
-
         /* Ports trie. */
         if (subtable->ports_mask_len) {
             /* We mask the value to be inserted to always have the wildcarded
@@ -498,17 +536,82 @@ classifier_replace(struct classifier *cls, struct cls_rule *rule)
             trie_insert_prefix(&subtable->ports_trie, &masked_ports,
                                subtable->ports_mask_len);
         }
-    } else {
-        old_cls_rule = old_rule->cls_rule;
-        rule->cls_match->partition = old_rule->partition;
-        old_cls_rule->cls_match = NULL;
 
-        /* 'old_rule' contains a cmap_node, which may not be freed
-         * immediately. */
-        ovsrcu_postpone(free, old_rule);
+        new->partition = NULL;
+        if (minimask_get_metadata_mask(&rule->match.mask) == OVS_BE64_MAX) {
+            ovs_be64 metadata = miniflow_get_metadata(&rule->match.flow);
+
+            new->partition = create_partition(cls, subtable, metadata);
+        }
+
+        /* Make rule visible to lookups. */
+        for (i = 0; i < subtable->n_indices; i++) {
+            cmap_insert(&subtable->indices[i], &new->index_nodes[i], ihash[i]);
+        }
+        n_rules = cmap_insert(&subtable->rules, &new->cmap_node, hash);
+
+        /* Make rule visible to iterators. */
+        rculist_push_back(&subtable->rules_list.list, &rule->node.list);
+    } else {   /* Equal rules exist in the classifier already. */
+        bool replace = false;
+
+        /* Scan the list for the insertion point that will keep the list in
+         * order of decreasing priority. */
+        FOR_EACH_RULE_IN_LIST_PROTECTED (iter, head) {
+            if (rule->node.priority >= iter->node.priority) {
+                if (rule->node.priority == iter->node.priority) {
+                    replace = true;   /* 'rule' replaces 'iter'. */
+                }
+                break;
+            }
+        }
+        /* 'iter' now at the insertion point. */
+
+        /* Replace the existing head in data structures, if rule is the new
+         * head. */
+        if (iter == head->cls_rule) {
+            subtable_replace_head_rule(cls, subtable, head, new, hash, ihash);
+        }
+
+        if (replace) {
+            /* This makes the new rule visible to iterators. */
+            rculist_replace(&rule->node.list, &iter->node.list);
+
+            ovsrcu_postpone(free, iter->cls_match);
+            iter->cls_match = NULL;
+
+            /* No change in subtable's max priority or max count. */
+
+            /* Return displaced rule.  Caller is responsible for keeping it
+             * around until all threads quiesce. */
+            ovs_mutex_unlock(&cls->mutex);
+            return iter;
+        }
+
+        /* Insert 'rule' right before 'iter'. */
+        rculist_insert(&iter->node.list, &rule->node.list);
+    }
+
+    /* Update 'subtable's 'max_priority' and 'max_count', if necessary. */
+    if (n_rules == 1) {
+        subtable->max_priority = rule->node.priority;
+        subtable->max_count = 1;
+        pvector_insert(&cls->subtables, subtable, rule->node.priority);
+    } else {
+        if (rule->node.priority == subtable->max_priority) {
+            ++subtable->max_count;
+        } else if (rule->node.priority > subtable->max_priority) {
+            subtable->max_priority = rule->node.priority;
+            subtable->max_count = 1;
+            pvector_change_priority(&cls->subtables, subtable,
+                                    rule->node.priority);
+        }
     }
+
+    /* Nothing was replaced. */
+    cls->n_rules++;
     ovs_mutex_unlock(&cls->mutex);
-    return old_cls_rule;
+    return NULL;
 }
 
 /* Inserts 'rule' into 'cls'.  Until 'rule' is removed from 'cls', the caller
@@ -521,7 +624,11 @@ void
 classifier_insert(struct classifier *cls, struct cls_rule *rule)
 {
     struct cls_rule *displaced_rule = classifier_replace(cls, rule);
+#ifndef NDEBUG
     ovs_assert(!displaced_rule);
+#else
+    (void)displaced_rule;
+#endif
 }
 
 /* Removes 'rule' from 'cls'.  It is the caller's responsibility to destroy
@@ -538,11 +645,13 @@ classifier_remove(struct classifier *cls, struct cls_rule *rule)
 {
     struct cls_partition *partition;
     struct cls_match *cls_match;
-    struct cls_match *head;
     struct cls_subtable *subtable;
+    struct rculist *prev;
+    struct cls_rule *next;
     int i;
     uint32_t basis = 0, hash, ihash[CLS_MAX_INDICES];
     uint8_t prev_be32ofs = 0;
+    size_t n_rules;
 
     ovs_mutex_lock(&cls->mutex);
     cls_match = rule->cls_match;
@@ -550,10 +659,43 @@ classifier_remove(struct classifier *cls, struct cls_rule *rule)
         rule = NULL;
         goto unlock; /* Already removed. */
     }
+    rule->cls_match = NULL; /* Mark as removed. */
+
+    prev = rculist_back_protected(&rule->node.list);
+    next = next_rule_in_list_protected(rule);
+    /* Remove 'rule' from the 'cls->rules' list. */
+    rculist_remove(&rule->node.list);
 
     subtable = find_subtable(cls, &rule->match.mask);
     ovs_assert(subtable);
 
+    /* Check if this is NOT a head rule. */
+    if (prev != &subtable->rules_list.list &&
+        rule_lower_and_equal(CONTAINER_OF(prev, struct cls_rule, node.list),
+                             rule)) {
+        /* Not the highest priority rule, no need to check subtable's
+         * 'max_priority'. */
+        goto free;
+    }
+
+    for (i = 0; i < subtable->n_indices; i++) {
+        ihash[i] = minimatch_hash_range(&rule->match, prev_be32ofs,
+                                        subtable->index_ofs[i], &basis);
+        prev_be32ofs = subtable->index_ofs[i];
+    }
+    hash = minimatch_hash_range(&rule->match, prev_be32ofs, FLOW_U32S, &basis);
+
+    /* Head rule.  Check if 'next' is an identical, lower-priority rule that
+     * will replace 'rule' in the data structures. */
+    if (rule_lower_and_equal(rule, next)) {
+        subtable_replace_head_rule(cls, subtable, cls_match, next->cls_match,
+                                   hash, ihash);
+        goto check_priority;
+    }
+
+    /* 'rule' is last of the kind in the classifier, must remove from all the
+     * data structures. */
+
     if (subtable->ports_mask_len) {
         ovs_be32 masked_ports = minimatch_get_ports(&rule->match);
 
@@ -568,27 +710,10 @@ classifier_remove(struct classifier *cls, struct cls_rule *rule)
 
     /* Remove rule node from indices. */
     for (i = 0; i < subtable->n_indices; i++) {
-        ihash[i] = minimatch_hash_range(&rule->match, prev_be32ofs,
-                                        subtable->index_ofs[i], &basis);
         cmap_remove(&subtable->indices[i], &cls_match->index_nodes[i],
                     ihash[i]);
-        prev_be32ofs = subtable->index_ofs[i];
-    }
-    hash = minimatch_hash_range(&rule->match, prev_be32ofs, FLOW_U32S, &basis);
-
-    head = find_equal(subtable, &rule->match.flow, hash);
-    if (head != cls_match) {
-        list_remove(&cls_match->list);
-    } else if (list_is_empty(&cls_match->list)) {
-        cmap_remove(&subtable->rules, &cls_match->cmap_node, hash);
-    } else {
-        struct cls_match *next = CONTAINER_OF(cls_match->list.next,
-                                              struct cls_match, list);
-
-        list_remove(&cls_match->list);
-        cmap_replace(&subtable->rules, &cls_match->cmap_node,
-                     &next->cmap_node, hash);
     }
+    n_rules = cmap_remove(&subtable->rules, &cls_match->cmap_node, hash);
 
     partition = cls_match->partition;
     if (partition) {
@@ -601,30 +726,31 @@ classifier_remove(struct classifier *cls, struct cls_rule *rule)
         }
     }
 
-    if (--subtable->n_rules == 0) {
+    if (n_rules == 0) {
         destroy_subtable(cls, subtable);
-    } else if (subtable->max_priority == cls_match->priority
-               && --subtable->max_count == 0) {
-        /* Find the new 'max_priority' and 'max_count'. */
-        struct cls_match *head;
-        unsigned int max_priority = 0;
-
-        CMAP_FOR_EACH (head, cmap_node, &subtable->rules) {
-            if (head->priority > max_priority) {
-                max_priority = head->priority;
-                subtable->max_count = 1;
-            } else if (head->priority == max_priority) {
-                ++subtable->max_count;
+    } else {
+check_priority:
+        if (subtable->max_priority == rule->node.priority
+            && --subtable->max_count == 0) {
+            /* Find the new 'max_priority' and 'max_count'. */
+            struct cls_match *head;
+            unsigned int max_priority = 0;
+
+            CMAP_FOR_EACH (head, cmap_node, &subtable->rules) {
+                if (head->priority > max_priority) {
+                    max_priority = head->priority;
+                    subtable->max_count = 1;
+                } else if (head->priority == max_priority) {
+                    ++subtable->max_count;
+                }
             }
+            subtable->max_priority = max_priority;
+            pvector_change_priority(&cls->subtables, subtable, max_priority);
         }
-        subtable->max_priority = max_priority;
-        pvector_change_priority(&cls->subtables, subtable, max_priority);
     }
-
-    cls->n_rules--;
-
+free:
     ovsrcu_postpone(free, cls_match);
-    rule->cls_match = NULL;
+    cls->n_rules--;
 unlock:
     ovs_mutex_unlock(&cls->mutex);
 
@@ -662,7 +788,7 @@ trie_ctx_init(struct trie_ctx *ctx, const struct cls_trie *trie)
  * set of bits that were significant in the lookup.  At some point
  * earlier, 'wc' should have been initialized (e.g., by
  * flow_wildcards_init_catchall()). */
-struct cls_rule *
+const struct cls_rule *
 classifier_lookup(const struct classifier *cls, const struct flow *flow,
                   struct flow_wildcards *wc)
 {
@@ -710,7 +836,7 @@ classifier_lookup(const struct classifier *cls, const struct flow *flow,
     best = NULL;
     PVECTOR_FOR_EACH_PRIORITY(subtable, best_priority, 2,
                               sizeof(struct cls_subtable), &cls->subtables) {
-        struct cls_match *rule;
+        const struct cls_match *rule;
 
         if (!tag_intersects(tags, subtable->tag)) {
             continue;
@@ -729,48 +855,42 @@ classifier_lookup(const struct classifier *cls, const struct flow *flow,
 /* Finds and returns a rule in 'cls' with exactly the same priority and
  * matching criteria as 'target'.  Returns a null pointer if 'cls' doesn't
  * contain an exact match. */
-struct cls_rule *
+const struct cls_rule *
 classifier_find_rule_exactly(const struct classifier *cls,
                              const struct cls_rule *target)
-    OVS_EXCLUDED(cls->mutex)
 {
-    struct cls_match *head, *rule;
+    const struct cls_rule *rule;
+    const struct cls_match *head;
     struct cls_subtable *subtable;
 
-    ovs_mutex_lock(&cls->mutex);
     subtable = find_subtable(cls, &target->match.mask);
     if (!subtable) {
-        goto out;
-    }
-
-    /* Skip if there is no hope. */
-    if (target->priority > subtable->max_priority) {
-        goto out;
+        return NULL;
     }
 
     head = find_equal(subtable, &target->match.flow,
                       miniflow_hash_in_minimask(&target->match.flow,
                                                 &target->match.mask, 0));
+    if (!head) {
+        return NULL;
+    }
     FOR_EACH_RULE_IN_LIST (rule, head) {
-        if (target->priority >= rule->priority) {
-            ovs_mutex_unlock(&cls->mutex);
-            return target->priority == rule->priority ? rule->cls_rule : NULL;
+        if (target->node.priority >= rule->node.priority) {
+            return target->node.priority == rule->node.priority ? rule : NULL;
         }
     }
-out:
-    ovs_mutex_unlock(&cls->mutex);
     return NULL;
 }
 
 /* Finds and returns a rule in 'cls' with priority 'priority' and exactly the
  * same matching criteria as 'target'.  Returns a null pointer if 'cls' doesn't
  * contain an exact match. */
-struct cls_rule *
+const struct cls_rule *
 classifier_find_match_exactly(const struct classifier *cls,
                               const struct match *target,
                               unsigned int priority)
 {
-    struct cls_rule *retval;
+    const struct cls_rule *retval;
     struct cls_rule cr;
 
     cls_rule_init(&cr, target, priority);
@@ -782,42 +902,36 @@ classifier_find_match_exactly(const struct classifier *cls,
 
 /* Checks if 'target' would overlap any other rule in 'cls'.  Two rules are
  * considered to overlap if both rules have the same priority and a packet
- * could match both. */
+ * could match both.
+ *
+ * For example, if one rule matches only on the input port and another only
+ * on dl_type, and both have the same priority, then they overlap: a packet
+ * that arrives on that port with that dl_type matches both, even though the
+ * two rules do not match on any field in common. */
 bool
 classifier_rule_overlaps(const struct classifier *cls,
                          const struct cls_rule *target)
-    OVS_EXCLUDED(cls->mutex)
 {
     struct cls_subtable *subtable;
-    int64_t stop_at_priority = (int64_t)target->priority - 1;
+    int64_t stop_at_priority = (int64_t)target->node.priority - 1;
 
-    ovs_mutex_lock(&cls->mutex);
     /* Iterate subtables in the descending max priority order. */
     PVECTOR_FOR_EACH_PRIORITY (subtable, stop_at_priority, 2,
                                sizeof(struct cls_subtable), &cls->subtables) {
         uint32_t storage[FLOW_U32S];
         struct minimask mask;
-        struct cls_match *head;
+        const struct cls_rule *rule;
 
         minimask_combine(&mask, &target->match.mask, &subtable->mask, storage);
-        CMAP_FOR_EACH (head, cmap_node, &subtable->rules) {
-            struct cls_match *rule;
 
-            FOR_EACH_RULE_IN_LIST (rule, head) {
-                if (rule->priority < target->priority) {
-                    break; /* Rules in descending priority order. */
-                }
-                if (rule->priority == target->priority
-                    && miniflow_equal_in_minimask(&target->match.flow,
-                                                  &rule->flow, &mask)) {
-                    ovs_mutex_unlock(&cls->mutex);
-                    return true;
-                }
+        RCULIST_FOR_EACH (rule, node.list, &subtable->rules_list.list) {
+            if (rule->node.priority == target->node.priority
+                && miniflow_equal_in_minimask(&target->match.flow,
+                                              &rule->match.flow, &mask)) {
+                return true;
             }
         }
     }
-
-    ovs_mutex_unlock(&cls->mutex);
     return false;
 }
 
@@ -853,7 +967,7 @@ classifier_rule_overlaps(const struct classifier *cls,
  * This is the matching rule used by OpenFlow 1.0 non-strict OFPT_FLOW_MOD
  * commands and by OpenFlow 1.0 aggregate and flow stats.
  *
- * Ignores rule->priority. */
+ * Ignores rule->node.priority. */
 bool
 cls_rule_is_loose_match(const struct cls_rule *rule,
                         const struct minimatch *criteria)
@@ -866,24 +980,23 @@ cls_rule_is_loose_match(const struct cls_rule *rule,
 /* Iteration. */
 
 static bool
-rule_matches(const struct cls_match *rule, const struct cls_rule *target)
+rule_matches(const struct cls_rule *rule, const struct cls_rule *target)
 {
     return (!target
-            || miniflow_equal_in_minimask(&rule->flow,
+            || miniflow_equal_in_minimask(&rule->match.flow,
                                           &target->match.flow,
                                           &target->match.mask));
 }
 
-static struct cls_match *
+static struct cls_rule *
 search_subtable(const struct cls_subtable *subtable,
                 struct cls_cursor *cursor)
 {
     if (!cursor->target
         || !minimask_has_extra(&subtable->mask, &cursor->target->match.mask)) {
-        struct cls_match *rule;
+        struct cls_rule *rule;
 
-        CMAP_CURSOR_FOR_EACH (rule, cmap_node, &cursor->rules,
-                              &subtable->rules) {
+        RCULIST_FOR_EACH (rule, node.list, &subtable->rules_list.list) {
             if (rule_matches(rule, cursor->target)) {
                 return rule;
             }
@@ -900,69 +1013,56 @@ search_subtable(const struct cls_subtable *subtable,
  *     - If 'target' is nonnull, the cursor will visit each 'rule' in 'cls'
  *       such that cls_rule_is_loose_match(rule, target) returns true.
  *
- * Ignores target->priority. */
+ * Ignores target->node.priority. */
 struct cls_cursor cls_cursor_start(const struct classifier *cls,
-                                   const struct cls_rule *target,
-                                   bool safe)
-    OVS_NO_THREAD_SAFETY_ANALYSIS
+                                   const struct cls_rule *target)
 {
     struct cls_cursor cursor;
     struct cls_subtable *subtable;
 
-    cursor.safe = safe;
     cursor.cls = cls;
     cursor.target = target && !cls_rule_is_catchall(target) ? target : NULL;
     cursor.rule = NULL;
 
     /* Find first rule. */
-    ovs_mutex_lock(&cursor.cls->mutex);
-    CMAP_CURSOR_FOR_EACH (subtable, cmap_node, &cursor.subtables,
-                          &cursor.cls->subtables_map) {
-        struct cls_match *rule = search_subtable(subtable, &cursor);
+    PVECTOR_CURSOR_FOR_EACH (subtable, &cursor.subtables,
+                             &cursor.cls->subtables) {
+        struct cls_rule *rule = search_subtable(subtable, &cursor);
 
         if (rule) {
             cursor.subtable = subtable;
-            cursor.rule = rule->cls_rule;
+            cursor.rule = rule;
             break;
         }
     }
 
-    /* Leave locked if requested and have a rule. */
-    if (safe || !cursor.rule) {
-        ovs_mutex_unlock(&cursor.cls->mutex);
-    }
     return cursor;
 }
 
-static struct cls_rule *
+static const struct cls_rule *
 cls_cursor_next(struct cls_cursor *cursor)
-    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
-    struct cls_match *rule = cursor->rule->cls_match;
+    const struct cls_rule *rule, *next;
     const struct cls_subtable *subtable;
-    struct cls_match *next;
 
+    rule = cursor->rule;
     next = next_rule_in_list__(rule);
-    if (next->priority < rule->priority) {
-        return next->cls_rule;
+    if (rule_lower_and_equal(rule, next)) {
+        return next;   /* Identical rule (but different priority). */
     }
 
-    /* 'next' is the head of the list, that is, the rule that is included in
-     * the subtable's map.  (This is important when the classifier contains
-     * rules that differ only in priority.) */
-    rule = next;
-    CMAP_CURSOR_FOR_EACH_CONTINUE (rule, cmap_node, &cursor->rules) {
+    subtable = cursor->subtable;
+    RCULIST_FOR_EACH_CONTINUE (rule, node.list, &subtable->rules_list.list) {
         if (rule_matches(rule, cursor->target)) {
-            return rule->cls_rule;
+            return rule;
         }
     }
 
-    subtable = cursor->subtable;
-    CMAP_CURSOR_FOR_EACH_CONTINUE (subtable, cmap_node, &cursor->subtables) {
+    PVECTOR_CURSOR_FOR_EACH_CONTINUE (subtable, &cursor->subtables) {
         rule = search_subtable(subtable, cursor);
         if (rule) {
             cursor->subtable = subtable;
-            return rule->cls_rule;
+            return rule;
         }
     }
 
@@ -973,20 +1073,12 @@ cls_cursor_next(struct cls_cursor *cursor)
  * or to null if all matching rules have been visited. */
 void
 cls_cursor_advance(struct cls_cursor *cursor)
-    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
-    if (cursor->safe) {
-        ovs_mutex_lock(&cursor->cls->mutex);
-    }
     cursor->rule = cls_cursor_next(cursor);
-    if (cursor->safe || !cursor->rule) {
-        ovs_mutex_unlock(&cursor->cls->mutex);
-    }
 }
 
 static struct cls_subtable *
 find_subtable(const struct classifier *cls, const struct minimask *mask)
-    OVS_REQUIRES(cls->mutex)
 {
     struct cls_subtable *subtable;
 
@@ -1058,11 +1150,17 @@ insert_subtable(struct classifier *cls, const struct minimask *mask)
     subtable->ports_mask_len
         = 32 - ctz32(ntohl(MINIFLOW_GET_BE32(&mask->masks, tp_src)));
 
+    /* List of rules. */
+    rculist_init(&subtable->rules_list.list);
+    /* No rule has higher priority. */
+    subtable->rules_list.priority = UINT_MAX;
+
     cmap_insert(&cls->subtables_map, &subtable->cmap_node, hash);
 
     return subtable;
 }
 
+/* RCU readers may still access the subtable before it is actually freed. */
 static void
 destroy_subtable(struct classifier *cls, struct cls_subtable *subtable)
     OVS_REQUIRES(cls->mutex)
@@ -1070,14 +1168,16 @@ destroy_subtable(struct classifier *cls, struct cls_subtable *subtable)
     int i;
 
     pvector_remove(&cls->subtables, subtable);
-    trie_destroy(&subtable->ports_trie);
-
+    cmap_remove(&cls->subtables_map, &subtable->cmap_node,
+                minimask_hash(&subtable->mask, 0));
+#ifndef NDEBUG
+    ovs_assert(ovsrcu_get_protected(struct trie_node *, &subtable->ports_trie)
+               == NULL);
+    ovs_assert(rculist_is_empty(&subtable->rules_list.list));
+#endif
     for (i = 0; i < subtable->n_indices; i++) {
         cmap_destroy(&subtable->indices[i]);
     }
-    cmap_remove(&cls->subtables_map, &subtable->cmap_node,
-                minimask_hash(&subtable->mask, 0));
-    minimask_destroy(&subtable->mask);
     cmap_destroy(&subtable->rules);
     ovsrcu_postpone(free, subtable);
 }
@@ -1176,7 +1276,7 @@ miniflow_and_mask_matches_flow(const struct miniflow *flow,
     return true;
 }
 
-static inline struct cls_match *
+static inline const struct cls_match *
 find_match(const struct cls_subtable *subtable, const struct flow *flow,
            uint32_t hash)
 {
@@ -1238,13 +1338,13 @@ fill_range_wc(const struct cls_subtable *subtable, struct flow_wildcards *wc,
     }
 }
 
-static struct cls_match *
+static const struct cls_match *
 find_match_wc(const struct cls_subtable *subtable, const struct flow *flow,
               struct trie_ctx trie_ctx[CLS_MAX_TRIES], unsigned int n_tries,
               struct flow_wildcards *wc)
 {
     uint32_t basis = 0, hash;
-    struct cls_match *rule = NULL;
+    const struct cls_match *rule = NULL;
     int i;
     struct range ofs;
 
@@ -1339,128 +1439,6 @@ find_equal(struct cls_subtable *subtable, const struct miniflow *flow,
     }
     return NULL;
 }
-
-/*
- * As the readers are operating concurrently with the modifications, a
- * concurrent reader may or may not see the new rule, depending on how
- * the concurrent events overlap with each other.  This is no
- * different from the former locked behavior, but there the visibility
- * of the new rule only depended on the timing of the locking
- * functions.
- *
- * The new rule is first added to the segment indices, so the readers
- * may find the rule in the indices before the rule is visible in the
- * subtables 'rules' map.  This may result in us losing the
- * opportunity to quit lookups earlier, resulting in sub-optimal
- * wildcarding.  This will be fixed by forthcoming revalidation always
- * scheduled after flow table changes.
- *
- * Similar behavior may happen due to us removing the overlapping rule
- * (if any) from the indices only after the new rule has been added.
- *
- * The subtable's max priority is updated only after the rule is
- * inserted, so the concurrent readers may not see the rule, as the
- * updated priority ordered subtable list will only be visible after
- * the subtable's max priority is updated.
- *
- * Similarly, the classifier's partitions for new rules are updated by
- * the caller after this function, so the readers may keep skipping
- * the subtable until they see the updated partitions.
- */
-static struct cls_match *
-insert_rule(struct classifier *cls, struct cls_subtable *subtable,
-            struct cls_rule *new_rule)
-    OVS_REQUIRES(cls->mutex)
-{
-    struct cls_match *old = NULL;
-    struct cls_match *new = cls_match_alloc(new_rule);
-    struct cls_match *head;
-    int i;
-    uint32_t basis = 0, hash, ihash[CLS_MAX_INDICES];
-    uint8_t prev_be32ofs = 0;
-
-    /* Add new node to segment indices. */
-    for (i = 0; i < subtable->n_indices; i++) {
-        ihash[i] = minimatch_hash_range(&new_rule->match, prev_be32ofs,
-                                        subtable->index_ofs[i], &basis);
-        cmap_insert(&subtable->indices[i], &new->index_nodes[i], ihash[i]);
-        prev_be32ofs = subtable->index_ofs[i];
-    }
-    hash = minimatch_hash_range(&new_rule->match, prev_be32ofs, FLOW_U32S,
-                                &basis);
-    head = find_equal(subtable, &new_rule->match.flow, hash);
-    if (!head) {
-        cmap_insert(&subtable->rules, &new->cmap_node, hash);
-        list_init(&new->list);
-        goto out;
-    } else {
-        /* Scan the list for the insertion point that will keep the list in
-         * order of decreasing priority. */
-        struct cls_match *rule;
-
-        FOR_EACH_RULE_IN_LIST (rule, head) {
-            if (new->priority >= rule->priority) {
-                if (rule == head) {
-                    /* 'new' is the new highest-priority flow in the list. */
-                    cmap_replace(&subtable->rules, &rule->cmap_node,
-                                 &new->cmap_node, hash);
-                }
-
-                if (new->priority == rule->priority) {
-                    list_replace(&new->list, &rule->list);
-                    old = rule;
-                } else {
-                    list_insert(&rule->list, &new->list);
-                }
-                goto out;
-            }
-        }
-
-        /* Insert 'new' at the end of the list. */
-        list_push_back(&head->list, &new->list);
-    }
-
- out:
-    if (!old) {
-        subtable->n_rules++;
-
-        /* Rule was added, not replaced.  Update 'subtable's 'max_priority'
-         * and 'max_count', if necessary. */
-        if (subtable->n_rules == 1) {
-            subtable->max_priority = new->priority;
-            subtable->max_count = 1;
-            pvector_insert(&cls->subtables, subtable, new->priority);
-        } else if (subtable->max_priority == new->priority) {
-            ++subtable->max_count;
-        } else if (new->priority > subtable->max_priority) {
-            subtable->max_priority = new->priority;
-            subtable->max_count = 1;
-            pvector_change_priority(&cls->subtables, subtable, new->priority);
-        }
-    } else {
-        /* Remove old node from indices. */
-        for (i = 0; i < subtable->n_indices; i++) {
-            cmap_remove(&subtable->indices[i], &old->index_nodes[i], ihash[i]);
-        }
-    }
-    return old;
-}
-
-static struct cls_match *
-next_rule_in_list__(struct cls_match *rule)
-    OVS_NO_THREAD_SAFETY_ANALYSIS
-{
-    struct cls_match *next = NULL;
-    next = OBJECT_CONTAINING(rule->list.next, next, list);
-    return next;
-}
-
-static struct cls_match *
-next_rule_in_list(struct cls_match *rule)
-{
-    struct cls_match *next = next_rule_in_list__(rule);
-    return next->priority < rule->priority ? next : NULL;
-}
 
 /* A longest-prefix match tree. */
 
diff --git a/lib/classifier.h b/lib/classifier.h
index c910ac4..f520bc1 100644
--- a/lib/classifier.h
+++ b/lib/classifier.h
@@ -218,6 +218,7 @@
 #include "meta-flow.h"
 #include "ovs-thread.h"
 #include "pvector.h"
+#include "rculist.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -241,6 +242,21 @@ enum {
     CLS_MAX_TRIES = 3      /* Maximum number of prefix trees per classifier. */
 };
 
+/* Node in a subtable's list of rules.  A rule is immediately followed by any
+ * lower-priority rules with identical match criteria; otherwise the list is
+ * unordered.  The list head's 'priority' is UINT_MAX, which rules may share. */
+struct cls_list {
+    struct rculist list;         /* Link in the list, or the list head. */
+    unsigned int priority;       /* Larger numbers are higher priorities. */
+};
+
+/* A rule to be inserted into a classifier. */
+struct cls_rule {
+    struct cls_list node;        /* Priority; subtable 'rules_list' node. */
+    struct cls_match *cls_match OVS_GUARDED; /* NULL if not in a classifier. */
+    struct minimatch match;      /* Matching rule. */
+};
+
 /* A flow classifier. */
 struct classifier {
     struct ovs_mutex mutex;
@@ -255,13 +271,6 @@ struct classifier {
     unsigned int n_tries;
 };
 
-/* A rule to be inserted to the classifier. */
-struct cls_rule {
-    struct minimatch match;      /* Matching rule. */
-    unsigned int priority;       /* Larger numbers are higher priorities. */
-    struct cls_match *cls_match; /* NULL if rule is not in a classifier. */
-};
-
 void cls_rule_init(struct cls_rule *, const struct match *,
                    unsigned int priority);
 void cls_rule_init_from_minimatch(struct cls_rule *, const struct minimatch *,
@@ -292,61 +301,53 @@ void classifier_insert(struct classifier *, struct cls_rule *);
 struct cls_rule *classifier_replace(struct classifier *, struct cls_rule *);
 
 struct cls_rule *classifier_remove(struct classifier *, struct cls_rule *);
-struct cls_rule *classifier_lookup(const struct classifier *,
-                                   const struct flow *,
-                                   struct flow_wildcards *);
+const struct cls_rule *classifier_lookup(const struct classifier *,
+                                         const struct flow *,
+                                         struct flow_wildcards *);
 bool classifier_rule_overlaps(const struct classifier *,
                               const struct cls_rule *);
 
-struct cls_rule *classifier_find_rule_exactly(const struct classifier *,
-                                              const struct cls_rule *);
+const struct cls_rule *classifier_find_rule_exactly(const struct classifier *,
+                                                    const struct cls_rule *);
 
-struct cls_rule *classifier_find_match_exactly(const struct classifier *,
-                                               const struct match *,
-                                               unsigned int priority);
+const struct cls_rule *classifier_find_match_exactly(const struct classifier *,
+                                                     const struct match *,
+                                                     unsigned int priority);
 
-/* Iteration. */
-
+/* Iteration.
+ *
+ * Iteration is lockless and RCU-protected.  Other threads may modify the
+ * classifier concurrently without disrupting an iteration in progress.  Such
+ * modifications may or may not be visible to the iterator, but every rule
+ * that is not modified during the iteration is visited.  The iterating
+ * thread may itself modify the classifier, e.g., remove the current rule.
+ *
+ * CLS_FOR_EACH_TARGET visits only the rules that loosely match 'TARGET' (see
+ * cls_rule_is_loose_match()).  Rather than testing every rule, it skips each
+ * subtable whose mask is less specific than 'TARGET' without scanning its
+ * rules. */
 struct cls_cursor {
     const struct classifier *cls;
     const struct cls_subtable *subtable;
     const struct cls_rule *target;
-    struct cmap_cursor subtables;
-    struct cmap_cursor rules;
-    struct cls_rule *rule;
-    bool safe;
+    struct pvector_cursor subtables; /* Position in 'cls->subtables'. */
+    const struct cls_rule *rule;     /* Current rule, or NULL when done. */
 };
 
-/* Iteration requires mutual exclusion of writers.  We do this by taking
- * a mutex for the duration of the iteration, except for the
- * 'SAFE' variant, where we release the mutex for the body of the loop. */
 struct cls_cursor cls_cursor_start(const struct classifier *cls,
-                                   const struct cls_rule *target,
-                                   bool safe);
-
+                                   const struct cls_rule *target);
 void cls_cursor_advance(struct cls_cursor *);
 
-#define CLS_FOR_EACH(RULE, MEMBER, CLS) \
+#define CLS_FOR_EACH(RULE, MEMBER, CLS)             \
     CLS_FOR_EACH_TARGET(RULE, MEMBER, CLS, NULL)
 #define CLS_FOR_EACH_TARGET(RULE, MEMBER, CLS, TARGET)                  \
-    for (struct cls_cursor cursor__ = cls_cursor_start(CLS, TARGET, false); \
-         (cursor__.rule                                                 \
-          ? (INIT_CONTAINER(RULE, cursor__.rule, MEMBER),               \
-             true)                                                      \
-          : false);                                                     \
-         cls_cursor_advance(&cursor__))
-
-/* These forms allows classifier_remove() to be called within the loop. */
-#define CLS_FOR_EACH_SAFE(RULE, MEMBER, CLS) \
-    CLS_FOR_EACH_TARGET_SAFE(RULE, MEMBER, CLS, NULL)
-#define CLS_FOR_EACH_TARGET_SAFE(RULE, MEMBER, CLS, TARGET)             \
-    for (struct cls_cursor cursor__ = cls_cursor_start(CLS, TARGET, true); \
+    for (struct cls_cursor cursor__ = cls_cursor_start(CLS, TARGET);    \
          (cursor__.rule                                                 \
           ? (INIT_CONTAINER(RULE, cursor__.rule, MEMBER),               \
              cls_cursor_advance(&cursor__),                             \
              true)                                                      \
           : false);                                                     \
-        )                                                               \
+        )
 
 #ifdef __cplusplus
 }
diff --git a/lib/flow.c b/lib/flow.c
index 3935ea6..af96195 100644
--- a/lib/flow.c
+++ b/lib/flow.c
@@ -1988,8 +1988,8 @@ miniflow_equal(const struct miniflow *a, const struct miniflow *b)
     return true;
 }
 
-/* Returns true if 'a' and 'b' are equal at the places where there are 1-bits
- * in 'mask', false if they differ. */
+/* Returns false if 'a' and 'b' differ at the places where there are 1-bits
+ * in 'mask', true otherwise. */
 bool
 miniflow_equal_in_minimask(const struct miniflow *a, const struct miniflow *b,
                            const struct minimask *mask)
diff --git a/lib/pvector.h b/lib/pvector.h
index 61d71b9..2d646af 100644
--- a/lib/pvector.h
+++ b/lib/pvector.h
@@ -150,6 +150,13 @@ static inline void pvector_cursor_lookahead(const struct pvector_cursor *,
     for (struct pvector_cursor cursor__ = pvector_cursor_init(PVECTOR, N, SZ); \
          ((PTR) = pvector_cursor_next(&cursor__, PRIORITY, N, SZ)) != NULL; )
 
+/* Iterates over the entries of PVECTOR, keeping the state in '*CURSOR'. */
+#define PVECTOR_CURSOR_FOR_EACH(PTR, CURSOR, PVECTOR)                \
+    for (*(CURSOR) = pvector_cursor_init(PVECTOR, 0, 0);             \
+         ((PTR) = pvector_cursor_next(CURSOR, -1, 0, 0)) != NULL; )
+/* Resumes an iteration begun by PVECTOR_CURSOR_FOR_EACH on '*CURSOR'. */
+#define PVECTOR_CURSOR_FOR_EACH_CONTINUE(PTR, CURSOR)                   \
+    for (; ((PTR) = pvector_cursor_next(CURSOR, -1, 0, 0)) != NULL; )
 
 /* Inline implementations. */
 
diff --git a/ofproto/connmgr.c b/ofproto/connmgr.c
index 627f326..f6e73e8 100644
--- a/ofproto/connmgr.c
+++ b/ofproto/connmgr.c
@@ -2165,7 +2165,7 @@ ofmonitor_report(struct connmgr *mgr, struct rule *rule,
                 fu.cookie = rule->flow_cookie;
                 minimatch_expand(&rule->cr.match, &match);
                 fu.match = &match;
-                fu.priority = rule->cr.priority;
+                fu.priority = rule->cr.node.priority;
 
                 ovs_mutex_lock(&rule->mutex);
                 fu.idle_timeout = rule->idle_timeout;
diff --git a/ofproto/fail-open.h b/ofproto/fail-open.h
index 725b82d..a9381c9 100644
--- a/ofproto/fail-open.h
+++ b/ofproto/fail-open.h
@@ -36,7 +36,7 @@ struct ofproto;
 static inline bool
 is_fail_open_rule(const struct rule *rule)
 {
-    return rule->cr.priority == FAIL_OPEN_PRIORITY;
+    return rule->cr.node.priority == FAIL_OPEN_PRIORITY;
 }
 
 struct fail_open *fail_open_create(struct ofproto *, struct connmgr *);
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index d965d38..1dfae96 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -1375,7 +1375,7 @@ destruct(struct ofproto *ofproto_)
     hmap_remove(&all_ofproto_dpifs, &ofproto->all_ofproto_dpifs_node);
 
     OFPROTO_FOR_EACH_TABLE (table, &ofproto->up) {
-        CLS_FOR_EACH_SAFE (rule, up.cr, &table->cls) {
+        CLS_FOR_EACH (rule, up.cr, &table->cls) {
             ofproto_rule_delete(&ofproto->up, &rule->up);
         }
     }
diff --git a/ofproto/ofproto-provider.h b/ofproto/ofproto-provider.h
index 158f86e..80a5001 100644
--- a/ofproto/ofproto-provider.h
+++ b/ofproto/ofproto-provider.h
@@ -1693,7 +1693,7 @@ rule_get_actions(const struct rule *rule)
 static inline bool
 rule_is_table_miss(const struct rule *rule)
 {
-    return rule->cr.priority == 0 && cls_rule_is_catchall(&rule->cr);
+    return rule->cr.node.priority == 0 && cls_rule_is_catchall(&rule->cr);
 }
 
 /* Returns true if 'rule' should be hidden from the controller.
@@ -1704,7 +1704,7 @@ rule_is_table_miss(const struct rule *rule)
 static inline bool
 rule_is_hidden(const struct rule *rule)
 {
-    return rule->cr.priority > UINT16_MAX;
+    return rule->cr.node.priority > UINT16_MAX;
 }
 
 static inline struct rule *
diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
index b8f0e62..cd3d04d 100644
--- a/ofproto/ofproto.c
+++ b/ofproto/ofproto.c
@@ -1376,7 +1376,7 @@ ofproto_flush__(struct ofproto *ofproto)
             continue;
         }
 
-        CLS_FOR_EACH_SAFE (rule, cr, &table->cls) {
+        CLS_FOR_EACH (rule, cr, &table->cls) {
             ofproto_rule_delete__(rule, OFPRR_DELETE);
         }
     }
@@ -3822,7 +3822,7 @@ handle_flow_stats_request(struct ofconn *ofconn,
         minimatch_expand(&rule->cr.match, &fs.match);
         fs.table_id = rule->table_id;
         calc_duration(created, now, &fs.duration_sec, &fs.duration_nsec);
-        fs.priority = rule->cr.priority;
+        fs.priority = rule->cr.node.priority;
         fs.idle_age = age_secs(now - used);
         fs.hard_age = age_secs(now - modified);
         fs.ofpacts = actions->ofpacts;
@@ -4580,7 +4580,7 @@ ofproto_rule_send_removed(struct rule *rule, uint8_t reason)
     }
 
     minimatch_expand(&rule->cr.match, &fr.match);
-    fr.priority = rule->cr.priority;
+    fr.priority = rule->cr.node.priority;
     fr.cookie = rule->flow_cookie;
     fr.reason = reason;
     fr.table_id = rule->table_id;
@@ -4908,7 +4908,7 @@ ofproto_compose_flow_refresh_update(const struct rule *rule,
     fu.cookie = rule->flow_cookie;
     minimatch_expand(&rule->cr.match, &match);
     fu.match = &match;
-    fu.priority = rule->cr.priority;
+    fu.priority = rule->cr.node.priority;
 
     actions = flags & NXFMF_ACTIONS ? rule_get_actions(rule) : NULL;
     fu.ofpacts = actions ? actions->ofpacts : NULL;
diff --git a/tests/test-classifier.c b/tests/test-classifier.c
index 048aaa0..9798a85 100644
--- a/tests/test-classifier.c
+++ b/tests/test-classifier.c
@@ -158,7 +158,7 @@ tcls_insert(struct tcls *tcls, const struct test_rule *rule)
             ovsrcu_postpone(free_rule, tcls->rules[i]);
             tcls->rules[i] = clone_rule(rule);
             return tcls->rules[i];
-        } else if (pos->priority < rule->cls_rule.priority) {
+        } else if (pos->node.priority < rule->cls_rule.node.priority) {
             break;
         }
     }
@@ -406,7 +406,7 @@ compare_classifiers(struct classifier *cls, struct tcls *tcls)
 
     assert(classifier_count(cls) == tcls->n_rules);
     for (i = 0; i < confidence; i++) {
-        struct cls_rule *cr0, *cr1, *cr2;
+        const struct cls_rule *cr0, *cr1, *cr2;
         struct flow flow;
         struct flow_wildcards wc;
         unsigned int x;
@@ -454,7 +454,7 @@ destroy_classifier(struct classifier *cls)
 {
     struct test_rule *rule;
 
-    CLS_FOR_EACH_SAFE (rule, cls_rule, cls) {
+    CLS_FOR_EACH (rule, cls_rule, cls) {
         if (classifier_remove(cls, &rule->cls_rule)) {
             ovsrcu_postpone(free_rule, rule);
         }
@@ -547,13 +547,13 @@ check_tables(const struct classifier *cls, int n_tables, int n_rules,
 
         ovs_mutex_lock(&cls->mutex);
         assert(trie_verify(&table->ports_trie, 0, table->ports_mask_len)
-               == (table->ports_mask_len ? table->n_rules : 0));
+               == (table->ports_mask_len ? cmap_count(&table->rules) : 0));
         ovs_mutex_unlock(&cls->mutex);
 
         found_tables++;
         CMAP_FOR_EACH (head, cmap_node, &table->rules) {
             unsigned int prev_priority = UINT_MAX;
-            const struct cls_match *rule;
+            const struct cls_rule *rule;
 
             if (head->priority > max_priority) {
                 max_priority = head->priority;
@@ -563,18 +563,18 @@ check_tables(const struct classifier *cls, int n_tables, int n_rules,
             }
 
             found_rules++;
+
             ovs_mutex_lock(&cls->mutex);
-            LIST_FOR_EACH (rule, list, &head->list) {
-                assert(rule->priority < prev_priority);
-                assert(rule->priority <= table->max_priority);
-
-                prev_priority = rule->priority;
-                found_rules++;
-                found_dups++;
-                ovs_mutex_unlock(&cls->mutex);
-                assert(classifier_find_rule_exactly(cls, rule->cls_rule)
-                       == rule->cls_rule);
-                ovs_mutex_lock(&cls->mutex);
+            FOR_EACH_RULE_IN_LIST (rule, head) {
+                if (rule != head->cls_rule) {
+                    assert(rule->node.priority < prev_priority);
+                    assert(rule->node.priority <= table->max_priority);
+
+                    prev_priority = rule->node.priority;
+                    found_rules++;
+                    found_dups++;
+                    assert(classifier_find_rule_exactly(cls, rule) == rule);
+                }
             }
             ovs_mutex_unlock(&cls->mutex);
         }
@@ -908,7 +908,8 @@ test_many_rules_in_one_list (int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
                         int k = pri_rules[pris[j]];
                         assert(displaced_rule != NULL);
                         assert(displaced_rule != rules[j]);
-                        assert(pris[j] == displaced_rule->cls_rule.priority);
+                        assert(pris[j] ==
+                               displaced_rule->cls_rule.node.priority);
                         tcls_rules[k] = NULL;
                     } else {
                         assert(displaced_rule == NULL);
@@ -1075,8 +1076,7 @@ test_many_rules_in_n_tables(int n_tables)
 
             target = clone_rule(tcls.rules[random_range(tcls.n_rules)]);
 
-            CLS_FOR_EACH_TARGET_SAFE (rule, cls_rule, &cls,
-                                      &target->cls_rule) {
+            CLS_FOR_EACH_TARGET (rule, cls_rule, &cls, &target->cls_rule) {
                 if (classifier_remove(&cls, &rule->cls_rule)) {
                     ovsrcu_postpone(free_rule, rule);
                 }
diff --git a/utilities/ovs-ofctl.c b/utilities/ovs-ofctl.c
index ae8d59d..7023cd1 100644
--- a/utilities/ovs-ofctl.c
+++ b/utilities/ovs-ofctl.c
@@ -2330,9 +2330,9 @@ fte_free_all(struct classifier *cls)
 {
     struct fte *fte;
 
-    CLS_FOR_EACH_SAFE (fte, rule, cls) {
+    CLS_FOR_EACH (fte, rule, cls) {
         classifier_remove(cls, &fte->rule);
-        fte_free(fte);
+        ovsrcu_postpone(fte_free, fte);
     }
     classifier_destroy(cls);
 }
@@ -2354,10 +2354,10 @@ fte_insert(struct classifier *cls, const struct match *match,
 
     old = fte_from_cls_rule(classifier_replace(cls, &fte->rule));
     if (old) {
-        fte_version_free(old->versions[index]);
         fte->versions[!index] = old->versions[!index];
-        cls_rule_destroy(&old->rule);
-        free(old);
+        old->versions[!index] = NULL;
+
+        ovsrcu_postpone(fte_free, old);
     }
 }
 
@@ -2516,7 +2516,7 @@ fte_make_flow_mod(const struct fte *fte, int index, uint16_t command,
     struct ofpbuf *ofm;
 
     minimatch_expand(&fte->rule.match, &fm.match);
-    fm.priority = fte->rule.priority;
+    fm.priority = fte->rule.node.priority;
     fm.cookie = htonll(0);
     fm.cookie_mask = htonll(0);
     fm.new_cookie = version->cookie;
-- 
1.7.10.4




More information about the dev mailing list