[ovs-dev] netdev-dpdk / add dpdk rings to netdev-dpdk

gerald.rogers at intel.com gerald.rogers at intel.com
Mon Jun 23 11:55:03 UTC 2014


Shared memory ring patch

This patch enables the client dpdk rings within the netdev-dpdk.  It adds
a new dpdk device called dpdkr (other naming suggestions?).  This allows
for the use of shared memory to communicate with other dpdk applications,
on the host or within a virtual machine.  Instructions for use are in
INSTALL.DPDK.

This has been tested on Intel multi-core platforms and with the client
application within the host.

Signed-off-by: Gerald Rogers <gerald.rogers at intel.com>

diff --git a/INSTALL.DPDK b/INSTALL.DPDK
index 2a6d7ef..6a8b565 100644
--- a/INSTALL.DPDK
+++ b/INSTALL.DPDK
@@ -190,6 +190,44 @@ The core 23 is left idle, which allows core 7 to run at full rate.
 
 Future changes may change the need for cpu core affinitization.
 
+DPDK Rings :
+
+Following the steps above to create a bridge, you can now add dpdk rings
+as a port to the vswitch.  OVS will expect the DPDK ring device name to
+start with dpdkr and end with a port id.
+
+    ovs-vsctl add-port br0 dpdkr0 -- set Interface dpdkr0 type=dpdkr
+
+DPDK rings client test application
+
+Included in the test directory is a sample DPDK application for testing
+the rings.  This is from the base dpdk directory and modified to work
+with the ring naming used within ovs.
+
+location tests/ovs_client
+
+To run the client :
+
+ovsclient -c 1 -n 4 --proc-type=secondary -- -n "port id you gave dpdkr"
+
+It is essential to have --proc-type=secondary
+
+The application simply receives an mbuf on the receive queue of the 
+ethernet ring and then places that same mbuf on the transmit ring of 
+the ethernet ring.  It is a trivial loopback application.
+
+In addition to executing the client in the host, you can execute it within 
+a guest VM. To do so you will need a patched qemu.  You can download the
+patch and getting started guide at:
+
+https://01.org/packet-processing/downloads
+
+A general rule of thumb for better performance is that the client 
+application shouldn't be assigned the same dpdk core mask "-c" as 
+the vswitchd.
+
+
+
 Restrictions:
 -------------
 
@@ -202,6 +240,11 @@ Restrictions:
     device queues configured.
   - Work with 1500 MTU, needs few changes in DPDK lib to fix this issue.
   - Currently DPDK port does not make use any offload functionality.
+  ivshmem
+  - The shared memory is currently restricted to the use of 1GB
+   huge pages.
+  - All huge pages are shared amongst the host, clients, virtual
+   machines etc.
 
 Bug Reporting:
 --------------
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 6c281fe..aecf379 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -68,7 +68,7 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 /* By default, choose a priority in the middle. */
 #define NETDEV_RULE_PRIORITY 0x8000
 
-#define NR_THREADS 1
+#define NR_THREADS 4
 /* Use per thread recirc_depth to prevent recirculation loop. */
 #define MAX_RECIRC_DEPTH 5
 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index fbdb6b3..3208ad6 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -148,6 +148,22 @@ struct dpdk_tx_queue {
     struct rte_mbuf *burst_pkts[MAX_TX_QUEUE_LEN];
 };
 
+/* dpdk has no way to remove dpdk ring ethernet devices
+   so we have to keep them around once they've been created
+*/
+
+static struct list dpdk_ring_list OVS_GUARDED_BY(dpdk_mutex)
+    = LIST_INITIALIZER(&dpdk_ring_list);
+
+/* One pair of client rings plus the ethernet device created on top of them. */
+struct dpdk_ring {
+    struct rte_ring *cring_tx;  /* Passed to rte_eth_from_rings() as tx ring. */
+    struct rte_ring *cring_rx;  /* Passed to rte_eth_from_rings() as rx ring. */
+    int port_id;                /* dpdkr port id, parsed from "dpdkr<N>". */
+    int eth_port_id;            /* Ethernet device port id. */
+    struct list list_node OVS_GUARDED_BY(dpdk_mutex); /* In dpdk_ring_list. */
+};
+
 struct netdev_dpdk {
     struct netdev up;
     int port_id;
@@ -167,6 +183,7 @@ struct netdev_dpdk {
     uint8_t hwaddr[ETH_ADDR_LEN];
     enum netdev_flags flags;
 
+
     struct rte_eth_link link;
     int link_reset_cnt;
 
@@ -179,6 +196,11 @@ struct netdev_rxq_dpdk {
     int port_id;
 };
 
+struct dpdk_class {               /* A netdev class plus a dpif port label. */
+    const char *dpif_port;        /* DPIF_PORT argument of DPDK_CLASS(). */
+    struct netdev_class netdev_class; /* Given to netdev_register_provider(). */
+};
+
 static int netdev_dpdk_construct(struct netdev *);
 
 static bool
@@ -573,6 +595,7 @@ dpdk_queue_flush(struct netdev_dpdk *dev, int qid)
     if (txq->count == 0) {
         return;
     }
+
     rte_spinlock_lock(&txq->tx_lock);
     nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
     if (nb_tx != txq->count) {
@@ -595,6 +618,7 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct ofpbuf **packets, int *c)
 
     dpdk_queue_flush(dev, rxq_->queue_id);
 
+
     nb_rx = rte_eth_rx_burst(rx->port_id, rxq_->queue_id,
                              (struct rte_mbuf **) packets,
                              MIN((int)NETDEV_MAX_RX_BATCH,
@@ -1111,68 +1135,199 @@ dpdk_class_init(void)
     return 0;
 }
 
-static struct netdev_class netdev_dpdk_class = {
-    "dpdk",
-    dpdk_class_init,            /* init */
-    NULL,                       /* netdev_dpdk_run */
-    NULL,                       /* netdev_dpdk_wait */
-
-    netdev_dpdk_alloc,
-    netdev_dpdk_construct,
-    netdev_dpdk_destruct,
-    netdev_dpdk_dealloc,
-    netdev_dpdk_get_config,
-    NULL,                       /* netdev_dpdk_set_config */
-    NULL,                       /* get_tunnel_config */
-
-    netdev_dpdk_send,           /* send */
-    NULL,                       /* send_wait */
-
-    netdev_dpdk_set_etheraddr,
-    netdev_dpdk_get_etheraddr,
-    netdev_dpdk_get_mtu,
-    netdev_dpdk_set_mtu,
-    netdev_dpdk_get_ifindex,
-    netdev_dpdk_get_carrier,
-    netdev_dpdk_get_carrier_resets,
-    netdev_dpdk_set_miimon,
-    netdev_dpdk_get_stats,
-    netdev_dpdk_set_stats,
-    netdev_dpdk_get_features,
-    NULL,                       /* set_advertisements */
-
-    NULL,                       /* set_policing */
-    NULL,                       /* get_qos_types */
-    NULL,                       /* get_qos_capabilities */
-    NULL,                       /* get_qos */
-    NULL,                       /* set_qos */
-    NULL,                       /* get_queue */
-    NULL,                       /* set_queue */
-    NULL,                       /* delete_queue */
-    NULL,                       /* get_queue_stats */
-    NULL,                       /* queue_dump_start */
-    NULL,                       /* queue_dump_next */
-    NULL,                       /* queue_dump_done */
-    NULL,                       /* dump_queue_stats */
-
-    NULL,                       /* get_in4 */
-    NULL,                       /* set_in4 */
-    NULL,                       /* get_in6 */
-    NULL,                       /* add_router */
-    NULL,                       /* get_next_hop */
-    netdev_dpdk_get_status,
-    NULL,                       /* arp_lookup */
-
-    netdev_dpdk_update_flags,
-
-    netdev_dpdk_rxq_alloc,
-    netdev_dpdk_rxq_construct,
-    netdev_dpdk_rxq_destruct,
-    netdev_dpdk_rxq_dealloc,
-    netdev_dpdk_rxq_recv,
-    NULL,                       /* rxq_wait */
-    NULL,                       /* rxq_drain */
-};
+/* Client Rings */
+
+/* 'init' hook of the "dpdkr" netdev class.  It performs no setup; it only
+ * logs that the class was initialized and returns 0. */
+static int
+dpdkr_class_init(void)
+{
+    VLOG_INFO("Initialized dpdk client handlers");
+    return 0;
+}
+
+/* Constructs the "dpdkr<N>" netdev.
+ *
+ * If port <N> is already recorded in 'dpdk_ring_list', its ethernet device is
+ * reused.  Otherwise rings "dpdkr<N>_tx" and "dpdkr<N>_rx" are created, an
+ * ethernet device is built on them with rte_eth_from_rings(), and the result
+ * is appended to 'dpdk_ring_list'.
+ *
+ * Returns 0 on success, otherwise a positive errno value.
+ *
+ * NOTE(review): on a failure after dpdk_mp_get() the mempool reference, and
+ * any ring already created, are not released here -- confirm whether the
+ * caller or a later construct is expected to cope with that. */
+static int
+netdev_dpdkr_construct(struct netdev *netdev_)
+{
+    struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
+    struct dpdk_ring *ivshmem;
+    const char *digits;
+    unsigned long parsed;
+    unsigned int port_no;
+    bool found = false;
+    char *end;
+    int err;
+
+    if (rte_eal_init_ret) {
+        return rte_eal_init_ret;
+    }
+
+    /* Check the prefix before stepping over it. */
+    if (strncmp(netdev_->name, "dpdkr", 5)) {
+        VLOG_ERR("Not a valid dpdkr name %s", netdev_->name);
+        return ENODEV;
+    }
+
+    /* The rest must be a plain decimal number that fits 'port_no'. */
+    digits = netdev_->name + 5;
+    parsed = strtoul(digits, &end, 10);
+    port_no = parsed;
+    if (*digits < '0' || *digits > '9' || *end != '\0' || port_no != parsed) {
+        VLOG_ERR("Not a valid dpdkr name %s", netdev_->name);
+        return ENODEV;
+    }
+
+    ovs_mutex_lock(&dpdk_mutex);
+    ovs_mutex_init(&netdev->mutex);
+    ovs_mutex_lock(&netdev->mutex);
+
+    netdev->flags = 0;
+    netdev->mtu = ETHER_MTU;
+    netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
+
+    /* TODO: need to discover device node at run time. */
+    netdev->socket_id = SOCKET0;
+
+    netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu);
+    if (!netdev->dpdk_mp) {
+        VLOG_ERR("Unable to get memory pool");
+        err = ENOMEM;
+        goto unlock_dev;
+    }
+
+    /* Look through our list for a ring device created earlier. */
+    LIST_FOR_EACH (ivshmem, list_node, &dpdk_ring_list) {
+        if ((unsigned int) ivshmem->port_id == port_no) {
+            VLOG_INFO("Found dpdk ring device %s", netdev_->name);
+            found = true;
+            break;
+        }
+    }
+
+    if (found) {
+        netdev->port_id = ivshmem->eth_port_id; /* really all that is needed */
+    } else {
+        /* Need to create the device rings. */
+        char name[RTE_RING_NAMESIZE];
+
+        /* Refuse names whose "_tx"/"_rx" suffix would be cut off, since a
+         * client looks the rings up by their full name. */
+        if (strlen(netdev_->name) + sizeof "_tx" > sizeof name) {
+            err = ENAMETOOLONG;
+            goto unlock_dev;
+        }
+
+        ivshmem = dpdk_rte_mzalloc(sizeof *ivshmem);
+        if (!ivshmem) {
+            err = ENOMEM;
+            goto unlock_dev;
+        }
+
+        snprintf(name, sizeof name, "%s_tx", netdev_->name);
+        ivshmem->cring_tx = rte_ring_create(name, MAX_RX_QUEUE_LEN, SOCKET0, 0);
+        if (!ivshmem->cring_tx) {
+            rte_free(ivshmem);
+            err = ENOMEM;
+            goto unlock_dev;
+        }
+
+        snprintf(name, sizeof name, "%s_rx", netdev_->name);
+        ivshmem->cring_rx = rte_ring_create(name, MAX_RX_QUEUE_LEN, SOCKET0, 0);
+        if (!ivshmem->cring_rx) {
+            rte_free(ivshmem);
+            err = ENOMEM;
+            goto unlock_dev;
+        }
+
+        if (rte_eth_from_rings(&ivshmem->cring_rx, 1,
+                               &ivshmem->cring_tx, 1, SOCKET0) < 0) {
+            rte_free(ivshmem);
+            err = ENODEV;
+            goto unlock_dev;
+        }
+
+        ivshmem->port_id = port_no;
+        ivshmem->eth_port_id = netdev->port_id = rte_eth_dev_count() - 1;
+        list_push_back(&dpdk_ring_list, &ivshmem->list_node);
+    }
+
+    err = dpdk_eth_dev_init(netdev);
+    if (err) {
+        goto unlock_dev;
+    }
+
+    list_push_back(&dpdk_list, &netdev->list_node);
+
+unlock_dev:
+    ovs_mutex_unlock(&netdev->mutex);
+    ovs_mutex_unlock(&dpdk_mutex);
+    return err;
+}
+
+/* Tears down a "dpdkr" netdev: stops its ethernet device, unlinks the netdev
+ * from 'dpdk_list' and drops its memory pool reference.  The matching entry
+ * in 'dpdk_ring_list' is not touched. */
+static void
+netdev_dpdkr_destruct(struct netdev *netdev_)
+{
+    struct netdev_dpdk *ring_dev = netdev_dpdk_cast(netdev_);
+
+    /* Stop the port while holding the per-device lock. */
+    ovs_mutex_lock(&ring_dev->mutex);
+    rte_eth_dev_stop(ring_dev->port_id);
+    ovs_mutex_unlock(&ring_dev->mutex);
+
+    /* The global list and the memory pool are handled under 'dpdk_mutex'. */
+    ovs_mutex_lock(&dpdk_mutex);
+    list_remove(&ring_dev->list_node);
+    dpdk_mp_put(ring_dev->dpdk_mp);
+    ovs_mutex_unlock(&dpdk_mutex);
+
+    ovs_mutex_destroy(&ring_dev->mutex);
+}
+
+#define DPDK_FUNCTIONS(NAME, INIT, CONSTRUCT, DECONSTRUCT)    \
+    INIT,                       /* init */                    \
+    NULL,                       /* netdev_dpdk_run */         \
+    NULL,                       /* netdev_dpdk_wait */        \
+    /* The netdev_class members after 'type'; NAME is unused. */ \
+    netdev_dpdk_alloc,                                        \
+    CONSTRUCT,                  /* construct */               \
+    DECONSTRUCT,                /* destruct */                \
+    netdev_dpdk_dealloc,                                      \
+    netdev_dpdk_get_config,                                   \
+    NULL,                       /* netdev_dpdk_set_config */  \
+    NULL,                       /* get_tunnel_config */       \
+                                                              \
+    netdev_dpdk_send,           /* send */                    \
+    NULL,                       /* send_wait */               \
+                                                              \
+    netdev_dpdk_set_etheraddr,                                \
+    netdev_dpdk_get_etheraddr,                                \
+    netdev_dpdk_get_mtu,                                      \
+    netdev_dpdk_set_mtu,                                      \
+    netdev_dpdk_get_ifindex,                                  \
+    netdev_dpdk_get_carrier,                                  \
+    netdev_dpdk_get_carrier_resets,                           \
+    netdev_dpdk_set_miimon,                                   \
+    netdev_dpdk_get_stats,                                    \
+    netdev_dpdk_set_stats,                                    \
+    netdev_dpdk_get_features,                                 \
+    NULL,                       /* set_advertisements */      \
+                                                              \
+    NULL,                       /* set_policing */            \
+    NULL,                       /* get_qos_types */           \
+    NULL,                       /* get_qos_capabilities */    \
+    NULL,                       /* get_qos */                 \
+    NULL,                       /* set_qos */                 \
+    NULL,                       /* get_queue */               \
+    NULL,                       /* set_queue */               \
+    NULL,                       /* delete_queue */            \
+    NULL,                       /* get_queue_stats */         \
+    NULL,                       /* queue_dump_start */        \
+    NULL,                       /* queue_dump_next */         \
+    NULL,                       /* queue_dump_done */         \
+    NULL,                       /* dump_queue_stats */        \
+                                                              \
+    NULL,                       /* get_in4 */                 \
+    NULL,                       /* set_in4 */                 \
+    NULL,                       /* get_in6 */                 \
+    NULL,                       /* add_router */              \
+    NULL,                       /* get_next_hop */            \
+    netdev_dpdk_get_status,                                   \
+    NULL,                       /* arp_lookup */              \
+                                                              \
+    netdev_dpdk_update_flags,                                 \
+                                                              \
+    netdev_dpdk_rxq_alloc,                                    \
+    netdev_dpdk_rxq_construct,                                \
+    netdev_dpdk_rxq_destruct,                                 \
+    netdev_dpdk_rxq_dealloc,                                  \
+    netdev_dpdk_rxq_recv,                                     \
+    NULL,                       /* rxq_wait */                \
+    NULL,                       /* rxq_drain */
+
 
 int
 dpdk_init(int argc, char **argv)
@@ -1196,10 +1351,32 @@ dpdk_init(int argc, char **argv)
     return result;
 }
 
+#define DPDK_CLASS(NAME, DPIF_PORT, INIT, CONSTRUCT, DECONSTRUCT)  \
+  { DPIF_PORT, /* 'dpif_port'; the inner braces fill 'netdev_class'. */ \
+  { NAME, DPDK_FUNCTIONS(NAME, INIT, CONSTRUCT, DECONSTRUCT) }}
+
 void
 netdev_dpdk_register(void)
 {
-    netdev_register_provider(&netdev_dpdk_class);
+
+     /* The name of the dpif_port should be short enough to accomodate adding
+      * a port number to the end if one is necessary. 
+      */
+
+     static const struct dpdk_class dpdk_classes[] = {
+         DPDK_CLASS("dpdk", "dpdk_class", dpdk_class_init, netdev_dpdk_construct, netdev_dpdk_destruct),
+         DPDK_CLASS("dpdkr", "dpdkr_class", dpdkr_class_init, netdev_dpdkr_construct, netdev_dpdkr_destruct)
+     };
+     static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
+ 
+     if (ovsthread_once_start(&once)) {
+         int i;
+ 
+         for (i = 0; i < ARRAY_SIZE(dpdk_classes); i++) {
+             netdev_register_provider(&dpdk_classes[i].netdev_class);
+         }
+         ovsthread_once_done(&once);
+     }
 }
 
 int
diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
index 2807310..8cb715a 100644
--- a/lib/netdev-dpdk.h
+++ b/lib/netdev-dpdk.h
@@ -10,6 +10,7 @@
 #include <rte_eal.h>
 #include <rte_debug.h>
 #include <rte_ethdev.h>
+#include <rte_eth_ring.h>
 #include <rte_errno.h>
 #include <rte_memzone.h>
 #include <rte_memcpy.h>
diff --git a/lib/netdev.c b/lib/netdev.c
index 07cda42..fe6ea69 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -98,7 +98,8 @@ netdev_n_rxq(const struct netdev *netdev)
 bool
 netdev_is_pmd(const struct netdev *netdev)
 {
-    return !strcmp(netdev->netdev_class->type, "dpdk");
+    return (!strcmp(netdev->netdev_class->type, "dpdk") ||
+            !strcmp(netdev->netdev_class->type, "dpdkr"));
 }
 
 static void
diff --git a/tests/automake.mk b/tests/automake.mk
index 9354fad..c4ea63f 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -207,6 +207,11 @@ tests/idltest.ovsidl: $(IDLTEST_IDL_FILES)
 
 tests/idltest.c: tests/idltest.h
 
+noinst_PROGRAMS += tests/ovsclient
+tests_ovsclient_SOURCES = \
+	tests/ovs_client/ovs_client.c
+tests_ovsclient_LDADD = lib/libopenvswitch.la $(LIBS)
+
 noinst_PROGRAMS += tests/ovstest
 tests_ovstest_SOURCES = \
 	tests/ovstest.c \
diff --git a/tests/ovs_client/ovs_client.c b/tests/ovs_client/ovs_client.c
new file mode 100644
index 0000000..07f49ed
--- /dev/null
+++ b/tests/ovs_client/ovs_client.c
@@ -0,0 +1,217 @@
+/*
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <config.h>
+
+#include <getopt.h>
+#include <limits.h>
+#include <stdint.h>
+
+#include <rte_config.h>
+#include <rte_mbuf.h>
+#include <rte_ether.h>
+#include <rte_string_fns.h>
+#include <rte_ip.h>
+#include <rte_byteorder.h>
+
+/* Number of packets to attempt to read from queue */
+#define PKT_READ_SIZE  ((uint16_t)32)
+
+/* define common names for structures shared between ovs_dpdk and client */
+#define MP_CLIENT_RXQ_NAME "dpdkr%u_tx"
+#define MP_CLIENT_TXQ_NAME "dpdkr%u_rx"
+
+#define RTE_LOGTYPE_APP RTE_LOGTYPE_USER1
+
+#define BASE_10 10
+
+/* our client id number - tells us which rx queue to read, and tx 
+ * queue to write to. */
+static uint8_t client_id = 0;
+
+int no_pkt; /* Polls of the rx ring that found nothing since the last report. */
+int pkt;    /* Polls that dequeued packets since the last report. */
+
+/*
+ * Builds the name of the ring this client receives from by substituting
+ * 'id' into MP_CLIENT_RXQ_NAME.  The result lives in a static buffer, so
+ * it is overwritten by the next call.
+ */
+static inline const char *
+get_rx_queue_name(unsigned id)
+{
+	/* "%u" takes two bytes in the template; two more bytes of storage
+	 * give room for a three-digit id plus one spare byte. */
+	static char rxq_name[sizeof(MP_CLIENT_RXQ_NAME) + 2];
+
+	rte_snprintf(rxq_name, sizeof(rxq_name) - 1, MP_CLIENT_RXQ_NAME, id);
+
+	return rxq_name;
+}
+
+/*
+ * Builds the name of the ring this client transmits on by substituting
+ * 'id' into MP_CLIENT_TXQ_NAME.  The result lives in a static buffer, so
+ * it is overwritten by the next call.
+ */
+static inline const char *
+get_tx_queue_name(unsigned id)
+{
+	/* "%u" takes two bytes in the template; two more bytes of storage
+	 * give room for a three-digit id plus one spare byte. */
+	static char txq_name[sizeof(MP_CLIENT_TXQ_NAME) + 2];
+
+	rte_snprintf(txq_name, sizeof(txq_name) - 1, MP_CLIENT_TXQ_NAME, id);
+
+	return txq_name;
+}
+
+/*
+ * Writes the one-line command synopsis for 'progname' to stdout.
+ */
+static void
+usage(const char *progname)
+{
+	fprintf(stdout, "\nUsage: %s [EAL args] -- -n <client_id>\n", progname);
+}
+
+/*
+ * Parses 'client' as a decimal client id and stores it in 'client_id'.
+ *
+ * Returns 0 on success.  Returns -1, leaving 'client_id' untouched, when
+ * 'client' is NULL or empty, is not consumed completely by strtoul(), or
+ * holds a value that does not fit the uint8_t 'client_id'.
+ */
+static int
+parse_client_num(const char *client)
+{
+	char *end = NULL;
+	unsigned long temp = 0;
+
+	if (client == NULL || *client == '\0')
+		return -1;
+
+	temp = strtoul(client, &end, BASE_10);
+	/* If a valid string argument is provided, 'end' is left pointing at
+	 * its terminating '\0' character. */
+	if (end == NULL || *end != '\0')
+		return -1;
+
+	/* Refuse values the uint8_t cast below would silently wrap, e.g.
+	 * "256" becoming client 0. */
+	if (temp > UINT8_MAX)
+		return -1;
+
+	client_id = (uint8_t)temp;
+	return 0;
+}
+
+/*
+ * Walks the application arguments (those after the EAL ones).  Returns 0
+ * when every option was accepted; prints the usage text and returns -1 on
+ * the first option that is not a valid "-n <client_id>".
+ */
+static int
+parse_app_args(int argc, char *argv[])
+{
+	static struct option no_long_opts[] = {
+		{NULL, 0, 0, 0 }
+	};
+	const char *progname = argv[0];
+	int long_index = 0;
+	int opt;
+
+	for (;;) {
+		opt = getopt_long(argc, argv, "n:s:d:", no_long_opts,
+			&long_index);
+		if (opt == EOF)
+			break;
+
+		/* Only -n is handled; anything else getopt_long() returns,
+		 * including -s and -d, ends in the usage error. */
+		if (opt != 'n' || parse_client_num(optarg) != 0) {
+			usage(progname);
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+/*
+ * Application main function.  After EAL and argument setup it looks up the
+ * two rings for 'client_id' and then loops forever, putting every burst it
+ * dequeues from the rx ring onto the tx ring.  The only return in this
+ * function is the -1 given when rte_eal_init() fails.
+ */
+int
+main(int argc, char *argv[])
+{
+	struct rte_ring *rx_ring = NULL;
+	struct rte_ring *tx_ring = NULL;
+	int retval = 0;
+	void *pkts[PKT_READ_SIZE];
+	int rslt = 0;
+
+	if ((retval = rte_eal_init(argc, argv)) < 0)
+		return -1;
+
+	/* Skip the arguments that rte_eal_init() reported as its own. */
+	argc -= retval;
+	argv += retval;
+
+	if (parse_app_args(argc, argv) < 0)
+		rte_exit(EXIT_FAILURE, "Invalid command-line arguments\n");
+
+	rx_ring = rte_ring_lookup(get_rx_queue_name(client_id));
+	if (rx_ring == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot get RX ring - is server process running?\n");
+
+	tx_ring = rte_ring_lookup(get_tx_queue_name(client_id));
+	if (tx_ring == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot get TX ring - is server process running?\n");
+
+	RTE_LOG(INFO, APP, "Finished Process Init.\n");
+
+	printf("\nClient process %d handling packets\n", client_id);
+	printf("[Press Ctrl-C to quit ...]\n");
+
+	for (;;) {
+		unsigned rx_pkts = PKT_READ_SIZE;
+
+		/* try dequeuing max possible packets first, if that fails, get the
+		 * most we can. Loop body should only execute once, maximum */
+		while (rx_pkts > 0 &&
+				unlikely(rte_ring_dequeue_bulk(rx_ring, pkts, rx_pkts) != 0))
+			rx_pkts = (uint16_t)RTE_MIN(rte_ring_count(rx_ring), PKT_READ_SIZE);
+
+		if (rx_pkts > 0) {
+			pkt++;
+
+			/* blocking enqueue */
+			do {
+				rslt = rte_ring_enqueue_bulk(tx_ring, pkts, rx_pkts);
+			} while (rslt == -ENOBUFS);
+		} else if (no_pkt < INT_MAX) {
+			/* Saturate instead of overflowing during a long idle period. */
+			no_pkt++;
+		}
+
+		/* Report once per 100000 non-empty polls.  'pkt' is 0 at startup
+		 * and right after a report, so 0 has to be excluded or an idle
+		 * ring would print on every iteration. */
+		if (pkt > 0 && !(pkt % 100000)) {
+			printf("pkt %d %d\n", pkt, no_pkt);
+			pkt = no_pkt = 0;
+		}
+	}
+}



More information about the dev mailing list