"""Trading Engine — FastAPI HTTP service for autonomous trading control. Feature: autonomous-trading-engine Exposes health/readiness probes, engine control (pause/resume), configuration management, decision audit trail, performance metrics, backtesting, and notification configuration endpoints. Requirements: 1.7, 5.6, 6.6, 15.5, 16.2, 16.3, 16.4, 17.3, 19.9 """ from __future__ import annotations import logging import uuid from contextlib import asynccontextmanager from datetime import datetime, timezone from typing import Any, Optional from fastapi import FastAPI, HTTPException, Query from pydantic import BaseModel from services.shared.config import load_config from services.trading.engine import TradingEngine logger = logging.getLogger("trading_engine") # --------------------------------------------------------------------------- # Module-level state # --------------------------------------------------------------------------- config = load_config() engine: Optional[TradingEngine] = None # --------------------------------------------------------------------------- # Pydantic request/response models # --------------------------------------------------------------------------- class ConfigUpdateRequest(BaseModel): """Body for PUT /api/trading/config.""" enabled: Optional[bool] = None risk_tier: Optional[str] = None reserve_siphon_pct: Optional[float] = None polling_interval_seconds: Optional[int] = None absolute_position_cap: Optional[float] = None active_pool_minimum: Optional[float] = None micro_trading_enabled: Optional[bool] = None class BacktestRequest(BaseModel): """Body for POST /api/trading/backtest.""" start_date: str end_date: str initial_capital: float = 500.0 risk_tier: str = "moderate" class NotificationConfigRequest(BaseModel): """Body for PUT /api/trading/notifications/config.""" sms_enabled: Optional[bool] = None email_enabled: Optional[bool] = None phone_number: Optional[str] = None email_recipient: Optional[str] = None # 
# ---------------------------------------------------------------------------
# Lifespan
# ---------------------------------------------------------------------------


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start and stop the TradingEngine with the application lifecycle.

    On startup a ``TradingEngine`` is built from ``config.trading``, stored in
    the module-level ``engine`` global and started. On shutdown the engine is
    stopped, including when the application exits through an exception or
    cancellation.
    """
    global engine
    # NOTE(review): pool and redis are passed as None here — presumably the
    # real connections are wired up elsewhere or not implemented yet; confirm.
    engine = TradingEngine(pool=None, redis=None, config=config.trading)
    await engine.start()
    logger.info("Trading engine started")
    try:
        yield
    finally:
        # Run the shutdown path even if the serving phase ended abnormally,
        # so the engine is never left running after the app has gone away.
        if engine is not None:
            await engine.stop()
            logger.info("Trading engine stopped")


app = FastAPI(title="Stonks Oracle - Trading Engine", lifespan=lifespan)


def _require_engine() -> TradingEngine:
    """Return the module-level engine, or raise HTTP 503 if it is not set."""
    if engine is None:
        raise HTTPException(503, "Engine not initialised")
    return engine


# ---------------------------------------------------------------------------
# Health & Readiness
# ---------------------------------------------------------------------------


@app.get("/health")
async def health() -> dict[str, str]:
    """Liveness probe."""
    return {"status": "ok"}


@app.get("/ready")
async def ready() -> dict[str, bool]:
    """Readiness probe — reports whether the engine is running."""
    is_ready = engine is not None and engine.running
    return {"ready": is_ready}


# ---------------------------------------------------------------------------
# Engine Status & Control
# ---------------------------------------------------------------------------


@app.get("/api/trading/status")
async def trading_status() -> dict[str, Any]:
    """Return current engine state.

    Only ``enabled``, ``paused`` and ``risk_tier`` are read from the engine;
    the remaining values are fixed placeholders.

    Raises:
        HTTPException: 503 if the engine has not been initialised.
    """
    eng = _require_engine()
    return {
        "enabled": eng.config.enabled,
        "paused": not eng.running,
        "risk_tier": eng.config.risk_tier,
        "circuit_breaker_status": "inactive",
        "active_pool": 0.0,
        "reserve_pool": 0.0,
        "portfolio_heat": 0.0,
        "open_positions": 0,
        "last_decision_at": None,
    }


@app.put("/api/trading/config")
async def update_config(body: ConfigUpdateRequest) -> dict[str, Any]:
    """Update trading engine configuration.

    Returns the previous and new configuration values for audit trail.
    Only fields present in the request with a non-null value are applied; a
    field sent as ``null`` is treated as "leave unchanged" rather than being
    written onto the live configuration.

    Requirements: 16.6

    Raises:
        HTTPException: 503 if the engine has not been initialised.
    """
    eng = _require_engine()

    previous: dict[str, Any] = {}
    updated: dict[str, Any] = {}

    # Sorted so the audit payload has a deterministic field order.
    for field_name in sorted(body.model_fields_set):
        new_value = getattr(body, field_name)
        if new_value is None:
            # Explicit null: do not overwrite a real setting with None.
            continue
        previous[field_name] = getattr(eng.config, field_name, None)
        updated[field_name] = new_value
        setattr(eng.config, field_name, new_value)

    return {
        "previous": previous,
        "updated": updated,
        "change_source": "api",
        "changed_at": datetime.now(tz=timezone.utc).isoformat(),
    }


@app.post("/api/trading/pause")
async def pause_engine() -> dict[str, bool]:
    """Pause the trading engine.

    Raises:
        HTTPException: 503 if the engine has not been initialised, so a
            caller is never told the engine was paused when nothing happened.
    """
    _require_engine().running = False
    return {"paused": True}


@app.post("/api/trading/resume")
async def resume_engine() -> dict[str, bool]:
    """Resume the trading engine.

    Raises:
        HTTPException: 503 if the engine has not been initialised, so a
            caller is never told the engine was resumed when nothing happened.
    """
    _require_engine().running = True
    return {"paused": False}


# ---------------------------------------------------------------------------
# Decision Audit Trail
# ---------------------------------------------------------------------------


@app.get("/api/trading/decisions")
async def list_decisions(
    ticker: Optional[str] = None,
    decision: Optional[str] = None,
    limit: int = Query(default=50, le=200),
    offset: int = 0,
) -> list[dict[str, Any]]:
    """Return recent trading decisions (placeholder — paginated).

    The filter and paging parameters are accepted but not used yet; the
    response is always an empty list. ``limit`` is capped at 200; no lower
    bound is enforced on ``limit`` or ``offset``.
    """
    return []


# ---------------------------------------------------------------------------
# Performance Metrics
# ---------------------------------------------------------------------------


@app.get("/api/trading/metrics")
async def current_metrics() -> dict[str, Any]:
    """Return current performance metrics (placeholder).

    Every value is a fixed ``0.0``; nothing is read from the engine.
    """
    return {
        "total_portfolio_value": 0.0,
        "active_pool": 0.0,
        "reserve_pool": 0.0,
        "unrealized_pnl": 0.0,
        "realized_pnl": 0.0,
        "daily_pnl": 0.0,
        "win_rate": 0.0,
        "profit_factor": 0.0,
        "sharpe_ratio": 0.0,
        "max_drawdown": 0.0,
        "portfolio_heat": 0.0,
    }


@app.get("/api/trading/metrics/history")
async def metrics_history(
    limit: int = Query(default=30, le=365),
) -> list[dict[str, Any]]:
    """Return historical daily snapshots (placeholder).

    ``limit`` is capped at 365 but not used yet; the response is always an
    empty list.
    """
    return []


# ---------------------------------------------------------------------------
# Backtesting
# ---------------------------------------------------------------------------


@app.post("/api/trading/backtest")
async def launch_backtest(body: BacktestRequest) -> dict[str, str]:
    """Launch a backtest run and return its ID.

    The request body is validated but not used yet: a fresh UUID4 is
    generated and returned, and nothing is stored or scheduled here.
    """
    backtest_id = str(uuid.uuid4())
    return {"backtest_id": backtest_id}


@app.get("/api/trading/backtest/{backtest_id}")
async def get_backtest(backtest_id: str) -> dict[str, Any]:
    """Retrieve backtest results (placeholder).

    Echoes ``backtest_id`` back with a fixed ``"pending"`` status and no
    config or result; the ID is not checked against any stored run.
    """
    return {
        "backtest_id": backtest_id,
        "status": "pending",
        "config": None,
        "result": None,
    }


# ---------------------------------------------------------------------------
# Notifications
# ---------------------------------------------------------------------------


@app.get("/api/trading/notifications/config")
async def get_notification_config() -> dict[str, Any]:
    """Return current notification configuration.

    ``sms_enabled`` and ``email_enabled`` are derived from whether
    ``sns_topic_arn`` and ``gmail_recipient`` are set on the engine config.

    Raises:
        HTTPException: 503 if the engine has not been initialised.
    """
    cfg = _require_engine().config
    return {
        "sms_enabled": bool(cfg.sns_topic_arn),
        "email_enabled": bool(cfg.gmail_recipient),
        "phone_number": cfg.sns_phone_number,
        "email_recipient": cfg.gmail_recipient,
    }


@app.put("/api/trading/notifications/config")
async def update_notification_config(
    body: NotificationConfigRequest,
) -> dict[str, Any]:
    """Update notification preferences.

    Applies ``phone_number`` and ``email_recipient`` when they are not null
    and returns the values that were written.

    NOTE(review): ``sms_enabled`` and ``email_enabled`` are accepted in the
    request but not applied anywhere in this handler — confirm whether they
    should map onto a config field.

    Raises:
        HTTPException: 503 if the engine has not been initialised.
    """
    cfg = _require_engine().config

    result: dict[str, Any] = {}
    if body.phone_number is not None:
        cfg.sns_phone_number = body.phone_number
        result["phone_number"] = body.phone_number
    if body.email_recipient is not None:
        cfg.gmail_recipient = body.email_recipient
        result["email_recipient"] = body.email_recipient

    return {"updated": result}


@app.get("/api/trading/notifications/history")
async def notification_history(
    limit: int = Query(default=50, le=200),
) -> list[dict[str, Any]]:
    """Return recent notifications (placeholder).

    ``limit`` is capped at 200 but not used yet; the response is always an
    empty list.
    """
    return []