720 lines
21 KiB
Python
720 lines
21 KiB
Python
"""Metadata persistence for market payloads, documents, and broker events.
|
|
|
|
Persists structured metadata records to PostgreSQL for all ingested artifacts.
|
|
Each source type has its own persistence path:
|
|
- market_api → market_snapshots table
|
|
- news_api / filings_api / web_scrape → documents + document_company_mentions
|
|
- broker → order_events or market_snapshots (for position/account snapshots)
|
|
|
|
Requirements: 3.3, 3.4, 8.3, 9.2
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
import asyncpg
|
|
|
|
from services.shared.content import content_hash_str, normalize_url
|
|
|
|
logger = logging.getLogger("metadata")
|
|
|
|
|
|
async def persist_market_snapshot(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
company_id: str | None,
|
|
ticker: str,
|
|
snapshot_type: str,
|
|
data: dict[str, Any],
|
|
source_provider: str,
|
|
storage_ref: str,
|
|
content_hash: str,
|
|
captured_at: datetime | None = None,
|
|
) -> str:
|
|
"""Persist a market data snapshot to PostgreSQL.
|
|
|
|
Returns the snapshot row UUID.
|
|
"""
|
|
ts = captured_at or datetime.now(timezone.utc)
|
|
row_id = await pool.fetchval(
|
|
"""INSERT INTO market_snapshots
|
|
(company_id, ticker, snapshot_type, data, source_provider,
|
|
captured_at, storage_ref, content_hash)
|
|
VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7, $8)
|
|
RETURNING id""",
|
|
company_id,
|
|
ticker,
|
|
snapshot_type,
|
|
json.dumps(data),
|
|
source_provider,
|
|
ts,
|
|
storage_ref,
|
|
content_hash,
|
|
)
|
|
logger.debug("Persisted market snapshot %s for %s", row_id, ticker)
|
|
return str(row_id)
|
|
|
|
|
|
async def persist_document(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
document_type: str,
|
|
source_type: str,
|
|
publisher: str,
|
|
url: str | None,
|
|
canonical_url: str | None,
|
|
title: str,
|
|
published_at: datetime | None,
|
|
content_hash: str,
|
|
storage_ref: str,
|
|
language: str = "en",
|
|
) -> str | None:
|
|
"""Persist a document metadata record to PostgreSQL.
|
|
|
|
Returns the document row UUID, or None if a duplicate content_hash exists.
|
|
"""
|
|
exists = await pool.fetchval(
|
|
"SELECT 1 FROM documents WHERE content_hash = $1", content_hash
|
|
)
|
|
if exists:
|
|
return None
|
|
|
|
doc_id = await pool.fetchval(
|
|
"""INSERT INTO documents
|
|
(document_type, source_type, publisher, url, canonical_url,
|
|
title, published_at, content_hash, raw_storage_ref,
|
|
language, status)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, 'ingested')
|
|
RETURNING id""",
|
|
document_type,
|
|
source_type,
|
|
publisher,
|
|
url,
|
|
canonical_url,
|
|
title,
|
|
published_at,
|
|
content_hash,
|
|
storage_ref,
|
|
language,
|
|
)
|
|
logger.debug("Persisted document %s (%s)", doc_id, title[:60] if title else "")
|
|
return str(doc_id)
|
|
|
|
|
|
async def update_document_parse_results(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
document_id: str,
|
|
normalized_storage_ref: str | None,
|
|
parser_output_ref: str | None,
|
|
parse_quality_score: float,
|
|
parse_confidence: str,
|
|
status: str,
|
|
) -> None:
|
|
"""Update a document row with parser output references and quality scores.
|
|
|
|
Called after the parsing stage to persist normalized text location,
|
|
structured parser output location, quality score, and confidence.
|
|
|
|
Requirements: 4.1, 4.3, 9.1
|
|
"""
|
|
await pool.execute(
|
|
"""UPDATE documents SET
|
|
normalized_storage_ref = $2,
|
|
parser_output_ref = $3,
|
|
parse_quality_score = $4,
|
|
parse_confidence = $5,
|
|
status = $6,
|
|
updated_at = NOW()
|
|
WHERE id = $1""",
|
|
document_id,
|
|
normalized_storage_ref,
|
|
parser_output_ref,
|
|
parse_quality_score,
|
|
parse_confidence,
|
|
status,
|
|
)
|
|
logger.debug(
|
|
"Updated document %s parse results: quality=%.2f confidence=%s status=%s",
|
|
document_id, parse_quality_score, parse_confidence, status,
|
|
)
|
|
|
|
|
|
async def persist_document_company_mention(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
document_id: str,
|
|
company_id: str,
|
|
ticker: str,
|
|
mention_type: str = "direct",
|
|
confidence: float = 1.0,
|
|
) -> str:
|
|
"""Link a document to a company via document_company_mentions.
|
|
|
|
Returns the mention row UUID.
|
|
"""
|
|
mention_id = await pool.fetchval(
|
|
"""INSERT INTO document_company_mentions
|
|
(document_id, company_id, ticker, mention_type, confidence)
|
|
VALUES ($1::uuid, $2::uuid, $3, $4, $5)
|
|
RETURNING id""",
|
|
document_id,
|
|
company_id,
|
|
ticker,
|
|
mention_type,
|
|
confidence,
|
|
)
|
|
return str(mention_id)
|
|
|
|
|
|
async def persist_broker_event(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
ticker: str,
|
|
event_type: str,
|
|
data: dict[str, Any],
|
|
source_provider: str,
|
|
storage_ref: str,
|
|
content_hash: str,
|
|
captured_at: datetime | None = None,
|
|
) -> str:
|
|
"""Persist a broker event snapshot to market_snapshots.
|
|
|
|
Broker position/account snapshots are stored as market_snapshots
|
|
with snapshot_type prefixed by 'broker_' (e.g. broker_positions,
|
|
broker_account, broker_orders).
|
|
|
|
Returns the snapshot row UUID.
|
|
"""
|
|
ts = captured_at or datetime.now(timezone.utc)
|
|
row_id = await pool.fetchval(
|
|
"""INSERT INTO market_snapshots
|
|
(ticker, snapshot_type, data, source_provider,
|
|
captured_at, storage_ref, content_hash)
|
|
VALUES ($1, $2, $3::jsonb, $4, $5, $6, $7)
|
|
RETURNING id""",
|
|
ticker,
|
|
f"broker_{event_type}",
|
|
json.dumps(data),
|
|
source_provider,
|
|
ts,
|
|
storage_ref,
|
|
content_hash,
|
|
)
|
|
logger.debug("Persisted broker event %s for %s", row_id, ticker)
|
|
return str(row_id)
|
|
|
|
|
|
def _resolve_document_type(source_type: str) -> str:
|
|
"""Map source_type to a document_type value.
|
|
|
|
Note: macro_news articles default to 'article' — the extractor
|
|
reclassifies them as 'macro_event' only if the content is truly
|
|
about macro/global events (not company-specific news).
|
|
"""
|
|
mapping = {
|
|
"news_api": "article",
|
|
"filings_api": "filing",
|
|
"web_scrape": "press_release",
|
|
"macro_news": "article",
|
|
}
|
|
return mapping.get(source_type, "article")
|
|
|
|
|
|
def _extract_publisher(item: dict[str, Any]) -> str:
|
|
"""Extract publisher name from an adapter item dict."""
|
|
if item.get("publisher"):
|
|
return str(item["publisher"])
|
|
source = item.get("source")
|
|
if isinstance(source, dict):
|
|
return source.get("name", "")
|
|
if source:
|
|
return str(source)
|
|
return ""
|
|
|
|
|
|
def _parse_published_at(item: dict[str, Any]) -> datetime | None:
|
|
"""Parse published_at from various adapter item formats."""
|
|
raw = item.get("publishedAt") or item.get("published_at") or item.get("published_utc")
|
|
if not raw:
|
|
return None
|
|
if isinstance(raw, datetime):
|
|
return raw
|
|
try:
|
|
return datetime.fromisoformat(str(raw).replace("Z", "+00:00"))
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
|
|
async def persist_ingestion_items(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
source_type: str,
|
|
ticker: str,
|
|
company_id: str | None,
|
|
items: list[dict[str, Any]],
|
|
storage_ref: str,
|
|
adapter_metadata: dict[str, Any],
|
|
content_hash: str,
|
|
) -> tuple[int, list[str]]:
|
|
"""Route ingestion items to the correct persistence path.
|
|
|
|
Returns (new_item_count, list_of_new_ids).
|
|
"""
|
|
if source_type == "market_api":
|
|
return await _persist_market_items(
|
|
pool,
|
|
ticker=ticker,
|
|
company_id=company_id,
|
|
items=items,
|
|
storage_ref=storage_ref,
|
|
provider=adapter_metadata.get("provider", "unknown"),
|
|
content_hash=content_hash,
|
|
)
|
|
|
|
if source_type == "broker":
|
|
return await _persist_broker_items(
|
|
pool,
|
|
ticker=ticker,
|
|
items=items,
|
|
storage_ref=storage_ref,
|
|
provider=adapter_metadata.get("provider", "unknown"),
|
|
endpoint=adapter_metadata.get("endpoint", "positions"),
|
|
content_hash=content_hash,
|
|
)
|
|
|
|
# Document types: news_api, filings_api, web_scrape
|
|
return await _persist_document_items(
|
|
pool,
|
|
source_type=source_type,
|
|
ticker=ticker,
|
|
company_id=company_id,
|
|
items=items,
|
|
storage_ref=storage_ref,
|
|
)
|
|
|
|
|
|
async def _persist_market_items(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
ticker: str,
|
|
company_id: str | None,
|
|
items: list[dict[str, Any]],
|
|
storage_ref: str,
|
|
provider: str,
|
|
content_hash: str,
|
|
) -> tuple[int, list[str]]:
|
|
"""Persist market data items as market_snapshots rows.
|
|
|
|
For grouped daily responses, each item contains a 'T' field with the
|
|
ticker. When present, the item's ticker overrides the job-level ticker.
|
|
"""
|
|
ids: list[str] = []
|
|
for item in items:
|
|
item_hash = content_hash_str(json.dumps(item, sort_keys=True))
|
|
# Skip duplicates
|
|
exists = await pool.fetchval(
|
|
"SELECT 1 FROM market_snapshots WHERE content_hash = $1", item_hash
|
|
)
|
|
if exists:
|
|
continue
|
|
|
|
# Use item-level ticker if available (grouped daily), else job-level
|
|
item_ticker = item.get("T", ticker) or ticker
|
|
snapshot_type = _infer_market_snapshot_type(item)
|
|
|
|
# For grouped daily items, look up company_id by ticker
|
|
item_company_id = company_id
|
|
if item.get("T") and not company_id:
|
|
cid = await pool.fetchval(
|
|
"SELECT id FROM companies WHERE ticker = $1", item_ticker
|
|
)
|
|
if cid:
|
|
item_company_id = str(cid)
|
|
# If not in our companies table, store with company_id=NULL
|
|
|
|
row_id = await persist_market_snapshot(
|
|
pool,
|
|
company_id=item_company_id,
|
|
ticker=item_ticker,
|
|
snapshot_type=snapshot_type,
|
|
data=item,
|
|
source_provider=provider,
|
|
storage_ref=storage_ref,
|
|
content_hash=item_hash,
|
|
)
|
|
ids.append(row_id)
|
|
return len(ids), ids
|
|
|
|
|
|
def _infer_market_snapshot_type(item: dict[str, Any]) -> str:
|
|
"""Infer snapshot_type from market data item fields."""
|
|
# Polygon aggregate bars have 'o', 'h', 'l', 'c' fields
|
|
if all(k in item for k in ("o", "h", "l", "c")):
|
|
return "bar"
|
|
# Ticker details have 'market_cap' or 'sic_code'
|
|
if "market_cap" in item or "sic_code" in item:
|
|
return "ticker_details"
|
|
# Quote snapshots
|
|
if "ask" in item or "bid" in item:
|
|
return "quote"
|
|
return "snapshot"
|
|
|
|
|
|
async def _persist_broker_items(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
ticker: str,
|
|
items: list[dict[str, Any]],
|
|
storage_ref: str,
|
|
provider: str,
|
|
endpoint: str,
|
|
content_hash: str,
|
|
) -> tuple[int, list[str]]:
|
|
"""Persist broker fetch items as market_snapshots with broker_ prefix."""
|
|
ids: list[str] = []
|
|
for item in items:
|
|
item_hash = content_hash_str(json.dumps(item, sort_keys=True))
|
|
exists = await pool.fetchval(
|
|
"SELECT 1 FROM market_snapshots WHERE content_hash = $1", item_hash
|
|
)
|
|
if exists:
|
|
continue
|
|
|
|
row_id = await persist_broker_event(
|
|
pool,
|
|
ticker=ticker,
|
|
event_type=endpoint,
|
|
data=item,
|
|
source_provider=provider,
|
|
storage_ref=storage_ref,
|
|
content_hash=item_hash,
|
|
)
|
|
ids.append(row_id)
|
|
return len(ids), ids
|
|
|
|
|
|
async def _persist_document_items(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
source_type: str,
|
|
ticker: str,
|
|
company_id: str | None,
|
|
items: list[dict[str, Any]],
|
|
storage_ref: str,
|
|
) -> tuple[int, list[str]]:
|
|
"""Persist document items (news, filings, web scrape) to documents table."""
|
|
doc_type = _resolve_document_type(source_type)
|
|
ids: list[str] = []
|
|
|
|
for item in items:
|
|
item_hash = item.get("content_hash") or content_hash_str(
|
|
json.dumps(item, sort_keys=True)
|
|
)
|
|
title = item.get("title", item.get("name", ""))
|
|
url = item.get("url", item.get("link", item.get("article_url", "")))
|
|
canonical_url = item.get("canonical_url") or (
|
|
normalize_url(url) if url else None
|
|
)
|
|
published_at = _parse_published_at(item)
|
|
publisher = _extract_publisher(item)
|
|
|
|
doc_id = await persist_document(
|
|
pool,
|
|
document_type=doc_type,
|
|
source_type=source_type,
|
|
publisher=publisher,
|
|
url=url or None,
|
|
canonical_url=canonical_url,
|
|
title=title,
|
|
published_at=published_at,
|
|
content_hash=item_hash,
|
|
storage_ref=storage_ref,
|
|
)
|
|
if doc_id is None:
|
|
continue
|
|
|
|
# Link document to company if we have a company_id
|
|
if company_id:
|
|
await persist_document_company_mention(
|
|
pool,
|
|
document_id=doc_id,
|
|
company_id=company_id,
|
|
ticker=ticker,
|
|
)
|
|
|
|
ids.append(doc_id)
|
|
|
|
return len(ids), ids
|
|
|
|
|
|
# --- Retry and failure tracking (Requirement 3.4) ---
|
|
|
|
# Backoff constants — match scheduler defaults for consistency
|
|
RETRY_BACKOFF_BASE: int = 60
|
|
RETRY_BACKOFF_MAX: int = 3600
|
|
RETRY_MAX_COUNT: int = 10
|
|
|
|
|
|
def compute_next_retry_at(
|
|
retry_count: int,
|
|
now: datetime | None = None,
|
|
base: int = RETRY_BACKOFF_BASE,
|
|
cap: int = RETRY_BACKOFF_MAX,
|
|
) -> datetime:
|
|
"""Compute the next eligible retry time using exponential backoff.
|
|
|
|
Args:
|
|
retry_count: Current retry count (before incrementing).
|
|
now: Reference timestamp (defaults to UTC now).
|
|
base: Base delay in seconds.
|
|
cap: Maximum delay in seconds.
|
|
|
|
Returns:
|
|
Datetime of the next eligible retry.
|
|
"""
|
|
ts = now or datetime.now(timezone.utc)
|
|
delay = min(base * (2 ** min(retry_count, 8)), cap)
|
|
return ts + timedelta(seconds=delay)
|
|
|
|
|
|
async def get_source_retry_count(
|
|
pool: asyncpg.Pool,
|
|
source_id: str,
|
|
) -> int:
|
|
"""Return the retry count from the most recent failed run for a source.
|
|
|
|
If the last run succeeded or no runs exist, returns 0.
|
|
"""
|
|
row = await pool.fetchrow(
|
|
"""SELECT status, retry_count
|
|
FROM ingestion_runs
|
|
WHERE source_id = $1::uuid
|
|
ORDER BY started_at DESC
|
|
LIMIT 1""",
|
|
source_id,
|
|
)
|
|
if row and row["status"] == "failed":
|
|
return row["retry_count"] or 0
|
|
return 0
|
|
|
|
|
|
async def record_retrieval_failure(
|
|
pool: asyncpg.Pool,
|
|
run_id: str,
|
|
source_id: str,
|
|
error_message: str,
|
|
retry_count: int | None = None,
|
|
now: datetime | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Record a source retrieval failure with retry policy state.
|
|
|
|
Updates the ingestion_runs row with:
|
|
- error_message: the failure reason
|
|
- retry_count: incremented from the previous failed run (or provided)
|
|
- next_retry_at: computed via exponential backoff
|
|
- status: 'failed'
|
|
|
|
If retry_count is not provided, it is looked up from the most recent
|
|
failed run for the same source and incremented.
|
|
|
|
Returns a dict with the recorded retry state for observability.
|
|
|
|
Requirement 3.4
|
|
"""
|
|
ts = now or datetime.now(timezone.utc)
|
|
|
|
if retry_count is None:
|
|
prev_count = await get_source_retry_count(pool, source_id)
|
|
retry_count = prev_count + 1
|
|
else:
|
|
retry_count = retry_count + 1
|
|
|
|
next_retry = compute_next_retry_at(retry_count - 1, now=ts)
|
|
exhausted = retry_count >= RETRY_MAX_COUNT
|
|
|
|
await pool.execute(
|
|
"""UPDATE ingestion_runs
|
|
SET status = 'failed',
|
|
error_message = $2,
|
|
retry_count = $3,
|
|
next_retry_at = $4,
|
|
completed_at = $5
|
|
WHERE id = $1""",
|
|
run_id,
|
|
error_message,
|
|
retry_count,
|
|
next_retry,
|
|
ts,
|
|
)
|
|
|
|
state = {
|
|
"run_id": run_id,
|
|
"source_id": source_id,
|
|
"retry_count": retry_count,
|
|
"next_retry_at": next_retry.isoformat(),
|
|
"exhausted": exhausted,
|
|
"error_message": error_message,
|
|
}
|
|
|
|
if exhausted:
|
|
logger.warning(
|
|
"Source %s exhausted retries (%d/%d): %s",
|
|
source_id, retry_count, RETRY_MAX_COUNT, error_message,
|
|
)
|
|
else:
|
|
logger.info(
|
|
"Source %s failed (retry %d/%d), next retry at %s: %s",
|
|
source_id, retry_count, RETRY_MAX_COUNT,
|
|
next_retry.isoformat(), error_message,
|
|
)
|
|
|
|
return state
|
|
|
|
|
|
async def persist_document_intelligence(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
document_id: str,
|
|
summary: str,
|
|
macro_themes: list[str],
|
|
novelty_score: float,
|
|
source_credibility: float,
|
|
extraction_warnings: list[str],
|
|
confidence: float,
|
|
model_provider: str,
|
|
model_name: str,
|
|
prompt_version: str,
|
|
schema_version: str,
|
|
raw_output_ref: str | None = None,
|
|
prompt_ref: str | None = None,
|
|
validation_status: str = "valid",
|
|
validation_errors: list[str] | None = None,
|
|
retry_count: int = 0,
|
|
) -> str:
|
|
"""Persist a document intelligence record to PostgreSQL.
|
|
|
|
Returns the intelligence row UUID.
|
|
|
|
Requirements: 5.3, 5.4, 9.2
|
|
"""
|
|
intel_id = await pool.fetchval(
|
|
"""INSERT INTO document_intelligence
|
|
(document_id, summary, macro_themes, novelty_score,
|
|
source_credibility, extraction_warnings, confidence,
|
|
model_provider, model_name, prompt_version, schema_version,
|
|
raw_output_ref, prompt_ref, validation_status,
|
|
validation_errors, retry_count)
|
|
VALUES ($1::uuid, $2, $3::jsonb, $4, $5, $6::jsonb, $7,
|
|
$8, $9, $10, $11, $12, $13, $14, $15::jsonb, $16)
|
|
RETURNING id""",
|
|
document_id,
|
|
summary,
|
|
json.dumps(macro_themes),
|
|
novelty_score,
|
|
source_credibility,
|
|
json.dumps(extraction_warnings),
|
|
confidence,
|
|
model_provider,
|
|
model_name,
|
|
prompt_version,
|
|
schema_version,
|
|
raw_output_ref,
|
|
prompt_ref,
|
|
validation_status,
|
|
json.dumps(validation_errors or []),
|
|
retry_count,
|
|
)
|
|
logger.debug("Persisted document intelligence %s for doc %s", intel_id, document_id)
|
|
return str(intel_id)
|
|
|
|
|
|
async def persist_document_impact(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
intelligence_id: str,
|
|
company_id: str,
|
|
ticker: str,
|
|
relevance: float,
|
|
sentiment: str,
|
|
impact_score: float,
|
|
impact_horizon: str,
|
|
catalyst_type: str,
|
|
key_facts: list[str],
|
|
risks: list[str],
|
|
evidence_spans: list[str],
|
|
) -> str:
|
|
"""Persist a per-company impact record linked to a document intelligence row.
|
|
|
|
Returns the impact record UUID.
|
|
|
|
Requirements: 5.3, 5.5, 9.2
|
|
"""
|
|
impact_id = await pool.fetchval(
|
|
"""INSERT INTO document_impact_records
|
|
(intelligence_id, company_id, ticker, relevance, sentiment,
|
|
impact_score, impact_horizon, catalyst_type,
|
|
key_facts, risks, evidence_spans)
|
|
VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8,
|
|
$9::jsonb, $10::jsonb, $11::jsonb)
|
|
RETURNING id""",
|
|
intelligence_id,
|
|
company_id,
|
|
ticker,
|
|
relevance,
|
|
sentiment,
|
|
impact_score,
|
|
impact_horizon,
|
|
catalyst_type,
|
|
json.dumps(key_facts),
|
|
json.dumps(risks),
|
|
json.dumps(evidence_spans),
|
|
)
|
|
logger.debug("Persisted impact record %s for %s", impact_id, ticker)
|
|
return str(impact_id)
|
|
|
|
|
|
async def update_document_status(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
document_id: str,
|
|
status: str,
|
|
) -> None:
|
|
"""Update the status field on a document row.
|
|
|
|
Used to advance documents through the pipeline: ingested → parsed → extracted → failed.
|
|
|
|
Requirements: 5.4
|
|
"""
|
|
await pool.execute(
|
|
"""UPDATE documents SET status = $2, updated_at = NOW() WHERE id = $1::uuid""",
|
|
document_id,
|
|
status,
|
|
)
|
|
logger.debug("Updated document %s status to %s", document_id, status)
|
|
|
|
|
|
async def reset_source_retry_state(
|
|
pool: asyncpg.Pool,
|
|
source_id: str,
|
|
) -> None:
|
|
"""Reset retry state for a source after a successful run.
|
|
|
|
Sets retry_count=0 and next_retry_at=NULL on the most recent run.
|
|
Called after a successful ingestion to clear any accumulated backoff.
|
|
"""
|
|
await pool.execute(
|
|
"""UPDATE ingestion_runs
|
|
SET retry_count = 0, next_retry_at = NULL
|
|
WHERE id = (
|
|
SELECT id FROM ingestion_runs
|
|
WHERE source_id = $1::uuid
|
|
ORDER BY started_at DESC
|
|
LIMIT 1
|
|
)""",
|
|
source_id,
|
|
)
|