stonks-oracle/tests/test_pbt_signal_engine_confluence.py
Celes Renata f468e30af0
feat: implement dual-pipeline signal engine service
New service at services/signal_engine/ implementing concurrent heuristic
(deterministic scoring) and probabilistic (Bayesian inference) pipelines
that evaluate technical signals across 6 timeframes (M30-M) and produce
independent BUY/WATCH/SKIP verdicts per ticker per evaluation tick.

Components:
- Input Normalizer: multi-source data assembly with sentinel fallbacks
- Signal Library: Fibonacci, MA Stack, RSI, Cup & Handle, Elliott Wave
- Multi-Timeframe Confluence Engine: weighted scoring with D/W/M anchors
- Hard Filter Engine: macro_bias, valuation, earnings proximity gating
- Heuristic Pipeline: S_total scoring with confidence-gated verdicts
- Probabilistic Pipeline: Bayesian log-odds with regime priors, entropy
  gating, EV_R calculation, and signal correlation penalty
- Exit Engine: stop-loss, targets, trailing ATR-based stops
- Delta Analyzer: pipeline agreement tracking with rolling Redis metrics
- Output Formatter: SignalOutput contract + Recommendation schema mapping
- Worker orchestrator: concurrent pipelines with failure isolation
- Main entry point: queue polling with fail-safe config loading
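
The confluence gate and weighted score named in the component list can be sketched as follows. This is a minimal illustration, not the service's `compute_confluence`: the function name `sketch_confluence_score` is hypothetical, the weights are copied from the test module's `DEFAULT_WEIGHTS`, and the gate (at least two active timeframes, at least one D/W/M anchor) and the formula C = Σ(w_tf · s_tf) are taken from the test's docstring. How the real engine treats signal direction or confidence is not shown here.

```python
from __future__ import annotations

# Assumed defaults, copied from the test module's DEFAULT_WEIGHTS.
WEIGHTS = {"M30": 0.03, "H1": 0.07, "H4": 0.15, "D": 0.30, "W": 0.30, "M": 0.15}
ANCHORS = {"D", "W", "M"}


def sketch_confluence_score(
    strengths: dict[str, float],
    weights: dict[str, float] = WEIGHTS,
) -> float | None:
    """Hypothetical stand-in for the confluence engine.

    Returns None when the gate fails (fewer than two active timeframes,
    or no D/W/M anchor among them); otherwise returns sum(w_tf * s_tf).
    """
    # A timeframe counts as active only with positive strength and weight.
    active = {
        tf: s
        for tf, s in strengths.items()
        if s > 0 and weights.get(tf, 0.0) > 0
    }
    if len(active) < 2 or not any(tf in ANCHORS for tf in active):
        return None
    return sum(weights[tf] * s for tf, s in active.items())
```

Under these assumptions, a daily signal at full strength plus an hourly signal at half strength passes the gate, while two intraday signals without an anchor do not.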

Infrastructure:
- Migration 039: signal_engine_outputs table with 3 indexes
- Helm chart: signalEngine service entry (processing tier)
- Redis key: QUEUE_SIGNAL_ENGINE constant

Tests: 390 tests (unit + property-based) covering all components
Config: dual_pipeline_enabled=false by default (safe rollout)
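
For readers unfamiliar with the terms in the Probabilistic Pipeline entry, the textbook forms of a log-odds update, binary entropy, and expected value in R multiples look roughly like this. Every function name below is hypothetical, and the exact formulas, regime priors, and correlation penalty used by the service are not visible in this commit view. This is a generic sketch of the techniques, assuming a loss of exactly 1R on a losing trade.

```python
import math


def logit(p: float) -> float:
    """Log-odds of a probability strictly between 0 and 1."""
    return math.log(p / (1.0 - p))


def posterior_probability(prior: float, likelihood_ratios: list[float]) -> float:
    """Bayesian update in log-odds space.

    Each likelihood ratio is P(signal | win) / P(signal | loss); adding
    their logs to the prior log-odds assumes the signals are independent.
    """
    log_odds = logit(prior) + sum(math.log(lr) for lr in likelihood_ratios)
    return 1.0 / (1.0 + math.exp(-log_odds))


def binary_entropy(p: float) -> float:
    """Shannon entropy in bits of a win/loss outcome with win probability p."""
    if p in (0.0, 1.0):
        return 0.0
    return -(p * math.log2(p) + (1.0 - p) * math.log2(1.0 - p))


def expected_value_r(p_win: float, reward_r: float) -> float:
    """Expected value in R multiples, assuming a losing trade costs 1R."""
    return p_win * reward_r - (1.0 - p_win)
```

An entropy gate in this style would reject posteriors close to 0.5, where `binary_entropy` approaches 1 bit; any specific threshold is a configuration choice not shown in the source.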
2026-05-02 07:32:26 +00:00


# Feature: dual-pipeline-signal-engine, Property: Confluence score monotonicity
"""Property-based tests for the Multi-Timeframe Confluence Engine.

Feature: dual-pipeline-signal-engine

Tests the confluence score monotonicity property from the design specification:
activating a signal on an additional timeframe with non-zero weight always
increases or maintains the confluence score.

Requirements: 3.6, 17.5
"""

from __future__ import annotations

from hypothesis import given, settings
from hypothesis import strategies as st

from services.signal_engine.confluence import compute_confluence
from services.signal_engine.models import SignalDirection, SignalResult

# ---------------------------------------------------------------------------
# Property: Confluence score monotonicity
# Validates: Requirements 3.6, 17.5
# ---------------------------------------------------------------------------

# Default timeframe weights per the design specification
DEFAULT_WEIGHTS: dict[str, float] = {
    "M30": 0.03,
    "H1": 0.07,
    "H4": 0.15,
    "D": 0.30,
    "W": 0.30,
    "M": 0.15,
}

ALL_TIMEFRAMES = list(DEFAULT_WEIGHTS.keys())
ANCHOR_TIMEFRAMES = ["D", "W", "M"]
NON_ANCHOR_TIMEFRAMES = ["M30", "H1", "H4"]

# ---------------------------------------------------------------------------
# Hypothesis strategies
# ---------------------------------------------------------------------------

_direction = st.sampled_from([SignalDirection.BULLISH, SignalDirection.BEARISH])

_nonzero_strength = st.floats(
    min_value=0.01, max_value=1.0, allow_nan=False, allow_infinity=False,
)

_confidence = st.floats(
    min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False,
)

_signal_type = st.just("test_signal")


def _signal_result(
    timeframe: str,
    strength: st.SearchStrategy[float] = _nonzero_strength,
) -> st.SearchStrategy[SignalResult]:
    """Build a SignalResult for a given timeframe with non-zero strength."""
    return st.builds(
        SignalResult,
        signal_type=_signal_type,
        timeframe=st.just(timeframe),
        strength=strength,
        direction=_direction,
        confidence=_confidence,
    )


@st.composite
def _base_and_extra_timeframe(draw: st.DrawFn) -> tuple[dict[str, SignalResult], str]:
    """Generate a base set of signal results that passes confluence, plus one extra timeframe.

    The base set has at least 2 timeframes including at least one D/W/M anchor.
    The extra timeframe is not in the base set and has a non-zero weight.
    """
    # Pick 1 anchor timeframe (guaranteed)
    anchor = draw(st.sampled_from(ANCHOR_TIMEFRAMES))

    # Pick 1-4 additional timeframes from the remaining 5 (2-5 total in the base set)
    remaining = [tf for tf in ALL_TIMEFRAMES if tf != anchor]
    additional_count = draw(st.integers(min_value=1, max_value=min(4, len(remaining))))
    additional = draw(
        st.lists(
            st.sampled_from(remaining),
            min_size=additional_count,
            max_size=additional_count,
            unique=True,
        )
    )
    base_tfs = [anchor] + additional

    # Build signal results for the base set
    base_results: dict[str, SignalResult] = {}
    for tf in base_tfs:
        base_results[tf] = draw(_signal_result(tf))

    # Pick an extra timeframe NOT in the base set
    unused = [tf for tf in ALL_TIMEFRAMES if tf not in base_tfs]
    if not unused:
        # Defensive fallback: with at most 1 + 4 = 5 base timeframes out of 6,
        # this branch is unreachable under the current bounds. It only matters
        # if the limits above change. Free a slot by removing one non-anchor.
        removable = [tf for tf in base_tfs if tf not in ANCHOR_TIMEFRAMES]
        if not removable:
            # All are anchors: remove one that isn't the primary anchor
            removable = [tf for tf in base_tfs if tf != anchor]
        to_remove = draw(st.sampled_from(removable))
        del base_results[to_remove]
        unused = [to_remove]

    extra_tf = draw(st.sampled_from(unused))
    return base_results, extra_tf


# ---------------------------------------------------------------------------
# Property test
# ---------------------------------------------------------------------------


@given(data=st.data(), base_and_extra=_base_and_extra_timeframe())
@settings(max_examples=100)
def test_confluence_score_monotonicity(
    data: st.DataObject,
    base_and_extra: tuple[dict[str, SignalResult], str],
) -> None:
    """**Validates: Requirements 3.6, 17.5**

    Given a signal that already passes confluence (≥2 timeframes, ≥1 D/W/M
    anchor), adding an additional timeframe with non-zero strength and
    non-zero weight SHALL always increase or maintain the confluence score.

    The weighted confluence score is C = Σ(w_tf · s_tf). Since both w_tf > 0
    and s_tf > 0 for the added timeframe, the new term is strictly positive,
    so the score cannot decrease. The assertion checks the weaker ``>=`` form,
    matching the requirement's "increase or maintain" wording.
    """
    base_results, extra_tf = base_and_extra
    signal_type = "test_signal"

    # Compute confluence for the base set
    base_input = {signal_type: dict(base_results)}
    base_confluence = compute_confluence(base_input, DEFAULT_WEIGHTS)

    # The base set should pass confluence (≥2 TFs, ≥1 anchor)
    assert len(base_confluence) == 1, (
        f"Expected base set to pass confluence but got {len(base_confluence)} signals.\n"
        f"  Base timeframes: {list(base_results.keys())}"
    )
    base_score = base_confluence[0].confluence_score

    # Add the extra timeframe with non-zero strength
    extra_result = data.draw(_signal_result(extra_tf))
    extended_results = dict(base_results)
    extended_results[extra_tf] = extra_result

    # Compute confluence for the extended set
    extended_input = {signal_type: extended_results}
    extended_confluence = compute_confluence(extended_input, DEFAULT_WEIGHTS)

    # The extended set must also pass confluence (superset of a passing set)
    assert len(extended_confluence) == 1, (
        f"Expected extended set to pass confluence but got "
        f"{len(extended_confluence)} signals.\n"
        f"  Extended timeframes: {list(extended_results.keys())}"
    )
    new_score = extended_confluence[0].confluence_score

    # Monotonicity: new_score >= base_score
    assert new_score >= base_score, (
        f"Confluence score decreased when adding timeframe {extra_tf}!\n"
        f"  Base score: {base_score:.6f} (timeframes: {list(base_results.keys())})\n"
        f"  Extended score: {new_score:.6f} (timeframes: {list(extended_results.keys())})\n"
        f"  Added TF weight: {DEFAULT_WEIGHTS[extra_tf]}, "
        f"strength: {extra_result.strength:.6f}"
    )