"""Exposure profile auto-inference from filing extractions.

Infers baseline exposure profiles from company filing extractions when
no manual profile exists. Scans recent filing extractions for geographic
revenue breakdowns, supplier mentions, and commodity references.

Requirements: 9.1, 9.2, 9.3
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
from collections import defaultdict
|
|
|
|
from services.aggregation.interpolation import build_default_profile
|
|
from services.shared.schemas import (
|
|
DocumentIntelligence,
|
|
ExposureProfileSchema,
|
|
MarketPositionTier,
|
|
)
|
|
|
|
logger = logging.getLogger("exposure_inference")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Known region patterns for geographic extraction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_REGION_KEYWORDS: dict[str, str] = {
|
|
"united states": "US",
|
|
"u.s.": "US",
|
|
"us": "US",
|
|
"america": "US",
|
|
"north america": "US",
|
|
"china": "CN",
|
|
"chinese": "CN",
|
|
"europe": "EU",
|
|
"european": "EU",
|
|
"eu": "EU",
|
|
"japan": "JP",
|
|
"japanese": "JP",
|
|
"germany": "DE",
|
|
"german": "DE",
|
|
"united kingdom": "GB",
|
|
"uk": "GB",
|
|
"britain": "GB",
|
|
"british": "GB",
|
|
"south korea": "KR",
|
|
"korea": "KR",
|
|
"india": "IN",
|
|
"indian": "IN",
|
|
"brazil": "BR",
|
|
"brazilian": "BR",
|
|
"australia": "AU",
|
|
"australian": "AU",
|
|
"canada": "CA",
|
|
"canadian": "CA",
|
|
"taiwan": "TW",
|
|
"saudi arabia": "SA",
|
|
"russia": "RU",
|
|
"russian": "RU",
|
|
"mexico": "MX",
|
|
"singapore": "SG",
|
|
"asia": "CN",
|
|
"asia pacific": "CN",
|
|
"latin america": "BR",
|
|
"middle east": "SA",
|
|
}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Known commodity patterns
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_COMMODITY_KEYWORDS: dict[str, str] = {
|
|
"crude oil": "crude_oil",
|
|
"oil": "crude_oil",
|
|
"petroleum": "crude_oil",
|
|
"natural gas": "natural_gas",
|
|
"gas": "natural_gas",
|
|
"copper": "copper",
|
|
"steel": "steel",
|
|
"lithium": "lithium",
|
|
"semiconductor": "semiconductors",
|
|
"semiconductors": "semiconductors",
|
|
"chip": "semiconductors",
|
|
"chips": "semiconductors",
|
|
"wheat": "wheat",
|
|
"corn": "corn",
|
|
"gold": "gold",
|
|
"aluminum": "aluminum",
|
|
"aluminium": "aluminum",
|
|
"nickel": "nickel",
|
|
"cobalt": "cobalt",
|
|
"rare earth": "rare_earth",
|
|
}
|
|
|
|
# Minimum number of filing documents to consider inference meaningful.
# infer_exposure_profile() falls back to the sector-based default profile
# when fewer matching documents than this are supplied.
_MIN_FILINGS_FOR_INFERENCE = 1

# Minimum total mentions to consider a region significant.
# Regions below this count are dropped by _estimate_revenue_mix().
_MIN_REGION_MENTIONS = 1

# Minimum total mentions to consider a commodity significant.
# Commodities below this count are left out of key_input_commodities.
_MIN_COMMODITY_MENTIONS = 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Text scanning helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _extract_regions_from_text(text: str) -> dict[str, int]:
    """Count region mentions in ``text``, keyed by region code.

    The text is lower-cased and scanned once with a single alternation of
    every keyword in ``_REGION_KEYWORDS``. Each stretch of text is attributed
    to at most one keyword, so a mention such as "north america" counts once
    for ``US`` instead of once for "north america" and again for the
    "america" it contains (likewise "germany"/"german",
    "european"/"europe", "south korea"/"korea", "asia pacific"/"asia").

    Keywords of three characters or fewer ("us", "eu", "uk") must match as
    whole words; longer keywords match as substrings.

    NOTE: because matching is case-insensitive, the pronoun "us" is
    indistinguishable from the abbreviation "US" and is counted as a mention.

    Args:
        text: Free text to scan.

    Returns:
        Mapping of region code to mention count. Only codes with at least
        one mention are present; keys follow the order in which their first
        matching keyword appears in ``_REGION_KEYWORDS``.
    """
    # Alternation is ordered: at any position the first alternative that
    # matches wins. Listing the longest keywords first makes "north america"
    # win over "america", and finditer never yields overlapping matches.
    # The pattern string is stable between calls, so re's internal cache
    # avoids recompiling it.
    longest_first = sorted(_REGION_KEYWORDS, key=len, reverse=True)
    pattern = "|".join(
        rf"\b{re.escape(keyword)}\b" if len(keyword) <= 3 else re.escape(keyword)
        for keyword in longest_first
    )

    keyword_hits: dict[str, int] = defaultdict(int)
    for match in re.finditer(pattern, text.lower()):
        keyword_hits[match.group()] += 1

    # Fold per-keyword hits into per-code totals, walking the keyword table
    # in its own order so the result ordering does not depend on where the
    # mentions happen to sit in the text.
    region_counts: dict[str, int] = defaultdict(int)
    for keyword, code in _REGION_KEYWORDS.items():
        if keyword in keyword_hits:
            region_counts[code] += keyword_hits[keyword]

    return dict(region_counts)
|
|
|
|
|
|
def _extract_commodities_from_text(text: str) -> dict[str, int]:
    """Count commodity mentions in ``text``, keyed by commodity id.

    The text is lower-cased and scanned once with a single alternation of
    every keyword in ``_COMMODITY_KEYWORDS``. Each stretch of text is
    attributed to at most one keyword, so "crude oil" counts once for
    ``crude_oil`` rather than once for "crude oil" and again for the "oil"
    inside it (likewise "natural gas"/"gas" and
    "semiconductors"/"semiconductor").

    Keywords of four characters or fewer ("oil", "gas", "chip", "corn",
    "gold") must match as whole words; longer keywords match as substrings.

    Args:
        text: Free text to scan.

    Returns:
        Mapping of commodity id to mention count. Only ids with at least one
        mention are present; keys follow the order in which their first
        matching keyword appears in ``_COMMODITY_KEYWORDS``.
    """
    # Longest keywords first: alternation picks the first alternative that
    # matches at a position, and finditer never yields overlapping matches,
    # so a multi-word keyword swallows the shorter keyword it contains.
    longest_first = sorted(_COMMODITY_KEYWORDS, key=len, reverse=True)
    pattern = "|".join(
        rf"\b{re.escape(keyword)}\b" if len(keyword) <= 4 else re.escape(keyword)
        for keyword in longest_first
    )

    keyword_hits: dict[str, int] = defaultdict(int)
    for match in re.finditer(pattern, text.lower()):
        keyword_hits[match.group()] += 1

    # Fold per-keyword hits into per-commodity totals in keyword-table order
    # so the result ordering is independent of the text layout.
    commodity_counts: dict[str, int] = defaultdict(int)
    for keyword, commodity_id in _COMMODITY_KEYWORDS.items():
        if keyword in keyword_hits:
            commodity_counts[commodity_id] += keyword_hits[keyword]

    return dict(commodity_counts)
|
|
|
|
|
|
def _extract_supply_chain_regions(text: str) -> set[str]:
|
|
"""Extract supply chain region mentions from text."""
|
|
supply_keywords = [
|
|
"supplier", "supply chain", "sourcing", "manufacturing",
|
|
"factory", "plant", "warehouse", "distribution",
|
|
"import", "export", "procurement",
|
|
]
|
|
text_lower = text.lower()
|
|
|
|
regions: set[str] = set()
|
|
for keyword in supply_keywords:
|
|
if keyword in text_lower:
|
|
# Find regions mentioned near supply chain keywords
|
|
# Look within a window around each occurrence
|
|
for match in re.finditer(re.escape(keyword), text_lower):
|
|
start = max(0, match.start() - 200)
|
|
end = min(len(text_lower), match.end() + 200)
|
|
window = text_lower[start:end]
|
|
window_regions = _extract_regions_from_text(window)
|
|
regions.update(window_regions.keys())
|
|
|
|
return regions
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Revenue mix estimation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _estimate_revenue_mix(region_counts: dict[str, int]) -> dict[str, float]:
|
|
"""Estimate geographic revenue mix from region mention counts.
|
|
|
|
Uses mention frequency as a proxy for revenue distribution.
|
|
Normalizes to sum to 1.0.
|
|
"""
|
|
if not region_counts:
|
|
return {}
|
|
|
|
total = sum(region_counts.values())
|
|
if total == 0:
|
|
return {}
|
|
|
|
mix = {
|
|
region: round(count / total, 4)
|
|
for region, count in region_counts.items()
|
|
if count >= _MIN_REGION_MENTIONS
|
|
}
|
|
|
|
# Re-normalize after filtering
|
|
mix_total = sum(mix.values())
|
|
if mix_total > 0 and abs(mix_total - 1.0) > 0.001:
|
|
mix = {r: round(v / mix_total, 4) for r, v in mix.items()}
|
|
|
|
return mix
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Confidence scoring
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _compute_inference_confidence(
|
|
num_filings: int,
|
|
num_regions: int,
|
|
num_commodities: int,
|
|
total_mentions: int,
|
|
) -> float:
|
|
"""Compute confidence score for the inferred profile.
|
|
|
|
Higher confidence when more filings are available and more
|
|
geographic/commodity data points are found.
|
|
"""
|
|
# Base confidence from number of filings (more filings = more reliable)
|
|
filing_factor = min(num_filings / 5.0, 1.0) # saturates at 5 filings
|
|
|
|
# Data richness factor
|
|
data_points = num_regions + num_commodities
|
|
richness_factor = min(data_points / 8.0, 1.0) # saturates at 8 data points
|
|
|
|
# Mention volume factor
|
|
volume_factor = min(total_mentions / 20.0, 1.0) # saturates at 20 mentions
|
|
|
|
confidence = 0.4 * filing_factor + 0.35 * richness_factor + 0.25 * volume_factor
|
|
return round(max(0.0, min(1.0, confidence)), 4)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main inference function
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _accumulate_text_mentions(
    text: str,
    region_counts: dict[str, int],
    commodity_counts: dict[str, int],
    supply_regions: set[str] | None = None,
) -> None:
    """Fold the mentions found in ``text`` into the running totals, in place.

    Args:
        text: Free text to scan.
        region_counts: Running region code -> mention count, updated in place.
        commodity_counts: Running commodity id -> mention count, updated in place.
        supply_regions: Running set of supply-chain region codes, updated in
            place. Pass ``None`` to skip the supply-chain scan for this text.
    """
    for region, count in _extract_regions_from_text(text).items():
        region_counts[region] = region_counts.get(region, 0) + count

    for commodity, count in _extract_commodities_from_text(text).items():
        commodity_counts[commodity] = commodity_counts.get(commodity, 0) + count

    if supply_regions is not None:
        supply_regions.update(_extract_supply_chain_regions(text))


def infer_exposure_profile(
    document_intelligences: list[DocumentIntelligence],
    sector: str,
    industry: str,
    market_cap_bucket: str,
) -> ExposureProfileSchema:
    """Infer a baseline exposure profile from filing extractions.

    Scans each document's summary, per-company key facts and evidence spans,
    and macro themes for region and commodity keywords, then derives a
    geographic revenue mix, supply-chain regions, key commodities,
    regulatory jurisdictions and an export-dependency estimate from the
    mention counts.

    Falls back to ``build_default_profile`` when no filing or transcript
    documents are supplied, or when none of the scanned text mentions a
    known region or commodity.

    Args:
        document_intelligences: List of DocumentIntelligence from recent filings.
        sector: Company's GICS sector name.
        industry: Company's industry name.
        market_cap_bucket: One of large_cap, mid_cap, small_cap, micro_cap.

    Returns:
        An ExposureProfileSchema with source='inferred' and an empty
        ``company_id``, or the result of ``build_default_profile`` on
        fallback.

    Requirements: 9.1, 9.2, 9.3
    """
    # Transcripts are scanned alongside filings.
    filings = [
        di for di in document_intelligences
        if di.document_type.value in ("filing", "transcript")
    ]

    if len(filings) < _MIN_FILINGS_FOR_INFERENCE:
        logger.info(
            "Insufficient filing data (%d filings) for inference, "
            "falling back to sector-based default profile",
            len(filings),
        )
        return build_default_profile(sector, industry, market_cap_bucket)

    # Aggregate region and commodity mentions across all filings.
    all_region_counts: dict[str, int] = defaultdict(int)
    all_commodity_counts: dict[str, int] = defaultdict(int)
    all_supply_regions: set[str] = set()

    for filing in filings:
        if filing.summary:
            _accumulate_text_mentions(
                filing.summary,
                all_region_counts,
                all_commodity_counts,
                all_supply_regions,
            )

        # Key facts and evidence spans contain geographic details.
        for company in filing.companies:
            for text in company.key_facts + company.evidence_spans:
                _accumulate_text_mentions(
                    text,
                    all_region_counts,
                    all_commodity_counts,
                    all_supply_regions,
                )

        # Macro themes contribute region/commodity hints only; they are not
        # scanned for supply-chain context.
        for theme in filing.macro_themes:
            _accumulate_text_mentions(theme, all_region_counts, all_commodity_counts)

    total_mentions = (
        sum(all_region_counts.values()) + sum(all_commodity_counts.values())
    )

    if not all_region_counts and not all_commodity_counts:
        logger.info(
            "No geographic or commodity data found in %d filings, "
            "falling back to sector-based default profile",
            len(filings),
        )
        return build_default_profile(sector, industry, market_cap_bucket)

    # Build the inferred profile.
    geographic_revenue_mix = _estimate_revenue_mix(dict(all_region_counts))

    key_commodities = [
        commodity
        for commodity, count in all_commodity_counts.items()
        if count >= _MIN_COMMODITY_MENTIONS
    ]

    # Supply chain regions: extracted supply regions plus every revenue
    # region. Sorted because set iteration order for strings varies between
    # interpreter runs, which would make the profile non-reproducible.
    supply_chain_regions = sorted(all_supply_regions | set(geographic_revenue_mix))

    # Market position tier from market cap bucket; unknown buckets map to
    # REGIONAL. The import is kept function-local, as in the original.
    from services.aggregation.interpolation import _CAP_TO_TIER
    tier_value = _CAP_TO_TIER.get(market_cap_bucket, MarketPositionTier.REGIONAL.value)

    # Regulatory jurisdictions: the three regions with the largest share.
    sorted_regions = sorted(
        geographic_revenue_mix.items(), key=lambda item: item[1], reverse=True,
    )
    regulatory_jurisdictions = [region for region, _ in sorted_regions[:3]]

    # Export dependency: fraction of revenue outside the top region.
    if geographic_revenue_mix:
        top_region_pct = max(geographic_revenue_mix.values())
        export_pct = round(1.0 - top_region_pct, 4)
    else:
        export_pct = 0.0

    confidence = _compute_inference_confidence(
        num_filings=len(filings),
        num_regions=len(all_region_counts),
        num_commodities=len(all_commodity_counts),
        total_mentions=total_mentions,
    )

    profile = ExposureProfileSchema(
        company_id="",
        geographic_revenue_mix=geographic_revenue_mix,
        supply_chain_regions=supply_chain_regions,
        key_input_commodities=key_commodities,
        regulatory_jurisdictions=regulatory_jurisdictions,
        market_position_tier=MarketPositionTier(tier_value),
        export_dependency_pct=max(0.0, min(1.0, export_pct)),
        source="inferred",
        confidence=confidence,
        version=1,
    )

    logger.info(
        "Inferred exposure profile: regions=%d, commodities=%d, "
        "supply_chain=%d, confidence=%.3f",
        len(geographic_revenue_mix),
        len(key_commodities),
        len(supply_chain_regions),
        confidence,
    )

    return profile
|