c85c0068a2
- Replace all datetime.utcnow() with datetime.now(tz=timezone.utc) across 8 files - Fix 12 failing tests to match current implementation behavior - Fix pytest_plugins in non-top-level conftest (moved to root conftest.py) - Auto-fix 189 lint issues (import sorting, unused imports) - Add CI/CD pipeline infrastructure (ARC, ArgoCD, Kargo manifests) - Add values-beta.yaml and values-paper.yaml for staged deployments - Update GitHub Actions workflow to use self-hosted-gremlin runners - Add integration-test job to CI pipeline Result: 1596 passed, 0 failed, 0 warnings
2642 lines
99 KiB
Python
2642 lines
99 KiB
Python
"""Property-based tests for the macro pipeline.
|
|
|
|
Feature: global-news-interpolation
|
|
|
|
Uses Hypothesis to validate correctness properties of the event classifier
|
|
and macro impact pipeline.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
from hypothesis import given, settings
|
|
from hypothesis import strategies as st
|
|
|
|
from services.extractor.event_classifier import (
|
|
GlobalEvent,
|
|
_parse_classification_response,
|
|
)
|
|
from services.shared.schemas import (
|
|
EstimatedDuration,
|
|
ImpactType,
|
|
MacroImpactRecordSchema,
|
|
ModelMetadata,
|
|
SeverityLevel,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hypothesis strategies for valid Ollama classification responses
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Raw enum values, materialised once so the strategies and assertions below
# can sample from (and compare against) plain lists of strings.
_VALID_IMPACT_TYPES = [member.value for member in ImpactType]
_VALID_SEVERITY_LEVELS = [member.value for member in SeverityLevel]
_VALID_DURATIONS = [member.value for member in EstimatedDuration]
|
|
|
|
|
|
def _ollama_classification_response() -> st.SearchStrategy[str]:
    """Build a strategy that yields JSON text shaped like a classification reply.

    Each generated document carries the keys ``event_types``, ``severity``,
    ``affected_regions``, ``affected_sectors``, ``affected_commodities``,
    ``summary``, ``key_facts``, ``estimated_duration`` and ``confidence``.
    The dictionary is serialised with ``json.dumps`` before it is returned.
    """
    impact_types = st.lists(
        st.sampled_from(_VALID_IMPACT_TYPES),
        min_size=1,
        max_size=len(_VALID_IMPACT_TYPES),
    )
    # Regions: short alphanumeric tokens.
    region_token = st.text(
        alphabet=st.characters(whitelist_categories=("Lu", "Ll", "Nd")),
        min_size=1,
        max_size=10,
    )
    # Sectors: alphanumeric text that may also contain space separators.
    sector_token = st.text(
        alphabet=st.characters(whitelist_categories=("Lu", "Ll", "Nd", "Zs")),
        min_size=1,
        max_size=30,
    )
    # Commodities: lowercase/digit tokens plus underscore.
    commodity_token = st.text(
        alphabet=st.characters(whitelist_categories=("Ll", "Nd"), whitelist_characters="_"),
        min_size=1,
        max_size=20,
    )
    # Summaries must contain at least one non-whitespace character.
    non_blank_summary = st.text(min_size=1, max_size=200).filter(lambda s: s.strip())

    payload = st.fixed_dictionaries({
        "event_types": impact_types,
        "severity": st.sampled_from(_VALID_SEVERITY_LEVELS),
        "affected_regions": st.lists(region_token, min_size=0, max_size=8),
        "affected_sectors": st.lists(sector_token, min_size=0, max_size=6),
        "affected_commodities": st.lists(commodity_token, min_size=0, max_size=5),
        "summary": non_blank_summary,
        "key_facts": st.lists(st.text(min_size=1, max_size=100), min_size=0, max_size=5),
        "estimated_duration": st.sampled_from(_VALID_DURATIONS),
        "confidence": st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
    })
    return payload.map(json.dumps)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 2: Macro pipeline output schema completeness
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty2MacroPipelineOutputSchemaCompleteness:
    """Feature: global-news-interpolation, Property 2: Macro pipeline output schema completeness

    Every valid Ollama classification response must parse into a GlobalEvent
    exposing event_id, event_types, severity, affected_regions,
    affected_sectors, summary, estimated_duration, confidence,
    source_document_id and model_metadata. Likewise, every valid macro impact
    computation must yield a MacroImpactRecord with all required fields.

    Validates: Requirements 2.2, 4.5
    """

    @given(raw_json=_ollama_classification_response())
    @settings(max_examples=100)
    def test_global_event_has_all_required_fields(self, raw_json: str):
        """**Validates: Requirements 2.2**

        The GlobalEvent parsed from ``raw_json`` exposes every required field
        with the expected type and with values drawn from the valid enums.
        """
        event = _parse_classification_response(raw_json, "doc-test-123", "test-model")

        # Required attribute -> type it must carry (checked in this order).
        required_types = (
            ("event_id", str),
            ("event_types", list),
            ("severity", str),
            ("affected_regions", list),
            ("affected_sectors", list),
            ("summary", str),
            ("estimated_duration", str),
            ("confidence", float),
            ("source_document_id", str),
            ("model_metadata", ModelMetadata),
        )
        for attr, expected_type in required_types:
            value = getattr(event, attr)
            assert value is not None and isinstance(value, expected_type)

        # At least one event type, and each one is a known ImpactType value.
        allowed_impacts = {e.value for e in ImpactType}
        assert len(event.event_types) >= 1
        for et in event.event_types:
            assert et in allowed_impacts

        # Severity and duration must be members of their enums.
        allowed_severities = {e.value for e in SeverityLevel}
        assert event.severity in allowed_severities

        # Confidence stays inside the unit interval.
        assert 0.0 <= event.confidence <= 1.0

        allowed_durations = {e.value for e in EstimatedDuration}
        assert event.estimated_duration in allowed_durations

        # The document id passed to the parser is carried through unchanged.
        assert event.source_document_id == "doc-test-123"

        # Model metadata records the provider and the model name passed in.
        metadata = event.model_metadata
        assert metadata.provider == "ollama"
        assert metadata.model_name == "test-model"

    @given(
        event_id=st.uuids().map(str),
        company_id=st.uuids().map(str),
        ticker=st.text(
            alphabet=st.characters(whitelist_categories=("Lu",)),
            min_size=1,
            max_size=5,
        ),
        score=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
        direction=st.sampled_from(["positive", "negative", "mixed", "neutral"]),
        factors=st.lists(st.text(min_size=1, max_size=50), min_size=0, max_size=5),
        confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
    )
    @settings(max_examples=100)
    def test_macro_impact_record_has_all_required_fields(
        self,
        event_id: str,
        company_id: str,
        ticker: str,
        score: float,
        direction: str,
        factors: list[str],
        confidence: float,
    ):
        """**Validates: Requirements 4.5**

        A MacroImpactRecordSchema built from generated inputs exposes every
        required field with the expected type and keeps the identifying values.
        """
        record = MacroImpactRecordSchema(
            event_id=event_id,
            company_id=company_id,
            ticker=ticker,
            macro_impact_score=score,
            impact_direction=direction,
            contributing_factors=factors,
            confidence=confidence,
        )

        # Fields that must be present (non-None) with the given type.
        non_null_fields = (
            ("event_id", str),
            ("company_id", str),
            ("ticker", str),
        )
        for attr, expected_type in non_null_fields:
            value = getattr(record, attr)
            assert value is not None and isinstance(value, expected_type)
        assert isinstance(record.macro_impact_score, float)
        assert record.impact_direction is not None and isinstance(record.impact_direction, str)
        assert record.contributing_factors is not None and isinstance(record.contributing_factors, list)
        assert isinstance(record.confidence, float)

        # Both numeric fields stay inside the unit interval.
        assert 0.0 <= record.macro_impact_score <= 1.0
        assert 0.0 <= record.confidence <= 1.0

        # Identifiers round-trip unchanged.
        assert record.event_id == event_id
        assert record.company_id == company_id
        assert record.ticker == ticker
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 3: Multiple impact types preserved
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty3MultipleImpactTypesPreserved:
    """Feature: global-news-interpolation, Property 3: Multiple impact types preserved

    When a classification lists N distinct impact types, the parsed
    GlobalEvent's event_types must still contain all N of them; they must not
    be collapsed into a single category.

    Validates: Requirements 2.4
    """

    @given(
        chosen_types=st.lists(
            st.sampled_from(_VALID_IMPACT_TYPES),
            min_size=1,
            max_size=len(_VALID_IMPACT_TYPES),
            unique=True,
        ),
    )
    @settings(max_examples=100)
    def test_all_impact_types_preserved_after_parsing(self, chosen_types: list[str]):
        """**Validates: Requirements 2.4**

        Feed the parser a response whose ``event_types`` is ``chosen_types``
        and check that every one of those values survives parsing.
        """
        # Only event_types varies; every other field is a fixed valid value.
        raw_json = json.dumps({
            "event_types": chosen_types,
            "severity": "moderate",
            "affected_regions": ["US"],
            "affected_sectors": ["Energy"],
            "affected_commodities": ["crude_oil"],
            "summary": "Test event for impact type preservation.",
            "key_facts": ["Fact one."],
            "estimated_duration": "short_term",
            "confidence": 0.8,
        })

        event = _parse_classification_response(raw_json, "doc-types-test", "test-model")

        # The output may not be shorter than the input ...
        expected_count = len(chosen_types)
        assert len(event.event_types) >= expected_count, (
            f"Expected at least {len(chosen_types)} types, got {len(event.event_types)}: "
            f"{event.event_types}"
        )
        # ... and each requested type must still be present.
        for t in chosen_types:
            assert t in event.event_types, (
                f"Impact type '{t}' was lost during parsing. "
                f"Input: {chosen_types}, Output: {event.event_types}"
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Imports for Property 6
|
|
# ---------------------------------------------------------------------------
|
|
import copy
|
|
import uuid as _uuid
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
|
|
from services.symbol_registry.exposure import ExposureProfileCreate
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hypothesis strategy for valid ExposureProfileCreate data
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Fixed value pools sampled by the exposure-profile and event strategies.
_REGION_CODES = [
    "US", "CN", "DE", "JP", "GB",
    "KR", "IN", "BR", "AU", "CA",
]
_COMMODITIES = [
    "crude_oil",
    "natural_gas",
    "copper",
    "lithium",
    "steel",
    "wheat",
    "corn",
]
_JURISDICTIONS = ["US", "EU", "CN", "JP", "UK", "AU"]
_TIERS = [
    "global_leader",
    "multinational",
    "regional",
    "domestic",
]
_SOURCES = ["manual", "inferred"]
|
|
|
|
|
|
def _geo_revenue_mix() -> st.SearchStrategy[dict[str, float]]:
    """Build a strategy for region -> revenue-share mappings.

    One to five distinct codes are drawn from ``_REGION_CODES``; each gets a
    weight in [0.01, 1.0]. Weights are divided by their total and rounded to
    four decimals, so the shares sum to approximately (not exactly) 1.0.
    """

    def _normalised(regions, weights):
        # Scale the raw weights so they add up to ~1.0 after rounding.
        total = sum(weights)
        return {
            region: round(weight / total, 4)
            for region, weight in zip(regions, weights)
        }

    def _shares_for(regions):
        # Exactly one weight per chosen region.
        count = len(regions)
        weights = st.lists(
            st.floats(min_value=0.01, max_value=1.0, allow_nan=False, allow_infinity=False),
            min_size=count,
            max_size=count,
        )
        return weights.map(lambda vals: _normalised(regions, vals))

    chosen_regions = st.lists(
        st.sampled_from(_REGION_CODES),
        min_size=1,
        max_size=5,
        unique=True,
    )
    return chosen_regions.flatmap(_shares_for)
|
|
|
|
|
|
def _exposure_profile_create_strategy() -> st.SearchStrategy[ExposureProfileCreate]:
    """Build a strategy producing ``ExposureProfileCreate`` instances.

    List-valued fields are unique subsets of the module-level value pools;
    ``export_dependency_pct`` and ``confidence`` are floats in [0, 1].
    """

    def _unique_subset(pool, limit):
        # Possibly-empty list of distinct values from ``pool``, at most ``limit`` long.
        return st.lists(st.sampled_from(pool), min_size=0, max_size=limit, unique=True)

    unit_interval = st.floats(min_value=0.0, max_value=1.0, allow_nan=False)

    return st.builds(
        ExposureProfileCreate,
        geographic_revenue_mix=_geo_revenue_mix(),
        supply_chain_regions=_unique_subset(_REGION_CODES, 4),
        key_input_commodities=_unique_subset(_COMMODITIES, 3),
        regulatory_jurisdictions=_unique_subset(_JURISDICTIONS, 3),
        market_position_tier=st.sampled_from(_TIERS),
        export_dependency_pct=unit_interval,
        source=st.sampled_from(_SOURCES),
        confidence=unit_interval,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Simulated version history logic (mirrors the DB-backed upsert in exposure.py)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    # Local import: the file-level import only brings ``datetime`` into scope.
    from datetime import timezone

    return datetime.now(tz=timezone.utc)


@dataclass
class _VersionedProfile:
    """One snapshot (archived or active) in a simulated exposure-profile history."""

    # Position of this snapshot in the history.
    version: int
    # True for the current snapshot, False once it has been archived.
    active: bool
    # The profile payload captured for this version.
    profile_data: ExposureProfileCreate
    # Creation timestamp. Timezone-aware UTC rather than a naive local time
    # (``datetime.now(tz=None)`` yields a naive value).
    created_at: datetime = field(default_factory=_utc_now)
|
|
|
|
|
|
def _simulate_version_history(
    updates: list[ExposureProfileCreate],
) -> list[_VersionedProfile]:
    """Replay ``updates`` through an in-memory model of profile versioning.

    For each update, every snapshot currently flagged active is archived
    (``active=False``) and a new snapshot holding a deep copy of the update
    is appended as the active one. Versions start at 1 and increase by one
    per update. The returned list is in ascending version order.
    """
    snapshots: list[_VersionedProfile] = []
    for version, payload in enumerate(updates, start=1):
        # Retire whatever is currently active before adding the new version.
        for snapshot in snapshots:
            if snapshot.active:
                snapshot.active = False
        # Deep-copy so later changes to the input cannot alter stored history.
        new_snapshot = _VersionedProfile(
            version=version,
            active=True,
            profile_data=copy.deepcopy(payload),
        )
        snapshots.append(new_snapshot)
    return snapshots
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 6: Exposure profile version history
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty6ExposureProfileVersionHistory:
    """Feature: global-news-interpolation, Property 6: Exposure profile version history

    After N updates to a company's ExposureProfile, the version history must
    hold exactly N records with monotonically increasing version numbers,
    each preserving the full profile state supplied by its update.

    Validates: Requirements 3.3
    """

    @given(
        updates=st.lists(
            _exposure_profile_create_strategy(),
            min_size=1,
            max_size=10,
        ),
    )
    @settings(max_examples=100)
    def test_version_history_count_and_monotonicity(
        self, updates: list[ExposureProfileCreate],
    ):
        """**Validates: Requirements 3.3**

        Run the simulated versioning over ``updates`` and check the record
        count, the version sequence (1..N), the per-version profile contents,
        and that only the newest record is active.
        """
        history = _simulate_version_history(updates)
        n = len(updates)

        # 1. One history record per update.
        assert len(history) == n, (
            f"Expected {n} history records, got {len(history)}"
        )

        # 2. Versions run 1, 2, ..., N with no gaps or reordering.
        versions = [entry.version for entry in history]
        assert versions == list(range(1, n + 1)), (
            f"Versions should be [1..{n}], got {versions}"
        )

        # 3. Each stored snapshot equals the update that produced it,
        #    compared attribute by attribute.
        compared_attrs = (
            "geographic_revenue_mix",
            "supply_chain_regions",
            "key_input_commodities",
            "regulatory_jurisdictions",
            "market_position_tier",
            "export_dependency_pct",
            "source",
            "confidence",
        )
        for entry, original in zip(history, updates):
            stored = entry.profile_data
            for attr in compared_attrs:
                assert getattr(stored, attr) == getattr(original, attr), (
                    f"Version {entry.version}: {attr} mismatch"
                )

        # 4. Every record but the last is archived; the last one is active.
        *archived, latest = history
        for entry in archived:
            assert entry.active is False, (
                f"Version {entry.version} should be archived (active=False)"
            )
        assert latest.active is True, (
            f"Latest version {history[-1].version} should be active"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Imports for Properties 5, 7, 8, 9, 10
|
|
# ---------------------------------------------------------------------------
|
|
|
|
from services.aggregation.interpolation import (
|
|
_CAP_TO_TIER,
|
|
_DEFAULT_GEO,
|
|
_SECTOR_DEFAULT_GEO,
|
|
apply_resilience_modifier,
|
|
build_default_profile,
|
|
compute_macro_impact,
|
|
)
|
|
from services.shared.schemas import ExposureProfileSchema, MarketPositionTier
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared Hypothesis strategies for interpolation tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Sector names and market-cap buckets known to the interpolation module,
# taken from the keys of its lookup tables.
_VALID_SECTORS = [*_SECTOR_DEFAULT_GEO.keys()]
_VALID_CAP_BUCKETS = [*_CAP_TO_TIER.keys()]
# Severity levels in the order the monotonicity test steps through them.
_SEVERITY_ORDER = ["low", "moderate", "high", "critical"]
|
|
|
|
|
|
def _global_event_strategy(
    *,
    min_regions: int = 0,
    max_regions: int = 5,
    min_sectors: int = 0,
    max_sectors: int = 4,
    min_commodities: int = 0,
    max_commodities: int = 4,
    severity: st.SearchStrategy[str] | None = None,
    event_types: st.SearchStrategy[list[str]] | None = None,
) -> st.SearchStrategy[GlobalEvent]:
    """Build a strategy producing ``GlobalEvent`` instances.

    The ``min_*``/``max_*`` arguments bound the sizes of the affected
    regions, sectors and commodities lists, each a unique subset of its
    module-level pool. ``severity`` and ``event_types`` replace the default
    strategies for those fields when supplied. ``key_facts`` is always empty.
    """

    def _bounded_subset(pool, lower, upper):
        # List of distinct values from ``pool`` with length in [lower, upper].
        return st.lists(st.sampled_from(pool), min_size=lower, max_size=upper, unique=True)

    event_types_strategy = event_types or st.lists(
        st.sampled_from(_VALID_IMPACT_TYPES),
        min_size=1,
        max_size=len(_VALID_IMPACT_TYPES),
    )
    severity_strategy = severity or st.sampled_from(_VALID_SEVERITY_LEVELS)

    return st.builds(
        GlobalEvent,
        event_id=st.uuids().map(str),
        event_types=event_types_strategy,
        severity=severity_strategy,
        affected_regions=_bounded_subset(_REGION_CODES, min_regions, max_regions),
        affected_sectors=_bounded_subset(_VALID_SECTORS, min_sectors, max_sectors),
        affected_commodities=_bounded_subset(_COMMODITIES, min_commodities, max_commodities),
        summary=st.text(min_size=1, max_size=100),
        key_facts=st.just([]),
        estimated_duration=st.sampled_from(_VALID_DURATIONS),
        confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
        source_document_id=st.uuids().map(str),
    )
|
|
|
|
|
|
def _exposure_profile_schema_strategy(
    *,
    min_regions: int = 0,
    max_regions: int = 5,
    min_commodities: int = 0,
    max_commodities: int = 4,
    tier: st.SearchStrategy | None = None,
) -> st.SearchStrategy[ExposureProfileSchema]:
    """Build a strategy producing ``ExposureProfileSchema`` instances.

    ``min_regions``/``max_regions`` bound ``supply_chain_regions`` and
    ``min_commodities``/``max_commodities`` bound ``key_input_commodities``.
    ``tier`` replaces the default strategy, which samples every
    ``MarketPositionTier`` member.
    """

    def _bounded_subset(pool, lower, upper):
        # List of distinct values from ``pool`` with length in [lower, upper].
        return st.lists(st.sampled_from(pool), min_size=lower, max_size=upper, unique=True)

    tier_strategy = tier or st.sampled_from(list(MarketPositionTier))
    unit_interval = st.floats(min_value=0.0, max_value=1.0, allow_nan=False)

    return st.builds(
        ExposureProfileSchema,
        company_id=st.uuids().map(str),
        geographic_revenue_mix=_geo_revenue_mix(),
        supply_chain_regions=_bounded_subset(_REGION_CODES, min_regions, max_regions),
        key_input_commodities=_bounded_subset(_COMMODITIES, min_commodities, max_commodities),
        regulatory_jurisdictions=_bounded_subset(_JURISDICTIONS, 0, 3),
        market_position_tier=tier_strategy,
        export_dependency_pct=unit_interval,
        source=st.sampled_from(["manual", "inferred"]),
        confidence=unit_interval,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 5: Default exposure profile derivation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty5DefaultExposureProfileDerivation:
    """Feature: global-news-interpolation, Property 5: Default exposure profile derivation

    A company with a valid sector, industry and market_cap_bucket but no
    manually configured ExposureProfile gets a default profile whose
    market_position_tier follows the market_cap_bucket mapping
    (large_cap -> global_leader, mid_cap -> multinational,
    small_cap -> regional, micro_cap -> domestic) and whose
    geographic_revenue_mix is non-empty and derived from the sector.

    Validates: Requirements 3.2
    """

    @given(
        sector=st.sampled_from(_VALID_SECTORS + ["UnknownSector"]),
        industry=st.text(min_size=1, max_size=30),
        market_cap_bucket=st.sampled_from(_VALID_CAP_BUCKETS),
    )
    @settings(max_examples=100)
    def test_default_profile_tier_and_geo_mix(
        self,
        sector: str,
        industry: str,
        market_cap_bucket: str,
    ):
        """**Validates: Requirements 3.2**

        Build a default profile and check its tier, its geographic mix and
        its source against the interpolation module's lookup tables.
        """
        profile = build_default_profile(sector, industry, market_cap_bucket)

        # 1. Tier follows the cap-bucket table. The profile may hold either
        #    the enum member or its raw value, so normalise before comparing.
        actual_tier = profile.market_position_tier
        if isinstance(actual_tier, MarketPositionTier):
            actual_tier = actual_tier.value
        expected_tier = _CAP_TO_TIER[market_cap_bucket]
        assert actual_tier == expected_tier, (
            f"For {market_cap_bucket}, expected tier={expected_tier}, got {actual_tier}"
        )

        # 2. The geographic mix must contain at least one region.
        geo_mix = profile.geographic_revenue_mix
        assert len(geo_mix) > 0, (
            f"Default profile for sector={sector} has empty geographic_revenue_mix"
        )

        # 3. Known sectors use their sector-specific regions; anything else
        #    falls back to the generic default regions.
        expected_geo = _SECTOR_DEFAULT_GEO.get(sector, _DEFAULT_GEO)
        assert set(geo_mix.keys()) == set(expected_geo.keys()), (
            f"Geo mix keys mismatch for sector={sector}"
        )

        # 4. Default profiles are marked as inferred.
        assert profile.source == "inferred"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 7: Macro impact score bounds and zero-overlap invariant
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty7MacroImpactScoreBoundsAndZeroOverlap:
    """Feature: global-news-interpolation, Property 7: Macro impact score bounds and zero-overlap invariant

    The Macro_Impact_Score computed for any GlobalEvent / ExposureProfile
    pair must lie in [0, 1]. When the event's affected regions, sectors and
    commodities share nothing with the profile's geographic_revenue_mix keys,
    supply_chain_regions and key_input_commodities, the score must be
    exactly 0.0.

    Validates: Requirements 4.1, 4.4
    """

    @given(
        event=_global_event_strategy(),
        profile=_exposure_profile_schema_strategy(),
    )
    @settings(max_examples=100)
    def test_score_in_bounds(self, event: GlobalEvent, profile: ExposureProfileSchema):
        """**Validates: Requirements 4.1**

        The computed score never leaves the closed interval [0, 1].
        """
        record = compute_macro_impact(event, profile)
        score_is_bounded = 0.0 <= record.macro_impact_score <= 1.0
        assert score_is_bounded, (
            f"Score {record.macro_impact_score} out of bounds [0, 1]"
        )

    @given(data=st.data())
    @settings(max_examples=100)
    def test_zero_overlap_produces_zero_score(self, data: st.DataObject):
        """**Validates: Requirements 4.4**

        An event whose regions, sectors and commodities cannot match anything
        in the profile must score exactly 0.0.
        """
        # Draw an event with empty region/commodity lists, then overwrite its
        # regions, commodities and sectors with placeholder values that do not
        # appear in any pool the profile strategy samples from.
        event = data.draw(_global_event_strategy(
            min_regions=0,
            max_regions=0,
            min_commodities=0,
            max_commodities=0,
        ))
        event.affected_regions = ["ZZ", "YY", "XX"]
        event.affected_commodities = ["unobtanium", "vibranium"]
        event.affected_sectors = ["NonexistentSector"]

        profile = data.draw(_exposure_profile_schema_strategy())

        record = compute_macro_impact(event, profile)
        assert record.macro_impact_score == 0.0, (
            f"Expected score 0.0 for zero-overlap, got {record.macro_impact_score}"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 8: Scoring monotonicity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty8ScoringMonotonicity:
    """Feature: global-news-interpolation, Property 8: Scoring monotonicity

    With all other inputs held constant, raising an event's severity
    (low -> moderate -> high -> critical) must never lower the
    Macro_Impact_Score, and neither must increasing the geographic overlap
    between the event and the profile.

    Validates: Requirements 4.2
    """

    @given(
        event=_global_event_strategy(min_regions=1, min_commodities=1),
        profile=_exposure_profile_schema_strategy(min_regions=1, min_commodities=1),
    )
    @settings(max_examples=100)
    def test_severity_monotonicity(
        self, event: GlobalEvent, profile: ExposureProfileSchema,
    ):
        """**Validates: Requirements 4.2**

        Score the same event at each severity level in ascending order and
        require a non-decreasing sequence (within a 1e-9 tolerance).
        """
        scores = []
        for level in _SEVERITY_ORDER:
            # Only the severity changes between evaluations.
            event.severity = level
            scores.append(compute_macro_impact(event, profile).macro_impact_score)

        for i, (previous, current) in enumerate(zip(scores, scores[1:]), start=1):
            assert current >= previous - 1e-9, (
                f"Severity monotonicity violated: "
                f"{_SEVERITY_ORDER[i-1]}={scores[i-1]:.6f} > "
                f"{_SEVERITY_ORDER[i]}={scores[i]:.6f}"
            )

    @given(
        event=_global_event_strategy(min_regions=1),
        profile=_exposure_profile_schema_strategy(),
    )
    @settings(max_examples=100)
    def test_geographic_overlap_monotonicity(
        self, event: GlobalEvent, profile: ExposureProfileSchema,
    ):
        """**Validates: Requirements 4.2**

        Score the event with no, partial and full geographic overlap and
        require a non-decreasing sequence. Every variant lists at least two
        regions, which the test relies on to keep the resilience modifier's
        is_international input the same across the three comparisons.
        """
        geo_keys = list(profile.geographic_revenue_mix.keys())
        if not geo_keys:
            return  # nothing to test with empty geo mix

        def _record_for(regions):
            # Re-score the same event with only its affected regions replaced.
            event.affected_regions = regions
            return compute_macro_impact(event, profile)

        # "ZZ" is a sentinel that never overlaps the profile; it is present in
        # all three variants so each one lists two or more regions.
        record_none = _record_for(["ZZ", "YY"])
        record_partial = _record_for([geo_keys[0], "ZZ"])
        record_full = _record_for(geo_keys + ["ZZ"])

        assert record_partial.macro_impact_score >= record_none.macro_impact_score - 1e-9, (
            f"Partial overlap ({record_partial.macro_impact_score}) < "
            f"no overlap ({record_none.macro_impact_score})"
        )
        assert record_full.macro_impact_score >= record_partial.macro_impact_score - 1e-9, (
            f"Full overlap ({record_full.macro_impact_score}) < "
            f"partial overlap ({record_partial.macro_impact_score})"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 9: Resilience modifier tier ordering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty9ResilienceModifierTierOrdering:
    """Feature: global-news-interpolation, Property 9: Resilience modifier tier ordering

    For a positive raw impact score and an international event, the score
    after the resilience modifier must satisfy
    global_leader <= multinational <= regional <= domestic.

    Validates: Requirements 4.3
    """

    @given(
        raw_score=st.floats(min_value=0.01, max_value=1.0, allow_nan=False),
    )
    @settings(max_examples=100)
    def test_tier_ordering_for_international_events(self, raw_score: float):
        """**Validates: Requirements 4.3**

        Apply the modifier once per tier, from global_leader to domestic, and
        require a non-decreasing sequence of scores (within 1e-9).
        """
        ordered_members = (
            MarketPositionTier.GLOBAL_LEADER,
            MarketPositionTier.MULTINATIONAL,
            MarketPositionTier.REGIONAL,
            MarketPositionTier.DOMESTIC,
        )
        tier_order = [member.value for member in ordered_members]

        scores = []
        for tier in tier_order:
            adjusted = apply_resilience_modifier(raw_score, tier, event_is_international=True)
            scores.append(adjusted)

        for i, (previous, current) in enumerate(zip(scores, scores[1:]), start=1):
            assert current >= previous - 1e-9, (
                f"Tier ordering violated: {tier_order[i-1]}={scores[i-1]:.6f} > "
                f"{tier_order[i]}={scores[i]:.6f} (raw={raw_score:.6f})"
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 10: Mixed direction for dual-effect events
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty10MixedDirectionDualEffectEvents:
    """Feature: global-news-interpolation, Property 10: Mixed direction for dual-effect events

    When the computation for a GlobalEvent / ExposureProfile pair finds both
    positive and negative contributing factors, impact_direction must be
    'mixed' and both groups of factors must be kept separately in
    contributing_factors.

    Validates: Requirements 4.6
    """

    @given(
        profile=_exposure_profile_schema_strategy(
            min_regions=1, min_commodities=1,
        ),
    )
    @settings(max_examples=100)
    def test_dual_effect_produces_mixed_direction(
        self, profile: ExposureProfileSchema,
    ):
        """**Validates: Requirements 4.6**

        Build an event carrying one positive and one negative impact type
        that overlaps the profile, then check the direction and the factors.
        """
        geo_keys = list(profile.geographic_revenue_mix.keys())
        if not geo_keys:
            return

        # One type of each polarity so both kinds of factor are expected.
        positive_type = "demand_shift"
        negative_type = "supply_disruption"

        # Overlap the profile: up to two of its revenue regions and, when it
        # has any, its first input commodity.
        overlapping_regions = geo_keys[:2] if len(geo_keys) >= 2 else geo_keys
        if profile.key_input_commodities:
            overlapping_commodities = profile.key_input_commodities[:1]
        else:
            overlapping_commodities = []

        event = GlobalEvent(
            event_id=str(_uuid.uuid4()),
            event_types=[positive_type, negative_type],
            severity="moderate",
            affected_regions=overlapping_regions,
            affected_sectors=[],
            affected_commodities=overlapping_commodities,
            summary="Dual-effect test event",
            key_facts=[],
            estimated_duration="short_term",
            confidence=0.8,
            source_document_id=str(_uuid.uuid4()),
        )

        record = compute_macro_impact(event, profile)

        # A zero score means nothing overlapped; direction is not checked then.
        if not record.macro_impact_score > 0.0:
            return

        assert record.impact_direction == "mixed", (
            f"Expected direction='mixed' for dual-effect event, "
            f"got '{record.impact_direction}'"
        )

        # Both factor groups must appear somewhere in contributing_factors.
        factors_str = " ".join(record.contributing_factors)
        assert "positive_types:" in factors_str, (
            f"Missing positive_types in contributing_factors: {record.contributing_factors}"
        )
        assert "negative_types:" in factors_str, (
            f"Missing negative_types in contributing_factors: {record.contributing_factors}"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Imports for Properties 11, 12, 13, 14
# ---------------------------------------------------------------------------
|
|
|
|
from datetime import timezone
|
|
|
|
from services.aggregation.scoring import SignalWeight, WeightedSignal
|
|
from services.aggregation.worker import (
|
|
ImpactRow,
|
|
assemble_trend_summary,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
# Shared strategies for aggregation-level property tests
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_signal_weight(
    combined: float,
    recency: float = 0.9,
    credibility: float = 0.8,
) -> SignalWeight:
    """Build a SignalWeight from the components the tests vary.

    Only ``combined``, ``recency`` and ``credibility`` are configurable.
    The novelty bonus is pinned to 0.0 and both the confidence gate and
    the market-context multiplier to 1.0.
    """
    pinned_components = {
        "novelty_bonus": 0.0,
        "confidence_gate": 1.0,
        "market_ctx_multiplier": 1.0,
    }
    return SignalWeight(
        recency=recency,
        credibility=credibility,
        combined=combined,
        **pinned_components,
    )
|
|
|
|
|
|
def _company_signal_strategy(
    *,
    sentiment: st.SearchStrategy[float] | None = None,
    min_impact: float = 0.1,
) -> st.SearchStrategy[WeightedSignal]:
    """Return a strategy producing company-specific WeightedSignal objects.

    Args:
        sentiment: Strategy for ``sentiment_value``; when omitted, values
            are sampled from ``[1.0, -1.0]``.
        min_impact: Lower bound for the generated ``impact_score``
            (upper bound is 1.0).
    """
    if sentiment is None:
        sentiment = st.sampled_from([1.0, -1.0])

    document_ids = st.uuids().map(lambda u: f"company-doc-{u}")
    weights = st.builds(
        _make_signal_weight,
        combined=st.floats(min_value=0.3, max_value=1.0, allow_nan=False),
        recency=st.floats(min_value=0.5, max_value=1.0, allow_nan=False),
        credibility=st.floats(min_value=0.5, max_value=1.0, allow_nan=False),
    )
    impact_scores = st.floats(min_value=min_impact, max_value=1.0, allow_nan=False)

    return st.builds(
        WeightedSignal,
        document_id=document_ids,
        weight=weights,
        sentiment_value=sentiment,
        impact_score=impact_scores,
    )
|
|
|
|
|
|
def _macro_signal_strategy(
    *,
    sentiment: st.SearchStrategy[float] | None = None,
    doc_id_prefix: str = "macro-doc",
    min_impact: float = 0.05,
) -> st.SearchStrategy[WeightedSignal]:
    """Return a strategy producing macro WeightedSignal objects.

    Args:
        sentiment: Strategy for ``sentiment_value``; when omitted, values
            are sampled from ``[1.0, -1.0]``.
        doc_id_prefix: Prefix placed before the random UUID in each
            generated ``document_id``.
        min_impact: Lower bound for the generated ``impact_score``
            (upper bound is 0.5).
    """
    if sentiment is None:
        sentiment = st.sampled_from([1.0, -1.0])

    document_ids = st.uuids().map(lambda u: f"{doc_id_prefix}-{u}")
    weights = st.builds(
        _make_signal_weight,
        combined=st.floats(min_value=0.2, max_value=1.0, allow_nan=False),
        recency=st.floats(min_value=0.4, max_value=1.0, allow_nan=False),
        credibility=st.floats(min_value=0.4, max_value=1.0, allow_nan=False),
    )
    impact_scores = st.floats(min_value=min_impact, max_value=0.5, allow_nan=False)

    return st.builds(
        WeightedSignal,
        document_id=document_ids,
        weight=weights,
        sentiment_value=sentiment,
        impact_score=impact_scores,
    )
|
|
|
|
|
|
def _impact_row_from_signal(sig: WeightedSignal) -> ImpactRow:
    """Create a stub ImpactRow that shares the signal's document_id.

    The signal's credibility is reused for both ``confidence`` and
    ``source_credibility``; the sentiment label is derived from the sign
    of ``sentiment_value`` and ``published_at`` is the current UTC time.
    """
    credibility = sig.weight.credibility
    if sig.sentiment_value > 0:
        sentiment_label = "positive"
    else:
        sentiment_label = "negative"

    return ImpactRow(
        document_id=sig.document_id,
        confidence=credibility,
        novelty_score=0.5,
        source_credibility=credibility,
        sentiment=sentiment_label,
        impact_score=sig.impact_score,
        catalyst_type="macro",
        key_facts=[],
        risks=[],
        published_at=datetime.now(tz=timezone.utc),
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Property 11: Macro signals influence trend output
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty11MacroSignalsInfluenceTrendOutput:
    """Feature: global-news-interpolation, Property 11: Macro signals influence trend output

    For a company that has company-specific signals as well as non-zero
    macro impact signals, the trend summary that includes the macro signals
    SHALL differ from the company-only summary in at least one of
    trend_strength, confidence, or the evidence references.

    Validates: Requirements 5.1
    """

    @given(
        company_signals=st.lists(
            _company_signal_strategy(sentiment=st.just(1.0)),
            min_size=1,
            max_size=5,
        ),
        macro_signals=st.lists(
            _macro_signal_strategy(sentiment=st.just(-1.0), min_impact=0.1),
            min_size=1,
            max_size=3,
        ),
    )
    @settings(max_examples=100)
    def test_macro_signals_change_trend_output(
        self,
        company_signals: list[WeightedSignal],
        macro_signals: list[WeightedSignal],
    ):
        """**Validates: Requirements 5.1**

        Summarise once with company signals only and once with macro
        signals appended; the two summaries must not agree on every
        compared field (contradiction_score is compared as well).
        """
        ref_time = datetime.now(tz=timezone.utc)

        # ImpactRow stubs exist for the company signals only.
        impacts = [_impact_row_from_signal(s) for s in company_signals]

        # Arguments common to both aggregation runs.
        shared_kwargs = {
            "ticker": "TEST",
            "window": "7d",
            "impacts": impacts,
            "reference_time": ref_time,
        }

        company_only = assemble_trend_summary(
            signals=list(company_signals), **shared_kwargs
        )
        combined = assemble_trend_summary(
            signals=[*company_signals, *macro_signals], **shared_kwargs
        )

        # A difference in any one of these fields satisfies the property.
        compared_fields = (
            "trend_strength",
            "confidence",
            "top_supporting_evidence",
            "top_opposing_evidence",
            "contradiction_score",
        )
        differs = any(
            getattr(company_only, name) != getattr(combined, name)
            for name in compared_fields
        )
        assert differs, (
            "Macro signals had no effect on trend output. "
            f"Company-only: strength={company_only.trend_strength}, "
            f"confidence={company_only.confidence}, "
            f"contradiction={company_only.contradiction_score}. "
            f"Combined: strength={combined.trend_strength}, "
            f"confidence={combined.confidence}, "
            f"contradiction={combined.contradiction_score}."
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Property 12: Macro-company contradiction detection
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty12MacroCompanyContradictionDetection:
    """Feature: global-news-interpolation, Property 12: Macro-company contradiction detection

    Whenever macro impact signals point in the opposite direction to the
    company-specific signals (negative vs. positive, or the reverse), the
    trend summary's contradiction_score SHALL be greater than zero and
    disagreement_details SHALL hold at least one entry.

    Validates: Requirements 5.3
    """

    @staticmethod
    def _assert_contradiction_reported(
        company_signals: list[WeightedSignal],
        macro_signals: list[WeightedSignal],
    ) -> None:
        """Aggregate both signal groups and require a reported contradiction.

        ImpactRow stubs are built for the company signals only; the macro
        signals are appended to the signal list passed to the aggregator.
        """
        ref_time = datetime.now(tz=timezone.utc)
        impacts = [_impact_row_from_signal(s) for s in company_signals]

        summary = assemble_trend_summary(
            ticker="TEST",
            window="7d",
            signals=[*company_signals, *macro_signals],
            impacts=impacts,
            reference_time=ref_time,
        )

        assert summary.contradiction_score > 0.0, (
            "Expected contradiction_score > 0 for opposing signals, "
            f"got {summary.contradiction_score}"
        )
        assert len(summary.disagreement_details) >= 1, (
            "Expected at least one disagreement_detail entry, "
            f"got {len(summary.disagreement_details)}"
        )

    @given(
        company_signals=st.lists(
            _company_signal_strategy(sentiment=st.just(1.0), min_impact=0.2),
            min_size=1,
            max_size=5,
        ),
        macro_signals=st.lists(
            _macro_signal_strategy(sentiment=st.just(-1.0), min_impact=0.1),
            min_size=1,
            max_size=3,
        ),
    )
    @settings(max_examples=100)
    def test_opposing_macro_company_signals_produce_contradiction(
        self,
        company_signals: list[WeightedSignal],
        macro_signals: list[WeightedSignal],
    ):
        """**Validates: Requirements 5.3**

        Positive company signals combined with negative macro signals
        must yield contradiction_score > 0 and non-empty
        disagreement_details.
        """
        self._assert_contradiction_reported(company_signals, macro_signals)

    @given(
        company_signals=st.lists(
            _company_signal_strategy(sentiment=st.just(-1.0), min_impact=0.2),
            min_size=1,
            max_size=5,
        ),
        macro_signals=st.lists(
            _macro_signal_strategy(sentiment=st.just(1.0), min_impact=0.1),
            min_size=1,
            max_size=3,
        ),
    )
    @settings(max_examples=100)
    def test_opposing_macro_positive_company_negative_contradiction(
        self,
        company_signals: list[WeightedSignal],
        macro_signals: list[WeightedSignal],
    ):
        """**Validates: Requirements 5.3**

        Negative company signals combined with positive macro signals
        must yield contradiction_score > 0 and non-empty
        disagreement_details.
        """
        self._assert_contradiction_reported(company_signals, macro_signals)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Property 13: Macro evidence traceability
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty13MacroEvidenceTraceability:
    """Feature: global-news-interpolation, Property 13: Macro evidence traceability

    Any trend summary that includes macro signal contributions SHALL list
    the source_document_id of at least one contributing GlobalEvent in
    top_supporting_evidence or top_opposing_evidence.

    Validates: Requirements 5.4
    """

    @given(
        company_signals=st.lists(
            _company_signal_strategy(sentiment=st.just(1.0)),
            min_size=1,
            max_size=3,
        ),
        macro_signals=st.lists(
            _macro_signal_strategy(sentiment=st.sampled_from([1.0, -1.0]), min_impact=0.1),
            min_size=1,
            max_size=3,
        ),
    )
    @settings(max_examples=100)
    def test_macro_document_ids_appear_in_evidence(
        self,
        company_signals: list[WeightedSignal],
        macro_signals: list[WeightedSignal],
    ):
        """**Validates: Requirements 5.4**

        After aggregating company and macro signals together, the union
        of the two evidence lists must contain at least one macro
        signal's document_id.
        """
        ref_time = datetime.now(tz=timezone.utc)
        impacts = [_impact_row_from_signal(s) for s in company_signals]

        summary = assemble_trend_summary(
            ticker="TEST",
            window="7d",
            signals=[*company_signals, *macro_signals],
            impacts=impacts,
            reference_time=ref_time,
        )

        macro_doc_ids = {s.document_id for s in macro_signals}
        evidence_ids = {
            *summary.top_supporting_evidence,
            *summary.top_opposing_evidence,
        }

        # Macro document ids that were surfaced as evidence.
        traced = macro_doc_ids & evidence_ids
        assert len(traced) >= 1, (
            "No macro document_id found in evidence lists. "
            f"Macro IDs: {macro_doc_ids}, "
            f"Supporting: {summary.top_supporting_evidence}, "
            f"Opposing: {summary.top_opposing_evidence}"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Property 14: No degradation without macro data and disabled-layer equivalence
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty14NoDegradationWithoutMacroData:
    """Feature: global-news-interpolation, Property 14: No degradation without macro data and disabled-layer equivalence

    If a company has no macro impact records in the aggregation window,
    the trend summary with the macro layer enabled SHALL equal the one
    with the macro layer disabled. In addition, any aggregation run with
    the macro layer disabled SHALL equal company-only aggregation, no
    matter what macro data exists.

    Validates: Requirements 5.5, 11.2
    """

    # Summary attributes that must match between the two compared runs.
    _COMPARED_FIELDS = (
        "trend_direction",
        "trend_strength",
        "confidence",
        "contradiction_score",
        "top_supporting_evidence",
        "top_opposing_evidence",
        "dominant_catalysts",
        "material_risks",
    )

    @staticmethod
    def _company_only_summary(company_signals, impacts, ref_time):
        """Aggregate a fresh copy of the company signals for ticker TEST / 7d."""
        return assemble_trend_summary(
            ticker="TEST",
            window="7d",
            signals=list(company_signals),
            impacts=impacts,
            reference_time=ref_time,
        )

    @classmethod
    def _assert_same_summary(cls, left, right) -> None:
        """Require equality on every field in ``_COMPARED_FIELDS``."""
        for field_name in cls._COMPARED_FIELDS:
            assert getattr(left, field_name) == getattr(right, field_name)

    @given(
        company_signals=st.lists(
            _company_signal_strategy(),
            min_size=1,
            max_size=5,
        ),
    )
    @settings(max_examples=100)
    def test_no_macro_data_produces_identical_output(
        self,
        company_signals: list[WeightedSignal],
    ):
        """**Validates: Requirements 5.5**

        With no macro signals available, the "enabled" and "disabled"
        paths both receive the same company-only signals, so the two
        summaries must be identical.
        """
        ref_time = datetime.now(tz=timezone.utc)
        impacts = [_impact_row_from_signal(s) for s in company_signals]

        # "Macro enabled" path, but there are no macro signals to add.
        summary_enabled = self._company_only_summary(company_signals, impacts, ref_time)
        # "Macro disabled" path, fed the same company-only signals.
        summary_disabled = self._company_only_summary(company_signals, impacts, ref_time)

        self._assert_same_summary(summary_enabled, summary_disabled)

    @given(
        company_signals=st.lists(
            _company_signal_strategy(),
            min_size=1,
            max_size=5,
        ),
        macro_signals=st.lists(
            _macro_signal_strategy(min_impact=0.1),
            min_size=1,
            max_size=3,
        ),
    )
    @settings(max_examples=100)
    def test_disabled_layer_ignores_macro_signals(
        self,
        company_signals: list[WeightedSignal],
        macro_signals: list[WeightedSignal],
    ):
        """**Validates: Requirements 11.2**

        A disabled macro layer must give the same output as company-only
        aggregation even when macro data exists. "Disabled" is simulated
        by withholding the macro signals from assemble_trend_summary.
        """
        # NOTE(review): macro_signals is drawn but never passed anywhere, so
        # both runs below receive identical inputs and this test only checks
        # that aggregation is repeatable — confirm whether that is intended.
        ref_time = datetime.now(tz=timezone.utc)
        impacts = [_impact_row_from_signal(s) for s in company_signals]

        # Macro layer "disabled": macro signals are left out.
        summary_disabled = self._company_only_summary(company_signals, impacts, ref_time)
        # Company-only baseline with no macro signals at all.
        summary_baseline = self._company_only_summary(company_signals, impacts, ref_time)

        self._assert_same_summary(summary_disabled, summary_baseline)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Imports for Property 15
# ---------------------------------------------------------------------------
|
|
|
|
from services.aggregation.rollups import (
|
|
SECTOR_CONCENTRATION_THRESHOLD,
|
|
CompanyTrendRow,
|
|
SectorMacroImpact,
|
|
compute_sector_macro_concentration,
|
|
rollup_trends,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
# Hypothesis strategies for rollup property tests
# ---------------------------------------------------------------------------
|
|
|
|
# Sector names the rollup strategies and tests sample from.
_ROLLUP_SECTORS = [
    "Technology",
    "Energy",
    "Healthcare",
    "Financials",
    "Industrials",
]
|
|
|
|
|
|
def _company_trend_row_strategy(
    *,
    sector: str | None = None,
    direction: str | None = None,
) -> st.SearchStrategy[CompanyTrendRow]:
    """Return a strategy producing CompanyTrendRow objects for rollup tests.

    Args:
        sector: Fixed sector for every row; when falsy, the sector is
            sampled from ``_ROLLUP_SECTORS``.
        direction: Fixed trend direction for every row; when falsy, one
            of bullish/bearish/neutral/mixed is sampled.

    The window is always "7d" and the catalyst, risk and evidence lists
    are always empty.
    """
    if sector:
        sector_st = st.just(sector)
    else:
        sector_st = st.sampled_from(_ROLLUP_SECTORS)

    if direction:
        direction_st = st.just(direction)
    else:
        direction_st = st.sampled_from(["bullish", "bearish", "neutral", "mixed"])

    no_items = st.just([])
    return st.builds(
        CompanyTrendRow,
        entity_id=st.from_regex(r"[A-Z]{2,5}", fullmatch=True),
        sector=sector_st,
        window=st.just("7d"),
        trend_direction=direction_st,
        trend_strength=st.floats(min_value=0.1, max_value=0.8, allow_nan=False),
        confidence=st.floats(min_value=0.3, max_value=0.8, allow_nan=False),
        contradiction_score=st.floats(min_value=0.0, max_value=0.5, allow_nan=False),
        dominant_catalysts=no_items,
        material_risks=no_items,
        top_supporting_evidence=no_items,
        top_opposing_evidence=no_items,
    )
|
|
|
|
|
|
def _sector_macro_impact_strategy(
    *,
    sector: str | None = None,
    min_total: float = 0.1,
    max_total: float = 5.0,
) -> st.SearchStrategy[SectorMacroImpact]:
    """Return a strategy producing SectorMacroImpact objects.

    Args:
        sector: Fixed sector name; when falsy, the sector is sampled from
            ``_ROLLUP_SECTORS``.
        min_total: Lower bound for the generated ``total_impact``.
        max_total: Upper bound for the generated ``total_impact``.
    """
    if sector:
        sector_st = st.just(sector)
    else:
        sector_st = st.sampled_from(_ROLLUP_SECTORS)

    total_st = st.floats(min_value=min_total, max_value=max_total, allow_nan=False)
    event_ids_st = st.lists(st.uuids().map(str), min_size=1, max_size=3)

    return st.builds(
        SectorMacroImpact,
        sector=sector_st,
        total_impact=total_st,
        avg_impact=st.floats(min_value=0.05, max_value=1.0, allow_nan=False),
        company_count=st.integers(min_value=1, max_value=20),
        net_direction=st.floats(min_value=-1.0, max_value=1.0, allow_nan=False),
        event_ids=event_ids_st,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Property 15: Sector and market rollup macro incorporation
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty15SectorAndMarketRollupMacroIncorporation:
    """Feature: global-news-interpolation, Property 15: Sector and market rollup macro incorporation

    A sector that contains companies with non-zero macro impact scores
    SHALL show those macro signals in its sector-level rollup's
    trend_strength or confidence. In addition, when a GlobalEvent hits one
    sector disproportionately (>60% of total macro impact concentrated in
    that sector), the sector SHALL appear in the market-level rollup's
    material_risks or dominant_catalysts.

    Validates: Requirements 6.1, 6.2, 6.3
    """

    @given(
        trends=st.lists(
            _company_trend_row_strategy(sector="Technology"),
            min_size=1,
            max_size=5,
        ),
        macro_impact=_sector_macro_impact_strategy(sector="Technology", min_total=0.5),
    )
    @settings(max_examples=100)
    def test_sector_rollup_reflects_macro_signals(
        self,
        trends: list[CompanyTrendRow],
        macro_impact: SectorMacroImpact,
    ):
        """**Validates: Requirements 6.1**

        Rolling up the same sector trends with and without macro data
        must give different trend_strength or confidence.
        """
        ref_time = datetime.now(tz=timezone.utc)
        sector = "Technology"

        # Everything except macro_impacts is the same for both rollups.
        rollup_kwargs = {
            "trends": trends,
            "entity_type": "sector",
            "entity_id": sector,
            "window": "7d",
            "reference_time": ref_time,
        }

        summary_without = rollup_trends(macro_impacts=None, **rollup_kwargs)
        summary_with = rollup_trends(
            macro_impacts={sector: macro_impact}, **rollup_kwargs
        )

        strength_changed = summary_with.trend_strength != summary_without.trend_strength
        differs = strength_changed or summary_with.confidence != summary_without.confidence
        assert differs, (
            "Sector rollup with macro data is identical to without. "
            f"Without: strength={summary_without.trend_strength}, "
            f"confidence={summary_without.confidence}. "
            f"With: strength={summary_with.trend_strength}, "
            f"confidence={summary_with.confidence}."
        )

    @given(data=st.data())
    @settings(max_examples=100)
    def test_concentrated_sector_appears_in_market_rollup(self, data: st.DataObject):
        """**Validates: Requirements 6.2, 6.3**

        If a single sector carries >60% of the total macro impact, the
        market-level rollup must name that sector in material_risks or
        dominant_catalysts.
        """
        ref_time = datetime.now(tz=timezone.utc)

        # One dominant sector plus two of the remaining sectors.
        dominant_sector = data.draw(st.sampled_from(_ROLLUP_SECTORS))
        minor_sectors = [s for s in _ROLLUP_SECTORS if s != dominant_sector][:2]

        # Every participating sector gets at least one company trend.
        dominant_trends = data.draw(st.lists(
            _company_trend_row_strategy(sector=dominant_sector),
            min_size=1,
            max_size=3,
        ))
        all_trends: list[CompanyTrendRow] = list(dominant_trends)
        for minor in minor_sectors:
            all_trends.extend(data.draw(st.lists(
                _company_trend_row_strategy(sector=minor),
                min_size=1,
                max_size=2,
            )))

        # The dominant sector gets a large total. Each minor sector is capped
        # at 15% of it, so the two together stay at or below 30% of it.
        dominant_total = data.draw(
            st.floats(min_value=3.0, max_value=10.0, allow_nan=False)
        )
        minor_cap = max(dominant_total * 0.15, 0.02)
        dominant_direction = data.draw(
            st.floats(min_value=-1.0, max_value=1.0, allow_nan=False)
        )

        macro_impacts: dict[str, SectorMacroImpact] = {
            dominant_sector: SectorMacroImpact(
                sector=dominant_sector,
                total_impact=dominant_total,
                avg_impact=dominant_total / max(len(dominant_trends), 1),
                company_count=len(dominant_trends),
                net_direction=dominant_direction,
                event_ids=["evt-1"],
            ),
        }
        for minor in minor_sectors:
            minor_total = data.draw(
                st.floats(min_value=0.01, max_value=minor_cap, allow_nan=False)
            )
            macro_impacts[minor] = SectorMacroImpact(
                sector=minor,
                total_impact=minor_total,
                avg_impact=minor_total,
                company_count=1,
                net_direction=0.0,
                event_ids=["evt-2"],
            )

        # Look up the dominant sector's share as computed by the rollup code.
        concentration = compute_sector_macro_concentration(macro_impacts)
        dominant_fraction = next(
            (share for name, share in concentration if name == dominant_sector),
            0.0,
        )
        # Not expected with the caps above; skip rather than fail if the
        # generated data does not exceed the threshold.
        if dominant_fraction <= SECTOR_CONCENTRATION_THRESHOLD:
            return

        summary = rollup_trends(
            trends=all_trends,
            entity_type="market",
            entity_id="all",
            window="7d",
            reference_time=ref_time,
            macro_impacts=macro_impacts,
        )

        # The sector name must occur inside some risk or catalyst label.
        all_labels = summary.material_risks + summary.dominant_catalysts
        found = False
        for label in all_labels:
            if dominant_sector in label:
                found = True
                break
        assert found, (
            f"Dominant sector '{dominant_sector}' (fraction={dominant_fraction:.2%}) "
            "not found in material_risks or dominant_catalysts. "
            f"material_risks={summary.material_risks}, "
            f"dominant_catalysts={summary.dominant_catalysts}"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Imports for Properties 20, 21, 22, 23
# ---------------------------------------------------------------------------
|
|
|
|
from services.aggregation.projection import (
|
|
DEFAULT_CONFIDENCE_THRESHOLD,
|
|
MacroEventInfo,
|
|
TrendProjection,
|
|
compute_projection,
|
|
)
|
|
from services.shared.schemas import TrendDirection, TrendSummary, TrendWindow
|
|
|
|
# ---------------------------------------------------------------------------
# Hypothesis strategies for projection property tests
# ---------------------------------------------------------------------------
|
|
|
|
# Value pools sampled by the projection strategies below.
_VALID_TREND_DIRECTIONS = list(TrendDirection)  # every enum member
_VALID_TREND_WINDOWS = list(TrendWindow)  # every enum member
_VALID_ESTIMATED_DURATIONS = [
    "short_term",
    "medium_term",
    "long_term",
]
_VALID_MACRO_DIRECTIONS = [
    "positive",
    "negative",
    "mixed",
    "neutral",
]
_VALID_SEVERITIES_PROJ = [
    "low",
    "moderate",
    "high",
    "critical",
]
|
|
|
|
|
|
def _trend_summary_strategy(
    *,
    direction: st.SearchStrategy[TrendDirection] | None = None,
    min_strength: float = 0.1,
    max_strength: float = 1.0,
    min_confidence: float = 0.1,
    max_confidence: float = 1.0,
) -> st.SearchStrategy[TrendSummary]:
    """Generate random valid TrendSummary instances for projection tests.

    Args:
        direction: Strategy for ``trend_direction``; when ``None``, the
            direction is sampled from every ``TrendDirection`` member.
        min_strength: Lower bound for the generated ``trend_strength``.
        max_strength: Upper bound for the generated ``trend_strength``.
        min_confidence: Lower bound for the generated ``confidence``.
        max_confidence: Upper bound for the generated ``confidence``.

    Every summary is a "company" entity with empty evidence, catalyst,
    risk and disagreement lists and a contradiction_score of 0.0.
    """
    # Use an explicit None check, as the sibling strategy helpers do,
    # instead of `direction or ...`, which evaluates the truthiness of a
    # strategy object.
    # NOTE(review): recent Hypothesis releases appear to warn on
    # bool(strategy) — confirm against the installed version.
    if direction is None:
        direction = st.sampled_from(_VALID_TREND_DIRECTIONS)

    return st.builds(
        TrendSummary,
        entity_type=st.just("company"),
        entity_id=st.from_regex(r"[A-Z]{2,5}", fullmatch=True),
        window=st.sampled_from(_VALID_TREND_WINDOWS),
        trend_direction=direction,
        trend_strength=st.floats(
            min_value=min_strength, max_value=max_strength, allow_nan=False,
        ),
        confidence=st.floats(
            min_value=min_confidence, max_value=max_confidence, allow_nan=False,
        ),
        top_supporting_evidence=st.just([]),
        top_opposing_evidence=st.just([]),
        dominant_catalysts=st.just([]),
        material_risks=st.just([]),
        contradiction_score=st.just(0.0),
        disagreement_details=st.just([]),
    )
|
|
|
|
|
|
def _macro_event_info_strategy(
    *,
    min_score: float = 0.1,
    max_score: float = 1.0,
) -> st.SearchStrategy[MacroEventInfo]:
    """Return a strategy producing MacroEventInfo objects for projection tests.

    Args:
        min_score: Lower bound for the generated ``macro_impact_score``.
        max_score: Upper bound for the generated ``macro_impact_score``.

    Direction, duration and severity are sampled from the module-level
    value pools; event age ranges from 0 to 720 hours.
    """
    score_st = st.floats(min_value=min_score, max_value=max_score, allow_nan=False)
    confidence_st = st.floats(min_value=0.1, max_value=1.0, allow_nan=False)
    age_hours_st = st.floats(min_value=0.0, max_value=720.0, allow_nan=False)

    return st.builds(
        MacroEventInfo,
        event_id=st.uuids().map(str),
        macro_impact_score=score_st,
        impact_direction=st.sampled_from(_VALID_MACRO_DIRECTIONS),
        confidence=confidence_st,
        estimated_duration=st.sampled_from(_VALID_ESTIMATED_DURATIONS),
        severity=st.sampled_from(_VALID_SEVERITIES_PROJ),
        event_age_hours=age_hours_st,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Property 20: Trend projection always produced
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty20TrendProjectionAlwaysProduced:
    """Feature: global-news-interpolation, Property 20: Trend projection always produced

    Every trend summary produced by the Aggregation_Engine SHALL be
    accompanied by a TrendProjection that has a valid projected_direction,
    projected_strength in [0, 1], projected_confidence in [0, 1], and a
    non-empty driving_factors list.

    Validates: Requirements 12.1
    """

    @given(
        summary=_trend_summary_strategy(),
        macro_events=st.lists(
            _macro_event_info_strategy(),
            min_size=0,
            max_size=5,
        ),
        macro_enabled=st.booleans(),
    )
    @settings(max_examples=100)
    def test_projection_always_produced_with_valid_fields(
        self,
        summary: TrendSummary,
        macro_events: list[MacroEventInfo],
        macro_enabled: bool,
    ):
        """**Validates: Requirements 12.1**

        Whatever the summary, macro events and enabled flag, the result
        of compute_projection must be a TrendProjection whose direction,
        strength, confidence and driving_factors are all valid.
        """
        # An empty macro event list is passed to the function as None.
        projection = compute_projection(
            summary=summary,
            macro_events=macro_events or None,
            macro_enabled=macro_enabled,
        )

        assert isinstance(projection, TrendProjection)

        allowed_directions = {"bullish", "bearish", "mixed", "neutral"}
        assert projection.projected_direction in allowed_directions, (
            f"Invalid projected_direction: {projection.projected_direction}"
        )

        # Strength and confidence must both lie in the closed unit interval.
        assert 0.0 <= projection.projected_strength <= 1.0, (
            f"projected_strength {projection.projected_strength} out of bounds [0, 1]"
        )
        assert 0.0 <= projection.projected_confidence <= 1.0, (
            f"projected_confidence {projection.projected_confidence} out of bounds [0, 1]"
        )

        factor_count = len(projection.driving_factors)
        assert factor_count >= 1, (
            "driving_factors is empty; must contain at least one entry"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Property 21: Projection divergence flagging
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty21ProjectionDivergenceFlagging:
    """Feature: global-news-interpolation, Property 21: Projection divergence flagging

    Whenever a TrendProjection's projected_direction is not the same as
    the current trend summary's trend_direction, diverges_from_current
    SHALL be True and driving_factors SHALL include at least one entry
    that explains the divergence.

    Validates: Requirements 12.3
    """

    @given(
        summary=_trend_summary_strategy(),
        macro_events=st.lists(
            _macro_event_info_strategy(),
            min_size=0,
            max_size=5,
        ),
        macro_enabled=st.booleans(),
    )
    @settings(max_examples=100)
    def test_divergence_flagged_when_directions_differ(
        self,
        summary: TrendSummary,
        macro_events: list[MacroEventInfo],
        macro_enabled: bool,
    ):
        """**Validates: Requirements 12.3**

        If the projected direction differs from the current one, the
        divergence flag must be True and some driving factor must mention
        the divergence; if they match, the flag must be False.
        """
        # An empty macro event list is passed to the function as None.
        projection = compute_projection(
            summary=summary,
            macro_events=macro_events or None,
            macro_enabled=macro_enabled,
        )

        current_dir = summary.trend_direction.value
        projected_dir = projection.projected_direction

        if projected_dir == current_dir:
            # Matching directions must not be flagged as a divergence.
            assert projection.diverges_from_current is False, (
                "diverges_from_current should be False when "
                f"projected={projected_dir} == "
                f"current={current_dir}"
            )
            return

        assert projection.diverges_from_current is True, (
            "diverges_from_current should be True when "
            f"projected={projected_dir} != "
            f"current={current_dir}"
        )

        # A case-insensitive "diverge" match also covers "DIVERGENCE".
        divergence_mentioned = any(
            "diverge" in factor.lower()
            for factor in projection.driving_factors
        )
        assert divergence_mentioned, (
            "No divergence explanation in driving_factors when "
            f"projected={projected_dir} != "
            f"current={current_dir}. "
            f"driving_factors={projection.driving_factors}"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 22: Macro-disabled projections have reduced confidence
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty22MacroDisabledProjectionsReducedConfidence:
    """Feature: global-news-interpolation, Property 22: Macro-disabled projections have reduced confidence

    Given the same company signals and the same macro signals, a
    TrendProjection computed with the macro layer switched off SHALL NOT have
    a higher projected_confidence than the one computed with the macro layer
    switched on.

    Validates: Requirements 12.4
    """

    @given(
        summary=_trend_summary_strategy(min_confidence=0.2),
        macro_events=st.lists(
            _macro_event_info_strategy(min_score=0.1),
            min_size=1,
            max_size=5,
        ),
    )
    @settings(max_examples=100)
    def test_disabled_macro_has_lower_or_equal_confidence(
        self,
        summary: TrendSummary,
        macro_events: list[MacroEventInfo],
    ):
        """**Validates: Requirements 12.4**

        Runs compute_projection twice on identical inputs, once per
        macro_enabled setting, and compares the resulting confidences.
        """
        # Enabled first, then disabled; only the macro_enabled flag differs.
        with_macro, without_macro = (
            compute_projection(
                summary=summary,
                macro_events=macro_events,
                macro_enabled=flag,
            )
            for flag in (True, False)
        )

        # The 1e-9 slack absorbs floating-point noise in the comparison.
        upper_bound = with_macro.projected_confidence + 1e-9
        assert without_macro.projected_confidence <= upper_bound, (
            f"Disabled confidence ({without_macro.projected_confidence}) > "
            f"enabled confidence ({with_macro.projected_confidence}) "
            f"for summary confidence={summary.confidence}"
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 23: Low-confidence projection exclusion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty23LowConfidenceProjectionExclusion:
    """Feature: global-news-interpolation, Property 23: Low-confidence projection exclusion

    A TrendProjection whose projected_confidence falls below the configurable
    threshold (default 0.3) SHALL be marked low_confidence and SHALL NOT
    influence recommendation eligibility.

    The test in this class exercises the low_confidence flag only.

    Validates: Requirements 12.9
    """

    @given(
        summary=_trend_summary_strategy(),
        macro_events=st.lists(_macro_event_info_strategy(), min_size=0, max_size=5),
        macro_enabled=st.booleans(),
        threshold=st.floats(min_value=0.1, max_value=0.9, allow_nan=False),
    )
    @settings(max_examples=100)
    def test_low_confidence_projection_marked_correctly(
        self,
        summary: TrendSummary,
        macro_events: list[MacroEventInfo],
        macro_enabled: bool,
        threshold: float,
    ):
        """**Validates: Requirements 12.9**

        low_confidence must be True exactly when projected_confidence is
        strictly below the threshold passed to compute_projection, and
        False otherwise.
        """
        # An empty event list is handed to compute_projection as None.
        events_arg = macro_events if macro_events else None
        projection = compute_projection(
            summary=summary,
            macro_events=events_arg,
            macro_enabled=macro_enabled,
            confidence_threshold=threshold,
        )

        below_threshold = projection.projected_confidence < threshold
        if not below_threshold:
            assert projection.low_confidence is False, (
                f"low_confidence should be False when "
                f"projected_confidence={projection.projected_confidence} >= "
                f"threshold={threshold}"
            )
        else:
            assert projection.low_confidence is True, (
                f"low_confidence should be True when "
                f"projected_confidence={projection.projected_confidence} < "
                f"threshold={threshold}"
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Imports for Properties 16, 17, 18, 19
|
|
# ---------------------------------------------------------------------------
|
|
|
|
from services.aggregation.interpolation import (
|
|
DEFAULT_CONFIDENCE_THRESHOLD,
|
|
apply_accelerated_decay,
|
|
compute_standard_recency_decay,
|
|
filter_low_confidence_events,
|
|
)
|
|
from services.extractor.exposure_inference import infer_exposure_profile
|
|
from services.recommendation.suppression import (
|
|
evaluate_macro_only_suppression,
|
|
)
|
|
from services.shared.schemas import (
|
|
CatalystType,
|
|
CompanyImpact,
|
|
DocumentIntelligence,
|
|
DocumentType,
|
|
)
|
|
from services.shared.schemas import (
|
|
Sentiment as SentimentEnum,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hypothesis strategies for exposure inference tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Document-type values sampled for generated filings.
_FILING_TYPES = [
    "filing",
    "transcript",
]

# Sector, industry and cap-bucket labels sampled as inference inputs.
_INFERENCE_SECTORS = [
    "Information Technology",
    "Energy",
    "Materials",
    "Industrials",
    "Health Care",
    "Financials",
    "Consumer Discretionary",
]
_INFERENCE_INDUSTRIES = [
    "Software",
    "Oil & Gas",
    "Banking",
    "Machinery",
    "Pharma",
]
_INFERENCE_CAP_BUCKETS = [
    "large_cap",
    "mid_cap",
    "small_cap",
    "micro_cap",
]

# Geographic names spliced into generated filing text.
_GEO_FRAGMENTS = [
    "United States",
    "China",
    "Japan",
    "Germany",
    "United Kingdom",
    "India",
    "Brazil",
    "South Korea",
    "Canada",
    "Australia",
    "Europe",
    "Asia Pacific",
    "Latin America",
]

# Commodity names spliced into generated filing text.
_COMMODITY_FRAGMENTS = [
    "crude oil",
    "natural gas",
    "copper",
    "steel",
    "lithium",
    "semiconductor",
    "wheat",
    "corn",
    "gold",
    "aluminum",
]
|
|
|
|
|
|
def _filing_intelligence_strategy(
    *,
    min_geo_fragments: int = 1,
    min_commodity_fragments: int = 0,
) -> st.SearchStrategy[DocumentIntelligence]:
    """Build a strategy for filing-type DocumentIntelligence objects.

    The generated summary is made of one sentence per drawn geographic
    fragment followed by one sentence per drawn commodity fragment.  When at
    least one extra fact is drawn, a single fixed "TEST" CompanyImpact
    carrying those facts is attached; otherwise ``companies`` is empty.

    Args:
        min_geo_fragments: Minimum number of geographic fragments drawn
            (at most 5 are drawn).
        min_commodity_fragments: Minimum number of commodity fragments drawn
            (at most 3 are drawn).
    """

    def _assemble(doc_type, geo_frags, commodity_frags, extra_facts):
        # Turn the drawn raw values into a DocumentIntelligence instance.
        document_type = DocumentType(doc_type)

        sentences = [f"Revenue from {g} grew significantly." for g in geo_frags]
        sentences += [f"{c} prices impacted margins." for c in commodity_frags]

        impacts = []
        if extra_facts:
            impacts.append(
                CompanyImpact(
                    ticker="TEST",
                    company_name="Test Corp",
                    relevance=0.8,
                    sentiment=SentimentEnum.NEUTRAL,
                    impact_score=0.5,
                    impact_horizon="medium_term",
                    catalyst_type=CatalystType.EARNINGS,
                    key_facts=extra_facts,
                )
            )

        return DocumentIntelligence(
            document_type=document_type,
            summary=" ".join(sentences),
            companies=impacts,
            macro_themes=[],
            confidence=0.7,
        )

    # Pool of candidate key facts: one per region, then one per commodity.
    fact_pool = [f"Operations in {g}" for g in _GEO_FRAGMENTS]
    fact_pool = fact_pool + [
        f"{c} supply chain disruption" for c in _COMMODITY_FRAGMENTS
    ]

    doc_types = st.sampled_from(_FILING_TYPES)
    geo_lists = st.lists(
        st.sampled_from(_GEO_FRAGMENTS),
        min_size=min_geo_fragments,
        max_size=5,
    )
    commodity_lists = st.lists(
        st.sampled_from(_COMMODITY_FRAGMENTS),
        min_size=min_commodity_fragments,
        max_size=3,
    )
    fact_lists = st.lists(st.sampled_from(fact_pool), min_size=0, max_size=3)

    return st.builds(
        _assemble,
        doc_type=doc_types,
        geo_frags=geo_lists,
        commodity_frags=commodity_lists,
        extra_facts=fact_lists,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 16: Inferred exposure profile correctness
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty16InferredExposureProfileCorrectness:
    """Feature: global-news-interpolation, Property 16: Inferred exposure profile correctness

    For filing extractions that carry geographic revenue breakdowns or
    commodity references, the inferred ExposureProfile SHALL have
    source='inferred', a confidence in [0, 1], and geographic_revenue_mix
    entries corresponding to regions mentioned in the filings.

    The test in this class checks the source, the confidence bounds, and
    that the revenue mix is non-empty, sums to roughly 1.0 and has every
    share in (0, 1].

    Validates: Requirements 9.1, 9.2
    """

    @given(
        filings=st.lists(
            _filing_intelligence_strategy(min_geo_fragments=1),
            min_size=1,
            max_size=5,
        ),
        sector=st.sampled_from(_INFERENCE_SECTORS),
        industry=st.sampled_from(_INFERENCE_INDUSTRIES),
        cap_bucket=st.sampled_from(_INFERENCE_CAP_BUCKETS),
    )
    @settings(max_examples=100)
    def test_inferred_profile_source_and_confidence(
        self,
        filings: list[DocumentIntelligence],
        sector: str,
        industry: str,
        cap_bucket: str,
    ):
        """**Validates: Requirements 9.1, 9.2**

        Checks the provenance marker, the confidence range and the shape of
        the geographic revenue mix on the profile returned by
        infer_exposure_profile.
        """
        profile = infer_exposure_profile(filings, sector, industry, cap_bucket)

        # Provenance marker.
        assert profile.source == "inferred", (
            f"Expected source='inferred', got '{profile.source}'"
        )

        # Confidence stays inside the unit interval.
        assert 0.0 <= profile.confidence <= 1.0, (
            f"Confidence {profile.confidence} out of bounds [0, 1]"
        )

        # Every generated filing has at least one geo fragment, so the mix
        # is expected to be populated.
        assert len(profile.geographic_revenue_mix) > 0, (
            "Expected non-empty geographic_revenue_mix for filings with geo data"
        )

        # Shares add up to ~1.0 (tolerance 0.02).
        mix_total = sum(profile.geographic_revenue_mix.values())
        assert abs(mix_total - 1.0) < 0.02, (
            f"Revenue mix sums to {mix_total}, expected ~1.0"
        )

        # Each individual share lies in (0, 1].
        for region, pct in profile.geographic_revenue_mix.items():
            assert 0.0 < pct <= 1.0, (
                f"Revenue mix for {region}={pct} out of bounds (0, 1]"
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 17: Low-confidence event exclusion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty17LowConfidenceEventExclusion:
    """Feature: global-news-interpolation, Property 17: Low-confidence event exclusion

    For any GlobalEvent classification with confidence below the configurable
    threshold (default 0.4), the Interpolation_Engine SHALL produce zero
    MacroImpactRecords for that event.

    The tests in this class exercise filter_low_confidence_events, which is
    expected to drop events below the threshold and keep the rest.

    Validates: Requirements 10.1
    """

    @given(
        # Both ranges are derived from DEFAULT_CONFIDENCE_THRESHOLD rather
        # than hard-coded, so the test tracks the configured default and
        # covers the whole range right up to (but excluding) the threshold.
        low_conf=st.floats(
            min_value=0.0,
            max_value=DEFAULT_CONFIDENCE_THRESHOLD,
            exclude_max=True,
            allow_nan=False,
        ),
        high_conf=st.floats(
            min_value=DEFAULT_CONFIDENCE_THRESHOLD,
            max_value=1.0,
            allow_nan=False,
        ),
        threshold=st.just(DEFAULT_CONFIDENCE_THRESHOLD),
    )
    @settings(max_examples=100)
    def test_low_confidence_events_excluded(
        self,
        low_conf: float,
        high_conf: float,
        threshold: float,
    ):
        """**Validates: Requirements 10.1**

        Events with confidence below the default threshold must be excluded.
        Events at or above the default threshold must be included.
        """
        low_event = GlobalEvent(
            event_id="low-conf",
            event_types=["supply_disruption"],
            severity="moderate",
            confidence=low_conf,
        )
        high_event = GlobalEvent(
            event_id="high-conf",
            event_types=["supply_disruption"],
            severity="moderate",
            confidence=high_conf,
        )

        result = filter_low_confidence_events(
            [low_event, high_event],
            confidence_threshold=threshold,
        )
        result_ids = [e.event_id for e in result]

        # Low confidence event must be excluded
        assert "low-conf" not in result_ids, (
            f"Low-confidence event (conf={low_conf}) should be excluded "
            f"with threshold={threshold}"
        )

        # High confidence event must be included
        assert "high-conf" in result_ids, (
            f"High-confidence event (conf={high_conf}) should be included "
            f"with threshold={threshold}"
        )

    @given(
        confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
        threshold=st.floats(min_value=0.01, max_value=0.99, allow_nan=False),
    )
    @settings(max_examples=100)
    def test_threshold_boundary(
        self,
        confidence: float,
        threshold: float,
    ):
        """**Validates: Requirements 10.1**

        Events exactly at or above threshold are included; below are excluded.
        """
        event = GlobalEvent(
            event_id="test-event",
            event_types=["supply_disruption"],
            severity="moderate",
            confidence=confidence,
        )

        result = filter_low_confidence_events([event], confidence_threshold=threshold)

        if confidence < threshold:
            assert len(result) == 0, (
                f"Event with confidence={confidence} should be excluded "
                f"with threshold={threshold}"
            )
        else:
            assert len(result) == 1, (
                f"Event with confidence={confidence} should be included "
                f"with threshold={threshold}"
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 18: Accelerated decay for stale short-term events
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty18AcceleratedDecayStaleShortTerm:
    """Feature: global-news-interpolation, Property 18: Accelerated decay for stale short-term events

    A GlobalEvent with estimated_duration='short_term' that is older than
    48 hours SHALL carry an effective signal weight strictly below the
    weight standard recency decay would give it at the same age.

    Validates: Requirements 10.2
    """

    @given(
        age_hours=st.floats(min_value=48.01, max_value=720.0, allow_nan=False),
    )
    @settings(max_examples=100)
    def test_accelerated_decay_strictly_less_than_standard(
        self,
        age_hours: float,
    ):
        """**Validates: Requirements 10.2**

        Past the 48-hour mark, the short_term weight must be strictly
        smaller than the standard recency decay for the same age.
        """
        standard = compute_standard_recency_decay(age_hours)
        accelerated = apply_accelerated_decay(age_hours, "short_term")

        failure_msg = (
            f"Accelerated decay ({accelerated}) should be strictly less than "
            f"standard decay ({standard}) for age={age_hours}h"
        )
        assert accelerated < standard, failure_msg

    @given(
        age_hours=st.floats(min_value=48.01, max_value=720.0, allow_nan=False),
    )
    @settings(max_examples=100)
    def test_accelerated_decay_positive(
        self,
        age_hours: float,
    ):
        """**Validates: Requirements 10.2**

        Even when accelerated, the decayed weight must stay above zero.
        """
        accelerated = apply_accelerated_decay(age_hours, "short_term")

        failure_msg = (
            f"Accelerated decay should be positive, got {accelerated} for age={age_hours}h"
        )
        assert accelerated > 0.0, failure_msg

    @given(
        age_hours=st.floats(min_value=0.0, max_value=48.0, allow_nan=False),
    )
    @settings(max_examples=100)
    def test_no_acceleration_within_staleness_window(
        self,
        age_hours: float,
    ):
        """**Validates: Requirements 10.2**

        Up to and including 48 hours, a short_term event must receive the
        standard decay unchanged (compared with a 1e-12 tolerance).
        """
        standard = compute_standard_recency_decay(age_hours)
        result = apply_accelerated_decay(age_hours, "short_term")

        failure_msg = (
            f"Within staleness window, decay should equal standard: "
            f"result={result}, standard={standard}, age={age_hours}h"
        )
        assert abs(result - standard) < 1e-12, failure_msg
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 19: Macro-only recommendation suppression
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestProperty19MacroOnlyRecommendationSuppression:
    """Feature: global-news-interpolation, Property 19: Macro-only recommendation suppression

    When a trend summary's direction rests on macro signals alone, with no
    company-specific signal supporting it, the resulting recommendation
    SHALL have mode='informational' and its thesis SHALL carry a macro-only
    caveat.

    The tests in this class check the boolean returned by
    evaluate_macro_only_suppression for different signal-count mixes.

    Validates: Requirements 10.3
    """

    @staticmethod
    def _company_summary(direction, strength=0.5):
        # Fixed "TEST" company summary; only direction and strength vary.
        return TrendSummary(
            entity_type="company",
            entity_id="TEST",
            trend_direction=direction,
            trend_strength=strength,
            confidence=0.5,
        )

    @given(
        macro_count=st.integers(min_value=1, max_value=20),
        direction=st.sampled_from(_VALID_TREND_DIRECTIONS),
        strength=st.floats(min_value=0.1, max_value=1.0, allow_nan=False),
    )
    @settings(max_examples=100)
    def test_macro_only_triggers_suppression(
        self,
        macro_count: int,
        direction: TrendDirection,
        strength: float,
    ):
        """**Validates: Requirements 10.3**

        With at least one macro signal and company_signal_count=0,
        evaluate_macro_only_suppression must return True.
        """
        suppressed = evaluate_macro_only_suppression(
            self._company_summary(direction, strength),
            macro_signal_count=macro_count,
            company_signal_count=0,
        )

        assert suppressed is True, (
            f"Expected suppression for macro_count={macro_count}, "
            f"company_count=0, direction={direction.value}"
        )

    @given(
        macro_count=st.integers(min_value=1, max_value=20),
        company_count=st.integers(min_value=1, max_value=20),
        direction=st.sampled_from(_VALID_TREND_DIRECTIONS),
    )
    @settings(max_examples=100)
    def test_mixed_signals_no_suppression(
        self,
        macro_count: int,
        company_count: int,
        direction: TrendDirection,
    ):
        """**Validates: Requirements 10.3**

        With at least one macro signal and at least one company signal,
        evaluate_macro_only_suppression must return False.
        """
        suppressed = evaluate_macro_only_suppression(
            self._company_summary(direction),
            macro_signal_count=macro_count,
            company_signal_count=company_count,
        )

        assert suppressed is False, (
            f"Expected no suppression for macro_count={macro_count}, "
            f"company_count={company_count}"
        )

    @given(
        company_count=st.integers(min_value=0, max_value=20),
    )
    @settings(max_examples=100)
    def test_no_macro_signals_no_suppression(
        self,
        company_count: int,
    ):
        """**Validates: Requirements 10.3**

        With macro_signal_count=0, suppression must never trigger,
        whatever the company signal count is.
        """
        suppressed = evaluate_macro_only_suppression(
            self._company_summary(TrendDirection.BULLISH),
            macro_signal_count=0,
            company_signal_count=company_count,
        )

        assert suppressed is False, (
            "Expected no suppression when macro_count=0"
        )
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 4: Macro data persistence round-trip
|
|
# ---------------------------------------------------------------------------
|
|
|
|
from services.shared.schemas import (
|
|
ExposureProfileSchema as ExposureProfileSchemaImport,
|
|
)
|
|
from services.shared.schemas import (
|
|
GlobalEventSchema,
|
|
TrendDirection,
|
|
TrendProjectionSchema,
|
|
)
|
|
|
|
|
|
def _global_event_schema_strategy() -> st.SearchStrategy[GlobalEventSchema]:
    """Build a strategy of GlobalEventSchema instances for round-trip tests."""
    uuid_text = st.uuids().map(str)

    # 1-4 impact types, converted from their string values to enum members.
    event_type_lists = st.lists(
        st.sampled_from(_VALID_IMPACT_TYPES).map(ImpactType),
        min_size=1,
        max_size=4,
    )

    region_lists = st.lists(
        st.sampled_from(_REGION_CODES),
        min_size=0,
        max_size=5,
        unique=True,
    )
    sector_pool = _VALID_SECTORS + ["Energy", "Technology"]
    sector_lists = st.lists(
        st.sampled_from(sector_pool),
        min_size=0,
        max_size=4,
        unique=True,
    )
    commodity_lists = st.lists(
        st.sampled_from(_COMMODITIES),
        min_size=0,
        max_size=3,
        unique=True,
    )

    fact_lists = st.lists(st.text(min_size=1, max_size=80), min_size=0, max_size=5)

    return st.builds(
        GlobalEventSchema,
        event_id=uuid_text,
        event_types=event_type_lists,
        severity=st.sampled_from(list(SeverityLevel)),
        affected_regions=region_lists,
        affected_sectors=sector_lists,
        affected_commodities=commodity_lists,
        summary=st.text(min_size=1, max_size=200),
        key_facts=fact_lists,
        estimated_duration=st.sampled_from(list(EstimatedDuration)),
        confidence=st.floats(min_value=0.0, max_value=1.0, allow_nan=False),
        source_document_id=uuid_text,
    )
|
|
|
|
|
|
def _macro_impact_record_schema_strategy() -> st.SearchStrategy[MacroImpactRecordSchema]:
    """Build a strategy of MacroImpactRecordSchema instances."""
    uuid_text = st.uuids().map(str)
    unit_float = st.floats(min_value=0.0, max_value=1.0, allow_nan=False)

    # Tickers: 1-5 characters from the Unicode "Lu" (uppercase letter) category.
    tickers = st.text(
        alphabet=st.characters(whitelist_categories=("Lu",)),
        min_size=1,
        max_size=5,
    )
    directions = st.sampled_from(["positive", "negative", "mixed", "neutral"])
    factor_lists = st.lists(st.text(min_size=1, max_size=50), min_size=0, max_size=5)

    return st.builds(
        MacroImpactRecordSchema,
        event_id=uuid_text,
        company_id=uuid_text,
        ticker=tickers,
        macro_impact_score=unit_float,
        impact_direction=directions,
        contributing_factors=factor_lists,
        confidence=unit_float,
    )
|
|
|
|
|
|
def _exposure_profile_schema_roundtrip_strategy() -> st.SearchStrategy[ExposureProfileSchemaImport]:
    """Build a strategy of ExposureProfileSchema instances for round-trip tests."""

    def _unique_subset(pool, upper):
        # Duplicate-free list of 0..upper items sampled from pool.
        return st.lists(st.sampled_from(pool), min_size=0, max_size=upper, unique=True)

    revenue_mix = _geo_revenue_mix()
    unit_float = st.floats(min_value=0.0, max_value=1.0, allow_nan=False)

    return st.builds(
        ExposureProfileSchemaImport,
        company_id=st.uuids().map(str),
        geographic_revenue_mix=revenue_mix,
        supply_chain_regions=_unique_subset(_REGION_CODES, 4),
        key_input_commodities=_unique_subset(_COMMODITIES, 3),
        regulatory_jurisdictions=_unique_subset(_JURISDICTIONS, 3),
        market_position_tier=st.sampled_from(list(MarketPositionTier)),
        export_dependency_pct=unit_float,
        source=st.sampled_from(["manual", "inferred"]),
        confidence=unit_float,
        version=st.integers(min_value=1, max_value=100),
        active=st.booleans(),
    )
|
|
|
|
|
|
def _trend_projection_schema_strategy() -> st.SearchStrategy[TrendProjectionSchema]:
    """Build a strategy of TrendProjectionSchema instances."""
    unit_float = st.floats(min_value=0.0, max_value=1.0, allow_nan=False)
    horizons = st.sampled_from(["1d", "7d", "30d"])
    factor_lists = st.lists(st.text(min_size=1, max_size=80), min_size=0, max_size=5)

    return st.builds(
        TrendProjectionSchema,
        trend_window_id=st.uuids().map(str),
        projected_direction=st.sampled_from(list(TrendDirection)),
        projected_strength=unit_float,
        projected_confidence=unit_float,
        projection_horizon=horizons,
        driving_factors=factor_lists,
        macro_contribution_pct=unit_float,
        diverges_from_current=st.booleans(),
    )
|
|
|
|
|
|
class TestProperty4MacroDataPersistenceRoundTrip:
    """Feature: global-news-interpolation, Property 4: Macro data persistence round-trip

    Any valid GlobalEvent, MacroImpactRecord, ExposureProfile, or
    TrendProjection object that is persisted to PostgreSQL and read back
    SHALL come back as an equivalent object with every field preserved.

    No real database is available in tests, so the round-trip exercised here
    is the Pydantic one: model_dump(mode="json") followed by model_validate.

    Validates: Requirements 3.1, 7.1, 7.2, 12.5
    """

    @given(event=_global_event_schema_strategy())
    @settings(max_examples=100)
    def test_global_event_schema_round_trip(self, event: GlobalEventSchema):
        """**Validates: Requirements 7.1**

        A GlobalEventSchema must survive dump-and-validate with all
        compared fields intact.
        """
        restored = GlobalEventSchema.model_validate(event.model_dump(mode="json"))

        for field_name in (
            "event_id",
            "severity",
            "affected_regions",
            "affected_sectors",
            "affected_commodities",
            "summary",
            "key_facts",
            "estimated_duration",
            "confidence",
            "source_document_id",
        ):
            assert getattr(restored, field_name) == getattr(event, field_name)

        # event_types are compared by underlying value, so an enum member
        # and its plain value count as equal.
        restored_types = [getattr(et, "value", et) for et in restored.event_types]
        original_types = [getattr(et, "value", et) for et in event.event_types]
        assert restored_types == original_types

    @given(record=_macro_impact_record_schema_strategy())
    @settings(max_examples=100)
    def test_macro_impact_record_schema_round_trip(self, record: MacroImpactRecordSchema):
        """**Validates: Requirements 7.2**

        A MacroImpactRecordSchema must survive dump-and-validate with all
        compared fields intact.
        """
        restored = MacroImpactRecordSchema.model_validate(record.model_dump(mode="json"))

        for field_name in (
            "event_id",
            "company_id",
            "ticker",
            "macro_impact_score",
            "impact_direction",
            "contributing_factors",
            "confidence",
        ):
            assert getattr(restored, field_name) == getattr(record, field_name)

    @given(profile=_exposure_profile_schema_roundtrip_strategy())
    @settings(max_examples=100)
    def test_exposure_profile_schema_round_trip(self, profile: ExposureProfileSchemaImport):
        """**Validates: Requirements 3.1**

        An ExposureProfileSchema must survive dump-and-validate with all
        compared fields intact.
        """
        restored = ExposureProfileSchemaImport.model_validate(
            profile.model_dump(mode="json")
        )

        for field_name in (
            "company_id",
            "geographic_revenue_mix",
            "supply_chain_regions",
            "key_input_commodities",
            "regulatory_jurisdictions",
            "market_position_tier",
            "export_dependency_pct",
            "source",
            "confidence",
            "version",
            "active",
        ):
            assert getattr(restored, field_name) == getattr(profile, field_name)

    @given(projection=_trend_projection_schema_strategy())
    @settings(max_examples=100)
    def test_trend_projection_schema_round_trip(self, projection: TrendProjectionSchema):
        """**Validates: Requirements 12.5**

        A TrendProjectionSchema must survive dump-and-validate with all
        compared fields intact.
        """
        restored = TrendProjectionSchema.model_validate(
            projection.model_dump(mode="json")
        )

        for field_name in (
            "trend_window_id",
            "projected_direction",
            "projected_strength",
            "projected_confidence",
            "projection_horizon",
            "driving_factors",
            "macro_contribution_pct",
            "diverges_from_current",
        ):
            assert getattr(restored, field_name) == getattr(projection, field_name)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 1: Content hash stability and uniqueness
|
|
# ---------------------------------------------------------------------------
|
|
|
|
from services.shared.content import content_hash, content_hash_str
|
|
|
|
|
|
class TestProperty1ContentHashStabilityAndUniqueness:
    """Feature: global-news-interpolation, Property 1: Content hash stability and uniqueness

    Hashing the same macro news article content twice SHALL give the same
    hash, and hashing two different contents SHALL give two different hashes.

    Validates: Requirements 1.2
    """

    @given(content=st.binary(min_size=1, max_size=10000))
    @settings(max_examples=100)
    def test_content_hash_stability_bytes(self, content: bytes):
        """**Validates: Requirements 1.2**

        Two content_hash calls on the same bytes must agree.
        """
        first = content_hash(content)
        second = content_hash(content)

        assert first == second, (
            f"Hash instability: {first} != {second} for same content"
        )

    @given(text=st.text(min_size=1, max_size=5000))
    @settings(max_examples=100)
    def test_content_hash_str_stability(self, text: str):
        """**Validates: Requirements 1.2**

        Two content_hash_str calls on the same string must agree.
        """
        first = content_hash_str(text)
        second = content_hash_str(text)

        assert first == second, (
            f"Hash instability: {first} != {second} for same text"
        )

    @given(
        content_a=st.binary(min_size=1, max_size=5000),
        content_b=st.binary(min_size=1, max_size=5000),
    )
    @settings(max_examples=100)
    def test_content_hash_uniqueness_bytes(self, content_a: bytes, content_b: bytes):
        """**Validates: Requirements 1.2**

        content_hash of two different byte strings must differ.
        """
        from hypothesis import assume

        # Discard draws where both inputs happen to be identical.
        assume(content_a != content_b)

        digest_a = content_hash(content_a)
        digest_b = content_hash(content_b)

        assert digest_a != digest_b, (
            f"Hash collision: {digest_a} for distinct content "
            f"({len(content_a)} bytes vs {len(content_b)} bytes)"
        )

    @given(
        text_a=st.text(min_size=1, max_size=5000),
        text_b=st.text(min_size=1, max_size=5000),
    )
    @settings(max_examples=100)
    def test_content_hash_str_uniqueness(self, text_a: str, text_b: str):
        """**Validates: Requirements 1.2**

        content_hash_str of two different strings must differ.
        """
        from hypothesis import assume

        # Discard draws where both inputs happen to be identical.
        assume(text_a != text_b)

        digest_a = content_hash_str(text_a)
        digest_b = content_hash_str(text_b)

        assert digest_a != digest_b, (
            f"Hash collision: {digest_a} for distinct text"
        )
|