diff --git a/services/extractor/main.py b/services/extractor/main.py index f9ec66b..1d23839 100644 --- a/services/extractor/main.py +++ b/services/extractor/main.py @@ -250,15 +250,30 @@ async def main() -> None: # Pre-load company ID map (refreshed periodically) company_id_map = await _build_company_id_map(pool) refresh_counter = 0 + # Alternate between queues to prevent starvation: process 1 macro then 2 extractions + macro_turn_counter = 0 try: while True: - # Check macro classification queue first (priority) - raw = await redis_client.lpop(macro_queue) - is_macro_job = raw is not None + # Alternate: every 3rd job from macro queue, rest from extraction + # This prevents macro events from starving regular extractions + raw = None + is_macro_job = False - if raw is None: + if macro_turn_counter % 3 == 0: + # Try macro first + raw = await redis_client.lpop(macro_queue) + is_macro_job = raw is not None + if raw is None: + raw = await redis_client.lpop(queue) + else: + # Try extraction first raw = await redis_client.lpop(queue) + if raw is None: + raw = await redis_client.lpop(macro_queue) + is_macro_job = raw is not None + + macro_turn_counter += 1 if raw is None: await asyncio.sleep(1)