"""Integration tests for the full ingest-to-recommendation flow. Exercises the pipeline end-to-end through all stages: Ingestion → Parsing → Extraction → Aggregation → Recommendation Each stage uses the real logic functions from the service modules. External infrastructure (PostgreSQL, MinIO, Redis, Ollama) is replaced with lightweight fakes that preserve the data contracts between stages. Requirements: 3.1-3.4, 4.1-4.3, 5.1-5.5, 6.1-6.5, 7.1-7.4 """ from __future__ import annotations import json import uuid from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, MagicMock import pytest from services.aggregation.worker import ( ImpactRow, assemble_trend_with_evidence, build_weighted_signals, ) from services.extractor.client import ExtractionAttempt, ExtractionResponse from services.extractor.schemas import ExtractionResult, ValidationReport, validate_extraction from services.extractor.worker import persist_extraction from services.parser.html_parser import detect_company_mentions, parse_html from services.parser.worker import build_parser_output_json from services.recommendation.eligibility import evaluate_eligibility from services.recommendation.suppression import ( DataQualityContext, evaluate_suppression, ) from services.recommendation.worker import ( build_recommendation, ) from services.shared.schemas import ( ActionType, RecommendationMode, TrendDirection, TrendWindow, ) NOW = datetime(2026, 4, 11, 12, 0, 0, tzinfo=timezone.utc) # --------------------------------------------------------------------------- # Shared test fixtures # --------------------------------------------------------------------------- SAMPLE_HTML = """ Apple Reports Record Q2 Earnings

Apple Reports Record Q2 Earnings

Apple Inc. (AAPL) reported record quarterly revenue of $120 billion, beating analyst expectations by 8%. CEO Tim Cook cited strong iPhone and services growth as key drivers.

The company also announced a $100 billion share buyback program, signaling confidence in future cash flows. Analysts at Goldman Sachs raised their price target to $250.

However, regulatory scrutiny in the EU remains a risk factor, with potential fines related to the Digital Markets Act.

""" SAMPLE_EXTRACTION_JSON = { "summary": "Apple reported record Q2 revenue of $120B, beating expectations by 8%. " "Announced $100B buyback. EU regulatory risk remains.", "companies": [ { "ticker": "AAPL", "company_name": "Apple Inc.", "relevance": 0.95, "sentiment": "positive", "impact_score": 0.75, "impact_horizon": "1d_30d", "catalyst_type": "earnings", "key_facts": [ "Record quarterly revenue of $120 billion", "$100 billion share buyback announced", "Goldman Sachs raised price target to $250", ], "risks": ["EU regulatory scrutiny under Digital Markets Act"], "evidence_spans": [ "Apple Inc. (AAPL) reported record quarterly revenue of $120 billion", "beating analyst expectations by 8%", "announced a $100 billion share buyback program", ], } ], "macro_themes": ["consumer_tech", "buybacks"], "novelty_score": 0.7, "confidence": 0.88, "extraction_warnings": [], } COMPANY_ALIASES = [ {"company_id": "comp-1", "alias": "AAPL", "alias_type": "ticker", "ticker": "AAPL"}, {"company_id": "comp-1", "alias": "Apple Inc.", "alias_type": "legal_name", "ticker": "AAPL"}, ] # --------------------------------------------------------------------------- # Stage 1: Parsing # --------------------------------------------------------------------------- class TestParsingStage: """Verify the HTML parsing pipeline produces structured output.""" def test_parse_html_extracts_body_text(self): parsed = parse_html(SAMPLE_HTML, "https://example.com/apple-earnings") assert parsed.body_text is not None assert "record quarterly revenue" in parsed.body_text.lower() # Boilerplate should be stripped assert "Site Navigation" not in parsed.body_text assert "Copyright" not in parsed.body_text def test_parse_html_extracts_metadata(self): parsed = parse_html(SAMPLE_HTML, "https://example.com/apple-earnings") assert parsed.title == "Apple Reports Record Q2 Earnings" assert parsed.quality_score > 0.0 assert parsed.confidence != "low" def test_detect_company_mentions_finds_aapl(self): parsed = parse_html(SAMPLE_HTML, "https://example.com/apple-earnings") mentions = detect_company_mentions(parsed.body_text, COMPANY_ALIASES) tickers_found = {m["ticker"] for m in mentions} assert "AAPL" in tickers_found def test_parser_output_json_structure(self): parsed = parse_html(SAMPLE_HTML, "https://example.com/apple-earnings") mentions = detect_company_mentions(parsed.body_text, COMPANY_ALIASES) output = build_parser_output_json(parsed, mentions) assert "quality_score" in output assert "mentioned_companies" in output assert isinstance(output["mentioned_companies"], list) assert output["title"] == "Apple Reports Record Q2 Earnings" # --------------------------------------------------------------------------- # Stage 2: Extraction validation # --------------------------------------------------------------------------- class TestExtractionStage: """Verify extraction schema validation and result construction.""" def test_validate_extraction_accepts_valid_json(self): report = validate_extraction(SAMPLE_EXTRACTION_JSON) assert report.valid assert report.parsed is not None assert report.parsed.companies[0].ticker == "AAPL" def test_validate_extraction_rejects_invalid_json(self): report = validate_extraction("not json at all") assert not report.valid assert len(report.errors) > 0 def test_validate_extraction_rejects_bad_schema(self): bad = {"summary": "test"} # missing required fields — normalized with defaults report = validate_extraction(bad) assert report.valid assert report.parsed is not None assert "incomplete_model_output" in report.parsed.extraction_warnings def test_extraction_result_matches_intelligence_schema(self): result = ExtractionResult.model_validate(SAMPLE_EXTRACTION_JSON) assert result.confidence == 0.88 assert len(result.companies) == 1 assert result.companies[0].catalyst_type.value == "earnings" assert result.novelty_score == 0.7 def test_validate_extraction_with_document_text_checks_evidence(self): """Evidence grounding check should warn if spans not found.""" report = validate_extraction( SAMPLE_EXTRACTION_JSON, document_text="Completely unrelated text about weather.", ) # Should still be valid (evidence grounding is a warning, not error) assert report.valid assert any("evidence_span_not_found" in w for w in report.warnings) # --------------------------------------------------------------------------- # Stage 3: Extraction persistence (mocked infra) # --------------------------------------------------------------------------- class TestExtractionPersistence: """Verify extraction artifacts are persisted correctly.""" @pytest.mark.asyncio async def test_persist_successful_extraction_creates_all_artifacts(self): result_obj = ExtractionResult.model_validate(SAMPLE_EXTRACTION_JSON) validation = ValidationReport(valid=True, errors=[], warnings=[], parsed=result_obj) attempt = ExtractionAttempt( raw_output=json.dumps(SAMPLE_EXTRACTION_JSON), validation=validation, error=None, duration_ms=450, model="test-model", ) response = ExtractionResponse( success=True, result=result_obj, attempts=[attempt], prompt_metadata={"prompt_version": "document-intel-v2", "schema_version": "2.0.0"}, model="test-model", total_duration_ms=450, ) pool = AsyncMock() pool.fetchval = AsyncMock(side_effect=["intel-1", "impact-1", "metrics-1"]) pool.execute = AsyncMock() minio = MagicMock() persist_result = await persist_extraction( pool=pool, minio_client=minio, document_id=str(uuid.uuid4()), ticker="AAPL", extraction_response=response, company_id_map={"AAPL": "comp-1"}, source_credibility=0.8, timestamp=NOW, ) assert persist_result.success assert persist_result.intelligence_id == "intel-1" assert persist_result.impact_ids == ["impact-1"] # 4 MinIO uploads: prompt, raw_output, validation, intelligence assert minio.put_object.call_count == 4 # --------------------------------------------------------------------------- # Stage 4: Aggregation # --------------------------------------------------------------------------- class TestAggregationStage: """Verify trend summary assembly from document impact records.""" def _make_impacts_from_extraction(self) -> list[ImpactRow]: """Build ImpactRows that mirror what the extraction stage would produce.""" return [ ImpactRow( document_id="doc-1", confidence=0.88, novelty_score=0.7, source_credibility=0.8, sentiment="positive", impact_score=0.75, catalyst_type="earnings", key_facts=["Record revenue $120B", "$100B buyback"], risks=["EU regulatory scrutiny"], published_at=NOW - timedelta(hours=2), ), ImpactRow( document_id="doc-2", confidence=0.72, novelty_score=0.5, source_credibility=0.7, sentiment="positive", impact_score=0.6, catalyst_type="rating_change", key_facts=["Goldman raised target to $250"], risks=[], published_at=NOW - timedelta(hours=4), ), ImpactRow( document_id="doc-3", confidence=0.65, novelty_score=0.4, source_credibility=0.6, sentiment="negative", impact_score=0.4, catalyst_type="legal", key_facts=["EU DMA investigation"], risks=["Potential fines"], published_at=NOW - timedelta(hours=6), ), ] def test_aggregation_produces_bullish_trend(self): impacts = self._make_impacts_from_extraction() signals = build_weighted_signals(impacts, NOW, "7d") assembled = assemble_trend_with_evidence( "AAPL", "7d", signals, impacts, reference_time=NOW, ) summary = assembled.summary assert summary.entity_id == "AAPL" assert summary.window == TrendWindow.SEVEN_DAY # Two positive, one negative → should be bullish assert summary.trend_direction == TrendDirection.BULLISH assert summary.trend_strength > 0 assert summary.confidence > 0 assert len(summary.top_supporting_evidence) >= 1 assert len(summary.top_opposing_evidence) >= 1 assert summary.contradiction_score > 0 # has opposing signal def test_aggregation_evidence_rankings_are_populated(self): impacts = self._make_impacts_from_extraction() signals = build_weighted_signals(impacts, NOW, "7d") assembled = assemble_trend_with_evidence( "AAPL", "7d", signals, impacts, reference_time=NOW, ) # Supporting evidence should include the positive docs supporting_ids = {e.document_id for e in assembled.supporting_evidence} assert "doc-1" in supporting_ids assert "doc-2" in supporting_ids # Opposing evidence should include the negative doc opposing_ids = {e.document_id for e in assembled.opposing_evidence} assert "doc-3" in opposing_ids def test_aggregation_extracts_catalysts_and_risks(self): impacts = self._make_impacts_from_extraction() signals = build_weighted_signals(impacts, NOW, "7d") assembled = assemble_trend_with_evidence( "AAPL", "7d", signals, impacts, reference_time=NOW, ) summary = assembled.summary assert len(summary.dominant_catalysts) > 0 assert "earnings" in summary.dominant_catalysts assert len(summary.material_risks) > 0 # --------------------------------------------------------------------------- # Stage 5: Recommendation # --------------------------------------------------------------------------- class TestRecommendationStage: """Verify recommendation generation from trend summaries.""" def _make_trend_from_aggregation(self): """Build a TrendSummary that mirrors aggregation output.""" impacts = [ ImpactRow( document_id="doc-1", confidence=0.88, novelty_score=0.7, source_credibility=0.8, sentiment="positive", impact_score=0.75, catalyst_type="earnings", key_facts=["Record revenue"], risks=["EU regulatory"], published_at=NOW - timedelta(hours=2), ), ImpactRow( document_id="doc-2", confidence=0.72, novelty_score=0.5, source_credibility=0.7, sentiment="positive", impact_score=0.6, catalyst_type="rating_change", key_facts=["Target raised"], risks=[], published_at=NOW - timedelta(hours=4), ), ImpactRow( document_id="doc-3", confidence=0.65, novelty_score=0.4, source_credibility=0.6, sentiment="negative", impact_score=0.4, catalyst_type="legal", key_facts=["DMA investigation"], risks=["Potential fines"], published_at=NOW - timedelta(hours=6), ), ] signals = build_weighted_signals(impacts, NOW, "7d") assembled = assemble_trend_with_evidence( "AAPL", "7d", signals, impacts, reference_time=NOW, ) return assembled.summary def test_eligibility_produces_buy_for_bullish_trend(self): summary = self._make_trend_from_aggregation() result = evaluate_eligibility(summary) assert result.action == ActionType.BUY assert result.eligible def test_recommendation_has_thesis_and_evidence(self): summary = self._make_trend_from_aggregation() result = evaluate_eligibility(summary) rec = build_recommendation(summary, result, reference_time=NOW) assert rec.ticker == "AAPL" assert rec.action == ActionType.BUY assert len(rec.thesis) > 0 assert "[risk:" in rec.thesis assert len(rec.evidence_refs) > 0 assert rec.time_horizon == "swing_1d_10d" def test_recommendation_position_sizing_is_bounded(self): summary = self._make_trend_from_aggregation() result = evaluate_eligibility(summary) rec = build_recommendation(summary, result, reference_time=NOW) assert 0 < rec.position_sizing.portfolio_pct <= 0.05 assert 0 < rec.position_sizing.max_loss_pct <= 0.01 def test_recommendation_mode_reflects_confidence(self): summary = self._make_trend_from_aggregation() result = evaluate_eligibility(summary) rec = build_recommendation(summary, result, reference_time=NOW) # With 3 impact records the aggregated confidence is moderate (~0.41), # which is below the paper_confidence_threshold (0.50). The eligibility # engine correctly assigns INFORMATIONAL mode for BUY actions with # sub-threshold confidence. This validates Requirement 7.4. if summary.confidence >= 0.50: assert rec.mode in ( RecommendationMode.PAPER_ELIGIBLE, RecommendationMode.LIVE_ELIGIBLE, ) else: assert rec.mode == RecommendationMode.INFORMATIONAL def test_suppression_blocks_low_quality_data(self): summary = self._make_trend_from_aggregation() low_quality_ctx = DataQualityContext( total_documents=5, valid_documents=1, failed_documents=4, avg_extraction_confidence=0.2, newest_evidence_at=NOW - timedelta(days=14), source_types=set(), ) suppression = evaluate_suppression( summary, quality_ctx=low_quality_ctx, reference_time=NOW, ) assert suppression.suppressed assert len(suppression.reasons) > 0 # --------------------------------------------------------------------------- # Full pipeline integration # --------------------------------------------------------------------------- class TestFullPipelineIntegration: """End-to-end test wiring all stages together with real logic.""" def test_html_to_recommendation_pipeline(self): """Walk a document through parse → validate extraction → aggregate → recommend.""" # --- Stage 1: Parse HTML --- parsed = parse_html(SAMPLE_HTML, "https://example.com/apple-q2") assert parsed.body_text assert parsed.confidence != "low" mentions = detect_company_mentions(parsed.body_text, COMPANY_ALIASES) assert any(m["ticker"] == "AAPL" for m in mentions) # --- Stage 2: Validate extraction output --- report = validate_extraction( SAMPLE_EXTRACTION_JSON, document_text=parsed.body_text, ) assert report.valid extraction = report.parsed assert extraction is not None assert extraction.companies[0].ticker == "AAPL" # --- Stage 3: Build impact records from extraction --- company = extraction.companies[0] impact = ImpactRow( document_id="doc-pipeline-1", confidence=extraction.confidence, novelty_score=extraction.novelty_score, source_credibility=0.8, sentiment=company.sentiment.value, impact_score=company.impact_score, catalyst_type=company.catalyst_type.value, key_facts=company.key_facts, risks=company.risks, published_at=NOW - timedelta(hours=1), ) # Add a second supporting document for richer aggregation impact2 = ImpactRow( document_id="doc-pipeline-2", confidence=0.75, novelty_score=0.5, source_credibility=0.7, sentiment="positive", impact_score=0.6, catalyst_type="rating_change", key_facts=["Analyst upgrade"], risks=[], published_at=NOW - timedelta(hours=3), ) impacts = [impact, impact2] # --- Stage 4: Aggregate into trend summary --- signals = build_weighted_signals(impacts, NOW, "7d") assembled = assemble_trend_with_evidence( "AAPL", "7d", signals, impacts, reference_time=NOW, ) summary = assembled.summary assert summary.trend_direction == TrendDirection.BULLISH assert summary.confidence > 0 assert len(summary.top_supporting_evidence) > 0 # --- Stage 5: Generate recommendation --- eligibility = evaluate_eligibility(summary) assert eligibility.action == ActionType.BUY assert eligibility.eligible rec = build_recommendation(summary, eligibility, reference_time=NOW) # Final assertions: the recommendation is coherent end-to-end assert rec.ticker == "AAPL" assert rec.action == ActionType.BUY assert rec.confidence == summary.confidence assert len(rec.evidence_refs) > 0 assert rec.thesis.startswith("[risk:") assert "AAPL" in rec.thesis assert "bullish" in rec.thesis assert rec.time_horizon == "swing_1d_10d" assert 0 < rec.position_sizing.portfolio_pct <= 0.05 def test_low_quality_document_is_blocked(self): """A low-quality parse should not produce a trade-eligible recommendation.""" # Minimal HTML that produces a low-quality parse bad_html = "

Ad. Subscribe now.

" parsed = parse_html(bad_html, "https://example.com/junk") # Low quality parse → should not advance to extraction # The parser worker checks confidence != "low" before enqueuing if parsed.confidence == "low" or parsed.quality_score < 0.3: # This is the expected path: document blocked at parse stage return # If somehow it passes parsing, suppression should catch it # Build a minimal trend with low data quality from services.shared.schemas import TrendSummary summary = TrendSummary( entity_type="company", entity_id="JUNK", window=TrendWindow.SEVEN_DAY, trend_direction=TrendDirection.BULLISH, trend_strength=0.3, confidence=0.3, top_supporting_evidence=["doc-1"], generated_at=NOW, ) suppression = evaluate_suppression(summary, reference_time=NOW) # With only 1 evidence doc and low confidence, should be suppressed assert suppression.suppressed def test_bearish_signal_produces_sell_recommendation(self): """Negative sentiment documents should produce a SELL recommendation.""" impacts = [ ImpactRow( document_id="doc-bear-1", confidence=0.82, novelty_score=0.6, source_credibility=0.8, sentiment="negative", impact_score=0.7, catalyst_type="legal", key_facts=["Major lawsuit filed"], risks=["Potential $5B fine"], published_at=NOW - timedelta(hours=1), ), ImpactRow( document_id="doc-bear-2", confidence=0.78, novelty_score=0.5, source_credibility=0.75, sentiment="negative", impact_score=0.65, catalyst_type="earnings", key_facts=["Revenue miss by 15%"], risks=["Guidance lowered"], published_at=NOW - timedelta(hours=3), ), ] signals = build_weighted_signals(impacts, NOW, "7d") assembled = assemble_trend_with_evidence( "TSLA", "7d", signals, impacts, reference_time=NOW, ) summary = assembled.summary assert summary.trend_direction == TrendDirection.BEARISH eligibility = evaluate_eligibility(summary) assert eligibility.action == ActionType.SELL rec = build_recommendation(summary, eligibility, reference_time=NOW) assert rec.ticker == "TSLA" assert rec.action == ActionType.SELL assert "SELL" in rec.thesis def test_contradictory_signals_produce_mixed_or_watch(self): """Equal opposing signals should result in WATCH or MIXED direction.""" impacts = [ ImpactRow( document_id="doc-pos", confidence=0.8, novelty_score=0.5, source_credibility=0.8, sentiment="positive", impact_score=0.6, catalyst_type="earnings", key_facts=["Beat expectations"], risks=[], published_at=NOW - timedelta(hours=1), ), ImpactRow( document_id="doc-neg", confidence=0.8, novelty_score=0.5, source_credibility=0.8, sentiment="negative", impact_score=0.6, catalyst_type="legal", key_facts=["Lawsuit filed"], risks=["Regulatory risk"], published_at=NOW - timedelta(hours=1), ), ] signals = build_weighted_signals(impacts, NOW, "7d") assembled = assemble_trend_with_evidence( "MSFT", "7d", signals, impacts, reference_time=NOW, ) summary = assembled.summary assert summary.trend_direction in (TrendDirection.MIXED, TrendDirection.NEUTRAL) assert summary.contradiction_score > 0 eligibility = evaluate_eligibility(summary) rec = build_recommendation(summary, eligibility, reference_time=NOW) # Contradictory signals → WATCH or HOLD, mode should be informational assert rec.action in (ActionType.WATCH, ActionType.HOLD) assert rec.mode == RecommendationMode.INFORMATIONAL