fix: add order status sync loop to broker adapter — pending orders now update to filled/cancelled
This commit is contained in:
@@ -752,6 +752,68 @@ async def process_order_job(
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def sync_order_statuses(
|
||||||
|
adapter: AlpacaBrokerAdapter,
|
||||||
|
pool: asyncpg.Pool,
|
||||||
|
minio_client: Any | None = None,
|
||||||
|
) -> None:
|
||||||
|
"""Sync pending order statuses from Alpaca to PostgreSQL.
|
||||||
|
|
||||||
|
Queries for orders still in 'pending' or 'submitted' state and checks
|
||||||
|
their current status via the broker API. Updates the local row when
|
||||||
|
the order has been filled, cancelled, or rejected.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
rows = await pool.fetch(
|
||||||
|
"SELECT id, broker_order_id, ticker, side, quantity "
|
||||||
|
"FROM orders WHERE status IN ('pending', 'submitted', 'accepted') "
|
||||||
|
"AND broker_order_id IS NOT NULL "
|
||||||
|
"AND submitted_at > NOW() - INTERVAL '24 hours'",
|
||||||
|
)
|
||||||
|
if not rows:
|
||||||
|
return
|
||||||
|
|
||||||
|
now = datetime.now(timezone.utc)
|
||||||
|
updated = 0
|
||||||
|
for row in rows:
|
||||||
|
try:
|
||||||
|
resp = await adapter.get_order_status(row["broker_order_id"])
|
||||||
|
new_status = resp.status.value
|
||||||
|
if new_status in ("pending", "submitted", "accepted"):
|
||||||
|
continue # still in-flight
|
||||||
|
|
||||||
|
update_fields: dict[str, Any] = {"status": new_status, "updated_at": now}
|
||||||
|
if resp.status == OrderStatus.FILLED:
|
||||||
|
update_fields["filled_at"] = now
|
||||||
|
update_fields["fill_price"] = resp.filled_avg_price
|
||||||
|
update_fields["fill_quantity"] = resp.filled_quantity
|
||||||
|
elif resp.status == OrderStatus.CANCELLED:
|
||||||
|
update_fields["cancelled_at"] = now
|
||||||
|
elif resp.status == OrderStatus.REJECTED:
|
||||||
|
update_fields["rejected_at"] = now
|
||||||
|
update_fields["rejection_reason"] = resp.error
|
||||||
|
|
||||||
|
set_clause = ", ".join(f"{k} = ${i+2}" for i, k in enumerate(update_fields))
|
||||||
|
await pool.execute(
|
||||||
|
f"UPDATE orders SET {set_clause} WHERE id = $1",
|
||||||
|
row["id"],
|
||||||
|
*update_fields.values(),
|
||||||
|
)
|
||||||
|
updated += 1
|
||||||
|
logger.info(
|
||||||
|
"Order %s (%s %s) status updated: %s -> %s",
|
||||||
|
row["id"], row["side"], row["ticker"],
|
||||||
|
"pending", new_status,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.debug("Could not sync order %s", row["id"], exc_info=True)
|
||||||
|
|
||||||
|
if updated:
|
||||||
|
logger.info("Synced %d order statuses from Alpaca", updated)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Order status sync failed: %s", e)
|
||||||
|
|
||||||
|
|
||||||
async def position_sync_loop(
|
async def position_sync_loop(
|
||||||
adapter: AlpacaBrokerAdapter,
|
adapter: AlpacaBrokerAdapter,
|
||||||
pool: asyncpg.Pool,
|
pool: asyncpg.Pool,
|
||||||
@@ -761,6 +823,7 @@ async def position_sync_loop(
|
|||||||
"""Periodically sync positions from Alpaca to PostgreSQL and lake."""
|
"""Periodically sync positions from Alpaca to PostgreSQL and lake."""
|
||||||
while True:
|
while True:
|
||||||
await sync_positions(adapter, pool, account_uuid, minio_client)
|
await sync_positions(adapter, pool, account_uuid, minio_client)
|
||||||
|
await sync_order_statuses(adapter, pool, minio_client)
|
||||||
await asyncio.sleep(POSITION_SYNC_INTERVAL)
|
await asyncio.sleep(POSITION_SYNC_INTERVAL)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user