"""Trading Engine — FastAPI HTTP service for autonomous trading control. Feature: autonomous-trading-engine Exposes health/readiness probes, engine control (pause/resume), configuration management, decision audit trail, performance metrics, backtesting, and notification configuration endpoints. Requirements: 1.7, 5.6, 6.6, 15.5, 16.2, 16.3, 16.4, 17.3, 19.9 """ from __future__ import annotations import logging import uuid from contextlib import asynccontextmanager from datetime import date, datetime, timezone from typing import Any, Optional import asyncpg import redis.asyncio as aioredis from fastapi import FastAPI, HTTPException, Query from pydantic import BaseModel from services.shared.config import load_config from services.trading.engine import TradingEngine logger = logging.getLogger("trading_engine") # Configure logging so we can actually see what's happening logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) # --------------------------------------------------------------------------- # Module-level state # --------------------------------------------------------------------------- config = load_config() engine: Optional[TradingEngine] = None # --------------------------------------------------------------------------- # Pydantic request/response models # --------------------------------------------------------------------------- class ConfigUpdateRequest(BaseModel): """Body for PUT /api/trading/config.""" enabled: Optional[bool] = None risk_tier: Optional[str] = None reserve_siphon_pct: Optional[float] = None polling_interval_seconds: Optional[int] = None absolute_position_cap: Optional[float] = None active_pool_minimum: Optional[float] = None micro_trading_enabled: Optional[bool] = None max_open_positions: Optional[int] = None class CapitalRequest(BaseModel): """Body for PUT /api/trading/capital.""" initial_capital: float class BacktestRequest(BaseModel): """Body for POST /api/trading/backtest.""" start_date: str end_date: str initial_capital: float = 500.0 risk_tier: str = "moderate" class NotificationConfigRequest(BaseModel): """Body for PUT /api/trading/notifications/config.""" sms_enabled: Optional[bool] = None email_enabled: Optional[bool] = None phone_number: Optional[str] = None email_recipient: Optional[str] = None # --------------------------------------------------------------------------- # Lifespan # --------------------------------------------------------------------------- @asynccontextmanager async def lifespan(app: FastAPI): """Start and stop the TradingEngine with the application lifecycle. Task 33: Creates real asyncpg pool and redis.asyncio client, passes them to the TradingEngine, and cleans up on shutdown. """ global engine trading_cfg = config.trading pool = None redis_client = None try: # Create asyncpg connection pool try: pool = await asyncpg.create_pool( dsn=config.postgres.dsn, min_size=2, max_size=10, ) logger.info( "PostgreSQL pool created: %s:%d/%s", config.postgres.host, config.postgres.port, config.postgres.database, ) except Exception: logger.warning( "Could not create PostgreSQL pool — running without database. " "Host: %s:%d/%s", config.postgres.host, config.postgres.port, config.postgres.database, ) # Create Redis client try: redis_client = aioredis.from_url( config.redis.url, decode_responses=True, ) # Test the connection await redis_client.ping() logger.info( "Redis connected: %s:%d/%d", config.redis.host, config.redis.port, config.redis.db, ) except Exception: logger.warning( "Could not connect to Redis — running without Redis. " "Host: %s:%d", config.redis.host, config.redis.port, ) redis_client = None engine = TradingEngine(pool=pool, redis=redis_client, config=trading_cfg) await engine.start() logger.info("Trading engine started") yield finally: if engine is not None: await engine.stop() logger.info("Trading engine stopped") if pool is not None: try: await pool.close() logger.info("PostgreSQL pool closed") except Exception: logger.warning("Error closing PostgreSQL pool") if redis_client is not None: try: await redis_client.close() logger.info("Redis client closed") except Exception: logger.warning("Error closing Redis client") app = FastAPI(title="Stonks Oracle - Trading Engine", lifespan=lifespan) # --------------------------------------------------------------------------- # Health & Readiness # --------------------------------------------------------------------------- @app.get("/health") async def health() -> dict[str, str]: """Liveness probe.""" return {"status": "ok"} @app.get("/ready") async def ready() -> dict[str, bool]: """Readiness probe — reports whether the engine is running.""" is_ready = engine is not None and engine.running return {"ready": is_ready} @app.get("/api/trading/debug") async def debug_state() -> dict[str, Any]: """Diagnostic endpoint — shows engine internals for troubleshooting.""" if engine is None: return {"engine": None, "reason": "engine not initialised"} ps = engine.portfolio_state task_states = {} for t in engine._tasks: task_states[t.get_name()] = "done" if t.done() else "running" if t.done() and t.exception(): task_states[t.get_name()] = f"failed: {t.exception()}" return { "running": engine.running, "has_pool": engine.pool is not None, "has_redis": engine.redis is not None, "config_enabled": engine.config.enabled, "polling_interval": engine.config.polling_interval_seconds, "last_poll": str(engine._last_poll_timestamp), "portfolio_state": { "active_pool": ps.active_pool if ps else 0, "reserve_pool": ps.reserve_pool if ps else 0, "total_value": ps.total_value if ps else 0, "open_positions": ps.open_position_count if ps else 0, }, "risk_tier": engine._active_risk_tier.name if engine._active_risk_tier else "none", "tasks": task_states, "processed_rec_count": len(engine.processed_recommendation_ids), } # --------------------------------------------------------------------------- # Engine Status & Control # --------------------------------------------------------------------------- @app.get("/api/trading/status") async def trading_status() -> dict[str, Any]: """Return current engine state.""" if engine is None: raise HTTPException(503, "Engine not initialised") ps = engine.portfolio_state active_pool = ps.active_pool if ps else 0.0 reserve_pool = ps.reserve_pool if ps else 0.0 portfolio_heat = ps.portfolio_heat if ps else 0.0 open_positions = ps.open_position_count if ps else 0 return { "enabled": engine.config.enabled, "paused": not engine.running, "risk_tier": engine.config.risk_tier, "circuit_breaker_status": "inactive", "active_pool": active_pool, "reserve_pool": reserve_pool, "portfolio_heat": portfolio_heat, "open_positions": open_positions, "last_decision_at": None, } @app.put("/api/trading/config") async def update_config(body: ConfigUpdateRequest) -> dict[str, Any]: """Update trading engine configuration. Returns the previous and new configuration values for audit trail. Requirements: 16.6 """ if engine is None: raise HTTPException(503, "Engine not initialised") previous: dict[str, Any] = {} updated: dict[str, Any] = {} for field_name in body.model_fields_set: new_value = getattr(body, field_name) old_value = getattr(engine.config, field_name, None) previous[field_name] = old_value updated[field_name] = new_value setattr(engine.config, field_name, new_value) return { "previous": previous, "updated": updated, "change_source": "api", "changed_at": datetime.now(tz=timezone.utc).isoformat(), } @app.post("/api/trading/pause") async def pause_engine() -> dict[str, bool]: """Pause the trading engine.""" if engine is not None: engine.running = False return {"paused": True} @app.post("/api/trading/resume") async def resume_engine() -> dict[str, bool]: """Resume the trading engine.""" if engine is not None: engine.running = True return {"paused": False} @app.put("/api/trading/capital") async def set_capital(body: CapitalRequest) -> dict[str, Any]: """Set or reset the paper trading capital. Allocates the given amount as initial capital, splitting between active pool and reserve pool based on the current reserve_siphon_pct. """ if engine is None: raise HTTPException(503, "Engine not initialised") capital = body.initial_capital if capital <= 0: raise HTTPException(400, "initial_capital must be positive") reserve_pct = engine.config.reserve_siphon_pct reserve = capital * reserve_pct active = capital - reserve ps = engine.portfolio_state previous = { "total_value": ps.total_value if ps else 0.0, "active_pool": ps.active_pool if ps else 0.0, "reserve_pool": ps.reserve_pool if ps else 0.0, } from services.trading.models import PortfolioState engine.portfolio_state = PortfolioState( total_value=capital, cash=capital, active_pool=active, reserve_pool=reserve, ) # Record in reserve pool ledger if engine.pool: try: await engine.pool.execute( "INSERT INTO reserve_pool_ledger (amount, balance_after, trigger_type, notes) " "VALUES ($1, $2, 'capital_reset', $3)", reserve, reserve, f"Capital set to ${capital:,.2f} (active=${active:,.2f}, reserve=${reserve:,.2f})", ) except Exception: pass # Non-critical return { "initial_capital": capital, "active_pool": active, "reserve_pool": reserve, "reserve_siphon_pct": reserve_pct, "previous": previous, } # --------------------------------------------------------------------------- # Decision Audit Trail # --------------------------------------------------------------------------- @app.get("/api/trading/decisions") async def list_decisions( ticker: Optional[str] = None, decision: Optional[str] = None, is_micro_trade: Optional[bool] = None, limit: int = Query(default=50, le=200), offset: int = 0, ) -> list[dict[str, Any]]: """Return recent trading decisions from the database.""" if engine is None or engine.pool is None: return [] conditions = ["1=1"] params: list[Any] = [] idx = 1 if ticker: conditions.append(f"ticker = ${idx}") params.append(ticker.upper()) idx += 1 if decision: conditions.append(f"decision = ${idx}") params.append(decision) idx += 1 if is_micro_trade is not None: conditions.append(f"is_micro_trade = ${idx}") params.append(is_micro_trade) idx += 1 where = " AND ".join(conditions) params.extend([limit, offset]) try: rows = await engine.pool.fetch( f"SELECT id, recommendation_id, decision, skip_reason, ticker, " f"computed_position_size, computed_share_quantity, " f"risk_tier_at_decision, portfolio_heat_at_decision, " f"active_pool_at_decision, reserve_pool_at_decision, " f"circuit_breaker_status, is_micro_trade, created_at " f"FROM trading_decisions WHERE {where} " f"ORDER BY created_at DESC LIMIT ${idx} OFFSET ${idx + 1}", *params, ) from decimal import Decimal as _Dec result = [] for r in rows: d = dict(r) for k, v in d.items(): if isinstance(v, _Dec): d[k] = float(v) elif isinstance(v, datetime): d[k] = v.isoformat() elif hasattr(v, '__str__') and not isinstance(v, (str, int, float, bool, type(None))): d[k] = str(v) result.append(d) return result except Exception: return [] # --------------------------------------------------------------------------- # Performance Metrics # --------------------------------------------------------------------------- @app.get("/api/trading/metrics") async def current_metrics() -> dict[str, Any]: """Return current performance metrics.""" if engine is None: raise HTTPException(503, "Engine not initialised") ps = engine.portfolio_state return { "total_portfolio_value": ps.total_value if ps else 0.0, "active_pool": ps.active_pool if ps else 0.0, "reserve_pool": ps.reserve_pool if ps else 0.0, "unrealized_pnl": 0.0, "realized_pnl": 0.0, "daily_pnl": 0.0, "win_rate": 0.0, "profit_factor": 0.0, "sharpe_ratio": 0.0, "max_drawdown": 0.0, "portfolio_heat": ps.portfolio_heat if ps else 0.0, } @app.get("/api/trading/metrics/history") async def metrics_history( limit: int = Query(default=30, le=365), ) -> list[dict[str, Any]]: """Return historical daily portfolio snapshots.""" if engine is None or engine.pool is None: return [] try: rows = await engine.pool.fetch( "SELECT id, snapshot_date, portfolio_value, active_pool, reserve_pool, " "daily_return, cumulative_return, unrealized_pnl, realized_pnl, " "win_count, loss_count, win_rate, sharpe_ratio, max_drawdown, " "current_drawdown_pct, portfolio_heat, risk_tier, created_at " "FROM portfolio_snapshots ORDER BY snapshot_date DESC LIMIT $1", limit, ) from decimal import Decimal as _Dec result = [] for r in rows: d = dict(r) for k, v in d.items(): if isinstance(v, _Dec): d[k] = float(v) elif isinstance(v, datetime): d[k] = v.isoformat() elif hasattr(v, '__str__') and not isinstance(v, (str, int, float, bool, type(None))): d[k] = str(v) result.append(d) return result except Exception: return [] # --------------------------------------------------------------------------- # Backtesting # --------------------------------------------------------------------------- @app.post("/api/trading/backtest") async def launch_backtest(body: BacktestRequest) -> dict[str, str]: """Launch a backtest run and return its ID. Task 32.5: Uses BacktestReplay to run the backtest in a background task. The backtest_id is pre-generated and passed to BacktestReplay so the frontend can poll for results using the same ID. """ if engine is None: raise HTTPException(503, "Engine not initialised") from datetime import date as date_type from services.trading.backtest_replay import BacktestReplay from services.trading.backtester import BacktestConfig backtest_id = str(uuid.uuid4()) bt_config = BacktestConfig( start_date=date_type.fromisoformat(body.start_date), end_date=date_type.fromisoformat(body.end_date), initial_capital=body.initial_capital, risk_tier=body.risk_tier, ) replay = BacktestReplay(pool=engine.pool) import asyncio async def _run_backtest(): try: await replay.run(bt_config, backtest_id=backtest_id) except Exception: logger.exception("Backtest failed") asyncio.create_task(_run_backtest()) return {"id": backtest_id, "status": "running"} @app.get("/api/trading/backtest/{backtest_id}") async def get_backtest(backtest_id: str) -> dict[str, Any]: """Retrieve backtest results from PostgreSQL. Task 32.5: Queries backtest_runs and backtest_trades tables. Returns a flat object matching the frontend BacktestResult type. """ if engine is None or engine.pool is None: return { "id": backtest_id, "status": "pending", } try: row = await engine.pool.fetchrow( "SELECT * FROM backtest_runs WHERE id = $1", backtest_id, ) if row is None: return { "id": backtest_id, "status": "not_found", } row_dict = dict(row) # Parse equity_curve from JSONB equity_curve = row_dict.get("equity_curve", []) if isinstance(equity_curve, str): import json as _json equity_curve = _json.loads(equity_curve) # Fetch trades trades = [] try: trade_rows = await engine.pool.fetch( "SELECT * FROM backtest_trades WHERE backtest_id = $1 ORDER BY created_at", backtest_id, ) for tr in trade_rows: trade_dict = dict(tr) for key, val in trade_dict.items(): if isinstance(val, (datetime,)): trade_dict[key] = val.isoformat() elif isinstance(val, date): trade_dict[key] = val.isoformat() elif hasattr(val, "__str__") and not isinstance(val, (str, int, float, bool, type(None))): trade_dict[key] = str(val) trades.append(trade_dict) except Exception: pass return { "id": str(row_dict.get("id", backtest_id)), "start_date": str(row_dict.get("start_date", "")), "end_date": str(row_dict.get("end_date", "")), "initial_capital": row_dict.get("initial_capital"), "risk_tier": row_dict.get("risk_tier"), "config": row_dict.get("config", {}), "total_return": row_dict.get("total_return"), "sharpe_ratio": row_dict.get("sharpe_ratio"), "max_drawdown": row_dict.get("max_drawdown"), "win_rate": row_dict.get("win_rate"), "profit_factor": row_dict.get("profit_factor"), "trade_count": row_dict.get("trade_count"), "equity_curve": equity_curve, "trades": trades, "status": row_dict.get("status", "unknown"), "completed_at": row_dict["completed_at"].isoformat() if row_dict.get("completed_at") else None, "created_at": row_dict["created_at"].isoformat() if row_dict.get("created_at") else None, } except Exception: logger.debug("Could not query backtest results — tables may not exist") return { "id": backtest_id, "status": "pending", } # --------------------------------------------------------------------------- # Notifications # --------------------------------------------------------------------------- @app.get("/api/trading/notifications/config") async def get_notification_config() -> dict[str, Any]: """Return current notification configuration.""" if engine is None: raise HTTPException(503, "Engine not initialised") return { "sms_enabled": bool(engine.config.sns_topic_arn), "email_enabled": bool(engine.config.gmail_recipient), "phone_number": engine.config.sns_phone_number, "email_recipient": engine.config.gmail_recipient, } @app.put("/api/trading/notifications/config") async def update_notification_config( body: NotificationConfigRequest, ) -> dict[str, Any]: """Update notification preferences.""" if engine is None: raise HTTPException(503, "Engine not initialised") result: dict[str, Any] = {} if body.phone_number is not None: engine.config.sns_phone_number = body.phone_number result["phone_number"] = body.phone_number if body.email_recipient is not None: engine.config.gmail_recipient = body.email_recipient result["email_recipient"] = body.email_recipient return {"updated": result} @app.get("/api/trading/notifications/history") async def notification_history( limit: int = Query(default=50, le=200), ) -> list[dict[str, Any]]: """Return recent notifications (placeholder).""" return []