"""Core autonomous trading engine — decision loop and pre-trade evaluation. Feature: autonomous-trading-engine Coordinates all trading sub-components (PositionSizer, StopLossManager, CircuitBreaker, ReservePoolController, RiskTierController, CorrelationMatrix) to evaluate recommendations and produce TradingDecision records. The ``evaluate_recommendation`` method is deliberately synchronous-compatible so that it can be tested without real DB/Redis connections. The async ``start`` / ``stop`` methods manage the live decision loop, stop-loss monitor, and performance metrics loop. """ from __future__ import annotations import asyncio import json import logging import uuid from datetime import datetime, timedelta, timezone import httpx from services.shared.config import TradingConfig from services.shared.redis_keys import ( QUEUE_BROKER, queue_key, trading_dedupe_key, ) from services.trading.circuit_breaker import CircuitBreaker from services.trading.correlation import CorrelationMatrix from services.trading.micro_trading import MicroTradeConfig, MicroTradingModule from services.trading.models import ( RISK_TIER_DEFAULTS, CircuitBreakerState, OpenPosition, PerformanceMetrics, PortfolioState, PositionSizeResult, RiskTierConfig, StopLevels, StopTrigger, TradingDecision, ) from services.trading.notifications import NotificationRecord, NotificationService from services.trading.performance_tracker import PerformanceComputer from services.trading.position_sizer import PositionSizer from services.trading.rebalancer import PortfolioRebalancer from services.trading.reserve_pool import ReservePoolController from services.trading.risk_tier_controller import RiskTierController from services.trading.stop_loss_manager import StopLossManager from services.trading.trading_window import is_market_open, is_within_trading_window logger = logging.getLogger("trading_engine") class TradingEngine: """Main autonomous trading engine. Manages the decision loop, coordinates all sub-components, and maintains runtime state. Parameters ---------- pool: asyncpg connection pool (used by async lifecycle methods). redis: Redis client (used for deduplication and pub/sub). config: Trading engine configuration. """ def __init__( self, pool: object, redis: object, config: TradingConfig, ) -> None: self.pool = pool self.redis = redis self.config = config # Sub-components self.position_sizer = PositionSizer() self.stop_loss_manager = StopLossManager() self.circuit_breaker = CircuitBreaker() self.reserve_pool_controller = ReservePoolController( siphon_pct=config.reserve_siphon_pct, high_water_pct=config.reserve_high_water_pct, ) self.risk_tier_controller = RiskTierController() self.correlation_matrix = CorrelationMatrix() self.notification_service = NotificationService() self.micro_trading_module = MicroTradingModule() self.rebalancer = PortfolioRebalancer() self.performance_computer = PerformanceComputer() # Runtime state self.running: bool = False self.portfolio_state: PortfolioState | None = None self.processed_recommendation_ids: set[str] = set() # Async task management (Task 27.6) self._tasks: list[asyncio.Task] = [] # type: ignore[type-arg] # Active risk tier loaded from config defaults self._active_risk_tier: RiskTierConfig = RISK_TIER_DEFAULTS.get( config.risk_tier, RISK_TIER_DEFAULTS["moderate"] ) # Circuit breaker runtime state self._cb_state: CircuitBreakerState = CircuitBreakerState() # Earnings calendar cache self._earnings_calendar: dict = {} # Last poll timestamp — initialised to 24 h ago so first poll # picks up recent recommendations self._last_poll_timestamp: datetime = datetime.now(tz=timezone.utc) - timedelta(hours=24) # Per-ticker last-price-fetch timestamps for safety sell (Task 28.4) self._last_price_timestamps: dict[str, datetime] = {} # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ async def start(self) -> None: """Load portfolio state and spawn the async worker loops. When ``self.pool`` is ``None`` (unit-test / lightweight mode) the engine skips database loading and starts with an empty portfolio. """ # --- Load initial state from PostgreSQL (graceful degradation) --- if self.pool is not None: try: await self._load_initial_state() except Exception: logger.exception("Failed to load initial state from DB — starting with defaults") # Ensure we always have a portfolio state if self.portfolio_state is None: self.portfolio_state = PortfolioState() self.running = True # Spawn async worker loops self._tasks = [ asyncio.create_task(self._decision_loop(), name="decision_loop"), asyncio.create_task(self._stop_loss_monitor(), name="stop_loss_monitor"), asyncio.create_task(self._performance_loop(), name="performance_loop"), asyncio.create_task(self._risk_tier_scheduler(), name="risk_tier_scheduler"), asyncio.create_task(self._rebalance_scheduler(), name="rebalance_scheduler"), ] logger.info("Trading engine started with %d worker tasks", len(self._tasks)) async def stop(self) -> None: """Graceful shutdown — cancel all worker tasks and persist state.""" self.running = False # Cancel all tasks for task in self._tasks: task.cancel() if self._tasks: await asyncio.gather(*self._tasks, return_exceptions=True) self._tasks.clear() logger.info("Trading engine stopped") # ------------------------------------------------------------------ # Core evaluation logic (synchronous-compatible for testing) # ------------------------------------------------------------------ def evaluate_recommendation( self, rec: dict, portfolio_state: PortfolioState, risk_tier: RiskTierConfig, circuit_breaker_state: CircuitBreakerState, correlation_matrix: CorrelationMatrix, earnings_calendar: dict, now: datetime | None = None, ) -> TradingDecision: """Run all pre-trade checks and produce a TradingDecision. The checks are applied in order; the first failure short-circuits with a ``skip`` decision. If all checks pass the PositionSizer is invoked and its result determines the final decision. Parameters ---------- rec: Recommendation dict with at least ``recommendation_id``, ``ticker``, ``confidence``, ``sector``, ``current_price``, and ``action``. portfolio_state: Current portfolio snapshot. risk_tier: Active risk tier configuration. circuit_breaker_state: Current circuit breaker state. correlation_matrix: Correlation matrix instance for diversification checks. earnings_calendar: Mapping of ticker → next earnings datetime. now: Optional override for the current timestamp (for testing). """ now = now or datetime.now(tz=timezone.utc) rec_id = rec.get("recommendation_id") ticker = rec.get("ticker", "") confidence = rec.get("confidence", 0.0) sector = rec.get("sector", "") current_price = rec.get("current_price", 0.0) reasoning: list[str] = [] # --- a. Circuit breaker active? -------------------------------- if self.circuit_breaker.is_active(circuit_breaker_state, now=now): reasoning.append("Circuit breaker is active") return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason="circuit_breaker_active", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # --- b. Trading window? ---------------------------------------- if not is_within_trading_window(now): reasoning.append("Outside trading window") return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason="outside_trading_window", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # --- c. Confidence gate (early check before sizer) ------------- if confidence < risk_tier.min_confidence: reasoning.append( f"Confidence {confidence:.4f} below tier minimum " f"{risk_tier.min_confidence}" ) return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason="insufficient_confidence", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # --- d. Deduplication check ------------------------------------ if rec_id and rec_id in self.processed_recommendation_ids: reasoning.append(f"Recommendation {rec_id} already processed") return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason="duplicate_recommendation", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # --- e. Multiple declining positions check --------------------- if self.check_declining_positions(portfolio_state.positions): reasoning.append( "Multiple declining positions — halting new entries" ) return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason="multiple_declining_positions", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # --- f. Max open positions check ------------------------------- max_positions = self.config.max_open_positions if hasattr(self.config, "max_open_positions") else 10 if self.check_max_positions( portfolio_state.open_position_count, max_positions ): reasoning.append( f"At max open positions ({portfolio_state.open_position_count}/" f"{max_positions})" ) return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason="max_positions_reached", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # --- g. Position sizing ---------------------------------------- reasoning.append("All pre-trade checks passed — computing position size") # Build the raw correlation dict expected by PositionSizer corr_dict: dict[tuple[str, str], float] = correlation_matrix._data size_result: PositionSizeResult = self.position_sizer.compute( confidence=confidence, ticker=ticker, sector=sector, current_price=current_price, active_pool=portfolio_state.active_pool, risk_tier=risk_tier, portfolio_state=portfolio_state, correlation_matrix=corr_dict, earnings_calendar=earnings_calendar, absolute_position_cap=self.config.absolute_position_cap, active_pool_minimum=self.config.active_pool_minimum, ) reasoning.extend(size_result.adjustments) if size_result.rejected: reasoning.append(f"Position sizer rejected: {size_result.rejection_reason}") return self._skip_decision( rec_id=rec_id, ticker=ticker, skip_reason=f"position_sizer_rejected: {size_result.rejection_reason}", risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # Mark recommendation as processed if rec_id: self.processed_recommendation_ids.add(rec_id) return self._act_decision( rec_id=rec_id, ticker=ticker, size_result=size_result, risk_tier=risk_tier, portfolio_state=portfolio_state, circuit_breaker_state=circuit_breaker_state, reasoning=reasoning, now=now, ) # ------------------------------------------------------------------ # Helper checks # ------------------------------------------------------------------ def check_declining_positions( self, positions: list[OpenPosition], threshold_pct: float = 0.50, decline_pct: float = 0.02, ) -> bool: """Return True if > threshold_pct of positions have > decline_pct negative unrealized P&L. A position is considered "declining" when its unrealized P&L as a fraction of its entry value is worse than ``-decline_pct``. Parameters ---------- positions: List of currently open positions. threshold_pct: Fraction of positions that must be declining to trigger (default 0.50 = 50%). decline_pct: Minimum loss fraction to count as declining (default 0.02 = 2%). """ if not positions: return False declining_count = 0 for pos in positions: entry_value = pos.entry_price * pos.quantity if entry_value <= 0: continue loss_pct = -pos.unrealized_pnl / entry_value if loss_pct > decline_pct: declining_count += 1 return declining_count > threshold_pct * len(positions) def check_max_positions( self, open_count: int, max_positions: int = 10, ) -> bool: """Return True if the portfolio is at the maximum number of open positions.""" return open_count >= max_positions # ------------------------------------------------------------------ # Integration wiring — thin wrappers for the decision loop # ------------------------------------------------------------------ def check_stop_loss_crossings( self, positions: list[OpenPosition], prices: dict[str, float], stop_levels: dict[str, StopLevels], ) -> list[StopTrigger]: """Delegate to StopLossManager.check_price_crossings(). Called by the async decision loop at the configured interval (5 min default, 60s during high-severity events). """ return self.stop_loss_manager.check_price_crossings( positions, prices, stop_levels ) def handle_position_close( self, realized_profit: float, reserve_balance: float, ) -> tuple[float, float]: """Delegate to ReservePoolController.siphon_profit(). Called when a position close event is detected from broker service fill events. """ return self.reserve_pool_controller.siphon_profit( realized_profit, reserve_balance ) def evaluate_risk_tier( self, current_tier: str, metrics: PerformanceMetrics, reserve_pct: float, ) -> str | None: """Delegate to RiskTierController.evaluate(). Scheduled to run at daily market close. """ return self.risk_tier_controller.evaluate( current_tier, metrics, reserve_pct ) def evaluate_rebalancing( self, positions: list[OpenPosition], risk_tier: RiskTierConfig, active_pool: float, ) -> list: """Delegate to PortfolioRebalancer.evaluate(). Scheduled to run weekly at Monday market open. """ return self.rebalancer.evaluate(positions, risk_tier, active_pool) def create_alert( self, event_type: str, details: str, ) -> NotificationRecord: """Create a notification record for a critical event. Delegates formatting and record creation to NotificationService. The caller is responsible for actual delivery. """ message = self.notification_service.format_alert(event_type, details) return self.notification_service.create_notification( channel="email", event_type=event_type, message=message, ) def check_micro_trade_constraints( self, daily_count: int, is_within_window: bool, circuit_breaker_active: bool, portfolio_heat_pct: float, max_heat: float, ) -> tuple[bool, str]: """Delegate to MicroTradingModule.check_constraints(). Called by the decision loop when micro-trading is enabled. """ micro_config = MicroTradeConfig( enabled=self.config.micro_trading_enabled, allocation_cap_pct=self.config.micro_trading_allocation_cap_pct, max_daily=self.config.micro_trading_max_daily, max_hold_minutes=self.config.micro_trading_max_hold_minutes, ) return self.micro_trading_module.check_constraints( config=micro_config, daily_count=daily_count, is_within_window=is_within_window, circuit_breaker_active=circuit_breaker_active, portfolio_heat_pct=portfolio_heat_pct, max_heat=max_heat, ) # ------------------------------------------------------------------ # Async worker loops # ------------------------------------------------------------------ async def _load_initial_state(self) -> None: """Load portfolio state, risk tier, reserve pool, and CB status from DB.""" if self.pool is None: return # Load reserve pool balance reserve_balance = 0.0 try: row = await self.pool.fetchrow( "SELECT balance_after FROM reserve_pool_ledger ORDER BY created_at DESC LIMIT 1" ) if row: reserve_balance = float(row["balance_after"]) except Exception: logger.debug("Could not load reserve pool balance — using 0.0") # Load circuit breaker state (unresolved events) try: cb_row = await self.pool.fetchrow( "SELECT trigger_type, triggered_at, cooldown_expires " "FROM circuit_breaker_events WHERE resolved_at IS NULL " "ORDER BY created_at DESC LIMIT 1" ) if cb_row: self._cb_state = CircuitBreakerState( active=True, trigger_type=cb_row["trigger_type"], triggered_at=cb_row["triggered_at"], cooldown_expires=cb_row["cooldown_expires"], ) except Exception: logger.debug("Could not load circuit breaker state — using inactive") # Build portfolio state with defaults # Default initial capital for paper trading — overridden by broker sync initial_capital = 100000.0 self.portfolio_state = PortfolioState( reserve_pool=reserve_balance, active_pool=max(0.0, initial_capital - reserve_balance), total_value=initial_capital, ) async def _decision_loop(self) -> None: """Poll recommendations and evaluate them in a continuous loop. Task 27.3: Main decision loop that polls the recommendations table, checks Redis deduplication, evaluates each recommendation, and pushes "act" decisions to the broker queue. """ while self.running: try: await asyncio.sleep(self.config.polling_interval_seconds) if not self.running: break if self.pool is None: continue # Poll recommendations from PostgreSQL recs: list[dict] = [] try: rows = await self.pool.fetch( "SELECT * FROM recommendations " "WHERE action IN ('buy','sell') " "AND mode IN ('paper_eligible','live_eligible') " "AND generated_at > $1 " "ORDER BY confidence DESC " "LIMIT 50", self._last_poll_timestamp, ) if rows: # Advance timestamp to the oldest rec in this batch # so next poll picks up where we left off last_gen = max(r["generated_at"] for r in rows) self._last_poll_timestamp = last_gen recs = [dict(r) for r in rows] if recs: logger.info( "Polled %d recommendations (highest confidence=%.3f)", len(recs), recs[0].get("confidence", 0), ) # Fetch current prices for all tickers in this batch batch_tickers = list({r.get("ticker", "") for r in recs if r.get("ticker")}) price_map: dict[str, float] = {} if batch_tickers and self.pool is not None: try: price_rows = await self.pool.fetch( """SELECT DISTINCT ON (ticker) ticker, (data->>'c')::float AS price FROM market_snapshots WHERE ticker = ANY($1) AND snapshot_type = 'bar' ORDER BY ticker, captured_at DESC""", batch_tickers, ) price_map = {r["ticker"]: r["price"] for r in price_rows if r["price"]} except Exception: logger.debug("Could not fetch prices from market_snapshots") # Fall back to Polygon API for any missing prices missing = [t for t in batch_tickers if t not in price_map] if missing: fetched = await self._fetch_current_prices(missing) price_map.update(fetched) except Exception: logger.debug("Could not poll recommendations — table may not exist yet") continue for rec in recs: try: rec_id = str(rec.get("recommendation_id", rec.get("id", ""))) ticker = rec.get("ticker", "") # Inject current price from market data if not rec.get("current_price") and ticker in price_map: rec["current_price"] = price_map[ticker] # Skip if no price available if not rec.get("current_price") or rec["current_price"] <= 0: continue # Redis deduplication check if self.redis is not None: dedupe_key = trading_dedupe_key(rec_id) already = await self.redis.get(dedupe_key) if already: continue # Set dedupe key with 24h TTL before evaluation await self.redis.set(dedupe_key, "1", ex=86400) # Ensure portfolio state exists if self.portfolio_state is None: self.portfolio_state = PortfolioState() # Evaluate recommendation decision = self.evaluate_recommendation( rec=rec, portfolio_state=self.portfolio_state, risk_tier=self._active_risk_tier, circuit_breaker_state=self._cb_state, correlation_matrix=self.correlation_matrix, earnings_calendar=self._earnings_calendar, ) logger.info( "Decision for %s: %s (reason=%s, size=$%.2f, shares=%d)", decision.ticker, decision.decision, decision.skip_reason or "n/a", decision.computed_position_size or 0, decision.computed_share_quantity or 0, ) # For "act" decisions: push order to broker queue if decision.decision == "act": order_job = { "trading_decision_id": decision.id, "ticker": decision.ticker, "action": rec.get("action", "buy"), "quantity": decision.computed_share_quantity, "order_type": "market", "source": "trading_engine", } if self.redis is not None: broker_queue = queue_key(QUEUE_BROKER) await self.redis.rpush(broker_queue, json.dumps(order_job)) logger.info( "Pushed order for %s (%d shares) to broker queue", decision.ticker, decision.computed_share_quantity or 0, ) # Persist decision await self._persist_decision(decision) except Exception: logger.exception("Error evaluating recommendation %s", rec.get("recommendation_id", "?")) except asyncio.CancelledError: break except Exception: logger.exception("Unexpected error in decision loop") if self.running: await asyncio.sleep(5) async def _stop_loss_monitor(self) -> None: """Monitor open positions for stop-loss and take-profit crossings. Task 28.1: Periodically checks current prices against stop levels and submits sell orders for triggered positions. """ while self.running: try: await asyncio.sleep(self.config.stop_loss_check_interval_seconds) if not self.running: break now = datetime.now(tz=timezone.utc) # Skip if not market hours if not is_market_open(now): continue if self.pool is None: continue # Load positions and stop levels from DB positions = await self._load_open_positions() stop_levels = await self._load_stop_levels() if not positions: continue # Fetch current prices tickers = [p.ticker for p in positions] prices = await self._fetch_current_prices(tickers) # Update last-price timestamps for tickers that returned data for ticker in tickers: if ticker in prices: self._last_price_timestamps[ticker] = now # Safety sell for missing price data (Task 28.4) for pos in positions: if pos.ticker not in prices: last_ts = self._last_price_timestamps.get(pos.ticker) if last_ts and (now - last_ts) > timedelta(minutes=15): logger.warning( "No price data for %s for >15 min — submitting safety sell", pos.ticker, ) await self._submit_sell_order( pos.ticker, pos.quantity, "safety_sell_missing_price" ) # Check crossings triggers = self.check_stop_loss_crossings(positions, prices, stop_levels) for trigger in triggers: # Find the position to get quantity pos_match = next((p for p in positions if p.ticker == trigger.ticker), None) if pos_match is None: continue await self._submit_sell_order( trigger.ticker, pos_match.quantity, f"{trigger.trigger_type}_triggered", ) logger.info( "Stop-loss monitor: %s triggered for %s at %.2f (trigger: %.2f)", trigger.trigger_type, trigger.ticker, trigger.current_price, trigger.trigger_price, ) except asyncio.CancelledError: break except Exception: logger.exception("Unexpected error in stop-loss monitor") if self.running: await asyncio.sleep(5) async def _performance_loop(self) -> None: """Compute and update performance metrics periodically. Task 29.1: Runs every 5 minutes during market hours, computing portfolio metrics and updating self.portfolio_state. Task 29.2: Persists a daily snapshot at end of trading day. """ last_snapshot_date: str | None = None while self.running: try: await asyncio.sleep(300) # 5 minutes if not self.running: break now = datetime.now(tz=timezone.utc) # Skip if not market hours if not is_market_open(now): # Check if we should persist end-of-day snapshot (Task 29.2) from services.trading.trading_window import ET et_now = now.astimezone(ET) today_str = et_now.strftime("%Y-%m-%d") # After 4:00 PM ET and haven't snapshotted today if et_now.hour >= 16 and last_snapshot_date != today_str: await self._persist_daily_snapshot(now) last_snapshot_date = today_str continue # Compute metrics from current state if self.portfolio_state is None: continue # Update portfolio heat and metrics from current positions try: metrics = self.performance_computer.compute_metrics( closed_trades=[], portfolio_value=self.portfolio_state.total_value, active_pool=self.portfolio_state.active_pool, reserve_pool=self.portfolio_state.reserve_pool, daily_pnl=0.0, unrealized_pnl=sum( p.unrealized_pnl for p in self.portfolio_state.positions ), portfolio_heat=self.portfolio_state.portfolio_heat, daily_returns=[], ) logger.debug( "Performance update: value=%.2f heat=%.4f", metrics.total_portfolio_value, metrics.portfolio_heat, ) except Exception: logger.debug("Could not compute performance metrics") except asyncio.CancelledError: break except Exception: logger.exception("Unexpected error in performance loop") if self.running: await asyncio.sleep(5) async def _risk_tier_scheduler(self) -> None: """Evaluate risk tier at daily market close (16:00 ET). Task 30.1: Computes seconds until next 16:00 ET, sleeps until then, loads latest PerformanceMetrics, computes reserve_pct, calls evaluate_risk_tier(), and persists tier changes. """ from services.trading.trading_window import ET while self.running: try: # Compute seconds until next 16:00 ET now_utc = datetime.now(tz=timezone.utc) et_now = now_utc.astimezone(ET) target_today = et_now.replace(hour=16, minute=0, second=0, microsecond=0) if et_now >= target_today: # Already past 16:00 ET today — target tomorrow target = target_today + timedelta(days=1) else: target = target_today # Skip weekends while target.weekday() > 4: # Saturday=5, Sunday=6 target += timedelta(days=1) sleep_seconds = (target - et_now).total_seconds() if sleep_seconds > 0: await asyncio.sleep(sleep_seconds) if not self.running: break if self.portfolio_state is None: continue # Load latest PerformanceMetrics from portfolio_snapshots or compute fresh metrics: PerformanceMetrics | None = None if self.pool is not None: try: row = await self.pool.fetchrow( "SELECT metrics FROM portfolio_snapshots " "ORDER BY snapshot_date DESC LIMIT 1" ) if row and row["metrics"]: m = json.loads(row["metrics"]) if isinstance(row["metrics"], str) else row["metrics"] if m: metrics = PerformanceMetrics( total_portfolio_value=m.get("total_portfolio_value", self.portfolio_state.total_value), active_pool=m.get("active_pool", self.portfolio_state.active_pool), reserve_pool=m.get("reserve_pool", self.portfolio_state.reserve_pool), unrealized_pnl=m.get("unrealized_pnl", 0.0), realized_pnl=m.get("realized_pnl", 0.0), daily_pnl=m.get("daily_pnl", 0.0), win_count=m.get("win_count", 0), loss_count=m.get("loss_count", 0), win_rate=m.get("win_rate", 0.0), avg_win=m.get("avg_win", 0.0), avg_loss=m.get("avg_loss", 0.0), profit_factor=m.get("profit_factor", 0.0), sharpe_ratio=m.get("sharpe_ratio", 0.0), max_drawdown=m.get("max_drawdown", 0.0), current_drawdown_pct=m.get("current_drawdown_pct", 0.0), portfolio_heat=m.get("portfolio_heat", 0.0), ) except Exception: logger.debug("Could not load metrics from portfolio_snapshots") # Fall back to computing fresh metrics if metrics is None: metrics = self.performance_computer.compute_metrics( closed_trades=[], portfolio_value=self.portfolio_state.total_value, active_pool=self.portfolio_state.active_pool, reserve_pool=self.portfolio_state.reserve_pool, daily_pnl=0.0, unrealized_pnl=sum( p.unrealized_pnl for p in self.portfolio_state.positions ), portfolio_heat=self.portfolio_state.portfolio_heat, daily_returns=[], ) # Compute reserve_pct total_value = self.portfolio_state.total_value reserve_pct = ( self.portfolio_state.reserve_pool / total_value if total_value > 0 else 0.0 ) # Evaluate risk tier current_tier = self.config.risk_tier new_tier = self.evaluate_risk_tier(current_tier, metrics, reserve_pct) if new_tier is not None and new_tier != current_tier: # Persist to risk_tier_history if self.pool is not None: try: await self.pool.execute( "INSERT INTO risk_tier_history " "(previous_tier, new_tier, trigger_source, trigger_metrics, created_at) " "VALUES ($1, $2, $3, $4, $5)", current_tier, new_tier, "auto_adjustment", json.dumps({ "win_rate": metrics.win_rate, "current_drawdown_pct": metrics.current_drawdown_pct, "reserve_pct": reserve_pct, "sharpe_ratio": metrics.sharpe_ratio, }), datetime.now(tz=timezone.utc), ) except Exception: logger.debug("Could not persist risk tier change") # Update config and active tier self.config.risk_tier = new_tier self._active_risk_tier = RISK_TIER_DEFAULTS.get( new_tier, RISK_TIER_DEFAULTS["moderate"] ) # Create alert notification self.create_alert( "risk_tier_changed", f"Risk tier changed from {current_tier} to {new_tier} " f"(win_rate={metrics.win_rate:.2%}, " f"drawdown={metrics.current_drawdown_pct:.2%}, " f"reserve={reserve_pct:.2%})", ) logger.info( "Risk tier changed: %s → %s (win_rate=%.2f, drawdown=%.2f, reserve=%.2f)", current_tier, new_tier, metrics.win_rate, metrics.current_drawdown_pct, reserve_pct, ) except asyncio.CancelledError: break except Exception: logger.exception("Unexpected error in risk tier scheduler") if self.running: await asyncio.sleep(60) async def _rebalance_scheduler(self) -> None: """Evaluate portfolio rebalancing weekly at Monday 09:45 ET. Task 30.2: Computes seconds until next Monday 09:45 ET, sleeps until then, loads positions and risk tier, calls evaluate_rebalancing(), and pushes rebalance orders to the broker queue. """ from services.trading.trading_window import ET, WINDOW_OPEN while self.running: try: # Compute seconds until next Monday 09:45 ET now_utc = datetime.now(tz=timezone.utc) et_now = now_utc.astimezone(ET) # Find next Monday days_until_monday = (7 - et_now.weekday()) % 7 if days_until_monday == 0: # It's Monday — check if we're past 09:45 target_today = et_now.replace( hour=WINDOW_OPEN.hour, minute=WINDOW_OPEN.minute, second=0, microsecond=0, ) if et_now >= target_today: # Already past 09:45 on Monday — target next Monday days_until_monday = 7 else: days_until_monday = 0 target = et_now.replace( hour=WINDOW_OPEN.hour, minute=WINDOW_OPEN.minute, second=0, microsecond=0, ) + timedelta(days=days_until_monday) sleep_seconds = (target - et_now).total_seconds() if sleep_seconds > 0: await asyncio.sleep(sleep_seconds) if not self.running: break # Respect circuit breaker status if self.circuit_breaker.is_active(self._cb_state, now=datetime.now(tz=timezone.utc)): logger.info("Rebalance skipped — circuit breaker is active") continue if self.portfolio_state is None: continue # Load current positions positions = self.portfolio_state.positions if self.pool is not None: try: positions = await self._load_open_positions() except Exception: logger.debug("Could not load positions for rebalancing") if not positions: continue # Evaluate rebalancing max_positions = ( self.config.max_open_positions if hasattr(self.config, "max_open_positions") else 10 ) rebalance_orders = self.rebalancer.evaluate( positions, self._active_risk_tier, self.portfolio_state.active_pool, max_positions, ) # Push rebalance orders to broker queue for order in rebalance_orders: order_job = { "ticker": order.ticker, "action": order.action, "quantity": order.quantity, "order_type": "market", "source": "trading_engine", "reason": order.reason, "tag": order.tag, } if self.redis is not None: try: broker_queue = queue_key(QUEUE_BROKER) await self.redis.rpush(broker_queue, json.dumps(order_job)) logger.info( "Rebalance: pushed %s order for %s (%d shares)", order.action, order.ticker, order.quantity, ) except Exception: logger.exception("Failed to push rebalance order for %s", order.ticker) else: logger.info( "Rebalance (no redis): %s %s %d shares — %s", order.action, order.ticker, order.quantity, order.reason, ) if rebalance_orders: logger.info("Rebalance cycle completed: %d orders generated", len(rebalance_orders)) else: logger.debug("Rebalance cycle completed: no orders needed") except asyncio.CancelledError: break except Exception: logger.exception("Unexpected error in rebalance scheduler") if self.running: await asyncio.sleep(60) # ------------------------------------------------------------------ # Async helpers # ------------------------------------------------------------------ async def _persist_decision(self, decision: TradingDecision) -> None: """INSERT a trading decision into the trading_decisions table. Task 27.5: Handles pool=None gracefully (skip persistence, log only). """ logger.info( "Decision: %s %s ticker=%s reason=%s", decision.decision, decision.id[:8], decision.ticker, decision.skip_reason or "—", ) if self.pool is None: return try: await self.pool.execute( "INSERT INTO trading_decisions " "(id, recommendation_id, decision, skip_reason, ticker, " "computed_position_size, computed_share_quantity, " "risk_tier_at_decision, portfolio_heat_at_decision, " "active_pool_at_decision, reserve_pool_at_decision, " "circuit_breaker_status, correlation_check_result, " "sector_exposure_check_result, earnings_proximity_flag, " "is_micro_trade, decision_trace, created_at) " "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, " "$11, $12, $13, $14, $15, $16, $17, $18)", decision.id, decision.recommendation_id, decision.decision, decision.skip_reason, decision.ticker, decision.computed_position_size, decision.computed_share_quantity, decision.risk_tier_at_decision, decision.portfolio_heat_at_decision, decision.active_pool_at_decision, decision.reserve_pool_at_decision, decision.circuit_breaker_status, json.dumps(decision.correlation_check_result), json.dumps(decision.sector_exposure_check_result), decision.earnings_proximity_flag, decision.is_micro_trade, json.dumps(decision.decision_trace), decision.created_at, ) except Exception: logger.debug("Could not persist decision %s — table may not exist", decision.id[:8]) async def _sync_positions_and_siphon(self) -> None: """Sync positions from DB and siphon profit on closed positions. Task 27.4: Fetches current positions, detects closes, and calls siphon_profit() for profitable closes. """ if self.pool is None or self.portfolio_state is None: return try: positions = await self._load_open_positions() old_tickers = {p.ticker for p in self.portfolio_state.positions} new_tickers = {p.ticker for p in positions} # Detect closed positions closed_tickers = old_tickers - new_tickers for ticker in closed_tickers: old_pos = next( (p for p in self.portfolio_state.positions if p.ticker == ticker), None, ) if old_pos and old_pos.unrealized_pnl > 0: transfer, new_balance = self.reserve_pool_controller.siphon_profit( old_pos.unrealized_pnl, self.portfolio_state.reserve_pool, ) if transfer > 0: self.portfolio_state.reserve_pool = new_balance # Persist to reserve_pool_ledger try: await self.pool.execute( "INSERT INTO reserve_pool_ledger " "(amount, balance_after, trigger_type, reference_id, notes, created_at) " "VALUES ($1, $2, 'profit_siphon', $3, $4, $5)", transfer, new_balance, ticker, f"Siphoned from {ticker} close", datetime.now(tz=timezone.utc), ) except Exception: logger.debug("Could not persist siphon event for %s", ticker) logger.info( "Siphoned $%.2f from %s close → reserve now $%.2f", transfer, ticker, new_balance, ) # Update portfolio state self.portfolio_state.positions = positions self.portfolio_state.open_position_count = len(positions) except Exception: logger.exception("Error syncing positions") async def _fetch_current_prices(self, tickers: list[str]) -> dict[str, float]: """Fetch latest prices from Polygon API for the given tickers. Task 28.2: Uses httpx for async HTTP calls. Returns a dict mapping ticker → latest price. Handles API errors gracefully. """ if not tickers: return {} prices: dict[str, float] = {} # Use the market data config for API key api_key = "" base_url = "https://api.polygon.io" try: from services.shared.config import load_config app_config = load_config() api_key = app_config.market_data.api_key base_url = app_config.market_data.base_url except Exception: pass if not api_key: logger.debug("No Polygon API key configured — skipping price fetch") return prices try: async with httpx.AsyncClient(timeout=10.0) as client: # Use the grouped daily endpoint or snapshot for multiple tickers tickers_str = ",".join(tickers) url = f"{base_url}/v2/snapshot/locale/us/markets/stocks/tickers" params = {"tickers": tickers_str, "apiKey": api_key} resp = await client.get(url, params=params) if resp.status_code == 200: data = resp.json() for item in data.get("tickers", []): t = item.get("ticker", "") last_trade = item.get("lastTrade", {}) price = last_trade.get("p", 0.0) if t and price > 0: prices[t] = price else: logger.warning("Polygon API returned status %d", resp.status_code) except Exception: logger.warning("Failed to fetch prices from Polygon API") return prices async def _load_open_positions(self) -> list[OpenPosition]: """Load open positions from the database. Task 28.3: Queries the position_stop_levels table for active positions. Returns typed OpenPosition list. """ if self.pool is None: return [] positions: list[OpenPosition] = [] try: rows = await self.pool.fetch( "SELECT ticker, entry_price, stop_loss_price, take_profit_price, " "signal_confidence, is_micro_trade " "FROM position_stop_levels WHERE active = TRUE" ) for row in rows: positions.append( OpenPosition( ticker=row["ticker"], quantity=1, # Default; real quantity from orders table entry_price=float(row["entry_price"]), current_price=float(row["entry_price"]), unrealized_pnl=0.0, market_value=float(row["entry_price"]), sector="", stop_loss_price=float(row["stop_loss_price"]), take_profit_price=float(row["take_profit_price"]), signal_confidence=float(row["signal_confidence"]), is_micro_trade=bool(row["is_micro_trade"]), ) ) except Exception: logger.debug("Could not load open positions — table may not exist") return positions async def _load_stop_levels(self) -> dict[str, StopLevels]: """Load active stop-loss/take-profit levels from the database. Task 28.3: Queries position_stop_levels WHERE active = TRUE. Returns dict keyed by ticker. """ if self.pool is None: return {} levels: dict[str, StopLevels] = {} try: rows = await self.pool.fetch( "SELECT ticker, stop_loss_price, take_profit_price, " "trailing_stop_active, atr_value, atr_multiplier, " "reward_risk_ratio, updated_at " "FROM position_stop_levels WHERE active = TRUE" ) for row in rows: levels[row["ticker"]] = StopLevels( stop_loss_price=float(row["stop_loss_price"]), take_profit_price=float(row["take_profit_price"]), trailing_stop_active=bool(row["trailing_stop_active"]), atr_value=float(row["atr_value"]), atr_multiplier=float(row["atr_multiplier"]), reward_risk_ratio=float(row["reward_risk_ratio"]), ) return levels except Exception: logger.debug("Could not load stop levels — table may not exist") return {} async def _submit_sell_order( self, ticker: str, quantity: int, reason: str ) -> None: """Push a sell order to the broker queue via Redis.""" order_job = { "ticker": ticker, "action": "sell", "quantity": quantity, "order_type": "market", "source": "trading_engine", "reason": reason, } if self.redis is not None: try: broker_queue = queue_key(QUEUE_BROKER) await self.redis.rpush(broker_queue, json.dumps(order_job)) logger.info("Submitted sell order for %s (%d shares): %s", ticker, quantity, reason) except Exception: logger.exception("Failed to push sell order for %s", ticker) else: logger.info("Sell order (no redis): %s %d shares — %s", ticker, quantity, reason) async def _persist_daily_snapshot(self, now: datetime) -> None: """Persist end-of-day portfolio snapshot to portfolio_snapshots table. Task 29.2: Called after 4:00 PM ET when market closes. """ if self.pool is None or self.portfolio_state is None: return from services.trading.trading_window import ET et_now = now.astimezone(ET) snapshot_date = et_now.date() try: await self.pool.execute( "INSERT INTO portfolio_snapshots " "(snapshot_date, portfolio_value, active_pool, reserve_pool, " "daily_return, cumulative_return, unrealized_pnl, realized_pnl, " "win_count, loss_count, win_rate, sharpe_ratio, max_drawdown, " "current_drawdown_pct, portfolio_heat, risk_tier, " "positions, metrics, created_at) " "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, " "$11, $12, $13, $14, $15, $16, $17, $18, $19) " "ON CONFLICT (snapshot_date) DO UPDATE SET " "portfolio_value = EXCLUDED.portfolio_value, " "active_pool = EXCLUDED.active_pool, " "reserve_pool = EXCLUDED.reserve_pool, " "updated_at = NOW()", snapshot_date, self.portfolio_state.total_value, self.portfolio_state.active_pool, self.portfolio_state.reserve_pool, 0.0, # daily_return 0.0, # cumulative_return sum(p.unrealized_pnl for p in self.portfolio_state.positions), 0.0, # realized_pnl 0, # win_count 0, # loss_count 0.0, # win_rate 0.0, # sharpe_ratio 0.0, # max_drawdown 0.0, # current_drawdown_pct self.portfolio_state.portfolio_heat, self.config.risk_tier, json.dumps([]), # positions json.dumps({}), # metrics now, ) logger.info("Persisted daily snapshot for %s", snapshot_date) except Exception: logger.debug("Could not persist daily snapshot — table may not exist") # ------------------------------------------------------------------ # Decision builders # ------------------------------------------------------------------ def _skip_decision( self, *, rec_id: str | None, ticker: str, skip_reason: str, risk_tier: RiskTierConfig, portfolio_state: PortfolioState, circuit_breaker_state: CircuitBreakerState, reasoning: list[str], now: datetime, ) -> TradingDecision: return TradingDecision( id=str(uuid.uuid4()), recommendation_id=rec_id, decision="skip", skip_reason=skip_reason, ticker=ticker, computed_position_size=None, computed_share_quantity=None, risk_tier_at_decision=risk_tier.name, portfolio_heat_at_decision=portfolio_state.portfolio_heat, active_pool_at_decision=portfolio_state.active_pool, reserve_pool_at_decision=portfolio_state.reserve_pool, circuit_breaker_status="active" if circuit_breaker_state.active else "inactive", decision_trace={"reasoning": reasoning}, created_at=now, ) def _act_decision( self, *, rec_id: str | None, ticker: str, size_result: PositionSizeResult, risk_tier: RiskTierConfig, portfolio_state: PortfolioState, circuit_breaker_state: CircuitBreakerState, reasoning: list[str], now: datetime, ) -> TradingDecision: return TradingDecision( id=str(uuid.uuid4()), recommendation_id=rec_id, decision="act", skip_reason=None, ticker=ticker, computed_position_size=size_result.dollar_amount, computed_share_quantity=size_result.share_quantity, risk_tier_at_decision=risk_tier.name, portfolio_heat_at_decision=portfolio_state.portfolio_heat, active_pool_at_decision=portfolio_state.active_pool, reserve_pool_at_decision=portfolio_state.reserve_pool, circuit_breaker_status="active" if circuit_breaker_state.active else "inactive", decision_trace={"reasoning": reasoning}, created_at=now, )