Files
stonks-oracle/services/ingestion/worker.py
T

254 lines
9.1 KiB
Python

"""Ingestion worker - processes jobs from the ingestion queue."""
import asyncio
import json
import logging
import asyncpg
import redis.asyncio as aioredis
from minio import Minio
from services.adapters.base import AdapterResult
from services.adapters.broker_adapter import AlpacaBrokerAdapter, TradingMode
from services.adapters.filings_adapter import SECEdgarAdapter
from services.adapters.market_adapter import PolygonMarketAdapter
from services.adapters.news_adapter import PolygonNewsAdapter
from services.adapters.web_scrape_adapter import WebScrapeAdapter
from services.shared.config import load_config
from services.shared.db import get_minio, get_pg_pool, get_redis
from services.shared.dedupe import dedupe_items, mark_as_seen
from services.shared.metadata import (
persist_ingestion_items,
record_retrieval_failure,
reset_source_retry_state,
)
from services.shared.redis_keys import (
QUEUE_INGESTION,
QUEUE_PARSING,
dedupe_key,
queue_key,
)
from services.shared.logging import Span, extract_trace_context, inject_trace_context, new_trace_id, set_trace_context, setup_logging
from services.shared.metrics import (
ACTIVE_JOBS,
INGESTION_ADAPTER_DURATION,
INGESTION_ERRORS,
INGESTION_ITEMS_DEDUPED,
INGESTION_ITEMS_FETCHED,
INGESTION_ITEMS_NEW,
INGESTION_JOBS_TOTAL,
)
from services.shared.storage import (
bucket_for_source,
ensure_buckets,
upload_raw_artifact,
)
logger = logging.getLogger("ingestion_worker")
# Source types whose items are not documents: they skip cross-source item
# dedupe and are never queued for parsing.
_NON_DOCUMENT_SOURCES = ("market_api", "broker")

# Expiry, in seconds, of the whole-payload dedupe marker stored in Redis.
_PAYLOAD_DEDUPE_TTL_SECONDS = 86400


async def _complete_run(pool, run_id, items_fetched: int, items_new: int) -> None:
    """Mark an ingestion_runs row as completed with its final item counts."""
    await pool.execute(
        "UPDATE ingestion_runs SET status='completed', items_fetched=$2, items_new=$3, completed_at=NOW() WHERE id=$1",
        run_id, items_fetched, items_new,
    )


async def process_job(
    job: dict,
    pool: asyncpg.Pool,
    rds: aioredis.Redis,
    minio_client: Minio,
    adapters: dict,
) -> None:
    """Run one ingestion job end to end.

    Steps, in order: look up the adapter for the job's source type, insert an
    ``ingestion_runs`` row, fetch through the adapter, upload the raw payload,
    skip payloads whose content hash is already marked in Redis, dedupe
    individual items (document sources only), persist metadata, queue new
    documents for parsing, mark them as seen, link duplicates to the company,
    and finally mark the run completed and update metrics.

    Args:
        job: Decoded queue message. ``source_type``, ``ticker`` and
            ``source_id`` are required; ``company_id``, ``config`` and
            ``_trace_id`` are optional.
        pool: Postgres pool used for run bookkeeping and metadata.
        rds: Redis client used for dedupe markers and the parsing queue.
        minio_client: Client handed to ``upload_raw_artifact``.
        adapters: Maps a source type to an adapter exposing an awaitable
            ``fetch(ticker, config)`` that returns an ``AdapterResult``.

    Raises:
        KeyError: If a required key is missing from ``job``.
        Exception: Errors raised while inserting the run row propagate.
            Errors raised after that point are logged and recorded through
            ``record_retrieval_failure`` instead of propagating (unless the
            recording call itself fails).
    """
    source_type = job["source_type"]
    ticker = job["ticker"]
    source_id = job["source_id"]
    # Read once so the run row, persistence and duplicate linking all agree.
    company_id = job.get("company_id")
    config = job.get("config", {})
    set_trace_context(trace_id=job.get("_trace_id") or new_trace_id())

    adapter = adapters.get(source_type)
    if not adapter:
        logger.warning("No adapter for source_type=%s", source_type)
        return

    # Record ingestion run
    run_id = await pool.fetchval(
        """INSERT INTO ingestion_runs (source_id, company_id, source_type, status)
VALUES ($1, $2, $3, 'running') RETURNING id""",
        source_id, company_id, source_type,
    )

    try:
        with Span("adapter_fetch", ticker=ticker, source_type=source_type):
            with INGESTION_ADAPTER_DURATION.labels(source_type=source_type).time():
                result: AdapterResult = await adapter.fetch(ticker, config)

        if result.error:
            INGESTION_JOBS_TOTAL.labels(source_type=source_type, status="error").inc()
            await record_retrieval_failure(
                pool,
                run_id=str(run_id),
                source_id=source_id,
                error_message=result.error,
            )
            return

        # Store raw payload in MinIO. NOTE: this helper is called without
        # ``await``, so it runs synchronously inside the coroutine.
        artifact_type = "raw_html" if source_type == "web_scrape" else "raw_json"
        storage_uri = upload_raw_artifact(
            minio_client,
            source_type=source_type,
            ticker=ticker,
            document_id=str(run_id),
            data=result.raw_payload,
            artifact_type=artifact_type,
        )

        # Dedupe check on the overall payload hash. The marker itself is only
        # written further down, after persistence succeeded: writing it here
        # would make a retry of a failed job look like a duplicate and the
        # payload would never be ingested.
        payload_key = dedupe_key(result.content_hash) if result.content_hash else None
        if payload_key and await rds.get(payload_key):
            logger.info("Duplicate content for %s, skipping", ticker)
            await _complete_run(pool, run_id, len(result.items), 0)
            return

        # Cross-source dedupe on individual document items (news, filings, web_scrape)
        is_document_source = source_type not in _NON_DOCUMENT_SOURCES
        items_to_persist = result.items
        dup_items = []
        if is_document_source:
            items_to_persist, dup_items = await dedupe_items(pool, rds, result.items)
        deduped_count = len(dup_items)
        if deduped_count:
            INGESTION_ITEMS_DEDUPED.labels(source_type=source_type).inc(deduped_count)
            logger.info(
                "Deduped %d/%d items for %s/%s",
                deduped_count, len(result.items), ticker, source_type,
            )

        # Persist metadata via the unified metadata module
        new_items, new_ids = await persist_ingestion_items(
            pool,
            source_type=source_type,
            ticker=ticker,
            company_id=company_id,
            items=items_to_persist,
            storage_ref=storage_uri,
            adapter_metadata=result.metadata,
            content_hash=result.content_hash,
        )

        # Enqueue new document items for parsing (not market/broker). All
        # messages go out in a single RPUSH instead of one round trip each.
        if is_document_source:
            payloads = [
                json.dumps(inject_trace_context({
                    "document_id": doc_id,
                    "ticker": ticker,
                    "source_type": source_type,
                }))
                for doc_id in new_ids
            ]
            if payloads:
                await rds.rpush(queue_key(QUEUE_PARSING), *payloads)

        # Mark newly persisted documents in Redis for fast future dedupe.
        # NOTE(review): zip() pairs items with ids positionally, which assumes
        # persist_ingestion_items returns ids in item order -- confirm.
        for item, doc_id in zip(items_to_persist, new_ids):
            await mark_as_seen(
                rds,
                content_hash=item.get("content_hash", ""),
                canonical_url=item.get("canonical_url"),
                document_id=doc_id,
            )

        # Link duplicate documents to this company if not already linked
        if company_id and deduped_count:
            from services.shared.metadata import persist_document_company_mention
            for dup in dup_items:
                existing_id = dup.get("_dedupe_existing_id")
                if not existing_id:
                    continue
                try:
                    await persist_document_company_mention(
                        pool,
                        document_id=existing_id,
                        company_id=company_id,
                        ticker=ticker,
                        mention_type="cross_source",
                    )
                except Exception:
                    # Best effort: usually the link already exists. Keep a
                    # debug trace so genuine failures are not invisible.
                    logger.debug(
                        "Could not link document %s to company %s",
                        existing_id, company_id, exc_info=True,
                    )

        # Everything for this payload is stored; only now remember its hash.
        if payload_key:
            await rds.set(payload_key, "1", ex=_PAYLOAD_DEDUPE_TTL_SECONDS)

        await _complete_run(pool, run_id, len(result.items), new_items)

        # Clear any accumulated retry backoff after success
        await reset_source_retry_state(pool, source_id)

        INGESTION_ITEMS_FETCHED.labels(source_type=source_type).inc(len(result.items))
        INGESTION_ITEMS_NEW.labels(source_type=source_type).inc(new_items)
        INGESTION_JOBS_TOTAL.labels(source_type=source_type, status="success").inc()
        logger.info(
            "Ingested %s/%s: %d fetched, %d new",
            ticker, source_type, len(result.items), new_items,
            extra={"ticker": ticker, "source_type": source_type, "count": new_items},
        )
    except Exception as e:
        INGESTION_ERRORS.labels(source_type=source_type).inc()
        INGESTION_JOBS_TOTAL.labels(source_type=source_type, status="error").inc()
        # exc_info keeps the traceback; the message alone rarely locates the fault.
        logger.error(
            "Ingestion error for %s: %s", ticker, e,
            exc_info=True,
            extra={"ticker": ticker, "source_type": source_type, "error": str(e)},
        )
        await record_retrieval_failure(
            pool,
            run_id=str(run_id),
            source_id=source_id,
            error_message=str(e),
        )
async def main():
    """Start the ingestion worker and consume the ingestion queue forever.

    Loads configuration, sets up logging, opens the Postgres pool, Redis and
    MinIO clients, ensures the storage buckets exist, builds one adapter per
    source type, then pops jobs from the ingestion queue and hands each to
    ``process_job``. When the queue is empty the loop sleeps for two seconds
    before polling again.

    A payload that cannot be decoded, or a job whose processing raises, is
    logged and dropped so that a single bad job cannot stop the worker.
    The Postgres pool and Redis client are closed when the loop exits.
    """
    cfg = load_config()
    setup_logging("ingestion_worker", level=cfg.log_level, json_output=cfg.json_logs)
    pool = await get_pg_pool(cfg)
    rds = get_redis(cfg)
    minio_client = get_minio(cfg)

    # Ensure all required buckets exist
    ensure_buckets(minio_client)

    adapters = {
        "market_api": PolygonMarketAdapter(
            api_key=cfg.market_data.api_key,
            base_url=cfg.market_data.base_url,
        ),
        "news_api": PolygonNewsAdapter(
            api_key=cfg.market_data.api_key,
            base_url="https://api.polygon.io",
        ),
        "filings_api": SECEdgarAdapter(),
        "web_scrape": WebScrapeAdapter(),
        "broker": AlpacaBrokerAdapter(
            api_key=cfg.broker.api_key or "",
            api_secret=cfg.broker.api_secret or "",
            mode=TradingMode.LIVE if cfg.broker.mode == "live" else TradingMode.PAPER,
            base_url=cfg.broker.base_url,
        ),
    }

    logger.info("Ingestion worker started")
    queue = queue_key(QUEUE_INGESTION)
    try:
        while True:
            raw = await rds.lpop(queue)
            if not raw:
                await asyncio.sleep(2)
                continue

            try:
                job = json.loads(raw)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError. The payload is already
                # popped, so all that can be done is record it and move on.
                logger.error("Discarding undecodable ingestion job: %r", raw)
                continue

            try:
                await process_job(job, pool, rds, minio_client, adapters)
            except Exception:
                # Task cancellation is a BaseException and still propagates,
                # so shutdown is unaffected by this guard.
                logger.exception("Unhandled error while processing ingestion job")
    finally:
        await pool.close()
        await rds.close()
# Script entry point: start the worker loop only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())