7fcc8a6c07
ci/woodpecker/push/test Pipeline failed
ci/woodpecker/push/build-1 unknown status
ci/woodpecker/push/build-3 unknown status
ci/woodpecker/push/build-2 unknown status
ci/woodpecker/push/finalize unknown status
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
- Migration 035: prediction_snapshots, prediction_outcomes, signal_evidence_links, model_metric_snapshots tables + SQL views - Prediction snapshot writer with canonical evidence keys, duplicate detection, contribution scores - Outcome evaluator across 5 horizons (1h, 6h, 1d, 7d, 30d) - Metrics engine: ECE, Brier score, IC, Rank IC, benchmark comparison - Attribution engine: per-source, per-catalyst, per-layer performance - Calibration engine: Bayesian shrinkage source reliability - Quality gate for live trading eligibility with configurable thresholds - 7 new /api/validation/* endpoints - Upgraded OpsModel dashboard with validation tab - Enhanced recommendation display with calibration context - Backtest replay validation mode - 86 Python tests (unit + property-based), 179 frontend tests passing
1692 lines
60 KiB
Python
1692 lines
60 KiB
Python
"""Aggregation worker - company-level rolling window trend summaries.
|
||
|
||
Queries document intelligence and market context for a given ticker,
|
||
computes weighted signal scores, and produces TrendSummary objects
|
||
persisted to the trend_windows table.
|
||
|
||
Requirements: 6.1, 6.2, 6.5
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import math
|
||
import time
|
||
import uuid as _uuid
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Any
|
||
|
||
import asyncpg
|
||
|
||
from services.aggregation.bayesian import (
|
||
BayesianPosterior,
|
||
compute_bayesian_posterior,
|
||
)
|
||
from services.aggregation.contradiction import CatalystEntry, detect_contradictions
|
||
from services.aggregation.evidence import (
|
||
EvidenceRankConfig,
|
||
RankedEvidence,
|
||
rank_evidence_detailed,
|
||
)
|
||
from services.aggregation.evidence import (
|
||
rank_evidence as _rank_evidence_composite,
|
||
)
|
||
from services.aggregation.interpolation import integrate_macro_signals
|
||
from services.aggregation.market_context import fetch_market_context
|
||
from services.aggregation.pattern_matcher import find_self_patterns
|
||
from services.aggregation.projection import (
|
||
MacroEventInfo,
|
||
compute_projection,
|
||
persist_trend_projection,
|
||
)
|
||
from services.aggregation.regime import (
|
||
MarketRegime,
|
||
RegimeClassification,
|
||
classify_regime,
|
||
)
|
||
from services.aggregation.scoring import (
|
||
ScoringConfig,
|
||
WeightedSignal,
|
||
compute_signal_weight,
|
||
sentiment_to_numeric,
|
||
weighted_sentiment_average,
|
||
)
|
||
from services.aggregation.signal_propagation import (
|
||
CompetitiveSignalRecord,
|
||
build_pattern_weighted_signals,
|
||
)
|
||
from services.aggregation.source_accuracy import fetch_source_accuracy
|
||
from services.shared.metrics import (
|
||
AGGREGATION_CONTRADICTION_SCORE,
|
||
AGGREGATION_DURATION,
|
||
AGGREGATION_SIGNALS_PROCESSED,
|
||
AGGREGATION_WINDOWS_COMPUTED,
|
||
)
|
||
from services.shared.schemas import TrendDirection, TrendSummary, TrendWindow
|
||
from services.trading.model_quality_gate import QualityGateResult, evaluate_quality_gate
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Lookback duration for each trend window, keyed by the TrendWindow member's
# ``.value``.  Note that the intraday window looks back 12 hours.
WINDOW_DURATIONS: dict[str, timedelta] = {
    TrendWindow.INTRADAY.value: timedelta(hours=12),
    TrendWindow.ONE_DAY.value: timedelta(days=1),
    TrendWindow.SEVEN_DAY.value: timedelta(days=7),
    TrendWindow.THIRTY_DAY.value: timedelta(days=30),
    TrendWindow.NINETY_DAY.value: timedelta(days=90),
}
|
||
|
||
# Upper bound on the number of evidence document IDs kept in each of the
# supporting / opposing lists.
MAX_EVIDENCE_REFS = 10


@dataclass
class AggregationConfig:
    """Settings for one aggregation run: which windows to compute and how to score.

    ``windows`` and ``scoring`` are optional; use :meth:`effective_windows`
    and :meth:`effective_scoring` to obtain the resolved values.
    """

    windows: list[str] | None = None  # None = all windows
    scoring: ScoringConfig | None = None
    max_evidence: int = MAX_EVIDENCE_REFS
    macro_signal_weight: float = 0.3  # relative weight of macro vs company signals
    macro_enabled: bool = True  # runtime toggle state
    competitive_signal_weight: float = 0.2  # relative weight of pattern signals
    competitive_enabled: bool = True  # runtime toggle state
    probabilistic_scoring_enabled: bool = False  # probabilistic pipeline toggle

    def effective_windows(self) -> list[str]:
        """Return the configured windows, or every TrendWindow value when unset/empty."""
        if not self.windows:
            return [member.value for member in TrendWindow]
        return self.windows

    def effective_scoring(self) -> ScoringConfig:
        """Return the configured scoring settings, or a default ScoringConfig."""
        if self.scoring:
            return self.scoring
        return ScoringConfig()
|
||
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fetch impact records for a ticker within a time window
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_IMPACT_QUERY = """
|
||
SELECT
|
||
di.document_id,
|
||
di.confidence,
|
||
di.novelty_score,
|
||
di.source_credibility,
|
||
dir.sentiment,
|
||
dir.impact_score,
|
||
dir.catalyst_type,
|
||
dir.key_facts,
|
||
dir.risks,
|
||
d.published_at
|
||
FROM document_impact_records dir
|
||
JOIN document_intelligence di ON di.id = dir.intelligence_id
|
||
JOIN documents d ON d.id = di.document_id
|
||
WHERE dir.ticker = $1
|
||
AND d.published_at >= $2
|
||
AND d.published_at <= $3
|
||
AND di.validation_status = 'valid'
|
||
AND d.status != 'rejected'
|
||
ORDER BY d.published_at DESC
|
||
"""
|
||
|
||
|
||
@dataclass
class ImpactRow:
    """Parsed row from the impact query."""

    document_id: str
    confidence: float
    novelty_score: float
    source_credibility: float
    sentiment: str
    impact_score: float
    catalyst_type: str
    key_facts: list[str]
    risks: list[str]
    published_at: datetime


def _parse_impact_row(row: Any) -> ImpactRow:
    """Convert an asyncpg Record (or any mapping-like row) to an ImpactRow.

    NULL numeric columns fall back to a default (0.5 for confidence,
    novelty and credibility; 0.0 for impact).  A stored value of ``0`` is a
    real measurement and is kept as-is — only ``None`` triggers the default.
    ``key_facts`` / ``risks`` may arrive as JSON text or as already-decoded
    values; anything that does not decode to a list becomes ``[]``.

    Raises:
        json.JSONDecodeError: if ``key_facts`` or ``risks`` is a string that
            is not valid JSON.
    """
    key_facts = row["key_facts"]
    if isinstance(key_facts, str):
        key_facts = json.loads(key_facts)
    risks = row["risks"]
    if isinstance(risks, str):
        risks = json.loads(risks)

    # Distinguish "missing" (None) from a legitimate zero: `value or default`
    # would silently turn a 0.0 score into the default.
    confidence = row["confidence"]
    novelty_score = row["novelty_score"]
    source_credibility = row["source_credibility"]
    impact_score = row["impact_score"]

    return ImpactRow(
        document_id=str(row["document_id"]),
        confidence=0.5 if confidence is None else float(confidence),
        novelty_score=0.5 if novelty_score is None else float(novelty_score),
        source_credibility=(
            0.5 if source_credibility is None else float(source_credibility)
        ),
        sentiment=row["sentiment"] or "neutral",
        impact_score=0.0 if impact_score is None else float(impact_score),
        catalyst_type=row["catalyst_type"] or "other",
        key_facts=key_facts if isinstance(key_facts, list) else [],
        risks=risks if isinstance(risks, list) else [],
        published_at=row["published_at"],
    )
|
||
|
||
|
||
async def fetch_impact_records(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    window_end: datetime,
) -> list[ImpactRow]:
    """Load the impact records for ``ticker`` published within the given range.

    Runs ``_IMPACT_QUERY`` with the ticker and both window bounds, then
    converts every returned record with ``_parse_impact_row``.
    """
    records = await pool.fetch(_IMPACT_QUERY, ticker, window_start, window_end)
    return list(map(_parse_impact_row, records))
|
||
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fetch macro toggle state from risk_configs
|
||
#
|
||
# MACRO LAYER TOGGLE BEHAVIOR (Requirements 11.2, 11.3, 11.4):
|
||
# - The toggle state is read fresh from PostgreSQL at the start of each
|
||
# aggregation cycle (no caching), so changes take effect immediately on
|
||
# the next cycle.
|
||
# - When disabled: ingestion and classification continue normally (historical
|
||
# data is preserved), but interpolation and aggregation integration are
|
||
# skipped — the aggregation engine produces trends using only company-
|
||
# specific signals.
|
||
# - When re-enabled: the engine resumes computing macro impact scores using
|
||
# the most recent GlobalEvent classifications, including any events that
|
||
# were ingested and classified while the layer was disabled.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_MACRO_TOGGLE_QUERY = """
SELECT config->>'macro_enabled' AS macro_enabled
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1
"""


async def fetch_macro_enabled(pool: asyncpg.Pool) -> bool | None:
    """Read the macro-layer toggle from the most recently updated active risk config.

    Returns ``None`` when there is no active config row or the key is absent
    (the caller should then fall back to the AggregationConfig default).
    Otherwise returns ``True`` only when the stored text equals "true"
    case-insensitively; any other text yields ``False``.
    """
    latest = await pool.fetchrow(_MACRO_TOGGLE_QUERY)
    flag = None if latest is None else latest["macro_enabled"]
    if flag is None:
        return None
    return flag.lower() == "true"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fetch competitive toggle state from risk_configs
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_COMPETITIVE_TOGGLE_QUERY = """
SELECT config->>'competitive_enabled' AS competitive_enabled
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1
"""


async def fetch_competitive_enabled(pool: asyncpg.Pool) -> bool | None:
    """Read the competitive-signal toggle from the most recently updated active risk config.

    Returns ``None`` when there is no active config row or the key is absent
    (the caller should then fall back to the AggregationConfig default).
    Otherwise returns ``True`` only when the stored text equals "true"
    case-insensitively; any other text yields ``False``.
    """
    latest = await pool.fetchrow(_COMPETITIVE_TOGGLE_QUERY)
    flag = None if latest is None else latest["competitive_enabled"]
    if flag is None:
        return None
    return flag.lower() == "true"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fetch probabilistic scoring toggle from risk_configs
|
||
#
|
||
# PROBABILISTIC PIPELINE TOGGLE (Requirements 16.3, 16.4, 16.5, 16.6, 16.7):
|
||
# - Read once per aggregation cycle from the risk_configs table.
|
||
# - When False (default): the heuristic pipeline is used — identical outputs
|
||
# to the current system.
|
||
# - When True: the new Bayesian, regime-aware, and adaptive formulas are
|
||
# used for all pipeline stages.
|
||
# - Defaults to False when the key is missing, the value is invalid, or the
|
||
# database is unreachable (fail-safe to heuristic mode).
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_PROBABILISTIC_TOGGLE_QUERY = """
SELECT config->>'probabilistic_scoring_enabled' AS probabilistic_scoring_enabled
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1
"""


async def fetch_probabilistic_scoring_enabled(pool: asyncpg.Pool) -> bool:
    """Read the probabilistic-scoring toggle from the risk_configs table.

    Only an explicit "true" (any letter case) enables the probabilistic
    pipeline.  A missing row, a missing key, an unrecognised value, or any
    error while querying all resolve to ``False`` so that the heuristic
    pipeline is used; the latter two cases are logged as warnings.

    Requirements: 16.3, 16.6
    """
    # The whole read-and-validate sequence is guarded on purpose: this
    # function must never raise, it fails safe to heuristic mode.
    try:
        record = await pool.fetchrow(_PROBABILISTIC_TOGGLE_QUERY)
        value = None if record is None else record["probabilistic_scoring_enabled"]
        if value is None:
            return False
        if isinstance(value, str) and value.lower() in ("true", "false"):
            return value.lower() == "true"
        logger.warning(
            "Invalid probabilistic_scoring_enabled value %r in "
            "risk_configs; defaulting to heuristic pipeline",
            value,
        )
        return False
    except Exception:
        logger.warning(
            "Failed to read probabilistic_scoring_enabled from risk_configs; "
            "defaulting to heuristic pipeline",
            exc_info=True,
        )
        return False
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fetch competitive signals targeting a ticker within a time window
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_COMPETITIVE_SIGNALS_QUERY = """
SELECT source_document_id, source_ticker, target_ticker, catalyst_type,
pattern_confidence, signal_direction, signal_strength,
relationship_strength, computed_at
FROM competitive_signal_records
WHERE target_ticker = $1
AND computed_at >= $2
AND computed_at <= $3
ORDER BY computed_at DESC
LIMIT 500
"""


async def fetch_competitive_signals(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    window_end: datetime,
) -> list[CompetitiveSignalRecord]:
    """Load competitive signal records whose target is ``ticker`` within a time range.

    The query bounds ``computed_at`` inclusively by ``window_start`` and
    ``window_end`` and returns at most 500 rows, newest first.  Each row is
    converted to a CompetitiveSignalRecord; the document id is stringified
    and the three numeric columns are coerced with ``float``.
    """
    fetched = await pool.fetch(
        _COMPETITIVE_SIGNALS_QUERY, ticker, window_start, window_end,
    )
    records: list[CompetitiveSignalRecord] = []
    for entry in fetched:
        records.append(
            CompetitiveSignalRecord(
                source_document_id=str(entry["source_document_id"]),
                source_ticker=entry["source_ticker"],
                target_ticker=entry["target_ticker"],
                catalyst_type=entry["catalyst_type"],
                pattern_confidence=float(entry["pattern_confidence"]),
                signal_direction=entry["signal_direction"],
                signal_strength=float(entry["signal_strength"]),
                relationship_strength=float(entry["relationship_strength"]),
                computed_at=entry["computed_at"],
            )
        )
    return records
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Fetch macro impact records for a ticker within a time window
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_MACRO_IMPACT_QUERY = """
|
||
SELECT
|
||
mir.event_id,
|
||
mir.company_id,
|
||
mir.ticker,
|
||
mir.macro_impact_score,
|
||
mir.impact_direction,
|
||
mir.contributing_factors,
|
||
mir.confidence,
|
||
mir.computed_at,
|
||
ge.source_document_id,
|
||
d.published_at AS event_published_at
|
||
FROM macro_impact_records mir
|
||
JOIN global_events ge ON ge.id = mir.event_id
|
||
JOIN documents d ON d.id = ge.source_document_id
|
||
WHERE mir.ticker = $1
|
||
AND mir.computed_at >= $2
|
||
AND mir.computed_at <= $3
|
||
ORDER BY mir.computed_at DESC
|
||
"""
|
||
|
||
|
||
@dataclass
class MacroImpactRow:
    """Parsed row from the macro impact query."""

    event_id: str
    company_id: str
    ticker: str
    macro_impact_score: float
    impact_direction: str
    contributing_factors: list[str]
    confidence: float
    computed_at: datetime
    source_document_id: str
    event_published_at: datetime


def _parse_macro_impact_row(row: Any) -> MacroImpactRow:
    """Convert an asyncpg Record (or any mapping-like row) to a MacroImpactRow.

    A NULL ``macro_impact_score`` becomes 0.0 and a NULL ``confidence``
    becomes 0.5.  A stored confidence of ``0`` is a real value and is kept —
    only ``None`` triggers the default.  ``contributing_factors`` may arrive
    as JSON text or an already-decoded value; anything that does not decode
    to a list becomes ``[]``.

    Raises:
        json.JSONDecodeError: if ``contributing_factors`` is a string that is
            not valid JSON.
    """
    factors = row["contributing_factors"]
    if isinstance(factors, str):
        factors = json.loads(factors)

    # Distinguish "missing" (None) from a legitimate zero: `value or default`
    # would silently turn a 0.0 confidence into 0.5.
    impact_score = row["macro_impact_score"]
    confidence = row["confidence"]

    return MacroImpactRow(
        event_id=str(row["event_id"]),
        company_id=str(row["company_id"]),
        ticker=row["ticker"],
        macro_impact_score=0.0 if impact_score is None else float(impact_score),
        impact_direction=row["impact_direction"] or "neutral",
        contributing_factors=factors if isinstance(factors, list) else [],
        confidence=0.5 if confidence is None else float(confidence),
        computed_at=row["computed_at"],
        source_document_id=str(row["source_document_id"]),
        event_published_at=row["event_published_at"],
    )
|
||
|
||
|
||
async def fetch_macro_impact_records(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    window_end: datetime,
) -> list[MacroImpactRow]:
    """Load the macro impact records for ``ticker`` within the given range.

    Runs ``_MACRO_IMPACT_QUERY`` with the ticker and both window bounds, then
    converts every returned record with ``_parse_macro_impact_row``.
    """
    records = await pool.fetch(_MACRO_IMPACT_QUERY, ticker, window_start, window_end)
    return list(map(_parse_macro_impact_row, records))
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Convert macro impact records to WeightedSignals
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Numeric sentiment assigned to each macro impact direction; directions not
# listed here are treated as 0.0 by the lookup below.
_DIRECTION_TO_SENTIMENT: dict[str, float] = {
    "positive": 1.0,
    "negative": -1.0,
    "mixed": 0.0,
    "neutral": 0.0,
}


def build_macro_weighted_signals(
    macro_impacts: list[MacroImpactRow],
    reference_time: datetime,
    window: str,
    macro_signal_weight: float = 0.3,
    config: ScoringConfig | None = None,
    *,
    returns: list[float] | None = None,
    volumes: list[float] | None = None,
) -> list[WeightedSignal]:
    """Turn macro impact records into WeightedSignal objects.

    For every record:
    - the signal weight comes from ``compute_signal_weight`` using the global
      event's publication time, with the record's confidence supplied as both
      source credibility and extraction confidence, and a fixed novelty of 0.5
    - ``document_id`` is the record's source document (for evidence tracing)
    - ``sentiment_value`` is mapped from ``impact_direction``
    - ``impact_score`` is ``macro_impact_score * macro_signal_weight``

    ``returns`` and ``volumes`` are forwarded unchanged to
    ``compute_signal_weight``.
    """
    scoring = config or ScoringConfig()

    def _to_signal(record: MacroImpactRow) -> WeightedSignal:
        signal_weight = compute_signal_weight(
            published_at=record.event_published_at,
            reference_time=reference_time,
            window=window,
            source_credibility=record.confidence,
            novelty_score=0.5,
            extraction_confidence=record.confidence,
            config=scoring,
            returns=returns,
            volumes=volumes,
        )
        direction_value = _DIRECTION_TO_SENTIMENT.get(record.impact_direction, 0.0)
        scaled_impact = record.macro_impact_score * macro_signal_weight
        return WeightedSignal(
            document_id=record.source_document_id,
            weight=signal_weight,
            sentiment_value=direction_value,
            impact_score=scaled_impact,
        )

    return [_to_signal(record) for record in macro_impacts]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Build weighted signals from impact records
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def build_weighted_signals(
    impacts: list[ImpactRow],
    reference_time: datetime,
    window: str,
    market_ctx: Any | None = None,
    config: ScoringConfig | None = None,
    *,
    source_accuracy_map: dict[str, float] | None = None,
    returns: list[float] | None = None,
    volumes: list[float] | None = None,
) -> list[WeightedSignal]:
    """Turn impact records into WeightedSignal objects via the scoring module.

    Each record is weighted with ``compute_signal_weight``.  The record's
    catalyst type is passed as ``event_type`` only when
    ``config.probabilistic`` is true; the source accuracy factor is looked up
    in ``source_accuracy_map`` by document id (default 1.0); ``returns`` and
    ``volumes`` are forwarded unchanged.
    """
    scoring = config or ScoringConfig()
    accuracy_by_doc = source_accuracy_map or {}

    def _to_signal(record: ImpactRow) -> WeightedSignal:
        # Accuracy factor for this record; unknown documents are neutral (1.0).
        accuracy_factor = accuracy_by_doc.get(record.document_id, 1.0)
        signal_weight = compute_signal_weight(
            published_at=record.published_at,
            reference_time=reference_time,
            window=window,
            source_credibility=record.source_credibility,
            novelty_score=record.novelty_score,
            extraction_confidence=record.confidence,
            market_ctx=market_ctx,
            config=scoring,
            event_type=record.catalyst_type if scoring.probabilistic else None,
            impact_score=record.impact_score,
            source_accuracy_factor=accuracy_factor,
            returns=returns,
            volumes=volumes,
        )
        return WeightedSignal(
            document_id=record.document_id,
            weight=signal_weight,
            sentiment_value=sentiment_to_numeric(record.sentiment),
            impact_score=record.impact_score,
            info_gain_factor=signal_weight.info_gain_factor,
            source_accuracy_factor=signal_weight.source_accuracy_factor,
        )

    return [_to_signal(record) for record in impacts]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Derive trend direction from weighted sentiment
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Thresholds for mapping numeric sentiment to direction.
BULLISH_THRESHOLD = 0.15
BEARISH_THRESHOLD = -0.15
MIXED_THRESHOLD = 0.10  # contradiction score above this → mixed


def derive_trend_direction(
    avg_sentiment: float,
    contradiction_score: float = 0.0,
) -> TrendDirection:
    """Classify a weighted average sentiment as a TrendDirection.

    MIXED is returned when the contradiction score exceeds
    ``MIXED_THRESHOLD`` *and* the average sentiment is weak
    (absolute value below 0.3).  Otherwise the sentiment is compared against
    the bullish / bearish thresholds (inclusive), falling back to NEUTRAL.
    """
    contested = contradiction_score > MIXED_THRESHOLD
    if contested and abs(avg_sentiment) < 0.3:
        return TrendDirection.MIXED

    if avg_sentiment >= BULLISH_THRESHOLD:
        verdict = TrendDirection.BULLISH
    elif avg_sentiment <= BEARISH_THRESHOLD:
        verdict = TrendDirection.BEARISH
    else:
        verdict = TrendDirection.NEUTRAL
    return verdict
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Compute contradiction score
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def compute_contradiction_score(signals: list[WeightedSignal]) -> float:
    """Measure how much disagreement exists among weighted signals.

    Each signal contributes ``weight.combined * impact_score`` to the
    positive or negative side according to the sign of its sentiment;
    zero-sentiment signals are ignored.  The score is the smaller side's
    total divided by the sum of both sides, rounded to 4 decimals: 0.0 means
    one side is empty, and an even split gives 0.5.

    NOTE(review): an earlier description of this function promised a
    [0, 1] range (minority / majority); the implementation divides by the
    combined total instead — confirm which is intended.
    """
    if not signals:
        return 0.0

    side_totals = {"pos": 0.0, "neg": 0.0}
    for signal in signals:
        contribution = signal.weight.combined * signal.impact_score
        sentiment = signal.sentiment_value
        if sentiment > 0:
            side_totals["pos"] += contribution
        elif sentiment < 0:
            side_totals["neg"] += contribution

    combined_total = side_totals["pos"] + side_totals["neg"]
    if combined_total == 0.0:
        return 0.0

    smaller_side = min(side_totals["pos"], side_totals["neg"])
    return round(smaller_side / combined_total, 4)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Rank evidence (supporting vs opposing)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def rank_evidence(
    signals: list[WeightedSignal],
    max_refs: int = MAX_EVIDENCE_REFS,
) -> tuple[list[str], list[str]]:
    """Return the top supporting and opposing document IDs.

    This is a thin wrapper: it builds an ``EvidenceRankConfig`` limited to
    ``max_refs`` entries and hands the signals to the evidence module's
    composite ranking, returning that result unchanged.
    """
    return _rank_evidence_composite(signals, EvidenceRankConfig(max_refs=max_refs))
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Extract dominant catalysts and material risks
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def extract_catalysts_and_risks(
    impacts: list[ImpactRow],
    signals: list[WeightedSignal],
    *,
    max_catalysts: int = 5,
    max_risks: int = 5,
) -> tuple[list[str], list[str]]:
    """Return dominant catalyst types and material risks weighted by signal strength.

    A document's strength is ``weight.combined * impact_score`` of its
    signal; impacts with no matching signal or a non-positive strength are
    ignored.

    Args:
        impacts: Impact records supplying catalyst types and risk texts.
        signals: Weighted signals, matched to impacts by ``document_id``.
        max_catalysts: Maximum number of catalyst types to return.
        max_risks: Maximum number of risk texts to return.

    Returns:
        ``(catalysts, risks)``.  Catalysts are ordered by cumulative strength
        (ties keep first-seen order).  Risks are stripped, de-duplicated
        case-insensitively and ordered by the strength of the document that
        surfaced them; blank or non-string risk entries are skipped.
    """
    # Lookup from document_id to combined strength (a later signal for the
    # same document replaces an earlier one).
    strength_by_doc = {
        s.document_id: s.weight.combined * s.impact_score for s in signals
    }

    catalyst_strength: dict[str, float] = {}
    weighted_risks: list[tuple[float, str]] = []
    for imp in impacts:
        strength = strength_by_doc.get(imp.document_id, 0.0)
        if strength <= 0.0:
            continue
        catalyst_strength[imp.catalyst_type] = (
            catalyst_strength.get(imp.catalyst_type, 0.0) + strength
        )
        for risk in imp.risks:
            # Risk lists come from decoded JSON, so guard against non-text
            # entries and whitespace-only strings instead of emitting "".
            text = risk.strip() if isinstance(risk, str) else ""
            if text:
                weighted_risks.append((strength, text))

    # Stable sort: equal strengths keep insertion order.
    ranked_catalysts = sorted(
        catalyst_strength, key=catalyst_strength.__getitem__, reverse=True,
    )
    catalysts = ranked_catalysts[: max(0, max_catalysts)]

    weighted_risks.sort(key=lambda entry: entry[0], reverse=True)
    seen_risks: set[str] = set()
    risks: list[str] = []
    for _, text in weighted_risks:
        if len(risks) >= max_risks:
            break
        normalized = text.lower()
        if normalized in seen_risks:
            continue
        seen_risks.add(normalized)
        risks.append(text)

    return catalysts, risks
|
||
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Compute trend confidence
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def compute_trend_confidence(
    signals: list[WeightedSignal],
    contradiction_score: float,
) -> float:
    """Derive an overall confidence in [0, 1] for the trend summary.

    Only signals with a positive combined weight take part.  The result
    blends three components and then subtracts a contradiction penalty:

    - 30%: unique document count / 15, capped at 0.8 (reached at 12 docs)
    - 30%: mean ``weight.credibility`` of the participating signals
    - 40%: directional agreement (majority share of non-neutral signals, or
      0.5 when all are neutral), scaled by ``log2(n + 1) / log2(8)`` capped
      at 1.0 so that a handful of agreeing sources counts for less
    - penalty: ``contradiction_score * 0.4``

    The value is clamped to [0, 1] and rounded to 4 decimals; with no
    participating signals it is 0.0.
    """
    if not signals:
        return 0.0

    contributing = [sig for sig in signals if sig.weight.combined > 0]
    if not contributing:
        return 0.0

    # Distinct, non-empty document ids — several signals derived from the
    # same document must not inflate confidence.
    distinct_docs = {sig.document_id for sig in contributing if sig.document_id}
    unique_sources = len(distinct_docs)
    count_factor = min(unique_sources / 15.0, 0.8)

    mean_credibility = sum(sig.weight.credibility for sig in contributing) / len(contributing)

    # Tally the direction of every participating signal.
    up_votes = 0
    down_votes = 0
    for sig in contributing:
        if sig.sentiment_value > 0:
            up_votes += 1
        elif sig.sentiment_value < 0:
            down_votes += 1
    directional = up_votes + down_votes
    agreement = max(up_votes, down_votes) / directional if directional > 0 else 0.5

    # Small samples are discounted; the factor saturates at 7 unique sources.
    sample_discount = min(1.0, math.log2(unique_sources + 1) / math.log2(8))
    agreement *= sample_discount

    contradiction_penalty = contradiction_score * 0.4

    blended = (0.3 * count_factor + 0.3 * mean_credibility + 0.4 * agreement) - contradiction_penalty
    return round(max(0.0, min(1.0, blended)), 4)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Assemble a TrendSummary from components
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
@dataclass
class AssembledTrend:
    """A trend summary paired with its detailed evidence rankings."""

    # The assembled summary itself.
    summary: TrendSummary
    # Ranked evidence entries for the supporting side.
    supporting_evidence: list[RankedEvidence]
    # Ranked evidence entries for the opposing side.
    opposing_evidence: list[RankedEvidence]
|
||
|
||
|
||
def assemble_trend_summary(
    ticker: str,
    window: str,
    signals: list[WeightedSignal],
    impacts: list[ImpactRow],
    market_ctx: Any | None = None,
    max_evidence: int = MAX_EVIDENCE_REFS,
    reference_time: datetime | None = None,
    *,
    probabilistic: bool = False,
    regime: RegimeClassification | None = None,
) -> TrendSummary:
    """Build a TrendSummary from weighted signals and impact records.

    Convenience wrapper around ``assemble_trend_with_evidence``: every
    argument is forwarded unchanged and only the ``summary`` part of the
    assembled result is returned (the detailed evidence rankings are dropped).
    """
    assembled = assemble_trend_with_evidence(
        ticker=ticker,
        window=window,
        signals=signals,
        impacts=impacts,
        market_ctx=market_ctx,
        max_evidence=max_evidence,
        reference_time=reference_time,
        probabilistic=probabilistic,
        regime=regime,
    )
    return assembled.summary
|
||
|
||
|
||
def assemble_trend_with_evidence(
    ticker: str,
    window: str,
    signals: list[WeightedSignal],
    impacts: list[ImpactRow],
    market_ctx: Any | None = None,
    max_evidence: int = MAX_EVIDENCE_REFS,
    reference_time: datetime | None = None,
    *,
    probabilistic: bool = False,
    regime: RegimeClassification | None = None,
) -> AssembledTrend:
    """Build a TrendSummary together with the ranked evidence rows behind it.

    Both modes share the weighted sentiment average, contradiction detection,
    evidence ranking, catalyst/risk extraction and the trend strength
    (absolute average sentiment, capped at 1.0, rounded to 4 places).

    Heuristic mode (``probabilistic=False``):
    - Direction comes from ``derive_trend_direction`` and confidence from
      ``compute_trend_confidence``.
    - ``market_ctx`` is stored on the summary unchanged.

    Probabilistic mode (``probabilistic=True``):
    - Falls back to an uncertainty regime when ``regime`` is None.
    - Computes the Bayesian posterior from the merged signals.
    - Confidence is ``0.5 * bayesian_confidence + 0.25 * count factor
      + 0.25 * average credibility`` minus the contradiction score scaled by
      the regime's penalty multiplier, clamped to [0, 1].
    - Direction is MIXED when posterior entropy exceeds 0.9; otherwise it is
      picked with fixed ``p_bull`` cut-offs of 0.65 / 0.35.
    - Posterior outputs are set on the summary and also stored under the
      ``"probabilistic"`` key of ``market_context``.

    ``reference_time`` defaults to the current UTC time and becomes the
    summary's ``generated_at``.

    Requirements: 1.1, 1.2, 8.1–8.5, 9.1–9.6, 7.8, 16.4, 16.5
    """
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)

    mean_sentiment = weighted_sentiment_average(signals)

    # Full contradiction detection over signals and catalysts (Requirement 6.4)
    contradiction_result = detect_contradictions(
        signals,
        [
            CatalystEntry(document_id=imp.document_id, catalyst_type=imp.catalyst_type)
            for imp in impacts
        ],
        probabilistic=probabilistic,
    )
    contradiction = contradiction_result.score

    # Keyword arguments that only the probabilistic path adds to TrendSummary.
    probabilistic_fields: dict[str, Any] = {}

    if not probabilistic:
        # --- Heuristic mode ---
        direction = derive_trend_direction(mean_sentiment, contradiction)
        confidence = compute_trend_confidence(signals, contradiction)
        summary_ctx = market_ctx
    else:
        # --- Probabilistic mode (Req 8.1–8.5, 9.1–9.6) ---

        # No regime supplied: assume the uncertainty regime (Req 7.9)
        if regime is None:
            regime = RegimeClassification(
                regime=MarketRegime.UNCERTAINTY,
                trend_indicator=0.0,
                volatility_ratio=1.0,
                bullish_threshold=0.15,
                bearish_threshold=-0.15,
                contradiction_penalty_multiplier=0.6,
            )

        # Bayesian posterior over the merged signals (Req 1.1, 1.2)
        posterior: BayesianPosterior = compute_bayesian_posterior(signals)

        # Confidence (Req 8.1–8.4):
        # 0.5 × C_bayesian + 0.25 × F_count + 0.25 × C_avg_credibility - P_contradiction
        weighted = [s for s in signals if s.weight.combined > 0]
        source_count = len({s.document_id for s in weighted if s.document_id})
        f_count = min(source_count / 15.0, 0.8)

        avg_credibility = 0.0
        if weighted:
            avg_credibility = sum(s.weight.credibility for s in weighted) / len(weighted)

        # The penalty scales with the regime's multiplier (Req 7.7)
        contradiction_penalty = contradiction * regime.contradiction_penalty_multiplier

        raw_confidence = (
            0.5 * posterior.bayesian_confidence
            + 0.25 * f_count
            + 0.25 * avg_credibility
            - contradiction_penalty
        )
        confidence = round(max(0.0, min(1.0, raw_confidence)), 4)

        # Entropy-based direction with fixed P_bull cut-offs (Req 9.1–9.5)
        if posterior.entropy > 0.9:
            direction = TrendDirection.MIXED
        elif posterior.p_bull > 0.65:
            direction = TrendDirection.BULLISH
        elif posterior.p_bull < 0.35:
            direction = TrendDirection.BEARISH
        else:
            direction = TrendDirection.NEUTRAL

        # Rounded once, reused for both the JSONB payload and the summary fields.
        p_bull = round(posterior.p_bull, 6)
        alpha = round(posterior.alpha, 4)
        beta = round(posterior.beta, 4)
        bayesian_confidence = round(posterior.bayesian_confidence, 6)
        entropy = round(posterior.entropy, 6)
        regime_name = regime.regime.value

        # Probabilistic outputs stored in the market_context JSONB
        probabilistic_data = {
            "p_bull": p_bull,
            "alpha": alpha,
            "beta": beta,
            "log_likelihood": round(posterior.log_likelihood, 6),
            "bayesian_confidence": bayesian_confidence,
            "entropy": entropy,
            "regime": regime_name,
            "regime_volatility_ratio": round(regime.volatility_ratio, 4),
            "pipeline_mode": "probabilistic",
            "contradiction_entropy": round(contradiction, 4),
        }

        # Start from a plain-dict copy of whatever context was supplied
        # (model object, dict, or nothing) and attach the probabilistic data.
        if hasattr(market_ctx, "model_dump"):
            summary_ctx = market_ctx.model_dump()
        elif isinstance(market_ctx, dict):
            summary_ctx = dict(market_ctx)
        else:
            summary_ctx = {}
        summary_ctx["probabilistic"] = probabilistic_data

        # Probabilistic summary fields (Req 9.6, 16.1)
        probabilistic_fields = {
            "p_bull": p_bull,
            "alpha": alpha,
            "beta_param": beta,
            "bayesian_confidence": bayesian_confidence,
            "entropy": entropy,
            "regime": regime_name,
            "pipeline_mode": "probabilistic",
        }

    # Detailed evidence rankings, kept for persistence
    supporting_ranked, opposing_ranked = rank_evidence_detailed(
        signals, EvidenceRankConfig(max_refs=max_evidence),
    )

    # dict.fromkeys de-duplicates document IDs while keeping rank order
    supporting_ids = list(dict.fromkeys(r.document_id for r in supporting_ranked))
    opposing_ids = list(dict.fromkeys(r.document_id for r in opposing_ranked))

    catalysts, risks = extract_catalysts_and_risks(impacts, signals)

    summary = TrendSummary(
        entity_type="company",
        entity_id=ticker,
        window=TrendWindow(window),
        trend_direction=direction,
        # Strength: absolute weighted sentiment, clamped to [0, 1]
        trend_strength=round(min(abs(mean_sentiment), 1.0), 4),
        confidence=confidence,
        top_supporting_evidence=supporting_ids,
        top_opposing_evidence=opposing_ids,
        dominant_catalysts=catalysts,
        material_risks=risks,
        contradiction_score=contradiction,
        disagreement_details=contradiction_result.details,
        market_context=summary_ctx,
        generated_at=reference_time,
        **probabilistic_fields,
    )

    return AssembledTrend(
        summary=summary,
        supporting_evidence=supporting_ranked,
        opposing_evidence=opposing_ranked,
    )
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Persist trend summary to PostgreSQL
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Upsert keyed on (entity_type, entity_id, "window"): inserts a trend row or
# overwrites every non-key column of the existing one, and returns the row id.
# Parameters $7-$10, $12 and $13 are passed as JSON text and cast to jsonb.
_UPSERT_TREND = """
INSERT INTO trend_windows (
    entity_type, entity_id, "window", trend_direction, trend_strength,
    confidence, top_supporting_evidence, top_opposing_evidence,
    dominant_catalysts, material_risks, contradiction_score,
    disagreement_details, market_context, generated_at
) VALUES (
    $1, $2, $3, $4, $5,
    $6, $7::jsonb, $8::jsonb,
    $9::jsonb, $10::jsonb, $11,
    $12::jsonb, $13::jsonb, $14
)
ON CONFLICT (entity_type, entity_id, "window") DO UPDATE SET
    trend_direction = EXCLUDED.trend_direction,
    trend_strength = EXCLUDED.trend_strength,
    confidence = EXCLUDED.confidence,
    top_supporting_evidence = EXCLUDED.top_supporting_evidence,
    top_opposing_evidence = EXCLUDED.top_opposing_evidence,
    dominant_catalysts = EXCLUDED.dominant_catalysts,
    material_risks = EXCLUDED.material_risks,
    contradiction_score = EXCLUDED.contradiction_score,
    disagreement_details = EXCLUDED.disagreement_details,
    market_context = EXCLUDED.market_context,
    generated_at = EXCLUDED.generated_at
RETURNING id
"""
|
||
|
||
|
||
# Append-only snapshot of a trend for time-series charting; $8 and $9 are
# passed as JSON text and cast to jsonb.
_INSERT_TREND_HISTORY = """
INSERT INTO trend_history (
    entity_type, entity_id, "window", trend_direction,
    trend_strength, confidence, contradiction_score,
    dominant_catalysts, material_risks, generated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9::jsonb, $10)
"""
|
||
|
||
|
||
async def persist_trend_summary(
    pool: asyncpg.Pool,
    summary: TrendSummary,
) -> str:
    """Upsert ``summary`` into ``trend_windows`` and return the row id as a string.

    Afterwards a snapshot is appended to ``trend_history`` for time-series
    charting. That second write is best-effort: any exception it raises is
    logged at debug level and does not affect the returned id.
    """
    window_value = summary.window.value
    direction_value = summary.trend_direction.value
    catalysts_json = json.dumps(summary.dominant_catalysts)
    risks_json = json.dumps(summary.material_risks)

    # market_context may be a model object, a plain dict, or empty/None.
    context = summary.market_context
    if hasattr(context, "model_dump"):
        context_payload = context.model_dump()
    elif context:
        context_payload = context
    else:
        context_payload = {}

    upserted = await pool.fetchrow(
        _UPSERT_TREND,
        summary.entity_type,
        summary.entity_id,
        window_value,
        direction_value,
        summary.trend_strength,
        summary.confidence,
        json.dumps(summary.top_supporting_evidence),
        json.dumps(summary.top_opposing_evidence),
        catalysts_json,
        risks_json,
        summary.contradiction_score,
        json.dumps([detail.model_dump() for detail in summary.disagreement_details]),
        # default=str stringifies any value json cannot encode natively
        json.dumps(context_payload, default=str),
        summary.generated_at,
    )
    trend_id = str(upserted["id"])

    # Snapshot for time-series charts
    try:
        await pool.execute(
            _INSERT_TREND_HISTORY,
            summary.entity_type,
            summary.entity_id,
            window_value,
            direction_value,
            summary.trend_strength,
            summary.confidence,
            summary.contradiction_score,
            catalysts_json,
            risks_json,
            summary.generated_at,
        )
    except Exception:
        # The upsert already succeeded; a failed history insert (the table may
        # not exist yet) must not fail the call.
        logger.debug(
            "Could not insert trend history for %s/%s",
            summary.entity_id,
            summary.window.value,
            exc_info=True,
        )

    return trend_id
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Persist evidence mappings to trend_evidence table
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# One evidence row per (trend window, document); $2 is cast to uuid, so only
# UUID document IDs can be inserted.
_INSERT_EVIDENCE = """
INSERT INTO trend_evidence (
    trend_window_id, document_id, evidence_type,
    rank_score, weight_component, impact_component,
    recency_component, confidence_component, sentiment_value
) VALUES (
    $1, $2::uuid, $3,
    $4, $5, $6,
    $7, $8, $9
)
"""
|
||
|
||
|
||
def _is_valid_uuid(val: object) -> bool:
    """Return True when ``val`` is a UUID instance or a string that parses as one.

    Pattern-signal synthetic IDs are not UUIDs and yield False. Any other
    non-string input (``None``, numbers, ...) also yields False instead of
    raising.
    """
    # An actual UUID object is valid by definition; uuid.UUID() would reject
    # it because it expects a string to parse.
    if isinstance(val, _uuid.UUID):
        return True
    try:
        _uuid.UUID(val)
    except (ValueError, AttributeError, TypeError):
        # ValueError: malformed string; AttributeError: non-string without
        # .replace(); TypeError: None.
        return False
    return True
|
||
|
||
|
||
async def persist_trend_evidence(
    pool: asyncpg.Pool,
    trend_window_id: str,
    supporting: list[RankedEvidence],
    opposing: list[RankedEvidence],
) -> int:
    """Replace the evidence rows of a trend window and return how many were inserted.

    Existing rows for ``trend_window_id`` are always deleted first so repeated
    aggregation cycles do not accumulate duplicates. Evidence whose
    ``document_id`` is not a UUID (e.g. synthetic pattern-signal IDs) is
    skipped.
    """
    # Supporting rows first, then opposing, each tagged with its evidence type.
    rows: list[tuple[str, str, str, float, float, float, float, float, float]] = [
        (
            trend_window_id, ev.document_id, evidence_type,
            ev.rank_score, ev.weight_component, ev.impact_component,
            ev.recency_component, ev.confidence_component, ev.sentiment_value,
        )
        for evidence_type, ranked in (("supporting", supporting), ("opposing", opposing))
        for ev in ranked
        if _is_valid_uuid(ev.document_id)
    ]

    # Stale evidence is cleared even when there is nothing new to insert.
    await pool.execute(
        "DELETE FROM trend_evidence WHERE trend_window_id = $1",
        trend_window_id,
    )

    if rows:
        await pool.executemany(_INSERT_EVIDENCE, rows)
    return len(rows)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Build MacroEventInfo objects for projection computation
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Macro impact records for one ticker ($1) computed within [$2, $3], joined to
# their global event and the event's source document, newest first.
_MACRO_EVENT_INFO_QUERY = """
SELECT
    mir.event_id,
    mir.macro_impact_score,
    mir.impact_direction,
    mir.confidence,
    ge.estimated_duration,
    ge.severity,
    d.published_at AS event_published_at
FROM macro_impact_records mir
JOIN global_events ge ON ge.id = mir.event_id
JOIN documents d ON d.id = ge.source_document_id
WHERE mir.ticker = $1
  AND mir.computed_at >= $2
  AND mir.computed_at <= $3
ORDER BY mir.computed_at DESC
"""
|
||
|
||
|
||
async def _build_macro_event_infos(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    reference_time: datetime,
) -> list[MacroEventInfo]:
    """Fetch macro impact records for ``ticker`` and build MacroEventInfo objects.

    Rows are those computed between ``window_start`` and ``reference_time``
    (inclusive), newest first. NULL columns fall back to defaults: score 0.0,
    direction ``"neutral"``, confidence 0.5, duration ``"short_term"``,
    severity ``"low"``. Event age is the number of hours between the source
    document's publication time and ``reference_time``, floored at 0.0; it is
    0.0 when the publication time is missing.
    """
    rows = await pool.fetch(
        _MACRO_EVENT_INFO_QUERY, ticker, window_start, reference_time,
    )
    infos: list[MacroEventInfo] = []
    for row in rows:
        published_at = row["event_published_at"]
        age_hours = 0.0
        if published_at is not None:
            # Clamp at zero so a document dated after reference_time does not
            # yield a negative age.
            age_hours = max(
                (reference_time - published_at).total_seconds() / 3600.0, 0.0,
            )

        # Only a missing confidence gets the 0.5 default; a stored 0.0 is a
        # real value and must not be replaced (``x or 0.5`` would do that).
        confidence = row["confidence"]

        infos.append(
            MacroEventInfo(
                event_id=str(row["event_id"]),
                macro_impact_score=float(row["macro_impact_score"] or 0.0),
                impact_direction=row["impact_direction"] or "neutral",
                confidence=0.5 if confidence is None else float(confidence),
                estimated_duration=row["estimated_duration"] or "short_term",
                severity=row["severity"] or "low",
                event_age_hours=age_hours,
            )
        )
    return infos
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Regime detection helper (Req 7.1, 7.2, 7.3, 7.8, 7.9)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Most recent 120 daily closes for a ticker, newest first.
_CLOSING_PRICES_QUERY = """
SELECT close
FROM market_data_daily
WHERE ticker = $1
ORDER BY bar_date DESC
LIMIT 120
"""

# Most recent 120 day-over-day returns for a ticker, newest first. The return
# is NULL when there is no previous bar or the previous close is 0 (NULLIF).
_DAILY_RETURNS_QUERY = """
SELECT (close - LAG(close) OVER (ORDER BY bar_date)) / NULLIF(LAG(close) OVER (ORDER BY bar_date), 0) AS daily_return
FROM market_data_daily
WHERE ticker = $1
ORDER BY bar_date DESC
LIMIT 120
"""

# Most recent 30 daily volumes for a ticker, newest first.
_DAILY_VOLUMES_QUERY = """
SELECT volume
FROM market_data_daily
WHERE ticker = $1
ORDER BY bar_date DESC
LIMIT 30
"""
|
||
|
||
# Fallback regime returned by _classify_ticker_regime when market data is
# missing, insufficient, or classification fails.
_DEFAULT_UNCERTAINTY_REGIME = RegimeClassification(
    regime=MarketRegime.UNCERTAINTY,
    trend_indicator=0.0,
    volatility_ratio=1.0,
    bullish_threshold=0.15,
    bearish_threshold=-0.15,
    contradiction_penalty_multiplier=0.6,
)
|
||
|
||
|
||
async def _classify_ticker_regime(
    pool: asyncpg.Pool,
    ticker: str,
) -> RegimeClassification:
    """Classify the market regime for ``ticker`` from stored daily bars.

    Loads closing prices and daily returns, puts both in chronological order
    with NULLs removed, and hands them to ``classify_regime``. The default
    uncertainty regime is returned when there are no price rows, when either
    series ends up empty, or when anything raises.

    Requirements: 7.1, 7.2, 7.3, 7.8, 7.9
    """
    try:
        price_rows = await pool.fetch(_CLOSING_PRICES_QUERY, ticker)
        if not price_rows:
            logger.info(
                "No market data for %s — defaulting to uncertainty regime",
                ticker,
            )
            return _DEFAULT_UNCERTAINTY_REGIME

        # Rows arrive newest-first: drop NULLs, then flip to oldest-first.
        closing_prices = [
            float(row["close"]) for row in price_rows if row["close"] is not None
        ]
        closing_prices.reverse()

        return_rows = await pool.fetch(_DAILY_RETURNS_QUERY, ticker)
        returns = [
            float(row["daily_return"])
            for row in return_rows
            if row["daily_return"] is not None
        ]
        returns.reverse()

        if not (closing_prices and returns):
            logger.info(
                "Insufficient market data for %s — defaulting to uncertainty regime",
                ticker,
            )
            return _DEFAULT_UNCERTAINTY_REGIME

        return classify_regime(closing_prices, returns)

    except Exception:
        logger.warning(
            "Failed to classify regime for %s — defaulting to uncertainty regime",
            ticker,
            exc_info=True,
        )
        return _DEFAULT_UNCERTAINTY_REGIME
|
||
|
||
|
||
async def _fetch_ticker_market_data(
    pool: asyncpg.Pool,
    ticker: str,
) -> tuple[list[float] | None, list[float] | None]:
    """Load recent daily returns and volumes for regime multiplier scoring.

    Returns ``(returns, volumes)``. Each element is a chronological list of
    floats with NULLs removed, or None when no usable values exist. If either
    query raises, a warning is logged and ``(None, None)`` is returned.
    """

    def _chronological(rows, column: str) -> list[float] | None:
        # Rows arrive newest-first: drop NULLs, flip to oldest-first, and
        # collapse an empty result to None.
        values = [float(row[column]) for row in rows or () if row[column] is not None]
        values.reverse()
        return values or None

    try:
        returns = _chronological(
            await pool.fetch(_DAILY_RETURNS_QUERY, ticker), "daily_return",
        )
        volumes = _chronological(
            await pool.fetch(_DAILY_VOLUMES_QUERY, ticker), "volume",
        )
        return returns, volumes
    except Exception:
        logger.warning(
            "Failed to fetch market data for %s scoring — "
            "regime multiplier will default to 1.0",
            ticker,
            exc_info=True,
        )
        return None, None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main aggregation entry point for a single ticker + window
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _clone_scoring_config_as_probabilistic(base: ScoringConfig) -> ScoringConfig:
    """Copy ``base`` field by field with ``probabilistic`` forced to True."""
    return ScoringConfig(
        half_life_hours=base.half_life_hours,
        min_recency_weight=base.min_recency_weight,
        credibility_floor=base.credibility_floor,
        credibility_ceiling=base.credibility_ceiling,
        credibility_exponent=base.credibility_exponent,
        novelty_bonus_max=base.novelty_bonus_max,
        confidence_floor=base.confidence_floor,
        volatility_recency_boost_threshold=base.volatility_recency_boost_threshold,
        volatility_recency_boost_max=base.volatility_recency_boost_max,
        volume_surge_threshold_pct=base.volume_surge_threshold_pct,
        volume_surge_boost=base.volume_surge_boost,
        probabilistic=True,
        sigmoid_steepness=base.sigmoid_steepness,
        sigmoid_midpoint=base.sigmoid_midpoint,
        info_gain_lambda=base.info_gain_lambda,
        info_gain_max=base.info_gain_max,
        default_base_rate=base.default_base_rate,
        adaptive_decay_impact_scale=base.adaptive_decay_impact_scale,
        adaptive_decay_surprise_scale=base.adaptive_decay_surprise_scale,
        adaptive_decay_market_scale=base.adaptive_decay_market_scale,
        regime_return_weight=base.regime_return_weight,
        regime_volume_weight=base.regime_volume_weight,
        regime_multiplier_max=base.regime_multiplier_max,
    )


async def aggregate_company_window(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
    reference_time: datetime | None = None,
    config: AggregationConfig | None = None,
    *,
    probabilistic: bool = False,
    regime: RegimeClassification | None = None,
    source_accuracy_map: dict[str, float] | None = None,
    ticker_returns: list[float] | None = None,
    ticker_volumes: list[float] | None = None,
) -> TrendSummary:
    """Compute, persist and return the trend summary for one ticker and window.

    Pipeline:
      1. Resolve the window's time range (unknown windows fall back to 7 days).
      2. Fetch document impact records and the ticker's market context.
      3. Build weighted company signals.
      4. If the macro layer is enabled (DB toggle overrides config), fetch
         macro impacts and merge the resulting signals.
      5. If the competitive layer is enabled (same toggle pattern), merge
         historical-pattern and competitive signals; failures here are logged
         and skipped.
      6. Assemble the TrendSummary (heuristic or probabilistic).
      7. Persist the trend window and its evidence mappings.
      8. Compute and persist a projection; failures are logged and skipped.
      9. Record Prometheus metrics.

    In probabilistic mode the scoring config is switched to
    ``probabilistic=True``, ``source_accuracy_map`` / ``ticker_returns`` /
    ``ticker_volumes`` are forwarded to signal scoring, and macro signals are
    merged through ``integrate_macro_signals`` rather than appended. In
    heuristic mode those three inputs are ignored.
    """
    cfg = config or AggregationConfig()
    scoring_cfg = cfg.effective_scoring()

    # Probabilistic runs need a scoring config that agrees with the flag so
    # all downstream scoring uses the probabilistic formulas.
    if probabilistic and not scoring_cfg.probabilistic:
        scoring_cfg = _clone_scoring_config_as_probabilistic(scoring_cfg)

    if reference_time is None:
        reference_time = datetime.now(timezone.utc)

    started = time.monotonic()
    window_start = reference_time - WINDOW_DURATIONS.get(window, timedelta(days=7))

    # 1. Impact records for the window
    impacts = await fetch_impact_records(pool, ticker, window_start, reference_time)

    # 2. Market context
    market_ctx = await fetch_market_context(pool, ticker, window, reference_time)

    # 3. Weighted signals; accuracy and market-data inputs are only
    #    forwarded in probabilistic mode (Req 4.1–4.3, 6.1–6.5)
    scoring_returns = ticker_returns if probabilistic else None
    scoring_volumes = ticker_volumes if probabilistic else None
    signals = build_weighted_signals(
        impacts, reference_time, window, market_ctx, scoring_cfg,
        source_accuracy_map=source_accuracy_map if probabilistic else None,
        returns=scoring_returns,
        volumes=scoring_volumes,
    )

    # 4. Macro layer (Requirement 11.2, 11.3, 11.4). The toggle is re-read from
    #    the DB every cycle and overrides the config when set. While disabled
    #    only the merge is skipped; ingestion/classification keep running, so
    #    recent classifications are available as soon as it is re-enabled.
    macro_override = await fetch_macro_enabled(pool)
    macro_enabled = cfg.macro_enabled if macro_override is None else macro_override

    macro_modifier = 1.0
    if macro_enabled:
        macro_impacts = await fetch_macro_impact_records(
            pool, ticker, window_start, reference_time,
        )
        if macro_impacts:
            macro_signals = build_macro_weighted_signals(
                macro_impacts,
                reference_time,
                window,
                macro_signal_weight=cfg.macro_signal_weight,
                config=scoring_cfg,
                returns=scoring_returns,
                volumes=scoring_volumes,
            )

            if not probabilistic:
                # Heuristic mode: plain additive merge
                signals = signals + macro_signals
            else:
                # Probabilistic mode: conditional macro modifier (Req 11.1–11.5)
                company_direction = derive_trend_direction(
                    weighted_sentiment_average(signals),
                ).value
                signals, macro_modifier = integrate_macro_signals(
                    company_signals=signals,
                    macro_signals=macro_signals,
                    company_direction=company_direction,
                    macro_impacts=macro_impacts,
                    ticker=ticker,
                    probabilistic=True,
                    macro_signal_weight=cfg.macro_signal_weight,
                )

            logger.info(
                "Merged %d macro signals for %s/%s (modifier=%.4f)",
                len(macro_signals), ticker, window, macro_modifier,
            )

    # 5. Competitive layer (Requirements 5.1-5.6), same toggle pattern as the
    #    macro layer. When disabled, aggregation simply skips these signals.
    competitive_override = await fetch_competitive_enabled(pool)
    competitive_enabled = (
        cfg.competitive_enabled if competitive_override is None else competitive_override
    )

    if competitive_enabled:
        try:
            # Self-company historical patterns, one lookup per distinct
            # catalyst type seen in the impact records
            all_patterns = []
            for cat_type in {imp.catalyst_type for imp in impacts}:
                all_patterns.extend(await find_self_patterns(pool, ticker, cat_type))

            # Competitive signals targeting this ticker
            comp_signals = await fetch_competitive_signals(
                pool, ticker, window_start, reference_time,
            )

            if all_patterns or comp_signals:
                pattern_weighted = build_pattern_weighted_signals(
                    patterns=all_patterns,
                    competitive_signals=comp_signals,
                    reference_time=reference_time,
                    window=window,
                )
                signals = signals + pattern_weighted
                logger.info(
                    "Merged %d pattern/competitive signals for %s/%s "
                    "(patterns=%d, competitive=%d)",
                    len(pattern_weighted), ticker, window,
                    len(all_patterns), len(comp_signals),
                )
        except Exception:
            logger.exception(
                "Failed to fetch pattern/competitive signals for %s/%s — "
                "continuing with company+macro signals only",
                ticker, window,
            )

    # 6. Assemble the summary together with its evidence details
    assembled = assemble_trend_with_evidence(
        ticker=ticker,
        window=window,
        signals=signals,
        impacts=impacts,
        market_ctx=market_ctx if market_ctx.has_data else None,
        max_evidence=cfg.max_evidence,
        reference_time=reference_time,
        probabilistic=probabilistic,
        regime=regime,
    )
    summary = assembled.summary

    # 6b. Record a non-neutral macro modifier in the probabilistic JSONB (Req 16.2)
    if probabilistic and macro_modifier != 1.0:
        ctx = summary.market_context
        if isinstance(ctx, dict) and "probabilistic" in ctx:
            ctx["probabilistic"]["macro_modifier"] = round(macro_modifier, 4)

    # 7. Persist the trend window
    trend_id = await persist_trend_summary(pool, summary)

    # 8. Persist the evidence mappings
    evidence_count = await persist_trend_evidence(
        pool, trend_id,
        assembled.supporting_evidence,
        assembled.opposing_evidence,
    )

    logger.info(
        "Persisted trend %s for %s/%s: direction=%s strength=%.3f confidence=%.3f signals=%d evidence=%d",
        trend_id, ticker, window, summary.trend_direction.value,
        summary.trend_strength, summary.confidence, len(signals), evidence_count,
    )

    # 9. Projection: best-effort, never fails the aggregation
    try:
        macro_event_infos: list[MacroEventInfo] = []
        if macro_enabled:
            macro_event_infos = await _build_macro_event_infos(
                pool, ticker, window_start, reference_time,
            )

        projection = compute_projection(
            summary=summary,
            macro_events=macro_event_infos or None,
            macro_enabled=macro_enabled,
            upcoming_catalysts=summary.dominant_catalysts[:3] if summary.dominant_catalysts else None,
        )
        await persist_trend_projection(pool, trend_id, projection)
        logger.info(
            "Persisted projection for %s/%s: direction=%s strength=%.3f confidence=%.3f diverges=%s",
            ticker, window, projection.projected_direction,
            projection.projected_strength, projection.projected_confidence,
            projection.diverges_from_current,
        )
    except Exception:
        logger.exception(
            "Failed to compute/persist projection for trend %s (%s/%s) — continuing",
            trend_id, ticker, window,
        )

    # Prometheus metrics
    AGGREGATION_WINDOWS_COMPUTED.labels(window=window).inc()
    AGGREGATION_SIGNALS_PROCESSED.labels(window=window).inc(len(signals))
    AGGREGATION_CONTRADICTION_SCORE.observe(summary.contradiction_score)
    AGGREGATION_DURATION.labels(window=window).observe(time.monotonic() - started)

    return summary
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Aggregate all windows for a single ticker
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
async def aggregate_company(
    pool: asyncpg.Pool,
    ticker: str,
    reference_time: datetime | None = None,
    config: AggregationConfig | None = None,
) -> list[TrendSummary]:
    """Run one aggregation cycle for ``ticker`` over every effective window.

    The cycle proceeds in four stages:

    1. Read the probabilistic-scoring flag and evaluate the model quality
       gate, each exactly once.  Any exception raised while evaluating the
       gate is logged and treated as a failed gate.
    2. When probabilistic scoring is on, classify the ticker's regime, fetch
       its market data, and build a source-accuracy lookup.  A failure in the
       source-accuracy step is logged as a warning and the lookup is left as
       ``None``.
    3. Call ``aggregate_company_window`` once per window returned by
       ``effective_windows()``, passing the per-cycle values from stage 2.
    4. When the quality gate did not pass, set
       ``market_context["quality_gate_passed"] = False`` on each summary.

    Args:
        pool: Connection pool handed to every fetch/evaluate helper.
        ticker: Ticker whose windows are aggregated.
        reference_time: End of every window; the current UTC time when
            omitted.
        config: Aggregation settings; a default ``AggregationConfig()`` is
            used when omitted (or falsy).

    Returns:
        One summary per effective window, in ``effective_windows()`` order.
    """
    settings = config or AggregationConfig()
    reference_time = (
        reference_time if reference_time is not None else datetime.now(timezone.utc)
    )

    # The probabilistic scoring flag is read a single time per cycle
    # (Requirement 16.7); a mid-cycle change applies from the next cycle.
    use_probabilistic = await fetch_probabilistic_scoring_enabled(pool)
    if use_probabilistic:
        mode_label = "probabilistic"
    else:
        mode_label = "heuristic"

    # --- Quality gate (Req 11.2, 11.3) ---
    # Evaluated once, up front.  The flag starts out False and is reset to
    # False on any exception, so an evaluation error can never leave the
    # cycle marked as passed (fail-safe: paper-only).
    gate_ok = False
    try:
        verdict: QualityGateResult = await evaluate_quality_gate(pool)
        gate_ok = verdict.passed
        outcome = "PASS" if verdict.passed else "FAIL"
        logger.info(
            "Quality gate for %s cycle: %s — %s",
            ticker,
            outcome,
            verdict.reason,
        )
    except Exception:
        logger.exception(
            "Quality gate evaluation failed for %s cycle — "
            "defaulting to paper-only mode (fail-safe)",
            ticker,
        )
        gate_ok = False

    gate_label = "passed" if gate_ok else "failed"
    logger.info(
        "Aggregation cycle for %s: pipeline_mode=%s quality_gate=%s",
        ticker,
        mode_label,
        gate_label,
    )

    # --- Per-cycle inputs for probabilistic scoring ---
    # Everything below stays None on the heuristic path.
    market_regime: RegimeClassification | None = None
    returns_series: list[float] | None = None
    volume_series: list[float] | None = None
    accuracy_by_source: dict[str, float] | None = None

    # NOTE(review): the market-data and source-accuracy fetches are assumed
    # to be gated on the probabilistic flag together with regime detection —
    # confirm against the upstream file.
    if use_probabilistic:
        # Regime detection (Req 7.1, 7.2, 7.3, 7.8, 7.9).
        market_regime = await _classify_ticker_regime(pool, ticker)
        logger.info(
            "Regime for %s: %s (trend_indicator=%.1f, vol_ratio=%.2f, "
            "bullish_threshold=%.2f, contradiction_mult=%.1f)",
            ticker,
            market_regime.regime.value,
            market_regime.trend_indicator,
            market_regime.volatility_ratio,
            market_regime.bullish_threshold,
            market_regime.contradiction_penalty_multiplier,
        )

        # Returns/volumes feed the regime multiplier in scoring
        # (Req 6.1–6.5); fetched once here and shared by every window.
        returns_series, volume_series = await _fetch_ticker_market_data(pool, ticker)

        # Source accuracy (Req 4.1–4.3): one batched lookup per cycle, taken
        # over the longest window.  Best-effort — on any error the map stays
        # None.
        try:

            def _ranking_span(name):
                # Duration used only to rank windows against each other.
                return WINDOW_DURATIONS.get(name, timedelta(days=7))

            widest = max(settings.effective_windows(), key=_ranking_span)
            # NOTE(review): the ranking fallback above is 7 days while the
            # lookback fallback here is 90 days for a window missing from
            # WINDOW_DURATIONS — confirm this difference is intended.
            lookback = WINDOW_DURATIONS.get(widest, timedelta(days=90))
            earliest = reference_time - lookback

            impacts = await fetch_impact_records(pool, ticker, earliest, reference_time)
            unique_ids = {impact.document_id for impact in impacts}
            doc_ids = list(unique_ids)
            if doc_ids:
                accuracy_rows = await fetch_source_accuracy(pool, doc_ids)
                factors = {}
                for source_id, row in accuracy_rows.items():
                    factors[source_id] = row.accuracy_factor
                accuracy_by_source = factors
                logger.info(
                    "Fetched source accuracy for %s: %d/%d sources have records",
                    ticker, len(accuracy_rows), len(doc_ids),
                )
        except Exception:
            logger.warning(
                "Failed to fetch source accuracy for %s — defaulting to neutral factor",
                ticker,
                exc_info=True,
            )
            accuracy_by_source = None

    # Keyword arguments that are identical for every window of this cycle.
    per_cycle_kwargs = {
        "probabilistic": use_probabilistic,
        "regime": market_regime,
        "source_accuracy_map": accuracy_by_source,
        "ticker_returns": returns_series,
        "ticker_volumes": volume_series,
    }

    results: list[TrendSummary] = []
    for window_name in settings.effective_windows():
        trend = await aggregate_company_window(
            pool, ticker, window_name, reference_time, settings,
            **per_cycle_kwargs,
        )

        if gate_ok:
            results.append(trend)
            continue

        # Gate failed: flag the summary so the recommendation engine forces
        # paper mode (Req 11.2, 11.3).  A dict context is updated in place;
        # an object exposing model_dump() is replaced by its dumped dict;
        # anything else (including None) is replaced by a fresh dict.
        context = trend.market_context
        if isinstance(context, dict):
            context["quality_gate_passed"] = False
        elif context is None or not hasattr(context, "model_dump"):
            trend.market_context = {"quality_gate_passed": False}
        else:
            dumped = context.model_dump()
            dumped["quality_gate_passed"] = False
            trend.market_context = dumped

        results.append(trend)

    return results
|