[ovs-dev] [PATCH 1/1] ofproto-dpif: Fix for out-of-order packets in upcalls processing

Dmitry Fleytman dmitry at daynix.com
Wed Jul 24 13:59:34 UTC 2013


From: Dmitry Fleytman <dfleytma at redhat.com>

User-mode logic groups incoming packets by flow and then
processes the flows in an arbitrary order determined by flow hash key values.
This behavior may lead to unexpected packet reordering. For example,
two packets that belong to the same "logical stream" and differ
only in the ECN bit are placed in different flows and may be transmitted
in reverse order.

This patch decouples packet classification from packet transmission
order to ensure that outgoing packets are sent in the proper order.
---
 ofproto/ofproto-dpif.c |   65 +++++++++++++++++++++++++++++++++++------------
 1 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index 79e23a4..74c5b9b 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -3336,13 +3336,26 @@ flow_miss_should_make_facet(struct flow_miss *miss, struct flow_wildcards *wc)
                                         list_size(&miss->packets));
 }
 
+static inline void
+buf_set_batch_index(struct ofpbuf *buf, size_t idx)
+{
+    buf->private_p = (void*) idx;
+}
+
+static inline size_t
+buf_get_batch_index(struct ofpbuf *buf)
+{
+    return (size_t) buf->private_p;
+}
+
 /* Handles 'miss' without creating a facet or subfacet or creating any datapath
  * flow.  'miss->flow' must have matched 'rule' and been xlated into 'xout'.
- * May add an "execute" operation to 'ops' and increment '*n_ops'. */
+ * May add an "execute" operation to the 'exec_ops' entry selected by each
+ * packet's index in the batch. */
 static void
 handle_flow_miss_without_facet(struct rule_dpif *rule, struct xlate_out *xout,
                                struct flow_miss *miss,
-                               struct flow_miss_op *ops, size_t *n_ops)
+                               struct flow_miss_op *exec_ops)
 {
     struct ofpbuf *packet;
 
@@ -3361,7 +3374,7 @@ handle_flow_miss_without_facet(struct rule_dpif *rule, struct xlate_out *xout,
         }
 
         if (xout->odp_actions.size) {
-            struct flow_miss_op *op = &ops[*n_ops];
+            struct flow_miss_op *op = &exec_ops[buf_get_batch_index(packet)];
             struct dpif_execute *execute = &op->dpif_op.u.execute;
 
             init_flow_miss_execute_op(miss, packet, op);
@@ -3369,14 +3382,13 @@ handle_flow_miss_without_facet(struct rule_dpif *rule, struct xlate_out *xout,
             execute->actions = op->xout.odp_actions.data;
             execute->actions_len = op->xout.odp_actions.size;
             op->xout_garbage = true;
-
-            (*n_ops)++;
         }
     }
 }
 
 /* Handles 'miss', which matches 'facet'.  May add any required datapath
- * operations to 'ops', incrementing '*n_ops' for each new op.
+ * operations to 'ops', incrementing '*n_ops' for each new op.  May add execute
+ * operations to the 'exec_ops' entries selected by each packet's batch index.
  *
  * All of the packets in 'miss' are considered to have arrived at time 'now'.
  * This is really important only for new facets: if we just called time_msec()
@@ -3389,7 +3401,8 @@ handle_flow_miss_without_facet(struct rule_dpif *rule, struct xlate_out *xout,
 static void
 handle_flow_miss_with_facet(struct flow_miss *miss, struct facet *facet,
                             long long int now, struct dpif_flow_stats *stats,
-                            struct flow_miss_op *ops, size_t *n_ops)
+                            struct flow_miss_op *ops, size_t *n_ops,
+                            struct flow_miss_op *exec_ops)
 {
     enum subfacet_path want_path;
     struct subfacet *subfacet;
@@ -3398,7 +3411,7 @@ handle_flow_miss_with_facet(struct flow_miss *miss, struct facet *facet,
     want_path = facet->xout.slow ? SF_SLOW_PATH : SF_FAST_PATH;
 
     LIST_FOR_EACH (packet, list_node, &miss->packets) {
-        struct flow_miss_op *op = &ops[*n_ops];
+        struct flow_miss_op *op = &exec_ops[buf_get_batch_index(packet)];
 
         handle_flow_miss_common(miss->ofproto, packet, &miss->flow,
                                 facet->fail_open);
@@ -3418,7 +3431,6 @@ handle_flow_miss_with_facet(struct flow_miss *miss, struct facet *facet,
             init_flow_miss_execute_op(miss, packet, op);
             execute->actions = facet->xout.odp_actions.data,
             execute->actions_len = facet->xout.odp_actions.size;
-            (*n_ops)++;
         }
     }
 
@@ -3480,7 +3492,7 @@ handle_flow_miss_with_facet(struct flow_miss *miss, struct facet *facet,
  * to 'ops', incrementing '*n_ops' for each new op. */
 static void
 handle_flow_miss(struct flow_miss *miss, struct flow_miss_op *ops,
-                 size_t *n_ops)
+                 size_t *n_ops, struct flow_miss_op *exec_ops)
 {
     struct ofproto_dpif *ofproto = miss->ofproto;
     struct dpif_flow_stats stats__;
@@ -3523,14 +3535,14 @@ handle_flow_miss(struct flow_miss *miss, struct flow_miss_op *ops,
          * skip facet creation, avoiding the problem altogether. */
         if (miss->key_fitness == ODP_FIT_TOO_LITTLE
             || !flow_miss_should_make_facet(miss, &xout.wc)) {
-            handle_flow_miss_without_facet(rule, &xout, miss, ops, n_ops);
+            handle_flow_miss_without_facet(rule, &xout, miss, exec_ops);
             return;
         }
 
         facet = facet_create(miss, rule, &xout, stats);
         stats = NULL;
     }
-    handle_flow_miss_with_facet(miss, facet, now, stats, ops, n_ops);
+    handle_flow_miss_with_facet(miss, facet, now, stats, ops, n_ops, exec_ops);
 }
 
 static struct drop_key *
@@ -3673,12 +3685,15 @@ handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
     struct dpif_upcall *upcall;
     struct flow_miss *miss;
     struct flow_miss misses[FLOW_MISS_MAX_BATCH];
-    struct flow_miss_op flow_miss_ops[FLOW_MISS_MAX_BATCH * 2];
+    struct flow_miss_op flow_miss_ops[FLOW_MISS_MAX_BATCH];
+    struct flow_miss_op flow_miss_exec_ops[FLOW_MISS_MAX_BATCH];
     struct dpif_op *dpif_ops[FLOW_MISS_MAX_BATCH * 2];
     struct hmap todo;
     int n_misses;
     size_t n_ops;
+    size_t n_exec_ops;
     size_t i;
+    size_t idx;
 
     if (!n_upcalls) {
         return;
@@ -3691,7 +3706,7 @@ handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
      * that we can process them together. */
     hmap_init(&todo);
     n_misses = 0;
-    for (upcall = upcalls; upcall < &upcalls[n_upcalls]; upcall++) {
+    for (idx = 0; idx < n_upcalls; idx++) {
         struct flow_miss *miss = &misses[n_misses];
         struct flow_miss *existing_miss;
         struct ofproto_dpif *ofproto;
@@ -3699,6 +3714,7 @@ handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
         struct flow flow;
         uint32_t hash;
         int error;
+        upcall = &upcalls[idx];
 
         error = ofproto_receive(backer, upcall->packet, upcall->key,
                                 upcall->key_len, &flow, &miss->key_fitness,
@@ -3752,6 +3768,7 @@ handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
         } else {
             miss = existing_miss;
         }
+        buf_set_batch_index(upcall->packet, idx);
         list_push_back(&miss->packets, &upcall->packet->list_node);
     }
 
@@ -3759,15 +3776,22 @@ handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
      * operations to batch. */
     n_ops = 0;
     HMAP_FOR_EACH (miss, hmap_node, &todo) {
-        handle_flow_miss(miss, flow_miss_ops, &n_ops);
+        handle_flow_miss(miss, flow_miss_ops, &n_ops, flow_miss_exec_ops);
     }
     ovs_assert(n_ops <= ARRAY_SIZE(flow_miss_ops));
 
     /* Execute batch. */
+    n_exec_ops = 0;
+    for (i = 0; i < ARRAY_SIZE(flow_miss_exec_ops); i++) {
+        if(flow_miss_exec_ops[i].dpif_op.u.execute.actions_len > 0) {
+            dpif_ops[n_exec_ops++] = &flow_miss_exec_ops[i].dpif_op;
+        }
+    }
+
     for (i = 0; i < n_ops; i++) {
-        dpif_ops[i] = &flow_miss_ops[i].dpif_op;
+        dpif_ops[n_exec_ops + i] = &flow_miss_ops[i].dpif_op;
     }
-    dpif_operate(backer->dpif, dpif_ops, n_ops);
+    dpif_operate(backer->dpif, dpif_ops, n_exec_ops + n_ops);
 
     for (i = 0; i < n_ops; i++) {
         if (dpif_ops[i]->error != 0
@@ -3797,6 +3821,13 @@ handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
             xlate_out_uninit(&flow_miss_ops[i].xout);
         }
     }
+
+    for (i = 0; i < ARRAY_SIZE(flow_miss_exec_ops); i++) {
+        if(flow_miss_exec_ops[i].xout_garbage) {
+            xlate_out_uninit(&flow_miss_exec_ops[i].xout);
+        }
+    }
+
     hmap_destroy(&todo);
 }
 
-- 
1.7.1




More information about the dev mailing list