backupchecks/containers/backupchecks/src/backend/app/mail_importer.py

from __future__ import annotations
from datetime import datetime, timezone, timedelta
from typing import List
import socket
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError
from urllib.parse import urlparse
import requests
from sqlalchemy import func
from . import db
from .models import MailMessage, SystemSettings, Job, JobRun, MailObject
from .parsers import parse_mail_message
from .parsers.veeam import extract_vspc_active_alarms_companies
from .email_utils import normalize_from_address, extract_best_html_from_eml, is_effectively_blank_html
from .job_matching import find_matching_job
from .ticketing_utils import link_open_internal_tickets_to_run
GRAPH_TOKEN_URL_TEMPLATE = "https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
GRAPH_BASE_URL = "https://graph.microsoft.com/v1.0"
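
# GRAPH_TOKEN_URL_TEMPLATE points at the Microsoft Entra ID (Azure AD) v2.0 token
# endpoint, used below with the OAuth2 client-credentials grant; GRAPH_BASE_URL is
# the Microsoft Graph v1.0 REST root that all mailbox calls in this module build on.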

class MailImportError(Exception):
    pass

def _get_access_token(settings: SystemSettings) -> str:
    if not settings.graph_tenant_id or not settings.graph_client_id or not settings.graph_client_secret:
        raise MailImportError("Graph credentials are not fully configured.")
    token_url = GRAPH_TOKEN_URL_TEMPLATE.format(tenant_id=settings.graph_tenant_id)
    data = {
        "client_id": settings.graph_client_id,
        "client_secret": settings.graph_client_secret,
        "grant_type": "client_credentials",
        "scope": "https://graph.microsoft.com/.default",
    }
    resp = requests.post(token_url, data=data, timeout=15)
    if resp.status_code != 200:
        raise MailImportError(f"Failed to obtain access token from Microsoft Graph (status {resp.status_code}).")
    payload = resp.json()
    access_token = payload.get("access_token")
    if not access_token:
        raise MailImportError("Access token not present in Graph response.")
    return access_token
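
# NOTE: a successful response from the token endpoint above is JSON of roughly this
# shape (only "access_token" is used here):
#   {"token_type": "Bearer", "expires_in": 3599, "access_token": "eyJ0eXAiOiJKV1Qi..."}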

def _build_auth_headers(access_token: str) -> dict:
    return {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json",
    }

def _can_resolve_hostname(hostname: str, timeout_seconds: int = 2) -> bool:
    """Best-effort DNS preflight.

    requests' connect timeout does not cover DNS resolution time. When DNS is
    slow/unavailable, a sync gunicorn worker can hit WORKER TIMEOUT. We therefore
    preflight resolution and skip move operations if it hangs.
    """
    if not hostname:
        return False
    try:
        # signal.* cannot be used outside the main thread (gunicorn worker threads / schedulers).
        # Use a small worker thread and a hard timeout instead.
        with ThreadPoolExecutor(max_workers=1) as ex:
            fut = ex.submit(socket.getaddrinfo, hostname, 443)
            fut.result(timeout=float(timeout_seconds))
        return True
    except FuturesTimeoutError:
        return False
    except Exception:
        return False
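
# Illustrative behaviour of the helper above: _can_resolve_hostname("graph.microsoft.com")
# returns True when the resolver answers within timeout_seconds, and False on a timeout
# or any resolver error; callers below treat False as "skip the Graph move step".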

def _fetch_eml_bytes(mailbox: str, msg_id: str, access_token: str) -> bytes | None:
    """Fetch raw RFC822 (.eml) content for a message id via Microsoft Graph."""
    if not mailbox or not msg_id:
        return None
    url = f"{GRAPH_BASE_URL}/users/{mailbox}/messages/{msg_id}/$value"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/octet-stream",
    }
    try:
        resp = requests.get(url, headers=headers, timeout=30)
    except Exception:
        return None
    if resp.status_code != 200:
        return None
    return resp.content or None
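
# NOTE: the /messages/{id}/$value endpoint used above returns the message as MIME
# (RFC 822) bytes, which is why the result can be stored directly as an .eml blob.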

def _resolve_folder_id(settings: SystemSettings, access_token: str, folder_path: str) -> str:
    """Resolve a displayName path like 'Inbox/Backup Database' to a folder id."""
    if not settings.graph_mailbox:
        raise MailImportError("Mailbox address is not configured.")
    folder_path = (folder_path or "").strip()
    if not folder_path:
        raise MailImportError("Folder path is empty.")
    segments = [seg.strip() for seg in folder_path.split("/") if seg.strip()]
    if not segments:
        raise MailImportError("Folder path is empty.")
    headers = _build_auth_headers(access_token)
    mailbox = settings.graph_mailbox
    # Fetch top-level mailFolders (Inbox, Archive, etc.)
    url = f"{GRAPH_BASE_URL}/users/{mailbox}/mailFolders?$top=100"
    resp = requests.get(url, headers=headers, timeout=20)
    if resp.status_code != 200:
        raise MailImportError(f"Failed to list top-level mail folders (status {resp.status_code}).")
    data = resp.json()
    folders = data.get("value", [])

    def _find_by_name(items, name):
        name_lower = name.lower()
        for item in items:
            if str(item.get("displayName", "")).lower() == name_lower:
                return item
        return None

    current_folder = _find_by_name(folders, segments[0])
    if not current_folder:
        raise MailImportError(f"Folder '{segments[0]}' not found in mailbox.")
    # Walk down childFolders if there are more segments
    for segment in segments[1:]:
        parent_id = current_folder.get("id")
        url = f"{GRAPH_BASE_URL}/users/{mailbox}/mailFolders/{parent_id}/childFolders?$top=100"
        resp = requests.get(url, headers=headers, timeout=20)
        if resp.status_code != 200:
            raise MailImportError(
                f"Failed to list child folders for '{current_folder.get('displayName')}' (status {resp.status_code})."
            )
        children = resp.json().get("value", [])
        next_folder = _find_by_name(children, segment)
        if not next_folder:
            raise MailImportError(
                f"Folder '{segment}' not found under '{current_folder.get('displayName')}'."
            )
        current_folder = next_folder
    folder_id = current_folder.get("id")
    if not folder_id:
        raise MailImportError("Resolved folder does not have an id.")
    return folder_id
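
# Resolution example (illustrative path): for folder_path "Inbox/Backup Reports" the
# helper above issues
#   GET /users/{mailbox}/mailFolders?$top=100                          -> locate "Inbox"
#   GET /users/{mailbox}/mailFolders/{inbox_id}/childFolders?$top=100  -> locate "Backup Reports"
# and returns the id of the final segment. Matching is case-insensitive on displayName.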

def _parse_graph_datetime(value: str | None):
    if not value:
        return None
    try:
        dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
        return dt.astimezone(timezone.utc).replace(tzinfo=None)
    except Exception:
        return None
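
# Example: _parse_graph_datetime("2024-05-01T06:30:00Z") yields a naive UTC
# datetime(2024, 5, 1, 6, 30), which is how received_at is stored below.
#
# _store_messages walks a list of Graph message dicts and returns
# (total, new_count, auto_approved, auto_approved_runs), where auto_approved_runs is
# a list of (customer_id, job_id, run_id, mail_message_id) tuples for runs created
# by auto-approval.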

def _store_messages(settings: SystemSettings, messages):
    total = 0
    new_count = 0
    auto_approved = 0
    auto_approved_runs = []
    for msg in messages:
        total += 1
        graph_id = msg.get("id")
        if not graph_id:
            continue
        existing = MailMessage.query.filter_by(message_id=graph_id).first()
        if existing:
            continue
        from_info = msg.get("from") or {}
        email_info = from_info.get("emailAddress") or {}
        from_addr = normalize_from_address(email_info.get("address"))
        subject = msg.get("subject")
        received_raw = msg.get("receivedDateTime")
        received_at = _parse_graph_datetime(received_raw)
        # Decide which body field to populate based on the Graph response.
        body = msg.get("body") or {}
        body_content = body.get("content")
        body_type = (body.get("contentType") or "").lower()
        html_body = None
        text_body = None
        if isinstance(body_content, str):
            if body_type == "html":
                html_body = body_content
            else:
                text_body = body_content
        mail = MailMessage(
            message_id=graph_id,
            from_address=from_addr,
            subject=subject,
            received_at=received_at,
            html_body=html_body,
            text_body=text_body,
            location="inbox",
            eml_blob=msg.get("_eml_bytes"),
            eml_stored_at=(datetime.utcnow() if msg.get("_eml_bytes") else None),
        )

        # Some systems send empty bodies and put the actual report in an HTML attachment.
        # Graph may still return a body that only contains whitespace/newlines; treat that
        # as empty so we can fall back to the attachment.
        def _is_blank_text(s):
            return s is None or (isinstance(s, str) and s.strip() == "")

        # If we have raw EML bytes and no meaningful body content, extract the best
        # HTML attachment and use it as the HTML body so parsers and the inbox preview
        # can work.
        if is_effectively_blank_html(mail.html_body) and _is_blank_text(mail.text_body) and mail.eml_blob:
            attachment_html = extract_best_html_from_eml(mail.eml_blob)
            if attachment_html:
                mail.html_body = attachment_html
        # IMPORTANT: Persist first so mail.id exists.
        # Object extraction stores rows keyed by mail_message_id; without an id,
        # objects are silently skipped.
        db.session.add(mail)
        db.session.flush()
        new_count += 1
        # Immediately run parsers so Inbox / Jobs can show parsed metadata + objects.
        try:
            parse_mail_message(mail)
        except Exception as exc:
            # Do not break the import if parsing fails; just record it on the message.
            if hasattr(mail, "parse_result"):
                mail.parse_result = "error"
            if hasattr(mail, "parse_error"):
                mail.parse_error = str(exc)[:500]
        # Auto-approve if this job was already approved before (unique match across customers).
        # Mirrors the behavior of the Inbox "Re-parse all" auto-approve.
        try:
            if (
                getattr(mail, "location", "inbox") == "inbox"
                and getattr(mail, "parse_result", None) == "ok"
                and not bool(getattr(mail, "approved", False))
            ):
                # Special case: Veeam VSPC "Active alarms summary" contains multiple companies.
                bsw = (getattr(mail, "backup_software", "") or "").strip().lower()
                btype = (getattr(mail, "backup_type", "") or "").strip().lower()
                jname = (getattr(mail, "job_name", "") or "").strip().lower()
                if bsw == "veeam" and btype == "service provider console" and jname == "active alarms summary":
                    raw = (mail.text_body or "").strip() or (mail.html_body or "")
                    companies = extract_vspc_active_alarms_companies(raw)
                    if companies:

                        def _is_error_status(value: str | None) -> bool:
                            v = (value or "").strip().lower()
                            return v in {"error", "failed", "critical"} or v.startswith("fail")

                        created_any = False
                        first_job = None
                        mapped_count = 0
                        for company in companies:
                            # Build a temporary message using the per-company job name.
                            tmp = MailMessage(
                                from_address=mail.from_address,
                                backup_software=mail.backup_software,
                                backup_type=mail.backup_type,
                                job_name=f"{(mail.job_name or 'Active alarms summary').strip()} | {company}".strip(),
                            )
                            job = find_matching_job(tmp)
                            if not job:
                                continue
                            # Respect per-job flags.
                            if hasattr(job, "active") and not bool(job.active):
                                continue
                            if hasattr(job, "auto_approve") and not bool(job.auto_approve):
                                continue
                            mapped_count += 1
                            objs = (
                                MailObject.query.filter(MailObject.mail_message_id == mail.id)
                                .filter(MailObject.object_name.ilike(f"{company} | %"))
                                .all()
                            )
                            saw_error = any(_is_error_status(o.status) for o in objs)
                            saw_warning = any((o.status or "").strip().lower() == "warning" for o in objs)
                            status = "Error" if saw_error else ("Warning" if saw_warning else (mail.overall_status or "Success"))
                            run = JobRun(
                                job_id=job.id,
                                mail_message_id=mail.id,
                                run_at=mail.received_at,
                                status=status or None,
                                missed=False,
                            )
                            # Optional storage metrics.
                            if hasattr(run, "storage_used_bytes") and hasattr(mail, "storage_used_bytes"):
                                run.storage_used_bytes = mail.storage_used_bytes
                            if hasattr(run, "storage_capacity_bytes") and hasattr(mail, "storage_capacity_bytes"):
                                run.storage_capacity_bytes = mail.storage_capacity_bytes
                            if hasattr(run, "storage_free_bytes") and hasattr(mail, "storage_free_bytes"):
                                run.storage_free_bytes = mail.storage_free_bytes
                            if hasattr(run, "storage_free_percent") and hasattr(mail, "storage_free_percent"):
                                run.storage_free_percent = mail.storage_free_percent
                            db.session.add(run)
                            db.session.flush()
                            # Legacy ticket behavior: inherit any open internal tickets for this job.
                            try:
                                link_open_internal_tickets_to_run(run=run, job=job)
                            except Exception:
                                pass
                            auto_approved_runs.append((job.customer_id, job.id, run.id, mail.id))
                            created_any = True
                            if not first_job:
                                first_job = job
                        # If all companies are mapped, mark the mail as fully approved and move it to history.
                        if created_any and mapped_count == len(companies):
                            mail.job_id = first_job.id if first_job else None
                            if hasattr(mail, "approved"):
                                mail.approved = True
                            if hasattr(mail, "approved_at"):
                                mail.approved_at = datetime.utcnow()
                            if hasattr(mail, "location"):
                                mail.location = "history"
                            auto_approved += 1
                    # Do not fall back to single-job matching for the VSPC summary.
                    continue
                job = find_matching_job(mail)
                if job:
                    # Respect per-job flags: when the matching job is inactive or has
                    # auto-approve disabled, skip auto-approval and leave the mail in
                    # the inbox instead of failing the whole import.
                    if hasattr(job, "active") and not bool(job.active):
                        job = None
                    elif hasattr(job, "auto_approve") and not bool(job.auto_approve):
                        job = None
                if job:
                    # Create a new run for the known job.
                    run = JobRun(
                        job_id=job.id,
                        mail_message_id=mail.id,
                        run_at=mail.received_at,
                        status=mail.overall_status or None,
                        missed=False,
                    )
                    # Optional storage metrics (for capacity graphs).
                    if hasattr(run, "storage_used_bytes") and hasattr(mail, "storage_used_bytes"):
                        run.storage_used_bytes = mail.storage_used_bytes
                    if hasattr(run, "storage_capacity_bytes") and hasattr(mail, "storage_capacity_bytes"):
                        run.storage_capacity_bytes = mail.storage_capacity_bytes
                    if hasattr(run, "storage_free_bytes") and hasattr(mail, "storage_free_bytes"):
                        run.storage_free_bytes = mail.storage_free_bytes
                    if hasattr(run, "storage_free_percent") and hasattr(mail, "storage_free_percent"):
                        run.storage_free_percent = mail.storage_free_percent
                    db.session.add(run)
                    db.session.flush()  # ensure run.id is available
                    # Legacy ticket behavior: inherit any open internal tickets for this job.
                    try:
                        link_open_internal_tickets_to_run(run=run, job=job)
                    except Exception:
                        pass
                    # Update the mail message to reflect approval.
                    mail.job_id = job.id
                    if hasattr(mail, "approved"):
                        mail.approved = True
                    if hasattr(mail, "approved_at"):
                        mail.approved_at = datetime.utcnow()
                    if hasattr(mail, "location"):
                        mail.location = "history"
                    auto_approved += 1
                    auto_approved_runs.append((job.customer_id, job.id, run.id, mail.id))
        except Exception as exc:
            db.session.rollback()
            raise MailImportError(f"Failed to store mail messages in database: {exc}")
    return total, new_count, auto_approved, auto_approved_runs
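
# Both import entry points below follow the same pipeline:
#   1. obtain a Graph token and resolve the incoming (and optional processed) folder,
#   2. list messages in the incoming folder ($top batch, newest first),
#   3. fetch each message body (and optionally the raw EML) individually,
#   4. store/parse/auto-approve via _store_messages, then commit,
#   5. move processed messages and clean up stored EML blobs per the retention policy.
# run_auto_import uses a fixed batch of 50 and honours auto_import_cutoff_date;
# run_manual_import takes the batch size from the caller and applies no cutoff.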

def run_auto_import(settings: SystemSettings):
    """Execute the automatic import from Microsoft Graph.

    Automatic import always uses a fixed batch size (50) and respects the
    configured cutoff date. Messages older than the cutoff date are not fetched
    and therefore remain in the inbox.

    Returns:
        (total_fetched, new_messages, auto_approved, auto_approved_runs, errors)
    """
    errors: List[str] = []
    if not settings.graph_mailbox:
        raise MailImportError("Mailbox address is not configured.")
    try:
        access_token = _get_access_token(settings)
    except MailImportError:
        raise
    except Exception as exc:
        raise MailImportError(f"Unexpected error while obtaining Graph token: {exc}")
    try:
        incoming_folder_id = _resolve_folder_id(settings, access_token, settings.incoming_folder or "Inbox")
    except MailImportError:
        raise
    except Exception as exc:
        raise MailImportError(f"Unexpected error while resolving incoming folder: {exc}")
    processed_folder_id = None
    if getattr(settings, "processed_folder", None):
        try:
            processed_folder_id = _resolve_folder_id(settings, access_token, settings.processed_folder)
        except MailImportError as exc:
            # If the processed folder is misconfigured, we still continue the import.
            errors.append(str(exc))
        except Exception as exc:
            errors.append(f"Unexpected error while resolving processed folder: {exc}")
    headers = _build_auth_headers(access_token)
    mailbox = settings.graph_mailbox
    retention_days = getattr(settings, "ingest_eml_retention_days", 7)
    try:
        retention_days = int(retention_days) if retention_days is not None else 7
    except (ValueError, TypeError):
        retention_days = 7
    if retention_days not in (0, 7, 14):
        retention_days = 7
    batch_size = 50
    url = (
        f"{GRAPH_BASE_URL}/users/{mailbox}/mailFolders/{incoming_folder_id}/messages"
        f"?$top={batch_size}&$orderby=receivedDateTime desc"
    )
    # Optional cutoff date (UTC midnight). Older messages should remain in the inbox.
    cutoff_date = getattr(settings, "auto_import_cutoff_date", None)
    if cutoff_date:
        cutoff_dt = datetime.combine(cutoff_date, datetime.min.time()).replace(tzinfo=timezone.utc)
        cutoff_iso = cutoff_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
        # Graph requires spaces in $filter to be URL-encoded.
        url += f"&$filter=receivedDateTime%20ge%20{cutoff_iso}"
    resp = requests.get(url, headers=headers, timeout=20)
    if resp.status_code != 200:
        raise MailImportError(f"Failed to fetch messages from incoming folder (status {resp.status_code}).")
    payload = resp.json()
    items = payload.get("value", [])
    total_fetched = len(items)
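
    # NOTE: only this first page of results is processed per run; Graph returns the
    # items in "value" and would expose further pages via "@odata.nextLink", which
    # this importer does not follow.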
    # Fetch full bodies for the fetched messages so the inline popup can show content.
    for msg in items:
        msg_id = msg.get("id")
        if not msg_id:
            continue
        detail_url = f"{GRAPH_BASE_URL}/users/{mailbox}/messages/{msg_id}?$select=body,bodyPreview"
        try:
            detail_resp = requests.get(detail_url, headers=headers, timeout=20)
        except Exception as exc:
            errors.append(f"Error while fetching body for message {msg_id}: {exc}")
            continue
        if detail_resp.status_code != 200:
            errors.append(f"Failed to fetch body for message {msg_id} (status {detail_resp.status_code}).")
            continue
        detail_payload = detail_resp.json()
        if "body" in detail_payload:
            msg["body"] = detail_payload.get("body")
        if "bodyPreview" in detail_payload:
            msg["bodyPreview"] = detail_payload.get("bodyPreview")
    # Optionally fetch raw EML bytes for new messages (debug storage).
    if retention_days > 0:
        try:
            ids = [m.get("id") for m in items if m.get("id")]
            existing_ids = set()
            if ids:
                existing_ids = {
                    mid
                    for (mid,) in db.session.query(MailMessage.message_id)
                    .filter(MailMessage.message_id.in_(ids))
                    .all()
                    if mid
                }
            for m in items:
                mid = m.get("id")
                if not mid or mid in existing_ids:
                    continue
                eml_bytes = _fetch_eml_bytes(mailbox, mid, access_token)
                if eml_bytes:
                    m["_eml_bytes"] = eml_bytes
        except Exception as exc:
            errors.append(f"Unexpected error while fetching EML bytes: {exc}")
    auto_approved_runs = []
    try:
        total_processed, new_messages, auto_approved, auto_approved_runs = _store_messages(settings, items)
    except MailImportError as exc:
        errors.append(str(exc))
        new_messages = 0
        auto_approved = 0
        auto_approved_runs = []
        # Never move messages when the import failed (prevents "moved but not stored" situations).
        processed_folder_id = None
    # Ensure imported messages are committed before moving them to another folder.
    # If the commit fails, do not move anything.
    if processed_folder_id and new_messages >= 0:
        try:
            db.session.commit()
        except Exception as exc:
            db.session.rollback()
            errors.append(f"Failed to commit imported messages: {exc}")
            new_messages = 0
            auto_approved = 0
            auto_approved_runs = []
            processed_folder_id = None
    # Move messages to the processed folder if configured.
    if processed_folder_id:
        graph_host = urlparse(GRAPH_BASE_URL).hostname or ""
        if graph_host and not _can_resolve_hostname(graph_host, timeout_seconds=2):
            errors.append(
                "Skipping move-to-processed step: Microsoft Graph hostname could not be resolved in time. "
                "Messages were imported, but will not be moved."
            )
            processed_folder_id = None
    if processed_folder_id:
        for msg in items:
            msg_id = msg.get("id")
            if not msg_id:
                continue
            move_url = f"{GRAPH_BASE_URL}/users/{mailbox}/messages/{msg_id}/move"
            try:
                move_resp = requests.post(
                    move_url,
                    headers=headers,
                    json={"destinationId": processed_folder_id},
                    timeout=20,
                )
            except Exception as exc:
                errors.append(f"Error while moving message {msg_id}: {exc}")
                continue
            if move_resp.status_code not in (200, 201):
                errors.append(
                    f"Failed to move message {msg_id} to processed folder "
                    f"(status {move_resp.status_code})."
                )
    # Clean up stored EML blobs based on the retention policy.
    try:
        if retention_days == 0:
            MailMessage.query.filter(MailMessage.eml_blob.isnot(None)).update(
                {MailMessage.eml_blob: None, MailMessage.eml_stored_at: None},
                synchronize_session=False,
            )
            db.session.commit()
        else:
            cutoff = datetime.utcnow() - timedelta(days=retention_days)
            MailMessage.query.filter(
                MailMessage.eml_stored_at.isnot(None),
                MailMessage.eml_stored_at < cutoff,
            ).update(
                {MailMessage.eml_blob: None, MailMessage.eml_stored_at: None},
                synchronize_session=False,
            )
            db.session.commit()
    except Exception as exc:
        db.session.rollback()
        errors.append(f"Failed to cleanup stored EML blobs: {exc}")
    return total_fetched, new_messages, auto_approved, auto_approved_runs, errors
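
# run_manual_import mirrors run_auto_import, but is triggered on demand: the caller
# supplies the batch size and no receivedDateTime cutoff filter is applied.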

def run_manual_import(settings: SystemSettings, batch_size: int):
    """Execute a one-off manual import from Microsoft Graph.

    Returns:
        (total_fetched, new_messages, auto_approved, auto_approved_runs, errors)
    """
    errors: List[str] = []
    if not settings.graph_mailbox:
        raise MailImportError("Mailbox address is not configured.")
    try:
        access_token = _get_access_token(settings)
    except MailImportError:
        raise
    except Exception as exc:
        raise MailImportError(f"Unexpected error while obtaining Graph token: {exc}")
    try:
        incoming_folder_id = _resolve_folder_id(settings, access_token, settings.incoming_folder or "Inbox")
    except MailImportError:
        raise
    except Exception as exc:
        raise MailImportError(f"Unexpected error while resolving incoming folder: {exc}")
    processed_folder_id = None
    if getattr(settings, "processed_folder", None):
        try:
            processed_folder_id = _resolve_folder_id(settings, access_token, settings.processed_folder)
        except MailImportError as exc:
            # If the processed folder is misconfigured, we still continue the import.
            errors.append(str(exc))
        except Exception as exc:
            errors.append(f"Unexpected error while resolving processed folder: {exc}")
    headers = _build_auth_headers(access_token)
    mailbox = settings.graph_mailbox
    retention_days = getattr(settings, "ingest_eml_retention_days", 7)
    try:
        retention_days = int(retention_days) if retention_days is not None else 7
    except (ValueError, TypeError):
        retention_days = 7
    if retention_days not in (0, 7, 14):
        retention_days = 7
    url = (
        f"{GRAPH_BASE_URL}/users/{mailbox}/mailFolders/{incoming_folder_id}/messages"
        f"?$top={batch_size}&$orderby=receivedDateTime desc"
    )
    resp = requests.get(url, headers=headers, timeout=20)
    if resp.status_code != 200:
        raise MailImportError(f"Failed to fetch messages from incoming folder (status {resp.status_code}).")
    payload = resp.json()
    items = payload.get("value", [])
    total_fetched = len(items)
    # Fetch full bodies for the fetched messages so the inline popup can show content.
    # We keep this simple: for each new message, fetch its body (HTML or text).
    for msg in items:
        msg_id = msg.get("id")
        if not msg_id:
            continue
        detail_url = f"{GRAPH_BASE_URL}/users/{mailbox}/messages/{msg_id}?$select=body,bodyPreview"
        try:
            detail_resp = requests.get(detail_url, headers=headers, timeout=20)
        except Exception as exc:
            errors.append(f"Error while fetching body for message {msg_id}: {exc}")
            continue
        if detail_resp.status_code != 200:
            errors.append(
                f"Failed to fetch body for message {msg_id} (status {detail_resp.status_code})."
            )
            continue
        detail_payload = detail_resp.json()
        if "body" in detail_payload:
            msg["body"] = detail_payload.get("body")
        if "bodyPreview" in detail_payload:
            msg["bodyPreview"] = detail_payload.get("bodyPreview")
    # Optionally fetch raw EML bytes for new messages (debug storage).
    if retention_days > 0:
        try:
            ids = [m.get("id") for m in items if m.get("id")]
            existing_ids = set()
            if ids:
                existing_ids = {
                    mid
                    for (mid,) in db.session.query(MailMessage.message_id)
                    .filter(MailMessage.message_id.in_(ids))
                    .all()
                    if mid
                }
            for m in items:
                mid = m.get("id")
                if not mid or mid in existing_ids:
                    continue
                eml_bytes = _fetch_eml_bytes(mailbox, mid, access_token)
                if eml_bytes:
                    m["_eml_bytes"] = eml_bytes
        except Exception as exc:
            errors.append(f"Unexpected error while fetching EML bytes: {exc}")
    auto_approved = 0
    auto_approved_runs = []
    try:
        total_processed, new_messages, auto_approved, auto_approved_runs = _store_messages(settings, items)
    except MailImportError as exc:
        errors.append(str(exc))
        new_messages = 0
        auto_approved_runs = []
        # Never move messages when the import failed (prevents "moved but not stored" situations).
        processed_folder_id = None
    # Ensure imported messages are committed before moving them to another folder.
    # If the commit fails, do not move anything.
    if processed_folder_id and new_messages >= 0:
        try:
            db.session.commit()
        except Exception as exc:
            db.session.rollback()
            errors.append(f"Failed to commit imported messages: {exc}")
            new_messages = 0
            auto_approved = 0
            auto_approved_runs = []
            processed_folder_id = None
    # Move messages to the processed folder if configured.
    if processed_folder_id:
        graph_host = urlparse(GRAPH_BASE_URL).hostname or ""
        if graph_host and not _can_resolve_hostname(graph_host, timeout_seconds=2):
            errors.append(
                "Skipping move-to-processed step: Microsoft Graph hostname could not be resolved in time. "
                "Messages were imported, but will not be moved."
            )
            processed_folder_id = None
    if processed_folder_id:
        for msg in items:
            msg_id = msg.get("id")
            if not msg_id:
                continue
            move_url = f"{GRAPH_BASE_URL}/users/{mailbox}/messages/{msg_id}/move"
            try:
                move_resp = requests.post(
                    move_url,
                    headers=headers,
                    json={"destinationId": processed_folder_id},
                    timeout=20,
                )
            except Exception as exc:
                errors.append(f"Error while moving message {msg_id}: {exc}")
                continue
            if move_resp.status_code not in (200, 201):
                errors.append(
                    f"Failed to move message {msg_id} to processed folder "
                    f"(status {move_resp.status_code})."
                )
    # Clean up stored EML blobs based on the retention policy.
    try:
        if retention_days == 0:
            MailMessage.query.filter(MailMessage.eml_blob.isnot(None)).update(
                {MailMessage.eml_blob: None, MailMessage.eml_stored_at: None},
                synchronize_session=False,
            )
            db.session.commit()
        else:
            cutoff = datetime.utcnow() - timedelta(days=retention_days)
            MailMessage.query.filter(
                MailMessage.eml_stored_at.isnot(None),
                MailMessage.eml_stored_at < cutoff,
            ).update(
                {MailMessage.eml_blob: None, MailMessage.eml_stored_at: None},
                synchronize_session=False,
            )
            db.session.commit()
    except Exception as exc:
        db.session.rollback()
        errors.append(f"Failed to cleanup stored EML blobs: {exc}")
    return total_fetched, new_messages, auto_approved, auto_approved_runs, errors
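
# Typical call sites (illustrative only, not part of this module): a scheduler job runs
#   total, new, approved, runs, errors = run_auto_import(settings)
# on an interval, while an on-demand fetch action calls
#   total, new, approved, runs, errors = run_manual_import(settings, batch_size=25)
# and surfaces `errors` to the user; `runs` can be used to trigger notifications.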