"""Ingestion worker - processes jobs from the ingestion queue."""

import asyncio
import json
import logging

import asyncpg
import redis.asyncio as aioredis
from minio import Minio

from services.adapters.base import AdapterResult
from services.adapters.broker_adapter import AlpacaBrokerAdapter, TradingMode
from services.adapters.filings_adapter import SECEdgarAdapter
from services.adapters.macro_news_adapter import MacroNewsAdapter
from services.adapters.market_adapter import PolygonMarketAdapter
from services.adapters.news_adapter import PolygonNewsAdapter
from services.adapters.web_scrape_adapter import WebScrapeAdapter
from services.shared.config import load_config
from services.shared.db import get_minio, get_pg_pool, get_redis
from services.shared.dedupe import dedupe_items, mark_as_seen
from services.shared.logging import (
    Span,
    inject_trace_context,
    new_trace_id,
    set_trace_context,
    setup_logging,
)
from services.shared.metadata import (
    persist_ingestion_items,
    record_retrieval_failure,
    reset_source_retry_state,
)
from services.shared.metrics import (
    INGESTION_ADAPTER_DURATION,
    INGESTION_ERRORS,
    INGESTION_ITEMS_DEDUPED,
    INGESTION_ITEMS_FETCHED,
    INGESTION_ITEMS_NEW,
    INGESTION_JOBS_TOTAL,
)
from services.shared.redis_keys import (
    QUEUE_INGESTION,
    QUEUE_PARSING,
    dedupe_key,
    queue_key,
)
from services.shared.storage import (
    ensure_buckets,
    upload_raw_artifact,
)

logger = logging.getLogger("ingestion_worker")

# Source types that skip item-level dedupe and are never enqueued for parsing.
_NON_DOCUMENT_SOURCES = ("market_api", "broker")

# Source types whose items carry a "published_utc" used as a fetch cursor.
_CURSOR_SOURCES = ("macro_news", "news_api")

# Lifetime of the whole-payload dedupe marker in Redis (24 hours).
_PAYLOAD_DEDUPE_TTL_SECONDS = 86400

# Keys process_job reads with job[...]; a job lacking any of them cannot run.
_REQUIRED_JOB_KEYS = ("source_type", "ticker", "source_id")

# Pause between polls when the ingestion queue is empty.
_IDLE_POLL_SECONDS = 2


def _decode_job(raw):
    """Parse a raw queue payload into a job dict.

    Returns the decoded dict, or ``None`` (after logging) when the payload is
    not valid JSON, is not a JSON object, or lacks a key that ``process_job``
    requires. Dropping such a payload keeps one poison message from killing
    the worker loop; the message has already been popped from the queue.
    """
    try:
        job = json.loads(raw)
    except (TypeError, ValueError):
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        logger.error("Dropping malformed ingestion job: %.200r", raw)
        return None

    if not isinstance(job, dict):
        logger.error("Dropping non-object ingestion job: %.200r", raw)
        return None

    missing = [key for key in _REQUIRED_JOB_KEYS if key not in job]
    if missing:
        logger.error("Dropping ingestion job missing keys %s: %.200r", missing, raw)
        return None

    return job


def _latest_published_utc(items):
    """Return the greatest truthy ``published_utc`` among *items*, or ``None``."""
    published = (item.get("published_utc") for item in items)
    return max((pub for pub in published if pub), default=None)


async def _enqueue_for_parsing(rds, new_ids, ticker, source_type):
    """Push one parsing-queue message per newly persisted document id."""
    parsing_queue = queue_key(QUEUE_PARSING)
    for doc_id in new_ids:
        message = inject_trace_context({
            "document_id": doc_id,
            "ticker": ticker,
            "source_type": source_type,
        })
        await rds.rpush(parsing_queue, json.dumps(message))


async def _link_duplicates_to_company(pool, dup_items, company_id, ticker):
    """Attach already-known documents to *company_id* as cross-source mentions.

    Best-effort: a failure to link one document never fails the ingestion run.
    """
    from services.shared.metadata import persist_document_company_mention

    for dup in dup_items:
        existing_id = dup.get("_dedupe_existing_id")
        if not existing_id:
            continue
        try:
            await persist_document_company_mention(
                pool,
                document_id=existing_id,
                company_id=company_id,
                ticker=ticker,
                mention_type="cross_source",
            )
        except asyncpg.UniqueViolationError:
            # Duplicate mention link - safe to ignore.
            continue
        except Exception as exc:
            # Still non-fatal, but no longer invisible.
            logger.warning(
                "Could not link document %s to company %s: %s",
                existing_id, company_id, exc,
            )


async def _remember_latest_published(pool, source_id, items):
    """Store the newest ``published_utc`` in the source config (best-effort).

    The value is merged into ``sources.config`` under ``last_published_at`` so
    the next fetch only gets newer articles. Any failure here is logged and
    swallowed because the ingestion run itself has already completed.
    """
    try:
        latest_pub = _latest_published_utc(items)
        if not latest_pub:
            return
        await pool.execute(
            "UPDATE sources SET config = config || $1::jsonb WHERE id = $2",
            json.dumps({"last_published_at": latest_pub}),
            source_id,
        )
    except Exception as exc:
        # Non-critical
        logger.warning(
            "Could not update last_published_at for source %s: %s",
            source_id, exc,
        )


async def process_job(
    job: dict,
    pool: asyncpg.Pool,
    rds: aioredis.Redis,
    minio_client: Minio,
    adapters: dict,
):
    """Run one ingestion job end to end.

    Steps: record an ``ingestion_runs`` row, fetch through the adapter for the
    job's source type, upload the raw payload, skip payloads whose content
    hash was already seen, dedupe individual items (document sources only),
    persist metadata, enqueue new documents for parsing, and finalise the run
    row and metrics.

    Args:
        job: Decoded queue message. ``source_type``, ``ticker`` and
            ``source_id`` are required; ``config``, ``company_id`` and
            ``_trace_id`` are optional.
        pool: Postgres connection pool.
        rds: Async Redis client used for dedupe markers and the parsing queue.
        minio_client: Object-store client for the raw payload.
        adapters: Mapping of source type to adapter instance.

    Raises:
        KeyError: If a required key is missing from *job*.

    Any exception raised after the run row has been created is counted,
    logged and recorded through ``record_retrieval_failure`` rather than
    propagated.
    """
    source_type = job["source_type"]
    ticker = job["ticker"]
    source_id = job["source_id"]
    config = job.get("config", {})
    # Macro sources may not have a company_id
    company_id = job.get("company_id")

    set_trace_context(trace_id=job.get("_trace_id") or new_trace_id())

    adapter = adapters.get(source_type)
    if not adapter:
        logger.warning("No adapter for source_type=%s", source_type)
        return

    is_document_source = source_type not in _NON_DOCUMENT_SOURCES

    # Record ingestion run
    run_id = await pool.fetchval(
        """INSERT INTO ingestion_runs (source_id, company_id, source_type, status)
           VALUES ($1, $2, $3, 'running') RETURNING id""",
        source_id,
        company_id,
        source_type,
    )

    try:
        with Span("adapter_fetch", ticker=ticker, source_type=source_type):
            with INGESTION_ADAPTER_DURATION.labels(source_type=source_type).time():
                result: AdapterResult = await adapter.fetch(ticker, config)

        if result.error:
            INGESTION_JOBS_TOTAL.labels(source_type=source_type, status="error").inc()
            await record_retrieval_failure(
                pool,
                run_id=str(run_id),
                source_id=source_id,
                error_message=result.error,
            )
            return

        # Store raw payload in MinIO
        artifact_type = "raw_html" if source_type == "web_scrape" else "raw_json"
        storage_uri = upload_raw_artifact(
            minio_client,
            source_type=source_type,
            ticker=ticker,
            document_id=str(run_id),
            data=result.raw_payload,
            artifact_type=artifact_type,
        )

        # Dedupe check on the overall payload hash
        payload_key = dedupe_key(result.content_hash) if result.content_hash else None
        if payload_key and await rds.get(payload_key):
            logger.info("Duplicate content for %s, skipping", ticker)
            await pool.execute(
                "UPDATE ingestion_runs SET status='completed', items_fetched=$2, items_new=0, completed_at=NOW() WHERE id=$1",
                run_id,
                len(result.items),
            )
            return

        # Cross-source dedupe on individual document items (news, filings, web_scrape)
        items_to_persist = result.items
        dup_items = []
        if is_document_source:
            items_to_persist, dup_items = await dedupe_items(pool, rds, result.items)
        deduped_count = len(dup_items)
        if deduped_count:
            INGESTION_ITEMS_DEDUPED.labels(source_type=source_type).inc(deduped_count)
            logger.info(
                "Deduped %d/%d items for %s/%s",
                deduped_count, len(result.items), ticker, source_type,
            )

        # Persist metadata via the unified metadata module
        new_items, new_ids = await persist_ingestion_items(
            pool,
            source_type=source_type,
            ticker=ticker,
            company_id=company_id,
            items=items_to_persist,
            storage_ref=storage_uri,
            adapter_metadata=result.metadata,
            content_hash=result.content_hash,
        )

        # Mark the payload as seen only once it is safely persisted. Marking it
        # earlier meant a failed persist left the marker behind, so the retry
        # was skipped as "duplicate" and the data was never stored.
        if payload_key:
            await rds.set(payload_key, "1", ex=_PAYLOAD_DEDUPE_TTL_SECONDS)

        if is_document_source:
            # Enqueue new document items for parsing (not market/broker)
            await _enqueue_for_parsing(rds, new_ids, ticker, source_type)

            # Mark newly persisted documents in Redis for fast future dedupe.
            # NOTE(review): pairs items with ids positionally - assumes
            # persist_ingestion_items returns ids in item order; confirm.
            for item, doc_id in zip(items_to_persist, new_ids):
                await mark_as_seen(
                    rds,
                    content_hash=item.get("content_hash", ""),
                    canonical_url=item.get("canonical_url"),
                    document_id=doc_id,
                )

        # Link duplicate documents to this company if not already linked
        if company_id and deduped_count and source_type not in ("macro_news",):
            await _link_duplicates_to_company(pool, dup_items, company_id, ticker)

        await pool.execute(
            "UPDATE ingestion_runs SET status='completed', items_fetched=$2, items_new=$3, completed_at=NOW() WHERE id=$1",
            run_id,
            len(result.items),
            new_items,
        )

        # Clear any accumulated retry backoff after success
        await reset_source_retry_state(pool, source_id)

        INGESTION_ITEMS_FETCHED.labels(source_type=source_type).inc(len(result.items))
        INGESTION_ITEMS_NEW.labels(source_type=source_type).inc(new_items)
        INGESTION_JOBS_TOTAL.labels(source_type=source_type, status="success").inc()

        logger.info(
            "Ingested %s/%s: %d fetched, %d new",
            ticker, source_type, len(result.items), new_items,
            extra={"ticker": ticker, "source_type": source_type, "count": new_items},
        )

        # Track the latest published_utc so next fetch only gets newer articles
        if source_type in _CURSOR_SOURCES and result.items:
            await _remember_latest_published(pool, source_id, result.items)

    except Exception as e:
        INGESTION_ERRORS.labels(source_type=source_type).inc()
        INGESTION_JOBS_TOTAL.labels(source_type=source_type, status="error").inc()
        # logger.exception keeps the traceback alongside the structured fields.
        logger.exception(
            "Ingestion error for %s: %s", ticker, e,
            extra={"ticker": ticker, "source_type": source_type, "error": str(e)},
        )
        await record_retrieval_failure(
            pool,
            run_id=str(run_id),
            source_id=source_id,
            error_message=str(e),
        )


async def main():
    """Start the worker: build clients and adapters, then poll the queue forever.

    Jobs are popped from the ingestion queue one at a time and handed to
    ``process_job``. Unusable payloads are logged and dropped. When the queue
    is empty the loop sleeps briefly before polling again. The Postgres pool
    and the Redis client are closed when the loop exits for any reason.
    """
    cfg = load_config()
    setup_logging("ingestion_worker", level=cfg.log_level, json_output=cfg.json_logs)

    pool = await get_pg_pool(cfg)
    rds = get_redis(cfg)
    minio_client = get_minio(cfg)

    # Ensure all required buckets exist
    ensure_buckets(minio_client)

    adapters = {
        "market_api": PolygonMarketAdapter(
            api_key=cfg.market_data.api_key,
            base_url=cfg.market_data.base_url,
        ),
        "news_api": PolygonNewsAdapter(
            api_key=cfg.market_data.api_key,
            base_url="https://api.polygon.io",
        ),
        "filings_api": SECEdgarAdapter(),
        "web_scrape": WebScrapeAdapter(),
        "broker": AlpacaBrokerAdapter(
            api_key=cfg.broker.api_key or "",
            api_secret=cfg.broker.api_secret or "",
            mode=TradingMode.LIVE if cfg.broker.mode == "live" else TradingMode.PAPER,
            base_url=cfg.broker.base_url,
        ),
        "macro_news": MacroNewsAdapter(
            api_key=cfg.market_data.api_key,
        ),
    }

    logger.info("Ingestion worker started")
    queue = queue_key(QUEUE_INGESTION)

    try:
        while True:
            raw = await rds.lpop(queue)
            if not raw:
                await asyncio.sleep(_IDLE_POLL_SECONDS)
                continue

            job = _decode_job(raw)
            if job is None:
                continue
            await process_job(job, pool, rds, minio_client, adapters)
    finally:
        # Nested so a failure closing the pool cannot leak the Redis client.
        try:
            await pool.close()
        finally:
            await rds.close()


if __name__ == "__main__":
    asyncio.run(main())