backupchecks/containers/backupchecks/src/backend/app/main/routes_settings.py
Ivo Oskamp 19ef9dc32a Update test email generator with fixed sets and separate buttons
Changed from configurable count input to three separate buttons for
success, warning, and error test emails. Each button generates exactly
3 emails with consistent data for reproducible testing.

Changes:
- Updated routes_settings.py to use fixed email sets instead of random data
- Changed route from /settings/test-emails/generate to /settings/test-emails/generate/<status_type>
- Created three predefined email sets (success, warning, error) with fixed content
- Updated settings.html UI to show three separate buttons instead of count input
- Each set contains 3 emails simulating Veeam, Synology, and NAKIVO backups
- Updated changelog with detailed description

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-09 14:44:19 +01:00

2012 lines
79 KiB
Python

from .routes_shared import * # noqa: F401,F403
from .routes_shared import _get_database_size_bytes, _get_or_create_settings, _format_bytes, _get_free_disk_bytes, _log_admin_event
import json
from datetime import datetime
@main_bp.route("/settings/jobs/delete-all", methods=["POST"])
@login_required
@roles_required("admin")
def settings_jobs_delete_all():
try:
jobs = Job.query.all()
if not jobs:
flash("No jobs to delete.", "info")
return redirect(url_for("main.settings", section="general"))
# Collect run ids for FK cleanup in auxiliary tables that may not have ON DELETE CASCADE
run_ids = []
mail_message_ids = []
for job in jobs:
for run in job.runs:
if run.id is not None:
run_ids.append(run.id)
if run.mail_message_id:
mail_message_ids.append(run.mail_message_id)
# Return related mails back to inbox and unlink from job
if mail_message_ids:
msgs = MailMessage.query.filter(MailMessage.id.in_(mail_message_ids)).all()
for msg in msgs:
if hasattr(msg, "location"):
msg.location = "inbox"
msg.job_id = None
def _safe_execute(stmt, params):
try:
db.session.execute(stmt, params)
except Exception as cleanup_exc:
# Best-effort cleanup for differing DB schemas
print(f"[settings-jobs] Cleanup skipped: {cleanup_exc}")
# Ensure run_object_links doesn't block job_runs deletion (older schemas may miss ON DELETE CASCADE)
if run_ids:
db.session.execute(
text("DELETE FROM run_object_links WHERE run_id IN :run_ids").bindparams(
bindparam("run_ids", expanding=True)
),
{"run_ids": run_ids},
)
# Ensure job_object_links doesn't block jobs deletion (older schemas may miss ON DELETE CASCADE)
job_ids = [j.id for j in jobs]
if job_ids:
db.session.execute(
text("DELETE FROM job_object_links WHERE job_id IN :job_ids").bindparams(
bindparam("job_ids", expanding=True)
),
{"job_ids": job_ids},
)
# Clean up auxiliary FK tables that may reference job_runs/jobs without ON DELETE CASCADE (older schemas)
if run_ids:
_safe_execute(
text("DELETE FROM remark_job_runs WHERE job_run_id IN :run_ids").bindparams(
bindparam("run_ids", expanding=True)
),
{"run_ids": run_ids},
)
_safe_execute(
text("DELETE FROM ticket_job_runs WHERE job_run_id IN :run_ids").bindparams(
bindparam("run_ids", expanding=True)
),
{"run_ids": run_ids},
)
# Some schemas use remark_scopes for per-run remarks
_safe_execute(
text("DELETE FROM remark_scopes WHERE job_run_id IN :run_ids").bindparams(
bindparam("run_ids", expanding=True)
),
{"run_ids": run_ids},
)
if job_ids:
# ticket_scopes.job_id is a FK without ON DELETE CASCADE in some schemas
_safe_execute(
text("DELETE FROM ticket_scopes WHERE job_id IN :job_ids").bindparams(
bindparam("job_ids", expanding=True)
),
{"job_ids": job_ids},
)
# Some schemas use remark_scopes for per-job remarks
_safe_execute(
text("DELETE FROM remark_scopes WHERE job_id IN :job_ids").bindparams(
bindparam("job_ids", expanding=True)
),
{"job_ids": job_ids},
)
# Overrides may reference jobs directly
_safe_execute(
text("DELETE FROM overrides WHERE job_id IN :job_ids").bindparams(
bindparam("job_ids", expanding=True)
),
{"job_ids": job_ids},
)
# Delete all jobs (runs/objects are cascaded via ORM relationships)
for job in jobs:
db.session.delete(job)
db.session.commit()
flash("All jobs deleted. Related mails are returned to the inbox.", "success")
except Exception as exc:
db.session.rollback()
print(f"[settings-jobs] Failed to delete all jobs: {exc}")
flash("Failed to delete all jobs.", "danger")
return redirect(url_for("main.settings"))
@main_bp.route("/settings/jobs/orphaned", methods=["GET"])
@login_required
@roles_required("admin")
def settings_jobs_orphaned():
"""Show list of orphaned jobs for verification before deletion."""
# Find jobs without valid customer
orphaned_jobs = Job.query.outerjoin(Customer, Job.customer_id == Customer.id).filter(
db.or_(
Job.customer_id.is_(None),
Customer.id.is_(None)
)
).order_by(Job.job_name.asc()).all()
# Build list with details
jobs_list = []
for job in orphaned_jobs:
run_count = JobRun.query.filter_by(job_id=job.id).count()
mail_count = JobRun.query.filter_by(job_id=job.id).filter(JobRun.mail_message_id.isnot(None)).count()
jobs_list.append({
"id": job.id,
"job_name": job.job_name or "Unnamed",
"backup_software": job.backup_software or "-",
"backup_type": job.backup_type or "-",
"customer_id": job.customer_id,
"run_count": run_count,
"mail_count": mail_count,
})
return render_template(
"main/settings_orphaned_jobs.html",
orphaned_jobs=jobs_list,
)
@main_bp.route("/settings/jobs/delete-orphaned", methods=["POST"])
@login_required
@roles_required("admin")
def settings_jobs_delete_orphaned():
"""Delete jobs that have no customer (customer_id is NULL or customer does not exist).
Also deletes all related emails from the database since the customer is gone.
"""
try:
# Find jobs without valid customer
orphaned_jobs = Job.query.outerjoin(Customer, Job.customer_id == Customer.id).filter(
db.or_(
Job.customer_id.is_(None),
Customer.id.is_(None)
)
).all()
if not orphaned_jobs:
flash("No orphaned jobs found.", "info")
return redirect(url_for("main.settings", section="maintenance"))
job_count = len(orphaned_jobs)
mail_count = 0
run_count = 0
# Collect mail message ids and run ids for cleanup
mail_message_ids = []
run_ids = []
job_ids = [job.id for job in orphaned_jobs]
for job in orphaned_jobs:
for run in job.runs:
if run.id is not None:
run_ids.append(run.id)
run_count += 1
if run.mail_message_id:
mail_message_ids.append(run.mail_message_id)
# Helper function for safe SQL execution
def _safe_execute(stmt, params):
try:
db.session.execute(stmt, params)
except Exception:
pass
# Clean up auxiliary tables that may not have ON DELETE CASCADE
if run_ids:
from sqlalchemy import text, bindparam
_safe_execute(
text("DELETE FROM ticket_job_runs WHERE job_run_id IN :run_ids").bindparams(
bindparam("run_ids", expanding=True)
),
{"run_ids": run_ids},
)
_safe_execute(
text("DELETE FROM remark_job_runs WHERE job_run_id IN :run_ids").bindparams(
bindparam("run_ids", expanding=True)
),
{"run_ids": run_ids},
)
if job_ids:
from sqlalchemy import text, bindparam
# Clean up scopes
_safe_execute(
text("DELETE FROM ticket_scopes WHERE job_id IN :job_ids").bindparams(
bindparam("job_ids", expanding=True)
),
{"job_ids": job_ids},
)
_safe_execute(
text("DELETE FROM remark_scopes WHERE job_id IN :job_ids").bindparams(
bindparam("job_ids", expanding=True)
),
{"job_ids": job_ids},
)
# Clean up overrides
_safe_execute(
text("DELETE FROM overrides WHERE job_id IN :job_ids").bindparams(
bindparam("job_ids", expanding=True)
),
{"job_ids": job_ids},
)
# Unlink mails from jobs before deleting jobs
# mail_messages.job_id references jobs.id
_safe_execute(
text("UPDATE mail_messages SET job_id = NULL WHERE job_id IN :job_ids").bindparams(
bindparam("job_ids", expanding=True)
),
{"job_ids": job_ids},
)
# Delete mail_objects before deleting mails
# mail_objects.mail_message_id references mail_messages.id
if mail_message_ids:
from sqlalchemy import text, bindparam
_safe_execute(
text("DELETE FROM mail_objects WHERE mail_message_id IN :mail_ids").bindparams(
bindparam("mail_ids", expanding=True)
),
{"mail_ids": mail_message_ids},
)
# Delete all orphaned jobs (runs/objects are cascaded via ORM relationships)
for job in orphaned_jobs:
db.session.delete(job)
# Now delete related mails permanently (customer is gone)
# This must happen AFTER deleting jobs/runs to avoid foreign key constraint violations
if mail_message_ids:
mail_count = len(mail_message_ids)
MailMessage.query.filter(MailMessage.id.in_(mail_message_ids)).delete(synchronize_session=False)
db.session.commit()
flash(
f"Deleted {job_count} orphaned job(s), {run_count} run(s), and {mail_count} email(s).",
"success"
)
_log_admin_event(
event_type="maintenance_delete_orphaned_jobs",
message=f"Deleted {job_count} orphaned jobs, {run_count} runs, and {mail_count} emails",
details=json.dumps({
"jobs_deleted": job_count,
"runs_deleted": run_count,
"mails_deleted": mail_count,
}),
)
except Exception as exc:
db.session.rollback()
print(f"[settings-jobs] Failed to delete orphaned jobs: {exc}")
flash("Failed to delete orphaned jobs.", "danger")
return redirect(url_for("main.settings", section="maintenance"))
@main_bp.route("/settings/test-emails/generate/<status_type>", methods=["POST"])
@login_required
@roles_required("admin")
def settings_generate_test_emails(status_type):
"""Generate test emails in inbox for testing parsers and orphaned jobs cleanup.
Fixed sets for consistent testing and reproducibility.
"""
try:
from datetime import datetime, timedelta
# Fixed test email sets per status type
email_sets = {
"success": [
{
"from_address": "veeam@test.local",
"subject": "Backup job 'Daily VM Backup' completed successfully",
"body": """Job name: Daily VM Backup
Status: Success
Start time: 2026-02-09 01:00:00
End time: 2026-02-09 02:15:00
Total size: 150 GB
Objects processed: 25
All backup operations completed without issues.""",
},
{
"from_address": "synology@test.local",
"subject": "[Synology] Backup Task SQL Database Backup completed successfully",
"body": """Dear Administrator,
Backup task 'SQL Database Backup' has completed successfully.
Task: SQL Database Backup
Status: Success
Start: 2026-02-09 02:00:00
Finish: 2026-02-09 02:45:00
Data transferred: 75 GB
All backup operations completed without issues.""",
},
{
"from_address": "nakivo@test.local",
"subject": "Job 'Exchange Mailbox' finished successfully",
"body": """NAKIVO Backup & Replication
Job: Exchange Mailbox
Status: Success
Started: 2026-02-09 03:00:00
Completed: 2026-02-09 03:30:00
Size: 50 GB
All backup operations completed without issues.""",
},
],
"warning": [
{
"from_address": "veeam@test.local",
"subject": "Backup job 'Weekly File Server' completed with warnings",
"body": """Job name: Weekly File Server
Status: Warning
Start time: 2026-02-09 01:00:00
End time: 2026-02-09 02:30:00
Total size: 200 GB
Objects processed: 35
Backup completed but some files were skipped.""",
},
{
"from_address": "synology@test.local",
"subject": "[Synology] Backup Task Critical Servers completed with warnings",
"body": """Dear Administrator,
Backup task 'Critical Servers' has completed with warnings.
Task: Critical Servers
Status: Warning
Start: 2026-02-09 02:00:00
Finish: 2026-02-09 03:00:00
Data transferred: 300 GB
Backup completed but some files were skipped.""",
},
{
"from_address": "nakivo@test.local",
"subject": "Job 'Production Backup' finished with warnings",
"body": """NAKIVO Backup & Replication
Job: Production Backup
Status: Warning
Started: 2026-02-09 03:00:00
Completed: 2026-02-09 04:00:00
Size: 250 GB
Some backup objects were skipped.""",
},
],
"error": [
{
"from_address": "veeam@test.local",
"subject": "Backup job 'Development Environment' failed",
"body": """Job name: Development Environment
Status: Failed
Start time: 2026-02-09 01:00:00
End time: 2026-02-09 01:15:00
Total size: 0 GB
Objects processed: 0
Backup failed. Please check the logs for details.""",
},
{
"from_address": "synology@test.local",
"subject": "[Synology] Backup Task Archive Job failed",
"body": """Dear Administrator,
Backup task 'Archive Job' has failed.
Task: Archive Job
Status: Failed
Start: 2026-02-09 02:00:00
Finish: 2026-02-09 02:05:00
Data transferred: 0 GB
Backup failed. Please check the logs for details.""",
},
{
"from_address": "nakivo@test.local",
"subject": "Job 'Critical Servers' finished with errors",
"body": """NAKIVO Backup & Replication
Job: Critical Servers
Status: Failed
Started: 2026-02-09 03:00:00
Completed: 2026-02-09 03:10:00
Size: 0 GB
Some backup objects failed to process.""",
},
],
}
if status_type not in email_sets:
flash("Invalid status type.", "danger")
return redirect(url_for("main.settings", section="maintenance"))
emails = email_sets[status_type]
created_count = 0
now = datetime.utcnow()
for email_data in emails:
mail = MailMessage(
from_address=email_data["from_address"],
subject=email_data["subject"],
text_body=email_data["body"],
html_body=f"<pre>{email_data['body']}</pre>",
received_at=now - timedelta(hours=created_count),
location="inbox",
job_id=None,
)
db.session.add(mail)
created_count += 1
db.session.commit()
flash(f"Generated {created_count} {status_type} test email(s) in inbox.", "success")
_log_admin_event(
event_type="maintenance_generate_test_emails",
message=f"Generated {created_count} {status_type} test emails",
details=json.dumps({"status_type": status_type, "count": created_count}),
)
except Exception as exc:
db.session.rollback()
print(f"[settings-test] Failed to generate test emails: {exc}")
flash("Failed to generate test emails.", "danger")
return redirect(url_for("main.settings", section="maintenance"))
@main_bp.route("/settings/objects/backfill", methods=["POST"])
@login_required
@roles_required("admin")
def settings_objects_backfill():
"""Backfill object persistence tables for existing approved runs.
This repairs cases where mail_objects exist but run_object_links/job_object_links/customer_objects were not created.
"""
engine = db.get_engine()
# Select runs that have mail_objects but no run_object_links yet
rows = []
try:
with engine.begin() as conn:
rows = conn.execute(
text(
"""
SELECT
jr.id AS run_id,
jr.job_id AS job_id,
j.customer_id AS customer_id,
jr.mail_message_id AS mail_message_id
FROM job_runs jr
JOIN jobs j ON j.id = jr.job_id
WHERE jr.mail_message_id IS NOT NULL
AND EXISTS (
SELECT 1 FROM mail_objects mo WHERE mo.mail_message_id = jr.mail_message_id
)
AND NOT EXISTS (
SELECT 1 FROM run_object_links rol WHERE rol.run_id = jr.id
)
ORDER BY jr.id DESC
"""
)
).fetchall()
except Exception as exc:
flash("Backfill failed while selecting runs.", "danger")
_log_admin_event("object_backfill_error", f"Backfill select failed: {exc}")
return redirect(url_for("main.settings", section="general"))
total = len(rows)
repaired_runs = 0
repaired_objects = 0
errors = 0
for r in rows:
try:
repaired_objects += persist_objects_for_auto_run(
int(r[2]), int(r[1]), int(r[0]), int(r[3])
)
repaired_runs += 1
except Exception as exc:
errors += 1
_log_admin_event(
"object_backfill_run_error",
f"Backfill failed for run {r[0]} (job {r[1]}, message {r[3]}): {exc}",
)
_log_admin_event(
"object_backfill",
f"Backfill finished. candidates={total}, repaired_runs={repaired_runs}, objects={repaired_objects}, errors={errors}",
)
if total == 0:
flash("No runs needed backfill.", "info")
else:
if errors == 0:
flash(f"Backfill complete. Repaired {repaired_runs} runs.", "success")
else:
flash(
f"Backfill complete with errors. Repaired {repaired_runs} runs, errors: {errors}.",
"warning",
)
return redirect(url_for("main.settings"))
@main_bp.route("/settings/jobs/export", methods=["GET"])
@login_required
@roles_required("admin")
def settings_jobs_export():
try:
jobs = Job.query.all()
payload = {
"schema": "approved_jobs_export_v1",
"exported_at": datetime.utcnow().isoformat() + "Z",
"counts": {"customers": 0, "jobs": 0},
"customers": [],
"jobs": [],
}
# Collect customers referenced by jobs (and ensure stable name mapping)
customer_by_id = {}
for job in jobs:
if job.customer_id and job.customer and job.customer.name:
customer_by_id[job.customer_id] = job.customer
payload["customers"] = [
{
"name": customer.name,
"autotask_company_id": customer.autotask_company_id,
"autotask_company_name": customer.autotask_company_name
}
for _, customer in sorted(customer_by_id.items(), key=lambda x: x[1].name.lower())
]
for job in jobs:
customer = customer_by_id.get(job.customer_id)
payload["jobs"].append(
{
"customer_name": customer.name if customer else None,
"from_address": getattr(job, "from_address", None),
"backup_software": job.backup_software,
"backup_type": job.backup_type,
"job_name": job.job_name,
"schedule_type": job.schedule_type,
"schedule_days_of_week": job.schedule_days_of_week,
"schedule_day_of_month": job.schedule_day_of_month,
"schedule_times": job.schedule_times,
"auto_approve": bool(job.auto_approve),
"active": bool(job.active),
}
)
payload["counts"]["customers"] = len(payload["customers"])
payload["counts"]["jobs"] = len(payload["jobs"])
# Audit logging
_log_admin_event(
"export_jobs",
f"Exported jobs configuration",
details=json.dumps({
"format": "JSON",
"schema": "approved_jobs_export_v1",
"customers_count": len(payload["customers"]),
"jobs_count": len(payload["jobs"])
}, indent=2)
)
filename = f"approved-jobs-export-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}.json"
blob = json.dumps(payload, indent=2, ensure_ascii=False).encode("utf-8")
return send_file(
io.BytesIO(blob),
mimetype="application/json",
as_attachment=True,
download_name=filename,
)
except Exception as exc:
print(f"[settings-jobs] Export failed: {exc}")
flash("Export failed.", "danger")
return redirect(url_for("main.settings", section="general"))
@main_bp.route("/settings/jobs/import", methods=["POST"])
@login_required
@roles_required("admin")
def settings_jobs_import():
upload = request.files.get("jobs_file")
if not upload or not upload.filename:
flash("No import file was provided.", "danger")
return redirect(url_for("main.settings", section="general"))
try:
raw = upload.read()
payload = json.loads(raw.decode("utf-8"))
except Exception:
flash("Invalid JSON file.", "danger")
return redirect(url_for("main.settings", section="general"))
if not isinstance(payload, dict) or payload.get("schema") != "approved_jobs_export_v1":
flash("Unsupported import file schema.", "danger")
return redirect(url_for("main.settings", section="general"))
jobs = payload.get("jobs") or []
if not isinstance(jobs, list):
flash("Invalid import file format (jobs).", "danger")
return redirect(url_for("main.settings", section="general"))
created_customers = 0
updated_customers = 0
created_jobs = 0
updated_jobs = 0
try:
# First, process customers from the payload (if present)
customers_data = payload.get("customers") or []
if isinstance(customers_data, list):
for cust_item in customers_data:
if not isinstance(cust_item, dict):
continue
cust_name = (cust_item.get("name") or "").strip()
if not cust_name:
continue
# Read Autotask fields (backwards compatible - optional)
autotask_company_id = cust_item.get("autotask_company_id")
autotask_company_name = cust_item.get("autotask_company_name")
existing_customer = Customer.query.filter_by(name=cust_name).first()
if existing_customer:
# Update Autotask mapping if provided
if autotask_company_id is not None:
existing_customer.autotask_company_id = autotask_company_id
existing_customer.autotask_company_name = autotask_company_name
existing_customer.autotask_mapping_status = None # Will be resynced
existing_customer.autotask_last_sync_at = None
updated_customers += 1
else:
new_customer = Customer(
name=cust_name,
active=True,
autotask_company_id=autotask_company_id,
autotask_company_name=autotask_company_name
)
db.session.add(new_customer)
created_customers += 1
db.session.flush()
# Now process jobs (customers should already exist from above, or will be created on-the-fly)
for item in jobs:
if not isinstance(item, dict):
continue
customer_name = (item.get("customer_name") or "").strip()
if not customer_name:
# jobs without customer are allowed, but we cannot map them meaningfully
customer = None
else:
customer = Customer.query.filter_by(name=customer_name).first()
if not customer:
customer = Customer(name=customer_name, active=True)
db.session.add(customer)
db.session.flush()
created_customers += 1
backup_software = (item.get("backup_software") or "").strip() or None
backup_type = (item.get("backup_type") or "").strip() or None
from_address = normalize_from_address(item.get("from_address"))
job_name = (item.get("job_name") or "").strip() or None
# Match existing job using the same key we show in the UI
existing = None
q = Job.query
if customer and customer.id:
q = q.filter(Job.customer_id == customer.id)
else:
q = q.filter(Job.customer_id.is_(None))
if from_address is None:
q = q.filter(Job.from_address.is_(None))
else:
q = q.filter(func.lower(Job.from_address) == from_address)
q = q.filter(Job.backup_software == backup_software)
q = q.filter(Job.backup_type == backup_type)
q = q.filter(Job.job_name == job_name)
existing = q.first()
def _bool(val, default=False):
if val is None:
return default
if isinstance(val, bool):
return val
if isinstance(val, (int, float)):
return bool(val)
if isinstance(val, str):
v = val.strip().lower()
if v in ("1", "true", "yes", "y", "on"):
return True
if v in ("0", "false", "no", "n", "off"):
return False
return default
schedule_type = (item.get("schedule_type") or "").strip() or None
schedule_days_of_week = (item.get("schedule_days_of_week") or "").strip() or None
schedule_times = (item.get("schedule_times") or "").strip() or None
schedule_day_of_month = item.get("schedule_day_of_month")
if schedule_day_of_month in ("", None):
schedule_day_of_month = None
else:
try:
schedule_day_of_month = int(schedule_day_of_month)
except Exception:
schedule_day_of_month = None
auto_approve = _bool(item.get("auto_approve"), default=True)
active = _bool(item.get("active"), default=True)
if existing:
if hasattr(existing, "from_address"):
existing.from_address = from_address
existing.schedule_type = schedule_type
existing.schedule_days_of_week = schedule_days_of_week
existing.schedule_day_of_month = schedule_day_of_month
existing.schedule_times = schedule_times
existing.auto_approve = auto_approve
existing.active = active
updated_jobs += 1
else:
job_kwargs = {
"customer_id": (customer.id if customer else None),
"backup_software": backup_software,
"backup_type": backup_type,
"job_name": job_name,
"schedule_type": schedule_type,
"schedule_days_of_week": schedule_days_of_week,
"schedule_day_of_month": schedule_day_of_month,
"schedule_times": schedule_times,
"auto_approve": auto_approve,
"active": active,
}
# Include from_address in the persisted job key so re-parse matching works after import
if hasattr(Job, "from_address"):
job_kwargs["from_address"] = from_address
new_job = Job(**job_kwargs)
db.session.add(new_job)
created_jobs += 1
db.session.commit()
flash(
f"Import completed. Customers created: {created_customers}, updated: {updated_customers}. Jobs created: {created_jobs}, updated: {updated_jobs}.",
"success",
)
# Audit logging
_log_admin_event(
"import_jobs",
"Imported jobs configuration",
details=json.dumps({
"format": "JSON",
"schema": payload.get("schema"),
"customers_created": created_customers,
"customers_updated": updated_customers,
"jobs_created": created_jobs,
"jobs_updated": updated_jobs
}, indent=2)
)
except Exception as exc:
db.session.rollback()
print(f"[settings-jobs] Import failed: {exc}")
flash("Import failed.", "danger")
return redirect(url_for("main.settings"))
@main_bp.route("/settings", methods=["GET", "POST"])
@login_required
@roles_required("admin")
def settings():
settings = _get_or_create_settings()
section = (request.args.get("section") or "general").strip().lower() or "general"
if request.method == "POST":
autotask_form_touched = any(str(k).startswith("autotask_") for k in (request.form or {}).keys())
import_form_touched = any(str(k).startswith("auto_import_") or str(k).startswith("manual_import_") or str(k).startswith("ingest_eml_") for k in (request.form or {}).keys())
general_form_touched = "ui_timezone" in request.form
mail_form_touched = any(k in request.form for k in ["graph_tenant_id", "graph_client_id", "graph_mailbox", "incoming_folder", "processed_folder"])
# Track changes for audit logging
changes_general = {}
changes_mail = {}
changes_autotask = {}
# Capture old values before modifications
old_ui_timezone = settings.ui_timezone
old_require_daily_dashboard_visit = settings.require_daily_dashboard_visit
old_is_sandbox_environment = settings.is_sandbox_environment
old_graph_tenant_id = settings.graph_tenant_id
old_graph_client_id = settings.graph_client_id
old_graph_mailbox = settings.graph_mailbox
old_incoming_folder = settings.incoming_folder
old_processed_folder = settings.processed_folder
old_auto_import_enabled = settings.auto_import_enabled
old_auto_import_interval = settings.auto_import_interval_minutes
old_autotask_enabled = getattr(settings, "autotask_enabled", None)
old_autotask_environment = getattr(settings, "autotask_environment", None)
old_autotask_username = getattr(settings, "autotask_api_username", None)
old_autotask_tracking_identifier = getattr(settings, "autotask_tracking_identifier", None)
old_autotask_base_url = getattr(settings, "autotask_base_url", None)
# NOTE: The Settings UI has multiple tabs with separate forms.
# Only update values that are present in the submitted form, to avoid
# clearing unrelated settings when saving from another tab.
if "graph_tenant_id" in request.form:
settings.graph_tenant_id = (request.form.get("graph_tenant_id") or "").strip() or None
if "graph_client_id" in request.form:
settings.graph_client_id = (request.form.get("graph_client_id") or "").strip() or None
if "graph_mailbox" in request.form:
settings.graph_mailbox = (request.form.get("graph_mailbox") or "").strip() or None
if "graph_client_secret" in request.form:
client_secret = (request.form.get("graph_client_secret") or "").strip()
if client_secret:
settings.graph_client_secret = client_secret
if "incoming_folder" in request.form:
settings.incoming_folder = (request.form.get("incoming_folder") or "").strip() or None
if "processed_folder" in request.form:
settings.processed_folder = (request.form.get("processed_folder") or "").strip() or None
# UI display
if "ui_timezone" in request.form:
settings.ui_timezone = (request.form.get("ui_timezone") or "").strip() or "Europe/Amsterdam"
# Navigation setting is in the same form (General tab), so process it here.
# Checkbox: present in form = checked, absent = unchecked.
settings.require_daily_dashboard_visit = bool(request.form.get("require_daily_dashboard_visit"))
# Environment indicator is in the same form (General tab), so process it here.
# Checkbox: present in form = checked, absent = unchecked.
settings.is_sandbox_environment = bool(request.form.get("is_sandbox_environment"))
# Autotask integration
if "autotask_enabled" in request.form:
settings.autotask_enabled = bool(request.form.get("autotask_enabled"))
if "autotask_environment" in request.form:
env_val = (request.form.get("autotask_environment") or "").strip().lower()
if env_val in ("sandbox", "production"):
settings.autotask_environment = env_val
else:
settings.autotask_environment = None
if "autotask_api_username" in request.form:
settings.autotask_api_username = (request.form.get("autotask_api_username") or "").strip() or None
if "autotask_api_password" in request.form:
pw = (request.form.get("autotask_api_password") or "").strip()
if pw:
settings.autotask_api_password = pw
if "autotask_tracking_identifier" in request.form:
settings.autotask_tracking_identifier = (request.form.get("autotask_tracking_identifier") or "").strip() or None
if "autotask_base_url" in request.form:
settings.autotask_base_url = (request.form.get("autotask_base_url") or "").strip() or None
if "autotask_default_queue_id" in request.form:
try:
settings.autotask_default_queue_id = int(request.form.get("autotask_default_queue_id") or 0) or None
except (ValueError, TypeError):
pass
if "autotask_default_ticket_source_id" in request.form:
try:
settings.autotask_default_ticket_source_id = int(request.form.get("autotask_default_ticket_source_id") or 0) or None
except (ValueError, TypeError):
pass
if "autotask_default_ticket_status" in request.form:
try:
form_value = request.form.get("autotask_default_ticket_status", "").strip()
if form_value: # Only update if a value was actually selected
settings.autotask_default_ticket_status = int(form_value)
elif form_value == "" and settings.autotask_default_ticket_status is not None:
# If explicitly cleared (empty string submitted) and was previously set,
# allow clearing only if reference data is loaded (dropdown has options)
if getattr(settings, "autotask_cached_ticket_statuses_json", None):
settings.autotask_default_ticket_status = None
# Otherwise: keep existing value (prevents accidental clearing when dropdown is empty)
except (ValueError, TypeError):
pass
if "autotask_priority_warning" in request.form:
try:
settings.autotask_priority_warning = int(request.form.get("autotask_priority_warning") or 0) or None
except (ValueError, TypeError):
pass
if "autotask_priority_error" in request.form:
try:
settings.autotask_priority_error = int(request.form.get("autotask_priority_error") or 0) or None
except (ValueError, TypeError):
pass
# Daily Jobs
if "daily_jobs_start_date" in request.form:
daily_jobs_start_date_str = (request.form.get("daily_jobs_start_date") or "").strip()
if daily_jobs_start_date_str:
try:
settings.daily_jobs_start_date = datetime.strptime(daily_jobs_start_date_str, "%Y-%m-%d").date()
except Exception:
settings.daily_jobs_start_date = None
else:
settings.daily_jobs_start_date = None
# Import configuration
# Checkbox: only update when any import field is present (form was submitted)
# Unchecked checkboxes are not sent by browsers, so check import_form_touched
if import_form_touched:
settings.auto_import_enabled = bool(request.form.get("auto_import_enabled"))
if "auto_import_interval_minutes" in request.form:
try:
settings.auto_import_interval_minutes = int(
request.form.get("auto_import_interval_minutes") or settings.auto_import_interval_minutes
)
except (ValueError, TypeError):
pass
if "auto_import_cutoff_date" in request.form:
auto_import_cutoff_date_str = (request.form.get("auto_import_cutoff_date") or "").strip()
if auto_import_cutoff_date_str:
try:
settings.auto_import_cutoff_date = datetime.strptime(auto_import_cutoff_date_str, "%Y-%m-%d").date()
except Exception:
settings.auto_import_cutoff_date = None
else:
settings.auto_import_cutoff_date = None
if (
"auto_import_enabled" in request.form
or "auto_import_interval_minutes" in request.form
or "auto_import_cutoff_date" in request.form
or "manual_import_batch_size" in request.form
or "ingest_eml_retention_days" in request.form
):
# Automatic importer batch size is fixed at 50
settings.auto_import_max_items = 50
if "ingest_eml_retention_days" in request.form:
try:
settings.ingest_eml_retention_days = int(
request.form.get("ingest_eml_retention_days") or getattr(settings, "ingest_eml_retention_days", 7)
)
except (ValueError, TypeError):
pass
if settings.ingest_eml_retention_days not in (0, 7, 14):
settings.ingest_eml_retention_days = 7
if "manual_import_batch_size" in request.form:
try:
settings.manual_import_batch_size = int(
request.form.get("manual_import_batch_size") or settings.manual_import_batch_size
)
except (ValueError, TypeError):
pass
# Manual import batch size is configurable but limited to 50
try:
settings.manual_import_batch_size = int(settings.manual_import_batch_size or 50)
except (ValueError, TypeError):
settings.manual_import_batch_size = 50
if settings.manual_import_batch_size < 1:
settings.manual_import_batch_size = 1
if settings.manual_import_batch_size > 50:
settings.manual_import_batch_size = 50
try:
db.session.commit()
flash("Settings have been saved.", "success")
# Audit logging: detect and log changes
if general_form_touched:
if old_ui_timezone != settings.ui_timezone:
changes_general["ui_timezone"] = {"old": old_ui_timezone, "new": settings.ui_timezone}
if old_require_daily_dashboard_visit != settings.require_daily_dashboard_visit:
changes_general["require_daily_dashboard_visit"] = {"old": old_require_daily_dashboard_visit, "new": settings.require_daily_dashboard_visit}
if old_is_sandbox_environment != settings.is_sandbox_environment:
changes_general["is_sandbox_environment"] = {"old": old_is_sandbox_environment, "new": settings.is_sandbox_environment}
if changes_general:
_log_admin_event(
"settings_general",
f"Updated {len(changes_general)} general setting(s)",
details=json.dumps(changes_general, indent=2)
)
if mail_form_touched or import_form_touched:
if old_graph_tenant_id != settings.graph_tenant_id:
changes_mail["graph_tenant_id"] = {"old": old_graph_tenant_id, "new": settings.graph_tenant_id}
if old_graph_client_id != settings.graph_client_id:
changes_mail["graph_client_id"] = {"old": old_graph_client_id, "new": settings.graph_client_id}
if old_graph_mailbox != settings.graph_mailbox:
changes_mail["graph_mailbox"] = {"old": old_graph_mailbox, "new": settings.graph_mailbox}
if old_incoming_folder != settings.incoming_folder:
changes_mail["incoming_folder"] = {"old": old_incoming_folder, "new": settings.incoming_folder}
if old_processed_folder != settings.processed_folder:
changes_mail["processed_folder"] = {"old": old_processed_folder, "new": settings.processed_folder}
if old_auto_import_enabled != settings.auto_import_enabled:
changes_mail["auto_import_enabled"] = {"old": old_auto_import_enabled, "new": settings.auto_import_enabled}
if old_auto_import_interval != settings.auto_import_interval_minutes:
changes_mail["auto_import_interval_minutes"] = {"old": old_auto_import_interval, "new": settings.auto_import_interval_minutes}
if changes_mail:
_log_admin_event(
"settings_mail",
f"Updated {len(changes_mail)} mail setting(s)",
details=json.dumps(changes_mail, indent=2)
)
if autotask_form_touched:
if old_autotask_enabled != getattr(settings, "autotask_enabled", None):
changes_autotask["autotask_enabled"] = {"old": old_autotask_enabled, "new": getattr(settings, "autotask_enabled", None)}
if old_autotask_environment != getattr(settings, "autotask_environment", None):
changes_autotask["autotask_environment"] = {"old": old_autotask_environment, "new": getattr(settings, "autotask_environment", None)}
if old_autotask_username != getattr(settings, "autotask_api_username", None):
changes_autotask["autotask_api_username"] = {"old": old_autotask_username, "new": getattr(settings, "autotask_api_username", None)}
if old_autotask_tracking_identifier != getattr(settings, "autotask_tracking_identifier", None):
changes_autotask["autotask_tracking_identifier"] = {"old": old_autotask_tracking_identifier, "new": getattr(settings, "autotask_tracking_identifier", None)}
if old_autotask_base_url != getattr(settings, "autotask_base_url", None):
changes_autotask["autotask_base_url"] = {"old": old_autotask_base_url, "new": getattr(settings, "autotask_base_url", None)}
# Note: Password is NOT logged for security
if changes_autotask:
_log_admin_event(
"settings_autotask",
f"Updated {len(changes_autotask)} Autotask setting(s)",
details=json.dumps(changes_autotask, indent=2)
)
# Autotask ticket defaults depend on reference data (queues, sources, statuses, priorities).
# When the Autotask integration is (re)configured, auto-refresh the cached reference data
# once so the dropdowns become usable immediately.
try:
if (
autotask_form_touched
and bool(getattr(settings, "autotask_enabled", False))
and bool(getattr(settings, "autotask_api_username", None))
and bool(getattr(settings, "autotask_api_password", None))
and bool(getattr(settings, "autotask_tracking_identifier", None))
):
missing_cache = (
not bool(getattr(settings, "autotask_cached_queues_json", None))
or not bool(getattr(settings, "autotask_cached_ticket_sources_json", None))
or not bool(getattr(settings, "autotask_cached_ticket_statuses_json", None))
or not bool(getattr(settings, "autotask_cached_priorities_json", None))
)
if missing_cache:
queues, sources, statuses, pr_out = _refresh_autotask_reference_data(settings)
db.session.commit()
flash(
f"Autotask reference data refreshed. Queues: {len(queues)}. Ticket Sources: {len(sources)}. Ticket Statuses: {len(statuses)}. Priorities: {len(pr_out)}.",
"success",
)
_log_admin_event(
"autotask_reference_data_auto_refreshed",
"Autotask reference data auto-refreshed after settings save.",
details=json.dumps({"queues": len(queues or []), "ticket_sources": len(sources or []), "ticket_statuses": len(statuses or []), "priorities": len(pr_out)}),
)
except Exception as exc:
try:
db.session.rollback()
except Exception:
pass
flash(f"Autotask reference data refresh failed: {exc}", "warning")
_log_admin_event(
"autotask_reference_data_auto_refresh_failed",
"Autotask reference data auto-refresh failed after settings save.",
details=json.dumps({"error": str(exc)}),
)
# If EML storage has been turned off, clear any stored blobs immediately.
try:
if getattr(settings, "ingest_eml_retention_days", 7) == 0:
MailMessage.query.filter(MailMessage.eml_blob.isnot(None)).update(
{MailMessage.eml_blob: None, MailMessage.eml_stored_at: None},
synchronize_session=False,
)
db.session.commit()
except Exception as exc:
db.session.rollback()
print(f"[settings] Failed to clear stored EML blobs: {exc}")
except Exception as exc:
db.session.rollback()
print(f"[settings] Failed to save settings: {exc}")
flash("Failed to save settings.", "danger")
return redirect(url_for("main.settings", section=section))
db_size_bytes = _get_database_size_bytes()
free_disk_bytes = _get_free_disk_bytes()
db_size_human = _format_bytes(db_size_bytes) if db_size_bytes is not None else "unknown"
free_disk_human = _format_bytes(free_disk_bytes) if free_disk_bytes is not None else "unknown"
free_disk_warning = False
if free_disk_bytes is not None:
two_gb = 2 * 1024 * 1024 * 1024
free_disk_warning = free_disk_bytes < two_gb
has_client_secret = bool(settings.graph_client_secret)
has_autotask_password = bool(getattr(settings, "autotask_api_password", None))
# Common UI timezones (IANA names)
tz_options = [
"Europe/Amsterdam",
"UTC",
"Europe/London",
"Europe/Paris",
"Europe/Berlin",
"Europe/Brussels",
"America/New_York",
"America/Chicago",
"America/Denver",
"America/Los_Angeles",
"Asia/Tokyo",
"Asia/Singapore",
]
# News admin data (only when requested)
news_admin_items = []
news_admin_stats = {}
total_users = 0
if section == "news":
try:
total_users = int(User.query.count() or 0)
except Exception:
total_users = 0
try:
news_admin_items = NewsItem.query.order_by(
NewsItem.pinned.desc(),
NewsItem.publish_from.desc().nullslast(),
NewsItem.created_at.desc(),
).all()
except Exception:
news_admin_items = []
try:
for item in news_admin_items:
read_count = int(NewsRead.query.filter_by(news_item_id=item.id).count() or 0)
news_admin_stats[item.id] = {
"read": read_count,
"unread": max(total_users - read_count, 0),
"total": total_users,
}
except Exception:
news_admin_stats = {}
users = User.query.order_by(User.username.asc()).all()
# Count users that have 'admin' among their assigned roles (comma-separated storage)
admin_users_count = 0
try:
admin_users_count = sum(1 for u in users if "admin" in (getattr(u, "roles", None) or []))
except Exception:
admin_users_count = 0
# Autotask cached reference data for dropdowns
autotask_queues = []
autotask_ticket_sources = []
autotask_priorities = []
autotask_ticket_statuses = []
autotask_last_sync_at = getattr(settings, "autotask_reference_last_sync_at", None)
# Auto-load reference data on page load if Autotask is enabled but cache is empty
try:
if (
bool(getattr(settings, "autotask_enabled", False))
and bool(getattr(settings, "autotask_api_username", None))
and bool(getattr(settings, "autotask_api_password", None))
and bool(getattr(settings, "autotask_tracking_identifier", None))
):
missing_cache = (
not getattr(settings, "autotask_cached_queues_json", None)
or not getattr(settings, "autotask_cached_ticket_sources_json", None)
or not getattr(settings, "autotask_cached_ticket_statuses_json", None)
or not getattr(settings, "autotask_cached_priorities_json", None)
)
if missing_cache:
queues_loaded, sources_loaded, statuses_loaded, priorities_loaded = _refresh_autotask_reference_data(settings)
db.session.commit()
flash(
f"Autotask reference data auto-loaded. Queues: {len(queues_loaded)}. Ticket Sources: {len(sources_loaded)}. Ticket Statuses: {len(statuses_loaded)}. Priorities: {len(priorities_loaded)}.",
"info",
)
except Exception as exc:
try:
db.session.rollback()
except Exception:
pass
flash(f"Failed to auto-load Autotask reference data: {exc}", "warning")
try:
if getattr(settings, "autotask_cached_queues_json", None):
autotask_queues = json.loads(settings.autotask_cached_queues_json) or []
except Exception:
autotask_queues = []
try:
if getattr(settings, "autotask_cached_ticket_sources_json", None):
autotask_ticket_sources = json.loads(settings.autotask_cached_ticket_sources_json) or []
except Exception:
autotask_ticket_sources = []
try:
if getattr(settings, "autotask_cached_priorities_json", None):
autotask_priorities = json.loads(settings.autotask_cached_priorities_json) or []
except Exception:
autotask_priorities = []
try:
if getattr(settings, "autotask_cached_ticket_statuses_json", None):
autotask_ticket_statuses = json.loads(settings.autotask_cached_ticket_statuses_json) or []
except Exception:
autotask_ticket_statuses = []
return render_template(
"main/settings.html",
settings=settings,
db_size_human=db_size_human,
free_disk_human=free_disk_human,
free_disk_warning=free_disk_warning,
has_client_secret=has_client_secret,
has_autotask_password=has_autotask_password,
tz_options=tz_options,
users=users,
admin_users_count=admin_users_count,
section=section,
autotask_queues=autotask_queues,
autotask_ticket_sources=autotask_ticket_sources,
autotask_priorities=autotask_priorities,
autotask_ticket_statuses=autotask_ticket_statuses,
autotask_last_sync_at=autotask_last_sync_at,
news_admin_items=news_admin_items,
news_admin_stats=news_admin_stats,
)
@main_bp.route("/settings/news/create", methods=["POST"])
@login_required
@roles_required("admin")
def settings_news_create():
title = (request.form.get("title") or "").strip()
body = (request.form.get("body") or "").strip()
link_url = (request.form.get("link_url") or "").strip() or None
severity = (request.form.get("severity") or "info").strip().lower() or "info"
pinned = bool(request.form.get("pinned"))
active = bool(request.form.get("active"))
publish_from_str = (request.form.get("publish_from") or "").strip()
publish_until_str = (request.form.get("publish_until") or "").strip()
publish_from = None
publish_until = None
try:
if publish_from_str:
publish_from = datetime.strptime(publish_from_str, "%Y-%m-%dT%H:%M")
except Exception:
publish_from = None
try:
if publish_until_str:
publish_until = datetime.strptime(publish_until_str, "%Y-%m-%dT%H:%M")
except Exception:
publish_until = None
if not title or not body:
flash("Title and body are required.", "danger")
return redirect(url_for("main.settings", section="news"))
item = NewsItem(
title=title,
body=body,
link_url=link_url,
severity=severity if severity in ("info", "warning") else "info",
pinned=pinned,
active=active,
publish_from=publish_from,
publish_until=publish_until,
created_by_user_id=getattr(current_user, "id", None),
)
db.session.add(item)
try:
db.session.commit()
flash("News item created.", "success")
except Exception as exc:
db.session.rollback()
print(f"[settings] Failed to create news item: {exc}")
flash("Failed to create news item.", "danger")
return redirect(url_for("main.settings", section="news"))
@main_bp.route("/settings/news/<int:news_id>/update", methods=["POST"])
@login_required
@roles_required("admin")
def settings_news_update(news_id: int):
item = NewsItem.query.get(news_id)
if not item:
return abort(404)
title = (request.form.get("title") or "").strip()
body = (request.form.get("body") or "").strip()
link_url = (request.form.get("link_url") or "").strip() or None
severity = (request.form.get("severity") or "info").strip().lower() or "info"
pinned = bool(request.form.get("pinned"))
active = bool(request.form.get("active"))
publish_from_str = (request.form.get("publish_from") or "").strip()
publish_until_str = (request.form.get("publish_until") or "").strip()
publish_from = None
publish_until = None
try:
if publish_from_str:
publish_from = datetime.strptime(publish_from_str, "%Y-%m-%dT%H:%M")
except Exception:
publish_from = None
try:
if publish_until_str:
publish_until = datetime.strptime(publish_until_str, "%Y-%m-%dT%H:%M")
except Exception:
publish_until = None
if not title or not body:
flash("Title and body are required.", "danger")
return redirect(url_for("main.settings", section="news"))
item.title = title
item.body = body
item.link_url = link_url
item.severity = severity if severity in ("info", "warning") else "info"
item.pinned = pinned
item.active = active
item.publish_from = publish_from
item.publish_until = publish_until
try:
db.session.commit()
flash("News item updated.", "success")
except Exception as exc:
db.session.rollback()
print(f"[settings] Failed to update news item: {exc}")
flash("Failed to update news item.", "danger")
return redirect(url_for("main.settings", section="news"))
@main_bp.route("/settings/news/<int:news_id>/delete", methods=["POST"])
@login_required
@roles_required("admin")
def settings_news_delete(news_id: int):
item = NewsItem.query.get(news_id)
if not item:
return abort(404)
try:
db.session.delete(item)
db.session.commit()
flash("News item deleted.", "success")
except Exception as exc:
db.session.rollback()
print(f"[settings] Failed to delete news item: {exc}")
flash("Failed to delete news item.", "danger")
return redirect(url_for("main.settings", section="news"))
@main_bp.route("/settings/news/<int:news_id>/reset_reads", methods=["POST"])
@login_required
@roles_required("admin")
def settings_news_reset_reads(news_id: int):
try:
NewsRead.query.filter_by(news_item_id=news_id).delete(synchronize_session=False)
db.session.commit()
flash("Read status reset for this news item.", "success")
except Exception as exc:
db.session.rollback()
print(f"[settings] Failed to reset news reads: {exc}")
flash("Failed to reset read status.", "danger")
return redirect(url_for("main.settings", section="news"))
@main_bp.route("/settings/news/<int:news_id>/reads")
@login_required
@roles_required("admin")
def settings_news_reads(news_id: int):
item = NewsItem.query.get(news_id)
if not item:
return abort(404)
reads = []
try:
reads = (
db.session.query(NewsRead, User)
.join(User, User.id == NewsRead.user_id)
.filter(NewsRead.news_item_id == news_id)
.order_by(NewsRead.read_at.desc())
.all()
)
except Exception:
reads = []
return render_template("main/settings_news_reads.html", item=item, reads=reads)
@main_bp.route("/settings/users/create", methods=["POST"])
@login_required
@roles_required("admin")
def settings_users_create():
username = (request.form.get("new_username") or "").strip()
roles = [r.strip() for r in request.form.getlist("new_roles") if (r or "").strip()]
# Backwards compatible storage: comma-separated roles in the existing "role" column
role = ",".join(dict.fromkeys(roles)) if roles else "viewer"
password = (request.form.get("new_password") or "").strip()
if not username:
flash("Username is required.", "danger")
return redirect(url_for("main.settings", section="general"))
existing = User.query.filter_by(username=username).first()
if existing:
flash("Username already exists.", "danger")
return redirect(url_for("main.settings", section="general"))
if not password:
flash("Password is required.", "danger")
return redirect(url_for("main.settings", section="general"))
user = User(username=username, role=role)
user.set_password(password)
db.session.add(user)
try:
db.session.commit()
flash(f"User '{username}' has been created.", "success")
_log_admin_event("user_create", f"User '{username}' created with roles '{role}'.")
except Exception as exc:
db.session.rollback()
print(f"[settings-users] Failed to create user: {exc}")
flash("Failed to create user.", "danger")
return redirect(url_for("main.settings", section="users"))
@main_bp.route("/settings/app-reset", methods=["POST"])
@login_required
@roles_required("admin")
def settings_app_reset():
# Require explicit confirmation to avoid accidental resets
confirmation = (request.form.get("confirm_reset") or "").strip().upper()
if confirmation != "RESET":
flash("Application reset cancelled. Type RESET to confirm.", "warning")
return redirect(url_for("main.settings", section="general"))
# Reset all application data (including users). After this, the app will
# return to initial setup (create new admin).
try:
dialect_name = ""
try:
dialect_name = (db.engine.dialect.name or "").lower()
except Exception:
dialect_name = ""
if dialect_name.startswith("postgres"):
# Postgres: fast and resets identities; CASCADE handles FKs.
db.session.execute(
text(
"""TRUNCATE TABLE
ticket_job_runs,
remark_job_runs,
ticket_scopes,
remark_scopes,
tickets,
remarks,
mail_objects,
mail_messages,
job_objects,
job_runs,
jobs,
overrides,
customers,
admin_logs,
system_settings,
users
RESTART IDENTITY CASCADE"""
)
)
db.session.commit()
else:
# Fallback (e.g. SQLite): delete in FK-safe order.
for model in (
TicketJobRun,
RemarkJobRun,
TicketScope,
RemarkScope,
Ticket,
Remark,
MailObject,
MailMessage,
JobObject,
JobRun,
Job,
Override,
Customer,
AdminLog,
SystemSettings,
User,
):
db.session.query(model).delete()
db.session.commit()
try:
logout_user()
except Exception:
pass
flash("Application has been reset. Please create a new admin user.", "success")
return redirect(url_for("auth.initial_setup"))
except Exception as exc:
try:
db.session.rollback()
except Exception:
pass
flash(f"Reset failed: {exc}", "danger")
return redirect(url_for("main.settings", section="general"))
@main_bp.route("/settings/users/<int:user_id>/reset-password", methods=["POST"])
@login_required
@roles_required("admin")
def settings_users_reset_password(user_id: int):
user = User.query.get_or_404(user_id)
new_password = (request.form.get("reset_password") or "").strip()
if not new_password:
flash("New password is required.", "danger")
return redirect(url_for("main.settings", section="general"))
user.set_password(new_password)
try:
db.session.commit()
flash(f"Password for '{user.username}' has been reset.", "success")
_log_admin_event("user_reset_password", f"Password reset for user '{user.username}'.")
except Exception as exc:
db.session.rollback()
print(f"[settings-users] Failed to reset password: {exc}")
flash("Failed to reset password.", "danger")
return redirect(url_for("main.settings", section="users"))
@main_bp.route("/settings/users/<int:user_id>/roles", methods=["POST"])
@login_required
@roles_required("admin")
def settings_users_update_roles(user_id: int):
user = User.query.get_or_404(user_id)
roles = [r.strip() for r in request.form.getlist("roles") if (r or "").strip()]
roles = list(dict.fromkeys(roles))
if not roles:
roles = ["viewer"]
# Prevent removing the last remaining admin role
removing_admin = ("admin" in user.roles) and ("admin" not in roles)
if removing_admin:
try:
all_users = User.query.all()
admin_count = sum(1 for u in all_users if "admin" in (getattr(u, "roles", None) or []))
except Exception:
admin_count = 0
if admin_count <= 1:
flash("Cannot remove admin role from the last admin account.", "danger")
return redirect(url_for("main.settings", section="users"))
old_roles = ",".join(user.roles)
new_roles = ",".join(roles)
user.role = new_roles
try:
db.session.commit()
flash(f"Roles for '{user.username}' have been updated.", "success")
_log_admin_event("user_update_roles", f"User '{user.username}' roles changed from '{old_roles}' to '{new_roles}'.")
# If the updated user is currently logged in, make sure the active role stays valid.
try:
if getattr(current_user, "id", None) == user.id:
current_user.set_active_role(user.roles[0])
except Exception:
pass
except Exception as exc:
db.session.rollback()
print(f"[settings-users] Failed to update roles: {exc}")
flash("Failed to update roles.", "danger")
return redirect(url_for("main.settings", section="users"))
@main_bp.route("/settings/users/<int:user_id>/delete", methods=["POST"])
@login_required
@roles_required("admin")
def settings_users_delete(user_id: int):
user = User.query.get_or_404(user_id)
# Prevent deleting the last admin user
if "admin" in user.roles:
try:
all_users = User.query.all()
admin_count = sum(1 for u in all_users if "admin" in (getattr(u, "roles", None) or []))
except Exception:
admin_count = 0
if admin_count <= 1:
flash("Cannot delete the last admin account.", "danger")
return redirect(url_for("main.settings", section="general"))
username = user.username
try:
db.session.delete(user)
db.session.commit()
flash(f"User '{username}' has been deleted.", "success")
_log_admin_event("user_delete", f"User '{username}' deleted.")
except Exception as exc:
db.session.rollback()
print(f"[settings-users] Failed to delete user: {exc}")
flash("Failed to delete user.", "danger")
return redirect(url_for("main.settings", section="users"))
@main_bp.route("/settings/mail-import", methods=["POST"])
@login_required
@roles_required("admin")
def settings_mail_import():
settings = _get_or_create_settings()
# Prevent manual import when free disk is below 2 GB
free_disk_bytes = _get_free_disk_bytes()
if free_disk_bytes is not None:
two_gb = 2 * 1024 * 1024 * 1024
if free_disk_bytes < two_gb:
flash("Manual mail import is blocked because free disk space is below 2 GB.", "danger")
_log_admin_event("mail_import_manual_blocked", "Manual mail import blocked: free disk space below 2 GB.")
return redirect(url_for("main.settings", section="general"))
# Determine batch size (max 50)
try:
batch_size = int(request.form.get("manual_import_items") or settings.manual_import_batch_size or 50)
except (ValueError, TypeError):
batch_size = settings.manual_import_batch_size or 50
if batch_size <= 0:
batch_size = 1
if batch_size > 50:
batch_size = 50
auto_approved_runs = []
try:
total_fetched, new_messages, auto_approved, auto_approved_runs, errors = run_manual_import(
settings, batch_size
)
except MailImportError as exc:
msg = f"Manual mail import failed: {exc}"
_log_admin_event("mail_import_manual_error", msg)
flash(str(exc), "danger")
return redirect(url_for("main.settings", section="general"))
except Exception as exc:
msg = f"Unexpected error during manual mail import: {exc}"
_log_admin_event("mail_import_manual_error", msg)
flash("Unexpected error during manual mail import. See logs for details.", "danger")
return redirect(url_for("main.settings", section="general"))
msg = f"Manual mail import finished. fetched={total_fetched}, new={new_messages}, auto_approved={auto_approved}, errors={len(errors)}"
_log_admin_event("mail_import_manual", msg)
# Persist objects for auto-approved runs (must not block the request)
if auto_approved_runs:
persisted_objects = 0
persisted_errors = 0
for (customer_id, job_id, run_id, mail_message_id) in auto_approved_runs:
try:
persisted_objects += persist_objects_for_auto_run(
int(customer_id), int(job_id), int(run_id), int(mail_message_id)
)
except Exception as exc:
persisted_errors += 1
_log_admin_event(
"object_persist_error",
f"Object persistence failed for auto-approved message {mail_message_id} (job {job_id}, run {run_id}): {exc}",
)
_log_admin_event(
"object_persist_auto_approve",
f"Persisted objects for auto-approved runs (manual import). runs={len(auto_approved_runs)}, objects={persisted_objects}, errors={persisted_errors}",
)
if errors:
flash("Manual mail import finished with errors.", "warning")
else:
flash("Manual mail import finished.", "success")
flash(f"Fetched: {total_fetched}, new: {new_messages}, auto-approved: {auto_approved}.", "info")
for err in errors[:5]:
flash(f"Import error: {err}", "danger")
return redirect(url_for("main.settings"))
@main_bp.route("/settings/folders")
@login_required
@roles_required("admin")
def settings_folders():
settings = _get_or_create_settings()
mailbox = (settings.graph_mailbox or "").strip()
if not mailbox:
return jsonify(
{"status": "error", "message": "Microsoft Graph mailbox is not configured."}
), 400
try:
# Reuse the same token flow used by the mail importer.
from ..mail_importer import _get_access_token, _build_auth_headers, GRAPH_BASE_URL
access_token = _get_access_token(settings)
headers = _build_auth_headers(access_token)
def _graph_get_all(url: str):
items = []
next_url = url
# Safety limit to avoid infinite loops if Graph behaves unexpectedly.
safety_pages = 0
while next_url and safety_pages < 50:
safety_pages += 1
resp = requests.get(next_url, headers=headers, timeout=20)
if resp.status_code != 200:
try:
payload = resp.json()
except Exception:
payload = {}
msg = payload.get("error", {}).get("message") or f"HTTP {resp.status_code}"
raise RuntimeError(msg)
payload = resp.json() or {}
items.extend(payload.get("value", []) or [])
next_url = payload.get("@odata.nextLink")
return items
def _build_tree(parent_id: str | None, parent_path: str):
if parent_id is None:
url = f"{GRAPH_BASE_URL}/users/{mailbox}/mailFolders?$top=100"
else:
url = f"{GRAPH_BASE_URL}/users/{mailbox}/mailFolders/{parent_id}/childFolders?$top=100"
folders = _graph_get_all(url)
nodes = []
for f in folders:
fid = f.get("id")
name = (f.get("displayName") or "").strip()
if not fid or not name:
continue
path = name if not parent_path else f"{parent_path}/{name}"
node = {
"displayName": name,
"id": fid,
"path": path,
"children": [],
}
# Recursively load children (bounded by Graph and typical folder depth).
try:
node["children"] = _build_tree(fid, path)
except Exception:
# If child loading fails for a specific folder, keep it as a leaf.
node["children"] = []
nodes.append(node)
# Stable order for UI
nodes.sort(key=lambda n: (n.get("displayName") or "").lower())
return nodes
folders_tree = _build_tree(None, "")
return jsonify({"status": "ok", "folders": folders_tree})
except Exception as exc:
try:
current_app.logger.exception("Failed to load mailbox folders from Microsoft Graph")
except Exception:
pass
return jsonify({"status": "error", "message": str(exc) or "Failed to load folders."}), 500
@main_bp.route("/settings/autotask/test-connection", methods=["POST"])
@login_required
@roles_required("admin")
def settings_autotask_test_connection():
settings = _get_or_create_settings()
if not settings.autotask_api_username or not settings.autotask_api_password or not settings.autotask_tracking_identifier:
flash("Autotask settings incomplete. Provide username, password and tracking identifier first.", "warning")
return redirect(url_for("main.settings", section="integrations"))
try:
from ..integrations.autotask.client import AutotaskClient
client = AutotaskClient(
username=settings.autotask_api_username,
password=settings.autotask_api_password,
api_integration_code=settings.autotask_tracking_identifier,
environment=(settings.autotask_environment or "production"),
)
zone = client.get_zone_info()
# Lightweight authenticated calls to validate credentials and basic API access
_ = client.get_queues()
_ = client.get_ticket_sources()
flash(f"Autotask connection OK. Zone: {zone.zone_name or 'unknown'}.", "success")
_log_admin_event(
"autotask_test_connection",
"Autotask test connection succeeded.",
details=json.dumps({"zone": zone.zone_name, "api_url": zone.api_url}),
)
except Exception as exc:
flash(f"Autotask connection failed: {exc}", "danger")
_log_admin_event(
"autotask_test_connection_failed",
"Autotask test connection failed.",
details=json.dumps({"error": str(exc)}),
)
return redirect(url_for("main.settings", section="integrations"))
def _refresh_autotask_reference_data(settings):
"""Refresh and persist Autotask reference data used for ticket default dropdowns."""
from ..integrations.autotask.client import AutotaskClient
client = AutotaskClient(
username=settings.autotask_api_username,
password=settings.autotask_api_password,
api_integration_code=settings.autotask_tracking_identifier,
environment=(settings.autotask_environment or "production"),
)
queues = client.get_queues()
sources = client.get_ticket_sources()
priorities = client.get_ticket_priorities()
statuses = client.get_ticket_statuses()
# Store a minimal subset for dropdowns (id + name/label)
# Note: Some "reference" values are exposed as picklists (value/label)
# instead of entity collections (id/name). We normalize both shapes.
def _norm(items):
out = []
for it in items or []:
if not isinstance(it, dict):
continue
_id = it.get("id")
if _id is None:
_id = it.get("value")
name = (
it.get("name")
or it.get("label")
or it.get("queueName")
or it.get("sourceName")
or it.get("description")
or ""
)
try:
_id_int = int(_id)
except Exception:
continue
out.append({"id": _id_int, "name": str(name)})
# Sort by name for stable dropdowns
out.sort(key=lambda x: (x.get("name") or "").lower())
return out
settings.autotask_cached_queues_json = json.dumps(_norm(queues))
settings.autotask_cached_ticket_sources_json = json.dumps(_norm(sources))
settings.autotask_cached_ticket_statuses_json = json.dumps(_norm(statuses))
# Priorities are returned as picklist values (value/label)
pr_out = []
for it in priorities or []:
if not isinstance(it, dict):
continue
if it.get("isActive") is False:
continue
val = it.get("value")
label = it.get("label") or it.get("name") or ""
try:
val_int = int(val)
except Exception:
continue
pr_out.append({"id": val_int, "name": str(label)})
pr_out.sort(key=lambda x: (x.get("name") or "").lower())
settings.autotask_cached_priorities_json = json.dumps(pr_out)
settings.autotask_reference_last_sync_at = datetime.utcnow()
return queues, sources, statuses, pr_out
@main_bp.route("/settings/autotask/refresh-reference-data", methods=["POST"])
@login_required
@roles_required("admin")
def settings_autotask_refresh_reference_data():
settings = _get_or_create_settings()
if not settings.autotask_api_username or not settings.autotask_api_password or not settings.autotask_tracking_identifier:
flash("Autotask settings incomplete. Provide username, password and tracking identifier first.", "warning")
return redirect(url_for("main.settings", section="integrations"))
try:
queues, sources, statuses, pr_out = _refresh_autotask_reference_data(settings)
db.session.commit()
flash(
f"Autotask reference data refreshed. Queues: {len(queues)}. Ticket Sources: {len(sources)}. Ticket Statuses: {len(statuses)}. Priorities: {len(pr_out)}.",
"success",
)
_log_admin_event(
"autotask_refresh_reference_data",
"Autotask reference data refreshed.",
details=json.dumps({"queues": len(queues or []), "ticket_sources": len(sources or []), "ticket_statuses": len(statuses or []), "priorities": len(pr_out)}),
)
except Exception as exc:
flash(f"Failed to refresh Autotask reference data: {exc}", "danger")
_log_admin_event(
"autotask_refresh_reference_data_failed",
"Autotask reference data refresh failed.",
details=json.dumps({"error": str(exc)}),
)
return redirect(url_for("main.settings", section="integrations"))