"""HTML-to-text parsing pipeline using BeautifulSoup.

Provides structured HTML parsing with boilerplate removal, metadata extraction,
outbound link extraction, and quality scoring. Inspired by Noctipede crawler
patterns: BeautifulSoup + content hashing, boilerplate stripping, quality scoring.

Requirements: 4.1, 4.2, 4.3
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import math
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from urllib.parse import urlparse
|
|
|
|
from bs4 import BeautifulSoup, Tag
|
|
|
|
logger = logging.getLogger("html_parser")

# Elements removed wholesale before extraction: scripts, styling, site chrome
# and interactive widgets rather than article prose.
STRIP_TAGS = [
    "script",
    "style",
    "nav",
    "footer",
    "header",
    "aside",
    "iframe",
    "noscript",
    "svg",
    "form",
    "button",
]
|
|
|
|
# Lower-case fragments looked up (as plain substrings) inside an element's
# combined class/id string; a hit marks the element as a boilerplate container.
BOILERPLATE_SIGNALS = [
    "sidebar",
    "widget",
    "advert",
    "promo",
    "newsletter",
    "social-share",
    "share-bar",
    "related-posts",
    "comment",
    "cookie",
    "popup",
    "modal",
    "banner",
    "breadcrumb",
    "pagination",
    "nav-",
    "menu",
    "toolbar",
    "signup",
    "subscribe",
    "follow-us",
    "social-media",
    "share-button",
    "ad-slot",
    "ad-container",
    "sponsored",
]
|
|
|
|
# Regex patterns for residual boilerplate in extracted text.
#
# Each pattern is applied with ``sub("")`` over the whole multi-line text.
# Patterns anchored with ``^`` / ``$`` carry the MULTILINE flag so they apply
# to every line rather than only to the first/last line of the document.
BOILERPLATE_TEXT_PATTERNS = [
    re.compile(r"(?i)subscribe to our newsletter.*?(?:\n|$)"),
    re.compile(r"(?i)click here to read more.*?(?:\n|$)"),
    re.compile(r"(?i)advertisement\s*\n?"),
    re.compile(r"(?i)copyright ©.*?(?:\n|$)"),
    re.compile(r"(?i)all rights reserved.*?(?:\n|$)"),
    re.compile(r"(?i)terms of (use|service).*?(?:\n|$)"),
    re.compile(r"(?i)privacy policy.*?(?:\n|$)"),
    # Bracketed ad markers such as "[ad]", "[Advertisement]", "[sponsored ad]".
    # "ad" must be a word of its own (so "[Broadcom]" or "[read more]" survive)
    # and only horizontal whitespace is consumed so adjacent lines/words are
    # never glued together.
    re.compile(
        r"[ \t]*\[[^\]\n]*\bad(?:s|vert\w*)?\b[^\]\n]*\][ \t]*",
        re.IGNORECASE,
    ),
    re.compile(r"(?i)sign up for .*?(?:\n|$)"),
    re.compile(r"(?i)follow us on .*?(?:\n|$)"),
    re.compile(r"(?i)share this (article|story|post).*?(?:\n|$)"),
    # Trailing "Read more" teaser at the end of any line.
    re.compile(r"(?im)read more:?[ \t]*$"),
    re.compile(r"(?i)recommended for you.*?(?:\n|$)"),
    re.compile(r"(?i)you may also like.*?(?:\n|$)"),
    re.compile(r"(?i)trending now.*?(?:\n|$)"),
    re.compile(r"(?i)most (popular|read).*?(?:\n|$)"),
    # A line consisting solely of the "Tags:" label.
    re.compile(r"(?im)^tags:[ \t]*$"),
    # Photo / image credit lines.
    re.compile(r"(?im)^[ \t]*photo\s*:.*?(?:\n|$)"),
    re.compile(r"(?im)^[ \t]*image\s*(credit|source|courtesy)\s*:.*?(?:\n|$)"),
]
|
|
|
|
# CSS selectors tried, first to last, when looking for the element that holds
# the article body; earlier entries win over later ones.
ARTICLE_SELECTORS = [
    "article", "[role='main']",
    ".article-body", ".post-content", ".entry-content",
    ".story-body", ".article-content",
    "#article-body", "#story-body",
    ".article-text", ".post-body", ".content-body",
    "main",
]
|
|
|
|
# Minimum text density (text chars / total chars including markup) for a block
# to be considered content-rich rather than boilerplate.
# NOTE(review): no code visible in this file reads this constant — confirm
# whether it is used elsewhere or is dead.
_MIN_TEXT_DENSITY = 0.25

# Minimum word count for a block to be a viable body candidate; used both by
# the selector-based lookup and by the density-scoring fallback.
_MIN_BLOCK_WORDS = 20
|
|
|
|
|
|
@dataclass
class QualitySignals:
    """Per-dimension scores that feed the composite parse-quality score.

    Every attribute is a float in [0, 1]; higher means the parsed content
    looks better on that dimension. All signals default to 0.0.

    Requirements: 4.3
    """

    word_count_signal: float = 0.0
    diversity_signal: float = 0.0
    sentence_signal: float = 0.0
    paragraph_signal: float = 0.0
    body_found_signal: float = 0.0
    metadata_signal: float = 0.0

    def as_dict(self) -> dict[str, float]:
        """Return the signals keyed by short name (attribute name minus ``_signal``)."""
        short_names = (
            "word_count",
            "diversity",
            "sentence",
            "paragraph",
            "body_found",
            "metadata",
        )
        return {name: getattr(self, f"{name}_signal") for name in short_names}
|
|
|
|
|
|
@dataclass
class CompanyMention:
    """A detected company mention in parsed text.

    One instance is created per company; ``parse_html`` builds these from the
    dicts returned by ``detect_company_mentions``.

    Requirements: 1.3, 4.1
    """

    # Identifier of the company the mention refers to.
    company_id: str
    # Ticker symbol attached to the winning alias entry (may be empty).
    ticker: str
    # Kind of alias that produced the match.
    mention_type: str  # ticker, legal_name, alias, brand
    # Confidence assigned from the alias type.
    confidence: float
    # Number of matches accumulated across all aliases of the company.
    match_count: int = 1
|
|
|
|
|
|
@dataclass
class ParsedDocument:
    """Result of HTML-to-text parsing pipeline."""

    # Cleaned article text after boilerplate removal.
    body_text: str = ""
    # Metadata fields; see extract_metadata for the lookup order of each.
    title: str = ""
    author: str = ""
    publisher: str = ""
    published_at: str | None = None
    canonical_url: str | None = None
    language: str = "en"
    description: str = ""
    # Result of infer_document_type.
    document_type: str = "article"
    # Absolute http(s) links pointing at other hosts, de-duplicated.
    outbound_links: list[str] = field(default_factory=list)
    # Keywords split from the comma-separated "keywords" meta tag.
    tags: list[str] = field(default_factory=list)
    # Companies detected in title + body (empty when no aliases were supplied).
    mentioned_companies: list[CompanyMention] = field(default_factory=list)
    # Composite score and its low/medium/high label from score_parse_quality.
    quality_score: float = 0.0
    confidence: str = "low"
    # Whitespace-separated word count of body_text.
    word_count: int = 0
    # Individual signals behind quality_score.
    quality_signals: QualitySignals = field(default_factory=QualitySignals)
    # True when confidence is "low".
    low_quality_flag: bool = False
    # Machine-readable warning codes emitted during scoring.
    quality_warnings: list[str] = field(default_factory=list)
|
|
|
|
|
|
def _attr_str(tag: Tag, attr: str) -> str:
    """Return attribute *attr* of *tag* as a single string.

    List-valued attributes (such as ``class``) are joined with spaces;
    missing or falsy values become the empty string.
    """
    raw = tag.get(attr, "")
    if isinstance(raw, list):
        return " ".join(raw)
    if not raw:
        return ""
    return str(raw)
|
|
|
|
|
|
def _is_boilerplate_container(tag: Tag) -> bool:
    """Tell whether *tag*'s class or id contains a boilerplate signal.

    The lower-cased class and id strings are concatenated (space separated)
    and searched for each entry of BOILERPLATE_SIGNALS as a plain substring.
    """
    haystack = " ".join(
        (_attr_str(tag, "class").lower(), _attr_str(tag, "id").lower())
    )
    for signal in BOILERPLATE_SIGNALS:
        if signal in haystack:
            return True
    return False
|
|
|
|
|
|
def _strip_boilerplate_tags(soup: BeautifulSoup) -> None:
    """Remove known non-content tags and boilerplate containers in-place.

    Two passes are made over *soup*:

    1. every element whose name is listed in STRIP_TAGS is decomposed;
    2. every remaining element whose class/id matches a boilerplate signal
       is decomposed.

    ``decompose()`` destroys an element together with all of its descendants,
    so a descendant that is still referenced from an earlier ``find_all``
    snapshot must not be inspected or decomposed again. The second pass
    therefore decides which elements to drop while the tree is still intact
    and only then destroys them.

    ``<html>`` and ``<body>`` are never dropped by the class/id heuristic:
    layout classes on the root (e.g. ``has-sidebar``, ``modal-open``) would
    otherwise wipe the entire document.
    """
    for tag_name in STRIP_TAGS:
        for tag in soup.find_all(tag_name):
            # A nested same-name element dies with its ancestor.
            if getattr(tag, "decomposed", False):
                continue
            tag.decompose()

    # Phase 1: classify while every element still has its attributes.
    flagged = [
        tag
        for tag in soup.find_all(True)
        if tag.name not in ("html", "body") and _is_boilerplate_container(tag)
    ]

    # Phase 2: destroy, skipping elements already removed with an ancestor.
    for tag in flagged:
        if not getattr(tag, "decomposed", False):
            tag.decompose()
|
|
|
|
|
|
def _reduce_boilerplate_text(text: str) -> str:
    """Delete every BOILERPLATE_TEXT_PATTERNS match from *text*.

    Patterns are applied one after another, in list order, and the result is
    stripped of leading/trailing whitespace.
    """
    cleaned = text
    for regex in BOILERPLATE_TEXT_PATTERNS:
        cleaned = regex.sub("", cleaned)
    return cleaned.strip()
|
|
|
|
|
|
def _text_density(tag: Tag) -> float:
    """Return the share of *tag*'s serialized markup that is visible text.

    Computed as ``len(stripped text) / len(str(tag))``; an element that
    serializes to nothing scores 0.0. Content blocks tend to score higher
    than markup-heavy boilerplate.

    Requirements: 4.2
    """
    total_chars = len(str(tag))
    if not total_chars:
        return 0.0
    return len(tag.get_text(strip=True)) / total_chars
|
|
|
|
|
|
def _link_density(tag: Tag) -> float:
    """Return the fraction of *tag*'s text that sits inside ``<a>`` elements.

    Navigation blocks (menus, sidebars) are mostly links and score close to
    1.0; prose paragraphs score close to 0.0. An element without any text is
    treated as pure navigation and returns 1.0.

    Requirements: 4.2
    """
    all_chars = len(tag.get_text(strip=True))
    if not all_chars:
        return 1.0
    anchored_chars = 0
    for anchor in tag.find_all("a"):
        anchored_chars += len(anchor.get_text(strip=True))
    return anchored_chars / all_chars
|
|
|
|
|
|
def _block_score(tag: Tag) -> float:
    """Score a block element as a body candidate using text density heuristics.

    The score combines:

    * text density multiplied by ``1 - link density`` (base, 0-1);
    * ``0.1`` per ``<p>`` descendant, capped at 10, when there are at least 2;
    * ``0.05 * ln(word_count)`` so longer blocks win without running away.

    Blocks with fewer than _MIN_BLOCK_WORDS words score 0.0. Higher is more
    likely to be the article body.

    Requirements: 4.2
    """
    # Join text nodes with a space: with the default empty separator the last
    # word of one node fuses with the first word of the next
    # ("<p>a b</p><p>c d</p>" -> "a bc d"), under-counting words.
    word_count = len(tag.get_text(separator=" ", strip=True).split())
    if word_count < _MIN_BLOCK_WORDS:
        return 0.0

    density = _text_density(tag)
    links = _link_density(tag)
    paragraph_count = len(tag.find_all("p"))

    # Base score from text density (0-1), penalized by link density.
    score = density * (1.0 - links)

    # Bonus for paragraph-rich blocks (structured article content).
    if paragraph_count >= 2:
        score += 0.1 * min(paragraph_count, 10)

    # Bonus for word count (log-scaled to avoid runaway scores).
    score += 0.05 * math.log(max(word_count, 1))

    return score
|
|
|
|
|
|
def _find_article_body(soup: BeautifulSoup) -> Tag | None:
    """Find the most likely article body element.

    Strategy:

    1. Walk ARTICLE_SELECTORS in priority order and return the first matching
       element that holds at least _MIN_BLOCK_WORDS words. Every match of a
       selector is considered, so a short teaser ``<article>`` appearing
       before the real one does not hide it.
    2. Otherwise score every ``div`` / ``section`` / ``td`` with
       ``_block_score`` and return the best-scoring one (the earliest in
       document order on ties).

    Returns None when neither strategy yields a candidate.

    Requirements: 4.2
    """
    # Priority 1: semantic selectors.
    for selector in ARTICLE_SELECTORS:
        for match in soup.select(selector):
            # separator=" " keeps words from adjacent text nodes apart so the
            # count is not under-reported.
            words = match.get_text(separator=" ", strip=True).split()
            if len(words) >= _MIN_BLOCK_WORDS:
                return match

    # Priority 2: text-density scoring on block-level containers.
    best_tag: Tag | None = None
    best_score = 0.0
    for tag in soup.find_all(["div", "section", "td"]):
        score = _block_score(tag)
        # Strict ">" keeps the first element on ties and ignores zero scores.
        if score > best_score:
            best_score, best_tag = score, tag
    return best_tag
|
|
|
|
|
|
def _collapse_whitespace(text: str) -> str:
    """Strip every line and squeeze runs of blank lines down to one.

    The overall result is stripped as well, so it never starts or ends with
    a blank line.
    """
    squeezed: list[str] = []
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if line:
            squeezed.append(line)
        elif not squeezed or squeezed[-1] != "":
            # First blank line of a run: keep exactly one separator.
            squeezed.append("")
    return "\n".join(squeezed).strip()
|
|
|
|
|
|
def _remove_short_orphan_lines(text: str, min_words: int = 3) -> str:
    """Remove very short orphan lines that are likely UI fragments or captions.

    A non-blank line with fewer than *min_words* words is dropped unless it
    ends with sentence punctuation (``.``, ``!``, ``?`` or ``:``). This
    catches leftover button labels, image captions, and navigation fragments.

    Blank lines are kept: they are paragraph separators, not orphans, and the
    later whitespace-collapsing and paragraph-counting steps rely on them.

    Args:
        text: Multi-line text to filter.
        min_words: Minimum number of words a line needs to be kept
            unconditionally.

    Returns:
        The surviving lines joined with ``"\\n"``.

    Requirements: 4.2
    """
    sentence_endings = (".", "!", "?", ":")
    kept: list[str] = []
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped:
            # Preserve paragraph breaks.
            kept.append(line)
            continue
        if len(stripped.split()) < min_words and not stripped.endswith(sentence_endings):
            continue
        kept.append(line)
    return "\n".join(kept)
|
|
|
|
|
|
def _detect_repeated_blocks(text: str, min_len: int = 40) -> str:
    """Keep only the first occurrence of long lines that repeat verbatim.

    Template text (disclaimers, repeated footers) tends to show up several
    times. Lines are compared after stripping; only lines of at least
    *min_len* characters are considered. When nothing repeats, *text* is
    returned untouched; otherwise the surviving lines are re-joined with
    ``"\\n"``.

    Requirements: 4.2
    """
    rows = text.splitlines()

    # One pass: a key seen a second time graduates from `once` to `repeated`.
    once: set[str] = set()
    repeated: set[str] = set()
    for row in rows:
        key = row.strip()
        if len(key) < min_len:
            continue
        if key in once:
            repeated.add(key)
        else:
            once.add(key)

    if not repeated:
        return text

    already_emitted: set[str] = set()
    survivors: list[str] = []
    for row in rows:
        key = row.strip()
        if key in repeated:
            if key in already_emitted:
                continue  # later copy of a template line
            already_emitted.add(key)
        survivors.append(row)
    return "\n".join(survivors)
|
|
|
|
|
|
def extract_body_text(html: str) -> str:
    """Extract main body text from HTML with boilerplate removal.

    Pipeline:
    1. Strip non-content tags (script, style, nav, footer, etc.)
    2. Strip boilerplate containers by class/id signals
    3. Find article body via semantic selectors or text-density scoring
       (falling back to ``<body>``, then to the whole document)
    4. Extract text from best candidate, one line per block-level text run
    5. Remove residual boilerplate via regex patterns
    6. Remove short orphan lines (UI fragments)
    7. Detect and collapse repeated template blocks
    8. Collapse whitespace

    Inline elements (links, emphasis, spans, ...) are unwrapped before the
    text is read. Otherwise each inline fragment would become a line of its
    own ("Shares of" / "Apple" / "rose 5%.") and step 6 would delete the
    short fragments as orphans, silently losing linked words.

    Requirements: 4.1, 4.2
    """
    soup = BeautifulSoup(html, "html.parser")
    _strip_boilerplate_tags(soup)

    root = _find_article_body(soup) or soup.find("body") or soup

    # Merge inline markup into the surrounding text so a sentence stays on a
    # single line. Done only after body detection, which needs the <a> tags
    # for its link-density heuristic.
    inline_names = [
        "a", "abbr", "b", "cite", "code", "em", "i", "mark", "q", "s",
        "small", "span", "strong", "sub", "sup", "time", "u",
    ]
    for inline in root.find_all(inline_names):
        inline.unwrap()
    root.smooth()  # fuse the now-adjacent text nodes

    # One line per text node, with internal whitespace (including source
    # line-wrapping) normalised to single spaces.
    raw_text = "\n".join(" ".join(piece.split()) for piece in root.stripped_strings)

    # Multi-stage text cleaning.
    text = _reduce_boilerplate_text(raw_text)
    text = _remove_short_orphan_lines(text)
    text = _detect_repeated_blocks(text)
    text = _collapse_whitespace(text)
    return text
|
|
|
|
|
|
def extract_metadata(html: str, url: str = "") -> dict[str, str | None]:
    """Collect document metadata from *html*.

    Returned keys and their lookup order:

    * ``title``: ``og:title`` meta, else ``<title>``, else ``""``
    * ``author``: ``author`` meta, else ``""``
    * ``publisher``: ``og:site_name`` meta, else hostname of *url*, else ``""``
    * ``published_at``: ``article:published_time`` meta, else JSON-LD
      ``datePublished``, else ``None``
    * ``canonical_url``: ``<link rel="canonical">``, else ``og:url`` meta,
      else *url* (``None`` when *url* is empty)
    * ``language``: first 5 characters of ``<html lang>``, else ``"en"``
    * ``description``: ``og:description`` meta or ``description`` meta,
      else ``""``
    * ``tags``: raw comma-separated ``keywords`` meta content, else ``""``

    Requirements: 4.1
    """
    soup = BeautifulSoup(html, "html.parser")

    def raw_content(tag):
        # Falsy when the tag is missing or has no usable ``content`` value.
        return tag.get("content") if tag else None

    def stripped(value, fallback=""):
        # Non-string attribute values (e.g. lists) collapse to the fallback.
        return value.strip() if isinstance(value, str) else fallback

    # Title: og:title > <title>
    og_title = raw_content(soup.find("meta", property="og:title"))
    if og_title:
        title = stripped(og_title)
    elif soup.title and soup.title.string:
        title = soup.title.string.strip()
    else:
        title = ""

    # Author
    author_raw = raw_content(soup.find("meta", attrs={"name": "author"}))
    author = stripped(author_raw) if author_raw else ""

    # Publisher: og:site_name > hostname
    site_raw = raw_content(soup.find("meta", property="og:site_name"))
    if site_raw:
        publisher = stripped(site_raw)
    elif url:
        publisher = urlparse(url).hostname or ""
    else:
        publisher = ""

    # Published date: article:published_time > JSON-LD datePublished
    published_raw = raw_content(soup.find("meta", property="article:published_time"))
    if published_raw:
        published_at = stripped(published_raw, None)
    else:
        published_at = _extract_jsonld_date(soup)

    # Canonical URL: <link rel=canonical> > og:url > request URL
    canonical_link = soup.find("link", rel="canonical")
    canonical_href = canonical_link.get("href") if canonical_link else None
    if canonical_href:
        canonical_url = str(canonical_href)
    else:
        og_url = raw_content(soup.find("meta", property="og:url"))
        canonical_url = str(og_url) if og_url else (url or None)

    # Language
    html_tag = soup.find("html")
    lang = html_tag.get("lang") if html_tag else None
    language = str(lang)[:5] if lang else "en"

    # Description: og:description > <meta name=description>
    description_tag = soup.find("meta", property="og:description") or soup.find(
        "meta", attrs={"name": "description"}
    )
    description_raw = raw_content(description_tag)
    description = stripped(description_raw) if description_raw else ""

    # Tags / keywords (left as the raw comma-separated string)
    keywords_raw = raw_content(soup.find("meta", attrs={"name": "keywords"}))
    tags = stripped(keywords_raw) if keywords_raw else ""

    return {
        "title": title,
        "author": author,
        "publisher": publisher,
        "published_at": published_at,
        "canonical_url": canonical_url,
        "language": language,
        "description": description,
        "tags": tags,
    }
|
|
|
|
|
|
def _extract_jsonld_date(soup: BeautifulSoup) -> str | None:
    """Try to extract ``datePublished`` from JSON-LD script tags.

    Every ``<script type="application/ld+json">`` block is examined in
    document order. Within a block the candidates are, in order: the
    top-level object (or each object of a top-level array) followed by the
    objects listed under its ``@graph`` key. The first non-empty
    ``datePublished`` value wins and is returned as a string.

    Malformed JSON is skipped (best-effort extraction). ``null`` or empty
    values are ignored rather than being returned as the text ``"None"``.

    Returns:
        The publication date string, or None when none is found.
    """
    for script in soup.find_all("script", type="application/ld+json"):
        payload = script.string
        # Cheap pre-filter: avoid parsing JSON that cannot contain the key.
        if not payload or "datePublished" not in payload:
            continue
        try:
            data = json.loads(payload)
        except (json.JSONDecodeError, TypeError):
            continue  # broken JSON-LD is common in the wild; try the next block

        top_level = data if isinstance(data, list) else [data]
        nodes: list[dict] = []
        for node in top_level:
            if not isinstance(node, dict):
                continue
            nodes.append(node)
            graph = node.get("@graph")
            if isinstance(graph, list):
                nodes.extend(item for item in graph if isinstance(item, dict))

        for node in nodes:
            value = node.get("datePublished")
            if value is None:
                continue
            date_text = str(value).strip()
            if date_text:
                return date_text
    return None
|
|
|
|
|
|
def extract_outbound_links(html: str, base_url: str = "") -> list[str]:
    """Extract outbound links from HTML, filtering out self-references.

    Only absolute ``http``/``https`` links whose hostname differs from the
    hostname of *base_url* are returned. Fragment-only links,
    ``javascript:`` links, relative links and other schemes are ignored.
    Hrefs that cannot be parsed as URLs (for example an unterminated IPv6
    literal such as ``http://[bad``) are skipped instead of aborting the
    whole extraction.

    Args:
        html: Raw HTML markup.
        base_url: URL of the page itself; when empty, no host is treated as
            a self-reference.

    Returns:
        Unique hrefs (as written in the markup, stripped) in first-seen order.

    Requirements: 4.1
    """
    soup = BeautifulSoup(html, "html.parser")
    base_host = (urlparse(base_url).hostname or "") if base_url else ""

    # dict preserves insertion order, giving ordered de-duplication for free.
    unique: dict[str, None] = {}
    for a_tag in soup.find_all("a", href=True):
        href = str(a_tag["href"]).strip()
        if not href or href.startswith(("#", "javascript:")):
            continue
        try:
            parsed = urlparse(href)
            host = parsed.hostname
        except ValueError:
            # Page markup is untrusted; a malformed href must not crash parsing.
            continue
        # Only include absolute URLs that point to different hosts.
        if parsed.scheme in ("http", "https") and host and host != base_host:
            unique.setdefault(href, None)
    return list(unique)
|
|
|
|
|
|
def _count_sentences(text: str) -> int:
    """Approximate the number of sentences in *text*.

    Counts runs of ``.``, ``!`` or ``?`` that are followed by whitespace or
    the end of the text.
    """
    terminators = re.finditer(r"[.!?]+(?:\s|$)", text)
    return sum(1 for _ in terminators)
|
|
|
|
|
|
def _count_paragraphs(text: str) -> int:
    """Count blank-line-separated blocks of *text* holding at least 5 words."""
    total = 0
    for chunk in re.split(r"\n\s*\n", text.strip()):
        if len(chunk.split()) >= 5:
            total += 1
    return total
|
|
|
|
|
|
def score_parse_quality(
    text: str,
    *,
    body_found: bool = True,
    has_title: bool = False,
    has_author: bool = False,
    has_publisher: bool = False,
    has_published_at: bool = False,
) -> tuple[float, str, QualitySignals, list[str]]:
    """Grade extracted text from content and metadata signals.

    Returns ``(score, confidence_label, signals, warnings)``.

    Signals and their weights in the composite score:

    - word_count_signal (0.30): length of the extracted text
    - diversity_signal (0.15): unique / total words, case-insensitive
    - sentence_signal (0.15): number of sentence terminators found
    - paragraph_signal (0.10): number of multi-word paragraph blocks
    - body_found_signal (0.20): whether an article body element was located
    - metadata_signal (0.10): share of title/author/publisher/date present

    The composite is capped at 0.95 and rounded to two decimals. The label is
    ``"low"`` below 0.35, ``"medium"`` below 0.65 and ``"high"`` otherwise.
    Possible warnings, in emission order: ``very_short_text`` / ``short_text``,
    ``low_vocabulary_diversity``, ``no_sentence_structure``,
    ``no_article_body_found``.

    Requirements: 4.3
    """
    warnings: list[str] = []
    tokens = text.split()
    n_words = len(tokens)
    # Content warnings other than the length ones are only raised for texts
    # long enough for the statistic to be meaningful.
    long_enough = n_words >= 20

    # --- word count signal ---
    if n_words >= 300:
        wc_sig = 1.0
    elif n_words >= 150:
        wc_sig = 0.8
    elif n_words >= 50:
        wc_sig = 0.6
    elif long_enough:
        wc_sig = 0.3
        warnings.append("short_text")
    else:
        wc_sig = 0.1
        warnings.append("very_short_text")

    # --- diversity signal ---
    diversity = len({token.lower() for token in tokens}) / n_words if n_words else 0.0
    if diversity >= 0.4:
        div_sig = 1.0
    elif diversity >= 0.2:
        div_sig = 0.5
    else:
        div_sig = 0.2
        if long_enough:
            warnings.append("low_vocabulary_diversity")

    # --- sentence signal ---
    n_sentences = _count_sentences(text)
    if n_sentences >= 3:
        sent_sig = 1.0
    elif n_sentences >= 1:
        sent_sig = 0.5
    else:
        sent_sig = 0.1
        if long_enough:
            warnings.append("no_sentence_structure")

    # --- paragraph signal: 0 -> 0.2, 1 -> 0.5, 2+ -> 1.0 ---
    para_sig = (0.2, 0.5, 1.0)[min(_count_paragraphs(text), 2)]

    # --- body found signal ---
    if body_found:
        body_sig = 1.0
    else:
        body_sig = 0.3
        warnings.append("no_article_body_found")

    # --- metadata signal ---
    meta_sig = sum((has_title, has_author, has_publisher, has_published_at)) / 4.0

    signals = QualitySignals(
        word_count_signal=wc_sig,
        diversity_signal=div_sig,
        sentence_signal=sent_sig,
        paragraph_signal=para_sig,
        body_found_signal=body_sig,
        metadata_signal=meta_sig,
    )

    # Weighted composite score, capped so that no parse is ever "perfect".
    weighted = (
        0.30 * wc_sig
        + 0.15 * div_sig
        + 0.15 * sent_sig
        + 0.10 * para_sig
        + 0.20 * body_sig
        + 0.10 * meta_sig
    )
    score = round(min(weighted, 0.95), 2)

    # Confidence label
    confidence = "low" if score < 0.35 else "medium" if score < 0.65 else "high"

    return score, confidence, signals, warnings
|
|
|
|
|
|
def score_quality(text: str) -> tuple[float, str]:
    """Return ``(score, confidence_label)`` for *text*.

    Backward-compatible shim over ``score_parse_quality`` called with its
    default keyword arguments; the detailed signals and warnings are dropped.
    The label is one of low/medium/high.

    Requirements: 4.3
    """
    result = score_parse_quality(text)
    return result[0], result[1]
|
|
|
|
|
|
def infer_document_type(html: str, url: str = "") -> str:
    """Classify a document from keywords found in its URL.

    The lower-cased *url* is checked against keyword groups in priority
    order (filing, then transcript, then press_release); the first group
    with a substring hit decides. Anything else is an ``"article"``.
    *html* is accepted but not inspected yet.

    Requirements: 4.1
    """
    # html reserved for future content-based inference
    _ = html

    rules = (
        ("filing", ("sec.gov", "edgar", "filing", "10-k", "10-q", "8-k")),
        ("transcript", ("transcript", "earnings-call", "earnings_call")),
        ("press_release", ("press-release", "press_release", "newsroom")),
    )
    haystack = url.lower()
    for label, keywords in rules:
        for keyword in keywords:
            if keyword in haystack:
                return label
    return "article"
|
|
|
|
|
|
def parse_html(html: str, url: str = "", aliases: list[dict[str, str]] | None = None) -> ParsedDocument:
    """Full HTML-to-text parsing pipeline.

    Combines body extraction, metadata extraction, link extraction,
    quality scoring, document type inference, and company mention
    detection into a single result.

    Body extraction strips boilerplate, locates the article element (falling
    back to ``<body>``, then the whole document) and reads its text one line
    per block-level text run. Inline elements (links, emphasis, spans, ...)
    are unwrapped first: otherwise each inline fragment would land on its own
    line and be deleted by the short-orphan-line filter, losing e.g. linked
    company names before mention detection runs.

    Args:
        html: Raw HTML markup.
        url: URL the document was fetched from; used for publisher and
            canonical-URL fallbacks, self-link filtering and type inference.
        aliases: Optional alias dicts for company mention detection; when
            omitted or empty no mentions are reported.

    Returns:
        A populated ParsedDocument.

    Requirements: 1.3, 4.1, 4.2, 4.3
    """
    soup = BeautifulSoup(html, "html.parser")
    _strip_boilerplate_tags(soup)

    article = _find_article_body(soup)
    body_found = article is not None
    root = article if body_found else (soup.find("body") or soup)

    # Merge inline markup into the surrounding text so a sentence stays on a
    # single line. Done only after body detection, which needs the <a> tags
    # for its link-density heuristic.
    inline_names = [
        "a", "abbr", "b", "cite", "code", "em", "i", "mark", "q", "s",
        "small", "span", "strong", "sub", "sup", "time", "u",
    ]
    for inline in root.find_all(inline_names):
        inline.unwrap()
    root.smooth()  # fuse the now-adjacent text nodes

    # One line per text node, with internal whitespace normalised.
    raw_text = "\n".join(" ".join(piece.split()) for piece in root.stripped_strings)

    # Multi-stage text cleaning.
    text = _reduce_boilerplate_text(raw_text)
    text = _remove_short_orphan_lines(text)
    text = _detect_repeated_blocks(text)
    text = _collapse_whitespace(text)

    # Metadata and links are read from the untouched markup, not the
    # boilerplate-stripped tree.
    metadata = extract_metadata(html, url)
    outbound_links = extract_outbound_links(html, url)
    doc_type = infer_document_type(html, url)
    word_count = len(text.split())

    tags_raw = metadata.get("tags", "") or ""
    tags = [t.strip() for t in tags_raw.split(",") if t.strip()] if tags_raw else []

    # Rich quality scoring with all available signals.
    quality, confidence, signals, warnings = score_parse_quality(
        text,
        body_found=body_found,
        has_title=bool(metadata.get("title")),
        has_author=bool(metadata.get("author")),
        has_publisher=bool(metadata.get("publisher")),
        has_published_at=bool(metadata.get("published_at")),
    )

    low_quality_flag = confidence == "low"

    # Company mention detection over title + body.
    mentioned: list[CompanyMention] = []
    if aliases and text:
        search_text = f"{metadata.get('title', '')} {text}"
        mentioned = [
            CompanyMention(
                company_id=str(m["company_id"]),
                ticker=str(m["ticker"]),
                mention_type=str(m["mention_type"]),
                confidence=float(m["confidence"]),
                match_count=int(m["match_count"]),
            )
            for m in detect_company_mentions(search_text, aliases)
        ]

    return ParsedDocument(
        body_text=text,
        title=metadata.get("title", "") or "",
        author=metadata.get("author", "") or "",
        publisher=metadata.get("publisher", "") or "",
        published_at=metadata.get("published_at"),
        canonical_url=metadata.get("canonical_url"),
        language=metadata.get("language", "en") or "en",
        description=metadata.get("description", "") or "",
        document_type=doc_type,
        outbound_links=outbound_links,
        tags=tags,
        mentioned_companies=mentioned,
        quality_score=quality,
        confidence=confidence,
        word_count=word_count,
        quality_signals=signals,
        low_quality_flag=low_quality_flag,
        quality_warnings=warnings,
    )
|
|
|
|
|
|
|
|
@dataclass
class AliasEntry:
    """A company alias used for mention detection."""

    # Identifier of the company this alias belongs to.
    company_id: str
    # The text searched for in documents.
    alias: str
    # Category of the alias; selects the confidence assigned to a match.
    alias_type: str = "alias"
    # Ticker symbol reported alongside a match (may be empty).
    ticker: str = ""
|
|
|
|
|
|
# Match confidence per alias type, most precise (ticker) to least (brand).
# Alias types missing from this table are scored by the caller's default.
_CONFIDENCE_BY_TYPE: dict[str, float] = dict(
    ticker=0.9,
    legal_name=0.85,
    alias=0.7,
    brand=0.6,
)
|
|
|
|
|
|
def _build_alias_entries(aliases: list[dict[str, str]]) -> list[AliasEntry]:
    """Turn raw alias dicts into AliasEntry objects.

    Dicts without a non-empty ``"alias"`` value are skipped. Missing
    ``company_id`` / ``ticker`` default to ``""`` and a missing
    ``alias_type`` to ``"alias"``.
    """
    return [
        AliasEntry(
            company_id=raw.get("company_id", ""),
            alias=raw["alias"],
            alias_type=raw.get("alias_type", "alias"),
            ticker=raw.get("ticker", ""),
        )
        for raw in aliases
        if raw.get("alias", "")
    ]
|
|
|
|
|
|
def _count_matches(text: str, pattern: re.Pattern[str]) -> int:
    """Return how many non-overlapping matches *pattern* has in *text*."""
    return sum(1 for _ in pattern.finditer(text))
|
|
|
|
|
|
def detect_company_mentions(
    text: str,
    aliases: list[dict[str, str]],
) -> list[dict[str, str | float | int]]:
    """Find which companies are mentioned in *text*.

    How an alias is matched depends on its length:

    - 1-2 chars: case-sensitive, whole word (so "A" does not match "a")
    - 3-4 chars: case-insensitive, whole word (typical tickers)
    - 5+ chars: case-insensitive substring (company names, brands)

    Confidence comes from the alias type (ticker > legal_name > alias >
    brand; unknown types get 0.7). All alias hits of one company are merged
    into a single result: match counts are summed, and confidence, ticker and
    mention type are taken from the highest-confidence alias (the earliest
    one on ties).

    NOTE(review): the 5+ char substring rule also matches inside longer
    words (an alias "Intel" hits "intelligence") — confirm this is intended.

    Returns:
        One dict per company, in first-match order, with keys
        ``company_id``, ``ticker``, ``mention_type``, ``confidence`` and
        ``match_count``. Empty when *text* is empty.

    Requirements: 1.3, 4.1
    """
    if not text:
        return []

    upper_text = text.upper()

    # company_id -> (confidence, ticker, mention_type, total matches)
    winners: dict[str, tuple[float, str, str, int]] = {}

    for entry in _build_alias_entries(aliases):
        name = entry.alias
        kind = entry.alias_type
        confidence = _CONFIDENCE_BY_TYPE.get(kind, 0.7)
        length = len(name)

        if length >= 5:
            # Longer names: case-insensitive substring
            hits = upper_text.count(name.upper())
        elif length >= 3:
            # Standard ticker length: case-insensitive word boundary
            ticker_rx = re.compile(r"\b" + re.escape(name.upper()) + r"\b")
            hits = _count_matches(upper_text, ticker_rx)
        else:
            # Very short: case-sensitive word boundary
            short_rx = re.compile(r"\b" + re.escape(name) + r"\b")
            hits = _count_matches(text, short_rx)

        if not hits:
            continue

        company = entry.company_id
        current = winners.get(company)
        if current is None:
            winners[company] = (confidence, entry.ticker, kind, hits)
        elif confidence > current[0]:
            # Stronger alias takes over the identity; counts keep accumulating.
            winners[company] = (confidence, entry.ticker, kind, current[3] + hits)
        else:
            winners[company] = current[:3] + (current[3] + hits,)

    return [
        {
            "company_id": company,
            "ticker": ticker,
            "mention_type": kind,
            "confidence": confidence,
            "match_count": total,
        }
        for company, (confidence, ticker, kind, total) in winners.items()
    ]
|