Files
shorefront/backend/app/api/auth.py
Adrian A. Baumann 740983277f
All checks were successful
Build containers when image tags change / build-if-image-changed (backend, shorefront-backend, shorefront backend, backend/Dockerfile, git.baumann.gr/adebaumann/shorefront-backend, .backend.image) (push) Successful in 45s
Build containers when image tags change / build-if-image-changed (frontend, shorefront-frontend, shorefront frontend, frontend/Dockerfile, git.baumann.gr/adebaumann/shorefront-frontend, .frontend.image) (push) Successful in 1m31s
debug: log userinfo keys and groups claim in OIDC callback
2026-03-01 01:17:12 +01:00

75 lines
2.4 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session
from authlib.integrations.starlette_client import OAuthError
from app import models, schemas
from app.auth import create_access_token, get_current_user, oauth
from app.database import get_db, settings
# Router on which every endpoint defined in this module is registered.
router = APIRouter()
# Group name that must be present in the OIDC "groups" claim for the
# callback endpoint to accept a login.
FIREWALL_ADMINS_GROUP = "firewall admins"
@router.get("/oidc/login")
async def oidc_login(request: Request) -> RedirectResponse:
    """Begin the OIDC login flow.

    Delegates to the ``keycloak`` client registered on ``oauth`` and returns
    whatever ``authorize_redirect`` produces, using
    ``settings.keycloak_redirect_uri`` as the redirect URI.
    """
    keycloak_client = oauth.keycloak
    start_redirect = keycloak_client.authorize_redirect
    callback_uri = settings.keycloak_redirect_uri
    return await start_redirect(request, callback_uri)
def _claim_as_list(claim: object) -> list:
    """Normalize a possibly multi-valued OIDC claim to a list.

    A bare string becomes a one-element list so that a later ``in`` test is an
    exact membership test rather than a substring match. ``None`` and any
    other non-sequence value become an empty list.
    """
    if isinstance(claim, str):
        return [claim]
    if isinstance(claim, (list, tuple, set, frozenset)):
        return list(claim)
    return []


@router.get("/oidc/callback")
async def oidc_callback(request: Request, db: Session = Depends(get_db)) -> RedirectResponse:
    """Handle the identity provider's redirect back to the application.

    Obtains the token via ``oauth.keycloak.authorize_access_token``, requires
    ``FIREWALL_ADMINS_GROUP`` to be listed in the ``groups`` claim of the
    token's ``userinfo``, looks up (or creates) the ``models.User`` whose
    ``keycloak_sub`` equals the ``sub`` claim, and redirects to ``/`` with an
    ``access_token`` cookie set.

    Raises:
        HTTPException: 400 if the token exchange raises ``OAuthError`` or the
            userinfo carries no ``sub`` claim; 403 if the required group is
            not present in the ``groups`` claim.
    """
    try:
        token = await oauth.keycloak.authorize_access_token(request)
    except OAuthError as e:
        # Chain the cause so the original OAuth failure stays in tracebacks.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) from e

    userinfo = token.get("userinfo") or {}
    raw_groups = userinfo.get("groups", [])

    # Function-scope import: this module has no top-level ``logging`` import.
    import logging

    # NOTE(review): diagnostic output emitted at WARNING level; consider
    # lowering to DEBUG once the groups-claim mapping has been verified.
    logging.getLogger("shorefront.auth").warning(
        "OIDC callback — userinfo keys: %s | groups claim: %r",
        list(userinfo.keys()),
        raw_groups,
    )

    # Normalize before the membership test: a plain-string claim would make
    # ``in`` a substring match, and a null claim would raise TypeError.
    groups = _claim_as_list(raw_groups)
    if FIREWALL_ADMINS_GROUP not in groups:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not in firewall admins group")

    # A missing subject is reported as a client-visible 400 instead of
    # surfacing as an unhandled KeyError (HTTP 500).
    sub = userinfo.get("sub")
    if not sub:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="OIDC userinfo is missing the 'sub' claim",
        )
    email = userinfo.get("email", "")
    username = userinfo.get("preferred_username", sub)

    user = db.query(models.User).filter(models.User.keycloak_sub == sub).first()
    if not user:
        # First login for this subject: create a local record with no password.
        user = models.User(
            keycloak_sub=sub,
            email=email,
            username=username,
            hashed_password=None,
            is_active=True,
        )
        db.add(user)
        db.commit()
        db.refresh(user)
    # NOTE(review): ``is_active`` of an existing user is not checked here;
    # presumably enforced by ``get_current_user`` - confirm.

    access_token = create_access_token(user.id)
    response = RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
    # NOTE(review): the cookie is set without ``secure=True``; confirm whether
    # deployments are HTTPS-only and enable it if so.
    response.set_cookie(
        key="access_token",
        value=access_token,
        httponly=True,
        samesite="lax",
        max_age=3600,
    )
    return response
@router.post("/logout")
def logout(response: Response) -> dict:
    """Delete the ``access_token`` cookie and return a confirmation message."""
    response.delete_cookie(key="access_token")
    confirmation = {"message": "Logged out"}
    return confirmation
@router.get("/me", response_model=schemas.UserOut)
def me(
    current_user: models.User = Depends(get_current_user),
) -> models.User:
    """Return the user supplied by the ``get_current_user`` dependency.

    The route declares ``schemas.UserOut`` as its response model.
    """
    return current_user