"""Recency decay, source credibility weighting, and market context integration for aggregation. Provides scoring functions used by the aggregation engine to weight document intelligence signals when computing trend summaries. Requirements: 6.1, 6.2, 6.5 """ from __future__ import annotations import math from dataclasses import dataclass, field from datetime import datetime, timezone from services.shared.schemas import MarketContext @dataclass(frozen=True) class ScoringConfig: """Tunable parameters for signal scoring.""" # Recency decay: exponential half-life in hours per window. # After one half-life, a document's recency weight drops to 0.5. half_life_hours: dict[str, float] = field(default_factory=lambda: { "intraday": 2.0, "1d": 12.0, "7d": 72.0, "30d": 240.0, "90d": 720.0, }) # Minimum recency weight — prevents very old docs from being zeroed out # entirely so they can still contribute trace-level signal. min_recency_weight: float = 0.01 # Source credibility bounds — credibility scores outside this range # are clamped before weighting. credibility_floor: float = 0.1 credibility_ceiling: float = 1.0 # Exponent applied to credibility score. >1 penalises low-credibility # sources more aggressively; <1 flattens the curve. credibility_exponent: float = 1.0 # Novelty bonus: multiplier range applied on top of base weight. # A novelty_score of 1.0 gets the full bonus; 0.0 gets none. novelty_bonus_max: float = 0.25 # Confidence floor — documents below this extraction confidence # receive zero weight (they are too unreliable to aggregate). confidence_floor: float = 0.2 # Market context modulation --- # When volatility exceeds this threshold (in price units), recency # signals are amplified because fast-moving markets make fresh data # more important. volatility_recency_boost_threshold: float = 1.0 volatility_recency_boost_max: float = 0.30 # max extra multiplier # When volume surges above this % change, signals get a small boost # because high-volume moves carry more conviction. volume_surge_threshold_pct: float = 50.0 volume_surge_boost: float = 0.15 # Singleton default config DEFAULT_CONFIG = ScoringConfig() # --------------------------------------------------------------------------- # Recency decay # --------------------------------------------------------------------------- def recency_weight( published_at: datetime, reference_time: datetime, window: str, config: ScoringConfig = DEFAULT_CONFIG, ) -> float: """Compute an exponential recency decay weight for a document. Uses the formula: w = 2^(-age_hours / half_life) Args: published_at: When the document was published (tz-aware). reference_time: The "now" anchor for the aggregation window (tz-aware). window: One of the TrendWindow values (e.g. "7d"). config: Scoring parameters. Returns: A weight in [config.min_recency_weight, 1.0]. """ # Ensure both are tz-aware; treat naive as UTC. if published_at.tzinfo is None: published_at = published_at.replace(tzinfo=timezone.utc) if reference_time.tzinfo is None: reference_time = reference_time.replace(tzinfo=timezone.utc) age_seconds = (reference_time - published_at).total_seconds() if age_seconds <= 0: return 1.0 age_hours = age_seconds / 3600.0 half_life = config.half_life_hours.get(window, 72.0) weight = math.pow(2.0, -age_hours / half_life) return max(weight, config.min_recency_weight) # --------------------------------------------------------------------------- # Source credibility weighting # --------------------------------------------------------------------------- def credibility_weight( source_credibility: float, config: ScoringConfig = DEFAULT_CONFIG, ) -> float: """Compute a weight from a source's credibility score. The raw credibility (0-1) is clamped to [floor, ceiling] then raised to ``credibility_exponent``. Args: source_credibility: The credibility score from the source or document intelligence record (0-1). config: Scoring parameters. Returns: A weight in [floor^exp, ceiling^exp]. """ clamped = max(config.credibility_floor, min(source_credibility, config.credibility_ceiling)) return math.pow(clamped, config.credibility_exponent) # --------------------------------------------------------------------------- # Market context adjustment # --------------------------------------------------------------------------- def market_context_multiplier( market_ctx: MarketContext | None, config: ScoringConfig = DEFAULT_CONFIG, ) -> float: """Compute a multiplicative adjustment from market context features. Returns a value >= 1.0 that amplifies signal weights when market conditions suggest heightened importance (high volatility or volume surges). Returns 1.0 when no market context is available. """ if market_ctx is None or not market_ctx.has_data: return 1.0 boost = 0.0 # Volatility boost — more volatile markets make recent signals more valuable if market_ctx.volatility is not None and market_ctx.volatility > config.volatility_recency_boost_threshold: excess = market_ctx.volatility - config.volatility_recency_boost_threshold # Logarithmic scaling so extreme volatility doesn't blow up the weight boost += min( math.log1p(excess) * 0.15, config.volatility_recency_boost_max, ) # Volume surge boost if market_ctx.volume_change_pct is not None and market_ctx.volume_change_pct > config.volume_surge_threshold_pct: boost += config.volume_surge_boost return 1.0 + boost # --------------------------------------------------------------------------- # Combined document signal weight # --------------------------------------------------------------------------- @dataclass class SignalWeight: """Breakdown of a document's aggregation weight.""" recency: float credibility: float novelty_bonus: float confidence_gate: float # 0.0 or 1.0 market_ctx_multiplier: float # >= 1.0 combined: float def compute_signal_weight( published_at: datetime, reference_time: datetime, window: str, source_credibility: float, novelty_score: float = 0.5, extraction_confidence: float = 0.5, market_ctx: MarketContext | None = None, config: ScoringConfig = DEFAULT_CONFIG, ) -> SignalWeight: """Compute the combined aggregation weight for a single document signal. The formula is: combined = confidence_gate * recency * credibility * (1 + novelty_bonus) * market_ctx_multiplier where novelty_bonus = novelty_score * config.novelty_bonus_max and market_ctx_multiplier >= 1.0 based on volatility/volume features. Documents with extraction_confidence below config.confidence_floor receive a combined weight of 0.0 (gated out). Args: published_at: Document publication time. reference_time: Aggregation anchor time. window: Trend window identifier. source_credibility: Source credibility score (0-1). novelty_score: Document novelty score (0-1). extraction_confidence: Extraction confidence from the model (0-1). market_ctx: Optional market context features for the symbol. config: Scoring parameters. Returns: A ``SignalWeight`` with the component breakdown and combined score. """ # Confidence gate gate = 1.0 if extraction_confidence >= config.confidence_floor else 0.0 rec = recency_weight(published_at, reference_time, window, config) cred = credibility_weight(source_credibility, config) bonus = novelty_score * config.novelty_bonus_max mkt_mult = market_context_multiplier(market_ctx, config) combined = gate * rec * cred * (1.0 + bonus) * mkt_mult return SignalWeight( recency=rec, credibility=cred, novelty_bonus=bonus, confidence_gate=gate, market_ctx_multiplier=mkt_mult, combined=combined, ) # --------------------------------------------------------------------------- # Batch helpers # --------------------------------------------------------------------------- @dataclass class WeightedSignal: """A document intelligence reference paired with its computed weight.""" document_id: str weight: SignalWeight sentiment_value: float # numeric sentiment: +1 positive, -1 negative, 0 neutral/mixed impact_score: float def sentiment_to_numeric(sentiment: str) -> float: """Map a sentiment label to a signed numeric value.""" mapping = { "positive": 1.0, "negative": -1.0, "neutral": 0.0, "mixed": 0.0, } return mapping.get(sentiment.lower(), 0.0) def weighted_sentiment_average(signals: list[WeightedSignal]) -> float: """Compute a weight-adjusted average sentiment across signals. Returns a value in [-1, 1]. Returns 0.0 when total weight is zero. """ total_weight = 0.0 weighted_sum = 0.0 for sig in signals: w = sig.weight.combined * sig.impact_score weighted_sum += w * sig.sentiment_value total_weight += w if total_weight == 0.0: return 0.0 return weighted_sum / total_weight