← Back to Python

All Topics

Advertisement

Learn/Python/Authentication

OAuth2 Implementation - Authorization Flow, Tokens

Topic: OAuth2 Implementation

Advertisement

Introduction

OAuth2 is an authorization framework that enables applications to obtain limited access to user accounts on HTTP services. This tutorial covers various OAuth2 flows and implementation in Python.

Authorization Code Flow

# oauth_flow.py
import httpx
from urllib.parse import urlencode, parse_qs

CLIENT_ID = "your-client-id"
CLIENT_SECRET = "your-client-secret"
REDIRECT_URI = "http://localhost:8000/callback"

# Step 1: Redirect user to authorization URL
def get_authorization_url(state: str):
    params = {
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "response_type": "code",
        "scope": "read:user write:repos",
        "state": state
    }
    return f"https://github.com/login/oauth/authorize?{urlencode(params)}"

# Step 2: Exchange authorization code for tokens
def exchange_code_for_token(code: str) -> dict:
    response = httpx.post(
        "https://github.com/login/oauth/access_token",
        data={
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "code": code,
            "redirect_uri": REDIRECT_URI
        },
        headers={"Accept": "application/json"}
    )
    return response.json()

# Access token response
# {
#     "access_token": "...",
#     "token_type": "bearer",
#     "scope": "read:user write:repos",
#     "expires_in": 3600
# }

OAuth2 Client Implementation

# oauth_client.py
import httpx
from typing import Optional
from datetime import datetime, timedelta

class OAuth2Client:
    def __init__(
        self,
        client_id: str,
        client_secret: str,
        authorize_url: str,
        token_url: str,
        redirect_uri: str
    ):
        self.client_id = client_id
        self.client_secret = client_secret
        self.authorize_url = authorize_url
        self.token_url = token_url
        self.redirect_uri = redirect_uri
        self.access_token: Optional[str] = None
        self.refresh_token: Optional[str] = None
        self.expires_at: Optional[datetime] = None
    
    def get_authorization_url(self, state: str, scope: str) -> str:
        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "response_type": "code",
            "scope": scope,
            "state": state
        }
        return f"{self.authorize_url}?{urlencode(params)}"
    
    def fetch_token(self, code: str) -> dict:
        response = httpx.post(
            self.token_url,
            data={
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "code": code,
                "grant_type": "authorization_code",
                "redirect_uri": self.redirect_uri
            }
        )
        token_data = response.json()
        
        self.access_token = token_data["access_token"]
        if "refresh_token" in token_data:
            self.refresh_token = token_data["refresh_token"]
        
        if "expires_in" in token_data:
            self.expires_at = datetime.utcnow() + timedelta(
                seconds=token_data["expires_in"]
            )
        
        return token_data
    
    def refresh_access_token(self) -> dict:
        response = httpx.post(
            self.token_url,
            data={
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "refresh_token": self.refresh_token,
                "grant_type": "refresh_token"
            }
        )
        return self.fetch_token(response.json()["access_token"])
    
    def get(self, url: str, **kwargs) -> httpx.Response:
        if self._is_token_expired():
            self.refresh_access_token()
        
        headers = kwargs.get("headers", {})
        headers["Authorization"] = f"Bearer {self.access_token}"
        kwargs["headers"] = headers
        
        return httpx.get(url, **kwargs)
    
    def _is_token_expired(self) -> bool:
        return self.expires_at and datetime.utcnow() >= self.expires_at

OAuth2 with FastAPI

# fastapi_oauth.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import RedirectResponse

app = FastAPI()

oauth_client = OAuth2Client(
    client_id="your-client-id",
    client_secret="your-client-secret",
    authorize_url="https://github.com/login/oauth/authorize",
    token_url="https://github.com/login/oauth/access_token",
    redirect_uri="http://localhost:8000/callback"
)

@app.get("/login")
async def login():
    state = "random-state-string"
    url = oauth_client.get_authorization_url(state, "read:user")
    return RedirectResponse(url)

@app.get("/callback")
async def callback(code: str):
    tokens = oauth_client.fetch_token(code)
    return {"access_token": tokens["access_token"]}

Practice Problems

  1. Implement the Implicit grant flow for client-side applications
  2. Add PKCE (Proof Key for Code Exchange) to authorization flow
  3. Create a custom OAuth2 provider
  4. Implement token storage with encryption
  5. Add multi-provider support (Google, GitHub, Facebook)

Advertisement

Advertisement

Need More Practice?

Get personalized Python help from ChatWhole's AI-powered platform.

Get Expert Help →