68 lines
2.1 KiB
Python
68 lines
2.1 KiB
Python
|
|
from datetime import datetime, timedelta, timezone
|
||
|
|
from uuid import uuid4
|
||
|
|
|
||
|
|
from argon2 import PasswordHasher
|
||
|
|
from argon2.exceptions import VerifyMismatchError
|
||
|
|
from fastapi import Depends, HTTPException, status
|
||
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||
|
|
from jose import JWTError, jwt
|
||
|
|
|
||
|
|
from app.config import settings
|
||
|
|
|
||
|
|
# Shared Argon2 password hasher, constructed with the library's default parameters.
ph = PasswordHasher()
|
||
|
|
# Bearer security scheme; get_current_user_id receives its credentials via Depends.
bearer_scheme = HTTPBearer()
|
||
|
|
|
||
|
|
|
||
|
|
def hash_password(password: str) -> str:
    """Hash a plaintext password with the module-level Argon2 hasher.

    Args:
        password: The plaintext password to hash.

    Returns:
        The encoded hash string produced by ``ph.hash``.
    """
    encoded = ph.hash(password)
    return encoded
|
||
|
|
|
||
|
|
|
||
|
|
def verify_password(password: str, hashed: str) -> bool:
    """Check a plaintext password against a previously stored hash.

    Args:
        password: The plaintext password supplied by the caller.
        hashed: The stored hash to verify against.

    Returns:
        The result of ``ph.verify`` when verification succeeds, or ``False``
        when it raises ``VerifyMismatchError``. Any other exception raised by
        ``ph.verify`` propagates to the caller.
    """
    try:
        matched = ph.verify(hashed, password)
    except VerifyMismatchError:
        # A mismatch is an expected outcome, so report it instead of raising.
        return False
    else:
        return matched
|
||
|
|
|
||
|
|
|
||
|
|
def create_access_token(user_id: str) -> str:
    """Build a signed JWT access token for ``user_id``.

    The token carries ``sub``, ``iat``, ``exp`` and ``type`` claims, with
    ``type`` set to ``"access"`` and ``exp`` set
    ``settings.JWT_ACCESS_EXPIRE_MINUTES`` minutes after the issue time.

    Args:
        user_id: Value stored in the ``sub`` claim.

    Returns:
        The token encoded with ``settings.JWT_SECRET`` and
        ``settings.JWT_ALGORITHM``.
    """
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(minutes=settings.JWT_ACCESS_EXPIRE_MINUTES)
    # Claim order is kept as sub, iat, exp, type.
    claims = {"sub": user_id, "iat": issued_at, "exp": expires_at, "type": "access"}
    return jwt.encode(
        claims,
        settings.JWT_SECRET,
        algorithm=settings.JWT_ALGORITHM,
    )
|
||
|
|
|
||
|
|
|
||
|
|
def create_refresh_token(user_id: str) -> str:
    """Build a signed JWT refresh token for ``user_id``.

    The token carries ``sub``, ``iat``, ``exp``, ``type`` and ``jti`` claims,
    with ``type`` set to ``"refresh"``, ``exp`` set
    ``settings.JWT_REFRESH_EXPIRE_DAYS`` days after the issue time, and
    ``jti`` set to a freshly generated UUID4 string.

    Args:
        user_id: Value stored in the ``sub`` claim.

    Returns:
        The token encoded with ``settings.JWT_SECRET`` and
        ``settings.JWT_ALGORITHM``.
    """
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=settings.JWT_REFRESH_EXPIRE_DAYS)
    token_id = str(uuid4())
    # Claim order is kept as sub, iat, exp, type, jti.
    claims = {
        "sub": user_id,
        "iat": issued_at,
        "exp": expires_at,
        "type": "refresh",
        "jti": token_id,
    }
    return jwt.encode(
        claims,
        settings.JWT_SECRET,
        algorithm=settings.JWT_ALGORITHM,
    )
|
||
|
|
|
||
|
|
|
||
|
|
def decode_token(token: str, expected_type: str = "access") -> dict:
    """Decode a JWT and check that its ``type`` claim matches.

    Args:
        token: The encoded JWT string.
        expected_type: Value the payload's ``type`` claim must equal.
            Defaults to ``"access"``.

    Returns:
        The decoded payload.

    Raises:
        HTTPException: 401 with detail ``"Invalid token"`` when ``jwt.decode``
            raises ``JWTError``; 401 with detail ``"Invalid token type"`` when
            the ``type`` claim differs from ``expected_type``.
    """
    try:
        payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
    except JWTError as exc:
        # Chain explicitly so the underlying decode failure is kept as
        # __cause__ rather than shown as an error raised during handling.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        ) from exc
    if payload.get("type") != expected_type:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token type",
        )
    return payload
|
||
|
|
|
||
|
|
|
||
|
|
async def get_current_user_id(
    credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme),
) -> str:
    """Return the ``sub`` claim of the access token in ``credentials``.

    Args:
        credentials: Authorization credentials supplied through the
            ``bearer_scheme`` dependency.

    Returns:
        The ``sub`` claim of the decoded access token.

    Raises:
        HTTPException: 401 with detail ``"Invalid token"`` when the ``sub``
            claim is missing or falsy. Errors raised by ``decode_token``
            propagate unchanged.
    """
    claims = decode_token(credentials.credentials, expected_type="access")
    subject = claims.get("sub")
    if subject:
        return subject
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
|