f468e30af0
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
New service at services/signal_engine/ implementing concurrent heuristic (deterministic scoring) and probabilistic (Bayesian inference) pipelines that evaluate technical signals across 6 timeframes (M30-M) and produce independent BUY/WATCH/SKIP verdicts per ticker per evaluation tick. Components: - Input Normalizer: multi-source data assembly with sentinel fallbacks - Signal Library: Fibonacci, MA Stack, RSI, Cup & Handle, Elliott Wave - Multi-Timeframe Confluence Engine: weighted scoring with D/W/M anchors - Hard Filter Engine: macro_bias, valuation, earnings proximity gating - Heuristic Pipeline: S_total scoring with confidence-gated verdicts - Probabilistic Pipeline: Bayesian log-odds with regime priors, entropy gating, EV_R calculation, and signal correlation penalty - Exit Engine: stop-loss, targets, trailing ATR-based stops - Delta Analyzer: pipeline agreement tracking with rolling Redis metrics - Output Formatter: SignalOutput contract + Recommendation schema mapping - Worker orchestrator: concurrent pipelines with failure isolation - Main entry point: queue polling with fail-safe config loading Infrastructure: - Migration 039: signal_engine_outputs table with 3 indexes - Helm chart: signalEngine service entry (processing tier) - Redis key: QUEUE_SIGNAL_ENGINE constant Tests: 390 tests (unit + property-based) covering all components Config: dual_pipeline_enabled=false by default (safe rollout)
301 lines
8.9 KiB
Python
301 lines
8.9 KiB
Python
"""Top-level orchestrator for a single evaluation tick.
|
|
|
|
Coordinates input normalization, exit evaluation, hard filters, signal
|
|
evaluation, both pipelines (concurrent), delta analysis, output formatting,
|
|
persistence, and Redis queue publication.
|
|
|
|
Requirements: 11.1, 11.2, 11.3, 11.4, 11.5, 11.6
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
|
|
import asyncpg
|
|
import redis.asyncio
|
|
|
|
from services.aggregation.regime import classify_regime
|
|
from services.signal_engine.config import SignalEngineConfig
|
|
from services.signal_engine.confluence import compute_confluence
|
|
from services.signal_engine.delta import analyze_delta
|
|
from services.signal_engine.exit_engine import evaluate_exits
|
|
from services.signal_engine.formatter import (
|
|
format_output,
|
|
signal_output_to_recommendation,
|
|
)
|
|
from services.signal_engine.hard_filter import evaluate_hard_filters
|
|
from services.signal_engine.heuristic import run_heuristic_pipeline
|
|
from services.signal_engine.models import (
|
|
HeuristicResult,
|
|
NormalizedInput,
|
|
ProbabilisticResult,
|
|
SignalOutput,
|
|
SignalResult,
|
|
Verdict,
|
|
)
|
|
from services.signal_engine.normalizer import normalize_input
|
|
from services.signal_engine.persistence import persist_signal_output
|
|
from services.signal_engine.probabilistic import run_probabilistic_pipeline
|
|
from services.signal_engine.signals.cup_handle import CupHandleEvaluator
|
|
from services.signal_engine.signals.elliott_wave import ElliottWaveEvaluator
|
|
from services.signal_engine.signals.fibonacci import FibonacciEvaluator
|
|
from services.signal_engine.signals.ma_stack import MAStackEvaluator
|
|
from services.signal_engine.signals.rsi import RSIEvaluator
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Redis list that serialized trading recommendations are pushed onto.
_TRADING_QUEUE = "stonks:queue:trading_decisions"

# One shared instance of each signal evaluator, kept in this fixed order;
# ``_evaluate_signals`` iterates the list as-is.
_EVALUATORS = [
    evaluator_cls()
    for evaluator_cls in (
        FibonacciEvaluator,
        MAStackEvaluator,
        RSIEvaluator,
        CupHandleEvaluator,
        ElliottWaveEvaluator,
    )
]

# Fallback results substituted when a pipeline raises.
# NOTE(review): these are single shared instances that hold list fields; if
# any downstream consumer mutates ``reasoning``/``signal_weights`` in place the
# change would be visible on later ticks — confirm the models are treated as
# read-only.
_SKIP_HEURISTIC = HeuristicResult(
    verdict=Verdict.SKIP,
    confidence=0.0,
    s_total=0.0,
    s_company=0.0,
    s_macro=0.0,
    s_competitive=0.0,
    signal_weights=[],
    reasoning=["pipeline_error: heuristic pipeline raised an exception"],
)

_SKIP_PROBABILISTIC = ProbabilisticResult(
    verdict=Verdict.SKIP,
    p_up=0.5,
    entropy=1.0,
    ev_r=0.0,
    prior=0.5,
    posterior=0.5,
    likelihood_ratios=[],
    regime="uncertainty",
    reasoning=["pipeline_error: probabilistic pipeline raised an exception"],
)
|
|
|
|
|
|
def _evaluate_signals(
    normalized: NormalizedInput,
) -> dict[str, dict[str, SignalResult]]:
    """Apply every evaluator in ``_EVALUATORS`` to each timeframe that has bars.

    Timeframes whose entry in ``normalized.bars`` is missing or empty are
    skipped.  An evaluator that raises is logged at WARNING level (with
    traceback) and skipped for that timeframe only; the remaining
    evaluator/timeframe pairs are still processed.

    Returns a nested mapping ``{signal_type: {timeframe: SignalResult}}``
    containing only the evaluations that produced a result; ``None`` results
    are left out.
    """
    # Resolved at call time rather than at module import.
    from services.signal_engine.normalizer import TIMEFRAMES

    fired: dict[str, dict[str, SignalResult]] = {}

    for signal_evaluator in _EVALUATORS:
        for timeframe in TIMEFRAMES:
            timeframe_bars = normalized.bars.get(timeframe, [])
            if not timeframe_bars:
                continue

            try:
                outcome = signal_evaluator.evaluate(timeframe_bars, timeframe)
            except Exception:
                logger.warning(
                    "Signal evaluator %s failed on %s/%s",
                    type(signal_evaluator).__name__,
                    normalized.ticker,
                    timeframe,
                    exc_info=True,
                )
            else:
                # Results are keyed by the signal type the result itself
                # reports, then by timeframe.
                if outcome is not None:
                    fired.setdefault(outcome.signal_type, {})[timeframe] = outcome

    return fired
|
|
|
|
|
|
async def evaluate_tick(
    pool: asyncpg.Pool,
    redis_client: redis.asyncio.Redis,
    ticker: str,
    config: SignalEngineConfig,
) -> SignalOutput | None:
    """Evaluate one ticker end to end and return its formatted output.

    The tick proceeds in this order:

    1. Normalize inputs for ``ticker``.
    2. Evaluate exit conditions for the ticker's open positions.
    3. Apply hard filters; a filtered ticker ends the tick immediately.
    4. Evaluate all signals across timeframes.
    5. Compute confluence from the signal results.
    6. Classify the regime via ``classify_regime()``.
    7. Run the heuristic and probabilistic pipelines through
       ``asyncio.gather`` with ``return_exceptions=True``; a pipeline that
       raises is replaced by its SKIP fallback result.
    8. Compute the delta analysis between the two pipeline results.
    9. Format the output.
    10. Persist the output to the database.
    11. Publish a recommendation to the trading queue when at least one
        pipeline says BUY and ``config.shadow_mode`` is off.  A failure
        while publishing is logged and does not abort the tick.

    Returns:
        The formatted ``SignalOutput``, or ``None`` when the ticker is
        hard-filtered or when both pipelines raised.

    Exceptions raised by normalization, exit evaluation, hard filters, signal
    confluence, regime classification, delta analysis, formatting or
    persistence are not caught here and propagate to the caller.

    Requirements: 11.1, 11.2, 11.3, 11.4, 11.5, 11.6
    """
    started_at = time.monotonic()

    # Step 1: gather and normalize everything the later steps read.
    normalized = await normalize_input(pool, ticker, config)

    # Step 2: exit conditions are evaluated before the pipelines (Req 8.6).
    # NOTE(review): a missing price falls back to 0.0 here — confirm that
    # evaluate_exits treats 0.0 as "no price" rather than as a real quote.
    exit_price = normalized.current_price or 0.0
    exit_signals = evaluate_exits(
        normalized.open_positions,
        {ticker: exit_price},
        config.exit_config,
    )

    # Step 3: hard filters short-circuit the tick.
    # NOTE(review): exit_signals computed above are dropped on this path —
    # confirm that is intended for tickers with open positions.
    gate = evaluate_hard_filters(normalized, config.hard_filter_config)
    if gate.filtered:
        logger.info(
            "Ticker %s hard-filtered: %s",
            ticker,
            ", ".join(gate.reasons),
        )
        return None

    # Steps 4-5: per-timeframe signals, then their confluence.
    per_signal_results = _evaluate_signals(normalized)
    confluence_signals = compute_confluence(
        per_signal_results, config.timeframe_weights
    )

    # Step 6: regime classification feeds the probabilistic pipeline.
    regime = classify_regime(normalized.closing_prices, normalized.returns)

    # Step 7: run both pipelines with failure isolation.
    # NOTE(review): the pipeline functions are called without ``await``, so
    # they appear to be synchronous; if so, gather executes them one after the
    # other and only provides exception capture — confirm.
    pipelines_started_at = time.monotonic()

    async def _heuristic_task() -> HeuristicResult:
        return run_heuristic_pipeline(
            normalized, confluence_signals, config.heuristic_config
        )

    async def _probabilistic_task() -> ProbabilisticResult:
        return run_probabilistic_pipeline(
            normalized, confluence_signals, regime, config.probabilistic_config
        )

    heuristic_outcome, probabilistic_outcome = await asyncio.gather(
        _heuristic_task(),
        _probabilistic_task(),
        return_exceptions=True,
    )

    pipeline_elapsed = time.monotonic() - pipelines_started_at

    # With return_exceptions=True a failed pipeline shows up as an exception
    # object in its result slot instead of being raised.
    heuristic_failed = isinstance(heuristic_outcome, BaseException)
    probabilistic_failed = isinstance(probabilistic_outcome, BaseException)

    heuristic_result: HeuristicResult
    if heuristic_failed:
        logger.error(
            "Heuristic pipeline failed for %s: %s",
            ticker,
            heuristic_outcome,
            exc_info=heuristic_outcome,
        )
        heuristic_result = _SKIP_HEURISTIC
    else:
        heuristic_result = heuristic_outcome

    probabilistic_result: ProbabilisticResult
    if probabilistic_failed:
        logger.error(
            "Probabilistic pipeline failed for %s: %s",
            ticker,
            probabilistic_outcome,
            exc_info=probabilistic_outcome,
        )
        probabilistic_result = _SKIP_PROBABILISTIC
    else:
        probabilistic_result = probabilistic_outcome

    # Nothing useful to report when neither pipeline produced a result.
    if heuristic_failed and probabilistic_failed:
        logger.error(
            "Both pipelines failed for %s — skipping tick",
            ticker,
        )
        return None

    logger.info(
        "Pipelines completed for %s in %.3fs — heuristic=%s, probabilistic=%s",
        ticker,
        pipeline_elapsed,
        heuristic_result.verdict.value,
        probabilistic_result.verdict.value,
    )

    # Step 8: compare the two pipeline results.
    delta = await analyze_delta(
        heuristic_result, probabilistic_result, redis_client, ticker
    )

    # Step 9: build the output contract (price again falls back to 0.0).
    output_price = normalized.current_price or 0.0
    output = format_output(
        ticker,
        output_price,
        heuristic_result,
        probabilistic_result,
        delta,
        exit_signals,
        config,
    )

    # Step 10: persist; a failure here propagates and skips publication.
    await persist_signal_output(pool, output)

    # Step 11: publish only when at least one pipeline says BUY and shadow
    # mode is off.
    has_buy = any(
        pipeline_result.verdict == Verdict.BUY
        for pipeline_result in (heuristic_result, probabilistic_result)
    )

    if has_buy:
        if config.shadow_mode:
            logger.info(
                "Shadow mode: BUY signal for %s persisted but not published to trading queue",
                ticker,
            )
        else:
            # Publication is best-effort: the output is already persisted, so
            # a queue failure is logged instead of failing the tick.
            try:
                recommendation = signal_output_to_recommendation(output)
                await redis_client.rpush(
                    _TRADING_QUEUE,
                    recommendation.model_dump_json(),
                )
                logger.info(
                    "Published trading recommendation for %s to %s",
                    ticker,
                    _TRADING_QUEUE,
                )
            except Exception:
                logger.error(
                    "Failed to publish trading recommendation for %s",
                    ticker,
                    exc_info=True,
                )

    # Wall-clock duration of the whole tick.
    tick_elapsed = time.monotonic() - started_at
    logger.info(
        "Evaluation tick for %s completed in %.3fs",
        ticker,
        tick_elapsed,
    )

    return output
|