diff --git a/services/scheduler/app.py b/services/scheduler/app.py index 63c485b..2181f1e 100644 --- a/services/scheduler/app.py +++ b/services/scheduler/app.py @@ -534,6 +534,7 @@ async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> in return 0 enqueued = 0 + doc_ids = [] for row in rows: doc_type = row["document_type"] if doc_type == "macro_event": @@ -545,8 +546,16 @@ async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> in "document_id": str(row["id"]), "ticker": row["ticker"] or "", })) + doc_ids.append(row["id"]) enqueued += 1 + # Touch updated_at so these docs won't be re-enqueued until the threshold passes again + if doc_ids: + await pool.execute( + "UPDATE documents SET updated_at = NOW() WHERE id = ANY($1::uuid[])", + doc_ids, + ) + logger.info("Recovered %d stale parsed documents for extraction", enqueued) return enqueued