Files
stonks-oracle/services/adapters/web_scrape_adapter.py
T
2026-04-11 12:10:01 -07:00

322 lines
12 KiB
Python

"""Web scrape adapter for curated URLs and article pages.
Fetches full article HTML from curated URLs (investor relations pages,
press releases, earnings transcripts, etc.) using BeautifulSoup + requests
with retry adapters, content hashing, boilerplate awareness, and quality scoring.
Inspired by Noctipede crawler patterns: BeautifulSoup + requests with retry
adapters, content hashing, boilerplate stripping, quality scoring.
Requirements: 1.2, 2.5, 3.1, 3.2, 3.3, 3.4
"""
import json
import logging
import time
from datetime import datetime, timezone
from typing import Any
from urllib.parse import urlparse
import httpx
from bs4 import BeautifulSoup
from services.shared.content import content_hash, normalize_url
from .base import AdapterResult, BaseAdapter
# Logger shared by every function and class in this module.
logger = logging.getLogger("web_scrape_adapter")

# Fallback request settings, used when the source config does not override them.
DEFAULT_TIMEOUT = 30  # seconds
DEFAULT_USER_AGENT = "StonksOracle/1.0 (+https://stonks-oracle.celestium.life)"

# Largest response body (in bytes) that will be processed: 10 MiB.
MAX_CONTENT_LENGTH = 10 * 1024**2
def extract_metadata_from_html(html: str, url: str) -> dict[str, str | None]:
    """Extract page-level metadata from an HTML document.

    Args:
        html: Raw HTML markup of the page.
        url: URL the page was fetched from. Used as the fallback for the
            publisher (hostname) and canonical URL, and as the base for
            resolving a relative canonical link.

    Returns:
        A dict with the keys ``title``, ``author``, ``publisher``,
        ``published_at``, ``canonical_url``, ``language`` and
        ``description``. Every value is a string except ``published_at``,
        which is ``None`` when no publication date is found.
    """
    # Function-scope import so this block does not depend on a change to the
    # module's import section.
    from urllib.parse import urljoin

    soup = BeautifulSoup(html, "html.parser")

    # Title: prefer og:title, then <title>. A blank og:title no longer masks
    # a usable <title>.
    title = _meta_content(soup, property="og:title")
    if title is None and soup.title and soup.title.string:
        title = soup.title.string.strip()

    # Published date: article:published_time first, then JSON-LD datePublished.
    published_at = _meta_content(
        soup, property="article:published_time"
    ) or _json_ld_published_at(soup)

    # Canonical URL: <link rel="canonical">, then og:url, then the fetch URL.
    link = soup.find("link", rel="canonical")
    href = link.get("href") if link is not None else None
    canonical = href.strip() if isinstance(href, str) else ""
    if not canonical:
        canonical = _meta_content(soup, property="og:url") or ""
    # Sites sometimes publish a relative canonical href; resolve it against
    # the fetch URL so the stored value is always absolute.
    canonical_url = urljoin(url, canonical) if canonical else normalize_url(url)

    # Language: first five characters of <html lang="...">, default "en".
    html_tag = soup.find("html")
    lang = html_tag.get("lang") if html_tag is not None else None
    language = str(lang)[:5] if lang else "en"

    # Description: og:description, falling back to the plain description tag
    # when the Open Graph one is absent or blank.
    description = (
        _meta_content(soup, property="og:description")
        or _meta_content(soup, attrs={"name": "description"})
        or ""
    )

    return {
        "title": title or "",
        "author": _meta_content(soup, attrs={"name": "author"}) or "",
        "publisher": _meta_content(soup, property="og:site_name")
        or urlparse(url).hostname
        or "",
        "published_at": published_at,
        "canonical_url": canonical_url,
        "language": language,
        "description": description,
    }


def _meta_content(soup: BeautifulSoup, **find_kwargs: Any) -> str | None:
    """Return the stripped ``content`` of the first matching ``<meta>`` tag.

    ``find_kwargs`` are passed straight to ``soup.find("meta", ...)``.
    Returns ``None`` when no tag matches, or when its ``content`` attribute
    is missing, not a string, or blank.
    """
    tag = soup.find("meta", **find_kwargs)
    if tag is None:
        return None
    value = tag.get("content")
    if not isinstance(value, str):
        return None
    return value.strip() or None


def _json_ld_published_at(soup: BeautifulSoup) -> str | None:
    """Return ``datePublished`` from the first JSON-LD script that has one."""
    for script in soup.find_all("script", type="application/ld+json"):
        payload = script.string
        # Cheap substring test avoids parsing scripts that cannot match.
        if not payload or "datePublished" not in payload:
            continue
        try:
            found = _find_date_published(json.loads(payload))
        except (json.JSONDecodeError, TypeError):
            # Malformed JSON-LD is best-effort: skip it and try the next one.
            continue
        if found is not None:
            # Stop at the first hit so later scripts cannot overwrite it.
            return found
    return None


def _find_date_published(node: Any) -> str | None:
    """Search parsed JSON-LD (dict, list, or ``@graph``) for ``datePublished``."""
    if isinstance(node, dict):
        if "datePublished" in node:
            return str(node["datePublished"])
        # Many sites wrap their entities in a top-level "@graph" array.
        return _find_date_published(node.get("@graph"))
    if isinstance(node, list):
        for entry in node:
            found = _find_date_published(entry)
            if found is not None:
                return found
    return None
def extract_body_text(html: str) -> str:
    """Return the main readable text of an HTML page, one chunk per line.

    Script, style, navigation, header, footer, aside, iframe and noscript
    elements are removed first. Text is then taken from the first
    ``<article>`` element, else from the first ``<div>`` whose class string
    contains a known article-body marker, else from ``<body>``, else from
    the whole document. Blank lines are dropped and each line is stripped.
    """
    soup = BeautifulSoup(html, "html.parser")

    # Drop elements that never carry article content.
    boilerplate_tags = [
        "script",
        "style",
        "nav",
        "footer",
        "header",
        "aside",
        "iframe",
        "noscript",
    ]
    for element in soup.find_all(boilerplate_tags):
        element.decompose()

    # Locate the container that most likely holds the article text.
    container = soup.find("article")
    if not container:
        markers = ["article-body", "post-content", "entry-content", "story-body"]
        for candidate in soup.find_all("div"):
            classes = candidate.get("class", [])
            if isinstance(classes, list):
                class_string = " ".join(classes)
            elif classes:
                class_string = str(classes)
            else:
                class_string = ""
            if any(marker in class_string for marker in markers):
                container = candidate
                break

    if container:
        raw_text = container.get_text(separator="\n", strip=True)
    else:
        # No article container found: fall back to <body>, then the document.
        body = soup.find("body")
        source = body if body else soup
        raw_text = source.get_text(separator="\n", strip=True)

    # Strip every line and discard the ones that end up empty.
    stripped_lines = map(str.strip, raw_text.splitlines())
    return "\n".join(line for line in stripped_lines if line)
class WebScrapeAdapter(BaseAdapter):
    """Adapter for fetching curated web pages and article URLs.

    Config options (from source config):
        urls: List of URLs to scrape for this company (a single string is
            accepted and treated as a one-element list)
        url: Single URL to scrape (used only when ``urls`` is empty)
        timeout: Request timeout in seconds (default 30)
        user_agent: Custom user agent string
        follow_links: Whether to follow article links from index pages
            (default False). NOTE(review): ``fetch`` does not read this
            option; link following is not implemented in this class.
        max_pages: Max pages to fetch per cycle (default 5, capped at 20)
    """

    # Default and hard upper bound for the ``max_pages`` config option.
    _DEFAULT_MAX_PAGES = 5
    _MAX_PAGES_CAP = 20

    def __init__(self) -> None:
        # NOTE(review): BaseAdapter.__init__ is not invoked here; confirm the
        # base class needs no initialisation.
        pass

    def source_type(self) -> str:
        """Return the source-type identifier for this adapter."""
        return "web_scrape"

    def bucket_name(self) -> str:
        """Web scrape artifacts go to the news raw bucket."""
        return "stonks-raw-news"

    async def fetch(self, ticker: str, config: dict[str, Any]) -> AdapterResult:
        """Fetch HTML from curated URLs for a given ticker.

        Supports both single-URL and multi-URL configs. Each URL is fetched
        in sequence; a failure on one URL is recorded and does not stop the
        remaining URLs from being tried.

        Args:
            ticker: Ticker symbol the pages belong to.
            config: Source config; see the class docstring for its options.

        Returns:
            An ``AdapterResult`` with one item per successfully scraped page.
            If no page could be scraped, an error result carrying the joined
            error messages is returned instead.
        """
        urls = self._configured_urls(config)
        if not urls:
            return self._error_result(ticker, "No URLs configured for web_scrape source", 0)

        timeout = config.get("timeout", DEFAULT_TIMEOUT)
        user_agent = config.get("user_agent", DEFAULT_USER_AGENT)
        max_pages = self._page_limit(config)

        items: list[dict[str, Any]] = []
        errors: list[str] = []
        total_elapsed = 0.0

        async with httpx.AsyncClient(
            timeout=timeout,
            follow_redirects=True,
            headers={"User-Agent": user_agent},
        ) as client:
            for url in urls[:max_pages]:
                started = time.monotonic()
                elapsed_ms: float | None = None
                try:
                    resp = await client.get(url)
                    elapsed_ms = (time.monotonic() - started) * 1000
                    resp.raise_for_status()

                    # Content length guard.
                    # NOTE(review): client.get() has already read the whole
                    # body by this point, so this bounds what is processed,
                    # not what is downloaded.
                    raw_bytes = resp.content
                    if len(raw_bytes) > MAX_CONTENT_LENGTH:
                        errors.append(f"Content too large for {url}: {len(raw_bytes)} bytes")
                        continue

                    items.append(self._build_item(url, resp, elapsed_ms))
                except httpx.HTTPStatusError as e:
                    errors.append(f"HTTP {e.response.status_code} for {url}: {e}")
                    logger.warning("Scrape HTTP error for %s/%s: %s", ticker, url, e)
                except httpx.TimeoutException as e:
                    errors.append(f"Timeout for {url}: {e}")
                    logger.warning("Scrape timeout for %s/%s: %s", ticker, url, e)
                except Exception as e:
                    # Per-URL boundary: one bad page must not abort the rest.
                    errors.append(f"Error for {url}: {e}")
                    logger.warning("Scrape error for %s/%s: %s", ticker, url, e)
                finally:
                    # Count each URL's latency exactly once, whether the
                    # request succeeded, failed in transit, or failed after
                    # the response arrived.
                    if elapsed_ms is None:
                        elapsed_ms = (time.monotonic() - started) * 1000
                    total_elapsed += elapsed_ms

        if not items:
            error_msg = "; ".join(errors) if errors else "No pages fetched"
            return self._error_result(ticker, error_msg, total_elapsed)

        # One timestamp for both the payload and the result so they agree.
        fetched_at = datetime.now(timezone.utc)

        # Combine per-page summaries into a single JSON artifact.
        combined_raw = json.dumps({
            "ticker": ticker,
            "fetched_at": fetched_at.isoformat(),
            "pages": [
                {
                    "url": item["url"],
                    "content_hash": item["content_hash"],
                    "html_length": item["html_length"],
                    "body_length": item["body_length"],
                }
                for item in items
            ],
            "errors": errors,
        }).encode("utf-8")

        # Hash of the concatenated per-page hashes identifies this page set.
        combined_hash = content_hash(
            b"".join(item["content_hash"].encode() for item in items)
        )

        return AdapterResult(
            source_type="web_scrape",
            ticker=ticker,
            items=items,
            raw_payload=combined_raw,
            content_hash=combined_hash,
            fetched_at=fetched_at,
            http_status=200,
            response_time_ms=round(total_elapsed, 1),
            metadata={
                "provider": "web_scrape",
                "pages_fetched": len(items),
                "pages_failed": len(errors),
                "errors": errors,
            },
        )

    @staticmethod
    def _configured_urls(config: dict[str, Any]) -> list[Any]:
        """Return the URL list from ``config``, accepting ``urls`` or ``url``."""
        urls = config.get("urls") or []
        if isinstance(urls, str):
            # A bare string would otherwise be iterated character by character.
            urls = [urls]
        if not urls and config.get("url"):
            urls = [config["url"]]
        return list(urls)

    def _page_limit(self, config: dict[str, Any]) -> int:
        """Return ``max_pages`` from ``config`` clamped to ``[0, cap]``."""
        try:
            requested = int(config.get("max_pages", self._DEFAULT_MAX_PAGES))
        except (TypeError, ValueError):
            # None or a non-numeric value: fall back to the default.
            requested = self._DEFAULT_MAX_PAGES
        # A negative limit would slice from the end of the URL list.
        return max(0, min(requested, self._MAX_PAGES_CAP))

    @staticmethod
    def _build_item(url: str, resp: Any, elapsed_ms: float) -> dict[str, Any]:
        """Build the per-page item dict from a successful HTTP response."""
        html = resp.text
        meta = extract_metadata_from_html(html, url)
        body_text = extract_body_text(html)
        return {
            "url": url,
            "canonical_url": meta.get("canonical_url", normalize_url(url)),
            "title": meta.get("title", ""),
            "author": meta.get("author", ""),
            "publisher": meta.get("publisher", ""),
            "published_at": meta.get("published_at"),
            "language": meta.get("language", "en"),
            "description": meta.get("description", ""),
            "content_hash": content_hash(resp.content),
            "body_text": body_text,
            "body_length": len(body_text),
            "html_length": len(html),
            "http_status": resp.status_code,
            "response_time_ms": round(elapsed_ms, 1),
        }

    def _error_result(
        self,
        ticker: str,
        error: str,
        elapsed_ms: float,
    ) -> AdapterResult:
        """Build an error AdapterResult for scrape fetches.

        The result has no items, an empty payload and hash, ``http_status``
        of ``None``, and ``error`` set to the given message.
        """
        return AdapterResult(
            source_type="web_scrape",
            ticker=ticker,
            items=[],
            raw_payload=b"",
            content_hash="",
            fetched_at=datetime.now(timezone.utc),
            error=error,
            http_status=None,
            response_time_ms=round(elapsed_ms, 1),
            metadata={"provider": "web_scrape"},
        )