# backupchecks/containers/backupchecks/src/backend/app/cove_importer.py

"""Cove Data Protection API importer.
Fetches backup job run data from the Cove (N-able) API.
Flow (mirrors the mail Inbox flow):
1. All Cove accounts are upserted into the `cove_accounts` staging table.
2. Accounts without a linked job appear on the Cove Accounts page where
an admin can create or link a job (same as approving a mail from Inbox).
3. For accounts that have a linked job, a JobRun is created per new session
(deduplicated via external_id).
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from typing import Any
import requests
from sqlalchemy import text
from .database import db
logger = logging.getLogger(__name__)
# Default JSON-RPC endpoint, used when no custom URL is configured in settings.
COVE_DEFAULT_URL: str = "https://api.backup.management/jsonapi"

# Columns to request from EnumerateAccountStatistics
COVE_COLUMNS: list[str] = [
    "I1",      # Account/device name
    "I18",     # Computer name
    "I8",      # Customer / partner name
    "I78",     # Active datasource label
    "D09F00",  # Overall last session status
    "D09F09",  # Last successful session timestamp
    "D09F15",  # Last session end timestamp
    "D09F08",  # 28-day colorbar
    # Datasource-specific status (F00) and last session time (F15)
    "D1F00", "D1F15",    # Files & Folders
    "D10F00", "D10F15",  # VssMsSql
    "D11F00", "D11F15",  # VssSharePoint
    "D19F00", "D19F15",  # M365 Exchange
    "D20F00", "D20F15",  # M365 OneDrive
    "D5F00", "D5F15",    # M365 SharePoint
    "D23F00", "D23F15",  # M365 Teams
]

# Mapping from Cove status code to Backupchecks status string
STATUS_MAP: dict[int, str] = {
    1: "Warning",   # In process
    2: "Error",     # Failed
    3: "Error",     # Aborted
    5: "Success",   # Completed
    6: "Error",     # Interrupted
    7: "Warning",   # NotStarted
    8: "Warning",   # CompletedWithErrors
    9: "Warning",   # InProgressWithFaults
    10: "Error",    # OverQuota
    11: "Warning",  # NoSelection
    12: "Warning",  # Restarted
}

# Mapping from Cove status code to readable label
STATUS_LABELS: dict[int, str] = {
    1: "In process",
    2: "Failed",
    3: "Aborted",
    5: "Completed",
    6: "Interrupted",
    7: "Not started",
    8: "Completed with errors",
    9: "In progress with faults",
    10: "Over quota",
    11: "No selection",
    12: "Restarted",
}

# Datasource label mapping (column prefix → human-readable label)
DATASOURCE_LABELS: dict[str, str] = {
    "D1": "Files & Folders",
    "D10": "VssMsSql",
    "D11": "VssSharePoint",
    "D19": "M365 Exchange",
    "D20": "M365 OneDrive",
    "D5": "M365 SharePoint",
    "D23": "M365 Teams",
}
class CoveImportError(Exception):
    """Error raised for any failure while interacting with the Cove API."""
def _cove_login(url: str, username: str, password: str) -> tuple[str, int]:
    """Login to the Cove API and return (visa, partner_id).

    Args:
        url: JSON-RPC endpoint URL.
        username: Cove API username.
        password: Cove API password.

    Returns:
        Tuple of (visa token, partner id). partner_id is 0 when the response
        does not contain one.

    Raises:
        CoveImportError: on transport errors, non-JSON responses, API-level
            errors, a missing visa token, or a malformed PartnerId.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": "jsonrpc",
        "method": "Login",
        "params": {
            "username": username,
            "password": password,
        },
    }
    try:
        resp = requests.post(
            url,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
    except requests.RequestException as exc:
        raise CoveImportError(f"Cove login request failed: {exc}") from exc
    except ValueError as exc:
        raise CoveImportError(f"Cove login response is not valid JSON: {exc}") from exc
    error = data.get("error")
    if error:
        # Prefer the API's "message" field when the error is a dict; the
        # parentheses make the or/ternary precedence explicit.
        msg = (error.get("message") or str(error)) if isinstance(error, dict) else str(error)
        raise CoveImportError(f"Cove login failed: {msg}")
    # Visa is returned at the top level of the response (not inside result)
    visa = data.get("visa") or ""
    if not visa:
        raise CoveImportError("Cove login succeeded but no visa token returned")
    # PartnerId is inside result (sometimes nested one level deeper).
    result = data.get("result") or {}
    nested = result.get("result")
    partner_id = (
        result.get("PartnerId")
        or result.get("PartnerID")
        # Guard the nested lookup: a non-dict nested result must not crash login.
        or (nested.get("PartnerId") if isinstance(nested, dict) else None)
        or 0
    )
    try:
        return visa, int(partner_id)
    except (TypeError, ValueError) as exc:
        # Keep the documented contract: all login failures surface as CoveImportError.
        raise CoveImportError(f"Cove login returned a malformed PartnerId: {partner_id!r}") from exc
def _cove_enumerate(
    url: str,
    visa: str,
    partner_id: int,
    start: int,
    count: int,
) -> list[dict]:
    """Fetch one page of EnumerateAccountStatistics results.

    Returns an empty list when there are no more results to page through.
    """
    request_body = {
        "jsonrpc": "2.0",
        "visa": visa,
        "id": "jsonrpc",
        "method": "EnumerateAccountStatistics",
        "params": {
            "query": {
                "PartnerId": partner_id,
                "StartRecordNumber": start,
                "RecordsCount": count,
                "Columns": COVE_COLUMNS,
            }
        },
    }
    try:
        response = requests.post(
            url,
            json=request_body,
            headers={"Content-Type": "application/json"},
            timeout=60,
        )
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as exc:
        raise CoveImportError(f"Cove EnumerateAccountStatistics request failed: {exc}") from exc
    except ValueError as exc:
        raise CoveImportError(f"Cove EnumerateAccountStatistics response is not valid JSON: {exc}") from exc
    error = data.get("error")
    if error:
        if isinstance(error, dict):
            msg = error.get("message") or str(error)
        else:
            msg = str(error)
        raise CoveImportError(f"Cove EnumerateAccountStatistics failed: {msg}")
    result = data.get("result")
    if result is None:
        return []
    # Some responses wrap the payload one level deeper; unwrap it.
    if isinstance(result, dict) and "result" in result:
        result = result["result"]
    # Accounts can be a list directly or wrapped in an "Accounts" key.
    if isinstance(result, list):
        return result
    if isinstance(result, dict):
        return result.get("Accounts", []) or []
    return []
def _flatten_settings(account: dict) -> dict:
"""Convert the Settings array in an account dict to a flat key→value dict.
Cove returns settings as a list of single-key dicts, e.g.:
[{"D09F00": "5"}, {"I1": "device name"}, ...]
"""
flat: dict[str, Any] = {}
settings_list = account.get("Settings") or []
if isinstance(settings_list, list):
for item in settings_list:
if isinstance(item, dict):
flat.update(item)
return flat
def _map_status(code: Any) -> str:
    """Translate a Cove status code into a Backupchecks status string.

    Missing, unknown, and non-numeric codes all degrade to "Warning"
    (int(None) raises TypeError, so the None case is covered by the except).
    """
    try:
        return STATUS_MAP.get(int(code), "Warning")
    except (ValueError, TypeError):
        return "Warning"
def _status_label(code: Any) -> str:
    """Translate a Cove status code into a human-readable label.

    Unmapped numeric codes render as "Code <n>"; non-numeric input as "Unknown".
    """
    if code is None:
        return "Unknown"
    try:
        numeric = int(code)
    except (ValueError, TypeError):
        return "Unknown"
    return STATUS_LABELS.get(numeric, f"Code {numeric}")
def _ts_to_dt(value: Any) -> datetime | None:
"""Convert a Unix timestamp (int or str) to a naive UTC datetime."""
if value is None:
return None
try:
ts = int(value)
if ts <= 0:
return None
return datetime.fromtimestamp(ts, tz=timezone.utc).replace(tzinfo=None)
except (ValueError, TypeError, OSError):
return None
def _fmt_utc(dt: datetime | None) -> str:
"""Format a naive UTC datetime to readable text for run object messages."""
if not dt:
return "unknown"
return dt.strftime("%Y-%m-%d %H:%M UTC")
def run_cove_import(settings) -> tuple[int, int, int, int]:
    """Fetch Cove account statistics and update the staging table + JobRuns.

    For every account:
    - Upsert into cove_accounts (always)
    - If the account has a linked job → create a JobRun if not already seen

    Args:
        settings: SystemSettings ORM object with cove_* fields.

    Returns:
        Tuple of (total_accounts, created_runs, skipped_runs, error_count).

    Raises:
        CoveImportError if the API login fails.
    """
    # Fall back to the public endpoint when no URL is configured.
    url = (getattr(settings, "cove_api_url", None) or "").strip() or COVE_DEFAULT_URL
    username = (getattr(settings, "cove_api_username", None) or "").strip()
    password = (getattr(settings, "cove_api_password", None) or "").strip()
    if not username or not password:
        raise CoveImportError("Cove API username or password not configured")
    visa, partner_id = _cove_login(url, username, password)
    # Save partner_id back to settings. Best-effort: a failed commit is rolled
    # back and the import continues with the in-memory value.
    if partner_id and partner_id != getattr(settings, "cove_partner_id", None):
        settings.cove_partner_id = partner_id
        try:
            db.session.commit()
        except Exception:
            db.session.rollback()
    total = 0
    created = 0
    skipped = 0
    errors = 0
    page_size = 250  # records fetched per EnumerateAccountStatistics call
    start = 0
    while True:
        try:
            accounts = _cove_enumerate(url, visa, partner_id, start, page_size)
        except CoveImportError:
            raise
        except Exception as exc:
            # Wrap anything unexpected so callers only need to catch CoveImportError.
            raise CoveImportError(f"Unexpected error fetching accounts at offset {start}: {exc}") from exc
        if not accounts:
            break
        for account in accounts:
            total += 1
            try:
                run_created = _process_account(account)
                if run_created:
                    created += 1
                else:
                    skipped += 1
            except Exception as exc:
                # One bad account must not abort the whole import: count it,
                # log it, and roll back any partial DB state it left behind.
                errors += 1
                logger.warning("Cove import: error processing account: %s", exc)
                try:
                    db.session.rollback()
                except Exception:
                    pass
        # A short page means this was the last page.
        if len(accounts) < page_size:
            break
        start += page_size
    # Update last import timestamp (best-effort commit, same as above).
    settings.cove_last_import_at = datetime.utcnow()
    try:
        db.session.commit()
    except Exception:
        db.session.rollback()
    return total, created, skipped, errors
def _process_account(account: dict) -> bool:
    """Upsert a Cove account into the staging table and create a JobRun if linked.

    Returns True if a new JobRun was created, False otherwise (including all
    early-exit cases: missing/invalid AccountId, no linked job, no usable
    session timestamp, duplicate session, or a missing Job row).
    """
    # Imported locally, presumably to avoid a circular import — TODO confirm.
    from .models import CoveAccount, JobRun
    flat = _flatten_settings(account)
    # AccountId is a top-level field
    account_id = account.get("AccountId") or account.get("AccountID")
    if not account_id:
        return False
    try:
        account_id = int(account_id)
    except (ValueError, TypeError):
        return False
    # Extract metadata from flat settings; blank strings collapse to None.
    account_name = (flat.get("I1") or "").strip() or None
    computer_name = (flat.get("I18") or "").strip() or None
    customer_name = (flat.get("I8") or "").strip() or None
    datasource_types = (flat.get("I78") or "").strip() or None
    # Prefer "last session end" (D09F15); fallback to "last successful session" (D09F09)
    # so accounts without D09F15 can still produce an initial run.
    last_run_ts_raw = flat.get("D09F15")
    last_run_at = _ts_to_dt(last_run_ts_raw)
    if last_run_at is None:
        last_run_ts_raw = flat.get("D09F09")
        last_run_at = _ts_to_dt(last_run_ts_raw)
    colorbar_28d = (flat.get("D09F08") or "").strip() or None
    try:
        last_status_code = int(flat["D09F00"]) if flat.get("D09F00") is not None else None
    except (ValueError, TypeError):
        last_status_code = None
    # Upsert into cove_accounts staging table
    cove_acc = CoveAccount.query.filter_by(account_id=account_id).first()
    if cove_acc is None:
        cove_acc = CoveAccount(
            account_id=account_id,
            first_seen_at=datetime.utcnow(),
        )
        db.session.add(cove_acc)
    # Refresh metadata on every import so the staging row mirrors the API.
    cove_acc.account_name = account_name
    cove_acc.computer_name = computer_name
    cove_acc.customer_name = customer_name
    cove_acc.datasource_types = datasource_types
    cove_acc.last_status_code = last_status_code
    cove_acc.last_run_at = last_run_at
    cove_acc.colorbar_28d = colorbar_28d
    cove_acc.last_seen_at = datetime.utcnow()
    db.session.flush()  # ensure cove_acc.id is set
    # If not linked to a job yet, nothing more to do (shows up in Cove Accounts page)
    if not cove_acc.job_id:
        db.session.commit()
        return False
    # Account is linked: create a JobRun if the last session is new
    if not last_run_at:
        db.session.commit()
        return False
    try:
        run_ts = int(last_run_ts_raw or 0)
    except (TypeError, ValueError):
        run_ts = 0
    # Dedup key: one JobRun per (account, session timestamp).
    external_id = f"cove-{account_id}-{run_ts}"
    existing = JobRun.query.filter_by(external_id=external_id).first()
    if existing:
        db.session.commit()
        return False
    # Fetch the linked job
    from .models import Job
    job = Job.query.get(cove_acc.job_id)
    if not job:
        db.session.commit()
        return False
    status = _map_status(last_status_code)
    run_remark = (
        f"Cove account: {account_name or account_id} | "
        f"Computer: {computer_name or '-'} | "
        f"Customer: {customer_name or '-'} | "
        f"Last status: {_status_label(last_status_code)} ({last_status_code if last_status_code is not None else '-'}) | "
        f"Last run: {_fmt_utc(last_run_at)}"
    )
    run = JobRun(
        job_id=job.id,
        mail_message_id=None,
        run_at=last_run_at,
        status=status,
        remark=run_remark,
        missed=False,
        override_applied=False,
        source_type="cove_api",
        external_id=external_id,
    )
    db.session.add(run)
    db.session.flush()  # get run.id
    # Persist per-datasource objects
    if job.customer_id:
        _persist_datasource_objects(flat, job.customer_id, job.id, run.id, last_run_at)
    db.session.commit()
    return True
def _persist_datasource_objects(
    flat: dict,
    customer_id: int,
    job_id: int,
    run_id: int,
    observed_at: datetime,
) -> None:
    """Create run_object_links for each active datasource found in the account stats.

    For every DATASOURCE_LABELS prefix whose status column (DxF00) is present,
    upserts a chain of rows: customer_objects → job_object_links →
    run_object_links, all inside one SQL transaction.

    NOTE(review): this uses a separate connection/transaction (engine.begin())
    while the JobRun for run_id has only been flushed, not committed, on the
    ORM session — confirm run_object_links has no enforced FK to the runs
    table, otherwise this insert can fail before the session commits.
    """
    engine = db.get_engine()
    with engine.begin() as conn:
        for ds_prefix, ds_label in DATASOURCE_LABELS.items():
            status_key = f"{ds_prefix}F00"
            status_code = flat.get(status_key)
            # An absent status column means this datasource is not active.
            if status_code is None:
                continue
            status = _map_status(status_code)
            # DxF15 = last session time for this datasource; may be absent.
            ds_last_ts = _ts_to_dt(flat.get(f"{ds_prefix}F15"))
            status_msg = (
                f"Cove datasource status: {_status_label(status_code)} "
                f"({status_code}); last session: {_fmt_utc(ds_last_ts)}"
            )
            # Upsert customer_objects
            customer_object_id = conn.execute(
                text(
                    """
                    INSERT INTO customer_objects (customer_id, object_name, object_type, first_seen_at, last_seen_at)
                    VALUES (:customer_id, :object_name, :object_type, NOW(), NOW())
                    ON CONFLICT (customer_id, object_name)
                    DO UPDATE SET
                        last_seen_at = NOW(),
                        object_type = COALESCE(EXCLUDED.object_type, customer_objects.object_type)
                    RETURNING id
                    """
                ),
                {
                    "customer_id": customer_id,
                    "object_name": ds_label,
                    "object_type": "cove_datasource",
                },
            ).scalar()
            # Upsert job_object_links
            conn.execute(
                text(
                    """
                    INSERT INTO job_object_links (job_id, customer_object_id, first_seen_at, last_seen_at)
                    VALUES (:job_id, :customer_object_id, NOW(), NOW())
                    ON CONFLICT (job_id, customer_object_id)
                    DO UPDATE SET last_seen_at = NOW()
                    """
                ),
                {"job_id": job_id, "customer_object_id": customer_object_id},
            )
            # Upsert run_object_links
            conn.execute(
                text(
                    """
                    INSERT INTO run_object_links (run_id, customer_object_id, status, error_message, observed_at)
                    VALUES (:run_id, :customer_object_id, :status, :error_message, :observed_at)
                    ON CONFLICT (run_id, customer_object_id)
                    DO UPDATE SET
                        status = EXCLUDED.status,
                        error_message = EXCLUDED.error_message,
                        observed_at = EXCLUDED.observed_at
                    """
                ),
                {
                    "run_id": run_id,
                    "customer_object_id": customer_object_id,
                    "status": status,
                    "error_message": status_msg,
                    "observed_at": ds_last_ts or observed_at,
                },
            )