90614dd7bb
Three distinct capital operations on the Trading Controls page: - Set Capital: overwrites pool balances to a new amount (existing) - Add/Withdraw: adjusts active pool by a delta without touching positions, orders, or history. Validates sufficient balance for withdrawals. Logged to reserve_pool_ledger as manual_adjustment. - Reset Everything: nuclear option — deletes all positions, orders, trading decisions, stop levels, snapshots, backtests, notifications, and circuit breaker events, then resets capital fresh. Red button with double-confirmation dialog. Backend: POST /api/trading/capital/adjust and POST /api/trading/reset Frontend: CapitalCard rebuilt with three sections and confirmation UIs
763 lines
25 KiB
Python
763 lines
25 KiB
Python
"""Trading Engine — FastAPI HTTP service for autonomous trading control.
|
|
|
|
Feature: autonomous-trading-engine
|
|
|
|
Exposes health/readiness probes, engine control (pause/resume),
|
|
configuration management, decision audit trail, performance metrics,
|
|
backtesting, and notification configuration endpoints.
|
|
|
|
Requirements: 1.7, 5.6, 6.6, 15.5, 16.2, 16.3, 16.4, 17.3, 19.9
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
from contextlib import asynccontextmanager
|
|
from datetime import date, datetime, timezone
|
|
from typing import Any, Optional
|
|
|
|
import asyncpg
|
|
import redis.asyncio as aioredis
|
|
from fastapi import FastAPI, HTTPException, Query
|
|
from pydantic import BaseModel
|
|
|
|
from services.shared.config import load_config
|
|
from services.trading.engine import TradingEngine
|
|
|
|
logger = logging.getLogger("trading_engine")
|
|
|
|
# Configure logging so we can actually see what's happening
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Module-level state
|
|
# ---------------------------------------------------------------------------
|
|
|
|
config = load_config()
|
|
engine: Optional[TradingEngine] = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pydantic request/response models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class ConfigUpdateRequest(BaseModel):
|
|
"""Body for PUT /api/trading/config."""
|
|
|
|
enabled: Optional[bool] = None
|
|
risk_tier: Optional[str] = None
|
|
reserve_siphon_pct: Optional[float] = None
|
|
polling_interval_seconds: Optional[int] = None
|
|
absolute_position_cap: Optional[float] = None
|
|
active_pool_minimum: Optional[float] = None
|
|
micro_trading_enabled: Optional[bool] = None
|
|
max_open_positions: Optional[int] = None
|
|
|
|
|
|
class CapitalRequest(BaseModel):
|
|
"""Body for PUT /api/trading/capital."""
|
|
|
|
initial_capital: float
|
|
|
|
|
|
class BacktestRequest(BaseModel):
|
|
"""Body for POST /api/trading/backtest."""
|
|
|
|
start_date: str
|
|
end_date: str
|
|
initial_capital: float = 500.0
|
|
risk_tier: str = "moderate"
|
|
|
|
|
|
class NotificationConfigRequest(BaseModel):
|
|
"""Body for PUT /api/trading/notifications/config."""
|
|
|
|
sms_enabled: Optional[bool] = None
|
|
email_enabled: Optional[bool] = None
|
|
phone_number: Optional[str] = None
|
|
email_recipient: Optional[str] = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lifespan
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
"""Start and stop the TradingEngine with the application lifecycle.
|
|
|
|
Task 33: Creates real asyncpg pool and redis.asyncio client,
|
|
passes them to the TradingEngine, and cleans up on shutdown.
|
|
"""
|
|
global engine
|
|
|
|
trading_cfg = config.trading
|
|
pool = None
|
|
redis_client = None
|
|
|
|
try:
|
|
# Create asyncpg connection pool
|
|
try:
|
|
pool = await asyncpg.create_pool(
|
|
dsn=config.postgres.dsn,
|
|
min_size=2,
|
|
max_size=10,
|
|
)
|
|
logger.info(
|
|
"PostgreSQL pool created: %s:%d/%s",
|
|
config.postgres.host,
|
|
config.postgres.port,
|
|
config.postgres.database,
|
|
)
|
|
except Exception:
|
|
logger.warning(
|
|
"Could not create PostgreSQL pool — running without database. "
|
|
"Host: %s:%d/%s",
|
|
config.postgres.host,
|
|
config.postgres.port,
|
|
config.postgres.database,
|
|
)
|
|
|
|
# Create Redis client
|
|
try:
|
|
redis_client = aioredis.from_url(
|
|
config.redis.url,
|
|
decode_responses=True,
|
|
)
|
|
# Test the connection
|
|
await redis_client.ping()
|
|
logger.info(
|
|
"Redis connected: %s:%d/%d",
|
|
config.redis.host,
|
|
config.redis.port,
|
|
config.redis.db,
|
|
)
|
|
except Exception:
|
|
logger.warning(
|
|
"Could not connect to Redis — running without Redis. "
|
|
"Host: %s:%d",
|
|
config.redis.host,
|
|
config.redis.port,
|
|
)
|
|
redis_client = None
|
|
|
|
engine = TradingEngine(pool=pool, redis=redis_client, config=trading_cfg)
|
|
await engine.start()
|
|
logger.info("Trading engine started")
|
|
|
|
yield
|
|
|
|
finally:
|
|
if engine is not None:
|
|
await engine.stop()
|
|
logger.info("Trading engine stopped")
|
|
|
|
if pool is not None:
|
|
try:
|
|
await pool.close()
|
|
logger.info("PostgreSQL pool closed")
|
|
except Exception:
|
|
logger.warning("Error closing PostgreSQL pool")
|
|
|
|
if redis_client is not None:
|
|
try:
|
|
await redis_client.close()
|
|
logger.info("Redis client closed")
|
|
except Exception:
|
|
logger.warning("Error closing Redis client")
|
|
|
|
|
|
app = FastAPI(title="Stonks Oracle - Trading Engine", lifespan=lifespan)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Health & Readiness
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.get("/health")
|
|
async def health() -> dict[str, str]:
|
|
"""Liveness probe."""
|
|
return {"status": "ok"}
|
|
|
|
|
|
@app.get("/ready")
|
|
async def ready() -> dict[str, bool]:
|
|
"""Readiness probe — reports whether the engine is running."""
|
|
is_ready = engine is not None and engine.running
|
|
return {"ready": is_ready}
|
|
|
|
|
|
@app.get("/api/trading/debug")
|
|
async def debug_state() -> dict[str, Any]:
|
|
"""Diagnostic endpoint — shows engine internals for troubleshooting."""
|
|
if engine is None:
|
|
return {"engine": None, "reason": "engine not initialised"}
|
|
|
|
ps = engine.portfolio_state
|
|
task_states = {}
|
|
for t in engine._tasks:
|
|
task_states[t.get_name()] = "done" if t.done() else "running"
|
|
if t.done() and t.exception():
|
|
task_states[t.get_name()] = f"failed: {t.exception()}"
|
|
|
|
return {
|
|
"running": engine.running,
|
|
"has_pool": engine.pool is not None,
|
|
"has_redis": engine.redis is not None,
|
|
"config_enabled": engine.config.enabled,
|
|
"polling_interval": engine.config.polling_interval_seconds,
|
|
"last_poll": str(engine._last_poll_timestamp),
|
|
"portfolio_state": {
|
|
"active_pool": ps.active_pool if ps else 0,
|
|
"reserve_pool": ps.reserve_pool if ps else 0,
|
|
"total_value": ps.total_value if ps else 0,
|
|
"open_positions": ps.open_position_count if ps else 0,
|
|
},
|
|
"risk_tier": engine._active_risk_tier.name if engine._active_risk_tier else "none",
|
|
"tasks": task_states,
|
|
"processed_rec_count": len(engine.processed_recommendation_ids),
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Engine Status & Control
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.get("/api/trading/status")
|
|
async def trading_status() -> dict[str, Any]:
|
|
"""Return current engine state."""
|
|
if engine is None:
|
|
raise HTTPException(503, "Engine not initialised")
|
|
|
|
ps = engine.portfolio_state
|
|
active_pool = ps.active_pool if ps else 0.0
|
|
reserve_pool = ps.reserve_pool if ps else 0.0
|
|
portfolio_heat = ps.portfolio_heat if ps else 0.0
|
|
open_positions = ps.open_position_count if ps else 0
|
|
|
|
return {
|
|
"enabled": engine.config.enabled,
|
|
"paused": not engine.running,
|
|
"risk_tier": engine.config.risk_tier,
|
|
"circuit_breaker_status": "inactive",
|
|
"active_pool": active_pool,
|
|
"reserve_pool": reserve_pool,
|
|
"portfolio_heat": portfolio_heat,
|
|
"open_positions": open_positions,
|
|
"last_decision_at": None,
|
|
}
|
|
|
|
|
|
@app.put("/api/trading/config")
|
|
async def update_config(body: ConfigUpdateRequest) -> dict[str, Any]:
|
|
"""Update trading engine configuration.
|
|
|
|
Returns the previous and new configuration values for audit trail.
|
|
Requirements: 16.6
|
|
"""
|
|
if engine is None:
|
|
raise HTTPException(503, "Engine not initialised")
|
|
|
|
previous: dict[str, Any] = {}
|
|
updated: dict[str, Any] = {}
|
|
|
|
for field_name in body.model_fields_set:
|
|
new_value = getattr(body, field_name)
|
|
old_value = getattr(engine.config, field_name, None)
|
|
previous[field_name] = old_value
|
|
updated[field_name] = new_value
|
|
setattr(engine.config, field_name, new_value)
|
|
|
|
return {
|
|
"previous": previous,
|
|
"updated": updated,
|
|
"change_source": "api",
|
|
"changed_at": datetime.now(tz=timezone.utc).isoformat(),
|
|
}
|
|
|
|
|
|
@app.post("/api/trading/pause")
|
|
async def pause_engine() -> dict[str, bool]:
|
|
"""Pause the trading engine."""
|
|
if engine is not None:
|
|
engine.running = False
|
|
return {"paused": True}
|
|
|
|
|
|
@app.post("/api/trading/resume")
|
|
async def resume_engine() -> dict[str, bool]:
|
|
"""Resume the trading engine."""
|
|
if engine is not None:
|
|
engine.running = True
|
|
return {"paused": False}
|
|
|
|
|
|
@app.put("/api/trading/capital")
|
|
async def set_capital(body: CapitalRequest) -> dict[str, Any]:
|
|
"""Set or reset the paper trading capital.
|
|
|
|
Allocates the given amount as initial capital, splitting between
|
|
active pool and reserve pool based on the current reserve_siphon_pct.
|
|
"""
|
|
if engine is None:
|
|
raise HTTPException(503, "Engine not initialised")
|
|
|
|
capital = body.initial_capital
|
|
if capital <= 0:
|
|
raise HTTPException(400, "initial_capital must be positive")
|
|
|
|
reserve_pct = engine.config.reserve_siphon_pct
|
|
reserve = capital * reserve_pct
|
|
active = capital - reserve
|
|
|
|
ps = engine.portfolio_state
|
|
previous = {
|
|
"total_value": ps.total_value if ps else 0.0,
|
|
"active_pool": ps.active_pool if ps else 0.0,
|
|
"reserve_pool": ps.reserve_pool if ps else 0.0,
|
|
}
|
|
|
|
from services.trading.models import PortfolioState
|
|
engine.portfolio_state = PortfolioState(
|
|
total_value=capital,
|
|
cash=capital,
|
|
active_pool=active,
|
|
reserve_pool=reserve,
|
|
)
|
|
|
|
# Record in reserve pool ledger
|
|
if engine.pool:
|
|
try:
|
|
await engine.pool.execute(
|
|
"INSERT INTO reserve_pool_ledger (amount, balance_after, trigger_type, notes) "
|
|
"VALUES ($1, $2, 'capital_reset', $3)",
|
|
reserve, reserve,
|
|
f"Capital set to ${capital:,.2f} (active=${active:,.2f}, reserve=${reserve:,.2f})",
|
|
)
|
|
except Exception:
|
|
pass # Non-critical
|
|
|
|
return {
|
|
"initial_capital": capital,
|
|
"active_pool": active,
|
|
"reserve_pool": reserve,
|
|
"reserve_siphon_pct": reserve_pct,
|
|
"previous": previous,
|
|
}
|
|
|
|
|
|
@app.post("/api/trading/capital/adjust")
|
|
async def adjust_capital(body: dict[str, Any]) -> dict[str, Any]:
|
|
"""Add or subtract capital from the active pool without resetting anything.
|
|
|
|
Body: { "amount": 5000 } to add, { "amount": -2000 } to subtract.
|
|
Positions, orders, decisions, and history are untouched.
|
|
"""
|
|
if engine is None:
|
|
raise HTTPException(503, "Engine not initialised")
|
|
|
|
amount = body.get("amount", 0)
|
|
if not isinstance(amount, (int, float)) or amount == 0:
|
|
raise HTTPException(400, "amount must be a non-zero number")
|
|
|
|
ps = engine.portfolio_state
|
|
if ps is None:
|
|
raise HTTPException(400, "No portfolio state — set capital first")
|
|
|
|
new_active = ps.active_pool + amount
|
|
if new_active < 0:
|
|
raise HTTPException(400, f"Cannot subtract ${abs(amount):,.2f} — only ${ps.active_pool:,.2f} available in active pool")
|
|
|
|
previous_active = ps.active_pool
|
|
ps.active_pool = new_active
|
|
ps.cash = ps.cash + amount
|
|
ps.total_value = ps.total_value + amount
|
|
|
|
# Record in reserve pool ledger for audit
|
|
if engine.pool:
|
|
try:
|
|
trigger = "manual_adjustment"
|
|
note = f"{'Added' if amount > 0 else 'Withdrew'} ${abs(amount):,.2f} (active pool: ${previous_active:,.2f} → ${new_active:,.2f})"
|
|
await engine.pool.execute(
|
|
"INSERT INTO reserve_pool_ledger (amount, balance_after, trigger_type, notes) "
|
|
"VALUES ($1, $2, $3, $4)",
|
|
amount, new_active, trigger, note,
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"adjusted": amount,
|
|
"active_pool": ps.active_pool,
|
|
"reserve_pool": ps.reserve_pool,
|
|
"total_value": ps.total_value,
|
|
}
|
|
|
|
|
|
@app.post("/api/trading/reset")
|
|
async def reset_paper_trading(body: CapitalRequest) -> dict[str, Any]:
|
|
"""Full paper trading reset: clear all positions, orders, decisions,
|
|
stop levels, and snapshots, then reset capital to the specified amount.
|
|
|
|
This is a destructive operation — all trading history is wiped.
|
|
"""
|
|
if engine is None:
|
|
raise HTTPException(503, "Engine not initialised")
|
|
|
|
capital = body.initial_capital
|
|
if capital <= 0:
|
|
raise HTTPException(400, "initial_capital must be positive")
|
|
|
|
reserve_pct = engine.config.reserve_siphon_pct
|
|
reserve = capital * reserve_pct
|
|
active = capital - reserve
|
|
|
|
# Clear trading state in the database
|
|
if engine.pool:
|
|
try:
|
|
async with engine.pool.acquire() as conn:
|
|
async with conn.transaction():
|
|
# Order matters due to FK constraints
|
|
await conn.execute("DELETE FROM backtest_trades")
|
|
await conn.execute("DELETE FROM backtest_runs")
|
|
await conn.execute("DELETE FROM position_stop_levels")
|
|
await conn.execute("DELETE FROM trading_decisions")
|
|
await conn.execute("DELETE FROM portfolio_snapshots")
|
|
await conn.execute("DELETE FROM reserve_pool_ledger")
|
|
await conn.execute("DELETE FROM risk_tier_history")
|
|
await conn.execute("DELETE FROM circuit_breaker_events")
|
|
await conn.execute("DELETE FROM notifications")
|
|
# Clear orders and their events
|
|
await conn.execute("DELETE FROM order_events")
|
|
await conn.execute("DELETE FROM orders")
|
|
# Re-seed reserve pool ledger
|
|
await conn.execute(
|
|
"INSERT INTO reserve_pool_ledger (amount, balance_after, trigger_type, notes) "
|
|
"VALUES ($1, $2, 'initial', $3)",
|
|
reserve, reserve,
|
|
f"Paper trading reset to ${capital:,.2f}",
|
|
)
|
|
except Exception:
|
|
logger.exception("Failed to clear trading tables during reset")
|
|
raise HTTPException(500, "Database reset failed")
|
|
|
|
# Reset in-memory engine state
|
|
from services.trading.models import CircuitBreakerState, PortfolioState
|
|
engine.portfolio_state = PortfolioState(
|
|
total_value=capital,
|
|
cash=capital,
|
|
active_pool=active,
|
|
reserve_pool=reserve,
|
|
)
|
|
engine.circuit_breaker_state = CircuitBreakerState()
|
|
|
|
return {
|
|
"reset": True,
|
|
"initial_capital": capital,
|
|
"active_pool": active,
|
|
"reserve_pool": reserve,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Decision Audit Trail
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.get("/api/trading/decisions")
|
|
async def list_decisions(
|
|
ticker: Optional[str] = None,
|
|
decision: Optional[str] = None,
|
|
is_micro_trade: Optional[bool] = None,
|
|
limit: int = Query(default=50, le=200),
|
|
offset: int = 0,
|
|
) -> list[dict[str, Any]]:
|
|
"""Return recent trading decisions from the database."""
|
|
if engine is None or engine.pool is None:
|
|
return []
|
|
|
|
conditions = ["1=1"]
|
|
params: list[Any] = []
|
|
idx = 1
|
|
|
|
if ticker:
|
|
conditions.append(f"ticker = ${idx}")
|
|
params.append(ticker.upper())
|
|
idx += 1
|
|
if decision:
|
|
conditions.append(f"decision = ${idx}")
|
|
params.append(decision)
|
|
idx += 1
|
|
if is_micro_trade is not None:
|
|
conditions.append(f"is_micro_trade = ${idx}")
|
|
params.append(is_micro_trade)
|
|
idx += 1
|
|
|
|
where = " AND ".join(conditions)
|
|
params.extend([limit, offset])
|
|
|
|
try:
|
|
rows = await engine.pool.fetch(
|
|
f"SELECT id, recommendation_id, decision, skip_reason, ticker, "
|
|
f"computed_position_size, computed_share_quantity, "
|
|
f"risk_tier_at_decision, portfolio_heat_at_decision, "
|
|
f"active_pool_at_decision, reserve_pool_at_decision, "
|
|
f"circuit_breaker_status, is_micro_trade, created_at "
|
|
f"FROM trading_decisions WHERE {where} "
|
|
f"ORDER BY created_at DESC LIMIT ${idx} OFFSET ${idx + 1}",
|
|
*params,
|
|
)
|
|
from decimal import Decimal as _Dec
|
|
result = []
|
|
for r in rows:
|
|
d = dict(r)
|
|
for k, v in d.items():
|
|
if isinstance(v, _Dec):
|
|
d[k] = float(v)
|
|
elif isinstance(v, datetime):
|
|
d[k] = v.isoformat()
|
|
elif hasattr(v, '__str__') and not isinstance(v, (str, int, float, bool, type(None))):
|
|
d[k] = str(v)
|
|
result.append(d)
|
|
return result
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Performance Metrics
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.get("/api/trading/metrics")
|
|
async def current_metrics() -> dict[str, Any]:
|
|
"""Return current performance metrics."""
|
|
if engine is None:
|
|
raise HTTPException(503, "Engine not initialised")
|
|
|
|
ps = engine.portfolio_state
|
|
return {
|
|
"total_portfolio_value": ps.total_value if ps else 0.0,
|
|
"active_pool": ps.active_pool if ps else 0.0,
|
|
"reserve_pool": ps.reserve_pool if ps else 0.0,
|
|
"unrealized_pnl": 0.0,
|
|
"realized_pnl": 0.0,
|
|
"daily_pnl": 0.0,
|
|
"win_rate": 0.0,
|
|
"profit_factor": 0.0,
|
|
"sharpe_ratio": 0.0,
|
|
"max_drawdown": 0.0,
|
|
"portfolio_heat": ps.portfolio_heat if ps else 0.0,
|
|
}
|
|
|
|
|
|
@app.get("/api/trading/metrics/history")
|
|
async def metrics_history(
|
|
limit: int = Query(default=30, le=365),
|
|
) -> list[dict[str, Any]]:
|
|
"""Return historical daily portfolio snapshots."""
|
|
if engine is None or engine.pool is None:
|
|
return []
|
|
|
|
try:
|
|
rows = await engine.pool.fetch(
|
|
"SELECT id, snapshot_date, portfolio_value, active_pool, reserve_pool, "
|
|
"daily_return, cumulative_return, unrealized_pnl, realized_pnl, "
|
|
"win_count, loss_count, win_rate, sharpe_ratio, max_drawdown, "
|
|
"current_drawdown_pct, portfolio_heat, risk_tier, created_at "
|
|
"FROM portfolio_snapshots ORDER BY snapshot_date DESC LIMIT $1",
|
|
limit,
|
|
)
|
|
from decimal import Decimal as _Dec
|
|
result = []
|
|
for r in rows:
|
|
d = dict(r)
|
|
for k, v in d.items():
|
|
if isinstance(v, _Dec):
|
|
d[k] = float(v)
|
|
elif isinstance(v, datetime):
|
|
d[k] = v.isoformat()
|
|
elif hasattr(v, '__str__') and not isinstance(v, (str, int, float, bool, type(None))):
|
|
d[k] = str(v)
|
|
result.append(d)
|
|
return result
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Backtesting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.post("/api/trading/backtest")
|
|
async def launch_backtest(body: BacktestRequest) -> dict[str, str]:
|
|
"""Launch a backtest run and return its ID.
|
|
|
|
Task 32.5: Uses BacktestReplay to run the backtest in a background task.
|
|
The backtest_id is pre-generated and passed to BacktestReplay so the
|
|
frontend can poll for results using the same ID.
|
|
"""
|
|
if engine is None:
|
|
raise HTTPException(503, "Engine not initialised")
|
|
|
|
from datetime import date as date_type
|
|
|
|
from services.trading.backtest_replay import BacktestReplay
|
|
from services.trading.backtester import BacktestConfig
|
|
|
|
backtest_id = str(uuid.uuid4())
|
|
|
|
bt_config = BacktestConfig(
|
|
start_date=date_type.fromisoformat(body.start_date),
|
|
end_date=date_type.fromisoformat(body.end_date),
|
|
initial_capital=body.initial_capital,
|
|
risk_tier=body.risk_tier,
|
|
)
|
|
|
|
replay = BacktestReplay(pool=engine.pool)
|
|
|
|
import asyncio
|
|
|
|
async def _run_backtest():
|
|
try:
|
|
await replay.run(bt_config, backtest_id=backtest_id)
|
|
except Exception:
|
|
logger.exception("Backtest failed")
|
|
|
|
asyncio.create_task(_run_backtest())
|
|
return {"id": backtest_id, "status": "running"}
|
|
|
|
|
|
@app.get("/api/trading/backtest/{backtest_id}")
|
|
async def get_backtest(backtest_id: str) -> dict[str, Any]:
|
|
"""Retrieve backtest results from PostgreSQL.
|
|
|
|
Task 32.5: Queries backtest_runs and backtest_trades tables.
|
|
Returns a flat object matching the frontend BacktestResult type.
|
|
"""
|
|
if engine is None or engine.pool is None:
|
|
return {
|
|
"id": backtest_id,
|
|
"status": "pending",
|
|
}
|
|
|
|
try:
|
|
row = await engine.pool.fetchrow(
|
|
"SELECT * FROM backtest_runs WHERE id = $1",
|
|
backtest_id,
|
|
)
|
|
if row is None:
|
|
return {
|
|
"id": backtest_id,
|
|
"status": "not_found",
|
|
}
|
|
|
|
row_dict = dict(row)
|
|
|
|
# Parse equity_curve from JSONB
|
|
equity_curve = row_dict.get("equity_curve", [])
|
|
if isinstance(equity_curve, str):
|
|
import json as _json
|
|
equity_curve = _json.loads(equity_curve)
|
|
|
|
# Fetch trades
|
|
trades = []
|
|
try:
|
|
trade_rows = await engine.pool.fetch(
|
|
"SELECT * FROM backtest_trades WHERE backtest_id = $1 ORDER BY created_at",
|
|
backtest_id,
|
|
)
|
|
for tr in trade_rows:
|
|
trade_dict = dict(tr)
|
|
for key, val in trade_dict.items():
|
|
if isinstance(val, (datetime,)):
|
|
trade_dict[key] = val.isoformat()
|
|
elif isinstance(val, date):
|
|
trade_dict[key] = val.isoformat()
|
|
elif hasattr(val, "__str__") and not isinstance(val, (str, int, float, bool, type(None))):
|
|
trade_dict[key] = str(val)
|
|
trades.append(trade_dict)
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"id": str(row_dict.get("id", backtest_id)),
|
|
"start_date": str(row_dict.get("start_date", "")),
|
|
"end_date": str(row_dict.get("end_date", "")),
|
|
"initial_capital": row_dict.get("initial_capital"),
|
|
"risk_tier": row_dict.get("risk_tier"),
|
|
"config": row_dict.get("config", {}),
|
|
"total_return": row_dict.get("total_return"),
|
|
"sharpe_ratio": row_dict.get("sharpe_ratio"),
|
|
"max_drawdown": row_dict.get("max_drawdown"),
|
|
"win_rate": row_dict.get("win_rate"),
|
|
"profit_factor": row_dict.get("profit_factor"),
|
|
"trade_count": row_dict.get("trade_count"),
|
|
"equity_curve": equity_curve,
|
|
"trades": trades,
|
|
"status": row_dict.get("status", "unknown"),
|
|
"completed_at": row_dict["completed_at"].isoformat() if row_dict.get("completed_at") else None,
|
|
"created_at": row_dict["created_at"].isoformat() if row_dict.get("created_at") else None,
|
|
}
|
|
except Exception:
|
|
logger.debug("Could not query backtest results — tables may not exist")
|
|
return {
|
|
"id": backtest_id,
|
|
"status": "pending",
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Notifications
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.get("/api/trading/notifications/config")
|
|
async def get_notification_config() -> dict[str, Any]:
|
|
"""Return current notification configuration."""
|
|
if engine is None:
|
|
raise HTTPException(503, "Engine not initialised")
|
|
|
|
return {
|
|
"sms_enabled": bool(engine.config.sns_topic_arn),
|
|
"email_enabled": bool(engine.config.gmail_recipient),
|
|
"phone_number": engine.config.sns_phone_number,
|
|
"email_recipient": engine.config.gmail_recipient,
|
|
}
|
|
|
|
|
|
@app.put("/api/trading/notifications/config")
|
|
async def update_notification_config(
|
|
body: NotificationConfigRequest,
|
|
) -> dict[str, Any]:
|
|
"""Update notification preferences."""
|
|
if engine is None:
|
|
raise HTTPException(503, "Engine not initialised")
|
|
|
|
result: dict[str, Any] = {}
|
|
if body.phone_number is not None:
|
|
engine.config.sns_phone_number = body.phone_number
|
|
result["phone_number"] = body.phone_number
|
|
if body.email_recipient is not None:
|
|
engine.config.gmail_recipient = body.email_recipient
|
|
result["email_recipient"] = body.email_recipient
|
|
|
|
return {"updated": result}
|
|
|
|
|
|
@app.get("/api/trading/notifications/history")
|
|
async def notification_history(
|
|
limit: int = Query(default=50, le=200),
|
|
) -> list[dict[str, Any]]:
|
|
"""Return recent notifications (placeholder)."""
|
|
return []
|