[ovs-dev] [PATCH/RFC v2 2/4] connmgr: Buffer multipart requests
Simon Horman
horms at verge.net.au
Wed Jun 11 04:34:42 UTC 2014
Buffer a multi-part requests until all its parts are received.
This is achieved by initialising the list_node field of messages
and passing them to ofmp_req_filter().
* If the message is not recognised as part of a multi-part requests it is
simply returned and processing continues as before.
* If the messages part of a multipart request but is not the last message
of that request then it is buffered and ofmp_req_filter() returns NULL
indicating that the message should be skipped for now.
* Otherwise, if the message is the last part of a multipart request then
the first message that is the part of the request is returned and any
subsequent parts are accessible via its list_head field.
Some implementation notes:
* As the list_head field may now contain messages ofpbuf_list_delete
should be used to delete them as necessary.
* This places a limit of OFCONN_MP_REQ_MAX (=1024) on the
number of multipart requests may be buffered.
* This code should have no affect on message handling at this
time as ofpmsg_may_buffer_mp_request() always returns false.
It should be updated along with support for traversing
the list_head field of messages when adding multipart request
support to individual message types.
Signed-off-by: Simon Horman <horms at verge.net.au>
---
v2
* Use ofpmsg_may_buffer_mp_request() to only buffer messages for
which buffering is enabled. Currently there are none.
* Annotate mp_reqs field of struct ofconn with OVS_GUARDED_BY(ofproto_mutex)
* Add locking
* Add limitation on the number of messages that may be buffered
Signed-off-by: Simon Horman <horms at verge.net.au>
wip: buffer limit
Signed-off-by: Simon Horman <horms at verge.net.au>
---
ofproto/connmgr.c | 240 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 240 insertions(+)
diff --git a/ofproto/connmgr.c b/ofproto/connmgr.c
index 78a3cab..4424a02 100644
--- a/ofproto/connmgr.c
+++ b/ofproto/connmgr.c
@@ -142,6 +142,12 @@ struct ofconn {
/* Active bundles. Contains "struct ofp_bundle"s. */
struct hmap bundles;
+
+ /* Partial multipart messages. Contains "struct ofmp_req"s. */
+ struct hmap mp_reqs OVS_GUARDED_BY(ofproto_mutex);
+#define OFCONN_MP_REQ_MAX 1024
+ /* Number of buffered partial multipart messages */
+ size_t mp_req_count OVS_GUARDED_BY(ofproto_mutex);
};
static struct ofconn *ofconn_create(struct connmgr *, struct rconn *,
@@ -223,6 +229,16 @@ struct connmgr {
int in_band_queue;
};
+/* Buffer for multipart requests */
+struct ofmp_req {
+ struct hmap_node hmap_node; /* In struct ofconn's 'mp_reqs' hmap. */
+ struct ofpbuf *front; /* First message.
+ * Remaining messages are linked via its
+ * list_node */
+};
+
+static void ofmp_req_destroy(struct ofconn *, struct ofmp_req *mp_req);
+
static void update_in_band_remotes(struct connmgr *);
static void add_snooper(struct connmgr *, struct vconn *);
static void ofmonitor_run(struct connmgr *);
@@ -1217,6 +1233,7 @@ ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type,
hmap_init(&ofconn->monitors);
list_init(&ofconn->updates);
+ hmap_init(&ofconn->mp_reqs);
hmap_init(&ofconn->bundles);
@@ -1232,6 +1249,7 @@ ofconn_flush(struct ofconn *ofconn)
OVS_REQUIRES(ofproto_mutex)
{
struct ofmonitor *monitor, *next_monitor;
+ struct ofmp_req *mp_req, *next_mp_req;
int i;
ofconn_log_flow_mods(ofconn);
@@ -1312,6 +1330,9 @@ ofconn_flush(struct ofconn *ofconn)
&ofconn->monitors) {
ofmonitor_destroy(monitor);
}
+ HMAP_FOR_EACH_SAFE (mp_req, next_mp_req, hmap_node, &ofconn->mp_reqs) {
+ ofmp_req_destroy(ofconn, mp_req);
+ }
rconn_packet_counter_destroy(ofconn->monitor_counter);
ofconn->monitor_counter = rconn_packet_counter_create();
ofpbuf_list_delete(&ofconn->updates); /* ...but it should be empty. */
@@ -1330,6 +1351,7 @@ ofconn_destroy(struct ofconn *ofconn)
ofp_bundle_remove_all(ofconn);
hmap_destroy(&ofconn->monitors);
+ hmap_destroy(&ofconn->mp_reqs);
list_remove(&ofconn->node);
rconn_destroy(ofconn->rconn);
rconn_packet_counter_destroy(ofconn->packet_in_counter);
@@ -1372,6 +1394,202 @@ ofconn_may_recv(const struct ofconn *ofconn)
return (!ofconn->blocked || ofconn->retry) && count < OFCONN_REPLY_MAX;
}
+static struct ofmp_req *
+ofmp_req_create(void)
+{
+ struct ofmp_req *mp_req;
+
+ mp_req = xmalloc(sizeof *mp_req);
+ mp_req->front = NULL;
+
+ return mp_req;
+}
+
+/* Destroy a buffered multipart request and all its component messages */
+static void
+ofmp_req_destroy(struct ofconn *ofconn, struct ofmp_req *mp_req)
+ OVS_REQUIRES(ofproto_mutex)
+{
+ if (!mp_req) {
+ return;
+ }
+ if (mp_req->front) {
+ ofconn->mp_req_count -= list_size(&mp_req->front->list_node) + 1;
+ ofpbuf_list_delete(&mp_req->front->list_node);
+ ofpbuf_delete(mp_req->front);
+ }
+ hmap_remove(&ofconn->mp_reqs, &mp_req->hmap_node);
+ free(mp_req);
+}
+
+/* Multi-part requests */
+COVERAGE_DEFINE(mp_req_overflow);
+
+static uint32_t
+ofmp_req_hash(ovs_be32 xid)
+{
+ return hash_int(ntohl(xid), 0);
+}
+
+/* Find the buffer for a multipart message in 'ofconn'
+ * with transaction id 'xid' using 'hash' which is calculated
+ * as ofmpp_req_hash(xid) */
+static struct ofmp_req *
+ofmp_req_find(const struct ofconn *ofconn, ovs_be32 xid, ovs_be32 hash)
+ OVS_REQUIRES(ofproto_mutex)
+{
+ struct ofmp_req *mp_req;
+
+ HMAP_FOR_EACH_WITH_HASH (mp_req, hmap_node, hash, &ofconn->mp_reqs) {
+ const struct ofp_header *oh = ofpbuf_data(mp_req->front);
+
+ if (xid == oh->xid) {
+ return mp_req;
+ }
+ }
+
+ return NULL;
+}
+
+/* Buffer 'part' as a component of a multipart request 'mp_req' in 'ofconn'
+ * using 'hash' which is calculated as ofmpp_req_hash(xid), where xid
+ * is the transaction id of 'part'.
+ *
+ * If 'mp_req' is NULL then create a new multipart request buffer in
+ * 'ofconn'. */
+static struct ofmp_req *
+ofmp_req_add(struct ofconn *ofconn, struct ofmp_req *mp_req,
+ struct ofpbuf *part, uint32_t hash)
+ OVS_REQUIRES(ofproto_mutex)
+{
+ /* Too many requests! */
+ if (ofconn->mp_req_count >= OFCONN_MP_REQ_MAX) {
+ ofmp_req_destroy(ofconn, mp_req);
+ return NULL;
+ }
+
+ if (!mp_req) {
+ mp_req = ofmp_req_create();
+ hmap_insert(&ofconn->mp_reqs, &mp_req->hmap_node, hash);
+ mp_req->front = part;
+ } else {
+ list_push_back(&mp_req->front->list_node, &part->list_node);
+ }
+
+ ofconn->mp_req_count++;
+
+ return mp_req;
+}
+
+/* Disconnect the component messages from the multipart request buffer
+ * 'mp_req' that belongs to 'ofconn' and free memory associated with
+ * 'mp_req'. Returns the front message of the multipart request.
+ *
+ * This should be called once all components of a buffered multipart
+ * request have been received. */
+static struct ofpbuf *
+ofmp_req_finished(struct ofconn *ofconn, struct ofmp_req *mp_req)
+ OVS_REQUIRES(ofproto_mutex)
+{
+ struct ofpbuf *front;
+
+ front = mp_req->front;
+ mp_req->front = NULL;
+ ofmp_req_destroy(ofconn, mp_req);
+
+ return front;
+}
+
+/* Filter 'msg' received for 'ofconn' and:
+ * - Return '0' if 'msg' is not part of a multipart request, is
+ * part of a multipart request that may not be buffered that may be
+ * buffered, or the last part of a multipart request with only one
+ * component message.
+ *
+ * 'msg' will be updated to point to a message that may be processed
+ * by the caller. This may differer from the value of 'msg'. The
+ * caller should only use the new value after.
+ *
+ * - Return 'OFPROTO_POSTPONE' if 'msg' is part of a multipart request
+ * that may be buffered but is not the last part of the request.
+ *
+ * This indicates to the caller that no further processing should
+ * occur at this time.
+ *
+ * - Otherwise a positive error value is returned. */
+static int
+ofmp_req_filter(struct ofconn *ofconn, struct ofpbuf **msg)
+ OVS_EXCLUDED(ofproto_mutex)
+{
+ const struct ofp_header *oh = ofpbuf_data(*msg);
+ uint32_t hash = ofmp_req_hash(oh->xid);
+ struct ofmp_req *mp_req;
+ enum ofptype type;
+ enum ofperr error;
+ int retval = 0;
+ bool more;
+
+ /* If the message is not a multipart request then
+ * there is nothing to do here. */
+ if (!ofpmsg_is_mp_request(oh)) {
+ goto out;
+ }
+
+ /* If the type could not be decoded there is nothing more to do here,
+ * an error will later be returned by handle_openflow__(). */
+ error = ofptype_decode(&type, oh);
+ if (error) {
+ goto out;
+ }
+
+ /* If the message is not allowed to have multiple parts then
+ * there is nothing to do here. An error will later be returned
+ * by handle_openflow__() if there are multiple parts. */
+ if (!ofpmsg_may_buffer_mp_request(type)) {
+ goto out;
+ }
+
+ more = ofpmp_more(oh);
+
+ ovs_mutex_lock(&ofproto_mutex);
+ /* Find any previous parts of the multipart message */
+ mp_req = ofmp_req_find(ofconn, oh->xid, hash);
+
+ if (!mp_req && !more) {
+ /* Singleton multipart message, it can be processed as-is */
+ goto unlock;
+ }
+
+ /* Add this msg to the previous parts of the multipart message */
+ mp_req = ofmp_req_add(ofconn, mp_req, *msg, hash);
+ if (!mp_req) {
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+ COVERAGE_INC(mp_req_overflow);
+ VLOG_WARN_RL(&rl, "multipart request buffer overflow");
+
+ retval = OFPERR_OFPBRC_MULTIPART_BUFFER_OVERFLOW;
+ goto unlock;
+ }
+
+ /* More to come: leave the parts stored in its mp_req
+ * and do no further processing for now */
+ if (more) {
+ retval = OFPROTO_POSTPONE;
+ goto unlock;
+ }
+
+ /* The last part.
+ * Return the first part which is linked to all the other parts
+ * via its list_node */
+ *msg = ofmp_req_finished(ofconn, mp_req);
+
+unlock:
+ ovs_mutex_unlock(&ofproto_mutex);
+out:
+ return retval;
+}
+
static void
ofconn_run(struct ofconn *ofconn,
bool (*handle_openflow)(struct ofconn *,
@@ -1392,7 +1610,10 @@ ofconn_run(struct ofconn *ofconn,
if (handle_openflow) {
/* Limit the number of iterations to avoid starving other tasks. */
for (i = 0; i < 50 && ofconn_may_recv(ofconn); i++) {
+ const struct ofp_header *oh;
struct ofpbuf *of_msg;
+ enum ofptype type;
+ int error;
of_msg = (ofconn->blocked
? ofconn->blocked
@@ -1400,11 +1621,30 @@ ofconn_run(struct ofconn *ofconn,
if (!of_msg) {
break;
}
+ list_init(&of_msg->list_node);
+
+ oh = ofpbuf_data(of_msg);
+ if (ofptype_decode(&type, oh)) {
+ ofpbuf_delete(of_msg);
+ ofconn->blocked = NULL;
+ continue;
+ }
+
+ error = ofmp_req_filter(ofconn, &of_msg);
+ if (error) {
+ if (error != OFPROTO_POSTPONE) {
+ ofconn_send_error(ofconn, ofpbuf_data(of_msg), error);
+ ofpbuf_delete(of_msg);
+ ofconn->blocked = NULL;
+ }
+ continue;
+ }
if (mgr->fail_open) {
fail_open_maybe_recover(mgr->fail_open);
}
if (handle_openflow(ofconn, of_msg)) {
+ ofpbuf_list_delete(&of_msg->list_node);
ofpbuf_delete(of_msg);
ofconn->blocked = NULL;
} else {
--
2.0.0.rc2
More information about the dev
mailing list