← Back to Python

All Topics

Advertisement

Learn/Python/API Development

API Rate Limiting - Throttling, Quotas, Redis-Based

Topic: API Rate Limiting

Advertisement

Introduction

Rate limiting protects APIs from abuse and ensures fair resource allocation. This tutorial covers several rate-limiting strategies: an in-memory sliding-window limiter, the token bucket algorithm, a Redis-based sliding-window limiter, and per-endpoint limits in FastAPI using slowapi.

In-Memory Rate Limiter

from time import time
from collections import defaultdict

class RateLimiter:
    """In-memory sliding-window rate limiter keyed by an arbitrary string.

    For each key the limiter stores the ``time()`` timestamps of accepted
    requests. A request is accepted while fewer than ``max_requests``
    stored timestamps are newer than ``window_seconds`` ago.

    State is held in a plain dict on the instance, and no lock is taken
    around reads or writes.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        """Create a limiter allowing *max_requests* per *window_seconds*."""
        self.max_requests = max_requests
        self.window = window_seconds
        # key -> list of timestamps of accepted requests
        self.requests = defaultdict(list)

    def _recent(self, key: str, now: float) -> list:
        """Return a new list of *key*'s timestamps inside the window.

        Uses ``dict.get`` rather than indexing so that looking up a key
        that was never seen does not make the defaultdict insert it.
        """
        window_start = now - self.window
        return [t for t in self.requests.get(key, ()) if t > window_start]

    def is_allowed(self, key: str) -> bool:
        """Record a request for *key* and report whether it is allowed.

        Expired timestamps are pruned on every call. Returns ``False``
        (without recording anything) once ``max_requests`` timestamps are
        already inside the window.
        """
        now = time()
        recent = self._recent(key, now)

        if len(recent) >= self.max_requests:
            if recent:
                self.requests[key] = recent
            else:
                # Nothing to remember for this key; do not keep an empty
                # entry around.
                self.requests.pop(key, None)
            return False

        recent.append(now)
        self.requests[key] = recent
        return True

    def get_remaining(self, key: str) -> int:
        """Return how many more requests *key* may make right now.

        This is a read-only query: it never adds *key* to the internal
        dict, so probing unknown keys does not grow memory.
        """
        current_count = len(self._recent(key, time()))
        return max(0, self.max_requests - current_count)

# Allow each key at most 100 requests in any 60-second window.
limiter = RateLimiter(window_seconds=60, max_requests=100)

# Usage: ask the limiter before doing any work for the caller.
if not limiter.is_allowed("user123"):
    # Over the limit: return 429 Too Many Requests
    pass
else:
    # Under the limit: process request
    pass

Token Bucket Algorithm

import threading

class TokenBucket:
    """Token bucket guarded by a lock.

    The bucket starts full with ``capacity`` tokens and gains
    ``refill_rate`` tokens per second of elapsed ``time()``, never
    exceeding ``capacity``. Refilling happens lazily whenever the bucket
    is consumed from or inspected.
    """

    def __init__(self, capacity: int, refill_rate: float):
        """Create a full bucket of *capacity* tokens refilled at *refill_rate*/s."""
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time()
        self.lock = threading.Lock()

    def consume(self, tokens: int = 1) -> bool:
        """Take *tokens* from the bucket if that many are available.

        Returns ``True`` and deducts the tokens on success; returns
        ``False`` and leaves the bucket untouched otherwise.
        """
        with self.lock:
            self._refill()

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True

            return False

    def _refill(self):
        """Add the tokens earned since the last refill.

        Both callers in this class invoke it while holding ``self.lock``.
        """
        now = time()
        elapsed = now - self.last_refill

        # time() is wall-clock time and can step backwards. A negative
        # elapsed value would otherwise subtract tokens (possibly below
        # zero), so it is treated as "no time has passed".
        if elapsed > 0:
            self.tokens = min(
                self.capacity, self.tokens + elapsed * self.refill_rate
            )

        # Always re-anchor so refilling resumes from the current clock.
        self.last_refill = now

    def get_tokens(self) -> float:
        """Return the number of tokens currently available, after refilling."""
        with self.lock:
            self._refill()
            return self.tokens

# Usage
# A bucket of 100 tokens that refills at 1 token per second.
bucket = TokenBucket(refill_rate=1.0, capacity=100)

# Each request costs one token; the branch runs only when one was taken.
if bucket.consume(tokens=1):
    # Process request
    pass

Redis-Based Rate Limiter

import redis
from time import time

class RedisRateLimiter:
    """Sliding-window rate limiter that keeps its state in Redis.

    Every rate-limit key is a Redis sorted set whose scores are the
    ``time()`` timestamps of accepted requests.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        """Connect to the Redis server at *redis_url*."""
        self.redis = redis.from_url(redis_url)

    def is_allowed(self, key: str, max_requests: int, window: int) -> bool:
        """Record a request under *key* and report whether it is allowed.

        Args:
            key: Name of the sorted set tracking this client/resource.
            max_requests: Maximum accepted requests inside the window.
            window: Window length in seconds; also used as the key's TTL.

        Returns:
            ``True`` if fewer than *max_requests* requests were already in
            the window, ``False`` otherwise (the rejected request is not
            kept in the set).
        """
        import uuid  # local import: only needed to build unique members

        now = time()
        window_start = now - window

        # Sorted-set members must be unique. Using the bare timestamp
        # would make two requests with the same time() value overwrite
        # each other and be counted once.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()

        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)

        # Count current requests (queued before the add, so the result
        # does not include this request)
        pipe.zcard(key)

        # Add new request
        pipe.zadd(key, {member: now})

        # Set expiry on the key
        pipe.expire(key, window)

        results = pipe.execute()
        current_count = results[1]

        # current_count excludes the request just added, so the limit is
        # reached as soon as it equals max_requests.
        if current_count >= max_requests:
            # Remove the just-added request
            self.redis.zrem(key, member)
            return False

        return True

    def get_window(self, key: str, max_requests: int = 100) -> dict:
        """Return the stored request count, remaining quota and TTL for *key*.

        Args:
            key: Name of the sorted set to inspect.
            max_requests: Limit used to compute ``"remaining"``; defaults
                to 100.

        Returns:
            A dict with ``"requests"`` (sorted-set cardinality),
            ``"remaining"`` (never below 0) and ``"reset_in"`` (the value
            returned by Redis ``TTL`` for the key).
        """
        count = self.redis.zcard(key)
        ttl = self.redis.ttl(key)

        return {
            "requests": count,
            "remaining": max(0, max_requests - count),
            "reset_in": ttl
        }

# Usage
limiter = RedisRateLimiter()

# Limit applied per client IP by the middleware below.
RATE_LIMIT_MAX_REQUESTS = 100
RATE_LIMIT_WINDOW_SECONDS = 60


@app.middleware("http")
async def rate_limit_middleware(request, call_next):
    """Reject requests from clients that exceeded the per-IP rate limit.

    Over-limit requests get a 429 JSON response with a ``Retry-After``
    header equal to the window length; all other requests are passed on
    to ``call_next``.
    """
    # request.client can be None when the server provides no peer
    # address; fall back to one shared bucket instead of raising
    # AttributeError.
    client = request.client
    client_ip = client.host if client is not None else "unknown"

    # NOTE(review): is_allowed() is a synchronous Redis round trip called
    # from an async function — confirm this is acceptable for the event
    # loop under load.
    allowed = limiter.is_allowed(
        f"ip:{client_ip}",
        RATE_LIMIT_MAX_REQUESTS,
        RATE_LIMIT_WINDOW_SECONDS,
    )
    if not allowed:
        return JSONResponse(
            {"error": "Rate limit exceeded"},
            status_code=429,
            headers={"Retry-After": str(RATE_LIMIT_WINDOW_SECONDS)}
        )

    return await call_next(request)

FastAPI Rate Limiter

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

# Using slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

# Rate-limit key is the remote address of the caller.
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()

# slowapi's documented setup: expose the limiter on the application state
# and register its handler so RateLimitExceeded becomes a 429 response.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/items")
@limiter.limit("100/minute")
async def list_items(request: Request):
    """Return the demo item list (decorated with a "100/minute" limit)."""
    # `request` is not read here; presumably the limiter decorator needs
    # it in the signature — confirm against slowapi's documentation.
    items = ["item1", "item2"]
    return {"items": items}

@app.get("/users")
@limiter.limit("10/second")
async def list_users(request: Request):
    """Return the demo user list (decorated with a "10/second" limit)."""
    # `request` is not read here; presumably the limiter decorator needs
    # it in the signature — confirm against slowapi's documentation.
    users = ["user1", "user2"]
    return {"users": users}

Practice Problems

  1. Implement a distributed rate limiter using Redis
  2. Add rate limiting per API endpoint
  3. Create rate limiting with burst allowance
  4. Implement rate limit headers (X-RateLimit-Limit, etc.)
  5. Add rate limiting with different tiers (free, premium)

Advertisement

Advertisement

Need More Practice?

Get personalized Python help from ChatWhole's AI-powered platform.

Get Expert Help →