backupchecks/containers/backupchecks/src/backend/app/main/routes_settings.py

1458 lines
55 KiB
Python

from .routes_shared import * # noqa: F401,F403
from .routes_shared import _get_database_size_bytes, _get_or_create_settings, _format_bytes, _get_free_disk_bytes, _log_admin_event
import json
from datetime import datetime
@main_bp.route("/settings/jobs/delete-all", methods=["POST"])
@login_required
@roles_required("admin")
def settings_jobs_delete_all():
    """Delete every approved job, its runs, and auxiliary FK rows.

    Related mail messages are returned to the inbox (and unlinked from their
    job) before deletion. Auxiliary link tables that may lack ON DELETE
    CASCADE on older schemas are cleaned up with raw SQL first; some of those
    deletes are best-effort because the tables differ between schema versions.
    Redirects back to the Settings page in all cases.
    """
    try:
        jobs = Job.query.all()
        if not jobs:
            flash("No jobs to delete.", "info")
            return redirect(url_for("main.settings", section="general"))
        # Collect run ids for FK cleanup in auxiliary tables that may not have ON DELETE CASCADE
        run_ids = []
        mail_message_ids = []
        for job in jobs:
            for run in job.runs:
                if run.id is not None:
                    run_ids.append(run.id)
                if run.mail_message_id:
                    mail_message_ids.append(run.mail_message_id)
        # Return related mails back to inbox and unlink from job
        if mail_message_ids:
            msgs = MailMessage.query.filter(MailMessage.id.in_(mail_message_ids)).all()
            for msg in msgs:
                # Older schemas may not have a "location" column on MailMessage.
                if hasattr(msg, "location"):
                    msg.location = "inbox"
                msg.job_id = None

        def _safe_execute(stmt, params):
            # Best-effort variant of db.session.execute: swallow errors so
            # cleanup of tables that may not exist does not abort the delete.
            try:
                db.session.execute(stmt, params)
            except Exception as cleanup_exc:
                # Best-effort cleanup for differing DB schemas
                print(f"[settings-jobs] Cleanup skipped: {cleanup_exc}")

        # Ensure run_object_links doesn't block job_runs deletion (older schemas may miss ON DELETE CASCADE)
        if run_ids:
            db.session.execute(
                text("DELETE FROM run_object_links WHERE run_id IN :run_ids").bindparams(
                    bindparam("run_ids", expanding=True)
                ),
                {"run_ids": run_ids},
            )
        # Ensure job_object_links doesn't block jobs deletion (older schemas may miss ON DELETE CASCADE)
        job_ids = [j.id for j in jobs]
        if job_ids:
            db.session.execute(
                text("DELETE FROM job_object_links WHERE job_id IN :job_ids").bindparams(
                    bindparam("job_ids", expanding=True)
                ),
                {"job_ids": job_ids},
            )
        # Clean up auxiliary FK tables that may reference job_runs/jobs without ON DELETE CASCADE (older schemas)
        if run_ids:
            _safe_execute(
                text("DELETE FROM remark_job_runs WHERE job_run_id IN :run_ids").bindparams(
                    bindparam("run_ids", expanding=True)
                ),
                {"run_ids": run_ids},
            )
            _safe_execute(
                text("DELETE FROM ticket_job_runs WHERE job_run_id IN :run_ids").bindparams(
                    bindparam("run_ids", expanding=True)
                ),
                {"run_ids": run_ids},
            )
            # Some schemas use remark_scopes for per-run remarks
            _safe_execute(
                text("DELETE FROM remark_scopes WHERE job_run_id IN :run_ids").bindparams(
                    bindparam("run_ids", expanding=True)
                ),
                {"run_ids": run_ids},
            )
        if job_ids:
            # ticket_scopes.job_id is a FK without ON DELETE CASCADE in some schemas
            _safe_execute(
                text("DELETE FROM ticket_scopes WHERE job_id IN :job_ids").bindparams(
                    bindparam("job_ids", expanding=True)
                ),
                {"job_ids": job_ids},
            )
            # Some schemas use remark_scopes for per-job remarks
            _safe_execute(
                text("DELETE FROM remark_scopes WHERE job_id IN :job_ids").bindparams(
                    bindparam("job_ids", expanding=True)
                ),
                {"job_ids": job_ids},
            )
            # Overrides may reference jobs directly
            _safe_execute(
                text("DELETE FROM overrides WHERE job_id IN :job_ids").bindparams(
                    bindparam("job_ids", expanding=True)
                ),
                {"job_ids": job_ids},
            )
        # Delete all jobs (runs/objects are cascaded via ORM relationships)
        for job in jobs:
            db.session.delete(job)
        db.session.commit()
        flash("All jobs deleted. Related mails are returned to the inbox.", "success")
    except Exception as exc:
        db.session.rollback()
        print(f"[settings-jobs] Failed to delete all jobs: {exc}")
        flash("Failed to delete all jobs.", "danger")
    return redirect(url_for("main.settings"))
@main_bp.route("/settings/objects/backfill", methods=["POST"])
@login_required
@roles_required("admin")
def settings_objects_backfill():
    """Backfill object persistence tables for existing approved runs.

    This repairs cases where mail_objects exist but
    run_object_links/job_object_links/customer_objects were not created.
    Each candidate run is repaired individually via
    persist_objects_for_auto_run; per-run failures are logged and counted
    without aborting the whole backfill. Redirects back to Settings.
    """
    engine = db.get_engine()
    # Select runs that have mail_objects but no run_object_links yet
    rows = []
    try:
        with engine.begin() as conn:
            rows = conn.execute(
                text(
                    """
                    SELECT
                        jr.id AS run_id,
                        jr.job_id AS job_id,
                        j.customer_id AS customer_id,
                        jr.mail_message_id AS mail_message_id
                    FROM job_runs jr
                    JOIN jobs j ON j.id = jr.job_id
                    WHERE jr.mail_message_id IS NOT NULL
                    AND EXISTS (
                        SELECT 1 FROM mail_objects mo WHERE mo.mail_message_id = jr.mail_message_id
                    )
                    AND NOT EXISTS (
                        SELECT 1 FROM run_object_links rol WHERE rol.run_id = jr.id
                    )
                    ORDER BY jr.id DESC
                    """
                )
            ).fetchall()
    except Exception as exc:
        flash("Backfill failed while selecting runs.", "danger")
        _log_admin_event("object_backfill_error", f"Backfill select failed: {exc}")
        return redirect(url_for("main.settings", section="general"))
    total = len(rows)
    repaired_runs = 0
    repaired_objects = 0
    errors = 0
    for r in rows:
        try:
            # Row tuple order follows the SELECT above:
            # r[0]=run_id, r[1]=job_id, r[2]=customer_id, r[3]=mail_message_id.
            repaired_objects += persist_objects_for_auto_run(
                int(r[2]), int(r[1]), int(r[0]), int(r[3])
            )
            repaired_runs += 1
        except Exception as exc:
            errors += 1
            _log_admin_event(
                "object_backfill_run_error",
                f"Backfill failed for run {r[0]} (job {r[1]}, message {r[3]}): {exc}",
            )
    _log_admin_event(
        "object_backfill",
        f"Backfill finished. candidates={total}, repaired_runs={repaired_runs}, objects={repaired_objects}, errors={errors}",
    )
    if total == 0:
        flash("No runs needed backfill.", "info")
    else:
        if errors == 0:
            flash(f"Backfill complete. Repaired {repaired_runs} runs.", "success")
        else:
            flash(
                f"Backfill complete with errors. Repaired {repaired_runs} runs, errors: {errors}.",
                "warning",
            )
    return redirect(url_for("main.settings"))
@main_bp.route("/settings/jobs/export", methods=["GET"])
@login_required
@roles_required("admin")
def settings_jobs_export():
    """Download all approved jobs (plus referenced customer names) as JSON."""
    try:
        all_jobs = Job.query.all()
        # Map customer_id -> name for every job that has a named customer,
        # so the export carries stable names instead of numeric ids.
        names_by_customer = {}
        for j in all_jobs:
            if j.customer_id and j.customer and j.customer.name:
                names_by_customer[j.customer_id] = j.customer.name
        customers_sorted = sorted(names_by_customer.items(), key=lambda pair: pair[1].lower())
        job_entries = [
            {
                "customer_name": names_by_customer.get(j.customer_id),
                "from_address": getattr(j, "from_address", None),
                "backup_software": j.backup_software,
                "backup_type": j.backup_type,
                "job_name": j.job_name,
                "schedule_type": j.schedule_type,
                "schedule_days_of_week": j.schedule_days_of_week,
                "schedule_day_of_month": j.schedule_day_of_month,
                "schedule_times": j.schedule_times,
                "auto_approve": bool(j.auto_approve),
                "active": bool(j.active),
            }
            for j in all_jobs
        ]
        payload = {
            "schema": "approved_jobs_export_v1",
            "exported_at": datetime.utcnow().isoformat() + "Z",
            "counts": {"customers": len(customers_sorted), "jobs": len(job_entries)},
            "customers": [{"name": name} for _, name in customers_sorted],
            "jobs": job_entries,
        }
        filename = f"approved-jobs-export-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}.json"
        blob = json.dumps(payload, indent=2, ensure_ascii=False).encode("utf-8")
        return send_file(
            io.BytesIO(blob),
            mimetype="application/json",
            as_attachment=True,
            download_name=filename,
        )
    except Exception as exc:
        print(f"[settings-jobs] Export failed: {exc}")
        flash("Export failed.", "danger")
        return redirect(url_for("main.settings", section="general"))
@main_bp.route("/settings/jobs/import", methods=["POST"])
@login_required
@roles_required("admin")
def settings_jobs_import():
    """Import approved jobs from an uploaded JSON export file.

    Accepts files with schema ``approved_jobs_export_v1`` (as produced by
    the export route). Customers are created by name when missing; jobs are
    matched on (customer, from_address, backup_software, backup_type,
    job_name) and updated in place, otherwise created. Redirects back to
    the Settings page with a summary flash.
    """
    upload = request.files.get("jobs_file")
    if not upload or not upload.filename:
        flash("No import file was provided.", "danger")
        return redirect(url_for("main.settings", section="general"))
    try:
        raw = upload.read()
        payload = json.loads(raw.decode("utf-8"))
    except Exception:
        flash("Invalid JSON file.", "danger")
        return redirect(url_for("main.settings", section="general"))
    if not isinstance(payload, dict) or payload.get("schema") != "approved_jobs_export_v1":
        flash("Unsupported import file schema.", "danger")
        return redirect(url_for("main.settings", section="general"))
    jobs = payload.get("jobs") or []
    if not isinstance(jobs, list):
        flash("Invalid import file format (jobs).", "danger")
        return redirect(url_for("main.settings", section="general"))

    def _bool(val, default=False):
        # Lenient bool coercion for JSON values ("1"/"yes"/"on", numbers, ...).
        # Hoisted out of the per-item loop: it is loop-invariant, and the
        # original defined it anew on every iteration.
        if val is None:
            return default
        if isinstance(val, bool):
            return val
        if isinstance(val, (int, float)):
            return bool(val)
        if isinstance(val, str):
            v = val.strip().lower()
            if v in ("1", "true", "yes", "y", "on"):
                return True
            if v in ("0", "false", "no", "n", "off"):
                return False
        return default

    created_customers = 0
    created_jobs = 0
    updated_jobs = 0
    try:
        for item in jobs:
            if not isinstance(item, dict):
                continue
            customer_name = (item.get("customer_name") or "").strip()
            if not customer_name:
                # jobs without customer are allowed, but we cannot map them meaningfully
                customer = None
            else:
                customer = Customer.query.filter_by(name=customer_name).first()
                if not customer:
                    customer = Customer(name=customer_name, active=True)
                    db.session.add(customer)
                    # Flush so the new customer gets an id for the job FK below.
                    db.session.flush()
                    created_customers += 1
            backup_software = (item.get("backup_software") or "").strip() or None
            backup_type = (item.get("backup_type") or "").strip() or None
            from_address = normalize_from_address(item.get("from_address"))
            job_name = (item.get("job_name") or "").strip() or None
            # Match existing job using the same key we show in the UI
            q = Job.query
            if customer and customer.id:
                q = q.filter(Job.customer_id == customer.id)
            else:
                q = q.filter(Job.customer_id.is_(None))
            if from_address is None:
                q = q.filter(Job.from_address.is_(None))
            else:
                q = q.filter(func.lower(Job.from_address) == from_address)
            q = q.filter(Job.backup_software == backup_software)
            q = q.filter(Job.backup_type == backup_type)
            q = q.filter(Job.job_name == job_name)
            existing = q.first()
            schedule_type = (item.get("schedule_type") or "").strip() or None
            schedule_days_of_week = (item.get("schedule_days_of_week") or "").strip() or None
            schedule_times = (item.get("schedule_times") or "").strip() or None
            schedule_day_of_month = item.get("schedule_day_of_month")
            if schedule_day_of_month in ("", None):
                schedule_day_of_month = None
            else:
                try:
                    schedule_day_of_month = int(schedule_day_of_month)
                except Exception:
                    schedule_day_of_month = None
            auto_approve = _bool(item.get("auto_approve"), default=True)
            active = _bool(item.get("active"), default=True)
            if existing:
                if hasattr(existing, "from_address"):
                    existing.from_address = from_address
                existing.schedule_type = schedule_type
                existing.schedule_days_of_week = schedule_days_of_week
                existing.schedule_day_of_month = schedule_day_of_month
                existing.schedule_times = schedule_times
                existing.auto_approve = auto_approve
                existing.active = active
                updated_jobs += 1
            else:
                job_kwargs = {
                    "customer_id": (customer.id if customer else None),
                    "backup_software": backup_software,
                    "backup_type": backup_type,
                    "job_name": job_name,
                    "schedule_type": schedule_type,
                    "schedule_days_of_week": schedule_days_of_week,
                    "schedule_day_of_month": schedule_day_of_month,
                    "schedule_times": schedule_times,
                    "auto_approve": auto_approve,
                    "active": active,
                }
                # Include from_address in the persisted job key so re-parse matching works after import
                if hasattr(Job, "from_address"):
                    job_kwargs["from_address"] = from_address
                new_job = Job(**job_kwargs)
                db.session.add(new_job)
                created_jobs += 1
        db.session.commit()
        flash(
            f"Import completed. Customers created: {created_customers}. Jobs created: {created_jobs}. Jobs updated: {updated_jobs}.",
            "success",
        )
    except Exception as exc:
        db.session.rollback()
        print(f"[settings-jobs] Import failed: {exc}")
        flash("Import failed.", "danger")
    return redirect(url_for("main.settings"))
@main_bp.route("/settings", methods=["GET", "POST"])
@login_required
@roles_required("admin")
def settings():
    """Render and persist the admin Settings page.

    GET renders the requested section; POST saves only the fields present
    in the submitted form (the UI has one form per tab) and redirects back
    to the same section.
    """
    # NOTE: local name shadows the view function; url_for uses the endpoint
    # name, not this variable, so the shadowing is harmless here.
    settings = _get_or_create_settings()
    section = (request.args.get("section") or "general").strip().lower() or "general"
    if request.method == "POST":
        # Remember whether any Autotask field was part of this form so we can
        # decide after saving whether to auto-refresh Autotask reference data.
        autotask_form_touched = any(str(k).startswith("autotask_") for k in (request.form or {}).keys())
        # NOTE: The Settings UI has multiple tabs with separate forms.
        # Only update values that are present in the submitted form, to avoid
        # clearing unrelated settings when saving from another tab.
        if "graph_tenant_id" in request.form:
            settings.graph_tenant_id = (request.form.get("graph_tenant_id") or "").strip() or None
        if "graph_client_id" in request.form:
            settings.graph_client_id = (request.form.get("graph_client_id") or "").strip() or None
        if "graph_mailbox" in request.form:
            settings.graph_mailbox = (request.form.get("graph_mailbox") or "").strip() or None
        if "graph_client_secret" in request.form:
            # Empty secret field means "keep existing secret" — never clear it.
            client_secret = (request.form.get("graph_client_secret") or "").strip()
            if client_secret:
                settings.graph_client_secret = client_secret
        if "incoming_folder" in request.form:
            settings.incoming_folder = (request.form.get("incoming_folder") or "").strip() or None
        if "processed_folder" in request.form:
            settings.processed_folder = (request.form.get("processed_folder") or "").strip() or None
        # UI display
        if "ui_timezone" in request.form:
            settings.ui_timezone = (request.form.get("ui_timezone") or "").strip() or "Europe/Amsterdam"
        # Autotask integration
        if "autotask_enabled" in request.form:
            settings.autotask_enabled = bool(request.form.get("autotask_enabled"))
        if "autotask_environment" in request.form:
            env_val = (request.form.get("autotask_environment") or "").strip().lower()
            if env_val in ("sandbox", "production"):
                settings.autotask_environment = env_val
            else:
                settings.autotask_environment = None
        if "autotask_api_username" in request.form:
            settings.autotask_api_username = (request.form.get("autotask_api_username") or "").strip() or None
        if "autotask_api_password" in request.form:
            # Empty password field means "keep existing password".
            pw = (request.form.get("autotask_api_password") or "").strip()
            if pw:
                settings.autotask_api_password = pw
        if "autotask_tracking_identifier" in request.form:
            settings.autotask_tracking_identifier = (request.form.get("autotask_tracking_identifier") or "").strip() or None
        if "autotask_base_url" in request.form:
            settings.autotask_base_url = (request.form.get("autotask_base_url") or "").strip() or None
        if "autotask_default_queue_id" in request.form:
            try:
                # int(... or 0) or None: empty/zero input stores None.
                settings.autotask_default_queue_id = int(request.form.get("autotask_default_queue_id") or 0) or None
            except (ValueError, TypeError):
                pass
        if "autotask_default_ticket_source_id" in request.form:
            try:
                settings.autotask_default_ticket_source_id = int(request.form.get("autotask_default_ticket_source_id") or 0) or None
            except (ValueError, TypeError):
                pass
        if "autotask_default_ticket_status" in request.form:
            try:
                settings.autotask_default_ticket_status = int(request.form.get("autotask_default_ticket_status") or 0) or None
            except (ValueError, TypeError):
                pass
        if "autotask_priority_warning" in request.form:
            try:
                settings.autotask_priority_warning = int(request.form.get("autotask_priority_warning") or 0) or None
            except (ValueError, TypeError):
                pass
        if "autotask_priority_error" in request.form:
            try:
                settings.autotask_priority_error = int(request.form.get("autotask_priority_error") or 0) or None
            except (ValueError, TypeError):
                pass
        # Daily Jobs
        if "daily_jobs_start_date" in request.form:
            daily_jobs_start_date_str = (request.form.get("daily_jobs_start_date") or "").strip()
            if daily_jobs_start_date_str:
                try:
                    settings.daily_jobs_start_date = datetime.strptime(daily_jobs_start_date_str, "%Y-%m-%d").date()
                except Exception:
                    settings.daily_jobs_start_date = None
            else:
                settings.daily_jobs_start_date = None
        # Import configuration
        if "auto_import_enabled" in request.form:
            settings.auto_import_enabled = bool(request.form.get("auto_import_enabled"))
        if "auto_import_interval_minutes" in request.form:
            try:
                settings.auto_import_interval_minutes = int(
                    request.form.get("auto_import_interval_minutes") or settings.auto_import_interval_minutes
                )
            except (ValueError, TypeError):
                pass
        if "auto_import_cutoff_date" in request.form:
            auto_import_cutoff_date_str = (request.form.get("auto_import_cutoff_date") or "").strip()
            if auto_import_cutoff_date_str:
                try:
                    settings.auto_import_cutoff_date = datetime.strptime(auto_import_cutoff_date_str, "%Y-%m-%d").date()
                except Exception:
                    settings.auto_import_cutoff_date = None
            else:
                settings.auto_import_cutoff_date = None
        if (
            "auto_import_enabled" in request.form
            or "auto_import_interval_minutes" in request.form
            or "auto_import_cutoff_date" in request.form
            or "manual_import_batch_size" in request.form
            or "ingest_eml_retention_days" in request.form
        ):
            # Automatic importer batch size is fixed at 50
            settings.auto_import_max_items = 50
        if "ingest_eml_retention_days" in request.form:
            try:
                settings.ingest_eml_retention_days = int(
                    request.form.get("ingest_eml_retention_days") or getattr(settings, "ingest_eml_retention_days", 7)
                )
            except (ValueError, TypeError):
                pass
            # Only 0 (off), 7 or 14 days are valid; anything else falls back to 7.
            if settings.ingest_eml_retention_days not in (0, 7, 14):
                settings.ingest_eml_retention_days = 7
        if "manual_import_batch_size" in request.form:
            try:
                settings.manual_import_batch_size = int(
                    request.form.get("manual_import_batch_size") or settings.manual_import_batch_size
                )
            except (ValueError, TypeError):
                pass
            # Manual import batch size is configurable but limited to 50
            try:
                settings.manual_import_batch_size = int(settings.manual_import_batch_size or 50)
            except (ValueError, TypeError):
                settings.manual_import_batch_size = 50
            if settings.manual_import_batch_size < 1:
                settings.manual_import_batch_size = 1
            if settings.manual_import_batch_size > 50:
                settings.manual_import_batch_size = 50
        try:
            db.session.commit()
            flash("Settings have been saved.", "success")
            # Autotask ticket defaults depend on reference data (queues, sources, statuses, priorities).
            # When the Autotask integration is (re)configured, auto-refresh the cached reference data
            # once so the dropdowns become usable immediately.
            try:
                if (
                    autotask_form_touched
                    and bool(getattr(settings, "autotask_enabled", False))
                    and bool(getattr(settings, "autotask_api_username", None))
                    and bool(getattr(settings, "autotask_api_password", None))
                    and bool(getattr(settings, "autotask_tracking_identifier", None))
                ):
                    # Only refresh when at least one of the caches is empty.
                    missing_cache = (
                        not bool(getattr(settings, "autotask_cached_queues_json", None))
                        or not bool(getattr(settings, "autotask_cached_ticket_sources_json", None))
                        or not bool(getattr(settings, "autotask_cached_ticket_statuses_json", None))
                        or not bool(getattr(settings, "autotask_cached_priorities_json", None))
                    )
                    if missing_cache:
                        queues, sources, statuses, pr_out = _refresh_autotask_reference_data(settings)
                        db.session.commit()
                        flash(
                            f"Autotask reference data refreshed. Queues: {len(queues)}. Ticket Sources: {len(sources)}. Ticket Statuses: {len(statuses)}. Priorities: {len(pr_out)}.",
                            "success",
                        )
                        _log_admin_event(
                            "autotask_reference_data_auto_refreshed",
                            "Autotask reference data auto-refreshed after settings save.",
                            details=json.dumps({"queues": len(queues or []), "ticket_sources": len(sources or []), "ticket_statuses": len(statuses or []), "priorities": len(pr_out)}),
                        )
            except Exception as exc:
                # Refresh is best-effort: the settings themselves are already saved.
                try:
                    db.session.rollback()
                except Exception:
                    pass
                flash(f"Autotask reference data refresh failed: {exc}", "warning")
                _log_admin_event(
                    "autotask_reference_data_auto_refresh_failed",
                    "Autotask reference data auto-refresh failed after settings save.",
                    details=json.dumps({"error": str(exc)}),
                )
            # If EML storage has been turned off, clear any stored blobs immediately.
            try:
                if getattr(settings, "ingest_eml_retention_days", 7) == 0:
                    MailMessage.query.filter(MailMessage.eml_blob.isnot(None)).update(
                        {MailMessage.eml_blob: None, MailMessage.eml_stored_at: None},
                        synchronize_session=False,
                    )
                    db.session.commit()
            except Exception as exc:
                db.session.rollback()
                print(f"[settings] Failed to clear stored EML blobs: {exc}")
        except Exception as exc:
            db.session.rollback()
            print(f"[settings] Failed to save settings: {exc}")
            flash("Failed to save settings.", "danger")
        return redirect(url_for("main.settings", section=section))
    # --- GET: gather display data for the requested section ---
    db_size_bytes = _get_database_size_bytes()
    free_disk_bytes = _get_free_disk_bytes()
    db_size_human = _format_bytes(db_size_bytes) if db_size_bytes is not None else "unknown"
    free_disk_human = _format_bytes(free_disk_bytes) if free_disk_bytes is not None else "unknown"
    free_disk_warning = False
    if free_disk_bytes is not None:
        # Warn when less than 2 GiB of disk space remains.
        two_gb = 2 * 1024 * 1024 * 1024
        free_disk_warning = free_disk_bytes < two_gb
    # Only expose whether secrets exist, never the values themselves.
    has_client_secret = bool(settings.graph_client_secret)
    has_autotask_password = bool(getattr(settings, "autotask_api_password", None))
    # Common UI timezones (IANA names)
    tz_options = [
        "Europe/Amsterdam",
        "UTC",
        "Europe/London",
        "Europe/Paris",
        "Europe/Berlin",
        "Europe/Brussels",
        "America/New_York",
        "America/Chicago",
        "America/Denver",
        "America/Los_Angeles",
        "Asia/Tokyo",
        "Asia/Singapore",
    ]
    # News admin data (only when requested)
    news_admin_items = []
    news_admin_stats = {}
    total_users = 0
    if section == "news":
        try:
            total_users = int(User.query.count() or 0)
        except Exception:
            total_users = 0
        try:
            news_admin_items = NewsItem.query.order_by(
                NewsItem.pinned.desc(),
                NewsItem.publish_from.desc().nullslast(),
                NewsItem.created_at.desc(),
            ).all()
        except Exception:
            news_admin_items = []
        try:
            # Per-item read/unread counts relative to the total user count.
            for item in news_admin_items:
                read_count = int(NewsRead.query.filter_by(news_item_id=item.id).count() or 0)
                news_admin_stats[item.id] = {
                    "read": read_count,
                    "unread": max(total_users - read_count, 0),
                    "total": total_users,
                }
        except Exception:
            news_admin_stats = {}
    users = User.query.order_by(User.username.asc()).all()
    # Count users that have 'admin' among their assigned roles (comma-separated storage)
    admin_users_count = 0
    try:
        admin_users_count = sum(1 for u in users if "admin" in (getattr(u, "roles", None) or []))
    except Exception:
        admin_users_count = 0
    # Autotask cached reference data for dropdowns
    autotask_queues = []
    autotask_ticket_sources = []
    autotask_priorities = []
    autotask_ticket_statuses = []
    autotask_last_sync_at = getattr(settings, "autotask_reference_last_sync_at", None)
    try:
        if getattr(settings, "autotask_cached_queues_json", None):
            autotask_queues = json.loads(settings.autotask_cached_queues_json) or []
    except Exception:
        autotask_queues = []
    try:
        if getattr(settings, "autotask_cached_ticket_sources_json", None):
            autotask_ticket_sources = json.loads(settings.autotask_cached_ticket_sources_json) or []
    except Exception:
        autotask_ticket_sources = []
    try:
        if getattr(settings, "autotask_cached_priorities_json", None):
            autotask_priorities = json.loads(settings.autotask_cached_priorities_json) or []
    except Exception:
        autotask_priorities = []
    try:
        if getattr(settings, "autotask_cached_ticket_statuses_json", None):
            autotask_ticket_statuses = json.loads(settings.autotask_cached_ticket_statuses_json) or []
    except Exception:
        autotask_ticket_statuses = []
    return render_template(
        "main/settings.html",
        settings=settings,
        db_size_human=db_size_human,
        free_disk_human=free_disk_human,
        free_disk_warning=free_disk_warning,
        has_client_secret=has_client_secret,
        has_autotask_password=has_autotask_password,
        tz_options=tz_options,
        users=users,
        admin_users_count=admin_users_count,
        section=section,
        autotask_queues=autotask_queues,
        autotask_ticket_sources=autotask_ticket_sources,
        autotask_priorities=autotask_priorities,
        autotask_ticket_statuses=autotask_ticket_statuses,
        autotask_last_sync_at=autotask_last_sync_at,
        news_admin_items=news_admin_items,
        news_admin_stats=news_admin_stats,
    )
@main_bp.route("/settings/news/create", methods=["POST"])
@login_required
@roles_required("admin")
def settings_news_create():
    """Create a news item from the admin form and redirect to the news tab.

    Title and body are required; severity is clamped to "info"/"warning";
    unparsable publish dates become None (unbounded).
    """

    def _parse_dt(raw):
        # datetime-local inputs arrive as "YYYY-MM-DDTHH:MM";
        # empty or unparsable values map to None (replaces two duplicated
        # try/except strptime blocks).
        raw = (raw or "").strip()
        if not raw:
            return None
        try:
            return datetime.strptime(raw, "%Y-%m-%dT%H:%M")
        except Exception:
            return None

    title = (request.form.get("title") or "").strip()
    body = (request.form.get("body") or "").strip()
    link_url = (request.form.get("link_url") or "").strip() or None
    severity = (request.form.get("severity") or "info").strip().lower() or "info"
    pinned = bool(request.form.get("pinned"))
    active = bool(request.form.get("active"))
    publish_from = _parse_dt(request.form.get("publish_from"))
    publish_until = _parse_dt(request.form.get("publish_until"))
    if not title or not body:
        flash("Title and body are required.", "danger")
        return redirect(url_for("main.settings", section="news"))
    item = NewsItem(
        title=title,
        body=body,
        link_url=link_url,
        severity=severity if severity in ("info", "warning") else "info",
        pinned=pinned,
        active=active,
        publish_from=publish_from,
        publish_until=publish_until,
        created_by_user_id=getattr(current_user, "id", None),
    )
    db.session.add(item)
    try:
        db.session.commit()
        flash("News item created.", "success")
    except Exception as exc:
        db.session.rollback()
        print(f"[settings] Failed to create news item: {exc}")
        flash("Failed to create news item.", "danger")
    return redirect(url_for("main.settings", section="news"))
@main_bp.route("/settings/news/<int:news_id>/update", methods=["POST"])
@login_required
@roles_required("admin")
def settings_news_update(news_id: int):
    """Update an existing news item from the admin form.

    404s when the id is unknown; title and body are required; severity is
    clamped to "info"/"warning"; unparsable publish dates become None.
    """
    item = NewsItem.query.get(news_id)
    if not item:
        return abort(404)

    def _parse_dt(raw):
        # datetime-local inputs arrive as "YYYY-MM-DDTHH:MM";
        # empty or unparsable values map to None (replaces two duplicated
        # try/except strptime blocks).
        raw = (raw or "").strip()
        if not raw:
            return None
        try:
            return datetime.strptime(raw, "%Y-%m-%dT%H:%M")
        except Exception:
            return None

    title = (request.form.get("title") or "").strip()
    body = (request.form.get("body") or "").strip()
    link_url = (request.form.get("link_url") or "").strip() or None
    severity = (request.form.get("severity") or "info").strip().lower() or "info"
    pinned = bool(request.form.get("pinned"))
    active = bool(request.form.get("active"))
    publish_from = _parse_dt(request.form.get("publish_from"))
    publish_until = _parse_dt(request.form.get("publish_until"))
    if not title or not body:
        flash("Title and body are required.", "danger")
        return redirect(url_for("main.settings", section="news"))
    item.title = title
    item.body = body
    item.link_url = link_url
    item.severity = severity if severity in ("info", "warning") else "info"
    item.pinned = pinned
    item.active = active
    item.publish_from = publish_from
    item.publish_until = publish_until
    try:
        db.session.commit()
        flash("News item updated.", "success")
    except Exception as exc:
        db.session.rollback()
        print(f"[settings] Failed to update news item: {exc}")
        flash("Failed to update news item.", "danger")
    return redirect(url_for("main.settings", section="news"))
@main_bp.route("/settings/news/<int:news_id>/delete", methods=["POST"])
@login_required
@roles_required("admin")
def settings_news_delete(news_id: int):
    """Delete a news item; 404 when the id is unknown."""
    item = NewsItem.query.get(news_id)
    if item is None:
        return abort(404)
    try:
        db.session.delete(item)
        db.session.commit()
        flash("News item deleted.", "success")
    except Exception as exc:
        db.session.rollback()
        print(f"[settings] Failed to delete news item: {exc}")
        flash("Failed to delete news item.", "danger")
    return redirect(url_for("main.settings", section="news"))
@main_bp.route("/settings/news/<int:news_id>/reset_reads", methods=["POST"])
@login_required
@roles_required("admin")
def settings_news_reset_reads(news_id: int):
    """Remove all per-user read markers for one news item."""
    try:
        # Bulk delete; no session sync needed since nothing else uses the rows.
        NewsRead.query.filter_by(news_item_id=news_id).delete(synchronize_session=False)
        db.session.commit()
        flash("Read status reset for this news item.", "success")
    except Exception as exc:
        db.session.rollback()
        print(f"[settings] Failed to reset news reads: {exc}")
        flash("Failed to reset read status.", "danger")
    return redirect(url_for("main.settings", section="news"))
@main_bp.route("/settings/news/<int:news_id>/reads")
@login_required
@roles_required("admin")
def settings_news_reads(news_id: int):
    """Render the list of users who read a news item, newest read first."""
    item = NewsItem.query.get(news_id)
    if item is None:
        return abort(404)
    try:
        read_rows = (
            db.session.query(NewsRead, User)
            .join(User, User.id == NewsRead.user_id)
            .filter(NewsRead.news_item_id == news_id)
            .order_by(NewsRead.read_at.desc())
            .all()
        )
    except Exception:
        # Fall back to an empty list so the page still renders.
        read_rows = []
    return render_template("main/settings_news_reads.html", item=item, reads=read_rows)
@main_bp.route("/settings/users/create", methods=["POST"])
@login_required
@roles_required("admin")
def settings_users_create():
    """Create a new user account from the admin Users form."""
    username = (request.form.get("new_username") or "").strip()
    selected_roles = [r.strip() for r in request.form.getlist("new_roles") if (r or "").strip()]
    # Backwards compatible storage: comma-separated roles in the existing
    # "role" column, deduplicated while preserving submission order.
    role = ",".join(dict.fromkeys(selected_roles)) if selected_roles else "viewer"
    password = (request.form.get("new_password") or "").strip()
    if not username:
        flash("Username is required.", "danger")
        return redirect(url_for("main.settings", section="general"))
    if User.query.filter_by(username=username).first():
        flash("Username already exists.", "danger")
        return redirect(url_for("main.settings", section="general"))
    if not password:
        flash("Password is required.", "danger")
        return redirect(url_for("main.settings", section="general"))
    new_user = User(username=username, role=role)
    new_user.set_password(password)
    db.session.add(new_user)
    try:
        db.session.commit()
        flash(f"User '{username}' has been created.", "success")
        _log_admin_event("user_create", f"User '{username}' created with roles '{role}'.")
    except Exception as exc:
        db.session.rollback()
        print(f"[settings-users] Failed to create user: {exc}")
        flash("Failed to create user.", "danger")
    return redirect(url_for("main.settings", section="users"))
@main_bp.route("/settings/app-reset", methods=["POST"])
@login_required
@roles_required("admin")
def settings_app_reset():
    """Wipe all application data (including users) after explicit confirmation.

    Requires the literal text "RESET" in the confirmation field. On success
    the current user is logged out and redirected to initial setup to create
    a new admin account.
    """
    # Require explicit confirmation to avoid accidental resets
    confirmation = (request.form.get("confirm_reset") or "").strip().upper()
    if confirmation != "RESET":
        flash("Application reset cancelled. Type RESET to confirm.", "warning")
        return redirect(url_for("main.settings", section="general"))
    # Reset all application data (including users). After this, the app will
    # return to initial setup (create new admin).
    try:
        dialect_name = ""
        try:
            dialect_name = (db.engine.dialect.name or "").lower()
        except Exception:
            dialect_name = ""
        if dialect_name.startswith("postgres"):
            # Postgres: fast and resets identities; CASCADE handles FKs.
            db.session.execute(
                text(
                    """TRUNCATE TABLE
                    ticket_job_runs,
                    remark_job_runs,
                    ticket_scopes,
                    remark_scopes,
                    tickets,
                    remarks,
                    mail_objects,
                    mail_messages,
                    job_objects,
                    job_runs,
                    jobs,
                    overrides,
                    customers,
                    admin_logs,
                    system_settings,
                    users
                    RESTART IDENTITY CASCADE"""
                )
            )
            db.session.commit()
        else:
            # Fallback (e.g. SQLite): delete in FK-safe order.
            # Child (link) tables first, then parents — the order matters.
            for model in (
                TicketJobRun,
                RemarkJobRun,
                TicketScope,
                RemarkScope,
                Ticket,
                Remark,
                MailObject,
                MailMessage,
                JobObject,
                JobRun,
                Job,
                Override,
                Customer,
                AdminLog,
                SystemSettings,
                User,
            ):
                db.session.query(model).delete()
            db.session.commit()
        # Best-effort logout: the current user's row was just deleted.
        try:
            logout_user()
        except Exception:
            pass
        flash("Application has been reset. Please create a new admin user.", "success")
        return redirect(url_for("auth.initial_setup"))
    except Exception as exc:
        try:
            db.session.rollback()
        except Exception:
            pass
        flash(f"Reset failed: {exc}", "danger")
        return redirect(url_for("main.settings", section="general"))
@main_bp.route("/settings/users/<int:user_id>/reset-password", methods=["POST"])
@login_required
@roles_required("admin")
def settings_users_reset_password(user_id: int):
    """Set a new password for the given user; 404 when the id is unknown."""
    target = User.query.get_or_404(user_id)
    new_password = (request.form.get("reset_password") or "").strip()
    if not new_password:
        flash("New password is required.", "danger")
        return redirect(url_for("main.settings", section="general"))
    target.set_password(new_password)
    try:
        db.session.commit()
        flash(f"Password for '{target.username}' has been reset.", "success")
        _log_admin_event("user_reset_password", f"Password reset for user '{target.username}'.")
    except Exception as exc:
        db.session.rollback()
        print(f"[settings-users] Failed to reset password: {exc}")
        flash("Failed to reset password.", "danger")
    return redirect(url_for("main.settings", section="users"))
@main_bp.route("/settings/users/<int:user_id>/roles", methods=["POST"])
@login_required
@roles_required("admin")
def settings_users_update_roles(user_id: int):
    """Replace a user's role set, refusing to demote the last admin."""
    user = User.query.get_or_404(user_id)
    submitted = [r.strip() for r in request.form.getlist("roles") if (r or "").strip()]
    # Deduplicate while keeping submission order; default to viewer.
    submitted = list(dict.fromkeys(submitted)) or ["viewer"]
    # Prevent removing the last remaining admin role
    if "admin" in user.roles and "admin" not in submitted:
        try:
            admin_count = sum(
                1 for u in User.query.all() if "admin" in (getattr(u, "roles", None) or [])
            )
        except Exception:
            admin_count = 0
        if admin_count <= 1:
            flash("Cannot remove admin role from the last admin account.", "danger")
            return redirect(url_for("main.settings", section="users"))
    old_roles = ",".join(user.roles)
    new_roles = ",".join(submitted)
    user.role = new_roles
    try:
        db.session.commit()
        flash(f"Roles for '{user.username}' have been updated.", "success")
        _log_admin_event("user_update_roles", f"User '{user.username}' roles changed from '{old_roles}' to '{new_roles}'.")
        # If the updated user is currently logged in, make sure the active role stays valid.
        try:
            if getattr(current_user, "id", None) == user.id:
                current_user.set_active_role(user.roles[0])
        except Exception:
            pass
    except Exception as exc:
        db.session.rollback()
        print(f"[settings-users] Failed to update roles: {exc}")
        flash("Failed to update roles.", "danger")
    return redirect(url_for("main.settings", section="users"))
@main_bp.route("/settings/users/<int:user_id>/delete", methods=["POST"])
@login_required
@roles_required("admin")
def settings_users_delete(user_id: int):
    """Delete a user account, refusing to remove the last remaining admin."""
    user = User.query.get_or_404(user_id)
    # Prevent deleting the last admin user
    if "admin" in user.roles:
        try:
            all_users = User.query.all()
            admin_count = sum(1 for u in all_users if "admin" in (getattr(u, "roles", None) or []))
        except Exception:
            admin_count = 0
        if admin_count <= 1:
            flash("Cannot delete the last admin account.", "danger")
            # Stay on the users section (where the delete button lives);
            # previously this bounced to "general", unlike the sibling routes.
            return redirect(url_for("main.settings", section="users"))
    username = user.username
    try:
        db.session.delete(user)
        db.session.commit()
        flash(f"User '{username}' has been deleted.", "success")
        _log_admin_event("user_delete", f"User '{username}' deleted.")
    except Exception as exc:
        db.session.rollback()
        print(f"[settings-users] Failed to delete user: {exc}")
        flash("Failed to delete user.", "danger")
    return redirect(url_for("main.settings", section="users"))
@main_bp.route("/settings/mail-import", methods=["POST"])
@login_required
@roles_required("admin")
def settings_mail_import():
    """Run a manual mail import batch and report the outcome via flash messages.

    Refuses to run when free disk space is below 2 GB, clamps the requested
    batch size to 1..50, and afterwards persists objects for any runs that
    were auto-approved during the import (best effort, never blocking the
    request).
    """
    settings = _get_or_create_settings()
    # Prevent manual import when free disk is below 2 GB
    free_disk_bytes = _get_free_disk_bytes()
    if free_disk_bytes is not None:
        two_gb = 2 * 1024 * 1024 * 1024
        if free_disk_bytes < two_gb:
            flash("Manual mail import is blocked because free disk space is below 2 GB.", "danger")
            _log_admin_event("mail_import_manual_blocked", "Manual mail import blocked: free disk space below 2 GB.")
            return redirect(url_for("main.settings", section="general"))
    # Determine batch size, clamped to the 1..50 range.
    try:
        batch_size = int(request.form.get("manual_import_items") or settings.manual_import_batch_size or 50)
    except (ValueError, TypeError):
        batch_size = settings.manual_import_batch_size or 50
    batch_size = max(1, min(batch_size, 50))
    auto_approved_runs = []
    try:
        total_fetched, new_messages, auto_approved, auto_approved_runs, errors = run_manual_import(
            settings, batch_size
        )
    except MailImportError as exc:
        msg = f"Manual mail import failed: {exc}"
        _log_admin_event("mail_import_manual_error", msg)
        flash(str(exc), "danger")
        return redirect(url_for("main.settings", section="general"))
    except Exception as exc:
        msg = f"Unexpected error during manual mail import: {exc}"
        _log_admin_event("mail_import_manual_error", msg)
        flash("Unexpected error during manual mail import. See logs for details.", "danger")
        return redirect(url_for("main.settings", section="general"))
    msg = f"Manual mail import finished. fetched={total_fetched}, new={new_messages}, auto_approved={auto_approved}, errors={len(errors)}"
    _log_admin_event("mail_import_manual", msg)
    # Persist objects for auto-approved runs (must not block the request)
    if auto_approved_runs:
        persisted_objects = 0
        persisted_errors = 0
        for (customer_id, job_id, run_id, mail_message_id) in auto_approved_runs:
            try:
                persisted_objects += persist_objects_for_auto_run(
                    int(customer_id), int(job_id), int(run_id), int(mail_message_id)
                )
            except Exception as exc:
                # Persistence failures are logged per message but never abort the import.
                persisted_errors += 1
                _log_admin_event(
                    "object_persist_error",
                    f"Object persistence failed for auto-approved message {mail_message_id} (job {job_id}, run {run_id}): {exc}",
                )
        _log_admin_event(
            "object_persist_auto_approve",
            f"Persisted objects for auto-approved runs (manual import). runs={len(auto_approved_runs)}, objects={persisted_objects}, errors={persisted_errors}",
        )
    if errors:
        flash("Manual mail import finished with errors.", "warning")
    else:
        flash("Manual mail import finished.", "success")
    flash(f"Fetched: {total_fetched}, new: {new_messages}, auto-approved: {auto_approved}.", "info")
    for err in errors[:5]:
        flash(f"Import error: {err}", "danger")
    # Redirect to the general section explicitly, matching the early returns above.
    return redirect(url_for("main.settings", section="general"))
@main_bp.route("/settings/folders")
@login_required
@roles_required("admin")
def settings_folders():
    """Return the configured Graph mailbox's folder hierarchy as JSON.

    Responds with ``{"status": "ok", "folders": [...]}`` where each node has
    ``displayName``, ``id``, ``path`` (slash-joined), and ``children``.
    Returns HTTP 400 when no mailbox is configured and HTTP 500 on any
    Graph/auth failure.
    """
    settings = _get_or_create_settings()
    mailbox = (settings.graph_mailbox or "").strip()
    if not mailbox:
        return jsonify(
            {"status": "error", "message": "Microsoft Graph mailbox is not configured."}
        ), 400
    try:
        # Reuse the same token flow used by the mail importer.
        from ..mail_importer import _get_access_token, _build_auth_headers, GRAPH_BASE_URL
        access_token = _get_access_token(settings)
        headers = _build_auth_headers(access_token)
        def _graph_get_all(url: str):
            """Follow @odata.nextLink pagination and collect all 'value' items."""
            items = []
            next_url = url
            # Safety limit to avoid infinite loops if Graph behaves unexpectedly.
            safety_pages = 0
            while next_url and safety_pages < 50:
                safety_pages += 1
                resp = requests.get(next_url, headers=headers, timeout=20)
                if resp.status_code != 200:
                    # Try to surface Graph's own error message; fall back to the status code.
                    try:
                        payload = resp.json()
                    except Exception:
                        payload = {}
                    msg = payload.get("error", {}).get("message") or f"HTTP {resp.status_code}"
                    raise RuntimeError(msg)
                payload = resp.json() or {}
                items.extend(payload.get("value", []) or [])
                next_url = payload.get("@odata.nextLink")
            return items
        def _build_tree(parent_id: str | None, parent_path: str):
            """Recursively build the folder tree under ``parent_id`` (None = top level)."""
            if parent_id is None:
                url = f"{GRAPH_BASE_URL}/users/{mailbox}/mailFolders?$top=100"
            else:
                url = f"{GRAPH_BASE_URL}/users/{mailbox}/mailFolders/{parent_id}/childFolders?$top=100"
            folders = _graph_get_all(url)
            nodes = []
            for f in folders:
                fid = f.get("id")
                name = (f.get("displayName") or "").strip()
                # Skip malformed entries without an id or a display name.
                if not fid or not name:
                    continue
                path = name if not parent_path else f"{parent_path}/{name}"
                node = {
                    "displayName": name,
                    "id": fid,
                    "path": path,
                    "children": [],
                }
                # Recursively load children (bounded by Graph and typical folder depth).
                try:
                    node["children"] = _build_tree(fid, path)
                except Exception:
                    # If child loading fails for a specific folder, keep it as a leaf.
                    node["children"] = []
                nodes.append(node)
            # Stable order for UI
            nodes.sort(key=lambda n: (n.get("displayName") or "").lower())
            return nodes
        folders_tree = _build_tree(None, "")
        return jsonify({"status": "ok", "folders": folders_tree})
    except Exception as exc:
        # Logging is itself best-effort; never let it mask the error response.
        try:
            current_app.logger.exception("Failed to load mailbox folders from Microsoft Graph")
        except Exception:
            pass
        return jsonify({"status": "error", "message": str(exc) or "Failed to load folders."}), 500
@main_bp.route("/settings/autotask/test-connection", methods=["POST"])
@login_required
@roles_required("admin")
def settings_autotask_test_connection():
    """Validate the stored Autotask credentials with a few lightweight API calls."""
    settings = _get_or_create_settings()
    credentials_present = (
        settings.autotask_api_username
        and settings.autotask_api_password
        and settings.autotask_tracking_identifier
    )
    if not credentials_present:
        flash("Autotask settings incomplete. Provide username, password and tracking identifier first.", "warning")
        return redirect(url_for("main.settings", section="integrations"))
    try:
        from ..integrations.autotask.client import AutotaskClient
        at_client = AutotaskClient(
            username=settings.autotask_api_username,
            password=settings.autotask_api_password,
            api_integration_code=settings.autotask_tracking_identifier,
            environment=(settings.autotask_environment or "production"),
        )
        zone = at_client.get_zone_info()
        # Lightweight authenticated calls to validate credentials and basic API access
        _ = at_client.get_queues()
        _ = at_client.get_ticket_sources()
        flash(f"Autotask connection OK. Zone: {zone.zone_name or 'unknown'}.", "success")
        _log_admin_event(
            "autotask_test_connection",
            "Autotask test connection succeeded.",
            details=json.dumps({"zone": zone.zone_name, "api_url": zone.api_url}),
        )
    except Exception as exc:
        flash(f"Autotask connection failed: {exc}", "danger")
        _log_admin_event(
            "autotask_test_connection_failed",
            "Autotask test connection failed.",
            details=json.dumps({"error": str(exc)}),
        )
    return redirect(url_for("main.settings", section="integrations"))
def _refresh_autotask_reference_data(settings):
    """Refresh and persist Autotask reference data used for ticket default dropdowns."""
    from ..integrations.autotask.client import AutotaskClient
    client = AutotaskClient(
        username=settings.autotask_api_username,
        password=settings.autotask_api_password,
        api_integration_code=settings.autotask_tracking_identifier,
        environment=(settings.autotask_environment or "production"),
    )
    queues = client.get_queues()
    sources = client.get_ticket_sources()
    priorities = client.get_ticket_priorities()
    statuses = client.get_ticket_statuses()
    # Store a minimal subset for dropdowns (id + name/label).
    # Some "reference" values come back as picklists (value/label) rather than
    # entity collections (id/name), so both shapes are normalized here.
    def _norm(raw_items):
        normalized = []
        for entry in raw_items or []:
            if not isinstance(entry, dict):
                continue
            raw_id = entry.get("id")
            if raw_id is None:
                raw_id = entry.get("value")
            try:
                numeric_id = int(raw_id)
            except Exception:
                # Entries without a usable numeric id are dropped.
                continue
            display = (
                entry.get("name")
                or entry.get("label")
                or entry.get("queueName")
                or entry.get("sourceName")
                or entry.get("description")
                or ""
            )
            normalized.append({"id": numeric_id, "name": str(display)})
        # Sort by name for stable dropdowns
        normalized.sort(key=lambda item: (item.get("name") or "").lower())
        return normalized
    settings.autotask_cached_queues_json = json.dumps(_norm(queues))
    settings.autotask_cached_ticket_sources_json = json.dumps(_norm(sources))
    settings.autotask_cached_ticket_statuses_json = json.dumps(_norm(statuses))
    # Priorities are returned as picklist values (value/label); inactive ones are skipped.
    pr_out = []
    for entry in priorities or []:
        if not isinstance(entry, dict):
            continue
        if entry.get("isActive") is False:
            continue
        try:
            value_id = int(entry.get("value"))
        except Exception:
            continue
        label_text = entry.get("label") or entry.get("name") or ""
        pr_out.append({"id": value_id, "name": str(label_text)})
    pr_out.sort(key=lambda item: (item.get("name") or "").lower())
    settings.autotask_cached_priorities_json = json.dumps(pr_out)
    settings.autotask_reference_last_sync_at = datetime.utcnow()
    return queues, sources, statuses, pr_out
@main_bp.route("/settings/autotask/refresh-reference-data", methods=["POST"])
@login_required
@roles_required("admin")
def settings_autotask_refresh_reference_data():
    """Refresh the cached Autotask dropdown reference data and report the result."""
    settings = _get_or_create_settings()
    credentials_present = (
        settings.autotask_api_username
        and settings.autotask_api_password
        and settings.autotask_tracking_identifier
    )
    if not credentials_present:
        flash("Autotask settings incomplete. Provide username, password and tracking identifier first.", "warning")
        return redirect(url_for("main.settings", section="integrations"))
    try:
        queues, sources, statuses, pr_out = _refresh_autotask_reference_data(settings)
        db.session.commit()
        flash(
            f"Autotask reference data refreshed. Queues: {len(queues)}. Ticket Sources: {len(sources)}. Ticket Statuses: {len(statuses)}. Priorities: {len(pr_out)}.",
            "success",
        )
        _log_admin_event(
            "autotask_refresh_reference_data",
            "Autotask reference data refreshed.",
            details=json.dumps({"queues": len(queues or []), "ticket_sources": len(sources or []), "ticket_statuses": len(statuses or []), "priorities": len(pr_out)}),
        )
    except Exception as exc:
        flash(f"Failed to refresh Autotask reference data: {exc}", "danger")
        _log_admin_event(
            "autotask_refresh_reference_data_failed",
            "Autotask reference data refresh failed.",
            details=json.dumps({"error": str(exc)}),
        )
    return redirect(url_for("main.settings", section="integrations"))