from datetime import datetime, timedelta, timezone from uuid import uuid4 from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jose import JWTError, jwt from app.config import settings ph = PasswordHasher() bearer_scheme = HTTPBearer() def hash_password(password: str) -> str: return ph.hash(password) def verify_password(password: str, hashed: str) -> bool: try: return ph.verify(hashed, password) except VerifyMismatchError: return False def create_access_token(user_id: str) -> str: now = datetime.now(timezone.utc) payload = { "sub": user_id, "iat": now, "exp": now + timedelta(minutes=settings.JWT_ACCESS_EXPIRE_MINUTES), "type": "access", } return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) def create_refresh_token(user_id: str) -> str: now = datetime.now(timezone.utc) payload = { "sub": user_id, "iat": now, "exp": now + timedelta(days=settings.JWT_REFRESH_EXPIRE_DAYS), "type": "refresh", "jti": str(uuid4()), } return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) def decode_token(token: str, expected_type: str = "access") -> dict: try: payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]) except JWTError: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") if payload.get("type") != expected_type: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type") return payload async def get_current_user_id( credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme), ) -> str: payload = decode_token(credentials.credentials, expected_type="access") user_id = payload.get("sub") if not user_id: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") return user_id