Auto-commit local changes before build (2026-01-15 10:12:09)

This commit is contained in:
Ivo Oskamp 2026-01-15 10:12:09 +01:00
parent 48e7830957
commit f6310da575
7 changed files with 361 additions and 2 deletions

View File

@ -1 +1 @@
v20260113-08-vspc-object-linking-normalize v20260115-01-autotask-settings

View File

@ -0,0 +1,129 @@
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlencode
import requests
@dataclass
class AutotaskZoneInfo:
zone_name: str
api_url: str
web_url: Optional[str] = None
ci: Optional[int] = None
class AutotaskError(RuntimeError):
pass
class AutotaskClient:
def __init__(
self,
username: str,
password: str,
api_integration_code: str,
environment: str = "production",
timeout_seconds: int = 30,
) -> None:
self.username = username
self.password = password
self.api_integration_code = api_integration_code
self.environment = (environment or "production").strip().lower()
self.timeout_seconds = timeout_seconds
self._zone_info: Optional[AutotaskZoneInfo] = None
def _zoneinfo_base(self) -> str:
# Production zone lookup endpoint: webservices.autotask.net
# Sandbox is typically pre-release: webservices2.autotask.net
if self.environment == "sandbox":
return "https://webservices2.autotask.net/atservicesrest"
return "https://webservices.autotask.net/atservicesrest"
def get_zone_info(self) -> AutotaskZoneInfo:
if self._zone_info is not None:
return self._zone_info
url = f"{self._zoneinfo_base().rstrip('/')}/v1.0/zoneInformation"
params = {"user": self.username}
try:
resp = requests.get(url, params=params, timeout=self.timeout_seconds)
except Exception as exc:
raise AutotaskError(f"ZoneInformation request failed: {exc}") from exc
if resp.status_code >= 400:
raise AutotaskError(f"ZoneInformation request failed (HTTP {resp.status_code}).")
try:
data = resp.json()
except Exception as exc:
raise AutotaskError("ZoneInformation response is not valid JSON.") from exc
zone = AutotaskZoneInfo(
zone_name=str(data.get("zoneName") or ""),
api_url=str(data.get("url") or "").rstrip("/"),
web_url=(str(data.get("webUrl") or "").rstrip("/") or None),
ci=(int(data["ci"]) if str(data.get("ci") or "").isdigit() else None),
)
if not zone.api_url:
raise AutotaskError("ZoneInformation did not return an API URL.")
self._zone_info = zone
return zone
def _headers(self) -> Dict[str, str]:
# Autotask REST API requires the APIIntegrationcode header for API-only users.
return {
"APIIntegrationcode": self.api_integration_code,
"Content-Type": "application/json",
"Accept": "application/json",
}
def _request(self, method: str, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
zone = self.get_zone_info()
base = zone.api_url.rstrip("/")
url = f"{base}/v1.0/{path.lstrip('/')}"
try:
resp = requests.request(
method=method.upper(),
url=url,
headers=self._headers(),
params=params or None,
auth=(self.username, self.password),
timeout=self.timeout_seconds,
)
except Exception as exc:
raise AutotaskError(f"Request failed: {exc}") from exc
if resp.status_code == 401:
raise AutotaskError("Authentication failed (HTTP 401). Check username/password and APIIntegrationcode.")
if resp.status_code == 403:
raise AutotaskError("Access forbidden (HTTP 403). API user permissions may be insufficient.")
if resp.status_code == 404:
raise AutotaskError(f"Resource not found (HTTP 404) for path: {path}")
if resp.status_code >= 400:
raise AutotaskError(f"Autotask API error (HTTP {resp.status_code}).")
try:
return resp.json()
except Exception as exc:
raise AutotaskError("Autotask API response is not valid JSON.") from exc
def _query_all_first_page(self, entity_name: str) -> List[Dict[str, Any]]:
# Use a simple 'exist' filter on id to return the first page (up to 500 items).
search = {"filter": [{"op": "exist", "field": "id"}]}
params = {"search": json.dumps(search)}
data = self._request("GET", f"{entity_name}/query", params=params)
items = data.get("items") or []
if not isinstance(items, list):
return []
return items
def get_queues(self) -> List[Dict[str, Any]]:
return self._query_all_first_page("Queues")
def get_ticket_sources(self) -> List[Dict[str, Any]]:
return self._query_all_first_page("TicketSources")

View File

@ -1,5 +1,7 @@
from .routes_shared import * # noqa: F401,F403 from .routes_shared import * # noqa: F401,F403
from .routes_shared import _get_database_size_bytes, _get_or_create_settings, _format_bytes, _get_free_disk_bytes, _log_admin_event from .routes_shared import _get_database_size_bytes, _get_or_create_settings, _format_bytes, _get_free_disk_bytes, _log_admin_event
import json
from datetime import datetime
@main_bp.route("/settings/jobs/delete-all", methods=["POST"]) @main_bp.route("/settings/jobs/delete-all", methods=["POST"])
@login_required @login_required
@ -430,6 +432,61 @@ def settings():
if "ui_timezone" in request.form: if "ui_timezone" in request.form:
settings.ui_timezone = (request.form.get("ui_timezone") or "").strip() or "Europe/Amsterdam" settings.ui_timezone = (request.form.get("ui_timezone") or "").strip() or "Europe/Amsterdam"
# Autotask integration
if "autotask_enabled" in request.form:
settings.autotask_enabled = bool(request.form.get("autotask_enabled"))
if "autotask_environment" in request.form:
env_val = (request.form.get("autotask_environment") or "").strip().lower()
if env_val in ("sandbox", "production"):
settings.autotask_environment = env_val
else:
settings.autotask_environment = None
if "autotask_api_username" in request.form:
settings.autotask_api_username = (request.form.get("autotask_api_username") or "").strip() or None
if "autotask_api_password" in request.form:
pw = (request.form.get("autotask_api_password") or "").strip()
if pw:
settings.autotask_api_password = pw
if "autotask_tracking_identifier" in request.form:
settings.autotask_tracking_identifier = (request.form.get("autotask_tracking_identifier") or "").strip() or None
if "autotask_base_url" in request.form:
settings.autotask_base_url = (request.form.get("autotask_base_url") or "").strip() or None
if "autotask_default_queue_id" in request.form:
try:
settings.autotask_default_queue_id = int(request.form.get("autotask_default_queue_id") or 0) or None
except (ValueError, TypeError):
pass
if "autotask_default_ticket_source_id" in request.form:
try:
settings.autotask_default_ticket_source_id = int(request.form.get("autotask_default_ticket_source_id") or 0) or None
except (ValueError, TypeError):
pass
if "autotask_default_ticket_status" in request.form:
try:
settings.autotask_default_ticket_status = int(request.form.get("autotask_default_ticket_status") or 0) or None
except (ValueError, TypeError):
pass
if "autotask_priority_warning" in request.form:
try:
settings.autotask_priority_warning = int(request.form.get("autotask_priority_warning") or 0) or None
except (ValueError, TypeError):
pass
if "autotask_priority_error" in request.form:
try:
settings.autotask_priority_error = int(request.form.get("autotask_priority_error") or 0) or None
except (ValueError, TypeError):
pass
# Daily Jobs # Daily Jobs
if "daily_jobs_start_date" in request.form: if "daily_jobs_start_date" in request.form:
daily_jobs_start_date_str = (request.form.get("daily_jobs_start_date") or "").strip() daily_jobs_start_date_str = (request.form.get("daily_jobs_start_date") or "").strip()
@ -537,6 +594,7 @@ def settings():
free_disk_warning = free_disk_bytes < two_gb free_disk_warning = free_disk_bytes < two_gb
has_client_secret = bool(settings.graph_client_secret) has_client_secret = bool(settings.graph_client_secret)
has_autotask_password = bool(getattr(settings, "autotask_api_password", None))
# Common UI timezones (IANA names) # Common UI timezones (IANA names)
tz_options = [ tz_options = [
@ -595,6 +653,23 @@ def settings():
except Exception: except Exception:
admin_users_count = 0 admin_users_count = 0
# Autotask cached reference data for dropdowns
autotask_queues = []
autotask_ticket_sources = []
autotask_last_sync_at = getattr(settings, "autotask_reference_last_sync_at", None)
try:
if getattr(settings, "autotask_cached_queues_json", None):
autotask_queues = json.loads(settings.autotask_cached_queues_json) or []
except Exception:
autotask_queues = []
try:
if getattr(settings, "autotask_cached_ticket_sources_json", None):
autotask_ticket_sources = json.loads(settings.autotask_cached_ticket_sources_json) or []
except Exception:
autotask_ticket_sources = []
return render_template( return render_template(
"main/settings.html", "main/settings.html",
settings=settings, settings=settings,
@ -602,10 +677,14 @@ def settings():
free_disk_human=free_disk_human, free_disk_human=free_disk_human,
free_disk_warning=free_disk_warning, free_disk_warning=free_disk_warning,
has_client_secret=has_client_secret, has_client_secret=has_client_secret,
has_autotask_password=has_autotask_password,
tz_options=tz_options, tz_options=tz_options,
users=users, users=users,
admin_users_count=admin_users_count, admin_users_count=admin_users_count,
section=section, section=section,
autotask_queues=autotask_queues,
autotask_ticket_sources=autotask_ticket_sources,
autotask_last_sync_at=autotask_last_sync_at,
news_admin_items=news_admin_items, news_admin_items=news_admin_items,
news_admin_stats=news_admin_stats, news_admin_stats=news_admin_stats,
) )
@ -1172,3 +1251,90 @@ def settings_folders():
except Exception: except Exception:
pass pass
return jsonify({"status": "error", "message": str(exc) or "Failed to load folders."}), 500 return jsonify({"status": "error", "message": str(exc) or "Failed to load folders."}), 500
@main_bp.route("/settings/autotask/test-connection", methods=["POST"])
@login_required
@roles_required("admin")
def settings_autotask_test_connection():
settings = _get_or_create_settings()
if not settings.autotask_api_username or not settings.autotask_api_password or not settings.autotask_tracking_identifier:
flash("Autotask settings incomplete. Provide username, password and tracking identifier first.", "warning")
return redirect(url_for("main.settings", section="integrations"))
try:
from ..integrations.autotask.client import AutotaskClient
client = AutotaskClient(
username=settings.autotask_api_username,
password=settings.autotask_api_password,
api_integration_code=settings.autotask_tracking_identifier,
environment=(settings.autotask_environment or "production"),
)
zone = client.get_zone_info()
# Lightweight authenticated call to validate credentials
_ = client.get_ticket_sources()
flash(f"Autotask connection OK. Zone: {zone.zone_name or 'unknown'}.", "success")
_log_admin_event("autotask_test_connection", details={"zone": zone.zone_name, "api_url": zone.api_url})
except Exception as exc:
flash(f"Autotask connection failed: {exc}", "danger")
_log_admin_event("autotask_test_connection_failed", details={"error": str(exc)})
return redirect(url_for("main.settings", section="integrations"))
@main_bp.route("/settings/autotask/refresh-reference-data", methods=["POST"])
@login_required
@roles_required("admin")
def settings_autotask_refresh_reference_data():
settings = _get_or_create_settings()
if not settings.autotask_api_username or not settings.autotask_api_password or not settings.autotask_tracking_identifier:
flash("Autotask settings incomplete. Provide username, password and tracking identifier first.", "warning")
return redirect(url_for("main.settings", section="integrations"))
try:
from ..integrations.autotask.client import AutotaskClient
client = AutotaskClient(
username=settings.autotask_api_username,
password=settings.autotask_api_password,
api_integration_code=settings.autotask_tracking_identifier,
environment=(settings.autotask_environment or "production"),
)
queues = client.get_queues()
sources = client.get_ticket_sources()
# Store a minimal subset for dropdowns (id + name/label)
def _norm(items):
out = []
for it in items or []:
if not isinstance(it, dict):
continue
_id = it.get("id")
name = it.get("name") or it.get("label") or it.get("queueName") or it.get("sourceName") or it.get("description") or ""
try:
_id_int = int(_id)
except Exception:
continue
out.append({"id": _id_int, "name": str(name)})
# Sort by name for stable dropdowns
out.sort(key=lambda x: (x.get("name") or "").lower())
return out
settings.autotask_cached_queues_json = json.dumps(_norm(queues))
settings.autotask_cached_ticket_sources_json = json.dumps(_norm(sources))
settings.autotask_reference_last_sync_at = datetime.utcnow()
db.session.commit()
flash(f"Autotask reference data refreshed. Queues: {len(queues)}. Ticket Sources: {len(sources)}.", "success")
_log_admin_event(
"autotask_refresh_reference_data",
details={"queues": len(queues or []), "ticket_sources": len(sources or [])},
)
except Exception as exc:
flash(f"Failed to refresh Autotask reference data: {exc}", "danger")
_log_admin_event("autotask_refresh_reference_data_failed", details={"error": str(exc)})
return redirect(url_for("main.settings", section="integrations"))

View File

@ -127,6 +127,47 @@ def migrate_system_settings_ui_timezone() -> None:
except Exception as exc: except Exception as exc:
print(f"[migrations] Failed to migrate system_settings.ui_timezone: {exc}") print(f"[migrations] Failed to migrate system_settings.ui_timezone: {exc}")
def migrate_system_settings_autotask_integration() -> None:
"""Add Autotask integration columns to system_settings if missing."""
table = "system_settings"
columns = [
("autotask_enabled", "BOOLEAN NOT NULL DEFAULT FALSE"),
("autotask_environment", "VARCHAR(32) NULL"),
("autotask_api_username", "VARCHAR(255) NULL"),
("autotask_api_password", "VARCHAR(255) NULL"),
("autotask_tracking_identifier", "VARCHAR(255) NULL"),
("autotask_base_url", "VARCHAR(512) NULL"),
("autotask_default_queue_id", "INTEGER NULL"),
("autotask_default_ticket_source_id", "INTEGER NULL"),
("autotask_default_ticket_status", "INTEGER NULL"),
("autotask_priority_warning", "INTEGER NULL"),
("autotask_priority_error", "INTEGER NULL"),
("autotask_cached_queues_json", "TEXT NULL"),
("autotask_cached_ticket_sources_json", "TEXT NULL"),
("autotask_reference_last_sync_at", "TIMESTAMP NULL"),
]
try:
engine = db.get_engine()
except Exception as exc:
print(f"[migrations] Could not get engine for system_settings autotask migration: {exc}")
return
try:
with engine.begin() as conn:
for column, ddl in columns:
if _column_exists_on_conn(conn, table, column):
continue
conn.execute(text(f'ALTER TABLE "{table}" ADD COLUMN {column} {ddl}'))
print("[migrations] migrate_system_settings_autotask_integration completed.")
except Exception as exc:
print(f"[migrations] Failed to migrate system_settings autotask integration columns: {exc}")
def migrate_mail_messages_columns() -> None: def migrate_mail_messages_columns() -> None:

View File

@ -107,6 +107,26 @@ class SystemSettings(db.Model):
# UI display timezone (IANA name). Used for rendering times in the web interface. # UI display timezone (IANA name). Used for rendering times in the web interface.
ui_timezone = db.Column(db.String(64), nullable=False, default="Europe/Amsterdam") ui_timezone = db.Column(db.String(64), nullable=False, default="Europe/Amsterdam")
# Autotask integration settings
autotask_enabled = db.Column(db.Boolean, nullable=False, default=False)
autotask_environment = db.Column(db.String(32), nullable=True) # sandbox | production
autotask_api_username = db.Column(db.String(255), nullable=True)
autotask_api_password = db.Column(db.String(255), nullable=True)
autotask_tracking_identifier = db.Column(db.String(255), nullable=True)
autotask_base_url = db.Column(db.String(512), nullable=True) # Backupchecks base URL for deep links
# Autotask defaults (IDs are leading)
autotask_default_queue_id = db.Column(db.Integer, nullable=True)
autotask_default_ticket_source_id = db.Column(db.Integer, nullable=True)
autotask_default_ticket_status = db.Column(db.Integer, nullable=True)
autotask_priority_warning = db.Column(db.Integer, nullable=True)
autotask_priority_error = db.Column(db.Integer, nullable=True)
# Cached reference data (for dropdowns)
autotask_cached_queues_json = db.Column(db.Text, nullable=True)
autotask_cached_ticket_sources_json = db.Column(db.Text, nullable=True)
autotask_reference_last_sync_at = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
updated_at = db.Column( updated_at = db.Column(
db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False

View File

@ -20,6 +20,9 @@
<li class="nav-item"> <li class="nav-item">
<a class="nav-link {% if section == 'imports' %}active{% endif %}" href="{{ url_for('main.settings', section='imports') }}">Imports</a> <a class="nav-link {% if section == 'imports' %}active{% endif %}" href="{{ url_for('main.settings', section='imports') }}">Imports</a>
</li> </li>
<li class="nav-item">
<a class="nav-link {% if section == 'integrations' %}active{% endif %}" href="{{ url_for('main.settings', section='integrations') }}">Integrations</a>
</li>
<li class="nav-item"> <li class="nav-item">
<a class="nav-link {% if section == 'maintenance' %}active{% endif %}" href="{{ url_for('main.settings', section='maintenance') }}">Maintenance</a> <a class="nav-link {% if section == 'maintenance' %}active{% endif %}" href="{{ url_for('main.settings', section='maintenance') }}">Maintenance</a>
</li> </li>