140 lines
4.7 KiB
Python
140 lines
4.7 KiB
Python
"""Tests for metadata persistence helpers.

Validates the helper functions in services.shared.metadata that don't
require a live database connection: type resolution, publisher extraction,
date parsing, market snapshot type inference, and retry/failure tracking
computations.

Requirements: 3.3, 3.4, 9.2
"""
|
|
from datetime import datetime, timezone
|
|
|
|
from services.shared.metadata import (
|
|
RETRY_BACKOFF_BASE,
|
|
RETRY_BACKOFF_MAX,
|
|
RETRY_MAX_COUNT,
|
|
_extract_publisher,
|
|
_infer_market_snapshot_type,
|
|
_parse_published_at,
|
|
_resolve_document_type,
|
|
compute_next_retry_at,
|
|
)
|
|
|
|
|
|
class TestResolveDocumentType:
    """Checks the source-type to document-type mapping of ``_resolve_document_type``."""

    def test_news_api(self):
        """The ``news_api`` source type resolves to ``article``."""
        resolved = _resolve_document_type("news_api")
        assert resolved == "article"

    def test_filings_api(self):
        """The ``filings_api`` source type resolves to ``filing``."""
        resolved = _resolve_document_type("filings_api")
        assert resolved == "filing"

    def test_web_scrape(self):
        """The ``web_scrape`` source type resolves to ``press_release``."""
        resolved = _resolve_document_type("web_scrape")
        assert resolved == "press_release"

    def test_unknown_defaults_to_article(self):
        """An unrecognised source type resolves to ``article``."""
        resolved = _resolve_document_type("something_else")
        assert resolved == "article"
|
|
|
|
|
|
class TestExtractPublisher:
    """Checks which field of an item ``_extract_publisher`` reads the publisher from."""

    def test_direct_publisher_field(self):
        """A top-level ``publisher`` value is returned as-is."""
        payload = {"publisher": "Reuters"}
        assert _extract_publisher(payload) == "Reuters"

    def test_source_dict_with_name(self):
        """When ``source`` is a dict, its ``name`` entry is returned."""
        payload = {"source": {"name": "Bloomberg"}}
        assert _extract_publisher(payload) == "Bloomberg"

    def test_source_string(self):
        """When ``source`` is a plain string, that string is returned."""
        payload = {"source": "AP News"}
        assert _extract_publisher(payload) == "AP News"

    def test_empty_item(self):
        """An item with no publisher information yields an empty string."""
        payload = {}
        assert _extract_publisher(payload) == ""

    def test_publisher_takes_precedence(self):
        """``publisher`` wins when both ``publisher`` and ``source`` are present."""
        payload = {"publisher": "Reuters", "source": {"name": "Bloomberg"}}
        publisher = _extract_publisher(payload)
        assert publisher == "Reuters"
|
|
|
|
|
|
class TestParsePublishedAt:
    """Checks how ``_parse_published_at`` handles the publication timestamp of an item."""

    def test_iso_format_with_z(self):
        """An ISO-8601 string with a ``Z`` suffix under ``publishedAt`` is parsed."""
        parsed = _parse_published_at({"publishedAt": "2026-04-10T12:00:00Z"})
        assert parsed is not None
        assert (parsed.year, parsed.month) == (2026, 4)

    def test_iso_format_with_offset(self):
        """An ISO-8601 string with an explicit offset under ``published_at`` is parsed."""
        payload = {"published_at": "2026-04-10T12:00:00+00:00"}
        parsed = _parse_published_at(payload)
        assert parsed is not None

    def test_none_when_missing(self):
        """An item without any timestamp key yields ``None``."""
        parsed = _parse_published_at({})
        assert parsed is None

    def test_datetime_passthrough(self):
        """A ``datetime`` value is returned as the very same object."""
        original = datetime(2026, 1, 1, tzinfo=timezone.utc)
        parsed = _parse_published_at({"publishedAt": original})
        assert parsed is original

    def test_invalid_string_returns_none(self):
        """A string that is not a date yields ``None``."""
        parsed = _parse_published_at({"publishedAt": "not-a-date"})
        assert parsed is None
|
|
|
|
|
|
class TestInferMarketSnapshotType:
    """Checks the snapshot type ``_infer_market_snapshot_type`` derives from an item's keys."""

    def test_bar_from_ohlc(self):
        """An item carrying ``o``/``h``/``l``/``c``/``v`` keys is classified as ``bar``."""
        payload = {"o": 100, "h": 105, "l": 99, "c": 103, "v": 1000}
        inferred = _infer_market_snapshot_type(payload)
        assert inferred == "bar"

    def test_ticker_details_from_market_cap(self):
        """An item with ``market_cap`` is classified as ``ticker_details``."""
        payload = {"market_cap": 2_000_000_000, "name": "Apple"}
        inferred = _infer_market_snapshot_type(payload)
        assert inferred == "ticker_details"

    def test_ticker_details_from_sic_code(self):
        """An item with ``sic_code`` is classified as ``ticker_details``."""
        payload = {"sic_code": "3674", "name": "NVIDIA"}
        inferred = _infer_market_snapshot_type(payload)
        assert inferred == "ticker_details"

    def test_quote_from_bid_ask(self):
        """An item with ``bid`` and ``ask`` is classified as ``quote``."""
        payload = {"bid": 100.5, "ask": 101.0}
        inferred = _infer_market_snapshot_type(payload)
        assert inferred == "quote"

    def test_generic_snapshot_fallback(self):
        """An item with none of the recognised keys is classified as ``snapshot``."""
        payload = {"some_field": "value"}
        inferred = _infer_market_snapshot_type(payload)
        assert inferred == "snapshot"
|
|
|
|
|
|
class TestComputeNextRetryAt:
    """Exercises the backoff schedule produced by ``compute_next_retry_at``."""

    def test_first_retry_uses_base_delay(self):
        """With a retry count of 0 the delay equals ``RETRY_BACKOFF_BASE`` seconds."""
        now = datetime(2026, 4, 11, 12, 0, 0, tzinfo=timezone.utc)
        result = compute_next_retry_at(0, now=now)
        delta = (result - now).total_seconds()
        assert delta == RETRY_BACKOFF_BASE

    def test_exponential_growth(self):
        """The delay doubles with each step across retry counts 0, 1 and 2."""
        now = datetime(2026, 4, 11, 12, 0, 0, tzinfo=timezone.utc)
        d0 = (compute_next_retry_at(0, now=now) - now).total_seconds()
        d1 = (compute_next_retry_at(1, now=now) - now).total_seconds()
        d2 = (compute_next_retry_at(2, now=now) - now).total_seconds()
        assert d1 == d0 * 2
        assert d2 == d1 * 2

    def test_capped_at_max(self):
        """A retry count of 20 yields a delay of exactly ``RETRY_BACKOFF_MAX`` seconds."""
        now = datetime(2026, 4, 11, 12, 0, 0, tzinfo=timezone.utc)
        result = compute_next_retry_at(20, now=now)
        delta = (result - now).total_seconds()
        assert delta == RETRY_BACKOFF_MAX

    def test_defaults_to_utc_now(self):
        """Without an explicit ``now`` the delay is measured from the current UTC time."""
        before = datetime.now(timezone.utc)
        result = compute_next_retry_at(0)
        after = datetime.now(timezone.utc)
        # The implicit "now" lies between ``before`` and ``after``, so the
        # scheduled time must sit roughly one base delay after both of them.
        # A bare ``before <= result`` would also pass if no delay were applied
        # at all, so the lower bound is checked against the base delay, with
        # the same one-second slack the upper bound allows.
        assert (result - before).total_seconds() >= RETRY_BACKOFF_BASE - 1
        assert (result - after).total_seconds() <= RETRY_BACKOFF_BASE + 1
|
|
|
|
|
|
class TestRetryConstants:
    """Pins the values of the retry tuning constants."""

    def test_max_count_is_reasonable(self):
        """``RETRY_MAX_COUNT`` is 10."""
        expected_max_count = 10
        assert RETRY_MAX_COUNT == expected_max_count

    def test_backoff_base_is_one_minute(self):
        """``RETRY_BACKOFF_BASE`` is 60 (one minute in seconds)."""
        one_minute = 60
        assert RETRY_BACKOFF_BASE == one_minute

    def test_backoff_max_is_one_hour(self):
        """``RETRY_BACKOFF_MAX`` is 3600 (one hour in seconds)."""
        one_hour = 60 * 60
        assert RETRY_BACKOFF_MAX == one_hour
|