Files
stonks-oracle/services/aggregation/scoring.py
T
2026-04-11 12:10:01 -07:00

285 lines
9.3 KiB
Python

"""Recency decay, source credibility weighting, and market context
integration for aggregation.
Provides scoring functions used by the aggregation engine to weight
document intelligence signals when computing trend summaries.
Requirements: 6.1, 6.2, 6.5
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from datetime import datetime, timezone
from services.shared.schemas import MarketContext
@dataclass(frozen=True)
class ScoringConfig:
    """Tunable parameters for signal scoring.

    Instances are frozen (attributes cannot be rebound) and hashable, so a
    config can be used as a dict key, a set member, or an argument to a
    memoised function. The ``half_life_hours`` dict takes part in equality
    comparisons but is excluded from the hash because dicts are unhashable.
    """

    # Recency decay: exponential half-life in hours per window.
    # After one half-life, a document's recency weight drops to 0.5.
    # ``hash=False`` keeps the dataclass-generated ``__hash__`` from hashing
    # this dict; without it ``hash(ScoringConfig())`` raises ``TypeError``.
    half_life_hours: dict[str, float] = field(
        default_factory=lambda: {
            "intraday": 2.0,
            "1d": 12.0,
            "7d": 72.0,
            "30d": 240.0,
            "90d": 720.0,
        },
        hash=False,
    )

    # Minimum recency weight — prevents very old docs from being zeroed out
    # entirely so they can still contribute trace-level signal.
    min_recency_weight: float = 0.01

    # Source credibility bounds — credibility scores outside this range
    # are clamped before weighting.
    credibility_floor: float = 0.1
    credibility_ceiling: float = 1.0

    # Exponent applied to credibility score. >1 penalises low-credibility
    # sources more aggressively; <1 flattens the curve.
    credibility_exponent: float = 1.0

    # Novelty bonus: multiplier range applied on top of base weight.
    # A novelty_score of 1.0 gets the full bonus; 0.0 gets none.
    novelty_bonus_max: float = 0.25

    # Confidence floor — documents below this extraction confidence
    # receive zero weight (they are too unreliable to aggregate).
    confidence_floor: float = 0.2

    # --- Market context modulation ---
    # When volatility exceeds this threshold (in price units), recency
    # signals are amplified because fast-moving markets make fresh data
    # more important.
    volatility_recency_boost_threshold: float = 1.0
    volatility_recency_boost_max: float = 0.30  # max extra multiplier

    # When volume surges above this % change, signals get a small boost
    # because high-volume moves carry more conviction.
    volume_surge_threshold_pct: float = 50.0
    volume_surge_boost: float = 0.15
# Module-level default configuration: a single ScoringConfig instance built
# at import time with every field left at its declared default. The scoring
# functions in this module use it as the default value of their ``config``
# parameter; because default arguments are bound when a function is defined,
# rebinding this name later does not change those defaults.
DEFAULT_CONFIG = ScoringConfig()
# ---------------------------------------------------------------------------
# Recency decay
# ---------------------------------------------------------------------------
def recency_weight(
    published_at: datetime,
    reference_time: datetime,
    window: str,
    config: ScoringConfig = DEFAULT_CONFIG,
) -> float:
    """Return the exponential recency-decay weight of a document.

    The weight halves every ``half_life`` hours of document age::

        w = 2 ** (-age_hours / half_life)

    where ``half_life`` is looked up in ``config.half_life_hours`` by
    ``window`` (72 hours is used for a window that is not in the table).

    Args:
        published_at: Publication time of the document. A naive value is
            interpreted as UTC.
        reference_time: The "now" anchor of the aggregation window. A naive
            value is interpreted as UTC.
        window: Key into ``config.half_life_hours`` (e.g. ``"7d"``).
        config: Scoring parameters.

    Returns:
        ``1.0`` for a document published at or after ``reference_time``;
        otherwise the decayed weight, never lower than
        ``config.min_recency_weight``.
    """
    utc = timezone.utc

    # Attach UTC to naive datetimes so the subtraction below is well defined.
    published = (
        published_at
        if published_at.tzinfo is not None
        else published_at.replace(tzinfo=utc)
    )
    anchor = (
        reference_time
        if reference_time.tzinfo is not None
        else reference_time.replace(tzinfo=utc)
    )

    elapsed_seconds = (anchor - published).total_seconds()
    if elapsed_seconds <= 0:
        # Published at or after the anchor: no decay applies.
        return 1.0

    elapsed_hours = elapsed_seconds / 3600.0
    half_life = config.half_life_hours.get(window, 72.0)
    decayed = math.pow(2.0, -elapsed_hours / half_life)

    # Keep a trace-level contribution from very old documents.
    return max(decayed, config.min_recency_weight)
# ---------------------------------------------------------------------------
# Source credibility weighting
# ---------------------------------------------------------------------------
def credibility_weight(
    source_credibility: float,
    config: ScoringConfig = DEFAULT_CONFIG,
) -> float:
    """Turn a source credibility score into an aggregation weight.

    The score is first limited to the interval
    ``[config.credibility_floor, config.credibility_ceiling]`` and the
    result is then raised to the power ``config.credibility_exponent``.

    Args:
        source_credibility: Credibility score of the source or document
            intelligence record (nominally 0-1).
        config: Scoring parameters.

    Returns:
        The clamped score raised to ``config.credibility_exponent``.
    """
    # Cap at the ceiling first, then lift anything too small up to the floor.
    capped = min(source_credibility, config.credibility_ceiling)
    bounded = max(config.credibility_floor, capped)
    return math.pow(bounded, config.credibility_exponent)
# ---------------------------------------------------------------------------
# Market context adjustment
# ---------------------------------------------------------------------------
def market_context_multiplier(
    market_ctx: MarketContext | None,
    config: ScoringConfig = DEFAULT_CONFIG,
) -> float:
    """Derive a multiplicative weight adjustment from market conditions.

    Two additive boosts are layered on top of a baseline of ``1.0``:

    * a volatility boost that grows logarithmically with the amount by
      which ``market_ctx.volatility`` exceeds
      ``config.volatility_recency_boost_threshold`` and is capped at
      ``config.volatility_recency_boost_max``;
    * a flat ``config.volume_surge_boost`` when
      ``market_ctx.volume_change_pct`` exceeds
      ``config.volume_surge_threshold_pct``.

    Args:
        market_ctx: Market context for the symbol, or ``None``.
        config: Scoring parameters.

    Returns:
        ``1.0`` plus whichever boosts apply; exactly ``1.0`` when
        ``market_ctx`` is ``None`` or reports that it has no data.
    """
    if market_ctx is None:
        return 1.0
    if not market_ctx.has_data:
        return 1.0

    # Volatility: fresh signals matter more in fast-moving markets.
    volatility_part = 0.0
    volatility = market_ctx.volatility
    if volatility is not None:
        threshold = config.volatility_recency_boost_threshold
        if volatility > threshold:
            overshoot = volatility - threshold
            # log1p keeps extreme volatility from blowing up the weight;
            # the configured maximum caps it outright.
            volatility_part = min(
                math.log1p(overshoot) * 0.15,
                config.volatility_recency_boost_max,
            )

    # Volume surge: flat bonus once the change passes the threshold.
    volume_part = 0.0
    volume_change = market_ctx.volume_change_pct
    if volume_change is not None and volume_change > config.volume_surge_threshold_pct:
        volume_part = config.volume_surge_boost

    return 1.0 + (volatility_part + volume_part)
# ---------------------------------------------------------------------------
# Combined document signal weight
# ---------------------------------------------------------------------------
@dataclass
class SignalWeight:
    """Per-component breakdown of one document's aggregation weight.

    Holds each factor that goes into the weight alongside the final
    ``combined`` value, so the result can be inspected factor by factor.
    """

    # Recency decay factor.
    recency: float
    # Source credibility factor.
    credibility: float
    # Additive novelty bonus.
    novelty_bonus: float
    # Extraction-confidence gate: 0.0 or 1.0.
    confidence_gate: float
    # Market-context adjustment: >= 1.0.
    market_ctx_multiplier: float
    # Final weight used for aggregation.
    combined: float
def compute_signal_weight(
    published_at: datetime,
    reference_time: datetime,
    window: str,
    source_credibility: float,
    novelty_score: float = 0.5,
    extraction_confidence: float = 0.5,
    market_ctx: MarketContext | None = None,
    config: ScoringConfig = DEFAULT_CONFIG,
) -> SignalWeight:
    """Build the full aggregation weight for one document signal.

    The combined weight is the product of five factors::

        combined = confidence_gate
                   * recency
                   * credibility
                   * (1 + novelty_bonus)
                   * market_ctx_multiplier

    ``confidence_gate`` is ``1.0`` when ``extraction_confidence`` is at
    least ``config.confidence_floor`` and ``0.0`` otherwise, so documents
    below the floor are multiplied by zero. ``novelty_bonus`` is
    ``novelty_score * config.novelty_bonus_max``. The remaining factors
    come from ``recency_weight``, ``credibility_weight`` and
    ``market_context_multiplier``.

    Args:
        published_at: Document publication time.
        reference_time: Aggregation anchor time.
        window: Trend window identifier.
        source_credibility: Source credibility score (0-1).
        novelty_score: Document novelty score (0-1).
        extraction_confidence: Extraction confidence from the model (0-1).
        market_ctx: Optional market context features for the symbol.
        config: Scoring parameters.

    Returns:
        A ``SignalWeight`` carrying every individual factor and their
        product in ``combined``.
    """
    # Hard gate: low-confidence extractions are multiplied by zero.
    passes_gate = extraction_confidence >= config.confidence_floor
    confidence_gate = 1.0 if passes_gate else 0.0

    recency = recency_weight(published_at, reference_time, window, config)
    credibility = credibility_weight(source_credibility, config)
    novelty_bonus = novelty_score * config.novelty_bonus_max
    market_multiplier = market_context_multiplier(market_ctx, config)

    # Fold the factors in left to right, starting from the gate.
    combined = confidence_gate
    for factor in (recency, credibility, 1.0 + novelty_bonus, market_multiplier):
        combined *= factor

    return SignalWeight(
        recency=recency,
        credibility=credibility,
        novelty_bonus=novelty_bonus,
        confidence_gate=confidence_gate,
        market_ctx_multiplier=market_multiplier,
        combined=combined,
    )
# ---------------------------------------------------------------------------
# Batch helpers
# ---------------------------------------------------------------------------
@dataclass
class WeightedSignal:
    """One document's sentiment and impact paired with its computed weight."""

    # Identifier of the document the signal comes from.
    document_id: str
    # Weight breakdown computed for this document.
    weight: SignalWeight
    # Numeric sentiment: +1 positive, -1 negative, 0 neutral/mixed.
    sentiment_value: float
    # Impact score of the document.
    impact_score: float
# Signed numeric value for each recognised sentiment label. Built once at
# import time rather than on every call.
_SENTIMENT_VALUES = {
    "positive": 1.0,
    "negative": -1.0,
    "neutral": 0.0,
    "mixed": 0.0,
}


def sentiment_to_numeric(sentiment: str | None) -> float:
    """Map a sentiment label to a signed numeric value.

    Matching ignores letter case and surrounding whitespace, so
    ``" Positive\\n"`` is treated the same as ``"positive"``.

    Args:
        sentiment: Sentiment label, or ``None`` when no label is available.

    Returns:
        ``1.0`` for positive, ``-1.0`` for negative, and ``0.0`` for
        neutral, mixed, unrecognised, empty or missing labels.
    """
    if sentiment is None:
        # A missing label is treated like an unrecognised one: neutral.
        return 0.0
    return _SENTIMENT_VALUES.get(sentiment.strip().lower(), 0.0)
def weighted_sentiment_average(signals: list[WeightedSignal]) -> float:
    """Compute a weight-adjusted average sentiment across signals.

    Each signal contributes its ``sentiment_value`` with an effective
    weight of ``weight.combined * impact_score``. Signals whose effective
    weight is not a positive finite number, or whose ``sentiment_value`` is
    not finite, are skipped: a single NaN/inf would otherwise turn the
    whole average into NaN, and negative weights would let the result
    escape the range of the input sentiment values.

    Args:
        signals: Weighted signals to average. May be empty.

    Returns:
        The weighted mean of the contributing sentiment values (within
        [-1, 1] when every ``sentiment_value`` is), or ``0.0`` when no
        signal contributes any weight.
    """
    total_weight = 0.0
    weighted_sum = 0.0
    for sig in signals:
        effective_weight = sig.weight.combined * sig.impact_score
        # Zero-weight signals contribute nothing; negative or non-finite
        # weights would corrupt the average, so skip them as well.
        if not (math.isfinite(effective_weight) and effective_weight > 0.0):
            continue
        if not math.isfinite(sig.sentiment_value):
            continue
        weighted_sum += effective_weight * sig.sentiment_value
        total_weight += effective_weight
    if total_weight == 0.0:
        return 0.0
    return weighted_sum / total_weight