Celes Renata f468e30af0
feat: implement dual-pipeline signal engine service
New service at services/signal_engine/ implementing concurrent heuristic
(deterministic scoring) and probabilistic (Bayesian inference) pipelines
that evaluate technical signals across 6 timeframes (M30-M) and produce
independent BUY/WATCH/SKIP verdicts per ticker per evaluation tick.

Components:
- Input Normalizer: multi-source data assembly with sentinel fallbacks
- Signal Library: Fibonacci, MA Stack, RSI, Cup & Handle, Elliott Wave
- Multi-Timeframe Confluence Engine: weighted scoring with D/W/M anchors
- Hard Filter Engine: macro_bias, valuation, earnings proximity gating
- Heuristic Pipeline: S_total scoring with confidence-gated verdicts
- Probabilistic Pipeline: Bayesian log-odds with regime priors, entropy
  gating, EV_R calculation, and signal correlation penalty
- Exit Engine: stop-loss, targets, trailing ATR-based stops
- Delta Analyzer: pipeline agreement tracking with rolling Redis metrics
- Output Formatter: SignalOutput contract + Recommendation schema mapping
- Worker orchestrator: concurrent pipelines with failure isolation
- Main entry point: queue polling with fail-safe config loading
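
The commit describes the probabilistic pipeline only as "Bayesian log-odds with regime priors". The correlation-penalty module in this commit exists to stop stacked likelihood ratios from inflating that pipeline. The following is a minimal sketch that assumes the standard log-odds form of Bayes' rule (posterior log-odds = prior log-odds + sum of log LRs). It shows how halving a correlated signal's log-LR damps the posterior. The helpers `prob_to_log_odds`, `log_odds_to_prob` and `posterior` are hypothetical, as is the 0.5 prior. Regime priors, entropy gating and EV_R are not modelled.

```python
from __future__ import annotations

import math


def prob_to_log_odds(p: float) -> float:
    """Convert a probability in (0, 1) to natural-log odds."""
    return math.log(p / (1.0 - p))


def log_odds_to_prob(x: float) -> float:
    """Logistic (sigmoid) function: map log-odds back to a probability."""
    return 1.0 / (1.0 + math.exp(-x))


def posterior(prior_p: float, penalized_log_lrs: list[float]) -> float:
    """Hypothetical combiner: prior log-odds plus the sum of penalized log-LRs.

    Adding log-LRs treats each term as conditionally independent evidence,
    which is the assumption the within-cluster penalty is meant to soften.
    """
    return log_odds_to_prob(prob_to_log_odds(prior_p) + sum(penalized_log_lrs))


# Two momentum signals (LR 4.0 and LR 2.0) on a neutral 0.5 prior; illustrative numbers.
naive = posterior(0.5, [math.log(4.0), math.log(2.0)])
penalized = posterior(0.5, [math.log(4.0), 0.5 * math.log(2.0)])
print(f"naive={naive:.3f} penalized={penalized:.3f}")  # naive=0.889 penalized=0.850
```

In this example, halving the weaker momentum signal's log-LR lowers the posterior from about 0.889 to about 0.850. That is the kind of damping the penalty is meant to apply to correlated evidence.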

Infrastructure:
- Migration 039: signal_engine_outputs table with 3 indexes
- Helm chart: signalEngine service entry (processing tier)
- Redis key: QUEUE_SIGNAL_ENGINE constant

Tests: 390 tests (unit + property-based) covering all components
Config: dual_pipeline_enabled=false by default (safe rollout)
2026-05-02 07:32:26 +00:00


"""Signal cluster classification and within-cluster correlation penalty.

Groups signals into four clusters (momentum, structure, volatility,
fundamentals) and applies exponential decay within each cluster to prevent
stacked likelihood ratios from inflating the Bayesian pipeline's posterior.

Within a cluster the strongest signal (by ``|log_lr|``) contributes at full
weight; the *n*-th strongest contributes at ``0.5^(n-1)`` decay. Signals in
different clusters are treated as independent (no penalty). Single-signal
clusters receive no penalty.

Requirements: 7.1, 7.2, 7.3, 7.4
"""

from __future__ import annotations

import logging
from collections import defaultdict
from enum import Enum

from services.signal_engine.models import LikelihoodRatio

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Signal cluster enum
# ---------------------------------------------------------------------------


class SignalCluster(str, Enum):
    """Correlation cluster for grouping related signals."""

    MOMENTUM = "momentum"  # MA stack, RSI
    STRUCTURE = "structure"  # Fibonacci, Elliott Wave, Cup & Handle
    VOLATILITY = "volatility"  # ATR-based, Bollinger-derived
    FUNDAMENTALS = "fundamentals"  # valuation, earnings, macro


# ---------------------------------------------------------------------------
# Signal type → cluster mapping
# ---------------------------------------------------------------------------

_SIGNAL_CLUSTER_MAP: dict[str, SignalCluster] = {
    # Momentum
    "ma_stack": SignalCluster.MOMENTUM,
    "rsi": SignalCluster.MOMENTUM,
    # Structure
    "fibonacci": SignalCluster.STRUCTURE,
    "elliott_wave": SignalCluster.STRUCTURE,
    "cup_handle": SignalCluster.STRUCTURE,
    # Volatility
    "atr": SignalCluster.VOLATILITY,
    "bollinger": SignalCluster.VOLATILITY,
    # Fundamentals
    "valuation": SignalCluster.FUNDAMENTALS,
    "earnings": SignalCluster.FUNDAMENTALS,
    "macro": SignalCluster.FUNDAMENTALS,
}

# Decay factor applied to successive signals within the same cluster.
_WITHIN_CLUSTER_DECAY = 0.5


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


def classify_signal(signal_type: str) -> SignalCluster:
    """Map a signal type string to its correlation cluster.

    Falls back to :attr:`SignalCluster.FUNDAMENTALS` for unknown signal
    types so that unrecognised signals still participate in the penalty
    system rather than silently bypassing it.
    """
    cluster = _SIGNAL_CLUSTER_MAP.get(signal_type)
    if cluster is None:
        logger.warning(
            "Unknown signal type %r — defaulting to FUNDAMENTALS cluster",
            signal_type,
        )
        return SignalCluster.FUNDAMENTALS
    return cluster


def apply_correlation_penalty(
    likelihood_ratios: list[LikelihoodRatio],
) -> list[LikelihoodRatio]:
    """Apply within-cluster decay penalty to correlated signals.

    Algorithm:

    1. Group LRs by their ``cluster`` field.
    2. Within each cluster, sort by ``abs(log_lr)`` descending (strongest
       first). Ties keep their input order because :func:`sorted` is stable.
    3. The strongest signal keeps its full ``log_lr`` as
       ``penalized_log_lr``.
    4. The *n*-th signal (1-indexed) receives
       ``penalized_log_lr = log_lr * 0.5^(n-1)``.
    5. Single-signal clusters are untouched (``penalized_log_lr = log_lr``).
    6. Cross-cluster signals are independent; no penalty is applied across
       clusters.

    Returns a **new** list of :class:`LikelihoodRatio` instances, in the same
    order as the input, with updated ``penalized_log_lr`` values. The
    original objects are not mutated.
    """
    if not likelihood_ratios:
        return []

    # Group by cluster, remembering each LR's original position.
    clusters: dict[str, list[tuple[int, LikelihoodRatio]]] = defaultdict(list)
    for idx, lr in enumerate(likelihood_ratios):
        clusters[lr.cluster].append((idx, lr))

    # Build the result list, preserving the original order.
    result: list[LikelihoodRatio | None] = [None] * len(likelihood_ratios)
    for members in clusters.values():
        # Sort by abs(log_lr) descending (strongest first).
        sorted_members = sorted(members, key=lambda t: abs(t[1].log_lr), reverse=True)
        for rank, (orig_idx, lr) in enumerate(sorted_members):
            decay = _WITHIN_CLUSTER_DECAY**rank  # 0.5^0 = 1, 0.5^1 = 0.5, ...
            penalized = lr.log_lr * decay
            result[orig_idx] = LikelihoodRatio(
                signal_type=lr.signal_type,
                cluster=lr.cluster,
                lr=lr.lr,
                log_lr=lr.log_lr,
                penalized_log_lr=penalized,
                hit_rate=lr.hit_rate,
                strength=lr.strength,
            )

    # Safety: every index is assigned above, so this filter should never drop
    # anything; it also narrows the element type for static checkers.
    return [r for r in result if r is not None]
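
The decay rule can be exercised without the rest of the service. The following is a minimal, self-contained sketch. `services.signal_engine.models` is not shown, so it uses a hypothetical frozen dataclass stand-in for `LikelihoodRatio`, with only the fields this module reads and writes. It also uses `dataclasses.replace` instead of copying each field by hand. Cluster names are plain strings, and `make_lr` is a hypothetical helper.

```python
from __future__ import annotations

import math
from collections import defaultdict
from dataclasses import dataclass, replace


# Hypothetical stand-in for services.signal_engine.models.LikelihoodRatio.
@dataclass(frozen=True)
class LikelihoodRatio:
    signal_type: str
    cluster: str
    lr: float
    log_lr: float
    penalized_log_lr: float = 0.0
    hit_rate: float = 0.0
    strength: float = 0.0


DECAY = 0.5


def apply_correlation_penalty(lrs: list[LikelihoodRatio]) -> list[LikelihoodRatio]:
    """Same within-cluster decay rule as above, written against the stand-in."""
    groups: dict[str, list[int]] = defaultdict(list)
    for idx, item in enumerate(lrs):
        groups[item.cluster].append(idx)
    result = list(lrs)  # every slot is overwritten below; inputs stay untouched
    for indices in groups.values():
        # Strongest |log_lr| first; sorted() is stable, so ties keep input order.
        ranked = sorted(indices, key=lambda i: abs(lrs[i].log_lr), reverse=True)
        for rank, i in enumerate(ranked):
            result[i] = replace(lrs[i], penalized_log_lr=lrs[i].log_lr * DECAY**rank)
    return result


def make_lr(signal_type: str, cluster: str, lr: float) -> LikelihoodRatio:
    """Hypothetical helper: build an LR with log_lr = ln(lr)."""
    return LikelihoodRatio(signal_type, cluster, lr, math.log(lr))


inputs = [
    make_lr("rsi", "momentum", 2.0),
    make_lr("ma_stack", "momentum", 4.0),
    make_lr("fibonacci", "structure", 3.0),
]
out = apply_correlation_penalty(inputs)
for item in out:
    # rsi drops to about +0.347; ma_stack and fibonacci keep full weight.
    print(f"{item.signal_type:10s} log_lr={item.log_lr:+.3f} penalized={item.penalized_log_lr:+.3f}")
```

Members are ranked by magnitude, so a cluster whose strongest signal is `|l1|` contributes at most `|l1| * (1 + 0.5 + 0.25 + ...)`, which is less than `2 * |l1|` in absolute log-odds, however many correlated signals it holds. Without the penalty, `k` equally strong signals would contribute `k * |l1|`.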