"""Regime detector for market regime classification. Classifies the current market regime for each ticker based on EMA trend indicators and volatility ratios. Adjusts scoring thresholds and contradiction penalties per regime. Requirements: 7.1, 7.2, 7.3, 7.4, 7.5, 7.6, 7.7, 7.9 """ from __future__ import annotations import math import statistics from dataclasses import dataclass from enum import Enum class MarketRegime(str, Enum): """Market regime classification categories.""" TREND_FOLLOWING = "trend_following" PANIC = "panic" MEAN_REVERSION = "mean_reversion" UNCERTAINTY = "uncertainty" @dataclass(frozen=True) class RegimeClassification: """Result of regime detection for a ticker.""" regime: MarketRegime trend_indicator: float # R = sign(EMA_20 - EMA_100) volatility_ratio: float # V_r = σ_20 / σ_100 bullish_threshold: float # Adjusted ±threshold for direction bearish_threshold: float contradiction_penalty_multiplier: float # 0.4 default, 0.6 for uncertainty @dataclass(frozen=True) class RegimeConfig: """Configuration parameters for regime detection.""" ema_short_period: int = 20 ema_long_period: int = 100 vol_short_period: int = 20 vol_long_period: int = 100 panic_vol_ratio: float = 1.5 trend_vol_ratio: float = 1.2 mean_reversion_vol_ratio: float = 1.0 default_threshold: float = 0.15 panic_threshold: float = 0.10 mean_reversion_threshold: float = 0.20 uncertainty_contradiction_multiplier: float = 0.6 # Default uncertainty classification used when data is insufficient _DEFAULT_UNCERTAINTY = RegimeClassification( regime=MarketRegime.UNCERTAINTY, trend_indicator=0.0, volatility_ratio=1.0, bullish_threshold=0.15, bearish_threshold=-0.15, contradiction_penalty_multiplier=0.6, ) def compute_ema(values: list[float], period: int) -> float: """Compute exponential moving average over the last ``period`` values. Uses the standard EMA formula with multiplier = 2 / (period + 1). Iterates through the values, seeding the EMA with the first value. Raises ``ValueError`` when *values* is empty or *period* < 1. """ if not values or period < 1: raise ValueError("values must be non-empty and period must be >= 1") # Use only the last `period` values (or all if fewer) data = values[-period:] if len(values) >= period else values multiplier = 2.0 / (period + 1) ema = data[0] for value in data[1:]: ema = (value - ema) * multiplier + ema return ema def _sign(x: float) -> float: """Return -1.0, 0.0, or 1.0 for the sign of *x*.""" if x > 0.0: return 1.0 if x < 0.0: return -1.0 return 0.0 def classify_regime( closing_prices: list[float], returns: list[float], config: RegimeConfig = RegimeConfig(), ) -> RegimeClassification: """Classify market regime from price and return history. Requires at least ``config.ema_long_period`` days of price history for EMA_100. Falls back to UNCERTAINTY when data is insufficient or standard deviations are zero. Requirements: 7.1, 7.2, 7.3, 7.4, 7.5, 7.6, 7.7, 7.9 """ # Insufficient price data → uncertainty if len(closing_prices) < config.ema_long_period: return _DEFAULT_UNCERTAINTY # Insufficient return data → uncertainty if len(returns) < config.vol_long_period: return _DEFAULT_UNCERTAINTY # --- Trend indicator: R = sign(EMA_short - EMA_long) --- ema_short = compute_ema(closing_prices, config.ema_short_period) ema_long = compute_ema(closing_prices, config.ema_long_period) trend_indicator = _sign(ema_short - ema_long) # --- Volatility ratio: V_r = σ_short / σ_long --- short_returns = returns[-config.vol_short_period:] long_returns = returns[-config.vol_long_period:] # Guard against zero or near-zero standard deviations if len(short_returns) < 2 or len(long_returns) < 2: return _DEFAULT_UNCERTAINTY sigma_short = statistics.stdev(short_returns) sigma_long = statistics.stdev(long_returns) if sigma_long == 0.0 or sigma_short == 0.0: return _DEFAULT_UNCERTAINTY if math.isnan(sigma_short) or math.isnan(sigma_long): return _DEFAULT_UNCERTAINTY volatility_ratio = sigma_short / sigma_long # --- Classification rules (Req 7.3) --- # Panic takes priority: V_r > 1.5 if volatility_ratio > config.panic_vol_ratio: regime = MarketRegime.PANIC threshold = config.panic_threshold # ±0.10 contradiction_mult = 0.4 # Trend-following: R ≠ 0 AND V_r < 1.2 elif trend_indicator != 0.0 and volatility_ratio < config.trend_vol_ratio: regime = MarketRegime.TREND_FOLLOWING threshold = config.default_threshold # ±0.15 contradiction_mult = 0.4 # Mean-reversion: R = 0 AND V_r < 1.0 elif trend_indicator == 0.0 and volatility_ratio < config.mean_reversion_vol_ratio: regime = MarketRegime.MEAN_REVERSION threshold = config.mean_reversion_threshold # ±0.20 contradiction_mult = 0.4 # Uncertainty: all other cases else: regime = MarketRegime.UNCERTAINTY threshold = config.default_threshold # ±0.15 contradiction_mult = config.uncertainty_contradiction_multiplier # 0.6 return RegimeClassification( regime=regime, trend_indicator=trend_indicator, volatility_ratio=volatility_ratio, bullish_threshold=threshold, bearish_threshold=-threshold, contradiction_penalty_multiplier=contradiction_mult, )