95 lines
2.4 KiB
Python
95 lines
2.4 KiB
Python
from app.schemas.user import UserSchema
|
|
from app.schemas.token import TokenInfo
|
|
from app.utils.bcrypt_utils import *
|
|
from app.utils.jwt_utlis import *
|
|
|
|
from jwt.exceptions import InvalidTokenError
|
|
from fastapi import APIRouter, Depends, Form, HTTPException
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
|
|
# Bearer-token security scheme; used below as a dependency to read the
# credentials supplied with a request.
http_bearer = HTTPBearer()

# Every route declared in this module is mounted under the "/jwt" prefix.
router = APIRouter(prefix="/jwt", tags=["JWT"])

# In-memory demo accounts. Passwords are passed through hash_password() at
# import time, so only the returned value is kept on the schema objects.
# NOTE(review): the plaintext passwords are hard-coded in source; this looks
# like demo/tutorial data -- confirm it is never used as a real user store.
john = UserSchema(
    username="john",
    password=hash_password("qwerty"),
    email="john@example.com",
)
sam = UserSchema(
    username="sam",
    password=hash_password("secret"),
)

# Lookup table keyed by username.
users_db: dict[str, UserSchema] = {
    account.username: account for account in (john, sam)
}
|
|
|
|
def validate_auth_user(username: str = Form(), password: str = Form()) -> UserSchema:
    """Authenticate form-submitted credentials against ``users_db``.

    Args:
        username: Form field used as the key into ``users_db``.
        password: Form field checked with ``validate_password`` against the
            password stored on the matching user.

    Returns:
        The matching ``UserSchema`` entry when both checks pass.

    Raises:
        HTTPException: 401 when the username is unknown or the password does
            not validate. The same message is used for both cases so the
            response does not reveal which of the two was wrong.
    """
    # The failure is always raised, never returned, so the return annotation
    # is just UserSchema.
    unauthorized = HTTPException(
        status_code=401,
        detail="Invalid username or password",
    )

    if not (user := users_db.get(username)):
        raise unauthorized

    if not validate_password(password=password, hashed_password=user.password):
        raise unauthorized

    return user
|
|
|
|
def get_current_token_payload(
    credentials: HTTPAuthorizationCredentials = Depends(http_bearer),
) -> dict:
    """Decode the bearer token from the request and return its payload.

    Args:
        credentials: Credentials resolved by the ``http_bearer`` dependency;
            the raw token is read from ``credentials.credentials``.

    Returns:
        Whatever ``decode_jwt`` returns for the token. Annotated as ``dict``
        because the consumer in this module (``get_current_auth_user``)
        declares and uses it as one -- presumably the JWT claims mapping;
        confirm against ``decode_jwt``.

    Raises:
        HTTPException: 401 when ``decode_jwt`` raises ``InvalidTokenError``.
    """
    token = credentials.credentials
    try:
        payload = decode_jwt(token=token)
    except InvalidTokenError as exc:
        # Chain the original error so the decoding failure stays visible in
        # server-side tracebacks; the client still only sees the 401.
        raise HTTPException(
            status_code=401,
            detail="Invalid token error",
        ) from exc
    return payload
|
|
|
|
def get_current_auth_user(
    payload: dict = Depends(get_current_token_payload),
) -> UserSchema:
    """Resolve the user named by the token payload's ``"username"`` entry.

    Args:
        payload: Decoded token payload supplied by
            ``get_current_token_payload``.

    Returns:
        The ``users_db`` entry stored under the payload's username.

    Raises:
        HTTPException: 401 when the lookup yields no (truthy) user, which
            includes a payload that has no ``"username"`` key.
    """
    # A missing "username" key gives None, which is looked up like any other
    # key and falls through to the 401 below.
    account = users_db.get(payload.get("username"))
    if account:
        return account

    # Raised when no user matches the username carried by the token.
    raise HTTPException(status_code=401, detail="Token invalid")
|
|
|
|
@router.post("/login/", response_model=TokenInfo)
def auth_user_issue_jwt(
    user: UserSchema = Depends(validate_auth_user),
):
    """Issue a bearer token for the user authenticated by ``validate_auth_user``.

    The token is produced by ``encode_jwt`` from a payload carrying the
    user's ``username`` and ``email``.

    Returns:
        A ``TokenInfo`` with the encoded token and token type ``"Bearer"``.
    """
    claims = dict(username=user.username, email=user.email)
    return TokenInfo(access_token=encode_jwt(claims), token_type="Bearer")
|
|
|
|
@router.get("/users/me/")
def auth_user_check_self_info(
    user: UserSchema = Depends(get_current_auth_user),
):
    """Return the ``username`` and ``email`` of the current token's user.

    The user is resolved by the ``get_current_auth_user`` dependency; only
    these two fields are exposed in the response body.
    """
    exposed_fields = ("username", "email")
    return {field: getattr(user, field) for field in exposed_fields}