From 87579d68da3180a2e6b85bb8804911c3179928f2 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Thu, 16 Apr 2026 07:59:30 +0000 Subject: [PATCH] fix: add stale document recovery sweep to scheduler, re-enqueues orphaned parsed docs every 5 min --- services/scheduler/app.py | 54 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/services/scheduler/app.py b/services/scheduler/app.py index 34dfbe7..792f92e 100644 --- a/services/scheduler/app.py +++ b/services/scheduler/app.py @@ -19,7 +19,9 @@ from services.shared.config import load_config from services.shared.db import get_pg_pool, get_redis from services.shared.logging import setup_logging from services.shared.redis_keys import ( + QUEUE_EXTRACTION, QUEUE_INGESTION, + QUEUE_MACRO_CLASSIFICATION, lock_key, queue_key, rate_limit_key, @@ -472,12 +474,18 @@ async def main() -> None: rds = get_redis(config) logger.info("Scheduler started (tick=%ds)", SCHEDULER_TICK) + recovery_counter = 0 try: while True: try: if await acquire_lock(rds, "scheduler_cycle", ttl=30): try: await schedule_cycle(pool, rds) + # Run stale document recovery every ~20 cycles (~5 minutes) + recovery_counter += 1 + if recovery_counter >= 20: + recovery_counter = 0 + await recover_stale_documents(pool, rds) finally: await release_lock(rds, "scheduler_cycle") except Exception: @@ -488,5 +496,51 @@ async def main() -> None: await rds.close() +# How long a document can sit in "parsed" before we consider it orphaned +STALE_PARSED_THRESHOLD_MINUTES: int = 30 + + +async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> int: + """Re-enqueue documents stuck in 'parsed' status for extraction. + + Documents can get orphaned when Redis loses queue entries (pod restart, + OOM, etc.). This sweep catches any document that has been in 'parsed' + status for longer than STALE_PARSED_THRESHOLD_MINUTES and re-enqueues + it for extraction. + + Returns the number of documents re-enqueued. + """ + rows = await pool.fetch( + """SELECT d.id, d.document_type, dcm.ticker + FROM documents d + LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id + WHERE d.status = 'parsed' + AND d.updated_at < NOW() - INTERVAL '1 minute' * $1 + ORDER BY d.created_at ASC + LIMIT 100""", + STALE_PARSED_THRESHOLD_MINUTES, + ) + + if not rows: + return 0 + + enqueued = 0 + for row in rows: + doc_type = row["document_type"] + if doc_type == "macro_event": + target = queue_key(QUEUE_MACRO_CLASSIFICATION) + else: + target = queue_key(QUEUE_EXTRACTION) + + await rds.rpush(target, json.dumps({ + "document_id": str(row["id"]), + "ticker": row["ticker"] or "", + })) + enqueued += 1 + + logger.info("Recovered %d stale parsed documents for extraction", enqueued) + return enqueued + + if __name__ == "__main__": asyncio.run(main())