Files
stonks-oracle/services/extractor/exposure_inference.py
T

395 lines
13 KiB
Python

"""Exposure profile auto-inference from filing extractions.
Infers baseline exposure profiles from company filing extractions when
no manual profile exists. Scans recent filing extractions for geographic
revenue breakdowns, supplier mentions, and commodity references.
Requirements: 9.1, 9.2, 9.3
"""
from __future__ import annotations
import logging
import re
from collections import defaultdict
from services.aggregation.interpolation import build_default_profile
from services.shared.schemas import (
DocumentIntelligence,
ExposureProfileSchema,
MarketPositionTier,
)
logger = logging.getLogger("exposure_inference")
# ---------------------------------------------------------------------------
# Known region patterns for geographic extraction
# ---------------------------------------------------------------------------
# Lowercase keyword -> region code that a mention of the keyword counts toward.
# Several spellings/demonyms share one code. Broad areas are folded onto a
# single representative code ("north america" -> US, "asia"/"asia pacific" -> CN,
# "latin america" -> BR, "middle east" -> SA), so those codes act as coarse
# proxies rather than literal country attributions.
# NOTE(review): some keywords are contained in others ("america" in
# "north america", "india" in "indian"); any matcher using this table has to
# decide how overlapping hits are counted.
_REGION_KEYWORDS: dict[str, str] = {
# --- United States and aliases ---
"united states": "US",
"u.s.": "US",
"us": "US",
"america": "US",
"north america": "US",
# --- Individual countries / blocs and their demonyms ---
"china": "CN",
"chinese": "CN",
"europe": "EU",
"european": "EU",
"eu": "EU",
"japan": "JP",
"japanese": "JP",
"germany": "DE",
"german": "DE",
"united kingdom": "GB",
"uk": "GB",
"britain": "GB",
"british": "GB",
"south korea": "KR",
"korea": "KR",
"india": "IN",
"indian": "IN",
"brazil": "BR",
"brazilian": "BR",
"australia": "AU",
"australian": "AU",
"canada": "CA",
"canadian": "CA",
"taiwan": "TW",
"saudi arabia": "SA",
"russia": "RU",
"russian": "RU",
"mexico": "MX",
"singapore": "SG",
# --- Broad regions mapped to a representative code ---
"asia": "CN",
"asia pacific": "CN",
"latin america": "BR",
"middle east": "SA",
}
# ---------------------------------------------------------------------------
# Known commodity patterns
# ---------------------------------------------------------------------------
# Lowercase keyword -> canonical commodity id that a mention counts toward.
# Synonyms and singular/plural forms share one id (e.g. "oil", "crude oil" and
# "petroleum" all count as crude_oil).
# NOTE(review): some keywords are contained in others ("oil" in "crude oil",
# "gas" in "natural gas", "semiconductor" in "semiconductors"); any matcher
# using this table has to decide how overlapping hits are counted.
_COMMODITY_KEYWORDS: dict[str, str] = {
# --- Energy ---
"crude oil": "crude_oil",
"oil": "crude_oil",
"petroleum": "crude_oil",
"natural gas": "natural_gas",
"gas": "natural_gas",
# --- Metals, materials and components ---
"copper": "copper",
"steel": "steel",
"lithium": "lithium",
"semiconductor": "semiconductors",
"semiconductors": "semiconductors",
"chip": "semiconductors",
"chips": "semiconductors",
# --- Agricultural ---
"wheat": "wheat",
"corn": "corn",
# --- Other metals ---
"gold": "gold",
"aluminum": "aluminum",
"aluminium": "aluminum",
"nickel": "nickel",
"cobalt": "cobalt",
"rare earth": "rare_earth",
}
# Minimum number of filing/transcript documents required before inference is
# attempted; with fewer, infer_exposure_profile falls back to the sector-based
# default profile. At 1, a single document is enough.
_MIN_FILINGS_FOR_INFERENCE = 1
# Minimum mention count for a region to be kept in the estimated revenue mix
# (applied in _estimate_revenue_mix).
_MIN_REGION_MENTIONS = 1
# Minimum mention count for a commodity to be listed as a key input commodity
# (applied in infer_exposure_profile).
_MIN_COMMODITY_MENTIONS = 1
# ---------------------------------------------------------------------------
# Text scanning helpers
# ---------------------------------------------------------------------------
def _extract_regions_from_text(text: str) -> dict[str, int]:
"""Extract region mentions from text, returning region_code -> count."""
text_lower = text.lower()
region_counts: dict[str, int] = defaultdict(int)
for keyword, code in _REGION_KEYWORDS.items():
# Use word boundary matching for short keywords
if len(keyword) <= 3:
pattern = rf"\b{re.escape(keyword)}\b"
matches = re.findall(pattern, text_lower)
else:
matches = re.findall(re.escape(keyword), text_lower)
if matches:
region_counts[code] += len(matches)
return dict(region_counts)
def _extract_commodities_from_text(text: str) -> dict[str, int]:
"""Extract commodity mentions from text, returning commodity_id -> count."""
text_lower = text.lower()
commodity_counts: dict[str, int] = defaultdict(int)
for keyword, commodity_id in _COMMODITY_KEYWORDS.items():
if len(keyword) <= 4:
pattern = rf"\b{re.escape(keyword)}\b"
matches = re.findall(pattern, text_lower)
else:
matches = re.findall(re.escape(keyword), text_lower)
if matches:
commodity_counts[commodity_id] += len(matches)
return dict(commodity_counts)
def _extract_supply_chain_regions(text: str) -> set[str]:
"""Extract supply chain region mentions from text."""
supply_keywords = [
"supplier", "supply chain", "sourcing", "manufacturing",
"factory", "plant", "warehouse", "distribution",
"import", "export", "procurement",
]
text_lower = text.lower()
regions: set[str] = set()
for keyword in supply_keywords:
if keyword in text_lower:
# Find regions mentioned near supply chain keywords
# Look within a window around each occurrence
for match in re.finditer(re.escape(keyword), text_lower):
start = max(0, match.start() - 200)
end = min(len(text_lower), match.end() + 200)
window = text_lower[start:end]
window_regions = _extract_regions_from_text(window)
regions.update(window_regions.keys())
return regions
# ---------------------------------------------------------------------------
# Revenue mix estimation
# ---------------------------------------------------------------------------
def _estimate_revenue_mix(region_counts: dict[str, int]) -> dict[str, float]:
"""Estimate geographic revenue mix from region mention counts.
Uses mention frequency as a proxy for revenue distribution.
Normalizes to sum to 1.0.
"""
if not region_counts:
return {}
total = sum(region_counts.values())
if total == 0:
return {}
mix = {
region: round(count / total, 4)
for region, count in region_counts.items()
if count >= _MIN_REGION_MENTIONS
}
# Re-normalize after filtering
mix_total = sum(mix.values())
if mix_total > 0 and abs(mix_total - 1.0) > 0.001:
mix = {r: round(v / mix_total, 4) for r, v in mix.items()}
return mix
# ---------------------------------------------------------------------------
# Confidence scoring
# ---------------------------------------------------------------------------
def _compute_inference_confidence(
num_filings: int,
num_regions: int,
num_commodities: int,
total_mentions: int,
) -> float:
"""Compute confidence score for the inferred profile.
Higher confidence when more filings are available and more
geographic/commodity data points are found.
"""
# Base confidence from number of filings (more filings = more reliable)
filing_factor = min(num_filings / 5.0, 1.0) # saturates at 5 filings
# Data richness factor
data_points = num_regions + num_commodities
richness_factor = min(data_points / 8.0, 1.0) # saturates at 8 data points
# Mention volume factor
volume_factor = min(total_mentions / 20.0, 1.0) # saturates at 20 mentions
confidence = 0.4 * filing_factor + 0.35 * richness_factor + 0.25 * volume_factor
return round(max(0.0, min(1.0, confidence)), 4)
# ---------------------------------------------------------------------------
# Main inference function
# ---------------------------------------------------------------------------
def _tally_mentions(
    text: str,
    region_counts: dict[str, int],
    commodity_counts: dict[str, int],
) -> None:
    """Add the region and commodity mentions found in ``text`` to the running totals."""
    for code, count in _extract_regions_from_text(text).items():
        region_counts[code] += count
    for commodity_id, count in _extract_commodities_from_text(text).items():
        commodity_counts[commodity_id] += count


def infer_exposure_profile(
    document_intelligences: list[DocumentIntelligence],
    sector: str,
    industry: str,
    market_cap_bucket: str,
) -> ExposureProfileSchema:
    """Infer a baseline exposure profile from filing extractions.

    Scans the summary, per-company key facts and evidence spans, and macro
    themes of each filing or transcript for region, commodity and supply-chain
    mentions, then derives a profile from the aggregated counts:

    * geographic revenue mix - mention frequency per region, normalised
    * supply chain regions - regions found near supply-chain vocabulary,
      plus every region in the revenue mix (sorted for a stable order)
    * key input commodities - commodities meeting the mention threshold
    * regulatory jurisdictions - the three regions with the largest share
    * export dependency - one minus the largest region's share
    * confidence - from ``_compute_inference_confidence``

    Falls back to ``build_default_profile(sector, industry,
    market_cap_bucket)`` when fewer than ``_MIN_FILINGS_FOR_INFERENCE``
    qualifying documents are supplied, or when no region or commodity mention
    is found in them.

    Args:
        document_intelligences: List of DocumentIntelligence from recent
            documents; only those whose ``document_type.value`` is "filing"
            or "transcript" are used.
        sector: Company's GICS sector name.
        industry: Company's industry name.
        market_cap_bucket: One of large_cap, mid_cap, small_cap, micro_cap.

    Returns:
        An ExposureProfileSchema with source='inferred' when inference
        succeeds; otherwise whatever ``build_default_profile`` returns.

    Requirements: 9.1, 9.2, 9.3
    """
    # Only filings and transcripts feed the inference.
    filings = [
        di for di in document_intelligences
        if di.document_type.value in ("filing", "transcript")
    ]
    if len(filings) < _MIN_FILINGS_FOR_INFERENCE:
        logger.info(
            "Insufficient filing data (%d filings) for inference, "
            "falling back to sector-based default profile",
            len(filings),
        )
        return build_default_profile(sector, industry, market_cap_bucket)

    # Aggregate region and commodity mentions across all filings
    all_region_counts: dict[str, int] = defaultdict(int)
    all_commodity_counts: dict[str, int] = defaultdict(int)
    all_supply_regions: set[str] = set()
    for filing in filings:
        # Summary text
        if filing.summary:
            _tally_mentions(filing.summary, all_region_counts, all_commodity_counts)
            all_supply_regions.update(_extract_supply_chain_regions(filing.summary))
        # Per-company key facts and evidence spans
        for company in filing.companies:
            for text in company.key_facts + company.evidence_spans:
                _tally_mentions(text, all_region_counts, all_commodity_counts)
                all_supply_regions.update(_extract_supply_chain_regions(text))
        # Macro themes contribute region/commodity hints only; they are not
        # scanned for supply-chain context.
        for theme in filing.macro_themes:
            _tally_mentions(theme, all_region_counts, all_commodity_counts)

    # Check if we have enough data to infer
    total_mentions = sum(all_region_counts.values()) + sum(all_commodity_counts.values())
    if not all_region_counts and not all_commodity_counts:
        logger.info(
            "No geographic or commodity data found in %d filings, "
            "falling back to sector-based default profile",
            len(filings),
        )
        return build_default_profile(sector, industry, market_cap_bucket)

    # Build the inferred profile
    geographic_revenue_mix = _estimate_revenue_mix(dict(all_region_counts))

    # Filter commodities by minimum mentions
    key_commodities = [
        com for com, count in all_commodity_counts.items()
        if count >= _MIN_COMMODITY_MENTIONS
    ]

    # Supply chain regions: combine extracted supply regions with geo regions.
    # Sorted because set iteration order for strings can differ between
    # interpreter runs, which would make the emitted list order unstable.
    supply_chain_regions = sorted(all_supply_regions | set(geographic_revenue_mix))

    # Market position tier from market cap bucket; unknown buckets fall back
    # to the REGIONAL tier.
    from services.aggregation.interpolation import _CAP_TO_TIER
    tier_value = _CAP_TO_TIER.get(market_cap_bucket, MarketPositionTier.REGIONAL.value)

    # Regulatory jurisdictions: top three regions by revenue share
    sorted_regions = sorted(
        geographic_revenue_mix.items(), key=lambda x: x[1], reverse=True,
    )
    regulatory_jurisdictions = [r for r, _ in sorted_regions[:3]]

    # Export dependency: fraction of revenue outside the top region
    if geographic_revenue_mix:
        top_region_pct = max(geographic_revenue_mix.values())
        export_pct = round(1.0 - top_region_pct, 4)
    else:
        export_pct = 0.0

    # Confidence score
    confidence = _compute_inference_confidence(
        num_filings=len(filings),
        num_regions=len(all_region_counts),
        num_commodities=len(all_commodity_counts),
        total_mentions=total_mentions,
    )

    profile = ExposureProfileSchema(
        # NOTE(review): company_id is left empty here; presumably the caller
        # fills it in before persisting - confirm.
        company_id="",
        geographic_revenue_mix=geographic_revenue_mix,
        supply_chain_regions=supply_chain_regions,
        key_input_commodities=key_commodities,
        regulatory_jurisdictions=regulatory_jurisdictions,
        market_position_tier=MarketPositionTier(tier_value),
        export_dependency_pct=max(0.0, min(1.0, export_pct)),
        source="inferred",
        confidence=confidence,
        version=1,
    )
    logger.info(
        "Inferred exposure profile: regions=%d, commodities=%d, "
        "supply_chain=%d, confidence=%.3f",
        len(geographic_revenue_mix),
        len(key_commodities),
        len(supply_chain_regions),
        confidence,
    )
    return profile