[ovs-dev] [PATCH] ofproto-dpif-upcall: Remove the dispatcher thread.

Alex Wang alexw at nicira.com
Thu Feb 6 07:04:00 UTC 2014


Hey Ben and Pravin,

Could you help review the dpif part and datapath part of the patch
respectively?

I found it really difficult to split the patch into a series, so I can
only post the entire change as a single patch.

Thanks a lot~,
Alex Wang,


On Wed, Feb 5, 2014 at 11:01 PM, Alex Wang <alexw at nicira.com> wrote:

> This commit removes the 'dispatcher' thread by allowing 'handler'
> threads to read upcalls directly from dpif.  Each vport in dpif will
> open one netlink socket per handler, and the datapath will use the
> 5-tuple hash of the missed packet to choose which socket (handler)
> the upcall is sent to.
>
> This patch also significantly simplifies the flow miss handling
> code and brings a slight improvement to the flow setup rate.
>
> Signed-off-by: Alex Wang <alexw at nicira.com>
>
> ---
> RFC->PATCH
> - Use XOR to calculate the 5-tuple hash.  This fixes the flow setup
>   performance variation issue.
> - Replace the malloc of 'struct upcall *' in udpif_upcall_handler()
>   with a local 'struct upcall' array.
> ---
>  datapath/datapath.c           |   22 +-
>  datapath/vport.c              |  128 +++++++++++-
>  datapath/vport.h              |   25 ++-
>  include/linux/openvswitch.h   |    9 +-
>  lib/dpif-linux.c              |  458
> +++++++++++++++++++++++------------------
>  lib/dpif-linux.h              |    3 +-
>  lib/dpif-netdev.c             |    9 +-
>  lib/dpif-provider.h           |   26 ++-
>  lib/dpif.c                    |   38 ++--
>  lib/dpif.h                    |   10 +-
>  lib/flow.c                    |   18 ++
>  lib/flow.h                    |    3 +-
>  ofproto/ofproto-dpif-upcall.c |  265 ++++++------------------
>  ofproto/ofproto-dpif-xlate.c  |    5 +-
>  ofproto/ofproto-dpif.c        |    8 +-
>  15 files changed, 560 insertions(+), 467 deletions(-)
>
> diff --git a/datapath/datapath.c b/datapath/datapath.c
> index 5f1b34c..3e08e02 100644
> --- a/datapath/datapath.c
> +++ b/datapath/datapath.c
> @@ -242,7 +242,7 @@ void ovs_dp_process_received_packet(struct vport *p,
> struct sk_buff *skb)
>                 upcall.cmd = OVS_PACKET_CMD_MISS;
>                 upcall.key = &key;
>                 upcall.userdata = NULL;
> -               upcall.portid = p->upcall_portid;
> +               upcall.portid = ovs_vport_find_pid(p, &key);
>                 ovs_dp_upcall(dp, skb, &upcall);
>                 consume_skb(skb);
>                 stats_counter = &stats->n_missed;
> @@ -1239,7 +1239,7 @@ static int ovs_dp_cmd_new(struct sk_buff *skb,
> struct genl_info *info)
>         parms.options = NULL;
>         parms.dp = dp;
>         parms.port_no = OVSP_LOCAL;
> -       parms.upcall_portid = nla_get_u32(a[OVS_DP_ATTR_UPCALL_PID]);
> +       parms.upcall_pids = a[OVS_DP_ATTR_UPCALL_PID];
>
>         ovs_dp_change(dp, a);
>
> @@ -1457,7 +1457,7 @@ static const struct nla_policy
> vport_policy[OVS_VPORT_ATTR_MAX + 1] = {
>         [OVS_VPORT_ATTR_STATS] = { .len = sizeof(struct ovs_vport_stats) },
>         [OVS_VPORT_ATTR_PORT_NO] = { .type = NLA_U32 },
>         [OVS_VPORT_ATTR_TYPE] = { .type = NLA_U32 },
> -       [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NLA_U32 },
> +       [OVS_VPORT_ATTR_UPCALL_PIDS] = { .type = NLA_UNSPEC },
>         [OVS_VPORT_ATTR_OPTIONS] = { .type = NLA_NESTED },
>  };
>
> @@ -1492,8 +1492,7 @@ static int ovs_vport_cmd_fill_info(struct vport
> *vport, struct sk_buff *skb,
>
>         if (nla_put_u32(skb, OVS_VPORT_ATTR_PORT_NO, vport->port_no) ||
>             nla_put_u32(skb, OVS_VPORT_ATTR_TYPE, vport->ops->type) ||
> -           nla_put_string(skb, OVS_VPORT_ATTR_NAME,
> vport->ops->get_name(vport)) ||
> -           nla_put_u32(skb, OVS_VPORT_ATTR_UPCALL_PID,
> vport->upcall_portid))
> +           nla_put_string(skb, OVS_VPORT_ATTR_NAME,
> vport->ops->get_name(vport)))
>                 goto nla_put_failure;
>
>         ovs_vport_get_stats(vport, &vport_stats);
> @@ -1501,6 +1500,9 @@ static int ovs_vport_cmd_fill_info(struct vport
> *vport, struct sk_buff *skb,
>                     &vport_stats))
>                 goto nla_put_failure;
>
> +       if (ovs_vport_get_upcall_pids(vport, skb))
> +               goto nla_put_failure;
> +
>         err = ovs_vport_get_options(vport, skb);
>         if (err == -EMSGSIZE)
>                 goto error;
> @@ -1577,8 +1579,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb,
> struct genl_info *info)
>         int err;
>
>         err = -EINVAL;
> -       if (!a[OVS_VPORT_ATTR_NAME] || !a[OVS_VPORT_ATTR_TYPE] ||
> -           !a[OVS_VPORT_ATTR_UPCALL_PID])
> +       if (!a[OVS_VPORT_ATTR_NAME] || !a[OVS_VPORT_ATTR_TYPE])
>                 goto exit;
>
>         ovs_lock();
> @@ -1615,7 +1616,7 @@ static int ovs_vport_cmd_new(struct sk_buff *skb,
> struct genl_info *info)
>         parms.options = a[OVS_VPORT_ATTR_OPTIONS];
>         parms.dp = dp;
>         parms.port_no = port_no;
> -       parms.upcall_portid = nla_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);
> +       parms.upcall_pids = a[OVS_VPORT_ATTR_UPCALL_PIDS];
>
>         vport = new_vport(&parms);
>         err = PTR_ERR(vport);
> @@ -1676,8 +1677,9 @@ static int ovs_vport_cmd_set(struct sk_buff *skb,
> struct genl_info *info)
>         if (a[OVS_VPORT_ATTR_STATS])
>                 ovs_vport_set_stats(vport,
> nla_data(a[OVS_VPORT_ATTR_STATS]));
>
> -       if (a[OVS_VPORT_ATTR_UPCALL_PID])
> -               vport->upcall_portid =
> nla_get_u32(a[OVS_VPORT_ATTR_UPCALL_PID]);
> +       err = ovs_vport_set_upcall_pids(vport,
> a[OVS_VPORT_ATTR_UPCALL_PIDS]);
> +       if (err)
> +               goto exit_free;
>
>         err = ovs_vport_cmd_fill_info(vport, reply, info->snd_portid,
>                                       info->snd_seq, 0, OVS_VPORT_CMD_NEW);
> diff --git a/datapath/vport.c b/datapath/vport.c
> index 7f12acc..3e83ba7 100644
> --- a/datapath/vport.c
> +++ b/datapath/vport.c
> @@ -135,10 +135,12 @@ struct vport *ovs_vport_alloc(int priv_size, const
> struct vport_ops *ops,
>
>         vport->dp = parms->dp;
>         vport->port_no = parms->port_no;
> -       vport->upcall_portid = parms->upcall_portid;
>         vport->ops = ops;
>         INIT_HLIST_NODE(&vport->dp_hash_node);
>
> +       if (ovs_vport_set_upcall_pids(vport, parms->upcall_pids))
> +               return ERR_PTR(-EINVAL);
> +
>         vport->percpu_stats = alloc_percpu(struct pcpu_tstats);
>         if (!vport->percpu_stats) {
>                 kfree(vport);
> @@ -162,6 +164,7 @@ struct vport *ovs_vport_alloc(int priv_size, const
> struct vport_ops *ops,
>   */
>  void ovs_vport_free(struct vport *vport)
>  {
> +       ovs_vport_set_upcall_pids(vport, NULL);
>         free_percpu(vport->percpu_stats);
>         kfree(vport);
>  }
> @@ -348,6 +351,129 @@ int ovs_vport_get_options(const struct vport *vport,
> struct sk_buff *skb)
>         return 0;
>  }
>
> +static void __vport_pids_destroy(struct vport_pids *pids)
> +{
> +       if (pids->pids)
> +               kfree(pids->pids);
> +
> +       kfree(pids);
> +}
> +
> +static void vport_pids_destroy_rcu_cb(struct rcu_head *rcu)
> +{
> +       struct vport_pids *pids = container_of(rcu, struct vport_pids,
> rcu);
> +
> +       __vport_pids_destroy(pids);
> +}
> +
> +/**
> + *     ovs_vport_set_upcall_pids - set upcall pids for sending upcall.
> + *
> + * @vport: vport to modify.
> + * @pids: new configuration.
> + *
> + * If @pids is non-null, sets the vport's upcall_pids pointer.  If @pids
> + * is null, frees the vport's upcall_pids.
> + *
> + * Returns 0 if successful, -EINVAL if @pids cannot be parsed as an array
> + * of U32.
> + */
> +int ovs_vport_set_upcall_pids(struct vport *vport,  struct nlattr *pids)
> +{
> +       struct vport_pids *old;
> +
> +       if (pids && nla_len(pids) % sizeof(u32))
> +               return -EINVAL;
> +
> +       rcu_read_lock();
> +       old = vport->upcall_pids ? ovsl_dereference(vport->upcall_pids)
> +               : NULL;
> +
> +       if (pids) {
> +               struct vport_pids *vport_pids;
> +
> +               vport_pids = kmalloc(sizeof *vport_pids, GFP_KERNEL);
> +               vport_pids->pids = kmalloc(nla_len(pids), GFP_KERNEL);
> +               vport_pids->n_pids = nla_len(pids)
> +                       / (sizeof *vport_pids->pids);
> +               memcpy(vport_pids->pids, nla_data(pids), nla_len(pids));
> +
> +               rcu_assign_pointer(vport->upcall_pids, vport_pids);
> +       } else if (old) {
> +               rcu_assign_pointer(vport->upcall_pids, NULL);
> +       }
> +
> +       if (old)
> +               call_rcu(&old->rcu, vport_pids_destroy_rcu_cb);
> +
> +       rcu_read_unlock();
> +       return 0;
> +}
> +
> +/**
> + *     ovs_vport_get_upcall_pids - get the upcall_pids value.
> + *
> + * @vport: vport from which to retrieve the pids.
> + * @skb: sk_buff where pids should be appended.
> + *
> + * Retrieves the configuration of the given vport, appending the
> + * %OVS_VPORT_ATTR_UPCALL_PIDS attribute which is the array of upcall
> + * pids to @skb.
> + *
> + * Returns 0 if successful, -EMSGSIZE if @skb has insufficient room.
> + * If an error occurs, @skb is left unmodified.
> + */
> +int ovs_vport_get_upcall_pids(const struct vport *vport, struct sk_buff
> *skb)
> +{
> +       struct vport_pids *pids;
> +       int err = 0;
> +
> +       rcu_read_lock();
> +       pids = ovsl_dereference(vport->upcall_pids);
> +
> +       if (!pids)
> +               goto exit;
> +
> +       if (nla_put(skb, OVS_VPORT_ATTR_UPCALL_PIDS,
> +                   pids->n_pids * sizeof *pids->pids,
> +                   (void *) pids->pids)) {
> +               err = -EMSGSIZE;
> +               goto exit;
> +       }
> +
> +exit:
> +       rcu_read_unlock();
> +       return err;
> +}
> +
> +/**
> + *     ovs_vport_find_pid - find the upcall pid to send upcall.
> + *
> + * @vport: vport from which the missed packet is received.
> + * @key: flow keys.
> + *
> + * Calculates the 5-tuple hash from the flow key and finds the upcall pid
> to
> + * send the upcall to.
> + *
> + * Returns the pid of the target socket.  Must be called with
> rcu_read_lock.
> + */
> +u32 ovs_vport_find_pid(const struct vport *p, const struct sw_flow_key
> *key)
> +{
> +       struct vport_pids *pids;
> +       u32 hash;
> +
> +       pids = ovsl_dereference(p->upcall_pids);
> +
> +       if (!pids)
> +               return 0;
> +
> +       hash = key->ipv4.addr.src ^ key->ipv4.addr.dst
> +               ^ key->ip.proto ^ key->ipv4.tp.src
> +               ^ key->ipv4.tp.dst;
> +
> +       return pids->pids[jhash((void *) &hash, 4, 0) % pids->n_pids];
> +}
> +
>  /**
>   *     ovs_vport_receive - pass up received packet to the datapath for
> processing
>   *
> diff --git a/datapath/vport.h b/datapath/vport.h
> index 18b723e..f11faa9 100644
> --- a/datapath/vport.h
> +++ b/datapath/vport.h
> @@ -50,6 +50,11 @@ void ovs_vport_get_stats(struct vport *, struct
> ovs_vport_stats *);
>  int ovs_vport_set_options(struct vport *, struct nlattr *options);
>  int ovs_vport_get_options(const struct vport *, struct sk_buff *);
>
> +int ovs_vport_set_upcall_pids(struct vport *, struct nlattr *pids);
> +int ovs_vport_get_upcall_pids(const struct vport *, struct sk_buff *);
> +
> +u32 ovs_vport_find_pid(const struct vport *, const struct sw_flow_key *);
> +
>  int ovs_vport_send(struct vport *, struct sk_buff *);
>
>  /* The following definitions are for implementers of vport devices: */
> @@ -60,13 +65,25 @@ struct vport_err_stats {
>         u64 tx_dropped;
>         u64 tx_errors;
>  };
> +/**
> + * struct vport_pids - array of netlink pids for a vport.
> + *                     must be protected by rcu.
> + * @rcu: RCU callback head for deferred destruction.
> + * @n_pids: Size of @pids array.
> + * @pids: Array storing the Netlink socket pids to use for packets
> received
> + * on this port that miss the flow table.
> + */
> +struct vport_pids {
> +       struct rcu_head rcu;
> +       u32 n_pids;
> +       u32 *pids;
> +};
>
>  /**
>   * struct vport - one port within a datapath
>   * @rcu: RCU callback head for deferred destruction.
>   * @dp: Datapath to which this port belongs.
> - * @upcall_portid: The Netlink port to use for packets received on this
> port that
> - * miss the flow table.
> + * @upcall_pids: RCU protected vport_pids array.
>   * @port_no: Index into @dp's @ports array.
>   * @hash_node: Element in @dev_table hash table in vport.c.
>   * @dp_hash_node: Element in @datapath->ports hash table in datapath.c.
> @@ -80,7 +97,7 @@ struct vport_err_stats {
>  struct vport {
>         struct rcu_head rcu;
>         struct datapath *dp;
> -       u32 upcall_portid;
> +       struct vport_pids __rcu *upcall_pids;
>         u16 port_no;
>
>         struct hlist_node hash_node;
> @@ -112,7 +129,7 @@ struct vport_parms {
>         /* For ovs_vport_alloc(). */
>         struct datapath *dp;
>         u16 port_no;
> -       u32 upcall_portid;
> +       struct nlattr *upcall_pids;
>  };
>
>  /**
> diff --git a/include/linux/openvswitch.h b/include/linux/openvswitch.h
> index 5137c2f..e1fe92d 100644
> --- a/include/linux/openvswitch.h
> +++ b/include/linux/openvswitch.h
> @@ -225,9 +225,9 @@ enum ovs_vport_type {
>   * this is the name of the network device.  Maximum length %IFNAMSIZ-1
> bytes
>   * plus a null terminator.
>   * @OVS_VPORT_ATTR_OPTIONS: Vport-specific configuration information.
> - * @OVS_VPORT_ATTR_UPCALL_PID: The Netlink socket in userspace that
> - * OVS_PACKET_CMD_MISS upcalls will be directed to for packets received on
> - * this port.  A value of zero indicates that upcalls should not be sent.
> + * @OVS_VPORT_ATTR_UPCALL_PIDS: The array of Netlink socket pids in
> userspace
> + * that OVS_PACKET_CMD_MISS upcalls will be directed to for packets
> received on
> + * this port.  If this is not specified, upcalls should not be sent.
>   * @OVS_VPORT_ATTR_STATS: A &struct ovs_vport_stats giving statistics for
>   * packets sent or received through the vport.
>   *
> @@ -251,7 +251,8 @@ enum ovs_vport_attr {
>         OVS_VPORT_ATTR_TYPE,    /* u32 OVS_VPORT_TYPE_* constant. */
>         OVS_VPORT_ATTR_NAME,    /* string name, up to IFNAMSIZ bytes long
> */
>         OVS_VPORT_ATTR_OPTIONS, /* nested attributes, varies by vport type
> */
> -       OVS_VPORT_ATTR_UPCALL_PID, /* u32 Netlink PID to receive upcalls */
> +       OVS_VPORT_ATTR_UPCALL_PIDS, /* array of u32 Netlink socket PIDs
> for */
> +                               /* receiving upcalls */
>         OVS_VPORT_ATTR_STATS,   /* struct ovs_vport_stats */
>         __OVS_VPORT_ATTR_MAX
>  };
> diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
> index f7f5292..86e0e97 100644
> --- a/lib/dpif-linux.c
> +++ b/lib/dpif-linux.c
> @@ -35,6 +35,7 @@
>  #include "bitmap.h"
>  #include "dpif-provider.h"
>  #include "dynamic-string.h"
> +#include "fat-rwlock.h"
>  #include "flow.h"
>  #include "netdev.h"
>  #include "netdev-linux.h"
> @@ -132,7 +133,15 @@ struct dpif_channel {
>      long long int last_poll;    /* Last time this channel was polled. */
>  };
>
> -static void report_loss(struct dpif *, struct dpif_channel *);
> +static void report_loss(struct dpif *, struct dpif_channel *, uint32_t
> ch_idx,
> +                        uint32_t handler_id);
> +
> +struct dpif_epoll {
> +    struct epoll_event *epoll_events;
> +    int epoll_fd;               /* epoll fd that includes channel socks.
> */
> +    int n_events;               /* Num events returned by epoll_wait(). */
> +    int event_offset;           /* Offset into 'epoll_events'. */
> +};
>
>  /* Datapath interface for the openvswitch Linux kernel module. */
>  struct dpif_linux {
> @@ -140,13 +149,11 @@ struct dpif_linux {
>      int dp_ifindex;
>
>      /* Upcall messages. */
> -    struct ovs_mutex upcall_lock;
> +    struct fat_rwlock upcall_lock;
>      int uc_array_size;          /* Size of 'channels' and 'epoll_events'.
> */
> -    struct dpif_channel *channels;
> -    struct epoll_event *epoll_events;
> -    int epoll_fd;               /* epoll fd that includes channel socks.
> */
> -    int n_events;               /* Num events returned by epoll_wait(). */
> -    int event_offset;           /* Offset into 'epoll_events'. */
> +    struct dpif_epoll *epolls;
> +    struct dpif_channel **channels;/* Array of channel arrays for each
> vport. */
> +    uint32_t n_handlers;           /* Num of upcall receivers (handlers).
> */
>
>      /* Change notification. */
>      struct nl_sock *port_notifier; /* vport multicast group subscriber. */
> @@ -171,8 +178,8 @@ static unsigned int ovs_vport_mcgroup;
>  static int dpif_linux_init(void);
>  static int open_dpif(const struct dpif_linux_dp *, struct dpif **);
>  static uint32_t dpif_linux_port_get_pid(const struct dpif *,
> -                                        odp_port_t port_no);
> -static int dpif_linux_refresh_channels(struct dpif *);
> +                                        odp_port_t port_no, uint32_t
> hash);
> +static int dpif_linux_refresh_channels(struct dpif *, uint32_t
> n_handlers);
>
>  static void dpif_linux_vport_to_ofpbuf(const struct dpif_linux_vport *,
>                                         struct ofpbuf *);
> @@ -252,8 +259,7 @@ open_dpif(const struct dpif_linux_dp *dp, struct dpif
> **dpifp)
>
>      dpif = xzalloc(sizeof *dpif);
>      dpif->port_notifier = NULL;
> -    ovs_mutex_init(&dpif->upcall_lock);
> -    dpif->epoll_fd = -1;
> +    fat_rwlock_init(&dpif->upcall_lock);
>
>      dpif_init(&dpif->dpif, &dpif_linux_class, dp->name,
>                dp->dp_ifindex, dp->dp_ifindex);
> @@ -265,63 +271,43 @@ open_dpif(const struct dpif_linux_dp *dp, struct
> dpif **dpifp)
>  }
>
>  static void
> -destroy_channels(struct dpif_linux *dpif)
> +channels_to_pids(struct dpif_channel *ch, uint32_t len, uint32_t
> **upcall_pids)
>  {
> -    unsigned int i;
> +    size_t i;
> +    uint32_t *pids;
>
> -    if (dpif->epoll_fd < 0) {
> +    if (!upcall_pids) {
>          return;
>      }
>
> -    for (i = 0; i < dpif->uc_array_size; i++ ) {
> -        struct dpif_linux_vport vport_request;
> -        struct dpif_channel *ch = &dpif->channels[i];
> -        uint32_t upcall_pid = 0;
> -
> -        if (!ch->sock) {
> -            continue;
> -        }
> -
> -        epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock),
> NULL);
> -
> -        /* Turn off upcalls. */
> -        dpif_linux_vport_init(&vport_request);
> -        vport_request.cmd = OVS_VPORT_CMD_SET;
> -        vport_request.dp_ifindex = dpif->dp_ifindex;
> -        vport_request.port_no = u32_to_odp(i);
> -        vport_request.upcall_pid = &upcall_pid;
> -        dpif_linux_vport_transact(&vport_request, NULL, NULL);
> -
> -        nl_sock_destroy(ch->sock);
> +    pids = xmalloc(len * sizeof *pids);
> +    for (i = 0; i < len; i++) {
> +        pids[i] = nl_sock_pid(ch[i].sock);
>      }
> -
> -    free(dpif->channels);
> -    dpif->channels = NULL;
> -    dpif->uc_array_size = 0;
> -
> -    free(dpif->epoll_events);
> -    dpif->epoll_events = NULL;
> -    dpif->n_events = dpif->event_offset = 0;
> -
> -    /* Don't close dpif->epoll_fd since that would cause other threads
> that
> -     * call dpif_recv_wait(dpif) to wait on an arbitrary fd or a closed
> fd. */
> +    *upcall_pids = pids;
>  }
>
> +/* Adds 'dpif->n_handlers' channels to vport.  If upcall_pids is non-NULL,
> + * makes it point to the array of channel socket pids. */
>  static int
> -add_channel(struct dpif_linux *dpif, odp_port_t port_no, struct nl_sock
> *sock)
> +add_vport_channels(struct dpif_linux *dpif, odp_port_t port_no,
> +                   uint32_t **upcall_pids)
>  {
>      struct epoll_event event;
> +    struct dpif_epoll *epolls = dpif->epolls;
>      uint32_t port_idx = odp_to_u32(port_no);
> +    uint32_t *pids = NULL;
> +    int error = 0;
> +    size_t i, j;
>
> -    if (dpif->epoll_fd < 0) {
> +    if (epolls == NULL) {
>          return 0;
>      }
>
>      /* We assume that the datapath densely chooses port numbers, which
> -     * can therefore be used as an index into an array of channels. */
> +     * can therefore be used as an index into dpif->channels. */
>      if (port_idx >= dpif->uc_array_size) {
>          uint32_t new_size = port_idx + 1;
> -        uint32_t i;
>
>          if (new_size > MAX_PORTS) {
>              VLOG_WARN_RL(&error_rl, "%s: datapath port %"PRIu32" too big",
> @@ -332,49 +318,117 @@ add_channel(struct dpif_linux *dpif, odp_port_t
> port_no, struct nl_sock *sock)
>          dpif->channels = xrealloc(dpif->channels,
>                                    new_size * sizeof *dpif->channels);
>          for (i = dpif->uc_array_size; i < new_size; i++) {
> -            dpif->channels[i].sock = NULL;
> +            dpif->channels[i] = NULL;
> +        }
> +        for (i = 0; i < dpif->n_handlers; i++) {
> +            epolls[i].epoll_events = xrealloc(epolls[i].epoll_events,
> +                                              new_size * sizeof
> +                                              *epolls[i].epoll_events);
>          }
> -
> -        dpif->epoll_events = xrealloc(dpif->epoll_events,
> -                                      new_size * sizeof
> *dpif->epoll_events);
>          dpif->uc_array_size = new_size;
>      }
>
>      memset(&event, 0, sizeof event);
>      event.events = EPOLLIN;
>      event.data.u32 = port_idx;
> -    if (epoll_ctl(dpif->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
> -                  &event) < 0) {
> -        return errno;
> -    }
>
> -    nl_sock_destroy(dpif->channels[port_idx].sock);
> -    dpif->channels[port_idx].sock = sock;
> -    dpif->channels[port_idx].last_poll = LLONG_MIN;
> +    /* Creates channel for each upcall handler. */
> +    dpif->channels[port_idx] = xzalloc(dpif->n_handlers
> +                                       * sizeof
> *dpif->channels[port_idx]);
> +    for (i = 0; i < dpif->n_handlers; i++) {
> +        struct nl_sock *sock = NULL;
> +
> +        error = nl_sock_create(NETLINK_GENERIC, &sock);
> +        if (error) {
> +            goto error;
> +        }
> +
> +        if (epoll_ctl(epolls[i].epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
> +                      &event) < 0) {
> +            error = errno;
> +            goto error;
> +        }
> +        dpif->channels[port_idx][i].sock = sock;
> +        dpif->channels[port_idx][i].last_poll = LLONG_MIN;
> +    }
> +    channels_to_pids(dpif->channels[port_idx], dpif->n_handlers,
> upcall_pids);
>
>      return 0;
> +
> +error:
> +    /* Cleans up the already created channel and socks. */
> +    for (j = 0; j < i; j++) {
> +        nl_sock_destroy(dpif->channels[port_idx][j].sock);
> +    }
> +    free(pids);
> +    free(dpif->channels[port_idx]);
> +    dpif->channels[port_idx] = NULL;
> +
> +    return error;
>  }
>
>  static void
> -del_channel(struct dpif_linux *dpif, odp_port_t port_no)
> +del_vport_channels(struct dpif_linux *dpif, odp_port_t port_no)
>  {
>      struct dpif_channel *ch;
>      uint32_t port_idx = odp_to_u32(port_no);
> +    size_t i;
> +
> +    if (!dpif->epolls || port_idx >= dpif->uc_array_size) {
> +        return;
> +    }
>
> -    if (dpif->epoll_fd < 0 || port_idx >= dpif->uc_array_size) {
> +    ch = dpif->channels[port_idx];
> +    if (!ch) {
>          return;
>      }
>
> -    ch = &dpif->channels[port_idx];
> -    if (!ch->sock) {
> +    for (i = 0; i < dpif->n_handlers; i++) {
> +        epoll_ctl(dpif->epolls[i].epoll_fd, EPOLL_CTL_DEL,
> +                  nl_sock_fd(ch[i].sock), NULL);
> +        nl_sock_destroy(ch[i].sock);
> +        dpif->epolls[i].event_offset = dpif->epolls[i].n_events = 0;
> +    }
> +    free(ch);
> +    dpif->channels[port_idx] = NULL;
> +}
> +
> +static void
> +destroy_all_channels(struct dpif_linux *dpif)
> +{
> +    unsigned int i;
> +
> +    if (!dpif->epolls) {
>          return;
>      }
>
> -    epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock), NULL);
> -    dpif->event_offset = dpif->n_events = 0;
> +    for (i = 0; i < dpif->uc_array_size; i++ ) {
> +        struct dpif_linux_vport vport_request;
> +        struct dpif_channel *ch = dpif->channels[i];
> +
> +        if (!ch->sock) {
> +            continue;
> +        }
> +
> +        /* Turn off upcalls. */
> +        dpif_linux_vport_init(&vport_request);
> +        vport_request.cmd = OVS_VPORT_CMD_SET;
> +        vport_request.dp_ifindex = dpif->dp_ifindex;
> +        vport_request.port_no = u32_to_odp(i);
> +        vport_request.upcall_pids = NULL;
> +        dpif_linux_vport_transact(&vport_request, NULL, NULL);
> +
> +        del_vport_channels(dpif, u32_to_odp(i));
> +    }
> +
> +    free(dpif->channels);
> +    dpif->channels = NULL;
> +    dpif->uc_array_size = 0;
> +
> +    free(dpif->epolls);
>
> -    nl_sock_destroy(ch->sock);
> -    ch->sock = NULL;
> +    /* Don't close dpif->epoll_fd since that would cause other threads
> that
> +     * call dpif_recv_wait(dpif) to wait on an arbitrary fd or a closed
> fd. */
>  }
>
>  static void
> @@ -383,11 +437,8 @@ dpif_linux_close(struct dpif *dpif_)
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>
>      nl_sock_destroy(dpif->port_notifier);
> -    destroy_channels(dpif);
> -    if (dpif->epoll_fd >= 0) {
> -        close(dpif->epoll_fd);
> -    }
> -    ovs_mutex_destroy(&dpif->upcall_lock);
> +    destroy_all_channels(dpif);
> +    fat_rwlock_destroy(&dpif->upcall_lock);
>      free(dpif);
>  }
>
> @@ -409,7 +460,7 @@ dpif_linux_run(struct dpif *dpif_)
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>      if (dpif->refresh_channels) {
>          dpif->refresh_channels = false;
> -        dpif_linux_refresh_channels(dpif_);
> +        dpif_linux_refresh_channels(dpif_, dpif->n_handlers);
>      }
>  }
>
> @@ -500,20 +551,12 @@ dpif_linux_port_add__(struct dpif *dpif_, struct
> netdev *netdev,
>                                                    namebuf, sizeof
> namebuf);
>      const char *type = netdev_get_type(netdev);
>      struct dpif_linux_vport request, reply;
> -    struct nl_sock *sock = NULL;
> -    uint32_t upcall_pid;
>      struct ofpbuf *buf;
>      uint64_t options_stub[64 / 8];
>      struct ofpbuf options;
> +    uint32_t *upcall_pids = NULL;
>      int error;
>
> -    if (dpif->epoll_fd >= 0) {
> -        error = nl_sock_create(NETLINK_GENERIC, &sock);
> -        if (error) {
> -            return error;
> -        }
> -    }
> -
>      dpif_linux_vport_init(&request);
>      request.cmd = OVS_VPORT_CMD_NEW;
>      request.dp_ifindex = dpif->dp_ifindex;
> @@ -522,7 +565,6 @@ dpif_linux_port_add__(struct dpif *dpif_, struct
> netdev *netdev,
>          VLOG_WARN_RL(&error_rl, "%s: cannot create port `%s' because it
> has "
>                       "unsupported type `%s'",
>                       dpif_name(dpif_), name, type);
> -        nl_sock_destroy(sock);
>          return EINVAL;
>      }
>      request.name = name;
> @@ -541,41 +583,42 @@ dpif_linux_port_add__(struct dpif *dpif_, struct
> netdev *netdev,
>      }
>
>      request.port_no = *port_nop;
> -    upcall_pid = sock ? nl_sock_pid(sock) : 0;
> -    request.upcall_pid = &upcall_pid;
> +    request.upcall_pids = NULL;
>
>      error = dpif_linux_vport_transact(&request, &reply, &buf);
>      if (!error) {
>          *port_nop = reply.port_no;
> -        VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid %"PRIu32,
> -                 dpif_name(dpif_), reply.port_no, upcall_pid);
>      } else {
>          if (error == EBUSY && *port_nop != ODPP_NONE) {
>              VLOG_INFO("%s: requested port %"PRIu32" is in use",
>                        dpif_name(dpif_), *port_nop);
>          }
> -        nl_sock_destroy(sock);
>          ofpbuf_delete(buf);
>          return error;
>      }
>      ofpbuf_delete(buf);
>
> -    if (sock) {
> -        error = add_channel(dpif, *port_nop, sock);
> -        if (error) {
> -            VLOG_INFO("%s: could not add channel for port %s",
> -                      dpif_name(dpif_), name);
> +    if (add_vport_channels(dpif, *port_nop, &upcall_pids)) {
> +        VLOG_INFO("%s: could not add channel for port %s",
> +                  dpif_name(dpif_), name);
>
> -            /* Delete the port. */
> -            dpif_linux_vport_init(&request);
> -            request.cmd = OVS_VPORT_CMD_DEL;
> -            request.dp_ifindex = dpif->dp_ifindex;
> -            request.port_no = *port_nop;
> -            dpif_linux_vport_transact(&request, NULL, NULL);
> +        /* Delete the port. */
> +        dpif_linux_vport_init(&request);
> +        request.cmd = OVS_VPORT_CMD_DEL;
> +        request.dp_ifindex = dpif->dp_ifindex;
> +        request.port_no = *port_nop;
> +        dpif_linux_vport_transact(&request, NULL, NULL);
>
> -            nl_sock_destroy(sock);
> -            return error;
> -        }
> +        return error;
> +    } else {
> +        dpif_linux_vport_init(&request);
> +        request.cmd = OVS_VPORT_CMD_SET;
> +        request.dp_ifindex = dpif->dp_ifindex;
> +        request.port_no = *port_nop;
> +        request.n_pids = dpif->n_handlers;
> +        request.upcall_pids = upcall_pids;
> +        dpif_linux_vport_transact(&request, NULL, NULL);
> +        free(upcall_pids);
>      }
>
>      return 0;
> @@ -588,9 +631,9 @@ dpif_linux_port_add(struct dpif *dpif_, struct netdev
> *netdev,
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>      int error;
>
> -    ovs_mutex_lock(&dpif->upcall_lock);
> +    fat_rwlock_wrlock(&dpif->upcall_lock);
>      error = dpif_linux_port_add__(dpif_, netdev, port_nop);
> -    ovs_mutex_unlock(&dpif->upcall_lock);
> +    fat_rwlock_unlock(&dpif->upcall_lock);
>
>      return error;
>  }
> @@ -608,7 +651,7 @@ dpif_linux_port_del__(struct dpif *dpif_, odp_port_t
> port_no)
>      vport.port_no = port_no;
>      error = dpif_linux_vport_transact(&vport, NULL, NULL);
>
> -    del_channel(dpif, port_no);
> +    del_vport_channels(dpif, port_no);
>
>      return error;
>  }
> @@ -619,9 +662,9 @@ dpif_linux_port_del(struct dpif *dpif_, odp_port_t
> port_no)
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>      int error;
>
> -    ovs_mutex_lock(&dpif->upcall_lock);
> +    fat_rwlock_wrlock(&dpif->upcall_lock);
>      error = dpif_linux_port_del__(dpif_, port_no);
> -    ovs_mutex_unlock(&dpif->upcall_lock);
> +    fat_rwlock_unlock(&dpif->upcall_lock);
>
>      return error;
>  }
> @@ -672,21 +715,26 @@ dpif_linux_port_query_by_name(const struct dpif
> *dpif, const char *devname,
>  }
>
>  static uint32_t
> -dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no)
> +dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no,
> +                        uint32_t hash)
>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>      uint32_t port_idx = odp_to_u32(port_no);
>      uint32_t pid = 0;
>
> -    ovs_mutex_lock(&dpif->upcall_lock);
> -    if (dpif->epoll_fd >= 0) {
> +    fat_rwlock_wrlock(&dpif->upcall_lock);
> +    if (dpif->epolls) {
>          /* The ODPP_NONE "reserved" port number uses the "ovs-system"'s
>           * channel, since it is not heavily loaded. */
>          uint32_t idx = port_idx >= dpif->uc_array_size ? 0 : port_idx;
> -        const struct nl_sock *sock = dpif->channels[idx].sock;
> -        pid = sock ? nl_sock_pid(sock) : 0;
> +        const struct dpif_channel *ch = dpif->channels[idx];
> +
> +        if (ch) {
> +            pid = ch[hash % dpif->n_handlers].sock
> +                  ? nl_sock_pid(ch[hash % dpif->n_handlers].sock) : 0;
> +        }
>      }
> -    ovs_mutex_unlock(&dpif->upcall_lock);
> +    fat_rwlock_unlock(&dpif->upcall_lock);
>
>      return pid;
>  }
> @@ -1274,10 +1322,10 @@ dpif_linux_operate(struct dpif *dpif, struct
> dpif_op **ops, size_t n_ops)
>  }
>
>  /* Synchronizes 'dpif->channels' with the set of vports currently in
> 'dpif' in
> - * the kernel, by adding a new channel for any kernel vport that lacks
> one and
> - * deleting any channels that have no backing kernel vports. */
> + * the kernel, by adding a new set of channels for any kernel vport that
> lacks
> + * one and deleting any channels that have no backing kernel vports. */
>  static int
> -dpif_linux_refresh_channels(struct dpif *dpif_)
> +dpif_linux_refresh_channels(struct dpif *dpif_, uint32_t n_handlers)
>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>      unsigned long int *keep_channels;
> @@ -1287,52 +1335,63 @@ dpif_linux_refresh_channels(struct dpif *dpif_)
>      int retval = 0;
>      size_t i;
>
> -    /* To start with, we need an epoll fd. */
> -    if (dpif->epoll_fd < 0) {
> -        dpif->epoll_fd = epoll_create(10);
> -        if (dpif->epoll_fd < 0) {
> -            return errno;
> +    if (dpif->n_handlers != n_handlers) {
> +        destroy_all_channels(dpif);
> +        dpif->epolls = xzalloc(n_handlers * sizeof *dpif->epolls);
> +
> +        for (i = 0; i < n_handlers; i++) {
> +            dpif->epolls[i].epoll_fd = epoll_create(dpif->uc_array_size ?
> +                                                    dpif->uc_array_size :
> 10);
> +            if (dpif->epolls[i].epoll_fd < 0) {
> +                return errno;
> +            }
>          }
> +        dpif->n_handlers = n_handlers;
> +    }
> +
> +    for (i = 0; i < n_handlers; i++) {
> +        dpif->epolls[i].event_offset = dpif->epolls[i].n_events = 0;
>      }
>
>      keep_channels_nbits = dpif->uc_array_size;
>      keep_channels = bitmap_allocate(keep_channels_nbits);
>
> -    dpif->n_events = dpif->event_offset = 0;
> -
>      dpif_linux_port_dump_start__(dpif_, &dump);
>      while (!dpif_linux_port_dump_next__(dpif_, &dump, &vport)) {
>          uint32_t port_no = odp_to_u32(vport.port_no);
> -        struct nl_sock *sock = (port_no < dpif->uc_array_size
> -                                ? dpif->channels[port_no].sock
> -                                : NULL);
> -        bool new_sock = !sock;
> +        struct dpif_channel *ch = (port_no < dpif->uc_array_size
> +                                   ? dpif->channels[port_no]
> +                                   : NULL);
> +        uint32_t *upcall_pids = NULL;
>          int error;
>
> -        if (new_sock) {
> -            error = nl_sock_create(NETLINK_GENERIC, &sock);
> +        if (!ch) {
> +            error = add_vport_channels(dpif, vport.port_no, &upcall_pids);
>              if (error) {
> +                VLOG_INFO("%s: could not add channels for port %s",
> +                          dpif_name(dpif_), vport.name);
>                  retval = error;
>                  goto error;
>              }
> +        } else {
> +            channels_to_pids(ch, dpif->n_handlers, &upcall_pids);
>          }
>
>          /* Configure the vport to deliver misses to 'sock'. */
> -        if (!vport.upcall_pid || *vport.upcall_pid != nl_sock_pid(sock)) {
> -            uint32_t upcall_pid = nl_sock_pid(sock);
> +        if (!vport.upcall_pids
> +            || vport.n_pids != dpif->n_handlers
> +            || memcmp(upcall_pids, vport.upcall_pids, n_handlers * sizeof
> +                      *upcall_pids)) {
>              struct dpif_linux_vport vport_request;
>
>              dpif_linux_vport_init(&vport_request);
>              vport_request.cmd = OVS_VPORT_CMD_SET;
>              vport_request.dp_ifindex = dpif->dp_ifindex;
>              vport_request.port_no = vport.port_no;
> -            vport_request.upcall_pid = &upcall_pid;
> +            vport_request.n_pids = dpif->n_handlers;
> +            vport_request.upcall_pids = upcall_pids;
>              error = dpif_linux_vport_transact(&vport_request, NULL, NULL);
> -            if (!error) {
> -                VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid
> %"PRIu32,
> -                         dpif_name(&dpif->dpif), vport_request.port_no,
> -                         upcall_pid);
> -            } else {
> +            if (error) {
>                  VLOG_WARN_RL(&error_rl,
>                               "%s: failed to set upcall pid on port: %s",
>                               dpif_name(&dpif->dpif), ovs_strerror(error));
> @@ -1348,31 +1407,22 @@ dpif_linux_refresh_channels(struct dpif *dpif_)
>              }
>          }
>
> -        if (new_sock) {
> -            error = add_channel(dpif, vport.port_no, sock);
> -            if (error) {
> -                VLOG_INFO("%s: could not add channel for port %s",
> -                          dpif_name(dpif_), vport.name);
> -                retval = error;
> -                goto error;
> -            }
> -        }
> -
>          if (port_no < keep_channels_nbits) {
>              bitmap_set1(keep_channels, port_no);
>          }
>          continue;
>
>      error:
> -        nl_sock_destroy(sock);
> +        del_vport_channels(dpif, vport.port_no);
>      }
>      nl_dump_done(&dump);
>
>      /* Discard any saved channels that we didn't reuse. */
>      for (i = 0; i < keep_channels_nbits; i++) {
>          if (!bitmap_is_set(keep_channels, i)) {
> -            nl_sock_destroy(dpif->channels[i].sock);
> -            dpif->channels[i].sock = NULL;
> +            del_vport_channels(dpif, u32_to_odp(i));
> +            free(dpif->channels[i]);
> +            dpif->channels[i] = NULL;
>          }
>      }
>      free(keep_channels);
> @@ -1381,29 +1431,32 @@ dpif_linux_refresh_channels(struct dpif *dpif_)
>  }
>
>  static int
> -dpif_linux_recv_set__(struct dpif *dpif_, bool enable)
> +dpif_linux_recv_set__(struct dpif *dpif_, bool enable, uint32_t
> n_handlers)
>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>
> -    if ((dpif->epoll_fd >= 0) == enable) {
> +    if ((dpif->epolls != NULL) == enable) {
> +        if (enable && dpif->n_handlers != n_handlers) {
> +            dpif_linux_refresh_channels(dpif_, n_handlers);
> +        }
>          return 0;
>      } else if (!enable) {
> -        destroy_channels(dpif);
> +        destroy_all_channels(dpif);
>          return 0;
>      } else {
> -        return dpif_linux_refresh_channels(dpif_);
> +        return dpif_linux_refresh_channels(dpif_, n_handlers);
>      }
>  }
>
>  static int
> -dpif_linux_recv_set(struct dpif *dpif_, bool enable)
> +dpif_linux_recv_set(struct dpif *dpif_, bool enable, uint32_t n_handlers)
>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>      int error;
>
> -    ovs_mutex_lock(&dpif->upcall_lock);
> -    error = dpif_linux_recv_set__(dpif_, enable);
> -    ovs_mutex_unlock(&dpif->upcall_lock);
> +    fat_rwlock_wrlock(&dpif->upcall_lock);
> +    error = dpif_linux_recv_set__(dpif_, enable, n_handlers);
> +    fat_rwlock_unlock(&dpif->upcall_lock);
>
>      return error;
>  }
> @@ -1482,38 +1535,39 @@ parse_odp_packet(struct ofpbuf *buf, struct
> dpif_upcall *upcall,
>  }
>
>  static int
> -dpif_linux_recv__(struct dpif *dpif_, struct dpif_upcall *upcall,
> -                  struct ofpbuf *buf)
> +dpif_linux_recv__(struct dpif *dpif_, uint32_t handler_id,
> +                  struct dpif_upcall *upcall, struct ofpbuf *buf)
>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
> +    struct dpif_epoll *epoll = dpif->epolls ? &dpif->epolls[handler_id] :
> NULL;
>      int read_tries = 0;
>
> -    if (dpif->epoll_fd < 0) {
> -       return EAGAIN;
> +    if (!epoll) {
> +        return EAGAIN;
>      }
>
> -    if (dpif->event_offset >= dpif->n_events) {
> +    if (epoll->event_offset >= epoll->n_events) {
>          int retval;
>
> -        dpif->event_offset = dpif->n_events = 0;
> +        epoll->event_offset = epoll->n_events = 0;
>
>          do {
> -            retval = epoll_wait(dpif->epoll_fd, dpif->epoll_events,
> +            retval = epoll_wait(epoll->epoll_fd, epoll->epoll_events,
>                                  dpif->uc_array_size, 0);
>          } while (retval < 0 && errno == EINTR);
>          if (retval < 0) {
>              static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
>              VLOG_WARN_RL(&rl, "epoll_wait failed (%s)",
> ovs_strerror(errno));
>          } else if (retval > 0) {
> -            dpif->n_events = retval;
> +            epoll->n_events = retval;
>          }
>      }
>
> -    while (dpif->event_offset < dpif->n_events) {
> -        int idx = dpif->epoll_events[dpif->event_offset].data.u32;
> -        struct dpif_channel *ch = &dpif->channels[idx];
> +    while (epoll->event_offset < epoll->n_events) {
> +        int idx = epoll->epoll_events[epoll->event_offset].data.u32;
> +        struct dpif_channel *ch = &dpif->channels[idx][handler_id];
>
> -        dpif->event_offset++;
> +        epoll->event_offset++;
>
>          for (;;) {
>              int dp_ifindex;
> @@ -1529,7 +1583,7 @@ dpif_linux_recv__(struct dpif *dpif_, struct
> dpif_upcall *upcall,
>                   * packets that the buffer overflowed.  Try again
>                   * immediately because there's almost certainly a packet
>                   * waiting for us. */
> -                report_loss(dpif_, ch);
> +                report_loss(dpif_, ch, idx, handler_id);
>                  continue;
>              }
>
> @@ -1554,29 +1608,29 @@ dpif_linux_recv__(struct dpif *dpif_, struct
> dpif_upcall *upcall,
>  }
>
>  static int
> -dpif_linux_recv(struct dpif *dpif_, struct dpif_upcall *upcall,
> -                struct ofpbuf *buf)
> +dpif_linux_recv(struct dpif *dpif_, uint32_t handler_id,
> +                struct dpif_upcall *upcall, struct ofpbuf *buf)
>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>      int error;
>
> -    ovs_mutex_lock(&dpif->upcall_lock);
> -    error = dpif_linux_recv__(dpif_, upcall, buf);
> -    ovs_mutex_unlock(&dpif->upcall_lock);
> +    fat_rwlock_rdlock(&dpif->upcall_lock);
> +    error = dpif_linux_recv__(dpif_, handler_id, upcall, buf);
> +    fat_rwlock_unlock(&dpif->upcall_lock);
>
>      return error;
>  }
>
>  static void
> -dpif_linux_recv_wait(struct dpif *dpif_)
> +dpif_linux_recv_wait(struct dpif *dpif_, uint32_t handler_id)
>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>
> -    ovs_mutex_lock(&dpif->upcall_lock);
> -    if (dpif->epoll_fd >= 0) {
> -        poll_fd_wait(dpif->epoll_fd, POLLIN);
> +    fat_rwlock_rdlock(&dpif->upcall_lock);
> +    if (dpif->epolls) {
> +        poll_fd_wait(dpif->epolls[handler_id].epoll_fd, POLLIN);
>      }
> -    ovs_mutex_unlock(&dpif->upcall_lock);
> +    fat_rwlock_unlock(&dpif->upcall_lock);
>  }
>
>  static void
> @@ -1584,18 +1638,19 @@ dpif_linux_recv_purge(struct dpif *dpif_)
>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>
> -    ovs_mutex_lock(&dpif->upcall_lock);
> -    if (dpif->epoll_fd >= 0) {
> -        struct dpif_channel *ch;
> +    fat_rwlock_rdlock(&dpif->upcall_lock);
> +    if (dpif->epolls) {
> +        size_t i, j;
>
> -        for (ch = dpif->channels; ch <
> &dpif->channels[dpif->uc_array_size];
> -             ch++) {
> -            if (ch->sock) {
> -                nl_sock_drain(ch->sock);
> +        for (i = 0; i < dpif->uc_array_size; i++ ) {
> +            if (dpif->channels[i]) {
> +                for (j = 0; j < dpif->n_handlers; j++) {
> +                    nl_sock_drain(dpif->channels[i][j].sock);
> +                }
>              }
>          }
>      }
> -    ovs_mutex_unlock(&dpif->upcall_lock);
> +    fat_rwlock_unlock(&dpif->upcall_lock);
>  }
>
>  const struct dpif_class dpif_linux_class = {
> @@ -1701,7 +1756,7 @@ dpif_linux_vport_from_ofpbuf(struct dpif_linux_vport
> *vport,
>          [OVS_VPORT_ATTR_PORT_NO] = { .type = NL_A_U32 },
>          [OVS_VPORT_ATTR_TYPE] = { .type = NL_A_U32 },
>          [OVS_VPORT_ATTR_NAME] = { .type = NL_A_STRING, .max_len =
> IFNAMSIZ },
> -        [OVS_VPORT_ATTR_UPCALL_PID] = { .type = NL_A_U32 },
> +        [OVS_VPORT_ATTR_UPCALL_PIDS] = { .type = NL_A_UNSPEC, .optional =
> true },
>          [OVS_VPORT_ATTR_STATS] = { NL_POLICY_FOR(struct ovs_vport_stats),
>                                     .optional = true },
>          [OVS_VPORT_ATTR_OPTIONS] = { .type = NL_A_NESTED, .optional =
> true },
> @@ -1731,9 +1786,12 @@ dpif_linux_vport_from_ofpbuf(struct
> dpif_linux_vport *vport,
>      vport->port_no = nl_attr_get_odp_port(a[OVS_VPORT_ATTR_PORT_NO]);
>      vport->type = nl_attr_get_u32(a[OVS_VPORT_ATTR_TYPE]);
>      vport->name = nl_attr_get_string(a[OVS_VPORT_ATTR_NAME]);
> -    if (a[OVS_VPORT_ATTR_UPCALL_PID]) {
> -        vport->upcall_pid = nl_attr_get(a[OVS_VPORT_ATTR_UPCALL_PID]);
> +    if (a[OVS_VPORT_ATTR_UPCALL_PIDS]) {
> +        vport->n_pids = nl_attr_get_size(a[OVS_VPORT_ATTR_UPCALL_PIDS])
> +                        / (sizeof *vport->upcall_pids);
> +        vport->upcall_pids = nl_attr_get(a[OVS_VPORT_ATTR_UPCALL_PIDS]);
>      }
> +
>      if (a[OVS_VPORT_ATTR_STATS]) {
>          vport->stats = nl_attr_get(a[OVS_VPORT_ATTR_STATS]);
>      }
> @@ -1770,8 +1828,10 @@ dpif_linux_vport_to_ofpbuf(const struct
> dpif_linux_vport *vport,
>          nl_msg_put_string(buf, OVS_VPORT_ATTR_NAME, vport->name);
>      }
>
> -    if (vport->upcall_pid) {
> -        nl_msg_put_u32(buf, OVS_VPORT_ATTR_UPCALL_PID,
> *vport->upcall_pid);
> +    if (vport->upcall_pids) {
> +        nl_msg_put_unspec(buf, OVS_VPORT_ATTR_UPCALL_PIDS,
> +                          vport->upcall_pids,
> +                          vport->n_pids * sizeof *vport->upcall_pids);
>      }
>
>      if (vport->stats) {
> @@ -2176,9 +2236,9 @@ dpif_linux_flow_get_stats(const struct
> dpif_linux_flow *flow,
>  /* Logs information about a packet that was recently lost in 'ch' (in
>   * 'dpif_'). */
>  static void
> -report_loss(struct dpif *dpif_, struct dpif_channel *ch)
> +report_loss(struct dpif *dpif_, struct dpif_channel *ch, uint32_t ch_idx,
> +            uint32_t handler_id)
>  {
> -    struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>      static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
>      struct ds s;
>
> @@ -2192,7 +2252,7 @@ report_loss(struct dpif *dpif_, struct dpif_channel
> *ch)
>                        time_msec() - ch->last_poll);
>      }
>
> -    VLOG_WARN("%s: lost packet on channel %"PRIdPTR"%s",
> -              dpif_name(dpif_), ch - dpif->channels, ds_cstr(&s));
> +    VLOG_WARN("%s: lost packet on channel %u of handler %u",
> dpif_name(dpif_),
> +              ch_idx, handler_id);
>      ds_destroy(&s);
>  }
> diff --git a/lib/dpif-linux.h b/lib/dpif-linux.h
> index ec94ccf..02c7b03 100644
> --- a/lib/dpif-linux.h
> +++ b/lib/dpif-linux.h
> @@ -32,6 +32,7 @@ struct dpif_linux_vport {
>
>      /* ovs_vport header. */
>      int dp_ifindex;
> +    uint32_t n_pids;
>      odp_port_t port_no;                    /* ODPP_NONE if unknown. */
>      enum ovs_vport_type type;
>
> @@ -41,7 +42,7 @@ struct dpif_linux_vport {
>       * 32-bit boundaries, so use get_unaligned_u64() to access its values.
>       */
>      const char *name;                      /* OVS_VPORT_ATTR_NAME. */
> -    const uint32_t *upcall_pid;            /* OVS_VPORT_ATTR_UPCALL_PID.
> */
> +    const uint32_t *upcall_pids;           /* OVS_VPORT_ATTR_UPCALL_PIDS.
> */
>      const struct ovs_vport_stats *stats;   /* OVS_VPORT_ATTR_STATS. */
>      const struct nlattr *options;          /* OVS_VPORT_ATTR_OPTIONS. */
>      size_t options_len;
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 73eb99d..9033fba 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -1429,7 +1429,8 @@ dpif_netdev_execute(struct dpif *dpif, struct
> dpif_execute *execute)
>  }
>
>  static int
> -dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED)
> +dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED,
> +                     uint32_t n_handlers OVS_UNUSED)
>  {
>      return 0;
>  }
> @@ -1458,8 +1459,8 @@ find_nonempty_queue(struct dp_netdev *dp)
>  }
>
>  static int
> -dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
> -                 struct ofpbuf *buf)
> +dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id OVS_UNUSED,
> +                 struct dpif_upcall *upcall, struct ofpbuf *buf)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
>      struct dp_netdev_queue *q;
> @@ -1485,7 +1486,7 @@ dpif_netdev_recv(struct dpif *dpif, struct
> dpif_upcall *upcall,
>  }
>
>  static void
> -dpif_netdev_recv_wait(struct dpif *dpif)
> +dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id OVS_UNUSED)
>  {
>      struct dp_netdev *dp = get_dp_netdev(dpif);
>      uint64_t seq;
> diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
> index adc5242..7ef10bb 100644
> --- a/lib/dpif-provider.h
> +++ b/lib/dpif-provider.h
> @@ -146,7 +146,8 @@ struct dpif_class {
>
>      /* Returns the Netlink PID value to supply in
> OVS_ACTION_ATTR_USERSPACE
>       * actions as the OVS_USERSPACE_ATTR_PID attribute's value, for use in
> -     * flows whose packets arrived on port 'port_no'.
> +     * flows whose packets arrived on port 'port_no' and whose 5-tuple hash
> +     * is 'hash'.
>       *
>       * A 'port_no' of UINT32_MAX should be treated as a special case.  The
>       * implementation should return a reserved PID, not allocated to any
> port,
> @@ -158,7 +159,8 @@ struct dpif_class {
>       *
>       * A dpif provider that doesn't have meaningful Netlink PIDs can use
> NULL
>       * for this function.  This is equivalent to always returning 0. */
> -    uint32_t (*port_get_pid)(const struct dpif *dpif, odp_port_t port_no);
> +    uint32_t (*port_get_pid)(const struct dpif *dpif, odp_port_t port_no,
> +                             uint32_t hash);
>
>      /* Attempts to begin dumping the ports in a dpif.  On success,
> returns 0
>       * and initializes '*statep' with any data needed for iteration.  On
> @@ -324,16 +326,17 @@ struct dpif_class {
>       * Turning packet receive off and then back on is allowed to change
> Netlink
>       * PID assignments (see ->port_get_pid()).  The client is responsible
> for
>       * updating flows as necessary if it does this. */
> -    int (*recv_set)(struct dpif *dpif, bool enable);
> +    int (*recv_set)(struct dpif *dpif, bool enable, uint32_t n_handlers);
>
>      /* Translates OpenFlow queue ID 'queue_id' (in host byte order) into a
>       * priority value used for setting packet priority. */
>      int (*queue_to_priority)(const struct dpif *dpif, uint32_t queue_id,
>                               uint32_t *priority);
>
> -    /* Polls for an upcall from 'dpif'.  If successful, stores the upcall
> into
> -     * '*upcall', using 'buf' for storage.  Should only be called if
> 'recv_set'
> -     * has been used to enable receiving packets from 'dpif'.
> +    /* Polls for an upcall from 'dpif' for handler with 'handler_id'.  If
> +     * successful, stores the upcall into '*upcall', using 'buf' for
> storage.
> +     * Should only be called if 'recv_set' has been used to enable
> receiving
> +     * packets from 'dpif'.
>       *
>       * The implementation should point 'upcall->key' and
> 'upcall->userdata'
>       * (if any) into data in the caller-provided 'buf'.  The
> implementation may
> @@ -349,12 +352,13 @@ struct dpif_class {
>       *
>       * This function must not block.  If no upcall is pending when it is
>       * called, it should return EAGAIN without blocking. */
> -    int (*recv)(struct dpif *dpif, struct dpif_upcall *upcall,
> -                struct ofpbuf *buf);
> +    int (*recv)(struct dpif *dpif, uint32_t handler_id,
> +                struct dpif_upcall *upcall, struct ofpbuf *buf);
>
> -    /* Arranges for the poll loop to wake up when 'dpif' has a message
> queued
> -     * to be received with the recv member function. */
> -    void (*recv_wait)(struct dpif *dpif);
> +    /* Arranges for the poll loop for handler with 'handler_id' to wake
> up when
> +     * 'dpif' has a message queued to be received with the recv member
> +     * function by the handler. */
> +    void (*recv_wait)(struct dpif *dpif, uint32_t handler_id);
>
>      /* Throws away any queued upcalls that 'dpif' currently has ready to
>       * return. */
> diff --git a/lib/dpif.c b/lib/dpif.c
> index 2b79a6e..72897b3 100644
> --- a/lib/dpif.c
> +++ b/lib/dpif.c
> @@ -634,7 +634,7 @@ dpif_port_query_by_name(const struct dpif *dpif, const
> char *devname,
>
>  /* Returns the Netlink PID value to supply in OVS_ACTION_ATTR_USERSPACE
> actions
>   * as the OVS_USERSPACE_ATTR_PID attribute's value, for use in flows whose
> - * packets arrived on port 'port_no'.
> + * packets arrived on port 'port_no' and whose 5-tuple hash is 'hash'.
>   *
>   * A 'port_no' of ODPP_NONE is a special case: it returns a reserved PID,
> not
>   * allocated to any port, that the client may use for special purposes.
> @@ -645,10 +645,10 @@ dpif_port_query_by_name(const struct dpif *dpif,
> const char *devname,
>   * update all of the flows that it installed that contain
>   * OVS_ACTION_ATTR_USERSPACE actions. */
>  uint32_t
> -dpif_port_get_pid(const struct dpif *dpif, odp_port_t port_no)
> +dpif_port_get_pid(const struct dpif *dpif, odp_port_t port_no, uint32_t
> hash)
>  {
>      return (dpif->dpif_class->port_get_pid
> -            ? (dpif->dpif_class->port_get_pid)(dpif, port_no)
> +            ? (dpif->dpif_class->port_get_pid)(dpif, port_no, hash)
>              : 0);
>  }
>
> @@ -1248,24 +1248,18 @@ dpif_upcall_type_to_string(enum dpif_upcall_type
> type)
>      }
>  }
>
> -/* Enables or disables receiving packets with dpif_recv() on 'dpif'.
>  Returns 0
> - * if successful, otherwise a positive errno value.
> - *
> - * Turning packet receive off and then back on may change the Netlink PID
> - * assignments returned by dpif_port_get_pid().  If the client does this,
> it
> - * must update all of the flows that have OVS_ACTION_ATTR_USERSPACE
> actions
> - * using the new PID assignment. */
>  int
> -dpif_recv_set(struct dpif *dpif, bool enable)
> +dpif_recv_set(struct dpif *dpif, bool enable, uint32_t n_handlers)
>  {
> -    int error = dpif->dpif_class->recv_set(dpif, enable);
> +    int error = dpif->dpif_class->recv_set(dpif, enable, n_handlers);
>      log_operation(dpif, "recv_set", error);
>      return error;
>  }
>
> -/* Polls for an upcall from 'dpif'.  If successful, stores the upcall into
> - * '*upcall', using 'buf' for storage.  Should only be called if
> - * dpif_recv_set() has been used to enable receiving packets on 'dpif'.
> +/* Polls for an upcall from 'dpif' for handler with 'handler_id'.  If
> + * successful, stores the upcall into '*upcall', using 'buf' for storage.
> + * Should only be called if dpif_recv_set() has been used to enable
> receiving
> + * packets on 'dpif'.
>   *
>   * 'upcall->key' and 'upcall->userdata' point into data in the
> caller-provided
>   * 'buf', so their memory cannot be freed separately from 'buf'.
> @@ -1280,9 +1274,10 @@ dpif_recv_set(struct dpif *dpif, bool enable)
>   * Returns 0 if successful, otherwise a positive errno value.  Returns
> EAGAIN
>   * if no upcall is immediately available. */
>  int
> -dpif_recv(struct dpif *dpif, struct dpif_upcall *upcall, struct ofpbuf
> *buf)
> +dpif_recv(struct dpif *dpif, uint32_t handler_id, struct dpif_upcall
> *upcall,
> +          struct ofpbuf *buf)
>  {
> -    int error = dpif->dpif_class->recv(dpif, upcall, buf);
> +    int error = dpif->dpif_class->recv(dpif, handler_id, upcall, buf);
>      if (!error && !VLOG_DROP_DBG(&dpmsg_rl)) {
>          struct ds flow;
>          char *packet;
> @@ -1316,12 +1311,13 @@ dpif_recv_purge(struct dpif *dpif)
>      }
>  }
>
> -/* Arranges for the poll loop to wake up when 'dpif' has a message queued
> to be
> - * received with dpif_recv(). */
> +/* Arranges for the poll loop for handler with 'handler_id' to wake up
> when
> + * 'dpif' has a message queued to be received with dpif_recv() by the
> handler.
> + */
>  void
> -dpif_recv_wait(struct dpif *dpif)
> +dpif_recv_wait(struct dpif *dpif, uint32_t handler_id)
>  {
> -    dpif->dpif_class->recv_wait(dpif);
> +       dpif->dpif_class->recv_wait(dpif, handler_id);
>  }
>
>  /* Obtains the NetFlow engine type and engine ID for 'dpif' into
> '*engine_type'
> diff --git a/lib/dpif.h b/lib/dpif.h
> index 7f986f9..b6e3fbd 100644
> --- a/lib/dpif.h
> +++ b/lib/dpif.h
> @@ -445,7 +445,8 @@ int dpif_port_query_by_name(const struct dpif *, const
> char *devname,
>                              struct dpif_port *);
>  int dpif_port_get_name(struct dpif *, odp_port_t port_no,
>                         char *name, size_t name_size);
> -uint32_t dpif_port_get_pid(const struct dpif *, odp_port_t port_no);
> +uint32_t dpif_port_get_pid(const struct dpif *, odp_port_t port_no,
> +                           uint32_t hash);
>
>  struct dpif_port_dump {
>      const struct dpif *dpif;
> @@ -613,10 +614,11 @@ struct dpif_upcall {
>      struct nlattr *userdata;    /* Argument to OVS_ACTION_ATTR_USERSPACE.
> */
>  };
>
> -int dpif_recv_set(struct dpif *, bool enable);
> -int dpif_recv(struct dpif *, struct dpif_upcall *, struct ofpbuf *);
> +int dpif_recv_set(struct dpif *, bool enable, uint32_t n_handlers);
> +int dpif_recv(struct dpif *, uint32_t handler_id, struct dpif_upcall *,
> +              struct ofpbuf *);
>  void dpif_recv_purge(struct dpif *);
> -void dpif_recv_wait(struct dpif *);
> +void dpif_recv_wait(struct dpif *, uint32_t handler_id);
>
>  /* Miscellaneous. */
>
> diff --git a/lib/flow.c b/lib/flow.c
> index 06ba036..db29db0 100644
> --- a/lib/flow.c
> +++ b/lib/flow.c
> @@ -822,6 +822,24 @@ flow_wildcards_set_reg_mask(struct flow_wildcards
> *wc, int idx, uint32_t mask)
>      wc->masks.regs[idx] = mask;
>  }
>
> +/* Calculates the 5-tuple hash from the given flow. */
> +uint32_t
> +flow_hash_5tuple(const struct flow *flow, uint32_t basis)
> +{
> +    uint32_t hash;
> +
> +    if (!flow) {
> +        return 0;
> +    }
> +
> +    hash = (OVS_FORCE int) flow->nw_src
> +           ^ (OVS_FORCE int) flow->nw_dst
> +           ^ flow->nw_proto ^ (OVS_FORCE int) flow->tp_src
> +           ^ (OVS_FORCE int) flow->tp_dst;
> +
> +    return jhash_bytes((void *) &hash, sizeof hash, basis);
> +}
> +
>  /* Hashes 'flow' based on its L2 through L4 protocol information. */
>  uint32_t
>  flow_hash_symmetric_l4(const struct flow *flow, uint32_t basis)
> diff --git a/lib/flow.h b/lib/flow.h
> index 3109a84..26871a2 100644
> --- a/lib/flow.h
> +++ b/lib/flow.h
> @@ -323,7 +323,8 @@ void flow_wildcards_fold_minimask_range(struct
> flow_wildcards *,
>  uint32_t flow_wildcards_hash(const struct flow_wildcards *, uint32_t
> basis);
>  bool flow_wildcards_equal(const struct flow_wildcards *,
>                            const struct flow_wildcards *);
> -uint32_t flow_hash_symmetric_l4(const struct flow *flow, uint32_t basis);
> +uint32_t flow_hash_5tuple(const struct flow *, uint32_t basis);
> +uint32_t flow_hash_symmetric_l4(const struct flow *, uint32_t basis);
>
>  /* Initialize a flow with random fields that matter for nx_hash_fields. */
>  void flow_random_hash_fields(struct flow *);
> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
> index 489012a..e6df302 100644
> --- a/ofproto/ofproto-dpif-upcall.c
> +++ b/ofproto/ofproto-dpif-upcall.c
> @@ -45,26 +45,11 @@
>
>  VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
>
> -COVERAGE_DEFINE(upcall_queue_overflow);
> -
> -/* A thread that processes each upcall handed to it by the dispatcher
> thread,
> - * forwards the upcall's packet, and possibly sets up a kernel flow as a
> - * cache. */
>  struct handler {
>      struct udpif *udpif;               /* Parent udpif. */
>      pthread_t thread;                  /* Thread ID. */
>      char *name;                        /* Thread name. */
> -
> -    struct ovs_mutex mutex;            /* Mutex guarding the following. */
> -
> -    /* Atomic queue of unprocessed upcalls. */
> -    struct list upcalls OVS_GUARDED;
> -    size_t n_upcalls OVS_GUARDED;
> -
> -    bool need_signal;                  /* Only changed by the dispatcher.
> */
> -
> -    pthread_cond_t wake_cond;          /* Wakes 'thread' while holding
> -                                          'mutex'. */
> +    uint32_t handler_id;               /* Handler id. */
>  };
>
>  /* A thread that processes each kernel flow handed to it by the
> flow_dumper
> @@ -89,9 +74,6 @@ struct revalidator {
>   *
>   * udpif has two logically separate pieces:
>   *
> - *    - A "dispatcher" thread that reads upcalls from the kernel and
> dispatches
> - *      them to one of several "handler" threads (see struct handler).
> - *
>   *    - A "flow_dumper" thread that reads the kernel flow table and
> dispatches
>   *      flows to one of several "revalidator" threads (see struct
>   *      revalidator). */
> @@ -103,7 +85,6 @@ struct udpif {
>
>      uint32_t secret;                   /* Random seed for upcall hash. */
>
> -    pthread_t dispatcher;              /* Dispatcher thread ID. */
>      pthread_t flow_dumper;             /* Flow dumper thread ID. */
>
>      struct handler *handlers;          /* Upcall handlers. */
> @@ -143,7 +124,7 @@ enum upcall_type {
>  };
>
>  struct upcall {
> -    struct list list_node;          /* For queuing upcalls. */
> +    bool is_valid;
>      struct flow_miss *flow_miss;    /* This upcall's flow_miss. */
>
>      /* Raw upcall plus data for keeping track of the memory backing it. */
> @@ -216,15 +197,14 @@ struct flow_miss {
>      bool put;
>  };
>
> -static void upcall_destroy(struct upcall *);
> +static void upcall_destroy(struct upcall *, bool free_upcall);
>
>  static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>  static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
>
> -static void recv_upcalls(struct udpif *);
> -static void handle_upcalls(struct handler *handler, struct list *upcalls);
> +static void handle_upcalls(struct handler *handler, struct upcall
> *upcalls,
> +                           size_t n_upcalls);
>  static void *udpif_flow_dumper(void *);
> -static void *udpif_dispatcher(void *);
>  static void *udpif_upcall_handler(void *);
>  static void *udpif_revalidator(void *);
>  static uint64_t udpif_get_n_flows(struct udpif *);
> @@ -307,9 +287,6 @@ udpif_set_threads(struct udpif *udpif, size_t
> n_handlers,
>          for (i = 0; i < udpif->n_handlers; i++) {
>              struct handler *handler = &udpif->handlers[i];
>
> -            ovs_mutex_lock(&handler->mutex);
> -            xpthread_cond_signal(&handler->wake_cond);
> -            ovs_mutex_unlock(&handler->mutex);
>              xpthread_join(handler->thread, NULL);
>          }
>
> @@ -323,7 +300,6 @@ udpif_set_threads(struct udpif *udpif, size_t
> n_handlers,
>          }
>
>          xpthread_join(udpif->flow_dumper, NULL);
> -        xpthread_join(udpif->dispatcher, NULL);
>
>          for (i = 0; i < udpif->n_revalidators; i++) {
>              struct revalidator *revalidator = &udpif->revalidators[i];
> @@ -347,17 +323,7 @@ udpif_set_threads(struct udpif *udpif, size_t
> n_handlers,
>          }
>
>          for (i = 0; i < udpif->n_handlers; i++) {
> -            struct handler *handler = &udpif->handlers[i];
> -            struct upcall *miss, *next;
> -
> -            LIST_FOR_EACH_SAFE (miss, next, list_node, &handler->upcalls)
> {
> -                list_remove(&miss->list_node);
> -                upcall_destroy(miss);
> -            }
> -            ovs_mutex_destroy(&handler->mutex);
> -
> -            xpthread_cond_destroy(&handler->wake_cond);
> -            free(handler->name);
> +            free(udpif->handlers[i].name);
>          }
>          latch_poll(&udpif->exit_latch);
>
> @@ -382,10 +348,7 @@ udpif_set_threads(struct udpif *udpif, size_t
> n_handlers,
>              struct handler *handler = &udpif->handlers[i];
>
>              handler->udpif = udpif;
> -            list_init(&handler->upcalls);
> -            handler->need_signal = false;
> -            xpthread_cond_init(&handler->wake_cond, NULL);
> -            ovs_mutex_init(&handler->mutex);
> +            handler->handler_id = i;
>              xpthread_create(&handler->thread, NULL, udpif_upcall_handler,
>                              handler);
>          }
> @@ -403,7 +366,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
>              xpthread_create(&revalidator->thread, NULL, udpif_revalidator,
>                              revalidator);
>          }
> -        xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
>          xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, udpif);
>      }
>  }
> @@ -430,16 +392,9 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
>  {
>      size_t i;
>
> -    simap_increase(usage, "dispatchers", 1);
>      simap_increase(usage, "flow_dumpers", 1);
>
>      simap_increase(usage, "handlers", udpif->n_handlers);
> -    for (i = 0; i < udpif->n_handlers; i++) {
> -        struct handler *handler = &udpif->handlers[i];
> -        ovs_mutex_lock(&handler->mutex);
> -        simap_increase(usage, "handler upcalls",  handler->n_upcalls);
> -        ovs_mutex_unlock(&handler->mutex);
> -    }
>
>      simap_increase(usage, "revalidators", udpif->n_revalidators);
>      for (i = 0; i < udpif->n_revalidators; i++) {
> @@ -468,12 +423,16 @@ udpif_flush(void)
>
>  /* Destroys and deallocates 'upcall'. */
>  static void
> -upcall_destroy(struct upcall *upcall)
> +upcall_destroy(struct upcall *upcall, bool free_upcall)
>  {
>      if (upcall) {
>          ofpbuf_uninit(&upcall->dpif_upcall.packet);
>          ofpbuf_uninit(&upcall->upcall_buf);
> -        free(upcall);
> +
> +        upcall->is_valid = false;
> +        if (free_upcall) {
> +            free(upcall);
> +        }
>      }
>  }
>
> @@ -499,24 +458,6 @@ udpif_get_n_flows(struct udpif *udpif)
>      return flow_count;
>  }
>
> -/* The dispatcher thread is responsible for receiving upcalls from the kernel,
> - * assigning them to a upcall_handler thread. */
> -static void *
> -udpif_dispatcher(void *arg)
> -{
> -    struct udpif *udpif = arg;
> -
> -    set_subprogram_name("dispatcher");
> -    while (!latch_is_set(&udpif->exit_latch)) {
> -        recv_upcalls(udpif);
> -        dpif_recv_wait(udpif->dpif);
> -        latch_wait(&udpif->exit_latch);
> -        poll_block();
> -    }
> -
> -    return NULL;
> -}
> -
>  static void *
>  udpif_flow_dumper(void *arg)
>  {
> @@ -627,38 +568,44 @@ udpif_flow_dumper(void *arg)
>      return NULL;
>  }
>
> -/* The miss handler thread is responsible for processing miss upcalls retrieved
> - * by the dispatcher thread.  Once finished it passes the processed miss
> - * upcalls to ofproto-dpif where they're installed in the datapath. */
>  static void *
>  udpif_upcall_handler(void *arg)
>  {
>      struct handler *handler = arg;
> +    struct udpif *udpif = handler->udpif;
> +    struct upcall upcalls[FLOW_MISS_MAX_BATCH];
>
>      handler->name = xasprintf("handler_%u", ovsthread_id_self());
>      set_subprogram_name("%s", handler->name);
>
>      while (!latch_is_set(&handler->udpif->exit_latch)) {
> -        struct list misses = LIST_INITIALIZER(&misses);
> -        size_t i;
> -
> -        ovs_mutex_lock(&handler->mutex);
> -        if (!handler->n_upcalls) {
> -            ovs_mutex_cond_wait(&handler->wake_cond, &handler->mutex);
> -        }
> +        size_t i, n_upcalls;
>
>          for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
> -            if (handler->n_upcalls) {
> -                handler->n_upcalls--;
> -                list_push_back(&misses, list_pop_front(&handler->upcalls));
> -            } else {
> +            struct upcall *upcall = &upcalls[i];
> +            int error;
> +
> +            ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
> +                            sizeof upcall->upcall_stub);
> +            error = dpif_recv(udpif->dpif, handler->handler_id,
> +                              &upcall->dpif_upcall, &upcall->upcall_buf);
> +            if (error) {
> +                /* upcall_destroy() can only be called on successfully received
> +                 * upcalls. */
> +                ofpbuf_uninit(&upcall->upcall_buf);
>                  break;
>              }
> +            upcall->is_valid = true;
>          }
> -        ovs_mutex_unlock(&handler->mutex);
> -
> -        handle_upcalls(handler, &misses);
>
> +        n_upcalls = i;
> +        if (!n_upcalls) {
> +            dpif_recv_wait(udpif->dpif, handler->handler_id);
> +            latch_wait(&udpif->exit_latch);
> +            poll_block();
> +        } else {
> +            handle_upcalls(handler, upcalls, n_upcalls);
> +        }
>          coverage_clear();
>      }
>
> @@ -765,98 +712,11 @@ classify_upcall(const struct upcall *upcall)
>      }
>  }
>
> -static void
> -recv_upcalls(struct udpif *udpif)
> -{
> -    int n;
> -
> -    for (;;) {
> -        uint32_t hash = udpif->secret;
> -        struct handler *handler;
> -        struct upcall *upcall;
> -        size_t n_bytes, left;
> -        struct nlattr *nla;
> -        int error;
> -
> -        upcall = xmalloc(sizeof *upcall);
> -        ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
> -                        sizeof upcall->upcall_stub);
> -        error = dpif_recv(udpif->dpif, &upcall->dpif_upcall,
> -                          &upcall->upcall_buf);
> -        if (error) {
> -            /* upcall_destroy() can only be called on successfully received
> -             * upcalls. */
> -            ofpbuf_uninit(&upcall->upcall_buf);
> -            free(upcall);
> -            break;
> -        }
> -
> -        n_bytes = 0;
> -        NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key,
> -                          upcall->dpif_upcall.key_len) {
> -            enum ovs_key_attr type = nl_attr_type(nla);
> -            if (type == OVS_KEY_ATTR_IN_PORT
> -                || type == OVS_KEY_ATTR_TCP
> -                || type == OVS_KEY_ATTR_UDP) {
> -                if (nl_attr_get_size(nla) == 4) {
> -                    hash = mhash_add(hash, nl_attr_get_u32(nla));
> -                    n_bytes += 4;
> -                } else {
> -                    VLOG_WARN_RL(&rl,
> -                                 "Netlink attribute with incorrect size.");
> -                }
> -            }
> -        }
> -        hash =  mhash_finish(hash, n_bytes);
> -
> -        handler = &udpif->handlers[hash % udpif->n_handlers];
> -
> -        ovs_mutex_lock(&handler->mutex);
> -        if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
> -            list_push_back(&handler->upcalls, &upcall->list_node);
> -            if (handler->n_upcalls == 0) {
> -                handler->need_signal = true;
> -            }
> -            handler->n_upcalls++;
> -            if (handler->need_signal &&
> -                handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
> -                handler->need_signal = false;
> -                xpthread_cond_signal(&handler->wake_cond);
> -            }
> -            ovs_mutex_unlock(&handler->mutex);
> -            if (!VLOG_DROP_DBG(&rl)) {
> -                struct ds ds = DS_EMPTY_INITIALIZER;
> -
> -                odp_flow_key_format(upcall->dpif_upcall.key,
> -                                    upcall->dpif_upcall.key_len,
> -                                    &ds);
> -                VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds));
> -                ds_destroy(&ds);
> -            }
> -        } else {
> -            ovs_mutex_unlock(&handler->mutex);
> -            COVERAGE_INC(upcall_queue_overflow);
> -            upcall_destroy(upcall);
> -        }
> -    }
> -
> -    for (n = 0; n < udpif->n_handlers; ++n) {
> -        struct handler *handler = &udpif->handlers[n];
> -
> -        if (handler->need_signal) {
> -            handler->need_signal = false;
> -            ovs_mutex_lock(&handler->mutex);
> -            xpthread_cond_signal(&handler->wake_cond);
> -            ovs_mutex_unlock(&handler->mutex);
> -        }
> -    }
> -}
> -
>  /* Calculates slow path actions for 'xout'.  'buf' must statically be
>   * initialized with at least 128 bytes of space. */
>  static void
>  compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
> -                  odp_port_t odp_in_port, struct ofpbuf *buf)
> +                  struct flow *flow, odp_port_t odp_in_port, struct ofpbuf *buf)
>  {
>      union user_action_cookie cookie;
>      odp_port_t port;
> @@ -869,7 +729,7 @@ compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
>      port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
>          ? ODPP_NONE
>          : odp_in_port;
> -    pid = dpif_port_get_pid(udpif->dpif, port);
> +    pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
>      odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
>  }
>
> @@ -889,7 +749,8 @@ flow_miss_find(struct hmap *todo, const struct ofproto_dpif *ofproto,
>  }
>
>  static void
> -handle_upcalls(struct handler *handler, struct list *upcalls)
> +handle_upcalls(struct handler *handler, struct upcall *upcalls,
> +               size_t n_upcalls)
>  {
>      struct hmap misses = HMAP_INITIALIZER(&misses);
>      struct udpif *udpif = handler->udpif;
> @@ -898,7 +759,6 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
>      struct dpif_op *opsp[FLOW_MISS_MAX_BATCH * 2];
>      struct dpif_op ops[FLOW_MISS_MAX_BATCH * 2];
>      struct flow_miss *miss, *next_miss;
> -    struct upcall *upcall, *next;
>      size_t n_misses, n_ops, i;
>      unsigned int flow_limit;
>      bool fail_open, may_put;
> @@ -927,7 +787,8 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
>       *     datapath flow.)
>       */
>      n_misses = 0;
> -    LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
> +    for (i = 0; i < n_upcalls; i++) {
> +        struct upcall *upcall = &upcalls[i];
>          struct dpif_upcall *dupcall = &upcall->dpif_upcall;
>          struct flow_miss *miss = &miss_buf[n_misses];
>          struct ofpbuf *packet = &dupcall->packet;
> @@ -956,8 +817,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
>                                dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
>                                NULL);
>              }
> -            list_remove(&upcall->list_node);
> -            upcall_destroy(upcall);
> +            upcall_destroy(upcall, false);
>              continue;
>          }
>
> @@ -1039,8 +899,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
>          dpif_ipfix_unref(ipfix);
>          dpif_sflow_unref(sflow);
>
> -        list_remove(&upcall->list_node);
> -        upcall_destroy(upcall);
> +        upcall_destroy(upcall, false);
>      }
>
>      /* Initialize each 'struct flow_miss's ->xout.
> @@ -1083,12 +942,17 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
>       * The loop fills 'ops' with an array of operations to execute in the
>       * datapath. */
>      n_ops = 0;
> -    LIST_FOR_EACH (upcall, list_node, upcalls) {
> +    for (i = 0; i < n_upcalls; i++) {
> +        struct upcall *upcall = &upcalls[i];
>          struct flow_miss *miss = upcall->flow_miss;
>          struct ofpbuf *packet = &upcall->dpif_upcall.packet;
>          struct dpif_op *op;
>          ovs_be16 flow_vlan_tci;
>
> +        if (!upcall->is_valid) {
> +            continue;
> +        }
> +
>          /* Save a copy of flow.vlan_tci in case it is changed to
>           * generate proper mega flow masks for VLAN splinter flows. */
>          flow_vlan_tci = miss->flow.vlan_tci;
> @@ -1162,7 +1026,8 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
>
>                  ofpbuf_use_stack(&buf, miss->slow_path_buf,
>                                   sizeof miss->slow_path_buf);
> -                compose_slow_path(udpif, &miss->xout, miss->odp_in_port, &buf);
> +                compose_slow_path(udpif, &miss->xout, &miss->flow,
> +                                  miss->odp_in_port, &buf);
>                  op->u.flow_put.actions = buf.data;
>                  op->u.flow_put.actions_len = buf.size;
>              }
> @@ -1197,11 +1062,16 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
>       *
>       * Copy packets before they are modified by execution. */
>      if (fail_open) {
> -        LIST_FOR_EACH (upcall, list_node, upcalls) {
> +        for (i = 0; i < n_upcalls; i++) {
> +            struct upcall *upcall = &upcalls[i];
>              struct flow_miss *miss = upcall->flow_miss;
>              struct ofpbuf *packet = &upcall->dpif_upcall.packet;
>              struct ofproto_packet_in *pin;
>
> +            if (!upcall->is_valid) {
> +                continue;
> +            }
> +
>              pin = xmalloc(sizeof *pin);
>              pin->up.packet = xmemdup(packet->data, packet->size);
>              pin->up.packet_len = packet->size;
> @@ -1227,9 +1097,10 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
>      }
>      hmap_destroy(&misses);
>
> -    LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
> -        list_remove(&upcall->list_node);
> -        upcall_destroy(upcall);
> +    for (i = 0; i < n_upcalls; i++) {
> +        if (upcalls[i].is_valid) {
> +            upcall_destroy(&upcalls[i], false);
> +        }
>      }
>  }
>
> @@ -1324,7 +1195,7 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
>                           xout.odp_actions.size);
>      } else {
>          ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
> -        compose_slow_path(udpif, &xout, odp_in_port, &xout_actions);
> +        compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
>      }
>
>      if (!ofpbuf_equal(&xout_actions, actions)) {
> @@ -1525,16 +1396,6 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
>          ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
>
>          ds_put_char(&ds, '\n');
> -        for (i = 0; i < udpif->n_handlers; i++) {
> -            struct handler *handler = &udpif->handlers[i];
> -
> -            ovs_mutex_lock(&handler->mutex);
> -            ds_put_format(&ds, "\t%s: (upcall queue %"PRIuSIZE")\n",
> -                          handler->name, handler->n_upcalls);
> -            ovs_mutex_unlock(&handler->mutex);
> -        }
> -
> -        ds_put_char(&ds, '\n');
>          for (i = 0; i < n_revalidators; i++) {
>              struct revalidator *revalidator = &udpif->revalidators[i];
>
> diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c
> index ad44582..e85aa7a 100644
> --- a/ofproto/ofproto-dpif-xlate.c
> +++ b/ofproto/ofproto-dpif-xlate.c
> @@ -1483,8 +1483,9 @@ compose_sample_action(const struct xbridge *xbridge,
>      actions_offset = nl_msg_start_nested(odp_actions, OVS_SAMPLE_ATTR_ACTIONS);
>
>      odp_port = ofp_port_to_odp_port(xbridge, flow->in_port.ofp_port);
> -    pid = dpif_port_get_pid(xbridge->dpif, odp_port);
> -    cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size, odp_actions);
> +    pid = dpif_port_get_pid(xbridge->dpif, odp_port, flow_hash_5tuple(flow, 0));
> +    cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size,
> +                                             odp_actions);
>
>      nl_msg_end_nested(odp_actions, actions_offset);
>      nl_msg_end_nested(odp_actions, sample_offset);
> diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
> index 7b3e1eb..a53b3fd 100644
> --- a/ofproto/ofproto-dpif.c
> +++ b/ofproto/ofproto-dpif.c
> @@ -470,7 +470,8 @@ type_run(const char *type)
>
>          backer->recv_set_enable = true;
>
> -        error = dpif_recv_set(backer->dpif, backer->recv_set_enable);
> +        error = dpif_recv_set(backer->dpif, backer->recv_set_enable,
> +                              n_handlers);
>          if (error) {
>              VLOG_ERR("Failed to enable receiving packets in dpif.");
>              return error;
> @@ -480,6 +481,7 @@ type_run(const char *type)
>      }
>
>      if (backer->recv_set_enable) {
> +        dpif_recv_set(backer->dpif, backer->recv_set_enable, n_handlers);
>          udpif_set_threads(backer->udpif, n_handlers, n_revalidators);
>      }
>
> @@ -885,7 +887,7 @@ open_dpif_backer(const char *type, struct dpif_backer **backerp)
>
>      shash_add(&all_dpif_backers, type, backer);
>
> -    error = dpif_recv_set(backer->dpif, backer->recv_set_enable);
> +    error = dpif_recv_set(backer->dpif, backer->recv_set_enable, n_handlers);
>      if (error) {
>          VLOG_ERR("failed to listen on datapath of type %s: %s",
>                   type, ovs_strerror(error));
> @@ -930,7 +932,7 @@ check_variable_length_userdata(struct dpif_backer *backer)
>      ofpbuf_init(&actions, 64);
>      start = nl_msg_start_nested(&actions, OVS_ACTION_ATTR_USERSPACE);
>      nl_msg_put_u32(&actions, OVS_USERSPACE_ATTR_PID,
> -                   dpif_port_get_pid(backer->dpif, ODPP_NONE));
> +                   dpif_port_get_pid(backer->dpif, ODPP_NONE, 0));
>      nl_msg_put_unspec_zero(&actions, OVS_USERSPACE_ATTR_USERDATA, 4);
>      nl_msg_end_nested(&actions, start);
>
> --
> 1.7.9.5
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.openvswitch.org/pipermail/ovs-dev/attachments/20140205/5a465db8/attachment-0003.html>


More information about the dev mailing list