[ovs-dev] [PATCH] netdev-dummy: Add support for active stream

Andy Zhou azhou at nicira.com
Thu Dec 19 23:22:21 UTC 2013


The netdev-dummy thus far only support passive connection. It will
listen for incoming connections requests. This patch allows
active stream to be configured as well.

This patch effectively allows a netdev-dummy port make point-to-point
connection to another, without being a patch port. This feature will
be useful in test cases for future commits.

Signed-off-by: Andy Zhou <azhou at nicira.com>
---
 lib/netdev-dummy.c    |  338 ++++++++++++++++++++++++++++++++++---------------
 tests/ofproto-dpif.at |   58 +++++++--
 2 files changed, 282 insertions(+), 114 deletions(-)

diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index fd30454..856af1c 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -69,10 +69,15 @@ struct netdev_dummy {
     unsigned int change_seq OVS_GUARDED;
     int ifindex OVS_GUARDED;
 
+    /* Passive connections states. */
     struct pstream *pstream OVS_GUARDED;
     struct dummy_stream *streams OVS_GUARDED;
     size_t n_streams OVS_GUARDED;
 
+    /* Active connection states. */
+    struct dummy_stream astream OVS_GUARDED;
+    bool astream_connected OVS_GUARDED;
+
     FILE *tx_pcap, *rx_pcap OVS_GUARDED;
 
     struct list rxes OVS_GUARDED; /* List of child "netdev_rx_dummy"s. */
@@ -118,105 +123,193 @@ netdev_rx_dummy_cast(const struct netdev_rx *rx)
 }
 
 static void
-netdev_dummy_run(void)
+dummy_stream_init(struct dummy_stream *s, struct stream *stream)
 {
-    struct netdev_dummy *dev;
+    int rxbuf_size = stream ? 2048 : 0;
+    s->stream = stream;
+    ofpbuf_init(&s->rxbuf, rxbuf_size);
+    list_init(&s->txq);
+}
 
-    ovs_mutex_lock(&dummy_list_mutex);
-    LIST_FOR_EACH (dev, list_node, &dummy_list) {
-        size_t i;
+static void
+dummy_stream_wait(struct dummy_stream *s)
+{
+    stream_run_wait(s->stream);
+    if (!list_is_empty(&s->txq)) {
+        stream_send_wait(s->stream);
+    }
+    stream_recv_wait(s->stream);
+}
 
-        ovs_mutex_lock(&dev->mutex);
+static void
+dummy_stream_send(struct dummy_stream *s, const void *buffer, size_t size)
+{
+    if (list_size(&s->txq) < NETDEV_DUMMY_MAX_QUEUE) {
+        struct ofpbuf *b;
 
-        if (dev->pstream) {
-            struct stream *new_stream;
-            int error;
+        b = ofpbuf_clone_data_with_headroom(buffer, size, 2);
+        put_unaligned_be16(ofpbuf_push_uninit(b, 2), htons(size));
+        list_push_back(&s->txq, &b->list_node);
+    }
+}
 
-            error = pstream_accept(dev->pstream, &new_stream);
-            if (!error) {
-                struct dummy_stream *s;
-
-                dev->streams = xrealloc(dev->streams,
-                                        ((dev->n_streams + 1)
-                                         * sizeof *dev->streams));
-                s = &dev->streams[dev->n_streams++];
-                s->stream = new_stream;
-                ofpbuf_init(&s->rxbuf, 2048);
-                list_init(&s->txq);
-            } else if (error != EAGAIN) {
-                VLOG_WARN("%s: accept failed (%s)",
-                          pstream_get_name(dev->pstream), ovs_strerror(error));
-                pstream_close(dev->pstream);
-                dev->pstream = NULL;
+static int
+dummy_stream_run__(struct netdev_dummy *dev, struct dummy_stream* s)
+{
+    int error = 0;
+    size_t n;
+
+    stream_run(s->stream);
+
+    if (!list_is_empty(&s->txq)) {
+        struct ofpbuf *txbuf;
+        int retval;
+
+        txbuf = ofpbuf_from_list(list_front(&s->txq));
+        retval = stream_send(s->stream, txbuf->data, txbuf->size);
+        if (retval > 0) {
+            ofpbuf_pull(txbuf, retval);
+            if (!txbuf->size) {
+                list_remove(&txbuf->list_node);
+                ofpbuf_delete(txbuf);
             }
+        } else if (retval != -EAGAIN) {
+            error = -retval;
         }
+    }
 
-        for (i = 0; i < dev->n_streams; i++) {
-            struct dummy_stream *s = &dev->streams[i];
-            int error = 0;
-            size_t n;
-
-            stream_run(s->stream);
-
-            if (!list_is_empty(&s->txq)) {
-                struct ofpbuf *txbuf;
-                int retval;
-
-                txbuf = ofpbuf_from_list(list_front(&s->txq));
-                retval = stream_send(s->stream, txbuf->data, txbuf->size);
-                if (retval > 0) {
-                    ofpbuf_pull(txbuf, retval);
-                    if (!txbuf->size) {
-                        list_remove(&txbuf->list_node);
-                        ofpbuf_delete(txbuf);
-                    }
-                } else if (retval != -EAGAIN) {
-                    error = -retval;
-                }
-            }
-
-            if (!error) {
-                if (s->rxbuf.size < 2) {
-                    n = 2 - s->rxbuf.size;
-                } else {
-                    uint16_t frame_len;
-
-                    frame_len = ntohs(get_unaligned_be16(s->rxbuf.data));
-                    if (frame_len < ETH_HEADER_LEN) {
-                        error = EPROTO;
-                        n = 0;
-                    } else {
-                        n = (2 + frame_len) - s->rxbuf.size;
-                    }
-                }
+    if (!error) {
+        if (s->rxbuf.size < 2) {
+            n = 2 - s->rxbuf.size;
+        } else {
+            uint16_t frame_len;
+
+            frame_len = ntohs(get_unaligned_be16(s->rxbuf.data));
+            if (frame_len < ETH_HEADER_LEN) {
+                error = EPROTO;
+                n = 0;
+            } else {
+                n = (2 + frame_len) - s->rxbuf.size;
             }
-            if (!error) {
-                int retval;
-
-                ofpbuf_prealloc_tailroom(&s->rxbuf, n);
-                retval = stream_recv(s->stream, ofpbuf_tail(&s->rxbuf), n);
-                if (retval > 0) {
-                    s->rxbuf.size += retval;
-                    if (retval == n && s->rxbuf.size > 2) {
-                        ofpbuf_pull(&s->rxbuf, 2);
-                        netdev_dummy_queue_packet(dev,
-                                                  ofpbuf_clone(&s->rxbuf));
-                        ofpbuf_clear(&s->rxbuf);
-                    }
-                } else if (retval != -EAGAIN) {
-                    error = (retval < 0 ? -retval
-                             : s->rxbuf.size ? EPROTO
-                             : EOF);
-                }
+        }
+    }
+    if (!error) {
+        int retval;
+
+        ofpbuf_prealloc_tailroom(&s->rxbuf, n);
+        retval = stream_recv(s->stream, ofpbuf_tail(&s->rxbuf), n);
+        if (retval > 0) {
+            s->rxbuf.size += retval;
+            if (retval == n && s->rxbuf.size > 2) {
+                ofpbuf_pull(&s->rxbuf, 2);
+                netdev_dummy_queue_packet(dev,
+                                          ofpbuf_clone(&s->rxbuf));
+                ofpbuf_clear(&s->rxbuf);
             }
+        } else if (retval != -EAGAIN) {
+            error = (retval < 0 ? -retval
+                     : s->rxbuf.size ? EPROTO
+                     : EOF);
+        }
+    }
 
-            if (error) {
+    return error;
+}
+
+static void
+netdev_dummy_passive_stream_run__(struct netdev_dummy *dev)
+    OVS_REQUIRES(dev->mutex)
+{
+    struct stream *new_stream;
+    int error;
+    size_t i;
+
+    error = pstream_accept(dev->pstream, &new_stream);
+    if (!error) {
+        struct dummy_stream *s;
+
+        dev->streams = xrealloc(dev->streams,
+                                ((dev->n_streams + 1)
+                                 * sizeof *dev->streams));
+        s = &dev->streams[dev->n_streams++];
+        dummy_stream_init(s, new_stream);
+    } else if (error != EAGAIN) {
+        VLOG_WARN("%s: accept failed (%s)",
+                  pstream_get_name(dev->pstream), ovs_strerror(error));
+        pstream_close(dev->pstream);
+        dev->pstream = NULL;
+    }
+
+    for (i = 0; i < dev->n_streams; i++) {
+        struct dummy_stream *s = &dev->streams[i];
+
+        error = dummy_stream_run__(dev, s);
+        if (error) {
+            VLOG_DBG("%s: closing connection (%s)",
+                     stream_get_name(s->stream),
+                     ovs_retval_to_string(error));
+            dummy_stream_close(&dev->streams[i]);
+            dev->streams[i] = dev->streams[--dev->n_streams];
+        }
+    }
+}
+
+static void
+netdev_dummy_stream_run__(struct netdev_dummy *dev)
+    OVS_REQUIRES(dev->mutex)
+{
+
+    if (!dev->astream_connected) {
+        int err=stream_connect(dev->astream.stream);
+
+        switch (err) {
+            case 0: /* Connected. */
+                dev->astream_connected = true;
+                VLOG_DBG("%s: connected",
+                          stream_get_name(dev->astream.stream));
+                break;
+
+            case EAGAIN:
+                return;
+
+            default:
                 VLOG_DBG("%s: closing connection (%s)",
-                         stream_get_name(s->stream),
-                         ovs_retval_to_string(error));
-                dummy_stream_close(&dev->streams[i]);
-                dev->streams[i] = dev->streams[--dev->n_streams];
-            }
+                         stream_get_name(dev->astream.stream),
+                         ovs_strerror(err));
+
+                stream_close(dev->astream.stream);
+                dev->astream.stream = NULL;
+                return;
+        }
+    }
+
+    if (dev->astream_connected) {
+        int error;
+        error = dummy_stream_run__(dev, &dev->astream);
+        if (error) {
+            VLOG_DBG("%s: closing active connection (%s)",
+                     stream_get_name(dev->astream.stream),
+                     ovs_retval_to_string(error));
+            dummy_stream_close(&dev->astream);
+            dev->astream_connected = false;
+        }
+    }
+}
+
+static void
+netdev_dummy_run(void)
+{
+    struct netdev_dummy *dev;
+
+    ovs_mutex_lock(&dummy_list_mutex);
+    LIST_FOR_EACH (dev, list_node, &dummy_list) {
+
+        ovs_mutex_lock(&dev->mutex);
+
+        if (dev->pstream) {
+            netdev_dummy_passive_stream_run__(dev);
+        } else if (dev->astream.stream) {
+            netdev_dummy_stream_run__(dev);
         }
 
         ovs_mutex_unlock(&dev->mutex);
@@ -247,12 +340,10 @@ netdev_dummy_wait(void)
         }
         for (i = 0; i < dev->n_streams; i++) {
             struct dummy_stream *s = &dev->streams[i];
-
-            stream_run_wait(s->stream);
-            if (!list_is_empty(&s->txq)) {
-                stream_send_wait(s->stream);
-            }
-            stream_recv_wait(s->stream);
+            dummy_stream_wait(s);
+        }
+        if (dev->astream.stream) {
+            dummy_stream_wait(&dev->astream);
         }
         ovs_mutex_unlock(&dev->mutex);
     }
@@ -292,6 +383,9 @@ netdev_dummy_construct(struct netdev *netdev_)
     netdev->streams = NULL;
     netdev->n_streams = 0;
 
+    dummy_stream_init(&netdev->astream, NULL);
+    netdev->astream_connected = false;
+
     list_init(&netdev->rxes);
     ovs_mutex_unlock(&netdev->mutex);
 
@@ -313,11 +407,15 @@ netdev_dummy_destruct(struct netdev *netdev_)
     ovs_mutex_unlock(&dummy_list_mutex);
 
     ovs_mutex_lock(&netdev->mutex);
+
     pstream_close(netdev->pstream);
     for (i = 0; i < netdev->n_streams; i++) {
         dummy_stream_close(&netdev->streams[i]);
     }
     free(netdev->streams);
+
+    dummy_stream_close(&netdev->astream);
+
     ovs_mutex_unlock(&netdev->mutex);
     ovs_mutex_destroy(&netdev->mutex);
 }
@@ -345,6 +443,10 @@ netdev_dummy_get_config(const struct netdev *netdev_, struct smap *args)
         smap_add(args, "pstream", pstream_get_name(netdev->pstream));
     }
 
+    if (netdev->astream.stream) {
+        smap_add(args, "stream", stream_get_name(netdev->astream.stream));
+    }
+
     ovs_mutex_unlock(&netdev->mutex);
     return 0;
 }
@@ -353,7 +455,7 @@ static int
 netdev_dummy_set_config(struct netdev *netdev_, const struct smap *args)
 {
     struct netdev_dummy *netdev = netdev_dummy_cast(netdev_);
-    const char *pstream;
+    const char *pstream, *stream;
     const char *pcap;
 
     ovs_mutex_lock(&netdev->mutex);
@@ -377,6 +479,39 @@ netdev_dummy_set_config(struct netdev *netdev_, const struct smap *args)
         }
     }
 
+    stream = smap_get(args, "stream");
+    if(pstream
+       || !stream
+       || !netdev->astream.stream
+       || strcmp(stream_get_name(netdev->astream.stream), stream)) {
+        dummy_stream_close(&netdev->astream);
+        netdev->astream_connected = false;
+
+        if (stream && !pstream) {
+            int error;
+            struct stream *active_stream;
+
+            error = stream_open(stream, &active_stream, DSCP_DEFAULT);
+            dummy_stream_init(&netdev->astream, active_stream);
+
+            switch (error) {
+                case 0:
+                    netdev->astream_connected = true;
+                    VLOG_INFO("%s: connected", stream_get_name(active_stream));
+                    break;
+
+                case EAGAIN:
+                    break;
+
+                default:
+                    VLOG_WARN("%s: open failed (%s)", stream,
+                              ovs_strerror(error));
+                    stream_close(active_stream);
+                    break;
+            }
+        }
+    }
+
     if (netdev->rx_pcap) {
         fclose(netdev->rx_pcap);
     }
@@ -546,17 +681,16 @@ netdev_dummy_send(struct netdev *netdev, const void *buffer, size_t size)
         fflush(dev->tx_pcap);
     }
 
-    for (i = 0; i < dev->n_streams; i++) {
-        struct dummy_stream *s = &dev->streams[i];
-
-        if (list_size(&s->txq) < NETDEV_DUMMY_MAX_QUEUE) {
-            struct ofpbuf *b;
+    if (dev->pstream) {
+        for (i = 0; i < dev->n_streams; i++) {
+            struct dummy_stream *s = &dev->streams[i];
 
-            b = ofpbuf_clone_data_with_headroom(buffer, size, 2);
-            put_unaligned_be16(ofpbuf_push_uninit(b, 2), htons(size));
-            list_push_back(&s->txq, &b->list_node);
+            dummy_stream_send(s, buffer, size);
         }
+    } else if (dev->astream_connected) {
+            dummy_stream_send(&dev->astream, buffer, size);
     }
+
     ovs_mutex_unlock(&dev->mutex);
 
     return 0;
diff --git a/tests/ofproto-dpif.at b/tests/ofproto-dpif.at
index 4d8d460..7cf10ab 100644
--- a/tests/ofproto-dpif.at
+++ b/tests/ofproto-dpif.at
@@ -1,5 +1,51 @@
 AT_BANNER([ofproto-dpif])
 
+# Strips out uninteresting parts of flow output, as well as parts
+# that vary from one run to another (e.g., timing and bond actions).
+m4_define([STRIP_USED], [[sed '
+    s/used:[0-9]*\.[0-9]*/used:0.0/
+' | sort]])
+m4_define([STRIP_XOUT], [[sed '
+    s/used:[0-9]*\.[0-9]*/used:0.0/
+    s/actions:.*/actions: <del>/
+    s/packets:[0-9]*/packets:0/
+    s/bytes:[0-9]*/bytes:0/
+' | sort]])
+
+AT_SETUP([ofproto-dpif - dummy interface])
+# Create br0 with interfaces p1 and p7
+#    and br1 with interfaces p2 and p8
+# with p1 and p2 connected via unix domain socket
+OVS_VSWITCHD_START(
+  [add-port br0 p1 -- set interface p1 type=dummy options:pstream=punix:$OVS_RUNDIR/p0.sock ofport_request=1 -- \
+   add-port br0 p7 -- set interface p7 ofport_request=7 type=dummy -- \
+   add-br br1 -- \
+   set bridge br1 other-config:hwaddr=aa:66:aa:66:00:00 -- \
+   set bridge br1 datapath-type=dummy other-config:datapath-id=1234 \
+                  fail-mode=secure -- \
+   add-port br1 p2 -- set interface p2 type=dummy options:stream=unix:$OVS_RUNDIR/p0.sock ofport_request=2 -- \
+   add-port br1 p8 -- set interface p8 ofport_request=8 type=dummy --])
+
+AT_CHECK([ovs-ofctl add-flow br0 action=normal])
+AT_CHECK([ovs-ofctl add-flow br1 action=normal])
+ovs-appctl time/stop
+ovs-appctl time/warp 5000
+AT_CHECK([ovs-appctl netdev-dummy/receive p7 'in_port(7),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2,dst=10.0.0.1,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)'])
+AT_CHECK([ovs-appctl netdev-dummy/receive p8 'in_port(8),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3,dst=10.0.0.4,proto=1,tos=0,ttl=64,frag=no),icmp(type=8,code=0)'])
+ovs-appctl time/warp 100
+
+AT_CHECK([ovs-appctl dpif/dump-flows br0 | STRIP_XOUT], [0], [dnl
+skb_priority(0),in_port(1),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3/0.0.0.0,dst=10.0.0.4/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff), packets:0, bytes:0, used:0.0s, actions: <del>
+skb_priority(0),in_port(7),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2/0.0.0.0,dst=10.0.0.1/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff), packets:0, bytes:0, used:0.0s, actions: <del>
+])
+
+AT_CHECK([ovs-appctl dpif/dump-flows br1 | STRIP_XOUT], [0], [dnl
+skb_priority(0),in_port(2),eth(src=50:54:00:00:00:09,dst=50:54:00:00:00:0a),eth_type(0x0800),ipv4(src=10.0.0.2/0.0.0.0,dst=10.0.0.1/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff), packets:0, bytes:0, used:0.0s, actions: <del>
+skb_priority(0),in_port(8),eth(src=50:54:00:00:00:0b,dst=50:54:00:00:00:0c),eth_type(0x0800),ipv4(src=10.0.0.3/0.0.0.0,dst=10.0.0.4/0.0.0.0,proto=1/0,tos=0/0,ttl=64/0,frag=no/0xff), packets:0, bytes:0, used:0.0s, actions: <del>
+])
+OVS_VSWITCHD_STOP
+AT_CLEANUP
+
 AT_SETUP([ofproto-dpif - resubmit])
 OVS_VSWITCHD_START
 ADD_OF_PORTS([br0], [1], [10], [11], [12], [13], [14], [15],
@@ -2527,18 +2573,6 @@ AT_CLEANUP
 dnl ----------------------------------------------------------------------
 AT_BANNER([ofproto-dpif -- megaflows])
 
-# Strips out uninteresting parts of megaflow output, as well as parts
-# that vary from one run to another (e.g., timing and bond actions).
-m4_define([STRIP_USED], [[sed '
-    s/used:[0-9]*\.[0-9]*/used:0.0/
-' | sort]])
-m4_define([STRIP_XOUT], [[sed '
-    s/used:[0-9]*\.[0-9]*/used:0.0/
-    s/actions:.*/actions: <del>/
-    s/packets:[0-9]*/packets:0/
-    s/bytes:[0-9]*/bytes:0/
-' | sort]])
-
 AT_SETUP([ofproto-dpif megaflow - port classification])
 OVS_VSWITCHD_START
 ADD_OF_PORTS([br0], [1], [2])
-- 
1.7.9.5




More information about the dev mailing list