114 lines
3.7 KiB
Python
114 lines
3.7 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Response
|
|
from fastapi.responses import StreamingResponse
|
|
from sqlalchemy.orm import Session, selectinload
|
|
from app import models, schemas
|
|
from app.auth import get_current_user
|
|
from app.database import get_db
|
|
from app.shorewall_generator import ShorewallGenerator
|
|
import io
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
def _get_config_or_404(config_id: int, db: Session, user: models.User) -> models.Config:
|
|
config = db.query(models.Config).filter(
|
|
models.Config.id == config_id,
|
|
models.Config.owner_id == user.id,
|
|
).first()
|
|
if not config:
|
|
raise HTTPException(status_code=404, detail="Config not found")
|
|
return config
|
|
|
|
|
|
@router.get("", response_model=list[schemas.ConfigOut])
|
|
def list_configs(
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user),
|
|
) -> list[models.Config]:
|
|
return db.query(models.Config).filter(models.Config.owner_id == current_user.id).all()
|
|
|
|
|
|
@router.post("", response_model=schemas.ConfigOut, status_code=201)
|
|
def create_config(
|
|
body: schemas.ConfigCreate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user),
|
|
) -> models.Config:
|
|
config = models.Config(**body.model_dump(), owner_id=current_user.id)
|
|
db.add(config)
|
|
db.commit()
|
|
db.refresh(config)
|
|
return config
|
|
|
|
|
|
@router.get("/{config_id}", response_model=schemas.ConfigOut)
|
|
def get_config(
|
|
config_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user),
|
|
) -> models.Config:
|
|
return _get_config_or_404(config_id, db, current_user)
|
|
|
|
|
|
@router.put("/{config_id}", response_model=schemas.ConfigOut)
|
|
def update_config(
|
|
config_id: int,
|
|
body: schemas.ConfigUpdate,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user),
|
|
) -> models.Config:
|
|
config = _get_config_or_404(config_id, db, current_user)
|
|
for field, value in body.model_dump(exclude_none=True).items():
|
|
setattr(config, field, value)
|
|
db.commit()
|
|
db.refresh(config)
|
|
return config
|
|
|
|
|
|
@router.delete("/{config_id}", status_code=204)
|
|
def delete_config(
|
|
config_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user),
|
|
) -> None:
|
|
config = _get_config_or_404(config_id, db, current_user)
|
|
db.delete(config)
|
|
db.commit()
|
|
|
|
|
|
@router.post("/{config_id}/generate")
|
|
def generate_config(
|
|
config_id: int,
|
|
format: str = "json",
|
|
db: Session = Depends(get_db),
|
|
current_user: models.User = Depends(get_current_user),
|
|
):
|
|
config = (
|
|
db.query(models.Config)
|
|
.options(
|
|
selectinload(models.Config.zones),
|
|
selectinload(models.Config.interfaces).selectinload(models.Interface.zone),
|
|
selectinload(models.Config.policies).selectinload(models.Policy.src_zone),
|
|
selectinload(models.Config.policies).selectinload(models.Policy.dst_zone),
|
|
selectinload(models.Config.rules).selectinload(models.Rule.src_zone),
|
|
selectinload(models.Config.rules).selectinload(models.Rule.dst_zone),
|
|
selectinload(models.Config.masq_entries),
|
|
)
|
|
.filter(models.Config.id == config_id, models.Config.owner_id == current_user.id)
|
|
.first()
|
|
)
|
|
if not config:
|
|
raise HTTPException(status_code=404, detail="Config not found")
|
|
|
|
generator = ShorewallGenerator(config)
|
|
|
|
if format == "zip":
|
|
zip_bytes = generator.as_zip()
|
|
return StreamingResponse(
|
|
io.BytesIO(zip_bytes),
|
|
media_type="application/zip",
|
|
headers={"Content-Disposition": f"attachment; filename={config.name}-shorewall.zip"},
|
|
)
|
|
|
|
return generator.as_json()
|