Files
stonks-oracle/services/recommendation/eligibility.py
T
Celes Renata f11aa0a1ee fix: deduplicate recommendations and widen position sizing range
- Add dedup check in recommendation worker: skip generation when latest
  rec for same ticker+window has identical action/mode/confidence
- Widen position sizing range (1-10% portfolio, 0.3-2% max loss) and
  factor in trend strength + evidence count for differentiated sizing
- API returns only latest recommendation per ticker by default (DISTINCT ON)
  to eliminate duplicate rows in the frontend list view
2026-04-17 00:15:32 +00:00

365 lines
12 KiB
Python

"""Deterministic recommendation eligibility logic.
Evaluates trend summaries against configurable thresholds to decide:
- Whether a recommendation should be generated at all
- What action type (buy/sell/hold/watch) is appropriate
- What execution mode (informational/paper_eligible/live_eligible) is allowed
- Position sizing guidance based on portfolio rules
All decisions are rule-based with no model involvement. The LLM is only
used downstream for optional thesis wording (a separate task).
Requirements: 7.1, 7.2, 7.3, 7.4
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from services.shared.schemas import (
ActionType,
PositionSizing,
RecommendationMode,
TrendDirection,
TrendSummary,
)
class RejectionReason(str, Enum):
    """Enumerates the gate failures that make a trend summary ineligible.

    Members are also ``str`` instances, so each one compares equal to its
    string value.
    """

    # Confidence fell below the configured minimum.
    LOW_CONFIDENCE = "low_confidence"
    # Trend strength fell below the configured minimum.
    LOW_TREND_STRENGTH = "low_trend_strength"
    # Contradiction score rose above the configured maximum.
    HIGH_CONTRADICTION = "high_contradiction"
    # Too few supporting + opposing evidence items.
    INSUFFICIENT_EVIDENCE = "insufficient_evidence"
    # The trend direction is neutral.
    NEUTRAL_DIRECTION = "neutral_direction"
@dataclass(frozen=True)
class EligibilityConfig:
    """Immutable bundle of thresholds that drive recommendation eligibility.

    Every value is a plain number that the rule functions compare against or
    combine arithmetically; no model inference is involved.
    """

    # ---- Hard gates: failing any of these means no recommendation ----
    min_confidence: float = 0.35
    min_trend_strength: float = 0.10
    max_contradiction_score: float = 0.60
    # Supporting and opposing evidence are counted together.
    min_evidence_count: int = 2

    # ---- Action mapping ----
    # At or above this trend strength the action is buy/sell; below it the
    # action is hold/watch.
    action_strength_threshold: float = 0.25
    # For weak signals: at or above this confidence → hold, otherwise watch.
    hold_confidence_threshold: float = 0.50

    # ---- Mode escalation ----
    # Minimum confidence for paper_eligible (otherwise informational).
    paper_confidence_threshold: float = 0.50
    # Minimum confidence for live_eligible (otherwise paper at most).
    live_confidence_threshold: float = 0.70
    # Live eligibility additionally caps the contradiction score ...
    live_max_contradiction: float = 0.25
    # ... and requires at least this many evidence items.
    live_min_evidence: int = 5

    # ---- Position sizing (Requirement 7.3) ----
    # Starting portfolio allocation, expressed as a fraction.
    base_portfolio_pct: float = 0.01
    # Upper bound on the portfolio allocation.
    max_portfolio_pct: float = 0.10
    # Starting max-loss figure.
    base_max_loss_pct: float = 0.003
    # Upper bound on the max-loss figure.
    max_max_loss_pct: float = 0.02
    # Linear weight applied to confidence: more confidence → larger position.
    confidence_sizing_weight: float = 0.8
    # Weight applied to contradiction: more contradiction → smaller position.
    contradiction_sizing_penalty: float = 0.5


# Shared default instance used when callers do not supply their own config.
DEFAULT_ELIGIBILITY_CONFIG = EligibilityConfig()
@dataclass
class EligibilityResult:
    """Decision record produced by the deterministic eligibility evaluation.

    Holds the verdict together with the derived action, mode, sizing and the
    reasons behind a rejection, so the decision trace can be reproduced
    (Requirement 8.3).
    """

    # True when no gate rejected the summary.
    eligible: bool
    # Action type derived from trend direction and strength.
    action: ActionType
    # Highest execution mode permitted for this recommendation.
    mode: RecommendationMode
    # Portfolio allocation and max-loss guidance.
    position_sizing: PositionSizing
    # Gate failures; an empty list means every gate passed.
    rejection_reasons: list[RejectionReason] = field(default_factory=list)
    # Time-horizon label for the recommendation.
    time_horizon: str = ""
    # Conditions under which the recommendation should be treated as stale.
    invalidation_conditions: list[str] = field(default_factory=list)
# ---------------------------------------------------------------------------
# Gate checks
# ---------------------------------------------------------------------------
def _check_gates(
    summary: TrendSummary,
    config: EligibilityConfig,
) -> list[RejectionReason]:
    """Run every hard gate and collect the ones that fail.

    Args:
        summary: Trend summary under evaluation.
        config: Thresholds to compare the summary against.

    Returns:
        The rejection reasons in gate order; an empty list means the summary
        passed every gate.
    """
    # Each entry pairs "did this gate fail?" with the reason to report.
    # The tuple order fixes the order of the returned reasons.
    gate_outcomes = (
        (
            summary.confidence < config.min_confidence,
            RejectionReason.LOW_CONFIDENCE,
        ),
        (
            summary.trend_strength < config.min_trend_strength,
            RejectionReason.LOW_TREND_STRENGTH,
        ),
        (
            summary.contradiction_score > config.max_contradiction_score,
            RejectionReason.HIGH_CONTRADICTION,
        ),
        (
            # Supporting and opposing evidence are counted together.
            len(summary.top_supporting_evidence) + len(summary.top_opposing_evidence)
            < config.min_evidence_count,
            RejectionReason.INSUFFICIENT_EVIDENCE,
        ),
        (
            summary.trend_direction == TrendDirection.NEUTRAL,
            RejectionReason.NEUTRAL_DIRECTION,
        ),
    )
    return [reason for failed, reason in gate_outcomes if failed]
# ---------------------------------------------------------------------------
# Action mapping
# ---------------------------------------------------------------------------
def _determine_action(
    summary: TrendSummary,
    config: EligibilityConfig,
) -> ActionType:
    """Translate trend direction and strength into an action type.

    * Bullish with a strong signal → BUY; bearish with a strong signal → SELL.
    * Bullish/bearish with a weak signal → HOLD when confidence reaches the
      hold threshold, otherwise WATCH.
    * Any other direction (mixed, neutral, ...) → WATCH.

    Args:
        summary: Trend summary under evaluation.
        config: Supplies the strength and hold-confidence thresholds.

    Returns:
        The action type implied by the summary.
    """
    trend = summary.trend_direction

    # Only clearly directional trends can yield anything other than WATCH.
    if trend not in (TrendDirection.BULLISH, TrendDirection.BEARISH):
        return ActionType.WATCH

    if summary.trend_strength >= config.action_strength_threshold:
        if trend == TrendDirection.BULLISH:
            return ActionType.BUY
        return ActionType.SELL

    # Weak directional signal: the hold/watch rule is the same for both sides.
    if summary.confidence >= config.hold_confidence_threshold:
        return ActionType.HOLD
    return ActionType.WATCH
# ---------------------------------------------------------------------------
# Mode escalation
# ---------------------------------------------------------------------------
def _determine_mode(
    summary: TrendSummary,
    action: ActionType,
    config: EligibilityConfig,
) -> RecommendationMode:
    """Pick the highest execution mode the recommendation may use.

    WATCH and HOLD never trigger trades, so they are always informational.
    BUY/SELL escalate to live_eligible when confidence, contradiction and
    evidence all meet the live thresholds, else to paper_eligible when
    confidence meets the paper threshold, else stay informational.

    Args:
        summary: Trend summary under evaluation.
        action: Action type already chosen for the summary.
        config: Supplies the paper/live thresholds.

    Returns:
        The permitted recommendation mode.
    """
    if action == ActionType.WATCH or action == ActionType.HOLD:
        return RecommendationMode.INFORMATIONAL

    evidence_total = len(summary.top_supporting_evidence) + len(
        summary.top_opposing_evidence
    )

    # The strictest tier is evaluated first so it wins whenever it applies.
    meets_live_bar = all(
        (
            summary.confidence >= config.live_confidence_threshold,
            summary.contradiction_score <= config.live_max_contradiction,
            evidence_total >= config.live_min_evidence,
        )
    )
    if meets_live_bar:
        return RecommendationMode.LIVE_ELIGIBLE

    if summary.confidence >= config.paper_confidence_threshold:
        return RecommendationMode.PAPER_ELIGIBLE
    return RecommendationMode.INFORMATIONAL
# ---------------------------------------------------------------------------
# Position sizing (Requirement 7.3)
# ---------------------------------------------------------------------------
def _compute_position_sizing(
    summary: TrendSummary,
    config: EligibilityConfig,
) -> PositionSizing:
    """Derive position sizing guidance from portfolio rules and signal quality.

    The portfolio allocation and the max-loss figure are computed the same
    way: start at the base value, move towards the maximum in proportion to
    confidence and trend strength, shrink by the contradiction penalty and by
    an evidence-count penalty, then clamp to ``[base * 0.5, max]``.

    Args:
        summary: Trend summary supplying confidence, strength, contradiction
            and evidence lists.
        config: Supplies base/max values and the sizing weights.

    Returns:
        A ``PositionSizing`` with both figures rounded to 6 decimal places.
    """
    # Confidence weight times a strength multiplier; the multiplier spans
    # [0.5, 1.0] when trend_strength lies in [0, 1].
    signal_scale = (
        config.confidence_sizing_weight
        * summary.confidence
        * (0.5 + 0.5 * summary.trend_strength)
    )

    # Fraction of the raw value kept after the contradiction penalty.
    kept_fraction = 1.0 - config.contradiction_sizing_penalty * summary.contradiction_score

    # Fewer evidence items → smaller size.
    evidence_total = len(summary.top_supporting_evidence) + len(
        summary.top_opposing_evidence
    )
    if evidence_total < 3:
        evidence_multiplier = 0.5
    elif evidence_total < 5:
        evidence_multiplier = 0.75
    else:
        evidence_multiplier = 1.0

    def _scaled(base: float, ceiling: float) -> float:
        # Scale within [base, ceiling], apply both penalties, then clamp.
        penalised = (
            (base + signal_scale * (ceiling - base))
            * kept_fraction
            * evidence_multiplier
        )
        return max(base * 0.5, min(penalised, ceiling))

    return PositionSizing(
        portfolio_pct=round(
            _scaled(config.base_portfolio_pct, config.max_portfolio_pct), 6
        ),
        max_loss_pct=round(
            _scaled(config.base_max_loss_pct, config.max_max_loss_pct), 6
        ),
    )
# ---------------------------------------------------------------------------
# Time horizon mapping
# ---------------------------------------------------------------------------
_WINDOW_TO_HORIZON: dict[str, str] = {
"intraday": "intraday",
"1d": "swing_1d_3d",
"7d": "swing_1d_10d",
"30d": "position_10d_30d",
"90d": "position_30d_90d",
}
def _map_time_horizon(window: str) -> str:
"""Map a trend window to a human-readable time horizon label."""
return _WINDOW_TO_HORIZON.get(window, f"window_{window}")
# ---------------------------------------------------------------------------
# Invalidation conditions
# ---------------------------------------------------------------------------
def _derive_invalidation_conditions(
    summary: TrendSummary,
    action: ActionType,
    config: EligibilityConfig = DEFAULT_ELIGIBILITY_CONFIG,
) -> list[str]:
    """Generate deterministic invalidation conditions for the recommendation.

    These describe when the recommendation should be considered stale or wrong.

    Args:
        summary: Trend summary the recommendation is based on.
        action: Action type chosen for the summary.
        config: Thresholds in force. Its ``max_contradiction_score`` is the
            value quoted in the contradiction condition, so the message
            always matches the gate that was actually applied. Defaults to
            the module-wide default configuration.

    Returns:
        Human-readable condition strings, possibly empty.
    """
    conditions: list[str] = []

    # A directional call is invalidated by the opposite direction.
    if action == ActionType.BUY:
        conditions.append(
            f"Trend direction for {summary.entity_id} reverses to bearish"
        )
    elif action == ActionType.SELL:
        conditions.append(
            f"Trend direction for {summary.entity_id} reverses to bullish"
        )

    if summary.contradiction_score > 0.0:
        # Quote the configured gate threshold rather than a hard-coded 0.60,
        # which would be wrong for any non-default configuration.
        conditions.append(
            f"Contradiction score exceeds {config.max_contradiction_score:.2f} "
            f"(currently {summary.contradiction_score:.2f})"
        )

    if summary.confidence > 0.0:
        # The invalidation level is 70% of the current confidence.
        conditions.append(
            f"Confidence drops below {summary.confidence * 0.7:.2f}"
        )

    if summary.material_risks:
        # Only the first listed risk is surfaced.
        conditions.append(
            f"Material risk materialises: {summary.material_risks[0]}"
        )

    return conditions


# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def evaluate_eligibility(
    summary: TrendSummary,
    config: EligibilityConfig = DEFAULT_ELIGIBILITY_CONFIG,
) -> EligibilityResult:
    """Evaluate a trend summary for recommendation eligibility.

    This is the single deterministic entry point. It:
    1. Applies gate checks (confidence, strength, contradiction, evidence)
    2. Maps trend direction + strength to an action type
    3. Determines the highest allowed execution mode
    4. Computes position sizing from portfolio rules
    5. Derives invalidation conditions

    Args:
        summary: Trend summary to evaluate.
        config: Thresholds to apply; defaults to the module-wide defaults.

    Returns:
        An EligibilityResult with the full decision trace. When any gate
        rejects the summary, ``eligible`` is False and ``mode`` is forced to
        informational; the other fields are still populated.
    """
    rejection_reasons = _check_gates(summary, config)

    # Even if rejected, we still compute action/mode for the trace.
    action = _determine_action(summary, config)
    mode = _determine_mode(summary, action, config)
    sizing = _compute_position_sizing(summary, config)
    horizon = _map_time_horizon(summary.window.value)
    # Pass the active config so the quoted thresholds match the gates above.
    invalidation = _derive_invalidation_conditions(summary, action, config)

    eligible = not rejection_reasons

    # If not eligible, force mode to informational (Requirement 7.4).
    if not eligible:
        mode = RecommendationMode.INFORMATIONAL

    return EligibilityResult(
        eligible=eligible,
        action=action,
        mode=mode,
        position_sizing=sizing,
        rejection_reasons=rejection_reasons,
        time_horizon=horizon,
        invalidation_conditions=invalidation,
    )