357 lines
14 KiB
Python
357 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
|
||
from pathlib import Path
|
||
import re
|
||
import os
|
||
import sys
|
||
import yaml
|
||
|
||
# YAML file read by main(); it must contain a root "domains" mapping.
INPUT_FILE = Path("domains.yaml")
# Directory that receives one generated "<domain-slug>.conf" file per domain.
OUTPUT_DIR = Path("icinga/services")
# Directory and file that receive the generated ServiceGroup objects.
SERVICEGROUPS_DIR = Path("icinga/servicegroups")
SERVICEGROUPS_FILE = SERVICEGROUPS_DIR / "life-noc.conf"
# Host name used in the "assign where host.name == ..." rule of every service.
HOST_NAME = "life-noc"
# Icinga template imported by mock services (items without a "probe" key).
SERVICE_TEMPLATE = "service_echeance"
# Icinga template imported by probe services (items with a "probe" key).
PROBE_TEMPLATE = "service_life_noc_probe"
# Fallbacks used when an item does not provide mock_state / mock_message.
DEFAULT_MOCK_STATE = "OK"
DEFAULT_MOCK_MESSAGE = "Sous contrôle"

# Optional base URL taken from the environment, with trailing slashes removed.
# When non-empty, absolutize_url() prefixes it to relative URLs.
LIFE_NOC_PUBLIC_BASE_URL = os.environ.get("LIFE_NOC_PUBLIC_BASE_URL", "").rstrip("/")
def absolutize_url(url: str) -> str:
    """Return *url* prefixed with the public base URL when it is relative.

    The input is stripped first (a falsy value is treated as ""). It is
    returned unchanged when it is empty, when it already starts with
    ``http://`` or ``https://``, or when ``LIFE_NOC_PUBLIC_BASE_URL`` is
    empty. Otherwise a leading ``/`` is added if missing and the base URL is
    prepended.
    """
    cleaned = (url or "").strip()
    nothing_to_do = (
        not cleaned
        or cleaned.startswith(("http://", "https://"))
        or not LIFE_NOC_PUBLIC_BASE_URL
    )
    if nothing_to_do:
        return cleaned
    path = cleaned if cleaned.startswith("/") else f"/{cleaned}"
    return f"{LIFE_NOC_PUBLIC_BASE_URL}{path}"
def slugify(value: str) -> str:
    """Turn *value* into a lowercase, dash-separated ASCII slug.

    The text is stripped and lowercased, the listed French accented letters
    and ligatures are mapped to ASCII, apostrophes are dropped, every run of
    characters outside ``[a-z0-9-]`` becomes a single dash, and leading or
    trailing dashes are removed.
    """
    ascii_map = str.maketrans({
        "à": "a", "â": "a", "ä": "a",
        "ç": "c",
        "é": "e", "è": "e", "ê": "e", "ë": "e",
        "î": "i", "ï": "i",
        "ô": "o", "ö": "o",
        "ù": "u", "û": "u", "ü": "u",
        "ÿ": "y",
        "œ": "oe",
        "æ": "ae",
        "'": "",
        "’": "",
    })
    # One translate pass is equivalent to the chained replacements because no
    # replacement text contains a character that is itself a key.
    slug = value.strip().lower().translate(ascii_map)
    slug = re.sub(r"[^a-z0-9\-]+", "-", slug)
    slug = re.sub(r"-{2,}", "-", slug)
    return slug.strip("-")
def aliasify(value: str) -> str:
    """Turn *value* into an uppercase, dash-separated ASCII alias.

    Uppercase counterpart of ``slugify``: the text is stripped and
    uppercased, the listed accented capitals and ligatures are mapped to
    ASCII, apostrophes are dropped, every run of characters outside
    ``[A-Z0-9-]`` becomes a single dash, and outer dashes are removed.
    """
    ascii_map = str.maketrans({
        "À": "A", "Â": "A", "Ä": "A",
        "Ç": "C",
        "É": "E", "È": "E", "Ê": "E", "Ë": "E",
        "Î": "I", "Ï": "I",
        "Ô": "O", "Ö": "O",
        "Ù": "U", "Û": "U", "Ü": "U",
        "Ÿ": "Y",
        "Œ": "OE",
        "Æ": "AE",
        "'": "",
        "’": "",
    })
    alias = value.strip().upper().translate(ascii_map)
    collapsed = re.sub(r"-{2,}", "-", re.sub(r"[^A-Z0-9\-]+", "-", alias))
    return collapsed.strip("-")
def icinga_escape(value: str) -> str:
    """Escape *value* for use inside a double-quoted Icinga 2 string literal.

    Backslashes and double quotes are escaped as before. Line feeds,
    carriage returns and tabs are now emitted as the ``\\n``, ``\\r`` and
    ``\\t`` escape sequences instead of being written verbatim, so a
    multi-line YAML value (for example a block-scalar ``notes``) no longer
    places raw control characters inside the quoted literal.

    Strings without those control characters are escaped exactly as before.
    """
    # A single translate pass guarantees that the backslashes introduced by
    # one substitution are never escaped a second time.
    escapes = str.maketrans({
        "\\": "\\\\",
        '"': '\\"',
        "\n": "\\n",
        "\r": "\\r",
        "\t": "\\t",
    })
    return value.translate(escapes)
def normalize_mock_state(value: str) -> str:
    """Return *value* as a canonical mock state name.

    The value is converted to a string, stripped and uppercased.

    Raises:
        ValueError: if the result is not one of OK, WARNING or CRITICAL.
    """
    allowed = {"OK", "WARNING", "CRITICAL"}
    candidate = str(value).strip().upper()
    if candidate in allowed:
        return candidate
    choices = ", ".join(sorted(allowed))
    raise ValueError(f"mock_state invalide: {value}. Valeurs permises: {choices}")
def render_mock_service(domain: str, group_name: str, item: dict) -> str:
    """Render the Icinga 2 ``apply Service`` block for an item without a probe.

    Args:
        domain: Domain slug; used as the service-name prefix and in the
            default instructions URL.
        group_name: Service group the service is added to.
        item: One entry of a domain list. ``name`` and ``date`` are required;
            ``notes``, ``notes_url``, ``instructions_url``, ``action_url``,
            ``mock_state`` and ``mock_message`` are optional.

    Returns:
        The configuration text, ending with a newline.

    Raises:
        KeyError: if ``name`` or ``date`` is absent from *item*.
        ValueError: if ``mock_state`` is not an accepted state.
    """

    def clean(raw) -> str:
        # YAML produces None for an empty value and ints/dates for unquoted
        # scalars. Calling .strip() on those raised AttributeError, which the
        # caller does not catch, so coerce to a stripped string instead.
        return "" if raw is None else str(raw).strip()

    name = clean(item["name"])
    date = str(item["date"]).strip()
    notes = clean(item.get("notes"))
    notes_url = clean(item.get("notes_url"))
    action_url = clean(item.get("action_url"))

    # Fall back to the per-item page when no explicit URL is configured.
    instructions_url = absolutize_url(
        clean(item.get("instructions_url")) or f"/life-noc/item/{domain}/{name}"
    )

    mock_state = normalize_mock_state(item.get("mock_state", DEFAULT_MOCK_STATE))
    mock_message = clean(item.get("mock_message")) or DEFAULT_MOCK_MESSAGE

    service_name = f"{domain}-{name}"

    lines = [
        f'apply Service "{icinga_escape(service_name)}" {{',
        f' import "{SERVICE_TEMPLATE}"',
        f' vars.date_echeance = "{icinga_escape(date)}"',
        f' vars.mock_state = "{icinga_escape(mock_state)}"',
        f' vars.mock_message = "{icinga_escape(mock_message)}"',
        f' groups = [ "{icinga_escape(group_name)}" ]',
    ]

    # Optional attributes are only written when they carry a value.
    optional_attributes = (
        ("notes", notes),
        ("notes_url", notes_url),
        ("vars.instructions_url", instructions_url),
        ("action_url", action_url),
    )
    lines.extend(
        f' {attribute} = "{icinga_escape(text)}"'
        for attribute, text in optional_attributes
        if text
    )

    lines.append(f' assign where host.name == "{HOST_NAME}"')
    lines.append("}")
    lines.append("")

    return "\n".join(lines)
def render_probe_service(domain: str, group_name: str, item: dict) -> str:
    """Render the Icinga 2 ``apply Service`` block for an item with a probe.

    The probe is expected to have been checked by ``validate_probe`` first;
    this function reads ``type``, ``source``, ``metric`` and ``thresholds``
    without re-validating them.

    Args:
        domain: Domain slug; used as the service-name prefix and in the
            default instructions URL.
        group_name: Service group the service is added to.
        item: One entry of a domain list, containing ``name`` and ``probe``
            plus the optional ``notes``, ``notes_url``, ``instructions_url``
            and ``action_url`` keys.

    Returns:
        The configuration text, ending with a newline.

    Raises:
        KeyError: if a required key (``name``, ``probe`` or one of the probe
            sub-keys read here) is absent.
    """

    def clean(raw) -> str:
        # YAML produces None for an empty value and ints/dates for unquoted
        # scalars. Coerce instead of calling .strip() on them, and never
        # emit the literal text "None" for an empty field.
        return "" if raw is None else str(raw).strip()

    name = clean(item["name"])
    notes = clean(item.get("notes"))
    notes_url = clean(item.get("notes_url"))
    action_url = clean(item.get("action_url"))

    # Fall back to the per-item page when no explicit URL is configured.
    instructions_url = absolutize_url(
        clean(item.get("instructions_url")) or f"/life-noc/item/{domain}/{name}"
    )

    probe = item["probe"]
    probe_type = clean(probe["type"])
    source = probe["source"]
    metric = probe["metric"]
    thresholds = probe["thresholds"]
    # "policy:" with an empty value loads as None; treat it like a missing key.
    policy = probe.get("policy") or {}

    service_name = f"{domain}-{name}"

    lines = [
        f'apply Service "{icinga_escape(service_name)}" {{',
        f' import "{PROBE_TEMPLATE}"',
        f' vars.life_noc_probe_type = "{icinga_escape(probe_type)}"',
        f' vars.life_noc_source_type = "{icinga_escape(clean(source["type"]))}"',
        f' vars.life_noc_source_value = "{icinga_escape(clean(source.get("value", "")))}"',
        f' vars.life_noc_inputs_file = "{icinga_escape(clean(source.get("inputs_file", "")))}"',
        f' vars.life_noc_item_key = "{icinga_escape(clean(source.get("item_key", name)))}"',
        f' vars.life_noc_metric_unit = "{icinga_escape(clean(metric["unit"]))}"',
        f' vars.life_noc_label = "{icinga_escape(name)}"',
        f' groups = [ "{icinga_escape(group_name)}" ]',
    ]

    # Emission order is kept stable so regenerated files diff cleanly.
    threshold_keys = (
        "unknown_lt",
        "ok_gte",
        "warning_gte",
        "critical_gte",
        "unknown_gte",
        "ok_lt",
        "warning_lt",
        "critical_lt",
    )
    lines.extend(
        f' vars.life_noc_threshold_{key} = "{icinga_escape(str(thresholds[key]))}"'
        for key in threshold_keys
        if key in thresholds
    )

    on_error = str(policy.get("on_error", "critical")).strip()
    lines.append(f' vars.life_noc_on_error = "{icinga_escape(on_error)}"')

    # Optional attributes are only written when they carry a value.
    optional_attributes = (
        ("notes", notes),
        ("notes_url", notes_url),
        ("vars.instructions_url", instructions_url),
        ("action_url", action_url),
    )
    lines.extend(
        f' {attribute} = "{icinga_escape(text)}"'
        for attribute, text in optional_attributes
        if text
    )

    lines.append(f' assign where host.name == "{HOST_NAME}"')
    lines.append("}")
    lines.append("")

    return "\n".join(lines)
def validate_probe(item: dict) -> None:
    """Check the ``probe`` section of *item* and raise on the first problem.

    Checks run in this order: probe is a mapping, probe type is supported,
    source is a mapping, source type matches the probe type, the source
    carries the required value or store reference, metric is a mapping with
    an accepted unit, and thresholds is a mapping.

    Raises:
        KeyError: if *item* has no ``probe`` key.
        ValueError: describing the first failed check.
    """
    probe = item["probe"]
    if not isinstance(probe, dict):
        raise ValueError("probe doit être un objet")

    date_label = "les sondes de date"
    # probe type -> (required source type, wording used in the source error)
    source_rules = {
        "elapsed_time": ("manual_date", date_label),
        "days_until_due": ("manual_date", date_label),
        "elapsed_distance": ("manual_counter", "elapsed_distance"),
        "remaining_quantity": ("manual_value", "remaining_quantity"),
        "current_value": ("manual_value", "current_value"),
    }
    # probe type -> (accepted units, error raised for any other unit)
    days_only = ({"days"}, "seul probe.metric.unit=days est supporté pour les sondes de date")
    unit_rules = {
        "elapsed_time": days_only,
        "days_until_due": days_only,
        "elapsed_distance": (
            {"km"},
            "seul probe.metric.unit=km est supporté pour elapsed_distance",
        ),
        "remaining_quantity": (
            {"%", "units", "litres"},
            "probe.metric.unit doit être %, units ou litres pour remaining_quantity",
        ),
        "current_value": (
            {"%", "units", "litres", "volts", "watts"},
            "probe.metric.unit doit être %, units, litres, volts ou watts pour current_value",
        ),
    }

    kind = str(probe.get("type", "")).strip()
    if kind not in source_rules:
        raise ValueError("seuls probe.type=elapsed_time, days_until_due, elapsed_distance, remaining_quantity et current_value sont supportés")

    source = probe.get("source")
    if not isinstance(source, dict):
        raise ValueError("probe.source doit être un objet")

    required_source, label = source_rules[kind]
    if str(source.get("type", "")).strip() != required_source:
        raise ValueError(f"seul probe.source.type={required_source} est supporté pour {label}")

    store_ref = all(key in source for key in ("inputs_file", "item_key"))
    if label == date_label:
        # Date probes accept either an inline value or a store reference.
        if not ("value" in source or store_ref):
            raise ValueError("probe.source.value ou probe.source.inputs_file + item_key requis")
    elif not store_ref:
        raise ValueError(f"probe.source.inputs_file + item_key requis pour {kind}")

    metric = probe.get("metric")
    if not isinstance(metric, dict):
        raise ValueError("probe.metric doit être un objet")

    accepted_units, unit_error = unit_rules[kind]
    if str(metric.get("unit", "")).strip() not in accepted_units:
        raise ValueError(unit_error)

    if not isinstance(probe.get("thresholds"), dict):
        raise ValueError("probe.thresholds doit être un objet")
def render_servicegroup(group_name: str) -> str:
    """Render an Icinga 2 ``ServiceGroup`` object named *group_name*.

    The escaped group name is used both as the object name and as its
    ``display_name``. The returned text ends with a newline.
    """
    escaped = icinga_escape(group_name)
    return (
        f'object ServiceGroup "{escaped}" {{\n'
        f' display_name = "{escaped}"\n'
        "}\n"
    )
def main() -> int:
    """Generate the Icinga service and service-group files from the YAML input.

    The work is split in two phases so that a bad input never damages an
    existing configuration:

    1. Load ``INPUT_FILE``, validate it and render every output file in
       memory. Any error is reported on stderr and the function returns
       without having touched the filesystem.
    2. Only once everything rendered successfully, create the output
       directories, remove the previously generated ``*.conf`` files and
       write the new ones.

    Two domains whose names reduce to the same slug, or a domain whose slug
    is empty, are rejected: they would otherwise silently overwrite each
    other's file or produce a file named ``.conf``.

    Returns:
        0 on success, 1 when the input is missing or invalid.
    """
    if not INPUT_FILE.exists():
        print(f"Erreur: fichier introuvable: {INPUT_FILE}", file=sys.stderr)
        return 1

    with INPUT_FILE.open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f)

    if not isinstance(data, dict) or "domains" not in data:
        print("Erreur: le YAML doit contenir une clé racine 'domains'.", file=sys.stderr)
        return 1

    domains = data["domains"]
    if not isinstance(domains, dict):
        print("Erreur: 'domains' doit être un objet YAML.", file=sys.stderr)
        return 1

    # ---- Phase 1: validate and render everything in memory ----------------
    rendered_files = {}  # output path -> file content, in domain order
    slug_owners = {}  # domain slug -> raw domain name that produced it
    servicegroup_content = [
        "/*",
        " AUTO-GENERATED FILE - SERVICE GROUPS",
        " Ne pas modifier manuellement.",
        " Source: domains.yaml",
        "*/",
        "",
    ]

    for raw_domain, services in domains.items():
        domain = slugify(str(raw_domain))
        group_name = aliasify(str(raw_domain))

        if not isinstance(services, list):
            print(f"Erreur: le domaine '{raw_domain}' doit contenir une liste.", file=sys.stderr)
            return 1

        if not domain:
            print(
                f"Erreur: le domaine '{raw_domain}' ne produit aucun identifiant valide.",
                file=sys.stderr,
            )
            return 1

        if domain in slug_owners:
            print(
                f"Erreur: les domaines '{slug_owners[domain]}' et '{raw_domain}' "
                f"produisent le même fichier '{domain}.conf'.",
                file=sys.stderr,
            )
            return 1
        slug_owners[domain] = raw_domain

        content = [
            "/*",
            f" AUTO-GENERATED FILE - DOMAIN: {raw_domain}",
            " Ne pas modifier manuellement.",
            " Source: domains.yaml",
            "*/",
            "",
        ]

        servicegroup_content.append(render_servicegroup(group_name))

        for idx, item in enumerate(services, start=1):
            if not isinstance(item, dict):
                print(f"Erreur: entrée invalide dans '{raw_domain}' à la position {idx}.", file=sys.stderr)
                return 1

            missing = [key for key in ("name", "date", "notes") if key not in item]
            if missing:
                print(
                    f"Erreur: dans le domaine '{raw_domain}', entrée {idx}, champs manquants: {', '.join(missing)}",
                    file=sys.stderr,
                )
                return 1

            try:
                if "probe" in item:
                    validate_probe(item)
                    content.append(render_probe_service(domain, group_name, item))
                else:
                    content.append(render_mock_service(domain, group_name, item))
            except ValueError as exc:
                print(f"Erreur dans '{raw_domain}', entrée {idx}: {exc}", file=sys.stderr)
                return 1

        rendered_files[OUTPUT_DIR / f"{domain}.conf"] = "\n".join(content).rstrip() + "\n"

    servicegroup_text = "\n".join(servicegroup_content).rstrip() + "\n"

    # ---- Phase 2: the input is valid, now replace the generated files -----
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    SERVICEGROUPS_DIR.mkdir(parents=True, exist_ok=True)

    # Remove stale files so domains deleted from the YAML disappear too.
    for existing_conf in OUTPUT_DIR.glob("*.conf"):
        existing_conf.unlink()

    if SERVICEGROUPS_FILE.exists():
        SERVICEGROUPS_FILE.unlink()

    generated_files = []
    for output_file, text in rendered_files.items():
        output_file.write_text(text, encoding="utf-8")
        generated_files.append(str(output_file))

    SERVICEGROUPS_FILE.write_text(servicegroup_text, encoding="utf-8")
    generated_files.append(str(SERVICEGROUPS_FILE))

    print("Fichiers générés :")
    for file_path in generated_files:
        print(f" - {file_path}")

    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())