Files
stonks-oracle/services/extractor/main.py
T

74 lines
2.3 KiB
Python

"""Extractor worker entrypoint - polls Redis for extraction jobs."""
from __future__ import annotations

import asyncio
import json
import logging

import asyncpg
import redis.asyncio as aioredis
from minio import Minio

from services.extractor.client import OllamaClient
from services.extractor.worker import persist_extraction
from services.shared.config import load_config
from services.shared.logging import setup_logging
from services.shared.redis_keys import QUEUE_EXTRACTION, queue_key
# Module-level logger. The name is a fixed string, so records from this
# entrypoint are tagged "extractor_main" however the module is imported.
logger = logging.getLogger(name="extractor_main")
def _decode_job(raw: bytes | str) -> dict[str, object] | None:
    """Decode one raw queue entry into a job dict, or return None if unusable.

    A malformed entry is logged and dropped so that a single bad message
    cannot terminate the worker loop.

    Args:
        raw: The value popped from the Redis list (bytes or str).

    Returns:
        The decoded JSON object, or None if the entry is not valid JSON or
        does not decode to a JSON object.
    """
    try:
        # json.loads accepts bytes directly. Invalid JSON raises
        # JSONDecodeError and bad UTF-8 raises UnicodeDecodeError; both
        # subclass ValueError.
        job = json.loads(raw)
    except (ValueError, TypeError):
        # Truncate: payloads can carry full document text.
        logger.exception("Dropping undecodable extraction job: %r", raw[:200])
        return None
    if not isinstance(job, dict):
        logger.error(
            "Dropping extraction job that is not a JSON object (got %s)",
            type(job).__name__,
        )
        return None
    return job


async def _process_job(
    job: dict[str, object],
    *,
    pool: asyncpg.Pool,
    minio_client: Minio,
    ollama: OllamaClient,
) -> None:
    """Run extraction for one decoded job and persist the result.

    Any exception from extraction or persistence is logged and swallowed, so
    the caller's polling loop keeps running.
    """
    document_id = job.get("document_id", "")
    ticker = job.get("ticker", "")
    # Prefer "text" and fall back to "normalized_text". A missing or null
    # value becomes "" so len() below cannot fail on None.
    text = job.get("text") or job.get("normalized_text") or ""
    logger.info("Processing extraction job for doc %s / %s", document_id, ticker)
    try:
        extraction_response = await ollama.extract(text)
        await persist_extraction(
            pool=pool,
            minio_client=minio_client,
            document_id=document_id,
            ticker=ticker,
            extraction_response=extraction_response,
            document_text_length=len(text),
        )
    except Exception:
        # Per-job boundary: log the failure and let the worker continue.
        logger.exception("Extraction failed for doc %s", document_id)


async def main() -> None:
    """Run the extraction worker loop until cancelled or a setup step fails.

    Loads configuration and logging, then creates the Postgres pool, the
    MinIO client, the Ollama client and the Redis connection. It then pops
    JSON jobs from the extraction queue one at a time and processes each.
    A bad job (an undecodable payload or a failed extraction) is logged and
    skipped; it never stops the loop. The Postgres pool and Redis connection
    are closed on exit, including when a later setup step raises.
    """
    config = load_config()
    setup_logging("extractor", level=config.log_level, json_output=config.json_logs)

    pool = await asyncpg.create_pool(dsn=config.postgres.dsn, min_size=2, max_size=8)
    redis_client = None
    # Everything created after the pool lives inside the try, so the pool is
    # released even if constructing another client fails.
    try:
        minio_client = Minio(
            config.minio.endpoint,
            access_key=config.minio.access_key,
            secret_key=config.minio.secret_key,
            secure=config.minio.secure,
        )
        ollama = OllamaClient(config.ollama)
        redis_client = aioredis.from_url(config.redis.url)
        queue = queue_key(QUEUE_EXTRACTION)
        logger.info("Extractor worker started, polling %s", queue)

        while True:
            raw = await redis_client.lpop(queue)
            if raw is None:
                # Queue is empty: wait before polling again instead of spinning.
                await asyncio.sleep(1)
                continue
            job = _decode_job(raw)
            if job is None:
                continue
            await _process_job(
                job, pool=pool, minio_client=minio_client, ollama=ollama
            )
    finally:
        try:
            await pool.close()
        finally:
            # redis_client is still None if setup failed before it was created.
            if redis_client is not None:
                await redis_client.close()
if __name__ == "__main__":
    # Script entry point: asyncio.run() creates a new event loop, runs main()
    # until it returns or raises, and then closes the loop.
    asyncio.run(main())