[ovs-dev] [PATCH 01/11] dpif: Pool dpif_recv() calls.
Jarno Rajahalme
jarno.rajahalme at nsn.com
Mon Feb 11 14:46:17 UTC 2013
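Push ofproto-dpif's pooling of upcall receives down to the system-call interface: dpif_recv() now returns a batch of up to '*n_bufs' upcalls per call, and dpif-linux gathers each batch with the new nl_sock_recvm().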
Signed-off-by: Jarno Rajahalme <jarno.rajahalme at nsn.com>
---
lib/dpif-linux.c | 136 +++++++++++++++++++++++++++++++-----------------
lib/dpif-netdev.c | 24 ++++-----
lib/dpif-provider.h | 28 ++++++----
lib/dpif.c | 53 ++++++++++++-------
lib/dpif.h | 6 ++-
lib/netlink-socket.c | 89 +++++++++++++++++++++++++++++++
lib/netlink-socket.h | 11 ++++
ofproto/ofproto-dpif.c | 71 ++++++++++++-------------
8 files changed, 289 insertions(+), 129 deletions(-)
diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
index b6eba39..a44e4a4 100644
--- a/lib/dpif-linux.c
+++ b/lib/dpif-linux.c
@@ -1271,76 +1271,118 @@ parse_odp_packet(struct ofpbuf *buf, struct dpif_upcall *upcall,
return 0;
}
+#define RECVMMSG_MAX_BATCH 50 /* Max messages per dpif_linux_recv() call; sizes the on-stack 'iov' and 'mmsg' arrays. */
+
static int
-dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall,
- struct ofpbuf *buf)
+dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall upcalls[],
+ struct ofpbuf bufs[], int *n_bufs,
+ void * buf_space, size_t buf_space_size) /* 'buf_space' is carved into one slot per message below. */
{
struct dpif_linux *dpif = dpif_linux_cast(dpif_);
- int read_tries = 0;
+ int read_errors = 0; /* ENOBUFS retries plus hard errors; gives up after 50. */
+ int n = 0; /* Next free slot in 'upcalls'/'bufs'; the final count on return. */
+ struct iovec iov[RECVMMSG_MAX_BATCH];
+ struct mmsghdr mmsg[RECVMMSG_MAX_BATCH];
+ int n_msgs = MIN(*n_bufs, RECVMMSG_MAX_BATCH);
+ size_t buf_size = (buf_space_size / n_msgs) & ~0x7; /* NOTE(review): divides by zero if *n_bufs == 0 -- TODO guard. */
+ int error = EAGAIN;
+
+ memset(&mmsg, 0, sizeof mmsg);
+ /* Split 'buf_space' into 'n_msgs' slots of 'buf_size' bytes (a multiple of 8). */
+ for (n = 0; n < n_msgs; ++n) {
+ iov[n].iov_base = (char *)buf_space;
+ iov[n].iov_len = buf_size;
+ buf_space = (char *)buf_space + buf_size; /* Space for next buf */
+ mmsg[n].msg_hdr.msg_iov = &iov[n];
+ mmsg[n].msg_hdr.msg_iovlen = 1;
+ }
+
+ n = 0; /* NOTE(review): the early EAGAIN return below leaves *n_bufs unchanged. */
if (dpif->epoll_fd < 0) {
return EAGAIN;
}
- if (dpif->event_offset >= dpif->n_events) {
- int retval;
+ while (n < n_msgs) { /* Fill the batch, re-polling epoll whenever events run out. */
+ /* All pending events consumed: poll epoll again (timeout 0, non-blocking). */
+ if (dpif->event_offset >= dpif->n_events) {
+ int retval;
- dpif->event_offset = dpif->n_events = 0;
+ dpif->event_offset = dpif->n_events = 0;
- do {
- retval = epoll_wait(dpif->epoll_fd, dpif->epoll_events,
- dpif->uc_array_size, 0);
- } while (retval < 0 && errno == EINTR);
- if (retval < 0) {
- static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
- VLOG_WARN_RL(&rl, "epoll_wait failed (%s)", strerror(errno));
- } else if (retval > 0) {
+ do {
+ retval = epoll_wait(dpif->epoll_fd, dpif->epoll_events,
+ dpif->uc_array_size, 0);
+ } while (retval < 0 && errno == EINTR);
+ if (retval <= 0) {
+ if (retval < 0) {
+ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+ VLOG_WARN_RL(&rl, "epoll_wait failed (%s)", strerror(errno));
+ }
+ error = EAGAIN;
+ goto out; /* Nothing more is ready; return what we have. */
+ }
dpif->n_events = retval;
}
- }
- while (dpif->event_offset < dpif->n_events) {
- int idx = dpif->epoll_events[dpif->event_offset].data.u32;
- struct dpif_channel *ch = &dpif->channels[idx];
+ while (n < n_msgs && dpif->event_offset < dpif->n_events) {
+ int idx = dpif->epoll_events[dpif->event_offset].data.u32;
+ struct dpif_channel *ch = &dpif->channels[idx];
- dpif->event_offset++;
+ dpif->event_offset++;
- for (;;) {
- int dp_ifindex;
- int error;
+ for (;;) { /* Repeats only after ENOBUFS. */
+ int dp_ifindex;
+ int count = n_msgs - n; /* In: free slots; out: messages received. */
- if (++read_tries > 50) {
- return EAGAIN;
- }
+ error = nl_sock_recvm(ch->sock, &mmsg[n], &count, false);
- error = nl_sock_recv(ch->sock, buf, false);
- if (error == ENOBUFS) {
- /* ENOBUFS typically means that we've received so many
- * packets that the buffer overflowed. Try again
- * immediately because there's almost certainly a packet
- * waiting for us. */
- report_loss(dpif_, ch);
- continue;
- }
+ if (error == ENOBUFS) {
+ /* ENOBUFS typically means that we've received so many
+ * packets that the buffer overflowed. Try again
+ * immediately because there's almost certainly a packet
+ * waiting for us. */
+ report_loss(dpif_, ch);
+ if (++read_errors > 50) {
+ error = EAGAIN;
+ goto out;
+ }
+ continue;
+ }
- ch->last_poll = time_msec();
- if (error) {
- if (error == EAGAIN) {
- break;
+ ch->last_poll = time_msec();
+ if (error) {
+ if (error != EAGAIN && ++read_errors > 50) {
+ goto out;
+ }
+ break; /* Give up on this channel; try the next ready one. */
}
- return error;
- }
- error = parse_odp_packet(buf, upcall, &dp_ifindex);
- if (!error && dp_ifindex == dpif->dp_ifindex) {
- return 0;
- } else if (error) {
- return error;
+ count += n; /* One past last index */
+ for (;n < count; ++n) {
+ /* Wrap this message's slot of 'buf_space' in 'bufs[n]'. */
+ ofpbuf_use_stub(&bufs[n], (char *)iov[n].iov_base,
+ iov[n].iov_len);
+ bufs[n].size = mmsg[n].msg_len; /* 0 if nl_sock_recvm() marked it invalid. */
+
+ error = parse_odp_packet(&bufs[n], &upcalls[n],
+ &dp_ifindex);
+ if (error || dp_ifindex != dpif->dp_ifindex) {
+ /* Keep the slot but mark the upcall invalid; it still counts in *n_bufs. */
+ upcalls[n].type = DPIF_N_UC_TYPES; /* NOTE(review): other fields may be unset, yet dpif_recv()'s debug log formats every upcall -- confirm. */
+ }
+ }
+ break; /* Any further messages on this channel are left for a later pass. */
}
}
}
-
- return EAGAIN;
+ out:
+ /* Report how many upcalls were stored; any stored upcall means success. */
+ *n_bufs = n;
+ if (n > 0) {
+ return 0;
+ }
+ return error; /* EAGAIN or a hard receive error. */
}
static void
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index d315b59..3fa76d8 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -978,23 +978,23 @@ find_nonempty_queue(struct dpif *dpif)
}
static int
-dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
- struct ofpbuf *buf)
+dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall upcalls[],
+ struct ofpbuf bufs[], int *n_bufs,
+ void * buf_space OVS_UNUSED, size_t buf_space_size OVS_UNUSED) /* Queued upcalls already own their buffers. */
{
- struct dp_netdev_queue *q = find_nonempty_queue(dpif);
- if (q) {
- struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
+ int n = 0;
- *upcall = u->upcall;
- upcall->packet = buf;
+ struct dp_netdev_queue *q;
- ofpbuf_uninit(buf);
- *buf = u->buf;
+ for (n = 0; n < *n_bufs && (q = find_nonempty_queue(dpif)); ++n) { /* Dequeue until the batch is full or all queues are empty. */
+ struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
- return 0;
- } else {
- return EAGAIN;
+ upcalls[n] = u->upcall;
+ upcalls[n].packet = &bufs[n];
+ bufs[n] = u->buf; /* The queued buffer now belongs to the caller, who must ofpbuf_uninit() it. */
}
+ *n_bufs = n;
+ return (n > 0) ? 0 : EAGAIN;
}
static void
diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
index bea822f..c41827e 100644
--- a/lib/dpif-provider.h
+++ b/lib/dpif-provider.h
@@ -25,6 +25,7 @@
#include "openflow/openflow.h"
#include "dpif.h"
#include "util.h"
+#include "ofpbuf.h"
#ifdef __cplusplus
extern "C" {
@@ -326,20 +327,27 @@ struct dpif_class {
int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id,
uint32_t *priority);
- /* Polls for an upcall from 'dpif'. If successful, stores the upcall into
- * '*upcall', using 'buf' for storage. Should only be called if 'recv_set'
- * has been used to enable receiving packets from 'dpif'.
+ /* Polls for a batch of upcalls from 'dpif'. If successful, stores them in
+ * 'upcalls[]', using 'bufs[]' (preferably backed by 'buf_space') as storage.
+ * Should only be called if 'recv_set' has been used to enable receiving
+ * packets from 'dpif'.
*
- * The implementation should point 'upcall->packet' and 'upcall->key' into
- * data in the caller-provided 'buf'. If necessary to make room, the
- * implementation may expand the data in 'buf'. (This is hardly a great
- * way to do things but it works out OK for the dpif providers that exist
- * so far.)
+ * The implementation initializes each 'bufs[]' entry it fills, preferably
+ * from 'buf_space' of 'buf_space_size' bytes, and points
+ * 'upcalls[].packet' and 'upcalls[].key' into the data of 'bufs[]'.
+ *
+ * On entry '*n_bufs' is the capacity of 'upcalls[]' and 'bufs[]'; on return
+ * it is the number stored (unusable entries may have type DPIF_N_UC_TYPES).
+ *
+ * If necessary, the implementation can use or allocate additional space
+ * for 'bufs[]'. The caller must therefore free the first '*n_bufs'
+ * (as returned) 'bufs' with ofpbuf_uninit().
*
* This function must not block. If no upcall is pending when it is
* called, it should return EAGAIN without blocking. */
- int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall,
- struct ofpbuf *buf);
+ int (*recv)(struct dpif *dpif, struct dpif_upcall upcalls[],
+ struct ofpbuf bufs[], int *n_bufs,
+ void *buf_space, size_t buf_space_size);
/* Arranges for the poll loop to wake up when 'dpif' has a message queued
* to be received with the recv member function. */
diff --git a/lib/dpif.c b/lib/dpif.c
index 6aa52d5..fda7924 100644
--- a/lib/dpif.c
+++ b/lib/dpif.c
@@ -1116,37 +1116,50 @@ dpif_recv_set(struct dpif *dpif, bool enable)
return error;
}
-/* Polls for an upcall from 'dpif'. If successful, stores the upcall into
- * '*upcall', using 'buf' for storage. Should only be called if
- * dpif_recv_set() has been used to enable receiving packets on 'dpif'.
+/* Polls for upcalls from 'dpif'. If successful, stores the upcalls into
+ * 'upcalls[]', using 'bufs[]' for storage preferably within 'buf_space' of
+ * size 'buf_space_size'. Caller gives the size of the arrays in '*n_bufs',
+ * which also returns the number of received upcalls.
+ * Should only be called if dpif_recv_set() has been used to enable receiving
+ * packets on 'dpif'.
*
- * 'upcall->packet' and 'upcall->key' point into data in the caller-provided
- * 'buf', so their memory cannot be freed separately from 'buf'. (This is
- * hardly a great way to do things but it works out OK for the dpif providers
- * and clients that exist so far.)
+ * 'upcalls[].packet' and 'upcalls[].key' point into data in 'bufs[]', so their
+ * memory cannot be freed separately from 'bufs'. The first '*n_bufs' (as
+ * returned) 'bufs' are initialized by the class implementation, and must be
+ * freed with ofpbuf_uninit() by the caller after the 'upcalls' have been
+ * processed.
*
* Returns 0 if successful, otherwise a positive errno value. Returns EAGAIN
* if no upcall is immediately available. */
int
-dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf *buf)
+dpif_recv(struct dpif *dpif,
+ struct dpif_upcall upcalls[], struct ofpbuf bufs[], int * n_bufs,
+ void *buf_space, size_t buf_space_size)
{
- int error = dpif->dpif_class->recv(dpif, upcall, buf);
+ int error = dpif->dpif_class->recv(dpif, upcalls, bufs, n_bufs,
+ buf_space, buf_space_size);
+
if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
- struct ds flow;
- char *packet;
+ int i;
- packet = ofp_packet_to_string(upcall->packet->data,
- upcall->packet->size);
+ for (i = 0; i < *n_bufs && !VLOG_DROP_DBG(&dpmsg_rl); ++i) {
+ struct ds flow;
+ char *packet;
- ds_init(&flow);
- odp_flow_key_format(upcall->key, upcall->key_len, &flow);
+ packet = ofp_packet_to_string(upcalls[i].packet->data,
+ upcalls[i].packet->size);
- VLOG_DBG("%s: %s upcall:\n%s\n%s",
- dpif_name(dpif), dpif_upcall_type_to_string(upcall->type),
- ds_cstr(&flow), packet);
+ ds_init(&flow);
+ odp_flow_key_format(upcalls[i].key, upcalls[i].key_len, &flow);
- ds_destroy(&flow);
- free(packet);
+ VLOG_DBG("%s: %s upcall:\n%s\n%s",
+ dpif_name(dpif),
+ dpif_upcall_type_to_string(upcalls[i].type),
+ ds_cstr(&flow), packet);
+
+ ds_destroy(&flow);
+ free(packet);
+ }
} else if (error && error != EAGAIN) {
log_operation(dpif, "recv", error);
}
diff --git a/lib/dpif.h b/lib/dpif.h
index c5e3fc8..14e3c64 100644
--- a/lib/dpif.h
+++ b/lib/dpif.h
@@ -327,6 +327,7 @@
#include "openflow/openflow.h"
#include "netdev.h"
#include "util.h"
+#include "ofpbuf.h"
#ifdef __cplusplus
extern "C" {
@@ -336,7 +337,6 @@ struct dpif;
struct ds;
struct flow;
struct nlattr;
-struct ofpbuf;
struct sset;
struct dpif_class;
@@ -557,7 +557,9 @@ struct dpif_upcall {
};
int dpif_recv_set(struct dpif *, bool enable);
-int dpif_recv(struct dpif *, struct dpif_upcall *, struct ofpbuf *);
+int dpif_recv(struct dpif *, struct dpif_upcall upcalls[],
+ struct ofpbuf bufs[], int * n_bufs, /* In: array capacity; out: upcalls stored. */
+ void * buf_space, size_t buf_space_size);
void dpif_recv_purge(struct dpif *);
void dpif_recv_wait(struct dpif *);
diff --git a/lib/netlink-socket.c b/lib/netlink-socket.c
index e6b10a1..361cb87 100644
--- a/lib/netlink-socket.c
+++ b/lib/netlink-socket.c
@@ -420,6 +420,95 @@ nl_sock_recv(struct nl_sock *sock, struct ofpbuf *buf, bool wait)
return nl_sock_recv__(sock, buf, wait);
}
+static int
+nl_sock_recvm__(struct nl_sock *sock, struct mmsghdr mmsg[], int *n_msgs,
+ bool wait) /* Receives up to '*n_msgs' messages, one recvmsg() call each. */
+{
+ ssize_t retval;
+
+ int n = 0;
+ retval = EAGAIN; /* NOTE(review): survives if *n_msgs == 0, and the check loop below then reads mmsg[0..EAGAIN-1] -- TODO guard. */
+ for (n = 0; n < *n_msgs; ++n) {
+ do {
+ retval = recvmsg(sock->fd, &mmsg[n].msg_hdr,
+ wait ? 0 : MSG_DONTWAIT); /* NOTE(review): with 'wait', every recvmsg() may block, not only the first. */
+ } while (retval < 0 && errno == EINTR);
+ if (retval <= 0) /* An error (errno set) or an empty read ends the batch. */
+ break;
+ mmsg[n].msg_len = retval;
+ }
+ if (n > 0) {
+ retval = n; /* A recvmsg() error after the first message is discarded. */
+ }
+
+ if (retval < 0) {
+ int error = errno;
+ if (error == ENOBUFS) {
+ /* Socket receive buffer overflow dropped one or more messages that
+ * the kernel tried to send to us. */
+ COVERAGE_INC(netlink_overflow);
+ }
+ return error;
+ }
+
+ if (retval > 0) {
+ int i;
+ int n_invalid = 0;
+ int last_good = -1;
+
+ for (i = 0; i < retval; ++i) {
+ if (mmsg[i].msg_hdr.msg_flags & MSG_TRUNC) {
+ VLOG_ERR_RL(&rl, "truncated message (longer than %u bytes)",
+ mmsg[i].msg_len);
+ mmsg[i].msg_len = 0; /* Mark as invalid */
+ ++n_invalid;
+ } else {
+ struct nlmsghdr *nlmsghdr;
+
+ nlmsghdr = mmsg[i].msg_hdr.msg_iov->iov_base;
+
+ if (mmsg[i].msg_len < sizeof *nlmsghdr /* Only the first nlmsghdr in the datagram is checked. */
+ || mmsg[i].msg_hdr.msg_iov->iov_len < sizeof *nlmsghdr
+ || nlmsghdr->nlmsg_len < sizeof *nlmsghdr
+ || nlmsghdr->nlmsg_len > mmsg[i].msg_len) {
+ VLOG_ERR_RL(&rl, "received invalid nlmsg (%u bytes < %zu)",
+ mmsg[i].msg_len, sizeof *nlmsghdr);
+ mmsg[i].msg_len = 0; /* Mark as invalid */
+ ++n_invalid;
+ }
+ }
+
+ if (mmsg[i].msg_len > 0) {
+ last_good = i;
+ log_nlmsg(__func__, 0, mmsg[i].msg_hdr.msg_iov->iov_base,
+ mmsg[i].msg_len, sock->protocol);
+ }
+ }
+ if (n_invalid == retval) {
+ /* Every message was invalid: report failure, not an empty batch. */
+ return E2BIG;
+ }
+ /* Trim trailing invalid messages; invalid ones before 'last_good' stay, with msg_len 0. */
+ retval = last_good + 1;
+ }
+
+ *n_msgs = retval;
+
+ COVERAGE_INC(netlink_received); /* Once per batch, not per message. */
+
+ return 0;
+}
+
+int
+nl_sock_recvm(struct nl_sock *sock, struct mmsghdr mmsg[], int *n_msgs, bool wait)
+{
+ int error = nl_sock_cow__(sock); /* NOTE(review): presumably the same setup step nl_sock_recv() performs -- confirm. */
+ if (error) {
+ return error;
+ }
+ return nl_sock_recvm__(sock, mmsg, n_msgs, wait); /* On success '*n_msgs' is the number of messages received. */
+}
+
static void
nl_sock_record_errors__(struct nl_transaction **transactions, size_t n,
int error)
diff --git a/lib/netlink-socket.h b/lib/netlink-socket.h
index 78dd7b2..9827f08 100644
--- a/lib/netlink-socket.h
+++ b/lib/netlink-socket.h
@@ -36,6 +36,8 @@
#include <stddef.h>
#include <stdint.h>
#include "ofpbuf.h"
+#include <sys/socket.h>
+#include <linux/version.h>
struct nl_sock;
@@ -43,6 +45,13 @@ struct nl_sock;
#error "netlink-socket.h is only for hosts that support Netlink sockets"
#endif
+#if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,32) /* NOTE(review): userspace struct mmsghdr comes from libc, and recvmmsg(2) documents Linux 2.6.33 -- check this guard. */
+struct mmsghdr { /* Used only as a container: nl_sock_recvm__() calls recvmsg() per entry. */
+ struct msghdr msg_hdr; /* Message header */
+ unsigned int msg_len; /* Number of bytes received for this message */
+};
+#endif
+
/* Netlink sockets. */
int nl_sock_create(int protocol, struct nl_sock **);
int nl_sock_clone(const struct nl_sock *, struct nl_sock **);
@@ -55,6 +64,8 @@ int nl_sock_send(struct nl_sock *, const struct ofpbuf *, bool wait);
int nl_sock_send_seq(struct nl_sock *, const struct ofpbuf *,
uint32_t nlmsg_seq, bool wait);
int nl_sock_recv(struct nl_sock *, struct ofpbuf *, bool wait);
+int nl_sock_recvm(struct nl_sock *sock, struct mmsghdr mmsg[], int *n_msgs,
+ bool wait); /* '*n_msgs': in, capacity of 'mmsg'; out, messages received. */
int nl_sock_transact(struct nl_sock *, const struct ofpbuf *request,
struct ofpbuf **replyp);
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index 109e57c..4584987 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -3743,8 +3743,14 @@ exit:
return error;
}
+enum upcall_type { SFLOW_UPCALL, MISS_UPCALL, BAD_UPCALL }; /* Named so classify_upcall() can be declared ahead of its use. */
+
+static enum upcall_type classify_upcall(const struct dpif_upcall *); /* Defined further down; used by do_handle_upcalls(). */
+
+static void handle_sflow_upcall(struct dpif_backer *, const struct dpif_upcall *);
+
static void
-handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
+do_handle_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
size_t n_upcalls)
{
struct dpif_upcall *upcall;
@@ -3777,6 +3783,19 @@ handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
uint32_t hash;
int error;
+ switch (classify_upcall(upcall)) {
+ case MISS_UPCALL:
+ /* Handle below below. */
+ break;
+
+ case SFLOW_UPCALL:
+ handle_sflow_upcall(backer, upcall);
+ continue;
+
+ case BAD_UPCALL:
+ continue;
+ }
+
error = ofproto_receive(backer, upcall->packet, upcall->key,
upcall->key_len, &flow, &miss->key_fitness,
&ofproto, &odp_in_port, &miss->initial_tci);
@@ -3867,7 +3886,7 @@ handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
hmap_destroy(&todo);
}
-static enum { SFLOW_UPCALL, MISS_UPCALL, BAD_UPCALL }
+static enum upcall_type
classify_upcall(const struct dpif_upcall *upcall)
{
union user_action_cookie cookie;
@@ -3925,53 +3944,29 @@ handle_sflow_upcall(struct dpif_backer *backer,
static int
handle_upcalls(struct dpif_backer *backer, unsigned int max_batch)
{
- struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
+ struct dpif_upcall upcalls[FLOW_MISS_MAX_BATCH];
struct ofpbuf miss_bufs[FLOW_MISS_MAX_BATCH];
uint64_t miss_buf_stubs[FLOW_MISS_MAX_BATCH][4096 / 8];
- int n_processed;
- int n_misses;
+ int n_upcalls = max_batch; /* In: batch size (must be > 0 for dpif-linux); out: upcalls stored. */
int i;
+ int error;
ovs_assert(max_batch <= FLOW_MISS_MAX_BATCH);
- n_misses = 0;
- for (n_processed = 0; n_processed < max_batch; n_processed++) {
- struct dpif_upcall *upcall = &misses[n_misses];
- struct ofpbuf *buf = &miss_bufs[n_misses];
- int error;
-
- ofpbuf_use_stub(buf, miss_buf_stubs[n_misses],
- sizeof miss_buf_stubs[n_misses]);
- error = dpif_recv(backer->dpif, upcall, buf);
- if (error) {
- ofpbuf_uninit(buf);
- break;
- }
-
- switch (classify_upcall(upcall)) {
- case MISS_UPCALL:
- /* Handle it later. */
- n_misses++;
- break;
-
- case SFLOW_UPCALL:
- handle_sflow_upcall(backer, upcall);
- ofpbuf_uninit(buf);
- break;
-
- case BAD_UPCALL:
- ofpbuf_uninit(buf);
- break;
- }
+ error = dpif_recv(backer->dpif, upcalls, miss_bufs, &n_upcalls,
+ miss_buf_stubs, sizeof(miss_buf_stubs)); /* The whole stub array; the dpif may split it across the batch. */
+ if (error || n_upcalls == 0) { /* NOTE(review): non-EAGAIN errors are only logged, by dpif_recv(). */
+ return 0;
}
- /* Handle deferred MISS_UPCALL processing. */
- handle_miss_upcalls(backer, misses, n_misses);
- for (i = 0; i < n_misses; i++) {
+ /* Classify and handle every received upcall (miss, sFlow, or bad). */
+ do_handle_upcalls(backer, upcalls, n_upcalls);
+
+ for (i = 0; i < n_upcalls; i++) { /* Release every buffer the dpif initialized. */
ofpbuf_uninit(&miss_bufs[i]);
}
- return n_processed;
+ return n_upcalls; /* Counts sFlow and bad upcalls too, as n_processed did. */
}
/* Flow expiration. */
--
1.7.10.4
More information about the dev
mailing list