[ovs-dev] [PATCH, v5, 1/1] utilities: a top like tool for ovs-dpctl dump-flows
Mark Hamilton
mhamilton at vmware.com
Fri Sep 13 18:00:19 UTC 2013
Gurucharan,
In regards to the argparse, how is it done today for the various scripts that use argparse. ovs-test for example simple does import argparse. So I'm unclear on what I need to do for this ovs-dpctl-top script beyond the user adding the compat folder to their PYTHONPATH.
When you say it looks like my usage of python-argparse is not compatible.
I tried:
ssh root at 10.24.122.93
export PYTHONPATH=/usr/share/openvswitch/python
python ovs-dpctl-top.in --accumulate
This works. Are you seeing a specific error when you run ovs-dpctl-top?
By "maintain order", you mean alphabetically?
-Mark
----- Original Message -----
From: "Gurucharan Shetty" <shettyg at nicira.com>
To: "Mark Hamilton" <mhamilton at nicira.com>
Cc: "dev" <dev at openvswitch.org>
Sent: Friday, September 13, 2013 10:19:16 AM
Subject: Re: [ovs-dev] [PATCH, v5, 1/1] utilities: a top like tool for ovs-dpctl dump-flows
On Fri, Sep 13, 2013 at 8:55 AM, Mark Hamilton <mhamilton at nicira.com> wrote:
> This python script summarizes ovs-dpctl dump-flows content by aggregating
> the number of packets, total bytes and occurrence of the following fields:
> - Openflow in_port
> - Ethernet type
> - Source and destination MAC addresses
> - IP protocol
> - Source and destination IPv4 addresses
> - Source and destination IPv6 addresses
> - UDP and TCP destination port
> - Tunnel source and destination addresses
>
> Testing included confirming both mega-flows and non-megaflows are
> properly parsed. Bit masks are applied in the case of mega-flows
> prior to aggregation. Test --script parameter which runs in
> non-interactive mode. Tested syntax against python 2.4.3, 2.6 and 2.7.
> Confirmed script passes pep8 and pylint run as:
>
> pylint --disable=I0011 --include-id=y --reports=n
>
> This tool has been added to these distribution:
> - add ovs-dpctl-top to debian distribution
> - add ovs-dpctl-top to rpm distribution.
> - add ovs-dpctl-top to XenServer RPM.
I cannot get it to work on Xenserver right now. We do include
python-argparse in the OVS compat folder but it looks like your usage
of it is not compatible with the one we have it there.
>
> Signed-off-by: Mark Hamilton <mhamilton at nicira.com>
>
> ---
> Version 5
> - Changed Openflow in_port to the correct term Datapath in_port
> - Changed decay timer thread to a simplier form which works better
> across the various supported python versions.
>
> Version 4
> - Improve parser to handle nested parentheses.
> - Change code to be python2.4 compliant.
> - Remove netaddr module requirement.
> - Instead use core socket module.
> - Fix decay to remove flows and propagate new stats.
>
> Version 3
> - Removed trailing whitespaces and white space changes that prevented
> version 2 patch from applying cleanly.
>
> Version 2
> - Single character commands have been added to control how content is
> displayed.
> - An accumulate mode has been added which continuously collects and
> aggregates dump-flow content.
> - Instead of running through all accumulated flows which takes a long time
> when the flow count gets in the 10,000's, changes are calculated and used to
> update statistics.
> - The total number of bytes and packet average are displayed.
> - Given that a significant amount of space is dedicated to statistics a
> conciderable amount of work went into managing the output.
>
> debian/control | 2 +-
> debian/openvswitch-switch.install | 1 +
> debian/openvswitch-switch.manpages | 1 +
> manpages.mk | 4 +
> rhel/openvswitch.spec.in | 2 +
> utilities/automake.mk | 6 +
> utilities/ovs-dpctl-top.8.in | 140 +++
> utilities/ovs-dpctl-top.in | 1691 ++++++++++++++++++++++++++++++++++++
> xenserver/openvswitch-xen.spec.in | 2 +
> 9 files changed, 1848 insertions(+), 1 deletion(-)
> create mode 100644 utilities/ovs-dpctl-top.8.in
> create mode 100755 utilities/ovs-dpctl-top.in
>
> diff --git a/debian/control b/debian/control
> index fe58b31..46b5630 100644
> --- a/debian/control
> +++ b/debian/control
> @@ -66,7 +66,7 @@ Description: Open vSwitch common components
> Package: openvswitch-switch
> Architecture: linux-any
> Suggests: openvswitch-datapath-module
> -Depends: ${shlibs:Depends}, ${misc:Depends}, ${python:Depends}, openvswitch-common (= ${binary:Version}), module-init-tools, procps, uuid-runtime, netbase
> +Depends: ${shlibs:Depends}, ${misc:Depends}, ${python:Depends}, openvswitch-common (= ${binary:Version}), module-init-tools, procps, uuid-runtime, netbase, python-argparse
> Description: Open vSwitch switch implementations
> Open vSwitch is a production quality, multilayer, software-based,
> Ethernet virtual switch. It is designed to enable massive network
> diff --git a/debian/openvswitch-switch.install b/debian/openvswitch-switch.install
> index 4d7a15b..ab111be 100644
> --- a/debian/openvswitch-switch.install
> +++ b/debian/openvswitch-switch.install
> @@ -1,6 +1,7 @@
> usr/bin/ovs-dpctl
> usr/bin/ovs-pcap
> usr/bin/ovs-tcpundump
> +usr/bin/ovs-dpctl-top
Can we maintain the order here.
> usr/bin/ovs-vlan-test
> usr/bin/ovs-vsctl
> usr/bin/ovsdb-tool
> diff --git a/debian/openvswitch-switch.manpages b/debian/openvswitch-switch.manpages
> index a0a331c..3a9f8b7 100644
> --- a/debian/openvswitch-switch.manpages
> +++ b/debian/openvswitch-switch.manpages
> @@ -3,6 +3,7 @@ _debian/utilities/ovs-dpctl.8
> _debian/utilities/ovs-pcap.1
> _debian/utilities/ovs-tcpundump.1
> _debian/utilities/ovs-vlan-test.8
> +_debian/utilities/ovs-dpctl-top.8
> _debian/utilities/ovs-vsctl.8
> _debian/vswitchd/ovs-vswitchd.8
> vswitchd/ovs-vswitchd.conf.db.5
> diff --git a/manpages.mk b/manpages.mk
> index 263f2ea..2a34f04 100644
> --- a/manpages.mk
> +++ b/manpages.mk
> @@ -116,6 +116,10 @@ lib/vconn-active.man:
> lib/vconn-passive.man:
> lib/vlog.man:
>
> +utilities/ovs-dpctl-top.8: \
> + utilities/ovs-dpctl-top.8.in
> +utilities/ovs-dpctl-top.8.in:
> +
> utilities/ovs-dpctl.8: \
> utilities/ovs-dpctl.8.in \
> lib/common.man \
> diff --git a/rhel/openvswitch.spec.in b/rhel/openvswitch.spec.in
> index 0fd5200..2c0570e 100644
> --- a/rhel/openvswitch.spec.in
> +++ b/rhel/openvswitch.spec.in
> @@ -119,6 +119,7 @@ exit 0
> /usr/bin/ovs-pcap
> /usr/bin/ovs-pki
> /usr/bin/ovs-tcpundump
> +/usr/bin/ovs-dpctl-top
> /usr/bin/ovs-vlan-test
> /usr/bin/ovs-vsctl
> /usr/bin/ovsdb-client
> @@ -137,6 +138,7 @@ exit 0
> /usr/share/man/man8/ovs-bugtool.8.gz
> /usr/share/man/man8/ovs-ctl.8.gz
> /usr/share/man/man8/ovs-dpctl.8.gz
> +/usr/share/man/man8/ovs-dpctl-top.8.gz
> /usr/share/man/man8/ovs-ofctl.8.gz
> /usr/share/man/man8/ovs-parse-backtrace.8.gz
> /usr/share/man/man8/ovs-pki.8.gz
> diff --git a/utilities/automake.mk b/utilities/automake.mk
> index 9f2bb63..6f88d97 100644
> --- a/utilities/automake.mk
> +++ b/utilities/automake.mk
> @@ -10,6 +10,7 @@ bin_SCRIPTS += \
> utilities/ovs-l3ping \
> utilities/ovs-parse-backtrace \
> utilities/ovs-pcap \
> + utilities/ovs-dpctl-top \
> utilities/ovs-tcpundump \
> utilities/ovs-test \
> utilities/ovs-vlan-test
> @@ -30,6 +31,7 @@ EXTRA_DIST += \
> utilities/ovs-pcap.in \
> utilities/ovs-pki.in \
> utilities/ovs-save \
> + utilities/ovs-dpctl-top.in \
> utilities/ovs-tcpundump.in \
> utilities/ovs-test.in \
> utilities/ovs-vlan-test.in
> @@ -44,6 +46,7 @@ MAN_ROOTS += \
> utilities/ovs-parse-backtrace.8 \
> utilities/ovs-pcap.1.in \
> utilities/ovs-pki.8.in \
> + utilities/ovs-dpctl-top.8.in \
> utilities/ovs-tcpundump.1.in \
> utilities/ovs-vlan-bug-workaround.8.in \
> utilities/ovs-test.8.in \
> @@ -66,6 +69,8 @@ DISTCLEANFILES += \
> utilities/ovs-pcap.1 \
> utilities/ovs-pki \
> utilities/ovs-pki.8 \
> + utilities/ovs-dpctl-top \
> + utilities/ovs-dpctl-top.8 \
> utilities/ovs-tcpundump \
> utilities/ovs-tcpundump.1 \
> utilities/ovs-test \
> @@ -85,6 +90,7 @@ man_MANS += \
> utilities/ovs-parse-backtrace.8 \
> utilities/ovs-pcap.1 \
> utilities/ovs-pki.8 \
> + utilities/ovs-dpctl-top.8 \
> utilities/ovs-tcpundump.1 \
> utilities/ovs-vlan-bug-workaround.8 \
> utilities/ovs-test.8 \
> diff --git a/utilities/ovs-dpctl-top.8.in b/utilities/ovs-dpctl-top.8.in
> new file mode 100644
> index 0000000..92facb2
> --- /dev/null
> +++ b/utilities/ovs-dpctl-top.8.in
> @@ -0,0 +1,140 @@
> +.de IQ
> +. br
> +. ns
> +. IP "\\$1"
> +..
> +.TH ovs\-dpctl\-top "8" "@VERSION@" "Open vSwitch" "Open vSwitch Manual"
> +.
> +.SH NAME
> +\fBovs\-dpctl\-top\fR \- Top like behavior for ovs\-dpctl dump\-flows
> +.
> +.SH SYNOPSIS
> +\fBovs\-dpctl\-top\fR [\-h] [\-v] [\-f FLOWFILES] [\-V] [\-s] [\-\-host HOST]
> +[\-a | \-\-accumulate] [\-\-accumulate\-decay ACCUMULATEDECAY] [\-d DELAY]
> +.
> +.SH DESCRIPTION
> +.PP
> +This program summarizes \fBovs\-dpctl\fR flow content by aggregating the number
> +of packets, total bytes and occurrence of the following fields:
> +.IP
> +\- Openflow in_port
> +.IP
> +\- Ethernet type
> +.IP
> +\- Source and destination MAC addresses
> +.IP
> +\- IP protocol
> +.IP
> +\- Source and destination IPv4 addresses
> +.IP
> +\- Source and destination IPv6 addresses
> +.IP
> +\- UDP and TCP destination port
> +.IP
> +\- Tunnel source and destination addresses
> +.
> +.SS "Output shows four values:"
> +.IP
> +\- FIELDS: the flow fields for example in_port(1).
> +.IP
> +\- COUNT: the number of lines in the dump\-flow output contain the flow field.
> +.IP
> +\- PACKETS: the total number of packets containing the flow field.
> +.IP
> +\- BYTES: the total number of bytes containing the flow field. If units are
> +not present then values are in bytes.
> +.IP
> +\- AVERAGE: the average packets size (BYTES/PACKET).
> +.PP
> +.SS "Top Behavior"
> +.PP
> +While in top mode, the default behavior, the following single character commands
> +are supported:
> +.IP
> +a \- toggles top in accumulate and live mode. Accumulate mode is described
> +below.
> +.IP
> +s \- toggles which column is used to sort content in decreasing order. A
> +DESC title is placed over the column.
> +.IP
> +_ \- a space indicating to collect dump\-flow content again
> +.IP
> +h \- halt output. Any character will restart sampling
> +.IP
> +f \- cycle through flow fields
> +.IP
> +q \- q for quit.
> +.PP
> +.SS "Accumulate Mode"
> +.PP
> +There are two supported modes: live and accumulate. The default is live.
> +The parameter \fB\-\-accumulate\fR or the 'a' character in top mode enables the
> +latter. In live mode, recent dump\-flow content is presented.
> +Where as accumulate mode keeps track of the prior historical
> +information until the flow is reset not when the flow is purged. Reset
> +flows are determined when the packet count for a flow has decreased from
> +its previous sample. There is one caveat, eventually the system will
> +run out of memory if, after the accumulate\-decay period any flows that
> +have not been refreshed are purged. The goal here is to free memory
> +of flows that are not active. Statistics are not decremented. Their purpose
> +is to reflect the overall history of the flow fields.
> +.PP
> +.SS "Debugging Errors"
> +.PP
> +Parsing errors are counted and displayed in the status line at the beginning
> +of the output. Use the \fB\-\-verbose\fR option with \fB\-\-script to see
> +what output was not parsed, like this:
> +.PP
> +$ ovs\-dpctl dump\-flows | ovs\-dpctl\-top \fB\-\-script\fR \fB\-\-verbose\fR
> +.PP
> +Error messages will identify content that failed to parse.
> +.PP
> +.SS "Access Remote Hosts"
> +.PP
> +The \fB\-\-host\fR must follow the format user at hostname. This script simply
> +calls \&'ssh user at Hostname' without checking for login credentials therefore
> +public keys should be installed on the system identified by hostname, such as:
> +.PP
> +$ ssh\-copy\-id user at hostname
> +.PP
> +Consult ssh\-copy\-id man pages for more details.
> +.PP
> +.SS "Expected usage"
> +.PP
> +$ ovs\-dpctl\-top
> +.PP
> +or to run as a script:
> +.PP
> +$ ovs\-dpctl dump\-flows > dump\-flows.log
> +.PP
> +$ ovs\-dpctl\-top \fB\-\-script\fR \fB\-\-flow\-file\fR dump\-flows.log
> +.SS "OPTIONS"
> +.TP
> +\fB\-h\fR, \fB\-\-help\fR
> +show this help message and exit.
> +.TP
> +\fB\-v\fR, \fB\-\-version\fR
> +show program's version number and exit.
> +.TP
> +\fB\-f\fR FLOWFILES, \fB\-\-flow\-file\fR FLOWFILES
> +file containing flows from ovs\-dpctl dump\-flow.
> +.TP
> +\fB\-V\fR, \fB\-\-verbose\fR
> +enable debug level verbosity.
> +.TP
> +\fB\-s\fR, \fB\-\-script\fR
> +Run from a script (no user interface).
> +.TP
> +\fB\-\-host\fR HOST
> +Specify a user at host for retrieving flows see Accessing
> +Remote Hosts for more information.
> +.TP
> +\fB\-a\fR, \fB\-\-accumulate\fR
> +Accumulate dump\-flow content.
> +.TP
> +\fB\-\-accumulate\-decay\fR ACCUMULATEDECAY
> +Decay old accumulated flows. The default is 5 minutes. A value of 0 disables
> +decay.
> +.TP
> +\fB\-d\fR DELAY, \fB\-\-delay\fR DELAY
> +Delay in milliseconds to collect dump\-flow content (sample rate).
> diff --git a/utilities/ovs-dpctl-top.in b/utilities/ovs-dpctl-top.in
> new file mode 100755
> index 0000000..ddcc2e4
> --- /dev/null
> +++ b/utilities/ovs-dpctl-top.in
> @@ -0,0 +1,1691 @@
> +#! @PYTHON@
> +#
> +# Copyright (c) 2013 Nicira, Inc.
> +#
> +# Licensed under the Apache License, Version 2.0 (the "License");
> +# you may not use this file except in compliance with the License.
> +# You may obtain a copy of the License at:
> +#
> +# http://www.apache.org/licenses/LICENSE-2.0
> +#
> +# Unless required by applicable law or agreed to in writing, software
> +# distributed under the License is distributed on an "AS IS" BASIS,
> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +# See the License for the specific language governing permissions and
> +# limitations under the License.
> +#
> +#
> +# The approximate_size code was copied from
> +# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
> +# which is licensed under # "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
> +# used under a Creative Commons Attribution-Share-Alike license:
> +# http://creativecommons.org/licenses/by-sa/3.0/
> +#
> +#
> +
> +"""Top like behavior for ovs-dpctl dump-flows output.
> +
> +This program summarizes ovs-dpctl flow content by aggregating the number
> +of packets, total bytes and occurrence of the following fields:
> +
> + - Datapath in_port
> +
> + - Ethernet type
> +
> + - Source and destination MAC addresses
> +
> + - IP protocol
> +
> + - Source and destination IPv4 addresses
> +
> + - Source and destination IPv6 addresses
> +
> + - UDP and TCP destination port
> +
> + - Tunnel source and destination addresses
> +
> +
> +Output shows four values:
> + - FIELDS: the flow fields for example in_port(1).
> +
> + - PACKETS: the total number of packets containing the flow field.
> +
> + - BYTES: the total number of bytes containing the flow field. If units are
> + not present then values are in bytes.
> +
> + - AVERAGE: the average packets size (BYTES/PACKET).
> +
> + - COUNT: the number of lines in the dump-flow output contain the flow field.
> +
> +Top Behavior
> +
> +While in top mode, the default behavior, the following single character
> +commands are supported:
> +
> + a - toggles top in accumulate and live mode. Accumulate mode is described
> + below.
> +
> + s - toggles which column is used to sort content in decreasing order. A
> + DESC title is placed over the column.
> +
> + _ - a space indicating to collect dump-flow content again
> +
> + h - halt output. Any character will restart sampling
> +
> + f - cycle through flow fields
> +
> + q - q for quit.
> +
> +Accumulate Mode
> +
> +There are two supported modes: live and accumulate. The default is live.
> +The parameter --accumulate or the 'a' character in top mode enables the
> +latter. In live mode, recent dump-flow content is presented.
> +Where as accumulate mode keeps track of the prior historical
> +information until the flow is reset not when the flow is purged. Reset
> +flows are determined when the packet count for a flow has decreased from
> +its previous sample. There is one caveat, eventually the system will
> +run out of memory if, after the accumulate-decay period any flows that
> +have not been refreshed are purged. The goal here is to free memory
> +of flows that are not active. Statistics are not decremented. Their purpose
> +is to reflect the overall history of the flow fields.
> +
> +
> +Debugging Errors
> +
> +Parsing errors are counted and displayed in the status line at the beginning
> +of the output. Use the --verbose option with --script to see what output
> + was not parsed, like this:
> +$ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose
> +
> +Error messages will identify content that failed to parse.
> +
> +
> +Access Remote Hosts
> +
> +The --host must follow the format user at hostname. This script simply calls
> +'ssh user at Hostname' without checking for login credentials therefore public
> +keys should be installed on the system identified by hostname, such as:
> +
> +$ ssh-copy-id user at hostname
> +
> +Consult ssh-copy-id man pages for more details.
> +
> +
> +Expected usage
> +
> +$ ovs-dpctl-top
> +
> +or to run as a script:
> +$ ovs-dpctl dump-flows > dump-flows.log
> +$ ovs-dpctl-top --script --flow-file dump-flows.log
> +
> +"""
> +
> +# pylint: disable-msg=C0103
> +# pylint: disable-msg=C0302
> +# pylint: disable-msg=R0902
> +# pylint: disable-msg=R0903
> +# pylint: disable-msg=R0904
> +# pylint: disable-msg=R0912
> +# pylint: disable-msg=R0913
> +# pylint: disable-msg=R0914
> +
> +import sys
> +import os
> +import argparse
> +import logging
> +import re
> +import unittest
> +import copy
> +import curses
> +import operator
> +import subprocess
> +#import netaddr
> +import fcntl
> +import struct
> +import termios
> +import datetime
> +import threading
> +import time
> +import socket
> +
> +
> +##
> +# The following two definitions provide the necessary netaddr functionality.
> +# Python netaddr module is not part of the core installation. Packaging
> +# netaddr was involved and seems inappropriate given that only two
> +# methods where used.
> +def ipv4_to_network(ip_str):
> + """ Calculate the network given a ipv4/mask value.
> + If a mask is not present simply return ip_str.
> + """
> + pack_length = '!HH'
> + try:
> + (ip, mask) = ip_str.split("/")
> + except ValueError:
> + # just an ip address no mask.
> + return ip_str
> +
> + ip_p = socket.inet_pton(socket.AF_INET, ip)
> + ip_t = struct.unpack(pack_length, ip_p)
> + mask_t = struct.unpack(pack_length, socket.inet_pton(socket.AF_INET, mask))
> + network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]
> +
> + return socket.inet_ntop(socket.AF_INET,
> + struct.pack('!HH', network_n[0], network_n[1]))
> +
> +
> +def ipv6_to_network(ip_str):
> + """ Calculate the network given a ipv6/mask value.
> + If a mask is not present simply return ip_str.
> + """
> + pack_length = '!HHHHHHHH'
> + try:
> + (ip, mask) = ip_str.split("/")
> + except ValueError:
> + # just an ip address no mask.
> + return ip_str
> +
> + ip_p = socket.inet_pton(socket.AF_INET6, ip)
> + ip_t = struct.unpack(pack_length, ip_p)
> + mask_t = struct.unpack(pack_length,
> + socket.inet_pton(socket.AF_INET6, mask))
> + network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]
> +
> + return socket.inet_ntop(socket.AF_INET6,
> + struct.pack(pack_length,
> + network_n[0], network_n[1],
> + network_n[2], network_n[3],
> + network_n[4], network_n[5],
> + network_n[6], network_n[7]))
> +
> +
> +##
> +# columns displayed
> +##
> +class Columns:
> + """ Holds column specific content.
> + Titles needs to be less than 8 characters.
> + """
> + VALUE_WIDTH = 9
> + FIELDS = "fields"
> + PACKETS = "packets"
> + COUNT = "count"
> + BYTES = "bytes"
> + AVERAGE = "average"
> +
> + def __init__(self):
> + pass
> +
> + @staticmethod
> + def assoc_list(obj):
> + """ Return a associated list. """
> + return [(Columns.FIELDS, repr(obj)),
> + (Columns.PACKETS, obj.packets),
> + (Columns.BYTES, obj.bytes),
> + (Columns.COUNT, obj.count),
> + (Columns.AVERAGE, obj.average),
> + ]
> +
> +
> +def element_eth_get(field_type, element, stats_dict):
> + """ Extract eth frame src and dst from a dump-flow element."""
> + fmt = "%s(src=%s,dst=%s)"
> +
> + element = fmt % (field_type, element["src"], element["dst"])
> + return SumData(field_type, element, stats_dict["packets"],
> + stats_dict["bytes"], element)
> +
> +
> +def element_ipv4_get(field_type, element, stats_dict):
> + """ Extract src and dst from a dump-flow element."""
> + fmt = "%s(src=%s,dst=%s)"
> + element_show = fmt % (field_type, element["src"], element["dst"])
> +
> + ##
> + # netaddr implementation.
> + #element_key = fmt % (field_type,
> + # netaddr.IPNetwork(element["src"]).ipv4().network,
> + # netaddr.IPNetwork(element["dst"]).ipv4().network)
> +
> + element_key = fmt % (field_type, ipv4_to_network(element["src"]),
> + ipv4_to_network(element["dst"]))
> +
> + return SumData(field_type, element_show, stats_dict["packets"],
> + stats_dict["bytes"], element_key)
> +
> +
> +def element_tunnel_get(field_type, element, stats_dict):
> + """ Extract src and dst from a tunnel."""
> + return element_ipv4_get(field_type, element, stats_dict)
> +
> +
> +def element_ipv6_get(field_type, element, stats_dict):
> + """ Extract src and dst from a dump-flow element."""
> +
> + fmt = "%s(src=%s,dst=%s)"
> + element_show = fmt % (field_type, element["src"], element["dst"])
> +
> + ##
> + # netaddr implementation
> + #src = str(netaddr.IPNetwork(element["src"]).ipv6().network)
> + #dst = str(netaddr.IPNetwork(element["dst"]).ipv6().network)
> + #element_key = fmt % (field_type, src, dst)
> +
> + element_key = fmt % (field_type, ipv6_to_network(element["src"]),
> + ipv6_to_network(element["dst"]))
> +
> + return SumData(field_type, element_show, stats_dict["packets"],
> + stats_dict["bytes"], element_key)
> +
> +
> +def element_dst_port_get(field_type, element, stats_dict):
> + """ Extract src and dst from a dump-flow element."""
> + element_key = "%s(dst=%s)" % (field_type, element["dst"])
> + return SumData(field_type, element_key, stats_dict["packets"],
> + stats_dict["bytes"], element_key)
> +
> +
> +def element_passthrough_get(field_type, element, stats_dict):
> + """ Extract src and dst from a dump-flow element."""
> + element_key = "%s(%s)" % (field_type, element)
> + return SumData(field_type, element_key,
> + stats_dict["packets"], stats_dict["bytes"], element_key)
> +
> +
> +# pylint: disable-msg=R0903
> +class OutputFormat:
> + """ Holds field_type and function to extract element value. """
> + def __init__(self, field_type, generator):
> + self.field_type = field_type
> + self.generator = generator
> +
> +OUTPUT_FORMAT = [
> + OutputFormat("eth", element_eth_get),
> + OutputFormat("ipv4", element_ipv4_get),
> + OutputFormat("ipv6", element_ipv6_get),
> + OutputFormat("tunnel", element_tunnel_get),
> + OutputFormat("udp", element_dst_port_get),
> + OutputFormat("tcp", element_dst_port_get),
> + OutputFormat("eth_type", element_passthrough_get),
> + OutputFormat("in_port", element_passthrough_get)
> + ]
> +
> +
> +ELEMENT_KEY = {
> + "udp": "udp.dst",
> + "tcp": "tcp.dst"
> + }
> +
> +
> +def top_input_get(args):
> + """ Return subprocess stdout."""
> + cmd = []
> + if (args.host):
> + cmd += ["ssh", args.host]
> + cmd += ["ovs-dpctl", "dump-flows"]
> +
> + return subprocess.Popen(cmd, stderr=subprocess.STDOUT,
> + stdout=subprocess.PIPE).stdout
> +
> +
> +def args_get():
> + """ read program parameters handle any necessary validation of input. """
> +
> + parser = argparse.ArgumentParser(version="@VERSION@",
> + formatter_class=argparse.RawDescriptionHelpFormatter,
> + description=__doc__)
> + ##
> + # None is a special value indicating to read flows from stdin.
> + # This handles the case
> + # ovs-dpctl dump-flows | ovs-dpctl-flows.py
> + parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
> + action="append",
> + help="file containing flows from ovs-dpctl dump-flow")
> + parser.add_argument("-V", "--verbose", dest="verbose",
> + default=logging.CRITICAL,
> + action="store_const", const=logging.DEBUG,
> + help="enable debug level verbosity")
> + parser.add_argument("-s", "--script", dest="top", action="store_false",
> + help="Run from a script (no user interface)")
> + parser.add_argument("--host", dest="host",
> + help="Specify a user at host for retrieving flows see"
> + "Accessing Remote Hosts for more information")
> +
> + parser.add_argument("-a", "--accumulate", dest="accumulate",
> + action="store_true", default=False,
> + help="Accumulate dump-flow content")
> + parser.add_argument("--accumulate-decay", dest="accumulateDecay",
> + default=5.0 * 60, type=float,
> + help="Decay old accumulated flows. "
> + "The default is 5 minutes. "
> + "A value of 0 disables decay.")
> + parser.add_argument("-d", "--delay", dest="delay", type=int,
> + default=1000,
> + help="Delay in milliseconds to collect dump-flow "
> + "content (sample rate).")
> +
> + args = parser.parse_args()
> +
> + logging.basicConfig(level=args.verbose)
> +
> + return args
> +
> +###
> +# Code to parse a single line in dump-flow
> +###
> +# key(values)
> +FIELDS_CMPND = re.compile("([\w]+)\((.+)\)")
> +# key:value
> +FIELDS_CMPND_ELEMENT = re.compile("([\w:]+)=([/\.\w:]+)")
> +FIELDS_ELEMENT = re.compile("([\w]+):([-\.\w]+)")
> +
> +
> +def flow_line_iter(line):
> + """ iterate over flow dump elements.
> + return tuples of (true, element) or (false, remaining element)
> + """
> + # splits by , except for when in a (). Actions element was not
> + # split properly but we don't need it.
> + rc = []
> +
> + element = ""
> + paren_count = 0
> +
> + for ch in line:
> + if (ch == '('):
> + paren_count += 1
> + elif (ch == ')'):
> + paren_count -= 1
> +
> + if (ch == ' '):
> + # ignore white space.
> + continue
> + elif ((ch == ',') and (paren_count == 0)):
> + rc.append(element)
> + element = ""
> + else:
> + element += ch
> +
> + if (paren_count):
> + raise ValueError(line)
> + else:
> + if (len(element) > 0):
> + rc.append(element)
> + return rc
> +
> +
> +def flow_line_compound_parse(compound):
> + """ Parse compound element
> + for example
> + src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
> + which is in
> + eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)
> + """
> + result = {}
> + for element in flow_line_iter(compound):
> + match = FIELDS_CMPND_ELEMENT.search(element)
> + if (match):
> + key = match.group(1)
> + value = match.group(2)
> + result[key] = value
> +
> + match = FIELDS_CMPND.search(element)
> + if (match):
> + key = match.group(1)
> + value = match.group(2)
> + result[key] = flow_line_compound_parse(value)
> + continue
> +
> + if (len(result.keys()) == 0):
> + return compound
> + return result
> +
> +
> +def flow_line_split(line):
> + """ Convert a flow dump line into ([fields], [stats], actions) tuple.
> + Where fields and stats are lists.
> + This function relies on a the following ovs-dpctl dump-flow
> + output characteristics:
> + 1. The dumpe flow line consists of a list of frame fields, list of stats
> + and action.
> + 2. list of frame fields, each stat and action field are delimited by ', '.
> + 3. That all other non stat field are not delimited by ', '.
> +
> + """
> +
> + results = re.split(', ', line)
> +
> + (field, stats, action) = (results[0], results[1:-1], results[-1])
> +
> + fields = flow_line_iter(field)
> + return (fields, stats, action)
> +
> +
> +def elements_to_dict(elements):
> + """ Convert line to a hierarchy of dictionaries. """
> + result = {}
> + for element in elements:
> + match = FIELDS_CMPND.search(element)
> + if (match):
> + key = match.group(1)
> + value = match.group(2)
> + result[key] = flow_line_compound_parse(value)
> + continue
> +
> + match = FIELDS_ELEMENT.search(element)
> + if (match):
> + key = match.group(1)
> + value = match.group(2)
> + result[key] = value
> + else:
> + raise ValueError("can't parse >%s<" % element)
> + return result
> +
> +
> +# pylint: disable-msg=R0903
> +class SumData(object):
> + """ Interface that all data going into SumDb must implement.
> + Holds the flow field and its corresponding count, total packets,
> + total bytes and calculates average.
> +
> + __repr__ is used as key into SumData singleton.
> + __str__ is used as human readable output.
> + """
> +
> + def __init__(self, field_type, field, packets, flow_bytes, key):
> + # Count is the number of lines in the dump-flow log.
> + self.field_type = field_type
> + self.field = field
> + self.count = 1
> + self.packets = int(packets)
> + self.bytes = int(flow_bytes)
> + self.key = key
> +
> + def decrement(self, decr_packets, decr_bytes, decr_count):
> + """ Decrement content to calculate delta from previous flow sample."""
> + self.packets -= decr_packets
> + self.bytes -= decr_bytes
> + self.count -= decr_count
> +
> + def __iadd__(self, other):
> + """ Add two objects. """
> +
> + if (self.key != other.key):
> + raise ValueError("adding two unrelated types")
> +
> + self.count += other.count
> + self.packets += other.packets
> + self.bytes += other.bytes
> + return self
> +
> + def __isub__(self, other):
> + """ Decrement two objects. """
> +
> + if (self.key != other.key):
> + raise ValueError("adding two unrelated types")
> +
> + self.count -= other.count
> + self.packets -= other.packets
> + self.bytes -= other.bytes
> + return self
> +
> + def __getattr__(self, name):
> + """ Handle average. """
> + if (name == "average"):
> + if (self.packets == 0):
> + return float(0.0)
> + else:
> + return float(self.bytes) / float(self.packets)
> + raise AttributeError(name)
> +
> + def __str__(self):
> + """ Used for debugging. """
> + return "%s %s %s %s" % (self.field, self.count,
> + self.packets, self.bytes)
> +
> + def __repr__(self):
> + """ Used as key in the FlowDB table. """
> + return self.key
> +
> +
> +def flow_aggregate(fields_dict, stats_dict):
> + """ Search for content in a line.
> + Passed the flow port of the dump-flows plus the current stats consisting
> + of packets, bytes, etc
> + """
> + result = []
> +
> + for output_format in OUTPUT_FORMAT:
> + field = fields_dict.get(output_format.field_type, None)
> + if (field):
> + obj = output_format.generator(output_format.field_type,
> + field, stats_dict)
> + result.append(obj)
> +
> + return result
> +
> +
> +def flows_read(ihdl, flow_db):
> + """ read flow content from ihdl and insert into flow_db. """
> +
> + done = False
> + while (not done):
> + line = ihdl.readline()
> + if (len(line) == 0):
> + # end of input
> + break
> +
> + try:
> + flow_db.flow_line_add(line)
> + except ValueError, arg:
> + logging.error(arg)
> +
> + return flow_db
> +
> +
> +def get_terminal_size():
> + """
> + return column width and height of the terminal
> + """
> + for fd_io in [0, 1, 2]:
> + try:
> + result = struct.unpack('hh',
> + fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
> + '1234'))
> + except IOError:
> + result = None
> + continue
> +
> + if (result is None or result == (0, 0)):
> + # Maybe we can't get the width. In that case assume (25, 80)
> + result = (25, 80)
> +
> + return result
> +
> +##
> +# Content derived from:
> +# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
> +##
> +SUFFIXES = {1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
> + 1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']}
> +
> +
> +def approximate_size(size, a_kilobyte_is_1024_bytes=True):
> + """Convert a file size to human-readable form.
> +
> + Keyword arguments:
> + size -- file size in bytes
> + a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024
> + if False, use multiples of 1000
> +
> + Returns: string
> +
> + """
> + size = float(size)
> + if size < 0:
> + raise ValueError('number must be non-negative')
> +
> + if (a_kilobyte_is_1024_bytes):
> + multiple = 1024
> + else:
> + multiple = 1000
> + for suffix in SUFFIXES[multiple]:
> + size /= multiple
> + if size < multiple:
> + return '{0:.1f} {1}'.format(size, suffix)
> +
> + raise ValueError('number too large')
> +
> +
> +##
> +# End copied content
> +##
> +class ColMeta:
> + """ Concepts about columns. """
> + def __init__(self, sortable, width):
> + self.sortable = sortable
> + self.width = width
> +
> +
> +class RowMeta:
> + """ How to render rows. """
> + def __init__(self, label, fmt):
> + self.label = label
> + self.fmt = fmt
> +
> +
> +def fmt_packet(obj, width):
> + """ Provide a string for packets that is appropriate for output."""
> + return str(obj.packets).rjust(width)
> +
> +
> +def fmt_count(obj, width):
> + """ Provide a string for average that is appropriate for output."""
> + return str(obj.count).rjust(width)
> +
> +
> +def fmt_avg(obj, width):
> + """ Provide a string for average that is appropriate for output."""
> + return str(int(obj.average)).rjust(width)
> +
> +
> +def fmt_field(obj, width):
> + """ truncate really long flow and insert ellipses to help make it
> + clear.
> + """
> +
> + ellipses = " ... "
> + value = obj.field
> + if (len(obj.field) > width):
> + value = value[:(width - len(ellipses))] + ellipses
> + return value.ljust(width)
> +
> +
> +def fmt_bytes(obj, width):
> + """ Provide a string for average that is appropriate for output."""
> + if (len(str(obj.bytes)) <= width):
> + value = str(obj.bytes)
> + else:
> + value = approximate_size(obj.bytes)
> + return value.rjust(width)
> +
> +
> +def title_center(value, width):
> + """ Center a column title."""
> + return value.upper().center(width)
> +
> +
> +def title_rjust(value, width):
> + """ Right justify a column title. """
> + return value.upper().rjust(width)
> +
> +
> +def column_picker(order, obj):
> + """ return the column as specified by order. """
> + if (order == 1):
> + return obj.count
> + elif (order == 2):
> + return obj.packets
> + elif (order == 3):
> + return obj.bytes
> + elif (order == 4):
> + return obj.average
> + else:
> + raise ValueError("order outside of range %s" % order)
> +
> +
> +class Render:
> + """ Renders flow data. """
> + def __init__(self, console_width):
> + """ Calculate column widths taking into account changes in format."""
> +
> + self._start_time = datetime.datetime.now()
> +
> + self._cols = [ColMeta(False, 0),
> + ColMeta(True, Columns.VALUE_WIDTH),
> + ColMeta(True, Columns.VALUE_WIDTH),
> + ColMeta(True, Columns.VALUE_WIDTH),
> + ColMeta(True, Columns.VALUE_WIDTH)]
> + self._console_width = console_width
> + self.console_width_set(console_width)
> +
> + # Order in this array dictate the order of the columns.
> + # The 0 width for the first entry is a place holder. This is
> + # dynamically calculated. The first column is special. We need a
> + # way to indicate which field are presented.
> + self._descs = [RowMeta("", title_rjust),
> + RowMeta("", title_rjust),
> + RowMeta("", title_rjust),
> + RowMeta("", title_rjust),
> + RowMeta("", title_rjust)]
> + self._column_sort_select = 0
> + self.column_select_event()
> +
> + self._titles = [
> + RowMeta(Columns.FIELDS, title_center),
> + RowMeta(Columns.COUNT, title_rjust),
> + RowMeta(Columns.PACKETS, title_rjust),
> + RowMeta(Columns.BYTES, title_rjust),
> + RowMeta(Columns.AVERAGE, title_rjust)
> + ]
> +
> + self._datas = [
> + RowMeta(None, fmt_field),
> + RowMeta(None, fmt_count),
> + RowMeta(None, fmt_packet),
> + RowMeta(None, fmt_bytes),
> + RowMeta(None, fmt_avg)
> + ]
> +
> + ##
> + # _field_types hold which fields are displayed in the field
> + # column, with the keyword all implying all fields.
> + ##
> + self._field_types = ["all"] + [ii.field_type for ii in OUTPUT_FORMAT]
> +
> + ##
> + # The default is to show all field types.
> + ##
> + self._field_type_select = -1
> + self.field_type_toggle()
> +
> + def _field_type_select_get(self):
> + """ Return which field type to display. """
> + return self._field_types[self._field_type_select]
> +
> + def field_type_toggle(self):
> + """ toggle which field types to show. """
> + self._field_type_select += 1
> + if (self._field_type_select >= len(self._field_types)):
> + self._field_type_select = 0
> + value = Columns.FIELDS + " (%s)" % self._field_type_select_get()
> + self._titles[0].label = value
> +
> + def column_select_event(self):
> + """ Handles column select toggle. """
> +
> + self._descs[self._column_sort_select].label = ""
> + for _ in range(len(self._cols)):
> + self._column_sort_select += 1
> + if (self._column_sort_select >= len(self._cols)):
> + self._column_sort_select = 0
> +
> + # Now look for the next sortable column
> + if (self._cols[self._column_sort_select].sortable):
> + break
> + self._descs[self._column_sort_select].label = "DESC"
> +
> + def console_width_set(self, console_width):
> + """ Adjust the output given the new console_width. """
> + self._console_width = console_width
> +
> + spaces = len(self._cols) - 1
> + ##
> + # Calculating column width can be tedious but important. The
> + # flow field value can be long. The goal here is to dedicate
> + # fixed column space for packets, bytes, average and counts. Give the
> + # remaining space to the flow column. When numbers get large
> + # transition output to output generated by approximate_size which
> + # limits output to ###.# XiB in other words 9 characters.
> + ##
> + # At this point, we know the maximum length values. We may
> + # truncate the flow column to get everything to fit.
> + self._cols[0].width = 0
> + values_max_length = sum([ii.width for ii in self._cols]) + spaces
> + flow_max_length = console_width - values_max_length
> + self._cols[0].width = flow_max_length
> +
> + def format(self, flow_db):
> + """ shows flows based on --script parameter."""
> +
> + rc = []
> + ##
> + # Top output consists of
> + # Title
> + # Column title (2 rows)
> + # data
> + # statistics and status
> +
> + ##
> + # Title
> + ##
> + rc.append("Flow Summary".center(self._console_width))
> +
> + stats = " Total: %(flow_total)s errors: %(flow_errors)s " % \
> + flow_db.flow_stats_get()
> + accumulate = flow_db.accumulate_get()
> + if (accumulate):
> + stats += "Accumulate: on "
> + else:
> + stats += "Accumulate: off "
> +
> + duration = datetime.datetime.now() - self._start_time
> + stats += "Duration: %s " % str(duration)
> + rc.append(stats.ljust(self._console_width))
> +
> + ##
> + # 2 rows for columns.
> + ##
> + # Indicate which column is in descending order.
> + rc.append(" ".join([ii.fmt(ii.label, col.width)
> + for (ii, col) in zip(self._descs, self._cols)]))
> +
> + rc.append(" ".join([ii.fmt(ii.label, col.width)
> + for (ii, col) in zip(self._titles, self._cols)]))
> +
> + ##
> + # Data.
> + ##
> + for dd in flow_db.field_values_in_order(self._field_type_select_get(),
> + self._column_sort_select):
> + rc.append(" ".join([ii.fmt(dd, col.width)
> + for (ii, col) in zip(self._datas,
> + self._cols)]))
> +
> + return rc
> +
> +
> +def curses_screen_begin():
> + """ begin curses screen control. """
> + stdscr = curses.initscr()
> + curses.cbreak()
> + curses.noecho()
> + stdscr.keypad(1)
> + return stdscr
> +
> +
> +def curses_screen_end(stdscr):
> + """ end curses screen control. """
> + curses.nocbreak()
> + stdscr.keypad(0)
> + curses.echo()
> + curses.endwin()
> +
> +
> +class FlowDB:
> + """ Implements live vs accumulate mode.
> +
> + Flows are stored as key value pairs. The key consists of the content
> + prior to stat fields. The value portion consists of stats in a dictionary
> + form.
> +
> + @ \todo future add filtering here.
> + """
> + def __init__(self, accumulate):
> + self._accumulate = accumulate
> + self._error_count = 0
> + # Values are (stats, last update time.)
> + # The last update time is used for aging.
> + self._flow_lock = threading.Lock()
> + # This dictionary holds individual flows.
> + self._flows = {}
> + # This dictionary holds aggregate of flow fields.
> + self._fields = {}
> +
> + def accumulate_get(self):
> + """ Return the current accumulate state. """
> + return self._accumulate
> +
> + def accumulate_toggle(self):
> + """ toggle accumulate flow behavior. """
> + self._accumulate = not self._accumulate
> +
> + def begin(self):
> + """ Indicate the beginning of processing flow content.
> + if accumulate is false clear current set of flows. """
> +
> + if (not self._accumulate):
> + self._flow_lock.acquire()
> + try:
> + self._flows.clear()
> + finally:
> + self._flow_lock.release()
> + self._fields.clear()
> +
> + def flow_line_add(self, line):
> + """ Split a line from a ovs-dpctl dump-flow into key and stats.
> + The order of the content in the flow should be:
> + - flow content
> + - stats for the flow
> + - actions
> +
> + This method also assumes that the dump flow output does not
> + change order of fields of the same flow.
> + """
> +
> + line = line.rstrip("\n")
> + (fields, stats, _) = flow_line_split(line)
> +
> + try:
> + fields_dict = elements_to_dict(fields)
> +
> + if (len(fields_dict) == 0):
> + raise ValueError("flow fields are missing %s", line)
> +
> + stats_dict = elements_to_dict(stats)
> + if (len(stats_dict) == 0):
> + raise ValueError("statistics are missing %s.", line)
> +
> + ##
> + # In accumulate mode, the Flow database can reach 10,000's of
> + # persistent flows. The interaction of the script with this many
> + # flows is too slow. Instead, delta are sent to the flow_db
> + # database allow incremental changes to be done in O(m) time
> + # where m is the current flow list, instead of iterating over
> + # all flows in O(n) time where n is the entire history of flows.
> + key = ",".join(fields)
> +
> + self._flow_lock.acquire()
> + try:
> + (stats_old_dict, _) = self._flows.get(key, (None, None))
> + finally:
> + self._flow_lock.release()
> +
> + self.flow_event(fields_dict, stats_old_dict, stats_dict)
> +
> + except ValueError, arg:
> + logging.error(arg)
> + self._error_count += 1
> + raise
> +
> + self._flow_lock.acquire()
> + try:
> + self._flows[key] = (stats_dict, datetime.datetime.now())
> + finally:
> + self._flow_lock.release()
> +
> + def decay(self, decayTimeInSeconds):
> + """ Decay content. """
> + now = datetime.datetime.now()
> + for (key, value) in self._flows.items():
> + (stats_dict, updateTime) = value
> + delta = now - updateTime
> +
> + if (delta.seconds > decayTimeInSeconds):
> + self._flow_lock.acquire()
> + try:
> + del self._flows[key]
> +
> + fields_dict = elements_to_dict(flow_line_iter(key))
> + matches = flow_aggregate(fields_dict, stats_dict)
> + for match in matches:
> + self.field_dec(match)
> +
> + finally:
> + self._flow_lock.release()
> +
> + def flow_stats_get(self):
> + """ Return statistics in a form of a dictionary. """
> + rc = None
> + self._flow_lock.acquire()
> + try:
> + rc = {"flow_total": len(self._flows),
> + "flow_errors": self._error_count}
> + finally:
> + self._flow_lock.release()
> + return rc
> +
> + def field_types_get(self):
> + """ Return the set of types stored in the singleton. """
> + types = set((ii.field_type for ii in self._fields.values()))
> + return types
> +
> + def field_add(self, data):
> + """ Collect dump-flow data to sum number of times item appears. """
> + current = self._fields.get(repr(data), None)
> + if (current is None):
> + current = copy.copy(data)
> + else:
> + current += data
> + self._fields[repr(current)] = current
> +
> + def field_dec(self, data):
> + """ Collect dump-flow data to sum number of times item appears. """
> + current = self._fields.get(repr(data), None)
> + if (current is None):
> + raise ValueError("decrementing field missing %s" % repr(data))
> +
> + current -= data
> + self._fields[repr(current)] = current
> + if (current.count == 0):
> + del self._fields[repr(current)]
> +
> + def field_values_in_order(self, field_type_select, column_order):
> + """ Return a list of items in order maximum first. """
> + values = self._fields.values()
> + if (field_type_select != "all"):
> + # If a field type other than "all" then reduce the list.
> + values = [ii for ii in values
> + if (ii.field_type == field_type_select)]
> + values = [(column_picker(column_order, ii), ii) for ii in values]
> + values.sort(key=operator.itemgetter(0))
> + values.reverse()
> + values = [ii[1] for ii in values]
> + return values
> +
> + def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
> + """ Receives new flow information. """
> +
> + # In order to avoid processing every flow at every sample
> + # period, changes in flow packet count is used to determine the
> + # delta in the flow statistics. This delta is used in the call
> + # to self.decrement prior to self.field_add
> +
> + if (stats_old_dict is None):
> + # This is a new flow
> + matches = flow_aggregate(fields_dict, stats_new_dict)
> + for match in matches:
> + self.field_add(match)
> + else:
> + old_packets = int(stats_old_dict.get("packets", 0))
> + new_packets = int(stats_new_dict.get("packets", 0))
> + if (old_packets == new_packets):
> + # ignore. same data.
> + pass
> + else:
> + old_bytes = stats_old_dict.get("bytes", 0)
> + # old_packets != new_packets
> + # if old_packets > new_packets then we end up decrementing
> + # packets and bytes.
> + matches = flow_aggregate(fields_dict, stats_new_dict)
> + for match in matches:
> + match.decrement(int(old_packets), int(old_bytes), 1)
> + self.field_add(match)
> +
> +
> +class DecayThread(threading.Thread):
> + """ Periodically call flow database to see if any flows are old. """
> + def __init__(self, flow_db, interval):
> + """ Start decay thread. """
> + threading.Thread.__init__(self)
> +
> + self._interval = max(1, interval)
> + self._min_interval = min(1, interval / 10)
> + self._flow_db = flow_db
> + self._event = threading.Event()
> + self._running = True
> +
> + self.daemon = True
> +
> + def run(self):
> + """ Worker thread which handles decaying accumulated flows. """
> +
> + while(self._running):
> + self._event.wait(self._min_interval)
> + if (self._running):
> + self._flow_db.decay(self._interval)
> +
> + def stop(self):
> + """ Stop thread. """
> + self._running = False
> + self._event.set()
> + ##
> + # Give the calling thread time to terminate but not too long.
> + # this thread is a daemon so the application will terminate if
> + # we timeout during the join. This is just a cleaner way to
> + # release resources.
> + self.join(2.0)
> +
> +
> +def flow_top_command(stdscr, render, flow_db):
> + """ Handle input while in top mode. """
> + ch = stdscr.getch()
> + ##
> + # Any character will restart sampling.
> + if (ch == ord('h')):
> + # halt output.
> + ch = stdscr.getch()
> + while (ch == -1):
> + ch = stdscr.getch()
> +
> + if (ch == ord('s')):
> + # toggle which column sorts data in descending order.
> + render.column_select_event()
> + elif (ch == ord('a')):
> + flow_db.accumulate_toggle()
> + elif (ch == ord('f')):
> + render.field_type_toggle()
> + elif (ch == ord(' ')):
> + # resample
> + pass
> +
> + return ch
> +
> +
> +def decay_timer_start(flow_db, accumulateDecay):
> + """ If accumulateDecay greater than zero then start timer. """
> + if (accumulateDecay > 0):
> + decay_timer = DecayThread(flow_db, accumulateDecay)
> + decay_timer.start()
> + return decay_timer
> + else:
> + return None
> +
> +
> +def flows_top(args):
> + """ handles top like behavior when --script is not specified. """
> +
> + flow_db = FlowDB(args.accumulate)
> + render = Render(0)
> +
> + decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
> + lines = []
> +
> + try:
> + stdscr = curses_screen_begin()
> + try:
> + ch = 'X'
> + #stdscr.nodelay(1)
> + stdscr.timeout(args.delay)
> +
> + while (ch != ord('q')):
> + flow_db.begin()
> +
> + try:
> + ihdl = top_input_get(args)
> + try:
> + flows_read(ihdl, flow_db)
> + finally:
> + ihdl.close()
> + except OSError, arg:
> + logging.critical(arg)
> + break
> +
> + (console_height, console_width) = stdscr.getmaxyx()
> + render.console_width_set(console_width)
> +
> + output_height = console_height - 1
> + line_count = range(output_height)
> + line_output = render.format(flow_db)
> + lines = zip(line_count, line_output[:output_height])
> +
> + stdscr.erase()
> + for (count, line) in lines:
> + stdscr.addstr(count, 0, line[:console_width])
> + stdscr.refresh()
> +
> + ch = flow_top_command(stdscr, render, flow_db)
> +
> + finally:
> + curses_screen_end(stdscr)
> + except KeyboardInterrupt:
> + pass
> + if (decay_timer):
> + decay_timer.stop()
> +
> + # repeat output
> + for (count, line) in lines:
> + print line
> +
> +
> +def flows_script(args):
> + """ handles --script option. """
> +
> + flow_db = FlowDB(args.accumulate)
> + flow_db.begin()
> +
> + if (args.flowFiles is None):
> + logging.info("reading flows from stdin")
> + ihdl = os.fdopen(sys.stdin.fileno(), 'r', 0)
> + try:
> + flow_db = flows_read(ihdl, flow_db)
> + finally:
> + ihdl.close()
> + else:
> + for flowFile in args.flowFiles:
> + logging.info("reading flows from %s", flowFile)
> + ihdl = open(flowFile, "r")
> + try:
> + flow_db = flows_read(ihdl, flow_db)
> + finally:
> + ihdl.close()
> +
> + (_, console_width) = get_terminal_size()
> + render = Render(console_width)
> +
> + for line in render.format(flow_db):
> + print line
> +
> +
> +def main():
> + """ Return 0 on success or 1 on failure.
> +
> + Algorithm
> + There are four stages to the process ovs-dpctl dump-flow content.
> + 1. Retrieve current input
> + 2. store in FlowDB and maintain history
> + 3. Iterate over FlowDB and aggregating stats for each flow field
> + 4. present data.
> +
> + Retrieving current input is currently trivial, the ovs-dpctl dump-flow
> + is called. Future version will have more elaborate means for collecting
> + dump-flow content. FlowDB returns all data as in the form of a hierarchical
> + dictionary. Input will vary.
> +
> + In the case of accumulate mode, flows are not purged from the FlowDB
> + manager. Instead at the very least, merely the latest statistics are
> + kept. In the case, of live output the FlowDB is purged prior to sampling
> + data.
> +
> + Aggregating results requires identify flow fields to aggregate out
> + of the flow and summing stats.
> +
> + """
> + args = args_get()
> +
> + try:
> + if (args.top):
> + flows_top(args)
> + else:
> + flows_script(args)
> + except KeyboardInterrupt:
> + return 1
> + return 0
> +
> +if __name__ == '__main__':
> + sys.exit(main())
> +elif __name__ == 'ovs-dpctl-top':
> + # pylint: disable-msg=R0915
> +
> + ##
> + # Test case beyond this point.
> + # pylint: disable-msg=R0904
> + class TestsuiteFlowParse(unittest.TestCase):
> + """
> + parse flow into hierarchy of dictionaries.
> + """
> + def test_flow_parse(self):
> + """ test_flow_parse. """
> + line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
> + "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
> + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
> + "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
> + "udp(src=61252,dst=5355), packets:1, bytes:92, "\
> + "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
> + "38,41,44,47,50,53,56,59,62,65"
> +
> + (fields, stats, _) = flow_line_split(line)
> + flow_dict = elements_to_dict(fields + stats)
> + self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8")
> + self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03")
> + self.assertEqual(flow_dict["ipv6"]["src"],
> + "fe80::55bf:fe42:bc96:2812")
> + self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3")
> + self.assertEqual(flow_dict["packets"], "1")
> + self.assertEqual(flow_dict["bytes"], "92")
> +
> + line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
> + "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
> + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
> + "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
> + "udp(src=61252,dst=5355), packets:1, bytes:92, "\
> + "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
> + "38,41,44,47,50,53,56,59,62,65"
> +
> + (fields, stats, _) = flow_line_split(line)
> + flow_dict = elements_to_dict(fields + stats)
> + self.assertEqual(flow_dict["used"], "-0.703s")
> + self.assertEqual(flow_dict["packets"], "1")
> + self.assertEqual(flow_dict["bytes"], "92")
> +
> + def test_flow_sum(self):
> + """ test_flow_sum. """
> + line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
> + "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
> + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
> + "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
> + "udp(src=61252,dst=5355), packets:2, bytes:92, "\
> + "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
> + "38,41,44,47,50,53,56,59,62,65"
> +
> + (fields, stats, _) = flow_line_split(line)
> + stats_dict = elements_to_dict(stats)
> + fields_dict = elements_to_dict(fields)
> + ##
> + # Test simple case of one line.
> + flow_db = FlowDB(False)
> + matches = flow_aggregate(fields_dict, stats_dict)
> + for match in matches:
> + flow_db.field_add(match)
> +
> + flow_types = flow_db.field_types_get()
> + expected_flow_types = ["eth", "eth_type", "udp", "in_port", "ipv6"]
> + self.assert_(len(flow_types) == len(expected_flow_types))
> + for flow_type in flow_types:
> + self.assertTrue(flow_type in expected_flow_types)
> +
> + for flow_type in flow_types:
> + sum_value = flow_db.field_values_in_order("all", 1)
> + self.assert_(len(sum_value) == 5)
> + self.assert_(sum_value[0].packets == 2)
> + self.assert_(sum_value[0].count == 1)
> + self.assert_(sum_value[0].bytes == 92)
> +
> + ##
> + # Add line again just to see counts go up.
> + matches = flow_aggregate(fields_dict, stats_dict)
> + for match in matches:
> + flow_db.field_add(match)
> +
> + flow_types = flow_db.field_types_get()
> + self.assert_(len(flow_types) == len(expected_flow_types))
> + for flow_type in flow_types:
> + self.assertTrue(flow_type in expected_flow_types)
> +
> + for flow_type in flow_types:
> + sum_value = flow_db.field_values_in_order("all", 1)
> + self.assert_(len(sum_value) == 5)
> + self.assert_(sum_value[0].packets == 4)
> + self.assert_(sum_value[0].count == 2)
> + self.assert_(sum_value[0].bytes == 2 * 92)
> +
> + def test_assoc_list(self):
> + """ test_assoc_list. """
> + line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
> + "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
> + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
> + "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
> + "udp(src=61252,dst=5355), packets:2, bytes:92, "\
> + "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
> + "38,41,44,47,50,53,56,59,62,65"
> +
> + valid_flows = [
> + 'eth_type(0x86dd)',
> + 'udp(dst=5355)',
> + 'in_port(4)',
> + 'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
> + 'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
> + ]
> +
> + (fields, stats, _) = flow_line_split(line)
> + stats_dict = elements_to_dict(stats)
> + fields_dict = elements_to_dict(fields)
> +
> + ##
> + # Test simple case of one line.
> + flow_db = FlowDB(False)
> + matches = flow_aggregate(fields_dict, stats_dict)
> + for match in matches:
> + flow_db.field_add(match)
> +
> + for sum_value in flow_db.field_values_in_order("all", 1):
> + assoc_list = Columns.assoc_list(sum_value)
> + for item in assoc_list:
> + if (item[0] == "fields"):
> + self.assertTrue(item[1] in valid_flows)
> + elif (item[0] == "packets"):
> + self.assertTrue(item[1] == 2)
> + elif (item[0] == "count"):
> + self.assertTrue(item[1] == 1)
> + elif (item[0] == "average"):
> + self.assertTrue(item[1] == 46.0)
> + elif (item[0] == "bytes"):
> + self.assertTrue(item[1] == 92)
> + else:
> + raise ValueError("unknown %s", item[0])
> +
> + def test_human_format(self):
> + """ test_assoc_list. """
> +
> + self.assertEqual(approximate_size(0.0), "0.0 KiB")
> + self.assertEqual(approximate_size(1024), "1.0 KiB")
> + self.assertEqual(approximate_size(1024 * 1024), "1.0 MiB")
> + self.assertEqual(approximate_size((1024 * 1024) + 100000),
> + "1.1 MiB")
> + value = (1024 * 1024 * 1024) + 100000000
> + self.assertEqual(approximate_size(value), "1.1 GiB")
> +
> + def test_flow_line_split(self):
> + """ Splitting a flow line is not trivial.
> + There is no clear delimiter. Comma is used liberally."""
> + expected_fields = ["in_port(4)",
> + "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)",
> + "eth_type(0x86dd)",
> + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
> + "label=0,proto=17,tclass=0,hlimit=1,frag=no)",
> + "udp(src=61252,dst=5355)"]
> + expected_stats = ["packets:2", "bytes:92", "used:0.703s"]
> + expected_actions = "actions:3,8,11,14,17,20,23,26,29,32,35," \
> + "38,41,44,47,50,53,56,59,62,65"
> +
> + line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
> + "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
> + "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
> + "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
> + "udp(src=61252,dst=5355), packets:2, bytes:92, "\
> + "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
> + "38,41,44,47,50,53,56,59,62,65"
> +
> + (fields, stats, actions) = flow_line_split(line)
> +
> + self.assertEqual(fields, expected_fields)
> + self.assertEqual(stats, expected_stats)
> + self.assertEqual(actions, expected_actions)
> +
> + def test_accumulate_decay(self):
> + """ test_accumulate_decay: test accumulated decay. """
> + lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
> + "dst=ff:ff:ff:ff:ff:ff),"
> + "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
> + "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
> + "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
> + "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
> + "packets:1, bytes:120, used:0.004s, actions:1"]
> +
> + flow_db = FlowDB(True)
> + flow_db.begin()
> + flow_db.flow_line_add(lines[0])
> +
> + # Make sure we decay
> + time.sleep(4)
> + self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
> + flow_db.decay(1)
> + self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
> +
> + flow_db.flow_line_add(lines[0])
> + self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
> + flow_db.decay(30)
> + # Should not be deleted.
> + self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
> +
> + flow_db.flow_line_add(lines[0])
> + self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
> + timer = decay_timer_start(flow_db, 2)
> + time.sleep(10)
> + self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
> + timer.stop()
> +
> + def test_accumulate(self):
> + """ test_accumulate test that FlowDB supports accumulate. """
> +
> + lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
> + "dst=ff:ff:ff:ff:ff:ff),"
> + "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
> + "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
> + "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
> + "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
> + "packets:1, bytes:120, used:0.004s, actions:1",
> + "in_port(2),"
> + "eth(src=68:ef:bd:25:ef:c0,dst=33:33:00:00:00:66),"
> + "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
> + "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
> + "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029), "
> + "packets:2, bytes:5026, used:0.348s, actions:1",
> + "in_port(1),eth(src=ee:ee:ee:ee:ee:ee,"
> + "dst=ff:ff:ff:ff:ff:ff),"
> + "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
> + "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
> + "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
> + "tha=00:00:00:00:00:00/00:00:00:00:00:00), packets:2, "
> + "bytes:240, used:0.004s, actions:1"]
> +
> + lines = [
> + "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
> + "in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
> + "in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
> + "in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
> + "in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
> + "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
> + ]
> +
> + # Turn on accumulate.
> + flow_db = FlowDB(True)
> + flow_db.begin()
> +
> + flow_db.flow_line_add(lines[0])
> +
> + # Test one flow exist.
> + sum_values = flow_db.field_values_in_order("all", 1)
> + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> + self.assertEqual(len(in_ports), 1)
> + self.assertEqual(in_ports[0].packets, 1)
> + self.assertEqual(in_ports[0].bytes, 120)
> + self.assertEqual(in_ports[0].count, 1)
> +
> + # simulate another sample
> + # Test two different flows exist.
> + flow_db.begin()
> + flow_db.flow_line_add(lines[1])
> + sum_values = flow_db.field_values_in_order("all", 1)
> + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> + self.assertEqual(len(in_ports), 1)
> + self.assertEqual(in_ports[0].packets, 1)
> + self.assertEqual(in_ports[0].bytes, 120)
> + self.assertEqual(in_ports[0].count, 1)
> +
> + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
> + self.assertEqual(len(in_ports), 1)
> + self.assertEqual(in_ports[0].packets, 2)
> + self.assertEqual(in_ports[0].bytes, 126)
> + self.assertEqual(in_ports[0].count, 1)
> +
> + # Test first flow increments packets.
> + flow_db.begin()
> + flow_db.flow_line_add(lines[2])
> + sum_values = flow_db.field_values_in_order("all", 1)
> + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> + self.assertEqual(len(in_ports), 1)
> + self.assertEqual(in_ports[0].packets, 2)
> + self.assertEqual(in_ports[0].bytes, 240)
> + self.assertEqual(in_ports[0].count, 1)
> +
> + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
> + self.assertEqual(len(in_ports), 1)
> + self.assertEqual(in_ports[0].packets, 2)
> + self.assertEqual(in_ports[0].bytes, 126)
> + self.assertEqual(in_ports[0].count, 1)
> +
> + # Test third flow but with the same in_port(1) as the first flow.
> + flow_db.begin()
> + flow_db.flow_line_add(lines[3])
> + sum_values = flow_db.field_values_in_order("all", 1)
> + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> + self.assertEqual(len(in_ports), 1)
> + self.assertEqual(in_ports[0].packets, 3)
> + self.assertEqual(in_ports[0].bytes, 360)
> + self.assertEqual(in_ports[0].count, 2)
> +
> + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
> + self.assertEqual(len(in_ports), 1)
> + self.assertEqual(in_ports[0].packets, 2)
> + self.assertEqual(in_ports[0].bytes, 126)
> + self.assertEqual(in_ports[0].count, 1)
> +
> + # Third flow has changes.
> + flow_db.begin()
> + flow_db.flow_line_add(lines[4])
> + sum_values = flow_db.field_values_in_order("all", 1)
> + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> + self.assertEqual(len(in_ports), 1)
> + self.assertEqual(in_ports[0].packets, 4)
> + self.assertEqual(in_ports[0].bytes, 480)
> + self.assertEqual(in_ports[0].count, 2)
> +
> + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
> + self.assertEqual(len(in_ports), 1)
> + self.assertEqual(in_ports[0].packets, 2)
> + self.assertEqual(in_ports[0].bytes, 126)
> + self.assertEqual(in_ports[0].count, 1)
> +
> + # First flow reset.
> + flow_db.begin()
> + flow_db.flow_line_add(lines[5])
> + sum_values = flow_db.field_values_in_order("all", 1)
> + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> + self.assertEqual(len(in_ports), 1)
> + self.assertEqual(in_ports[0].packets, 3)
> + self.assertEqual(in_ports[0].bytes, 360)
> + self.assertEqual(in_ports[0].count, 2)
> +
> + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
> + self.assertEqual(len(in_ports), 1)
> + self.assertEqual(in_ports[0].packets, 2)
> + self.assertEqual(in_ports[0].bytes, 126)
> + self.assertEqual(in_ports[0].count, 1)
> +
> + def test_parse_character_errors(self):
> + """ test_parsing errors.
> + The flow parses is purposely loose. Its not designed to validate
> + input. Merely pull out what it can but there are situations
> + that a parse error can be detected.
> + """
> +
> + lines = ["complete garbage",
> + "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
> + "dst=33:33:00:00:00:66),"
> + "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
> + "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
> + "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
> + "packets:2,bytes:5026,actions:1"]
> +
> + flow_db = FlowDB(False)
> + flow_db.begin()
> + for line in lines:
> + try:
> + flow_db.flow_line_add(line)
> + except ValueError:
> + # We want an exception. That is how we know we have
> + # correctly found a simple parsing error. We are not
> + # looking to validate flow output just catch simple issues.
> + continue
> + self.assertTrue(False)
> +
> + def test_tunnel_parsing(self):
> + """ test_tunnel_parsing test parse flows with tunnel. """
> + lines = [
> + "tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
> + "tos=0x0,ttl=64,flags(key)),in_port(1),"
> + "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
> + "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
> + "actions:userspace(pid=4294962691,slow_path(cfm))"
> + ]
> + flow_db = FlowDB(False)
> + flow_db.begin()
> + flow_db.flow_line_add(lines[0])
> + sum_values = flow_db.field_values_in_order("all", 1)
> + in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> + self.assertEqual(len(in_ports), 1)
> + self.assertEqual(in_ports[0].packets, 6)
> + self.assertEqual(in_ports[0].bytes, 534)
> + self.assertEqual(in_ports[0].count, 1)
> +
> + def test_flow_multiple_paren(self):
> + """ test_flow_multiple_paren. """
> + line = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)"
> + valid = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
> + "in_port(2)"]
> + rc = flow_line_iter(line)
> + self.assertEqual(valid, rc)
> +
> + def test_to_network(self):
> + """ test_to_network test ipv4_to_network and ipv6_to_network. """
> + ipv4s = [
> + ("192.168.0.1", "192.168.0.1"),
> + ("192.168.0.1/255.255.255.255", "192.168.0.1"),
> + ("192.168.0.1/255.255.255.0", "192.168.0.0"),
> + ("192.168.0.1/255.255.0.0", "192.168.0.0"),
> + ("192.168.0.1/255.0.0.0", "192.0.0.0"),
> + ("192.168.0.1/0.0.0.0", "0.0.0.0"),
> + ("10.24.106.230/255.255.255.255", "10.24.106.230"),
> + ("10.24.106.230/255.255.255.0", "10.24.106.0"),
> + ("10.24.106.0/255.255.255.0", "10.24.106.0"),
> + ("10.24.106.0/255.255.252.0", "10.24.104.0")
> + ]
> +
> + ipv6s = [
> + ("1::192:168:0:1", "1::192:168:0:1"),
> + ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
> + ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
> + ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
> + ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
> + ("1::192:168:0:1/1::0:0:0:0", "1::"),
> + ("1::192:168:0:1/::", "::")
> + ]
> +
> + for (ipv4_test, ipv4_check) in ipv4s:
> + self.assertEqual(ipv4_to_network(ipv4_test), ipv4_check)
> +
> + for (ipv6_test, ipv6_check) in ipv6s:
> + self.assertEqual(ipv6_to_network(ipv6_test), ipv6_check)
> diff --git a/xenserver/openvswitch-xen.spec.in b/xenserver/openvswitch-xen.spec.in
> index 4d3b8fa..18fc77b 100644
> --- a/xenserver/openvswitch-xen.spec.in
> +++ b/xenserver/openvswitch-xen.spec.in
> @@ -430,6 +430,7 @@ exit 0
> /usr/bin/ovs-parse-backtrace
> /usr/bin/ovs-pcap
> /usr/bin/ovs-tcpundump
> +/usr/bin/ovs-dpctl-top
> /usr/bin/ovs-vlan-test
> /usr/bin/ovs-vsctl
> /usr/bin/ovsdb-client
> @@ -443,6 +444,7 @@ exit 0
> /usr/share/man/man8/ovs-bugtool.8.gz
> /usr/share/man/man8/ovs-ctl.8.gz
> /usr/share/man/man8/ovs-dpctl.8.gz
> +/usr/share/man/man8/ovs-dpctl-top.8.gz
> /usr/share/man/man8/ovs-ofctl.8.gz
> /usr/share/man/man8/ovs-parse-backtrace.8.gz
> /usr/share/man/man1/ovs-pcap.1.gz
> --
> 1.7.10.4
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
_______________________________________________
dev mailing list
dev at openvswitch.org
http://openvswitch.org/mailman/listinfo/dev
More information about the dev
mailing list