"""Parser worker - HTML-to-text, boilerplate reduction, quality scoring. Uses BeautifulSoup-based parsing pipeline for structured HTML extraction, metadata extraction, outbound link extraction, and quality scoring. Persists normalized text and structured parser output to MinIO, and updates document metadata in PostgreSQL. Requirements: 4.1, 4.2, 4.3, 9.1, 9.2 """ import asyncio import json import logging import time from datetime import datetime, timezone from typing import Any, Optional import asyncpg import httpx import redis.asyncio as aioredis from minio import Minio from services.parser.html_parser import ParsedDocument, detect_company_mentions, parse_html from services.shared.config import load_config from services.shared.db import get_minio, get_pg_pool, get_redis from services.shared.logging import ( inject_trace_context, new_trace_id, set_trace_context, setup_logging, ) from services.shared.metadata import update_document_parse_results from services.shared.metrics import ( PARSE_DURATION, PARSE_JOBS_TOTAL, PARSE_LOW_QUALITY_TOTAL, PARSE_QUALITY_SCORE, ) from services.shared.redis_keys import QUEUE_EXTRACTION, QUEUE_PARSING, queue_key from services.shared.storage import upload_normalized_text, upload_parser_output logger = logging.getLogger("parser_worker") async def fetch_html(url: str) -> Optional[str]: """Fetch article HTML for scraping.""" if not url: return None async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: try: resp = await client.get(url, headers={"User-Agent": "StonksOracle/1.0"}) resp.raise_for_status() return resp.text except Exception as e: logger.warning(f"Failed to fetch {url}: {e}") return None def build_parser_output_json(parsed: ParsedDocument, mentions: list[dict[str, Any]]) -> dict[str, Any]: """Build a structured JSON dict from ParsedDocument and detected mentions. This captures the full parser output for audit and downstream use: metadata, quality signals, warnings, outbound links, tags, and mentions. """ return { "title": parsed.title, "author": parsed.author, "publisher": parsed.publisher, "published_at": parsed.published_at, "canonical_url": parsed.canonical_url, "language": parsed.language, "description": parsed.description, "document_type": parsed.document_type, "word_count": parsed.word_count, "outbound_links": parsed.outbound_links, "tags": parsed.tags, "quality_score": parsed.quality_score, "confidence": parsed.confidence, "low_quality_flag": parsed.low_quality_flag, "quality_warnings": parsed.quality_warnings, "quality_signals": parsed.quality_signals.as_dict(), "mentioned_companies": mentions, } async def process_job( job: dict[str, Any], pool: asyncpg.Pool, rds: aioredis.Redis, minio_client: Minio, ) -> None: doc_id = job["document_id"] ticker = job["ticker"] url = job.get("url", "") now = datetime.now(timezone.utc) _parse_start = time.monotonic() set_trace_context(trace_id=job.get("_trace_id") or new_trace_id()) # If no URL in job, look it up from the documents table if not url: row = await pool.fetchrow( "SELECT url FROM documents WHERE id = $1::uuid", doc_id, ) if row and row["url"]: url = row["url"] # Fetch HTML if we have a URL html = await fetch_html(url) if url else None if html: # Parse using BeautifulSoup pipeline parsed = parse_html(html, url) else: parsed = ParsedDocument() text = parsed.body_text # Upload normalized text to MinIO norm_ref: str | None = None if text: norm_ref = upload_normalized_text( minio_client, ticker, doc_id, text.encode("utf-8"), timestamp=now, ) # Detect company mentions aliases = await pool.fetch( """SELECT ca.company_id::text, ca.alias, ca.alias_type, c.ticker FROM company_aliases ca JOIN companies c ON ca.company_id = c.id UNION ALL SELECT c.id::text as company_id, c.ticker as alias, 'ticker' as alias_type, c.ticker FROM companies c UNION ALL SELECT c.id::text as company_id, c.legal_name as alias, 'legal_name' as alias_type, c.ticker FROM companies c""" ) mentions = detect_company_mentions(text, [dict(a) for a in aliases]) if text else [] # Build and upload structured parser output JSON output_json = build_parser_output_json(parsed, mentions) output_bytes = json.dumps(output_json, default=str, indent=2).encode("utf-8") parser_output_ref = upload_parser_output( minio_client, ticker, doc_id, output_bytes, timestamp=now, ) # Update document in PostgreSQL status = "parsed" if parsed.confidence != "low" else "low_quality" await update_document_parse_results( pool, document_id=doc_id, normalized_storage_ref=norm_ref, parser_output_ref=parser_output_ref, parse_quality_score=parsed.quality_score, parse_confidence=parsed.confidence, status=status, ) # Insert company mentions for m in mentions: await pool.execute( """INSERT INTO document_company_mentions (document_id, company_id, ticker, mention_type, confidence) VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING""", doc_id, m["company_id"], m["ticker"], m["mention_type"], m["confidence"], ) # Only enqueue for extraction if quality is acceptable if parsed.confidence != "low": await rds.rpush(queue_key(QUEUE_EXTRACTION), json.dumps(inject_trace_context({ "document_id": doc_id, "ticker": ticker, "normalized_text": text[:32000], }))) PARSE_JOBS_TOTAL.labels(status="parsed").inc() PARSE_QUALITY_SCORE.observe(parsed.quality_score) PARSE_DURATION.observe(time.monotonic() - _parse_start) logger.info( "Parsed doc %s for %s: quality=%.2f, confidence=%s", doc_id, ticker, parsed.quality_score, parsed.confidence, extra={"ticker": ticker, "document_id": doc_id}, ) else: PARSE_JOBS_TOTAL.labels(status="low_quality").inc() PARSE_LOW_QUALITY_TOTAL.inc() PARSE_QUALITY_SCORE.observe(parsed.quality_score) PARSE_DURATION.observe(time.monotonic() - _parse_start) logger.warning( "Low quality parse for doc %s, skipping extraction", doc_id, extra={"ticker": ticker, "document_id": doc_id}, ) async def main() -> None: config = load_config() setup_logging("parser_worker", level=config.log_level, json_output=config.json_logs) pool = await get_pg_pool(config) rds = get_redis(config) minio_client = get_minio(config) logger.info("Parser worker started") queue = queue_key(QUEUE_PARSING) try: while True: raw = await rds.lpop(queue) if raw: job = json.loads(raw) try: await process_job(job, pool, rds, minio_client) except Exception as e: logger.error("Parse error: %s", e, exc_info=True) else: await asyncio.sleep(2) finally: await pool.close() await rds.close() if __name__ == "__main__": asyncio.run(main())