Files
stonks-oracle/services/adapters/paper_trading.py
T
2026-04-11 12:10:01 -07:00

604 lines
21 KiB
Python

"""Paper trading adapter - local order simulation and state sync.
Implements a fully local paper trading engine that simulates order
execution without requiring a real broker API. Tracks positions,
account balance, fills, and order events in-memory with PostgreSQL
persistence for state sync and audit trail.
Requirements: 8.1, 8.3, 8.5, 2.4
Design: Section 4.9 - Broker Adapter (paper mode)
"""
from __future__ import annotations
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any
import asyncpg
from services.adapters.base import AdapterResult
from services.adapters.broker_adapter import (
AccountInfo,
BrokerDataAdapter,
OrderEventType,
OrderRequest,
OrderResponse,
OrderSide,
OrderStatus,
OrderType,
PositionInfo,
TradingMode,
)
logger = logging.getLogger("paper_trading")
# ---------------------------------------------------------------------------
# In-memory paper trading state
# ---------------------------------------------------------------------------
class PaperPosition:
    """Long-only paper position in a single ticker.

    Holds the share count, the volume-weighted average entry price and the
    PnL realized so far by sells against this position.
    """

    def __init__(
        self,
        ticker: str,
        quantity: float = 0.0,
        avg_entry_price: float = 0.0,
        realized_pnl: float = 0.0,
    ) -> None:
        self.ticker = ticker
        self.quantity = quantity
        self.avg_entry_price = avg_entry_price
        self.realized_pnl = realized_pnl

    def apply_fill(self, side: OrderSide, fill_qty: float, fill_price: float) -> float:
        """Fold one fill into the position and return the PnL it realized.

        Buys never realize PnL; they only re-average the entry price.
        Any side other than BUY is treated as a sell, which realizes PnL
        on at most the currently held quantity.
        """
        if side == OrderSide.BUY:
            # Weighted average of the existing cost basis and the new fill.
            cost_basis = self.avg_entry_price * self.quantity + fill_price * fill_qty
            self.quantity += fill_qty
            if self.quantity > 0:
                self.avg_entry_price = cost_basis / self.quantity
            return 0.0

        # Sell side: nothing to close when no shares are held.
        if not self.quantity > 0:
            return 0.0

        # Never sell more than is held; the excess is silently ignored.
        closed_qty = min(fill_qty, self.quantity)
        pnl = closed_qty * (fill_price - self.avg_entry_price)
        self.quantity -= closed_qty
        self.realized_pnl += pnl
        if self.quantity <= 0:
            # Fully closed: reset so the next buy starts a fresh cost basis.
            self.quantity = 0.0
            self.avg_entry_price = 0.0
        return pnl

    @property
    def is_open(self) -> bool:
        """True while at least a fraction of a share is held."""
        return self.quantity > 0

    def to_position_info(self, current_price: float | None = None) -> PositionInfo:
        """Build the broker-interface PositionInfo for this position.

        When no current_price is given the average entry price is used as
        the mark, which makes unrealized PnL zero.
        """
        mark = self.avg_entry_price if current_price is None else current_price
        holding = self.quantity > 0
        unrealized = (mark - self.avg_entry_price) * self.quantity if holding else 0.0
        market_value = mark * self.quantity
        return PositionInfo(
            ticker=self.ticker,
            quantity=self.quantity,
            avg_entry_price=self.avg_entry_price,
            current_price=mark,
            unrealized_pnl=round(unrealized, 4),
            market_value=round(market_value, 4),
            side="long" if holding else "flat",
        )
class PaperAccount:
    """In-memory state of one paper trading account.

    Keeps cash, per-ticker positions, every order response keyed by order
    id, the pending audit events, and the idempotency-key lookup table.
    """

    def __init__(
        self,
        account_id: str = "paper-default",
        initial_cash: float = 100_000.0,
    ) -> None:
        self.account_id = account_id
        self.initial_cash = initial_cash
        self.cash = initial_cash
        self.positions: dict[str, PaperPosition] = {}
        self.orders: dict[str, OrderResponse] = {}
        self.order_events: list[dict[str, Any]] = []
        # Maps idempotency key -> order id of the first submission.
        self._seen_idempotency_keys: dict[str, str] = {}

    @property
    def portfolio_value(self) -> float:
        """Cash plus open positions valued at their average entry price."""
        open_positions = (pos for pos in self.positions.values() if pos.is_open)
        held_value = sum(pos.quantity * pos.avg_entry_price for pos in open_positions)
        return self.cash + held_value

    @property
    def buying_power(self) -> float:
        """Buying power equals cash."""
        return self.cash

    def get_position(self, ticker: str) -> PaperPosition:
        """Return the position for ticker, registering an empty one if absent."""
        try:
            return self.positions[ticker]
        except KeyError:
            fresh = PaperPosition(ticker=ticker)
            self.positions[ticker] = fresh
            return fresh

    def to_account_info(self) -> AccountInfo:
        """Build the broker-interface AccountInfo, money rounded to cents."""
        cash_2dp = round(self.cash, 2)
        buying_power_2dp = round(self.buying_power, 2)
        value_2dp = round(self.portfolio_value, 2)
        return AccountInfo(
            account_id=self.account_id,
            buying_power=buying_power_2dp,
            cash=cash_2dp,
            portfolio_value=value_2dp,
            currency="USD",
            mode=TradingMode.PAPER,
        )
# ---------------------------------------------------------------------------
# Paper trading adapter
# ---------------------------------------------------------------------------
class PaperTradingAdapter(BrokerDataAdapter):
    """Local paper trading adapter that simulates order execution.

    Accepted orders are filled immediately: limit, stop and stop-limit
    orders fill at their own price, market orders fill at the caller's
    price estimate with slippage applied. No real broker API is called.

    Features:
    - Idempotent order submission via idempotency_key (Req 8.5)
    - Full order event trail for audit (Req 8.3)
    - Position tracking with average entry price
    - Cash balance management

    The adapter is always constructed in PAPER mode.
    """

    # Price assumed for a market order that carries no limit_price estimate.
    _DEFAULT_MARKET_PRICE = 100.0

    def __init__(
        self,
        account_id: str = "paper-default",
        initial_cash: float = 100_000.0,
        simulated_slippage_pct: float = 0.001,
    ) -> None:
        """Create an adapter with a fresh in-memory account.

        Args:
            account_id: Identifier of the paper account.
            initial_cash: Starting cash balance.
            simulated_slippage_pct: Fractional slippage applied to market
                orders (0.001 means 0.1%).
        """
        super().__init__(mode=TradingMode.PAPER)
        self.account = PaperAccount(account_id=account_id, initial_cash=initial_cash)
        self.slippage_pct = simulated_slippage_pct

    def source_type(self) -> str:
        """Return the adapter's source type identifier."""
        return "broker"

    async def fetch(self, ticker: str, config: dict[str, Any]) -> AdapterResult:
        """Fetch paper positions/account/orders as a raw artifact snapshot.

        ``config["endpoint"]`` selects the snapshot: ``"account"`` for the
        account summary, ``"orders"`` for orders on ``ticker`` (``"*"``
        matches every ticker), anything else for the open position in
        ``ticker`` (an empty list when there is none).
        """
        endpoint = config.get("endpoint", "positions")
        now = datetime.now(timezone.utc)

        if endpoint == "account":
            items = [self.account.to_account_info().to_dict()]
        elif endpoint == "orders":
            items = [
                resp.to_dict()
                for resp in self.account.orders.values()
                if resp.ticker == ticker or ticker == "*"
            ]
        else:
            # Plain lookup: a read must not register an empty position,
            # which get_position() would do for an unknown ticker.
            pos = self.account.positions.get(ticker)
            if pos is not None and pos.is_open:
                items = [pos.to_position_info().to_dict()]
            else:
                items = []

        raw = json.dumps(items).encode()
        return AdapterResult(
            source_type="broker",
            ticker=ticker,
            items=items,
            raw_payload=raw,
            content_hash="",
            fetched_at=now,
            metadata={"provider": "paper", "mode": "paper", "endpoint": endpoint},
        )

    async def submit_order(self, order: OrderRequest) -> OrderResponse:
        """Simulate order submission and immediate fill.

        Idempotency: if the same idempotency_key was already used, the
        original response is returned (Req 8.5).

        The order is rejected (status REJECTED, with ``error`` set) when
        the quantity or the computed fill price is not positive, when a
        buy costs more than the available cash, or when a sell exceeds
        the held quantity. Rejections are recorded and cached under the
        idempotency key like fills.
        """
        # Idempotency check
        existing_id = self.account._seen_idempotency_keys.get(order.idempotency_key)
        if existing_id and existing_id in self.account.orders:
            logger.info("Duplicate order key %s — returning cached response", order.idempotency_key)
            return self.account.orders[existing_id]

        now = datetime.now(timezone.utc)
        order_id = str(uuid.uuid4())

        # A non-positive quantity would invert the cash/position arithmetic
        # (e.g. a negative "buy" credits cash). `not x > 0` also catches NaN.
        if not order.quantity > 0:
            return self._reject(
                order, order_id, now,
                f"Invalid quantity: {order.quantity} (must be positive)",
            )

        # Determine fill price based on order type
        fill_price = self._compute_fill_price(order)
        if not fill_price > 0:
            return self._reject(
                order, order_id, now,
                f"Invalid fill price: {fill_price} (must be positive)",
            )

        if order.side == OrderSide.BUY:
            # Check if we have enough cash for buys
            required_cash = fill_price * order.quantity
            if required_cash > self.account.cash:
                return self._reject(
                    order, order_id, now,
                    f"Insufficient cash: need {required_cash:.2f}, have {self.account.cash:.2f}",
                )
        elif order.side == OrderSide.SELL:
            # Check if we have enough shares for sells. Plain lookup so a
            # rejected sell does not register an empty position.
            held = self.account.positions.get(order.ticker)
            held_qty = held.quantity if held is not None else 0.0
            if held_qty < order.quantity:
                return self._reject(
                    order, order_id, now,
                    f"Insufficient shares: need {order.quantity}, have {held_qty}",
                )

        # Simulate immediate fill
        position = self.account.get_position(order.ticker)
        realized_pnl = position.apply_fill(order.side, order.quantity, fill_price)

        # Update cash
        notional = fill_price * order.quantity
        if order.side == OrderSide.BUY:
            self.account.cash -= notional
        else:
            self.account.cash += notional

        resp = OrderResponse(
            broker_order_id=order_id,
            status=OrderStatus.FILLED,
            ticker=order.ticker,
            side=order.side,
            quantity=order.quantity,
            filled_quantity=order.quantity,
            filled_avg_price=fill_price,
            submitted_at=now,
            raw_response={
                "realized_pnl": round(realized_pnl, 4),
                "cash_after": round(self.account.cash, 2),
                "position_qty_after": position.quantity,
                "simulated": True,
            },
        )

        # Record events
        self._record_event(order_id, OrderEventType.SUBMITTED, {"ticker": order.ticker}, now)
        self._record_event(order_id, OrderEventType.ACCEPTED, {"ticker": order.ticker}, now)
        self._record_event(order_id, OrderEventType.FILL, {
            "fill_price": fill_price,
            "fill_qty": order.quantity,
            "realized_pnl": round(realized_pnl, 4),
        }, now)

        self.account.orders[order_id] = resp
        self.account._seen_idempotency_keys[order.idempotency_key] = order_id

        # %g (not %.0f) so fractional quantities are not rounded in the log.
        logger.info(
            "Paper fill: %s %s %g %s @ %.2f | cash=%.2f pnl=%.4f",
            order_id[:8], order.side.value, order.quantity,
            order.ticker, fill_price, self.account.cash, realized_pnl,
        )
        return resp

    async def cancel_order(self, broker_order_id: str) -> OrderResponse:
        """Cancel a paper order. Only non-terminal orders can be cancelled.

        Unknown orders and orders that are already FILLED, REJECTED or
        CANCELLED yield a REJECTED response with ``error`` set; the stored
        order and the audit trail are left untouched in those cases.
        """
        existing = self.account.orders.get(broker_order_id)
        if existing is None:
            return OrderResponse(
                broker_order_id=broker_order_id,
                status=OrderStatus.REJECTED,
                ticker="",
                side=OrderSide.BUY,
                quantity=0,
                error=f"Order {broker_order_id} not found",
            )

        # Paper orders fill immediately, so they can't be cancelled
        if existing.status == OrderStatus.FILLED:
            return OrderResponse(
                broker_order_id=broker_order_id,
                status=OrderStatus.REJECTED,
                ticker=existing.ticker,
                side=existing.side,
                quantity=existing.quantity,
                error="Cannot cancel a filled order",
            )

        # Rejected/cancelled orders are terminal too: rewriting them to
        # CANCELLED would falsify the order history and add a bogus event.
        if existing.status in (OrderStatus.REJECTED, OrderStatus.CANCELLED):
            return OrderResponse(
                broker_order_id=broker_order_id,
                status=OrderStatus.REJECTED,
                ticker=existing.ticker,
                side=existing.side,
                quantity=existing.quantity,
                error=f"Cannot cancel a {existing.status.value} order",
            )

        now = datetime.now(timezone.utc)
        cancelled = OrderResponse(
            broker_order_id=broker_order_id,
            status=OrderStatus.CANCELLED,
            ticker=existing.ticker,
            side=existing.side,
            quantity=existing.quantity,
            submitted_at=existing.submitted_at,
        )
        self.account.orders[broker_order_id] = cancelled
        self._record_event(broker_order_id, OrderEventType.CANCELLED, {}, now)
        return cancelled

    async def get_order_status(self, broker_order_id: str) -> OrderResponse:
        """Get the stored response of a paper order.

        Unknown ids yield a REJECTED placeholder response with ``error`` set.
        """
        existing = self.account.orders.get(broker_order_id)
        if existing is None:
            return OrderResponse(
                broker_order_id=broker_order_id,
                status=OrderStatus.REJECTED,
                ticker="",
                side=OrderSide.BUY,
                quantity=0,
                error=f"Order {broker_order_id} not found",
            )
        return existing

    async def get_positions(self) -> list[PositionInfo]:
        """Get all open paper positions."""
        return [
            p.to_position_info()
            for p in self.account.positions.values()
            if p.is_open
        ]

    async def get_account(self) -> AccountInfo:
        """Get paper account summary."""
        return self.account.to_account_info()

    # -----------------------------------------------------------------------
    # Internal helpers
    # -----------------------------------------------------------------------

    def _reject(
        self,
        order: OrderRequest,
        order_id: str,
        now: datetime,
        error: str,
    ) -> OrderResponse:
        """Build, record and cache a REJECTED response for ``order``."""
        resp = OrderResponse(
            broker_order_id=order_id,
            status=OrderStatus.REJECTED,
            ticker=order.ticker,
            side=order.side,
            quantity=order.quantity,
            submitted_at=now,
            error=error,
        )
        self._record_event(order_id, OrderEventType.REJECTED, resp.to_dict(), now)
        self.account.orders[order_id] = resp
        # Cache under the key so a retry gets the same rejection back.
        self.account._seen_idempotency_keys[order.idempotency_key] = order_id
        return resp

    def _compute_fill_price(self, order: OrderRequest) -> float:
        """Determine the simulated fill price for an order.

        Limit and stop-limit orders fill at the limit price.
        Stop orders fill at the stop price.
        Everything else is treated as a market order: limit_price is used
        as the price estimate (``_DEFAULT_MARKET_PRICE`` when it is not
        set) and slippage is applied against the trader - buys fill
        higher, sells fill lower - rounded to 4 decimals.
        """
        if order.order_type == OrderType.LIMIT and order.limit_price is not None:
            return order.limit_price
        if order.order_type == OrderType.STOP and order.stop_price is not None:
            return order.stop_price
        if order.order_type == OrderType.STOP_LIMIT and order.limit_price is not None:
            return order.limit_price

        # Market order: use limit_price as estimate, or a default
        if order.limit_price is not None:
            base_price = order.limit_price
        else:
            base_price = self._DEFAULT_MARKET_PRICE
        if order.side == OrderSide.BUY:
            return round(base_price * (1 + self.slippage_pct), 4)
        return round(base_price * (1 - self.slippage_pct), 4)

    def _record_event(
        self,
        order_id: str,
        event_type: OrderEventType,
        data: dict[str, Any],
        timestamp: datetime,
    ) -> None:
        """Append an order event to the account's in-memory audit trail."""
        self.account.order_events.append({
            "order_id": order_id,
            "event_type": event_type.value,
            "data": data,
            "timestamp": timestamp.isoformat(),
        })
# ---------------------------------------------------------------------------
# State sync: persist and restore paper trading state to/from PostgreSQL
# ---------------------------------------------------------------------------
# SQL for persisting paper orders to the orders table.
# Re-running the insert for an already stored idempotency_key is a no-op.
_INSERT_PAPER_ORDER = """
    INSERT INTO orders (
        id, recommendation_id, broker_account_id, ticker, side, order_type,
        quantity, limit_price, stop_price, status, idempotency_key,
        broker_order_id, decision_trace, submitted_at, filled_at,
        fill_price, fill_quantity
    ) VALUES (
        $1::uuid, $2, $3, $4, $5, $6,
        $7, $8, $9, $10, $11,
        $12, $13::jsonb, $14, $15,
        $16, $17
    )
    ON CONFLICT (idempotency_key) DO NOTHING
"""

# Appends one audit-trail event for an order.
_INSERT_PAPER_ORDER_EVENT = """
    INSERT INTO order_events (order_id, event_type, data, broker_timestamp)
    VALUES ($1::uuid, $2, $3::jsonb, $4)
"""

# Inserts or overwrites the position row for (broker_account_id, ticker).
_UPSERT_PAPER_POSITION = """
    INSERT INTO positions (broker_account_id, ticker, quantity, avg_entry_price, realized_pnl, updated_at)
    VALUES ($1, $2, $3, $4, $5, $6)
    ON CONFLICT (broker_account_id, ticker)
    DO UPDATE SET
        quantity = EXCLUDED.quantity,
        avg_entry_price = EXCLUDED.avg_entry_price,
        realized_pnl = EXCLUDED.realized_pnl,
        updated_at = EXCLUDED.updated_at
"""

# Inserts the paper broker account or refreshes its config and re-activates it.
_UPSERT_PAPER_ACCOUNT = """
    INSERT INTO broker_accounts (id, provider, account_id, mode, config, active)
    VALUES ($1::uuid, 'paper', $2, 'paper', $3::jsonb, TRUE)
    ON CONFLICT (id) DO UPDATE SET
        config = EXCLUDED.config,
        active = TRUE
"""

# Loads open positions for a paper account, $1 being the account's string
# account_id. positions.broker_account_id holds the broker_accounts.id UUID
# (that is what the position upsert is given), so the id is resolved through
# a subquery - the same way _LOAD_PAPER_ORDERS does - instead of comparing
# the UUID column against the string account_id, which never matches.
_LOAD_PAPER_POSITIONS = """
    SELECT ticker, quantity, avg_entry_price, COALESCE(realized_pnl, 0) AS realized_pnl
    FROM positions
    WHERE broker_account_id = (
        SELECT id FROM broker_accounts
        WHERE account_id = $1 AND mode = 'paper' AND active = TRUE
        LIMIT 1
    )
    AND quantity > 0
"""

# Loads the saved config JSON of the active paper account with account_id $1.
_LOAD_PAPER_ACCOUNT_CONFIG = """
    SELECT config FROM broker_accounts
    WHERE account_id = $1 AND mode = 'paper' AND active = TRUE
    LIMIT 1
"""

# Loads the 500 most recently submitted orders of the paper account $1.
_LOAD_PAPER_ORDERS = """
    SELECT
        id, ticker, side, order_type, quantity, status,
        idempotency_key, broker_order_id, fill_price, fill_quantity,
        submitted_at
    FROM orders
    WHERE broker_account_id = (
        SELECT id FROM broker_accounts WHERE account_id = $1 AND mode = 'paper' LIMIT 1
    )
    ORDER BY submitted_at DESC
    LIMIT 500
"""
async def sync_state_to_db(
    adapter: PaperTradingAdapter,
    pool: asyncpg.Pool,
    broker_account_uuid: str | None = None,
) -> None:
    """Persist the adapter's in-memory paper trading state to PostgreSQL.

    Inside a single transaction this writes, in order:
    - the broker_accounts row (config carries cash, portfolio value, slippage)
    - one positions row per tracked position
    - one orders row per known order (idempotent via ON CONFLICT)
    - one order_events row per pending audit event

    Once the transaction has succeeded the pending events are dropped from
    memory so the next sync does not insert them again (Requirement 8.3).

    Args:
        adapter: Adapter whose account state is persisted.
        pool: asyncpg pool to take the connection from.
        broker_account_uuid: Row id for the broker account. When falsy, a
            deterministic UUIDv5 is derived from the account id.
    """
    account = adapter.account
    synced_at = datetime.now(timezone.utc)
    if broker_account_uuid:
        account_uuid = broker_account_uuid
    else:
        account_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, account.account_id))

    async with pool.acquire() as conn:
        async with conn.transaction():
            # 1. Upsert broker account
            account_config = {
                "initial_cash": account.initial_cash,
                "current_cash": round(account.cash, 2),
                "portfolio_value": round(account.portfolio_value, 2),
                "slippage_pct": adapter.slippage_pct,
            }
            await conn.execute(
                _UPSERT_PAPER_ACCOUNT,
                account_uuid,
                account.account_id,
                json.dumps(account_config),
            )

            # 2. Upsert positions
            for symbol, position in account.positions.items():
                position_args = (
                    account_uuid,
                    symbol,
                    position.quantity,
                    position.avg_entry_price,
                    position.realized_pnl,
                    synced_at,
                )
                await conn.execute(_UPSERT_PAPER_POSITION, *position_args)

            # 3. Insert orders (idempotent)
            for order_id, response in account.orders.items():
                if response.status == OrderStatus.FILLED:
                    filled_at = synced_at
                else:
                    filled_at = None
                order_args = (
                    order_id,
                    None,  # recommendation_id
                    account_uuid,
                    response.ticker,
                    response.side.value,
                    "market",  # paper orders are always market-simulated
                    response.quantity,
                    response.filled_avg_price,  # limit_price
                    None,  # stop_price
                    response.status.value,
                    order_id,  # use order_id as idempotency_key fallback
                    order_id,  # broker_order_id
                    json.dumps(response.raw_response),
                    response.submitted_at,
                    filled_at,
                    response.filled_avg_price,
                    response.filled_quantity,
                )
                await conn.execute(_INSERT_PAPER_ORDER, *order_args)

            # 4. Insert order events
            for pending in account.order_events:
                event_time = datetime.fromisoformat(pending["timestamp"])
                await conn.execute(
                    _INSERT_PAPER_ORDER_EVENT,
                    pending["order_id"],
                    pending["event_type"],
                    json.dumps(pending["data"]),
                    event_time,
                )

    logger.info(
        "Synced paper state to DB: account=%s positions=%d orders=%d events=%d",
        account.account_id,
        len(account.positions),
        len(account.orders),
        len(account.order_events),
    )
    # Clear events after sync to avoid re-inserting
    account.order_events.clear()
async def load_state_from_db(
    adapter: PaperTradingAdapter,
    pool: asyncpg.Pool,
) -> bool:
    """Restore paper trading state from PostgreSQL into the adapter.

    Reads the saved account config to restore the cash balance, then
    loads the stored open positions, overwriting any in-memory position
    with the same ticker. Orders and order events are not restored.

    Returns:
        True if a saved account row was found and applied, False if
        there is no saved state for this account id.
    """
    account = adapter.account

    async with pool.acquire() as conn:
        # Load account config
        account_row = await conn.fetchrow(_LOAD_PAPER_ACCOUNT_CONFIG, account.account_id)
        if account_row is None:
            logger.info("No saved paper account state for %s", account.account_id)
            return False

        # The config value may come back as a JSON string or as an
        # already-decoded mapping; only decode in the former case.
        saved_config = account_row["config"]
        if isinstance(saved_config, str):
            saved_config = json.loads(saved_config)
        account.cash = float(saved_config.get("current_cash", account.initial_cash))

        # Load positions
        position_rows = await conn.fetch(_LOAD_PAPER_POSITIONS, account.account_id)
        for record in position_rows:
            symbol = record["ticker"]
            restored = PaperPosition(
                ticker=symbol,
                quantity=float(record["quantity"]),
                # A NULL average entry price is treated as 0.
                avg_entry_price=float(record["avg_entry_price"] or 0),
                realized_pnl=float(record["realized_pnl"]),
            )
            account.positions[symbol] = restored

    logger.info(
        "Loaded paper state from DB: account=%s cash=%.2f positions=%d",
        account.account_id,
        account.cash,
        len(account.positions),
    )
    return True