- Add standalone cove_api_test.py to verify new D9Fxx/D10Fxx/D11Fxx column codes - D02/D03 confirmed as legacy by N-able support; D9/D10/D11 should work - Document session status codes (F00) and timestamp fields (F09/F15/F18) - Update TODO and knowledge docs with breakthrough status Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
310 lines
9.9 KiB
Python
310 lines
9.9 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Cove Data Protection API – Test Script
|
||
=======================================
|
||
Tests the new column codes (D9Fxx, D10Fxx, D11Fxx) after feedback from N-able support.
|
||
Verifies which columns are available for backup status monitoring.
|
||
|
||
Usage:
|
||
python3 cove_api_test.py \
|
||
--partner "YourPartnerName" \
|
||
--username "api-user" \
|
||
--password "secret" \
|
||
[--url https://api.backup.management/jsonapi] \
|
||
[--partner-id 12345]
|
||
|
||
Or via environment variables:
|
||
COVE_PARTNER=... COVE_USERNAME=... COVE_PASSWORD=... python3 cove_api_test.py
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import sys
|
||
from datetime import datetime, timezone
|
||
|
||
import requests
|
||
|
||
# Default JSON-RPC endpoint used when neither --url nor COVE_URL is given.
API_URL = "https://api.backup.management/jsonapi"

# Numeric session status codes (columns D9F00 / F17 / F16) -> readable label.
# Code 4 is intentionally absent: it is not part of the documented list here.
SESSION_STATUS = {
    1: "In process",
    2: "Failed",
    3: "Aborted",
    5: "Completed",
    6: "Interrupted",
    7: "NotStarted",
    8: "CompletedWithErrors",
    9: "InProgressWithFaults",
    10: "OverQuota",
    11: "NoSelection",
    12: "Restarted",
}


def _data_source_columns(source: int) -> list[str]:
    """Return the identity columns plus the core D<source>Fxx columns."""
    prefix = f"D{source}F"
    return [
        "I1",
        "I18",
        prefix + "00",  # Last Session Status
        prefix + "09",  # Last Successful Session Timestamp
        prefix + "15",  # Last Session Timestamp
    ]


# Column sets to test, probed in insertion order. Each entry carries a
# human-readable "description" and the list of "columns" to request.
COLUMN_SETS = {
    "safe_legacy": {
        "description": "Previously confirmed working (from original testing)",
        "columns": ["I1", "I14", "I18", "D01F00", "D01F01", "D09F00"],
    },
    "status_and_timestamps": {
        "description": "Core backup status and timestamps (D9Fxx – new format)",
        "columns": [
            "I1", "I14", "I18",
            "D9F00",  # Last Session Status
            "D9F06",  # Last Session Errors Count
            "D9F09",  # Last Successful Session Timestamp
            "D9F12",  # Session Duration
            "D9F15",  # Last Session Timestamp
            "D9F17",  # Last Completed Session Status
            "D9F18",  # Last Completed Session Timestamp
        ],
    },
    "d10_vss_mssql": {
        "description": "VssMsSql data source (replaces legacy D02/D03)",
        "columns": _data_source_columns(10),
    },
    "d11_vss_sharepoint": {
        "description": "VssSharePoint data source (replaces legacy D02/D03)",
        "columns": _data_source_columns(11),
    },
    "d1_files_folders": {
        "description": "Files and Folders data source",
        "columns": _data_source_columns(1),
    },
    "full_set": {
        "description": "Full set for Backupchecks integration (if all above work)",
        # D9F00 .. D9F18 without D9F14, which the original list does not request.
        "columns": [
            "I1", "I14", "I18",
            *(f"D9F{field:02d}" for field in range(19) if field != 14),
        ],
    },
}
|
||
|
||
|
||
def _post(url: str, payload: dict, timeout: int = 30) -> dict:
    """POST *payload* as a JSON body to *url* and return the decoded JSON reply.

    ``raise_for_status`` turns a 4xx/5xx HTTP answer into an exception, so a
    returned dict always comes from a successful HTTP exchange.
    """
    response = requests.post(
        url,
        json=payload,
        headers={"Content-Type": "application/json"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
|
||
|
||
|
||
def login(url: str, partner: str, username: str, password: str) -> tuple[str, int]:
    """Authenticate against the API and return ``(visa, partner_id)``.

    Sends a JSON-RPC ``Login`` call and extracts the top-level ``visa`` token
    and ``result.result.PartnerId`` from the reply.

    Raises:
        RuntimeError: if the reply carries an error, has no visa token, or
            does not contain a PartnerId at the expected location.
    """
    payload = {
        "jsonrpc": "2.0",
        "method": "Login",
        "params": {
            "partner": partner,
            "username": username,
            "password": password,
        },
        "id": "1",
    }
    data = _post(url, payload)

    # Test the value, not just the key: a present-but-null "error" member
    # is not a failure.
    error = data.get("error")
    if error:
        raise RuntimeError(f"Login failed: {error}")

    visa = data.get("visa")
    if not visa:
        raise RuntimeError(f"No visa token in response: {data}")

    try:
        partner_id = data["result"]["result"]["PartnerId"]
    except (KeyError, TypeError) as exc:
        # Report only the "result" member so the visa token obtained above is
        # not echoed to the console.
        raise RuntimeError(
            f"No PartnerId in login response: {data.get('result')!r}"
        ) from exc

    print(f" Login OK – PartnerId={partner_id}")
    return visa, partner_id
|
||
|
||
|
||
def enumerate_statistics(
    url: str,
    visa: str,
    partner_id: int,
    columns: list[str],
    *,
    records_count: int = 20,
    start_record: int = 0,
) -> dict:
    """Call ``EnumerateAccountStatistics`` for the given column codes.

    Args:
        url: JSON-RPC endpoint.
        visa: Token returned by ``login``; sent as the top-level ``visa`` member.
        partner_id: Partner whose accounts are queried.
        columns: Column codes to request (e.g. ``"I1"``, ``"D9F00"``).
        records_count: Value sent as ``RecordsCount`` (default 20, the value
            this script has always used).
        start_record: Value sent as ``StartRecordNumber`` (default 0).

    Returns:
        The decoded JSON reply, unmodified. Error replies are returned as-is;
        the caller is expected to inspect the ``error`` member.
    """
    payload = {
        "jsonrpc": "2.0",
        "method": "EnumerateAccountStatistics",
        "visa": visa,
        "params": {
            "query": {
                "PartnerId": partner_id,
                "SelectionMode": "Merged",
                "StartRecordNumber": start_record,
                "RecordsCount": records_count,
                "Columns": columns,
            }
        },
        "id": "2",
    }
    return _post(url, payload)
|
||
|
||
|
||
def format_timestamp(value) -> str:
    """Convert a Unix timestamp (seconds) to ``YYYY-MM-DD HH:MM:SS UTC``.

    Returns ``"(empty)"`` for ``None`` and for anything that converts to 0.
    Values that cannot be interpreted as a timestamp (non-numeric text,
    unsupported types, out-of-range numbers) are returned as ``str(value)``
    so the caller can always print the result.
    """
    if value is None:
        return "(empty)"
    try:
        ts = int(value)
        if ts == 0:
            return "(empty)"
        dt = datetime.fromtimestamp(ts, tz=timezone.utc)
    except (ValueError, TypeError, OverflowError, OSError):
        # OverflowError: int(float("inf")) and timestamps beyond the platform
        # time_t range raise it; without it a single odd value would abort
        # the whole column-set test.
        return str(value)
    return dt.strftime("%Y-%m-%d %H:%M:%S UTC")
|
||
|
||
|
||
def format_status(value) -> str:
    """Render a numeric session status as ``"<code> – <label>"``.

    ``None`` becomes ``"(empty)"``; a value that ``int()`` rejects is returned
    as ``str(value)``; a code missing from ``SESSION_STATUS`` is labelled
    ``Unknown(<code>)``.
    """
    if value is None:
        return "(empty)"

    try:
        status_code = int(value)
    except (ValueError, TypeError):
        return str(value)

    if status_code in SESSION_STATUS:
        description = SESSION_STATUS[status_code]
    else:
        description = f"Unknown({status_code})"
    return f"{status_code} – {description}"
|
||
|
||
|
||
def print_section(title: str) -> None:
    """Print a blank line, then *title* framed by two 60-character ``=`` rules."""
    rule = "=" * 60
    # One call, newline-separated: blank line, rule, title line, rule.
    print("", rule, f" {title}", rule, sep="\n")
|
||
|
||
|
||
# Field suffixes (the "xx" in D<source>Fxx) that hold Unix timestamps and
# session status codes respectively.
_TIMESTAMP_FIELDS = ("09", "15", "18")
_STATUS_FIELDS = ("00", "17", "16")


def _format_column_value(col: str, val) -> str:
    """Return a printable form of *val* for column code *col*."""
    if col.startswith("D"):
        # Split "D<source>F<field>" on the first "F". Matching on the field
        # suffix (rather than a fixed "D9F" prefix and slice) also covers
        # multi-digit sources such as D10Fxx / D11Fxx.
        # NOTE(review): zero-padded legacy codes (D01F00, D09F00) are matched
        # too, on the assumption that their fields mean the same — confirm.
        source, _, field = col[1:].partition("F")
        if source.isdigit():
            if field in _TIMESTAMP_FIELDS:
                return format_timestamp(val)
            if field in _STATUS_FIELDS:
                return format_status(val)

    if val is None:
        return "(empty)"

    if col == "I14":
        # Storage bytes: show the raw value plus a GB conversion.
        try:
            gb = int(val) / (1024 ** 3)
        except (ValueError, TypeError):
            return str(val)
        return f"{val} bytes ({gb:.2f} GB)"

    return str(val)


def test_column_set(url: str, visa: str, partner_id: int, name: str, info: dict) -> bool:
    """Request one column set and print the outcome.

    Args:
        url: JSON-RPC endpoint.
        visa: Token returned by ``login``.
        partner_id: Partner whose accounts are queried.
        name: Key of the column set, used only for display.
        info: Dict with a ``"description"`` string and a ``"columns"`` list.

    Returns:
        False when the reply carries an error, True otherwise (including a
        null or unexpectedly shaped result, which is reported but not treated
        as a failure).
    """
    print(f"\n[TEST] {name}")
    print(f" {info['description']}")
    print(f" Columns: {', '.join(info['columns'])}")

    data = enumerate_statistics(url, visa, partner_id, info["columns"])

    err = data.get("error")
    if err:
        # The error member is normally an object, but do not crash on a
        # plain string or number.
        if isinstance(err, dict):
            code = err.get("code", "?")
            msg = err.get("message", str(err))
        else:
            code, msg = "?", str(err)
        print(f" ❌ FAILED – error {code}: {msg}")
        return False

    result = data.get("result")
    if result is None:
        print(" ⚠️ result is null (empty? no accounts?)")
        return True  # Not a security error, just empty

    # NOTE(review): login() reads data["result"]["result"], so this endpoint
    # may also nest its payload one level deeper than "Accounts" — confirm
    # against the API documentation.
    if isinstance(result, list):
        accounts = result
    elif isinstance(result, dict):
        accounts = result.get("Accounts", [])
    else:
        accounts = None

    if not isinstance(accounts, list):
        # Might be a dict with stats, or some other payload shape.
        print(f" ✅ SUCCESS – response type: {type(result).__name__}")
        print(f" Raw (truncated): {json.dumps(result)[:300]}")
        return True

    print(f" ✅ SUCCESS – {len(accounts)} account(s) returned")

    # Show the first 3 accounts with parsed values.
    for i, account in enumerate(accounts[:3]):
        print(f"\n Account {i + 1}:")
        if not isinstance(account, dict):
            # Unexpected record shape: show it raw instead of failing the test.
            print(f" {account!r}")
            continue
        for col in info["columns"]:
            display = _format_column_value(col, account.get(col))
            print(f" {col:12s} = {display}")

    if len(accounts) > 3:
        print(f" ... and {len(accounts) - 3} more")

    return True
|
||
|
||
|
||
def run_tests(url: str, partner: str, username: str, password: str, partner_id_override: int | None) -> None:
    """Log in, probe every entry of ``COLUMN_SETS`` and print a summary.

    Args:
        url: JSON-RPC endpoint.
        partner: Partner name used for the login call.
        username: API user name.
        password: API user password.
        partner_id_override: When not ``None``, used instead of the PartnerId
            returned by the login call.

    Exits the process with status 1 if the login fails. A failing column set
    does not stop the run; it is recorded and listed in the summary.
    """
    print_section("Cove Data Protection API – Column Code Test")
    print(f" URL: {url}")
    print(f" Partner: {partner}")
    print(f" User: {username}")

    # Step 1: Login
    print_section("Step 1: Authentication")
    try:
        visa, partner_id = login(url, partner, username, password)
    except Exception as exc:
        print(f" ❌ Login failed: {exc}")
        sys.exit(1)

    # Compare against None so that an explicit override of 0 is honoured
    # instead of being silently dropped by a truthiness test.
    if partner_id_override is not None:
        partner_id = partner_id_override
        print(f" Using partner_id override: {partner_id}")

    # Step 2: Test column sets
    print_section("Step 2: Column Code Tests")

    results = {}
    for name, info in COLUMN_SETS.items():
        # Top-level boundary: any failure (HTTP, JSON, unexpected payload)
        # marks this set as failed and the loop moves on to the next one.
        try:
            results[name] = test_column_set(url, visa, partner_id, name, info)
        except Exception as exc:
            print(f" ❌ Exception: {exc}")
            results[name] = False

    # Step 3: Summary
    print_section("Step 3: Summary")
    for name, ok in results.items():
        icon = "✅" if ok else "❌"
        print(f" {icon} {name}")

    print()
    if all(results.values()):
        print(" All column sets work! Ready for full integration.")
    else:
        failed = [name for name, ok in results.items() if not ok]
        print(f" Some column sets failed: {', '.join(failed)}")
        print(" Check error details above.")
|
||
|
||
|
||
def main() -> None:
    """Parse CLI arguments (with environment fallbacks) and run the tests.

    ``--url`` falls back to ``COVE_URL`` and then ``API_URL``; the credential
    options fall back to ``COVE_PARTNER`` / ``COVE_USERNAME`` /
    ``COVE_PASSWORD``. Exits with status 1 when any credential is empty.
    """
    env = os.environ
    parser = argparse.ArgumentParser(description="Test Cove Data Protection API column codes")
    parser.add_argument("--url", default=env.get("COVE_URL", API_URL))
    credential_options = (
        ("--partner", "COVE_PARTNER"),
        ("--username", "COVE_USERNAME"),
        ("--password", "COVE_PASSWORD"),
    )
    for flag, variable in credential_options:
        parser.add_argument(flag, default=env.get(variable, ""))
    parser.add_argument("--partner-id", type=int, default=None, help="Override partner ID")
    args = parser.parse_args()

    credentials = (args.partner, args.username, args.password)
    if not all(credentials):
        print("Error: --partner, --username and --password are required.")
        print("Or set COVE_PARTNER, COVE_USERNAME, COVE_PASSWORD environment variables.")
        sys.exit(1)

    run_tests(args.url, *credentials, args.partner_id)


if __name__ == "__main__":
    main()
|