<div dir="ltr">sorry used the wrong --in-reply-to id, will send again, please ignore this one.</div><div class="gmail_extra"><br><br><div class="gmail_quote">On Tue, Feb 18, 2014 at 10:46 AM, Alex Wang <span dir="ltr"><<a href="mailto:alexw@nicira.com" target="_blank">alexw@nicira.com</a>></span> wrote:<br>
<blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex"><div class="">This commit removes the 'dispatcher' thread by allowing 'handler'<br>
threads to read upcalls directly from dpif. vport in dpif will<br>
open netlink sockets for each handler and will use the 5-tuple<br>
hash from the missed packet to choose which socket (handler) to<br>
send the upcall.<br>
<br>
This patch also significantly simplifies the flow miss handling<br>
code and brings slight improvement to flow setup rate.<br>
<br>
Signed-off-by: Alex Wang <<a href="mailto:alexw@nicira.com">alexw@nicira.com</a>><br>
<br>
---<br>
v3->v4:<br>
- remove the incorrect call to rcu_read_lock() in datapath/vport.c.<br>
</div>- fix unused variables, acquire rdlock instead of rwlock for read-only<br>
<div class=""> critical section.<br>
- issues to be discussed:<br>
1. Should we pass in the "struct flow *" and allow dpif to decide<br>
what to hash ?<br>
2. Should we add a new API for setting the n_handlers ? currently,<br>
recv_set() do both upcall enable/disable and set n_handlers.<br>
<br>
v2->v3:<br>
- removes the rcu read-side critical section in ovs_vport_set_upcall_pids().<br>
<br>
PATCH->v2:<br>
- change attribute name back to OVS_VPORT_ATTR_UPCALL_PID for consistency.<br>
<br>
RFC->PATCH<br>
- use XOR to calculate the 5-tuple hash. this fixes the performance<br>
variation issue.<br>
- replace the malloc of 'struct upcall *' in udpif_upcall_handler()<br>
by local 'struct upcall' array.<br>
---<br>
datapath/datapath.c | 22 +-<br>
</div> datapath/vport.c | 127 +++++++++++-<br>
<div class=""> datapath/vport.h | 25 ++-<br>
include/linux/openvswitch.h | 9 +-<br>
lib/dpif-linux.c | 454 +++++++++++++++++++++++------------------<br>
lib/dpif-linux.h | 3 +-<br>
lib/dpif-netdev.c | 9 +-<br>
lib/dpif-provider.h | 26 ++-<br>
lib/dpif.c | 38 ++--<br>
lib/dpif.h | 10 +-<br>
lib/flow.c | 18 ++<br>
lib/flow.h | 3 +-<br>
ofproto/ofproto-dpif-upcall.c | 265 ++++++------------------<br>
ofproto/ofproto-dpif-xlate.c | 5 +-<br>
ofproto/ofproto-dpif.c | 8 +-<br>
</div> 15 files changed, 556 insertions(+), 466 deletions(-)<br>
<br>
diff --git a/datapath/datapath.c b/datapath/datapath.c<br>
index f7c3391..e972c5e 100644<br>
<div class="">--- a/datapath/datapath.c<br>
+++ b/datapath/datapath.c<br>
@@ -242,7 +242,7 @@ void ovs_dp_process_received_packet(struct vport *p, struct sk_buff *skb)<br>
upcall.cmd = OVS_PACKET_CMD_MISS;<br>
upcall.key = &key;<br>
upcall.userdata = NULL;<br>
- upcall.portid = p->upcall_portid;<br>
+ upcall.portid = ovs_vport_find_pid(p, &key);<br>
ovs_dp_upcall(dp, skb, &upcall);<br>
consume_skb(skb);<br>
stats_counter = &stats->n_missed;<br>
</div>@@ -1241,7 +1241,7 @@ static int ovs_dp_cmd_new(struct sk_buff *skb, struct genl_info *info)<br>
<div class=""> parms.options = NULL;<br>
parms.dp = dp;<br>
parms.port_no = OVSP_LOCAL;<br>
- parms.upcall_portid = nla_get_u32(a[OVS_DP_ATTR_UPCALL_PID]);<br>
+ parms.upcall_pids = a[OVS_DP_ATTR_UPCALL_PID];<br>
<br>
ovs_dp_change(dp, a);<br>
<br>
</div>@@ -1459,7 +1459,7 @@ static const struct nla_policy vport_policy[OVS_VPORT_ATTR_MAX + 1] = {<br>
<div class=""> [OVS_VPORT_ATTR_STATS] = { .len = sizeof(struct ovs_vport_stats) },<br>
[OVS_VPORT_ATTR_PORT_NO] = { .type = NLA_U32 },<br>
[OVS_VPORT_ATTR_TYPE] = { .type = NLA_U32 },<br>
- [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NLA_U32 },<br>
+ [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NLA_UNSPEC },<br>
[OVS_VPORT_ATTR_OPTIONS] = { .type = NLA_NESTED },<br>
};<br>
<br>
</div>@@ -1494,8 +1494,7 @@ static int ovs_vport_cmd_fill_info(struct vport *vport, struct sk_buff *skb,<br>
<div class=""><br>
if (nla_put_u32(skb, OVS_VPORT_ATTR_PORT_NO, vport->port_no) ||<br>
nla_put_u32(skb, OVS_VPORT_ATTR_TYPE, vport->ops->type) ||<br>
- nla_put_string(skb, OVS_VPORT_ATTR_NAME, vport->ops->get_name(vport)) ||<br>
- nla_put_u32(skb, OVS_VPORT_ATTR_UPCALL_PID, vport->upcall_portid))<br>
+ nla_put_string(skb, OVS_VPORT_ATTR_NAME, vport->ops->get_name(vport)))<br>
goto nla_put_failure;<br>
<br>
ovs_vport_get_stats(vport, &vport_stats);<br>
</div>@@ -1503,6 +1502,9 @@ static int ovs_vport_cmd_fill_info(struct vport *vport, struct sk_buff *skb,<br>
<div class=""> &vport_stats))<br>
goto nla_put_failure;<br>
<br>
+ if (ovs_vport_get_upcall_pids(vport, skb))<br>
+ goto nla_put_failure;<br>
+<br>
err = ovs_vport_get_options(vport, skb);<br>
if (err == -EMSGSIZE)<br>
goto error;<br>
</div>@@ -1579,8 +1581,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb, struct genl_info *info)<br>
<div class=""> int err;<br>
<br>
err = -EINVAL;<br>
- if (!a[OVS_VPORT_ATTR_NAME] || !a[OVS_VPORT_ATTR_TYPE] ||<br>
- !a[OVS_VPORT_ATTR_UPCALL_PID])<br>
+ if (!a[OVS_VPORT_ATTR_NAME] || !a[OVS_VPORT_ATTR_TYPE])<br>
goto exit;<br>
<br>
ovs_lock();<br>
</div>@@ -1617,7 +1618,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb, struct genl_info *info)<br>
<div class=""> parms.options = a[OVS_VPORT_ATTR_OPTIONS];<br>
parms.dp = dp;<br>
parms.port_no = port_no;<br>
- parms.upcall_portid = nla_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);<br>
+ parms.upcall_pids = a[OVS_VPORT_ATTR_UPCALL_PID];<br>
<br>
vport = new_vport(&parms);<br>
err = PTR_ERR(vport);<br>
</div>@@ -1678,8 +1679,9 @@ static int ovs_vport_cmd_set(struct sk_buff *skb, struct genl_info *info)<br>
<div class=""> if (a[OVS_VPORT_ATTR_STATS])<br>
ovs_vport_set_stats(vport, nla_data(a[OVS_VPORT_ATTR_STATS]));<br>
<br>
- if (a[OVS_VPORT_ATTR_UPCALL_PID])<br>
- vport->upcall_portid = nla_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);<br>
+ err = ovs_vport_set_upcall_pids(vport, a[OVS_VPORT_ATTR_UPCALL_PID]);<br>
+ if (err)<br>
+ goto exit_free;<br>
<br>
err = ovs_vport_cmd_fill_info(vport, reply, info->snd_portid,<br>
info->snd_seq, 0, OVS_VPORT_CMD_NEW);<br>
diff --git a/datapath/vport.c b/datapath/vport.c<br>
</div>index 7f12acc..86e04b5 100644<br>
<div class="">--- a/datapath/vport.c<br>
+++ b/datapath/vport.c<br>
@@ -135,10 +135,12 @@ struct vport *ovs_vport_alloc(int priv_size, const struct vport_ops *ops,<br>
<br>
vport->dp = parms->dp;<br>
vport->port_no = parms->port_no;<br>
- vport->upcall_portid = parms->upcall_portid;<br>
vport->ops = ops;<br>
INIT_HLIST_NODE(&vport->dp_hash_node);<br>
<br>
+ if (ovs_vport_set_upcall_pids(vport, parms->upcall_pids))<br>
+ return ERR_PTR(-EINVAL);<br>
+<br>
vport->percpu_stats = alloc_percpu(struct pcpu_tstats);<br>
if (!vport->percpu_stats) {<br>
kfree(vport);<br>
@@ -162,6 +164,7 @@ struct vport *ovs_vport_alloc(int priv_size, const struct vport_ops *ops,<br>
*/<br>
void ovs_vport_free(struct vport *vport)<br>
{<br>
+ ovs_vport_set_upcall_pids(vport, NULL);<br>
free_percpu(vport->percpu_stats);<br>
kfree(vport);<br>
}<br>
</div>@@ -348,6 +351,128 @@ int ovs_vport_get_options(const struct vport *vport, struct sk_buff *skb)<br>
<div><div class="h5"> return 0;<br>
}<br>
<br>
+static void __vport_pids_destroy(struct vport_pids *pids)<br>
+{<br>
+ if (pids->pids)<br>
+ kfree(pids->pids);<br>
+<br>
+ kfree(pids);<br>
+}<br>
+<br>
+static void vport_pids_destroy_rcu_cb(struct rcu_head *rcu)<br>
+{<br>
+ struct vport_pids *pids = container_of(rcu, struct vport_pids, rcu);<br>
+<br>
+ __vport_pids_destroy(pids);<br>
+}<br>
+<br>
+/**<br>
+ * ovs_vport_set_upcall_pids - set upcall pids for sending upcall.<br>
+ *<br>
+ * @vport: vport to modify.<br>
+ * @pids: new configuration.<br>
+ *<br>
+ * If the pids is non-null, sets the vport's upcall_pids pointer. If the<br>
+ * pids is null, frees the vport's upcall_pids.<br>
+ *<br>
+ * Returns 0 if successful, -EINVAL if @pids cannot be parsed as an array<br>
+ * of U32.<br>
+ *<br>
+ * Must be called with ovs_mutex.<br>
+ */<br>
+int ovs_vport_set_upcall_pids(struct vport *vport, struct nlattr *pids)<br>
+{<br>
+ struct vport_pids *old;<br>
+<br>
+ if (pids && nla_len(pids) % sizeof(u32))<br>
+ return -EINVAL;<br>
+<br>
</div></div>+ old = ovsl_dereference(vport->upcall_pids);<br>
<div><div class="h5">+<br>
+ if (pids) {<br>
+ struct vport_pids *vport_pids;<br>
+<br>
+ vport_pids = kmalloc(sizeof *vport_pids, GFP_KERNEL);<br>
+ vport_pids->pids = kmalloc(nla_len(pids), GFP_KERNEL);<br>
+ vport_pids->n_pids = nla_len(pids)<br>
+ / (sizeof *vport_pids->pids);<br>
+ memcpy(vport_pids->pids, nla_data(pids), nla_len(pids));<br>
+<br>
+ rcu_assign_pointer(vport->upcall_pids, vport_pids);<br>
+ } else if (old) {<br>
+ rcu_assign_pointer(vport->upcall_pids, NULL);<br>
+ }<br>
+<br>
+ if (old)<br>
+ call_rcu(&old->rcu, vport_pids_destroy_rcu_cb);<br>
+<br>
+ return 0;<br>
+}<br>
+<br>
+/**<br>
+ * ovs_vport_get_options - get the upcall_pids value.<br>
+ *<br>
+ * @vport: vport from which to retrieve the pids.<br>
+ * @skb: sk_buff where pids should be appended.<br>
+ *<br>
+ * Retrieves the configuration of the given vport, appending the<br>
+ * %OVS_VPORT_ATTR_UPCALL_PID attribute which is the array of upcall<br>
+ * pids to @skb.<br>
+ *<br>
+ * Returns 0 if successful, -EMSGSIZE if @skb has insufficient room.<br>
+ * If an error occurs, @skb is left unmodified.<br>
+ */<br>
+int ovs_vport_get_upcall_pids(const struct vport *vport, struct sk_buff *skb)<br>
+{<br>
+ struct vport_pids *pids;<br>
+ int err = 0;<br>
+<br>
+ rcu_read_lock();<br>
+ pids = ovsl_dereference(vport->upcall_pids);<br>
+<br>
+ if (!pids)<br>
+ goto exit;<br>
+<br>
+ if (nla_put(skb, OVS_VPORT_ATTR_UPCALL_PID,<br>
+ pids->n_pids * sizeof *pids->pids,<br>
+ (void *) pids->pids)) {<br>
+ err = -EMSGSIZE;<br>
+ goto exit;<br>
+ }<br>
+<br>
+exit:<br>
+ rcu_read_unlock();<br>
+ return err;<br>
+}<br>
+<br>
+/**<br>
+ * ovs_vport_find_pid - find the upcall pid to send upcall.<br>
+ *<br>
+ * @vport: vport from which the missed packet is received.<br>
+ * @key: flow keys.<br>
+ *<br>
+ * Calculates the 5-tuple hash from the flow key and finds the upcall pid to<br>
+ * send the upcall to.<br>
+ *<br>
+ * Returns the pid of the target socket. Must be called with rcu_read_lock.<br>
+ */<br>
+u32 ovs_vport_find_pid(const struct vport *p, const struct sw_flow_key *key)<br>
+{<br>
+ struct vport_pids *pids;<br>
+ u32 hash;<br>
+<br>
+ pids = ovsl_dereference(p->upcall_pids);<br>
+<br>
+ if (!pids)<br>
+ return 0;<br>
+<br>
+ hash = key->ipv4.addr.src ^ key->ipv4.addr.dst<br>
+ ^ key->ip.proto ^ key->ipv4.tp.src<br>
+ ^ key->ipv4.tp.dst;<br>
+<br>
+ return pids->pids[jhash((void *) &hash, 4, 0) % pids->n_pids];<br>
+}<br>
+<br>
/**<br>
* ovs_vport_receive - pass up received packet to the datapath for processing<br>
*<br>
diff --git a/datapath/vport.h b/datapath/vport.h<br>
index 18b723e..f11faa9 100644<br>
--- a/datapath/vport.h<br>
+++ b/datapath/vport.h<br>
@@ -50,6 +50,11 @@ void ovs_vport_get_stats(struct vport *, struct ovs_vport_stats *);<br>
int ovs_vport_set_options(struct vport *, struct nlattr *options);<br>
int ovs_vport_get_options(const struct vport *, struct sk_buff *);<br>
<br>
+int ovs_vport_set_upcall_pids(struct vport *, struct nlattr *pids);<br>
+int ovs_vport_get_upcall_pids(const struct vport *, struct sk_buff *);<br>
+<br>
+u32 ovs_vport_find_pid(const struct vport *, const struct sw_flow_key *);<br>
+<br>
int ovs_vport_send(struct vport *, struct sk_buff *);<br>
<br>
/* The following definitions are for implementers of vport devices: */<br>
@@ -60,13 +65,25 @@ struct vport_err_stats {<br>
u64 tx_dropped;<br>
u64 tx_errors;<br>
};<br>
+/**<br>
+ * struct vport_pids - array of netlink pids for a vport.<br>
+ * must be protected by rcu.<br>
+ * @rcu: RCU callback head for deferred destruction.<br>
+ * @n_pids: Size of @upcall_pids array.<br>
+ * @pids: Array storing the Netlink socket pids to use for packets received<br>
+ * on this port that miss the flow table.<br>
+ */<br>
+struct vport_pids {<br>
+ struct rcu_head rcu;<br>
+ u32 n_pids;<br>
+ u32 *pids;<br>
+};<br>
<br>
/**<br>
* struct vport - one port within a datapath<br>
* @rcu: RCU callback head for deferred destruction.<br>
* @dp: Datapath to which this port belongs.<br>
- * @upcall_portid: The Netlink port to use for packets received on this port that<br>
- * miss the flow table.<br>
+ * @upcall_pids: RCU protected vport_pids array.<br>
* @port_no: Index into @dp's @ports array.<br>
* @hash_node: Element in @dev_table hash table in vport.c.<br>
* @dp_hash_node: Element in @datapath->ports hash table in datapath.c.<br>
@@ -80,7 +97,7 @@ struct vport_err_stats {<br>
struct vport {<br>
struct rcu_head rcu;<br>
struct datapath *dp;<br>
- u32 upcall_portid;<br>
+ struct vport_pids __rcu *upcall_pids;<br>
u16 port_no;<br>
<br>
struct hlist_node hash_node;<br>
@@ -112,7 +129,7 @@ struct vport_parms {<br>
/* For ovs_vport_alloc(). */<br>
struct datapath *dp;<br>
u16 port_no;<br>
- u32 upcall_portid;<br>
+ struct nlattr *upcall_pids;<br>
};<br>
<br>
/**<br>
diff --git a/include/linux/openvswitch.h b/include/linux/openvswitch.h<br>
index d1ff5ec..9d21326 100644<br>
--- a/include/linux/openvswitch.h<br>
+++ b/include/linux/openvswitch.h<br>
@@ -225,9 +225,9 @@ enum ovs_vport_type {<br>
* this is the name of the network device. Maximum length %IFNAMSIZ-1 bytes<br>
* plus a null terminator.<br>
* @OVS_VPORT_ATTR_OPTIONS: Vport-specific configuration information.<br>
- * @OVS_VPORT_ATTR_UPCALL_PID: The Netlink socket in userspace that<br>
- * OVS_PACKET_CMD_MISS upcalls will be directed to for packets received on<br>
- * this port. A value of zero indicates that upcalls should not be sent.<br>
+ * @OVS_VPORT_ATTR_UPCALL_PID: The array of Netlink socket pids in userspace<br>
+ * that OVS_PACKET_CMD_MISS upcalls will be directed to for packets received on<br>
+ * this port. If this is not specified, upcalls should not be sent.<br>
* @OVS_VPORT_ATTR_STATS: A &struct ovs_vport_stats giving statistics for<br>
* packets sent or received through the vport.<br>
*<br>
@@ -251,7 +251,8 @@ enum ovs_vport_attr {<br>
OVS_VPORT_ATTR_TYPE, /* u32 OVS_VPORT_TYPE_* constant. */<br>
OVS_VPORT_ATTR_NAME, /* string name, up to IFNAMSIZ bytes long */<br>
OVS_VPORT_ATTR_OPTIONS, /* nested attributes, varies by vport type */<br>
- OVS_VPORT_ATTR_UPCALL_PID, /* u32 Netlink PID to receive upcalls */<br>
+ OVS_VPORT_ATTR_UPCALL_PID, /* array of u32 Netlink socket PIDs for */<br>
+ /* receiving upcalls */<br>
OVS_VPORT_ATTR_STATS, /* struct ovs_vport_stats */<br>
__OVS_VPORT_ATTR_MAX<br>
};<br>
diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c<br>
index f7f5292..4d35e15 100644<br>
--- a/lib/dpif-linux.c<br>
+++ b/lib/dpif-linux.c<br>
@@ -35,6 +35,7 @@<br>
#include "bitmap.h"<br>
#include "dpif-provider.h"<br>
#include "dynamic-string.h"<br>
+#include "fat-rwlock.h"<br>
#include "flow.h"<br>
#include "netdev.h"<br>
#include "netdev-linux.h"<br>
@@ -132,7 +133,15 @@ struct dpif_channel {<br>
long long int last_poll; /* Last time this channel was polled. */<br>
};<br>
<br>
-static void report_loss(struct dpif *, struct dpif_channel *);<br>
+static void report_loss(struct dpif *, struct dpif_channel *, uint32_t ch_idx,<br>
+ uint32_t handler_id);<br>
+<br>
+struct dpif_epoll {<br>
+ struct epoll_event *epoll_events;<br>
+ int epoll_fd; /* epoll fd that includes channel socks. */<br>
+ int n_events; /* Num events returned by epoll_wait(). */<br>
+ int event_offset; /* Offset into 'epoll_events'. */<br>
+};<br>
<br>
/* Datapath interface for the openvswitch Linux kernel module. */<br>
struct dpif_linux {<br>
@@ -140,13 +149,11 @@ struct dpif_linux {<br>
int dp_ifindex;<br>
<br>
/* Upcall messages. */<br>
- struct ovs_mutex upcall_lock;<br>
+ struct fat_rwlock upcall_lock;<br>
int uc_array_size; /* Size of 'channels' and 'epoll_events'. */<br>
- struct dpif_channel *channels;<br>
- struct epoll_event *epoll_events;<br>
- int epoll_fd; /* epoll fd that includes channel socks. */<br>
- int n_events; /* Num events returned by epoll_wait(). */<br>
- int event_offset; /* Offset into 'epoll_events'. */<br>
+ struct dpif_epoll *epolls;<br>
+ struct dpif_channel **channels;/* Array of channel arrays for each vport. */<br>
+ uint32_t n_handlers; /* Num of upcall receivers (handlers). */<br>
<br>
/* Change notification. */<br>
struct nl_sock *port_notifier; /* vport multicast group subscriber. */<br>
@@ -171,8 +178,8 @@ static unsigned int ovs_vport_mcgroup;<br>
static int dpif_linux_init(void);<br>
static int open_dpif(const struct dpif_linux_dp *, struct dpif **);<br>
static uint32_t dpif_linux_port_get_pid(const struct dpif *,<br>
- odp_port_t port_no);<br>
-static int dpif_linux_refresh_channels(struct dpif *);<br>
+ odp_port_t port_no, uint32_t hash);<br>
+static int dpif_linux_refresh_channels(struct dpif *, uint32_t n_handlers);<br>
<br>
static void dpif_linux_vport_to_ofpbuf(const struct dpif_linux_vport *,<br>
struct ofpbuf *);<br>
@@ -252,8 +259,7 @@ open_dpif(const struct dpif_linux_dp *dp, struct dpif **dpifp)<br>
<br>
dpif = xzalloc(sizeof *dpif);<br>
dpif->port_notifier = NULL;<br>
- ovs_mutex_init(&dpif->upcall_lock);<br>
- dpif->epoll_fd = -1;<br>
+ fat_rwlock_init(&dpif->upcall_lock);<br>
<br>
dpif_init(&dpif->dpif, &dpif_linux_class, dp->name,<br>
dp->dp_ifindex, dp->dp_ifindex);<br>
@@ -265,63 +271,42 @@ open_dpif(const struct dpif_linux_dp *dp, struct dpif **dpifp)<br>
}<br>
<br>
static void<br>
-destroy_channels(struct dpif_linux *dpif)<br>
+channels_to_pids(struct dpif_channel *ch, uint32_t len, uint32_t **upcall_pids)<br>
{<br>
- unsigned int i;<br>
+ size_t i;<br>
+ uint32_t *pids;<br>
<br>
- if (dpif->epoll_fd < 0) {<br>
+ if (!upcall_pids) {<br>
return;<br>
}<br>
<br>
- for (i = 0; i < dpif->uc_array_size; i++ ) {<br>
- struct dpif_linux_vport vport_request;<br>
- struct dpif_channel *ch = &dpif->channels[i];<br>
- uint32_t upcall_pid = 0;<br>
-<br>
- if (!ch->sock) {<br>
- continue;<br>
- }<br>
-<br>
- epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock), NULL);<br>
-<br>
- /* Turn off upcalls. */<br>
- dpif_linux_vport_init(&vport_request);<br>
- vport_request.cmd = OVS_VPORT_CMD_SET;<br>
- vport_request.dp_ifindex = dpif->dp_ifindex;<br>
- vport_request.port_no = u32_to_odp(i);<br>
- vport_request.upcall_pid = &upcall_pid;<br>
- dpif_linux_vport_transact(&vport_request, NULL, NULL);<br>
-<br>
- nl_sock_destroy(ch->sock);<br>
+ pids = xmalloc(len * sizeof *pids);<br>
+ for (i = 0; i < len; i++) {<br>
+ pids[i] = nl_sock_pid(ch[i].sock);<br>
}<br>
-<br>
- free(dpif->channels);<br>
- dpif->channels = NULL;<br>
- dpif->uc_array_size = 0;<br>
-<br>
- free(dpif->epoll_events);<br>
- dpif->epoll_events = NULL;<br>
- dpif->n_events = dpif->event_offset = 0;<br>
-<br>
- /* Don't close dpif->epoll_fd since that would cause other threads that<br>
- * call dpif_recv_wait(dpif) to wait on an arbitrary fd or a closed fd. */<br>
+ *upcall_pids = pids;<br>
}<br>
<br>
+/* Adds 'dpif->n_handlers' channels to vport. If upcall_pids is non-NULL,<br>
+ * makes it point to the array of channel socket pids. */<br>
static int<br>
-add_channel(struct dpif_linux *dpif, odp_port_t port_no, struct nl_sock *sock)<br>
+add_vport_channels(struct dpif_linux *dpif, odp_port_t port_no,<br>
+ uint32_t **upcall_pids)<br>
{<br>
struct epoll_event event;<br>
+ struct dpif_epoll *epolls = dpif->epolls;<br>
uint32_t port_idx = odp_to_u32(port_no);<br>
+ int error = 0;<br>
+ size_t i, j;<br>
<br>
- if (dpif->epoll_fd < 0) {<br>
+ if (epolls == NULL) {<br>
return 0;<br>
}<br>
<br>
/* We assume that the datapath densely chooses port numbers, which<br>
- * can therefore be used as an index into an array of channels. */<br>
+ * can therefore be used as an index into dpif->channels. */<br>
if (port_idx >= dpif->uc_array_size) {<br>
uint32_t new_size = port_idx + 1;<br>
- uint32_t i;<br>
<br>
if (new_size > MAX_PORTS) {<br>
VLOG_WARN_RL(&error_rl, "%s: datapath port %"PRIu32" too big",<br>
@@ -332,49 +317,116 @@ add_channel(struct dpif_linux *dpif, odp_port_t port_no, struct nl_sock *sock)<br>
dpif->channels = xrealloc(dpif->channels,<br>
new_size * sizeof *dpif->channels);<br>
for (i = dpif->uc_array_size; i < new_size; i++) {<br>
- dpif->channels[i].sock = NULL;<br>
+ dpif->channels[i] = NULL;<br>
+ }<br>
+ for (i = 0; i < dpif->n_handlers; i++) {<br>
+ epolls[i].epoll_events = xrealloc(epolls[i].epoll_events,<br>
+ new_size * sizeof<br>
+ *epolls[i].epoll_events);<br>
}<br>
-<br>
- dpif->epoll_events = xrealloc(dpif->epoll_events,<br>
- new_size * sizeof *dpif->epoll_events);<br>
dpif->uc_array_size = new_size;<br>
}<br>
<br>
memset(&event, 0, sizeof event);<br>
event.events = EPOLLIN;<br>
event.data.u32 = port_idx;<br>
- if (epoll_ctl(dpif->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),<br>
- &event) < 0) {<br>
- return errno;<br>
- }<br>
<br>
- nl_sock_destroy(dpif->channels[port_idx].sock);<br>
- dpif->channels[port_idx].sock = sock;<br>
- dpif->channels[port_idx].last_poll = LLONG_MIN;<br>
+ /* Creates channel for each upcall handler. */<br>
+ dpif->channels[port_idx] = xzalloc(dpif->n_handlers<br>
+ * sizeof *dpif->channels[port_idx]);<br>
+ for (i = 0; i < dpif->n_handlers; i++) {<br>
+ struct nl_sock *sock = NULL;<br>
+<br>
+ error = nl_sock_create(NETLINK_GENERIC, &sock);<br>
+ if (error) {<br>
+ goto error;<br>
+ }<br>
+<br>
+ if (epoll_ctl(epolls[i].epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),<br>
+ &event) < 0) {<br>
+ error = errno;<br>
+ goto error;<br>
+ }<br>
+ dpif->channels[port_idx][i].sock = sock;<br>
+ dpif->channels[port_idx][i].last_poll = LLONG_MIN;<br>
+ }<br>
+ channels_to_pids(dpif->channels[port_idx], dpif->n_handlers, upcall_pids);<br>
<br>
return 0;<br>
+<br>
+error:<br>
+ /* Cleans up the already created channel and socks. */<br>
+ for (j = 0; j <= i; j++) {<br>
+ nl_sock_destroy(dpif->channels[port_idx][j].sock);<br>
+ }<br>
+ free(dpif->channels[port_idx]);<br>
+ dpif->channels[port_idx] = NULL;<br>
+<br>
+ return error;<br>
}<br>
<br>
static void<br>
-del_channel(struct dpif_linux *dpif, odp_port_t port_no)<br>
+del_vport_channels(struct dpif_linux *dpif, odp_port_t port_no)<br>
{<br>
struct dpif_channel *ch;<br>
uint32_t port_idx = odp_to_u32(port_no);<br>
+ size_t i;<br>
+<br>
+ if (!dpif->epolls || port_idx >= dpif->uc_array_size) {<br>
+ return;<br>
+ }<br>
<br>
- if (dpif->epoll_fd < 0 || port_idx >= dpif->uc_array_size) {<br>
+ ch = dpif->channels[port_idx];<br>
+ if (!ch) {<br>
return;<br>
}<br>
<br>
- ch = &dpif->channels[port_idx];<br>
- if (!ch->sock) {<br>
+ for (i = 0; i < dpif->n_handlers; i++) {<br>
+ epoll_ctl(dpif->epolls[i].epoll_fd, EPOLL_CTL_DEL,<br>
+ nl_sock_fd(ch[i].sock), NULL);<br>
+ nl_sock_destroy(ch[i].sock);<br>
+ dpif->epolls[i].event_offset = dpif->epolls[i].n_events = 0;<br>
+ }<br>
+ free(ch);<br>
+ dpif->channels[port_idx] = NULL;<br>
+}<br>
+<br>
+static void<br>
+destroy_all_channels(struct dpif_linux *dpif)<br>
+{<br>
+ unsigned int i;<br>
+<br>
+ if (!dpif->epolls) {<br>
return;<br>
}<br>
<br>
- epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock), NULL);<br>
- dpif->event_offset = dpif->n_events = 0;<br>
+ for (i = 0; i < dpif->uc_array_size; i++ ) {<br>
+ struct dpif_linux_vport vport_request;<br>
+ struct dpif_channel *ch = dpif->channels[i];<br>
+<br>
+ if (!ch) {<br>
+ continue;<br>
+ }<br>
+<br>
+ /* Turn off upcalls. */<br>
+ dpif_linux_vport_init(&vport_request);<br>
+ vport_request.cmd = OVS_VPORT_CMD_SET;<br>
+ vport_request.dp_ifindex = dpif->dp_ifindex;<br>
+ vport_request.port_no = u32_to_odp(i);<br>
+ vport_request.upcall_pids = NULL;<br>
+ dpif_linux_vport_transact(&vport_request, NULL, NULL);<br>
+<br>
+ del_vport_channels(dpif, u32_to_odp(i));<br>
+ }<br>
<br>
- nl_sock_destroy(ch->sock);<br>
- ch->sock = NULL;<br>
+ free(dpif->channels);<br>
+ dpif->channels = NULL;<br>
+ dpif->uc_array_size = 0;<br>
+<br>
+ free(dpif->epolls);<br>
+<br>
+ /* Don't close dpif->epoll_fd since that would cause other threads that<br>
+ * call dpif_recv_wait(dpif) to wait on an arbitrary fd or a closed fd. */<br>
}<br>
<br>
static void<br>
@@ -383,11 +435,8 @@ dpif_linux_close(struct dpif *dpif_)<br>
struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
<br>
nl_sock_destroy(dpif->port_notifier);<br>
- destroy_channels(dpif);<br>
- if (dpif->epoll_fd >= 0) {<br>
- close(dpif->epoll_fd);<br>
- }<br>
- ovs_mutex_destroy(&dpif->upcall_lock);<br>
+ destroy_all_channels(dpif);<br>
+ fat_rwlock_destroy(&dpif->upcall_lock);<br>
free(dpif);<br>
}<br>
<br>
@@ -409,7 +458,7 @@ dpif_linux_run(struct dpif *dpif_)<br>
struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
if (dpif->refresh_channels) {<br>
dpif->refresh_channels = false;<br>
- dpif_linux_refresh_channels(dpif_);<br>
+ dpif_linux_refresh_channels(dpif_, dpif->n_handlers);<br>
}<br>
}<br>
<br>
@@ -500,20 +549,12 @@ dpif_linux_port_add__(struct dpif *dpif_, struct netdev *netdev,<br>
namebuf, sizeof namebuf);<br>
const char *type = netdev_get_type(netdev);<br>
struct dpif_linux_vport request, reply;<br>
- struct nl_sock *sock = NULL;<br>
- uint32_t upcall_pid;<br>
struct ofpbuf *buf;<br>
uint64_t options_stub[64 / 8];<br>
struct ofpbuf options;<br>
+ uint32_t *upcall_pids = NULL;<br>
int error;<br>
<br>
- if (dpif->epoll_fd >= 0) {<br>
- error = nl_sock_create(NETLINK_GENERIC, &sock);<br>
- if (error) {<br>
- return error;<br>
- }<br>
- }<br>
-<br>
dpif_linux_vport_init(&request);<br>
request.cmd = OVS_VPORT_CMD_NEW;<br>
request.dp_ifindex = dpif->dp_ifindex;<br>
@@ -522,7 +563,6 @@ dpif_linux_port_add__(struct dpif *dpif_, struct netdev *netdev,<br>
VLOG_WARN_RL(&error_rl, "%s: cannot create port `%s' because it has "<br>
"unsupported type `%s'",<br>
dpif_name(dpif_), name, type);<br>
- nl_sock_destroy(sock);<br>
return EINVAL;<br>
}<br>
<a href="http://request.name" target="_blank">request.name</a> = name;<br>
@@ -541,41 +581,42 @@ dpif_linux_port_add__(struct dpif *dpif_, struct netdev *netdev,<br>
}<br>
<br>
request.port_no = *port_nop;<br>
- upcall_pid = sock ? nl_sock_pid(sock) : 0;<br>
- request.upcall_pid = &upcall_pid;<br>
+ request.upcall_pids = NULL;<br>
<br>
error = dpif_linux_vport_transact(&request, &reply, &buf);<br>
if (!error) {<br>
*port_nop = reply.port_no;<br>
- VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid %"PRIu32,<br>
- dpif_name(dpif_), reply.port_no, upcall_pid);<br>
} else {<br>
if (error == EBUSY && *port_nop != ODPP_NONE) {<br>
VLOG_INFO("%s: requested port %"PRIu32" is in use",<br>
dpif_name(dpif_), *port_nop);<br>
}<br>
- nl_sock_destroy(sock);<br>
ofpbuf_delete(buf);<br>
return error;<br>
}<br>
ofpbuf_delete(buf);<br>
<br>
- if (sock) {<br>
- error = add_channel(dpif, *port_nop, sock);<br>
- if (error) {<br>
- VLOG_INFO("%s: could not add channel for port %s",<br>
- dpif_name(dpif_), name);<br>
+ if (add_vport_channels(dpif, *port_nop, &upcall_pids)) {<br>
+ VLOG_INFO("%s: could not add channel for port %s",<br>
+ dpif_name(dpif_), name);<br>
<br>
- /* Delete the port. */<br>
- dpif_linux_vport_init(&request);<br>
- request.cmd = OVS_VPORT_CMD_DEL;<br>
- request.dp_ifindex = dpif->dp_ifindex;<br>
- request.port_no = *port_nop;<br>
- dpif_linux_vport_transact(&request, NULL, NULL);<br>
+ /* Delete the port. */<br>
+ dpif_linux_vport_init(&request);<br>
+ request.cmd = OVS_VPORT_CMD_DEL;<br>
+ request.dp_ifindex = dpif->dp_ifindex;<br>
+ request.port_no = *port_nop;<br>
+ dpif_linux_vport_transact(&request, NULL, NULL);<br>
<br>
- nl_sock_destroy(sock);<br>
- return error;<br>
- }<br>
+ return error;<br>
+ } else {<br>
+ dpif_linux_vport_init(&request);<br>
+ request.cmd = OVS_VPORT_CMD_SET;<br>
+ request.dp_ifindex = dpif->dp_ifindex;<br>
+ request.port_no = *port_nop;<br>
+ request.n_pids = dpif->n_handlers;<br>
+ request.upcall_pids = upcall_pids;<br>
+ dpif_linux_vport_transact(&request, NULL, NULL);<br>
+ free(upcall_pids);<br>
}<br>
<br>
return 0;<br>
@@ -588,9 +629,9 @@ dpif_linux_port_add(struct dpif *dpif_, struct netdev *netdev,<br>
struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
int error;<br>
<br>
- ovs_mutex_lock(&dpif->upcall_lock);<br>
+ fat_rwlock_wrlock(&dpif->upcall_lock);<br>
error = dpif_linux_port_add__(dpif_, netdev, port_nop);<br>
- ovs_mutex_unlock(&dpif->upcall_lock);<br>
+ fat_rwlock_unlock(&dpif->upcall_lock);<br>
<br>
return error;<br>
}<br>
@@ -608,7 +649,7 @@ dpif_linux_port_del__(struct dpif *dpif_, odp_port_t port_no)<br>
vport.port_no = port_no;<br>
error = dpif_linux_vport_transact(&vport, NULL, NULL);<br>
<br>
- del_channel(dpif, port_no);<br>
+ del_vport_channels(dpif, port_no);<br>
<br>
return error;<br>
}<br>
@@ -619,9 +660,9 @@ dpif_linux_port_del(struct dpif *dpif_, odp_port_t port_no)<br>
struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
int error;<br>
<br>
- ovs_mutex_lock(&dpif->upcall_lock);<br>
+ fat_rwlock_wrlock(&dpif->upcall_lock);<br>
error = dpif_linux_port_del__(dpif_, port_no);<br>
- ovs_mutex_unlock(&dpif->upcall_lock);<br>
+ fat_rwlock_unlock(&dpif->upcall_lock);<br>
<br>
return error;<br>
}<br>
@@ -672,21 +713,26 @@ dpif_linux_port_query_by_name(const struct dpif *dpif, const char *devname,<br>
}<br>
<br>
static uint32_t<br>
-dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no)<br>
+dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no,<br>
+ uint32_t hash)<br>
{<br>
struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
uint32_t port_idx = odp_to_u32(port_no);<br>
uint32_t pid = 0;<br>
<br>
- ovs_mutex_lock(&dpif->upcall_lock);<br>
- if (dpif->epoll_fd >= 0) {<br>
+ fat_rwlock_rdlock(&dpif->upcall_lock);<br>
+ if (dpif->epolls) {<br>
/* The ODPP_NONE "reserved" port number uses the "ovs-system"'s<br>
* channel, since it is not heavily loaded. */<br>
uint32_t idx = port_idx >= dpif->uc_array_size ? 0 : port_idx;<br>
- const struct nl_sock *sock = dpif->channels[idx].sock;<br>
- pid = sock ? nl_sock_pid(sock) : 0;<br>
+ const struct dpif_channel *ch = dpif->channels[idx];<br>
+<br>
+ if (ch) {<br>
+ pid = ch[hash % dpif->n_handlers].sock<br>
+ ? nl_sock_pid(ch[hash % dpif->n_handlers].sock) : 0;<br>
+ }<br>
}<br>
- ovs_mutex_unlock(&dpif->upcall_lock);<br>
+ fat_rwlock_unlock(&dpif->upcall_lock);<br>
<br>
return pid;<br>
}<br>
@@ -1274,10 +1320,10 @@ dpif_linux_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)<br>
}<br>
<br>
/* Synchronizes 'dpif->channels' with the set of vports currently in 'dpif' in<br>
- * the kernel, by adding a new channel for any kernel vport that lacks one and<br>
- * deleting any channels that have no backing kernel vports. */<br>
+ * the kernel, by adding a new set of channels for any kernel vport that lacks<br>
+ * one and deleting any channels that have no backing kernel vports. */<br>
static int<br>
-dpif_linux_refresh_channels(struct dpif *dpif_)<br>
+dpif_linux_refresh_channels(struct dpif *dpif_, uint32_t n_handlers)<br>
{<br>
struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
unsigned long int *keep_channels;<br>
@@ -1287,52 +1333,63 @@ dpif_linux_refresh_channels(struct dpif *dpif_)<br>
int retval = 0;<br>
size_t i;<br>
<br>
- /* To start with, we need an epoll fd. */<br>
- if (dpif->epoll_fd < 0) {<br>
- dpif->epoll_fd = epoll_create(10);<br>
- if (dpif->epoll_fd < 0) {<br>
- return errno;<br>
+ if (dpif->n_handlers != n_handlers) {<br>
+ destroy_all_channels(dpif);<br>
+ dpif->epolls = xzalloc(n_handlers * sizeof *dpif->epolls);<br>
+<br>
+ for (i = 0; i < n_handlers; i++) {<br>
+ dpif->epolls[i].epoll_fd = epoll_create(dpif->uc_array_size ?<br>
+ dpif->uc_array_size : 10);<br>
+ if (dpif->epolls[i].epoll_fd < 0) {<br>
+ return errno;<br>
+ }<br>
}<br>
+ dpif->n_handlers = n_handlers;<br>
+ }<br>
+<br>
+ for (i = 0; i < n_handlers; i++) {<br>
+ dpif->epolls[i].event_offset = dpif->epolls[i].n_events = 0;<br>
}<br>
<br>
keep_channels_nbits = dpif->uc_array_size;<br>
keep_channels = bitmap_allocate(keep_channels_nbits);<br>
<br>
- dpif->n_events = dpif->event_offset = 0;<br>
-<br>
dpif_linux_port_dump_start__(dpif_, &dump);<br>
while (!dpif_linux_port_dump_next__(dpif_, &dump, &vport)) {<br>
uint32_t port_no = odp_to_u32(vport.port_no);<br>
- struct nl_sock *sock = (port_no < dpif->uc_array_size<br>
- ? dpif->channels[port_no].sock<br>
- : NULL);<br>
- bool new_sock = !sock;<br>
+ struct dpif_channel *ch = (port_no < dpif->uc_array_size<br>
+ ? dpif->channels[port_no]<br>
+ : NULL);<br>
+ uint32_t *upcall_pids = NULL;<br>
int error;<br>
<br>
- if (new_sock) {<br>
- error = nl_sock_create(NETLINK_GENERIC, &sock);<br>
+ if (!ch) {<br>
+ error = add_vport_channels(dpif, vport.port_no, &upcall_pids);<br>
if (error) {<br>
+ VLOG_INFO("%s: could not add channels for port %s",<br>
+ dpif_name(dpif_), <a href="http://vport.name" target="_blank">vport.name</a>);<br>
retval = error;<br>
goto error;<br>
}<br>
+ } else {<br>
+ channels_to_pids(ch, dpif->n_handlers, &upcall_pids);<br>
}<br>
<br>
/* Configure the vport to deliver misses to 'sock'. */<br>
- if (!vport.upcall_pid || *vport.upcall_pid != nl_sock_pid(sock)) {<br>
- uint32_t upcall_pid = nl_sock_pid(sock);<br>
+ if (!vport.upcall_pids<br>
+ || vport.n_pids != dpif->n_handlers<br>
+ || memcmp(upcall_pids, vport.upcall_pids, n_handlers * sizeof<br>
+ *upcall_pids)) {<br>
struct dpif_linux_vport vport_request;<br>
<br>
dpif_linux_vport_init(&vport_request);<br>
vport_request.cmd = OVS_VPORT_CMD_SET;<br>
vport_request.dp_ifindex = dpif->dp_ifindex;<br>
vport_request.port_no = vport.port_no;<br>
- vport_request.upcall_pid = &upcall_pid;<br>
+ vport_request.n_pids = dpif->n_handlers;<br>
+ vport_request.upcall_pids = upcall_pids;<br>
error = dpif_linux_vport_transact(&vport_request, NULL, NULL);<br>
- if (!error) {<br>
- VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid %"PRIu32,<br>
- dpif_name(&dpif->dpif), vport_request.port_no,<br>
- upcall_pid);<br>
- } else {<br>
+ if (error) {<br>
VLOG_WARN_RL(&error_rl,<br>
"%s: failed to set upcall pid on port: %s",<br>
dpif_name(&dpif->dpif), ovs_strerror(error));<br>
@@ -1348,31 +1405,22 @@ dpif_linux_refresh_channels(struct dpif *dpif_)<br>
}<br>
}<br>
<br>
- if (new_sock) {<br>
- error = add_channel(dpif, vport.port_no, sock);<br>
- if (error) {<br>
- VLOG_INFO("%s: could not add channel for port %s",<br>
- dpif_name(dpif_), <a href="http://vport.name" target="_blank">vport.name</a>);<br>
- retval = error;<br>
- goto error;<br>
- }<br>
- }<br>
-<br>
if (port_no < keep_channels_nbits) {<br>
bitmap_set1(keep_channels, port_no);<br>
}<br>
continue;<br>
<br>
error:<br>
- nl_sock_destroy(sock);<br>
+ del_vport_channels(dpif, vport.port_no);<br>
}<br>
nl_dump_done(&dump);<br>
<br>
/* Discard any saved channels that we didn't reuse. */<br>
for (i = 0; i < keep_channels_nbits; i++) {<br>
if (!bitmap_is_set(keep_channels, i)) {<br>
- nl_sock_destroy(dpif->channels[i].sock);<br>
- dpif->channels[i].sock = NULL;<br>
+ del_vport_channels(dpif, u32_to_odp(i));<br>
+ free(dpif->channels[i]);<br>
+ dpif->channels[i] = NULL;<br>
}<br>
}<br>
free(keep_channels);<br>
@@ -1381,29 +1429,32 @@ dpif_linux_refresh_channels(struct dpif *dpif_)<br>
}<br>
<br>
static int<br>
-dpif_linux_recv_set__(struct dpif *dpif_, bool enable)<br>
+dpif_linux_recv_set__(struct dpif *dpif_, bool enable, uint32_t n_handlers)<br>
{<br>
struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
<br>
- if ((dpif->epoll_fd >= 0) == enable) {<br>
+ if ((dpif->epolls != NULL) == enable) {<br>
+ if (enable && dpif->n_handlers != n_handlers) {<br>
+ dpif_linux_refresh_channels(dpif_, n_handlers);<br>
+ }<br>
return 0;<br>
} else if (!enable) {<br>
- destroy_channels(dpif);<br>
+ destroy_all_channels(dpif);<br>
return 0;<br>
} else {<br>
- return dpif_linux_refresh_channels(dpif_);<br>
+ return dpif_linux_refresh_channels(dpif_, n_handlers);<br>
}<br>
}<br>
<br>
static int<br>
-dpif_linux_recv_set(struct dpif *dpif_, bool enable)<br>
+dpif_linux_recv_set(struct dpif *dpif_, bool enable, uint32_t n_handlers)<br>
{<br>
struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
int error;<br>
<br>
- ovs_mutex_lock(&dpif->upcall_lock);<br>
- error = dpif_linux_recv_set__(dpif_, enable);<br>
- ovs_mutex_unlock(&dpif->upcall_lock);<br>
+ fat_rwlock_wrlock(&dpif->upcall_lock);<br>
+ error = dpif_linux_recv_set__(dpif_, enable, n_handlers);<br>
+ fat_rwlock_unlock(&dpif->upcall_lock);<br>
<br>
return error;<br>
}<br>
@@ -1482,38 +1533,39 @@ parse_odp_packet(struct ofpbuf *buf, struct dpif_upcall *upcall,<br>
}<br>
<br>
static int<br>
-dpif_linux_recv__(struct dpif *dpif_, struct dpif_upcall *upcall,<br>
- struct ofpbuf *buf)<br>
+dpif_linux_recv__(struct dpif *dpif_, uint32_t handler_id,<br>
+ struct dpif_upcall *upcall, struct ofpbuf *buf)<br>
{<br>
struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
+ struct dpif_epoll *epoll = dpif->epolls ? &dpif->epolls[handler_id] : NULL;<br>
int read_tries = 0;<br>
<br>
- if (dpif->epoll_fd < 0) {<br>
- return EAGAIN;<br>
+ if (!epoll) {<br>
+ return EAGAIN;<br>
}<br>
<br>
- if (dpif->event_offset >= dpif->n_events) {<br>
+ if (epoll->event_offset >= epoll->n_events) {<br>
int retval;<br>
<br>
- dpif->event_offset = dpif->n_events = 0;<br>
+ epoll->event_offset = epoll->n_events = 0;<br>
<br>
do {<br>
- retval = epoll_wait(dpif->epoll_fd, dpif->epoll_events,<br>
+ retval = epoll_wait(epoll->epoll_fd, epoll->epoll_events,<br>
dpif->uc_array_size, 0);<br>
} while (retval < 0 && errno == EINTR);<br>
if (retval < 0) {<br>
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);<br>
VLOG_WARN_RL(&rl, "epoll_wait failed (%s)", ovs_strerror(errno));<br>
} else if (retval > 0) {<br>
- dpif->n_events = retval;<br>
+ epoll->n_events = retval;<br>
}<br>
}<br>
<br>
- while (dpif->event_offset < dpif->n_events) {<br>
- int idx = dpif->epoll_events[dpif->event_offset].data.u32;<br>
- struct dpif_channel *ch = &dpif->channels[idx];<br>
+ while (epoll->event_offset < epoll->n_events) {<br>
+ int idx = epoll->epoll_events[epoll->event_offset].data.u32;<br>
+ struct dpif_channel *ch = &dpif->channels[idx][handler_id];<br>
<br>
- dpif->event_offset++;<br>
+ epoll->event_offset++;<br>
<br>
for (;;) {<br>
int dp_ifindex;<br>
@@ -1529,7 +1581,7 @@ dpif_linux_recv__(struct dpif *dpif_, struct dpif_upcall *upcall,<br>
* packets that the buffer overflowed. Try again<br>
* immediately because there's almost certainly a packet<br>
* waiting for us. */<br>
- report_loss(dpif_, ch);<br>
+ report_loss(dpif_, ch, idx, handler_id);<br>
continue;<br>
}<br>
<br>
@@ -1554,29 +1606,29 @@ dpif_linux_recv__(struct dpif *dpif_, struct dpif_upcall *upcall,<br>
}<br>
<br>
static int<br>
-dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall,<br>
- struct ofpbuf *buf)<br>
+dpif_linux_recv(struct dpif *dpif_, uint32_t handler_id,<br>
+ struct dpif_upcall *upcall, struct ofpbuf *buf)<br>
{<br>
struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
int error;<br>
<br>
- ovs_mutex_lock(&dpif->upcall_lock);<br>
- error = dpif_linux_recv__(dpif_, upcall, buf);<br>
- ovs_mutex_unlock(&dpif->upcall_lock);<br>
+ fat_rwlock_rdlock(&dpif->upcall_lock);<br>
+ error = dpif_linux_recv__(dpif_, handler_id, upcall, buf);<br>
+ fat_rwlock_unlock(&dpif->upcall_lock);<br>
<br>
return error;<br>
}<br>
<br>
static void<br>
-dpif_linux_recv_wait(struct dpif *dpif_)<br>
+dpif_linux_recv_wait(struct dpif *dpif_, uint32_t handler_id)<br>
{<br>
struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
<br>
- ovs_mutex_lock(&dpif->upcall_lock);<br>
- if (dpif->epoll_fd >= 0) {<br>
- poll_fd_wait(dpif->epoll_fd, POLLIN);<br>
+ fat_rwlock_rdlock(&dpif->upcall_lock);<br>
+ if (dpif->epolls) {<br>
+ poll_fd_wait(dpif->epolls[handler_id].epoll_fd, POLLIN);<br>
}<br>
- ovs_mutex_unlock(&dpif->upcall_lock);<br>
+ fat_rwlock_unlock(&dpif->upcall_lock);<br>
}<br>
<br>
static void<br>
@@ -1584,18 +1636,19 @@ dpif_linux_recv_purge(struct dpif *dpif_)<br>
{<br>
struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
<br>
- ovs_mutex_lock(&dpif->upcall_lock);<br>
- if (dpif->epoll_fd >= 0) {<br>
- struct dpif_channel *ch;<br>
+ fat_rwlock_rdlock(&dpif->upcall_lock);<br>
+ if (dpif->epolls) {<br>
+ size_t i, j;<br>
<br>
- for (ch = dpif->channels; ch < &dpif->channels[dpif->uc_array_size];<br>
- ch++) {<br>
- if (ch->sock) {<br>
- nl_sock_drain(ch->sock);<br>
+ for (i = 0; i < dpif->uc_array_size; i++ ) {<br>
+ if (dpif->channels[i]) {<br>
+ for (j = 0; j < dpif->n_handlers; j++) {<br>
+ nl_sock_drain(dpif->channels[i][j].sock);<br>
+ }<br>
}<br>
}<br>
}<br>
- ovs_mutex_unlock(&dpif->upcall_lock);<br>
+ fat_rwlock_unlock(&dpif->upcall_lock);<br>
}<br>
<br>
const struct dpif_class dpif_linux_class = {<br>
@@ -1701,7 +1754,7 @@ dpif_linux_vport_from_ofpbuf(struct dpif_linux_vport *vport,<br>
[OVS_VPORT_ATTR_PORT_NO] = { .type = NL_A_U32 },<br>
[OVS_VPORT_ATTR_TYPE] = { .type = NL_A_U32 },<br>
[OVS_VPORT_ATTR_NAME] = { .type = NL_A_STRING, .max_len = IFNAMSIZ },<br>
- [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NL_A_U32 },<br>
+ [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NL_A_UNSPEC, .optional = true },<br>
[OVS_VPORT_ATTR_STATS] = { NL_POLICY_FOR(struct ovs_vport_stats),<br>
.optional = true },<br>
[OVS_VPORT_ATTR_OPTIONS] = { .type = NL_A_NESTED, .optional = true },<br>
@@ -1732,8 +1785,11 @@ dpif_linux_vport_from_ofpbuf(struct dpif_linux_vport *vport,<br>
vport->type = nl_attr_get_u32(a[OVS_VPORT_ATTR_TYPE]);<br>
vport->name = nl_attr_get_string(a[OVS_VPORT_ATTR_NAME]);<br>
if (a[OVS_VPORT_ATTR_UPCALL_PID]) {<br>
- vport->upcall_pid = nl_attr_get(a[OVS_VPORT_ATTR_UPCALL_PID]);<br>
+ vport->n_pids = nl_attr_get_size(a[OVS_VPORT_ATTR_UPCALL_PID])<br>
+ / (sizeof *vport->upcall_pids);<br>
+ vport->upcall_pids = nl_attr_get(a[OVS_VPORT_ATTR_UPCALL_PID]);<br>
}<br>
+<br>
if (a[OVS_VPORT_ATTR_STATS]) {<br>
vport->stats = nl_attr_get(a[OVS_VPORT_ATTR_STATS]);<br>
}<br>
@@ -1770,8 +1826,10 @@ dpif_linux_vport_to_ofpbuf(const struct dpif_linux_vport *vport,<br>
nl_msg_put_string(buf, OVS_VPORT_ATTR_NAME, vport->name);<br>
}<br>
<br>
- if (vport->upcall_pid) {<br>
- nl_msg_put_u32(buf, OVS_VPORT_ATTR_UPCALL_PID, *vport->upcall_pid);<br>
+ if (vport->upcall_pids) {<br>
+ nl_msg_put_unspec(buf, OVS_VPORT_ATTR_UPCALL_PID,<br>
+ vport->upcall_pids,<br>
+ vport->n_pids * sizeof *vport->upcall_pids);<br>
}<br>
<br>
if (vport->stats) {<br>
@@ -2176,9 +2234,9 @@ dpif_linux_flow_get_stats(const struct dpif_linux_flow *flow,<br>
/* Logs information about a packet that was recently lost in 'ch' (in<br>
* 'dpif_'). */<br>
static void<br>
-report_loss(struct dpif *dpif_, struct dpif_channel *ch)<br>
+report_loss(struct dpif *dpif_, struct dpif_channel *ch, uint32_t ch_idx,<br>
+ uint32_t handler_id)<br>
{<br>
- struct dpif_linux *dpif = dpif_linux_cast(dpif_);<br>
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);<br>
struct ds s;<br>
<br>
@@ -2192,7 +2250,7 @@ report_loss(struct dpif *dpif_, struct dpif_channel *ch)<br>
time_msec() - ch->last_poll);<br>
}<br>
<br>
- VLOG_WARN("%s: lost packet on channel %"PRIdPTR"%s",<br>
- dpif_name(dpif_), ch - dpif->channels, ds_cstr(&s));<br>
+ VLOG_WARN("%s: lost packet on channel %u of handler %u", dpif_name(dpif_),<br>
+ ch_idx, handler_id);<br>
ds_destroy(&s);<br>
}<br>
diff --git a/lib/dpif-linux.h b/lib/dpif-linux.h<br>
index ec94ccf..d174e0e 100644<br>
--- a/lib/dpif-linux.h<br>
+++ b/lib/dpif-linux.h<br>
@@ -32,6 +32,7 @@ struct dpif_linux_vport {<br>
<br>
/* ovs_vport header. */<br>
int dp_ifindex;<br>
+ uint32_t n_pids;<br>
odp_port_t port_no; /* ODPP_NONE if unknown. */<br>
enum ovs_vport_type type;<br>
<br>
@@ -41,7 +42,7 @@ struct dpif_linux_vport {<br>
* 32-bit boundaries, so use get_unaligned_u64() to access its values.<br>
*/<br>
const char *name; /* OVS_VPORT_ATTR_NAME. */<br>
- const uint32_t *upcall_pid; /* OVS_VPORT_ATTR_UPCALL_PID. */<br>
+ const uint32_t *upcall_pids; /* OVS_VPORT_ATTR_UPCALL_PID. */<br>
const struct ovs_vport_stats *stats; /* OVS_VPORT_ATTR_STATS. */<br>
const struct nlattr *options; /* OVS_VPORT_ATTR_OPTIONS. */<br>
size_t options_len;<br>
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c<br>
index 73eb99d..9033fba 100644<br>
--- a/lib/dpif-netdev.c<br>
+++ b/lib/dpif-netdev.c<br>
@@ -1429,7 +1429,8 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)<br>
}<br>
<br>
static int<br>
-dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED)<br>
+dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED,<br>
+ uint32_t n_handlers OVS_UNUSED)<br>
{<br>
return 0;<br>
}<br>
@@ -1458,8 +1459,8 @@ find_nonempty_queue(struct dp_netdev *dp)<br>
}<br>
<br>
static int<br>
-dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,<br>
- struct ofpbuf *buf)<br>
+dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id OVS_UNUSED,<br>
+ struct dpif_upcall *upcall, struct ofpbuf *buf)<br>
{<br>
struct dp_netdev *dp = get_dp_netdev(dpif);<br>
struct dp_netdev_queue *q;<br>
@@ -1485,7 +1486,7 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,<br>
}<br>
<br>
static void<br>
-dpif_netdev_recv_wait(struct dpif *dpif)<br>
+dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id OVS_UNUSED)<br>
{<br>
struct dp_netdev *dp = get_dp_netdev(dpif);<br>
uint64_t seq;<br>
diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h<br>
index adc5242..7ef10bb 100644<br>
--- a/lib/dpif-provider.h<br>
+++ b/lib/dpif-provider.h<br>
@@ -146,7 +146,8 @@ struct dpif_class {<br>
<br>
/* Returns the Netlink PID value to supply in OVS_ACTION_ATTR_USERSPACE<br>
* actions as the OVS_USERSPACE_ATTR_PID attribute's value, for use in<br>
- * flows whose packets arrived on port 'port_no'.<br>
+ * flows whose packets arrived on port 'port_no' and 5-tuple hash is<br>
+ * 'hash'.<br>
*<br>
* A 'port_no' of UINT32_MAX should be treated as a special case. The<br>
* implementation should return a reserved PID, not allocated to any port,<br>
@@ -158,7 +159,8 @@ struct dpif_class {<br>
*<br>
* A dpif provider that doesn't have meaningful Netlink PIDs can use NULL<br>
* for this function. This is equivalent to always returning 0. */<br>
- uint32_t (*port_get_pid)(const struct dpif *dpif, odp_port_t port_no);<br>
+ uint32_t (*port_get_pid)(const struct dpif *dpif, odp_port_t port_no,<br>
+ uint32_t hash);<br>
<br>
/* Attempts to begin dumping the ports in a dpif. On success, returns 0<br>
* and initializes '*statep' with any data needed for iteration. On<br>
@@ -324,16 +326,17 @@ struct dpif_class {<br>
* Turning packet receive off and then back on is allowed to change Netlink<br>
* PID assignments (see ->port_get_pid()). The client is responsible for<br>
* updating flows as necessary if it does this. */<br>
- int (*recv_set)(struct dpif *dpif, bool enable);<br>
+ int (*recv_set)(struct dpif *dpif, bool enable, uint32_t n_handlers);<br>
<br>
/* Translates OpenFlow queue ID 'queue_id' (in host byte order) into a<br>
* priority value used for setting packet priority. */<br>
int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id,<br>
uint32_t *priority);<br>
<br>
- /* Polls for an upcall from 'dpif'. If successful, stores the upcall into<br>
- * '*upcall', using 'buf' for storage. Should only be called if 'recv_set'<br>
- * has been used to enable receiving packets from 'dpif'.<br>
+ /* Polls for an upcall from 'dpif' for handler with 'handler_id'. If<br>
+ * successful, stores the upcall into '*upcall', using 'buf' for storage.<br>
+ * Should only be called if 'recv_set' has been used to enable receiving<br>
+ * packets from 'dpif'.<br>
*<br>
* The implementation should point 'upcall->key' and 'upcall->userdata'<br>
* (if any) into data in the caller-provided 'buf'. The implementation may<br>
@@ -349,12 +352,13 @@ struct dpif_class {<br>
*<br>
* This function must not block. If no upcall is pending when it is<br>
* called, it should return EAGAIN without blocking. */<br>
- int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall,<br>
- struct ofpbuf *buf);<br>
+ int (*recv)(struct dpif *dpif, uint32_t handler_id,<br>
+ struct dpif_upcall *upcall, struct ofpbuf *buf);<br>
<br>
- /* Arranges for the poll loop to wake up when 'dpif' has a message queued<br>
- * to be received with the recv member function. */<br>
- void (*recv_wait)(struct dpif *dpif);<br>
+ /* Arranges for the poll loop for handler with 'handler_id' to wake up when<br>
+ * 'dpif' has a message queued to be received with the recv member<br>
+ * function by the handler. */<br>
+ void (*recv_wait)(struct dpif *dpif, uint32_t handler_id);<br>
<br>
/* Throws away any queued upcalls that 'dpif' currently has ready to<br>
* return. */<br>
diff --git a/lib/dpif.c b/lib/dpif.c<br>
index 2b79a6e..72897b3 100644<br>
--- a/lib/dpif.c<br>
+++ b/lib/dpif.c<br>
@@ -634,7 +634,7 @@ dpif_port_query_by_name(const struct dpif *dpif, const char *devname,<br>
<br>
/* Returns the Netlink PID value to supply in OVS_ACTION_ATTR_USERSPACE actions<br>
* as the OVS_USERSPACE_ATTR_PID attribute's value, for use in flows whose<br>
- * packets arrived on port 'port_no'.<br>
+ * packets arrived on port 'port_no' and 5-tuple hash is 'hash'.<br>
*<br>
* A 'port_no' of ODPP_NONE is a special case: it returns a reserved PID, not<br>
* allocated to any port, that the client may use for special purposes.<br>
@@ -645,10 +645,10 @@ dpif_port_query_by_name(const struct dpif *dpif, const char *devname,<br>
* update all of the flows that it installed that contain<br>
* OVS_ACTION_ATTR_USERSPACE actions. */<br>
uint32_t<br>
-dpif_port_get_pid(const struct dpif *dpif, odp_port_t port_no)<br>
+dpif_port_get_pid(const struct dpif *dpif, odp_port_t port_no, uint32_t hash)<br>
{<br>
return (dpif->dpif_class->port_get_pid<br>
- ? (dpif->dpif_class->port_get_pid)(dpif, port_no)<br>
+ ? (dpif->dpif_class->port_get_pid)(dpif, port_no, hash)<br>
: 0);<br>
}<br>
<br>
@@ -1248,24 +1248,18 @@ dpif_upcall_type_to_string(enum dpif_upcall_type type)<br>
}<br>
}<br>
<br>
-/* Enables or disables receiving packets with dpif_recv() on 'dpif'. Returns 0<br>
- * if successful, otherwise a positive errno value.<br>
- *<br>
- * Turning packet receive off and then back on may change the Netlink PID<br>
- * assignments returned by dpif_port_get_pid(). If the client does this, it<br>
- * must update all of the flows that have OVS_ACTION_ATTR_USERSPACE actions<br>
- * using the new PID assignment. */<br>
int<br>
-dpif_recv_set(struct dpif *dpif, bool enable)<br>
+dpif_recv_set(struct dpif *dpif, bool enable, uint32_t n_handlers)<br>
{<br>
- int error = dpif->dpif_class->recv_set(dpif, enable);<br>
+ int error = dpif->dpif_class->recv_set(dpif, enable, n_handlers);<br>
log_operation(dpif, "recv_set", error);<br>
return error;<br>
}<br>
<br>
-/* Polls for an upcall from 'dpif'. If successful, stores the upcall into<br>
- * '*upcall', using 'buf' for storage. Should only be called if<br>
- * dpif_recv_set() has been used to enable receiving packets on 'dpif'.<br>
+/* Polls for an upcall from 'dpif' for handler with 'handler_id'. If<br>
+ * successful, stores the upcall into '*upcall', using 'buf' for storage.<br>
+ * Should only be called if dpif_recv_set() has been used to enable receiving<br>
+ * packets on 'dpif'.<br>
*<br>
* 'upcall->key' and 'upcall->userdata' point into data in the caller-provided<br>
* 'buf', so their memory cannot be freed separately from 'buf'.<br>
@@ -1280,9 +1274,10 @@ dpif_recv_set(struct dpif *dpif, bool enable)<br>
* Returns 0 if successful, otherwise a positive errno value. Returns EAGAIN<br>
* if no upcall is immediately available. */<br>
int<br>
-dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf *buf)<br>
+dpif_recv(struct dpif *dpif, uint32_t handler_id, struct dpif_upcall *upcall,<br>
+ struct ofpbuf *buf)<br>
{<br>
- int error = dpif->dpif_class->recv(dpif, upcall, buf);<br>
+ int error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);<br>
if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {<br>
struct ds flow;<br>
char *packet;<br>
@@ -1316,12 +1311,13 @@ dpif_recv_purge(struct dpif *dpif)<br>
}<br>
}<br>
<br>
-/* Arranges for the poll loop to wake up when 'dpif' has a message queued to be<br>
- * received with dpif_recv(). */<br>
+/* Arranges for the poll loop for handler with 'handler_id' to wake up when<br>
+ * 'dpif' has a message queued to be received with dpif_recv() by the handler.<br>
+ */<br>
void<br>
-dpif_recv_wait(struct dpif *dpif)<br>
+dpif_recv_wait(struct dpif *dpif, uint32_t handler_id)<br>
{<br>
- dpif->dpif_class->recv_wait(dpif);<br>
+ dpif->dpif_class->recv_wait(dpif, handler_id);<br>
}<br>
<br>
/* Obtains the NetFlow engine type and engine ID for 'dpif' into '*engine_type'<br>
diff --git a/lib/dpif.h b/lib/dpif.h<br>
index 7f986f9..b6e3fbd 100644<br>
--- a/lib/dpif.h<br>
+++ b/lib/dpif.h<br>
@@ -445,7 +445,8 @@ int dpif_port_query_by_name(const struct dpif *, const char *devname,<br>
struct dpif_port *);<br>
int dpif_port_get_name(struct dpif *, odp_port_t port_no,<br>
char *name, size_t name_size);<br>
-uint32_t dpif_port_get_pid(const struct dpif *, odp_port_t port_no);<br>
+uint32_t dpif_port_get_pid(const struct dpif *, odp_port_t port_no,<br>
+ uint32_t hash);<br>
<br>
struct dpif_port_dump {<br>
const struct dpif *dpif;<br>
@@ -613,10 +614,11 @@ struct dpif_upcall {<br>
struct nlattr *userdata; /* Argument to OVS_ACTION_ATTR_USERSPACE. */<br>
};<br>
<br>
-int dpif_recv_set(struct dpif *, bool enable);<br>
-int dpif_recv(struct dpif *, struct dpif_upcall *, struct ofpbuf *);<br>
+int dpif_recv_set(struct dpif *, bool enable, uint32_t n_handlers);<br>
+int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,<br>
+ struct ofpbuf *);<br>
void dpif_recv_purge(struct dpif *);<br>
-void dpif_recv_wait(struct dpif *);<br>
+void dpif_recv_wait(struct dpif *, uint32_t handler_id);<br>
<br>
/* Miscellaneous. */<br>
<br>
diff --git a/lib/flow.c b/lib/flow.c<br>
index e7fe4d3..7a36940 100644<br>
--- a/lib/flow.c<br>
+++ b/lib/flow.c<br>
@@ -822,6 +822,24 @@ flow_wildcards_set_reg_mask(struct flow_wildcards *wc, int idx, uint32_t mask)<br>
wc->masks.regs[idx] = mask;<br>
}<br>
<br>
+/* Calculates the 5-tuple hash from the given flow. */<br>
+uint32_t<br>
+flow_hash_5tuple(const struct flow *flow, uint32_t basis)<br>
+{<br>
+ uint32_t hash;<br>
+<br>
+ if (!flow) {<br>
+ return 0;<br>
+ }<br>
+<br>
+ hash = (OVS_FORCE int) flow->nw_src<br>
+ ^ (OVS_FORCE int) flow->nw_dst<br>
+ ^ flow->nw_proto ^ (OVS_FORCE int) flow->tp_src<br>
+ ^ (OVS_FORCE int) flow->tp_dst;<br>
+<br>
+ return jhash_bytes((void *) &hash, sizeof hash, basis);<br>
+}<br>
+<br>
/* Hashes 'flow' based on its L2 through L4 protocol information. */<br>
uint32_t<br>
flow_hash_symmetric_l4(const struct flow *flow, uint32_t basis)<br>
diff --git a/lib/flow.h b/lib/flow.h<br>
index 3109a84..26871a2 100644<br>
--- a/lib/flow.h<br>
+++ b/lib/flow.h<br>
@@ -323,7 +323,8 @@ void flow_wildcards_fold_minimask_range(struct flow_wildcards *,<br>
uint32_t flow_wildcards_hash(const struct flow_wildcards *, uint32_t basis);<br>
bool flow_wildcards_equal(const struct flow_wildcards *,<br>
const struct flow_wildcards *);<br>
-uint32_t flow_hash_symmetric_l4(const struct flow *flow, uint32_t basis);<br>
+uint32_t flow_hash_5tuple(const struct flow *, uint32_t basis);<br>
+uint32_t flow_hash_symmetric_l4(const struct flow *, uint32_t basis);<br>
<br>
/* Initialize a flow with random fields that matter for nx_hash_fields. */<br>
void flow_random_hash_fields(struct flow *);<br>
diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c<br>
index e0a5aed..1a4c111 100644<br>
--- a/ofproto/ofproto-dpif-upcall.c<br>
+++ b/ofproto/ofproto-dpif-upcall.c<br>
@@ -45,26 +45,11 @@<br>
<br>
VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);<br>
<br>
-COVERAGE_DEFINE(upcall_queue_overflow);<br>
-<br>
-/* A thread that processes each upcall handed to it by the dispatcher thread,<br>
- * forwards the upcall's packet, and possibly sets up a kernel flow as a<br>
- * cache. */<br>
struct handler {<br>
struct udpif *udpif; /* Parent udpif. */<br>
pthread_t thread; /* Thread ID. */<br>
char *name; /* Thread name. */<br>
-<br>
- struct ovs_mutex mutex; /* Mutex guarding the following. */<br>
-<br>
- /* Atomic queue of unprocessed upcalls. */<br>
- struct list upcalls OVS_GUARDED;<br>
- size_t n_upcalls OVS_GUARDED;<br>
-<br>
- bool need_signal; /* Only changed by the dispatcher. */<br>
-<br>
- pthread_cond_t wake_cond; /* Wakes 'thread' while holding<br>
- 'mutex'. */<br>
+ uint32_t handler_id; /* Handler id. */<br>
};<br>
<br>
/* A thread that processes each kernel flow handed to it by the flow_dumper<br>
@@ -89,9 +74,6 @@ struct revalidator {<br>
*<br>
* udpif has two logically separate pieces:<br>
*<br>
- * - A "dispatcher" thread that reads upcalls from the kernel and dispatches<br>
- * them to one of several "handler" threads (see struct handler).<br>
- *<br>
* - A "flow_dumper" thread that reads the kernel flow table and dispatches<br>
* flows to one of several "revalidator" threads (see struct<br>
* revalidator). */<br>
@@ -103,7 +85,6 @@ struct udpif {<br>
<br>
uint32_t secret; /* Random seed for upcall hash. */<br>
<br>
- pthread_t dispatcher; /* Dispatcher thread ID. */<br>
pthread_t flow_dumper; /* Flow dumper thread ID. */<br>
<br>
struct handler *handlers; /* Upcall handlers. */<br>
@@ -143,7 +124,7 @@ enum upcall_type {<br>
};<br>
<br>
struct upcall {<br>
- struct list list_node; /* For queuing upcalls. */<br>
+ bool is_valid;<br>
struct flow_miss *flow_miss; /* This upcall's flow_miss. */<br>
<br>
/* Raw upcall plus data for keeping track of the memory backing it. */<br>
@@ -216,15 +197,14 @@ struct flow_miss {<br>
bool put;<br>
};<br>
<br>
-static void upcall_destroy(struct upcall *);<br>
+static void upcall_destroy(struct upcall *, bool free_upcall);<br>
<br>
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);<br>
static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);<br>
<br>
-static void recv_upcalls(struct udpif *);<br>
-static void handle_upcalls(struct handler *handler, struct list *upcalls);<br>
+static void handle_upcalls(struct handler *handler, struct upcall *upcalls,<br>
+ size_t n_upcalls);<br>
static void *udpif_flow_dumper(void *);<br>
-static void *udpif_dispatcher(void *);<br>
static void *udpif_upcall_handler(void *);<br>
static void *udpif_revalidator(void *);<br>
static uint64_t udpif_get_n_flows(struct udpif *);<br>
@@ -311,9 +291,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,<br>
for (i = 0; i < udpif->n_handlers; i++) {<br>
struct handler *handler = &udpif->handlers[i];<br>
<br>
- ovs_mutex_lock(&handler->mutex);<br>
- xpthread_cond_signal(&handler->wake_cond);<br>
- ovs_mutex_unlock(&handler->mutex);<br>
xpthread_join(handler->thread, NULL);<br>
}<br>
<br>
@@ -327,7 +304,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,<br>
}<br>
<br>
xpthread_join(udpif->flow_dumper, NULL);<br>
- xpthread_join(udpif->dispatcher, NULL);<br>
<br>
for (i = 0; i < udpif->n_revalidators; i++) {<br>
struct revalidator *revalidator = &udpif->revalidators[i];<br>
@@ -351,17 +327,7 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,<br>
}<br>
<br>
for (i = 0; i < udpif->n_handlers; i++) {<br>
- struct handler *handler = &udpif->handlers[i];<br>
- struct upcall *miss, *next;<br>
-<br>
- LIST_FOR_EACH_SAFE (miss, next, list_node, &handler->upcalls) {<br>
- list_remove(&miss->list_node);<br>
- upcall_destroy(miss);<br>
- }<br>
- ovs_mutex_destroy(&handler->mutex);<br>
-<br>
- xpthread_cond_destroy(&handler->wake_cond);<br>
- free(handler->name);<br>
+ free(udpif->handlers[i].name);<br>
}<br>
latch_poll(&udpif->exit_latch);<br>
<br>
@@ -386,10 +352,7 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,<br>
struct handler *handler = &udpif->handlers[i];<br>
<br>
handler->udpif = udpif;<br>
- list_init(&handler->upcalls);<br>
- handler->need_signal = false;<br>
- xpthread_cond_init(&handler->wake_cond, NULL);<br>
- ovs_mutex_init(&handler->mutex);<br>
+ handler->handler_id = i;<br>
xpthread_create(&handler->thread, NULL, udpif_upcall_handler,<br>
handler);<br>
}<br>
@@ -407,7 +370,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,<br>
xpthread_create(&revalidator->thread, NULL, udpif_revalidator,<br>
revalidator);<br>
}<br>
- xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);<br>
xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, udpif);<br>
}<br>
}<br>
@@ -434,16 +396,9 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)<br>
{<br>
size_t i;<br>
<br>
- simap_increase(usage, "dispatchers", 1);<br>
simap_increase(usage, "flow_dumpers", 1);<br>
<br>
simap_increase(usage, "handlers", udpif->n_handlers);<br>
- for (i = 0; i < udpif->n_handlers; i++) {<br>
- struct handler *handler = &udpif->handlers[i];<br>
- ovs_mutex_lock(&handler->mutex);<br>
- simap_increase(usage, "handler upcalls", handler->n_upcalls);<br>
- ovs_mutex_unlock(&handler->mutex);<br>
- }<br>
<br>
simap_increase(usage, "revalidators", udpif->n_revalidators);<br>
for (i = 0; i < udpif->n_revalidators; i++) {<br>
@@ -472,12 +427,16 @@ udpif_flush(void)<br>
<br>
/* Destroys and deallocates 'upcall'. */<br>
static void<br>
-upcall_destroy(struct upcall *upcall)<br>
+upcall_destroy(struct upcall *upcall, bool free_upcall)<br>
{<br>
if (upcall) {<br>
ofpbuf_uninit(&upcall->dpif_upcall.packet);<br>
ofpbuf_uninit(&upcall->upcall_buf);<br>
- free(upcall);<br>
+<br>
+ upcall->is_valid = false;<br>
+ if (free_upcall) {<br>
+ free(upcall);<br>
+ }<br>
}<br>
}<br>
<br>
@@ -503,24 +462,6 @@ udpif_get_n_flows(struct udpif *udpif)<br>
return flow_count;<br>
}<br>
<br>
-/* The dispatcher thread is responsible for receiving upcalls from the kernel,<br>
- * assigning them to a upcall_handler thread. */<br>
-static void *<br>
-udpif_dispatcher(void *arg)<br>
-{<br>
- struct udpif *udpif = arg;<br>
-<br>
- set_subprogram_name("dispatcher");<br>
- while (!latch_is_set(&udpif->exit_latch)) {<br>
- recv_upcalls(udpif);<br>
- dpif_recv_wait(udpif->dpif);<br>
- latch_wait(&udpif->exit_latch);<br>
- poll_block();<br>
- }<br>
-<br>
- return NULL;<br>
-}<br>
-<br>
static void *<br>
udpif_flow_dumper(void *arg)<br>
{<br>
@@ -631,38 +572,44 @@ udpif_flow_dumper(void *arg)<br>
return NULL;<br>
}<br>
<br>
-/* The miss handler thread is responsible for processing miss upcalls retrieved<br>
- * by the dispatcher thread. Once finished it passes the processed miss<br>
- * upcalls to ofproto-dpif where they're installed in the datapath. */<br>
static void *<br>
udpif_upcall_handler(void *arg)<br>
{<br>
struct handler *handler = arg;<br>
+ struct udpif *udpif = handler->udpif;<br>
+ struct upcall upcalls[FLOW_MISS_MAX_BATCH];<br>
<br>
handler->name = xasprintf("handler_%u", ovsthread_id_self());<br>
set_subprogram_name("%s", handler->name);<br>
<br>
while (!latch_is_set(&handler->udpif->exit_latch)) {<br>
- struct list misses = LIST_INITIALIZER(&misses);<br>
- size_t i;<br>
-<br>
- ovs_mutex_lock(&handler->mutex);<br>
- if (!handler->n_upcalls) {<br>
- ovs_mutex_cond_wait(&handler->wake_cond, &handler->mutex);<br>
- }<br>
+ size_t i, n_upcalls;<br>
<br>
for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {<br>
- if (handler->n_upcalls) {<br>
- handler->n_upcalls--;<br>
- list_push_back(&misses, list_pop_front(&handler->upcalls));<br>
- } else {<br>
+ struct upcall *upcall = &upcalls[i];<br>
+ int error;<br>
+<br>
+ ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,<br>
+ sizeof upcall->upcall_stub);<br>
+ error = dpif_recv(udpif->dpif, handler->handler_id,<br>
+ &upcall->dpif_upcall, &upcall->upcall_buf);<br>
+ if (error) {<br>
+ /* upcall_destroy() can only be called on successfully received<br>
+ * upcalls. */<br>
+ ofpbuf_uninit(&upcall->upcall_buf);<br>
break;<br>
}<br>
+ upcall->is_valid = true;<br>
}<br>
- ovs_mutex_unlock(&handler->mutex);<br>
-<br>
- handle_upcalls(handler, &misses);<br>
<br>
+ n_upcalls = i;<br>
+ if (!n_upcalls) {<br>
+ dpif_recv_wait(udpif->dpif, handler->handler_id);<br>
+ latch_wait(&udpif->exit_latch);<br>
+ poll_block();<br>
+ } else {<br>
+ handle_upcalls(handler, upcalls, n_upcalls);<br>
+ }<br>
coverage_clear();<br>
}<br>
<br>
@@ -769,98 +716,11 @@ classify_upcall(const struct upcall *upcall)<br>
}<br>
}<br>
<br>
-static void<br>
-recv_upcalls(struct udpif *udpif)<br>
-{<br>
- int n;<br>
-<br>
- for (;;) {<br>
- uint32_t hash = udpif->secret;<br>
- struct handler *handler;<br>
- struct upcall *upcall;<br>
- size_t n_bytes, left;<br>
- struct nlattr *nla;<br>
- int error;<br>
-<br>
- upcall = xmalloc(sizeof *upcall);<br>
- ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,<br>
- sizeof upcall->upcall_stub);<br>
- error = dpif_recv(udpif->dpif, &upcall->dpif_upcall,<br>
- &upcall->upcall_buf);<br>
- if (error) {<br>
- /* upcall_destroy() can only be called on successfully received<br>
- * upcalls. */<br>
- ofpbuf_uninit(&upcall->upcall_buf);<br>
- free(upcall);<br>
- break;<br>
- }<br>
-<br>
- n_bytes = 0;<br>
- NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key,<br>
- upcall->dpif_upcall.key_len) {<br>
- enum ovs_key_attr type = nl_attr_type(nla);<br>
- if (type == OVS_KEY_ATTR_IN_PORT<br>
- || type == OVS_KEY_ATTR_TCP<br>
- || type == OVS_KEY_ATTR_UDP) {<br>
- if (nl_attr_get_size(nla) == 4) {<br>
- hash = mhash_add(hash, nl_attr_get_u32(nla));<br>
- n_bytes += 4;<br>
- } else {<br>
- VLOG_WARN_RL(&rl,<br>
- "Netlink attribute with incorrect size.");<br>
- }<br>
- }<br>
- }<br>
- hash = mhash_finish(hash, n_bytes);<br>
-<br>
- handler = &udpif->handlers[hash % udpif->n_handlers];<br>
-<br>
- ovs_mutex_lock(&handler->mutex);<br>
- if (handler->n_upcalls < MAX_QUEUE_LENGTH) {<br>
- list_push_back(&handler->upcalls, &upcall->list_node);<br>
- if (handler->n_upcalls == 0) {<br>
- handler->need_signal = true;<br>
- }<br>
- handler->n_upcalls++;<br>
- if (handler->need_signal &&<br>
- handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {<br>
- handler->need_signal = false;<br>
- xpthread_cond_signal(&handler->wake_cond);<br>
- }<br>
- ovs_mutex_unlock(&handler->mutex);<br>
- if (!VLOG_DROP_DBG(&rl)) {<br>
- struct ds ds = DS_EMPTY_INITIALIZER;<br>
-<br>
- odp_flow_key_format(upcall->dpif_upcall.key,<br>
- upcall->dpif_upcall.key_len,<br>
- &ds);<br>
- VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds));<br>
- ds_destroy(&ds);<br>
- }<br>
- } else {<br>
- ovs_mutex_unlock(&handler->mutex);<br>
- COVERAGE_INC(upcall_queue_overflow);<br>
- upcall_destroy(upcall);<br>
- }<br>
- }<br>
-<br>
- for (n = 0; n < udpif->n_handlers; ++n) {<br>
- struct handler *handler = &udpif->handlers[n];<br>
-<br>
- if (handler->need_signal) {<br>
- handler->need_signal = false;<br>
- ovs_mutex_lock(&handler->mutex);<br>
- xpthread_cond_signal(&handler->wake_cond);<br>
- ovs_mutex_unlock(&handler->mutex);<br>
- }<br>
- }<br>
-}<br>
-<br>
/* Calculates slow path actions for 'xout'. 'buf' must statically be<br>
* initialized with at least 128 bytes of space. */<br>
static void<br>
compose_slow_path(struct udpif *udpif, struct xlate_out *xout,<br>
- odp_port_t odp_in_port, struct ofpbuf *buf)<br>
+ struct flow *flow, odp_port_t odp_in_port, struct ofpbuf *buf)<br>
{<br>
union user_action_cookie cookie;<br>
odp_port_t port;<br>
@@ -873,7 +733,7 @@ compose_slow_path(struct udpif *udpif, struct xlate_out *xout,<br>
port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)<br>
? ODPP_NONE<br>
: odp_in_port;<br>
- pid = dpif_port_get_pid(udpif->dpif, port);<br>
+ pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));<br>
odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);<br>
}<br>
<br>
@@ -893,7 +753,8 @@ flow_miss_find(struct hmap *todo, const struct ofproto_dpif *ofproto,<br>
}<br>
<br>
static void<br>
-handle_upcalls(struct handler *handler, struct list *upcalls)<br>
+handle_upcalls(struct handler *handler, struct upcall *upcalls,<br>
+ size_t n_upcalls)<br>
{<br>
struct hmap misses = HMAP_INITIALIZER(&misses);<br>
struct udpif *udpif = handler->udpif;<br>
@@ -902,7 +763,6 @@ handle_upcalls(struct handler *handler, struct list *upcalls)<br>
struct dpif_op *opsp[FLOW_MISS_MAX_BATCH * 2];<br>
struct dpif_op ops[FLOW_MISS_MAX_BATCH * 2];<br>
struct flow_miss *miss, *next_miss;<br>
- struct upcall *upcall, *next;<br>
size_t n_misses, n_ops, i;<br>
unsigned int flow_limit;<br>
bool fail_open, may_put;<br>
@@ -931,7 +791,8 @@ handle_upcalls(struct handler *handler, struct list *upcalls)<br>
* datapath flow.)<br>
*/<br>
n_misses = 0;<br>
- LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {<br>
+ for (i = 0; i < n_upcalls; i++) {<br>
+ struct upcall *upcall = &upcalls[i];<br>
struct dpif_upcall *dupcall = &upcall->dpif_upcall;<br>
struct flow_miss *miss = &miss_buf[n_misses];<br>
struct ofpbuf *packet = &dupcall->packet;<br>
@@ -960,8 +821,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)<br>
dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,<br>
NULL);<br>
}<br>
- list_remove(&upcall->list_node);<br>
- upcall_destroy(upcall);<br>
+ upcall_destroy(upcall, false);<br>
continue;<br>
}<br>
<br>
@@ -1043,8 +903,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)<br>
dpif_ipfix_unref(ipfix);<br>
dpif_sflow_unref(sflow);<br>
<br>
- list_remove(&upcall->list_node);<br>
- upcall_destroy(upcall);<br>
+ upcall_destroy(upcall, false);<br>
}<br>
<br>
/* Initialize each 'struct flow_miss's ->xout.<br>
@@ -1087,12 +946,17 @@ handle_upcalls(struct handler *handler, struct list *upcalls)<br>
* The loop fills 'ops' with an array of operations to execute in the<br>
* datapath. */<br>
n_ops = 0;<br>
- LIST_FOR_EACH (upcall, list_node, upcalls) {<br>
+ for (i = 0; i < n_upcalls; i++) {<br>
+ struct upcall *upcall = &upcalls[i];<br>
struct flow_miss *miss = upcall->flow_miss;<br>
struct ofpbuf *packet = &upcall->dpif_upcall.packet;<br>
struct dpif_op *op;<br>
ovs_be16 flow_vlan_tci;<br>
<br>
+ if (!upcall->is_valid) {<br>
+ continue;<br>
+ }<br>
+<br>
/* Save a copy of flow.vlan_tci in case it is changed to<br>
* generate proper mega flow masks for VLAN splinter flows. */<br>
flow_vlan_tci = miss->flow.vlan_tci;<br>
@@ -1166,7 +1030,8 @@ handle_upcalls(struct handler *handler, struct list *upcalls)<br>
<br>
ofpbuf_use_stack(&buf, miss->slow_path_buf,<br>
sizeof miss->slow_path_buf);<br>
- compose_slow_path(udpif, &miss->xout, miss->odp_in_port, &buf);<br>
+ compose_slow_path(udpif, &miss->xout, &miss->flow,<br>
+ miss->odp_in_port, &buf);<br>
op->u.flow_put.actions = buf.data;<br>
op->u.flow_put.actions_len = buf.size;<br>
}<br>
@@ -1201,11 +1066,16 @@ handle_upcalls(struct handler *handler, struct list *upcalls)<br>
*<br>
* Copy packets before they are modified by execution. */<br>
if (fail_open) {<br>
- LIST_FOR_EACH (upcall, list_node, upcalls) {<br>
+ for (i = 0; i < n_upcalls; i++) {<br>
+ struct upcall *upcall = &upcalls[i];<br>
struct flow_miss *miss = upcall->flow_miss;<br>
struct ofpbuf *packet = &upcall->dpif_upcall.packet;<br>
struct ofproto_packet_in *pin;<br>
<br>
+ if (!upcall->is_valid) {<br>
+ continue;<br>
+ }<br>
+<br>
pin = xmalloc(sizeof *pin);<br>
pin->up.packet = xmemdup(packet->data, packet->size);<br>
pin->up.packet_len = packet->size;<br>
@@ -1231,9 +1101,10 @@ handle_upcalls(struct handler *handler, struct list *upcalls)<br>
}<br>
hmap_destroy(&misses);<br>
<br>
- LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {<br>
- list_remove(&upcall->list_node);<br>
- upcall_destroy(upcall);<br>
+ for (i = 0; i < n_upcalls; i++) {<br>
+ if (upcalls[i].is_valid) {<br>
+ upcall_destroy(&upcalls[i], false);<br>
+ }<br>
}<br>
}<br>
<br>
@@ -1328,7 +1199,7 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,<br>
xout.odp_actions.size);<br>
} else {<br>
ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);<br>
- compose_slow_path(udpif, &xout, odp_in_port, &xout_actions);<br>
+ compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);<br>
}<br>
<br>
if (!ofpbuf_equal(&xout_actions, actions)) {<br>
@@ -1529,16 +1400,6 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,<br>
ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);<br>
<br>
ds_put_char(&ds, '\n');<br>
- for (i = 0; i < udpif->n_handlers; i++) {<br>
- struct handler *handler = &udpif->handlers[i];<br>
-<br>
- ovs_mutex_lock(&handler->mutex);<br>
- ds_put_format(&ds, "\t%s: (upcall queue %"PRIuSIZE")\n",<br>
- handler->name, handler->n_upcalls);<br>
- ovs_mutex_unlock(&handler->mutex);<br>
- }<br>
-<br>
- ds_put_char(&ds, '\n');<br>
for (i = 0; i < n_revalidators; i++) {<br>
struct revalidator *revalidator = &udpif->revalidators[i];<br>
<br>
diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c<br>
</div></div>index ccf0b75..08c140b 100644<br>
<div class="">--- a/ofproto/ofproto-dpif-xlate.c<br>
+++ b/ofproto/ofproto-dpif-xlate.c<br>
@@ -1483,8 +1483,9 @@ compose_sample_action(const struct xbridge *xbridge,<br>
actions_offset = nl_msg_start_nested(odp_actions, OVS_SAMPLE_ATTR_ACTIONS);<br>
<br>
odp_port = ofp_port_to_odp_port(xbridge, flow->in_port.ofp_port);<br>
- pid = dpif_port_get_pid(xbridge->dpif, odp_port);<br>
- cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size, odp_actions);<br>
+ pid = dpif_port_get_pid(xbridge->dpif, odp_port, flow_hash_5tuple(flow, 0));<br>
+ cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size,<br>
+ odp_actions);<br>
<br>
nl_msg_end_nested(odp_actions, actions_offset);<br>
nl_msg_end_nested(odp_actions, sample_offset);<br>
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c<br>
</div>index 328b215..00e74c5 100644<br>
--- a/ofproto/ofproto-dpif.c<br>
+++ b/ofproto/ofproto-dpif.c<br>
@@ -472,7 +472,8 @@ type_run(const char *type)<br>
<div class=""><br>
backer->recv_set_enable = true;<br>
<br>
- error = dpif_recv_set(backer->dpif, backer->recv_set_enable);<br>
+ error = dpif_recv_set(backer->dpif, backer->recv_set_enable,<br>
+ n_handlers);<br>
if (error) {<br>
VLOG_ERR("Failed to enable receiving packets in dpif.");<br>
return error;<br>
</div>@@ -482,6 +483,7 @@ type_run(const char *type)<br>
<div class=""> }<br>
<br>
if (backer->recv_set_enable) {<br>
+ dpif_recv_set(backer->dpif, backer->recv_set_enable, n_handlers);<br>
udpif_set_threads(backer->udpif, n_handlers, n_revalidators);<br>
}<br>
<br>
</div>@@ -887,7 +889,7 @@ open_dpif_backer(const char *type, struct dpif_backer **backerp)<br>
<div class=""><br>
shash_add(&all_dpif_backers, type, backer);<br>
<br>
- error = dpif_recv_set(backer->dpif, backer->recv_set_enable);<br>
+ error = dpif_recv_set(backer->dpif, backer->recv_set_enable, n_handlers);<br>
if (error) {<br>
VLOG_ERR("failed to listen on datapath of type %s: %s",<br>
type, ovs_strerror(error));<br>
</div>@@ -932,7 +934,7 @@ check_variable_length_userdata(struct dpif_backer *backer)<br>
<div class="HOEnZb"><div class="h5"> ofpbuf_init(&actions, 64);<br>
start = nl_msg_start_nested(&actions, OVS_ACTION_ATTR_USERSPACE);<br>
nl_msg_put_u32(&actions, OVS_USERSPACE_ATTR_PID,<br>
- dpif_port_get_pid(backer->dpif, ODPP_NONE));<br>
+ dpif_port_get_pid(backer->dpif, ODPP_NONE, 0));<br>
nl_msg_put_unspec_zero(&actions, OVS_USERSPACE_ATTR_USERDATA, 4);<br>
nl_msg_end_nested(&actions, start);<br>
<br>
--<br>
1.7.9.5<br>
<br>
</div></div></blockquote></div><br></div>