Files
stonks-oracle/tests/test_signal_engine_base.py
Celes Renata f468e30af0
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: implement dual-pipeline signal engine service
New service at services/signal_engine/ implementing concurrent heuristic
(deterministic scoring) and probabilistic (Bayesian inference) pipelines
that evaluate technical signals across 6 timeframes (M30-M) and produce
independent BUY/WATCH/SKIP verdicts per ticker per evaluation tick.

Components:
- Input Normalizer: multi-source data assembly with sentinel fallbacks
- Signal Library: Fibonacci, MA Stack, RSI, Cup & Handle, Elliott Wave
- Multi-Timeframe Confluence Engine: weighted scoring with D/W/M anchors
- Hard Filter Engine: macro_bias, valuation, earnings proximity gating
- Heuristic Pipeline: S_total scoring with confidence-gated verdicts
- Probabilistic Pipeline: Bayesian log-odds with regime priors, entropy
  gating, EV_R calculation, and signal correlation penalty
- Exit Engine: stop-loss, targets, trailing ATR-based stops
- Delta Analyzer: pipeline agreement tracking with rolling Redis metrics
- Output Formatter: SignalOutput contract + Recommendation schema mapping
- Worker orchestrator: concurrent pipelines with failure isolation
- Main entry point: queue polling with fail-safe config loading

Infrastructure:
- Migration 039: signal_engine_outputs table with 3 indexes
- Helm chart: signalEngine service entry (processing tier)
- Redis key: QUEUE_SIGNAL_ENGINE constant

Tests: 390 tests (unit + property-based) covering all components
Config: dual_pipeline_enabled=false by default (safe rollout)
2026-05-02 07:32:26 +00:00

201 lines
5.8 KiB
Python

"""Unit tests for services.signal_engine.signals.base helper functions.
Tests swing high/low detection, lookback validation, and SMA computation.
Requirements: 2.6, 2.7
"""
from __future__ import annotations
from datetime import datetime, timezone
from services.signal_engine.models import OHLCVBar
from services.signal_engine.signals.base import (
compute_sma,
find_swing_high,
find_swing_low,
validate_lookback,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _bar(
close: float,
high: float | None = None,
low: float | None = None,
ts_offset: int = 0,
) -> OHLCVBar:
"""Create a minimal OHLCVBar for testing."""
return OHLCVBar(
timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=close,
high=high if high is not None else close,
low=low if low is not None else close,
close=close,
volume=1000.0,
)
# ---------------------------------------------------------------------------
# find_swing_high
# ---------------------------------------------------------------------------
def test_find_swing_high_basic() -> None:
bars = [_bar(10, high=12), _bar(10, high=15), _bar(10, high=11)]
result = find_swing_high(bars, lookback=3)
assert result is not None
idx, price = result
assert idx == 1
assert price == 15.0
def test_find_swing_high_lookback_subset() -> None:
bars = [_bar(10, high=20), _bar(10, high=12), _bar(10, high=15), _bar(10, high=11)]
# lookback=2 → only last 2 bars (index 2 and 3 in original)
result = find_swing_high(bars, lookback=2)
assert result is not None
idx, price = result
assert idx == 2 # bar at index 2 has high=15
assert price == 15.0
def test_find_swing_high_insufficient_data() -> None:
bars = [_bar(10, high=12)]
assert find_swing_high(bars, lookback=5) is None
def test_find_swing_high_zero_lookback() -> None:
bars = [_bar(10, high=12)]
assert find_swing_high(bars, lookback=0) is None
def test_find_swing_high_negative_lookback() -> None:
bars = [_bar(10, high=12)]
assert find_swing_high(bars, lookback=-1) is None
def test_find_swing_high_tie_takes_last() -> None:
"""When multiple bars share the same high, the last one wins (>=)."""
bars = [_bar(10, high=15), _bar(10, high=15), _bar(10, high=10)]
result = find_swing_high(bars, lookback=3)
assert result is not None
idx, price = result
assert idx == 1
assert price == 15.0
# ---------------------------------------------------------------------------
# find_swing_low
# ---------------------------------------------------------------------------
def test_find_swing_low_basic() -> None:
bars = [_bar(10, low=8), _bar(10, low=5), _bar(10, low=9)]
result = find_swing_low(bars, lookback=3)
assert result is not None
idx, price = result
assert idx == 1
assert price == 5.0
def test_find_swing_low_lookback_subset() -> None:
bars = [_bar(10, low=2), _bar(10, low=8), _bar(10, low=5), _bar(10, low=9)]
# lookback=2 → only last 2 bars (index 2 and 3)
result = find_swing_low(bars, lookback=2)
assert result is not None
idx, price = result
assert idx == 2 # bar at index 2 has low=5
assert price == 5.0
def test_find_swing_low_insufficient_data() -> None:
bars = [_bar(10, low=8)]
assert find_swing_low(bars, lookback=5) is None
def test_find_swing_low_zero_lookback() -> None:
bars = [_bar(10, low=8)]
assert find_swing_low(bars, lookback=0) is None
def test_find_swing_low_tie_takes_last() -> None:
"""When multiple bars share the same low, the last one wins (<=)."""
bars = [_bar(10, low=5), _bar(10, low=5), _bar(10, low=10)]
result = find_swing_low(bars, lookback=3)
assert result is not None
idx, price = result
assert idx == 1
assert price == 5.0
# ---------------------------------------------------------------------------
# validate_lookback
# ---------------------------------------------------------------------------
def test_validate_lookback_sufficient() -> None:
bars = [_bar(10)] * 20
assert validate_lookback(bars, min_bars=20) is True
def test_validate_lookback_more_than_enough() -> None:
bars = [_bar(10)] * 50
assert validate_lookback(bars, min_bars=20) is True
def test_validate_lookback_insufficient() -> None:
bars = [_bar(10)] * 5
assert validate_lookback(bars, min_bars=20) is False
def test_validate_lookback_empty() -> None:
assert validate_lookback([], min_bars=1) is False
def test_validate_lookback_zero_min() -> None:
assert validate_lookback([], min_bars=0) is True
# ---------------------------------------------------------------------------
# compute_sma
# ---------------------------------------------------------------------------
def test_compute_sma_basic() -> None:
bars = [_bar(10), _bar(20), _bar(30)]
result = compute_sma(bars, period=3)
assert result is not None
assert result == 20.0
def test_compute_sma_subset() -> None:
bars = [_bar(100), _bar(10), _bar(20), _bar(30)]
# period=3 → average of last 3 bars: (10+20+30)/3 = 20
result = compute_sma(bars, period=3)
assert result is not None
assert result == 20.0
def test_compute_sma_single_bar() -> None:
bars = [_bar(42)]
result = compute_sma(bars, period=1)
assert result is not None
assert result == 42.0
def test_compute_sma_insufficient_data() -> None:
bars = [_bar(10), _bar(20)]
assert compute_sma(bars, period=5) is None
def test_compute_sma_zero_period() -> None:
bars = [_bar(10)]
assert compute_sma(bars, period=0) is None
def test_compute_sma_negative_period() -> None:
bars = [_bar(10)]
assert compute_sma(bars, period=-1) is None