[ovs-dev] [PATCH v5 2/2] dpif-netdev: Exact match cache

Daniele Di Proietto ddiproietto at vmware.com
Fri Aug 29 23:06:43 UTC 2014


Since lookups in the classifier can be pretty expensive, we introduce this
(thread-local) cache, which simply compares the miniflows of the packets.

Signed-off-by: Daniele Di Proietto <ddiproietto at vmware.com>
---
 lib/dpif-netdev.c | 447 +++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 374 insertions(+), 73 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index fbb9244..81857ce 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -87,6 +87,66 @@ static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
 
 static struct vlog_rate_limit upcall_rl = VLOG_RATE_LIMIT_INIT(600, 600);
 
+/* Stores a miniflow */
+
+/* There are fields in the flow structure that we never use. Therefore we can
+ * save a few words of memory */
+#define NETDEV_KEY_BUF_SIZE_U32 (FLOW_U32S                 \
+                                 - MINI_N_INLINE           \
+                                 - FLOW_U32_SIZE(regs)     \
+                                 - FLOW_U32_SIZE(metadata) \
+                                )
+struct netdev_flow_key {
+    struct miniflow flow;
+    uint32_t buf[NETDEV_KEY_BUF_SIZE_U32];
+};
+
+/* Exact match cache for frequently used flows
+ *
+ * The cache uses a 32-bit hash of the packet (which can be the RSS hash) to
+ * search its entries for a miniflow that matches exactly the miniflow of the
+ * packet. It stores the 'dp_netdev_flow' that matches the miniflow.
+ *
+ * A cache entry holds a reference to its 'dp_netdev_flow'.
+ *
+ * A miniflow with a given hash can be in one of EM_FLOW_HASH_SEGS different
+ * entries. The 32-bit hash is split into EM_FLOW_HASH_SEGS values (each of
+ * them is EM_FLOW_HASH_SHIFT bits wide and the remainder is thrown away). Each
+ * value is the index of a cache entry where the miniflow could be.
+ *
+ *
+ * Thread-safety
+ * =============
+ *
+ * Each pmd_thread has its own private exact match cache.
+ * If dp_netdev_input is not called from a pmd thread, a mutex is used.
+ */
+
+#define EM_FLOW_HASH_SHIFT 10
+#define EM_FLOW_HASH_ENTRIES (1u << EM_FLOW_HASH_SHIFT)
+#define EM_FLOW_HASH_MASK (EM_FLOW_HASH_ENTRIES - 1)
+#define EM_FLOW_HASH_SEGS 2
+
+#define EM_BATCH_SIZE 4
+
+struct emc_entry {
+    uint32_t hash;
+    struct netdev_flow_key mf;
+    struct dp_netdev_flow *flow;
+};
+
+struct emc_cache {
+    struct emc_entry entries[EM_FLOW_HASH_ENTRIES];
+};
+
+/* Iterate in the exact match cache through every entry that might contain a
+ * miniflow with hash 'HASH'. */
+#define EMC_FOR_EACH_POS_WITH_HASH(EMC, CURRENT_ENTRY, HASH)                 \
+    for (uint32_t i__ = 0, srch_hash__ = (HASH);                             \
+         (CURRENT_ENTRY) = &(EMC)->entries[srch_hash__ & EM_FLOW_HASH_MASK], \
+         i__ < EM_FLOW_HASH_SEGS;                                            \
+         i__++, srch_hash__ >>= EM_FLOW_HASH_SHIFT)
+
 /* Datapath based on the network device interface from netdev.h.
  *
  *
@@ -100,6 +160,7 @@ static struct vlog_rate_limit upcall_rl = VLOG_RATE_LIMIT_INIT(600, 600);
  *
  *    dp_netdev_mutex (global)
  *    port_mutex
+ *    emc_mutex
  *    flow_mutex
  */
 struct dp_netdev {
@@ -141,6 +202,12 @@ struct dp_netdev {
     struct pmd_thread *pmd_threads;
     size_t n_pmd_threads;
     int pmd_count;
+
+    /* Exact match cache for non-pmd devices.
+     * Pmd devices instead use each thread's flow_cache for this purpose.
+     * Protected by emc_mutex. */
+    struct emc_cache flow_cache OVS_GUARDED;
+    struct ovs_mutex emc_mutex;
 };
 
 static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
@@ -173,20 +240,6 @@ struct dp_netdev_port {
     char *type;                 /* Port type as requested by user. */
 };
 
-
-/* Stores a miniflow */
-
-/* There are fields in the flow structure that we never use. Therefore we can
- * save a few words of memory */
-#define NETDEV_KEY_BUF_SIZE_U32 (FLOW_U32S - MINI_N_INLINE \
-                                 - FLOW_U32_SIZE(regs) \
-                                 - FLOW_U32_SIZE(metadata) \
-                                )
-struct netdev_flow_key {
-    struct miniflow flow;
-    uint32_t buf[NETDEV_KEY_BUF_SIZE_U32];
-};
-
 /* A flow in dp_netdev's 'flow_table'.
  *
  *
@@ -225,6 +278,7 @@ struct netdev_flow_key {
  * requires synchronization, as noted in more detail below.
  */
 struct dp_netdev_flow {
+    bool dead;
     /* Packet classification. */
     const struct cls_rule cr;   /* In owning dp_netdev's 'cls'. */
 
@@ -248,6 +302,7 @@ struct dp_netdev_flow {
 };
 
 static void dp_netdev_flow_unref(struct dp_netdev_flow *);
+static bool dp_netdev_flow_ref(struct dp_netdev_flow *);
 
 /* Contained by struct dp_netdev_flow's 'stats' member.  */
 struct dp_netdev_flow_stats {
@@ -292,6 +347,7 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *);
  **/
 struct pmd_thread {
     struct dp_netdev *dp;
+    struct emc_cache flow_cache;
     pthread_t thread;
     int id;
     atomic_uint change_seq;
@@ -323,15 +379,42 @@ static int dpif_netdev_open(const struct dpif_class *, const char *name,
 static void dp_netdev_execute_actions(struct dp_netdev *dp,
                                       struct dpif_packet **, int c,
                                       bool may_steal, struct pkt_metadata *,
+                                      struct emc_cache *flow_cache,
                                       const struct nlattr *actions,
                                       size_t actions_len);
 static void dp_netdev_port_input(struct dp_netdev *dp,
+                                 struct emc_cache *flow_cache,
                                  struct dpif_packet **packets, int cnt,
                                  odp_port_t port_no);
 
 static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
 static void dp_netdev_disable_upcall(struct dp_netdev *);
 
+static void emc_clear_entry(struct emc_entry *ce);
+
+static void
+emc_cache_init(struct emc_cache *flow_cache)
+{
+    int i;
+
+    for (i = 0; i < ARRAY_SIZE(flow_cache->entries); i++) {
+        flow_cache->entries[i].flow = NULL;
+        flow_cache->entries[i].hash = 0;
+        miniflow_initialize(&flow_cache->entries[i].mf.flow,
+                            flow_cache->entries[i].mf.buf);
+    }
+}
+
+static void
+emc_cache_uninit(struct emc_cache *flow_cache)
+{
+    int i;
+
+    for (i = 0; i < ARRAY_SIZE(flow_cache->entries); i++) {
+        emc_clear_entry(&flow_cache->entries[i]);
+    }
+}
+
 static struct dpif_netdev *
 dpif_netdev_cast(const struct dpif *dpif)
 {
@@ -479,6 +562,9 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
         return error;
     }
 
+    ovs_mutex_init(&dp->emc_mutex);
+    emc_cache_init(&dp->flow_cache);
+
     *dpp = dp;
     return 0;
 }
@@ -543,6 +629,10 @@ dp_netdev_free(struct dp_netdev *dp)
     cmap_destroy(&dp->ports);
     fat_rwlock_destroy(&dp->upcall_rwlock);
     latch_destroy(&dp->exit_latch);
+
+    emc_cache_uninit(&dp->flow_cache);
+    ovs_mutex_destroy(&dp->emc_mutex);
+
     free(CONST_CAST(char *, dp->name));
     free(dp);
 }
@@ -922,6 +1012,7 @@ dp_netdev_remove_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
 
     classifier_remove(&dp->cls, cr);
     cmap_remove(&dp->flow_table, node, flow_hash(&flow->flow, 0));
+    flow->dead = true;
 
     dp_netdev_flow_unref(flow);
 }
@@ -1029,6 +1120,95 @@ dp_netdev_flow_cast(const struct cls_rule *cr)
     return cr ? CONTAINER_OF(cr, struct dp_netdev_flow, cr) : NULL;
 }
 
+static bool dp_netdev_flow_ref(struct dp_netdev_flow *flow)
+{
+    return ovs_refcount_try_ref_rcu(&flow->ref_cnt);
+}
+
+static inline bool
+emc_entry_alive(struct emc_entry *ce)
+{
+    return ce->flow && !ce->flow->dead;
+}
+
+static void
+emc_clear_entry(struct emc_entry *ce)
+{
+    if (ce->flow) {
+        dp_netdev_flow_unref(ce->flow);
+        ce->flow = NULL;
+    }
+}
+
+static inline void
+emc_change_entry(struct emc_entry *ce, struct dp_netdev_flow *flow,
+                 const struct miniflow *mf, uint32_t hash)
+{
+    if (ce->flow != flow) {
+        if (ce->flow) {
+            dp_netdev_flow_unref(ce->flow);
+        }
+
+        if (dp_netdev_flow_ref(flow)) {
+            ce->flow = flow;
+        } else {
+            ce->flow = NULL;
+        }
+    }
+    if (mf) {
+        miniflow_clone_inline(&ce->mf.flow, mf, count_1bits(mf->map));
+        ce->hash = hash;
+    }
+}
+
+static inline void
+emc_insert(struct emc_cache *cache, const struct miniflow *mf, uint32_t hash,
+           struct dp_netdev_flow *flow)
+{
+    struct emc_entry *to_be_replaced = NULL;
+    struct emc_entry *current_entry;
+
+    EMC_FOR_EACH_POS_WITH_HASH(cache, current_entry, hash) {
+        if (current_entry->hash == hash
+            && miniflow_equal(&current_entry->mf.flow, mf)) {
+
+            /* We found the entry with the 'mf' miniflow */
+            emc_change_entry(current_entry, flow, NULL, 0);
+            return;
+        }
+
+        /* Replacement policy: put the flow in an empty (not alive) entry, or
+         * in the first entry where it can be */
+        if (!to_be_replaced
+            || (emc_entry_alive(to_be_replaced)
+                && !emc_entry_alive(current_entry))
+            || current_entry->hash < to_be_replaced->hash) {
+            to_be_replaced = current_entry;
+        }
+    }
+    /* We didn't find the miniflow in the cache.
+     * The 'to_be_replaced' entry is where the new flow will be stored */
+
+    emc_change_entry(to_be_replaced, flow, mf, hash);
+}
+
+static inline struct dp_netdev_flow *
+emc_lookup(struct emc_cache *cache, const struct miniflow *mf, uint32_t hash)
+{
+    struct emc_entry *current_entry;
+
+    EMC_FOR_EACH_POS_WITH_HASH(cache, current_entry, hash) {
+        if (current_entry->hash == hash && emc_entry_alive(current_entry)
+            && miniflow_equal(&current_entry->mf.flow, mf)) {
+
+            /* We found the entry with the 'mf' miniflow */
+            return current_entry->flow;
+        }
+    }
+
+    return NULL;
+}
+
 static struct dp_netdev_flow *
 dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct miniflow *key)
 {
@@ -1520,8 +1700,11 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     packet.ofpbuf = *execute->packet;
     pp = &packet;
 
+    ovs_mutex_lock(&dp->emc_mutex);
     dp_netdev_execute_actions(dp, &pp, 1, false, md,
-                              execute->actions, execute->actions_len);
+                              &dp->flow_cache, execute->actions,
+                              execute->actions_len);
+    ovs_mutex_unlock(&dp->emc_mutex);
 
     /* Even though may_steal is set to false, some actions could modify or
      * reallocate the ofpbuf memory. We need to pass those changes to the
@@ -1599,15 +1782,16 @@ dp_netdev_actions_free(struct dp_netdev_actions *actions)
 
 static void
 dp_netdev_process_rxq_port(struct dp_netdev *dp,
-                          struct dp_netdev_port *port,
-                          struct netdev_rxq *rxq)
+                           struct emc_cache *flow_cache,
+                           struct dp_netdev_port *port,
+                           struct netdev_rxq *rxq)
 {
     struct dpif_packet *packets[NETDEV_MAX_RX_BATCH];
     int error, cnt;
 
     error = netdev_rxq_recv(rxq, packets, &cnt);
     if (!error) {
-        dp_netdev_port_input(dp, packets, cnt, port->port_no);
+        dp_netdev_port_input(dp, flow_cache, packets, cnt, port->port_no);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl
             = VLOG_RATE_LIMIT_INIT(1, 5);
@@ -1624,15 +1808,18 @@ dpif_netdev_run(struct dpif *dpif)
     struct dp_netdev_port *port;
     struct dp_netdev *dp = get_dp_netdev(dpif);
 
+    ovs_mutex_lock(&dp->emc_mutex);
     CMAP_FOR_EACH (port, node, &dp->ports) {
         if (!netdev_is_pmd(port->netdev)) {
             int i;
 
             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                dp_netdev_process_rxq_port(dp, port, port->rxq[i]);
+                dp_netdev_process_rxq_port(dp, &dp->flow_cache, port,
+                                           port->rxq[i]);
             }
         }
     }
+    ovs_mutex_unlock(&dp->emc_mutex);
 }
 
 static void
@@ -1716,13 +1903,15 @@ pmd_thread_main(void *f_)
 
     pmd_thread_setaffinity_cpu(f->id);
 reload:
+    emc_cache_init(&f->flow_cache);
     poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
 
     for (;;) {
         int i;
 
         for (i = 0; i < poll_cnt; i++) {
-            dp_netdev_process_rxq_port(dp,  poll_list[i].port, poll_list[i].rx);
+            dp_netdev_process_rxq_port(dp, &f->flow_cache, poll_list[i].port,
+                                       poll_list[i].rx);
         }
 
         if (lc++ > 1024) {
@@ -1740,6 +1929,8 @@ reload:
         }
     }
 
+    emc_cache_uninit(&f->flow_cache);
+
     if (!latch_is_set(&f->dp->exit_latch)){
         goto reload;
     }
@@ -1908,6 +2099,20 @@ dp_netdev_upcall(struct dp_netdev *dp, struct dpif_packet *packet_,
                          put_actions, dp->upcall_aux);
 }
 
+static inline uint32_t
+dpif_netdev_packet_get_dp_hash(struct dpif_packet *packet,
+                               const struct miniflow *mf)
+{
+    uint32_t hash;
+
+    hash = dpif_packet_get_dp_hash(packet);
+    if (OVS_UNLIKELY(!hash)) {
+        hash = miniflow_hash_5tuple(mf, 0);
+        dpif_packet_set_dp_hash(packet, hash);
+    }
+    return hash;
+}
+
 struct packet_batch {
     unsigned int packet_count;
     unsigned int byte_count;
@@ -1920,8 +2125,8 @@ struct packet_batch {
 };
 
 static inline void
-packet_batch_update(struct packet_batch *batch,
-                    struct dpif_packet *packet, const struct miniflow *mf)
+packet_batch_update(struct packet_batch *batch, struct dpif_packet *packet,
+                    const struct miniflow *mf)
 {
     batch->tcp_flags |= miniflow_get_tcp_flags(mf);
     batch->packets[batch->packet_count++] = packet;
@@ -1941,7 +2146,8 @@ packet_batch_init(struct packet_batch *batch, struct dp_netdev_flow *flow,
 }
 
 static inline void
-packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp)
+packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp,
+                     struct emc_cache *flow_cache)
 {
     struct dp_netdev_actions *actions;
     struct dp_netdev_flow *flow = batch->flow;
@@ -1951,36 +2157,129 @@ packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp)
 
     actions = dp_netdev_flow_get_actions(flow);
 
-    dp_netdev_execute_actions(dp, batch->packets,
-                              batch->packet_count, true, &batch->md,
+    dp_netdev_execute_actions(dp, batch->packets, batch->packet_count, true,
+                              &batch->md, flow_cache,
                               actions->actions, actions->size);
 
     dp_netdev_count_packet(dp, DP_STAT_HIT, batch->packet_count);
 }
 
-static void
-dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
-                struct pkt_metadata *md)
+static inline bool
+dp_netdev_queue_batches(struct dpif_packet *pkt, struct pkt_metadata *md,
+                        struct dp_netdev_flow *flow, const struct miniflow *mf,
+                        struct packet_batch *batches, size_t *n_batches,
+                        size_t max_batches)
+{
+    struct packet_batch *batch = NULL;
+    int j;
+
+    if (OVS_UNLIKELY(!flow)) {
+        return false;
+    }
+    /* XXX: This O(n^2) algorithm makes sense if we're operating under the
+     * assumption that the number of distinct flows (and therefore the
+     * number of distinct batches) is quite small.  If this turns out not
+     * to be the case, it may make sense to pre sort based on the
+     * netdev_flow pointer.  That done we can get the appropriate batching
+     * in O(n * log(n)) instead. */
+    for (j = *n_batches - 1; j >= 0; j--) {
+        if (batches[j].flow == flow) {
+            batch = &batches[j];
+            packet_batch_update(batch, pkt, mf);
+            return true;
+        }
+    }
+    if (OVS_UNLIKELY(*n_batches >= max_batches)) {
+        return false;
+    }
+
+    batch = &batches[(*n_batches)++];
+    packet_batch_init(batch, flow, md);
+    packet_batch_update(batch, pkt, mf);
+    return true;
+}
+
+static inline void
+dpif_packet_swap(struct dpif_packet **a, struct dpif_packet **b)
+{
+    struct dpif_packet *tmp = *a;
+    *a = *b;
+    *b = tmp;
+}
+
+/* Try to process all 'cnt' 'packets' using only the exact match cache
+ * 'flow_cache'. If a flow is not found for a packet 'packets[i]', or if there
+ * is no matching batch for a packet's flow, the miniflow is copied into 'keys'
+ * and the packet pointer is moved to the beginning of the 'packets' array.
+ *
+ * The function returns the number of packets that still need to be processed
+ * in the 'packets' array (they have been moved to the beginning of the array).
+ */
+static inline size_t
+emc_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
+               struct dpif_packet **packets, size_t cnt,
+               struct pkt_metadata *md, struct netdev_flow_key *keys)
 {
-    struct packet_batch batches[NETDEV_MAX_RX_BATCH];
-    struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH];
-    const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad packets. */
-    struct cls_rule *rules[NETDEV_MAX_RX_BATCH];
+    struct netdev_flow_key key;
+    struct packet_batch batches[EM_BATCH_SIZE];
     size_t n_batches, i;
-    bool any_miss;
+    size_t notfound_cnt = 0;
 
+    n_batches = 0;
+    miniflow_initialize(&key.flow, key.buf);
     for (i = 0; i < cnt; i++) {
+        struct dp_netdev_flow *flow;
+        uint32_t hash;
+
         if (OVS_UNLIKELY(ofpbuf_size(&packets[i]->ofpbuf) < ETH_HEADER_LEN)) {
             dpif_packet_delete(packets[i]);
-            mfs[i] = NULL;
             continue;
         }
 
-        miniflow_initialize(&keys[i].flow, keys[i].buf);
-        miniflow_extract(&packets[i]->ofpbuf, md, &keys[i].flow);
-        mfs[i] = &keys[i].flow;
+        miniflow_extract(&packets[i]->ofpbuf, md, &key.flow);
+
+        hash = dpif_netdev_packet_get_dp_hash(packets[i], &key.flow);
+
+        flow = emc_lookup(flow_cache, &key.flow, hash);
+        if (OVS_UNLIKELY(!dp_netdev_queue_batches(packets[i], md,
+                                                  flow,  &key.flow,
+                                                  batches, &n_batches,
+                                                  ARRAY_SIZE(batches)))) {
+            if (i != notfound_cnt) {
+                dpif_packet_swap(&packets[i], &packets[notfound_cnt]);
+            }
+
+            keys[notfound_cnt++] = key;
+        }
+    }
+
+    for (i = 0; i < n_batches; i++) {
+        packet_batch_execute(&batches[i], dp, flow_cache);
     }
 
+    return notfound_cnt;
+}
+
+static inline void
+fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
+                     struct dpif_packet **packets, size_t cnt,
+                     struct pkt_metadata *md, struct netdev_flow_key *keys)
+{
+#ifndef __CHECKER__
+    const size_t PKT_ARRAY_SIZE = cnt;
+#else
+    /* Sparse doesn't like variable-length arrays. */
+    enum { PKT_ARRAY_SIZE = NETDEV_MAX_RX_BATCH };
+#endif
+    struct packet_batch batches[PKT_ARRAY_SIZE];
+    const struct miniflow *mfs[PKT_ARRAY_SIZE]; /* NULL at bad packets. */
+    struct cls_rule *rules[PKT_ARRAY_SIZE];
+    size_t n_batches, i;
+    bool any_miss;
+
+    for (i = 0; i < cnt; i++) {
+        mfs[i] = &keys[i].flow;
+    }
     any_miss = !classifier_lookup_miniflow_batch(&dp->cls, mfs, rules, cnt);
     if (OVS_UNLIKELY(any_miss) && !fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
         uint64_t actions_stub[512 / 8], slow_stub[512 / 8];
@@ -2024,7 +2323,7 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
              * the actions.  Otherwise, if there are any slow path actions,
              * we'll send the packet up twice. */
             dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
-                                      ofpbuf_data(&actions),
+                                      flow_cache, ofpbuf_data(&actions),
                                       ofpbuf_size(&actions));
 
             add_actions = ofpbuf_size(&put_actions)
@@ -2053,54 +2352,58 @@ dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
 
     n_batches = 0;
     for (i = 0; i < cnt; i++) {
+        struct dpif_packet *packet = packets[i];
         struct dp_netdev_flow *flow;
-        struct packet_batch *batch;
-        size_t j;
 
         if (OVS_UNLIKELY(!rules[i] || !mfs[i])) {
             continue;
         }
 
-        /* XXX: This O(n^2) algortihm makes sense if we're operating under the
-         * assumption that the number of distinct flows (and therefore the
-         * number of distinct batches) is quite small.  If this turns out not
-         * to be the case, it may make sense to pre sort based on the
-         * netdev_flow pointer.  That done we can get the appropriate batching
-         * in O(n * log(n)) instead. */
-        batch = NULL;
         flow = dp_netdev_flow_cast(rules[i]);
-        for (j = 0; j < n_batches; j++) {
-            if (batches[j].flow == flow) {
-                batch = &batches[j];
-                break;
-            }
-        }
-
-        if (!batch) {
-            batch = &batches[n_batches++];
-            packet_batch_init(batch, flow, md);
-        }
-        packet_batch_update(batch, packets[i], mfs[i]);
+        emc_insert(flow_cache, mfs[i], dpif_packet_get_dp_hash(packet), flow);
+        dp_netdev_queue_batches(packet, md, flow, mfs[i], batches, &n_batches,
+                                ARRAY_SIZE(batches));
     }
 
     for (i = 0; i < n_batches; i++) {
-        packet_batch_execute(&batches[i], dp);
+        packet_batch_execute(&batches[i], dp, flow_cache);
     }
 }
 
 static void
-dp_netdev_port_input(struct dp_netdev *dp, struct dpif_packet **packets,
-                     int cnt, odp_port_t port_no)
+dp_netdev_input(struct dp_netdev *dp, struct emc_cache *flow_cache,
+                struct dpif_packet **packets, int cnt, struct pkt_metadata *md)
+{
+#ifndef __CHECKER__
+    const size_t PKT_ARRAY_SIZE = cnt;
+#else
+    /* Sparse doesn't like variable-length arrays. */
+    enum { PKT_ARRAY_SIZE = NETDEV_MAX_RX_BATCH };
+#endif
+    struct netdev_flow_key keys[PKT_ARRAY_SIZE];
+    size_t newcnt;
+
+    newcnt = emc_processing(dp, flow_cache, packets, cnt, md, keys);
+    if (OVS_UNLIKELY(newcnt)) {
+        fast_path_processing(dp, flow_cache, packets, newcnt, md, keys);
+    }
+}
+
+
+static void
+dp_netdev_port_input(struct dp_netdev *dp, struct emc_cache *flow_cache,
+                     struct dpif_packet **packets, int cnt, odp_port_t port_no)
 {
     uint32_t *recirc_depth = recirc_depth_get();
     struct pkt_metadata md = PKT_METADATA_INITIALIZER(port_no);
 
     *recirc_depth = 0;
-    dp_netdev_input(dp, packets, cnt, &md);
+    dp_netdev_input(dp, flow_cache, packets, cnt, &md);
 }
 
 struct dp_netdev_execute_aux {
     struct dp_netdev *dp;
+    struct emc_cache *flow_cache;
 };
 
 static void
@@ -2157,6 +2460,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
                                          NULL);
                 if (!error || error == ENOSPC) {
                     dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
+                                              aux->flow_cache,
                                               ofpbuf_data(&actions),
                                               ofpbuf_size(&actions));
                 }
@@ -2173,22 +2477,17 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
 
     case OVS_ACTION_ATTR_HASH: {
         const struct ovs_action_hash *hash_act;
-        struct netdev_flow_key key;
         uint32_t hash;
 
         hash_act = nl_attr_get(a);
 
-        miniflow_initialize(&key.flow, key.buf);
-
         for (i = 0; i < cnt; i++) {
 
-            /* XXX: this is slow. Use RSS hash in the future */
-            miniflow_extract(&packets[i]->ofpbuf, md, &key.flow);
-
             if (hash_act->hash_alg == OVS_HASH_ALG_L4) {
                 /* Hash need not be symmetric, nor does it need to include
                  * L2 fields. */
-                hash = miniflow_hash_5tuple(&key.flow, hash_act->hash_basis);
+                hash = hash_2words(dpif_packet_get_dp_hash(packets[i]),
+                                   hash_act->hash_basis);
             } else {
                 VLOG_WARN("Unknown hash algorithm specified "
                           "for the hash action.");
@@ -2202,7 +2501,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
             if (i == 0) {
                 md->dp_hash = hash;
             }
-            packets[i]->dp_hash = hash;
+            dpif_packet_set_dp_hash(packets[i], hash);
         }
         break;
     }
@@ -2223,7 +2522,8 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
                 /* Hash is private to each packet */
                 recirc_md.dp_hash = dpif_packet_get_dp_hash(packets[i]);
 
-                dp_netdev_input(dp, &recirc_pkt, 1, &recirc_md);
+                dp_netdev_input(dp, aux->flow_cache, &recirc_pkt, 1,
+                                &recirc_md);
             }
             (*depth)--;
 
@@ -2254,9 +2554,10 @@ static void
 dp_netdev_execute_actions(struct dp_netdev *dp,
                           struct dpif_packet **packets, int cnt,
                           bool may_steal, struct pkt_metadata *md,
+                          struct emc_cache *flow_cache,
                           const struct nlattr *actions, size_t actions_len)
 {
-    struct dp_netdev_execute_aux aux = {dp};
+    struct dp_netdev_execute_aux aux = {dp, flow_cache};
 
     odp_execute_actions(&aux, packets, cnt, may_steal, md, actions,
                         actions_len, dp_execute_cb);
-- 
2.1.0.rc1




More information about the dev mailing list