"""Metadata persistence for market payloads, documents, and broker events. Persists structured metadata records to PostgreSQL for all ingested artifacts. Each source type has its own persistence path: - market_api → market_snapshots table - news_api / filings_api / web_scrape → documents + document_company_mentions - broker → order_events or market_snapshots (for position/account snapshots) Requirements: 3.3, 3.4, 8.3, 9.2 """ from __future__ import annotations import json import logging from datetime import datetime, timedelta, timezone from typing import Any import asyncpg from services.shared.content import content_hash_str, normalize_url logger = logging.getLogger("metadata") async def persist_market_snapshot( pool: asyncpg.Pool, *, company_id: str | None, ticker: str, snapshot_type: str, data: dict[str, Any], source_provider: str, storage_ref: str, content_hash: str, captured_at: datetime | None = None, ) -> str: """Persist a market data snapshot to PostgreSQL. Returns the snapshot row UUID. """ ts = captured_at or datetime.now(timezone.utc) row_id = await pool.fetchval( """INSERT INTO market_snapshots (company_id, ticker, snapshot_type, data, source_provider, captured_at, storage_ref, content_hash) VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7, $8) RETURNING id""", company_id, ticker, snapshot_type, json.dumps(data), source_provider, ts, storage_ref, content_hash, ) logger.debug("Persisted market snapshot %s for %s", row_id, ticker) return str(row_id) async def persist_document( pool: asyncpg.Pool, *, document_type: str, source_type: str, publisher: str, url: str | None, canonical_url: str | None, title: str, published_at: datetime | None, content_hash: str, storage_ref: str, language: str = "en", ) -> str | None: """Persist a document metadata record to PostgreSQL. Returns the document row UUID, or None if a duplicate content_hash exists. """ exists = await pool.fetchval( "SELECT 1 FROM documents WHERE content_hash = $1", content_hash ) if exists: return None doc_id = await pool.fetchval( """INSERT INTO documents (document_type, source_type, publisher, url, canonical_url, title, published_at, content_hash, raw_storage_ref, language, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, 'ingested') RETURNING id""", document_type, source_type, publisher, url, canonical_url, title, published_at, content_hash, storage_ref, language, ) logger.debug("Persisted document %s (%s)", doc_id, title[:60] if title else "") return str(doc_id) async def update_document_parse_results( pool: asyncpg.Pool, *, document_id: str, normalized_storage_ref: str | None, parser_output_ref: str | None, parse_quality_score: float, parse_confidence: str, status: str, ) -> None: """Update a document row with parser output references and quality scores. Called after the parsing stage to persist normalized text location, structured parser output location, quality score, and confidence. Requirements: 4.1, 4.3, 9.1 """ await pool.execute( """UPDATE documents SET normalized_storage_ref = $2, parser_output_ref = $3, parse_quality_score = $4, parse_confidence = $5, status = $6, updated_at = NOW() WHERE id = $1""", document_id, normalized_storage_ref, parser_output_ref, parse_quality_score, parse_confidence, status, ) logger.debug( "Updated document %s parse results: quality=%.2f confidence=%s status=%s", document_id, parse_quality_score, parse_confidence, status, ) async def persist_document_company_mention( pool: asyncpg.Pool, *, document_id: str, company_id: str, ticker: str, mention_type: str = "direct", confidence: float = 1.0, ) -> str: """Link a document to a company via document_company_mentions. Returns the mention row UUID. """ mention_id = await pool.fetchval( """INSERT INTO document_company_mentions (document_id, company_id, ticker, mention_type, confidence) VALUES ($1::uuid, $2::uuid, $3, $4, $5) RETURNING id""", document_id, company_id, ticker, mention_type, confidence, ) return str(mention_id) async def persist_broker_event( pool: asyncpg.Pool, *, ticker: str, event_type: str, data: dict[str, Any], source_provider: str, storage_ref: str, content_hash: str, captured_at: datetime | None = None, ) -> str: """Persist a broker event snapshot to market_snapshots. Broker position/account snapshots are stored as market_snapshots with snapshot_type prefixed by 'broker_' (e.g. broker_positions, broker_account, broker_orders). Returns the snapshot row UUID. """ ts = captured_at or datetime.now(timezone.utc) row_id = await pool.fetchval( """INSERT INTO market_snapshots (ticker, snapshot_type, data, source_provider, captured_at, storage_ref, content_hash) VALUES ($1, $2, $3::jsonb, $4, $5, $6, $7) RETURNING id""", ticker, f"broker_{event_type}", json.dumps(data), source_provider, ts, storage_ref, content_hash, ) logger.debug("Persisted broker event %s for %s", row_id, ticker) return str(row_id) def _resolve_document_type(source_type: str) -> str: """Map source_type to a document_type value.""" mapping = { "news_api": "article", "filings_api": "filing", "web_scrape": "press_release", } return mapping.get(source_type, "article") def _extract_publisher(item: dict[str, Any]) -> str: """Extract publisher name from an adapter item dict.""" if item.get("publisher"): return str(item["publisher"]) source = item.get("source") if isinstance(source, dict): return source.get("name", "") if source: return str(source) return "" def _parse_published_at(item: dict[str, Any]) -> datetime | None: """Parse published_at from various adapter item formats.""" raw = item.get("publishedAt") or item.get("published_at") if not raw: return None if isinstance(raw, datetime): return raw try: return datetime.fromisoformat(str(raw).replace("Z", "+00:00")) except (ValueError, TypeError): return None async def persist_ingestion_items( pool: asyncpg.Pool, *, source_type: str, ticker: str, company_id: str | None, items: list[dict[str, Any]], storage_ref: str, adapter_metadata: dict[str, Any], content_hash: str, ) -> tuple[int, list[str]]: """Route ingestion items to the correct persistence path. Returns (new_item_count, list_of_new_ids). """ if source_type == "market_api": return await _persist_market_items( pool, ticker=ticker, company_id=company_id, items=items, storage_ref=storage_ref, provider=adapter_metadata.get("provider", "unknown"), content_hash=content_hash, ) if source_type == "broker": return await _persist_broker_items( pool, ticker=ticker, items=items, storage_ref=storage_ref, provider=adapter_metadata.get("provider", "unknown"), endpoint=adapter_metadata.get("endpoint", "positions"), content_hash=content_hash, ) # Document types: news_api, filings_api, web_scrape return await _persist_document_items( pool, source_type=source_type, ticker=ticker, company_id=company_id, items=items, storage_ref=storage_ref, ) async def _persist_market_items( pool: asyncpg.Pool, *, ticker: str, company_id: str | None, items: list[dict[str, Any]], storage_ref: str, provider: str, content_hash: str, ) -> tuple[int, list[str]]: """Persist market data items as market_snapshots rows.""" ids: list[str] = [] for item in items: item_hash = content_hash_str(json.dumps(item, sort_keys=True)) # Skip duplicates exists = await pool.fetchval( "SELECT 1 FROM market_snapshots WHERE content_hash = $1", item_hash ) if exists: continue snapshot_type = _infer_market_snapshot_type(item) row_id = await persist_market_snapshot( pool, company_id=company_id, ticker=ticker, snapshot_type=snapshot_type, data=item, source_provider=provider, storage_ref=storage_ref, content_hash=item_hash, ) ids.append(row_id) return len(ids), ids def _infer_market_snapshot_type(item: dict[str, Any]) -> str: """Infer snapshot_type from market data item fields.""" # Polygon aggregate bars have 'o', 'h', 'l', 'c' fields if all(k in item for k in ("o", "h", "l", "c")): return "bar" # Ticker details have 'market_cap' or 'sic_code' if "market_cap" in item or "sic_code" in item: return "ticker_details" # Quote snapshots if "ask" in item or "bid" in item: return "quote" return "snapshot" async def _persist_broker_items( pool: asyncpg.Pool, *, ticker: str, items: list[dict[str, Any]], storage_ref: str, provider: str, endpoint: str, content_hash: str, ) -> tuple[int, list[str]]: """Persist broker fetch items as market_snapshots with broker_ prefix.""" ids: list[str] = [] for item in items: item_hash = content_hash_str(json.dumps(item, sort_keys=True)) exists = await pool.fetchval( "SELECT 1 FROM market_snapshots WHERE content_hash = $1", item_hash ) if exists: continue row_id = await persist_broker_event( pool, ticker=ticker, event_type=endpoint, data=item, source_provider=provider, storage_ref=storage_ref, content_hash=item_hash, ) ids.append(row_id) return len(ids), ids async def _persist_document_items( pool: asyncpg.Pool, *, source_type: str, ticker: str, company_id: str | None, items: list[dict[str, Any]], storage_ref: str, ) -> tuple[int, list[str]]: """Persist document items (news, filings, web scrape) to documents table.""" doc_type = _resolve_document_type(source_type) ids: list[str] = [] for item in items: item_hash = item.get("content_hash") or content_hash_str( json.dumps(item, sort_keys=True) ) title = item.get("title", item.get("name", "")) url = item.get("url", item.get("link", "")) canonical_url = item.get("canonical_url") or ( normalize_url(url) if url else None ) published_at = _parse_published_at(item) publisher = _extract_publisher(item) doc_id = await persist_document( pool, document_type=doc_type, source_type=source_type, publisher=publisher, url=url or None, canonical_url=canonical_url, title=title, published_at=published_at, content_hash=item_hash, storage_ref=storage_ref, ) if doc_id is None: continue # Link document to company if we have a company_id if company_id: await persist_document_company_mention( pool, document_id=doc_id, company_id=company_id, ticker=ticker, ) ids.append(doc_id) return len(ids), ids # --- Retry and failure tracking (Requirement 3.4) --- # Backoff constants — match scheduler defaults for consistency RETRY_BACKOFF_BASE: int = 60 RETRY_BACKOFF_MAX: int = 3600 RETRY_MAX_COUNT: int = 10 def compute_next_retry_at( retry_count: int, now: datetime | None = None, base: int = RETRY_BACKOFF_BASE, cap: int = RETRY_BACKOFF_MAX, ) -> datetime: """Compute the next eligible retry time using exponential backoff. Args: retry_count: Current retry count (before incrementing). now: Reference timestamp (defaults to UTC now). base: Base delay in seconds. cap: Maximum delay in seconds. Returns: Datetime of the next eligible retry. """ ts = now or datetime.now(timezone.utc) delay = min(base * (2 ** min(retry_count, 8)), cap) return ts + timedelta(seconds=delay) async def get_source_retry_count( pool: asyncpg.Pool, source_id: str, ) -> int: """Return the retry count from the most recent failed run for a source. If the last run succeeded or no runs exist, returns 0. """ row = await pool.fetchrow( """SELECT status, retry_count FROM ingestion_runs WHERE source_id = $1::uuid ORDER BY started_at DESC LIMIT 1""", source_id, ) if row and row["status"] == "failed": return row["retry_count"] or 0 return 0 async def record_retrieval_failure( pool: asyncpg.Pool, run_id: str, source_id: str, error_message: str, retry_count: int | None = None, now: datetime | None = None, ) -> dict[str, Any]: """Record a source retrieval failure with retry policy state. Updates the ingestion_runs row with: - error_message: the failure reason - retry_count: incremented from the previous failed run (or provided) - next_retry_at: computed via exponential backoff - status: 'failed' If retry_count is not provided, it is looked up from the most recent failed run for the same source and incremented. Returns a dict with the recorded retry state for observability. Requirement 3.4 """ ts = now or datetime.now(timezone.utc) if retry_count is None: prev_count = await get_source_retry_count(pool, source_id) retry_count = prev_count + 1 else: retry_count = retry_count + 1 next_retry = compute_next_retry_at(retry_count - 1, now=ts) exhausted = retry_count >= RETRY_MAX_COUNT await pool.execute( """UPDATE ingestion_runs SET status = 'failed', error_message = $2, retry_count = $3, next_retry_at = $4, completed_at = $5 WHERE id = $1""", run_id, error_message, retry_count, next_retry, ts, ) state = { "run_id": run_id, "source_id": source_id, "retry_count": retry_count, "next_retry_at": next_retry.isoformat(), "exhausted": exhausted, "error_message": error_message, } if exhausted: logger.warning( "Source %s exhausted retries (%d/%d): %s", source_id, retry_count, RETRY_MAX_COUNT, error_message, ) else: logger.info( "Source %s failed (retry %d/%d), next retry at %s: %s", source_id, retry_count, RETRY_MAX_COUNT, next_retry.isoformat(), error_message, ) return state async def persist_document_intelligence( pool: asyncpg.Pool, *, document_id: str, summary: str, macro_themes: list[str], novelty_score: float, source_credibility: float, extraction_warnings: list[str], confidence: float, model_provider: str, model_name: str, prompt_version: str, schema_version: str, raw_output_ref: str | None = None, prompt_ref: str | None = None, validation_status: str = "valid", validation_errors: list[str] | None = None, retry_count: int = 0, ) -> str: """Persist a document intelligence record to PostgreSQL. Returns the intelligence row UUID. Requirements: 5.3, 5.4, 9.2 """ intel_id = await pool.fetchval( """INSERT INTO document_intelligence (document_id, summary, macro_themes, novelty_score, source_credibility, extraction_warnings, confidence, model_provider, model_name, prompt_version, schema_version, raw_output_ref, prompt_ref, validation_status, validation_errors, retry_count) VALUES ($1::uuid, $2, $3::jsonb, $4, $5, $6::jsonb, $7, $8, $9, $10, $11, $12, $13, $14, $15::jsonb, $16) RETURNING id""", document_id, summary, json.dumps(macro_themes), novelty_score, source_credibility, json.dumps(extraction_warnings), confidence, model_provider, model_name, prompt_version, schema_version, raw_output_ref, prompt_ref, validation_status, json.dumps(validation_errors or []), retry_count, ) logger.debug("Persisted document intelligence %s for doc %s", intel_id, document_id) return str(intel_id) async def persist_document_impact( pool: asyncpg.Pool, *, intelligence_id: str, company_id: str, ticker: str, relevance: float, sentiment: str, impact_score: float, impact_horizon: str, catalyst_type: str, key_facts: list[str], risks: list[str], evidence_spans: list[str], ) -> str: """Persist a per-company impact record linked to a document intelligence row. Returns the impact record UUID. Requirements: 5.3, 5.5, 9.2 """ impact_id = await pool.fetchval( """INSERT INTO document_impact_records (intelligence_id, company_id, ticker, relevance, sentiment, impact_score, impact_horizon, catalyst_type, key_facts, risks, evidence_spans) VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8, $9::jsonb, $10::jsonb, $11::jsonb) RETURNING id""", intelligence_id, company_id, ticker, relevance, sentiment, impact_score, impact_horizon, catalyst_type, json.dumps(key_facts), json.dumps(risks), json.dumps(evidence_spans), ) logger.debug("Persisted impact record %s for %s", impact_id, ticker) return str(impact_id) async def update_document_status( pool: asyncpg.Pool, *, document_id: str, status: str, ) -> None: """Update the status field on a document row. Used to advance documents through the pipeline: ingested → parsed → extracted → failed. Requirements: 5.4 """ await pool.execute( """UPDATE documents SET status = $2, updated_at = NOW() WHERE id = $1::uuid""", document_id, status, ) logger.debug("Updated document %s status to %s", document_id, status) async def reset_source_retry_state( pool: asyncpg.Pool, source_id: str, ) -> None: """Reset retry state for a source after a successful run. Sets retry_count=0 and next_retry_at=NULL on the most recent run. Called after a successful ingestion to clear any accumulated backoff. """ await pool.execute( """UPDATE ingestion_runs SET retry_count = 0, next_retry_at = NULL WHERE id = ( SELECT id FROM ingestion_runs WHERE source_id = $1::uuid ORDER BY started_at DESC LIMIT 1 )""", source_id, )