5efccb1e03
Backend: assemble_trend_with_evidence now deduplicates document IDs via dict.fromkeys() (the rollup code already did this, but the base assembly didn't — same doc could appear multiple times from different intelligence extractions). Frontend: Trends.tsx deduplicates via Set before rendering as a safety net for existing data already stored with duplicates.
1137 lines
38 KiB
Python
1137 lines
38 KiB
Python
"""Aggregation worker - company-level rolling window trend summaries.
|
|
|
|
Queries document intelligence and market context for a given ticker,
|
|
computes weighted signal scores, and produces TrendSummary objects
|
|
persisted to the trend_windows table.
|
|
|
|
Requirements: 6.1, 6.2, 6.5
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import math
|
|
import time
|
|
import uuid as _uuid
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
import asyncpg
|
|
|
|
from services.aggregation.contradiction import CatalystEntry, detect_contradictions
|
|
from services.aggregation.evidence import (
|
|
EvidenceRankConfig,
|
|
RankedEvidence,
|
|
rank_evidence_detailed,
|
|
)
|
|
from services.aggregation.evidence import (
|
|
rank_evidence as _rank_evidence_composite,
|
|
)
|
|
from services.aggregation.market_context import fetch_market_context
|
|
from services.aggregation.pattern_matcher import find_self_patterns
|
|
from services.aggregation.projection import (
|
|
MacroEventInfo,
|
|
compute_projection,
|
|
persist_trend_projection,
|
|
)
|
|
from services.aggregation.scoring import (
|
|
ScoringConfig,
|
|
WeightedSignal,
|
|
compute_signal_weight,
|
|
sentiment_to_numeric,
|
|
weighted_sentiment_average,
|
|
)
|
|
from services.aggregation.signal_propagation import (
|
|
CompetitiveSignalRecord,
|
|
build_pattern_weighted_signals,
|
|
)
|
|
from services.shared.metrics import (
|
|
AGGREGATION_CONTRADICTION_SCORE,
|
|
AGGREGATION_DURATION,
|
|
AGGREGATION_SIGNALS_PROCESSED,
|
|
AGGREGATION_WINDOWS_COMPUTED,
|
|
)
|
|
from services.shared.schemas import TrendDirection, TrendSummary, TrendWindow
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

# Map TrendWindow values to lookback durations.
# Keys are the ``.value`` of each TrendWindow member; each duration is how far
# back from the reference time a window reaches.  The intraday window is
# defined here as the trailing 12 hours.
WINDOW_DURATIONS: dict[str, timedelta] = {
    TrendWindow.INTRADAY.value: timedelta(hours=12),
    TrendWindow.ONE_DAY.value: timedelta(days=1),
    TrendWindow.SEVEN_DAY.value: timedelta(days=7),
    TrendWindow.THIRTY_DAY.value: timedelta(days=30),
    TrendWindow.NINETY_DAY.value: timedelta(days=90),
}

# How many evidence document IDs to keep in supporting/opposing lists.
# Serves as the default for the ``max_evidence`` / ``max_refs`` settings
# declared further down in this module.
MAX_EVIDENCE_REFS = 10
|
|
|
|
|
|
@dataclass
class AggregationConfig:
    """Settings for an aggregation run: windows, scoring and layer toggles."""

    # Window identifiers to compute.  None (or an empty list) selects every
    # TrendWindow member - see effective_windows().
    windows: list[str] | None = None
    # Scoring parameters; a default ScoringConfig is used when unset.
    scoring: ScoringConfig | None = None
    # Maximum number of evidence references kept per side.
    max_evidence: int = MAX_EVIDENCE_REFS
    # Relative weight of macro signals versus company signals.
    macro_signal_weight: float = 0.3
    # Runtime toggle state for the macro layer.
    macro_enabled: bool = True
    # Relative weight of pattern (competitive) signals.
    competitive_signal_weight: float = 0.2
    # Runtime toggle state for the competitive layer.
    competitive_enabled: bool = True

    def effective_windows(self) -> list[str]:
        """Return the configured windows, or all TrendWindow values if none are set."""
        if not self.windows:
            return [member.value for member in TrendWindow]
        return self.windows

    def effective_scoring(self) -> ScoringConfig:
        """Return the configured ScoringConfig, or a freshly built default one."""
        if self.scoring:
            return self.scoring
        return ScoringConfig()
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch impact records for a ticker within a time window
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_IMPACT_QUERY = """
|
|
SELECT
|
|
di.document_id,
|
|
di.confidence,
|
|
di.novelty_score,
|
|
di.source_credibility,
|
|
dir.sentiment,
|
|
dir.impact_score,
|
|
dir.catalyst_type,
|
|
dir.key_facts,
|
|
dir.risks,
|
|
d.published_at
|
|
FROM document_impact_records dir
|
|
JOIN document_intelligence di ON di.id = dir.intelligence_id
|
|
JOIN documents d ON d.id = di.document_id
|
|
WHERE dir.ticker = $1
|
|
AND d.published_at >= $2
|
|
AND d.published_at <= $3
|
|
AND di.validation_status = 'valid'
|
|
AND d.status != 'rejected'
|
|
ORDER BY d.published_at DESC
|
|
"""
|
|
|
|
|
|
@dataclass
|
|
class ImpactRow:
|
|
"""Parsed row from the impact query."""
|
|
|
|
document_id: str
|
|
confidence: float
|
|
novelty_score: float
|
|
source_credibility: float
|
|
sentiment: str
|
|
impact_score: float
|
|
catalyst_type: str
|
|
key_facts: list[str]
|
|
risks: list[str]
|
|
published_at: datetime
|
|
|
|
|
|
def _parse_impact_row(row: Any) -> ImpactRow:
|
|
"""Convert an asyncpg Record to an ImpactRow."""
|
|
key_facts = row["key_facts"]
|
|
if isinstance(key_facts, str):
|
|
key_facts = json.loads(key_facts)
|
|
risks = row["risks"]
|
|
if isinstance(risks, str):
|
|
risks = json.loads(risks)
|
|
|
|
return ImpactRow(
|
|
document_id=str(row["document_id"]),
|
|
confidence=float(row["confidence"] or 0.5),
|
|
novelty_score=float(row["novelty_score"] or 0.5),
|
|
source_credibility=float(row["source_credibility"] or 0.5),
|
|
sentiment=row["sentiment"] or "neutral",
|
|
impact_score=float(row["impact_score"] or 0.0),
|
|
catalyst_type=row["catalyst_type"] or "other",
|
|
key_facts=key_facts if isinstance(key_facts, list) else [],
|
|
risks=risks if isinstance(risks, list) else [],
|
|
published_at=row["published_at"],
|
|
)
|
|
|
|
|
|
async def fetch_impact_records(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    window_end: datetime,
) -> list[ImpactRow]:
    """Run the impact query for ``ticker`` and parse every returned row.

    ``window_start`` and ``window_end`` are passed to the query as the
    inclusive bounds on the document publication time.
    """
    records = await pool.fetch(_IMPACT_QUERY, ticker, window_start, window_end)
    parsed: list[ImpactRow] = []
    for record in records:
        parsed.append(_parse_impact_row(record))
    return parsed
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch macro toggle state from risk_configs
|
|
#
|
|
# MACRO LAYER TOGGLE BEHAVIOR (Requirements 11.2, 11.3, 11.4):
|
|
# - The toggle state is read fresh from PostgreSQL at the start of each
|
|
# aggregation cycle (no caching), so changes take effect immediately on
|
|
# the next cycle.
|
|
# - When disabled: ingestion and classification continue normally (historical
|
|
# data is preserved), but interpolation and aggregation integration are
|
|
# skipped — the aggregation engine produces trends using only company-
|
|
# specific signals.
|
|
# - When re-enabled: the engine resumes computing macro impact scores using
|
|
# the most recent GlobalEvent classifications, including any events that
|
|
# were ingested and classified while the layer was disabled.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_MACRO_TOGGLE_QUERY = """
|
|
SELECT config->>'macro_enabled' AS macro_enabled
|
|
FROM risk_configs
|
|
WHERE active = TRUE
|
|
ORDER BY updated_at DESC
|
|
LIMIT 1
|
|
"""
|
|
|
|
|
|
async def fetch_macro_enabled(pool: asyncpg.Pool) -> bool | None:
    """Read the macro layer toggle from the risk_configs table.

    Returns:
        ``True`` when the stored text equals ``"true"`` (case-insensitive),
        ``False`` for any other stored text, and ``None`` when there is no
        matching row or the key is not set - the caller should then fall back
        to the AggregationConfig default.
    """
    record = await pool.fetchrow(_MACRO_TOGGLE_QUERY)
    stored = None if record is None else record["macro_enabled"]
    if stored is None:
        return None
    return stored.lower() == "true"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch competitive toggle state from risk_configs
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_COMPETITIVE_TOGGLE_QUERY = """
|
|
SELECT config->>'competitive_enabled' AS competitive_enabled
|
|
FROM risk_configs
|
|
WHERE active = TRUE
|
|
ORDER BY updated_at DESC
|
|
LIMIT 1
|
|
"""
|
|
|
|
|
|
async def fetch_competitive_enabled(pool: asyncpg.Pool) -> bool | None:
    """Read the competitive layer toggle from the risk_configs table.

    Returns:
        ``True`` when the stored text equals ``"true"`` (case-insensitive),
        ``False`` for any other stored text, and ``None`` when there is no
        matching row or the key is not set - the caller should then fall back
        to the AggregationConfig default.
    """
    record = await pool.fetchrow(_COMPETITIVE_TOGGLE_QUERY)
    stored = None if record is None else record["competitive_enabled"]
    if stored is None:
        return None
    return stored.lower() == "true"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch competitive signals targeting a ticker within a time window
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_COMPETITIVE_SIGNALS_QUERY = """
|
|
SELECT source_document_id, source_ticker, target_ticker, catalyst_type,
|
|
pattern_confidence, signal_direction, signal_strength,
|
|
relationship_strength, computed_at
|
|
FROM competitive_signal_records
|
|
WHERE target_ticker = $1
|
|
AND computed_at >= $2
|
|
AND computed_at <= $3
|
|
ORDER BY computed_at DESC
|
|
LIMIT 500
|
|
"""
|
|
|
|
|
|
async def fetch_competitive_signals(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    window_end: datetime,
) -> list[CompetitiveSignalRecord]:
    """Load competitive signal records whose target is ``ticker``.

    ``window_start`` and ``window_end`` are passed to the query as the
    inclusive bounds on ``computed_at``.  The document ID is stringified and
    the three numeric columns are coerced to ``float``; every other column is
    copied through unchanged.
    """

    def _to_record(db_row: Any) -> CompetitiveSignalRecord:
        return CompetitiveSignalRecord(
            source_document_id=str(db_row["source_document_id"]),
            source_ticker=db_row["source_ticker"],
            target_ticker=db_row["target_ticker"],
            catalyst_type=db_row["catalyst_type"],
            pattern_confidence=float(db_row["pattern_confidence"]),
            signal_direction=db_row["signal_direction"],
            signal_strength=float(db_row["signal_strength"]),
            relationship_strength=float(db_row["relationship_strength"]),
            computed_at=db_row["computed_at"],
        )

    db_rows = await pool.fetch(
        _COMPETITIVE_SIGNALS_QUERY,
        ticker,
        window_start,
        window_end,
    )
    records: list[CompetitiveSignalRecord] = []
    for db_row in db_rows:
        records.append(_to_record(db_row))
    return records
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch macro impact records for a ticker within a time window
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_MACRO_IMPACT_QUERY = """
|
|
SELECT
|
|
mir.event_id,
|
|
mir.company_id,
|
|
mir.ticker,
|
|
mir.macro_impact_score,
|
|
mir.impact_direction,
|
|
mir.contributing_factors,
|
|
mir.confidence,
|
|
mir.computed_at,
|
|
ge.source_document_id,
|
|
d.published_at AS event_published_at
|
|
FROM macro_impact_records mir
|
|
JOIN global_events ge ON ge.id = mir.event_id
|
|
JOIN documents d ON d.id = ge.source_document_id
|
|
WHERE mir.ticker = $1
|
|
AND mir.computed_at >= $2
|
|
AND mir.computed_at <= $3
|
|
ORDER BY mir.computed_at DESC
|
|
"""
|
|
|
|
|
|
@dataclass
|
|
class MacroImpactRow:
|
|
"""Parsed row from the macro impact query."""
|
|
|
|
event_id: str
|
|
company_id: str
|
|
ticker: str
|
|
macro_impact_score: float
|
|
impact_direction: str
|
|
contributing_factors: list[str]
|
|
confidence: float
|
|
computed_at: datetime
|
|
source_document_id: str
|
|
event_published_at: datetime
|
|
|
|
|
|
def _parse_macro_impact_row(row: Any) -> MacroImpactRow:
|
|
"""Convert an asyncpg Record to a MacroImpactRow."""
|
|
factors = row["contributing_factors"]
|
|
if isinstance(factors, str):
|
|
factors = json.loads(factors)
|
|
|
|
return MacroImpactRow(
|
|
event_id=str(row["event_id"]),
|
|
company_id=str(row["company_id"]),
|
|
ticker=row["ticker"],
|
|
macro_impact_score=float(row["macro_impact_score"] or 0.0),
|
|
impact_direction=row["impact_direction"] or "neutral",
|
|
contributing_factors=factors if isinstance(factors, list) else [],
|
|
confidence=float(row["confidence"] or 0.5),
|
|
computed_at=row["computed_at"],
|
|
source_document_id=str(row["source_document_id"]),
|
|
event_published_at=row["event_published_at"],
|
|
)
|
|
|
|
|
|
async def fetch_macro_impact_records(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    window_end: datetime,
) -> list[MacroImpactRow]:
    """Run the macro impact query for ``ticker`` and parse every returned row.

    ``window_start`` and ``window_end`` are passed to the query as the
    inclusive bounds on the record's ``computed_at`` time.
    """
    records = await pool.fetch(_MACRO_IMPACT_QUERY, ticker, window_start, window_end)
    parsed: list[MacroImpactRow] = []
    for record in records:
        parsed.append(_parse_macro_impact_row(record))
    return parsed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Convert macro impact records to WeightedSignals
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Numeric sentiment assigned to each macro impact_direction label.
# Lookups are exact-match on the label string; both "mixed" and "neutral"
# contribute no directional sentiment (0.0).
_DIRECTION_TO_SENTIMENT: dict[str, float] = {
    "positive": 1.0,
    "negative": -1.0,
    "mixed": 0.0,
    "neutral": 0.0,
}
|
|
|
|
|
|
def build_macro_weighted_signals(
    macro_impacts: list[MacroImpactRow],
    reference_time: datetime,
    window: str,
    macro_signal_weight: float = 0.3,
    config: ScoringConfig | None = None,
) -> list[WeightedSignal]:
    """Turn macro impact records into WeightedSignal objects.

    Each record yields one signal, in input order:

    - ``document_id`` is the record's ``source_document_id`` (for evidence
      tracing).
    - ``weight`` comes from ``compute_signal_weight`` using the global
      event's publication time; the record's ``confidence`` is supplied as
      both the source credibility and the extraction confidence, and the
      novelty score is fixed at 0.5.
    - ``sentiment_value`` is looked up from ``impact_direction``; labels
      missing from the lookup table map to 0.0.
    - ``impact_score`` is ``macro_impact_score * macro_signal_weight``.

    A default ``ScoringConfig`` is used when ``config`` is not supplied.
    """
    scoring = config or ScoringConfig()
    return [
        WeightedSignal(
            document_id=record.source_document_id,
            weight=compute_signal_weight(
                published_at=record.event_published_at,
                reference_time=reference_time,
                window=window,
                source_credibility=record.confidence,
                novelty_score=0.5,
                extraction_confidence=record.confidence,
                config=scoring,
            ),
            sentiment_value=_DIRECTION_TO_SENTIMENT.get(record.impact_direction, 0.0),
            impact_score=record.macro_impact_score * macro_signal_weight,
        )
        for record in macro_impacts
    ]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Build weighted signals from impact records
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def build_weighted_signals(
    impacts: list[ImpactRow],
    reference_time: datetime,
    window: str,
    market_ctx: Any | None = None,
    config: ScoringConfig | None = None,
) -> list[WeightedSignal]:
    """Turn company impact records into WeightedSignal objects.

    Each impact row yields one signal, in input order.  The weight is
    computed by ``compute_signal_weight`` from the row's publication time,
    source credibility, novelty score and extraction confidence together with
    ``market_ctx``; the sentiment label is converted with
    ``sentiment_to_numeric`` and the impact score is copied through.

    A default ``ScoringConfig`` is used when ``config`` is not supplied.
    """
    scoring = config or ScoringConfig()
    return [
        WeightedSignal(
            document_id=impact.document_id,
            weight=compute_signal_weight(
                published_at=impact.published_at,
                reference_time=reference_time,
                window=window,
                source_credibility=impact.source_credibility,
                novelty_score=impact.novelty_score,
                extraction_confidence=impact.confidence,
                market_ctx=market_ctx,
                config=scoring,
            ),
            sentiment_value=sentiment_to_numeric(impact.sentiment),
            impact_score=impact.impact_score,
        )
        for impact in impacts
    ]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Derive trend direction from weighted sentiment
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Thresholds for mapping numeric sentiment to direction.
# Average sentiment at or above BULLISH_THRESHOLD is bullish; at or below
# BEARISH_THRESHOLD is bearish; anything in between is neutral.
BULLISH_THRESHOLD = 0.15
BEARISH_THRESHOLD = -0.15
# Contradiction score above this → mixed, but only while the absolute average
# sentiment is below 0.3 (see derive_trend_direction).
MIXED_THRESHOLD = 0.10
|
|
|
|
|
|
def derive_trend_direction(
    avg_sentiment: float,
    contradiction_score: float = 0.0,
) -> TrendDirection:
    """Map a weighted average sentiment to a TrendDirection.

    The direction is MIXED when the contradiction score exceeds
    ``MIXED_THRESHOLD`` *and* the average sentiment is weak
    (``abs(avg_sentiment) < 0.3``).  A strong average sentiment therefore
    still resolves to BULLISH or BEARISH even under high contradiction.

    Otherwise the result is BULLISH at or above ``BULLISH_THRESHOLD``,
    BEARISH at or below ``BEARISH_THRESHOLD``, and NEUTRAL in between.
    """
    if contradiction_score > MIXED_THRESHOLD and abs(avg_sentiment) < 0.3:
        direction = TrendDirection.MIXED
    elif avg_sentiment >= BULLISH_THRESHOLD:
        direction = TrendDirection.BULLISH
    elif avg_sentiment <= BEARISH_THRESHOLD:
        direction = TrendDirection.BEARISH
    else:
        direction = TrendDirection.NEUTRAL
    return direction
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Compute contradiction score
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def compute_contradiction_score(signals: list[WeightedSignal]) -> float:
    """Measure how much disagreement exists among weighted signals.

    Every signal contributes ``weight.combined * impact_score`` to the
    positive side when its sentiment is above zero, or to the negative side
    when it is below zero; signals with a sentiment of exactly zero are
    ignored.

    The score is the minority side's total divided by the sum of both sides,
    rounded to four decimals.  With non-negative contributions it lies in
    [0, 0.5]: 0 means full agreement and 0.5 means the positive and negative
    sides carry equal weight.  An empty input, or one with no directional
    weight at all, yields 0.0.
    """
    positive_total = 0.0
    negative_total = 0.0
    for signal in signals:
        contribution = signal.weight.combined * signal.impact_score
        if signal.sentiment_value > 0:
            positive_total += contribution
        elif signal.sentiment_value < 0:
            negative_total += contribution

    combined_total = positive_total + negative_total
    # Covers the empty input as well as all-neutral / zero-weight inputs.
    if combined_total == 0.0:
        return 0.0

    smaller_side = min(positive_total, negative_total)
    return round(smaller_side / combined_total, 4)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rank evidence (supporting vs opposing)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def rank_evidence(
    signals: list[WeightedSignal],
    max_refs: int = MAX_EVIDENCE_REFS,
) -> tuple[list[str], list[str]]:
    """Return the top supporting and opposing document IDs.

    This is a thin wrapper: it builds an ``EvidenceRankConfig`` carrying
    ``max_refs`` and delegates to the evidence ranking module, which performs
    the actual composite scoring and the split into supporting and opposing
    lists.
    """
    return _rank_evidence_composite(signals, EvidenceRankConfig(max_refs=max_refs))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Extract dominant catalysts and material risks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def extract_catalysts_and_risks(
    impacts: list[ImpactRow],
    signals: list[WeightedSignal],
) -> tuple[list[str], list[str]]:
    """Return the dominant catalyst types and the most material risks.

    Each impact row is weighted by ``weight.combined * impact_score`` of the
    signal sharing its ``document_id``; rows without a matching signal, or
    with a weight that is zero or negative, are skipped.

    Returns:
        A ``(catalysts, risks)`` tuple:

        - ``catalysts``: up to five catalyst types, ordered by cumulative
          weight, highest first.
        - ``risks``: up to five risk texts, stripped of surrounding
          whitespace, ordered by the weight of the row that surfaced them
          and de-duplicated case-insensitively.
    """
    # NOTE(review): when several signals share a document_id only the last
    # one's weight survives in this lookup - confirm that is intended.
    score_by_doc: dict[str, float] = {}
    for signal in signals:
        score_by_doc[signal.document_id] = signal.weight.combined * signal.impact_score

    catalyst_totals: dict[str, float] = {}
    weighted_risks: list[tuple[float, str]] = []
    for impact in impacts:
        doc_score = score_by_doc.get(impact.document_id, 0.0)
        if doc_score <= 0.0:
            continue
        previous = catalyst_totals.get(impact.catalyst_type, 0.0)
        catalyst_totals[impact.catalyst_type] = previous + doc_score
        weighted_risks.extend((doc_score, text) for text in impact.risks)

    # Stable descending sort: ties keep their first-seen order.
    ranked_catalysts = sorted(
        catalyst_totals.items(), key=lambda pair: pair[1], reverse=True
    )
    top_catalysts = [name for name, _total in ranked_catalysts[:5]]

    top_risks: list[str] = []
    seen_keys: set[str] = set()
    for _score, text in sorted(weighted_risks, key=lambda pair: pair[0], reverse=True):
        cleaned = text.strip()
        dedupe_key = cleaned.lower()
        if dedupe_key in seen_keys:
            continue
        seen_keys.add(dedupe_key)
        top_risks.append(cleaned)
        if len(top_risks) >= 5:
            break

    return top_catalysts, top_risks
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Compute trend confidence
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def compute_trend_confidence(
    signals: list[WeightedSignal],
    contradiction_score: float,
) -> float:
    """Derive an overall confidence for the trend summary.

    Only signals whose ``weight.combined`` is positive take part.  The score
    blends three factors and then subtracts a contradiction penalty:

    - Source count (30%): unique non-empty document IDs divided by 15,
      capped at 0.8 - the cap is reached at 12 unique documents.
    - Average credibility (30%): mean of ``weight.credibility`` over the
      active signals.
    - Agreement (40%): share of directional signals on the majority side
      (0.5 when none are directional), dampened by sample size with
      ``log2(n + 1) / log2(8)`` so it only counts in full from 7 unique
      sources upward.
    - Penalty: ``contradiction_score * 0.4``.

    Returns a value clamped to [0, 1] and rounded to four decimals; 0.0 when
    there are no active signals.
    """
    active = [sig for sig in signals if sig.weight.combined > 0]
    if not active:
        return 0.0

    # Count unique source documents so that several signals derived from the
    # same document do not inflate confidence.
    unique_sources = len({sig.document_id for sig in active if sig.document_id})
    count_factor = min(unique_sources / 15.0, 0.8)

    # NOTE(review): this averages weight.credibility; confirm that is the
    # field intended to represent extraction confidence.
    avg_conf = sum(sig.weight.credibility for sig in active) / len(active)

    bullish = 0
    bearish = 0
    for sig in active:
        if sig.sentiment_value > 0:
            bullish += 1
        elif sig.sentiment_value < 0:
            bearish += 1
    directional = bullish + bearish
    agreement = max(bullish, bearish) / directional if directional > 0 else 0.5

    # One or two agreeing sources mean far less than seven or more.
    agreement *= min(1.0, math.log2(unique_sources + 1) / math.log2(8))

    contradiction_penalty = contradiction_score * 0.4

    confidence = (0.3 * count_factor + 0.3 * avg_conf + 0.4 * agreement) - contradiction_penalty
    return round(max(0.0, min(1.0, confidence)), 4)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Assemble a TrendSummary from components
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class AssembledTrend:
    """A trend summary paired with its detailed evidence rankings."""

    # The assembled summary; its evidence fields hold document IDs only.
    summary: TrendSummary
    # Full ranking entries for the supporting side.
    supporting_evidence: list[RankedEvidence]
    # Full ranking entries for the opposing side.
    opposing_evidence: list[RankedEvidence]
|
|
|
|
|
|
def assemble_trend_summary(
    ticker: str,
    window: str,
    signals: list[WeightedSignal],
    impacts: list[ImpactRow],
    market_ctx: Any | None = None,
    max_evidence: int = MAX_EVIDENCE_REFS,
    reference_time: datetime | None = None,
) -> TrendSummary:
    """Build a complete TrendSummary from weighted signals and impact records.

    Convenience wrapper around ``assemble_trend_with_evidence`` that discards
    the detailed evidence rankings and returns only the summary.
    """
    assembled = assemble_trend_with_evidence(
        ticker=ticker,
        window=window,
        signals=signals,
        impacts=impacts,
        market_ctx=market_ctx,
        max_evidence=max_evidence,
        reference_time=reference_time,
    )
    return assembled.summary
|
|
|
|
|
|
def assemble_trend_with_evidence(
    ticker: str,
    window: str,
    signals: list[WeightedSignal],
    impacts: list[ImpactRow],
    market_ctx: Any | None = None,
    max_evidence: int = MAX_EVIDENCE_REFS,
    reference_time: datetime | None = None,
) -> AssembledTrend:
    """Build a TrendSummary plus the detailed evidence rankings behind it.

    ``reference_time`` becomes the summary's ``generated_at`` and defaults to
    the current UTC time.  ``window`` is converted with ``TrendWindow(window)``.

    The summary's evidence lists hold document IDs in rank order with repeats
    removed, so they may be shorter than ``max_evidence``.  The returned
    ``supporting_evidence`` / ``opposing_evidence`` are the ranking results
    exactly as produced by ``rank_evidence_detailed``.
    """
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)

    mean_sentiment = weighted_sentiment_average(signals)

    # Run full contradiction detection (Requirement 6.4)
    contradictions = detect_contradictions(
        signals,
        [
            CatalystEntry(document_id=impact.document_id, catalyst_type=impact.catalyst_type)
            for impact in impacts
        ],
    )
    contradiction_score = contradictions.score

    trend_direction = derive_trend_direction(mean_sentiment, contradiction_score)
    trend_confidence = compute_trend_confidence(signals, contradiction_score)

    # Detailed rankings are kept for persistence.
    ranked_for, ranked_against = rank_evidence_detailed(
        signals, EvidenceRankConfig(max_refs=max_evidence)
    )

    def _ordered_unique_ids(ranked: list[RankedEvidence]) -> list[str]:
        # dict.fromkeys keeps the first occurrence of each ID, in rank order.
        return list(dict.fromkeys(entry.document_id for entry in ranked))

    supporting_ids = _ordered_unique_ids(ranked_for)
    opposing_ids = _ordered_unique_ids(ranked_against)

    top_catalysts, top_risks = extract_catalysts_and_risks(impacts, signals)

    # Trend strength: absolute value of weighted sentiment, clamped to [0, 1]
    trend_strength = round(min(abs(mean_sentiment), 1.0), 4)

    return AssembledTrend(
        summary=TrendSummary(
            entity_type="company",
            entity_id=ticker,
            window=TrendWindow(window),
            trend_direction=trend_direction,
            trend_strength=trend_strength,
            confidence=trend_confidence,
            top_supporting_evidence=supporting_ids,
            top_opposing_evidence=opposing_ids,
            dominant_catalysts=top_catalysts,
            material_risks=top_risks,
            contradiction_score=contradiction_score,
            disagreement_details=contradictions.details,
            market_context=market_ctx,
            generated_at=reference_time,
        ),
        supporting_evidence=ranked_for,
        opposing_evidence=ranked_against,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Persist trend summary to PostgreSQL
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_UPSERT_TREND = """
|
|
INSERT INTO trend_windows (
|
|
entity_type, entity_id, "window", trend_direction, trend_strength,
|
|
confidence, top_supporting_evidence, top_opposing_evidence,
|
|
dominant_catalysts, material_risks, contradiction_score,
|
|
disagreement_details, market_context, generated_at
|
|
) VALUES (
|
|
$1, $2, $3, $4, $5,
|
|
$6, $7::jsonb, $8::jsonb,
|
|
$9::jsonb, $10::jsonb, $11,
|
|
$12::jsonb, $13::jsonb, $14
|
|
)
|
|
ON CONFLICT (entity_type, entity_id, "window") DO UPDATE SET
|
|
trend_direction = EXCLUDED.trend_direction,
|
|
trend_strength = EXCLUDED.trend_strength,
|
|
confidence = EXCLUDED.confidence,
|
|
top_supporting_evidence = EXCLUDED.top_supporting_evidence,
|
|
top_opposing_evidence = EXCLUDED.top_opposing_evidence,
|
|
dominant_catalysts = EXCLUDED.dominant_catalysts,
|
|
material_risks = EXCLUDED.material_risks,
|
|
contradiction_score = EXCLUDED.contradiction_score,
|
|
disagreement_details = EXCLUDED.disagreement_details,
|
|
market_context = EXCLUDED.market_context,
|
|
generated_at = EXCLUDED.generated_at
|
|
RETURNING id
|
|
"""
|
|
|
|
|
|
_INSERT_TREND_HISTORY = """
|
|
INSERT INTO trend_history (
|
|
entity_type, entity_id, "window", trend_direction,
|
|
trend_strength, confidence, contradiction_score,
|
|
dominant_catalysts, material_risks, generated_at
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9::jsonb, $10)
|
|
"""
|
|
|
|
|
|
async def persist_trend_summary(
    pool: asyncpg.Pool,
    summary: TrendSummary,
) -> str:
    """Upsert a trend summary row and return its UUID as a string.

    The row in ``trend_windows`` is inserted, or overwritten when one already
    exists for the same entity and window.  A snapshot is then appended to
    ``trend_history`` for time-series charting.

    The history write is best-effort and never fails the call: a missing
    ``trend_history`` table is logged at DEBUG (the table may not exist yet),
    while any other failure is logged at WARNING so that persistent problems
    are visible.  The two writes are separate statements, not one
    transaction.

    Raises:
        Whatever the pool raises for the ``trend_windows`` upsert itself.
    """
    window_value = summary.window.value
    direction_value = summary.trend_direction.value
    catalysts_json = json.dumps(summary.dominant_catalysts)
    risks_json = json.dumps(summary.material_risks)
    market_context = summary.market_context.model_dump() if summary.market_context else {}

    row = await pool.fetchrow(
        _UPSERT_TREND,
        summary.entity_type,
        summary.entity_id,
        window_value,
        direction_value,
        summary.trend_strength,
        summary.confidence,
        json.dumps(summary.top_supporting_evidence),
        json.dumps(summary.top_opposing_evidence),
        catalysts_json,
        risks_json,
        summary.contradiction_score,
        json.dumps([d.model_dump() for d in summary.disagreement_details]),
        # default=str stringifies values json cannot encode natively.
        json.dumps(market_context, default=str),
        summary.generated_at,
    )
    trend_id = str(row["id"])

    # Append to trend_history for time-series charts
    try:
        await pool.execute(
            _INSERT_TREND_HISTORY,
            summary.entity_type,
            summary.entity_id,
            window_value,
            direction_value,
            summary.trend_strength,
            summary.confidence,
            summary.contradiction_score,
            catalysts_json,
            risks_json,
            summary.generated_at,
        )
    except asyncpg.UndefinedTableError:
        # Expected while the history table has not been created yet.
        logger.debug(
            "Could not insert trend history for %s/%s",
            summary.entity_id, window_value, exc_info=True,
        )
    except Exception:
        # Still best-effort (the main upsert already succeeded), but anything
        # other than a missing table should not go unnoticed.
        logger.warning(
            "Could not insert trend history for %s/%s",
            summary.entity_id, window_value, exc_info=True,
        )

    return trend_id
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Persist evidence mappings to trend_evidence table
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_INSERT_EVIDENCE = """
|
|
INSERT INTO trend_evidence (
|
|
trend_window_id, document_id, evidence_type,
|
|
rank_score, weight_component, impact_component,
|
|
recency_component, confidence_component, sentiment_value
|
|
) VALUES (
|
|
$1, $2::uuid, $3,
|
|
$4, $5, $6,
|
|
$7, $8, $9
|
|
)
|
|
"""
|
|
|
|
|
|
def _is_valid_uuid(val: str) -> bool:
|
|
"""Check if a string is a valid UUID (pattern signal IDs are not)."""
|
|
try:
|
|
_uuid.UUID(val)
|
|
return True
|
|
except (ValueError, AttributeError):
|
|
return False
|
|
|
|
|
|
async def persist_trend_evidence(
    pool: asyncpg.Pool,
    trend_window_id: str,
    supporting: list[RankedEvidence],
    opposing: list[RankedEvidence],
) -> int:
    """Insert evidence mapping rows for a trend window.

    Supporting entries are written with evidence type ``"supporting"``,
    followed by opposing entries with ``"opposing"``.  Entries whose
    ``document_id`` is not a valid UUID (e.g. pattern signal synthetic IDs)
    are skipped.  Nothing is sent to the database when no rows remain.

    Returns:
        The number of rows passed to ``executemany``.
    """
    # NOTE(review): the ranked lists are written as given; if a document can
    # appear more than once in a list, it yields one row per occurrence.
    rows: list[tuple[str, str, str, float, float, float, float, float, float]] = []
    for evidence_type, ranked in (("supporting", supporting), ("opposing", opposing)):
        for item in ranked:
            if not _is_valid_uuid(item.document_id):
                continue
            rows.append((
                trend_window_id, item.document_id, evidence_type,
                item.rank_score, item.weight_component, item.impact_component,
                item.recency_component, item.confidence_component, item.sentiment_value,
            ))

    if rows:
        await pool.executemany(_INSERT_EVIDENCE, rows)
    return len(rows)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Build MacroEventInfo objects for projection computation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_MACRO_EVENT_INFO_QUERY = """
|
|
SELECT
|
|
mir.event_id,
|
|
mir.macro_impact_score,
|
|
mir.impact_direction,
|
|
mir.confidence,
|
|
ge.estimated_duration,
|
|
ge.severity,
|
|
d.published_at AS event_published_at
|
|
FROM macro_impact_records mir
|
|
JOIN global_events ge ON ge.id = mir.event_id
|
|
JOIN documents d ON d.id = ge.source_document_id
|
|
WHERE mir.ticker = $1
|
|
AND mir.computed_at >= $2
|
|
AND mir.computed_at <= $3
|
|
ORDER BY mir.computed_at DESC
|
|
"""
|
|
|
|
|
|
async def _build_macro_event_infos(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    reference_time: datetime,
) -> list[MacroEventInfo]:
    """Fetch macro impact records and build MacroEventInfo objects for projection.

    ``window_start`` and ``reference_time`` bound the records' ``computed_at``
    time.  ``event_age_hours`` is the time from the source document's
    publication to ``reference_time``, floored at 0.0, and 0.0 when the
    publication time is missing.

    Defaults for missing values: ``macro_impact_score`` 0.0,
    ``impact_direction`` ``"neutral"``, ``confidence`` 0.5,
    ``estimated_duration`` ``"short_term"``, ``severity`` ``"low"``.
    A stored confidence of exactly 0.0 is kept as 0.0 rather than being
    replaced by the 0.5 default.
    """
    rows = await pool.fetch(
        _MACRO_EVENT_INFO_QUERY, ticker, window_start, reference_time,
    )
    infos: list[MacroEventInfo] = []
    for row in rows:
        published_at = row["event_published_at"]
        age_hours = 0.0
        if published_at:
            age_hours = max(
                (reference_time - published_at).total_seconds() / 3600.0, 0.0,
            )

        # Test for None explicitly: ``value or default`` would also replace a
        # legitimate 0.0.
        raw_score = row["macro_impact_score"]
        raw_confidence = row["confidence"]

        infos.append(
            MacroEventInfo(
                event_id=str(row["event_id"]),
                macro_impact_score=0.0 if raw_score is None else float(raw_score),
                impact_direction=row["impact_direction"] or "neutral",
                confidence=0.5 if raw_confidence is None else float(raw_confidence),
                estimated_duration=row["estimated_duration"] or "short_term",
                severity=row["severity"] or "low",
                event_age_hours=age_hours,
            )
        )
    return infos
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main aggregation entry point for a single ticker + window
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def aggregate_company_window(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
    reference_time: datetime | None = None,
    config: AggregationConfig | None = None,
) -> TrendSummary:
    """Compute and persist a trend summary for one ticker and one window.

    Steps:
      1. Determine the time range for the window (a window name missing
         from ``WINDOW_DURATIONS`` falls back to 7 days).
      2. Fetch document impact records and market context for the ticker.
      3. Build weighted signals using the scoring module.
      4. Check the macro toggle and fetch/merge macro signals if enabled.
      5. Check the competitive toggle and fetch/merge pattern/competitive
         signals if enabled. Failures here are logged and skipped.
      6. Assemble the TrendSummary together with its evidence.
      7. Persist the trend window.
      8. Persist the supporting/opposing evidence mappings.
      9. Compute and persist the trend projection. Failures here are
         logged and skipped.

    Prometheus metrics are recorded after step 9.

    Args:
        pool: asyncpg connection pool used for every query and write.
        ticker: Company ticker to aggregate.
        window: Window name, looked up in ``WINDOW_DURATIONS``.
        reference_time: End of the window; defaults to the current UTC time.
        config: Aggregation settings; defaults to ``AggregationConfig()``.
            For both toggles, a non-None value read from the database
            overrides the value carried by ``config``.

    Returns the assembled TrendSummary.
    """
    cfg = config or AggregationConfig()
    scoring_cfg = cfg.effective_scoring()

    if reference_time is None:
        reference_time = datetime.now(timezone.utc)

    _agg_start = time.monotonic()

    # 1. Determine the time range
    duration = WINDOW_DURATIONS.get(window, timedelta(days=7))
    window_start = reference_time - duration

    # 2. Fetch impact records and market context
    impacts = await fetch_impact_records(pool, ticker, window_start, reference_time)
    market_ctx = await fetch_market_context(pool, ticker, window, reference_time)

    # 3. Build weighted signals
    signals = build_weighted_signals(
        impacts, reference_time, window, market_ctx, scoring_cfg,
    )

    # 4. Check macro toggle and merge macro signals
    # (Requirement 11.2, 11.3, 11.4): Toggle state is read from the DB on
    # every aggregation cycle. When disabled, macro signals are skipped but
    # ingestion/classification continue independently — so when re-enabled,
    # the most recent classifications (including those ingested while disabled)
    # are immediately available for impact computation.
    macro_enabled = cfg.macro_enabled
    db_toggle = await fetch_macro_enabled(pool)
    if db_toggle is not None:
        macro_enabled = db_toggle

    if macro_enabled:
        macro_impacts = await fetch_macro_impact_records(
            pool, ticker, window_start, reference_time,
        )
        if macro_impacts:
            macro_signals = build_macro_weighted_signals(
                macro_impacts,
                reference_time,
                window,
                macro_signal_weight=cfg.macro_signal_weight,
                config=scoring_cfg,
            )
            signals = signals + macro_signals
            logger.info(
                "Merged %d macro signals for %s/%s",
                len(macro_signals), ticker, window,
            )

    # 5. Check competitive toggle and merge pattern/competitive signals
    # (Requirements 5.1-5.6): Same toggle pattern as macro layer. When
    # disabled, pattern mining remains queryable but aggregation skips
    # competitive signals — no degradation of existing behavior.
    competitive_enabled = cfg.competitive_enabled
    db_competitive_toggle = await fetch_competitive_enabled(pool)
    if db_competitive_toggle is not None:
        competitive_enabled = db_competitive_toggle

    if competitive_enabled:
        try:
            # Unique catalyst types from the impact records, in first-seen
            # order. dict.fromkeys keeps the order deterministic; a set
            # would iterate in a hash-dependent order that can change from
            # one process to the next, reordering the merged signals.
            catalyst_types = list(
                dict.fromkeys(imp.catalyst_type for imp in impacts)
            )

            # Query self-company historical patterns for each catalyst type
            all_patterns = []
            for cat_type in catalyst_types:
                patterns = await find_self_patterns(pool, ticker, cat_type)
                all_patterns.extend(patterns)

            # Fetch competitive signals targeting this ticker
            comp_signals = await fetch_competitive_signals(
                pool, ticker, window_start, reference_time,
            )

            # Convert to WeightedSignal objects
            if all_patterns or comp_signals:
                pattern_weighted = build_pattern_weighted_signals(
                    patterns=all_patterns,
                    competitive_signals=comp_signals,
                    reference_time=reference_time,
                    window=window,
                )
                signals = signals + pattern_weighted
                logger.info(
                    "Merged %d pattern/competitive signals for %s/%s "
                    "(patterns=%d, competitive=%d)",
                    len(pattern_weighted), ticker, window,
                    len(all_patterns), len(comp_signals),
                )
        except Exception:
            # Best-effort layer: a failure must not block the base summary.
            logger.exception(
                "Failed to fetch pattern/competitive signals for %s/%s — "
                "continuing with company+macro signals only",
                ticker, window,
            )

    # 6. Assemble trend summary with evidence details
    assembled = assemble_trend_with_evidence(
        ticker=ticker,
        window=window,
        signals=signals,
        impacts=impacts,
        market_ctx=market_ctx if market_ctx.has_data else None,
        max_evidence=cfg.max_evidence,
        reference_time=reference_time,
    )
    summary = assembled.summary

    # 7. Persist trend window
    trend_id = await persist_trend_summary(pool, summary)

    # 8. Persist evidence mappings
    evidence_count = await persist_trend_evidence(
        pool, trend_id,
        assembled.supporting_evidence,
        assembled.opposing_evidence,
    )

    logger.info(
        "Persisted trend %s for %s/%s: direction=%s strength=%.3f confidence=%.3f signals=%d evidence=%d",
        trend_id, ticker, window, summary.trend_direction.value,
        summary.trend_strength, summary.confidence, len(signals), evidence_count,
    )

    # 9. Compute and persist trend projection
    # Best-effort: the trend itself is already persisted at this point, so a
    # projection failure is logged and the summary is still returned.
    try:
        macro_event_infos: list[MacroEventInfo] = []
        if macro_enabled:
            macro_event_infos = await _build_macro_event_infos(
                pool, ticker, window_start, reference_time,
            )

        projection = compute_projection(
            summary=summary,
            macro_events=macro_event_infos if macro_event_infos else None,
            macro_enabled=macro_enabled,
            upcoming_catalysts=summary.dominant_catalysts[:3] if summary.dominant_catalysts else None,
        )
        await persist_trend_projection(pool, trend_id, projection)
        logger.info(
            "Persisted projection for %s/%s: direction=%s strength=%.3f confidence=%.3f diverges=%s",
            ticker, window, projection.projected_direction,
            projection.projected_strength, projection.projected_confidence,
            projection.diverges_from_current,
        )
    except Exception:
        logger.exception(
            "Failed to compute/persist projection for trend %s (%s/%s) — continuing",
            trend_id, ticker, window,
        )

    # Prometheus metrics
    AGGREGATION_WINDOWS_COMPUTED.labels(window=window).inc()
    AGGREGATION_SIGNALS_PROCESSED.labels(window=window).inc(len(signals))
    AGGREGATION_CONTRADICTION_SCORE.observe(summary.contradiction_score)
    AGGREGATION_DURATION.labels(window=window).observe(time.monotonic() - _agg_start)

    return summary
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Aggregate all windows for a single ticker
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def aggregate_company(
    pool: asyncpg.Pool,
    ticker: str,
    reference_time: datetime | None = None,
    config: AggregationConfig | None = None,
) -> list[TrendSummary]:
    """Build one TrendSummary per configured window for ``ticker``.

    The windows come from ``effective_windows()`` on the resolved config
    (``config`` or a default ``AggregationConfig()``). They are processed
    one after another, each through ``aggregate_company_window``. The
    reference time is resolved once (current UTC time when omitted), so
    every window is computed against the same instant.

    Returns the summaries in the order the windows were yielded.
    """
    settings = config if config else AggregationConfig()
    as_of = (
        reference_time
        if reference_time is not None
        else datetime.now(timezone.utc)
    )

    # Each element is awaited before the next window starts, so the windows
    # still run sequentially.
    return [
        await aggregate_company_window(pool, ticker, window_name, as_of, settings)
        for window_name in settings.effective_windows()
    ]
|