Files
stonks-oracle/services/aggregation/worker.py
T
Celes Renata 7fcc8a6c07
ci/woodpecker/push/test Pipeline failed
ci/woodpecker/push/build-1 unknown status
ci/woodpecker/push/build-3 unknown status
ci/woodpecker/push/build-2 unknown status
ci/woodpecker/push/finalize unknown status
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: model validation, calibration, and signal quality layer
- Migration 035: prediction_snapshots, prediction_outcomes, signal_evidence_links, model_metric_snapshots tables + SQL views
- Prediction snapshot writer with canonical evidence keys, duplicate detection, contribution scores
- Outcome evaluator across 5 horizons (1h, 6h, 1d, 7d, 30d)
- Metrics engine: ECE, Brier score, IC, Rank IC, benchmark comparison
- Attribution engine: per-source, per-catalyst, per-layer performance
- Calibration engine: Bayesian shrinkage source reliability
- Quality gate for live trading eligibility with configurable thresholds
- 7 new /api/validation/* endpoints
- Upgraded OpsModel dashboard with validation tab
- Enhanced recommendation display with calibration context
- Backtest replay validation mode
- 86 Python tests (unit + property-based), 179 frontend tests passing
2026-05-01 03:04:58 +00:00

1692 lines
60 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Aggregation worker - company-level rolling window trend summaries.
Queries document intelligence and market context for a given ticker,
computes weighted signal scores, and produces TrendSummary objects
persisted to the trend_windows table.
Requirements: 6.1, 6.2, 6.5
"""
from __future__ import annotations
import json
import logging
import math
import time
import uuid as _uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
import asyncpg
from services.aggregation.bayesian import (
BayesianPosterior,
compute_bayesian_posterior,
)
from services.aggregation.contradiction import CatalystEntry, detect_contradictions
from services.aggregation.evidence import (
EvidenceRankConfig,
RankedEvidence,
rank_evidence_detailed,
)
from services.aggregation.evidence import (
rank_evidence as _rank_evidence_composite,
)
from services.aggregation.interpolation import integrate_macro_signals
from services.aggregation.market_context import fetch_market_context
from services.aggregation.pattern_matcher import find_self_patterns
from services.aggregation.projection import (
MacroEventInfo,
compute_projection,
persist_trend_projection,
)
from services.aggregation.regime import (
MarketRegime,
RegimeClassification,
classify_regime,
)
from services.aggregation.scoring import (
ScoringConfig,
WeightedSignal,
compute_signal_weight,
sentiment_to_numeric,
weighted_sentiment_average,
)
from services.aggregation.signal_propagation import (
CompetitiveSignalRecord,
build_pattern_weighted_signals,
)
from services.aggregation.source_accuracy import fetch_source_accuracy
from services.shared.metrics import (
AGGREGATION_CONTRADICTION_SCORE,
AGGREGATION_DURATION,
AGGREGATION_SIGNALS_PROCESSED,
AGGREGATION_WINDOWS_COMPUTED,
)
from services.shared.schemas import TrendDirection, TrendSummary, TrendWindow
from services.trading.model_quality_gate import QualityGateResult, evaluate_quality_gate
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Map TrendWindow values to lookback durations.
# Keys are the enum members' ``.value`` strings; note that the intraday
# window looks back 12 hours, not a full day.
WINDOW_DURATIONS: dict[str, timedelta] = {
TrendWindow.INTRADAY.value: timedelta(hours=12),
TrendWindow.ONE_DAY.value: timedelta(days=1),
TrendWindow.SEVEN_DAY.value: timedelta(days=7),
TrendWindow.THIRTY_DAY.value: timedelta(days=30),
TrendWindow.NINETY_DAY.value: timedelta(days=90),
}
# How many evidence document IDs to keep in supporting/opposing lists.
# Used as the default for AggregationConfig.max_evidence and for the
# max_refs / max_evidence parameters of the functions in this module.
MAX_EVIDENCE_REFS = 10
@dataclass
class AggregationConfig:
    """Settings for an aggregation run: which windows to compute and how to score.

    Besides the window selection and scoring parameters, this carries the
    relative weights and default toggle states of the macro, competitive
    and probabilistic layers.
    """

    # Window identifiers to compute; None (or an empty list) means all windows.
    windows: list[str] | None = None
    # Scoring parameters; a default ScoringConfig is used when unset.
    scoring: ScoringConfig | None = None
    # Maximum number of evidence references to keep.
    max_evidence: int = MAX_EVIDENCE_REFS
    macro_signal_weight: float = 0.3  # relative weight of macro vs company signals
    macro_enabled: bool = True  # runtime toggle state
    competitive_signal_weight: float = 0.2  # relative weight of pattern signals
    competitive_enabled: bool = True  # runtime toggle state
    probabilistic_scoring_enabled: bool = False  # probabilistic pipeline toggle

    def effective_windows(self) -> list[str]:
        """Return the configured windows, or every TrendWindow value when unset."""
        return self.windows if self.windows else [member.value for member in TrendWindow]

    def effective_scoring(self) -> ScoringConfig:
        """Return the configured scoring parameters, or a default ScoringConfig."""
        if self.scoring:
            return self.scoring
        return ScoringConfig()
# ---------------------------------------------------------------------------
# Fetch impact records for a ticker within a time window
# ---------------------------------------------------------------------------
# Per-ticker impact records joined with their intelligence row and document.
# Parameters: $1 = ticker, $2 = window start, $3 = window end (both bounds
# inclusive, compared against documents.published_at).
# Only intelligence rows with validation_status = 'valid' and documents whose
# status is not 'rejected' are returned, newest publication first.
_IMPACT_QUERY = """
SELECT
di.document_id,
di.confidence,
di.novelty_score,
di.source_credibility,
dir.sentiment,
dir.impact_score,
dir.catalyst_type,
dir.key_facts,
dir.risks,
d.published_at
FROM document_impact_records dir
JOIN document_intelligence di ON di.id = dir.intelligence_id
JOIN documents d ON d.id = di.document_id
WHERE dir.ticker = $1
AND d.published_at >= $2
AND d.published_at <= $3
AND di.validation_status = 'valid'
AND d.status != 'rejected'
ORDER BY d.published_at DESC
"""
@dataclass
class ImpactRow:
    """Parsed row from the impact query."""

    document_id: str
    confidence: float
    novelty_score: float
    source_credibility: float
    sentiment: str
    impact_score: float
    catalyst_type: str
    key_facts: list[str]
    risks: list[str]
    published_at: datetime


def _parse_impact_row(row: Any) -> ImpactRow:
    """Convert an asyncpg Record (or any mapping with the same keys) to an ImpactRow.

    ``key_facts`` and ``risks`` are decoded from JSON when they arrive as
    strings; anything that is not a list afterwards becomes an empty list.
    NULL numeric columns fall back to a default (0.5 for ``confidence``,
    ``novelty_score`` and ``source_credibility``; 0.0 for ``impact_score``).
    A stored value of 0.0 is kept as 0.0 -- only NULL triggers the default,
    so a zero-confidence document is not silently promoted to 0.5.

    Raises:
        json.JSONDecodeError: if ``key_facts`` or ``risks`` is a string that
            is not valid JSON.
    """

    def _number(column: str, default: float) -> float:
        # Default on NULL only; ``value or default`` would also replace 0.0.
        value = row[column]
        return default if value is None else float(value)

    def _string_list(column: str) -> list[str]:
        value = row[column]
        if isinstance(value, str):
            value = json.loads(value)
        return value if isinstance(value, list) else []

    return ImpactRow(
        document_id=str(row["document_id"]),
        confidence=_number("confidence", 0.5),
        novelty_score=_number("novelty_score", 0.5),
        source_credibility=_number("source_credibility", 0.5),
        sentiment=row["sentiment"] or "neutral",
        impact_score=_number("impact_score", 0.0),
        catalyst_type=row["catalyst_type"] or "other",
        key_facts=_string_list("key_facts"),
        risks=_string_list("risks"),
        published_at=row["published_at"],
    )
async def fetch_impact_records(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    window_end: datetime,
) -> list[ImpactRow]:
    """Load the validated impact records of ``ticker`` published in the window.

    Runs ``_IMPACT_QUERY`` with the ticker and both window bounds and parses
    every returned record into an ImpactRow, preserving the query's order.
    """
    records = await pool.fetch(_IMPACT_QUERY, ticker, window_start, window_end)
    parsed: list[ImpactRow] = []
    for record in records:
        parsed.append(_parse_impact_row(record))
    return parsed
# ---------------------------------------------------------------------------
# Fetch macro toggle state from risk_configs
#
# MACRO LAYER TOGGLE BEHAVIOR (Requirements 11.2, 11.3, 11.4):
# - The toggle state is read fresh from PostgreSQL at the start of each
# aggregation cycle (no caching), so changes take effect immediately on
# the next cycle.
# - When disabled: ingestion and classification continue normally (historical
# data is preserved), but interpolation and aggregation integration are
# skipped — the aggregation engine produces trends using only company-
# specific signals.
# - When re-enabled: the engine resumes computing macro impact scores using
# the most recent GlobalEvent classifications, including any events that
# were ingested and classified while the layer was disabled.
# ---------------------------------------------------------------------------
# Reads the 'macro_enabled' key (as text, via ->>) from the config JSON of the
# most recently updated active risk_configs row. Takes no parameters.
_MACRO_TOGGLE_QUERY = """
SELECT config->>'macro_enabled' AS macro_enabled
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1
"""
async def fetch_macro_enabled(pool: asyncpg.Pool) -> bool | None:
    """Read the macro layer toggle from the risk_configs table.

    Returns None when there is no active config row or the key is absent,
    in which case the caller should fall back to the AggregationConfig
    default. Otherwise returns True only when the stored text equals
    "true" (case-insensitive); any other text yields False.
    """
    record = await pool.fetchrow(_MACRO_TOGGLE_QUERY)
    flag = None if record is None else record["macro_enabled"]
    if flag is None:
        return None
    return flag.lower() == "true"
# ---------------------------------------------------------------------------
# Fetch competitive toggle state from risk_configs
# ---------------------------------------------------------------------------
# Reads the 'competitive_enabled' key (as text, via ->>) from the config JSON
# of the most recently updated active risk_configs row. Takes no parameters.
_COMPETITIVE_TOGGLE_QUERY = """
SELECT config->>'competitive_enabled' AS competitive_enabled
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1
"""
async def fetch_competitive_enabled(pool: asyncpg.Pool) -> bool | None:
    """Read the competitive layer toggle from the risk_configs table.

    Returns None when there is no active config row or the key is absent,
    in which case the caller should fall back to the AggregationConfig
    default. Otherwise returns True only when the stored text equals
    "true" (case-insensitive); any other text yields False.
    """
    record = await pool.fetchrow(_COMPETITIVE_TOGGLE_QUERY)
    flag = None if record is None else record["competitive_enabled"]
    if flag is None:
        return None
    return flag.lower() == "true"
# ---------------------------------------------------------------------------
# Fetch probabilistic scoring toggle from risk_configs
#
# PROBABILISTIC PIPELINE TOGGLE (Requirements 16.3, 16.4, 16.5, 16.6, 16.7):
# - Read once per aggregation cycle from the risk_configs table.
# - When False (default): the heuristic pipeline is used — identical outputs
# to the current system.
# - When True: the new Bayesian, regime-aware, and adaptive formulas are
# used for all pipeline stages.
# - Defaults to False when the key is missing, the value is invalid, or the
# database is unreachable (fail-safe to heuristic mode).
# ---------------------------------------------------------------------------
# Reads the 'probabilistic_scoring_enabled' key (as text, via ->>) from the
# config JSON of the most recently updated active risk_configs row.
# Takes no parameters.
_PROBABILISTIC_TOGGLE_QUERY = """
SELECT config->>'probabilistic_scoring_enabled' AS probabilistic_scoring_enabled
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1
"""
async def fetch_probabilistic_scoring_enabled(pool: asyncpg.Pool) -> bool:
    """Read the probabilistic scoring toggle from the risk_configs table.

    Returns True only when the stored value is the text "true"
    (case-insensitive). Every other outcome -- no config row, missing key,
    a value other than "true"/"false", or any exception raised while
    querying -- yields False, so failures always fall back to the heuristic
    pipeline. Invalid values and query failures are logged as warnings.

    Requirements: 16.3, 16.6
    """
    try:
        record = await pool.fetchrow(_PROBABILISTIC_TOGGLE_QUERY)
        raw = None if record is None else record["probabilistic_scoring_enabled"]
        if raw is None:
            return False
        normalized = raw.lower() if isinstance(raw, str) else None
        if normalized == "true":
            return True
        if normalized == "false":
            return False
        # Anything else is a misconfiguration: report it and stay heuristic.
        logger.warning(
            "Invalid probabilistic_scoring_enabled value %r in "
            "risk_configs; defaulting to heuristic pipeline",
            raw,
        )
        return False
    except Exception:
        # Deliberately broad: an unreadable toggle must never break aggregation.
        logger.warning(
            "Failed to read probabilistic_scoring_enabled from risk_configs; "
            "defaulting to heuristic pipeline",
            exc_info=True,
        )
        return False
# ---------------------------------------------------------------------------
# Fetch competitive signals targeting a ticker within a time window
# ---------------------------------------------------------------------------
# Competitive signal records whose target is the given ticker.
# Parameters: $1 = target ticker, $2 = window start, $3 = window end (both
# bounds inclusive, compared against computed_at).
# Newest first, capped at 500 rows.
_COMPETITIVE_SIGNALS_QUERY = """
SELECT source_document_id, source_ticker, target_ticker, catalyst_type,
pattern_confidence, signal_direction, signal_strength,
relationship_strength, computed_at
FROM competitive_signal_records
WHERE target_ticker = $1
AND computed_at >= $2
AND computed_at <= $3
ORDER BY computed_at DESC
LIMIT 500
"""
async def fetch_competitive_signals(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    window_end: datetime,
) -> list[CompetitiveSignalRecord]:
    """Load competitive signal records that target ``ticker`` within the window.

    Runs ``_COMPETITIVE_SIGNALS_QUERY`` and maps each returned record to a
    CompetitiveSignalRecord, converting the document id to ``str`` and the
    three numeric columns to ``float``. The query's ordering is preserved.
    """
    records = await pool.fetch(
        _COMPETITIVE_SIGNALS_QUERY, ticker, window_start, window_end,
    )
    signals: list[CompetitiveSignalRecord] = []
    for record in records:
        signals.append(
            CompetitiveSignalRecord(
                source_document_id=str(record["source_document_id"]),
                source_ticker=record["source_ticker"],
                target_ticker=record["target_ticker"],
                catalyst_type=record["catalyst_type"],
                pattern_confidence=float(record["pattern_confidence"]),
                signal_direction=record["signal_direction"],
                signal_strength=float(record["signal_strength"]),
                relationship_strength=float(record["relationship_strength"]),
                computed_at=record["computed_at"],
            )
        )
    return signals
# ---------------------------------------------------------------------------
# Fetch macro impact records for a ticker within a time window
# ---------------------------------------------------------------------------
# Macro impact records for a ticker, joined to the originating global event
# and that event's source document (for its publication time).
# Parameters: $1 = ticker, $2 = window start, $3 = window end (both bounds
# inclusive, compared against macro_impact_records.computed_at -- not the
# document's published_at). Newest computation first.
_MACRO_IMPACT_QUERY = """
SELECT
mir.event_id,
mir.company_id,
mir.ticker,
mir.macro_impact_score,
mir.impact_direction,
mir.contributing_factors,
mir.confidence,
mir.computed_at,
ge.source_document_id,
d.published_at AS event_published_at
FROM macro_impact_records mir
JOIN global_events ge ON ge.id = mir.event_id
JOIN documents d ON d.id = ge.source_document_id
WHERE mir.ticker = $1
AND mir.computed_at >= $2
AND mir.computed_at <= $3
ORDER BY mir.computed_at DESC
"""
@dataclass
class MacroImpactRow:
    """Parsed row from the macro impact query."""

    event_id: str
    company_id: str
    ticker: str
    macro_impact_score: float
    impact_direction: str
    contributing_factors: list[str]
    confidence: float
    computed_at: datetime
    source_document_id: str
    event_published_at: datetime


def _parse_macro_impact_row(row: Any) -> MacroImpactRow:
    """Convert an asyncpg Record (or any mapping with the same keys) to a MacroImpactRow.

    ``contributing_factors`` is decoded from JSON when it arrives as a
    string; anything that is not a list afterwards becomes an empty list.
    NULL numeric columns fall back to a default (0.0 for
    ``macro_impact_score``, 0.5 for ``confidence``). A stored confidence of
    0.0 is kept as 0.0 -- only NULL triggers the default, so a
    zero-confidence record is not silently promoted to 0.5.

    Raises:
        json.JSONDecodeError: if ``contributing_factors`` is a string that
            is not valid JSON.
    """

    def _number(column: str, default: float) -> float:
        # Default on NULL only; ``value or default`` would also replace 0.0.
        value = row[column]
        return default if value is None else float(value)

    factors = row["contributing_factors"]
    if isinstance(factors, str):
        factors = json.loads(factors)
    if not isinstance(factors, list):
        factors = []

    return MacroImpactRow(
        event_id=str(row["event_id"]),
        company_id=str(row["company_id"]),
        ticker=row["ticker"],
        macro_impact_score=_number("macro_impact_score", 0.0),
        impact_direction=row["impact_direction"] or "neutral",
        contributing_factors=factors,
        confidence=_number("confidence", 0.5),
        computed_at=row["computed_at"],
        source_document_id=str(row["source_document_id"]),
        event_published_at=row["event_published_at"],
    )
async def fetch_macro_impact_records(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    window_end: datetime,
) -> list[MacroImpactRow]:
    """Load the macro impact records of ``ticker`` computed within the window.

    Runs ``_MACRO_IMPACT_QUERY`` with the ticker and both window bounds and
    parses every returned record into a MacroImpactRow, preserving the
    query's order.
    """
    records = await pool.fetch(_MACRO_IMPACT_QUERY, ticker, window_start, window_end)
    parsed: list[MacroImpactRow] = []
    for record in records:
        parsed.append(_parse_macro_impact_row(record))
    return parsed
# ---------------------------------------------------------------------------
# Convert macro impact records to WeightedSignals
# ---------------------------------------------------------------------------
# Numeric sentiment assigned to each macro impact_direction label.
# "mixed" and "neutral" both map to 0.0; build_macro_weighted_signals also
# falls back to 0.0 for any label missing from this table.
_DIRECTION_TO_SENTIMENT: dict[str, float] = {
"positive": 1.0,
"negative": -1.0,
"mixed": 0.0,
"neutral": 0.0,
}
def build_macro_weighted_signals(
    macro_impacts: list[MacroImpactRow],
    reference_time: datetime,
    window: str,
    macro_signal_weight: float = 0.3,
    config: ScoringConfig | None = None,
    *,
    returns: list[float] | None = None,
    volumes: list[float] | None = None,
) -> list[WeightedSignal]:
    """Turn macro impact records into WeightedSignal objects.

    Each record goes through ``compute_signal_weight`` like a company signal:

    - ``document_id`` is the event's source document (for evidence tracing)
    - ``sentiment_value`` comes from ``impact_direction`` (0.0 when unknown)
    - ``impact_score`` is ``macro_impact_score * macro_signal_weight``
    - recency is measured from the global event's publication time
    - the record's confidence is passed as both source credibility and
      extraction confidence; novelty is fixed at 0.5

    ``returns`` and ``volumes`` are forwarded to the scoring call unchanged.
    A default ScoringConfig is used when ``config`` is not given.
    """
    scoring = config or ScoringConfig()

    def _to_signal(record: MacroImpactRow) -> WeightedSignal:
        weight = compute_signal_weight(
            published_at=record.event_published_at,
            reference_time=reference_time,
            window=window,
            source_credibility=record.confidence,
            novelty_score=0.5,
            extraction_confidence=record.confidence,
            config=scoring,
            returns=returns,
            volumes=volumes,
        )
        return WeightedSignal(
            document_id=record.source_document_id,
            weight=weight,
            sentiment_value=_DIRECTION_TO_SENTIMENT.get(record.impact_direction, 0.0),
            impact_score=record.macro_impact_score * macro_signal_weight,
        )

    return [_to_signal(record) for record in macro_impacts]
# ---------------------------------------------------------------------------
# Build weighted signals from impact records
# ---------------------------------------------------------------------------
def build_weighted_signals(
    impacts: list[ImpactRow],
    reference_time: datetime,
    window: str,
    market_ctx: Any | None = None,
    config: ScoringConfig | None = None,
    *,
    source_accuracy_map: dict[str, float] | None = None,
    returns: list[float] | None = None,
    volumes: list[float] | None = None,
) -> list[WeightedSignal]:
    """Turn document impact records into WeightedSignal objects.

    Every record is weighted by ``compute_signal_weight``. The source
    accuracy factor is looked up in ``source_accuracy_map`` by the record's
    ``document_id`` (1.0 when absent). The catalyst type is passed as the
    event type only when ``config.probabilistic`` is true; market context,
    returns and volumes are forwarded unchanged. A default ScoringConfig is
    used when ``config`` is not given.
    """
    scoring = config or ScoringConfig()
    accuracy_by_doc = source_accuracy_map or {}

    def _to_signal(impact: ImpactRow) -> WeightedSignal:
        weight = compute_signal_weight(
            published_at=impact.published_at,
            reference_time=reference_time,
            window=window,
            source_credibility=impact.source_credibility,
            novelty_score=impact.novelty_score,
            extraction_confidence=impact.confidence,
            market_ctx=market_ctx,
            config=scoring,
            event_type=impact.catalyst_type if scoring.probabilistic else None,
            impact_score=impact.impact_score,
            source_accuracy_factor=accuracy_by_doc.get(impact.document_id, 1.0),
            returns=returns,
            volumes=volumes,
        )
        return WeightedSignal(
            document_id=impact.document_id,
            weight=weight,
            sentiment_value=sentiment_to_numeric(impact.sentiment),
            impact_score=impact.impact_score,
            info_gain_factor=weight.info_gain_factor,
            source_accuracy_factor=weight.source_accuracy_factor,
        )

    return [_to_signal(impact) for impact in impacts]
# ---------------------------------------------------------------------------
# Derive trend direction from weighted sentiment
# ---------------------------------------------------------------------------
# Thresholds for mapping numeric sentiment to direction.
# Average sentiment at or above BULLISH_THRESHOLD is bullish, at or below
# BEARISH_THRESHOLD is bearish (both comparisons are inclusive).
BULLISH_THRESHOLD = 0.15
BEARISH_THRESHOLD = -0.15
MIXED_THRESHOLD = 0.10 # contradiction score above this → mixed, but only while |avg sentiment| < 0.3 (see derive_trend_direction)
def derive_trend_direction(
    avg_sentiment: float,
    contradiction_score: float = 0.0,
) -> TrendDirection:
    """Map a weighted average sentiment to a TrendDirection.

    The result is MIXED when the contradiction score exceeds
    MIXED_THRESHOLD *and* the average sentiment is weak (absolute value
    below 0.3); a strong average sentiment is not overridden by
    contradiction. Otherwise the sentiment is compared (inclusively)
    against BULLISH_THRESHOLD and BEARISH_THRESHOLD, and anything in
    between is NEUTRAL.
    """
    is_contested = contradiction_score > MIXED_THRESHOLD
    is_weak = abs(avg_sentiment) < 0.3
    if is_contested and is_weak:
        return TrendDirection.MIXED
    if avg_sentiment >= BULLISH_THRESHOLD:
        return TrendDirection.BULLISH
    return (
        TrendDirection.BEARISH
        if avg_sentiment <= BEARISH_THRESHOLD
        else TrendDirection.NEUTRAL
    )
# ---------------------------------------------------------------------------
# Compute contradiction score
# ---------------------------------------------------------------------------
def compute_contradiction_score(signals: list[WeightedSignal]) -> float:
    """Measure how much weighted signals disagree on direction.

    Each signal contributes ``weight.combined * impact_score`` to the
    positive or negative side according to the sign of its sentiment;
    zero-sentiment signals are ignored. The score is the minority side's
    share of the total (``minority / (positive + negative)``), rounded to
    four decimals. For non-negative weights that is 0.0 under full
    agreement and peaks at 0.5 when both sides carry equal weight.

    Returns 0.0 for an empty list or when no directional weight exists.
    """
    if not signals:
        return 0.0

    positive_mass = negative_mass = 0.0
    for signal in signals:
        mass = signal.weight.combined * signal.impact_score
        if signal.sentiment_value < 0:
            negative_mass += mass
        elif signal.sentiment_value > 0:
            positive_mass += mass

    combined_mass = positive_mass + negative_mass
    if combined_mass == 0.0:
        return 0.0
    return round(min(positive_mass, negative_mass) / combined_mass, 4)
# ---------------------------------------------------------------------------
# Rank evidence (supporting vs opposing)
# ---------------------------------------------------------------------------
def rank_evidence(
    signals: list[WeightedSignal],
    max_refs: int = MAX_EVIDENCE_REFS,
) -> tuple[list[str], list[str]]:
    """Return the top supporting and opposing document IDs.

    Thin wrapper around the evidence module's composite ranking: it builds
    an EvidenceRankConfig limited to ``max_refs`` entries and returns the
    module's (supporting, opposing) result unchanged.
    """
    return _rank_evidence_composite(signals, EvidenceRankConfig(max_refs=max_refs))
# ---------------------------------------------------------------------------
# Extract dominant catalysts and material risks
# ---------------------------------------------------------------------------
def extract_catalysts_and_risks(
    impacts: list[ImpactRow],
    signals: list[WeightedSignal],
) -> tuple[list[str], list[str]]:
    """Return the dominant catalyst types and the most material risks.

    A document's weight is ``weight.combined * impact_score`` of its signal
    (the last signal wins when a document id repeats). Impact records whose
    document has no signal or a non-positive weight are ignored.

    Returns:
        A ``(catalysts, risks)`` pair: up to five catalyst types ordered by
        cumulative weight, and up to five risk texts ordered by the weight
        of the document that raised them, de-duplicated case-insensitively
        after trimming whitespace.
    """
    # Later signals for the same document overwrite earlier ones.
    doc_weight: dict[str, float] = {}
    for signal in signals:
        doc_weight[signal.document_id] = signal.weight.combined * signal.impact_score

    catalyst_totals: dict[str, float] = {}
    weighted_risks: list[tuple[float, str]] = []
    for impact in impacts:
        weight = doc_weight.get(impact.document_id, 0.0)
        if weight <= 0.0:
            continue
        catalyst_totals[impact.catalyst_type] = (
            catalyst_totals.get(impact.catalyst_type, 0.0) + weight
        )
        weighted_risks.extend((weight, text) for text in impact.risks)

    # Stable sort: catalysts with equal weight keep first-seen order.
    top_catalysts = sorted(catalyst_totals, key=catalyst_totals.get, reverse=True)[:5]

    top_risks: list[str] = []
    seen_keys: set[str] = set()
    for _, text in sorted(weighted_risks, key=lambda entry: entry[0], reverse=True):
        cleaned = text.strip()
        dedupe_key = cleaned.lower()
        if dedupe_key in seen_keys:
            continue
        seen_keys.add(dedupe_key)
        top_risks.append(cleaned)
        if len(top_risks) == 5:
            break

    return top_catalysts, top_risks
# ---------------------------------------------------------------------------
# Compute trend confidence
# ---------------------------------------------------------------------------
def compute_trend_confidence(
    signals: list[WeightedSignal],
    contradiction_score: float,
) -> float:
    """Derive an overall confidence for a trend summary.

    Only signals with a positive combined weight take part. The score is

        0.3 * count_factor + 0.3 * avg_credibility + 0.4 * agreement
        - 0.4 * contradiction_score

    clamped to [0, 1] and rounded to four decimals, where:

    - ``count_factor`` is the number of unique (non-empty) document ids
      divided by 15 and capped at 0.8, so repeated signals from one
      document do not inflate confidence;
    - ``avg_credibility`` is the mean ``weight.credibility`` of the
      contributing signals;
    - ``agreement`` is the share of directional signals on the majority
      side (0.5 when none are directional), dampened by
      ``log2(n + 1) / log2(8)`` so that one or two agreeing sources count
      for far less than seven or more.

    Returns 0.0 when there are no signals or none has a positive weight.
    """
    contributing = [s for s in signals if s.weight.combined > 0] if signals else []
    if not contributing:
        return 0.0

    unique_sources = len({s.document_id for s in contributing if s.document_id})
    count_factor = min(unique_sources / 15.0, 0.8)

    avg_conf = sum(s.weight.credibility for s in contributing) / len(contributing)

    bullish = bearish = 0
    for s in contributing:
        if s.sentiment_value > 0:
            bullish += 1
        elif s.sentiment_value < 0:
            bearish += 1
    directional = bullish + bearish
    agreement = max(bullish, bearish) / directional if directional > 0 else 0.5

    # Saturates at 1.0 once there are seven unique sources (log2(8) / log2(8)).
    dampener = min(1.0, math.log2(unique_sources + 1) / math.log2(8))
    agreement = agreement * dampener

    contradiction_penalty = contradiction_score * 0.4
    confidence = (0.3 * count_factor + 0.3 * avg_conf + 0.4 * agreement) - contradiction_penalty
    return round(max(0.0, min(1.0, confidence)), 4)
# ---------------------------------------------------------------------------
# Assemble a TrendSummary from components
# ---------------------------------------------------------------------------
@dataclass
class AssembledTrend:
    """Bundle of a trend summary and the ranked evidence behind it."""

    # The assembled summary for one entity/window.
    summary: TrendSummary
    # Detailed rankings, kept alongside the summary so they can be persisted.
    supporting_evidence: list[RankedEvidence]
    opposing_evidence: list[RankedEvidence]
def assemble_trend_summary(
    ticker: str,
    window: str,
    signals: list[WeightedSignal],
    impacts: list[ImpactRow],
    market_ctx: Any | None = None,
    max_evidence: int = MAX_EVIDENCE_REFS,
    reference_time: datetime | None = None,
    *,
    probabilistic: bool = False,
    regime: RegimeClassification | None = None,
) -> TrendSummary:
    """Build a TrendSummary from weighted signals and impact records.

    Convenience wrapper around ``assemble_trend_with_evidence`` that
    forwards every argument and returns only the summary, discarding the
    detailed evidence rankings.
    """
    assembled = assemble_trend_with_evidence(
        ticker,
        window,
        signals,
        impacts,
        market_ctx,
        max_evidence,
        reference_time,
        probabilistic=probabilistic,
        regime=regime,
    )
    return assembled.summary
def assemble_trend_with_evidence(
    ticker: str,
    window: str,
    signals: list[WeightedSignal],
    impacts: list[ImpactRow],
    market_ctx: Any | None = None,
    max_evidence: int = MAX_EVIDENCE_REFS,
    reference_time: datetime | None = None,
    *,
    probabilistic: bool = False,
    regime: RegimeClassification | None = None,
) -> AssembledTrend:
    """Build a TrendSummary together with its detailed evidence rankings.

    Both modes share the weighted average sentiment, contradiction
    detection, evidence ranking, catalyst/risk extraction and trend
    strength (absolute average sentiment clamped to [0, 1]). They differ
    in how direction and confidence are derived.

    Heuristic mode (``probabilistic=False``):
        Direction comes from ``derive_trend_direction`` and confidence from
        ``compute_trend_confidence``; ``market_ctx`` is stored unchanged and
        no probabilistic fields are set.

    Probabilistic mode (``probabilistic=True``):
        - A Bayesian posterior is computed from the signals.
        - confidence = 0.5 * bayesian_confidence + 0.25 * count_factor
          + 0.25 * avg_credibility - contradiction * regime penalty
          multiplier, clamped to [0, 1].
        - Direction is MIXED when posterior entropy exceeds 0.9, otherwise
          BULLISH / BEARISH for P_bull above 0.65 / below 0.35, else
          NEUTRAL. These cut-offs are fixed; of the regime only the
          contradiction penalty multiplier is used here.
        - When ``regime`` is None an UNCERTAINTY regime is assumed.
        - The probabilistic outputs are set on the summary and also stored
          under the ``"probabilistic"`` key of the market context, which is
          converted to a plain dict.

    ``reference_time`` defaults to the current UTC time and becomes the
    summary's ``generated_at``.

    Requirements: 1.1, 1.2, 8.1-8.5, 9.1-9.6, 7.8, 16.4, 16.5
    """
    generated_at = datetime.now(timezone.utc) if reference_time is None else reference_time
    avg_sentiment = weighted_sentiment_average(signals)

    # Full contradiction detection (Requirement 6.4).
    contradiction_result = detect_contradictions(
        signals,
        [
            CatalystEntry(document_id=impact.document_id, catalyst_type=impact.catalyst_type)
            for impact in impacts
        ],
        probabilistic=probabilistic,
    )
    contradiction = contradiction_result.score

    # --- Direction and confidence: the only mode-specific scoring step ---
    posterior: BayesianPosterior | None = None
    if probabilistic:
        # Default to the uncertainty regime when none is provided (Req 7.9).
        if regime is None:
            regime = RegimeClassification(
                regime=MarketRegime.UNCERTAINTY,
                trend_indicator=0.0,
                volatility_ratio=1.0,
                bullish_threshold=0.15,
                bearish_threshold=-0.15,
                contradiction_penalty_multiplier=0.6,
            )

        # Bayesian posterior from the merged signals (Req 1.1, 1.2).
        posterior = compute_bayesian_posterior(signals)

        # Bayesian confidence formula (Req 8.1-8.4).
        contributing = [s for s in signals if s.weight.combined > 0]
        if contributing:
            unique_sources = len({s.document_id for s in contributing if s.document_id})
            avg_credibility = (
                sum(s.weight.credibility for s in contributing) / len(contributing)
            )
        else:
            unique_sources = 0
            avg_credibility = 0.0
        f_count = min(unique_sources / 15.0, 0.8)
        # The contradiction penalty uses the regime-adjusted multiplier (Req 7.7).
        contradiction_penalty = contradiction * regime.contradiction_penalty_multiplier
        raw_confidence = (
            0.5 * posterior.bayesian_confidence
            + 0.25 * f_count
            + 0.25 * avg_credibility
            - contradiction_penalty
        )
        confidence = round(max(0.0, min(1.0, raw_confidence)), 4)

        # Entropy-based direction with fixed P_bull cut-offs (Req 9.1-9.5).
        if posterior.entropy > 0.9:
            direction = TrendDirection.MIXED
        elif posterior.p_bull > 0.65:
            direction = TrendDirection.BULLISH
        elif posterior.p_bull < 0.35:
            direction = TrendDirection.BEARISH
        else:
            direction = TrendDirection.NEUTRAL
    else:
        direction = derive_trend_direction(avg_sentiment, contradiction)
        confidence = compute_trend_confidence(signals, contradiction)

    # --- Shared: evidence, catalysts/risks and strength ---
    supporting_ranked, opposing_ranked = rank_evidence_detailed(
        signals, EvidenceRankConfig(max_refs=max_evidence),
    )
    # dict.fromkeys de-duplicates document ids while keeping rank order.
    supporting = list(dict.fromkeys(entry.document_id for entry in supporting_ranked))
    opposing = list(dict.fromkeys(entry.document_id for entry in opposing_ranked))
    catalysts, risks = extract_catalysts_and_risks(impacts, signals)
    strength = round(min(abs(avg_sentiment), 1.0), 4)

    # --- Market context and extra summary fields ---
    if posterior is None:
        summary_context = market_ctx
        probabilistic_fields: dict[str, Any] = {}
    else:
        p_bull = round(posterior.p_bull, 6)
        alpha = round(posterior.alpha, 4)
        beta = round(posterior.beta, 4)
        bayesian_confidence = round(posterior.bayesian_confidence, 6)
        entropy = round(posterior.entropy, 6)
        regime_name = regime.regime.value

        # Payload stored inside the market_context JSONB.
        probabilistic_data = {
            "p_bull": p_bull,
            "alpha": alpha,
            "beta": beta,
            "log_likelihood": round(posterior.log_likelihood, 6),
            "bayesian_confidence": bayesian_confidence,
            "entropy": entropy,
            "regime": regime_name,
            "regime_volatility_ratio": round(regime.volatility_ratio, 4),
            "pipeline_mode": "probabilistic",
            "contradiction_entropy": round(contradiction, 4),
        }

        if market_ctx is not None and hasattr(market_ctx, "model_dump"):
            summary_context = market_ctx.model_dump()
            summary_context["probabilistic"] = probabilistic_data
        elif isinstance(market_ctx, dict):
            summary_context = {**market_ctx, "probabilistic": probabilistic_data}
        else:
            summary_context = {"probabilistic": probabilistic_data}

        # Probabilistic TrendSummary fields (Req 9.6, 16.1).
        probabilistic_fields = {
            "p_bull": p_bull,
            "alpha": alpha,
            "beta_param": beta,
            "bayesian_confidence": bayesian_confidence,
            "entropy": entropy,
            "regime": regime_name,
            "pipeline_mode": "probabilistic",
        }

    summary = TrendSummary(
        entity_type="company",
        entity_id=ticker,
        window=TrendWindow(window),
        trend_direction=direction,
        trend_strength=strength,
        confidence=confidence,
        top_supporting_evidence=supporting,
        top_opposing_evidence=opposing,
        dominant_catalysts=catalysts,
        material_risks=risks,
        contradiction_score=contradiction,
        disagreement_details=contradiction_result.details,
        market_context=summary_context,
        generated_at=generated_at,
        **probabilistic_fields,
    )
    return AssembledTrend(
        summary=summary,
        supporting_evidence=supporting_ranked,
        opposing_evidence=opposing_ranked,
    )
# ---------------------------------------------------------------------------
# Persist trend summary to PostgreSQL
# ---------------------------------------------------------------------------
# Upsert of one trend_windows row, keyed on (entity_type, entity_id, "window").
# On conflict every non-key column is overwritten with the new values.
# Takes 14 positional parameters in the column order listed below; the
# evidence, catalyst, risk, disagreement and market-context parameters are
# cast to jsonb. Returns the row's id.
_UPSERT_TREND = """
INSERT INTO trend_windows (
entity_type, entity_id, "window", trend_direction, trend_strength,
confidence, top_supporting_evidence, top_opposing_evidence,
dominant_catalysts, material_risks, contradiction_score,
disagreement_details, market_context, generated_at
) VALUES (
$1, $2, $3, $4, $5,
$6, $7::jsonb, $8::jsonb,
$9::jsonb, $10::jsonb, $11,
$12::jsonb, $13::jsonb, $14
)
ON CONFLICT (entity_type, entity_id, "window") DO UPDATE SET
trend_direction = EXCLUDED.trend_direction,
trend_strength = EXCLUDED.trend_strength,
confidence = EXCLUDED.confidence,
top_supporting_evidence = EXCLUDED.top_supporting_evidence,
top_opposing_evidence = EXCLUDED.top_opposing_evidence,
dominant_catalysts = EXCLUDED.dominant_catalysts,
material_risks = EXCLUDED.material_risks,
contradiction_score = EXCLUDED.contradiction_score,
disagreement_details = EXCLUDED.disagreement_details,
market_context = EXCLUDED.market_context,
generated_at = EXCLUDED.generated_at
RETURNING id
"""
# Plain insert (no conflict handling) of one trend_history snapshot.
# Takes 10 positional parameters in the column order listed below; the
# catalyst and risk parameters are cast to jsonb.
_INSERT_TREND_HISTORY = """
INSERT INTO trend_history (
entity_type, entity_id, "window", trend_direction,
trend_strength, confidence, contradiction_score,
dominant_catalysts, material_risks, generated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9::jsonb, $10)
"""
async def persist_trend_summary(
    pool: asyncpg.Pool,
    summary: TrendSummary,
) -> str:
    """Upsert ``summary`` into ``trend_windows`` and return the row id as text.

    After the upsert, a snapshot of the same summary is appended to
    ``trend_history`` so the series can be charted over time. That second
    write is best-effort: if it raises, the error is logged at debug level
    and the id obtained from the upsert is still returned.
    """
    # Evaluate the column values once, in the order the SQL parameters use.
    entity_type = summary.entity_type
    entity_id = summary.entity_id
    window_value = summary.window.value
    direction_value = summary.trend_direction.value
    strength = summary.trend_strength
    confidence = summary.confidence
    supporting_json = json.dumps(summary.top_supporting_evidence)
    opposing_json = json.dumps(summary.top_opposing_evidence)
    catalysts_json = json.dumps(summary.dominant_catalysts)
    risks_json = json.dumps(summary.material_risks)
    contradiction = summary.contradiction_score
    disagreements_json = json.dumps(
        [detail.model_dump() for detail in summary.disagreement_details]
    )

    # market_context may be a model (has model_dump), a plain mapping, or
    # empty/None; the last case is stored as an empty JSON object.
    market_ctx = summary.market_context
    if hasattr(market_ctx, "model_dump"):
        ctx_payload = market_ctx.model_dump()
    elif market_ctx:
        ctx_payload = market_ctx
    else:
        ctx_payload = {}
    market_ctx_json = json.dumps(ctx_payload, default=str)
    generated_at = summary.generated_at

    upserted = await pool.fetchrow(
        _UPSERT_TREND,
        entity_type,
        entity_id,
        window_value,
        direction_value,
        strength,
        confidence,
        supporting_json,
        opposing_json,
        catalysts_json,
        risks_json,
        contradiction,
        disagreements_json,
        market_ctx_json,
        generated_at,
    )
    trend_id = str(upserted["id"])

    # Time-series snapshot; must never fail the upsert above
    # (the history table may not exist yet).
    try:
        await pool.execute(
            _INSERT_TREND_HISTORY,
            entity_type,
            entity_id,
            window_value,
            direction_value,
            strength,
            confidence,
            contradiction,
            catalysts_json,
            risks_json,
            generated_at,
        )
    except Exception:
        logger.debug(
            "Could not insert trend history for %s/%s",
            entity_id,
            window_value,
            exc_info=True,
        )
    return trend_id
# ---------------------------------------------------------------------------
# Persist evidence mappings to trend_evidence table
# ---------------------------------------------------------------------------
# Inserts one evidence row linked to a trend window. $2 (document_id) is
# cast to uuid, so values that are not valid UUIDs make the statement fail.
_INSERT_EVIDENCE = """
INSERT INTO trend_evidence (
trend_window_id, document_id, evidence_type,
rank_score, weight_component, impact_component,
recency_component, confidence_component, sentiment_value
) VALUES (
$1, $2::uuid, $3,
$4, $5, $6,
$7, $8, $9
)
"""
def _is_valid_uuid(val: str) -> bool:
    """Return True when ``val`` is a UUID or a string that parses as one.

    Used to filter out synthetic identifiers (pattern signal IDs are not
    UUIDs) before they reach a ``::uuid`` cast in SQL.

    Args:
        val: Candidate identifier. Normally a string; ``uuid.UUID``
            instances are accepted as valid, and ``None`` or any other
            non-string value is reported as invalid instead of raising.

    Returns:
        True if ``val`` represents a UUID, otherwise False.
    """
    # An actual UUID object is trivially valid; uuid.UUID() would reject it
    # with AttributeError because it expects a string for its first argument.
    if isinstance(val, _uuid.UUID):
        return True
    try:
        _uuid.UUID(val)
    except (ValueError, AttributeError, TypeError):
        # ValueError: malformed string; AttributeError: non-string object;
        # TypeError: None (uuid.UUID requires exactly one argument to be set).
        return False
    return True
async def persist_trend_evidence(
    pool: asyncpg.Pool,
    trend_window_id: str,
    supporting: list[RankedEvidence],
    opposing: list[RankedEvidence],
) -> int:
    """Replace the evidence rows of a trend window and return the count inserted.

    Existing ``trend_evidence`` rows for ``trend_window_id`` are deleted and
    the fresh rows are inserted inside a single transaction, so a failing
    insert rolls the delete back instead of leaving the window without any
    evidence. Deleting first prevents duplicates from accumulating across
    aggregation cycles.

    Args:
        pool: Connection pool used to acquire one connection for the
            delete + insert transaction.
        trend_window_id: Id of the owning ``trend_windows`` row.
        supporting: Ranked evidence stored with type ``"supporting"``.
        opposing: Ranked evidence stored with type ``"opposing"``.

    Returns:
        Number of rows inserted. Evidence whose ``document_id`` is not a
        valid UUID (e.g. pattern signal synthetic IDs) is skipped and not
        counted.
    """
    rows: list[tuple[str, str, str, float, float, float, float, float, float]] = []
    for evidence_type, evidence in (("supporting", supporting), ("opposing", opposing)):
        for ev in evidence:
            # document_id is cast to uuid by the INSERT; skip synthetic IDs.
            if not _is_valid_uuid(ev.document_id):
                continue
            rows.append((
                trend_window_id, ev.document_id, evidence_type,
                ev.rank_score, ev.weight_component, ev.impact_component,
                ev.recency_component, ev.confidence_component, ev.sentiment_value,
            ))

    async with pool.acquire() as conn:
        async with conn.transaction():
            # Clear stale evidence before inserting fresh rows.
            await conn.execute(
                "DELETE FROM trend_evidence WHERE trend_window_id = $1",
                trend_window_id,
            )
            if rows:
                await conn.executemany(_INSERT_EVIDENCE, rows)
    return len(rows)
# ---------------------------------------------------------------------------
# Build MacroEventInfo objects for projection computation
# ---------------------------------------------------------------------------
# Macro impact records for one ticker ($1) whose computed_at lies in the
# inclusive range [$2, $3], joined to their global event and to the event's
# source document (for published_at). Newest computed_at first.
# Both joins are inner joins, so records without a matching event or source
# document are not returned.
_MACRO_EVENT_INFO_QUERY = """
SELECT
mir.event_id,
mir.macro_impact_score,
mir.impact_direction,
mir.confidence,
ge.estimated_duration,
ge.severity,
d.published_at AS event_published_at
FROM macro_impact_records mir
JOIN global_events ge ON ge.id = mir.event_id
JOIN documents d ON d.id = ge.source_document_id
WHERE mir.ticker = $1
AND mir.computed_at >= $2
AND mir.computed_at <= $3
ORDER BY mir.computed_at DESC
"""
async def _build_macro_event_infos(
    pool: asyncpg.Pool,
    ticker: str,
    window_start: datetime,
    reference_time: datetime,
) -> list[MacroEventInfo]:
    """Fetch macro impact records and build MacroEventInfo objects for projection.

    Args:
        pool: Connection pool used to run ``_MACRO_EVENT_INFO_QUERY``.
        ticker: Ticker whose macro impact records are fetched.
        window_start: Inclusive lower bound on ``computed_at``.
        reference_time: Inclusive upper bound on ``computed_at``; also the
            instant event age is measured against.

    Returns:
        One ``MacroEventInfo`` per row, in query order (newest
        ``computed_at`` first). NULL columns fall back to: score 0.0,
        direction ``"neutral"``, confidence 0.5, duration ``"short_term"``,
        severity ``"low"``. Event age is 0.0 hours when the publication
        time is missing or lies after ``reference_time``.
    """
    rows = await pool.fetch(
        _MACRO_EVENT_INFO_QUERY, ticker, window_start, reference_time,
    )
    infos: list[MacroEventInfo] = []
    for row in rows:
        published_at = row["event_published_at"]
        age_hours = 0.0
        if published_at:
            # Clamp at zero so a publication time after reference_time
            # never yields a negative age.
            age_hours = max(
                (reference_time - published_at).total_seconds() / 3600.0, 0.0,
            )

        # Apply the 0.5 default only when confidence is NULL: a stored
        # confidence of exactly 0.0 is a real value and must be preserved
        # (``value or 0.5`` would silently turn it into 0.5).
        raw_confidence = row["confidence"]
        confidence = 0.5 if raw_confidence is None else float(raw_confidence)

        infos.append(
            MacroEventInfo(
                event_id=str(row["event_id"]),
                macro_impact_score=float(row["macro_impact_score"] or 0.0),
                impact_direction=row["impact_direction"] or "neutral",
                confidence=confidence,
                estimated_duration=row["estimated_duration"] or "short_term",
                severity=row["severity"] or "low",
                event_age_hours=age_hours,
            )
        )
    return infos
# ---------------------------------------------------------------------------
# Regime detection helper (Req 7.1, 7.2, 7.3, 7.8, 7.9)
# ---------------------------------------------------------------------------
# Closing prices for one ticker ($1): the 120 most recent daily bars,
# newest first (callers reverse the rows to get chronological order).
_CLOSING_PRICES_QUERY = """
SELECT close
FROM market_data_daily
WHERE ticker = $1
ORDER BY bar_date DESC
LIMIT 120
"""
# Day-over-day simple return for one ticker ($1):
# (close - previous close) / previous close, where "previous" is the prior
# bar by bar_date. The value is NULL for the earliest bar (no previous row)
# and when the previous close is 0 (NULLIF guards the division).
# Returns the 120 most recent rows, newest first.
_DAILY_RETURNS_QUERY = """
SELECT (close - LAG(close) OVER (ORDER BY bar_date)) / NULLIF(LAG(close) OVER (ORDER BY bar_date), 0) AS daily_return
FROM market_data_daily
WHERE ticker = $1
ORDER BY bar_date DESC
LIMIT 120
"""
# Traded volume for one ticker ($1): the 30 most recent daily bars,
# newest first.
_DAILY_VOLUMES_QUERY = """
SELECT volume
FROM market_data_daily
WHERE ticker = $1
ORDER BY bar_date DESC
LIMIT 30
"""
# Default uncertainty regime used when market data is unavailable.
# _classify_ticker_regime returns this same module-level instance when there
# are no price rows, when prices or returns are empty after NULL filtering,
# and when classification raises.
_DEFAULT_UNCERTAINTY_REGIME = RegimeClassification(
    regime=MarketRegime.UNCERTAINTY,
    trend_indicator=0.0,
    volatility_ratio=1.0,
    bullish_threshold=0.15,
    bearish_threshold=-0.15,
    contradiction_penalty_multiplier=0.6,
)
async def _classify_ticker_regime(
    pool: asyncpg.Pool,
    ticker: str,
) -> RegimeClassification:
    """Derive the market regime of ``ticker`` from its stored daily bars.

    Closing prices and daily returns are loaded, put into chronological
    order with NULL values dropped, and handed to ``classify_regime``.
    ``_DEFAULT_UNCERTAINTY_REGIME`` is returned instead when there are no
    price rows, when either series is empty after filtering, or when any
    step raises (the exception is logged as a warning, not propagated).

    Requirements: 7.1, 7.2, 7.3, 7.8, 7.9
    """
    try:
        bars = await pool.fetch(_CLOSING_PRICES_QUERY, ticker)
        if not bars:
            logger.info(
                "No market data for %s — defaulting to uncertainty regime",
                ticker,
            )
            return _DEFAULT_UNCERTAINTY_REGIME

        # Rows arrive newest-first; walk them backwards for oldest-first.
        closes: list[float] = []
        for bar in reversed(bars):
            if bar["close"] is not None:
                closes.append(float(bar["close"]))

        return_bars = await pool.fetch(_DAILY_RETURNS_QUERY, ticker)
        daily_returns: list[float] = []
        for bar in reversed(return_bars):
            if bar["daily_return"] is not None:
                daily_returns.append(float(bar["daily_return"]))

        if closes and daily_returns:
            return classify_regime(closes, daily_returns)

        logger.info(
            "Insufficient market data for %s — defaulting to uncertainty regime",
            ticker,
        )
        return _DEFAULT_UNCERTAINTY_REGIME
    except Exception:
        logger.warning(
            "Failed to classify regime for %s — defaulting to uncertainty regime",
            ticker,
            exc_info=True,
        )
        return _DEFAULT_UNCERTAINTY_REGIME
async def _fetch_ticker_market_data(
    pool: asyncpg.Pool,
    ticker: str,
) -> tuple[list[float] | None, list[float] | None]:
    """Load recent daily returns and volumes for regime multiplier scoring.

    The result is a ``(returns, volumes)`` pair. Each element is a
    chronological list with NULL values dropped, or ``None`` when the query
    returned no rows or nothing remained after filtering. If anything
    raises, a warning is logged and ``(None, None)`` is returned. The
    probabilistic scoring pipeline uses these series to compute the regime
    multiplier M_regime in ``compute_signal_weight``.
    """
    try:
        daily_returns: list[float] | None = None
        return_rows = await pool.fetch(_DAILY_RETURNS_QUERY, ticker)
        if return_rows:
            # Newest-first from the query; reverse into chronological order.
            daily_returns = [
                float(row["daily_return"])
                for row in reversed(return_rows)
                if row["daily_return"] is not None
            ]

        daily_volumes: list[float] | None = None
        volume_rows = await pool.fetch(_DAILY_VOLUMES_QUERY, ticker)
        if volume_rows:
            daily_volumes = [
                float(row["volume"])
                for row in reversed(volume_rows)
                if row["volume"] is not None
            ]

        # Collapse empty lists to None so callers only need one check.
        if not daily_returns:
            daily_returns = None
        if not daily_volumes:
            daily_volumes = None
        return daily_returns, daily_volumes
    except Exception:
        logger.warning(
            "Failed to fetch market data for %s scoring — "
            "regime multiplier will default to 1.0",
            ticker,
            exc_info=True,
        )
        return None, None
# ---------------------------------------------------------------------------
# Main aggregation entry point for a single ticker + window
# ---------------------------------------------------------------------------
async def aggregate_company_window(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
    reference_time: datetime | None = None,
    config: AggregationConfig | None = None,
    *,
    probabilistic: bool = False,
    regime: RegimeClassification | None = None,
    source_accuracy_map: dict[str, float] | None = None,
    ticker_returns: list[float] | None = None,
    ticker_volumes: list[float] | None = None,
) -> TrendSummary:
    """Compute and persist a trend summary for one ticker and one window.

    Steps:
      1. Determine the time range for the window.
      2. Fetch document impact records from PostgreSQL.
      3. Fetch market context for the ticker.
      4. Build weighted signals using the scoring module.
      5. Check macro toggle and fetch/merge macro signals if enabled.
      6. Check competitive toggle and fetch/merge pattern/competitive
         signals if enabled.
      7. Assemble the TrendSummary (probabilistic or heuristic).
      8. Persist the trend window and its evidence mappings.
      9. Compute and persist the trend projection (best-effort).

    When ``probabilistic`` is True, the scoring config is set to
    probabilistic mode, source accuracy factors are passed to signal
    scoring, and macro integration uses the conditional modifier.

    Args:
        pool: Connection pool used for every query and write.
        ticker: Ticker to aggregate.
        window: Window name; names missing from ``WINDOW_DURATIONS`` fall
            back to a 7-day duration.
        reference_time: End of the window; defaults to the current UTC time.
        config: Aggregation settings; defaults to ``AggregationConfig()``.
        probabilistic: Selects the probabilistic pipeline.
        regime: Regime classification forwarded to trend assembly.
        source_accuracy_map: Accuracy factors forwarded to signal scoring
            (ignored unless ``probabilistic``).
        ticker_returns: Daily returns forwarded to scoring (ignored unless
            ``probabilistic``).
        ticker_volumes: Daily volumes forwarded to scoring (ignored unless
            ``probabilistic``).

    Returns:
        The assembled TrendSummary.

    Failures while fetching pattern/competitive signals or while
    computing/persisting the projection are logged and swallowed; other
    errors propagate.
    """
    cfg = config or AggregationConfig()
    scoring_cfg = cfg.effective_scoring()

    # When probabilistic mode is active, create a scoring config with
    # probabilistic=True so all downstream scoring uses the new formulas.
    if probabilistic and not scoring_cfg.probabilistic:
        scoring_cfg = ScoringConfig(
            half_life_hours=scoring_cfg.half_life_hours,
            min_recency_weight=scoring_cfg.min_recency_weight,
            credibility_floor=scoring_cfg.credibility_floor,
            credibility_ceiling=scoring_cfg.credibility_ceiling,
            credibility_exponent=scoring_cfg.credibility_exponent,
            novelty_bonus_max=scoring_cfg.novelty_bonus_max,
            confidence_floor=scoring_cfg.confidence_floor,
            volatility_recency_boost_threshold=scoring_cfg.volatility_recency_boost_threshold,
            volatility_recency_boost_max=scoring_cfg.volatility_recency_boost_max,
            volume_surge_threshold_pct=scoring_cfg.volume_surge_threshold_pct,
            volume_surge_boost=scoring_cfg.volume_surge_boost,
            probabilistic=True,
            sigmoid_steepness=scoring_cfg.sigmoid_steepness,
            sigmoid_midpoint=scoring_cfg.sigmoid_midpoint,
            info_gain_lambda=scoring_cfg.info_gain_lambda,
            info_gain_max=scoring_cfg.info_gain_max,
            default_base_rate=scoring_cfg.default_base_rate,
            adaptive_decay_impact_scale=scoring_cfg.adaptive_decay_impact_scale,
            adaptive_decay_surprise_scale=scoring_cfg.adaptive_decay_surprise_scale,
            adaptive_decay_market_scale=scoring_cfg.adaptive_decay_market_scale,
            regime_return_weight=scoring_cfg.regime_return_weight,
            regime_volume_weight=scoring_cfg.regime_volume_weight,
            regime_multiplier_max=scoring_cfg.regime_multiplier_max,
        )

    if reference_time is None:
        reference_time = datetime.now(timezone.utc)

    _agg_start = time.monotonic()
    duration = WINDOW_DURATIONS.get(window, timedelta(days=7))
    window_start = reference_time - duration

    # Market series are only consumed by the probabilistic formulas.
    scoring_returns = ticker_returns if probabilistic else None
    scoring_volumes = ticker_volumes if probabilistic else None

    # 1. Fetch impact records
    impacts = await fetch_impact_records(pool, ticker, window_start, reference_time)

    # 2. Fetch market context
    market_ctx = await fetch_market_context(pool, ticker, window, reference_time)

    # 3. Build weighted signals — pass source accuracy and market data
    #    when in probabilistic mode (Req 4.1-4.3, 6.1-6.5)
    signals = build_weighted_signals(
        impacts, reference_time, window, market_ctx, scoring_cfg,
        source_accuracy_map=source_accuracy_map if probabilistic else None,
        returns=scoring_returns,
        volumes=scoring_volumes,
    )

    # 4. Check macro toggle and merge macro signals
    #    (Requirement 11.2, 11.3, 11.4): Toggle state is read from the DB on
    #    every aggregation cycle. When disabled, macro signals are skipped but
    #    ingestion/classification continue independently — so when re-enabled,
    #    the most recent classifications (including those ingested while
    #    disabled) are immediately available for impact computation.
    macro_enabled = cfg.macro_enabled
    db_toggle = await fetch_macro_enabled(pool)
    if db_toggle is not None:
        # The DB value, when present, overrides the static config.
        macro_enabled = db_toggle

    macro_modifier = 1.0
    if macro_enabled:
        macro_impacts = await fetch_macro_impact_records(
            pool, ticker, window_start, reference_time,
        )
        if macro_impacts:
            macro_signals = build_macro_weighted_signals(
                macro_impacts,
                reference_time,
                window,
                macro_signal_weight=cfg.macro_signal_weight,
                config=scoring_cfg,
                returns=scoring_returns,
                volumes=scoring_volumes,
            )
            if probabilistic:
                # Probabilistic mode: use conditional macro modifier
                # (Req 11.1-11.5). The company-only direction is derived
                # before any macro signal is merged in.
                company_direction = derive_trend_direction(
                    weighted_sentiment_average(signals),
                ).value
                signals, macro_modifier = integrate_macro_signals(
                    company_signals=signals,
                    macro_signals=macro_signals,
                    company_direction=company_direction,
                    macro_impacts=macro_impacts,
                    ticker=ticker,
                    probabilistic=True,
                    macro_signal_weight=cfg.macro_signal_weight,
                )
            else:
                # Heuristic mode: simple additive merge (current behavior)
                signals = signals + macro_signals
            logger.info(
                "Merged %d macro signals for %s/%s (modifier=%.4f)",
                len(macro_signals), ticker, window, macro_modifier,
            )

    # 5. Check competitive toggle and merge pattern/competitive signals
    #    (Requirements 5.1-5.6): Same toggle pattern as macro layer. When
    #    disabled, pattern mining remains queryable but aggregation skips
    #    competitive signals — no degradation of existing behavior.
    competitive_enabled = cfg.competitive_enabled
    db_competitive_toggle = await fetch_competitive_enabled(pool)
    if db_competitive_toggle is not None:
        competitive_enabled = db_competitive_toggle

    if competitive_enabled:
        try:
            # Get unique catalyst types from the impact records
            catalyst_types = {imp.catalyst_type for imp in impacts}

            # Query self-company historical patterns for each catalyst type
            all_patterns = []
            for cat_type in catalyst_types:
                patterns = await find_self_patterns(pool, ticker, cat_type)
                all_patterns.extend(patterns)

            # Fetch competitive signals targeting this ticker
            comp_signals = await fetch_competitive_signals(
                pool, ticker, window_start, reference_time,
            )

            # Convert to WeightedSignal objects
            if all_patterns or comp_signals:
                pattern_weighted = build_pattern_weighted_signals(
                    patterns=all_patterns,
                    competitive_signals=comp_signals,
                    reference_time=reference_time,
                    window=window,
                )
                signals = signals + pattern_weighted
                logger.info(
                    "Merged %d pattern/competitive signals for %s/%s "
                    "(patterns=%d, competitive=%d)",
                    len(pattern_weighted), ticker, window,
                    len(all_patterns), len(comp_signals),
                )
        except Exception:
            # Best-effort layer: keep the signals gathered so far.
            # The separator between the two literals is required; without it
            # the message reads "...for TICKER/WINDOWcontinuing with...".
            logger.exception(
                "Failed to fetch pattern/competitive signals for %s/%s — "
                "continuing with company+macro signals only",
                ticker, window,
            )

    # 6. Assemble trend summary with evidence details
    assembled = assemble_trend_with_evidence(
        ticker=ticker,
        window=window,
        signals=signals,
        impacts=impacts,
        market_ctx=market_ctx if market_ctx.has_data else None,
        max_evidence=cfg.max_evidence,
        reference_time=reference_time,
        probabilistic=probabilistic,
        regime=regime,
    )
    summary = assembled.summary

    # 6b. Enrich probabilistic JSONB with macro modifier (Req 16.2).
    #     Done before persistence so the stored market_context carries it.
    if probabilistic and macro_modifier != 1.0:
        ctx = summary.market_context
        if isinstance(ctx, dict) and "probabilistic" in ctx:
            ctx["probabilistic"]["macro_modifier"] = round(macro_modifier, 4)

    # 7. Persist trend window
    trend_id = await persist_trend_summary(pool, summary)

    # 8. Persist evidence mappings
    evidence_count = await persist_trend_evidence(
        pool, trend_id,
        assembled.supporting_evidence,
        assembled.opposing_evidence,
    )
    logger.info(
        "Persisted trend %s for %s/%s: direction=%s strength=%.3f confidence=%.3f signals=%d evidence=%d",
        trend_id, ticker, window, summary.trend_direction.value,
        summary.trend_strength, summary.confidence, len(signals), evidence_count,
    )

    # 9. Compute and persist trend projection (failure does not abort the
    #    aggregation; the already-persisted trend is still returned).
    try:
        macro_event_infos: list[MacroEventInfo] = []
        if macro_enabled:
            macro_event_infos = await _build_macro_event_infos(
                pool, ticker, window_start, reference_time,
            )
        projection = compute_projection(
            summary=summary,
            macro_events=macro_event_infos if macro_event_infos else None,
            macro_enabled=macro_enabled,
            upcoming_catalysts=summary.dominant_catalysts[:3] if summary.dominant_catalysts else None,
        )
        await persist_trend_projection(pool, trend_id, projection)
        logger.info(
            "Persisted projection for %s/%s: direction=%s strength=%.3f confidence=%.3f diverges=%s",
            ticker, window, projection.projected_direction,
            projection.projected_strength, projection.projected_confidence,
            projection.diverges_from_current,
        )
    except Exception:
        logger.exception(
            "Failed to compute/persist projection for trend %s (%s/%s) — continuing",
            trend_id, ticker, window,
        )

    # Prometheus metrics
    AGGREGATION_WINDOWS_COMPUTED.labels(window=window).inc()
    AGGREGATION_SIGNALS_PROCESSED.labels(window=window).inc(len(signals))
    AGGREGATION_CONTRADICTION_SCORE.observe(summary.contradiction_score)
    AGGREGATION_DURATION.labels(window=window).observe(time.monotonic() - _agg_start)

    return summary
# ---------------------------------------------------------------------------
# Aggregate all windows for a single ticker
# ---------------------------------------------------------------------------
async def aggregate_company(
    pool: asyncpg.Pool,
    ticker: str,
    reference_time: datetime | None = None,
    config: AggregationConfig | None = None,
) -> list[TrendSummary]:
    """Compute trend summaries for all configured windows for a ticker.

    Per cycle this reads the probabilistic-scoring flag, evaluates the
    model quality gate, and (in probabilistic mode) classifies the market
    regime and loads market data and source accuracy once. It then runs
    ``aggregate_company_window`` for every window from
    ``cfg.effective_windows()``.

    Args:
        pool: Connection pool used for every query and write.
        ticker: Ticker to aggregate.
        reference_time: End of every window; defaults to the current UTC
            time and is shared by all windows of the cycle.
        config: Aggregation settings; defaults to ``AggregationConfig()``.

    Returns:
        One TrendSummary per configured window, in configuration order.
        When the quality gate did not pass (or could not be evaluated),
        each returned summary's ``market_context`` carries
        ``quality_gate_passed = False``.
    """
    cfg = config or AggregationConfig()
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)

    # Read probabilistic scoring flag once per cycle (Requirement 16.7).
    # Mid-cycle changes take effect on the next cycle.
    probabilistic = await fetch_probabilistic_scoring_enabled(pool)
    pipeline_mode = "probabilistic" if probabilistic else "heuristic"

    # --- Quality gate evaluation (Req 11.2, 11.3) ---
    # Evaluate model quality gate at the start of each aggregation cycle.
    # When the gate fails, all recommendations are forced to paper mode.
    # Gate evaluation failure defaults to paper-only (fail-safe).
    quality_gate_passed = False
    try:
        gate_result: QualityGateResult = await evaluate_quality_gate(pool)
        quality_gate_passed = gate_result.passed
        # Verdict and reason are separated explicitly; "%s%s" would print
        # them glued together (e.g. "PASSall metrics ok").
        logger.info(
            "Quality gate for %s cycle: %s — %s",
            ticker,
            "PASS" if gate_result.passed else "FAIL",
            gate_result.reason,
        )
    except Exception:
        logger.exception(
            "Quality gate evaluation failed for %s cycle — "
            "defaulting to paper-only mode (fail-safe)",
            ticker,
        )
        quality_gate_passed = False

    logger.info(
        "Aggregation cycle for %s: pipeline_mode=%s quality_gate=%s",
        ticker,
        pipeline_mode,
        "passed" if quality_gate_passed else "failed",
    )

    # --- Regime detection (Req 7.1, 7.2, 7.3, 7.8, 7.9) ---
    # Classify market regime for this ticker using closing prices and returns.
    # Default to uncertainty regime when market data is unavailable.
    regime: RegimeClassification | None = None
    ticker_returns: list[float] | None = None
    ticker_volumes: list[float] | None = None
    source_accuracy_map: dict[str, float] | None = None

    if probabilistic:
        regime = await _classify_ticker_regime(pool, ticker)
        logger.info(
            "Regime for %s: %s (trend_indicator=%.1f, vol_ratio=%.2f, "
            "bullish_threshold=%.2f, contradiction_mult=%.1f)",
            ticker,
            regime.regime.value,
            regime.trend_indicator,
            regime.volatility_ratio,
            regime.bullish_threshold,
            regime.contradiction_penalty_multiplier,
        )

        # Fetch market data (returns/volumes) for regime multiplier in scoring
        # (Req 6.1-6.5). Fetched once per cycle and reused across all windows.
        ticker_returns, ticker_volumes = await _fetch_ticker_market_data(pool, ticker)

        # Batch-fetch source accuracy for all sources in the signal set
        # (Req 4.1-4.3). Fetched once per cycle; individual signals look up
        # their factor from this map. DB errors leave the map as None.
        try:
            # Fetch all source IDs from the longest window to cover all signals.
            # An empty window list makes max() raise, which is handled below.
            longest_window = max(
                cfg.effective_windows(),
                key=lambda w: WINDOW_DURATIONS.get(w, timedelta(days=7)),
            )
            # Note: the fallback here is 90 days, while the ranking key above
            # falls back to 7 days for unknown window names.
            longest_duration = WINDOW_DURATIONS.get(longest_window, timedelta(days=90))
            window_start = reference_time - longest_duration
            all_impacts = await fetch_impact_records(pool, ticker, window_start, reference_time)
            # NOTE(review): the lookup keys are impact document_ids although
            # the variable is named source_ids — confirm this matches what
            # fetch_source_accuracy expects.
            source_ids = list({imp.document_id for imp in all_impacts})
            if source_ids:
                sa_records = await fetch_source_accuracy(pool, source_ids)
                source_accuracy_map = {
                    sid: sa.accuracy_factor for sid, sa in sa_records.items()
                }
                logger.info(
                    "Fetched source accuracy for %s: %d/%d sources have records",
                    ticker, len(sa_records), len(source_ids),
                )
        except Exception:
            logger.warning(
                "Failed to fetch source accuracy for %s — defaulting to neutral factor",
                ticker,
                exc_info=True,
            )
            source_accuracy_map = None

    summaries: list[TrendSummary] = []
    for window in cfg.effective_windows():
        summary = await aggregate_company_window(
            pool, ticker, window, reference_time, cfg,
            probabilistic=probabilistic,
            regime=regime,
            source_accuracy_map=source_accuracy_map,
            ticker_returns=ticker_returns,
            ticker_volumes=ticker_volumes,
        )

        # When quality gate fails, annotate the trend summary so the
        # recommendation engine forces paper mode (Req 11.2, 11.3).
        # NOTE(review): aggregate_company_window has already persisted the
        # summary at this point, so this flag exists only on the returned
        # object, not in the stored market_context — confirm that consumers
        # read the returned summaries rather than the database row.
        if not quality_gate_passed:
            ctx = summary.market_context
            if isinstance(ctx, dict):
                ctx["quality_gate_passed"] = False
            elif ctx is not None and hasattr(ctx, "model_dump"):
                # Model-typed context is replaced by its dict form so the
                # extra key can be attached.
                ctx_dict = ctx.model_dump()
                ctx_dict["quality_gate_passed"] = False
                summary.market_context = ctx_dict
            else:
                summary.market_context = {"quality_gate_passed": False}

        summaries.append(summary)

    return summaries