Files
stonks-oracle/services/shared/metadata.py
T

697 lines
20 KiB
Python

"""Metadata persistence for market payloads, documents, and broker events.
Persists structured metadata records to PostgreSQL for all ingested artifacts.
Each source type has its own persistence path:
- market_api → market_snapshots table
- news_api / filings_api / web_scrape → documents + document_company_mentions
- broker → order_events or market_snapshots (for position/account snapshots)
Requirements: 3.3, 3.4, 8.3, 9.2
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Any
import asyncpg
from services.shared.content import content_hash_str, normalize_url
# Module logger used by the persistence helpers in this file. It uses the fixed
# name "metadata" rather than __name__ — presumably so logging config can target
# it by that name; confirm before renaming.
logger = logging.getLogger("metadata")
async def persist_market_snapshot(
    pool: asyncpg.Pool,
    *,
    company_id: str | None,
    ticker: str,
    snapshot_type: str,
    data: dict[str, Any],
    source_provider: str,
    storage_ref: str,
    content_hash: str,
    captured_at: datetime | None = None,
) -> str:
    """Insert one row into ``market_snapshots`` and return its id as a string.

    ``data`` is serialized with ``json.dumps`` and cast to ``jsonb`` in SQL.
    When ``captured_at`` is not supplied, the current UTC time is recorded.
    """
    captured = captured_at if captured_at else datetime.now(timezone.utc)
    params = (
        company_id,
        ticker,
        snapshot_type,
        json.dumps(data),
        source_provider,
        captured,
        storage_ref,
        content_hash,
    )
    new_id = await pool.fetchval(
        """INSERT INTO market_snapshots
            (company_id, ticker, snapshot_type, data, source_provider,
             captured_at, storage_ref, content_hash)
           VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7, $8)
           RETURNING id""",
        *params,
    )
    logger.debug("Persisted market snapshot %s for %s", new_id, ticker)
    return str(new_id)
async def persist_document(
    pool: asyncpg.Pool,
    *,
    document_type: str,
    source_type: str,
    publisher: str,
    url: str | None,
    canonical_url: str | None,
    title: str,
    published_at: datetime | None,
    content_hash: str,
    storage_ref: str,
    language: str = "en",
) -> str | None:
    """Persist a document metadata record to PostgreSQL.

    New rows get ``status = 'ingested'``; ``storage_ref`` is written to the
    ``raw_storage_ref`` column.

    Returns:
        The new document row UUID as a string, or ``None`` when the document
        is a duplicate — either a row with the same ``content_hash`` already
        exists, or the INSERT is rejected by a unique constraint.
    """
    # Cheap pre-check so the common duplicate case never attempts an INSERT.
    exists = await pool.fetchval(
        "SELECT 1 FROM documents WHERE content_hash = $1", content_hash
    )
    if exists:
        return None
    try:
        doc_id = await pool.fetchval(
            """INSERT INTO documents
                (document_type, source_type, publisher, url, canonical_url,
                 title, published_at, content_hash, raw_storage_ref,
                 language, status)
               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, 'ingested')
               RETURNING id""",
            document_type,
            source_type,
            publisher,
            url,
            canonical_url,
            title,
            published_at,
            content_hash,
            storage_ref,
            language,
        )
    except asyncpg.UniqueViolationError:
        # The SELECT above and this INSERT are separate statements, so a
        # concurrent worker can insert the same document in between. If a
        # unique constraint rejects our row, report it as a duplicate (None)
        # as documented, instead of aborting the caller's whole batch.
        logger.debug("Duplicate document rejected on insert (hash %s)", content_hash)
        return None
    logger.debug("Persisted document %s (%s)", doc_id, title[:60] if title else "")
    return str(doc_id)
async def update_document_parse_results(
    pool: asyncpg.Pool,
    *,
    document_id: str,
    normalized_storage_ref: str | None,
    parser_output_ref: str | None,
    parse_quality_score: float,
    parse_confidence: str,
    status: str,
) -> None:
    """Write parser outputs and quality scores onto an existing document row.

    Invoked once the parsing stage finishes. Stores the normalized-text and
    structured-parser-output storage references, the quality score, the
    confidence label and the new status, and sets ``updated_at`` to NOW().

    Requirements: 4.1, 4.3, 9.1
    """
    values = (
        document_id,
        normalized_storage_ref,
        parser_output_ref,
        parse_quality_score,
        parse_confidence,
        status,
    )
    await pool.execute(
        """UPDATE documents SET
            normalized_storage_ref = $2,
            parser_output_ref = $3,
            parse_quality_score = $4,
            parse_confidence = $5,
            status = $6,
            updated_at = NOW()
           WHERE id = $1""",
        *values,
    )
    logger.debug(
        "Updated document %s parse results: quality=%.2f confidence=%s status=%s",
        document_id,
        parse_quality_score,
        parse_confidence,
        status,
    )
async def persist_document_company_mention(
    pool: asyncpg.Pool,
    *,
    document_id: str,
    company_id: str,
    ticker: str,
    mention_type: str = "direct",
    confidence: float = 1.0,
) -> str:
    """Insert a ``document_company_mentions`` row tying a document to a company.

    Both ids are cast to ``uuid`` in SQL. Returns the new mention id as a string.
    """
    insert_sql = """INSERT INTO document_company_mentions
            (document_id, company_id, ticker, mention_type, confidence)
           VALUES ($1::uuid, $2::uuid, $3, $4, $5)
           RETURNING id"""
    new_mention = await pool.fetchval(
        insert_sql, document_id, company_id, ticker, mention_type, confidence
    )
    return str(new_mention)
async def persist_broker_event(
    pool: asyncpg.Pool,
    *,
    ticker: str,
    event_type: str,
    data: dict[str, Any],
    source_provider: str,
    storage_ref: str,
    content_hash: str,
    captured_at: datetime | None = None,
) -> str:
    """Store a broker event snapshot as a ``market_snapshots`` row.

    The row's ``snapshot_type`` is ``event_type`` prefixed with ``broker_``
    (for example ``broker_positions``, ``broker_account`` or
    ``broker_orders``). No ``company_id`` is written. Falls back to the
    current UTC time when ``captured_at`` is not given.

    Returns the new snapshot id as a string.
    """
    when = captured_at if captured_at else datetime.now(timezone.utc)
    snapshot_type = f"broker_{event_type}"
    payload = json.dumps(data)
    snapshot_id = await pool.fetchval(
        """INSERT INTO market_snapshots
            (ticker, snapshot_type, data, source_provider,
             captured_at, storage_ref, content_hash)
           VALUES ($1, $2, $3::jsonb, $4, $5, $6, $7)
           RETURNING id""",
        ticker,
        snapshot_type,
        payload,
        source_provider,
        when,
        storage_ref,
        content_hash,
    )
    logger.debug("Persisted broker event %s for %s", snapshot_id, ticker)
    return str(snapshot_id)
def _resolve_document_type(source_type: str) -> str:
"""Map source_type to a document_type value."""
mapping = {
"news_api": "article",
"filings_api": "filing",
"web_scrape": "press_release",
}
return mapping.get(source_type, "article")
def _extract_publisher(item: dict[str, Any]) -> str:
"""Extract publisher name from an adapter item dict."""
if item.get("publisher"):
return str(item["publisher"])
source = item.get("source")
if isinstance(source, dict):
return source.get("name", "")
if source:
return str(source)
return ""
def _parse_published_at(item: dict[str, Any]) -> datetime | None:
"""Parse published_at from various adapter item formats."""
raw = item.get("publishedAt") or item.get("published_at") or item.get("published_utc")
if not raw:
return None
if isinstance(raw, datetime):
return raw
try:
return datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
except (ValueError, TypeError):
return None
async def persist_ingestion_items(
    pool: asyncpg.Pool,
    *,
    source_type: str,
    ticker: str,
    company_id: str | None,
    items: list[dict[str, Any]],
    storage_ref: str,
    adapter_metadata: dict[str, Any],
    content_hash: str,
) -> tuple[int, list[str]]:
    """Dispatch adapter items to the persistence path for their source type.

    - ``market_api`` -> ``_persist_market_items`` (market_snapshots)
    - ``broker`` -> ``_persist_broker_items`` (market_snapshots, ``broker_`` prefix)
    - any other source type (news_api, filings_api, web_scrape) ->
      ``_persist_document_items`` (documents)

    ``adapter_metadata`` is only consulted for the market and broker paths.

    Returns ``(new_item_count, new_row_ids)``.
    """
    if source_type not in ("market_api", "broker"):
        # Document-style sources: news_api, filings_api, web_scrape.
        return await _persist_document_items(
            pool,
            source_type=source_type,
            ticker=ticker,
            company_id=company_id,
            items=items,
            storage_ref=storage_ref,
        )
    provider = adapter_metadata.get("provider", "unknown")
    if source_type == "broker":
        return await _persist_broker_items(
            pool,
            ticker=ticker,
            items=items,
            storage_ref=storage_ref,
            provider=provider,
            endpoint=adapter_metadata.get("endpoint", "positions"),
            content_hash=content_hash,
        )
    return await _persist_market_items(
        pool,
        ticker=ticker,
        company_id=company_id,
        items=items,
        storage_ref=storage_ref,
        provider=provider,
        content_hash=content_hash,
    )
async def _persist_market_items(
    pool: asyncpg.Pool,
    *,
    ticker: str,
    company_id: str | None,
    items: list[dict[str, Any]],
    storage_ref: str,
    provider: str,
    content_hash: str,
) -> tuple[int, list[str]]:
    """Store each market data item as its own ``market_snapshots`` row.

    Every item is hashed individually (sorted-key JSON), and an item is skipped
    when a snapshot with that hash already exists. The batch-level
    ``content_hash`` argument is not used by this function.

    NOTE(review): the per-item hash covers only the item payload, and the
    duplicate check does not filter by ticker. Identical payloads fetched for
    different tickers therefore dedupe against each other — confirm intended.

    Returns ``(inserted_count, inserted_ids)``.
    """
    inserted: list[str] = []
    for entry in items:
        entry_hash = content_hash_str(json.dumps(entry, sort_keys=True))
        already_stored = await pool.fetchval(
            "SELECT 1 FROM market_snapshots WHERE content_hash = $1", entry_hash
        )
        if not already_stored:
            inserted.append(
                await persist_market_snapshot(
                    pool,
                    company_id=company_id,
                    ticker=ticker,
                    snapshot_type=_infer_market_snapshot_type(entry),
                    data=entry,
                    source_provider=provider,
                    storage_ref=storage_ref,
                    content_hash=entry_hash,
                )
            )
    return len(inserted), inserted
def _infer_market_snapshot_type(item: dict[str, Any]) -> str:
"""Infer snapshot_type from market data item fields."""
# Polygon aggregate bars have 'o', 'h', 'l', 'c' fields
if all(k in item for k in ("o", "h", "l", "c")):
return "bar"
# Ticker details have 'market_cap' or 'sic_code'
if "market_cap" in item or "sic_code" in item:
return "ticker_details"
# Quote snapshots
if "ask" in item or "bid" in item:
return "quote"
return "snapshot"
async def _persist_broker_items(
    pool: asyncpg.Pool,
    *,
    ticker: str,
    items: list[dict[str, Any]],
    storage_ref: str,
    provider: str,
    endpoint: str,
    content_hash: str,
) -> tuple[int, list[str]]:
    """Store broker fetch items as ``broker_<endpoint>`` market snapshots.

    Each item is hashed on its own (sorted-key JSON). Items whose hash already
    exists in ``market_snapshots`` are skipped. The batch-level
    ``content_hash`` argument is not used by this function.

    Returns ``(inserted_count, inserted_ids)``.
    """
    new_ids: list[str] = []
    for payload in items:
        payload_hash = content_hash_str(json.dumps(payload, sort_keys=True))
        duplicate = await pool.fetchval(
            "SELECT 1 FROM market_snapshots WHERE content_hash = $1", payload_hash
        )
        if duplicate:
            continue
        new_ids.append(
            await persist_broker_event(
                pool,
                ticker=ticker,
                event_type=endpoint,
                data=payload,
                source_provider=provider,
                storage_ref=storage_ref,
                content_hash=payload_hash,
            )
        )
    count = len(new_ids)
    return count, new_ids
async def _persist_document_items(
    pool: asyncpg.Pool,
    *,
    source_type: str,
    ticker: str,
    company_id: str | None,
    items: list[dict[str, Any]],
    storage_ref: str,
) -> tuple[int, list[str]]:
    """Persist document items (news, filings, web scrape) to the documents table.

    For each item, this function:

    - derives a content hash (the item's own ``content_hash``, else a hash of
      its sorted-key JSON);
    - reads title, URL, publisher and publish time from adapter-specific keys;
    - inserts a ``documents`` row;
    - links the row via ``document_company_mentions`` when ``company_id`` is given.

    Items that ``persist_document`` reports as duplicates are skipped.

    NOTE(review): for a duplicate document no mention row is written, so an
    article first ingested for another company is never linked to this
    ``company_id`` — confirm intended.

    Returns ``(new_document_count, new_document_ids)``.
    """
    doc_type = _resolve_document_type(source_type)
    ids: list[str] = []
    for item in items:
        item_hash = item.get("content_hash") or content_hash_str(
            json.dumps(item, sort_keys=True)
        )
        # Use ``or`` chains (as _parse_published_at does) so a key that is
        # present but null/empty falls through to the next alias, instead of
        # passing None on as the title or blocking the URL fallbacks.
        title = item.get("title") or item.get("name") or ""
        url = item.get("url") or item.get("link") or item.get("article_url") or ""
        canonical_url = item.get("canonical_url") or (
            normalize_url(url) if url else None
        )
        published_at = _parse_published_at(item)
        publisher = _extract_publisher(item)
        doc_id = await persist_document(
            pool,
            document_type=doc_type,
            source_type=source_type,
            publisher=publisher,
            url=url or None,
            canonical_url=canonical_url,
            title=title,
            published_at=published_at,
            content_hash=item_hash,
            storage_ref=storage_ref,
        )
        if doc_id is None:
            continue
        # Link document to company if we have a company_id
        if company_id:
            await persist_document_company_mention(
                pool,
                document_id=doc_id,
                company_id=company_id,
                ticker=ticker,
            )
        ids.append(doc_id)
    return len(ids), ids
# --- Retry and failure tracking (Requirement 3.4) ---

# Backoff constants — match scheduler defaults for consistency
RETRY_BACKOFF_BASE: int = 60
RETRY_BACKOFF_MAX: int = 3600
RETRY_MAX_COUNT: int = 10


def compute_next_retry_at(
    retry_count: int,
    now: datetime | None = None,
    base: int = RETRY_BACKOFF_BASE,
    cap: int = RETRY_BACKOFF_MAX,
) -> datetime:
    """Return when the next retry becomes eligible, using capped exponential backoff.

    The delay is ``base * 2**retry_count`` seconds. The exponent is clamped to
    at most 8, and the delay to at most ``cap``.

    Args:
        retry_count: Retry count before incrementing.
        now: Reference time; defaults to the current UTC time.
        base: Base delay in seconds.
        cap: Upper bound on the delay in seconds.

    Returns:
        ``now`` plus the computed delay.
    """
    reference = now if now else datetime.now(timezone.utc)
    exponent = retry_count if retry_count < 8 else 8
    backoff_seconds = base * 2 ** exponent
    if backoff_seconds > cap:
        backoff_seconds = cap
    return reference + timedelta(seconds=backoff_seconds)
async def get_source_retry_count(
    pool: asyncpg.Pool,
    source_id: str,
    *,
    exclude_run_id: str | None = None,
) -> int:
    """Return the retry count of the most recent run for a source, if it failed.

    Only the single newest ``ingestion_runs`` row for the source (ordered by
    ``started_at``) is inspected:

    - if its status is ``'failed'``, its ``retry_count`` is returned (NULL
      counts as 0);
    - if it has any other status, or no run exists, 0 is returned.

    Args:
        pool: Database pool.
        source_id: Source UUID.
        exclude_run_id: Optional run id to ignore. Pass the id of the run that
            is currently being recorded so the lookup sees the *previous* run
            rather than the in-flight one.
    """
    if exclude_run_id is None:
        row = await pool.fetchrow(
            """SELECT status, retry_count
               FROM ingestion_runs
               WHERE source_id = $1::uuid
               ORDER BY started_at DESC
               LIMIT 1""",
            source_id,
        )
    else:
        row = await pool.fetchrow(
            """SELECT status, retry_count
               FROM ingestion_runs
               WHERE source_id = $1::uuid AND id <> $2::uuid
               ORDER BY started_at DESC
               LIMIT 1""",
            source_id,
            exclude_run_id,
        )
    if row and row["status"] == "failed":
        return row["retry_count"] or 0
    return 0


async def record_retrieval_failure(
    pool: asyncpg.Pool,
    run_id: str,
    source_id: str,
    error_message: str,
    retry_count: int | None = None,
    now: datetime | None = None,
) -> dict[str, Any]:
    """Record a source retrieval failure with retry policy state.

    Updates the ``ingestion_runs`` row ``run_id`` with:

    - status: ``'failed'``
    - error_message: the failure reason
    - retry_count: the previous count plus one
    - next_retry_at: computed via exponential backoff
    - completed_at: ``now``, which defaults to the current UTC time

    If ``retry_count`` is None, the previous count is looked up from the most
    recent *other* run of the same source. It counts only if that run failed;
    otherwise it is 0. If ``retry_count`` is given, it is treated as the
    previous count and incremented.

    Returns:
        A dict with the recorded retry state, for observability. ``exhausted``
        is True once the new count reaches ``RETRY_MAX_COUNT``.

    Requirement 3.4
    """
    ts = now or datetime.now(timezone.utc)
    if retry_count is None:
        # The run being failed already has a row (it is UPDATEd below) and is
        # presumably the newest run for this source, but it is usually not yet
        # 'failed'. Looking it up would yield 0 every time, pinning retry_count
        # at 1, so backoff would never grow and retries would never exhaust.
        # Exclude it so the count continues from the previous run.
        prev_count = await get_source_retry_count(
            pool, source_id, exclude_run_id=run_id
        )
        retry_count = prev_count + 1
    else:
        retry_count = retry_count + 1
    next_retry = compute_next_retry_at(retry_count - 1, now=ts)
    exhausted = retry_count >= RETRY_MAX_COUNT
    await pool.execute(
        """UPDATE ingestion_runs
           SET status = 'failed',
               error_message = $2,
               retry_count = $3,
               next_retry_at = $4,
               completed_at = $5
           WHERE id = $1""",
        run_id,
        error_message,
        retry_count,
        next_retry,
        ts,
    )
    state = {
        "run_id": run_id,
        "source_id": source_id,
        "retry_count": retry_count,
        "next_retry_at": next_retry.isoformat(),
        "exhausted": exhausted,
        "error_message": error_message,
    }
    if exhausted:
        logger.warning(
            "Source %s exhausted retries (%d/%d): %s",
            source_id, retry_count, RETRY_MAX_COUNT, error_message,
        )
    else:
        logger.info(
            "Source %s failed (retry %d/%d), next retry at %s: %s",
            source_id, retry_count, RETRY_MAX_COUNT,
            next_retry.isoformat(), error_message,
        )
    return state
async def persist_document_intelligence(
    pool: asyncpg.Pool,
    *,
    document_id: str,
    summary: str,
    macro_themes: list[str],
    novelty_score: float,
    source_credibility: float,
    extraction_warnings: list[str],
    confidence: float,
    model_provider: str,
    model_name: str,
    prompt_version: str,
    schema_version: str,
    raw_output_ref: str | None = None,
    prompt_ref: str | None = None,
    validation_status: str = "valid",
    validation_errors: list[str] | None = None,
    retry_count: int = 0,
) -> str:
    """Insert a ``document_intelligence`` row and return its id as a string.

    The list fields (``macro_themes``, ``extraction_warnings``,
    ``validation_errors``) are JSON-encoded and cast to ``jsonb``. A missing
    ``validation_errors`` is stored as an empty list.

    Requirements: 5.3, 5.4, 9.2
    """
    args = (
        document_id,
        summary,
        json.dumps(macro_themes),
        novelty_score,
        source_credibility,
        json.dumps(extraction_warnings),
        confidence,
        model_provider,
        model_name,
        prompt_version,
        schema_version,
        raw_output_ref,
        prompt_ref,
        validation_status,
        json.dumps(validation_errors if validation_errors else []),
        retry_count,
    )
    new_id = await pool.fetchval(
        """INSERT INTO document_intelligence
            (document_id, summary, macro_themes, novelty_score,
             source_credibility, extraction_warnings, confidence,
             model_provider, model_name, prompt_version, schema_version,
             raw_output_ref, prompt_ref, validation_status,
             validation_errors, retry_count)
           VALUES ($1::uuid, $2, $3::jsonb, $4, $5, $6::jsonb, $7,
                   $8, $9, $10, $11, $12, $13, $14, $15::jsonb, $16)
           RETURNING id""",
        *args,
    )
    logger.debug("Persisted document intelligence %s for doc %s", new_id, document_id)
    return str(new_id)
async def persist_document_impact(
    pool: asyncpg.Pool,
    *,
    intelligence_id: str,
    company_id: str,
    ticker: str,
    relevance: float,
    sentiment: str,
    impact_score: float,
    impact_horizon: str,
    catalyst_type: str,
    key_facts: list[str],
    risks: list[str],
    evidence_spans: list[str],
) -> str:
    """Insert a per-company ``document_impact_records`` row for an intelligence record.

    ``key_facts``, ``risks`` and ``evidence_spans`` are JSON-encoded and cast
    to ``jsonb``. Returns the new impact record id as a string.

    Requirements: 5.3, 5.5, 9.2
    """
    jsonb_columns = [json.dumps(key_facts), json.dumps(risks), json.dumps(evidence_spans)]
    record_id = await pool.fetchval(
        """INSERT INTO document_impact_records
            (intelligence_id, company_id, ticker, relevance, sentiment,
             impact_score, impact_horizon, catalyst_type,
             key_facts, risks, evidence_spans)
           VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8,
                   $9::jsonb, $10::jsonb, $11::jsonb)
           RETURNING id""",
        intelligence_id,
        company_id,
        ticker,
        relevance,
        sentiment,
        impact_score,
        impact_horizon,
        catalyst_type,
        *jsonb_columns,
    )
    logger.debug("Persisted impact record %s for %s", record_id, ticker)
    return str(record_id)
async def update_document_status(
    pool: asyncpg.Pool,
    *,
    document_id: str,
    status: str,
) -> None:
    """Set ``status`` on a document row and bump its ``updated_at`` to NOW().

    Moves documents along the pipeline: ingested → parsed → extracted → failed.

    Requirements: 5.4
    """
    update_sql = "UPDATE documents SET status = $2, updated_at = NOW() WHERE id = $1::uuid"
    await pool.execute(update_sql, document_id, status)
    logger.debug("Updated document %s status to %s", document_id, status)
async def reset_source_retry_state(
    pool: asyncpg.Pool,
    source_id: str,
) -> None:
    """Clear accumulated backoff for ``source_id`` after a successful ingestion.

    Only the newest ``ingestion_runs`` row for the source (by ``started_at``)
    is touched: its ``retry_count`` is set to 0 and ``next_retry_at`` to NULL.
    """
    clear_latest_run_sql = """UPDATE ingestion_runs
           SET retry_count = 0, next_retry_at = NULL
           WHERE id = (
               SELECT id FROM ingestion_runs
               WHERE source_id = $1::uuid
               ORDER BY started_at DESC
               LIMIT 1
           )"""
    await pool.execute(clear_latest_run_sql, source_id)