Files
stonks-oracle/tests/test_dead_letter.py
T

209 lines
6.2 KiB
Python

"""Tests for dead-letter queue support and replay tooling."""
from __future__ import annotations
import json
import pytest
from services.shared.dead_letter import (
DEFAULT_MAX_ATTEMPTS,
dlq_length,
dlq_summary,
peek_dlq,
purge_dlq,
replay_all,
replay_one,
send_to_dlq,
wrap_dlq_entry,
)
from services.shared.redis_keys import dlq_key, queue_key
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
class FakeRedis:
"""Minimal async Redis fake backed by plain dicts."""
def __init__(self):
self._data: dict[str, list[str]] = {}
async def rpush(self, key: str, value: str) -> int:
self._data.setdefault(key, []).append(value)
return len(self._data[key])
async def lpop(self, key: str) -> str | None:
lst = self._data.get(key, [])
if not lst:
return None
return lst.pop(0)
async def llen(self, key: str) -> int:
return len(self._data.get(key, []))
async def lrange(self, key: str, start: int, end: int) -> list[str]:
lst = self._data.get(key, [])
return lst[start:end + 1]
async def delete(self, key: str) -> int:
if key in self._data:
del self._data[key]
return 1
return 0
@pytest.fixture
def rds():
return FakeRedis()
SAMPLE_JOB = {"ticker": "AAPL", "source_type": "news_api", "source_id": "src-1"}
# ---------------------------------------------------------------------------
# wrap_dlq_entry
# ---------------------------------------------------------------------------
def test_wrap_dlq_entry_structure():
entry = wrap_dlq_entry(SAMPLE_JOB, "ingestion", "timeout", attempt=2, worker="ingestion_worker")
assert entry["original_payload"] == SAMPLE_JOB
assert entry["queue"] == "ingestion"
assert entry["error"] == "timeout"
assert entry["attempt"] == 2
assert entry["worker"] == "ingestion_worker"
assert "dead_lettered_at" in entry
# ---------------------------------------------------------------------------
# send_to_dlq / dlq_length
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_send_to_dlq_and_length(rds):
await send_to_dlq(rds, "parsing", SAMPLE_JOB, error="parse failure", attempt=3)
length = await dlq_length(rds, "parsing")
assert length == 1
# Verify the stored entry
raw = rds._data[dlq_key("parsing")][0]
entry = json.loads(raw)
assert entry["original_payload"] == SAMPLE_JOB
assert entry["error"] == "parse failure"
assert entry["attempt"] == 3
# ---------------------------------------------------------------------------
# peek_dlq
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_peek_dlq(rds):
for i in range(5):
await send_to_dlq(rds, "extraction", {"doc": i}, error=f"err-{i}")
items = await peek_dlq(rds, "extraction", start=0, count=3)
assert len(items) == 3
assert items[0]["original_payload"]["doc"] == 0
assert items[2]["original_payload"]["doc"] == 2
# DLQ should still have all 5 items (peek doesn't remove)
assert await dlq_length(rds, "extraction") == 5
# ---------------------------------------------------------------------------
# replay_one
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_replay_one(rds):
await send_to_dlq(rds, "ingestion", SAMPLE_JOB, error="timeout")
await send_to_dlq(rds, "ingestion", {"ticker": "MSFT"}, error="timeout")
entry = await replay_one(rds, "ingestion")
assert entry is not None
assert entry["original_payload"] == SAMPLE_JOB
# Original payload should now be in the source queue
source_queue = queue_key("ingestion")
raw = await rds.lpop(source_queue)
assert raw is not None
assert json.loads(raw) == SAMPLE_JOB
# DLQ should have 1 remaining
assert await dlq_length(rds, "ingestion") == 1
@pytest.mark.asyncio
async def test_replay_one_empty(rds):
result = await replay_one(rds, "ingestion")
assert result is None
# ---------------------------------------------------------------------------
# replay_all
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_replay_all(rds):
for i in range(4):
await send_to_dlq(rds, "aggregation", {"idx": i}, error="fail")
count = await replay_all(rds, "aggregation")
assert count == 4
# DLQ should be empty
assert await dlq_length(rds, "aggregation") == 0
# Source queue should have 4 items
source_queue = queue_key("aggregation")
assert await rds.llen(source_queue) == 4
@pytest.mark.asyncio
async def test_replay_all_empty(rds):
count = await replay_all(rds, "aggregation")
assert count == 0
# ---------------------------------------------------------------------------
# purge_dlq
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_purge_dlq(rds):
for i in range(3):
await send_to_dlq(rds, "parsing", {"idx": i}, error="fail")
removed = await purge_dlq(rds, "parsing")
assert removed == 3
assert await dlq_length(rds, "parsing") == 0
@pytest.mark.asyncio
async def test_purge_dlq_empty(rds):
removed = await purge_dlq(rds, "parsing")
assert removed == 0
# ---------------------------------------------------------------------------
# dlq_summary
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_dlq_summary(rds):
await send_to_dlq(rds, "ingestion", {"a": 1}, error="e")
await send_to_dlq(rds, "ingestion", {"b": 2}, error="e")
await send_to_dlq(rds, "parsing", {"c": 3}, error="e")
summary = await dlq_summary(rds, ["ingestion", "parsing", "extraction"])
assert summary == {"ingestion": 2, "parsing": 1, "extraction": 0}
# ---------------------------------------------------------------------------
# DEFAULT_MAX_ATTEMPTS constant
# ---------------------------------------------------------------------------
def test_default_max_attempts():
assert DEFAULT_MAX_ATTEMPTS == 3