backupchecks/containers/backupchecks/src/backend/app/main/routes_cove.py

396 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Cove Data Protection account review routes.
Mirrors the Inbox flow for mail messages:
/cove/accounts list all Cove accounts (unmatched first)
/cove/accounts/<id>/link link an account to an existing or new job
/cove/accounts/<id>/unlink remove the job link
"""
import re
from .routes_shared import * # noqa: F401,F403
from .routes_shared import _log_admin_event
from ..cove_importer import CoveImportError, run_cove_import, DATASOURCE_LABELS as _COVE_DS_LABELS
from ..models import CoveAccount, Customer, Job, JobRun, SystemSettings
_COVE_DATASOURCE_LABELS = {
"D01": "Files & Folders",
"D1": "Files & Folders",
"D02": "System State",
"D2": "System State",
"D06": "Network Shares",
"D6": "Network Shares",
"D10": "VssMsSql",
"D11": "VssSharePoint",
"D19": "M365 Exchange",
"D20": "M365 OneDrive",
"D05": "M365 SharePoint",
"D5": "M365 SharePoint",
"D23": "M365 Teams",
}
_COVE_M365_CODES = {"D19", "D20", "D05", "D5", "D23"}
_COVE_SERVER_CODES = {"D10", "D11"}
def _parse_cove_datasource_codes(raw: str | None) -> list[str]:
"""Extract datasource codes from Cove I78 strings like 'D01D02D10'."""
text = (raw or "").strip().upper()
if not text:
return []
return re.findall(r"D\d{1,2}", text)
def _derive_backup_type_for_account(cove_acc: CoveAccount) -> str:
"""Return Backupchecks-style backup type for a Cove account.
Heuristic:
- M365 datasource present -> Microsoft 365
- Server-specific datasource -> Server
- Otherwise -> Workstation
"""
codes = set(_parse_cove_datasource_codes(getattr(cove_acc, "datasource_types", None)))
if codes.intersection(_COVE_M365_CODES):
return "Microsoft 365"
if codes.intersection(_COVE_SERVER_CODES):
return "Server"
return "Workstation"
def _humanize_datasources(raw: str | None) -> str:
"""Return readable datasource labels from Cove I78 code string."""
labels: list[str] = []
for code in _parse_cove_datasource_codes(raw):
label = _COVE_DATASOURCE_LABELS.get(code, code)
if label not in labels:
labels.append(label)
return ", ".join(labels)
@main_bp.route("/cove/accounts")
@login_required
@roles_required("admin", "operator")
def cove_accounts():
settings = SystemSettings.query.first()
if not settings or not getattr(settings, "cove_enabled", False):
flash("Cove integration is not enabled.", "warning")
return redirect(url_for("main.settings", section="integrations"))
# Unmatched accounts (no job linked) shown first, like Inbox items
unmatched = (
CoveAccount.query
.filter(CoveAccount.job_id.is_(None))
.order_by(CoveAccount.customer_name.asc().nullslast(), CoveAccount.account_name.asc())
.all()
)
# Matched accounts
matched = (
CoveAccount.query
.filter(CoveAccount.job_id.isnot(None))
.order_by(CoveAccount.customer_name.asc().nullslast(), CoveAccount.account_name.asc())
.all()
)
customers = Customer.query.filter_by(active=True).order_by(Customer.name.asc()).all()
customer_rows = [{"id": c.id, "name": c.name} for c in customers]
jobs = Job.query.filter_by(archived=False).order_by(Job.job_name.asc()).all()
for acc in unmatched + matched:
acc.derived_backup_software = "Cove Data Protection"
acc.derived_backup_type = _derive_backup_type_for_account(acc)
acc.derived_job_name = (acc.account_name or acc.computer_name or f"Cove account {acc.account_id}").strip()
acc.datasource_display = _humanize_datasources(acc.datasource_types) or ""
return render_template(
"main/cove_accounts.html",
unmatched=unmatched,
matched=matched,
customers=customer_rows,
jobs=jobs,
settings=settings,
STATUS_LABELS={
1: "In process", 2: "Failed", 3: "Aborted", 5: "Completed",
6: "Interrupted", 7: "Not started", 8: "Completed with errors",
9: "In progress with faults", 10: "Over quota",
11: "No selection", 12: "Restarted",
},
STATUS_CLASS={
1: "warning", 2: "danger", 3: "danger", 5: "success",
6: "danger", 7: "secondary", 8: "warning", 9: "warning",
10: "danger", 11: "warning", 12: "warning",
},
)
@main_bp.route("/cove/accounts/<int:cove_account_db_id>/link", methods=["POST"])
@login_required
@roles_required("admin", "operator")
def cove_account_link(cove_account_db_id: int):
"""Link a Cove account to a job (create a new one or select existing)."""
cove_acc = CoveAccount.query.get_or_404(cove_account_db_id)
action = (request.form.get("action") or "").strip() # "create" or "link"
linked_job_name = ""
if action == "create":
# Create a new job from the Cove account data
customer_id_raw = (request.form.get("customer_id") or "").strip()
if not customer_id_raw:
flash("Please select a customer.", "danger")
return redirect(url_for("main.cove_accounts"))
try:
customer_id = int(customer_id_raw)
except ValueError:
flash("Invalid customer selection.", "danger")
return redirect(url_for("main.cove_accounts"))
customer = Customer.query.get(customer_id)
if not customer:
flash("Customer not found.", "danger")
return redirect(url_for("main.cove_accounts"))
job_name = (cove_acc.account_name or cove_acc.computer_name or f"Cove account {cove_acc.account_id}").strip()
backup_type = _derive_backup_type_for_account(cove_acc)
job = Job(
customer_id=customer.id,
backup_software="Cove Data Protection",
backup_type=backup_type,
job_name=job_name,
cove_account_id=cove_acc.account_id,
active=True,
auto_approve=True,
)
db.session.add(job)
db.session.flush()
cove_acc.job_id = job.id
db.session.commit()
_log_admin_event(
"cove_account_linked",
f"Created job {job.id} and linked Cove account {cove_acc.account_id} ({cove_acc.account_name})",
details=f"customer={customer.name}, job_name={job_name}",
)
linked_job_name = job_name
flash(f"Job '{job_name}' created for customer '{customer.name}'.", "success")
elif action == "link":
# Link to an existing job
job_id_raw = (request.form.get("job_id") or "").strip()
if not job_id_raw:
flash("Please select a job.", "danger")
return redirect(url_for("main.cove_accounts"))
try:
job_id = int(job_id_raw)
except ValueError:
flash("Invalid job selection.", "danger")
return redirect(url_for("main.cove_accounts"))
job = Job.query.get(job_id)
if not job:
flash("Job not found.", "danger")
return redirect(url_for("main.cove_accounts"))
job.cove_account_id = cove_acc.account_id
cove_acc.job_id = job.id
db.session.commit()
_log_admin_event(
"cove_account_linked",
f"Linked Cove account {cove_acc.account_id} ({cove_acc.account_name}) to existing job {job.id}",
details=f"job_name={job.job_name}",
)
linked_job_name = job.job_name or ""
flash(f"Cove account linked to job '{job.job_name}'.", "success")
else:
flash("Unknown action.", "warning")
return redirect(url_for("main.cove_accounts"))
# Trigger an immediate import so the latest Cove run appears right away
# after linking (instead of waiting for the next scheduled/manual import).
settings = SystemSettings.query.first()
if settings and getattr(settings, "cove_enabled", False):
linked_job_id = cove_acc.job_id
before_count = 0
if linked_job_id:
before_count = (
JobRun.query
.filter_by(job_id=linked_job_id, source_type="cove_api")
.count()
)
try:
total, created, skipped, errors = run_cove_import(settings)
after_count = 0
if linked_job_id:
after_count = (
JobRun.query
.filter_by(job_id=linked_job_id, source_type="cove_api")
.count()
)
linked_created = max(after_count - before_count, 0)
_log_admin_event(
"cove_import_after_link",
(
"Triggered immediate Cove import after account link. "
f"accounts={total}, created={created}, skipped={skipped}, errors={errors}"
),
)
if linked_created > 0:
flash(
(
f"Immediate import complete for '{linked_job_name}'. "
f"New linked runs: {linked_created} (accounts: {total}, skipped: {skipped}, errors: {errors})."
),
"success" if errors == 0 else "warning",
)
else:
latest_cove = CoveAccount.query.get(cove_acc.id)
if latest_cove and latest_cove.last_run_at:
reason = (
"latest run seems unchanged (already imported) "
"or Cove has not published a newer session yet"
)
else:
reason = "Cove returned no usable last-session timestamp yet for this account"
flash(
(
f"Immediate import complete for '{linked_job_name}', but no new run was found yet. "
f"Reason: {reason}. (accounts: {total}, skipped: {skipped}, errors: {errors})"
),
"info" if errors == 0 else "warning",
)
except CoveImportError as exc:
_log_admin_event(
"cove_import_after_link_error",
f"Immediate Cove import after account link failed: {exc}",
)
flash(
"Account linked, but immediate import failed. "
"You can run import again from Cove settings.",
"warning",
)
except Exception as exc:
_log_admin_event(
"cove_import_after_link_error",
f"Unexpected immediate Cove import error after account link: {exc}",
)
flash(
"Account linked, but immediate import encountered an unexpected error. "
"You can run import again from Cove settings.",
"warning",
)
return redirect(url_for("main.cove_accounts"))
@main_bp.route("/cove/accounts/<int:cove_account_db_id>/unlink", methods=["POST"])
@login_required
@roles_required("admin", "operator")
def cove_account_unlink(cove_account_db_id: int):
"""Remove the job link from a Cove account (puts it back in the unmatched list)."""
cove_acc = CoveAccount.query.get_or_404(cove_account_db_id)
old_job_id = cove_acc.job_id
if old_job_id:
job = Job.query.get(old_job_id)
if job and job.cove_account_id == cove_acc.account_id:
job.cove_account_id = None
cove_acc.job_id = None
db.session.commit()
_log_admin_event(
"cove_account_unlinked",
f"Unlinked Cove account {cove_acc.account_id} ({cove_acc.account_name}) from job {old_job_id}",
)
flash("Cove account unlinked.", "success")
return redirect(url_for("main.cove_accounts"))
@main_bp.route("/cove/run/<int:run_id>/detail")
@login_required
@roles_required("admin", "operator", "viewer")
def cove_run_detail(run_id: int):
"""Return structured Cove run details as JSON for the job detail popup."""
from ..database import db
from sqlalchemy import text as _sql_text
run = JobRun.query.get_or_404(run_id)
if getattr(run, "source_type", None) != "cove_api":
return jsonify({"status": "error", "message": "Not a Cove run"}), 400
cove_acc = CoveAccount.query.filter_by(job_id=run.job_id).first()
cove_summary = None
if cove_acc:
raw_ds = (cove_acc.datasource_types or "").strip().upper()
ds_codes = re.findall(r"D\d{1,2}", raw_ds) if raw_ds else []
ds_labels: list[str] = []
for code in ds_codes:
lbl = _COVE_DS_LABELS.get(code, code)
if lbl not in ds_labels:
ds_labels.append(lbl)
cove_summary = {
"account_name": cove_acc.account_name or "",
"computer_name": cove_acc.computer_name or "",
"customer_name": cove_acc.customer_name or "",
"datasources": ", ".join(ds_labels) if ds_labels else (raw_ds or ""),
"last_run_at": cove_acc.last_run_at.strftime("%Y-%m-%d %H:%M") if cove_acc.last_run_at else "",
"status": run.status or "",
}
# Per-run datasource objects from run_object_links
obj_rows = db.session.execute(
_sql_text("""
SELECT co.object_name AS name, rol.status, rol.error_message
FROM run_object_links rol
JOIN customer_objects co ON co.id = rol.customer_object_id
WHERE rol.run_id = :run_id
ORDER BY co.object_name ASC
"""),
{"run_id": run_id},
).mappings().all()
objects = [
{
"name": r["name"] or "",
"type": "",
"status": r["status"] or "",
"error_message": r["error_message"] or "",
}
for r in obj_rows
]
account_label = ""
if cove_acc:
account_label = cove_acc.account_name or cove_acc.computer_name or f"account {cove_acc.account_id}"
return jsonify({
"status": "ok",
"meta": {
"subject": account_label,
"from_address": "",
"backup_software": "Cove Data Protection",
"backup_type": "",
"job_name": "",
"overall_status": run.status or "",
"overall_message": run.remark or "",
"customer_name": cove_acc.customer_name if cove_acc else "",
"received_at": run.run_at.strftime("%Y-%m-%d %H:%M") if run.run_at else "",
"parsed_at": "",
"has_eml": False,
},
"cove_summary": cove_summary,
"cloud_connect_summary": None,
"objects": objects,
"body_html": "",
"mail": None,
})