# backupchecks/containers/backupchecks/src/backend/app/main/routes_run_checks.py
# (1612 lines, 58 KiB, Python)
from __future__ import annotations
import calendar
from datetime import date, datetime, time, timedelta, timezone
from flask import jsonify, render_template, request, url_for
from urllib.parse import urljoin
from flask_login import current_user, login_required
from sqlalchemy import and_, or_, func, text
from .routes_shared import (
_apply_overrides_to_run,
_format_datetime,
_get_ui_timezone,
_get_ui_timezone_name,
_get_or_create_settings,
_infer_schedule_map_from_runs,
_infer_monthly_schedule_from_runs,
_to_amsterdam_date,
main_bp,
roles_required,
get_active_role,
)
from ..database import db
from ..email_utils import extract_best_html_from_eml, is_effectively_blank_html
from ..models import (
Customer,
Job,
JobObject,
JobRun,
JobRunReviewEvent,
MailMessage,
MailObject,
Override,
Ticket,
TicketJobRun,
TicketScope,
User,
)
# Autotask ticket status ids treated as terminal by the PSA sync below.
# NOTE(review): 5 is presumably the Autotask "Complete" status id — confirm
# against the PSA instance configuration before extending this set.
AUTOTASK_TERMINAL_STATUS_IDS = {5}
def _ensure_internal_ticket_for_autotask(
    *,
    ticket_number: str,
    job: Job | None,
    run_ids: list[int],
    now: datetime,
) -> Ticket | None:
    """Best-effort: ensure an internal Ticket exists and is linked to the provided runs.

    Looks up (or creates) a Ticket by its normalized code, ensures a "job"
    TicketScope exists for *job*, and links each run id via TicketJobRun.

    Returns the Ticket, or None when *ticket_number* is blank.  Does not
    commit; callers own the session transaction.
    """
    code = (ticket_number or "").strip().upper()
    if not code:
        return None
    ticket = Ticket.query.filter(Ticket.ticket_code == code).first()
    if ticket is None:
        # Align with manual ticket creation: active_from_date is today (Amsterdam date).
        active_from = _to_amsterdam_date(now) or now.date()
        ticket = Ticket(
            ticket_code=code,
            description="",
            active_from_date=active_from,
            start_date=now,
        )
        db.session.add(ticket)
        # Flush so ticket.id is populated for the scope/run links below.
        db.session.flush()
    # Ensure job scope exists (for Daily Jobs / Job Details filtering), best-effort.
    if job is not None and getattr(job, "id", None):
        try:
            existing = TicketScope.query.filter_by(ticket_id=ticket.id, scope_type="job", job_id=job.id).first()
            if existing is None:
                db.session.add(
                    TicketScope(
                        ticket_id=ticket.id,
                        scope_type="job",
                        customer_id=job.customer_id,
                        backup_software=job.backup_software,
                        backup_type=job.backup_type,
                        job_id=job.id,
                        job_name_match=job.job_name,
                        job_name_match_mode="exact",
                    )
                )
        except Exception:
            # Best-effort: a missing scope only affects filtering, not ticket creation.
            pass
    # Ensure run links.
    for rid in run_ids or []:
        if rid <= 0:
            continue
        if not TicketJobRun.query.filter_by(ticket_id=ticket.id, job_run_id=rid).first():
            db.session.add(TicketJobRun(ticket_id=ticket.id, job_run_id=rid, link_source="autotask"))
    return ticket
def _resolve_internal_ticket_for_job(
    *,
    ticket: Ticket,
    job: Job | None,
    run_ids: list[int],
    now: datetime,
) -> None:
    """Resolve the ticket (and its job scope) as PSA-driven, best-effort.

    Marks the ticket and every still-open scope resolved at *now* with origin
    "psa", ensures a resolved job scope exists for *job*, and keeps audit
    links to the given runs.  Does not commit the session.
    """
    if ticket.resolved_at is None:
        ticket.resolved_at = now
    # Only set origin if not already recorded (e.g. manual resolution wins).
    if getattr(ticket, "resolved_origin", None) is None:
        ticket.resolved_origin = "psa"
    # Resolve all still-open scopes.
    try:
        TicketScope.query.filter_by(ticket_id=ticket.id, resolved_at=None).update({"resolved_at": now})
    except Exception:
        pass
    # Ensure job scope exists and is resolved.
    if job is not None and getattr(job, "id", None):
        try:
            scope = TicketScope.query.filter_by(ticket_id=ticket.id, scope_type="job", job_id=job.id).first()
            if scope is None:
                scope = TicketScope(
                    ticket_id=ticket.id,
                    scope_type="job",
                    customer_id=job.customer_id,
                    backup_software=job.backup_software,
                    backup_type=job.backup_type,
                    job_id=job.id,
                    job_name_match=job.job_name,
                    job_name_match_mode="exact",
                    resolved_at=now,
                )
                db.session.add(scope)
            else:
                if scope.resolved_at is None:
                    scope.resolved_at = now
        except Exception:
            pass
    # Keep audit links to runs.
    for rid in run_ids or []:
        if rid <= 0:
            continue
        if not TicketJobRun.query.filter_by(ticket_id=ticket.id, job_run_id=rid).first():
            db.session.add(TicketJobRun(ticket_id=ticket.id, job_run_id=rid, link_source="autotask"))
def _poll_autotask_ticket_states_for_runs(*, run_ids: list[int]) -> None:
    """Phase 2: Read-only PSA-driven ticket completion sync.

    For runs linked to an Autotask ticket: backfills the stored ticket number,
    ensures a matching internal Ticket exists, and resolves the internal
    Ticket when the PSA ticket has reached a terminal status.

    Best-effort: never blocks page load.
    """
    if not run_ids:
        return
    settings = _get_or_create_settings()
    if not bool(getattr(settings, "autotask_enabled", False)):
        return
    # Build ticket id -> run ids mapping.
    runs = JobRun.query.filter(JobRun.id.in_(run_ids)).all()
    ticket_to_runs: dict[int, list[JobRun]] = {}
    for r in runs:
        tid = getattr(r, "autotask_ticket_id", None)
        try:
            tid_int = int(tid) if tid is not None else 0
        except Exception:
            tid_int = 0
        if tid_int <= 0:
            continue
        ticket_to_runs.setdefault(tid_int, []).append(r)
    if not ticket_to_runs:
        return
    try:
        client = _build_autotask_client_from_settings()
    except Exception:
        # Incomplete/disabled Autotask settings: silently skip the sync.
        return
    now = datetime.utcnow()
    ticket_ids = sorted(ticket_to_runs.keys())
    # Optimization: query non-terminal tickets first; fallback to GET by id for missing.
    try:
        active_items = client.query_tickets_by_ids(ticket_ids, exclude_status_ids=sorted(AUTOTASK_TERMINAL_STATUS_IDS))
    except Exception:
        active_items = []
    active_map: dict[int, dict] = {}
    for it in active_items or []:
        try:
            iid = int(it.get("id") or 0)
        except Exception:
            iid = 0
        if iid > 0:
            active_map[iid] = it
    missing_ids = [tid for tid in ticket_ids if tid not in active_map]
    # Process active tickets: backfill ticket numbers + ensure internal ticket link.
    try:
        for tid, item in active_map.items():
            runs_for_ticket = ticket_to_runs.get(tid) or []
            ticket_number = None
            if isinstance(item, dict):
                # PSA payload key varies by API version; try known spellings.
                ticket_number = item.get("ticketNumber") or item.get("number") or item.get("ticket_number")
            # Backfill missing stored ticket number.
            if ticket_number:
                for rr in runs_for_ticket:
                    if not (getattr(rr, "autotask_ticket_number", None) or "").strip():
                        rr.autotask_ticket_number = str(ticket_number).strip()
                        db.session.add(rr)
            # Ensure internal ticket exists and is linked.
            tn = (str(ticket_number).strip() if ticket_number else "")
            if not tn:
                # Try from DB
                for rr in runs_for_ticket:
                    if (getattr(rr, "autotask_ticket_number", None) or "").strip():
                        tn = rr.autotask_ticket_number.strip()
                        break
            job = Job.query.get(runs_for_ticket[0].job_id) if runs_for_ticket else None
            _ensure_internal_ticket_for_autotask(
                ticket_number=tn,
                job=job,
                run_ids=[int(x.id) for x in runs_for_ticket if getattr(x, "id", None)],
                now=now,
            )
    except Exception:
        # Continue to missing-id fallback.
        pass
    # Fallback for missing ids (could be terminal, deleted, or query omission).
    for tid in missing_ids:
        try:
            t = client.get_ticket(tid)
        except Exception:
            continue
        status_id = None
        if isinstance(t, dict):
            status_id = t.get("status") or t.get("statusId") or t.get("statusID")
        try:
            status_int = int(status_id) if status_id is not None else 0
        except Exception:
            status_int = 0
        ticket_number = None
        if isinstance(t, dict):
            ticket_number = t.get("ticketNumber") or t.get("number") or t.get("ticket_number")
        runs_for_ticket = ticket_to_runs.get(tid) or []
        # Backfill stored ticket number if missing.
        if ticket_number:
            for rr in runs_for_ticket:
                if not (getattr(rr, "autotask_ticket_number", None) or "").strip():
                    rr.autotask_ticket_number = str(ticket_number).strip()
                    db.session.add(rr)
        job = Job.query.get(runs_for_ticket[0].job_id) if runs_for_ticket else None
        tn = (str(ticket_number).strip() if ticket_number else "")
        if not tn:
            for rr in runs_for_ticket:
                if (getattr(rr, "autotask_ticket_number", None) or "").strip():
                    tn = rr.autotask_ticket_number.strip()
                    break
        internal_ticket = _ensure_internal_ticket_for_autotask(
            ticket_number=tn,
            job=job,
            run_ids=[int(x.id) for x in runs_for_ticket if getattr(x, "id", None)],
            now=now,
        )
        # If terminal in PSA: resolve internally.
        if internal_ticket is not None and status_int in AUTOTASK_TERMINAL_STATUS_IDS:
            _resolve_internal_ticket_for_job(
                ticket=internal_ticket,
                job=job,
                run_ids=[int(x.id) for x in runs_for_ticket if getattr(x, "id", None)],
                now=now,
            )
    # Single commit for all backfills/links; roll back on any failure.
    try:
        db.session.commit()
    except Exception:
        db.session.rollback()
def _build_autotask_client_from_settings():
    """Build an AutotaskClient from stored settings or raise a user-safe exception.

    Raises RuntimeError (with a message safe to surface to users) when the
    integration is disabled or any required credential field is blank.
    """
    cfg = _get_or_create_settings()
    if not getattr(cfg, "autotask_enabled", False):
        raise RuntimeError("Autotask integration is disabled.")
    credential_fields = (
        getattr(cfg, "autotask_environment", None),
        getattr(cfg, "autotask_api_username", None),
        getattr(cfg, "autotask_api_password", None),
        getattr(cfg, "autotask_tracking_identifier", None),
    )
    if not all(value and str(value).strip() for value in credential_fields):
        raise RuntimeError("Autotask settings incomplete.")
    # Imported lazily so the integration package is only loaded when actually used.
    from ..integrations.autotask.client import AutotaskClient

    return AutotaskClient(
        username=cfg.autotask_api_username,
        password=cfg.autotask_api_password,
        api_integration_code=cfg.autotask_tracking_identifier,
        environment=cfg.autotask_environment,
    )
def _determine_autotask_severity(status_text: str | None) -> str:
s = (status_text or "").strip().lower()
if "warning" in s:
return "warning"
if "error" in s or "fail" in s:
return "error"
if "missed" in s:
return "error"
return "warning"
def _compose_autotask_ticket_description(
    *,
    settings,
    job: Job,
    run: JobRun,
    status_display: str,
    overall_message: str,
    objects_payload: list[dict[str, str]],
) -> str:
    """Compose the plain-text description for a new Autotask ticket.

    Produces customer/job/run header lines, then either the mail's overall
    summary or (as a fallback) up to 10 object-level messages, and finally a
    deep link back to Job Details built from settings.autotask_base_url.
    """
    tz_name = _get_ui_timezone_name() or "Europe/Amsterdam"
    run_dt = run.run_at
    run_at_str = _format_datetime(run_dt) if run_dt else "-"
    base_url = (getattr(settings, "autotask_base_url", None) or "").strip()
    job_rel = url_for("main.job_detail", job_id=job.id)
    # Link to Job Details with a hint for the specific run.
    job_link = urljoin(base_url.rstrip("/") + "/", job_rel.lstrip("/"))
    if run.id:
        job_link = f"{job_link}?run_id={int(run.id)}"
    lines: list[str] = []
    lines.append(f"Customer: {job.customer.name if job.customer else ''}")
    lines.append(f"Job: {job.job_name or ''}")
    lines.append(f"Backup: {job.backup_software or ''} / {job.backup_type or ''}")
    lines.append(f"Run at ({tz_name}): {run_at_str}")
    lines.append(f"Status: {status_display or ''}")
    lines.append("")
    overall_message = (overall_message or "").strip()
    if overall_message:
        lines.append("Summary:")
        lines.append(overall_message)
        lines.append("")
        lines.append("Multiple objects reported messages. See Backupchecks for full details.")
    else:
        # Fallback to object-level messages with a hard limit.
        limit = 10
        shown = 0
        total = 0
        for o in objects_payload or []:
            name = (o.get("name") or "").strip()
            err = (o.get("error_message") or "").strip()
            st = (o.get("status") or "").strip()
            if not name:
                continue
            if not err and not st:
                continue
            total += 1
            # Keep counting past the limit so the "additional objects" line is accurate.
            if shown >= limit:
                continue
            msg = err or st
            lines.append(f"- {name}: {msg}")
            shown += 1
        if total == 0:
            lines.append("No detailed object messages available. See Backupchecks for full details.")
        elif total > shown:
            lines.append(f"And {int(total - shown)} additional objects reported similar messages.")
    lines.append("")
    lines.append(f"Backupchecks details: {job_link}")
    return "\n".join(lines).strip() + "\n"
# Grace window for matching real runs to an expected schedule slot.
# A run within +/- 1 hour of the inferred schedule time counts as fulfilling the slot.
# Also used as the delay before a past slot is marked "Missed" (see
# _ensure_missed_runs_for_job): a slot is only considered missed once
# now > slot + MISSED_GRACE_WINDOW.
MISSED_GRACE_WINDOW = timedelta(hours=1)
def _status_is_success(status: str | None) -> bool:
s = (status or "").strip().lower()
if not s:
return False
if "override" in s:
return True
return "success" in s
def _utc_naive_from_local(dt_local: datetime) -> datetime:
"""Convert a timezone-aware local datetime to UTC naive, matching DB convention."""
if dt_local.tzinfo is None:
return dt_local
return dt_local.astimezone(timezone.utc).replace(tzinfo=None)
def _local_from_utc_naive(dt_utc_naive: datetime) -> datetime:
    """Convert a UTC-naive datetime to the configured UI timezone, best-effort.

    Falls back to the unconverted input when no UI timezone is configured or
    the conversion fails for any reason.
    """
    ui_tz = _get_ui_timezone()
    if not ui_tz:
        return dt_utc_naive
    try:
        aware = dt_utc_naive
        if aware.tzinfo is None:
            aware = aware.replace(tzinfo=timezone.utc)
        return aware.astimezone(ui_tz)
    except Exception:
        return dt_utc_naive
def _parse_hhmm(hhmm: str) -> tuple[int, int] | None:
try:
parts = (hhmm or "").strip().split(":")
if len(parts) < 2:
return None
hh = int(parts[0])
mm = int(parts[1])
if hh < 0 or hh > 23 or mm < 0 or mm > 59:
return None
return hh, mm
except Exception:
return None
def _get_default_missed_start_date() -> date:
    """Return the earliest date for which missed runs may be generated.

    Prefers the configured Daily Jobs start date; otherwise falls back to 90
    days before today (Amsterdam date) as a safety guard against generating
    missed runs for unbounded history.
    """
    cfg = _get_or_create_settings()
    configured = getattr(cfg, "daily_jobs_start_date", None)
    if configured:
        return cfg.daily_jobs_start_date
    today = _to_amsterdam_date(datetime.utcnow()) or datetime.utcnow().date()
    return today - timedelta(days=90)
def _ensure_missed_runs_for_job(job: Job, start_from: date, end_inclusive: date) -> int:
    """Generate missed JobRun rows for scheduled slots without a run, so Run Checks can review them.

    Uses the weekly schedule inferred from historical runs; falls back to an
    inferred monthly schedule when no weekly times exist.  Previously
    generated, still-unreviewed missed runs in the window are deleted and
    regenerated from the current inferred schedule.

    Returns number of inserted missed runs.
    """
    tz = _get_ui_timezone()
    schedule_map = _infer_schedule_map_from_runs(job.id) or {}
    # Weekday keys are 0..6 (Monday..Sunday, matching date.weekday()).
    has_weekly_times = any((schedule_map.get(i) or []) for i in range(7))
    monthly = None
    if not has_weekly_times:
        monthly = _infer_monthly_schedule_from_runs(job.id)
    if (not has_weekly_times) and (not monthly):
        # Nothing learned yet: cannot know what "missed" means for this job.
        return 0
    today_local = _to_amsterdam_date(datetime.utcnow()) or datetime.utcnow().date()
    if end_inclusive > today_local:
        # Never generate missed runs for the future.
        end_inclusive = today_local
    now_local_dt = datetime.now(tz) if tz else datetime.utcnow()
    now_utc_naive = _utc_naive_from_local(now_local_dt)
    # Remove any previously generated missed runs in this date window.
    # Missed runs must be based on learned schedule from real mail-reported runs.
    try:
        start_local = datetime.combine(start_from, time.min)
        end_local_excl = datetime.combine(end_inclusive + timedelta(days=1), time.min)
        if tz:
            start_local = start_local.replace(tzinfo=tz)
            end_local_excl = end_local_excl.replace(tzinfo=tz)
        start_utc_naive = _utc_naive_from_local(start_local)
        end_utc_naive_excl = _utc_naive_from_local(end_local_excl)
        # Only delete generated (mail-less), still-unreviewed missed runs.
        db.session.query(JobRun).filter(
            JobRun.job_id == job.id,
            JobRun.missed.is_(True),
            JobRun.mail_message_id.is_(None),
            JobRun.reviewed_at.is_(None),
            JobRun.run_at.isnot(None),
            JobRun.run_at >= start_utc_naive,
            JobRun.run_at < end_utc_naive_excl,
        ).delete(synchronize_session=False)
        db.session.commit()
    except Exception:
        db.session.rollback()
    inserted = 0
    # Weekly expected slots.
    d = start_from
    while d <= end_inclusive:
        if not has_weekly_times:
            break
        weekday = d.weekday()
        times = schedule_map.get(weekday) or []
        if not times:
            d = d + timedelta(days=1)
            continue
        for hhmm in times:
            hm = _parse_hhmm(hhmm)
            if not hm:
                continue
            hh, mm = hm
            local_dt = datetime.combine(d, time(hour=hh, minute=mm))
            if tz:
                local_dt = local_dt.replace(tzinfo=tz)
            # Only generate missed runs for past slots.
            if local_dt > now_local_dt:
                continue
            slot_utc_naive = _utc_naive_from_local(local_dt)
            # Do not mark as missed until the full grace window has passed.
            if now_utc_naive <= (slot_utc_naive + MISSED_GRACE_WINDOW):
                continue
            # Consider any real run near the slot as fulfilling the schedule.
            # Also avoid duplicates if a missed run already exists.
            window_start = slot_utc_naive - MISSED_GRACE_WINDOW
            window_end = slot_utc_naive + MISSED_GRACE_WINDOW
            exists = (
                db.session.query(JobRun.id)
                .filter(
                    JobRun.job_id == job.id,
                    JobRun.run_at.isnot(None),
                    or_(
                        and_(JobRun.missed.is_(False), JobRun.mail_message_id.isnot(None)),
                        and_(JobRun.missed.is_(True), JobRun.mail_message_id.is_(None)),
                    ),
                    JobRun.run_at >= window_start,
                    JobRun.run_at <= window_end,
                )
                .first()
            )
            if exists:
                continue
            miss = JobRun(
                job_id=job.id,
                run_at=slot_utc_naive,
                status="Missed",
                missed=True,
                remark=None,
                mail_message_id=None,
            )
            db.session.add(miss)
            inserted += 1
        d = d + timedelta(days=1)
    # Monthly expected slots (fallback when no stable weekly schedule is detected)
    if (not has_weekly_times) and monthly:
        try:
            dom = int(monthly.get("day_of_month") or 0)
        except Exception:
            dom = 0
        times = monthly.get("times") or []
        if dom > 0 and times:
            # Iterate months in the window [start_from, end_inclusive]
            cur = date(start_from.year, start_from.month, 1)
            end_marker = date(end_inclusive.year, end_inclusive.month, 1)
            while cur <= end_marker:
                try:
                    last_dom = calendar.monthrange(cur.year, cur.month)[1]
                except Exception:
                    last_dom = 28
                # Clamp e.g. day 31 to the month's last day.
                scheduled_dom = dom if dom <= last_dom else last_dom
                scheduled_date = date(cur.year, cur.month, scheduled_dom)
                if scheduled_date >= start_from and scheduled_date <= end_inclusive:
                    for hhmm in times:
                        hm = _parse_hhmm(hhmm)
                        if not hm:
                            continue
                        hh, mm = hm
                        local_dt = datetime.combine(scheduled_date, time(hour=hh, minute=mm))
                        if tz:
                            local_dt = local_dt.replace(tzinfo=tz)
                        # Only generate missed runs for past slots.
                        if local_dt > now_local_dt:
                            continue
                        slot_utc_naive = _utc_naive_from_local(local_dt)
                        # Do not mark as missed until the full grace window has passed.
                        if now_utc_naive <= (slot_utc_naive + MISSED_GRACE_WINDOW):
                            continue
                        window_start = slot_utc_naive - MISSED_GRACE_WINDOW
                        window_end = slot_utc_naive + MISSED_GRACE_WINDOW
                        exists = (
                            db.session.query(JobRun.id)
                            .filter(
                                JobRun.job_id == job.id,
                                JobRun.run_at.isnot(None),
                                or_(
                                    and_(JobRun.missed.is_(False), JobRun.mail_message_id.isnot(None)),
                                    and_(JobRun.missed.is_(True), JobRun.mail_message_id.is_(None)),
                                ),
                                JobRun.run_at >= window_start,
                                JobRun.run_at <= window_end,
                            )
                            .first()
                        )
                        if exists:
                            continue
                        miss = JobRun(
                            job_id=job.id,
                            run_at=slot_utc_naive,
                            status="Missed",
                            missed=True,
                            remark=None,
                            mail_message_id=None,
                        )
                        db.session.add(miss)
                        inserted += 1
                # Next month
                if cur.month == 12:
                    cur = date(cur.year + 1, 1, 1)
                else:
                    cur = date(cur.year, cur.month + 1, 1)
    if inserted:
        db.session.commit()
    return inserted
@main_bp.route("/run-checks")
@login_required
@roles_required("admin", "operator")
def run_checks_page():
    """Run Checks page: list jobs that have runs to review (including generated missed runs).

    Side effects (both best-effort): generates missed runs per job since its
    last review, and syncs internal ticket resolution state from the PSA.
    """
    include_reviewed = False
    if get_active_role() == "admin":
        # Only admins may toggle reviewed runs into the overview.
        include_reviewed = request.args.get("include_reviewed", "0") in ("1", "true", "yes", "on")
    # Generate missed runs since the last review per job so they show up in Run Checks.
    # This is intentionally best-effort; any errors should not block page load.
    try:
        settings_start = _get_default_missed_start_date()
        last_reviewed_rows = (
            db.session.query(JobRun.job_id, func.max(JobRun.reviewed_at))
            .group_by(JobRun.job_id)
            .all()
        )
        last_reviewed_map = {int(jid): (dt if dt else None) for jid, dt in last_reviewed_rows}
        jobs = Job.query.filter(Job.archived.is_(False)).all()
        today_local = _to_amsterdam_date(datetime.utcnow()) or datetime.utcnow().date()
        for job in jobs:
            last_rev = last_reviewed_map.get(int(job.id))
            if last_rev:
                start_date = _to_amsterdam_date(last_rev) or settings_start
            else:
                start_date = settings_start
            if start_date and start_date > today_local:
                continue
            _ensure_missed_runs_for_job(job, start_date, today_local)
    except Exception:
        # Don't block the page if missed-run generation fails.
        pass
    # Phase 2 (read-only PSA driven): sync internal ticket resolved state based on PSA ticket status.
    # Best-effort: never blocks page load.
    try:
        run_q = JobRun.query.filter(JobRun.reviewed_at.is_(None), JobRun.autotask_ticket_id.isnot(None))
        # Cap at 800 run ids to bound PSA round-trips per page load.
        run_ids = [int(x) for (x,) in run_q.with_entities(JobRun.id).limit(800).all()]
        _poll_autotask_ticket_states_for_runs(run_ids=run_ids)
    except Exception:
        pass
    # Aggregated per-job rows
    base = (
        db.session.query(
            Job.id.label("job_id"),
            Job.job_name.label("job_name"),
            Job.backup_software.label("backup_software"),
            Job.backup_type.label("backup_type"),
            Customer.name.label("customer_name"),
        )
        .select_from(Job)
        .outerjoin(Customer, Customer.id == Job.customer_id)
        .filter(Job.archived.is_(False))
    )
    # Runs to show in the overview: unreviewed (or all if admin toggle enabled)
    run_filter = []
    if not include_reviewed:
        run_filter.append(JobRun.reviewed_at.is_(None))
    # Last review per job must be derived from reviewed runs (independent of the overview run filter).
    # The overview typically shows only unreviewed runs, so using the same filter would always yield NULL.
    last_reviewed_ts = (
        db.session.query(
            JobRun.job_id.label("job_id"),
            func.max(JobRun.reviewed_at).label("last_reviewed_at"),
        )
        .filter(JobRun.reviewed_at.isnot(None))
        .group_by(JobRun.job_id)
        .subquery()
    )
    # Pick one concrete run per job at that max reviewed_at (ties: highest id).
    last_reviewed_pick = (
        db.session.query(
            JobRun.job_id.label("job_id"),
            func.max(JobRun.id).label("run_id"),
        )
        .join(
            last_reviewed_ts,
            (JobRun.job_id == last_reviewed_ts.c.job_id)
            & (JobRun.reviewed_at == last_reviewed_ts.c.last_reviewed_at),
        )
        .group_by(JobRun.job_id)
        .subquery()
    )
    last_reviewed = (
        db.session.query(
            JobRun.job_id.label("job_id"),
            JobRun.reviewed_at.label("last_reviewed_at"),
            JobRun.reviewed_by_user_id.label("last_reviewed_by_user_id"),
        )
        .join(last_reviewed_pick, JobRun.id == last_reviewed_pick.c.run_id)
        .subquery()
    )
    agg = (
        db.session.query(
            JobRun.job_id.label("job_id"),
            func.count(JobRun.id).label("run_count"),
            func.max(func.coalesce(JobRun.run_at, JobRun.created_at)).label("last_run_ts"),
        )
        .group_by(JobRun.job_id)
    )
    if run_filter:
        agg = agg.filter(*run_filter)
    agg = agg.subquery()
    # Inner join on agg: only jobs that actually have runs to show.
    q = (
        base.join(agg, agg.c.job_id == Job.id)
        .outerjoin(last_reviewed, last_reviewed.c.job_id == Job.id)
        .add_columns(
            last_reviewed.c.last_reviewed_at.label("last_reviewed_at"),
            last_reviewed.c.last_reviewed_by_user_id.label("last_reviewed_by_user_id"),
        )
    )
    # Sort for operational review: Customer > Backup > Type > Job
    q = q.order_by(
        Customer.name.asc().nullslast(),
        Job.backup_software.asc().nullslast(),
        Job.backup_type.asc().nullslast(),
        Job.job_name.asc().nullslast(),
        Job.id.asc(),
    )
    rows = q.limit(2000).all()
    # Ensure override flags are up-to-date for the runs shown in this overview.
    # The Run Checks modal computes override status on-the-fly, but the overview
    # aggregates by persisted JobRun.override_applied. Keep those flags aligned
    # so jobs with overridden runs do not stay orange (Warning).
    try:
        from .routes_shared import _recompute_override_flags_for_runs
        job_ids_for_recompute = [int(r.job_id) for r in rows]
        if job_ids_for_recompute:
            _recompute_override_flags_for_runs(job_ids=job_ids_for_recompute, only_unreviewed=True)
    except Exception:
        pass
    # Per-job status indicators for the overview table (counts per status).
    job_ids = [int(r.job_id) for r in rows]
    status_map: dict[int, dict[str, int]] = {}
    if job_ids:
        s_q = (
            db.session.query(
                JobRun.job_id.label("job_id"),
                JobRun.status.label("status"),
                JobRun.missed.label("missed"),
                JobRun.override_applied.label("override_applied"),
                func.count(JobRun.id).label("cnt"),
            )
            .filter(JobRun.job_id.in_(job_ids))
        )
        if run_filter:
            s_q = s_q.filter(*run_filter)
        s_q = s_q.group_by(JobRun.job_id, JobRun.status, JobRun.missed, JobRun.override_applied)
        for jid, status, missed, override_applied, cnt in s_q.all():
            job_id = int(jid)
            label = (status or "").strip() or "Unknown"
            # Missed flag wins over the stored status text; override next.
            if bool(missed) or (label.lower() == "missed"):
                label = "Missed"
            elif bool(override_applied):
                # Keep the label consistent with other pages.
                label = "Success (override)"
            status_map.setdefault(job_id, {})
            status_map[job_id][label] = status_map[job_id].get(label, 0) + int(cnt or 0)
    # Map reviewed-by usernames for admins
    reviewed_by_map: dict[int, str] = {}
    if get_active_role() == "admin":
        user_ids = sorted({int(r.last_reviewed_by_user_id) for r in rows if getattr(r, "last_reviewed_by_user_id", None)})
        if user_ids:
            users = User.query.filter(User.id.in_(user_ids)).all()
            reviewed_by_map = {u.id: u.username for u in users}
    # Ticket/Remark indicators (active today) for faster reviewing.
    today_local = _to_amsterdam_date(datetime.utcnow()) or datetime.utcnow().date()
    payload = []
    for r in rows:
        job_id = int(r.job_id)
        # Status indicators for the overview (counts per status).
        indicators: list[dict[str, object]] = []
        counts = status_map.get(job_id, {})
        if counts:
            def _dot_for(label: str) -> str:
                # Map a status label to its CSS dot class ("" = no dot).
                s = (label or "").strip().lower()
                if s == "success":
                    return "dot-success"
                if s == "warning":
                    return "dot-warning"
                if s in ("failed", "error"):
                    return "dot-failed"
                if s == "missed":
                    return "dot-missed"
                if s == "expected":
                    return "dot-expected"
                if "override" in s:
                    return "dot-override"
                return ""
            # Keep a stable order (actionable first).
            preferred = [
                "Failed",
                "Error",
                "Warning",
                "Missed",
                "Success",
                "Expected",
                "Success (override)",
            ]
            seen = set()
            for k in preferred:
                if k in counts:
                    indicators.append({"status": k, "count": int(counts.get(k, 0) or 0), "dot": _dot_for(k)})
                    seen.add(k)
            for k in sorted(counts.keys()):
                if k not in seen:
                    indicators.append({"status": k, "count": int(counts.get(k, 0) or 0), "dot": _dot_for(k)})
        has_active_ticket = False
        has_active_remark = False
        ui_tz = _get_ui_timezone_name()
        try:
            # A ticket counts as "active today" when any job scope is unresolved,
            # or was resolved today-or-later in the UI timezone.
            t_exists = db.session.execute(
                text(
                    """
                    SELECT 1
                    FROM tickets t
                    JOIN ticket_scopes ts ON ts.ticket_id = t.id
                    WHERE ts.job_id = :job_id
                      AND t.active_from_date <= :run_date
                      AND (
                        COALESCE(ts.resolved_at, t.resolved_at) IS NULL
                        OR ((COALESCE(ts.resolved_at, t.resolved_at) AT TIME ZONE 'UTC' AT TIME ZONE :ui_tz)::date) >= :run_date
                      )
                    LIMIT 1
                    """
                ),
                {"job_id": job_id, "run_date": today_local, "ui_tz": ui_tz},
            ).first()
            has_active_ticket = bool(t_exists)
            r_exists = db.session.execute(
                text(
                    """
                    SELECT 1
                    FROM remarks r
                    JOIN remark_scopes rs ON rs.remark_id = r.id
                    WHERE rs.job_id = :job_id
                      AND COALESCE(
                        r.active_from_date,
                        ((r.start_date AT TIME ZONE 'UTC' AT TIME ZONE :ui_tz)::date)
                      ) <= :run_date
                      AND (
                        r.resolved_at IS NULL
                        OR ((r.resolved_at AT TIME ZONE 'UTC' AT TIME ZONE :ui_tz)::date) >= :run_date
                      )
                    LIMIT 1
                    """
                ),
                {"job_id": job_id, "run_date": today_local, "ui_tz": ui_tz},
            ).first()
            has_active_remark = bool(r_exists)
        except Exception:
            has_active_ticket = False
            has_active_remark = False
        last_run_ts = getattr(r, "last_run_ts", None)
        last_run_at = _format_datetime(last_run_ts) if last_run_ts else ""
        last_reviewed_at = getattr(r, "last_reviewed_at", None)
        last_reviewed_by = reviewed_by_map.get(getattr(r, "last_reviewed_by_user_id", None), "")
        payload.append(
            {
                "job_id": job_id,
                "customer_name": r.customer_name or "-",
                "job_name": r.job_name or "-",
                "backup_software": r.backup_software or "-",
                "backup_type": r.backup_type or "-",
                "run_count": int(getattr(r, "run_count", 0) or 0),
                "last_run_at": last_run_at or "-",
                "status_counts": status_map.get(job_id, {}),
                "status_indicators": indicators,
                "has_active_ticket": bool(has_active_ticket),
                "has_active_remark": bool(has_active_remark),
                "last_reviewed_at": _format_datetime(last_reviewed_at) if (get_active_role() == "admin" and last_reviewed_at) else "",
                "last_reviewed_by": last_reviewed_by if get_active_role() == "admin" else "",
            }
        )
    settings = _get_or_create_settings()
    autotask_enabled = bool(getattr(settings, "autotask_enabled", False))
    return render_template(
        "main/run_checks.html",
        rows=payload,
        is_admin=(get_active_role() == "admin"),
        include_reviewed=include_reviewed,
        autotask_enabled=autotask_enabled,
    )
@main_bp.route("/api/run-checks/details")
@login_required
@roles_required("admin", "operator")
def run_checks_details():
    """Return runs for a job for the Run Checks modal.

    Query params: job_id (required, int), include_reviewed (admin only).
    Response: JSON with job header info and up to 400 runs, newest first,
    each including mail body HTML, object-level results, and Autotask
    ticket resolution state.
    """
    try:
        job_id = int(request.args.get("job_id", "0"))
    except Exception:
        job_id = 0
    if job_id <= 0:
        return jsonify({"status": "error", "message": "Invalid parameters."}), 400
    include_reviewed = False
    if get_active_role() == "admin":
        include_reviewed = request.args.get("include_reviewed", "0") in ("1", "true", "yes", "on")
    job = Job.query.get_or_404(job_id)
    q = JobRun.query.filter(JobRun.job_id == job.id)
    if not include_reviewed:
        q = q.filter(JobRun.reviewed_at.is_(None))
    runs = q.order_by(func.coalesce(JobRun.run_at, JobRun.created_at).desc(), JobRun.id.desc()).limit(400).all()
    # Prefetch internal ticket resolution info for Autotask-linked runs (Phase 2 UI).
    autotask_codes = set()
    for _r in runs:
        code = (getattr(_r, "autotask_ticket_number", None) or "").strip()
        if code:
            autotask_codes.add(code)
    ticket_by_code = {}
    if autotask_codes:
        try:
            for _t in Ticket.query.filter(Ticket.ticket_code.in_(list(autotask_codes))).all():
                ticket_by_code[_t.ticket_code] = _t
        except Exception:
            ticket_by_code = {}
    runs_payload = []
    for run in runs:
        msg = MailMessage.query.get(run.mail_message_id) if run.mail_message_id else None
        mail_meta = None
        has_eml = False
        body_html = ""
        if msg:
            mail_meta = {
                "from_address": msg.from_address or "",
                "subject": msg.subject or "",
                "received_at": _format_datetime(msg.received_at),
            }
            def _is_blank_text(s):
                # None or whitespace-only strings count as blank.
                return s is None or (isinstance(s, str) and s.strip() == "")
            html_body = getattr(msg, "html_body", None)
            text_body = getattr(msg, "text_body", None)
            # Keep Run Checks consistent with Inbox/All Mail: if the Graph body is empty but the
            # real report is stored as an HTML attachment inside the EML, extract it.
            if is_effectively_blank_html(html_body) and _is_blank_text(text_body) and getattr(msg, "eml_blob", None):
                extracted = extract_best_html_from_eml(getattr(msg, "eml_blob", None))
                if extracted:
                    html_body = extracted
            if not is_effectively_blank_html(html_body):
                body_html = html_body
            elif not _is_blank_text(text_body):
                # Plain-text fallback: minimal HTML-escape, rendered in <pre>.
                escaped = (
                    text_body.replace("&", "&amp;")
                    .replace("<", "&lt;")
                    .replace(">", "&gt;")
                )
                body_html = f"<pre>{escaped}</pre>"
            else:
                body_html = "<p>No message content stored.</p>"
            has_eml = bool(getattr(msg, "eml_stored_at", None))
        objects_payload = []
        # Preferred: read persisted objects for this run from run_object_links/customer_objects (Step 2).
        try:
            rows = (
                db.session.execute(
                    text(
                        """
                        SELECT
                            co.object_name AS name,
                            rol.status AS status,
                            rol.error_message AS error_message
                        FROM run_object_links rol
                        JOIN customer_objects co ON co.id = rol.customer_object_id
                        WHERE rol.run_id = :run_id
                        ORDER BY co.object_name ASC
                        """
                    ),
                    {"run_id": run.id},
                )
                .mappings()
                .all()
            )
            for rr in rows:
                objects_payload.append(
                    {
                        "name": rr.get("name") or "",
                        "type": "",
                        "status": rr.get("status") or "",
                        "error_message": rr.get("error_message") or "",
                    }
                )
        except Exception:
            # Fallback for older data / during upgrades
            try:
                objects = run.objects.order_by(JobObject.object_name.asc()).all()
            except Exception:
                objects = list(run.objects or [])
            for obj in objects:
                objects_payload.append(
                    {
                        "name": obj.object_name,
                        "type": getattr(obj, "object_type", "") or "",
                        "status": obj.status or "",
                        "error_message": obj.error_message or "",
                    }
                )
        # If no run-linked objects exist yet, fall back to objects parsed/stored on the mail message.
        if (not objects_payload) and msg:
            try:
                for mo in (
                    MailObject.query.filter_by(mail_message_id=msg.id)
                    .order_by(MailObject.object_name.asc())
                    .all()
                ):
                    objects_payload.append(
                        {
                            "name": mo.object_name or "",
                            "type": mo.object_type or "",
                            "status": mo.status or "",
                            "error_message": mo.error_message or "",
                        }
                    )
            except Exception:
                pass
        # Autotask ticket resolution info (derived from internal Ticket)
        at_resolved = False
        at_resolved_origin = ""
        at_resolved_at = ""
        try:
            _code = (getattr(run, "autotask_ticket_number", None) or "").strip()
            if _code and _code in ticket_by_code:
                _t = ticket_by_code[_code]
                at_resolved = getattr(_t, "resolved_at", None) is not None
                at_resolved_origin = (getattr(_t, "resolved_origin", None) or "")
                at_resolved_at = _format_datetime(getattr(_t, "resolved_at", None)) if getattr(_t, "resolved_at", None) else ""
        except Exception:
            pass
        # Display status may be rewritten by overrides; fall back to the raw status.
        status_display = run.status or "-"
        try:
            status_display, _, _, _ov_id, _ov_reason = _apply_overrides_to_run(job, run)
        except Exception:
            status_display = run.status or "-"
        runs_payload.append(
            {
                "id": run.id,
                "run_at": _format_datetime(run.run_at) if run.run_at else "-",
                "status": status_display,
                "remark": run.remark or "",
                "overall_message": (getattr(msg, "overall_message", None) or "") if msg else "",
                "missed": bool(run.missed),
                "is_reviewed": bool(run.reviewed_at),
                "reviewed_at": _format_datetime(run.reviewed_at) if (get_active_role() == "admin" and run.reviewed_at) else "",
                "mail_message_id": run.mail_message_id,
                "has_eml": bool(has_eml),
                "mail": mail_meta,
                "body_html": body_html,
                "objects": objects_payload,
                "autotask_ticket_id": getattr(run, "autotask_ticket_id", None),
                "autotask_ticket_number": getattr(run, "autotask_ticket_number", None) or "",
                "autotask_ticket_is_resolved": bool(at_resolved),
                "autotask_ticket_resolved_origin": at_resolved_origin,
                "autotask_ticket_resolved_at": at_resolved_at,
            }
        )
    job_payload = {
        "id": job.id,
        "customer_name": job.customer.name if job.customer else "",
        "backup_software": job.backup_software or "",
        "backup_type": job.backup_type or "",
        "job_name": job.job_name or "",
    }
    if not runs_payload:
        return jsonify({"status": "ok", "job": job_payload, "runs": [], "message": "No runs found."})
    return jsonify({"status": "ok", "job": job_payload, "runs": runs_payload})
@main_bp.post("/api/run-checks/autotask-ticket")
@login_required
@roles_required("admin", "operator")
def api_run_checks_create_autotask_ticket():
    """Create an Autotask ticket for a specific run.

    Enforces: exactly one ticket per run. Exception: when the currently
    linked ticket is already resolved (tracked on the internal Ticket row),
    a new ticket may be created and the run's linkage is overwritten.

    Expects JSON ``{"run_id": <int>}``.

    Returns 200 with ``ticket_id``/``ticket_number`` and an
    ``already_exists`` flag; 400/404 on validation or mapping problems or
    when the Autotask API call fails; 500 when the ticket reference cannot
    be persisted.
    """
    data = request.get_json(silent=True) or {}
    # Defensive coercion: any malformed run_id is treated as "missing" (0).
    try:
        run_id = int(data.get("run_id") or 0)
    except Exception:
        run_id = 0
    if run_id <= 0:
        return jsonify({"status": "error", "message": "Invalid parameters."}), 400
    run = JobRun.query.get(run_id)
    if not run:
        return jsonify({"status": "error", "message": "Run not found."}), 404
    # If a ticket is already linked we normally prevent duplicate creation.
    # Exception: if the linked ticket is resolved (e.g. resolved by PSA), allow creating a new ticket.
    if getattr(run, "autotask_ticket_id", None):
        already_resolved = False
        try:
            # Resolution state lives on the internal Ticket matched by ticket code.
            code = (getattr(run, "autotask_ticket_number", None) or "").strip()
            if code:
                t = Ticket.query.filter_by(ticket_code=code).first()
                already_resolved = bool(getattr(t, "resolved_at", None)) if t else False
        except Exception:
            already_resolved = False
        if not already_resolved:
            # Idempotent response: report the existing (unresolved) ticket instead of erroring.
            return jsonify(
                {
                    "status": "ok",
                    "ticket_id": int(run.autotask_ticket_id),
                    "ticket_number": getattr(run, "autotask_ticket_number", None) or "",
                    "already_exists": True,
                }
            )
        # resolved -> continue, create a new Autotask ticket and overwrite current linkage.
    job = Job.query.get(run.job_id)
    if not job:
        return jsonify({"status": "error", "message": "Job not found."}), 404
    customer = Customer.query.get(job.customer_id) if getattr(job, "customer_id", None) else None
    if not customer:
        return jsonify({"status": "error", "message": "Customer not found."}), 404
    # The customer must be mapped to an Autotask company with a valid mapping status.
    if not getattr(customer, "autotask_company_id", None):
        return jsonify({"status": "error", "message": "Customer has no Autotask company mapping."}), 400
    if (getattr(customer, "autotask_mapping_status", None) or "").strip().lower() not in ("ok", "renamed"):
        return jsonify({"status": "error", "message": "Autotask company mapping is not valid."}), 400
    settings = _get_or_create_settings()
    base_url = (getattr(settings, "autotask_base_url", None) or "").strip()
    if not base_url:
        return jsonify({"status": "error", "message": "Autotask Base URL is not configured."}), 400
    # Required ticket defaults
    if not getattr(settings, "autotask_default_queue_id", None):
        return jsonify({"status": "error", "message": "Autotask default queue is not configured."}), 400
    if not getattr(settings, "autotask_default_ticket_source_id", None):
        return jsonify({"status": "error", "message": "Autotask default ticket source is not configured."}), 400
    if not getattr(settings, "autotask_default_ticket_status", None):
        return jsonify({"status": "error", "message": "Autotask default ticket status is not configured."}), 400
    # Determine display status (including overrides) for consistent subject/priority mapping.
    status_display = run.status or "-"
    try:
        status_display, _, _, _ov_id, _ov_reason = _apply_overrides_to_run(job, run)
    except Exception:
        status_display = run.status or "-"
    severity = _determine_autotask_severity(status_display)
    # Map severity to the configured Autotask priority (warning vs. error default).
    priority_id = None
    if severity == "warning":
        priority_id = getattr(settings, "autotask_priority_warning", None)
    else:
        priority_id = getattr(settings, "autotask_priority_error", None)
    # Load mail + objects for ticket composition.
    msg = MailMessage.query.get(run.mail_message_id) if run.mail_message_id else None
    overall_message = (getattr(msg, "overall_message", None) or "") if msg else ""
    objects_payload: list[dict[str, str]] = []
    try:
        objs = run.objects.order_by(JobObject.object_name.asc()).all()
    except Exception:
        # run.objects may already be a materialized collection rather than a query.
        objs = list(run.objects or [])
    for o in objs or []:
        objects_payload.append(
            {
                "name": getattr(o, "object_name", "") or "",
                "type": getattr(o, "object_type", "") or "",
                "status": getattr(o, "status", "") or "",
                "error_message": getattr(o, "error_message", "") or "",
            }
        )
    # Fallback: if the run carries no objects, use the objects parsed from the mail message.
    if (not objects_payload) and msg:
        try:
            mos = MailObject.query.filter_by(mail_message_id=msg.id).order_by(MailObject.object_name.asc()).all()
        except Exception:
            mos = []
        for mo in mos or []:
            objects_payload.append(
                {
                    "name": getattr(mo, "object_name", "") or "",
                    "type": getattr(mo, "object_type", "") or "",
                    "status": getattr(mo, "status", "") or "",
                    "error_message": getattr(mo, "error_message", "") or "",
                }
            )
    subject = f"[Backupchecks] {customer.name} - {job.job_name or ''} - {status_display}"
    description = _compose_autotask_ticket_description(
        settings=settings,
        job=job,
        run=run,
        status_display=status_display,
        overall_message=overall_message,
        objects_payload=objects_payload,
    )
    payload = {
        "companyID": int(customer.autotask_company_id),
        "title": subject,
        "description": description,
        "queueID": int(settings.autotask_default_queue_id),
        "source": int(settings.autotask_default_ticket_source_id),
        "status": int(settings.autotask_default_ticket_status),
    }
    if priority_id:
        payload["priority"] = int(priority_id)
    try:
        client = _build_autotask_client_from_settings()
        created = client.create_ticket(payload)
    except Exception as exc:
        return jsonify({"status": "error", "message": f"Autotask ticket creation failed: {exc}"}), 400
    # The API response shape may vary; probe the known keys for the ticket number.
    ticket_id = created.get("id") if isinstance(created, dict) else None
    ticket_number = None
    if isinstance(created, dict):
        ticket_number = created.get("ticketNumber") or created.get("number") or created.get("ticket_number")
    if not ticket_id:
        return jsonify({"status": "error", "message": "Autotask did not return a ticket id."}), 400
    # Persist the linkage on the run (best-effort int coercion for the id).
    try:
        run.autotask_ticket_id = int(ticket_id)
    except Exception:
        run.autotask_ticket_id = None
    run.autotask_ticket_number = (str(ticket_number).strip() if ticket_number is not None else "") or None
    run.autotask_ticket_created_at = datetime.utcnow()
    run.autotask_ticket_created_by_user_id = current_user.id
    try:
        db.session.add(run)
        db.session.commit()
    except Exception as exc:
        db.session.rollback()
        return jsonify({"status": "error", "message": f"Failed to store ticket reference: {exc}"}), 500
    return jsonify(
        {
            "status": "ok",
            "ticket_id": int(run.autotask_ticket_id) if run.autotask_ticket_id else None,
            "ticket_number": run.autotask_ticket_number or "",
            "already_exists": False,
        }
    )
@main_bp.post("/api/run-checks/mark-reviewed")
@login_required
@roles_required("admin", "operator")
def api_run_checks_mark_reviewed():
    """Mark runs as reviewed by the current user.

    Accepts JSON with either ``run_ids`` or ``job_ids`` (backwards
    compatible; ``job_ids`` wins and selects every run of those jobs).
    Runs that are already reviewed are counted as skipped. Each newly
    reviewed run also gets a REVIEWED audit event.
    """
    payload = request.get_json(silent=True) or {}
    raw_run_ids = payload.get("run_ids") or []
    raw_job_ids = payload.get("job_ids") or []
    if raw_job_ids:
        try:
            job_id_list = [int(v) for v in raw_job_ids]
        except Exception:
            return jsonify({"status": "error", "message": "Invalid job_ids."}), 400
        if not job_id_list:
            return jsonify({"status": "ok", "updated": 0, "skipped": 0})
        target_runs = JobRun.query.filter(JobRun.job_id.in_(job_id_list)).all()
    else:
        try:
            run_id_list = [int(v) for v in raw_run_ids]
        except Exception:
            return jsonify({"status": "error", "message": "Invalid run_ids."}), 400
        if not run_id_list:
            return jsonify({"status": "ok", "updated": 0, "skipped": 0})
        target_runs = JobRun.query.filter(JobRun.id.in_(run_id_list)).all()
    stamp = datetime.utcnow()
    updated = 0
    skipped = 0
    for target in target_runs:
        if target.reviewed_at is not None:
            # Already reviewed -> leave untouched, no duplicate audit event.
            skipped += 1
            continue
        target.reviewed_at = stamp
        target.reviewed_by_user_id = current_user.id
        db.session.add(
            JobRunReviewEvent(
                run_id=target.id,
                action="REVIEWED",
                actor_user_id=current_user.id,
            )
        )
        updated += 1
    db.session.commit()
    return jsonify({"status": "ok", "updated": updated, "skipped": skipped})
@main_bp.post("/api/run-checks/unmark-reviewed")
@login_required
@roles_required("admin")
def api_run_checks_unmark_reviewed():
    """Clear the reviewed state on runs (admin only).

    Accepts JSON with either ``run_ids`` or ``job_ids`` (``job_ids`` wins
    and selects every run of those jobs) plus an optional ``note``.
    Runs that were not reviewed count as skipped; every cleared run gets an
    UNREVIEWED audit event carrying the note (truncated to 2000 chars).
    """
    payload = request.get_json(silent=True) or {}
    raw_run_ids = payload.get("run_ids") or []
    raw_job_ids = payload.get("job_ids") or []
    note = payload.get("note")
    if raw_job_ids:
        try:
            job_id_list = [int(v) for v in raw_job_ids]
        except Exception:
            return jsonify({"status": "error", "message": "Invalid job_ids."}), 400
        if not job_id_list:
            return jsonify({"status": "ok", "updated": 0, "skipped": 0})
        target_runs = JobRun.query.filter(JobRun.job_id.in_(job_id_list)).all()
    else:
        try:
            run_id_list = [int(v) for v in raw_run_ids]
        except Exception:
            return jsonify({"status": "error", "message": "Invalid run_ids."}), 400
        if not run_id_list:
            return jsonify({"status": "ok", "updated": 0, "skipped": 0})
        target_runs = JobRun.query.filter(JobRun.id.in_(run_id_list)).all()
    # The note is loop-invariant; truncate it once up front.
    note_text = str(note)[:2000] if note else None
    updated = 0
    skipped = 0
    for target in target_runs:
        if target.reviewed_at is None:
            # Not reviewed -> nothing to clear.
            skipped += 1
            continue
        target.reviewed_at = None
        target.reviewed_by_user_id = None
        db.session.add(
            JobRunReviewEvent(
                run_id=target.id,
                action="UNREVIEWED",
                actor_user_id=current_user.id,
                note=note_text,
            )
        )
        updated += 1
    db.session.commit()
    return jsonify({"status": "ok", "updated": updated, "skipped": skipped})
@main_bp.post("/api/run-checks/mark-success-override")
@login_required
@roles_required("admin", "operator")
def api_run_checks_mark_success_override():
    """Create a time-bounded override so the selected run is treated as Success (override).

    Expects JSON ``{"run_id": <int>, "comment": <str, optional>}``.

    Strategy: prefer narrow object-level overrides scoped to this job (one
    per problematic object linked to the run); if no such object rows are
    available, fall back to a single global override constrained by the
    job's backup software/type, the run status, the error text, and a
    +/- 1 minute window around the run timestamp.
    """
    data = request.get_json(silent=True) or {}
    # Defensive coercion: any malformed run_id is treated as "missing" (0).
    try:
        run_id = int(data.get("run_id") or 0)
    except Exception:
        run_id = 0
    if run_id <= 0:
        return jsonify({"status": "error", "message": "Invalid run_id."}), 400
    run = JobRun.query.get_or_404(run_id)
    job = Job.query.get_or_404(run.job_id)
    # Do not allow overriding a missed placeholder run.
    if bool(getattr(run, "missed", False)):
        return jsonify({"status": "error", "message": "Missed runs cannot be marked as success."}), 400
    # If it is already a success or already overridden, do nothing.
    if bool(getattr(run, "override_applied", False)):
        return jsonify({"status": "ok", "message": "Already overridden."})
    if _status_is_success(getattr(run, "status", None)):
        return jsonify({"status": "ok", "message": "Already successful."})
    # Build a tight validity window around this run.
    run_ts = getattr(run, "run_at", None) or getattr(run, "created_at", None) or datetime.utcnow()
    start_at = run_ts - timedelta(minutes=1)
    end_at = run_ts + timedelta(minutes=1)
    comment = (data.get("comment") or "").strip()
    if not comment:
        # Keep it short and consistent; Operators will typically include a ticket number separately.
        comment = "Marked as success from Run Checks"
    comment = comment[:2000]
    created_any = False
    # Prefer object-level overrides (scoped to this job) to avoid impacting other jobs.
    obj_rows = []
    try:
        # Raw SQL: per-object status/error for this run via the
        # run_object_links -> customer_objects join (best-effort; empty on failure).
        obj_rows = (
            db.session.execute(
                text(
                    """
                    SELECT
                      co.object_name AS object_name,
                      rol.status AS status,
                      rol.error_message AS error_message
                    FROM run_object_links rol
                    JOIN customer_objects co ON co.id = rol.customer_object_id
                    WHERE rol.run_id = :run_id
                    ORDER BY co.object_name ASC
                    """
                ),
                {"run_id": run.id},
            )
            .mappings()
            .all()
        )
    except Exception:
        obj_rows = []
    def _obj_is_problem(status: str | None) -> bool:
        """Return True when the object status needs an override (not blank/success/override)."""
        s = (status or "").strip().lower()
        if not s:
            return False
        if "success" in s:
            return False
        if "override" in s:
            return False
        return True
    # One object-level override per problematic object, matching its status
    # and (when present) the first 255 chars of its error message.
    for rr in obj_rows or []:
        obj_name = (rr.get("object_name") or "").strip()
        obj_status = (rr.get("status") or "").strip()
        if (not obj_name) or (not _obj_is_problem(obj_status)):
            continue
        err = (rr.get("error_message") or "").strip()
        ov = Override(
            level="object",
            job_id=job.id,
            object_name=obj_name,
            match_status=(obj_status or None),
            match_error_contains=(err[:255] if err else None),
            match_error_mode=("contains" if err else None),
            treat_as_success=True,
            active=True,
            comment=comment,
            created_by=current_user.username,
            start_at=start_at,
            end_at=end_at,
        )
        db.session.add(ov)
        created_any = True
    # If we couldn't build a safe object-scoped override, fall back to a very tight global override.
    if not created_any:
        match_error_contains = (getattr(run, "remark", None) or "").strip()
        if not match_error_contains:
            # As a last resort, try to match any error message from legacy objects.
            try:
                objs = list(run.objects) if hasattr(run, "objects") else []
            except Exception:
                objs = []
            for obj in objs or []:
                em = (getattr(obj, "error_message", None) or "").strip()
                if em:
                    match_error_contains = em
                    break
        ov = Override(
            level="global",
            backup_software=job.backup_software or None,
            backup_type=job.backup_type or None,
            match_status=(getattr(run, "status", None) or None),
            match_error_contains=(match_error_contains[:255] if match_error_contains else None),
            match_error_mode=("contains" if match_error_contains else None),
            treat_as_success=True,
            active=True,
            comment=comment,
            created_by=current_user.username,
            start_at=start_at,
            end_at=end_at,
        )
        db.session.add(ov)
        created_any = True
    db.session.commit()
    # Recompute flags so the overview and modal reflect the override immediately.
    try:
        from .routes_shared import _recompute_override_flags_for_runs
        _recompute_override_flags_for_runs(job_ids=[job.id], start_at=start_at, end_at=end_at, only_unreviewed=False)
    except Exception:
        pass
    return jsonify({"status": "ok", "message": "Override created."})