| from fastapi import APIRouter, HTTPException, status, Depends |
| from pydantic import BaseModel, EmailStr |
| from sqlmodel import Session, select |
| from passlib.context import CryptContext |
| from jose import jwt |
| from datetime import datetime, timedelta |
| from ..models.user import User |
| from ..database import get_session |
| from ..config import settings |
|
|
| router = APIRouter(prefix="/api/auth", tags=["Authentication"]) |
|
|
| |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") |
|
|
|
|
| class SignupRequest(BaseModel): |
| email: EmailStr |
| password: str |
|
|
|
|
| class SigninRequest(BaseModel): |
| email: EmailStr |
| password: str |
|
|
|
|
| class AuthResponse(BaseModel): |
| access_token: str |
| token_type: str = "bearer" |
| user: dict |
|
|
|
|
| def hash_password(password: str) -> str: |
| """Hash a password using bcrypt.""" |
| return pwd_context.hash(password) |
|
|
|
|
| def verify_password(plain_password: str, hashed_password: str) -> bool: |
| """Verify a password against its hash.""" |
| return pwd_context.verify(plain_password, hashed_password) |
|
|
|
|
| def create_access_token(user_id: int, email: str) -> str: |
| """Create a JWT access token.""" |
| expire = datetime.utcnow() + timedelta(hours=settings.JWT_EXPIRATION_HOURS) |
| to_encode = { |
| "sub": str(user_id), |
| "email": email, |
| "exp": expire |
| } |
| encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) |
| return encoded_jwt |
|
|
|
|
| @router.post("/signup", response_model=AuthResponse) |
| async def signup( |
| request: SignupRequest, |
| session: Session = Depends(get_session) |
| ): |
| """ |
| Register a new user. |
| |
| - Validates email uniqueness |
| - Hashes password |
| - Creates user in database |
| - Returns JWT token |
| """ |
| |
| existing_user = session.exec( |
| select(User).where(User.email == request.email) |
| ).first() |
|
|
| if existing_user: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail="Email already registered" |
| ) |
|
|
| |
| hashed_password = hash_password(request.password) |
| new_user = User( |
| email=request.email, |
| password_hash=hashed_password |
| ) |
|
|
| session.add(new_user) |
| session.commit() |
| session.refresh(new_user) |
|
|
| |
| access_token = create_access_token(new_user.id, new_user.email) |
|
|
| return AuthResponse( |
| access_token=access_token, |
| user={ |
| "id": new_user.id, |
| "email": new_user.email |
| } |
| ) |
|
|
|
|
| @router.post("/signin", response_model=AuthResponse) |
| async def signin( |
| request: SigninRequest, |
| session: Session = Depends(get_session) |
| ): |
| """ |
| Authenticate a user. |
| |
| - Validates credentials |
| - Returns JWT token |
| """ |
| |
| user = session.exec( |
| select(User).where(User.email == request.email) |
| ).first() |
|
|
| if not user: |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Invalid email or password" |
| ) |
|
|
| |
| if not verify_password(request.password, user.password_hash): |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail="Invalid email or password" |
| ) |
|
|
| |
| access_token = create_access_token(user.id, user.email) |
|
|
| return AuthResponse( |
| access_token=access_token, |
| user={ |
| "id": user.id, |
| "email": user.email |
| } |
| ) |
|
|