Fix Cove test script: remove partner field from login, use confirmed columns

Login requires only username + password (no partner field).
Updated column set matches confirmed working columns from Postman testing.
Added per-datasource output and 28-day color bar display.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Ivo Oskamp 2026-02-23 09:52:18 +01:00
parent 6d086a883f
commit a30d51bed0

View File

@ -2,19 +2,17 @@
"""
Cove Data Protection API Test Script
=======================================
Tests the new column codes (D9Fxx, D10Fxx, D11Fxx) after feedback from N-able support.
Verifies which columns are available for backup status monitoring.
Verified working via Postman (2026-02-23). Uses confirmed column codes.
Usage:
python3 cove_api_test.py \
--partner "YourPartnerName" \
--username "api-user" \
--password "secret" \
[--url https://api.backup.management/jsonapi] \
[--partner-id 12345]
python3 cove_api_test.py --username "api-user" --password "secret"
Or via environment variables:
COVE_PARTNER=... COVE_USERNAME=... COVE_PASSWORD=... python3 cove_api_test.py
COVE_USERNAME="api-user" COVE_PASSWORD="secret" python3 cove_api_test.py
Optional:
--url API endpoint (default: https://api.backup.management/jsonapi)
--records Max records to fetch (default: 50)
"""
import argparse
@ -27,76 +25,60 @@ import requests
API_URL = "https://api.backup.management/jsonapi"
# Status codes for D9F00 / F17 / F16
# Session status codes (F00 / F15 / F09)
SESSION_STATUS = {
1: "In process",
2: "Failed",
3: "Aborted",
5: "Completed",
6: "Interrupted",
7: "NotStarted",
8: "CompletedWithErrors",
9: "InProgressWithFaults",
1: "In process",
2: "Failed",
3: "Aborted",
5: "Completed",
6: "Interrupted",
7: "NotStarted",
8: "CompletedWithErrors",
9: "InProgressWithFaults",
10: "OverQuota",
11: "NoSelection",
12: "Restarted",
}
# Column sets to test
COLUMN_SETS = {
"safe_legacy": {
"description": "Previously confirmed working (from original testing)",
"columns": ["I1", "I14", "I18", "D01F00", "D01F01", "D09F00"],
},
"status_and_timestamps": {
"description": "Core backup status and timestamps (D9Fxx new format)",
"columns": [
"I1", "I14", "I18",
"D9F00", # Last Session Status
"D9F06", # Last Session Errors Count
"D9F09", # Last Successful Session Timestamp
"D9F12", # Session Duration
"D9F15", # Last Session Timestamp
"D9F17", # Last Completed Session Status
"D9F18", # Last Completed Session Timestamp
],
},
"d10_vss_mssql": {
"description": "VssMsSql data source (replaces legacy D02/D03)",
"columns": [
"I1", "I18",
"D10F00", # Last Session Status
"D10F09", # Last Successful Session Timestamp
"D10F15", # Last Session Timestamp
],
},
"d11_vss_sharepoint": {
"description": "VssSharePoint data source (replaces legacy D02/D03)",
"columns": [
"I1", "I18",
"D11F00", # Last Session Status
"D11F09", # Last Successful Session Timestamp
"D11F15", # Last Session Timestamp
],
},
"d1_files_folders": {
"description": "Files and Folders data source",
"columns": [
"I1", "I18",
"D1F00", # Last Session Status
"D1F09", # Last Successful Session Timestamp
"D1F15", # Last Session Timestamp
],
},
"full_set": {
"description": "Full set for Backupchecks integration (if all above work)",
"columns": [
"I1", "I14", "I18",
"D9F00", "D9F01", "D9F02", "D9F03", "D9F04", "D9F05",
"D9F06", "D9F07", "D9F08", "D9F09", "D9F10", "D9F11",
"D9F12", "D9F13", "D9F15", "D9F16", "D9F17", "D9F18",
],
},
# Backupchecks status mapping
STATUS_MAP = {
1: "Warning", # In process
2: "Error", # Failed
3: "Error", # Aborted
5: "Success", # Completed
6: "Error", # Interrupted
7: "Warning", # NotStarted
8: "Warning", # CompletedWithErrors
9: "Warning", # InProgressWithFaults
10: "Error", # OverQuota
11: "Warning", # NoSelection
12: "Warning", # Restarted
}
# Confirmed working columns (verified via Postman 2026-02-23)
COLUMNS = [
"I1", "I18", "I8", "I78",
"D09F00", "D09F09", "D09F15", "D09F08",
"D1F00", "D1F15",
"D10F00", "D10F15",
"D11F00", "D11F15",
"D19F00", "D19F15",
"D20F00", "D20F15",
"D5F00", "D5F15",
"D23F00", "D23F15",
]
# Datasource labels
DATASOURCE_LABELS = {
"D09": "Total",
"D1": "Files & Folders",
"D2": "System State",
"D10": "VssMsSql (SQL Server)",
"D11": "VssSharePoint",
"D19": "M365 Exchange",
"D20": "M365 OneDrive",
"D5": "M365 SharePoint",
"D23": "M365 Teams",
}
@ -107,17 +89,16 @@ def _post(url: str, payload: dict, timeout: int = 30) -> dict:
return resp.json()
def login(url: str, partner: str, username: str, password: str) -> tuple[str, int]:
def login(url: str, username: str, password: str) -> tuple[str, int]:
"""Authenticate and return (visa, partner_id)."""
payload = {
"jsonrpc": "2.0",
"id": "jsonrpc",
"method": "Login",
"params": {
"partner": partner,
"username": username,
"password": password,
},
"id": "1",
}
data = _post(url, payload)
@ -128,181 +109,181 @@ def login(url: str, partner: str, username: str, password: str) -> tuple[str, in
if not visa:
raise RuntimeError(f"No visa token in response: {data}")
partner_id = data["result"]["result"]["PartnerId"]
print(f" Login OK PartnerId={partner_id}")
return visa, partner_id
result = data.get("result", {})
partner_id = result.get("PartnerId") or result.get("result", {}).get("PartnerId")
if not partner_id:
raise RuntimeError(f"Could not find PartnerId in response: {data}")
return visa, int(partner_id)
def enumerate_statistics(url: str, visa: str, partner_id: int, columns: list[str]) -> dict:
"""Call EnumerateAccountStatistics with specified columns."""
def enumerate_statistics(url: str, visa: str, partner_id: int, columns: list[str], records: int = 50) -> dict:
payload = {
"jsonrpc": "2.0",
"method": "EnumerateAccountStatistics",
"visa": visa,
"id": "jsonrpc",
"method": "EnumerateAccountStatistics",
"params": {
"query": {
"PartnerId": partner_id,
"SelectionMode": "Merged",
"StartRecordNumber": 0,
"RecordsCount": 20,
"RecordsCount": records,
"Columns": columns,
}
},
"id": "2",
}
return _post(url, payload)
def format_timestamp(value) -> str:
"""Convert Unix timestamp to readable datetime."""
if value is None or value == 0:
return "(empty)"
def fmt_ts(value) -> str:
if not value:
return "(none)"
try:
ts = int(value)
if ts == 0:
return "(empty)"
return "(none)"
dt = datetime.fromtimestamp(ts, tz=timezone.utc)
return dt.strftime("%Y-%m-%d %H:%M:%S UTC")
return dt.strftime("%Y-%m-%d %H:%M UTC")
except (ValueError, TypeError, OSError):
return str(value)
def format_status(value) -> str:
"""Convert numeric session status to description."""
def fmt_status(value) -> str:
if value is None:
return "(empty)"
return "(none)"
try:
code = int(value)
label = SESSION_STATUS.get(code, f"Unknown({code})")
return f"{code} {label}"
bc = STATUS_MAP.get(code, "?")
label = SESSION_STATUS.get(code, f"Unknown")
return f"{code} ({label}) → {bc}"
except (ValueError, TypeError):
return str(value)
def print_section(title: str) -> None:
def fmt_colorbar(value: str) -> str:
if not value:
return "(none)"
icons = {"5": "", "8": "⚠️", "2": "", "1": "🔄", "0": "·"}
return "".join(icons.get(c, c) for c in str(value))
def print_header(title: str) -> None:
print()
print("=" * 60)
print("=" * 70)
print(f" {title}")
print("=" * 60)
print("=" * 70)
def test_column_set(url: str, visa: str, partner_id: int, name: str, info: dict) -> bool:
"""Test a column set. Returns True if successful."""
print(f"\n[TEST] {name}")
print(f" {info['description']}")
print(f" Columns: {', '.join(info['columns'])}")
def run(url: str, username: str, password: str, records: int) -> None:
print_header("Cove Data Protection API Test")
print(f" URL: {url}")
print(f" Username: {username}")
data = enumerate_statistics(url, visa, partner_id, info["columns"])
# Login
print_header("Step 1: Login")
visa, partner_id = login(url, username, password)
print(f" ✅ Login OK")
print(f" PartnerId: {partner_id}")
print(f" Visa: {visa[:40]}...")
# Fetch statistics
print_header("Step 2: EnumerateAccountStatistics")
print(f" Columns: {', '.join(COLUMNS)}")
print(f" Records: max {records}")
data = enumerate_statistics(url, visa, partner_id, COLUMNS, records)
if "error" in data:
err = data["error"]
code = err.get("code", "?")
msg = err.get("message", str(err))
print(f" ❌ FAILED error {code}: {msg}")
return False
print(f" ❌ FAILED error {err.get('code')}: {err.get('message')}")
print(f" Data: {err.get('data')}")
sys.exit(1)
result = data.get("result")
if result is None:
print(" ⚠️ result is null (empty? no accounts?)")
return True # Not a security error, just empty
print(" ⚠️ result is null (no accounts?)")
sys.exit(0)
# Result can be a list directly or wrapped
accounts = result if isinstance(result, list) else result.get("Accounts", [])
if not isinstance(accounts, list):
# Might be a dict with stats
print(f" ✅ SUCCESS response type: {type(result).__name__}")
print(f" Raw (truncated): {json.dumps(result)[:300]}")
return True
total = len(accounts)
print(f" ✅ SUCCESS {total} account(s) returned")
print(f" ✅ SUCCESS {len(accounts)} account(s) returned")
# Per-account output
print_header(f"Step 3: Account Details ({total} total)")
# Show first 3 accounts with parsed values
for i, account in enumerate(accounts[:3]):
print(f"\n Account {i + 1}:")
for col in info["columns"]:
val = account.get(col)
if col.startswith("D9F") and col[3:5] in ("09", "15", "18"):
# Timestamp field
display = format_timestamp(val)
elif col.startswith("D9F") and col[3:5] in ("00", "17", "16"):
# Status field
display = format_status(val)
elif col == "I14" and val is not None:
# Storage bytes
try:
gb = int(val) / (1024 ** 3)
display = f"{val} bytes ({gb:.2f} GB)"
except (ValueError, TypeError):
display = str(val)
else:
display = str(val) if val is not None else "(empty)"
print(f" {col:12s} = {display}")
for i, acc in enumerate(accounts):
device_name = acc.get("I1", "(no name)")
computer = acc.get("I18") or "(M365 tenant)"
customer = acc.get("I8", "")
active_ds = acc.get("I78", "")
if len(accounts) > 3:
print(f" ... and {len(accounts) - 3} more")
print(f"\n [{i+1}/{total}] {device_name}")
print(f" Computer : {computer}")
print(f" Customer : {customer}")
print(f" Datasrc : {active_ds}")
return True
# Total (D09)
d9_status = acc.get("D09F00")
d9_last_ok = acc.get("D09F09")
d9_last = acc.get("D09F15")
d9_bar = acc.get("D09F08")
print(f" Total:")
print(f" Status : {fmt_status(d9_status)}")
print(f" Last session: {fmt_ts(d9_last)}")
print(f" Last success: {fmt_ts(d9_last_ok)}")
print(f" 28-day bar : {fmt_colorbar(d9_bar)}")
def run_tests(url: str, partner: str, username: str, password: str, partner_id_override: int | None) -> None:
print_section("Cove Data Protection API Column Code Test")
print(f" URL: {url}")
print(f" Partner: {partner}")
print(f" User: {username}")
# Per-datasource (only if active)
ds_pairs = [
("D1", "D1F00", "D1F15"),
("D10", "D10F00", "D10F15"),
("D11", "D11F00", "D11F15"),
("D19", "D19F00", "D19F15"),
("D20", "D20F00", "D20F15"),
("D5", "D5F00", "D5F15"),
("D23", "D23F00", "D23F15"),
]
for ds_code, f00_col, f15_col in ds_pairs:
f00 = acc.get(f00_col)
f15 = acc.get(f15_col)
if f00 is None and f15 is None:
continue
label = DATASOURCE_LABELS.get(ds_code, ds_code)
print(f" {label}:")
print(f" Status : {fmt_status(f00)}")
print(f" Last session: {fmt_ts(f15)}")
# Step 1: Login
print_section("Step 1: Authentication")
try:
visa, partner_id = login(url, partner, username, password)
except Exception as exc:
print(f" ❌ Login failed: {exc}")
sys.exit(1)
# Summary
print_header("Summary")
status_counts: dict[str, int] = {}
for acc in accounts:
s = acc.get("D09F00")
bc = STATUS_MAP.get(int(s), "Unknown") if s is not None else "No data"
status_counts[bc] = status_counts.get(bc, 0) + 1
if partner_id_override:
partner_id = partner_id_override
print(f" Using partner_id override: {partner_id}")
# Step 2: Test column sets
print_section("Step 2: Column Code Tests")
results = {}
for name, info in COLUMN_SETS.items():
try:
results[name] = test_column_set(url, visa, partner_id, name, info)
except Exception as exc:
print(f" ❌ Exception: {exc}")
results[name] = False
# Step 3: Summary
print_section("Step 3: Summary")
for name, ok in results.items():
icon = "" if ok else ""
print(f" {icon} {name}")
all_ok = all(results.values())
for status, count in sorted(status_counts.items()):
icon = {"Success": "", "Warning": "⚠️", "Error": ""}.get(status, " ")
print(f" {icon} {status}: {count}")
print(f"\n Total accounts: {total}")
print()
if all_ok:
print(" All column sets work! Ready for full integration.")
else:
failed = [n for n, ok in results.items() if not ok]
print(f" Some column sets failed: {', '.join(failed)}")
print(" Check error details above.")
def main() -> None:
parser = argparse.ArgumentParser(description="Test Cove Data Protection API column codes")
parser.add_argument("--url", default=os.environ.get("COVE_URL", API_URL))
parser.add_argument("--partner", default=os.environ.get("COVE_PARTNER", ""))
parser = argparse.ArgumentParser(description="Test Cove Data Protection API")
parser.add_argument("--url", default=os.environ.get("COVE_URL", API_URL))
parser.add_argument("--username", default=os.environ.get("COVE_USERNAME", ""))
parser.add_argument("--password", default=os.environ.get("COVE_PASSWORD", ""))
parser.add_argument("--partner-id", type=int, default=None, help="Override partner ID")
parser.add_argument("--records", type=int, default=50, help="Max accounts to fetch")
args = parser.parse_args()
if not args.partner or not args.username or not args.password:
print("Error: --partner, --username and --password are required.")
print("Or set COVE_PARTNER, COVE_USERNAME, COVE_PASSWORD environment variables.")
if not args.username or not args.password:
print("Error: --username and --password are required.")
print("Or set COVE_USERNAME and COVE_PASSWORD environment variables.")
sys.exit(1)
run_tests(args.url, args.partner, args.username, args.password, args.partner_id)
run(args.url, args.username, args.password, args.records)
if __name__ == "__main__":