"""Signal engine entry point — asyncio event loop and queue polling. Connects to PostgreSQL and Redis, loads configuration from ``risk_configs``, and polls the ``stonks:queue:signal_engine`` queue indefinitely. Each queue message triggers a full evaluation tick via ``evaluate_tick()``. When ``dual_pipeline_enabled`` is ``False`` the worker sleeps and retries (fail-safe: the existing pipeline continues unchanged). Requirements: 13.1, 13.6, 13.7, 16.1, 16.6 """ from __future__ import annotations import asyncio import json import logging import sys import asyncpg import redis.asyncio from services.shared.config import load_config as load_app_config from services.shared.redis_keys import QUEUE_SIGNAL_ENGINE, queue_key from services.signal_engine.config import load_config as load_signal_config from services.signal_engine.worker import evaluate_tick logger = logging.getLogger(__name__) # BLPOP timeout in seconds — how long to wait for a queue message before # looping back to check the enabled flag. _BLPOP_TIMEOUT = 5 async def main() -> None: """Start the signal engine worker loop. 1. Connect to PostgreSQL (asyncpg pool) using env vars from ``services.shared.config``. 2. Connect to Redis (redis.asyncio) using env vars. 3. Load signal engine config via ``load_config(pool)``. 4. Log active configuration at startup. 5. Poll ``stonks:queue:signal_engine`` queue indefinitely (BLPOP). 6. Check ``dual_pipeline_enabled`` flag; if disabled, sleep and retry. 7. On config read failure, default to disabled (fail-safe). 8. Parse queue message as JSON: ``{"ticker": "AAPL", "triggered_at": "..."}``. 9. Call ``evaluate_tick(pool, redis, ticker, config)`` for each message. Requirements: 13.1, 13.6, 13.7, 16.1, 16.6 """ # --- Setup logging --- logging.basicConfig( level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s", stream=sys.stdout, ) logger.info("Signal engine starting up") # --- Load shared app config for connection details --- app_config = load_app_config() # --- Connect to PostgreSQL --- pool = await asyncpg.create_pool( dsn=app_config.postgres.dsn, min_size=2, max_size=10, ) logger.info("Connected to PostgreSQL at %s", app_config.postgres.host) # --- Connect to Redis --- redis_client = redis.asyncio.from_url( app_config.redis.url, decode_responses=True, ) logger.info("Connected to Redis at %s", app_config.redis.host) # --- Load signal engine config --- try: config = await load_signal_config(pool) except Exception: logger.warning( "Failed to load signal engine config at startup — " "defaulting to disabled (fail-safe)", exc_info=True, ) from services.signal_engine.config import SignalEngineConfig config = SignalEngineConfig() # dual_pipeline_enabled=False logger.info( "Signal engine config: dual_pipeline_enabled=%s, " "heuristic=%s, probabilistic=%s, shadow_mode=%s, " "polling_interval=%ds", config.dual_pipeline_enabled, config.heuristic_pipeline_enabled, config.probabilistic_pipeline_enabled, config.shadow_mode, config.polling_interval_seconds, ) # --- Queue key --- signal_queue = queue_key(QUEUE_SIGNAL_ENGINE) logger.info("Polling queue: %s", signal_queue) # --- Main loop --- try: while True: # Check if dual pipeline is enabled if not config.dual_pipeline_enabled: logger.debug( "Dual pipeline disabled — sleeping %ds before retry", config.polling_interval_seconds, ) await asyncio.sleep(config.polling_interval_seconds) # Reload config to pick up flag changes try: config = await load_signal_config(pool) except Exception: logger.warning( "Failed to reload signal engine config — " "keeping disabled (fail-safe)", exc_info=True, ) continue # BLPOP: blocking pop from the signal engine queue try: result = await redis_client.blpop( signal_queue, timeout=_BLPOP_TIMEOUT, ) except Exception: logger.warning( "Redis BLPOP failed — sleeping before retry", exc_info=True, ) await asyncio.sleep(5) continue if result is None: # Timeout — no message, loop back continue # result is (queue_name, message) _, raw_message = result # Parse the queue message try: message = json.loads(raw_message) ticker = message["ticker"] except (json.JSONDecodeError, KeyError, TypeError): logger.warning( "Invalid queue message — skipping: %s", raw_message, ) continue logger.info("Processing evaluation tick for %s", ticker) # Run the evaluation tick try: await evaluate_tick(pool, redis_client, ticker, config) except Exception: logger.error( "Unhandled error in evaluate_tick for %s", ticker, exc_info=True, ) except KeyboardInterrupt: logger.info("Signal engine shutting down (KeyboardInterrupt)") finally: await pool.close() await redis_client.aclose() logger.info("Signal engine shut down") if __name__ == "__main__": asyncio.run(main())