fix: add stale document recovery sweep to scheduler, re-enqueues orphaned parsed docs every 5 min
This commit is contained in:
@@ -19,7 +19,9 @@ from services.shared.config import load_config
|
||||
from services.shared.db import get_pg_pool, get_redis
|
||||
from services.shared.logging import setup_logging
|
||||
from services.shared.redis_keys import (
|
||||
QUEUE_EXTRACTION,
|
||||
QUEUE_INGESTION,
|
||||
QUEUE_MACRO_CLASSIFICATION,
|
||||
lock_key,
|
||||
queue_key,
|
||||
rate_limit_key,
|
||||
@@ -472,12 +474,18 @@ async def main() -> None:
|
||||
rds = get_redis(config)
|
||||
|
||||
logger.info("Scheduler started (tick=%ds)", SCHEDULER_TICK)
|
||||
recovery_counter = 0
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
if await acquire_lock(rds, "scheduler_cycle", ttl=30):
|
||||
try:
|
||||
await schedule_cycle(pool, rds)
|
||||
# Run stale document recovery every ~20 cycles (~5 minutes)
|
||||
recovery_counter += 1
|
||||
if recovery_counter >= 20:
|
||||
recovery_counter = 0
|
||||
await recover_stale_documents(pool, rds)
|
||||
finally:
|
||||
await release_lock(rds, "scheduler_cycle")
|
||||
except Exception:
|
||||
@@ -488,5 +496,51 @@ async def main() -> None:
|
||||
await rds.close()
|
||||
|
||||
|
||||
# Minutes a document may remain in "parsed" (measured from its updated_at) before the recovery sweep treats it as orphaned
|
||||
STALE_PARSED_THRESHOLD_MINUTES: int = 30
|
||||
|
||||
|
||||
async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> int:
    """Re-enqueue documents stuck in 'parsed' status so processing can resume.

    Documents can get orphaned when Redis loses queue entries (pod restart,
    OOM, etc.). This sweep selects up to 100 of the oldest documents whose
    status is 'parsed' and whose ``updated_at`` is older than
    STALE_PARSED_THRESHOLD_MINUTES, then pushes one queue message per
    (document, mentioned ticker) row. ``macro_event`` documents go to the
    macro-classification queue; every other document type goes to the
    extraction queue. A document with no company mention is pushed once
    with an empty ticker.

    After a successful push the selected documents get ``updated_at`` bumped
    to NOW(). Without that, every sweep would push the same documents again
    while they are still 'parsed' (piling duplicates onto a backed-up queue),
    and the oldest 100 stuck documents would permanently hide newer orphans
    behind the LIMIT.

    Args:
        pool: asyncpg pool used to read and touch the ``documents`` table.
        rds: Redis client the queue messages are pushed to.

    Returns:
        The number of queue messages pushed. This can exceed the number of
        distinct documents when a document has several company mentions.
    """
    # The LIMIT is applied to documents *before* the join so that a document
    # with several mentions is never cut partway through its ticker list.
    rows = await pool.fetch(
        """SELECT d.id, d.document_type, dcm.ticker
           FROM (
               SELECT id, document_type, created_at
               FROM documents
               WHERE status = 'parsed'
                 AND updated_at < NOW() - INTERVAL '1 minute' * $1
               ORDER BY created_at ASC
               LIMIT 100
           ) d
           LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id
           ORDER BY d.created_at ASC""",
        STALE_PARSED_THRESHOLD_MINUTES,
    )

    if not rows:
        return 0

    enqueued = 0
    for row in rows:
        doc_type = row["document_type"]
        if doc_type == "macro_event":
            target = queue_key(QUEUE_MACRO_CLASSIFICATION)
        else:
            target = queue_key(QUEUE_EXTRACTION)

        await rds.rpush(target, json.dumps({
            "document_id": str(row["id"]),
            # LEFT JOIN yields NULL when the document has no mentions.
            "ticker": row["ticker"] or "",
        }))
        enqueued += 1

    # Touch only after every push succeeded: if a push raised above, nothing
    # is marked and the next sweep retries the whole batch (at-least-once).
    # The status guard avoids touching rows a worker has since moved on.
    recovered_ids = list(dict.fromkeys(row["id"] for row in rows))
    await pool.execute(
        """UPDATE documents
           SET updated_at = NOW()
           WHERE id = ANY($1)
             AND status = 'parsed'""",
        recovered_ids,
    )

    logger.info("Recovered %d stale parsed documents for extraction", enqueued)
    return enqueued
|
||||
|
||||
|
||||
# Script entry point: run the scheduler's async main() when executed directly.
if __name__ == "__main__":
    asyncio.run(main())
|
||||
|
||||
Reference in New Issue
Block a user