Files
stonks-oracle/tests/test_signal_engine_rsi.py
Celes Renata f468e30af0
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: implement dual-pipeline signal engine service
New service at services/signal_engine/ implementing concurrent heuristic
(deterministic scoring) and probabilistic (Bayesian inference) pipelines
that evaluate technical signals across 6 timeframes (M30-M) and produce
independent BUY/WATCH/SKIP verdicts per ticker per evaluation tick.

Components:
- Input Normalizer: multi-source data assembly with sentinel fallbacks
- Signal Library: Fibonacci, MA Stack, RSI, Cup & Handle, Elliott Wave
- Multi-Timeframe Confluence Engine: weighted scoring with D/W/M anchors
- Hard Filter Engine: macro_bias, valuation, earnings proximity gating
- Heuristic Pipeline: S_total scoring with confidence-gated verdicts
- Probabilistic Pipeline: Bayesian log-odds with regime priors, entropy
  gating, EV_R calculation, and signal correlation penalty
- Exit Engine: stop-loss, targets, trailing ATR-based stops
- Delta Analyzer: pipeline agreement tracking with rolling Redis metrics
- Output Formatter: SignalOutput contract + Recommendation schema mapping
- Worker orchestrator: concurrent pipelines with failure isolation
- Main entry point: queue polling with fail-safe config loading

Infrastructure:
- Migration 039: signal_engine_outputs table with 3 indexes
- Helm chart: signalEngine service entry (processing tier)
- Redis key: QUEUE_SIGNAL_ENGINE constant

Tests: 390 tests (unit + property-based) covering all components
Config: dual_pipeline_enabled=false by default (safe rollout)
2026-05-02 07:32:26 +00:00

347 lines
12 KiB
Python

"""Unit tests for services.signal_engine.signals.rsi — RSI evaluator.
Requirements: 2.3, 2.6, 2.7
"""
from __future__ import annotations
from datetime import datetime, timezone
from services.signal_engine.models import OHLCVBar, SignalDirection
from services.signal_engine.signals.rsi import (
DEFAULT_MIN_BARS,
DEFAULT_RSI_PERIOD,
OVERBOUGHT_THRESHOLD,
OVERSOLD_THRESHOLD,
RSIEvaluator,
compute_rsi,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _bar(close: float) -> OHLCVBar:
"""Create a minimal OHLCVBar for testing."""
return OHLCVBar(
timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=close,
high=close,
low=close,
close=close,
volume=1000.0,
)
def _make_bars(n: int, close: float = 100.0) -> list[OHLCVBar]:
"""Create *n* identical bars with the same close price."""
return [_bar(close) for _ in range(n)]
def _trending_bars(n: int, start: float, step: float) -> list[OHLCVBar]:
"""Create *n* bars with linearly increasing/decreasing close prices."""
return [_bar(start + i * step) for i in range(n)]
def _alternating_bars(
n: int,
base: float,
gain: float,
loss: float,
) -> list[OHLCVBar]:
"""Create bars that alternate between gaining and losing.
Useful for producing RSI values in the neutral zone.
"""
bars: list[OHLCVBar] = [_bar(base)]
price = base
for i in range(1, n):
if i % 2 == 1:
price += gain
else:
price -= loss
bars.append(_bar(price))
return bars
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
def test_default_period() -> None:
assert DEFAULT_RSI_PERIOD == 14
def test_default_min_bars() -> None:
assert DEFAULT_MIN_BARS == 15
def test_thresholds() -> None:
assert OVERBOUGHT_THRESHOLD == 70.0
assert OVERSOLD_THRESHOLD == 30.0
# ---------------------------------------------------------------------------
# Insufficient data → None
# ---------------------------------------------------------------------------
def test_returns_none_when_insufficient_bars() -> None:
"""Requirement 2.6: return None when fewer than 15 bars."""
evaluator = RSIEvaluator()
bars = _make_bars(14)
assert evaluator.evaluate(bars, "D") is None
def test_returns_none_with_empty_bars() -> None:
evaluator = RSIEvaluator()
assert evaluator.evaluate([], "D") is None
def test_returns_none_with_one_bar() -> None:
evaluator = RSIEvaluator()
assert evaluator.evaluate([_bar(100.0)], "D") is None
# ---------------------------------------------------------------------------
# Neutral zone → None
# ---------------------------------------------------------------------------
def test_returns_none_in_neutral_zone() -> None:
"""RSI between 30 and 70 should return None (no signal)."""
evaluator = RSIEvaluator()
# Alternating gains and losses produce RSI near 50
bars = _alternating_bars(30, base=100.0, gain=1.0, loss=1.0)
result = evaluator.evaluate(bars, "D")
# RSI should be near 50 → neutral → None
assert result is None
def test_flat_market_returns_none() -> None:
"""All bars with the same close → no price changes → no signal.
When all changes are zero, avg_gain=0 and avg_loss=0.
avg_loss=0 means RSI=100, which is overbought. But with truly flat
prices (no gains, no losses), RSI is technically 100 (all gains are 0,
all losses are 0 → RS = 0/0 edge case handled as RSI=100).
"""
evaluator = RSIEvaluator()
bars = _make_bars(30, close=100.0)
rsi = compute_rsi(bars)
# With zero changes: avg_gain=0, avg_loss=0 → RSI=100 (per our implementation)
assert rsi == 100.0
# ---------------------------------------------------------------------------
# Overbought signal (RSI > 70) → BEARISH
# ---------------------------------------------------------------------------
def test_overbought_produces_bearish_signal() -> None:
"""Requirement 2.3: RSI > 70 → BEARISH signal (overbought → potential reversal down)."""
evaluator = RSIEvaluator()
# Strong uptrend: all gains, no losses → RSI approaches 100
bars = _trending_bars(30, start=50.0, step=2.0)
result = evaluator.evaluate(bars, "D")
assert result is not None
assert result.signal_type == "rsi"
assert result.direction == SignalDirection.BEARISH
assert result.metadata["zone"] == "overbought"
assert result.metadata["rsi"] > OVERBOUGHT_THRESHOLD
# ---------------------------------------------------------------------------
# Oversold signal (RSI < 30) → BULLISH
# ---------------------------------------------------------------------------
def test_oversold_produces_bullish_signal() -> None:
"""Requirement 2.3: RSI < 30 → BULLISH signal (oversold → potential reversal up)."""
evaluator = RSIEvaluator()
# Strong downtrend: all losses, no gains → RSI approaches 0
bars = _trending_bars(30, start=200.0, step=-2.0)
result = evaluator.evaluate(bars, "D")
assert result is not None
assert result.signal_type == "rsi"
assert result.direction == SignalDirection.BULLISH
assert result.metadata["zone"] == "oversold"
assert result.metadata["rsi"] < OVERSOLD_THRESHOLD
# ---------------------------------------------------------------------------
# Strength scaling
# ---------------------------------------------------------------------------
def test_overbought_strength_scales_with_distance() -> None:
"""Strength = (RSI - 70) / 30, clamped to [0, 1]."""
evaluator = RSIEvaluator()
# Strong uptrend → RSI near 100 → high strength
bars = _trending_bars(30, start=50.0, step=3.0)
result = evaluator.evaluate(bars, "D")
assert result is not None
rsi = result.metadata["rsi"]
expected_strength = min(1.0, max(0.0, (rsi - 70.0) / 30.0))
assert abs(result.strength - expected_strength) < 1e-9
def test_oversold_strength_scales_with_distance() -> None:
"""Strength = (30 - RSI) / 30, clamped to [0, 1]."""
evaluator = RSIEvaluator()
# Strong downtrend → RSI near 0 → high strength
bars = _trending_bars(30, start=200.0, step=-3.0)
result = evaluator.evaluate(bars, "D")
assert result is not None
rsi = result.metadata["rsi"]
expected_strength = min(1.0, max(0.0, (30.0 - rsi) / 30.0))
assert abs(result.strength - expected_strength) < 1e-9
def test_strength_clamped_to_unit_interval() -> None:
"""Strength must always be in [0, 1]."""
evaluator = RSIEvaluator()
# Extreme uptrend → RSI ≈ 100 → strength should be clamped to 1.0
bars = _trending_bars(30, start=10.0, step=5.0)
result = evaluator.evaluate(bars, "D")
assert result is not None
assert 0.0 <= result.strength <= 1.0
# ---------------------------------------------------------------------------
# Confidence = strength * 0.85
# ---------------------------------------------------------------------------
def test_confidence_equals_strength_times_085() -> None:
"""Confidence = strength * 0.85."""
evaluator = RSIEvaluator()
bars = _trending_bars(30, start=50.0, step=2.0)
result = evaluator.evaluate(bars, "D")
assert result is not None
expected_confidence = result.strength * 0.85
assert abs(result.confidence - expected_confidence) < 1e-9
# ---------------------------------------------------------------------------
# Signal result structure (Requirement 2.7)
# ---------------------------------------------------------------------------
def test_signal_result_structure() -> None:
"""Requirement 2.7: SignalResult has all required fields."""
evaluator = RSIEvaluator()
bars = _trending_bars(30, start=50.0, step=2.0)
result = evaluator.evaluate(bars, "H4")
assert result is not None
assert result.signal_type == "rsi"
assert result.timeframe == "H4"
assert 0.0 <= result.strength <= 1.0
assert 0.0 <= result.confidence <= 1.0
assert result.direction in (SignalDirection.BULLISH, SignalDirection.BEARISH)
def test_metadata_contains_rsi_and_period() -> None:
"""Metadata should include RSI value and period used."""
evaluator = RSIEvaluator()
bars = _trending_bars(30, start=50.0, step=2.0)
result = evaluator.evaluate(bars, "D")
assert result is not None
meta = result.metadata
assert "rsi" in meta
assert "period" in meta
assert meta["period"] == 14
assert isinstance(meta["rsi"], float)
# ---------------------------------------------------------------------------
# Timeframe passthrough
# ---------------------------------------------------------------------------
def test_timeframe_passthrough() -> None:
"""The timeframe label is passed through to the result."""
evaluator = RSIEvaluator()
bars = _trending_bars(30, start=50.0, step=2.0)
for tf in ("M30", "H1", "H4", "D", "W", "M"):
result = evaluator.evaluate(bars, tf)
assert result is not None
assert result.timeframe == tf
# ---------------------------------------------------------------------------
# Boundary: exactly 15 bars
# ---------------------------------------------------------------------------
def test_exactly_15_bars_works() -> None:
"""Exactly 15 bars (period + 1) should be sufficient."""
evaluator = RSIEvaluator()
bars = _trending_bars(15, start=50.0, step=2.0)
result = evaluator.evaluate(bars, "D")
# Should produce a result (strong uptrend → overbought)
assert result is not None
def test_14_bars_returns_none() -> None:
"""14 bars is insufficient for a 14-period RSI."""
evaluator = RSIEvaluator()
bars = _trending_bars(14, start=50.0, step=2.0)
result = evaluator.evaluate(bars, "D")
assert result is None
# ---------------------------------------------------------------------------
# compute_rsi standalone function
# ---------------------------------------------------------------------------
def test_compute_rsi_all_gains() -> None:
"""All gains, no losses → RSI approaches 100."""
bars = _trending_bars(30, start=50.0, step=1.0)
rsi = compute_rsi(bars)
assert rsi is not None
assert rsi > 95.0 # Should be very close to 100
def test_compute_rsi_all_losses() -> None:
"""All losses, no gains → RSI approaches 0."""
bars = _trending_bars(30, start=200.0, step=-1.0)
rsi = compute_rsi(bars)
assert rsi is not None
assert rsi < 5.0 # Should be very close to 0
def test_compute_rsi_insufficient_data() -> None:
"""Returns None when fewer than period + 1 bars."""
bars = _make_bars(10)
rsi = compute_rsi(bars)
assert rsi is None
def test_compute_rsi_range() -> None:
"""RSI should always be in [0, 100]."""
# Mixed trend
bars = _trending_bars(15, start=100.0, step=0.5) + _trending_bars(15, start=107.0, step=-0.3)
rsi = compute_rsi(bars)
assert rsi is not None
assert 0.0 <= rsi <= 100.0
# ---------------------------------------------------------------------------
# Custom period
# ---------------------------------------------------------------------------
def test_custom_period() -> None:
"""RSIEvaluator with a custom period should use that period."""
evaluator = RSIEvaluator(period=7)
assert evaluator.period == 7
assert evaluator.min_bars == 8
# 8 bars with uptrend should work
bars = _trending_bars(8, start=50.0, step=2.0)
result = evaluator.evaluate(bars, "D")
assert result is not None
assert result.metadata["period"] == 7