[ovs-dev] [PATCH 01/11] dpif: Pool dpif_recv() calls.

Jarno Rajahalme jarno.rajahalme at nsn.com
Mon Feb 11 14:46:17 UTC 2013


Take ofproto-dpif upcall recv pooling down to the system call interface.

Signed-off-by: Jarno Rajahalme <jarno.rajahalme at nsn.com>
---
 lib/dpif-linux.c       |  136 +++++++++++++++++++++++++++++++-----------------
 lib/dpif-netdev.c      |   24 ++++-----
 lib/dpif-provider.h    |   28 ++++++----
 lib/dpif.c             |   53 ++++++++++++-------
 lib/dpif.h             |    6 ++-
 lib/netlink-socket.c   |   89 +++++++++++++++++++++++++++++++
 lib/netlink-socket.h   |   11 ++++
 ofproto/ofproto-dpif.c |   71 ++++++++++++-------------
 8 files changed, 289 insertions(+), 129 deletions(-)

diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
index b6eba39..a44e4a4 100644
--- a/lib/dpif-linux.c
+++ b/lib/dpif-linux.c
@@ -1271,76 +1271,118 @@ parse_odp_packet(struct ofpbuf *buf, struct dpif_upcall *upcall,
     return 0;
 }
 
+#define RECVMMSG_MAX_BATCH 50
+
 static int
-dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall,
-                struct ofpbuf *buf)
+dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall upcalls[],
+                struct ofpbuf bufs[], int *n_bufs,
+                void * buf_space, size_t buf_space_size)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
-    int read_tries = 0;
+    int read_errors = 0;
+    int n = 0; /* upcall index */
+    struct iovec iov[RECVMMSG_MAX_BATCH];
+    struct mmsghdr mmsg[RECVMMSG_MAX_BATCH];
+    int n_msgs = MIN(*n_bufs, RECVMMSG_MAX_BATCH);
+    size_t buf_size = (buf_space_size / n_msgs) & ~0x7;
+    int error = EAGAIN;
+
+    memset(&mmsg, 0, sizeof mmsg);
+    /* Split the buf_space among the msgs */
+    for (n = 0; n < n_msgs; ++n) {
+        iov[n].iov_base = (char *)buf_space;
+        iov[n].iov_len = buf_size;
+        buf_space = (char *)buf_space + buf_size; /* Space for next buf */
+        mmsg[n].msg_hdr.msg_iov = &iov[n];
+        mmsg[n].msg_hdr.msg_iovlen = 1;
+    }
+
+    n = 0;
 
     if (dpif->epoll_fd < 0) {
        return EAGAIN;
     }
 
-    if (dpif->event_offset >= dpif->n_events) {
-        int retval;
+    while (n < n_msgs) {
+        /* Check if wrap around */
+        if (dpif->event_offset >= dpif->n_events) {
+            int retval;
 
-        dpif->event_offset = dpif->n_events = 0;
+            dpif->event_offset = dpif->n_events = 0;
 
-        do {
-            retval = epoll_wait(dpif->epoll_fd, dpif->epoll_events,
-                                dpif->uc_array_size, 0);
-        } while (retval < 0 && errno == EINTR);
-        if (retval < 0) {
-            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
-            VLOG_WARN_RL(&rl, "epoll_wait failed (%s)", strerror(errno));
-        } else if (retval > 0) {
+            do {
+                retval = epoll_wait(dpif->epoll_fd, dpif->epoll_events,
+                                    dpif->uc_array_size, 0);
+            } while (retval < 0 && errno == EINTR);
+            if (retval <= 0) {
+                if (retval < 0) {
+                    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+                    VLOG_WARN_RL(&rl, "epoll_wait failed (%s)", strerror(errno));
+                }
+                error = EAGAIN;
+                goto out; /* stop receiving */
+            }
             dpif->n_events = retval;
         }
-    }
 
-    while (dpif->event_offset < dpif->n_events) {
-        int idx = dpif->epoll_events[dpif->event_offset].data.u32;
-        struct dpif_channel *ch = &dpif->channels[idx];
+        while (n < n_msgs && dpif->event_offset < dpif->n_events) {
+            int idx = dpif->epoll_events[dpif->event_offset].data.u32;
+            struct dpif_channel *ch = &dpif->channels[idx];
 
-        dpif->event_offset++;
+            dpif->event_offset++;
 
-        for (;;) {
-            int dp_ifindex;
-            int error;
+            for (;;) {
+                int dp_ifindex;
+                int count = n_msgs - n;
 
-            if (++read_tries > 50) {
-                return EAGAIN;
-            }
+                error = nl_sock_recvm(ch->sock, &mmsg[n], &count, false);
 
-            error = nl_sock_recv(ch->sock, buf, false);
-            if (error == ENOBUFS) {
-                /* ENOBUFS typically means that we've received so many
-                 * packets that the buffer overflowed.  Try again
-                 * immediately because there's almost certainly a packet
-                 * waiting for us. */
-                report_loss(dpif_, ch);
-                continue;
-            }
+                if (error == ENOBUFS) {
+                    /* ENOBUFS typically means that we've received so many
+                     * packets that the buffer overflowed.  Try again
+                     * immediately because there's almost certainly a packet
+                     * waiting for us. */
+                    report_loss(dpif_, ch);
+                    if (++read_errors > 50) {
+                        error = EAGAIN;
+                        goto out;
+                    }
+                    continue;
+                }
 
-            ch->last_poll = time_msec();
-            if (error) {
-                if (error == EAGAIN) {
-                    break;
+                ch->last_poll = time_msec();
+                if (error) {
+                    if (error != EAGAIN && ++read_errors > 50) {
+                        goto out;
+                    }
+                    break; /* Skip this channel */
                 }
-                return error;
-            }
 
-            error = parse_odp_packet(buf, upcall, &dp_ifindex);
-            if (!error && dp_ifindex == dpif->dp_ifindex) {
-                return 0;
-            } else if (error) {
-                return error;
+                count += n; /* One past last index */
+                for (;n < count; ++n) {
+                    /* Init ofpbuf */
+                    ofpbuf_use_stub(&bufs[n], (char *)iov[n].iov_base,
+                                    iov[n].iov_len);
+                    bufs[n].size = mmsg[n].msg_len;
+
+                    error = parse_odp_packet(&bufs[n], &upcalls[n],
+                                             &dp_ifindex);
+                    if (error || dp_ifindex != dpif->dp_ifindex) {
+                        /* Mark upcall invalid */
+                        upcalls[n].type = DPIF_N_UC_TYPES;
+                    }
+                }
+                break;
             }
         }
     }
-
-    return EAGAIN;
+ out:
+    /* See what we got */
+    *n_bufs = n;
+    if (n > 0) {
+        return 0;
+    }
+    return error; /* may be zero */
 }
 
 static void
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index d315b59..3fa76d8 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -978,23 +978,23 @@ find_nonempty_queue(struct dpif *dpif)
 }
 
 static int
-dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
-                 struct ofpbuf *buf)
+dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall upcalls[],
+                 struct ofpbuf bufs[], int *n_bufs,
+                 void * buf_space OVS_UNUSED, size_t buf_space_size OVS_UNUSED)
 {
-    struct dp_netdev_queue *q = find_nonempty_queue(dpif);
-    if (q) {
-        struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
+    int n = 0;
 
-        *upcall = u->upcall;
-        upcall->packet = buf;
+    struct dp_netdev_queue *q;
 
-        ofpbuf_uninit(buf);
-        *buf = u->buf;
+    for (n = 0; n < *n_bufs && (q = find_nonempty_queue(dpif)); ++n) {
+        struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
 
-        return 0;
-    } else {
-        return EAGAIN;
+        upcalls[n] = u->upcall;
+        upcalls[n].packet = &bufs[n];
+        bufs[n] = u->buf;
     }
+    *n_bufs = n;
+    return (n > 0) ? 0 : EAGAIN;
 }
 
 static void
diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
index bea822f..c41827e 100644
--- a/lib/dpif-provider.h
+++ b/lib/dpif-provider.h
@@ -25,6 +25,7 @@
 #include "openflow/openflow.h"
 #include "dpif.h"
 #include "util.h"
+#include "ofpbuf.h"
 
 #ifdef  __cplusplus
 extern "C" {
@@ -326,20 +327,27 @@ struct dpif_class {
     int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id,
                              uint32_t *priority);
 
-    /* Polls for an upcall from 'dpif'.  If successful, stores the upcall into
-     * '*upcall', using 'buf' for storage.  Should only be called if 'recv_set'
-     * has been used to enable receiving packets from 'dpif'.
+    /* Polls for upcalls from 'dpif'.  If successful, stores the upcalls into
+     * 'upcalls[]', initializing 'bufs[]' with 'buf_space' for storage.
+     * Should only be called if 'recv_set' has been used to enable receiving
+     * packets from 'dpif'.
      *
-     * The implementation should point 'upcall->packet' and 'upcall->key' into
-     * data in the caller-provided 'buf'.  If necessary to make room, the
-     * implementation may expand the data in 'buf'.  (This is hardly a great
-     * way to do things but it works out OK for the dpif providers that exist
-     * so far.)
+     * The implementation should initialize the 'bufs[]' needed, preferably
+     * using 'buf_space' of size 'buf_space_size', and point
+     * 'upcalls[].packet' and 'upcalls[].key' into 'data' in the 'bufs[]'.
+     *
+     * Caller gives the size of the 'upcalls' and 'bufs' arrays in '*n_bufs',
+     * which also returns the number of received upcalls.
+     *
+     * If necessary, the implementation can use or allocate additional space
+     * for 'bufs[]'.  It is thus important the caller frees the first '*n_bufs'
+     * (as returned) 'bufs' with ofpbuf_uninit().
      *
      * This function must not block.  If no upcall is pending when it is
      * called, it should return EAGAIN without blocking. */
-    int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall,
-                struct ofpbuf *buf);
+    int (*recv)(struct dpif *dpif, struct dpif_upcall upcalls[],
+                struct ofpbuf bufs[], int *n_bufs,
+                void *buf_space, size_t buf_space_size);
 
     /* Arranges for the poll loop to wake up when 'dpif' has a message queued
      * to be received with the recv member function. */
diff --git a/lib/dpif.c b/lib/dpif.c
index 6aa52d5..fda7924 100644
--- a/lib/dpif.c
+++ b/lib/dpif.c
@@ -1116,37 +1116,50 @@ dpif_recv_set(struct dpif *dpif, bool enable)
     return error;
 }
 
-/* Polls for an upcall from 'dpif'.  If successful, stores the upcall into
- * '*upcall', using 'buf' for storage.  Should only be called if
- * dpif_recv_set() has been used to enable receiving packets on 'dpif'.
+/* Polls for upcalls from 'dpif'.  If successful, stores the upcalls into
+ * 'upcalls[]', using 'bufs[]' for storage preferably within 'buf_space' of
+ * size 'buf_space_size'.  Caller gives the size of the arrays in '*n_bufs',
+ * which also returns the number of received upcalls.
+ * Should only be called if dpif_recv_set() has been used to enable receiving
+ * packets on 'dpif'.
  *
- * 'upcall->packet' and 'upcall->key' point into data in the caller-provided
- * 'buf', so their memory cannot be freed separately from 'buf'.  (This is
- * hardly a great way to do things but it works out OK for the dpif providers
- * and clients that exist so far.)
+ * 'upcalls[].packet' and 'upcalls[].key' point into data in 'bufs[]', so their
+ * memory cannot be freed separately from 'bufs'. The first '*n_bufs' (as
+ * returned) 'bufs' are initialized by the class implementation, and must be
+ * freed with ofpbuf_uninit() by the caller after the 'upcalls' have been
+ * processed.
  *
  * Returns 0 if successful, otherwise a positive errno value.  Returns EAGAIN
  * if no upcall is immediately available. */
 int
-dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf *buf)
+dpif_recv(struct dpif *dpif,
+          struct dpif_upcall upcalls[], struct ofpbuf bufs[], int * n_bufs,
+          void *buf_space, size_t buf_space_size)
 {
-    int error = dpif->dpif_class->recv(dpif, upcall, buf);
+    int error = dpif->dpif_class->recv(dpif, upcalls, bufs, n_bufs,
+                                       buf_space, buf_space_size);
+
     if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
-        struct ds flow;
-        char *packet;
+        int i;
 
-        packet = ofp_packet_to_string(upcall->packet->data,
-                                      upcall->packet->size);
+        for (i = 0; i < *n_bufs && !VLOG_DROP_DBG(&dpmsg_rl); ++i) {
+            struct ds flow;
+            char *packet;
 
-        ds_init(&flow);
-        odp_flow_key_format(upcall->key, upcall->key_len, &flow);
+            packet = ofp_packet_to_string(upcalls[i].packet->data,
+                                          upcalls[i].packet->size);
 
-        VLOG_DBG("%s: %s upcall:\n%s\n%s",
-                 dpif_name(dpif), dpif_upcall_type_to_string(upcall->type),
-                 ds_cstr(&flow), packet);
+            ds_init(&flow);
+            odp_flow_key_format(upcalls[i].key, upcalls[i].key_len, &flow);
 
-        ds_destroy(&flow);
-        free(packet);
+            VLOG_DBG("%s: %s upcall:\n%s\n%s",
+                     dpif_name(dpif),
+                     dpif_upcall_type_to_string(upcalls[i].type),
+                     ds_cstr(&flow), packet);
+
+            ds_destroy(&flow);
+            free(packet);
+        }
     } else if (error && error != EAGAIN) {
         log_operation(dpif, "recv", error);
     }
diff --git a/lib/dpif.h b/lib/dpif.h
index c5e3fc8..14e3c64 100644
--- a/lib/dpif.h
+++ b/lib/dpif.h
@@ -327,6 +327,7 @@
 #include "openflow/openflow.h"
 #include "netdev.h"
 #include "util.h"
+#include "ofpbuf.h"
 
 #ifdef  __cplusplus
 extern "C" {
@@ -336,7 +337,6 @@ struct dpif;
 struct ds;
 struct flow;
 struct nlattr;
-struct ofpbuf;
 struct sset;
 struct dpif_class;
 
@@ -557,7 +557,9 @@ struct dpif_upcall {
 };
 
 int dpif_recv_set(struct dpif *, bool enable);
-int dpif_recv(struct dpif *, struct dpif_upcall *, struct ofpbuf *);
+int dpif_recv(struct dpif *, struct dpif_upcall upcalls[],
+              struct ofpbuf bufs[], int * n_bufs,
+              void * buf_space, size_t buf_space_size);
 void dpif_recv_purge(struct dpif *);
 void dpif_recv_wait(struct dpif *);
 
diff --git a/lib/netlink-socket.c b/lib/netlink-socket.c
index e6b10a1..361cb87 100644
--- a/lib/netlink-socket.c
+++ b/lib/netlink-socket.c
@@ -420,6 +420,95 @@ nl_sock_recv(struct nl_sock *sock, struct ofpbuf *buf, bool wait)
     return nl_sock_recv__(sock, buf, wait);
 }
 
+static int
+nl_sock_recvm__(struct nl_sock *sock, struct mmsghdr mmsg[], int *n_msgs,
+                bool wait)
+{
+    ssize_t retval;
+
+    int n = 0;
+    retval = EAGAIN;
+    for (n = 0; n < *n_msgs; ++n) {
+        do {
+            retval = recvmsg(sock->fd, &mmsg[n].msg_hdr,
+                             wait ? 0 : MSG_DONTWAIT);
+        } while (retval < 0 && errno == EINTR);
+        if (retval <= 0)
+            break;
+        mmsg[n].msg_len = retval;
+    }
+    if (n > 0) {
+        retval = n;
+    }
+
+    if (retval < 0) {
+        int error = errno;
+        if (error == ENOBUFS) {
+            /* Socket receive buffer overflow dropped one or more messages that
+             * the kernel tried to send to us. */
+            COVERAGE_INC(netlink_overflow);
+        }
+        return error;
+    }
+
+    if (retval > 0) {
+        int i;
+        int n_invalid = 0;
+        int last_good = -1;
+
+        for (i = 0; i < retval; ++i) {
+            if (mmsg[i].msg_hdr.msg_flags & MSG_TRUNC) {
+                VLOG_ERR_RL(&rl, "truncated message (longer than %u bytes)",
+                            mmsg[i].msg_len);
+                mmsg[i].msg_len = 0; /* Mark as invalid */
+                ++n_invalid;
+            } else {
+                struct nlmsghdr *nlmsghdr;
+
+                nlmsghdr = mmsg[i].msg_hdr.msg_iov->iov_base;
+
+                if (mmsg[i].msg_len < sizeof *nlmsghdr
+                    || mmsg[i].msg_hdr.msg_iov->iov_len < sizeof *nlmsghdr
+                    || nlmsghdr->nlmsg_len < sizeof *nlmsghdr
+                    || nlmsghdr->nlmsg_len > mmsg[i].msg_len) {
+                    VLOG_ERR_RL(&rl, "received invalid nlmsg (%u bytes < %zu)",
+                                mmsg[i].msg_len, sizeof *nlmsghdr);
+                    mmsg[i].msg_len = 0;
+                    ++n_invalid;
+                }
+            }
+
+            if (mmsg[i].msg_len > 0) {
+                last_good = i;
+                log_nlmsg(__func__, 0, mmsg[i].msg_hdr.msg_iov->iov_base,
+                          mmsg[i].msg_len, sock->protocol);
+            }
+        }
+        if (n_invalid == retval) {
+            /* We  have nothing */
+            return E2BIG;
+        }
+        /* trim trailing zero-length msgs */
+        retval = last_good + 1;
+    }
+
+    *n_msgs = retval;
+
+    COVERAGE_INC(netlink_received);
+
+    return 0;
+}
+
+int
+nl_sock_recvm(struct nl_sock *sock, struct mmsghdr mmsg[], int *n_msgs, bool wait)
+{
+    int error = nl_sock_cow__(sock);
+    if (error) {
+        return error;
+    }
+    return nl_sock_recvm__(sock, mmsg, n_msgs, wait);
+}
+
 static void
 nl_sock_record_errors__(struct nl_transaction **transactions, size_t n,
                         int error)
diff --git a/lib/netlink-socket.h b/lib/netlink-socket.h
index 78dd7b2..9827f08 100644
--- a/lib/netlink-socket.h
+++ b/lib/netlink-socket.h
@@ -36,6 +36,8 @@
 #include <stddef.h>
 #include <stdint.h>
 #include "ofpbuf.h"
+#include <sys/socket.h>
+#include <linux/version.h>
 
 struct nl_sock;
 
@@ -43,6 +45,13 @@ struct nl_sock;
 #error "netlink-socket.h is only for hosts that support Netlink sockets"
 #endif
 
+#if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,32)
+struct mmsghdr {
+    struct msghdr msg_hdr;  /* Message header */
+    unsigned int  msg_len;  /* Number of received bytes for header */
+};
+#endif
+
 /* Netlink sockets. */
 int nl_sock_create(int protocol, struct nl_sock **);
 int nl_sock_clone(const struct nl_sock *, struct nl_sock **);
@@ -55,6 +64,8 @@ int nl_sock_send(struct nl_sock *, const struct ofpbuf *, bool wait);
 int nl_sock_send_seq(struct nl_sock *, const struct ofpbuf *,
                      uint32_t nlmsg_seq, bool wait);
 int nl_sock_recv(struct nl_sock *, struct ofpbuf *, bool wait);
+int nl_sock_recvm(struct nl_sock *sock, struct mmsghdr mmsg[], int *n_msgs,
+                  bool wait);
 int nl_sock_transact(struct nl_sock *, const struct ofpbuf *request,
                      struct ofpbuf **replyp);
 
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index 109e57c..4584987 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -3743,8 +3743,14 @@ exit:
     return error;
 }
 
+enum upcall_type { SFLOW_UPCALL, MISS_UPCALL, BAD_UPCALL };
+
+static enum upcall_type classify_upcall(const struct dpif_upcall *);
+
+static void handle_sflow_upcall(struct dpif_backer *, const struct dpif_upcall *);
+
 static void
-handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
+do_handle_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
                     size_t n_upcalls)
 {
     struct dpif_upcall *upcall;
@@ -3777,6 +3783,19 @@ handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
         uint32_t hash;
         int error;
 
+        switch (classify_upcall(upcall)) {
+        case MISS_UPCALL:
+            /* Handle below below. */
+            break;
+
+        case SFLOW_UPCALL:
+            handle_sflow_upcall(backer, upcall);
+            continue;
+
+        case BAD_UPCALL:
+            continue;
+        }
+
         error = ofproto_receive(backer, upcall->packet, upcall->key,
                                 upcall->key_len, &flow, &miss->key_fitness,
                                 &ofproto, &odp_in_port, &miss->initial_tci);
@@ -3867,7 +3886,7 @@ handle_miss_upcalls(struct dpif_backer *backer, struct dpif_upcall *upcalls,
     hmap_destroy(&todo);
 }
 
-static enum { SFLOW_UPCALL, MISS_UPCALL, BAD_UPCALL }
+static enum upcall_type
 classify_upcall(const struct dpif_upcall *upcall)
 {
     union user_action_cookie cookie;
@@ -3925,53 +3944,29 @@ handle_sflow_upcall(struct dpif_backer *backer,
 static int
 handle_upcalls(struct dpif_backer *backer, unsigned int max_batch)
 {
-    struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
+    struct dpif_upcall upcalls[FLOW_MISS_MAX_BATCH];
     struct ofpbuf miss_bufs[FLOW_MISS_MAX_BATCH];
     uint64_t miss_buf_stubs[FLOW_MISS_MAX_BATCH][4096 / 8];
-    int n_processed;
-    int n_misses;
+    int n_upcalls = max_batch;
     int i;
+    int error;
 
     ovs_assert(max_batch <= FLOW_MISS_MAX_BATCH);
 
-    n_misses = 0;
-    for (n_processed = 0; n_processed < max_batch; n_processed++) {
-        struct dpif_upcall *upcall = &misses[n_misses];
-        struct ofpbuf *buf = &miss_bufs[n_misses];
-        int error;
-
-        ofpbuf_use_stub(buf, miss_buf_stubs[n_misses],
-                        sizeof miss_buf_stubs[n_misses]);
-        error = dpif_recv(backer->dpif, upcall, buf);
-        if (error) {
-            ofpbuf_uninit(buf);
-            break;
-        }
-
-        switch (classify_upcall(upcall)) {
-        case MISS_UPCALL:
-            /* Handle it later. */
-            n_misses++;
-            break;
-
-        case SFLOW_UPCALL:
-            handle_sflow_upcall(backer, upcall);
-            ofpbuf_uninit(buf);
-            break;
-
-        case BAD_UPCALL:
-            ofpbuf_uninit(buf);
-            break;
-        }
+    error = dpif_recv(backer->dpif, upcalls, miss_bufs, &n_upcalls,
+                      miss_buf_stubs, sizeof(miss_buf_stubs));
+    if (error || n_upcalls == 0) {
+        return 0;
     }
 
-    /* Handle deferred MISS_UPCALL processing. */
-    handle_miss_upcalls(backer, misses, n_misses);
-    for (i = 0; i < n_misses; i++) {
+    /* Handle upcalls processing. */
+    do_handle_upcalls(backer, upcalls, n_upcalls);
+
+    for (i = 0; i < n_upcalls; i++) {
         ofpbuf_uninit(&miss_bufs[i]);
     }
 
-    return n_processed;
+    return n_upcalls;
 }
 
 /* Flow expiration. */
-- 
1.7.10.4




More information about the dev mailing list