File size: 3,523 Bytes
from fastapi import APIRouter, HTTPException, status, Depends
from pydantic import BaseModel, EmailStr
from sqlmodel import Session, select
from passlib.context import CryptContext
from jose import jwt
from datetime import datetime, timedelta
from ..models.user import User
from ..database import get_session
from ..config import settings
# Every route registered on this router is served under the /api/auth prefix.
router = APIRouter(
    prefix="/api/auth",
    tags=["Authentication"],
)

# Shared passlib context used for password hashing; bcrypt is the only scheme.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
class SignupRequest(BaseModel):
    """Request body for the signup endpoint."""

    # Address for the new account; typed as EmailStr so pydantic validates it.
    email: EmailStr
    # Password as submitted by the client; it is hashed before being stored.
    password: str
class SigninRequest(BaseModel):
    """Request body for the signin endpoint."""

    # Address used to look up the account; typed as EmailStr for validation.
    email: EmailStr
    # Password as submitted by the client; checked against the stored hash.
    password: str
class AuthResponse(BaseModel):
    """Response body returned by the signup and signin endpoints."""

    # Encoded JWT issued for the authenticated user.
    access_token: str
    # Token scheme reported to the client; defaults to "bearer".
    token_type: str = "bearer"
    # User details; the endpoints in this module fill in "id" and "email".
    user: dict
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's passlib context.

    Args:
        password: The plain-text password to hash.

    Returns:
        The hash string generated by ``pwd_context`` (configured with bcrypt).
    """
    digest = pwd_context.hash(password)
    return digest
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a previously stored hash.

    Args:
        plain_password: The password supplied by the caller.
        hashed_password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def create_access_token(user_id: int, email: str) -> str:
    """Create a signed JWT access token for a user.

    Args:
        user_id: Identifier stored, converted to a string, in the ``sub`` claim.
        email: Address stored in the ``email`` claim.

    Returns:
        The encoded JWT, signed with ``settings.JWT_SECRET`` using
        ``settings.JWT_ALGORITHM``. Its ``exp`` claim lies
        ``settings.JWT_EXPIRATION_HOURS`` hours after the time of the call.
    """
    # Imported locally because the module-level import only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    # Use a timezone-aware UTC timestamp: ``datetime.utcnow()`` is deprecated
    # since Python 3.12 and returns a naive value that is easy to misread as
    # local time. The resulting expiry instant is the same.
    expires_at = datetime.now(timezone.utc) + timedelta(
        hours=settings.JWT_EXPIRATION_HOURS
    )
    claims = {
        "sub": str(user_id),
        "email": email,
        "exp": expires_at,
    }
    return jwt.encode(
        claims,
        settings.JWT_SECRET,
        algorithm=settings.JWT_ALGORITHM,
    )
@router.post("/signup", response_model=AuthResponse)
async def signup(
    request: SignupRequest,
    session: Session = Depends(get_session)
):
    """
    Register a new user and return an access token for it.

    Steps:
    - Reject the request if the email is already present in the database
    - Hash the submitted password
    - Persist the new user
    - Issue a JWT for the new user

    Raises:
        HTTPException: 400 when a user with the same email already exists.
    """
    # Refuse duplicate registrations up front.
    email_lookup = select(User).where(User.email == request.email)
    if session.exec(email_lookup).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    # Only the hash of the password is stored on the user row.
    account = User(
        email=request.email,
        password_hash=hash_password(request.password),
    )
    session.add(account)
    session.commit()
    # Reload the row so database-populated fields such as the id are available.
    session.refresh(account)

    token = create_access_token(account.id, account.email)
    return AuthResponse(
        access_token=token,
        user={"id": account.id, "email": account.email},
    )
@router.post("/signin", response_model=AuthResponse)
async def signin(
    request: SigninRequest,
    session: Session = Depends(get_session)
):
    """
    Authenticate a user and return an access token.

    Steps:
    - Look up the user by email
    - Verify the submitted password against the stored hash
    - Issue a JWT for the user

    Raises:
        HTTPException: 401 with the same message whether the email is unknown
            or the password is wrong, so the response body does not reveal
            which of the two failed.
    """
    # Find user by email
    account = session.exec(
        select(User).where(User.email == request.email)
    ).first()

    if account is None:
        # Spend roughly the time of a real verification even though there is
        # no hash to check; otherwise the faster response for unknown emails
        # lets a caller enumerate registered addresses by timing.
        pwd_context.dummy_verify()
        credentials_ok = False
    else:
        credentials_ok = verify_password(request.password, account.password_hash)

    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
        )

    # Create access token
    token = create_access_token(account.id, account.email)
    return AuthResponse(
        access_token=token,
        user={"id": account.id, "email": account.email},
    )
|