"""Core autonomous trading engine — decision loop and pre-trade evaluation. Feature: autonomous-trading-engine Coordinates all trading sub-components (PositionSizer, StopLossManager, CircuitBreaker, ReservePoolController, RiskTierController, CorrelationMatrix) to evaluate recommendations and produce TradingDecision records. The ``evaluate_recommendation`` method is deliberately synchronous-compatible so that it can be tested without real DB/Redis connections. The async ``start`` / ``stop`` methods manage the live decision loop, stop-loss monitor, and performance metrics loop. """ from __future__ import annotations import asyncio import json import logging import uuid from datetime import datetime, timedelta, timezone import httpx try: import numpy as np # noqa: F401 _HAS_NUMPY = True except ImportError: _HAS_NUMPY = False from services.shared.config import TradingConfig from services.shared.redis_keys import ( QUEUE_BROKER, queue_key, trading_dedupe_key, ) from services.trading.circuit_breaker import CircuitBreaker from services.trading.correlation import CorrelationMatrix from services.trading.micro_trading import MicroTradeConfig, MicroTradingModule from services.trading.models import ( RISK_TIER_DEFAULTS, CircuitBreakerState, OpenPosition, PerformanceMetrics, PortfolioState, PositionSizeResult, RiskTierConfig, StopLevels, StopTrigger, TradingDecision, ) from services.trading.notifications import NotificationRecord, NotificationService from services.trading.performance_tracker import PerformanceComputer from services.trading.position_sizer import PositionSizer from services.trading.rebalancer import PortfolioRebalancer from services.trading.reserve_pool import ReservePoolController from services.trading.risk_tier_controller import RiskTierController from services.trading.stop_loss_manager import StopLossManager from services.trading.trading_window import is_market_open, is_within_trading_window logger = logging.getLogger("trading_engine") class TradingEngine: """Main autonomous trading engine. Manages the decision loop, coordinates all sub-components, and maintains runtime state. Parameters ---------- pool: asyncpg connection pool (used by async lifecycle methods). redis: Redis client (used for deduplication and pub/sub). config: Trading engine configuration. """ def __init__( self, pool: object, redis: object, config: TradingConfig, ) -> None: self.pool = pool self.redis = redis self.config = config # Sub-components self.position_sizer = PositionSizer() self.stop_loss_manager = StopLossManager() self.circuit_breaker = CircuitBreaker() self.reserve_pool_controller = ReservePoolController( siphon_pct=config.reserve_siphon_pct, high_water_pct=config.reserve_high_water_pct, ) self.risk_tier_controller = RiskTierController() self.correlation_matrix = CorrelationMatrix() self.notification_service = NotificationService() self.micro_trading_module = MicroTradingModule() self.rebalancer = PortfolioRebalancer() self.performance_computer = PerformanceComputer() # Runtime state self.running: bool = False self.portfolio_state: PortfolioState | None = None self.processed_recommendation_ids: set[str] = set() # Async task management (Task 27.6) self._tasks: list[asyncio.Task] = [] # type: ignore[type-arg] # Active risk tier loaded from config defaults self._active_risk_tier: RiskTierConfig = RISK_TIER_DEFAULTS.get( config.risk_tier, RISK_TIER_DEFAULTS["moderate"] ) # Circuit breaker runtime state self._cb_state: CircuitBreakerState = CircuitBreakerState() # Earnings calendar cache self._earnings_calendar: dict = {} # Last poll timestamp — initialised to 24 h ago so first poll # picks up recent recommendations self._last_poll_timestamp: datetime = datetime.now(tz=timezone.utc) - timedelta(hours=24) # Per-ticker last-price-fetch timestamps for safety sell (Task 28.4) self._last_price_timestamps: dict[str, datetime] = {} # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ async def start(self) -> None: """Load portfolio state and spawn the async worker loops. When ``self.pool`` is ``None`` (unit-test / lightweight mode) the engine skips database loading and starts with an empty portfolio. """ # --- Load initial state from PostgreSQL (graceful degradation) --- if self.pool is not None: try: await self._load_initial_state() except Exception: logger.exception("Failed to load initial state from DB — starting with defaults") # Ensure we always have a portfolio state if self.portfolio_state is None: self.portfolio_state = PortfolioState() self.running = True # Spawn async worker loops self._tasks = [ asyncio.create_task(self._decision_loop(), name="decision_loop"), asyncio.create_task(self._stop_loss_monitor(), name="stop_loss_monitor"), asyncio.create_task(self._performance_loop(), name="performance_loop"), asyncio.create_task(self._risk_tier_scheduler(), name="risk_tier_scheduler"), asyncio.create_task(self._rebalance_scheduler(), name="rebalance_scheduler"), ] logger.info("Trading engine started with %d worker tasks", len(self._tasks)) async def stop(self) -> None: """Graceful shutdown — cancel all worker tasks and persist state.""" self.running = False # Cancel all tasks for task in self._tasks: task.cancel() if self._tasks: await asyncio.gather(*self._tasks, return_exceptions=True) self._tasks.clear() logger.info("Trading engine stopped") # ------------------------------------------------------------------ # Core evaluation logic (synchronous-compatible for testing) # ------------------------------------------------------------------ def evaluate_recommendation( self, rec: dict, portfolio_state: PortfolioState, risk_tier: RiskTierConfig, circuit_breaker_state: CircuitBreakerState, correlation_matrix: CorrelationMatrix, earnings_calendar: dict, now: datetime | None = None, ) -> TradingDecision: """Run all pre-trade checks and produce a TradingDecision. The checks are applied in order; the first failure short-circuits with a ``skip`` decision. If all checks pass the PositionSizer is invoked and its result determines the final decision. Parameters ---------- rec: Recommendation dict with at least ``recommendation_id``, ``ticker``, ``confidence``, ``sector``, ``current_price``, and ``action``. portfolio_state: Current portfolio snapshot. risk_tier: Active risk tier configuration. circuit_breaker_state: Current circuit breaker state. correlation_matrix: Correlation matrix instance for diversification checks. earnings_calendar: Mapping of ticker → next earnings datetime. now: Optional override for the current timestamp (for testing). """ now = now or datetime.now(tz=timezone.utc) rec_id = rec.get("recommendation_id") ticker = rec.get("ticker", "") confidence = rec.get("confidence", 0.0) sector = rec.get("sector", "") current_price = rec.get("current_price", 0.0) reasoning: list[str] = [] # --- a. Circuit breaker active? -------------------------------- if self.circuit_breaker.is_active(circuit_breaker_state, now=now): reasoning.append("Circuit breaker is active") return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason="circuit_breaker_active", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # --- b. Trading window? ---------------------------------------- if not is_within_trading_window(now): reasoning.append("Outside trading window") return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason="outside_trading_window", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # --- c. Confidence gate (early check before sizer) ------------- if confidence < risk_tier.min_confidence: reasoning.append( f"Confidence {confidence:.4f} below tier minimum " f"{risk_tier.min_confidence}" ) return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason="insufficient_confidence", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # --- d. Deduplication check ------------------------------------ if rec_id and rec_id in self.processed_recommendation_ids: reasoning.append(f"Recommendation {rec_id} already processed") return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason="duplicate_recommendation", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # --- e. Multiple declining positions check --------------------- if self.check_declining_positions(portfolio_state.positions): reasoning.append( "Multiple declining positions — halting new entries" ) return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason="multiple_declining_positions", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # --- f. Max open positions check ------------------------------- max_positions = self.config.max_open_positions if hasattr(self.config, "max_open_positions") else 10 if self.check_max_positions( portfolio_state.open_position_count, max_positions ): reasoning.append( f"At max open positions ({portfolio_state.open_position_count}/" f"{max_positions})" ) return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason="max_positions_reached", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # --- g. Position sizing ---------------------------------------- reasoning.append("All pre-trade checks passed — computing position size") # Build the raw correlation dict expected by PositionSizer corr_dict: dict[tuple[str, str], float] = correlation_matrix._data size_result: PositionSizeResult = self.position_sizer.compute( confidence=confidence, ticker=ticker, sector=sector, current_price=current_price, active_pool=portfolio_state.active_pool, risk_tier=risk_tier, portfolio_state=portfolio_state, correlation_matrix=corr_dict, earnings_calendar=earnings_calendar, absolute_position_cap=self.config.absolute_position_cap, active_pool_minimum=self.config.active_pool_minimum, ) reasoning.extend(size_result.adjustments) if size_result.rejected: reasoning.append(f"Position sizer rejected: {size_result.rejection_reason}") return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason=f"position_sizer_rejected: {size_result.rejection_reason}", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # Mark recommendation as processed if rec_id: self.processed_recommendation_ids.add(rec_id) return self._act_decision( rec_id=rec_id, ticker=ticker, size_result=size_result, risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # ------------------------------------------------------------------ # Helper checks # ------------------------------------------------------------------ def check_declining_positions( self, positions: list[OpenPosition], threshold_pct: float = 0.50, decline_pct: float = 0.02, ) -> bool: """Return True if > threshold_pct of positions have > decline_pct negative unrealized P&L. A position is considered "declining" when its unrealized P&L as a fraction of its entry value is worse than ``-decline_pct``. Parameters ---------- positions: List of currently open positions. threshold_pct: Fraction of positions that must be declining to trigger (default 0.50 = 50%). decline_pct: Minimum loss fraction to count as declining (default 0.02 = 2%). """ if not positions: return False declining_count = 0 for pos in positions: entry_value = pos.entry_price * pos.quantity if entry_value <= 0: continue loss_pct = -pos.unrealized_pnl / entry_value if loss_pct > decline_pct: declining_count += 1 return declining_count > threshold_pct * len(positions) def check_max_positions( self, open_count: int, max_positions: int = 10, ) -> bool: """Return True if the portfolio is at the maximum number of open positions.""" return open_count >= max_positions # ------------------------------------------------------------------ # Integration wiring — thin wrappers for the decision loop # ------------------------------------------------------------------ def check_stop_loss_crossings( self, positions: list[OpenPosition], prices: dict[str, float], stop_levels: dict[str, StopLevels], ) -> list[StopTrigger]: """Delegate to StopLossManager.check_price_crossings(). Called by the async decision loop at the configured interval (5 min default, 60s during high-severity events). """ return self.stop_loss_manager.check_price_crossings( positions, prices, stop_levels ) def handle_position_close( self, realized_profit: float, reserve_balance: float, ) -> tuple[float, float]: """Delegate to ReservePoolController.siphon_profit(). Called when a position close event is detected from broker service fill events. """ return self.reserve_pool_controller.siphon_profit( realized_profit, reserve_balance ) def evaluate_risk_tier( self, current_tier: str, metrics: PerformanceMetrics, reserve_pct: float, ) -> str | None: """Delegate to RiskTierController.evaluate(). Scheduled to run at daily market close. """ return self.risk_tier_controller.evaluate( current_tier, metrics, reserve_pct ) def evaluate_rebalancing( self, positions: list[OpenPosition], risk_tier: RiskTierConfig, active_pool: float, ) -> list: """Delegate to PortfolioRebalancer.evaluate(). Scheduled to run weekly at Monday market open. """ return self.rebalancer.evaluate(positions, risk_tier, active_pool) def create_alert( self, event_type: str, details: str, ) -> NotificationRecord: """Create a notification record for a critical event. Delegates formatting and record creation to NotificationService. The caller is responsible for actual delivery. """ message = self.notification_service.format_alert(event_type, details) return self.notification_service.create_notification( channel="email", event_type=event_type, message=message, ) def check_micro_trade_constraints( self, daily_count: int, is_within_window: bool, circuit_breaker_active: bool, portfolio_heat_pct: float, max_heat: float, ) -> tuple[bool, str]: """Delegate to MicroTradingModule.check_constraints(). Called by the decision loop when micro-trading is enabled. """ micro_config = MicroTradeConfig( enabled=self.config.micro_trading_enabled, allocation_cap_pct=self.config.micro_trading_allocation_cap_pct, max_daily=self.config.micro_trading_max_daily, max_hold_minutes=self.config.micro_trading_max_hold_minutes, ) return self.micro_trading_module.check_constraints( config=micro_config, daily_count=daily_count, is_within_window=is_within_window, circuit_breaker_active=circuit_breaker_active, portfolio_heat_pct=portfolio_heat_pct, max_heat=max_heat, ) # ------------------------------------------------------------------ # Async worker loops # ------------------------------------------------------------------ async def _load_initial_state(self) -> None: """Load portfolio state, risk tier, reserve pool, and CB status from DB.""" if self.pool is None: return # Load reserve pool balance reserve_balance = 0.0 try: row = await self.pool.fetchrow( "SELECT balance_after FROM reserve_pool_ledger ORDER BY created_at DESC LIMIT 1" ) if row: reserve_balance = float(row["balance_after"]) except Exception: logger.debug("Could not load reserve pool balance — using 0.0") # Load circuit breaker state (unresolved events) try: cb_row = await self.pool.fetchrow( "SELECT trigger_type, triggered_at, cooldown_expires " "FROM circuit_breaker_events WHERE resolved_at IS NULL " "ORDER BY created_at DESC LIMIT 1" ) if cb_row: self._cb_state = CircuitBreakerState( active=True, trigger_type=cb_row["trigger_type"], triggered_at=cb_row["triggered_at"], cooldown_expires=cb_row["cooldown_expires"], ) except Exception: logger.debug("Could not load circuit breaker state — using inactive") # Build portfolio state from broker account (real buying power) initial_capital = 100000.0 try: acct_row = await self.pool.fetchrow( "SELECT config FROM broker_accounts WHERE active = TRUE ORDER BY created_at DESC LIMIT 1" ) if acct_row and acct_row["config"]: cfg = acct_row["config"] if isinstance(acct_row["config"], dict) else {} initial_capital = float(cfg.get("portfolio_value", cfg.get("cash", 100000.0))) except Exception: logger.debug("Could not load broker account — using default capital") # Load actual positions to calculate invested amount invested = 0.0 open_count = 0 try: pos_rows = await self.pool.fetch( "SELECT quantity, avg_entry_price, current_price, unrealized_pnl FROM positions WHERE quantity > 0" ) for pr in pos_rows: invested += float(pr["quantity"]) * float(pr["current_price"] or pr["avg_entry_price"]) open_count += 1 except Exception: logger.debug("Could not load positions — assuming no invested capital") # Use portfolio_value (equity) as the total, not initial_capital # Available = equity - invested market value is wrong with margin # Instead use: buying_power from broker or equity - long_market_value available = max(0.0, initial_capital - invested) self.portfolio_state = PortfolioState( reserve_pool=reserve_balance, active_pool=available, total_value=initial_capital, open_position_count=open_count, ) logger.info( "Portfolio state: total=$%.2f invested=$%.2f available=$%.2f reserve=$%.2f positions=%d", initial_capital, invested, available, reserve_balance, open_count, ) # Compute initial correlation matrix from market data await self._compute_correlation_matrix() async def _decision_loop(self) -> None: """Poll recommendations and evaluate them in a continuous loop. Task 27.3: Main decision loop that polls the recommendations table, checks Redis deduplication, evaluates each recommendation, and pushes "act" decisions to the broker queue. """ while self.running: try: await asyncio.sleep(self.config.polling_interval_seconds) if not self.running: break if self.pool is None: continue # Poll recommendations from PostgreSQL recs: list[dict] = [] try: rows = await self.pool.fetch( "SELECT * FROM recommendations " "WHERE action IN ('buy','sell') " "AND mode IN ('paper_eligible','live_eligible') " "AND generated_at > NOW() - INTERVAL '2 hours' " "AND generated_at > $1 " "ORDER BY confidence DESC " "LIMIT 50", self._last_poll_timestamp, ) if rows: # Advance timestamp to the oldest rec in this batch # so next poll picks up where we left off last_gen = max(r["generated_at"] for r in rows) self._last_poll_timestamp = last_gen recs = [dict(r) for r in rows] if recs: logger.info( "Polled %d recommendations (highest confidence=%.3f)", len(recs), recs[0].get("confidence", 0), ) # Fetch current prices for all tickers in this batch batch_tickers = list({r.get("ticker", "") for r in recs if r.get("ticker")}) price_map: dict[str, float] = {} if batch_tickers and self.pool is not None: try: price_rows = await self.pool.fetch( """SELECT DISTINCT ON (ticker) ticker, (data->>'c')::float AS price FROM market_snapshots WHERE ticker = ANY($1) AND snapshot_type = 'bar' ORDER BY ticker, captured_at DESC""", batch_tickers, ) price_map = {r["ticker"]: r["price"] for r in price_rows if r["price"]} except Exception: logger.debug("Could not fetch prices from market_snapshots") # Fall back to Polygon API for any missing prices missing = [t for t in batch_tickers if t not in price_map] if missing: fetched = await self._fetch_current_prices(missing) price_map.update(fetched) except Exception: logger.debug("Could not poll recommendations — table may not exist yet") continue for rec in recs: try: rec_id = str(rec.get("recommendation_id", rec.get("id", ""))) ticker = rec.get("ticker", "") # Inject current price from market data if not rec.get("current_price") and ticker in price_map: rec["current_price"] = price_map[ticker] # Skip if no price available if not rec.get("current_price") or rec["current_price"] <= 0: continue # Redis deduplication check if self.redis is not None: dedupe_key = trading_dedupe_key(rec_id) already = await self.redis.get(dedupe_key) if already: continue # Ensure portfolio state exists if self.portfolio_state is None: self.portfolio_state = PortfolioState() action = rec.get("action", "buy") # --- Sell path: skip position sizing, look up existing position --- if action == "sell": # Check trading window for sells too now_check = datetime.now(tz=timezone.utc) if not is_within_trading_window(now_check): continue pos_row = None try: pos_row = await self.pool.fetchrow( "SELECT quantity, avg_entry_price, current_price " "FROM positions WHERE ticker = $1 AND quantity > 0", ticker, ) except Exception: logger.debug("Could not look up position for sell: %s", ticker) if pos_row is None: continue # Set dedup key only after confirming we have a position to sell if self.redis is not None: await self.redis.set(trading_dedupe_key(rec_id), "1", ex=86400) sell_qty = int(pos_row["quantity"]) sell_price = rec.get("current_price", 0.0) estimated_proceeds = sell_qty * sell_price order_job = { "trading_decision_id": str(uuid.uuid4()), "ticker": ticker, "action": "sell", "side": "sell", "quantity": sell_qty, "order_type": "market", "estimated_value": estimated_proceeds, "source": "trading_engine", } if self.redis is not None: broker_queue = queue_key(QUEUE_BROKER) await self.redis.rpush(broker_queue, json.dumps(order_job)) logger.info( "Pushed sell order for %s (%d shares, ~$%.2f) to broker queue", ticker, sell_qty, estimated_proceeds, ) # Update portfolio state if self.portfolio_state: self.portfolio_state.open_position_count = max( 0, self.portfolio_state.open_position_count - 1 ) self.portfolio_state.active_pool += estimated_proceeds # Persist sell decision for audit trail sell_decision = TradingDecision( id=order_job["trading_decision_id"], recommendation_id=rec_id, decision="act", skip_reason=None, ticker=ticker, computed_position_size=estimated_proceeds, computed_share_quantity=sell_qty, risk_tier_at_decision=self._active_risk_tier.name, portfolio_heat_at_decision=self.portfolio_state.portfolio_heat if self.portfolio_state else 0, active_pool_at_decision=self.portfolio_state.active_pool if self.portfolio_state else 0, reserve_pool_at_decision=self.portfolio_state.reserve_pool if self.portfolio_state else 0, circuit_breaker_status="active" if self._cb_state.active else "inactive", decision_trace={"action": "sell", "reasoning": [f"Sell {sell_qty} shares of {ticker}"]}, ) await self._persist_decision(sell_decision) # Deactivate stop levels for sold position try: await self.pool.execute( "UPDATE position_stop_levels SET active = FALSE, updated_at = NOW() WHERE ticker = $1 AND active = TRUE", ticker, ) except Exception: pass if rec_id: self.processed_recommendation_ids.add(rec_id) continue # --- Buy path --- # Set dedup key for buys if self.redis is not None: await self.redis.set(trading_dedupe_key(rec_id), "1", ex=86400) # Check if we already hold this ticker — don't double up try: existing_pos = await self.pool.fetchrow( "SELECT quantity FROM positions WHERE ticker = $1 AND quantity > 0", ticker, ) if existing_pos: continue except Exception: pass # Evaluate recommendation through position sizer decision = self.evaluate_recommendation( rec=rec, portfolio_state=self.portfolio_state, risk_tier=self._active_risk_tier, circuit_breaker_state=self._cb_state, correlation_matrix=self.correlation_matrix, earnings_calendar=self._earnings_calendar, ) logger.info( "Decision for %s: %s (reason=%s, size=$%.2f, shares=%d)", decision.ticker, decision.decision, decision.skip_reason or "n/a", decision.computed_position_size or 0, decision.computed_share_quantity or 0, ) # For "act" decisions: push order to broker queue if decision.decision == "act": # Deduct from active pool immediately to prevent over-allocation order_cost = (decision.computed_share_quantity or 0) * rec.get("current_price", 0) if self.portfolio_state and order_cost > 0: self.portfolio_state.active_pool = max(0.0, self.portfolio_state.active_pool - order_cost) self.portfolio_state.open_position_count += 1 order_job = { "trading_decision_id": decision.id, "ticker": decision.ticker, "action": rec.get("action", "buy"), "quantity": decision.computed_share_quantity, "order_type": "market", "source": "trading_engine", } if self.redis is not None: broker_queue = queue_key(QUEUE_BROKER) await self.redis.rpush(broker_queue, json.dumps(order_job)) logger.info( "Pushed order for %s (%d shares, $%.2f) to broker queue — remaining pool: $%.2f", decision.ticker, decision.computed_share_quantity or 0, order_cost, self.portfolio_state.active_pool if self.portfolio_state else 0, ) # Create stop-loss levels for the new position if self.pool is not None: try: price = rec.get("current_price", 0.0) atr_est = price * 0.02 # 2% ATR estimate tier = self._active_risk_tier sl_price = price * (1 - 0.02 * tier.stop_loss_atr_multiplier) tp_price = price * (1 + 0.02 * tier.stop_loss_atr_multiplier * tier.reward_risk_ratio) await self.pool.execute( "INSERT INTO position_stop_levels " "(ticker, entry_price, stop_loss_price, take_profit_price, " "trailing_stop_active, atr_value, atr_multiplier, " "reward_risk_ratio, signal_confidence, is_micro_trade, active) " "VALUES ($1, $2, $3, $4, FALSE, $5, $6, $7, $8, FALSE, TRUE)", decision.ticker, price, sl_price, tp_price, atr_est, tier.stop_loss_atr_multiplier, tier.reward_risk_ratio, rec.get("confidence", 0.8), ) except Exception: logger.debug("Could not create stop levels for %s", decision.ticker) # Persist decision await self._persist_decision(decision) except Exception: logger.exception("Error evaluating recommendation %s", rec.get("recommendation_id", "?")) except asyncio.CancelledError: break except Exception: logger.exception("Unexpected error in decision loop") if self.running: await asyncio.sleep(5) async def _stop_loss_monitor(self) -> None: """Monitor open positions for stop-loss and take-profit crossings. Task 28.1: Periodically checks current prices against stop levels and submits sell orders for triggered positions. """ while self.running: try: await asyncio.sleep(self.config.stop_loss_check_interval_seconds) if not self.running: break now = datetime.now(tz=timezone.utc) # Skip if not market hours if not is_market_open(now): continue if self.pool is None: continue # Load positions and stop levels from DB positions = await self._load_open_positions() stop_levels = await self._load_stop_levels() if not positions: continue # Fetch current prices tickers = [p.ticker for p in positions] prices = await self._fetch_current_prices(tickers) # Update last-price timestamps for tickers that returned data for ticker in tickers: if ticker in prices: self._last_price_timestamps[ticker] = now # Safety sell for missing price data (Task 28.4) for pos in positions: if pos.ticker not in prices: last_ts = self._last_price_timestamps.get(pos.ticker) if last_ts and (now - last_ts) > timedelta(minutes=15): logger.warning( "No price data for %s for >15 min — submitting safety sell", pos.ticker, ) await self._submit_sell_order( pos.ticker, pos.quantity, "safety_sell_missing_price" ) # Check crossings triggers = self.check_stop_loss_crossings(positions, prices, stop_levels) for trigger in triggers: # Find the position to get quantity pos_match = next((p for p in positions if p.ticker == trigger.ticker), None) if pos_match is None: continue await self._submit_sell_order( trigger.ticker, pos_match.quantity, f"{trigger.trigger_type}_triggered", ) logger.info( "Stop-loss monitor: %s triggered for %s at %.2f (trigger: %.2f)", trigger.trigger_type, trigger.ticker, trigger.current_price, trigger.trigger_price, ) except asyncio.CancelledError: break except Exception: logger.exception("Unexpected error in stop-loss monitor") if self.running: await asyncio.sleep(5) async def _performance_loop(self) -> None: """Compute and update performance metrics periodically. Task 29.1: Runs every 5 minutes during market hours, computing portfolio metrics and updating self.portfolio_state. Task 29.2: Persists a daily snapshot at end of trading day. """ last_snapshot_date: str | None = None while self.running: try: await asyncio.sleep(300) # 5 minutes if not self.running: break now = datetime.now(tz=timezone.utc) # Skip if not market hours if not is_market_open(now): # Check if we should persist end-of-day snapshot (Task 29.2) from services.trading.trading_window import ET et_now = now.astimezone(ET) today_str = et_now.strftime("%Y-%m-%d") # After 4:00 PM ET and haven't snapshotted today if et_now.hour >= 16 and last_snapshot_date != today_str: await self._persist_daily_snapshot(now) last_snapshot_date = today_str continue # Compute metrics from current state if self.portfolio_state is None: continue # Sync active_pool with actual positions from DB try: if self.pool is not None: pos_rows = await self.pool.fetch( "SELECT quantity, avg_entry_price, current_price FROM positions WHERE quantity > 0" ) invested = sum(float(r["quantity"]) * float(r["current_price"] or r["avg_entry_price"]) for r in pos_rows) open_count = len(pos_rows) available = max(0.0, self.portfolio_state.total_value - self.portfolio_state.reserve_pool - invested) self.portfolio_state.active_pool = available self.portfolio_state.open_position_count = open_count except Exception: logger.debug("Could not sync positions for portfolio state") # Update portfolio heat and metrics from current positions try: metrics = self.performance_computer.compute_metrics( closed_trades=[], portfolio_value=self.portfolio_state.total_value, active_pool=self.portfolio_state.active_pool, reserve_pool=self.portfolio_state.reserve_pool, daily_pnl=0.0, unrealized_pnl=sum( p.unrealized_pnl for p in self.portfolio_state.positions ), portfolio_heat=self.portfolio_state.portfolio_heat, daily_returns=[], ) logger.debug( "Performance update: value=%.2f heat=%.4f", metrics.total_portfolio_value, metrics.portfolio_heat, ) except Exception: logger.debug("Could not compute performance metrics") # Refresh correlation matrix every 5 minutes try: await self._compute_correlation_matrix() except Exception: logger.debug("Could not refresh correlation matrix") # Profit-taking: sell positions that have exceeded the take-profit threshold try: await self._check_profit_taking() except Exception: logger.debug("Could not run profit-taking check") # Circuit breaker: check daily loss threshold try: await self._check_circuit_breaker_daily_loss() except Exception: logger.debug("Could not run circuit breaker daily loss check") # Persist a snapshot every cycle (during market hours) for the performance tab try: await self._persist_daily_snapshot(now) except Exception: logger.debug("Could not persist snapshot") except asyncio.CancelledError: break except Exception: logger.exception("Unexpected error in performance loop") if self.running: await asyncio.sleep(5) async def _risk_tier_scheduler(self) -> None: """Evaluate risk tier at daily market close (16:00 ET). Task 30.1: Computes seconds until next 16:00 ET, sleeps until then, loads latest PerformanceMetrics, computes reserve_pct, calls evaluate_risk_tier(), and persists tier changes. """ from services.trading.trading_window import ET while self.running: try: # Compute seconds until next 16:00 ET now_utc = datetime.now(tz=timezone.utc) et_now = now_utc.astimezone(ET) target_today = et_now.replace(hour=16, minute=0, second=0, microsecond=0) if et_now >= target_today: # Already past 16:00 ET today — target tomorrow target = target_today + timedelta(days=1) else: target = target_today # Skip weekends while target.weekday() > 4: # Saturday=5, Sunday=6 target += timedelta(days=1) sleep_seconds = (target - et_now).total_seconds() if sleep_seconds > 0: await asyncio.sleep(sleep_seconds) if not self.running: break if self.portfolio_state is None: continue # Load latest PerformanceMetrics from portfolio_snapshots or compute fresh metrics: PerformanceMetrics | None = None if self.pool is not None: try: row = await self.pool.fetchrow( "SELECT metrics FROM portfolio_snapshots " "ORDER BY snapshot_date DESC LIMIT 1" ) if row and row["metrics"]: m = json.loads(row["metrics"]) if isinstance(row["metrics"], str) else row["metrics"] if m: metrics = PerformanceMetrics( total_portfolio_value=m.get("total_portfolio_value", self.portfolio_state.total_value), active_pool=m.get("active_pool", self.portfolio_state.active_pool), reserve_pool=m.get("reserve_pool", self.portfolio_state.reserve_pool), unrealized_pnl=m.get("unrealized_pnl", 0.0), realized_pnl=m.get("realized_pnl", 0.0), daily_pnl=m.get("daily_pnl", 0.0), win_count=m.get("win_count", 0), loss_count=m.get("loss_count", 0), win_rate=m.get("win_rate", 0.0), avg_win=m.get("avg_win", 0.0), avg_loss=m.get("avg_loss", 0.0), profit_factor=m.get("profit_factor", 0.0), sharpe_ratio=m.get("sharpe_ratio", 0.0), max_drawdown=m.get("max_drawdown", 0.0), current_drawdown_pct=m.get("current_drawdown_pct", 0.0), portfolio_heat=m.get("portfolio_heat", 0.0), ) except Exception: logger.debug("Could not load metrics from portfolio_snapshots") # Fall back to computing fresh metrics if metrics is None: metrics = self.performance_computer.compute_metrics( closed_trades=[], portfolio_value=self.portfolio_state.total_value, active_pool=self.portfolio_state.active_pool, reserve_pool=self.portfolio_state.reserve_pool, daily_pnl=0.0, unrealized_pnl=sum( p.unrealized_pnl for p in self.portfolio_state.positions ), portfolio_heat=self.portfolio_state.portfolio_heat, daily_returns=[], ) # Compute reserve_pct total_value = self.portfolio_state.total_value reserve_pct = ( self.portfolio_state.reserve_pool / total_value if total_value > 0 else 0.0 ) # Evaluate risk tier current_tier = self.config.risk_tier new_tier = self.evaluate_risk_tier(current_tier, metrics, reserve_pct) if new_tier is not None and new_tier != current_tier: # Persist to risk_tier_history if self.pool is not None: try: await self.pool.execute( "INSERT INTO risk_tier_history " "(previous_tier, new_tier, trigger_source, trigger_metrics, created_at) " "VALUES ($1, $2, $3, $4, $5)", current_tier, new_tier, "auto_adjustment", json.dumps({ "win_rate": metrics.win_rate, "current_drawdown_pct": metrics.current_drawdown_pct, "reserve_pct": reserve_pct, "sharpe_ratio": metrics.sharpe_ratio, }), datetime.now(tz=timezone.utc), ) except Exception: logger.debug("Could not persist risk tier change") # Update config and active tier self.config.risk_tier = new_tier self._active_risk_tier = RISK_TIER_DEFAULTS.get( new_tier, RISK_TIER_DEFAULTS["moderate"] ) # Create alert notification self.create_alert( "risk_tier_changed", f"Risk tier changed from {current_tier} to {new_tier} " f"(win_rate={metrics.win_rate:.2%}, " f"drawdown={metrics.current_drawdown_pct:.2%}, " f"reserve={reserve_pct:.2%})", ) logger.info( "Risk tier changed: %s → %s (win_rate=%.2f, drawdown=%.2f, reserve=%.2f)", current_tier, new_tier, metrics.win_rate, metrics.current_drawdown_pct, reserve_pct, ) except asyncio.CancelledError: break except Exception: logger.exception("Unexpected error in risk tier scheduler") if self.running: await asyncio.sleep(60) async def _rebalance_scheduler(self) -> None: """Evaluate portfolio rebalancing weekly at Monday 09:45 ET. Task 30.2: Computes seconds until next Monday 09:45 ET, sleeps until then, loads positions and risk tier, calls evaluate_rebalancing(), and pushes rebalance orders to the broker queue. """ from services.trading.trading_window import ET, WINDOW_OPEN while self.running: try: # Compute seconds until next Monday 09:45 ET now_utc = datetime.now(tz=timezone.utc) et_now = now_utc.astimezone(ET) # Find next Monday days_until_monday = (7 - et_now.weekday()) % 7 if days_until_monday == 0: # It's Monday — check if we're past 09:45 target_today = et_now.replace( hour=WINDOW_OPEN.hour, minute=WINDOW_OPEN.minute, second=0, microsecond=0, ) if et_now >= target_today: # Already past 09:45 on Monday — target next Monday days_until_monday = 7 else: days_until_monday = 0 target = et_now.replace( hour=WINDOW_OPEN.hour, minute=WINDOW_OPEN.minute, second=0, microsecond=0, ) + timedelta(days=days_until_monday) sleep_seconds = (target - et_now).total_seconds() if sleep_seconds > 0: await asyncio.sleep(sleep_seconds) if not self.running: break # Respect circuit breaker status if self.circuit_breaker.is_active(self._cb_state, now=datetime.now(tz=timezone.utc)): logger.info("Rebalance skipped — circuit breaker is active") continue if self.portfolio_state is None: continue # Load current positions positions = self.portfolio_state.positions if self.pool is not None: try: positions = await self._load_open_positions() except Exception: logger.debug("Could not load positions for rebalancing") if not positions: continue # Evaluate rebalancing max_positions = ( self.config.max_open_positions if hasattr(self.config, "max_open_positions") else 10 ) rebalance_orders = self.rebalancer.evaluate( positions, self._active_risk_tier, self.portfolio_state.active_pool, max_positions, ) # Push rebalance orders to broker queue for order in rebalance_orders: order_job = { "ticker": order.ticker, "action": order.action, "quantity": order.quantity, "order_type": "market", "source": "trading_engine", "reason": order.reason, "tag": order.tag, } if self.redis is not None: try: broker_queue = queue_key(QUEUE_BROKER) await self.redis.rpush(broker_queue, json.dumps(order_job)) logger.info( "Rebalance: pushed %s order for %s (%d shares)", order.action, order.ticker, order.quantity, ) except Exception: logger.exception("Failed to push rebalance order for %s", order.ticker) else: logger.info( "Rebalance (no redis): %s %s %d shares — %s", order.action, order.ticker, order.quantity, order.reason, ) if rebalance_orders: logger.info("Rebalance cycle completed: %d orders generated", len(rebalance_orders)) else: logger.debug("Rebalance cycle completed: no orders needed") except asyncio.CancelledError: break except Exception: logger.exception("Unexpected error in rebalance scheduler") if self.running: await asyncio.sleep(60) # ------------------------------------------------------------------ # Correlation matrix computation # ------------------------------------------------------------------ async def _compute_correlation_matrix(self) -> None: """Compute pairwise price correlations from market_snapshots and load into self.correlation_matrix. Queries the last 30 days of daily close prices, computes daily returns, then calculates Pearson correlation coefficients between each ticker pair. Uses numpy when available, otherwise falls back to a manual computation. """ if self.pool is None: return try: rows = await self.pool.fetch( "SELECT ticker, captured_at::date AS dt, (data->>'c')::float AS close " "FROM market_snapshots " "WHERE snapshot_type = 'bar' AND captured_at > NOW() - INTERVAL '30 days' " "ORDER BY ticker, captured_at" ) except Exception: logger.debug("Could not query market_snapshots for correlation matrix") return if not rows: return # Group close prices by ticker, keyed by date ticker_prices: dict[str, dict] = {} for row in rows: ticker = row["ticker"] dt = row["dt"] close = row["close"] if close is None: continue if ticker not in ticker_prices: ticker_prices[ticker] = {} ticker_prices[ticker][dt] = close # Compute daily returns for each ticker ticker_returns: dict[str, list[float]] = {} all_dates: set = set() for ticker, prices_by_date in ticker_prices.items(): sorted_dates = sorted(prices_by_date.keys()) all_dates.update(sorted_dates) returns = [] for i in range(1, len(sorted_dates)): prev = prices_by_date[sorted_dates[i - 1]] curr = prices_by_date[sorted_dates[i]] if prev > 0: returns.append((curr - prev) / prev) if returns: ticker_returns[ticker] = returns tickers = list(ticker_returns.keys()) if len(tickers) < 2: return # Align returns to common dates for proper pairwise comparison sorted_all_dates = sorted(all_dates) aligned_returns: dict[str, list[float]] = {} for ticker in tickers: prices_by_date = ticker_prices[ticker] aligned = [] for i in range(1, len(sorted_all_dates)): prev_dt = sorted_all_dates[i - 1] curr_dt = sorted_all_dates[i] if prev_dt in prices_by_date and curr_dt in prices_by_date: prev_p = prices_by_date[prev_dt] curr_p = prices_by_date[curr_dt] if prev_p > 0: aligned.append((curr_p - prev_p) / prev_p) else: aligned.append(0.0) else: aligned.append(None) # type: ignore[arg-type] aligned_returns[ticker] = aligned corr_data: dict[tuple[str, str], float] = {} if _HAS_NUMPY: import numpy as _np for i in range(len(tickers)): for j in range(i + 1, len(tickers)): a_raw = aligned_returns[tickers[i]] b_raw = aligned_returns[tickers[j]] # Use only indices where both have valid returns pairs = [ (a_raw[k], b_raw[k]) for k in range(len(a_raw)) if a_raw[k] is not None and b_raw[k] is not None ] if len(pairs) < 5: continue a_arr = _np.array([p[0] for p in pairs]) b_arr = _np.array([p[1] for p in pairs]) corr_matrix = _np.corrcoef(a_arr, b_arr) corr_val = float(corr_matrix[0, 1]) if not _np.isnan(corr_val): corr_data[(tickers[i], tickers[j])] = corr_val else: # Manual Pearson correlation fallback for i in range(len(tickers)): for j in range(i + 1, len(tickers)): a_raw = aligned_returns[tickers[i]] b_raw = aligned_returns[tickers[j]] pairs = [ (a_raw[k], b_raw[k]) for k in range(len(a_raw)) if a_raw[k] is not None and b_raw[k] is not None ] if len(pairs) < 5: continue a_vals = [p[0] for p in pairs] b_vals = [p[1] for p in pairs] n = len(a_vals) mean_a = sum(a_vals) / n mean_b = sum(b_vals) / n cov = sum((a_vals[k] - mean_a) * (b_vals[k] - mean_b) for k in range(n)) std_a = sum((v - mean_a) ** 2 for v in a_vals) ** 0.5 std_b = sum((v - mean_b) ** 2 for v in b_vals) ** 0.5 if std_a > 0 and std_b > 0: corr_data[(tickers[i], tickers[j])] = cov / (std_a * std_b) self.correlation_matrix.load(corr_data) logger.info("Correlation matrix loaded: %d pairs", len(corr_data)) # ------------------------------------------------------------------ # Async helpers # ------------------------------------------------------------------ async def _persist_decision(self, decision: TradingDecision) -> None: """INSERT a trading decision into the trading_decisions table. Task 27.5: Handles pool=None gracefully (skip persistence, log only). """ logger.info( "Decision: %s %s ticker=%s reason=%s", decision.decision, decision.id[:8], decision.ticker, decision.skip_reason or "—", ) if self.pool is None: return try: await self.pool.execute( "INSERT INTO trading_decisions " "(id, recommendation_id, decision, skip_reason, ticker, " "computed_position_size, computed_share_quantity, " "risk_tier_at_decision, portfolio_heat_at_decision, " "active_pool_at_decision, reserve_pool_at_decision, " "circuit_breaker_status, correlation_check_result, " "sector_exposure_check_result, earnings_proximity_flag, " "is_micro_trade, decision_trace, created_at) " "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, " "$11, $12, $13, $14, $15, $16, $17, $18)", decision.id, decision.recommendation_id, decision.decision, decision.skip_reason, decision.ticker, decision.computed_position_size, decision.computed_share_quantity, decision.risk_tier_at_decision, decision.portfolio_heat_at_decision, decision.active_pool_at_decision, decision.reserve_pool_at_decision, decision.circuit_breaker_status, json.dumps(decision.correlation_check_result), json.dumps(decision.sector_exposure_check_result), decision.earnings_proximity_flag, decision.is_micro_trade, json.dumps(decision.decision_trace), decision.created_at, ) except Exception: logger.debug("Could not persist decision %s — table may not exist", decision.id[:8]) async def _sync_positions_and_siphon(self) -> None: """Sync positions from DB and siphon profit on closed positions. Task 27.4: Fetches current positions, detects closes, and calls siphon_profit() for profitable closes. """ if self.pool is None or self.portfolio_state is None: return try: positions = await self._load_open_positions() old_tickers = {p.ticker for p in self.portfolio_state.positions} new_tickers = {p.ticker for p in positions} # Detect closed positions closed_tickers = old_tickers - new_tickers for ticker in closed_tickers: old_pos = next( (p for p in self.portfolio_state.positions if p.ticker == ticker), None, ) if old_pos and old_pos.unrealized_pnl > 0: transfer, new_balance = self.reserve_pool_controller.siphon_profit( old_pos.unrealized_pnl, self.portfolio_state.reserve_pool, ) if transfer > 0: self.portfolio_state.reserve_pool = new_balance # Persist to reserve_pool_ledger try: await self.pool.execute( "INSERT INTO reserve_pool_ledger " "(amount, balance_after, trigger_type, reference_id, notes, created_at) " "VALUES ($1, $2, 'profit_siphon', $3, $4, $5)", transfer, new_balance, ticker, f"Siphoned from {ticker} close", datetime.now(tz=timezone.utc), ) except Exception: logger.debug("Could not persist siphon event for %s", ticker) logger.info( "Siphoned $%.2f from %s close → reserve now $%.2f", transfer, ticker, new_balance, ) # Update portfolio state self.portfolio_state.positions = positions self.portfolio_state.open_position_count = len(positions) except Exception: logger.exception("Error syncing positions") async def _fetch_current_prices(self, tickers: list[str]) -> dict[str, float]: """Fetch latest prices from Polygon API for the given tickers. Task 28.2: Uses httpx for async HTTP calls. Returns a dict mapping ticker → latest price. Handles API errors gracefully. """ if not tickers: return {} prices: dict[str, float] = {} # Use the market data config for API key api_key = "" base_url = "https://api.polygon.io" try: from services.shared.config import load_config app_config = load_config() api_key = app_config.market_data.api_key base_url = app_config.market_data.base_url except Exception: pass if not api_key: logger.debug("No Polygon API key configured — skipping price fetch") return prices try: async with httpx.AsyncClient(timeout=10.0) as client: # Use the grouped daily endpoint or snapshot for multiple tickers tickers_str = ",".join(tickers) url = f"{base_url}/v2/snapshot/locale/us/markets/stocks/tickers" params = {"tickers": tickers_str, "apiKey": api_key} resp = await client.get(url, params=params) if resp.status_code == 200: data = resp.json() for item in data.get("tickers", []): t = item.get("ticker", "") last_trade = item.get("lastTrade", {}) price = last_trade.get("p", 0.0) if t and price > 0: prices[t] = price else: logger.warning("Polygon API returned status %d", resp.status_code) except Exception: logger.warning("Failed to fetch prices from Polygon API") return prices async def _load_open_positions(self) -> list[OpenPosition]: """Load open positions from the database. Task 28.3: Queries the position_stop_levels table for active positions. Returns typed OpenPosition list. """ if self.pool is None: return [] positions: list[OpenPosition] = [] try: rows = await self.pool.fetch( "SELECT ticker, entry_price, stop_loss_price, take_profit_price, " "signal_confidence, is_micro_trade " "FROM position_stop_levels WHERE active = TRUE" ) for row in rows: positions.append( OpenPosition( ticker=row["ticker"], quantity=1, # Default; real quantity from orders table entry_price=float(row["entry_price"]), current_price=float(row["entry_price"]), unrealized_pnl=0.0, market_value=float(row["entry_price"]), sector="", stop_loss_price=float(row["stop_loss_price"]), take_profit_price=float(row["take_profit_price"]), signal_confidence=float(row["signal_confidence"]), is_micro_trade=bool(row["is_micro_trade"]), ) ) except Exception: logger.debug("Could not load open positions — table may not exist") return positions async def _load_stop_levels(self) -> dict[str, StopLevels]: """Load active stop-loss/take-profit levels from the database. Task 28.3: Queries position_stop_levels WHERE active = TRUE. Returns dict keyed by ticker. """ if self.pool is None: return {} levels: dict[str, StopLevels] = {} try: rows = await self.pool.fetch( "SELECT ticker, stop_loss_price, take_profit_price, " "trailing_stop_active, atr_value, atr_multiplier, " "reward_risk_ratio, updated_at " "FROM position_stop_levels WHERE active = TRUE" ) for row in rows: levels[row["ticker"]] = StopLevels( stop_loss_price=float(row["stop_loss_price"]), take_profit_price=float(row["take_profit_price"]), trailing_stop_active=bool(row["trailing_stop_active"]), atr_value=float(row["atr_value"]), atr_multiplier=float(row["atr_multiplier"]), reward_risk_ratio=float(row["reward_risk_ratio"]), ) return levels except Exception: logger.debug("Could not load stop levels — table may not exist") return {} async def _check_circuit_breaker_daily_loss(self) -> None: """Check if daily unrealized loss exceeds the circuit breaker threshold. If the portfolio has lost more than circuit_breaker_daily_loss_pct (default 5%) from its start-of-day value, activate the circuit breaker to halt all new trades. """ if self.pool is None or self.portfolio_state is None: return # Already active — nothing to do if self._cb_state.active: return try: # Get total unrealized P&L from positions row = await self.pool.fetchrow( "SELECT COALESCE(SUM(unrealized_pnl), 0) AS total_pnl FROM positions WHERE quantity > 0" ) total_pnl = float(row["total_pnl"]) if row else 0.0 total_value = self.portfolio_state.total_value if total_value <= 0: return daily_loss_pct = abs(min(0, total_pnl)) / total_value # Check against threshold (default 5%) threshold = getattr(self.config, 'circuit_breaker_daily_loss_pct', 0.05) or 0.05 if isinstance(threshold, str): threshold = float(threshold) if daily_loss_pct >= threshold: # Activate circuit breaker now = datetime.now(tz=timezone.utc) cooldown_hours = getattr(self.config, 'circuit_breaker_volatility_pause_hours', 2) or 2 cooldown_expires = now + timedelta(hours=int(cooldown_hours)) self._cb_state = CircuitBreakerState( active=True, trigger_type="daily_loss_limit", triggered_at=now, cooldown_expires=cooldown_expires, ) # Persist to DB try: await self.pool.execute( "INSERT INTO circuit_breaker_events " "(trigger_type, triggered_at, cooldown_expires, trigger_data) " "VALUES ($1, $2, $3, $4)", "daily_loss_limit", now, cooldown_expires, json.dumps({ "daily_loss_pct": round(daily_loss_pct, 4), "threshold": threshold, "total_pnl": round(total_pnl, 2), "total_value": round(total_value, 2), }), ) except Exception: logger.debug("Could not persist circuit breaker event") logger.warning( "CIRCUIT BREAKER ACTIVATED: daily loss %.1f%% exceeds %.1f%% threshold " "(P&L=$%.2f, value=$%.2f). Trading halted until %s", daily_loss_pct * 100, threshold * 100, total_pnl, total_value, cooldown_expires.isoformat(), ) # Create alert self.create_alert( "circuit_breaker_activated", f"Daily loss {daily_loss_pct:.1%} exceeded {threshold:.1%} threshold. " f"Trading halted until {cooldown_expires.strftime('%H:%M ET')}.", ) except Exception: logger.debug("Could not check circuit breaker daily loss") async def _check_profit_taking(self) -> None: """Check positions for profit-taking opportunities. Sells positions that have gained more than the take-profit threshold. Only runs during trading window hours. """ if self.pool is None: return # Only sell during market hours now = datetime.now(tz=timezone.utc) if not is_within_trading_window(now): return try: rows = await self.pool.fetch( "SELECT ticker, quantity, avg_entry_price, current_price, unrealized_pnl " "FROM positions WHERE quantity > 0" ) except Exception: return for row in rows: ticker = row["ticker"] qty = int(row["quantity"]) entry = float(row["avg_entry_price"]) current = float(row["current_price"] or 0) if entry <= 0 or current <= 0 or qty <= 0: continue gain_pct = (current - entry) / entry # Absolute profit cap: sell if gain > 10% # Or tier-based: reward_risk_ratio * stop_loss_atr_multiplier * ~2% base ATR tier_target = self._active_risk_tier.reward_risk_ratio * self._active_risk_tier.stop_loss_atr_multiplier * 0.02 should_sell = gain_pct >= 0.10 or gain_pct >= tier_target if should_sell: logger.info( "Profit-taking: %s gained %.1f%% (target=%.1f%%) — selling %d shares", ticker, gain_pct * 100, max(10.0, tier_target * 100), qty, ) await self._submit_sell_order(ticker, qty, f"profit_taking_{gain_pct:.1%}") # Update portfolio state if self.portfolio_state: self.portfolio_state.open_position_count = max( 0, self.portfolio_state.open_position_count - 1 ) self.portfolio_state.active_pool += current * qty async def _submit_sell_order( self, ticker: str, quantity: int, reason: str, estimated_value: float = 0.0, ) -> None: """Push a sell order to the broker queue via Redis.""" order_job = { "ticker": ticker, "action": "sell", "side": "sell", "quantity": quantity, "order_type": "market", "estimated_value": estimated_value, "source": "trading_engine", "reason": reason, } if self.redis is not None: try: broker_queue = queue_key(QUEUE_BROKER) await self.redis.rpush(broker_queue, json.dumps(order_job)) logger.info("Submitted sell order for %s (%d shares): %s", ticker, quantity, reason) except Exception: logger.exception("Failed to push sell order for %s", ticker) else: logger.info("Sell order (no redis): %s %d shares — %s", ticker, quantity, reason) async def _persist_daily_snapshot(self, now: datetime) -> None: """Persist end-of-day portfolio snapshot to portfolio_snapshots table. Task 29.2: Called after 4:00 PM ET when market closes. """ if self.pool is None or self.portfolio_state is None: return from services.trading.trading_window import ET et_now = now.astimezone(ET) snapshot_date = et_now.date() try: await self.pool.execute( "INSERT INTO portfolio_snapshots " "(snapshot_date, portfolio_value, active_pool, reserve_pool, " "daily_return, cumulative_return, unrealized_pnl, realized_pnl, " "win_count, loss_count, win_rate, sharpe_ratio, max_drawdown, " "current_drawdown_pct, portfolio_heat, risk_tier, " "positions, metrics, created_at) " "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, " "$11, $12, $13, $14, $15, $16, $17, $18, $19) " "ON CONFLICT (snapshot_date) DO UPDATE SET " "portfolio_value = EXCLUDED.portfolio_value, " "active_pool = EXCLUDED.active_pool, " "reserve_pool = EXCLUDED.reserve_pool, " "updated_at = NOW()", snapshot_date, self.portfolio_state.total_value, self.portfolio_state.active_pool, self.portfolio_state.reserve_pool, 0.0, # daily_return 0.0, # cumulative_return sum(p.unrealized_pnl for p in self.portfolio_state.positions), 0.0, # realized_pnl 0, # win_count 0, # loss_count 0.0, # win_rate 0.0, # sharpe_ratio 0.0, # max_drawdown 0.0, # current_drawdown_pct self.portfolio_state.portfolio_heat, self.config.risk_tier, json.dumps([]), # positions json.dumps({}), # metrics now, ) logger.info("Persisted daily snapshot for %s", snapshot_date) except Exception: logger.debug("Could not persist daily snapshot — table may not exist") # ------------------------------------------------------------------ # Decision builders # ------------------------------------------------------------------ def _skip_decision( self, *, rec_id: str | None, ticker: str, skip_reason: str, risk_tier: RiskTierConfig, portfolio_state: PortfolioState, circuit_breaker_state: CircuitBreakerState, reasoning: list[str], now: datetime, ) -> TradingDecision: return TradingDecision( id=str(uuid.uuid4()), recommendation_id=rec_id, decision="skip", skip_reason=skip_reason, ticker=ticker, computed_position_size=None, computed_share_quantity=None, risk_tier_at_decision=risk_tier.name, portfolio_heat_at_decision=portfolio_state.portfolio_heat, active_pool_at_decision=portfolio_state.active_pool, reserve_pool_at_decision=portfolio_state.reserve_pool, circuit_breaker_status="active" if circuit_breaker_state.active else "inactive", decision_trace={"reasoning": reasoning}, created_at=now, ) def _act_decision( self, *, rec_id: str | None, ticker: str, size_result: PositionSizeResult, risk_tier: RiskTierConfig, portfolio_state: PortfolioState, circuit_breaker_state: CircuitBreakerState, reasoning: list[str], now: datetime, ) -> TradingDecision: return TradingDecision( id=str(uuid.uuid4()), recommendation_id=rec_id, decision="act", skip_reason=None, ticker=ticker, computed_position_size=size_result.dollar_amount, computed_share_quantity=size_result.share_quantity, risk_tier_at_decision=risk_tier.name, portfolio_heat_at_decision=portfolio_state.portfolio_heat, active_pool_at_decision=portfolio_state.active_pool, reserve_pool_at_decision=portfolio_state.reserve_pool, circuit_breaker_status="active" if circuit_breaker_state.active else "inactive", decision_trace={"reasoning": reasoning}, created_at=now, )