"""Extractor worker entrypoint - polls Redis for extraction jobs.""" from __future__ import annotations import asyncio import logging import asyncpg from minio import Minio from services.extractor.client import OllamaClient from services.extractor.worker import persist_extraction from services.shared.config import load_config from services.shared.logging import setup_logging from services.shared.redis_keys import QUEUE_EXTRACTION, queue_key logger = logging.getLogger("extractor_main") async def main() -> None: config = load_config() setup_logging("extractor", level=config.log_level, json_output=config.json_logs) pool = await asyncpg.create_pool(dsn=config.postgres.dsn, min_size=2, max_size=8) minio_client = Minio( config.minio.endpoint, access_key=config.minio.access_key, secret_key=config.minio.secret_key, secure=config.minio.secure, ) ollama = OllamaClient(config.ollama) import json import redis.asyncio as aioredis redis_client = aioredis.from_url(config.redis.url) queue = queue_key(QUEUE_EXTRACTION) logger.info("Extractor worker started, polling %s", queue) try: while True: raw = await redis_client.lpop(queue) if raw is None: await asyncio.sleep(1) continue payload = raw job = json.loads(payload) document_id = job.get("document_id", "") ticker = job.get("ticker", "") text = job.get("text", "") logger.info("Processing extraction job for doc %s / %s", document_id, ticker) try: extraction_response = await ollama.extract(text) await persist_extraction( pool=pool, minio_client=minio_client, document_id=document_id, ticker=ticker, extraction_response=extraction_response, document_text_length=len(text), ) except Exception: logger.exception("Extraction failed for doc %s", document_id) finally: await pool.close() await redis_client.close() if __name__ == "__main__": asyncio.run(main())