4ffde8cc06
- Database migration 018 with 13 tables for trading engine state - Trading engine service (services/trading/) with 12 pure computation modules: position sizer, stop-loss manager, reserve pool, circuit breaker, risk tier controller, correlation matrix, tax lots, trading window, gradual entry, notifications, micro-trading, backtester - Core TradingEngine with pre-trade evaluation pipeline and integration wiring - FastAPI HTTP service with 14 endpoints (health, config, decisions, metrics, backtest) - Performance tracker with Sharpe ratio, drawdown, profit factor computation - 194 Python tests (165 property-based + 29 integration) - Frontend: 13 TanStack Query hooks, 7 dashboard panels, tabbed Trading Engine page - Helm chart entry, network policy, nginx proxy, ingress for trading-engine - Shared infrastructure: enums, Redis keys, TradingConfig in AppConfig
513 lines
18 KiB
Python
513 lines
18 KiB
Python
"""Core autonomous trading engine — decision loop and pre-trade evaluation.
|
|
|
|
Feature: autonomous-trading-engine
|
|
|
|
Coordinates all trading sub-components (PositionSizer, StopLossManager,
|
|
CircuitBreaker, ReservePoolController, RiskTierController, CorrelationMatrix)
|
|
to evaluate recommendations and produce TradingDecision records.
|
|
|
|
The ``evaluate_recommendation`` method is deliberately synchronous-compatible
|
|
so that it can be tested without real DB/Redis connections. The async
|
|
``start`` / ``stop`` methods are thin lifecycle stubs wired up in Task 25.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
from services.shared.config import TradingConfig
|
|
from services.trading.circuit_breaker import CircuitBreaker
|
|
from services.trading.correlation import CorrelationMatrix
|
|
from services.trading.micro_trading import MicroTradeConfig, MicroTradingModule
|
|
from services.trading.models import (
|
|
CircuitBreakerState,
|
|
OpenPosition,
|
|
PerformanceMetrics,
|
|
PortfolioState,
|
|
PositionSizeResult,
|
|
RiskTierConfig,
|
|
StopLevels,
|
|
StopTrigger,
|
|
TradingDecision,
|
|
)
|
|
from services.trading.notifications import NotificationRecord, NotificationService
|
|
from services.trading.position_sizer import PositionSizer
|
|
from services.trading.rebalancer import PortfolioRebalancer
|
|
from services.trading.reserve_pool import ReservePoolController
|
|
from services.trading.risk_tier_controller import RiskTierController
|
|
from services.trading.stop_loss_manager import StopLossManager
|
|
from services.trading.trading_window import is_within_trading_window
|
|
|
|
|
|
class TradingEngine:
|
|
"""Main autonomous trading engine.
|
|
|
|
Manages the decision loop, coordinates all sub-components,
|
|
and maintains runtime state.
|
|
|
|
Parameters
|
|
----------
|
|
pool:
|
|
asyncpg connection pool (used by async lifecycle methods).
|
|
redis:
|
|
Redis client (used for deduplication and pub/sub).
|
|
config:
|
|
Trading engine configuration.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
pool: object,
|
|
redis: object,
|
|
config: TradingConfig,
|
|
) -> None:
|
|
self.pool = pool
|
|
self.redis = redis
|
|
self.config = config
|
|
|
|
# Sub-components
|
|
self.position_sizer = PositionSizer()
|
|
self.stop_loss_manager = StopLossManager()
|
|
self.circuit_breaker = CircuitBreaker()
|
|
self.reserve_pool_controller = ReservePoolController(
|
|
siphon_pct=config.reserve_siphon_pct,
|
|
high_water_pct=config.reserve_high_water_pct,
|
|
)
|
|
self.risk_tier_controller = RiskTierController()
|
|
self.correlation_matrix = CorrelationMatrix()
|
|
self.notification_service = NotificationService()
|
|
self.micro_trading_module = MicroTradingModule()
|
|
self.rebalancer = PortfolioRebalancer()
|
|
|
|
# Runtime state
|
|
self.running: bool = False
|
|
self.portfolio_state: PortfolioState | None = None
|
|
self.processed_recommendation_ids: set[str] = set()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Lifecycle (stubs — wired in Task 25)
|
|
# ------------------------------------------------------------------
|
|
|
|
async def start(self) -> None:
|
|
"""Load portfolio state and enter the decision loop.
|
|
|
|
Full implementation is deferred to Task 25. This stub sets the
|
|
``running`` flag so readiness probes can report status.
|
|
"""
|
|
self.running = True
|
|
|
|
async def stop(self) -> None:
|
|
"""Graceful shutdown — cancel pending work and persist state.
|
|
|
|
Full implementation is deferred to Task 25.
|
|
"""
|
|
self.running = False
|
|
|
|
# ------------------------------------------------------------------
|
|
# Core evaluation logic (synchronous-compatible for testing)
|
|
# ------------------------------------------------------------------
|
|
|
|
def evaluate_recommendation(
|
|
self,
|
|
rec: dict,
|
|
portfolio_state: PortfolioState,
|
|
risk_tier: RiskTierConfig,
|
|
circuit_breaker_state: CircuitBreakerState,
|
|
correlation_matrix: CorrelationMatrix,
|
|
earnings_calendar: dict,
|
|
now: datetime | None = None,
|
|
) -> TradingDecision:
|
|
"""Run all pre-trade checks and produce a TradingDecision.
|
|
|
|
The checks are applied in order; the first failure short-circuits
|
|
with a ``skip`` decision. If all checks pass the PositionSizer
|
|
is invoked and its result determines the final decision.
|
|
|
|
Parameters
|
|
----------
|
|
rec:
|
|
Recommendation dict with at least ``recommendation_id``,
|
|
``ticker``, ``confidence``, ``sector``, ``current_price``,
|
|
and ``action``.
|
|
portfolio_state:
|
|
Current portfolio snapshot.
|
|
risk_tier:
|
|
Active risk tier configuration.
|
|
circuit_breaker_state:
|
|
Current circuit breaker state.
|
|
correlation_matrix:
|
|
Correlation matrix instance for diversification checks.
|
|
earnings_calendar:
|
|
Mapping of ticker → next earnings datetime.
|
|
now:
|
|
Optional override for the current timestamp (for testing).
|
|
"""
|
|
now = now or datetime.now(tz=timezone.utc)
|
|
|
|
rec_id = rec.get("recommendation_id")
|
|
ticker = rec.get("ticker", "")
|
|
confidence = rec.get("confidence", 0.0)
|
|
sector = rec.get("sector", "")
|
|
current_price = rec.get("current_price", 0.0)
|
|
|
|
reasoning: list[str] = []
|
|
|
|
# --- a. Circuit breaker active? --------------------------------
|
|
if self.circuit_breaker.is_active(circuit_breaker_state, now=now):
|
|
reasoning.append("Circuit breaker is active")
|
|
return self._skip_decision(
|
|
rec_id=rec_id,
|
|
ticker=ticker,
|
|
skip_reason="circuit_breaker_active",
|
|
risk_tier=risk_tier,
|
|
portfolio_state=portfolio_state,
|
|
circuit_breaker_state=circuit_breaker_state,
|
|
reasoning=reasoning,
|
|
now=now,
|
|
)
|
|
|
|
# --- b. Trading window? ----------------------------------------
|
|
if not is_within_trading_window(now):
|
|
reasoning.append("Outside trading window")
|
|
return self._skip_decision(
|
|
rec_id=rec_id,
|
|
ticker=ticker,
|
|
skip_reason="outside_trading_window",
|
|
risk_tier=risk_tier,
|
|
portfolio_state=portfolio_state,
|
|
circuit_breaker_state=circuit_breaker_state,
|
|
reasoning=reasoning,
|
|
now=now,
|
|
)
|
|
|
|
# --- c. Confidence gate (early check before sizer) -------------
|
|
if confidence < risk_tier.min_confidence:
|
|
reasoning.append(
|
|
f"Confidence {confidence:.4f} below tier minimum "
|
|
f"{risk_tier.min_confidence}"
|
|
)
|
|
return self._skip_decision(
|
|
rec_id=rec_id,
|
|
ticker=ticker,
|
|
skip_reason="insufficient_confidence",
|
|
risk_tier=risk_tier,
|
|
portfolio_state=portfolio_state,
|
|
circuit_breaker_state=circuit_breaker_state,
|
|
reasoning=reasoning,
|
|
now=now,
|
|
)
|
|
|
|
# --- d. Deduplication check ------------------------------------
|
|
if rec_id and rec_id in self.processed_recommendation_ids:
|
|
reasoning.append(f"Recommendation {rec_id} already processed")
|
|
return self._skip_decision(
|
|
rec_id=rec_id,
|
|
ticker=ticker,
|
|
skip_reason="duplicate_recommendation",
|
|
risk_tier=risk_tier,
|
|
portfolio_state=portfolio_state,
|
|
circuit_breaker_state=circuit_breaker_state,
|
|
reasoning=reasoning,
|
|
now=now,
|
|
)
|
|
|
|
# --- e. Multiple declining positions check ---------------------
|
|
if self.check_declining_positions(portfolio_state.positions):
|
|
reasoning.append(
|
|
"Multiple declining positions — halting new entries"
|
|
)
|
|
return self._skip_decision(
|
|
rec_id=rec_id,
|
|
ticker=ticker,
|
|
skip_reason="multiple_declining_positions",
|
|
risk_tier=risk_tier,
|
|
portfolio_state=portfolio_state,
|
|
circuit_breaker_state=circuit_breaker_state,
|
|
reasoning=reasoning,
|
|
now=now,
|
|
)
|
|
|
|
# --- f. Max open positions check -------------------------------
|
|
max_positions = self.config.max_open_positions if hasattr(self.config, "max_open_positions") else 10
|
|
if self.check_max_positions(
|
|
portfolio_state.open_position_count, max_positions
|
|
):
|
|
reasoning.append(
|
|
f"At max open positions ({portfolio_state.open_position_count}/"
|
|
f"{max_positions})"
|
|
)
|
|
return self._skip_decision(
|
|
rec_id=rec_id,
|
|
ticker=ticker,
|
|
skip_reason="max_positions_reached",
|
|
risk_tier=risk_tier,
|
|
portfolio_state=portfolio_state,
|
|
circuit_breaker_state=circuit_breaker_state,
|
|
reasoning=reasoning,
|
|
now=now,
|
|
)
|
|
|
|
# --- g. Position sizing ----------------------------------------
|
|
reasoning.append("All pre-trade checks passed — computing position size")
|
|
|
|
# Build the raw correlation dict expected by PositionSizer
|
|
corr_dict: dict[tuple[str, str], float] = correlation_matrix._data
|
|
|
|
size_result: PositionSizeResult = self.position_sizer.compute(
|
|
confidence=confidence,
|
|
ticker=ticker,
|
|
sector=sector,
|
|
current_price=current_price,
|
|
active_pool=portfolio_state.active_pool,
|
|
risk_tier=risk_tier,
|
|
portfolio_state=portfolio_state,
|
|
correlation_matrix=corr_dict,
|
|
earnings_calendar=earnings_calendar,
|
|
absolute_position_cap=self.config.absolute_position_cap,
|
|
active_pool_minimum=self.config.active_pool_minimum,
|
|
)
|
|
|
|
reasoning.extend(size_result.adjustments)
|
|
|
|
if size_result.rejected:
|
|
reasoning.append(f"Position sizer rejected: {size_result.rejection_reason}")
|
|
return self._skip_decision(
|
|
rec_id=rec_id,
|
|
ticker=ticker,
|
|
skip_reason=f"position_sizer_rejected: {size_result.rejection_reason}",
|
|
risk_tier=risk_tier,
|
|
portfolio_state=portfolio_state,
|
|
circuit_breaker_state=circuit_breaker_state,
|
|
reasoning=reasoning,
|
|
now=now,
|
|
)
|
|
|
|
# Mark recommendation as processed
|
|
if rec_id:
|
|
self.processed_recommendation_ids.add(rec_id)
|
|
|
|
return self._act_decision(
|
|
rec_id=rec_id,
|
|
ticker=ticker,
|
|
size_result=size_result,
|
|
risk_tier=risk_tier,
|
|
portfolio_state=portfolio_state,
|
|
circuit_breaker_state=circuit_breaker_state,
|
|
reasoning=reasoning,
|
|
now=now,
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Helper checks
|
|
# ------------------------------------------------------------------
|
|
|
|
def check_declining_positions(
|
|
self,
|
|
positions: list[OpenPosition],
|
|
threshold_pct: float = 0.50,
|
|
decline_pct: float = 0.02,
|
|
) -> bool:
|
|
"""Return True if > threshold_pct of positions have > decline_pct negative unrealized P&L.
|
|
|
|
A position is considered "declining" when its unrealized P&L as a
|
|
fraction of its entry value is worse than ``-decline_pct``.
|
|
|
|
Parameters
|
|
----------
|
|
positions:
|
|
List of currently open positions.
|
|
threshold_pct:
|
|
Fraction of positions that must be declining to trigger
|
|
(default 0.50 = 50%).
|
|
decline_pct:
|
|
Minimum loss fraction to count as declining
|
|
(default 0.02 = 2%).
|
|
"""
|
|
if not positions:
|
|
return False
|
|
|
|
declining_count = 0
|
|
for pos in positions:
|
|
entry_value = pos.entry_price * pos.quantity
|
|
if entry_value <= 0:
|
|
continue
|
|
loss_pct = -pos.unrealized_pnl / entry_value
|
|
if loss_pct > decline_pct:
|
|
declining_count += 1
|
|
|
|
return declining_count > threshold_pct * len(positions)
|
|
|
|
def check_max_positions(
|
|
self,
|
|
open_count: int,
|
|
max_positions: int = 10,
|
|
) -> bool:
|
|
"""Return True if the portfolio is at the maximum number of open positions."""
|
|
return open_count >= max_positions
|
|
|
|
# ------------------------------------------------------------------
|
|
# Integration wiring — thin wrappers for the decision loop
|
|
# ------------------------------------------------------------------
|
|
|
|
def check_stop_loss_crossings(
|
|
self,
|
|
positions: list[OpenPosition],
|
|
prices: dict[str, float],
|
|
stop_levels: dict[str, StopLevels],
|
|
) -> list[StopTrigger]:
|
|
"""Delegate to StopLossManager.check_price_crossings().
|
|
|
|
Called by the async decision loop at the configured interval
|
|
(5 min default, 60s during high-severity events).
|
|
"""
|
|
return self.stop_loss_manager.check_price_crossings(
|
|
positions, prices, stop_levels
|
|
)
|
|
|
|
def handle_position_close(
|
|
self,
|
|
realized_profit: float,
|
|
reserve_balance: float,
|
|
) -> tuple[float, float]:
|
|
"""Delegate to ReservePoolController.siphon_profit().
|
|
|
|
Called when a position close event is detected from broker
|
|
service fill events.
|
|
"""
|
|
return self.reserve_pool_controller.siphon_profit(
|
|
realized_profit, reserve_balance
|
|
)
|
|
|
|
def evaluate_risk_tier(
|
|
self,
|
|
current_tier: str,
|
|
metrics: PerformanceMetrics,
|
|
reserve_pct: float,
|
|
) -> str | None:
|
|
"""Delegate to RiskTierController.evaluate().
|
|
|
|
Scheduled to run at daily market close.
|
|
"""
|
|
return self.risk_tier_controller.evaluate(
|
|
current_tier, metrics, reserve_pct
|
|
)
|
|
|
|
def evaluate_rebalancing(
|
|
self,
|
|
positions: list[OpenPosition],
|
|
risk_tier: RiskTierConfig,
|
|
active_pool: float,
|
|
) -> list:
|
|
"""Delegate to PortfolioRebalancer.evaluate().
|
|
|
|
Scheduled to run weekly at Monday market open.
|
|
"""
|
|
return self.rebalancer.evaluate(positions, risk_tier, active_pool)
|
|
|
|
def create_alert(
|
|
self,
|
|
event_type: str,
|
|
details: str,
|
|
) -> NotificationRecord:
|
|
"""Create a notification record for a critical event.
|
|
|
|
Delegates formatting and record creation to NotificationService.
|
|
The caller is responsible for actual delivery.
|
|
"""
|
|
message = self.notification_service.format_alert(event_type, details)
|
|
return self.notification_service.create_notification(
|
|
channel="email",
|
|
event_type=event_type,
|
|
message=message,
|
|
)
|
|
|
|
def check_micro_trade_constraints(
|
|
self,
|
|
daily_count: int,
|
|
is_within_window: bool,
|
|
circuit_breaker_active: bool,
|
|
portfolio_heat_pct: float,
|
|
max_heat: float,
|
|
) -> tuple[bool, str]:
|
|
"""Delegate to MicroTradingModule.check_constraints().
|
|
|
|
Called by the decision loop when micro-trading is enabled.
|
|
"""
|
|
micro_config = MicroTradeConfig(
|
|
enabled=self.config.micro_trading_enabled,
|
|
allocation_cap_pct=self.config.micro_trading_allocation_cap_pct,
|
|
max_daily=self.config.micro_trading_max_daily,
|
|
max_hold_minutes=self.config.micro_trading_max_hold_minutes,
|
|
)
|
|
return self.micro_trading_module.check_constraints(
|
|
config=micro_config,
|
|
daily_count=daily_count,
|
|
is_within_window=is_within_window,
|
|
circuit_breaker_active=circuit_breaker_active,
|
|
portfolio_heat_pct=portfolio_heat_pct,
|
|
max_heat=max_heat,
|
|
)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Decision builders
|
|
# ------------------------------------------------------------------
|
|
|
|
def _skip_decision(
|
|
self,
|
|
*,
|
|
rec_id: str | None,
|
|
ticker: str,
|
|
skip_reason: str,
|
|
risk_tier: RiskTierConfig,
|
|
portfolio_state: PortfolioState,
|
|
circuit_breaker_state: CircuitBreakerState,
|
|
reasoning: list[str],
|
|
now: datetime,
|
|
) -> TradingDecision:
|
|
return TradingDecision(
|
|
id=str(uuid.uuid4()),
|
|
recommendation_id=rec_id,
|
|
decision="skip",
|
|
skip_reason=skip_reason,
|
|
ticker=ticker,
|
|
computed_position_size=None,
|
|
computed_share_quantity=None,
|
|
risk_tier_at_decision=risk_tier.name,
|
|
portfolio_heat_at_decision=portfolio_state.portfolio_heat,
|
|
active_pool_at_decision=portfolio_state.active_pool,
|
|
reserve_pool_at_decision=portfolio_state.reserve_pool,
|
|
circuit_breaker_status="active" if circuit_breaker_state.active else "inactive",
|
|
decision_trace={"reasoning": reasoning},
|
|
created_at=now,
|
|
)
|
|
|
|
def _act_decision(
|
|
self,
|
|
*,
|
|
rec_id: str | None,
|
|
ticker: str,
|
|
size_result: PositionSizeResult,
|
|
risk_tier: RiskTierConfig,
|
|
portfolio_state: PortfolioState,
|
|
circuit_breaker_state: CircuitBreakerState,
|
|
reasoning: list[str],
|
|
now: datetime,
|
|
) -> TradingDecision:
|
|
return TradingDecision(
|
|
id=str(uuid.uuid4()),
|
|
recommendation_id=rec_id,
|
|
decision="act",
|
|
skip_reason=None,
|
|
ticker=ticker,
|
|
computed_position_size=size_result.dollar_amount,
|
|
computed_share_quantity=size_result.share_quantity,
|
|
risk_tier_at_decision=risk_tier.name,
|
|
portfolio_heat_at_decision=portfolio_state.portfolio_heat,
|
|
active_pool_at_decision=portfolio_state.active_pool,
|
|
reserve_pool_at_decision=portfolio_state.reserve_pool,
|
|
circuit_breaker_status="active" if circuit_breaker_state.active else "inactive",
|
|
decision_trace={"reasoning": reasoning},
|
|
created_at=now,
|
|
)
|