Files
Celes Renata f468e30af0
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: implement dual-pipeline signal engine service
New service at services/signal_engine/ implementing concurrent heuristic
(deterministic scoring) and probabilistic (Bayesian inference) pipelines
that evaluate technical signals across 6 timeframes (M30-M) and produce
independent BUY/WATCH/SKIP verdicts per ticker per evaluation tick.

Components:
- Input Normalizer: multi-source data assembly with sentinel fallbacks
- Signal Library: Fibonacci, MA Stack, RSI, Cup & Handle, Elliott Wave
- Multi-Timeframe Confluence Engine: weighted scoring with D/W/M anchors
- Hard Filter Engine: macro_bias, valuation, earnings proximity gating
- Heuristic Pipeline: S_total scoring with confidence-gated verdicts
- Probabilistic Pipeline: Bayesian log-odds with regime priors, entropy
  gating, EV_R calculation, and signal correlation penalty
- Exit Engine: stop-loss, targets, trailing ATR-based stops
- Delta Analyzer: pipeline agreement tracking with rolling Redis metrics
- Output Formatter: SignalOutput contract + Recommendation schema mapping
- Worker orchestrator: concurrent pipelines with failure isolation
- Main entry point: queue polling with fail-safe config loading

Infrastructure:
- Migration 039: signal_engine_outputs table with 3 indexes
- Helm chart: signalEngine service entry (processing tier)
- Redis key: QUEUE_SIGNAL_ENGINE constant

Tests: 390 tests (unit + property-based) covering all components
Config: dual_pipeline_enabled=false by default (safe rollout)
2026-05-02 07:32:26 +00:00

137 lines
4.5 KiB
Python

"""Multi-Timeframe Confluence Engine.
Evaluates signals across multiple timeframes and computes weighted confluence
scores. Signals must trigger on at least 2 timeframes **and** include at
least one higher-timeframe anchor (D, W, or M) to pass the confluence filter.
The weighted confluence score is:
C_confluence = Σ(w_tf · s_tf)
where ``w_tf`` is the timeframe weight and ``s_tf`` is the signal strength on
that timeframe (only summed over timeframes where the signal triggered).
Requirements: 3.1, 3.2, 3.3, 3.4, 3.5, 3.6
"""
from __future__ import annotations
import logging
from collections import Counter
from services.signal_engine.models import (
ConfluenceSignal,
SignalDirection,
SignalResult,
)
logger = logging.getLogger(__name__)
# Higher-timeframe anchors — at least one must be present for a signal to pass.
HIGHER_TIMEFRAME_ANCHORS: frozenset[str] = frozenset({"D", "W", "M"})
# Minimum number of timeframes a signal must trigger on.
MIN_TIMEFRAME_COUNT: int = 2
def _dominant_direction(results: dict[str, SignalResult]) -> SignalDirection:
"""Determine the dominant direction from a set of per-timeframe results.
Counts bullish vs bearish votes across active timeframes. Ties resolve
to NEUTRAL.
"""
counts: Counter[SignalDirection] = Counter()
for sr in results.values():
counts[sr.direction] += 1
bullish = counts.get(SignalDirection.BULLISH, 0)
bearish = counts.get(SignalDirection.BEARISH, 0)
if bullish > bearish:
return SignalDirection.BULLISH
if bearish > bullish:
return SignalDirection.BEARISH
return SignalDirection.NEUTRAL
def compute_confluence(
signal_results: dict[str, dict[str, SignalResult]],
weights: dict[str, float],
) -> list[ConfluenceSignal]:
"""Compute weighted confluence scores across timeframes.
Args:
signal_results: ``{signal_type: {timeframe: SignalResult}}``.
Each inner dict maps timeframe labels (e.g. ``"D"``, ``"H4"``)
to the :class:`SignalResult` produced by the signal evaluator on
that timeframe.
weights: ``{timeframe: weight}`` e.g.
``{"M30": 0.03, "H1": 0.07, "H4": 0.15, "D": 0.30, "W": 0.30, "M": 0.15}``.
Returns:
List of :class:`ConfluenceSignal` objects that pass **both** filters:
1. **Minimum confluence threshold** — the signal must trigger on at
least :data:`MIN_TIMEFRAME_COUNT` (2) timeframes.
2. **Higher-timeframe anchor** — at least one of D, W, or M must be
among the active timeframes.
Requirements: 3.1, 3.2, 3.3, 3.4, 3.5, 3.6
"""
confluence_signals: list[ConfluenceSignal] = []
for signal_type, tf_results in signal_results.items():
active_timeframes = list(tf_results.keys())
# 3.3 — Minimum confluence threshold: discard if < 2 timeframes
if len(active_timeframes) < MIN_TIMEFRAME_COUNT:
logger.debug(
"Signal %s discarded: only %d timeframe(s) triggered (need >= %d)",
signal_type,
len(active_timeframes),
MIN_TIMEFRAME_COUNT,
)
continue
# 3.4 — Higher-timeframe anchor: discard if none of D, W, M present
if not HIGHER_TIMEFRAME_ANCHORS.intersection(active_timeframes):
logger.debug(
"Signal %s discarded: no higher-timeframe anchor (D/W/M) "
"among active timeframes %s",
signal_type,
active_timeframes,
)
continue
# 3.2 — Compute weighted confluence score
per_timeframe: dict[str, float] = {}
confluence_score = 0.0
for tf, sr in tf_results.items():
w = weights.get(tf, 0.0)
per_timeframe[tf] = sr.strength
confluence_score += w * sr.strength
# Determine dominant direction across active timeframes
direction = _dominant_direction(tf_results)
confluence_signals.append(
ConfluenceSignal(
signal_type=signal_type,
direction=direction,
confluence_score=confluence_score,
active_timeframes=active_timeframes,
per_timeframe=per_timeframe,
)
)
logger.debug(
"Signal %s passed confluence: score=%.4f, direction=%s, "
"timeframes=%s",
signal_type,
confluence_score,
direction.value,
active_timeframes,
)
return confluence_signals