Files
stonks-oracle/services/trading/app.py
T

480 lines
15 KiB
Python

"""Trading Engine — FastAPI HTTP service for autonomous trading control.
Feature: autonomous-trading-engine
Exposes health/readiness probes, engine control (pause/resume),
configuration management, decision audit trail, performance metrics,
backtesting, and notification configuration endpoints.
Requirements: 1.7, 5.6, 6.6, 15.5, 16.2, 16.3, 16.4, 17.3, 19.9
"""
from __future__ import annotations
import logging
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any, Optional
import asyncpg
import redis.asyncio as aioredis
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from services.shared.config import load_config
from services.trading.engine import TradingEngine
logger = logging.getLogger("trading_engine")
# ---------------------------------------------------------------------------
# Module-level state
# ---------------------------------------------------------------------------
config = load_config()
engine: Optional[TradingEngine] = None
# ---------------------------------------------------------------------------
# Pydantic request/response models
# ---------------------------------------------------------------------------
class ConfigUpdateRequest(BaseModel):
"""Body for PUT /api/trading/config."""
enabled: Optional[bool] = None
risk_tier: Optional[str] = None
reserve_siphon_pct: Optional[float] = None
polling_interval_seconds: Optional[int] = None
absolute_position_cap: Optional[float] = None
active_pool_minimum: Optional[float] = None
micro_trading_enabled: Optional[bool] = None
class BacktestRequest(BaseModel):
"""Body for POST /api/trading/backtest."""
start_date: str
end_date: str
initial_capital: float = 500.0
risk_tier: str = "moderate"
class NotificationConfigRequest(BaseModel):
"""Body for PUT /api/trading/notifications/config."""
sms_enabled: Optional[bool] = None
email_enabled: Optional[bool] = None
phone_number: Optional[str] = None
email_recipient: Optional[str] = None
# ---------------------------------------------------------------------------
# Lifespan
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Start and stop the TradingEngine with the application lifecycle.
Task 33: Creates real asyncpg pool and redis.asyncio client,
passes them to the TradingEngine, and cleans up on shutdown.
"""
global engine
trading_cfg = config.trading
pool = None
redis_client = None
try:
# Create asyncpg connection pool
try:
pool = await asyncpg.create_pool(
dsn=config.postgres.dsn,
min_size=2,
max_size=10,
)
logger.info(
"PostgreSQL pool created: %s:%d/%s",
config.postgres.host,
config.postgres.port,
config.postgres.database,
)
except Exception:
logger.warning(
"Could not create PostgreSQL pool — running without database. "
"Host: %s:%d/%s",
config.postgres.host,
config.postgres.port,
config.postgres.database,
)
# Create Redis client
try:
redis_client = aioredis.from_url(
config.redis.url,
decode_responses=True,
)
# Test the connection
await redis_client.ping()
logger.info(
"Redis connected: %s:%d/%d",
config.redis.host,
config.redis.port,
config.redis.db,
)
except Exception:
logger.warning(
"Could not connect to Redis — running without Redis. "
"Host: %s:%d",
config.redis.host,
config.redis.port,
)
redis_client = None
engine = TradingEngine(pool=pool, redis=redis_client, config=trading_cfg)
await engine.start()
logger.info("Trading engine started")
yield
finally:
if engine is not None:
await engine.stop()
logger.info("Trading engine stopped")
if pool is not None:
try:
await pool.close()
logger.info("PostgreSQL pool closed")
except Exception:
logger.warning("Error closing PostgreSQL pool")
if redis_client is not None:
try:
await redis_client.close()
logger.info("Redis client closed")
except Exception:
logger.warning("Error closing Redis client")
app = FastAPI(title="Stonks Oracle - Trading Engine", lifespan=lifespan)
# ---------------------------------------------------------------------------
# Health & Readiness
# ---------------------------------------------------------------------------
@app.get("/health")
async def health() -> dict[str, str]:
"""Liveness probe."""
return {"status": "ok"}
@app.get("/ready")
async def ready() -> dict[str, bool]:
"""Readiness probe — reports whether the engine is running."""
is_ready = engine is not None and engine.running
return {"ready": is_ready}
# ---------------------------------------------------------------------------
# Engine Status & Control
# ---------------------------------------------------------------------------
@app.get("/api/trading/status")
async def trading_status() -> dict[str, Any]:
"""Return current engine state."""
if engine is None:
raise HTTPException(503, "Engine not initialised")
ps = engine.portfolio_state
active_pool = ps.active_pool if ps else 0.0
reserve_pool = ps.reserve_pool if ps else 0.0
portfolio_heat = ps.portfolio_heat if ps else 0.0
open_positions = ps.open_position_count if ps else 0
return {
"enabled": engine.config.enabled,
"paused": not engine.running,
"risk_tier": engine.config.risk_tier,
"circuit_breaker_status": "inactive",
"active_pool": active_pool,
"reserve_pool": reserve_pool,
"portfolio_heat": portfolio_heat,
"open_positions": open_positions,
"last_decision_at": None,
}
@app.put("/api/trading/config")
async def update_config(body: ConfigUpdateRequest) -> dict[str, Any]:
"""Update trading engine configuration.
Returns the previous and new configuration values for audit trail.
Requirements: 16.6
"""
if engine is None:
raise HTTPException(503, "Engine not initialised")
previous: dict[str, Any] = {}
updated: dict[str, Any] = {}
for field_name in body.model_fields_set:
new_value = getattr(body, field_name)
old_value = getattr(engine.config, field_name, None)
previous[field_name] = old_value
updated[field_name] = new_value
setattr(engine.config, field_name, new_value)
return {
"previous": previous,
"updated": updated,
"change_source": "api",
"changed_at": datetime.now(tz=timezone.utc).isoformat(),
}
@app.post("/api/trading/pause")
async def pause_engine() -> dict[str, bool]:
"""Pause the trading engine."""
if engine is not None:
engine.running = False
return {"paused": True}
@app.post("/api/trading/resume")
async def resume_engine() -> dict[str, bool]:
"""Resume the trading engine."""
if engine is not None:
engine.running = True
return {"paused": False}
# ---------------------------------------------------------------------------
# Decision Audit Trail
# ---------------------------------------------------------------------------
@app.get("/api/trading/decisions")
async def list_decisions(
ticker: Optional[str] = None,
decision: Optional[str] = None,
limit: int = Query(default=50, le=200),
offset: int = 0,
) -> list[dict[str, Any]]:
"""Return recent trading decisions (placeholder — paginated)."""
return []
# ---------------------------------------------------------------------------
# Performance Metrics
# ---------------------------------------------------------------------------
@app.get("/api/trading/metrics")
async def current_metrics() -> dict[str, Any]:
"""Return current performance metrics."""
if engine is None:
raise HTTPException(503, "Engine not initialised")
ps = engine.portfolio_state
return {
"total_portfolio_value": ps.total_value if ps else 0.0,
"active_pool": ps.active_pool if ps else 0.0,
"reserve_pool": ps.reserve_pool if ps else 0.0,
"unrealized_pnl": 0.0,
"realized_pnl": 0.0,
"daily_pnl": 0.0,
"win_rate": 0.0,
"profit_factor": 0.0,
"sharpe_ratio": 0.0,
"max_drawdown": 0.0,
"portfolio_heat": ps.portfolio_heat if ps else 0.0,
}
@app.get("/api/trading/metrics/history")
async def metrics_history(
limit: int = Query(default=30, le=365),
) -> list[dict[str, Any]]:
"""Return historical daily snapshots (placeholder)."""
return []
# ---------------------------------------------------------------------------
# Backtesting
# ---------------------------------------------------------------------------
@app.post("/api/trading/backtest")
async def launch_backtest(body: BacktestRequest) -> dict[str, str]:
"""Launch a backtest run and return its ID.
Task 32.5: Uses BacktestReplay to run the backtest in a background task.
"""
if engine is None:
raise HTTPException(503, "Engine not initialised")
from datetime import date as date_type
from services.trading.backtest_replay import BacktestReplay
from services.trading.backtester import BacktestConfig
bt_config = BacktestConfig(
start_date=date_type.fromisoformat(body.start_date),
end_date=date_type.fromisoformat(body.end_date),
initial_capital=body.initial_capital,
risk_tier=body.risk_tier,
)
replay = BacktestReplay(pool=engine.pool)
import asyncio
async def _run_backtest():
try:
await replay.run(bt_config)
except Exception:
logger.exception("Backtest failed")
asyncio.create_task(_run_backtest())
# Generate a backtest_id — the replay generates its own, but we return
# a placeholder immediately. The actual ID is in backtest_runs table.
backtest_id = str(uuid.uuid4())
return {"backtest_id": backtest_id, "status": "running"}
@app.get("/api/trading/backtest/{backtest_id}")
async def get_backtest(backtest_id: str) -> dict[str, Any]:
"""Retrieve backtest results from PostgreSQL.
Task 32.5: Queries backtest_runs and backtest_trades tables.
"""
if engine is None or engine.pool is None:
# Fallback for when pool is not available
return {
"backtest_id": backtest_id,
"status": "pending",
"config": None,
"result": None,
}
try:
row = await engine.pool.fetchrow(
"SELECT * FROM backtest_runs WHERE id = $1",
backtest_id,
)
if row is None:
return {
"backtest_id": backtest_id,
"status": "not_found",
"config": None,
"result": None,
}
row_dict = dict(row)
# Convert non-serializable types
for key, val in row_dict.items():
if isinstance(val, (datetime,)):
row_dict[key] = val.isoformat()
elif hasattr(val, "__str__") and not isinstance(val, (str, int, float, bool, type(None))):
row_dict[key] = str(val)
# Fetch trades
trades = []
try:
trade_rows = await engine.pool.fetch(
"SELECT * FROM backtest_trades WHERE backtest_id = $1",
backtest_id,
)
for tr in trade_rows:
trade_dict = dict(tr)
for key, val in trade_dict.items():
if isinstance(val, (datetime,)):
trade_dict[key] = val.isoformat()
elif hasattr(val, "__str__") and not isinstance(val, (str, int, float, bool, type(None))):
trade_dict[key] = str(val)
trades.append(trade_dict)
except Exception:
pass
return {
"backtest_id": backtest_id,
"status": row_dict.get("status", "unknown"),
"config": {
"start_date": str(row_dict.get("start_date", "")),
"end_date": str(row_dict.get("end_date", "")),
"initial_capital": row_dict.get("initial_capital"),
"risk_tier": row_dict.get("risk_tier"),
},
"result": {
"total_return": row_dict.get("total_return"),
"sharpe_ratio": row_dict.get("sharpe_ratio"),
"max_drawdown": row_dict.get("max_drawdown"),
"win_rate": row_dict.get("win_rate"),
"profit_factor": row_dict.get("profit_factor"),
"trade_count": row_dict.get("trade_count"),
"equity_curve": row_dict.get("equity_curve"),
"trades": trades,
},
}
except Exception:
logger.debug("Could not query backtest results — tables may not exist")
return {
"backtest_id": backtest_id,
"status": "pending",
"config": None,
"result": None,
}
# ---------------------------------------------------------------------------
# Notifications
# ---------------------------------------------------------------------------
@app.get("/api/trading/notifications/config")
async def get_notification_config() -> dict[str, Any]:
"""Return current notification configuration."""
if engine is None:
raise HTTPException(503, "Engine not initialised")
return {
"sms_enabled": bool(engine.config.sns_topic_arn),
"email_enabled": bool(engine.config.gmail_recipient),
"phone_number": engine.config.sns_phone_number,
"email_recipient": engine.config.gmail_recipient,
}
@app.put("/api/trading/notifications/config")
async def update_notification_config(
body: NotificationConfigRequest,
) -> dict[str, Any]:
"""Update notification preferences."""
if engine is None:
raise HTTPException(503, "Engine not initialised")
result: dict[str, Any] = {}
if body.phone_number is not None:
engine.config.sns_phone_number = body.phone_number
result["phone_number"] = body.phone_number
if body.email_recipient is not None:
engine.config.gmail_recipient = body.email_recipient
result["email_recipient"] = body.email_recipient
return {"updated": result}
@app.get("/api/trading/notifications/history")
async def notification_history(
limit: int = Query(default=50, le=200),
) -> list[dict[str, Any]]:
"""Return recent notifications (placeholder)."""
return []