Fix Cove run creation transaction scope

This commit is contained in:
Ivo Oskamp 2026-03-02 16:46:44 +01:00
parent eff1cb3700
commit 2200b0cb00
2 changed files with 70 additions and 66 deletions

View File

@ -464,73 +464,69 @@ def _persist_datasource_objects(
observed_at: datetime, observed_at: datetime,
) -> None: ) -> None:
"""Create run_object_links for each active datasource found in the account stats.""" """Create run_object_links for each active datasource found in the account stats."""
engine = db.get_engine() for ds_prefix, ds_label in DATASOURCE_LABELS.items():
status_key = f"{ds_prefix}F00"
status_code = flat.get(status_key)
if status_code is None:
continue
with engine.begin() as conn: status = _map_status(status_code)
for ds_prefix, ds_label in DATASOURCE_LABELS.items(): ds_last_ts = _ts_to_dt(flat.get(f"{ds_prefix}F15"))
status_key = f"{ds_prefix}F00" status_msg = (
status_code = flat.get(status_key) f"Cove datasource status: {_status_label(status_code)} "
if status_code is None: f"({status_code}); last session: {_fmt_utc(ds_last_ts)}"
continue )
status = _map_status(status_code) # Use the same SQLAlchemy session/transaction as JobRun creation.
ds_last_ts = _ts_to_dt(flat.get(f"{ds_prefix}F15")) # A separate engine connection cannot reliably see the uncommitted run row.
status_msg = ( customer_object_id = db.session.execute(
f"Cove datasource status: {_status_label(status_code)} " text(
f"({status_code}); last session: {_fmt_utc(ds_last_ts)}" """
) INSERT INTO customer_objects (customer_id, object_name, object_type, first_seen_at, last_seen_at)
VALUES (:customer_id, :object_name, :object_type, NOW(), NOW())
ON CONFLICT (customer_id, object_name)
DO UPDATE SET
last_seen_at = NOW(),
object_type = COALESCE(EXCLUDED.object_type, customer_objects.object_type)
RETURNING id
"""
),
{
"customer_id": customer_id,
"object_name": ds_label,
"object_type": "cove_datasource",
},
).scalar()
# Upsert customer_objects db.session.execute(
customer_object_id = conn.execute( text(
text( """
""" INSERT INTO job_object_links (job_id, customer_object_id, first_seen_at, last_seen_at)
INSERT INTO customer_objects (customer_id, object_name, object_type, first_seen_at, last_seen_at) VALUES (:job_id, :customer_object_id, NOW(), NOW())
VALUES (:customer_id, :object_name, :object_type, NOW(), NOW()) ON CONFLICT (job_id, customer_object_id)
ON CONFLICT (customer_id, object_name) DO UPDATE SET last_seen_at = NOW()
DO UPDATE SET """
last_seen_at = NOW(), ),
object_type = COALESCE(EXCLUDED.object_type, customer_objects.object_type) {"job_id": job_id, "customer_object_id": customer_object_id},
RETURNING id )
"""
),
{
"customer_id": customer_id,
"object_name": ds_label,
"object_type": "cove_datasource",
},
).scalar()
# Upsert job_object_links db.session.execute(
conn.execute( text(
text( """
""" INSERT INTO run_object_links (run_id, customer_object_id, status, error_message, observed_at)
INSERT INTO job_object_links (job_id, customer_object_id, first_seen_at, last_seen_at) VALUES (:run_id, :customer_object_id, :status, :error_message, :observed_at)
VALUES (:job_id, :customer_object_id, NOW(), NOW()) ON CONFLICT (run_id, customer_object_id)
ON CONFLICT (job_id, customer_object_id) DO UPDATE SET
DO UPDATE SET last_seen_at = NOW() status = EXCLUDED.status,
""" error_message = EXCLUDED.error_message,
), observed_at = EXCLUDED.observed_at
{"job_id": job_id, "customer_object_id": customer_object_id}, """
) ),
{
# Upsert run_object_links "run_id": run_id,
conn.execute( "customer_object_id": customer_object_id,
text( "status": status,
""" "error_message": status_msg,
INSERT INTO run_object_links (run_id, customer_object_id, status, error_message, observed_at) "observed_at": ds_last_ts or observed_at,
VALUES (:run_id, :customer_object_id, :status, :error_message, :observed_at) },
ON CONFLICT (run_id, customer_object_id) )
DO UPDATE SET
status = EXCLUDED.status,
error_message = EXCLUDED.error_message,
observed_at = EXCLUDED.observed_at
"""
),
{
"run_id": run_id,
"customer_object_id": customer_object_id,
"status": status,
"error_message": status_msg,
"observed_at": ds_last_ts or observed_at,
},
)

View File

@ -2,6 +2,14 @@
This file documents all changes made to this project via Claude Code. This file documents all changes made to this project via Claude Code.
## [2026-03-02]
### Fixed
- Cove run creation after account linking/import:
- Fixed transaction scope in `app/cove_importer.py` for datasource object persistence.
- `run_object_links` / related upserts now use the same SQLAlchemy session transaction as `JobRun` creation instead of a separate engine connection.
- Prevents FK/visibility issues where a new uncommitted `JobRun` was not visible to a second connection, causing run creation to roll back and resulting in no Cove runs appearing.
## [2026-02-27] ## [2026-02-27]
### Added ### Added