life-noc/scripts/generate_services.py.bak
2026-03-15 14:43:50 -04:00

307 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
from pathlib import Path
import re
import sys
import yaml
# YAML file describing the domains and their services; read by main().
INPUT_FILE = Path("domains.yaml")
# Directory that receives one generated "<domain>.conf" file per domain.
OUTPUT_DIR = Path("icinga/services")
# Directory and file that receive the generated ServiceGroup objects.
SERVICEGROUPS_DIR = Path("icinga/servicegroups")
SERVICEGROUPS_FILE = SERVICEGROUPS_DIR / "life-noc.conf"
# Host name used in the "assign where host.name == ..." rule of every service.
HOST_NAME = "life-noc"
# Template imported by services rendered without a "probe" section.
SERVICE_TEMPLATE = "service_echeance"
# Template imported by services that declare a "probe" section.
PROBE_TEMPLATE = "service_life_noc_probe"
# Fallbacks used when an item omits "mock_state" / "mock_message".
DEFAULT_MOCK_STATE = "OK"
DEFAULT_MOCK_MESSAGE = "Sous contrôle"
def slugify(value: str) -> str:
    """Return a lowercase ASCII slug of *value*.

    The value is stripped and lower-cased, French accented letters and
    ligatures are transliterated, apostrophes are removed, every remaining
    run of characters outside ``[a-z0-9-]`` becomes a single ``-`` and
    leading/trailing dashes are dropped.

    Args:
        value: Human-readable label (e.g. a domain name from the YAML file).

    Returns:
        The slug; may be empty when *value* contains no usable character.
    """
    replacements = {
        "à": "a", "â": "a", "ä": "a",
        "ç": "c",
        "é": "e", "è": "e", "ê": "e", "ë": "e",
        "î": "i", "ï": "i",
        "ô": "o", "ö": "o",
        "ù": "u", "û": "u", "ü": "u",
        "ÿ": "y",
        "œ": "oe",
        "æ": "ae",
        "'": "",
        # Typographic quotes are spelled as escapes so they cannot be lost or
        # confused with the ASCII apostrophe when the file is copied around.
        "\u2019": "",  # right single quotation mark (typographic apostrophe)
        "\u2018": "",  # left single quotation mark
    }
    # Single pass over the string; every key is exactly one character.
    value = value.strip().lower().translate(str.maketrans(replacements))
    value = re.sub(r"[^a-z0-9\-]+", "-", value)
    value = re.sub(r"-{2,}", "-", value)
    return value.strip("-")
def aliasify(value: str) -> str:
    """Return an upper-case ASCII alias of *value*.

    The value is stripped and upper-cased, French accented letters and
    ligatures are transliterated, apostrophes are removed, every remaining
    run of characters outside ``[A-Z0-9-]`` becomes a single ``-`` and
    leading/trailing dashes are dropped.

    Args:
        value: Human-readable label (e.g. a domain name from the YAML file).

    Returns:
        The alias; may be empty when *value* contains no usable character.
    """
    replacements = {
        "À": "A", "Â": "A", "Ä": "A",
        "Ç": "C",
        "É": "E", "È": "E", "Ê": "E", "Ë": "E",
        "Î": "I", "Ï": "I",
        "Ô": "O", "Ö": "O",
        "Ù": "U", "Û": "U", "Ü": "U",
        "Ÿ": "Y",
        "Œ": "OE",
        "Æ": "AE",
        "'": "",
        # Typographic quotes are spelled as escapes so they cannot be lost or
        # confused with the ASCII apostrophe when the file is copied around.
        "\u2019": "",  # right single quotation mark (typographic apostrophe)
        "\u2018": "",  # left single quotation mark
    }
    # Single pass over the string; every key is exactly one character.
    value = value.strip().upper().translate(str.maketrans(replacements))
    value = re.sub(r"[^A-Z0-9\-]+", "-", value)
    value = re.sub(r"-{2,}", "-", value)
    return value.strip("-")
def icinga_escape(value: str) -> str:
    """Escape *value* for use inside a double-quoted Icinga 2 string literal.

    Backslashes and double quotes are backslash-escaped. Line breaks and tabs
    are emitted as ``\\n``, ``\\r`` and ``\\t`` escape sequences because a
    raw line break inside a quoted literal ends it prematurely, which makes
    the generated configuration unparsable (multi-line YAML notes are the
    typical trigger).

    Args:
        value: Raw text to embed.

    Returns:
        The escaped text, without the surrounding quotes.
    """
    escapes = {
        "\\": "\\\\",
        '"': '\\"',
        "\n": "\\n",
        "\r": "\\r",
        "\t": "\\t",
    }
    # One pass over the input, so backslashes introduced by an escape are
    # never escaped a second time.
    return value.translate(str.maketrans(escapes))
def normalize_mock_state(value: str) -> str:
    """Return *value* as a canonical upper-case mock state.

    The input is converted to ``str``, stripped and upper-cased before it is
    checked against the accepted states.

    Raises:
        ValueError: if the result is not ``OK``, ``WARNING`` or ``CRITICAL``.
    """
    allowed = {"OK", "WARNING", "CRITICAL"}
    candidate = str(value).strip().upper()
    if candidate in allowed:
        return candidate
    permitted = ", ".join(sorted(allowed))
    raise ValueError(f"mock_state invalide: {value}. Valeurs permises: {permitted}")
def render_mock_service(domain: str, group_name: str, item: dict) -> str:
    """Render the Icinga ``apply Service`` block for an item without a probe.

    The service is named ``"<domain>-<name>"``, imports ``SERVICE_TEMPLATE``
    and is assigned to the host ``HOST_NAME``.

    Args:
        domain: Slug of the domain, used as service-name prefix.
        group_name: Service group the service is added to.
        item: YAML entry. ``name`` and ``date`` are required; ``notes``,
            ``notes_url``, ``action_url``, ``mock_state`` and ``mock_message``
            are optional.

    Returns:
        The configuration text, ending with a newline.

    Raises:
        ValueError: if ``mock_state`` is not an accepted state.
    """

    def text(key: str) -> str:
        # YAML yields None for an empty value ("notes:") and non-string
        # scalars for numbers/dates; neither has a usable .strip().
        raw = item.get(key)
        return "" if raw is None else str(raw).strip()

    name = str(item["name"]).strip()
    date = str(item["date"]).strip()
    notes = text("notes")
    notes_url = text("notes_url")
    action_url = text("action_url")

    # An explicitly empty mock_state behaves like an omitted one.
    raw_state = item.get("mock_state")
    mock_state = normalize_mock_state(DEFAULT_MOCK_STATE if raw_state is None else raw_state)
    mock_message = text("mock_message") or DEFAULT_MOCK_MESSAGE

    service_name = f"{domain}-{name}"
    lines = [
        f'apply Service "{icinga_escape(service_name)}" {{',
        f' import "{SERVICE_TEMPLATE}"',
        f' vars.date_echeance = "{icinga_escape(date)}"',
        f' vars.mock_state = "{icinga_escape(mock_state)}"',
        f' vars.mock_message = "{icinga_escape(mock_message)}"',
        f' groups = [ "{icinga_escape(group_name)}" ]',
    ]
    # Optional attributes are only emitted when they carry a value.
    for attribute, content in (
        ("notes", notes),
        ("notes_url", notes_url),
        ("action_url", action_url),
    ):
        if content:
            lines.append(f' {attribute} = "{icinga_escape(content)}"')
    lines.append(f' assign where host.name == "{HOST_NAME}"')
    lines.append("}")
    lines.append("")
    return "\n".join(lines)
def render_probe_service(domain: str, group_name: str, item: dict) -> str:
    """Render the Icinga ``apply Service`` block for an item with a probe.

    The service is named ``"<domain>-<name>"``, imports ``PROBE_TEMPLATE``
    and is assigned to the host ``HOST_NAME``. The probe description is
    flattened into ``vars.life_noc_*`` custom variables.

    Args:
        domain: Slug of the domain, used as service-name prefix.
        group_name: Service group the service is added to.
        item: YAML entry with ``name`` and a ``probe`` mapping containing
            ``type``, ``source``, ``metric``, ``thresholds`` and optionally
            ``policy``. ``notes``, ``notes_url`` and ``action_url`` are
            optional.

    Returns:
        The configuration text, ending with a newline.

    Raises:
        ValueError: if ``probe.policy`` is present but is not a mapping.
    """

    def text(key: str) -> str:
        # YAML yields None for an empty value ("notes:") and non-string
        # scalars for numbers/dates; neither has a usable .strip().
        raw = item.get(key)
        return "" if raw is None else str(raw).strip()

    name = str(item["name"]).strip()
    notes = text("notes")
    notes_url = text("notes_url")
    action_url = text("action_url")

    probe = item["probe"]
    probe_type = str(probe["type"]).strip()
    source = probe["source"]
    metric = probe["metric"]
    thresholds = probe["thresholds"]

    # "policy:" with no value parses to None; treat it like an absent key.
    policy = probe.get("policy")
    if policy is None:
        policy = {}
    if not isinstance(policy, dict):
        raise ValueError("probe.policy doit être un objet")

    service_name = f"{domain}-{name}"
    lines = [
        f'apply Service "{icinga_escape(service_name)}" {{',
        f' import "{PROBE_TEMPLATE}"',
        f' vars.life_noc_probe_type = "{icinga_escape(probe_type)}"',
        f' vars.life_noc_source_type = "{icinga_escape(str(source["type"]).strip())}"',
        f' vars.life_noc_source_value = "{icinga_escape(str(source.get("value", "")).strip())}"',
        f' vars.life_noc_inputs_file = "{icinga_escape(str(source.get("inputs_file", "")).strip())}"',
        f' vars.life_noc_item_key = "{icinga_escape(str(source.get("item_key", name)).strip())}"',
        f' vars.life_noc_metric_unit = "{icinga_escape(str(metric["unit"]).strip())}"',
        f' vars.life_noc_label = "{icinga_escape(name)}"',
        f' groups = [ "{icinga_escape(group_name)}" ]',
    ]

    # Emission order is fixed so the generated file stays stable regardless
    # of the key order used in the YAML source.
    threshold_keys = (
        "unknown_lt",
        "ok_gte",
        "warning_gte",
        "critical_gte",
        "unknown_gte",
        "ok_lt",
        "warning_lt",
        "critical_lt",
    )
    for key in threshold_keys:
        if key in thresholds:
            lines.append(
                f' vars.life_noc_threshold_{key} = "{icinga_escape(str(thresholds[key]))}"'
            )

    on_error = str(policy.get("on_error", "critical")).strip()
    lines.append(f' vars.life_noc_on_error = "{icinga_escape(on_error)}"')

    # Optional attributes are only emitted when they carry a value.
    for attribute, content in (
        ("notes", notes),
        ("notes_url", notes_url),
        ("action_url", action_url),
    ):
        if content:
            lines.append(f' {attribute} = "{icinga_escape(content)}"')
    lines.append(f' assign where host.name == "{HOST_NAME}"')
    lines.append("}")
    lines.append("")
    return "\n".join(lines)
def validate_probe(item: dict) -> None:
    """Check the structure of ``item["probe"]`` and raise on the first problem.

    Checks run in this order: the probe is a mapping; its type is
    ``elapsed_time`` or ``days_until_due``; ``source`` is a mapping of type
    ``manual_date`` carrying either ``value`` or both ``inputs_file`` and
    ``item_key``; ``metric`` is a mapping whose unit is ``days``;
    ``thresholds`` is a mapping.

    Raises:
        KeyError: if *item* has no ``probe`` key.
        ValueError: describing the first failed check.
    """

    def require_object(candidate, message):
        # Shared guard for every section that must be a YAML mapping.
        if not isinstance(candidate, dict):
            raise ValueError(message)
        return candidate

    def cleaned(mapping, key):
        return str(mapping.get(key, "")).strip()

    probe = require_object(item["probe"], "probe doit être un objet")

    supported_types = {"elapsed_time", "days_until_due"}
    if cleaned(probe, "type") not in supported_types:
        raise ValueError("seuls probe.type=elapsed_time et days_until_due sont supportés")

    source = require_object(probe.get("source"), "probe.source doit être un objet")
    if cleaned(source, "type") != "manual_date":
        raise ValueError("seul probe.source.type=manual_date est supporté en v1")

    inline_value = "value" in source
    store_reference = "inputs_file" in source and "item_key" in source
    if not (inline_value or store_reference):
        raise ValueError("probe.source.value ou probe.source.inputs_file + item_key requis")

    metric = require_object(probe.get("metric"), "probe.metric doit être un objet")
    if cleaned(metric, "unit") != "days":
        raise ValueError("seul probe.metric.unit=days est supporté en v1")

    require_object(probe.get("thresholds"), "probe.thresholds doit être un objet")
def render_servicegroup(group_name: str) -> str:
    """Render an Icinga ``object ServiceGroup`` block for *group_name*.

    The escaped group name is used both as the object name and as its
    ``display_name``. The returned text ends with a newline.
    """
    escaped = icinga_escape(group_name)
    return (
        f'object ServiceGroup "{escaped}" {{\n'
        f' display_name = "{escaped}"\n'
        "}\n"
    )
def main() -> int:
    """Generate the Icinga service and service-group files from ``INPUT_FILE``.

    The YAML document must have a root key ``domains`` mapping each domain
    name to a list of entries. One ``<slug>.conf`` file per domain is written
    to ``OUTPUT_DIR`` and all service groups go to ``SERVICEGROUPS_FILE``.

    Everything is validated and rendered in memory first. Previously
    generated files are only removed and rewritten once the whole input is
    known to be valid, so a faulty YAML file never leaves the configuration
    directory empty or half-written.

    Returns:
        0 on success, 1 when the input is missing or invalid (the reason is
        printed on stderr).
    """
    if not INPUT_FILE.exists():
        print(f"Erreur: fichier introuvable: {INPUT_FILE}", file=sys.stderr)
        return 1

    try:
        with INPUT_FILE.open("r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except yaml.YAMLError as exc:
        print(f"Erreur: YAML invalide dans {INPUT_FILE}: {exc}", file=sys.stderr)
        return 1

    if not isinstance(data, dict) or "domains" not in data:
        print("Erreur: le YAML doit contenir une clé racine 'domains'.", file=sys.stderr)
        return 1
    domains = data["domains"]
    if not isinstance(domains, dict):
        print("Erreur: 'domains' doit être un objet YAML.", file=sys.stderr)
        return 1

    # --- Phase 1: validate and render in memory; no filesystem change yet.
    rendered = []  # (path, text) pairs in write order
    slug_owner = {}  # slug -> raw domain name that produced it
    servicegroup_content = [
        "/*",
        " AUTO-GENERATED FILE - SERVICE GROUPS",
        " Ne pas modifier manuellement.",
        " Source: domains.yaml",
        "*/",
        "",
    ]
    for raw_domain, services in domains.items():
        domain = slugify(str(raw_domain))
        group_name = aliasify(str(raw_domain))
        if not isinstance(services, list):
            print(f"Erreur: le domaine '{raw_domain}' doit contenir une liste.", file=sys.stderr)
            return 1

        output_file = OUTPUT_DIR / f"{domain}.conf"
        # Two labels that differ only by accents/punctuation would otherwise
        # silently overwrite each other's file.
        if domain in slug_owner:
            print(
                f"Erreur: les domaines '{slug_owner[domain]}' et '{raw_domain}' "
                f"produisent le même fichier: {output_file}",
                file=sys.stderr,
            )
            return 1
        slug_owner[domain] = raw_domain

        content = [
            "/*",
            f" AUTO-GENERATED FILE - DOMAIN: {raw_domain}",
            " Ne pas modifier manuellement.",
            " Source: domains.yaml",
            "*/",
            "",
        ]
        servicegroup_content.append(render_servicegroup(group_name))

        for idx, item in enumerate(services, start=1):
            if not isinstance(item, dict):
                print(f"Erreur: entrée invalide dans '{raw_domain}' à la position {idx}.", file=sys.stderr)
                return 1
            missing = [key for key in ("name", "date", "notes") if key not in item]
            if missing:
                print(
                    f"Erreur: dans le domaine '{raw_domain}', entrée {idx}, champs manquants: {', '.join(missing)}",
                    file=sys.stderr,
                )
                return 1
            try:
                if "probe" in item:
                    validate_probe(item)
                    content.append(render_probe_service(domain, group_name, item))
                else:
                    content.append(render_mock_service(domain, group_name, item))
            except ValueError as exc:
                print(f"Erreur dans '{raw_domain}', entrée {idx}: {exc}", file=sys.stderr)
                return 1

        rendered.append((output_file, "\n".join(content).rstrip() + "\n"))

    rendered.append((SERVICEGROUPS_FILE, "\n".join(servicegroup_content).rstrip() + "\n"))

    # --- Phase 2: input is valid; replace the previously generated files.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    SERVICEGROUPS_DIR.mkdir(parents=True, exist_ok=True)
    # Remove stale files of domains that no longer exist in the YAML source.
    for existing_conf in OUTPUT_DIR.glob("*.conf"):
        existing_conf.unlink()
    if SERVICEGROUPS_FILE.exists():
        SERVICEGROUPS_FILE.unlink()

    for path, text in rendered:
        path.write_text(text, encoding="utf-8")

    print("Fichiers générés :")
    for path, _ in rendered:
        print(f" - {path}")
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())