223 lines
7.5 KiB
Python
223 lines
7.5 KiB
Python
"""Parser worker - HTML-to-text, boilerplate reduction, quality scoring.
|
|
|
|
Uses BeautifulSoup-based parsing pipeline for structured HTML extraction,
|
|
metadata extraction, outbound link extraction, and quality scoring.
|
|
Persists normalized text and structured parser output to MinIO,
|
|
and updates document metadata in PostgreSQL.
|
|
|
|
Requirements: 4.1, 4.2, 4.3, 9.1, 9.2
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Optional
|
|
|
|
import asyncpg
|
|
import httpx
|
|
import redis.asyncio as aioredis
|
|
from minio import Minio
|
|
|
|
from services.parser.html_parser import ParsedDocument, detect_company_mentions, parse_html
|
|
from services.shared.config import load_config
|
|
from services.shared.db import get_minio, get_pg_pool, get_redis
|
|
from services.shared.logging import (
|
|
inject_trace_context,
|
|
new_trace_id,
|
|
set_trace_context,
|
|
setup_logging,
|
|
)
|
|
from services.shared.metadata import update_document_parse_results
|
|
from services.shared.metrics import (
|
|
PARSE_DURATION,
|
|
PARSE_JOBS_TOTAL,
|
|
PARSE_LOW_QUALITY_TOTAL,
|
|
PARSE_QUALITY_SCORE,
|
|
)
|
|
from services.shared.redis_keys import QUEUE_EXTRACTION, QUEUE_PARSING, queue_key
|
|
from services.shared.storage import upload_normalized_text, upload_parser_output
|
|
|
|
# Module-level logger; its name matches the service name passed to setup_logging() in main().
logger = logging.getLogger("parser_worker")


async def fetch_html(url: str) -> Optional[str]:
    """Fetch article HTML for scraping.

    This is best-effort: any failure (client setup, network error, non-2xx
    status after following redirects) is logged as a warning and reported as
    ``None``, so the caller can fall back to an empty parse instead of failing
    the job.

    Args:
        url: Absolute URL to fetch. A falsy value short-circuits to ``None``.

    Returns:
        The decoded response body, or ``None`` if there is no URL or the
        fetch failed.
    """
    if not url:
        return None

    try:
        # Client construction is inside the try so that setup errors follow the
        # same best-effort "return None" path as request errors.
        async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
            resp = await client.get(url, headers={"User-Agent": "StonksOracle/1.0"})
            resp.raise_for_status()
            return resp.text
    except Exception as exc:  # deliberate boundary: a failed fetch must not crash the job
        # Lazy %-style args: the message is only formatted if the record is emitted.
        logger.warning("Failed to fetch %s: %s", url, exc)
        return None
|
|
|
|
|
|
def build_parser_output_json(parsed: ParsedDocument, mentions: list[dict[str, Any]]) -> dict[str, Any]:
    """Assemble the structured parser-output record for one document.

    The record is what gets serialized and uploaded for audit and downstream
    consumers. It contains the document metadata, the quality score, confidence
    and warnings, the quality signals (via ``as_dict()``), the outbound links
    and tags, and the detected company mentions.

    Args:
        parsed: Result of the HTML parsing pipeline.
        mentions: Company-mention dicts, stored verbatim under
            ``"mentioned_companies"``.

    Returns:
        A plain dict whose keys appear in a fixed order, so the serialized
        JSON layout is stable.
    """
    # Attributes copied straight off the ParsedDocument, in output order.
    copied_attrs = (
        "title",
        "author",
        "publisher",
        "published_at",
        "canonical_url",
        "language",
        "description",
        "document_type",
        "word_count",
        "outbound_links",
        "tags",
        "quality_score",
        "confidence",
        "low_quality_flag",
        "quality_warnings",
    )
    record: dict[str, Any] = {attr: getattr(parsed, attr) for attr in copied_attrs}
    # quality_signals is an object; flatten it into a dict for serialization.
    record["quality_signals"] = parsed.quality_signals.as_dict()
    record["mentioned_companies"] = mentions
    return record
|
|
|
|
|
|
async def _lookup_document_url(pool: asyncpg.Pool, doc_id: Any) -> str:
    """Return the stored URL for *doc_id* from ``documents``, or "" if absent."""
    row = await pool.fetchrow(
        "SELECT url FROM documents WHERE id = $1::uuid", doc_id,
    )
    return row["url"] if row and row["url"] else ""


async def _load_company_aliases(pool: asyncpg.Pool) -> list[dict[str, Any]]:
    """Load every matchable company name form: explicit aliases, tickers, and legal names."""
    rows = await pool.fetch(
        """SELECT ca.company_id::text, ca.alias, ca.alias_type, c.ticker
           FROM company_aliases ca JOIN companies c ON ca.company_id = c.id
           UNION ALL
           SELECT c.id::text as company_id, c.ticker as alias, 'ticker' as alias_type, c.ticker
           FROM companies c
           UNION ALL
           SELECT c.id::text as company_id, c.legal_name as alias, 'legal_name' as alias_type, c.ticker
           FROM companies c"""
    )
    return [dict(r) for r in rows]


async def process_job(
    job: dict[str, Any],
    pool: asyncpg.Pool,
    rds: aioredis.Redis,
    minio_client: Minio,
) -> None:
    """Parse one queued document and persist the results.

    Steps:
      1. Resolve the URL (from the job, else from the ``documents`` table).
      2. Fetch and parse the HTML.
      3. Upload the normalized text and the structured parser output to MinIO.
      4. Update the document row.
      5. Record company mentions.
      6. Enqueue the document for extraction, unless parse confidence is "low".

    Args:
        job: Queue payload. Must contain ``document_id`` and ``ticker``; may
            contain ``url`` and ``_trace_id``.
        pool: PostgreSQL connection pool.
        rds: Redis client used to enqueue extraction jobs.
        minio_client: MinIO client used for uploads.

    Raises:
        KeyError: if ``document_id`` or ``ticker`` is missing from ``job``.
        Database, storage, and Redis errors propagate to the caller.
    """
    doc_id = job["document_id"]
    ticker = job["ticker"]
    url = job.get("url", "")
    now = datetime.now(timezone.utc)
    parse_start = time.monotonic()

    set_trace_context(trace_id=job.get("_trace_id") or new_trace_id())

    if not url:
        url = await _lookup_document_url(pool, doc_id)

    # With no HTML (no URL, or fetch failed) a default ParsedDocument is used,
    # so the rest of the pipeline still runs and records a result.
    html = await fetch_html(url) if url else None
    parsed = parse_html(html, url) if html else ParsedDocument()

    # Normalize a missing body to "" so the slicing for the extraction payload is safe.
    text = parsed.body_text or ""

    norm_ref: str | None = None
    if text:
        norm_ref = upload_normalized_text(
            minio_client, ticker, doc_id,
            text.encode("utf-8"), timestamp=now,
        )

    # The alias table is only needed when there is text to scan; skip the
    # (three-way UNION) query entirely for empty documents.
    if text:
        mentions = detect_company_mentions(text, await _load_company_aliases(pool))
    else:
        mentions = []

    output_json = build_parser_output_json(parsed, mentions)
    output_bytes = json.dumps(output_json, default=str, indent=2).encode("utf-8")
    parser_output_ref = upload_parser_output(
        minio_client, ticker, doc_id,
        output_bytes, timestamp=now,
    )

    low_quality = parsed.confidence == "low"
    status = "low_quality" if low_quality else "parsed"
    await update_document_parse_results(
        pool,
        document_id=doc_id,
        normalized_storage_ref=norm_ref,
        parser_output_ref=parser_output_ref,
        parse_quality_score=parsed.quality_score,
        parse_confidence=parsed.confidence,
        status=status,
    )

    # Batch all mention rows into one executemany call instead of a round-trip per mention.
    if mentions:
        await pool.executemany(
            """INSERT INTO document_company_mentions (document_id, company_id, ticker, mention_type, confidence)
               VALUES ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING""",
            [
                (doc_id, m["company_id"], m["ticker"], m["mention_type"], m["confidence"])
                for m in mentions
            ],
        )

    if low_quality:
        PARSE_LOW_QUALITY_TOTAL.inc()
    else:
        # Only acceptable-quality parses go on to extraction; the payload carries
        # at most the first 8000 characters of normalized text.
        await rds.rpush(queue_key(QUEUE_EXTRACTION), json.dumps(inject_trace_context({
            "document_id": doc_id,
            "ticker": ticker,
            "normalized_text": text[:8000],
        })))

    PARSE_JOBS_TOTAL.labels(status=status).inc()
    PARSE_QUALITY_SCORE.observe(parsed.quality_score)
    PARSE_DURATION.observe(time.monotonic() - parse_start)

    log_extra = {"ticker": ticker, "document_id": doc_id}
    if low_quality:
        logger.warning(
            "Low quality parse for doc %s, skipping extraction",
            doc_id,
            extra=log_extra,
        )
    else:
        logger.info(
            "Parsed doc %s for %s: quality=%.2f, confidence=%s",
            doc_id, ticker, parsed.quality_score, parsed.confidence,
            extra=log_extra,
        )
|
|
|
|
|
|
async def main() -> None:
    """Run the parser worker loop until cancelled.

    Polls the parsing queue with LPOP and sleeps 2 seconds whenever it is empty.
    Each job is isolated: a payload that is not valid JSON, or an exception
    raised by ``process_job``, is logged and the loop moves on to the next job.
    The PostgreSQL pool and the Redis client are closed when the loop exits.
    """
    config = load_config()
    setup_logging("parser_worker", level=config.log_level, json_output=config.json_logs)

    pool = await get_pg_pool(config)
    rds = get_redis(config)
    minio_client = get_minio(config)

    logger.info("Parser worker started")
    queue = queue_key(QUEUE_PARSING)

    try:
        while True:
            raw = await rds.lpop(queue)
            if not raw:
                await asyncio.sleep(2)
                continue

            # Decoding happens inside its own try: previously a single malformed
            # payload raised out of the loop and stopped the whole worker.
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            try:
                job = json.loads(raw)
            except ValueError as e:
                logger.error("Dropping malformed parse job %r: %s", raw[:200], e)
                continue

            try:
                await process_job(job, pool, rds, minio_client)
            except Exception as e:  # per-job boundary: keep the worker alive
                logger.exception("Parse error: %s", e)
    finally:
        await pool.close()
        await rds.close()


if __name__ == "__main__":
    asyncio.run(main())
|