life-noc/scripts/generate_services.py

357 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
from pathlib import Path
import re
import os
import sys
import yaml
INPUT_FILE = Path("domains.yaml")
OUTPUT_DIR = Path("icinga/services")
SERVICEGROUPS_DIR = Path("icinga/servicegroups")
SERVICEGROUPS_FILE = SERVICEGROUPS_DIR / "life-noc.conf"
HOST_NAME = "life-noc"
SERVICE_TEMPLATE = "service_echeance"
PROBE_TEMPLATE = "service_life_noc_probe"
DEFAULT_MOCK_STATE = "OK"
DEFAULT_MOCK_MESSAGE = "Sous contrôle"
LIFE_NOC_PUBLIC_BASE_URL = os.environ.get("LIFE_NOC_PUBLIC_BASE_URL", "").rstrip("/")
def absolutize_url(url: str) -> str:
url = (url or "").strip()
if not url:
return url
if url.startswith("http://") or url.startswith("https://"):
return url
if LIFE_NOC_PUBLIC_BASE_URL:
if not url.startswith("/"):
url = "/" + url
return LIFE_NOC_PUBLIC_BASE_URL + url
return url
def slugify(value: str) -> str:
value = value.strip().lower()
replacements = {
"à": "a", "â": "a", "ä": "a",
"ç": "c",
"é": "e", "è": "e", "ê": "e", "ë": "e",
"î": "i", "ï": "i",
"ô": "o", "ö": "o",
"ù": "u", "û": "u", "ü": "u",
"ÿ": "y",
"œ": "oe",
"æ": "ae",
"'": "",
"": "",
}
for old, new in replacements.items():
value = value.replace(old, new)
value = re.sub(r"[^a-z0-9\-]+", "-", value)
value = re.sub(r"-{2,}", "-", value)
return value.strip("-")
def aliasify(value: str) -> str:
value = value.strip().upper()
replacements = {
"À": "A", "Â": "A", "Ä": "A",
"Ç": "C",
"É": "E", "È": "E", "Ê": "E", "Ë": "E",
"Î": "I", "Ï": "I",
"Ô": "O", "Ö": "O",
"Ù": "U", "Û": "U", "Ü": "U",
"Ÿ": "Y",
"Œ": "OE",
"Æ": "AE",
"'": "",
"": "",
}
for old, new in replacements.items():
value = value.replace(old, new)
value = re.sub(r"[^A-Z0-9\-]+", "-", value)
value = re.sub(r"-{2,}", "-", value)
return value.strip("-")
def icinga_escape(value: str) -> str:
return value.replace("\\", "\\\\").replace('"', '\\"')
def normalize_mock_state(value: str) -> str:
state = str(value).strip().upper()
allowed = {"OK", "WARNING", "CRITICAL"}
if state not in allowed:
raise ValueError(
f"mock_state invalide: {value}. Valeurs permises: {', '.join(sorted(allowed))}"
)
return state
def render_mock_service(domain: str, group_name: str, item: dict) -> str:
name = item["name"].strip()
date = str(item["date"]).strip()
notes = item.get("notes", "").strip()
notes_url = item.get("notes_url", "").strip()
instructions_url = item.get("instructions_url", "").strip()
if not instructions_url:
instructions_url = f"/life-noc/item/{domain}/{name}"
instructions_url = absolutize_url(instructions_url)
action_url = item.get("action_url", "").strip()
mock_state = normalize_mock_state(item.get("mock_state", DEFAULT_MOCK_STATE))
mock_message = item.get("mock_message", DEFAULT_MOCK_MESSAGE).strip() or DEFAULT_MOCK_MESSAGE
service_name = f"{domain}-{name}"
lines = [
f'apply Service "{icinga_escape(service_name)}" {{',
f' import "{SERVICE_TEMPLATE}"',
f' vars.date_echeance = "{icinga_escape(date)}"',
f' vars.mock_state = "{icinga_escape(mock_state)}"',
f' vars.mock_message = "{icinga_escape(mock_message)}"',
f' groups = [ "{icinga_escape(group_name)}" ]',
]
if notes:
lines.append(f' notes = "{icinga_escape(notes)}"')
if notes_url:
lines.append(f' notes_url = "{icinga_escape(notes_url)}"')
if instructions_url:
lines.append(f' vars.instructions_url = "{icinga_escape(instructions_url)}"')
if action_url:
lines.append(f' action_url = "{icinga_escape(action_url)}"')
lines.append(f' assign where host.name == "{HOST_NAME}"')
lines.append("}")
lines.append("")
return "\n".join(lines)
def render_probe_service(domain: str, group_name: str, item: dict) -> str:
name = item["name"].strip()
notes = item.get("notes", "").strip()
notes_url = item.get("notes_url", "").strip()
instructions_url = item.get("instructions_url", "").strip()
if not instructions_url:
instructions_url = f"/life-noc/item/{domain}/{name}"
action_url = item.get("action_url", "").strip()
instructions_url = absolutize_url(instructions_url)
probe = item["probe"]
probe_type = str(probe["type"]).strip()
source = probe["source"]
metric = probe["metric"]
thresholds = probe["thresholds"]
policy = probe.get("policy", {})
service_name = f"{domain}-{name}"
lines = [
f'apply Service "{icinga_escape(service_name)}" {{',
f' import "{PROBE_TEMPLATE}"',
f' vars.life_noc_probe_type = "{icinga_escape(probe_type)}"',
f' vars.life_noc_source_type = "{icinga_escape(str(source["type"]).strip())}"',
f' vars.life_noc_source_value = "{icinga_escape(str(source.get("value", "")).strip())}"',
f' vars.life_noc_inputs_file = "{icinga_escape(str(source.get("inputs_file", "")).strip())}"',
f' vars.life_noc_item_key = "{icinga_escape(str(source.get("item_key", name)).strip())}"',
f' vars.life_noc_metric_unit = "{icinga_escape(str(metric["unit"]).strip())}"',
f' vars.life_noc_label = "{icinga_escape(name)}"',
f' groups = [ "{icinga_escape(group_name)}" ]',
]
if "unknown_lt" in thresholds:
lines.append(f' vars.life_noc_threshold_unknown_lt = "{icinga_escape(str(thresholds["unknown_lt"]))}"')
if "ok_gte" in thresholds:
lines.append(f' vars.life_noc_threshold_ok_gte = "{icinga_escape(str(thresholds["ok_gte"]))}"')
if "warning_gte" in thresholds:
lines.append(f' vars.life_noc_threshold_warning_gte = "{icinga_escape(str(thresholds["warning_gte"]))}"')
if "critical_gte" in thresholds:
lines.append(f' vars.life_noc_threshold_critical_gte = "{icinga_escape(str(thresholds["critical_gte"]))}"')
if "unknown_gte" in thresholds:
lines.append(f' vars.life_noc_threshold_unknown_gte = "{icinga_escape(str(thresholds["unknown_gte"]))}"')
if "ok_lt" in thresholds:
lines.append(f' vars.life_noc_threshold_ok_lt = "{icinga_escape(str(thresholds["ok_lt"]))}"')
if "warning_lt" in thresholds:
lines.append(f' vars.life_noc_threshold_warning_lt = "{icinga_escape(str(thresholds["warning_lt"]))}"')
if "critical_lt" in thresholds:
lines.append(f' vars.life_noc_threshold_critical_lt = "{icinga_escape(str(thresholds["critical_lt"]))}"')
on_error = str(policy.get("on_error", "critical")).strip()
lines.append(f' vars.life_noc_on_error = "{icinga_escape(on_error)}"')
if notes:
lines.append(f' notes = "{icinga_escape(notes)}"')
if notes_url:
lines.append(f' notes_url = "{icinga_escape(notes_url)}"')
if instructions_url:
lines.append(f' vars.instructions_url = "{icinga_escape(instructions_url)}"')
if action_url:
lines.append(f' action_url = "{icinga_escape(action_url)}"')
lines.append(f' assign where host.name == "{HOST_NAME}"')
lines.append("}")
lines.append("")
return "\n".join(lines)
def validate_probe(item: dict) -> None:
probe = item["probe"]
if not isinstance(probe, dict):
raise ValueError("probe doit être un objet")
probe_type = str(probe.get("type", "")).strip()
if probe_type not in {"elapsed_time", "days_until_due", "elapsed_distance", "remaining_quantity", "current_value"}:
raise ValueError("seuls probe.type=elapsed_time, days_until_due, elapsed_distance, remaining_quantity et current_value sont supportés")
source = probe.get("source")
if not isinstance(source, dict):
raise ValueError("probe.source doit être un objet")
source_type = str(source.get("type", "")).strip()
has_inline_value = "value" in source
has_store_ref = "inputs_file" in source and "item_key" in source
if probe_type in {"elapsed_time", "days_until_due"}:
if source_type != "manual_date":
raise ValueError("seul probe.source.type=manual_date est supporté pour les sondes de date")
if not has_inline_value and not has_store_ref:
raise ValueError("probe.source.value ou probe.source.inputs_file + item_key requis")
elif probe_type == "elapsed_distance":
if source_type != "manual_counter":
raise ValueError("seul probe.source.type=manual_counter est supporté pour elapsed_distance")
if not has_store_ref:
raise ValueError("probe.source.inputs_file + item_key requis pour elapsed_distance")
elif probe_type == "remaining_quantity":
if source_type != "manual_value":
raise ValueError("seul probe.source.type=manual_value est supporté pour remaining_quantity")
if not has_store_ref:
raise ValueError("probe.source.inputs_file + item_key requis pour remaining_quantity")
elif probe_type == "current_value":
if source_type != "manual_value":
raise ValueError("seul probe.source.type=manual_value est supporté pour current_value")
if not has_store_ref:
raise ValueError("probe.source.inputs_file + item_key requis pour current_value")
metric = probe.get("metric")
if not isinstance(metric, dict):
raise ValueError("probe.metric doit être un objet")
metric_unit = str(metric.get("unit", "")).strip()
if probe_type in {"elapsed_time", "days_until_due"} and metric_unit != "days":
raise ValueError("seul probe.metric.unit=days est supporté pour les sondes de date")
if probe_type == "elapsed_distance" and metric_unit != "km":
raise ValueError("seul probe.metric.unit=km est supporté pour elapsed_distance")
if probe_type == "remaining_quantity" and metric_unit not in {"%", "units", "litres"}:
raise ValueError("probe.metric.unit doit être %, units ou litres pour remaining_quantity")
if probe_type == "current_value" and metric_unit not in {"%", "units", "litres", "volts", "watts"}:
raise ValueError("probe.metric.unit doit être %, units, litres, volts ou watts pour current_value")
thresholds = probe.get("thresholds")
if not isinstance(thresholds, dict):
raise ValueError("probe.thresholds doit être un objet")
def render_servicegroup(group_name: str) -> str:
return "\n".join([
f'object ServiceGroup "{icinga_escape(group_name)}" {{',
f' display_name = "{icinga_escape(group_name)}"',
"}",
"",
])
def main() -> int:
if not INPUT_FILE.exists():
print(f"Erreur: fichier introuvable: {INPUT_FILE}", file=sys.stderr)
return 1
with INPUT_FILE.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if not isinstance(data, dict) or "domains" not in data:
print("Erreur: le YAML doit contenir une clé racine 'domains'.", file=sys.stderr)
return 1
domains = data["domains"]
if not isinstance(domains, dict):
print("Erreur: 'domains' doit être un objet YAML.", file=sys.stderr)
return 1
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
SERVICEGROUPS_DIR.mkdir(parents=True, exist_ok=True)
for existing_conf in OUTPUT_DIR.glob("*.conf"):
existing_conf.unlink()
if SERVICEGROUPS_FILE.exists():
SERVICEGROUPS_FILE.unlink()
generated_files = []
servicegroup_content = [
"/*",
" AUTO-GENERATED FILE - SERVICE GROUPS",
" Ne pas modifier manuellement.",
" Source: domains.yaml",
"*/",
"",
]
for raw_domain, services in domains.items():
domain = slugify(str(raw_domain))
group_name = aliasify(str(raw_domain))
if not isinstance(services, list):
print(f"Erreur: le domaine '{raw_domain}' doit contenir une liste.", file=sys.stderr)
return 1
output_file = OUTPUT_DIR / f"{domain}.conf"
content = [
"/*",
f" AUTO-GENERATED FILE - DOMAIN: {raw_domain}",
" Ne pas modifier manuellement.",
" Source: domains.yaml",
"*/",
"",
]
servicegroup_content.append(render_servicegroup(group_name))
for idx, item in enumerate(services, start=1):
if not isinstance(item, dict):
print(f"Erreur: entrée invalide dans '{raw_domain}' à la position {idx}.", file=sys.stderr)
return 1
missing = [key for key in ("name", "date", "notes") if key not in item]
if missing:
print(
f"Erreur: dans le domaine '{raw_domain}', entrée {idx}, champs manquants: {', '.join(missing)}",
file=sys.stderr,
)
return 1
try:
if "probe" in item:
validate_probe(item)
content.append(render_probe_service(domain, group_name, item))
else:
content.append(render_mock_service(domain, group_name, item))
except ValueError as exc:
print(f"Erreur dans '{raw_domain}', entrée {idx}: {exc}", file=sys.stderr)
return 1
output_file.write_text("\n".join(content).rstrip() + "\n", encoding="utf-8")
generated_files.append(str(output_file))
SERVICEGROUPS_FILE.write_text("\n".join(servicegroup_content).rstrip() + "\n", encoding="utf-8")
generated_files.append(str(SERVICEGROUPS_FILE))
print("Fichiers générés :")
for file_path in generated_files:
print(f" - {file_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())