feat: rename masq to snat throughout, update generator to Shorewall 5 snat format
This commit is contained in:
@@ -92,7 +92,7 @@ def generate_config(
|
||||
selectinload(models.Config.policies).selectinload(models.Policy.dst_zone),
|
||||
selectinload(models.Config.rules).selectinload(models.Rule.src_zone),
|
||||
selectinload(models.Config.rules).selectinload(models.Rule.dst_zone),
|
||||
selectinload(models.Config.masq_entries),
|
||||
selectinload(models.Config.snat_entries),
|
||||
)
|
||||
.filter(models.Config.id == config_id, models.Config.owner_id == current_user.id)
|
||||
.first()
|
||||
|
||||
@@ -1,64 +0,0 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from sqlalchemy.orm import Session
|
||||
from app import models, schemas
|
||||
from app.auth import get_current_user
|
||||
from app.database import get_db
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
def _owner_config(config_id: int, db: Session, user: models.User) -> models.Config:
|
||||
config = db.query(models.Config).filter(
|
||||
models.Config.id == config_id, models.Config.owner_id == user.id
|
||||
).first()
|
||||
if not config:
|
||||
raise HTTPException(status_code=404, detail="Config not found")
|
||||
return config
|
||||
|
||||
|
||||
@router.get("/{config_id}/masq", response_model=list[schemas.MasqOut])
|
||||
def list_masq(config_id: int, db: Session = Depends(get_db), user: models.User = Depends(get_current_user)):
|
||||
_owner_config(config_id, db, user)
|
||||
return db.query(models.Masq).filter(models.Masq.config_id == config_id).all()
|
||||
|
||||
|
||||
@router.post("/{config_id}/masq", response_model=schemas.MasqOut, status_code=201)
|
||||
def create_masq(config_id: int, body: schemas.MasqCreate, db: Session = Depends(get_db), user: models.User = Depends(get_current_user)):
|
||||
_owner_config(config_id, db, user)
|
||||
masq = models.Masq(**body.model_dump(), config_id=config_id)
|
||||
db.add(masq)
|
||||
db.commit()
|
||||
db.refresh(masq)
|
||||
return masq
|
||||
|
||||
|
||||
@router.get("/{config_id}/masq/{masq_id}", response_model=schemas.MasqOut)
|
||||
def get_masq(config_id: int, masq_id: int, db: Session = Depends(get_db), user: models.User = Depends(get_current_user)):
|
||||
_owner_config(config_id, db, user)
|
||||
masq = db.query(models.Masq).filter(models.Masq.id == masq_id, models.Masq.config_id == config_id).first()
|
||||
if not masq:
|
||||
raise HTTPException(status_code=404, detail="Masq entry not found")
|
||||
return masq
|
||||
|
||||
|
||||
@router.put("/{config_id}/masq/{masq_id}", response_model=schemas.MasqOut)
|
||||
def update_masq(config_id: int, masq_id: int, body: schemas.MasqUpdate, db: Session = Depends(get_db), user: models.User = Depends(get_current_user)):
|
||||
_owner_config(config_id, db, user)
|
||||
masq = db.query(models.Masq).filter(models.Masq.id == masq_id, models.Masq.config_id == config_id).first()
|
||||
if not masq:
|
||||
raise HTTPException(status_code=404, detail="Masq entry not found")
|
||||
for field, value in body.model_dump(exclude_none=True).items():
|
||||
setattr(masq, field, value)
|
||||
db.commit()
|
||||
db.refresh(masq)
|
||||
return masq
|
||||
|
||||
|
||||
@router.delete("/{config_id}/masq/{masq_id}", status_code=204)
|
||||
def delete_masq(config_id: int, masq_id: int, db: Session = Depends(get_db), user: models.User = Depends(get_current_user)):
|
||||
_owner_config(config_id, db, user)
|
||||
masq = db.query(models.Masq).filter(models.Masq.id == masq_id, models.Masq.config_id == config_id).first()
|
||||
if not masq:
|
||||
raise HTTPException(status_code=404, detail="Masq entry not found")
|
||||
db.delete(masq)
|
||||
db.commit()
|
||||
64
backend/app/api/snat.py
Normal file
64
backend/app/api/snat.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from sqlalchemy.orm import Session
|
||||
from app import models, schemas
|
||||
from app.auth import get_current_user
|
||||
from app.database import get_db
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
def _owner_config(config_id: int, db: Session, user: models.User) -> models.Config:
|
||||
config = db.query(models.Config).filter(
|
||||
models.Config.id == config_id, models.Config.owner_id == user.id
|
||||
).first()
|
||||
if not config:
|
||||
raise HTTPException(status_code=404, detail="Config not found")
|
||||
return config
|
||||
|
||||
|
||||
@router.get("/{config_id}/snat", response_model=list[schemas.SnatOut])
|
||||
def list_snat(config_id: int, db: Session = Depends(get_db), user: models.User = Depends(get_current_user)):
|
||||
_owner_config(config_id, db, user)
|
||||
return db.query(models.Snat).filter(models.Snat.config_id == config_id).all()
|
||||
|
||||
|
||||
@router.post("/{config_id}/snat", response_model=schemas.SnatOut, status_code=201)
|
||||
def create_snat(config_id: int, body: schemas.SnatCreate, db: Session = Depends(get_db), user: models.User = Depends(get_current_user)):
|
||||
_owner_config(config_id, db, user)
|
||||
snat = models.Snat(**body.model_dump(), config_id=config_id)
|
||||
db.add(snat)
|
||||
db.commit()
|
||||
db.refresh(snat)
|
||||
return snat
|
||||
|
||||
|
||||
@router.get("/{config_id}/snat/{snat_id}", response_model=schemas.SnatOut)
|
||||
def get_snat(config_id: int, snat_id: int, db: Session = Depends(get_db), user: models.User = Depends(get_current_user)):
|
||||
_owner_config(config_id, db, user)
|
||||
snat = db.query(models.Snat).filter(models.Snat.id == snat_id, models.Snat.config_id == config_id).first()
|
||||
if not snat:
|
||||
raise HTTPException(status_code=404, detail="SNAT entry not found")
|
||||
return snat
|
||||
|
||||
|
||||
@router.put("/{config_id}/snat/{snat_id}", response_model=schemas.SnatOut)
|
||||
def update_snat(config_id: int, snat_id: int, body: schemas.SnatUpdate, db: Session = Depends(get_db), user: models.User = Depends(get_current_user)):
|
||||
_owner_config(config_id, db, user)
|
||||
snat = db.query(models.Snat).filter(models.Snat.id == snat_id, models.Snat.config_id == config_id).first()
|
||||
if not snat:
|
||||
raise HTTPException(status_code=404, detail="SNAT entry not found")
|
||||
for field, value in body.model_dump(exclude_none=True).items():
|
||||
setattr(snat, field, value)
|
||||
db.commit()
|
||||
db.refresh(snat)
|
||||
return snat
|
||||
|
||||
|
||||
@router.delete("/{config_id}/snat/{snat_id}", status_code=204)
|
||||
def delete_snat(config_id: int, snat_id: int, db: Session = Depends(get_db), user: models.User = Depends(get_current_user)):
|
||||
_owner_config(config_id, db, user)
|
||||
snat = db.query(models.Snat).filter(models.Snat.id == snat_id, models.Snat.config_id == config_id).first()
|
||||
if not snat:
|
||||
raise HTTPException(status_code=404, detail="SNAT entry not found")
|
||||
db.delete(snat)
|
||||
db.commit()
|
||||
Reference in New Issue
Block a user