Files
stonks-oracle/services/adapters/broker_service.py
T
Celes Renata 8c3c1aab43
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
fix: pipeline stop now halts all workers and flushes queues
Workers (ingestion, parser, extractor, aggregation, recommendation,
broker, lake-publisher) now check the pipeline:enabled Redis flag on
each loop iteration and sleep when disabled.

The toggle endpoint flushes all pipeline queues on disable so queued
jobs don't resume when workers eventually check. Broker/trading queues
are excluded from flush to avoid dropping in-flight orders.
2026-04-29 07:59:35 +00:00

947 lines
33 KiB
Python

"""Broker adapter service - standalone worker for sandbox order execution.
Runs the Alpaca broker adapter in sandbox (paper) mode, processing order
requests from the broker queue, evaluating them through the risk engine,
submitting to Alpaca's paper trading API, and persisting the full audit trail.
Also periodically syncs positions and account state from Alpaca.
Implements idempotent order submission keys and duplicate prevention:
- Deterministic idempotency key generation from job attributes
- Redis-based fast-path duplicate detection before broker submission
- PostgreSQL UNIQUE constraint on idempotency_key as durable fallback
Requirements: 2.4, 8.1, 8.3, 8.5
Design: Section 4.9 - Broker Adapter
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any
import asyncpg
import redis.asyncio as aioredis
from services.adapters.broker_adapter import (
AlpacaBrokerAdapter,
OrderRequest,
OrderResponse,
OrderSide,
OrderStatus,
OrderType,
TradingMode,
)
from services.lake_publisher.worker import (
LAKEHOUSE_BUCKET,
publish_positions_daily_batch,
publish_trade_fill,
publish_trade_order,
)
from services.risk.approval import (
ApprovalRequest,
compute_expiry,
create_approval_request,
requires_approval,
)
from services.risk.engine import (
AccountRiskState,
PortfolioRiskConfig,
ProposedOrder,
clamp_order_to_position_limits,
evaluate_order,
)
from services.shared.audit import (
audit_approval_requested,
audit_duplicate_prevented,
audit_order_filled,
audit_order_rejected,
audit_order_submitted,
audit_risk_evaluated,
)
from services.shared.config import load_config
from services.shared.db import get_pg_pool, get_redis
from services.shared.logging import setup_logging
from services.shared.metrics import (
ORDERS_CLAMPED,
ORDERS_DUPLICATES_PREVENTED,
ORDERS_FILLED,
ORDERS_REJECTED,
ORDERS_SUBMITTED,
POSITIONS_SYNCED,
RISK_CHECK_FAILURES,
RISK_EVALUATIONS_TOTAL,
)
from services.shared.redis_keys import QUEUE_BROKER, is_pipeline_enabled, queue_key
logger = logging.getLogger("broker_service")
# Pause between iterations of the background position/order-status sync loop.
POSITION_SYNC_INTERVAL = 60 # seconds
# Redis TTL for idempotency markers (24 hours)
ORDER_IDEMPOTENCY_TTL = 86400
# Prefix of the Redis keys that hold order idempotency markers.
ORDER_IDEMPOTENCY_PREFIX = "stonks:order_idempotency"
# ---------------------------------------------------------------------------
# DB persistence helpers
# ---------------------------------------------------------------------------
# Insert a broker account row (always active); when the id already exists,
# refresh its config and mode and mark it active again.
_UPSERT_BROKER_ACCOUNT = """
INSERT INTO broker_accounts (id, provider, account_id, mode, config, active)
VALUES ($1::uuid, $2, $3, $4, $5::jsonb, TRUE)
ON CONFLICT (id) DO UPDATE SET
config = EXCLUDED.config,
mode = EXCLUDED.mode,
active = TRUE
"""
# Insert an order row. On an idempotency_key conflict only the status,
# broker_order_id and fill columns of the existing row are overwritten (and
# updated_at is bumped); the remaining columns keep their stored values.
_INSERT_ORDER = """
INSERT INTO orders (
id, recommendation_id, broker_account_id, ticker, side, order_type,
quantity, limit_price, stop_price, status, idempotency_key,
broker_order_id, decision_trace, submitted_at, filled_at,
fill_price, fill_quantity
) VALUES (
$1::uuid, $2, $3::uuid, $4, $5, $6,
$7, $8, $9, $10, $11,
$12, $13::jsonb, $14, $15,
$16, $17
)
ON CONFLICT (idempotency_key) DO UPDATE SET
status = EXCLUDED.status,
broker_order_id = EXCLUDED.broker_order_id,
filled_at = EXCLUDED.filled_at,
fill_price = EXCLUDED.fill_price,
fill_quantity = EXCLUDED.fill_quantity,
updated_at = NOW()
"""
# Append one event (type, JSON payload, timestamp) to an order's event log.
_INSERT_ORDER_EVENT = """
INSERT INTO order_events (order_id, event_type, data, broker_timestamp)
VALUES ($1::uuid, $2, $3::jsonb, $4)
"""
# Store the outcome of a risk evaluation, including its individual checks.
_INSERT_RISK_EVALUATION = """
INSERT INTO risk_evaluations (id, recommendation_id, eligible, allowed_mode, rejection_reasons, risk_checks, evaluated_at)
VALUES ($1::uuid, $2::uuid, $3, $4, $5::jsonb, $6::jsonb, $7)
"""
# Insert or refresh the position row identified by (broker_account_id, ticker).
_UPSERT_POSITION = """
INSERT INTO positions (broker_account_id, ticker, quantity, avg_entry_price, current_price, unrealized_pnl, updated_at)
VALUES ($1::uuid, $2, $3, $4, $5, $6, $7)
ON CONFLICT (broker_account_id, ticker)
DO UPDATE SET
quantity = EXCLUDED.quantity,
avg_entry_price = EXCLUDED.avg_entry_price,
current_price = EXCLUDED.current_price,
unrealized_pnl = EXCLUDED.unrealized_pnl,
updated_at = EXCLUDED.updated_at
"""
# Fetch the most recently updated active risk configuration.
_LOAD_RISK_CONFIG = """
SELECT config FROM risk_configs WHERE active = TRUE ORDER BY updated_at DESC LIMIT 1
"""
# Fetch today's (CURRENT_DATE) risk snapshot for one account.
_LOAD_DAILY_SNAPSHOT = """
SELECT portfolio_value, daily_pnl, daily_trade_count, positions_by_sector
FROM daily_risk_snapshots
WHERE account_id = $1 AND snapshot_date = CURRENT_DATE
LIMIT 1
"""
# Look up an existing order by its idempotency key (duplicate detection).
_CHECK_ORDER_BY_IDEMPOTENCY_KEY = """
SELECT id, status, broker_order_id FROM orders
WHERE idempotency_key = $1
LIMIT 1
"""
# ---------------------------------------------------------------------------
# Idempotency helpers (Requirement 8.5)
# ---------------------------------------------------------------------------
def generate_idempotency_key(job: dict[str, Any]) -> str:
    """Return the idempotency key for a broker queue job.

    A truthy ``idempotency_key`` already present on the job is returned as a
    string. Otherwise the key is the first 40 hex characters of the SHA-256
    digest of the job's recommendation_id, ticker, side, quantity,
    order_type, limit_price and stop_price joined with ``|``, so a replayed
    message with the same content maps to the same key.
    """
    preset = job.get("idempotency_key")
    if preset:
        return str(preset)

    # (field name, value used when the field is absent) in fingerprint order.
    field_defaults = (
        ("recommendation_id", ""),
        ("ticker", ""),
        ("side", "buy"),
        ("quantity", 0),
        ("order_type", "market"),
        ("limit_price", ""),
        ("stop_price", ""),
    )
    fingerprint = "|".join(
        str(job.get(name, fallback)) for name, fallback in field_defaults
    )
    digest = hashlib.sha256(fingerprint.encode())
    return digest.hexdigest()[:40]
def _redis_idempotency_key(idempotency_key: str) -> str:
    """Return the namespaced Redis key under which an order's marker lives."""
    return "{}:{}".format(ORDER_IDEMPOTENCY_PREFIX, idempotency_key)
async def check_idempotency_redis(
    rds: aioredis.Redis,
    idempotency_key: str,
) -> str | None:
    """Fast-path: check Redis for a previously processed idempotency key.

    Args:
        rds: Async Redis client.
        idempotency_key: Order idempotency key (without the Redis prefix).

    Returns:
        The order id stored in the marker, or None when no marker exists
        (or it is empty).
    """
    cached = await rds.get(_redis_idempotency_key(idempotency_key))
    if not cached:
        return None
    # A client created without decode_responses returns bytes; str() on bytes
    # would produce the repr ("b'...'") rather than the stored order id.
    if isinstance(cached, (bytes, bytearray)):
        return cached.decode("utf-8")
    return str(cached)
async def check_idempotency_db(
    pool: asyncpg.Pool,
    idempotency_key: str,
) -> dict[str, Any] | None:
    """Durable fallback: look the idempotency key up in the orders table.

    Returns a dict with string values for ``id``, ``status`` and
    ``broker_order_id`` (empty string when the column is null/empty), or None
    when no order carries this key.
    """
    record = await pool.fetchrow(_CHECK_ORDER_BY_IDEMPOTENCY_KEY, idempotency_key)
    if not record:
        return None

    found_id = str(record["id"])
    found_status = str(record["status"])
    found_broker_id = str(record["broker_order_id"] or "")
    return {
        "id": found_id,
        "status": found_status,
        "broker_order_id": found_broker_id,
    }
async def mark_idempotency_redis(
    rds: aioredis.Redis,
    idempotency_key: str,
    order_id: str,
) -> None:
    """Record in Redis that ``idempotency_key`` was handled as ``order_id``.

    The marker expires after ORDER_IDEMPOTENCY_TTL seconds.
    """
    await rds.set(
        _redis_idempotency_key(idempotency_key),
        order_id,
        ex=ORDER_IDEMPOTENCY_TTL,
    )
# ---------------------------------------------------------------------------
# Core service logic
# ---------------------------------------------------------------------------
def build_order_request(job: dict[str, Any]) -> OrderRequest:
    """Translate a broker queue job payload into an OrderRequest.

    Any side other than ``"sell"`` is treated as a buy, an unknown or missing
    order type falls back to a market order, and time-in-force defaults to
    ``"day"``. ``job["ticker"]`` is required (KeyError when absent).
    """
    if job.get("side", "buy") == "sell":
        direction = OrderSide.SELL
    else:
        direction = OrderSide.BUY

    requested_type = job.get("order_type", "market")
    known_types = {
        "market": OrderType.MARKET,
        "limit": OrderType.LIMIT,
        "stop": OrderType.STOP,
        "stop_limit": OrderType.STOP_LIMIT,
    }

    # Dict literals evaluate in order, so field extraction happens in the
    # same sequence as the keyword arguments are listed.
    fields = {
        "ticker": job["ticker"],
        "side": direction,
        "quantity": float(job.get("quantity", 0)),
        "order_type": known_types.get(requested_type, OrderType.MARKET),
        "limit_price": job.get("limit_price"),
        "stop_price": job.get("stop_price"),
        "time_in_force": job.get("time_in_force", "day"),
        "idempotency_key": generate_idempotency_key(job),
    }
    return OrderRequest(**fields)
def build_proposed_order(job: dict[str, Any]) -> ProposedOrder:
    """Translate a broker queue job into the risk engine's ProposedOrder.

    ``job["ticker"]`` is required; sector defaults to an empty string, the
    action to ``"buy"`` and the numeric fields to 0.
    """
    recommendation = job.get("recommendation_id")
    symbol = job["ticker"]
    sector_name = job.get("sector", "")
    direction = job.get("side", "buy")
    shares = float(job.get("quantity", 0))
    notional = float(job.get("estimated_value", 0))
    conviction = float(job.get("confidence", 0))
    return ProposedOrder(
        recommendation_id=recommendation,
        ticker=symbol,
        sector=sector_name,
        action=direction,
        quantity=shares,
        estimated_value=notional,
        confidence=conviction,
    )
async def load_risk_config(pool: asyncpg.Pool) -> PortfolioRiskConfig:
    """Return the active risk configuration stored in the database.

    Falls back to a default-constructed PortfolioRiskConfig when no active
    row exists or its config column is empty.
    """
    record = await pool.fetchrow(_LOAD_RISK_CONFIG)
    if not (record and record["config"]):
        return PortfolioRiskConfig()

    stored = record["config"]
    # The column may arrive already decoded (dict) or as a JSON string.
    if not isinstance(stored, dict):
        stored = json.loads(stored)
    return PortfolioRiskConfig.from_db_json(stored)
async def _estimate_share_price(
    adapter: AlpacaBrokerAdapter,
    ticker: str,
) -> float:
    """Estimate the current per-share price of ``ticker``.

    Uses the current price of the first matching position reported by the
    adapter that has a positive price. Returns 0.0 when the ticker is not
    held or the position lookup fails (the failure is logged at debug level).
    """
    try:
        held = await adapter.get_positions()
        priced = (
            holding.current_price
            for holding in held
            if holding.ticker == ticker and holding.current_price > 0
        )
        return next(priced, 0.0)
    except Exception as exc:
        logger.debug("Could not fetch positions for price estimate: %s", exc)
    return 0.0
async def load_account_risk_state(
    pool: asyncpg.Pool,
    adapter: AlpacaBrokerAdapter,
    account_uuid: str,
) -> AccountRiskState:
    """Assemble the AccountRiskState used by the risk engine.

    Live balances and positions come from the broker adapter; a failure of
    either call is logged as a warning and leaves the corresponding fields at
    their defaults. Daily P&L, trade count and sector exposure are then taken
    from today's snapshot row, when one exists.
    """
    state = AccountRiskState(account_id=account_uuid)

    # Live balances.
    try:
        account = await adapter.get_account()
        state.portfolio_value = account.portfolio_value
        state.cash = account.cash
        state.buying_power = account.buying_power
    except Exception as exc:
        logger.warning("Failed to fetch account from Alpaca: %s", exc)

    # Live per-symbol exposure.
    try:
        held = await adapter.get_positions()
        for holding in held:
            state.positions_by_symbol[holding.ticker] = holding.market_value
        state.open_position_count = len(held)
    except Exception as exc:
        logger.warning("Failed to fetch positions from Alpaca: %s", exc)

    # Daily counters from the database snapshot.
    snapshot = await pool.fetchrow(_LOAD_DAILY_SNAPSHOT, account_uuid)
    if not snapshot:
        return state

    state.daily_pnl = float(snapshot["daily_pnl"] or 0)
    state.daily_trade_count = int(snapshot["daily_trade_count"] or 0)
    by_sector = snapshot["positions_by_sector"]
    if by_sector:
        if isinstance(by_sector, dict):
            state.positions_by_sector = by_sector
        else:
            state.positions_by_sector = json.loads(by_sector)
    return state
async def persist_order(
    pool: asyncpg.Pool,
    order_id: str,
    order: OrderRequest,
    resp: OrderResponse,
    account_uuid: str,
    risk_eval: dict[str, Any],
    recommendation_id: str | None = None,
) -> None:
    """Write an order and its events to PostgreSQL in one transaction.

    The order row stores a decision trace (risk evaluation, request and
    broker response). A "submitted" event is always recorded, followed by a
    "fill" or "rejected" event when the broker response has that status.
    """
    recorded_at = datetime.now(timezone.utc)
    fill_time = recorded_at if resp.status == OrderStatus.FILLED else None
    trace = {
        "risk_evaluation": risk_eval,
        "order_request": order.to_dict(),
        "broker_response": resp.to_dict(),
    }

    async with pool.acquire() as conn, conn.transaction():
        order_row = (
            order_id,
            recommendation_id,
            account_uuid,
            order.ticker,
            order.side.value,
            order.order_type.value,
            order.quantity,
            order.limit_price,
            order.stop_price,
            resp.status.value,
            order.idempotency_key,
            resp.broker_order_id,
            json.dumps(trace),
            resp.submitted_at or recorded_at,
            fill_time,
            resp.filled_avg_price,
            resp.filled_quantity,
        )
        await conn.execute(_INSERT_ORDER, *order_row)

        # Every persisted order gets a "submitted" event.
        await conn.execute(
            _INSERT_ORDER_EVENT,
            order_id,
            "submitted",
            json.dumps({"ticker": order.ticker, "side": order.side.value}),
            recorded_at,
        )

        # Terminal outcome, if the broker response already carries one.
        outcome = None
        if resp.status == OrderStatus.FILLED:
            outcome = (
                "fill",
                {
                    "fill_price": resp.filled_avg_price,
                    "fill_qty": resp.filled_quantity,
                },
            )
        elif resp.status == OrderStatus.REJECTED:
            outcome = ("rejected", {"error": resp.error})

        if outcome is not None:
            event_name, event_payload = outcome
            await conn.execute(
                _INSERT_ORDER_EVENT,
                order_id,
                event_name,
                json.dumps(event_payload),
                recorded_at,
            )
async def sync_positions(
    adapter: AlpacaBrokerAdapter,
    pool: asyncpg.Pool,
    account_uuid: str,
    minio_client: Any | None = None,
) -> None:
    """Copy the broker's current positions into PostgreSQL and the lake.

    Each position is upserted individually. When a MinIO client is given and
    at least one position exists, a snapshot batch is also published; a
    publishing failure is logged as a warning only. Any other failure is
    logged as an error and swallowed.
    """
    synced_at = datetime.now(timezone.utc)
    try:
        holdings = await adapter.get_positions()
        async with pool.acquire() as conn:
            for holding in holdings:
                await conn.execute(
                    _UPSERT_POSITION,
                    account_uuid,
                    holding.ticker,
                    holding.quantity,
                    holding.avg_entry_price,
                    holding.current_price,
                    holding.unrealized_pnl,
                    synced_at,
                )
        logger.info("Synced %d positions from Alpaca", len(holdings))
        POSITIONS_SYNCED.inc()

        # Nothing to publish without a lake client or without positions.
        if minio_client is None or not holdings:
            return

        try:
            snapshot_rows = []
            for holding in holdings:
                snapshot_rows.append({
                    "ticker": holding.ticker,
                    "quantity": holding.quantity,
                    "avg_entry_price": holding.avg_entry_price,
                    "close_price": holding.current_price,
                    "unrealized_pnl": holding.unrealized_pnl,
                })
            publish_positions_daily_batch(
                minio_client, snapshot_rows, account_uuid, synced_at,
            )
        except Exception as exc:
            logger.warning("Failed to publish positions to lake: %s", exc)
    except Exception as exc:
        logger.error("Position sync failed: %s", exc)
async def register_broker_account(
    pool: asyncpg.Pool,
    account_uuid: str,
    adapter: AlpacaBrokerAdapter,
) -> None:
    """Upsert the broker account row from the adapter's live account data.

    The stored config holds the provider name plus buying power, cash and
    portfolio value. Failures are logged as errors and not re-raised.
    """
    try:
        account = await adapter.get_account()
        balances = {
            "provider": "alpaca",
            "buying_power": account.buying_power,
            "cash": account.cash,
            "portfolio_value": account.portfolio_value,
        }
        config_json = json.dumps(balances)
        # Fall back to our own UUID when the broker reports no account id.
        external_id = account.account_id or account_uuid
        await pool.execute(
            _UPSERT_BROKER_ACCOUNT,
            account_uuid,
            "alpaca",
            external_id,
            adapter.mode.value,
            config_json,
        )
        logger.info(
            "Registered Alpaca account: id=%s mode=%s portfolio=%.2f",
            account.account_id, adapter.mode.value, account.portfolio_value,
        )
    except Exception as exc:
        logger.error("Failed to register broker account: %s", exc)
async def process_order_job(
    job: dict[str, Any],
    adapter: AlpacaBrokerAdapter,
    pool: asyncpg.Pool,
    account_uuid: str,
    rds: aioredis.Redis | None = None,
    minio_client: Any | None = None,
) -> None:
    """Process a single order job from the broker queue.

    Steps:
    1. Generate the idempotency key and pin it on the job.
    2. Check Redis, then the database, for a duplicate (Req 8.5).
    3. Build the proposed order, clamp it to position limits and run the
       risk evaluation.
    4. Rejected orders are persisted, published, audited and marked; orders
       needing operator approval are parked without being marked.
    5. Otherwise submit the order through the adapter, persist it, publish
       it to the lake and set the Redis idempotency marker.

    Args:
        job: Queue payload. It is mutated in place: the idempotency key is
            pinned, and quantity / estimated_value are updated when derived
            or clamped.
        adapter: Broker adapter used for account data and order submission.
        pool: PostgreSQL connection pool.
        account_uuid: Id of the broker account row the order belongs to.
        rds: Optional Redis client for the idempotency fast path.
        minio_client: Optional MinIO client; lake publishing is skipped
            when it is None.
    """
    ticker = job.get("ticker", "???")
    order_id = str(uuid.uuid4())
    idempotency_key = generate_idempotency_key(job)
    # Pin the key on the job. Clamping below may change job["quantity"], and
    # build_order_request() re-derives the key from the job; without pinning,
    # the key persisted with the order would differ from the one used for
    # duplicate detection and for the Redis marker.
    job["idempotency_key"] = idempotency_key

    # --- Duplicate prevention (Requirement 8.5) ---
    # Fast path: Redis check
    if rds is not None:
        existing_order_id = await check_idempotency_redis(rds, idempotency_key)
        if existing_order_id:
            logger.info(
                "Duplicate order detected (redis) for %s key=%s existing=%s",
                ticker, idempotency_key[:16], existing_order_id,
            )
            ORDERS_DUPLICATES_PREVENTED.labels(detected_via="redis").inc()
            await audit_duplicate_prevented(
                pool, existing_order_id, ticker, idempotency_key, detected_via="redis",
            )
            return

    # Durable fallback: DB check
    existing = await check_idempotency_db(pool, idempotency_key)
    if existing:
        logger.info(
            "Duplicate order detected (db) for %s key=%s existing=%s status=%s",
            ticker, idempotency_key[:16], existing["id"], existing["status"],
        )
        ORDERS_DUPLICATES_PREVENTED.labels(detected_via="db").inc()
        await audit_duplicate_prevented(
            pool, existing["id"], ticker, idempotency_key, detected_via="db",
        )
        # Warm Redis cache for future fast-path hits
        if rds is not None:
            await mark_idempotency_redis(rds, idempotency_key, existing["id"])
        return

    # Risk evaluation
    risk_config = await load_risk_config(pool)
    risk_state = await load_account_risk_state(pool, adapter, account_uuid)
    proposed = build_proposed_order(job)

    # If estimated_value is missing, derive it from the price of an existing
    # broker position so that position-limit clamping can work.
    if proposed.estimated_value <= 0 and proposed.quantity > 0:
        price_per_share = await _estimate_share_price(adapter, proposed.ticker)
        if price_per_share > 0:
            proposed = proposed.model_copy(update={
                "estimated_value": proposed.quantity * price_per_share,
            })
            job["estimated_value"] = proposed.estimated_value

    # Auto-clamp buy orders to fit within position limits instead of
    # hard-rejecting. If the clamped quantity is zero the normal risk
    # evaluation will still reject the order with a clear reason.
    original_qty = proposed.quantity
    proposed = clamp_order_to_position_limits(proposed, risk_config, risk_state)
    if proposed.quantity != original_qty:
        logger.info(
            "Order for %s clamped from %.0f to %.0f shares "
            "(value %.2f -> %.2f) to fit position limits",
            ticker, original_qty, proposed.quantity,
            float(job.get("estimated_value", 0)), proposed.estimated_value,
        )
        ORDERS_CLAMPED.inc()
        # Update the job dict so build_order_request picks up the clamped qty
        job["quantity"] = proposed.quantity
        job["estimated_value"] = proposed.estimated_value

    evaluation = evaluate_order(proposed, risk_config, risk_state)
    risk_eval_dict = {
        "evaluation_id": evaluation.evaluation_id,
        "eligible": evaluation.eligible,
        "allowed_mode": evaluation.allowed_mode.value,
        "rejection_reasons": evaluation.rejection_reasons,
        "checks": [c.model_dump(mode="json") for c in evaluation.checks],
    }

    # Persist risk evaluation (best effort: a failure is only logged)
    rec_id = job.get("recommendation_id")
    try:
        await pool.execute(
            _INSERT_RISK_EVALUATION,
            evaluation.evaluation_id,
            rec_id,
            evaluation.eligible,
            evaluation.allowed_mode.value,
            json.dumps(evaluation.rejection_reasons),
            json.dumps(risk_eval_dict["checks"]),
            evaluation.evaluated_at,
        )
    except Exception as e:
        logger.warning("Failed to persist risk evaluation: %s", e)

    # Audit: risk evaluation result
    await audit_risk_evaluated(
        pool,
        evaluation_id=evaluation.evaluation_id,
        recommendation_id=rec_id,
        ticker=ticker,
        eligible=evaluation.eligible,
        allowed_mode=evaluation.allowed_mode.value,
        rejection_reasons=evaluation.rejection_reasons,
        check_count=len(evaluation.checks),
    )

    if not evaluation.eligible:
        RISK_EVALUATIONS_TOTAL.labels(result="rejected").inc()
        for check in evaluation.checks:
            if check.result.value == "fail":
                RISK_CHECK_FAILURES.labels(check_name=check.check_name).inc()
        ORDERS_REJECTED.labels(reason_category="risk_engine").inc()
        logger.info(
            "Order rejected by risk engine for %s: %s",
            ticker, evaluation.rejection_reasons,
        )
        # Persist the rejected order for audit
        order_req = build_order_request(job)
        rejected_resp = OrderResponse(
            broker_order_id="",
            status=OrderStatus.REJECTED,
            ticker=ticker,
            side=OrderSide.SELL if job.get("side") == "sell" else OrderSide.BUY,
            quantity=float(job.get("quantity", 0)),
            error=f"Risk rejected: {'; '.join(evaluation.rejection_reasons)}",
        )
        await persist_order(
            pool, order_id, order_req, rejected_resp,
            account_uuid, risk_eval_dict, rec_id,
        )
        # Publish rejected order fact to analytical lake
        if minio_client is not None:
            try:
                publish_trade_order(
                    minio_client, order_id, ticker,
                    side=job.get("side", "buy"),
                    order_type=job.get("order_type", "market"),
                    quantity=float(job.get("quantity", 0)),
                    limit_price=job.get("limit_price"),
                    status="rejected",
                    broker_account=account_uuid,
                    submitted_at=datetime.now(timezone.utc),
                )
            except Exception as e:
                logger.warning("Failed to publish rejected order to lake: %s", e)
        # Audit: order rejected by risk engine
        await audit_order_rejected(
            pool, order_id, ticker,
            reason=f"Risk rejected: {'; '.join(evaluation.rejection_reasons)}",
            source="risk_engine",
        )
        # Mark idempotency even for rejected orders to prevent reprocessing
        if rds is not None:
            await mark_idempotency_redis(rds, idempotency_key, order_id)
        return

    # --- Operator approval gate (Requirement 8.2) ---
    if requires_approval(risk_config, evaluation.allowed_mode):
        expiry = compute_expiry(risk_config)
        approval_req = ApprovalRequest(
            order_job=job,
            recommendation_id=rec_id,
            ticker=ticker,
            side=job.get("side", "buy"),
            quantity=float(job.get("quantity", 0)),
            estimated_value=float(job.get("estimated_value", 0)),
            risk_evaluation_id=evaluation.evaluation_id,
            expires_at=expiry,
        )
        try:
            await create_approval_request(pool, approval_req)
            logger.info(
                "Order for %s held for operator approval (id=%s, expires=%s)",
                ticker, approval_req.approval_id, expiry.isoformat(),
            )
            await audit_approval_requested(
                pool,
                approval_id=approval_req.approval_id,
                ticker=ticker,
                side=approval_req.side,
                quantity=approval_req.quantity,
                estimated_value=approval_req.estimated_value,
                recommendation_id=rec_id,
                expires_at=expiry.isoformat(),
            )
        except Exception as e:
            logger.error("Failed to create approval request for %s: %s", ticker, e)
        # Do NOT mark idempotency — the job will be re-submitted after approval
        return

    # Submit to Alpaca
    order_req = build_order_request(job)
    RISK_EVALUATIONS_TOTAL.labels(result="passed").inc()

    # Audit: order submitted to broker
    await audit_order_submitted(
        pool,
        order_id=order_id,
        ticker=ticker,
        side=order_req.side.value,
        quantity=order_req.quantity,
        order_type=order_req.order_type.value,
        idempotency_key=order_req.idempotency_key,
        recommendation_id=rec_id,
        evaluation_id=evaluation.evaluation_id,
    )
    resp = await adapter.submit_order(order_req)
    await persist_order(
        pool, order_id, order_req, resp,
        account_uuid, risk_eval_dict, rec_id,
    )

    # Publish order fact to analytical lake
    if minio_client is not None:
        try:
            publish_trade_order(
                minio_client, order_id, ticker,
                side=order_req.side.value,
                order_type=order_req.order_type.value,
                quantity=order_req.quantity,
                limit_price=order_req.limit_price,
                status=resp.status.value,
                broker_account=account_uuid,
                submitted_at=resp.submitted_at or datetime.now(timezone.utc),
            )
        except Exception as e:
            logger.warning("Failed to publish order to lake: %s", e)
        # Publish fill fact if the order was filled
        if resp.status == OrderStatus.FILLED and resp.filled_avg_price is not None:
            try:
                fill_id = str(uuid.uuid4())
                publish_trade_fill(
                    minio_client, fill_id, order_id, ticker,
                    side=order_req.side.value,
                    fill_price=resp.filled_avg_price,
                    fill_quantity=resp.filled_quantity,
                    broker_account=account_uuid,
                    filled_at=datetime.now(timezone.utc),
                )
            except Exception as e:
                logger.warning("Failed to publish fill to lake: %s", e)

    # Mark idempotency after successful persistence
    if rds is not None:
        await mark_idempotency_redis(rds, idempotency_key, order_id)

    if resp.ok:
        mode = "paper" if adapter.mode == TradingMode.PAPER else "live"
        ORDERS_SUBMITTED.labels(
            side=order_req.side.value,
            order_type=order_req.order_type.value,
            mode=mode,
        ).inc()
        logger.info(
            "Order submitted to Alpaca: %s %s %.0f %s @ %s | broker_id=%s",
            resp.status.value, order_req.side.value, order_req.quantity,
            ticker, resp.filled_avg_price, resp.broker_order_id,
        )
        # Audit: order filled
        if resp.status == OrderStatus.FILLED:
            ORDERS_FILLED.labels(side=order_req.side.value).inc()
            await audit_order_filled(
                pool, order_id, ticker,
                side=order_req.side.value,
                fill_quantity=resp.filled_quantity,
                fill_price=resp.filled_avg_price,
                broker_order_id=resp.broker_order_id,
            )
    else:
        ORDERS_REJECTED.labels(reason_category="broker").inc()
        logger.warning(
            "Order failed for %s: %s (status=%s)",
            ticker, resp.error, resp.status.value,
        )
        # Audit: order rejected by broker
        await audit_order_rejected(
            pool, order_id, ticker,
            reason=resp.error or f"Broker status: {resp.status.value}",
            source="broker",
        )
async def sync_order_statuses(
    adapter: AlpacaBrokerAdapter,
    pool: asyncpg.Pool,
    minio_client: Any | None = None,
) -> None:
    """Sync in-flight order statuses from Alpaca to PostgreSQL.

    Loads orders submitted within the last 24 hours that are still
    'pending', 'submitted' or 'accepted' and have a broker order id, asks the
    broker for each one's current status, and updates the local row once the
    order has left those in-flight states.

    Args:
        adapter: Broker adapter used to query order status.
        pool: PostgreSQL connection pool.
        minio_client: Accepted for signature parity with the other sync
            helpers; not used by this function.

    A failure for a single order is logged at debug level and skipped; a
    failure of the whole sync is logged as an error. Nothing is raised.
    """
    try:
        rows = await pool.fetch(
            "SELECT id, broker_order_id, ticker, side, quantity, status "
            "FROM orders WHERE status IN ('pending', 'submitted', 'accepted') "
            "AND broker_order_id IS NOT NULL "
            "AND submitted_at > NOW() - INTERVAL '24 hours'",
        )
        if not rows:
            return

        now = datetime.now(timezone.utc)
        updated = 0
        for row in rows:
            try:
                resp = await adapter.get_order_status(row["broker_order_id"])
                new_status = resp.status.value
                if new_status in ("pending", "submitted", "accepted"):
                    continue  # still in-flight

                # NOTE(review): assumes the orders table has cancelled_at,
                # rejected_at and rejection_reason columns — confirm against
                # the schema.
                update_fields: dict[str, Any] = {"status": new_status, "updated_at": now}
                if resp.status == OrderStatus.FILLED:
                    update_fields["filled_at"] = now
                    update_fields["fill_price"] = resp.filled_avg_price
                    update_fields["fill_quantity"] = resp.filled_quantity
                elif resp.status == OrderStatus.CANCELLED:
                    update_fields["cancelled_at"] = now
                elif resp.status == OrderStatus.REJECTED:
                    update_fields["rejected_at"] = now
                    update_fields["rejection_reason"] = resp.error

                # Column names come only from the fixed keys above; values are
                # bound as parameters ($1 is the order id, columns start at $2).
                set_clause = ", ".join(
                    f"{column} = ${position}"
                    for position, column in enumerate(update_fields, start=2)
                )
                await pool.execute(
                    f"UPDATE orders SET {set_clause} WHERE id = $1",
                    row["id"],
                    *update_fields.values(),
                )
                updated += 1
                # Report the status the row actually had, not a fixed label.
                logger.info(
                    "Order %s (%s %s) status updated: %s -> %s",
                    row["id"], row["side"], row["ticker"],
                    row["status"], new_status,
                )
            except Exception:
                logger.debug("Could not sync order %s", row["id"], exc_info=True)

        if updated:
            logger.info("Synced %d order statuses from Alpaca", updated)
    except Exception as e:
        logger.error("Order status sync failed: %s", e)
async def position_sync_loop(
    adapter: AlpacaBrokerAdapter,
    pool: asyncpg.Pool,
    account_uuid: str,
    minio_client: Any | None = None,
) -> None:
    """Run the position sync and the order-status sync forever.

    Each cycle syncs positions, then order statuses, then sleeps for
    POSITION_SYNC_INTERVAL seconds. The loop only ends when the task running
    it is cancelled.
    """
    while True:
        # Positions first, then the status of in-flight orders.
        await sync_positions(adapter, pool, account_uuid, minio_client)
        await sync_order_statuses(adapter, pool, minio_client)
        await asyncio.sleep(POSITION_SYNC_INTERVAL)
async def main() -> None:
    """Start the broker service and consume the broker queue until stopped.

    Sets up logging, PostgreSQL, Redis and MinIO, creates the broker adapter
    (paper mode unless the config says "live"), registers the broker account,
    starts the background sync loop and then polls the broker queue. While
    the pipeline flag is disabled, or the queue is empty, the loop sleeps for
    two seconds. An error in a single job is logged and does not stop the
    worker. On exit the sync task is cancelled and awaited, then the
    PostgreSQL pool and the Redis client are closed.
    """
    config = load_config()
    setup_logging("broker_service", level=config.log_level, json_output=config.json_logs)
    pool = await get_pg_pool(config)
    rds = get_redis(config)

    # Initialize MinIO client for lake publishing
    from minio import Minio
    minio_client = Minio(
        config.minio.endpoint,
        access_key=config.minio.access_key,
        secret_key=config.minio.secret_key,
        secure=config.minio.secure,
    )
    # Ensure lakehouse bucket exists
    if not minio_client.bucket_exists(LAKEHOUSE_BUCKET):
        minio_client.make_bucket(LAKEHOUSE_BUCKET)

    # Determine mode — default to paper for safety (Req 8.1)
    mode = TradingMode.LIVE if config.broker.mode == "live" else TradingMode.PAPER
    if mode == TradingMode.LIVE:
        logger.warning("LIVE trading mode enabled — orders will be submitted to real broker")
    adapter = AlpacaBrokerAdapter(
        api_key=config.broker.api_key or "",
        api_secret=config.broker.api_secret or "",
        mode=mode,
        base_url=config.broker.base_url,
    )

    # Generate a stable account UUID from the API key
    account_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"alpaca-{config.broker.api_key or 'default'}"))

    # Register broker account on startup
    await register_broker_account(pool, account_uuid, adapter)

    # Start position sync in background
    sync_task = asyncio.create_task(
        position_sync_loop(adapter, pool, account_uuid, minio_client)
    )
    queue = queue_key(QUEUE_BROKER)
    logger.info("Broker service started (mode=%s)", mode.value)
    try:
        while True:
            if not await is_pipeline_enabled(rds):
                await asyncio.sleep(2)
                continue
            payload = await rds.lpop(queue)
            if not payload:
                await asyncio.sleep(2)
                continue
            try:
                # json.loads accepts str and bytes alike. Calling str() on a
                # bytes reply would yield its repr ("b'...'") and make every
                # job unparseable when the client does not decode responses.
                raw = payload if isinstance(payload, (str, bytes, bytearray)) else str(payload)
                job = json.loads(raw)
                await process_order_job(job, adapter, pool, account_uuid, rds, minio_client)
            except Exception:
                logger.exception("Error processing broker job")
    finally:
        sync_task.cancel()
        # Let the cancelled task unwind before the pool it uses is closed;
        # return_exceptions=True absorbs its CancelledError.
        await asyncio.gather(sync_task, return_exceptions=True)
        await pool.close()
        await rds.close()
# Script entry point: run the async main() when executed as a program.
if __name__ == "__main__":
    asyncio.run(main())