Files
stonks-oracle/tests/test_resilient_adapter.py
T
Celes Renata c85c0068a2 fix: clean up utcnow deprecation warnings, fix 12 failing tests, add CI/CD pipeline manifests
- Replace all datetime.utcnow() with datetime.now(tz=timezone.utc) across 8 files
- Fix 12 failing tests to match current implementation behavior
- Fix pytest_plugins in non-top-level conftest (moved to root conftest.py)
- Auto-fix 189 lint issues (import sorting, unused imports)
- Add CI/CD pipeline infrastructure (ARC, ArgoCD, Kargo manifests)
- Add values-beta.yaml and values-paper.yaml for staged deployments
- Update GitHub Actions workflow to use self-hosted-gremlin runners
- Add integration-test job to CI pipeline

Result: 1596 passed, 0 failed, 0 warnings
2026-04-18 03:59:28 +00:00

214 lines
7.6 KiB
Python

"""Tests for the resilient adapter wrapper.
Validates retry logic, backoff computation, rate-limit coordination,
and retryable error classification.
"""
from datetime import datetime, timezone
from typing import Any
import pytest
from services.adapters.base import AdapterResult, BaseAdapter
from services.adapters.resilient import (
ResilientAdapter,
RetryConfig,
compute_delay,
)
# --- Helpers ---
def _make_result(
    ok: bool = True,
    error: str | None = None,
    http_status: int | None = None,
    metadata: dict[str, Any] | None = None,
) -> AdapterResult:
    """Build an ``AdapterResult`` fixture for ticker "AAPL" from "market_api".

    Args:
        ok: Selects the payload fields. When true, ``items``, ``raw_payload``
            and ``content_hash`` carry sample data; otherwise they are empty.
            The flag itself is not passed to ``AdapterResult``.
        error: Error text forwarded unchanged to the result.
        http_status: HTTP status code forwarded unchanged to the result.
        metadata: Metadata mapping; a falsy value is replaced by a new dict.

    Returns:
        A result stamped with the current UTC time.
    """
    if ok:
        items: list[dict[str, Any]] = [{"price": 150}]
        raw_payload = b'{"ok":true}'
        content_hash = "abc"
    else:
        items = []
        raw_payload = b""
        content_hash = ""

    return AdapterResult(
        source_type="market_api",
        ticker="AAPL",
        items=items,
        raw_payload=raw_payload,
        content_hash=content_hash,
        fetched_at=datetime.now(timezone.utc),
        error=error,
        http_status=http_status,
        metadata=metadata if metadata else {},
    )
class FakeAdapter(BaseAdapter):
    """Test double that replays a scripted list of results, one per fetch.

    After the script runs out, every further ``fetch`` call returns the
    final entry again.
    """

    def __init__(self, results: list[AdapterResult]) -> None:
        # Keep a private copy so the caller's list is not shared.
        self._results = list(results)
        self._call_count = 0

    @property
    def call_count(self) -> int:
        """Number of ``fetch`` calls made so far."""
        return self._call_count

    async def fetch(self, ticker: str, config: dict[str, Any]) -> AdapterResult:
        """Return the next scripted result; ``ticker`` and ``config`` are unused."""
        position = self._call_count
        self._call_count = position + 1
        # Clamp to the last scripted entry once the script is exhausted.
        final = len(self._results) - 1
        return self._results[position if position < final else final]

    def source_type(self) -> str:
        """Report the fixed source type "market_api"."""
        return "market_api"
# --- Tests ---
class TestComputeDelay:
    """Checks ``compute_delay`` across attempt numbers and retry configs."""

    def test_first_attempt_is_base_delay_plus_jitter(self):
        """Attempt 0 with jitter disabled is approximately the base delay."""
        no_jitter = RetryConfig(base_delay=1.0, max_delay=60.0, jitter_factor=0.0)
        assert compute_delay(0, no_jitter) == pytest.approx(1.0, abs=0.01)

    def test_exponential_growth(self):
        """Attempts 1 and 2 are about 2.0 and 4.0, and delays strictly grow."""
        no_jitter = RetryConfig(base_delay=1.0, max_delay=60.0, jitter_factor=0.0)
        first, second, third = [compute_delay(n, no_jitter) for n in range(3)]
        assert second == pytest.approx(2.0, abs=0.01)
        assert third == pytest.approx(4.0, abs=0.01)
        assert third > second > first

    def test_capped_at_max_delay(self):
        """A late attempt never exceeds the configured ``max_delay``."""
        capped = RetryConfig(base_delay=1.0, max_delay=10.0, jitter_factor=0.0)
        assert compute_delay(10, capped) <= 10.0

    def test_jitter_adds_randomness(self):
        """Repeated calls with jitter enabled yield more than one value."""
        jittered = RetryConfig(base_delay=1.0, max_delay=60.0, jitter_factor=1.0)
        samples = set()
        for _ in range(20):
            samples.add(compute_delay(0, jittered))
        # With jitter_factor=1.0, we should see some variation
        assert len(samples) > 1
class TestRetryableClassification:
    """Checks which results ``ResilientAdapter._is_retryable`` accepts."""

    def setup_method(self) -> None:
        """Create a fresh wrapper around a single-result fake before each test."""
        self.resilient = ResilientAdapter(FakeAdapter([_make_result()]))

    def _retryable(self, **fields: Any) -> bool:
        """Build a result from *fields* and return the wrapper's verdict on it."""
        return self.resilient._is_retryable(_make_result(**fields))

    def test_ok_result_not_retryable(self):
        assert self._retryable(ok=True) is False

    def test_429_is_retryable(self):
        verdict = self._retryable(ok=False, error="rate limited", http_status=429)
        assert verdict is True

    def test_500_is_retryable(self):
        verdict = self._retryable(ok=False, error="server error", http_status=500)
        assert verdict is True

    def test_503_is_retryable(self):
        verdict = self._retryable(ok=False, error="unavailable", http_status=503)
        assert verdict is True

    def test_400_not_retryable(self):
        verdict = self._retryable(ok=False, error="bad request", http_status=400)
        assert verdict is False

    def test_401_not_retryable(self):
        verdict = self._retryable(ok=False, error="unauthorized", http_status=401)
        assert verdict is False

    def test_timeout_error_retryable(self):
        # No HTTP status here: only the error text is supplied.
        assert self._retryable(ok=False, error="timeout: read timed out") is True

    def test_connection_error_retryable(self):
        assert self._retryable(ok=False, error="Connection refused") is True

    def test_generic_error_not_retryable(self):
        assert self._retryable(ok=False, error="invalid JSON response") is False
@pytest.mark.asyncio
class TestResilientFetch:
    """Checks ``ResilientAdapter.fetch`` and its config against scripted fakes."""

    @staticmethod
    def _wrap(inner: FakeAdapter, max_retries: int) -> ResilientAdapter:
        """Wrap *inner* with the given retry budget and a 0.01 base delay."""
        return ResilientAdapter(
            inner, retry_config=RetryConfig(max_retries=max_retries, base_delay=0.01)
        )

    async def test_success_on_first_try(self):
        """A successful first call is returned after exactly one attempt."""
        inner = FakeAdapter([_make_result(ok=True)])
        outcome = await self._wrap(inner, max_retries=2).fetch("AAPL", {})

        assert outcome.ok
        assert inner.call_count == 1
        assert outcome.metadata["retry_stats"]["attempts"] == 1

    async def test_retries_on_retryable_then_succeeds(self):
        """Two 500 responses followed by success take three attempts."""
        server_error = {"ok": False, "error": "server error", "http_status": 500}
        script = [
            _make_result(**server_error),
            _make_result(**server_error),
            _make_result(ok=True),
        ]
        inner = FakeAdapter(script)
        outcome = await self._wrap(inner, max_retries=3).fetch("AAPL", {})

        assert outcome.ok
        assert inner.call_count == 3
        assert outcome.metadata["retry_stats"]["attempts"] == 3

    async def test_exhausts_retries(self):
        """Persistent 500s stop after the retry budget and are flagged exhausted."""
        failure = _make_result(ok=False, error="server error", http_status=500)
        inner = FakeAdapter([failure] * 4)
        outcome = await self._wrap(inner, max_retries=2).fetch("AAPL", {})

        assert not outcome.ok
        assert inner.call_count == 3  # initial + 2 retries
        assert outcome.metadata["retry_stats"]["exhausted"] is True

    async def test_no_retry_on_non_retryable(self):
        """A 400 response is returned after a single call."""
        failure = _make_result(ok=False, error="bad request", http_status=400)
        inner = FakeAdapter([failure])
        outcome = await self._wrap(inner, max_retries=3).fetch("AAPL", {})

        assert not outcome.ok
        assert inner.call_count == 1

    async def test_retry_after_respected_for_429(self):
        """A 429 carrying ``retry_after`` delays the retry by about that long."""
        rate_limited = _make_result(
            ok=False,
            error="rate limited",
            http_status=429,
            metadata={"retry_after": 0.05},
        )
        inner = FakeAdapter([rate_limited, _make_result(ok=True)])
        outcome = await self._wrap(inner, max_retries=2).fetch("AAPL", {})

        assert outcome.ok
        assert inner.call_count == 2
        # Should have waited at least the retry_after amount
        assert outcome.metadata["retry_stats"]["total_delay"] >= 0.04

    async def test_source_type_passthrough(self):
        """The wrapper reports the wrapped adapter's source type."""
        wrapper = ResilientAdapter(FakeAdapter([_make_result()]))
        assert wrapper.source_type() == "market_api"

    async def test_default_config_for_known_source_type(self):
        """Without an explicit config, the wrapper uses the market_api default."""
        wrapper = ResilientAdapter(FakeAdapter([_make_result()]))
        # market_api default is 30 rate limit max
        assert wrapper.config.rate_limit_max == 30

    async def test_custom_config_overrides_default(self):
        """An explicit ``RetryConfig`` is exposed unchanged via ``config``."""
        inner = FakeAdapter([_make_result()])
        override = RetryConfig(max_retries=5, rate_limit_max=100)
        wrapper = ResilientAdapter(inner, retry_config=override)

        assert wrapper.config.max_retries == 5
        assert wrapper.config.rate_limit_max == 100