Files
stonks-oracle/tests/test_extractor_worker.py
Celes Renata c85c0068a2 fix: clean up utcnow deprecation warnings, fix 12 failing tests, add CI/CD pipeline manifests
- Replace all datetime.utcnow() with datetime.now(tz=timezone.utc) across 8 files
- Fix 12 failing tests to match current implementation behavior
- Fix pytest_plugins in non-top-level conftest (moved to root conftest.py)
- Auto-fix 189 lint issues (import sorting, unused imports)
- Add CI/CD pipeline infrastructure (ARC, ArgoCD, Kargo manifests)
- Add values-beta.yaml and values-paper.yaml for staged deployments
- Update GitHub Actions workflow to use self-hosted-gremlin runners
- Add integration-test job to CI pipeline

Result: 1596 passed, 0 failed, 0 warnings
2026-04-18 03:59:28 +00:00

200 lines
6.5 KiB
Python

"""Tests for the extraction worker persistence logic.
Validates that persist_extraction correctly uploads artifacts to MinIO
and persists intelligence/impact records to PostgreSQL.
Requirements: 5.1, 5.2, 5.3, 5.4, 5.5, 9.1, 9.2
"""
from __future__ import annotations
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
from services.extractor.client import ExtractionAttempt, ExtractionResponse
from services.extractor.schemas import ExtractionResult, ValidationReport
from services.extractor.worker import persist_extraction
def _make_valid_result() -> ExtractionResult:
    """Return a minimal, schema-valid ExtractionResult mentioning only AAPL.

    The payload has one company entry plus the top-level summary, a single
    macro theme, novelty/confidence scores and an empty warnings list.
    """
    aapl_entry = {
        "ticker": "AAPL",
        "company_name": "Apple Inc.",
        "relevance": 0.95,
        "sentiment": "positive",
        "impact_score": 0.7,
        "impact_horizon": "1d_30d",
        "catalyst_type": "earnings",
        "key_facts": ["Revenue up 12%"],
        "risks": [],
        "evidence_spans": ["Apple beat expectations"],
    }
    payload = {
        "summary": "Apple beat earnings expectations.",
        "companies": [aapl_entry],
        "macro_themes": ["ai_capex"],
        "novelty_score": 0.6,
        "confidence": 0.85,
        "extraction_warnings": [],
    }
    return ExtractionResult.model_validate(payload)
def _make_success_response() -> ExtractionResponse:
    """Return a successful ExtractionResponse backed by exactly one attempt.

    That attempt's raw output is the JSON dump of the parsed result, and its
    validation report is marked valid with the same result attached.
    """
    parsed = _make_valid_result()
    report = ValidationReport(valid=True, errors=[], warnings=[], parsed=parsed)
    metadata = {"prompt_version": "document-intel-v1", "schema_version": "2.0.0"}
    return ExtractionResponse(
        success=True,
        result=parsed,
        attempts=[
            ExtractionAttempt(
                raw_output=parsed.model_dump_json(),
                validation=report,
                error=None,
                duration_ms=500,
                model="test-model",
            )
        ],
        prompt_metadata=metadata,
        model="test-model",
        total_duration_ms=500,
    )
def _make_failed_response() -> ExtractionResponse:
    """Return a failed ExtractionResponse carrying two unsuccessful attempts.

    The first attempt has the ``invalid_json`` error and no validation report.
    The second has a non-valid ValidationReport listing ``schema_fail``.
    Their durations (200 ms + 300 ms) add up to the 500 ms total.
    """
    json_failure = ExtractionAttempt(
        raw_output="bad json",
        validation=None,
        error="invalid_json",
        duration_ms=200,
        model="test-model",
    )
    schema_failure = ExtractionAttempt(
        raw_output="still bad",
        validation=ValidationReport(valid=False, errors=["schema_fail"], warnings=[]),
        error="schema_fail",
        duration_ms=300,
        model="test-model",
    )
    metadata = {"prompt_version": "document-intel-v1", "schema_version": "2.0.0"}
    return ExtractionResponse(
        success=False,
        result=None,
        attempts=[json_failure, schema_failure],
        prompt_metadata=metadata,
        model="test-model",
        total_duration_ms=500,
    )
def _mock_pool(
    intel_id: str = "intel-uuid-1",
    impact_id: str = "impact-uuid-1",
    metrics_id: str = "metrics-uuid-1",
) -> AsyncMock:
    """Create a mock asyncpg pool whose ``fetchval`` yields predictable IDs.

    Args:
        intel_id: Value returned by the first ``fetchval`` call (intelligence insert).
        impact_id: Value returned by the second call (impact insert).
        metrics_id: Value returned by the third call (metrics insert).

    Returns:
        An AsyncMock whose ``fetchval`` and ``execute`` are awaitable. Only
        three ``fetchval`` results are queued. A fourth call raises
        ``StopAsyncIteration`` because the side-effect sequence is exhausted.
    """
    pool = AsyncMock()
    # Order matters: persist_extraction is expected to insert intelligence,
    # then impact, then metrics.
    pool.fetchval = AsyncMock(side_effect=[intel_id, impact_id, metrics_id])
    pool.execute = AsyncMock()
    return pool
def _mock_minio() -> MagicMock:
    """Return a fresh MagicMock standing in for the MinIO client.

    Tests count artifact uploads via ``put_object.call_count`` on it.
    """
    return MagicMock()
@pytest.mark.asyncio
async def test_persist_successful_extraction():
    """A successful extraction stores every artifact and all database records.

    Expected side effects: four MinIO uploads (prompt, raw output, validation,
    intelligence), three ``fetchval`` inserts (intelligence, impact, metrics)
    and one ``execute`` call (the status update).
    """
    db = _mock_pool()
    store = _mock_minio()
    success = _make_success_response()
    fixed_ts = datetime(2026, 4, 11, 12, 0, 0, tzinfo=timezone.utc)

    outcome = await persist_extraction(
        pool=db,
        minio_client=store,
        document_id="doc-123",
        ticker="AAPL",
        extraction_response=success,
        company_id_map={"AAPL": "company-uuid-1"},
        source_credibility=0.8,
        timestamp=fixed_ts,
    )

    # The returned identifiers match the values queued on the mocked fetchval.
    assert outcome.success
    assert outcome.intelligence_id == "intel-uuid-1"
    assert outcome.impact_ids == ["impact-uuid-1"]

    # Every artifact reference is populated. The prompt and raw output must
    # point at their dedicated buckets.
    assert outcome.prompt_ref is not None
    assert "stonks-llm-prompts" in outcome.prompt_ref
    assert outcome.raw_output_ref is not None
    assert "stonks-llm-results" in outcome.raw_output_ref
    assert outcome.validation_ref is not None
    assert outcome.intelligence_ref is not None

    # Upload count, then insert count, then status-update count.
    assert (
        store.put_object.call_count,
        db.fetchval.call_count,
        db.execute.call_count,
    ) == (4, 3, 1)
@pytest.mark.asyncio
async def test_persist_failed_extraction():
    """A failed extraction still records its attempts and marks the document failed.

    No final intelligence artifact is written, so MinIO sees three uploads
    (prompt, raw output, validation). PostgreSQL sees two ``fetchval`` inserts
    (intelligence, metrics) and one ``execute`` status update.
    """
    db = AsyncMock()
    db.fetchval = AsyncMock(return_value="intel-uuid-fail")
    db.execute = AsyncMock()
    store = _mock_minio()
    failure = _make_failed_response()
    fixed_ts = datetime(2026, 4, 11, 12, 0, 0, tzinfo=timezone.utc)

    outcome = await persist_extraction(
        pool=db,
        minio_client=store,
        document_id="doc-456",
        ticker="AAPL",
        extraction_response=failure,
        timestamp=fixed_ts,
    )

    assert not outcome.success
    assert outcome.intelligence_id == "intel-uuid-fail"
    # The final intelligence artifact is only produced on success.
    assert outcome.intelligence_ref is None
    # Attempt-level artifacts are still persisted.
    for ref in (outcome.prompt_ref, outcome.raw_output_ref, outcome.validation_ref):
        assert ref is not None

    # Upload count, then insert count, then status-update count.
    assert (
        store.put_object.call_count,
        db.fetchval.call_count,
        db.execute.call_count,
    ) == (3, 2, 1)
@pytest.mark.asyncio
async def test_persist_skips_impact_without_company_id():
    """Impact records are skipped when company_id_map doesn't have the ticker.

    The extraction itself still succeeds and its intelligence row is still
    inserted. Only the per-company impact insert is dropped, because AAPL has
    no company_id mapping.
    """
    pool = AsyncMock()
    # Every fetchval call returns this value, so whichever insert yields the
    # intelligence id must surface it unchanged.
    pool.fetchval = AsyncMock(return_value="intel-uuid-2")
    pool.execute = AsyncMock()
    minio = _mock_minio()
    response = _make_success_response()
    result = await persist_extraction(
        pool=pool,
        minio_client=minio,
        document_id="doc-789",
        ticker="AAPL",
        extraction_response=response,
        company_id_map={},  # no mapping for AAPL
    )
    assert result.success
    # Guards against the no-impact path dropping the intelligence id.
    assert result.intelligence_id == "intel-uuid-2"
    assert result.impact_ids == []
    # 1 fetchval for intelligence + 1 for metrics, no impact insert
    assert pool.fetchval.call_count == 2