"""Unit tests for ingestion worker process_job function. Covers: successful job processing, adapter error with retry, retry exhaustion → dead-letter queue (via record_retrieval_failure), content hash deduplication skip, cross-source dedup via dedupe_items, and error handling paths (unexpected exceptions). Requirements: 2.1, 2.2, 2.3, 2.4 """ from __future__ import annotations import uuid from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock, patch import pytest from services.adapters.base import AdapterResult from services.shared.redis_keys import ( QUEUE_PARSING, queue_key, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _run_id() -> uuid.UUID: return uuid.UUID("00000000-0000-0000-0000-000000000001") def _make_job( source_type: str = "news_api", ticker: str = "AAPL", source_id: str = "src-1", company_id: str = "cid-1", config: dict | None = None, ) -> dict: return { "source_type": source_type, "ticker": ticker, "source_id": source_id, "company_id": company_id, "config": config or {}, } def _make_adapter_result( source_type: str = "news_api", ticker: str = "AAPL", items: list | None = None, raw_payload: bytes = b'{"data": []}', content_hash: str = "abc123", error: str | None = None, ) -> AdapterResult: return AdapterResult( source_type=source_type, ticker=ticker, items=items if items is not None else [{"title": "Test Article", "url": "https://example.com/1"}], raw_payload=raw_payload, content_hash=content_hash, fetched_at=datetime(2026, 6, 15, 12, 0, 0, tzinfo=timezone.utc), error=error, ) def _mock_pool() -> AsyncMock: pool = AsyncMock() pool.fetchval = AsyncMock(return_value=_run_id()) pool.execute = AsyncMock(return_value="UPDATE 1") pool.fetchrow = AsyncMock(return_value=None) pool.fetch = AsyncMock(return_value=[]) return pool def _mock_redis() -> AsyncMock: rds = AsyncMock() rds.rpush = AsyncMock(return_value=1) rds.set = AsyncMock(return_value=True) rds.get = AsyncMock(return_value=None) rds.lpop = AsyncMock(return_value=None) return rds def _mock_minio() -> MagicMock: return MagicMock() def _mock_adapter(result: AdapterResult | None = None) -> AsyncMock: adapter = AsyncMock() adapter.fetch = AsyncMock(return_value=result or _make_adapter_result()) return adapter # --------------------------------------------------------------------------- # Test: Successful job processing # --------------------------------------------------------------------------- class TestSuccessfulJobProcessing: """Verify the happy path: adapter returns items, they are persisted and enqueued.""" @pytest.mark.asyncio async def test_successful_news_ingestion(self): """A news_api job with new items should persist, enqueue for parsing, and mark complete.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() items = [ {"title": "Article 1", "url": "https://example.com/1"}, {"title": "Article 2", "url": "https://example.com/2"}, ] result = _make_adapter_result(items=items, content_hash="hash1") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() new_doc_ids = ["doc-1", "doc-2"] with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path") as mock_upload, patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(2, new_doc_ids)) as mock_persist, patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(items, [])) as mock_dedupe, patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock) as mock_mark, patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock) as mock_reset, ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # Adapter was called adapter.fetch.assert_awaited_once_with("AAPL", {}) # Ingestion run was created pool.fetchval.assert_awaited_once() # Raw artifact uploaded to MinIO mock_upload.assert_called_once() # Content hash was checked in Redis and then set rds.get.assert_awaited() rds.set.assert_awaited() # Cross-source dedupe was called (news_api is not market_api/broker) mock_dedupe.assert_awaited_once() # Items were persisted mock_persist.assert_awaited_once() # New docs enqueued for parsing assert rds.rpush.await_count == len(new_doc_ids) for call_args in rds.rpush.await_args_list: assert call_args[0][0] == queue_key(QUEUE_PARSING) # mark_as_seen called for each new item assert mock_mark.await_count == len(new_doc_ids) # Retry state reset after success mock_reset.assert_awaited_once_with(pool, "src-1") # Ingestion run updated to completed update_calls = [ c for c in pool.execute.await_args_list if "completed" in str(c) ] assert len(update_calls) >= 1 # --------------------------------------------------------------------------- # Test: Adapter error with retry # --------------------------------------------------------------------------- class TestAdapterErrorWithRetry: """Verify that adapter errors are recorded as retrieval failures.""" @pytest.mark.asyncio async def test_adapter_error_records_failure(self): """When adapter returns an error, record_retrieval_failure is called.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() result = _make_adapter_result(error="API rate limited") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.record_retrieval_failure", new_callable=AsyncMock) as mock_record, ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # Failure was recorded with the error message mock_record.assert_awaited_once() call_kwargs = mock_record.await_args assert call_kwargs[1]["error_message"] == "API rate limited" assert call_kwargs[1]["run_id"] == str(_run_id()) assert call_kwargs[1]["source_id"] == "src-1" @pytest.mark.asyncio async def test_adapter_error_does_not_persist_items(self): """When adapter returns an error, no items should be persisted or enqueued.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() result = _make_adapter_result(error="Connection timeout") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.record_retrieval_failure", new_callable=AsyncMock), patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock) as mock_persist, patch("services.ingestion.worker.upload_raw_artifact") as mock_upload, ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # No items persisted, no upload mock_persist.assert_not_awaited() mock_upload.assert_not_called() # No parsing jobs enqueued rds.rpush.assert_not_awaited() # --------------------------------------------------------------------------- # Test: Retry exhaustion → dead-letter queue # --------------------------------------------------------------------------- class TestRetryExhaustion: """Verify that record_retrieval_failure handles retry exhaustion. The DLQ routing is handled inside record_retrieval_failure in the metadata module. The worker calls record_retrieval_failure on both adapter errors and unexpected exceptions. We verify the worker correctly delegates to record_retrieval_failure in both cases. """ @pytest.mark.asyncio async def test_adapter_error_delegates_to_record_failure(self): """Adapter error path calls record_retrieval_failure which manages retry state.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() result = _make_adapter_result(error="Server error 500") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.record_retrieval_failure", new_callable=AsyncMock, return_value={"exhausted": True, "retry_count": 5}) as mock_record, ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) mock_record.assert_awaited_once_with( pool, run_id=str(_run_id()), source_id="src-1", error_message="Server error 500", ) @pytest.mark.asyncio async def test_exception_delegates_to_record_failure(self): """Unexpected exception path also calls record_retrieval_failure.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() adapter = AsyncMock() adapter.fetch = AsyncMock(side_effect=RuntimeError("Unexpected crash")) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.record_retrieval_failure", new_callable=AsyncMock) as mock_record, ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) mock_record.assert_awaited_once() call_kwargs = mock_record.await_args assert "Unexpected crash" in call_kwargs[1]["error_message"] # --------------------------------------------------------------------------- # Test: Content hash deduplication skip # --------------------------------------------------------------------------- class TestContentHashDedup: """Verify that a previously-seen content hash causes the job to skip processing.""" @pytest.mark.asyncio async def test_duplicate_content_hash_skips_processing(self): """When Redis reports the content hash is already seen, skip persistence.""" pool = _mock_pool() rds = _mock_redis() # Simulate content hash already in Redis rds.get = AsyncMock(return_value=b"1") minio_client = _mock_minio() result = _make_adapter_result(content_hash="already-seen-hash") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"), patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock) as mock_persist, patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock) as mock_dedupe, ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # Items should NOT be persisted mock_persist.assert_not_awaited() # Cross-source dedupe should NOT be called mock_dedupe.assert_not_awaited() # No parsing jobs enqueued rds.rpush.assert_not_awaited() # Ingestion run updated with items_new=0 update_calls = [ c for c in pool.execute.await_args_list if "items_new=0" in str(c) ] assert len(update_calls) == 1 @pytest.mark.asyncio async def test_no_content_hash_skips_dedupe_check(self): """When content_hash is empty, the Redis dedupe check is skipped.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() items = [{"title": "Article"}] result = _make_adapter_result(items=items, content_hash="") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"), patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(1, ["doc-1"])), patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(items, [])), patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock), patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock), ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # Redis get should not be called for dedupe (empty content_hash) rds.get.assert_not_awaited() # But items should still be persisted # (persist_ingestion_items was called via the patch) # --------------------------------------------------------------------------- # Test: Cross-source dedup via dedupe_items # --------------------------------------------------------------------------- class TestCrossSourceDedup: """Verify cross-source deduplication partitions items correctly.""" @pytest.mark.asyncio async def test_dedupe_items_filters_duplicates(self): """When dedupe_items finds duplicates, only new items are persisted.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() all_items = [ {"title": "New Article", "url": "https://example.com/new"}, {"title": "Dup Article", "url": "https://example.com/dup", "_dedupe_existing_id": "existing-doc"}, ] new_items = [all_items[0]] dup_items = [all_items[1]] result = _make_adapter_result(items=all_items, content_hash="hash-x") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"), patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(new_items, dup_items)) as mock_dedupe, patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(1, ["doc-new"])) as mock_persist, patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock), patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock), ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # dedupe_items was called with all items mock_dedupe.assert_awaited_once_with(pool, rds, all_items) # persist_ingestion_items received only the new items persist_call = mock_persist.await_args assert persist_call[1]["items"] == new_items @pytest.mark.asyncio async def test_market_api_skips_cross_source_dedupe(self): """Market API jobs should NOT go through cross-source deduplication.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() items = [{"close": 150.0, "volume": 1000000}] result = _make_adapter_result( source_type="market_api", items=items, content_hash="market-hash", ) adapter = _mock_adapter(result) adapters = {"market_api": adapter} job = _make_job(source_type="market_api") with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"), patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock) as mock_dedupe, patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(1, ["snap-1"])), patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock), ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # Cross-source dedupe should NOT be called for market_api mock_dedupe.assert_not_awaited() # No parsing jobs enqueued for market data rds.rpush.assert_not_awaited() # --------------------------------------------------------------------------- # Test: Error handling paths # --------------------------------------------------------------------------- class TestErrorHandling: """Verify error handling for unexpected exceptions and missing adapters.""" @pytest.mark.asyncio async def test_unknown_source_type_returns_early(self): """A job with no matching adapter should log a warning and return.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() adapters = {"news_api": _mock_adapter()} job = _make_job(source_type="unknown_type") from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # No ingestion run should be created for unknown adapter pool.fetchval.assert_not_awaited() @pytest.mark.asyncio async def test_unexpected_exception_records_failure(self): """An unexpected exception during processing records the failure.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() adapter = AsyncMock() adapter.fetch = AsyncMock(side_effect=ConnectionError("DB gone")) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.record_retrieval_failure", new_callable=AsyncMock) as mock_record, ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) mock_record.assert_awaited_once() call_kwargs = mock_record.await_args assert "DB gone" in call_kwargs[1]["error_message"] assert call_kwargs[1]["source_id"] == "src-1" @pytest.mark.asyncio async def test_upload_failure_records_error(self): """If MinIO upload raises, the exception handler records the failure.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() result = _make_adapter_result() adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.upload_raw_artifact", side_effect=OSError("MinIO unreachable")), patch("services.ingestion.worker.record_retrieval_failure", new_callable=AsyncMock) as mock_record, ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) mock_record.assert_awaited_once() assert "MinIO unreachable" in mock_record.await_args[1]["error_message"] # --------------------------------------------------------------------------- # Edge-case tests (Task 2.2) # Requirements: 2.1, 2.4 # --------------------------------------------------------------------------- class TestEmptyAdapterResponse: """Verify behaviour when the adapter returns zero items but no error.""" @pytest.mark.asyncio async def test_empty_items_still_uploads_artifact(self): """An adapter returning items=[] should upload the raw artifact and complete.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() result = _make_adapter_result(items=[], content_hash="empty-hash") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path") as mock_upload, patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(0, [])) as mock_persist, patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=([], [])) as mock_dedupe, patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock), ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # Raw artifact should still be uploaded mock_upload.assert_called_once() # Cross-source dedupe called with empty list mock_dedupe.assert_awaited_once_with(pool, rds, []) # Persist called with empty items mock_persist.assert_awaited_once() # No parsing jobs enqueued (no new doc IDs) rds.rpush.assert_not_awaited() # Ingestion run marked completed update_calls = [ c for c in pool.execute.await_args_list if "completed" in str(c) ] assert len(update_calls) >= 1 @pytest.mark.asyncio async def test_empty_items_with_no_content_hash(self): """Empty items and empty content_hash should skip dedupe check and complete.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() result = _make_adapter_result(items=[], content_hash="") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"), patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(0, [])), patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=([], [])), patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock), ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # Redis get should NOT be called (empty content_hash skips dedupe) rds.get.assert_not_awaited() # Redis set should NOT be called (no hash to store) rds.set.assert_not_awaited() class TestPartialPersistenceFailures: """Verify behaviour when persist_ingestion_items returns fewer IDs than items.""" @pytest.mark.asyncio async def test_partial_persist_enqueues_only_new_docs(self): """If 3 items are passed but only 2 are new, only 2 parsing jobs are enqueued.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() items = [ {"title": "Article A", "url": "https://example.com/a"}, {"title": "Article B", "url": "https://example.com/b"}, {"title": "Article C", "url": "https://example.com/c"}, ] result = _make_adapter_result(items=items, content_hash="partial-hash") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() # persist returns only 2 new IDs (one item was a DB-level duplicate) new_ids = ["doc-a", "doc-c"] with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"), patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(items, [])), patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(2, new_ids)), patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock) as mock_mark, patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock), ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # Only 2 parsing jobs enqueued (matching new_ids count) assert rds.rpush.await_count == 2 # mark_as_seen called for each new doc assert mock_mark.await_count == 2 class TestMultipleItemsSingleJob: """Verify correct handling of multiple items in a single ingestion job.""" @pytest.mark.asyncio async def test_multiple_items_all_enqueued_for_parsing(self): """Five new items should produce five parsing queue entries.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() items = [{"title": f"Article {i}", "url": f"https://example.com/{i}"} for i in range(5)] new_ids = [f"doc-{i}" for i in range(5)] result = _make_adapter_result(items=items, content_hash="multi-hash") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"), patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(items, [])), patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(5, new_ids)), patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock) as mock_mark, patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock), ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # All 5 items enqueued for parsing assert rds.rpush.await_count == 5 for call_args in rds.rpush.await_args_list: assert call_args[0][0] == queue_key(QUEUE_PARSING) # mark_as_seen called for each assert mock_mark.await_count == 5 # Ingestion run updated with correct counts update_calls = [ c for c in pool.execute.await_args_list if "completed" in str(c) and "items_new" in str(c) ] assert len(update_calls) == 1 @pytest.mark.asyncio async def test_published_utc_tracking_picks_latest(self): """For news_api jobs, the latest published_utc across items updates the source config.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() items = [ {"title": "Old", "url": "https://example.com/old", "published_utc": "2026-06-10T08:00:00Z"}, {"title": "New", "url": "https://example.com/new", "published_utc": "2026-06-15T12:00:00Z"}, {"title": "Mid", "url": "https://example.com/mid", "published_utc": "2026-06-12T10:00:00Z"}, ] new_ids = ["doc-old", "doc-new", "doc-mid"] result = _make_adapter_result(items=items, content_hash="pub-hash") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"), patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(items, [])), patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(3, new_ids)), patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock), patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock), ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # The source config should be updated with the latest published_utc source_update_calls = [ c for c in pool.execute.await_args_list if "UPDATE sources" in str(c) and "last_published_at" in str(c) ] assert len(source_update_calls) == 1 # The JSON payload should contain the latest date update_json = source_update_calls[0][0][1] assert "2026-06-15T12:00:00Z" in update_json @pytest.mark.asyncio async def test_cross_source_dup_links_company_mention(self): """When dedupe finds duplicates with existing IDs, the worker links them to the company.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() new_items = [{"title": "New Article", "url": "https://example.com/new"}] dup_items = [ {"title": "Dup 1", "url": "https://example.com/dup1", "_dedupe_existing_id": "existing-doc-1"}, {"title": "Dup 2", "url": "https://example.com/dup2", "_dedupe_existing_id": "existing-doc-2"}, ] all_items = new_items + dup_items result = _make_adapter_result(items=all_items, content_hash="dup-link-hash") adapter = _mock_adapter(result) adapters = {"news_api": adapter} job = _make_job() with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"), patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(new_items, dup_items)), patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(1, ["doc-new"])), patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock), patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock), patch("services.shared.metadata.persist_document_company_mention", new_callable=AsyncMock) as mock_mention, ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # persist_document_company_mention called for each dup with existing ID assert mock_mention.await_count == 2 linked_doc_ids = {call.kwargs["document_id"] for call in mock_mention.await_args_list} assert linked_doc_ids == {"existing-doc-1", "existing-doc-2"} class TestMacroNewsEdgeCases: """Verify edge cases specific to macro_news source type.""" @pytest.mark.asyncio async def test_macro_news_without_company_id(self): """Macro news jobs may lack company_id — the worker should handle this gracefully.""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() items = [{"title": "Global Event", "url": "https://example.com/macro", "published_utc": "2026-06-15T12:00:00Z"}] result = _make_adapter_result( source_type="macro_news", items=items, content_hash="macro-hash", ) adapter = _mock_adapter(result) adapters = {"macro_news": adapter} job = _make_job(source_type="macro_news") # Remove company_id to simulate macro source job.pop("company_id", None) new_ids = ["doc-macro"] with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"), patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(items, [])), patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(1, new_ids)), patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock), patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock), ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # Should complete without error — parsing job enqueued assert rds.rpush.await_count == 1 # Ingestion run created with company_id=None create_call = pool.fetchval.await_args assert create_call[0][1] == "src-1" # source_id assert create_call[0][2] is None # company_id is None @pytest.mark.asyncio async def test_macro_news_skips_dup_company_linking(self): """Macro news should NOT link duplicate documents to companies (no company context).""" pool = _mock_pool() rds = _mock_redis() minio_client = _mock_minio() new_items = [{"title": "New Macro", "url": "https://example.com/new-macro"}] dup_items = [{"title": "Dup Macro", "url": "https://example.com/dup-macro", "_dedupe_existing_id": "existing-macro"}] all_items = new_items + dup_items result = _make_adapter_result( source_type="macro_news", items=all_items, content_hash="macro-dup-hash", ) adapter = _mock_adapter(result) adapters = {"macro_news": adapter} job = _make_job(source_type="macro_news") with ( patch("services.ingestion.worker.upload_raw_artifact", return_value="s3://bucket/path"), patch("services.ingestion.worker.dedupe_items", new_callable=AsyncMock, return_value=(new_items, dup_items)), patch("services.ingestion.worker.persist_ingestion_items", new_callable=AsyncMock, return_value=(1, ["doc-new-macro"])), patch("services.ingestion.worker.mark_as_seen", new_callable=AsyncMock), patch("services.ingestion.worker.reset_source_retry_state", new_callable=AsyncMock), patch("services.shared.metadata.persist_document_company_mention", new_callable=AsyncMock) as mock_mention, ): from services.ingestion.worker import process_job await process_job(job, pool, rds, minio_client, adapters) # macro_news is excluded from dup company linking mock_mention.assert_not_awaited()