fix: prevent duplicate queue entries with Redis SET markers
Recovery sweeps and the retry endpoint now check a per-document Redis key (SET NX, 1h TTL) before pushing to the queue. If the marker exists, the doc is already enqueued and gets skipped. This prevents the scheduler from re-enqueuing the same parsed docs every 5 minutes.
This commit is contained in:
+9
-4
@@ -1891,6 +1891,7 @@ async def retry_failed_extractions_endpoint():
|
|||||||
return {"retried": 0, "message": "No extraction-failed documents to retry"}
|
return {"retried": 0, "message": "No extraction-failed documents to retry"}
|
||||||
|
|
||||||
doc_ids = []
|
doc_ids = []
|
||||||
|
enqueued_set_prefix = f"{QUEUE_PREFIX}:enqueued"
|
||||||
for row in rows:
|
for row in rows:
|
||||||
doc_type = row["document_type"]
|
doc_type = row["document_type"]
|
||||||
if doc_type == "macro_event":
|
if doc_type == "macro_event":
|
||||||
@@ -1898,10 +1899,14 @@ async def retry_failed_extractions_endpoint():
|
|||||||
else:
|
else:
|
||||||
target = queue_key("extraction")
|
target = queue_key("extraction")
|
||||||
|
|
||||||
await rds.rpush(target, json.dumps({
|
doc_id = str(row["id"])
|
||||||
"document_id": str(row["id"]),
|
marker = f"{enqueued_set_prefix}:{doc_id}"
|
||||||
"ticker": row["ticker"] or "",
|
added = await rds.set(marker, "1", nx=True, ex=3600)
|
||||||
}))
|
if added:
|
||||||
|
await rds.rpush(target, json.dumps({
|
||||||
|
"document_id": doc_id,
|
||||||
|
"ticker": row["ticker"] or "",
|
||||||
|
}))
|
||||||
doc_ids.append(row["id"])
|
doc_ids.append(row["id"])
|
||||||
|
|
||||||
# Delete failed intelligence rows so extractor starts fresh
|
# Delete failed intelligence rows so extractor starts fresh
|
||||||
|
|||||||
+38
-12
@@ -22,6 +22,7 @@ from services.shared.redis_keys import (
|
|||||||
QUEUE_EXTRACTION,
|
QUEUE_EXTRACTION,
|
||||||
QUEUE_INGESTION,
|
QUEUE_INGESTION,
|
||||||
QUEUE_MACRO_CLASSIFICATION,
|
QUEUE_MACRO_CLASSIFICATION,
|
||||||
|
QUEUE_PREFIX,
|
||||||
lock_key,
|
lock_key,
|
||||||
queue_key,
|
queue_key,
|
||||||
rate_limit_key,
|
rate_limit_key,
|
||||||
@@ -538,6 +539,35 @@ STALE_PARSED_THRESHOLD_MINUTES: int = 30
|
|||||||
# How long after an extraction failure before we retry
|
# How long after an extraction failure before we retry
|
||||||
EXTRACTION_FAILED_RETRY_MINUTES: int = 60
|
EXTRACTION_FAILED_RETRY_MINUTES: int = 60
|
||||||
|
|
||||||
|
# Redis set key for tracking enqueued doc IDs (prevents duplicate enqueuing)
|
||||||
|
_ENQUEUED_SET = f"{QUEUE_PREFIX}:enqueued"
|
||||||
|
# How long an enqueued marker lives before it can be re-enqueued (seconds)
|
||||||
|
_ENQUEUED_TTL = 3600
|
||||||
|
|
||||||
|
|
||||||
|
async def _enqueue_if_new(
|
||||||
|
rds: aioredis.Redis,
|
||||||
|
queue: str,
|
||||||
|
document_id: str,
|
||||||
|
ticker: str,
|
||||||
|
) -> bool:
|
||||||
|
"""Push a job onto *queue* only if *document_id* isn't already tracked.
|
||||||
|
|
||||||
|
Uses a Redis SET with per-member expiry (via a separate key) to prevent
|
||||||
|
the same document from being enqueued multiple times by recovery sweeps.
|
||||||
|
Returns True if enqueued, False if skipped as duplicate.
|
||||||
|
"""
|
||||||
|
marker_key = f"{_ENQUEUED_SET}:{document_id}"
|
||||||
|
# SET NX returns True only if the key was created (not already present)
|
||||||
|
added = await rds.set(marker_key, "1", nx=True, ex=_ENQUEUED_TTL)
|
||||||
|
if not added:
|
||||||
|
return False
|
||||||
|
await rds.rpush(queue, json.dumps({
|
||||||
|
"document_id": document_id,
|
||||||
|
"ticker": ticker,
|
||||||
|
}))
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> int:
|
async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> int:
|
||||||
"""Re-enqueue documents stuck in 'parsed' status for extraction.
|
"""Re-enqueue documents stuck in 'parsed' status for extraction.
|
||||||
@@ -575,12 +605,10 @@ async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> in
|
|||||||
else:
|
else:
|
||||||
target = queue_key(QUEUE_EXTRACTION)
|
target = queue_key(QUEUE_EXTRACTION)
|
||||||
|
|
||||||
await rds.rpush(target, json.dumps({
|
added = await _enqueue_if_new(rds, target, str(row["id"]), row["ticker"] or "")
|
||||||
"document_id": str(row["id"]),
|
if added:
|
||||||
"ticker": row["ticker"] or "",
|
doc_ids.append(row["id"])
|
||||||
}))
|
enqueued += 1
|
||||||
doc_ids.append(row["id"])
|
|
||||||
enqueued += 1
|
|
||||||
|
|
||||||
# Touch updated_at so these docs won't be re-enqueued until the threshold passes again
|
# Touch updated_at so these docs won't be re-enqueued until the threshold passes again
|
||||||
if doc_ids:
|
if doc_ids:
|
||||||
@@ -627,12 +655,10 @@ async def retry_failed_extractions(pool: asyncpg.Pool, rds: aioredis.Redis) -> i
|
|||||||
else:
|
else:
|
||||||
target = queue_key(QUEUE_EXTRACTION)
|
target = queue_key(QUEUE_EXTRACTION)
|
||||||
|
|
||||||
await rds.rpush(target, json.dumps({
|
added = await _enqueue_if_new(rds, target, str(row["id"]), row["ticker"] or "")
|
||||||
"document_id": str(row["id"]),
|
if added:
|
||||||
"ticker": row["ticker"] or "",
|
doc_ids.append(row["id"])
|
||||||
}))
|
enqueued += 1
|
||||||
doc_ids.append(row["id"])
|
|
||||||
enqueued += 1
|
|
||||||
|
|
||||||
if doc_ids:
|
if doc_ids:
|
||||||
# Delete failed intelligence rows so extractor starts fresh
|
# Delete failed intelligence rows so extractor starts fresh
|
||||||
|
|||||||
Reference in New Issue
Block a user