Files
Celes Renata f468e30af0
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: implement dual-pipeline signal engine service
New service at services/signal_engine/ implementing concurrent heuristic
(deterministic scoring) and probabilistic (Bayesian inference) pipelines
that evaluate technical signals across 6 timeframes (M30-M) and produce
independent BUY/WATCH/SKIP verdicts per ticker per evaluation tick.

Components:
- Input Normalizer: multi-source data assembly with sentinel fallbacks
- Signal Library: Fibonacci, MA Stack, RSI, Cup & Handle, Elliott Wave
- Multi-Timeframe Confluence Engine: weighted scoring with D/W/M anchors
- Hard Filter Engine: macro_bias, valuation, earnings proximity gating
- Heuristic Pipeline: S_total scoring with confidence-gated verdicts
- Probabilistic Pipeline: Bayesian log-odds with regime priors, entropy
  gating, EV_R calculation, and signal correlation penalty
- Exit Engine: stop-loss, targets, trailing ATR-based stops
- Delta Analyzer: pipeline agreement tracking with rolling Redis metrics
- Output Formatter: SignalOutput contract + Recommendation schema mapping
- Worker orchestrator: concurrent pipelines with failure isolation
- Main entry point: queue polling with fail-safe config loading

Infrastructure:
- Migration 039: signal_engine_outputs table with 3 indexes
- Helm chart: signalEngine service entry (processing tier)
- Redis key: QUEUE_SIGNAL_ENGINE constant

Tests: 390 tests (unit + property-based) covering all components
Config: dual_pipeline_enabled=false by default (safe rollout)
2026-05-02 07:32:26 +00:00

234 lines
8.1 KiB
Python

"""Output Formatter — assembles the structured SignalOutput contract.
Populates trade plans based on verdict combinations and maps
``SignalOutput`` to the existing ``Recommendation`` schema for
trading engine compatibility.
Requirements: 10.1, 10.2, 10.3, 10.4, 10.5, 10.6, 12.1, 12.2, 12.3, 12.4, 12.5
"""
from __future__ import annotations
from datetime import datetime, timezone
from services.shared.schemas import (
ActionType,
PositionSizing,
Recommendation,
RecommendationMode,
)
from services.signal_engine.config import SignalEngineConfig
from services.signal_engine.models import (
DeltaResult,
ExitSignal,
HeuristicResult,
ProbabilisticResult,
SignalOutput,
TradePlan,
Verdict,
)
# ---------------------------------------------------------------------------
# Position sizing constants
# ---------------------------------------------------------------------------
# Full position sizing (heuristic-only or dual confirmed)
_FULL_POSITION_SIZE_PCT = 0.02
_FULL_MAX_LOSS_PCT = 0.005
# Reduced position sizing for probabilistic-only BUY (50% of standard)
_REDUCED_POSITION_SIZE_PCT = 0.01
# Trade plan price levels (relative to entry)
_STOP_LOSS_FACTOR = 0.95
_TARGET_1_FACTOR = 1.05
_TARGET_2_FACTOR = 1.10
def _build_trade_plan(
price: float,
*,
dual_confirmed: bool = False,
probabilistic_only: bool = False,
) -> TradePlan:
"""Build a trade plan with position sizing based on confirmation mode.
- dual_confirmed: full position sizing with dual_confirmed flag
- probabilistic_only: 50% position sizing with probabilistic_only flag
- heuristic-only (neither flag): standard full position sizing
"""
if dual_confirmed:
position_size_pct = _FULL_POSITION_SIZE_PCT
max_loss_pct = _FULL_MAX_LOSS_PCT
elif probabilistic_only:
position_size_pct = _REDUCED_POSITION_SIZE_PCT
max_loss_pct = _FULL_MAX_LOSS_PCT
else:
# Heuristic-only BUY
position_size_pct = _FULL_POSITION_SIZE_PCT
max_loss_pct = _FULL_MAX_LOSS_PCT
return TradePlan(
entry_price=price,
stop_loss=round(price * _STOP_LOSS_FACTOR, 6),
target_1=round(price * _TARGET_1_FACTOR, 6),
target_2=round(price * _TARGET_2_FACTOR, 6),
position_size_pct=position_size_pct,
max_loss_pct=max_loss_pct,
dual_confirmed=dual_confirmed,
probabilistic_only=probabilistic_only,
)
def format_output(
ticker: str,
price: float,
heuristic: HeuristicResult,
probabilistic: ProbabilisticResult,
delta: DeltaResult,
exit_signals: list[ExitSignal],
config: SignalEngineConfig,
) -> SignalOutput:
"""Assemble the structured ``SignalOutput`` contract.
Trade plan logic:
- Both BUY → ``dual_confirmed``, full position sizing
- Probabilistic-only BUY → ``probabilistic_only``, 50% position sizing
- Heuristic-only BUY → standard position sizing
- No BUY → no trade_plan (WATCH/SKIP persisted for analysis)
"""
heuristic_buy = heuristic.verdict == Verdict.BUY
probabilistic_buy = probabilistic.verdict == Verdict.BUY
trade_plan: TradePlan | None = None
if heuristic_buy and probabilistic_buy:
# Both pipelines agree on BUY → dual confirmed
trade_plan = _build_trade_plan(
price, dual_confirmed=True, probabilistic_only=False
)
elif probabilistic_buy and not heuristic_buy:
# Probabilistic-only BUY → reduced position sizing
trade_plan = _build_trade_plan(
price, dual_confirmed=False, probabilistic_only=True
)
elif heuristic_buy and not probabilistic_buy:
# Heuristic-only BUY → standard position sizing
trade_plan = _build_trade_plan(
price, dual_confirmed=False, probabilistic_only=False
)
# else: No BUY → no trade_plan
return SignalOutput(
ticker=ticker,
timestamp=datetime.now(tz=timezone.utc),
price=price,
# Heuristic pipeline section
heuristic_verdict=heuristic.verdict.value,
heuristic_confidence=heuristic.confidence,
heuristic_s_total=heuristic.s_total,
# Probabilistic pipeline section
probabilistic_verdict=probabilistic.verdict.value,
probabilistic_p_up=probabilistic.p_up,
probabilistic_entropy=probabilistic.entropy,
probabilistic_ev_r=probabilistic.ev_r,
# Delta analysis section
delta_agreement=delta.agreement,
delta_confidence_delta=delta.confidence_delta,
delta_reasons=delta.disagreement_reasons,
# Trade plan and exit signals
trade_plan=trade_plan,
exit_signals=exit_signals,
# Detail payloads for audit
heuristic_detail=heuristic.model_dump(),
probabilistic_detail=probabilistic.model_dump(),
# Pipeline mode metadata
pipeline_mode="dual_pipeline",
shadow_mode=config.shadow_mode,
)
def signal_output_to_recommendation(output: SignalOutput) -> Recommendation:
"""Map a ``SignalOutput`` to the existing ``Recommendation`` schema.
Enables the trading engine to consume dual-pipeline outputs without
modification to its core ``evaluate_recommendation`` logic.
Confidence mapping:
- Dual confirmed: ``max(heuristic_confidence, probabilistic_P_up)``
- Probabilistic only: ``probabilistic_P_up * 0.8`` (20% haircut)
- Heuristic only: ``heuristic_confidence``
- No BUY: ``max(heuristic_confidence, probabilistic_P_up)``
Action mapping:
- BUY (either pipeline) → ``ActionType.BUY``
- WATCH → ``ActionType.WATCH``
- SKIP → ``ActionType.HOLD``
Mode: always ``RecommendationMode.PAPER_ELIGIBLE``
"""
trade_plan = output.trade_plan
# Determine confidence based on confirmation mode
if trade_plan is not None and trade_plan.dual_confirmed:
confidence = max(output.heuristic_confidence, output.probabilistic_p_up)
elif trade_plan is not None and trade_plan.probabilistic_only:
confidence = output.probabilistic_p_up * 0.8
elif trade_plan is not None:
# Heuristic-only BUY
confidence = output.heuristic_confidence
else:
# No trade plan — use the best available confidence
confidence = max(output.heuristic_confidence, output.probabilistic_p_up)
# Clamp confidence to [0, 1]
confidence = max(0.0, min(1.0, confidence))
# Determine action from verdicts
h_verdict = output.heuristic_verdict
p_verdict = output.probabilistic_verdict
if h_verdict == Verdict.BUY.value or p_verdict == Verdict.BUY.value:
action = ActionType.BUY
elif h_verdict == Verdict.WATCH.value or p_verdict == Verdict.WATCH.value:
action = ActionType.WATCH
else:
action = ActionType.HOLD
# Build position sizing from trade plan if available
position_sizing = PositionSizing()
if trade_plan is not None:
position_sizing = PositionSizing(
portfolio_pct=trade_plan.position_size_pct,
max_loss_pct=trade_plan.max_loss_pct,
)
# Build thesis from delta analysis
thesis_parts: list[str] = []
if trade_plan is not None and trade_plan.dual_confirmed:
thesis_parts.append("Dual-pipeline confirmed BUY signal")
elif trade_plan is not None and trade_plan.probabilistic_only:
thesis_parts.append("Probabilistic-only BUY signal (reduced sizing)")
elif trade_plan is not None:
thesis_parts.append("Heuristic-only BUY signal")
else:
thesis_parts.append(f"No BUY signal (H={h_verdict}, P={p_verdict})")
if output.delta_reasons:
thesis_parts.append(f"Delta reasons: {', '.join(output.delta_reasons)}")
return Recommendation(
recommendation_id=output.output_id,
ticker=output.ticker,
action=action,
mode=RecommendationMode.PAPER_ELIGIBLE,
confidence=confidence,
time_horizon="signal_engine",
thesis="; ".join(thesis_parts),
position_sizing=position_sizing,
pipeline_mode="dual_pipeline",
p_bull=output.probabilistic_p_up,
expected_value=output.probabilistic_ev_r,
generated_at=output.timestamp,
)