Files
Celes Renata f468e30af0
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: implement dual-pipeline signal engine service
New service at services/signal_engine/ implementing concurrent heuristic
(deterministic scoring) and probabilistic (Bayesian inference) pipelines
that evaluate technical signals across 6 timeframes (M30-M) and produce
independent BUY/WATCH/SKIP verdicts per ticker per evaluation tick.

Components:
- Input Normalizer: multi-source data assembly with sentinel fallbacks
- Signal Library: Fibonacci, MA Stack, RSI, Cup & Handle, Elliott Wave
- Multi-Timeframe Confluence Engine: weighted scoring with D/W/M anchors
- Hard Filter Engine: macro_bias, valuation, earnings proximity gating
- Heuristic Pipeline: S_total scoring with confidence-gated verdicts
- Probabilistic Pipeline: Bayesian log-odds with regime priors, entropy
  gating, EV_R calculation, and signal correlation penalty
- Exit Engine: stop-loss, targets, trailing ATR-based stops
- Delta Analyzer: pipeline agreement tracking with rolling Redis metrics
- Output Formatter: SignalOutput contract + Recommendation schema mapping
- Worker orchestrator: concurrent pipelines with failure isolation
- Main entry point: queue polling with fail-safe config loading

Infrastructure:
- Migration 039: signal_engine_outputs table with 3 indexes
- Helm chart: signalEngine service entry (processing tier)
- Redis key: QUEUE_SIGNAL_ENGINE constant

Tests: 390 tests (unit + property-based) covering all components
Config: dual_pipeline_enabled=false by default (safe rollout)
2026-05-02 07:32:26 +00:00

181 lines
5.9 KiB
Python

"""Signal engine entry point — asyncio event loop and queue polling.
Connects to PostgreSQL and Redis, loads configuration from ``risk_configs``,
and polls the ``stonks:queue:signal_engine`` queue indefinitely. Each
queue message triggers a full evaluation tick via ``evaluate_tick()``.
When ``dual_pipeline_enabled`` is ``False`` the worker sleeps and retries
(fail-safe: the existing pipeline continues unchanged).
Requirements: 13.1, 13.6, 13.7, 16.1, 16.6
"""
from __future__ import annotations
import asyncio
import json
import logging
import sys
import asyncpg
import redis.asyncio
from services.shared.config import load_config as load_app_config
from services.shared.redis_keys import QUEUE_SIGNAL_ENGINE, queue_key
from services.signal_engine.config import load_config as load_signal_config
from services.signal_engine.worker import evaluate_tick
logger = logging.getLogger(__name__)
# BLPOP timeout in seconds — how long to wait for a queue message before
# looping back to check the enabled flag.
_BLPOP_TIMEOUT = 5
async def main() -> None:
"""Start the signal engine worker loop.
1. Connect to PostgreSQL (asyncpg pool) using env vars from
``services.shared.config``.
2. Connect to Redis (redis.asyncio) using env vars.
3. Load signal engine config via ``load_config(pool)``.
4. Log active configuration at startup.
5. Poll ``stonks:queue:signal_engine`` queue indefinitely (BLPOP).
6. Check ``dual_pipeline_enabled`` flag; if disabled, sleep and retry.
7. On config read failure, default to disabled (fail-safe).
8. Parse queue message as JSON: ``{"ticker": "AAPL", "triggered_at": "..."}``.
9. Call ``evaluate_tick(pool, redis, ticker, config)`` for each message.
Requirements: 13.1, 13.6, 13.7, 16.1, 16.6
"""
# --- Setup logging ---
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
stream=sys.stdout,
)
logger.info("Signal engine starting up")
# --- Load shared app config for connection details ---
app_config = load_app_config()
# --- Connect to PostgreSQL ---
pool = await asyncpg.create_pool(
dsn=app_config.postgres.dsn,
min_size=2,
max_size=10,
)
logger.info("Connected to PostgreSQL at %s", app_config.postgres.host)
# --- Connect to Redis ---
redis_client = redis.asyncio.from_url(
app_config.redis.url,
decode_responses=True,
)
logger.info("Connected to Redis at %s", app_config.redis.host)
# --- Load signal engine config ---
try:
config = await load_signal_config(pool)
except Exception:
logger.warning(
"Failed to load signal engine config at startup — "
"defaulting to disabled (fail-safe)",
exc_info=True,
)
from services.signal_engine.config import SignalEngineConfig
config = SignalEngineConfig() # dual_pipeline_enabled=False
logger.info(
"Signal engine config: dual_pipeline_enabled=%s, "
"heuristic=%s, probabilistic=%s, shadow_mode=%s, "
"polling_interval=%ds",
config.dual_pipeline_enabled,
config.heuristic_pipeline_enabled,
config.probabilistic_pipeline_enabled,
config.shadow_mode,
config.polling_interval_seconds,
)
# --- Queue key ---
signal_queue = queue_key(QUEUE_SIGNAL_ENGINE)
logger.info("Polling queue: %s", signal_queue)
# --- Main loop ---
try:
while True:
# Check if dual pipeline is enabled
if not config.dual_pipeline_enabled:
logger.debug(
"Dual pipeline disabled — sleeping %ds before retry",
config.polling_interval_seconds,
)
await asyncio.sleep(config.polling_interval_seconds)
# Reload config to pick up flag changes
try:
config = await load_signal_config(pool)
except Exception:
logger.warning(
"Failed to reload signal engine config — "
"keeping disabled (fail-safe)",
exc_info=True,
)
continue
# BLPOP: blocking pop from the signal engine queue
try:
result = await redis_client.blpop(
signal_queue,
timeout=_BLPOP_TIMEOUT,
)
except Exception:
logger.warning(
"Redis BLPOP failed — sleeping before retry",
exc_info=True,
)
await asyncio.sleep(5)
continue
if result is None:
# Timeout — no message, loop back
continue
# result is (queue_name, message)
_, raw_message = result
# Parse the queue message
try:
message = json.loads(raw_message)
ticker = message["ticker"]
except (json.JSONDecodeError, KeyError, TypeError):
logger.warning(
"Invalid queue message — skipping: %s",
raw_message,
)
continue
logger.info("Processing evaluation tick for %s", ticker)
# Run the evaluation tick
try:
await evaluate_tick(pool, redis_client, ticker, config)
except Exception:
logger.error(
"Unhandled error in evaluate_tick for %s",
ticker,
exc_info=True,
)
except KeyboardInterrupt:
logger.info("Signal engine shutting down (KeyboardInterrupt)")
finally:
await pool.close()
await redis_client.aclose()
logger.info("Signal engine shut down")
if __name__ == "__main__":
asyncio.run(main())