Files
stonks-oracle/services/trading/circuit_breaker.py
T
Celes Renata 4ffde8cc06 feat: autonomous trading engine — full implementation
- Database migration 018 with 13 tables for trading engine state
- Trading engine service (services/trading/) with 12 pure computation modules:
  position sizer, stop-loss manager, reserve pool, circuit breaker,
  risk tier controller, correlation matrix, tax lots, trading window,
  gradual entry, notifications, micro-trading, backtester
- Core TradingEngine with pre-trade evaluation pipeline and integration wiring
- FastAPI HTTP service with 14 endpoints (health, config, decisions, metrics, backtest)
- Performance tracker with Sharpe ratio, drawdown, profit factor computation
- 194 Python tests (165 property-based + 29 integration)
- Frontend: 13 TanStack Query hooks, 7 dashboard panels, tabbed Trading Engine page
- Helm chart entry, network policy, nginx proxy, ingress for trading-engine
- Shared infrastructure: enums, Redis keys, TradingConfig in AppConfig
2026-04-15 16:12:22 +00:00

153 lines
5.7 KiB
Python

"""Circuit breaker safety mechanism for the autonomous trading engine.
Pure computation module — no DB or Redis access. State management and
persistence are handled by the caller. All methods operate on values
passed in as arguments and return deterministic results.
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from services.trading.models import CircuitBreakerState
class CircuitBreaker:
    """Evaluate circuit breaker conditions and compute cooldown expiries.

    The instance only stores the configured thresholds; every method works
    on the values passed in and performs no I/O. The sole non-argument
    input is the current UTC time, which the cooldown helpers read when the
    caller does not supply ``now``.

    Parameters
    ----------
    daily_loss_pct:
        Maximum allowed daily portfolio loss as a fraction (e.g. 0.05 = 5%).
    single_position_loss_pct:
        Maximum allowed loss on a single position as a fraction of entry value.
    ticker_cooldown_hours:
        Hours a ticker is blocked from re-entry after a single-position breach.
    volatility_pause_hours:
        Hours trading is paused after a volatility (stop-loss cluster) trigger.
    stop_loss_hits_threshold:
        Number of stop-loss hits within the window that triggers a volatility
        pause. Values below 1 are treated as 1 by :meth:`check_volatility`.
    stop_loss_window_minutes:
        Rolling window (in minutes) for counting clustered stop-loss hits.
    """

    def __init__(
        self,
        daily_loss_pct: float = 0.05,
        single_position_loss_pct: float = 0.15,
        ticker_cooldown_hours: int = 48,
        volatility_pause_hours: int = 2,
        stop_loss_hits_threshold: int = 3,
        stop_loss_window_minutes: int = 30,
    ) -> None:
        self.daily_loss_pct = daily_loss_pct
        self.single_position_loss_pct = single_position_loss_pct
        self.ticker_cooldown_hours = ticker_cooldown_hours
        self.volatility_pause_hours = volatility_pause_hours
        self.stop_loss_hits_threshold = stop_loss_hits_threshold
        self.stop_loss_window_minutes = stop_loss_window_minutes

    # ------------------------------------------------------------------
    # Trigger checks
    # ------------------------------------------------------------------
    def check_daily_loss(self, daily_pnl: float, portfolio_value: float) -> bool:
        """Return True when today's loss exceeds *daily_loss_pct*.

        Parameters
        ----------
        daily_pnl:
            Today's profit/loss — negative values represent losses.
        portfolio_value:
            Current total portfolio value (must be positive).

        Returns
        -------
        bool
            True when ``daily_pnl`` is negative and its magnitude divided by
            ``portfolio_value`` is strictly greater than ``daily_loss_pct``.
            Also True whenever ``portfolio_value`` is zero or negative.
        """
        if portfolio_value <= 0:
            # Degenerate case: no meaningful ratio exists, so fail safe and
            # report the breaker as triggered.
            return True
        loss_ratio = abs(daily_pnl) / portfolio_value
        return daily_pnl < 0 and loss_ratio > self.daily_loss_pct

    def check_single_position(self, position_loss_pct: float) -> bool:
        """Return True when a single position has lost more than the threshold.

        Parameters
        ----------
        position_loss_pct:
            Loss expressed as a positive fraction of entry value
            (e.g. 0.15 means the position is down 15%).

        Returns
        -------
        bool
            True when the loss is strictly greater than
            ``single_position_loss_pct``.
        """
        return position_loss_pct > self.single_position_loss_pct

    def check_volatility(self, stop_loss_hits: list[datetime]) -> bool:
        """Return True when too many stop-losses fired within the rolling window.

        Parameters
        ----------
        stop_loss_hits:
            Timestamps of recent stop-loss trigger events, in any order.

        Returns
        -------
        bool
            True when some group of ``stop_loss_hits_threshold`` hits spans
            at most ``stop_loss_window_minutes`` (boundary inclusive).
            An empty list always yields False.
        """
        # A threshold below 1 cannot describe a cluster and would index
        # outside the list; require at least one hit instead.
        required = max(1, self.stop_loss_hits_threshold)
        if len(stop_loss_hits) < required:
            return False

        window = timedelta(minutes=self.stop_loss_window_minutes)
        ordered = sorted(stop_loss_hits)
        # Once sorted, the tightest group of `required` hits is always a
        # contiguous run, so pairing each hit with the one `required - 1`
        # positions later covers every candidate cluster.
        return any(
            last - first <= window
            for first, last in zip(ordered, ordered[required - 1:])
        )

    # ------------------------------------------------------------------
    # Cooldown helpers
    # ------------------------------------------------------------------
    def is_ticker_cooled_down(
        self,
        ticker: str,
        ticker_cooldowns: dict[str, datetime],
        now: datetime | None = None,
    ) -> bool:
        """Return True if *ticker* is still in its cooldown period.

        Note the polarity: True means the ticker is still blocked. Returns
        False when the cooldown has expired (``now`` is at or past the stored
        expiry) or the ticker has no entry in *ticker_cooldowns*.

        Parameters
        ----------
        ticker:
            Symbol to look up.
        ticker_cooldowns:
            Mapping of ticker to the datetime at which its cooldown ends.
        now:
            Reference time; defaults to the current UTC time.
        """
        if ticker not in ticker_cooldowns:
            return False
        if now is None:
            now = datetime.now(tz=timezone.utc)
        return now < ticker_cooldowns[ticker]

    def is_active(
        self,
        state: CircuitBreakerState,
        now: datetime | None = None,
    ) -> bool:
        """Return True if the circuit breaker in *state* is currently active.

        Parameters
        ----------
        state:
            Object exposing ``active`` and ``cooldown_expires``.
        now:
            Reference time; defaults to the current UTC time.

        Returns
        -------
        bool
            False when ``state.active`` is falsy or the cooldown has expired;
            True when active with no expiry or with an expiry still ahead of
            ``now``.
        """
        if not state.active:
            return False
        if state.cooldown_expires is None:
            # Active with no expiry — stays active until the caller clears it.
            return True
        if now is None:
            now = datetime.now(tz=timezone.utc)
        return now < state.cooldown_expires

    def compute_cooldown_expiry(
        self,
        trigger_type: str,
        triggered_at: datetime,
    ) -> datetime:
        """Compute when the cooldown for *trigger_type* expires.

        * ``single_position`` → *triggered_at* + *ticker_cooldown_hours*
        * ``daily_loss`` / ``volatility`` / anything else →
          *triggered_at* + *volatility_pause_hours*
        """
        if trigger_type == "single_position":
            return triggered_at + timedelta(hours=self.ticker_cooldown_hours)
        # daily_loss, volatility, and any unrecognised type share the
        # volatility pause duration.
        return triggered_at + timedelta(hours=self.volatility_pause_hours)