"""Aggregation worker - company-level rolling window trend summaries.

Queries document intelligence and market context for a given ticker,
computes weighted signal scores, and produces TrendSummary objects
persisted to the trend_windows table.

Requirements: 6.1, 6.2, 6.5
"""

from __future__ import annotations

import json
import logging
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any

import asyncpg

from services.aggregation.contradiction import CatalystEntry, detect_contradictions
from services.aggregation.evidence import (
    EvidenceRankConfig,
    RankedEvidence,
    rank_evidence_detailed,
)
from services.aggregation.evidence import (
    rank_evidence as _rank_evidence_composite,
)
from services.aggregation.market_context import fetch_market_context
from services.aggregation.scoring import (
    ScoringConfig,
    WeightedSignal,
    compute_signal_weight,
    sentiment_to_numeric,
    weighted_sentiment_average,
)
from services.shared.metrics import (
    AGGREGATION_CONTRADICTION_SCORE,
    AGGREGATION_DURATION,
    AGGREGATION_SIGNALS_PROCESSED,
    AGGREGATION_WINDOWS_COMPUTED,
)
from services.shared.schemas import TrendDirection, TrendSummary, TrendWindow

logger = logging.getLogger(__name__)

# Map TrendWindow values to lookback durations.
# Keys are the string ``.value`` of each TrendWindow member. Note that the
# intraday window looks back 12 hours. aggregate_company_window() falls back
# to a 7-day lookback for any window string that is missing from this map.
WINDOW_DURATIONS: dict[str, timedelta] = {
    TrendWindow.INTRADAY.value: timedelta(hours=12),
    TrendWindow.ONE_DAY.value: timedelta(days=1),
    TrendWindow.SEVEN_DAY.value: timedelta(days=7),
    TrendWindow.THIRTY_DAY.value: timedelta(days=30),
    TrendWindow.NINETY_DAY.value: timedelta(days=90),
}

# How many evidence document IDs to keep in supporting/opposing lists.
MAX_EVIDENCE_REFS = 10


@dataclass
class AggregationConfig:
    """Controls which windows to compute and scoring parameters.

    Attributes:
        windows: Window identifiers to compute. ``None`` (or an empty list)
            selects every ``TrendWindow`` member.
        scoring: Scoring parameters; ``None`` selects ``ScoringConfig()``.
        max_evidence: Maximum number of evidence references kept per side.
    """

    windows: list[str] | None = None  # None = all windows
    scoring: ScoringConfig | None = None
    max_evidence: int = MAX_EVIDENCE_REFS

    def effective_windows(self) -> list[str]:
        """Return the configured windows, or every TrendWindow value if unset."""
        if self.windows:
            return self.windows
        return [w.value for w in TrendWindow]

    def effective_scoring(self) -> ScoringConfig:
        """Return the configured scoring parameters, or the defaults if unset."""
        return self.scoring or ScoringConfig()


# ---------------------------------------------------------------------------
# Fetch impact records for a ticker within a time window
# ---------------------------------------------------------------------------

_IMPACT_QUERY = """
    SELECT
        di.document_id,
        di.confidence,
        di.novelty_score,
        di.source_credibility,
        dir.sentiment,
        dir.impact_score,
        dir.catalyst_type,
        dir.key_facts,
        dir.risks,
        d.published_at
    FROM document_impact_records dir
    JOIN document_intelligence di ON di.id = dir.intelligence_id
    JOIN documents d ON d.id = di.document_id
    WHERE dir.ticker = $1
      AND d.published_at >= $2
      AND d.published_at <= $3
      AND di.validation_status = 'valid'
      AND d.status != 'rejected'
    ORDER BY d.published_at DESC
"""


@dataclass
class ImpactRow:
    """Parsed row from the impact query.

    One instance corresponds to one row returned by ``_IMPACT_QUERY``; the
    field names mirror the selected columns.
    """

    document_id: str
    confidence: float
    novelty_score: float
    source_credibility: float
    sentiment: str
    impact_score: float
    catalyst_type: str
    key_facts: list[str]
    risks: list[str]
    published_at: datetime


def _float_or_default(value: Any, default: float) -> float:
    """Return *value* as a float, substituting *default* only for ``None``.

    A plain ``value or default`` would also replace a legitimate ``0`` /
    ``0.0`` score with the default, so the NULL check is explicit.
    """
    return default if value is None else float(value)


def _json_list(value: Any) -> list[str]:
    """Normalise a JSON column that may arrive as text or as a decoded list.

    Strings are decoded with ``json.loads``; anything that is not a list
    after decoding (including ``None``) becomes an empty list.
    """
    if isinstance(value, str):
        value = json.loads(value)
    return value if isinstance(value, list) else []


def _parse_impact_row(row: Any) -> ImpactRow:
    """Convert an asyncpg Record to an ImpactRow.

    Args:
        row: Any object supporting ``row["column"]`` lookups for the columns
            selected by ``_IMPACT_QUERY``.

    Returns:
        The populated ``ImpactRow``. NULL numeric columns default to ``0.5``
        (confidence, novelty, credibility) or ``0.0`` (impact score); stored
        zeros are preserved. Missing sentiment / catalyst type default to
        ``"neutral"`` / ``"other"``.

    Raises:
        json.JSONDecodeError: If ``key_facts`` or ``risks`` is a string that
            is not valid JSON.
    """
    return ImpactRow(
        document_id=str(row["document_id"]),
        confidence=_float_or_default(row["confidence"], 0.5),
        novelty_score=_float_or_default(row["novelty_score"], 0.5),
        source_credibility=_float_or_default(row["source_credibility"], 0.5),
        sentiment=row["sentiment"] or "neutral",
        impact_score=_float_or_default(row["impact_score"], 0.0),
        catalyst_type=row["catalyst_type"] or "other",
        key_facts=_json_list(row["key_facts"]),
        risks=_json_list(row["risks"]),
        published_at=row["published_at"],
    )


async def fetch_impact_records(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    window_end: datetime,
) -> list[ImpactRow]:
    """Fetch validated document impact records for a ticker in a time range.

    Both bounds are inclusive. Only rows whose intelligence record has
    ``validation_status = 'valid'`` and whose document is not ``'rejected'``
    are returned, newest ``published_at`` first.
    """
    rows = await pool.fetch(_IMPACT_QUERY, ticker, window_start, window_end)
    return [_parse_impact_row(r) for r in rows]


# ---------------------------------------------------------------------------
# Build weighted signals from impact records
# ---------------------------------------------------------------------------


def build_weighted_signals(
    impacts: list[ImpactRow],
    reference_time: datetime,
    window: str,
    market_ctx: Any | None = None,
    config: ScoringConfig | None = None,
) -> list[WeightedSignal]:
    """Convert impact records into WeightedSignal objects using the scoring module.

    Args:
        impacts: Impact rows to score.
        reference_time: Passed to ``compute_signal_weight`` as its
            ``reference_time``.
        window: Window identifier forwarded to the scoring module.
        market_ctx: Optional market context forwarded to the scoring module.
        config: Scoring parameters; ``ScoringConfig()`` is used when omitted.

    Returns:
        One ``WeightedSignal`` per impact row, in the same order as *impacts*.
    """
    cfg = config or ScoringConfig()
    return [
        WeightedSignal(
            document_id=imp.document_id,
            weight=compute_signal_weight(
                published_at=imp.published_at,
                reference_time=reference_time,
                window=window,
                source_credibility=imp.source_credibility,
                novelty_score=imp.novelty_score,
                extraction_confidence=imp.confidence,
                market_ctx=market_ctx,
                config=cfg,
            ),
            sentiment_value=sentiment_to_numeric(imp.sentiment),
            impact_score=imp.impact_score,
        )
        for imp in impacts
    ]


# ---------------------------------------------------------------------------
# Derive trend direction from weighted sentiment
# ---------------------------------------------------------------------------

# Thresholds for mapping numeric sentiment to direction.
BULLISH_THRESHOLD = 0.15
BEARISH_THRESHOLD = -0.15
MIXED_THRESHOLD = 0.10  # contradiction score above this → mixed
# MIXED is only reported while the average sentiment magnitude stays below
# this value; a stronger average wins over the contradiction signal.
MIXED_MAX_ABS_SENTIMENT = 0.3


def derive_trend_direction(
    avg_sentiment: float,
    contradiction_score: float = 0.0,
) -> TrendDirection:
    """Map a weighted average sentiment to a TrendDirection.

    The direction is MIXED when the contradiction score exceeds
    ``MIXED_THRESHOLD`` *and* the average sentiment magnitude is below
    ``MIXED_MAX_ABS_SENTIMENT``. Otherwise the average decides: at or above
    ``BULLISH_THRESHOLD`` is BULLISH, at or below ``BEARISH_THRESHOLD`` is
    BEARISH, anything in between is NEUTRAL.
    """
    is_contested = contradiction_score > MIXED_THRESHOLD
    if is_contested and abs(avg_sentiment) < MIXED_MAX_ABS_SENTIMENT:
        return TrendDirection.MIXED
    if avg_sentiment >= BULLISH_THRESHOLD:
        return TrendDirection.BULLISH
    if avg_sentiment <= BEARISH_THRESHOLD:
        return TrendDirection.BEARISH
    return TrendDirection.NEUTRAL


# ---------------------------------------------------------------------------
# Compute contradiction score
# ---------------------------------------------------------------------------


def compute_contradiction_score(signals: list[WeightedSignal]) -> float:
    """Measure how much disagreement exists among weighted signals.

    Each signal contributes ``weight.combined * impact_score`` to the
    positive or negative side according to the sign of its sentiment value;
    signals with a sentiment value of exactly zero are ignored.

    Returns:
        The minority side's share of the total (positive + negative) weight,
        rounded to 4 decimals. For non-negative contributions this is ``0.0``
        when all weight is on one side and ``0.5`` when both sides carry
        equal weight. ``0.0`` is returned when there are no signals or the
        total weight is zero.
    """
    if not signals:
        return 0.0

    pos_weight = 0.0
    neg_weight = 0.0
    for sig in signals:
        contribution = sig.weight.combined * sig.impact_score
        if sig.sentiment_value > 0:
            pos_weight += contribution
        elif sig.sentiment_value < 0:
            neg_weight += contribution

    total = pos_weight + neg_weight
    if total == 0.0:
        return 0.0
    return round(min(pos_weight, neg_weight) / total, 4)


# ---------------------------------------------------------------------------
# Rank evidence (supporting vs opposing)
# ---------------------------------------------------------------------------


def rank_evidence(
    signals: list[WeightedSignal],
    max_refs: int = MAX_EVIDENCE_REFS,
) -> tuple[list[str], list[str]]:
    """Return top supporting and opposing document IDs ranked by composite score.

    Thin wrapper that builds an ``EvidenceRankConfig(max_refs=max_refs)`` and
    delegates to the evidence ranking module's ``rank_evidence``. The ranking
    criteria (and the treatment of neutral signals) are defined there, not in
    this function.

    Returns:
        A ``(supporting_ids, opposing_ids)`` tuple as produced by the
        evidence module.
    """
    config = EvidenceRankConfig(max_refs=max_refs)
    return _rank_evidence_composite(signals, config)


# ---------------------------------------------------------------------------
# Extract dominant catalysts and material risks
# ---------------------------------------------------------------------------


def extract_catalysts_and_risks(
    impacts: list[ImpactRow],
    signals: list[WeightedSignal],
    max_catalysts: int = 5,
    max_risks: int = 5,
) -> tuple[list[str], list[str]]:
    """Return dominant catalyst types and material risks weighted by signal strength.

    A document's strength is ``weight.combined * impact_score`` of its signal;
    impacts without a signal, or with a non-positive strength, are ignored.

    Args:
        impacts: Impact rows supplying catalyst types and risk texts.
        signals: Weighted signals, matched to impacts by ``document_id``.
        max_catalysts: Maximum number of catalyst types returned.
        max_risks: Maximum number of risk texts returned.

    Returns:
        ``(catalysts, risks)``. Catalysts are ordered by cumulative strength,
        strongest first. Risks are whitespace-stripped, de-duplicated
        case-insensitively and ordered by the strength of the document that
        surfaced them; blank or non-string risk entries are skipped.
    """
    # Lookup from document_id to the document's effective strength.
    strength_by_doc = {
        s.document_id: s.weight.combined * s.impact_score for s in signals
    }

    catalyst_weights: dict[str, float] = {}
    weighted_risks: list[tuple[float, str]] = []
    for imp in impacts:
        strength = strength_by_doc.get(imp.document_id, 0.0)
        if strength <= 0.0:
            continue
        catalyst_weights[imp.catalyst_type] = (
            catalyst_weights.get(imp.catalyst_type, 0.0) + strength
        )
        weighted_risks.extend((strength, risk) for risk in imp.risks)

    # Top catalysts by cumulative strength (stable for ties).
    ranked_catalysts = sorted(
        catalyst_weights, key=catalyst_weights.__getitem__, reverse=True
    )
    catalysts = ranked_catalysts[: max(max_catalysts, 0)]

    # Deduplicated risks ordered by strength (stable for ties).
    weighted_risks.sort(key=lambda entry: entry[0], reverse=True)
    seen_risks: set[str] = set()
    risks: list[str] = []
    for _, risk_text in weighted_risks:
        if len(risks) >= max_risks:
            break
        # Risk lists come from decoded JSON, so elements are not guaranteed
        # to be strings; skip anything that cannot be shown as text.
        if not isinstance(risk_text, str):
            continue
        cleaned = risk_text.strip()
        normalized = cleaned.lower()
        if not cleaned or normalized in seen_risks:
            continue
        seen_risks.add(normalized)
        risks.append(cleaned)

    return catalysts, risks


# ---------------------------------------------------------------------------
# Compute trend confidence
# ---------------------------------------------------------------------------


def compute_trend_confidence(
    signals: list[WeightedSignal],
    contradiction_score: float,
) -> float:
    """Derive an overall confidence for the trend summary.

    Only signals whose ``weight.combined`` is positive take part. The result
    combines three terms:

    - a count term, ``min(active / 20, 1)``, weighted 0.4;
    - the mean ``weight.credibility`` of the active signals, weighted 0.6;
    - a penalty of ``0.4 * contradiction_score``.

    NOTE(review): earlier documentation described the second term as the
    average extraction confidence, but the value actually read is
    ``weight.credibility`` - confirm which field is intended.

    Returns:
        A value clamped to [0, 1] and rounded to 4 decimals; ``0.0`` when no
        signal has a positive combined weight.
    """
    contributing = [sig for sig in signals if sig.weight.combined > 0]
    n_contributing = len(contributing)
    if n_contributing == 0:
        return 0.0

    # Diminishing returns: the count term saturates at 20 signals.
    count_term = min(n_contributing / 20.0, 1.0)
    credibility_mean = (
        sum(sig.weight.credibility for sig in contributing) / n_contributing
    )
    penalty = contradiction_score * 0.4

    raw = (0.4 * count_term + 0.6 * credibility_mean) - penalty
    clamped = max(0.0, min(1.0, raw))
    return round(clamped, 4)


# ---------------------------------------------------------------------------
# Assemble a TrendSummary from components
# ---------------------------------------------------------------------------


@dataclass
class AssembledTrend:
    """A trend summary paired with its detailed evidence rankings."""

    summary: TrendSummary
    supporting_evidence: list[RankedEvidence]
    opposing_evidence: list[RankedEvidence]


def assemble_trend_summary(
    ticker: str,
    window: str,
    signals: list[WeightedSignal],
    impacts: list[ImpactRow],
    market_ctx: Any | None = None,
    max_evidence: int = MAX_EVIDENCE_REFS,
    reference_time: datetime | None = None,
) -> TrendSummary:
    """Build a complete TrendSummary from weighted signals and impact records.

    Convenience wrapper around ``assemble_trend_with_evidence`` that discards
    the detailed evidence rankings and returns only the summary.
    """
    assembled = assemble_trend_with_evidence(
        ticker,
        window,
        signals,
        impacts,
        market_ctx=market_ctx,
        max_evidence=max_evidence,
        reference_time=reference_time,
    )
    return assembled.summary


def assemble_trend_with_evidence(
    ticker: str,
    window: str,
    signals: list[WeightedSignal],
    impacts: list[ImpactRow],
    market_ctx: Any | None = None,
    max_evidence: int = MAX_EVIDENCE_REFS,
    reference_time: datetime | None = None,
) -> AssembledTrend:
    """Build a TrendSummary and return detailed evidence rankings for persistence.

    Args:
        ticker: Stored as the summary's ``entity_id`` (entity type "company").
        window: Window identifier; converted with ``TrendWindow(window)``.
        signals: Weighted signals for the window.
        impacts: Impact rows the signals were built from.
        market_ctx: Optional market context stored on the summary as-is.
        max_evidence: Passed to ``EvidenceRankConfig`` as ``max_refs``.
        reference_time: Stored as ``generated_at``; defaults to the current
            UTC time.

    Returns:
        The summary together with the ranked supporting and opposing
        evidence lists it was derived from.
    """
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)

    mean_sentiment = weighted_sentiment_average(signals)

    # Run full contradiction detection (Requirement 6.4)
    contradiction_result = detect_contradictions(
        signals,
        [
            CatalystEntry(
                document_id=impact.document_id,
                catalyst_type=impact.catalyst_type,
            )
            for impact in impacts
        ],
    )
    contradiction_score = contradiction_result.score

    trend_direction = derive_trend_direction(mean_sentiment, contradiction_score)
    trend_confidence = compute_trend_confidence(signals, contradiction_score)

    # Detailed rankings are kept so the caller can persist them.
    supporting_ranked, opposing_ranked = rank_evidence_detailed(
        signals, EvidenceRankConfig(max_refs=max_evidence)
    )

    dominant_catalysts, material_risks = extract_catalysts_and_risks(impacts, signals)

    summary = TrendSummary(
        entity_type="company",
        entity_id=ticker,
        window=TrendWindow(window),
        trend_direction=trend_direction,
        # Trend strength: absolute weighted sentiment, capped at 1.0.
        trend_strength=round(min(abs(mean_sentiment), 1.0), 4),
        confidence=trend_confidence,
        top_supporting_evidence=[ev.document_id for ev in supporting_ranked],
        top_opposing_evidence=[ev.document_id for ev in opposing_ranked],
        dominant_catalysts=dominant_catalysts,
        material_risks=material_risks,
        contradiction_score=contradiction_score,
        disagreement_details=contradiction_result.details,
        market_context=market_ctx,
        generated_at=reference_time,
    )
    return AssembledTrend(
        summary=summary,
        supporting_evidence=supporting_ranked,
        opposing_evidence=opposing_ranked,
    )


# ---------------------------------------------------------------------------
# Persist trend summary to PostgreSQL
# ---------------------------------------------------------------------------

# NOTE: despite the name this is a plain INSERT (no ON CONFLICT clause), so
# every call adds a new trend_windows row.
_UPSERT_TREND = """
    INSERT INTO trend_windows (
        entity_type, entity_id, window, trend_direction, trend_strength,
        confidence, top_supporting_evidence, top_opposing_evidence,
        dominant_catalysts, material_risks, contradiction_score,
        disagreement_details, market_context, generated_at
    ) VALUES (
        $1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10::jsonb,
        $11, $12::jsonb, $13::jsonb, $14
    )
    RETURNING id
"""


async def persist_trend_summary(
    pool: asyncpg.Pool,
    summary: TrendSummary,
) -> str:
    """Insert a trend summary row and return its UUID.

    List-valued fields are serialised to JSON text for the ``jsonb`` columns;
    a missing market context is stored as an empty JSON object.
    """
    if summary.market_context:
        market_context_payload = summary.market_context.model_dump()
    else:
        market_context_payload = {}

    params = (
        summary.entity_type,
        summary.entity_id,
        summary.window.value,
        summary.trend_direction.value,
        summary.trend_strength,
        summary.confidence,
        json.dumps(summary.top_supporting_evidence),
        json.dumps(summary.top_opposing_evidence),
        json.dumps(summary.dominant_catalysts),
        json.dumps(summary.material_risks),
        summary.contradiction_score,
        json.dumps([detail.model_dump() for detail in summary.disagreement_details]),
        json.dumps(market_context_payload, default=str),
        summary.generated_at,
    )
    inserted = await pool.fetchrow(_UPSERT_TREND, *params)
    return str(inserted["id"])


# ---------------------------------------------------------------------------
# Persist evidence mappings to trend_evidence table
# ---------------------------------------------------------------------------

_INSERT_EVIDENCE = """
    INSERT INTO trend_evidence (
        trend_window_id, document_id, evidence_type, rank_score,
        weight_component, impact_component, recency_component,
        confidence_component, sentiment_value
    ) VALUES (
        $1, $2::uuid, $3, $4, $5, $6, $7, $8, $9
    )
"""


async def persist_trend_evidence(
    pool: asyncpg.Pool,
    trend_window_id: str,
    supporting: list[RankedEvidence],
    opposing: list[RankedEvidence],
) -> int:
    """Insert evidence mapping rows for a trend window. Returns count inserted.

    Supporting rows are written before opposing rows, each tagged with its
    evidence type. Nothing is sent to the database when both lists are empty.
    """
    tagged_groups = (("supporting", supporting), ("opposing", opposing))
    rows: list[tuple[str, str, str, float, float, float, float, float, float]] = [
        (
            trend_window_id,
            ev.document_id,
            evidence_type,
            ev.rank_score,
            ev.weight_component,
            ev.impact_component,
            ev.recency_component,
            ev.confidence_component,
            ev.sentiment_value,
        )
        for evidence_type, group in tagged_groups
        for ev in group
    ]
    if len(rows) == 0:
        return 0

    await pool.executemany(_INSERT_EVIDENCE, rows)
    return len(rows)


# ---------------------------------------------------------------------------
# Main aggregation entry point for a single ticker + window
# ---------------------------------------------------------------------------


async def aggregate_company_window(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
    reference_time: datetime | None = None,
    config: AggregationConfig | None = None,
) -> TrendSummary:
    """Compute and persist a trend summary for one ticker and one window.

    Steps:
    1. Determine the time range for the window (unknown windows use 7 days).
    2. Fetch document impact records from PostgreSQL.
    3. Fetch market context for the ticker.
    4. Build weighted signals using the scoring module.
    5. Assemble the TrendSummary.
    6. Persist to the trend_windows and trend_evidence tables.
    7. Log the outcome and update the Prometheus metrics.

    Returns the assembled TrendSummary.
    """
    settings = config if config else AggregationConfig()
    scoring_settings = settings.effective_scoring()
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)

    started_at = time.monotonic()

    lookback = WINDOW_DURATIONS.get(window, timedelta(days=7))
    window_start = reference_time - lookback

    # Inputs: impact records first, then market context.
    impacts = await fetch_impact_records(pool, ticker, window_start, reference_time)
    market_ctx = await fetch_market_context(pool, ticker, window, reference_time)

    # Scoring always receives the fetched context object ...
    signals = build_weighted_signals(
        impacts,
        reference_time,
        window,
        market_ctx,
        scoring_settings,
    )

    # ... but the summary only carries it when it actually holds data.
    summary_ctx = market_ctx if market_ctx.has_data else None
    assembled = assemble_trend_with_evidence(
        ticker=ticker,
        window=window,
        signals=signals,
        impacts=impacts,
        market_ctx=summary_ctx,
        max_evidence=settings.max_evidence,
        reference_time=reference_time,
    )
    summary = assembled.summary

    # Persist the trend window, then the evidence rows that reference it.
    trend_id = await persist_trend_summary(pool, summary)
    evidence_count = await persist_trend_evidence(
        pool,
        trend_id,
        assembled.supporting_evidence,
        assembled.opposing_evidence,
    )

    logger.info(
        "Persisted trend %s for %s/%s: direction=%s strength=%.3f confidence=%.3f signals=%d evidence=%d",
        trend_id,
        ticker,
        window,
        summary.trend_direction.value,
        summary.trend_strength,
        summary.confidence,
        len(signals),
        evidence_count,
    )

    # Prometheus metrics
    AGGREGATION_WINDOWS_COMPUTED.labels(window=window).inc()
    AGGREGATION_SIGNALS_PROCESSED.labels(window=window).inc(len(signals))
    AGGREGATION_CONTRADICTION_SCORE.observe(summary.contradiction_score)
    elapsed = time.monotonic() - started_at
    AGGREGATION_DURATION.labels(window=window).observe(elapsed)

    return summary


# ---------------------------------------------------------------------------
# Aggregate all windows for a single ticker
# ---------------------------------------------------------------------------


async def aggregate_company(
    pool: asyncpg.Pool,
    ticker: str,
    reference_time: datetime | None = None,
    config: AggregationConfig | None = None,
) -> list[TrendSummary]:
    """Compute trend summaries for all configured windows for a ticker.

    Windows are processed one after another, all against the same reference
    time, and the summaries are returned in window order.
    """
    settings = config if config else AggregationConfig()
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)

    return [
        await aggregate_company_window(pool, ticker, window, reference_time, settings)
        for window in settings.effective_windows()
    ]