[ovs-dev] [PATCH 2/2] netflow: Implement NetFlow active timeouts.

Jesse Gross jesse at nicira.com
Sat Oct 31 00:14:18 UTC 2009


Provides a NetFlow expiration message at regular intervals if the
key netflow.<br>.active-timeout is set.

Feature #1317
---
 secchan/main.c                  |    2 +-
 secchan/netflow.c               |   79 ++++++++++++++++++++++++++----
 secchan/netflow.h               |   25 ++++++++-
 secchan/ofproto.c               |  102 ++++++++++++++++++++++++++-------------
 secchan/ofproto.h               |   10 ++--
 vswitchd/bridge.c               |   10 ++++-
 vswitchd/ovs-vswitchd.conf.5.in |   23 +++++----
 7 files changed, 188 insertions(+), 63 deletions(-)

diff --git a/secchan/main.c b/secchan/main.c
index ee29f27..1527114 100644
--- a/secchan/main.c
+++ b/secchan/main.c
@@ -168,7 +168,7 @@ main(int argc, char *argv[])
         ovs_fatal(error,
                   "failed to configure controller snooping connections");
     }
-    error = ofproto_set_netflow(ofproto, &s.netflow, 0, 0, false);
+    error = ofproto_set_netflow(ofproto, &s.netflow, 0, 0, 0, false);
     if (error) {
         ovs_fatal(error, "failed to configure NetFlow collectors");
     }
diff --git a/secchan/netflow.c b/secchan/netflow.c
index 282fd83..d7c9bb7 100644
--- a/secchan/netflow.c
+++ b/secchan/netflow.c
@@ -100,6 +100,7 @@ struct netflow {
                                    * bits of the interface fields. */
     uint32_t netflow_cnt;         /* Flow sequence number for NetFlow. */
     struct ofpbuf packet;         /* NetFlow packet being accumulated. */
+    int active_timeout;           /* Timeout for flows that are still active. */
 };
 
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
@@ -160,14 +161,19 @@ open_collector(char *dst)
 }
 
 void
-netflow_expire(struct netflow *nf, const struct ofexpired *expired)
+netflow_expire(struct netflow *nf, struct netflow_flow *nf_flow,
+               struct ofexpired *expired)
 {
     struct netflow_v5_header *nf_hdr;
     struct netflow_v5_record *nf_rec;
     struct timeval now;
 
-    /* NetFlow only reports on IP packets. */
-    if (expired->flow.dl_type != htons(ETH_TYPE_IP)) {
+    nf_flow->last_expired += nf->active_timeout;
+
+    /* NetFlow only reports on IP packets and we should only report flows
+     * that actually have traffic. */
+    if (expired->flow.dl_type != htons(ETH_TYPE_IP) ||
+        expired->packet_count - nf_flow->packet_count_off == 0) {
         return;
     }
 
@@ -201,10 +207,12 @@ netflow_expire(struct netflow *nf, const struct ofexpired *expired)
         nf_rec->input = htons(expired->flow.in_port);
         nf_rec->output = htons(expired->output_iface);
     }
-    nf_rec->packet_count = htonl(MIN(expired->packet_count, UINT32_MAX));
-    nf_rec->byte_count = htonl(MIN(expired->byte_count, UINT32_MAX));
-    nf_rec->init_time = htonl(expired->created - nf->boot_time);
-    nf_rec->used_time = htonl(MAX(expired->created, expired->used)
+    nf_rec->packet_count = htonl(MIN(expired->packet_count -
+                                     nf_flow->packet_count_off, UINT32_MAX));
+    nf_rec->byte_count = htonl(MIN(expired->byte_count -
+                                   nf_flow->byte_count_off, UINT32_MAX));
+    nf_rec->init_time = htonl(nf_flow->created - nf->boot_time);
+    nf_rec->used_time = htonl(MAX(nf_flow->created, expired->used)
                              - nf->boot_time);
     if (expired->flow.nw_proto == IP_TYPE_ICMP) {
         /* In NetFlow, the ICMP type and code are concatenated and
@@ -217,9 +225,15 @@ netflow_expire(struct netflow *nf, const struct ofexpired *expired)
         nf_rec->src_port = expired->flow.tp_src;
         nf_rec->dst_port = expired->flow.tp_dst;
     }
-    nf_rec->tcp_flags = expired->tcp_flags;
+    nf_rec->tcp_flags = nf_flow->tcp_flags;
     nf_rec->ip_proto = expired->flow.nw_proto;
-    nf_rec->ip_tos = expired->ip_tos;
+    nf_rec->ip_tos = nf_flow->ip_tos;
+
+    /* Update flow tracking data. */
+    nf_flow->created = 0;
+    nf_flow->packet_count_off = expired->packet_count;
+    nf_flow->byte_count_off = expired->byte_count;
+    nf_flow->tcp_flags = 0;
 
     /* NetFlow messages are limited to 30 records. */
     if (ntohs(nf_hdr->count) >= 30) {
@@ -259,7 +273,8 @@ clear_collectors(struct netflow *nf)
 }
 
 int
-netflow_set_collectors(struct netflow *nf, const struct svec *collectors_)
+netflow_set_collectors(struct netflow *nf, const struct svec *collectors_,
+                       int active_timeout)
 {
     struct svec collectors;
     int error = 0;
@@ -288,6 +303,9 @@ netflow_set_collectors(struct netflow *nf, const struct svec *collectors_)
     }
 
     svec_destroy(&collectors);
+
+    nf->active_timeout = active_timeout * 1000;
+
     return error;
 }
 
@@ -324,3 +342,44 @@ netflow_destroy(struct netflow *nf)
         free(nf);
     }
 }
+
+void
+netflow_flow_clear(struct netflow_flow *nf_flow)
+{
+    memset(nf_flow, 0, sizeof *nf_flow);
+}
+
+void
+netflow_flow_update_time(struct netflow_flow *nf_flow, long long int used)
+{
+    if (!nf_flow->created) {
+        nf_flow->created = used;
+
+        if (!nf_flow->last_expired) {
+            nf_flow->last_expired = time_msec();
+        }
+    }
+}
+
+void
+netflow_flow_update_flags(struct netflow_flow *nf_flow, uint8_t ip_tos,
+                          uint8_t tcp_flags)
+{
+    nf_flow->ip_tos = ip_tos;
+    nf_flow->tcp_flags |= tcp_flags;
+}
+
+bool
+netflow_active_timeout_expired(struct netflow *nf, struct netflow_flow *nf_flow)
+{
+    long long int now = time_msec();
+
+    if (nf && nf->active_timeout) {
+        return now > nf_flow->last_expired + nf->active_timeout;
+    } else {
+        /* Keep the time updated to prevent a flood of expiration if active
+         * timeouts are enabled in the future. */
+        nf_flow->last_expired = now;
+        return false;
+    }
+}
diff --git a/secchan/netflow.h b/secchan/netflow.h
index a5ad336..2812f0e 100644
--- a/secchan/netflow.h
+++ b/secchan/netflow.h
@@ -28,12 +28,31 @@ enum netflow_output_ports {
     NF_OUT_DROP = UINT16_MAX - 2
 };
 
+struct netflow_flow {
+    long long int last_expired;   /* Time this flow last timed out. */
+    long long int created;        /* Time flow was created since time out. */
+
+    uint64_t packet_count_off;    /* Packet count at last time out. */
+    uint64_t byte_count_off;      /* Byte count at last time out. */
+
+    uint8_t ip_tos;               /* Last-seen IP type-of-service. */
+    uint8_t tcp_flags;            /* Bitwise-OR of all TCP flags seen. */
+};
+
 struct netflow *netflow_create(void);
 void netflow_destroy(struct netflow *);
-int netflow_set_collectors(struct netflow *, const struct svec *collectors);
+int netflow_set_collectors(struct netflow *, const struct svec *collectors,
+                           int active_timeout);
 void netflow_set_engine(struct netflow *nf, uint8_t engine_type, 
-        uint8_t engine_id, bool add_id_to_iface);
-void netflow_expire(struct netflow *, const struct ofexpired *);
+                        uint8_t engine_id, bool add_id_to_iface);
+void netflow_expire(struct netflow *, struct netflow_flow *,
+                    struct ofexpired *);
 void netflow_run(struct netflow *);
 
+void netflow_flow_clear(struct netflow_flow *);
+void netflow_flow_update_time(struct netflow_flow *, long long int used);
+void netflow_flow_update_flags(struct netflow_flow *, uint8_t ip_tos,
+                               uint8_t tcp_flags);
+bool netflow_active_timeout_expired(struct netflow *, struct netflow_flow *);
+
 #endif /* netflow.h */
diff --git a/secchan/ofproto.c b/secchan/ofproto.c
index 4266cbf..11f48b2 100644
--- a/secchan/ofproto.c
+++ b/secchan/ofproto.c
@@ -94,9 +94,8 @@ struct rule {
     uint64_t packet_count;      /* Number of packets received. */
     uint64_t byte_count;        /* Number of bytes received. */
     uint64_t accounted_bytes;   /* Number of bytes passed to account_cb. */
-    uint8_t tcp_flags;          /* Bitwise-OR of all TCP flags seen. */
-    uint8_t ip_tos;             /* Last-seen IP type-of-service. */
     tag_type tags;              /* Tags (set only by hooks). */
+    struct netflow_flow nf_flow; /* Per-flow NetFlow tracking data. */
     uint16_t nf_output_iface;   /* Output interface index for NetFlow. */
 
     /* If 'super' is non-NULL, this rule is a subrule, that is, it is an
@@ -245,6 +244,7 @@ static void send_packet_in_action(struct ofpbuf *, void *ofproto);
 static void update_used(struct ofproto *);
 static void update_stats(struct rule *, const struct odp_flow_stats *);
 static void expire_rule(struct cls_rule *, void *ofproto);
+static void active_timeout(struct ofproto *ofproto, struct rule *rule);
 static bool revalidate_rule(struct ofproto *p, struct rule *rule);
 static void revalidate_cb(struct cls_rule *rule_, void *p_);
 
@@ -544,7 +544,8 @@ ofproto_set_snoops(struct ofproto *ofproto, const struct svec *snoops)
 
 int
 ofproto_set_netflow(struct ofproto *ofproto, const struct svec *collectors,
-        uint8_t engine_type, uint8_t engine_id, bool add_id_to_iface)
+                    uint8_t engine_type, uint8_t engine_id,
+                    int active_timeout, bool add_id_to_iface)
 {
     if (collectors && collectors->n) {
         if (!ofproto->netflow) {
@@ -552,7 +553,8 @@ ofproto_set_netflow(struct ofproto *ofproto, const struct svec *collectors,
         }
         netflow_set_engine(ofproto->netflow, engine_type, engine_id, 
                 add_id_to_iface);
-        return netflow_set_collectors(ofproto->netflow, collectors);
+        return netflow_set_collectors(ofproto->netflow, collectors,
+                                      active_timeout);
     } else {
         netflow_destroy(ofproto->netflow);
         ofproto->netflow = NULL;
@@ -1399,6 +1401,9 @@ rule_create(struct rule *super,
     }
     rule->n_actions = n_actions;
     rule->actions = xmemdup(actions, n_actions * sizeof *actions);
+    netflow_flow_clear(&rule->nf_flow);
+    netflow_flow_update_time(&rule->nf_flow, rule->created);
+
     return rule;
 }
 
@@ -1504,6 +1509,7 @@ rule_execute(struct ofproto *ofproto, struct rule *rule,
         flow_extract_stats(flow, packet, &stats);
         update_stats(rule, &stats);
         rule->used = time_msec();
+        netflow_flow_update_time(&rule->nf_flow, rule->used);
     }
 }
 
@@ -1698,47 +1704,44 @@ rule_uninstall(struct ofproto *p, struct rule *rule)
     }
 }
 
+static bool
+is_controller_rule(struct rule *rule)
+{
+    /* If the only action is send to the controller then don't report
+     * NetFlow expiration messages since it is just part of the control
+     * logic for the network and not real traffic. */
+
+    return rule->n_odp_actions == 1 &&
+           rule->odp_actions[0].type == ODPAT_CONTROLLER;
+}
+
 static void
 rule_post_uninstall(struct ofproto *ofproto, struct rule *rule)
 {
     struct rule *super = rule->super;
-    bool controller_action;
 
     rule_account(ofproto, rule, 0);
 
-    /* If the only action is send to the controller then don't report
-     * NetFlow expiration messages since it is just part of the control
-     * logic for the network and not real traffic. */
-    controller_action = rule->n_odp_actions == 1 &&
-                        rule->odp_actions[0].type == ODPAT_CONTROLLER;
-
-    if (ofproto->netflow && rule->byte_count && !controller_action) {
+    if (ofproto->netflow && !is_controller_rule(rule)) {
         struct ofexpired expired;
         expired.flow = rule->cr.flow;
         expired.packet_count = rule->packet_count;
         expired.byte_count = rule->byte_count;
         expired.used = rule->used;
-        expired.created = rule->created;
-        expired.tcp_flags = rule->tcp_flags;
-        expired.ip_tos = rule->ip_tos;
         expired.output_iface = rule->nf_output_iface;
-        netflow_expire(ofproto->netflow, &expired);
+        netflow_expire(ofproto->netflow, &rule->nf_flow, &expired);
     }
     if (super) {
         super->packet_count += rule->packet_count;
         super->byte_count += rule->byte_count;
-        super->tcp_flags |= rule->tcp_flags;
-        if (rule->packet_count) {
-            super->ip_tos = rule->ip_tos;
-        }
 
         /* Reset counters to prevent double counting if the rule ever gets
          * reinstalled. */
         rule->packet_count = 0;
         rule->byte_count = 0;
         rule->accounted_bytes = 0;
-        rule->tcp_flags = 0;
-        rule->ip_tos = 0;
+
+        netflow_flow_clear(&rule->nf_flow);
     }
 }
 
@@ -2722,18 +2725,19 @@ update_time(struct rule *rule, const struct odp_flow_stats *stats)
     long long int used = msec_from_nsec(stats->used_sec, stats->used_nsec);
     if (used > rule->used) {
         rule->used = used;
+        netflow_flow_update_time(&rule->nf_flow, used);
     }
 }
 
 static void
 update_stats(struct rule *rule, const struct odp_flow_stats *stats)
 {
-    update_time(rule, stats);
-    rule->packet_count += stats->n_packets;
-    rule->byte_count += stats->n_bytes;
-    rule->tcp_flags |= stats->tcp_flags;
     if (stats->n_packets) {
-        rule->ip_tos = stats->ip_tos;
+        update_time(rule, stats);
+        rule->packet_count += stats->n_packets;
+        rule->byte_count += stats->n_bytes;
+        netflow_flow_update_flags(&rule->nf_flow, stats->ip_tos,
+                                  stats->tcp_flags);
     }
 }
 
@@ -3271,18 +3275,15 @@ expire_rule(struct cls_rule *cls_rule, void *p_)
                    ? rule->used + rule->idle_timeout * 1000
                    : LLONG_MAX);
     expire = MIN(hard_expire, idle_expire);
-    if (expire == LLONG_MAX) {
-        if (rule->installed && time_msec() >= rule->used + 5000) {
-            uninstall_idle_flow(p, rule);
-        }
-        return;
-    }
 
     now = time_msec();
     if (now < expire) {
         if (rule->installed && now >= rule->used + 5000) {
             uninstall_idle_flow(p, rule);
+        } else if (!rule->cr.wc.wildcards) {
+            active_timeout(p, rule);
         }
+
         return;
     }
 
@@ -3304,6 +3305,41 @@ expire_rule(struct cls_rule *cls_rule, void *p_)
 }
 
 static void
+active_timeout(struct ofproto *ofproto, struct rule *rule)
+{
+    if (netflow_active_timeout_expired(ofproto->netflow, &rule->nf_flow) &&
+        !is_controller_rule(rule)) {
+        struct ofexpired expired;
+        struct odp_flow odp_flow;
+
+        /* Get updated flow stats. */
+        memset(&odp_flow, 0, sizeof odp_flow);
+        odp_flow.key = rule->cr.flow;
+        odp_flow.flags = ODPFF_ZERO_TCP_FLAGS;
+        dpif_flow_get(&ofproto->dpif, &odp_flow);
+
+        if (odp_flow.stats.n_packets) {
+            update_time(rule, &odp_flow.stats);
+            netflow_flow_update_flags(&rule->nf_flow, odp_flow.stats.ip_tos,
+                                      odp_flow.stats.tcp_flags);
+        }
+
+        expired.flow = rule->cr.flow;
+        expired.packet_count = rule->packet_count +
+                               odp_flow.stats.n_packets;
+        expired.byte_count = rule->byte_count + odp_flow.stats.n_bytes;
+        expired.used = rule->used;
+        expired.output_iface = rule->nf_output_iface;
+
+        netflow_expire(ofproto->netflow, &rule->nf_flow, &expired);
+
+        /* Schedule us to send the accumulated records once we have
+         * collected all of them. */
+        poll_immediate_wake();
+    }
+}
+
+static void
 update_used(struct ofproto *p)
 {
     struct odp_flow *flows;
diff --git a/secchan/ofproto.h b/secchan/ofproto.h
index 8847dee..505ad7f 100644
--- a/secchan/ofproto.h
+++ b/secchan/ofproto.h
@@ -30,12 +30,9 @@ struct svec;
 
 struct ofexpired {
     flow_t flow;
-    uint64_t packet_count;      /* Packets from *expired* subrules. */
-    uint64_t byte_count;        /* Bytes from *expired* subrules. */
+    uint64_t packet_count;      /* Packets from subrules. */
+    uint64_t byte_count;        /* Bytes from subrules. */
     long long int used;         /* Last-used time (0 if never used). */
-    long long int created;      /* Creation time. */
-    uint8_t tcp_flags;          /* Bitwise-OR of all TCP flags seen. */
-    uint8_t ip_tos;             /* Last-seen IP type-of-service. */
     uint16_t output_iface;      /* Output interface index. */
 };
 
@@ -64,7 +61,8 @@ int ofproto_set_controller(struct ofproto *, const char *controller);
 int ofproto_set_listeners(struct ofproto *, const struct svec *listeners);
 int ofproto_set_snoops(struct ofproto *, const struct svec *snoops);
 int ofproto_set_netflow(struct ofproto *, const struct svec *collectors,
-        uint8_t engine_type, uint8_t engine_id, bool add_id_to_iface);
+                        uint8_t engine_type, uint8_t engine_id,
+                        int active_timeout, bool add_id_to_iface);
 void ofproto_set_failure(struct ofproto *, bool fail_open);
 void ofproto_set_rate_limit(struct ofproto *, int rate_limit, int burst_limit);
 int ofproto_set_stp(struct ofproto *, bool enable_stp);
diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
index fcdd866..7f718c9 100644
--- a/vswitchd/bridge.c
+++ b/vswitchd/bridge.c
@@ -497,6 +497,7 @@ bridge_reconfigure(void)
         const char *devname;
         uint8_t engine_type = br->dpif.minor;
         uint8_t engine_id = br->dpif.minor;
+        int active_timeout = 0;
         bool add_id_to_iface = false;
         struct svec nf_hosts;
 
@@ -550,6 +551,13 @@ bridge_reconfigure(void)
         if (cfg_has("netflow.%s.engine-id", br->name)) {
             engine_id = cfg_get_int(0, "netflow.%s.engine-id", br->name);
         }
+        if (cfg_has("netflow.%s.engine-id", br->name)) {
+            engine_id = cfg_get_int(0, "netflow.%s.engine-id", br->name);
+        }
+        if (cfg_has("netflow.%s.active-timeout", br->name)) {
+            active_timeout = cfg_get_int(0, "netflow.%s.active-timeout",
+                                         br->name);
+        }
         if (cfg_has("netflow.%s.add-id-to-iface", br->name)) {
             add_id_to_iface = cfg_get_bool(0, "netflow.%s.add-id-to-iface",
                     br->name);
@@ -567,7 +575,7 @@ bridge_reconfigure(void)
         svec_init(&nf_hosts);
         cfg_get_all_keys(&nf_hosts, "netflow.%s.host", br->name);
         if (ofproto_set_netflow(br->ofproto, &nf_hosts,  engine_type, 
-                    engine_id, add_id_to_iface)) {
+                    engine_id, active_timeout, add_id_to_iface)) {
             VLOG_ERR("bridge %s: problem setting netflow collectors", 
                     br->name);
         }
diff --git a/vswitchd/ovs-vswitchd.conf.5.in b/vswitchd/ovs-vswitchd.conf.5.in
index f998e49..be02ca6 100644
--- a/vswitchd/ovs-vswitchd.conf.5.in
+++ b/vswitchd/ovs-vswitchd.conf.5.in
@@ -412,18 +412,23 @@ port.eth1.ingress.policing-burst=20
 
 .fi
 .SS "NetFlow v5 Flow Logging"
-NetFlow is a protocol that exports a number of details about terminating 
-IP flows, such as the principals involved and duration.  A bridge may be 
-configured to send NetFlow v5 records to NetFlow collectors when flows 
-end.  To enable, define the key \fBnetflow.\fIbridge\fB.host\fR for each 
-collector in the form \fIip\fB:\fIport\fR.  Records from \fIbridge\fR 
+NetFlow is a protocol that exports a number of details about terminating
+IP flows, such as the principals involved and duration.  A bridge may be
+configured to send NetFlow v5 records to NetFlow collectors when flows
+end.  To enable, define the key \fBnetflow.\fIbridge\fB.host\fR for each
+collector in the form \fIip\fB:\fIport\fR.  Records from \fIbridge\fR
 will be sent to each \fIip\fR on UDP \fIport\fR.  The \fIip\fR must
 be specified numerically, not as a DNS name.
 
-The NetFlow messages will use the datapath index for the engine type and id.  
-This can be overridden with the \fBnetflow.\fIbridge\fB.engine-type\fR and 
+In addition to terminating flows, NetFlow can also send records at a set
+interval for flows that are still active.  This interval can be configured
+by defining the key \fBnetflow.\fIbridge\fB\.active-timeout\fR.  The value
+is in seconds.  An active timeout of 0 will disable this functionality.
+
+The NetFlow messages will use the datapath index for the engine type and id.
+This can be overridden with the \fBnetflow.\fIbridge\fB.engine-type\fR and
 \fBnetflow.\fIbridge\fB.engine-id\fR, respectively.  Each takes a value
-between 0 and 255, inclusive. 
+between 0 and 255, inclusive.
 
 Many NetFlow collectors do not expect multiple virtual switches to be
 sending messages from the same host, and they do not store the engine
@@ -431,7 +436,7 @@ information which could be used to disambiguate the traffic.  To prevent
 flows from multiple switches appearing as if they came on the interface,
 add \fBnetflow.\fIbridge\fB.add-id-to-iface=true\fR to the configuration
 file.  This will place the least significant 7 bits of the engine id
-into the most significant bits of the ingress and egress interface fields 
+into the most significant bits of the ingress and egress interface fields
 of flow records.  When this option is enabled, a maximum of 508 ports are
 supported.  By default, this behavior is disabled.
 
-- 
1.6.0.4





More information about the dev mailing list