Files
shorefront/backend/app/api/configs.py

155 lines
5.2 KiB
Python

from typing import Literal, Optional
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session, selectinload
from app import models, schemas
from app.auth import get_current_user, get_optional_user
from app.database import get_db
from app.shorewall_generator import ShorewallGenerator
import io
import secrets
# Router that the config endpoints in this module register themselves on via
# the @router.<method> decorators. No prefix is set here.
# NOTE(review): presumably the application mounts this router under a
# configs prefix — confirm where it is included.
router = APIRouter()
def _get_config_or_404(config_id: int, db: Session, user: models.User) -> models.Config:
    """Fetch the config with ``config_id`` that is owned by ``user``.

    Args:
        config_id: Primary key of the config to look up.
        db: Database session used for the query.
        user: User whose ``id`` must match the config's ``owner_id``.

    Returns:
        The first config matching both the id and the owner.

    Raises:
        HTTPException: 404 when no config matches both conditions.
    """
    owned_by_user = db.query(models.Config).filter(
        models.Config.id == config_id,
        models.Config.owner_id == user.id,
    )
    match = owned_by_user.first()
    if match:
        return match
    raise HTTPException(status_code=404, detail="Config not found")
@router.get("", response_model=list[schemas.ConfigOut])
def list_configs(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
) -> list[models.Config]:
    """Return every config whose ``owner_id`` is the authenticated user's id."""
    is_owned = models.Config.owner_id == current_user.id
    owned_configs = db.query(models.Config).filter(is_owned)
    return owned_configs.all()
@router.post("", response_model=schemas.ConfigOut, status_code=201)
def create_config(
    body: schemas.ConfigCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
) -> models.Config:
    """Create a config from the request body, owned by the authenticated user.

    The new row is committed and then refreshed so the returned object
    carries the values stored in the database.
    """
    fields = body.model_dump()
    new_config = models.Config(owner_id=current_user.id, **fields)

    db.add(new_config)
    db.commit()
    db.refresh(new_config)
    return new_config
@router.get("/{config_id}", response_model=schemas.ConfigOut)
def get_config(
    config_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
) -> models.Config:
    """Return the authenticated user's config with the given id.

    Raises:
        HTTPException: 404 (from the lookup helper) when the config does not
            exist or belongs to another user.
    """
    owned_config = _get_config_or_404(config_id=config_id, db=db, user=current_user)
    return owned_config
@router.put("/{config_id}", response_model=schemas.ConfigOut)
def update_config(
    config_id: int,
    body: schemas.ConfigUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
) -> models.Config:
    """Apply the non-None fields of the request body to the user's config.

    Raises:
        HTTPException: 404 when the config does not exist or belongs to
            another user.
    """
    target = _get_config_or_404(config_id, db, current_user)

    # Fields whose value is None are dropped from the dump, so they leave the
    # stored value untouched.
    # NOTE(review): this also means a client cannot reset a field to null
    # through this endpoint — confirm that is intended.
    changes = body.model_dump(exclude_none=True)
    for name in changes:
        setattr(target, name, changes[name])

    db.commit()
    db.refresh(target)
    return target
@router.delete("/{config_id}", status_code=204)
def delete_config(
    config_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
) -> None:
    """Delete the authenticated user's config with the given id.

    Raises:
        HTTPException: 404 when the config does not exist or belongs to
            another user.
    """
    doomed = _get_config_or_404(config_id, db, current_user)
    db.delete(doomed)
    db.commit()
def _token_matches(stored: Optional[str], supplied: str) -> bool:
    """Return True when ``supplied`` equals the stored download token.

    The comparison uses ``secrets.compare_digest`` so that its duration does
    not depend on how many leading characters match. A missing or empty
    stored token never matches.
    """
    if not stored:
        return False
    # compare_digest only accepts ASCII-only str, so compare encoded bytes;
    # "surrogatepass" keeps odd client input from raising during encoding.
    return secrets.compare_digest(
        stored.encode("utf-8", "surrogatepass"),
        supplied.encode("utf-8", "surrogatepass"),
    )


def _zip_content_disposition(config_name: str) -> str:
    """Build a safe ``Content-Disposition`` value for the generated zip.

    The plain ``filename`` parameter is restricted to ASCII letters, digits
    and ``-_.`` (everything else becomes ``_``) so that spaces, quotes,
    semicolons, control characters or non-latin-1 characters in the config
    name cannot corrupt the header. When that changes the name, the exact
    name is additionally sent percent-encoded as ``filename*``.
    """
    from urllib.parse import quote

    filename = f"{config_name}-shorewall.zip"
    ascii_name = "".join(
        ch if ch.isascii() and (ch.isalnum() or ch in "-_.") else "_"
        for ch in filename
    )
    header = f'attachment; filename="{ascii_name}"'
    if ascii_name != filename:
        header += f"; filename*=UTF-8''{quote(filename, safe='')}"
    return header


@router.post("/{config_id}/generate")
def generate_config(
    config_id: int,
    format: Literal["json", "zip"] = "json",
    body: Optional[schemas.GenerateRequest] = None,
    db: Session = Depends(get_db),
    current_user: Optional[models.User] = Depends(get_optional_user),
):
    """Generate the Shorewall output for a config as JSON or as a zip download.

    Access is granted in one of two ways:

    * an authenticated user, who must own the config; or
    * no authenticated user, but a ``token`` in the request body that equals
      the config's ``download_token``.

    Args:
        config_id: Id of the config to generate output for.
        format: ``"json"`` returns ``ShorewallGenerator.as_json()``;
            ``"zip"`` streams ``ShorewallGenerator.as_zip()`` as an
            attachment. (The name shadows the builtin but is kept because it
            is the public query parameter.)
        body: Optional request body carrying the download token.
        db: Database session.
        current_user: The authenticated user, or None.

    Raises:
        HTTPException: 404 when an authenticated user does not own the config
            (or it does not exist, or it disappears before it is loaded);
            401 when the token is wrong or no credentials were supplied.
    """
    token = body.token if body else None

    if current_user is not None:
        # Authenticated user: the owner check always applies, even if a token
        # was also sent.
        _get_config_or_404(config_id, db, current_user)
    elif token:
        # Token access: no owner filter. Load by id only and compare the
        # token in Python in constant time rather than with SQL equality.
        candidate = (
            db.query(models.Config)
            .filter(models.Config.id == config_id)
            .first()
        )
        if candidate is None or not _token_matches(candidate.download_token, token):
            raise HTTPException(status_code=401, detail="Invalid token")
    else:
        raise HTTPException(status_code=401, detail="Authentication required")

    # Reload with every relationship eagerly loaded. This happens only after
    # the access check so rejected requests do not trigger the extra queries.
    config = (
        db.query(models.Config)
        .options(
            selectinload(models.Config.zones),
            selectinload(models.Config.interfaces).selectinload(models.Interface.zone),
            selectinload(models.Config.policies).selectinload(models.Policy.src_zone),
            selectinload(models.Config.policies).selectinload(models.Policy.dst_zone),
            selectinload(models.Config.rules).selectinload(models.Rule.src_zone),
            selectinload(models.Config.rules).selectinload(models.Rule.dst_zone),
            selectinload(models.Config.snat_entries),
            selectinload(models.Config.host_entries).selectinload(models.Host.zone),
            selectinload(models.Config.params),
        )
        .filter(models.Config.id == config_id)
        .first()
    )
    if config is None:
        # The row was removed between the access check and this query.
        raise HTTPException(status_code=404, detail="Config not found")

    generator = ShorewallGenerator(config)
    if format == "zip":
        zip_bytes = generator.as_zip()
        return StreamingResponse(
            io.BytesIO(zip_bytes),
            media_type="application/zip",
            headers={"Content-Disposition": _zip_content_disposition(config.name)},
        )
    return generator.as_json()
@router.post("/{config_id}/regenerate-token", response_model=schemas.RegenerateTokenOut)
def regenerate_token(
    config_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user),
) -> dict:
    """Replace the download token of the user's config with a new random one.

    Returns:
        A dict with the single key ``download_token`` holding the token as
        read back from the refreshed config.

    Raises:
        HTTPException: 404 when the config does not exist or belongs to
            another user.
    """
    owned_config = _get_config_or_404(config_id, db, current_user)

    fresh_token = secrets.token_urlsafe(32)
    owned_config.download_token = fresh_token

    db.commit()
    db.refresh(owned_config)

    response = {"download_token": owned_config.download_token}
    return response