"""Recency decay, source credibility weighting, and market context integration for aggregation. Provides scoring functions used by the aggregation engine to weight document intelligence signals when computing trend summaries. Requirements: 2.1–2.6, 3.1–3.5, 4.2–4.3, 5.1–5.7, 6.1–6.5, 16.4–16.5 """ from __future__ import annotations import math from dataclasses import dataclass, field from datetime import datetime, timezone from services.shared.schemas import MarketContext # --------------------------------------------------------------------------- # Event type base rates for information gain computation (Req 3.1) # --------------------------------------------------------------------------- EVENT_TYPE_BASE_RATES: dict[str, float] = { "earnings": 0.25, "product_launch": 0.10, "regulatory": 0.08, "legal": 0.05, "m_and_a": 0.03, "management_change": 0.06, "partnership": 0.12, "market_expansion": 0.09, "restructuring": 0.04, "dividend": 0.15, } DEFAULT_BASE_RATE = 0.1 @dataclass(frozen=True) class ScoringConfig: """Tunable parameters for signal scoring.""" # Recency decay: exponential half-life in hours per window. # After one half-life, a document's recency weight drops to 0.5. half_life_hours: dict[str, float] = field(default_factory=lambda: { "intraday": 2.0, "1d": 12.0, "7d": 72.0, "30d": 240.0, "90d": 720.0, }) # Minimum recency weight — prevents very old docs from being zeroed out # entirely so they can still contribute trace-level signal. min_recency_weight: float = 0.01 # Source credibility bounds — credibility scores outside this range # are clamped before weighting. credibility_floor: float = 0.1 credibility_ceiling: float = 1.0 # Exponent applied to credibility score. >1 penalises low-credibility # sources more aggressively; <1 flattens the curve. credibility_exponent: float = 1.0 # Novelty bonus: multiplier range applied on top of base weight. # A novelty_score of 1.0 gets the full bonus; 0.0 gets none. novelty_bonus_max: float = 0.25 # Confidence floor — documents below this extraction confidence # receive zero weight (they are too unreliable to aggregate). confidence_floor: float = 0.2 # Market context modulation --- # When volatility exceeds this threshold (in price units), recency # signals are amplified because fast-moving markets make fresh data # more important. volatility_recency_boost_threshold: float = 1.0 volatility_recency_boost_max: float = 0.30 # max extra multiplier # When volume surges above this % change, signals get a small boost # because high-volume moves carry more conviction. volume_surge_threshold_pct: float = 50.0 volume_surge_boost: float = 0.15 # --- Probabilistic scoring parameters --- # Toggle: when True, use probabilistic formulas (sigmoid gate, # adaptive decay, info gain, regime multiplier, source accuracy). # When False, preserve exact current heuristic behaviour. probabilistic: bool = False # Sigmoid gate parameters — smooth replacement for binary confidence gate. # Gate value: σ(k·(x - midpoint)) where k = steepness. sigmoid_steepness: float = 5.0 sigmoid_midpoint: float = 0.5 # Information gain parameters — surprise weighting for rare events. # r = 1 + λ·(-log₂ P(event_type)), clamped to info_gain_max. info_gain_lambda: float = 0.3 info_gain_max: float = 3.0 default_base_rate: float = 0.1 # Adaptive decay parameters — β scaling factors for event-specific # half-life adjustment: τ_i = τ_base · (1+β_impact)·(1+β_surprise)·(1+β_market). adaptive_decay_impact_scale: float = 1.0 adaptive_decay_surprise_scale: float = 1.0 adaptive_decay_market_scale: float = 0.5 # Regime multiplier parameters — replaces market context multiplier. # M_regime = 1 + regime_return_weight·|z_r| + regime_volume_weight·|z_v|, # clamped to [1.0, regime_multiplier_max]. regime_return_weight: float = 0.15 regime_volume_weight: float = 0.10 regime_multiplier_max: float = 2.5 # Singleton default config DEFAULT_CONFIG = ScoringConfig() # --------------------------------------------------------------------------- # Recency decay # --------------------------------------------------------------------------- def recency_weight( published_at: datetime, reference_time: datetime, window: str, config: ScoringConfig = DEFAULT_CONFIG, *, half_life_override: float | None = None, ) -> float: """Compute an exponential recency decay weight for a document. Uses the formula: w = 2^(-age_hours / half_life) Args: published_at: When the document was published (tz-aware). reference_time: The "now" anchor for the aggregation window (tz-aware). window: One of the TrendWindow values (e.g. "7d"). config: Scoring parameters. half_life_override: If provided, use this half-life instead of the window-based default (used for adaptive decay). Returns: A weight in [config.min_recency_weight, 1.0]. """ # Ensure both are tz-aware; treat naive as UTC. if published_at.tzinfo is None: published_at = published_at.replace(tzinfo=timezone.utc) if reference_time.tzinfo is None: reference_time = reference_time.replace(tzinfo=timezone.utc) age_seconds = (reference_time - published_at).total_seconds() if age_seconds <= 0: return 1.0 age_hours = age_seconds / 3600.0 half_life = half_life_override if half_life_override is not None else config.half_life_hours.get(window, 72.0) weight = math.pow(2.0, -age_hours / half_life) return max(weight, config.min_recency_weight) # --------------------------------------------------------------------------- # Source credibility weighting # --------------------------------------------------------------------------- def credibility_weight( source_credibility: float, config: ScoringConfig = DEFAULT_CONFIG, ) -> float: """Compute a weight from a source's credibility score. The raw credibility (0-1) is clamped to [floor, ceiling] then raised to ``credibility_exponent``. Args: source_credibility: The credibility score from the source or document intelligence record (0-1). config: Scoring parameters. Returns: A weight in [floor^exp, ceiling^exp]. """ clamped = max(config.credibility_floor, min(source_credibility, config.credibility_ceiling)) return math.pow(clamped, config.credibility_exponent) # --------------------------------------------------------------------------- # Market context adjustment # --------------------------------------------------------------------------- def market_context_multiplier( market_ctx: MarketContext | None, config: ScoringConfig = DEFAULT_CONFIG, ) -> float: """Compute a multiplicative adjustment from market context features. Returns a value >= 1.0 that amplifies signal weights when market conditions suggest heightened importance (high volatility or volume surges). Returns 1.0 when no market context is available. """ if market_ctx is None or not market_ctx.has_data: return 1.0 boost = 0.0 # Volatility boost — more volatile markets make recent signals more valuable if market_ctx.volatility is not None and market_ctx.volatility > config.volatility_recency_boost_threshold: excess = market_ctx.volatility - config.volatility_recency_boost_threshold # Logarithmic scaling so extreme volatility doesn't blow up the weight boost += min( math.log1p(excess) * 0.15, config.volatility_recency_boost_max, ) # Volume surge boost if market_ctx.volume_change_pct is not None and market_ctx.volume_change_pct > config.volume_surge_threshold_pct: boost += config.volume_surge_boost return 1.0 + boost # --------------------------------------------------------------------------- # Sigmoid confidence gate (Req 2.1–2.6) # --------------------------------------------------------------------------- def sigmoid_gate( x: float, steepness: float = 5.0, midpoint: float = 0.5, ) -> float: """Smooth sigmoid confidence gate: σ(k·(x - midpoint)). Replaces the binary 0/1 confidence gate in probabilistic mode. Returns a value in (0, 1) — higher confidence produces higher gate. Args: x: Extraction confidence value, typically in [0, 1]. steepness: Steepness parameter k (default 5.0). midpoint: Midpoint of the sigmoid transition (default 0.5). Returns: Gate value in (0, 1). """ z = steepness * (x - midpoint) # Guard against overflow in exp for very negative z if z < -500.0: return 0.0 if z > 500.0: return 1.0 return 1.0 / (1.0 + math.exp(-z)) # --------------------------------------------------------------------------- # Information gain surprise weighting (Req 3.1–3.5) # --------------------------------------------------------------------------- def compute_info_gain( event_type: str | None, lambda_param: float = 0.3, max_gain: float = 3.0, default_base_rate: float = 0.1, ) -> float: """Compute information gain factor for an event type. Formula: r = 1 + λ·(-log₂ P(event_type)), clamped to [1.0, max_gain]. Rarer events produce higher surprise weight. Unknown event types use the default base rate. Args: event_type: Event type string (e.g. "earnings", "m_and_a"). lambda_param: Scaling parameter λ (default 0.3). max_gain: Maximum clamp for the info gain factor (default 3.0). default_base_rate: Fallback base rate for unknown event types. Returns: Information gain factor r in [1.0, max_gain]. """ if event_type is None: return 1.0 base_rate = EVENT_TYPE_BASE_RATES.get(event_type, default_base_rate) # Guard against log₂(0) — base rates must be > 0 if base_rate <= 0.0: base_rate = default_base_rate if base_rate <= 0.0: return 1.0 surprise = -math.log2(base_rate) r = 1.0 + lambda_param * surprise return min(max(r, 1.0), max_gain) # --------------------------------------------------------------------------- # Adaptive recency decay (Req 5.1–5.7) # --------------------------------------------------------------------------- def compute_adaptive_half_life( base_half_life: float, impact_score: float, info_gain_factor: float, market_multiplier: float, config: ScoringConfig, ) -> float: """Compute adaptive half-life for event-specific recency decay. Formula: τ_i = τ_base · (1 + β_impact) · (1 + β_surprise) · (1 + β_market) The adaptive half-life is always >= base_half_life (decay is never faster). Args: base_half_life: Fixed half-life for the window (hours). impact_score: Signal impact score in [0, 1]. info_gain_factor: Information gain factor r in [1.0, 3.0]. market_multiplier: Market context/regime multiplier in [1.0, ~2.5]. config: Scoring config with adaptive decay scale parameters. Returns: Adaptive half-life in hours, >= base_half_life. """ # β_impact: impact_score scaled linearly 0→0, 1→adaptive_decay_impact_scale beta_impact = impact_score * config.adaptive_decay_impact_scale # β_surprise: info_gain_factor scaled linearly r=1→0, r=3→adaptive_decay_surprise_scale beta_surprise = ((info_gain_factor - 1.0) / 2.0) * config.adaptive_decay_surprise_scale # β_market: market_multiplier scaled linearly 1.0→0, 1.45→adaptive_decay_market_scale if market_multiplier > 1.0: beta_market = ((market_multiplier - 1.0) / 0.45) * config.adaptive_decay_market_scale else: beta_market = 0.0 tau = base_half_life * (1.0 + beta_impact) * (1.0 + beta_surprise) * (1.0 + beta_market) # Ensure adaptive half-life is never less than base (Property 5) return max(tau, base_half_life) # --------------------------------------------------------------------------- # Regime multiplier (Req 6.1–6.5) # --------------------------------------------------------------------------- def compute_regime_multiplier( returns: list[float] | None, volumes: list[float] | None, config: ScoringConfig = DEFAULT_CONFIG, ) -> float: """Compute regime-aware multiplier from return and volume z-scores. Formula: M_regime = 1 + 0.15·|z_r| + 0.10·|z_v|, clamped to [1.0, max]. Args: returns: List of recent daily returns (at least 20 values for z-score). volumes: List of recent daily volumes (at least 20 values for z-score). config: Scoring config with regime multiplier parameters. Returns: Regime multiplier in [1.0, config.regime_multiplier_max]. """ if not returns or len(returns) < 2: return 1.0 # Filter out NaN values from returns clean_returns = [r for r in returns if not math.isnan(r)] if len(clean_returns) < 2: return 1.0 # Return z-score: z_r = (r_t - μ_20) / σ_20 r_window = clean_returns[-20:] if len(clean_returns) >= 20 else clean_returns r_t = clean_returns[-1] mu_r = sum(r_window) / len(r_window) var_r = sum((x - mu_r) ** 2 for x in r_window) / len(r_window) sigma_r = math.sqrt(var_r) z_r = 0.0 if sigma_r > 0.0: z_r = (r_t - mu_r) / sigma_r # Volume z-score: z_v = (log(V_t) - μ_V) / σ_V z_v = 0.0 if volumes and len(volumes) >= 2: clean_volumes = [v for v in volumes if not math.isnan(v)] if len(clean_volumes) >= 2: v_window = clean_volumes[-20:] if len(clean_volumes) >= 20 else clean_volumes # Use log-volumes, guard against zero/negative volumes log_vols = [math.log(max(v, 1.0)) for v in v_window] log_v_t = math.log(max(clean_volumes[-1], 1.0)) mu_v = sum(log_vols) / len(log_vols) var_v = sum((x - mu_v) ** 2 for x in log_vols) / len(log_vols) sigma_v = math.sqrt(var_v) if sigma_v > 0.0: z_v = (log_v_t - mu_v) / sigma_v m_regime = 1.0 + config.regime_return_weight * abs(z_r) + config.regime_volume_weight * abs(z_v) # Guard against NaN propagation from upstream data if math.isnan(m_regime) or math.isinf(m_regime): return 1.0 return max(1.0, min(m_regime, config.regime_multiplier_max)) # --------------------------------------------------------------------------- # Combined document signal weight # --------------------------------------------------------------------------- @dataclass class SignalWeight: """Breakdown of a document's aggregation weight.""" recency: float credibility: float novelty_bonus: float confidence_gate: float # 0.0 or 1.0 market_ctx_multiplier: float # >= 1.0 combined: float # New optional fields for probabilistic mode sigmoid_gate: float | None = None # Smooth gate value [0, 1] info_gain_factor: float = 1.0 # Surprise multiplier source_accuracy_factor: float = 1.0 # Historical accuracy multiplier regime_multiplier: float | None = None # M_regime replacing M_context def compute_signal_weight( published_at: datetime, reference_time: datetime, window: str, source_credibility: float, novelty_score: float = 0.5, extraction_confidence: float = 0.5, market_ctx: MarketContext | None = None, config: ScoringConfig = DEFAULT_CONFIG, *, event_type: str | None = None, impact_score: float = 0.5, source_accuracy_factor: float = 1.0, returns: list[float] | None = None, volumes: list[float] | None = None, ) -> SignalWeight: """Compute the combined aggregation weight for a single document signal. When ``config.probabilistic`` is False (default), the formula is: combined = confidence_gate * recency * credibility * (1 + novelty_bonus) * market_ctx_multiplier When ``config.probabilistic`` is True, the formula is: combined = sigmoid_gate * recency(adaptive) * credibility * (1 + novelty_bonus) * info_gain * source_accuracy * regime_multiplier Args: published_at: Document publication time. reference_time: Aggregation anchor time. window: Trend window identifier. source_credibility: Source credibility score (0-1). novelty_score: Document novelty score (0-1). extraction_confidence: Extraction confidence from the model (0-1). market_ctx: Optional market context features for the symbol. config: Scoring parameters. event_type: Optional event type for information gain computation. impact_score: Signal impact score in [0, 1] (default 0.5). source_accuracy_factor: Historical source accuracy factor (default 1.0). returns: Optional list of recent daily returns for regime multiplier. volumes: Optional list of recent daily volumes for regime multiplier. Returns: A ``SignalWeight`` with the component breakdown and combined score. """ cred = credibility_weight(source_credibility, config) bonus = novelty_score * config.novelty_bonus_max if not config.probabilistic: # --- Heuristic mode: preserve exact current formula --- gate = 1.0 if extraction_confidence >= config.confidence_floor else 0.0 rec = recency_weight(published_at, reference_time, window, config) mkt_mult = market_context_multiplier(market_ctx, config) combined = gate * rec * cred * (1.0 + bonus) * mkt_mult return SignalWeight( recency=rec, credibility=cred, novelty_bonus=bonus, confidence_gate=gate, market_ctx_multiplier=mkt_mult, combined=combined, ) # --- Probabilistic mode --- # 1. Sigmoid confidence gate (Req 2.1–2.5) sg = sigmoid_gate(extraction_confidence, config.sigmoid_steepness, config.sigmoid_midpoint) # 2. Information gain factor (Req 3.1–3.5) ig = compute_info_gain( event_type, lambda_param=config.info_gain_lambda, max_gain=config.info_gain_max, default_base_rate=config.default_base_rate, ) # 3. Regime multiplier (Req 6.1–6.5) — replaces market_context_multiplier rm = compute_regime_multiplier(returns, volumes, config) # 4. Adaptive recency decay (Req 5.1–5.7) base_half_life = config.half_life_hours.get(window, 72.0) adaptive_hl = compute_adaptive_half_life( base_half_life=base_half_life, impact_score=impact_score, info_gain_factor=ig, market_multiplier=rm, config=config, ) rec = recency_weight( published_at, reference_time, window, config, half_life_override=adaptive_hl, ) # 5. Source accuracy factor (Req 4.2–4.3) saf = source_accuracy_factor # 6. Combined weight combined = sg * rec * cred * (1.0 + bonus) * ig * saf * rm return SignalWeight( recency=rec, credibility=cred, novelty_bonus=bonus, confidence_gate=sg, # sigmoid gate value in probabilistic mode market_ctx_multiplier=rm, # regime multiplier stored here for compat combined=combined, sigmoid_gate=sg, info_gain_factor=ig, source_accuracy_factor=saf, regime_multiplier=rm, ) # --------------------------------------------------------------------------- # Batch helpers # --------------------------------------------------------------------------- @dataclass class WeightedSignal: """A document intelligence reference paired with its computed weight.""" document_id: str weight: SignalWeight sentiment_value: float # numeric sentiment: +1 positive, -1 negative, 0 neutral/mixed impact_score: float # New optional fields for probabilistic mode info_gain_factor: float = 1.0 # r = 1 + λ·(-log₂ P(event_type)) source_accuracy_factor: float = 1.0 # [0.5, 1.5] from historical accuracy adaptive_half_life: float | None = None # τ_i when adaptive decay is active def sentiment_to_numeric(sentiment: str) -> float: """Map a sentiment label to a signed numeric value.""" mapping = { "positive": 1.0, "negative": -1.0, "neutral": 0.0, "mixed": 0.0, } return mapping.get(sentiment.lower(), 0.0) def weighted_sentiment_average(signals: list[WeightedSignal]) -> float: """Compute a weight-adjusted average sentiment across signals. Returns a value in [-1, 1]. Returns 0.0 when total weight is zero. """ total_weight = 0.0 weighted_sum = 0.0 for sig in signals: w = sig.weight.combined * sig.impact_score weighted_sum += w * sig.sentiment_value total_weight += w if total_weight == 0.0: return 0.0 return weighted_sum / total_weight