[ovs-dev] [PATCH V2] ofproto-dpif-upcall: Remove the dispatcher thread.

Han Zhou zhouhan at gmail.com
Sat Feb 15 16:44:04 UTC 2014


Hi Alex,

Here comes my full review:

On Fri, 2014-02-07 at 17:17 -0800, Alex Wang wrote:

> diff --git a/lib/dpif-linux.c b/lib/dpif-linux.c
>  
> +/* Adds 'dpif->n_handlers' channels to vport.  If upcall_pids is non-NULL,
> + * makes it point to the array of channel socket pids. */
>  static int
> -add_channel(struct dpif_linux *dpif, odp_port_t port_no, struct nl_sock *sock)
> +add_vport_channels(struct dpif_linux *dpif, odp_port_t port_no,
> +                   uint32_t **upcall_pids)
>  {
>      struct epoll_event event;
> +    struct dpif_epoll *epolls = dpif->epolls;
>      uint32_t port_idx = odp_to_u32(port_no);
> +    uint32_t *pids = NULL;

The variable 'pids' is never assigned (it stays NULL), yet it is freed in
the error path.  It can simply be removed.

> +    int error = 0;
> +    size_t i, j;
>  
> -    if (dpif->epoll_fd < 0) {
> +    if (epolls == NULL) {
>          return 0;
>      }
>  
>      /* We assume that the datapath densely chooses port numbers, which
> -     * can therefore be used as an index into an array of channels. */
> +     * can therefore be used as an index into dpif->channels. */
>      if (port_idx >= dpif->uc_array_size) {
>          uint32_t new_size = port_idx + 1;
> -        uint32_t i;
>  
>          if (new_size > MAX_PORTS) {
>              VLOG_WARN_RL(&error_rl, "%s: datapath port %"PRIu32" too big",
> @@ -332,49 +318,117 @@ add_channel(struct dpif_linux *dpif, odp_port_t port_no, struct nl_sock *sock)
>          dpif->channels = xrealloc(dpif->channels,
>                                    new_size * sizeof *dpif->channels);
>          for (i = dpif->uc_array_size; i < new_size; i++) {
> -            dpif->channels[i].sock = NULL;
> +            dpif->channels[i] = NULL;
> +        }
> +        for (i = 0; i < dpif->n_handlers; i++) {
> +            epolls[i].epoll_events = xrealloc(epolls[i].epoll_events,
> +                                              new_size * sizeof
> +                                              *epolls[i].epoll_events);
>          }
> -
> -        dpif->epoll_events = xrealloc(dpif->epoll_events,
> -                                      new_size * sizeof *dpif->epoll_events);
>          dpif->uc_array_size = new_size;
>      }
>  
>      memset(&event, 0, sizeof event);
>      event.events = EPOLLIN;
>      event.data.u32 = port_idx;
> -    if (epoll_ctl(dpif->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
> -                  &event) < 0) {
> -        return errno;
> -    }
>  
> -    nl_sock_destroy(dpif->channels[port_idx].sock);
> -    dpif->channels[port_idx].sock = sock;
> -    dpif->channels[port_idx].last_poll = LLONG_MIN;
> +    /* Creates channel for each upcall handler. */
> +    dpif->channels[port_idx] = xzalloc(dpif->n_handlers
> +                                       * sizeof *dpif->channels[port_idx]);
> +    for (i = 0; i < dpif->n_handlers; i++) {
> +        struct nl_sock *sock = NULL;
> +
> +        error = nl_sock_create(NETLINK_GENERIC, &sock);
> +        if (error) {
> +            goto error;
> +        }
> +
> +        if (epoll_ctl(epolls[i].epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
> +                      &event) < 0) {
> +            error = errno;
> +            goto error;

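If this epoll_ctl() call fails, the sock created above is not destroyed by
the error handling: it has not yet been stored in
dpif->channels[port_idx][i], and the cleanup loop only covers indexes
below i.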

> +        }
> +        dpif->channels[port_idx][i].sock = sock;
> +        dpif->channels[port_idx][i].last_poll = LLONG_MIN;
> +    }
> +    channels_to_pids(dpif->channels[port_idx], dpif->n_handlers, upcall_pids);
>  
>      return 0;
> +
> +error:
> +    /* Cleans up the already created channel and socks. */
> +    for (j = 0; j < i; j++) {
> +        nl_sock_destroy(dpif->channels[port_idx][j].sock);
> +    }
> +    free(pids);
> +    free(dpif->channels[port_idx]);
> +    dpif->channels[port_idx] = NULL;
> +
> +    return error;
>  }
>  

...

> +static void
> +destroy_all_channels(struct dpif_linux *dpif)
> +{
> +    unsigned int i;
> +
> +    if (!dpif->epolls) {
>          return;
>      }
>  
> -    epoll_ctl(dpif->epoll_fd, EPOLL_CTL_DEL, nl_sock_fd(ch->sock), NULL);
> -    dpif->event_offset = dpif->n_events = 0;
> +    for (i = 0; i < dpif->uc_array_size; i++ ) {
> +        struct dpif_linux_vport vport_request;
> +        struct dpif_channel *ch = dpif->channels[i];
> +
> +        if (!ch->sock) {

The condition here should be changed to "if (!ch)", because ch now points
to an array of channels and is NULL for ports that have no channels.

> +            continue;
> +        }

...


>  
> @@ -500,20 +551,12 @@ dpif_linux_port_add__(struct dpif *dpif_, struct netdev *netdev,
>                                                    namebuf, sizeof namebuf);
>      const char *type = netdev_get_type(netdev);
>      struct dpif_linux_vport request, reply;
> -    struct nl_sock *sock = NULL;
> -    uint32_t upcall_pid;
>      struct ofpbuf *buf;
>      uint64_t options_stub[64 / 8];
>      struct ofpbuf options;
> +    uint32_t *upcall_pids = NULL;
>      int error;
>  
> -    if (dpif->epoll_fd >= 0) {
> -        error = nl_sock_create(NETLINK_GENERIC, &sock);
> -        if (error) {
> -            return error;
> -        }
> -    }
> -
>      dpif_linux_vport_init(&request);
>      request.cmd = OVS_VPORT_CMD_NEW;
>      request.dp_ifindex = dpif->dp_ifindex;
> @@ -522,7 +565,6 @@ dpif_linux_port_add__(struct dpif *dpif_, struct netdev *netdev,
>          VLOG_WARN_RL(&error_rl, "%s: cannot create port `%s' because it has "
>                       "unsupported type `%s'",
>                       dpif_name(dpif_), name, type);
> -        nl_sock_destroy(sock);
>          return EINVAL;
>      }
>      request.name = name;
> @@ -541,41 +583,42 @@ dpif_linux_port_add__(struct dpif *dpif_, struct netdev *netdev,
>      }
>  
>      request.port_no = *port_nop;
> -    upcall_pid = sock ? nl_sock_pid(sock) : 0;
> -    request.upcall_pid = &upcall_pid;
> +    request.upcall_pids = NULL;
>  
>      error = dpif_linux_vport_transact(&request, &reply, &buf);
>      if (!error) {
>          *port_nop = reply.port_no;
> -        VLOG_DBG("%s: assigning port %"PRIu32" to netlink pid %"PRIu32,
> -                 dpif_name(dpif_), reply.port_no, upcall_pid);
>      } else {
>          if (error == EBUSY && *port_nop != ODPP_NONE) {
>              VLOG_INFO("%s: requested port %"PRIu32" is in use",
>                        dpif_name(dpif_), *port_nop);
>          }
> -        nl_sock_destroy(sock);
>          ofpbuf_delete(buf);
>          return error;
>      }
>      ofpbuf_delete(buf);
>  
> -    if (sock) {
> -        error = add_channel(dpif, *port_nop, sock);
> -        if (error) {
> -            VLOG_INFO("%s: could not add channel for port %s",
> -                      dpif_name(dpif_), name);
> +    if (add_vport_channels(dpif, *port_nop, &upcall_pids)) {
> +        VLOG_INFO("%s: could not add channel for port %s",
> +                  dpif_name(dpif_), name);
>  
> -            /* Delete the port. */
> -            dpif_linux_vport_init(&request);
> -            request.cmd = OVS_VPORT_CMD_DEL;
> -            request.dp_ifindex = dpif->dp_ifindex;
> -            request.port_no = *port_nop;
> -            dpif_linux_vport_transact(&request, NULL, NULL);
> +        /* Delete the port. */
> +        dpif_linux_vport_init(&request);
> +        request.cmd = OVS_VPORT_CMD_DEL;
> +        request.dp_ifindex = dpif->dp_ifindex;
> +        request.port_no = *port_nop;
> +        dpif_linux_vport_transact(&request, NULL, NULL);
>  
> -            nl_sock_destroy(sock);
> -            return error;
> -        }
> +        return error;
> +    } else {
> +        dpif_linux_vport_init(&request);
> +        request.cmd = OVS_VPORT_CMD_SET;
> +        request.dp_ifindex = dpif->dp_ifindex;
> +        request.port_no = *port_nop;
> +        request.n_pids = dpif->n_handlers;
> +        request.upcall_pids = upcall_pids;
> +        dpif_linux_vport_transact(&request, NULL, NULL);
> +        free(upcall_pids);
>      }
>  
>      return 0;

In this function, you have split the work into two separate vport
transactions: one that creates the vport and one that sets the channel
pids in the kernel.  This could cause problems if the kernel starts
sending upcalls to userspace after the vport has been added but before
the channel pids have been updated.  I think it should still be kept as a
single transaction to avoid this race condition.

...

>  
>  static uint32_t
> -dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no)
> +dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no,
> +                        uint32_t hash)
I suggest using struct flow instead of hash as the parameter, because
the handler mapping is dpif-implementation specific.  It is the dpif
implementation that determines how to use the flow information for pid
selection.  E.g. dpif-XXX may use a different hash algorithm than
flow_hash_5tuple().

>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>      uint32_t port_idx = odp_to_u32(port_no);
>      uint32_t pid = 0;
>  
> -    ovs_mutex_lock(&dpif->upcall_lock);
> -    if (dpif->epoll_fd >= 0) {
> +    fat_rwlock_wrlock(&dpif->upcall_lock);

Should this be fat_rwlock_rdlock()?  I don't see any updates in the
critical section below.

> +    if (dpif->epolls) {
>          /* The ODPP_NONE "reserved" port number uses the "ovs-system"'s
>           * channel, since it is not heavily loaded. */
>          uint32_t idx = port_idx >= dpif->uc_array_size ? 0 : port_idx;
> -        const struct nl_sock *sock = dpif->channels[idx].sock;
> -        pid = sock ? nl_sock_pid(sock) : 0;
> +        const struct dpif_channel *ch = dpif->channels[idx];
> +
> +        if (ch) {
> +            pid = ch[hash % dpif->n_handlers].sock
> +                  ? nl_sock_pid(ch[hash % dpif->n_handlers].sock) : 0;
> +        }
>      }
> -    ovs_mutex_unlock(&dpif->upcall_lock);
> +    fat_rwlock_unlock(&dpif->upcall_lock);
>  
>      return pid;
>  }

...

> @@ -1274,10 +1322,10 @@ dpif_linux_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
>  }
>  
>  /* Synchronizes 'dpif->channels' with the set of vports currently in 'dpif' in
> - * the kernel, by adding a new channel for any kernel vport that lacks one and
> - * deleting any channels that have no backing kernel vports. */
> + * the kernel, by adding a new set of channels for any kernel vport that lacks
> + * one and deleting any channels that have no backing kernel vports. */
>  static int
> -dpif_linux_refresh_channels(struct dpif *dpif_)
> +dpif_linux_refresh_channels(struct dpif *dpif_, uint32_t n_handlers)
>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>      unsigned long int *keep_channels;
> @@ -1287,52 +1335,63 @@ dpif_linux_refresh_channels(struct dpif *dpif_)
>      int retval = 0;
>      size_t i;
>  
> -    /* To start with, we need an epoll fd. */
> -    if (dpif->epoll_fd < 0) {
> -        dpif->epoll_fd = epoll_create(10);
> -        if (dpif->epoll_fd < 0) {
> -            return errno;
> +    if (dpif->n_handlers != n_handlers) {
> +        destroy_all_channels(dpif);
> +        dpif->epolls = xzalloc(n_handlers * sizeof *dpif->epolls);
> +
> +        for (i = 0; i < n_handlers; i++) {
> +            dpif->epolls[i].epoll_fd = epoll_create(dpif->uc_array_size ?
> +                                                    dpif->uc_array_size : 10);
> +            if (dpif->epolls[i].epoll_fd < 0) {
> +                return errno;
> +            }
>          }
> +        dpif->n_handlers = n_handlers;
> +    }

Could changing n_handlers be handled more gracefully?  For example, could
we just enlarge/shrink the channel arrays, to minimize the traffic
interruption while the number of threads is being changed?

...

>  
>  static int
> -dpif_linux_recv_set__(struct dpif *dpif_, bool enable)
> +dpif_linux_recv_set__(struct dpif *dpif_, bool enable, uint32_t n_handlers)
>  {
>      struct dpif_linux *dpif = dpif_linux_cast(dpif_);
>  
> -    if ((dpif->epoll_fd >= 0) == enable) {
> +    if ((dpif->epolls != NULL) == enable) {
> +        if (enable && dpif->n_handlers != n_handlers) {
> +            dpif_linux_refresh_channels(dpif_, n_handlers);

The return value is not checked for errors here.  It is also a little
weird to do the refreshing here: other dpif implementations would not
know that they need to do this in the dpif_recv_set interface.  Please
see my next comment.

> +        }
>          return 0;
>      } else if (!enable) {
> -        destroy_channels(dpif);
> +        destroy_all_channels(dpif);
>          return 0;
>      } else {
> -        return dpif_linux_refresh_channels(dpif_);
> +        return dpif_linux_refresh_channels(dpif_, n_handlers);
>      }
>  }
>  

...

> diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
> index 7b3e1eb..a53b3fd 100644
> --- a/ofproto/ofproto-dpif.c
> +++ b/ofproto/ofproto-dpif.c
> @@ -470,7 +470,8 @@ type_run(const char *type)
>  
>          backer->recv_set_enable = true;
>  
> -        error = dpif_recv_set(backer->dpif, backer->recv_set_enable);
> +        error = dpif_recv_set(backer->dpif, backer->recv_set_enable,
> +                              n_handlers);
>          if (error) {
>              VLOG_ERR("Failed to enable receiving packets in dpif.");
>              return error;
> @@ -480,6 +481,7 @@ type_run(const char *type)
>      }
>  
>      if (backer->recv_set_enable) {
> +        dpif_recv_set(backer->dpif, backer->recv_set_enable, n_handlers);
>          udpif_set_threads(backer->udpif, n_handlers, n_revalidators);
>      }
>  

The logic of calling dpif_recv_set() here is somewhat misleading.  I
understand that it is only for refreshing dpif->n_handlers, rather than
for enabling/disabling receiving.  But would it be better to introduce a
new interface in dpif that explicitly updates dpif->n_handlers, which
could be invoked from udpif_set_threads()?

These are all my findings (plus the RCU issue in my previous email).
Otherwise, it looks good to me.  And it really simplifies the
upcall_handler a lot!  I would like to know your performance test results.
Thanks!

Best regards,
Han




More information about the dev mailing list