1902 lines
79 KiB
Python
1902 lines
79 KiB
Python
"""Core autonomous trading engine — decision loop and pre-trade evaluation.
|
|
|
|
Feature: autonomous-trading-engine
|
|
|
|
Coordinates all trading sub-components (PositionSizer, StopLossManager,
|
|
CircuitBreaker, ReservePoolController, RiskTierController, CorrelationMatrix)
|
|
to evaluate recommendations and produce TradingDecision records.
|
|
|
|
The ``evaluate_recommendation`` method is deliberately synchronous-compatible
|
|
so that it can be tested without real DB/Redis connections. The async
|
|
``start`` / ``stop`` methods manage the live decision loop, stop-loss
|
|
monitor, and performance metrics loop.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import httpx
|
|
|
|
try:
|
|
import numpy as np # noqa: F401
|
|
|
|
_HAS_NUMPY = True
|
|
except ImportError:
|
|
_HAS_NUMPY = False
|
|
|
|
from services.shared.config import TradingConfig
|
|
from services.shared.redis_keys import (
|
|
QUEUE_BROKER,
|
|
queue_key,
|
|
trading_dedupe_key,
|
|
)
|
|
from services.trading.circuit_breaker import CircuitBreaker
|
|
from services.trading.correlation import CorrelationMatrix
|
|
from services.trading.micro_trading import MicroTradeConfig, MicroTradingModule
|
|
from services.trading.models import (
|
|
RISK_TIER_DEFAULTS,
|
|
CircuitBreakerState,
|
|
OpenPosition,
|
|
PerformanceMetrics,
|
|
PortfolioState,
|
|
PositionSizeResult,
|
|
RiskTierConfig,
|
|
StopLevels,
|
|
StopTrigger,
|
|
TradingDecision,
|
|
)
|
|
from services.trading.notifications import NotificationRecord, NotificationService
|
|
from services.trading.performance_tracker import PerformanceComputer
|
|
from services.trading.position_sizer import PositionSizer
|
|
from services.trading.rebalancer import PortfolioRebalancer
|
|
from services.trading.reserve_pool import ReservePoolController
|
|
from services.trading.risk_tier_controller import RiskTierController
|
|
from services.trading.stop_loss_manager import StopLossManager
|
|
from services.trading.trading_window import is_market_open, is_within_trading_window
|
|
|
|
logger = logging.getLogger("trading_engine")
|
|
|
|
|
|
class TradingEngine:
|
|
"""Main autonomous trading engine.
|
|
|
|
Manages the decision loop, coordinates all sub-components,
|
|
and maintains runtime state.
|
|
|
|
Parameters
|
|
----------
|
|
pool:
|
|
asyncpg connection pool (used by async lifecycle methods).
|
|
redis:
|
|
Redis client (used for deduplication and pub/sub).
|
|
config:
|
|
Trading engine configuration.
|
|
"""
|
|
|
|
def __init__(
    self,
    pool: object,
    redis: object,
    config: TradingConfig,
) -> None:
    """Store the injected handles, build sub-components, reset runtime state.

    Parameters
    ----------
    pool:
        Database connection pool handle; stored unchanged on ``self.pool``.
    redis:
        Redis client handle; stored unchanged on ``self.redis``.
    config:
        Trading engine configuration. ``reserve_siphon_pct``,
        ``reserve_high_water_pct`` and ``risk_tier`` are read here.
    """
    # Injected dependencies
    self.pool = pool
    self.redis = redis
    self.config = config

    # Sub-components (only the reserve pool controller takes config values)
    self.position_sizer = PositionSizer()
    self.stop_loss_manager = StopLossManager()
    self.circuit_breaker = CircuitBreaker()
    self.reserve_pool_controller = ReservePoolController(
        siphon_pct=config.reserve_siphon_pct,
        high_water_pct=config.reserve_high_water_pct,
    )
    self.risk_tier_controller = RiskTierController()
    self.correlation_matrix = CorrelationMatrix()
    self.notification_service = NotificationService()
    self.micro_trading_module = MicroTradingModule()
    self.rebalancer = PortfolioRebalancer()
    self.performance_computer = PerformanceComputer()

    # Runtime state
    self.running: bool = False
    self.portfolio_state: PortfolioState | None = None
    self.processed_recommendation_ids: set[str] = set()

    # Handles of the spawned worker loops (Task 27.6)
    self._tasks: list[asyncio.Task] = []  # type: ignore[type-arg]

    # Active risk tier: the configured tier name, or "moderate" when the
    # configured name has no entry in the defaults table.
    self._active_risk_tier: RiskTierConfig = RISK_TIER_DEFAULTS.get(
        config.risk_tier,
        RISK_TIER_DEFAULTS["moderate"],
    )

    # Circuit breaker runtime state
    self._cb_state: CircuitBreakerState = CircuitBreakerState()

    # Earnings calendar cache
    self._earnings_calendar: dict = {}

    # Poll cursor starts 24 h in the past so the first poll can pick up
    # recommendations generated before the engine came up.
    first_poll_lookback = timedelta(hours=24)
    self._last_poll_timestamp: datetime = (
        datetime.now(tz=timezone.utc) - first_poll_lookback
    )

    # Per-ticker timestamp of the last successful price fetch, consulted
    # by the safety-sell logic (Task 28.4)
    self._last_price_timestamps: dict[str, datetime] = {}
|
|
|
|
# ------------------------------------------------------------------
|
|
# Lifecycle
|
|
# ------------------------------------------------------------------
|
|
|
|
async def start(self) -> None:
    """Load portfolio state and spawn the async worker loops.

    When ``self.pool`` is ``None`` (unit-test / lightweight mode) the
    engine skips database loading and starts with an empty portfolio.
    A failure while loading state is logged and the engine continues
    with defaults rather than refusing to start.

    Calling ``start`` while the engine is already running is a no-op:
    spawning a second set of workers would overwrite ``self._tasks``,
    leaving the first set running with no handle for ``stop`` to cancel.
    """
    if self.running:
        logger.warning("Trading engine already running — ignoring duplicate start()")
        return

    # --- Load initial state from PostgreSQL (graceful degradation) ---
    if self.pool is not None:
        try:
            await self._load_initial_state()
        except Exception:
            logger.exception("Failed to load initial state from DB — starting with defaults")

    # Ensure we always have a portfolio state
    if self.portfolio_state is None:
        self.portfolio_state = PortfolioState()

    # Must be set before the workers are created: each loop runs
    # ``while self.running``.
    self.running = True

    # Spawn async worker loops
    self._tasks = [
        asyncio.create_task(self._decision_loop(), name="decision_loop"),
        asyncio.create_task(self._stop_loss_monitor(), name="stop_loss_monitor"),
        asyncio.create_task(self._performance_loop(), name="performance_loop"),
        asyncio.create_task(self._risk_tier_scheduler(), name="risk_tier_scheduler"),
        asyncio.create_task(self._rebalance_scheduler(), name="rebalance_scheduler"),
    ]
    logger.info("Trading engine started with %d worker tasks", len(self._tasks))
|
|
|
|
async def stop(self) -> None:
    """Shut the engine down by cancelling and draining every worker task.

    Clears the ``running`` flag, requests cancellation of each task in
    ``self._tasks``, waits for all of them to finish (exceptions and
    cancellations are collected, not raised), then empties the task list.
    No state is written anywhere by this method itself.
    """
    self.running = False

    # Request cancellation first, then wait, so all workers unwind
    # concurrently instead of one after another.
    for worker in self._tasks:
        worker.cancel()

    if len(self._tasks) > 0:
        await asyncio.gather(*self._tasks, return_exceptions=True)

    self._tasks.clear()
    logger.info("Trading engine stopped")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Core evaluation logic (synchronous-compatible for testing)
|
|
# ------------------------------------------------------------------
|
|
|
|
def evaluate_recommendation(
    self,
    rec: dict,
    portfolio_state: PortfolioState,
    risk_tier: RiskTierConfig,
    circuit_breaker_state: CircuitBreakerState,
    correlation_matrix: CorrelationMatrix,
    earnings_calendar: dict,
    now: datetime | None = None,
) -> TradingDecision:
    """Gate a recommendation through the pre-trade checks and size it.

    Checks run in a fixed order and the first one that fails produces a
    ``skip`` decision immediately:

    a. circuit breaker active
    b. outside the trading window
    c. confidence below the tier minimum
    d. recommendation id already processed (in-memory set)
    e. too many declining positions
    f. maximum open positions reached
    g. position sizer rejection

    Only when everything passes is the recommendation id recorded as
    processed and an ``act`` decision returned.

    Parameters
    ----------
    rec:
        Recommendation dict. ``recommendation_id``, ``ticker``,
        ``confidence``, ``sector`` and ``current_price`` are read with
        ``dict.get``; missing keys fall back to ``None`` / ``""`` / ``0.0``.
    portfolio_state:
        Current portfolio snapshot.
    risk_tier:
        Active risk tier configuration.
    circuit_breaker_state:
        Current circuit breaker state.
    correlation_matrix:
        Correlation matrix instance; its ``_data`` mapping is handed to
        the position sizer.
    earnings_calendar:
        Passed through to the position sizer unchanged.
    now:
        Optional override for the current timestamp (for testing).
        Defaults to the current UTC time.
    """
    if not now:
        now = datetime.now(tz=timezone.utc)

    rec_id = rec.get("recommendation_id")
    ticker = rec.get("ticker", "")
    confidence = rec.get("confidence", 0.0)
    sector = rec.get("sector", "")
    current_price = rec.get("current_price", 0.0)

    reasoning: list[str] = []

    def skip(skip_reason: str) -> TradingDecision:
        # Every rejection carries the same context; only the reason code
        # and the accumulated reasoning differ.
        return self._skip_decision(
            rec_id=rec_id,
            ticker=ticker,
            skip_reason=skip_reason,
            risk_tier=risk_tier,
            portfolio_state=portfolio_state,
            circuit_breaker_state=circuit_breaker_state,
            reasoning=reasoning,
            now=now,
        )

    # --- a. Circuit breaker active? --------------------------------
    if self.circuit_breaker.is_active(circuit_breaker_state, now=now):
        reasoning.append("Circuit breaker is active")
        return skip("circuit_breaker_active")

    # --- b. Trading window? ----------------------------------------
    if not is_within_trading_window(now):
        reasoning.append("Outside trading window")
        return skip("outside_trading_window")

    # --- c. Confidence gate (early check before sizer) -------------
    if confidence < risk_tier.min_confidence:
        reasoning.append(
            f"Confidence {confidence:.4f} below tier minimum "
            f"{risk_tier.min_confidence}"
        )
        return skip("insufficient_confidence")

    # --- d. Deduplication check ------------------------------------
    if rec_id and rec_id in self.processed_recommendation_ids:
        reasoning.append(f"Recommendation {rec_id} already processed")
        return skip("duplicate_recommendation")

    # --- e. Multiple declining positions check ---------------------
    if self.check_declining_positions(portfolio_state.positions):
        reasoning.append("Multiple declining positions — halting new entries")
        return skip("multiple_declining_positions")

    # --- f. Max open positions check -------------------------------
    # The config object may not define the limit; fall back to 10.
    max_positions = getattr(self.config, "max_open_positions", 10)
    if self.check_max_positions(portfolio_state.open_position_count, max_positions):
        reasoning.append(
            f"At max open positions ({portfolio_state.open_position_count}/"
            f"{max_positions})"
        )
        return skip("max_positions_reached")

    # --- g. Position sizing ----------------------------------------
    reasoning.append("All pre-trade checks passed — computing position size")

    # PositionSizer takes the raw pair -> correlation mapping
    corr_dict: dict[tuple[str, str], float] = correlation_matrix._data

    size_result: PositionSizeResult = self.position_sizer.compute(
        confidence=confidence,
        ticker=ticker,
        sector=sector,
        current_price=current_price,
        active_pool=portfolio_state.active_pool,
        risk_tier=risk_tier,
        portfolio_state=portfolio_state,
        correlation_matrix=corr_dict,
        earnings_calendar=earnings_calendar,
        absolute_position_cap=self.config.absolute_position_cap,
        active_pool_minimum=self.config.active_pool_minimum,
    )

    reasoning.extend(size_result.adjustments)

    if size_result.rejected:
        reasoning.append(f"Position sizer rejected: {size_result.rejection_reason}")
        return skip(f"position_sizer_rejected: {size_result.rejection_reason}")

    # Only recommendations that are acted upon are remembered, so a
    # skipped one can be evaluated again later.
    if rec_id:
        self.processed_recommendation_ids.add(rec_id)

    return self._act_decision(
        rec_id=rec_id,
        ticker=ticker,
        size_result=size_result,
        risk_tier=risk_tier,
        portfolio_state=portfolio_state,
        circuit_breaker_state=circuit_breaker_state,
        reasoning=reasoning,
        now=now,
    )
|
|
|
|
# ------------------------------------------------------------------
|
|
# Helper checks
|
|
# ------------------------------------------------------------------
|
|
|
|
def check_declining_positions(
    self,
    positions: list[OpenPosition],
    threshold_pct: float = 0.50,
    decline_pct: float = 0.02,
) -> bool:
    """Tell whether too large a share of the open positions is losing money.

    A position counts as "declining" when its unrealized loss, expressed
    as a fraction of its entry value (``entry_price * quantity``), is
    strictly greater than ``decline_pct``. Positions whose entry value
    is zero or negative are never counted as declining, but they still
    count towards the total.

    Parameters
    ----------
    positions:
        List of currently open positions.
    threshold_pct:
        Fraction of all positions that must be exceeded by the declining
        ones for the check to fire (default 0.50 = 50%).
    decline_pct:
        Loss fraction a position must exceed to count as declining
        (default 0.02 = 2%).

    Returns
    -------
    bool
        ``True`` when the number of declining positions is strictly
        greater than ``threshold_pct * len(positions)``; ``False`` for
        an empty list.
    """
    if not positions:
        return False

    def is_declining(position: OpenPosition) -> bool:
        cost_basis = position.entry_price * position.quantity
        if cost_basis <= 0:
            # No meaningful basis to express the loss against.
            return False
        return -position.unrealized_pnl / cost_basis > decline_pct

    losers = sum(1 for position in positions if is_declining(position))
    return losers > threshold_pct * len(positions)
|
|
|
|
def check_max_positions(
    self,
    open_count: int,
    max_positions: int = 10,
) -> bool:
    """Report whether the open-position count has reached the limit.

    Returns ``True`` when ``open_count`` is equal to or greater than
    ``max_positions``.
    """
    has_free_slot = open_count < max_positions
    return not has_free_slot
|
|
|
|
# ------------------------------------------------------------------
|
|
# Integration wiring — thin wrappers for the decision loop
|
|
# ------------------------------------------------------------------
|
|
|
|
def check_stop_loss_crossings(
    self,
    positions: list[OpenPosition],
    prices: dict[str, float],
    stop_levels: dict[str, StopLevels],
) -> list[StopTrigger]:
    """Forward to ``StopLossManager.check_price_crossings``.

    The three arguments are passed through positionally and unchanged,
    and the manager's result is returned as-is.
    """
    manager = self.stop_loss_manager
    triggers = manager.check_price_crossings(positions, prices, stop_levels)
    return triggers
|
|
|
|
def handle_position_close(
    self,
    realized_profit: float,
    reserve_balance: float,
) -> tuple[float, float]:
    """Forward to ``ReservePoolController.siphon_profit``.

    Both arguments are passed through positionally and the controller's
    two-element result is returned unchanged. The meaning of the two
    returned floats is defined by the controller.
    """
    controller = self.reserve_pool_controller
    outcome = controller.siphon_profit(realized_profit, reserve_balance)
    return outcome
|
|
|
|
def evaluate_risk_tier(
    self,
    current_tier: str,
    metrics: PerformanceMetrics,
    reserve_pct: float,
) -> str | None:
    """Forward to ``RiskTierController.evaluate``.

    The arguments are passed through positionally; whatever the
    controller returns (a tier name or ``None``) is returned unchanged.
    """
    controller = self.risk_tier_controller
    verdict = controller.evaluate(current_tier, metrics, reserve_pct)
    return verdict
|
|
|
|
def evaluate_rebalancing(
    self,
    positions: list[OpenPosition],
    risk_tier: RiskTierConfig,
    active_pool: float,
) -> list:
    """Forward to ``PortfolioRebalancer.evaluate``.

    The arguments are passed through positionally and the rebalancer's
    result list is returned unchanged.
    """
    rebalancer = self.rebalancer
    actions = rebalancer.evaluate(positions, risk_tier, active_pool)
    return actions
|
|
|
|
def create_alert(
    self,
    event_type: str,
    details: str,
) -> NotificationRecord:
    """Build an e-mail notification record for an event.

    The message text comes from ``NotificationService.format_alert`` and
    the record from ``NotificationService.create_notification`` with the
    channel fixed to ``"email"``. Nothing is delivered here; the record
    is only created and returned.
    """
    service = self.notification_service
    return service.create_notification(
        channel="email",
        event_type=event_type,
        message=service.format_alert(event_type, details),
    )
|
|
|
|
def check_micro_trade_constraints(
    self,
    daily_count: int,
    is_within_window: bool,
    circuit_breaker_active: bool,
    portfolio_heat_pct: float,
    max_heat: float,
) -> tuple[bool, str]:
    """Forward to ``MicroTradingModule.check_constraints``.

    A ``MicroTradeConfig`` is assembled from the four ``micro_trading_*``
    settings on the engine configuration and handed to the module
    together with the caller-supplied runtime values. The module's
    ``(bool, str)`` result is returned unchanged.
    """
    settings = self.config
    micro_config = MicroTradeConfig(
        enabled=settings.micro_trading_enabled,
        allocation_cap_pct=settings.micro_trading_allocation_cap_pct,
        max_daily=settings.micro_trading_max_daily,
        max_hold_minutes=settings.micro_trading_max_hold_minutes,
    )
    verdict = self.micro_trading_module.check_constraints(
        config=micro_config,
        daily_count=daily_count,
        is_within_window=is_within_window,
        circuit_breaker_active=circuit_breaker_active,
        portfolio_heat_pct=portfolio_heat_pct,
        max_heat=max_heat,
    )
    return verdict
|
|
|
|
# ------------------------------------------------------------------
|
|
# Async worker loops
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _load_initial_state(self) -> None:
    """Load reserve balance, circuit breaker state and capital from the DB.

    Each of the four queries is independent and best-effort: a failure
    is logged (with traceback, at debug level) and the corresponding
    default is used instead. Afterwards ``self.portfolio_state`` is
    rebuilt and the correlation matrix is computed.

    Defaults when a query fails or returns nothing:

    * reserve pool balance: ``0.0``
    * circuit breaker: left as it was
    * total capital: ``100000.0``
    * invested capital / open positions: ``0.0`` / ``0``

    Does nothing when ``self.pool`` is ``None``.
    """
    if self.pool is None:
        return

    default_capital = 100000.0

    # Load reserve pool balance (most recent ledger entry)
    reserve_balance = 0.0
    try:
        row = await self.pool.fetchrow(
            "SELECT balance_after FROM reserve_pool_ledger ORDER BY created_at DESC LIMIT 1"
        )
        if row:
            reserve_balance = float(row["balance_after"])
    except Exception:
        logger.debug("Could not load reserve pool balance — using 0.0", exc_info=True)

    # Load circuit breaker state (most recent unresolved event)
    try:
        cb_row = await self.pool.fetchrow(
            "SELECT trigger_type, triggered_at, cooldown_expires "
            "FROM circuit_breaker_events WHERE resolved_at IS NULL "
            "ORDER BY created_at DESC LIMIT 1"
        )
        if cb_row:
            self._cb_state = CircuitBreakerState(
                active=True,
                trigger_type=cb_row["trigger_type"],
                triggered_at=cb_row["triggered_at"],
                cooldown_expires=cb_row["cooldown_expires"],
            )
    except Exception:
        logger.debug("Could not load circuit breaker state — using inactive", exc_info=True)

    # Build portfolio state from broker account (real buying power)
    initial_capital = default_capital
    try:
        acct_row = await self.pool.fetchrow(
            "SELECT config FROM broker_accounts WHERE active = TRUE ORDER BY created_at DESC LIMIT 1"
        )
        if acct_row and acct_row["config"]:
            cfg = acct_row["config"]
            # asyncpg hands json/jsonb columns back as text unless a
            # codec is registered; decode it instead of discarding it.
            if isinstance(cfg, (str, bytes, bytearray)):
                cfg = json.loads(cfg)
            if not isinstance(cfg, dict):
                cfg = {}
            initial_capital = float(
                cfg.get("portfolio_value", cfg.get("cash", default_capital))
            )
    except Exception:
        logger.debug("Could not load broker account — using default capital", exc_info=True)

    # Load actual positions to calculate invested amount (at entry cost)
    invested = 0.0
    open_count = 0
    try:
        pos_rows = await self.pool.fetch(
            "SELECT quantity, avg_entry_price FROM positions WHERE quantity > 0"
        )
        for pr in pos_rows:
            invested += float(pr["quantity"]) * float(pr["avg_entry_price"])
            open_count += 1
    except Exception:
        logger.debug("Could not load positions — assuming no invested capital", exc_info=True)

    # Whatever is neither reserved nor invested is available to trade;
    # clamp at zero so an over-invested account never goes negative.
    available = max(0.0, initial_capital - reserve_balance - invested)
    self.portfolio_state = PortfolioState(
        reserve_pool=reserve_balance,
        active_pool=available,
        total_value=initial_capital,
        open_position_count=open_count,
    )
    logger.info(
        "Portfolio state: total=$%.2f invested=$%.2f available=$%.2f reserve=$%.2f positions=%d",
        initial_capital, invested, available, reserve_balance, open_count,
    )

    # Compute initial correlation matrix. Unlike the queries above this
    # call is not guarded; an exception propagates to the caller.
    await self._compute_correlation_matrix()
|
|
|
|
async def _decision_loop(self) -> None:
    """Poll recommendations and evaluate them in a continuous loop.

    Task 27.3: Main decision loop that polls the recommendations table,
    checks Redis deduplication, evaluates each recommendation, and
    pushes "act" decisions to the broker queue.

    Per iteration:

    1. Sleep for ``config.polling_interval_seconds``.
    2. Fetch up to 50 buy/sell recommendations newer than the poll
       cursor and advance the cursor.
    3. Resolve a current price per ticker (``market_snapshots`` first,
       ``_fetch_current_prices`` for the rest). A price-lookup failure
       no longer discards the batch: recommendations that did get a
       price are still processed.
    4. For each recommendation: Redis dedupe, then either the sell path
       (sell the whole open position) or the buy path
       (``evaluate_recommendation``, order push, stop-level insert,
       decision persistence).

    An error in one recommendation is logged and does not affect the
    rest of the batch. The loop ends when ``self.running`` is cleared or
    the task is cancelled.
    """
    # Stand-in ATR as a fraction of price, used to seed stop levels.
    atr_fraction = 0.02

    while self.running:
        try:
            await asyncio.sleep(self.config.polling_interval_seconds)
            if not self.running:
                break

            if self.pool is None:
                continue

            # --- Poll recommendations from PostgreSQL -----------------
            recs: list[dict] = []
            try:
                rows = await self.pool.fetch(
                    "SELECT * FROM recommendations "
                    "WHERE action IN ('buy','sell') "
                    "AND mode IN ('paper_eligible','live_eligible') "
                    "AND generated_at > NOW() - INTERVAL '2 hours' "
                    "AND generated_at > $1 "
                    "ORDER BY confidence DESC "
                    "LIMIT 50",
                    self._last_poll_timestamp,
                )
                if rows:
                    # Advance the cursor to the NEWEST row of this batch.
                    # NOTE(review): with ORDER BY confidence + LIMIT 50,
                    # rows outside the top 50 that are older than the new
                    # cursor are never polled again — confirm intended.
                    self._last_poll_timestamp = max(r["generated_at"] for r in rows)
                recs = [dict(r) for r in rows]
                if recs:
                    logger.info(
                        "Polled %d recommendations (highest confidence=%.3f)",
                        len(recs), recs[0].get("confidence", 0),
                    )
            except Exception:
                logger.debug(
                    "Could not poll recommendations — table may not exist yet",
                    exc_info=True,
                )
                continue

            # --- Resolve current prices for the batch -----------------
            # Kept outside the poll guard: the cursor has already moved,
            # so bailing out here would lose the whole batch for good.
            batch_tickers = list({r["ticker"] for r in recs if r.get("ticker")})
            price_map: dict[str, float] = {}
            if batch_tickers:
                try:
                    price_rows = await self.pool.fetch(
                        """SELECT DISTINCT ON (ticker) ticker, (data->>'c')::float AS price
                        FROM market_snapshots
                        WHERE ticker = ANY($1) AND snapshot_type = 'bar'
                        ORDER BY ticker, captured_at DESC""",
                        batch_tickers,
                    )
                    price_map = {r["ticker"]: r["price"] for r in price_rows if r["price"]}
                except Exception:
                    logger.debug("Could not fetch prices from market_snapshots", exc_info=True)

                # Fall back to the price API for any missing prices
                missing = [t for t in batch_tickers if t not in price_map]
                if missing:
                    try:
                        price_map.update(await self._fetch_current_prices(missing))
                    except Exception:
                        logger.warning(
                            "Could not fetch fallback prices for %d tickers",
                            len(missing),
                            exc_info=True,
                        )

            for rec in recs:
                try:
                    # A missing/NULL id must not collapse to "" or "None":
                    # every such row would then share one dedupe key.
                    raw_id = rec.get("recommendation_id")
                    if raw_id is None:
                        raw_id = rec.get("id")
                    rec_id = "" if raw_id is None else str(raw_id)
                    ticker = rec.get("ticker", "")

                    # Inject current price from market data
                    if not rec.get("current_price") and ticker in price_map:
                        rec["current_price"] = price_map[ticker]

                    # Skip if no usable price is available
                    raw_price = rec.get("current_price")
                    if not raw_price:
                        continue
                    # Normalise to float (DB numerics arrive as Decimal,
                    # which cannot be mixed with the float pool balances).
                    price = float(raw_price)
                    if price <= 0:
                        continue
                    rec["current_price"] = price

                    # Redis deduplication check
                    if self.redis is not None and rec_id:
                        dedupe_key = trading_dedupe_key(rec_id)
                        already = await self.redis.get(dedupe_key)
                        if already:
                            continue
                        # Set dedupe key with 24h TTL before evaluation
                        await self.redis.set(dedupe_key, "1", ex=86400)

                    # Ensure portfolio state exists
                    if self.portfolio_state is None:
                        self.portfolio_state = PortfolioState()

                    action = rec.get("action", "buy")

                    # --- Sell path: no sizing, sell the open position ---
                    if action == "sell":
                        pos_row = None
                        try:
                            pos_row = await self.pool.fetchrow(
                                "SELECT quantity, avg_entry_price, current_price "
                                "FROM positions WHERE ticker = $1 AND quantity > 0",
                                ticker,
                            )
                        except Exception:
                            logger.debug(
                                "Could not look up position for sell: %s",
                                ticker,
                                exc_info=True,
                            )

                        if pos_row is None:
                            logger.info(
                                "Sell recommendation for %s but no open position — skipping",
                                ticker,
                            )
                            continue

                        sell_qty = int(pos_row["quantity"])
                        estimated_proceeds = sell_qty * price

                        order_job = {
                            "trading_decision_id": str(uuid.uuid4()),
                            "ticker": ticker,
                            "action": "sell",
                            "quantity": sell_qty,
                            "order_type": "market",
                            "source": "trading_engine",
                        }
                        if self.redis is not None:
                            broker_queue = queue_key(QUEUE_BROKER)
                            await self.redis.rpush(broker_queue, json.dumps(order_job))
                            logger.info(
                                "Pushed sell order for %s (%d shares, ~$%.2f) to broker queue",
                                ticker, sell_qty, estimated_proceeds,
                            )

                        # Optimistically reflect the sale in the in-memory
                        # portfolio before any fill is confirmed.
                        if self.portfolio_state:
                            self.portfolio_state.open_position_count = max(
                                0, self.portfolio_state.open_position_count - 1
                            )
                            self.portfolio_state.active_pool += estimated_proceeds

                        # Mark as processed
                        if rec_id:
                            self.processed_recommendation_ids.add(rec_id)
                        continue

                    # --- Buy path: run pre-trade checks and sizing ------
                    decision = self.evaluate_recommendation(
                        rec=rec,
                        portfolio_state=self.portfolio_state,
                        risk_tier=self._active_risk_tier,
                        circuit_breaker_state=self._cb_state,
                        correlation_matrix=self.correlation_matrix,
                        earnings_calendar=self._earnings_calendar,
                    )

                    logger.info(
                        "Decision for %s: %s (reason=%s, size=$%.2f, shares=%d)",
                        decision.ticker,
                        decision.decision,
                        decision.skip_reason or "n/a",
                        decision.computed_position_size or 0,
                        decision.computed_share_quantity or 0,
                    )

                    # For "act" decisions: push order to broker queue
                    if decision.decision == "act":
                        # Deduct from active pool immediately to prevent over-allocation
                        order_cost = (decision.computed_share_quantity or 0) * price
                        if self.portfolio_state and order_cost > 0:
                            self.portfolio_state.active_pool = max(
                                0.0, self.portfolio_state.active_pool - order_cost
                            )
                            self.portfolio_state.open_position_count += 1

                        order_job = {
                            "trading_decision_id": decision.id,
                            "ticker": decision.ticker,
                            "action": action,
                            "quantity": decision.computed_share_quantity,
                            "order_type": "market",
                            "source": "trading_engine",
                        }
                        if self.redis is not None:
                            broker_queue = queue_key(QUEUE_BROKER)
                            await self.redis.rpush(broker_queue, json.dumps(order_job))
                            logger.info(
                                "Pushed order for %s (%d shares, $%.2f) to broker queue — remaining pool: $%.2f",
                                decision.ticker,
                                decision.computed_share_quantity or 0,
                                order_cost,
                                self.portfolio_state.active_pool if self.portfolio_state else 0,
                            )

                        # Create stop-loss levels for the new position
                        if self.pool is not None:
                            try:
                                tier = self._active_risk_tier
                                atr_est = price * atr_fraction
                                sl_price = price * (1 - atr_fraction * tier.stop_loss_atr_multiplier)
                                tp_price = price * (
                                    1
                                    + atr_fraction
                                    * tier.stop_loss_atr_multiplier
                                    * tier.reward_risk_ratio
                                )
                                await self.pool.execute(
                                    "INSERT INTO position_stop_levels "
                                    "(ticker, entry_price, stop_loss_price, take_profit_price, "
                                    "trailing_stop_active, atr_value, atr_multiplier, "
                                    "reward_risk_ratio, signal_confidence, is_micro_trade, active) "
                                    "VALUES ($1, $2, $3, $4, FALSE, $5, $6, $7, $8, FALSE, TRUE) "
                                    "ON CONFLICT (ticker) WHERE active = TRUE DO UPDATE SET "
                                    "entry_price = EXCLUDED.entry_price, "
                                    "stop_loss_price = EXCLUDED.stop_loss_price, "
                                    "take_profit_price = EXCLUDED.take_profit_price, "
                                    "updated_at = NOW()",
                                    decision.ticker, price, sl_price, tp_price,
                                    atr_est, tier.stop_loss_atr_multiplier,
                                    tier.reward_risk_ratio,
                                    rec.get("confidence", 0.8),
                                )
                            except Exception:
                                # The order is already queued; without stop
                                # levels the position is unprotected, so
                                # this must be visible in the logs.
                                logger.warning(
                                    "Could not create stop levels for %s",
                                    decision.ticker,
                                    exc_info=True,
                                )

                    # Persist decision
                    await self._persist_decision(decision)

                except Exception:
                    logger.exception(
                        "Error evaluating recommendation %s",
                        rec.get("recommendation_id", "?"),
                    )

        except asyncio.CancelledError:
            break
        except Exception:
            logger.exception("Unexpected error in decision loop")
            if self.running:
                # Brief back-off so a persistent fault cannot spin the loop.
                await asyncio.sleep(5)
|
|
|
|
async def _stop_loss_monitor(self) -> None:
    """Monitor open positions for stop-loss and take-profit crossings.

    Task 28.1: Periodically checks current prices against stop levels
    and submits sell orders for triggered positions.

    Task 28.4: A held ticker that has produced no price for more than
    15 minutes is sold as a safety measure. The staleness clock is kept
    in ``self._last_price_timestamps`` and is maintained so that it only
    ever measures an uninterrupted price outage during market hours:

    * it is reset while the market is closed, so the overnight gap does
      not count as an outage at the next open;
    * entries for tickers that are no longer held are dropped, so a
      re-opened position does not inherit an old timestamp;
    * a held ticker with no entry yet starts its clock at the first
      missed fetch instead of being exempt from the safety sell forever.

    NOTE(review): a triggered or safety sell is re-submitted on every
    cycle until the position no longer shows up as open — presumably
    ``_submit_sell_order`` de-duplicates; confirm.
    """
    safety_sell_after = timedelta(minutes=15)

    while self.running:
        try:
            await asyncio.sleep(self.config.stop_loss_check_interval_seconds)
            if not self.running:
                break

            now = datetime.now(tz=timezone.utc)

            # Skip if not market hours
            if not is_market_open(now):
                self._last_price_timestamps.clear()
                continue

            if self.pool is None:
                continue

            # Load positions and stop levels from DB
            positions = await self._load_open_positions()
            stop_levels = await self._load_stop_levels()

            # Drop clocks of tickers that are no longer held
            held = {p.ticker for p in positions}
            for stale in [t for t in self._last_price_timestamps if t not in held]:
                del self._last_price_timestamps[stale]

            if not positions:
                continue

            # Fetch current prices
            tickers = [p.ticker for p in positions]
            prices = await self._fetch_current_prices(tickers)

            # Update last-price timestamps for tickers that returned data
            for ticker in tickers:
                if ticker in prices:
                    self._last_price_timestamps[ticker] = now

            # Safety sell for missing price data (Task 28.4)
            for pos in positions:
                if pos.ticker in prices:
                    continue
                # First miss for an unseen ticker starts its clock now.
                last_ts = self._last_price_timestamps.setdefault(pos.ticker, now)
                if (now - last_ts) > safety_sell_after:
                    logger.warning(
                        "No price data for %s for >15 min — submitting safety sell",
                        pos.ticker,
                    )
                    await self._submit_sell_order(
                        pos.ticker, pos.quantity, "safety_sell_missing_price"
                    )

            # Check crossings
            triggers = self.check_stop_loss_crossings(positions, prices, stop_levels)

            for trigger in triggers:
                # Find the position to get quantity
                pos_match = next((p for p in positions if p.ticker == trigger.ticker), None)
                if pos_match is None:
                    continue

                await self._submit_sell_order(
                    trigger.ticker,
                    pos_match.quantity,
                    f"{trigger.trigger_type}_triggered",
                )
                logger.info(
                    "Stop-loss monitor: %s triggered for %s at %.2f (trigger: %.2f)",
                    trigger.trigger_type,
                    trigger.ticker,
                    trigger.current_price,
                    trigger.trigger_price,
                )

        except asyncio.CancelledError:
            break
        except Exception:
            logger.exception("Unexpected error in stop-loss monitor")
            if self.running:
                # Brief back-off so a persistent fault cannot spin the loop.
                await asyncio.sleep(5)
|
|
|
|
async def _performance_loop(self) -> None:
    """Refresh portfolio metrics and run periodic housekeeping checks.

    Task 29.1: Wakes every 5 minutes. While the market is open it syncs
    ``active_pool`` / ``open_position_count`` from the ``positions`` table,
    computes performance metrics, refreshes the correlation matrix, runs
    the profit-taking and circuit-breaker checks, and upserts a snapshot.
    Task 29.2: While the market is closed it persists one snapshot per ET
    calendar day once the ET hour is 16 or later.

    Each step is best-effort: a failure is logged at DEBUG level together
    with its traceback and the loop carries on with the next step. The
    loop exits when ``self.running`` becomes false or the task is cancelled.
    """

    async def _best_effort(step, failure_msg: str) -> None:
        # Run one housekeeping coroutine, logging (not raising) ordinary
        # failures so a single broken step cannot stall the whole cycle.
        try:
            await step()
        except Exception:
            logger.debug(failure_msg, exc_info=True)

    last_snapshot_date: str | None = None

    while self.running:
        try:
            await asyncio.sleep(300)  # 5 minutes
            if not self.running:
                break

            now = datetime.now(tz=timezone.utc)

            if not is_market_open(now):
                # Market closed: only the end-of-day snapshot (Task 29.2).
                from services.trading.trading_window import ET

                et_now = now.astimezone(ET)
                today_str = et_now.strftime("%Y-%m-%d")

                # After 4:00 PM ET and not yet snapshotted today.
                if et_now.hour >= 16 and last_snapshot_date != today_str:
                    await self._persist_daily_snapshot(now)
                    last_snapshot_date = today_str
                continue

            if self.portfolio_state is None:
                continue

            # Sync active_pool with the positions actually held in the DB.
            try:
                if self.pool is not None:
                    pos_rows = await self.pool.fetch(
                        "SELECT quantity, avg_entry_price FROM positions WHERE quantity > 0"
                    )
                    invested = sum(
                        float(r["quantity"]) * float(r["avg_entry_price"])
                        for r in pos_rows
                    )
                    self.portfolio_state.active_pool = max(
                        0.0,
                        self.portfolio_state.total_value
                        - self.portfolio_state.reserve_pool
                        - invested,
                    )
                    self.portfolio_state.open_position_count = len(pos_rows)
            except Exception:
                logger.debug(
                    "Could not sync positions for portfolio state", exc_info=True
                )

            # Compute metrics from the in-memory portfolio state.
            try:
                metrics = self.performance_computer.compute_metrics(
                    closed_trades=[],
                    portfolio_value=self.portfolio_state.total_value,
                    active_pool=self.portfolio_state.active_pool,
                    reserve_pool=self.portfolio_state.reserve_pool,
                    daily_pnl=0.0,
                    unrealized_pnl=sum(
                        p.unrealized_pnl for p in self.portfolio_state.positions
                    ),
                    portfolio_heat=self.portfolio_state.portfolio_heat,
                    daily_returns=[],
                )
                logger.debug(
                    "Performance update: value=%.2f heat=%.4f",
                    metrics.total_portfolio_value,
                    metrics.portfolio_heat,
                )
            except Exception:
                logger.debug("Could not compute performance metrics", exc_info=True)

            # Remaining steps, in order: correlation refresh, profit-taking,
            # daily-loss circuit breaker, then the per-cycle snapshot that
            # feeds the performance tab.
            await _best_effort(
                self._compute_correlation_matrix,
                "Could not refresh correlation matrix",
            )
            await _best_effort(
                self._check_profit_taking,
                "Could not run profit-taking check",
            )
            await _best_effort(
                self._check_circuit_breaker_daily_loss,
                "Could not run circuit breaker daily loss check",
            )
            await _best_effort(
                lambda: self._persist_daily_snapshot(now),
                "Could not persist snapshot",
            )

        except asyncio.CancelledError:
            break
        except Exception:
            logger.exception("Unexpected error in performance loop")
            if self.running:
                # Brief back-off so a persistent failure cannot spin the loop.
                await asyncio.sleep(5)
|
|
|
|
async def _risk_tier_scheduler(self) -> None:
    """Evaluate the risk tier at the next weekday 16:00 ET, repeatedly.

    Task 30.1: Sleeps until the next 16:00 ET that falls on Monday-Friday,
    loads the latest ``metrics`` JSON from ``portfolio_snapshots`` (or
    computes fresh metrics when none is usable), derives ``reserve_pct``
    and calls ``evaluate_risk_tier()``. A tier change is written to
    ``risk_tier_history``, applied to ``self.config`` /
    ``self._active_risk_tier``, and announced via ``create_alert``.

    The loop exits when ``self.running`` becomes false or the task is
    cancelled; any other error is logged and retried after 60 seconds.
    """
    from services.trading.trading_window import ET

    while self.running:
        try:
            now_utc = datetime.now(tz=timezone.utc)
            et_now = now_utc.astimezone(ET)

            target = et_now.replace(hour=16, minute=0, second=0, microsecond=0)
            if et_now >= target:
                # Already past 16:00 ET today — aim for tomorrow.
                target += timedelta(days=1)
            while target.weekday() > 4:  # Saturday=5, Sunday=6
                target += timedelta(days=1)

            # Subtract the UTC instant rather than et_now: two aware
            # datetimes sharing one tzinfo subtract as wall-clock times,
            # which is an hour off whenever a DST change lies in between.
            sleep_seconds = (target - now_utc).total_seconds()
            if sleep_seconds > 0:
                await asyncio.sleep(sleep_seconds)

            if not self.running:
                break

            if self.portfolio_state is None:
                continue

            # Prefer the most recently persisted metrics blob.
            metrics: PerformanceMetrics | None = None
            if self.pool is not None:
                try:
                    row = await self.pool.fetchrow(
                        "SELECT metrics FROM portfolio_snapshots "
                        "ORDER BY snapshot_date DESC LIMIT 1"
                    )
                    if row and row["metrics"]:
                        raw = row["metrics"]
                        m = json.loads(raw) if isinstance(raw, str) else raw
                        if m:
                            # Missing keys fall back to live pool values or zero.
                            fallbacks = {
                                "total_portfolio_value": self.portfolio_state.total_value,
                                "active_pool": self.portfolio_state.active_pool,
                                "reserve_pool": self.portfolio_state.reserve_pool,
                                "unrealized_pnl": 0.0,
                                "realized_pnl": 0.0,
                                "daily_pnl": 0.0,
                                "win_count": 0,
                                "loss_count": 0,
                                "win_rate": 0.0,
                                "avg_win": 0.0,
                                "avg_loss": 0.0,
                                "profit_factor": 0.0,
                                "sharpe_ratio": 0.0,
                                "max_drawdown": 0.0,
                                "current_drawdown_pct": 0.0,
                                "portfolio_heat": 0.0,
                            }
                            metrics = PerformanceMetrics(
                                **{
                                    name: m.get(name, default)
                                    for name, default in fallbacks.items()
                                }
                            )
                except Exception:
                    logger.debug(
                        "Could not load metrics from portfolio_snapshots",
                        exc_info=True,
                    )

            # Fall back to computing fresh metrics from in-memory state.
            if metrics is None:
                metrics = self.performance_computer.compute_metrics(
                    closed_trades=[],
                    portfolio_value=self.portfolio_state.total_value,
                    active_pool=self.portfolio_state.active_pool,
                    reserve_pool=self.portfolio_state.reserve_pool,
                    daily_pnl=0.0,
                    unrealized_pnl=sum(
                        p.unrealized_pnl for p in self.portfolio_state.positions
                    ),
                    portfolio_heat=self.portfolio_state.portfolio_heat,
                    daily_returns=[],
                )

            total_value = self.portfolio_state.total_value
            reserve_pct = (
                self.portfolio_state.reserve_pool / total_value
                if total_value > 0
                else 0.0
            )

            current_tier = self.config.risk_tier
            new_tier = self.evaluate_risk_tier(current_tier, metrics, reserve_pct)

            if new_tier is None or new_tier == current_tier:
                continue

            # Record the transition; a failed insert must not block the change.
            if self.pool is not None:
                try:
                    await self.pool.execute(
                        "INSERT INTO risk_tier_history "
                        "(previous_tier, new_tier, trigger_source, trigger_metrics, created_at) "
                        "VALUES ($1, $2, $3, $4, $5)",
                        current_tier,
                        new_tier,
                        "auto_adjustment",
                        json.dumps({
                            "win_rate": metrics.win_rate,
                            "current_drawdown_pct": metrics.current_drawdown_pct,
                            "reserve_pct": reserve_pct,
                            "sharpe_ratio": metrics.sharpe_ratio,
                        }),
                        datetime.now(tz=timezone.utc),
                    )
                except Exception:
                    logger.debug("Could not persist risk tier change", exc_info=True)

            # Apply the new tier; unknown names fall back to "moderate".
            self.config.risk_tier = new_tier
            self._active_risk_tier = RISK_TIER_DEFAULTS.get(
                new_tier, RISK_TIER_DEFAULTS["moderate"]
            )

            self.create_alert(
                "risk_tier_changed",
                f"Risk tier changed from {current_tier} to {new_tier} "
                f"(win_rate={metrics.win_rate:.2%}, "
                f"drawdown={metrics.current_drawdown_pct:.2%}, "
                f"reserve={reserve_pct:.2%})",
            )

            logger.info(
                "Risk tier changed: %s → %s (win_rate=%.2f, drawdown=%.2f, reserve=%.2f)",
                current_tier,
                new_tier,
                metrics.win_rate,
                metrics.current_drawdown_pct,
                reserve_pct,
            )

        except asyncio.CancelledError:
            break
        except Exception:
            logger.exception("Unexpected error in risk tier scheduler")
            if self.running:
                await asyncio.sleep(60)
|
|
|
|
async def _rebalance_scheduler(self) -> None:
    """Evaluate portfolio rebalancing at the next Monday window open, repeatedly.

    Task 30.2: Sleeps until the next Monday at ``WINDOW_OPEN`` (ET), skips
    the cycle when the circuit breaker is active or there is no portfolio
    state / no positions, otherwise asks ``self.rebalancer.evaluate()`` for
    orders and pushes each one to the broker queue in Redis. Without a
    Redis client the orders are only logged.

    The loop exits when ``self.running`` becomes false or the task is
    cancelled; any other error is logged and retried after 60 seconds.
    """
    from services.trading.trading_window import ET, WINDOW_OPEN

    while self.running:
        try:
            now_utc = datetime.now(tz=timezone.utc)
            et_now = now_utc.astimezone(ET)

            # Window-open time on the coming Monday (today, if it is Monday).
            target = et_now.replace(
                hour=WINDOW_OPEN.hour,
                minute=WINDOW_OPEN.minute,
                second=0,
                microsecond=0,
            ) + timedelta(days=(7 - et_now.weekday()) % 7)
            if et_now >= target:
                # Only possible on a Monday already past the open.
                target += timedelta(days=7)

            # Subtract the UTC instant rather than et_now: two aware
            # datetimes sharing one tzinfo subtract as wall-clock times,
            # which is an hour off whenever a DST change lies in between.
            sleep_seconds = (target - now_utc).total_seconds()
            if sleep_seconds > 0:
                await asyncio.sleep(sleep_seconds)

            if not self.running:
                break

            # Respect circuit breaker status.
            if self.circuit_breaker.is_active(
                self._cb_state, now=datetime.now(tz=timezone.utc)
            ):
                logger.info("Rebalance skipped — circuit breaker is active")
                continue

            if self.portfolio_state is None:
                continue

            # Prefer positions from the DB; keep the in-memory list on failure.
            positions = self.portfolio_state.positions
            if self.pool is not None:
                try:
                    positions = await self._load_open_positions()
                except Exception:
                    logger.debug(
                        "Could not load positions for rebalancing", exc_info=True
                    )

            if not positions:
                continue

            max_positions = getattr(self.config, "max_open_positions", 10)
            rebalance_orders = self.rebalancer.evaluate(
                positions,
                self._active_risk_tier,
                self.portfolio_state.active_pool,
                max_positions,
            )

            for order in rebalance_orders:
                if self.redis is None:
                    logger.info(
                        "Rebalance (no redis): %s %s %d shares — %s",
                        order.action,
                        order.ticker,
                        order.quantity,
                        order.reason,
                    )
                    continue

                order_job = {
                    "ticker": order.ticker,
                    "action": order.action,
                    "quantity": order.quantity,
                    "order_type": "market",
                    "source": "trading_engine",
                    "reason": order.reason,
                    "tag": order.tag,
                }
                try:
                    broker_queue = queue_key(QUEUE_BROKER)
                    await self.redis.rpush(broker_queue, json.dumps(order_job))
                    logger.info(
                        "Rebalance: pushed %s order for %s (%d shares)",
                        order.action,
                        order.ticker,
                        order.quantity,
                    )
                except Exception:
                    # One failed push must not drop the remaining orders.
                    logger.exception(
                        "Failed to push rebalance order for %s", order.ticker
                    )

            if rebalance_orders:
                logger.info(
                    "Rebalance cycle completed: %d orders generated",
                    len(rebalance_orders),
                )
            else:
                logger.debug("Rebalance cycle completed: no orders needed")

        except asyncio.CancelledError:
            break
        except Exception:
            logger.exception("Unexpected error in rebalance scheduler")
            if self.running:
                await asyncio.sleep(60)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Correlation matrix computation
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _compute_correlation_matrix(self) -> None:
    """Rebuild ``self.correlation_matrix`` from recent bar snapshots.

    Reads ``market_snapshots`` bar rows captured in the last 30 days, keeps
    one close per ticker per date, turns consecutive-date closes into
    returns, and computes a Pearson coefficient for every ticker pair that
    shares at least five return observations. numpy does the arithmetic
    when it is importable; otherwise a hand-written Pearson formula is used.
    Does nothing without a DB pool, on a query failure, or with fewer than
    two usable tickers.
    """
    if self.pool is None:
        return

    try:
        bar_rows = await self.pool.fetch(
            "SELECT ticker, captured_at::date AS dt, (data->>'c')::float AS close "
            "FROM market_snapshots "
            "WHERE snapshot_type = 'bar' AND captured_at > NOW() - INTERVAL '30 days' "
            "ORDER BY ticker, captured_at"
        )
    except Exception:
        logger.debug("Could not query market_snapshots for correlation matrix")
        return

    if not bar_rows:
        return

    # ticker -> {date: close}; a later row for the same date overwrites.
    closes_by_ticker: dict[str, dict] = {}
    for bar in bar_rows:
        symbol, day, close_px = bar["ticker"], bar["dt"], bar["close"]
        if close_px is None:
            continue
        closes_by_ticker.setdefault(symbol, {})[day] = close_px

    # Keep tickers that yield at least one return (a positive close followed
    # by another date); every ticker's dates feed the shared calendar.
    calendar: set = set()
    symbols: list[str] = []
    for symbol, closes in closes_by_ticker.items():
        days = sorted(closes)
        calendar.update(days)
        if any(closes[earlier] > 0 for earlier in days[:-1]):
            symbols.append(symbol)

    if len(symbols) < 2:
        return

    # One slot per adjacent pair of calendar dates; None marks a gap.
    timeline = sorted(calendar)
    series: dict[str, list] = {}
    for symbol in symbols:
        closes = closes_by_ticker[symbol]
        steps: list = []
        for earlier, later in zip(timeline, timeline[1:]):
            if earlier in closes and later in closes:
                base = closes[earlier]
                steps.append((closes[later] - base) / base if base > 0 else 0.0)
            else:
                steps.append(None)
        series[symbol] = steps

    if _HAS_NUMPY:
        import numpy as _np

    corr_data: dict[tuple[str, str], float] = {}
    for idx, left in enumerate(symbols):
        for right in symbols[idx + 1:]:
            # Only slots where both tickers have a return.
            overlap = [
                (x, y)
                for x, y in zip(series[left], series[right])
                if x is not None and y is not None
            ]
            if len(overlap) < 5:
                continue

            xs = [pair[0] for pair in overlap]
            ys = [pair[1] for pair in overlap]

            if _HAS_NUMPY:
                coeff = float(_np.corrcoef(_np.array(xs), _np.array(ys))[0, 1])
                if not _np.isnan(coeff):
                    corr_data[(left, right)] = coeff
            else:
                # Manual Pearson correlation fallback.
                count = len(xs)
                mean_x = sum(xs) / count
                mean_y = sum(ys) / count
                covariance = sum(
                    (x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)
                )
                spread_x = sum((v - mean_x) ** 2 for v in xs) ** 0.5
                spread_y = sum((v - mean_y) ** 2 for v in ys) ** 0.5
                if spread_x > 0 and spread_y > 0:
                    corr_data[(left, right)] = covariance / (spread_x * spread_y)

    self.correlation_matrix.load(corr_data)
    logger.info("Correlation matrix loaded: %d pairs", len(corr_data))
|
|
|
|
# ------------------------------------------------------------------
|
|
# Async helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _persist_decision(self, decision: TradingDecision) -> None:
    """Log a trading decision and store it in ``trading_decisions``.

    Task 27.5: The decision is always logged. With no DB pool the method
    stops there; otherwise it inserts one row, and an insert failure is
    logged at DEBUG level rather than raised.
    """
    short_id = decision.id[:8]
    logger.info(
        "Decision: %s %s ticker=%s reason=%s",
        decision.decision,
        short_id,
        decision.ticker,
        decision.skip_reason or "—",
    )

    if self.pool is None:
        return

    insert_sql = (
        "INSERT INTO trading_decisions "
        "(id, recommendation_id, decision, skip_reason, ticker, "
        "computed_position_size, computed_share_quantity, "
        "risk_tier_at_decision, portfolio_heat_at_decision, "
        "active_pool_at_decision, reserve_pool_at_decision, "
        "circuit_breaker_status, correlation_check_result, "
        "sector_exposure_check_result, earnings_proximity_flag, "
        "is_micro_trade, decision_trace, created_at) "
        "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, "
        "$11, $12, $13, $14, $15, $16, $17, $18)"
    )

    try:
        # Built inside the try so a JSON-encoding error is handled the
        # same way as a database error.
        row_values = (
            decision.id,
            decision.recommendation_id,
            decision.decision,
            decision.skip_reason,
            decision.ticker,
            decision.computed_position_size,
            decision.computed_share_quantity,
            decision.risk_tier_at_decision,
            decision.portfolio_heat_at_decision,
            decision.active_pool_at_decision,
            decision.reserve_pool_at_decision,
            decision.circuit_breaker_status,
            json.dumps(decision.correlation_check_result),
            json.dumps(decision.sector_exposure_check_result),
            decision.earnings_proximity_flag,
            decision.is_micro_trade,
            json.dumps(decision.decision_trace),
            decision.created_at,
        )
        await self.pool.execute(insert_sql, *row_values)
    except Exception:
        logger.debug("Could not persist decision %s — table may not exist", decision.id[:8])
|
|
|
|
async def _sync_positions_and_siphon(self) -> None:
    """Reload open positions and siphon profit from those that closed.

    Task 27.4: A ticker present in the in-memory portfolio state but absent
    from the freshly loaded positions counts as closed. If its last known
    ``unrealized_pnl`` was positive, ``siphon_profit()`` decides how much
    moves to the reserve pool; a non-zero transfer updates the in-memory
    reserve balance and is recorded in ``reserve_pool_ledger``. The
    portfolio state is then replaced with the fresh positions.
    """
    if self.pool is None or self.portfolio_state is None:
        return

    try:
        fresh_positions = await self._load_open_positions()
        held_before = {p.ticker for p in self.portfolio_state.positions}
        held_now = {p.ticker for p in fresh_positions}

        for ticker in held_before - held_now:
            closed_pos = next(
                (p for p in self.portfolio_state.positions if p.ticker == ticker),
                None,
            )
            if not (closed_pos and closed_pos.unrealized_pnl > 0):
                continue

            transfer, new_balance = self.reserve_pool_controller.siphon_profit(
                closed_pos.unrealized_pnl,
                self.portfolio_state.reserve_pool,
            )
            if not transfer > 0:
                continue

            self.portfolio_state.reserve_pool = new_balance

            # The ledger write is best-effort; the in-memory balance stands.
            try:
                await self.pool.execute(
                    "INSERT INTO reserve_pool_ledger "
                    "(amount, balance_after, trigger_type, reference_id, notes, created_at) "
                    "VALUES ($1, $2, 'profit_siphon', $3, $4, $5)",
                    transfer,
                    new_balance,
                    ticker,
                    f"Siphoned from {ticker} close",
                    datetime.now(tz=timezone.utc),
                )
            except Exception:
                logger.debug("Could not persist siphon event for %s", ticker)

            logger.info(
                "Siphoned $%.2f from %s close → reserve now $%.2f",
                transfer,
                ticker,
                new_balance,
            )

        # Adopt the freshly loaded positions as the current state.
        self.portfolio_state.positions = fresh_positions
        self.portfolio_state.open_position_count = len(fresh_positions)

    except Exception:
        logger.exception("Error syncing positions")
|
|
|
|
async def _fetch_current_prices(self, tickers: list[str]) -> dict[str, float]:
    """Fetch the latest trade price for each ticker from the Polygon snapshot API.

    Task 28.2: Issues one async ``httpx`` GET for all tickers and returns a
    mapping ticker → last trade price. Tickers with no positive numeric
    last-trade price are omitted. A malformed entry in the response is
    skipped on its own and does not discard the rest of the batch.

    Args:
        tickers: Symbols to look up; an empty list short-circuits to ``{}``.

    Returns:
        Prices keyed by ticker. Empty when no API key is configured, the
        request fails, or the API answers with a non-200 status.
    """
    if not tickers:
        return {}

    # API key and base URL come from the market-data config.
    api_key = ""
    base_url = "https://api.polygon.io"
    try:
        from services.shared.config import load_config

        market_cfg = load_config().market_data
        api_key = market_cfg.api_key
        base_url = market_cfg.base_url
    except Exception:
        # Treated like a missing key below; keep the cause discoverable.
        logger.debug("Could not load market data config", exc_info=True)

    if not api_key:
        logger.debug("No Polygon API key configured — skipping price fetch")
        return {}

    url = f"{base_url}/v2/snapshot/locale/us/markets/stocks/tickers"
    params = {"tickers": ",".join(tickers), "apiKey": api_key}

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(url, params=params)
        if resp.status_code != 200:
            logger.warning("Polygon API returned status %d", resp.status_code)
            return {}
        payload = resp.json()
    except Exception:
        logger.warning("Failed to fetch prices from Polygon API")
        return {}

    prices: dict[str, float] = {}
    items = payload.get("tickers") if isinstance(payload, dict) else None
    for item in items or []:
        if not isinstance(item, dict):
            continue
        symbol = item.get("ticker")
        # "lastTrade" may be absent or null; treat both as "no price".
        last_trade = item.get("lastTrade") or {}
        price = last_trade.get("p") if isinstance(last_trade, dict) else None
        is_number = isinstance(price, (int, float)) and not isinstance(price, bool)
        if symbol and is_number and price > 0:
            prices[symbol] = float(price)

    return prices
|
|
|
|
async def _load_open_positions(self) -> list[OpenPosition]:
    """Build ``OpenPosition`` records from active ``position_stop_levels`` rows.

    Task 28.3: Only the stop-level table is read, so each record carries a
    placeholder quantity of 1, the entry price as current price and market
    value, zero unrealized P&L and an empty sector.
    NOTE(review): callers that sell ``pos.quantity`` shares will therefore
    see 1 — confirm whether the real quantity should be joined in.

    Returns:
        The positions built so far. Empty without a DB pool; a query or row
        conversion error is logged at DEBUG level and ends loading early.
    """
    if self.pool is None:
        return []

    loaded: list[OpenPosition] = []
    try:
        stop_rows = await self.pool.fetch(
            "SELECT ticker, entry_price, stop_loss_price, take_profit_price, "
            "signal_confidence, is_micro_trade "
            "FROM position_stop_levels WHERE active = TRUE"
        )
        for rec in stop_rows:
            position = OpenPosition(
                ticker=rec["ticker"],
                quantity=1,  # Default; real quantity from orders table
                entry_price=float(rec["entry_price"]),
                current_price=float(rec["entry_price"]),
                unrealized_pnl=0.0,
                market_value=float(rec["entry_price"]),
                sector="",
                stop_loss_price=float(rec["stop_loss_price"]),
                take_profit_price=float(rec["take_profit_price"]),
                signal_confidence=float(rec["signal_confidence"]),
                is_micro_trade=bool(rec["is_micro_trade"]),
            )
            loaded.append(position)
    except Exception:
        logger.debug("Could not load open positions — table may not exist")

    return loaded
|
|
|
|
async def _load_stop_levels(self) -> dict[str, StopLevels]:
    """Return the active stop-loss / take-profit levels keyed by ticker.

    Task 28.3: Reads ``position_stop_levels`` rows where ``active = TRUE``.
    Without a DB pool, or when the query or any row conversion fails, an
    empty dict is returned (the failure is logged at DEBUG level).
    """
    if self.pool is None:
        return {}

    try:
        level_rows = await self.pool.fetch(
            "SELECT ticker, stop_loss_price, take_profit_price, "
            "trailing_stop_active, atr_value, atr_multiplier, "
            "reward_risk_ratio, updated_at "
            "FROM position_stop_levels WHERE active = TRUE"
        )
        # A later row for the same ticker replaces an earlier one.
        return {
            rec["ticker"]: StopLevels(
                stop_loss_price=float(rec["stop_loss_price"]),
                take_profit_price=float(rec["take_profit_price"]),
                trailing_stop_active=bool(rec["trailing_stop_active"]),
                atr_value=float(rec["atr_value"]),
                atr_multiplier=float(rec["atr_multiplier"]),
                reward_risk_ratio=float(rec["reward_risk_ratio"]),
            )
            for rec in level_rows
        }
    except Exception:
        logger.debug("Could not load stop levels — table may not exist")
        return {}
|
|
|
|
async def _check_circuit_breaker_daily_loss(self) -> None:
    """Activate the circuit breaker when unrealized losses breach the loss threshold.

    Sums ``unrealized_pnl`` over ``positions`` rows with ``quantity > 0``
    and expresses any net loss as a fraction of
    ``portfolio_state.total_value``. When that fraction reaches
    ``config.circuit_breaker_daily_loss_pct`` (0.05 when unset or falsy)
    the breaker state is set to active with a cooldown of
    ``config.circuit_breaker_volatility_pause_hours`` (2 when unset or
    falsy; fractional hours are honoured), the event is written to
    ``circuit_breaker_events``, and an alert is created.

    Does nothing without a DB pool or portfolio state, or when the breaker
    state is already active. Errors are logged at DEBUG level, not raised.
    """
    if self.pool is None or self.portfolio_state is None:
        return

    # Already active — nothing to do.
    if self._cb_state.active:
        return

    try:
        row = await self.pool.fetchrow(
            "SELECT COALESCE(SUM(unrealized_pnl), 0) AS total_pnl FROM positions WHERE quantity > 0"
        )
        total_pnl = float(row["total_pnl"]) if row else 0.0
        total_value = self.portfolio_state.total_value

        if total_value <= 0:
            return

        # Only a net loss counts; a net gain yields 0.
        daily_loss_pct = abs(min(0, total_pnl)) / total_value

        threshold = getattr(self.config, 'circuit_breaker_daily_loss_pct', 0.05) or 0.05
        if isinstance(threshold, str):
            threshold = float(threshold)

        if daily_loss_pct < threshold:
            return

        now = datetime.now(tz=timezone.utc)
        cooldown_hours = getattr(self.config, 'circuit_breaker_volatility_pause_hours', 2) or 2
        # float() keeps fractional settings such as 1.5 (or "1.5") working.
        cooldown_expires = now + timedelta(hours=float(cooldown_hours))

        self._cb_state = CircuitBreakerState(
            active=True,
            trigger_type="daily_loss_limit",
            triggered_at=now,
            cooldown_expires=cooldown_expires,
        )

        # Persisting the event is best-effort; the breaker is already set.
        try:
            await self.pool.execute(
                "INSERT INTO circuit_breaker_events "
                "(trigger_type, triggered_at, cooldown_expires, trigger_data) "
                "VALUES ($1, $2, $3, $4)",
                "daily_loss_limit",
                now,
                cooldown_expires,
                json.dumps({
                    "daily_loss_pct": round(daily_loss_pct, 4),
                    "threshold": threshold,
                    "total_pnl": round(total_pnl, 2),
                    "total_value": round(total_value, 2),
                }),
            )
        except Exception:
            logger.debug("Could not persist circuit breaker event", exc_info=True)

        logger.warning(
            "CIRCUIT BREAKER ACTIVATED: daily loss %.1f%% exceeds %.1f%% threshold "
            "(P&L=$%.2f, value=$%.2f). Trading halted until %s",
            daily_loss_pct * 100, threshold * 100,
            total_pnl, total_value, cooldown_expires.isoformat(),
        )

        # The alert text says "ET", so convert the UTC expiry before formatting.
        from services.trading.trading_window import ET

        expires_et = cooldown_expires.astimezone(ET)
        self.create_alert(
            "circuit_breaker_activated",
            f"Daily loss {daily_loss_pct:.1%} exceeded {threshold:.1%} threshold. "
            f"Trading halted until {expires_et.strftime('%H:%M ET')}.",
        )
    except Exception:
        logger.debug("Could not check circuit breaker daily loss", exc_info=True)
|
|
|
|
async def _check_profit_taking(self) -> None:
    """Submit sell orders for positions whose gain has reached the profit target.

    A position is sold in full when its gain over ``avg_entry_price``
    reaches either of two thresholds, whichever is lower:

    * the absolute cap of 10%, or
    * the tier target ``reward_risk_ratio * stop_loss_atr_multiplier * 0.02``
      taken from the active risk tier (0.02 stands in for a ~2% base ATR).

    After submitting a sell the in-memory portfolio state is adjusted
    optimistically: one fewer open position and the proceeds added to
    ``active_pool``. Does nothing without a DB pool or when the positions
    query fails.
    """
    if self.pool is None:
        return

    try:
        rows = await self.pool.fetch(
            "SELECT ticker, quantity, avg_entry_price, current_price, unrealized_pnl "
            "FROM positions WHERE quantity > 0"
        )
    except Exception:
        logger.debug("Could not query positions for profit-taking", exc_info=True)
        return

    if not rows:
        return

    # Loop-invariant: the sell rule is "gain >= 10% OR gain >= tier target",
    # so the effective trigger is the smaller of the two.
    tier = self._active_risk_tier
    tier_target = tier.reward_risk_ratio * tier.stop_loss_atr_multiplier * 0.02
    sell_threshold = min(0.10, tier_target)

    for row in rows:
        ticker = row["ticker"]
        qty = int(row["quantity"])
        entry = float(row["avg_entry_price"])
        current = float(row["current_price"] or 0)

        # Skip rows without usable prices or share counts.
        if entry <= 0 or current <= 0 or qty <= 0:
            continue

        gain_pct = (current - entry) / entry
        if gain_pct < sell_threshold:
            continue

        logger.info(
            "Profit-taking: %s gained %.1f%% (target=%.1f%%) — selling %d shares",
            ticker, gain_pct * 100, sell_threshold * 100, qty,
        )
        await self._submit_sell_order(ticker, qty, f"profit_taking_{gain_pct:.1%}")

        # Reflect the pending sale in the in-memory portfolio state.
        if self.portfolio_state:
            self.portfolio_state.open_position_count = max(
                0, self.portfolio_state.open_position_count - 1
            )
            self.portfolio_state.active_pool += current * qty
|
|
|
|
async def _submit_sell_order(self, ticker: str, quantity: int, reason: str) -> None:
    """Queue a market sell order for the broker via Redis.

    Without a Redis client the order is only logged. A failure while
    building or pushing the job is logged with its traceback, not raised.
    """
    if self.redis is None:
        logger.info("Sell order (no redis): %s %d shares — %s", ticker, quantity, reason)
        return

    sell_job = {
        "ticker": ticker,
        "action": "sell",
        "quantity": quantity,
        "order_type": "market",
        "source": "trading_engine",
        "reason": reason,
    }
    try:
        target_queue = queue_key(QUEUE_BROKER)
        await self.redis.rpush(target_queue, json.dumps(sell_job))
    except Exception:
        logger.exception("Failed to push sell order for %s", ticker)
    else:
        logger.info("Submitted sell order for %s (%d shares): %s", ticker, quantity, reason)
|
|
|
|
async def _persist_daily_snapshot(self, now: datetime) -> None:
    """Upsert today's row in ``portfolio_snapshots`` from in-memory state.

    Task 29.2: The row is keyed by the ET calendar date of ``now``. Because
    the same date is written repeatedly, the conflict branch refreshes
    every column this method derives from live state: the three pool
    values plus ``unrealized_pnl``, ``portfolio_heat`` and ``risk_tier``.
    Columns written here only as zero / empty placeholders are left
    untouched on conflict so this method never resets an existing value.

    Args:
        now: Timestamp of the snapshot; also stored as ``created_at``.

    Does nothing without a DB pool or portfolio state. A failed write is
    logged at DEBUG level, not raised.
    """
    if self.pool is None or self.portfolio_state is None:
        return

    from services.trading.trading_window import ET

    snapshot_date = now.astimezone(ET).date()

    try:
        await self.pool.execute(
            "INSERT INTO portfolio_snapshots "
            "(snapshot_date, portfolio_value, active_pool, reserve_pool, "
            "daily_return, cumulative_return, unrealized_pnl, realized_pnl, "
            "win_count, loss_count, win_rate, sharpe_ratio, max_drawdown, "
            "current_drawdown_pct, portfolio_heat, risk_tier, "
            "positions, metrics, created_at) "
            "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, "
            "$11, $12, $13, $14, $15, $16, $17, $18, $19) "
            "ON CONFLICT (snapshot_date) DO UPDATE SET "
            "portfolio_value = EXCLUDED.portfolio_value, "
            "active_pool = EXCLUDED.active_pool, "
            "reserve_pool = EXCLUDED.reserve_pool, "
            "unrealized_pnl = EXCLUDED.unrealized_pnl, "
            "portfolio_heat = EXCLUDED.portfolio_heat, "
            "risk_tier = EXCLUDED.risk_tier, "
            "updated_at = NOW()",
            snapshot_date,
            self.portfolio_state.total_value,
            self.portfolio_state.active_pool,
            self.portfolio_state.reserve_pool,
            0.0,  # daily_return
            0.0,  # cumulative_return
            sum(p.unrealized_pnl for p in self.portfolio_state.positions),
            0.0,  # realized_pnl
            0,  # win_count
            0,  # loss_count
            0.0,  # win_rate
            0.0,  # sharpe_ratio
            0.0,  # max_drawdown
            0.0,  # current_drawdown_pct
            self.portfolio_state.portfolio_heat,
            self.config.risk_tier,
            json.dumps([]),  # positions
            json.dumps({}),  # metrics
            now,
        )
        logger.info("Persisted daily snapshot for %s", snapshot_date)
    except Exception:
        logger.debug(
            "Could not persist daily snapshot — table may not exist", exc_info=True
        )
|
|
|
|
# ------------------------------------------------------------------
|
|
# Decision builders
|
|
# ------------------------------------------------------------------
|
|
|
|
def _skip_decision(
    self,
    *,
    rec_id: str | None,
    ticker: str,
    skip_reason: str,
    risk_tier: RiskTierConfig,
    portfolio_state: PortfolioState,
    circuit_breaker_state: CircuitBreakerState,
    reasoning: list[str],
    now: datetime,
) -> TradingDecision:
    """Build a "skip" ``TradingDecision`` with a fresh UUID.

    Records why the recommendation was skipped together with the tier name,
    portfolio heat, pool balances and breaker status at decision time.
    Position size and share quantity are left as ``None``.
    """
    breaker_label = "active" if circuit_breaker_state.active else "inactive"
    decision_fields = {
        "id": str(uuid.uuid4()),
        "recommendation_id": rec_id,
        "decision": "skip",
        "skip_reason": skip_reason,
        "ticker": ticker,
        "computed_position_size": None,
        "computed_share_quantity": None,
        "risk_tier_at_decision": risk_tier.name,
        "portfolio_heat_at_decision": portfolio_state.portfolio_heat,
        "active_pool_at_decision": portfolio_state.active_pool,
        "reserve_pool_at_decision": portfolio_state.reserve_pool,
        "circuit_breaker_status": breaker_label,
        "decision_trace": {"reasoning": reasoning},
        "created_at": now,
    }
    return TradingDecision(**decision_fields)
|
|
|
|
def _act_decision(
    self,
    *,
    rec_id: str | None,
    ticker: str,
    size_result: PositionSizeResult,
    risk_tier: RiskTierConfig,
    portfolio_state: PortfolioState,
    circuit_breaker_state: CircuitBreakerState,
    reasoning: list[str],
    now: datetime,
) -> TradingDecision:
    """Build an "act" ``TradingDecision`` with a fresh UUID.

    Carries the dollar amount and share quantity from ``size_result`` along
    with the tier name, portfolio heat, pool balances and breaker status at
    decision time. ``skip_reason`` is always ``None``.
    """
    breaker_label = "active" if circuit_breaker_state.active else "inactive"
    decision_fields = {
        "id": str(uuid.uuid4()),
        "recommendation_id": rec_id,
        "decision": "act",
        "skip_reason": None,
        "ticker": ticker,
        "computed_position_size": size_result.dollar_amount,
        "computed_share_quantity": size_result.share_quantity,
        "risk_tier_at_decision": risk_tier.name,
        "portfolio_heat_at_decision": portfolio_state.portfolio_heat,
        "active_pool_at_decision": portfolio_state.active_pool,
        "reserve_pool_at_decision": portfolio_state.reserve_pool,
        "circuit_breaker_status": breaker_label,
        "decision_trace": {"reasoning": reasoning},
        "created_at": now,
    }
    return TradingDecision(**decision_fields)
|