[ovs-dev] [dpif-netdev 15/15] dpif-netdev: Use separate threads for forwarding.

Pravin Shelar pshelar at nicira.com
Thu Jan 9 00:57:28 UTC 2014


On Fri, Dec 27, 2013 at 8:03 PM, Ben Pfaff <blp at nicira.com> wrote:
> For now, we use exactly two threads.  Presumably at some point we will want
> to make this configurable.
>
> Signed-off-by: Ben Pfaff <blp at nicira.com>

Looks good.

Thanks.
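
(On the commit message's point about making the thread count
configurable later: one possible shape for that, sketched here purely
as an illustration -- the "n-forwarder-threads" key and the function
below are hypothetical, not part of this patch:

    /* Hypothetical hook, illustrative only: pick up a thread count
     * from an other_config column and apply it. */
    static void
    dp_netdev_reconfigure_threads(struct dp_netdev *dp,
                                  const struct smap *other_config)
    {
        int n = smap_get_int(other_config, "n-forwarder-threads", 2);

        dp_netdev_set_threads(dp, MAX(1, n));
    }

No need for anything like that in this series.)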

> ---
>  lib/dpif-netdev.c |  210 +++++++++++++++++++++++++++++++++++------------------
>  1 file changed, 141 insertions(+), 69 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 7070eac..d667afe 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -39,6 +39,7 @@
>  #include "dynamic-string.h"
>  #include "flow.h"
>  #include "hmap.h"
> +#include "latch.h"
>  #include "list.h"
>  #include "meta-flow.h"
>  #include "netdev.h"
> @@ -158,6 +159,11 @@ struct dp_netdev {
>      struct ovs_rwlock port_rwlock;
>      struct hmap ports OVS_GUARDED;
>      struct seq *port_seq;       /* Incremented whenever a port changes. */
> +
> +    /* Forwarding threads. */
> +    struct latch exit_latch;
> +    struct dp_forwarder *forwarders;
> +    size_t n_forwarders;
>  };
>
>  static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
> @@ -281,6 +287,15 @@ struct dp_netdev_actions *dp_netdev_actions_ref(
>      const struct dp_netdev_actions *);
>  void dp_netdev_actions_unref(struct dp_netdev_actions *);
>
> +/* A thread that receives packets from some ports, looks them up in the flow
> + * table, and executes the actions it finds. */
> +struct dp_forwarder {
> +    struct dp_netdev *dp;
> +    pthread_t thread;
> +    char *name;
> +    uint32_t min_hash, max_hash;
> +};
> +
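
To spell out "some ports" for other readers: the [min_hash, max_hash]
pair statically partitions the port hmap by node hash, so each port's
rx queue ends up polled by exactly one forwarder thread.
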
>  /* Interface to netdev-based datapath. */
>  struct dpif_netdev {
>      struct dpif dpif;
> @@ -317,6 +332,7 @@ static void dp_netdev_port_input(struct dp_netdev *dp,
>                                   struct dp_netdev_port *port,
>                                   struct ofpbuf *packet)
>      OVS_REQ_RDLOCK(dp->port_rwlock);
> +static void dp_netdev_set_threads(struct dp_netdev *, int n);
>
>  static struct dpif_netdev *
>  dpif_netdev_cast(const struct dpif *dpif)
> @@ -453,6 +469,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
>      ovs_rwlock_init(&dp->port_rwlock);
>      hmap_init(&dp->ports);
>      dp->port_seq = seq_create();
> +    latch_init(&dp->exit_latch);
>
>      ovs_rwlock_wrlock(&dp->port_rwlock);
>      error = do_add_port(dp, name, "internal", ODPP_LOCAL);
> @@ -461,6 +478,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
>          dp_netdev_free(dp);
>          return error;
>      }
> +    dp_netdev_set_threads(dp, 2);
>
>      *dpp = dp;
>      return 0;
> @@ -518,6 +536,9 @@ dp_netdev_free(struct dp_netdev *dp)
>
>      shash_find_and_delete(&dp_netdevs, dp->name);
>
> +    dp_netdev_set_threads(dp, 0);
> +    free(dp->forwarders);
> +
>      dp_netdev_flow_flush(dp);
>      ovs_rwlock_wrlock(&dp->port_rwlock);
>      HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
> @@ -539,6 +560,7 @@ dp_netdev_free(struct dp_netdev *dp)
>      hmap_destroy(&dp->ports);
>      atomic_flag_destroy(&dp->destroyed);
>      ovs_refcount_destroy(&dp->ref_cnt);
> +    latch_destroy(&dp->exit_latch);
>      free(CONST_CAST(char *, dp->name));
>      free(dp);
>  }
> @@ -1526,6 +1548,123 @@ dp_netdev_actions_unref(struct dp_netdev_actions *actions)
>      }
>  }
>
> +static void *
> +dp_forwarder_main(void *f_)
> +{
> +    struct dp_forwarder *f = f_;
> +    struct dp_netdev *dp = f->dp;
> +    struct ofpbuf packet;
> +
> +    f->name = xasprintf("forwarder_%u", ovsthread_id_self());
> +    set_subprogram_name("%s", f->name);
> +
> +    ofpbuf_init(&packet, 0);
> +    while (!latch_is_set(&dp->exit_latch)) {
> +        bool received_anything;
> +        int i;
> +
> +        ovs_rwlock_rdlock(&dp->port_rwlock);
> +        for (i = 0; i < 50; i++) {
> +            struct dp_netdev_port *port;
> +
> +            received_anything = false;
> +            HMAP_FOR_EACH (port, node, &f->dp->ports) {
> +                if (port->rx
> +                    && port->node.hash >= f->min_hash
> +                    && port->node.hash <= f->max_hash) {
> +                    int buf_size;
> +                    int error;
> +                    int mtu;
> +
> +                    if (netdev_get_mtu(port->netdev, &mtu)) {
> +                        mtu = ETH_PAYLOAD_MAX;
> +                    }
> +                    buf_size = DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + mtu;
> +
> +                    ofpbuf_clear(&packet);
> +                    ofpbuf_reserve_with_tailroom(&packet, DP_NETDEV_HEADROOM,
> +                                                 buf_size);
> +
> +                    error = netdev_rx_recv(port->rx, &packet);
> +                    if (!error) {
> +                        received_anything = true;
> +                        dp_netdev_port_input(dp, port, &packet);
> +                    } else if (error != EAGAIN && error != EOPNOTSUPP) {
> +                        static struct vlog_rate_limit rl
> +                            = VLOG_RATE_LIMIT_INIT(1, 5);
> +
> +                        VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
> +                                    netdev_get_name(port->netdev),
> +                                    ovs_strerror(error));
> +                    }
> +                }
> +            }
> +
> +            if (!received_anything) {
> +                break;
> +            }
> +        }
> +
> +        if (received_anything) {
> +            poll_immediate_wake();
> +        } else {
> +            struct dp_netdev_port *port;
> +
> +            HMAP_FOR_EACH (port, node, &f->dp->ports) {
> +                if (port->rx
> +                    && port->node.hash >= f->min_hash
> +                    && port->node.hash <= f->max_hash) {
> +                    netdev_rx_wait(port->rx);
> +                }
> +            }
> +            seq_wait(dp->port_seq, seq_read(dp->port_seq));
> +            latch_wait(&dp->exit_latch);
> +        }
> +        ovs_rwlock_unlock(&dp->port_rwlock);
> +
> +        poll_block();
> +    }
> +    ofpbuf_uninit(&packet);
> +
> +    free(f->name);
> +
> +    return NULL;
> +}
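
One take-it-or-leave-it nit: the port-ownership test above appears in
both the receive loop and the wait loop; a small helper would keep the
two copies in sync, e.g. (sketch only):

    static bool
    dp_forwarder_owns_port(const struct dp_forwarder *f,
                           const struct dp_netdev_port *port)
    {
        return (port->rx
                && port->node.hash >= f->min_hash
                && port->node.hash <= f->max_hash);
    }

Not worth respinning for, though.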
> +
> +static void
> +dp_netdev_set_threads(struct dp_netdev *dp, int n)
> +{
> +    int i;
> +
> +    if (n == dp->n_forwarders) {
> +        return;
> +    }
> +
> +    /* Stop existing threads. */
> +    latch_set(&dp->exit_latch);
> +    for (i = 0; i < dp->n_forwarders; i++) {
> +        struct dp_forwarder *f = &dp->forwarders[i];
> +
> +        xpthread_join(f->thread, NULL);
> +    }
> +    latch_poll(&dp->exit_latch);
> +    free(dp->forwarders);
> +
> +    /* Start new threads. */
> +    dp->forwarders = xmalloc(n * sizeof *dp->forwarders);
> +    dp->n_forwarders = n;
> +    for (i = 0; i < n; i++) {
> +        struct dp_forwarder *f = &dp->forwarders[i];
> +
> +        f->dp = dp;
> +        f->min_hash = UINT32_MAX / n * i;
> +        f->max_hash = UINT32_MAX / n * (i + 1) - 1;
> +        if (i == n - 1) {
> +            f->max_hash = UINT32_MAX;
> +        }
> +
> +        xpthread_create(&f->thread, NULL, dp_forwarder_main, f);
> +    }
> +}
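
For anyone else reading, the hash split works out like this for the
n == 2 case used above:

    thread 0: min_hash = 0x00000000, max_hash = 0x7ffffffe
    thread 1: min_hash = 0x7fffffff, max_hash = 0xffffffff (clamped)

Without the clamp on the last thread, its max_hash would be
UINT32_MAX / 2 * 2 - 1 == 0xfffffffd, orphaning the top two hash
values, so the special case is needed.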
> +
>  static void
>  dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
>                      const struct ofpbuf *packet)
> @@ -1571,73 +1710,6 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
>      }
>  }
>
> -static void
> -dpif_netdev_run(struct dpif *dpif)
> -{
> -    struct dp_netdev_port *port;
> -    struct dp_netdev *dp;
> -    struct ofpbuf packet;
> -
> -    dp = get_dp_netdev(dpif);
> -    ofpbuf_init(&packet, 0);
> -
> -    ovs_rwlock_rdlock(&dp->port_rwlock);
> -    HMAP_FOR_EACH (port, node, &dp->ports) {
> -        int buf_size;
> -        int error;
> -        int mtu;
> -
> -        error = netdev_get_mtu(port->netdev, &mtu);
> -        if (error) {
> -            mtu = ETH_PAYLOAD_MAX;
> -        }
> -        buf_size = DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + mtu;
> -
> -        ofpbuf_clear(&packet);
> -        ofpbuf_reserve_with_tailroom(&packet, DP_NETDEV_HEADROOM, buf_size);
> -
> -        error = port->rx ? netdev_rx_recv(port->rx, &packet) : EOPNOTSUPP;
> -        if (!error) {
> -            dp_netdev_port_input(dp, port, &packet);
> -        } else if (error != EAGAIN && error != EOPNOTSUPP) {
> -            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> -
> -            VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
> -                        netdev_get_name(port->netdev), ovs_strerror(error));
> -        }
> -    }
> -    ovs_rwlock_unlock(&dp->port_rwlock);
> -
> -    ofpbuf_uninit(&packet);
> -}
> -
> -static void
> -dpif_netdev_wait(struct dpif *dpif)
> -{
> -    struct dp_netdev_port *port;
> -
> -    /* There is a race here, if thread A calls dpif_netdev_wait(dpif) and
> -     * thread B calls dpif_port_add(dpif) or dpif_port_remove(dpif) before
> -     * A makes it to poll_block().
> -     *
> -     * But I think it doesn't matter:
> -     *
> -     *     - In the dpif_port_add() case, A will not wake up when a packet
> -     *       arrives on the new port, but this would also happen if the
> -     *       ordering were reversed.
> -     *
> -     *     - In the dpif_port_remove() case, A might wake up spuriously, but
> -     *       that is harmless. */
> -
> -    ovs_mutex_lock(&dp_netdev_mutex);
> -    HMAP_FOR_EACH (port, node, &get_dp_netdev(dpif)->ports) {
> -        if (port->rx) {
> -            netdev_rx_wait(port->rx);
> -        }
> -    }
> -    ovs_mutex_unlock(&dp_netdev_mutex);
> -}
> -
>  static int
>  dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
>                             int queue_no, const struct flow *flow,
> @@ -1750,8 +1822,8 @@ const struct dpif_class dpif_netdev_class = {
>      dpif_netdev_open,
>      dpif_netdev_close,
>      dpif_netdev_destroy,
> -    dpif_netdev_run,
> -    dpif_netdev_wait,
> +    NULL,                       /* run */
> +    NULL,                       /* wait */
>      dpif_netdev_get_stats,
>      dpif_netdev_port_add,
>      dpif_netdev_port_del,
> --
> 1.7.10.4
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
