86 lines
2.2 KiB
Python
86 lines
2.2 KiB
Python
from app.schemas.user import UserSchema
|
|
|
|
from app.db.demo import users_db
|
|
|
|
from app.utils.bcrypt_utils import validate_password, hash_password
|
|
from app.utils.jwt_utlis import decode_jwt
|
|
|
|
|
|
from fastapi import Form, HTTPException, Depends
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
|
|
from jwt.exceptions import InvalidTokenError
|
|
|
|
http_bearer = HTTPBearer()
|
|
|
|
def validate_auth_user(
|
|
username: str = Form(),
|
|
password: str = Form()
|
|
) -> UserSchema | HTTPException:
|
|
|
|
if not (user := users_db.get(username)):
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Invalid username or password"
|
|
)
|
|
|
|
is_password_valid = validate_password(password=password, hashed_password=user.password)
|
|
|
|
if not is_password_valid:
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Invalid username or password")
|
|
return user
|
|
|
|
def validate_register_user(
|
|
username: str = Form(),
|
|
password: str = Form(),
|
|
confirm_password: str = Form()
|
|
) -> UserSchema | HTTPException:
|
|
|
|
if users_db.get(username):
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail="User with this name already exists"
|
|
)
|
|
|
|
if password != confirm_password:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Passwords don't match"
|
|
)
|
|
|
|
user = UserSchema(
|
|
username=username,
|
|
password=hash_password(password)
|
|
)
|
|
|
|
return user
|
|
|
|
def get_current_token_payload(
|
|
credentials: HTTPAuthorizationCredentials = Depends(http_bearer)
|
|
) -> UserSchema | HTTPException:
|
|
|
|
token = credentials.credentials
|
|
try:
|
|
payload = decode_jwt(
|
|
token=token
|
|
)
|
|
except InvalidTokenError:
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Invalid token error"
|
|
)
|
|
return payload
|
|
|
|
def get_current_auth_user(
|
|
payload: dict = Depends(get_current_token_payload)
|
|
) -> UserSchema | HTTPException:
|
|
|
|
username: str = payload.get("username")
|
|
if not (user := users_db.get(username)):
|
|
raise HTTPException(
|
|
status_code=401,
|
|
detail="Token invalid" # for real user not found
|
|
)
|
|
return user |