[ovs-dev] [PATCH 41/41] [RFC] Implement "closures".

Ben Pfaff blp at ovn.org
Tue Jan 19 07:27:28 UTC 2016


One purpose of OpenFlow packet-in messages is to allow a controller to
interpose on the path of a packet through the flow tables.  If, for
example, the controller needs to modify a packet in some way that the
switch doesn't directly support, the controller should be able to
program the switch to send it the packet, then modify the packet and
send it back to the switch to continue through the flow table.

That's the theory.  In practice, this doesn't work with any but the
simplest flow tables.  Packet-in messages simply don't include enough
context to allow the flow table traversal to continue.  For example:

    * Via "resubmit" actions, an Open vSwitch packet can have an
      effective "call stack", but a packet-in can't describe it, and
      so it would be lost.

    * Via "patch ports", an Open vSwitch packet can traverse multiple
      OpenFlow logical switches.  A packet-in can't describe or resume
      this context.

    * A packet-in can't preserve the stack used by NXAST_STACK_PUSH
      and NXAST_STACK_POP actions.

    * A packet-in can't preserve the OpenFlow 1.1+ action set.

    * A packet-in can't preserve the state of Open vSwitch mirroring
      or connection tracking.

This commit introduces a solution called "closures".  A closure is the
state of a packet's traversal through OpenFlow flow tables.  A new
NXAST_PAUSE action generates a closure and sends it to the OpenFlow
controller as an NXT_CLOSURE asynchronous message (which must be
enabled through an extension to OFPT_SET_ASYNC or NXT_SET_ASYNC_CONFIG2).
The controller processes the closure, possibly modifying some of its
publicly accessible data (for now, the packet and its metadata), and
sends it back to the switch with an NXT_RESUME request, which
causes flow table traversal to continue.  In principle, a single
packet can be paused and resumed multiple times.

This RFC patch has a number of caveats:

    * No real tests yet.

    * Needs more and better documentation.

    * Flow statistics are not yet correctly updated.

Signed-off-by: Ben Pfaff <blp at ovn.org>
---
 NEWS                          |   4 +
 include/openflow/nicira-ext.h |  79 +++++++++-
 lib/meta-flow.c               |   9 +-
 lib/meta-flow.h               |   3 +-
 lib/ofp-actions.c             |  73 ++++++++-
 lib/ofp-actions.h             |   9 +-
 lib/ofp-errors.h              |  10 +-
 lib/ofp-msgs.h                |   8 +
 lib/ofp-print.c               |  68 +++++++++
 lib/ofp-util.c                | 341 ++++++++++++++++++++++++++++++++++++++++++
 lib/ofp-util.h                |  56 ++++++-
 lib/rconn.c                   |   4 +-
 ofproto/connmgr.c             |  65 ++++++--
 ofproto/connmgr.h             |   3 +
 ofproto/ofproto-dpif-xlate.c  | 112 ++++++++++++--
 ofproto/ofproto-dpif-xlate.h  |   4 +
 ofproto/ofproto-dpif.c        |  32 ++++
 ofproto/ofproto-provider.h    |   5 +-
 ofproto/ofproto.c             |  25 ++++
 tests/ofp-print.at            |   6 +
 tests/ofproto-dpif.at         |  24 +++
 tests/ofproto.at              |   2 +
 utilities/ovs-ofctl.c         |  59 +++++++-
 vswitchd/vswitch.xml          |  17 ++-
 24 files changed, 965 insertions(+), 53 deletions(-)

diff --git a/NEWS b/NEWS
index 5c18867..b264701 100644
--- a/NEWS
+++ b/NEWS
@@ -5,6 +5,10 @@ Post-v2.5.0
    - OpenFlow:
      * OpenFlow 1.1+ OFPT_QUEUE_GET_CONFIG_REQUEST now supports OFPP_ANY.
      * OpenFlow 1.4+ OFPMP_QUEUE_DESC is now supported.
+     * New extension message NXT_SET_ASYNC_CONFIG2 to allow OpenFlow 1.4-like
+       control over asynchronous messages in earlier versions of OpenFlow.
+     * New "closure" extension to pause and resume flow table traversal.
+       See NXAST_PAUSE, NXT_CLOSURE, and NXT_RESUME for more information.
    - ovs-ofctl:
      * queue-get-config command now allows a queue ID to be specified.
 
diff --git a/include/openflow/nicira-ext.h b/include/openflow/nicira-ext.h
index dad8707..78e0a56 100644
--- a/include/openflow/nicira-ext.h
+++ b/include/openflow/nicira-ext.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -246,6 +246,83 @@ struct nx_packet_in {
 };
 OFP_ASSERT(sizeof(struct nx_packet_in) == 24);
 
+/* NXT_CLOSURE.
+ *
+ * A closure serializes the state of a packet's trip through Open vSwitch flow
+ * tables.  This permits an OpenFlow controller to interpose on a packet midway
+ * through processing in Open vSwitch.
+ *
+ * Closures fit into packet processing this way:
+ *
+ * 1. A packet ingresses into Open vSwitch, which runs it through the OpenFlow
+ *    tables.
+ *
+ * 2. An OpenFlow flow executes an NXAST_PAUSE action.  Open vSwitch
+ *    serializes the packet processing state into a closure and sends it (as
+ *    an NXT_CLOSURE) to the OpenFlow controller.
+ *
+ * 3. The controller receives the NXT_CLOSURE and processes it.  The controller
+ *    can interpret and, if desired, modify some of the contents of the
+ *    closure, such as the packet and the metadata being processed.
+ *
+ * 4. The controller sends the closure, including any changes it made, back
+ *    to the switch in an NXT_RESUME request.  Packet processing resumes
+ *    where it left off.
+ *
+ * External side effects (e.g. "output") of OpenFlow actions processed before
+ * NXAST_PAUSE is encountered might be executed during step 2 or step 4, and
+ * the details may vary among Open vSwitch features and versions.  Thus, a
+ * controller that wants to make sure that side effects are executed must pass
+ * the closure back to the switch, that is, must not skip step 4.
+ *
+ * Closures may be "stateful" or "stateless", that is, they may or may not
+ * refer to buffered state maintained in Open vSwitch.  This means that a
+ * controller should not attempt to resume a given closure more than once
+ * (because the switch might have discarded the buffered state after the first
+ * use).  For the same reason, closures might become "stale" if the controller
+ * takes too long to resume them (because the switch might have discarded old
+ * buffered state).  Taken together with the previous note, this means that a
+ * controller should resume each closure exactly once (and promptly).
+ *
+ * NXT_CLOSURE is something like OFPT_PACKET_IN (if OFPT_PACKET_IN were
+ * extensible then it would make sense to implement NXT_CLOSURE as an
+ * extension).  The important difference is the extra information that captures
+ * the state of the pipeline.  Without this information, the controller can
+ * (with careful design, and help from the flow cookie) determine where the
+ * packet is in the pipeline, but in the general case it can't determine what
+ * nested "resubmit"s may be in progress, or what data is on the stack
+ * maintained by NXAST_STACK_PUSH and NXAST_STACK_POP actions, what is in the
+ * OpenFlow action set, etc.
+ *
+ * Closures are expensive because they require a round trip between the switch
+ * and the controller.  Thus, they should not be used to implement processing
+ * that needs to happen at "line rate".
+ *
+ * Closures are serialized as type-length-value properties in the format
+ * commonly used in OpenFlow 1.4 and later.  Some properties, with 'type'
+ * values less than 0x8000, are meant to be interpreted by the controller, and
+ * are documented here.  Other properties, with 'type' values of 0x8000 or
+ * greater, are private to the switch, may change unpredictably from one
+ * version of Open vSwitch to another, and are not documented here.
+ */
+
+enum nx_closure_prop_type {
+    /* Public properties. */
+    NXCPT_PACKET,               /* Raw packet data. */
+    NXCPT_METADATA,             /* NXM or OXM for metadata fields. */
+
+    /* Private properties.  These are not architectural and are subject to
+     * change.  Do not depend on them. */
+    NXCPT_BRIDGE = 0x8000,
+    NXCPT_STACK,
+    NXCPT_MIRRORS,
+    NXCPT_CONNTRACKED,
+    NXCPT_TABLE_ID,
+    NXCPT_COOKIE,
+    NXCPT_ACTIONS,
+    NXCPT_ACTION_SET,
+};
+
 /* Configures the "role" of the sending controller.  The default role is:
  *
  *    - Other (NX_ROLE_OTHER), which allows the controller access to all
diff --git a/lib/meta-flow.c b/lib/meta-flow.c
index 6bd0b99..16b9c92 100644
--- a/lib/meta-flow.c
+++ b/lib/meta-flow.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
+ * Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -171,6 +171,13 @@ mf_subvalue_shift(union mf_subvalue *value, int n)
     }
 }
 
+/* Appends a formatted representation of 'sv' to 's'. */
+void
+mf_subvalue_format(const union mf_subvalue *sv, struct ds *s)
+{
+    ds_put_hex(s, sv, sizeof *sv);
+}
+
 /* Returns true if 'wc' wildcards all the bits in field 'mf', false if 'wc'
  * specifies at least one bit in the field.
  *
diff --git a/lib/meta-flow.h b/lib/meta-flow.h
index 95090fe..b335ca0 100644
--- a/lib/meta-flow.h
+++ b/lib/meta-flow.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
+ * Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -1940,6 +1940,7 @@ bool mf_subvalue_intersect(const union mf_subvalue *a_value,
                            union mf_subvalue *dst_mask);
 int mf_subvalue_width(const union mf_subvalue *);
 void mf_subvalue_shift(union mf_subvalue *, int n);
+void mf_subvalue_format(const union mf_subvalue *, struct ds *);
 
 /* An array of fields with values */
 struct field_array {
diff --git a/lib/ofp-actions.c b/lib/ofp-actions.c
index ff3bc12..9a3fd8d 100644
--- a/lib/ofp-actions.c
+++ b/lib/ofp-actions.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
+ * Copyright (c) 2008-2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -295,6 +295,9 @@ enum ofp_raw_action_type {
     /* NX1.0+(36): struct nx_action_nat, ... */
     NXAST_RAW_NAT,
 
+    /* NX1.0+(37): struct nx_action_pause, ... */
+    NXAST_RAW_PAUSE,
+
 /* ## ------------------ ## */
 /* ## Debugging actions. ## */
 /* ## ------------------ ## */
@@ -373,6 +376,7 @@ ofpact_next_flattened(const struct ofpact *ofpact)
     case OFPACT_OUTPUT:
     case OFPACT_GROUP:
     case OFPACT_CONTROLLER:
+    case OFPACT_PAUSE:
     case OFPACT_ENQUEUE:
     case OFPACT_OUTPUT_REG:
     case OFPACT_BUNDLE:
@@ -737,6 +741,66 @@ format_CONTROLLER(const struct ofpact_controller *a, struct ds *s)
     }
 }
 
+/* Action structure for NXAST_PAUSE. */
+struct nx_action_pause {
+    ovs_be16 type;                  /* OFPAT_VENDOR. */
+    ovs_be16 len;                   /* Length is 16. */
+    ovs_be32 vendor;                /* NX_VENDOR_ID. */
+    ovs_be16 subtype;               /* NXAST_PAUSE. */
+    ovs_be16 controller_id;         /* Controller ID to send closure. */
+    uint8_t zero[4];                /* Must be zero. */
+};
+OFP_ASSERT(sizeof(struct nx_action_pause) == 16);
+
+static enum ofperr
+decode_NXAST_RAW_PAUSE(const struct nx_action_pause *nap,
+                       enum ofp_version ofp_version OVS_UNUSED,
+                       struct ofpbuf *out)
+{
+    struct ofpact_pause *pause = ofpact_put_PAUSE(out);
+    pause->controller_id = ntohs(nap->controller_id);
+    return 0;
+}
+
+static void
+encode_PAUSE(const struct ofpact_pause *pause,
+             enum ofp_version ofp_version OVS_UNUSED, struct ofpbuf *out)
+{
+    struct nx_action_pause *nap = put_NXAST_PAUSE(out);
+    nap->controller_id = htons(pause->controller_id);
+}
+
+static char * OVS_WARN_UNUSED_RESULT
+parse_PAUSE(char *arg, struct ofpbuf *ofpacts,
+            enum ofputil_protocol *usable_protocols OVS_UNUSED)
+{
+    struct ofpact_pause *pause = ofpact_put_PAUSE(ofpacts);
+    char *name, *value;
+
+    while (ofputil_parse_key_value(&arg, &name, &value)) {
+        if (!strcmp(name, "id")) {
+            char *error = str_to_u16(value, "id", &pause->controller_id);
+            if (error) {
+                return error;
+            }
+        } else {
+            return xasprintf("unknown key \"%s\" parsing pause "
+                             "action", name);
+        }
+    }
+
+    return NULL;
+}
+
+static void
+format_PAUSE(const struct ofpact_pause *pause, struct ds *s)
+{
+    ds_put_cstr(s, "pause");
+    if (pause->controller_id) {
+        ds_put_format(s, "(id=%"PRIu16")", pause->controller_id);
+    }
+}
+
 /* Enqueue action. */
 struct ofp10_action_enqueue {
     ovs_be16 type;            /* OFPAT10_ENQUEUE. */
@@ -5725,6 +5789,7 @@ ofpact_is_set_or_move_action(const struct ofpact *a)
     case OFPACT_CT:
     case OFPACT_NAT:
     case OFPACT_CONTROLLER:
+    case OFPACT_PAUSE:
     case OFPACT_DEC_MPLS_TTL:
     case OFPACT_DEC_TTL:
     case OFPACT_ENQUEUE:
@@ -5798,6 +5863,7 @@ ofpact_is_allowed_in_actions_set(const struct ofpact *a)
      * in the action set is undefined. */
     case OFPACT_BUNDLE:
     case OFPACT_CONTROLLER:
+    case OFPACT_PAUSE:
     case OFPACT_CT:
     case OFPACT_NAT:
     case OFPACT_ENQUEUE:
@@ -5989,6 +6055,7 @@ ovs_instruction_type_from_ofpact_type(enum ofpact_type type)
     case OFPACT_OUTPUT:
     case OFPACT_GROUP:
     case OFPACT_CONTROLLER:
+    case OFPACT_PAUSE:
     case OFPACT_ENQUEUE:
     case OFPACT_OUTPUT_REG:
     case OFPACT_BUNDLE:
@@ -6407,6 +6474,9 @@ ofpact_check__(enum ofputil_protocol *usable_protocols, struct ofpact *a,
     case OFPACT_CONTROLLER:
         return 0;
 
+    case OFPACT_PAUSE:
+        return 0;
+
     case OFPACT_ENQUEUE:
         enqueue = ofpact_get_ENQUEUE(a);
         if (ofp_to_u16(enqueue->port) >= ofp_to_u16(max_ports)
@@ -7095,6 +7165,7 @@ ofpact_outputs_to_port(const struct ofpact *ofpact, ofp_port_t port)
         return port == OFPP_CONTROLLER;
 
     case OFPACT_OUTPUT_REG:
+    case OFPACT_PAUSE:
     case OFPACT_BUNDLE:
     case OFPACT_SET_VLAN_VID:
     case OFPACT_SET_VLAN_PCP:
diff --git a/lib/ofp-actions.h b/lib/ofp-actions.h
index 5dec177..776dea2 100644
--- a/lib/ofp-actions.h
+++ b/lib/ofp-actions.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012, 2013, 2014, 2015 Nicira, Inc.
+ * Copyright (c) 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -57,6 +57,7 @@
     OFPACT(OUTPUT,          ofpact_output,      ofpact, "output")       \
     OFPACT(GROUP,           ofpact_group,       ofpact, "group")        \
     OFPACT(CONTROLLER,      ofpact_controller,  ofpact, "controller")   \
+    OFPACT(PAUSE,           ofpact_pause,       ofpact, "pause")        \
     OFPACT(ENQUEUE,         ofpact_enqueue,     ofpact, "enqueue")      \
     OFPACT(OUTPUT_REG,      ofpact_output_reg,  ofpact, "output_reg")   \
     OFPACT(BUNDLE,          ofpact_bundle,      slaves, "bundle")       \
@@ -731,6 +732,12 @@ struct ofpact_conjunction {
     uint32_t id;
 };
 
+/* OFPACT_PAUSE. */
+struct ofpact_pause {
+    struct ofpact ofpact;
+    uint16_t controller_id;     /* Controller ID to send closure. */
+};
+
 /* OFPACT_MULTIPATH.
  *
  * Used for NXAST_MULTIPATH. */
diff --git a/lib/ofp-errors.h b/lib/ofp-errors.h
index 8e13873..204eb29 100644
--- a/lib/ofp-errors.h
+++ b/lib/ofp-errors.h
@@ -764,9 +764,13 @@ enum ofperr {
      * to be mapped is the same as one assigned to a different field. */
     OFPERR_NXTTMFC_DUP_ENTRY,
 
-/* ## ------------------ ## */
-/* ## OFPET_EXPERIMENTER ## */
-/* ## ------------------ ## */
+/* ## ---------- ## */
+/* ## NXT_RESUME ## */
+/* ## ---------- ## */
+
+    /* NX1.0-1.1(1,533), NX1.2+(24).  This datapath doesn't support
+     * NXT_RESUME. */
+    OFPERR_NXR_NOT_SUPPORTED,
 };
 
 const char *ofperr_domain_get_name(enum ofp_version);
diff --git a/lib/ofp-msgs.h b/lib/ofp-msgs.h
index a15efb6..5156c94 100644
--- a/lib/ofp-msgs.h
+++ b/lib/ofp-msgs.h
@@ -450,6 +450,12 @@ enum ofpraw {
 
     /* NXT 1.0+ (26): struct nx_tlv_table_reply, struct nx_tlv_map[]. */
     OFPRAW_NXT_TLV_TABLE_REPLY,
+
+    /* NXT 1.0+ (28): uint8_t[8][]. */
+    OFPRAW_NXT_CLOSURE,
+
+    /* NXT 1.0+ (29): uint8_t[8][]. */
+    OFPRAW_NXT_RESUME,
 };
 
 /* Decoding messages into OFPRAW_* values. */
@@ -667,6 +673,8 @@ enum ofptype {
     OFPTYPE_NXT_TLV_TABLE_MOD, /* OFPRAW_NXT_TLV_TABLE_MOD. */
     OFPTYPE_NXT_TLV_TABLE_REQUEST, /* OFPRAW_NXT_TLV_TABLE_REQUEST. */
     OFPTYPE_NXT_TLV_TABLE_REPLY, /* OFPRAW_NXT_TLV_TABLE_REPLY. */
+    OFPTYPE_NXT_CLOSURE,         /* OFPRAW_NXT_CLOSURE. */
+    OFPTYPE_NXT_RESUME,          /* OFPRAW_NXT_RESUME. */
 
     /* Flow monitor extension. */
     OFPTYPE_FLOW_MONITOR_CANCEL,        /* OFPRAW_NXT_FLOW_MONITOR_CANCEL. */
diff --git a/lib/ofp-print.c b/lib/ofp-print.c
index 5c9a6ab..55c9b50 100644
--- a/lib/ofp-print.c
+++ b/lib/ofp-print.c
@@ -152,6 +152,66 @@ ofp_print_packet_in(struct ds *string, const struct ofp_header *oh,
 }
 
 static void
+ofp_print_closure(struct ds *string, const struct ofp_header *oh)
+{
+    struct ofputil_closure_private closure;
+    struct ofputil_closure *public = &closure.public;
+    enum ofperr error;
+
+    error = ofputil_decode_closure_private(oh, true, &closure);
+    if (error) {
+        ofp_print_error(string, error);
+        return;
+    }
+
+    ds_put_char(string, '\n');
+
+    ds_put_cstr(string, " packet=");
+    char *packet = ofp_packet_to_string(public->packet, public->packet_len);
+    ds_put_cstr(string, packet);
+    free(packet);
+
+    ds_put_cstr(string, " metadata=");
+    match_format(&public->metadata, string, OFP_DEFAULT_PRIORITY);
+    ds_put_char(string, '\n');
+
+    ds_put_format(string, " bridge="UUID_FMT"\n",
+                  UUID_ARGS(&closure.bridge));
+
+    if (closure.n_stack) {
+        ds_put_cstr(string, " stack=");
+        for (size_t i = 0; i < closure.n_stack; i++) {
+            if (i) {
+                ds_put_char(string, ' ');
+            }
+            mf_subvalue_format(&closure.stack[i], string);
+        }
+    }
+
+    if (closure.mirrors) {
+        ds_put_format(string, " mirrors=0x%"PRIx32"\n", closure.mirrors);
+    }
+
+    if (closure.conntracked) {
+        ds_put_cstr(string, " conntracked=true\n");
+    }
+
+    if (closure.actions_len) {
+        ds_put_cstr(string, " actions=");
+        ofpacts_format(closure.actions, closure.actions_len, string);
+        ds_put_char(string, '\n');
+    }
+
+    if (closure.action_set_len) {
+        ds_put_cstr(string, " action_set=");
+        ofpacts_format(closure.action_set, closure.action_set_len, string);
+        ds_put_char(string, '\n');
+    }
+
+    ofputil_closure_private_destroy(&closure);
+}
+
+static void
 ofp_print_packet_out(struct ds *string, const struct ofp_header *oh,
                      int verbosity)
 {
@@ -2057,6 +2117,10 @@ ofp_async_config_reason_to_string(uint32_t reason,
     case OAM_REQUESTFORWARD:
         return ofp_requestforward_reason_to_string(reason, reasonbuf, bufsize);
 
+    case OAM_CLOSURE:
+        /* There's only one "reason". */
+        return "enable";
+
     case OAM_N_TYPES:
     default:
         return "Unknown asynchronous configuration message type";
@@ -3352,6 +3416,10 @@ ofp_to_string__(const struct ofp_header *oh, enum ofpraw raw,
         ofp_print_tlv_table_reply(string, msg);
         break;
 
+    case OFPTYPE_NXT_CLOSURE:
+    case OFPTYPE_NXT_RESUME:
+        ofp_print_closure(string, msg);
+        break;
     }
 }
 
diff --git a/lib/ofp-util.c b/lib/ofp-util.c
index 48e2e8e..a5ed33b 100644
--- a/lib/ofp-util.c
+++ b/lib/ofp-util.c
@@ -3656,6 +3656,342 @@ ofputil_packet_in_reason_from_string(const char *s,
     return false;
 }
 
+void
+ofputil_closure_destroy(struct ofputil_closure *closure)
+{
+    if (closure) {
+        free(closure->packet);
+    }
+}
+
+void
+ofputil_closure_private_destroy(struct ofputil_closure_private *closure)
+{
+    if (closure) {
+        ofputil_closure_destroy(&closure->public);
+        free(closure->stack);
+        free(closure->actions);
+        free(closure->action_set);
+    }
+}
+
+static void
+ofputil_put_closure(const struct ofputil_closure *closure,
+                    enum ofputil_protocol protocol, struct ofpbuf *msg)
+{
+    enum ofp_version version = ofputil_protocol_to_ofp_version(protocol);
+
+    ofpprop_put(msg, NXCPT_PACKET, closure->packet, closure->packet_len);
+
+    size_t start = ofpprop_start(msg, NXCPT_METADATA);
+    oxm_put_raw(msg, &closure->metadata, version);
+    ofpprop_end(msg, start);
+}
+
+static void
+put_actions_property(struct ofpbuf *msg, uint64_t prop_type,
+                     enum ofp_version version,
+                     const struct ofpact *actions, size_t actions_len)
+{
+    if (actions_len) {
+        size_t start = ofpprop_start(msg, prop_type);
+        ofpbuf_padto(msg, ROUND_UP(msg->size, 8));
+        ofpacts_put_openflow_actions(actions, actions_len, msg, version);
+        ofpprop_end(msg, start);
+    }
+}
+
+static void
+ofputil_put_closure_private(const struct ofputil_closure_private *closure,
+                            enum ofputil_protocol protocol, struct ofpbuf *msg)
+{
+    enum ofp_version version = ofputil_protocol_to_ofp_version(protocol);
+
+    ofputil_put_closure(&closure->public, protocol, msg);
+
+    ofpprop_put_uuid(msg, NXCPT_BRIDGE, &closure->bridge);
+
+    for (size_t i = 0; i < closure->n_stack; i++) {
+        const union mf_subvalue *s = &closure->stack[i];
+        size_t ofs;
+        for (ofs = 0; ofs < sizeof *s; ofs++) {
+            if (s->u8[ofs]) {
+                break;
+            }
+        }
+
+        ofpprop_put(msg, NXCPT_STACK, &s->u8[ofs], sizeof *s - ofs);
+    }
+
+    if (closure->mirrors) {
+        ofpprop_put_u32(msg, NXCPT_MIRRORS, closure->mirrors);
+    }
+
+    if (closure->conntracked) {
+        ofpprop_put_flag(msg, NXCPT_CONNTRACKED);
+    }
+
+    if (closure->actions_len) {
+        const struct ofpact *const end = ofpact_end(closure->actions,
+                                                    closure->actions_len);
+        const struct ofpact_unroll_xlate *unroll = NULL;
+        uint8_t table_id = 0;
+        ovs_be64 cookie = 0;
+
+        const struct ofpact *a;
+        for (a = closure->actions; ; a = ofpact_next(a)) {
+            if (a == end || a->type == OFPACT_UNROLL_XLATE) {
+                if (unroll) {
+                    if (table_id != unroll->rule_table_id) {
+                        ofpprop_put_u8(msg, NXCPT_TABLE_ID,
+                                       unroll->rule_table_id);
+                        table_id = unroll->rule_table_id;
+                    }
+                    if (cookie != unroll->rule_cookie) {
+                        ofpprop_put_be64(msg, NXCPT_COOKIE,
+                                         unroll->rule_cookie);
+                        cookie = unroll->rule_cookie;
+                    }
+                }
+
+                const struct ofpact *start
+                    = unroll ? ofpact_next(&unroll->ofpact) : closure->actions;
+                put_actions_property(msg, NXCPT_ACTIONS, version,
+                                     start, (a - start) * sizeof *a);
+
+                if (a == end) {
+                    break;
+                }
+                unroll = ofpact_get_UNROLL_XLATE(a);
+            }
+        }
+    }
+
+    if (closure->action_set_len) {
+        size_t start = ofpprop_start(msg, NXCPT_ACTION_SET);
+        ofpbuf_padto(msg, ROUND_UP(msg->size, 8));
+        ofpacts_put_openflow_actions(closure->action_set,
+                                     closure->action_set_len, msg, version);
+        ofpprop_end(msg, start);
+    }
+}
+
+struct ofpbuf *
+ofputil_encode_resume(const struct ofputil_closure *closure,
+                      const struct ofpbuf *private_properties,
+                      enum ofputil_protocol protocol)
+{
+    enum ofp_version version = ofputil_protocol_to_ofp_version(protocol);
+    size_t extra = (closure->packet_len
+                    + NXM_TYPICAL_LEN   /* flow_metadata */
+                    + private_properties->size);
+    struct ofpbuf *msg = ofpraw_alloc_xid(OFPRAW_NXT_RESUME, version,
+                                          0, extra);
+    ofputil_put_closure(closure, protocol, msg);
+    ofpbuf_put(msg, private_properties->data, private_properties->size);
+    ofpmsg_update_length(msg);
+    return msg;
+}
+
+struct ofpbuf *
+ofputil_encode_closure_private(const struct ofputil_closure_private *closure,
+                               enum ofputil_protocol protocol)
+{
+    enum ofp_version version = ofputil_protocol_to_ofp_version(protocol);
+    size_t extra = (closure->public.packet_len
+                    + NXM_TYPICAL_LEN   /* flow_metadata */
+                    + closure->n_stack * 16
+                    + closure->actions_len
+                    + closure->action_set_len
+                    + 256);     /* fudge factor */
+    struct ofpbuf *msg = ofpraw_alloc_xid(OFPRAW_NXT_CLOSURE, version,
+                                          0, extra);
+    ofputil_put_closure_private(closure, protocol, msg);
+    ofpmsg_update_length(msg);
+    return msg;
+}
+
+static enum ofperr
+parse_subvalue_prop(const struct ofpbuf *property, union mf_subvalue *sv)
+{
+    unsigned int len = ofpbuf_msgsize(property);
+    if (len > sizeof *sv) {
+        VLOG_WARN_RL(&bad_ofmsg_rl, "NXCPT_STACK property has bad length %u",
+                     len);
+        return OFPERR_OFPBPC_BAD_LEN;
+    }
+    memset(sv, 0, sizeof *sv);
+    memcpy(&sv->u8[sizeof *sv - len], property->msg, len);
+    return 0;
+}
+
+enum ofperr
+ofputil_decode_closure(const struct ofp_header *oh,
+                       struct ofputil_closure *closure,
+                       struct ofpbuf *private_properties)
+{
+    memset(closure, 0, sizeof *closure);
+
+    struct ofpbuf properties;
+    ofpbuf_use_const(&properties, oh, ntohs(oh->length));
+    ofpraw_pull_assert(&properties);
+
+    while (properties.size > 0) {
+        struct ofpbuf payload;
+        uint64_t type;
+
+        enum ofperr error = ofpprop_pull(&properties, &payload, &type);
+        if (error) {
+            return error;
+        }
+
+        if (type >= 0x8000) {
+            if (private_properties) {
+                ofpbuf_put(private_properties, payload.data, payload.size);
+                ofpbuf_padto(private_properties,
+                             ROUND_UP(private_properties->size, 8));
+            }
+            continue;
+        }
+
+        switch (type) {
+        case NXCPT_PACKET:
+            free(closure->packet);
+            closure->packet_len = ofpbuf_msgsize(&payload);
+            closure->packet = xmemdup(payload.msg, closure->packet_len);
+            break;
+
+        case NXCPT_METADATA:
+            error = oxm_decode_match(payload.msg, ofpbuf_msgsize(&payload),
+                                     &closure->metadata);
+            break;
+
+        default:
+            error = OFPPROP_UNKNOWN(false, "closure", type);
+            break;
+        }
+        if (error) {
+            ofputil_closure_destroy(closure);
+            return error;
+        }
+    }
+
+    return 0;
+}
+
+static enum ofperr
+parse_actions_property(struct ofpbuf *property, enum ofp_version version,
+                       struct ofpbuf *ofpacts)
+{
+    if (!ofpbuf_try_pull(property, ROUND_UP(ofpbuf_headersize(property), 8))) {
+        VLOG_WARN_RL(&bad_ofmsg_rl,
+                     "actions property has bad length %"PRIuSIZE,
+                     property->size);
+        return OFPERR_OFPBPC_BAD_LEN;
+    }
+
+    return ofpacts_pull_openflow_actions(property, property->size,
+                                         version, ofpacts);
+}
+
+enum ofperr
+ofputil_decode_closure_private(const struct ofp_header *oh, bool loose,
+                               struct ofputil_closure_private *closure)
+{
+    memset(closure, 0, sizeof *closure);
+    enum ofperr error = ofputil_decode_closure(oh, &closure->public, NULL);
+    if (error) {
+        return error;
+    }
+
+    struct ofpbuf properties;
+    ofpbuf_use_const(&properties, oh, ntohs(oh->length));
+    ofpraw_pull_assert(&properties);
+
+    struct ofpbuf actions, action_set;
+    ofpbuf_init(&actions, 0);
+    ofpbuf_init(&action_set, 0);
+
+    uint8_t table_id = 0;
+    ovs_be64 cookie = 0;
+
+    size_t allocated_stack = 0;
+
+    while (properties.size > 0) {
+        struct ofpbuf payload;
+        uint64_t type;
+
+        error = ofpprop_pull(&properties, &payload, &type);
+        ovs_assert(!error);
+
+        if (type < 0x8000) {
+            continue;
+        }
+
+        switch (type) {
+        case NXCPT_BRIDGE:
+            error = ofpprop_parse_uuid(&payload, &closure->bridge);
+            break;
+
+        case NXCPT_STACK:
+            if (closure->n_stack >= allocated_stack) {
+                closure->stack = x2nrealloc(closure->stack, &allocated_stack,
+                                           sizeof *closure->stack);
+            }
+            error = parse_subvalue_prop(&payload,
+                                        &closure->stack[closure->n_stack++]);
+            break;
+
+        case NXCPT_MIRRORS:
+            error = ofpprop_parse_u32(&payload, &closure->mirrors);
+            break;
+
+        case NXCPT_CONNTRACKED:
+            closure->conntracked = true;
+            break;
+
+        case NXCPT_TABLE_ID:
+            error = ofpprop_parse_u8(&payload, &table_id);
+            break;
+
+        case NXCPT_COOKIE:
+            error = ofpprop_parse_be64(&payload, &cookie);
+            break;
+
+        case NXCPT_ACTIONS: {
+            struct ofpact_unroll_xlate *unroll
+                = ofpact_put_UNROLL_XLATE(&actions);
+            unroll->rule_table_id = table_id;
+            unroll->rule_cookie = cookie;
+            error = parse_actions_property(&payload, oh->version, &actions);
+            break;
+        }
+
+        case NXCPT_ACTION_SET:
+            error = parse_actions_property(&payload, oh->version, &action_set);
+            break;
+
+        default:
+            error = OFPPROP_UNKNOWN(loose, "closure", type);
+            break;
+        }
+        if (error) {
+            break;
+        }
+    }
+
+    closure->actions_len = actions.size;
+    closure->actions = ofpbuf_steal_data(&actions);
+    closure->action_set_len = action_set.size;
+    closure->action_set = ofpbuf_steal_data(&action_set);
+
+    if (error) {
+        ofputil_closure_private_destroy(closure);
+    }
+
+    return error;
+}
+
 /* Converts an OFPT_PACKET_OUT in 'opo' into an abstract ofputil_packet_out in
  * 'po'.
  *
@@ -9241,6 +9577,8 @@ ofputil_is_bundlable(enum ofptype type)
     case OFPTYPE_REQUESTFORWARD:
     case OFPTYPE_NXT_TLV_TABLE_REQUEST:
     case OFPTYPE_NXT_TLV_TABLE_REPLY:
+    case OFPTYPE_NXT_CLOSURE:
+    case OFPTYPE_NXT_RESUME:
         break;
     }
 
@@ -9471,6 +9809,7 @@ ofputil_async_msg_type_to_string(enum ofputil_async_msg_type type)
     case OAM_ROLE_STATUS:    return "ROLE_STATUS";
     case OAM_TABLE_STATUS:   return "TABLE_STATUS";
     case OAM_REQUESTFORWARD: return "REQUESTFORWARD";
+    case OAM_CLOSURE:        return "CLOSURE";
 
     case OAM_N_TYPES:
     default:
@@ -9496,6 +9835,8 @@ static const struct ofp14_async_prop async_props[] = {
     AP_PAIR( 6, OAM_ROLE_STATUS,    (1 << OFPCRR_N_REASONS) - 1, 0),
     AP_PAIR( 8, OAM_TABLE_STATUS,   OFPTR_BITS, 0),
     AP_PAIR(10, OAM_REQUESTFORWARD, (1 << OFPRFR_N_REASONS) - 1, 0),
+
+    AP_PAIR(OFPPROP_EXP(NX_VENDOR_ID, 0), OAM_CLOSURE, 1, 0),
 };
 
 static const struct ofp14_async_prop *
diff --git a/lib/ofp-util.h b/lib/ofp-util.h
index cf77d8a..77493f3 100644
--- a/lib/ofp-util.h
+++ b/lib/ofp-util.h
@@ -31,6 +31,7 @@
 #include "openflow/nicira-ext.h"
 #include "openvswitch/types.h"
 #include "type-props.h"
+#include "uuid.h"
 
 struct ofpbuf;
 union ofp_action;
@@ -459,6 +460,56 @@ const char *ofputil_packet_in_reason_to_string(enum ofp_packet_in_reason,
 bool ofputil_packet_in_reason_from_string(const char *,
                                           enum ofp_packet_in_reason *);
 
+/* Abstract NXT_CLOSURE. */
+struct ofputil_closure {
+    /* NXCPT_PACKET. */
+    void *packet;
+    size_t packet_len;
+
+    /* NXCPT_METADATA. */
+    struct match metadata;
+};
+void ofputil_closure_destroy(struct ofputil_closure *);
+
+enum ofperr ofputil_decode_closure(const struct ofp_header *,
+                                   struct ofputil_closure *,
+                                   struct ofpbuf *private_properties);
+struct ofpbuf *ofputil_encode_resume(const struct ofputil_closure *,
+                                     const struct ofpbuf *private_properties,
+                                     enum ofputil_protocol);
+
+struct ofputil_closure_private {
+    struct ofputil_closure public;
+
+    /* NXCPT_BRIDGE. */
+    struct uuid bridge;
+
+    /* NXCPT_STACK. */
+    union mf_subvalue *stack;
+    size_t n_stack;
+
+    /* NXCPT_MIRRORS. */
+    uint32_t mirrors;
+
+    /* NXCPT_CONNTRACKED. */
+    bool conntracked;
+
+    /* NXCPT_ACTIONS. */
+    struct ofpact *actions;
+    size_t actions_len;
+
+    /* NXCPT_ACTION_SET. */
+    struct ofpact *action_set;
+    size_t action_set_len;
+};
+void ofputil_closure_private_destroy(struct ofputil_closure_private *);
+
+enum ofperr ofputil_decode_closure_private(const struct ofp_header *,
+                                           bool loose,
+                                           struct ofputil_closure_private *);
+struct ofpbuf *ofputil_encode_closure_private(
+    const struct ofputil_closure_private *, enum ofputil_protocol);
+
 /* Abstract packet-out message.
  *
  * ofputil_decode_packet_out() will ensure that 'in_port' is a physical port
@@ -1311,8 +1362,9 @@ enum ofputil_async_msg_type {
     OAM_TABLE_STATUS,           /* OFPT_TABLE_STATUS. */
     OAM_REQUESTFORWARD,         /* OFPT_REQUESTFORWARD. */
 
-    /* Extension asynchronous messages (none yet--coming soon!). */
-#define OAM_EXTENSIONS 0        /* Bitmap of all extensions. */
+    /* Extension asynchronous messages. */
+    OAM_CLOSURE,                /* NXT_CLOSURE. */
+#define OAM_EXTENSIONS (1u << OAM_CLOSURE) /* Bitmap of all extensions. */
 
     OAM_N_TYPES
 };
diff --git a/lib/rconn.c b/lib/rconn.c
index b601eed..6b97b26 100644
--- a/lib/rconn.c
+++ b/lib/rconn.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -1420,6 +1420,8 @@ is_admitted_msg(const struct ofpbuf *b)
     case OFPTYPE_NXT_TLV_TABLE_MOD:
     case OFPTYPE_NXT_TLV_TABLE_REQUEST:
     case OFPTYPE_NXT_TLV_TABLE_REPLY:
+    case OFPTYPE_NXT_CLOSURE:
+    case OFPTYPE_NXT_RESUME:
     default:
         return true;
     }
diff --git a/ofproto/connmgr.c b/ofproto/connmgr.c
index 5161a15..6779184 100644
--- a/ofproto/connmgr.c
+++ b/ofproto/connmgr.c
@@ -47,6 +47,13 @@
 VLOG_DEFINE_THIS_MODULE(connmgr);
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
+enum ofconn_scheduler {
+    SCHED_ACTION,               /* Packet-ins due to actions. */
+    SCHED_MISS,                 /* Packet-ins due to flow table misses. */
+    SCHED_CLOSURE,              /* Closures. */
+    N_SCHEDULERS
+};
+
 /* An OpenFlow connection.
  *
  *
@@ -78,7 +85,6 @@ struct ofconn {
 
     /* OFPT_PACKET_IN related data. */
     struct rconn_packet_counter *packet_in_counter; /* # queued on 'rconn'. */
-#define N_SCHEDULERS 2
     struct pinsched *schedulers[N_SCHEDULERS];
     struct pktbuf *pktbuf;         /* OpenFlow packet buffers. */
     int miss_send_len;             /* Bytes to send of buffered packets. */
@@ -500,7 +506,10 @@ connmgr_get_controller_info(struct connmgr *mgr, struct shash *info)
 
             for (i = 0; i < N_SCHEDULERS; i++) {
                 if (ofconn->schedulers[i]) {
-                    const char *name = i ? "miss" : "action";
+                    const char *name
+                        = (i == SCHED_ACTION ? "action"
+                           : i == SCHED_MISS ? "miss"
+                           : "closure");
                     struct pinsched_stats stats;
 
                     pinsched_get_stats(ofconn->schedulers[i], &stats);
@@ -1661,24 +1670,44 @@ connmgr_send_async_msg(struct connmgr *mgr,
     LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
         enum ofputil_protocol protocol = ofconn_get_protocol(ofconn);
         if (protocol == OFPUTIL_P_NONE || !rconn_is_connected(ofconn->rconn)
-            || ofconn->controller_id != am->controller_id
-            || !ofconn_receives_async_msg(ofconn, am->oam,
-                                          am->pin.up.reason)) {
+            || ofconn->controller_id != am->controller_id) {
             continue;
         }
 
-        struct ofpbuf *msg = ofputil_encode_packet_in(
-            &am->pin.up, protocol, ofconn->packet_in_format,
-            am->pin.max_len >= 0 ? am->pin.max_len : ofconn->miss_send_len,
-            ofconn->pktbuf);
+        struct ofpbuf *msg;
+        enum ofconn_scheduler sched;
+        ofp_port_t port;
+        if (am->oam == OAM_PACKET_IN) {
+            if (!ofconn_receives_async_msg(ofconn, am->oam,
+                                           am->pin.up.reason)) {
+                continue;
+            }
+
+            uint16_t max_len = (am->pin.max_len >= 0
+                                ? am->pin.max_len
+                                : ofconn->miss_send_len);
+            msg = ofputil_encode_packet_in(&am->pin.up, protocol,
+                                           ofconn->packet_in_format,
+                                           max_len, ofconn->pktbuf);
+            sched = ((am->pin.up.reason == OFPR_NO_MATCH ||
+                      am->pin.up.reason == OFPR_EXPLICIT_MISS ||
+                      am->pin.up.reason == OFPR_IMPLICIT_MISS)
+                     ? SCHED_MISS : SCHED_ACTION);
+            port = am->pin.up.flow_metadata.flow.in_port.ofp_port;
+        } else if (am->oam == OAM_CLOSURE) {
+            if (!ofconn_receives_async_msg(ofconn, am->oam, 0)) {
+                continue;
+            }
+
+            msg = ofputil_encode_closure_private(&am->closure, protocol);
+            sched = SCHED_CLOSURE;
+            port = 0; /* XXX */
+        } else {
+            OVS_NOT_REACHED();
+        }
 
         struct ovs_list txq;
-        bool is_miss = (am->pin.up.reason == OFPR_NO_MATCH ||
-                        am->pin.up.reason == OFPR_EXPLICIT_MISS ||
-                        am->pin.up.reason == OFPR_IMPLICIT_MISS);
-        pinsched_send(ofconn->schedulers[is_miss],
-                      am->pin.up.flow_metadata.flow.in_port.ofp_port /* XXX */,
-                      msg, &txq);
+        pinsched_send(ofconn->schedulers[sched], port, msg, &txq);
         do_send_packet_ins(ofconn, &txq);
     }
 }
@@ -2247,6 +2276,10 @@ ofmonitor_wait(struct connmgr *mgr)
 void
 ofproto_async_msg_free(struct ofproto_async_msg *am)
 {
-    free(CONST_CAST(void *, am->pin.up.packet));
+    if (am->oam == OAM_PACKET_IN) {
+        free(CONST_CAST(void *, am->pin.up.packet));
+    } else if (am->oam == OAM_CLOSURE) {
+        ofputil_closure_private_destroy(&am->closure);
+    }
     free(am);
 }
diff --git a/ofproto/connmgr.h b/ofproto/connmgr.h
index fb7573e..e3c6b59 100644
--- a/ofproto/connmgr.h
+++ b/ofproto/connmgr.h
@@ -66,6 +66,9 @@ struct ofproto_async_msg {
             struct ofputil_packet_in up;
             int max_len;                /* From action, or -1 if none. */
         } pin;
+
+        /* OAM_CLOSURE. */
+        struct ofputil_closure_private closure;
     };
 };
 void ofproto_async_msg_free(struct ofproto_async_msg *);
diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c
index 184eb46..9a067d1 100644
--- a/ofproto/ofproto-dpif-xlate.c
+++ b/ofproto/ofproto-dpif-xlate.c
@@ -294,6 +294,7 @@ struct xlate_ctx {
                                  * executed after recirculation, or -1. */
     int last_unroll_offset;     /* Offset in 'action_set' to the latest unroll
                                  * action, or -1. */
+    const struct ofpact_pause *pause;
 
     /* True if a packet was but is no longer MPLS (due to an MPLS pop action).
      * This is a trigger for recirculation in cases where translating an action
@@ -3628,6 +3629,35 @@ execute_controller_action(struct xlate_ctx *ctx, int len,
 }
 
 static void
+emit_closure(struct xlate_ctx *ctx, const struct recirc_state *state)
+{
+    struct ofproto_async_msg *am = xmalloc(sizeof *am);
+    *am = (struct ofproto_async_msg) {
+        .controller_id = ctx->pause->controller_id,
+        .oam = OAM_CLOSURE,
+        .closure = {
+            .public = {
+                .packet = xmemdup(dp_packet_data(ctx->xin->packet),
+                                  dp_packet_size(ctx->xin->packet)),
+                .packet_len = dp_packet_size(ctx->xin->packet),
+            },
+            .bridge = *ofproto_dpif_get_uuid(ctx->xbridge->ofproto),
+            .stack = xmemdup(state->stack,
+                             state->n_stack * sizeof *state->stack),
+            .n_stack = state->n_stack,
+            .mirrors = state->mirrors,
+            .conntracked = state->conntracked,
+            .actions = xmemdup(state->ofpacts, state->ofpacts_len),
+            .actions_len = state->ofpacts_len,
+            .action_set = xmemdup(state->action_set, state->action_set_len),
+            .action_set_len = state->action_set_len,
+        }
+    };
+    flow_get_metadata(&ctx->xin->flow, &am->closure.public.metadata);
+    ofproto_dpif_send_async_msg(ctx->xbridge->ofproto, am);
+}
+
+static void
 compose_recirculate_action__(struct xlate_ctx *ctx, uint8_t table)
 {
     struct recirc_metadata md;
@@ -3652,20 +3682,26 @@ compose_recirculate_action__(struct xlate_ctx *ctx, uint8_t table)
         .action_set_len = ctx->recirc_action_offset,
     };
 
-    /* Allocate a unique recirc id for the given metadata state in the
-     * flow.  An existing id, with a new reference to the corresponding
-     * recirculation context, will be returned if possible.
-     * The life-cycle of this recirc id is managed by associating it
-     * with the udpif key ('ukey') created for each new datapath flow. */
-    id = recirc_alloc_id_ctx(&state);
-    if (!id) {
-        XLATE_REPORT_ERROR(ctx, "Failed to allocate recirculation id");
-        ctx->error = XLATE_NO_RECIRCULATION_CONTEXT;
-        return;
-    }
-    recirc_refs_add(&ctx->xout->recircs, id);
+    if (ctx->pause) {
+        if (ctx->xin->packet) {
+            emit_closure(ctx, &state);
+        }
+    } else {
+        /* Allocate a unique recirc id for the given metadata state in the
+         * flow.  An existing id, with a new reference to the corresponding
+         * recirculation context, will be returned if possible.
+         * The life-cycle of this recirc id is managed by associating it
+         * with the udpif key ('ukey') created for each new datapath flow. */
+        id = recirc_alloc_id_ctx(&state);
+        if (!id) {
+            XLATE_REPORT_ERROR(ctx, "Failed to allocate recirculation id");
+            ctx->error = XLATE_NO_RECIRCULATION_CONTEXT;
+            return;
+        }
+        recirc_refs_add(&ctx->xout->recircs, id);
 
-    nl_msg_put_u32(ctx->odp_actions, OVS_ACTION_ATTR_RECIRC, id);
+        nl_msg_put_u32(ctx->odp_actions, OVS_ACTION_ATTR_RECIRC, id);
+    }
 
     /* Undo changes done by recirculation. */
     ctx->action_set.size = ctx->recirc_action_offset;
@@ -4180,6 +4216,7 @@ recirc_unroll_actions(const struct ofpact *ofpacts, size_t ofpacts_len,
         case OFPACT_GROUP:
         case OFPACT_OUTPUT:
         case OFPACT_CONTROLLER:
+        case OFPACT_PAUSE:
         case OFPACT_DEC_MPLS_TTL:
         case OFPACT_DEC_TTL:
             recirc_put_unroll_xlate(ctx);
@@ -4717,6 +4754,13 @@ do_xlate_actions(const struct ofpact *ofpacts, size_t ofpacts_len,
             break;
         }
 
+        case OFPACT_PAUSE:
+            ctx->pause = ofpact_get_PAUSE(a);
+            ctx->xout->slow |= SLOW_CONTROLLER;
+            ctx_trigger_recirculation(ctx);
+            a = ofpact_next(a);
+            break;
+
         case OFPACT_EXIT:
             ctx->exit = true;
             break;
@@ -5103,6 +5147,7 @@ xlate_actions(struct xlate_in *xin, struct xlate_out *xout)
 
         .recirc_action_offset = -1,
         .last_unroll_offset = -1,
+        .pause = NULL,
 
         .was_mpls = false,
         .conntracked = false,
@@ -5143,7 +5188,7 @@ xlate_actions(struct xlate_in *xin, struct xlate_out *xout)
     COVERAGE_INC(xlate_actions);
 
     if (xin->recirc) {
-        const struct recirc_state *state = &xin->recirc->state;
+        const struct recirc_state *state = xin->recirc;
 
         xlate_report(&ctx, "Restoring state post-recirculation:");
 
@@ -5405,6 +5450,45 @@ exit:
     return ctx.error;
 }
 
+void
+xlate_closure(struct ofproto_dpif *ofproto,
+              const struct ofputil_closure_private *closure,
+              struct ofpbuf *odp_actions,
+              enum slow_path_reason *slow)
+{
+    struct dp_packet packet;
+    dp_packet_use_const(&packet, closure->public.packet,
+                        closure->public.packet_len);
+
+    struct flow flow;
+    flow_extract(&packet, &flow);
+
+    struct xlate_in xin;
+    xlate_in_init(&xin, ofproto, &flow, 0, NULL, flow.tcp_flags, &packet,
+                  NULL, odp_actions);
+
+    struct recirc_state recirc = {
+        .table_id = 0,     /* Not the table where NXAST_PAUSE was executed. */
+        .ofproto_uuid = closure->bridge,
+        .stack = closure->stack,
+        .n_stack = closure->n_stack,
+        .mirrors = closure->mirrors,
+        .conntracked = closure->conntracked,
+        .ofpacts = closure->actions,
+        .ofpacts_len = closure->actions_len,
+        .action_set = closure->action_set,
+        .action_set_len = closure->action_set_len,
+    };
+    recirc_metadata_from_flow(&recirc.metadata,
+                              &closure->public.metadata.flow);
+    xin.recirc = &recirc;
+
+    struct xlate_out xout;
+    xlate_actions(&xin, &xout);
+    *slow = xout.slow;
+    xlate_out_uninit(&xout);
+}
+
 /* Sends 'packet' out 'ofport'.
  * May modify 'packet'.
  * Returns 0 if successful, otherwise a positive errno value. */
diff --git a/ofproto/ofproto-dpif-xlate.h b/ofproto/ofproto-dpif-xlate.h
index 3b06285..338dfa1 100644
--- a/ofproto/ofproto-dpif-xlate.h
+++ b/ofproto/ofproto-dpif-xlate.h
@@ -202,6 +202,10 @@ void xlate_in_init(struct xlate_in *, struct ofproto_dpif *,
 void xlate_out_uninit(struct xlate_out *);
 void xlate_actions_for_side_effects(struct xlate_in *);
 
+void xlate_closure(struct ofproto_dpif *,
+                   const struct ofputil_closure_private *,
+                   struct ofpbuf *odp_actions, enum slow_path_reason *);
+
 int xlate_send_packet(const struct ofport_dpif *, struct dp_packet *);
 
 struct xlate_cache *xlate_cache_new(void);
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index 89e06aa..f9e0819 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -4424,6 +4424,37 @@ packet_out(struct ofproto *ofproto_, struct dp_packet *packet,
                                  ofpacts_len, packet);
     return 0;
 }
+
+static enum ofperr
+nxt_resume(struct ofproto *ofproto_, struct ofputil_closure_private *closure)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+
+    uint64_t odp_actions_stub[1024 / 8];
+    struct ofpbuf odp_actions = OFPBUF_STUB_INITIALIZER(odp_actions_stub);
+    enum slow_path_reason slow;
+    xlate_closure(ofproto, closure, &odp_actions, &slow);
+
+    struct dp_packet packet;
+    dp_packet_use(&packet, closure->public.packet, closure->public.packet_len);
+    dp_packet_put_uninit(&packet, closure->public.packet_len);
+    closure->public.packet = NULL;
+    closure->public.packet_len = 0;
+
+    struct dpif_execute execute = {
+        .actions = odp_actions.data,
+        .actions_len = odp_actions.size,
+        .needs_help = (slow & SLOW_ACTION) != 0,
+        .packet = &packet,
+    };
+    enum ofperr error = dpif_execute(ofproto->backer->dpif, &execute);
+
+    /* XXX what about stats? */
+
+    ofpbuf_uninit(&odp_actions);
+    dp_packet_uninit(&packet);
+    return error;
+}
 
 /* NetFlow. */
 
@@ -5771,6 +5802,7 @@ const struct ofproto_class ofproto_dpif_class = {
     rule_execute,
     set_frag_handling,
     packet_out,
+    nxt_resume,
     set_netflow,
     get_netflow_ids,
     set_sflow,
diff --git a/ofproto/ofproto-provider.h b/ofproto/ofproto-provider.h
index b6aac0a..63359f5 100644
--- a/ofproto/ofproto-provider.h
+++ b/ofproto/ofproto-provider.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -1305,6 +1305,9 @@ struct ofproto_class {
                               const struct ofpact *ofpacts,
                               size_t ofpacts_len);
 
+    enum ofperr (*nxt_resume)(struct ofproto *ofproto,
+                              struct ofputil_closure_private *closure);
+
 /* ## ------------------------- ## */
 /* ## OFPP_NORMAL configuration ## */
 /* ## ------------------------- ## */
diff --git a/ofproto/ofproto.c b/ofproto/ofproto.c
index 3faf42a..19c647a 100644
--- a/ofproto/ofproto.c
+++ b/ofproto/ofproto.c
@@ -3414,6 +3414,27 @@ exit:
     return error;
 }
 
+static enum ofperr
+handle_nxt_resume(struct ofconn *ofconn, const struct ofp_header *oh)
+{
+    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
+    struct ofputil_closure_private closure;
+    enum ofperr error;
+
+    error = ofputil_decode_closure_private(oh, false, &closure);
+    if (error) {
+        return error;
+    }
+
+    error = (ofproto->ofproto_class->nxt_resume
+             ? ofproto->ofproto_class->nxt_resume(ofproto, &closure)
+             : OFPERR_NXR_NOT_SUPPORTED);
+
+    ofputil_closure_private_destroy(&closure);
+
+    return error;
+}
+
 static void
 update_port_config(struct ofconn *ofconn, struct ofport *port,
                    enum ofputil_port_config config,
@@ -7217,6 +7238,9 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
     case OFPTYPE_GET_ASYNC_REQUEST:
         return handle_nxt_get_async_request(ofconn, oh);
 
+    case OFPTYPE_NXT_RESUME:
+        return handle_nxt_resume(ofconn, oh);
+
         /* Statistics requests. */
     case OFPTYPE_DESC_STATS_REQUEST:
         return handle_desc_stats_request(ofconn, oh);
@@ -7311,6 +7335,7 @@ handle_openflow__(struct ofconn *ofconn, const struct ofpbuf *msg)
     case OFPTYPE_ROLE_STATUS:
     case OFPTYPE_REQUESTFORWARD:
     case OFPTYPE_NXT_TLV_TABLE_REPLY:
+    case OFPTYPE_NXT_CLOSURE:
     default:
         if (ofpmsg_is_stat_request(oh)) {
             return OFPERR_OFPBRC_BAD_STAT;
diff --git a/tests/ofp-print.at b/tests/ofp-print.at
index c791cb2..fdbe0f3 100644
--- a/tests/ofp-print.at
+++ b/tests/ofp-print.at
@@ -2653,6 +2653,7 @@ OFPT_SET_ASYNC (OF1.3) (xid=0x0):
      ROLE_STATUS: (off)
     TABLE_STATUS: (off)
   REQUESTFORWARD: (off)
+         CLOSURE: (off)
 
  slave:
        PACKET_IN: no_match action invalid_ttl
@@ -2661,6 +2662,7 @@ OFPT_SET_ASYNC (OF1.3) (xid=0x0):
      ROLE_STATUS: (off)
     TABLE_STATUS: (off)
   REQUESTFORWARD: (off)
+         CLOSURE: (off)
 ])
 AT_CLEANUP
 
@@ -2863,6 +2865,7 @@ NXT_SET_ASYNC_CONFIG (xid=0x0):
      ROLE_STATUS: (off)
     TABLE_STATUS: (off)
   REQUESTFORWARD: (off)
+         CLOSURE: (off)
 
  slave:
        PACKET_IN: no_match action invalid_ttl
@@ -2871,6 +2874,7 @@ NXT_SET_ASYNC_CONFIG (xid=0x0):
      ROLE_STATUS: (off)
     TABLE_STATUS: (off)
   REQUESTFORWARD: (off)
+         CLOSURE: (off)
 ])
 AT_CLEANUP
 
@@ -2890,6 +2894,7 @@ OFPT_SET_ASYNC (OF1.4) (xid=0x2):
      ROLE_STATUS: (off)
     TABLE_STATUS: (off)
   REQUESTFORWARD: (off)
+         CLOSURE: (off)
 
  slave:
        PACKET_IN: no_match invalid_ttl
@@ -2898,6 +2903,7 @@ OFPT_SET_ASYNC (OF1.4) (xid=0x2):
      ROLE_STATUS: (off)
     TABLE_STATUS: (off)
   REQUESTFORWARD: (off)
+         CLOSURE: (off)
 ])
 AT_CLEANUP
 
diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at
index bc1af8f..5508736 100644
--- a/tests/ofproto-dpif.at
+++ b/tests/ofproto-dpif.at
@@ -2813,6 +2813,7 @@ send: OFPT_SET_ASYNC (OF1.3) (xid=0x2):
      ROLE_STATUS: (off)
     TABLE_STATUS: (off)
   REQUESTFORWARD: (off)
+         CLOSURE: (off)
 
  slave:
        PACKET_IN: no_match
@@ -2821,6 +2822,7 @@ send: OFPT_SET_ASYNC (OF1.3) (xid=0x2):
      ROLE_STATUS: (off)
     TABLE_STATUS: (off)
   REQUESTFORWARD: (off)
+         CLOSURE: (off)
 dnl
 OFPT_PACKET_IN (OF1.3) (xid=0x0): cookie=0x0 total_len=60 in_port=1 (via no_match) data_len=60 (unbuffered)
 tcp,vlan_tci=0x0000,dl_src=10:11:11:11:11:11,dl_dst=50:54:00:00:00:07,nw_src=192.168.0.1,nw_dst=192.168.0.2,nw_tos=0,nw_ecn=0,nw_ttl=64,tp_src=8,tp_dst=10,tcp_flags=syn tcp_csum:0
@@ -4178,6 +4180,28 @@ AT_CHECK_UNQUOTED([tail -1 stdout], [0], [Datapath actions: 4
 OVS_VSWITCHD_STOP
 AT_CLEANUP
 
+AT_SETUP([ofproto-dpif - resume])
+OVS_VSWITCHD_START
+ADD_OF_PORTS([br0], 1, 2, 3, 4)
+
+for port in p1 p2 p3 p4; do
+    ovs-vsctl set Interface $port options:rxq_pcap="$ovs_base/$port-rx.pcap" options:tx_pcap="$ovs_base/$port-tx.pcap"
+done
+
+AT_DATA([flows.txt], [dnl
+in_port=1 actions=2,pause,4
+])
+AT_CHECK([ovs-ofctl add-flows br0 flows.txt])
+
+AT_CAPTURE_FILE([ofctl_monitor.log])
+AT_CHECK([ovs-ofctl monitor br0 resume --detach --no-chdir --pidfile 2> ofctl_monitor.log])
+
+flow="in_port(1),eth(src=50:54:00:00:00:05,dst=50:54:00:00:00:07),eth_type(0x0800),ipv4(src=192.168.0.1,dst=192.168.0.2,proto=1,tos=0,ttl=128,frag=no),icmp(type=8,code=0)"
+AT_CHECK([ovs-appctl netdev-dummy/receive p1 "$flow"], [0], [stdout])
+
+OVS_VSWITCHD_STOP
+AT_CLEANUP
+
 # Two testcases below are for the ofproto/trace command
 # The first one tests all correct syntax:
 # ofproto/trace [dp_name] odp_flow [-generate|packet]
diff --git a/tests/ofproto.at b/tests/ofproto.at
index 61a6be5..0dfb440 100644
--- a/tests/ofproto.at
+++ b/tests/ofproto.at
@@ -3848,6 +3848,7 @@ OFPT_GET_ASYNC_REPLY (OF1.3):
      ROLE_STATUS: (off)
     TABLE_STATUS: (off)
   REQUESTFORWARD: (off)
+         CLOSURE: (off)
 
  slave:
        PACKET_IN: (off)
@@ -3856,6 +3857,7 @@ OFPT_GET_ASYNC_REPLY (OF1.3):
      ROLE_STATUS: (off)
     TABLE_STATUS: (off)
   REQUESTFORWARD: (off)
+         CLOSURE: (off)
 OFPT_BARRIER_REPLY (OF1.3):
 ])
 
diff --git a/utilities/ovs-ofctl.c b/utilities/ovs-ofctl.c
index 1ad48c3..64482d3 100644
--- a/utilities/ovs-ofctl.c
+++ b/utilities/ovs-ofctl.c
@@ -1617,7 +1617,8 @@ ofctl_unblock(struct unixctl_conn *conn, int argc OVS_UNUSED,
  * Iff 'reply_to_echo_requests' is true, sends a reply to any echo request
  * received on 'vconn'. */
 static void
-monitor_vconn(struct vconn *vconn, bool reply_to_echo_requests)
+monitor_vconn(struct vconn *vconn, bool reply_to_echo_requests,
+              bool resume_closures)
 {
     struct barrier_aux barrier_aux = { vconn, NULL };
     struct unixctl_server *server;
@@ -1645,6 +1646,10 @@ monitor_vconn(struct vconn *vconn, bool reply_to_echo_requests)
 
     daemonize_complete();
 
+    enum ofp_version version = vconn_get_version(vconn);
+    enum ofputil_protocol protocol
+        = ofputil_protocol_from_ofp_version(version);
+
     for (;;) {
         struct ofpbuf *b;
         int retval;
@@ -1690,6 +1695,40 @@ monitor_vconn(struct vconn *vconn, bool reply_to_echo_requests)
                     }
                 }
                 break;
+
+            case OFPTYPE_NXT_CLOSURE:
+                if (resume_closures) {
+                    struct ofpbuf private_properties;
+                    struct ofputil_closure closure;
+
+                    ofpbuf_init(&private_properties, 0);
+                    error = ofputil_decode_closure(b->data, &closure,
+                                                   &private_properties);
+                    if (error) {
+                        fprintf(stderr, "decoding closure failed: %s\n",
+                                ofperr_to_string(error));
+                    } else {
+                        struct ofpbuf *reply;
+
+                        reply = ofputil_encode_resume(&closure,
+                                                      &private_properties,
+                                                      protocol);
+
+                        fprintf(stderr, "send: ");
+                        ofp_print(stderr, reply->data, reply->size,
+                                  verbosity + 2);
+                        fflush(stderr);
+
+                        retval = vconn_send_block(vconn, reply);
+                        if (retval) {
+                            ovs_fatal(retval, "failed to send NXT_RESUME");
+                        }
+                    }
+
+                    ofpbuf_uninit(&private_properties);
+                    ofputil_closure_destroy(&closure);
+                }
+                break;
             }
             ofpbuf_delete(b);
         }
@@ -1739,7 +1778,8 @@ ofctl_monitor(struct ovs_cmdl_context *ctx)
         }
     }
 
-    open_vconn(ctx->argv[1], &vconn);
+    enum ofputil_protocol protocol = open_vconn(ctx->argv[1], &vconn);
+    bool resume_closures = false;
     for (i = 2; i < ctx->argc; i++) {
         const char *arg = ctx->argv[i];
 
@@ -1766,6 +1806,17 @@ ofctl_monitor(struct ovs_cmdl_context *ctx)
             ofputil_append_flow_monitor_request(&fmr, msg);
             dump_transaction(vconn, msg);
             fflush(stdout);
+        } else if (!strcmp(arg, "resume")) {
+            /* This option is intentionally undocumented because it is meant
+             * only for testing. */
+            resume_closures = true;
+
+            struct ofputil_async_cfg ac = OFPUTIL_ASYNC_CFG_INIT;
+            ac.master[OAM_CLOSURE] = 1;
+            struct ofpbuf *rq = ofputil_encode_set_async_config(
+                &ac, 1u << OAM_CLOSURE,
+                ofputil_protocol_to_ofp_version(protocol));
+            run(vconn_send_block(vconn, rq), "failed to set async config");
         } else {
             ovs_fatal(0, "%s: unsupported \"monitor\" argument", arg);
         }
@@ -1805,7 +1856,7 @@ ofctl_monitor(struct ovs_cmdl_context *ctx)
         }
     }
 
-    monitor_vconn(vconn, true);
+    monitor_vconn(vconn, true, resume_closures);
 }
 
 static void
@@ -1814,7 +1865,7 @@ ofctl_snoop(struct ovs_cmdl_context *ctx)
     struct vconn *vconn;
 
     open_vconn__(ctx->argv[1], SNOOP, &vconn);
-    monitor_vconn(vconn, false);
+    monitor_vconn(vconn, false, false);
 }
 
 static void
diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
index ce0dbc1..5038681 100644
--- a/vswitchd/vswitch.xml
+++ b/vswitchd/vswitch.xml
@@ -3806,9 +3806,11 @@
           controller: either the packet ``misses'' in the flow table, that is,
           there is no matching flow, or a flow table action says to send the
           packet to the controller.  Open vSwitch limits the rate of each kind
-          of packet separately at the configured rate.  Therefore, the actual
-          rate that packets are sent to the controller can be up to twice the
-          configured rate, when packets are sent for both reasons.
+          of packet separately at the configured rate.  In addition, Open
+          vSwitch rate-limits ``closures'', an OpenFlow extension, in the
+          same way.  Therefore, the actual rate at which packets are sent
+          to the controller can be up to three times the configured rate,
+          when packets and closures are sent for all three reasons.
         </p>
 
         <p>
@@ -3849,11 +3851,12 @@
             These values report the effects of rate limiting.  Their values are
             relative to establishment of the most recent OpenFlow connection,
             or since rate limiting was enabled, whichever happened more
-            recently.  Each consists of two values, one with <code>TYPE</code>
-            replaced by <code>miss</code> for rate limiting flow table misses,
-            and the other with <code>TYPE</code> replaced by
+            recently.  Each consists of three values, one with
+            <code>TYPE</code> replaced by <code>miss</code> for rate limiting
+            flow table misses, one with <code>TYPE</code> replaced by
             <code>action</code> for rate limiting packets sent by OpenFlow
-            actions.
+            actions, and a third with <code>TYPE</code> replaced by
+            <code>closure</code> for rate limiting closures.
           </p>
 
           <p>
-- 
2.1.3
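
For readers skimming the connmgr.c changes, the scheduler selection that
connmgr_send_async_msg() performs can be sketched in isolation.  This is a
minimal illustration, not code from the patch: the enum names pin_reason and
async_kind, their values, and the helpers choose_scheduler() and
scheduler_name() are hypothetical stand-ins for the OFPR_* reasons, the OAM_*
message types, and the inline logic in the patch.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-ins for the OFPR_* packet-in reasons; the numeric
 * values are illustrative and do not match the OpenFlow wire values. */
enum pin_reason {
    REASON_NO_MATCH,
    REASON_ACTION,
    REASON_INVALID_TTL,
    REASON_EXPLICIT_MISS,
    REASON_IMPLICIT_MISS
};

/* Hypothetical stand-in for the two async message types handled here. */
enum async_kind { KIND_PACKET_IN, KIND_CLOSURE };

/* Mirrors 'enum ofconn_scheduler' from the patch. */
enum scheduler { SCHED_ACTION, SCHED_MISS, SCHED_CLOSURE, N_SCHEDULERS };

/* Closures always use their own scheduler.  Packet-ins use the "miss"
 * scheduler for the three miss reasons and the "action" scheduler for
 * everything else. */
static enum scheduler
choose_scheduler(enum async_kind kind, enum pin_reason reason)
{
    if (kind == KIND_CLOSURE) {
        return SCHED_CLOSURE;
    }
    switch (reason) {
    case REASON_NO_MATCH:
    case REASON_EXPLICIT_MISS:
    case REASON_IMPLICIT_MISS:
        return SCHED_MISS;
    default:
        return SCHED_ACTION;
    }
}

/* Name used for the per-scheduler statistics keys. */
static const char *
scheduler_name(enum scheduler s)
{
    switch (s) {
    case SCHED_ACTION:  return "action";
    case SCHED_MISS:    return "miss";
    case SCHED_CLOSURE: return "closure";
    case N_SCHEDULERS:  break;
    }
    assert(0);                  /* Not a valid scheduler index. */
    return NULL;
}
```

Indexing the schedulers by a named enum rather than by a boolean is what lets
a third rate limiter be added without touching the existing two, which is
presumably why the patch replaces the old 'is_miss' index.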



