Files
stonks-oracle/services/recommendation/suppression.py
T

242 lines
8.6 KiB
Python

"""Suppression logic for low-quality data or low confidence.
Evaluates the quality of the underlying data feeding a trend summary
and suppresses automated trade eligibility when data quality is poor.
Suppressed recommendations are marked as informational only.
This layer runs *before* the eligibility engine and acts as a pre-filter
on data quality. The eligibility engine handles signal-level thresholds
(confidence, strength, contradiction); this module handles data-level
quality concerns (stale evidence, low extraction quality, poor source
diversity, insufficient valid documents).
Requirements: 7.4
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from services.shared.schemas import TrendSummary
logger = logging.getLogger(__name__)
class SuppressionReason(str, Enum):
    """Data-quality causes that can suppress a recommendation.

    Members are also ``str`` instances, so each one compares equal to its
    snake_case value.
    """

    # Average extraction confidence, or the overall quality score, is too low.
    LOW_DATA_CONFIDENCE = "low_data_confidence"

    # The freshest evidence is too old, or carries no timestamp at all.
    STALE_EVIDENCE = "stale_evidence"

    # Too few distinct source types back the evidence.
    LOW_SOURCE_DIVERSITY = "low_source_diversity"

    # Too large a share of the documents failed extraction.
    HIGH_EXTRACTION_FAILURE_RATE = "high_extraction_failure_rate"

    # Too few valid documents contributed to the trend.
    INSUFFICIENT_VALID_DOCUMENTS = "insufficient_valid_documents"
@dataclass(frozen=True)
class SuppressionConfig:
    """Immutable thresholds used by the data-quality suppression checks.

    Every knob here concerns the *input data* behind a trend; thresholds on
    the trend signal itself belong to EligibilityConfig.
    """

    # Floor for the mean extraction confidence of the evidence documents.
    # A lower average marks the underlying data as too unreliable to trade on.
    min_avg_extraction_confidence: float = 0.4

    # Oldest acceptable age, in hours, of the most recent evidence document.
    # When even the freshest evidence is older than this, the trend is stale.
    max_evidence_staleness_hours: float = 7 * 24.0  # 168 hours = 7 days

    # Fewest distinct source types (e.g. news, filings, market) that must
    # appear in the evidence. With too little diversity the signal may come
    # from a single unreliable class of source.
    min_source_types: int = 1

    # Largest tolerated extraction failure rate, as a fraction in 0-1.
    # A higher failed share means the pipeline is unreliable for this ticker.
    max_extraction_failure_rate: float = 0.5

    # Fewest valid (non-failed) documents that must have fed the trend.
    # With fewer there is not enough data to act on.
    min_valid_documents: int = 2

    # Floor for the overall data quality score. A computed score below this
    # value makes the recommendation ineligible for automated trading.
    min_data_quality_score: float = 0.3
# Shared instance holding the stock thresholds; evaluate_suppression uses it
# as the default value of its ``config`` parameter.
DEFAULT_SUPPRESSION_CONFIG = SuppressionConfig()
@dataclass
class DataQualityContext:
    """Quality measurements for the data behind one trend summary.

    Intended to be filled from document and extraction metadata for the
    ticker and window. When the database cannot supply those, a caller may
    derive an approximate context from the trend summary itself.
    """

    # Document counts: everything considered, the valid subset, and the
    # subset whose extraction failed.
    total_documents: int = 0
    valid_documents: int = 0
    failed_documents: int = 0

    # Mean extraction confidence across the evidence documents.
    avg_extraction_confidence: float = 0.0

    # Timestamp of the most recent evidence; None when unknown. The checks in
    # this module read a naive value as UTC.
    newest_evidence_at: datetime | None = None

    # Distinct source types seen in the evidence. Each instance gets its own
    # fresh, empty set.
    source_types: set[str] = field(default_factory=set)
@dataclass
class SuppressionResult:
    """Outcome of a suppression evaluation."""

    # True when at least one suppression reason applies.
    suppressed: bool

    # Every reason that fired; a fresh empty list by default.
    reasons: list[SuppressionReason] = field(default_factory=list)

    # Overall data quality score computed during the evaluation.
    data_quality_score: float = 0.0

    # The quality context the decision was based on, when one is attached.
    context: DataQualityContext | None = None
def build_quality_context_from_summary(
    summary: TrendSummary,
) -> DataQualityContext:
    """Derive an approximate DataQualityContext from a TrendSummary alone.

    Fallback for callers that have no document-level quality metrics. The
    summary's own fields stand in as proxies:

    - the supporting plus opposing evidence counts become the document total,
      and every one of those documents is counted as valid (zero failures);
    - ``summary.confidence`` stands in for the average extraction confidence;
    - ``summary.generated_at`` stands in for the newest evidence timestamp;
    - no source types are recorded.

    Args:
        summary: Trend summary to derive the context from.

    Returns:
        A DataQualityContext populated with the proxies described above.
    """
    # NOTE(review): source_types is left empty, so with the default
    # min_source_types of 1 the diversity check in evaluate_suppression fires
    # whenever any evidence is present -- confirm this is intended.
    supporting_count = len(summary.top_supporting_evidence)
    opposing_count = len(summary.top_opposing_evidence)
    evidence_count = supporting_count + opposing_count

    fallback_ctx = DataQualityContext(
        total_documents=evidence_count,
        valid_documents=evidence_count,
        failed_documents=0,
        avg_extraction_confidence=summary.confidence,
        newest_evidence_at=summary.generated_at,
        source_types=set(),
    )
    return fallback_ctx
def _compute_data_quality_score(
    ctx: DataQualityContext,
    config: SuppressionConfig,
    reference_time: datetime,
) -> float:
    """Compute an overall data quality score from the context.

    The score is a weighted sum of three components:

    - Extraction confidence (40% weight): average confidence divided by 0.8,
      capped at 1.0.
    - Evidence freshness (30% weight): decays linearly from 1.0 for evidence
      dated at ``reference_time`` to 0.0 once its age reaches
      ``config.max_evidence_staleness_hours``; 0.0 when no timestamp exists.
    - Document coverage (30% weight): the valid/total ratio scaled by
      ``valid_documents / 10`` (capped at 1.0); 0.0 when there are no
      documents.

    Args:
        ctx: Quality metrics for the data behind the trend.
        config: Supplies ``max_evidence_staleness_hours``.
        reference_time: The "now" that evidence age is measured against.
            A naive value is interpreted as UTC.

    Returns:
        The score rounded to 4 decimals and clamped to [0, 1]; higher means
        better quality.
    """
    # Extraction confidence component.
    conf_component = min(ctx.avg_extraction_confidence / 0.8, 1.0)

    # Freshness component.
    if ctx.newest_evidence_at is None:
        freshness_component = 0.0
    else:
        newest = ctx.newest_evidence_at
        if newest.tzinfo is None:
            newest = newest.replace(tzinfo=timezone.utc)
        # Read a naive reference time as UTC too; subtracting a naive datetime
        # from an aware one raises TypeError.
        if reference_time.tzinfo is None:
            reference_time = reference_time.replace(tzinfo=timezone.utc)

        age_hours = (reference_time - newest).total_seconds() / 3600.0
        max_hours = config.max_evidence_staleness_hours
        if max_hours > 0:
            # Clamp at both ends: evidence dated after reference_time has a
            # negative age and must not earn more than full credit, which
            # would mask weak confidence or coverage.
            freshness_component = min(1.0, max(0.0, 1.0 - (age_hours / max_hours)))
        else:
            # A zero or negative window cannot be divided by. Fall back to an
            # all-or-nothing rule that mirrors the `age > max` staleness test.
            freshness_component = 1.0 if age_hours <= max_hours else 0.0

    # Document coverage component.
    if ctx.total_documents > 0:
        valid_ratio = ctx.valid_documents / ctx.total_documents
        count_factor = min(ctx.valid_documents / 10.0, 1.0)
        coverage_component = valid_ratio * count_factor
    else:
        coverage_component = 0.0

    score = (0.4 * conf_component) + (0.3 * freshness_component) + (0.3 * coverage_component)
    return round(max(0.0, min(1.0, score)), 4)
def evaluate_suppression(
    summary: TrendSummary,
    quality_ctx: DataQualityContext | None = None,
    config: SuppressionConfig = DEFAULT_SUPPRESSION_CONFIG,
    reference_time: datetime | None = None,
) -> SuppressionResult:
    """Evaluate whether a recommendation should be suppressed due to data quality.

    Runs each data-quality check below and collects every reason that fires.
    The recommendation is suppressed when at least one reason was collected.

    - Average extraction confidence below ``min_avg_extraction_confidence``.
    - Newest evidence older than ``max_evidence_staleness_hours``, or
      documents present but no evidence timestamp.
    - Fewer distinct source types than ``min_source_types`` (only when
      documents are present).
    - Extraction failure rate above ``max_extraction_failure_rate``.
    - Fewer valid documents than ``min_valid_documents``.
    - Overall data quality score below ``min_data_quality_score``.

    Args:
        summary: The trend summary to evaluate.
        quality_ctx: Data quality context. If None, a minimal context is
            built from the trend summary itself.
        config: Suppression thresholds.
        reference_time: Reference time for staleness checks. Defaults to the
            current UTC time; a naive value is interpreted as UTC.

    Returns:
        SuppressionResult with the suppression decision, the reasons, the
        data quality score, and the context that was evaluated.
    """
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)
    elif reference_time.tzinfo is None:
        # Naive evidence timestamps are read as UTC below, so read a naive
        # reference time the same way. Otherwise subtracting the two raises
        # TypeError (naive minus aware).
        reference_time = reference_time.replace(tzinfo=timezone.utc)

    # Explicit None test: only a missing context triggers the fallback.
    if quality_ctx is not None:
        ctx = quality_ctx
    else:
        ctx = build_quality_context_from_summary(summary)

    reasons: list[SuppressionReason] = []

    # Check average extraction confidence.
    if ctx.avg_extraction_confidence < config.min_avg_extraction_confidence:
        reasons.append(SuppressionReason.LOW_DATA_CONFIDENCE)

    # Check evidence staleness.
    if ctx.newest_evidence_at is not None:
        newest = ctx.newest_evidence_at
        if newest.tzinfo is None:
            newest = newest.replace(tzinfo=timezone.utc)
        age_hours = (reference_time - newest).total_seconds() / 3600.0
        if age_hours > config.max_evidence_staleness_hours:
            reasons.append(SuppressionReason.STALE_EVIDENCE)
    elif ctx.total_documents > 0:
        # Have documents but no timestamp -- treat as stale.
        reasons.append(SuppressionReason.STALE_EVIDENCE)

    # Check source diversity (skipped when there are no documents at all).
    if len(ctx.source_types) < config.min_source_types and ctx.total_documents > 0:
        reasons.append(SuppressionReason.LOW_SOURCE_DIVERSITY)

    # Check extraction failure rate; the guard avoids dividing by zero.
    if ctx.total_documents > 0:
        failure_rate = ctx.failed_documents / ctx.total_documents
        if failure_rate > config.max_extraction_failure_rate:
            reasons.append(SuppressionReason.HIGH_EXTRACTION_FAILURE_RATE)

    # Check minimum valid documents.
    if ctx.valid_documents < config.min_valid_documents:
        reasons.append(SuppressionReason.INSUFFICIENT_VALID_DOCUMENTS)

    # Compute overall data quality score.
    quality_score = _compute_data_quality_score(ctx, config, reference_time)

    # A low overall score is reported as LOW_DATA_CONFIDENCE, added only if
    # the confidence check above has not already recorded it.
    if (
        quality_score < config.min_data_quality_score
        and SuppressionReason.LOW_DATA_CONFIDENCE not in reasons
    ):
        reasons.append(SuppressionReason.LOW_DATA_CONFIDENCE)

    suppressed = len(reasons) > 0
    if suppressed:
        logger.info(
            "Recommendation suppressed for %s/%s: reasons=%s quality_score=%.3f",
            summary.entity_id, summary.window.value,
            [r.value for r in reasons], quality_score,
        )

    return SuppressionResult(
        suppressed=suppressed,
        reasons=reasons,
        data_quality_score=quality_score,
        context=ctx,
    )