From f83577480f5c288b24624bb44c6a240f0934a4f4 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Thu, 16 Apr 2026 17:45:25 +0000 Subject: [PATCH] fix: alternate extractor between macro and extraction queues (1:2 ratio) to prevent starvation --- services/extractor/main.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/services/extractor/main.py b/services/extractor/main.py index f9ec66b..1d23839 100644 --- a/services/extractor/main.py +++ b/services/extractor/main.py @@ -250,15 +250,30 @@ async def main() -> None: # Pre-load company ID map (refreshed periodically) company_id_map = await _build_company_id_map(pool) refresh_counter = 0 + # Alternate between queues to prevent starvation: process 1 macro then 2 extractions + macro_turn_counter = 0 try: while True: - # Check macro classification queue first (priority) - raw = await redis_client.lpop(macro_queue) - is_macro_job = raw is not None + # Alternate: every 3rd job from macro queue, rest from extraction + # This prevents macro events from starving regular extractions + raw = None + is_macro_job = False - if raw is None: + if macro_turn_counter % 3 == 0: + # Try macro first + raw = await redis_client.lpop(macro_queue) + is_macro_job = raw is not None + if raw is None: + raw = await redis_client.lpop(queue) + else: + # Try extraction first raw = await redis_client.lpop(queue) + if raw is None: + raw = await redis_client.lpop(macro_queue) + is_macro_job = raw is not None + + macro_turn_counter += 1 if raw is None: await asyncio.sleep(1)