[ovs-dev] [PATCH v2] ofproto-dpif-upcall: Batch upcalls.

YAMAMOTO Takashi yamamoto at valinux.co.jp
Fri Aug 30 03:24:17 UTC 2013


> Batching reduces overheads and enables up to 4 times the upcall processing
> performance in a specialized test case.
> 
> Signed-off-by: Jarno Rajahalme <jrajahalme at nicira.com>
> ---
> v2: Do not wake consumer threads if they are already keeping the queue
>     level low enough.

If the queue level is high, the consumer should already be busy working on
the queue, right?  Why do you want to keep signaling them again and again?

The dispatcher needs to signal the consumer only if handler->n_upcalls
was 0 before incrementing.  For example:

    if (handler->n_upcalls == 0) {
        handler->need_signal = true;
    }
    handler->n_upcalls++;
    if (handler->need_signal && handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
        handler->need_signal = false;
        xpthread_cond_signal(&handler->wake_cond);
    }

YAMAMOTO Takashi

>     Be more conservative about locking while calling cond_signal.
> ---
>  ofproto/ofproto-dpif-upcall.c |   33 ++++++++++++++++++++++++++++-----
>  1 file changed, 28 insertions(+), 5 deletions(-)
> 
> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
> index db78af3..22cdf1b 100644
> --- a/ofproto/ofproto-dpif-upcall.c
> +++ b/ofproto/ofproto-dpif-upcall.c
> @@ -55,6 +55,8 @@ struct handler {
>      struct list upcalls OVS_GUARDED;
>      size_t n_upcalls OVS_GUARDED;
>  
> +    size_t n_new_upcalls;              /* Only changed by the dispatcher. */
> +
>      pthread_cond_t wake_cond;          /* Wakes 'thread' while holding
>                                            'mutex'. */
>  };
> @@ -515,6 +517,10 @@ static void
>  recv_upcalls(struct udpif *udpif)
>  {
>      static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(60, 60);
> +    size_t n_udpif_new_upcalls = 0;
> +    struct handler *handler;
> +    int n;
> +
>      for (;;) {
>          struct upcall *upcall;
>          int error;
> @@ -535,7 +541,6 @@ recv_upcalls(struct udpif *udpif)
>          } else if (upcall->type == MISS_UPCALL) {
>              struct dpif_upcall *dupcall = &upcall->dpif_upcall;
>              uint32_t hash = udpif->secret;
> -            struct handler *handler;
>              struct nlattr *nla;
>              size_t n_bytes, left;
>  
> @@ -561,8 +566,11 @@ recv_upcalls(struct udpif *udpif)
>              ovs_mutex_lock(&handler->mutex);
>              if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
>                  list_push_back(&handler->upcalls, &upcall->list_node);
> -                handler->n_upcalls++;
> -                xpthread_cond_signal(&handler->wake_cond);
> +                handler->n_new_upcalls = ++handler->n_upcalls;
> +
> +                if (handler->n_new_upcalls >= FLOW_MISS_MAX_BATCH) {
> +                    xpthread_cond_signal(&handler->wake_cond);
> +                }
>                  ovs_mutex_unlock(&handler->mutex);
>                  if (!VLOG_DROP_DBG(&rl)) {
>                      struct ds ds = DS_EMPTY_INITIALIZER;
> @@ -581,10 +589,13 @@ recv_upcalls(struct udpif *udpif)
>          } else {
>              ovs_mutex_lock(&udpif->upcall_mutex);
>              if (udpif->n_upcalls < MAX_QUEUE_LENGTH) {
> -                udpif->n_upcalls++;
> +                n_udpif_new_upcalls = ++udpif->n_upcalls;
>                  list_push_back(&udpif->upcalls, &upcall->list_node);
>                  ovs_mutex_unlock(&udpif->upcall_mutex);
> -                seq_change(udpif->wait_seq);
> +
> +                if (n_udpif_new_upcalls >= FLOW_MISS_MAX_BATCH) {
> +                    seq_change(udpif->wait_seq);
> +                }
>              } else {
>                  ovs_mutex_unlock(&udpif->upcall_mutex);
>                  COVERAGE_INC(upcall_queue_overflow);
> @@ -592,6 +603,18 @@ recv_upcalls(struct udpif *udpif)
>              }
>          }
>      }
> +    for (n = 0; n < udpif->n_handlers; ++n) {
> +        handler = &udpif->handlers[n];
> +        if (handler->n_new_upcalls) {
> +            handler->n_new_upcalls = 0;
> +            ovs_mutex_lock(&handler->mutex);
> +            xpthread_cond_signal(&handler->wake_cond);
> +            ovs_mutex_unlock(&handler->mutex);
> +        }
> +    }
> +    if (n_udpif_new_upcalls) {
> +        seq_change(udpif->wait_seq);
> +    }
>  }
>  
>  static struct flow_miss *
> -- 
> 1.7.10.4
> 
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev



More information about the dev mailing list