"""JWT cookie authentication helpers and Keycloak OAuth client registration."""

from datetime import datetime, timedelta, timezone
from typing import Optional

from authlib.integrations.starlette_client import OAuth
from fastapi import Cookie, Depends, HTTPException, status
from jose import JWTError, jwt
from sqlalchemy.orm import Session

from app import models
from app.database import get_db, settings

# OAuth client registry; the "keycloak" client reads its endpoints from the
# realm's OpenID Connect discovery document.
oauth = OAuth()
oauth.register(
    name="keycloak",
    client_id=settings.keycloak_client_id,
    client_secret=settings.keycloak_client_secret,
    server_metadata_url=(
        f"{settings.keycloak_url}/realms/{settings.keycloak_realm}"
        "/.well-known/openid-configuration"
    ),
    client_kwargs={"scope": "openid email profile"},
)


def create_access_token(user_id: int) -> str:
    """Return a signed JWT whose ``sub`` claim is *user_id* as a string.

    The token expires ``settings.jwt_expire_minutes`` minutes from now (UTC)
    and is signed with ``settings.jwt_secret_key`` using
    ``settings.jwt_algorithm``.
    """
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=settings.jwt_expire_minutes
    )
    claims = {"sub": str(user_id), "exp": expire}
    return jwt.encode(
        claims,
        settings.jwt_secret_key,
        algorithm=settings.jwt_algorithm,
    )


def get_current_user(
    access_token: Optional[str] = Cookie(default=None),
    db: Session = Depends(get_db),
) -> models.User:
    """Resolve the active user identified by the ``access_token`` cookie.

    Args:
        access_token: JWT taken from the ``access_token`` cookie, if present.
        db: Database session used to load the user.

    Returns:
        The ``models.User`` whose primary key matches the token's ``sub``
        claim.

    Raises:
        HTTPException: 401 "Not authenticated" when the cookie is missing,
            the token fails to decode, the ``sub`` claim is absent or not an
            integer, or the user does not exist or is not active.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Not authenticated",
    )
    if not access_token:
        raise credentials_exception

    try:
        payload = jwt.decode(
            access_token,
            settings.jwt_secret_key,
            algorithms=[settings.jwt_algorithm],
        )
    except JWTError:
        raise credentials_exception from None

    subject = payload.get("sub")
    if subject is None:
        raise credentials_exception

    # A correctly signed token can still carry a non-numeric subject; treat
    # that as an authentication failure rather than letting the conversion
    # error escape as a server error.
    try:
        user_pk = int(subject)
    except (TypeError, ValueError):
        raise credentials_exception from None

    user = db.get(models.User, user_pk)
    if user is None or not user.is_active:
        raise credentials_exception
    return user


def get_optional_user(
    access_token: Optional[str] = Cookie(default=None),
    db: Session = Depends(get_db),
) -> Optional[models.User]:
    """Return the authenticated user, or ``None`` when authentication fails.

    Behaves like ``get_current_user`` but converts a missing cookie or any
    ``HTTPException`` it raises into ``None`` instead of an error response.
    """
    if not access_token:
        return None
    try:
        return get_current_user(access_token=access_token, db=db)
    except HTTPException:
        return None