Introduction
Rate limiting protects APIs from abuse and ensures fair resource allocation. This tutorial covers various rate limiting strategies including token bucket, sliding window, and Redis-based implementations.
In-Memory Rate Limiter
from time import time
from collections import defaultdict
class RateLimiter:
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window = window_seconds
self.requests = defaultdict(list)
def is_allowed(self, key: str) -> bool:
now = time()
window_start = now - self.window
# Remove old requests
self.requests[key] = [
t for t in self.requests[key] if t > window_start
]
if len(self.requests[key]) >= self.max_requests:
return False
self.requests[key].append(now)
return True
def get_remaining(self, key: str) -> int:
now = time()
window_start = now - self.window
current_count = len([
t for t in self.requests[key] if t > window_start
])
return max(0, self.max_requests - current_count)
limiter = RateLimiter(max_requests=100, window_seconds=60)
# Usage
if limiter.is_allowed("user123"):
# Process request
pass
else:
# Return 429 Too Many Requests
pass
Token Bucket Algorithm
import threading
class TokenBucket:
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity
self.tokens = capacity
self.refill_rate = refill_rate
self.last_refill = time()
self.lock = threading.Lock()
def consume(self, tokens: int = 1) -> bool:
with self.lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
def _refill(self):
now = time()
elapsed = now - self.last_refill
new_tokens = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_refill = now
def get_tokens(self) -> float:
with self.lock:
self._refill()
return self.tokens
# Usage
bucket = TokenBucket(capacity=100, refill_rate=1.0) # 1 token per second
if bucket.consume():
# Process request
pass
Redis-Based Rate Limiter
import redis
from time import time
class RedisRateLimiter:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
def is_allowed(self, key: str, max_requests: int, window: int) -> bool:
now = time()
window_start = now - window
pipe = self.redis.pipeline()
# Remove old entries
pipe.zremrangebyscore(key, 0, window_start)
# Count current requests
pipe.zcard(key)
# Add new request
pipe.zadd(key, {str(now): now})
# Set expiry on the key
pipe.expire(key, window)
results = pipe.execute()
current_count = results[1]
if current_count > max_requests:
# Remove the just-added request
self.redis.zrem(key, str(now))
return False
return True
def get_window(self, key: str) -> dict:
now = time()
count = self.redis.zcard(key)
ttl = self.redis.ttl(key)
return {
"requests": count,
"remaining": max(0, 100 - count),
"reset_in": ttl
}
# Usage
limiter = RedisRateLimiter()
@app.middleware("http")
async def rate_limit_middleware(request, call_next):
client_ip = request.client.host
if not limiter.is_allowed(f"ip:{client_ip}", 100, 60):
return JSONResponse(
{"error": "Rate limit exceeded"},
status_code=429,
headers={"Retry-After": "60"}
)
return await call_next(request)
FastAPI Rate Limiter
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
# Using slowapi
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
@app.get("/items")
@limiter.limit("100/minute")
async def list_items(request: Request):
return {"items": ["item1", "item2"]}
@app.get("/users")
@limiter.limit("10/second")
async def list_users(request: Request):
return {"users": ["user1", "user2"]}
Practice Problems
- Implement a distributed rate limiter using Redis
- Add rate limiting per API endpoint
- Create rate limiting with burst allowance
- Implement rate limit headers (X-RateLimit-Limit, etc.)
- Add rate limiting with different tiers (free, premium)