"""Data collector for trading performance reports. Queries all relevant trading data for a reporting period and returns a CollectedData bundle for downstream section builders. """ from __future__ import annotations import logging import uuid from dataclasses import dataclass, field from datetime import date from typing import Any import asyncpg logger = logging.getLogger(__name__) @dataclass class CollectedData: """Raw data collected for a reporting period.""" trading_decisions: list[dict] = field(default_factory=list) orders: list[dict] = field(default_factory=list) open_positions: list[dict] = field(default_factory=list) closed_positions: list[dict] = field(default_factory=list) portfolio_snapshot: dict | None = None previous_portfolio_snapshot: dict | None = None recommendations: list[dict] = field(default_factory=list) prediction_outcomes: list[dict] = field(default_factory=list) model_metric_snapshots: list[dict] = field(default_factory=list) circuit_breaker_events: list[dict] = field(default_factory=list) reserve_pool_balance: float = 0.0 def _row_dict(row: asyncpg.Record) -> dict[str, Any]: """Convert asyncpg Record to dict with UUID→str coercion.""" d = dict(row) for k, v in d.items(): if isinstance(v, uuid.UUID): d[k] = str(v) return d async def collect_report_data( pool: asyncpg.Pool, period_start: date, period_end: date, ) -> CollectedData: """Query all trading data for the reporting period. Queries: trading_decisions, orders, positions, portfolio_snapshots, recommendations, prediction_outcomes, model_metric_snapshots, circuit_breaker_events, reserve_pool_ledger. Returns CollectedData with all raw query results. If no trading_decisions exist, returns empty lists (zero-activity). """ async with pool.acquire() as conn: trading_decisions = await _fetch_trading_decisions(conn, period_start, period_end) orders = await _fetch_orders(conn, period_start, period_end) open_positions = await _fetch_open_positions(conn) closed_positions = await _fetch_closed_positions(conn, period_start, period_end) portfolio_snapshot = await _fetch_portfolio_snapshot(conn, period_start, period_end) previous_portfolio_snapshot = await _fetch_previous_portfolio_snapshot(conn, period_start) recommendations = await _fetch_recommendations(conn, period_start, period_end) prediction_outcomes = await _fetch_prediction_outcomes(conn, period_start, period_end) model_metric_snapshots = await _fetch_model_metric_snapshots(conn, period_start, period_end) circuit_breaker_events = await _fetch_circuit_breaker_events(conn, period_start, period_end) reserve_pool_balance = await _fetch_reserve_pool_balance(conn) return CollectedData( trading_decisions=trading_decisions, orders=orders, open_positions=open_positions, closed_positions=closed_positions, portfolio_snapshot=portfolio_snapshot, previous_portfolio_snapshot=previous_portfolio_snapshot, recommendations=recommendations, prediction_outcomes=prediction_outcomes, model_metric_snapshots=model_metric_snapshots, circuit_breaker_events=circuit_breaker_events, reserve_pool_balance=reserve_pool_balance, ) async def _fetch_trading_decisions( conn: asyncpg.Connection, period_start: date, period_end: date, ) -> list[dict]: """Fetch trading decisions created within the period.""" rows = await conn.fetch( """SELECT id, recommendation_id, decision, skip_reason, ticker, computed_position_size, computed_share_quantity, risk_tier_at_decision, portfolio_heat_at_decision, active_pool_at_decision, reserve_pool_at_decision, circuit_breaker_status, correlation_check_result, sector_exposure_check_result, earnings_proximity_flag, is_micro_trade, decision_trace, created_at FROM trading_decisions WHERE created_at >= $1::date AND created_at < ($2::date + INTERVAL '1 day') ORDER BY created_at""", period_start, period_end, ) return [_row_dict(r) for r in rows] async def _fetch_orders( conn: asyncpg.Connection, period_start: date, period_end: date, ) -> list[dict]: """Fetch orders created within the period.""" rows = await conn.fetch( """SELECT id, recommendation_id, broker_account_id, ticker, side, order_type, quantity, limit_price, stop_price, status, broker_order_id, fill_price, fill_quantity, submitted_at, filled_at, cancelled_at, rejected_at, rejection_reason, created_at FROM orders WHERE created_at >= $1::date AND created_at < ($2::date + INTERVAL '1 day') ORDER BY created_at""", period_start, period_end, ) return [_row_dict(r) for r in rows] async def _fetch_open_positions(conn: asyncpg.Connection) -> list[dict]: """Fetch currently open positions (quantity > 0).""" rows = await conn.fetch( """SELECT id, broker_account_id, ticker, quantity, avg_entry_price, current_price, unrealized_pnl, realized_pnl, updated_at FROM positions WHERE quantity > 0 ORDER BY ticker""", ) return [_row_dict(r) for r in rows] async def _fetch_closed_positions( conn: asyncpg.Connection, period_start: date, period_end: date, ) -> list[dict]: """Fetch positions closed during the period (quantity = 0, updated within period).""" rows = await conn.fetch( """SELECT id, broker_account_id, ticker, quantity, avg_entry_price, current_price, unrealized_pnl, realized_pnl, updated_at FROM positions WHERE quantity = 0 AND updated_at >= $1::date AND updated_at < ($2::date + INTERVAL '1 day') ORDER BY updated_at""", period_start, period_end, ) return [_row_dict(r) for r in rows] async def _fetch_portfolio_snapshot( conn: asyncpg.Connection, period_start: date, period_end: date, ) -> dict | None: """Fetch the most recent portfolio snapshot within the period.""" row = await conn.fetchrow( """SELECT id, snapshot_date, portfolio_value, active_pool, reserve_pool, daily_return, cumulative_return, unrealized_pnl, realized_pnl, win_count, loss_count, win_rate, sharpe_ratio, max_drawdown, current_drawdown_pct, portfolio_heat, risk_tier, positions, metrics, created_at FROM portfolio_snapshots WHERE snapshot_date >= $1 AND snapshot_date <= $2 ORDER BY snapshot_date DESC LIMIT 1""", period_start, period_end, ) return _row_dict(row) if row else None async def _fetch_previous_portfolio_snapshot( conn: asyncpg.Connection, period_start: date, ) -> dict | None: """Fetch the most recent portfolio snapshot before the period start.""" row = await conn.fetchrow( """SELECT id, snapshot_date, portfolio_value, active_pool, reserve_pool, daily_return, cumulative_return, unrealized_pnl, realized_pnl, win_count, loss_count, win_rate, sharpe_ratio, max_drawdown, current_drawdown_pct, portfolio_heat, risk_tier, positions, metrics, created_at FROM portfolio_snapshots WHERE snapshot_date < $1 ORDER BY snapshot_date DESC LIMIT 1""", period_start, ) return _row_dict(row) if row else None async def _fetch_recommendations( conn: asyncpg.Connection, period_start: date, period_end: date, ) -> list[dict]: """Fetch recommendations created within the period.""" rows = await conn.fetch( """SELECT id, ticker, company_id, action, mode, confidence, time_horizon, thesis, portfolio_pct, max_loss_pct, model_version, generated_at, created_at FROM recommendations WHERE created_at >= $1::date AND created_at < ($2::date + INTERVAL '1 day') ORDER BY created_at""", period_start, period_end, ) return [_row_dict(r) for r in rows] async def _fetch_prediction_outcomes( conn: asyncpg.Connection, period_start: date, period_end: date, ) -> list[dict]: """Fetch prediction outcomes evaluated within the period.""" rows = await conn.fetch( """SELECT po.id, po.prediction_id, po.evaluated_at, po.horizon, po.future_price, po.future_return, po.spy_future_price, po.spy_return, po.sector_etf_future_price, po.sector_etf_return, po.excess_return_vs_spy, po.excess_return_vs_sector, po.direction_correct, po.profitable, ps.ticker, ps.direction, ps.action, ps.confidence FROM prediction_outcomes po JOIN prediction_snapshots ps ON ps.id = po.prediction_id WHERE po.evaluated_at >= $1::date AND po.evaluated_at < ($2::date + INTERVAL '1 day') ORDER BY po.evaluated_at""", period_start, period_end, ) return [_row_dict(r) for r in rows] async def _fetch_model_metric_snapshots( conn: asyncpg.Connection, period_start: date, period_end: date, ) -> list[dict]: """Fetch model metric snapshots generated within the period.""" rows = await conn.fetch( """SELECT id, generated_at, lookback_window, horizon, prediction_count, win_rate, directional_accuracy, information_coefficient, rank_information_coefficient, avg_return, avg_excess_return_vs_spy, avg_excess_return_vs_sector, calibration_error, brier_score, buy_win_rate, sell_win_rate, hold_win_rate, created_at FROM model_metric_snapshots WHERE generated_at >= $1::date AND generated_at < ($2::date + INTERVAL '1 day') ORDER BY generated_at DESC""", period_start, period_end, ) return [_row_dict(r) for r in rows] async def _fetch_circuit_breaker_events( conn: asyncpg.Connection, period_start: date, period_end: date, ) -> list[dict]: """Fetch circuit breaker events from trading decisions within the period. Circuit breaker events are trading decisions where circuit_breaker_status is not 'clear' (i.e. a breaker was active). """ rows = await conn.fetch( """SELECT id, recommendation_id, decision, ticker, circuit_breaker_status, decision_trace, created_at FROM trading_decisions WHERE circuit_breaker_status != 'clear' AND created_at >= $1::date AND created_at < ($2::date + INTERVAL '1 day') ORDER BY created_at""", period_start, period_end, ) return [_row_dict(r) for r in rows] async def _fetch_reserve_pool_balance(conn: asyncpg.Connection) -> float: """Fetch the latest reserve pool balance.""" row = await conn.fetchrow( "SELECT balance_after FROM reserve_pool_ledger ORDER BY created_at DESC LIMIT 1", ) return float(row["balance_after"]) if row else 0.0