From 1afc95fe8b20254c7b9e6eac8d3eb09aab1109b6 Mon Sep 17 00:00:00 2001 From: "Adrian A. Baumann" Date: Sat, 28 Feb 2026 20:02:48 +0100 Subject: [PATCH] feat: add ShorewallGenerator (zones, interfaces, policy, rules, masq, json, zip) --- backend/app/shorewall_generator.py | 74 ++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 backend/app/shorewall_generator.py diff --git a/backend/app/shorewall_generator.py b/backend/app/shorewall_generator.py new file mode 100644 index 0000000..5c1a81c --- /dev/null +++ b/backend/app/shorewall_generator.py @@ -0,0 +1,74 @@ +import io +import zipfile +from datetime import datetime, timezone +from app import models + + +class ShorewallGenerator: + def __init__(self, config: models.Config) -> None: + self._config = config + self._ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + def _header(self, filename: str) -> str: + return ( + f"# {filename} — generated by shorefront " + f"| config: {self._config.name} " + f"| {self._ts}\n" + ) + + def _col(self, *values: str, width: int = 16) -> str: + return "".join(v.ljust(width) for v in values).rstrip() + "\n" + + def zones(self) -> str: + lines = [self._header("zones"), "#ZONE".ljust(16) + "TYPE".ljust(16) + "OPTIONS\n"] + for z in self._config.zones: + lines.append(self._col(z.name, z.type, z.options or "-")) + return "".join(lines) + + def interfaces(self) -> str: + lines = [self._header("interfaces"), "#ZONE".ljust(16) + "INTERFACE".ljust(16) + "OPTIONS\n"] + for iface in self._config.interfaces: + lines.append(self._col(iface.zone.name, iface.name, iface.options or "-")) + return "".join(lines) + + def policy(self) -> str: + lines = [self._header("policy"), "#SOURCE".ljust(16) + "DEST".ljust(16) + "POLICY".ljust(16) + "LOG LEVEL\n"] + for p in sorted(self._config.policies, key=lambda x: x.position): + lines.append(self._col(p.src_zone.name, p.dst_zone.name, p.policy, p.log_level or "-")) + return "".join(lines) + + def rules(self) -> str: + lines = [ + self._header("rules"), + "#ACTION".ljust(16) + "SOURCE".ljust(24) + "DEST".ljust(24) + "PROTO".ljust(10) + "DPORT".ljust(10) + "SPORT\n", + ] + for r in sorted(self._config.rules, key=lambda x: x.position): + src = (r.src_zone.name if r.src_zone else "all") + (f":{r.src_ip}" if r.src_ip else "") + dst = (r.dst_zone.name if r.dst_zone else "all") + (f":{r.dst_ip}" if r.dst_ip else "") + lines.append(self._col(r.action, src, dst, r.proto or "-", r.dport or "-", r.sport or "-", width=16)) + return "".join(lines) + + def masq(self) -> str: + lines = [self._header("masq"), "#INTERFACE".ljust(24) + "SOURCE".ljust(24) + "ADDRESS\n"] + for m in self._config.masq_entries: + lines.append(self._col(m.out_interface, m.source_network, m.to_address or "-", width=24)) + return "".join(lines) + + def as_json(self) -> dict: + return { + "zones": self.zones(), + "interfaces": self.interfaces(), + "policy": self.policy(), + "rules": self.rules(), + "masq": self.masq(), + } + + def as_zip(self) -> bytes: + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: + zf.writestr("zones", self.zones()) + zf.writestr("interfaces", self.interfaces()) + zf.writestr("policy", self.policy()) + zf.writestr("rules", self.rules()) + zf.writestr("masq", self.masq()) + return buf.getvalue()