[ovs-dev] [RFC] ofproto-dpif-upcall: Improve upcall handling fairness.

Alex Wang alexw at nicira.com
Wed Nov 20 02:14:04 UTC 2013


Hey Ben and Ethan,

Could you review this RFC when you are available?

This patch is pretty complete.  I'll still do some experiments to find the
best UPCALL_QUEUE_LENGTH and UPCALL_QUEUES values.
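
One area that could use extra attention is the head-of-line blocking
back-off in distribute_upcalls().  It roughly amounts to the check below
(a simplified sketch with a hypothetical helper name; in the patch this is
a memcmp over the two maps, done under udpif->mutex, followed by a wait on
'hol_block_seq'):

    #include <stdbool.h>
    #include <stddef.h>
    #include <string.h>

    /* True when every non-empty fair queue is head-of-line blocked,
     * i.e. the set of blocked queues equals the set of non-empty
     * queues.  'hol_block_map' and 'non_empty_list_map' are the int
     * arrays from struct udpif in the patch below. */
    static bool
    all_queues_blocked(const int *hol_block_map,
                       const int *non_empty_list_map, size_t n)
    {
        return !memcmp(hol_block_map, non_empty_list_map,
                       n * sizeof *hol_block_map);
    }

When this holds, the distributor stops spinning and sleeps until some
handler drains a batch and bumps 'hol_block_seq'.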

Thanks,
Alex Wang,


On Tue, Nov 19, 2013 at 6:09 PM, Alex Wang <alexw at nicira.com> wrote:

> This commit improves upcall dispatching fairness by introducing a
> two-stage scheme.  The two stages run in two threads, 'dispatcher'
> and 'distributor', respectively.
>
> In the first stage, the dispatcher thread reads upcalls from the
> kernel and places them into fair queues based on the L2 header
> information.  In the second stage, the distributor thread iterates
> over the fair queues in a round-robin fashion; on each pass, it moves
> the upcall at the front of a fair queue into the corresponding upcall
> handler thread's queue for processing, as in the sketch below.
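>
> For intuition, here is a toy, single-threaded model of the two stages
> (hypothetical 'toy_*' names; the patch itself uses struct guarded_list,
> seq-based wakeups, and pthreads, and additionally handles the
> head-of-line blocking described in the code comments):
>
>     #include <stddef.h>
>     #include <stdint.h>
>
>     #define N_QUEUES   512    /* Mirrors UPCALL_QUEUES. */
>     #define N_HANDLERS 4
>
>     struct toy_upcall {
>         struct toy_upcall *next;
>         uint32_t hash_1;      /* L2 hash: selects the fair queue. */
>         uint32_t hash_2;      /* L4 hash: selects the handler. */
>     };
>
>     struct toy_queue {
>         struct toy_upcall *head, *tail;
>     };
>
>     static struct toy_queue fair_queues[N_QUEUES];
>     static struct toy_queue handler_queues[N_HANDLERS];
>
>     static void
>     toy_enqueue(struct toy_queue *q, struct toy_upcall *u)
>     {
>         u->next = NULL;
>         if (q->tail) {
>             q->tail->next = u;
>         } else {
>             q->head = u;
>         }
>         q->tail = u;
>     }
>
>     static struct toy_upcall *
>     toy_dequeue(struct toy_queue *q)
>     {
>         struct toy_upcall *u = q->head;
>         if (u) {
>             q->head = u->next;
>             if (!q->head) {
>                 q->tail = NULL;
>             }
>         }
>         return u;
>     }
>
>     /* Stage 1: spread upcalls over the fair queues by L2 hash, so one
>      * busy L2 source cannot crowd out the others. */
>     static void
>     toy_dispatch(struct toy_upcall *u)
>     {
>         toy_enqueue(&fair_queues[u->hash_1 % N_QUEUES], u);
>     }
>
>     /* Stage 2: service the fair queues round-robin, steering each
>      * upcall to the handler chosen by its L4 hash. */
>     static void
>     toy_distribute(void)
>     {
>         size_t i;
>
>         for (i = 0; i < N_QUEUES; i++) {
>             struct toy_upcall *u = toy_dequeue(&fair_queues[i]);
>             if (u) {
>                 toy_enqueue(&handler_queues[u->hash_2 % N_HANDLERS], u);
>             }
>         }
>     }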
>
> Experiments show a big improvement in handling fairness and a slight
> improvement in flow setup rate.
>
> Signed-off-by: Alex Wang <alexw at nicira.com>
> ---
>  lib/guarded-list.c            |   12 ++
>  lib/guarded-list.h            |    1 +
>  ofproto/ofproto-dpif-upcall.c |  267 +++++++++++++++++++++++++++++++++--------
>  3 files changed, 232 insertions(+), 48 deletions(-)
>
> diff --git a/lib/guarded-list.c b/lib/guarded-list.c
> index cbb2030..2dc0c48 100644
> --- a/lib/guarded-list.c
> +++ b/lib/guarded-list.c
> @@ -44,6 +44,18 @@ guarded_list_is_empty(const struct guarded_list *list)
>      return empty;
>  }
>
> +struct list *
> +guarded_list_front(struct guarded_list *list)
> +{
> +    struct list *ret;
> +
> +    ovs_mutex_lock(&list->mutex);
> +    ret = list_front(&list->list);
> +    ovs_mutex_unlock(&list->mutex);
> +
> +    return ret;
> +}
> +
>  /* If 'list' has fewer than 'max' elements, adds 'node' at the end of the list
>   * and returns the number of elements now on the list.
>   *
> diff --git a/lib/guarded-list.h b/lib/guarded-list.h
> index 625865d..1a26dc8 100644
> --- a/lib/guarded-list.h
> +++ b/lib/guarded-list.h
> @@ -33,6 +33,7 @@ void guarded_list_destroy(struct guarded_list *);
>
>  bool guarded_list_is_empty(const struct guarded_list *);
>
> +struct list *guarded_list_front(struct guarded_list *);
>  size_t guarded_list_push_back(struct guarded_list *, struct list *,
>                                size_t max);
>  struct list *guarded_list_pop_front(struct guarded_list *);
> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
> index dba3d3b..97e9d22 100644
> --- a/ofproto/ofproto-dpif-upcall.c
> +++ b/ofproto/ofproto-dpif-upcall.c
> @@ -36,7 +36,9 @@
>  #include "poll-loop.h"
>  #include "vlog.h"
>
> -#define MAX_QUEUE_LENGTH 512
> +#define MAX_QUEUE_LENGTH 128
> +#define UPCALL_QUEUES 512
> +#define UPCALL_QUEUE_LENGTH 64
>
>  VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
>
> @@ -45,7 +47,7 @@ COVERAGE_DEFINE(upcall_queue_overflow);
>  COVERAGE_DEFINE(fmb_queue_overflow);
>  COVERAGE_DEFINE(fmb_queue_revalidated);
>
> -/* A thread that processes each upcall handed to it by the dispatcher thread,
> +/* A thread that processes each upcall handed to it by the distributor thread,
>   * forwards the upcall's packet, and then queues it to the main ofproto_dpif
>   * to possibly set up a kernel flow as a cache. */
>  struct handler {
> @@ -58,8 +60,8 @@ struct handler {
>      struct list upcalls OVS_GUARDED;
>      size_t n_upcalls OVS_GUARDED;
>
> -    size_t n_new_upcalls;              /* Only changed by the dispatcher. */
> -    bool need_signal;                  /* Only changed by the dispatcher. */
> +    size_t n_new_upcalls;              /* Only changed by the distributor. */
> +    bool need_signal;                  /* Only changed by the distributor. */
>
>      pthread_cond_t wake_cond;          /* Wakes 'thread' while holding
>                                            'mutex'. */
> @@ -67,11 +69,12 @@ struct handler {
>
>  /* An upcall handler for ofproto_dpif.
>   *
> - * udpif is implemented as a "dispatcher" thread that reads upcalls from the
> - * kernel.  It processes each upcall just enough to figure out its next
> - * destination.  For a "miss" upcall (MISS_UPCALL), this is one of several
> - * "handler" threads (see struct handler).  Other upcalls are queued to the
> - * main ofproto_dpif. */
> + * udpif is implemented as two threads, "dispatcher" and "distributor".
> + * The "dispatcher" thread reads upcalls from the kernel and puts them
> + * into the corresponding fair queues based on the L2 header.  The
> + * "distributor" thread reads upcalls from the fair queues in a
> + * round-robin fashion and puts them into the corresponding upcall
> + * handler's queue based on the L4 header. */
>  struct udpif {
>      struct dpif *dpif;                 /* Datapath handle. */
>      struct dpif_backer *backer;        /* Opaque dpif_backer pointer. */
> @@ -79,6 +82,7 @@ struct udpif {
>      uint32_t secret;                   /* Random seed for upcall hash. */
>
>      pthread_t dispatcher;              /* Dispatcher thread ID. */
> +    pthread_t distributor;             /* Distributor thread ID. */
>
>      struct handler *handlers;          /* Upcall handlers. */
>      size_t n_handlers;
> @@ -93,6 +97,33 @@ struct udpif {
>      struct seq *wait_seq;
>
>      struct latch exit_latch; /* Tells child threads to exit. */
> +
> +    /* Fair queues for upcalls. */
> +    struct guarded_list upcall_queues[UPCALL_QUEUES];
> +
> +    /* For waking up the distributor thread when there are upcalls
> +     * to distribute. */
> +    struct seq *distributor_seq;
> +    /* For waking up the distributor thread when a handler's queue
> +     * has more room. */
> +    struct seq *hol_block_seq;
> +
> +    struct ovs_mutex mutex;        /* Mutex guarding the following. */
> +
> +    /* Contains the indexes of the non-empty "upcall_queues". */
> +    struct list non_empty_list;
> +    /* Indicates if the upcall_queue at an index is already on
> +     * non_empty_list. */
> +    int non_empty_list_map[UPCALL_QUEUES];
> +    /* Indicates if upcall_queue at index is blocked by the front upcall. */
> +    int hol_block_map[UPCALL_QUEUES];
> +};
> +
> +/* An entry in udpif's "non_empty_list"; stores the index of a
> + * non-empty upcall_queue. */
> +struct non_empty_list_entry {
> +    struct list list_node;
> +    int value;
>  };
>
>  enum upcall_type {
> @@ -107,6 +138,9 @@ struct upcall {
>      struct list list_node;          /* For queuing upcalls. */
>      struct flow_miss *flow_miss;    /* This upcall's flow_miss. */
>
> +    uint32_t hash_1;                /* Used by dispatcher thread. */
> +    uint32_t hash_2;                /* Used by distributor thread. */
> +
>      /* Raw upcall plus data for keeping track of the memory backing it. */
>      struct dpif_upcall dpif_upcall; /* As returned by dpif_recv() */
>      struct ofpbuf upcall_buf;       /* Owns some data in 'dpif_upcall'. */
> @@ -118,24 +152,34 @@ static void upcall_destroy(struct upcall *);
>  static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>
>  static void recv_upcalls(struct udpif *);
> +static void distribute_upcalls(struct udpif *);
>  static void handle_upcalls(struct udpif *, struct list *upcalls);
>  static void miss_destroy(struct flow_miss *);
>  static void *udpif_dispatcher(void *);
> +static void *udpif_distributor(void *);
>  static void *udpif_upcall_handler(void *);
>
>  struct udpif *
>  udpif_create(struct dpif_backer *backer, struct dpif *dpif)
>  {
>      struct udpif *udpif = xzalloc(sizeof *udpif);
> +    size_t i;
>
>      udpif->dpif = dpif;
>      udpif->backer = backer;
>      udpif->secret = random_uint32();
>      udpif->wait_seq = seq_create();
> +    udpif->distributor_seq = seq_create();
> +    udpif->hol_block_seq = seq_create();
>      latch_init(&udpif->exit_latch);
> +    list_init(&udpif->non_empty_list);
>      guarded_list_init(&udpif->drop_keys);
>      guarded_list_init(&udpif->fmbs);
> +    ovs_mutex_init(&udpif->mutex);
>      atomic_init(&udpif->reval_seq, 0);
> +    for (i = 0; i < UPCALL_QUEUES; i++) {
> +        guarded_list_init(&udpif->upcall_queues[i]);
> +    }
>
>      return udpif;
>  }
> @@ -145,6 +189,7 @@ udpif_destroy(struct udpif *udpif)
>  {
>      struct flow_miss_batch *fmb;
>      struct drop_key *drop_key;
> +    size_t i;
>
>      udpif_recv_set(udpif, 0, false);
>
> @@ -160,6 +205,12 @@ udpif_destroy(struct udpif *udpif)
>      guarded_list_destroy(&udpif->fmbs);
>      latch_destroy(&udpif->exit_latch);
>      seq_destroy(udpif->wait_seq);
> +    seq_destroy(udpif->distributor_seq);
> +    seq_destroy(udpif->hol_block_seq);
> +    ovs_mutex_destroy(&udpif->mutex);
> +    for (i = 0; i < UPCALL_QUEUES; i++) {
> +        guarded_list_destroy(&udpif->upcall_queues[i]);
> +    }
>      free(udpif);
>  }
>
> @@ -189,6 +240,7 @@ udpif_recv_set(struct udpif *udpif, size_t n_handlers, bool enable)
>          }
>
>          xpthread_join(udpif->dispatcher, NULL);
> +        xpthread_join(udpif->distributor, NULL);
>          for (i = 0; i < udpif->n_handlers; i++) {
>              struct handler *handler = &udpif->handlers[i];
>              struct upcall *miss, *next;
> @@ -230,6 +282,7 @@ udpif_recv_set(struct udpif *udpif, size_t n_handlers, bool enable)
>                              handler);
>          }
>          xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
> +        xpthread_create(&udpif->distributor, NULL, udpif_distributor, udpif);
>      }
>  }
>
> @@ -368,8 +421,8 @@ udpif_drop_key_clear(struct udpif *udpif)
>      }
>  }
>
> -/* The dispatcher thread is responsible for receiving upcalls from the kernel,
> - * assigning them to a upcall_handler thread. */
> +/* The dispatcher thread is responsible for reading upcalls from
> + * the kernel, assigning them to udpif's "upcall_queues". */
>  static void *
>  udpif_dispatcher(void *arg)
>  {
> @@ -386,9 +439,36 @@ udpif_dispatcher(void *arg)
>      return NULL;
>  }
>
> -/* The miss handler thread is responsible for processing miss upcalls retrieved
> - * by the dispatcher thread.  Once finished it passes the processed miss
> - * upcalls to ofproto-dpif where they're installed in the datapath. */
> +/* The distributor thread is responsible for reading upcalls from
> + * udpif's "upcall_queues" and distributing them to the corresponding
> + * upcall handlers' queues.
> + *
> + * If all of the front upcalls in "upcall_queues" are blocked (similar
> + * to head-of-line blocking), the distributor thread will wait on
> + * hol_block_seq. */
> +static void *
> +udpif_distributor(void *arg)
> +{
> +    struct udpif *udpif = arg;
> +
> +    set_subprogram_name("distributor");
> +    while (!latch_is_set(&udpif->exit_latch)) {
> +        uint64_t distributor_seq = seq_read(udpif->distributor_seq);
> +        uint64_t hol_block_seq = seq_read(udpif->hol_block_seq);
> +
> +        distribute_upcalls(udpif);
> +        seq_wait(udpif->distributor_seq, distributor_seq);
> +        seq_wait(udpif->hol_block_seq, hol_block_seq);
> +        latch_wait(&udpif->exit_latch);
> +        poll_block();
> +    }
> +
> +    return NULL;
> +}
> +
> +/* The miss handler thread is responsible for processing miss upcalls handed
> + * to it by the distributor thread.  Once finished it passes the processed
> + * miss upcalls to ofproto-dpif where they're installed in the datapath. */
>  static void *
>  udpif_upcall_handler(void *arg)
>  {
> @@ -420,6 +500,9 @@ udpif_upcall_handler(void *arg)
>          }
>          ovs_mutex_unlock(&handler->mutex);
>
> +        /* Changes udpif->hol_block_seq every time a batch of upcalls
> +         * is read. */
> +        seq_change(handler->udpif->hol_block_seq);
>          handle_upcalls(handler->udpif, &misses);
>
>          coverage_clear();
> @@ -490,19 +573,21 @@ classify_upcall(const struct upcall *upcall)
>  static void
>  recv_upcalls(struct udpif *udpif)
>  {
> -    int n;
> -
> +    /* Receive upcalls from the dpif and put them into the upcall_queues. */
>      for (;;) {
> -        uint32_t hash = udpif->secret;
> -        struct handler *handler;
>          struct upcall *upcall;
> +        struct guarded_list *queue;
>          size_t n_bytes, left;
>          struct nlattr *nla;
> -        int error;
> +        int idx, error;
>
>          upcall = xmalloc(sizeof *upcall);
> -        ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
> +        upcall->hash_1 = udpif->secret;
> +        upcall->hash_2 = udpif->secret;
> +        ofpbuf_use_stub(&upcall->upcall_buf,
> +                        upcall->upcall_stub,
>                          sizeof upcall->upcall_stub);
> +
>          error = dpif_recv(udpif->dpif, &upcall->dpif_upcall,
>                            &upcall->upcall_buf);
>          if (error) {
> @@ -514,11 +599,20 @@ recv_upcalls(struct udpif *udpif)
>          NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key,
>                            upcall->dpif_upcall.key_len) {
>              enum ovs_key_attr type = nl_attr_type(nla);
> -            if (type == OVS_KEY_ATTR_IN_PORT
> -                || type == OVS_KEY_ATTR_TCP
> -                || type == OVS_KEY_ATTR_UDP) {
> +            if (type == OVS_KEY_ATTR_ETHERNET) {
> +                if (nl_attr_get_size(nla) == 12) {
> +                    upcall->hash_1 = hash_bytes(nl_attr_get_unspec(nla, 12),
> +                                                2 * ETH_ADDR_LEN, 0);
> +                } else {
> +                    VLOG_WARN_RL(&rl,
> +                                 "Netlink attribute with incorrect
> size.");
> +                }
> +            } else if (type == OVS_KEY_ATTR_IN_PORT
> +                       || type == OVS_KEY_ATTR_TCP
> +                       || type == OVS_KEY_ATTR_UDP) {
>                  if (nl_attr_get_size(nla) == 4) {
> -                    hash = mhash_add(hash, nl_attr_get_u32(nla));
> +                    upcall->hash_2 = mhash_add(upcall->hash_2,
> +                                               nl_attr_get_u32(nla));
>                      n_bytes += 4;
>                  } else {
>                      VLOG_WARN_RL(&rl,
> @@ -526,37 +620,112 @@ recv_upcalls(struct udpif *udpif)
>                  }
>              }
>          }
> -        hash =  mhash_finish(hash, n_bytes);
> +        upcall->hash_2 = mhash_finish(upcall->hash_2, n_bytes);
> +
> +        idx = upcall->hash_1 % UPCALL_QUEUES;
> +        /* Selects the upcall_queue into which to insert the upcall. */
> +        queue = &udpif->upcall_queues[idx];
> +        /* Drops the upcall if the queue is already at UPCALL_QUEUE_LENGTH. */
> +        if (!guarded_list_push_back(queue, &upcall->list_node,
> +                                    UPCALL_QUEUE_LENGTH)) {
> +            upcall_destroy(upcall);
> +        } else {
> +            /* If successfully inserted, check if the queue should be recorded
> +             * in the non_empty_list. */
> +            ovs_mutex_lock(&udpif->mutex);
> +            if (!udpif->non_empty_list_map[idx]
> +                && !guarded_list_is_empty(queue)) {
> +                struct non_empty_list_entry *entry = xmalloc(sizeof *entry);
> +
> +                entry->value = idx;
> +                udpif->non_empty_list_map[idx] = 1;
> +                list_push_back(&udpif->non_empty_list, &entry->list_node);
> +                if (list_is_singleton(&udpif->non_empty_list)) {
> +                    seq_change(udpif->distributor_seq);
> +                }
> +            }
> +            ovs_mutex_unlock(&udpif->mutex);
> +        }
> +    }
> +}
> +
> +static void
> +distribute_upcalls(struct udpif *udpif)
> +{
> +    int n;
>
> -        handler = &udpif->handlers[hash % udpif->n_handlers];
> +    while (!list_is_empty(&udpif->non_empty_list)) {
> +        struct non_empty_list_entry *front;
> +        struct guarded_list *queue;
> +        struct upcall *upcall;
> +        struct handler *handler;
> +
> +        front = CONTAINER_OF(list_front(&udpif->non_empty_list),
> +                             struct non_empty_list_entry, list_node);
> +
> +        queue = &udpif->upcall_queues[front->value];
> +        upcall = CONTAINER_OF(guarded_list_front(queue),
> +                              struct upcall, list_node);
> +        handler = &udpif->handlers[upcall->hash_2 % udpif->n_handlers];
>
>          ovs_mutex_lock(&handler->mutex);
> -        if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
> -            list_push_back(&handler->upcalls, &upcall->list_node);
> -            if (handler->n_upcalls == 0) {
> -                handler->need_signal = true;
> -            }
> -            handler->n_upcalls++;
> -            if (handler->need_signal &&
> -                handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
> -                handler->need_signal = false;
> -                xpthread_cond_signal(&handler->wake_cond);
> -            }
> +        if (handler->n_upcalls >= MAX_QUEUE_LENGTH) {
>              ovs_mutex_unlock(&handler->mutex);
> -            if (!VLOG_DROP_DBG(&rl)) {
> -                struct ds ds = DS_EMPTY_INITIALIZER;
> -
> -                odp_flow_key_format(upcall->dpif_upcall.key,
> -                                    upcall->dpif_upcall.key_len,
> -                                    &ds);
> -                VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds));
> -                ds_destroy(&ds);
> +
> +            /* If the handler queue is full, moves front node to the back. */
> +            ovs_mutex_lock(&udpif->mutex);
> +            list_pop_front(&udpif->non_empty_list);
> +            list_push_back(&udpif->non_empty_list, &front->list_node);
> +
> +            /* Updates the hol_block_map. */
> +            if (!udpif->hol_block_map[front->value]) {
> +                udpif->hol_block_map[front->value] = 1;
>              }
> +
> +            /* If all queues are blocked by their head upcalls, jump out
> +             * of the loop. */
> +            if (!memcmp(udpif->hol_block_map, udpif->non_empty_list_map,
> +                        sizeof udpif->hol_block_map)) {
> +                ovs_mutex_unlock(&udpif->mutex);
> +                break;
> +            }
> +            ovs_mutex_unlock(&udpif->mutex);
> +            continue;
> +        }
> +
> +        guarded_list_pop_front(queue);
> +        list_push_back(&handler->upcalls, &upcall->list_node);
> +        if (handler->n_upcalls == 0) {
> +            handler->need_signal = true;
> +        }
> +        handler->n_upcalls++;
> +        if (handler->need_signal &&
> +            handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
> +            handler->need_signal = false;
> +            xpthread_cond_signal(&handler->wake_cond);
> +        }
> +        ovs_mutex_unlock(&handler->mutex);
> +        if (!VLOG_DROP_DBG(&rl)) {
> +            struct ds ds = DS_EMPTY_INITIALIZER;
> +
> +            odp_flow_key_format(upcall->dpif_upcall.key,
> +                                upcall->dpif_upcall.key_len,
> +                                &ds);
> +            VLOG_DBG("distributor: enqueue (%s)", ds_cstr(&ds));
> +            ds_destroy(&ds);
> +        }
> +
> +        /* Do not insert 'front' back into non_empty_list if the
> +         * queue has become empty. */
> +        ovs_mutex_lock(&udpif->mutex);
> +        list_pop_front(&udpif->non_empty_list);
> +        if (guarded_list_is_empty(queue)) {
> +            udpif->non_empty_list_map[front->value] = 0;
> +            free(front);
>          } else {
> -            ovs_mutex_unlock(&handler->mutex);
> -            COVERAGE_INC(upcall_queue_overflow);
> -            upcall_destroy(upcall);
> +            list_push_back(&udpif->non_empty_list, &front->list_node);
>          }
> +        ovs_mutex_unlock(&udpif->mutex);
>      }
>
>      for (n = 0; n < udpif->n_handlers; ++n) {
> @@ -569,6 +738,8 @@ recv_upcalls(struct udpif *udpif)
>              ovs_mutex_unlock(&handler->mutex);
>          }
>      }
> +
> +    memset(udpif->hol_block_map, 0, sizeof udpif->hol_block_map);
>  }
>
>  static struct flow_miss *
> --
> 1.7.9.5
>
>