[ovs-dev] [dpif-netdev 15/15] dpif-netdev: Use separate threads for forwarding.
Pravin Shelar
pshelar at nicira.com
Thu Jan 9 00:57:28 UTC 2014
On Fri, Dec 27, 2013 at 8:03 PM, Ben Pfaff <blp at nicira.com> wrote:
> For now, we use exactly two threads. Presumably at some point we will want
> to make this configurable.
>
> Signed-off-by: Ben Pfaff <blp at nicira.com>
Looks good.
Thanks.
> ---
> lib/dpif-netdev.c | 210 +++++++++++++++++++++++++++++++++++------------------
> 1 file changed, 141 insertions(+), 69 deletions(-)
>
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 7070eac..d667afe 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -39,6 +39,7 @@
> #include "dynamic-string.h"
> #include "flow.h"
> #include "hmap.h"
> +#include "latch.h"
> #include "list.h"
> #include "meta-flow.h"
> #include "netdev.h"
> @@ -158,6 +159,11 @@ struct dp_netdev {
> struct ovs_rwlock port_rwlock;
> struct hmap ports OVS_GUARDED;
> struct seq *port_seq; /* Incremented whenever a port changes. */
> +
> + /* Forwarding threads. */
> + struct latch exit_latch;
> + struct dp_forwarder *forwarders;
> + size_t n_forwarders;
> };
>
> static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
> @@ -281,6 +287,15 @@ struct dp_netdev_actions *dp_netdev_actions_ref(
> const struct dp_netdev_actions *);
> void dp_netdev_actions_unref(struct dp_netdev_actions *);
>
> +/* A thread that receives packets from some ports, looks them up in the flow
> + * table, and executes the actions it finds. */
> +struct dp_forwarder {
> + struct dp_netdev *dp;
> + pthread_t thread;
> + char *name;
> + uint32_t min_hash, max_hash;
> +};
> +
> /* Interface to netdev-based datapath. */
> struct dpif_netdev {
> struct dpif dpif;
> @@ -317,6 +332,7 @@ static void dp_netdev_port_input(struct dp_netdev *dp,
> struct dp_netdev_port *port,
> struct ofpbuf *packet)
> OVS_REQ_RDLOCK(dp->port_rwlock);
> +static void dp_netdev_set_threads(struct dp_netdev *, int n);
>
> static struct dpif_netdev *
> dpif_netdev_cast(const struct dpif *dpif)
> @@ -453,6 +469,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
> ovs_rwlock_init(&dp->port_rwlock);
> hmap_init(&dp->ports);
> dp->port_seq = seq_create();
> + latch_init(&dp->exit_latch);
>
> ovs_rwlock_wrlock(&dp->port_rwlock);
> error = do_add_port(dp, name, "internal", ODPP_LOCAL);
> @@ -461,6 +478,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
> dp_netdev_free(dp);
> return error;
> }
> + dp_netdev_set_threads(dp, 2);
>
> *dpp = dp;
> return 0;
> @@ -518,6 +536,9 @@ dp_netdev_free(struct dp_netdev *dp)
>
> shash_find_and_delete(&dp_netdevs, dp->name);
>
> + dp_netdev_set_threads(dp, 0);
> + free(dp->forwarders);
> +
> dp_netdev_flow_flush(dp);
> ovs_rwlock_wrlock(&dp->port_rwlock);
> HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
> @@ -539,6 +560,7 @@ dp_netdev_free(struct dp_netdev *dp)
> hmap_destroy(&dp->ports);
> atomic_flag_destroy(&dp->destroyed);
> ovs_refcount_destroy(&dp->ref_cnt);
> + latch_destroy(&dp->exit_latch);
> free(CONST_CAST(char *, dp->name));
> free(dp);
> }
> @@ -1526,6 +1548,123 @@ dp_netdev_actions_unref(struct dp_netdev_actions *actions)
> }
> }
>
> +static void *
> +dp_forwarder_main(void *f_)
> +{
> + struct dp_forwarder *f = f_;
> + struct dp_netdev *dp = f->dp;
> + struct ofpbuf packet;
> +
> + f->name = xasprintf("forwarder_%u", ovsthread_id_self());
> + set_subprogram_name("%s", f->name);
> +
> + ofpbuf_init(&packet, 0);
> + while (!latch_is_set(&dp->exit_latch)) {
> + bool received_anything;
> + int i;
> +
> + ovs_rwlock_rdlock(&dp->port_rwlock);
> + for (i = 0; i < 50; i++) {
> + struct dp_netdev_port *port;
> +
> + received_anything = false;
> + HMAP_FOR_EACH (port, node, &f->dp->ports) {
> + if (port->rx
> + && port->node.hash >= f->min_hash
> + && port->node.hash <= f->max_hash) {
> + int buf_size;
> + int error;
> + int mtu;
> +
> + if (netdev_get_mtu(port->netdev, &mtu)) {
> + mtu = ETH_PAYLOAD_MAX;
> + }
> + buf_size = DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + mtu;
> +
> + ofpbuf_clear(&packet);
> + ofpbuf_reserve_with_tailroom(&packet, DP_NETDEV_HEADROOM,
> + buf_size);
> +
> + error = netdev_rx_recv(port->rx, &packet);
> + if (!error) {
> + received_anything = true;
> + dp_netdev_port_input(dp, port, &packet);
> + } else if (error != EAGAIN && error != EOPNOTSUPP) {
> + static struct vlog_rate_limit rl
> + = VLOG_RATE_LIMIT_INIT(1, 5);
> +
> + VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
> + netdev_get_name(port->netdev),
> + ovs_strerror(error));
> + }
> + }
> + }
> +
> + if (!received_anything) {
> + break;
> + }
> + }
> +
> + if (received_anything) {
> + poll_immediate_wake();
> + } else {
> + struct dp_netdev_port *port;
> +
> + HMAP_FOR_EACH (port, node, &f->dp->ports)
> + if (port->rx
> + && port->node.hash >= f->min_hash
> + && port->node.hash <= f->max_hash) {
> + netdev_rx_wait(port->rx);
> + }
> + seq_wait(dp->port_seq, seq_read(dp->port_seq));
> + latch_wait(&dp->exit_latch);
> + }
> + ovs_rwlock_unlock(&dp->port_rwlock);
> +
> + poll_block();
> + }
> + ofpbuf_uninit(&packet);
> +
> + free(f->name);
> +
> + return NULL;
> +}
> +
> +static void
> +dp_netdev_set_threads(struct dp_netdev *dp, int n)
> +{
> + int i;
> +
> + if (n == dp->n_forwarders) {
> + return;
> + }
> +
> + /* Stop existing threads. */
> + latch_set(&dp->exit_latch);
> + for (i = 0; i < dp->n_forwarders; i++) {
> + struct dp_forwarder *f = &dp->forwarders[i];
> +
> + xpthread_join(f->thread, NULL);
> + }
> + latch_poll(&dp->exit_latch);
> + free(dp->forwarders);
> +
> + /* Start new threads. */
> + dp->forwarders = xmalloc(n * sizeof *dp->forwarders);
> + dp->n_forwarders = n;
> + for (i = 0; i < n; i++) {
> + struct dp_forwarder *f = &dp->forwarders[i];
> +
> + f->dp = dp;
> + f->min_hash = UINT32_MAX / n * i;
> + f->max_hash = UINT32_MAX / n * (i + 1) - 1;
> + if (i == n - 1) {
> + f->max_hash = UINT32_MAX;
> + }
> + xpthread_create(&f->thread, NULL, dp_forwarder_main, f);
> + }
> +}
> +
> static void
> dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
> const struct ofpbuf *packet)
> @@ -1571,73 +1710,6 @@ dp_netdev_port_input(struct dp_netdev *dp, struct dp_netdev_port *port,
> }
> }
>
> -static void
> -dpif_netdev_run(struct dpif *dpif)
> -{
> - struct dp_netdev_port *port;
> - struct dp_netdev *dp;
> - struct ofpbuf packet;
> -
> - dp = get_dp_netdev(dpif);
> - ofpbuf_init(&packet, 0);
> -
> - ovs_rwlock_rdlock(&dp->port_rwlock);
> - HMAP_FOR_EACH (port, node, &dp->ports) {
> - int buf_size;
> - int error;
> - int mtu;
> -
> - error = netdev_get_mtu(port->netdev, &mtu);
> - if (error) {
> - mtu = ETH_PAYLOAD_MAX;
> - }
> - buf_size = DP_NETDEV_HEADROOM + VLAN_ETH_HEADER_LEN + mtu;
> -
> - ofpbuf_clear(&packet);
> - ofpbuf_reserve_with_tailroom(&packet, DP_NETDEV_HEADROOM, buf_size);
> -
> - error = port->rx ? netdev_rx_recv(port->rx, &packet) : EOPNOTSUPP;
> - if (!error) {
> - dp_netdev_port_input(dp, port, &packet);
> - } else if (error != EAGAIN && error != EOPNOTSUPP) {
> - static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> -
> - VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
> - netdev_get_name(port->netdev), ovs_strerror(error));
> - }
> - }
> - ovs_rwlock_unlock(&dp->port_rwlock);
> -
> - ofpbuf_uninit(&packet);
> -}
> -
> -static void
> -dpif_netdev_wait(struct dpif *dpif)
> -{
> - struct dp_netdev_port *port;
> -
> - /* There is a race here, if thread A calls dpif_netdev_wait(dpif) and
> - * thread B calls dpif_port_add(dpif) or dpif_port_remove(dpif) before
> - * A makes it to poll_block().
> - *
> - * But I think it doesn't matter:
> - *
> - * - In the dpif_port_add() case, A will not wake up when a packet
> - * arrives on the new port, but this would also happen if the
> - * ordering were reversed.
> - *
> - * - In the dpif_port_remove() case, A might wake up spuriously, but
> - * that is harmless. */
> -
> - ovs_mutex_lock(&dp_netdev_mutex);
> - HMAP_FOR_EACH (port, node, &get_dp_netdev(dpif)->ports) {
> - if (port->rx) {
> - netdev_rx_wait(port->rx);
> - }
> - }
> - ovs_mutex_unlock(&dp_netdev_mutex);
> -}
> -
> static int
> dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
> int queue_no, const struct flow *flow,
> @@ -1750,8 +1822,8 @@ const struct dpif_class dpif_netdev_class = {
> dpif_netdev_open,
> dpif_netdev_close,
> dpif_netdev_destroy,
> - dpif_netdev_run,
> - dpif_netdev_wait,
> + NULL, /* run */
> + NULL, /* wait */
> dpif_netdev_get_stats,
> dpif_netdev_port_add,
> dpif_netdev_port_del,
> --
> 1.7.10.4
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
More information about the dev
mailing list