[ovs-dev] [PATCH] ipfix: implement flow caching and aggregation in exporter

Romain Lenglet rlenglet at vmware.com
Thu Jul 11 23:48:55 UTC 2013


Implement a per-exporter flow cache with active timeout expiration.
Sampled packets that share a flow key are aggregated into a single
cache entry.  Add columns "cache_active_timeout" and "cache_max_flows"
to the "IPFIX" table to configure each cache.

Add per-flow elements "flowStartDeltaMicroseconds" and
"flowEndDeltaMicroseconds".  Add per-flow elements
"octetDeltaSumOfSquares", "minimumIpTotalLength", and
"maximumIpTotalLength" to replace "ethernetTotalLength".  Add per-flow
element "flowEndReason" to indicate whether a flow has expired because
of an active timeout, the cache size limit being reached, or the
exporter being stopped.

Signed-off-by: Romain Lenglet <rlenglet at vmware.com>
---
 ofproto/ofproto-dpif-ipfix.c |  758 +++++++++++++++++++++++++++++++++++++-----
 ofproto/ofproto-dpif-ipfix.h |    3 +
 ofproto/ofproto-dpif.c       |   23 +-
 ofproto/ofproto.h            |    4 +
 utilities/ovs-vsctl.8.in     |    5 +-
 vswitchd/bridge.c            |   10 +
 vswitchd/vswitch.ovsschema   |   12 +-
 vswitchd/vswitch.xml         |   12 +
 8 files changed, 727 insertions(+), 100 deletions(-)

diff --git a/ofproto/ofproto-dpif-ipfix.c b/ofproto/ofproto-dpif-ipfix.c
index ef0e980..e066199 100644
--- a/ofproto/ofproto-dpif-ipfix.c
+++ b/ofproto/ofproto-dpif-ipfix.c
@@ -15,15 +15,18 @@
  */
 
 #include <config.h>
+#include <sys/time.h>
 #include "ofproto-dpif-ipfix.h"
 #include "byte-order.h"
 #include "collectors.h"
 #include "flow.h"
 #include "hash.h"
 #include "hmap.h"
+#include "list.h"
 #include "ofpbuf.h"
 #include "ofproto.h"
 #include "packets.h"
+#include "poll-loop.h"
 #include "sset.h"
 #include "util.h"
 #include "timeval.h"
@@ -40,7 +43,11 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 struct dpif_ipfix_exporter {
     struct collectors *collectors;
     uint32_t seq_number;
-    time_t last_template_set_time;
+    __kernel_time_t last_template_set_time;
+    struct hmap cache_flow_key_map;  /* ipfix_flow_cache_entry. */
+    struct list cache_flow_start_timestamp_list;  /* ipfix_flow_cache_entry. */
+    uint32_t cache_active_timeout;  /* In seconds. */
+    uint32_t cache_max_flows;
 };
 
 struct dpif_ipfix_bridge_exporter {
@@ -61,7 +68,7 @@ struct dpif_ipfix_flow_exporter_map_node {
 
 struct dpif_ipfix {
     struct dpif_ipfix_bridge_exporter bridge_exporter;
-    struct hmap flow_exporter_map;  /* dpif_ipfix_flow_exporter_map_nodes. */
+    struct hmap flow_exporter_map;  /* dpif_ipfix_flow_exporter_map_node. */
     int ref_cnt;
 };
 
@@ -138,29 +145,27 @@ struct ipfix_template_field_specifier {
 } __attribute__((packed));
 BUILD_ASSERT_DECL(sizeof(struct ipfix_template_field_specifier) == 4);
 
-/* Part of data record for common metadata and Ethernet entities. */
-struct ipfix_data_record_common {
+/* Part of data record flow key for common metadata and Ethernet entities. */
+struct ipfix_data_record_flow_key_common {
     ovs_be32 observation_point_id;  /* OBSERVATION_POINT_ID */
-    ovs_be64 packet_delta_count;  /* PACKET_DELTA_COUNT */
-    ovs_be64 layer2_octet_delta_count;  /* LAYER2_OCTET_DELTA_COUNT */
     uint8_t source_mac_address[6];  /* SOURCE_MAC_ADDRESS */
     uint8_t destination_mac_address[6];  /* DESTINATION_MAC_ADDRESS */
     ovs_be16 ethernet_type;  /* ETHERNET_TYPE */
-    ovs_be16 ethernet_total_length;  /* ETHERNET_TOTAL_LENGTH */
     uint8_t ethernet_header_length;  /* ETHERNET_HEADER_LENGTH */
 } __attribute__((packed));
-BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_common) == 37);
+BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_flow_key_common) == 19);
 
-/* Part of data record for VLAN entities. */
-struct ipfix_data_record_vlan {
+/* Part of data record flow key for VLAN entities. */
+struct ipfix_data_record_flow_key_vlan {
     ovs_be16 vlan_id;  /* VLAN_ID */
     ovs_be16 dot1q_vlan_id;  /* DOT1Q_VLAN_ID */
     uint8_t dot1q_priority;  /* DOT1Q_PRIORITY */
 } __attribute__((packed));
-BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_vlan) == 5);
+BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_flow_key_vlan) == 5);
 
-/* Part of data record for IP entities. */
-struct ipfix_data_record_ip {
+/* Part of data record flow key for IP entities. */
+/* XXX: Replace IP_TTL with MINIMUM_TTL and MAXIMUM_TTL? */
+struct ipfix_data_record_flow_key_ip {
     uint8_t ip_version;  /* IP_VERSION */
     uint8_t ip_ttl;  /* IP_TTL */
     uint8_t protocol_identifier;  /* PROTOCOL_IDENTIFIER */
@@ -168,29 +173,118 @@ struct ipfix_data_record_ip {
     uint8_t ip_precedence;  /* IP_PRECEDENCE */
     uint8_t ip_class_of_service;  /* IP_CLASS_OF_SERVICE */
 } __attribute__((packed));
-BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_ip) == 6);
+BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_flow_key_ip) == 6);
 
-/* Part of data record for IPv4 entities. */
-struct ipfix_data_record_ipv4 {
+/* Part of data record flow key for IPv4 entities. */
+struct ipfix_data_record_flow_key_ipv4 {
     ovs_be32 source_ipv4_address;  /* SOURCE_IPV4_ADDRESS */
     ovs_be32 destination_ipv4_address;  /* DESTINATION_IPV4_ADDRESS */
 } __attribute__((packed));
-BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_ipv4) == 8);
+BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_flow_key_ipv4) == 8);
 
-/* Part of data record for IPv4 entities. */
-struct ipfix_data_record_ipv6 {
+/* Part of data record flow key for IPv6 entities. */
+struct ipfix_data_record_flow_key_ipv6 {
     uint8_t source_ipv6_address[16];  /* SOURCE_IPV6_ADDRESS */
     uint8_t destination_ipv6_address[16];  /* DESTINATION_IPV6_ADDRESS */
     ovs_be32 flow_label_ipv6;  /* FLOW_LABEL_IPV6 */
 } __attribute__((packed));
-BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_ipv6) == 36);
+BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_flow_key_ipv6) == 36);
 
-/* Part of data record for TCP/UDP entities. */
-struct ipfix_data_record_tcpudp {
+/* Part of data record flow key for TCP/UDP entities. */
+struct ipfix_data_record_flow_key_tcpudp {
     ovs_be16 source_transport_port;  /* SOURCE_TRANSPORT_PORT */
     ovs_be16 destination_transport_port;  /* DESTINATION_TRANSPORT_PORT */
 } __attribute__((packed));
-BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_tcpudp) == 4);
+BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_flow_key_tcpudp) == 4);
+
+/* flowEndReason values.  Cf. IETF RFC 5102 Section 5.11.3. */
+enum ipfix_flow_end_reason {
+    IDLE_TIMEOUT = 0x01,
+    ACTIVE_TIMEOUT = 0x02,
+    END_OF_FLOW_DETECTED = 0x03,
+    FORCED_END = 0x04,
+    LACK_OF_RESOURCES = 0x05
+};
+
+/* Part of data record for aggregated elements common to all templates. */
+struct ipfix_data_record_aggregated_common {
+    ovs_be32 flow_start_delta_microseconds; /* FLOW_START_DELTA_MICROSECONDS */
+    ovs_be32 flow_end_delta_microseconds; /* FLOW_END_DELTA_MICROSECONDS */
+    ovs_be64 packet_delta_count;  /* PACKET_DELTA_COUNT */
+    ovs_be64 layer2_octet_delta_count;  /* LAYER2_OCTET_DELTA_COUNT */
+    uint8_t flow_end_reason;  /* FLOW_END_REASON */
+} __attribute__((packed));
+BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_aggregated_common) == 25);
+
+/* Part of data record for aggregated elements sent only for IP flows. */
+struct ipfix_data_record_aggregated_ip {
+    ovs_be64 octet_delta_sum_of_squares;  /* OCTET_DELTA_SUM_OF_SQUARES */
+    ovs_be64 minimum_ip_total_length;  /* MINIMUM_IP_TOTAL_LENGTH */
+    ovs_be64 maximum_ip_total_length;  /* MAXIMUM_IP_TOTAL_LENGTH */
+} __attribute__((packed));
+BUILD_ASSERT_DECL(sizeof(struct ipfix_data_record_aggregated_ip) == 24);
+/* Longest flow key: only IPv6, the larger of the two L3 parts, counts. */
+#define MAX_FLOW_KEY_LEN                                \
+    (sizeof(struct ipfix_data_record_flow_key_common)    \
+     + sizeof(struct ipfix_data_record_flow_key_vlan)    \
+     + sizeof(struct ipfix_data_record_flow_key_ip)      \
+     + sizeof(struct ipfix_data_record_flow_key_ipv6)    \
+     + sizeof(struct ipfix_data_record_flow_key_tcpudp))
+
+#define MAX_DATA_RECORD_LEN                                 \
+    (MAX_FLOW_KEY_LEN                                       \
+     + sizeof(struct ipfix_data_record_aggregated_common)   \
+     + sizeof(struct ipfix_data_record_aggregated_ip))
+
+/* Max length of a data set.  To simplify the implementation, each
+ * data record is sent in a separate data set, so each data set
+ * contains at most one data record. */
+#define MAX_DATA_SET_LEN             \
+    (sizeof(struct ipfix_set_header) \
+     + MAX_DATA_RECORD_LEN)
+
+/* Max length of an IPFIX message.  Arbitrarily set to accommodate a
+ * low MTU. */
+#define MAX_MESSAGE_LEN 1024
+
+
+
+/* Cache structures. */
+
+/* Flow key: the fields that identify a cache entry. */
+struct ipfix_flow_key {
+    uint32_t obs_domain_id;
+    uint16_t template_id;  /* Determines which parts the key contains. */
+    size_t flow_key_msg_part_size;  /* Bytes used in flow_key_msg_part. */
+    uint8_t flow_key_msg_part[MAX_FLOW_KEY_LEN];  /* Wire-format key parts. */
+};
+
+/* Flow cache entry, one per distinct flow key. */
+struct ipfix_flow_cache_entry {
+    struct hmap_node flow_key_map_node;  /* In cache_flow_key_map. */
+    struct list cache_flow_start_timestamp_list_node;
+    struct ipfix_flow_key flow_key;
+    /* Aggregated elements, merged by ipfix_cache_aggregate_entries(). */
+    struct timeval flow_start_timestamp;  /* Earliest sample time. */
+    struct timeval flow_end_timestamp;  /* Latest sample time. */
+    uint64_t packet_delta_count;
+    uint64_t layer2_octet_delta_count;
+    uint64_t octet_delta_sum_of_squares;  /* 0 if not IP. */
+    uint16_t minimum_ip_total_length;  /* 0 if not IP. */
+    uint16_t maximum_ip_total_length;  /* 0 if not IP. */
+};
+
+
+
+static void dpif_ipfix_cache_expire(struct dpif_ipfix_exporter *, bool,
+                                    const struct timeval *, const uint32_t,
+                                    const uint32_t);
+
+static void get_export_time_now(struct timeval *, uint32_t *, uint32_t *);
+
+static void dpif_ipfix_cache_expire_now(struct dpif_ipfix_exporter *, bool);
+
+
 
 static bool
 ofproto_ipfix_bridge_exporter_options_equal(
@@ -200,6 +294,8 @@ ofproto_ipfix_bridge_exporter_options_equal(
     return (a->obs_domain_id == b->obs_domain_id
             && a->obs_point_id == b->obs_point_id
             && a->sampling_rate == b->sampling_rate
+            && a->cache_active_timeout == b->cache_active_timeout
+            && a->cache_max_flows == b->cache_max_flows
             && sset_equals(&a->targets, &b->targets));
 }
 
@@ -229,6 +325,8 @@ ofproto_ipfix_flow_exporter_options_equal(
     const struct ofproto_ipfix_flow_exporter_options *b)
 {
     return (a->collector_set_id == b->collector_set_id
+            && a->cache_active_timeout == b->cache_active_timeout
+            && a->cache_max_flows == b->cache_max_flows
             && sset_equals(&a->targets, &b->targets));
 }
 
@@ -253,17 +351,44 @@ ofproto_ipfix_flow_exporter_options_destroy(
 }
 
 static void
+dpif_ipfix_exporter_init(struct dpif_ipfix_exporter *exporter)
+{
+    exporter->collectors = NULL;
+    exporter->seq_number = 1;
+    exporter->last_template_set_time = TIME_MIN;
+    hmap_init(&exporter->cache_flow_key_map);
+    list_init(&exporter->cache_flow_start_timestamp_list);
+    exporter->cache_active_timeout = 0;
+    exporter->cache_max_flows = 0;
+}
+
+static void
 dpif_ipfix_exporter_clear(struct dpif_ipfix_exporter *exporter)
 {
+    /* Flush the cache with flow end reason "forced end." */
+    dpif_ipfix_cache_expire_now(exporter, true);
+
     collectors_destroy(exporter->collectors);
     exporter->collectors = NULL;
     exporter->seq_number = 1;
     exporter->last_template_set_time = TIME_MIN;
+    /* Cache settings stay 0 until dpif_ipfix_exporter_set_options(). */
+    exporter->cache_active_timeout = 0;
+    exporter->cache_max_flows = 0;
+}
+
+static void
+dpif_ipfix_exporter_destroy(struct dpif_ipfix_exporter *exporter)
+{
+    dpif_ipfix_exporter_clear(exporter);
+    hmap_destroy(&exporter->cache_flow_key_map);
 }
 
 static bool
 dpif_ipfix_exporter_set_options(struct dpif_ipfix_exporter *exporter,
-                                const struct sset *targets)
+                                const struct sset *targets,
+                                const uint32_t cache_active_timeout,
+                                const uint32_t cache_max_flows)
 {
     collectors_destroy(exporter->collectors);
     collectors_create(targets, IPFIX_DEFAULT_COLLECTOR_PORT,
@@ -274,10 +399,20 @@ dpif_ipfix_exporter_set_options(struct dpif_ipfix_exporter *exporter,
         dpif_ipfix_exporter_clear(exporter);
         return false;
     }
+    exporter->cache_active_timeout = cache_active_timeout;
+    exporter->cache_max_flows = cache_max_flows;
     return true;
 }
 
 static void
+dpif_ipfix_bridge_exporter_init(struct dpif_ipfix_bridge_exporter *exporter)
+{
+    dpif_ipfix_exporter_init(&exporter->exporter);
+    exporter->options = NULL;
+    exporter->probability = 0;
+}
+
+static void
 dpif_ipfix_bridge_exporter_clear(struct dpif_ipfix_bridge_exporter *exporter)
 {
     dpif_ipfix_exporter_clear(&exporter->exporter);
@@ -287,6 +422,13 @@ dpif_ipfix_bridge_exporter_clear(struct dpif_ipfix_bridge_exporter *exporter)
 }
 
 static void
+dpif_ipfix_bridge_exporter_destroy(struct dpif_ipfix_bridge_exporter *exporter)
+{
+    dpif_ipfix_bridge_exporter_clear(exporter);
+    dpif_ipfix_exporter_destroy(&exporter->exporter);
+}
+
+static void
 dpif_ipfix_bridge_exporter_set_options(
     struct dpif_ipfix_bridge_exporter *exporter,
     const struct ofproto_ipfix_bridge_exporter_options *options)
@@ -311,8 +453,9 @@ dpif_ipfix_bridge_exporter_set_options(
     if (options_changed
         || collectors_count(exporter->exporter.collectors)
             < sset_count(&options->targets)) {
-        if (!dpif_ipfix_exporter_set_options(&exporter->exporter,
-                                             &options->targets)) {
+        if (!dpif_ipfix_exporter_set_options(
+                &exporter->exporter, &options->targets,
+                options->cache_active_timeout, options->cache_max_flows)) {
             return;
         }
     }
@@ -326,6 +469,10 @@ dpif_ipfix_bridge_exporter_set_options(
     exporter->options = ofproto_ipfix_bridge_exporter_options_clone(options);
     exporter->probability =
         MAX(1, UINT32_MAX / exporter->options->sampling_rate);
+
+    /* Run over the cache as some entries might have expired after
+     * changing the timeouts. */
+    dpif_ipfix_cache_expire_now(&exporter->exporter, false);
 }
 
 static struct dpif_ipfix_flow_exporter_map_node*
@@ -347,6 +494,13 @@ dpif_ipfix_find_flow_exporter_map_node(
 }
 
 static void
+dpif_ipfix_flow_exporter_init(struct dpif_ipfix_flow_exporter *exporter)
+{
+    dpif_ipfix_exporter_init(&exporter->exporter);
+    exporter->options = NULL;
+}
+
+static void
 dpif_ipfix_flow_exporter_clear(struct dpif_ipfix_flow_exporter *exporter)
 {
     dpif_ipfix_exporter_clear(&exporter->exporter);
@@ -354,6 +508,13 @@ dpif_ipfix_flow_exporter_clear(struct dpif_ipfix_flow_exporter *exporter)
     exporter->options = NULL;
 }
 
+static void
+dpif_ipfix_flow_exporter_destroy(struct dpif_ipfix_flow_exporter *exporter)
+{
+    dpif_ipfix_flow_exporter_clear(exporter);
+    dpif_ipfix_exporter_destroy(&exporter->exporter);
+}
+
 static bool
 dpif_ipfix_flow_exporter_set_options(
     struct dpif_ipfix_flow_exporter *exporter,
@@ -379,8 +540,9 @@ dpif_ipfix_flow_exporter_set_options(
     if (options_changed
         || collectors_count(exporter->exporter.collectors)
             < sset_count(&options->targets)) {
-        if (!dpif_ipfix_exporter_set_options(&exporter->exporter,
-                                             &options->targets)) {
+        if (!dpif_ipfix_exporter_set_options(
+                &exporter->exporter, &options->targets,
+                options->cache_active_timeout, options->cache_max_flows)) {
             return false;
         }
     }
@@ -393,6 +555,10 @@ dpif_ipfix_flow_exporter_set_options(
     ofproto_ipfix_flow_exporter_options_destroy(exporter->options);
     exporter->options = ofproto_ipfix_flow_exporter_options_clone(options);
 
+    /* Run over the cache as some entries might have expired after
+     * changing the timeouts. */
+    dpif_ipfix_cache_expire_now(&exporter->exporter, false);
+
     return true;
 }
 
@@ -419,7 +585,7 @@ dpif_ipfix_set_options(
             di, options->collector_set_id);
         if (!node) {
             node = xzalloc(sizeof *node);
-            dpif_ipfix_exporter_clear(&node->exporter.exporter);
+            dpif_ipfix_flow_exporter_init(&node->exporter);
             hmap_insert(&di->flow_exporter_map, &node->node,
                         hash_int(options->collector_set_id, 0));
         }
@@ -448,7 +614,7 @@ dpif_ipfix_set_options(
             }
             if (i == n_flow_exporters_options) {  // Not found.
                 hmap_remove(&di->flow_exporter_map, &node->node);
-                dpif_ipfix_flow_exporter_clear(&node->exporter);
+                dpif_ipfix_flow_exporter_destroy(&node->exporter);
                 free(node);
             }
         }
@@ -463,7 +629,7 @@ dpif_ipfix_create(void)
 {
     struct dpif_ipfix *di;
     di = xzalloc(sizeof *di);
-    dpif_ipfix_exporter_clear(&di->bridge_exporter.exporter);
+    dpif_ipfix_bridge_exporter_init(&di->bridge_exporter);
     hmap_init(&di->flow_exporter_map);
     di->ref_cnt = 1;
     return di;
@@ -489,14 +655,14 @@ dpif_ipfix_get_bridge_exporter_probability(const struct dpif_ipfix *di)
 static void
 dpif_ipfix_clear(struct dpif_ipfix *di)
 {
-    struct dpif_ipfix_flow_exporter_map_node *node, *next;
+    struct dpif_ipfix_flow_exporter_map_node *exp_node, *exp_next;
 
     dpif_ipfix_bridge_exporter_clear(&di->bridge_exporter);
 
-    HMAP_FOR_EACH_SAFE (node, next, node, &di->flow_exporter_map) {
-        hmap_remove(&di->flow_exporter_map, &node->node);
-        dpif_ipfix_flow_exporter_clear(&node->exporter);
-        free(node);
+    HMAP_FOR_EACH_SAFE (exp_node, exp_next, node, &di->flow_exporter_map) {
+        hmap_remove(&di->flow_exporter_map, &exp_node->node);
+        dpif_ipfix_flow_exporter_destroy(&exp_node->exporter);
+        free(exp_node);
     }
 }
 
@@ -510,21 +676,22 @@ dpif_ipfix_unref(struct dpif_ipfix *di)
     ovs_assert(di->ref_cnt > 0);
     if (!--di->ref_cnt) {
         dpif_ipfix_clear(di);
+        dpif_ipfix_bridge_exporter_destroy(&di->bridge_exporter);
         hmap_destroy(&di->flow_exporter_map);
         free(di);
     }
 }
 
 static void
-ipfix_init_header(uint32_t seq_number, uint32_t obs_domain_id,
-                  struct ofpbuf *msg)
+ipfix_init_header(uint32_t export_time_sec, uint32_t seq_number,
+                  uint32_t obs_domain_id, struct ofpbuf *msg)
 {
     struct ipfix_header *hdr;
 
     hdr = ofpbuf_put_zeros(msg, sizeof *hdr);
     hdr->version = htons(IPFIX_VERSION);
     hdr->length = htons(sizeof *hdr);  /* Updated in ipfix_send_msg. */
-    hdr->export_time = htonl(time_wall());
+    hdr->export_time = htonl(export_time_sec);
     hdr->seq_number = htonl(seq_number);
     hdr->obs_domain_id = htonl(obs_domain_id);
 }
@@ -577,15 +744,14 @@ ipfix_define_template_fields(enum ipfix_proto_l2 l2, enum ipfix_proto_l3 l3,
         count++; \
     }
 
+    /* 1. Flow key. */
+
     DEF(OBSERVATION_POINT_ID);
-    DEF(PACKET_DELTA_COUNT);
-    DEF(LAYER2_OCTET_DELTA_COUNT);
 
     /* Common Ethernet entities. */
     DEF(SOURCE_MAC_ADDRESS);
     DEF(DESTINATION_MAC_ADDRESS);
     DEF(ETHERNET_TYPE);
-    DEF(ETHERNET_TOTAL_LENGTH);
     DEF(ETHERNET_HEADER_LENGTH);
 
     if (l2 == IPFIX_PROTO_L2_VLAN) {
@@ -617,6 +783,20 @@ ipfix_define_template_fields(enum ipfix_proto_l2 l2, enum ipfix_proto_l3 l3,
         DEF(DESTINATION_TRANSPORT_PORT);
     }
 
+    /* 2. Flow aggregated data. */
+
+    DEF(FLOW_START_DELTA_MICROSECONDS);
+    DEF(FLOW_END_DELTA_MICROSECONDS);
+    DEF(PACKET_DELTA_COUNT);
+    DEF(LAYER2_OCTET_DELTA_COUNT);
+    DEF(FLOW_END_REASON);
+
+    if (l3 != IPFIX_PROTO_L3_UNKNOWN) {
+        DEF(OCTET_DELTA_SUM_OF_SQUARES);
+        DEF(MINIMUM_IP_TOTAL_LENGTH);
+        DEF(MAXIMUM_IP_TOTAL_LENGTH);
+    }
+
 #undef DEF
 
     return count;
@@ -624,9 +804,9 @@ ipfix_define_template_fields(enum ipfix_proto_l2 l2, enum ipfix_proto_l3 l3,
 
 static void
 ipfix_send_template_msg(struct dpif_ipfix_exporter *exporter,
-                        uint32_t obs_domain_id)
+                        uint32_t export_time_sec, uint32_t obs_domain_id)
 {
-    uint64_t msg_stub[DIV_ROUND_UP(1500, 8)];
+    uint64_t msg_stub[DIV_ROUND_UP(MAX_MESSAGE_LEN, 8)];
     struct ofpbuf msg;
     size_t set_hdr_offset, tmpl_hdr_offset;
     struct ipfix_set_header *set_hdr;
@@ -638,7 +818,8 @@ ipfix_send_template_msg(struct dpif_ipfix_exporter *exporter,
 
     ofpbuf_use_stub(&msg, msg_stub, sizeof msg_stub);
 
-    ipfix_init_header(exporter->seq_number, obs_domain_id, &msg);
+    ipfix_init_header(export_time_sec, exporter->seq_number, obs_domain_id,
+                      &msg);
     set_hdr_offset = msg.size;
 
     /* Add a Template Set. */
@@ -677,24 +858,148 @@ ipfix_send_template_msg(struct dpif_ipfix_exporter *exporter,
     ofpbuf_uninit(&msg);
 }
 
+static inline uint32_t
+ipfix_hash_flow_key(const struct ipfix_flow_key *flow_key, uint32_t basis)
+{
+    uint32_t h = hash_int(flow_key->obs_domain_id, basis);
+
+    h = hash_int(flow_key->template_id, h);
+    return hash_bytes(flow_key->flow_key_msg_part,
+                      flow_key->flow_key_msg_part_size, h);
+}
+
+/* A template ID fixes the key size, so a's size also fits 'b'. */
+static bool
+ipfix_flow_key_equal(const struct ipfix_flow_key *a,
+                     const struct ipfix_flow_key *b)
+{
+    return a->template_id == b->template_id
+           && a->obs_domain_id == b->obs_domain_id
+           && !memcmp(a->flow_key_msg_part, b->flow_key_msg_part,
+                      a->flow_key_msg_part_size);
+}
+
+static struct ipfix_flow_cache_entry *
+ipfix_cache_find_entry(const struct dpif_ipfix_exporter *exporter,
+                       const struct ipfix_flow_key *flow_key)
+{
+    uint32_t key_hash = ipfix_hash_flow_key(flow_key, 0);
+    struct ipfix_flow_cache_entry *candidate;
+
+    /* Only entries stored under the same hash can hold an equal key. */
+    HMAP_FOR_EACH_WITH_HASH (candidate, flow_key_map_node, key_hash,
+                             &exporter->cache_flow_key_map) {
+        if (ipfix_flow_key_equal(&candidate->flow_key, flow_key)) {
+            return candidate;
+        }
+    }
+    return NULL;
+}
+
+static bool
+ipfix_cache_next_timeout_msec(const struct dpif_ipfix_exporter *exporter,
+                              long long int *next_timeout_msec)
+{
+    struct ipfix_flow_cache_entry *entry;
+
+    LIST_FOR_EACH(entry, cache_flow_start_timestamp_list_node,
+                  &exporter->cache_flow_start_timestamp_list) {
+        *next_timeout_msec = timeval_to_msec(&entry->flow_start_timestamp)
+            + 1000LL * exporter->cache_active_timeout;
+        return true;
+    }
+    /* The loop returns at the list head (oldest entry); here it is empty. */
+    return false;
+}
+
+static void
+ipfix_cache_aggregate_entries(struct ipfix_flow_cache_entry *from_entry,
+                              struct ipfix_flow_cache_entry *to_entry)
+{
+    /* The merged flow spans from the earliest start to the latest end. */
+    if (timercmp(&from_entry->flow_start_timestamp,
+                 &to_entry->flow_start_timestamp, <)) {
+        to_entry->flow_start_timestamp = from_entry->flow_start_timestamp;
+    }
+    if (timercmp(&from_entry->flow_end_timestamp,
+                 &to_entry->flow_end_timestamp, >)) {
+        to_entry->flow_end_timestamp = from_entry->flow_end_timestamp;
+    }
+
+    /* Packet, octet and sum-of-squares counters are additive. */
+    to_entry->packet_delta_count += from_entry->packet_delta_count;
+    to_entry->layer2_octet_delta_count += from_entry->layer2_octet_delta_count;
+    to_entry->octet_delta_sum_of_squares +=
+        from_entry->octet_delta_sum_of_squares;
+
+    /* IP total length bounds.  Non-IP entries hold 0 in both fields, so
+     * a zero minimum is treated as "unset": it never replaces a non-zero
+     * minimum, and a non-zero minimum always replaces it.  The maximum
+     * needs no such special case. */
+    {
+        uint16_t from_min = from_entry->minimum_ip_total_length;
+        uint16_t from_max = from_entry->maximum_ip_total_length;
+        uint16_t *to_min = &to_entry->minimum_ip_total_length;
+        uint16_t *to_max = &to_entry->maximum_ip_total_length;
+
+        if (from_min && (!*to_min || from_min < *to_min)) {
+            *to_min = from_min;
+        }
+        if (from_max > *to_max) {
+            *to_max = from_max;
+        }
+    }
+}
+
+/* Adds 'entry' to 'exporter''s flow cache and takes ownership of it.
+ * If an entry with the same flow key is already cached, 'entry' is
+ * aggregated into it and free()d; otherwise 'entry' is inserted. */
+static void
+ipfix_cache_update(struct dpif_ipfix_exporter *exporter,
+                   struct ipfix_flow_cache_entry *entry)
+{
+    struct ipfix_flow_cache_entry *existing
+        = ipfix_cache_find_entry(exporter, &entry->flow_key);
+
+    if (existing) {
+        ipfix_cache_aggregate_entries(entry, existing);
+        free(entry);
+        return;
+    }
+
+    hmap_insert(&exporter->cache_flow_key_map, &entry->flow_key_map_node,
+                ipfix_hash_flow_key(&entry->flow_key, 0));
+
+    /* Being the newest entry, it should have the latest start time, so
+     * appending it at the tail keeps the list ordered by
+     * flow_start_timestamp, oldest first. */
+    list_push_back(&exporter->cache_flow_start_timestamp_list,
+                   &entry->cache_flow_start_timestamp_list_node);
+
+    /* Enforce the exporter->cache_max_flows limit. */
+    if (hmap_count(&exporter->cache_flow_key_map)
+        > exporter->cache_max_flows) {
+        dpif_ipfix_cache_expire_now(exporter, false);
+    }
+}
+
 static void
-ipfix_send_data_msg(struct dpif_ipfix_exporter *exporter, struct ofpbuf *packet,
-                    const struct flow *flow, uint64_t packet_delta_count,
-                    uint32_t obs_domain_id, uint32_t obs_point_id)
+ipfix_cache_entry_init(struct ipfix_flow_cache_entry *entry,
+                       struct ofpbuf *packet, const struct flow *flow,
+                       uint64_t packet_delta_count, uint32_t obs_domain_id,
+                       uint32_t obs_point_id)
 {
-    uint64_t msg_stub[DIV_ROUND_UP(1500, 8)];
+    struct ipfix_flow_key *flow_key;
     struct ofpbuf msg;
-    size_t set_hdr_offset;
-    struct ipfix_set_header *set_hdr;
     enum ipfix_proto_l2 l2;
     enum ipfix_proto_l3 l3;
     enum ipfix_proto_l4 l4;
+    uint8_t ethernet_header_length;
+    uint16_t ethernet_total_length;
 
-    ofpbuf_use_stub(&msg, msg_stub, sizeof msg_stub);
-
-    ipfix_init_header(exporter->seq_number, obs_domain_id, &msg);
-    exporter->seq_number++;
-    set_hdr_offset = msg.size;
+    flow_key = &entry->flow_key;
+    ofpbuf_use_stub(&msg, flow_key->flow_key_msg_part,
+                    sizeof flow_key->flow_key_msg_part);
 
     /* Choose the right template ID matching the protocols in the
      * sampled packet. */
@@ -721,46 +1026,33 @@ ipfix_send_data_msg(struct dpif_ipfix_exporter *exporter, struct ofpbuf *packet,
         }
     }
 
-    /* Add a Data Set. */
-    set_hdr = ofpbuf_put_zeros(&msg, sizeof *set_hdr);
-    set_hdr->set_id = htons(ipfix_get_template_id(l2, l3, l4));
+    flow_key->obs_domain_id = obs_domain_id;
+    flow_key->template_id = ipfix_get_template_id(l2, l3, l4);
 
     /* The fields defined in the ipfix_data_record_* structs and sent
      * below must match exactly the templates defined in
      * ipfix_define_template_fields. */
 
+    ethernet_header_length = (l2 == IPFIX_PROTO_L2_VLAN)
+        ? VLAN_ETH_HEADER_LEN : ETH_HEADER_LEN;
+    ethernet_total_length = packet->size;
+
     /* Common Ethernet entities. */
     {
-        struct ipfix_data_record_common *data_common;
-        uint16_t ethernet_total_length;
-        uint8_t ethernet_header_length;
-        uint64_t layer2_octet_delta_count;
-
-        ethernet_total_length = packet->size;
-        ethernet_header_length = (l2 == IPFIX_PROTO_L2_VLAN)
-            ? VLAN_ETH_HEADER_LEN : ETH_HEADER_LEN;
-
-        /* Calculate the total matched octet count by considering as
-         * an approximation that all matched packets have the same
-         * length. */
-        layer2_octet_delta_count = packet_delta_count * ethernet_total_length;
+        struct ipfix_data_record_flow_key_common *data_common;
 
         data_common = ofpbuf_put_zeros(&msg, sizeof *data_common);
         data_common->observation_point_id = htonl(obs_point_id);
-        data_common->packet_delta_count = htonll(packet_delta_count);
-        data_common->layer2_octet_delta_count =
-            htonll(layer2_octet_delta_count);
         memcpy(data_common->source_mac_address, flow->dl_src,
                sizeof flow->dl_src);
         memcpy(data_common->destination_mac_address, flow->dl_dst,
                sizeof flow->dl_dst);
         data_common->ethernet_type = flow->dl_type;
-        data_common->ethernet_total_length = htons(ethernet_total_length);
         data_common->ethernet_header_length = ethernet_header_length;
     }
 
     if (l2 == IPFIX_PROTO_L2_VLAN) {
-        struct ipfix_data_record_vlan *data_vlan;
+        struct ipfix_data_record_flow_key_vlan *data_vlan;
         uint16_t vlan_id = vlan_tci_to_vid(flow->vlan_tci);
         uint8_t priority = vlan_tci_to_pcp(flow->vlan_tci);
 
@@ -771,7 +1063,7 @@ ipfix_send_data_msg(struct dpif_ipfix_exporter *exporter, struct ofpbuf *packet,
     }
 
     if (l3 != IPFIX_PROTO_L3_UNKNOWN) {
-        struct ipfix_data_record_ip *data_ip;
+        struct ipfix_data_record_flow_key_ip *data_ip;
 
         data_ip = ofpbuf_put_zeros(&msg, sizeof *data_ip);
         data_ip->ip_version = (l3 == IPFIX_PROTO_L3_IPV4) ? 4 : 6;
@@ -782,12 +1074,12 @@ ipfix_send_data_msg(struct dpif_ipfix_exporter *exporter, struct ofpbuf *packet,
         data_ip->ip_class_of_service = flow->nw_tos;
 
         if (l3 == IPFIX_PROTO_L3_IPV4) {
-            struct ipfix_data_record_ipv4 *data_ipv4;
+            struct ipfix_data_record_flow_key_ipv4 *data_ipv4;
             data_ipv4 = ofpbuf_put_zeros(&msg, sizeof *data_ipv4);
             data_ipv4->source_ipv4_address = flow->nw_src;
             data_ipv4->destination_ipv4_address = flow->nw_dst;
         } else {  /* l3 == IPFIX_PROTO_L3_IPV6 */
-            struct ipfix_data_record_ipv6 *data_ipv6;
+            struct ipfix_data_record_flow_key_ipv6 *data_ipv6;
 
             data_ipv6 = ofpbuf_put_zeros(&msg, sizeof *data_ipv6);
             memcpy(data_ipv6->source_ipv6_address, &flow->ipv6_src,
@@ -799,15 +1091,159 @@ ipfix_send_data_msg(struct dpif_ipfix_exporter *exporter, struct ofpbuf *packet,
     }
 
     if (l4 != IPFIX_PROTO_L4_UNKNOWN) {
-        struct ipfix_data_record_tcpudp *data_tcpudp;
+        struct ipfix_data_record_flow_key_tcpudp *data_tcpudp;
 
         data_tcpudp = ofpbuf_put_zeros(&msg, sizeof *data_tcpudp);
         data_tcpudp->source_transport_port = flow->tp_src;
         data_tcpudp->destination_transport_port = flow->tp_dst;
     }
 
-    set_hdr = (struct ipfix_set_header*)((uint8_t*)msg.data + set_hdr_offset);
-    set_hdr->length = htons(msg.size - set_hdr_offset);
+    flow_key->flow_key_msg_part_size = msg.size;
+
+    /* The buffer should never be reallocated. */
+    ovs_assert(msg.base == flow_key->flow_key_msg_part);
+
+    {
+        uint64_t layer2_octet_delta_count;
+
+        /* Calculate the total matched octet count by considering as
+         * an approximation that all matched packets have the same
+         * length. */
+        layer2_octet_delta_count = packet_delta_count * ethernet_total_length;
+
+        xgettimeofday(&entry->flow_end_timestamp);
+        entry->flow_start_timestamp = entry->flow_end_timestamp;
+        entry->packet_delta_count = packet_delta_count;
+        entry->layer2_octet_delta_count = layer2_octet_delta_count;
+
+    }
+
+    if (l3 != IPFIX_PROTO_L3_UNKNOWN) {
+        uint16_t ip_total_length =
+            ethernet_total_length - ethernet_header_length;
+
+        entry->octet_delta_sum_of_squares =
+            packet_delta_count * ip_total_length * ip_total_length;
+        entry->minimum_ip_total_length = ip_total_length;
+        entry->maximum_ip_total_length = ip_total_length;
+    } else {
+        entry->octet_delta_sum_of_squares = 0;
+        entry->minimum_ip_total_length = 0;
+        entry->maximum_ip_total_length = 0;
+    }
+}
+
+/* Appends 'entry' to 'msg' as a single data record in its own data set,
+ * tagged with 'flow_end_reason'.  Using one set per record avoids having
+ * to group records by template ID before sending. */
+static void
+ipfix_put_data_set(uint32_t export_time_sec,
+                   uint32_t export_time_neg_delta_usec,
+                   struct ipfix_flow_cache_entry *entry,
+                   enum ipfix_flow_end_reason flow_end_reason,
+                   struct ofpbuf *msg)
+{
+    size_t set_hdr_offset;
+    struct ipfix_set_header *set_hdr;
+
+    set_hdr_offset = msg->size;
+
+    /* Put a Data Set. */
+    set_hdr = ofpbuf_put_zeros(msg, sizeof *set_hdr);
+    set_hdr->set_id = htons(entry->flow_key.template_id);
+
+    /* Copy the flow key part of the data record. */
+    ofpbuf_put(msg, entry->flow_key.flow_key_msg_part,
+               entry->flow_key.flow_key_msg_part_size);
+
+    /* Put the non-key part of the data record. */
+
+    {
+        struct ipfix_data_record_aggregated_common *data_aggregated_common;
+        uint32_t flow_start_sec, flow_start_neg_delta_usec;
+        uint32_t flow_end_sec, flow_end_neg_delta_usec;
+        int64_t start_delta_usec, end_delta_usec;
+
+        if (entry->flow_start_timestamp.tv_usec == 0) {
+            flow_start_sec = entry->flow_start_timestamp.tv_sec;
+            flow_start_neg_delta_usec = 0;
+        } else {
+            flow_start_sec = entry->flow_start_timestamp.tv_sec + 1;
+            flow_start_neg_delta_usec =
+                1000000 - entry->flow_start_timestamp.tv_usec;
+        }
+
+        if (entry->flow_end_timestamp.tv_usec == 0) {
+            flow_end_sec = entry->flow_end_timestamp.tv_sec;
+            flow_end_neg_delta_usec = 0;
+        } else {
+            flow_end_sec = entry->flow_end_timestamp.tv_sec + 1;
+            flow_end_neg_delta_usec =
+                1000000 - entry->flow_end_timestamp.tv_usec;
+        }
+
+        /* The timestamps come from xgettimeofday(), i.e. the wall clock,
+         * which may step backward or jump forward between sampling and
+         * export.  Rather than asserting on their order (which would
+         * abort the daemon), compute the deltas in 64 bits and saturate
+         * them to the unsigned 32-bit range of the IPFIX delta fields. */
+        start_delta_usec = ((int64_t) export_time_sec - flow_start_sec)
+                           * 1000000
+                           - export_time_neg_delta_usec
+                           + flow_start_neg_delta_usec;
+        end_delta_usec = ((int64_t) export_time_sec - flow_end_sec)
+                         * 1000000
+                         - export_time_neg_delta_usec
+                         + flow_end_neg_delta_usec;
+
+        data_aggregated_common = ofpbuf_put_zeros(
+            msg, sizeof *data_aggregated_common);
+        data_aggregated_common->flow_start_delta_microseconds = htonl(
+            MIN(MAX(start_delta_usec, 0), UINT32_MAX));
+        data_aggregated_common->flow_end_delta_microseconds = htonl(
+            MIN(MAX(end_delta_usec, 0), UINT32_MAX));
+        data_aggregated_common->packet_delta_count = htonll(
+            entry->packet_delta_count);
+        data_aggregated_common->layer2_octet_delta_count = htonll(
+            entry->layer2_octet_delta_count);
+        data_aggregated_common->flow_end_reason = flow_end_reason;
+    }
+
+    if (entry->octet_delta_sum_of_squares) {  /* IP packet. */
+        struct ipfix_data_record_aggregated_ip *data_aggregated_ip;
+
+        data_aggregated_ip = ofpbuf_put_zeros(
+            msg, sizeof *data_aggregated_ip);
+        data_aggregated_ip->octet_delta_sum_of_squares = htonll(
+            entry->octet_delta_sum_of_squares);
+        data_aggregated_ip->minimum_ip_total_length = htonll(
+            entry->minimum_ip_total_length);
+        data_aggregated_ip->maximum_ip_total_length = htonll(
+            entry->maximum_ip_total_length);
+    }
+
+    set_hdr = (struct ipfix_set_header*)((uint8_t*)msg->data + set_hdr_offset);
+    set_hdr->length = htons(msg->size - set_hdr_offset);
+}
+
+/* Send an IPFIX message with a single data record. */
+static void
+ipfix_send_data_msg(struct dpif_ipfix_exporter *exporter,
+                    uint32_t export_time_sec,
+                    uint32_t export_time_neg_delta_usec,
+                    struct ipfix_flow_cache_entry *entry,
+                    enum ipfix_flow_end_reason flow_end_reason)
+{
+    uint64_t msg_stub[DIV_ROUND_UP(MAX_MESSAGE_LEN, 8)];
+    struct ofpbuf msg;
+    ofpbuf_use_stub(&msg, msg_stub, sizeof msg_stub);
+
+    ipfix_init_header(export_time_sec, exporter->seq_number,
+                      entry->flow_key.obs_domain_id, &msg);
+    exporter->seq_number++;
+
+    ipfix_put_data_set(export_time_sec, export_time_neg_delta_usec,
+                       entry, flow_end_reason, &msg);
 
     ipfix_send_msg(exporter->collectors, &msg);
 
@@ -820,14 +1256,13 @@ dpif_ipfix_sample(struct dpif_ipfix_exporter *exporter,
                   uint64_t packet_delta_count, uint32_t obs_domain_id,
                   uint32_t obs_point_id)
 {
-    time_t now = time_wall();
-    if ((exporter->last_template_set_time + IPFIX_TEMPLATE_INTERVAL) <= now) {
-        ipfix_send_template_msg(exporter, obs_domain_id);
-        exporter->last_template_set_time = now;
-    }
+    struct ipfix_flow_cache_entry *entry;
 
-    ipfix_send_data_msg(exporter, packet, flow, packet_delta_count,
-                        obs_domain_id, obs_point_id);
+    /* Create a flow cache entry from the sample. */
+    entry = xmalloc(sizeof *entry);
+    ipfix_cache_entry_init(entry, packet, flow, packet_delta_count,
+                           obs_domain_id, obs_point_id);
+    ipfix_cache_update(exporter, entry);
 }
 
 void
@@ -864,3 +1299,144 @@ dpif_ipfix_flow_sample(struct dpif_ipfix *di, struct ofpbuf *packet,
     dpif_ipfix_sample(&node->exporter.exporter, packet, flow,
                       packet_delta_count, obs_domain_id, obs_point_id);
 }
+
+static void
+dpif_ipfix_cache_expire(struct dpif_ipfix_exporter *exporter,
+                        bool forced_end, const struct timeval *export_time,
+                        const uint32_t export_time_sec,
+                        const uint32_t export_time_neg_delta_usec)
+{
+    struct ipfix_flow_cache_entry *entry, *next_entry;
+    struct timeval active_timeout, max_flow_start_time;
+    bool template_msg_sent = false;
+    enum ipfix_flow_end_reason flow_end_reason;
+
+    if (list_is_empty(&exporter->cache_flow_start_timestamp_list)) {
+        return;
+    }
+    /* Entries started at or before max_flow_start_time have timed out. */
+    active_timeout.tv_sec = exporter->cache_active_timeout;
+    active_timeout.tv_usec = 0;
+    timersub(export_time, &active_timeout, &max_flow_start_time);
+
+    LIST_FOR_EACH_SAFE (entry, next_entry, cache_flow_start_timestamp_list_node,
+                        &exporter->cache_flow_start_timestamp_list) {
+        if (forced_end) {
+            flow_end_reason = FORCED_END;
+        } else if (!timercmp(&max_flow_start_time,
+                             &entry->flow_start_timestamp, <)) {
+            flow_end_reason = ACTIVE_TIMEOUT;
+        } else if (hmap_count(&exporter->cache_flow_key_map)
+                   > exporter->cache_max_flows) {
+            /* Enforce exporter->cache_max_flows. */
+            flow_end_reason = LACK_OF_RESOURCES;
+        } else {
+            /* Remaining flows haven't expired yet. */
+            break;
+        }
+
+        list_remove(&entry->cache_flow_start_timestamp_list_node);
+        hmap_remove(&exporter->cache_flow_key_map,
+                    &entry->flow_key_map_node);
+        /* XXX: At most one template message per call, for one obs domain. */
+        if (!template_msg_sent
+            && (exporter->last_template_set_time + IPFIX_TEMPLATE_INTERVAL)
+                <= export_time->tv_sec) {
+            ipfix_send_template_msg(exporter, export_time_sec,
+                                    entry->flow_key.obs_domain_id);
+            exporter->last_template_set_time = export_time->tv_sec;
+            template_msg_sent = true;
+        }
+
+        /* XXX: Group multiple data records for the same obs domain id
+         * into the same message. */
+        ipfix_send_data_msg(exporter, export_time_sec,
+                            export_time_neg_delta_usec, entry, flow_end_reason);
+        free(entry);
+    }
+}
+
+static void
+get_export_time_now(struct timeval *export_time, uint32_t *export_time_sec,
+                    uint32_t *export_time_neg_delta_usec)
+{
+    bool on_second_boundary;
+
+    xgettimeofday(export_time);
+    on_second_boundary = export_time->tv_usec == 0;
+
+    /* IPFIX flow start/end deltas are negative offsets from the export
+     * time, which is exported in whole seconds: round the export time up
+     * to the next second and record how many microseconds that added. */
+    *export_time_sec = export_time->tv_sec + (on_second_boundary ? 0 : 1);
+    *export_time_neg_delta_usec = on_second_boundary
+                                  ? 0
+                                  : 1000000 - export_time->tv_usec;
+}
+
+static void
+dpif_ipfix_cache_expire_now(struct dpif_ipfix_exporter *exporter,
+                            bool forced_end)
+{
+    struct timeval now;
+    uint32_t now_sec;
+    uint32_t now_neg_delta_usec;
+
+    /* Expire 'exporter''s cache against the current time; when
+     * 'forced_end' is true every cached entry is flushed. */
+    get_export_time_now(&now, &now_sec, &now_neg_delta_usec);
+    dpif_ipfix_cache_expire(exporter, forced_end, &now, now_sec,
+                            now_neg_delta_usec);
+}
+
+void
+dpif_ipfix_run(struct dpif_ipfix *di)
+{
+    struct timeval export_time;
+    uint32_t export_time_sec, export_time_neg_delta_usec;
+    struct dpif_ipfix_flow_exporter_map_node *flow_exporter_node;
+
+    /* Expire all flow exporter caches, and the bridge one if enabled. */
+    get_export_time_now(&export_time, &export_time_sec,
+                        &export_time_neg_delta_usec);
+
+    if (di->bridge_exporter.probability > 0) {  /* Bridge exporter enabled. */
+        dpif_ipfix_cache_expire(&di->bridge_exporter.exporter, false,
+                                &export_time, export_time_sec,
+                                export_time_neg_delta_usec);
+    }
+    HMAP_FOR_EACH (flow_exporter_node, node, &di->flow_exporter_map) {
+        dpif_ipfix_cache_expire(
+            &flow_exporter_node->exporter.exporter, false, &export_time,
+            export_time_sec, export_time_neg_delta_usec);
+    }
+}
+
+void
+dpif_ipfix_wait(struct dpif_ipfix *di)
+{
+    struct dpif_ipfix_flow_exporter_map_node *flow_exporter_node;
+    long long int earliest_msec = 0, candidate_msec;
+    bool have_earliest = false;
+
+    /* Arm the poll timer for the soonest cache timeout reported by the
+     * bridge exporter (if enabled) or any flow exporter, if there is one. */
+    if (di->bridge_exporter.probability > 0
+        && ipfix_cache_next_timeout_msec(&di->bridge_exporter.exporter,
+                                         &candidate_msec)) {
+        earliest_msec = candidate_msec;
+        have_earliest = true;
+    }
+    HMAP_FOR_EACH (flow_exporter_node, node, &di->flow_exporter_map) {
+        if (ipfix_cache_next_timeout_msec(
+                &flow_exporter_node->exporter.exporter, &candidate_msec)
+            && (!have_earliest || candidate_msec < earliest_msec)) {
+            earliest_msec = candidate_msec;
+            have_earliest = true;
+        }
+    }
+
+    if (have_earliest) {
+        poll_timer_wait_until(earliest_msec);
+    }
+}
diff --git a/ofproto/ofproto-dpif-ipfix.h b/ofproto/ofproto-dpif-ipfix.h
index c050dba..6ebf8b0 100644
--- a/ofproto/ofproto-dpif-ipfix.h
+++ b/ofproto/ofproto-dpif-ipfix.h
@@ -41,4 +41,7 @@ void dpif_ipfix_flow_sample(struct dpif_ipfix *, struct ofpbuf *,
                             const struct flow *, uint32_t, uint16_t, uint32_t,
                             uint32_t);
 
+void dpif_ipfix_run(struct dpif_ipfix *);
+void dpif_ipfix_wait(struct dpif_ipfix *);
+
 #endif /* ofproto/ofproto-dpif-ipfix.h */
diff --git a/ofproto/ofproto-dpif.c b/ofproto/ofproto-dpif.c
index 67e6c7a..cf20842 100644
--- a/ofproto/ofproto-dpif.c
+++ b/ofproto/ofproto-dpif.c
@@ -1437,6 +1437,9 @@ run(struct ofproto *ofproto_)
     if (ofproto->sflow) {
         dpif_sflow_run(ofproto->sflow);
     }
+    if (ofproto->ipfix) {
+        dpif_ipfix_run(ofproto->ipfix);
+    }
 
     HMAP_FOR_EACH (ofport, up.hmap_node, &ofproto->up.ports) {
         port_run(ofport);
@@ -1495,6 +1498,9 @@ wait(struct ofproto *ofproto_)
     if (ofproto->sflow) {
         dpif_sflow_wait(ofproto->sflow);
     }
+    if (ofproto->ipfix) {
+        dpif_ipfix_wait(ofproto->ipfix);
+    }
     if (!tag_set_is_empty(&ofproto->backer->revalidate_set)) {
         poll_immediate_wake();
     }
@@ -1812,20 +1818,25 @@ set_ipfix(
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
     struct dpif_ipfix *di = ofproto->ipfix;
+    bool has_options = bridge_exporter_options || flow_exporters_options;
 
-    if (bridge_exporter_options || flow_exporters_options) {
-        if (!di) {
-            di = ofproto->ipfix = dpif_ipfix_create();
-        }
+    if (has_options && !di) {
+        di = ofproto->ipfix = dpif_ipfix_create();
+    }
+
+    if (di) {
+        /* Call set_options in any case to cleanly flush the flow
+         * caches in the last exporters that are to be destroyed. */
         dpif_ipfix_set_options(
             di, bridge_exporter_options, flow_exporters_options,
             n_flow_exporters_options);
-    } else {
-        if (di) {
+
+        if (!has_options) {
             dpif_ipfix_unref(di);
             ofproto->ipfix = NULL;
         }
     }
+
     return 0;
 }
 
diff --git a/ofproto/ofproto.h b/ofproto/ofproto.h
index 792df89..798bbdf 100644
--- a/ofproto/ofproto.h
+++ b/ofproto/ofproto.h
@@ -77,11 +77,15 @@ struct ofproto_ipfix_bridge_exporter_options {
     uint32_t sampling_rate;
     uint32_t obs_domain_id;  /* Bridge-wide Observation Domain ID. */
     uint32_t obs_point_id;  /* Bridge-wide Observation Point ID. */
+    uint32_t cache_active_timeout;
+    uint32_t cache_max_flows;
 };
 
 struct ofproto_ipfix_flow_exporter_options {
     uint32_t collector_set_id;
     struct sset targets;
+    uint32_t cache_active_timeout;  /* Seconds; 0 disables caching. */
+    uint32_t cache_max_flows;  /* Max cached flows; 0 disables caching. */
 };
 
 struct ofproto_stp_settings {
diff --git a/utilities/ovs-vsctl.8.in b/utilities/ovs-vsctl.8.in
index fd29b06..bdeb348 100644
--- a/utilities/ovs-vsctl.8.in
+++ b/utilities/ovs-vsctl.8.in
@@ -942,11 +942,12 @@ Deconfigure sFlow from \fBbr0\fR, which also destroys the sFlow record
 .PP
 Configure bridge \fBbr0\fR to send one IPFIX flow record per packet
 sample to UDP port 4739 on host 192.168.0.34, with Observation Domain
-ID 123 and Observation Point ID 456:
+ID 123 and Observation Point ID 456, a flow cache active timeout of 1
+minute (60 seconds), and a maximum flow cache size of 13 flows:
 .IP
 .B "ovs\-vsctl \-\- set Bridge br0 ipfix=@i \(rs"
 .IP
-.B "\-\- \-\-id=@i create IPFIX targets=\(rs\(dq192.168.0.34:4739\(rs\(dq obs_domain_id=123 obs_point_id=456"
+.B "\-\- \-\-id=@i create IPFIX targets=\(rs\(dq192.168.0.34:4739\(rs\(dq obs_domain_id=123 obs_point_id=456 cache_active_timeout=60 cache_max_flows=13"
 .PP
 Deconfigure the IPFIX settings from \fBbr0\fR, which also destroys the
 IPFIX record (since it is now unreferenced):
diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
index 686eb93..38c214b 100644
--- a/vswitchd/bridge.c
+++ b/vswitchd/bridge.c
@@ -1021,6 +1021,12 @@ bridge_configure_ipfix(struct bridge *br)
         if (be_cfg->obs_point_id) {
             be_opts.obs_point_id = *be_cfg->obs_point_id;
         }
+        if (be_cfg->cache_active_timeout) {
+            be_opts.cache_active_timeout = *be_cfg->cache_active_timeout;
+        }
+        if (be_cfg->cache_max_flows) {
+            be_opts.cache_max_flows = *be_cfg->cache_max_flows;
+        }
     }
 
     if (n_fe_opts > 0) {
@@ -1033,6 +1039,10 @@ bridge_configure_ipfix(struct bridge *br)
                 sset_init(&opts->targets);
                 sset_add_array(&opts->targets, fe_cfg->ipfix->targets,
                                fe_cfg->ipfix->n_targets);
+                opts->cache_active_timeout = fe_cfg->ipfix->cache_active_timeout
+                    ? *fe_cfg->ipfix->cache_active_timeout : 0;
+                opts->cache_max_flows = fe_cfg->ipfix->cache_max_flows
+                    ? *fe_cfg->ipfix->cache_max_flows : 0;
                 opts++;
             }
         }
diff --git a/vswitchd/vswitch.ovsschema b/vswitchd/vswitch.ovsschema
index c1c3ef4..538dad3 100644
--- a/vswitchd/vswitch.ovsschema
+++ b/vswitchd/vswitch.ovsschema
@@ -1,6 +1,6 @@
 {"name": "Open_vSwitch",
  "version": "7.3.0",
- "cksum": "1081379034 19765",
+ "cksum": "2483452374 20182",
  "tables": {
    "Open_vSwitch": {
      "columns": {
@@ -423,6 +423,16 @@
                           "minInteger": 0,
                           "maxInteger": 4294967295},
                   "min": 0, "max": 1}},
+       "cache_active_timeout": {
+         "type": {"key": {"type": "integer",
+                          "minInteger": 0,
+                          "maxInteger": 4200},
+                  "min": 0, "max": 1}},
+       "cache_max_flows": {
+         "type": {"key": {"type": "integer",
+                          "minInteger": 0,
+                          "maxInteger": 4294967295},
+                  "min": 0, "max": 1}},
        "external_ids": {
          "type": {"key": "string", "value": "string",
                   "min": 0, "max": "unlimited"}}}},
diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
index 3385912..f65cbfb 100644
--- a/vswitchd/vswitch.xml
+++ b/vswitchd/vswitch.xml
@@ -3472,6 +3472,18 @@
       referenced from a <ref table="Flow_Sample_Collector_Set"/>.
     </column>
 
+    <column name="cache_active_timeout">
+      The maximum period in seconds, from 0 to 4200, for which an IPFIX
+      flow record is cached and aggregated before being sent.  If not
+      specified, defaults to 0.  If 0, caching is disabled.
+    </column>
+
+    <column name="cache_max_flows">
+      The maximum number of IPFIX flow records that can be cached at a
+      time.  If not specified, defaults to 0.  If 0, caching is
+      disabled.
+    </column>
+
     <group title="Common Columns">
       The overall purpose of these columns is described under <code>Common
       Columns</code> at the beginning of this document.
-- 
1.7.9.5



More information about the dev mailing list