feat: comprehensive docs, unit tests, docker-compose app services

- Add scheduler and ingestion unit tests (test_scheduler_unit.py, test_ingestion_unit.py)
- Add all 13 app services + dashboard to docker-compose.yml
- Add full documentation suite: API reference, Helm reference, Docker deployment guide,
  3 architecture diagrams (K8s, Docker Compose, data pipeline), AI agent guide,
  backup/restore guide, observability/metrics reference, per-service docs
- Add intelligence pipeline deep-dive docs with Mermaid diagrams
- Update README with documentation index and links
- Add specs for comprehensive-quality-docs, intelligence-pipeline-deep-dive,
  sanitized-pipeline-docs
This commit is contained in:
Celes Renata
2026-04-22 02:56:41 +00:00
parent f251c53f92
commit 88ad1e8d99
57 changed files with 13318 additions and 51 deletions
+820
View File
@@ -0,0 +1,820 @@
"""Unit tests for ingestion worker process_job function.
Covers: successful job processing, adapter error with retry,
retry exhaustion → dead-letter queue (via record_retrieval_failure),
content hash deduplication skip, cross-source dedup via dedupe_items,
and error handling paths (unexpected exceptions).
Requirements: 2.1, 2.2, 2.3, 2.4
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from services.adapters.base import AdapterResult
from services.shared.redis_keys import (
QUEUE_PARSING,
queue_key,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _run_id() -> uuid.UUID:
return uuid.UUID("00000000-0000-0000-0000-000000000001")
def _make_job(
source_type: str = "news_api",
ticker: str = "AAPL",
source_id: str = "src-1",
company_id: str = "cid-1",
config: dict | None = None,
) -> dict:
return {
"source_type": source_type,
"ticker": ticker,
"source_id": source_id,
"company_id": company_id,
"config": config or {},
}
def _make_adapter_result(
source_type: str = "news_api",
ticker: str = "AAPL",
items: list | None = None,
raw_payload: bytes = b'{"data": []}',
content_hash: str = "abc123",
error: str | None = None,
) -> AdapterResult:
return AdapterResult(
source_type=source_type,
ticker=ticker,
items=items if items is not None else [{"title": "Test Article", "url": "https://example.com/1"}],
raw_payload=raw_payload,
content_hash=content_hash,
fetched_at=datetime(2026, 6, 15, 12, 0, 0, tzinfo=timezone.utc),
error=error,
)
def _mock_pool() -> AsyncMock:
pool = AsyncMock()
pool.fetchval = AsyncMock(return_value=_run_id())
pool.execute = AsyncMock(return_value="UPDATE 1")
pool.fetchrow = AsyncMock(return_value=None)
pool.fetch = AsyncMock(return_value=[])
return pool
def _mock_redis() -> AsyncMock:
rds = AsyncMock()
rds.rpush = AsyncMock(return_value=1)
rds.set = AsyncMock(return_value=True)
rds.get = AsyncMock(return_value=None)
rds.lpop = AsyncMock(return_value=None)
return rds
def _mock_minio() -> MagicMock:
return MagicMock()
def _mock_adapter(result: AdapterResult | None = None) -> AsyncMock:
adapter = AsyncMock()
adapter.fetch = AsyncMock(return_value=result or _make_adapter_result())
return adapter
# ---------------------------------------------------------------------------
# Test: Successful job processing
# ---------------------------------------------------------------------------
class TestSuccessfulJobProcessing:
"""Verify the happy path: adapter returns items, they are persisted and enqueued."""
@pytest.mark.asyncio
async def test_successful_news_ingestion(self):
"""A news_api job with new items should persist, enqueue for parsing, and mark complete."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
items = [
{"title": "Article 1", "url": "https://example.com/1"},
{"title": "Article 2", "url": "https://example.com/2"},
]
result = _make_adapter_result(items=items, content_hash="hash1")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
new_doc_ids = ["doc-1", "doc-2"]
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path") as mock_upload,
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(2, new_doc_ids)) as mock_persist,
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(items, [])) as mock_dedupe,
patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock) as mock_mark,
patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock) as mock_reset,
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# Adapter was called
adapter.fetch.assert_awaited_once_with("AAPL", {})
# Ingestion run was created
pool.fetchval.assert_awaited_once()
# Raw artifact uploaded to MinIO
mock_upload.assert_called_once()
# Content hash was checked in Redis and then set
rds.get.assert_awaited()
rds.set.assert_awaited()
# Cross-source dedupe was called (news_api is not market_api/broker)
mock_dedupe.assert_awaited_once()
# Items were persisted
mock_persist.assert_awaited_once()
# New docs enqueued for parsing
assert rds.rpush.await_count == len(new_doc_ids)
for call_args in rds.rpush.await_args_list:
assert call_args[0][0] == queue_key(QUEUE_PARSING)
# mark_as_seen called for each new item
assert mock_mark.await_count == len(new_doc_ids)
# Retry state reset after success
mock_reset.assert_awaited_once_with(pool, "src-1")
# Ingestion run updated to completed
update_calls = [
c for c in pool.execute.await_args_list
if "completed" in str(c)
]
assert len(update_calls) >= 1
# ---------------------------------------------------------------------------
# Test: Adapter error with retry
# ---------------------------------------------------------------------------
class TestAdapterErrorWithRetry:
"""Verify that adapter errors are recorded as retrieval failures."""
@pytest.mark.asyncio
async def test_adapter_error_records_failure(self):
"""When adapter returns an error, record_retrieval_failure is called."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
result = _make_adapter_result(error="API rate limited")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.record_retrieval_failure", new_callable=AsyncMock) as mock_record,
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# Failure was recorded with the error message
mock_record.assert_awaited_once()
call_kwargs = mock_record.await_args
assert call_kwargs[1]["error_message"] == "API rate limited"
assert call_kwargs[1]["run_id"] == str(_run_id())
assert call_kwargs[1]["source_id"] == "src-1"
@pytest.mark.asyncio
async def test_adapter_error_does_not_persist_items(self):
"""When adapter returns an error, no items should be persisted or enqueued."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
result = _make_adapter_result(error="Connection timeout")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.record_retrieval_failure", new_callable=AsyncMock),
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock) as mock_persist,
patch("services.ingestion.worker.upload_raw_artifact") as mock_upload,
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# No items persisted, no upload
mock_persist.assert_not_awaited()
mock_upload.assert_not_called()
# No parsing jobs enqueued
rds.rpush.assert_not_awaited()
# ---------------------------------------------------------------------------
# Test: Retry exhaustion → dead-letter queue
# ---------------------------------------------------------------------------
class TestRetryExhaustion:
"""Verify that record_retrieval_failure handles retry exhaustion.
The DLQ routing is handled inside record_retrieval_failure in the
metadata module. The worker calls record_retrieval_failure on both
adapter errors and unexpected exceptions. We verify the worker
correctly delegates to record_retrieval_failure in both cases.
"""
@pytest.mark.asyncio
async def test_adapter_error_delegates_to_record_failure(self):
"""Adapter error path calls record_retrieval_failure which manages retry state."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
result = _make_adapter_result(error="Server error 500")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.record_retrieval_failure", new_callable=AsyncMock, return_value={"exhausted": True, "retry_count": 5}) as mock_record,
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
mock_record.assert_awaited_once_with(
pool,
run_id=str(_run_id()),
source_id="src-1",
error_message="Server error 500",
)
@pytest.mark.asyncio
async def test_exception_delegates_to_record_failure(self):
"""Unexpected exception path also calls record_retrieval_failure."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
adapter = AsyncMock()
adapter.fetch = AsyncMock(side_effect=RuntimeError("Unexpected crash"))
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.record_retrieval_failure", new_callable=AsyncMock) as mock_record,
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
mock_record.assert_awaited_once()
call_kwargs = mock_record.await_args
assert "Unexpected crash" in call_kwargs[1]["error_message"]
# ---------------------------------------------------------------------------
# Test: Content hash deduplication skip
# ---------------------------------------------------------------------------
class TestContentHashDedup:
"""Verify that a previously-seen content hash causes the job to skip processing."""
@pytest.mark.asyncio
async def test_duplicate_content_hash_skips_processing(self):
"""When Redis reports the content hash is already seen, skip persistence."""
pool = _mock_pool()
rds = _mock_redis()
# Simulate content hash already in Redis
rds.get = AsyncMock(return_value=b"1")
minio_client = _mock_minio()
result = _make_adapter_result(content_hash="already-seen-hash")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"),
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock) as mock_persist,
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock) as mock_dedupe,
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# Items should NOT be persisted
mock_persist.assert_not_awaited()
# Cross-source dedupe should NOT be called
mock_dedupe.assert_not_awaited()
# No parsing jobs enqueued
rds.rpush.assert_not_awaited()
# Ingestion run updated with items_new=0
update_calls = [
c for c in pool.execute.await_args_list
if "items_new=0" in str(c)
]
assert len(update_calls) == 1
@pytest.mark.asyncio
async def test_no_content_hash_skips_dedupe_check(self):
"""When content_hash is empty, the Redis dedupe check is skipped."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
items = [{"title": "Article"}]
result = _make_adapter_result(items=items, content_hash="")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"),
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(1, ["doc-1"])),
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(items, [])),
patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock),
patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock),
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# Redis get should not be called for dedupe (empty content_hash)
rds.get.assert_not_awaited()
# But items should still be persisted
# (persist_ingestion_items was called via the patch)
# ---------------------------------------------------------------------------
# Test: Cross-source dedup via dedupe_items
# ---------------------------------------------------------------------------
class TestCrossSourceDedup:
"""Verify cross-source deduplication partitions items correctly."""
@pytest.mark.asyncio
async def test_dedupe_items_filters_duplicates(self):
"""When dedupe_items finds duplicates, only new items are persisted."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
all_items = [
{"title": "New Article", "url": "https://example.com/new"},
{"title": "Dup Article", "url": "https://example.com/dup", "_dedupe_existing_id": "existing-doc"},
]
new_items = [all_items[0]]
dup_items = [all_items[1]]
result = _make_adapter_result(items=all_items, content_hash="hash-x")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"),
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(new_items, dup_items)) as mock_dedupe,
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(1, ["doc-new"])) as mock_persist,
patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock),
patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock),
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# dedupe_items was called with all items
mock_dedupe.assert_awaited_once_with(pool, rds, all_items)
# persist_ingestion_items received only the new items
persist_call = mock_persist.await_args
assert persist_call[1]["items"] == new_items
@pytest.mark.asyncio
async def test_market_api_skips_cross_source_dedupe(self):
"""Market API jobs should NOT go through cross-source deduplication."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
items = [{"close": 150.0, "volume": 1000000}]
result = _make_adapter_result(
source_type="market_api",
items=items,
content_hash="market-hash",
)
adapter = _mock_adapter(result)
adapters = {"market_api": adapter}
job = _make_job(source_type="market_api")
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"),
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock) as mock_dedupe,
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(1, ["snap-1"])),
patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock),
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# Cross-source dedupe should NOT be called for market_api
mock_dedupe.assert_not_awaited()
# No parsing jobs enqueued for market data
rds.rpush.assert_not_awaited()
# ---------------------------------------------------------------------------
# Test: Error handling paths
# ---------------------------------------------------------------------------
class TestErrorHandling:
"""Verify error handling for unexpected exceptions and missing adapters."""
@pytest.mark.asyncio
async def test_unknown_source_type_returns_early(self):
"""A job with no matching adapter should log a warning and return."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
adapters = {"news_api": _mock_adapter()}
job = _make_job(source_type="unknown_type")
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# No ingestion run should be created for unknown adapter
pool.fetchval.assert_not_awaited()
@pytest.mark.asyncio
async def test_unexpected_exception_records_failure(self):
"""An unexpected exception during processing records the failure."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
adapter = AsyncMock()
adapter.fetch = AsyncMock(side_effect=ConnectionError("DB gone"))
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.record_retrieval_failure", new_callable=AsyncMock) as mock_record,
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
mock_record.assert_awaited_once()
call_kwargs = mock_record.await_args
assert "DB gone" in call_kwargs[1]["error_message"]
assert call_kwargs[1]["source_id"] == "src-1"
@pytest.mark.asyncio
async def test_upload_failure_records_error(self):
"""If MinIO upload raises, the exception handler records the failure."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
result = _make_adapter_result()
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.upload_raw_artifact", side_effect=OSError("MinIO unreachable")),
patch("services.ingestion.worker.record_retrieval_failure", new_callable=AsyncMock) as mock_record,
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
mock_record.assert_awaited_once()
assert "MinIO unreachable" in mock_record.await_args[1]["error_message"]
# ---------------------------------------------------------------------------
# Edge-case tests (Task 2.2)
# Requirements: 2.1, 2.4
# ---------------------------------------------------------------------------
class TestEmptyAdapterResponse:
"""Verify behaviour when the adapter returns zero items but no error."""
@pytest.mark.asyncio
async def test_empty_items_still_uploads_artifact(self):
"""An adapter returning items=[] should upload the raw artifact and complete."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
result = _make_adapter_result(items=[], content_hash="empty-hash")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path") as mock_upload,
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(0, [])) as mock_persist,
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=([], [])) as mock_dedupe,
patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock),
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# Raw artifact should still be uploaded
mock_upload.assert_called_once()
# Cross-source dedupe called with empty list
mock_dedupe.assert_awaited_once_with(pool, rds, [])
# Persist called with empty items
mock_persist.assert_awaited_once()
# No parsing jobs enqueued (no new doc IDs)
rds.rpush.assert_not_awaited()
# Ingestion run marked completed
update_calls = [
c for c in pool.execute.await_args_list
if "completed" in str(c)
]
assert len(update_calls) >= 1
@pytest.mark.asyncio
async def test_empty_items_with_no_content_hash(self):
"""Empty items and empty content_hash should skip dedupe check and complete."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
result = _make_adapter_result(items=[], content_hash="")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"),
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(0, [])),
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=([], [])),
patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock),
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# Redis get should NOT be called (empty content_hash skips dedupe)
rds.get.assert_not_awaited()
# Redis set should NOT be called (no hash to store)
rds.set.assert_not_awaited()
class TestPartialPersistenceFailures:
"""Verify behaviour when persist_ingestion_items returns fewer IDs than items."""
@pytest.mark.asyncio
async def test_partial_persist_enqueues_only_new_docs(self):
"""If 3 items are passed but only 2 are new, only 2 parsing jobs are enqueued."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
items = [
{"title": "Article A", "url": "https://example.com/a"},
{"title": "Article B", "url": "https://example.com/b"},
{"title": "Article C", "url": "https://example.com/c"},
]
result = _make_adapter_result(items=items, content_hash="partial-hash")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
# persist returns only 2 new IDs (one item was a DB-level duplicate)
new_ids = ["doc-a", "doc-c"]
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"),
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(items, [])),
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(2, new_ids)),
patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock) as mock_mark,
patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock),
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# Only 2 parsing jobs enqueued (matching new_ids count)
assert rds.rpush.await_count == 2
# mark_as_seen called for each new doc
assert mock_mark.await_count == 2
class TestMultipleItemsSingleJob:
"""Verify correct handling of multiple items in a single ingestion job."""
@pytest.mark.asyncio
async def test_multiple_items_all_enqueued_for_parsing(self):
"""Five new items should produce five parsing queue entries."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
items = [{"title": f"Article {i}", "url": f"https://example.com/{i}"} for i in range(5)]
new_ids = [f"doc-{i}" for i in range(5)]
result = _make_adapter_result(items=items, content_hash="multi-hash")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"),
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(items, [])),
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(5, new_ids)),
patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock) as mock_mark,
patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock),
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# All 5 items enqueued for parsing
assert rds.rpush.await_count == 5
for call_args in rds.rpush.await_args_list:
assert call_args[0][0] == queue_key(QUEUE_PARSING)
# mark_as_seen called for each
assert mock_mark.await_count == 5
# Ingestion run updated with correct counts
update_calls = [
c for c in pool.execute.await_args_list
if "completed" in str(c) and "items_new" in str(c)
]
assert len(update_calls) == 1
@pytest.mark.asyncio
async def test_published_utc_tracking_picks_latest(self):
"""For news_api jobs, the latest published_utc across items updates the source config."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
items = [
{"title": "Old", "url": "https://example.com/old", "published_utc": "2026-06-10T08:00:00Z"},
{"title": "New", "url": "https://example.com/new", "published_utc": "2026-06-15T12:00:00Z"},
{"title": "Mid", "url": "https://example.com/mid", "published_utc": "2026-06-12T10:00:00Z"},
]
new_ids = ["doc-old", "doc-new", "doc-mid"]
result = _make_adapter_result(items=items, content_hash="pub-hash")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"),
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(items, [])),
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(3, new_ids)),
patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock),
patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock),
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# The source config should be updated with the latest published_utc
source_update_calls = [
c for c in pool.execute.await_args_list
if "UPDATE sources" in str(c) and "last_published_at" in str(c)
]
assert len(source_update_calls) == 1
# The JSON payload should contain the latest date
update_json = source_update_calls[0][0][1]
assert "2026-06-15T12:00:00Z" in update_json
@pytest.mark.asyncio
async def test_cross_source_dup_links_company_mention(self):
"""When dedupe finds duplicates with existing IDs, the worker links them to the company."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
new_items = [{"title": "New Article", "url": "https://example.com/new"}]
dup_items = [
{"title": "Dup 1", "url": "https://example.com/dup1", "_dedupe_existing_id": "existing-doc-1"},
{"title": "Dup 2", "url": "https://example.com/dup2", "_dedupe_existing_id": "existing-doc-2"},
]
all_items = new_items + dup_items
result = _make_adapter_result(items=all_items, content_hash="dup-link-hash")
adapter = _mock_adapter(result)
adapters = {"news_api": adapter}
job = _make_job()
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"),
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(new_items, dup_items)),
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(1, ["doc-new"])),
patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock),
patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock),
patch("services.shared.metadata.persist_document_company_mention", new_callable=AsyncMock) as mock_mention,
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# persist_document_company_mention called for each dup with existing ID
assert mock_mention.await_count == 2
linked_doc_ids = {call.kwargs["document_id"] for call in mock_mention.await_args_list}
assert linked_doc_ids == {"existing-doc-1", "existing-doc-2"}
class TestMacroNewsEdgeCases:
"""Verify edge cases specific to macro_news source type."""
@pytest.mark.asyncio
async def test_macro_news_without_company_id(self):
"""Macro news jobs may lack company_id — the worker should handle this gracefully."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
items = [{"title": "Global Event", "url": "https://example.com/macro", "published_utc": "2026-06-15T12:00:00Z"}]
result = _make_adapter_result(
source_type="macro_news",
items=items,
content_hash="macro-hash",
)
adapter = _mock_adapter(result)
adapters = {"macro_news": adapter}
job = _make_job(source_type="macro_news")
# Remove company_id to simulate macro source
job.pop("company_id", None)
new_ids = ["doc-macro"]
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"),
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(items, [])),
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(1, new_ids)),
patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock),
patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock),
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# Should complete without error — parsing job enqueued
assert rds.rpush.await_count == 1
# Ingestion run created with company_id=None
create_call = pool.fetchval.await_args
assert create_call[0][1] == "src-1" # source_id
assert create_call[0][2] is None # company_id is None
@pytest.mark.asyncio
async def test_macro_news_skips_dup_company_linking(self):
"""Macro news should NOT link duplicate documents to companies (no company context)."""
pool = _mock_pool()
rds = _mock_redis()
minio_client = _mock_minio()
new_items = [{"title": "New Macro", "url": "https://example.com/new-macro"}]
dup_items = [{"title": "Dup Macro", "url": "https://example.com/dup-macro", "_dedupe_existing_id": "existing-macro"}]
all_items = new_items + dup_items
result = _make_adapter_result(
source_type="macro_news",
items=all_items,
content_hash="macro-dup-hash",
)
adapter = _mock_adapter(result)
adapters = {"macro_news": adapter}
job = _make_job(source_type="macro_news")
with (
patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"),
patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(new_items, dup_items)),
patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(1, ["doc-new-macro"])),
patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock),
patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock),
patch("services.shared.metadata.persist_document_company_mention", new_callable=AsyncMock) as mock_mention,
):
from services.ingestion.worker import process_job
await process_job(job, pool, rds, minio_client, adapters)
# macro_news is excluded from dup company linking
mock_mention.assert_not_awaited()