Demo
This commit is contained in:
0
app/routers/__init__.py
Normal file
0
app/routers/__init__.py
Normal file
95
app/routers/demo.py
Normal file
95
app/routers/demo.py
Normal file
@@ -0,0 +1,95 @@
|
||||
from app.schemas.user import UserSchema
|
||||
from app.schemas.token import TokenInfo
|
||||
from app.utils.bcrypt_utils import *
|
||||
from app.utils.jwt_utlis import *
|
||||
|
||||
from jwt.exceptions import InvalidTokenError
|
||||
from fastapi import APIRouter, Depends, Form, HTTPException
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
|
||||
http_bearer = HTTPBearer()
|
||||
|
||||
router = APIRouter(prefix="/jwt", tags=["JWT"])
|
||||
|
||||
john = UserSchema(
|
||||
username="john",
|
||||
password=hash_password("qwerty"),
|
||||
email="john@example.com"
|
||||
)
|
||||
|
||||
sam = UserSchema(
|
||||
username="sam",
|
||||
password=hash_password("secret"),
|
||||
)
|
||||
|
||||
users_db: dict[str, UserSchema] = {
|
||||
john.username: john,
|
||||
sam.username: sam
|
||||
}
|
||||
|
||||
def validate_auth_user(
|
||||
username: str = Form(),
|
||||
password: str = Form()
|
||||
):
|
||||
unauthed_exc = HTTPException(
|
||||
status_code=401,
|
||||
detail="Invalid username or password"
|
||||
)
|
||||
if not (user := users_db.get(username)):
|
||||
raise unauthed_exc
|
||||
|
||||
if validate_password(
|
||||
password=password,
|
||||
hashed_password=user.password
|
||||
):
|
||||
return user
|
||||
raise unauthed_exc
|
||||
|
||||
def get_current_token_payload(
|
||||
credentials: HTTPAuthorizationCredentials = Depends(http_bearer)
|
||||
) -> UserSchema:
|
||||
token = credentials.credentials
|
||||
try:
|
||||
payload = decode_jwt(
|
||||
token=token
|
||||
)
|
||||
except InvalidTokenError:
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail="Invalid token error"
|
||||
)
|
||||
return payload
|
||||
|
||||
def get_current_auth_user(
|
||||
payload: dict = Depends(get_current_token_payload)
|
||||
) -> UserSchema:
|
||||
username: str = payload.get("username")
|
||||
if not (user := users_db.get(username)):
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail="Token invalid" # fr user not found
|
||||
)
|
||||
return user
|
||||
|
||||
@router.post("/login/", response_model=TokenInfo)
|
||||
def auth_user_issue_jwt(
|
||||
user: UserSchema = Depends(validate_auth_user)
|
||||
):
|
||||
jwt_payload = {
|
||||
"username": user.username,
|
||||
"email": user.email
|
||||
}
|
||||
token = encode_jwt(jwt_payload)
|
||||
return TokenInfo(
|
||||
access_token=token,
|
||||
token_type="Bearer"
|
||||
)
|
||||
|
||||
@router.get("/users/me/")
|
||||
def auth_user_check_self_info(
|
||||
user: UserSchema = Depends(get_current_auth_user)
|
||||
):
|
||||
return {
|
||||
"username": user.username,
|
||||
"email": user.email
|
||||
}
|
||||
Reference in New Issue
Block a user