Files
stonks-oracle/services/parser/worker.py
T

267 lines
9.7 KiB
Python

"""Parser worker - HTML-to-text, boilerplate reduction, quality scoring.
Uses BeautifulSoup-based parsing pipeline for structured HTML extraction,
metadata extraction, outbound link extraction, and quality scoring.
Persists normalized text and structured parser output to MinIO,
and updates document metadata in PostgreSQL.
Requirements: 4.1, 4.2, 4.3, 9.1, 9.2
"""
import asyncio
import json
import logging
import time
from datetime import datetime, timezone
from typing import Any, Optional
import asyncpg
import httpx
import redis.asyncio as aioredis
from minio import Minio
from services.parser.html_parser import ParsedDocument, detect_company_mentions, parse_html
from services.shared.config import load_config
from services.shared.db import get_minio, get_pg_pool, get_redis
from services.shared.logging import (
inject_trace_context,
new_trace_id,
set_trace_context,
setup_logging,
)
from services.shared.metadata import update_document_parse_results
from services.shared.metrics import (
PARSE_DURATION,
PARSE_JOBS_TOTAL,
PARSE_LOW_QUALITY_TOTAL,
PARSE_QUALITY_SCORE,
)
from services.shared.redis_keys import QUEUE_EXTRACTION, QUEUE_PARSING, queue_key
from services.shared.storage import upload_normalized_text, upload_parser_output
logger = logging.getLogger("parser_worker")
async def fetch_html(url: str) -> Optional[str]:
"""Fetch article HTML for scraping."""
if not url:
return None
# SEC EDGAR requires a descriptive User-Agent with contact email per fair access policy
if "sec.gov" in url:
ua = "StonksOracle/1.0 (stonks-oracle-bot; contact@celestium.life)"
else:
ua = "StonksOracle/1.0"
async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
try:
resp = await client.get(url, headers={"User-Agent": ua, "Accept-Encoding": "gzip, deflate"})
resp.raise_for_status()
return resp.text
except Exception as e:
logger.warning(f"Failed to fetch {url}: {e}")
return None
def build_parser_output_json(parsed: ParsedDocument, mentions: list[dict[str, Any]]) -> dict[str, Any]:
"""Build a structured JSON dict from ParsedDocument and detected mentions.
This captures the full parser output for audit and downstream use:
metadata, quality signals, warnings, outbound links, tags, and mentions.
"""
return {
"title": parsed.title,
"author": parsed.author,
"publisher": parsed.publisher,
"published_at": parsed.published_at,
"canonical_url": parsed.canonical_url,
"language": parsed.language,
"description": parsed.description,
"document_type": parsed.document_type,
"word_count": parsed.word_count,
"outbound_links": parsed.outbound_links,
"tags": parsed.tags,
"quality_score": parsed.quality_score,
"confidence": parsed.confidence,
"low_quality_flag": parsed.low_quality_flag,
"quality_warnings": parsed.quality_warnings,
"quality_signals": parsed.quality_signals.as_dict(),
"mentioned_companies": mentions,
}
async def process_job(
job: dict[str, Any],
pool: asyncpg.Pool,
rds: aioredis.Redis,
minio_client: Minio,
) -> None:
doc_id = job["document_id"]
ticker = job["ticker"]
url = job.get("url", "")
now = datetime.now(timezone.utc)
_parse_start = time.monotonic()
set_trace_context(trace_id=job.get("_trace_id") or new_trace_id())
# If no URL in job, look it up from the documents table
if not url:
row = await pool.fetchrow(
"SELECT url FROM documents WHERE id = $1::uuid", doc_id,
)
if row and row["url"]:
url = row["url"]
# Fetch HTML if we have a URL
html = await fetch_html(url) if url else None
if html:
# Parse using BeautifulSoup pipeline
parsed = parse_html(html, url)
else:
parsed = ParsedDocument()
text = parsed.body_text
# If parsed body is short (<500 chars), enrich with Polygon description from raw payload
if len(text or "") < 500:
doc_row = await pool.fetchrow(
"SELECT title, raw_storage_ref FROM documents WHERE id = $1::uuid", doc_id,
)
if doc_row and doc_row["raw_storage_ref"]:
try:
ref = doc_row["raw_storage_ref"]
parts = ref.replace("s3://", "").split("/", 1)
if len(parts) == 2:
raw_obj = minio_client.get_object(parts[0], parts[1])
raw_data = json.loads(raw_obj.read())
raw_obj.close()
raw_obj.release_conn()
# Find matching article by title
items = raw_data.get("results", [])
if isinstance(items, list):
doc_title = doc_row["title"] or ""
for item in items:
if item.get("title", "") == doc_title:
desc = item.get("description", "")
keywords = item.get("keywords", [])
author = item.get("author", "")
enriched_parts = []
if doc_title:
enriched_parts.append(doc_title)
if author:
enriched_parts.append(f"By {author}")
if desc:
enriched_parts.append(desc)
if keywords:
enriched_parts.append(f"Keywords: {', '.join(keywords)}")
if text:
enriched_parts.append(text)
text = "\n\n".join(enriched_parts)
break
except Exception as e:
logger.debug("Could not enrich short text for doc %s: %s", doc_id, e)
# Upload normalized text to MinIO
norm_ref: str | None = None
if text:
norm_ref = upload_normalized_text(
minio_client, ticker, doc_id,
text.encode("utf-8"), timestamp=now,
)
# Detect company mentions
aliases = await pool.fetch(
"""SELECT ca.company_id::text, ca.alias, ca.alias_type, c.ticker
FROM company_aliases ca JOIN companies c ON ca.company_id = c.id
UNION ALL
SELECT c.id::text as company_id, c.ticker as alias, 'ticker' as alias_type, c.ticker
FROM companies c
UNION ALL
SELECT c.id::text as company_id, c.legal_name as alias, 'legal_name' as alias_type, c.ticker
FROM companies c"""
)
mentions = detect_company_mentions(text, [dict(a) for a in aliases]) if text else []
# Build and upload structured parser output JSON
output_json = build_parser_output_json(parsed, mentions)
output_bytes = json.dumps(output_json, default=str, indent=2).encode("utf-8")
parser_output_ref = upload_parser_output(
minio_client, ticker, doc_id,
output_bytes, timestamp=now,
)
# Update document in PostgreSQL
status = "parsed" if parsed.confidence != "low" else "low_quality"
await update_document_parse_results(
pool,
document_id=doc_id,
normalized_storage_ref=norm_ref,
parser_output_ref=parser_output_ref,
parse_quality_score=parsed.quality_score,
parse_confidence=parsed.confidence,
status=status,
)
# Insert company mentions
for m in mentions:
await pool.execute(
"""INSERT INTO document_company_mentions (document_id, company_id, ticker, mention_type, confidence)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING""",
doc_id, m["company_id"], m["ticker"], m["mention_type"], m["confidence"],
)
# Only enqueue for extraction if quality is acceptable
if parsed.confidence != "low":
await rds.rpush(queue_key(QUEUE_EXTRACTION), json.dumps(inject_trace_context({
"document_id": doc_id,
"ticker": ticker,
"normalized_text": text[:32000],
})))
PARSE_JOBS_TOTAL.labels(status="parsed").inc()
PARSE_QUALITY_SCORE.observe(parsed.quality_score)
PARSE_DURATION.observe(time.monotonic() - _parse_start)
logger.info(
"Parsed doc %s for %s: quality=%.2f, confidence=%s",
doc_id, ticker, parsed.quality_score, parsed.confidence,
extra={"ticker": ticker, "document_id": doc_id},
)
else:
PARSE_JOBS_TOTAL.labels(status="low_quality").inc()
PARSE_LOW_QUALITY_TOTAL.inc()
PARSE_QUALITY_SCORE.observe(parsed.quality_score)
PARSE_DURATION.observe(time.monotonic() - _parse_start)
logger.warning(
"Low quality parse for doc %s, skipping extraction",
doc_id,
extra={"ticker": ticker, "document_id": doc_id},
)
async def main() -> None:
config = load_config()
setup_logging("parser_worker", level=config.log_level, json_output=config.json_logs)
pool = await get_pg_pool(config)
rds = get_redis(config)
minio_client = get_minio(config)
logger.info("Parser worker started")
queue = queue_key(QUEUE_PARSING)
try:
while True:
raw = await rds.lpop(queue)
if raw:
job = json.loads(raw)
try:
await process_job(job, pool, rds, minio_client)
except Exception as e:
logger.error("Parse error: %s", e, exc_info=True)
else:
await asyncio.sleep(2)
finally:
await pool.close()
await rds.close()
if __name__ == "__main__":
asyncio.run(main())