Celes Renata 4e010bc048
feat: signal math upgrade — probabilistic, regime-aware scoring pipeline
Implement full probabilistic signal processing pipeline gated behind
probabilistic_scoring_enabled feature flag in risk_configs:

- Bayesian log-likelihood accumulator with Beta posterior and entropy
- Regime detector (trend-following, panic, mean-reversion, uncertainty)
- Source accuracy tracker with per-source historical prediction accuracy
- Sigmoid confidence gate replacing binary gate
- Information gain surprise weighting for rare events
- Adaptive recency decay with event-specific half-lives
- Regime multiplier replacing market context multiplier
- Weighted disagreement entropy for contradiction detection
- Multiplicative macro exposure with conditional integration
- Graph-distance attenuated competitive signal propagation
- Exponentially weighted momentum with volatility scaling
- Expected value recommendation gate

All changes backward-compatible: flag=false preserves exact current behavior.
New outputs stored in existing JSONB columns (no schema changes except
source_accuracy table via migration 034).

Tests: 26 property-based tests (14 correctness properties), 99 unit tests,
1789 total tests passing with zero regressions.
2026-04-29 11:41:48 +00:00

"""Deterministic recommendation eligibility logic.

Evaluates trend summaries against configurable thresholds to decide:
- Whether a recommendation should be generated at all
- What action type (buy/sell/hold/watch) is appropriate
- What execution mode (informational/paper_eligible/live_eligible) is allowed
- Position sizing guidance based on portfolio rules

All decisions are rule-based with no model involvement. The LLM is only
used downstream for optional thesis wording (a separate task).

Requirements: 7.1, 7.2, 7.3, 7.4, 14.1, 14.2, 14.3, 14.4, 14.5, 14.6
"""
from __future__ import annotations

import math
from dataclasses import dataclass, field
from enum import Enum

from services.shared.schemas import (
    ActionType,
    PositionSizing,
    RecommendationMode,
    TrendDirection,
    TrendSummary,
)
class RejectionReason(str, Enum):
    """Why a trend summary was deemed ineligible for a recommendation."""

    LOW_CONFIDENCE = "low_confidence"
    LOW_TREND_STRENGTH = "low_trend_strength"
    HIGH_CONTRADICTION = "high_contradiction"
    INSUFFICIENT_EVIDENCE = "insufficient_evidence"
    NEUTRAL_DIRECTION = "neutral_direction"
@dataclass(frozen=True)
class EligibilityConfig:
    """Tunable thresholds for recommendation eligibility.

    All thresholds are deterministic; no model inference is involved.
    """

    # --- Gate thresholds (below these → no recommendation) ---
    min_confidence: float = 0.35
    min_trend_strength: float = 0.10
    max_contradiction_score: float = 0.60
    min_evidence_count: int = 2  # combined supporting + opposing

    # --- Action mapping thresholds ---
    # Trend strength above this → buy/sell; below → hold/watch
    action_strength_threshold: float = 0.25
    # Confidence above this → hold (rather than watch) for weak signals
    hold_confidence_threshold: float = 0.50

    # --- Mode escalation thresholds ---
    # Confidence required for paper_eligible (below → informational)
    paper_confidence_threshold: float = 0.50
    # Confidence required for live_eligible (below → paper at most)
    live_confidence_threshold: float = 0.70
    # Contradiction must be below this for live eligibility
    live_max_contradiction: float = 0.25
    # Minimum evidence count for live eligibility
    live_min_evidence: int = 5

    # --- Position sizing rules (Requirement 7.3) ---
    # Base portfolio allocation percentage
    base_portfolio_pct: float = 0.01
    # Maximum portfolio allocation percentage
    max_portfolio_pct: float = 0.10
    # Base max loss percentage
    base_max_loss_pct: float = 0.003
    # Maximum max loss percentage
    max_max_loss_pct: float = 0.02
    # Confidence scaling: higher confidence → larger position (linear)
    confidence_sizing_weight: float = 0.8
    # Contradiction penalty: higher contradiction → smaller position
    contradiction_sizing_penalty: float = 0.5

    # --- Expected value gate (Requirement 14) ---
    # EV threshold: minimum expected value to allow recommendation through
    ev_threshold: float = 0.005


DEFAULT_ELIGIBILITY_CONFIG = EligibilityConfig()
@dataclass
class EligibilityResult:
    """Output of the deterministic eligibility evaluation.

    Captures the decision, the reasoning, and all inputs used so the
    full decision trace is reproducible (Requirement 8.3).
    """

    eligible: bool
    action: ActionType
    mode: RecommendationMode
    position_sizing: PositionSizing
    rejection_reasons: list[RejectionReason] = field(default_factory=list)
    time_horizon: str = ""
    invalidation_conditions: list[str] = field(default_factory=list)
    # Probabilistic pipeline fields (Req 14.5, 16.2)
    ev_value: float | None = None
    p_bull: float | None = None
    pipeline_mode: str = "heuristic"
# ---------------------------------------------------------------------------
# Gate checks
# ---------------------------------------------------------------------------
def _check_gates(
    summary: TrendSummary,
    config: EligibilityConfig,
) -> list[RejectionReason]:
    """Apply hard gate checks. Returns a list of rejection reasons (empty = pass)."""
    reasons: list[RejectionReason] = []

    if summary.confidence < config.min_confidence:
        reasons.append(RejectionReason.LOW_CONFIDENCE)
    if summary.trend_strength < config.min_trend_strength:
        reasons.append(RejectionReason.LOW_TREND_STRENGTH)
    if summary.contradiction_score > config.max_contradiction_score:
        reasons.append(RejectionReason.HIGH_CONTRADICTION)

    evidence_count = len(summary.top_supporting_evidence) + len(summary.top_opposing_evidence)
    if evidence_count < config.min_evidence_count:
        reasons.append(RejectionReason.INSUFFICIENT_EVIDENCE)
    if summary.trend_direction == TrendDirection.NEUTRAL:
        reasons.append(RejectionReason.NEUTRAL_DIRECTION)

    return reasons
# ---------------------------------------------------------------------------
# Action mapping
# ---------------------------------------------------------------------------
def _determine_action(
    summary: TrendSummary,
    config: EligibilityConfig,
) -> ActionType:
    """Map trend direction and strength to an action type.

    Strong bullish → BUY, strong bearish → SELL.
    Weak but directional → HOLD if confidence is decent, else WATCH.
    Mixed → WATCH.
    """
    direction = summary.trend_direction
    strength = summary.trend_strength

    if direction == TrendDirection.MIXED:
        return ActionType.WATCH
    if direction == TrendDirection.NEUTRAL:
        return ActionType.WATCH

    strong_signal = strength >= config.action_strength_threshold

    if direction == TrendDirection.BULLISH:
        if strong_signal:
            return ActionType.BUY
        return ActionType.HOLD if summary.confidence >= config.hold_confidence_threshold else ActionType.WATCH

    if direction == TrendDirection.BEARISH:
        if strong_signal:
            return ActionType.SELL
        return ActionType.HOLD if summary.confidence >= config.hold_confidence_threshold else ActionType.WATCH

    return ActionType.WATCH
# ---------------------------------------------------------------------------
# Mode escalation
# ---------------------------------------------------------------------------
def _determine_mode(
    summary: TrendSummary,
    action: ActionType,
    config: EligibilityConfig,
) -> RecommendationMode:
    """Determine the highest execution mode allowed.

    WATCH and HOLD actions are always informational; they don't trigger trades.
    BUY/SELL can escalate to paper_eligible or live_eligible based on
    confidence, contradiction, and evidence thresholds.
    """
    if action in (ActionType.WATCH, ActionType.HOLD):
        return RecommendationMode.INFORMATIONAL

    evidence_count = len(summary.top_supporting_evidence) + len(summary.top_opposing_evidence)

    # Check live eligibility first (strictest)
    if (
        summary.confidence >= config.live_confidence_threshold
        and summary.contradiction_score <= config.live_max_contradiction
        and evidence_count >= config.live_min_evidence
    ):
        return RecommendationMode.LIVE_ELIGIBLE

    # Check paper eligibility
    if summary.confidence >= config.paper_confidence_threshold:
        return RecommendationMode.PAPER_ELIGIBLE

    return RecommendationMode.INFORMATIONAL
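The escalation ladder in `_determine_mode` can be exercised in isolation. What follows is a minimal sketch, not the module's code: `sketch_mode` is a hypothetical stand-in that takes plain numbers and returns strings instead of using `TrendSummary` and `RecommendationMode`, and its thresholds are copied from the `EligibilityConfig` defaults.

```python
# Hypothetical stand-in for _determine_mode. The constants mirror the
# EligibilityConfig defaults; the string return values are an assumption
# made so the sketch runs without services.shared.schemas.
LIVE_CONFIDENCE = 0.70
LIVE_MAX_CONTRADICTION = 0.25
LIVE_MIN_EVIDENCE = 5
PAPER_CONFIDENCE = 0.50


def sketch_mode(action: str, confidence: float, contradiction: float, evidence_count: int) -> str:
    """Return the highest execution mode allowed for the given inputs."""
    # WATCH and HOLD never trigger trades, so they stay informational.
    if action in ("watch", "hold"):
        return "informational"
    # Live eligibility is the strictest tier and is checked first.
    if (
        confidence >= LIVE_CONFIDENCE
        and contradiction <= LIVE_MAX_CONTRADICTION
        and evidence_count >= LIVE_MIN_EVIDENCE
    ):
        return "live_eligible"
    # Paper eligibility only needs sufficient confidence.
    if confidence >= PAPER_CONFIDENCE:
        return "paper_eligible"
    return "informational"


print(sketch_mode("buy", 0.75, 0.10, 6))   # live_eligible
print(sketch_mode("buy", 0.75, 0.40, 6))   # paper_eligible: contradiction too high for live
print(sketch_mode("sell", 0.45, 0.10, 6))  # informational: confidence below paper threshold
print(sketch_mode("hold", 0.90, 0.00, 9))  # informational: HOLD never escalates
```

Checking the strictest tier first means a signal that fails any single live criterion still falls through to the paper check rather than being rejected outright.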
# ---------------------------------------------------------------------------
# Position sizing (Requirement 7.3)
# ---------------------------------------------------------------------------
def _compute_position_sizing(
    summary: TrendSummary,
    config: EligibilityConfig,
) -> PositionSizing:
    """Compute position sizing guidance from portfolio rules and signal quality.

    Higher confidence and trend strength → larger allocation (up to max).
    Higher contradiction → smaller allocation (penalty).
    Low evidence count further reduces allocation.
    """
    # Confidence-based scaling over the full range
    confidence_factor = config.confidence_sizing_weight * summary.confidence
    # Trend strength multiplier: stronger trends justify larger positions
    strength_factor = 0.5 + 0.5 * summary.trend_strength  # range [0.5, 1.0]

    portfolio_range = config.max_portfolio_pct - config.base_portfolio_pct
    raw_portfolio = config.base_portfolio_pct + confidence_factor * strength_factor * portfolio_range

    # Apply contradiction penalty
    contradiction_penalty = config.contradiction_sizing_penalty * summary.contradiction_score
    portfolio_pct = raw_portfolio * (1.0 - contradiction_penalty)

    # Evidence count penalty: fewer sources = less confidence in sizing
    evidence_count = len(summary.top_supporting_evidence) + len(summary.top_opposing_evidence)
    if evidence_count < 3:
        portfolio_pct *= 0.5
    elif evidence_count < 5:
        portfolio_pct *= 0.75

    # Clamp to bounds
    portfolio_pct = max(config.base_portfolio_pct * 0.5, min(portfolio_pct, config.max_portfolio_pct))

    # Max loss scales similarly
    loss_range = config.max_max_loss_pct - config.base_max_loss_pct
    raw_loss = config.base_max_loss_pct + confidence_factor * strength_factor * loss_range
    max_loss_pct = raw_loss * (1.0 - contradiction_penalty)
    if evidence_count < 3:
        max_loss_pct *= 0.5
    elif evidence_count < 5:
        max_loss_pct *= 0.75
    max_loss_pct = max(config.base_max_loss_pct * 0.5, min(max_loss_pct, config.max_max_loss_pct))

    return PositionSizing(
        portfolio_pct=round(portfolio_pct, 6),
        max_loss_pct=round(max_loss_pct, 6),
    )
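To make the sizing arithmetic concrete, here is a small worked example. It is a sketch under stated assumptions: `sketch_portfolio_pct` is a hypothetical helper that repeats only the `portfolio_pct` branch of `_compute_position_sizing`, with the `EligibilityConfig` defaults hard-coded and plain numbers in place of a `TrendSummary`.

```python
def sketch_portfolio_pct(
    confidence: float,
    trend_strength: float,
    contradiction: float,
    evidence_count: int,
) -> float:
    """Recompute portfolio_pct using the default config values (assumed)."""
    base_pct, max_pct = 0.01, 0.10          # base_portfolio_pct, max_portfolio_pct
    confidence_weight = 0.8                 # confidence_sizing_weight
    contradiction_weight = 0.5              # contradiction_sizing_penalty

    confidence_factor = confidence_weight * confidence
    strength_factor = 0.5 + 0.5 * trend_strength  # range [0.5, 1.0]
    raw = base_pct + confidence_factor * strength_factor * (max_pct - base_pct)

    # Contradiction shrinks the allocation proportionally.
    pct = raw * (1.0 - contradiction_weight * contradiction)

    # Thin evidence shrinks it further.
    if evidence_count < 3:
        pct *= 0.5
    elif evidence_count < 5:
        pct *= 0.75

    # Clamp to [half the base allocation, max allocation].
    return round(max(base_pct * 0.5, min(pct, max_pct)), 6)


# confidence 0.8, strength 0.6, contradiction 0.2, four pieces of evidence:
# raw = 0.01 + 0.64 * 0.8 * 0.09 = 0.05608, then * 0.9 * 0.75
print(sketch_portfolio_pct(0.8, 0.6, 0.2, 4))  # 0.037854

# Best case: perfect confidence and strength, no contradiction, ample evidence.
print(sketch_portfolio_pct(1.0, 1.0, 0.0, 5))  # 0.082
```

With the default `confidence_sizing_weight` of 0.8, the best-case input yields 0.082, so in this sketch the upper clamp at `max_portfolio_pct` (0.10) is never the binding constraint.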
# ---------------------------------------------------------------------------
# Time horizon mapping
# ---------------------------------------------------------------------------
_WINDOW_TO_HORIZON: dict[str, str] = {
    "intraday": "intraday",
    "1d": "swing_1d_3d",
    "7d": "swing_1d_10d",
    "30d": "position_10d_30d",
    "90d": "position_30d_90d",
}


def _map_time_horizon(window: str) -> str:
    """Map a trend window to a human-readable time horizon label."""
    return _WINDOW_TO_HORIZON.get(window, f"window_{window}")
# ---------------------------------------------------------------------------
# Invalidation conditions
# ---------------------------------------------------------------------------
def _derive_invalidation_conditions(
    summary: TrendSummary,
    action: ActionType,
) -> list[str]:
    """Generate deterministic invalidation conditions for the recommendation.

    These describe when the recommendation should be considered stale or wrong.
    """
    conditions: list[str] = []

    if action == ActionType.BUY:
        conditions.append(
            f"Trend direction for {summary.entity_id} reverses to bearish"
        )
    elif action == ActionType.SELL:
        conditions.append(
            f"Trend direction for {summary.entity_id} reverses to bullish"
        )

    if summary.contradiction_score > 0.0:
        conditions.append(
            f"Contradiction score exceeds 0.60 (currently {summary.contradiction_score:.2f})"
        )
    if summary.confidence > 0.0:
        conditions.append(
            f"Confidence drops below {summary.confidence * 0.7:.2f}"
        )
    if summary.material_risks:
        conditions.append(
            f"Material risk materialises: {summary.material_risks[0]}"
        )

    return conditions
# ---------------------------------------------------------------------------
# Expected value computation (Requirements: 14.1-14.6)
# ---------------------------------------------------------------------------
# Horizon days mapping for EV computation
_EV_HORIZON_DAYS: dict[str, float] = {
    "intraday": 1.0,
    "1d": 1.0,
    "7d": 7.0,
    "30d": 30.0,
    "90d": 90.0,
}


def compute_expected_value(
    p_bull: float,
    strength: float,
    sigma_20: float,
    horizon_days: float,
) -> float:
    """Compute expected value for the recommendation gate.

    Formula:
        R_up = strength · σ_20 · √(horizon_days)
        R_down = (1 - strength) · σ_20 · √(horizon_days)
        EV = P_bull · R_up - P_bear · R_down
    where P_bear = 1 - P_bull.

    Args:
        p_bull: Bayesian bullish probability in [0, 1].
        strength: Trend strength in [0, 1].
        sigma_20: 20-day return standard deviation.
        horizon_days: Number of days for the projection horizon.

    Returns:
        Expected value (can be negative).

    Requirements: 14.1, 14.2
    """
    p_bear = 1.0 - p_bull
    sqrt_horizon = math.sqrt(max(horizon_days, 0.0))
    r_up = strength * sigma_20 * sqrt_horizon
    r_down = (1.0 - strength) * sigma_20 * sqrt_horizon
    ev = p_bull * r_up - p_bear * r_down

    # Guard against NaN/infinity from extreme inputs
    if math.isnan(ev) or math.isinf(ev):
        return 0.0
    return ev
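Because R_up and R_down share the factor σ_20 · √(horizon_days), the EV formula collapses algebraically: P_bull · strength - (1 - P_bull) · (1 - strength) = P_bull + strength - 1, so EV = σ_20 · √(horizon_days) · (P_bull + strength - 1). The sign of EV therefore depends only on whether P_bull + strength exceeds 1. The sketch below checks this numerically; `ev` restates the docstring formula and `ev_closed_form` is a hypothetical helper introduced here for comparison, neither imports the module.

```python
import math


def ev(p_bull: float, strength: float, sigma_20: float, horizon_days: float) -> float:
    """Restatement of the EV formula from the compute_expected_value docstring."""
    scale = sigma_20 * math.sqrt(max(horizon_days, 0.0))
    r_up = strength * scale
    r_down = (1.0 - strength) * scale
    return p_bull * r_up - (1.0 - p_bull) * r_down


def ev_closed_form(p_bull: float, strength: float, sigma_20: float, horizon_days: float) -> float:
    """Algebraically simplified form (hypothetical helper, not in the module)."""
    return sigma_20 * math.sqrt(max(horizon_days, 0.0)) * (p_bull + strength - 1.0)


# 7-day window, 2% 20-day sigma, P_bull = 0.7, strength = 0.6:
# EV = 0.02 * sqrt(7) * 0.3
value = ev(0.7, 0.6, 0.02, 7.0)
print(round(value, 6))  # 0.015875
print(value > 0.005)    # True: clears the default ev_threshold

# P_bull + strength < 1 gives a negative EV regardless of sigma or horizon.
print(ev(0.55, 0.4, 0.02, 7.0) < 0.0)  # True
```

One consequence worth noting: with the default `ev_threshold` of 0.005, a short horizon or a small `sigma_20` can keep EV under the threshold even when P_bull + strength is comfortably above 1.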
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def evaluate_eligibility(
    summary: TrendSummary,
    config: EligibilityConfig = DEFAULT_ELIGIBILITY_CONFIG,
    *,
    probabilistic: bool = False,
    p_bull: float | None = None,
    sigma_20: float = 0.01,
) -> EligibilityResult:
    """Evaluate a trend summary for recommendation eligibility.

    This is the single deterministic entry point. It:
    1. Applies gate checks (confidence, strength, contradiction, evidence)
    2. Maps trend direction + strength to an action type
    3. Determines the highest allowed execution mode
    4. Computes position sizing from portfolio rules
    5. Derives invalidation conditions
    6. (probabilistic) Applies EV gate: EV > threshold to proceed

    When ``probabilistic=True``:
    - Computes EV = P_bull · R_up - P_bear · R_down
    - When EV > threshold (default 0.005), allows recommendation through
    - When EV ≤ threshold, forces recommendation to informational mode
    - Populates ev_value, p_bull, pipeline_mode on result

    When ``probabilistic=False``:
    - Skips EV gate entirely (existing behavior)

    Args:
        summary: The current trend summary.
        config: Eligibility configuration thresholds.
        probabilistic: Use EV gate when True.
        p_bull: Bayesian bullish probability. The EV gate only runs when
            this is provided together with probabilistic=True; if it is
            None, the gate is skipped and ev_value stays None.
        sigma_20: 20-day return standard deviation for EV computation.

    Returns an EligibilityResult with the full decision trace.

    Requirements: 14.1, 14.2, 14.3, 14.4, 14.5, 14.6
    """
    rejection_reasons = _check_gates(summary, config)

    # Even if rejected, we still compute action/mode for the trace
    action = _determine_action(summary, config)
    mode = _determine_mode(summary, action, config)
    sizing = _compute_position_sizing(summary, config)
    horizon = _map_time_horizon(summary.window.value)
    invalidation = _derive_invalidation_conditions(summary, action)

    eligible = len(rejection_reasons) == 0

    # If not eligible, force mode to informational (Requirement 7.4)
    if not eligible:
        mode = RecommendationMode.INFORMATIONAL

    # EV gate (Requirements 14.1-14.6)
    ev_value: float | None = None
    if probabilistic and p_bull is not None:
        horizon_days = _EV_HORIZON_DAYS.get(summary.window.value, 7.0)
        ev_value = compute_expected_value(
            p_bull=p_bull,
            strength=summary.trend_strength,
            sigma_20=sigma_20,
            horizon_days=horizon_days,
        )
        if ev_value <= config.ev_threshold:
            # Force to informational mode (Req 14.4)
            mode = RecommendationMode.INFORMATIONAL

    return EligibilityResult(
        eligible=eligible,
        action=action,
        mode=mode,
        position_sizing=sizing,
        rejection_reasons=rejection_reasons,
        time_horizon=horizon,
        invalidation_conditions=invalidation,
        ev_value=ev_value,
        p_bull=p_bull if probabilistic else None,
        pipeline_mode="probabilistic" if probabilistic else "heuristic",
    )