diff --git a/services/adapters/broker_adapter.py b/services/adapters/broker_adapter.py index 8641022..85b30b8 100644 --- a/services/adapters/broker_adapter.py +++ b/services/adapters/broker_adapter.py @@ -276,6 +276,14 @@ class BrokerDataAdapter(BaseAdapter, ABC): """Get account summary (balance, buying power, etc.).""" ... + async def cancel_all_orders(self) -> int: + """Cancel all open orders. Returns the number of orders cancelled.""" + return 0 + + async def close_all_positions(self) -> int: + """Liquidate all open positions. Returns the number of positions closed.""" + return 0 + # --- Concrete Alpaca implementation --- @@ -551,6 +559,50 @@ class AlpacaBrokerAdapter(BrokerDataAdapter): mode=self._mode, ) + async def cancel_all_orders(self) -> int: + """Cancel all open orders on Alpaca. + + Uses DELETE /v2/orders which cancels all open orders in bulk. + Returns the number of orders that were cancelled. + """ + async with httpx.AsyncClient(timeout=30) as client: + try: + resp = await client.delete( + f"{self.base_url}/v2/orders", + headers=self._headers(), + ) + resp.raise_for_status() + # Alpaca returns a list of cancelled order objects (HTTP 207) + data = resp.json() + cancelled = len(data) if isinstance(data, list) else 0 + logger.info("Cancelled %d open orders on Alpaca", cancelled) + return cancelled + except Exception as e: + logger.error("Cancel all orders failed: %s", e) + return 0 + + async def close_all_positions(self) -> int: + """Liquidate all open positions on Alpaca. + + Uses DELETE /v2/positions which closes all positions at market. + Returns the number of positions that were closed. + """ + async with httpx.AsyncClient(timeout=30) as client: + try: + resp = await client.delete( + f"{self.base_url}/v2/positions", + headers=self._headers(), + ) + resp.raise_for_status() + # Alpaca returns a list of closed position order objects (HTTP 207) + data = resp.json() + closed = len(data) if isinstance(data, list) else 0 + logger.info("Closed %d positions on Alpaca", closed) + return closed + except Exception as e: + logger.error("Close all positions failed: %s", e) + return 0 + def _parse_order_response(self, data: dict[str, Any]) -> OrderResponse: """Parse an Alpaca order response into an OrderResponse.""" status_map: dict[str, OrderStatus] = { diff --git a/services/trading/app.py b/services/trading/app.py index a5044a7..d3417aa 100644 --- a/services/trading/app.py +++ b/services/trading/app.py @@ -407,6 +407,9 @@ async def reset_paper_trading(body: CapitalRequest) -> dict[str, Any]: """Full paper trading reset: clear all positions, orders, decisions, stop levels, and snapshots, then reset capital to the specified amount. + Also liquidates all positions and cancels all open orders on the + broker (Alpaca) so the paper account starts clean. + This is a destructive operation — all trading history is wiped. """ if engine is None: @@ -420,7 +423,32 @@ async def reset_paper_trading(body: CapitalRequest) -> dict[str, Any]: reserve = capital * reserve_pct active = capital - reserve - # Clear trading state in the database + # --- Reset broker (Alpaca) state --- + broker_result: dict[str, Any] = {"orders_cancelled": 0, "positions_closed": 0} + try: + from services.adapters.broker_adapter import AlpacaBrokerAdapter + from services.shared.config import load_config as _load_config + + broker_cfg = _load_config().broker + if broker_cfg.api_key: + adapter = AlpacaBrokerAdapter( + api_key=broker_cfg.api_key, + api_secret=broker_cfg.api_secret or "", + base_url=broker_cfg.base_url, + ) + broker_result["orders_cancelled"] = await adapter.cancel_all_orders() + broker_result["positions_closed"] = await adapter.close_all_positions() + logger.info( + "Broker reset: cancelled %d orders, closed %d positions", + broker_result["orders_cancelled"], + broker_result["positions_closed"], + ) + else: + logger.info("No broker API key configured — skipping broker reset") + except Exception: + logger.exception("Broker reset failed — continuing with DB/engine reset") + + # --- Clear trading state in the database --- if engine.pool: try: async with engine.pool.acquire() as conn: @@ -438,6 +466,8 @@ async def reset_paper_trading(body: CapitalRequest) -> dict[str, Any]: # Clear orders and their events await conn.execute("DELETE FROM order_events") await conn.execute("DELETE FROM orders") + # Clear synced positions from broker + await conn.execute("DELETE FROM positions") # Re-seed reserve pool ledger await conn.execute( "INSERT INTO reserve_pool_ledger (amount, balance_after, trigger_type, notes) " @@ -458,12 +488,14 @@ async def reset_paper_trading(body: CapitalRequest) -> dict[str, Any]: reserve_pool=reserve, ) engine.circuit_breaker_state = CircuitBreakerState() + engine.processed_recommendation_ids.clear() return { "reset": True, "initial_capital": capital, "active_pool": active, "reserve_pool": reserve, + "broker": broker_result, }