[ovs-dev] [PATCH/RFC v2 2/4] connmgr: Buffer multipart requests

Simon Horman horms at verge.net.au
Wed Jun 11 04:34:42 UTC 2014


Buffer a multipart request until all of its parts have been received.

This is achieved by initialising the list_node field of messages
and passing them to ofmp_req_filter().

* If the message is not recognised as part of a multipart request it is
  simply returned and processing continues as before.

* If the message is part of a multipart request but is not the last message
  of that request then it is buffered and ofmp_req_filter() returns
  OFPROTO_POSTPONE, indicating that the message should be skipped for now.

* Otherwise, if the message is the last part of a multipart request then
  the first message of the request is returned and any subsequent parts
  are accessible via its list_node field.

Some implementation notes:

* As the list_node field of a message may now link further messages,
  ofpbuf_list_delete() should be used to delete them as necessary.

* This places a limit of OFCONN_MP_REQ_MAX (=1024) on the number of
  multipart request messages that may be buffered for a connection.

* This code should have no effect on message handling at this
  time as ofpmsg_may_buffer_mp_request() always returns false.
  It should be updated along with support for traversing
  the list_node field of messages when adding multipart request
  support to individual message types.

Signed-off-by: Simon Horman <horms at verge.net.au>

---
v2
* Use ofpmsg_may_buffer_mp_request() to only buffer messages for
  which buffering is enabled. Currently there are none.
* Annotate mp_reqs field of struct ofconn with OVS_GUARDED_BY(ofproto_mutex)
* Add locking
* Add limitation on the number of messages that may be buffered

Signed-off-by: Simon Horman <horms at verge.net.au>

wip: buffer limit

Signed-off-by: Simon Horman <horms at verge.net.au>
---
 ofproto/connmgr.c | 240 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 240 insertions(+)

diff --git a/ofproto/connmgr.c b/ofproto/connmgr.c
index 78a3cab..4424a02 100644
--- a/ofproto/connmgr.c
+++ b/ofproto/connmgr.c
@@ -142,6 +142,12 @@ struct ofconn {
 
     /* Active bundles. Contains "struct ofp_bundle"s. */
     struct hmap bundles;
+
+    /* Partial multipart messages. Contains "struct ofmp_req"s. */
+    struct hmap mp_reqs OVS_GUARDED_BY(ofproto_mutex);
+#define OFCONN_MP_REQ_MAX 1024
+    /* Number of buffered partial multipart messages */
+    size_t mp_req_count OVS_GUARDED_BY(ofproto_mutex);
 };
 
 static struct ofconn *ofconn_create(struct connmgr *, struct rconn *,
@@ -223,6 +229,16 @@ struct connmgr {
     int in_band_queue;
 };
 
+/* The parts received so far of one multipart request on an ofconn. */
+struct ofmp_req {
+    struct hmap_node hmap_node;     /* In struct ofconn's 'mp_reqs' hmap. */
+    struct ofpbuf *front;           /* First message of the request.  Later
+                                     * messages are chained, in order of
+                                     * arrival, on its list_node. */
+};
+
+static void ofmp_req_destroy(struct ofconn *, struct ofmp_req *mp_req);
+
 static void update_in_band_remotes(struct connmgr *);
 static void add_snooper(struct connmgr *, struct vconn *);
 static void ofmonitor_run(struct connmgr *);
@@ -1217,6 +1233,7 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type,
 
     hmap_init(&ofconn->monitors);
     list_init(&ofconn->updates);
+    hmap_init(&ofconn->mp_reqs);
 
     hmap_init(&ofconn->bundles);
 
@@ -1232,6 +1249,7 @@ ofconn_flush(struct ofconn *ofconn)
     OVS_REQUIRES(ofproto_mutex)
 {
     struct ofmonitor *monitor, *next_monitor;
+    struct ofmp_req *mp_req, *next_mp_req;
     int i;
 
     ofconn_log_flow_mods(ofconn);
@@ -1312,6 +1330,9 @@ ofconn_flush(struct ofconn *ofconn)
                         &ofconn->monitors) {
         ofmonitor_destroy(monitor);
     }
+    HMAP_FOR_EACH_SAFE (mp_req, next_mp_req, hmap_node, &ofconn->mp_reqs) {
+        ofmp_req_destroy(ofconn, mp_req);
+    }
     rconn_packet_counter_destroy(ofconn->monitor_counter);
     ofconn->monitor_counter = rconn_packet_counter_create();
     ofpbuf_list_delete(&ofconn->updates); /* ...but it should be empty. */
@@ -1330,6 +1351,7 @@ ofconn_destroy(struct ofconn *ofconn)
     ofp_bundle_remove_all(ofconn);
 
     hmap_destroy(&ofconn->monitors);
+    hmap_destroy(&ofconn->mp_reqs);
     list_remove(&ofconn->node);
     rconn_destroy(ofconn->rconn);
     rconn_packet_counter_destroy(ofconn->packet_in_counter);
@@ -1372,6 +1394,202 @@ ofconn_may_recv(const struct ofconn *ofconn)
     return (!ofconn->blocked || ofconn->retry) && count < OFCONN_REPLY_MAX;
 }
 
+/* Allocates a multipart request buffer that holds no messages yet. */
+static struct ofmp_req *
+ofmp_req_create(void)
+{
+    struct ofmp_req *req = xmalloc(sizeof *req);
+
+    req->front = NULL;
+
+    return req;
+}
+
+/* Frees 'mp_req' (which may be NULL) and every message buffered in it. */
+static void
+ofmp_req_destroy(struct ofconn *ofconn, struct ofmp_req *mp_req)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    if (!mp_req) {
+        return;
+    }
+    hmap_remove(&ofconn->mp_reqs, &mp_req->hmap_node);
+    if (mp_req->front) {
+        ofconn->mp_req_count -= 1 + list_size(&mp_req->front->list_node);
+        ofpbuf_list_delete(&mp_req->front->list_node);
+        ofpbuf_delete(mp_req->front);
+    }
+    free(mp_req);
+}
+
+/* Multi-part requests */
+COVERAGE_DEFINE(mp_req_overflow);
+
+static uint32_t
+ofmp_req_hash(ovs_be32 xid)
+{
+    return hash_int(ntohl(xid), 0); /* Hash 'xid' in host byte order. */
+}
+
+/* Returns the multipart request buffer in 'ofconn' whose front message has
+ * transaction id 'xid', or NULL if there is none.  'hash' must be
+ * ofmp_req_hash(xid); it is a host-order hash value, not wire data. */
+static struct ofmp_req *
+ofmp_req_find(const struct ofconn *ofconn, ovs_be32 xid, uint32_t hash)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    struct ofmp_req *mp_req;
+
+    HMAP_FOR_EACH_WITH_HASH (mp_req, hmap_node, hash, &ofconn->mp_reqs) {
+        const struct ofp_header *oh = ofpbuf_data(mp_req->front);
+
+        if (xid == oh->xid) {
+            return mp_req;
+        }
+    }
+
+    return NULL;
+}
+
+/* Appends 'part' to the multipart request buffer 'mp_req' of 'ofconn'.  If
+ * 'mp_req' is NULL a new buffer is created with 'part' as its front message
+ * and inserted into 'ofconn' under 'hash', which the caller computes as
+ * ofmp_req_hash() of the transaction id of 'part'.
+ *
+ * Returns the buffer that now holds 'part'.  If 'ofconn' already buffers
+ * OFCONN_MP_REQ_MAX messages, destroys 'mp_req' instead and returns NULL. */
+static struct ofmp_req *
+ofmp_req_add(struct ofconn *ofconn, struct ofmp_req *mp_req,
+             struct ofpbuf *part, uint32_t hash)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    if (ofconn->mp_req_count >= OFCONN_MP_REQ_MAX) {
+        /* Buffer limit reached: drop everything held for this request. */
+        ofmp_req_destroy(ofconn, mp_req);
+        return NULL;
+    }
+
+    if (mp_req) {
+        list_push_back(&mp_req->front->list_node, &part->list_node);
+    } else {
+        mp_req = ofmp_req_create();
+        mp_req->front = part;
+        hmap_insert(&ofconn->mp_reqs, &mp_req->hmap_node, hash);
+    }
+    ofconn->mp_req_count++;
+
+    return mp_req;
+}
+
+/* Detaches the buffered messages from 'mp_req', which belongs to 'ofconn',
+ * frees 'mp_req' and returns the front message with the other parts still
+ * linked via its list_node.  Call this once every part of a buffered
+ * multipart request has been received. */
+static struct ofpbuf *
+ofmp_req_finished(struct ofconn *ofconn, struct ofmp_req *mp_req)
+    OVS_REQUIRES(ofproto_mutex)
+{
+    struct ofpbuf *front = mp_req->front;
+
+    /* ofmp_req_destroy() only adjusts the count for messages it frees, so
+     * account for the ones handed over to the caller here. */
+    ofconn->mp_req_count -= list_size(&front->list_node) + 1;
+    mp_req->front = NULL;
+    ofmp_req_destroy(ofconn, mp_req);
+
+    return front;
+}
+
+/* Filter 'msg' received for 'ofconn' and:
+ * - Return '0' if '*msg' is not a multipart request, is a multipart
+ *   request whose type cannot be decoded or that may not be buffered, is
+ *   a multipart request made up of a single message, or is the last part
+ *   of a buffered multipart request.
+ *
+ *   In the last case '*msg' is updated to point to the first part of the
+ *   request, whose list_node links the remaining parts.  The caller
+ *   should only use the new value of '*msg' afterwards.
+ *
+ * - Return 'OFPROTO_POSTPONE' if '*msg' is part of a multipart request
+ *   that may be buffered but is not the last part of the request.
+ *
+ *   This indicates to the caller that no further processing should
+ *   occur at this time.
+ *
+ * - Otherwise return the ofperr value for a multipart buffer overflow. */
+static int
+ofmp_req_filter(struct ofconn *ofconn, struct ofpbuf **msg)
+    OVS_EXCLUDED(ofproto_mutex)
+{
+    const struct ofp_header *oh = ofpbuf_data(*msg);
+    uint32_t hash = ofmp_req_hash(oh->xid);
+    struct ofmp_req *mp_req;
+    enum ofptype type;
+    enum ofperr error;
+    int retval = 0;
+    bool more;
+
+    /* If the message is not a multipart request then
+     * there is nothing to do here. */
+    if (!ofpmsg_is_mp_request(oh)) {
+        goto out;
+    }
+
+    /* If the type could not be decoded there is nothing more to do here,
+     * an error will later be returned by handle_openflow__(). */
+    error = ofptype_decode(&type, oh);
+    if (error) {
+        goto out;
+    }
+
+    /* If the message is not allowed to have multiple parts then
+     * there is nothing to do here. An error will later be returned
+     * by handle_openflow__() if there are multiple parts. */
+    if (!ofpmsg_may_buffer_mp_request(type)) {
+        goto out;
+    }
+
+    more = ofpmp_more(oh);
+
+    ovs_mutex_lock(&ofproto_mutex);
+    /* Find any previous parts of the multipart message */
+    mp_req = ofmp_req_find(ofconn, oh->xid, hash);
+
+    if (!mp_req && !more) {
+        /* Singleton multipart message, it can be processed as-is */
+        goto unlock;
+    }
+
+    /* Add this msg to the previous parts of the multipart message */
+    mp_req = ofmp_req_add(ofconn, mp_req, *msg, hash);
+    if (!mp_req) {
+        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+        COVERAGE_INC(mp_req_overflow);
+        VLOG_WARN_RL(&rl, "multipart request buffer overflow");
+
+        retval = OFPERR_OFPBRC_MULTIPART_BUFFER_OVERFLOW;
+        goto unlock;
+    }
+
+    /* More to come: leave the parts stored in its mp_req
+     * and do no further processing for now */
+    if (more) {
+        retval = OFPROTO_POSTPONE;
+        goto unlock;
+    }
+
+    /* The last part.
+     * Return the first part which is linked to all the other parts
+     * via its list_node */
+    *msg = ofmp_req_finished(ofconn, mp_req);
+
+unlock:
+    ovs_mutex_unlock(&ofproto_mutex);
+out:
+    return retval;
+}
+
 static void
 ofconn_run(struct ofconn *ofconn,
            bool (*handle_openflow)(struct ofconn *,
@@ -1392,7 +1610,10 @@ ofconn_run(struct ofconn *ofconn,
     if (handle_openflow) {
         /* Limit the number of iterations to avoid starving other tasks. */
         for (i = 0; i < 50 && ofconn_may_recv(ofconn); i++) {
+            const struct ofp_header *oh;
             struct ofpbuf *of_msg;
+            enum ofptype type;
+            int error;
 
             of_msg = (ofconn->blocked
                       ? ofconn->blocked
@@ -1400,11 +1621,30 @@ ofconn_run(struct ofconn *ofconn,
             if (!of_msg) {
                 break;
             }
+            list_init(&of_msg->list_node);
+
+            oh = ofpbuf_data(of_msg);
+            if (ofptype_decode(&type, oh)) {
+                ofpbuf_delete(of_msg);
+                ofconn->blocked = NULL;
+                continue;
+            }
+
+            error = ofmp_req_filter(ofconn, &of_msg);
+            if (error) {
+                if (error != OFPROTO_POSTPONE) {
+                    ofconn_send_error(ofconn, ofpbuf_data(of_msg), error);
+                    ofpbuf_delete(of_msg);
+                    ofconn->blocked = NULL;
+                }
+                continue;
+            }
             if (mgr->fail_open) {
                 fail_open_maybe_recover(mgr->fail_open);
             }
 
             if (handle_openflow(ofconn, of_msg)) {
+                ofpbuf_list_delete(&of_msg->list_node);
                 ofpbuf_delete(of_msg);
                 ofconn->blocked = NULL;
             } else {
-- 
2.0.0.rc2




More information about the dev mailing list