6eda988e3b
The alpaca.url config file contains https://paper-api.alpaca.markets/v2 but the adapter code also prepends /v2/ to all paths, resulting in /v2/v2/positions which returns 404. Now strips trailing /v2 or /v1 from the configured base URL since the adapter manages API versioning. This was causing 1,017 consecutive broker sync failures.
611 lines
21 KiB
Python
611 lines
21 KiB
Python
"""Broker API adapter interface for paper trading and order events.
|
|
|
|
The BrokerDataAdapter is the abstract interface for all broker integrations.
|
|
AlpacaBrokerAdapter is the first concrete implementation, targeting the
|
|
Alpaca Markets REST API for paper and live trading.
|
|
|
|
Requirements: 2.4, 2.5, 8.1, 8.3, 8.5
|
|
"""
|
|
import hashlib
|
|
import logging
|
|
import time
|
|
import uuid
|
|
from abc import ABC, abstractmethod
|
|
from datetime import datetime, timezone
|
|
from enum import Enum
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from .base import AdapterResult, BaseAdapter
|
|
|
|
logger = logging.getLogger("broker_adapter")
|
|
|
|
|
|
# --- Broker-specific enums ---
|
|
|
|
|
|
class OrderSide(str, Enum):
    """Direction of an order.

    The ``str`` mixin makes each member compare equal to its plain
    string value (``OrderSide.BUY == "buy"``).
    """

    BUY = "buy"
    SELL = "sell"
|
|
|
|
|
|
class OrderType(str, Enum):
    """Kind of order being placed.

    Members are ``str`` subclasses, so they compare equal to their
    string values.
    """

    MARKET = "market"
    LIMIT = "limit"
    STOP = "stop"
    STOP_LIMIT = "stop_limit"
|
|
|
|
|
|
class OrderStatus(str, Enum):
    """Lifecycle state of an order as tracked by this module.

    Members are ``str`` subclasses, so they compare equal to their
    string values.
    """

    PENDING = "pending"
    SUBMITTED = "submitted"
    ACCEPTED = "accepted"
    PARTIALLY_FILLED = "partially_filled"
    FILLED = "filled"
    CANCELLED = "cancelled"
    REJECTED = "rejected"
    EXPIRED = "expired"
|
|
|
|
|
|
class TradingMode(str, Enum):
    """Whether an adapter operates in paper or live trading mode.

    Members are ``str`` subclasses, so they compare equal to their
    string values.
    """

    PAPER = "paper"
    LIVE = "live"
|
|
|
|
|
|
class OrderEventType(str, Enum):
    """Kind of event recorded against an order.

    Members are ``str`` subclasses, so they compare equal to their
    string values.
    """

    SUBMITTED = "submitted"
    ACCEPTED = "accepted"
    REJECTED = "rejected"
    FILL = "fill"
    PARTIAL_FILL = "partial_fill"
    CANCELLED = "cancelled"
    EXPIRED = "expired"
|
|
|
|
|
|
# --- Data structures ---
|
|
|
|
|
|
class OrderRequest:
    """An order that is about to be handed to a broker.

    If ``idempotency_key`` is omitted (or empty), a random UUID4 string is
    generated so every request carries a key.
    """

    def __init__(
        self,
        ticker: str,
        side: OrderSide,
        quantity: float,
        order_type: OrderType = OrderType.MARKET,
        limit_price: float | None = None,
        stop_price: float | None = None,
        time_in_force: str = "day",
        idempotency_key: str | None = None,
    ) -> None:
        # What to trade and how much.
        self.ticker = ticker
        self.side = side
        self.quantity = quantity

        # How the order should execute.
        self.order_type = order_type
        self.limit_price = limit_price
        self.stop_price = stop_price
        self.time_in_force = time_in_force

        # Any falsy key (None or "") is replaced by a fresh UUID4.
        self.idempotency_key = idempotency_key if idempotency_key else str(uuid.uuid4())

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view of the request for audit/persistence.

        Enum fields are emitted as their string values. ``limit_price`` and
        ``stop_price`` are included only when they are not ``None``.
        """
        serialized: dict[str, Any] = dict(
            ticker=self.ticker,
            side=self.side.value,
            quantity=self.quantity,
            order_type=self.order_type.value,
            time_in_force=self.time_in_force,
            idempotency_key=self.idempotency_key,
        )
        # Optional prices are appended in a fixed order, skipping unset ones.
        optional_prices = (
            ("limit_price", self.limit_price),
            ("stop_price", self.stop_price),
        )
        serialized.update(
            {key: price for key, price in optional_prices if price is not None}
        )
        return serialized
|
|
|
|
|
|
class OrderResponse:
    """Outcome of an order-related broker call.

    ``submitted_at`` defaults to the current UTC time and ``raw_response``
    to an empty dict when they are not supplied.
    """

    def __init__(
        self,
        broker_order_id: str,
        status: OrderStatus,
        ticker: str,
        side: OrderSide,
        quantity: float,
        filled_quantity: float = 0.0,
        filled_avg_price: float | None = None,
        submitted_at: datetime | None = None,
        raw_response: dict[str, Any] | None = None,
        error: str | None = None,
    ) -> None:
        # Identity and state of the order.
        self.broker_order_id = broker_order_id
        self.status = status
        self.ticker = ticker
        self.side = side

        # Requested and executed amounts.
        self.quantity = quantity
        self.filled_quantity = filled_quantity
        self.filled_avg_price = filled_avg_price

        # Bookkeeping; falsy inputs are replaced by defaults.
        self.submitted_at = submitted_at if submitted_at else datetime.now(timezone.utc)
        self.raw_response = raw_response if raw_response else {}
        self.error = error

    @property
    def ok(self) -> bool:
        """True when no error is set and the status is not a failure state.

        Failure states are REJECTED, CANCELLED and EXPIRED.
        """
        if self.error is not None:
            return False
        failure_states = (
            OrderStatus.REJECTED,
            OrderStatus.CANCELLED,
            OrderStatus.EXPIRED,
        )
        return self.status not in failure_states

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view of the response.

        Enums become their string values and ``submitted_at`` an ISO-8601
        string; ``raw_response`` is not included.
        """
        status_value = self.status.value
        side_value = self.side.value
        submitted_iso = self.submitted_at.isoformat()
        return dict(
            broker_order_id=self.broker_order_id,
            status=status_value,
            ticker=self.ticker,
            side=side_value,
            quantity=self.quantity,
            filled_quantity=self.filled_quantity,
            filled_avg_price=self.filled_avg_price,
            submitted_at=submitted_iso,
            error=self.error,
        )
|
|
|
|
|
|
class PositionInfo:
    """Snapshot of one position held at the broker."""

    def __init__(
        self,
        ticker: str,
        quantity: float,
        avg_entry_price: float,
        current_price: float,
        unrealized_pnl: float,
        market_value: float,
        side: str = "long",
    ) -> None:
        # What is held and in which direction.
        self.ticker = ticker
        self.side = side
        self.quantity = quantity

        # Pricing and valuation figures.
        self.avg_entry_price = avg_entry_price
        self.current_price = current_price
        self.market_value = market_value
        self.unrealized_pnl = unrealized_pnl

    def to_dict(self) -> dict[str, Any]:
        """Return the position as a plain dict keyed by attribute name."""
        # The tuple order fixes the key order of the resulting dict.
        exported = (
            "ticker",
            "quantity",
            "avg_entry_price",
            "current_price",
            "unrealized_pnl",
            "market_value",
            "side",
        )
        return {name: getattr(self, name) for name in exported}
|
|
|
|
|
|
class AccountInfo:
    """Summary of a broker account: identity, balances and trading mode."""

    def __init__(
        self,
        account_id: str,
        buying_power: float,
        cash: float,
        portfolio_value: float,
        currency: str = "USD",
        mode: TradingMode = TradingMode.PAPER,
    ) -> None:
        # Which account, and in which mode it trades.
        self.account_id = account_id
        self.mode = mode

        # Balances, denominated in ``currency``.
        self.currency = currency
        self.buying_power = buying_power
        self.cash = cash
        self.portfolio_value = portfolio_value

    def to_dict(self) -> dict[str, Any]:
        """Return the account summary as a plain dict.

        ``mode`` is emitted as its string value.
        """
        summary: dict[str, Any] = {
            name: getattr(self, name)
            for name in (
                "account_id",
                "buying_power",
                "cash",
                "portfolio_value",
                "currency",
            )
        }
        summary["mode"] = self.mode.value
        return summary
|
|
|
|
|
|
# --- Abstract interface ---
|
|
|
|
|
|
class BrokerDataAdapter(BaseAdapter, ABC):
    """Contract every broker integration must implement.

    On top of BaseAdapter, a broker adapter provides:
    - submit_order: place an order, keyed by an idempotency key
    - cancel_order: cancel a previously placed order
    - get_order_status: look up the state of an order
    - get_positions: list the positions currently held
    - get_account: fetch the account summary

    Concrete adapters are required to:
    - submit orders idempotently via idempotency_key (Req 8.5)
    - keep paper and live modes separate (Req 8.1)
    - fail closed when the broker is unavailable (Req 8.5)
    """

    def __init__(self, mode: TradingMode = TradingMode.PAPER) -> None:
        # Stored privately; exposed read-only through the ``mode`` property.
        self._mode = mode

    @property
    def mode(self) -> TradingMode:
        """Trading mode this adapter was constructed with."""
        return self._mode

    def source_type(self) -> str:
        """Return the source-type label for broker adapters (``"broker"``)."""
        return "broker"

    @abstractmethod
    async def submit_order(self, order: OrderRequest) -> OrderResponse:
        """Place ``order`` with the broker.

        Implementations must use ``order.idempotency_key`` so the same
        request is never executed twice, and must fail closed when the
        broker is unreachable or its answer is ambiguous.
        """
        ...

    @abstractmethod
    async def cancel_order(self, broker_order_id: str) -> OrderResponse:
        """Cancel the order identified by ``broker_order_id``."""
        ...

    @abstractmethod
    async def get_order_status(self, broker_order_id: str) -> OrderResponse:
        """Return the current state of the order ``broker_order_id``."""
        ...

    @abstractmethod
    async def get_positions(self) -> list[PositionInfo]:
        """Return every position currently held."""
        ...

    @abstractmethod
    async def get_account(self) -> AccountInfo:
        """Return the account summary (balance, buying power, etc.)."""
        ...
|
|
|
|
|
|
# --- Concrete Alpaca implementation ---
|
|
|
|
|
|
class AlpacaBrokerAdapter(BrokerDataAdapter):
    """Concrete broker adapter for the Alpaca Markets REST API.

    Supports:
    - Paper trading via paper-api.alpaca.markets
    - Live trading via api.alpaca.markets
    - Order submission, cancellation, and status
    - Position and account queries

    Every request path is built as ``{base_url}/v2/...``, so ``base_url``
    must not carry its own API-version suffix; ``__init__`` strips a
    trailing ``/v2`` or ``/v1`` if one is configured.

    Config options for fetch():
        endpoint: One of "positions", "orders", "account" (default "positions")
    """

    PAPER_BASE_URL: str = "https://paper-api.alpaca.markets"
    LIVE_BASE_URL: str = "https://api.alpaca.markets"

    # Alpaca order-status string -> internal OrderStatus. Statuses not listed
    # here fall back to PENDING in _parse_order_response.
    # NOTE(review): "done_for_day" is reported as FILLED even when
    # filled_quantity is 0, and "replaced" as SUBMITTED — confirm these match
    # what downstream consumers expect.
    _STATUS_MAP: dict[str, OrderStatus] = {
        "new": OrderStatus.SUBMITTED,
        "accepted": OrderStatus.ACCEPTED,
        "partially_filled": OrderStatus.PARTIALLY_FILLED,
        "filled": OrderStatus.FILLED,
        "done_for_day": OrderStatus.FILLED,
        "canceled": OrderStatus.CANCELLED,
        "expired": OrderStatus.EXPIRED,
        "replaced": OrderStatus.SUBMITTED,
        "pending_new": OrderStatus.PENDING,
        "pending_cancel": OrderStatus.PENDING,
        "pending_replace": OrderStatus.PENDING,
        "rejected": OrderStatus.REJECTED,
    }

    def __init__(
        self,
        api_key: str,
        api_secret: str,
        mode: TradingMode = TradingMode.PAPER,
        base_url: str | None = None,
    ) -> None:
        """Create an adapter.

        Args:
            api_key: Alpaca API key id, sent as ``APCA-API-KEY-ID``.
            api_secret: Alpaca API secret, sent as ``APCA-API-SECRET-KEY``.
            mode: Paper or live; selects the default base URL.
            base_url: Optional override. A trailing slash and a trailing
                ``/v2`` or ``/v1`` segment are removed because this adapter
                adds the version prefix to every path itself.
        """
        super().__init__(mode=mode)
        self.api_key = api_key
        self.api_secret = api_secret
        if base_url:
            normalized = base_url.rstrip("/")
            # Strip trailing /v2 or /v1 — the adapter adds API version prefixes itself
            for suffix in ("/v2", "/v1"):
                if normalized.endswith(suffix):
                    normalized = normalized[: -len(suffix)]
                    break
            # Stripping the suffix can expose another slash ("host//v2").
            self.base_url = normalized.rstrip("/")
        elif mode == TradingMode.LIVE:
            self.base_url = self.LIVE_BASE_URL
        else:
            self.base_url = self.PAPER_BASE_URL

    def _headers(self) -> dict[str, str]:
        """Return the authentication and content-type headers for a request."""
        return {
            "APCA-API-KEY-ID": self.api_key,
            "APCA-API-SECRET-KEY": self.api_secret,
            "Content-Type": "application/json",
        }

    async def fetch(self, ticker: str, config: dict[str, Any]) -> AdapterResult:
        """Fetch positions, recent orders or the account from Alpaca.

        This satisfies the BaseAdapter contract for the ingestion pipeline.
        The broker adapter uses fetch() to pull position/order snapshots
        that get persisted as raw artifacts.

        Args:
            ticker: Symbol to query.
            config: Options; ``config["endpoint"]`` selects "positions"
                (default), "orders" or "account".

        Returns:
            An AdapterResult. Failures never raise: HTTP errors and any other
            exception are logged and returned as an error result.
        """
        endpoint = config.get("endpoint", "positions")
        url = self._build_fetch_url(ticker, endpoint)

        async with httpx.AsyncClient(timeout=30) as client:
            t0 = time.monotonic()
            try:
                resp = await client.get(url, headers=self._headers())
                elapsed_ms = (time.monotonic() - t0) * 1000
                resp.raise_for_status()

                raw = resp.content
                data = resp.json()
                content_hash = hashlib.sha256(raw).hexdigest()
                # Normalise to a list: a single object is wrapped, a list is
                # passed through, anything else yields no items.
                if isinstance(data, dict):
                    items = [data]
                elif isinstance(data, list):
                    items = data
                else:
                    items = []

                return AdapterResult(
                    source_type="broker",
                    ticker=ticker,
                    items=items,
                    raw_payload=raw,
                    content_hash=content_hash,
                    fetched_at=datetime.now(timezone.utc),
                    http_status=resp.status_code,
                    response_time_ms=round(elapsed_ms, 1),
                    metadata={
                        "provider": "alpaca",
                        "mode": self._mode.value,
                        "endpoint": endpoint,
                    },
                )
            except httpx.HTTPStatusError as e:
                elapsed_ms = (time.monotonic() - t0) * 1000
                logger.error("Alpaca HTTP error for %s: %s", ticker, e)
                # Explicit None checks: do not depend on response truthiness.
                response = e.response
                return self._error_result(
                    ticker, str(e), elapsed_ms,
                    http_status=response.status_code if response is not None else None,
                    raw=response.content if response is not None else b"",
                )
            except Exception as e:
                elapsed_ms = (time.monotonic() - t0) * 1000
                logger.error("Alpaca fetch failed for %s: %s", ticker, e)
                return self._error_result(ticker, str(e), elapsed_ms)

    def _build_fetch_url(self, ticker: str, endpoint: str) -> str:
        """Build the URL for a fetch operation.

        The ticker is percent-encoded so that symbols containing reserved
        characters (for example ``/``) cannot alter the request path or
        query string. Any endpoint other than "orders" or "account" maps to
        the positions URL.
        """
        from urllib.parse import quote

        symbol = quote(ticker, safe="")
        if endpoint == "orders":
            return f"{self.base_url}/v2/orders?symbols={symbol}&status=all&limit=50"
        if endpoint == "account":
            return f"{self.base_url}/v2/account"
        # Default: positions for ticker
        return f"{self.base_url}/v2/positions/{symbol}"

    async def submit_order(self, order: OrderRequest) -> OrderResponse:
        """Submit an order to Alpaca with idempotency key.

        Fails closed: any network error or ambiguous response returns
        a rejected OrderResponse rather than risking duplicate orders.

        The idempotency key is sent as ``client_order_id`` in the request
        body (and, as before, as an ``Idempotency-Key`` header), so that a
        retry of the same OrderRequest is refused by the broker instead of
        being executed a second time.
        """
        if self._mode == TradingMode.LIVE:
            logger.warning(
                "LIVE order submission: %s %s %s",
                order.side.value, order.quantity, order.ticker,
            )

        payload: dict[str, Any] = {
            "symbol": order.ticker,
            "qty": str(order.quantity),
            "side": order.side.value,
            "type": order.order_type.value,
            "time_in_force": order.time_in_force,
            # Alpaca's order API requires client_order_id to be unique, which
            # is what makes a resubmission with the same key safe.
            # NOTE(review): Alpaca caps the length of client_order_id; the
            # default UUID4 key fits, custom keys should be checked.
            "client_order_id": order.idempotency_key,
        }
        # Price fields are only sent for the order types that use them.
        if order.limit_price is not None and order.order_type in (OrderType.LIMIT, OrderType.STOP_LIMIT):
            payload["limit_price"] = str(order.limit_price)
        if order.stop_price is not None and order.order_type in (OrderType.STOP, OrderType.STOP_LIMIT):
            payload["stop_price"] = str(order.stop_price)

        headers = {**self._headers(), "Idempotency-Key": order.idempotency_key}

        async with httpx.AsyncClient(timeout=30) as client:
            try:
                resp = await client.post(
                    f"{self.base_url}/v2/orders",
                    headers=headers,
                    json=payload,
                )
                resp.raise_for_status()
                data = resp.json()
                return self._parse_order_response(data)
            except httpx.HTTPStatusError as e:
                response = e.response
                error_body = response.text if response is not None else "unknown"
                logger.error("Order rejected by Alpaca: %s", error_body)
                return OrderResponse(
                    broker_order_id="",
                    status=OrderStatus.REJECTED,
                    ticker=order.ticker,
                    side=order.side,
                    quantity=order.quantity,
                    error=(
                        f"HTTP {response.status_code}: {error_body}"
                        if response is not None
                        else str(e)
                    ),
                    raw_response={"error": error_body},
                )
            except Exception as e:
                # Fail closed: treat any unexpected error as rejection
                logger.error("Order submission failed (fail-closed): %s", e)
                return OrderResponse(
                    broker_order_id="",
                    status=OrderStatus.REJECTED,
                    ticker=order.ticker,
                    side=order.side,
                    quantity=order.quantity,
                    error=f"fail-closed: {e}",
                )

    async def cancel_order(self, broker_order_id: str) -> OrderResponse:
        """Cancel an order on Alpaca.

        A 204 response yields a CANCELLED OrderResponse carrying only the
        order id (ticker, side and quantity are placeholders). Any other
        successful response is parsed as an order object. On any failure the
        returned response has status REJECTED and ``error`` set; here
        REJECTED means the cancel request failed, not that the order itself
        was rejected.
        """
        async with httpx.AsyncClient(timeout=30) as client:
            try:
                resp = await client.delete(
                    f"{self.base_url}/v2/orders/{broker_order_id}",
                    headers=self._headers(),
                )
                if resp.status_code == 204:
                    return OrderResponse(
                        broker_order_id=broker_order_id,
                        status=OrderStatus.CANCELLED,
                        ticker="",
                        side=OrderSide.BUY,
                        quantity=0,
                    )
                resp.raise_for_status()
                data = resp.json()
                return self._parse_order_response(data)
            except Exception as e:
                logger.error("Cancel failed for %s: %s", broker_order_id, e)
                return OrderResponse(
                    broker_order_id=broker_order_id,
                    status=OrderStatus.REJECTED,
                    ticker="",
                    side=OrderSide.BUY,
                    quantity=0,
                    error=str(e),
                )

    async def get_order_status(self, broker_order_id: str) -> OrderResponse:
        """Get order status from Alpaca.

        On any failure the returned response has status REJECTED with
        ``error`` set and placeholder ticker/side/quantity.
        NOTE(review): a failed lookup is therefore indistinguishable by
        status alone from a genuinely rejected order — callers should check
        ``error`` before treating the order as dead.
        """
        async with httpx.AsyncClient(timeout=30) as client:
            try:
                resp = await client.get(
                    f"{self.base_url}/v2/orders/{broker_order_id}",
                    headers=self._headers(),
                )
                resp.raise_for_status()
                data = resp.json()
                return self._parse_order_response(data)
            except Exception as e:
                logger.error("Get order status failed for %s: %s", broker_order_id, e)
                return OrderResponse(
                    broker_order_id=broker_order_id,
                    status=OrderStatus.REJECTED,
                    ticker="",
                    side=OrderSide.BUY,
                    quantity=0,
                    error=str(e),
                )

    async def get_positions(self) -> list[PositionInfo]:
        """Get all current positions from Alpaca.

        Returns an empty list when the payload is not a list or when any
        error occurs (the error is logged), so an empty result does not by
        itself prove that no positions are held.
        """
        async with httpx.AsyncClient(timeout=30) as client:
            try:
                resp = await client.get(
                    f"{self.base_url}/v2/positions",
                    headers=self._headers(),
                )
                resp.raise_for_status()
                data = resp.json()
                if not isinstance(data, list):
                    return []
                # Non-dict entries are skipped rather than failing the batch.
                return [self._parse_position(p) for p in data if isinstance(p, dict)]
            except Exception as e:
                logger.error("Get positions failed: %s", e)
                return []

    async def get_account(self) -> AccountInfo:
        """Get account summary from Alpaca.

        Missing or null numeric fields are read as 0. On any failure the
        error is logged and an AccountInfo with an empty id and zero
        balances is returned.
        """
        async with httpx.AsyncClient(timeout=30) as client:
            try:
                resp = await client.get(
                    f"{self.base_url}/v2/account",
                    headers=self._headers(),
                )
                resp.raise_for_status()
                data = resp.json()
                # ``or 0`` guards against explicit nulls, matching
                # _parse_position; float(None) would otherwise discard the
                # whole account via the except branch below.
                return AccountInfo(
                    account_id=str(data.get("id", "")),
                    buying_power=float(data.get("buying_power", 0) or 0),
                    cash=float(data.get("cash", 0) or 0),
                    portfolio_value=float(data.get("portfolio_value", 0) or 0),
                    currency=str(data.get("currency", "USD")),
                    mode=self._mode,
                )
            except Exception as e:
                logger.error("Get account failed: %s", e)
                return AccountInfo(
                    account_id="",
                    buying_power=0,
                    cash=0,
                    portfolio_value=0,
                    mode=self._mode,
                )

    def _parse_order_response(self, data: dict[str, Any]) -> OrderResponse:
        """Parse an Alpaca order response into an OrderResponse.

        Unknown or missing statuses map to PENDING; any side other than
        "sell" is read as BUY; missing or null quantities are read as 0.
        ``submitted_at`` is not taken from the payload, so OrderResponse
        applies its own default.
        """
        raw_status = str(data.get("status", "pending"))
        status = self._STATUS_MAP.get(raw_status, OrderStatus.PENDING)

        side_str = str(data.get("side", "buy"))
        side = OrderSide.SELL if side_str == "sell" else OrderSide.BUY

        filled_qty = float(data.get("filled_qty", 0) or 0)
        filled_avg = data.get("filled_avg_price")
        # A missing, null or empty average price is reported as None.
        filled_avg_price = float(filled_avg) if filled_avg else None

        return OrderResponse(
            broker_order_id=str(data.get("id", "")),
            status=status,
            ticker=str(data.get("symbol", "")),
            side=side,
            quantity=float(data.get("qty", 0) or 0),
            filled_quantity=filled_qty,
            filled_avg_price=filled_avg_price,
            raw_response=data,
        )

    def _parse_position(self, data: dict[str, Any]) -> PositionInfo:
        """Parse an Alpaca position response into a PositionInfo.

        Missing or null numeric fields are read as 0; Alpaca's
        ``unrealized_pl`` field is exposed as ``unrealized_pnl``.
        """
        return PositionInfo(
            ticker=str(data.get("symbol", "")),
            quantity=float(data.get("qty", 0) or 0),
            avg_entry_price=float(data.get("avg_entry_price", 0) or 0),
            current_price=float(data.get("current_price", 0) or 0),
            unrealized_pnl=float(data.get("unrealized_pl", 0) or 0),
            market_value=float(data.get("market_value", 0) or 0),
            side=str(data.get("side", "long")),
        )

    def _error_result(
        self,
        ticker: str,
        error: str,
        elapsed_ms: float,
        http_status: int | None = None,
        raw: bytes = b"",
    ) -> AdapterResult:
        """Build an error AdapterResult for broker fetches.

        The result has no items and an empty content hash; ``raw`` carries
        the response body when one was received.
        """
        return AdapterResult(
            source_type="broker",
            ticker=ticker,
            items=[],
            raw_payload=raw,
            content_hash="",
            fetched_at=datetime.now(timezone.utc),
            error=error,
            http_status=http_status,
            response_time_ms=round(elapsed_ms, 1),
            metadata={"provider": "alpaca", "mode": self._mode.value},
        )
|