Files
stonks-oracle/tests/test_signal_engine_confluence.py
Celes Renata f468e30af0
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: implement dual-pipeline signal engine service
New service at services/signal_engine/ implementing concurrent heuristic
(deterministic scoring) and probabilistic (Bayesian inference) pipelines
that evaluate technical signals across 6 timeframes (M30-M) and produce
independent BUY/WATCH/SKIP verdicts per ticker per evaluation tick.

Components:
- Input Normalizer: multi-source data assembly with sentinel fallbacks
- Signal Library: Fibonacci, MA Stack, RSI, Cup & Handle, Elliott Wave
- Multi-Timeframe Confluence Engine: weighted scoring with D/W/M anchors
- Hard Filter Engine: macro_bias, valuation, earnings proximity gating
- Heuristic Pipeline: S_total scoring with confidence-gated verdicts
- Probabilistic Pipeline: Bayesian log-odds with regime priors, entropy
  gating, EV_R calculation, and signal correlation penalty
- Exit Engine: stop-loss, targets, trailing ATR-based stops
- Delta Analyzer: pipeline agreement tracking with rolling Redis metrics
- Output Formatter: SignalOutput contract + Recommendation schema mapping
- Worker orchestrator: concurrent pipelines with failure isolation
- Main entry point: queue polling with fail-safe config loading

Infrastructure:
- Migration 039: signal_engine_outputs table with 3 indexes
- Helm chart: signalEngine service entry (processing tier)
- Redis key: QUEUE_SIGNAL_ENGINE constant

Tests: 390 tests (unit + property-based) covering all components
Config: dual_pipeline_enabled=false by default (safe rollout)
2026-05-02 07:32:26 +00:00

389 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Unit tests for the multi-timeframe confluence engine.
Validates compute_confluence against requirements 3.1-3.6.
"""
from services.signal_engine.confluence import (
HIGHER_TIMEFRAME_ANCHORS,
MIN_TIMEFRAME_COUNT,
compute_confluence,
)
from services.signal_engine.models import (
SignalDirection,
SignalResult,
)
# Default timeframe weights from the design (Requirement 3.1), keyed by
# timeframe label.
DEFAULT_WEIGHTS: dict[str, float] = dict(
    M30=0.03,
    H1=0.07,
    H4=0.15,
    D=0.30,
    W=0.30,
    M=0.15,
)
def _make_signal(
    signal_type: str = "fibonacci",
    timeframe: str = "D",
    strength: float = 0.8,
    direction: SignalDirection = SignalDirection.BULLISH,
    confidence: float = 0.9,
) -> SignalResult:
    """Create a SignalResult fixture, overriding only the fields a test cares about.

    Every argument is forwarded unchanged to the SignalResult constructor
    under the keyword of the same name.
    """
    fields = {
        "signal_type": signal_type,
        "timeframe": timeframe,
        "strength": strength,
        "direction": direction,
        "confidence": confidence,
    }
    return SignalResult(**fields)
class TestMinimumConfluenceThreshold:
    """Requirement 3.3: a signal active on fewer than 2 timeframes is dropped."""

    def test_single_timeframe_discarded(self):
        """A lone daily hit produces no confluence result."""
        only_daily = {"D": _make_signal(timeframe="D")}
        scored = compute_confluence({"fibonacci": only_daily}, DEFAULT_WEIGHTS)
        assert scored == []

    def test_zero_timeframes_discarded(self):
        """A signal type with an empty timeframe map produces no result."""
        scored = compute_confluence({"fibonacci": {}}, DEFAULT_WEIGHTS)
        assert scored == []

    def test_two_timeframes_passes_minimum(self):
        """Daily plus weekly yields exactly one result for the signal type."""
        per_timeframe = {tf: _make_signal(timeframe=tf) for tf in ("D", "W")}
        scored = compute_confluence({"fibonacci": per_timeframe}, DEFAULT_WEIGHTS)
        assert len(scored) == 1
        assert scored[0].signal_type == "fibonacci"
class TestHigherTimeframeAnchor:
    """Requirement 3.4: a signal with none of D, W, M active is discarded."""

    @staticmethod
    def _confluence_for(*timeframes):
        """Run compute_confluence for an "rsi" entry active on ``timeframes``."""
        per_timeframe = {tf: _make_signal(timeframe=tf) for tf in timeframes}
        return compute_confluence({"rsi": per_timeframe}, DEFAULT_WEIGHTS)

    def test_only_intraday_timeframes_discarded(self):
        """M30 + H1 meets the count of 2 but has no D/W/M anchor."""
        assert self._confluence_for("M30", "H1") == []

    def test_intraday_plus_h4_discarded(self):
        """M30 + H1 + H4 is three timeframes, still without a D/W/M anchor."""
        assert self._confluence_for("M30", "H1", "H4") == []

    def test_with_daily_anchor_passes(self):
        """H4 anchored by D yields one result."""
        assert len(self._confluence_for("H4", "D")) == 1

    def test_with_weekly_anchor_passes(self):
        """H1 anchored by W yields one result."""
        assert len(self._confluence_for("H1", "W")) == 1

    def test_with_monthly_anchor_passes(self):
        """H4 anchored by M yields one result."""
        assert len(self._confluence_for("H4", "M")) == 1
class TestConfluenceScoreComputation:
    """Requirement 3.2: the score is the sum over timeframes of weight * strength."""

    def test_two_timeframes_score(self):
        """0.30 * 0.8 (D) + 0.30 * 0.6 (W) = 0.42."""
        strengths = {"D": 0.8, "W": 0.6}
        per_timeframe = {
            tf: _make_signal(timeframe=tf, strength=value)
            for tf, value in strengths.items()
        }
        scored = compute_confluence({"fibonacci": per_timeframe}, DEFAULT_WEIGHTS)
        assert len(scored) == 1
        deviation = abs(scored[0].confluence_score - 0.42)
        assert deviation < 1e-9

    def test_all_timeframes_score(self):
        """With strength 1.0 on all six timeframes the score equals the weight total."""
        per_timeframe = {}
        for tf in DEFAULT_WEIGHTS:
            per_timeframe[tf] = _make_signal(timeframe=tf, strength=1.0)
        scored = compute_confluence({"ma_stack": per_timeframe}, DEFAULT_WEIGHTS)
        assert len(scored) == 1
        weight_total = sum(DEFAULT_WEIGHTS.values())
        deviation = abs(scored[0].confluence_score - weight_total)
        assert deviation < 1e-9

    def test_zero_strength_contributes_zero(self):
        """0.30 * 0.0 (D) + 0.30 * 1.0 (W) = 0.30."""
        strengths = {"D": 0.0, "W": 1.0}
        per_timeframe = {
            tf: _make_signal(timeframe=tf, strength=value)
            for tf, value in strengths.items()
        }
        scored = compute_confluence({"rsi": per_timeframe}, DEFAULT_WEIGHTS)
        assert len(scored) == 1
        deviation = abs(scored[0].confluence_score - 0.30)
        assert deviation < 1e-9

    def test_unknown_timeframe_weight_defaults_to_zero(self):
        """A timeframe absent from the weights dict adds nothing to the score."""
        # "UNKNOWN" has no entry in DEFAULT_WEIGHTS; "D" supplies both the
        # D/W/M anchor and, with "UNKNOWN", the second timeframe.
        strengths = {"D": 0.5, "UNKNOWN": 1.0}
        per_timeframe = {
            tf: _make_signal(timeframe=tf, strength=value)
            for tf, value in strengths.items()
        }
        scored = compute_confluence({"fibonacci": per_timeframe}, DEFAULT_WEIGHTS)
        assert len(scored) == 1
        # Only the daily term remains: 0.30 * 0.5.
        deviation = abs(scored[0].confluence_score - 0.15)
        assert deviation < 1e-9
class TestPerTimeframeStrengths:
    """Check the per_timeframe mapping reported on each confluence result."""

    def test_per_timeframe_populated(self):
        """per_timeframe echoes the input strength of every timeframe."""
        strengths = {"D": 0.7, "W": 0.9}
        per_timeframe = {
            tf: _make_signal(timeframe=tf, strength=value)
            for tf, value in strengths.items()
        }
        scored = compute_confluence({"fibonacci": per_timeframe}, DEFAULT_WEIGHTS)
        assert len(scored) == 1
        assert scored[0].per_timeframe == {"D": 0.7, "W": 0.9}

    def test_active_timeframes_match_per_timeframe_keys(self):
        """active_timeframes and the per_timeframe keys name the same timeframes."""
        strengths = {"H4": 0.5, "D": 0.6, "W": 0.8}
        per_timeframe = {
            tf: _make_signal(timeframe=tf, strength=value)
            for tf, value in strengths.items()
        }
        scored = compute_confluence({"ma_stack": per_timeframe}, DEFAULT_WEIGHTS)
        assert len(scored) == 1
        entry = scored[0]
        assert set(entry.active_timeframes) == set(entry.per_timeframe.keys())
class TestDominantDirection:
    """Verify direction is determined by majority vote across timeframes."""

    @staticmethod
    def _dominant_direction(directions):
        """Return the direction of the single result for a "fibonacci" signal.

        ``directions`` maps a timeframe label to the SignalDirection reported
        on that timeframe. Each SignalResult is built with a ``timeframe``
        equal to its dict key so the fixture is internally consistent, and the
        result list is checked to hold exactly one entry before it is indexed
        so a regression fails as an assertion rather than an IndexError.
        """
        per_timeframe = {
            tf: _make_signal(timeframe=tf, direction=direction)
            for tf, direction in directions.items()
        }
        result = compute_confluence({"fibonacci": per_timeframe}, DEFAULT_WEIGHTS)
        assert len(result) == 1
        return result[0].direction

    def test_all_bullish(self):
        """Unanimous bullish timeframes give a bullish result."""
        direction = self._dominant_direction(
            {"D": SignalDirection.BULLISH, "W": SignalDirection.BULLISH}
        )
        assert direction == SignalDirection.BULLISH

    def test_all_bearish(self):
        """Unanimous bearish timeframes give a bearish result."""
        direction = self._dominant_direction(
            {"D": SignalDirection.BEARISH, "W": SignalDirection.BEARISH}
        )
        assert direction == SignalDirection.BEARISH

    def test_majority_bullish(self):
        """2 bullish + 1 bearish → bullish wins."""
        direction = self._dominant_direction(
            {
                "D": SignalDirection.BULLISH,
                "W": SignalDirection.BULLISH,
                "M": SignalDirection.BEARISH,
            }
        )
        assert direction == SignalDirection.BULLISH

    def test_tie_resolves_to_neutral(self):
        """1 bullish + 1 bearish → neutral."""
        direction = self._dominant_direction(
            {"D": SignalDirection.BULLISH, "W": SignalDirection.BEARISH}
        )
        assert direction == SignalDirection.NEUTRAL

    def test_neutral_votes_do_not_count(self):
        """2 bullish + 1 neutral → bullish wins."""
        direction = self._dominant_direction(
            {
                "D": SignalDirection.BULLISH,
                "W": SignalDirection.BULLISH,
                "M": SignalDirection.NEUTRAL,
            }
        )
        assert direction == SignalDirection.BULLISH
class TestMultipleSignalTypes:
    """Each signal type in the input is filtered and scored on its own."""

    @staticmethod
    def _signals(signal_type, *timeframes):
        """Build a timeframe -> SignalResult map for one signal type."""
        return {
            tf: _make_signal(signal_type=signal_type, timeframe=tf)
            for tf in timeframes
        }

    def test_two_signals_both_pass(self):
        """Two qualifying signal types both appear in the output."""
        inputs = {
            "fibonacci": self._signals("fibonacci", "D", "W"),
            "rsi": self._signals("rsi", "H4", "D"),
        }
        scored = compute_confluence(inputs, DEFAULT_WEIGHTS)
        assert len(scored) == 2
        seen_types = set()
        for entry in scored:
            seen_types.add(entry.signal_type)
        assert seen_types == {"fibonacci", "rsi"}

    def test_one_passes_one_discarded(self):
        """An "rsi" entry on a single timeframe is dropped; "fibonacci" survives."""
        inputs = {
            "fibonacci": self._signals("fibonacci", "D", "W"),
            # Only 1 timeframe, so this entry is discarded.
            "rsi": self._signals("rsi", "D"),
        }
        scored = compute_confluence(inputs, DEFAULT_WEIGHTS)
        assert len(scored) == 1
        assert scored[0].signal_type == "fibonacci"

    def test_one_passes_one_no_anchor(self):
        """An "rsi" entry without a D/W/M anchor is dropped; "fibonacci" survives."""
        inputs = {
            "fibonacci": self._signals("fibonacci", "D", "W"),
            # 2 timeframes but neither is D, W or M, so this entry is discarded.
            "rsi": self._signals("rsi", "M30", "H1"),
        }
        scored = compute_confluence(inputs, DEFAULT_WEIGHTS)
        assert len(scored) == 1
        assert scored[0].signal_type == "fibonacci"
class TestEmptyInputs:
    """Degenerate inputs: no signals at all, or no weights at all."""

    def test_empty_signal_results(self):
        """An empty signal map yields an empty result list."""
        assert compute_confluence({}, DEFAULT_WEIGHTS) == []

    def test_empty_weights(self):
        """With an empty weights dict the signal still passes, scoring 0.0."""
        strengths = {"D": 0.8, "W": 0.6}
        per_timeframe = {
            tf: _make_signal(timeframe=tf, strength=value)
            for tf, value in strengths.items()
        }
        no_weights = {}
        scored = compute_confluence({"fibonacci": per_timeframe}, no_weights)
        assert len(scored) == 1
        assert scored[0].confluence_score == 0.0
class TestConfluenceScoreMonotonicity:
    """Requirement 3.6: more timeframes with higher weights → higher score."""

    def test_adding_timeframe_increases_score(self):
        """Adding a third timeframe with non-zero strength increases the score."""
        two_tf = {
            "fibonacci": {
                "D": _make_signal(timeframe="D", strength=0.8),
                "W": _make_signal(timeframe="W", strength=0.6),
            }
        }
        three_tf = {
            "fibonacci": {
                "D": _make_signal(timeframe="D", strength=0.8),
                "W": _make_signal(timeframe="W", strength=0.6),
                "H4": _make_signal(timeframe="H4", strength=0.5),
            }
        }
        result_2 = compute_confluence(two_tf, DEFAULT_WEIGHTS)
        result_3 = compute_confluence(three_tf, DEFAULT_WEIGHTS)
        # Check both runs produced a result before indexing, so a filtering
        # regression fails as an assertion rather than an IndexError.
        assert len(result_2) == 1
        assert len(result_3) == 1
        assert result_3[0].confluence_score > result_2[0].confluence_score

    def test_higher_weight_timeframe_contributes_more(self):
        """D (weight 0.30) contributes more than M30 (weight 0.03) at same strength."""
        with_d = {
            "fibonacci": {
                "D": _make_signal(timeframe="D", strength=0.5),
                "W": _make_signal(timeframe="W", strength=0.5),
            }
        }
        with_m30 = {
            "fibonacci": {
                "M30": _make_signal(timeframe="M30", strength=0.5),
                "W": _make_signal(timeframe="W", strength=0.5),
            }
        }
        result_d = compute_confluence(with_d, DEFAULT_WEIGHTS)
        result_m30 = compute_confluence(with_m30, DEFAULT_WEIGHTS)
        # Both inputs share the weekly anchor, so each should yield one result;
        # verify that before comparing the scores.
        assert len(result_d) == 1
        assert len(result_m30) == 1
        assert result_d[0].confluence_score > result_m30[0].confluence_score
class TestConstants:
    """Pin the module-level constants exported by the confluence module."""

    def test_higher_timeframe_anchors(self):
        """The anchor set is exactly the D, W and M timeframes."""
        expected_anchors = frozenset(["D", "W", "M"])
        assert HIGHER_TIMEFRAME_ANCHORS == expected_anchors

    def test_min_timeframe_count(self):
        """At least two timeframes are required."""
        expected_minimum = 2
        assert MIN_TIMEFRAME_COUNT == expected_minimum