"""Ingestion worker - processes jobs from the ingestion queue.""" import asyncio import hashlib import io import json import logging from datetime import datetime import asyncpg import redis.asyncio as aioredis from minio import Minio from services.adapters.base import AdapterResult from services.adapters.filings_adapter import FilingsAdapter from services.adapters.market_adapter import MarketDataAdapter from services.adapters.news_adapter import NewsApiAdapter from services.shared.config import load_config from services.shared.db import get_minio, get_pg_pool, get_redis from services.shared.redis_keys import ( QUEUE_INGESTION, QUEUE_PARSING, dedupe_key, queue_key, ) logging.basicConfig(level=logging.INFO) logger = logging.getLogger("ingestion_worker") BUCKET_MAP = { "market_api": "stonks-raw-market", "news_api": "stonks-raw-news", "filings_api": "stonks-raw-filings", "broker": "stonks-raw-market", } def build_storage_path(source_type: str, ticker: str, doc_id: str) -> str: now = datetime.utcnow() return f"{source_type}/{ticker}/{now.year}/{now.month:02d}/{now.day:02d}/{doc_id}/raw.json" async def store_raw_artifact(minio_client: Minio, bucket: str, path: str, data: bytes): minio_client.put_object(bucket, path, io.BytesIO(data), len(data), content_type="application/json") async def process_job( job: dict, pool: asyncpg.Pool, rds: aioredis.Redis, minio_client: Minio, adapters: dict, ): source_type = job["source_type"] ticker = job["ticker"] source_id = job["source_id"] config = job.get("config", {}) adapter = adapters.get(source_type) if not adapter: logger.warning(f"No adapter for source_type={source_type}") return # Record ingestion run run_id = await pool.fetchval( """INSERT INTO ingestion_runs (source_id, company_id, source_type, status) VALUES ($1, $2, $3, 'running') RETURNING id""", source_id, job["company_id"], source_type, ) try: result: AdapterResult = await adapter.fetch(ticker, config) if result.error: await pool.execute( "UPDATE ingestion_runs SET status='failed', error_message=$2, completed_at=NOW() WHERE id=$1", run_id, result.error, ) return # Store raw payload bucket = BUCKET_MAP.get(source_type, "stonks-raw-market") storage_path = build_storage_path(source_type, ticker, str(run_id)) await store_raw_artifact(minio_client, bucket, storage_path, result.raw_payload) # Dedupe check if result.content_hash: already_seen = await rds.get(dedupe_key(result.content_hash)) if already_seen: logger.info(f"Duplicate content for {ticker}, skipping") await pool.execute( "UPDATE ingestion_runs SET status='completed', items_fetched=$2, items_new=0, completed_at=NOW() WHERE id=$1", run_id, len(result.items), ) return await rds.set(dedupe_key(result.content_hash), "1", ex=86400) new_items = 0 for item in result.items: item_json = json.dumps(item) item_hash = hashlib.sha256(item_json.encode()).hexdigest() # Check if document already exists exists = await pool.fetchval("SELECT 1 FROM documents WHERE content_hash = $1", item_hash) if exists: continue title = item.get("title", item.get("name", "")) url = item.get("url", item.get("link", "")) published = item.get("publishedAt", item.get("published_at")) doc_id = await pool.fetchval( """INSERT INTO documents (document_type, source_type, publisher, url, title, published_at, content_hash, raw_storage_ref, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'ingested') RETURNING id""", "article" if source_type == "news_api" else "filing" if source_type == "filings_api" else "article", source_type, item.get("source", {}).get("name", "") if isinstance(item.get("source"), dict) else str(item.get("source", "")), url, title, datetime.fromisoformat(published.replace("Z", "+00:00")) if published else None, item_hash, f"s3://{bucket}/{storage_path}", ) # Enqueue for parsing await rds.rpush(queue_key(QUEUE_PARSING), json.dumps({ "document_id": str(doc_id), "ticker": ticker, "source_type": source_type, "url": url, })) new_items += 1 await pool.execute( "UPDATE ingestion_runs SET status='completed', items_fetched=$2, items_new=$3, completed_at=NOW() WHERE id=$1", run_id, len(result.items), new_items, ) logger.info(f"Ingested {ticker}/{source_type}: {len(result.items)} fetched, {new_items} new") except Exception as e: logger.error(f"Ingestion error for {ticker}: {e}") await pool.execute( "UPDATE ingestion_runs SET status='failed', error_message=$2, completed_at=NOW() WHERE id=$1", run_id, str(e), ) async def main(): config = load_config() pool = await get_pg_pool(config) rds = get_redis(config) minio_client = get_minio(config) adapters = { "market_api": MarketDataAdapter( api_key=config.broker.api_key or "", base_url="https://api.polygon.io", ), "news_api": NewsApiAdapter( api_key="", base_url="https://newsapi.org", ), "filings_api": FilingsAdapter(), } logger.info("Ingestion worker started") queue = queue_key(QUEUE_INGESTION) try: while True: raw = await rds.lpop(queue) if raw: job = json.loads(raw) await process_job(job, pool, rds, minio_client, adapters) else: await asyncio.sleep(2) finally: await pool.close() await rds.close() if __name__ == "__main__": asyncio.run(main())