199 lines
6.3 KiB
Python
199 lines
6.3 KiB
Python
"""Cross-source deduplication for articles and filings.

Detects duplicate documents across different source types (news_api,
filings_api, web_scrape) using a layered approach:

1. Redis fast-path: check content_hash and canonical_url markers for
   recently-seen documents (TTL-bounded, cheap).
2. PostgreSQL fallback: query the documents table by content_hash or
   canonical_url for durable cross-source matching.

When a duplicate is detected, the caller receives the existing document_id
so it can link additional company mentions without re-inserting the document.

Requirements: 3.2, 3.3
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
import asyncpg
|
|
import redis.asyncio as aioredis
|
|
|
|
from services.shared.content import content_hash_str, normalize_url
|
|
from services.shared.redis_keys import DEDUPE_PREFIX
|
|
|
|
# Module-level logger; every dedupe log record is emitted under the fixed
# name "dedupe".
logger = logging.getLogger("dedupe")

# Lifetime of a Redis dedupe marker, in seconds (24 hours). After this the
# marker expires and lookups fall through to PostgreSQL.
DEDUPE_TTL_SECONDS: int = 24 * 60 * 60
|
|
|
|
|
|
def _url_dedupe_key(canonical_url: str) -> str:
    """Return the Redis marker key used to dedupe by canonical URL.

    The URL is not embedded verbatim: it is first passed through
    ``content_hash_str`` and the result is placed under the ``url``
    sub-namespace of ``DEDUPE_PREFIX``.
    """
    url_digest = content_hash_str(canonical_url)
    return "{}:url:{}".format(DEDUPE_PREFIX, url_digest)
|
|
|
|
|
|
def _hash_dedupe_key(content_hash: str) -> str:
    """Return the Redis marker key used to dedupe by content hash.

    The hash is appended directly to ``DEDUPE_PREFIX`` (no sub-namespace),
    unlike URL markers which live under ``<prefix>:url:``.
    """
    return "{}:{}".format(DEDUPE_PREFIX, content_hash)
|
|
|
|
|
|
@dataclass
|
|
class DedupeResult:
|
|
"""Result of a deduplication check."""
|
|
|
|
is_duplicate: bool
|
|
existing_document_id: str | None = None
|
|
match_type: str | None = None # "content_hash" | "canonical_url" | None
|
|
|
|
|
|
def _decode_cached_id(raw: bytes | str) -> str:
    """Normalize a value read from a Redis dedupe marker to a ``str`` ID.

    redis-py returns ``bytes`` from ``GET`` unless the client was created
    with ``decode_responses=True``. Calling ``str()`` on bytes yields the
    literal repr (``"b'...'"``) instead of the stored ID, so bytes are
    decoded explicitly. Values that are already ``str`` pass through as-is.
    """
    if isinstance(raw, bytes):
        return raw.decode("utf-8")
    return str(raw)


async def check_duplicate(
    pool: asyncpg.Pool,
    rds: aioredis.Redis,
    *,
    content_hash: str,
    url: str | None = None,
    canonical_url: str | None = None,
) -> DedupeResult:
    """Check whether a document is a duplicate across all source types.

    Checks in order of cost, returning on the first hit:
      1. Redis content_hash marker (fast path)
      2. Redis canonical_url marker (fast path)
      3. PostgreSQL documents.content_hash (durable)
      4. PostgreSQL documents.canonical_url (cross-source)

    A PostgreSQL hit re-writes the Redis markers so later checks for the
    same document can be answered from Redis.

    Args:
        pool: asyncpg pool used for the ``documents`` table lookups.
        rds: Async Redis client holding the dedupe markers.
        content_hash: Content hash of the document; a falsy value skips
            both content-hash checks.
        url: Raw document URL; normalized with ``normalize_url`` only when
            ``canonical_url`` is not supplied.
        canonical_url: Already-normalized URL; takes precedence over ``url``.

    Returns:
        A ``DedupeResult``. On a hit, ``existing_document_id`` is the
        matched document's ID as a plain string (never a bytes repr) and
        ``match_type`` names the identifier that matched.
    """
    # Resolve canonical URL if only raw URL provided
    resolved_canonical = canonical_url or (normalize_url(url) if url else None)

    # --- Redis fast path: content hash ---
    if content_hash:
        cached_id = await rds.get(_hash_dedupe_key(content_hash))
        if cached_id:
            logger.debug("Dedupe hit (redis content_hash) for %s", content_hash[:16])
            return DedupeResult(
                is_duplicate=True,
                existing_document_id=_decode_cached_id(cached_id),
                match_type="content_hash",
            )

    # --- Redis fast path: canonical URL ---
    if resolved_canonical:
        cached_id = await rds.get(_url_dedupe_key(resolved_canonical))
        if cached_id:
            logger.debug(
                "Dedupe hit (redis canonical_url) for %s", resolved_canonical[:60]
            )
            return DedupeResult(
                is_duplicate=True,
                existing_document_id=_decode_cached_id(cached_id),
                match_type="canonical_url",
            )

    # --- PostgreSQL fallback: content hash ---
    if content_hash:
        row = await pool.fetchrow(
            "SELECT id FROM documents WHERE content_hash = $1 LIMIT 1",
            content_hash,
        )
        if row:
            doc_id = str(row["id"])
            # Warm the Redis cache for future checks
            await _set_dedupe_markers(rds, content_hash, resolved_canonical, doc_id)
            logger.debug("Dedupe hit (pg content_hash) for %s", content_hash[:16])
            return DedupeResult(
                is_duplicate=True,
                existing_document_id=doc_id,
                match_type="content_hash",
            )

    # --- PostgreSQL fallback: canonical URL ---
    if resolved_canonical:
        row = await pool.fetchrow(
            "SELECT id FROM documents WHERE canonical_url = $1 LIMIT 1",
            resolved_canonical,
        )
        if row:
            doc_id = str(row["id"])
            # Warm both markers: the incoming content_hash is also pointed at
            # the existing document so the next check is a Redis hit.
            await _set_dedupe_markers(rds, content_hash, resolved_canonical, doc_id)
            logger.debug(
                "Dedupe hit (pg canonical_url) for %s", resolved_canonical[:60]
            )
            return DedupeResult(
                is_duplicate=True,
                existing_document_id=doc_id,
                match_type="canonical_url",
            )

    return DedupeResult(is_duplicate=False)
|
|
|
|
|
|
async def mark_as_seen(
    rds: aioredis.Redis,
    *,
    content_hash: str,
    canonical_url: str | None,
    document_id: str,
) -> None:
    """Record a freshly persisted document in Redis.

    Public, keyword-only entry point that delegates to
    ``_set_dedupe_markers`` so later duplicate checks for this document can
    be served from the Redis fast path.
    """
    await _set_dedupe_markers(
        rds,
        content_hash=content_hash,
        canonical_url=canonical_url,
        document_id=document_id,
    )
|
|
|
|
|
|
async def _set_dedupe_markers(
    rds: aioredis.Redis,
    content_hash: str | None,
    canonical_url: str | None,
    document_id: str,
) -> None:
    """Write the Redis dedupe markers that point at ``document_id``.

    One marker is written per identifier that is present (truthy): the
    content-hash marker first, then the canonical-URL marker. Each is stored
    with an expiry of ``DEDUPE_TTL_SECONDS``.
    """
    identifier_sources = (
        (content_hash, _hash_dedupe_key),
        (canonical_url, _url_dedupe_key),
    )
    for identifier, build_key in identifier_sources:
        if not identifier:
            # Nothing to index under this identifier.
            continue
        await rds.set(build_key(identifier), document_id, ex=DEDUPE_TTL_SECONDS)
|
|
|
|
|
|
async def dedupe_items(
    pool: asyncpg.Pool,
    rds: aioredis.Redis,
    items: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Split ingestion items into not-yet-seen and already-known documents.

    Each item is looked up with ``check_duplicate`` using these keys:
      - ``content_hash`` (defaults to ``""`` when absent)
      - ``url``, falling back to ``link``
      - ``canonical_url``

    Items found to be duplicates are mutated in place: ``_dedupe_match_type``
    and ``_dedupe_existing_id`` are added before the item is placed in the
    duplicate list.

    NOTE: items are checked one at a time, in order, and this function does
    not itself write markers for new items, so two identical unseen items in
    the same batch are both returned as new.

    Returns:
        ``(new_items, duplicate_items)``, each preserving input order.
    """
    fresh: list[dict[str, Any]] = []
    repeats: list[dict[str, Any]] = []

    for entry in items:
        outcome = await check_duplicate(
            pool,
            rds,
            content_hash=entry.get("content_hash", ""),
            url=entry.get("url") or entry.get("link"),
            canonical_url=entry.get("canonical_url"),
        )

        if not outcome.is_duplicate:
            fresh.append(entry)
            continue

        # Annotate the duplicate so the caller can link to the stored document.
        entry["_dedupe_match_type"] = outcome.match_type
        entry["_dedupe_existing_id"] = outcome.existing_document_id
        repeats.append(entry)

    return fresh, repeats
|