"""Extractor worker entrypoint - polls Redis for extraction jobs.""" from __future__ import annotations import asyncio import json import logging import asyncpg import redis.asyncio as aioredis from minio import Minio from services.extractor.client import OllamaClient from services.extractor.worker import persist_extraction from services.shared.config import load_config from services.shared.logging import inject_trace_context, setup_logging from services.shared.redis_keys import ( QUEUE_AGGREGATION, QUEUE_EXTRACTION, queue_key, ) logger = logging.getLogger("extractor_main") async def _build_company_id_map(pool: asyncpg.Pool) -> dict[str, str]: """Build a ticker -> company_id mapping from the companies table.""" rows = await pool.fetch("SELECT id, ticker FROM companies WHERE active = TRUE") return {row["ticker"]: str(row["id"]) for row in rows} async def main() -> None: config = load_config() setup_logging("extractor", level=config.log_level, json_output=config.json_logs) pool = await asyncpg.create_pool(dsn=config.postgres.dsn, min_size=2, max_size=8) minio_client = Minio( config.minio.endpoint, access_key=config.minio.access_key, secret_key=config.minio.secret_key, secure=config.minio.secure, ) ollama = OllamaClient(config.ollama) redis_client = aioredis.from_url(config.redis.url) queue = queue_key(QUEUE_EXTRACTION) agg_queue = queue_key(QUEUE_AGGREGATION) logger.info("Extractor worker started, polling %s", queue) # Pre-load company ID map (refreshed periodically) company_id_map = await _build_company_id_map(pool) refresh_counter = 0 try: while True: raw = await redis_client.lpop(queue) if raw is None: await asyncio.sleep(1) continue job = json.loads(raw) document_id = job.get("document_id", "") ticker = job.get("ticker", "") text = job.get("text", "") or job.get("normalized_text", "") # If no text in job, try to fetch from MinIO via the document's normalized_storage_ref if not text: ref_row = await pool.fetchrow( "SELECT normalized_storage_ref FROM documents WHERE id = $1::uuid", document_id, ) if ref_row and ref_row["normalized_storage_ref"]: try: ref = ref_row["normalized_storage_ref"] # ref format: s3://bucket/path parts = ref.replace("s3://", "").split("/", 1) if len(parts) == 2: obj = minio_client.get_object(parts[0], parts[1]) text = obj.read().decode("utf-8") obj.close() obj.release_conn() except Exception as e: logger.warning("Could not fetch normalized text for doc %s: %s", document_id, e) logger.info("Processing extraction job for doc %s / %s", document_id, ticker) # Refresh company map every 100 jobs refresh_counter += 1 if refresh_counter % 100 == 0: company_id_map = await _build_company_id_map(pool) try: # Pass all tracked tickers so the model can identify any mentioned companies all_tickers = list(company_id_map.keys()) if company_id_map else ([ticker] if ticker else None) extraction_response = await ollama.extract( text, document_id=document_id, known_tickers=all_tickers, ) result = await persist_extraction( pool=pool, minio_client=minio_client, document_id=document_id, ticker=ticker, extraction_response=extraction_response, company_id_map=company_id_map, document_text_length=len(text), ) # Enqueue aggregation job for the ticker on success if result.success and ticker: await redis_client.rpush( agg_queue, json.dumps(inject_trace_context({"ticker": ticker})), ) except Exception: logger.exception("Extraction failed for doc %s", document_id) finally: await pool.close() await redis_client.close() if __name__ == "__main__": asyncio.run(main())