[ovs-dev] [PATCH] netdev-dummy: Add support for active stream
Andy Zhou
azhou at nicira.com
Thu Dec 19 23:22:21 UTC 2013
Thus far netdev-dummy has supported only passive connections: it
listens for incoming connection requests. This patch allows an
active stream to be configured as well.
This patch effectively allows a netdev-dummy port to make a point-to-point
connection to another port without being a patch port. This feature will
be useful in test cases for future commits.
Signed-off-by: Andy Zhou <azhou at nicira.com>
---
lib/netdev-dummy.c | 338 ++++++++++++++++++++++++++++++++++---------------
tests/ofproto-dpif.at | 58 +++++++--
2 files changed, 282 insertions(+), 114 deletions(-)
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index fd30454..856af1c 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -69,10 +69,15 @@ struct netdev_dummy {
unsigned int change_seq OVS_GUARDED;
int ifindex OVS_GUARDED;
+ /* Passive connections states. */
struct pstream *pstream OVS_GUARDED;
struct dummy_stream *streams OVS_GUARDED;
size_t n_streams OVS_GUARDED;
+ /* Active connection states. */
+ struct dummy_stream astream OVS_GUARDED;
+ bool astream_connected OVS_GUARDED;
+
FILE *tx_pcap, *rx_pcap OVS_GUARDED;
struct list rxes OVS_GUARDED; /* List of child "netdev_rx_dummy"s. */
@@ -118,105 +123,193 @@ netdev_rx_dummy_cast(const struct netdev_rx *rx)
}
static void
-netdev_dummy_run(void)
+dummy_stream_init(struct dummy_stream *s, struct stream *stream)
{
- struct netdev_dummy *dev;
+ int rxbuf_size = stream ? 2048 : 0;
+ s->stream = stream;
+ ofpbuf_init(&s->rxbuf, rxbuf_size);
+ list_init(&s->txq);
+}
- ovs_mutex_lock(&dummy_list_mutex);
- LIST_FOR_EACH (dev, list_node, &dummy_list) {
- size_t i;
+static void
+dummy_stream_wait(struct dummy_stream *s)
+{
+ stream_run_wait(s->stream);
+ if (!list_is_empty(&s->txq)) {
+ stream_send_wait(s->stream);
+ }
+ stream_recv_wait(s->stream);
+}
- ovs_mutex_lock(&dev->mutex);
+static void
+dummy_stream_send(struct dummy_stream *s, const void *buffer, size_t size)
+{
+ if (list_size(&s->txq) < NETDEV_DUMMY_MAX_QUEUE) {
+ struct ofpbuf *b;
- if (dev->pstream) {
- struct stream *new_stream;
- int error;
+ b = ofpbuf_clone_data_with_headroom(buffer, size, 2);
+ put_unaligned_be16(ofpbuf_push_uninit(b, 2), htons(size));
+ list_push_back(&s->txq, &b->list_node);
+ }
+}
- error = pstream_accept(dev->pstream, &new_stream);
- if (!error) {
- struct dummy_stream *s;
-
- dev->streams = xrealloc(dev->streams,
- ((dev->n_streams + 1)
- * sizeof *dev->streams));
- s = &dev->streams[dev->n_streams++];
- s->stream = new_stream;
- ofpbuf_init(&s->rxbuf, 2048);
- list_init(&s->txq);
- } else if (error != EAGAIN) {
- VLOG_WARN("%s: accept failed (%s)",
- pstream_get_name(dev->pstream), ovs_strerror(error));
- pstream_close(dev->pstream);
- dev->pstream = NULL;
+static int
+dummy_stream_run__(struct netdev_dummy *dev, struct dummy_stream* s)
+{
+ int error = 0;
+ size_t n;
+
+ stream_run(s->stream);
+
+ if (!list_is_empty(&s->txq)) {
+ struct ofpbuf *txbuf;
+ int retval;
+
+ txbuf = ofpbuf_from_list(list_front(&s->txq));
+ retval = stream_send(s->stream, txbuf->data, txbuf->size);
+ if (retval > 0) {
+ ofpbuf_pull(txbuf, retval);
+ if (!txbuf->size) {
+ list_remove(&txbuf->list_node);
+ ofpbuf_delete(txbuf);
}
+ } else if (retval != -EAGAIN) {
+ error = -retval;
}
+ }
- for (i = 0; i < dev->n_streams; i++) {
- struct dummy_stream *s = &dev->streams[i];
- int error = 0;
- size_t n;
-
- stream_run(s->stream);
-
- if (!list_is_empty(&s->txq)) {
- struct ofpbuf *txbuf;
- int retval;
-
- txbuf = ofpbuf_from_list(list_front(&s->txq));
- retval = stream_send(s->stream, txbuf->data, txbuf->size);
- if (retval > 0) {
- ofpbuf_pull(txbuf, retval);
- if (!txbuf->size) {
- list_remove(&txbuf->list_node);
- ofpbuf_delete(txbuf);
- }
- } else if (retval != -EAGAIN) {
- error = -retval;
- }
- }
-
- if (!error) {
- if (s->rxbuf.size < 2) {
- n = 2 - s->rxbuf.size;
- } else {
- uint16_t frame_len;
-
- frame_len = ntohs(get_unaligned_be16(s->rxbuf.data));
- if (frame_len < ETH_HEADER_LEN) {
- error = EPROTO;
- n = 0;
- } else {
- n = (2 + frame_len) - s->rxbuf.size;
- }
- }
+ if (!error) {
+ if (s->rxbuf.size < 2) {
+ n = 2 - s->rxbuf.size;
+ } else {
+ uint16_t frame_len;
+
+ frame_len = ntohs(get_unaligned_be16(s->rxbuf.data));
+ if (frame_len < ETH_HEADER_LEN) {
+ error = EPROTO;
+ n = 0;
+ } else {
+ n = (2 + frame_len) - s->rxbuf.size;
}
- if (!error) {
- int retval;
-
- ofpbuf_prealloc_tailroom(&s->rxbuf, n);
- retval = stream_recv(s->stream, ofpbuf_tail(&s->rxbuf), n);
- if (retval > 0) {
- s->rxbuf.size += retval;
- if (retval == n && s->rxbuf.size > 2) {
- ofpbuf_pull(&s->rxbuf, 2);
- netdev_dummy_queue_packet(dev,
- ofpbuf_clone(&s->rxbuf));
- ofpbuf_clear(&s->rxbuf);
- }
- } else if (retval != -EAGAIN) {
- error = (retval < 0 ? -retval
- : s->rxbuf.size ? EPROTO
- : EOF);
- }
+ }
+ }
+ if (!error) {
+ int retval;
+
+ ofpbuf_prealloc_tailroom(&s->rxbuf, n);
+ retval = stream_recv(s->stream, ofpbuf_tail(&s->rxbuf), n);
+ if (retval > 0) {
+ s->rxbuf.size += retval;
+ if (retval == n && s->rxbuf.size > 2) {
+ ofpbuf_pull(&s->rxbuf, 2);
+ netdev_dummy_queue_packet(dev,
+ ofpbuf_clone(&s->rxbuf));
+ ofpbuf_clear(&s->rxbuf);
}
+ } else if (retval != -EAGAIN) {
+ error = (retval < 0 ? -retval
+ : s->rxbuf.size ? EPROTO
+ : EOF);
+ }
+ }
- if (error) {
+ return error;
+}
+
+static void
+netdev_dummy_passive_stream_run__(struct netdev_dummy *dev)
+ OVS_REQUIRES(dev->mutex)
+{
+ struct stream *new_stream;
+ int error;
+ size_t i;
+
+ error = pstream_accept(dev->pstream, &new_stream);
+ if (!error) {
+ struct dummy_stream *s;
+
+ dev->streams = xrealloc(dev->streams,
+ ((dev->n_streams + 1)
+ * sizeof *dev->streams));
+ s = &dev->streams[dev->n_streams++];
+ dummy_stream_init(s, new_stream);
+ } else if (error != EAGAIN) {
+ VLOG_WARN("%s: accept failed (%s)",
+ pstream_get_name(dev->pstream), ovs_strerror(error));
+ pstream_close(dev->pstream);
+ dev->pstream = NULL;
+ }
+
+ for (i = 0; i < dev->n_streams; i++) {
+ struct dummy_stream *s = &dev->streams[i];
+
+ error = dummy_stream_run__(dev, s);
+ if (error) {
+ VLOG_DBG("%s: closing connection (%s)",
+ stream_get_name(s->stream),
+ ovs_retval_to_string(error));
+ dummy_stream_close(&dev->streams[i]);
+ dev->streams[i] = dev->streams[--dev->n_streams];
+ }
+ }
+}
+
+static void
+netdev_dummy_stream_run__(struct netdev_dummy *dev)
+ OVS_REQUIRES(dev->mutex)
+{
+
+ if (!dev->astream_connected) {
+ int err=stream_connect(dev->astream.stream);
+
+ switch (err) {
+ case 0: /* Connected. */
+ dev->astream_connected = true;
+ VLOG_DBG("%s: connected",
+ stream_get_name(dev->astream.stream));
+ break;
+
+ case EAGAIN:
+ return;
+
+ default:
VLOG_DBG("%s: closing connection (%s)",
- stream_get_name(s->stream),
- ovs_retval_to_string(error));
- dummy_stream_close(&dev->streams[i]);
- dev->streams[i] = dev->streams[--dev->n_streams];
- }
+ stream_get_name(dev->astream.stream),
+ ovs_strerror(err));
+
+ stream_close(dev->astream.stream);
+ dev->astream.stream = NULL;
+ return;
+ }
+ }
+
+ if (dev->astream_connected) {
+ int error;
+ error = dummy_stream_run__(dev, &dev->astream);
+ if (error) {
+ VLOG_DBG("%s: closing active connection (%s)",
+ stream_get_name(dev->astream.stream),
+ ovs_retval_to_string(error));
+ dummy_stream_close(&dev->astream);
+ dev->astream_connected = false;
+ }
+ }
+}
+
+static void
+netdev_dummy_run(void)
+{
+ struct netdev_dummy *dev;
+
+ ovs_mutex_lock(&dummy_list_mutex);
+ LIST_FOR_EACH (dev, list_node, &dummy_list) {
+
+ ovs_mutex_lock(&dev->mutex);
+
+ if (dev->pstream) {
+ netdev_dummy_passive_stream_run__(dev);
+ } else if (dev->astream.stream) {
+ netdev_dummy_stream_run__(dev);
}
ovs_mutex_unlock(&dev->mutex);
@@ -247,12 +340,10 @@ netdev_dummy_wait(void)
}
for (i = 0; i < dev->n_streams; i++) {
struct dummy_stream *s = &dev->streams[i];
-
- stream_run_wait(s->stream);
- if (!list_is_empty(&s->txq)) {
- stream_send_wait(s->stream);
- }
- stream_recv_wait(s->stream);
+ dummy_stream_wait(s);
+ }
+ if (dev->astream.stream) {
+ dummy_stream_wait(&dev->astream);
}
ovs_mutex_unlock(&dev->mutex);
}
@@ -292,6 +383,9 @@ netdev_dummy_construct(struct netdev *netdev_)
netdev->streams = NULL;
netdev->n_streams = 0;
+ dummy_stream_init(&netdev->astream, NULL);
+ netdev->astream_connected = false;
+
list_init(&netdev->rxes);
ovs_mutex_unlock(&netdev->mutex);
@@ -313,11 +407,15 @@ netdev_dummy_destruct(struct netdev *netdev_)
ovs_mutex_unlock(&dummy_list_mutex);
ovs_mutex_lock(&netdev->mutex);
+
pstream_close(netdev->pstream);
for (i = 0; i < netdev->n_streams; i++) {
dummy_stream_close(&netdev->streams[i]);
}
free(netdev->streams);
+
+ dummy_stream_close(&netdev->astream);
+
ovs_mutex_unlock(&netdev->mutex);
ovs_mutex_destroy(&netdev->mutex);
}
@@ -345,6 +443,10 @@ netdev_dummy_get_config(const struct netdev *netdev_, struct smap *args)
smap_add(args, "pstream", pstream_get_name(netdev->pstream));
}
+ if (netdev->astream.stream) {
+ smap_add(args, "stream", stream_get_name(netdev->astream.stream));
+ }
+
ovs_mutex_unlock(&netdev->mutex);
return 0;
}
@@ -353,7 +455,7 @@ static int
netdev_dummy_set_config(struct netdev *netdev_, const struct smap *args)
{
struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
- const char *pstream;
+ const char *pstream, *stream;
const char *pcap;
ovs_mutex_lock(&netdev->mutex);
@@ -377,6 +479,39 @@ netdev_dummy_set_config(struct netdev *netdev_, const struct smap *args)
}
}
+ stream = smap_get(args, "stream");
+ if(pstream
+ || !stream
+ || !netdev->astream.stream
+ || strcmp(stream_get_name(netdev->astream.stream), stream)) {
+ dummy_stream_close(&netdev->astream);
+ netdev->astream_connected = false;
+
+ if (stream && !pstream) {
+ int error;
+ struct stream *active_stream;
+
+ error = stream_open(stream, &active_stream, DSCP_DEFAULT);
+ dummy_stream_init(&netdev->astream, active_stream);
+
+ switch (error) {
+ case 0:
+ netdev->astream_connected = true;
+ VLOG_INFO("%s: connected", stream_get_name(active_stream));
+ break;
+
+ case EAGAIN:
+ break;
+
+ default:
+ VLOG_WARN("%s: open failed (%s)", stream,
+ ovs_strerror(error));
+ stream_close(active_stream);
+ break;
+ }
+ }
+ }
+
if (netdev->rx_pcap) {
fclose(netdev->rx_pcap);
}
@@ -546,17 +681,16 @@ netdev_dummy_send(struct netdev *netdev, const void *buffer, size_t size)
fflush(dev->tx_pcap);
}
- for (i = 0; i < dev->n_streams; i++) {
- struct dummy_stream *s = &dev->streams[i];
-
- if (list_size(&s->txq) < NETDEV_DUMMY_MAX_QUEUE) {
- struct ofpbuf *b;
+ if (dev->pstream) {
+ for (i = 0; i < dev->n_streams; i++) {
+ struct dummy_stream *s = &dev->streams[i];
- b = ofpbuf_clone_data_with_headroom(buffer, size, 2);
- put_unaligned_be16(ofpbuf_push_uninit(b, 2), htons(size));
- list_push_back(&s->txq, &b->list_node);
+ dummy_stream_send(s, buffer, size);
}
+ } else if (dev->astream_connected) {
+ dummy_stream_send(&dev->astream, buffer, size);
}
+
ovs_mutex_unlock(&dev->mutex);
return 0;
diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at
index 4d8d460..7cf10ab 100644
--- a/tests/ofproto-dpif.at
+++ b/tests/ofproto-dpif.at
@@ -1,5 +1,51 @@
AT_BANNER([ofproto-dpif])
+# Strips out uninteresting parts of flow output, as well as parts
+# that vary from one run to another (e.g., timing and bond actions).
+m4_define([STRIP_USED], [[sed '
+ s/used:[0-9]*\.[0-9]*/used:0.0/
+' | sort]])
+m4_define([STRIP_XOUT], [[sed '
+ s/used:[0-9]*\.[0-9]*/used:0.0/
+ s/actions:.*/actions: <del>/
+ s/packets:[0-9]*/packets:0/
+ s/bytes:[0-9]*/bytes:0/
+' | sort]])
+
+AT_SETUP([ofproto-dpif - dummy interface])
+# Create br0 with interfaces p1 and p7
+# and br1 with interfaces p2 and p8
+# with p1 and p2 connected via unix domain socket
+OVS_VSWITCHD_START(
+ [add-port br0 p1 -- set interface p1 type=dummy options:pstream=punix:$OVS_RUNDIR/p0.sock ofport_request=1 -- \
+ add-port br0 p7 -- set interface p7 ofport_request=7 type=dummy -- \
+ add-br br1 -- \
+ set bridge br1 other-config:hwaddr=aa:66:aa:66:00:00 -- \
+ set bridge br1 datapath-type=dummy other-config:datapath-id=1234 \
+ fail-mode=secure -- \
+ add-port br1 p2 -- set interface p2 type=dummy options:stream=unix:$OVS_RUNDIR/p0.sock ofport_request=2 -- \
+ add-port br1 p8 -- set interface p8 ofport_request=8 type=dummy --])
+
+AT_CHECK([ovs-ofctl add-flow br0 action=normal])
+AT_CHECK([ovs-ofctl add-flow br1 action=normal])
+ovs-appctl time/stop
+ovs-appctl time/warp 5000
+AT_CHECK([ovs-appctl netdev-dummy/receive p7 'in_port(7),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)'])
+AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)'])
+ovs-appctl time/warp 100
+
+AT_CHECK([ovs-appctl dpif/dump-flows br0 | STRIP_XOUT], [0], [dnl
+skb_priority(0),in_port(1),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3/0.0.0.0,dst=10.0.0.4/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff), packets:0, bytes:0, used:0.0s, actions: <del>
+skb_priority(0),in_port(7),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2/0.0.0.0,dst=10.0.0.1/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff), packets:0, bytes:0, used:0.0s, actions: <del>
+])
+
+AT_CHECK([ovs-appctl dpif/dump-flows br1 | STRIP_XOUT], [0], [dnl
+skb_priority(0),in_port(2),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2/0.0.0.0,dst=10.0.0.1/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff), packets:0, bytes:0, used:0.0s, actions: <del>
+skb_priority(0),in_port(8),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3/0.0.0.0,dst=10.0.0.4/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff), packets:0, bytes:0, used:0.0s, actions: <del>
+])
+OVS_VSWITCHD_STOP
+AT_CLEANUP
+
AT_SETUP([ofproto-dpif - resubmit])
OVS_VSWITCHD_START
ADD_OF_PORTS([br0], [1], [10], [11], [12], [13], [14], [15],
@@ -2527,18 +2573,6 @@ AT_CLEANUP
dnl ----------------------------------------------------------------------
AT_BANNER([ofproto-dpif -- megaflows])
-# Strips out uninteresting parts of megaflow output, as well as parts
-# that vary from one run to another (e.g., timing and bond actions).
-m4_define([STRIP_USED], [[sed '
- s/used:[0-9]*\.[0-9]*/used:0.0/
-' | sort]])
-m4_define([STRIP_XOUT], [[sed '
- s/used:[0-9]*\.[0-9]*/used:0.0/
- s/actions:.*/actions: <del>/
- s/packets:[0-9]*/packets:0/
- s/bytes:[0-9]*/bytes:0/
-' | sort]])
-
AT_SETUP([ofproto-dpif megaflow - port classification])
OVS_VSWITCHD_START
ADD_OF_PORTS([br0], [1], [2])
--
1.7.9.5
More information about the dev
mailing list