# (File-viewer header, not part of the module: "Files", "135 lines", "4.1 KiB", "Python".)

"""Dead-letter queue (DLQ) support and replay tooling.

When a worker fails to process a job after exhausting retries, the job
is pushed to a per-queue dead-letter list in Redis. Each DLQ entry
wraps the original payload with failure metadata (error message,
timestamp, attempt count) so operators can inspect and replay later.

Replay moves items from the DLQ back to the source queue for
reprocessing.

Requirements: 12.1 (observability), design section 8 (data flows)
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from typing import Any
import redis.asyncio as aioredis
from services.shared.redis_keys import dlq_key, queue_key
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Default max attempts before a job is dead-lettered.
# NOTE(review): not referenced in the visible part of this module —
# presumably imported by worker code; confirm before changing the value.
DEFAULT_MAX_ATTEMPTS = 3
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def wrap_dlq_entry(
    original_payload: dict[str, Any],
    queue_name: str,
    error: str,
    attempt: int = 1,
    worker: str = "",
) -> dict[str, Any]:
    """Build the DLQ envelope stored for a failed job.

    Args:
        original_payload: The job payload exactly as it was enqueued.
        queue_name: Name of the queue the job came from.
        error: Description of the failure.
        attempt: Attempt count recorded in the envelope.
        worker: Identifier of the worker that gave up on the job.

    Returns:
        A new dict holding the payload (by reference, not copied) together
        with the failure metadata and a ``dead_lettered_at`` timestamp
        produced by ``_now_iso()``.
    """
    envelope: dict[str, Any] = {}
    envelope["original_payload"] = original_payload
    envelope["queue"] = queue_name
    envelope["error"] = error
    envelope["attempt"] = attempt
    envelope["worker"] = worker
    # Stamped last, mirroring the field order of the stored entry.
    envelope["dead_lettered_at"] = _now_iso()
    return envelope
async def send_to_dlq(
    rds: aioredis.Redis,
    queue_name: str,
    original_payload: dict[str, Any],
    error: str,
    attempt: int = 1,
    worker: str = "",
) -> None:
    """Append a failed job to the dead-letter list of *queue_name*.

    The payload is wrapped via ``wrap_dlq_entry``, serialized to JSON
    (values JSON cannot encode natively are stringified with ``str``) and
    pushed onto the tail of the list named by ``dlq_key(queue_name)``.
    A warning is logged once the push has completed.

    Args:
        rds: Async Redis client used for the push.
        queue_name: Queue the job originally came from.
        original_payload: The job payload that could not be processed.
        error: Description of the failure.
        attempt: Attempt count recorded with the entry.
        worker: Identifier of the worker reporting the failure.
    """
    envelope = wrap_dlq_entry(original_payload, queue_name, error, attempt, worker)
    target = dlq_key(queue_name)
    serialized = json.dumps(envelope, default=str)
    await rds.rpush(target, serialized)

    log_context = {"queue": queue_name, "attempt": attempt}
    logger.warning(
        "Dead-lettered job on %s after %d attempts: %s",
        queue_name,
        attempt,
        error,
        extra=log_context,
    )
async def dlq_length(rds: aioredis.Redis, queue_name: str) -> int:
    """Report how many entries the DLQ of *queue_name* currently holds.

    Args:
        rds: Async Redis client.
        queue_name: Queue whose dead-letter list is measured.

    Returns:
        The length Redis reports (``LLEN``) for ``dlq_key(queue_name)``.
    """
    key = dlq_key(queue_name)
    depth = await rds.llen(key)
    return depth
async def peek_dlq(
    rds: aioredis.Redis,
    queue_name: str,
    start: int = 0,
    count: int = 10,
) -> list[dict[str, Any]]:
    """Return DLQ entries without removing them (for inspection).

    Args:
        rds: Async Redis client.
        queue_name: Queue whose dead-letter list is inspected.
        start: Index of the first entry to return. Index 0 is the head of
            the list; negative values count from the tail, as in Redis
            ``LRANGE``.
        count: Maximum number of entries to return. Zero or a negative
            value yields an empty list.

    Returns:
        The decoded entries, in list order.

    Raises:
        json.JSONDecodeError: If a stored entry is not valid JSON.
    """
    # Without this guard ``start + count - 1`` becomes ``start - 1``; for
    # start=0 that is -1, which LRANGE reads as "through the last element"
    # and would return the entire list instead of nothing.
    if count <= 0:
        return []

    stop = start + count - 1
    # A negative start is relative to the tail. If the window runs past the
    # tail, the computed stop turns non-negative (head-relative) and the
    # mixed range can come back empty; clamp to the last element instead.
    if start < 0 <= stop:
        stop = -1

    raw_items = await rds.lrange(dlq_key(queue_name), start, stop)
    return [json.loads(item) for item in raw_items]
async def replay_one(rds: aioredis.Redis, queue_name: str) -> dict[str, Any] | None:
    """Pop the oldest DLQ entry and re-enqueue its original payload.

    The entry is taken from the head of the dead-letter list and its
    ``original_payload`` is pushed onto the tail of the source queue. An
    entry without an ``original_payload`` key is re-enqueued as-is.

    If decoding or re-enqueueing fails, the raw entry is pushed back onto
    the head of the DLQ before the exception propagates, so a failed
    replay does not silently drop the job.

    Args:
        rds: Async Redis client.
        queue_name: Queue whose DLQ is replayed from and whose source
            queue is replayed to.

    Returns:
        The replayed DLQ entry, or None if the DLQ is empty.

    Raises:
        Exception: Whatever decoding or the Redis push raised; the entry
            has been restored to the DLQ when this happens (unless the
            restore itself fails, in which case that error propagates).
    """
    source = dlq_key(queue_name)
    raw = await rds.lpop(source)
    if raw is None:
        return None

    try:
        entry = json.loads(raw)
        # Entries lacking the wrapper key are treated as the payload itself.
        original = entry.get("original_payload", entry)
        await rds.rpush(queue_key(queue_name), json.dumps(original, default=str))
    except Exception:
        # The pop already removed the entry from Redis; put it back at the
        # head (its previous position) so it can be inspected or retried.
        await rds.lpush(source, raw)
        raise

    logger.info("Replayed 1 job from DLQ back to %s", queue_name)
    return entry
async def replay_all(rds: aioredis.Redis, queue_name: str) -> int:
    """Replay the items currently in the DLQ back to the source queue.

    The DLQ depth is sampled once up front and at most that many entries
    are replayed. Entries dead-lettered while the replay is running are
    left for a later call; this keeps a job that fails again right after
    being replayed from being cycled through the DLQ indefinitely.

    If decoding or re-enqueueing an entry fails, that entry is pushed back
    onto the head of the DLQ and the exception propagates; entries already
    replayed stay on the source queue.

    Args:
        rds: Async Redis client.
        queue_name: Queue whose DLQ is drained into its source queue.

    Returns:
        The number of items replayed.

    Raises:
        Exception: Whatever decoding or the Redis push raised for the
            failing entry.
    """
    source = dlq_key(queue_name)
    target = queue_key(queue_name)

    # Snapshot the depth so the loop is bounded even if entries keep arriving.
    pending = await rds.llen(source)
    replayed = 0
    for _ in range(pending):
        raw = await rds.lpop(source)
        if raw is None:
            # Someone else drained the list in the meantime.
            break
        try:
            entry = json.loads(raw)
            # Entries lacking the wrapper key are treated as the payload itself.
            original = entry.get("original_payload", entry)
            await rds.rpush(target, json.dumps(original, default=str))
        except Exception:
            # The pop already removed the entry; restore it so it is not lost.
            await rds.lpush(source, raw)
            logger.warning(
                "Replay of DLQ for %s aborted after %d jobs; failed entry restored",
                queue_name,
                replayed,
            )
            raise
        replayed += 1

    if replayed:
        logger.info("Replayed %d jobs from DLQ back to %s", replayed, queue_name)
    return replayed
async def purge_dlq(rds: aioredis.Redis, queue_name: str) -> int:
    """Delete all items from the DLQ for *queue_name*.

    The length read and the delete are sent as a single MULTI/EXEC
    transaction. Reading the length first and deleting in a separate round
    trip would let entries pushed in between be deleted without being
    counted.

    Args:
        rds: Async Redis client.
        queue_name: Queue whose dead-letter list is removed.

    Returns:
        The number of entries that were in the list when it was deleted
        (0 if the list did not exist).
    """
    key = dlq_key(queue_name)
    async with rds.pipeline(transaction=True) as pipe:
        # Commands are only buffered here; nothing is sent until execute().
        pipe.llen(key)
        pipe.delete(key)
        length, _deleted = await pipe.execute()
    return length
async def dlq_summary(rds: aioredis.Redis, queue_names: list[str]) -> dict[str, int]:
    """Report the DLQ depth of every queue in *queue_names*.

    Lengths are fetched one queue at a time, in the order given.

    Args:
        rds: Async Redis client.
        queue_names: Queues to report on; a name listed more than once
            appears once in the result.

    Returns:
        A mapping of queue name to the length of its dead-letter list.
    """
    return {name: await rds.llen(dlq_key(name)) for name in queue_names}