Token Bucket Rate Limiting: Implementation in Python

The token bucket algorithm is one of the most widely used rate limiting approaches in production APIs because it absorbs short bursts while still enforcing a long-term request rate. You maintain a bucket for each client that refills with tokens at a fixed rate (e.g., 10 tokens per second) up to a fixed capacity; each request consumes one token, and when the bucket is empty, the request is rejected. This article shows you how to implement a working token bucket rate limiter in Python from scratch, then integrate it into a Flask application.

I built the first version of this pattern for a fintech startup where clients needed to make 1,000 requests per day but could spike to 5,000 in a single hour. Token bucket solved this: we set a refill rate of 1 token per 86.4 seconds (1,000 per day) but allowed a burst bucket of 5,000 tokens. Clients got the fairness they needed without arbitrary rejection of legitimate spikes.

Core Token Bucket Logic

The token bucket has two main operations: refill (add tokens at the specified rate) and consume (subtract a token if available). The key insight is that instead of running a background timer, you compute tokens on-demand: tokens added equals (current_time - last_refill_time) * refill_rate.
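
As a quick worked example of that formula, here is a minimal sketch of the refill step as a pure function. The name refilled_tokens and its parameters are illustrative only (they are not part of the class that follows), and timestamps are passed in explicitly so the arithmetic is easy to check by hand.

```python
def refilled_tokens(tokens, last_refill, now, refill_rate, capacity):
    """Return the token count after an on-demand refill.

    tokens added = (now - last_refill) * refill_rate, capped at capacity.
    """
    # Guard against a clock that appears to run backwards
    elapsed = max(0.0, now - last_refill)
    return min(capacity, tokens + elapsed * refill_rate)


# 3 tokens left, 0.5 s elapsed at 10 tokens/s: 3 + 0.5 * 10 = 8
print(refilled_tokens(3, 100.0, 100.5, 10, 100))  # 8.0

# A long idle period never overfills the bucket: capped at capacity
print(refilled_tokens(3, 100.0, 160.0, 10, 100))  # 100
```

The cap at capacity is what bounds the burst size: however long a client stays idle, it can never bank more than one full bucket.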

Here's a minimal, correct implementation:

import time
from threading import Lock


class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        """
        Initialize the bucket.

        Args:
            capacity: Maximum tokens in the bucket (burst size)
            refill_rate: Tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)  # Start with a full bucket
        # Monotonic clock: elapsed time can't go negative if the wall clock is adjusted
        self.last_refill = time.monotonic()
        self.lock = Lock()

    def _refill(self):
        """Add tokens based on elapsed time since last refill. Caller must hold self.lock."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.last_refill = now

    def consume(self, tokens: int = 1) -> bool:
        """
        Try to consume tokens. Return True if successful, False if insufficient.
        This method is thread-safe.
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def get_remaining(self) -> float:
        """Return the current token count without consuming any."""
        with self.lock:
            self._refill()
            return self.tokens

The threading.Lock ensures that concurrent requests don't race. When two requests arrive simultaneously, the lock serializes the refill and consume operations, so the bucket state is always consistent.
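
One way to convince yourself of this is a small concurrency check. The sketch below uses a stripped-down stand-in for the bucket (the names Bucket and count_admitted are hypothetical) with refill disabled, so the outcome is deterministic: no matter how the threads interleave, the number of admitted requests should equal the capacity.

```python
import threading


class Bucket:
    """Stand-in for the bucket above, with refill turned off for determinism."""

    def __init__(self, capacity):
        self.tokens = capacity
        self.lock = threading.Lock()

    def consume(self):
        # The check and the decrement happen under one lock acquisition
        with self.lock:
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False


def count_admitted(capacity, n_threads):
    """Fire n_threads concurrent consume() calls and count the successes."""
    bucket = Bucket(capacity)
    results = []
    results_lock = threading.Lock()

    def worker():
        ok = bucket.consume()
        with results_lock:
            results.append(ok)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(results)


print(count_admitted(capacity=20, n_threads=50))  # 20
```

Because the availability check and the decrement sit inside the same critical section, two threads can never both claim the last token.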

Integrating with Flask: Per-Client Rate Limiting

In a real API, you'll want to rate limit per user or API key, not per endpoint. Here's a complete Flask example with a decorator:

from collections import defaultdict
from functools import wraps

from flask import Flask, request, jsonify

# Assumes the TokenBucket class defined above is available in this module.

app = Flask(__name__)

# One bucket per client (keyed by API key, falling back to IP address)
rate_limiters = defaultdict(lambda: TokenBucket(capacity=100, refill_rate=10))


def rate_limit(get_client_id=None):
    """
    Decorator that rate-limits requests.

    Args:
        get_client_id: Optional callable to extract client ID from request.
            Defaults to request.remote_addr (IP address).
    """
    def decorator(f):
        @wraps(f)  # Keep the view's name and docstring so Flask registers it correctly
        def wrapped(*args, **kwargs):
            client_id = get_client_id(request) if get_client_id else request.remote_addr
            bucket = rate_limiters[client_id]

            if not bucket.consume():
                remaining = bucket.get_remaining()
                # Rough wait for one token to refill, rounded up to whole seconds
                retry_after = int((1 / bucket.refill_rate) + 1)
                return jsonify({
                    'error': 'Rate limit exceeded',
                    'retry_after': retry_after
                }), 429, {
                    'X-RateLimit-Limit': str(bucket.capacity),
                    'X-RateLimit-Remaining': str(int(remaining)),
                    'Retry-After': str(retry_after)
                }

            return f(*args, **kwargs)

        return wrapped
    return decorator


def get_api_key(request):
    """Extract API key from Authorization header, falling back to the client IP."""
    auth = request.headers.get('Authorization', '')
    if auth.startswith('Bearer '):
        return auth[7:]
    return request.remote_addr


@app.route('/api/data', methods=['GET'])
@rate_limit(get_client_id=get_api_key)
def get_data():
    return jsonify({'data': 'Your API data here'})


if __name__ == '__main__':
    app.run(debug=True)  # Debug mode is for local development only

When a request arrives, the decorator looks up or creates a bucket for that API key, tries to consume a token, and returns 429 if the bucket is empty. The Retry-After header tells the client how long to wait before retrying.
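
The decorator above estimates the wait as roughly one token interval. If you want the header to reflect the bucket's actual deficit, for instance when a request needs several tokens, a slightly tighter estimate could look like the sketch below. The helper name retry_after_seconds is hypothetical and not part of the code above.

```python
import math


def retry_after_seconds(tokens: float, needed: float, refill_rate: float) -> int:
    """Whole seconds until `needed` tokens are available, rounded up.

    tokens:      current token count in the bucket
    needed:      tokens the rejected request required
    refill_rate: tokens added per second (must be positive)
    """
    if refill_rate <= 0:
        raise ValueError("refill_rate must be positive")
    deficit = needed - tokens
    if deficit <= 0:
        return 0  # Enough tokens already; no need to wait
    # Round up so the client never retries a fraction of a second too early
    return math.ceil(deficit / refill_rate)


# Bucket holds 0.25 tokens, request needs 1, refill is 10 tokens/s:
# deficit 0.75 / 10 = 0.075 s, rounded up to 1 s
print(retry_after_seconds(0.25, 1, 10))  # 1
```

Rounding up matters because Retry-After is expressed in whole seconds; rounding down would invite a retry that is rejected again.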

Distributed Rate Limiting with Redis

The above solution works fine for a single server process, but if you run multiple Flask instances behind a load balancer, each instance has its own bucket dictionary, and the limits aren't coordinated. With N instances, a client can burst up to N × 100 requests and sustain up to N × 10 requests per second, multiplying its effective rate.

Redis solves this by holding the bucket state in one place that every instance reads and updates. Here's a distributed token bucket using Redis:

import time

import redis


class RedisTokenBucket:
    def __init__(self, redis_client: redis.Redis, key: str, capacity: int, refill_rate: float):
        """
        Distributed token bucket backed by Redis.

        Args:
            redis_client: redis.Redis instance
            key: Redis key for this bucket (e.g., 'rate_limit:user123')
            capacity: Maximum tokens
            refill_rate: Tokens added per second
        """
        self.redis = redis_client
        self.key = key
        self.capacity = capacity
        self.refill_rate = refill_rate

    def consume(self, tokens: int = 1) -> bool:
        """Atomically refill and consume tokens using a Lua script."""
        # This Lua script runs atomically on the Redis server
        script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local tokens_to_consume = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])

        -- HMGET returns values in the requested order; HGETALL's field
        -- order is not documented as stable, so don't index into it
        local data = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(data[1]) or capacity
        local last_refill = tonumber(data[2]) or now

        -- Add tokens for elapsed time
        local elapsed = math.max(0, now - last_refill)
        tokens = math.min(capacity, tokens + elapsed * refill_rate)

        -- Try to consume
        if tokens >= tokens_to_consume then
            tokens = tokens - tokens_to_consume
            redis.call('HSET', key, 'tokens', tokens, 'last_refill', now)
            -- Cleanup after 24 hours. An expired key restarts as a full bucket,
            -- so this assumes the bucket refills completely in under 24 hours
            redis.call('EXPIRE', key, 86400)
            -- Lua numbers are truncated to integers in replies, so send a string
            return {1, tostring(tokens)}  -- Success, remaining tokens
        end

        return {0, tostring(tokens)}  -- Failure, but return current tokens
        """

        # Wall-clock time is shared state here, so server clocks must be in sync
        result = self.redis.eval(script, 1, self.key,
                                 self.capacity, self.refill_rate, tokens, time.time())
        return result[0] == 1

    def get_remaining(self) -> float:
        """Get current token count without consuming."""
        script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])

        local data = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(data[1]) or capacity
        local last_refill = tonumber(data[2]) or now

        local elapsed = math.max(0, now - last_refill)
        tokens = math.min(capacity, tokens + elapsed * refill_rate)

        -- Return a string so the fractional part survives the reply
        return tostring(tokens)
        """

        return float(self.redis.eval(script, 1, self.key,
                                     self.capacity, self.refill_rate, time.time()))

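The Lua script is atomic: Redis runs each script to completion without interleaving other commands, which prevents race conditions across multiple application servers. Note that the timestamp comes from each application server's clock, so keep those clocks synchronized (e.g., with NTP). Atomic Lua scripts are a common way to implement distributed rate limiting in production.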
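
One detail worth checking is the key expiry. The script treats a missing key as a full bucket, so if the key expires before the bucket would have refilled on its own, the client gets free tokens. A minimal sketch of how you might derive a safe TTL from the bucket parameters follows; the helper name bucket_ttl_seconds and the margin parameter are assumptions for illustration, not part of the class above.

```python
import math


def bucket_ttl_seconds(capacity: float, refill_rate: float, margin: float = 2.0) -> int:
    """TTL (seconds) that outlasts a full refill from empty.

    An empty bucket takes capacity / refill_rate seconds to refill, after
    which its state is identical to a missing key. `margin` is a safety
    factor on top of that (an arbitrary choice here).
    """
    if refill_rate <= 0:
        raise ValueError("refill_rate must be positive")
    return math.ceil(capacity / refill_rate * margin)


# The Flask example's bucket (100 tokens, 10/s) refills in 10 s
print(bucket_ttl_seconds(100, 10))  # 20

# A 5,000-token bucket refilling at 1 token per 86.4 s takes about
# 432,000 s (5 days) to refill, far longer than a fixed 24-hour expiry
slow_ttl = bucket_ttl_seconds(5000, 1 / 86.4, margin=1.0)
print(slow_ttl > 86400)  # True
```

For fast-refilling buckets a fixed 24-hour expiry is more than enough; the calculation only matters for slow buckets with large burst capacity, like the daily-quota example earlier.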

Monitoring and Alerting

Track these metrics to understand your rate limiting:

import logging


class MonitoredTokenBucket(TokenBucket):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rejected_count = 0
        self.accepted_count = 0

    def consume(self, tokens: int = 1) -> bool:
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                self.accepted_count += 1
                return True
            self.rejected_count += 1
            logging.warning(
                f"Rate limit exceeded. Accepted: {self.accepted_count}, "
                f"Rejected: {self.rejected_count}"
            )
            return False

Alert if rejection rate exceeds 5% of traffic—it may indicate either a misbehaving client or rate limits set too tight.
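
The check itself is simple arithmetic over the two counters. Here is a minimal sketch, assuming you read accepted_count and rejected_count periodically; the function names and the min_requests floor (which avoids alerting on a handful of requests) are illustrative assumptions.

```python
def rejection_rate(accepted: int, rejected: int) -> float:
    """Fraction of requests rejected; 0.0 when there was no traffic."""
    total = accepted + rejected
    return rejected / total if total else 0.0


def should_alert(accepted: int, rejected: int,
                 threshold: float = 0.05, min_requests: int = 100) -> bool:
    """True when the rejection rate exceeds `threshold` on enough traffic."""
    total = accepted + rejected
    # Ignore tiny samples, where one rejection swings the rate wildly
    return total >= min_requests and rejection_rate(accepted, rejected) > threshold


print(should_alert(accepted=940, rejected=60))  # True  (6% rejected)
print(should_alert(accepted=960, rejected=40))  # False (4% rejected)
print(should_alert(accepted=5, rejected=5))     # False (too little traffic)
```

In practice you would likely compute this over a sliding window instead of lifetime counters, so that an old burst of rejections does not mask or trigger alerts indefinitely.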

Key Takeaways

  • Token bucket fills at a fixed rate and allows bursts up to its capacity. Use it when fairness and burst tolerance both matter.
  • Implement refill on-demand by computing elapsed time, not with background timers. It is simpler, and there is no timer thread to coordinate with request handling.
  • Use threading.Lock for single-server deployments, Redis Lua scripts for distributed systems.
  • Always return the standard Retry-After header along with the widely used X-RateLimit-* headers so clients can implement smart backoff.
  • Monitor rejection rates and adjust limits based on actual backend capacity, not guesses.

Frequently Asked Questions

What is the difference between token bucket and leaky bucket?

Token bucket accumulates tokens when idle, allowing bursts. Leaky bucket has a queue that drains at a fixed rate, smoothing all traffic. Token bucket is simpler to implement and better for APIs; leaky bucket is better for traffic shaping and load balancing.

Should I use per-IP or per-API-key rate limiting?

Per-API-key for authenticated APIs (you control who the client is). Per-IP for public endpoints. If your service has corporate clients on shared networks (office, mobile carrier), per-IP unfairly groups them together. Always prefer per-account limits when possible.

How do I handle rate limiting across multiple regions?

If you have API servers in multiple data centers, one option is a shared store that every region can reach, such as a Redis deployment or DynamoDB global tables, so that each server updates the same bucket state. Cross-region round trips add latency, and replicated stores are typically only eventually consistent across regions, so limits can be briefly exceeded. Alternatively, use edge computing (Cloudflare Workers) to rate limit before traffic hits your origin.

What happens if a client ignores Retry-After and keeps sending requests?

They'll keep getting 429 responses and won't succeed until the bucket refills. This is intentional: misbehaving clients are cut off. Monitor for patterns: if one IP or key consistently ignores rate limits, consider blocking it.

Can I have different limits for different endpoints?

Yes. Create separate decorators or pass the limit values as parameters. For example, /api/expensive-operation might allow 10 requests/minute while /api/list-users allows 1,000/minute. Set limits based on backend cost.
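
As a rough sketch of the "pass the limit values as parameters" option, you could keep a table of per-endpoint limits and include the endpoint in the bucket key. Everything named here (Limit, per_minute, ENDPOINT_LIMITS, limit_for, bucket_key) is hypothetical, and letting the burst capacity equal the full per-minute quota is an assumption you may want to tighten.

```python
from typing import NamedTuple


class Limit(NamedTuple):
    capacity: int        # burst size, in tokens
    refill_rate: float   # tokens added per second


def per_minute(n: int) -> Limit:
    """n requests/minute, allowing the whole minute's quota as a burst."""
    return Limit(capacity=n, refill_rate=n / 60)


# Limits chosen by backend cost, mirroring the example paths above
ENDPOINT_LIMITS = {
    "/api/expensive-operation": per_minute(10),
    "/api/list-users": per_minute(1000),
}
DEFAULT_LIMIT = Limit(capacity=100, refill_rate=10)


def limit_for(endpoint: str) -> Limit:
    """Look up the limit for an endpoint, falling back to the default."""
    return ENDPOINT_LIMITS.get(endpoint, DEFAULT_LIMIT)


def bucket_key(endpoint: str, client_id: str) -> str:
    """One bucket per (endpoint, client) pair, so limits don't interfere."""
    return f"{endpoint}:{client_id}"


limit = limit_for("/api/expensive-operation")
print(limit.capacity)                                    # 10
print(bucket_key("/api/expensive-operation", "user123"))  # /api/expensive-operation:user123
```

The returned capacity and refill_rate can then be passed to whichever bucket class you use, with bucket_key serving as the dictionary or Redis key.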

Further Reading