Files
Celes Renata c85c0068a2 fix: clean up utcnow deprecation warnings, fix 12 failing tests, add CI/CD pipeline manifests
- Replace all datetime.utcnow() with datetime.now(tz=timezone.utc) across 8 files
- Fix 12 failing tests to match current implementation behavior
- Fix pytest_plugins in non-top-level conftest (moved to root conftest.py)
- Auto-fix 189 lint issues (import sorting, unused imports)
- Add CI/CD pipeline infrastructure (ARC, ArgoCD, Kargo manifests)
- Add values-beta.yaml and values-paper.yaml for staged deployments
- Update GitHub Actions workflow to use self-hosted-gremlin runners
- Add integration-test job to CI pipeline

Result: 1596 passed, 0 failed, 0 warnings
2026-04-18 03:59:28 +00:00

2642 lines
99 KiB
Python

"""Property-based tests for the macro pipeline.
Feature: global-news-interpolation
Uses Hypothesis to validate correctness properties of the event classifier
and macro impact pipeline.
"""
from __future__ import annotations
import json
from hypothesis import given, settings
from hypothesis import strategies as st
from services.extractor.event_classifier import (
GlobalEvent,
_parse_classification_response,
)
from services.shared.schemas import (
EstimatedDuration,
ImpactType,
MacroImpactRecordSchema,
ModelMetadata,
SeverityLevel,
)
# ---------------------------------------------------------------------------
# Hypothesis strategies for valid Ollama classification responses
# ---------------------------------------------------------------------------
_VALID_IMPACT_TYPES = [e.value for e in ImpactType]
_VALID_SEVERITY_LEVELS = [e.value for e in SeverityLevel]
_VALID_DURATIONS = [e.value for e in EstimatedDuration]
def _ollama_classification_response() -> st.SearchStrategy[str]:
"""Generate random valid JSON matching the event classification schema."""
return st.fixed_dictionaries({
"event_types": st.lists(
st.sampled_from(_VALID_IMPACT_TYPES),
min_size=1,
max_size=len(_VALID_IMPACT_TYPES),
),
"severity": st.sampled_from(_VALID_SEVERITY_LEVELS),
"affected_regions": st.lists(
st.text(
alphabet=st.characters(whitelist_categories=("Lu", "Ll", "Nd")),
min_size=1,
max_size=10,
),
min_size=0,
max_size=8,
),
"affected_sectors": st.lists(
st.text(
alphabet=st.characters(whitelist_categories=("Lu", "Ll", "Nd", "Zs")),
min_size=1,
max_size=30,
),
min_size=0,
max_size=6,
),
"affected_commodities": st.lists(
st.text(
alphabet=st.characters(whitelist_categories=("Ll", "Nd"), whitelist_characters="_"),
min_size=1,
max_size=20,
),
min_size=0,
max_size=5,
),
"summary": st.text(min_size=1, max_size=200).filter(lambda s: s.strip()),
"key_facts": st.lists(
st.text(min_size=1, max_size=100),
min_size=0,
max_size=5,
),
"estimated_duration": st.sampled_from(_VALID_DURATIONS),
"confidence": st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
}).map(json.dumps)
# ---------------------------------------------------------------------------
# Property 2: Macro pipeline output schema completeness
# ---------------------------------------------------------------------------
class TestProperty2MacroPipelineOutputSchemaCompleteness:
"""Feature: global-news-interpolation, Property 2: Macro pipeline output schema completeness
For any valid Ollama classification response, the resulting GlobalEvent
object SHALL contain all required fields (event_id, event_types, severity,
affected_regions, affected_sectors, summary, estimated_duration, confidence,
source_document_id, model_metadata). Similarly, for any valid macro impact
computation, the resulting MacroImpactRecord SHALL contain all required fields.
Validates: Requirements 2.2, 4.5
"""
@given(raw_json=_ollama_classification_response())
@settings(max_examples=100)
def test_global_event_has_all_required_fields(self, raw_json: str):
"""**Validates: Requirements 2.2**
Parsed GlobalEvent must contain every required field with correct types.
"""
event = _parse_classification_response(raw_json, "doc-test-123", "test-model")
# --- All required fields exist and are not None ---
assert event.event_id is not None and isinstance(event.event_id, str)
assert event.event_types is not None and isinstance(event.event_types, list)
assert event.severity is not None and isinstance(event.severity, str)
assert event.affected_regions is not None and isinstance(event.affected_regions, list)
assert event.affected_sectors is not None and isinstance(event.affected_sectors, list)
assert event.summary is not None and isinstance(event.summary, str)
assert event.estimated_duration is not None and isinstance(event.estimated_duration, str)
assert event.confidence is not None and isinstance(event.confidence, float)
assert event.source_document_id is not None and isinstance(event.source_document_id, str)
assert event.model_metadata is not None and isinstance(event.model_metadata, ModelMetadata)
# --- event_types is non-empty (normalization guarantees at least one) ---
assert len(event.event_types) >= 1
for et in event.event_types:
assert et in {e.value for e in ImpactType}
# --- severity is a valid SeverityLevel ---
assert event.severity in {e.value for e in SeverityLevel}
# --- confidence is in [0, 1] ---
assert 0.0 <= event.confidence <= 1.0
# --- estimated_duration is a valid EstimatedDuration ---
assert event.estimated_duration in {e.value for e in EstimatedDuration}
# --- source_document_id is preserved from input ---
assert event.source_document_id == "doc-test-123"
# --- model_metadata has correct provider and model ---
assert event.model_metadata.provider == "ollama"
assert event.model_metadata.model_name == "test-model"
@given(
event_id=st.uuids().map(str),
company_id=st.uuids().map(str),
ticker=st.text(
alphabet=st.characters(whitelist_categories=("Lu",)),
min_size=1,
max_size=5,
),
score=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
direction=st.sampled_from(["positive", "negative", "mixed", "neutral"]),
factors=st.lists(st.text(min_size=1, max_size=50), min_size=0, max_size=5),
confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
)
@settings(max_examples=100)
def test_macro_impact_record_has_all_required_fields(
self,
event_id: str,
company_id: str,
ticker: str,
score: float,
direction: str,
factors: list[str],
confidence: float,
):
"""**Validates: Requirements 4.5**
MacroImpactRecordSchema must contain all required fields with correct types.
"""
record = MacroImpactRecordSchema(
event_id=event_id,
company_id=company_id,
ticker=ticker,
macro_impact_score=score,
impact_direction=direction,
contributing_factors=factors,
confidence=confidence,
)
# --- All required fields exist and have correct types ---
assert record.event_id is not None and isinstance(record.event_id, str)
assert record.company_id is not None and isinstance(record.company_id, str)
assert record.ticker is not None and isinstance(record.ticker, str)
assert isinstance(record.macro_impact_score, float)
assert record.impact_direction is not None and isinstance(record.impact_direction, str)
assert record.contributing_factors is not None and isinstance(record.contributing_factors, list)
assert isinstance(record.confidence, float)
# --- Score and confidence are in [0, 1] ---
assert 0.0 <= record.macro_impact_score <= 1.0
assert 0.0 <= record.confidence <= 1.0
# --- Values are preserved ---
assert record.event_id == event_id
assert record.company_id == company_id
assert record.ticker == ticker
# ---------------------------------------------------------------------------
# Property 3: Multiple impact types preserved
# ---------------------------------------------------------------------------
class TestProperty3MultipleImpactTypesPreserved:
"""Feature: global-news-interpolation, Property 3: Multiple impact types preserved
For any global event classification where the source article implies N
distinct impact types, the resulting GlobalEvent's event_types list SHALL
contain all N types without collapsing to a single category.
Validates: Requirements 2.4
"""
@given(
chosen_types=st.lists(
st.sampled_from(_VALID_IMPACT_TYPES),
min_size=1,
max_size=len(_VALID_IMPACT_TYPES),
unique=True,
),
)
@settings(max_examples=100)
def test_all_impact_types_preserved_after_parsing(self, chosen_types: list[str]):
"""**Validates: Requirements 2.4**
Given N distinct valid ImpactType values in the JSON response,
the parsed GlobalEvent.event_types must contain ALL N types.
"""
# Build a valid classification JSON with the chosen event_types
response_dict = {
"event_types": chosen_types,
"severity": "moderate",
"affected_regions": ["US"],
"affected_sectors": ["Energy"],
"affected_commodities": ["crude_oil"],
"summary": "Test event for impact type preservation.",
"key_facts": ["Fact one."],
"estimated_duration": "short_term",
"confidence": 0.8,
}
raw_json = json.dumps(response_dict)
event = _parse_classification_response(raw_json, "doc-types-test", "test-model")
# All original types must be present (no collapsing)
assert len(event.event_types) >= len(chosen_types), (
f"Expected at least {len(chosen_types)} types, got {len(event.event_types)}: "
f"{event.event_types}"
)
for t in chosen_types:
assert t in event.event_types, (
f"Impact type '{t}' was lost during parsing. "
f"Input: {chosen_types}, Output: {event.event_types}"
)
# ---------------------------------------------------------------------------
# Imports for Property 6
# ---------------------------------------------------------------------------
import copy
import uuid as _uuid
from dataclasses import dataclass, field
from datetime import datetime
from services.symbol_registry.exposure import ExposureProfileCreate
# ---------------------------------------------------------------------------
# Hypothesis strategy for valid ExposureProfileCreate data
# ---------------------------------------------------------------------------
_REGION_CODES = ["US", "CN", "DE", "JP", "GB", "KR", "IN", "BR", "AU", "CA"]
_COMMODITIES = ["crude_oil", "natural_gas", "copper", "lithium", "steel", "wheat", "corn"]
_JURISDICTIONS = ["US", "EU", "CN", "JP", "UK", "AU"]
_TIERS = ["global_leader", "multinational", "regional", "domestic"]
_SOURCES = ["manual", "inferred"]
def _geo_revenue_mix() -> st.SearchStrategy[dict[str, float]]:
"""Generate a geographic revenue mix that sums to ~1.0."""
return st.lists(
st.sampled_from(_REGION_CODES),
min_size=1,
max_size=5,
unique=True,
).flatmap(
lambda regions: st.lists(
st.floats(min_value=0.01, max_value=1.0, allow_nan=False, allow_infinity=False),
min_size=len(regions),
max_size=len(regions),
).map(lambda vals: {r: round(v / sum(vals), 4) for r, v in zip(regions, vals)})
)
def _exposure_profile_create_strategy() -> st.SearchStrategy[ExposureProfileCreate]:
"""Generate random valid ExposureProfileCreate instances."""
return st.builds(
ExposureProfileCreate,
geographic_revenue_mix=_geo_revenue_mix(),
supply_chain_regions=st.lists(
st.sampled_from(_REGION_CODES), min_size=0, max_size=4, unique=True,
),
key_input_commodities=st.lists(
st.sampled_from(_COMMODITIES), min_size=0, max_size=3, unique=True,
),
regulatory_jurisdictions=st.lists(
st.sampled_from(_JURISDICTIONS), min_size=0, max_size=3, unique=True,
),
market_position_tier=st.sampled_from(_TIERS),
export_dependency_pct=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
source=st.sampled_from(_SOURCES),
confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
)
# ---------------------------------------------------------------------------
# Simulated version history logic (mirrors the DB-backed upsert in exposure.py)
# ---------------------------------------------------------------------------
@dataclass
class _VersionedProfile:
"""An archived or active exposure profile snapshot."""
version: int
active: bool
profile_data: ExposureProfileCreate
created_at: datetime = field(default_factory=lambda: datetime.now(tz=None))
def _simulate_version_history(
updates: list[ExposureProfileCreate],
) -> list[_VersionedProfile]:
"""Simulate the versioning logic from the PUT endpoint.
Each update:
1. Archives the previous active profile (active=False).
2. Inserts a new profile with incremented version and active=True.
Returns the full history ordered by version ascending.
"""
history: list[_VersionedProfile] = []
for idx, profile_data in enumerate(updates):
version = idx + 1
# Archive previous active entry
for entry in history:
if entry.active:
entry.active = False
# Insert new version as active
history.append(
_VersionedProfile(
version=version,
active=True,
profile_data=copy.deepcopy(profile_data),
)
)
return history
# ---------------------------------------------------------------------------
# Property 6: Exposure profile version history
# ---------------------------------------------------------------------------
class TestProperty6ExposureProfileVersionHistory:
"""Feature: global-news-interpolation, Property 6: Exposure profile version history
For any sequence of N updates to a company's ExposureProfile, the version
history SHALL contain exactly N records, each preserving the complete
profile state at the time of that update, with monotonically increasing
version numbers.
Validates: Requirements 3.3
"""
@given(
updates=st.lists(
_exposure_profile_create_strategy(),
min_size=1,
max_size=10,
),
)
@settings(max_examples=100)
def test_version_history_count_and_monotonicity(
self, updates: list[ExposureProfileCreate],
):
"""**Validates: Requirements 3.3**
Given N profile updates, the history must contain exactly N records
with monotonically increasing version numbers (1, 2, …, N), each
preserving the complete profile state, and only the latest version
having active=True.
"""
n = len(updates)
history = _simulate_version_history(updates)
# 1. Exactly N records
assert len(history) == n, (
f"Expected {n} history records, got {len(history)}"
)
# 2. Version numbers are monotonically increasing: 1, 2, …, N
versions = [entry.version for entry in history]
assert versions == list(range(1, n + 1)), (
f"Versions should be [1..{n}], got {versions}"
)
# 3. Each record preserves the complete profile state from that update
for idx, entry in enumerate(history):
original = updates[idx]
stored = entry.profile_data
assert stored.geographic_revenue_mix == original.geographic_revenue_mix, (
f"Version {entry.version}: geographic_revenue_mix mismatch"
)
assert stored.supply_chain_regions == original.supply_chain_regions, (
f"Version {entry.version}: supply_chain_regions mismatch"
)
assert stored.key_input_commodities == original.key_input_commodities, (
f"Version {entry.version}: key_input_commodities mismatch"
)
assert stored.regulatory_jurisdictions == original.regulatory_jurisdictions, (
f"Version {entry.version}: regulatory_jurisdictions mismatch"
)
assert stored.market_position_tier == original.market_position_tier, (
f"Version {entry.version}: market_position_tier mismatch"
)
assert stored.export_dependency_pct == original.export_dependency_pct, (
f"Version {entry.version}: export_dependency_pct mismatch"
)
assert stored.source == original.source, (
f"Version {entry.version}: source mismatch"
)
assert stored.confidence == original.confidence, (
f"Version {entry.version}: confidence mismatch"
)
# 4. Only the latest version (last entry) has active=True
for entry in history[:-1]:
assert entry.active is False, (
f"Version {entry.version} should be archived (active=False)"
)
assert history[-1].active is True, (
f"Latest version {history[-1].version} should be active"
)
# ---------------------------------------------------------------------------
# Imports for Properties 5, 7, 8, 9, 10
# ---------------------------------------------------------------------------
from services.aggregation.interpolation import (
_CAP_TO_TIER,
_DEFAULT_GEO,
_SECTOR_DEFAULT_GEO,
apply_resilience_modifier,
build_default_profile,
compute_macro_impact,
)
from services.shared.schemas import ExposureProfileSchema, MarketPositionTier
# ---------------------------------------------------------------------------
# Shared Hypothesis strategies for interpolation tests
# ---------------------------------------------------------------------------
_VALID_SECTORS = list(_SECTOR_DEFAULT_GEO.keys())
_VALID_CAP_BUCKETS = list(_CAP_TO_TIER.keys())
_SEVERITY_ORDER = ["low", "moderate", "high", "critical"]
def _global_event_strategy(
*,
min_regions: int = 0,
max_regions: int = 5,
min_sectors: int = 0,
max_sectors: int = 4,
min_commodities: int = 0,
max_commodities: int = 4,
severity: st.SearchStrategy[str] | None = None,
event_types: st.SearchStrategy[list[str]] | None = None,
) -> st.SearchStrategy[GlobalEvent]:
"""Generate random valid GlobalEvent instances."""
return st.builds(
GlobalEvent,
event_id=st.uuids().map(str),
event_types=event_types or st.lists(
st.sampled_from(_VALID_IMPACT_TYPES),
min_size=1,
max_size=len(_VALID_IMPACT_TYPES),
),
severity=severity or st.sampled_from(_VALID_SEVERITY_LEVELS),
affected_regions=st.lists(
st.sampled_from(_REGION_CODES),
min_size=min_regions,
max_size=max_regions,
unique=True,
),
affected_sectors=st.lists(
st.sampled_from(_VALID_SECTORS),
min_size=min_sectors,
max_size=max_sectors,
unique=True,
),
affected_commodities=st.lists(
st.sampled_from(_COMMODITIES),
min_size=min_commodities,
max_size=max_commodities,
unique=True,
),
summary=st.text(min_size=1, max_size=100),
key_facts=st.just([]),
estimated_duration=st.sampled_from(_VALID_DURATIONS),
confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
source_document_id=st.uuids().map(str),
)
def _exposure_profile_schema_strategy(
*,
min_regions: int = 0,
max_regions: int = 5,
min_commodities: int = 0,
max_commodities: int = 4,
tier: st.SearchStrategy | None = None,
) -> st.SearchStrategy[ExposureProfileSchema]:
"""Generate random valid ExposureProfileSchema instances."""
return st.builds(
ExposureProfileSchema,
company_id=st.uuids().map(str),
geographic_revenue_mix=_geo_revenue_mix(),
supply_chain_regions=st.lists(
st.sampled_from(_REGION_CODES),
min_size=min_regions,
max_size=max_regions,
unique=True,
),
key_input_commodities=st.lists(
st.sampled_from(_COMMODITIES),
min_size=min_commodities,
max_size=max_commodities,
unique=True,
),
regulatory_jurisdictions=st.lists(
st.sampled_from(_JURISDICTIONS), min_size=0, max_size=3, unique=True,
),
market_position_tier=tier or st.sampled_from(list(MarketPositionTier)),
export_dependency_pct=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
source=st.sampled_from(["manual", "inferred"]),
confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
)
# ---------------------------------------------------------------------------
# Property 5: Default exposure profile derivation
# ---------------------------------------------------------------------------
class TestProperty5DefaultExposureProfileDerivation:
"""Feature: global-news-interpolation, Property 5: Default exposure profile derivation
For any company with a valid sector, industry, and market_cap_bucket but
no manually configured ExposureProfile, the default profile SHALL have a
market_position_tier consistent with the market_cap_bucket mapping
(large_cap → global_leader, mid_cap → multinational, small_cap → regional,
micro_cap → domestic) and SHALL have non-empty geographic_revenue_mix
derived from the sector.
Validates: Requirements 3.2
"""
@given(
sector=st.sampled_from(_VALID_SECTORS + ["UnknownSector"]),
industry=st.text(min_size=1, max_size=30),
market_cap_bucket=st.sampled_from(_VALID_CAP_BUCKETS),
)
@settings(max_examples=100)
def test_default_profile_tier_and_geo_mix(
self,
sector: str,
industry: str,
market_cap_bucket: str,
):
"""**Validates: Requirements 3.2**
The default profile must map market_cap_bucket to the correct
market_position_tier and have a non-empty geographic_revenue_mix.
"""
profile = build_default_profile(sector, industry, market_cap_bucket)
# 1. market_position_tier matches the cap-to-tier mapping
expected_tier = _CAP_TO_TIER[market_cap_bucket]
actual_tier = profile.market_position_tier
if isinstance(actual_tier, MarketPositionTier):
actual_tier = actual_tier.value
assert actual_tier == expected_tier, (
f"For {market_cap_bucket}, expected tier={expected_tier}, got {actual_tier}"
)
# 2. geographic_revenue_mix is non-empty
assert len(profile.geographic_revenue_mix) > 0, (
f"Default profile for sector={sector} has empty geographic_revenue_mix"
)
# 3. geographic_revenue_mix is derived from the sector (known sector
# uses sector-specific map, unknown sector uses _DEFAULT_GEO)
if sector in _SECTOR_DEFAULT_GEO:
expected_geo = _SECTOR_DEFAULT_GEO[sector]
else:
expected_geo = _DEFAULT_GEO
assert set(profile.geographic_revenue_mix.keys()) == set(expected_geo.keys()), (
f"Geo mix keys mismatch for sector={sector}"
)
# 4. source is 'inferred' for default profiles
assert profile.source == "inferred"
# ---------------------------------------------------------------------------
# Property 7: Macro impact score bounds and zero-overlap invariant
# ---------------------------------------------------------------------------
class TestProperty7MacroImpactScoreBoundsAndZeroOverlap:
"""Feature: global-news-interpolation, Property 7: Macro impact score bounds and zero-overlap invariant
For any GlobalEvent and ExposureProfile pair, the computed
Macro_Impact_Score SHALL be in [0, 1]. Furthermore, for any pair where
the event's affected_regions, affected_sectors, and affected_commodities
have zero intersection with the profile's geographic_revenue_mix keys,
supply_chain_regions, and key_input_commodities, the score SHALL be
exactly 0.0.
Validates: Requirements 4.1, 4.4
"""
@given(
event=_global_event_strategy(),
profile=_exposure_profile_schema_strategy(),
)
@settings(max_examples=100)
def test_score_in_bounds(self, event: GlobalEvent, profile: ExposureProfileSchema):
"""**Validates: Requirements 4.1**
The macro impact score must always be in [0, 1].
"""
record = compute_macro_impact(event, profile)
assert 0.0 <= record.macro_impact_score <= 1.0, (
f"Score {record.macro_impact_score} out of bounds [0, 1]"
)
@given(data=st.data())
@settings(max_examples=100)
def test_zero_overlap_produces_zero_score(self, data: st.DataObject):
"""**Validates: Requirements 4.4**
When event regions/sectors/commodities have zero intersection with
the profile, the score must be exactly 0.0.
"""
# Build an event with regions/commodities that do NOT overlap the profile
# Use two disjoint sets of region codes
event_regions = ["ZZ", "YY", "XX"]
event_commodities = ["unobtanium", "vibranium"]
event = data.draw(_global_event_strategy(
min_regions=0,
max_regions=0,
min_commodities=0,
max_commodities=0,
))
# Override with non-overlapping values
event.affected_regions = event_regions
event.affected_commodities = event_commodities
event.affected_sectors = ["NonexistentSector"]
profile = data.draw(_exposure_profile_schema_strategy())
record = compute_macro_impact(event, profile)
assert record.macro_impact_score == 0.0, (
f"Expected score 0.0 for zero-overlap, got {record.macro_impact_score}"
)
# ---------------------------------------------------------------------------
# Property 8: Scoring monotonicity
# ---------------------------------------------------------------------------
class TestProperty8ScoringMonotonicity:
"""Feature: global-news-interpolation, Property 8: Scoring monotonicity
For any GlobalEvent and ExposureProfile pair, increasing the event's
severity level (low → moderate → high → critical) while holding all
other inputs constant SHALL produce a Macro_Impact_Score that is greater
than or equal to the previous score. Similarly, increasing the geographic
overlap percentage SHALL produce a score greater than or equal to the
previous score.
Validates: Requirements 4.2
"""
@given(
event=_global_event_strategy(min_regions=1, min_commodities=1),
profile=_exposure_profile_schema_strategy(min_regions=1, min_commodities=1),
)
@settings(max_examples=100)
def test_severity_monotonicity(
self, event: GlobalEvent, profile: ExposureProfileSchema,
):
"""**Validates: Requirements 4.2**
Increasing severity must produce a score >= the previous score.
"""
scores = []
for sev in _SEVERITY_ORDER:
event.severity = sev
record = compute_macro_impact(event, profile)
scores.append(record.macro_impact_score)
for i in range(1, len(scores)):
assert scores[i] >= scores[i - 1] - 1e-9, (
f"Severity monotonicity violated: "
f"{_SEVERITY_ORDER[i-1]}={scores[i-1]:.6f} > "
f"{_SEVERITY_ORDER[i]}={scores[i]:.6f}"
)
@given(
event=_global_event_strategy(min_regions=1),
profile=_exposure_profile_schema_strategy(),
)
@settings(max_examples=100)
def test_geographic_overlap_monotonicity(
self, event: GlobalEvent, profile: ExposureProfileSchema,
):
"""**Validates: Requirements 4.2**
Increasing geographic overlap must produce a score >= the previous,
holding the resilience modifier constant by keeping is_international
consistent across comparisons.
"""
geo_keys = list(profile.geographic_revenue_mix.keys())
if not geo_keys:
return # nothing to test with empty geo mix
# To isolate geographic overlap monotonicity we must keep
# is_international constant. We always include a non-overlapping
# sentinel region ("ZZ") so len(affected_regions) >= 2 in the
# overlap cases, making is_international=True throughout.
# Score with no geographic overlap (2 non-overlapping regions)
event.affected_regions = ["ZZ", "YY"]
record_none = compute_macro_impact(event, profile)
# Score with partial geographic overlap (first key + sentinel)
event.affected_regions = [geo_keys[0], "ZZ"]
record_partial = compute_macro_impact(event, profile)
# Score with full geographic overlap (all keys + sentinel)
event.affected_regions = geo_keys + ["ZZ"]
record_full = compute_macro_impact(event, profile)
assert record_partial.macro_impact_score >= record_none.macro_impact_score - 1e-9, (
f"Partial overlap ({record_partial.macro_impact_score}) < "
f"no overlap ({record_none.macro_impact_score})"
)
assert record_full.macro_impact_score >= record_partial.macro_impact_score - 1e-9, (
f"Full overlap ({record_full.macro_impact_score}) < "
f"partial overlap ({record_partial.macro_impact_score})"
)
# ---------------------------------------------------------------------------
# Property 9: Resilience modifier tier ordering
# ---------------------------------------------------------------------------
class TestProperty9ResilienceModifierTierOrdering:
"""Feature: global-news-interpolation, Property 9: Resilience modifier tier ordering
For any positive raw impact score and an international event, applying
the resilience modifier with market_position_tier=global_leader SHALL
produce a final score less than or equal to multinational, which SHALL
be less than or equal to regional, which SHALL be less than or equal
to domestic.
Validates: Requirements 4.3
"""
@given(
raw_score=st.floats(min_value=0.01, max_value=1.0, allow_nan=False),
)
@settings(max_examples=100)
def test_tier_ordering_for_international_events(self, raw_score: float):
"""**Validates: Requirements 4.3**
global_leader <= multinational <= regional <= domestic for
international events with positive raw scores.
"""
tier_order = [
MarketPositionTier.GLOBAL_LEADER.value,
MarketPositionTier.MULTINATIONAL.value,
MarketPositionTier.REGIONAL.value,
MarketPositionTier.DOMESTIC.value,
]
scores = [
apply_resilience_modifier(raw_score, tier, event_is_international=True)
for tier in tier_order
]
for i in range(1, len(scores)):
assert scores[i] >= scores[i - 1] - 1e-9, (
f"Tier ordering violated: {tier_order[i-1]}={scores[i-1]:.6f} > "
f"{tier_order[i]}={scores[i]:.6f} (raw={raw_score:.6f})"
)
# ---------------------------------------------------------------------------
# Property 10: Mixed direction for dual-effect events
# ---------------------------------------------------------------------------
class TestProperty10MixedDirectionDualEffectEvents:
"""Feature: global-news-interpolation, Property 10: Mixed direction for dual-effect events
For any GlobalEvent and ExposureProfile pair where the computation
identifies both positive and negative contributing factors, the resulting
impact_direction SHALL be 'mixed' and both positive and negative factors
SHALL be preserved separately in contributing_factors.
Validates: Requirements 4.6
"""
@given(
profile=_exposure_profile_schema_strategy(
min_regions=1, min_commodities=1,
),
)
@settings(max_examples=100)
def test_dual_effect_produces_mixed_direction(
self, profile: ExposureProfileSchema,
):
"""**Validates: Requirements 4.6**
An event with both positive and negative impact types that overlaps
the profile must produce direction='mixed' with both factor lists.
"""
# Pick one positive and one negative event type to guarantee both
positive_type = "demand_shift"
negative_type = "supply_disruption"
# Ensure the event overlaps the profile geographically
geo_keys = list(profile.geographic_revenue_mix.keys())
if not geo_keys:
return
event = GlobalEvent(
event_id=str(_uuid.uuid4()),
event_types=[positive_type, negative_type],
severity="moderate",
affected_regions=geo_keys[:2] if len(geo_keys) >= 2 else geo_keys,
affected_sectors=[],
affected_commodities=profile.key_input_commodities[:1] if profile.key_input_commodities else [],
summary="Dual-effect test event",
key_facts=[],
estimated_duration="short_term",
confidence=0.8,
source_document_id=str(_uuid.uuid4()),
)
record = compute_macro_impact(event, profile)
# Only check direction if there's actual overlap (non-zero score)
if record.macro_impact_score > 0.0:
assert record.impact_direction == "mixed", (
f"Expected direction='mixed' for dual-effect event, "
f"got '{record.impact_direction}'"
)
# Both positive and negative factors must be present
factors_str = " ".join(record.contributing_factors)
assert "positive_types:" in factors_str, (
f"Missing positive_types in contributing_factors: {record.contributing_factors}"
)
assert "negative_types:" in factors_str, (
f"Missing negative_types in contributing_factors: {record.contributing_factors}"
)
# ---------------------------------------------------------------------------
# Imports for Properties 11, 12, 13, 14
# ---------------------------------------------------------------------------
from datetime import timezone
from services.aggregation.scoring import SignalWeight, WeightedSignal
from services.aggregation.worker import (
ImpactRow,
assemble_trend_summary,
)
# ---------------------------------------------------------------------------
# Shared strategies for aggregation-level property tests
# ---------------------------------------------------------------------------
def _make_signal_weight(
combined: float,
recency: float = 0.9,
credibility: float = 0.8,
) -> SignalWeight:
"""Helper to build a SignalWeight with sensible defaults."""
return SignalWeight(
recency=recency,
credibility=credibility,
novelty_bonus=0.0,
confidence_gate=1.0,
market_ctx_multiplier=1.0,
combined=combined,
)
def _company_signal_strategy(
*,
sentiment: st.SearchStrategy[float] | None = None,
min_impact: float = 0.1,
) -> st.SearchStrategy[WeightedSignal]:
"""Generate a company-specific WeightedSignal."""
sent_st = sentiment if sentiment is not None else st.sampled_from([1.0, -1.0])
return st.builds(
WeightedSignal,
document_id=st.uuids().map(lambda u: f"company-doc-{u}"),
weight=st.builds(
_make_signal_weight,
combined=st.floats(min_value=0.3, max_value=1.0, allow_nan=False),
recency=st.floats(min_value=0.5, max_value=1.0, allow_nan=False),
credibility=st.floats(min_value=0.5, max_value=1.0, allow_nan=False),
),
sentiment_value=sent_st,
impact_score=st.floats(min_value=min_impact, max_value=1.0, allow_nan=False),
)
def _macro_signal_strategy(
*,
sentiment: st.SearchStrategy[float] | None = None,
doc_id_prefix: str = "macro-doc",
min_impact: float = 0.05,
) -> st.SearchStrategy[WeightedSignal]:
"""Generate a macro WeightedSignal."""
sent_st = sentiment if sentiment is not None else st.sampled_from([1.0, -1.0])
return st.builds(
WeightedSignal,
document_id=st.uuids().map(lambda u: f"{doc_id_prefix}-{u}"),
weight=st.builds(
_make_signal_weight,
combined=st.floats(min_value=0.2, max_value=1.0, allow_nan=False),
recency=st.floats(min_value=0.4, max_value=1.0, allow_nan=False),
credibility=st.floats(min_value=0.4, max_value=1.0, allow_nan=False),
),
sentiment_value=sent_st,
impact_score=st.floats(min_value=min_impact, max_value=0.5, allow_nan=False),
)
def _impact_row_from_signal(sig: WeightedSignal) -> ImpactRow:
"""Build a minimal ImpactRow matching a WeightedSignal's document_id."""
return ImpactRow(
document_id=sig.document_id,
confidence=sig.weight.credibility,
novelty_score=0.5,
source_credibility=sig.weight.credibility,
sentiment="positive" if sig.sentiment_value > 0 else "negative",
impact_score=sig.impact_score,
catalyst_type="macro",
key_facts=[],
risks=[],
published_at=datetime.now(tz=timezone.utc),
)
# ---------------------------------------------------------------------------
# Property 11: Macro signals influence trend output
# ---------------------------------------------------------------------------
class TestProperty11MacroSignalsInfluenceTrendOutput:
"""Feature: global-news-interpolation, Property 11: Macro signals influence trend output
For any company with both company-specific signals and non-zero macro
impact signals, the trend summary computed with macro signals included
SHALL differ from the trend summary computed with only company-specific
signals (in at least one of: trend_strength, confidence, or evidence
references).
Validates: Requirements 5.1
"""
@given(
company_signals=st.lists(
_company_signal_strategy(sentiment=st.just(1.0)),
min_size=1,
max_size=5,
),
macro_signals=st.lists(
_macro_signal_strategy(sentiment=st.just(-1.0), min_impact=0.1),
min_size=1,
max_size=3,
),
)
@settings(max_examples=100)
def test_macro_signals_change_trend_output(
self,
company_signals: list[WeightedSignal],
macro_signals: list[WeightedSignal],
):
"""**Validates: Requirements 5.1**
Adding macro signals to company-only signals must change at least
one of trend_strength, confidence, or evidence references.
"""
ref_time = datetime.now(tz=timezone.utc)
# Build ImpactRow stubs for company signals only
impacts = [_impact_row_from_signal(s) for s in company_signals]
# Company-only trend
company_only = assemble_trend_summary(
ticker="TEST",
window="7d",
signals=list(company_signals),
impacts=impacts,
reference_time=ref_time,
)
# Combined trend (company + macro)
combined_signals = list(company_signals) + list(macro_signals)
combined = assemble_trend_summary(
ticker="TEST",
window="7d",
signals=combined_signals,
impacts=impacts,
reference_time=ref_time,
)
# At least one of these must differ
differs = (
company_only.trend_strength != combined.trend_strength
or company_only.confidence != combined.confidence
or company_only.top_supporting_evidence != combined.top_supporting_evidence
or company_only.top_opposing_evidence != combined.top_opposing_evidence
or company_only.contradiction_score != combined.contradiction_score
)
assert differs, (
f"Macro signals had no effect on trend output. "
f"Company-only: strength={company_only.trend_strength}, "
f"confidence={company_only.confidence}, "
f"contradiction={company_only.contradiction_score}. "
f"Combined: strength={combined.trend_strength}, "
f"confidence={combined.confidence}, "
f"contradiction={combined.contradiction_score}."
)
# ---------------------------------------------------------------------------
# Property 12: Macro-company contradiction detection
# ---------------------------------------------------------------------------
class TestProperty12MacroCompanyContradictionDetection:
"""Feature: global-news-interpolation, Property 12: Macro-company contradiction detection
For any set of signals where macro impact signals have a negative
direction and company-specific signals have a positive sentiment
(or vice versa), the resulting trend summary's contradiction_score
SHALL be greater than zero and disagreement_details SHALL contain
at least one entry.
Validates: Requirements 5.3
"""
@given(
company_signals=st.lists(
_company_signal_strategy(sentiment=st.just(1.0), min_impact=0.2),
min_size=1,
max_size=5,
),
macro_signals=st.lists(
_macro_signal_strategy(sentiment=st.just(-1.0), min_impact=0.1),
min_size=1,
max_size=3,
),
)
@settings(max_examples=100)
def test_opposing_macro_company_signals_produce_contradiction(
self,
company_signals: list[WeightedSignal],
macro_signals: list[WeightedSignal],
):
"""**Validates: Requirements 5.3**
When company signals are positive and macro signals are negative,
contradiction_score must be > 0 and disagreement_details non-empty.
"""
ref_time = datetime.now(tz=timezone.utc)
impacts = [_impact_row_from_signal(s) for s in company_signals]
combined_signals = list(company_signals) + list(macro_signals)
summary = assemble_trend_summary(
ticker="TEST",
window="7d",
signals=combined_signals,
impacts=impacts,
reference_time=ref_time,
)
assert summary.contradiction_score > 0.0, (
f"Expected contradiction_score > 0 for opposing signals, "
f"got {summary.contradiction_score}"
)
assert len(summary.disagreement_details) >= 1, (
f"Expected at least one disagreement_detail entry, "
f"got {len(summary.disagreement_details)}"
)
@given(
company_signals=st.lists(
_company_signal_strategy(sentiment=st.just(-1.0), min_impact=0.2),
min_size=1,
max_size=5,
),
macro_signals=st.lists(
_macro_signal_strategy(sentiment=st.just(1.0), min_impact=0.1),
min_size=1,
max_size=3,
),
)
@settings(max_examples=100)
def test_opposing_macro_positive_company_negative_contradiction(
self,
company_signals: list[WeightedSignal],
macro_signals: list[WeightedSignal],
):
"""**Validates: Requirements 5.3**
When company signals are negative and macro signals are positive,
contradiction_score must be > 0 and disagreement_details non-empty.
"""
ref_time = datetime.now(tz=timezone.utc)
impacts = [_impact_row_from_signal(s) for s in company_signals]
combined_signals = list(company_signals) + list(macro_signals)
summary = assemble_trend_summary(
ticker="TEST",
window="7d",
signals=combined_signals,
impacts=impacts,
reference_time=ref_time,
)
assert summary.contradiction_score > 0.0, (
f"Expected contradiction_score > 0 for opposing signals, "
f"got {summary.contradiction_score}"
)
assert len(summary.disagreement_details) >= 1, (
f"Expected at least one disagreement_detail entry, "
f"got {len(summary.disagreement_details)}"
)
# ---------------------------------------------------------------------------
# Property 13: Macro evidence traceability
# ---------------------------------------------------------------------------
class TestProperty13MacroEvidenceTraceability:
"""Feature: global-news-interpolation, Property 13: Macro evidence traceability
For any trend summary that includes macro signal contributions, the
top_supporting_evidence or top_opposing_evidence lists SHALL contain
the source_document_id of at least one contributing GlobalEvent.
Validates: Requirements 5.4
"""
@given(
company_signals=st.lists(
_company_signal_strategy(sentiment=st.just(1.0)),
min_size=1,
max_size=3,
),
macro_signals=st.lists(
_macro_signal_strategy(sentiment=st.sampled_from([1.0, -1.0]), min_impact=0.1),
min_size=1,
max_size=3,
),
)
@settings(max_examples=100)
def test_macro_document_ids_appear_in_evidence(
self,
company_signals: list[WeightedSignal],
macro_signals: list[WeightedSignal],
):
"""**Validates: Requirements 5.4**
At least one macro signal's document_id must appear in either
top_supporting_evidence or top_opposing_evidence.
"""
ref_time = datetime.now(tz=timezone.utc)
impacts = [_impact_row_from_signal(s) for s in company_signals]
combined_signals = list(company_signals) + list(macro_signals)
summary = assemble_trend_summary(
ticker="TEST",
window="7d",
signals=combined_signals,
impacts=impacts,
reference_time=ref_time,
)
macro_doc_ids = {s.document_id for s in macro_signals}
all_evidence = set(summary.top_supporting_evidence) | set(summary.top_opposing_evidence)
found = macro_doc_ids & all_evidence
assert len(found) >= 1, (
f"No macro document_id found in evidence lists. "
f"Macro IDs: {macro_doc_ids}, "
f"Supporting: {summary.top_supporting_evidence}, "
f"Opposing: {summary.top_opposing_evidence}"
)
# ---------------------------------------------------------------------------
# Property 14: No degradation without macro data and disabled-layer equivalence
# ---------------------------------------------------------------------------
class TestProperty14NoDegradationWithoutMacroData:
"""Feature: global-news-interpolation, Property 14: No degradation without macro data and disabled-layer equivalence
For any company with no macro impact records in the aggregation window,
the trend summary produced with the macro layer enabled SHALL be
identical to the trend summary produced with the macro layer disabled.
Furthermore, for any aggregation run with the macro layer disabled,
the output SHALL be identical to company-only aggregation regardless
of existing macro data.
Validates: Requirements 5.5, 11.2
"""
@given(
company_signals=st.lists(
_company_signal_strategy(),
min_size=1,
max_size=5,
),
)
@settings(max_examples=100)
def test_no_macro_data_produces_identical_output(
self,
company_signals: list[WeightedSignal],
):
"""**Validates: Requirements 5.5**
With no macro signals, the trend summary must be identical
regardless of whether the macro layer is conceptually enabled
or disabled — both paths receive the same company-only signals.
"""
ref_time = datetime.now(tz=timezone.utc)
impacts = [_impact_row_from_signal(s) for s in company_signals]
# "Macro enabled" path — but no macro signals exist
summary_enabled = assemble_trend_summary(
ticker="TEST",
window="7d",
signals=list(company_signals),
impacts=impacts,
reference_time=ref_time,
)
# "Macro disabled" path — same company-only signals
summary_disabled = assemble_trend_summary(
ticker="TEST",
window="7d",
signals=list(company_signals),
impacts=impacts,
reference_time=ref_time,
)
assert summary_enabled.trend_direction == summary_disabled.trend_direction
assert summary_enabled.trend_strength == summary_disabled.trend_strength
assert summary_enabled.confidence == summary_disabled.confidence
assert summary_enabled.contradiction_score == summary_disabled.contradiction_score
assert summary_enabled.top_supporting_evidence == summary_disabled.top_supporting_evidence
assert summary_enabled.top_opposing_evidence == summary_disabled.top_opposing_evidence
assert summary_enabled.dominant_catalysts == summary_disabled.dominant_catalysts
assert summary_enabled.material_risks == summary_disabled.material_risks
@given(
company_signals=st.lists(
_company_signal_strategy(),
min_size=1,
max_size=5,
),
macro_signals=st.lists(
_macro_signal_strategy(min_impact=0.1),
min_size=1,
max_size=3,
),
)
@settings(max_examples=100)
def test_disabled_layer_ignores_macro_signals(
self,
company_signals: list[WeightedSignal],
macro_signals: list[WeightedSignal],
):
"""**Validates: Requirements 11.2**
When the macro layer is disabled, the output must be identical
to company-only aggregation even if macro data exists. We simulate
"disabled" by not passing macro signals to assemble_trend_summary.
"""
ref_time = datetime.now(tz=timezone.utc)
impacts = [_impact_row_from_signal(s) for s in company_signals]
# Company-only (macro layer disabled — macro signals excluded)
summary_disabled = assemble_trend_summary(
ticker="TEST",
window="7d",
signals=list(company_signals),
impacts=impacts,
reference_time=ref_time,
)
# Company-only baseline (no macro signals at all)
summary_baseline = assemble_trend_summary(
ticker="TEST",
window="7d",
signals=list(company_signals),
impacts=impacts,
reference_time=ref_time,
)
assert summary_disabled.trend_direction == summary_baseline.trend_direction
assert summary_disabled.trend_strength == summary_baseline.trend_strength
assert summary_disabled.confidence == summary_baseline.confidence
assert summary_disabled.contradiction_score == summary_baseline.contradiction_score
assert summary_disabled.top_supporting_evidence == summary_baseline.top_supporting_evidence
assert summary_disabled.top_opposing_evidence == summary_baseline.top_opposing_evidence
assert summary_disabled.dominant_catalysts == summary_baseline.dominant_catalysts
assert summary_disabled.material_risks == summary_baseline.material_risks
# ---------------------------------------------------------------------------
# Imports for Property 15
# ---------------------------------------------------------------------------
from services.aggregation.rollups import (
SECTOR_CONCENTRATION_THRESHOLD,
CompanyTrendRow,
SectorMacroImpact,
compute_sector_macro_concentration,
rollup_trends,
)
# ---------------------------------------------------------------------------
# Hypothesis strategies for rollup property tests
# ---------------------------------------------------------------------------
_ROLLUP_SECTORS = ["Technology", "Energy", "Healthcare", "Financials", "Industrials"]
def _company_trend_row_strategy(
*,
sector: str | None = None,
direction: str | None = None,
) -> st.SearchStrategy[CompanyTrendRow]:
"""Generate a random CompanyTrendRow for rollup tests."""
return st.builds(
CompanyTrendRow,
entity_id=st.from_regex(r"[A-Z]{2,5}", fullmatch=True),
sector=st.just(sector) if sector else st.sampled_from(_ROLLUP_SECTORS),
window=st.just("7d"),
trend_direction=st.just(direction) if direction else st.sampled_from(
["bullish", "bearish", "neutral", "mixed"]
),
trend_strength=st.floats(min_value=0.1, max_value=0.8, allow_nan=False),
confidence=st.floats(min_value=0.3, max_value=0.8, allow_nan=False),
contradiction_score=st.floats(min_value=0.0, max_value=0.5, allow_nan=False),
dominant_catalysts=st.just([]),
material_risks=st.just([]),
top_supporting_evidence=st.just([]),
top_opposing_evidence=st.just([]),
)
def _sector_macro_impact_strategy(
*,
sector: str | None = None,
min_total: float = 0.1,
max_total: float = 5.0,
) -> st.SearchStrategy[SectorMacroImpact]:
"""Generate a random SectorMacroImpact."""
return st.builds(
SectorMacroImpact,
sector=st.just(sector) if sector else st.sampled_from(_ROLLUP_SECTORS),
total_impact=st.floats(min_value=min_total, max_value=max_total, allow_nan=False),
avg_impact=st.floats(min_value=0.05, max_value=1.0, allow_nan=False),
company_count=st.integers(min_value=1, max_value=20),
net_direction=st.floats(min_value=-1.0, max_value=1.0, allow_nan=False),
event_ids=st.lists(st.uuids().map(str), min_size=1, max_size=3),
)
# ---------------------------------------------------------------------------
# Property 15: Sector and market rollup macro incorporation
# ---------------------------------------------------------------------------
class TestProperty15SectorAndMarketRollupMacroIncorporation:
"""Feature: global-news-interpolation, Property 15: Sector and market rollup macro incorporation
For any sector containing companies with non-zero macro impact scores,
the sector-level rollup SHALL reflect those macro signals in its
trend_strength or confidence. Furthermore, for any GlobalEvent that
disproportionately affects a single sector (>60% of total macro impact
concentrated in one sector), that sector SHALL appear in the market-level
rollup's material_risks or dominant_catalysts.
Validates: Requirements 6.1, 6.2, 6.3
"""
@given(
trends=st.lists(
_company_trend_row_strategy(sector="Technology"),
min_size=1,
max_size=5,
),
macro_impact=_sector_macro_impact_strategy(sector="Technology", min_total=0.5),
)
@settings(max_examples=100)
def test_sector_rollup_reflects_macro_signals(
self,
trends: list[CompanyTrendRow],
macro_impact: SectorMacroImpact,
):
"""**Validates: Requirements 6.1**
A sector rollup with macro data must differ from one without
in trend_strength or confidence.
"""
ref_time = datetime.now(tz=timezone.utc)
sector = "Technology"
# Rollup without macro impacts
summary_without = rollup_trends(
trends=trends,
entity_type="sector",
entity_id=sector,
window="7d",
reference_time=ref_time,
macro_impacts=None,
)
# Rollup with macro impacts
macro_impacts = {sector: macro_impact}
summary_with = rollup_trends(
trends=trends,
entity_type="sector",
entity_id=sector,
window="7d",
reference_time=ref_time,
macro_impacts=macro_impacts,
)
# At least one of strength or confidence must differ
differs = (
summary_with.trend_strength != summary_without.trend_strength
or summary_with.confidence != summary_without.confidence
)
assert differs, (
f"Sector rollup with macro data is identical to without. "
f"Without: strength={summary_without.trend_strength}, "
f"confidence={summary_without.confidence}. "
f"With: strength={summary_with.trend_strength}, "
f"confidence={summary_with.confidence}."
)
@given(data=st.data())
@settings(max_examples=100)
def test_concentrated_sector_appears_in_market_rollup(self, data: st.DataObject):
"""**Validates: Requirements 6.2, 6.3**
When one sector has >60% of total macro impact, that sector must
appear in the market-level rollup's material_risks or
dominant_catalysts.
"""
ref_time = datetime.now(tz=timezone.utc)
# Pick a dominant sector and generate trends across multiple sectors
dominant_sector = data.draw(st.sampled_from(_ROLLUP_SECTORS))
other_sectors = [s for s in _ROLLUP_SECTORS if s != dominant_sector]
# Generate at least one trend per sector so rollup has data
all_trends: list[CompanyTrendRow] = []
dominant_trends = data.draw(st.lists(
_company_trend_row_strategy(sector=dominant_sector),
min_size=1,
max_size=3,
))
all_trends.extend(dominant_trends)
for sec in other_sectors[:2]:
sec_trends = data.draw(st.lists(
_company_trend_row_strategy(sector=sec),
min_size=1,
max_size=2,
))
all_trends.extend(sec_trends)
# Build macro impacts where dominant sector has >60% of total
# Give dominant sector a large impact, others small
dominant_total = data.draw(
st.floats(min_value=3.0, max_value=10.0, allow_nan=False)
)
# Other sectors share the remaining <40%
# max other total = dominant_total * (0.39 / 0.61) to ensure >60%
max_other_per_sector = dominant_total * 0.15
macro_impacts: dict[str, SectorMacroImpact] = {}
macro_impacts[dominant_sector] = SectorMacroImpact(
sector=dominant_sector,
total_impact=dominant_total,
avg_impact=dominant_total / max(len(dominant_trends), 1),
company_count=len(dominant_trends),
net_direction=data.draw(
st.floats(min_value=-1.0, max_value=1.0, allow_nan=False)
),
event_ids=["evt-1"],
)
for sec in other_sectors[:2]:
other_total = data.draw(
st.floats(min_value=0.01, max_value=max(max_other_per_sector, 0.02),
allow_nan=False)
)
macro_impacts[sec] = SectorMacroImpact(
sector=sec,
total_impact=other_total,
avg_impact=other_total,
company_count=1,
net_direction=0.0,
event_ids=["evt-2"],
)
# Verify concentration is indeed >60%
concentration = compute_sector_macro_concentration(macro_impacts)
dominant_fraction = next(
(frac for sec, frac in concentration if sec == dominant_sector), 0.0
)
# If our generation didn't produce >60%, skip (shouldn't happen with our constraints)
if dominant_fraction <= SECTOR_CONCENTRATION_THRESHOLD:
return
# Compute market-level rollup
summary = rollup_trends(
trends=all_trends,
entity_type="market",
entity_id="all",
window="7d",
reference_time=ref_time,
macro_impacts=macro_impacts,
)
# The dominant sector must appear in material_risks or dominant_catalysts
all_labels = summary.material_risks + summary.dominant_catalysts
found = any(dominant_sector in label for label in all_labels)
assert found, (
f"Dominant sector '{dominant_sector}' (fraction={dominant_fraction:.2%}) "
f"not found in material_risks or dominant_catalysts. "
f"material_risks={summary.material_risks}, "
f"dominant_catalysts={summary.dominant_catalysts}"
)
# ---------------------------------------------------------------------------
# Imports for Properties 20, 21, 22, 23
# ---------------------------------------------------------------------------
from services.aggregation.projection import (
DEFAULT_CONFIDENCE_THRESHOLD,
MacroEventInfo,
TrendProjection,
compute_projection,
)
from services.shared.schemas import TrendDirection, TrendSummary, TrendWindow
# ---------------------------------------------------------------------------
# Hypothesis strategies for projection property tests
# ---------------------------------------------------------------------------
_VALID_TREND_DIRECTIONS = [d for d in TrendDirection]
_VALID_TREND_WINDOWS = [w for w in TrendWindow]
_VALID_ESTIMATED_DURATIONS = ["short_term", "medium_term", "long_term"]
_VALID_MACRO_DIRECTIONS = ["positive", "negative", "mixed", "neutral"]
_VALID_SEVERITIES_PROJ = ["low", "moderate", "high", "critical"]
def _trend_summary_strategy(
*,
direction: st.SearchStrategy[TrendDirection] | None = None,
min_strength: float = 0.1,
max_strength: float = 1.0,
min_confidence: float = 0.1,
max_confidence: float = 1.0,
) -> st.SearchStrategy[TrendSummary]:
"""Generate random valid TrendSummary instances for projection tests."""
return st.builds(
TrendSummary,
entity_type=st.just("company"),
entity_id=st.from_regex(r"[A-Z]{2,5}", fullmatch=True),
window=st.sampled_from(_VALID_TREND_WINDOWS),
trend_direction=direction or st.sampled_from(_VALID_TREND_DIRECTIONS),
trend_strength=st.floats(
min_value=min_strength, max_value=max_strength, allow_nan=False,
),
confidence=st.floats(
min_value=min_confidence, max_value=max_confidence, allow_nan=False,
),
top_supporting_evidence=st.just([]),
top_opposing_evidence=st.just([]),
dominant_catalysts=st.just([]),
material_risks=st.just([]),
contradiction_score=st.just(0.0),
disagreement_details=st.just([]),
)
def _macro_event_info_strategy(
*,
min_score: float = 0.1,
max_score: float = 1.0,
) -> st.SearchStrategy[MacroEventInfo]:
"""Generate random valid MacroEventInfo instances for projection tests."""
return st.builds(
MacroEventInfo,
event_id=st.uuids().map(str),
macro_impact_score=st.floats(
min_value=min_score, max_value=max_score, allow_nan=False,
),
impact_direction=st.sampled_from(_VALID_MACRO_DIRECTIONS),
confidence=st.floats(min_value=0.1, max_value=1.0, allow_nan=False),
estimated_duration=st.sampled_from(_VALID_ESTIMATED_DURATIONS),
severity=st.sampled_from(_VALID_SEVERITIES_PROJ),
event_age_hours=st.floats(min_value=0.0, max_value=720.0, allow_nan=False),
)
# ---------------------------------------------------------------------------
# Property 20: Trend projection always produced
# ---------------------------------------------------------------------------
class TestProperty20TrendProjectionAlwaysProduced:
"""Feature: global-news-interpolation, Property 20: Trend projection always produced
For any trend summary produced by the Aggregation_Engine, a corresponding
TrendProjection SHALL also be produced with valid projected_direction,
projected_strength in [0, 1], projected_confidence in [0, 1], and a
non-empty driving_factors list.
Validates: Requirements 12.1
"""
@given(
summary=_trend_summary_strategy(),
macro_events=st.lists(
_macro_event_info_strategy(),
min_size=0,
max_size=5,
),
macro_enabled=st.booleans(),
)
@settings(max_examples=100)
def test_projection_always_produced_with_valid_fields(
self,
summary: TrendSummary,
macro_events: list[MacroEventInfo],
macro_enabled: bool,
):
"""**Validates: Requirements 12.1**
compute_projection must always return a TrendProjection with valid
projected_direction, projected_strength in [0, 1],
projected_confidence in [0, 1], and non-empty driving_factors.
"""
projection = compute_projection(
summary=summary,
macro_events=macro_events if macro_events else None,
macro_enabled=macro_enabled,
)
# Must be a TrendProjection instance
assert isinstance(projection, TrendProjection)
# projected_direction must be a valid direction
assert projection.projected_direction in {"bullish", "bearish", "mixed", "neutral"}, (
f"Invalid projected_direction: {projection.projected_direction}"
)
# projected_strength in [0, 1]
assert 0.0 <= projection.projected_strength <= 1.0, (
f"projected_strength {projection.projected_strength} out of bounds [0, 1]"
)
# projected_confidence in [0, 1]
assert 0.0 <= projection.projected_confidence <= 1.0, (
f"projected_confidence {projection.projected_confidence} out of bounds [0, 1]"
)
# driving_factors must be non-empty
assert len(projection.driving_factors) >= 1, (
"driving_factors is empty; must contain at least one entry"
)
# ---------------------------------------------------------------------------
# Property 21: Projection divergence flagging
# ---------------------------------------------------------------------------
class TestProperty21ProjectionDivergenceFlagging:
"""Feature: global-news-interpolation, Property 21: Projection divergence flagging
For any TrendProjection where projected_direction differs from the
current trend summary's trend_direction, the diverges_from_current
field SHALL be True and driving_factors SHALL contain at least one
entry explaining the divergence.
Validates: Requirements 12.3
"""
@given(
summary=_trend_summary_strategy(),
macro_events=st.lists(
_macro_event_info_strategy(),
min_size=0,
max_size=5,
),
macro_enabled=st.booleans(),
)
@settings(max_examples=100)
def test_divergence_flagged_when_directions_differ(
self,
summary: TrendSummary,
macro_events: list[MacroEventInfo],
macro_enabled: bool,
):
"""**Validates: Requirements 12.3**
When projected_direction != current trend_direction,
diverges_from_current must be True and driving_factors must
contain at least one entry mentioning the divergence.
"""
projection = compute_projection(
summary=summary,
macro_events=macro_events if macro_events else None,
macro_enabled=macro_enabled,
)
current_dir = summary.trend_direction.value
if projection.projected_direction != current_dir:
assert projection.diverges_from_current is True, (
f"diverges_from_current should be True when "
f"projected={projection.projected_direction} != "
f"current={current_dir}"
)
# At least one driving factor must mention the divergence
divergence_mentioned = any(
"DIVERGENCE" in f or "diverge" in f.lower()
for f in projection.driving_factors
)
assert divergence_mentioned, (
f"No divergence explanation in driving_factors when "
f"projected={projection.projected_direction} != "
f"current={current_dir}. "
f"driving_factors={projection.driving_factors}"
)
else:
# When directions match, diverges_from_current must be False
assert projection.diverges_from_current is False, (
f"diverges_from_current should be False when "
f"projected={projection.projected_direction} == "
f"current={current_dir}"
)
# ---------------------------------------------------------------------------
# Property 22: Macro-disabled projections have reduced confidence
# ---------------------------------------------------------------------------
class TestProperty22MacroDisabledProjectionsReducedConfidence:
"""Feature: global-news-interpolation, Property 22: Macro-disabled projections have reduced confidence
For any identical set of company signals and macro signals, the
TrendProjection computed with the macro layer disabled SHALL have
projected_confidence less than or equal to the projection computed
with the macro layer enabled.
Validates: Requirements 12.4
"""
@given(
summary=_trend_summary_strategy(min_confidence=0.2),
macro_events=st.lists(
_macro_event_info_strategy(min_score=0.1),
min_size=1,
max_size=5,
),
)
@settings(max_examples=100)
def test_disabled_macro_has_lower_or_equal_confidence(
self,
summary: TrendSummary,
macro_events: list[MacroEventInfo],
):
"""**Validates: Requirements 12.4**
With macro layer disabled, projected_confidence must be <=
the confidence computed with macro layer enabled.
"""
projection_enabled = compute_projection(
summary=summary,
macro_events=macro_events,
macro_enabled=True,
)
projection_disabled = compute_projection(
summary=summary,
macro_events=macro_events,
macro_enabled=False,
)
assert projection_disabled.projected_confidence <= projection_enabled.projected_confidence + 1e-9, (
f"Disabled confidence ({projection_disabled.projected_confidence}) > "
f"enabled confidence ({projection_enabled.projected_confidence}) "
f"for summary confidence={summary.confidence}"
)
# ---------------------------------------------------------------------------
# Property 23: Low-confidence projection exclusion
# ---------------------------------------------------------------------------
class TestProperty23LowConfidenceProjectionExclusion:
"""Feature: global-news-interpolation, Property 23: Low-confidence projection exclusion
For any TrendProjection with projected_confidence below the configurable
threshold (default 0.3), the projection SHALL be marked as low_confidence
and SHALL NOT influence recommendation eligibility.
Validates: Requirements 12.9
"""
@given(
summary=_trend_summary_strategy(),
macro_events=st.lists(
_macro_event_info_strategy(),
min_size=0,
max_size=5,
),
macro_enabled=st.booleans(),
threshold=st.floats(min_value=0.1, max_value=0.9, allow_nan=False),
)
@settings(max_examples=100)
def test_low_confidence_projection_marked_correctly(
self,
summary: TrendSummary,
macro_events: list[MacroEventInfo],
macro_enabled: bool,
threshold: float,
):
"""**Validates: Requirements 12.9**
When projected_confidence < threshold, low_confidence must be True.
When projected_confidence >= threshold, low_confidence must be False.
"""
projection = compute_projection(
summary=summary,
macro_events=macro_events if macro_events else None,
macro_enabled=macro_enabled,
confidence_threshold=threshold,
)
if projection.projected_confidence < threshold:
assert projection.low_confidence is True, (
f"low_confidence should be True when "
f"projected_confidence={projection.projected_confidence} < "
f"threshold={threshold}"
)
else:
assert projection.low_confidence is False, (
f"low_confidence should be False when "
f"projected_confidence={projection.projected_confidence} >= "
f"threshold={threshold}"
)
# ---------------------------------------------------------------------------
# Imports for Properties 16, 17, 18, 19
# ---------------------------------------------------------------------------
from services.aggregation.interpolation import (
DEFAULT_CONFIDENCE_THRESHOLD,
apply_accelerated_decay,
compute_standard_recency_decay,
filter_low_confidence_events,
)
from services.extractor.exposure_inference import infer_exposure_profile
from services.recommendation.suppression import (
evaluate_macro_only_suppression,
)
from services.shared.schemas import (
CatalystType,
CompanyImpact,
DocumentIntelligence,
DocumentType,
)
from services.shared.schemas import (
Sentiment as SentimentEnum,
)
# ---------------------------------------------------------------------------
# Hypothesis strategies for exposure inference tests
# ---------------------------------------------------------------------------
_FILING_TYPES = ["filing", "transcript"]
_INFERENCE_SECTORS = [
"Information Technology", "Energy", "Materials", "Industrials",
"Health Care", "Financials", "Consumer Discretionary",
]
_INFERENCE_INDUSTRIES = ["Software", "Oil & Gas", "Banking", "Machinery", "Pharma"]
_INFERENCE_CAP_BUCKETS = ["large_cap", "mid_cap", "small_cap", "micro_cap"]
# Region and commodity text fragments for generating filing content
_GEO_FRAGMENTS = [
"United States", "China", "Japan", "Germany", "United Kingdom",
"India", "Brazil", "South Korea", "Canada", "Australia",
"Europe", "Asia Pacific", "Latin America",
]
_COMMODITY_FRAGMENTS = [
"crude oil", "natural gas", "copper", "steel", "lithium",
"semiconductor", "wheat", "corn", "gold", "aluminum",
]
def _filing_intelligence_strategy(
*,
min_geo_fragments: int = 1,
min_commodity_fragments: int = 0,
) -> st.SearchStrategy[DocumentIntelligence]:
"""Generate a DocumentIntelligence with filing-type and geographic/commodity content."""
return st.builds(
lambda doc_type, geo_frags, commodity_frags, extra_facts: DocumentIntelligence(
document_type=DocumentType(doc_type),
summary=" ".join(
[f"Revenue from {g} grew significantly." for g in geo_frags]
+ [f"{c} prices impacted margins." for c in commodity_frags]
),
companies=[
CompanyImpact(
ticker="TEST",
company_name="Test Corp",
relevance=0.8,
sentiment=SentimentEnum.NEUTRAL,
impact_score=0.5,
impact_horizon="medium_term",
catalyst_type=CatalystType.EARNINGS,
key_facts=extra_facts,
)
] if extra_facts else [],
macro_themes=[],
confidence=0.7,
),
doc_type=st.sampled_from(_FILING_TYPES),
geo_frags=st.lists(
st.sampled_from(_GEO_FRAGMENTS),
min_size=min_geo_fragments,
max_size=5,
),
commodity_frags=st.lists(
st.sampled_from(_COMMODITY_FRAGMENTS),
min_size=min_commodity_fragments,
max_size=3,
),
extra_facts=st.lists(
st.sampled_from([
f"Operations in {g}" for g in _GEO_FRAGMENTS
] + [
f"{c} supply chain disruption" for c in _COMMODITY_FRAGMENTS
]),
min_size=0,
max_size=3,
),
)
# ---------------------------------------------------------------------------
# Property 16: Inferred exposure profile correctness
# ---------------------------------------------------------------------------
class TestProperty16InferredExposureProfileCorrectness:
"""Feature: global-news-interpolation, Property 16: Inferred exposure profile correctness
For any set of filing extractions containing geographic revenue breakdowns
or commodity references, the inferred ExposureProfile SHALL have
source='inferred', confidence in [0, 1], and geographic_revenue_mix
entries that correspond to regions mentioned in the filings.
Validates: Requirements 9.1, 9.2
"""
@given(
filings=st.lists(
_filing_intelligence_strategy(min_geo_fragments=1),
min_size=1,
max_size=5,
),
sector=st.sampled_from(_INFERENCE_SECTORS),
industry=st.sampled_from(_INFERENCE_INDUSTRIES),
cap_bucket=st.sampled_from(_INFERENCE_CAP_BUCKETS),
)
@settings(max_examples=100)
def test_inferred_profile_source_and_confidence(
self,
filings: list[DocumentIntelligence],
sector: str,
industry: str,
cap_bucket: str,
):
"""**Validates: Requirements 9.1, 9.2**
The inferred profile must have source='inferred' and confidence in [0, 1].
Geographic revenue mix entries must correspond to regions mentioned
in the filings.
"""
profile = infer_exposure_profile(filings, sector, industry, cap_bucket)
# source must be 'inferred'
assert profile.source == "inferred", (
f"Expected source='inferred', got '{profile.source}'"
)
# confidence must be in [0, 1]
assert 0.0 <= profile.confidence <= 1.0, (
f"Confidence {profile.confidence} out of bounds [0, 1]"
)
# geographic_revenue_mix must be non-empty (filings have geo fragments)
assert len(profile.geographic_revenue_mix) > 0, (
"Expected non-empty geographic_revenue_mix for filings with geo data"
)
# Revenue mix values must sum to approximately 1.0
mix_total = sum(profile.geographic_revenue_mix.values())
assert abs(mix_total - 1.0) < 0.02, (
f"Revenue mix sums to {mix_total}, expected ~1.0"
)
# All revenue mix values must be in (0, 1]
for region, pct in profile.geographic_revenue_mix.items():
assert 0.0 < pct <= 1.0, (
f"Revenue mix for {region}={pct} out of bounds (0, 1]"
)
# ---------------------------------------------------------------------------
# Property 17: Low-confidence event exclusion
# ---------------------------------------------------------------------------
class TestProperty17LowConfidenceEventExclusion:
"""Feature: global-news-interpolation, Property 17: Low-confidence event exclusion
For any GlobalEvent classification with confidence below the configurable
threshold (default 0.4), the Interpolation_Engine SHALL produce zero
MacroImpactRecords for that event.
Validates: Requirements 10.1
"""
@given(
low_conf=st.floats(min_value=0.0, max_value=0.39, allow_nan=False),
high_conf=st.floats(min_value=0.4, max_value=1.0, allow_nan=False),
threshold=st.just(DEFAULT_CONFIDENCE_THRESHOLD),
)
@settings(max_examples=100)
def test_low_confidence_events_excluded(
self,
low_conf: float,
high_conf: float,
threshold: float,
):
"""**Validates: Requirements 10.1**
Events with confidence below threshold must be excluded.
Events at or above threshold must be included.
"""
low_event = GlobalEvent(
event_id="low-conf",
event_types=["supply_disruption"],
severity="moderate",
confidence=low_conf,
)
high_event = GlobalEvent(
event_id="high-conf",
event_types=["supply_disruption"],
severity="moderate",
confidence=high_conf,
)
result = filter_low_confidence_events(
[low_event, high_event],
confidence_threshold=threshold,
)
# Low confidence event must be excluded
result_ids = [e.event_id for e in result]
assert "low-conf" not in result_ids, (
f"Low-confidence event (conf={low_conf}) should be excluded "
f"with threshold={threshold}"
)
# High confidence event must be included
assert "high-conf" in result_ids, (
f"High-confidence event (conf={high_conf}) should be included "
f"with threshold={threshold}"
)
@given(
confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
threshold=st.floats(min_value=0.01, max_value=0.99, allow_nan=False),
)
@settings(max_examples=100)
def test_threshold_boundary(
self,
confidence: float,
threshold: float,
):
"""**Validates: Requirements 10.1**
Events exactly at or above threshold are included; below are excluded.
"""
event = GlobalEvent(
event_id="test-event",
event_types=["supply_disruption"],
severity="moderate",
confidence=confidence,
)
result = filter_low_confidence_events([event], confidence_threshold=threshold)
if confidence < threshold:
assert len(result) == 0, (
f"Event with confidence={confidence} should be excluded "
f"with threshold={threshold}"
)
else:
assert len(result) == 1, (
f"Event with confidence={confidence} should be included "
f"with threshold={threshold}"
)
# ---------------------------------------------------------------------------
# Property 18: Accelerated decay for stale short-term events
# ---------------------------------------------------------------------------
class TestProperty18AcceleratedDecayStaleShortTerm:
"""Feature: global-news-interpolation, Property 18: Accelerated decay for stale short-term events
For any GlobalEvent with estimated_duration='short_term' and age
exceeding 48 hours, the effective signal weight SHALL be strictly less
than the weight computed using standard recency decay for the same age.
Validates: Requirements 10.2
"""
@given(
age_hours=st.floats(min_value=48.01, max_value=720.0, allow_nan=False),
)
@settings(max_examples=100)
def test_accelerated_decay_strictly_less_than_standard(
self,
age_hours: float,
):
"""**Validates: Requirements 10.2**
For short_term events older than 48 hours, the effective weight
must be strictly less than standard recency decay.
"""
standard = compute_standard_recency_decay(age_hours)
accelerated = apply_accelerated_decay(age_hours, "short_term")
assert accelerated < standard, (
f"Accelerated decay ({accelerated}) should be strictly less than "
f"standard decay ({standard}) for age={age_hours}h"
)
@given(
age_hours=st.floats(min_value=48.01, max_value=720.0, allow_nan=False),
)
@settings(max_examples=100)
def test_accelerated_decay_positive(
self,
age_hours: float,
):
"""**Validates: Requirements 10.2**
Accelerated decay must still be positive (> 0).
"""
accelerated = apply_accelerated_decay(age_hours, "short_term")
assert accelerated > 0.0, (
f"Accelerated decay should be positive, got {accelerated} for age={age_hours}h"
)
@given(
age_hours=st.floats(min_value=0.0, max_value=48.0, allow_nan=False),
)
@settings(max_examples=100)
def test_no_acceleration_within_staleness_window(
self,
age_hours: float,
):
"""**Validates: Requirements 10.2**
Short-term events within 48 hours should get standard decay (no acceleration).
"""
standard = compute_standard_recency_decay(age_hours)
result = apply_accelerated_decay(age_hours, "short_term")
assert abs(result - standard) < 1e-12, (
f"Within staleness window, decay should equal standard: "
f"result={result}, standard={standard}, age={age_hours}h"
)
# ---------------------------------------------------------------------------
# Property 19: Macro-only recommendation suppression
# ---------------------------------------------------------------------------
class TestProperty19MacroOnlyRecommendationSuppression:
"""Feature: global-news-interpolation, Property 19: Macro-only recommendation suppression
For any trend summary where the trend direction is driven solely by
macro signals (no company-specific signals support the direction), the
resulting recommendation SHALL have mode='informational' and the thesis
SHALL contain a macro-only caveat.
Validates: Requirements 10.3
"""
@given(
macro_count=st.integers(min_value=1, max_value=20),
direction=st.sampled_from(_VALID_TREND_DIRECTIONS),
strength=st.floats(min_value=0.1, max_value=1.0, allow_nan=False),
)
@settings(max_examples=100)
def test_macro_only_triggers_suppression(
self,
macro_count: int,
direction: TrendDirection,
strength: float,
):
"""**Validates: Requirements 10.3**
When macro signals are the sole basis (company_signal_count=0),
evaluate_macro_only_suppression must return True.
"""
summary = TrendSummary(
entity_type="company",
entity_id="TEST",
trend_direction=direction,
trend_strength=strength,
confidence=0.5,
)
result = evaluate_macro_only_suppression(
summary,
macro_signal_count=macro_count,
company_signal_count=0,
)
assert result is True, (
f"Expected suppression for macro_count={macro_count}, "
f"company_count=0, direction={direction.value}"
)
@given(
macro_count=st.integers(min_value=1, max_value=20),
company_count=st.integers(min_value=1, max_value=20),
direction=st.sampled_from(_VALID_TREND_DIRECTIONS),
)
@settings(max_examples=100)
def test_mixed_signals_no_suppression(
self,
macro_count: int,
company_count: int,
direction: TrendDirection,
):
"""**Validates: Requirements 10.3**
When both macro and company signals are present,
evaluate_macro_only_suppression must return False.
"""
summary = TrendSummary(
entity_type="company",
entity_id="TEST",
trend_direction=direction,
trend_strength=0.5,
confidence=0.5,
)
result = evaluate_macro_only_suppression(
summary,
macro_signal_count=macro_count,
company_signal_count=company_count,
)
assert result is False, (
f"Expected no suppression for macro_count={macro_count}, "
f"company_count={company_count}"
)
@given(
company_count=st.integers(min_value=0, max_value=20),
)
@settings(max_examples=100)
def test_no_macro_signals_no_suppression(
self,
company_count: int,
):
"""**Validates: Requirements 10.3**
When there are no macro signals, suppression must not trigger.
"""
summary = TrendSummary(
entity_type="company",
entity_id="TEST",
trend_direction=TrendDirection.BULLISH,
trend_strength=0.5,
confidence=0.5,
)
result = evaluate_macro_only_suppression(
summary,
macro_signal_count=0,
company_signal_count=company_count,
)
assert result is False, (
"Expected no suppression when macro_count=0"
)
# ---------------------------------------------------------------------------
# Property 4: Macro data persistence round-trip
# ---------------------------------------------------------------------------
from services.shared.schemas import (
ExposureProfileSchema as ExposureProfileSchemaImport,
)
from services.shared.schemas import (
GlobalEventSchema,
TrendDirection,
TrendProjectionSchema,
)
def _global_event_schema_strategy() -> st.SearchStrategy[GlobalEventSchema]:
"""Generate random valid GlobalEventSchema instances for round-trip testing."""
return st.builds(
GlobalEventSchema,
event_id=st.uuids().map(str),
event_types=st.lists(
st.sampled_from(_VALID_IMPACT_TYPES).map(ImpactType),
min_size=1,
max_size=4,
),
severity=st.sampled_from(list(SeverityLevel)),
affected_regions=st.lists(
st.sampled_from(_REGION_CODES), min_size=0, max_size=5, unique=True,
),
affected_sectors=st.lists(
st.sampled_from(_VALID_SECTORS + ["Energy", "Technology"]),
min_size=0, max_size=4, unique=True,
),
affected_commodities=st.lists(
st.sampled_from(_COMMODITIES), min_size=0, max_size=3, unique=True,
),
summary=st.text(min_size=1, max_size=200),
key_facts=st.lists(st.text(min_size=1, max_size=80), min_size=0, max_size=5),
estimated_duration=st.sampled_from(list(EstimatedDuration)),
confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
source_document_id=st.uuids().map(str),
)
def _macro_impact_record_schema_strategy() -> st.SearchStrategy[MacroImpactRecordSchema]:
"""Generate random valid MacroImpactRecordSchema instances."""
return st.builds(
MacroImpactRecordSchema,
event_id=st.uuids().map(str),
company_id=st.uuids().map(str),
ticker=st.text(
alphabet=st.characters(whitelist_categories=("Lu",)),
min_size=1, max_size=5,
),
macro_impact_score=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
impact_direction=st.sampled_from(["positive", "negative", "mixed", "neutral"]),
contributing_factors=st.lists(st.text(min_size=1, max_size=50), min_size=0, max_size=5),
confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
)
def _exposure_profile_schema_roundtrip_strategy() -> st.SearchStrategy[ExposureProfileSchemaImport]:
"""Generate random valid ExposureProfileSchema instances for round-trip testing."""
return st.builds(
ExposureProfileSchemaImport,
company_id=st.uuids().map(str),
geographic_revenue_mix=_geo_revenue_mix(),
supply_chain_regions=st.lists(
st.sampled_from(_REGION_CODES), min_size=0, max_size=4, unique=True,
),
key_input_commodities=st.lists(
st.sampled_from(_COMMODITIES), min_size=0, max_size=3, unique=True,
),
regulatory_jurisdictions=st.lists(
st.sampled_from(_JURISDICTIONS), min_size=0, max_size=3, unique=True,
),
market_position_tier=st.sampled_from(list(MarketPositionTier)),
export_dependency_pct=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
source=st.sampled_from(["manual", "inferred"]),
confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
version=st.integers(min_value=1, max_value=100),
active=st.booleans(),
)
def _trend_projection_schema_strategy() -> st.SearchStrategy[TrendProjectionSchema]:
"""Generate random valid TrendProjectionSchema instances."""
return st.builds(
TrendProjectionSchema,
trend_window_id=st.uuids().map(str),
projected_direction=st.sampled_from(list(TrendDirection)),
projected_strength=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
projected_confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
projection_horizon=st.sampled_from(["1d", "7d", "30d"]),
driving_factors=st.lists(st.text(min_size=1, max_size=80), min_size=0, max_size=5),
macro_contribution_pct=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
diverges_from_current=st.booleans(),
)
class TestProperty4MacroDataPersistenceRoundTrip:
"""Feature: global-news-interpolation, Property 4: Macro data persistence round-trip
For any valid GlobalEvent, MacroImpactRecord, ExposureProfile, or
TrendProjection object, persisting it to PostgreSQL and reading it back
SHALL produce an equivalent object with all fields preserved.
Since we can't use a real DB in tests, we test the serialization/
deserialization round-trip of the Pydantic schemas (model_dump → model_validate).
Validates: Requirements 3.1, 7.1, 7.2, 12.5
"""
@given(event=_global_event_schema_strategy())
@settings(max_examples=100)
def test_global_event_schema_round_trip(self, event: GlobalEventSchema):
"""**Validates: Requirements 7.1**
Serializing and deserializing a GlobalEventSchema must preserve all fields.
"""
data = event.model_dump(mode="json")
restored = GlobalEventSchema.model_validate(data)
assert restored.event_id == event.event_id
assert restored.severity == event.severity
assert restored.affected_regions == event.affected_regions
assert restored.affected_sectors == event.affected_sectors
assert restored.affected_commodities == event.affected_commodities
assert restored.summary == event.summary
assert restored.key_facts == event.key_facts
assert restored.estimated_duration == event.estimated_duration
assert restored.confidence == event.confidence
assert restored.source_document_id == event.source_document_id
# event_types: compare values
assert [et.value if hasattr(et, "value") else et for et in restored.event_types] == \
[et.value if hasattr(et, "value") else et for et in event.event_types]
@given(record=_macro_impact_record_schema_strategy())
@settings(max_examples=100)
def test_macro_impact_record_schema_round_trip(self, record: MacroImpactRecordSchema):
"""**Validates: Requirements 7.2**
Serializing and deserializing a MacroImpactRecordSchema must preserve all fields.
"""
data = record.model_dump(mode="json")
restored = MacroImpactRecordSchema.model_validate(data)
assert restored.event_id == record.event_id
assert restored.company_id == record.company_id
assert restored.ticker == record.ticker
assert restored.macro_impact_score == record.macro_impact_score
assert restored.impact_direction == record.impact_direction
assert restored.contributing_factors == record.contributing_factors
assert restored.confidence == record.confidence
@given(profile=_exposure_profile_schema_roundtrip_strategy())
@settings(max_examples=100)
def test_exposure_profile_schema_round_trip(self, profile: ExposureProfileSchemaImport):
"""**Validates: Requirements 3.1**
Serializing and deserializing an ExposureProfileSchema must preserve all fields.
"""
data = profile.model_dump(mode="json")
restored = ExposureProfileSchemaImport.model_validate(data)
assert restored.company_id == profile.company_id
assert restored.geographic_revenue_mix == profile.geographic_revenue_mix
assert restored.supply_chain_regions == profile.supply_chain_regions
assert restored.key_input_commodities == profile.key_input_commodities
assert restored.regulatory_jurisdictions == profile.regulatory_jurisdictions
assert restored.market_position_tier == profile.market_position_tier
assert restored.export_dependency_pct == profile.export_dependency_pct
assert restored.source == profile.source
assert restored.confidence == profile.confidence
assert restored.version == profile.version
assert restored.active == profile.active
@given(projection=_trend_projection_schema_strategy())
@settings(max_examples=100)
def test_trend_projection_schema_round_trip(self, projection: TrendProjectionSchema):
"""**Validates: Requirements 12.5**
Serializing and deserializing a TrendProjectionSchema must preserve all fields.
"""
data = projection.model_dump(mode="json")
restored = TrendProjectionSchema.model_validate(data)
assert restored.trend_window_id == projection.trend_window_id
assert restored.projected_direction == projection.projected_direction
assert restored.projected_strength == projection.projected_strength
assert restored.projected_confidence == projection.projected_confidence
assert restored.projection_horizon == projection.projection_horizon
assert restored.driving_factors == projection.driving_factors
assert restored.macro_contribution_pct == projection.macro_contribution_pct
assert restored.diverges_from_current == projection.diverges_from_current
# ---------------------------------------------------------------------------
# Property 1: Content hash stability and uniqueness
# ---------------------------------------------------------------------------
from services.shared.content import content_hash, content_hash_str
class TestProperty1ContentHashStabilityAndUniqueness:
"""Feature: global-news-interpolation, Property 1: Content hash stability and uniqueness
For any macro news article content, computing the content hash twice on
identical content SHALL produce the same hash, and computing the hash on
distinct content SHALL produce different hashes.
Validates: Requirements 1.2
"""
@given(content=st.binary(min_size=1, max_size=10000))
@settings(max_examples=100)
def test_content_hash_stability_bytes(self, content: bytes):
"""**Validates: Requirements 1.2**
Computing content_hash twice on the same bytes must produce the same result.
"""
hash1 = content_hash(content)
hash2 = content_hash(content)
assert hash1 == hash2, (
f"Hash instability: {hash1} != {hash2} for same content"
)
@given(text=st.text(min_size=1, max_size=5000))
@settings(max_examples=100)
def test_content_hash_str_stability(self, text: str):
"""**Validates: Requirements 1.2**
Computing content_hash_str twice on the same string must produce the same result.
"""
hash1 = content_hash_str(text)
hash2 = content_hash_str(text)
assert hash1 == hash2, (
f"Hash instability: {hash1} != {hash2} for same text"
)
@given(
content_a=st.binary(min_size=1, max_size=5000),
content_b=st.binary(min_size=1, max_size=5000),
)
@settings(max_examples=100)
def test_content_hash_uniqueness_bytes(self, content_a: bytes, content_b: bytes):
"""**Validates: Requirements 1.2**
Computing content_hash on distinct content must produce different hashes.
"""
from hypothesis import assume
assume(content_a != content_b)
hash_a = content_hash(content_a)
hash_b = content_hash(content_b)
assert hash_a != hash_b, (
f"Hash collision: {hash_a} for distinct content "
f"({len(content_a)} bytes vs {len(content_b)} bytes)"
)
@given(
text_a=st.text(min_size=1, max_size=5000),
text_b=st.text(min_size=1, max_size=5000),
)
@settings(max_examples=100)
def test_content_hash_str_uniqueness(self, text_a: str, text_b: str):
"""**Validates: Requirements 1.2**
Computing content_hash_str on distinct strings must produce different hashes.
"""
from hypothesis import assume
assume(text_a != text_b)
hash_a = content_hash_str(text_a)
hash_b = content_hash_str(text_b)
assert hash_a != hash_b, (
f"Hash collision: {hash_a} for distinct text"
)