import io
import re
import secrets
from typing import Optional
from urllib.parse import quote

from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session, selectinload

from app import models, schemas
from app.auth import get_current_user, get_optional_user
from app.database import get_db
from app.shorewall_generator import ShorewallGenerator

router = APIRouter()

# Anything outside this conservative set is replaced in the ASCII
# ``filename=`` fallback of a Content-Disposition header.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9._-]+")


def _get_config_or_404(config_id: int, db: Session, user: models.User) -> models.Config:
    """Return the config with ``config_id`` owned by ``user``.

    Raises:
        HTTPException: 404 when no config matches both the id and the owner.
    """
    config = db.query(models.Config).filter(
        models.Config.id == config_id,
        models.Config.owner_id == user.id,
    ).first()
    if not config:
        raise HTTPException(status_code=404, detail="Config not found")
    return config


def _attachment_disposition(filename: str) -> str:
    """Build a safe ``Content-Disposition: attachment`` header value.

    The ``filename=`` parameter is always a quoted, ASCII-only fallback in
    which every run of characters outside ``[A-Za-z0-9._-]`` is replaced by
    ``_``.  When that fallback differs from ``filename``, the exact name is
    additionally sent percent-encoded in the ``filename*`` parameter
    (RFC 6266 / RFC 5987), so the header never contains raw spaces, quotes,
    control characters or non-ASCII text.
    """
    fallback = _UNSAFE_FILENAME_CHARS.sub("_", filename)
    header = f'attachment; filename="{fallback}"'
    if fallback != filename:
        header += f"; filename*=UTF-8''{quote(filename, safe='')}"
    return header


def _config_query_with_relations(db: Session):
    """Return a ``Config`` query that eagerly loads what the generator is given.

    Loads zones, interfaces, policies, rules, SNAT entries, host entries and
    params, plus the zone objects referenced by interfaces, policies, rules
    and hosts.
    """
    return db.query(models.Config).options(
        selectinload(models.Config.zones),
        selectinload(models.Config.interfaces).selectinload(models.Interface.zone),
        selectinload(models.Config.policies).selectinload(models.Policy.src_zone),
        selectinload(models.Config.policies).selectinload(models.Policy.dst_zone),
        selectinload(models.Config.rules).selectinload(models.Rule.src_zone),
        selectinload(models.Config.rules).selectinload(models.Rule.dst_zone),
        selectinload(models.Config.snat_entries),
        selectinload(models.Config.host_entries).selectinload(models.Host.zone),
        selectinload(models.Config.params),
    )


@router.get("", response_model=list[schemas.ConfigOut])
def list_configs(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
) -> list[models.Config]:
    """List every config owned by the current user."""
    return db.query(models.Config).filter(models.Config.owner_id == current_user.id).all()


@router.post("", response_model=schemas.ConfigOut, status_code=201)
def create_config(
    body: schemas.ConfigCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
) -> models.Config:
    """Create a config from the request body, owned by the current user."""
    config = models.Config(**body.model_dump(), owner_id=current_user.id)
    db.add(config)
    db.commit()
    db.refresh(config)
    return config


@router.get("/{config_id}", response_model=schemas.ConfigOut)
def get_config(
    config_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
) -> models.Config:
    """Return one config owned by the current user, or 404."""
    return _get_config_or_404(config_id, db, current_user)


@router.put("/{config_id}", response_model=schemas.ConfigOut)
def update_config(
    config_id: int,
    body: schemas.ConfigUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
) -> models.Config:
    """Apply the non-null fields of the request body to an owned config.

    Returns 404 when the config does not exist or belongs to another user.
    """
    config = _get_config_or_404(config_id, db, current_user)
    # exclude_none drops every field whose value is None, so a field cannot
    # be reset to null through this endpoint.
    for field, value in body.model_dump(exclude_none=True).items():
        setattr(config, field, value)
    db.commit()
    db.refresh(config)
    return config


@router.delete("/{config_id}", status_code=204)
def delete_config(
    config_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
) -> None:
    """Delete a config owned by the current user, or return 404."""
    config = _get_config_or_404(config_id, db, current_user)
    db.delete(config)
    db.commit()


@router.post("/{config_id}/generate")
def generate_config(
    config_id: int,
    format: str = "json",
    body: Optional[schemas.GenerateRequest] = None,
    db: Session = Depends(get_db),
    current_user: Optional[models.User] = Depends(get_optional_user),
):
    """Generate the Shorewall output for a config.

    Access is granted either to the authenticated owner of the config or to
    an unauthenticated caller whose request body carries the config's
    download token.  An authenticated caller is always checked as owner; the
    token is only considered when there is no authenticated user.

    With ``format=zip`` the result is streamed as a zip attachment; any other
    value returns the generator's JSON form.

    Raises:
        HTTPException: 404 when an authenticated user does not own a config
            with this id; 401 when the token does not match or when neither
            a user nor a token is supplied.
    """
    token = body.token if body else None

    # Pick the access condition and the error to raise if it is not met.
    if current_user is not None:
        # Authenticated session: enforce the owner check.
        access_condition = models.Config.owner_id == current_user.id
        denied = HTTPException(status_code=404, detail="Config not found")
    elif token:
        # Token auth: no owner filter, the token itself must match.
        access_condition = models.Config.download_token == token
        denied = HTTPException(status_code=401, detail="Invalid token")
    else:
        raise HTTPException(status_code=401, detail="Authentication required")

    # One query performs the access check and the eager loading, instead of
    # fetching the same row a second time just to attach the relationships.
    config = (
        _config_query_with_relations(db)
        .filter(models.Config.id == config_id, access_condition)
        .first()
    )
    if not config:
        raise denied

    generator = ShorewallGenerator(config)

    if format == "zip":
        zip_bytes = generator.as_zip()
        # config.name is user-supplied; it must not be interpolated into the
        # header unquoted and unescaped.
        disposition = _attachment_disposition(f"{config.name}-shorewall.zip")
        return StreamingResponse(
            io.BytesIO(zip_bytes),
            media_type="application/zip",
            headers={"Content-Disposition": disposition},
        )

    return generator.as_json()


@router.post("/{config_id}/regenerate-token", response_model=schemas.RegenerateTokenOut)
def regenerate_token(
    config_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
) -> dict:
    """Replace the download token of an owned config and return the new one.

    The new token comes from ``secrets.token_urlsafe(32)``; the previous
    token is overwritten.  Returns 404 when the config does not exist or
    belongs to another user.
    """
    config = _get_config_or_404(config_id, db, current_user)
    config.download_token = secrets.token_urlsafe(32)
    db.commit()
    db.refresh(config)
    return {"download_token": config.download_token}