diff --git a/services/adapters/broker_service.py b/services/adapters/broker_service.py index 0b0cf4a..6fdad31 100644 --- a/services/adapters/broker_service.py +++ b/services/adapters/broker_service.py @@ -752,6 +752,68 @@ async def process_order_job( +async def sync_order_statuses( + adapter: AlpacaBrokerAdapter, + pool: asyncpg.Pool, + minio_client: Any | None = None, +) -> None: + """Sync pending order statuses from Alpaca to PostgreSQL. + + Queries for orders still in 'pending' or 'submitted' state and checks + their current status via the broker API. Updates the local row when + the order has been filled, cancelled, or rejected. + """ + try: + rows = await pool.fetch( + "SELECT id, broker_order_id, ticker, side, quantity " + "FROM orders WHERE status IN ('pending', 'submitted', 'accepted') " + "AND broker_order_id IS NOT NULL " + "AND submitted_at > NOW() - INTERVAL '24 hours'", + ) + if not rows: + return + + now = datetime.now(timezone.utc) + updated = 0 + for row in rows: + try: + resp = await adapter.get_order_status(row["broker_order_id"]) + new_status = resp.status.value + if new_status in ("pending", "submitted", "accepted"): + continue # still in-flight + + update_fields: dict[str, Any] = {"status": new_status, "updated_at": now} + if resp.status == OrderStatus.FILLED: + update_fields["filled_at"] = now + update_fields["fill_price"] = resp.filled_avg_price + update_fields["fill_quantity"] = resp.filled_quantity + elif resp.status == OrderStatus.CANCELLED: + update_fields["cancelled_at"] = now + elif resp.status == OrderStatus.REJECTED: + update_fields["rejected_at"] = now + update_fields["rejection_reason"] = resp.error + + set_clause = ", ".join(f"{k} = ${i+2}" for i, k in enumerate(update_fields)) + await pool.execute( + f"UPDATE orders SET {set_clause} WHERE id = $1", + row["id"], + *update_fields.values(), + ) + updated += 1 + logger.info( + "Order %s (%s %s) status updated: %s -> %s", + row["id"], row["side"], row["ticker"], + "pending", new_status, + ) + except Exception: + logger.debug("Could not sync order %s", row["id"], exc_info=True) + + if updated: + logger.info("Synced %d order statuses from Alpaca", updated) + except Exception as e: + logger.error("Order status sync failed: %s", e) + + async def position_sync_loop( adapter: AlpacaBrokerAdapter, pool: asyncpg.Pool, @@ -761,6 +823,7 @@ async def position_sync_loop( """Periodically sync positions from Alpaca to PostgreSQL and lake.""" while True: await sync_positions(adapter, pool, account_uuid, minio_client) + await sync_order_statuses(adapter, pool, minio_client) await asyncio.sleep(POSITION_SYNC_INTERVAL)