From bb40a3cb8eae22f06f7844ea6d424ecbd4473550 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Wed, 29 Apr 2026 12:02:57 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20position=20sync=20now=20reconciles=20?= =?UTF-8?q?=E2=80=94=20removes=20positions=20broker=20no=20longer=20holds?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The sync_positions loop only upserted positions from Alpaca but never deleted DB rows for positions that were closed/liquidated on the broker side. After a paper reset, the next sync would not remove the stale positions because they simply weren't in Alpaca's response anymore. Now performs full reconciliation: after upserting what Alpaca reports, deletes any DB positions for the account that Alpaca no longer holds. --- services/adapters/broker_service.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/services/adapters/broker_service.py b/services/adapters/broker_service.py index 624a324..6421dd7 100644 --- a/services/adapters/broker_service.py +++ b/services/adapters/broker_service.py @@ -428,10 +428,16 @@ async def sync_positions( account_uuid: str, minio_client: Any | None = None, ) -> None: - """Sync current positions from Alpaca to PostgreSQL and publish to lake.""" + """Sync current positions from Alpaca to PostgreSQL and publish to lake. + + Performs a full reconciliation: upserts positions that Alpaca reports, + then removes any DB positions that Alpaca no longer holds (e.g. after + a paper reset or full liquidation). + """ now = datetime.now(timezone.utc) try: positions = await adapter.get_positions() + broker_tickers = {pos.ticker for pos in positions} async with pool.acquire() as conn: for pos in positions: await conn.execute( @@ -444,7 +450,20 @@ async def sync_positions( pos.unrealized_pnl, now, ) - logger.info("Synced %d positions from Alpaca", len(positions)) + # Remove positions that the broker no longer reports (closed/liquidated) + if broker_tickers: + await conn.execute( + "DELETE FROM positions WHERE broker_account_id = $1::uuid AND ticker != ALL($2::varchar[])", + account_uuid, + list(broker_tickers), + ) + else: + # Broker reports zero positions — clear all local positions for this account + await conn.execute( + "DELETE FROM positions WHERE broker_account_id = $1::uuid", + account_uuid, + ) + logger.info("Synced %d positions from Alpaca (reconciled)", len(positions)) POSITIONS_SYNCED.inc() # Publish positions snapshot to analytical lake