Files
2025-12-06 18:54:43 +03:00

86 lines
2.2 KiB
Python

from app.schemas.user import UserSchema
from app.db.demo import users_db
from app.utils.bcrypt_utils import validate_password, hash_password
from app.utils.jwt_utlis import decode_jwt
from fastapi import Form, HTTPException, Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jwt.exceptions import InvalidTokenError
# HTTP Bearer security scheme instance; get_current_token_payload depends on
# it to receive the credentials sent with the request.
http_bearer = HTTPBearer()
def validate_auth_user(
    username: str = Form(),
    password: str = Form(),
) -> UserSchema:
    """Authenticate a user from submitted form credentials.

    Looks ``username`` up in ``users_db`` and verifies ``password`` against
    the stored hash via ``validate_password``.

    Args:
        username: Value of the ``username`` form field.
        password: Value of the ``password`` form field (plain text).

    Returns:
        The user record found in ``users_db`` for ``username``.

    Raises:
        HTTPException: 401 with an identical detail message for both an
            unknown username and a wrong password, so the response body
            does not reveal which of the two checks failed.
    """
    user = users_db.get(username)
    # Short-circuit keeps the original order: the password hash is only
    # checked when a user record exists.
    # NOTE(review): unknown usernames skip the hash check entirely, so the
    # two failure cases may differ in response time - confirm whether
    # that matters for this deployment.
    if not user or not validate_password(
        password=password,
        hashed_password=user.password,
    ):
        raise HTTPException(
            status_code=401,
            detail="Invalid username or password",
        )
    return user
def validate_register_user(
    username: str = Form(),
    password: str = Form(),
    confirm_password: str = Form(),
) -> UserSchema:
    """Validate registration form data and build the new user object.

    This function only validates and constructs the user; it does not
    write anything to ``users_db``.

    Args:
        username: Requested username from the form.
        password: Chosen password (plain text) from the form.
        confirm_password: Repeated password from the form.

    Returns:
        A ``UserSchema`` holding ``username`` and the hashed password
        produced by ``hash_password``.

    Raises:
        HTTPException: 409 if ``users_db`` already has an entry for
            ``username``; 400 if the two password fields differ.
    """
    # The uniqueness check runs first, so a taken username is reported even
    # when the passwords also do not match.
    if users_db.get(username):
        raise HTTPException(
            status_code=409,
            detail="User with this name already exists",
        )

    if password != confirm_password:
        raise HTTPException(
            status_code=400,
            detail="Passwords don't match",
        )

    # Only the hash is placed on the returned object, never the raw password.
    return UserSchema(
        username=username,
        password=hash_password(password),
    )
def get_current_token_payload(
    credentials: HTTPAuthorizationCredentials = Depends(http_bearer),
) -> dict:
    """Decode the bearer token attached to the request.

    Args:
        credentials: Credentials supplied by the ``http_bearer`` dependency;
            the raw token string is read from ``credentials.credentials``.

    Returns:
        The payload returned by ``decode_jwt`` (consumed as a ``dict`` by
        ``get_current_auth_user``).

    Raises:
        HTTPException: 401 if ``decode_jwt`` raises ``InvalidTokenError``.
    """
    token = credentials.credentials
    try:
        # Keep the try body minimal: only the decode call is expected to
        # raise InvalidTokenError.
        payload = decode_jwt(token=token)
    except InvalidTokenError as exc:
        # Chain explicitly so the underlying JWT error stays visible in
        # tracebacks/logs while the client gets a generic message.
        raise HTTPException(
            status_code=401,
            detail="Invalid token error",
        ) from exc
    return payload
def get_current_auth_user(
    payload: dict = Depends(get_current_token_payload),
) -> UserSchema:
    """Resolve the user referenced by the decoded token payload.

    Args:
        payload: Decoded token payload provided by
            ``get_current_token_payload``; its ``"username"`` key is used
            to look the user up.

    Returns:
        The user record found in ``users_db`` for the payload's username.

    Raises:
        HTTPException: 401 if no user is found for the payload's username
            (including when the ``"username"`` key is absent).
    """
    # .get() yields None when the key is missing; the lookup below then
    # finds nothing and falls through to the 401.
    username: str | None = payload.get("username")
    user = users_db.get(username)
    if not user:
        # The actual cause is "user not found"; the client-facing detail is
        # deliberately kept generic.
        raise HTTPException(
            status_code=401,
            detail="Token invalid",
        )
    return user