From 2200b0cb00319d415d59248834eed05ffa0dd4d9 Mon Sep 17 00:00:00 2001 From: Ivo Oskamp Date: Mon, 2 Mar 2026 16:46:44 +0100 Subject: [PATCH] Fix Cove run creation transaction scope --- .../src/backend/app/cove_importer.py | 128 +++++++++--------- docs/changelog-claude.md | 8 ++ 2 files changed, 70 insertions(+), 66 deletions(-) diff --git a/containers/backupchecks/src/backend/app/cove_importer.py b/containers/backupchecks/src/backend/app/cove_importer.py index 08a7095..974dc94 100644 --- a/containers/backupchecks/src/backend/app/cove_importer.py +++ b/containers/backupchecks/src/backend/app/cove_importer.py @@ -464,73 +464,69 @@ def _persist_datasource_objects( observed_at: datetime, ) -> None: """Create run_object_links for each active datasource found in the account stats.""" - engine = db.get_engine() + for ds_prefix, ds_label in DATASOURCE_LABELS.items(): + status_key = f"{ds_prefix}F00" + status_code = flat.get(status_key) + if status_code is None: + continue - with engine.begin() as conn: - for ds_prefix, ds_label in DATASOURCE_LABELS.items(): - status_key = f"{ds_prefix}F00" - status_code = flat.get(status_key) - if status_code is None: - continue + status = _map_status(status_code) + ds_last_ts = _ts_to_dt(flat.get(f"{ds_prefix}F15")) + status_msg = ( + f"Cove datasource status: {_status_label(status_code)} " + f"({status_code}); last session: {_fmt_utc(ds_last_ts)}" + ) - status = _map_status(status_code) - ds_last_ts = _ts_to_dt(flat.get(f"{ds_prefix}F15")) - status_msg = ( - f"Cove datasource status: {_status_label(status_code)} " - f"({status_code}); last session: {_fmt_utc(ds_last_ts)}" - ) + # Use the same SQLAlchemy session/transaction as JobRun creation. + # A separate engine connection cannot reliably see the uncommitted run row. + customer_object_id = db.session.execute( + text( + """ + INSERT INTO customer_objects (customer_id, object_name, object_type, first_seen_at, last_seen_at) + VALUES (:customer_id, :object_name, :object_type, NOW(), NOW()) + ON CONFLICT (customer_id, object_name) + DO UPDATE SET + last_seen_at = NOW(), + object_type = COALESCE(EXCLUDED.object_type, customer_objects.object_type) + RETURNING id + """ + ), + { + "customer_id": customer_id, + "object_name": ds_label, + "object_type": "cove_datasource", + }, + ).scalar() - # Upsert customer_objects - customer_object_id = conn.execute( - text( - """ - INSERT INTO customer_objects (customer_id, object_name, object_type, first_seen_at, last_seen_at) - VALUES (:customer_id, :object_name, :object_type, NOW(), NOW()) - ON CONFLICT (customer_id, object_name) - DO UPDATE SET - last_seen_at = NOW(), - object_type = COALESCE(EXCLUDED.object_type, customer_objects.object_type) - RETURNING id - """ - ), - { - "customer_id": customer_id, - "object_name": ds_label, - "object_type": "cove_datasource", - }, - ).scalar() + db.session.execute( + text( + """ + INSERT INTO job_object_links (job_id, customer_object_id, first_seen_at, last_seen_at) + VALUES (:job_id, :customer_object_id, NOW(), NOW()) + ON CONFLICT (job_id, customer_object_id) + DO UPDATE SET last_seen_at = NOW() + """ + ), + {"job_id": job_id, "customer_object_id": customer_object_id}, + ) - # Upsert job_object_links - conn.execute( - text( - """ - INSERT INTO job_object_links (job_id, customer_object_id, first_seen_at, last_seen_at) - VALUES (:job_id, :customer_object_id, NOW(), NOW()) - ON CONFLICT (job_id, customer_object_id) - DO UPDATE SET last_seen_at = NOW() - """ - ), - {"job_id": job_id, "customer_object_id": customer_object_id}, - ) - - # Upsert run_object_links - conn.execute( - text( - """ - INSERT INTO run_object_links (run_id, customer_object_id, status, error_message, observed_at) - VALUES (:run_id, :customer_object_id, :status, :error_message, :observed_at) - ON CONFLICT (run_id, customer_object_id) - DO UPDATE SET - status = EXCLUDED.status, - error_message = EXCLUDED.error_message, - observed_at = EXCLUDED.observed_at - """ - ), - { - "run_id": run_id, - "customer_object_id": customer_object_id, - "status": status, - "error_message": status_msg, - "observed_at": ds_last_ts or observed_at, - }, - ) + db.session.execute( + text( + """ + INSERT INTO run_object_links (run_id, customer_object_id, status, error_message, observed_at) + VALUES (:run_id, :customer_object_id, :status, :error_message, :observed_at) + ON CONFLICT (run_id, customer_object_id) + DO UPDATE SET + status = EXCLUDED.status, + error_message = EXCLUDED.error_message, + observed_at = EXCLUDED.observed_at + """ + ), + { + "run_id": run_id, + "customer_object_id": customer_object_id, + "status": status, + "error_message": status_msg, + "observed_at": ds_last_ts or observed_at, + }, + ) diff --git a/docs/changelog-claude.md b/docs/changelog-claude.md index 1bdfb36..c2f661d 100644 --- a/docs/changelog-claude.md +++ b/docs/changelog-claude.md @@ -2,6 +2,14 @@ This file documents all changes made to this project via Claude Code. +## [2026-03-02] + +### Fixed +- Cove run creation after account linking/import: + - Fixed transaction scope in `app/cove_importer.py` for datasource object persistence. + - `run_object_links` / related upserts now use the same SQLAlchemy session transaction as `JobRun` creation instead of a separate engine connection. + - Prevents FK/visibility issues where a new uncommitted `JobRun` was not visible to a second connection, causing run creation to roll back and resulting in no Cove runs appearing. + ## [2026-02-27] ### Added