[ovs-dev] [PATCH V2 5/5] dpif-netdev: Create 'number of dpdk ifaces on cpu socket' pmd threads for each cpu socket.
Alex Wang
alexw at nicira.com
Wed Jun 25 01:40:06 UTC 2014
The pmd threads are pinned to available cpu cores on the
corresponding cpu socket.
Signed-off-by: Alex Wang <alexw at nicira.com>
---
PATCH -> V2:
- Add latch_destroy().
- Use 'int' for cpu socket/core id.
---
lib/dpif-netdev.c | 178 ++++++++++++++++++++++++++++++++++++++---------------
lib/netdev-dpdk.c | 27 +++++++-
lib/netdev-dpdk.h | 14 +++++
3 files changed, 167 insertions(+), 52 deletions(-)
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 4dcc268..f36ff2b 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -51,6 +51,7 @@
#include "odp-util.h"
#include "ofp-print.h"
#include "ofpbuf.h"
+#include "ovs-numa.h"
#include "ovs-rcu.h"
#include "packet-dpif.h"
#include "packets.h"
@@ -69,7 +70,6 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
/* By default, choose a priority in the middle. */
#define NETDEV_RULE_PRIORITY 0x8000
-#define NR_THREADS 1
/* Use per thread recirc_depth to prevent recirculation loop. */
#define MAX_RECIRC_DEPTH 5
DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
@@ -166,11 +166,9 @@ struct dp_netdev {
struct cmap ports;
struct seq *port_seq; /* Incremented whenever a port changes. */
- /* Forwarding threads. */
- struct latch exit_latch;
- struct pmd_thread *pmd_threads;
- size_t n_pmd_threads;
- int pmd_count;
+ /* Per-cpu-socket struct for configuring pmd threads. */
+ struct pmd_socket *pmd_sockets;
+ int n_pmd_sockets;
};
static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
@@ -305,6 +303,15 @@ struct dp_netdev_actions *dp_netdev_flow_get_actions(
const struct dp_netdev_flow *);
static void dp_netdev_actions_free(struct dp_netdev_actions *);
+/* Represents the PMD configuration on a cpu socket. */
+struct pmd_socket {
+ struct dp_netdev *dp;
+ struct latch exit_latch;
+ struct pmd_thread *pmd_threads;
+ int socket_id;
+ int n_pmd_threads;
+};
+
/* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate
* the performance overhead of interrupt processing. Therefore netdev can
* not implement rx-wait for these devices. dpif-netdev needs to poll
@@ -317,9 +324,10 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *);
* table, and executes the actions it finds.
**/
struct pmd_thread {
- struct dp_netdev *dp;
+ struct pmd_socket *socket;
pthread_t thread;
int id;
+ int core_id;
atomic_uint change_seq;
};
@@ -359,7 +367,9 @@ static void dp_netdev_port_input(struct dp_netdev *dp,
struct dpif_packet **packets, int cnt,
odp_port_t port_no);
-static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
+static void dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *);
+static void dp_netdev_unset_pmd_threads(struct dp_netdev *, int socket_id);
+static void dp_netdev_set_pmd_threads(struct dp_netdev *, int socket_id);
static struct dpif_netdev *
dpif_netdev_cast(const struct dpif *dpif)
@@ -473,7 +483,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
OVS_REQUIRES(dp_netdev_mutex)
{
struct dp_netdev *dp;
- int error;
+ int n_sockets, error, i;
dp = xzalloc(sizeof *dp);
shash_add(&dp_netdevs, name, dp);
@@ -494,7 +504,15 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
ovs_mutex_init(&dp->port_mutex);
cmap_init(&dp->ports);
dp->port_seq = seq_create();
- latch_init(&dp->exit_latch);
+
+ n_sockets = ovs_numa_get_n_sockets();
+ dp->n_pmd_sockets = n_sockets != OVS_SOCKET_UNSPEC ? n_sockets : 0;
+ dp->pmd_sockets = xzalloc(dp->n_pmd_sockets * sizeof *dp->pmd_sockets);
+ for (i = 0; i < dp->n_pmd_sockets; i++) {
+ dp->pmd_sockets[i].dp = dp;
+ dp->pmd_sockets[i].socket_id = i;
+ latch_init(&dp->pmd_sockets[i].exit_latch);
+ }
ovs_mutex_lock(&dp->port_mutex);
error = do_add_port(dp, name, "internal", ODPP_LOCAL);
@@ -563,8 +581,8 @@ dp_netdev_free(struct dp_netdev *dp)
shash_find_and_delete(&dp_netdevs, dp->name);
- dp_netdev_set_pmd_threads(dp, 0);
- free(dp->pmd_threads);
+ dp_netdev_destroy_all_pmd_sockets(dp);
+ free(dp->pmd_sockets);
dp_netdev_flow_flush(dp);
ovs_mutex_lock(&dp->port_mutex);
@@ -590,7 +608,6 @@ dp_netdev_free(struct dp_netdev *dp)
ovs_mutex_destroy(&dp->flow_mutex);
seq_destroy(dp->port_seq);
cmap_destroy(&dp->ports);
- latch_destroy(&dp->exit_latch);
free(CONST_CAST(char *, dp->name));
free(dp);
}
@@ -659,12 +676,12 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
}
static void
-dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
+dp_netdev_reload_pmd_threads(struct pmd_socket *s)
{
int i;
- for (i = 0; i < dp->n_pmd_threads; i++) {
- struct pmd_thread *f = &dp->pmd_threads[i];
+ for (i = 0; i < s->n_pmd_threads; i++) {
+ struct pmd_thread *f = &s->pmd_threads[i];
int id;
atomic_add(&f->change_seq, 1, &id);
@@ -735,10 +752,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
}
port->sf = sf;
- if (netdev_is_pmd(netdev)) {
- dp->pmd_count++;
- dp_netdev_set_pmd_threads(dp, NR_THREADS);
- dp_netdev_reload_pmd_threads(dp);
+ if (netdev_is_pmd(netdev) && dp->n_pmd_sockets) {
+ dp_netdev_set_pmd_threads(dp, netdev_dpdk_get_socket_id(netdev));
}
ovs_refcount_init(&port->ref_cnt);
@@ -879,6 +894,23 @@ get_port_by_name(struct dp_netdev *dp,
return ENOENT;
}
+/* Returns 'true' if there is a port with pmd netdev and the netdev
+ * is on cpu socket 'socket_id'. */
+static bool
+has_pmd_port_for_socket(struct dp_netdev *dp, int socket_id)
+{
+ struct dp_netdev_port *port;
+
+ CMAP_FOR_EACH (port, node, &dp->ports) {
+ if (netdev_is_pmd(port->netdev)
+ && netdev_dpdk_get_socket_id(port->netdev) == socket_id) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
static void
do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
OVS_REQUIRES(dp->port_mutex)
@@ -886,7 +918,15 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
seq_change(dp->port_seq);
if (netdev_is_pmd(port->netdev)) {
- dp_netdev_reload_pmd_threads(dp);
+ int socket_id = netdev_dpdk_get_socket_id(port->netdev);
+
+ /* If there is no netdev on the cpu socket, unsets the pmd threads
+ * for that cpu socket. Else, just reload the queues. */
+ if (!has_pmd_port_for_socket(dp, socket_id)) {
+ dp_netdev_unset_pmd_threads(dp, socket_id);
+ } else {
+ dp_netdev_reload_pmd_threads(&dp->pmd_sockets[socket_id]);
+ }
}
port_unref(port);
@@ -1828,7 +1868,7 @@ static int
pmd_load_queues(struct pmd_thread *f,
struct rxq_poll **ppoll_list, int poll_cnt)
{
- struct dp_netdev *dp = f->dp;
+ struct pmd_socket *s = f->socket;
struct rxq_poll *poll_list = *ppoll_list;
struct dp_netdev_port *port;
int id = f->id;
@@ -1843,12 +1883,13 @@ pmd_load_queues(struct pmd_thread *f,
poll_cnt = 0;
index = 0;
- CMAP_FOR_EACH (port, node, &f->dp->ports) {
- if (netdev_is_pmd(port->netdev)) {
+ CMAP_FOR_EACH (port, node, &s->dp->ports) {
+ if (netdev_is_pmd(port->netdev)
+ && netdev_dpdk_get_socket_id(port->netdev) == s->socket_id) {
int i;
for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
- if ((index % dp->n_pmd_threads) == id) {
+ if ((index % s->n_pmd_threads) == id) {
poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1));
port_ref(port);
@@ -1869,7 +1910,7 @@ static void *
pmd_thread_main(void *f_)
{
struct pmd_thread *f = f_;
- struct dp_netdev *dp = f->dp;
+ struct dp_netdev *dp = f->socket->dp;
unsigned int lc = 0;
struct rxq_poll *poll_list;
unsigned int port_seq;
@@ -1879,7 +1920,7 @@ pmd_thread_main(void *f_)
poll_cnt = 0;
poll_list = NULL;
- pmd_thread_setaffinity_cpu(f->id);
+ pmd_thread_setaffinity_cpu(f->core_id);
reload:
poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
atomic_read(&f->change_seq, &port_seq);
@@ -1892,6 +1933,8 @@ reload:
dp_netdev_process_rxq_port(dp, poll_list[i].port, poll_list[i].rx);
}
+ netdev_dpdk_flush_non_local(f->socket->socket_id, f->core_id);
+
if (lc++ > 1024) {
ovsrcu_quiesce();
@@ -1906,7 +1949,7 @@ reload:
}
}
- if (!latch_is_set(&f->dp->exit_latch)){
+ if (!latch_is_set(&f->socket->exit_latch)){
goto reload;
}
@@ -1919,40 +1962,75 @@ reload:
}
static void
-dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
+dp_netdev_destroy_all_pmd_sockets(struct dp_netdev *dp)
{
int i;
- if (n == dp->n_pmd_threads) {
- return;
+ /* Exits all existing pmd threads. */
+ for (i = 0; i < dp->n_pmd_sockets; i++) {
+ struct pmd_socket *s = &dp->pmd_sockets[i];
+
+ dp_netdev_unset_pmd_threads(dp, i);
+ latch_destroy(&s->exit_latch);
}
+}
+
+/* Deletes all pmd threads on cpu socket 'socket_id'. */
+static void
+dp_netdev_unset_pmd_threads(struct dp_netdev *dp, int socket_id)
+{
+ struct pmd_socket *s = &dp->pmd_sockets[socket_id];
+ int i;
- /* Stop existing threads. */
- latch_set(&dp->exit_latch);
- dp_netdev_reload_pmd_threads(dp);
- for (i = 0; i < dp->n_pmd_threads; i++) {
- struct pmd_thread *f = &dp->pmd_threads[i];
+ if (s->n_pmd_threads) {
+ latch_set(&s->exit_latch);
+ dp_netdev_reload_pmd_threads(s);
+ for (i = 0; i < s->n_pmd_threads; i++) {
+ struct pmd_thread *f = &s->pmd_threads[i];
- xpthread_join(f->thread, NULL);
+ ovs_numa_unpin_core(f->core_id);
+ xpthread_join(f->thread, NULL);
+ }
+ latch_poll(&s->exit_latch);
+ free(s->pmd_threads);
+ s->pmd_threads = NULL;
+ s->n_pmd_threads = 0;
}
- latch_poll(&dp->exit_latch);
- free(dp->pmd_threads);
+}
+
+/* Creates the pmd threads for cpu socket 'socket_id' if it has none yet,
+ * then calls dp_netdev_reload_pmd_threads() for that socket. */
+static void
+dp_netdev_set_pmd_threads(struct dp_netdev *dp, int socket_id)
+{
+ struct pmd_socket *s = &dp->pmd_sockets[socket_id];
+
+ /* If pmd threads have already been created for cpu socket
+ * 'socket_id', creates nothing. Otherwise, creates the pmd
+ * threads for the socket. */
+ if (!s->n_pmd_threads) {
+ int n_threads = netdev_dpdk_get_n_devs_on_socket(socket_id);
+ int i;
- /* Start new threads. */
- dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
- dp->n_pmd_threads = n;
+ /* Starts one pmd thread per dpdk device on the cpu socket. */
+ s->pmd_threads = xmalloc(n_threads * sizeof *s->pmd_threads);
+ s->n_pmd_threads = n_threads;
- for (i = 0; i < n; i++) {
- struct pmd_thread *f = &dp->pmd_threads[i];
+ for (i = 0; i < s->n_pmd_threads; i++) {
+ struct pmd_thread *f = &s->pmd_threads[i];
- f->dp = dp;
- f->id = i;
- atomic_store(&f->change_seq, 1);
+ f->core_id = ovs_numa_get_unpinned_core_on_socket(socket_id);
+ f->id = i;
+ f->socket = s;
+ atomic_store(&f->change_seq, 1);
- /* Each thread will distribute all devices rx-queues among
- * themselves. */
- f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
+ /* Each thread will distribute all devices rx-queues among
+ * themselves. */
+ f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
+ }
}
+
+ dp_netdev_reload_pmd_threads(s);
}
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index b0254e7..5a0ca76 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -491,8 +491,8 @@ netdev_dpdk_construct(struct netdev *netdev_)
netdev->mtu = ETHER_MTU;
netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
- /* TODO: need to discover device node at run time. */
- netdev->socket_id = SOCKET0;
+ netdev->socket_id = rte_eth_dev_socket_id(port_no);
+ CPU_SOCKET_ID_ASSERT(netdev->socket_id);
netdev->port_id = port_no;
netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu);
@@ -1308,6 +1308,29 @@ netdev_dpdk_get_socket_id(const struct netdev *netdev_)
}
}
+/* Returns the number of dpdk interfaces on cpu socket 'socket_id'. */
+int
+netdev_dpdk_get_n_devs_on_socket(int socket_id)
+{
+ CPU_SOCKET_ID_ASSERT(socket_id);
+
+ return dpdk_get_n_devs(socket_id);
+}
+
+/* For all dpdk ifaces not on the cpu socket 'socket_id', flushes the tx queue
+ * 'qid' for this thread. */
+void
+netdev_dpdk_flush_non_local(int socket_id, int qid)
+{
+ struct netdev_dpdk *dev;
+
+ LIST_FOR_EACH (dev, list_node, &dpdk_list) {
+ if (dev->socket_id != socket_id) {
+ dpdk_queue_flush(dev, qid);
+ }
+ }
+}
+
int
pmd_thread_setaffinity_cpu(int cpu)
{
diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
index da507ce..fc6c217 100644
--- a/lib/netdev-dpdk.h
+++ b/lib/netdev-dpdk.h
@@ -25,6 +25,8 @@ struct netdev;
int dpdk_init(int argc, char **argv);
void netdev_dpdk_register(void);
int netdev_dpdk_get_socket_id(const struct netdev *);
+int netdev_dpdk_get_n_devs_on_socket(int socket_id);
+void netdev_dpdk_flush_non_local(int socket_id, int qid);
void free_dpdk_buf(struct dpif_packet *);
int pmd_thread_setaffinity_cpu(int cpu);
@@ -50,6 +52,18 @@ netdev_dpdk_get_socket_id(const struct netdev *netdev_ OVS_UNUSED)
return -1;
}
+static inline int
+netdev_dpdk_get_n_devs_on_socket(int socket_id OVS_UNUSED)
+{
+ return -1;
+}
+
+static inline void
+netdev_dpdk_flush_non_local(int socket_id OVS_UNUSED, int qid OVS_UNUSED)
+{
+ /* Nothing */
+}
+
static inline void
free_dpdk_buf(struct dpif_packet *buf OVS_UNUSED)
{
--
1.7.9.5
More information about the dev
mailing list