[ovs-dev] PATCH 1/1 : netdev-dpdk / add dpdk rings to netdev-dpdk

Rogers, Gerald gerald.rogers at intel.com
Wed May 7 15:49:19 UTC 2014


Pravin,

On the 1GB page question: the patch simply allocates the rings and
creates a port.  You can allocate more than a single 1GB page; the
restriction is that the pages must be 1GB pages.  The entire huge page
memory allocated by DPDK is passed to each virtual machine, and all VMs
share this memory.  There is an API in DPDK to store the memory used by
the ring so that you can use pages smaller than 1GB.  Using this API, you
can allocate to a single VM only the memory associated with the ring and
the ring's mbufs.  However, when using this API any traffic to and from
the VM will incur a copy.  The reason is that each VM would be
isolated from the others by the memory in the huge page table mapping
it is given.  If you use only 1GB pages, then you can communicate
between VMs without incurring a copy penalty.  I chose not to implement
it using the API because I wanted ivshmem to get the best
performance (i.e. no copies).  A patch we are working to release in a
couple of weeks will support vHost within DPDK, thus eliminating the
isolation (i.e. shared memory) issues with ivshmem.  This vHost patch will
incur a performance penalty since copies are involved.  The vHost
implementation also won't have the 1GB page limitation.
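
To make the copy trade-off concrete, here is a toy model in plain C.  It
makes no DPDK calls, and struct region, send_shared() and send_isolated()
are names made up for illustration only.  With one region visible to both
sides, the receiver can be handed the sender's pointer; with separate
regions, the payload has to be copied across.

```c
#include <stddef.h>
#include <string.h>

#define PKT_SIZE 64
#define N_SLOTS  4

/* Stand-in for a block of packet memory; not a DPDK type. */
struct region {
    char pkts[N_SLOTS][PKT_SIZE];
};

/* Shared mapping: sender and receiver see the same region, so the
 * receiver can use the sender's buffer directly.  No bytes move. */
static const char *
send_shared(const struct region *shared, size_t slot)
{
    return shared->pkts[slot];
}

/* Isolated mappings: the receiver cannot see 'src', so the payload is
 * copied into a region the receiver owns. */
static const char *
send_isolated(const struct region *src, size_t slot, struct region *dst)
{
    memcpy(dst->pkts[slot], src->pkts[slot], PKT_SIZE);
    return dst->pkts[slot];
}
```

The pointer returned by send_shared() is the sender's own buffer, while
send_isolated() pays one memcpy() of PKT_SIZE bytes per packet.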

As for the formatting issues, is there a "C" pretty-print formatter I can
run on ovs_client.c to make it conform to the coding standards?

Thanks,

Gerald

On 5/6/14, 2:49 PM, "Pravin Shelar" <pshelar at nicira.com> wrote:

>Thanks for the patch.  It looks good; my comments are mostly related to
>coding style.
>
>The subject line should follow this format:
>[PATCH <n>/<m>] <area>: <summary>
>
>On Thu, May 1, 2014 at 3:58 AM, Gerald Rogers <gerald.rogers at intel.com>
>wrote:
>> This patch enables the client dpdk rings within the netdev-dpdk.  It adds
>> a new dpdk device called dpdkr (other naming suggestions?).  This allows
>> for the use of shared memory to communicate with other dpdk applications,
>> on the host or within a virtual machine.  Instructions for use are in
>> INSTALL.DPDK.
>>
>> This has been tested on Intel multi-core platforms and with the client
>> application within the host.
>>
>>
>> Signed-off-by: Gerald Rogers <gerald.rogers at intel.com>
>> Signed-off-by: Gerald Rogers <gerald.rogers at intel.com>
>>
>Duplicate Signed-off-by line.
>
>> diff --git a/INSTALL.DPDK b/INSTALL.DPDK
>> index 3e0247a..ec1b814 100644
>> --- a/INSTALL.DPDK
>> +++ b/INSTALL.DPDK
>> @@ -74,6 +74,44 @@ Once first DPDK port is added vswitchd, it creates
>>Polling thread and
>>  polls dpdk device in continuous loop. Therefore CPU utilization
>>  for that thread is always 100%.
>>
>> +DPDK Rings :
>> +
>> +Following the steps above to create a bridge, you can now add dpdk
>>rings
>> +as a port to the vswitch.  OVS will expect the DPDK ring device name to
>> +start with dpdkr and end with portid.
>> +
>> +    ovs-vsctl add-port br0 dpdkr0 -- set Interface dpdkr0 type=dpdkr
>> +
>> +DPDK rings client test application
>> +
>> +Included in the test directory is a sample DPDK application for testing
>> +the rings.  This is from the base dpdk directory and modified to work
>> +with the ring naming used within ovs.
>> +
>> +location tests/ovs_client
>> +
>> +To run the client :
>> +
>> +ovsclient -c 1 -n 4 --proc-type=secondary -- --n "port id you gave
>>dpdkr"
>> +
>> +It is essential to have --proc-type=secondary
>> +
>> +The application simply receives an mbuf on the receive queue of the
>> +ethernet ring and then places that same mbuf on the transmit ring of
>> +the ethernet ring.  It is a trivial loopback application.
>> +
>> +In addition to executing the client in the host, you can execute it
>>within
>> +a guest VM. To do so you will need a patched qemu.  You can download
>>the
>> +patch and getting started guid at :
>> +
>> +https://01.org/packet-processing/downloads
>> +
>> +A general rule of thumb for better performance is that the client
>> +application shouldn't be assigned the same dpdk core mask "-c" as
>> +the vswitchd.
>> +
>> +
>> +
>>  Restrictions:
>>  -------------
>>
>> @@ -86,6 +124,11 @@ Restrictions:
>>      device queues configured.
>>    - Work with 1500 MTU, needs few changes in DPDK lib to fix this
>>issue.
>>    - Currently DPDK port does not make use any offload functionality.
>> +  ivshmem
>> +  - The shared memory is currently restricted to the use of a 1GB
>> +   huge pages.
>I am not sure why we have this limit.
>
>> +  - All huge pages are shared amongst the host, clients, virtual
>> +   machines etc.
>>
>>  Bug Reporting:
>>  --------------
>> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
>> index fd991ab..a90913f 100644
>> --- a/lib/netdev-dpdk.c
>> +++ b/lib/netdev-dpdk.c
>> @@ -148,6 +148,23 @@ struct dpdk_tx_queue {
>>      struct rte_mbuf *burst_pkts[MAX_TX_QUEUE_LEN];
>>  };
>>
>> +/* dpdk has no way to remove dpdk ring ethernet devices
>> +   so we have to keep them around once they've been created
>> +*/
>> +
>> +static struct list dpdk_ring_list OVS_GUARDED_BY(dpdk_mutex)
>> +    = LIST_INITIALIZER(&dpdk_ring_list);
>> +
>> +struct dpdk_ring {
>> +    /* For the client rings */
>> +    struct rte_ring * cring_tx;
>> +    struct rte_ring * cring_rx;
>No need to have space after *.
>
>> +    int port_id; /* dpdkr port id */
>> +    int eth_port_id; /* ethernet device port id */
>> +    struct list list_node OVS_GUARDED_BY(mutex);
>> +
>> +};
>Extra blank line.
>> +
>>  struct netdev_dpdk {
>>      struct netdev up;
>>      int port_id;
>> @@ -167,9 +184,14 @@ struct netdev_dpdk {
>>      uint8_t hwaddr[ETH_ADDR_LEN];
>>      enum netdev_flags flags;
>>
>> +
>>      struct rte_eth_link link;
>>      int link_reset_cnt;
>>
>> +    /* If a shared memory ring */
>> +
>> +    struct dpdk_ring *ivshmem;
>> +
>>      /* In dpdk_list. */
>>      struct list list_node OVS_GUARDED_BY(dpdk_mutex);
>>  };
>> @@ -571,6 +593,7 @@ dpdk_queue_flush(struct netdev_dpdk *dev, int qid)
>>      if (txq->count == 0) {
>>          return;
>>      }
>> +
>>      rte_spinlock_lock(&txq->tx_lock);
>>      nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts,
>>txq->count);
>>      if (nb_tx != txq->count) {
>> @@ -593,6 +616,7 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_,
>>struct ofpbuf **packets, int *c)
>>
>>      dpdk_queue_flush(dev, rxq_->queue_id);
>>
>> +
>>      nb_rx = rte_eth_rx_burst(rx->port_id, rxq_->queue_id,
>>                               (struct rte_mbuf **) packets,
>>MAX_RX_QUEUE_LEN);
>>      if (!nb_rx) {
>> @@ -1170,6 +1194,200 @@ static struct netdev_class netdev_dpdk_class = {
>>      NULL,                       /* rxq_drain */
>>  };
>>
>> +/* Client Rings */
>> +
>> +static int
>> +dpdk_classr_init(void)
>> +{
>> +  VLOG_INFO("Initialized dpdk client handlers:\n");
>> +  return 0;
>> +}
>> +
>> +static int
>> +netdev_dpdkr_construct(struct netdev *netdev_)
>> +{
>> +    struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
>> +    struct dpdk_ring *ivshmem;
>> +    unsigned int port_no;
>> +    char *cport;
>> +    int err=0;
>> +    int found=0;
>> +    char name[10];
>We can move the definition of name into the if-else block where it is used.
>
>> +
>> +
>Extra blank line.
>> +    if (rte_eal_init_ret) {
>> +        return rte_eal_init_ret;
>> +    }
>> +
>> +    ovs_mutex_lock(&dpdk_mutex);
>> +    cport = netdev_->name + 5; /* Names always start with "dpdkr" */
>> +
>> +    if (strncmp(netdev_->name, "dpdkr", 5)) {
>> +        VLOG_ERR("Not a valid dpdkr name %s:\n",netdev_->name);
>> +        err = ENODEV;
>> +        goto unlock_dpdk;
>> +    }
>> +
>> +    port_no = strtol(cport, 0, 0); /* string must be null terminated */
>> +
>> +    ovs_mutex_init(&netdev->mutex);
>> +
>> +    ovs_mutex_lock(&netdev->mutex);
>> +    netdev->flags = 0;
>> +
>> +    netdev->mtu = ETHER_MTU;
>> +    netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
>> +
>> +    /* TODO: need to discover device node at run time. */
>> +    netdev->socket_id = SOCKET0;
>> +
>> +    netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu);
>> +    if (!netdev->dpdk_mp) {
>> +        VLOG_ERR("Unable to get memory pool\n");
>> +        err = ENOMEM;
>> +        goto unlock_dev;
>> +    }
>> +
>> +     /* look through our list to find the device */
>> +     LIST_FOR_EACH(ivshmem, list_node, &dpdk_ring_list){
>> +         if (ivshmem->port_id == port_no) {
>> +            VLOG_INFO("Found dpdk ring device %s:\n", netdev_->name);
>> +            found = 1;
>> +            break;
>> +         }
>> +    }
>> +    if (found) {
>> +        netdev->ivshmem = ivshmem;
>
>"netdev->ivshmem" is only assigned, never used anywhere in the code.
>> +        netdev->port_id = ivshmem->eth_port_id; /* really all that is
>>needed */
>> +    }
>> +    else {
>> +        /* Need to create the device rings */
>> +
>> +        ivshmem = dpdk_rte_mzalloc(sizeof *ivshmem);
>> +        if (ivshmem == NULL)
>> +            goto unlock_dev;
>> +
>> +        snprintf(name,10,"%s_tx",netdev_->name);
>> +        ivshmem->cring_tx = rte_ring_create(name, MAX_RX_QUEUE_LEN,
>>SOCKET0, 0);
>> +        if (ivshmem->cring_tx == NULL)
>> +            goto unlock_dev;
>> +
>> +        snprintf(name,10,"%s_rx",netdev_->name);
>> +        ivshmem->cring_rx = rte_ring_create(name, MAX_RX_QUEUE_LEN,
>>SOCKET0, 0);
>Do we need a relation between the ring size when we create the device and
>when we open it?
>
>> +        if (ivshmem->cring_rx == NULL)
>> +            goto unlock_dev;
>Need to free memory in error case.
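
The error paths in this block jump to unlock_dev without setting err and
without releasing what was already allocated.  A minimal sketch of
goto-style unwinding follows.  It uses calloc()/free() as stand-ins for the
DPDK allocation calls, and struct ring_pair, alloc_step() and
ring_pair_create() are hypothetical names; which release functions are
actually available for the rings depends on the DPDK version.

```c
#include <errno.h>
#include <stdlib.h>

/* Stand-in for the two client rings; not the patch's struct dpdk_ring. */
struct ring_pair {
    void *tx;
    void *rx;
};

/* Hypothetical allocator that fails on purpose when 'step' == 'fail_at',
 * so that each error path can be exercised. */
static void *
alloc_step(int step, int fail_at, size_t size)
{
    return step == fail_at ? NULL : calloc(1, size);
}

/* Creates a ring pair.  On failure, releases whatever was already
 * allocated, leaves '*out' NULL and returns a nonzero errno value. */
static int
ring_pair_create(struct ring_pair **out, int fail_at)
{
    struct ring_pair *rp;

    *out = NULL;

    rp = alloc_step(0, fail_at, sizeof *rp);
    if (!rp) {
        goto err;
    }

    rp->tx = alloc_step(1, fail_at, 64);
    if (!rp->tx) {
        goto err_free_rp;
    }

    rp->rx = alloc_step(2, fail_at, 64);
    if (!rp->rx) {
        goto err_free_tx;
    }

    *out = rp;
    return 0;

err_free_tx:
    free(rp->tx);
err_free_rp:
    free(rp);
err:
    return ENOMEM;
}
```

Each label undoes exactly one earlier step, so a failure at any point
releases only what exists and always reports a nonzero error.
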
>> +
>> +        err = rte_eth_from_rings(&ivshmem->cring_rx, 1,
>>&ivshmem->cring_tx, 1, SOCKET0);
>> +        if (err < 0) {
>> +           goto unlock_dev;
>> +         }
>Extra space
>> +
>> +        ivshmem->port_id = port_no;
>> +        ivshmem->eth_port_id = netdev->port_id = rte_eth_dev_count()-1
>>;
>> +
>> +        netdev->ivshmem=ivshmem;
>> +
>Need space.
>
>> +        list_push_back(&dpdk_ring_list, &ivshmem->list_node);
>> +
>> +    }
>> +
>> +    err = dpdk_eth_dev_init(netdev);
>> +    if (err) {
>> +        goto unlock_dev;
>> +    }
>> +
>> +    list_push_back(&dpdk_list, &netdev->list_node);
>> +
>> +    err = 0;
>> +unlock_dev:
>> +    ovs_mutex_unlock(&netdev->mutex);
>> +unlock_dpdk:
>> +    ovs_mutex_unlock(&dpdk_mutex);
>> +    return err;
>> +}
>> +
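
Two details of the name handling in the construct function above may be
worth a second look: cport is computed before the "dpdkr" prefix is
checked, and strtol(cport, 0, 0) accepts trailing garbage as well as hex
and octal input.  Also, name[10] only has room for "dpdkr<N>_tx" when <N>
is a single digit.  A rough sketch of stricter handling follows;
parse_dpdkr_name() and ring_name() are hypothetical helpers, not part of
the patch or of OVS.

```c
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Parses "dpdkr<N>" into '*port_no'.  Returns 0 on success, ENODEV if the
 * prefix is wrong, EINVAL if <N> is missing, malformed or out of range. */
static int
parse_dpdkr_name(const char *dev_name, unsigned int *port_no)
{
    static const char prefix[] = "dpdkr";
    const size_t prefix_len = sizeof prefix - 1;
    const char *digits;
    unsigned long value;
    char *end;

    if (strncmp(dev_name, prefix, prefix_len)) {
        return ENODEV;
    }

    /* Only look past the prefix once it is known to be there. */
    digits = dev_name + prefix_len;
    if (*digits < '0' || *digits > '9') {
        return EINVAL;
    }

    errno = 0;
    value = strtoul(digits, &end, 10);
    if (errno || *end != '\0' || value > UINT_MAX) {
        return EINVAL;
    }

    *port_no = (unsigned int) value;
    return 0;
}

/* Builds "<dev_name>_<suffix>" in 'buf'.  Returns 0, or ENAMETOOLONG if
 * the result does not fit; snprintf() returns the length it needed. */
static int
ring_name(char *buf, size_t size, const char *dev_name, const char *suffix)
{
    int n = snprintf(buf, size, "%s_%s", dev_name, suffix);

    return n < 0 || (size_t) n >= size ? ENAMETOOLONG : 0;
}
```

With a 10-byte buffer, ring_name() succeeds for "dpdkr0" but reports
ENAMETOOLONG for "dpdkr10", which the plain snprintf() calls in the patch
would silently truncate.
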
>> +static void
>> +netdev_dpdkr_destruct(struct netdev *netdev_)
>> +{
>> +    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev_);
>> +
>> +    ovs_mutex_lock(&dev->mutex);
>> +    rte_eth_dev_stop(dev->port_id);
>> +
>> +    ovs_mutex_unlock(&dev->mutex);
>> +
>> +    ovs_mutex_lock(&dpdk_mutex);
>> +    list_remove(&dev->list_node);
>> +    dpdk_mp_put(dev->dpdk_mp);
>> +    ovs_mutex_unlock(&dpdk_mutex);
>> +
>> +    ovs_mutex_destroy(&dev->mutex);
>> +}
>> +
>> +static struct netdev_class netdev_dpdkr_class = {
>> +    "dpdkr",
>> +    dpdk_classr_init,            /* init */
>> +    NULL,                       /* netdev_dpdk_run */
>> +    NULL,                       /* netdev_dpdk_wait */
>> +
>> +    netdev_dpdk_alloc,
>> +    netdev_dpdkr_construct,
>> +    netdev_dpdkr_destruct,
>> +    netdev_dpdk_dealloc,
>> +    netdev_dpdk_get_config,
>> +    NULL,                       /* netdev_dpdk_set_config */
>> +    NULL,                       /* get_tunnel_config */
>> +
>> +    netdev_dpdk_send,           /* send */
>> +    NULL,                       /* send_wait */
>> +
>> +    netdev_dpdk_set_etheraddr,
>> +    netdev_dpdk_get_etheraddr,
>> +    netdev_dpdk_get_mtu,
>> +    netdev_dpdk_set_mtu,
>> +    netdev_dpdk_get_ifindex,
>> +    netdev_dpdk_get_carrier,
>> +    netdev_dpdk_get_carrier_resets,
>> +    netdev_dpdk_set_miimon,
>> +    netdev_dpdk_get_stats,
>> +    netdev_dpdk_set_stats,
>> +    netdev_dpdk_get_features,
>> +    NULL,                       /* set_advertisements */
>> +
>> +    NULL,                       /* set_policing */
>> +    NULL,                       /* get_qos_types */
>> +    NULL,                       /* get_qos_capabilities */
>> +    NULL,                       /* get_qos */
>> +    NULL,                       /* set_qos */
>> +    NULL,                       /* get_queue */
>> +    NULL,                       /* set_queue */
>> +    NULL,                       /* delete_queue */
>> +    NULL,                       /* get_queue_stats */
>> +    NULL,                       /* queue_dump_start */
>> +    NULL,                       /* queue_dump_next */
>> +    NULL,                       /* queue_dump_done */
>> +    NULL,                       /* dump_queue_stats */
>> +
>> +    NULL,                       /* get_in4 */
>> +    NULL,                       /* set_in4 */
>> +    NULL,                       /* get_in6 */
>> +    NULL,                       /* add_router */
>> +    NULL,                       /* get_next_hop */
>> +    netdev_dpdk_get_status,
>> +    NULL,                       /* arp_lookup */
>> +
>> +    netdev_dpdk_update_flags,
>> +
>> +    netdev_dpdk_rxq_alloc,
>> +    netdev_dpdk_rxq_construct,
>> +    netdev_dpdk_rxq_destruct,
>> +    netdev_dpdk_rxq_dealloc,
>> +    netdev_dpdk_rxq_recv,
>> +    NULL,                       /* rx_wait */
>> +    NULL,                       /* rxq_drain */
>> +};
>> +
>> +
>We can avoid the duplicate definition; check out TUNNEL_CLASS() in
>netdev-vport.c
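
The general shape of that idea, as I understand it, is one macro that
holds the shared members and takes only the differing ones as parameters.
The sketch below does not reproduce TUNNEL_CLASS(); struct dev_class,
DEV_CLASS() and the small functions are simplified, hypothetical stand-ins
for struct netdev_class and its callbacks.

```c
#include <stddef.h>

/* Simplified stand-in for struct netdev_class; the real structure has
 * many more members. */
struct dev_class {
    const char *type;
    int (*init)(void);
    int (*construct)(const char *name);
    int (*get_mtu)(void);
};

/* Shared by both classes. */
static int common_get_mtu(void) { return 1500; }

/* Differ per class. */
static int eth_construct(const char *name) { (void) name; return 0; }
static int ring_construct(const char *name) { (void) name; return 0; }
static int ring_init(void) { return 0; }

/* One macro holds the shared members; only the differing ones are
 * parameters. */
#define DEV_CLASS(NAME, INIT, CONSTRUCT)    \
    {                                       \
        NAME,                               \
        INIT,                               \
        CONSTRUCT,                          \
        common_get_mtu,                     \
    }

static const struct dev_class eth_class =
    DEV_CLASS("dpdk", NULL, eth_construct);

static const struct dev_class ring_class =
    DEV_CLASS("dpdkr", ring_init, ring_construct);
```

Applied to the patch, the long initializer would appear once, and the
dpdk and dpdkr classes would differ only in name, init, construct and
destruct.
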
>
>>  int
>>  dpdk_init(int argc, char **argv)
>>  {
>> @@ -1196,6 +1414,7 @@ void
>>  netdev_dpdk_register(void)
>>  {
>>      netdev_register_provider(&netdev_dpdk_class);
>> +    netdev_register_provider(&netdev_dpdkr_class);
>>  }
>>
>>  int
>> diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
>> index 2807310..8cb715a 100644
>> --- a/lib/netdev-dpdk.h
>> +++ b/lib/netdev-dpdk.h
>> @@ -10,6 +10,7 @@
>>  #include <rte_eal.h>
>>  #include <rte_debug.h>
>>  #include <rte_ethdev.h>
>> +#include <rte_eth_ring.h>
>>  #include <rte_errno.h>
>>  #include <rte_memzone.h>
>>  #include <rte_memcpy.h>
>> diff --git a/lib/netdev.c b/lib/netdev.c
>> index 2fc1834..610de13 100644
>> --- a/lib/netdev.c
>> +++ b/lib/netdev.c
>> @@ -98,7 +98,7 @@ netdev_n_rxq(const struct netdev *netdev)
>>  bool
>>  netdev_is_pmd(const struct netdev *netdev)
>>  {
>> -    return !strcmp(netdev->netdev_class->type, "dpdk");
>> +    return !strncmp(netdev->netdev_class->type, "dpdk",4);
>>  }
>It would be easier to understand if dpdkr is explicitly checked.
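
A minimal sketch of the explicit check.  type_is_pmd() is a hypothetical
standalone helper that takes the type string directly, rather than a
struct netdev as netdev_is_pmd() does, so that it can be compiled on its
own.

```c
#include <stdbool.h>
#include <string.h>

/* Returns true only for the two poll-mode types named in the patch. */
static bool
type_is_pmd(const char *type)
{
    return !strcmp(type, "dpdk") || !strcmp(type, "dpdkr");
}
```

Unlike the strncmp(..., "dpdk", 4) prefix match, this does not treat every
type whose name merely begins with "dpdk" as a poll-mode device.
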
>
>>
>>  static void
>> diff --git a/tests/automake.mk b/tests/automake.mk
>> index bf80702..0067812 100644
>> --- a/tests/automake.mk
>> +++ b/tests/automake.mk
>> @@ -204,6 +204,11 @@ tests/idltest.ovsidl: $(IDLTEST_IDL_FILES)
>>
>>  tests/idltest.c: tests/idltest.h
>>
>> +noinst_PROGRAMS += tests/ovsclient
>> +tests_ovsclient_SOURCES = \
>> +       tests/ovs_client/ovs_client.c
>> +tests_ovsclient_LDADD = lib/libopenvswitch.la $(LIBS)
>> +
>>  noinst_PROGRAMS += tests/ovstest
>>  tests_ovstest_SOURCES = \
>>         tests/ovstest.c \
>> diff --git a/tests/ovs_client/ovs_client.c
>>b/tests/ovs_client/ovs_client.c
>> new file mode 100644
>> index 0000000..07f49ed
>> --- /dev/null
>> +++ b/tests/ovs_client/ovs_client.c
>This file needs a lot of coding style fixes.
>
>> @@ -0,0 +1,217 @@
>> +/*
>> + *   BSD LICENSE
>> + *
>> + *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
>> + *   All rights reserved.
>> + *
>> + *   Redistribution and use in source and binary forms, with or without
>> + *   modification, are permitted provided that the following conditions
>> + *   are met:
>> + *
>> + *     * Redistributions of source code must retain the above copyright
>> + *       notice, this list of conditions and the following disclaimer.
>> + *     * Redistributions in binary form must reproduce the above
>>copyright
>> + *       notice, this list of conditions and the following disclaimer
>>in
>> + *       the documentation and/or other materials provided with the
>> + *       distribution.
>> + *     * Neither the name of Intel Corporation nor the names of its
>> + *       contributors may be used to endorse or promote products
>>derived
>> + *       from this software without specific prior written permission.
>> + *
>> + *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
>>CONTRIBUTORS
>> + *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
>> + *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
>>FOR
>> + *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
>>COPYRIGHT
>> + *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
>>INCIDENTAL,
>> + *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
>> + *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
>>USE,
>> + *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
>>ANY
>> + *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
>>TORT
>> + *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
>>USE
>> + *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
>>DAMAGE.
>> + *
>> + */
>> +
>> +#include <getopt.h>
>> +
>> +#include <config.h>
>> +#include <rte_config.h>
>> +#include <rte_mbuf.h>
>> +#include <rte_ether.h>
>> +#include <rte_string_fns.h>
>> +#include <rte_ip.h>
>> +#include <rte_byteorder.h>
>> +
>> +/* Number of packets to attempt to read from queue */
>> +#define PKT_READ_SIZE  ((uint16_t)32)
>> +
>> +/* define common names for structures shared between ovs_dpdk and
>>client */
>> +#define MP_CLIENT_RXQ_NAME "dpdkr%u_tx"
>> +#define MP_CLIENT_TXQ_NAME "dpdkr%u_rx"
>> +
>> +#define RTE_LOGTYPE_APP RTE_LOGTYPE_USER1
>> +
>> +#define BASE_10 10
>> +
>> +/* our client id number - tells us which rx queue to read, and tx
>> + * queue to write to. */
>> +static uint8_t client_id = 0;
>> +
>> +int no_pkt;
>> +int pkt;
>> +
>> +/*
>> + * Given the rx queue name template above, get the queue name
>> + */
>> +static inline const char *
>> +get_rx_queue_name(unsigned id)
>> +{
>> +       /* buffer for return value. Size calculated by %u being replaced
>> +        * by maximum 3 digits (plus an extra byte for safety) */
>> +       static char buffer[sizeof(MP_CLIENT_RXQ_NAME) + 2];
>> +
>> +       rte_snprintf(buffer, sizeof(buffer) - 1, MP_CLIENT_RXQ_NAME,
>>id);
>> +       return buffer;
>> +}
>> +
>> +/*
>> + * Given the tx queue name template above, get the queue name
>> + */
>> +static inline const char *
>> +get_tx_queue_name(unsigned id)
>> +{
>> +       /* buffer for return value. Size calculated by %u being replaced
>> +        * by maximum 3 digits (plus an extra byte for safety) */
>> +       static char buffer[sizeof(MP_CLIENT_TXQ_NAME) + 2];
>> +
>> +       rte_snprintf(buffer, sizeof(buffer) - 1, MP_CLIENT_TXQ_NAME,
>>id);
>> +       return buffer;
>> +}
>> +
>> +/*
>> + * print a usage message
>> + */
>> +static void
>> +usage(const char *progname)
>> +{
>> +       printf("\nUsage: %s [EAL args] -- -n <client_id>\n", progname);
>> +}
>> +
>> +/*
>> + * Convert the client id number from a string to an int.
>> + */
>> +static int
>> +parse_client_num(const char *client)
>> +{
>> +       char *end = NULL;
>> +       unsigned long temp = 0;
>> +
>> +       if (client == NULL || *client == '\0')
>> +               return -1;
>> +
>> +       temp = strtoul(client, &end, BASE_10);
>> +       /* If valid string argument is provided, terminating '/0'
>>character
>> +        * is stored in 'end' */
>> +       if (end == NULL || *end != '\0')
>> +               return -1;
>> +
>> +       client_id = (uint8_t)temp;
>> +       return 0;
>> +}
>> +
>> +/*
>> + * Parse the application arguments to the client app.
>> + */
>> +static int
>> +parse_app_args(int argc, char *argv[])
>> +{
>> +       int option_index = 0, opt = 0;
>> +       char **argvopt = argv;
>> +       const char *progname = NULL;
>> +       static struct option lgopts[] = {
>> +               {NULL, 0, 0, 0 }
>> +       };
>> +       progname = argv[0];
>> +
>> +       while ((opt = getopt_long(argc, argvopt, "n:s:d:", lgopts,
>> +               &option_index)) != EOF){
>> +               switch (opt){
>> +                       case 'n':
>> +                               if (parse_client_num(optarg) != 0){
>> +                                       usage(progname);
>> +                                       return -1;
>> +                               }
>> +                               break;
>> +                       default:
>> +                               usage(progname);
>> +                               return -1;
>> +               }
>> +       }
>> +
>> +       return 0;
>> +}
>> +
>> +/*
>> + * Application main function - loops through
>> + * receiving and processing packets. Never returns
>> + */
>> +int
>> +main(int argc, char *argv[])
>> +{
>> +       struct rte_ring *rx_ring = NULL;
>> +       struct rte_ring *tx_ring = NULL;
>> +       int retval = 0;
>> +       void *pkts[PKT_READ_SIZE];
>> +       int rslt = 0;
>> +
>> +       if ((retval = rte_eal_init(argc, argv)) < 0)
>> +               return -1;
>> +
>> +       argc -= retval;
>> +       argv += retval;
>> +
>> +       if (parse_app_args(argc, argv) < 0)
>> +               rte_exit(EXIT_FAILURE, "Invalid command-line
>>arguments\n");
>> +
>> +       rx_ring = rte_ring_lookup(get_rx_queue_name(client_id));
>> +       if (rx_ring == NULL)
>> +               rte_exit(EXIT_FAILURE, "Cannot get RX ring - is server
>>process running?\n");
>> +
>> +       tx_ring = rte_ring_lookup(get_tx_queue_name(client_id));
>> +       if (tx_ring == NULL)
>> +               rte_exit(EXIT_FAILURE, "Cannot get TX ring - is server
>>process running?\n");
>> +
>> +       RTE_LOG(INFO, APP, "Finished Process Init.\n");
>> +
>> +       printf("\nClient process %d handling packets\n", client_id);
>> +       printf("[Press Ctrl-C to quit ...]\n");
>> +
>> +       for (;;) {
>> +               unsigned rx_pkts = PKT_READ_SIZE;
>> +
>> +               /* try dequeuing max possible packets first, if that
>>fails, get the
>> +                * most we can. Loop body should only execute once,
>>maximum */
>> +               while (rx_pkts > 0 &&
>> +                               unlikely(rte_ring_dequeue_bulk(rx_ring,
>>pkts, rx_pkts) != 0))
>> +                       rx_pkts =
>>(uint16_t)RTE_MIN(rte_ring_count(rx_ring), PKT_READ_SIZE);
>> +
>> +                if(rx_pkts>0)
>> +                {
>> +
>> +                    pkt++;
>> +
>> +                   /* blocking enqueue */
>> +                   do {
>> +                           rslt = rte_ring_enqueue_bulk(tx_ring, pkts,
>>rx_pkts);
>> +                   } while (rslt == -ENOBUFS);
>> +                }
>> +                else
>> +                   no_pkt++;
>> +
>> +               if (!(pkt %  100000))
>> +               {
>> +                     printf("pkt %d %d\n", pkt, no_pkt);
>> +                     pkt=no_pkt=0;
>> +               }
>> +       }
>> +}
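
For readers without a DPDK setup, the loopback behaviour of the client can
be modelled with a toy pointer ring.  This is a single-threaded sketch
under stated assumptions: struct toy_ring and the toy_ring_*() functions
are made-up stand-ins for struct rte_ring and its calls, and the sketch
uses burst-style calls (take whatever is available) instead of the
bulk-with-retry dequeue used in the client above.

```c
#include <stddef.h>

#define RING_SIZE 8     /* Toy capacity; must be a power of two. */
#define BURST     4     /* Maximum pointers moved per pass. */

/* Toy single-threaded pointer ring; a stand-in for struct rte_ring. */
struct toy_ring {
    void *slots[RING_SIZE];
    unsigned int head;  /* Next slot to dequeue. */
    unsigned int tail;  /* Next slot to enqueue. */
};

static unsigned int
toy_ring_count(const struct toy_ring *r)
{
    return r->tail - r->head;
}

/* Enqueues up to 'n' pointers; returns how many fit. */
static unsigned int
toy_ring_enqueue_burst(struct toy_ring *r, void *const *objs, unsigned int n)
{
    unsigned int i;

    for (i = 0; i < n && toy_ring_count(r) < RING_SIZE; i++) {
        r->slots[r->tail++ % RING_SIZE] = objs[i];
    }
    return i;
}

/* Dequeues up to 'n' pointers; returns how many were available. */
static unsigned int
toy_ring_dequeue_burst(struct toy_ring *r, void **objs, unsigned int n)
{
    unsigned int i;

    for (i = 0; i < n && toy_ring_count(r) > 0; i++) {
        objs[i] = r->slots[r->head++ % RING_SIZE];
    }
    return i;
}

/* One loopback pass: moves whatever is waiting on 'rx' to 'tx' unchanged.
 * Only pointers are moved; the packet data itself is never copied.
 * Returns the number of pointers forwarded. */
static unsigned int
loopback_once(struct toy_ring *rx, struct toy_ring *tx)
{
    void *pkts[BURST];
    unsigned int n = toy_ring_dequeue_burst(rx, pkts, BURST);
    unsigned int sent = 0;

    /* Like the client's "blocking enqueue": retry until all are placed.
     * In this single-threaded toy, 'tx' must have room or this spins. */
    while (sent < n) {
        sent += toy_ring_enqueue_burst(tx, pkts + sent, n - sent);
    }
    return n;
}
```

Calling loopback_once() in a loop corresponds to the body of the client's
for (;;) loop, minus the packet counters.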