"""Aggregation worker entrypoint - polls Redis for aggregation jobs.

After computing trend summaries for a ticker, the worker also triggers
competitive signal propagation for the ticker's competitors when the
competitive layer is enabled. This ensures that document intelligence
for one company produces competitive signals for related companies.

Requirements: 4.1, 5.1, 9.4
"""

from __future__ import annotations

import asyncio
import json
import logging

import asyncpg
import redis.asyncio as aioredis

from services.aggregation.signal_propagation import propagate_signals
from services.aggregation.worker import aggregate_company, fetch_competitive_enabled
from services.shared.config import CompetitiveConfig, load_config
from services.shared.logging import inject_trace_context, setup_logging
from services.shared.redis_keys import (
    QUEUE_AGGREGATION,
    QUEUE_RECOMMENDATION,
    queue_key,
)

logger = logging.getLogger("aggregation_main")

# ---------------------------------------------------------------------------
# Query to fetch recent document intelligence records for a ticker.
# Used to trigger signal propagation after aggregation completes.
# ---------------------------------------------------------------------------
_RECENT_INTELLIGENCE_QUERY = """
    SELECT di.document_id, dir.catalyst_type, dir.impact_score
    FROM document_impact_records dir
    JOIN document_intelligence di ON di.id = dir.intelligence_id
    JOIN documents d ON d.id = di.document_id
    WHERE dir.ticker = $1
      AND di.validation_status = 'valid'
      AND d.status != 'rejected'
    ORDER BY d.published_at DESC
    LIMIT 50
"""

# Track consecutive propagation failures for alerting (Requirement 9.4)
_propagation_consecutive_failures = 0


async def _trigger_signal_propagation(
    pool: asyncpg.Pool,
    ticker: str,
    competitive_config: CompetitiveConfig,
) -> int:
    """Trigger competitive signal propagation for a ticker's recent documents.

    Fetches recent document intelligence records for the ticker and calls
    propagate_signals for each, producing competitive signals for the
    ticker's competitors.

    Rows whose impact score is missing, zero or negative are skipped. A
    failure on one row is logged and counted in the module-level
    consecutive-failure counter; any successful row resets that counter.
    Once the counter reaches
    ``competitive_config.propagation_failure_threshold`` a critical alert
    is logged and the remaining rows for this ticker are abandoned.

    Args:
        pool: Connection pool used for the query and passed on to
            propagate_signals.
        ticker: Ticker whose recent documents are propagated.
        competitive_config: Competitive-layer settings; supplies the
            failure threshold and is passed on to propagate_signals.

    Returns:
        The total number of competitive signals produced.
    """
    global _propagation_consecutive_failures

    rows = await pool.fetch(_RECENT_INTELLIGENCE_QUERY, ticker)
    if not rows:
        return 0

    total_signals = 0
    for row in rows:
        document_id = str(row["document_id"])
        catalyst_type = row["catalyst_type"] or "other"
        impact_score = float(row["impact_score"] or 0.0)

        if impact_score <= 0.0:
            continue

        try:
            records = await propagate_signals(
                pool=pool,
                ticker=ticker,
                catalyst_type=catalyst_type,
                impact_score=impact_score,
                document_id=document_id,
                config=competitive_config,
            )
            total_signals += len(records)
            # Reset failure counter on success
            _propagation_consecutive_failures = 0
        except Exception:
            _propagation_consecutive_failures += 1
            logger.exception(
                "Signal propagation failed for %s doc %s/%s",
                ticker,
                document_id,
                catalyst_type,
            )
            if (
                _propagation_consecutive_failures
                >= competitive_config.propagation_failure_threshold
            ):
                logger.critical(
                    "ALERT: Sustained signal propagation failures (%d consecutive). "
                    "Continuing with company-specific + macro signals only. "
                    "Operator action required.",
                    _propagation_consecutive_failures,
                )
                # Stop trying propagation for this ticker after threshold
                break

    return total_signals


def _parse_job_ticker(raw: bytes | str) -> str | None:
    """Return the ticker carried by a raw queue payload, or None if unusable.

    A payload is unusable when it is not valid JSON, is not a JSON object,
    or has no non-blank string under the "ticker" key.
    """
    try:
        job = json.loads(raw)
    except (TypeError, ValueError):
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return None
    if not isinstance(job, dict):
        return None
    ticker = job.get("ticker")
    if not isinstance(ticker, str) or not ticker.strip():
        return None
    return ticker


async def _propagate_if_enabled(
    pool: asyncpg.Pool,
    ticker: str,
    competitive_config: CompetitiveConfig,
) -> None:
    """Run competitive signal propagation for a ticker on a best-effort basis.

    The config flag is the default; a non-None result from
    fetch_competitive_enabled overrides it. Any exception, including one
    raised by the toggle lookup itself, is logged and swallowed so the
    caller can still enqueue recommendation jobs.
    """
    try:
        # Check toggle state from DB (same pattern as macro toggle).
        competitive_enabled = competitive_config.competitive_enabled
        db_toggle = await fetch_competitive_enabled(pool)
        if db_toggle is not None:
            competitive_enabled = db_toggle
        if not competitive_enabled:
            return

        sig_count = await _trigger_signal_propagation(
            pool,
            ticker,
            competitive_config,
        )
        if sig_count > 0:
            logger.info(
                "Propagated %d competitive signals for %s",
                sig_count,
                ticker,
            )
    except Exception:
        logger.exception(
            "Signal propagation failed for %s — "
            "continuing with company+macro signals only",
            ticker,
        )


async def main() -> None:
    """Run the aggregation worker loop until cancelled or a fatal error occurs.

    Pops jobs from the aggregation queue (sleeping one second when it is
    empty), aggregates the job's ticker, optionally propagates competitive
    signals, and pushes one recommendation job per summary whose
    trend_strength is positive. Malformed payloads are logged and dropped
    instead of terminating the worker. The pool and the Redis client are
    both closed on exit.
    """
    config = load_config()
    setup_logging("aggregation", level=config.log_level, json_output=config.json_logs)

    pool = await asyncpg.create_pool(dsn=config.postgres.dsn, min_size=2, max_size=8)
    redis_client = aioredis.from_url(config.redis.url)

    queue = queue_key(QUEUE_AGGREGATION)
    rec_queue = queue_key(QUEUE_RECOMMENDATION)
    competitive_config = config.competitive

    logger.info("Aggregation worker started, polling %s", queue)

    try:
        while True:
            raw = await redis_client.lpop(queue)
            if raw is None:
                await asyncio.sleep(1)
                continue

            ticker = _parse_job_ticker(raw)
            if ticker is None:
                # A single bad payload must not take the whole worker down.
                logger.error(
                    "Discarding malformed aggregation job payload: %r",
                    raw[:200],
                )
                continue

            logger.info("Processing aggregation job for %s", ticker)

            try:
                summaries = await aggregate_company(pool, ticker)
                logger.info(
                    "Aggregation complete for %s: %d windows",
                    ticker,
                    len(summaries),
                )

                # Trigger competitive signal propagation after aggregation
                # (Requirement 4.1): When new document intelligence is
                # produced for a company, propagate signals to competitors.
                await _propagate_if_enabled(pool, ticker, competitive_config)

                # Enqueue recommendation job for each window that produced a trend
                for summary in summaries:
                    if summary.trend_strength > 0:
                        await redis_client.rpush(
                            rec_queue,
                            json.dumps(
                                inject_trace_context(
                                    {
                                        "ticker": ticker,
                                        "window": summary.window.value,
                                    }
                                )
                            ),
                        )
            except Exception:
                logger.exception("Aggregation failed for %s", ticker)
    finally:
        # Close the Redis client even if closing the pool raises.
        try:
            await pool.close()
        finally:
            await redis_client.close()


if __name__ == "__main__":
    asyncio.run(main())