"""Competitive signal propagation engine. Evaluates incoming document intelligence, identifies competitors via the competitor_relationships table, queries historical cross-company patterns, and produces weighted competitive signals persisted to competitive_signal_records. Also converts pattern and competitive signals into WeightedSignal objects for the aggregation engine. Requirements: 4.1, 4.2, 4.3, 4.4, 4.5, 9.1 """ from __future__ import annotations import logging from dataclasses import dataclass from datetime import datetime, timezone from typing import Optional import asyncpg from services.aggregation.pattern_matcher import ( HistoricalPattern, find_cross_company_patterns, ) from services.aggregation.scoring import ( ScoringConfig, WeightedSignal, compute_signal_weight, ) from services.shared.config import CompetitiveConfig logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Data classes # --------------------------------------------------------------------------- @dataclass class CompetitiveSignalRecord: """A competitive signal produced by propagating a source event to a competitor based on historical cross-company patterns.""" source_document_id: str source_ticker: str target_ticker: str catalyst_type: str pattern_confidence: float signal_direction: str # bullish | bearish signal_strength: float # [0, 1] relationship_strength: float computed_at: datetime # --------------------------------------------------------------------------- # SQL queries # --------------------------------------------------------------------------- _COMPETITOR_LOOKUP_QUERY = """ SELECT cr.company_a_id, cr.company_b_id, cr.strength, ca.ticker AS ticker_a, cb.ticker AS ticker_b FROM competitor_relationships cr JOIN companies ca ON ca.id = cr.company_a_id JOIN companies cb ON cb.id = cr.company_b_id WHERE (ca.ticker = $1 OR cb.ticker = $1) AND cr.active = TRUE """ _INSERT_SIGNAL_QUERY = """ INSERT INTO competitive_signal_records (source_document_id, source_ticker, target_ticker, catalyst_type, pattern_confidence, signal_direction, signal_strength, relationship_strength, computed_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) """ # --------------------------------------------------------------------------- # propagate_signals # --------------------------------------------------------------------------- async def propagate_signals( pool: asyncpg.Pool, ticker: str, catalyst_type: str, impact_score: float, document_id: str, config: Optional[CompetitiveConfig] = None, ) -> list[CompetitiveSignalRecord]: """Look up competitors, query cross-company patterns, produce weighted competitive signals, and persist them. Args: pool: asyncpg connection pool. ticker: Source company ticker that received the catalyst. catalyst_type: The catalyst type from document intelligence. impact_score: The source document's impact score. document_id: The source document ID. config: Optional competitive config overrides. Returns: List of CompetitiveSignalRecord objects produced and persisted. """ cfg = config or CompetitiveConfig() now = datetime.now(timezone.utc) records: list[CompetitiveSignalRecord] = [] # Step 1: Look up active competitors try: async with pool.acquire() as conn: rows = await conn.fetch(_COMPETITOR_LOOKUP_QUERY, ticker) except Exception: logger.exception("Failed to look up competitors for %s", ticker) return records if not rows: logger.debug("No active competitors found for %s", ticker) return records # Step 2: For each competitor, query cross-company patterns for row in rows: ticker_a = row["ticker_a"] ticker_b = row["ticker_b"] rel_strength = float(row["strength"]) # Determine the competitor ticker (the other side of the relationship) competitor_ticker = ticker_b if ticker_a == ticker else ticker_a # Threshold gating (Req 4.5) if rel_strength < cfg.propagation_strength_threshold: logger.info( "Skipping propagation %s→%s: relationship strength %.3f " "below threshold %.3f", ticker, competitor_ticker, rel_strength, cfg.propagation_strength_threshold, ) continue # Query cross-company patterns try: patterns = await find_cross_company_patterns( pool, ticker, competitor_ticker, catalyst_type, config=cfg, ) except Exception: logger.exception( "Failed to query cross-company patterns for %s→%s/%s", ticker, competitor_ticker, catalyst_type, ) continue for pattern in patterns: # Confidence threshold gating (Req 9.1) if pattern.pattern_confidence < cfg.pattern_confidence_threshold: logger.info( "Excluding pattern %s→%s/%s/%s: confidence %.3f " "below threshold %.3f", ticker, competitor_ticker, catalyst_type, pattern.time_horizon, pattern.pattern_confidence, cfg.pattern_confidence_threshold, ) continue # Compute signal strength (Req 4.3) raw_strength = ( pattern.avg_strength * rel_strength * pattern.pattern_confidence * impact_score ) signal_strength = min(max(raw_strength, 0.0), 1.0) # Determine direction direction = ( "bullish" if pattern.bullish_pct > pattern.bearish_pct else "bearish" ) record = CompetitiveSignalRecord( source_document_id=document_id, source_ticker=ticker, target_ticker=competitor_ticker, catalyst_type=catalyst_type, pattern_confidence=pattern.pattern_confidence, signal_direction=direction, signal_strength=signal_strength, relationship_strength=rel_strength, computed_at=now, ) records.append(record) # Step 3: Persist all records if records: try: async with pool.acquire() as conn: await conn.executemany( _INSERT_SIGNAL_QUERY, [ ( r.source_document_id, r.source_ticker, r.target_ticker, r.catalyst_type, r.pattern_confidence, r.signal_direction, r.signal_strength, r.relationship_strength, r.computed_at, ) for r in records ], ) except Exception: logger.exception( "Failed to persist %d competitive signal records", len(records), ) return records # --------------------------------------------------------------------------- # build_pattern_weighted_signals # --------------------------------------------------------------------------- def build_pattern_weighted_signals( patterns: list[HistoricalPattern], competitive_signals: list[CompetitiveSignalRecord], reference_time: datetime, window: str, config: Optional[CompetitiveConfig] = None, ) -> list[WeightedSignal]: """Convert pattern and competitive signal objects to WeightedSignal objects for the aggregation engine. For HistoricalPattern objects: - sentiment_value = +1.0 if bullish_pct > bearish_pct else -1.0 - impact_score = avg_strength * competitive_signal_weight - published_at = data_end (most recent data point for recency decay) - extraction_confidence = pattern_confidence For CompetitiveSignalRecord objects: - sentiment_value = +1.0 if direction == "bullish" else -1.0 - impact_score = signal_strength * competitive_signal_weight - published_at = computed_at (for recency decay) - extraction_confidence = pattern_confidence Args: patterns: Self-company historical patterns. competitive_signals: Competitive signal records from propagation. reference_time: Aggregation anchor time for recency decay. window: Trend window identifier (e.g. "7d"). config: Optional competitive config overrides. Returns: List of WeightedSignal objects ready for aggregation. """ cfg = config or CompetitiveConfig() scoring_cfg = ScoringConfig() signals: list[WeightedSignal] = [] # Convert HistoricalPattern objects for pattern in patterns: sentiment_value = ( 1.0 if pattern.bullish_pct > pattern.bearish_pct else -1.0 ) impact = pattern.avg_strength * cfg.competitive_signal_weight weight = compute_signal_weight( published_at=pattern.data_end, reference_time=reference_time, window=window, source_credibility=1.0, # patterns are derived from validated data novelty_score=0.5, extraction_confidence=pattern.pattern_confidence, market_ctx=None, config=scoring_cfg, ) signals.append(WeightedSignal( document_id=f"pattern:{pattern.source_ticker}:{pattern.catalyst_type}:{pattern.time_horizon}", weight=weight, sentiment_value=sentiment_value, impact_score=impact, )) # Convert CompetitiveSignalRecord objects for sig in competitive_signals: sentiment_value = 1.0 if sig.signal_direction == "bullish" else -1.0 impact = sig.signal_strength * cfg.competitive_signal_weight weight = compute_signal_weight( published_at=sig.computed_at, reference_time=reference_time, window=window, source_credibility=1.0, novelty_score=0.5, extraction_confidence=sig.pattern_confidence, market_ctx=None, config=scoring_cfg, ) signals.append(WeightedSignal( document_id=sig.source_document_id, weight=weight, sentiment_value=sentiment_value, impact_score=impact, )) return signals