bb40a3cb8e
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline failed
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize unknown status
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
The sync_positions loop only upserted positions from Alpaca but never deleted DB rows for positions that were closed/liquidated on the broker side. After a paper reset, the next sync would not remove the stale positions because they simply weren't in Alpaca's response anymore. Now performs full reconciliation: after upserting what Alpaca reports, deletes any DB positions for the account that Alpaca no longer holds.
966 lines
34 KiB
Python
966 lines
34 KiB
Python
"""Broker adapter service - standalone worker for sandbox order execution.

Runs the Alpaca broker adapter (sandbox/paper mode by default), processing
order requests from the broker queue, evaluating them through the risk engine,
submitting to Alpaca's trading API, and persisting the full audit trail.

Also periodically syncs positions and account state from Alpaca.

Implements idempotent order submission keys and duplicate prevention:
- Deterministic idempotency key generation from job attributes
- Redis-based fast-path duplicate detection before broker submission
- PostgreSQL UNIQUE constraint on idempotency_key as durable fallback

Requirements: 2.4, 8.1, 8.3, 8.5
Design: Section 4.9 - Broker Adapter
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
import asyncpg
|
|
import redis.asyncio as aioredis
|
|
|
|
from services.adapters.broker_adapter import (
|
|
AlpacaBrokerAdapter,
|
|
OrderRequest,
|
|
OrderResponse,
|
|
OrderSide,
|
|
OrderStatus,
|
|
OrderType,
|
|
TradingMode,
|
|
)
|
|
from services.lake_publisher.worker import (
|
|
LAKEHOUSE_BUCKET,
|
|
publish_positions_daily_batch,
|
|
publish_trade_fill,
|
|
publish_trade_order,
|
|
)
|
|
from services.risk.approval import (
|
|
ApprovalRequest,
|
|
compute_expiry,
|
|
create_approval_request,
|
|
requires_approval,
|
|
)
|
|
from services.risk.engine import (
|
|
AccountRiskState,
|
|
PortfolioRiskConfig,
|
|
ProposedOrder,
|
|
clamp_order_to_position_limits,
|
|
evaluate_order,
|
|
)
|
|
from services.shared.audit import (
|
|
audit_approval_requested,
|
|
audit_duplicate_prevented,
|
|
audit_order_filled,
|
|
audit_order_rejected,
|
|
audit_order_submitted,
|
|
audit_risk_evaluated,
|
|
)
|
|
from services.shared.config import load_config
|
|
from services.shared.db import get_pg_pool, get_redis
|
|
from services.shared.logging import setup_logging
|
|
from services.shared.metrics import (
|
|
ORDERS_CLAMPED,
|
|
ORDERS_DUPLICATES_PREVENTED,
|
|
ORDERS_FILLED,
|
|
ORDERS_REJECTED,
|
|
ORDERS_SUBMITTED,
|
|
POSITIONS_SYNCED,
|
|
RISK_CHECK_FAILURES,
|
|
RISK_EVALUATIONS_TOTAL,
|
|
)
|
|
from services.shared.redis_keys import QUEUE_BROKER, is_pipeline_enabled, queue_key
|
|
|
|
# Module-level logger, registered under the fixed name "broker_service".
logger = logging.getLogger("broker_service")

# Pause between iterations of the background position/order sync loop.
POSITION_SYNC_INTERVAL = 60  # seconds

# Redis TTL for idempotency markers (24 hours)
ORDER_IDEMPOTENCY_TTL = 86400
# Namespace prefix for idempotency markers; the key itself is appended after ":".
ORDER_IDEMPOTENCY_PREFIX = "stonks:order_idempotency"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DB persistence helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Insert the broker account row; when a row with the same id already exists,
# refresh its config and mode and mark it active again.
_UPSERT_BROKER_ACCOUNT = """
INSERT INTO broker_accounts (id, provider, account_id, mode, config, active)
VALUES ($1::uuid, $2, $3, $4, $5::jsonb, TRUE)
ON CONFLICT (id) DO UPDATE SET
    config = EXCLUDED.config,
    mode = EXCLUDED.mode,
    active = TRUE
"""

# Insert an order; on an idempotency_key conflict only the status, broker id
# and fill columns of the existing row are refreshed (plus updated_at).
_INSERT_ORDER = """
INSERT INTO orders (
    id, recommendation_id, broker_account_id, ticker, side, order_type,
    quantity, limit_price, stop_price, status, idempotency_key,
    broker_order_id, decision_trace, submitted_at, filled_at,
    fill_price, fill_quantity
) VALUES (
    $1::uuid, $2, $3::uuid, $4, $5, $6,
    $7, $8, $9, $10, $11,
    $12, $13::jsonb, $14, $15,
    $16, $17
)
ON CONFLICT (idempotency_key) DO UPDATE SET
    status = EXCLUDED.status,
    broker_order_id = EXCLUDED.broker_order_id,
    filled_at = EXCLUDED.filled_at,
    fill_price = EXCLUDED.fill_price,
    fill_quantity = EXCLUDED.fill_quantity,
    updated_at = NOW()
"""

# Append one lifecycle event (with a JSON payload) for an order.
_INSERT_ORDER_EVENT = """
INSERT INTO order_events (order_id, event_type, data, broker_timestamp)
VALUES ($1::uuid, $2, $3::jsonb, $4)
"""

# Record the outcome of one risk evaluation.
_INSERT_RISK_EVALUATION = """
INSERT INTO risk_evaluations (id, recommendation_id, eligible, allowed_mode, rejection_reasons, risk_checks, evaluated_at)
VALUES ($1::uuid, $2::uuid, $3, $4, $5::jsonb, $6::jsonb, $7)
"""

# Insert or overwrite the position row for an (account, ticker) pair.
_UPSERT_POSITION = """
INSERT INTO positions (broker_account_id, ticker, quantity, avg_entry_price, current_price, unrealized_pnl, updated_at)
VALUES ($1::uuid, $2, $3, $4, $5, $6, $7)
ON CONFLICT (broker_account_id, ticker)
DO UPDATE SET
    quantity = EXCLUDED.quantity,
    avg_entry_price = EXCLUDED.avg_entry_price,
    current_price = EXCLUDED.current_price,
    unrealized_pnl = EXCLUDED.unrealized_pnl,
    updated_at = EXCLUDED.updated_at
"""

# Fetch the most recently updated active risk configuration.
_LOAD_RISK_CONFIG = """
SELECT config FROM risk_configs WHERE active = TRUE ORDER BY updated_at DESC LIMIT 1
"""

# Fetch today's risk snapshot for one account.
_LOAD_DAILY_SNAPSHOT = """
SELECT portfolio_value, daily_pnl, daily_trade_count, positions_by_sector
FROM daily_risk_snapshots
WHERE account_id = $1 AND snapshot_date = CURRENT_DATE
LIMIT 1
"""

# Look up an existing order by its idempotency key (durable duplicate check).
_CHECK_ORDER_BY_IDEMPOTENCY_KEY = """
SELECT id, status, broker_order_id FROM orders
WHERE idempotency_key = $1
LIMIT 1
"""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Idempotency helpers (Requirement 8.5)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def generate_idempotency_key(job: dict[str, Any]) -> str:
    """Return a stable idempotency key for a broker queue job.

    A truthy ``idempotency_key`` already present on the job wins and is
    returned as a string. Otherwise the key is the first 40 hex characters
    of the SHA-256 of the job's order-defining fields joined with ``|``,
    so a replayed message with identical content maps to the same key.
    """
    supplied = job.get("idempotency_key")
    if supplied:
        return str(supplied)

    # (field name, value substituted when the field is absent), in hash order.
    hashed_fields = (
        ("recommendation_id", ""),
        ("ticker", ""),
        ("side", "buy"),
        ("quantity", 0),
        ("order_type", "market"),
        ("limit_price", ""),
        ("stop_price", ""),
    )
    fingerprint = "|".join(
        str(job.get(field, fallback)) for field, fallback in hashed_fields
    )
    digest = hashlib.sha256(fingerprint.encode())
    return digest.hexdigest()[:40]
|
|
|
|
|
|
def _redis_idempotency_key(idempotency_key: str) -> str:
    """Return the namespaced Redis key that stores an order's idempotency marker."""
    return "{}:{}".format(ORDER_IDEMPOTENCY_PREFIX, idempotency_key)
|
|
|
|
|
|
async def check_idempotency_redis(
    rds: aioredis.Redis,
    idempotency_key: str,
) -> str | None:
    """Fast-path duplicate check: look up the idempotency marker in Redis.

    Args:
        rds: Async Redis client.
        idempotency_key: Key identifying the order submission.

    Returns:
        The order id stored under the marker, or ``None`` when no marker
        (or an empty one) exists.
    """
    cached = await rds.get(_redis_idempotency_key(idempotency_key))
    if not cached:
        return None
    # A client created without decode_responses returns bytes; str() on bytes
    # would yield "b'...'" instead of the stored order id, so decode explicitly.
    if isinstance(cached, (bytes, bytearray)):
        return cached.decode("utf-8")
    return str(cached)
|
|
|
|
|
|
async def check_idempotency_db(
    pool: asyncpg.Pool,
    idempotency_key: str,
) -> dict[str, Any] | None:
    """Durable duplicate check: find an order row carrying this idempotency key.

    Returns:
        ``None`` when no row matches; otherwise a dict with string-valued
        ``id``, ``status`` and ``broker_order_id`` (empty string when the
        stored broker id is null/empty).
    """
    record = await pool.fetchrow(_CHECK_ORDER_BY_IDEMPOTENCY_KEY, idempotency_key)
    if not record:
        return None

    found: dict[str, Any] = {}
    found["id"] = str(record["id"])
    found["status"] = str(record["status"])
    found["broker_order_id"] = str(record["broker_order_id"] or "")
    return found
|
|
|
|
|
|
async def mark_idempotency_redis(
    rds: aioredis.Redis,
    idempotency_key: str,
    order_id: str,
) -> None:
    """Store the order id under the key's Redis marker, expiring after the TTL."""
    await rds.set(
        _redis_idempotency_key(idempotency_key),
        order_id,
        ex=ORDER_IDEMPOTENCY_TTL,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Core service logic
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def build_order_request(job: dict[str, Any]) -> OrderRequest:
    """Translate a broker queue job payload into an ``OrderRequest``.

    ``side`` is SELL only when the job says exactly ``"sell"``; anything else
    (including a missing value) is BUY. An unrecognised or missing
    ``order_type`` falls back to MARKET. ``ticker`` is required.
    """
    if job.get("side", "buy") == "sell":
        direction = OrderSide.SELL
    else:
        direction = OrderSide.BUY

    requested_type = job.get("order_type", "market")
    known_types = {
        "market": OrderType.MARKET,
        "limit": OrderType.LIMIT,
        "stop": OrderType.STOP,
        "stop_limit": OrderType.STOP_LIMIT,
    }

    return OrderRequest(
        ticker=job["ticker"],
        side=direction,
        quantity=float(job.get("quantity", 0)),
        order_type=known_types.get(requested_type, OrderType.MARKET),
        limit_price=job.get("limit_price"),
        stop_price=job.get("stop_price"),
        time_in_force=job.get("time_in_force", "day"),
        idempotency_key=generate_idempotency_key(job),
    )
|
|
|
|
|
|
def build_proposed_order(job: dict[str, Any]) -> ProposedOrder:
    """Map a broker queue job onto the ``ProposedOrder`` fed to risk evaluation.

    ``ticker`` is required; numeric fields default to 0 and are coerced to
    float, ``sector`` defaults to an empty string and ``side`` to ``"buy"``.
    """
    # Fields are evaluated in the same order as they are passed on.
    fields = {
        "recommendation_id": job.get("recommendation_id"),
        "ticker": job["ticker"],
        "sector": job.get("sector", ""),
        "action": job.get("side", "buy"),
        "quantity": float(job.get("quantity", 0)),
        "estimated_value": float(job.get("estimated_value", 0)),
        "confidence": float(job.get("confidence", 0)),
    }
    return ProposedOrder(**fields)
|
|
|
|
|
|
async def load_risk_config(pool: asyncpg.Pool) -> PortfolioRiskConfig:
    """Return the active risk configuration, or defaults when none is stored.

    The stored config may arrive as a dict or as a JSON string; a string is
    parsed before being handed to ``PortfolioRiskConfig.from_db_json``.
    """
    record = await pool.fetchrow(_LOAD_RISK_CONFIG)
    if not (record and record["config"]):
        return PortfolioRiskConfig()

    stored = record["config"]
    if isinstance(stored, dict):
        payload = stored
    else:
        payload = json.loads(stored)
    return PortfolioRiskConfig.from_db_json(payload)
|
|
|
|
|
|
async def _estimate_share_price(
    adapter: AlpacaBrokerAdapter,
    ticker: str,
) -> float:
    """Estimate the per-share price of ``ticker`` from currently held positions.

    Looks the ticker up among the positions returned by
    ``adapter.get_positions()`` and returns the first positive
    ``current_price`` found. Returns 0.0 when the ticker is not held, has no
    positive price, or the positions cannot be fetched.
    """
    try:
        held = await adapter.get_positions()
        price = next(
            (p.current_price for p in held if p.ticker == ticker and p.current_price > 0),
            None,
        )
        if price is not None:
            return price
    except Exception as exc:
        logger.debug("Could not fetch positions for price estimate: %s", exc)
    return 0.0
|
|
|
|
|
|
async def load_account_risk_state(
    pool: asyncpg.Pool,
    adapter: AlpacaBrokerAdapter,
    account_uuid: str,
) -> AccountRiskState:
    """Assemble the account's risk state from the broker and today's snapshot.

    Broker lookups are best-effort: a failure is logged as a warning and the
    corresponding fields keep their ``AccountRiskState`` defaults. Daily P&L,
    trade count and sector exposure come from today's snapshot row, if any.
    """
    state = AccountRiskState(account_id=account_uuid)

    # Live balances from the broker.
    try:
        account = await adapter.get_account()
    except Exception as exc:
        logger.warning("Failed to fetch account from Alpaca: %s", exc)
    else:
        state.portfolio_value = account.portfolio_value
        state.cash = account.cash
        state.buying_power = account.buying_power

    # Per-symbol market value and open position count from the broker.
    try:
        held = await adapter.get_positions()
        for position in held:
            state.positions_by_symbol[position.ticker] = position.market_value
        state.open_position_count = len(held)
    except Exception as exc:
        logger.warning("Failed to fetch positions from Alpaca: %s", exc)

    # Daily figures from the database snapshot.
    snapshot = await pool.fetchrow(_LOAD_DAILY_SNAPSHOT, account_uuid)
    if not snapshot:
        return state

    state.daily_pnl = float(snapshot["daily_pnl"] or 0)
    state.daily_trade_count = int(snapshot["daily_trade_count"] or 0)
    by_sector = snapshot["positions_by_sector"]
    if by_sector:
        if isinstance(by_sector, dict):
            state.positions_by_sector = by_sector
        else:
            state.positions_by_sector = json.loads(by_sector)
    return state
|
|
|
|
|
|
async def persist_order(
    pool: asyncpg.Pool,
    order_id: str,
    order: OrderRequest,
    resp: OrderResponse,
    account_uuid: str,
    risk_eval: dict[str, Any],
    recommendation_id: str | None = None,
) -> None:
    """Write the order row and its lifecycle events in a single transaction.

    The order row stores a decision trace (risk evaluation, request and broker
    response). A ``submitted`` event is always recorded; a ``fill`` or
    ``rejected`` event follows when the broker response has that status.
    """
    recorded_at = datetime.now(timezone.utc)
    was_filled = resp.status == OrderStatus.FILLED

    trace = {
        "risk_evaluation": risk_eval,
        "order_request": order.to_dict(),
        "broker_response": resp.to_dict(),
    }

    async with pool.acquire() as conn, conn.transaction():

        async def record_event(kind: str, payload: dict[str, Any]) -> None:
            # One order_events row stamped with this call's timestamp.
            await conn.execute(
                _INSERT_ORDER_EVENT,
                order_id,
                kind,
                json.dumps(payload),
                recorded_at,
            )

        await conn.execute(
            _INSERT_ORDER,
            order_id,
            recommendation_id,
            account_uuid,
            order.ticker,
            order.side.value,
            order.order_type.value,
            order.quantity,
            order.limit_price,
            order.stop_price,
            resp.status.value,
            order.idempotency_key,
            resp.broker_order_id,
            json.dumps(trace),
            resp.submitted_at or recorded_at,
            recorded_at if was_filled else None,
            resp.filled_avg_price,
            resp.filled_quantity,
        )

        await record_event(
            "submitted", {"ticker": order.ticker, "side": order.side.value}
        )

        if was_filled:
            await record_event(
                "fill",
                {
                    "fill_price": resp.filled_avg_price,
                    "fill_qty": resp.filled_quantity,
                },
            )
        elif resp.status == OrderStatus.REJECTED:
            await record_event("rejected", {"error": resp.error})
|
|
|
|
|
|
async def sync_positions(
    adapter: AlpacaBrokerAdapter,
    pool: asyncpg.Pool,
    account_uuid: str,
    minio_client: Any | None = None,
) -> None:
    """Sync current positions from Alpaca to PostgreSQL and publish to lake.

    Performs a full reconciliation: upserts positions that Alpaca reports,
    then removes any DB positions that Alpaca no longer holds (e.g. after
    a paper reset or full liquidation). The upserts and the delete run in one
    transaction, so a failure part-way through rolls everything back instead
    of leaving a half-reconciled table.

    Args:
        adapter: Broker adapter used to fetch the current positions.
        pool: PostgreSQL connection pool.
        account_uuid: Broker account whose position rows are reconciled.
        minio_client: Optional lake client; when given and the broker reports
            at least one position, a positions snapshot is published.

    Errors are logged and swallowed so the periodic sync loop keeps running.
    """
    now = datetime.now(timezone.utc)
    try:
        positions = await adapter.get_positions()
        broker_tickers = {pos.ticker for pos in positions}

        async with pool.acquire() as conn:
            # Atomic reconciliation: either every upsert and the stale-row
            # delete are applied, or none of them are.
            async with conn.transaction():
                for pos in positions:
                    await conn.execute(
                        _UPSERT_POSITION,
                        account_uuid,
                        pos.ticker,
                        pos.quantity,
                        pos.avg_entry_price,
                        pos.current_price,
                        pos.unrealized_pnl,
                        now,
                    )
                # Remove positions that the broker no longer reports (closed/liquidated)
                if broker_tickers:
                    await conn.execute(
                        "DELETE FROM positions WHERE broker_account_id = $1::uuid AND ticker != ALL($2::varchar[])",
                        account_uuid,
                        list(broker_tickers),
                    )
                else:
                    # Broker reports zero positions — clear all local positions for this account
                    await conn.execute(
                        "DELETE FROM positions WHERE broker_account_id = $1::uuid",
                        account_uuid,
                    )

        logger.info("Synced %d positions from Alpaca (reconciled)", len(positions))
        POSITIONS_SYNCED.inc()

        # Publish positions snapshot to analytical lake (best-effort).
        if minio_client is not None and positions:
            try:
                pos_dicts = [
                    {
                        "ticker": p.ticker,
                        "quantity": p.quantity,
                        "avg_entry_price": p.avg_entry_price,
                        "close_price": p.current_price,
                        "unrealized_pnl": p.unrealized_pnl,
                    }
                    for p in positions
                ]
                publish_positions_daily_batch(
                    minio_client, pos_dicts, account_uuid, now,
                )
            except Exception as e:
                logger.warning("Failed to publish positions to lake: %s", e)
    except Exception as e:
        logger.error("Position sync failed: %s", e)
|
|
|
|
|
|
async def register_broker_account(
    pool: asyncpg.Pool,
    account_uuid: str,
    adapter: AlpacaBrokerAdapter,
) -> None:
    """Upsert the broker account row using live account data from the adapter.

    The stored config holds the provider name plus the account's buying
    power, cash and portfolio value. Any failure is logged and swallowed.
    """
    try:
        account = await adapter.get_account()
        mode_name = adapter.mode.value
        snapshot = {
            "provider": "alpaca",
            "buying_power": account.buying_power,
            "cash": account.cash,
            "portfolio_value": account.portfolio_value,
        }
        await pool.execute(
            _UPSERT_BROKER_ACCOUNT,
            account_uuid,
            "alpaca",
            # Fall back to our own UUID when the broker reports no account id.
            account.account_id or account_uuid,
            mode_name,
            json.dumps(snapshot),
        )
        logger.info(
            "Registered Alpaca account: id=%s mode=%s portfolio=%.2f",
            account.account_id,
            adapter.mode.value,
            account.portfolio_value,
        )
    except Exception as exc:
        logger.error("Failed to register broker account: %s", exc)
|
|
|
|
|
|
async def process_order_job(
    job: dict[str, Any],
    adapter: AlpacaBrokerAdapter,
    pool: asyncpg.Pool,
    account_uuid: str,
    rds: aioredis.Redis | None = None,
    minio_client: Any | None = None,
) -> None:
    """Process a single order job from the broker queue.

    1. Generate deterministic idempotency key
    2. Check Redis + DB for duplicate (Req 8.5)
    3. Build proposed order (estimating value and clamping to position
       limits where needed) and run risk evaluation
    4. If risk passes and no operator approval is required, submit to Alpaca
    5. Persist order, events, and risk evaluation
    6. Set Redis idempotency marker

    The idempotency key is computed once, from the job as it was dequeued,
    and that same key is used for the duplicate checks, the persisted order
    row and the Redis marker. Clamping mutates ``job["quantity"]``; the key
    must not be re-derived afterwards or the stored row would no longer match
    the key a replayed message is checked against.

    Args:
        job: Queue payload; mutated in place when the order is re-valued or
            clamped.
        adapter: Broker adapter used for account data and order submission.
        pool: PostgreSQL connection pool.
        account_uuid: Broker account the order belongs to.
        rds: Optional Redis client for the fast-path duplicate check.
        minio_client: Optional lake client for publishing order/fill facts.
    """
    ticker = job.get("ticker", "???")
    order_id = str(uuid.uuid4())
    idempotency_key = generate_idempotency_key(job)

    # --- Duplicate prevention (Requirement 8.5) ---
    # Fast path: Redis check
    if rds is not None:
        existing_order_id = await check_idempotency_redis(rds, idempotency_key)
        if existing_order_id:
            logger.info(
                "Duplicate order detected (redis) for %s key=%s existing=%s",
                ticker, idempotency_key[:16], existing_order_id,
            )
            ORDERS_DUPLICATES_PREVENTED.labels(detected_via="redis").inc()
            await audit_duplicate_prevented(
                pool, existing_order_id, ticker, idempotency_key, detected_via="redis",
            )
            return

    # Durable fallback: DB check
    existing = await check_idempotency_db(pool, idempotency_key)
    if existing:
        logger.info(
            "Duplicate order detected (db) for %s key=%s existing=%s status=%s",
            ticker, idempotency_key[:16], existing["id"], existing["status"],
        )
        ORDERS_DUPLICATES_PREVENTED.labels(detected_via="db").inc()
        await audit_duplicate_prevented(
            pool, existing["id"], ticker, idempotency_key, detected_via="db",
        )
        # Warm Redis cache for future fast-path hits
        if rds is not None:
            await mark_idempotency_redis(rds, idempotency_key, existing["id"])
        return

    # Risk evaluation
    risk_config = await load_risk_config(pool)
    risk_state = await load_account_risk_state(pool, adapter, account_uuid)
    proposed = build_proposed_order(job)

    # If estimated_value is missing, derive it from a per-share price estimate
    # so that position-limit clamping can work.
    if proposed.estimated_value <= 0 and proposed.quantity > 0:
        price_per_share = await _estimate_share_price(adapter, proposed.ticker)
        if price_per_share > 0:
            proposed = proposed.model_copy(update={
                "estimated_value": proposed.quantity * price_per_share,
            })
            job["estimated_value"] = proposed.estimated_value

    # Auto-clamp buy orders to fit within position limits instead of
    # hard-rejecting.  If the clamped quantity is zero the normal risk
    # evaluation will still reject the order with a clear reason.
    original_qty = proposed.quantity
    proposed = clamp_order_to_position_limits(proposed, risk_config, risk_state)
    if proposed.quantity != original_qty:
        logger.info(
            "Order for %s clamped from %.0f to %.0f shares "
            "(value %.2f → %.2f) to fit position limits",
            ticker, original_qty, proposed.quantity,
            float(job.get("estimated_value", 0)), proposed.estimated_value,
        )
        ORDERS_CLAMPED.inc()
        # Update the job dict so build_order_request picks up the clamped qty
        job["quantity"] = proposed.quantity
        job["estimated_value"] = proposed.estimated_value

    evaluation = evaluate_order(proposed, risk_config, risk_state)

    risk_eval_dict = {
        "evaluation_id": evaluation.evaluation_id,
        "eligible": evaluation.eligible,
        "allowed_mode": evaluation.allowed_mode.value,
        "rejection_reasons": evaluation.rejection_reasons,
        "checks": [c.model_dump(mode="json") for c in evaluation.checks],
    }

    # Persist risk evaluation (best-effort; a failure must not block the order)
    rec_id = job.get("recommendation_id")
    try:
        await pool.execute(
            _INSERT_RISK_EVALUATION,
            evaluation.evaluation_id,
            rec_id,
            evaluation.eligible,
            evaluation.allowed_mode.value,
            json.dumps(evaluation.rejection_reasons),
            json.dumps(risk_eval_dict["checks"]),
            evaluation.evaluated_at,
        )
    except Exception as e:
        logger.warning("Failed to persist risk evaluation: %s", e)

    # Audit: risk evaluation result
    await audit_risk_evaluated(
        pool,
        evaluation_id=evaluation.evaluation_id,
        recommendation_id=rec_id,
        ticker=ticker,
        eligible=evaluation.eligible,
        allowed_mode=evaluation.allowed_mode.value,
        rejection_reasons=evaluation.rejection_reasons,
        check_count=len(evaluation.checks),
    )

    if not evaluation.eligible:
        RISK_EVALUATIONS_TOTAL.labels(result="rejected").inc()
        for check in evaluation.checks:
            if check.result.value == "fail":
                RISK_CHECK_FAILURES.labels(check_name=check.check_name).inc()
        ORDERS_REJECTED.labels(reason_category="risk_engine").inc()
        logger.info(
            "Order rejected by risk engine for %s: %s",
            ticker, evaluation.rejection_reasons,
        )
        # Persist the rejected order for audit.  Pin the key computed up front
        # so the row is stored under the key the duplicate checks use, even if
        # clamping changed the quantity.
        order_req = build_order_request({**job, "idempotency_key": idempotency_key})
        rejected_resp = OrderResponse(
            broker_order_id="",
            status=OrderStatus.REJECTED,
            ticker=ticker,
            side=OrderSide.SELL if job.get("side") == "sell" else OrderSide.BUY,
            quantity=float(job.get("quantity", 0)),
            error=f"Risk rejected: {'; '.join(evaluation.rejection_reasons)}",
        )
        await persist_order(
            pool, order_id, order_req, rejected_resp,
            account_uuid, risk_eval_dict, rec_id,
        )
        # Publish rejected order fact to analytical lake
        if minio_client is not None:
            try:
                publish_trade_order(
                    minio_client, order_id, ticker,
                    side=job.get("side", "buy"),
                    order_type=job.get("order_type", "market"),
                    quantity=float(job.get("quantity", 0)),
                    limit_price=job.get("limit_price"),
                    status="rejected",
                    broker_account=account_uuid,
                    submitted_at=datetime.now(timezone.utc),
                )
            except Exception as e:
                logger.warning("Failed to publish rejected order to lake: %s", e)
        # Audit: order rejected by risk engine
        await audit_order_rejected(
            pool, order_id, ticker,
            reason=f"Risk rejected: {'; '.join(evaluation.rejection_reasons)}",
            source="risk_engine",
        )
        # Mark idempotency even for rejected orders to prevent reprocessing
        if rds is not None:
            await mark_idempotency_redis(rds, idempotency_key, order_id)
        return

    # --- Operator approval gate (Requirement 8.2) ---
    if requires_approval(risk_config, evaluation.allowed_mode):
        expiry = compute_expiry(risk_config)
        approval_req = ApprovalRequest(
            order_job=job,
            recommendation_id=rec_id,
            ticker=ticker,
            side=job.get("side", "buy"),
            quantity=float(job.get("quantity", 0)),
            estimated_value=float(job.get("estimated_value", 0)),
            risk_evaluation_id=evaluation.evaluation_id,
            expires_at=expiry,
        )
        try:
            await create_approval_request(pool, approval_req)
            logger.info(
                "Order for %s held for operator approval (id=%s, expires=%s)",
                ticker, approval_req.approval_id, expiry.isoformat(),
            )
            await audit_approval_requested(
                pool,
                approval_id=approval_req.approval_id,
                ticker=ticker,
                side=approval_req.side,
                quantity=approval_req.quantity,
                estimated_value=approval_req.estimated_value,
                recommendation_id=rec_id,
                expires_at=expiry.isoformat(),
            )
        except Exception as e:
            logger.error("Failed to create approval request for %s: %s", ticker, e)
        # Do NOT mark idempotency — the job will be re-submitted after approval
        return

    # Submit to Alpaca.  As above, pin the original key so the broker request,
    # the persisted row and the Redis marker all agree.
    order_req = build_order_request({**job, "idempotency_key": idempotency_key})
    RISK_EVALUATIONS_TOTAL.labels(result="passed").inc()

    # Audit: order submitted to broker
    await audit_order_submitted(
        pool,
        order_id=order_id,
        ticker=ticker,
        side=order_req.side.value,
        quantity=order_req.quantity,
        order_type=order_req.order_type.value,
        idempotency_key=order_req.idempotency_key,
        recommendation_id=rec_id,
        evaluation_id=evaluation.evaluation_id,
    )

    resp = await adapter.submit_order(order_req)

    await persist_order(
        pool, order_id, order_req, resp,
        account_uuid, risk_eval_dict, rec_id,
    )

    # Publish order fact to analytical lake
    if minio_client is not None:
        try:
            publish_trade_order(
                minio_client, order_id, ticker,
                side=order_req.side.value,
                order_type=order_req.order_type.value,
                quantity=order_req.quantity,
                limit_price=order_req.limit_price,
                status=resp.status.value,
                broker_account=account_uuid,
                submitted_at=resp.submitted_at or datetime.now(timezone.utc),
            )
        except Exception as e:
            logger.warning("Failed to publish order to lake: %s", e)

        # Publish fill fact if the order was filled
        if resp.status == OrderStatus.FILLED and resp.filled_avg_price is not None:
            try:
                fill_id = str(uuid.uuid4())
                publish_trade_fill(
                    minio_client, fill_id, order_id, ticker,
                    side=order_req.side.value,
                    fill_price=resp.filled_avg_price,
                    fill_quantity=resp.filled_quantity,
                    broker_account=account_uuid,
                    filled_at=datetime.now(timezone.utc),
                )
            except Exception as e:
                logger.warning("Failed to publish fill to lake: %s", e)

    # Mark idempotency after successful persistence
    if rds is not None:
        await mark_idempotency_redis(rds, idempotency_key, order_id)

    if resp.ok:
        mode = "paper" if adapter.mode == TradingMode.PAPER else "live"
        ORDERS_SUBMITTED.labels(
            side=order_req.side.value,
            order_type=order_req.order_type.value,
            mode=mode,
        ).inc()
        logger.info(
            "Order submitted to Alpaca: %s %s %.0f %s @ %s | broker_id=%s",
            resp.status.value, order_req.side.value, order_req.quantity,
            ticker, resp.filled_avg_price, resp.broker_order_id,
        )
        # Audit: order filled
        if resp.status == OrderStatus.FILLED:
            ORDERS_FILLED.labels(side=order_req.side.value).inc()
            await audit_order_filled(
                pool, order_id, ticker,
                side=order_req.side.value,
                fill_quantity=resp.filled_quantity,
                fill_price=resp.filled_avg_price,
                broker_order_id=resp.broker_order_id,
            )
    else:
        ORDERS_REJECTED.labels(reason_category="broker").inc()
        logger.warning(
            "Order failed for %s: %s (status=%s)",
            ticker, resp.error, resp.status.value,
        )
        # Audit: order rejected by broker
        await audit_order_rejected(
            pool, order_id, ticker,
            reason=resp.error or f"Broker status: {resp.status.value}",
            source="broker",
        )
|
|
|
|
|
|
|
|
async def sync_order_statuses(
    adapter: AlpacaBrokerAdapter,
    pool: asyncpg.Pool,
    minio_client: Any | None = None,
) -> None:
    """Sync in-flight order statuses from Alpaca to PostgreSQL.

    Selects orders submitted within the last 24 hours that are still
    'pending', 'submitted' or 'accepted' and have a broker order id, asks the
    broker for each one's current status, and updates the local row once the
    order has left those in-flight states (e.g. filled, cancelled, rejected).

    Args:
        adapter: Broker adapter used to query order status.
        pool: PostgreSQL connection pool.
        minio_client: Accepted for signature parity with the other sync
            helpers; not used by this function.

    A failure on one order is logged and does not stop the others; a failure
    of the initial query is logged and ends the sync.
    """
    try:
        rows = await pool.fetch(
            "SELECT id, broker_order_id, ticker, side, quantity, status "
            "FROM orders WHERE status IN ('pending', 'submitted', 'accepted') "
            "AND broker_order_id IS NOT NULL "
            "AND submitted_at > NOW() - INTERVAL '24 hours'",
        )
        if not rows:
            return

        now = datetime.now(timezone.utc)
        updated = 0
        for row in rows:
            try:
                resp = await adapter.get_order_status(row["broker_order_id"])
                new_status = resp.status.value
                if new_status in ("pending", "submitted", "accepted"):
                    continue  # still in-flight

                update_fields: dict[str, Any] = {"status": new_status, "updated_at": now}
                if resp.status == OrderStatus.FILLED:
                    update_fields["filled_at"] = now
                    update_fields["fill_price"] = resp.filled_avg_price
                    update_fields["fill_quantity"] = resp.filled_quantity
                elif resp.status == OrderStatus.CANCELLED:
                    update_fields["cancelled_at"] = now
                elif resp.status == OrderStatus.REJECTED:
                    update_fields["rejected_at"] = now
                    update_fields["rejection_reason"] = resp.error

                # Column names come only from the literal keys above, never
                # from external input; values are bound as $2, $3, ...
                set_clause = ", ".join(
                    f"{k} = ${i+2}" for i, k in enumerate(update_fields)
                )
                await pool.execute(
                    f"UPDATE orders SET {set_clause} WHERE id = $1",
                    row["id"],
                    *update_fields.values(),
                )
                updated += 1
                # Log the status the row actually had, not an assumed one.
                logger.info(
                    "Order %s (%s %s) status updated: %s -> %s",
                    row["id"], row["side"], row["ticker"],
                    row["status"], new_status,
                )
            except Exception:
                logger.debug("Could not sync order %s", row["id"], exc_info=True)

        if updated:
            logger.info("Synced %d order statuses from Alpaca", updated)
    except Exception as e:
        logger.error("Order status sync failed: %s", e)
|
|
|
|
|
|
async def position_sync_loop(
    adapter: AlpacaBrokerAdapter,
    pool: asyncpg.Pool,
    account_uuid: str,
    minio_client: Any | None = None,
) -> None:
    """Run the position sync and order-status sync forever, pausing between rounds."""
    while True:
        await sync_positions(
            adapter=adapter,
            pool=pool,
            account_uuid=account_uuid,
            minio_client=minio_client,
        )
        await sync_order_statuses(
            adapter=adapter,
            pool=pool,
            minio_client=minio_client,
        )
        await asyncio.sleep(POSITION_SYNC_INTERVAL)
|
|
|
|
|
|
async def main() -> None:
    """Service entry point: wire up dependencies and consume the broker queue.

    Sets up logging, PostgreSQL, Redis and the MinIO lake client, registers
    the broker account, starts the background position/order sync task, and
    then pops jobs from the broker queue one at a time. On exit the sync task
    is cancelled and awaited before the pool and Redis client are closed.
    """
    config = load_config()
    setup_logging("broker_service", level=config.log_level, json_output=config.json_logs)

    pool = await get_pg_pool(config)
    rds = get_redis(config)

    # Initialize MinIO client for lake publishing
    from minio import Minio
    minio_client = Minio(
        config.minio.endpoint,
        access_key=config.minio.access_key,
        secret_key=config.minio.secret_key,
        secure=config.minio.secure,
    )
    # Ensure lakehouse bucket exists
    if not minio_client.bucket_exists(LAKEHOUSE_BUCKET):
        minio_client.make_bucket(LAKEHOUSE_BUCKET)

    # Determine mode — default to paper for safety (Req 8.1)
    mode = TradingMode.LIVE if config.broker.mode == "live" else TradingMode.PAPER
    if mode == TradingMode.LIVE:
        logger.warning("LIVE trading mode enabled — orders will be submitted to real broker")

    adapter = AlpacaBrokerAdapter(
        api_key=config.broker.api_key or "",
        api_secret=config.broker.api_secret or "",
        mode=mode,
        base_url=config.broker.base_url,
    )

    # Generate a stable account UUID from the API key
    account_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"alpaca-{config.broker.api_key or 'default'}"))

    # Register broker account on startup
    await register_broker_account(pool, account_uuid, adapter)

    # Start position sync in background
    sync_task = asyncio.create_task(
        position_sync_loop(adapter, pool, account_uuid, minio_client)
    )

    queue = queue_key(QUEUE_BROKER)
    logger.info("Broker service started (mode=%s)", mode.value)

    try:
        while True:
            # Idle while the pipeline is switched off.
            if not await is_pipeline_enabled(rds):
                await asyncio.sleep(2)
                continue
            result = await rds.lpop(queue)
            raw = str(result) if result else None
            if raw:
                try:
                    job = json.loads(raw)
                    await process_order_job(job, adapter, pool, account_uuid, rds, minio_client)
                except Exception:
                    # One bad job must not take the worker down.
                    logger.exception("Error processing broker job")
            else:
                await asyncio.sleep(2)
    finally:
        sync_task.cancel()
        # cancel() only requests cancellation; wait for the task to actually
        # finish so it is not still using the pool when the pool is closed.
        # return_exceptions=True absorbs the task's CancelledError (or any
        # error it had already failed with) instead of masking our own exit.
        await asyncio.gather(sync_task, return_exceptions=True)
        await pool.close()
        await rds.close()
|
|
|
|
|
|
# Start the service's event loop only when this module is run as a script.
if __name__ == "__main__":
    asyncio.run(main())
|