"""Backtest replay for the autonomous trading engine. Task 32: Fetches historical recommendations from the database, simulates the decision logic chronologically using evaluate_recommendation(), tracks simulated positions and equity curve, and persists results to backtest_runs and backtest_trades tables. Supports a validation mode (Requirements 15.1–15.5) that generates prediction snapshots and evaluates outcomes using only data available at each historical point in time, preventing future data leakage. """ from __future__ import annotations import json import logging import uuid from datetime import date, datetime, timedelta, timezone from services.trading.backtester import BacktestConfig, BacktestResult from services.trading.correlation import CorrelationMatrix from services.trading.engine import TradingEngine from services.trading.models import ( RISK_TIER_DEFAULTS, CircuitBreakerState, ClosedTrade, PortfolioState, ) from services.trading.performance_tracker import PerformanceComputer logger = logging.getLogger("trading_engine.backtest") class BacktestReplay: """Replays historical recommendations through the trading engine logic. Accepts an asyncpg pool for database access. The ``run()`` method fetches historical data, simulates decisions chronologically, and persists results. """ def __init__(self, pool: object) -> None: self.pool = pool self._perf = PerformanceComputer() async def run( self, config: BacktestConfig, backtest_id: str | None = None, validation_mode: bool = False, ) -> BacktestResult: """Execute a full backtest replay. Args: config: Backtest configuration (date range, capital, risk tier). backtest_id: Optional pre-generated ID. If not provided, one is generated. validation_mode: When True, creates prediction snapshots for each historical recommendation using only data available at that point in time, evaluates outcomes, and computes model metrics over the backtest period. Snapshots are tagged with the backtest_id. (Requirements 15.1–15.5) Returns: BacktestResult with metrics, trade log, and equity curve. """ if backtest_id is None: backtest_id = str(uuid.uuid4()) try: # Fetch historical recommendations recs = await self._fetch_recommendations(config.start_date, config.end_date) # Set up simulated state risk_tier = RISK_TIER_DEFAULTS.get( config.risk_tier, RISK_TIER_DEFAULTS["moderate"] ) portfolio_state = PortfolioState( total_value=config.initial_capital, cash=config.initial_capital, active_pool=config.initial_capital, reserve_pool=0.0, ) cb_state = CircuitBreakerState() correlation_matrix = CorrelationMatrix() earnings_calendar: dict = {} # Create a lightweight engine for evaluate_recommendation from services.shared.config import TradingConfig engine_config = TradingConfig( risk_tier=config.risk_tier, absolute_position_cap=config.initial_capital * 0.10, active_pool_minimum=config.initial_capital * 0.20, ) engine = TradingEngine(pool=None, redis=None, config=engine_config) # Simulation state simulated_positions: dict[str, dict] = {} # ticker -> position info closed_trades: list[ClosedTrade] = [] equity_curve: list[dict] = [] daily_returns: list[float] = [] prev_value = config.initial_capital trade_log: list[dict] = [] validation_snapshot_ids: list[str] = [] # track snapshot IDs for validation mode # Pre-load company sectors and latest prices for enrichment company_sectors: dict[str, str] = {} company_prices: dict[str, float] = {} if self.pool is not None: try: sector_rows = await self.pool.fetch( "SELECT ticker, sector FROM companies WHERE active = TRUE" ) for sr in sector_rows: company_sectors[sr["ticker"]] = sr["sector"] or "Unknown" except Exception: logger.debug("Could not load company sectors") # Load latest market prices (use most recent close from market_snapshots JSONB) try: price_rows = await self.pool.fetch( "SELECT DISTINCT ON (ticker) ticker, (data->>'c')::float as close_price " "FROM market_snapshots WHERE snapshot_type = 'bar' " "ORDER BY ticker, captured_at DESC" ) for pr in price_rows: if pr["close_price"]: company_prices[pr["ticker"]] = float(pr["close_price"]) except Exception: logger.debug("Could not load market prices — using portfolio_pct fallback") # Group recommendations by date recs_by_date: dict[date, list[dict]] = {} for rec in recs: rec_date = rec.get("generated_at", datetime.now(tz=timezone.utc)) if isinstance(rec_date, datetime): d = rec_date.date() else: d = rec_date # Enrich rec with price and sector if missing ticker = rec.get("ticker", "") if "current_price" not in rec or not rec.get("current_price"): rec["current_price"] = company_prices.get(ticker, 50.0) if "sector" not in rec or not rec.get("sector"): rec["sector"] = company_sectors.get(ticker, "Unknown") # Map 'id' to 'recommendation_id' for evaluate_recommendation() if "recommendation_id" not in rec and "id" in rec: rec["recommendation_id"] = str(rec["id"]) # Ensure confidence is a float if rec.get("confidence") is not None: rec["confidence"] = float(rec["confidence"]) recs_by_date.setdefault(d, []).append(rec) # Iterate through each trading day current_date = config.start_date while current_date <= config.end_date: # Skip weekends if current_date.weekday() > 4: current_date += timedelta(days=1) continue day_recs = recs_by_date.get(current_date, []) act_count = 0 # Process recommendations for this day for rec in day_recs: # Set a timestamp within trading window for evaluation # Use 11:00 AM ET (within trading window) for simulation from services.trading.trading_window import ET sim_time = datetime( current_date.year, current_date.month, current_date.day, 11, 0, 0, tzinfo=ET, ) decision = engine.evaluate_recommendation( rec=rec, portfolio_state=portfolio_state, risk_tier=risk_tier, circuit_breaker_state=cb_state, correlation_matrix=correlation_matrix, earnings_calendar=earnings_calendar, now=sim_time, ) # --- Validation mode: create prediction snapshot (Req 15.1, 15.2, 15.4) --- if validation_mode and self.pool is not None: try: snapshot_id = await self._create_validation_snapshot( rec=rec, sim_time=sim_time, backtest_id=backtest_id, company_sectors=company_sectors, ) if snapshot_id is not None: validation_snapshot_ids.append(snapshot_id) except Exception: logger.warning( "Validation snapshot failed for %s at %s, continuing backtest", rec.get("ticker", "?"), sim_time, exc_info=True, ) if decision.decision == "act": act_count += 1 ticker = decision.ticker price = rec.get("current_price", 0.0) qty = decision.computed_share_quantity or 0 if qty > 0 and price > 0 and ticker not in simulated_positions: cost = price * qty if cost <= portfolio_state.active_pool: simulated_positions[ticker] = { "entry_price": price, "quantity": qty, "entry_date": current_date, "sector": rec.get("sector", ""), "recommendation_id": str( rec.get("recommendation_id", rec.get("id", "")) ), } portfolio_state.active_pool -= cost portfolio_state.open_position_count += 1 elif act_count == 0 and not hasattr(self, '_first_skip_logged'): # Log the first skip reason for debugging logger.warning( "Backtest first skip: ticker=%s reason=%s conf=%.2f price=%.2f pool=%.2f", decision.ticker, decision.skip_reason, rec.get("confidence", 0.0), rec.get("current_price", 0.0), portfolio_state.active_pool, ) self._first_skip_logged = True if day_recs: logger.warning( "Backtest day %s: %d recs, %d act, positions=%d, pool=$%.2f", current_date, len(day_recs), act_count, len(simulated_positions), portfolio_state.active_pool, ) # Simulate simple exit logic: close positions held > 5 days # or use actual market price if available tickers_to_close = [] for ticker, pos_info in simulated_positions.items(): hold_days = (current_date - pos_info["entry_date"]).days if hold_days >= 5: tickers_to_close.append(ticker) for ticker in tickers_to_close: pos_info = simulated_positions.pop(ticker) # Use actual market price if available, otherwise estimate exit_price = company_prices.get(ticker, pos_info["entry_price"] * 1.01) qty = pos_info["quantity"] pnl = (exit_price - pos_info["entry_price"]) * qty pnl_pct = ( (exit_price - pos_info["entry_price"]) / pos_info["entry_price"] if pos_info["entry_price"] > 0 else 0.0 ) hold_duration = timedelta( days=(current_date - pos_info["entry_date"]).days ) trade = ClosedTrade( ticker=ticker, entry_price=pos_info["entry_price"], exit_price=exit_price, quantity=qty, pnl=pnl, pnl_pct=pnl_pct, hold_duration=hold_duration, recommendation_id=pos_info.get("recommendation_id"), ) closed_trades.append(trade) trade_log.append(self._perf.compute_trade_metrics(trade)) # Return capital to active pool portfolio_state.active_pool += exit_price * qty portfolio_state.open_position_count = max( 0, portfolio_state.open_position_count - 1 ) # Compute daily portfolio value using latest market prices positions_value = sum( company_prices.get(t, p["entry_price"]) * p["quantity"] for t, p in simulated_positions.items() ) current_value = portfolio_state.active_pool + positions_value portfolio_state.total_value = current_value # Daily return daily_ret = ( (current_value - prev_value) / prev_value if prev_value > 0 else 0.0 ) daily_returns.append(daily_ret) prev_value = current_value equity_curve.append({ "date": current_date.isoformat(), "portfolio_value": round(current_value, 2), }) current_date += timedelta(days=1) # Force-close any remaining open positions at end of backtest # using the latest available market prices for ticker in list(simulated_positions.keys()): pos_info = simulated_positions.pop(ticker) exit_price = company_prices.get(ticker, pos_info["entry_price"]) qty = pos_info["quantity"] pnl = (exit_price - pos_info["entry_price"]) * qty pnl_pct = ( (exit_price - pos_info["entry_price"]) / pos_info["entry_price"] if pos_info["entry_price"] > 0 else 0.0 ) hold_duration = timedelta( days=(config.end_date - pos_info["entry_date"]).days ) trade = ClosedTrade( ticker=ticker, entry_price=pos_info["entry_price"], exit_price=exit_price, quantity=qty, pnl=pnl, pnl_pct=pnl_pct, hold_duration=hold_duration, recommendation_id=pos_info.get("recommendation_id"), ) closed_trades.append(trade) trade_log.append(self._perf.compute_trade_metrics(trade)) portfolio_state.active_pool += exit_price * qty if closed_trades: logger.info( "Backtest %s completed: %d trades, final value=$%.2f", backtest_id, len(closed_trades), portfolio_state.active_pool, ) # Compute final metrics metrics = self._perf.compute_metrics( closed_trades=closed_trades, portfolio_value=portfolio_state.total_value, active_pool=portfolio_state.active_pool, reserve_pool=portfolio_state.reserve_pool, daily_pnl=0.0, unrealized_pnl=0.0, portfolio_heat=0.0, daily_returns=daily_returns, ) total_return = ( (portfolio_state.total_value - config.initial_capital) / config.initial_capital if config.initial_capital > 0 else 0.0 ) result = BacktestResult( backtest_id=backtest_id, config=config, total_return=total_return, sharpe_ratio=metrics.sharpe_ratio, max_drawdown=metrics.max_drawdown, win_rate=metrics.win_rate, profit_factor=metrics.profit_factor, trade_count=len(closed_trades), trade_log=trade_log, equity_curve=equity_curve, ) # Persist results await self._persist_results(result, closed_trades) # --- Validation mode: evaluate outcomes and compute metrics (Req 15.3, 15.5) --- if validation_mode and self.pool is not None and validation_snapshot_ids: await self._run_validation_evaluation(backtest_id) return result except Exception as exc: logger.exception("Backtest %s failed", backtest_id) # Persist partial results with failed status await self._persist_failed_run(backtest_id, config, str(exc)) raise # ------------------------------------------------------------------ # Validation mode helpers (Requirements 15.1–15.5) # ------------------------------------------------------------------ # SQL to fetch the close price at or before a specific time — prevents # future data leakage by only returning data available at that point. _CLOSE_AT_TIME_SQL = """ SELECT (data->>'c')::float AS close FROM market_snapshots WHERE ticker = $1 AND snapshot_type = 'bar' AND data->>'c' IS NOT NULL AND captured_at <= $2 ORDER BY captured_at DESC LIMIT 1 """ _COMPANY_SECTOR_SQL = """ SELECT sector FROM companies WHERE ticker = $1 AND active = TRUE LIMIT 1 """ _SECTOR_ETF_MAP: dict[str, str] = { "Technology": "XLK", "Consumer Cyclical": "XLY", "Financial Services": "XLF", "Healthcare": "XLV", "Energy": "XLE", "Communication Services": "XLC", "Industrials": "XLI", "Consumer Defensive": "XLP", "Real Estate": "XLRE", "Utilities": "XLU", } async def _fetch_close_at_time( self, ticker: str, target_time: datetime, ) -> float | None: """Fetch the close price for *ticker* at or before *target_time*. Ensures no future data leakage — only market data with ``captured_at <= target_time`` is considered (Requirement 15.4). """ if self.pool is None: return None row = await self.pool.fetchrow(self._CLOSE_AT_TIME_SQL, ticker, target_time) if row is None: return None return row["close"] async def _create_validation_snapshot( self, rec: dict, sim_time: datetime, backtest_id: str, company_sectors: dict[str, str], ) -> str | None: """Create a prediction snapshot using only data available at *sim_time*. Fetches ticker, SPY, and sector ETF prices as of *sim_time* to prevent future data leakage (Requirements 15.1, 15.2, 15.4). The snapshot is tagged with *backtest_id* in its metadata field (Requirement 15.5). Returns the snapshot UUID on success, or ``None`` on failure. """ from services.validation.prediction_snapshot import ( SECTOR_ETF_MAP, ) ticker = rec.get("ticker", "") if not ticker: return None # Fetch prices using only data available at sim_time (Req 15.4) ticker_price = await self._fetch_close_at_time(ticker, sim_time) spy_price = await self._fetch_close_at_time("SPY", sim_time) # Sector ETF price sector = company_sectors.get(ticker) sector_etf_ticker = SECTOR_ETF_MAP.get(sector) if sector else None sector_etf_price: float | None = None if sector_etf_ticker is not None: sector_etf_price = await self._fetch_close_at_time( sector_etf_ticker, sim_time ) snapshot_id = str(uuid.uuid4()) # Build metadata tagged with backtest_id (Req 15.5) metadata: dict = { "backtest_id": backtest_id, "source": "backtest_validation", } # Map recommendation fields to snapshot columns direction = rec.get("direction", rec.get("trend_direction", "neutral")) action = rec.get("action", "watch") mode = rec.get("mode", "informational") confidence = float(rec.get("confidence", 0.5)) strength = float(rec.get("strength", rec.get("trend_strength", 0.5))) contradiction = float(rec.get("contradiction", rec.get("contradiction_score", 0.0))) p_bull = rec.get("p_bull") if p_bull is not None: p_bull = float(p_bull) p_bear = (1.0 - p_bull) if p_bull is not None else None window = rec.get("window", rec.get("trend_window", "7d")) horizon = rec.get("time_horizon", rec.get("horizon", "7d")) # Insert the snapshot directly — we bypass create_prediction_snapshot() # because that function fetches *latest* prices (not point-in-time). insert_sql = """ INSERT INTO prediction_snapshots ( id, generated_at, ticker, "window", horizon, direction, action, mode, strength, confidence, contradiction, p_bull, p_bear, score_company, score_macro, score_competitive, evidence_count, unique_source_count, duplicate_evidence_count, price_at_prediction, spy_price_at_prediction, sector_etf_price_at_prediction, metadata ) VALUES ( $1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23::jsonb ) """ await self.pool.execute( insert_sql, snapshot_id, sim_time, ticker, str(window), str(horizon), str(direction), str(action), str(mode), strength, confidence, contradiction, p_bull, p_bear, float(rec.get("score_company", 0.0)), float(rec.get("score_macro", 0.0)), float(rec.get("score_competitive", 0.0)), int(rec.get("evidence_count", 0)), int(rec.get("unique_source_count", 0)), int(rec.get("duplicate_evidence_count", 0)), ticker_price, spy_price, sector_etf_price, json.dumps(metadata), ) logger.debug( "Validation snapshot %s created for %s at %s (backtest %s)", snapshot_id, ticker, sim_time, backtest_id, ) return snapshot_id async def _run_validation_evaluation(self, backtest_id: str) -> None: """Evaluate prediction outcomes and compute metrics for the backtest. Calls the outcome evaluator and metrics engine after the backtest completes (Requirements 15.3, 15.5). Failures are logged but do not block the backtest result. """ from services.validation.metrics import compute_and_store_metric_snapshots from services.validation.outcome_evaluator import evaluate_matured_predictions # Step 1: Evaluate matured predictions (Req 15.3) try: outcomes_count = await evaluate_matured_predictions(self.pool) logger.info( "Backtest %s validation: %d prediction outcomes evaluated", backtest_id, outcomes_count, ) except Exception: logger.warning( "Backtest %s: outcome evaluation failed, continuing", backtest_id, exc_info=True, ) # Step 2: Compute and store metric snapshots (Req 15.5) try: snapshots = await compute_and_store_metric_snapshots(self.pool) logger.info( "Backtest %s validation: %d metric snapshots computed", backtest_id, len(snapshots), ) except Exception: logger.warning( "Backtest %s: metric snapshot computation failed, continuing", backtest_id, exc_info=True, ) # ------------------------------------------------------------------ # Database helpers # ------------------------------------------------------------------ async def _fetch_recommendations( self, start_date: date, end_date: date ) -> list[dict]: """Fetch historical recommendations for the date range.""" if self.pool is None: return [] try: start_dt = datetime( start_date.year, start_date.month, start_date.day, tzinfo=timezone.utc, ) end_dt = datetime( end_date.year, end_date.month, end_date.day, 23, 59, 59, tzinfo=timezone.utc, ) rows = await self.pool.fetch( "SELECT * FROM recommendations " "WHERE generated_at BETWEEN $1 AND $2 " "AND action IN ('buy', 'sell') " "ORDER BY generated_at ASC", start_dt, end_dt, ) return [dict(r) for r in rows] except Exception: logger.debug("Could not fetch historical recommendations — table may not exist") return [] async def _persist_results( self, result: BacktestResult, trades: list[ClosedTrade] ) -> None: """Persist backtest results to backtest_runs and backtest_trades.""" if self.pool is None: return try: await self.pool.execute( "INSERT INTO backtest_runs " "(id, start_date, end_date, initial_capital, risk_tier, " "config, total_return, sharpe_ratio, max_drawdown, " "win_rate, profit_factor, trade_count, equity_curve, " "status, completed_at, created_at) " "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, " "$11, $12, $13, $14, $15, $16)", result.backtest_id, result.config.start_date, result.config.end_date, result.config.initial_capital, result.config.risk_tier, json.dumps({}), result.total_return, result.sharpe_ratio, result.max_drawdown, result.win_rate, result.profit_factor, result.trade_count, json.dumps(result.equity_curve), "completed", datetime.now(tz=timezone.utc), datetime.now(tz=timezone.utc), ) # Persist individual trades for trade in trades: await self.pool.execute( "INSERT INTO backtest_trades " "(backtest_id, ticker, side, entry_price, exit_price, " "quantity, pnl, entry_date, exit_date, " "hold_duration_days, recommendation_id) " "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)", result.backtest_id, trade.ticker, "buy", trade.entry_price, trade.exit_price, trade.quantity, trade.pnl, datetime.now(tz=timezone.utc), # simplified datetime.now(tz=timezone.utc), trade.hold_duration.days, trade.recommendation_id, ) except Exception: logger.debug("Could not persist backtest results — tables may not exist") async def _persist_failed_run( self, backtest_id: str, config: BacktestConfig, error: str ) -> None: """Persist a failed backtest run.""" if self.pool is None: return try: await self.pool.execute( "INSERT INTO backtest_runs " "(id, start_date, end_date, initial_capital, risk_tier, " "config, status, created_at) " "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", backtest_id, config.start_date, config.end_date, config.initial_capital, config.risk_tier, json.dumps({"error": error}), "failed", datetime.now(tz=timezone.utc), ) except Exception: logger.debug("Could not persist failed backtest run")