# File listing metadata: 209 lines, 6.2 KiB, Python
"""Tests for dead-letter queue support and replay tooling."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
import pytest
|
|
|
|
from services.shared.dead_letter import (
|
|
DEFAULT_MAX_ATTEMPTS,
|
|
dlq_length,
|
|
dlq_summary,
|
|
peek_dlq,
|
|
purge_dlq,
|
|
replay_all,
|
|
replay_one,
|
|
send_to_dlq,
|
|
wrap_dlq_entry,
|
|
)
|
|
from services.shared.redis_keys import dlq_key, queue_key
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class FakeRedis:
|
|
"""Minimal async Redis fake backed by plain dicts."""
|
|
|
|
def __init__(self):
|
|
self._data: dict[str, list[str]] = {}
|
|
|
|
async def rpush(self, key: str, value: str) -> int:
|
|
self._data.setdefault(key, []).append(value)
|
|
return len(self._data[key])
|
|
|
|
async def lpop(self, key: str) -> str | None:
|
|
lst = self._data.get(key, [])
|
|
if not lst:
|
|
return None
|
|
return lst.pop(0)
|
|
|
|
async def llen(self, key: str) -> int:
|
|
return len(self._data.get(key, []))
|
|
|
|
async def lrange(self, key: str, start: int, end: int) -> list[str]:
|
|
lst = self._data.get(key, [])
|
|
return lst[start:end + 1]
|
|
|
|
async def delete(self, key: str) -> int:
|
|
if key in self._data:
|
|
del self._data[key]
|
|
return 1
|
|
return 0
|
|
|
|
|
|
@pytest.fixture
def rds():
    """Provide a fresh, empty FakeRedis instance to each test."""
    fake_client = FakeRedis()
    return fake_client
|
|
|
|
|
|
# Job payload shared by the wrap, send, and replay tests in this module.
SAMPLE_JOB = {"ticker": "AAPL", "source_type": "news_api", "source_id": "src-1"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# wrap_dlq_entry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_wrap_dlq_entry_structure():
    """wrap_dlq_entry echoes its arguments and adds a dead-letter timestamp key."""
    wrapped = wrap_dlq_entry(
        SAMPLE_JOB,
        "ingestion",
        "timeout",
        attempt=2,
        worker="ingestion_worker",
    )

    expected_fields = {
        "original_payload": SAMPLE_JOB,
        "queue": "ingestion",
        "error": "timeout",
        "attempt": 2,
        "worker": "ingestion_worker",
    }
    for field, expected in expected_fields.items():
        assert wrapped[field] == expected

    assert "dead_lettered_at" in wrapped
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# send_to_dlq / dlq_length
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_send_to_dlq_and_length(rds):
    """A single send_to_dlq call stores one decodable entry under the DLQ key."""
    await send_to_dlq(rds, "parsing", SAMPLE_JOB, error="parse failure", attempt=3)

    assert await dlq_length(rds, "parsing") == 1

    # Inspect the fake's backing store directly to check what was written.
    stored = json.loads(rds._data[dlq_key("parsing")][0])
    assert stored["original_payload"] == SAMPLE_JOB
    assert stored["error"] == "parse failure"
    assert stored["attempt"] == 3
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# peek_dlq
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_peek_dlq(rds):
    """peek_dlq returns the requested window in order without consuming entries."""
    total = 5
    for doc_id in range(total):
        await send_to_dlq(rds, "extraction", {"doc": doc_id}, error=f"err-{doc_id}")

    window = await peek_dlq(rds, "extraction", start=0, count=3)

    assert len(window) == 3
    first, last = window[0], window[2]
    assert first["original_payload"]["doc"] == 0
    assert last["original_payload"]["doc"] == 2

    # Peeking is non-destructive: every entry must still be queued.
    remaining = await dlq_length(rds, "extraction")
    assert remaining == total
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# replay_one
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_replay_one(rds):
    """replay_one moves the oldest DLQ entry's payload back onto its source queue."""
    for payload in (SAMPLE_JOB, {"ticker": "MSFT"}):
        await send_to_dlq(rds, "ingestion", payload, error="timeout")

    replayed = await replay_one(rds, "ingestion")
    assert replayed is not None
    assert replayed["original_payload"] == SAMPLE_JOB

    # The unwrapped payload should have been pushed to the source queue.
    requeued = await rds.lpop(queue_key("ingestion"))
    assert requeued is not None
    assert json.loads(requeued) == SAMPLE_JOB

    # Only the second entry is left in the DLQ.
    left_over = await dlq_length(rds, "ingestion")
    assert left_over == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_replay_one_empty(rds):
    """replay_one yields None when the DLQ holds nothing."""
    assert await replay_one(rds, "ingestion") is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# replay_all
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_replay_all(rds):
    """replay_all drains the DLQ and requeues every entry on the source queue."""
    queued = 4
    for idx in range(queued):
        await send_to_dlq(rds, "aggregation", {"idx": idx}, error="fail")

    replayed = await replay_all(rds, "aggregation")
    assert replayed == queued

    # Nothing may remain dead-lettered afterwards.
    assert await dlq_length(rds, "aggregation") == 0

    # Every replayed entry must now sit on the source queue.
    requeued = await rds.llen(queue_key("aggregation"))
    assert requeued == queued
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_replay_all_empty(rds):
    """replay_all reports zero replays for an empty DLQ."""
    assert await replay_all(rds, "aggregation") == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# purge_dlq
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_purge_dlq(rds):
    """purge_dlq reports how many entries it dropped and leaves the DLQ empty."""
    queued = 3
    for idx in range(queued):
        await send_to_dlq(rds, "parsing", {"idx": idx}, error="fail")

    purged = await purge_dlq(rds, "parsing")
    assert purged == queued

    remaining = await dlq_length(rds, "parsing")
    assert remaining == 0
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_purge_dlq_empty(rds):
    """purge_dlq reports zero removals when there is nothing to purge."""
    assert await purge_dlq(rds, "parsing") == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# dlq_summary
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.mark.asyncio
async def test_dlq_summary(rds):
    """dlq_summary maps each requested queue to its DLQ depth, including empty ones."""
    seeded = (
        ("ingestion", {"a": 1}),
        ("ingestion", {"b": 2}),
        ("parsing", {"c": 3}),
    )
    for queue_name, payload in seeded:
        await send_to_dlq(rds, queue_name, payload, error="e")

    depths = await dlq_summary(rds, ["ingestion", "parsing", "extraction"])

    assert depths == {"ingestion": 2, "parsing": 1, "extraction": 0}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DEFAULT_MAX_ATTEMPTS constant
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_default_max_attempts():
    """Pin the default retry budget so an accidental change is noticed."""
    expected_attempts = 3
    assert DEFAULT_MAX_ATTEMPTS == expected_attempts
|