"""Tests for cross-source deduplication logic.

Validates the pure functions and key-building helpers in
services.shared.dedupe. Async functions that require Redis/PostgreSQL are
tested with lightweight fakes.

Requirements: 3.2, 3.3
"""

from __future__ import annotations

import pytest

from services.shared.dedupe import (
    DedupeResult,
    _hash_dedupe_key,
    _url_dedupe_key,
    check_duplicate,
    dedupe_items,
    mark_as_seen,
)
from services.shared.redis_keys import DEDUPE_PREFIX


class TestDedupeKeyBuilders:
    """Checks on the Redis key strings produced by the key helpers."""

    def test_hash_dedupe_key_format(self):
        """The hash key is the dedupe prefix and the raw hash joined by ':'."""
        key = _hash_dedupe_key("abc123")
        assert key == f"{DEDUPE_PREFIX}:abc123"

    def test_url_dedupe_key_is_hashed(self):
        """The URL key lives under '<prefix>:url:' and is stable per URL."""
        key = _url_dedupe_key("https://example.com/article")
        assert key.startswith(f"{DEDUPE_PREFIX}:url:")
        # Should be deterministic
        assert key == _url_dedupe_key("https://example.com/article")

    def test_url_dedupe_key_differs_for_different_urls(self):
        """Two different URLs must not map to the same key."""
        k1 = _url_dedupe_key("https://a.com/1")
        k2 = _url_dedupe_key("https://b.com/2")
        assert k1 != k2


class TestDedupeResult:
    """Checks on the fields exposed by DedupeResult."""

    def test_not_duplicate(self):
        """Optional fields are None when only is_duplicate is supplied."""
        r = DedupeResult(is_duplicate=False)
        assert not r.is_duplicate
        assert r.existing_document_id is None
        assert r.match_type is None

    def test_duplicate_with_details(self):
        """Every field passed to the constructor is readable afterwards."""
        r = DedupeResult(
            is_duplicate=True,
            existing_document_id="doc-123",
            match_type="canonical_url",
        )
        assert r.is_duplicate
        assert r.existing_document_id == "doc-123"
        # match_type is supplied above, so verify it round-trips as well.
        assert r.match_type == "canonical_url"


class FakeRedis:
    """Minimal async Redis fake for dedupe tests.

    Backed by a plain dict; only ``get`` and ``set`` are implemented.
    """

    def __init__(self, data: dict[str, str] | None = None):
        # Use an explicit None check so that a caller-supplied dict is always
        # the backing store, even when it starts out empty.
        self._data: dict[str, str] = {} if data is None else data

    async def get(self, key: str) -> str | None:
        """Return the stored value for *key*, or None when it is absent."""
        return self._data.get(key)

    async def set(self, key: str, value: str, ex: int | None = None) -> None:
        """Store *value* under *key*; *ex* is accepted but ignored."""
        self._data[key] = value


class FakePool:
    """Minimal async PG pool fake.

    ``fetchrow`` looks up the configured row by the string form of the first
    bind argument and returns None when there is no such row or no argument.
    """

    def __init__(self, rows: dict[str, dict | None] | None = None):
        self._rows: dict[str, dict | None] = {} if rows is None else rows

    async def fetchrow(self, query: str, *args) -> dict | None:
        """Return the row keyed by ``str(args[0])``; *query* is not inspected."""
        # Match on the first arg (content_hash or canonical_url)
        if args:
            return self._rows.get(str(args[0]))
        return None


@pytest.mark.asyncio
async def test_check_duplicate_no_match():
    """Empty Redis and empty PG fakes yield a non-duplicate result."""
    rds = FakeRedis()
    pool = FakePool()
    # NOTE(review): this call passes `url=` while the other tests pass
    # `canonical_url=` -- confirm against check_duplicate's signature.
    result = await check_duplicate(
        pool, rds, content_hash="newhash", url="https://example.com/new"
    )
    assert not result.is_duplicate


@pytest.mark.asyncio
async def test_check_duplicate_redis_hash_hit():
    """A content hash already keyed in Redis is reported as a duplicate."""
    hash_key = _hash_dedupe_key("existinghash")
    rds = FakeRedis({hash_key: "doc-abc"})
    pool = FakePool()
    result = await check_duplicate(pool, rds, content_hash="existinghash")
    assert result.is_duplicate
    assert result.existing_document_id == "doc-abc"
    assert result.match_type == "content_hash"


@pytest.mark.asyncio
async def test_check_duplicate_redis_url_hit():
    """A canonical URL already keyed in Redis is reported as a duplicate."""
    canonical = "https://example.com/article"
    url_key = _url_dedupe_key(canonical)
    rds = FakeRedis({url_key: "doc-xyz"})
    pool = FakePool()
    result = await check_duplicate(
        pool, rds, content_hash="newhash", canonical_url=canonical
    )
    assert result.is_duplicate
    assert result.existing_document_id == "doc-xyz"
    assert result.match_type == "canonical_url"


@pytest.mark.asyncio
async def test_check_duplicate_pg_hash_fallback():
    """A hash found only in the PG fake is a duplicate and is cached in Redis."""
    rds = FakeRedis()
    pool = FakePool({"pghash": {"id": "doc-pg1"}})
    result = await check_duplicate(pool, rds, content_hash="pghash")
    assert result.is_duplicate
    assert result.existing_document_id == "doc-pg1"
    assert result.match_type == "content_hash"
    # Should have warmed Redis cache
    assert rds._data.get(_hash_dedupe_key("pghash")) == "doc-pg1"


@pytest.mark.asyncio
async def test_check_duplicate_pg_url_fallback():
    """A canonical URL found only in the PG fake is reported as a duplicate."""
    canonical = "https://example.com/filing"
    rds = FakeRedis()
    pool = FakePool({canonical: {"id": "doc-pg2"}})
    result = await check_duplicate(
        pool, rds, content_hash="nomatch", canonical_url=canonical
    )
    assert result.is_duplicate
    assert result.existing_document_id == "doc-pg2"
    assert result.match_type == "canonical_url"


@pytest.mark.asyncio
async def test_dedupe_items_partitions_correctly():
    """dedupe_items should split items into new and duplicate groups."""
    existing_hash = "existinghash"
    hash_key = _hash_dedupe_key(existing_hash)
    rds = FakeRedis({hash_key: "doc-old"})
    pool = FakePool()
    items = [
        {"title": "New Article", "content_hash": "newhash", "url": "https://a.com/1"},
        {"title": "Dup Article", "content_hash": existing_hash, "url": "https://b.com/2"},
        {"title": "Another New", "content_hash": "anothernew", "url": "https://c.com/3"},
    ]
    new, dups = await dedupe_items(pool, rds, items)
    assert len(new) == 2
    assert len(dups) == 1
    assert dups[0]["title"] == "Dup Article"
    assert dups[0]["_dedupe_existing_id"] == "doc-old"


@pytest.mark.asyncio
async def test_mark_as_seen_sets_redis_keys():
    """mark_as_seen stores the document id under both the hash and URL keys."""
    rds = FakeRedis()
    await mark_as_seen(
        rds,
        content_hash="hash123",
        canonical_url="https://example.com/page",
        document_id="doc-new",
    )
    assert rds._data[_hash_dedupe_key("hash123")] == "doc-new"
    assert rds._data[_url_dedupe_key("https://example.com/page")] == "doc-new"


@pytest.mark.asyncio
async def test_mark_as_seen_handles_none_url():
    """With canonical_url=None only the hash key is written."""
    rds = FakeRedis()
    await mark_as_seen(
        rds, content_hash="hash456", canonical_url=None, document_id="doc-x"
    )
    assert rds._data[_hash_dedupe_key("hash456")] == "doc-x"
    # No URL key should be set
    assert len(rds._data) == 1