from fastapi import APIRouter, Depends, HTTPException, Request, Response, status from fastapi.responses import RedirectResponse from sqlalchemy.orm import Session from authlib.integrations.starlette_client import OAuthError from app import models, schemas from app.auth import create_access_token, get_current_user, oauth from app.database import get_db, settings router = APIRouter() FIREWALL_ADMINS_GROUP = "firewall admins" @router.get("/oidc/login") async def oidc_login(request: Request) -> RedirectResponse: return await oauth.keycloak.authorize_redirect(request, settings.keycloak_redirect_uri) @router.get("/oidc/callback") async def oidc_callback(request: Request, db: Session = Depends(get_db)) -> RedirectResponse: try: token = await oauth.keycloak.authorize_access_token(request) except OAuthError as e: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) userinfo = token.get("userinfo") or {} groups = userinfo.get("groups", []) import logging as _logging, base64 as _b64, json as _json _log = _logging.getLogger("shorefront.auth") _log.warning("userinfo keys: %s | groups: %r", list(userinfo.keys()), groups) _raw = token.get("id_token", "") if _raw: try: _payload = _raw.split(".")[1] _payload += "=" * (-len(_payload) % 4) _id_claims = _json.loads(_b64.b64decode(_payload)) _log.warning("ID token claims: %s", _id_claims) except Exception as _e: _log.warning("Could not decode id_token: %s", _e) if FIREWALL_ADMINS_GROUP not in groups: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not in firewall admins group") sub = userinfo["sub"] email = userinfo.get("email", "") username = userinfo.get("preferred_username", sub) user = db.query(models.User).filter(models.User.keycloak_sub == sub).first() if not user: user = models.User( keycloak_sub=sub, email=email, username=username, hashed_password=None, is_active=True, ) db.add(user) db.commit() db.refresh(user) access_token = create_access_token(user.id) response = RedirectResponse(url="/", status_code=status.HTTP_302_FOUND) response.set_cookie( key="access_token", value=access_token, httponly=True, samesite="lax", max_age=3600, ) return response @router.post("/logout") def logout(response: Response) -> dict: response.delete_cookie("access_token") return {"message": "Logged out"} @router.get("/me", response_model=schemas.UserOut) def me(current_user: models.User = Depends(get_current_user)) -> models.User: return current_user