[ovs-dev] [PATCH, v5, 1/1] utilities: a top like tool for ovs-dpctl dump-flows

Mark Hamilton mhamilton at vmware.com
Fri Sep 13 18:00:19 UTC 2013


Gurucharan,
Regarding argparse, how is this handled today for the various scripts that use argparse? ovs-test, for example, simply does "import argparse". So I'm unclear on what I need to do for this ovs-dpctl-top script beyond having the user add the compat folder to their PYTHONPATH.

You say it looks like my usage of python-argparse is not compatible.
I tried:

ssh root@10.24.122.93
export PYTHONPATH=/usr/share/openvswitch/python
python ovs-dpctl-top.in --accumulate

This works. Are you seeing a specific error when you run ovs-dpctl-top?
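
A minimal sketch for narrowing this down, assuming the difference comes from which copy of argparse gets imported (the compat copy versus a system one). The names argparse_origin and build_parser are hypothetical and not part of the patch. The patch passes version= to the ArgumentParser constructor; the sketch registers the version flag with add_argument instead, which the standard-library argparse accepts. Whether that keyword is the actual incompatibility on XenServer is not established in this thread.

```python
import argparse


def argparse_origin():
    """Return the path of the argparse module that was actually imported."""
    return getattr(argparse, "__file__", "<unknown>")


def build_parser(version_string):
    """Build a small parser without the version= constructor keyword."""
    parser = argparse.ArgumentParser(description="argparse compatibility check")
    # Register the version flag explicitly instead of ArgumentParser(version=...).
    parser.add_argument("--version", action="version", version=version_string)
    parser.add_argument("-a", "--accumulate", dest="accumulate",
                        action="store_true", default=False)
    return parser


if __name__ == "__main__":
    # Shows which argparse copy PYTHONPATH resolved to, then parses one flag.
    print("argparse loaded from: %s" % argparse_origin())
    print(build_parser("0.0-test").parse_args(["--accumulate"]))
```

Running this once with and once without the compat folder on PYTHONPATH shows which module each environment picks up.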


By "maintain order", you mean alphabetically?
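
If alphabetical order is what is meant, the position for the new entry in debian/openvswitch-switch.install can be checked with a short sketch like the one below. It assumes plain byte-wise string ordering; the list holds only the entries visible in the quoted hunk, and insert_sorted is a hypothetical helper name.

```python
import bisect

# Entries visible in the quoted hunk of debian/openvswitch-switch.install,
# without the newly added line.
INSTALLED = [
    "usr/bin/ovs-dpctl",
    "usr/bin/ovs-pcap",
    "usr/bin/ovs-tcpundump",
    "usr/bin/ovs-vlan-test",
    "usr/bin/ovs-vsctl",
    "usr/bin/ovsdb-tool",
]


def insert_sorted(entries, new_entry):
    """Return a copy of entries with new_entry inserted in sorted position."""
    result = list(entries)
    bisect.insort(result, new_entry)
    return result


if __name__ == "__main__":
    for path in insert_sorted(INSTALLED, "usr/bin/ovs-dpctl-top"):
        print(path)
```

Under that ordering the new entry lands directly after usr/bin/ovs-dpctl rather than after usr/bin/ovs-tcpundump.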


-Mark

----- Original Message -----
From: "Gurucharan Shetty" <shettyg at nicira.com>
To: "Mark Hamilton" <mhamilton at nicira.com>
Cc: "dev" <dev at openvswitch.org>
Sent: Friday, September 13, 2013 10:19:16 AM
Subject: Re: [ovs-dev] [PATCH, v5, 1/1] utilities: a top like tool for ovs-dpctl dump-flows

On Fri, Sep 13, 2013 at 8:55 AM, Mark Hamilton <mhamilton at nicira.com> wrote:
> This python script summarizes ovs-dpctl dump-flows content by aggregating
> the number of packets, total bytes and occurrence of the following fields:
>   - Openflow in_port
>   - Ethernet type
>   - Source and destination MAC addresses
>   - IP protocol
>   - Source and destination IPv4 addresses
>   - Source and destination IPv6 addresses
>   - UDP and TCP destination port
>   - Tunnel source and destination addresses
>
> Testing included confirming both mega-flows and non-megaflows are
> properly parsed. Bit masks are applied in the case of mega-flows
> prior to aggregation.  Test --script parameter which runs in
> non-interactive mode. Tested syntax against python 2.4.3, 2.6 and 2.7.
> Confirmed script passes pep8 and pylint run as:
>
> pylint --disable=I0011 --include-id=y --reports=n
>
> This tool has been added to these distributions:
>   - add ovs-dpctl-top to debian distribution
>   - add ovs-dpctl-top to rpm distribution.
>   - add ovs-dpctl-top to XenServer RPM.
I cannot get it to work on XenServer right now. We do include
python-argparse in the OVS compat folder, but it looks like your usage
of it is not compatible with the version we have there.


>
> Signed-off-by: Mark Hamilton <mhamilton at nicira.com>
>
> ---
> Version 5
>   - Changed Openflow in_port to the correct term Datapath in_port
>   - Changed decay timer thread to a simpler form which works better
>     across the various supported python versions.
>
> Version 4
>   - Improve parser to handle nested parentheses.
>   - Change code to be python2.4 compliant.
>   - Remove netaddr module requirement.
>   - Instead use core socket module.
>   - Fix decay to remove flows and propagate new stats.
>
> Version 3
>   - Removed trailing whitespaces and white space changes that prevented
>     version 2 patch from applying cleanly.
>
> Version 2
>   - Single character commands have been added to control how content is
>     displayed.
>   - An accumulate mode has been added which continuously collects and
>     aggregates dump-flow content.
>   - Instead of running through all accumulated flows which takes a long time
>     when the flow count gets in the 10,000's, changes are calculated and used to
>     update statistics.
>   - The total number of bytes and packet average are displayed.
>   - Given that a significant amount of space is dedicated to statistics a
>     considerable amount of work went into managing the output.
>
>  debian/control                     |    2 +-
>  debian/openvswitch-switch.install  |    1 +
>  debian/openvswitch-switch.manpages |    1 +
>  manpages.mk                        |    4 +
>  rhel/openvswitch.spec.in           |    2 +
>  utilities/automake.mk              |    6 +
>  utilities/ovs-dpctl-top.8.in       |  140 +++
>  utilities/ovs-dpctl-top.in         | 1691 ++++++++++++++++++++++++++++++++++++
>  xenserver/openvswitch-xen.spec.in  |    2 +
>  9 files changed, 1848 insertions(+), 1 deletion(-)
>  create mode 100644 utilities/ovs-dpctl-top.8.in
>  create mode 100755 utilities/ovs-dpctl-top.in
>
> diff --git a/debian/control b/debian/control
> index fe58b31..46b5630 100644
> --- a/debian/control
> +++ b/debian/control
> @@ -66,7 +66,7 @@ Description: Open vSwitch common components
>  Package: openvswitch-switch
>  Architecture: linux-any
>  Suggests: openvswitch-datapath-module
> -Depends: ${shlibs:Depends}, ${misc:Depends}, ${python:Depends}, openvswitch-common (= ${binary:Version}), module-init-tools, procps, uuid-runtime, netbase
> +Depends: ${shlibs:Depends}, ${misc:Depends}, ${python:Depends}, openvswitch-common (= ${binary:Version}), module-init-tools, procps, uuid-runtime, netbase, python-argparse
>  Description: Open vSwitch switch implementations
>   Open vSwitch is a production quality, multilayer, software-based,
>   Ethernet virtual switch. It is designed to enable massive network
> diff --git a/debian/openvswitch-switch.install b/debian/openvswitch-switch.install
> index 4d7a15b..ab111be 100644
> --- a/debian/openvswitch-switch.install
> +++ b/debian/openvswitch-switch.install
> @@ -1,6 +1,7 @@
>  usr/bin/ovs-dpctl
>  usr/bin/ovs-pcap
>  usr/bin/ovs-tcpundump
> +usr/bin/ovs-dpctl-top
Can we maintain the order here?

>  usr/bin/ovs-vlan-test
>  usr/bin/ovs-vsctl
>  usr/bin/ovsdb-tool
> diff --git a/debian/openvswitch-switch.manpages b/debian/openvswitch-switch.manpages
> index a0a331c..3a9f8b7 100644
> --- a/debian/openvswitch-switch.manpages
> +++ b/debian/openvswitch-switch.manpages
> @@ -3,6 +3,7 @@ _debian/utilities/ovs-dpctl.8
>  _debian/utilities/ovs-pcap.1
>  _debian/utilities/ovs-tcpundump.1
>  _debian/utilities/ovs-vlan-test.8
> +_debian/utilities/ovs-dpctl-top.8
>  _debian/utilities/ovs-vsctl.8
>  _debian/vswitchd/ovs-vswitchd.8
>  vswitchd/ovs-vswitchd.conf.db.5
> diff --git a/manpages.mk b/manpages.mk
> index 263f2ea..2a34f04 100644
> --- a/manpages.mk
> +++ b/manpages.mk
> @@ -116,6 +116,10 @@ lib/vconn-active.man:
>  lib/vconn-passive.man:
>  lib/vlog.man:
>
> +utilities/ovs-dpctl-top.8: \
> +       utilities/ovs-dpctl-top.8.in
> +utilities/ovs-dpctl-top.8.in:
> +
>  utilities/ovs-dpctl.8: \
>         utilities/ovs-dpctl.8.in \
>         lib/common.man \
> diff --git a/rhel/openvswitch.spec.in b/rhel/openvswitch.spec.in
> index 0fd5200..2c0570e 100644
> --- a/rhel/openvswitch.spec.in
> +++ b/rhel/openvswitch.spec.in
> @@ -119,6 +119,7 @@ exit 0
>  /usr/bin/ovs-pcap
>  /usr/bin/ovs-pki
>  /usr/bin/ovs-tcpundump
> +/usr/bin/ovs-dpctl-top
>  /usr/bin/ovs-vlan-test
>  /usr/bin/ovs-vsctl
>  /usr/bin/ovsdb-client
> @@ -137,6 +138,7 @@ exit 0
>  /usr/share/man/man8/ovs-bugtool.8.gz
>  /usr/share/man/man8/ovs-ctl.8.gz
>  /usr/share/man/man8/ovs-dpctl.8.gz
> +/usr/share/man/man8/ovs-dpctl-top.8.gz
>  /usr/share/man/man8/ovs-ofctl.8.gz
>  /usr/share/man/man8/ovs-parse-backtrace.8.gz
>  /usr/share/man/man8/ovs-pki.8.gz
> diff --git a/utilities/automake.mk b/utilities/automake.mk
> index 9f2bb63..6f88d97 100644
> --- a/utilities/automake.mk
> +++ b/utilities/automake.mk
> @@ -10,6 +10,7 @@ bin_SCRIPTS += \
>         utilities/ovs-l3ping \
>         utilities/ovs-parse-backtrace \
>         utilities/ovs-pcap \
> +       utilities/ovs-dpctl-top \
>         utilities/ovs-tcpundump \
>         utilities/ovs-test \
>         utilities/ovs-vlan-test
> @@ -30,6 +31,7 @@ EXTRA_DIST += \
>         utilities/ovs-pcap.in \
>         utilities/ovs-pki.in \
>         utilities/ovs-save \
> +       utilities/ovs-dpctl-top.in \
>         utilities/ovs-tcpundump.in \
>         utilities/ovs-test.in \
>         utilities/ovs-vlan-test.in
> @@ -44,6 +46,7 @@ MAN_ROOTS += \
>         utilities/ovs-parse-backtrace.8 \
>         utilities/ovs-pcap.1.in \
>         utilities/ovs-pki.8.in \
> +       utilities/ovs-dpctl-top.8.in \
>         utilities/ovs-tcpundump.1.in \
>         utilities/ovs-vlan-bug-workaround.8.in \
>         utilities/ovs-test.8.in \
> @@ -66,6 +69,8 @@ DISTCLEANFILES += \
>         utilities/ovs-pcap.1 \
>         utilities/ovs-pki \
>         utilities/ovs-pki.8 \
> +       utilities/ovs-dpctl-top \
> +       utilities/ovs-dpctl-top.8 \
>         utilities/ovs-tcpundump \
>         utilities/ovs-tcpundump.1 \
>         utilities/ovs-test \
> @@ -85,6 +90,7 @@ man_MANS += \
>         utilities/ovs-parse-backtrace.8 \
>         utilities/ovs-pcap.1 \
>         utilities/ovs-pki.8 \
> +       utilities/ovs-dpctl-top.8 \
>         utilities/ovs-tcpundump.1 \
>         utilities/ovs-vlan-bug-workaround.8 \
>         utilities/ovs-test.8 \
> diff --git a/utilities/ovs-dpctl-top.8.in b/utilities/ovs-dpctl-top.8.in
> new file mode 100644
> index 0000000..92facb2
> --- /dev/null
> +++ b/utilities/ovs-dpctl-top.8.in
> @@ -0,0 +1,140 @@
> +.de IQ
> +.  br
> +.  ns
> +.  IP "\\$1"
> +..
> +.TH ovs\-dpctl\-top "8" "@VERSION@" "Open vSwitch" "Open vSwitch Manual"
> +.
> +.SH NAME
> +\fBovs\-dpctl\-top\fR \- Top like behavior for ovs\-dpctl dump\-flows
> +.
> +.SH SYNOPSIS
> +\fBovs\-dpctl\-top\fR [\-h] [\-v] [\-f FLOWFILES] [\-V] [\-s] [\-\-host HOST]
> +[\-a | \-\-accumulate] [\-\-accumulate\-decay ACCUMULATEDECAY] [\-d DELAY]
> +.
> +.SH DESCRIPTION
> +.PP
> +This program summarizes \fBovs\-dpctl\fR flow content by aggregating the number
> +of packets, total bytes and occurrence of the following fields:
> +.IP
> +\- Openflow in_port
> +.IP
> +\- Ethernet type
> +.IP
> +\- Source and destination MAC addresses
> +.IP
> +\- IP protocol
> +.IP
> +\- Source and destination IPv4 addresses
> +.IP
> +\- Source and destination IPv6 addresses
> +.IP
> +\- UDP and TCP destination port
> +.IP
> +\- Tunnel source and destination addresses
> +.
> +.SS "Output shows four values:"
> +.IP
> +\- FIELDS: the flow fields for example in_port(1).
> +.IP
> +\- COUNT: the number of lines in the dump\-flow output contain the flow field.
> +.IP
> +\- PACKETS: the total number of packets containing the flow field.
> +.IP
> +\- BYTES: the total number of bytes containing the flow field.  If units are
> +not present then values are in bytes.
> +.IP
> +\- AVERAGE: the average packets size (BYTES/PACKET).
> +.PP
> +.SS "Top Behavior"
> +.PP
> +While in top mode, the default behavior, the following single character commands
> +are supported:
> +.IP
> +a \- toggles top in accumulate and live mode.  Accumulate mode is described
> +below.
> +.IP
> +s \- toggles which column is used to sort content in decreasing order.  A
> +DESC title is placed over the column.
> +.IP
> +_ \- a space indicating to collect dump\-flow content again
> +.IP
> +h \- halt output.  Any character will restart sampling
> +.IP
> +f \- cycle through flow fields
> +.IP
> +q \- q for quit.
> +.PP
> +.SS "Accumulate Mode"
> +.PP
> +There are two supported modes: live and accumulate.  The default is live.
> +The parameter \fB\-\-accumulate\fR  or the 'a' character in top mode enables the
> +latter.  In live mode, recent dump\-flow content is presented.
> +Where as accumulate mode keeps track of the prior historical
> +information until the flow is reset not when the flow is purged.  Reset
> +flows are determined when the packet count for a flow has decreased from
> +its previous sample.  There is one caveat, eventually the system will
> +run out of memory if, after the accumulate\-decay period any flows that
> +have not been refreshed are purged.  The goal here is to free memory
> +of flows that are not active.  Statistics are not decremented.  Their purpose
> +is to reflect the overall history of the flow fields.
> +.PP
> +.SS "Debugging Errors"
> +.PP
> +Parsing errors are counted and displayed in the status line at the beginning
> +of the output.  Use the \fB\-\-verbose\fR option with \fB\-\-script\fR to see
> +what output was not parsed, like this:
> +.PP
> +$ ovs\-dpctl dump\-flows | ovs\-dpctl\-top \fB\-\-script\fR \fB\-\-verbose\fR
> +.PP
> +Error messages will identify content that failed to parse.
> +.PP
> +.SS "Access Remote Hosts"
> +.PP
> +The \fB\-\-host\fR must follow the format user@hostname.  This script simply
> +calls \&'ssh user@Hostname' without checking for login credentials therefore
> +public keys should be installed on the system identified by hostname, such as:
> +.PP
> +$ ssh\-copy\-id user@hostname
> +.PP
> +Consult ssh\-copy\-id man pages for more details.
> +.PP
> +.SS "Expected usage"
> +.PP
> +$ ovs\-dpctl\-top
> +.PP
> +or to run as a script:
> +.PP
> +$ ovs\-dpctl dump\-flows > dump\-flows.log
> +.PP
> +$ ovs\-dpctl\-top \fB\-\-script\fR \fB\-\-flow\-file\fR dump\-flows.log
> +.SS "OPTIONS"
> +.TP
> +\fB\-h\fR, \fB\-\-help\fR
> +show this help message and exit.
> +.TP
> +\fB\-v\fR, \fB\-\-version\fR
> +show program's version number and exit.
> +.TP
> +\fB\-f\fR FLOWFILES, \fB\-\-flow\-file\fR FLOWFILES
> +file containing flows from ovs\-dpctl dump\-flow.
> +.TP
> +\fB\-V\fR, \fB\-\-verbose\fR
> +enable debug level verbosity.
> +.TP
> +\fB\-s\fR, \fB\-\-script\fR
> +Run from a script (no user interface).
> +.TP
> +\fB\-\-host\fR HOST
> +Specify a user@host for retrieving flows see Accessing
> +Remote Hosts for more information.
> +.TP
> +\fB\-a\fR, \fB\-\-accumulate\fR
> +Accumulate dump\-flow content.
> +.TP
> +\fB\-\-accumulate\-decay\fR ACCUMULATEDECAY
> +Decay old accumulated flows.  The default is 5 minutes. A value of 0 disables
> +decay.
> +.TP
> +\fB\-d\fR DELAY, \fB\-\-delay\fR DELAY
> +Delay in milliseconds to collect dump\-flow content (sample rate).
> diff --git a/utilities/ovs-dpctl-top.in b/utilities/ovs-dpctl-top.in
> new file mode 100755
> index 0000000..ddcc2e4
> --- /dev/null
> +++ b/utilities/ovs-dpctl-top.in
> @@ -0,0 +1,1691 @@
> +#! @PYTHON@
> +#
> +# Copyright (c) 2013 Nicira, Inc.
> +#
> +# Licensed under the Apache License, Version 2.0 (the "License");
> +# you may not use this file except in compliance with the License.
> +# You may obtain a copy of the License at:
> +#
> +#     http://www.apache.org/licenses/LICENSE-2.0
> +#
> +# Unless required by applicable law or agreed to in writing, software
> +# distributed under the License is distributed on an "AS IS" BASIS,
> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +# See the License for the specific language governing permissions and
> +# limitations under the License.
> +#
> +#
> +# The approximate_size code was copied from
> +# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
> +# which is licensed under # "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
> +# used under a Creative Commons Attribution-Share-Alike license:
> +# http://creativecommons.org/licenses/by-sa/3.0/
> +#
> +#
> +
> +"""Top like behavior for ovs-dpctl dump-flows output.
> +
> +This program summarizes ovs-dpctl flow content by aggregating the number
> +of packets, total bytes and occurrence of the following fields:
> +
> +  - Datapath in_port
> +
> +  - Ethernet type
> +
> +  - Source and destination MAC addresses
> +
> +  - IP protocol
> +
> +  - Source and destination IPv4 addresses
> +
> +  - Source and destination IPv6 addresses
> +
> +  - UDP and TCP destination port
> +
> +  - Tunnel source and destination addresses
> +
> +
> +Output shows four values:
> +  - FIELDS: the flow fields for example in_port(1).
> +
> +  - PACKETS: the total number of packets containing the flow field.
> +
> +  - BYTES: the total number of bytes containing the flow field. If units are
> +  not present then values are in bytes.
> +
> +  - AVERAGE: the average packets size (BYTES/PACKET).
> +
> +  - COUNT: the number of lines in the dump-flow output contain the flow field.
> +
> +Top Behavior
> +
> +While in top mode, the default behavior, the following single character
> +commands are supported:
> +
> +  a - toggles top in accumulate and live mode. Accumulate mode is described
> +  below.
> +
> +  s - toggles which column is used to sort content in decreasing order. A
> +  DESC title is placed over the column.
> +
> +  _ - a space indicating to collect dump-flow content again
> +
> +  h - halt output. Any character will restart sampling
> +
> +  f - cycle through flow fields
> +
> +  q - q for quit.
> +
> +Accumulate Mode
> +
> +There are two supported modes: live and accumulate. The default is live.
> +The parameter --accumulate  or the 'a' character in top mode enables the
> +latter. In live mode, recent dump-flow content is presented.
> +Where as accumulate mode keeps track of the prior historical
> +information until the flow is reset not when the flow is purged. Reset
> +flows are determined when the packet count for a flow has decreased from
> +its previous sample. There is one caveat, eventually the system will
> +run out of memory if, after the accumulate-decay period any flows that
> +have not been refreshed are purged. The goal here is to free memory
> +of flows that are not active. Statistics are not decremented. Their purpose
> +is to reflect the overall history of the flow fields.
> +
> +
> +Debugging Errors
> +
> +Parsing errors are counted and displayed in the status line at the beginning
> +of the output. Use the --verbose option with --script to see what output
> + was not parsed, like this:
> +$ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose
> +
> +Error messages will identify content that failed to parse.
> +
> +
> +Access Remote Hosts
> +
> +The --host must follow the format user@hostname. This script simply calls
> +'ssh user@Hostname' without checking for login credentials therefore public
> +keys should be installed on the system identified by hostname, such as:
> +
> +$ ssh-copy-id user@hostname
> +
> +Consult ssh-copy-id man pages for more details.
> +
> +
> +Expected usage
> +
> +$ ovs-dpctl-top
> +
> +or to run as a script:
> +$ ovs-dpctl dump-flows > dump-flows.log
> +$ ovs-dpctl-top --script --flow-file dump-flows.log
> +
> +"""
> +
> +# pylint: disable-msg=C0103
> +# pylint: disable-msg=C0302
> +# pylint: disable-msg=R0902
> +# pylint: disable-msg=R0903
> +# pylint: disable-msg=R0904
> +# pylint: disable-msg=R0912
> +# pylint: disable-msg=R0913
> +# pylint: disable-msg=R0914
> +
> +import sys
> +import os
> +import argparse
> +import logging
> +import re
> +import unittest
> +import copy
> +import curses
> +import operator
> +import subprocess
> +#import netaddr
> +import fcntl
> +import struct
> +import termios
> +import datetime
> +import threading
> +import time
> +import socket
> +
> +
> +##
> +# The following two definitions provide the necessary netaddr functionality.
> +# Python netaddr module is not part of the core installation. Packaging
> +# netaddr was involved and seems inappropriate given that only two
> +# methods were used.
> +def ipv4_to_network(ip_str):
> +    """ Calculate the network given a ipv4/mask value.
> +    If a mask is not present simply return ip_str.
> +    """
> +    pack_length = '!HH'
> +    try:
> +        (ip, mask) = ip_str.split("/")
> +    except ValueError:
> +        # just an ip address no mask.
> +        return ip_str
> +
> +    ip_p = socket.inet_pton(socket.AF_INET, ip)
> +    ip_t = struct.unpack(pack_length, ip_p)
> +    mask_t = struct.unpack(pack_length, socket.inet_pton(socket.AF_INET, mask))
> +    network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]
> +
> +    return socket.inet_ntop(socket.AF_INET,
> +                            struct.pack('!HH', network_n[0], network_n[1]))
> +
> +
> +def ipv6_to_network(ip_str):
> +    """ Calculate the network given a ipv6/mask value.
> +    If a mask is not present simply return ip_str.
> +    """
> +    pack_length = '!HHHHHHHH'
> +    try:
> +        (ip, mask) = ip_str.split("/")
> +    except ValueError:
> +        # just an ip address no mask.
> +        return ip_str
> +
> +    ip_p = socket.inet_pton(socket.AF_INET6, ip)
> +    ip_t = struct.unpack(pack_length, ip_p)
> +    mask_t = struct.unpack(pack_length,
> +                           socket.inet_pton(socket.AF_INET6, mask))
> +    network_n = [ii & jj for (ii, jj) in zip(ip_t, mask_t)]
> +
> +    return socket.inet_ntop(socket.AF_INET6,
> +                            struct.pack(pack_length,
> +                                        network_n[0], network_n[1],
> +                                        network_n[2], network_n[3],
> +                                        network_n[4], network_n[5],
> +                                        network_n[6], network_n[7]))
> +
> +
> +##
> +# columns displayed
> +##
> +class Columns:
> +    """ Holds column specific content.
> +    Titles needs to be less than 8 characters.
> +    """
> +    VALUE_WIDTH = 9
> +    FIELDS = "fields"
> +    PACKETS = "packets"
> +    COUNT = "count"
> +    BYTES = "bytes"
> +    AVERAGE = "average"
> +
> +    def __init__(self):
> +        pass
> +
> +    @staticmethod
> +    def assoc_list(obj):
> +        """ Return a associated list. """
> +        return [(Columns.FIELDS, repr(obj)),
> +                (Columns.PACKETS, obj.packets),
> +                (Columns.BYTES, obj.bytes),
> +                (Columns.COUNT, obj.count),
> +                (Columns.AVERAGE, obj.average),
> +                ]
> +
> +
> +def element_eth_get(field_type, element, stats_dict):
> +    """ Extract eth frame src and dst from a dump-flow element."""
> +    fmt = "%s(src=%s,dst=%s)"
> +
> +    element = fmt % (field_type, element["src"], element["dst"])
> +    return SumData(field_type, element, stats_dict["packets"],
> +                   stats_dict["bytes"], element)
> +
> +
> +def element_ipv4_get(field_type, element, stats_dict):
> +    """ Extract src and dst from a dump-flow element."""
> +    fmt = "%s(src=%s,dst=%s)"
> +    element_show = fmt % (field_type, element["src"], element["dst"])
> +
> +    ##
> +    # netaddr implementation.
> +    #element_key = fmt % (field_type,
> +    #                    netaddr.IPNetwork(element["src"]).ipv4().network,
> +    #                    netaddr.IPNetwork(element["dst"]).ipv4().network)
> +
> +    element_key = fmt % (field_type, ipv4_to_network(element["src"]),
> +                         ipv4_to_network(element["dst"]))
> +
> +    return SumData(field_type, element_show, stats_dict["packets"],
> +                       stats_dict["bytes"], element_key)
> +
> +
> +def element_tunnel_get(field_type, element, stats_dict):
> +    """ Extract src and dst from a tunnel."""
> +    return element_ipv4_get(field_type, element, stats_dict)
> +
> +
> +def element_ipv6_get(field_type, element, stats_dict):
> +    """ Extract src and dst from a dump-flow element."""
> +
> +    fmt = "%s(src=%s,dst=%s)"
> +    element_show = fmt % (field_type, element["src"], element["dst"])
> +
> +    ##
> +    # netaddr implementation
> +    #src = str(netaddr.IPNetwork(element["src"]).ipv6().network)
> +    #dst = str(netaddr.IPNetwork(element["dst"]).ipv6().network)
> +    #element_key = fmt % (field_type, src, dst)
> +
> +    element_key = fmt % (field_type, ipv6_to_network(element["src"]),
> +                         ipv6_to_network(element["dst"]))
> +
> +    return SumData(field_type, element_show, stats_dict["packets"],
> +                       stats_dict["bytes"], element_key)
> +
> +
> +def element_dst_port_get(field_type, element, stats_dict):
> +    """ Extract the dst port from a dump-flow element."""
> +    element_key = "%s(dst=%s)" % (field_type, element["dst"])
> +    return SumData(field_type, element_key, stats_dict["packets"],
> +                   stats_dict["bytes"], element_key)
> +
> +
> +def element_passthrough_get(field_type, element, stats_dict):
> +    """ Pass a dump-flow element value through unchanged."""
> +    element_key = "%s(%s)" % (field_type, element)
> +    return SumData(field_type, element_key,
> +                   stats_dict["packets"], stats_dict["bytes"], element_key)
> +
> +
> +# pylint: disable-msg=R0903
> +class OutputFormat:
> +    """ Holds field_type and function to extract element value. """
> +    def __init__(self, field_type, generator):
> +        self.field_type = field_type
> +        self.generator = generator
> +
> +OUTPUT_FORMAT = [
> +    OutputFormat("eth", element_eth_get),
> +    OutputFormat("ipv4", element_ipv4_get),
> +    OutputFormat("ipv6", element_ipv6_get),
> +    OutputFormat("tunnel", element_tunnel_get),
> +    OutputFormat("udp", element_dst_port_get),
> +    OutputFormat("tcp", element_dst_port_get),
> +    OutputFormat("eth_type", element_passthrough_get),
> +    OutputFormat("in_port", element_passthrough_get)
> +    ]
> +
> +
> +ELEMENT_KEY = {
> +    "udp": "udp.dst",
> +    "tcp": "tcp.dst"
> +    }
> +
> +
> +def top_input_get(args):
> +    """ Return subprocess stdout."""
> +    cmd = []
> +    if (args.host):
> +        cmd += ["ssh", args.host]
> +    cmd += ["ovs-dpctl", "dump-flows"]
> +
> +    return subprocess.Popen(cmd, stderr=subprocess.STDOUT,
> +                            stdout=subprocess.PIPE).stdout
> +
> +
> +def args_get():
> +    """ read program parameters handle any necessary validation of input. """
> +
> +    parser = argparse.ArgumentParser(version="@VERSION@",
> +                          formatter_class=argparse.RawDescriptionHelpFormatter,
> +                          description=__doc__)
> +    ##
> +    # None is a special value indicating to read flows from stdin.
> +    # This handles the case
> +    #   ovs-dpctl dump-flows | ovs-dpctl-flows.py
> +    parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
> +                        action="append",
> +                        help="file containing flows from ovs-dpctl dump-flow")
> +    parser.add_argument("-V", "--verbose", dest="verbose",
> +                        default=logging.CRITICAL,
> +                        action="store_const", const=logging.DEBUG,
> +                        help="enable debug level verbosity")
> +    parser.add_argument("-s", "--script", dest="top", action="store_false",
> +                        help="Run from a script (no user interface)")
> +    parser.add_argument("--host", dest="host",
> +                        help="Specify a user@host for retrieving flows see "
> +                             "Accessing Remote Hosts for more information")
> +
> +    parser.add_argument("-a", "--accumulate", dest="accumulate",
> +                        action="store_true", default=False,
> +                        help="Accumulate dump-flow content")
> +    parser.add_argument("--accumulate-decay", dest="accumulateDecay",
> +                        default=5.0 * 60, type=float,
> +                        help="Decay old accumulated flows. "
> +                        "The default is 5 minutes. "
> +                        "A value of 0 disables decay.")
> +    parser.add_argument("-d", "--delay", dest="delay", type=int,
> +                        default=1000,
> +                        help="Delay in milliseconds to collect dump-flow "
> +                             "content (sample rate).")
> +
> +    args = parser.parse_args()
> +
> +    logging.basicConfig(level=args.verbose)
> +
> +    return args
> +
> +###
> +# Code to parse a single line in dump-flow
> +###
> +# key(values)
> +FIELDS_CMPND = re.compile("([\w]+)\((.+)\)")
> +# key:value
> +FIELDS_CMPND_ELEMENT = re.compile("([\w:]+)=([/\.\w:]+)")
> +FIELDS_ELEMENT = re.compile("([\w]+):([-\.\w]+)")
> +
> +
> +def flow_line_iter(line):
> +    """ iterate over flow dump elements.
> +    return tuples of (true, element) or (false, remaining element)
> +    """
> +    # splits by , except for when in a (). Actions element was not
> +    # split properly but we don't need it.
> +    rc = []
> +
> +    element = ""
> +    paren_count = 0
> +
> +    for ch in line:
> +        if (ch == '('):
> +            paren_count += 1
> +        elif (ch == ')'):
> +            paren_count -= 1
> +
> +        if (ch == ' '):
> +            # ignore white space.
> +            continue
> +        elif ((ch == ',') and (paren_count == 0)):
> +            rc.append(element)
> +            element = ""
> +        else:
> +            element += ch
> +
> +    if (paren_count):
> +        raise ValueError(line)
> +    else:
> +        if (len(element) > 0):
> +            rc.append(element)
> +    return rc
> +
> +
> +def flow_line_compound_parse(compound):
> +    """ Parse compound element
> +    for example
> +    src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
> +    which is in
> +    eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)
> +    """
> +    result = {}
> +    for element in flow_line_iter(compound):
> +        match = FIELDS_CMPND_ELEMENT.search(element)
> +        if (match):
> +            key = match.group(1)
> +            value = match.group(2)
> +            result[key] = value
> +
> +        match = FIELDS_CMPND.search(element)
> +        if (match):
> +            key = match.group(1)
> +            value = match.group(2)
> +            result[key] = flow_line_compound_parse(value)
> +            continue
> +
> +    if (len(result.keys()) == 0):
> +        return compound
> +    return result
> +
> +
> +def flow_line_split(line):
> +    """ Convert a flow dump line into ([fields], [stats], actions) tuple.
> +    Where fields and stats are lists.
> +    This function relies on the following ovs-dpctl dump-flow
> +    output characteristics:
> +    1. The dumped flow line consists of a list of frame fields, list of stats
> +       and action.
> +    2. list of frame fields, each stat and action field are delimited by ', '.
> +    3. That all other non stat field are not delimited by ', '.
> +
> +    """
> +
> +    results = re.split(', ', line)
> +
> +    (field, stats, action) = (results[0], results[1:-1], results[-1])
> +
> +    fields = flow_line_iter(field)
> +    return (fields, stats, action)
> +
> +
> +def elements_to_dict(elements):
> +    """ Convert line to a hierarchy of dictionaries. """
> +    result = {}
> +    for element in elements:
> +        match = FIELDS_CMPND.search(element)
> +        if (match):
> +            key = match.group(1)
> +            value = match.group(2)
> +            result[key] = flow_line_compound_parse(value)
> +            continue
> +
> +        match = FIELDS_ELEMENT.search(element)
> +        if (match):
> +            key = match.group(1)
> +            value = match.group(2)
> +            result[key] = value
> +        else:
> +            raise ValueError("can't parse >%s<" % element)
> +    return result
> +
> +
> +# pylint: disable-msg=R0903
> +class SumData(object):
> +    """ Interface that all data going into SumDb must implement.
> +    Holds the flow field and its corresponding count, total packets,
> +    total bytes and calculates average.
> +
> +    __repr__ is used as key into SumData singleton.
> +    __str__ is used as human readable output.
> +    """
> +
> +    def __init__(self, field_type, field, packets, flow_bytes, key):
> +        # Count is the number of lines in the dump-flow log.
> +        self.field_type = field_type
> +        self.field = field
> +        self.count = 1
> +        self.packets = int(packets)
> +        self.bytes = int(flow_bytes)
> +        self.key = key
> +
> +    def decrement(self, decr_packets, decr_bytes, decr_count):
> +        """ Decrement content to calculate delta from previous flow sample."""
> +        self.packets -= decr_packets
> +        self.bytes -= decr_bytes
> +        self.count -= decr_count
> +
> +    def __iadd__(self, other):
> +        """ Add two objects. """
> +
> +        if (self.key != other.key):
> +            raise ValueError("adding two unrelated types")
> +
> +        self.count += other.count
> +        self.packets += other.packets
> +        self.bytes += other.bytes
> +        return self
> +
> +    def __isub__(self, other):
> +        """ Decrement two objects. """
> +
> +        if (self.key != other.key):
> +            raise ValueError("adding two unrelated types")
> +
> +        self.count -= other.count
> +        self.packets -= other.packets
> +        self.bytes -= other.bytes
> +        return self
> +
> +    def __getattr__(self, name):
> +        """ Handle average. """
> +        if (name == "average"):
> +            if (self.packets == 0):
> +                return float(0.0)
> +            else:
> +                return float(self.bytes) / float(self.packets)
> +        raise AttributeError(name)
> +
> +    def __str__(self):
> +        """ Used for debugging. """
> +        return "%s %s %s %s" % (self.field, self.count,
> +                                   self.packets, self.bytes)
> +
> +    def __repr__(self):
> +        """ Used as key in the FlowDB table. """
> +        return self.key
> +
> +
> +def flow_aggregate(fields_dict, stats_dict):
> +    """ Search for content in a line.
> +    Passed the flow part of the dump-flows output plus the current stats,
> +    consisting of packets, bytes, etc.
> +    """
> +    result = []
> +
> +    for output_format in OUTPUT_FORMAT:
> +        field = fields_dict.get(output_format.field_type, None)
> +        if (field):
> +            obj = output_format.generator(output_format.field_type,
> +                                          field, stats_dict)
> +            result.append(obj)
> +
> +    return result
> +
> +
> +def flows_read(ihdl, flow_db):
> +    """ read flow content from ihdl and insert into flow_db. """
> +
> +    done = False
> +    while (not done):
> +        line = ihdl.readline()
> +        if (len(line) == 0):
> +            # end of input
> +            break
> +
> +        try:
> +            flow_db.flow_line_add(line)
> +        except ValueError, arg:
> +            logging.error(arg)
> +
> +    return flow_db
> +
> +
> +def get_terminal_size():
> +    """
> +    return the height and column width of the terminal as (rows, columns)
> +    """
> +    result = None
> +    for fd_io in [0, 1, 2]:
> +        try:
> +            result = struct.unpack('hh',
> +                                   fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
> +                                               '1234'))
> +            # Stop at the first descriptor that reports a size.
> +            break
> +        except IOError:
> +            result = None
> +            continue
> +
> +    if (result is None or result == (0, 0)):
> +        # Maybe we can't get the width. In that case assume (25, 80)
> +        result = (25, 80)
> +
> +    return result
> +
> +##
> +# Content derived from:
> +# http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
> +##
> +SUFFIXES = {1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
> +            1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']}
> +
> +
> +def approximate_size(size, a_kilobyte_is_1024_bytes=True):
> +    """Convert a file size to human-readable form.
> +
> +    Keyword arguments:
> +    size -- file size in bytes
> +    a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024
> +                                    if False, use multiples of 1000
> +
> +    Returns: string
> +
> +    """
> +    size = float(size)
> +    if size < 0:
> +        raise ValueError('number must be non-negative')
> +
> +    if (a_kilobyte_is_1024_bytes):
> +        multiple = 1024
> +    else:
> +        multiple = 1000
> +    for suffix in SUFFIXES[multiple]:
> +        size /= multiple
> +        if size < multiple:
> +            return '{0:.1f} {1}'.format(size, suffix)
> +
> +    raise ValueError('number too large')
> +
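
One note on approximate_size(): it formats with str.format(), which only exists from Python 2.6 on, so it will pass a syntax check on 2.4 but fail when called there. A minimal sketch of the same loop using %-formatting is below. The name approximate_size_compat is made up for illustration and is not part of the patch.

```python
# Sketch only: same algorithm as approximate_size() in the patch, but using
# %-formatting so it does not depend on str.format().
SUFFIXES = {1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
            1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']}


def approximate_size_compat(size, a_kilobyte_is_1024_bytes=True):
    """Hypothetical variant of approximate_size(); the name is illustrative."""
    size = float(size)
    if size < 0:
        raise ValueError('number must be non-negative')

    if a_kilobyte_is_1024_bytes:
        multiple = 1024
    else:
        multiple = 1000
    for suffix in SUFFIXES[multiple]:
        # Divide first, so values below one kilobyte show as a fraction.
        size /= multiple
        if size < multiple:
            return '%.1f %s' % (size, suffix)

    raise ValueError('number too large')


samples = [approximate_size_compat(512),               # "0.5 KiB"
           approximate_size_compat(1024),              # "1.0 KiB"
           approximate_size_compat(1500000, False)]    # "1.5 MB"
```

Because the loop divides before it compares, anything under one kilobyte is reported as a fraction of a KiB rather than in bytes. fmt_bytes only calls this when the raw byte count no longer fits the column, so that case should not show up in the display.
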
> +
> +##
> +# End copied content
> +##
> +class ColMeta:
> +    """ Concepts about columns. """
> +    def __init__(self, sortable, width):
> +        self.sortable = sortable
> +        self.width = width
> +
> +
> +class RowMeta:
> +    """ How to render rows. """
> +    def __init__(self, label, fmt):
> +        self.label = label
> +        self.fmt = fmt
> +
> +
> +def fmt_packet(obj, width):
> +    """ Provide a string for packets that is appropriate for output."""
> +    return str(obj.packets).rjust(width)
> +
> +
> +def fmt_count(obj, width):
> +    """ Provide a string for count that is appropriate for output."""
> +    return str(obj.count).rjust(width)
> +
> +
> +def fmt_avg(obj, width):
> +    """ Provide a string for average that is appropriate for output."""
> +    return str(int(obj.average)).rjust(width)
> +
> +
> +def fmt_field(obj, width):
> +    """ truncate really long flow and insert ellipses to help make it
> +    clear.
> +    """
> +
> +    ellipses = " ... "
> +    value = obj.field
> +    if (len(obj.field) > width):
> +        value = value[:(width - len(ellipses))] + ellipses
> +    return value.ljust(width)
> +
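
To make the truncation rule in fmt_field concrete, here is a small standalone sketch that works on a plain string instead of a SumData object. The helper name truncate_field is hypothetical.

```python
def truncate_field(value, width, ellipses=" ... "):
    """Fit value into exactly width characters, marking any truncation."""
    if len(value) > width:
        # Keep the head of the value and reserve room for the marker.
        value = value[:(width - len(ellipses))] + ellipses
    return value.ljust(width)


short = truncate_field("in_port(4)", 20)
long_ = truncate_field(
    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)", 20)
# long_ is "ipv6(src=fe80:: ... "
```

Both results are exactly 20 characters wide, which keeps the value columns to the right aligned. The sketch, like the patch, assumes width is at least as large as the ellipsis marker.
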
> +
> +def fmt_bytes(obj, width):
> +    """ Provide a string for bytes that is appropriate for output."""
> +    if (len(str(obj.bytes)) <= width):
> +        value = str(obj.bytes)
> +    else:
> +        value = approximate_size(obj.bytes)
> +    return value.rjust(width)
> +
> +
> +def title_center(value, width):
> +    """ Center a column title."""
> +    return value.upper().center(width)
> +
> +
> +def title_rjust(value, width):
> +    """ Right justify a column title. """
> +    return value.upper().rjust(width)
> +
> +
> +def column_picker(order, obj):
> +    """ return the column as specified by order. """
> +    if (order == 1):
> +        return obj.count
> +    elif (order == 2):
> +        return obj.packets
> +    elif (order == 3):
> +        return obj.bytes
> +    elif (order == 4):
> +        return obj.average
> +    else:
> +        raise ValueError("order outside of range %s" % order)
> +
> +
> +class Render:
> +    """ Renders flow data. """
> +    def __init__(self, console_width):
> +        """ Calculate column widths taking into account changes in format."""
> +
> +        self._start_time = datetime.datetime.now()
> +
> +        self._cols = [ColMeta(False, 0),
> +                      ColMeta(True, Columns.VALUE_WIDTH),
> +                      ColMeta(True, Columns.VALUE_WIDTH),
> +                      ColMeta(True, Columns.VALUE_WIDTH),
> +                      ColMeta(True, Columns.VALUE_WIDTH)]
> +        self._console_width = console_width
> +        self.console_width_set(console_width)
> +
> +        # The order in this array dictates the order of the columns.
> +        # The 0 width for the first entry is a placeholder. This is
> +        # dynamically calculated. The first column is special. We need a
> +        # way to indicate which fields are presented.
> +        self._descs = [RowMeta("", title_rjust),
> +                       RowMeta("", title_rjust),
> +                       RowMeta("", title_rjust),
> +                       RowMeta("", title_rjust),
> +                       RowMeta("", title_rjust)]
> +        self._column_sort_select = 0
> +        self.column_select_event()
> +
> +        self._titles = [
> +            RowMeta(Columns.FIELDS, title_center),
> +            RowMeta(Columns.COUNT, title_rjust),
> +            RowMeta(Columns.PACKETS, title_rjust),
> +            RowMeta(Columns.BYTES, title_rjust),
> +            RowMeta(Columns.AVERAGE, title_rjust)
> +        ]
> +
> +        self._datas = [
> +            RowMeta(None, fmt_field),
> +            RowMeta(None, fmt_count),
> +            RowMeta(None, fmt_packet),
> +            RowMeta(None, fmt_bytes),
> +            RowMeta(None, fmt_avg)
> +            ]
> +
> +        ##
> +        # _field_types holds which fields are displayed in the field
> +        # column, with the keyword all implying all fields.
> +        ##
> +        self._field_types = ["all"] + [ii.field_type for ii in OUTPUT_FORMAT]
> +
> +        ##
> +        # The default is to show all field types.
> +        ##
> +        self._field_type_select = -1
> +        self.field_type_toggle()
> +
> +    def _field_type_select_get(self):
> +        """ Return which field type to display. """
> +        return self._field_types[self._field_type_select]
> +
> +    def field_type_toggle(self):
> +        """ toggle which field types to show. """
> +        self._field_type_select += 1
> +        if (self._field_type_select >= len(self._field_types)):
> +            self._field_type_select = 0
> +        value = Columns.FIELDS + " (%s)" % self._field_type_select_get()
> +        self._titles[0].label = value
> +
> +    def column_select_event(self):
> +        """ Handles column select toggle. """
> +
> +        self._descs[self._column_sort_select].label = ""
> +        for _ in range(len(self._cols)):
> +            self._column_sort_select += 1
> +            if (self._column_sort_select >= len(self._cols)):
> +                self._column_sort_select = 0
> +
> +            # Now look for the next sortable column
> +            if (self._cols[self._column_sort_select].sortable):
> +                break
> +        self._descs[self._column_sort_select].label = "DESC"
> +
> +    def console_width_set(self, console_width):
> +        """ Adjust the output given the new console_width. """
> +        self._console_width = console_width
> +
> +        spaces = len(self._cols) - 1
> +        ##
> +        # Calculating column width can be tedious but important. The
> +        # flow field value can be long. The goal here is to dedicate
> +        # fixed column space for packets, bytes, average and counts. Give the
> +        # remaining space to the flow column. When numbers get large
> +        # transition output to output generated by approximate_size which
> +        # limits output to ###.# XiB in other words 9 characters.
> +        ##
> +        # At this point, we know the maximum length values. We may
> +        # truncate the flow column to get everything to fit.
> +        self._cols[0].width = 0
> +        values_max_length = sum([ii.width for ii in self._cols]) + spaces
> +        flow_max_length = console_width - values_max_length
> +        self._cols[0].width = flow_max_length
> +
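
The width arithmetic in console_width_set can be checked with a short worked example. Columns.VALUE_WIDTH is not visible in this part of the patch, so the value 9 below is an assumption taken from the "9 characters" remark in the comment, and flow_column_width is a hypothetical helper name.

```python
VALUE_WIDTH = 9  # assumption: Columns.VALUE_WIDTH is not shown in this hunk


def flow_column_width(console_width, value_columns=4,
                      value_width=VALUE_WIDTH):
    """Width left for the flow column after the fixed-width columns."""
    # One separating space precedes each value column.
    spaces = value_columns
    fixed = value_columns * value_width + spaces
    return console_width - fixed


width_80 = flow_column_width(80)    # 80 - (4 * 9 + 4) = 40
width_132 = flow_column_width(132)  # 132 - 40 = 92
```

Under that assumption, an 80-column terminal leaves 40 characters for the flow field. A console narrower than the fixed columns yields a negative width, which is what Render(0) in flows_top produces until console_width_set is called with the real size.
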
> +    def format(self, flow_db):
> +        """ shows flows based on --script parameter."""
> +
> +        rc = []
> +        ##
> +        # Top output consists of
> +        # Title
> +        # Column title (2 rows)
> +        # data
> +        # statistics and status
> +
> +        ##
> +        # Title
> +        ##
> +        rc.append("Flow Summary".center(self._console_width))
> +
> +        stats = " Total: %(flow_total)s  errors: %(flow_errors)s " % \
> +                  flow_db.flow_stats_get()
> +        accumulate = flow_db.accumulate_get()
> +        if (accumulate):
> +            stats += "Accumulate: on "
> +        else:
> +            stats += "Accumulate: off "
> +
> +        duration = datetime.datetime.now() - self._start_time
> +        stats += "Duration: %s " % str(duration)
> +        rc.append(stats.ljust(self._console_width))
> +
> +        ##
> +        # 2 rows for columns.
> +        ##
> +        # Indicate which column is in descending order.
> +        rc.append(" ".join([ii.fmt(ii.label, col.width)
> +                            for (ii, col) in zip(self._descs, self._cols)]))
> +
> +        rc.append(" ".join([ii.fmt(ii.label, col.width)
> +                         for (ii, col) in zip(self._titles, self._cols)]))
> +
> +        ##
> +        # Data.
> +        ##
> +        for dd in flow_db.field_values_in_order(self._field_type_select_get(),
> +                                                self._column_sort_select):
> +            rc.append(" ".join([ii.fmt(dd, col.width)
> +                                for (ii, col) in zip(self._datas,
> +                                                     self._cols)]))
> +
> +        return rc
> +
> +
> +def curses_screen_begin():
> +    """ begin curses screen control. """
> +    stdscr = curses.initscr()
> +    curses.cbreak()
> +    curses.noecho()
> +    stdscr.keypad(1)
> +    return stdscr
> +
> +
> +def curses_screen_end(stdscr):
> +    """ end curses screen control. """
> +    curses.nocbreak()
> +    stdscr.keypad(0)
> +    curses.echo()
> +    curses.endwin()
> +
> +
> +class FlowDB:
> +    """ Implements live vs accumulate mode.
> +
> +    Flows are stored as key value pairs. The key consists of the content
> +    prior to stat fields. The value portion consists of stats in a dictionary
> +    form.
> +
> +    @ \todo future add filtering here.
> +    """
> +    def __init__(self, accumulate):
> +        self._accumulate = accumulate
> +        self._error_count = 0
> +        # Values are (stats, last update time.)
> +        # The last update time is used for aging.
> +        self._flow_lock = threading.Lock()
> +        # This dictionary holds individual flows.
> +        self._flows = {}
> +        # This dictionary holds aggregate of flow fields.
> +        self._fields = {}
> +
> +    def accumulate_get(self):
> +        """ Return the current accumulate state. """
> +        return self._accumulate
> +
> +    def accumulate_toggle(self):
> +        """ toggle accumulate flow behavior. """
> +        self._accumulate = not self._accumulate
> +
> +    def begin(self):
> +        """ Indicate the beginning of processing flow content.
> +        if accumulate is false clear current set of flows. """
> +
> +        if (not self._accumulate):
> +            self._flow_lock.acquire()
> +            try:
> +                self._flows.clear()
> +            finally:
> +                self._flow_lock.release()
> +            self._fields.clear()
> +
> +    def flow_line_add(self, line):
> +        """ Split a line from ovs-dpctl dump-flows into key and stats.
> +        The order of the content in the flow should be:
> +        - flow content
> +        - stats for the flow
> +        - actions
> +
> +        This method also assumes that the dump flow output does not
> +        change order of fields of the same flow.
> +        """
> +
> +        line = line.rstrip("\n")
> +        (fields, stats, _) = flow_line_split(line)
> +
> +        try:
> +            fields_dict = elements_to_dict(fields)
> +
> +            if (len(fields_dict) == 0):
> +                raise ValueError("flow fields are missing %s" % line)
> +
> +            stats_dict = elements_to_dict(stats)
> +            if (len(stats_dict) == 0):
> +                raise ValueError("statistics are missing %s." % line)
> +
> +            ##
> +            # In accumulate mode, the flow database can reach tens of
> +            # thousands of persistent flows. Iterating over that many
> +            # flows on every sample is too slow. Instead, deltas are sent
> +            # to the flow_db database, which allows incremental changes to
> +            # be done in O(m) time, where m is the current flow list,
> +            # instead of iterating over all flows in O(n) time, where n is
> +            # the entire history of flows.
> +            key = ",".join(fields)
> +
> +            self._flow_lock.acquire()
> +            try:
> +                (stats_old_dict, _) = self._flows.get(key, (None, None))
> +            finally:
> +                self._flow_lock.release()
> +
> +            self.flow_event(fields_dict, stats_old_dict, stats_dict)
> +
> +        except ValueError, arg:
> +            logging.error(arg)
> +            self._error_count += 1
> +            raise
> +
> +        self._flow_lock.acquire()
> +        try:
> +            self._flows[key] = (stats_dict, datetime.datetime.now())
> +        finally:
> +            self._flow_lock.release()
> +
> +    def decay(self, decayTimeInSeconds):
> +        """ Decay content. """
> +        now = datetime.datetime.now()
> +        for (key, value) in self._flows.items():
> +            (stats_dict, updateTime) = value
> +            delta = now - updateTime
> +
> +            if (delta.seconds > decayTimeInSeconds):
> +                self._flow_lock.acquire()
> +                try:
> +                    del self._flows[key]
> +
> +                    fields_dict = elements_to_dict(flow_line_iter(key))
> +                    matches = flow_aggregate(fields_dict, stats_dict)
> +                    for match in matches:
> +                        self.field_dec(match)
> +
> +                finally:
> +                    self._flow_lock.release()
> +
> +    def flow_stats_get(self):
> +        """ Return statistics in a form of a dictionary. """
> +        rc = None
> +        self._flow_lock.acquire()
> +        try:
> +            rc = {"flow_total": len(self._flows),
> +                  "flow_errors": self._error_count}
> +        finally:
> +            self._flow_lock.release()
> +        return rc
> +
> +    def field_types_get(self):
> +        """ Return the set of types stored in the singleton. """
> +        types = set((ii.field_type for ii in self._fields.values()))
> +        return types
> +
> +    def field_add(self, data):
> +        """ Collect dump-flow data to sum number of times item appears. """
> +        current = self._fields.get(repr(data), None)
> +        if (current is None):
> +            current = copy.copy(data)
> +        else:
> +            current += data
> +        self._fields[repr(current)] = current
> +
> +    def field_dec(self, data):
> +        """ Remove a flow's contribution from the aggregated field data. """
> +        current = self._fields.get(repr(data), None)
> +        if (current is None):
> +            raise ValueError("decrementing field missing %s" % repr(data))
> +
> +        current -= data
> +        self._fields[repr(current)] = current
> +        if (current.count == 0):
> +            del self._fields[repr(current)]
> +
> +    def field_values_in_order(self, field_type_select, column_order):
> +        """ Return a list of items in order maximum first. """
> +        values = self._fields.values()
> +        if (field_type_select != "all"):
> +            # If a field type other than "all" then reduce the list.
> +            values = [ii for ii in values
> +                      if (ii.field_type == field_type_select)]
> +        values = [(column_picker(column_order, ii), ii) for ii in values]
> +        values.sort(key=operator.itemgetter(0))
> +        values.reverse()
> +        values = [ii[1] for ii in values]
> +        return values
> +
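
For comparison, the decorate, sort, reverse, undecorate sequence in field_values_in_order can also be written with the key and reverse arguments of sorted(). The sketch below uses a namedtuple called Row as a hypothetical stand-in for SumData. It is not a drop-in replacement, since the patch selects the column by index through column_picker.

```python
import collections
import operator

# Hypothetical stand-in for SumData; only the attributes needed here.
Row = collections.namedtuple("Row", "field count packets bytes")


def rows_in_order(rows, attr):
    """Return rows sorted by the named attribute, largest first."""
    return sorted(rows, key=operator.attrgetter(attr), reverse=True)


rows = [Row("in_port(4)", 1, 2, 92),
        Row("udp(dst=5355)", 3, 9, 600),
        Row("eth_type(0x86dd)", 2, 9, 300)]

by_packets = [r.field for r in rows_in_order(rows, "packets")]
# ['udp(dst=5355)', 'eth_type(0x86dd)', 'in_port(4)']
```

One behavioral difference: with reverse=True, rows that tie keep their original relative order, while sort() followed by reverse() flips ties. If the display order of equal rows matters between refreshes, that is worth deciding on explicitly.
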
> +    def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
> +        """ Receives new flow information. """
> +
> +        # In order to avoid processing every flow at every sample
> +        # period, a change in the flow packet count is used to determine
> +        # the delta in the flow statistics. This delta is applied by
> +        # calling match.decrement prior to self.field_add.
> +
> +        if (stats_old_dict is None):
> +            # This is a new flow
> +            matches = flow_aggregate(fields_dict, stats_new_dict)
> +            for match in matches:
> +                self.field_add(match)
> +        else:
> +            old_packets = int(stats_old_dict.get("packets", 0))
> +            new_packets = int(stats_new_dict.get("packets", 0))
> +            if (old_packets == new_packets):
> +                # ignore. same data.
> +                pass
> +            else:
> +                old_bytes = stats_old_dict.get("bytes", 0)
> +                # old_packets != new_packets
> +                # if old_packets > new_packets then we end up decrementing
> +                # packets and bytes.
> +                matches = flow_aggregate(fields_dict, stats_new_dict)
> +                for match in matches:
> +                    match.decrement(int(old_packets), int(old_bytes), 1)
> +                    self.field_add(match)
> +
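
The delta logic in flow_event is easier to follow with numbers. The sketch below is a simplified model using plain dicts with integer values. The patch itself works on SumData objects and string-valued stats dictionaries, and apply_sample is a hypothetical name.

```python
def apply_sample(aggregate, old_stats, new_stats):
    """Fold one flow sample into a running per-field aggregate.

    All three arguments are plain dicts here; this only models the
    arithmetic, not the SumData and FlowDB classes from the patch.
    """
    if old_stats is None:
        # First time this flow is seen: add everything and count it once.
        aggregate["count"] += 1
        aggregate["packets"] += new_stats["packets"]
        aggregate["bytes"] += new_stats["bytes"]
    elif old_stats["packets"] != new_stats["packets"]:
        # Known flow with new traffic: add only the difference, and
        # leave the count alone so the flow is not counted twice.
        aggregate["packets"] += new_stats["packets"] - old_stats["packets"]
        aggregate["bytes"] += new_stats["bytes"] - old_stats["bytes"]
    return aggregate


totals = {"count": 0, "packets": 0, "bytes": 0}
first = {"packets": 10, "bytes": 1000}
second = {"packets": 15, "bytes": 1500}

apply_sample(totals, None, first)
apply_sample(totals, first, second)
# totals is now {"count": 1, "packets": 15, "bytes": 1500}
```

This matches what match.decrement(old_packets, old_bytes, 1) followed by field_add achieves: the new sample starts at count 1, the decrement brings it to 0, and only the packet and byte differences reach the aggregate.
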
> +
> +class DecayThread(threading.Thread):
> +    """ Periodically call flow database to see if any flows are old. """
> +    def __init__(self, flow_db, interval):
> +        """ Start decay thread. """
> +        threading.Thread.__init__(self)
> +
> +        self._interval = max(1, interval)
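> +        # Use float division; with integers, an interval below 10 would
> +        # give 0 and make run() poll without waiting.
> +        self._min_interval = min(1, interval / 10.0)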
> +        self._flow_db = flow_db
> +        self._event = threading.Event()
> +        self._running = True
> +
> +        self.daemon = True
> +
> +    def run(self):
> +        """ Worker thread which handles decaying accumulated flows. """
> +
> +        while(self._running):
> +            self._event.wait(self._min_interval)
> +            if (self._running):
> +                self._flow_db.decay(self._interval)
> +
> +    def stop(self):
> +        """ Stop thread. """
> +        self._running = False
> +        self._event.set()
> +        ##
> +        # Give the decay thread time to terminate, but not too long.
> +        # This thread is a daemon, so the application will still terminate
> +        # if we time out during the join. This is just a cleaner way to
> +        # release resources.
> +        self.join(2.0)
> +
> +
> +def flow_top_command(stdscr, render, flow_db):
> +    """ Handle input while in top mode. """
> +    ch = stdscr.getch()
> +    ##
> +    # Any character will restart sampling.
> +    if (ch == ord('h')):
> +        # halt output.
> +        ch = stdscr.getch()
> +        while (ch == -1):
> +            ch = stdscr.getch()
> +
> +    if (ch == ord('s')):
> +        # toggle which column sorts data in descending order.
> +        render.column_select_event()
> +    elif (ch == ord('a')):
> +        flow_db.accumulate_toggle()
> +    elif (ch == ord('f')):
> +        render.field_type_toggle()
> +    elif (ch == ord(' ')):
> +        # resample
> +        pass
> +
> +    return ch
> +
> +
> +def decay_timer_start(flow_db, accumulateDecay):
> +    """ If accumulateDecay greater than zero then start timer. """
> +    if (accumulateDecay > 0):
> +        decay_timer = DecayThread(flow_db, accumulateDecay)
> +        decay_timer.start()
> +        return decay_timer
> +    else:
> +        return None
> +
> +
> +def flows_top(args):
> +    """ handles top like behavior when --script is not specified. """
> +
> +    flow_db = FlowDB(args.accumulate)
> +    render = Render(0)
> +
> +    decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
> +    lines = []
> +
> +    try:
> +        stdscr = curses_screen_begin()
> +        try:
> +            ch = 'X'
> +            #stdscr.nodelay(1)
> +            stdscr.timeout(args.delay)
> +
> +            while (ch != ord('q')):
> +                flow_db.begin()
> +
> +                try:
> +                    ihdl = top_input_get(args)
> +                    try:
> +                        flows_read(ihdl, flow_db)
> +                    finally:
> +                        ihdl.close()
> +                except OSError, arg:
> +                    logging.critical(arg)
> +                    break
> +
> +                (console_height, console_width) = stdscr.getmaxyx()
> +                render.console_width_set(console_width)
> +
> +                output_height = console_height - 1
> +                line_count = range(output_height)
> +                line_output = render.format(flow_db)
> +                lines = zip(line_count, line_output[:output_height])
> +
> +                stdscr.erase()
> +                for (count, line) in lines:
> +                    stdscr.addstr(count, 0, line[:console_width])
> +                stdscr.refresh()
> +
> +                ch = flow_top_command(stdscr, render, flow_db)
> +
> +        finally:
> +            curses_screen_end(stdscr)
> +    except KeyboardInterrupt:
> +        pass
> +    if (decay_timer):
> +        decay_timer.stop()
> +
> +    # repeat output
> +    for (count, line) in lines:
> +        print line
> +
> +
> +def flows_script(args):
> +    """ handles --script option. """
> +
> +    flow_db = FlowDB(args.accumulate)
> +    flow_db.begin()
> +
> +    if (args.flowFiles is None):
> +        logging.info("reading flows from stdin")
> +        ihdl = os.fdopen(sys.stdin.fileno(), 'r', 0)
> +        try:
> +            flow_db = flows_read(ihdl, flow_db)
> +        finally:
> +            ihdl.close()
> +    else:
> +        for flowFile in args.flowFiles:
> +            logging.info("reading flows from %s", flowFile)
> +            ihdl = open(flowFile, "r")
> +            try:
> +                flow_db = flows_read(ihdl, flow_db)
> +            finally:
> +                ihdl.close()
> +
> +    (_, console_width) = get_terminal_size()
> +    render = Render(console_width)
> +
> +    for line in render.format(flow_db):
> +        print line
> +
> +
> +def main():
> +    """ Return 0 on success or 1 on failure.
> +
> +    Algorithm
> +    There are four stages to processing ovs-dpctl dump-flows content.
> +    1. Retrieve current input
> +    2. Store in FlowDB and maintain history
> +    3. Iterate over FlowDB, aggregating stats for each flow field
> +    4. Present data.
> +
> +    Retrieving current input is currently trivial: ovs-dpctl dump-flows
> +    is called. Future versions will have more elaborate means for collecting
> +    dump-flows content. FlowDB returns all data in the form of a hierarchical
> +    dictionary. Input will vary.
> +
> +    In the case of accumulate mode, flows are not purged from the FlowDB
> +    manager. Instead, at the very least, the latest statistics are
> +    kept. In the case of live output, the FlowDB is purged prior to sampling
> +    data.
> +
> +    Aggregating results requires identifying the flow fields to aggregate
> +    out of the flow and summing stats.
> +
> +    """
> +    args = args_get()
> +
> +    try:
> +        if (args.top):
> +            flows_top(args)
> +        else:
> +            flows_script(args)
> +    except KeyboardInterrupt:
> +        return 1
> +    return 0
> +
> +if __name__ == '__main__':
> +    sys.exit(main())
> +elif __name__ == 'ovs-dpctl-top':
> +    # pylint: disable-msg=R0915
> +
> +    ##
> +    # Test case beyond this point.
> +    # pylint: disable-msg=R0904
> +    class TestsuiteFlowParse(unittest.TestCase):
> +        """
> +        parse flow into hierarchy of dictionaries.
> +        """
> +        def test_flow_parse(self):
> +            """ test_flow_parse. """
> +            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
> +                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
> +                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
> +                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
> +                   "udp(src=61252,dst=5355), packets:1, bytes:92, "\
> +                   "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
> +                   "38,41,44,47,50,53,56,59,62,65"
> +
> +            (fields, stats, _) = flow_line_split(line)
> +            flow_dict = elements_to_dict(fields + stats)
> +            self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8")
> +            self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03")
> +            self.assertEqual(flow_dict["ipv6"]["src"],
> +                             "fe80::55bf:fe42:bc96:2812")
> +            self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3")
> +            self.assertEqual(flow_dict["packets"], "1")
> +            self.assertEqual(flow_dict["bytes"], "92")
> +
> +            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
> +                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
> +                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
> +                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
> +                   "udp(src=61252,dst=5355), packets:1, bytes:92, "\
> +                   "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
> +                   "38,41,44,47,50,53,56,59,62,65"
> +
> +            (fields, stats, _) = flow_line_split(line)
> +            flow_dict = elements_to_dict(fields + stats)
> +            self.assertEqual(flow_dict["used"], "-0.703s")
> +            self.assertEqual(flow_dict["packets"], "1")
> +            self.assertEqual(flow_dict["bytes"], "92")
> +
> +        def test_flow_sum(self):
> +            """ test_flow_sum. """
> +            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
> +                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
> +                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
> +                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
> +                   "udp(src=61252,dst=5355), packets:2, bytes:92, "\
> +                   "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
> +                   "38,41,44,47,50,53,56,59,62,65"
> +
> +            (fields, stats, _) = flow_line_split(line)
> +            stats_dict = elements_to_dict(stats)
> +            fields_dict = elements_to_dict(fields)
> +            ##
> +            # Test simple case of one line.
> +            flow_db = FlowDB(False)
> +            matches = flow_aggregate(fields_dict, stats_dict)
> +            for match in matches:
> +                flow_db.field_add(match)
> +
> +            flow_types = flow_db.field_types_get()
> +            expected_flow_types = ["eth", "eth_type", "udp", "in_port", "ipv6"]
> +            self.assert_(len(flow_types) == len(expected_flow_types))
> +            for flow_type in flow_types:
> +                self.assertTrue(flow_type in expected_flow_types)
> +
> +            for flow_type in flow_types:
> +                sum_value = flow_db.field_values_in_order("all", 1)
> +                self.assert_(len(sum_value) == 5)
> +                self.assert_(sum_value[0].packets == 2)
> +                self.assert_(sum_value[0].count == 1)
> +                self.assert_(sum_value[0].bytes == 92)
> +
> +            ##
> +            # Add line again just to see counts go up.
> +            matches = flow_aggregate(fields_dict, stats_dict)
> +            for match in matches:
> +                flow_db.field_add(match)
> +
> +            flow_types = flow_db.field_types_get()
> +            self.assert_(len(flow_types) == len(expected_flow_types))
> +            for flow_type in flow_types:
> +                self.assertTrue(flow_type in expected_flow_types)
> +
> +            for flow_type in flow_types:
> +                sum_value = flow_db.field_values_in_order("all", 1)
> +                self.assert_(len(sum_value) == 5)
> +                self.assert_(sum_value[0].packets == 4)
> +                self.assert_(sum_value[0].count == 2)
> +                self.assert_(sum_value[0].bytes == 2 * 92)
> +
> +        def test_assoc_list(self):
> +            """ test_assoc_list. """
> +            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
> +                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
> +                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
> +                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
> +                   "udp(src=61252,dst=5355), packets:2, bytes:92, "\
> +                   "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
> +                   "38,41,44,47,50,53,56,59,62,65"
> +
> +            valid_flows = [
> +                'eth_type(0x86dd)',
> +                'udp(dst=5355)',
> +                'in_port(4)',
> +                'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
> +                'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
> +                ]
> +
> +            (fields, stats, _) = flow_line_split(line)
> +            stats_dict = elements_to_dict(stats)
> +            fields_dict = elements_to_dict(fields)
> +
> +            ##
> +            # Test simple case of one line.
> +            flow_db = FlowDB(False)
> +            matches = flow_aggregate(fields_dict, stats_dict)
> +            for match in matches:
> +                flow_db.field_add(match)
> +
> +            for sum_value in flow_db.field_values_in_order("all", 1):
> +                assoc_list = Columns.assoc_list(sum_value)
> +                for item in assoc_list:
> +                    if item[0] == "fields":
> +                        self.assertTrue(item[1] in valid_flows)
> +                    elif item[0] == "packets":
> +                        self.assertEqual(item[1], 2)
> +                    elif item[0] == "count":
> +                        self.assertEqual(item[1], 1)
> +                    elif item[0] == "average":
> +                        self.assertEqual(item[1], 46.0)
> +                    elif item[0] == "bytes":
> +                        self.assertEqual(item[1], 92)
> +                    else:
> +                        raise ValueError("unknown %s" % item[0])
> +
> +        def test_human_format(self):
> +            """ test_human_format. """
> +
> +            self.assertEqual(approximate_size(0.0), "0.0 KiB")
> +            self.assertEqual(approximate_size(1024), "1.0 KiB")
> +            self.assertEqual(approximate_size(1024 * 1024), "1.0 MiB")
> +            self.assertEqual(approximate_size((1024 * 1024) + 100000),
> +                             "1.1 MiB")
> +            value = (1024 * 1024 * 1024) + 100000000
> +            self.assertEqual(approximate_size(value), "1.1 GiB")
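The assertions above pin down the expected formatting: the smallest unit
is KiB, and one decimal place is shown. A minimal sketch that is
consistent with them follows. It is not the patch's approximate_size();
the name approximate_size_sketch and the TiB ceiling are assumptions made
for illustration only.

```python
def approximate_size_sketch(size):
    """Format a byte count with a binary-unit suffix (KiB and up).

    Hypothetical helper: it only mirrors the behaviour the test expects.
    """
    if size < 0:
        raise ValueError("size must be non-negative")

    suffixes = ("KiB", "MiB", "GiB", "TiB")
    value = float(size)
    for suffix in suffixes:
        # Move up one unit; stop as soon as the value fits below 1024.
        value /= 1024.0
        if value < 1024.0:
            return "%.1f %s" % (value, suffix)
    # Larger than the biggest suffix we know: report it in that suffix.
    return "%.1f %s" % (value, suffixes[-1])


print(approximate_size_sketch((1024 * 1024) + 100000))
```

Starting at KiB rather than bytes is what makes 0.0 render as "0.0 KiB".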
> +
> +        def test_flow_line_split(self):
> +            """ Splitting a flow line is not trivial.
> +            There is no clear delimiter. Comma is used liberally."""
> +            expected_fields = [
> +                "in_port(4)",
> +                "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)",
> +                "eth_type(0x86dd)",
> +                "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
> +                "label=0,proto=17,tclass=0,hlimit=1,frag=no)",
> +                "udp(src=61252,dst=5355)"
> +                ]
> +            expected_stats = ["packets:2", "bytes:92", "used:0.703s"]
> +            expected_actions = "actions:3,8,11,14,17,20,23,26,29,32,35," \
> +                               "38,41,44,47,50,53,56,59,62,65"
> +
> +            line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
> +                   "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
> +                   "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
> +                   "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
> +                   "udp(src=61252,dst=5355), packets:2, bytes:92, "\
> +                   "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
> +                   "38,41,44,47,50,53,56,59,62,65"
> +
> +            (fields, stats, actions) = flow_line_split(line)
> +
> +            self.assertEqual(fields, expected_fields)
> +            self.assertEqual(stats, expected_stats)
> +            self.assertEqual(actions, expected_actions)
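As the docstring says, commas appear both between elements and inside
them, so a plain split(",") is not enough. One way to handle this is to
cut off the actions first and then split only on commas that sit outside
parentheses. The sketch below illustrates that idea under those
assumptions. split_top_level and split_flow_line_sketch are hypothetical
names, not the patch's flow_line_iter() or flow_line_split(), and the
real functions may differ in their error handling.

```python
def split_top_level(text):
    """Split text on commas that are not nested inside parentheses."""
    parts = []
    depth = 0
    start = 0
    for index, char in enumerate(text):
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth < 0:
                raise ValueError("unbalanced ')' at offset %d" % index)
        elif char == "," and depth == 0:
            piece = text[start:index].strip()
            if piece:
                parts.append(piece)
            start = index + 1
    if depth != 0:
        raise ValueError("unbalanced '(' in %r" % text)
    tail = text[start:].strip()
    if tail:
        parts.append(tail)
    return parts


def split_flow_line_sketch(line):
    """Return (fields, stats, actions) for one dump-flows line."""
    # Everything from "actions:" onward is kept as a single string,
    # because the action list uses top-level commas of its own.
    head, marker, tail = line.partition("actions:")
    if not marker:
        raise ValueError("no actions found in %r" % line)
    elements = split_top_level(head)
    # Assumption: match fields look like name(...), stats like name:value.
    fields = [item for item in elements if "(" in item]
    stats = [item for item in elements if "(" not in item]
    return (fields, stats, marker + tail)


example = ("in_port(4),eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03),"
           "udp(src=61252,dst=5355), packets:2, bytes:92, used:0.703s, "
           "actions:3,8,11")
print(split_flow_line_sketch(example))
```

Tracking the nesting depth also keeps nested elements such as
tunnel(...,flags(key)) in one piece.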
> +
> +        def test_accumulate_decay(self):
> +            """ test_accumulate_decay: test accumulated decay. """
> +            lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
> +                     "dst=ff:ff:ff:ff:ff:ff),"
> +                     "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
> +                     "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
> +                     "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
> +                     "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
> +                     "packets:1, bytes:120, used:0.004s, actions:1"]
> +
> +            flow_db = FlowDB(True)
> +            flow_db.begin()
> +            flow_db.flow_line_add(lines[0])
> +
> +            # Make sure we decay
> +            time.sleep(4)
> +            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
> +            flow_db.decay(1)
> +            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
> +
> +            flow_db.flow_line_add(lines[0])
> +            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
> +            flow_db.decay(30)
> +            # Should not be deleted.
> +            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
> +
> +            flow_db.flow_line_add(lines[0])
> +            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
> +            timer = decay_timer_start(flow_db, 2)
> +            time.sleep(10)
> +            self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
> +            timer.stop()
> +
> +        def test_accumulate(self):
> +            """ test_accumulate test that FlowDB supports accumulate. """
> +
> +            lines = [
> +                "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
> +                "in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
> +                "in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
> +                "in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
> +                "in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
> +                "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
> +                ]
> +
> +            # Turn on accumulate.
> +            flow_db = FlowDB(True)
> +            flow_db.begin()
> +
> +            flow_db.flow_line_add(lines[0])
> +
> +            # Test one flow exists.
> +            sum_values = flow_db.field_values_in_order("all", 1)
> +            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> +            self.assertEqual(len(in_ports), 1)
> +            self.assertEqual(in_ports[0].packets, 1)
> +            self.assertEqual(in_ports[0].bytes, 120)
> +            self.assertEqual(in_ports[0].count, 1)
> +
> +            # simulate another sample
> +            # Test two different flows exist.
> +            flow_db.begin()
> +            flow_db.flow_line_add(lines[1])
> +            sum_values = flow_db.field_values_in_order("all", 1)
> +            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> +            self.assertEqual(len(in_ports), 1)
> +            self.assertEqual(in_ports[0].packets, 1)
> +            self.assertEqual(in_ports[0].bytes, 120)
> +            self.assertEqual(in_ports[0].count, 1)
> +
> +            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
> +            self.assertEqual(len(in_ports), 1)
> +            self.assertEqual(in_ports[0].packets, 2)
> +            self.assertEqual(in_ports[0].bytes, 126)
> +            self.assertEqual(in_ports[0].count, 1)
> +
> +            # Test first flow increments packets.
> +            flow_db.begin()
> +            flow_db.flow_line_add(lines[2])
> +            sum_values = flow_db.field_values_in_order("all", 1)
> +            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> +            self.assertEqual(len(in_ports), 1)
> +            self.assertEqual(in_ports[0].packets, 2)
> +            self.assertEqual(in_ports[0].bytes, 240)
> +            self.assertEqual(in_ports[0].count, 1)
> +
> +            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
> +            self.assertEqual(len(in_ports), 1)
> +            self.assertEqual(in_ports[0].packets, 2)
> +            self.assertEqual(in_ports[0].bytes, 126)
> +            self.assertEqual(in_ports[0].count, 1)
> +
> +            # Test third flow but with the same in_port(1) as the first flow.
> +            flow_db.begin()
> +            flow_db.flow_line_add(lines[3])
> +            sum_values = flow_db.field_values_in_order("all", 1)
> +            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> +            self.assertEqual(len(in_ports), 1)
> +            self.assertEqual(in_ports[0].packets, 3)
> +            self.assertEqual(in_ports[0].bytes, 360)
> +            self.assertEqual(in_ports[0].count, 2)
> +
> +            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
> +            self.assertEqual(len(in_ports), 1)
> +            self.assertEqual(in_ports[0].packets, 2)
> +            self.assertEqual(in_ports[0].bytes, 126)
> +            self.assertEqual(in_ports[0].count, 1)
> +
> +            # Third flow has changes.
> +            flow_db.begin()
> +            flow_db.flow_line_add(lines[4])
> +            sum_values = flow_db.field_values_in_order("all", 1)
> +            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> +            self.assertEqual(len(in_ports), 1)
> +            self.assertEqual(in_ports[0].packets, 4)
> +            self.assertEqual(in_ports[0].bytes, 480)
> +            self.assertEqual(in_ports[0].count, 2)
> +
> +            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
> +            self.assertEqual(len(in_ports), 1)
> +            self.assertEqual(in_ports[0].packets, 2)
> +            self.assertEqual(in_ports[0].bytes, 126)
> +            self.assertEqual(in_ports[0].count, 1)
> +
> +            # First flow reset.
> +            flow_db.begin()
> +            flow_db.flow_line_add(lines[5])
> +            sum_values = flow_db.field_values_in_order("all", 1)
> +            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> +            self.assertEqual(len(in_ports), 1)
> +            self.assertEqual(in_ports[0].packets, 3)
> +            self.assertEqual(in_ports[0].bytes, 360)
> +            self.assertEqual(in_ports[0].count, 2)
> +
> +            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
> +            self.assertEqual(len(in_ports), 1)
> +            self.assertEqual(in_ports[0].packets, 2)
> +            self.assertEqual(in_ports[0].bytes, 126)
> +            self.assertEqual(in_ports[0].count, 1)
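The totals asserted in test_accumulate are consistent with a simple
model: remember the most recent counters for every flow seen so far, and
report per-field totals as the sum over those flows. The sketch below
replays the six samples under that model. It is only an illustration:
LatestCounters is a hypothetical class, and FlowDB's actual bookkeeping
is not shown in this part of the patch.

```python
class LatestCounters(object):
    """Hypothetical stand-in for accumulate-mode bookkeeping."""

    def __init__(self):
        # Maps a flow key (tuple of field strings) to (packets, bytes).
        self._latest = {}

    def flow_add(self, fields, packets, byte_count):
        """Record the newest counters for the flow identified by fields."""
        self._latest[tuple(fields)] = (packets, byte_count)

    def field_totals(self, field):
        """Return (packets, bytes, count) over all flows containing field."""
        packets = 0
        byte_count = 0
        count = 0
        for fields, (flow_packets, flow_bytes) in self._latest.items():
            if field in fields:
                packets += flow_packets
                byte_count += flow_bytes
                count += 1
        return (packets, byte_count, count)


# The same six samples the test feeds in, one per flow_db.begin().
samples = [
    (("in_port(1)", "eth_type(0x0806)"), 1, 120),
    (("in_port(2)", "eth_type(0x0806)"), 2, 126),
    (("in_port(1)", "eth_type(0x0806)"), 2, 240),
    (("in_port(1)", "eth_type(0x0800)"), 1, 120),
    (("in_port(1)", "eth_type(0x0800)"), 2, 240),
    (("in_port(1)", "eth_type(0x0806)"), 1, 120),
]

db = LatestCounters()
history = []
for fields, packets, byte_count in samples:
    db.flow_add(fields, packets, byte_count)
    history.append(db.field_totals("in_port(1)"))

for entry in history:
    print(entry)
```

Under this model the last sample, where the first flow's counters drop
from 2 packets to 1, lowers the in_port(1) total from 4 to 3, which is
what the "First flow reset" assertions expect.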
> +
> +        def test_parse_character_errors(self):
> +            """ test_parse_character_errors.
> +            The flow parser is purposely loose. It is not designed to validate
> +            input, merely to pull out what it can, but there are situations
> +            in which a parse error can be detected.
> +            """
> +
> +            lines = ["complete garbage",
> +                     "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
> +                     "dst=33:33:00:00:00:66),"
> +                     "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
> +                     "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
> +                     "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
> +                     "packets:2,bytes:5026,actions:1"]
> +
> +            flow_db = FlowDB(False)
> +            flow_db.begin()
> +            for line in lines:
> +                try:
> +                    flow_db.flow_line_add(line)
> +                except ValueError:
> +                    # We want an exception. That is how we know we have
> +                    # correctly found a simple parsing error. We are not
> +                    # looking to validate flow output just catch simple issues.
> +                    continue
> +                self.fail("expected ValueError for %r" % line)
> +
> +        def test_tunnel_parsing(self):
> +            """ test_tunnel_parsing test parse flows with tunnel. """
> +            lines = [
> +                "tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
> +                "tos=0x0,ttl=64,flags(key)),in_port(1),"
> +                "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
> +                "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
> +                "actions:userspace(pid=4294962691,slow_path(cfm))"
> +                ]
> +            flow_db = FlowDB(False)
> +            flow_db.begin()
> +            flow_db.flow_line_add(lines[0])
> +            sum_values = flow_db.field_values_in_order("all", 1)
> +            in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
> +            self.assertEqual(len(in_ports), 1)
> +            self.assertEqual(in_ports[0].packets, 6)
> +            self.assertEqual(in_ports[0].bytes, 534)
> +            self.assertEqual(in_ports[0].count, 1)
> +
> +        def test_flow_multiple_paren(self):
> +            """ test_flow_multiple_paren. """
> +            line = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)"
> +            valid = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
> +                     "in_port(2)"]
> +            rc = flow_line_iter(line)
> +            self.assertEqual(valid, rc)
> +
> +        def test_to_network(self):
> +            """ test_to_network test ipv4_to_network and ipv6_to_network. """
> +            ipv4s = [
> +                ("192.168.0.1", "192.168.0.1"),
> +                ("192.168.0.1/255.255.255.255", "192.168.0.1"),
> +                ("192.168.0.1/255.255.255.0", "192.168.0.0"),
> +                ("192.168.0.1/255.255.0.0", "192.168.0.0"),
> +                ("192.168.0.1/255.0.0.0", "192.0.0.0"),
> +                ("192.168.0.1/0.0.0.0", "0.0.0.0"),
> +                ("10.24.106.230/255.255.255.255", "10.24.106.230"),
> +                ("10.24.106.230/255.255.255.0", "10.24.106.0"),
> +                ("10.24.106.0/255.255.255.0", "10.24.106.0"),
> +                ("10.24.106.0/255.255.252.0", "10.24.104.0")
> +                ]
> +
> +            ipv6s = [
> +                ("1::192:168:0:1", "1::192:168:0:1"),
> +                ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
> +                ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
> +                ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
> +                ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
> +                ("1::192:168:0:1/1::0:0:0:0", "1::"),
> +                ("1::192:168:0:1/::", "::")
> +                ]
> +
> +            for (ipv4_test, ipv4_check) in ipv4s:
> +                self.assertEqual(ipv4_to_network(ipv4_test), ipv4_check)
> +
> +            for (ipv6_test, ipv6_check) in ipv6s:
> +                self.assertEqual(ipv6_to_network(ipv6_test), ipv6_check)
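The table above amounts to a bitwise AND of the address with its mask,
with a missing mask treated as all ones. A short sketch using the
standard ipaddress module shows the same arithmetic. Note that ipaddress
needs Python 3.3 or later, so this cannot be how the patch does it given
its Python 2.4 to 2.7 target; to_network_sketch is a hypothetical name
used only here.

```python
import ipaddress


def to_network_sketch(value):
    """Apply an optional '/mask' suffix to an IPv4 or IPv6 address string."""
    address_text, _, mask_text = value.partition("/")
    address = ipaddress.ip_address(address_text)
    if not mask_text:
        # No mask given: the address is returned unchanged.
        return str(address)
    mask = ipaddress.ip_address(mask_text)
    if address.version != mask.version:
        raise ValueError("address and mask families differ: %r" % value)
    # AND the integer forms, then rebuild an address of the same family.
    return str(type(address)(int(address) & int(mask)))


print(to_network_sketch("10.24.106.0/255.255.252.0"))
print(to_network_sketch("1::192:168:0:1/1::ffff:ffff:0:0"))
```

Because the mask is written in address form rather than as a prefix
length, non-contiguous masks such as 1::ffff:0:0:0 work without any
special handling.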
> diff --git a/xenserver/openvswitch-xen.spec.in b/xenserver/openvswitch-xen.spec.in
> index 4d3b8fa..18fc77b 100644
> --- a/xenserver/openvswitch-xen.spec.in
> +++ b/xenserver/openvswitch-xen.spec.in
> @@ -430,6 +430,7 @@ exit 0
>  /usr/bin/ovs-parse-backtrace
>  /usr/bin/ovs-pcap
>  /usr/bin/ovs-tcpundump
> +/usr/bin/ovs-dpctl-top
>  /usr/bin/ovs-vlan-test
>  /usr/bin/ovs-vsctl
>  /usr/bin/ovsdb-client
> @@ -443,6 +444,7 @@ exit 0
>  /usr/share/man/man8/ovs-bugtool.8.gz
>  /usr/share/man/man8/ovs-ctl.8.gz
>  /usr/share/man/man8/ovs-dpctl.8.gz
> +/usr/share/man/man8/ovs-dpctl-top.8.gz
>  /usr/share/man/man8/ovs-ofctl.8.gz
>  /usr/share/man/man8/ovs-parse-backtrace.8.gz
>  /usr/share/man/man1/ovs-pcap.1.gz
> --
> 1.7.10.4
>
> _______________________________________________
> dev mailing list
> dev at openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev