"""Web scrape adapter for curated URLs and article pages.
Fetches full article HTML from curated URLs (investor relations pages,
press releases, earnings transcripts, etc.) with httpx and parses it with
BeautifulSoup, recording a content hash, extracted metadata, and
boilerplate-stripped body text for each page.

Inspired by Noctipede crawler patterns (BeautifulSoup + requests with retry
adapters, content hashing, boilerplate stripping, quality scoring). The code
visible in this module uses httpx and does not itself implement retries or
quality scoring.
Requirements: 1.2, 2.5, 3.1, 3.2, 3.3, 3.4
"""
import json
import logging
import time
from datetime import datetime, timezone
from typing import Any
from urllib.parse import urlparse
import httpx
from bs4 import BeautifulSoup
from services.shared.content import content_hash, normalize_url
from .base import AdapterResult, BaseAdapter
# Module logger; the name is fixed rather than derived from __name__.
logger = logging.getLogger("web_scrape_adapter")

# Defaults applied when the source config does not supply "timeout" /
# "user_agent".
DEFAULT_TIMEOUT = 30  # seconds
DEFAULT_USER_AGENT = "StonksOracle/1.0 (+https://stonks-oracle.celestium.life)"

# Largest response body (in bytes) that will be processed: 10 MiB.
MAX_CONTENT_LENGTH = 10 * 1024**2
def _content_attr(tag: Any) -> Any:
    """Return ``tag``'s ``content`` attribute, or None when the tag is missing."""
    return tag.get("content") if tag is not None else None


def _stripped_or(value: Any, fallback: str | None) -> str | None:
    """Return ``value`` stripped when it is a string, otherwise ``fallback``."""
    return value.strip() if isinstance(value, str) else fallback


def _json_ld_date_published(payload: Any) -> str | None:
    """Return the first non-null ``datePublished`` in a parsed JSON-LD payload.

    Inspects the payload itself when it is an object, each element when it is
    an array, and the members of any object's ``@graph`` array.
    """
    nodes = payload if isinstance(payload, list) else [payload]
    for node in nodes:
        if not isinstance(node, dict):
            continue
        # Look at the node itself first, then at nodes nested under @graph.
        graph = node.get("@graph")
        candidates = [node, *graph] if isinstance(graph, list) else [node]
        for candidate in candidates:
            if not isinstance(candidate, dict):
                continue
            value = candidate.get("datePublished")
            # A JSON null would otherwise be stringified to "None".
            if value is not None:
                return str(value)
    return None


def extract_metadata_from_html(html: str, url: str) -> dict[str, str | None]:
    """Extract page-level metadata from an HTML document.

    Args:
        html: Decoded HTML of the page.
        url: URL the page was fetched from. Its hostname is the fallback
            publisher, and ``normalize_url(url)`` is the fallback canonical URL.

    Returns:
        A dict with the keys ``title``, ``author``, ``publisher``,
        ``published_at``, ``canonical_url``, ``language`` and ``description``.
        Missing text fields are ``""``, a missing ``published_at`` is ``None``
        and a missing language defaults to ``"en"``.
    """
    soup = BeautifulSoup(html, "html.parser")
    meta: dict[str, str | None] = {}

    # Title: prefer og:title, then the document <title>.
    og_title = _content_attr(soup.find("meta", property="og:title"))
    if og_title:
        meta["title"] = _stripped_or(og_title, "")
    elif soup.title and soup.title.string:
        meta["title"] = soup.title.string.strip()
    else:
        meta["title"] = ""

    # Author: <meta name="author">.
    author = _content_attr(soup.find("meta", attrs={"name": "author"}))
    meta["author"] = _stripped_or(author, "") if author else ""

    # Publisher: og:site_name, falling back to the hostname of the fetched URL.
    site_name = _content_attr(soup.find("meta", property="og:site_name"))
    if site_name:
        meta["publisher"] = _stripped_or(site_name, "")
    else:
        meta["publisher"] = urlparse(url).hostname or ""

    # Published date: article:published_time, then JSON-LD datePublished.
    published = _content_attr(soup.find("meta", property="article:published_time"))
    if published:
        meta["published_at"] = _stripped_or(published, None)
    else:
        meta["published_at"] = None
        for script in soup.find_all("script", type="application/ld+json"):
            payload_text = script.string
            if not payload_text or "datePublished" not in payload_text:
                continue
            try:
                found = _json_ld_date_published(json.loads(payload_text))
            except (json.JSONDecodeError, TypeError):
                # Malformed JSON-LD is best-effort: skip it and try the next script.
                continue
            if found is not None:
                # First match wins; stop so later scripts cannot overwrite it.
                meta["published_at"] = found
                break

    # Canonical URL: <link rel="canonical">, then og:url, then the fetched URL.
    canonical = soup.find("link", rel="canonical")
    canonical_href = canonical.get("href") if canonical is not None else None
    if canonical_href:
        meta["canonical_url"] = str(canonical_href)
    else:
        og_url = _content_attr(soup.find("meta", property="og:url"))
        meta["canonical_url"] = str(og_url) if og_url else normalize_url(url)

    # Language: the <html lang="..."> attribute truncated to 5 characters.
    html_tag = soup.find("html")
    lang = html_tag.get("lang") if html_tag is not None else None
    meta["language"] = str(lang)[:5] if lang else "en"

    # Description: og:description tag if present, else <meta name="description">.
    desc_tag = soup.find("meta", property="og:description") or soup.find(
        "meta", attrs={"name": "description"}
    )
    description = _content_attr(desc_tag)
    meta["description"] = _stripped_or(description, "") if description else ""

    return meta
def extract_body_text(html: str) -> str:
    """Return the page's main text with one non-empty, stripped line per row.

    Script/style/navigation-type elements are dropped first. The text is then
    taken from the first ``<article>``, else the first ``<div>`` whose class
    string contains a known article-body marker, else ``<body>``, else the
    whole document.
    """
    boilerplate_tags = [
        "script", "style", "nav", "footer", "header", "aside", "iframe", "noscript",
    ]
    body_markers = ["article-body", "post-content", "entry-content", "story-body"]

    def looks_like_article_body(div) -> bool:
        # The class attribute may be a list of tokens or a single value.
        classes = div.get("class", [])
        if isinstance(classes, list):
            joined = " ".join(classes)
        else:
            joined = str(classes) if classes else ""
        return any(marker in joined for marker in body_markers)

    document = BeautifulSoup(html, "html.parser")

    # Strip non-content elements before any text is extracted.
    for element in document.find_all(boilerplate_tags):
        element.decompose()

    # Locate the most specific container that holds the article text.
    container = document.find("article")
    if not container:
        container = next(
            (div for div in document.find_all("div") if looks_like_article_body(div)),
            None,
        )

    if container:
        raw_text = container.get_text(separator="\n", strip=True)
    else:
        # Fallback: the whole <body>, or the full document when there is none.
        page_body = document.find("body")
        source = page_body if page_body else document
        raw_text = source.get_text(separator="\n", strip=True)

    # Drop blank lines and surrounding whitespace on each remaining line.
    return "\n".join(filter(None, map(str.strip, raw_text.splitlines())))
class WebScrapeAdapter(BaseAdapter):
    """Adapter for fetching curated web pages and article URLs.

    Config options (from source config):
        urls: List of URLs to scrape for this company
        url: Single URL to scrape (alternative to urls)
        timeout: Request timeout in seconds (default 30)
        user_agent: Custom user agent string
        follow_links: Whether to follow article links from index pages
            (default False). NOTE(review): this option is not read anywhere
            in this class; link following is not implemented here.
        max_pages: Max pages to fetch per cycle (default 5, capped at 20)
    """

    def __init__(self) -> None:
        # The adapter keeps no per-instance state.
        # NOTE(review): BaseAdapter.__init__ is not called -- confirm that is intended.
        pass

    def source_type(self) -> str:
        """Return the source-type identifier for this adapter."""
        return "web_scrape"

    def bucket_name(self) -> str:
        """Web scrape artifacts go to the news raw bucket."""
        return "stonks-raw-news"

    async def fetch(self, ticker: str, config: dict[str, Any]) -> AdapterResult:
        """Fetch HTML from curated URLs for a given ticker.

        Supports both single URL and multi-URL configs. Each URL is fetched,
        hashed, and parsed into a metadata/body-text item. A failure on one
        URL is recorded and does not stop the remaining URLs.

        Args:
            ticker: Ticker symbol the URLs belong to.
            config: Source config; see the class docstring for the keys read.

        Returns:
            An ``AdapterResult`` with one item per successfully fetched page,
            or an error result when no URL is configured or every fetch failed.
        """
        urls = config.get("urls", [])
        if not urls and config.get("url"):
            urls = [config["url"]]
        if isinstance(urls, str):
            # A bare string would otherwise be iterated character by character.
            urls = [urls]
        if not urls:
            return self._error_result(ticker, "No URLs configured for web_scrape source", 0)

        timeout = config.get("timeout", DEFAULT_TIMEOUT)
        user_agent = config.get("user_agent", DEFAULT_USER_AGENT)
        # Cap at 20 pages; clamp at 0 so a negative value cannot become a
        # negative slice bound (which would mean "all but the last N").
        max_pages = max(0, min(config.get("max_pages", 5), 20))

        items: list[dict[str, Any]] = []
        errors: list[str] = []
        total_elapsed = 0.0

        async with httpx.AsyncClient(
            timeout=timeout,
            follow_redirects=True,
            headers={"User-Agent": user_agent},
        ) as client:
            for url in urls[:max_pages]:
                started = time.monotonic()
                elapsed_ms: float | None = None
                try:
                    resp = await client.get(url)
                    elapsed_ms = (time.monotonic() - started) * 1000
                    resp.raise_for_status()

                    # Content length guard (checked after the body is downloaded).
                    if len(resp.content) > MAX_CONTENT_LENGTH:
                        errors.append(f"Content too large for {url}: {len(resp.content)} bytes")
                        continue

                    items.append(self._build_item(url, resp, elapsed_ms))
                except httpx.HTTPStatusError as e:
                    status = e.response.status_code
                    errors.append(f"HTTP {status} for {url}: {e}")
                    logger.warning("Scrape HTTP error for %s/%s: %s", ticker, url, e)
                except httpx.TimeoutException as e:
                    errors.append(f"Timeout for {url}: {e}")
                    logger.warning("Scrape timeout for %s/%s: %s", ticker, url, e)
                except Exception as e:
                    # Per-URL boundary: one bad page must not abort the batch.
                    errors.append(f"Error for {url}: {e}")
                    logger.warning("Scrape error for %s/%s: %s", ticker, url, e)
                finally:
                    # Count each URL exactly once: the request time when a
                    # response arrived, otherwise the time until the failure.
                    if elapsed_ms is None:
                        elapsed_ms = (time.monotonic() - started) * 1000
                    total_elapsed += elapsed_ms

        if not items:
            error_msg = "; ".join(errors) if errors else "No pages fetched"
            return self._error_result(ticker, error_msg, total_elapsed)

        # One timestamp shared by the stored payload and the result object.
        fetched_at = datetime.now(timezone.utc)

        # Combine per-page summaries into a single JSON artifact.
        combined_raw = json.dumps({
            "ticker": ticker,
            "fetched_at": fetched_at.isoformat(),
            "pages": [
                {
                    "url": item["url"],
                    "content_hash": item["content_hash"],
                    "html_length": item["html_length"],
                    "body_length": item["body_length"],
                }
                for item in items
            ],
            "errors": errors,
        }).encode("utf-8")

        # Hash of the concatenated per-page hashes, in fetch order.
        combined_hash = content_hash(
            b"".join(item["content_hash"].encode() for item in items)
        )

        return AdapterResult(
            source_type="web_scrape",
            ticker=ticker,
            items=items,
            raw_payload=combined_raw,
            content_hash=combined_hash,
            fetched_at=fetched_at,
            # Fixed value whenever at least one page was fetched; the
            # per-page statuses are on the individual items.
            http_status=200,
            response_time_ms=round(total_elapsed, 1),
            metadata={
                "provider": "web_scrape",
                "pages_fetched": len(items),
                "pages_failed": len(errors),
                "errors": errors,
            },
        )

    def _build_item(self, url: str, resp: httpx.Response, elapsed_ms: float) -> dict[str, Any]:
        """Build the per-page item dict from a successful response."""
        html = resp.text
        meta = extract_metadata_from_html(html, url)
        body_text = extract_body_text(html)
        return {
            "url": url,
            "canonical_url": meta.get("canonical_url", normalize_url(url)),
            "title": meta.get("title", ""),
            "author": meta.get("author", ""),
            "publisher": meta.get("publisher", ""),
            "published_at": meta.get("published_at"),
            "language": meta.get("language", "en"),
            "description": meta.get("description", ""),
            # Hash is taken over the raw response bytes, not the decoded text.
            "content_hash": content_hash(resp.content),
            "body_text": body_text,
            "body_length": len(body_text),
            "html_length": len(html),
            "http_status": resp.status_code,
            "response_time_ms": round(elapsed_ms, 1),
        }

    def _error_result(
        self,
        ticker: str,
        error: str,
        elapsed_ms: float,
    ) -> AdapterResult:
        """Build an error AdapterResult for scrape fetches.

        Args:
            ticker: Ticker symbol the failed fetch was for.
            error: Human-readable error description.
            elapsed_ms: Time spent before failing, in milliseconds.
        """
        return AdapterResult(
            source_type="web_scrape",
            ticker=ticker,
            items=[],
            raw_payload=b"",
            content_hash="",
            fetched_at=datetime.now(timezone.utc),
            error=error,
            http_status=None,
            response_time_ms=round(elapsed_ms, 1),
            metadata={"provider": "web_scrape"},
        )