[ovs-dev] [dpdk patch 5/8] dpif-netdev: Create 'number of dpdk ifaces on cpu socket' pmd threads for each cpu socket.
Alex Wang
alexw at nicira.com
Tue Aug 12 04:56:40 UTC 2014
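For each cpu socket that has dpdk interfaces, create as many pmd threads
as there are dpdk interfaces on that socket, capped by the number of
still-unpinned cpu cores on that socket. Each pmd thread is pinned to an
available core on the same socket. The pmd threads of a socket are
destroyed when its last dpdk interface is removed. Core 0 is never used
for pmd threads; it is reserved for all non-pmd threads.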
Signed-off-by: Alex Wang <alexw at nicira.com>
---
lib/dpif-netdev.c | 254 +++++++++++++++++++++++++++++++++++++++++------------
lib/dpif-netdev.h | 2 +-
lib/netdev-dpdk.c | 40 ++++++---
lib/netdev-dpdk.h | 15 ++++
4 files changed, 244 insertions(+), 67 deletions(-)
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index c637d9f..14784bf 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -52,6 +52,7 @@
#include "odp-util.h"
#include "ofp-print.h"
#include "ofpbuf.h"
+#include "ovs-numa.h"
#include "ovs-rcu.h"
#include "packet-dpif.h"
#include "packets.h"
@@ -71,6 +72,7 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
#define NETDEV_RULE_PRIORITY 0x8000
#define FLOW_DUMP_MAX_BATCH 50
+
/* Use per thread recirc_depth to prevent recirculation loop. */
#define MAX_RECIRC_DEPTH 5
DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
@@ -142,11 +144,9 @@ struct dp_netdev {
struct fat_rwlock upcall_rwlock;
exec_upcall_cb *upcall_cb; /* Callback function for executing upcalls. */
- /* Forwarding threads. */
- struct latch exit_latch;
- struct pmd_thread *pmd_threads;
- size_t n_pmd_threads;
- int pmd_count;
+ /* Per-cpu-socket struct for configuring pmd threads. */
+ struct pmd_socket *pmd_sockets;
+ int n_pmd_sockets;
};
static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
@@ -281,6 +281,15 @@ struct dp_netdev_actions *dp_netdev_flow_get_actions(
const struct dp_netdev_flow *);
static void dp_netdev_actions_free(struct dp_netdev_actions *);
+/* Represents the PMD configuration on a cpu socket.
+ *
+ * One instance exists per cpu socket reported by ovs_numa_get_n_sockets()
+ * when the datapath is created; the instances live in the 'pmd_sockets'
+ * array of 'struct dp_netdev' and are indexed by socket id. */
+struct pmd_socket {
+ struct dp_netdev *dp; /* Datapath this socket state belongs to. */
+ struct latch exit_latch; /* Set to make this socket's pmd threads exit. */
+ struct pmd_thread *pmd_threads; /* 'n_pmd_threads' threads, or NULL. */
+ int socket_id; /* Index of this entry in 'dp->pmd_sockets'. */
+ int n_pmd_threads; /* Number of pmd threads on this socket. */
+};
+
/* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate
* the performance overhead of interrupt processing. Therefore netdev can
* not implement rx-wait for these devices. dpif-netdev needs to poll
@@ -293,9 +302,10 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *);
* table, and executes the actions it finds.
**/
struct pmd_thread {
- struct dp_netdev *dp;
+ struct pmd_socket *socket;
pthread_t thread;
- int id;
+ int index;
+ int core_id;
atomic_uint change_seq;
};
@@ -335,8 +345,10 @@ static void dp_netdev_port_input(struct dp_netdev *dp,
struct dpif_packet **packets, int cnt,
odp_port_t port_no);
-static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
static void dp_netdev_disable_upcall(struct dp_netdev *);
+static void dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *);
+static void dp_netdev_unset_pmd_threads(struct dp_netdev *, int socket_id);
+static void dp_netdev_set_pmd_threads(struct dp_netdev *, int socket_id);
static struct dpif_netdev *
dpif_netdev_cast(const struct dpif *dpif)
@@ -450,7 +462,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
OVS_REQUIRES(dp_netdev_mutex)
{
struct dp_netdev *dp;
- int error;
+ int n_sockets, error;
dp = xzalloc(sizeof *dp);
shash_add(&dp_netdevs, name, dp);
@@ -469,13 +481,32 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
ovs_mutex_init(&dp->port_mutex);
cmap_init(&dp->ports);
dp->port_seq = seq_create();
- latch_init(&dp->exit_latch);
fat_rwlock_init(&dp->upcall_rwlock);
/* Disable upcalls by default. */
dp_netdev_disable_upcall(dp);
dp->upcall_cb = NULL;
+ /* Reserves the core 0 for main thread. */
+ ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID);
+
+ /* Creates 'pmd_socket' struct for each cpu socket. */
+ n_sockets = ovs_numa_get_n_sockets();
+ if (n_sockets != OVS_SOCKET_UNSPEC) {
+ int i;
+
+ dp->n_pmd_sockets = n_sockets;
+ dp->pmd_sockets = xzalloc(dp->n_pmd_sockets * sizeof *dp->pmd_sockets);
+ for (i = 0; i < dp->n_pmd_sockets; i++) {
+ dp->pmd_sockets[i].dp = dp;
+ dp->pmd_sockets[i].socket_id = i;
+ latch_init(&dp->pmd_sockets[i].exit_latch);
+ }
+ } else {
+ dp->n_pmd_sockets = 0;
+ dp->pmd_sockets = NULL;
+ }
+
ovs_mutex_lock(&dp->port_mutex);
error = do_add_port(dp, name, "internal", ODPP_LOCAL);
ovs_mutex_unlock(&dp->port_mutex);
@@ -525,8 +556,8 @@ dp_netdev_free(struct dp_netdev *dp)
shash_find_and_delete(&dp_netdevs, dp->name);
- dp_netdev_set_pmd_threads(dp, 0);
- free(dp->pmd_threads);
+ dp_netdev_destroy_all_pmd_sockets(dp);
+ free(dp->pmd_sockets);
dp_netdev_flow_flush(dp);
ovs_mutex_lock(&dp->port_mutex);
@@ -547,7 +578,6 @@ dp_netdev_free(struct dp_netdev *dp)
seq_destroy(dp->port_seq);
cmap_destroy(&dp->ports);
fat_rwlock_destroy(&dp->upcall_rwlock);
- latch_destroy(&dp->exit_latch);
free(CONST_CAST(char *, dp->name));
free(dp);
}
@@ -618,12 +648,17 @@ dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
{
int i;
- for (i = 0; i < dp->n_pmd_threads; i++) {
- struct pmd_thread *f = &dp->pmd_threads[i];
- int id;
+ for (i = 0; i < dp->n_pmd_sockets; i++) {
+ struct pmd_socket *s = &dp->pmd_sockets[i];
+ int j;
- atomic_add(&f->change_seq, 1, &id);
- }
+ for (j = 0; j < s->n_pmd_threads; j++) {
+ struct pmd_thread *f = &s->pmd_threads[j];
+ int id;
+
+ atomic_add(&f->change_seq, 1, &id);
+ }
+ }
}
static uint32_t
@@ -690,10 +725,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
}
port->sf = sf;
- if (netdev_is_pmd(netdev)) {
- dp->pmd_count++;
- dp_netdev_set_pmd_threads(dp, NR_PMD_THREADS);
- dp_netdev_reload_pmd_threads(dp);
+ if (netdev_is_pmd(netdev) && dp->pmd_sockets) {
+ dp_netdev_set_pmd_threads(dp, netdev_dpdk_get_socket_id(netdev));
}
ovs_refcount_init(&port->ref_cnt);
@@ -834,6 +867,23 @@ get_port_by_name(struct dp_netdev *dp,
return ENOENT;
}
+/* Returns true if at least one port of 'dp' is backed by a pmd netdev
+ * located on cpu socket 'socket_id', false if there is no such port. */
+static bool
+has_pmd_port_for_socket(struct dp_netdev *dp, int socket_id)
+{
+    struct dp_netdev_port *port;
+
+    CMAP_FOR_EACH (port, node, &dp->ports) {
+        bool on_socket = netdev_is_pmd(port->netdev)
+                         && netdev_dpdk_get_socket_id(port->netdev)
+                            == socket_id;
+
+        if (on_socket) {
+            return true;
+        }
+    }
+    return false;
+}
+
static void
do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
OVS_REQUIRES(dp->port_mutex)
@@ -841,7 +891,15 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
seq_change(dp->port_seq);
if (netdev_is_pmd(port->netdev)) {
- dp_netdev_reload_pmd_threads(dp);
+ int socket_id = netdev_dpdk_get_socket_id(port->netdev);
+
+ /* If there is no netdev on the cpu socket, unsets the pmd threads
+ * for that cpu socket. Else, just reloads the queues. */
+ if (!has_pmd_port_for_socket(dp, socket_id)) {
+ dp_netdev_unset_pmd_threads(dp, socket_id);
+ } else {
+ dp_netdev_reload_pmd_threads(dp);
+ }
}
port_unref(port);
@@ -1614,14 +1672,17 @@ struct rxq_poll {
struct netdev_rxq *rx;
};
+/* A pmd netdev that is on a different cpu socket than the pmd thread that
+ * holds this entry.  The pmd thread never polls such a device, so it keeps
+ * a reference to it in order to flush its own tx queue on that device
+ * (see netdev_dpdk_flush_non_local()). */
+struct non_local_pmd_dev {
+ struct netdev *dev; /* Reference taken with netdev_ref(). */
+};
+
static int
pmd_load_queues(struct pmd_thread *f,
struct rxq_poll **ppoll_list, int poll_cnt)
{
- struct dp_netdev *dp = f->dp;
+ struct pmd_socket *s = f->socket;
struct rxq_poll *poll_list = *ppoll_list;
struct dp_netdev_port *port;
- int id = f->id;
int index;
int i;
@@ -1633,12 +1694,13 @@ pmd_load_queues(struct pmd_thread *f,
poll_cnt = 0;
index = 0;
- CMAP_FOR_EACH (port, node, &f->dp->ports) {
- if (netdev_is_pmd(port->netdev)) {
+ CMAP_FOR_EACH (port, node, &s->dp->ports) {
+ if (netdev_is_pmd(port->netdev)
+ && netdev_dpdk_get_socket_id(port->netdev) == s->socket_id) {
int i;
for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
- if ((index % dp->n_pmd_threads) == id) {
+ if ((index % s->n_pmd_threads) == f->index) {
poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1));
port_ref(port);
@@ -1655,23 +1717,56 @@ pmd_load_queues(struct pmd_thread *f,
return poll_cnt;
}
+/* Refreshes the list of pmd netdevs that are NOT on the cpu socket of pmd
+ * thread 'f'.
+ *
+ * On entry '*pdev_list' holds 'dev_cnt' entries from a previous call (NULL
+ * and 0 on the first call); the reference held by each of them is released
+ * with netdev_close().  Then every pmd netdev of the datapath that sits on
+ * another socket is referenced with netdev_ref() and appended to the array,
+ * which may be reallocated.  The array is stored back into '*pdev_list' and
+ * the new number of entries is returned. */
+static int
+pmd_get_non_local_pmd_dev(struct pmd_thread *f,
+                          struct non_local_pmd_dev **pdev_list, int dev_cnt)
+{
+    struct dp_netdev *dp = f->socket->dp;
+    int local_socket = f->socket->socket_id;
+    struct non_local_pmd_dev *devs = *pdev_list;
+    struct dp_netdev_port *port;
+    int n_devs = 0;
+    int i;
+
+    /* Drop the references taken by the previous call. */
+    for (i = 0; i < dev_cnt; i++) {
+        netdev_close(devs[i].dev);
+    }
+
+    CMAP_FOR_EACH (port, node, &dp->ports) {
+        struct netdev *netdev = port->netdev;
+
+        if (netdev_is_pmd(netdev)
+            && netdev_dpdk_get_socket_id(netdev) != local_socket) {
+            devs = xrealloc(devs, (n_devs + 1) * sizeof *devs);
+            netdev_ref(netdev);
+            devs[n_devs++].dev = netdev;
+        }
+    }
+
+    *pdev_list = devs;
+    return n_devs;
+}
+
static void *
pmd_thread_main(void *f_)
{
struct pmd_thread *f = f_;
- struct dp_netdev *dp = f->dp;
+ struct dp_netdev *dp = f->socket->dp;
unsigned int lc = 0;
struct rxq_poll *poll_list;
+ struct non_local_pmd_dev *dev_list;
unsigned int port_seq;
- int poll_cnt;
+ int poll_cnt, dev_cnt;
int i;
poll_cnt = 0;
+ dev_cnt = 0;
poll_list = NULL;
+ dev_list = NULL;
- pmd_thread_setaffinity_cpu(f->id);
+ pmd_thread_setaffinity_cpu(f->core_id);
reload:
poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
+ dev_cnt = pmd_get_non_local_pmd_dev(f, &dev_list, dev_cnt);
atomic_read(&f->change_seq, &port_seq);
for (;;) {
@@ -1682,6 +1777,10 @@ reload:
dp_netdev_process_rxq_port(dp, poll_list[i].port, poll_list[i].rx);
}
+ for (i = 0; i < dev_cnt; i++) {
+ netdev_dpdk_flush_non_local(dev_list[i].dev, f->core_id);
+ }
+
if (lc++ > 1024) {
ovsrcu_quiesce();
@@ -1696,7 +1795,7 @@ reload:
}
}
- if (!latch_is_set(&f->dp->exit_latch)){
+ if (!latch_is_set(&f->socket->exit_latch)){
goto reload;
}
@@ -1739,40 +1838,87 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
}
static void
-dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
+dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *dp)
{
int i;
- if (n == dp->n_pmd_threads) {
- return;
+ /* Exits all existing pmd threads. */
+ for (i = 0; i < dp->n_pmd_sockets; i++) {
+ struct pmd_socket *s = &dp->pmd_sockets[i];
+
+ dp_netdev_unset_pmd_threads(dp, i);
+ latch_destroy(&s->exit_latch);
}
+}
- /* Stop existing threads. */
- latch_set(&dp->exit_latch);
- dp_netdev_reload_pmd_threads(dp);
- for (i = 0; i < dp->n_pmd_threads; i++) {
- struct pmd_thread *f = &dp->pmd_threads[i];
+/* Stops and joins every pmd thread running on cpu socket 'socket_id' and
+ * unpins the cores they were using.
+ *
+ * Does nothing when 'socket_id' is not a valid index into 'dp->pmd_sockets'.
+ * This covers a datapath created without numa information ('pmd_sockets' is
+ * NULL and 'n_pmd_sockets' is 0), which do_del_port() does not check for,
+ * and a negative socket id reported for a device whose socket is unknown. */
+static void
+dp_netdev_unset_pmd_threads(struct dp_netdev *dp, int socket_id)
+{
+    struct pmd_socket *s;
+    int i;
+
+    if (socket_id < 0 || socket_id >= dp->n_pmd_sockets) {
+        return;
+    }
+
+    s = &dp->pmd_sockets[socket_id];
+    if (s->n_pmd_threads) {
+        /* Raise the exit latch, then bump every thread's 'change_seq' so
+         * that the threads leave their polling loop and notice the latch. */
+        latch_set(&s->exit_latch);
+        dp_netdev_reload_pmd_threads(dp);
+        for (i = 0; i < s->n_pmd_threads; i++) {
+            struct pmd_thread *f = &s->pmd_threads[i];
-        xpthread_join(f->thread, NULL);
+            ovs_numa_unpin_core(f->core_id);
+            xpthread_join(f->thread, NULL);
+        }
+        latch_poll(&s->exit_latch);
+        free(s->pmd_threads);
+        s->pmd_threads = NULL;
+        s->n_pmd_threads = 0;
     }
-    latch_poll(&dp->exit_latch);
-    free(dp->pmd_threads);
+}
+
+/* Starts pmd threads for cpu socket 'socket_id' unless that socket already
+ * has some, then makes every pmd thread of 'dp' reload its queues so that a
+ * newly added port gets polled.
+ *
+ * The number of threads created is the number of dpdk interfaces on the
+ * socket, capped by the number of still-unpinned cores on that socket, and
+ * each thread is given one of those cores.  Logs an error and returns
+ * without reloading if 'socket_id' is not a valid index into
+ * 'dp->pmd_sockets' or if the socket has no unpinned core left. */
+static void
+dp_netdev_set_pmd_threads(struct dp_netdev *dp, int socket_id)
+{
+    struct pmd_socket *s;
+
+    ovs_assert(ovs_numa_get_n_sockets() != OVS_SOCKET_UNSPEC
+               && ovs_numa_get_n_cores() != OVS_CORE_UNSPEC);
+
+    /* The socket id is reported by the device and is not guaranteed to be
+     * a valid index (it may be negative when the socket is unknown), so
+     * check it before indexing 'dp->pmd_sockets'. */
+    if (socket_id < 0 || socket_id >= dp->n_pmd_sockets) {
+        VLOG_ERR("Cannot create pmd threads: invalid cpu socket id %d",
+                 socket_id);
+        return;
+    }
+    s = &dp->pmd_sockets[socket_id];
+
+    /* Only create threads if the socket has none yet; otherwise the
+     * reload at the end is enough to pick up the new port. */
+    if (!s->n_pmd_threads) {
+        int n_devs = netdev_dpdk_get_n_devs_on_socket(socket_id);
+        int n_unpinned_cores =
+            ovs_numa_get_n_unpinned_cores_on_socket(socket_id);
+        int n_threads, i;
+
+        if (!n_unpinned_cores) {
+            VLOG_ERR("Cannot create pmd threads due to out of unpinned "
+                     "cores on socket");
+            return;
+        }
-    /* Start new threads. */
-    dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
-    dp->n_pmd_threads = n;
+        /* Starts new pmd threads on the cpu socket. */
+        n_threads = MIN(n_devs, n_unpinned_cores);
+        s->pmd_threads = xmalloc(n_threads * sizeof *s->pmd_threads);
+        s->n_pmd_threads = n_threads;
-    for (i = 0; i < n; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
+        for (i = 0; i < s->n_pmd_threads; i++) {
+            struct pmd_thread *f = &s->pmd_threads[i];
-        f->dp = dp;
-        f->id = i;
-        atomic_store(&f->change_seq, 1);
+            /* Take a free core on this socket for the thread;
+             * dp_netdev_unset_pmd_threads() releases it again with
+             * ovs_numa_unpin_core(). */
+            f->core_id = ovs_numa_get_unpinned_core_on_socket(socket_id);
+            f->index = i;
+            f->socket = s;
+            atomic_store(&f->change_seq, 1);
-        /* Each thread will distribute all devices rx-queues among
-         * themselves. */
-        f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
+            /* The threads of this socket split the rx queues of the
+             * socket's pmd devices among themselves. */
+            f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
+        }
     }
+
+    dp_netdev_reload_pmd_threads(dp);
 }
diff --git a/lib/dpif-netdev.h b/lib/dpif-netdev.h
index 50c1198..f501f7c 100644
--- a/lib/dpif-netdev.h
+++ b/lib/dpif-netdev.h
@@ -40,7 +40,7 @@ static inline void dp_packet_pad(struct ofpbuf *b)
}
}
-#define NR_PMD_THREADS 1
+#define NON_PMD_CORE_ID 0
#ifdef __cplusplus
}
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 432524f..012ee68 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -401,7 +401,6 @@ dpdk_get_n_devs(int socket_id)
count++;
}
}
- ovs_assert(count);
return count;
}
@@ -508,14 +507,12 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no) OVS_REQUIRES(dpdk
rte_spinlock_init(&netdev->tx_q[i].tx_lock);
}
- netdev->port_id = port_no;
-
netdev->flags = 0;
netdev->mtu = ETHER_MTU;
netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
- /* XXX: need to discover device node at run time. */
- netdev->socket_id = SOCKET0;
+ netdev->socket_id = rte_eth_dev_socket_id(port_no);
+ netdev->port_id = port_no;
netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu);
if (!netdev->dpdk_mp) {
@@ -699,7 +696,11 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
int nb_rx;
- dpdk_queue_flush(dev, rxq_->queue_id);
+ /* There is only one tx queue for this core. Do not flush other
+ * queueus. */
+ if (rxq_->queue_id == rte_lcore_id()) {
+ dpdk_queue_flush(dev, rxq_->queue_id);
+ }
nb_rx = rte_eth_rx_burst(rx->port_id, rxq_->queue_id,
(struct rte_mbuf **) packets,
@@ -1496,6 +1497,23 @@ netdev_dpdk_get_socket_id(const struct netdev *netdev_)
}
}
+/* Public wrapper around dpdk_get_n_devs(): reports how many dpdk
+ * interfaces are attached to cpu socket 'socket_id'. */
+int
+netdev_dpdk_get_n_devs_on_socket(int socket_id)
+{
+    int n_devs = dpdk_get_n_devs(socket_id);
+
+    return n_devs;
+}
+
+/* Flushes the packets pending in tx queue 'qid' of dpdk device 'netdev_'.
+ *
+ * pmd threads call this, passing their core id as 'qid', for the dpdk
+ * interfaces that are not on their own cpu socket: those devices are never
+ * polled by the thread, so the flush done on the receive path does not
+ * reach them.  The function itself does not check the device's socket. */
+void
+netdev_dpdk_flush_non_local(struct netdev *netdev_, int qid)
+{
+    dpdk_queue_flush(netdev_dpdk_cast(netdev_), qid);
+}
+
int
pmd_thread_setaffinity_cpu(int cpu)
{
@@ -1510,7 +1528,8 @@ pmd_thread_setaffinity_cpu(int cpu)
return err;
}
/* lcore_id 0 is reseved for use by non pmd threads. */
- RTE_PER_LCORE(_lcore_id) = cpu + 1;
+ ovs_assert(cpu);
+ RTE_PER_LCORE(_lcore_id) = cpu;
return 0;
}
@@ -1518,16 +1537,13 @@ pmd_thread_setaffinity_cpu(int cpu)
void
thread_set_nonpmd(void)
{
- /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is reserved
- * for non pmd threads */
- BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE);
/* We have to use 0 to allow non pmd threads to perform certain DPDK
* operations, like rte_eth_dev_configure(). */
- RTE_PER_LCORE(_lcore_id) = 0;
+ RTE_PER_LCORE(_lcore_id) = NON_PMD_CORE_ID;
}
static bool
thread_is_pmd(void)
{
- return rte_lcore_id() != 0;
+ return rte_lcore_id() != NON_PMD_CORE_ID;
}
diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
index 75f6a0b..5e8f296 100644
--- a/lib/netdev-dpdk.h
+++ b/lib/netdev-dpdk.h
@@ -24,6 +24,8 @@ struct netdev;
int dpdk_init(int argc, char **argv);
void netdev_dpdk_register(void);
int netdev_dpdk_get_socket_id(const struct netdev *);
+int netdev_dpdk_get_n_devs_on_socket(int socket_id);
+void netdev_dpdk_flush_non_local(struct netdev *, int qid);
void free_dpdk_buf(struct dpif_packet *);
int pmd_thread_setaffinity_cpu(int cpu);
void thread_set_nonpmd(void);
@@ -48,6 +50,19 @@ netdev_dpdk_get_socket_id(const struct netdev *netdev_ OVS_UNUSED)
return -1;
}
+/* Stub for builds without DPDK support (this appears to be the non-DPDK
+ * branch of the header): there are no dpdk interfaces on any cpu socket.
+ * Returns 0 rather than -1 because callers use the result as a count,
+ * e.g. dp_netdev_set_pmd_threads() feeds it into MIN() and xmalloc(). */
+static inline int
+netdev_dpdk_get_n_devs_on_socket(int socket_id OVS_UNUSED)
+{
+    return 0;
+}
+
+/* Stub for builds without DPDK support (this appears to be the non-DPDK
+ * branch of the header): there are no dpdk tx queues to flush, so this is
+ * a no-op. */
+static inline void
+netdev_dpdk_flush_non_local(struct netdev *netdev OVS_UNUSED,
+ int qid OVS_UNUSED)
+{
+ /* Nothing */
+}
+
static inline void
free_dpdk_buf(struct dpif_packet *buf OVS_UNUSED)
{
--
1.7.9.5
More information about the dev
mailing list