Files
vgui-cicd/dokumente/management/commands/import-document.py

355 lines
15 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Document/management/commands/import_standard.py
import re
from pathlib import Path
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
from dokumente.models import (
Dokument,
Dokumententyp,
Thema,
Vorgabe,
VorgabeKurztext,
VorgabeLangtext,
Geltungsbereich,
Einleitung, # <-- make sure this model exists as a Textabschnitt subclass
Checklistenfrage,
)
from abschnitte.models import AbschnittTyp
from stichworte.models import Stichwort
class Command(BaseCommand):
help = (
"Import a policy document from a structured text file.\n"
"Supports Einleitung, Geltungsbereich, Vorgaben (Kurztext/Langtext with AbschnittTyp), "
"Stichworte (comma-separated), Checklistenfragen, dry-run, verbose, and purge."
)
def add_arguments(self, parser):
parser.add_argument("file_path", type=str, help="Path to the plaintext file")
parser.add_argument("--nummer", required=True, help="Document number (e.g., STD-001)")
parser.add_argument("--name", required=True, help='Document name (e.g., "IT-Sicherheit Container")')
parser.add_argument("--dokumententyp", required=True, help='Dokumententyp name (e.g., "IT-Sicherheit")')
parser.add_argument("--gueltigkeit_von", default=None, help="Start date (YYYY-MM-DD)")
parser.add_argument("--gueltigkeit_bis", default=None, help="End date (YYYY-MM-DD)")
parser.add_argument("--dry-run", action="store_true", help="Perform a dry run without saving to DB")
parser.add_argument("--verbose", action="store_true", help="Verbose output for dry run")
parser.add_argument("--purge", action="store_true", help="Delete existing Einleitung/Geltungsbereich/Vorgaben first")
# normalize header: "liste-ungeordnet" -> "liste ungeordnet"
@staticmethod
def _norm_header(h: str) -> str:
return h.lower().replace("-", " ").strip()
def handle(self, *args, **options):
dry_run = options["dry_run"]
verbose = options["verbose"]
purge = options["purge"]
file_path = Path(options["file_path"])
if not file_path.exists():
raise CommandError(f"File {file_path} does not exist")
nummer = options["nummer"]
name = options["name"]
dokumententyp_name = options["dokumententyp"]
try:
dokumententyp = Dokumententyp.objects.get(name=dokumententyp_name)
except Dokumententyp.DoesNotExist:
raise CommandError(f"Dokumententyp '{dokumententyp_name}' does not exist")
if dry_run:
self.stdout.write(self.style.WARNING("Dry run: no database changes will be made."))
# get or create Document (we want a real instance even in purge to count existing rows)
standard, created = Dokument.objects.get_or_create(
nummer=nummer,
defaults={
"dokumententyp": dokumententyp,
"name": name,
"gueltigkeit_von": options["gueltigkeit_von"],
"gueltigkeit_bis": options["gueltigkeit_bis"],
"aktiv":False,
},
)
if created:
self.stdout.write(self.style.SUCCESS(f"Created Document {nummer} {name}"))
else:
self.stdout.write(self.style.WARNING(f"Document {nummer} already exists; content may be updated."))
# purge (Einleitung + Geltungsbereich + Vorgaben cascade)
if purge:
qs_vorgaben = standard.vorgaben.all()
qs_check = Checklistenfrage.objects.filter(vorgabe__in=qs_vorgaben)
qs_kurz = VorgabeKurztext.objects.filter(abschnitt__in=qs_vorgaben)
qs_lang = VorgabeLangtext.objects.filter(abschnitt__in=qs_vorgaben)
qs_gb = Geltungsbereich.objects.filter(geltungsbereich=standard)
qs_einl = Einleitung.objects.filter(einleitung=standard)
c_vorgaben = qs_vorgaben.count()
c_check = qs_check.count()
c_kurz = qs_kurz.count()
c_lang = qs_lang.count()
c_gb = qs_gb.count()
c_einl = qs_einl.count()
if dry_run:
self.stdout.write(self.style.WARNING(
f"[DRY RUN] Would purge: {c_einl} Einleitung-Abschnitte, "
f"{c_gb} Geltungsbereich-Abschnitte, {c_vorgaben} Vorgaben "
f"({c_kurz} Kurztext, {c_lang} Langtext, {c_check} Checklistenfragen)."
))
else:
deleted_einl = qs_einl.delete()[0]
deleted_gb = qs_gb.delete()[0]
deleted_vorgaben = qs_vorgaben.delete()[0]
self.stdout.write(self.style.SUCCESS(
f"Purged {deleted_einl} Einleitung, {deleted_gb} Geltungsbereich, "
f"{deleted_vorgaben} Vorgaben (incl. Kurz/Lang/Checklistenfragen)."
))
# read and split file
content = file_path.read_text(encoding="utf-8")
blocks = re.split(r"^>>>", content, flags=re.MULTILINE)
blocks = [b.strip() for b in blocks if b.strip()]
# state
abschnittstyp_names = {"text", "liste geordnet", "liste ungeordnet"}
current_context = "geltungsbereich" # default before first Vorgabe
einleitung_sections = [] # list of {inhalt, typ: AbschnittTyp}
geltungsbereich_sections = [] # list of {inhalt, typ: AbschnittTyp}
current_vorgabe = None
vorgaben_data = [] # each: {thema, titel, nummer, kurztext:[{inhalt,typ}], langtext:[{inhalt,typ}], stichworte:set(), checkliste:[str]}
for block in blocks:
lines = block.splitlines()
header = lines[0].strip()
text = "\n".join(lines[1:]).strip()
header_norm = self._norm_header(header)
# resolve AbschnittTyp if this is a section block
abschnitt_typ = None
if header_norm in abschnittstyp_names:
try:
abschnitt_typ = AbschnittTyp.objects.get(abschnitttyp=header_norm)
except AbschnittTyp.DoesNotExist:
self.stdout.write(self.style.WARNING(
f"AbschnittTyp '{header_norm}' not found; defaulting to 'text'."
))
abschnitt_typ = AbschnittTyp.objects.get(abschnitttyp="text")
# contexts
if header_norm == "einleitung":
current_context = "einleitung"
continue
if header_norm == "geltungsbereich":
current_context = "geltungsbereich"
continue
if header_norm.startswith("vorgabe"):
# save previous
if current_vorgabe:
vorgaben_data.append(current_vorgabe)
parts = header.split(" ", 1)
thema_name = parts[1].strip() if len(parts) > 1 else ""
current_vorgabe = {
"thema": thema_name,
"titel": "",
"nummer": None,
"kurztext": [],
"langtext": [],
"stichworte": set(),
"checkliste": [],
}
current_context = "vorgabe_none"
continue
if header_norm.startswith("titel") and current_vorgabe:
# inline title or next text block
inline = header[len("Titel"):].strip() if header.startswith("Titel") else ""
title_value = inline or text
current_vorgabe["titel"] = title_value
continue
if header_norm.startswith("nummer") and current_vorgabe:
m = re.search(r"\d+", header)
if m:
current_vorgabe["nummer"] = int(m.group())
current_context = "vorgabe_none"
continue
if header_norm == "kurztext":
current_context = "vorgabe_kurztext"
continue
if header_norm == "langtext":
current_context = "vorgabe_langtext"
continue
if header_norm.startswith("stichworte") and current_vorgabe:
inline = header[len("Stichworte"):].strip() if header.startswith("Stichworte") else ""
kw_str = inline or text
if kw_str:
for k in kw_str.split(","):
k = k.strip()
if k:
current_vorgabe["stichworte"].add(k)
else:
current_context = "vorgabe_stichworte"
continue
if header_norm == "checkliste" and current_vorgabe:
if text:
current_vorgabe["checkliste"].extend([q.strip() for q in text.splitlines() if q.strip()])
else:
current_context = "vorgabe_checkliste"
continue
# Abschnitt content blocks
if header_norm in abschnittstyp_names:
section = {"inhalt": text, "typ": abschnitt_typ}
if current_context == "einleitung":
einleitung_sections.append(section)
if dry_run and verbose:
self.stdout.write(self.style.SUCCESS(
f"[DRY RUN] Einleitung Abschnitt ({section['typ']}): {text[:50]}..."
))
elif current_context == "geltungsbereich":
geltungsbereich_sections.append(section)
if dry_run and verbose:
self.stdout.write(self.style.SUCCESS(
f"[DRY RUN] Geltungsbereich Abschnitt ({section['typ']}): {text[:50]}..."
))
elif current_context == "vorgabe_kurztext" and current_vorgabe:
current_vorgabe["kurztext"].append(section)
if dry_run and verbose:
self.stdout.write(self.style.SUCCESS(
f"[DRY RUN] Vorgabe {current_vorgabe.get('nummer')} Kurztext ({section['typ']}): {text[:50]}..."
))
elif current_context == "vorgabe_langtext" and current_vorgabe:
current_vorgabe["langtext"].append(section)
if dry_run and verbose:
self.stdout.write(self.style.SUCCESS(
f"[DRY RUN] Vorgabe {current_vorgabe.get('nummer')} Langtext ({section['typ']}): {text[:50]}..."
))
elif current_context == "vorgabe_stichworte" and current_vorgabe:
for k in text.split(","):
k = k.strip()
if k:
current_vorgabe["stichworte"].add(k)
current_context = "vorgabe_none"
elif current_context == "vorgabe_checkliste" and current_vorgabe:
current_vorgabe["checkliste"].extend([q.strip() for q in text.splitlines() if q.strip()])
current_context = "vorgabe_none"
# append last vorgabe
if current_vorgabe:
vorgaben_data.append(current_vorgabe)
# === SAVE: Einleitung ===
for sektion in einleitung_sections:
if dry_run:
self.stdout.write(self.style.SUCCESS(
f"[DRY RUN] Would create Einleitung Abschnitt ({sektion['typ']}): {sektion['inhalt'][:50]}..."
))
else:
Einleitung.objects.create(
einleitung=standard,
abschnitttyp=sektion["typ"],
inhalt=sektion["inhalt"],
)
# === SAVE: Geltungsbereich ===
for sektion in geltungsbereich_sections:
if dry_run:
self.stdout.write(self.style.SUCCESS(
f"[DRY RUN] Would create Geltungsbereich Abschnitt ({sektion['typ']}): {sektion['inhalt'][:50]}..."
))
else:
Geltungsbereich.objects.create(
geltungsbereich=standard,
abschnitttyp=sektion["typ"],
inhalt=sektion["inhalt"],
)
# === SAVE: Vorgaben and children ===
for v in vorgaben_data:
try:
thema = Thema.objects.get(name=v["thema"])
except Thema.DoesNotExist:
self.stdout.write(self.style.WARNING(
f"Thema '{v['thema']}' not found, skipping Vorgabe {v['nummer']}"
))
continue
if dry_run:
self.stdout.write(self.style.SUCCESS(
f"[DRY RUN] Would create Vorgabe {v['nummer']}: '{v['titel']}' (Thema: {v['thema']})"
))
if v["stichworte"]:
self.stdout.write(self.style.SUCCESS(
f"[DRY RUN] Stichworte: {', '.join(sorted(v['stichworte']))}"
))
if v["checkliste"]:
for frage in v["checkliste"]:
self.stdout.write(self.style.SUCCESS(f"[DRY RUN] Checkliste: {frage}"))
for s in v["kurztext"]:
self.stdout.write(self.style.SUCCESS(
f"[DRY RUN] Kurztext ({s['typ']}): {s['inhalt'][:50]}..."
))
for s in v["langtext"]:
self.stdout.write(self.style.SUCCESS(
f"[DRY RUN] Langtext ({s['typ']}): {s['inhalt'][:50]}..."
))
else:
vorgabe = Vorgabe.objects.create(
nummer=v["nummer"],
dokument=standard,
thema=thema,
titel=v["titel"],
gueltigkeit_von=timezone.now().date(),
order=0,
)
# Stichworte
for kw in sorted(v["stichworte"]):
stw, _ = Stichwort.objects.get_or_create(stichwort=kw)
vorgabe.stichworte.add(stw)
# Checklistenfragen
for frage in v["checkliste"]:
Checklistenfrage.objects.create(vorgabe=vorgabe, frage=frage)
# Kurztext sections
for s in v["kurztext"]:
VorgabeKurztext.objects.create(
abschnitt=vorgabe,
abschnitttyp=s["typ"],
inhalt=s["inhalt"],
)
# Langtext sections
for s in v["langtext"]:
VorgabeLangtext.objects.create(
abschnitt=vorgabe,
abschnitttyp=s["typ"],
inhalt=s["inhalt"],
)
self.stdout.write(self.style.SUCCESS(
"Dry run complete" if dry_run else f"Imported document {nummer} {name} with {len(vorgaben_data)} Vorgaben"
))