Celes Renata f468e30af0
feat: implement dual-pipeline signal engine service
New service at services/signal_engine/ implementing concurrent heuristic
(deterministic scoring) and probabilistic (Bayesian inference) pipelines
that evaluate technical signals across 6 timeframes (M30-M) and produce
independent BUY/WATCH/SKIP verdicts per ticker per evaluation tick.

Components:
- Input Normalizer: multi-source data assembly with sentinel fallbacks
- Signal Library: Fibonacci, MA Stack, RSI, Cup & Handle, Elliott Wave
- Multi-Timeframe Confluence Engine: weighted scoring with D/W/M anchors
- Hard Filter Engine: macro_bias, valuation, earnings proximity gating
- Heuristic Pipeline: S_total scoring with confidence-gated verdicts
- Probabilistic Pipeline: Bayesian log-odds with regime priors, entropy
  gating, EV_R calculation, and signal correlation penalty
- Exit Engine: stop-loss, targets, trailing ATR-based stops
- Delta Analyzer: pipeline agreement tracking with rolling Redis metrics
- Output Formatter: SignalOutput contract + Recommendation schema mapping
- Worker orchestrator: concurrent pipelines with failure isolation
- Main entry point: queue polling with fail-safe config loading

Infrastructure:
- Migration 039: signal_engine_outputs table with 3 indexes
- Helm chart: signalEngine service entry (processing tier)
- Redis key: QUEUE_SIGNAL_ENGINE constant

Tests: 390 tests (unit + property-based) covering all components
Config: dual_pipeline_enabled=false by default (safe rollout)
2026-05-02 07:32:26 +00:00

183 lines
6.2 KiB
Python

"""Moving average stack signal evaluator.

Detects bullish alignment (MA_10 > MA_20 > MA_50 > MA_200) and bearish
alignment (MA_10 < MA_20 < MA_50 < MA_200), producing a signal strength
that reflects the degree of alignment.

Full alignment (all four MAs in order) yields strength 1.0, partial
alignment (three consecutive MAs in order) yields 0.6, and no alignment
returns ``None`` (no signal).
"""

from __future__ import annotations

from services.signal_engine.models import OHLCVBar, SignalDirection, SignalResult
from services.signal_engine.signals.base import compute_sma, validate_lookback

# MA periods used for stack evaluation
MA_PERIODS: list[int] = [10, 20, 50, 200]

# Minimum number of bars required (longest MA period)
MIN_BARS: int = 200

# Strength values
_FULL_ALIGNMENT_STRENGTH: float = 1.0
_PARTIAL_ALIGNMENT_STRENGTH: float = 0.6

# Confidence multiplier (high confidence for clear alignment patterns)
_CONFIDENCE_MULTIPLIER: float = 0.9


class MAStackEvaluator:
    """Moving average stack signal evaluator.

    Satisfies the :class:`~services.signal_engine.signals.base.SignalEvaluator`
    protocol.

    Computes MA_10, MA_20, MA_50, and MA_200 and checks whether they are
    in bullish or bearish order. Full alignment (all four in strict order)
    produces strength 1.0; partial alignment (any three consecutive in order)
    produces strength 0.6. When no alignment is detected the evaluator
    returns ``None``.
    """

    # ------------------------------------------------------------------
    # Public API (SignalEvaluator protocol)
    # ------------------------------------------------------------------

    def evaluate(
        self,
        bars: list[OHLCVBar],
        timeframe: str,
    ) -> SignalResult | None:
        """Evaluate moving average stack alignment on *bars*.

        Returns ``None`` when there are fewer than 200 bars (insufficient
        data for MA_200) or when no alignment is detected.
        """
        if not validate_lookback(bars, MIN_BARS):
            return None

        # Compute all four moving averages
        ma_10 = compute_sma(bars, 10)
        ma_20 = compute_sma(bars, 20)
        ma_50 = compute_sma(bars, 50)
        ma_200 = compute_sma(bars, 200)

        # Safety check: compute_sma returns None on insufficient data
        if ma_10 is None or ma_20 is None or ma_50 is None or ma_200 is None:
            return None

        ma_values = [ma_10, ma_20, ma_50, ma_200]

        # Check full bullish alignment: MA_10 > MA_20 > MA_50 > MA_200
        full_bullish = ma_10 > ma_20 > ma_50 > ma_200

        # Check full bearish alignment: MA_10 < MA_20 < MA_50 < MA_200
        full_bearish = ma_10 < ma_20 < ma_50 < ma_200

        if full_bullish:
            return self._build_result(
                direction=SignalDirection.BULLISH,
                strength=_FULL_ALIGNMENT_STRENGTH,
                alignment="full_bullish",
                timeframe=timeframe,
                ma_values=ma_values,
            )

        if full_bearish:
            return self._build_result(
                direction=SignalDirection.BEARISH,
                strength=_FULL_ALIGNMENT_STRENGTH,
                alignment="full_bearish",
                timeframe=timeframe,
                ma_values=ma_values,
            )

        # Check partial alignment (3 consecutive MAs out of 4 in order).
        # The two directions are mutually exclusive: both windows share the
        # MA_20/MA_50 pair, which cannot be ordered both ways at once.
        partial_bullish = self._check_partial_bullish(ma_values)
        partial_bearish = self._check_partial_bearish(ma_values)

        if partial_bullish:
            return self._build_result(
                direction=SignalDirection.BULLISH,
                strength=_PARTIAL_ALIGNMENT_STRENGTH,
                alignment="partial_bullish",
                timeframe=timeframe,
                ma_values=ma_values,
            )

        if partial_bearish:
            return self._build_result(
                direction=SignalDirection.BEARISH,
                strength=_PARTIAL_ALIGNMENT_STRENGTH,
                alignment="partial_bearish",
                timeframe=timeframe,
                ma_values=ma_values,
            )

        # No alignment detected: no signal
        return None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _check_partial_bullish(ma_values: list[float]) -> bool:
        """Return ``True`` if any 3 consecutive MAs are in bullish order.

        Checks windows [0:3] and [1:4] of the ordered MA list
        (MA_10, MA_20, MA_50, MA_200) for strictly descending values,
        i.e. each shorter-period MA sits above the next longer-period MA.
        """
        # Window 1: MA_10 > MA_20 > MA_50
        if ma_values[0] > ma_values[1] > ma_values[2]:
            return True
        # Window 2: MA_20 > MA_50 > MA_200
        if ma_values[1] > ma_values[2] > ma_values[3]:
            return True
        return False

    @staticmethod
    def _check_partial_bearish(ma_values: list[float]) -> bool:
        """Return ``True`` if any 3 consecutive MAs are in bearish order.

        Checks windows [0:3] and [1:4] of the ordered MA list
        (MA_10, MA_20, MA_50, MA_200) for strictly ascending values,
        i.e. each shorter-period MA sits below the next longer-period MA.
        """
        # Window 1: MA_10 < MA_20 < MA_50
        if ma_values[0] < ma_values[1] < ma_values[2]:
            return True
        # Window 2: MA_20 < MA_50 < MA_200
        if ma_values[1] < ma_values[2] < ma_values[3]:
            return True
        return False

    @staticmethod
    def _build_result(
        *,
        direction: SignalDirection,
        strength: float,
        alignment: str,
        timeframe: str,
        ma_values: list[float],
    ) -> SignalResult:
        """Construct a ``SignalResult`` for the MA stack signal."""
        confidence = strength * _CONFIDENCE_MULTIPLIER
        return SignalResult(
            signal_type="ma_stack",
            timeframe=timeframe,
            strength=strength,
            direction=direction,
            confidence=confidence,
            metadata={
                "ma_10": ma_values[0],
                "ma_20": ma_values[1],
                "ma_50": ma_values[2],
                "ma_200": ma_values[3],
                "alignment": alignment,
            },
        )
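
The listing above depends on `compute_sma`, `validate_lookback`, and the model classes from `services.signal_engine`, none of which are shown here. To try the alignment rules in isolation, the following is a minimal standalone sketch. It assumes the moving average is a plain arithmetic mean of the most recent closing prices; `simple_sma` and `classify_stack` are hypothetical names introduced for illustration and are not part of the service. It returns an `(alignment, strength)` tuple rather than a `SignalResult`.

```python
from __future__ import annotations

from typing import Optional, Sequence

# Same periods and strengths as the listing above.
PERIODS = (10, 20, 50, 200)
FULL_STRENGTH = 1.0
PARTIAL_STRENGTH = 0.6


def simple_sma(closes: Sequence[float], period: int) -> Optional[float]:
    """Hypothetical stand-in for compute_sma: mean of the last `period` closes.

    Assumption: the real helper works on OHLCVBar objects and may differ.
    """
    if period <= 0 or len(closes) < period:
        return None
    return sum(closes[-period:]) / period


def classify_stack(closes: Sequence[float]) -> Optional[tuple[str, float]]:
    """Return (alignment, strength), or None when there is no signal.

    Checks run in the same order as MAStackEvaluator.evaluate:
    full bullish, full bearish, partial bullish, partial bearish.
    """
    mas = [simple_sma(closes, p) for p in PERIODS]
    if any(m is None for m in mas):
        return None  # fewer than 200 closes
    ma_10, ma_20, ma_50, ma_200 = mas

    if ma_10 > ma_20 > ma_50 > ma_200:
        return ("full_bullish", FULL_STRENGTH)
    if ma_10 < ma_20 < ma_50 < ma_200:
        return ("full_bearish", FULL_STRENGTH)
    # Partial: any three consecutive MAs in strict order.
    if ma_10 > ma_20 > ma_50 or ma_20 > ma_50 > ma_200:
        return ("partial_bullish", PARTIAL_STRENGTH)
    if ma_10 < ma_20 < ma_50 or ma_20 < ma_50 < ma_200:
        return ("partial_bearish", PARTIAL_STRENGTH)
    return None


# Synthetic price series (illustrative data only).
uptrend = [float(i) for i in range(1, 251)]   # steadily rising closes
downtrend = list(reversed(uptrend))           # steadily falling closes
pullback = [float(i) for i in range(1, 241)] + [200.0] * 10  # rise, then a dip
flat = [100.0] * 250                          # all MAs equal, no strict order

print(classify_stack(uptrend))    # ('full_bullish', 1.0)
print(classify_stack(downtrend))  # ('full_bearish', 1.0)
print(classify_stack(pullback))   # ('partial_bullish', 0.6)
print(classify_stack(flat))       # None
```

In the `pullback` series the recent dip pulls MA_10 below MA_20, which breaks full alignment, while MA_20 > MA_50 > MA_200 still holds, so the second window yields a partial bullish result. The flat series returns `None` because every comparison is strict.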