"""FastAPI dependencies for form-based login/registration and bearer-token auth."""

from fastapi import Depends, Form, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jwt.exceptions import InvalidTokenError

from app.db.demo import users_db
from app.schemas.user import UserSchema
from app.utils.bcrypt_utils import hash_password, validate_password
from app.utils.jwt_utlis import decode_jwt

http_bearer = HTTPBearer()

# Challenge header attached to 401 responses produced by the bearer-token
# dependencies, so clients are told which authentication scheme to use.
_BEARER_CHALLENGE = {"WWW-Authenticate": "Bearer"}


def validate_auth_user(
    username: str = Form(),
    password: str = Form(),
) -> UserSchema:
    """Check submitted form credentials against ``users_db``.

    Args:
        username: Value of the ``username`` form field.
        password: Value of the ``password`` form field (plain text).

    Returns:
        The entry stored in ``users_db`` under ``username``.

    Raises:
        HTTPException: 401 when no entry exists for ``username`` or when
            ``validate_password`` rejects the password. Both cases use the
            same message so the response body does not reveal which one
            failed.
    """
    user = users_db.get(username)
    if not user:
        # NOTE(review): this path returns without calling validate_password,
        # so response time may differ between unknown and known usernames.
        # Confirm whether that matters for this deployment.
        raise HTTPException(
            status_code=401,
            detail="Invalid username or password",
        )

    if not validate_password(password=password, hashed_password=user.password):
        raise HTTPException(
            status_code=401,
            detail="Invalid username or password",
        )

    return user


def validate_register_user(
    username: str = Form(),
    password: str = Form(),
    confirm_password: str = Form(),
) -> UserSchema:
    """Validate a registration form and build the new user object.

    The returned object is not written to ``users_db`` here; storing it is
    left to the caller.

    Args:
        username: Requested user name.
        password: Chosen password (plain text).
        confirm_password: Repeated password; must equal ``password``.

    Returns:
        A ``UserSchema`` whose ``password`` field is the result of
        ``hash_password(password)``.

    Raises:
        HTTPException: 409 when ``users_db`` already has an entry for
            ``username``; 400 when the two passwords differ.
    """
    if users_db.get(username):
        raise HTTPException(
            status_code=409,
            detail="User with this name already exists",
        )

    if password != confirm_password:
        raise HTTPException(
            status_code=400,
            detail="Passwords don't match",
        )

    return UserSchema(
        username=username,
        password=hash_password(password),
    )


def get_current_token_payload(
    credentials: HTTPAuthorizationCredentials = Depends(http_bearer),
) -> dict:
    """Decode the bearer token taken from the request's credentials.

    Args:
        credentials: Credentials object supplied by the ``http_bearer``
            dependency; its ``credentials`` attribute is the raw token.

    Returns:
        Whatever ``decode_jwt`` returns for the token (consumed as a ``dict``
        by ``get_current_auth_user``).

    Raises:
        HTTPException: 401 when ``decode_jwt`` raises ``InvalidTokenError``.
    """
    token = credentials.credentials
    try:
        payload = decode_jwt(token=token)
    except InvalidTokenError as exc:
        # Chain the original error so the decoding failure stays visible in
        # tracebacks and logs.
        raise HTTPException(
            status_code=401,
            detail="Invalid token error",
            headers=_BEARER_CHALLENGE,
        ) from exc
    return payload


def get_current_auth_user(
    payload: dict = Depends(get_current_token_payload),
) -> UserSchema:
    """Resolve the user named by the token payload's ``"username"`` entry.

    Args:
        payload: Decoded token payload.

    Returns:
        The entry stored in ``users_db`` under the payload's username.

    Raises:
        HTTPException: 401 when the payload has no ``"username"`` entry or
            ``users_db`` has no entry for it.
    """
    username: str | None = payload.get("username")
    # A payload without a username cannot match any user; skip the lookup.
    user = users_db.get(username) if username is not None else None
    if not user:
        raise HTTPException(
            status_code=401,
            detail="Token invalid",  # the real reason: user not found
            headers=_BEARER_CHALLENGE,
        )
    return user