feat: reset endpoint now liquidates Alpaca positions and cancels orders
- Added cancel_all_orders() and close_all_positions() to AlpacaBrokerAdapter - Reset endpoint creates a temporary adapter to call Alpaca DELETE /v2/orders and DELETE /v2/positions before clearing DB and engine state - Also clears positions table and processed_recommendation_ids on reset - Broker reset is best-effort — DB/engine reset proceeds even if Alpaca fails
This commit is contained in:
@@ -276,6 +276,14 @@ class BrokerDataAdapter(BaseAdapter, ABC):
|
||||
"""Get account summary (balance, buying power, etc.)."""
|
||||
...
|
||||
|
||||
async def cancel_all_orders(self) -> int:
|
||||
"""Cancel all open orders. Returns the number of orders cancelled."""
|
||||
return 0
|
||||
|
||||
async def close_all_positions(self) -> int:
|
||||
"""Liquidate all open positions. Returns the number of positions closed."""
|
||||
return 0
|
||||
|
||||
|
||||
# --- Concrete Alpaca implementation ---
|
||||
|
||||
@@ -551,6 +559,50 @@ class AlpacaBrokerAdapter(BrokerDataAdapter):
|
||||
mode=self._mode,
|
||||
)
|
||||
|
||||
async def cancel_all_orders(self) -> int:
|
||||
"""Cancel all open orders on Alpaca.
|
||||
|
||||
Uses DELETE /v2/orders which cancels all open orders in bulk.
|
||||
Returns the number of orders that were cancelled.
|
||||
"""
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
try:
|
||||
resp = await client.delete(
|
||||
f"{self.base_url}/v2/orders",
|
||||
headers=self._headers(),
|
||||
)
|
||||
resp.raise_for_status()
|
||||
# Alpaca returns a list of cancelled order objects (HTTP 207)
|
||||
data = resp.json()
|
||||
cancelled = len(data) if isinstance(data, list) else 0
|
||||
logger.info("Cancelled %d open orders on Alpaca", cancelled)
|
||||
return cancelled
|
||||
except Exception as e:
|
||||
logger.error("Cancel all orders failed: %s", e)
|
||||
return 0
|
||||
|
||||
async def close_all_positions(self) -> int:
|
||||
"""Liquidate all open positions on Alpaca.
|
||||
|
||||
Uses DELETE /v2/positions which closes all positions at market.
|
||||
Returns the number of positions that were closed.
|
||||
"""
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
try:
|
||||
resp = await client.delete(
|
||||
f"{self.base_url}/v2/positions",
|
||||
headers=self._headers(),
|
||||
)
|
||||
resp.raise_for_status()
|
||||
# Alpaca returns a list of closed position order objects (HTTP 207)
|
||||
data = resp.json()
|
||||
closed = len(data) if isinstance(data, list) else 0
|
||||
logger.info("Closed %d positions on Alpaca", closed)
|
||||
return closed
|
||||
except Exception as e:
|
||||
logger.error("Close all positions failed: %s", e)
|
||||
return 0
|
||||
|
||||
def _parse_order_response(self, data: dict[str, Any]) -> OrderResponse:
|
||||
"""Parse an Alpaca order response into an OrderResponse."""
|
||||
status_map: dict[str, OrderStatus] = {
|
||||
|
||||
+33
-1
@@ -407,6 +407,9 @@ async def reset_paper_trading(body: CapitalRequest) -> dict[str, Any]:
|
||||
"""Full paper trading reset: clear all positions, orders, decisions,
|
||||
stop levels, and snapshots, then reset capital to the specified amount.
|
||||
|
||||
Also liquidates all positions and cancels all open orders on the
|
||||
broker (Alpaca) so the paper account starts clean.
|
||||
|
||||
This is a destructive operation — all trading history is wiped.
|
||||
"""
|
||||
if engine is None:
|
||||
@@ -420,7 +423,32 @@ async def reset_paper_trading(body: CapitalRequest) -> dict[str, Any]:
|
||||
reserve = capital * reserve_pct
|
||||
active = capital - reserve
|
||||
|
||||
# Clear trading state in the database
|
||||
# --- Reset broker (Alpaca) state ---
|
||||
broker_result: dict[str, Any] = {"orders_cancelled": 0, "positions_closed": 0}
|
||||
try:
|
||||
from services.adapters.broker_adapter import AlpacaBrokerAdapter
|
||||
from services.shared.config import load_config as _load_config
|
||||
|
||||
broker_cfg = _load_config().broker
|
||||
if broker_cfg.api_key:
|
||||
adapter = AlpacaBrokerAdapter(
|
||||
api_key=broker_cfg.api_key,
|
||||
api_secret=broker_cfg.api_secret or "",
|
||||
base_url=broker_cfg.base_url,
|
||||
)
|
||||
broker_result["orders_cancelled"] = await adapter.cancel_all_orders()
|
||||
broker_result["positions_closed"] = await adapter.close_all_positions()
|
||||
logger.info(
|
||||
"Broker reset: cancelled %d orders, closed %d positions",
|
||||
broker_result["orders_cancelled"],
|
||||
broker_result["positions_closed"],
|
||||
)
|
||||
else:
|
||||
logger.info("No broker API key configured — skipping broker reset")
|
||||
except Exception:
|
||||
logger.exception("Broker reset failed — continuing with DB/engine reset")
|
||||
|
||||
# --- Clear trading state in the database ---
|
||||
if engine.pool:
|
||||
try:
|
||||
async with engine.pool.acquire() as conn:
|
||||
@@ -438,6 +466,8 @@ async def reset_paper_trading(body: CapitalRequest) -> dict[str, Any]:
|
||||
# Clear orders and their events
|
||||
await conn.execute("DELETE FROM order_events")
|
||||
await conn.execute("DELETE FROM orders")
|
||||
# Clear synced positions from broker
|
||||
await conn.execute("DELETE FROM positions")
|
||||
# Re-seed reserve pool ledger
|
||||
await conn.execute(
|
||||
"INSERT INTO reserve_pool_ledger (amount, balance_after, trigger_type, notes) "
|
||||
@@ -458,12 +488,14 @@ async def reset_paper_trading(body: CapitalRequest) -> dict[str, Any]:
|
||||
reserve_pool=reserve,
|
||||
)
|
||||
engine.circuit_breaker_state = CircuitBreakerState()
|
||||
engine.processed_recommendation_ids.clear()
|
||||
|
||||
return {
|
||||
"reset": True,
|
||||
"initial_capital": capital,
|
||||
"active_pool": active,
|
||||
"reserve_pool": reserve,
|
||||
"broker": broker_result,
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user