from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from ..models import User, engine
from sqlmodel import Session, select
import jwt
from ..settings import SECRET_KEY
# Bearer-token extractor shared by the dependencies below. With HTTPBearer's
# default auto_error it rejects requests that carry no "Authorization: Bearer"
# header before get_current_user ever runs, so the token it hands over is
# present but not yet verified.
security = HTTPBearer()
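def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
    """Resolve the request's bearer token to its ``User`` row.

    The token must be an HS256 JWT signed with ``SECRET_KEY`` whose
    ``username`` claim names an existing user and whose ``pwd_v`` claim
    equals that user's current ``password_version`` (so changing the
    password invalidates tokens issued before the change).

    Raises:
        HTTPException: 401 when the token is malformed, carries a bad
            signature, is expired, lacks the expected claims, names an
            unknown user, or has a stale ``pwd_v``. Every case uses the
            same generic detail so the response does not reveal which
            check failed.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Credenciales invalidas",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # The algorithm list is pinned so the token cannot choose its own.
        # NOTE(review): PyJWT only checks "exp"/"nbf" when those claims are
        # present; if every token is expected to expire, consider
        # options={"require": ["exp"]} -- confirm against the token issuer.
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
    except jwt.PyJWTError:
        # An invalid/expired token is a client authentication failure, not a
        # server error; without this it propagated as a 500.
        raise unauthorized from None
    username = payload.get("username")
    pwd_version = payload.get("pwd_v")
    if username is None or pwd_version is None:
        # A validly signed token missing the claims we depend on is rejected
        # instead of raising KeyError (a 500) further down.
        raise unauthorized
    with Session(engine) as session:
        query = select(User).where(User.username == username).limit(1)
        user: User = session.exec(query).first()
        # Unknown user and stale password version are deliberately
        # indistinguishable to the client.
        if user is None or user.password_version != pwd_version:
            raise unauthorized
        return user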
def get_staff_user(user: User = Depends(get_current_user)) -> User:
    """Narrow ``get_current_user`` to staff accounts.

    Returns the already-authenticated user unchanged when ``user.is_staff``
    is truthy.

    Raises:
        HTTPException: 403 for an authenticated user that is not staff.
    """
    if user.is_staff:
        return user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="This user needs to be an Staff to access this resource",
    )