backupchecks/containers/backupchecks/src/backend/app/parsers/ntfs_auditing.py

63 lines
1.8 KiB
Python

from __future__ import annotations
import re
from typing import Dict, Tuple, List
from ..models import MailMessage
def _normalize_subject(subject: str) -> str:
# Some senders use underscores as spaces in the subject.
s = (subject or "").strip()
s = s.replace("_", " ")
s = re.sub(r"\s+", " ", s)
return s.strip()
def try_parse_ntfs_auditing(msg: MailMessage) -> Tuple[bool, Dict, List[Dict]]:
"""Parse NTFS Auditing "Audit" report e-mails.
Example subjects (decoded):
- "Bouter btr-dc002.bouter.nl file audits → 0 ↑ 0"
- "Bouter btr-dc001.bouter.nl file audits → 6 ↑ 12"
Notes:
- These mails do not include per-object rows that should be tracked.
- The job does not provide a meaningful overall status; we store it as Success.
- Missing runs are handled by the existing scheduler/missing logic (no mail received).
"""
subject_raw = (msg.subject or "").strip()
if not subject_raw:
return False, {}, []
# Basic sender sanity check (but keep it tolerant)
from_addr = (msg.from_address or "").lower()
if from_addr and "auditing@" not in from_addr and "file auditing" not in from_addr:
# Not our sender
return False, {}, []
subject = _normalize_subject(subject_raw)
# Find the host part between "Bouter" and "file audits".
m = re.search(r"\bBouter\b\s+(?P<host>[^\s]+)\s+file\s+audits\b", subject, flags=re.IGNORECASE)
if not m:
return False, {}, []
host = m.group("host").strip()
if not host:
return False, {}, []
job_name = f"{host} file audits"
result = {
"backup_software": "NTFS Auditing",
"backup_type": "Audit",
"job_name": job_name,
"overall_status": "Success",
"overall_message": None,
}
# There are no objects in this mail.
return True, result, []