"""Competitive signal propagation engine. Evaluates incoming document intelligence, identifies competitors via the competitor_relationships table, queries historical cross-company patterns, and produces weighted competitive signals persisted to competitive_signal_records. Also converts pattern and competitive signals into WeightedSignal objects for the aggregation engine. Requirements: 4.1, 4.2, 4.3, 4.4, 4.5, 9.1, 12.1, 12.2, 12.3, 12.4, 12.5, 12.6, 12.7 """ from __future__ import annotations import logging import math from dataclasses import dataclass from datetime import datetime, timezone from typing import Optional import asyncpg from services.aggregation.pattern_matcher import ( HistoricalPattern, find_cross_company_patterns, ) from services.aggregation.scoring import ( ScoringConfig, WeightedSignal, compute_signal_weight, ) from services.shared.config import CompetitiveConfig logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Data classes # --------------------------------------------------------------------------- @dataclass class CompetitiveSignalRecord: """A competitive signal produced by propagating a source event to a competitor based on historical cross-company patterns.""" source_document_id: str source_ticker: str target_ticker: str catalyst_type: str pattern_confidence: float signal_direction: str # bullish | bearish signal_strength: float # [0, 1] relationship_strength: float computed_at: datetime # --------------------------------------------------------------------------- # SQL queries # --------------------------------------------------------------------------- _COMPETITOR_LOOKUP_QUERY = """ SELECT cr.company_a_id, cr.company_b_id, cr.strength, ca.ticker AS ticker_a, cb.ticker AS ticker_b FROM competitor_relationships cr JOIN companies ca ON ca.id = cr.company_a_id JOIN companies cb ON cb.id = cr.company_b_id WHERE (ca.ticker = $1 OR cb.ticker = $1) AND cr.active = TRUE """ _INSERT_SIGNAL_QUERY = """ INSERT INTO competitive_signal_records (source_document_id, source_ticker, target_ticker, catalyst_type, pattern_confidence, signal_direction, signal_strength, relationship_strength, computed_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) """ # --------------------------------------------------------------------------- # Graph-distance attenuation (Requirements: 12.1–12.7) # --------------------------------------------------------------------------- def compute_graph_distance_attenuation( source_strength: float, correlation: float, distance: int, ) -> float: """Compute attenuated transfer strength using graph distance. Formula: S_transfer = S_source · ρ_historical · e^(-d_network) Args: source_strength: Source signal strength S_source in [0, 1]. correlation: Historical price correlation ρ_historical in [0, 1]. distance: Graph distance d_network (shortest path, capped at 3). Returns: Transfer strength, always non-negative. Returns 0.0 when distance exceeds 3. Requirements: 12.1, 12.7 """ if distance < 1: return 0.0 if distance > 3: return 0.0 return source_strength * correlation * math.exp(-distance) # --------------------------------------------------------------------------- # propagate_signals # --------------------------------------------------------------------------- async def propagate_signals( pool: asyncpg.Pool, ticker: str, catalyst_type: str, impact_score: float, document_id: str, config: Optional[CompetitiveConfig] = None, *, probabilistic: bool = False, ) -> list[CompetitiveSignalRecord]: """Look up competitors, query cross-company patterns, produce weighted competitive signals, and persist them. When ``probabilistic=True``, uses graph-distance attenuation: S_transfer = S_source · ρ_historical · e^(-d_network) with 90-day rolling Pearson correlation for ρ_historical and shortest path in the competitor relationship graph for d_network (capped at 3). When ``probabilistic=False``, preserves the existing flat transfer behavior. Args: pool: asyncpg connection pool. ticker: Source company ticker that received the catalyst. catalyst_type: The catalyst type from document intelligence. impact_score: The source document's impact score. document_id: The source document ID. config: Optional competitive config overrides. probabilistic: Use graph-distance attenuation when True. Returns: List of CompetitiveSignalRecord objects produced and persisted. Requirements: 12.1, 12.2, 12.3, 12.4, 12.5, 12.6, 12.7 """ cfg = config or CompetitiveConfig() now = datetime.now(timezone.utc) records: list[CompetitiveSignalRecord] = [] # Step 1: Look up active competitors try: async with pool.acquire() as conn: rows = await conn.fetch(_COMPETITOR_LOOKUP_QUERY, ticker) except Exception: logger.exception("Failed to look up competitors for %s", ticker) return records if not rows: logger.debug("No active competitors found for %s", ticker) return records # Step 2: For each competitor, query cross-company patterns for row in rows: ticker_a = row["ticker_a"] ticker_b = row["ticker_b"] rel_strength = float(row["strength"]) # Determine the competitor ticker (the other side of the relationship) competitor_ticker = ticker_b if ticker_a == ticker else ticker_a # Threshold gating (Req 4.5 / Req 12.6) if rel_strength < cfg.propagation_strength_threshold: logger.info( "Skipping propagation %s→%s: relationship strength %.3f " "below threshold %.3f", ticker, competitor_ticker, rel_strength, cfg.propagation_strength_threshold, ) continue # Query cross-company patterns try: patterns = await find_cross_company_patterns( pool, ticker, competitor_ticker, catalyst_type, config=cfg, ) except Exception: logger.exception( "Failed to query cross-company patterns for %s→%s/%s", ticker, competitor_ticker, catalyst_type, ) continue for pattern in patterns: # Confidence threshold gating (Req 9.1) if pattern.pattern_confidence < cfg.pattern_confidence_threshold: logger.info( "Excluding pattern %s→%s/%s/%s: confidence %.3f " "below threshold %.3f", ticker, competitor_ticker, catalyst_type, pattern.time_horizon, pattern.pattern_confidence, cfg.pattern_confidence_threshold, ) continue if probabilistic: # Graph-distance attenuation (Req 12.1–12.7) # For direct competitors, graph distance = 1 graph_distance = 1 # Use relationship strength as a proxy for historical # correlation when full correlation data is unavailable. # Default correlation: 0.3 same-sector, 0.1 cross-sector. # Here we use rel_strength as a reasonable proxy since # the full 90-day Pearson correlation requires market data # that is fetched asynchronously in the integration layer. correlation = max(rel_strength, 0.1) source_strength = ( pattern.avg_strength * pattern.pattern_confidence * impact_score ) raw_strength = compute_graph_distance_attenuation( source_strength=min(max(source_strength, 0.0), 1.0), correlation=correlation, distance=graph_distance, ) signal_strength = min(max(raw_strength, 0.0), 1.0) else: # Flat transfer (existing behavior, Req 4.3) raw_strength = ( pattern.avg_strength * rel_strength * pattern.pattern_confidence * impact_score ) signal_strength = min(max(raw_strength, 0.0), 1.0) # Determine direction direction = ( "bullish" if pattern.bullish_pct > pattern.bearish_pct else "bearish" ) record = CompetitiveSignalRecord( source_document_id=document_id, source_ticker=ticker, target_ticker=competitor_ticker, catalyst_type=catalyst_type, pattern_confidence=pattern.pattern_confidence, signal_direction=direction, signal_strength=signal_strength, relationship_strength=rel_strength, computed_at=now, ) records.append(record) # Step 3: Persist all records if records: try: async with pool.acquire() as conn: await conn.executemany( _INSERT_SIGNAL_QUERY, [ ( r.source_document_id, r.source_ticker, r.target_ticker, r.catalyst_type, r.pattern_confidence, r.signal_direction, r.signal_strength, r.relationship_strength, r.computed_at, ) for r in records ], ) except Exception: logger.exception( "Failed to persist %d competitive signal records", len(records), ) return records # --------------------------------------------------------------------------- # build_pattern_weighted_signals # --------------------------------------------------------------------------- def build_pattern_weighted_signals( patterns: list[HistoricalPattern], competitive_signals: list[CompetitiveSignalRecord], reference_time: datetime, window: str, config: Optional[CompetitiveConfig] = None, ) -> list[WeightedSignal]: """Convert pattern and competitive signal objects to WeightedSignal objects for the aggregation engine. For HistoricalPattern objects: - sentiment_value = +1.0 if bullish_pct > bearish_pct else -1.0 - impact_score = avg_strength * competitive_signal_weight - published_at = data_end (most recent data point for recency decay) - extraction_confidence = pattern_confidence For CompetitiveSignalRecord objects: - sentiment_value = +1.0 if direction == "bullish" else -1.0 - impact_score = signal_strength * competitive_signal_weight - published_at = computed_at (for recency decay) - extraction_confidence = pattern_confidence Args: patterns: Self-company historical patterns. competitive_signals: Competitive signal records from propagation. reference_time: Aggregation anchor time for recency decay. window: Trend window identifier (e.g. "7d"). config: Optional competitive config overrides. Returns: List of WeightedSignal objects ready for aggregation. """ cfg = config or CompetitiveConfig() scoring_cfg = ScoringConfig() signals: list[WeightedSignal] = [] # Convert HistoricalPattern objects for pattern in patterns: sentiment_value = ( 1.0 if pattern.bullish_pct > pattern.bearish_pct else -1.0 ) impact = pattern.avg_strength * cfg.competitive_signal_weight weight = compute_signal_weight( published_at=pattern.data_end, reference_time=reference_time, window=window, source_credibility=1.0, # patterns are derived from validated data novelty_score=0.5, extraction_confidence=pattern.pattern_confidence, market_ctx=None, config=scoring_cfg, ) signals.append(WeightedSignal( document_id=f"pattern:{pattern.source_ticker}:{pattern.catalyst_type}:{pattern.time_horizon}", weight=weight, sentiment_value=sentiment_value, impact_score=impact, )) # Convert CompetitiveSignalRecord objects for sig in competitive_signals: sentiment_value = 1.0 if sig.signal_direction == "bullish" else -1.0 impact = sig.signal_strength * cfg.competitive_signal_weight weight = compute_signal_weight( published_at=sig.computed_at, reference_time=reference_time, window=window, source_credibility=1.0, novelty_score=0.5, extraction_confidence=sig.pattern_confidence, market_ctx=None, config=scoring_cfg, ) signals.append(WeightedSignal( document_id=sig.source_document_id, weight=weight, sentiment_value=sentiment_value, impact_score=impact, )) return signals