| from fastapi import Depends, HTTPException, status |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| from jose import jwt, JWTError |
| from typing import Dict |
| from ..config import settings |
|
|
|
|
| |
| security = HTTPBearer() |
|
|
|
|
| async def get_current_user( |
| credentials: HTTPAuthorizationCredentials = Depends(security) |
| ) -> Dict[str, any]: |
| """ |
| Verify JWT token and extract user identity. |
| |
| This dependency extracts and validates the JWT token from the Authorization header. |
| User identity is extracted from the token payload, never from client input. |
| |
| Args: |
| credentials: HTTP Bearer credentials containing JWT token |
| |
| Returns: |
| Dict containing user_id and email from token payload |
| |
| Raises: |
| HTTPException: 401 if token is missing, invalid, or expired |
| """ |
| try: |
| |
| payload = jwt.decode( |
| credentials.credentials, |
| settings.JWT_SECRET, |
| algorithms=[settings.JWT_ALGORITHM] |
| ) |
|
|
| |
| user_id = payload.get("sub") |
| email = payload.get("email") |
|
|
| if user_id is None: |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Invalid token: missing user ID" |
| ) |
|
|
| return { |
| "user_id": int(user_id), |
| "email": email |
| } |
|
|
| except JWTError as e: |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail=f"Invalid token: {str(e)}" |
| ) |
| except ValueError: |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Invalid token: malformed user ID" |
| ) |
|
|