"""Competitive signal propagation engine.

Evaluates incoming document intelligence, identifies competitors via
the competitor_relationships table, queries historical cross-company
patterns, and produces weighted competitive signals persisted to
competitive_signal_records.

Also converts pattern and competitive signals into WeightedSignal
objects for the aggregation engine.

Requirements: 4.1, 4.2, 4.3, 4.4, 4.5, 9.1
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
import asyncpg
|
|
|
|
from services.aggregation.pattern_matcher import (
|
|
HistoricalPattern,
|
|
find_cross_company_patterns,
|
|
)
|
|
from services.aggregation.scoring import (
|
|
ScoringConfig,
|
|
WeightedSignal,
|
|
compute_signal_weight,
|
|
)
|
|
from services.shared.config import CompetitiveConfig
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data classes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class CompetitiveSignalRecord:
    """One competitive signal: a source-company event projected onto a
    competitor using a historical cross-company pattern.

    The field order mirrors the column order used when rows are inserted
    into ``competitive_signal_records``; keep the two in sync.
    """

    # --- Provenance of the triggering event ---------------------------------
    source_document_id: str  # document that triggered the propagation
    source_ticker: str  # company the catalyst was reported for
    target_ticker: str  # competitor that receives the signal
    catalyst_type: str  # catalyst type taken from document intelligence

    # --- Signal payload ------------------------------------------------------
    pattern_confidence: float  # confidence of the pattern behind the signal
    signal_direction: str  # either "bullish" or "bearish"
    signal_strength: float  # clamped to the closed interval [0, 1]
    relationship_strength: float  # strength of the competitor relationship

    # --- Bookkeeping ---------------------------------------------------------
    computed_at: datetime  # when the signal was computed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SQL queries
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Fetches every active competitor relationship in which the ticker bound to
# $1 appears on either side.  Both tickers are returned so the caller can
# pick whichever side is NOT the source company.
_COMPETITOR_LOOKUP_QUERY = """
    SELECT cr.company_a_id, cr.company_b_id, cr.strength,
           ca.ticker AS ticker_a, cb.ticker AS ticker_b
    FROM competitor_relationships cr
    JOIN companies ca ON ca.id = cr.company_a_id
    JOIN companies cb ON cb.id = cr.company_b_id
    WHERE (ca.ticker = $1 OR cb.ticker = $1)
      AND cr.active = TRUE
"""

# Inserts one competitive signal row.  The nine placeholders follow the
# field order of CompetitiveSignalRecord.
_INSERT_SIGNAL_QUERY = """
    INSERT INTO competitive_signal_records
        (source_document_id, source_ticker, target_ticker, catalyst_type,
         pattern_confidence, signal_direction, signal_strength,
         relationship_strength, computed_at)
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
"""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# propagate_signals
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def propagate_signals(
    pool: asyncpg.Pool,
    ticker: str,
    catalyst_type: str,
    impact_score: float,
    document_id: str,
    config: Optional[CompetitiveConfig] = None,
) -> list[CompetitiveSignalRecord]:
    """Propagate a source catalyst to the active competitors of ``ticker``.

    The work happens in three phases:

    1. Fetch the active competitor relationships involving ``ticker``.
    2. For each relationship whose strength is not below
       ``propagation_strength_threshold`` (Req 4.5), query cross-company
       patterns; each pattern whose confidence is not below
       ``pattern_confidence_threshold`` (Req 9.1) becomes one
       ``CompetitiveSignalRecord`` (Req 4.3).
    3. Insert all produced records in a single ``executemany`` call.

    Every phase is best-effort: a failed competitor lookup is logged and
    yields an empty list; a failed pattern query is logged and only that
    competitor is skipped; a failed insert is logged and the records are
    still returned to the caller even though they were not stored.

    Args:
        pool: asyncpg connection pool.
        ticker: Ticker of the source company that received the catalyst.
        catalyst_type: Catalyst type from document intelligence.
        impact_score: Impact score of the source document.
        document_id: ID of the source document.
        config: Optional competitive config; a default
            ``CompetitiveConfig()`` is used when omitted.

    Returns:
        The records that were produced, in competitor-then-pattern order.
    """
    settings = config or CompetitiveConfig()
    # One timestamp for the whole batch so all records share ``computed_at``.
    computed_at = datetime.now(timezone.utc)
    produced: list[CompetitiveSignalRecord] = []

    # ---- Phase 1: active competitor relationships -------------------------
    try:
        async with pool.acquire() as conn:
            relationships = await conn.fetch(_COMPETITOR_LOOKUP_QUERY, ticker)
    except Exception:
        logger.exception("Failed to look up competitors for %s", ticker)
        return produced

    if not relationships:
        logger.debug("No active competitors found for %s", ticker)
        return produced

    # ---- Phase 2: one signal per qualifying (competitor, pattern) pair ----
    for relationship in relationships:
        side_a = relationship["ticker_a"]
        side_b = relationship["ticker_b"]
        link_strength = float(relationship["strength"])

        # The competitor is whichever side of the relationship is not us.
        rival = side_a if side_a != ticker else side_b

        # Weak relationships are not propagated at all (Req 4.5).
        if link_strength < settings.propagation_strength_threshold:
            logger.info(
                "Skipping propagation %s→%s: relationship strength %.3f "
                "below threshold %.3f",
                ticker,
                rival,
                link_strength,
                settings.propagation_strength_threshold,
            )
            continue

        try:
            matches = await find_cross_company_patterns(
                pool, ticker, rival, catalyst_type, config=settings,
            )
        except Exception:
            # One failing competitor must not abort the remaining ones.
            logger.exception(
                "Failed to query cross-company patterns for %s→%s/%s",
                ticker,
                rival,
                catalyst_type,
            )
            continue

        for match in matches:
            confidence = match.pattern_confidence

            # Low-confidence patterns are excluded (Req 9.1).
            if confidence < settings.pattern_confidence_threshold:
                logger.info(
                    "Excluding pattern %s→%s/%s/%s: confidence %.3f "
                    "below threshold %.3f",
                    ticker,
                    rival,
                    catalyst_type,
                    match.time_horizon,
                    confidence,
                    settings.pattern_confidence_threshold,
                )
                continue

            # Req 4.3: product of the four factors, clamped into [0, 1].
            unclamped = (
                match.avg_strength * link_strength * confidence * impact_score
            )
            # A tie between bullish and bearish shares resolves to "bearish".
            leans_bullish = match.bullish_pct > match.bearish_pct

            produced.append(
                CompetitiveSignalRecord(
                    source_document_id=document_id,
                    source_ticker=ticker,
                    target_ticker=rival,
                    catalyst_type=catalyst_type,
                    pattern_confidence=confidence,
                    signal_direction="bullish" if leans_bullish else "bearish",
                    signal_strength=min(max(unclamped, 0.0), 1.0),
                    relationship_strength=link_strength,
                    computed_at=computed_at,
                )
            )

    # ---- Phase 3: persistence ----------------------------------------------
    if not produced:
        return produced

    insert_rows = [
        (
            rec.source_document_id,
            rec.source_ticker,
            rec.target_ticker,
            rec.catalyst_type,
            rec.pattern_confidence,
            rec.signal_direction,
            rec.signal_strength,
            rec.relationship_strength,
            rec.computed_at,
        )
        for rec in produced
    ]
    try:
        async with pool.acquire() as conn:
            await conn.executemany(_INSERT_SIGNAL_QUERY, insert_rows)
    except Exception:
        logger.exception(
            "Failed to persist %d competitive signal records", len(produced),
        )

    return produced
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# build_pattern_weighted_signals
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_pattern_weighted_signals(
    patterns: list[HistoricalPattern],
    competitive_signals: list[CompetitiveSignalRecord],
    reference_time: datetime,
    window: str,
    config: Optional[CompetitiveConfig] = None,
) -> list[WeightedSignal]:
    """Turn patterns and competitive signals into ``WeightedSignal`` objects.

    Mapping for each ``HistoricalPattern``:
        - document_id: ``pattern:<source_ticker>:<catalyst_type>:<time_horizon>``
        - sentiment_value: +1.0 if bullish_pct > bearish_pct, otherwise -1.0
        - impact_score: avg_strength * competitive_signal_weight
        - weight: computed with published_at = data_end and
          extraction_confidence = pattern_confidence

    Mapping for each ``CompetitiveSignalRecord``:
        - document_id: source_document_id
        - sentiment_value: +1.0 if signal_direction == "bullish", otherwise -1.0
        - impact_score: signal_strength * competitive_signal_weight
        - weight: computed with published_at = computed_at and
          extraction_confidence = pattern_confidence

    Both kinds are weighted with source_credibility 1.0, novelty_score 0.5,
    no market context, and a default ``ScoringConfig()``.

    Args:
        patterns: Self-company historical patterns.
        competitive_signals: Competitive signal records from propagation.
        reference_time: Aggregation anchor time for recency decay.
        window: Trend window identifier (e.g. "7d").
        config: Optional competitive config; a default
            ``CompetitiveConfig()`` is used when omitted.

    Returns:
        Signals for all patterns first, then all competitive signals, each
        group in input order.
    """
    settings = config or CompetitiveConfig()
    scoring_settings = ScoringConfig()

    def _weight_for(published_at: datetime, confidence: float):
        # Shared weighting call; only the timestamp and confidence vary.
        return compute_signal_weight(
            published_at=published_at,
            reference_time=reference_time,
            window=window,
            source_credibility=1.0,  # patterns are derived from validated data
            novelty_score=0.5,
            extraction_confidence=confidence,
            market_ctx=None,
            config=scoring_settings,
        )

    def _from_pattern(hist: HistoricalPattern) -> WeightedSignal:
        leans_bullish = hist.bullish_pct > hist.bearish_pct
        scaled_impact = hist.avg_strength * settings.competitive_signal_weight
        signal_weight = _weight_for(hist.data_end, hist.pattern_confidence)
        return WeightedSignal(
            document_id=f"pattern:{hist.source_ticker}:{hist.catalyst_type}:{hist.time_horizon}",
            weight=signal_weight,
            sentiment_value=1.0 if leans_bullish else -1.0,
            impact_score=scaled_impact,
        )

    def _from_competitive(rec: CompetitiveSignalRecord) -> WeightedSignal:
        is_bullish = rec.signal_direction == "bullish"
        scaled_impact = rec.signal_strength * settings.competitive_signal_weight
        signal_weight = _weight_for(rec.computed_at, rec.pattern_confidence)
        return WeightedSignal(
            document_id=rec.source_document_id,
            weight=signal_weight,
            sentiment_value=1.0 if is_bullish else -1.0,
            impact_score=scaled_impact,
        )

    weighted: list[WeightedSignal] = [_from_pattern(hist) for hist in patterns]
    weighted.extend(_from_competitive(rec) for rec in competitive_signals)
    return weighted
|