# backupchecks/containers/backupchecks/src/backend/app/main/routes_search.py
# Global search routes: per-section result builders and the /search page.

from .routes_shared import * # noqa: F401,F403
from .routes_shared import (
_apply_overrides_to_run,
_format_datetime,
_get_or_create_settings,
_get_ui_timezone,
_infer_monthly_schedule_from_runs,
_infer_schedule_map_from_runs,
)
from sqlalchemy import and_, cast, func, or_, String
import math
# Number of result rows shown per search section per page.
SEARCH_LIMIT_PER_SECTION = 10
# Section keys in display order; each key also names its pagination query
# parameter on the search route ("p_<key>").
SEARCH_SECTION_KEYS = [
    "inbox",
    "customers",
    "jobs",
    "daily_jobs",
    "run_checks",
    "tickets",
    "remarks",
    "overrides",
    "reports",
]
def _is_section_allowed(section: str) -> bool:
    """Return True when the active user role may see the given search section.

    Unknown section keys are always denied.
    """
    role = get_active_role()
    permitted_roles = {
        "inbox": ("admin", "operator", "viewer"),
        "customers": ("admin", "operator", "viewer"),
        "jobs": ("admin", "operator", "viewer"),
        "daily_jobs": ("admin", "operator", "viewer"),
        "run_checks": ("admin", "operator"),
        "tickets": ("admin", "operator", "viewer"),
        "remarks": ("admin", "operator", "viewer"),
        "overrides": ("admin", "operator", "viewer"),
        "reports": ("admin", "operator", "viewer", "reporter"),
    }.get(section)
    return permitted_roles is not None and role in permitted_roles
def _build_patterns(raw_query: str) -> list[str]:
tokens = [t.strip() for t in (raw_query or "").split() if t.strip()]
patterns: list[str] = []
for token in tokens:
p = token.replace("\\", "\\\\")
p = p.replace("%", "\\%").replace("_", "\\_")
p = p.replace("*", "%")
if not p.startswith("%"):
p = f"%{p}"
if not p.endswith("%"):
p = f"{p}%"
patterns.append(p)
return patterns
def _contains_all_terms(columns: list, patterns: list[str]):
    """Combine per-term OR filters with AND: every pattern must match at
    least one of *columns*.

    Returns None when there is nothing to filter on.
    """
    if not columns or not patterns:
        return None
    per_pattern = [
        or_(*[column.ilike(pattern, escape="\\") for column in columns])
        for pattern in patterns
    ]
    return and_(*per_pattern)
def _parse_page(value: str | None) -> int:
try:
page = int((value or "").strip())
except Exception:
page = 1
return page if page > 0 else 1
def _paginate_query(query, page: int, order_by_cols: list):
    """Count *query*, clamp *page* into range, and fetch one page of rows.

    Returns (total, current_page, total_pages, rows); pages are 1-based and
    sized by SEARCH_LIMIT_PER_SECTION.
    """
    total = query.count()
    if total:
        total_pages = max(1, math.ceil(total / SEARCH_LIMIT_PER_SECTION))
    else:
        total_pages = 1
    current_page = min(max(page, 1), total_pages)
    offset = (current_page - 1) * SEARCH_LIMIT_PER_SECTION
    ordered = query.order_by(*order_by_cols)
    rows = ordered.offset(offset).limit(SEARCH_LIMIT_PER_SECTION).all()
    return total, current_page, total_pages, rows
def _enrich_paging(section: dict, total: int, current_page: int, total_pages: int) -> None:
section["total"] = int(total or 0)
section["current_page"] = int(current_page or 1)
section["total_pages"] = int(total_pages or 1)
section["has_prev"] = section["current_page"] > 1
section["has_next"] = section["current_page"] < section["total_pages"]
section["prev_url"] = ""
section["next_url"] = ""
def _build_inbox_results(patterns: list[str], page: int) -> dict:
    """Build the Inbox section of the global search results.

    Matches mail messages against *patterns* across sender, subject,
    timestamps and parse metadata, returning one page of results.
    Returns an empty section when the active role may not see the inbox.
    """
    section = {
        "key": "inbox",
        "title": "Inbox",
        "view_all_url": url_for("main.inbox"),
        "total": 0,
        "items": [],
        "current_page": 1,
        "total_pages": 1,
        "has_prev": False,
        "has_next": False,
        "prev_url": "",
        "next_url": "",
    }
    if not _is_section_allowed("inbox"):
        return section
    query = MailMessage.query
    # Only restrict to the inbox folder when the model defines a
    # "location" attribute (guarded with hasattr).
    if hasattr(MailMessage, "location"):
        query = query.filter(MailMessage.location == "inbox")
    match_expr = _contains_all_terms(
        [
            func.coalesce(MailMessage.from_address, ""),
            func.coalesce(MailMessage.subject, ""),
            # Timestamps are cast to text so patterns can match date strings.
            cast(MailMessage.received_at, String),
            func.coalesce(MailMessage.backup_software, ""),
            func.coalesce(MailMessage.backup_type, ""),
            func.coalesce(MailMessage.job_name, ""),
            func.coalesce(MailMessage.parse_result, ""),
            cast(MailMessage.parsed_at, String),
        ],
        patterns,
    )
    if match_expr is not None:
        query = query.filter(match_expr)
    total, current_page, total_pages, rows = _paginate_query(
        query,
        page,
        [MailMessage.received_at.desc().nullslast(), MailMessage.id.desc()],
    )
    _enrich_paging(section, total, current_page, total_pages)
    for msg in rows:
        # A message counts as parsed when it has a parsed_at timestamp or
        # any non-empty parse_result text.
        parsed_flag = bool(getattr(msg, "parsed_at", None) or (msg.parse_result or ""))
        section["items"].append(
            {
                "title": msg.subject or f"Message #{msg.id}",
                "subtitle": f"{msg.from_address or '-'} | {_format_datetime(msg.received_at)}",
                "meta": f"{msg.backup_software or '-'} / {msg.backup_type or '-'} / {msg.job_name or '-'} | Parsed: {'Yes' if parsed_flag else 'No'}",
                "link": url_for("main.inbox"),
            }
        )
    return section
def _build_customers_results(patterns: list[str], page: int) -> dict:
    """Build the Customers section of the global search results.

    Matches customers by name only; each item links to the customer's
    filtered jobs page.
    """
    section = {
        "key": "customers",
        "title": "Customers",
        "view_all_url": url_for("main.customers"),
        "total": 0,
        "items": [],
        "current_page": 1,
        "total_pages": 1,
        "has_prev": False,
        "has_next": False,
        "prev_url": "",
        "next_url": "",
    }
    if not _is_section_allowed("customers"):
        return section
    customer_query = Customer.query
    name_filter = _contains_all_terms([func.coalesce(Customer.name, "")], patterns)
    if name_filter is not None:
        customer_query = customer_query.filter(name_filter)
    total, current_page, total_pages, page_rows = _paginate_query(
        customer_query,
        page,
        [Customer.name.asc()],
    )
    _enrich_paging(section, total, current_page, total_pages)
    for customer in page_rows:
        # Best-effort job count; the relationship may not support count().
        try:
            job_count = customer.jobs.count()
        except Exception:
            job_count = 0
        section["items"].append(
            {
                "title": customer.name or f"Customer #{customer.id}",
                "subtitle": f"Jobs: {job_count}",
                "meta": "Active" if customer.active else "Inactive",
                "link": url_for("main.jobs", customer_id=customer.id),
            }
        )
    return section
def _build_jobs_results(patterns: list[str], page: int) -> dict:
    """Build the Jobs section of the global search results.

    Searches non-archived jobs (of active customers, or with no customer)
    by customer name, backup software/type and job name.
    """
    section = {
        "key": "jobs",
        "title": "Jobs",
        "view_all_url": url_for("main.jobs"),
        "total": 0,
        "items": [],
        "current_page": 1,
        "total_pages": 1,
        "has_prev": False,
        "has_next": False,
        "prev_url": "",
        "next_url": "",
    }
    if not _is_section_allowed("jobs"):
        return section
    query = (
        db.session.query(
            Job.id.label("job_id"),
            Job.backup_software.label("backup_software"),
            Job.backup_type.label("backup_type"),
            Job.job_name.label("job_name"),
            Customer.name.label("customer_name"),
        )
        .select_from(Job)
        .outerjoin(Customer, Customer.id == Job.customer_id)
        .filter(Job.archived.is_(False))
        # Keep jobs without a customer; drop jobs of inactive customers.
        .filter(db.or_(Customer.id.is_(None), Customer.active.is_(True)))
    )
    match_expr = _contains_all_terms(
        [
            func.coalesce(Customer.name, ""),
            func.coalesce(Job.backup_software, ""),
            func.coalesce(Job.backup_type, ""),
            func.coalesce(Job.job_name, ""),
        ],
        patterns,
    )
    if match_expr is not None:
        query = query.filter(match_expr)
    total, current_page, total_pages, rows = _paginate_query(
        query,
        page,
        [
            Customer.name.asc().nullslast(),
            Job.backup_software.asc(),
            Job.backup_type.asc(),
            Job.job_name.asc(),
        ],
    )
    _enrich_paging(section, total, current_page, total_pages)
    for row in rows:
        section["items"].append(
            {
                "title": row.job_name or f"Job #{row.job_id}",
                "subtitle": f"{row.customer_name or '-'} | {row.backup_software or '-'} / {row.backup_type or '-'}",
                "meta": "",
                "link": url_for("main.job_detail", job_id=row.job_id),
            }
        )
    return section
def _build_daily_jobs_results(patterns: list[str], page: int) -> dict:
    """Build the Daily Jobs section of the global search results.

    For each matching job, report whether it ran today (in the UI timezone),
    the last run's status after overrides are applied, and the expected or
    actual run time bucketed to 15 minutes.
    """
    section = {
        "key": "daily_jobs",
        "title": "Daily Jobs",
        "view_all_url": url_for("main.daily_jobs"),
        "total": 0,
        "items": [],
        "current_page": 1,
        "total_pages": 1,
        "has_prev": False,
        "has_next": False,
        "prev_url": "",
        "next_url": "",
    }
    if not _is_section_allowed("daily_jobs"):
        return section
    try:
        tz = _get_ui_timezone()
    except Exception:
        tz = None
    try:
        # "Today" in the configured UI timezone, falling back to UTC.
        target_date = datetime.now(tz).date() if tz else datetime.utcnow().date()
    except Exception:
        target_date = datetime.utcnow().date()
    settings = _get_or_create_settings()
    # Days before this configured date are never reported as "Missed".
    missed_start_date = getattr(settings, "daily_jobs_start_date", None)
    if tz:
        # Convert the local day's [midnight, midnight + 1d) window to naive
        # UTC, matching how JobRun.run_at is compared below (assumes run_at
        # is stored as naive UTC — TODO confirm against the model).
        local_midnight = datetime(
            year=target_date.year,
            month=target_date.month,
            day=target_date.day,
            hour=0,
            minute=0,
            second=0,
            tzinfo=tz,
        )
        start_of_day = local_midnight.astimezone(datetime_module.timezone.utc).replace(tzinfo=None)
        end_of_day = (local_midnight + timedelta(days=1)).astimezone(datetime_module.timezone.utc).replace(tzinfo=None)
    else:
        start_of_day = datetime(
            year=target_date.year,
            month=target_date.month,
            day=target_date.day,
            hour=0,
            minute=0,
            second=0,
        )
        end_of_day = start_of_day + timedelta(days=1)

    def _to_local(dt_utc):
        # Best-effort conversion of a (naive-UTC) timestamp to the UI
        # timezone; returns the input unchanged when that is not possible.
        if not dt_utc or not tz:
            return dt_utc
        try:
            if dt_utc.tzinfo is None:
                dt_utc = dt_utc.replace(tzinfo=datetime_module.timezone.utc)
            return dt_utc.astimezone(tz)
        except Exception:
            return dt_utc

    def _bucket_15min(dt_utc):
        # Format a timestamp as "HH:MM" rounded down to a 15-minute bucket.
        d = _to_local(dt_utc)
        if not d:
            return None
        minute_bucket = (d.minute // 15) * 15
        return f"{d.hour:02d}:{minute_bucket:02d}"

    def _is_success_status(value: str) -> bool:
        # Overridden runs count as successful for the "Successful" flag.
        s = (value or "").strip().lower()
        if not s:
            return False
        return ("success" in s) or ("override" in s)

    query = (
        db.session.query(
            Job.id.label("job_id"),
            Job.job_name.label("job_name"),
            Job.backup_software.label("backup_software"),
            Job.backup_type.label("backup_type"),
            Customer.name.label("customer_name"),
        )
        .select_from(Job)
        .outerjoin(Customer, Customer.id == Job.customer_id)
        .filter(Job.archived.is_(False))
        # Keep jobs without a customer; drop jobs of inactive customers.
        .filter(db.or_(Customer.id.is_(None), Customer.active.is_(True)))
    )
    match_expr = _contains_all_terms(
        [
            func.coalesce(Customer.name, ""),
            func.coalesce(Job.backup_software, ""),
            func.coalesce(Job.backup_type, ""),
            func.coalesce(Job.job_name, ""),
        ],
        patterns,
    )
    if match_expr is not None:
        query = query.filter(match_expr)
    total, current_page, total_pages, rows = _paginate_query(
        query,
        page,
        [
            Customer.name.asc().nullslast(),
            Job.backup_software.asc(),
            Job.backup_type.asc(),
            Job.job_name.asc(),
        ],
    )
    _enrich_paging(section, total, current_page, total_pages)
    for row in rows:
        # Expected run times for today's weekday from the inferred weekly
        # schedule; fall back to the inferred monthly schedule below.
        expected_times = (_infer_schedule_map_from_runs(row.job_id).get(target_date.weekday()) or [])
        if not expected_times:
            monthly = _infer_monthly_schedule_from_runs(row.job_id)
            if monthly:
                try:
                    dom = int(monthly.get("day_of_month") or 0)
                except Exception:
                    dom = 0
                mtimes = monthly.get("times") or []
                try:
                    import calendar as _calendar
                    last_dom = _calendar.monthrange(target_date.year, target_date.month)[1]
                except Exception:
                    last_dom = target_date.day
                # Clamp e.g. day 31 to the last day of shorter months.
                scheduled_dom = dom if (dom and dom <= last_dom) else last_dom
                if target_date.day == scheduled_dom:
                    expected_times = list(mtimes)
        runs_for_day = (
            JobRun.query.filter(
                JobRun.job_id == row.job_id,
                JobRun.run_at >= start_of_day,
                JobRun.run_at < end_of_day,
            )
            .order_by(JobRun.run_at.asc())
            .all()
        )
        run_count = len(runs_for_day)
        last_status = "-"
        expected_display = expected_times[-1] if expected_times else "-"
        if run_count > 0:
            last_run = runs_for_day[-1]
            try:
                # NOTE(review): per-row Job fetch plus override lookup is an
                # N+1 pattern; bounded by the page size (<= 10 rows).
                job_obj = Job.query.get(int(row.job_id))
                status_display, _override_applied, _override_level, _ov_id, _ov_reason = _apply_overrides_to_run(job_obj, last_run)
                if getattr(last_run, "missed", False):
                    last_status = status_display or "Missed"
                else:
                    last_status = status_display or (last_run.status or "-")
            except Exception:
                # Fall back to the raw run status when override resolution fails.
                last_status = last_run.status or "-"
            # Display the actual run time instead of the inferred schedule.
            expected_display = _bucket_15min(last_run.run_at) or expected_display
        else:
            try:
                today_local = datetime.now(tz).date() if tz else datetime.utcnow().date()
            except Exception:
                today_local = datetime.utcnow().date()
            if target_date > today_local:
                last_status = "Expected"
            elif target_date == today_local:
                # Still today: the run may simply not have happened yet.
                last_status = "Expected"
            else:
                # Past day with no run: "Missed", unless before the
                # configured start date for missed-run tracking.
                if missed_start_date and target_date < missed_start_date:
                    last_status = "-"
                else:
                    last_status = "Missed"
        success_text = "Yes" if _is_success_status(last_status) else "No"
        section["items"].append(
            {
                "title": row.job_name or f"Job #{row.job_id}",
                "subtitle": f"{row.customer_name or '-'} | {row.backup_software or '-'} / {row.backup_type or '-'}",
                "meta": f"Expected: {expected_display} | Successful: {success_text} | Runs: {run_count}",
                "link": url_for("main.daily_jobs", date=target_date.strftime("%Y-%m-%d"), open_job_id=row.job_id),
            }
        )
    return section
def _build_run_checks_results(patterns: list[str], page: int) -> dict:
    """Build the Run Checks section of the global search results.

    Lists non-archived jobs that have at least one unreviewed run
    (JobRun.reviewed_at IS NULL), with the count of such runs.
    """
    section = {
        "key": "run_checks",
        "title": "Run Checks",
        "view_all_url": url_for("main.run_checks_page"),
        "total": 0,
        "items": [],
        "current_page": 1,
        "total_pages": 1,
        "has_prev": False,
        "has_next": False,
        "prev_url": "",
        "next_url": "",
    }
    if not _is_section_allowed("run_checks"):
        return section
    # Per-job count of unreviewed runs; the inner join below means only
    # jobs with at least one unreviewed run appear at all.
    agg = (
        db.session.query(
            JobRun.job_id.label("job_id"),
            func.count(JobRun.id).label("run_count"),
        )
        .filter(JobRun.reviewed_at.is_(None))
        .group_by(JobRun.job_id)
        .subquery()
    )
    query = (
        db.session.query(
            Job.id.label("job_id"),
            Job.job_name.label("job_name"),
            Job.backup_software.label("backup_software"),
            Job.backup_type.label("backup_type"),
            Customer.name.label("customer_name"),
            agg.c.run_count.label("run_count"),
        )
        .select_from(Job)
        .join(agg, agg.c.job_id == Job.id)
        .outerjoin(Customer, Customer.id == Job.customer_id)
        .filter(Job.archived.is_(False))
    )
    match_expr = _contains_all_terms(
        [
            func.coalesce(Customer.name, ""),
            func.coalesce(Job.backup_software, ""),
            func.coalesce(Job.backup_type, ""),
            func.coalesce(Job.job_name, ""),
        ],
        patterns,
    )
    if match_expr is not None:
        query = query.filter(match_expr)
    total, current_page, total_pages, rows = _paginate_query(
        query,
        page,
        [
            Customer.name.asc().nullslast(),
            Job.backup_software.asc().nullslast(),
            Job.backup_type.asc().nullslast(),
            Job.job_name.asc().nullslast(),
        ],
    )
    _enrich_paging(section, total, current_page, total_pages)
    for row in rows:
        section["items"].append(
            {
                "title": row.job_name or f"Job #{row.job_id}",
                "subtitle": f"{row.customer_name or '-'} | {row.backup_software or '-'} / {row.backup_type or '-'}",
                "meta": f"Unreviewed runs: {int(row.run_count or 0)}",
                "link": url_for("main.run_checks_page"),
            }
        )
    return section
def _build_tickets_results(patterns: list[str], page: int) -> dict:
    """Build the Tickets section of the global search results.

    Tickets are matched through their scopes (customer, software/type,
    job-name match, linked job) as well as their own ticket code; multi-scope
    tickets are de-duplicated with DISTINCT.
    """
    section = {
        "key": "tickets",
        "title": "Tickets",
        "view_all_url": url_for("main.tickets_page"),
        "total": 0,
        "items": [],
        "current_page": 1,
        "total_pages": 1,
        "has_prev": False,
        "has_next": False,
        "prev_url": "",
        "next_url": "",
    }
    if not _is_section_allowed("tickets"):
        return section
    query = (
        db.session.query(Ticket)
        .select_from(Ticket)
        .outerjoin(TicketScope, TicketScope.ticket_id == Ticket.id)
        .outerjoin(Customer, Customer.id == TicketScope.customer_id)
        .outerjoin(Job, Job.id == TicketScope.job_id)
    )
    match_expr = _contains_all_terms(
        [
            func.coalesce(Ticket.ticket_code, ""),
            func.coalesce(Customer.name, ""),
            func.coalesce(TicketScope.scope_type, ""),
            func.coalesce(TicketScope.backup_software, ""),
            func.coalesce(TicketScope.backup_type, ""),
            func.coalesce(TicketScope.job_name_match, ""),
            func.coalesce(Job.job_name, ""),
        ],
        patterns,
    )
    if match_expr is not None:
        query = query.filter(match_expr)
    # A ticket with several matching scopes would otherwise appear once
    # per scope row.
    query = query.distinct()
    total, current_page, total_pages, rows = _paginate_query(
        query,
        page,
        [Ticket.start_date.desc().nullslast()],
    )
    _enrich_paging(section, total, current_page, total_pages)
    for t in rows:
        customer_display = "-"
        scope_summary = "-"
        # Best-effort scope summary; any failure falls back to "-".
        # NOTE(review): one extra query per ticket (N+1), bounded by the
        # page size (<= 10 rows).
        try:
            scope_rows = (
                db.session.query(
                    TicketScope.scope_type.label("scope_type"),
                    TicketScope.backup_software.label("backup_software"),
                    TicketScope.backup_type.label("backup_type"),
                    Customer.name.label("customer_name"),
                )
                .select_from(TicketScope)
                .outerjoin(Customer, Customer.id == TicketScope.customer_id)
                .filter(TicketScope.ticket_id == t.id)
                .all()
            )
            customer_names = []
            for s in scope_rows:
                cname = getattr(s, "customer_name", None)
                if cname and cname not in customer_names:
                    customer_names.append(cname)
            if customer_names:
                # Show the first customer, with a "+N" suffix for the rest.
                customer_display = customer_names[0]
                if len(customer_names) > 1:
                    customer_display = f"{customer_display} +{len(customer_names)-1}"
            if scope_rows:
                # Summarize only the first scope row.
                s = scope_rows[0]
                bits = []
                if getattr(s, "scope_type", None):
                    bits.append(str(getattr(s, "scope_type")))
                if getattr(s, "backup_software", None):
                    bits.append(str(getattr(s, "backup_software")))
                if getattr(s, "backup_type", None):
                    bits.append(str(getattr(s, "backup_type")))
                scope_summary = " / ".join(bits) if bits else "-"
        except Exception:
            customer_display = "-"
            scope_summary = "-"
        section["items"].append(
            {
                "title": t.ticket_code or f"Ticket #{t.id}",
                "subtitle": f"{customer_display} | {scope_summary}",
                "meta": _format_datetime(t.start_date),
                "link": url_for("main.ticket_detail", ticket_id=t.id),
            }
        )
    return section
def _build_remarks_results(patterns: list[str], page: int) -> dict:
    """Build the Remarks section of the global search results.

    Remarks are matched on their own title/body/dates and through their
    scopes (customer, software/type, job-name match, linked job);
    multi-scope remarks are de-duplicated with DISTINCT.
    """
    section = {
        "key": "remarks",
        "title": "Remarks",
        "view_all_url": url_for("main.tickets_page", tab="remarks"),
        "total": 0,
        "items": [],
        "current_page": 1,
        "total_pages": 1,
        "has_prev": False,
        "has_next": False,
        "prev_url": "",
        "next_url": "",
    }
    if not _is_section_allowed("remarks"):
        return section
    query = (
        db.session.query(Remark)
        .select_from(Remark)
        .outerjoin(RemarkScope, RemarkScope.remark_id == Remark.id)
        .outerjoin(Customer, Customer.id == RemarkScope.customer_id)
        .outerjoin(Job, Job.id == RemarkScope.job_id)
    )
    match_expr = _contains_all_terms(
        [
            func.coalesce(Remark.title, ""),
            func.coalesce(Remark.body, ""),
            func.coalesce(Customer.name, ""),
            func.coalesce(RemarkScope.scope_type, ""),
            func.coalesce(RemarkScope.backup_software, ""),
            func.coalesce(RemarkScope.backup_type, ""),
            func.coalesce(RemarkScope.job_name_match, ""),
            func.coalesce(Job.job_name, ""),
            # Dates are cast to text so patterns can match date strings.
            cast(Remark.start_date, String),
            cast(Remark.resolved_at, String),
        ],
        patterns,
    )
    if match_expr is not None:
        query = query.filter(match_expr)
    # A remark with several matching scopes would otherwise appear once
    # per scope row.
    query = query.distinct()
    total, current_page, total_pages, rows = _paginate_query(
        query,
        page,
        [Remark.start_date.desc().nullslast()],
    )
    _enrich_paging(section, total, current_page, total_pages)
    for r in rows:
        customer_display = "-"
        scope_summary = "-"
        # Best-effort scope summary; any failure falls back to "-".
        # NOTE(review): one extra query per remark (N+1), bounded by the
        # page size (<= 10 rows).
        try:
            scope_rows = (
                db.session.query(
                    RemarkScope.scope_type.label("scope_type"),
                    RemarkScope.backup_software.label("backup_software"),
                    RemarkScope.backup_type.label("backup_type"),
                    Customer.name.label("customer_name"),
                )
                .select_from(RemarkScope)
                .outerjoin(Customer, Customer.id == RemarkScope.customer_id)
                .filter(RemarkScope.remark_id == r.id)
                .all()
            )
            customer_names = []
            for s in scope_rows:
                cname = getattr(s, "customer_name", None)
                if cname and cname not in customer_names:
                    customer_names.append(cname)
            if customer_names:
                # Show the first customer, with a "+N" suffix for the rest.
                customer_display = customer_names[0]
                if len(customer_names) > 1:
                    customer_display = f"{customer_display} +{len(customer_names)-1}"
            if scope_rows:
                # Summarize only the first scope row.
                s = scope_rows[0]
                bits = []
                if getattr(s, "scope_type", None):
                    bits.append(str(getattr(s, "scope_type")))
                if getattr(s, "backup_software", None):
                    bits.append(str(getattr(s, "backup_software")))
                if getattr(s, "backup_type", None):
                    bits.append(str(getattr(s, "backup_type")))
                scope_summary = " / ".join(bits) if bits else "-"
        except Exception:
            customer_display = "-"
            scope_summary = "-"
        # Use the title (or the body as fallback) truncated to 80 chars.
        preview = (r.title or r.body or "").strip()
        if len(preview) > 80:
            preview = preview[:77] + "..."
        section["items"].append(
            {
                "title": preview or f"Remark #{r.id}",
                "subtitle": f"{customer_display} | {scope_summary}",
                "meta": _format_datetime(r.start_date),
                "link": url_for("main.remark_detail", remark_id=r.id),
            }
        )
    return section
def _build_overrides_results(patterns: list[str], page: int) -> dict:
    """Build the Existing overrides section of the global search results.

    Matches overrides on level, scope fields, time window (cast to text)
    and comment; the customer name is reached through the optional linked
    job, so job-less overrides have no customer in the listing.
    """
    section = {
        "key": "overrides",
        "title": "Existing overrides",
        "view_all_url": url_for("main.overrides"),
        "total": 0,
        "items": [],
        "current_page": 1,
        "total_pages": 1,
        "has_prev": False,
        "has_next": False,
        "prev_url": "",
        "next_url": "",
    }
    if not _is_section_allowed("overrides"):
        return section
    query = (
        db.session.query(
            Override.id.label("id"),
            Override.level.label("level"),
            Override.backup_software.label("backup_software"),
            Override.backup_type.label("backup_type"),
            Override.object_name.label("object_name"),
            Override.start_at.label("start_at"),
            Override.end_at.label("end_at"),
            Override.comment.label("comment"),
            Customer.name.label("customer_name"),
            Job.job_name.label("job_name"),
        )
        .select_from(Override)
        .outerjoin(Job, Job.id == Override.job_id)
        .outerjoin(Customer, Customer.id == Job.customer_id)
    )
    match_expr = _contains_all_terms(
        [
            func.coalesce(Override.level, ""),
            func.coalesce(Customer.name, ""),
            func.coalesce(Override.backup_software, ""),
            func.coalesce(Override.backup_type, ""),
            func.coalesce(Job.job_name, ""),
            func.coalesce(Override.object_name, ""),
            # Time window cast to text so patterns can match date strings.
            cast(Override.start_at, String),
            cast(Override.end_at, String),
            func.coalesce(Override.comment, ""),
        ],
        patterns,
    )
    if match_expr is not None:
        query = query.filter(match_expr)
    total, current_page, total_pages, rows = _paginate_query(
        query,
        page,
        [Override.level.asc(), Override.start_at.desc()],
    )
    _enrich_paging(section, total, current_page, total_pages)
    for row in rows:
        # Collect whichever scope fields are set into a readable summary.
        scope_bits = []
        if row.customer_name:
            scope_bits.append(row.customer_name)
        if row.backup_software:
            scope_bits.append(row.backup_software)
        if row.backup_type:
            scope_bits.append(row.backup_type)
        if row.job_name:
            scope_bits.append(row.job_name)
        if row.object_name:
            scope_bits.append(f"object: {row.object_name}")
        scope_text = " / ".join(scope_bits) if scope_bits else "All jobs"
        section["items"].append(
            {
                "title": (row.level or "override").capitalize(),
                "subtitle": scope_text,
                "meta": f"From {_format_datetime(row.start_at)} to {_format_datetime(row.end_at) if row.end_at else '-'} | {row.comment or ''}",
                "link": url_for("main.overrides"),
            }
        )
    return section
def _build_reports_results(patterns: list[str], page: int) -> dict:
    """Build the Reports section of the global search results.

    Matches report definitions by name, type, period (cast to text) and
    output format; items link to the edit page for roles allowed to edit.
    """
    section = {
        "key": "reports",
        "title": "Reports",
        "view_all_url": url_for("main.reports"),
        "total": 0,
        "items": [],
        "current_page": 1,
        "total_pages": 1,
        "has_prev": False,
        "has_next": False,
        "prev_url": "",
        "next_url": "",
    }
    if not _is_section_allowed("reports"):
        return section
    report_query = ReportDefinition.query
    filter_expr = _contains_all_terms(
        [
            func.coalesce(ReportDefinition.name, ""),
            func.coalesce(ReportDefinition.report_type, ""),
            cast(ReportDefinition.period_start, String),
            cast(ReportDefinition.period_end, String),
            func.coalesce(ReportDefinition.output_format, ""),
        ],
        patterns,
    )
    if filter_expr is not None:
        report_query = report_query.filter(filter_expr)
    total, current_page, total_pages, page_rows = _paginate_query(
        report_query,
        page,
        [ReportDefinition.created_at.desc()],
    )
    _enrich_paging(section, total, current_page, total_pages)
    can_edit = get_active_role() in ("admin", "operator", "reporter")
    for report in page_rows:
        if can_edit:
            link = url_for("main.reports_edit", report_id=report.id)
        else:
            link = url_for("main.reports")
        section["items"].append(
            {
                "title": report.name or f"Report #{report.id}",
                "subtitle": f"{report.report_type or '-'} | {report.output_format or '-'}",
                "meta": f"{_format_datetime(report.period_start)} -> {_format_datetime(report.period_end)}",
                "link": link,
            }
        )
    return section
@main_bp.route("/search")
@login_required
def search_page():
    """Render the global search page.

    Query parameters:
        q: raw search string (whitespace-separated terms, ``*`` wildcard).
        p_<key>: 1-based page number per section; keys come from
            SEARCH_SECTION_KEYS.

    Sections the active role may not see are hidden; prev/next links per
    section preserve every other section's current page.
    """
    query = (request.args.get("q") or "").strip()
    patterns = _build_patterns(query)
    # One independent page counter per section, e.g. ?p_jobs=2.
    requested_pages = {
        key: _parse_page(request.args.get(f"p_{key}"))
        for key in SEARCH_SECTION_KEYS
    }
    sections = []
    if patterns:
        sections.append(_build_inbox_results(patterns, requested_pages["inbox"]))
        sections.append(_build_customers_results(patterns, requested_pages["customers"]))
        sections.append(_build_jobs_results(patterns, requested_pages["jobs"]))
        sections.append(_build_daily_jobs_results(patterns, requested_pages["daily_jobs"]))
        sections.append(_build_run_checks_results(patterns, requested_pages["run_checks"]))
        sections.append(_build_tickets_results(patterns, requested_pages["tickets"]))
        sections.append(_build_remarks_results(patterns, requested_pages["remarks"]))
        sections.append(_build_overrides_results(patterns, requested_pages["overrides"]))
        sections.append(_build_reports_results(patterns, requested_pages["reports"]))
    else:
        # Empty query: render every section as an empty placeholder.
        sections = [
            {"key": "inbox", "title": "Inbox", "view_all_url": url_for("main.inbox"), "total": 0, "items": [], "current_page": 1, "total_pages": 1, "has_prev": False, "has_next": False, "prev_url": "", "next_url": ""},
            {"key": "customers", "title": "Customers", "view_all_url": url_for("main.customers"), "total": 0, "items": [], "current_page": 1, "total_pages": 1, "has_prev": False, "has_next": False, "prev_url": "", "next_url": ""},
            {"key": "jobs", "title": "Jobs", "view_all_url": url_for("main.jobs"), "total": 0, "items": [], "current_page": 1, "total_pages": 1, "has_prev": False, "has_next": False, "prev_url": "", "next_url": ""},
            {"key": "daily_jobs", "title": "Daily Jobs", "view_all_url": url_for("main.daily_jobs"), "total": 0, "items": [], "current_page": 1, "total_pages": 1, "has_prev": False, "has_next": False, "prev_url": "", "next_url": ""},
            {"key": "run_checks", "title": "Run Checks", "view_all_url": url_for("main.run_checks_page"), "total": 0, "items": [], "current_page": 1, "total_pages": 1, "has_prev": False, "has_next": False, "prev_url": "", "next_url": ""},
            {"key": "tickets", "title": "Tickets", "view_all_url": url_for("main.tickets_page"), "total": 0, "items": [], "current_page": 1, "total_pages": 1, "has_prev": False, "has_next": False, "prev_url": "", "next_url": ""},
            {"key": "remarks", "title": "Remarks", "view_all_url": url_for("main.tickets_page", tab="remarks"), "total": 0, "items": [], "current_page": 1, "total_pages": 1, "has_prev": False, "has_next": False, "prev_url": "", "next_url": ""},
            {"key": "overrides", "title": "Existing overrides", "view_all_url": url_for("main.overrides"), "total": 0, "items": [], "current_page": 1, "total_pages": 1, "has_prev": False, "has_next": False, "prev_url": "", "next_url": ""},
            {"key": "reports", "title": "Reports", "view_all_url": url_for("main.reports"), "total": 0, "items": [], "current_page": 1, "total_pages": 1, "has_prev": False, "has_next": False, "prev_url": "", "next_url": ""},
        ]
    # Builders already return empty sections for disallowed roles; this
    # additionally hides those sections from the page entirely.
    visible_sections = [s for s in sections if _is_section_allowed(s["key"])]
    current_pages = {
        s["key"]: int(s.get("current_page", 1) or 1)
        for s in sections
    }

    def _build_search_url(page_overrides: dict[str, int]) -> str:
        # Rebuild the /search URL, keeping q and every section's current
        # page, overriding only the section(s) given in page_overrides.
        args = {"q": query}
        for key in SEARCH_SECTION_KEYS:
            args[f"p_{key}"] = int(page_overrides.get(key, current_pages.get(key, 1)))
        return url_for("main.search_page", **args)

    for s in visible_sections:
        key = s["key"]
        cur = int(s.get("current_page", 1) or 1)
        if query:
            # Carry the search term into each section's "view all" link.
            if key == "inbox":
                s["view_all_url"] = url_for("main.inbox", q=query)
            elif key == "customers":
                s["view_all_url"] = url_for("main.customers", q=query)
            elif key == "jobs":
                s["view_all_url"] = url_for("main.jobs", q=query)
            elif key == "daily_jobs":
                s["view_all_url"] = url_for("main.daily_jobs", q=query)
            elif key == "run_checks":
                s["view_all_url"] = url_for("main.run_checks_page", q=query)
            elif key == "tickets":
                s["view_all_url"] = url_for("main.tickets_page", q=query)
            elif key == "remarks":
                s["view_all_url"] = url_for("main.tickets_page", tab="remarks", q=query)
            elif key == "overrides":
                s["view_all_url"] = url_for("main.overrides", q=query)
            elif key == "reports":
                s["view_all_url"] = url_for("main.reports", q=query)
        if s.get("has_prev"):
            prev_pages = dict(current_pages)
            prev_pages[key] = cur - 1
            s["prev_url"] = _build_search_url(prev_pages)
        if s.get("has_next"):
            next_pages = dict(current_pages)
            next_pages[key] = cur + 1
            s["next_url"] = _build_search_url(next_pages)
    total_hits = sum(int(s.get("total", 0) or 0) for s in visible_sections)
    return render_template(
        "main/search.html",
        query=query,
        sections=visible_sections,
        total_hits=total_hits,
        limit_per_section=SEARCH_LIMIT_PER_SECTION,
    )