fix: recovery sweep touches updated_at after re-enqueue so the same documents are not re-enqueued as duplicates on every 5-minute sweep
This commit is contained in:
@@ -534,6 +534,7 @@ async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> in
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
enqueued = 0
|
enqueued = 0
|
||||||
|
doc_ids = []
|
||||||
for row in rows:
|
for row in rows:
|
||||||
doc_type = row["document_type"]
|
doc_type = row["document_type"]
|
||||||
if doc_type == "macro_event":
|
if doc_type == "macro_event":
|
||||||
@@ -545,8 +546,16 @@ async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> in
|
|||||||
"document_id": str(row["id"]),
|
"document_id": str(row["id"]),
|
||||||
"ticker": row["ticker"] or "",
|
"ticker": row["ticker"] or "",
|
||||||
}))
|
}))
|
||||||
|
doc_ids.append(row["id"])
|
||||||
enqueued += 1
|
enqueued += 1
|
||||||
|
|
||||||
|
# Touch updated_at so these docs won't be re-enqueued until the threshold passes again
|
||||||
|
if doc_ids:
|
||||||
|
await pool.execute(
|
||||||
|
"UPDATE documents SET updated_at = NOW() WHERE id = ANY($1::uuid[])",
|
||||||
|
doc_ids,
|
||||||
|
)
|
||||||
|
|
||||||
logger.info("Recovered %d stale parsed documents for extraction", enqueued)
|
logger.info("Recovered %d stale parsed documents for extraction", enqueued)
|
||||||
return enqueued
|
return enqueued
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user