"""Tests for dead-letter queue support and replay tooling.""" from __future__ import annotations import json import pytest from services.shared.dead_letter import ( DEFAULT_MAX_ATTEMPTS, dlq_length, dlq_summary, peek_dlq, purge_dlq, replay_all, replay_one, send_to_dlq, wrap_dlq_entry, ) from services.shared.redis_keys import dlq_key, queue_key # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- class FakeRedis: """Minimal async Redis fake backed by plain dicts.""" def __init__(self): self._data: dict[str, list[str]] = {} async def rpush(self, key: str, value: str) -> int: self._data.setdefault(key, []).append(value) return len(self._data[key]) async def lpop(self, key: str) -> str | None: lst = self._data.get(key, []) if not lst: return None return lst.pop(0) async def llen(self, key: str) -> int: return len(self._data.get(key, [])) async def lrange(self, key: str, start: int, end: int) -> list[str]: lst = self._data.get(key, []) return lst[start:end + 1] async def delete(self, key: str) -> int: if key in self._data: del self._data[key] return 1 return 0 @pytest.fixture def rds(): return FakeRedis() SAMPLE_JOB = {"ticker": "AAPL", "source_type": "news_api", "source_id": "src-1"} # --------------------------------------------------------------------------- # wrap_dlq_entry # --------------------------------------------------------------------------- def test_wrap_dlq_entry_structure(): entry = wrap_dlq_entry(SAMPLE_JOB, "ingestion", "timeout", attempt=2, worker="ingestion_worker") assert entry["original_payload"] == SAMPLE_JOB assert entry["queue"] == "ingestion" assert entry["error"] == "timeout" assert entry["attempt"] == 2 assert entry["worker"] == "ingestion_worker" assert "dead_lettered_at" in entry # --------------------------------------------------------------------------- # send_to_dlq / dlq_length # 
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_send_to_dlq_and_length(rds):
    """One dead-lettered job is counted and stored with its failure details."""
    queue = "parsing"
    await send_to_dlq(rds, queue, SAMPLE_JOB, error="parse failure", attempt=3)
    assert await dlq_length(rds, queue) == 1

    # Read the fake's backing list directly to check the serialized entry.
    stored = json.loads(rds._data[dlq_key(queue)][0])
    assert stored["original_payload"] == SAMPLE_JOB
    assert stored["error"] == "parse failure"
    assert stored["attempt"] == 3


# ---------------------------------------------------------------------------
# peek_dlq
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_peek_dlq(rds):
    """Peeking returns the requested window and leaves the DLQ untouched."""
    queue = "extraction"
    for doc_id in range(5):
        await send_to_dlq(rds, queue, {"doc": doc_id}, error=f"err-{doc_id}")

    window = await peek_dlq(rds, queue, start=0, count=3)
    assert len(window) == 3
    assert window[0]["original_payload"]["doc"] == 0
    assert window[2]["original_payload"]["doc"] == 2

    # Peeking must not consume anything: all 5 entries are still queued.
    remaining = await dlq_length(rds, queue)
    assert remaining == 5


# ---------------------------------------------------------------------------
# replay_one
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_replay_one(rds):
    """Replaying one entry moves the oldest payload back to its source queue."""
    queue = "ingestion"
    await send_to_dlq(rds, queue, SAMPLE_JOB, error="timeout")
    await send_to_dlq(rds, queue, {"ticker": "MSFT"}, error="timeout")

    replayed = await replay_one(rds, queue)
    assert replayed is not None
    assert replayed["original_payload"] == SAMPLE_JOB

    # The bare payload (not the DLQ wrapper) should be back on the source queue.
    requeued = await rds.lpop(queue_key(queue))
    assert requeued is not None
    assert json.loads(requeued) == SAMPLE_JOB

    # Only the second entry is left in the DLQ.
    assert await dlq_length(rds, queue) == 1


@pytest.mark.asyncio
async def test_replay_one_empty(rds):
    """Replaying from an empty DLQ yields ``None``."""
    replayed = await replay_one(rds, "ingestion")
    assert replayed is None
# ---------------------------------------------------------------------------
# replay_all
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_replay_all(rds):
    """Replaying everything drains the DLQ into the source queue."""
    queue = "aggregation"
    for idx in range(4):
        await send_to_dlq(rds, queue, {"idx": idx}, error="fail")

    replayed = await replay_all(rds, queue)
    assert replayed == 4

    # Nothing is left behind in the DLQ ...
    assert await dlq_length(rds, queue) == 0

    # ... and every entry landed on the source queue.
    requeued = await rds.llen(queue_key(queue))
    assert requeued == 4


@pytest.mark.asyncio
async def test_replay_all_empty(rds):
    """Replaying an empty DLQ reports zero replayed entries."""
    replayed = await replay_all(rds, "aggregation")
    assert replayed == 0


# ---------------------------------------------------------------------------
# purge_dlq
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_purge_dlq(rds):
    """Purging reports how many entries were dropped and empties the DLQ."""
    queue = "parsing"
    for idx in range(3):
        await send_to_dlq(rds, queue, {"idx": idx}, error="fail")

    dropped = await purge_dlq(rds, queue)
    assert dropped == 3
    assert await dlq_length(rds, queue) == 0


@pytest.mark.asyncio
async def test_purge_dlq_empty(rds):
    """Purging an empty DLQ reports zero removed entries."""
    dropped = await purge_dlq(rds, "parsing")
    assert dropped == 0


# ---------------------------------------------------------------------------
# dlq_summary
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_dlq_summary(rds):
    """The summary maps each requested queue to its DLQ depth."""
    seeded = [
        ("ingestion", {"a": 1}),
        ("ingestion", {"b": 2}),
        ("parsing", {"c": 3}),
    ]
    for queue, payload in seeded:
        await send_to_dlq(rds, queue, payload, error="e")

    # "extraction" was never written to, so it is expected to report 0.
    depths = await dlq_summary(rds, ["ingestion", "parsing", "extraction"])
    assert depths == {"ingestion": 2, "parsing": 1, "extraction": 0}


# ---------------------------------------------------------------------------
# DEFAULT_MAX_ATTEMPTS constant
# ---------------------------------------------------------------------------


def test_default_max_attempts():
    """Pin the default retry budget so a change to it is deliberate."""
    expected_attempts = 3
    assert DEFAULT_MAX_ATTEMPTS == expected_attempts