"""Cross-source deduplication for articles and filings. Detects duplicate documents across different source types (news_api, filings_api, web_scrape) using a layered approach: 1. Redis fast-path: check content_hash and canonical_url markers for recently-seen documents (TTL-bounded, cheap). 2. PostgreSQL fallback: query the documents table by canonical_url or content_hash for durable cross-source matching. When a duplicate is detected the caller receives the existing document_id so it can link additional company mentions without re-inserting the document. Requirements: 3.2, 3.3 """ from __future__ import annotations import logging from dataclasses import dataclass from typing import Any import asyncpg import redis.asyncio as aioredis from services.shared.content import content_hash_str, normalize_url from services.shared.redis_keys import DEDUPE_PREFIX logger = logging.getLogger("dedupe") # Redis TTL for dedupe markers (24 hours) DEDUPE_TTL_SECONDS: int = 86400 def _url_dedupe_key(canonical_url: str) -> str: """Build a Redis key for URL-based deduplication.""" return f"{DEDUPE_PREFIX}:url:{content_hash_str(canonical_url)}" def _hash_dedupe_key(content_hash: str) -> str: """Build a Redis key for content-hash-based deduplication.""" return f"{DEDUPE_PREFIX}:{content_hash}" @dataclass class DedupeResult: """Result of a deduplication check.""" is_duplicate: bool existing_document_id: str | None = None match_type: str | None = None # "content_hash" | "canonical_url" | None async def check_duplicate( pool: asyncpg.Pool, rds: aioredis.Redis, *, content_hash: str, url: str | None = None, canonical_url: str | None = None, ) -> DedupeResult: """Check whether a document is a duplicate across all source types. Checks in order of cost: 1. Redis content_hash marker (fast path) 2. Redis canonical_url marker (fast path) 3. PostgreSQL documents.content_hash (durable) 4. PostgreSQL documents.canonical_url (cross-source) Returns a DedupeResult indicating whether the document already exists. """ # Resolve canonical URL if only raw URL provided resolved_canonical = canonical_url or (normalize_url(url) if url else None) # --- Redis fast path: content hash --- if content_hash: redis_key = _hash_dedupe_key(content_hash) cached_id = await rds.get(redis_key) if cached_id: logger.debug("Dedupe hit (redis content_hash) for %s", content_hash[:16]) return DedupeResult( is_duplicate=True, existing_document_id=str(cached_id), match_type="content_hash", ) # --- Redis fast path: canonical URL --- if resolved_canonical: url_key = _url_dedupe_key(resolved_canonical) cached_id = await rds.get(url_key) if cached_id: logger.debug("Dedupe hit (redis canonical_url) for %s", resolved_canonical[:60]) return DedupeResult( is_duplicate=True, existing_document_id=str(cached_id), match_type="canonical_url", ) # --- PostgreSQL fallback: content hash --- if content_hash: row = await pool.fetchrow( "SELECT id FROM documents WHERE content_hash = $1 LIMIT 1", content_hash, ) if row: doc_id = str(row["id"]) # Warm the Redis cache for future checks await _set_dedupe_markers(rds, content_hash, resolved_canonical, doc_id) logger.debug("Dedupe hit (pg content_hash) for %s", content_hash[:16]) return DedupeResult( is_duplicate=True, existing_document_id=doc_id, match_type="content_hash", ) # --- PostgreSQL fallback: canonical URL --- if resolved_canonical: row = await pool.fetchrow( "SELECT id FROM documents WHERE canonical_url = $1 LIMIT 1", resolved_canonical, ) if row: doc_id = str(row["id"]) await _set_dedupe_markers(rds, content_hash, resolved_canonical, doc_id) logger.debug("Dedupe hit (pg canonical_url) for %s", resolved_canonical[:60]) return DedupeResult( is_duplicate=True, existing_document_id=doc_id, match_type="canonical_url", ) return DedupeResult(is_duplicate=False) async def mark_as_seen( rds: aioredis.Redis, *, content_hash: str, canonical_url: str | None, document_id: str, ) -> None: """Mark a newly-persisted document in Redis for fast future dedupe checks.""" await _set_dedupe_markers(rds, content_hash, canonical_url, document_id) async def _set_dedupe_markers( rds: aioredis.Redis, content_hash: str | None, canonical_url: str | None, document_id: str, ) -> None: """Set Redis dedupe markers for both content hash and canonical URL.""" if content_hash: await rds.set( _hash_dedupe_key(content_hash), document_id, ex=DEDUPE_TTL_SECONDS ) if canonical_url: await rds.set( _url_dedupe_key(canonical_url), document_id, ex=DEDUPE_TTL_SECONDS ) async def dedupe_items( pool: asyncpg.Pool, rds: aioredis.Redis, items: list[dict[str, Any]], ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: """Partition a list of ingestion items into new and duplicate groups. Each item is expected to have at least one of: - content_hash: SHA-256 of the raw content - url / canonical_url: the document URL Returns (new_items, duplicate_items). """ new_items: list[dict[str, Any]] = [] dup_items: list[dict[str, Any]] = [] for item in items: item_hash = item.get("content_hash", "") item_url = item.get("url") or item.get("link") item_canonical = item.get("canonical_url") result = await check_duplicate( pool, rds, content_hash=item_hash, url=item_url, canonical_url=item_canonical, ) if result.is_duplicate: item["_dedupe_match_type"] = result.match_type item["_dedupe_existing_id"] = result.existing_document_id dup_items.append(item) else: new_items.append(item) return new_items, dup_items