"""Section builders for trading performance reports. Each builder takes a CollectedData bundle and returns a typed Pydantic section model. All builders handle zero-activity gracefully by returning zero values and empty lists when no data is available. """ from __future__ import annotations import logging from datetime import datetime, timezone from services.reporting.collector import CollectedData from services.reporting.models import ( ModelQualitySection, ModelQualityWindow, PLSection, PositionDetail, PositionPerformanceSection, RecommendationAccuracySection, RiskMetricsSection, ) logger = logging.getLogger(__name__) def build_pnl_section(data: CollectedData) -> PLSection: """Build P&L section from collected data. Computes realized/unrealized P&L, daily return, cumulative return, win/loss counts, win rate, profit factor, and Sharpe ratio from portfolio_snapshot and closed positions. """ snap = data.portfolio_snapshot if snap is None: return PLSection( realized_pnl=0.0, unrealized_pnl=0.0, daily_return=0.0, cumulative_return=0.0, win_count=0, loss_count=0, win_rate=0.0, profit_factor=0.0, sharpe_ratio=0.0, ) # Compute profit factor from closed positions: # sum of gains / abs(sum of losses) gains = 0.0 losses = 0.0 for pos in data.closed_positions: rpnl = float(pos.get("realized_pnl", 0) or 0) if rpnl > 0: gains += rpnl elif rpnl < 0: losses += abs(rpnl) profit_factor = (gains / losses) if losses > 0 else 0.0 return PLSection( realized_pnl=float(snap.get("realized_pnl", 0) or 0), unrealized_pnl=float(snap.get("unrealized_pnl", 0) or 0), daily_return=float(snap.get("daily_return", 0) or 0), cumulative_return=float(snap.get("cumulative_return", 0) or 0), win_count=int(snap.get("win_count", 0) or 0), loss_count=int(snap.get("loss_count", 0) or 0), win_rate=float(snap.get("win_rate", 0) or 0), profit_factor=profit_factor, sharpe_ratio=float(snap.get("sharpe_ratio", 0) or 0), ) def build_recommendation_accuracy_section( data: CollectedData, ) -> RecommendationAccuracySection: """Build recommendation accuracy section. Joins trading_decisions with prediction_outcomes to compute act/skip breakdown, win rate of acted recommendations, and average confidence of acted vs skipped. """ if not data.trading_decisions: return RecommendationAccuracySection( total_evaluated=0, act_count=0, skip_count=0, acted_win_rate=0.0, avg_confidence_acted=0.0, avg_confidence_skipped=0.0, ) # Build lookup: recommendation_id -> prediction_outcome # prediction_outcomes are joined with prediction_snapshots in the collector, # so they carry ticker, direction, action, confidence from the snapshot. # trading_decisions reference recommendations via recommendation_id. # We need to match trading_decisions -> recommendations -> prediction_outcomes. # # The collector fetches prediction_outcomes joined with prediction_snapshots # (po.prediction_id = ps.id). Trading decisions reference recommendation_id. # Recommendations and prediction_snapshots share the same ticker, so we # match by recommendation_id on the trading_decision side. # Build recommendation_id -> recommendation dict for confidence lookup rec_by_id: dict[str, dict] = {} for rec in data.recommendations: rec_id = str(rec.get("id", "")) if rec_id: rec_by_id[rec_id] = rec # Build prediction_id -> prediction_outcome for profitability lookup # We also need to map recommendation_id -> prediction_outcome. # The link is: trading_decision.recommendation_id -> recommendation.id # and prediction_outcome has ticker from prediction_snapshots. # We match by ticker between recommendation and prediction_outcome. outcome_by_ticker: dict[str, list[dict]] = {} for po in data.prediction_outcomes: ticker = po.get("ticker", "") if ticker: outcome_by_ticker.setdefault(ticker, []).append(po) act_count = 0 skip_count = 0 acted_wins = 0 acted_total_with_outcome = 0 confidence_acted: list[float] = [] confidence_skipped: list[float] = [] for td in data.trading_decisions: decision = str(td.get("decision", "")).lower() rec_id = str(td.get("recommendation_id", "")) rec = rec_by_id.get(rec_id, {}) conf = rec.get("confidence") ticker = td.get("ticker", "") if decision == "act": act_count += 1 if conf is not None: confidence_acted.append(float(conf)) # Check profitability from prediction_outcomes for this ticker ticker_outcomes = outcome_by_ticker.get(ticker, []) if ticker_outcomes: # Use the most recent outcome for this ticker latest = ticker_outcomes[-1] acted_total_with_outcome += 1 if latest.get("profitable"): acted_wins += 1 else: skip_count += 1 if conf is not None: confidence_skipped.append(float(conf)) total_evaluated = act_count + skip_count acted_win_rate = ( (acted_wins / acted_total_with_outcome) if acted_total_with_outcome > 0 else 0.0 ) avg_confidence_acted = ( (sum(confidence_acted) / len(confidence_acted)) if confidence_acted else 0.0 ) avg_confidence_skipped = ( (sum(confidence_skipped) / len(confidence_skipped)) if confidence_skipped else 0.0 ) return RecommendationAccuracySection( total_evaluated=total_evaluated, act_count=act_count, skip_count=skip_count, acted_win_rate=acted_win_rate, avg_confidence_acted=avg_confidence_acted, avg_confidence_skipped=avg_confidence_skipped, ) def build_position_performance_section( data: CollectedData, ) -> PositionPerformanceSection: """Build position performance section. Lists each position (open and closed) with entry price, current/exit price, P&L, P&L%, and hold duration. """ positions: list[PositionDetail] = [] now = datetime.now(timezone.utc) # Open positions for pos in data.open_positions: entry_price = float(pos.get("avg_entry_price", 0) or 0) current_price = float(pos.get("current_price", 0) or 0) quantity = float(pos.get("quantity", 0) or 0) pnl = (current_price - entry_price) * quantity cost_basis = entry_price * quantity pnl_pct = (pnl / cost_basis * 100) if cost_basis > 0 else 0.0 # Hold duration from updated_at to now updated_at = pos.get("updated_at") hold_hours = _compute_hold_hours(updated_at, now) positions.append( PositionDetail( ticker=pos.get("ticker", ""), entry_price=entry_price, current_or_exit_price=current_price, pnl=pnl, pnl_pct=pnl_pct, hold_duration_hours=hold_hours, status="open", ) ) # Closed positions for pos in data.closed_positions: entry_price = float(pos.get("avg_entry_price", 0) or 0) current_price = float(pos.get("current_price", 0) or 0) realized_pnl = float(pos.get("realized_pnl", 0) or 0) cost_basis = entry_price * float(pos.get("quantity", 0) or 0) # For closed positions, quantity is 0 in the DB, so use realized_pnl # directly. P&L% is based on the original cost basis which we can # approximate from entry_price and the realized_pnl. # If entry_price is available, compute pnl_pct from realized_pnl / cost. # Since quantity=0 for closed, we estimate original quantity from # realized_pnl and price difference, or just use realized_pnl directly. if entry_price > 0 and current_price != entry_price: # Estimate original quantity from realized_pnl / (exit - entry) price_diff = current_price - entry_price if price_diff != 0: est_quantity = abs(realized_pnl / price_diff) est_cost = entry_price * est_quantity pnl_pct = (realized_pnl / est_cost * 100) if est_cost > 0 else 0.0 else: pnl_pct = 0.0 else: pnl_pct = 0.0 updated_at = pos.get("updated_at") hold_hours = _compute_hold_hours(updated_at, now) positions.append( PositionDetail( ticker=pos.get("ticker", ""), entry_price=entry_price, current_or_exit_price=current_price, pnl=realized_pnl, pnl_pct=pnl_pct, hold_duration_hours=hold_hours, status="closed", ) ) return PositionPerformanceSection(positions=positions) def _compute_hold_hours(updated_at: datetime | str | None, now: datetime) -> float: """Compute hold duration in hours from updated_at to now.""" if updated_at is None: return 0.0 if isinstance(updated_at, str): try: updated_at = datetime.fromisoformat(updated_at) except (ValueError, TypeError): return 0.0 if not isinstance(updated_at, datetime): return 0.0 # Ensure timezone-aware comparison if updated_at.tzinfo is None: updated_at = updated_at.replace(tzinfo=timezone.utc) delta = now - updated_at return max(delta.total_seconds() / 3600.0, 0.0) def build_risk_metrics_section(data: CollectedData) -> RiskMetricsSection: """Build risk metrics section. Extracts current risk tier, portfolio heat, max drawdown, current drawdown %, reserve pool balance, and circuit breaker event count from collected data. """ snap = data.portfolio_snapshot if snap is None: return RiskMetricsSection( current_risk_tier="unknown", portfolio_heat=0.0, max_drawdown=0.0, current_drawdown_pct=0.0, reserve_pool_balance=data.reserve_pool_balance, circuit_breaker_event_count=len(data.circuit_breaker_events), ) return RiskMetricsSection( current_risk_tier=str(snap.get("risk_tier", "unknown") or "unknown"), portfolio_heat=float(snap.get("portfolio_heat", 0) or 0), max_drawdown=float(snap.get("max_drawdown", 0) or 0), current_drawdown_pct=float(snap.get("current_drawdown_pct", 0) or 0), reserve_pool_balance=data.reserve_pool_balance, circuit_breaker_event_count=len(data.circuit_breaker_events), ) def build_model_quality_section(data: CollectedData) -> ModelQualitySection: """Build model quality section. Extracts latest model_metric_snapshot values for 7d, 30d, 90d lookback windows. """ if not data.model_metric_snapshots: return ModelQualitySection(windows=[]) # Group by lookback_window, take the latest (first in list since # collector orders by generated_at DESC) target_windows = {"7d", "30d", "90d"} latest_by_window: dict[str, dict] = {} for snap in data.model_metric_snapshots: window = snap.get("lookback_window", "") if window in target_windows and window not in latest_by_window: latest_by_window[window] = snap windows: list[ModelQualityWindow] = [] for w in ("7d", "30d", "90d"): snap = latest_by_window.get(w) if snap is None: windows.append( ModelQualityWindow( lookback=w, win_rate=None, directional_accuracy=None, information_coefficient=None, calibration_error=None, brier_score=None, ) ) else: windows.append( ModelQualityWindow( lookback=w, win_rate=_safe_float(snap.get("win_rate")), directional_accuracy=_safe_float(snap.get("directional_accuracy")), information_coefficient=_safe_float( snap.get("information_coefficient") ), calibration_error=_safe_float(snap.get("calibration_error")), brier_score=_safe_float(snap.get("brier_score")), ) ) return ModelQualitySection(windows=windows) def _safe_float(value: object) -> float | None: """Convert a value to float, returning None for None/invalid values.""" if value is None: return None try: f = float(value) # type: ignore[arg-type] # Replace NaN/inf with None if f != f or f == float("inf") or f == float("-inf"): return None return f except (ValueError, TypeError): return None