Files
Celes Renata f468e30af0
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: implement dual-pipeline signal engine service
New service at services/signal_engine/ implementing concurrent heuristic
(deterministic scoring) and probabilistic (Bayesian inference) pipelines
that evaluate technical signals across 6 timeframes (M30-M) and produce
independent BUY/WATCH/SKIP verdicts per ticker per evaluation tick.

Components:
- Input Normalizer: multi-source data assembly with sentinel fallbacks
- Signal Library: Fibonacci, MA Stack, RSI, Cup & Handle, Elliott Wave
- Multi-Timeframe Confluence Engine: weighted scoring with D/W/M anchors
- Hard Filter Engine: macro_bias, valuation, earnings proximity gating
- Heuristic Pipeline: S_total scoring with confidence-gated verdicts
- Probabilistic Pipeline: Bayesian log-odds with regime priors, entropy
  gating, EV_R calculation, and signal correlation penalty
- Exit Engine: stop-loss, targets, trailing ATR-based stops
- Delta Analyzer: pipeline agreement tracking with rolling Redis metrics
- Output Formatter: SignalOutput contract + Recommendation schema mapping
- Worker orchestrator: concurrent pipelines with failure isolation
- Main entry point: queue polling with fail-safe config loading

Infrastructure:
- Migration 039: signal_engine_outputs table with 3 indexes
- Helm chart: signalEngine service entry (processing tier)
- Redis key: QUEUE_SIGNAL_ENGINE constant

Tests: 390 tests (unit + property-based) covering all components
Config: dual_pipeline_enabled=false by default (safe rollout)
2026-05-02 07:32:26 +00:00

301 lines
8.9 KiB
Python

"""Top-level orchestrator for a single evaluation tick.
Coordinates input normalization, exit evaluation, hard filters, signal
evaluation, both pipelines (concurrent), delta analysis, output formatting,
persistence, and Redis queue publication.
Requirements: 11.1, 11.2, 11.3, 11.4, 11.5, 11.6
"""
from __future__ import annotations
import asyncio
import logging
import time
import asyncpg
import redis.asyncio
from services.aggregation.regime import classify_regime
from services.signal_engine.config import SignalEngineConfig
from services.signal_engine.confluence import compute_confluence
from services.signal_engine.delta import analyze_delta
from services.signal_engine.exit_engine import evaluate_exits
from services.signal_engine.formatter import (
format_output,
signal_output_to_recommendation,
)
from services.signal_engine.hard_filter import evaluate_hard_filters
from services.signal_engine.heuristic import run_heuristic_pipeline
from services.signal_engine.models import (
HeuristicResult,
NormalizedInput,
ProbabilisticResult,
SignalOutput,
SignalResult,
Verdict,
)
from services.signal_engine.normalizer import normalize_input
from services.signal_engine.persistence import persist_signal_output
from services.signal_engine.probabilistic import run_probabilistic_pipeline
from services.signal_engine.signals.cup_handle import CupHandleEvaluator
from services.signal_engine.signals.elliott_wave import ElliottWaveEvaluator
from services.signal_engine.signals.fibonacci import FibonacciEvaluator
from services.signal_engine.signals.ma_stack import MAStackEvaluator
from services.signal_engine.signals.rsi import RSIEvaluator
logger = logging.getLogger(__name__)
# Redis queue for trading decisions
_TRADING_QUEUE = "stonks:queue:trading_decisions"
# All signal evaluators
_EVALUATORS = [
FibonacciEvaluator(),
MAStackEvaluator(),
RSIEvaluator(),
CupHandleEvaluator(),
ElliottWaveEvaluator(),
]
# Default SKIP results used when a pipeline fails
_SKIP_HEURISTIC = HeuristicResult(
verdict=Verdict.SKIP,
confidence=0.0,
s_total=0.0,
s_company=0.0,
s_macro=0.0,
s_competitive=0.0,
signal_weights=[],
reasoning=["pipeline_error: heuristic pipeline raised an exception"],
)
_SKIP_PROBABILISTIC = ProbabilisticResult(
verdict=Verdict.SKIP,
p_up=0.5,
entropy=1.0,
ev_r=0.0,
prior=0.5,
posterior=0.5,
likelihood_ratios=[],
regime="uncertainty",
reasoning=["pipeline_error: probabilistic pipeline raised an exception"],
)
def _evaluate_signals(
normalized: NormalizedInput,
) -> dict[str, dict[str, SignalResult]]:
"""Run all signal evaluators across all timeframes.
Returns ``{signal_type: {timeframe: SignalResult}}`` for signals that
fired. Signals that returned ``None`` (insufficient data or no trigger)
are omitted.
"""
from services.signal_engine.normalizer import TIMEFRAMES
results: dict[str, dict[str, SignalResult]] = {}
for evaluator in _EVALUATORS:
for tf in TIMEFRAMES:
bars = normalized.bars.get(tf, [])
if not bars:
continue
try:
result = evaluator.evaluate(bars, tf)
except Exception:
logger.warning(
"Signal evaluator %s failed on %s/%s",
type(evaluator).__name__,
normalized.ticker,
tf,
exc_info=True,
)
continue
if result is not None:
results.setdefault(result.signal_type, {})[tf] = result
return results
async def evaluate_tick(
pool: asyncpg.Pool,
redis_client: redis.asyncio.Redis,
ticker: str,
config: SignalEngineConfig,
) -> SignalOutput | None:
"""Run a full evaluation tick for a single ticker.
Steps:
1. Normalize inputs (single fetch, shared reference)
2. Evaluate exit conditions for open positions
3. Run hard filters (short-circuit if filtered)
4. Evaluate signals across timeframes via Signal Library
5. Compute confluence
6. Classify regime via existing ``classify_regime()``
7. Run both pipelines concurrently via ``asyncio.gather``
8. Compute delta analysis
9. Format output
10. Persist to database and publish to Redis queue
Returns ``None`` if the ticker is hard-filtered or both pipelines fail.
Requirements: 11.1, 11.2, 11.3, 11.4, 11.5, 11.6
"""
tick_start = time.monotonic()
# Step 1: Normalize inputs
normalized = await normalize_input(pool, ticker, config)
# Step 2: Evaluate exit conditions (before pipelines — Req 8.6)
current_price = normalized.current_price or 0.0
exit_signals = evaluate_exits(
normalized.open_positions,
{ticker: current_price},
config.exit_config,
)
# Step 3: Hard filters
filter_result = evaluate_hard_filters(normalized, config.hard_filter_config)
if filter_result.filtered:
logger.info(
"Ticker %s hard-filtered: %s",
ticker,
", ".join(filter_result.reasons),
)
return None
# Step 4: Evaluate signals across timeframes
signal_results = _evaluate_signals(normalized)
# Step 5: Compute confluence
confluence_signals = compute_confluence(signal_results, config.timeframe_weights)
# Step 6: Classify regime
regime = classify_regime(normalized.closing_prices, normalized.returns)
# Step 7: Run both pipelines concurrently
heuristic_start = time.monotonic()
async def _run_heuristic() -> HeuristicResult:
return run_heuristic_pipeline(
normalized, confluence_signals, config.heuristic_config
)
async def _run_probabilistic() -> ProbabilisticResult:
return run_probabilistic_pipeline(
normalized, confluence_signals, regime, config.probabilistic_config
)
results = await asyncio.gather(
_run_heuristic(),
_run_probabilistic(),
return_exceptions=True,
)
pipeline_elapsed = time.monotonic() - heuristic_start
# Handle pipeline exceptions — SKIP verdict for failed pipeline
heuristic_result: HeuristicResult
probabilistic_result: ProbabilisticResult
if isinstance(results[0], BaseException):
logger.error(
"Heuristic pipeline failed for %s: %s",
ticker,
results[0],
exc_info=results[0],
)
heuristic_result = _SKIP_HEURISTIC
else:
heuristic_result = results[0]
if isinstance(results[1], BaseException):
logger.error(
"Probabilistic pipeline failed for %s: %s",
ticker,
results[1],
exc_info=results[1],
)
probabilistic_result = _SKIP_PROBABILISTIC
else:
probabilistic_result = results[1]
# If both pipelines failed, return None
if isinstance(results[0], BaseException) and isinstance(results[1], BaseException):
logger.error(
"Both pipelines failed for %s — skipping tick",
ticker,
)
return None
logger.info(
"Pipelines completed for %s in %.3fs — heuristic=%s, probabilistic=%s",
ticker,
pipeline_elapsed,
heuristic_result.verdict.value,
probabilistic_result.verdict.value,
)
# Step 8: Delta analysis
delta = await analyze_delta(
heuristic_result, probabilistic_result, redis_client, ticker
)
# Step 9: Format output
price = normalized.current_price or 0.0
output = format_output(
ticker,
price,
heuristic_result,
probabilistic_result,
delta,
exit_signals,
config,
)
# Step 10: Persist to database
await persist_signal_output(pool, output)
# Step 11: Publish to trading queue (only if at least one BUY and not shadow_mode)
has_buy = (
heuristic_result.verdict == Verdict.BUY
or probabilistic_result.verdict == Verdict.BUY
)
if has_buy and not config.shadow_mode:
try:
recommendation = signal_output_to_recommendation(output)
await redis_client.rpush(
_TRADING_QUEUE,
recommendation.model_dump_json(),
)
logger.info(
"Published trading recommendation for %s to %s",
ticker,
_TRADING_QUEUE,
)
except Exception:
logger.error(
"Failed to publish trading recommendation for %s",
ticker,
exc_info=True,
)
elif has_buy and config.shadow_mode:
logger.info(
"Shadow mode: BUY signal for %s persisted but not published to trading queue",
ticker,
)
# Log wall-clock execution time
tick_elapsed = time.monotonic() - tick_start
logger.info(
"Evaluation tick for %s completed in %.3fs",
ticker,
tick_elapsed,
)
return output