From 1a5fb2e36a7421a124697eb74290d684e4dcaf26 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Thu, 16 Apr 2026 19:13:10 +0000 Subject: [PATCH] fix: recovery sweep touches updated_at after re-enqueue to prevent duplicate flooding every 5 min --- services/scheduler/app.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/services/scheduler/app.py b/services/scheduler/app.py index 63c485b..2181f1e 100644 --- a/services/scheduler/app.py +++ b/services/scheduler/app.py @@ -534,6 +534,7 @@ async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> in return 0 enqueued = 0 + doc_ids = [] for row in rows: doc_type = row["document_type"] if doc_type == "macro_event": @@ -545,8 +546,16 @@ async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> in "document_id": str(row["id"]), "ticker": row["ticker"] or "", })) + doc_ids.append(row["id"]) enqueued += 1 + # Touch updated_at so these docs won't be re-enqueued until the threshold passes again + if doc_ids: + await pool.execute( + "UPDATE documents SET updated_at = NOW() WHERE id = ANY($1::uuid[])", + doc_ids, + ) + logger.info("Recovered %d stale parsed documents for extraction", enqueued) return enqueued