[ovs-dev] [PATCH] ofproto-dpif-upcall: Remove the dispatcher thread.
Alex Wang
alexw at nicira.com
Thu Feb 6 07:04:00 UTC 2014
Hey Ben and Pravin,
Could you help review the dpif part and datapath part of the patch
respectively?
I found it really difficult to partition the patch into a series. So, I can
only post the entire patch in one.
Thanks a lot~,
Alex Wang,
On Wed, Feb 5, 2014 at 11:01 PM, Alex Wang <alexw at nicira.com> wrote:
> This commit removes the 'dispatcher' thread by allowing 'handler'
> threads to read upcalls directly from dpif. vport in dpif will
> open netlink sockets for each handler and will use the 5-tuple
> hash from the missed packet to choose which socket (handler) to
> send the upcall.
>
> This patch also significantly simplifies the flow miss handling
> code and brings slight improvement to flow setup rate.
>
> Signed-off-by: Alex Wang <alexw at nicira.com>
>
> ---
> RFC->PATCH
> - use XOR to calculate the 5-tuple hash. this fixes the flow setup
> performance variation issue.
> - replace the malloc of 'struct upcall *' in udpif_upcall_handler()
> by local 'struct upcall' array.
> ---
> datapath/datapath.c | 22 +-
> datapath/vport.c | 128 +++++++++++-
> datapath/vport.h | 25 ++-
> include/linux/openvswitch.h | 9 +-
> lib/dpif-linux.c | 458
> +++++++++++++++++++++++------------------
> lib/dpif-linux.h | 3 +-
> lib/dpif-netdev.c | 9 +-
> lib/dpif-provider.h | 26 ++-
> lib/dpif.c | 38 ++--
> lib/dpif.h | 10 +-
> lib/flow.c | 18 ++
> lib/flow.h | 3 +-
> ofproto/ofproto-dpif-upcall.c | 265 ++++++------------------
> ofproto/ofproto-dpif-xlate.c | 5 +-
> ofproto/ofproto-dpif.c | 8 +-
> 15 files changed, 560 insertions(+), 467 deletions(-)
>
> diff --git a/datapath/datapath.c b/datapath/datapath.c
> index 5f1b34c..3e08e02 100644
> --- a/datapath/datapath.c
> +++ b/datapath/datapath.c
> @@ -242,7 +242,7 @@ void ovs_dp_process_received_packet(struct vport *p,
> struct sk_buff *skb)
> upcall.cmd = OVS_PACKET_CMD_MISS;
> upcall.key = &key;
> upcall.userdata = NULL;
> - upcall.portid = p->upcall_portid;
> + upcall.portid = ovs_vport_find_pid(p, &key);
> ovs_dp_upcall(dp, skb, &upcall);
> consume_skb(skb);
> stats_counter = &stats->n_missed;
> @@ -1239,7 +1239,7 @@ static int ovs_dp_cmd_new(struct sk_buff *skb,
> struct genl_info *info)
> parms.options = NULL;
> parms.dp = dp;
> parms.port_no = OVSP_LOCAL;
> - parms.upcall_portid = nla_get_u32(a[OVS_DP_ATTR_UPCALL_PID]);
> + parms.upcall_pids = a[OVS_DP_ATTR_UPCALL_PID];
>
> ovs_dp_change(dp, a);
>
> @@ -1457,7 +1457,7 @@ static const struct nla_policy
> vport_policy[OVS_VPORT_ATTR_MAX + 1] = {
> [OVS_VPORT_ATTR_STATS] = { .len = sizeof(struct ovs_vport_stats) },
> [OVS_VPORT_ATTR_PORT_NO] = { .type = NLA_U32 },
> [OVS_VPORT_ATTR_TYPE] = { .type = NLA_U32 },
> - [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NLA_U32 },
> + [OVS_VPORT_ATTR_UPCALL_PIDS] = { .type = NLA_UNSPEC },
> [OVS_VPORT_ATTR_OPTIONS] = { .type = NLA_NESTED },
> };
>
> @@ -1492,8 +1492,7 @@ static int ovs_vport_cmd_fill_info(struct vport
> *vport, struct sk_buff *skb,
>
> if (nla_put_u32(skb, OVS_VPORT_ATTR_PORT_NO, vport->port_no) ||
> nla_put_u32(skb, OVS_VPORT_ATTR_TYPE, vport->ops->type) ||
> - nla_put_string(skb, OVS_VPORT_ATTR_NAME,
> vport->ops->get_name(vport)) ||
> - nla_put_u32(skb, OVS_VPORT_ATTR_UPCALL_PID,
> vport->upcall_portid))
> + nla_put_string(skb, OVS_VPORT_ATTR_NAME,
> vport->ops->get_name(vport)))
> goto nla_put_failure;
>
> ovs_vport_get_stats(vport, &vport_stats);
> @@ -1501,6 +1500,9 @@ static int ovs_vport_cmd_fill_info(struct vport
> *vport, struct sk_buff *skb,
> &vport_stats))
> goto nla_put_failure;
>
> + if (ovs_vport_get_upcall_pids(vport, skb))
> + goto nla_put_failure;
> +
> err = ovs_vport_get_options(vport, skb);
> if (err == -EMSGSIZE)
> goto error;
> @@ -1577,8 +1579,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb,
> struct genl_info *info)
> int err;
>
> err = -EINVAL;
> - if (!a[OVS_VPORT_ATTR_NAME] || !a[OVS_VPORT_ATTR_TYPE] ||
> - !a[OVS_VPORT_ATTR_UPCALL_PID])
> + if (!a[OVS_VPORT_ATTR_NAME] || !a[OVS_VPORT_ATTR_TYPE])
> goto exit;
>
> ovs_lock();
> @@ -1615,7 +1616,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb,
> struct genl_info *info)
> parms.options = a[OVS_VPORT_ATTR_OPTIONS];
> parms.dp = dp;
> parms.port_no = port_no;
> - parms.upcall_portid = nla_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);
> + parms.upcall_pids = a[OVS_VPORT_ATTR_UPCALL_PIDS];
>
> vport = new_vport(&parms);
> err = PTR_ERR(vport);
> @@ -1676,8 +1677,9 @@ static int ovs_vport_cmd_set(struct sk_buff *skb,
> struct genl_info *info)
> if (a[OVS_VPORT_ATTR_STATS])
> ovs_vport_set_stats(vport,
> nla_data(a[OVS_VPORT_ATTR_STATS]));
>
> - if (a[OVS_VPORT_ATTR_UPCALL_PID])
> - vport->upcall_portid =
> nla_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);
> + err = ovs_vport_set_upcall_pids(vport,
> a[OVS_VPORT_ATTR_UPCALL_PIDS]);
> + if (err)
> + goto exit_free;
>
> err = ovs_vport_cmd_fill_info(vport, reply, info->snd_portid,
> info->snd_seq, 0, OVS_VPORT_CMD_NEW);
> diff --git a/datapath/vport.c b/datapath/vport.c
> index 7f12acc..3e83ba7 100644
> --- a/datapath/vport.c
> +++ b/datapath/vport.c
> @@ -135,10 +135,12 @@ struct vport *ovs_vport_alloc(int priv_size, const
> struct vport_ops *ops,
>
> vport->dp = parms->dp;
> vport->port_no = parms->port_no;
> - vport->upcall_portid = parms->upcall_portid;
> vport->ops = ops;
> INIT_HLIST_NODE(&vport->dp_hash_node);
>
> + if (ovs_vport_set_upcall_pids(vport, parms->upcall_pids))
> + return ERR_PTR(-EINVAL);
> +
> vport->percpu_stats = alloc_percpu(struct pcpu_tstats);
> if (!vport->percpu_stats) {
> kfree(vport);
> @@ -162,6 +164,7 @@ struct vport *ovs_vport_alloc(int priv_size, const
> struct vport_ops *ops,
> */
> void ovs_vport_free(struct vport *vport)
> {
> + ovs_vport_set_upcall_pids(vport, NULL);
> free_percpu(vport->percpu_stats);
> kfree(vport);
> }
> @@ -348,6 +351,129 @@ int ovs_vport_get_options(const struct vport *vport,
> struct sk_buff *skb)
> return 0;
> }
>
> +static void __vport_pids_destroy(struct vport_pids *pids)
> +{
> + if (pids->pids)
> + kfree(pids->pids);
> +
> + kfree(pids);
> +}
> +
> +static void vport_pids_destroy_rcu_cb(struct rcu_head *rcu)
> +{
> + struct vport_pids *pids = container_of(rcu, struct vport_pids,
> rcu);
> +
> + __vport_pids_destroy(pids);
> +}
> +
> +/**
> + * ovs_vport_set_upcall_pids - set upcall pids for sending upcall.
> + *
> + * @vport: vport to modify.
> + * @pids: new configuration.
> + *
> + * If @pids is non-null, sets the vport's upcall_pids pointer. If
> + * @pids is null, frees the vport's upcall_pids.
> + *
> + * Returns 0 if successful, -EINVAL if @pids cannot be parsed as an array
> + * of U32.
> + */
> +int ovs_vport_set_upcall_pids(struct vport *vport, struct nlattr *pids)
> +{
> + struct vport_pids *old;
> +
> + if (pids && nla_len(pids) % sizeof(u32))
> + return -EINVAL;
> +
> + rcu_read_lock();
> + old = vport->upcall_pids ? ovsl_dereference(vport->upcall_pids)
> + : NULL;
> +
> + if (pids) {
> + struct vport_pids *vport_pids;
> +
> + vport_pids = kmalloc(sizeof *vport_pids, GFP_KERNEL);
> + vport_pids->pids = kmalloc(nla_len(pids), GFP_KERNEL);
> + vport_pids->n_pids = nla_len(pids)
> + / (sizeof *vport_pids->pids);
> + memcpy(vport_pids->pids, nla_data(pids), nla_len(pids));
> +
> + rcu_assign_pointer(vport->upcall_pids, vport_pids);
> + } else if (old) {
> + rcu_assign_pointer(vport->upcall_pids, NULL);
> + }
> +
> + if (old)
> + call_rcu(&old->rcu, vport_pids_destroy_rcu_cb);
> +
> + rcu_read_unlock();
> + return 0;
> +}
> +
> +/**
> + * ovs_vport_get_upcall_pids - get the upcall_pids value.
> + *
> + * @vport: vport from which to retrieve the pids.
> + * @skb: sk_buff where pids should be appended.
> + *
> + * Retrieves the configuration of the given vport, appending the
> + * %OVS_VPORT_ATTR_UPCALL_PIDS attribute which is the array of upcall
> + * pids to @skb.
> + *
> + * Returns 0 if successful, -EMSGSIZE if @skb has insufficient room.
> + * If an error occurs, @skb is left unmodified.
> + */
> +int ovs_vport_get_upcall_pids(const struct vport *vport, struct sk_buff
> *skb)
> +{
> + struct vport_pids *pids;
> + int err = 0;
> +
> + rcu_read_lock();
> + pids = ovsl_dereference(vport->upcall_pids);
> +
> + if (!pids)
> + goto exit;
> +
> + if (nla_put(skb, OVS_VPORT_ATTR_UPCALL_PIDS,
> + pids->n_pids * sizeof *pids->pids,
> + (void *) pids->pids)) {
> + err = -EMSGSIZE;
> + goto exit;
> + }
> +
> +exit:
> + rcu_read_unlock();
> + return err;
> +}
> +
> +/**
> + * ovs_vport_find_pid - find the upcall pid to send upcall.
> + *
> + * @vport: vport from which the missed packet is received.
> + * @key: flow keys.
> + *
> + * Calculates the 5-tuple hash from the flow key and finds the upcall pid
> to
> + * send the upcall to.
> + *
> + * Returns the pid of the target socket. Must be called with
> rcu_read_lock.
> + */
> +u32 ovs_vport_find_pid(const struct vport *p, const struct sw_flow_key
> *key)
> +{
> + struct vport_pids *pids;
> + u32 hash;
> +
> + pids = ovsl_dereference(p->upcall_pids);
> +
> + if (!pids)
> + return 0;
> +
> + hash = key->ipv4.addr.src ^ key->ipv4.addr.dst
> + ^ key->ip.proto ^ key->ipv4.tp.src
> + ^ key->ipv4.tp.dst;
> +
> + return pids->pids[jhash((void *) &hash, 4, 0) % pids->n_pids];
> +}
> +
> /**
> * ovs_vport_receive - pass up received packet to the datapath for
> processing
> *
> diff --git a/datapath/vport.h b/datapath/vport.h
> index 18b723e..f11faa9 100644
> --- a/datapath/vport.h
> +++ b/datapath/vport.h
> @@ -50,6 +50,11 @@ void ovs_vport_get_stats(struct vport *, struct
> ovs_vport_stats *);
> int ovs_vport_set_options(struct vport *, struct nlattr *options);
> int ovs_vport_get_options(const struct vport *, struct sk_buff *);
>
> +int ovs_vport_set_upcall_pids(struct vport *, struct nlattr *pids);
> +int ovs_vport_get_upcall_pids(const struct vport *, struct sk_buff *);
> +
> +u32 ovs_vport_find_pid(const struct vport *, const struct sw_flow_key *);
> +
> int ovs_vport_send(struct vport *, struct sk_buff *);
>
> /* The following definitions are for implementers of vport devices: */
> @@ -60,13 +65,25 @@ struct vport_err_stats {
> u64 tx_dropped;
> u64 tx_errors;
> };
> +/**
> + * struct vport_pids - array of netlink pids for a vport.
> + * must be protected by rcu.
> + * @rcu: RCU callback head for deferred destruction.
> + * @n_pids: Size of @upcall_pids array.
> + * @pids: Array storing the Netlink socket pids to use for packets
> received
> + * on this port that miss the flow table.
> + */
> +struct vport_pids {
> + struct rcu_head rcu;
> + u32 n_pids;
> + u32 *pids;
> +};
>
> /**
> * struct vport - one port within a datapath
> * @rcu: RCU callback head for deferred destruction.
> * @dp: Datapath to which this port belongs.
> - * @upcall_portid: The Netlink port to use for packets received on this
> port that
> - * miss the flow table.
> + * @upcall_pids: RCU protected vport_pids array.
> * @port_no: Index into @dp's @ports array.
> * @hash_node: Element in @dev_table hash table in vport.c.
> * @dp_hash_node: Element in @datapath->ports hash table in datapath.c.
> @@ -80,7 +97,7 @@ struct vport_err_stats {
> struct vport {
> struct rcu_head rcu;
> struct datapath *dp;
> - u32 upcall_portid;
> + struct vport_pids __rcu *upcall_pids;
> u16 port_no;
>
> struct hlist_node hash_node;
> @@ -112,7 +129,7 @@ struct vport_parms {
> /* For ovs_vport_alloc(). */
> struct datapath *dp;
> u16 port_no;
> - u32 upcall_portid;
> + struct nlattr *upcall_pids;
> };
>
> /**
> diff --git a/include/linux/openvswitch.h b/include/linux/openvswitch.h
> index 5137c2f..e1fe92d 100644
> --- a/include/linux/openvswitch.h
> +++ b/include/linux/openvswitch.h
> @@ -225,9 +225,9 @@ enum ovs_vport_type {
> * this is the name of the network device. Maximum length %IFNAMSIZ-1
> bytes
> * plus a null terminator.
> * @OVS_VPORT_ATTR_OPTIONS: Vport-specific configuration information.
> - * @OVS_VPORT_ATTR_UPCALL_PID: The Netlink socket in userspace that
> - * OVS_PACKET_CMD_MISS upcalls will be directed to for packets received on
> - * this port. A value of zero indicates that upcalls should not be sent.
> + * @OVS_VPORT_ATTR_UPCALL_PIDS: The array of Netlink socket pids in
> userspace
> + * that OVS_PACKET_CMD_MISS upcalls will be directed to for packets
> received on
> + * this port. If this is not specified, upcalls should not be sent.
> * @OVS_VPORT_ATTR_STATS: A &struct ovs_vport_stats giving statistics for
> * packets sent or received through the vport.
> *
> @@ -251,7 +251,8 @@ enum ovs_vport_attr {
> OVS_VPORT_ATTR_TYPE, /* u32 OVS_VPORT_TYPE_* constant. */
> OVS_VPORT_ATTR_NAME, /* string name, up to IFNAMSIZ bytes long
> */
> OVS_VPORT_ATTR_OPTIONS, /* nested attributes, varies by vport type
> */
> - OVS_VPORT_ATTR_UPCALL_PID, /* u32 Netlink PID to receive upcalls */
> + OVS_VPORT_ATTR_UPCALL_PIDS, /* array of u32 Netlink socket PIDs
> for */
> + /* receiving upcalls */
> OVS_VPORT_ATTR_STATS, /* struct ovs_vport_stats */
> __OVS_VPORT_ATTR_MAX
> };
> diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
> index f7f5292..86e0e97 100644
> --- a/lib/dpif-linux.c
> +++ b/lib/dpif-linux.c
> @@ -35,6 +35,7 @@
> #include "bitmap.h"
> #include "dpif-provider.h"
> #include "dynamic-string.h"
> +#include "fat-rwlock.h"
> #include "flow.h"
> #include "netdev.h"
> #include "netdev-linux.h"
> @@ -132,7 +133,15 @@ struct dpif_channel {
> long long int last_poll; /* Last time this channel was polled. */
> };
>
> -static void report_loss(struct dpif *, struct dpif_channel *);
> +static void report_loss(struct dpif *, struct dpif_channel *, uint32_t
> ch_idx,
> + uint32_t handler_id);
> +
> +struct dpif_epoll {
> + struct epoll_event *epoll_events;
> + int epoll_fd; /* epoll fd that includes channel socks.
> */
> + int n_events; /* Num events returned by epoll_wait(). */
> + int event_offset; /* Offset into 'epoll_events'. */
> +};
>
> /* Datapath interface for the openvswitch Linux kernel module. */
> struct dpif_linux {
> @@ -140,13 +149,11 @@ struct dpif_linux {
> int dp_ifindex;
>
> /* Upcall messages. */
> - struct ovs_mutex upcall_lock;
> + struct fat_rwlock upcall_lock;
> int uc_array_size; /* Size of 'channels' and 'epoll_events'.
> */
> - struct dpif_channel *channels;
> - struct epoll_event *epoll_events;
> - int epoll_fd; /* epoll fd that includes channel socks.
> */
> - int n_events; /* Num events returned by epoll_wait(). */
> - int event_offset; /* Offset into 'epoll_events'. */
> + struct dpif_epoll *epolls;
> + struct dpif_channel **channels;/* Array of channel arrays for each
> vport. */
> + uint32_t n_handlers; /* Num of upcall receivers (handlers).
> */
>
> /* Change notification. */
> struct nl_sock *port_notifier; /* vport multicast group subscriber. */
> @@ -171,8 +178,8 @@ static unsigned int ovs_vport_mcgroup;
> static int dpif_linux_init(void);
> static int open_dpif(const struct dpif_linux_dp *, struct dpif **);
> static uint32_t dpif_linux_port_get_pid(const struct dpif *,
> - odp_port_t port_no);
> -static int dpif_linux_refresh_channels(struct dpif *);
> + odp_port_t port_no, uint32_t
> hash);
> +static int dpif_linux_refresh_channels(struct dpif *, uint32_t
> n_handlers);
>
> static void dpif_linux_vport_to_ofpbuf(const struct dpif_linux_vport *,
> struct ofpbuf *);
> @@ -252,8 +259,7 @@ open_dpif(const struct dpif_linux_dp *dp, struct dpif
> **dpifp)
>
> dpif = xzalloc(sizeof *dpif);
> dpif->port_notifier = NULL;
> - ovs_mutex_init(&dpif->upcall_lock);
> - dpif->epoll_fd = -1;
> + fat_rwlock_init(&dpif->upcall_lock);
>
> dpif_init(&dpif->dpif, &dpif_linux_class, dp->name,
> dp->dp_ifindex, dp->dp_ifindex);
> @@ -265,63 +271,43 @@ open_dpif(const struct dpif_linux_dp *dp, struct
> dpif **dpifp)
> }
>
> static void
> -destroy_channels(struct dpif_linux *dpif)
> +channels_to_pids(struct dpif_channel *ch, uint32_t len, uint32_t
> **upcall_pids)
> {
> - unsigned int i;
> + size_t i;
> + uint32_t *pids;
>
> - if (dpif->epoll_fd < 0) {
> + if (!upcall_pids) {
> return;
> }
>
> - for (i = 0; i < dpif->uc_array_size; i++ ) {
> - struct dpif_linux_vport vport_request;
> - struct dpif_channel *ch = &dpif->channels[i];
> - uint32_t upcall_pid = 0;
> -
> - if (!ch->sock) {
> - continue;
> - }
> -
> - epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock),
> NULL);
> -
> - /* Turn off upcalls. */
> - dpif_linux_vport_init(&vport_request);
> - vport_request.cmd = OVS_VPORT_CMD_SET;
> - vport_request.dp_ifindex = dpif->dp_ifindex;
> - vport_request.port_no = u32_to_odp(i);
> - vport_request.upcall_pid = &upcall_pid;
> - dpif_linux_vport_transact(&vport_request, NULL, NULL);
> -
> - nl_sock_destroy(ch->sock);
> + pids = xmalloc(len * sizeof *pids);
> + for (i = 0; i < len; i++) {
> + pids[i] = nl_sock_pid(ch[i].sock);
> }
> -
> - free(dpif->channels);
> - dpif->channels = NULL;
> - dpif->uc_array_size = 0;
> -
> - free(dpif->epoll_events);
> - dpif->epoll_events = NULL;
> - dpif->n_events = dpif->event_offset = 0;
> -
> - /* Don't close dpif->epoll_fd since that would cause other threads
> that
> - * call dpif_recv_wait(dpif) to wait on an arbitrary fd or a closed
> fd. */
> + *upcall_pids = pids;
> }
>
> +/* Adds 'dpif->n_handlers' channels to vport. If upcall_pids is non-NULL,
> + * makes it point to the array of channel socket pids. */
> static int
> -add_channel(struct dpif_linux *dpif, odp_port_t port_no, struct nl_sock
> *sock)
> +add_vport_channels(struct dpif_linux *dpif, odp_port_t port_no,
> + uint32_t **upcall_pids)
> {
> struct epoll_event event;
> + struct dpif_epoll *epolls = dpif->epolls;
> uint32_t port_idx = odp_to_u32(port_no);
> + uint32_t *pids = NULL;
> + int error = 0;
> + size_t i, j;
>
> - if (dpif->epoll_fd < 0) {
> + if (epolls == NULL) {
> return 0;
> }
>
> /* We assume that the datapath densely chooses port numbers, which
> - * can therefore be used as an index into an array of channels. */
> + * can therefore be used as an index into dpif->channels. */
> if (port_idx >= dpif->uc_array_size) {
> uint32_t new_size = port_idx + 1;
> - uint32_t i;
>
> if (new_size > MAX_PORTS) {
> VLOG_WARN_RL(&error_rl, "%s: datapath port %"PRIu32" too big",
> @@ -332,49 +318,117 @@ add_channel(struct dpif_linux *dpif, odp_port_t
> port_no, struct nl_sock *sock)
> dpif->channels = xrealloc(dpif->channels,
> new_size * sizeof *dpif->channels);
> for (i = dpif->uc_array_size; i < new_size; i++) {
> - dpif->channels[i].sock = NULL;
> + dpif->channels[i] = NULL;
> + }
> + for (i = 0; i < dpif->n_handlers; i++) {
> + epolls[i].epoll_events = xrealloc(epolls[i].epoll_events,
> + new_size * sizeof
> + *epolls[i].epoll_events);
> }
> -
> - dpif->epoll_events = xrealloc(dpif->epoll_events,
> - new_size * sizeof
> *dpif->epoll_events);
> dpif->uc_array_size = new_size;
> }
>
> memset(&event, 0, sizeof event);
> event.events = EPOLLIN;
> event.data.u32 = port_idx;
> - if (epoll_ctl(dpif->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
> - &event) < 0) {
> - return errno;
> - }
>
> - nl_sock_destroy(dpif->channels[port_idx].sock);
> - dpif->channels[port_idx].sock = sock;
> - dpif->channels[port_idx].last_poll = LLONG_MIN;
> + /* Creates channel for each upcall handler. */
> + dpif->channels[port_idx] = xzalloc(dpif->n_handlers
> + * sizeof
> *dpif->channels[port_idx]);
> + for (i = 0; i < dpif->n_handlers; i++) {
> + struct nl_sock *sock = NULL;
> +
> + error = nl_sock_create(NETLINK_GENERIC, &sock);
> + if (error) {
> + goto error;
> + }
> +
> + if (epoll_ctl(epolls[i].epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
> + &event) < 0) {
> + error = errno;
> + goto error;
> + }
> + dpif->channels[port_idx][i].sock = sock;
> + dpif->channels[port_idx][i].last_poll = LLONG_MIN;
> + }
> + channels_to_pids(dpif->channels[port_idx], dpif->n_handlers,
> upcall_pids);
>
> return 0;
> +
> +error:
> + /* Cleans up the already created channel and socks. */
> + for (j = 0; j < i; j++) {
> + nl_sock_destroy(dpif->channels[port_idx][j].sock);
> + }
> + free(pids);
> + free(dpif->channels[port_idx]);
> + dpif->channels[port_idx] = NULL;
> +
> + return error;
> }
>
> static void
> -del_channel(struct dpif_linux *dpif, odp_port_t port_no)
> +del_vport_channels(struct dpif_linux *dpif, odp_port_t port_no)
> {
> struct dpif_channel *ch;
> uint32_t port_idx = odp_to_u32(port_no);
> + size_t i;
> +
> + if (!dpif->epolls || port_idx >= dpif->uc_array_size) {
> + return;
> + }
>
> - if (dpif->epoll_fd < 0 || port_idx >= dpif->uc_array_size) {
> + ch = dpif->channels[port_idx];
> + if (!ch) {
> return;
> }
>
> - ch = &dpif->channels[port_idx];
> - if (!ch->sock) {
> + for (i = 0; i < dpif->n_handlers; i++) {
> + epoll_ctl(dpif->epolls[i].epoll_fd, EPOLL_CTL_DEL,
> + nl_sock_fd(ch[i].sock), NULL);
> + nl_sock_destroy(ch[i].sock);
> + dpif->epolls[i].event_offset = dpif->epolls[i].n_events = 0;
> + }
> + free(ch);
> + dpif->channels[port_idx] = NULL;
> +}
> +
> +static void
> +destroy_all_channels(struct dpif_linux *dpif)
> +{
> + unsigned int i;
> +
> + if (!dpif->epolls) {
> return;
> }
>
> - epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock), NULL);
> - dpif->event_offset = dpif->n_events = 0;
> + for (i = 0; i < dpif->uc_array_size; i++ ) {
> + struct dpif_linux_vport vport_request;
> + struct dpif_channel *ch = dpif->channels[i];
> +
> + if (!ch->sock) {
> + continue;
> + }
> +
> + /* Turn off upcalls. */
> + dpif_linux_vport_init(&vport_request);
> + vport_request.cmd = OVS_VPORT_CMD_SET;
> + vport_request.dp_ifindex = dpif->dp_ifindex;
> + vport_request.port_no = u32_to_odp(i);
> + vport_request.upcall_pids = NULL;
> + dpif_linux_vport_transact(&vport_request, NULL, NULL);
> +
> + del_vport_channels(dpif, u32_to_odp(i));
> + }
> +
> + free(dpif->channels);
> + dpif->channels = NULL;
> + dpif->uc_array_size = 0;
> +
> + free(dpif->epolls);
>
> - nl_sock_destroy(ch->sock);
> - ch->sock = NULL;
> + /* Don't close dpif->epoll_fd since that would cause other threads
> that
> + * call dpif_recv_wait(dpif) to wait on an arbitrary fd or a closed
> fd. */
> }
>
> static void
> @@ -383,11 +437,8 @@ dpif_linux_close(struct dpif *dpif_)
> struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>
> nl_sock_destroy(dpif->port_notifier);
> - destroy_channels(dpif);
> - if (dpif->epoll_fd >= 0) {
> - close(dpif->epoll_fd);
> - }
> - ovs_mutex_destroy(&dpif->upcall_lock);
> + destroy_all_channels(dpif);
> + fat_rwlock_destroy(&dpif->upcall_lock);
> free(dpif);
> }
>
> @@ -409,7 +460,7 @@ dpif_linux_run(struct dpif *dpif_)
> struct dpif_linux *dpif = dpif_linux_cast(dpif_);
> if (dpif->refresh_channels) {
> dpif->refresh_channels = false;
> - dpif_linux_refresh_channels(dpif_);
> + dpif_linux_refresh_channels(dpif_, dpif->n_handlers);
> }
> }
>
> @@ -500,20 +551,12 @@ dpif_linux_port_add__(struct dpif *dpif_, struct
> netdev *netdev,
> namebuf, sizeof
> namebuf);
> const char *type = netdev_get_type(netdev);
> struct dpif_linux_vport request, reply;
> - struct nl_sock *sock = NULL;
> - uint32_t upcall_pid;
> struct ofpbuf *buf;
> uint64_t options_stub[64 / 8];
> struct ofpbuf options;
> + uint32_t *upcall_pids = NULL;
> int error;
>
> - if (dpif->epoll_fd >= 0) {
> - error = nl_sock_create(NETLINK_GENERIC, &sock);
> - if (error) {
> - return error;
> - }
> - }
> -
> dpif_linux_vport_init(&request);
> request.cmd = OVS_VPORT_CMD_NEW;
> request.dp_ifindex = dpif->dp_ifindex;
> @@ -522,7 +565,6 @@ dpif_linux_port_add__(struct dpif *dpif_, struct
> netdev *netdev,
> VLOG_WARN_RL(&error_rl, "%s: cannot create port `%s' because it
> has "
> "unsupported type `%s'",
> dpif_name(dpif_), name, type);
> - nl_sock_destroy(sock);
> return EINVAL;
> }
> request.name = name;
> @@ -541,41 +583,42 @@ dpif_linux_port_add__(struct dpif *dpif_, struct
> netdev *netdev,
> }
>
> request.port_no = *port_nop;
> - upcall_pid = sock ? nl_sock_pid(sock) : 0;
> - request.upcall_pid = &upcall_pid;
> + request.upcall_pids = NULL;
>
> error = dpif_linux_vport_transact(&request, &reply, &buf);
> if (!error) {
> *port_nop = reply.port_no;
> - VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid %"PRIu32,
> - dpif_name(dpif_), reply.port_no, upcall_pid);
> } else {
> if (error == EBUSY && *port_nop != ODPP_NONE) {
> VLOG_INFO("%s: requested port %"PRIu32" is in use",
> dpif_name(dpif_), *port_nop);
> }
> - nl_sock_destroy(sock);
> ofpbuf_delete(buf);
> return error;
> }
> ofpbuf_delete(buf);
>
> - if (sock) {
> - error = add_channel(dpif, *port_nop, sock);
> - if (error) {
> - VLOG_INFO("%s: could not add channel for port %s",
> - dpif_name(dpif_), name);
> + if (add_vport_channels(dpif, *port_nop, &upcall_pids)) {
> + VLOG_INFO("%s: could not add channel for port %s",
> + dpif_name(dpif_), name);
>
> - /* Delete the port. */
> - dpif_linux_vport_init(&request);
> - request.cmd = OVS_VPORT_CMD_DEL;
> - request.dp_ifindex = dpif->dp_ifindex;
> - request.port_no = *port_nop;
> - dpif_linux_vport_transact(&request, NULL, NULL);
> + /* Delete the port. */
> + dpif_linux_vport_init(&request);
> + request.cmd = OVS_VPORT_CMD_DEL;
> + request.dp_ifindex = dpif->dp_ifindex;
> + request.port_no = *port_nop;
> + dpif_linux_vport_transact(&request, NULL, NULL);
>
> - nl_sock_destroy(sock);
> - return error;
> - }
> + return error;
> + } else {
> + dpif_linux_vport_init(&request);
> + request.cmd = OVS_VPORT_CMD_SET;
> + request.dp_ifindex = dpif->dp_ifindex;
> + request.port_no = *port_nop;
> + request.n_pids = dpif->n_handlers;
> + request.upcall_pids = upcall_pids;
> + dpif_linux_vport_transact(&request, NULL, NULL);
> + free(upcall_pids);
> }
>
> return 0;
> @@ -588,9 +631,9 @@ dpif_linux_port_add(struct dpif *dpif_, struct netdev
> *netdev,
> struct dpif_linux *dpif = dpif_linux_cast(dpif_);
> int error;
>
> - ovs_mutex_lock(&dpif->upcall_lock);
> + fat_rwlock_wrlock(&dpif->upcall_lock);
> error = dpif_linux_port_add__(dpif_, netdev, port_nop);
> - ovs_mutex_unlock(&dpif->upcall_lock);
> + fat_rwlock_unlock(&dpif->upcall_lock);
>
> return error;
> }
> @@ -608,7 +651,7 @@ dpif_linux_port_del__(struct dpif *dpif_, odp_port_t
> port_no)
> vport.port_no = port_no;
> error = dpif_linux_vport_transact(&vport, NULL, NULL);
>
> - del_channel(dpif, port_no);
> + del_vport_channels(dpif, port_no);
>
> return error;
> }
> @@ -619,9 +662,9 @@ dpif_linux_port_del(struct dpif *dpif_, odp_port_t
> port_no)
> struct dpif_linux *dpif = dpif_linux_cast(dpif_);
> int error;
>
> - ovs_mutex_lock(&dpif->upcall_lock);
> + fat_rwlock_wrlock(&dpif->upcall_lock);
> error = dpif_linux_port_del__(dpif_, port_no);
> - ovs_mutex_unlock(&dpif->upcall_lock);
> + fat_rwlock_unlock(&dpif->upcall_lock);
>
> return error;
> }
> @@ -672,21 +715,26 @@ dpif_linux_port_query_by_name(const struct dpif
> *dpif, const char *devname,
> }
>
> static uint32_t
> -dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no)
> +dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no,
> + uint32_t hash)
> {
> struct dpif_linux *dpif = dpif_linux_cast(dpif_);
> uint32_t port_idx = odp_to_u32(port_no);
> uint32_t pid = 0;
>
> - ovs_mutex_lock(&dpif->upcall_lock);
> - if (dpif->epoll_fd >= 0) {
> + fat_rwlock_wrlock(&dpif->upcall_lock);
> + if (dpif->epolls) {
> /* The ODPP_NONE "reserved" port number uses the "ovs-system"'s
> * channel, since it is not heavily loaded. */
> uint32_t idx = port_idx >= dpif->uc_array_size ? 0 : port_idx;
> - const struct nl_sock *sock = dpif->channels[idx].sock;
> - pid = sock ? nl_sock_pid(sock) : 0;
> + const struct dpif_channel *ch = dpif->channels[idx];
> +
> + if (ch) {
> + pid = ch[hash % dpif->n_handlers].sock
> + ? nl_sock_pid(ch[hash % dpif->n_handlers].sock) : 0;
> + }
> }
> - ovs_mutex_unlock(&dpif->upcall_lock);
> + fat_rwlock_unlock(&dpif->upcall_lock);
>
> return pid;
> }
> @@ -1274,10 +1322,10 @@ dpif_linux_operate(struct dpif *dpif, struct
> dpif_op **ops, size_t n_ops)
> }
>
> /* Synchronizes 'dpif->channels' with the set of vports currently in
> 'dpif' in
> - * the kernel, by adding a new channel for any kernel vport that lacks
> one and
> - * deleting any channels that have no backing kernel vports. */
> + * the kernel, by adding a new set of channels for any kernel vport that
> lacks
> + * one and deleting any channels that have no backing kernel vports. */
> static int
> -dpif_linux_refresh_channels(struct dpif *dpif_)
> +dpif_linux_refresh_channels(struct dpif *dpif_, uint32_t n_handlers)
> {
> struct dpif_linux *dpif = dpif_linux_cast(dpif_);
> unsigned long int *keep_channels;
> @@ -1287,52 +1335,63 @@ dpif_linux_refresh_channels(struct dpif *dpif_)
> int retval = 0;
> size_t i;
>
> - /* To start with, we need an epoll fd. */
> - if (dpif->epoll_fd < 0) {
> - dpif->epoll_fd = epoll_create(10);
> - if (dpif->epoll_fd < 0) {
> - return errno;
> + if (dpif->n_handlers != n_handlers) {
> + destroy_all_channels(dpif);
> + dpif->epolls = xzalloc(n_handlers * sizeof *dpif->epolls);
> +
> + for (i = 0; i < n_handlers; i++) {
> + dpif->epolls[i].epoll_fd = epoll_create(dpif->uc_array_size ?
> + dpif->uc_array_size :
> 10);
> + if (dpif->epolls[i].epoll_fd < 0) {
> + return errno;
> + }
> }
> + dpif->n_handlers = n_handlers;
> + }
> +
> + for (i = 0; i < n_handlers; i++) {
> + dpif->epolls[i].event_offset = dpif->epolls[i].n_events = 0;
> }
>
> keep_channels_nbits = dpif->uc_array_size;
> keep_channels = bitmap_allocate(keep_channels_nbits);
>
> - dpif->n_events = dpif->event_offset = 0;
> -
> dpif_linux_port_dump_start__(dpif_, &dump);
> while (!dpif_linux_port_dump_next__(dpif_, &dump, &vport)) {
> uint32_t port_no = odp_to_u32(vport.port_no);
> - struct nl_sock *sock = (port_no < dpif->uc_array_size
> - ? dpif->channels[port_no].sock
> - : NULL);
> - bool new_sock = !sock;
> + struct dpif_channel *ch = (port_no < dpif->uc_array_size
> + ? dpif->channels[port_no]
> + : NULL);
> + uint32_t *upcall_pids = NULL;
> int error;
>
> - if (new_sock) {
> - error = nl_sock_create(NETLINK_GENERIC, &sock);
> + if (!ch) {
> + error = add_vport_channels(dpif, vport.port_no, &upcall_pids);
> if (error) {
> + VLOG_INFO("%s: could not add channels for port %s",
> + dpif_name(dpif_), vport.name);
> retval = error;
> goto error;
> }
> + } else {
> + channels_to_pids(ch, dpif->n_handlers, &upcall_pids);
> }
>
> /* Configure the vport to deliver misses to 'sock'. */
> - if (!vport.upcall_pid || *vport.upcall_pid != nl_sock_pid(sock)) {
> - uint32_t upcall_pid = nl_sock_pid(sock);
> + if (!vport.upcall_pids
> + || vport.n_pids != dpif->n_handlers
> + || memcmp(upcall_pids, vport.upcall_pids, n_handlers * sizeof
> + *upcall_pids)) {
> struct dpif_linux_vport vport_request;
>
> dpif_linux_vport_init(&vport_request);
> vport_request.cmd = OVS_VPORT_CMD_SET;
> vport_request.dp_ifindex = dpif->dp_ifindex;
> vport_request.port_no = vport.port_no;
> - vport_request.upcall_pid = &upcall_pid;
> + vport_request.n_pids = dpif->n_handlers;
> + vport_request.upcall_pids = upcall_pids;
> error = dpif_linux_vport_transact(&vport_request, NULL, NULL);
> - if (!error) {
> - VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid
> %"PRIu32,
> - dpif_name(&dpif->dpif), vport_request.port_no,
> - upcall_pid);
> - } else {
> + if (error) {
> VLOG_WARN_RL(&error_rl,
> "%s: failed to set upcall pid on port: %s",
> dpif_name(&dpif->dpif), ovs_strerror(error));
> @@ -1348,31 +1407,22 @@ dpif_linux_refresh_channels(struct dpif *dpif_)
> }
> }
>
> - if (new_sock) {
> - error = add_channel(dpif, vport.port_no, sock);
> - if (error) {
> - VLOG_INFO("%s: could not add channel for port %s",
> - dpif_name(dpif_), vport.name);
> - retval = error;
> - goto error;
> - }
> - }
> -
> if (port_no < keep_channels_nbits) {
> bitmap_set1(keep_channels, port_no);
> }
> continue;
>
> error:
> - nl_sock_destroy(sock);
> + del_vport_channels(dpif, vport.port_no);
> }
> nl_dump_done(&dump);
>
> /* Discard any saved channels that we didn't reuse. */
> for (i = 0; i < keep_channels_nbits; i++) {
> if (!bitmap_is_set(keep_channels, i)) {
> - nl_sock_destroy(dpif->channels[i].sock);
> - dpif->channels[i].sock = NULL;
> + del_vport_channels(dpif, u32_to_odp(i));
> + free(dpif->channels[i]);
> + dpif->channels[i] = NULL;
> }
> }
> free(keep_channels);
> @@ -1381,29 +1431,32 @@ dpif_linux_refresh_channels(struct dpif *dpif_)
> }
>
> static int
> -dpif_linux_recv_set__(struct dpif *dpif_, bool enable)
> +dpif_linux_recv_set__(struct dpif *dpif_, bool enable, uint32_t
> n_handlers)
> {
> struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>
> - if ((dpif->epoll_fd >= 0) == enable) {
> + if ((dpif->epolls != NULL) == enable) {
> + if (enable && dpif->n_handlers != n_handlers) {
> + dpif_linux_refresh_channels(dpif_, n_handlers);
> + }
> return 0;
> } else if (!enable) {
> - destroy_channels(dpif);
> + destroy_all_channels(dpif);
> return 0;
> } else {
> - return dpif_linux_refresh_channels(dpif_);
> + return dpif_linux_refresh_channels(dpif_, n_handlers);
> }
> }
>
> static int
> -dpif_linux_recv_set(struct dpif *dpif_, bool enable)
> +dpif_linux_recv_set(struct dpif *dpif_, bool enable, uint32_t n_handlers)
> {
> struct dpif_linux *dpif = dpif_linux_cast(dpif_);
> int error;
>
> - ovs_mutex_lock(&dpif->upcall_lock);
> - error = dpif_linux_recv_set__(dpif_, enable);
> - ovs_mutex_unlock(&dpif->upcall_lock);
> + fat_rwlock_wrlock(&dpif->upcall_lock);
> + error = dpif_linux_recv_set__(dpif_, enable, n_handlers);
> + fat_rwlock_unlock(&dpif->upcall_lock);
>
> return error;
> }
> @@ -1482,38 +1535,39 @@ parse_odp_packet(struct ofpbuf *buf, struct
> dpif_upcall *upcall,
> }
>
> static int
> -dpif_linux_recv__(struct dpif *dpif_, struct dpif_upcall *upcall,
> - struct ofpbuf *buf)
> +dpif_linux_recv__(struct dpif *dpif_, uint32_t handler_id,
> + struct dpif_upcall *upcall, struct ofpbuf *buf)
> {
> struct dpif_linux *dpif = dpif_linux_cast(dpif_);
> + struct dpif_epoll *epoll = dpif->epolls ? &dpif->epolls[handler_id] :
> NULL;
> int read_tries = 0;
>
> - if (dpif->epoll_fd < 0) {
> - return EAGAIN;
> + if (!epoll) {
> + return EAGAIN;
> }
>
> - if (dpif->event_offset >= dpif->n_events) {
> + if (epoll->event_offset >= epoll->n_events) {
> int retval;
>
> - dpif->event_offset = dpif->n_events = 0;
> + epoll->event_offset = epoll->n_events = 0;
>
> do {
> - retval = epoll_wait(dpif->epoll_fd, dpif->epoll_events,
> + retval = epoll_wait(epoll->epoll_fd, epoll->epoll_events,
> dpif->uc_array_size, 0);
> } while (retval < 0 && errno == EINTR);
> if (retval < 0) {
> static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
> VLOG_WARN_RL(&rl, "epoll_wait failed (%s)",
> ovs_strerror(errno));
> } else if (retval > 0) {
> - dpif->n_events = retval;
> + epoll->n_events = retval;
> }
> }
>
> - while (dpif->event_offset < dpif->n_events) {
> - int idx = dpif->epoll_events[dpif->event_offset].data.u32;
> - struct dpif_channel *ch = &dpif->channels[idx];
> + while (epoll->event_offset < epoll->n_events) {
> + int idx = epoll->epoll_events[epoll->event_offset].data.u32;
> + struct dpif_channel *ch = &dpif->channels[idx][handler_id];
>
> - dpif->event_offset++;
> + epoll->event_offset++;
>
> for (;;) {
> int dp_ifindex;
> @@ -1529,7 +1583,7 @@ dpif_linux_recv__(struct dpif *dpif_, struct
> dpif_upcall *upcall,
> * packets that the buffer overflowed. Try again
> * immediately because there's almost certainly a packet
> * waiting for us. */
> - report_loss(dpif_, ch);
> + report_loss(dpif_, ch, idx, handler_id);
> continue;
> }
>
> @@ -1554,29 +1608,29 @@ dpif_linux_recv__(struct dpif *dpif_, struct
> dpif_upcall *upcall,
> }
>
> static int
> -dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall,
> - struct ofpbuf *buf)
> +dpif_linux_recv(struct dpif *dpif_, uint32_t handler_id,
> + struct dpif_upcall *upcall, struct ofpbuf *buf)
> {
> struct dpif_linux *dpif = dpif_linux_cast(dpif_);
> int error;
>
> - ovs_mutex_lock(&dpif->upcall_lock);
> - error = dpif_linux_recv__(dpif_, upcall, buf);
> - ovs_mutex_unlock(&dpif->upcall_lock);
> + fat_rwlock_rdlock(&dpif->upcall_lock);
> + error = dpif_linux_recv__(dpif_, handler_id, upcall, buf);
> + fat_rwlock_unlock(&dpif->upcall_lock);
>
> return error;
> }
>
> static void
> -dpif_linux_recv_wait(struct dpif *dpif_)
> +dpif_linux_recv_wait(struct dpif *dpif_, uint32_t handler_id)
> {
> struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>
> - ovs_mutex_lock(&dpif->upcall_lock);
> - if (dpif->epoll_fd >= 0) {
> - poll_fd_wait(dpif->epoll_fd, POLLIN);
> + fat_rwlock_rdlock(&dpif->upcall_lock);
> + if (dpif->epolls) {
> + poll_fd_wait(dpif->epolls[handler_id].epoll_fd, POLLIN);
> }
> - ovs_mutex_unlock(&dpif->upcall_lock);
> + fat_rwlock_unlock(&dpif->upcall_lock);
> }
>
> static void
> @@ -1584,18 +1638,19 @@ dpif_linux_recv_purge(struct dpif *dpif_)
> {
> struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>
> - ovs_mutex_lock(&dpif->upcall_lock);
> - if (dpif->epoll_fd >= 0) {
> - struct dpif_channel *ch;
> + fat_rwlock_rdlock(&dpif->upcall_lock);
> + if (dpif->epolls) {
> + size_t i, j;
>
> - for (ch = dpif->channels; ch <
> &dpif->channels[dpif->uc_array_size];
> - ch++) {
> - if (ch->sock) {
> - nl_sock_drain(ch->sock);
> + for (i = 0; i < dpif->uc_array_size; i++) {
> + if (dpif->channels[i]) {
> + for (j = 0; j < dpif->n_handlers; j++) {
> + nl_sock_drain(dpif->channels[i][j].sock);
> + }
> }
> }
> }
> - ovs_mutex_unlock(&dpif->upcall_lock);
> + fat_rwlock_unlock(&dpif->upcall_lock);
> }
>
> const struct dpif_class dpif_linux_class = {
> @@ -1701,7 +1756,7 @@ dpif_linux_vport_from_ofpbuf(struct dpif_linux_vport
> *vport,
> [OVS_VPORT_ATTR_PORT_NO] = { .type = NL_A_U32 },
> [OVS_VPORT_ATTR_TYPE] = { .type = NL_A_U32 },
> [OVS_VPORT_ATTR_NAME] = { .type = NL_A_STRING, .max_len =
> IFNAMSIZ },
> - [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NL_A_U32 },
> + [OVS_VPORT_ATTR_UPCALL_PIDS] = { .type = NL_A_UNSPEC, .optional =
> true },
> [OVS_VPORT_ATTR_STATS] = { NL_POLICY_FOR(struct ovs_vport_stats),
> .optional = true },
> [OVS_VPORT_ATTR_OPTIONS] = { .type = NL_A_NESTED, .optional =
> true },
> @@ -1731,9 +1786,12 @@ dpif_linux_vport_from_ofpbuf(struct
> dpif_linux_vport *vport,
> vport->port_no = nl_attr_get_odp_port(a[OVS_VPORT_ATTR_PORT_NO]);
> vport->type = nl_attr_get_u32(a[OVS_VPORT_ATTR_TYPE]);
> vport->name = nl_attr_get_string(a[OVS_VPORT_ATTR_NAME]);
> - if (a[OVS_VPORT_ATTR_UPCALL_PID]) {
> - vport->upcall_pid = nl_attr_get(a[OVS_VPORT_ATTR_UPCALL_PID]);
> + if (a[OVS_VPORT_ATTR_UPCALL_PIDS]) {
> + vport->n_pids = nl_attr_get_size(a[OVS_VPORT_ATTR_UPCALL_PIDS])
> + / (sizeof *vport->upcall_pids);
> + vport->upcall_pids = nl_attr_get(a[OVS_VPORT_ATTR_UPCALL_PIDS]);
> }
> +
> if (a[OVS_VPORT_ATTR_STATS]) {
> vport->stats = nl_attr_get(a[OVS_VPORT_ATTR_STATS]);
> }
> @@ -1770,8 +1828,10 @@ dpif_linux_vport_to_ofpbuf(const struct
> dpif_linux_vport *vport,
> nl_msg_put_string(buf, OVS_VPORT_ATTR_NAME, vport->name);
> }
>
> - if (vport->upcall_pid) {
> - nl_msg_put_u32(buf, OVS_VPORT_ATTR_UPCALL_PID,
> *vport->upcall_pid);
> + if (vport->upcall_pids) {
> + nl_msg_put_unspec(buf, OVS_VPORT_ATTR_UPCALL_PIDS,
> + vport->upcall_pids,
> + vport->n_pids * sizeof *vport->upcall_pids);
> }
>
> if (vport->stats) {
> @@ -2176,9 +2236,9 @@ dpif_linux_flow_get_stats(const struct
> dpif_linux_flow *flow,
> /* Logs information about a packet that was recently lost in 'ch' (in
> * 'dpif_'). */
> static void
> -report_loss(struct dpif *dpif_, struct dpif_channel *ch)
> +report_loss(struct dpif *dpif_, struct dpif_channel *ch, uint32_t ch_idx,
> + uint32_t handler_id)
> {
> - struct dpif_linux *dpif = dpif_linux_cast(dpif_);
> static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
> struct ds s;
>
> @@ -2192,7 +2252,7 @@ report_loss(struct dpif *dpif_, struct dpif_channel
> *ch)
> time_msec() - ch->last_poll);
> }
>
> - VLOG_WARN("%s: lost packet on channel %"PRIdPTR"%s",
> - dpif_name(dpif_), ch - dpif->channels, ds_cstr(&s));
> + VLOG_WARN("%s: lost packet on channel %u of handler %u%s",
> + dpif_name(dpif_), ch_idx, handler_id, ds_cstr(&s));
> ds_destroy(&s);
> }
> diff --git a/lib/dpif-linux.h b/lib/dpif-linux.h
> index ec94ccf..02c7b03 100644
> --- a/lib/dpif-linux.h
> +++ b/lib/dpif-linux.h
> @@ -32,6 +32,7 @@ struct dpif_linux_vport {
>
> /* ovs_vport header. */
> int dp_ifindex;
> + uint32_t n_pids;
> odp_port_t port_no; /* ODPP_NONE if unknown. */
> enum ovs_vport_type type;
>
> @@ -41,7 +42,7 @@ struct dpif_linux_vport {
> * 32-bit boundaries, so use get_unaligned_u64() to access its values.
> */
> const char *name; /* OVS_VPORT_ATTR_NAME. */
> - const uint32_t *upcall_pid; /* OVS_VPORT_ATTR_UPCALL_PID.
> */
> + const uint32_t *upcall_pids; /* OVS_VPORT_ATTR_UPCALL_PIDS.
> */
> const struct ovs_vport_stats *stats; /* OVS_VPORT_ATTR_STATS. */
> const struct nlattr *options; /* OVS_VPORT_ATTR_OPTIONS. */
> size_t options_len;
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 73eb99d..9033fba 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -1429,7 +1429,8 @@ dpif_netdev_execute(struct dpif *dpif, struct
> dpif_execute *execute)
> }
>
> static int
> -dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED)
> +dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED,
> + uint32_t n_handlers OVS_UNUSED)
> {
> return 0;
> }
> @@ -1458,8 +1459,8 @@ find_nonempty_queue(struct dp_netdev *dp)
> }
>
> static int
> -dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
> - struct ofpbuf *buf)
> +dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id OVS_UNUSED,
> + struct dpif_upcall *upcall, struct ofpbuf *buf)
> {
> struct dp_netdev *dp = get_dp_netdev(dpif);
> struct dp_netdev_queue *q;
> @@ -1485,7 +1486,7 @@ dpif_netdev_recv(struct dpif *dpif, struct
> dpif_upcall *upcall,
> }
>
> static void
> -dpif_netdev_recv_wait(struct dpif *dpif)
> +dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id OVS_UNUSED)
> {
> struct dp_netdev *dp = get_dp_netdev(dpif);
> uint64_t seq;
> diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
> index adc5242..7ef10bb 100644
> --- a/lib/dpif-provider.h
> +++ b/lib/dpif-provider.h
> @@ -146,7 +146,8 @@ struct dpif_class {
>
> /* Returns the Netlink PID value to supply in
> OVS_ACTION_ATTR_USERSPACE
> * actions as the OVS_USERSPACE_ATTR_PID attribute's value, for use in
> - * flows whose packets arrived on port 'port_no'.
> + * flows whose packets arrived on port 'port_no' and whose 5-tuple
> + * hash is 'hash'.
> *
> * A 'port_no' of UINT32_MAX should be treated as a special case. The
> * implementation should return a reserved PID, not allocated to any
> port,
> @@ -158,7 +159,8 @@ struct dpif_class {
> *
> * A dpif provider that doesn't have meaningful Netlink PIDs can use
> NULL
> * for this function. This is equivalent to always returning 0. */
> - uint32_t (*port_get_pid)(const struct dpif *dpif, odp_port_t port_no);
> + uint32_t (*port_get_pid)(const struct dpif *dpif, odp_port_t port_no,
> + uint32_t hash);
>
> /* Attempts to begin dumping the ports in a dpif. On success,
> returns 0
> * and initializes '*statep' with any data needed for iteration. On
> @@ -324,16 +326,17 @@ struct dpif_class {
> * Turning packet receive off and then back on is allowed to change
> Netlink
> * PID assignments (see ->port_get_pid()). The client is responsible
> for
> * updating flows as necessary if it does this. */
> - int (*recv_set)(struct dpif *dpif, bool enable);
> + int (*recv_set)(struct dpif *dpif, bool enable, uint32_t n_handlers);
>
> /* Translates OpenFlow queue ID 'queue_id' (in host byte order) into a
> * priority value used for setting packet priority. */
> int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id,
> uint32_t *priority);
>
> - /* Polls for an upcall from 'dpif'. If successful, stores the upcall
> into
> - * '*upcall', using 'buf' for storage. Should only be called if
> 'recv_set'
> - * has been used to enable receiving packets from 'dpif'.
> + /* Polls for an upcall from 'dpif' for handler with 'handler_id'. If
> + * successful, stores the upcall into '*upcall', using 'buf' for
> storage.
> + * Should only be called if 'recv_set' has been used to enable
> receiving
> + * packets from 'dpif'.
> *
> * The implementation should point 'upcall->key' and
> 'upcall->userdata'
> * (if any) into data in the caller-provided 'buf'. The
> implementation may
> @@ -349,12 +352,13 @@ struct dpif_class {
> *
> * This function must not block. If no upcall is pending when it is
> * called, it should return EAGAIN without blocking. */
> - int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall,
> - struct ofpbuf *buf);
> + int (*recv)(struct dpif *dpif, uint32_t handler_id,
> + struct dpif_upcall *upcall, struct ofpbuf *buf);
>
> - /* Arranges for the poll loop to wake up when 'dpif' has a message
> queued
> - * to be received with the recv member function. */
> - void (*recv_wait)(struct dpif *dpif);
> + /* Arranges for the poll loop for handler with 'handler_id' to wake
> up when
> + * 'dpif' has a message queued to be received with the recv member
> + * function by the handler. */
> + void (*recv_wait)(struct dpif *dpif, uint32_t handler_id);
>
> /* Throws away any queued upcalls that 'dpif' currently has ready to
> * return. */
> diff --git a/lib/dpif.c b/lib/dpif.c
> index 2b79a6e..72897b3 100644
> --- a/lib/dpif.c
> +++ b/lib/dpif.c
> @@ -634,7 +634,7 @@ dpif_port_query_by_name(const struct dpif *dpif, const
> char *devname,
>
> /* Returns the Netlink PID value to supply in OVS_ACTION_ATTR_USERSPACE
> actions
> * as the OVS_USERSPACE_ATTR_PID attribute's value, for use in flows whose
> - * packets arrived on port 'port_no'.
> + * packets arrived on port 'port_no' and whose 5-tuple hash is 'hash'.
> *
> * A 'port_no' of ODPP_NONE is a special case: it returns a reserved PID,
> not
> * allocated to any port, that the client may use for special purposes.
> @@ -645,10 +645,10 @@ dpif_port_query_by_name(const struct dpif *dpif,
> const char *devname,
> * update all of the flows that it installed that contain
> * OVS_ACTION_ATTR_USERSPACE actions. */
> uint32_t
> -dpif_port_get_pid(const struct dpif *dpif, odp_port_t port_no)
> +dpif_port_get_pid(const struct dpif *dpif, odp_port_t port_no, uint32_t
> hash)
> {
> return (dpif->dpif_class->port_get_pid
> - ? (dpif->dpif_class->port_get_pid)(dpif, port_no)
> + ? (dpif->dpif_class->port_get_pid)(dpif, port_no, hash)
> : 0);
> }
>
> @@ -1248,24 +1248,18 @@ dpif_upcall_type_to_string(enum dpif_upcall_type
> type)
> }
> }
>
> -/* Enables or disables receiving packets with dpif_recv() on 'dpif'.
> Returns 0
> - * if successful, otherwise a positive errno value.
> - *
> - * Turning packet receive off and then back on may change the Netlink PID
> - * assignments returned by dpif_port_get_pid(). If the client does this,
> it
> - * must update all of the flows that have OVS_ACTION_ATTR_USERSPACE
> actions
> - * using the new PID assignment. */
> int
> -dpif_recv_set(struct dpif *dpif, bool enable)
> +dpif_recv_set(struct dpif *dpif, bool enable, uint32_t n_handlers)
> {
> - int error = dpif->dpif_class->recv_set(dpif, enable);
> + int error = dpif->dpif_class->recv_set(dpif, enable, n_handlers);
> log_operation(dpif, "recv_set", error);
> return error;
> }
>
> -/* Polls for an upcall from 'dpif'. If successful, stores the upcall into
> - * '*upcall', using 'buf' for storage. Should only be called if
> - * dpif_recv_set() has been used to enable receiving packets on 'dpif'.
> +/* Polls for an upcall from 'dpif' for handler with 'handler_id'. If
> + * successful, stores the upcall into '*upcall', using 'buf' for storage.
> + * Should only be called if dpif_recv_set() has been used to enable
> receiving
> + * packets on 'dpif'.
> *
> * 'upcall->key' and 'upcall->userdata' point into data in the
> caller-provided
> * 'buf', so their memory cannot be freed separately from 'buf'.
> @@ -1280,9 +1274,10 @@ dpif_recv_set(struct dpif *dpif, bool enable)
> * Returns 0 if successful, otherwise a positive errno value. Returns
> EAGAIN
> * if no upcall is immediately available. */
> int
> -dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf
> *buf)
> +dpif_recv(struct dpif *dpif, uint32_t handler_id, struct dpif_upcall
> *upcall,
> + struct ofpbuf *buf)
> {
> - int error = dpif->dpif_class->recv(dpif, upcall, buf);
> + int error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
> if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
> struct ds flow;
> char *packet;
> @@ -1316,12 +1311,13 @@ dpif_recv_purge(struct dpif *dpif)
> }
> }
>
> -/* Arranges for the poll loop to wake up when 'dpif' has a message queued
> to be
> - * received with dpif_recv(). */
> +/* Arranges for the poll loop for handler with 'handler_id' to wake up
> when
> + * 'dpif' has a message queued to be received with dpif_recv() by the
> handler.
> + */
> void
> -dpif_recv_wait(struct dpif *dpif)
> +dpif_recv_wait(struct dpif *dpif, uint32_t handler_id)
> {
> - dpif->dpif_class->recv_wait(dpif);
> + dpif->dpif_class->recv_wait(dpif, handler_id);
> }
>
> /* Obtains the NetFlow engine type and engine ID for 'dpif' into
> '*engine_type'
> diff --git a/lib/dpif.h b/lib/dpif.h
> index 7f986f9..b6e3fbd 100644
> --- a/lib/dpif.h
> +++ b/lib/dpif.h
> @@ -445,7 +445,8 @@ int dpif_port_query_by_name(const struct dpif *, const
> char *devname,
> struct dpif_port *);
> int dpif_port_get_name(struct dpif *, odp_port_t port_no,
> char *name, size_t name_size);
> -uint32_t dpif_port_get_pid(const struct dpif *, odp_port_t port_no);
> +uint32_t dpif_port_get_pid(const struct dpif *, odp_port_t port_no,
> + uint32_t hash);
>
> struct dpif_port_dump {
> const struct dpif *dpif;
> @@ -613,10 +614,11 @@ struct dpif_upcall {
> struct nlattr *userdata; /* Argument to OVS_ACTION_ATTR_USERSPACE.
> */
> };
>
> -int dpif_recv_set(struct dpif *, bool enable);
> -int dpif_recv(struct dpif *, struct dpif_upcall *, struct ofpbuf *);
> +int dpif_recv_set(struct dpif *, bool enable, uint32_t n_handlers);
> +int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
> + struct ofpbuf *);
> void dpif_recv_purge(struct dpif *);
> -void dpif_recv_wait(struct dpif *);
> +void dpif_recv_wait(struct dpif *, uint32_t handler_id);
>
> /* Miscellaneous. */
>
> diff --git a/lib/flow.c b/lib/flow.c
> index 06ba036..db29db0 100644
> --- a/lib/flow.c
> +++ b/lib/flow.c
> @@ -822,6 +822,24 @@ flow_wildcards_set_reg_mask(struct flow_wildcards
> *wc, int idx, uint32_t mask)
> wc->masks.regs[idx] = mask;
> }
>
> +/* Calculates the 5-tuple hash from the given flow. */
> +uint32_t
> +flow_hash_5tuple(const struct flow *flow, uint32_t basis)
> +{
> + uint32_t hash;
> +
> + if (!flow) {
> + return 0;
> + }
> +
> + hash = (OVS_FORCE int) flow->nw_src
> + ^ (OVS_FORCE int) flow->nw_dst
> + ^ flow->nw_proto ^ (OVS_FORCE int) flow->tp_src
> + ^ (OVS_FORCE int) flow->tp_dst;
> +
> + return jhash_bytes((void *) &hash, sizeof hash, basis);
> +}
> +
> /* Hashes 'flow' based on its L2 through L4 protocol information. */
> uint32_t
> flow_hash_symmetric_l4(const struct flow *flow, uint32_t basis)
> diff --git a/lib/flow.h b/lib/flow.h
> index 3109a84..26871a2 100644
> --- a/lib/flow.h
> +++ b/lib/flow.h
> @@ -323,7 +323,8 @@ void flow_wildcards_fold_minimask_range(struct
> flow_wildcards *,
> uint32_t flow_wildcards_hash(const struct flow_wildcards *, uint32_t
> basis);
> bool flow_wildcards_equal(const struct flow_wildcards *,
> const struct flow_wildcards *);
> -uint32_t flow_hash_symmetric_l4(const struct flow *flow, uint32_t basis);
> +uint32_t flow_hash_5tuple(const struct flow *, uint32_t basis);
> +uint32_t flow_hash_symmetric_l4(const struct flow *, uint32_t basis);
>
> /* Initialize a flow with random fields that matter for nx_hash_fields. */
> void flow_random_hash_fields(struct flow *);
> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
> index 489012a..e6df302 100644
> --- a/ofproto/ofproto-dpif-upcall.c
> +++ b/ofproto/ofproto-dpif-upcall.c
> @@ -45,26 +45,11 @@
>
> VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
>
> -COVERAGE_DEFINE(upcall_queue_overflow);
> -
> -/* A thread that processes each upcall handed to it by the dispatcher
> thread,
> - * forwards the upcall's packet, and possibly sets up a kernel flow as a
> - * cache. */
> struct handler {
> struct udpif *udpif; /* Parent udpif. */
> pthread_t thread; /* Thread ID. */
> char *name; /* Thread name. */
> -
> - struct ovs_mutex mutex; /* Mutex guarding the following. */
> -
> - /* Atomic queue of unprocessed upcalls. */
> - struct list upcalls OVS_GUARDED;
> - size_t n_upcalls OVS_GUARDED;
> -
> - bool need_signal; /* Only changed by the dispatcher.
> */
> -
> - pthread_cond_t wake_cond; /* Wakes 'thread' while holding
> - 'mutex'. */
> + uint32_t handler_id; /* Handler id. */
> };
>
> /* A thread that processes each kernel flow handed to it by the
> flow_dumper
> @@ -89,9 +74,6 @@ struct revalidator {
> *
> * udpif has two logically separate pieces:
> *
> - * - A "dispatcher" thread that reads upcalls from the kernel and
> dispatches
> - * them to one of several "handler" threads (see struct handler).
> - *
> * - A "flow_dumper" thread that reads the kernel flow table and
> dispatches
> * flows to one of several "revalidator" threads (see struct
> * revalidator). */
> @@ -103,7 +85,6 @@ struct udpif {
>
> uint32_t secret; /* Random seed for upcall hash. */
>
> - pthread_t dispatcher; /* Dispatcher thread ID. */
> pthread_t flow_dumper; /* Flow dumper thread ID. */
>
> struct handler *handlers; /* Upcall handlers. */
> @@ -143,7 +124,7 @@ enum upcall_type {
> };
>
> struct upcall {
> - struct list list_node; /* For queuing upcalls. */
> + bool is_valid;
> struct flow_miss *flow_miss; /* This upcall's flow_miss. */
>
> /* Raw upcall plus data for keeping track of the memory backing it. */
> @@ -216,15 +197,14 @@ struct flow_miss {
> bool put;
> };
>
> -static void upcall_destroy(struct upcall *);
> +static void upcall_destroy(struct upcall *, bool free_upcall);
>
> static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
>
> -static void recv_upcalls(struct udpif *);
> -static void handle_upcalls(struct handler *handler, struct list *upcalls);
> +static void handle_upcalls(struct handler *handler, struct upcall
> *upcalls,
> + size_t n_upcalls);
> static void *udpif_flow_dumper(void *);
> -static void *udpif_dispatcher(void *);
> static void *udpif_upcall_handler(void *);
> static void *udpif_revalidator(void *);
> static uint64_t udpif_get_n_flows(struct udpif *);
> @@ -307,9 +287,6 @@ udpif_set_threads(struct udpif *udpif, size_t
> n_handlers,
> for (i = 0; i < udpif->n_handlers; i++) {
> struct handler *handler = &udpif->handlers[i];
>
> - ovs_mutex_lock(&handler->mutex);
> - xpthread_cond_signal(&handler->wake_cond);
> - ovs_mutex_unlock(&handler->mutex);
> xpthread_join(handler->thread, NULL);
> }
>
> @@ -323,7 +300,6 @@ udpif_set_threads(struct udpif *udpif, size_t
> n_handlers,
> }
>
> xpthread_join(udpif->flow_dumper, NULL);
> - xpthread_join(udpif->dispatcher, NULL);
>
> for (i = 0; i < udpif->n_revalidators; i++) {
> struct revalidator *revalidator = &udpif->revalidators[i];
> @@ -347,17 +323,7 @@ udpif_set_threads(struct udpif *udpif, size_t
> n_handlers,
> }
>
> for (i = 0; i < udpif->n_handlers; i++) {
> - struct handler *handler = &udpif->handlers[i];
> - struct upcall *miss, *next;
> -
> - LIST_FOR_EACH_SAFE (miss, next, list_node, &handler->upcalls)
> {
> - list_remove(&miss->list_node);
> - upcall_destroy(miss);
> - }
> - ovs_mutex_destroy(&handler->mutex);
> -
> - xpthread_cond_destroy(&handler->wake_cond);
> - free(handler->name);
> + free(udpif->handlers[i].name);
> }
> latch_poll(&udpif->exit_latch);
>
> @@ -382,10 +348,7 @@ udpif_set_threads(struct udpif *udpif, size_t
> n_handlers,
> struct handler *handler = &udpif->handlers[i];
>
> handler->udpif = udpif;
> - list_init(&handler->upcalls);
> - handler->need_signal = false;
> - xpthread_cond_init(&handler->wake_cond, NULL);
> - ovs_mutex_init(&handler->mutex);
> + handler->handler_id = i;
> xpthread_create(&handler->thread, NULL, udpif_upcall_handler,
> handler);
> }
> @@ -403,7 +366,6 @@ udpif_set_threads(struct udpif *udpif, size_t
> n_handlers,
> xpthread_create(&revalidator->thread, NULL, udpif_revalidator,
> revalidator);
> }
> - xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher,
> udpif);
> xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper,
> udpif);
> }
> }
> @@ -430,16 +392,9 @@ udpif_get_memory_usage(struct udpif *udpif, struct
> simap *usage)
> {
> size_t i;
>
> - simap_increase(usage, "dispatchers", 1);
> simap_increase(usage, "flow_dumpers", 1);
>
> simap_increase(usage, "handlers", udpif->n_handlers);
> - for (i = 0; i < udpif->n_handlers; i++) {
> - struct handler *handler = &udpif->handlers[i];
> - ovs_mutex_lock(&handler->mutex);
> - simap_increase(usage, "handler upcalls", handler->n_upcalls);
> - ovs_mutex_unlock(&handler->mutex);
> - }
>
> simap_increase(usage, "revalidators", udpif->n_revalidators);
> for (i = 0; i < udpif->n_revalidators; i++) {
> @@ -468,12 +423,16 @@ udpif_flush(void)
>
> /* Destroys and deallocates 'upcall'. */
> static void
> -upcall_destroy(struct upcall *upcall)
> +upcall_destroy(struct upcall *upcall, bool free_upcall)
> {
> if (upcall) {
> ofpbuf_uninit(&upcall->dpif_upcall.packet);
> ofpbuf_uninit(&upcall->upcall_buf);
> - free(upcall);
> +
> + upcall->is_valid = false;
> + if (free_upcall) {
> + free(upcall);
> + }
> }
> }
>
> @@ -499,24 +458,6 @@ udpif_get_n_flows(struct udpif *udpif)
> return flow_count;
> }
>
> -/* The dispatcher thread is responsible for receiving upcalls from the
> kernel,
> - * assigning them to a upcall_handler thread. */
> -static void *
> -udpif_dispatcher(void *arg)
> -{
> - struct udpif *udpif = arg;
> -
> - set_subprogram_name("dispatcher");
> - while (!latch_is_set(&udpif->exit_latch)) {
> - recv_upcalls(udpif);
> - dpif_recv_wait(udpif->dpif);
> - latch_wait(&udpif->exit_latch);
> - poll_block();
> - }
> -
> - return NULL;
> -}
> -
> static void *
> udpif_flow_dumper(void *arg)
> {
> @@ -627,38 +568,44 @@ udpif_flow_dumper(void *arg)
> return NULL;
> }
>
> -/* The miss handler thread is responsible for processing miss upcalls
> retrieved
> - * by the dispatcher thread. Once finished it passes the processed miss
> - * upcalls to ofproto-dpif where they're installed in the datapath. */
> static void *
> udpif_upcall_handler(void *arg)
> {
> struct handler *handler = arg;
> + struct udpif *udpif = handler->udpif;
> + struct upcall upcalls[FLOW_MISS_MAX_BATCH];
>
> handler->name = xasprintf("handler_%u", ovsthread_id_self());
> set_subprogram_name("%s", handler->name);
>
> while (!latch_is_set(&handler->udpif->exit_latch)) {
> - struct list misses = LIST_INITIALIZER(&misses);
> - size_t i;
> -
> - ovs_mutex_lock(&handler->mutex);
> - if (!handler->n_upcalls) {
> - ovs_mutex_cond_wait(&handler->wake_cond, &handler->mutex);
> - }
> + size_t i, n_upcalls;
>
> for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
> - if (handler->n_upcalls) {
> - handler->n_upcalls--;
> - list_push_back(&misses,
> list_pop_front(&handler->upcalls));
> - } else {
> + struct upcall *upcall = &upcalls[i];
> + int error;
> +
> + ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
> + sizeof upcall->upcall_stub);
> + error = dpif_recv(udpif->dpif, handler->handler_id,
> + &upcall->dpif_upcall, &upcall->upcall_buf);
> + if (error) {
> + /* upcall_destroy() can only be called on successfully
> received
> + * upcalls. */
> + ofpbuf_uninit(&upcall->upcall_buf);
> break;
> }
> + upcall->is_valid = true;
> }
> - ovs_mutex_unlock(&handler->mutex);
> -
> - handle_upcalls(handler, &misses);
>
> + n_upcalls = i;
> + if (!n_upcalls) {
> + dpif_recv_wait(udpif->dpif, handler->handler_id);
> + latch_wait(&udpif->exit_latch);
> + poll_block();
> + } else {
> + handle_upcalls(handler, upcalls, n_upcalls);
> + }
> coverage_clear();
> }
>
> @@ -765,98 +712,11 @@ classify_upcall(const struct upcall *upcall)
> }
> }
>
> -static void
> -recv_upcalls(struct udpif *udpif)
> -{
> - int n;
> -
> - for (;;) {
> - uint32_t hash = udpif->secret;
> - struct handler *handler;
> - struct upcall *upcall;
> - size_t n_bytes, left;
> - struct nlattr *nla;
> - int error;
> -
> - upcall = xmalloc(sizeof *upcall);
> - ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
> - sizeof upcall->upcall_stub);
> - error = dpif_recv(udpif->dpif, &upcall->dpif_upcall,
> - &upcall->upcall_buf);
> - if (error) {
> - /* upcall_destroy() can only be called on successfully
> received
> - * upcalls. */
> - ofpbuf_uninit(&upcall->upcall_buf);
> - free(upcall);
> - break;
> - }
> -
> - n_bytes = 0;
> - NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key,
> - upcall->dpif_upcall.key_len) {
> - enum ovs_key_attr type = nl_attr_type(nla);
> - if (type == OVS_KEY_ATTR_IN_PORT
> - || type == OVS_KEY_ATTR_TCP
> - || type == OVS_KEY_ATTR_UDP) {
> - if (nl_attr_get_size(nla) == 4) {
> - hash = mhash_add(hash, nl_attr_get_u32(nla));
> - n_bytes += 4;
> - } else {
> - VLOG_WARN_RL(&rl,
> - "Netlink attribute with incorrect
> size.");
> - }
> - }
> - }
> - hash = mhash_finish(hash, n_bytes);
> -
> - handler = &udpif->handlers[hash % udpif->n_handlers];
> -
> - ovs_mutex_lock(&handler->mutex);
> - if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
> - list_push_back(&handler->upcalls, &upcall->list_node);
> - if (handler->n_upcalls == 0) {
> - handler->need_signal = true;
> - }
> - handler->n_upcalls++;
> - if (handler->need_signal &&
> - handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
> - handler->need_signal = false;
> - xpthread_cond_signal(&handler->wake_cond);
> - }
> - ovs_mutex_unlock(&handler->mutex);
> - if (!VLOG_DROP_DBG(&rl)) {
> - struct ds ds = DS_EMPTY_INITIALIZER;
> -
> - odp_flow_key_format(upcall->dpif_upcall.key,
> - upcall->dpif_upcall.key_len,
> - &ds);
> - VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds));
> - ds_destroy(&ds);
> - }
> - } else {
> - ovs_mutex_unlock(&handler->mutex);
> - COVERAGE_INC(upcall_queue_overflow);
> - upcall_destroy(upcall);
> - }
> - }
> -
> - for (n = 0; n < udpif->n_handlers; ++n) {
> - struct handler *handler = &udpif->handlers[n];
> -
> - if (handler->need_signal) {
> - handler->need_signal = false;
> - ovs_mutex_lock(&handler->mutex);
> - xpthread_cond_signal(&handler->wake_cond);
> - ovs_mutex_unlock(&handler->mutex);
> - }
> - }
> -}
> -
> /* Calculates slow path actions for 'xout'. 'buf' must statically be
> * initialized with at least 128 bytes of space. */
> static void
> compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
> - odp_port_t odp_in_port, struct ofpbuf *buf)
> + struct flow *flow, odp_port_t odp_in_port, struct
> ofpbuf *buf)
> {
> union user_action_cookie cookie;
> odp_port_t port;
> @@ -869,7 +729,7 @@ compose_slow_path(struct udpif *udpif, struct
> xlate_out *xout,
> port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
> ? ODPP_NONE
> : odp_in_port;
> - pid = dpif_port_get_pid(udpif->dpif, port);
> + pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
> odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
> }
>
> @@ -889,7 +749,8 @@ flow_miss_find(struct hmap *todo, const struct
> ofproto_dpif *ofproto,
> }
>
> static void
> -handle_upcalls(struct handler *handler, struct list *upcalls)
> +handle_upcalls(struct handler *handler, struct upcall *upcalls,
> + size_t n_upcalls)
> {
> struct hmap misses = HMAP_INITIALIZER(&misses);
> struct udpif *udpif = handler->udpif;
> @@ -898,7 +759,6 @@ handle_upcalls(struct handler *handler, struct list
> *upcalls)
> struct dpif_op *opsp[FLOW_MISS_MAX_BATCH * 2];
> struct dpif_op ops[FLOW_MISS_MAX_BATCH * 2];
> struct flow_miss *miss, *next_miss;
> - struct upcall *upcall, *next;
> size_t n_misses, n_ops, i;
> unsigned int flow_limit;
> bool fail_open, may_put;
> @@ -927,7 +787,8 @@ handle_upcalls(struct handler *handler, struct list
> *upcalls)
> * datapath flow.)
> */
> n_misses = 0;
> - LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
> + for (i = 0; i < n_upcalls; i++) {
> + struct upcall *upcall = &upcalls[i];
> struct dpif_upcall *dupcall = &upcall->dpif_upcall;
> struct flow_miss *miss = &miss_buf[n_misses];
> struct ofpbuf *packet = &dupcall->packet;
> @@ -956,8 +817,7 @@ handle_upcalls(struct handler *handler, struct list
> *upcalls)
> dupcall->key, dupcall->key_len, NULL, 0,
> NULL, 0,
> NULL);
> }
> - list_remove(&upcall->list_node);
> - upcall_destroy(upcall);
> + upcall_destroy(upcall, false);
> continue;
> }
>
> @@ -1039,8 +899,7 @@ handle_upcalls(struct handler *handler, struct list
> *upcalls)
> dpif_ipfix_unref(ipfix);
> dpif_sflow_unref(sflow);
>
> - list_remove(&upcall->list_node);
> - upcall_destroy(upcall);
> + upcall_destroy(upcall, false);
> }
>
> /* Initialize each 'struct flow_miss's ->xout.
> @@ -1083,12 +942,17 @@ handle_upcalls(struct handler *handler, struct list
> *upcalls)
> * The loop fills 'ops' with an array of operations to execute in the
> * datapath. */
> n_ops = 0;
> - LIST_FOR_EACH (upcall, list_node, upcalls) {
> + for (i = 0; i < n_upcalls; i++) {
> + struct upcall *upcall = &upcalls[i];
> struct flow_miss *miss = upcall->flow_miss;
> struct ofpbuf *packet = &upcall->dpif_upcall.packet;
> struct dpif_op *op;
> ovs_be16 flow_vlan_tci;
>
> + if (!upcall->is_valid) {
> + continue;
> + }
> +
> /* Save a copy of flow.vlan_tci in case it is changed to
> * generate proper mega flow masks for VLAN splinter flows. */
> flow_vlan_tci = miss->flow.vlan_tci;
> @@ -1162,7 +1026,8 @@ handle_upcalls(struct handler *handler, struct list
> *upcalls)
>
> ofpbuf_use_stack(&buf, miss->slow_path_buf,
> sizeof miss->slow_path_buf);
> - compose_slow_path(udpif, &miss->xout, miss->odp_in_port,
> &buf);
> + compose_slow_path(udpif, &miss->xout, &miss->flow,
> + miss->odp_in_port, &buf);
> op->u.flow_put.actions = buf.data;
> op->u.flow_put.actions_len = buf.size;
> }
> @@ -1197,11 +1062,16 @@ handle_upcalls(struct handler *handler, struct
> list *upcalls)
> *
> * Copy packets before they are modified by execution. */
> if (fail_open) {
> - LIST_FOR_EACH (upcall, list_node, upcalls) {
> + for (i = 0; i < n_upcalls; i++) {
> + struct upcall *upcall = &upcalls[i];
> struct flow_miss *miss = upcall->flow_miss;
> struct ofpbuf *packet = &upcall->dpif_upcall.packet;
> struct ofproto_packet_in *pin;
>
> + if (!upcall->is_valid) {
> + continue;
> + }
> +
> pin = xmalloc(sizeof *pin);
> pin->up.packet = xmemdup(packet->data, packet->size);
> pin->up.packet_len = packet->size;
> @@ -1227,9 +1097,10 @@ handle_upcalls(struct handler *handler, struct list
> *upcalls)
> }
> hmap_destroy(&misses);
>
> - LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
> - list_remove(&upcall->list_node);
> - upcall_destroy(upcall);
> + for (i = 0; i < n_upcalls; i++) {
> + if (upcalls[i].is_valid) {
> + upcall_destroy(&upcalls[i], false);
> + }
> }
> }
>
> @@ -1324,7 +1195,7 @@ revalidate_ukey(struct udpif *udpif, struct
> udpif_flow_dump *udump,
> xout.odp_actions.size);
> } else {
> ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof
> slow_path_buf);
> - compose_slow_path(udpif, &xout, odp_in_port, &xout_actions);
> + compose_slow_path(udpif, &xout, &flow, odp_in_port,
> &xout_actions);
> }
>
> if (!ofpbuf_equal(&xout_actions, actions)) {
> @@ -1525,16 +1396,6 @@ upcall_unixctl_show(struct unixctl_conn *conn, int
> argc OVS_UNUSED,
> ds_put_format(&ds, "\tdump duration : %lldms\n",
> udpif->dump_duration);
>
> ds_put_char(&ds, '\n');
> - for (i = 0; i < udpif->n_handlers; i++) {
> - struct handler *handler = &udpif->handlers[i];
> -
> - ovs_mutex_lock(&handler->mutex);
> - ds_put_format(&ds, "\t%s: (upcall queue %"PRIuSIZE")\n",
> - handler->name, handler->n_upcalls);
> - ovs_mutex_unlock(&handler->mutex);
> - }
> -
> - ds_put_char(&ds, '\n');
> for (i = 0; i < n_revalidators; i++) {
> struct revalidator *revalidator = &udpif->revalidators[i];
>
> diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c
> index ad44582..e85aa7a 100644
> --- a/ofproto/ofproto-dpif-xlate.c
> +++ b/ofproto/ofproto-dpif-xlate.c
> @@ -1483,8 +1483,9 @@ compose_sample_action(const struct xbridge *xbridge,
> actions_offset = nl_msg_start_nested(odp_actions,
> OVS_SAMPLE_ATTR_ACTIONS);
>
> odp_port = ofp_port_to_odp_port(xbridge, flow->in_port.ofp_port);
> - pid = dpif_port_get_pid(xbridge->dpif, odp_port);
> - cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size,
> odp_actions);
> + pid = dpif_port_get_pid(xbridge->dpif, odp_port,
> flow_hash_5tuple(flow, 0));
> + cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size,
> + odp_actions);
>
> nl_msg_end_nested(odp_actions, actions_offset);
> nl_msg_end_nested(odp_actions, sample_offset);
> diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
> index 7b3e1eb..a53b3fd 100644
> --- a/ofproto/ofproto-dpif.c
> +++ b/ofproto/ofproto-dpif.c
> @@ -470,7 +470,8 @@ type_run(const char *type)
>
> backer->recv_set_enable = true;
>
> - error = dpif_recv_set(backer->dpif, backer->recv_set_enable);
> + error = dpif_recv_set(backer->dpif, backer->recv_set_enable,
> + n_handlers);
> if (error) {
> VLOG_ERR("Failed to enable receiving packets in dpif.");
> return error;
> @@ -480,6 +481,7 @@ type_run(const char *type)
> }
>
> if (backer->recv_set_enable) {
> + dpif_recv_set(backer->dpif, backer->recv_set_enable, n_handlers);
> udpif_set_threads(backer->udpif, n_handlers, n_revalidators);
> }
>
> @@ -885,7 +887,7 @@ open_dpif_backer(const char *type, struct dpif_backer
> **backerp)
>
> shash_add(&all_dpif_backers, type, backer);
>
> - error = dpif_recv_set(backer->dpif, backer->recv_set_enable);
> + error = dpif_recv_set(backer->dpif, backer->recv_set_enable,
> n_handlers);
> if (error) {
> VLOG_ERR("failed to listen on datapath of type %s: %s",
> type, ovs_strerror(error));
> @@ -930,7 +932,7 @@ check_variable_length_userdata(struct dpif_backer
> *backer)
> ofpbuf_init(&actions, 64);
> start = nl_msg_start_nested(&actions, OVS_ACTION_ATTR_USERSPACE);
> nl_msg_put_u32(&actions, OVS_USERSPACE_ATTR_PID,
> - dpif_port_get_pid(backer->dpif, ODPP_NONE));
> + dpif_port_get_pid(backer->dpif, ODPP_NONE, 0));
> nl_msg_put_unspec_zero(&actions, OVS_USERSPACE_ATTR_USERDATA, 4);
> nl_msg_end_nested(&actions, start);
>
> --
> 1.7.9.5
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.openvswitch.org/pipermail/ovs-dev/attachments/20140205/5a465db8/attachment-0003.html>
More information about the dev
mailing list