[ovs-dev] [token bucket 3/3] token-bucket: New library for generic rate-limiting.

Ben Pfaff blp at nicira.com
Thu May 31 00:16:55 UTC 2012


This commit adds a generic token-bucket rate-limiting library and converts
two rate-limiters in the tree (in lib/vlog.c and ofproto/pinsched.c) to use it.
I intend to use the library elsewhere in the future.

Signed-off-by: Ben Pfaff <blp at nicira.com>
---
 lib/automake.mk    |    2 +
 lib/token-bucket.c |   96 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 lib/token-bucket.h |   42 +++++++++++++++++++++++
 lib/vlog.c         |   25 +++----------
 lib/vlog.h         |   35 +++++++------------
 ofproto/pinsched.c |   86 ++++++++++++++++------------------------------
 6 files changed, 189 insertions(+), 97 deletions(-)
 create mode 100644 lib/token-bucket.c
 create mode 100644 lib/token-bucket.h

diff --git a/lib/automake.mk b/lib/automake.mk
index 1d404c2..35589a9 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -171,6 +171,8 @@ lib_libopenvswitch_a_SOURCES = \
 	lib/timer.h \
 	lib/timeval.c \
 	lib/timeval.h \
+	lib/token-bucket.c \
+	lib/token-bucket.h \
 	lib/type-props.h \
 	lib/unaligned.h \
 	lib/unicode.c \
diff --git a/lib/token-bucket.c b/lib/token-bucket.c
new file mode 100644
index 0000000..73f11a8
--- /dev/null
+++ b/lib/token-bucket.c
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2012 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "token-bucket.h"
+
+#include "poll-loop.h"
+#include "sat-math.h"
+#include "timeval.h"
+#include "util.h"
+
+/* Initializes 'tb' to accumulate 'rate' tokens per millisecond, with a
+ * maximum of 'burst' tokens.
+ *
+ * The token bucket is initially full: 'tokens' starts at 0, but the LLONG_MIN
+ * 'last_fill' makes the first refill in token_bucket_withdraw() see a very
+ * large elapsed time.  It may be more convenient to use TOKEN_BUCKET_INIT. */
+void
+token_bucket_init(struct token_bucket *tb,
+                  unsigned int rate, unsigned int burst)
+{
+    tb->rate = rate;
+    tb->burst = burst;
+    tb->tokens = 0;
+    tb->last_fill = LLONG_MIN;
+}
+
+/* Changes 'tb' to accumulate 'rate' tokens per millisecond, with a maximum of
+ * 'burst' tokens.  Tokens held beyond the new 'burst' are discarded, so that
+ * 'tokens' never exceeds 'burst'; the bucket is not otherwise refilled.
+ * 'tb' must already have been initialized with TOKEN_BUCKET_INIT or
+ * token_bucket_init(). */
+void
+token_bucket_set(struct token_bucket *tb,
+                 unsigned int rate, unsigned int burst)
+{
+    tb->rate = rate;
+    tb->burst = burst;
+    if (burst < tb->tokens) {
+        tb->tokens = burst;
+    }
+}
+
+/* Attempts to remove 'n' tokens from 'tb'.  Returns true if successful, false
+ * if 'tb' contained fewer than 'n' tokens even after being refilled for the
+ * time elapsed since 'last_fill' (in which case no tokens are removed). */
+bool
+token_bucket_withdraw(struct token_bucket *tb, unsigned int n)
+{
+    if (tb->tokens < n) {       /* Refill only when short of tokens. */
+        long long int now = time_msec();
+        if (now > tb->last_fill) {
+            unsigned long long int elapsed_ull
+                = (unsigned long long int) now - tb->last_fill;
+            unsigned int elapsed = MIN(UINT_MAX, elapsed_ull);
+            unsigned int add = sat_mul(tb->rate, elapsed);
+            unsigned int tokens = sat_add(tb->tokens, add);
+            tb->tokens = MIN(tokens, tb->burst);    /* Cap at 'burst'. */
+            tb->last_fill = now;
+        }
+
+        if (tb->tokens < n) {
+            return false;
+        }
+    }
+
+    tb->tokens -= n;
+    return true;
+}
+
+/* Causes the poll loop to wake up when at least 'n' tokens will be available
+ * for withdrawal from 'tb'.  Schedules no wakeup while 'rate' is zero. */
+void
+token_bucket_wait(struct token_bucket *tb, unsigned int n)
+{
+    if (tb->tokens >= n) {
+        poll_immediate_wake();
+    } else if (tb->rate) {      /* A zero rate would divide by zero below. */
+        unsigned int need = n - tb->tokens;
+        poll_timer_wait_until(tb->last_fill + need / tb->rate + 1);
+    }
+}
diff --git a/lib/token-bucket.h b/lib/token-bucket.h
new file mode 100644
index 0000000..ef2b1ae
--- /dev/null
+++ b/lib/token-bucket.h
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2012 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TOKEN_BUCKET_H
+#define TOKEN_BUCKET_H 1
+
+#include <limits.h>
+#include <stdbool.h>
+
+struct token_bucket {
+    /* Configuration settings. */
+    unsigned int rate;          /* Tokens added per millisecond. */
+    unsigned int burst;         /* Max cumulative tokens credit. */
+
+    /* Current status. */
+    unsigned int tokens;        /* Current number of tokens. */
+    long long int last_fill;    /* time_msec() of last refill. */
+};
+
+#define TOKEN_BUCKET_INIT(RATE, BURST) { RATE, BURST, 0, LLONG_MIN }
+
+void token_bucket_init(struct token_bucket *,
+                       unsigned int rate, unsigned int burst);
+void token_bucket_set(struct token_bucket *,
+                       unsigned int rate, unsigned int burst);
+bool token_bucket_withdraw(struct token_bucket *, unsigned int n);
+void token_bucket_wait(struct token_bucket *, unsigned int n);
+
+#endif /* token-bucket.h */
diff --git a/lib/vlog.c b/lib/vlog.c
index a7d9e48..4b4465a 100644
--- a/lib/vlog.c
+++ b/lib/vlog.c
@@ -762,28 +762,15 @@ vlog_should_drop(const struct vlog_module *module, enum vlog_level level,
         return true;
     }
 
-    if (rl->tokens < VLOG_MSG_TOKENS) {
+    if (!token_bucket_withdraw(&rl->token_bucket, VLOG_MSG_TOKENS)) {
         time_t now = time_now();
-        if (rl->last_fill > now) {
-            /* Last filled in the future?  Time must have gone backward, or
-             * 'rl' has not been used before. */
-            rl->tokens = rl->burst;
-        } else if (rl->last_fill < now) {
-            unsigned int add = sat_mul(rl->rate, now - rl->last_fill);
-            unsigned int tokens = sat_add(rl->tokens, add);
-            rl->tokens = MIN(tokens, rl->burst);
-            rl->last_fill = now;
-        }
-        if (rl->tokens < VLOG_MSG_TOKENS) {
-            if (!rl->n_dropped) {
-                rl->first_dropped = now;
-            }
-            rl->last_dropped = now;
-            rl->n_dropped++;
-            return true;
+        if (!rl->n_dropped) {
+            rl->first_dropped = now;
         }
+        rl->last_dropped = now;
+        rl->n_dropped++;
+        return true;
     }
-    rl->tokens -= VLOG_MSG_TOKENS;
 
     if (rl->n_dropped) {
         time_t now = time_now();
diff --git a/lib/vlog.h b/lib/vlog.h
index 2dce3c6..e15e441 100644
--- a/lib/vlog.h
+++ b/lib/vlog.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009, 2010, 2011 Nicira, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2011, 2012 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
 #include <stdbool.h>
 #include <time.h>
 #include "compiler.h"
+#include "sat-math.h"
+#include "token-bucket.h"
 #include "util.h"
 
 #ifdef  __cplusplus
@@ -87,35 +89,24 @@ struct vlog_module *vlog_module_from_name(const char *name);
 
 /* Rate-limiter for log messages. */
 struct vlog_rate_limit {
-    /* Configuration settings. */
-    unsigned int rate;          /* Tokens per second. */
-    unsigned int burst;         /* Max cumulative tokens credit. */
-
-    /* Current status. */
-    unsigned int tokens;        /* Current number of tokens. */
-    time_t last_fill;           /* Last time tokens added. */
+    struct token_bucket token_bucket;
     time_t first_dropped;       /* Time first message was dropped. */
     time_t last_dropped;        /* Time of most recent message drop. */
     unsigned int n_dropped;     /* Number of messages dropped. */
 };
 
-/* Number of tokens to emit a message.  We add 'rate' tokens per second, which
- * is 60 times the unit used for 'rate', thus 60 tokens are required to emit
- * one message. */
-#define VLOG_MSG_TOKENS 60
+/* Number of tokens to emit a message.  We add 'rate' tokens per millisecond,
+ * thus 60,000 tokens are required to emit one message per minute. */
+#define VLOG_MSG_TOKENS (60 * 1000)
 
 /* Initializer for a struct vlog_rate_limit, to set up a maximum rate of RATE
  * messages per minute and a maximum burst size of BURST messages. */
-#define VLOG_RATE_LIMIT_INIT(RATE, BURST)                   \
-        {                                                   \
-            RATE,                           /* rate */      \
-            (MIN(BURST, UINT_MAX / VLOG_MSG_TOKENS)         \
-             * VLOG_MSG_TOKENS),            /* burst */     \
-            0,                              /* tokens */    \
-            0,                              /* last_fill */ \
-            0,                              /* first_dropped */ \
-            0,                              /* last_dropped */ \
-            0,                              /* n_dropped */ \
+#define VLOG_RATE_LIMIT_INIT(RATE, BURST)                               \
+        {                                                               \
+            TOKEN_BUCKET_INIT(RATE, SAT_MUL(BURST, VLOG_MSG_TOKENS)),   \
+            0,                              /* first_dropped */         \
+            0,                              /* last_dropped */          \
+            0,                              /* n_dropped */             \
         }
 
 /* Configuring how each module logs messages. */
diff --git a/ofproto/pinsched.c b/ofproto/pinsched.c
index 4a939b8..57e8e23 100644
--- a/ofproto/pinsched.c
+++ b/ofproto/pinsched.c
@@ -28,7 +28,9 @@
 #include "poll-loop.h"
 #include "random.h"
 #include "rconn.h"
+#include "sat-math.h"
 #include "timeval.h"
+#include "token-bucket.h"
 #include "vconn.h"
 
 struct pinqueue {
@@ -39,24 +41,13 @@ struct pinqueue {
 };
 
 struct pinsched {
-    /* Client-supplied parameters. */
-    int rate_limit;           /* Packets added to bucket per second. */
-    int burst_limit;          /* Maximum token bucket size, in packets. */
+    struct token_bucket token_bucket;
 
     /* One queue per physical port. */
     struct hmap queues;         /* Contains "struct pinqueue"s. */
     int n_queued;               /* Sum over queues[*].n. */
     struct pinqueue *next_txq;  /* Next pinqueue check in round-robin. */
 
-    /* Token bucket.
-     *
-     * It costs 1000 tokens to send a single packet_in message.  A single token
-     * per message would be more straightforward, but this choice lets us avoid
-     * round-off error in refill_bucket()'s calculation of how many tokens to
-     * add to the bucket, since no division step is needed. */
-    long long int last_fill;    /* Time at which we last added tokens. */
-    int tokens;                 /* Current number of tokens. */
-
     /* Transmission queue. */
     int n_txq;                  /* No. of packets waiting in rconn for tx. */
 
@@ -86,6 +77,20 @@ dequeue_packet(struct pinsched *ps, struct pinqueue *q)
     return packet;
 }
 
+static void
+adjust_limits(int *rate_limit, int *burst_limit)
+{
+    if (*rate_limit <= 0) {
+        *rate_limit = 1000;             /* Default for a nonpositive rate. */
+    }
+    if (*burst_limit <= 0) {
+        *burst_limit = *rate_limit / 4; /* Default: a quarter of the rate. */
+    }
+    if (*burst_limit < 1) {
+        *burst_limit = 1;               /* Rate below 4 yields 0 above. */
+    }
+}
+
 /* Destroys 'q' and removes it from 'ps''s set of queues.
  * (The caller must ensure that 'q' is empty.) */
 static void
@@ -169,30 +174,13 @@ get_tx_packet(struct pinsched *ps)
     return packet;
 }
 
-/* Add tokens to the bucket based on elapsed time. */
-static void
-refill_bucket(struct pinsched *ps)
-{
-    long long int now = time_msec();
-    long long int tokens = (now - ps->last_fill) * ps->rate_limit + ps->tokens;
-    if (tokens >= 1000) {
-        ps->last_fill = now;
-        ps->tokens = MIN(tokens, ps->burst_limit * 1000);
-    }
-}
-
 /* Attempts to remove enough tokens from 'ps' to transmit a packet.  Returns
  * true if successful, false otherwise.  (In the latter case no tokens are
  * removed.) */
 static bool
 get_token(struct pinsched *ps)
 {
-    if (ps->tokens >= 1000) {
-        ps->tokens -= 1000;
-        return true;
-    } else {
-        return false;
-    }
+    return token_bucket_withdraw(&ps->token_bucket, 1000);
 }
 
 void
@@ -216,7 +204,7 @@ pinsched_send(struct pinsched *ps, uint16_t port_no,
          * otherwise wasted space. */
         ofpbuf_trim(packet);
 
-        if (ps->n_queued >= ps->burst_limit) {
+        if (ps->n_queued * 1000 >= ps->token_bucket.burst) {
             drop_packet(ps);
         }
         q = pinqueue_get(ps, port_no);
@@ -235,7 +223,6 @@ pinsched_run(struct pinsched *ps, pinsched_tx_cb *cb, void *aux)
 
         /* Drain some packets out of the bucket if possible, but limit the
          * number of iterations to allow other code to get work done too. */
-        refill_bucket(ps);
         for (i = 0; ps->n_queued && get_token(ps) && i < 50; i++) {
             cb(get_tx_packet(ps), aux);
         }
@@ -246,14 +233,7 @@ void
 pinsched_wait(struct pinsched *ps)
 {
     if (ps && ps->n_queued) {
-        if (ps->tokens >= 1000) {
-            /* We can transmit more packets as soon as we're called again. */
-            poll_immediate_wake();
-        } else {
-            /* We have to wait for the bucket to re-fill.  We could calculate
-             * the exact amount of time here for increased smoothness. */
-            poll_timer_wait(TIME_UPDATE_INTERVAL / 2);
-        }
+        token_bucket_wait(&ps->token_bucket, 1000);
     }
 }
 
@@ -264,16 +244,18 @@ pinsched_create(int rate_limit, int burst_limit)
     struct pinsched *ps;
 
     ps = xzalloc(sizeof *ps);
+
+    adjust_limits(&rate_limit, &burst_limit);
+    token_bucket_init(&ps->token_bucket,
+                      rate_limit, sat_mul(burst_limit, 1000));
+
     hmap_init(&ps->queues);
     ps->n_queued = 0;
     ps->next_txq = NULL;
-    ps->last_fill = time_msec();
-    ps->tokens = rate_limit * 1000;
     ps->n_txq = 0;
     ps->n_normal = 0;
     ps->n_limited = 0;
     ps->n_queue_dropped = 0;
-    pinsched_set_limits(ps, rate_limit, burst_limit);
 
     return ps;
 }
@@ -298,24 +280,16 @@ void
 pinsched_get_limits(const struct pinsched *ps,
                     int *rate_limit, int *burst_limit)
 {
-    *rate_limit = ps->rate_limit;
-    *burst_limit = ps->burst_limit;
+    *rate_limit = ps->token_bucket.rate;
+    *burst_limit = ps->token_bucket.burst / 1000;
 }
 
 void
 pinsched_set_limits(struct pinsched *ps, int rate_limit, int burst_limit)
 {
-    if (rate_limit <= 0) {
-        rate_limit = 1000;
-    }
-    if (burst_limit <= 0) {
-        burst_limit = rate_limit / 4;
-    }
-    burst_limit = MAX(burst_limit, 1);
-    burst_limit = MIN(burst_limit, INT_MAX / 1000);
-
-    ps->rate_limit = rate_limit;
-    ps->burst_limit = burst_limit;
+    adjust_limits(&rate_limit, &burst_limit);
+    token_bucket_set(&ps->token_bucket,
+                     rate_limit, sat_mul(burst_limit, 1000));
     while (ps->n_queued > burst_limit) {
         drop_packet(ps);
     }
-- 
1.7.2.5




More information about the dev mailing list