[ovs-dev] [async 6/6] ofproto: Implement asynchronous OFPT_FLOW_MOD commands.

Ben Pfaff blp at nicira.com
Fri May 27 21:25:22 UTC 2011


Some switching hardware takes a very long time to update its forwarding
rules, up to hundreds of milliseconds.  It is undesirable for Open
vSwitch to block waiting this long for individual OpenFlow flow table
modification commands to complete.  This commit enables ofproto to queue
up any number of independent flow table operations with asynchronous
completion.

I tested earlier versions of this commit using the "ofproto/clog" and
"ofproto/unclog" commands that this commit adds to the software switch
(ofproto-dpif).  I have not tested the current version very much at all.

I'm looking for feedback on the interface and the implementation.

I'm aware that this version leaks flows and operations when ofprotos are
destroyed.  I'll fix that once I have a little more confidence that this
is a good approach overall.

CC: Casey Barker <crbarker at google.com>
CC: Rajiv Ramanathan <rajivr at google.com>
---
 ofproto/connmgr.c      |  160 ++++++++--
 ofproto/connmgr.h      |   13 +-
 ofproto/in-band.c      |  295 ++++++++-----------
 ofproto/in-band.h      |    1 -
 ofproto/ofproto-dpif.c |  105 ++++++--
 ofproto/ofproto.c      |  762 +++++++++++++++++++++++++++++++++++-------------
 ofproto/private.h      |  184 +++++++++---
 7 files changed, 1046 insertions(+), 474 deletions(-)

diff --git a/ofproto/connmgr.c b/ofproto/connmgr.c
index bdebd3b..745e0a6 100644
--- a/ofproto/connmgr.c
+++ b/ofproto/connmgr.c
@@ -40,8 +40,6 @@
 VLOG_DEFINE_THIS_MODULE(connmgr);
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
-COVERAGE_DEFINE(ofconn_stuck);
-
 /* An OpenFlow connection. */
 struct ofconn {
     struct connmgr *connmgr;    /* Connection's manager. */
@@ -51,6 +49,11 @@ struct ofconn {
     enum nx_flow_format flow_format; /* Currently selected flow format. */
     bool flow_mod_table_id;     /* NXT_FLOW_MOD_TABLE_ID enabled? */
 
+    /* Asynchronous flow table operation support. */
+    struct list opgroups;       /* Contains pending "ofopgroups", if any. */
+    struct ofpbuf *blocked;     /* Postponed OpenFlow message, if any. */
+    bool retry;                 /* True if 'blocked' is ready to try again. */
+
     /* OFPT_PACKET_IN related data. */
     struct rconn_packet_counter *packet_in_counter; /* # queued on 'rconn'. */
 #define N_SCHEDULERS 2
@@ -78,9 +81,9 @@ static void ofconn_reconfigure(struct ofconn *,
                                const struct ofproto_controller *);
 
 static void ofconn_run(struct ofconn *,
-                       void (*handle_openflow)(struct ofconn *,
+                       bool (*handle_openflow)(struct ofconn *,
                                                struct ofpbuf *ofp_msg));
-static void ofconn_wait(struct ofconn *);
+static void ofconn_wait(struct ofconn *, bool handling_openflow);
 
 static const char *ofconn_get_target(const struct ofconn *);
 static char *ofconn_make_name(const struct connmgr *, const char *target);
@@ -216,19 +219,29 @@ connmgr_destroy(struct connmgr *mgr)
     free(mgr);
 }
 
-/* Does all of the periodic maintenance required by 'mgr'.  Calls
- * 'handle_openflow' for each message received on an OpenFlow connection,
- * passing along the OpenFlow connection itself and the message that was sent.
- * The 'handle_openflow' callback must not free the message. */
+/* Does all of the periodic maintenance required by 'mgr'.
+ *
+ * If 'handle_openflow' is nonnull, calls 'handle_openflow' for each message
+ * received on an OpenFlow connection, passing along the OpenFlow connection
+ * itself and the message that was sent.  If 'handle_openflow' returns true,
+ * the message is considered to be fully processed.  If 'handle_openflow'
+ * returns false, the message is considered not to have been processed at all;
+ * it will be stored and re-presented to 'handle_openflow' following the next
+ * call to connmgr_retry().  'handle_openflow' must not modify or free the
+ * message.
+ *
+ * If 'handle_openflow' is NULL, no OpenFlow messages will be processed and
+ * other activities that could affect the flow table (in-band processing,
+ * fail-open processing) are suppressed too. */
 void
 connmgr_run(struct connmgr *mgr,
-            void (*handle_openflow)(struct ofconn *, struct ofpbuf *ofp_msg))
+            bool (*handle_openflow)(struct ofconn *, struct ofpbuf *ofp_msg))
 {
     struct ofconn *ofconn, *next_ofconn;
     struct ofservice *ofservice;
     size_t i;
 
-    if (mgr->in_band) {
+    if (handle_openflow && mgr->in_band) {
         if (time_msec() >= mgr->next_in_band_update) {
             update_in_band_remotes(mgr);
         }
@@ -241,7 +254,7 @@ connmgr_run(struct connmgr *mgr,
 
     /* Fail-open maintenance.  Do this after processing the ofconns since
      * fail-open checks the status of the controller rconn. */
-    if (mgr->fail_open) {
+    if (handle_openflow && mgr->fail_open) {
         fail_open_run(mgr->fail_open);
     }
 
@@ -280,22 +293,26 @@ connmgr_run(struct connmgr *mgr,
     }
 }
 
-/* Causes the poll loop to wake up when connmgr_run() needs to run. */
+/* Causes the poll loop to wake up when connmgr_run() needs to run.
+ *
+ * If 'handling_openflow' is true, arriving OpenFlow messages and other
+ * activities that affect the flow table will wake up the poll loop.  If
+ * 'handling_openflow' is false, they will not. */
 void
-connmgr_wait(struct connmgr *mgr)
+connmgr_wait(struct connmgr *mgr, bool handling_openflow)
 {
     struct ofservice *ofservice;
     struct ofconn *ofconn;
     size_t i;
 
     LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
-        ofconn_wait(ofconn);
+        ofconn_wait(ofconn, handling_openflow);
     }
-    if (mgr->in_band) {
+    if (handling_openflow && mgr->in_band) {
         poll_timer_wait_until(mgr->next_in_band_update);
         in_band_wait(mgr->in_band);
     }
-    if (mgr->fail_open) {
+    if (handling_openflow && mgr->fail_open) {
         fail_open_wait(mgr->fail_open);
     }
     HMAP_FOR_EACH (ofservice, node, &mgr->services) {
@@ -312,6 +329,19 @@ ofconn_get_ofproto(const struct ofconn *ofconn)
 {
     return ofconn->connmgr->ofproto;
 }
+
+/* Re-enables processing of OpenFlow messages on any of 'mgr''s ofconns that
+ * were blocked because the 'handle_openflow' callback passed to connmgr_run()
+ * returned false for a message; that message will be re-presented. */
+void
+connmgr_retry(struct connmgr *mgr)
+{
+    struct ofconn *ofconn;
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        ofconn->retry = true;
+    }
+}
 
 /* OpenFlow configuration. */
 
@@ -798,6 +828,35 @@ ofconn_pktbuf_retrieve(struct ofconn *ofconn, uint32_t id,
 {
     return pktbuf_retrieve(ofconn->pktbuf, id, bufferp, in_port);
 }
+
+/* Returns true if 'ofconn' has any pending opgroups. */
+bool
+ofconn_has_pending_opgroups(const struct ofconn *ofconn)
+{
+    return !list_is_empty(&ofconn->opgroups);
+}
+
+/* Returns the number of pending opgroups on 'ofconn'. */
+size_t
+ofconn_n_pending_opgroups(const struct ofconn *ofconn)
+{
+    return list_size(&ofconn->opgroups);
+}
+
+/* Adds 'ofconn_node' to 'ofconn''s list of pending opgroups.
+ *
+ * If 'ofconn' is destroyed or its connection drops, then 'ofconn' will remove
+ * 'ofconn_node' from the list and re-initialize it with list_init().  The
+ * client may, therefore, use list_is_empty(ofconn_node) to determine whether
+ * 'ofconn_node' is still associated with an active ofconn.
+ *
+ * The client may also remove ofconn_node from the list itself, with
+ * list_remove(). */
+void
+ofconn_add_opgroup(struct ofconn *ofconn, struct list *ofconn_node)
+{
+    list_push_back(&ofconn->opgroups, ofconn_node);
+}
 
 /* Private ofconn functions. */
 
@@ -817,6 +876,7 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type)
     ofconn->type = type;
     ofconn->flow_format = NXFF_OPENFLOW10;
     ofconn->flow_mod_table_id = false;
+    list_init(&ofconn->opgroups);
     ofconn->role = NX_ROLE_OTHER;
     ofconn->packet_in_counter = rconn_packet_counter_create ();
     ofconn->pktbuf = NULL;
@@ -825,9 +885,27 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type)
     return ofconn;
 }
 
+/* Disassociates 'ofconn' from all of the ofopgroups that it initiated that
+ * have not yet completed.  (Those ofopgroups will still run to completion in
+ * the usual way, but any errors that they run into will not be reported on any
+ * OpenFlow channel.)
+ *
+ * Also discards any blocked operation on 'ofconn'. */
+static void
+ofconn_flush(struct ofconn *ofconn)
+{
+    while (!list_is_empty(&ofconn->opgroups)) {
+        list_init(list_pop_front(&ofconn->opgroups));
+    }
+    ofpbuf_delete(ofconn->blocked);
+    ofconn->blocked = NULL;
+}
+
 static void
 ofconn_destroy(struct ofconn *ofconn)
 {
+    ofconn_flush(ofconn);
+
     if (ofconn->type == OFCONN_PRIMARY) {
         hmap_remove(&ofconn->connmgr->controllers, &ofconn->hmap_node);
     }
@@ -857,12 +935,20 @@ ofconn_reconfigure(struct ofconn *ofconn, const struct ofproto_controller *c)
     ofconn_set_rate_limit(ofconn, c->rate_limit, c->burst_limit);
 }
 
+/* Returns true if it makes sense for 'ofconn' to receive and process OpenFlow
+ * messages. */
+static bool
+ofconn_may_recv(const struct ofconn *ofconn)
+{
+    int count = rconn_packet_counter_read (ofconn->reply_counter);
+    return (!ofconn->blocked || ofconn->retry) && count < OFCONN_REPLY_MAX;
+}
+
 static void
 ofconn_run(struct ofconn *ofconn,
-           void (*handle_openflow)(struct ofconn *, struct ofpbuf *ofp_msg))
+           bool (*handle_openflow)(struct ofconn *, struct ofpbuf *ofp_msg))
 {
     struct connmgr *mgr = ofconn->connmgr;
-    int iteration;
     size_t i;
 
     for (i = 0; i < N_SCHEDULERS; i++) {
@@ -871,29 +957,40 @@ ofconn_run(struct ofconn *ofconn,
 
     rconn_run(ofconn->rconn);
 
-    if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
-        /* Limit the number of iterations to prevent other tasks from
-         * starving. */
-        for (iteration = 0; iteration < 50; iteration++) {
-            struct ofpbuf *of_msg = rconn_recv(ofconn->rconn);
+    if (handle_openflow) {
+        /* Limit the number of iterations to avoid starving other tasks. */
+        for (i = 0; i < 50 && ofconn_may_recv(ofconn); i++) {
+            struct ofpbuf *of_msg;
+
+            of_msg = (ofconn->blocked
+                      ? ofconn->blocked
+                      : rconn_recv(ofconn->rconn));
             if (!of_msg) {
                 break;
             }
             if (mgr->fail_open) {
                 fail_open_maybe_recover(mgr->fail_open);
             }
-            handle_openflow(ofconn, of_msg);
-            ofpbuf_delete(of_msg);
+
+            if (handle_openflow(ofconn, of_msg)) {
+                ofpbuf_delete(of_msg);
+                ofconn->blocked = NULL;
+            } else {
+                ofconn->blocked = of_msg;
+                ofconn->retry = false;
+            }
         }
     }
 
     if (!rconn_is_alive(ofconn->rconn)) {
         ofconn_destroy(ofconn);
+    } else if (!rconn_is_connected(ofconn->rconn)) {
+        ofconn_flush(ofconn);
     }
 }
 
 static void
-ofconn_wait(struct ofconn *ofconn)
+ofconn_wait(struct ofconn *ofconn, bool handling_openflow)
 {
     int i;
 
@@ -901,10 +998,8 @@ ofconn_wait(struct ofconn *ofconn)
         pinsched_wait(ofconn->schedulers[i]);
     }
     rconn_run_wait(ofconn->rconn);
-    if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
+    if (handling_openflow && ofconn_may_recv(ofconn)) {
         rconn_recv_wait(ofconn->rconn);
-    } else {
-        COVERAGE_INC(ofconn_stuck);
     }
 }
 
@@ -1300,13 +1395,12 @@ connmgr_may_set_up_flow(struct connmgr *mgr, const struct flow *flow,
 /* Fail-open and in-band implementation. */
 
 /* Called by 'ofproto' after all flows have been flushed, to allow fail-open
- * and in-band control to re-create their flows. */
+ * and standalone mode to re-create their flows.
+ *
+ * In-band control has more sophisticated code that manages flows itself. */
 void
 connmgr_flushed(struct connmgr *mgr)
 {
-    if (mgr->in_band) {
-        in_band_flushed(mgr->in_band);
-    }
     if (mgr->fail_open) {
         fail_open_flushed(mgr->fail_open);
     }
diff --git a/ofproto/connmgr.h b/ofproto/connmgr.h
index 4ac564c..58b1ae3 100644
--- a/ofproto/connmgr.h
+++ b/ofproto/connmgr.h
@@ -24,6 +24,7 @@
 #include "openvswitch/types.h"
 
 struct ofconn;
+struct ofopgroup;
 struct ofputil_flow_removed;
 struct ofputil_packet_in;
 struct sset;
@@ -54,12 +55,14 @@ struct connmgr *connmgr_create(struct ofproto *ofproto,
 void connmgr_destroy(struct connmgr *);
 
 void connmgr_run(struct connmgr *,
-                 void (*handle_openflow)(struct ofconn *,
+                 bool (*handle_openflow)(struct ofconn *,
                                          struct ofpbuf *ofp_msg));
-void connmgr_wait(struct connmgr *);
+void connmgr_wait(struct connmgr *, bool handling_openflow);
 
 struct ofproto *ofconn_get_ofproto(const struct ofconn *);
 
+void connmgr_retry(struct connmgr *);
+
 /* OpenFlow configuration. */
 bool connmgr_has_controllers(const struct connmgr *);
 void connmgr_get_controller_info(struct connmgr *, struct shash *);
@@ -94,6 +97,12 @@ void ofconn_send_error(const struct ofconn *, const struct ofp_header *request,
 int ofconn_pktbuf_retrieve(struct ofconn *, uint32_t id,
                            struct ofpbuf **bufferp, uint16_t *in_port);
 
+size_t ofconn_n_pending_opgroups(const struct ofconn *);
+bool ofconn_has_pending_opgroups(const struct ofconn *);
+void ofconn_add_opgroup(struct ofconn *, struct list *);
+void ofconn_remove_opgroup(struct ofconn *, struct list *,
+                           const struct ofp_header *request, int error);
+
 /* Sending asynchronous messages. */
 void connmgr_send_port_status(struct connmgr *, const struct ofp_phy_port *,
                               uint8_t reason);
diff --git a/ofproto/in-band.c b/ofproto/in-band.c
index 710aadd..30385b2 100644
--- a/ofproto/in-band.c
+++ b/ofproto/in-band.c
@@ -73,9 +73,21 @@ struct in_band_remote {
     struct netdev *remote_netdev; /* Device to send to next-hop MAC. */
 };
 
+/* Action that the next in_band_run() will apply to an in_band_rule. */
+enum in_band_op {
+    ADD,                       /* Add the rule to ofproto's flow table. */
+    DELETE                     /* Delete the rule from ofproto's flow table. */
+};
+
+/* A rule to add to or delete from ofproto's flow table.  */
+struct in_band_rule {
+    struct cls_rule cls_rule;
+    enum in_band_op op;
+};
+
 struct in_band {
     struct ofproto *ofproto;
-    int queue_id, prev_queue_id;
+    int queue_id;
 
     /* Remote information. */
     time_t next_remote_refresh; /* Refresh timer. */
@@ -87,12 +99,8 @@ struct in_band {
     uint8_t local_mac[ETH_ADDR_LEN]; /* Current MAC. */
     struct netdev *local_netdev;     /* Local port's network device. */
 
-    /* Local and remote addresses that are installed as flows. */
-    uint8_t installed_local_mac[ETH_ADDR_LEN];
-    struct sockaddr_in *remote_addrs;
-    size_t n_remote_addrs;
-    uint8_t *remote_macs;
-    size_t n_remote_macs;
+    /* Flow tracking. */
+    struct hmap rules;          /* Contains "struct in_band_rule"s. */
 };
 
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(60, 60);
@@ -271,47 +279,68 @@ in_band_rule_check(const struct flow *flow,
 }
 
 static void
-make_rules(struct in_band *ib,
-           void (*cb)(struct in_band *, const struct cls_rule *))
+add_rule(struct in_band *ib, const struct cls_rule *cls_rule)
 {
+    uint32_t hash = cls_rule_hash(cls_rule, 0);
+    struct in_band_rule *rule;
+
+    HMAP_FOR_EACH_WITH_HASH (rule, cls_rule.hmap_node, hash, &ib->rules) {
+        if (cls_rule_equal(&rule->cls_rule, cls_rule)) {
+            rule->op = ADD;
+            return;
+        }
+    }
+
+    rule = xmalloc(sizeof *rule);
+    rule->cls_rule = *cls_rule;
+    rule->op = ADD;
+    hmap_insert(&ib->rules, &rule->cls_rule.hmap_node, hash);
+}
+
+static void
+update_rules(struct in_band *ib)
+{
+    struct in_band_rule *ib_rule;
+    struct in_band_remote *r;
     struct cls_rule rule;
-    size_t i;
 
-    if (!eth_addr_is_zero(ib->installed_local_mac)) {
+    /* Mark all the existing rules for deletion.  (Afterward we will re-add any
+     * rules that are still valid.) */
+    HMAP_FOR_EACH (ib_rule, cls_rule.hmap_node, &ib->rules) {
+        ib_rule->op = DELETE;
+    }
+
+    if (!eth_addr_is_zero(ib->local_mac)) {
         /* (a) Allow DHCP requests sent from the local port. */
         cls_rule_init_catchall(&rule, IBR_FROM_LOCAL_DHCP);
         cls_rule_set_in_port(&rule, ODPP_LOCAL);
         cls_rule_set_dl_type(&rule, htons(ETH_TYPE_IP));
-        cls_rule_set_dl_src(&rule, ib->installed_local_mac);
+        cls_rule_set_dl_src(&rule, ib->local_mac);
         cls_rule_set_nw_proto(&rule, IPPROTO_UDP);
         cls_rule_set_tp_src(&rule, htons(DHCP_CLIENT_PORT));
         cls_rule_set_tp_dst(&rule, htons(DHCP_SERVER_PORT));
-        cb(ib, &rule);
+        add_rule(ib, &rule);
 
         /* (b) Allow ARP replies to the local port's MAC address. */
         cls_rule_init_catchall(&rule, IBR_TO_LOCAL_ARP);
         cls_rule_set_dl_type(&rule, htons(ETH_TYPE_ARP));
-        cls_rule_set_dl_dst(&rule, ib->installed_local_mac);
+        cls_rule_set_dl_dst(&rule, ib->local_mac);
         cls_rule_set_nw_proto(&rule, ARP_OP_REPLY);
-        cb(ib, &rule);
+        add_rule(ib, &rule);
 
         /* (c) Allow ARP requests from the local port's MAC address.  */
         cls_rule_init_catchall(&rule, IBR_FROM_LOCAL_ARP);
         cls_rule_set_dl_type(&rule, htons(ETH_TYPE_ARP));
-        cls_rule_set_dl_src(&rule, ib->installed_local_mac);
+        cls_rule_set_dl_src(&rule, ib->local_mac);
         cls_rule_set_nw_proto(&rule, ARP_OP_REQUEST);
-        cb(ib, &rule);
+        add_rule(ib, &rule);
     }
 
-    for (i = 0; i < ib->n_remote_macs; i++) {
-        const uint8_t *remote_mac = &ib->remote_macs[i * ETH_ADDR_LEN];
+    for (r = ib->remotes; r < &ib->remotes[ib->n_remotes]; r++) {
+        const uint8_t *remote_mac = r->remote_mac;
 
-        if (i > 0) {
-            const uint8_t *prev_mac = &ib->remote_macs[(i - 1) * ETH_ADDR_LEN];
-            if (eth_addr_equals(remote_mac, prev_mac)) {
-                /* Skip duplicates. */
-                continue;
-            }
+        if (eth_addr_is_zero(remote_mac)) {
+            continue;
         }
 
         /* (d) Allow ARP replies to the next hop's MAC address. */
@@ -319,184 +348,104 @@ make_rules(struct in_band *ib,
         cls_rule_set_dl_type(&rule, htons(ETH_TYPE_ARP));
         cls_rule_set_dl_dst(&rule, remote_mac);
         cls_rule_set_nw_proto(&rule, ARP_OP_REPLY);
-        cb(ib, &rule);
+        add_rule(ib, &rule);
 
         /* (e) Allow ARP requests from the next hop's MAC address. */
         cls_rule_init_catchall(&rule, IBR_FROM_NEXT_HOP_ARP);
         cls_rule_set_dl_type(&rule, htons(ETH_TYPE_ARP));
         cls_rule_set_dl_src(&rule, remote_mac);
         cls_rule_set_nw_proto(&rule, ARP_OP_REQUEST);
-        cb(ib, &rule);
+        add_rule(ib, &rule);
     }
 
-    for (i = 0; i < ib->n_remote_addrs; i++) {
-        const struct sockaddr_in *a = &ib->remote_addrs[i];
-
-        if (!i || a->sin_addr.s_addr != a[-1].sin_addr.s_addr) {
-            /* (f) Allow ARP replies containing the remote's IP address as a
-             * target. */
-            cls_rule_init_catchall(&rule, IBR_TO_REMOTE_ARP);
-            cls_rule_set_dl_type(&rule, htons(ETH_TYPE_ARP));
-            cls_rule_set_nw_proto(&rule, ARP_OP_REPLY);
-            cls_rule_set_nw_dst(&rule, a->sin_addr.s_addr);
-            cb(ib, &rule);
-
-            /* (g) Allow ARP requests containing the remote's IP address as a
-             * source. */
-            cls_rule_init_catchall(&rule, IBR_FROM_REMOTE_ARP);
-            cls_rule_set_dl_type(&rule, htons(ETH_TYPE_ARP));
-            cls_rule_set_nw_proto(&rule, ARP_OP_REQUEST);
-            cls_rule_set_nw_src(&rule, a->sin_addr.s_addr);
-            cb(ib, &rule);
-        }
-
-        if (!i
-            || a->sin_addr.s_addr != a[-1].sin_addr.s_addr
-            || a->sin_port != a[-1].sin_port) {
-            /* (h) Allow TCP traffic to the remote's IP and port. */
-            cls_rule_init_catchall(&rule, IBR_TO_REMOTE_TCP);
-            cls_rule_set_dl_type(&rule, htons(ETH_TYPE_IP));
-            cls_rule_set_nw_proto(&rule, IPPROTO_TCP);
-            cls_rule_set_nw_dst(&rule, a->sin_addr.s_addr);
-            cls_rule_set_tp_dst(&rule, a->sin_port);
-            cb(ib, &rule);
-
-            /* (i) Allow TCP traffic from the remote's IP and port. */
-            cls_rule_init_catchall(&rule, IBR_FROM_REMOTE_TCP);
-            cls_rule_set_dl_type(&rule, htons(ETH_TYPE_IP));
-            cls_rule_set_nw_proto(&rule, IPPROTO_TCP);
-            cls_rule_set_nw_src(&rule, a->sin_addr.s_addr);
-            cls_rule_set_tp_src(&rule, a->sin_port);
-            cb(ib, &rule);
-        }
-    }
-}
-
-static void
-drop_rule(struct in_band *ib, const struct cls_rule *rule)
-{
-    ofproto_delete_flow(ib->ofproto, rule);
-}
+    for (r = ib->remotes; r < &ib->remotes[ib->n_remotes]; r++) {
+        const struct sockaddr_in *a = &r->remote_addr;
 
-/* Drops from the flow table all of the flows set up by 'ib', then clears out
- * the information about the installed flows so that they can be filled in
- * again if necessary. */
-static void
-drop_rules(struct in_band *ib)
-{
-    /* Drop rules. */
-    make_rules(ib, drop_rule);
+        /* (f) Allow ARP replies containing the remote's IP address as a
+         * target. */
+        cls_rule_init_catchall(&rule, IBR_TO_REMOTE_ARP);
+        cls_rule_set_dl_type(&rule, htons(ETH_TYPE_ARP));
+        cls_rule_set_nw_proto(&rule, ARP_OP_REPLY);
+        cls_rule_set_nw_dst(&rule, a->sin_addr.s_addr);
+        add_rule(ib, &rule);
 
-    /* Clear out state. */
-    memset(ib->installed_local_mac, 0, sizeof ib->installed_local_mac);
+        /* (g) Allow ARP requests containing the remote's IP address as a
+         * source. */
+        cls_rule_init_catchall(&rule, IBR_FROM_REMOTE_ARP);
+        cls_rule_set_dl_type(&rule, htons(ETH_TYPE_ARP));
+        cls_rule_set_nw_proto(&rule, ARP_OP_REQUEST);
+        cls_rule_set_nw_src(&rule, a->sin_addr.s_addr);
+        add_rule(ib, &rule);
 
-    free(ib->remote_addrs);
-    ib->remote_addrs = NULL;
-    ib->n_remote_addrs = 0;
+        /* (h) Allow TCP traffic to the remote's IP and port. */
+        cls_rule_init_catchall(&rule, IBR_TO_REMOTE_TCP);
+        cls_rule_set_dl_type(&rule, htons(ETH_TYPE_IP));
+        cls_rule_set_nw_proto(&rule, IPPROTO_TCP);
+        cls_rule_set_nw_dst(&rule, a->sin_addr.s_addr);
+        cls_rule_set_tp_dst(&rule, a->sin_port);
+        add_rule(ib, &rule);
 
-    free(ib->remote_macs);
-    ib->remote_macs = NULL;
-    ib->n_remote_macs = 0;
+        /* (i) Allow TCP traffic from the remote's IP and port. */
+        cls_rule_init_catchall(&rule, IBR_FROM_REMOTE_TCP);
+        cls_rule_set_dl_type(&rule, htons(ETH_TYPE_IP));
+        cls_rule_set_nw_proto(&rule, IPPROTO_TCP);
+        cls_rule_set_nw_src(&rule, a->sin_addr.s_addr);
+        cls_rule_set_tp_src(&rule, a->sin_port);
+        add_rule(ib, &rule);
+    }
 }
 
-static void
-add_rule(struct in_band *ib, const struct cls_rule *rule)
+void
+in_band_run(struct in_band *ib)
 {
     struct {
         struct nx_action_set_queue nxsq;
         union ofp_action oa;
     } actions;
+    const void *a;
+    size_t na;
 
-    memset(&actions, 0, sizeof actions);
+    struct in_band_rule *rule, *next;
 
+    memset(&actions, 0, sizeof actions);
     actions.oa.output.type = htons(OFPAT_OUTPUT);
     actions.oa.output.len = htons(sizeof actions.oa);
     actions.oa.output.port = htons(OFPP_NORMAL);
     actions.oa.output.max_len = htons(0);
-
     if (ib->queue_id < 0) {
-        ofproto_add_flow(ib->ofproto, rule, &actions.oa, 1);
+        a = &actions.oa;
+        na = sizeof actions.oa / sizeof(union ofp_action);
     } else {
         actions.nxsq.type = htons(OFPAT_VENDOR);
         actions.nxsq.len = htons(sizeof actions.nxsq);
         actions.nxsq.vendor = htonl(NX_VENDOR_ID);
         actions.nxsq.subtype = htons(NXAST_SET_QUEUE);
         actions.nxsq.queue_id = htonl(ib->queue_id);
-
-        ofproto_add_flow(ib->ofproto, rule, (union ofp_action *) &actions,
-                         sizeof actions / sizeof(union ofp_action));
+        a = &actions;
+        na = sizeof actions / sizeof(union ofp_action);
     }
-}
 
-/* Inserts flows into the flow table for the current state of 'ib'. */
-static void
-add_rules(struct in_band *ib)
-{
-    make_rules(ib, add_rule);
-}
+    refresh_local(ib);
+    refresh_remotes(ib);
 
-static int
-compare_addrs(const void *a_, const void *b_)
-{
-    const struct sockaddr_in *a = a_;
-    const struct sockaddr_in *b = b_;
-    int cmp;
-
-    cmp = memcmp(&a->sin_addr.s_addr,
-                 &b->sin_addr.s_addr,
-                 sizeof a->sin_addr.s_addr);
-    if (cmp) {
-        return cmp;
-    }
-    return memcmp(&a->sin_port, &b->sin_port, sizeof a->sin_port);
-}
+    update_rules(ib);
 
-static int
-compare_macs(const void *a, const void *b)
-{
-    return eth_addr_compare_3way(a, b);
-}
+    HMAP_FOR_EACH_SAFE (rule, next, cls_rule.hmap_node, &ib->rules) {
+        switch (rule->op) {
+        case ADD:
+            ofproto_add_flow(ib->ofproto, &rule->cls_rule, a, na);
+            break;
 
-void
-in_band_run(struct in_band *ib)
-{
-    bool local_change, remote_change, queue_id_change;
-    struct in_band_remote *r;
-
-    local_change = refresh_local(ib);
-    remote_change = refresh_remotes(ib);
-    queue_id_change = ib->queue_id != ib->prev_queue_id;
-    if (!local_change && !remote_change && !queue_id_change) {
-        /* Nothing changed, nothing to do. */
-        return;
-    }
-    ib->prev_queue_id = ib->queue_id;
-
-    /* Drop old rules. */
-    drop_rules(ib);
-
-    /* Figure out new rules. */
-    memcpy(ib->installed_local_mac, ib->local_mac, ETH_ADDR_LEN);
-    ib->remote_addrs = xmalloc(ib->n_remotes * sizeof *ib->remote_addrs);
-    ib->n_remote_addrs = 0;
-    ib->remote_macs = xmalloc(ib->n_remotes * ETH_ADDR_LEN);
-    ib->n_remote_macs = 0;
-    for (r = ib->remotes; r < &ib->remotes[ib->n_remotes]; r++) {
-        ib->remote_addrs[ib->n_remote_addrs++] = r->remote_addr;
-        if (!eth_addr_is_zero(r->remote_mac)) {
-            memcpy(&ib->remote_macs[ib->n_remote_macs * ETH_ADDR_LEN],
-                   r->remote_mac, ETH_ADDR_LEN);
-            ib->n_remote_macs++;
+        case DELETE:
+            if (ofproto_delete_flow(ib->ofproto, &rule->cls_rule)) {
+                /* ofproto doesn't have the rule anymore so there's no reason
+                 * for us to track it any longer. */
+                hmap_remove(&ib->rules, &rule->cls_rule.hmap_node);
+                free(rule);
+            }
+            break;
         }
     }
-
-    /* Sort, to allow make_rules() to easily skip duplicates. */
-    qsort(ib->remote_addrs, ib->n_remote_addrs, sizeof *ib->remote_addrs,
-          compare_addrs);
-    qsort(ib->remote_macs, ib->n_remote_macs, ETH_ADDR_LEN, compare_macs);
-
-    /* Add new rules. */
-    add_rules(ib);
 }
 
 void
@@ -507,14 +456,6 @@ in_band_wait(struct in_band *in_band)
     poll_timer_wait_until(wakeup * 1000);
 }
 
-/* ofproto has flushed all flows from the flow table and it is calling us back
- * to allow us to reinstall the ones that are important to us. */
-void
-in_band_flushed(struct in_band *in_band)
-{
-    add_rules(in_band);
-}
-
 int
 in_band_create(struct ofproto *ofproto, const char *local_name,
                struct in_band **in_bandp)
@@ -533,10 +474,11 @@ in_band_create(struct ofproto *ofproto, const char *local_name,
 
     in_band = xzalloc(sizeof *in_band);
     in_band->ofproto = ofproto;
-    in_band->queue_id = in_band->prev_queue_id = -1;
+    in_band->queue_id = -1;
     in_band->next_remote_refresh = TIME_MIN;
     in_band->next_local_refresh = TIME_MIN;
     in_band->local_netdev = local_netdev;
+    hmap_init(&in_band->rules);
 
     *in_bandp = in_band;
 
@@ -547,7 +489,14 @@ void
 in_band_destroy(struct in_band *ib)
 {
     if (ib) {
-        drop_rules(ib);
+        struct in_band_rule *rule, *next;
+
+        /* Each tracked rule was xmalloc()'d by add_rule(), so free it too. */
+        HMAP_FOR_EACH_SAFE (rule, next, cls_rule.hmap_node, &ib->rules) {
+            hmap_remove(&ib->rules, &rule->cls_rule.hmap_node);
+            free(rule);
+        }
+        hmap_destroy(&ib->rules);
         in_band_set_remotes(ib, NULL, 0);
         netdev_close(ib->local_netdev);
         free(ib);
diff --git a/ofproto/in-band.h b/ofproto/in-band.h
index 5fa3666..e2d8e80 100644
--- a/ofproto/in-band.h
+++ b/ofproto/in-band.h
@@ -42,6 +42,5 @@ bool in_band_msg_in_hook(struct in_band *, const struct flow *,
                          const struct ofpbuf *packet);
 bool in_band_rule_check(const struct flow *,
                         const struct nlattr *odp_actions, size_t actions_len);
-void in_band_flushed(struct in_band *);
 
 #endif /* in-band.h */
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index 2f8d404..a151074 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -299,6 +299,11 @@ static void port_run(struct ofport_dpif *);
 static void port_wait(struct ofport_dpif *);
 static int set_cfm(struct ofport *, const struct cfm_settings *);
 
+struct dpif_completion {
+    struct list list_node;
+    struct ofoperation *op;
+};
+
 struct ofproto_dpif {
     struct ofproto up;
     struct dpif *dpif;
@@ -322,8 +327,15 @@ struct ofproto_dpif {
     struct hmap facets;
     bool need_revalidate;
     struct tag_set revalidate_set;
+
+    /* "struct dpif_completion"s deferred while 'clogged' (debugging aid). */
+    struct list completions;
 };
 
+/* Defer flow mod completion until "ovs-appctl ofproto/unclog"?  (Useful only
+ * for debugging the asynchronous flow_mod implementation.) */
+static bool clogged;
+
 static void ofproto_dpif_unixctl_init(void);
 
 static struct ofproto_dpif *
@@ -445,6 +457,8 @@ construct(struct ofproto *ofproto_)
     ofproto->need_revalidate = false;
     tag_set_init(&ofproto->revalidate_set);
 
+    list_init(&ofproto->completions);
+
     ofproto->up.tables = xmalloc(sizeof *ofproto->up.tables);
     classifier_init(&ofproto->up.tables[0]);
     ofproto->up.n_tables = 1;
@@ -482,6 +496,16 @@ run(struct ofproto *ofproto_)
     struct ofbundle *bundle;
     int i;
 
+    if (!clogged) {
+        struct dpif_completion *c, *next;
+
+        LIST_FOR_EACH_SAFE (c, next, list_node, &ofproto->completions) {
+            ofoperation_complete(c->op, 0);
+            list_remove(&c->list_node);
+            free(c);
+        }
+    }
+
     dpif_run(ofproto->dpif);
 
     for (i = 0; i < 50; i++) {
@@ -548,6 +572,10 @@ wait(struct ofproto *ofproto_)
     struct ofport_dpif *ofport;
     struct ofbundle *bundle;
 
+    if (!clogged && !list_is_empty(&ofproto->completions)) {
+        poll_immediate_wake();
+    }
+
     dpif_wait(ofproto->dpif);
     dpif_recv_wait(ofproto->dpif);
     if (ofproto->sflow) {
@@ -2526,6 +2554,21 @@ rule_dpif_lookup(struct ofproto_dpif *ofproto, const struct flow *flow)
                                                 flow)));
 }
 
+static void
+complete_operation(struct rule_dpif *rule)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(rule->up.ofproto);
+
+    ofproto->need_revalidate = true;
+    if (clogged) {
+        struct dpif_completion *c = xmalloc(sizeof *c);
+        c->op = rule->up.pending;
+        list_push_back(&ofproto->completions, &c->list_node);
+    } else {
+        ofoperation_complete(rule->up.pending, 0);
+    }
+}
+
 static struct rule *
 rule_alloc(void)
 {
@@ -2545,7 +2588,7 @@ rule_construct(struct rule *rule_)
 {
     struct rule_dpif *rule = rule_dpif_cast(rule_);
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(rule->up.ofproto);
-    struct rule_dpif *old_rule;
+    struct rule_dpif *victim;
     int error;
 
     error = validate_actions(rule->up.actions, rule->up.n_actions,
@@ -2554,21 +2597,23 @@ rule_construct(struct rule *rule_)
         return error;
     }
 
-    old_rule = rule_dpif_cast(rule_from_cls_rule(classifier_find_rule_exactly(
-                                                     &ofproto->up.tables[0],
-                                                     &rule->up.cr)));
-    if (old_rule) {
-        ofproto_rule_destroy(&old_rule->up);
-    }
-
     rule->used = rule->up.created;
     rule->packet_count = 0;
     rule->byte_count = 0;
     list_init(&rule->facets);
-    classifier_insert(&ofproto->up.tables[0], &rule->up.cr);
 
-    ofproto->need_revalidate = true;
+    victim = rule_dpif_cast(ofoperation_get_victim(rule->up.pending));
+    if (victim) {
+        struct facet *facet;
 
+        rule->facets = victim->facets;
+        list_moved(&rule->facets);
+        LIST_FOR_EACH (facet, list_node, &rule->facets) {
+            facet->rule = rule;
+        }
+    }
+
+    complete_operation(rule);
     return 0;
 }
 
@@ -2579,11 +2624,11 @@ rule_destruct(struct rule *rule_)
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(rule->up.ofproto);
     struct facet *facet, *next_facet;
 
-    classifier_remove(&ofproto->up.tables[0], &rule->up.cr);
     LIST_FOR_EACH_SAFE (facet, next_facet, list_node, &rule->facets) {
         facet_revalidate(ofproto, facet);
     }
-    ofproto->need_revalidate = true;
+
+    complete_operation(rule);
 }
 
 static void
@@ -2649,20 +2694,21 @@ rule_execute(struct rule *rule_, struct flow *flow, struct ofpbuf *packet)
     return 0;
 }
 
-static int
-rule_modify_actions(struct rule *rule_,
-                    const union ofp_action *actions, size_t n_actions)
+static void
+rule_modify_actions(struct rule *rule_)
 {
     struct rule_dpif *rule = rule_dpif_cast(rule_);
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(rule->up.ofproto);
     int error;
 
-    error = validate_actions(actions, n_actions, &rule->up.cr.flow,
-                             ofproto->max_ports);
-    if (!error) {
-        ofproto->need_revalidate = true;
+    error = validate_actions(rule->up.actions, rule->up.n_actions,
+                             &rule->up.cr.flow, ofproto->max_ports);
+    if (!error) {
+        complete_operation(rule);
+        return;
     }
-    return error;
+    /* Invalid actions: fail the operation without deferring it. */
+    ofoperation_complete(rule->up.pending, error);
 }
 
 /* Sends 'packet' out of port 'odp_port' within 'p'.
@@ -3917,6 +3963,22 @@ exit:
 }
 
 static void
+ofproto_dpif_clog(struct unixctl_conn *conn,
+                  const char *args_ OVS_UNUSED, void *aux OVS_UNUSED)
+{
+    clogged = true;
+    unixctl_command_reply(conn, 200, NULL);
+}
+
+static void
+ofproto_dpif_unclog(struct unixctl_conn *conn,
+                    const char *args_ OVS_UNUSED, void *aux OVS_UNUSED)
+{
+    clogged = false;
+    unixctl_command_reply(conn, 200, NULL);
+}
+
+static void
 ofproto_dpif_unixctl_init(void)
 {
     static bool registered;
@@ -3927,6 +3989,9 @@ ofproto_dpif_unixctl_init(void)
 
     unixctl_command_register("ofproto/trace", ofproto_unixctl_trace, NULL);
     unixctl_command_register("fdb/show", ofproto_unixctl_fdb_show, NULL);
+
+    unixctl_command_register("ofproto/clog", ofproto_dpif_clog, NULL);
+    unixctl_command_register("ofproto/unclog", ofproto_dpif_unclog, NULL);
 }
 
 const struct ofproto_class ofproto_dpif_class = {
diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
index bbc47a1..90c93c4 100644
--- a/ofproto/ofproto.c
+++ b/ofproto/ofproto.c
@@ -59,26 +59,95 @@ COVERAGE_DEFINE(ofproto_reinit_ports);
 COVERAGE_DEFINE(ofproto_uninstallable);
 COVERAGE_DEFINE(ofproto_update_port);
 
+enum ofproto_state {
+    S_OPENFLOW,                 /* Processing OpenFlow commands. */
+    S_FLUSH,                    /* Deleting all flow table rules. */
+};
+
+enum ofoperation_type {
+    OFOPERATION_ADD,
+    OFOPERATION_DELETE,
+    OFOPERATION_MODIFY
+};
+
+/* A single OpenFlow request can execute any number of operations.  The
+ * ofopgroup maintains OpenFlow state common to all of the operations, e.g. the
+ * ofconn to which an error reply should be sent if necessary.
+ *
+ * ofproto initiates some operations internally.  These operations are still
+ * assigned to groups but will not have an associated ofconn. */
+struct ofopgroup {
+    struct ofproto *ofproto;    /* Owning ofproto. */
+    struct list ofproto_node;   /* In ofproto's "pending" list. */
+    struct list ops;            /* List of "struct ofoperation"s. */
+
+    /* Data needed to send OpenFlow reply on failure or to send a buffered
+     * packet on success.
+     *
+     * If list_is_empty(ofconn_node) then this ofopgroup never had an
+     * associated ofconn or its ofconn's connection dropped after it initiated
+     * the operation.  In the latter case 'ofconn' is a wild pointer that
+     * refers to freed memory, so the 'ofconn' member must be used only if
+     * !list_is_empty(ofconn_node).
+     */
+    struct list ofconn_node;    /* In ofconn's list of pending opgroups. */
+    struct ofconn *ofconn;      /* ofconn for reply (but see note above). */
+    struct ofp_header *request; /* Original request (truncated at 64 bytes). */
+    uint32_t buffer_id;         /* Buffer id from original request. */
+    int error;                  /* 0 if no error yet, otherwise error code. */
+};
+
+static struct ofopgroup *ofopgroup_create(struct ofproto *);
+static struct ofopgroup *ofopgroup_create_for_ofconn(struct ofconn *,
+                                                     const struct ofp_header *,
+                                                     uint32_t buffer_id);
+static void ofopgroup_submit(struct ofopgroup *);
+static void ofopgroup_destroy(struct ofopgroup *);
+
+/* A single flow table operation.  ofoperation_create() points the rule's
+ * 'pending' member at the new operation, which holds whatever prior state of
+ * the rule its type needs (see the member comments). */
+struct ofoperation {
+    struct ofopgroup *group;    /* Owning group. */
+    struct list group_node;     /* In ofopgroup's "ops" list. */
+    struct hmap_node hmap_node; /* In ofproto's "deletions" hmap. */
+    struct rule *rule;          /* Rule being operated upon. */
+    enum ofoperation_type type; /* Type of operation. */
+    int status;                 /* -1 if pending, otherwise 0 or error code. */
+    struct rule *victim;        /* OFOPERATION_ADD: Replaced rule, if any. */
+    union ofp_action *actions;  /* OFOPERATION_MODIFY: Replaced actions. */
+    int n_actions;              /* OFOPERATION_MODIFY: # of old actions. */
+    ovs_be64 flow_cookie;       /* Rule's old flow cookie. */
+};
+
+static void ofoperation_create(struct ofopgroup *, struct rule *,
+                               enum ofoperation_type);
+static void ofoperation_destroy(struct ofoperation *);
+
 static void ofport_destroy__(struct ofport *);
 static void ofport_destroy(struct ofport *);
 
-static int rule_create(struct ofproto *,
-                       const struct cls_rule *, uint8_t table_id,
-                       const union ofp_action *, size_t n_actions,
-                       uint16_t idle_timeout, uint16_t hard_timeout,
-                       ovs_be64 flow_cookie, bool send_flow_removed,
-                       struct rule **rulep);
-
 static uint64_t pick_datapath_id(const struct ofproto *);
 static uint64_t pick_fallback_dpid(void);
 
 static void ofproto_destroy__(struct ofproto *);
-static void ofproto_flush_flows__(struct ofproto *);
 
 static void ofproto_rule_destroy__(struct rule *);
 static void ofproto_rule_send_removed(struct rule *, uint8_t reason);
 
-static void handle_openflow(struct ofconn *, struct ofpbuf *);
+static int add_flow(struct ofproto *, struct ofconn *, struct flow_mod *,
+                    const struct ofp_header *);
+
+/* This return value tells handle_openflow() that processing of the current
+ * OpenFlow message must be postponed until some ongoing operations have
+ * completed.
+ *
+ * This particular value is a good choice because it is negative (so it won't
+ * collide with any errno value or any value returned by ofp_mkerr()) and large
+ * (so it won't accidentally collide with EOF or a negative errno value). */
+enum { OFPROTO_POSTPONE = -100000 };
+
+static bool handle_openflow(struct ofconn *, struct ofpbuf *);
 
 static void update_port(struct ofproto *, const char *devname);
 static int init_ports(struct ofproto *);
@@ -263,6 +332,9 @@ ofproto_create(const char *datapath_name, const char *datapath_type,
     ofproto->tables = NULL;
     ofproto->n_tables = 0;
     ofproto->connmgr = connmgr_create(ofproto, datapath_name, datapath_name);
+    ofproto->state = S_OPENFLOW;
+    list_init(&ofproto->pending);
+    hmap_init(&ofproto->deletions);
 
     error = ofproto->ofproto_class->construct(ofproto);
     if (error) {
@@ -572,6 +644,34 @@ ofproto_get_snoops(const struct ofproto *ofproto, struct sset *snoops)
 }
 
 static void
+ofproto_flush__(struct ofproto *ofproto)
+{
+    struct ofopgroup *group;
+    size_t i;
+
+    if (ofproto->ofproto_class->flush) {
+        ofproto->ofproto_class->flush(ofproto);
+    }
+
+    group = ofopgroup_create(ofproto);
+    for (i = 0; i < ofproto->n_tables; i++) {
+        struct classifier *table = &ofproto->tables[i];
+        struct rule *rule, *next_rule;
+        struct cls_cursor cursor;
+
+        cls_cursor_init(&cursor, table, NULL);
+        CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
+            if (!rule->pending) {
+                ofoperation_create(group, rule, OFOPERATION_DELETE);
+                classifier_remove(table, &rule->cr);
+                ofproto->ofproto_class->rule_destruct(rule);
+            }
+        }
+    }
+    ofopgroup_submit(group);
+}
+
+static void
 ofproto_destroy__(struct ofproto *ofproto)
 {
     size_t i;
@@ -594,6 +694,8 @@ ofproto_destroy__(struct ofproto *ofproto)
     }
     free(ofproto->tables);
 
+    hmap_destroy(&ofproto->deletions);
+
     ofproto->ofproto_class->dealloc(ofproto);
 }
 
@@ -606,7 +708,7 @@ ofproto_destroy(struct ofproto *p)
         return;
     }
 
-    ofproto_flush_flows__(p);
+    ofproto_flush__(p);
     HMAP_FOR_EACH_SAFE (ofport, next_ofport, hmap_node, &p->ports) {
         ofport_destroy(ofport);
     }
@@ -668,7 +770,24 @@ ofproto_run(struct ofproto *p)
         process_port_change(p, error, devname);
     }
 
-    connmgr_run(p->connmgr, handle_openflow);
+
+    switch (p->state) {
+    case S_OPENFLOW:
+        connmgr_run(p->connmgr, handle_openflow);
+        break;
+
+    case S_FLUSH:
+        connmgr_run(p->connmgr, NULL);
+        ofproto_flush__(p);
+        if (list_is_empty(&p->pending) && hmap_is_empty(&p->deletions)) {
+            connmgr_flushed(p->connmgr);
+            p->state = S_OPENFLOW;
+        }
+        break;
+
+    default:
+        NOT_REACHED();
+    }
 
     return 0;
 }
@@ -681,7 +800,19 @@ ofproto_wait(struct ofproto *p)
         p->ofproto_class->port_poll_wait(p);
     }
     netdev_monitor_poll_wait(p->netdev_monitor);
-    connmgr_wait(p->connmgr);
+
+    switch (p->state) {
+    case S_OPENFLOW:
+        connmgr_wait(p->connmgr, true);
+        break;
+
+    case S_FLUSH:
+        connmgr_wait(p->connmgr, false);
+        if (list_is_empty(&p->pending) && hmap_is_empty(&p->deletions)) {
+            poll_immediate_wake();
+        }
+        break;
+    }
 }
 
 bool
@@ -866,56 +997,64 @@ ofproto_port_del(struct ofproto *ofproto, uint16_t ofp_port)
  *
  * This is a helper function for in-band control and fail-open. */
 void
-ofproto_add_flow(struct ofproto *p, const struct cls_rule *cls_rule,
+ofproto_add_flow(struct ofproto *ofproto, const struct cls_rule *cls_rule,
                  const union ofp_action *actions, size_t n_actions)
 {
-    struct rule *rule;
-    rule_create(p, cls_rule, 0, actions, n_actions, 0, 0, 0, false, &rule);
+    const struct rule *rule;
+    struct flow_mod fm;
+
+    rule = rule_from_cls_rule(classifier_find_rule_exactly(
+                                    &ofproto->tables[0], cls_rule));
+    if (rule && ofputil_actions_equal(rule->actions, rule->n_actions,
+                                      actions, n_actions)) {
+        return;                 /* Already installed with these actions. */
+    }
+    memset(&fm, 0, sizeof fm);
+    fm.cr = *cls_rule;
+    fm.buffer_id = UINT32_MAX;
+    fm.actions = (union ofp_action *) actions;
+    fm.n_actions = n_actions;
+    add_flow(ofproto, NULL, &fm, NULL);
 }
 
 /* Searches for a rule with matching criteria exactly equal to 'target' in
- * ofproto's table 0 and, if it finds one, deletes it.
+ * ofproto's table 0 and, if it finds one, starts deleting it.  Returns false
+ * if an operation on that rule is already pending, otherwise true.
  *
  * This is a helper function for in-band control and fail-open. */
-void
+bool
 ofproto_delete_flow(struct ofproto *ofproto, const struct cls_rule *target)
 {
     struct rule *rule;
 
     rule = rule_from_cls_rule(classifier_find_rule_exactly(
                                   &ofproto->tables[0], target));
-    ofproto_rule_destroy(rule);
-}
-
-static void
-ofproto_flush_flows__(struct ofproto *ofproto)
-{
-    size_t i;
-
-    COVERAGE_INC(ofproto_flush);
-
-    if (ofproto->ofproto_class->flush) {
-        ofproto->ofproto_class->flush(ofproto);
+    if (!rule) {
+        /* No such rule -> success. */
+        return true;
+    } else if (rule->pending) {
+        /* An operation on the rule is already pending -> failure.
+         * Caller must retry later if it's important. */
+        return false;
+    } else {
+        /* Initiate deletion -> success. */
+        struct ofopgroup *group = ofopgroup_create(ofproto);
+        ofoperation_create(group, rule, OFOPERATION_DELETE);
+        classifier_remove(&ofproto->tables[rule->table_id], &rule->cr);
+        rule->ofproto->ofproto_class->rule_destruct(rule);
+        ofopgroup_submit(group);
+        return true;
     }
-
-    for (i = 0; i < ofproto->n_tables; i++) {
-        struct rule *rule, *next_rule;
-        struct cls_cursor cursor;
-
-        cls_cursor_init(&cursor, &ofproto->tables[i], NULL);
-        CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, cr, &cursor) {
-            ofproto_rule_destroy(rule);
-        }
-    }
 }
 
-/* Deletes all of the flows from all of ofproto's flow tables, then
- * reintroduces rules required by in-band control and fail open. */
+/* Puts 'ofproto' into the S_FLUSH state.  Later calls to ofproto_run() delete
+ * every flow in every flow table and then reinstate the flows needed by
+ * in-band control and fail-open. */
 void
 ofproto_flush_flows(struct ofproto *ofproto)
 {
-    ofproto_flush_flows__(ofproto);
-    connmgr_flushed(ofproto->connmgr);
+    ofproto->state = S_FLUSH;
+    COVERAGE_INC(ofproto_flush);
 }
 
 static void
@@ -1217,71 +1356,6 @@ init_ports(struct ofproto *p)
     return 0;
 }
 
-/* Creates a new rule initialized as specified, inserts it into 'ofproto''s
- * flow table, and stores the new rule into '*rulep'.  Returns 0 on success,
- * otherwise a positive errno value or OpenFlow error code. */
-static int
-rule_create(struct ofproto *ofproto,
-            const struct cls_rule *cls_rule, uint8_t table_id,
-            const union ofp_action *actions, size_t n_actions,
-            uint16_t idle_timeout, uint16_t hard_timeout,
-            ovs_be64 flow_cookie, bool send_flow_removed,
-            struct rule **rulep)
-{
-    struct rule *rule;
-    int error;
-
-    if (table_id == 0xff) {
-        if (ofproto->n_tables > 1) {
-            error = ofproto->ofproto_class->rule_choose_table(ofproto,
-                                                              cls_rule,
-                                                              &table_id);
-            if (error) {
-                return error;
-            }
-            assert(table_id < ofproto->n_tables);
-        } else {
-            table_id = 0;
-        }
-    }
-
-    rule = ofproto->ofproto_class->rule_alloc();
-    if (!rule) {
-        error = ENOMEM;
-        goto error;
-    }
-
-    rule->ofproto = ofproto;
-    rule->cr = *cls_rule;
-    rule->table_id = table_id;
-    rule->flow_cookie = flow_cookie;
-    rule->created = time_msec();
-    rule->idle_timeout = idle_timeout;
-    rule->hard_timeout = hard_timeout;
-    rule->send_flow_removed = send_flow_removed;
-    if (n_actions > 0) {
-        rule->actions = xmemdup(actions, n_actions * sizeof *actions);
-    } else {
-        rule->actions = NULL;
-    }
-    rule->n_actions = n_actions;
-
-    error = ofproto->ofproto_class->rule_construct(rule);
-    if (error) {
-        ofproto_rule_destroy__(rule);
-        goto error;
-    }
-
-    *rulep = rule;
-    return 0;
-
-error:
-    VLOG_WARN_RL(&rl, "%s: failed to create rule (%s)",
-                 ofproto->name, strerror(error));
-    *rulep = NULL;
-    return error;
-}
-
 static void
 ofproto_rule_destroy__(struct rule *rule)
 {
@@ -1289,16 +1363,6 @@ ofproto_rule_destroy__(struct rule *rule)
     rule->ofproto->ofproto_class->rule_dealloc(rule);
 }
 
-/* Destroys 'rule' and removes it from the flow table and the datapath. */
-void
-ofproto_rule_destroy(struct rule *rule)
-{
-    if (rule) {
-        rule->ofproto->ofproto_class->rule_destruct(rule);
-        ofproto_rule_destroy__(rule);
-    }
-}
-
 /* Returns true if 'rule' has an OpenFlow OFPAT_OUTPUT or OFPAT_ENQUEUE action
  * that outputs to 'out_port' (output to OFPP_FLOOD and OFPP_ALL doesn't
  * count). */
@@ -1736,6 +1800,9 @@ collect_rules_loose(struct ofproto *ofproto, uint8_t table_id,
 
         cls_cursor_init(&cursor, cls, match);
         CLS_CURSOR_FOR_EACH (rule, cr, &cursor) {
+            if (rule->pending) {
+                return OFPROTO_POSTPONE;
+            }
             if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
@@ -1768,6 +1835,9 @@ collect_rules_strict(struct ofproto *ofproto, uint8_t table_id,
 
         rule = rule_from_cls_rule(classifier_find_rule_exactly(cls, match));
         if (rule) {
+            if (rule->pending) {
+                return OFPROTO_POSTPONE;
+            }
             if (!rule_is_hidden(rule) && rule_has_out_port(rule, out_port)) {
                 list_push_back(rules, &rule->ofproto_node);
             }
@@ -2009,7 +2079,27 @@ handle_queue_stats_request(struct ofconn *ofconn,
 
     return 0;
 }
-
+
+static bool
+is_flow_deletion_pending(const struct ofproto *ofproto,
+                         const struct cls_rule *cls_rule,
+                         uint8_t table_id)
+{
+    struct ofoperation *op;
+
+    if (hmap_is_empty(&ofproto->deletions)) {
+        return false;           /* Avoid hashing when nothing is pending. */
+    }
+    HMAP_FOR_EACH_WITH_HASH (op, hmap_node, cls_rule_hash(cls_rule, table_id),
+                             &ofproto->deletions) {
+        if (op->rule->table_id == table_id
+            && cls_rule_equal(cls_rule, &op->rule->cr)) {
+            return true;
+        }
+    }
+    return false;
+}
+
 /* Implements OFPFC_ADD and the cases for OFPFC_MODIFY and OFPFC_MODIFY_STRICT
  * in which no matching flow already exists in the flow table.
  *
@@ -2020,59 +2110,96 @@ handle_queue_stats_request(struct ofconn *ofconn,
  * 'ofconn' is used to retrieve the packet buffer specified in ofm->buffer_id,
  * if any. */
 static int
-add_flow(struct ofconn *ofconn, struct flow_mod *fm)
+add_flow(struct ofproto *ofproto, struct ofconn *ofconn, struct flow_mod *fm,
+         const struct ofp_header *request)
 {
-    struct ofproto *p = ofconn_get_ofproto(ofconn);
-    struct ofpbuf *packet;
+    struct classifier *table;
+    struct ofopgroup *group;
+    struct rule *victim;
     struct rule *rule;
-    uint16_t in_port;
-    int buf_err;
     int error;
 
+    /* Check for overlap, if requested. */
     if (fm->flags & OFPFF_CHECK_OVERLAP) {
         struct classifier *cls;
 
-        FOR_EACH_MATCHING_TABLE (cls, fm->table_id, p) {
+        FOR_EACH_MATCHING_TABLE (cls, fm->table_id, ofproto) {
             if (classifier_rule_overlaps(cls, &fm->cr)) {
                 return ofp_mkerr(OFPET_FLOW_MOD_FAILED, OFPFMFC_OVERLAP);
             }
         }
     }
 
-    buf_err = ofconn_pktbuf_retrieve(ofconn, fm->buffer_id, &packet, &in_port);
-    error = rule_create(p, &fm->cr, fm->table_id, fm->actions, fm->n_actions,
-                        fm->idle_timeout, fm->hard_timeout, fm->cookie,
-                        fm->flags & OFPFF_SEND_FLOW_REM, &rule);
-    if (error) {
-        ofpbuf_delete(packet);
-        return error;
+    /* Pick table (0xff: let the class choose, if there are several). */
+    if (fm->table_id == 0xff) {
+        uint8_t table_id = 0;
+        if (ofproto->n_tables > 1) {
+            error = ofproto->ofproto_class->rule_choose_table(ofproto, &fm->cr,
+                                                              &table_id);
+            if (error) {
+                return error;
+            }
+            assert(table_id < ofproto->n_tables);
+        }
+        table = &ofproto->tables[table_id];
+    } else if (fm->table_id < ofproto->n_tables) {
+        table = &ofproto->tables[fm->table_id];
+    } else {
+        return ofp_mkerr_nicira(OFPET_FLOW_MOD_FAILED, NXFMFC_BAD_TABLE_ID);
     }
 
-    if (packet) {
-        assert(!buf_err);
-        return rule_execute(rule, in_port, packet);
+    /* Serialize against pending deletion. */
+    if (is_flow_deletion_pending(ofproto, &fm->cr, table - ofproto->tables)) {
+        return OFPROTO_POSTPONE;
     }
-    return buf_err;
-}
 
-static int
-send_buffered_packet(struct ofconn *ofconn,
-                     struct rule *rule, uint32_t buffer_id)
-{
-    struct ofpbuf *packet;
-    uint16_t in_port;
-    int error;
+    /* Allocate new rule. */
+    rule = ofproto->ofproto_class->rule_alloc();
+    if (!rule) {
+        VLOG_WARN_RL(&rl, "%s: failed to create rule (%s)",
+                     ofproto->name, strerror(ENOMEM));
+        return ENOMEM;
+    }
+    rule->ofproto = ofproto;
+    rule->cr = fm->cr;
+    rule->pending = NULL;
+    rule->flow_cookie = fm->cookie;
+    rule->created = time_msec();
+    rule->idle_timeout = fm->idle_timeout;
+    rule->hard_timeout = fm->hard_timeout;
+    rule->table_id = table - ofproto->tables;
+    rule->send_flow_removed = (fm->flags & OFPFF_SEND_FLOW_REM) != 0;
+    rule->actions = ofputil_actions_clone(fm->actions, fm->n_actions);
+    rule->n_actions = fm->n_actions;
+
+    /* Insert new rule.  Postpone if the rule it replaces has a pending op. */
+    victim = rule_from_cls_rule(classifier_replace(table, &rule->cr));
+    if (victim && victim->pending) {
+        error = OFPROTO_POSTPONE;
+    } else {
+        group = (ofconn
+                 ? ofopgroup_create_for_ofconn(ofconn, request, fm->buffer_id)
+                 : ofopgroup_create(ofproto));
+        ofoperation_create(group, rule, OFOPERATION_ADD);
+        rule->pending->victim = victim;
 
-    if (buffer_id == UINT32_MAX) {
-        return 0;
+        error = ofproto->ofproto_class->rule_construct(rule);
+        if (error) {
+            ofoperation_destroy(rule->pending);
+        }
+        ofopgroup_submit(group);
     }
 
-    error = ofconn_pktbuf_retrieve(ofconn, buffer_id, &packet, &in_port);
+    /* Back out if an error occurred. */
     if (error) {
-        return error;
+        if (victim) {
+            classifier_replace(table, &victim->cr);
+        } else {
+            classifier_remove(table, &rule->cr);
+        }
+        ofproto_rule_destroy__(rule);
     }
-
-    return rule_execute(rule, in_port, packet);
+    return error;
 }
 
 /* OFPFC_MODIFY and OFPFC_MODIFY_STRICT. */
@@ -2086,42 +2213,25 @@ send_buffered_packet(struct ofconn *ofconn,
  * Returns 0 on success, otherwise an OpenFlow error code. */
 static int
 modify_flows__(struct ofconn *ofconn, const struct flow_mod *fm,
-               struct list *rules)
+               const struct ofp_header *request, struct list *rules)
 {
-    struct rule *match;
+    struct ofopgroup *group;
     struct rule *rule;
-    int error;
 
-    error = 0;
-    match = NULL;
+    group = ofopgroup_create_for_ofconn(ofconn, request, fm->buffer_id);
     LIST_FOR_EACH (rule, ofproto_node, rules) {
         if (!ofputil_actions_equal(fm->actions, fm->n_actions,
                                    rule->actions, rule->n_actions)) {
-            int retval;
-
-            retval = rule->ofproto->ofproto_class->rule_modify_actions(
-                rule, fm->actions, fm->n_actions);
-            if (!retval) {
-                match = rule;
-                free(rule->actions);
-                rule->actions = ofputil_actions_clone(fm->actions,
-                                                      fm->n_actions);
-                rule->n_actions = fm->n_actions;
-            } else if (!error) {
-                error = retval;
-            }
+            ofoperation_create(group, rule, OFOPERATION_MODIFY);
+            rule->actions = ofputil_actions_clone(fm->actions, fm->n_actions);
+            rule->n_actions = fm->n_actions;
+            rule->ofproto->ofproto_class->rule_modify_actions(rule);
         }
         rule->flow_cookie = fm->cookie;
     }
+    ofopgroup_submit(group);
 
-    if (!error && match) {
-        /* This credits the packet to whichever flow happened to match last.
-         * That's weird.  Maybe we should do a lookup for the flow that
-         * actually matches the packet?  Who knows. */
-        send_buffered_packet(ofconn, match, fm->buffer_id);
-    }
-
-    return error;
+    return 0;
 }
 
 /* Implements OFPFC_MODIFY.  Returns 0 on success or an OpenFlow error code as
@@ -2130,7 +2240,8 @@ modify_flows__(struct ofconn *ofconn, const struct flow_mod *fm,
  * 'ofconn' is used to retrieve the packet buffer specified in fm->buffer_id,
  * if any. */
 static int
-modify_flows_loose(struct ofconn *ofconn, struct flow_mod *fm)
+modify_flows_loose(struct ofconn *ofconn, struct flow_mod *fm,
+                   const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
     struct list rules;
@@ -2138,8 +2249,8 @@ modify_flows_loose(struct ofconn *ofconn, struct flow_mod *fm)
 
     error = collect_rules_loose(p, fm->table_id, &fm->cr, OFPP_NONE, &rules);
     return (error ? error
-            : list_is_empty(&rules) ? add_flow(ofconn, fm)
-            : modify_flows__(ofconn, fm, &rules));
+            : list_is_empty(&rules) ? add_flow(p, ofconn, fm, request)
+            : modify_flows__(ofconn, fm, request, &rules));
 }
 
 /* Implements OFPFC_MODIFY_STRICT.  Returns 0 on success or an OpenFlow error
@@ -2148,7 +2259,8 @@ modify_flows_loose(struct ofconn *ofconn, struct flow_mod *fm)
  * 'ofconn' is used to retrieve the packet buffer specified in fm->buffer_id,
  * if any. */
 static int
-modify_flow_strict(struct ofconn *ofconn, struct flow_mod *fm)
+modify_flow_strict(struct ofconn *ofconn, struct flow_mod *fm,
+                   const struct ofp_header *request)
 {
     struct ofproto *p = ofconn_get_ofproto(ofconn);
     struct list rules;
@@ -2156,8 +2268,9 @@ modify_flow_strict(struct ofconn *ofconn, struct flow_mod *fm)
 
     error = collect_rules_strict(p, fm->table_id, &fm->cr, OFPP_NONE, &rules);
     return (error ? error
-            : list_is_empty(&rules) ? add_flow(ofconn, fm)
-            : list_is_singleton(&rules) ? modify_flows__(ofconn, fm, &rules)
+            : list_is_empty(&rules) ? add_flow(p, ofconn, fm, request)
+            : list_is_singleton(&rules) ? modify_flows__(ofconn, fm, request,
+                                                         &rules)
             : 0);
 }
 
@@ -2167,43 +2280,56 @@ modify_flow_strict(struct ofconn *ofconn, struct flow_mod *fm)
  *
  * Returns 0 on success, otherwise an OpenFlow error code. */
 static int
-delete_flows__(struct list *rules)
+delete_flows__(struct ofconn *ofconn, const struct ofp_header *request,
+               struct list *rules)
 {
+    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     struct rule *rule, *next;
+    struct ofopgroup *group;
 
+    group = ofopgroup_create_for_ofconn(ofconn, request, UINT32_MAX);
     LIST_FOR_EACH_SAFE (rule, next, ofproto_node, rules) {
         ofproto_rule_send_removed(rule, OFPRR_DELETE);
-        ofproto_rule_destroy(rule);
+        /* Queue a DELETE in 'group' and take the rule out of its table. */
+        ofoperation_create(group, rule, OFOPERATION_DELETE);
+        classifier_remove(&ofproto->tables[rule->table_id], &rule->cr);
+        ofproto->ofproto_class->rule_destruct(rule);
     }
+    ofopgroup_submit(group);
 
     return 0;
 }
 
 /* Implements OFPFC_DELETE. */
 static int
-delete_flows_loose(struct ofproto *p, const struct flow_mod *fm)
+delete_flows_loose(struct ofconn *ofconn, const struct flow_mod *fm,
+                   const struct ofp_header *request)
 {
+    struct ofproto *p = ofconn_get_ofproto(ofconn);
     struct list rules;
     int error;
 
     error = collect_rules_loose(p, fm->table_id, &fm->cr, fm->out_port,
                                 &rules);
     return (error ? error
-            : !list_is_empty(&rules) ? delete_flows__(&rules)
+            : !list_is_empty(&rules) ? delete_flows__(ofconn, request, &rules)
             : 0);
 }
 
 /* Implements OFPFC_DELETE_STRICT. */
 static int
-delete_flow_strict(struct ofproto *p, struct flow_mod *fm)
+delete_flow_strict(struct ofconn *ofconn, struct flow_mod *fm,
+                   const struct ofp_header *request)
 {
+    struct ofproto *p = ofconn_get_ofproto(ofconn);
     struct list rules;
     int error;
 
     error = collect_rules_strict(p, fm->table_id, &fm->cr, fm->out_port,
                                  &rules);
     return (error ? error
-            : list_is_singleton(&rules) ? delete_flows__(&rules)
+            : list_is_singleton(&rules) ? delete_flows__(ofconn, request,
+                                                         &rules)
             : 0);
 }
 
@@ -2236,15 +2362,24 @@ ofproto_rule_send_removed(struct rule *rule, uint8_t reason)
 void
 ofproto_rule_expire(struct rule *rule, uint8_t reason)
 {
+    struct ofopgroup *group;
+    struct ofproto *ofproto = rule->ofproto;
+
     assert(reason == OFPRR_HARD_TIMEOUT || reason == OFPRR_IDLE_TIMEOUT);
+
     ofproto_rule_send_removed(rule, reason);
-    ofproto_rule_destroy(rule);
+    /* Expiration is internal, so the deletion's group has no ofconn. */
+    group = ofopgroup_create(ofproto);
+    ofoperation_create(group, rule, OFOPERATION_DELETE);
+    classifier_remove(&ofproto->tables[rule->table_id], &rule->cr);
+    ofproto->ofproto_class->rule_destruct(rule);
+    ofopgroup_submit(group);
 }
 
 static int
 handle_flow_mod(struct ofconn *ofconn, const struct ofp_header *oh)
 {
-    struct ofproto *p = ofconn_get_ofproto(ofconn);
+    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     struct flow_mod fm;
     int error;
 
@@ -2253,6 +2388,10 @@ handle_flow_mod(struct ofconn *ofconn, const struct ofp_header *oh)
         return error;
     }
 
+    if (list_size(&ofproto->pending) >= 50) {
+        return OFPROTO_POSTPONE;
+    }
+
     error = ofputil_decode_flow_mod(&fm, oh,
                                     ofconn_get_flow_mod_table_id(ofconn));
     if (error) {
@@ -2269,21 +2408,19 @@ handle_flow_mod(struct ofconn *ofconn, const struct ofp_header *oh)
 
     switch (fm.command) {
     case OFPFC_ADD:
-        return add_flow(ofconn, &fm);
+        return add_flow(ofproto, ofconn, &fm, oh);
 
     case OFPFC_MODIFY:
-        return modify_flows_loose(ofconn, &fm);
+        return modify_flows_loose(ofconn, &fm, oh);
 
     case OFPFC_MODIFY_STRICT:
-        return modify_flow_strict(ofconn, &fm);
+        return modify_flow_strict(ofconn, &fm, oh);
 
     case OFPFC_DELETE:
-        delete_flows_loose(p, &fm);
-        return 0;
+        return delete_flows_loose(ofconn, &fm, oh);
 
     case OFPFC_DELETE_STRICT:
-        delete_flow_strict(p, &fm);
-        return 0;
+        return delete_flow_strict(ofconn, &fm, oh);
 
     default:
         if (fm.command > 0xff) {
@@ -2316,6 +2453,11 @@ handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
         return ofp_mkerr(OFPET_BAD_REQUEST, -1);
     }
 
+    if (ofconn_get_role(ofconn) != role
+        && ofconn_has_pending_opgroups(ofconn)) {
+        return OFPROTO_POSTPONE;
+    }
+
     ofconn_set_role(ofconn, role);
 
     reply = make_nxmsg_xid(sizeof *reply, NXT_ROLE_REPLY, oh->xid, &buf);
@@ -2344,13 +2486,18 @@ handle_nxt_set_flow_format(struct ofconn *ofconn, const struct ofp_header *oh)
     uint32_t format;
 
     format = ntohl(msg->format);
-    if (format == NXFF_OPENFLOW10
-        || format == NXFF_NXM) {
-        ofconn_set_flow_format(ofconn, format);
-        return 0;
-    } else {
+    if (format != NXFF_OPENFLOW10 && format != NXFF_NXM) {
         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_EPERM);
     }
+
+    if (format != ofconn_get_flow_format(ofconn)
+        && ofconn_has_pending_opgroups(ofconn)) {
+        /* Avoid sending async messages in surprising flow format. */
+        return OFPROTO_POSTPONE;
+    }
+
+    ofconn_set_flow_format(ofconn, format);
+    return 0;
 }
 
 static int
@@ -2359,8 +2506,10 @@ handle_barrier_request(struct ofconn *ofconn, const struct ofp_header *oh)
     struct ofp_header *ob;
     struct ofpbuf *buf;
 
-    /* Currently, everything executes synchronously, so we can just
-     * immediately send the barrier reply. */
+    if (ofconn_has_pending_opgroups(ofconn)) {
+        return OFPROTO_POSTPONE;
+    }
+
     ob = make_openflow_xid(sizeof *ob, OFPT_BARRIER_REPLY, oh->xid, &buf);
     ofconn_send_reply(ofconn, buf);
     return 0;
@@ -2477,14 +2626,219 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
     }
 }
 
-static void
+/* Handles OpenFlow message 'ofp_msg' received on 'ofconn', replying with an
+ * OpenFlow error message if handling fails.
+ *
+ * Returns false if the handler asked for the message to be postponed
+ * (OFPROTO_POSTPONE), in which case no error is sent; presumably the caller
+ * keeps the message and offers it again later.  Returns true otherwise,
+ * meaning the message has been dealt with.  (The ofproto_recv_openflow
+ * coverage counter is bumped on every call, postponed or not.) */
+static bool
 handle_openflow(struct ofconn *ofconn, struct ofpbuf *ofp_msg)
 {
     int error = handle_openflow__(ofconn, ofp_msg);
-    if (error) {
+    bool postponed = error == OFPROTO_POSTPONE;
+
+    if (error && !postponed) {
         ofconn_send_error(ofconn, ofp_msg->data, error);
     }
     COVERAGE_INC(ofproto_recv_openflow);
+    return !postponed;
+}
+
+/* Asynchronous operations. */
+
+/* Allocates an empty ofopgroup on 'ofproto' that has no OpenFlow connection
+ * behind it, so no controller receives a reply when it finishes (flow
+ * expiration uses this, for example).
+ *
+ * Fill the group with ofoperation_create(), then pass it to
+ * ofopgroup_submit(). */
+static struct ofopgroup *
+ofopgroup_create(struct ofproto *ofproto)
+{
+    struct ofopgroup *new_group;
+
+    new_group = xzalloc(sizeof *new_group);
+    new_group->ofproto = ofproto;
+
+    /* Initialized-but-empty list nodes are how the group later tells whether
+     * it has been queued on 'ofproto' or attached to an ofconn. */
+    list_init(&new_group->ofproto_node);
+    list_init(&new_group->ofconn_node);
+    list_init(&new_group->ops);
+
+    return new_group;
+}
+
+/* Creates and returns a new ofopgroup that is associated with 'ofconn'.  If
+ * the ofopgroup eventually fails, then the error reply will include (a prefix
+ * of) 'request'.  If the ofopgroup eventually succeeds, then the packet with
+ * buffer id 'buffer_id' on 'ofconn' will be sent by 'ofconn''s ofproto.
+ *
+ * The caller should add operations to the returned group with
+ * ofoperation_create() and then submit it with ofopgroup_submit(). */
+static struct ofopgroup *
+ofopgroup_create_for_ofconn(struct ofconn *ofconn,
+                            const struct ofp_header *request,
+                            uint32_t buffer_id)
+{
+    /* OpenFlow error messages carry (at least) the first 64 bytes of the
+     * request that failed, so that is all that needs to be kept around while
+     * the group is pending.
+     *
+     * NOTE(review): the saved copy keeps the request's original 'length'
+     * field, which may be larger than the number of bytes saved; confirm
+     * that ofconn_send_error() never reads more than this many bytes. */
+    enum { OFOPGROUP_REQUEST_SAVE_LEN = 64 };
+    struct ofopgroup *group = ofopgroup_create(ofconn_get_ofproto(ofconn));
+    size_t request_len = ntohs(request->length);
+
+    ofconn_add_opgroup(ofconn, &group->ofconn_node);
+    group->ofconn = ofconn;
+    group->request = xmemdup(request,
+                             MIN(request_len, OFOPGROUP_REQUEST_SAVE_LEN));
+    group->buffer_id = buffer_id;
+
+    return group;
+}
+
+/* Hands 'group' over for processing.
+ *
+ * A group that holds no operations (none were added, or every one that was
+ * added already completed synchronously) has nothing left to wait for, so it
+ * is destroyed on the spot.  Otherwise the group is queued on its ofproto's
+ * 'pending' list until its last operation completes. */
+static void
+ofopgroup_submit(struct ofopgroup *group)
+{
+    if (!list_is_empty(&group->ops)) {
+        list_push_back(&group->ofproto->pending, &group->ofproto_node);
+        return;
+    }
+    ofopgroup_destroy(group);
+}
+
+/* Frees 'group', which must have no operations left.
+ *
+ * If 'group' was queued by ofopgroup_submit(), it is taken off its ofproto's
+ * 'pending' list.  If it is attached to an OpenFlow connection, it is
+ * detached from that connection, the first error recorded for the group (if
+ * any) is sent to the controller along with the saved request, and
+ * connmgr_retry() is invoked (presumably so that postponed requests on the
+ * connection get another chance). */
+static void
+ofopgroup_destroy(struct ofopgroup *group)
+{
+    bool queued = !list_is_empty(&group->ofproto_node);
+    bool attached = !list_is_empty(&group->ofconn_node);
+
+    assert(list_is_empty(&group->ops));
+
+    if (queued) {
+        list_remove(&group->ofproto_node);
+    }
+    if (attached) {
+        list_remove(&group->ofconn_node);
+        if (group->error) {
+            ofconn_send_error(group->ofconn, group->request, group->error);
+        }
+        connmgr_retry(group->ofproto->connmgr);
+    }
+    free(group->request);
+    free(group);
+}
+
+/* Starts a new operation of kind 'type' on 'rule' as part of 'group'.
+ * 'rule' must not already have an operation in progress.
+ *
+ * The operation remembers the rule's current flow cookie, becomes the rule's
+ * 'pending' operation, and, for a DELETE, is also indexed in the ofproto's
+ * 'deletions' map. */
+static void
+ofoperation_create(struct ofopgroup *group, struct rule *rule,
+                   enum ofoperation_type type)
+{
+    struct ofoperation *new_op;
+
+    assert(!rule->pending);
+
+    new_op = xzalloc(sizeof *new_op);
+    new_op->group = group;
+    new_op->rule = rule;
+    new_op->type = type;
+    new_op->status = -1;        /* Negative: not completed yet. */
+    new_op->flow_cookie = rule->flow_cookie;
+    list_push_back(&group->ops, &new_op->group_node);
+    rule->pending = new_op;
+
+    if (type == OFOPERATION_DELETE) {
+        hmap_insert(&group->ofproto->deletions, &new_op->hmap_node,
+                    cls_rule_hash(&rule->cr, rule->table_id));
+    }
+}
+
+/* Detaches 'op' from its rule, from the ofproto's 'deletions' map (for a
+ * DELETE), and from its group, then frees it along with any saved actions.
+ *
+ * If that leaves an already-submitted group with no operations, the group
+ * itself is destroyed too.  A group that has not been submitted yet is left
+ * alone; ofopgroup_submit() takes care of it. */
+static void
+ofoperation_destroy(struct ofoperation *op)
+{
+    struct ofopgroup *owner = op->group;
+    struct rule *rule = op->rule;
+
+    if (rule != NULL) {
+        rule->pending = NULL;
+    }
+    if (op->type == OFOPERATION_DELETE) {
+        hmap_remove(&owner->ofproto->deletions, &op->hmap_node);
+    }
+    list_remove(&op->group_node);
+    free(op->actions);
+    free(op);
+
+    if (list_is_empty(&owner->ops)
+        && !list_is_empty(&owner->ofproto_node)) {
+        ofopgroup_destroy(owner);
+    }
+}
+
+/* Indicates that 'op' completed with status 'error', which is either 0 to
+ * indicate success or an OpenFlow error code (constructed with
+ * e.g. ofp_mkerr()).
+ *
+ * If 'op' is a "delete flow" operation, 'error' must be 0.  That is, flow
+ * deletions are not allowed to fail.
+ *
+ * If 'op' is the only operation in a group that was created for an OpenFlow
+ * connection with a buffered packet, and nothing has failed so far, the
+ * buffered packet is retrieved and executed through 'op''s rule.  A failure
+ * at that step counts as a failure of 'op' itself and is rolled back like
+ * any other.
+ *
+ * On return, 'op' has been freed, and so has its group if 'op' was the last
+ * operation in an already-submitted group.
+ *
+ * Please see the large comment in ofproto/private.h titled "Asynchronous
+ * Operation Support" for more information. */
+void
+ofoperation_complete(struct ofoperation *op, int error)
+{
+    struct ofopgroup *group = op->group;
+    struct rule *rule = op->rule;
+    struct classifier *table = &rule->ofproto->tables[rule->table_id];
+
+    assert(rule->pending == op);
+    assert(op->status < 0);
+    assert(error >= 0);
+
+    if (!error
+        && !group->error
+        && op->type != OFOPERATION_DELETE
+        && group->ofconn
+        && group->buffer_id != UINT32_MAX
+        && list_is_singleton(&op->group_node)) {
+        struct ofpbuf *packet;
+        uint16_t in_port;
+
+        error = ofconn_pktbuf_retrieve(group->ofconn, group->buffer_id,
+                                       &packet, &in_port);
+        if (!error) {
+            error = rule_execute(rule, in_port, packet);
+        }
+    }
+    if (!group->error) {
+        group->error = error;
+    }
+
+    switch (op->type) {
+    case OFOPERATION_ADD:
+        if (!error) {
+            /* The new rule stuck, so the rule it replaced (if any) can now be
+             * freed. */
+            if (op->victim) {
+                ofproto_rule_destroy__(op->victim);
+            }
+        } else {
+            /* Undo the optimistic insertion: restore the replaced rule, or
+             * take the brand-new rule back out of the table. */
+            if (op->victim) {
+                classifier_replace(table, &op->victim->cr);
+            } else {
+                classifier_remove(table, &rule->cr);
+            }
+            ofproto_rule_destroy__(rule);
+
+            /* 'rule' has just been destroyed.  Clear the back-pointer, exactly
+             * as the DELETE case does, so that ofoperation_destroy() does not
+             * write to freed memory through op->rule->pending. */
+            op->rule = NULL;
+        }
+        op->victim = NULL;
+        break;
+
+    case OFOPERATION_DELETE:
+        assert(!error);
+        ofproto_rule_destroy__(rule);
+        op->rule = NULL;
+        break;
+
+    case OFOPERATION_MODIFY:
+        if (error) {
+            /* Put back the actions saved when the modification started. */
+            free(rule->actions);
+            rule->actions = op->actions;
+            rule->n_actions = op->n_actions;
+            op->actions = NULL;
+        }
+        break;
+
+    default:
+        NOT_REACHED();
+    }
+    ofoperation_destroy(op);
+}
+
+/* Returns the rule that OFOPERATION_ADD operation 'op' is replacing in its
+ * flow table, or a null pointer when the added rule did not replace an
+ * existing one.  Calling this for any other kind of operation is an
+ * assertion failure. */
+struct rule *
+ofoperation_get_victim(struct ofoperation *op)
+{
+    struct rule *displaced;
+
+    assert(op->type == OFOPERATION_ADD);
+    displaced = op->victim;
+    return displaced;
 }
 
 static uint64_t
diff --git a/ofproto/private.h b/ofproto/private.h
index d5f1000..17166d4 100644
--- a/ofproto/private.h
+++ b/ofproto/private.h
@@ -55,6 +55,11 @@ struct ofproto {
 
     /* OpenFlow connections. */
     struct connmgr *connmgr;
+
+    /* Flow table operation tracking. */
+    int state;                  /* Internal state. */
+    struct list pending;        /* List of "struct ofopgroup"s. */
+    struct hmap deletions;      /* All OFOPERATION_DELETE "ofoperation"s. */
 };
 
 struct ofproto *ofproto_lookup(const char *name);
@@ -81,6 +86,8 @@ struct rule {
     struct list ofproto_node;    /* Owned by ofproto base code. */
     struct cls_rule cr;          /* In owning ofproto's classifier. */
 
+    struct ofoperation *pending; /* Operation now in progress, if nonnull. */
+
     ovs_be64 flow_cookie;        /* Controller-issued identifier. */
 
     long long int created;       /* Creation time. */
@@ -102,6 +109,9 @@ rule_from_cls_rule(const struct cls_rule *cls_rule)
 void ofproto_rule_expire(struct rule *, uint8_t reason);
 void ofproto_rule_destroy(struct rule *);
 
+/* Reports that asynchronous operation 'op' has finished: 'error' is 0 on
+ * success or an OpenFlow error code on failure.  Flow deletions must always
+ * succeed. */
+void ofoperation_complete(struct ofoperation *op, int error);
+
+/* Returns the rule that OFOPERATION_ADD operation 'op' is replacing, or NULL
+ * if the added rule did not replace an existing one. */
+struct rule *ofoperation_get_victim(struct ofoperation *op);
+
 /* ofproto class structure, to be defined by each ofproto implementation.
  *
  *
@@ -507,6 +517,8 @@ struct ofproto_class {
 /* ## OpenFlow Rule Functions ## */
 /* ## ----------------------- ## */
 
+
+
     /* Chooses an appropriate table for 'cls_rule' within 'ofproto'.  On
      * success, stores the table ID into '*table_idp' and returns 0.  On
      * failure, returns an OpenFlow error code (as returned by ofp_mkerr()).
@@ -528,51 +540,131 @@ struct ofproto_class {
 
     /* Life-cycle functions for a "struct rule" (see "Life Cycle" above).
      *
-     * ->rule_construct() should first check whether the rule is acceptable:
      *
-     *   - Validate that the matching rule in 'rule->cr' is supported by the
-     *     datapath.  If not, then return an OpenFlow error code (as returned
-     *     by ofp_mkerr()).
+     * Asynchronous Operation Support
+     * ==============================
+     *
+     * The life-cycle operations on rules can operate asynchronously, meaning
+     * that ->rule_construct() and ->rule_destruct() only need to initiate
+     * their respective operations and do not need to wait for them to complete
+     * before they return.  ->rule_modify_actions() also operates
+     * asynchronously.
+     *
+     * An ofproto implementation reports the success or failure of an
+     * asynchronous operation on a rule using the rule's 'pending' member,
+     * which points to an opaque "struct ofoperation" that represents the
+     * ongoing operation.  When the operation completes, the ofproto
+     * implementation calls ofoperation_complete(), passing the ofoperation and
+     * either 0 for success or an OpenFlow error code for failure.
+     *
+     * Only the following contexts may call ofoperation_complete():
+     *
+     *   - The function called to initiate the operation,
+     *     e.g. ->rule_construct() or ->rule_destruct().  This is the best
+     *     choice if the operation completes quickly.
+     *
+     *   - The implementation's ->run() function.  This is the only choice for
+     *     an operation that is truly asynchronous.
+     *
+     * The ofproto base code updates the flow table optimistically, assuming
+     * that the operation will probably succeed:
+     *
+     *   - ofproto adds or replaces the rule in the flow table before calling
+     *     ->rule_construct().
+     *
+     *   - ofproto updates the rule's actions before calling
+     *     ->rule_modify_actions().
+     *
+     *   - ofproto removes the rule before calling ->rule_destruct().
+     *
+     * With one exception, when an asynchronous operation completes with an
+     * error, ofoperation_complete() backs out the already applied changes:
+     *
+     *   - If adding or replacing a rule in the flow table fails, ofproto
+     *     removes the new rule or restores the original rule.
+     *
+     *   - If modifying a rule's actions fails, ofproto restores the original
+     *     actions.
+     *
+     *   - Removing a rule is not allowed to fail.  It must always succeed.
+     *
+     * The ofproto base code serializes operations: if any operation is in
+     * progress on a given rule, ofproto postpones initiating any new operation
+     * on that rule until the pending operation completes.  Therefore, every
+     * operation must eventually complete through a call to
+     * ofoperation_complete() to avoid delaying new operations indefinitely
+     * (including any OpenFlow request that affects the rule in question, even
+     * just to query its statistics).
      *
-     *     For example, if the datapath does not support registers, then it
-     *     should return an error if 'rule->cr' does not wildcard all
+     *
+     * Construction
+     * ============
+     *
+     * When ->rule_construct() is called, the caller has already inserted
+     * 'rule' into 'rule->ofproto''s flow table numbered 'rule->table_id'.
+     * There are two cases:
+     *
+     *   - 'rule' is a new rule in its flow table.  In this case,
+     *     ofoperation_get_victim(rule->pending) returns NULL.
+     *
+     *   - 'rule' is replacing an existing rule in its flow table that had the
+     *     same matching criteria and priority.  In this case,
+     *     ofoperation_get_victim(rule->pending) returns the rule being
+     *     replaced.
+     *
+     * ->rule_construct() should set the following in motion:
+     *
+     *   - Validate that the matching rule in 'rule->cr' is supported by the
+     *     datapath.  For example, if the rule's table does not support
+     *     registers, then it is an error if 'rule->cr' does not wildcard all
      *     registers.
      *
      *   - Validate that 'rule->actions' and 'rule->n_actions' are well-formed
-     *     OpenFlow actions that can be correctly implemented by the datapath.
-     *     If not, then return an OpenFlow error code (as returned by
-     *     ofp_mkerr()).
-     *
-     *     The validate_actions() function (in ofp-util.c) can be useful as a
-     *     model for action validation, but it accepts all of the OpenFlow
-     *     actions that OVS understands.  If your ofproto implementation only
+     *     OpenFlow actions that the datapath can correctly implement.  The
+     *     validate_actions() function (in ofp-util.c) can be useful as a model
+     *     for action validation, but it accepts all of the OpenFlow actions
+     *     that OVS understands.  If your ofproto implementation only
      *     implements a subset of those, then you should implement your own
      *     action validation.
      *
-     * If the rule is acceptable, then ->rule_construct() should modify the
-     * flow table:
+     *   - If the rule is valid, update the datapath flow table, adding the new
+     *     rule or replacing the existing one.
      *
-     *   - If there was already a rule with exactly the same matching criteria
-     *     and priority in the classifier, then it should destroy it (with
-     *     ofproto_rule_destroy()).
+     * (On failure, the ofproto code will roll back the insertion into the
+     * flow table, either removing 'rule' or restoring the rule that it
+     * replaced.)
      *
-     *     To the greatest extent possible, the old rule should be destroyed
-     *     only if inserting the new rule succeeds; that is, ->rule_construct()
-     *     should be transactional.
+     * ->rule_construct() must act in one of the following ways:
      *
-     *     The function classifier_find_rule_exactly() can locate such a rule.
+     *   - If it succeeds, it must call ofoperation_complete() and return 0.
      *
-     *   - Insert the new rule into the ofproto's 'cls' classifier, and into
-     *     the datapath flow table.
+     *   - If it fails, it must act in one of the following ways:
      *
-     *     The function classifier_insert() inserts a rule into the classifier.
+     *       * Call ofoperation_complete() with an OpenFlow error code and
+     *         return 0.
      *
-     * Other than inserting 'rule->cr' into the classifier, ->rule_construct()
-     * should not modify any base members of struct rule.
+     *       * Return an OpenFlow error code (as returned by ofp_mkerr()).  (Do
+     *         not call ofoperation_complete() in this case.)
      *
-     * ->rule_destruct() should remove 'rule' from the ofproto's 'cls'
-     * classifier (e.g. with classifier_remove()) and from the datapath flow
-     * table. */
+     *     In the former case, ->rule_destruct() will be called; in the latter
+     *     case, it will not.  ->rule_dealloc() will be called in either case.
+     *
+     *   - If the operation is only partially complete, then it must return 0.
+     *     Later, when the operation is complete, the ->run() function must
+     *     call ofoperation_complete() to report success or failure.
+     *
+     * ->rule_construct() should not modify any base members of struct rule.
+     *
+     *
+     * Destruction
+     * ===========
+     *
+     * When ->rule_destruct() is called, the caller has already removed 'rule'
+     * from 'rule->ofproto''s flow table.  ->rule_destruct() should set in
+     * motion removing 'rule' from the datapath flow table.  If removal
+     * completes synchronously, it should call ofoperation_complete().
+     * Otherwise, the ->run() function must later call ofoperation_complete()
+     * after the operation completes.
+     *
+     * Rule destruction must not fail. */
     struct rule *(*rule_alloc)(void);
     int (*rule_construct)(struct rule *rule);
     void (*rule_destruct)(struct rule *rule);
@@ -602,16 +694,28 @@ struct ofproto_class {
     int (*rule_execute)(struct rule *rule, struct flow *flow,
                         struct ofpbuf *packet);
 
-    /* Validates that the 'n' elements in 'actions' are well-formed OpenFlow
-     * actions that can be correctly implemented by the datapath.  If not, then
-     * return an OpenFlow error code (as returned by ofp_mkerr()).  If so,
-     * then update the datapath to implement the new actions and return 0.
+    /* When ->rule_modify_actions() is called, the caller has already replaced
+     * the OpenFlow actions in 'rule' by a new set.  (The original actions are
+     * in rule->pending->actions.)
+     *
+     * ->rule_modify_actions() should set the following in motion:
+     *
+     *   - Validate that the actions now in 'rule' are well-formed OpenFlow
+     *     actions that the datapath can correctly implement.
+     *
+     *   - Update the datapath flow table with the new actions.
+     *
+     * If the operation synchronously completes, ->rule_modify_actions() may
+     * call ofoperation_complete() before it returns.  Otherwise, ->run()
+     * should call ofoperation_complete() later, after the operation does
+     * complete.
+     *
+     * If the operation fails, then the base ofproto code will restore the
+     * original 'actions' and 'n_actions' of 'rule'.
      *
-     * When this function runs, 'rule' still has its original actions.  If this
-     * function returns 0, then the caller will update 'rule' with the new
-     * actions and free the old ones. */
-    int (*rule_modify_actions)(struct rule *rule,
-                               const union ofp_action *actions, size_t n);
+     * ->rule_modify_actions() should not modify any base members of struct
+     * rule. */
+    void (*rule_modify_actions)(struct rule *rule);
 
     /* These functions implement the OpenFlow IP fragment handling policy.  By
      * default ('drop_frags' == false), an OpenFlow switch should treat IP
@@ -754,7 +858,7 @@ int ofproto_class_unregister(const struct ofproto_class *);
 
 void ofproto_add_flow(struct ofproto *, const struct cls_rule *,
                       const union ofp_action *, size_t n_actions);
-void ofproto_delete_flow(struct ofproto *, const struct cls_rule *);
+bool ofproto_delete_flow(struct ofproto *, const struct cls_rule *);
 void ofproto_flush_flows(struct ofproto *);
 
 #endif /* ofproto/private.h */
-- 
1.7.4.4




More information about the dev mailing list