Files
stonks-oracle/tests/test_signal_engine_normalizer.py
Celes Renata f468e30af0
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: implement dual-pipeline signal engine service
New service at services/signal_engine/ implementing concurrent heuristic
(deterministic scoring) and probabilistic (Bayesian inference) pipelines
that evaluate technical signals across 6 timeframes (M30-M) and produce
independent BUY/WATCH/SKIP verdicts per ticker per evaluation tick.

Components:
- Input Normalizer: multi-source data assembly with sentinel fallbacks
- Signal Library: Fibonacci, MA Stack, RSI, Cup & Handle, Elliott Wave
- Multi-Timeframe Confluence Engine: weighted scoring with D/W/M anchors
- Hard Filter Engine: macro_bias, valuation, earnings proximity gating
- Heuristic Pipeline: S_total scoring with confidence-gated verdicts
- Probabilistic Pipeline: Bayesian log-odds with regime priors, entropy
  gating, EV_R calculation, and signal correlation penalty
- Exit Engine: stop-loss, targets, trailing ATR-based stops
- Delta Analyzer: pipeline agreement tracking with rolling Redis metrics
- Output Formatter: SignalOutput contract + Recommendation schema mapping
- Worker orchestrator: concurrent pipelines with failure isolation
- Main entry point: queue polling with fail-safe config loading

Infrastructure:
- Migration 039: signal_engine_outputs table with 3 indexes
- Helm chart: signalEngine service entry (processing tier)
- Redis key: QUEUE_SIGNAL_ENGINE constant

Tests: 390 tests (unit + property-based) covering all components
Config: dual_pipeline_enabled=false by default (safe rollout)
2026-05-02 07:32:26 +00:00

420 lines
14 KiB
Python

"""Unit tests for services.signal_engine.normalizer.
Tests the input normalizer's data assembly, sentinel handling, timestamp
validation, and derived field computation.
Requirements: 1.1, 1.2, 1.3, 1.4, 1.5
"""
from __future__ import annotations
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
from services.signal_engine.config import SignalEngineConfig
from services.signal_engine.models import OHLCVBar
from services.signal_engine.normalizer import (
TIMEFRAMES,
_aggregate_bars_by_period,
_polygon_bar_to_ohlcv,
_validate_monotonic_timestamps,
normalize_input,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_bar_row(ts_ms: int, o: float, h: float, l: float, c: float, v: float) -> MagicMock:
"""Create a mock asyncpg.Record with Polygon bar data."""
row = MagicMock()
row.__getitem__ = lambda self, key: {
"data": {"t": ts_ms, "o": o, "h": h, "l": l, "c": c, "v": v},
}[key]
return row
def _make_bar(ts_ms: int, c: float = 100.0) -> OHLCVBar:
    """Return a synthetic OHLCVBar stamped at *ts_ms* (epoch milliseconds, UTC).

    The close is *c*; open, high and low sit at fixed offsets around it
    (c - 1, c + 1, c - 2) and the volume is always 1000.0.
    """
    stamp = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return OHLCVBar(timestamp=stamp, open=c - 1, high=c + 1, low=c - 2, close=c, volume=1000.0)
# ---------------------------------------------------------------------------
# _polygon_bar_to_ohlcv
# ---------------------------------------------------------------------------
class TestPolygonBarToOhlcv:
    """Tests for _polygon_bar_to_ohlcv, which turns a Polygon bar record into an OHLCVBar."""

    def test_valid_bar(self):
        bar = _polygon_bar_to_ohlcv(_make_bar_row(1700000000000, 100.0, 105.0, 99.0, 103.0, 5000.0))
        assert bar is not None
        assert (bar.open, bar.high, bar.low, bar.close, bar.volume) == (100.0, 105.0, 99.0, 103.0, 5000.0)
        # 1_700_000_000_000 ms after the epoch falls in November 2023.
        assert bar.timestamp.year == 2023

    def test_missing_timestamp_returns_none(self):
        # The payload has every OHLCV field except the "t" timestamp.
        def lookup(_self, key):
            return {"data": {"o": 1, "h": 2, "l": 0, "c": 1, "v": 10}}[key]

        record = MagicMock()
        record.__getitem__ = lookup
        assert _polygon_bar_to_ohlcv(record) is None

    def test_non_dict_data_returns_none(self):
        # "data" holds a plain string instead of a payload mapping.
        def lookup(_self, key):
            return {"data": "not a dict"}[key]

        record = MagicMock()
        record.__getitem__ = lookup
        assert _polygon_bar_to_ohlcv(record) is None
# ---------------------------------------------------------------------------
# _validate_monotonic_timestamps
# ---------------------------------------------------------------------------
class TestValidateMonotonicTimestamps:
    """Tests for _validate_monotonic_timestamps(bars, timeframe, ticker)."""

    def test_already_monotonic(self):
        bars = [_make_bar(1000 * i) for i in [1000, 2000, 3000]]
        result = _validate_monotonic_timestamps(bars, "D", "AAPL")
        assert result is bars  # same reference — no sorting needed

    def test_non_monotonic_gets_sorted(self):
        bars = [_make_bar(1000 * i) for i in [3000, 1000, 2000]]
        result = _validate_monotonic_timestamps(bars, "D", "AAPL")
        timestamps = [b.timestamp for b in result]
        assert timestamps == sorted(timestamps)
        # Ordering alone is not enough: a function that dropped the
        # out-of-order bars would also return an ascending list. Every input
        # bar must still be present after sorting.
        assert timestamps == sorted(b.timestamp for b in bars)

    def test_single_bar(self):
        bars = [_make_bar(1000000)]
        result = _validate_monotonic_timestamps(bars, "D", "AAPL")
        assert len(result) == 1
        assert result[0].timestamp == bars[0].timestamp

    def test_empty_list(self):
        result = _validate_monotonic_timestamps([], "D", "AAPL")
        assert result == []
# ---------------------------------------------------------------------------
# _aggregate_bars_by_period
# ---------------------------------------------------------------------------
class TestAggregate:
    """Tests for _aggregate_bars_by_period (daily bars -> weekly/monthly bars).

    Beyond counting output periods, these tests check invariants that any
    OHLCV roll-up must satisfy regardless of where period boundaries fall:
    - each aggregated bar's open/close lie inside its own [low, high];
    - total volume is preserved;
    - the overall high/low across all periods equal those of the inputs.
    """

    @staticmethod
    def _daily_bars(start: datetime, count: int, *, drift: float, volume: float) -> list[OHLCVBar]:
        """Build *count* consecutive daily bars from *start*, prices rising by *drift* per day."""
        from datetime import timedelta

        return [
            OHLCVBar(
                timestamp=start + timedelta(days=i),
                open=100.0 + drift * i,
                high=110.0 + drift * i,
                low=90.0 + drift * i,
                close=105.0 + drift * i,
                volume=volume,
            )
            for i in range(count)
        ]

    @staticmethod
    def _assert_consistent_rollup(aggregated, daily) -> None:
        """Check the per-bar OHLC envelope and the global conservation invariants."""
        for bar in aggregated:
            assert bar.low <= bar.open <= bar.high
            assert bar.low <= bar.close <= bar.high
        assert sum(b.volume for b in aggregated) == sum(d.volume for d in daily)
        assert max(b.high for b in aggregated) == max(d.high for d in daily)
        assert min(b.low for b in aggregated) == min(d.low for d in daily)

    def test_weekly_aggregation(self):
        # 10 daily bars starting Monday 2024-01-01, so they span 2 ISO weeks.
        daily = self._daily_bars(datetime(2024, 1, 1, tzinfo=timezone.utc), 10, drift=1.0, volume=1000.0)
        weekly = _aggregate_bars_by_period(daily, "week")
        assert len(weekly) >= 2  # should span at least 2 ISO weeks
        for w in weekly:
            assert w.volume >= 1000.0  # at least one day's volume
        self._assert_consistent_rollup(weekly, daily)

    def test_monthly_aggregation(self):
        # 45 daily bars from 2024-01-15 through 2024-02-28 (spans Jan and Feb).
        daily = self._daily_bars(datetime(2024, 1, 15, tzinfo=timezone.utc), 45, drift=0.0, volume=500.0)
        monthly = _aggregate_bars_by_period(daily, "month")
        assert len(monthly) >= 2  # should span at least 2 months
        self._assert_consistent_rollup(monthly, daily)

    def test_empty_input(self):
        assert _aggregate_bars_by_period([], "week") == []
# ---------------------------------------------------------------------------
# normalize_input — integration with mocked DB
# ---------------------------------------------------------------------------
class TestNormalizeInput:
    """Test the full normalize_input function with mocked asyncpg pool.

    Each test replaces ``pool.fetch`` / ``pool.fetchrow`` with small coroutine
    functions. These route on the lower-cased SQL text, so the tests key off
    table and snapshot-type names rather than exact query formatting.
    """

    @pytest.fixture
    def config(self):
        return SignalEngineConfig()

    @pytest.fixture
    def mock_pool(self):
        # fetch/fetchrow are overwritten per test with routing coroutines.
        return AsyncMock()

    @staticmethod
    def _make_record(fields: dict) -> MagicMock:
        """Return a mock asyncpg.Record where ``row[key]`` reads ``fields[key]``."""
        row = MagicMock()
        row.__getitem__ = lambda _self, key: fields[key]
        return row

    def _setup_pool_with_data(self, pool):
        """Configure the mock pool to return realistic data.

        The pool serves:
        - 5 daily bars and no intraday bars;
        - a trend-window confidence of 0.75;
        - an earnings date 30 days from today;
        - three macro impact records (2 positive, 1 negative);
        - no open positions.
        """
        from datetime import date, timedelta

        ts_base = 1700000000000  # Nov 2023, epoch milliseconds
        day_ms = 86_400_000

        daily_rows = [
            _make_bar_row(ts_base + i * day_ms, 100.0 + i, 105.0 + i, 98.0 + i, 103.0 + i, 10000.0)
            for i in range(5)
        ]
        trend_row = self._make_record({"confidence": 0.75})
        earnings_row = self._make_record({"earnings_date": date.today() + timedelta(days=30)})
        macro_rows = [
            self._make_record({"impact_direction": direction, "macro_impact_score": 0.5, "confidence": 0.8})
            for direction in ("positive", "positive", "negative")
        ]
        position_rows: list = []

        async def mock_fetch(query, *args):
            q = query.strip().lower()
            if "market_snapshots" in q and "snapshot_type = 'bar'" in q:
                return daily_rows
            if "market_snapshots" in q and "intraday_bar" in q:
                return []
            if "macro_impact_records" in q:
                return macro_rows
            if "position_stop_levels" in q:
                return position_rows
            return []

        async def mock_fetchrow(query, *args):
            q = query.strip().lower()
            if "trend_windows" in q:
                return trend_row
            if "earnings_calendar" in q:
                return earnings_row
            return None

        pool.fetch = mock_fetch
        pool.fetchrow = mock_fetchrow

    @pytest.mark.asyncio
    async def test_full_normalization(self, mock_pool, config):
        self._setup_pool_with_data(mock_pool)
        result = await normalize_input(mock_pool, "AAPL", config)
        assert result.ticker == "AAPL"
        assert result.evaluated_at is not None
        assert len(result.bars["D"]) == 5
        assert result.valuation_score == 0.75
        assert result.earnings_proximity_days is not None
        assert result.earnings_proximity_days > 0
        # Two positive vs. one negative record with identical score and
        # confidence, so the bias must lean positive (not merely be non-zero).
        assert result.macro_bias > 0.0
        assert result.open_positions == []
        assert len(result.closing_prices) == 5
        assert len(result.returns) == 4  # n-1 returns
        assert result.current_price is not None

    @pytest.mark.asyncio
    async def test_sentinel_values_on_empty_data(self, mock_pool, config):
        """When all data sources return empty, sentinels are used."""

        async def empty_fetch(query, *args):
            return []

        async def empty_fetchrow(query, *args):
            return None

        mock_pool.fetch = empty_fetch
        mock_pool.fetchrow = empty_fetchrow
        result = await normalize_input(mock_pool, "UNKNOWN", config)
        assert result.ticker == "UNKNOWN"
        assert all(result.bars[tf] == [] for tf in TIMEFRAMES)
        assert result.valuation_score is None
        assert result.earnings_proximity_days is None
        assert result.macro_bias == 0.0
        assert result.open_positions == []
        assert result.closing_prices == []
        assert result.returns == []
        assert result.current_price is None

    @pytest.mark.asyncio
    async def test_db_errors_produce_sentinels(self, mock_pool, config):
        """When DB queries raise exceptions, sentinels are used."""

        async def failing_fetch(query, *args):
            raise Exception("DB connection lost")

        async def failing_fetchrow(query, *args):
            raise Exception("DB connection lost")

        mock_pool.fetch = failing_fetch
        mock_pool.fetchrow = failing_fetchrow
        result = await normalize_input(mock_pool, "FAIL", config)
        assert result.ticker == "FAIL"
        assert all(result.bars[tf] == [] for tf in TIMEFRAMES)
        assert result.valuation_score is None
        assert result.earnings_proximity_days is None
        assert result.macro_bias == 0.0
        assert result.open_positions == []
        assert result.current_price is None

    @pytest.mark.asyncio
    async def test_weekly_monthly_derived_from_daily(self, mock_pool, config):
        """Weekly and monthly bars are derived from daily bars."""
        ts_base = 1700000000000
        daily_rows = [
            _make_bar_row(ts_base + i * 86400000, 100.0, 110.0, 90.0, 105.0, 1000.0)
            for i in range(30)  # 30 days of data
        ]

        async def mock_fetch(query, *args):
            q = query.strip().lower()
            if "market_snapshots" in q and "snapshot_type = 'bar'" in q:
                return daily_rows
            return []

        async def mock_fetchrow(query, *args):
            return None

        mock_pool.fetch = mock_fetch
        mock_pool.fetchrow = mock_fetchrow
        result = await normalize_input(mock_pool, "AAPL", config)
        assert len(result.bars["D"]) == 30
        assert len(result.bars["W"]) > 0  # weekly derived
        assert len(result.bars["M"]) > 0  # monthly derived

    @pytest.mark.asyncio
    async def test_current_price_from_shortest_timeframe(self, mock_pool, config):
        """current_price comes from the shortest available timeframe."""
        ts_base = 1700000000000
        # Only provide intraday bars (M30), no daily; closes are 150, 151, 152.
        intraday_rows = [
            _make_bar_row(ts_base + i * 1800000, 100.0, 110.0, 90.0, 150.0 + i, 500.0)  # 30-min intervals
            for i in range(3)
        ]

        async def mock_fetch(query, *args):
            q = query.strip().lower()
            if "intraday_bar" in q:
                return intraday_rows
            return []

        async def mock_fetchrow(query, *args):
            return None

        mock_pool.fetch = mock_fetch
        mock_pool.fetchrow = mock_fetchrow
        result = await normalize_input(mock_pool, "AAPL", config)
        # M30 is the shortest timeframe and has data; last close = 152.0
        assert result.current_price == 152.0

    @pytest.mark.asyncio
    async def test_macro_bias_computation(self, mock_pool, config):
        """macro_bias is a weighted average of direction scores."""
        # 2 positive, 1 negative — should lean positive
        macro_rows = [
            self._make_record({"impact_direction": direction, "macro_impact_score": score, "confidence": conf})
            for direction, score, conf in [
                ("positive", 0.8, 0.9),
                ("positive", 0.6, 0.7),
                ("negative", 0.3, 0.5),
            ]
        ]

        async def mock_fetch(query, *args):
            # Lower-case like the other routers so SQL keyword casing is irrelevant.
            if "macro_impact_records" in query.strip().lower():
                return macro_rows
            return []

        async def mock_fetchrow(query, *args):
            return None

        mock_pool.fetch = mock_fetch
        mock_pool.fetchrow = mock_fetchrow
        result = await normalize_input(mock_pool, "AAPL", config)
        # If the weights are score*confidence, the expected value is:
        #   pos(0.8*0.9=0.72) + pos(0.6*0.7=0.42) + neg(0.3*0.5=0.15)
        #   = (0.72 + 0.42 - 0.15) / 1.29 ≈ 0.767
        # Only the sign and upper bound are asserted, so the test does not
        # pin the exact weighting formula.
        assert result.macro_bias > 0.0
        assert result.macro_bias <= 1.0