Files
stonks-oracle/services/recommendation/suppression.py
T

358 lines
13 KiB
Python

"""Suppression logic for low-quality data or low confidence.
Evaluates the quality of the underlying data feeding a trend summary
and suppresses automated trade eligibility when data quality is poor.
Suppressed recommendations are marked as informational only.
This layer runs *before* the eligibility engine and acts as a pre-filter
on data quality. The eligibility engine handles signal-level thresholds
(confidence, strength, contradiction); this module handles data-level
quality concerns (stale evidence, low extraction quality, poor source
diversity, insufficient valid documents).
Requirements: 7.4
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from services.shared.schemas import TrendSummary
logger = logging.getLogger(__name__)
class SuppressionReason(str, Enum):
    """Data-quality reasons for treating a recommendation as informational only.

    The ``str`` mixin makes every member an instance of ``str`` that compares
    equal to its value, so members can be used wherever a plain string label
    is expected.
    """

    # Average extraction confidence, or the overall data quality score,
    # fell below its configured minimum.
    LOW_DATA_CONFIDENCE = "low_data_confidence"

    # The newest evidence is older than the allowed staleness window, or
    # documents exist but carry no timestamp at all.
    STALE_EVIDENCE = "stale_evidence"

    # Fewer distinct source types than the configured minimum.
    LOW_SOURCE_DIVERSITY = "low_source_diversity"

    # Too large a fraction of documents failed extraction.
    HIGH_EXTRACTION_FAILURE_RATE = "high_extraction_failure_rate"

    # Not enough valid (non-failed) documents behind the trend.
    INSUFFICIENT_VALID_DOCUMENTS = "insufficient_valid_documents"

    # Labels for the macro-only / pattern-only suppression paths.
    # NOTE(review): not appended by any code visible in this module;
    # presumably set by the caller of the corresponding evaluate_* helper.
    MACRO_ONLY_SIGNAL = "macro_only_signal"
    PATTERN_ONLY_SIGNAL = "pattern_only_signal"
@dataclass(frozen=True)
class SuppressionConfig:
    """Immutable thresholds used by the data-quality suppression checks.

    Every value here concerns the quality of the *input data*; thresholds on
    the trend signal itself are handled by EligibilityConfig.
    """

    # Floor for the mean extraction confidence over the evidence documents.
    # A lower average means the data is too unreliable for trade decisions.
    min_avg_extraction_confidence: float = 0.40

    # Oldest acceptable age, in hours, for the most recent evidence document.
    # Evidence older than this makes the trend stale.
    max_evidence_staleness_hours: float = 168.0  # 7 days

    # Fewest distinct source types (e.g. news, filings, market) that must
    # appear in the evidence; low diversity suggests the signal rests on a
    # single, possibly unreliable, source class.
    min_source_types: int = 1

    # Largest tolerable extraction failure rate, as a fraction in 0-1.
    # A higher failure rate marks the pipeline as unreliable for the ticker.
    max_extraction_failure_rate: float = 0.50

    # Fewest valid (non-failed) documents that must back the trend.
    min_valid_documents: int = 2

    # Floor for the overall data quality score. The computed score must be
    # at least this value for the recommendation to stay eligible for
    # automated trading.
    min_data_quality_score: float = 0.30


# Shared default thresholds, used when a caller supplies no config.
DEFAULT_SUPPRESSION_CONFIG = SuppressionConfig()
@dataclass
class DataQualityContext:
    """Quality metrics describing the data behind a trend summary.

    Normally filled in from document and extraction metadata for the ticker
    and window. When that metadata cannot be loaded from the database, a
    caller may derive the values from the trend summary itself.
    """

    # Document counts: everything considered, those that extracted
    # successfully, and those whose extraction failed.
    total_documents: int = 0
    valid_documents: int = 0
    failed_documents: int = 0

    # Mean extraction confidence across the evidence documents.
    avg_extraction_confidence: float = 0.0

    # Timestamp of the most recent evidence, or None when unknown.
    newest_evidence_at: datetime | None = None

    # Distinct source type labels seen in the evidence; each instance gets
    # its own fresh set.
    source_types: set[str] = field(default_factory=set)
@dataclass
class SuppressionResult:
    """Outcome of a suppression evaluation."""

    # True when at least one suppression reason applies.
    suppressed: bool

    # Reasons that triggered suppression; a fresh empty list by default.
    reasons: list[SuppressionReason] = field(default_factory=list)

    # Overall data quality score computed during the evaluation.
    data_quality_score: float = 0.0

    # The quality context the decision was based on, when available.
    context: DataQualityContext | None = None
def build_quality_context_from_summary(
    summary: TrendSummary,
) -> DataQualityContext:
    """Derive a minimal DataQualityContext from a TrendSummary alone.

    Fallback for when document-level quality metrics are unavailable. The
    summary's own fields stand in as proxies:

    - document counts: number of supporting plus opposing evidence items,
      all assumed valid (no failures);
    - extraction confidence: the summary's ``confidence``;
    - newest evidence time: the summary's ``generated_at``.

    A single placeholder source type is recorded so the fallback does not
    trip LOW_SOURCE_DIVERSITY by construction.

    Args:
        summary: The trend summary to derive proxy metrics from.

    Returns:
        A DataQualityContext populated with the proxy values above.
    """
    supporting_count = len(summary.top_supporting_evidence)
    opposing_count = len(summary.top_opposing_evidence)
    evidence_count = supporting_count + opposing_count

    fallback_ctx = DataQualityContext(
        total_documents=evidence_count,
        # No per-document status is known here, so every item counts as valid.
        valid_documents=evidence_count,
        failed_documents=0,
        avg_extraction_confidence=summary.confidence,
        newest_evidence_at=summary.generated_at,
        source_types={"unknown"},
    )
    return fallback_ctx
def _compute_data_quality_score(
    ctx: DataQualityContext,
    config: SuppressionConfig,
    reference_time: datetime,
) -> float:
    """Compute an overall data quality score from the context.

    The score is a weighted sum of three components, each limited to
    [0, 1] before weighting so that no single out-of-range input (for
    example future-dated evidence) can mask weakness in the others:

    - Extraction confidence (40% weight): average confidence divided by
      0.8, so 0.8 or higher earns full credit.
    - Evidence freshness (30% weight): decays linearly from 1 at age zero
      to 0 at ``config.max_evidence_staleness_hours``; 0 when no evidence
      timestamp is known.
    - Document coverage (30% weight): valid/total ratio multiplied by
      ``valid_documents / 10`` (capped at 1); 0 when there are no documents.

    Args:
        ctx: Data quality metrics to score.
        config: Supplies ``max_evidence_staleness_hours``.
        reference_time: "Now" for the freshness computation. A naive value
            is interpreted as UTC, matching how naive evidence timestamps
            are treated.

    Returns:
        The score in [0, 1], rounded to 4 decimal places; higher is better.
    """

    def _clamp01(value: float) -> float:
        """Limit *value* to the closed interval [0, 1]."""
        return max(0.0, min(1.0, value))

    # Extraction confidence component.
    conf_component = _clamp01(ctx.avg_extraction_confidence / 0.8)

    # Freshness component.
    if ctx.newest_evidence_at is None:
        freshness_component = 0.0
    else:
        newest = ctx.newest_evidence_at
        if newest.tzinfo is None:
            newest = newest.replace(tzinfo=timezone.utc)
        # Subtracting naive from aware datetimes raises TypeError, so both
        # sides are normalized to aware UTC values first.
        if reference_time.tzinfo is None:
            reference_time = reference_time.replace(tzinfo=timezone.utc)
        age_hours = (reference_time - newest).total_seconds() / 3600.0
        max_hours = config.max_evidence_staleness_hours
        if max_hours > 0:
            # Clamping the upper end keeps future-dated evidence (negative
            # age) from earning more than full freshness credit.
            freshness_component = _clamp01(1.0 - (age_hours / max_hours))
        else:
            # A non-positive window cannot be divided by; only evidence
            # that is not older than the reference time counts as fresh.
            freshness_component = 1.0 if age_hours <= 0 else 0.0

    # Document coverage component.
    if ctx.total_documents > 0:
        valid_ratio = _clamp01(ctx.valid_documents / ctx.total_documents)
        count_factor = _clamp01(ctx.valid_documents / 10.0)
        coverage_component = valid_ratio * count_factor
    else:
        coverage_component = 0.0

    score = (
        (0.4 * conf_component)
        + (0.3 * freshness_component)
        + (0.3 * coverage_component)
    )
    return round(_clamp01(score), 4)
def evaluate_suppression(
    summary: TrendSummary,
    quality_ctx: DataQualityContext | None = None,
    config: SuppressionConfig = DEFAULT_SUPPRESSION_CONFIG,
    reference_time: datetime | None = None,
) -> SuppressionResult:
    """Evaluate whether a recommendation should be suppressed due to data quality.

    Runs each data-quality check in turn, collecting a reason for every one
    that fails, then computes the overall data quality score. The
    recommendation is suppressed when at least one reason was collected.

    Checks, in order:

    1. Average extraction confidence below the configured minimum.
    2. Newest evidence older than the staleness window, or documents
       present without any evidence timestamp.
    3. Fewer distinct source types than required (only when documents exist).
    4. Extraction failure rate above the configured maximum.
    5. Fewer valid documents than required.
    6. Overall data quality score below the configured minimum (recorded as
       LOW_DATA_CONFIDENCE unless that reason is already present).

    Args:
        summary: The trend summary to evaluate.
        quality_ctx: Data quality context. If None, a minimal context is
            built from the trend summary itself.
        config: Suppression thresholds.
        reference_time: Reference time for staleness checks. Defaults to the
            current UTC time; a naive value is interpreted as UTC, matching
            how naive evidence timestamps are treated.

    Returns:
        SuppressionResult with suppression decision and reasons.
    """
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)
    elif reference_time.tzinfo is None:
        # Aware/naive datetime subtraction raises TypeError; evidence
        # timestamps are coerced to UTC below, so do the same here.
        reference_time = reference_time.replace(tzinfo=timezone.utc)

    if quality_ctx is not None:
        ctx = quality_ctx
    else:
        ctx = build_quality_context_from_summary(summary)

    reasons: list[SuppressionReason] = []

    # Check average extraction confidence
    if ctx.avg_extraction_confidence < config.min_avg_extraction_confidence:
        reasons.append(SuppressionReason.LOW_DATA_CONFIDENCE)

    # Check evidence staleness
    if ctx.newest_evidence_at is not None:
        newest = ctx.newest_evidence_at
        if newest.tzinfo is None:
            newest = newest.replace(tzinfo=timezone.utc)
        age_hours = (reference_time - newest).total_seconds() / 3600.0
        if age_hours > config.max_evidence_staleness_hours:
            reasons.append(SuppressionReason.STALE_EVIDENCE)
    elif ctx.total_documents > 0:
        # Have documents but no timestamp — treat as stale
        reasons.append(SuppressionReason.STALE_EVIDENCE)

    # Check source diversity (meaningless when there are no documents)
    if len(ctx.source_types) < config.min_source_types and ctx.total_documents > 0:
        reasons.append(SuppressionReason.LOW_SOURCE_DIVERSITY)

    # Check extraction failure rate; the guard also avoids dividing by zero
    if ctx.total_documents > 0:
        failure_rate = ctx.failed_documents / ctx.total_documents
        if failure_rate > config.max_extraction_failure_rate:
            reasons.append(SuppressionReason.HIGH_EXTRACTION_FAILURE_RATE)

    # Check minimum valid documents
    if ctx.valid_documents < config.min_valid_documents:
        reasons.append(SuppressionReason.INSUFFICIENT_VALID_DOCUMENTS)

    # Compute overall data quality score
    quality_score = _compute_data_quality_score(ctx, config, reference_time)

    # A low overall score is reported as LOW_DATA_CONFIDENCE, without
    # duplicating that reason if the confidence check already added it.
    if (
        quality_score < config.min_data_quality_score
        and SuppressionReason.LOW_DATA_CONFIDENCE not in reasons
    ):
        reasons.append(SuppressionReason.LOW_DATA_CONFIDENCE)

    suppressed = len(reasons) > 0
    if suppressed:
        logger.info(
            "Recommendation suppressed for %s/%s: reasons=%s quality_score=%.3f",
            summary.entity_id, summary.window.value,
            [r.value for r in reasons], quality_score,
        )

    return SuppressionResult(
        suppressed=suppressed,
        reasons=reasons,
        data_quality_score=quality_score,
        context=ctx,
    )
# ---------------------------------------------------------------------------
# Macro-only suppression (Requirements: 10.3)
# ---------------------------------------------------------------------------

# Caveat text for recommendations whose trend rests on macro signals alone.
# NOTE(review): presumably attached by the caller when
# evaluate_macro_only_suppression returns True — confirm at the call site.
MACRO_ONLY_CAVEAT = (
    "[Macro-only signal] "
    "This trend direction is driven solely by macro/geopolitical signals "
    "with no supporting company-specific evidence. "
    "Recommendation is informational only and should not be used for "
    "automated trading decisions."
)
def evaluate_macro_only_suppression(
    summary: TrendSummary,
    macro_signal_count: int,
    company_signal_count: int,
) -> bool:
    """Decide whether a recommendation rests on macro signals alone.

    A trend supported by macro signals but by no company-specific signal
    should be forced to informational mode with a macro-only caveat. This
    function only reports the decision (and logs it); it does not modify
    the summary.

    Args:
        summary: The trend summary under evaluation; read only for logging.
        macro_signal_count: Number of macro signals contributing to the trend.
        company_signal_count: Number of company-specific signals contributing.

    Returns:
        True if the recommendation should be suppressed (macro-only), False otherwise.

    Requirements: 10.3
    """
    # Suppression needs at least one macro signal and no company-specific
    # signal; either condition failing means nothing to suppress.
    if macro_signal_count <= 0 or company_signal_count > 0:
        return False

    entity = summary.entity_id
    window_label = summary.window.value
    direction_label = summary.trend_direction.value
    logger.info(
        "Macro-only suppression triggered for %s/%s: "
        "macro_signals=%d, company_signals=%d, direction=%s",
        entity,
        window_label,
        macro_signal_count,
        company_signal_count,
        direction_label,
    )
    return True
# ---------------------------------------------------------------------------
# Pattern-only suppression (Requirements: 9.3)
# ---------------------------------------------------------------------------

# Caveat text for recommendations whose trend rests on pattern signals alone.
# NOTE(review): presumably attached by the caller when
# evaluate_pattern_only_suppression returns True — confirm at the call site.
PATTERN_ONLY_CAVEAT = (
    "[Pattern-only signal] "
    "This trend direction is driven solely by historical pattern and "
    "competitive signals with no supporting company-specific or macro evidence. "
    "Recommendation is informational only."
)
def evaluate_pattern_only_suppression(
    summary: TrendSummary,
    pattern_signal_count: int,
    company_signal_count: int,
    macro_signal_count: int,
) -> bool:
    """Decide whether a recommendation rests on pattern signals alone.

    A trend supported by pattern/competitive signals but by neither
    company-specific nor macro signals should be forced to informational
    mode with a pattern-only caveat. This function only reports the
    decision (and logs it); it does not modify the summary.

    Args:
        summary: The trend summary under evaluation; read only for logging.
        pattern_signal_count: Number of pattern/competitive signals contributing.
        company_signal_count: Number of company-specific signals contributing.
        macro_signal_count: Number of macro signals contributing.

    Returns:
        True if the recommendation should be suppressed (pattern-only), False otherwise.

    Requirements: 9.3
    """
    # Suppression needs at least one pattern signal and no corroborating
    # company-specific or macro signal.
    if (
        pattern_signal_count <= 0
        or company_signal_count > 0
        or macro_signal_count > 0
    ):
        return False

    entity = summary.entity_id
    window_label = summary.window.value
    direction_label = summary.trend_direction.value
    logger.info(
        "Pattern-only suppression triggered for %s/%s: "
        "pattern_signals=%d, company_signals=%d, macro_signals=%d, direction=%s",
        entity,
        window_label,
        pattern_signal_count,
        company_signal_count,
        macro_signal_count,
        direction_label,
    )
    return True