Files
stonks-oracle/services/api/app.py
T

1690 lines
62 KiB
Python

"""Query API - FastAPI application for analytics, evidence drill-down, and admin controls.
Exposes read-only endpoints for:
- Companies and watchlists (proxied from symbol registry data)
- Document timelines with intelligence
- Trend summaries
- Recommendation history with evidence
- Order history with audit trails
Requirements: 11.1, 11.2, 11.3
Design: Section 9.1 (Operational API)
"""
from __future__ import annotations
import json
import logging
import time as _time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any, Optional
import asyncpg
import httpx
from fastapi import FastAPI, HTTPException, Query, Request
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from pydantic import BaseModel
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
from services.extractor.metrics import get_model_performance_summary
from services.shared.audit import get_entity_audit_trail, get_order_audit_trail
from services.shared.config import load_config
from services.shared.db import get_pg_pool
from services.shared.logging import new_trace_id, set_trace_context, setup_logging
logger = logging.getLogger("query_api")
config = load_config()
pool: Optional[asyncpg.Pool] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global pool
setup_logging("query_api", level=config.log_level, json_output=config.json_logs)
pool = await get_pg_pool(config)
yield
await pool.close()
app = FastAPI(title="Stonks Oracle - Query API", lifespan=lifespan)
class TraceMiddleware(BaseHTTPMiddleware):
"""Inject trace context for every incoming HTTP request."""
async def dispatch(self, request: Request, call_next):
trace_id = request.headers.get("x-trace-id") or new_trace_id()
set_trace_context(trace_id=trace_id)
response = await call_next(request)
response.headers["x-trace-id"] = trace_id
return response
app.add_middleware(TraceMiddleware)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _row_to_dict(row: asyncpg.Record) -> dict[str, Any]:
"""Convert an asyncpg Record to a JSON-safe dict."""
d: dict[str, Any] = {}
for key, val in dict(row).items():
if isinstance(val, datetime):
d[key] = val.isoformat()
elif hasattr(val, "__str__") and not isinstance(val, (str, int, float, bool, list, dict, type(None))):
d[key] = str(val)
else:
d[key] = val
return d
def _parse_jsonb(val: Any) -> Any:
"""Parse a JSONB value that may come back as str or already-decoded."""
if val is None:
return None
if isinstance(val, (dict, list)):
return val
try:
return json.loads(val)
except (json.JSONDecodeError, TypeError):
return val
# ---------------------------------------------------------------------------
# Health
# ---------------------------------------------------------------------------
@app.get("/health")
async def health():
try:
await pool.fetchval("SELECT 1")
return {"status": "ok"}
except Exception:
raise HTTPException(503, "Database unavailable")
@app.get("/metrics")
async def metrics():
"""Expose Prometheus metrics for scraping.
Requirements: 12.1, 12.2
"""
return Response(
content=generate_latest(),
media_type=CONTENT_TYPE_LATEST,
)
# ---------------------------------------------------------------------------
# Companies (Requirement 11.1)
# ---------------------------------------------------------------------------
@app.get("/api/companies")
async def list_companies(
active: bool = True,
sector: Optional[str] = None,
ticker: Optional[str] = None,
):
"""List tracked companies with optional filters."""
conditions = ["c.active = $1"]
params: list[Any] = [active]
idx = 2
if sector:
conditions.append(f"c.sector = ${idx}")
params.append(sector)
idx += 1
if ticker:
conditions.append(f"c.ticker = ${idx}")
params.append(ticker.upper())
idx += 1
where = " AND ".join(conditions)
rows = await pool.fetch(
f"""SELECT c.id, c.ticker, c.legal_name, c.exchange, c.sector,
c.industry, c.market_cap_bucket, c.active,
c.created_at, c.updated_at
FROM companies c
WHERE {where}
ORDER BY c.ticker""",
*params,
)
return [_row_to_dict(r) for r in rows]
@app.get("/api/companies/{company_id}")
async def get_company(company_id: str):
"""Get a single company with aliases and source count."""
row = await pool.fetchrow(
"""SELECT id, ticker, legal_name, exchange, sector, industry,
market_cap_bucket, active, created_at, updated_at
FROM companies WHERE id = $1""",
company_id,
)
if not row:
raise HTTPException(404, "Company not found")
result = _row_to_dict(row)
aliases = await pool.fetch(
"SELECT id, alias, alias_type FROM company_aliases WHERE company_id = $1",
company_id,
)
result["aliases"] = [dict(a) for a in aliases]
source_count = await pool.fetchval(
"SELECT COUNT(*) FROM sources WHERE company_id = $1 AND active = true",
company_id,
)
result["active_source_count"] = source_count
return result
@app.get("/api/companies/{company_id}/sources")
async def list_company_sources(company_id: str):
"""List sources configured for a company."""
rows = await pool.fetch(
"""SELECT id, source_type, source_name, config, credibility_score,
retention_days, access_policy, active
FROM sources WHERE company_id = $1 ORDER BY source_type""",
company_id,
)
return [_row_to_dict(r) for r in rows]
# ---------------------------------------------------------------------------
# Document Timelines (Requirement 11.1, 11.2)
# ---------------------------------------------------------------------------
@app.get("/api/documents")
async def list_documents(
ticker: Optional[str] = None,
company_id: Optional[str] = None,
document_type: Optional[str] = None,
status: Optional[str] = None,
since: Optional[str] = None,
limit: int = Query(default=50, le=200),
offset: int = 0,
):
"""List documents with optional filters, ordered by published_at desc."""
conditions: list[str] = []
params: list[Any] = []
idx = 1
if ticker:
conditions.append(f"""d.id IN (
SELECT document_id FROM document_company_mentions WHERE ticker = ${idx}
)""")
params.append(ticker.upper())
idx += 1
if company_id:
conditions.append(f"""d.id IN (
SELECT document_id FROM document_company_mentions WHERE company_id = ${idx}
)""")
params.append(company_id)
idx += 1
if document_type:
conditions.append(f"d.document_type = ${idx}")
params.append(document_type)
idx += 1
if status:
conditions.append(f"d.status = ${idx}")
params.append(status)
idx += 1
if since:
conditions.append(f"d.published_at >= ${idx}::timestamptz")
params.append(since)
idx += 1
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
rows = await pool.fetch(
f"""SELECT d.id, d.document_type, d.source_type, d.publisher, d.url,
d.title, d.published_at, d.retrieved_at, d.language,
d.content_hash, d.parse_quality_score, d.parse_confidence,
d.status, d.created_at
FROM documents d
{where}
ORDER BY d.published_at DESC NULLS LAST
LIMIT ${idx} OFFSET ${idx + 1}""",
*params, limit, offset,
)
return [_row_to_dict(r) for r in rows]
@app.get("/api/documents/{document_id}")
async def get_document(document_id: str):
"""Get a single document with its intelligence extraction and company mentions."""
row = await pool.fetchrow(
"""SELECT id, document_type, source_type, publisher, url, canonical_url,
title, published_at, retrieved_at, language, content_hash,
raw_storage_ref, normalized_storage_ref,
parse_quality_score, parse_confidence, status,
created_at, updated_at
FROM documents WHERE id = $1""",
document_id,
)
if not row:
raise HTTPException(404, "Document not found")
result = _row_to_dict(row)
# Company mentions
mentions = await pool.fetch(
"""SELECT dcm.company_id, dcm.ticker, dcm.mention_type, dcm.confidence,
c.legal_name
FROM document_company_mentions dcm
JOIN companies c ON c.id = dcm.company_id
WHERE dcm.document_id = $1""",
document_id,
)
result["company_mentions"] = [_row_to_dict(m) for m in mentions]
# Intelligence extraction
intel = await pool.fetchrow(
"""SELECT id, summary, macro_themes, novelty_score, source_credibility,
extraction_warnings, confidence, model_provider, model_name,
prompt_version, schema_version, validation_status,
validation_errors, created_at
FROM document_intelligence WHERE document_id = $1
ORDER BY created_at DESC LIMIT 1""",
document_id,
)
if intel:
intel_dict = _row_to_dict(intel)
intel_dict["macro_themes"] = _parse_jsonb(intel_dict.get("macro_themes"))
intel_dict["extraction_warnings"] = _parse_jsonb(intel_dict.get("extraction_warnings"))
intel_dict["validation_errors"] = _parse_jsonb(intel_dict.get("validation_errors"))
# Impact records per company
impacts = await pool.fetch(
"""SELECT dir.company_id, dir.ticker, dir.relevance, dir.sentiment,
dir.impact_score, dir.impact_horizon, dir.catalyst_type,
dir.key_facts, dir.risks, dir.evidence_spans,
c.legal_name
FROM document_impact_records dir
JOIN companies c ON c.id = dir.company_id
WHERE dir.intelligence_id = $1""",
intel["id"],
)
impact_list = []
for imp in impacts:
imp_dict = _row_to_dict(imp)
imp_dict["key_facts"] = _parse_jsonb(imp_dict.get("key_facts"))
imp_dict["risks"] = _parse_jsonb(imp_dict.get("risks"))
imp_dict["evidence_spans"] = _parse_jsonb(imp_dict.get("evidence_spans"))
impact_list.append(imp_dict)
intel_dict["company_impacts"] = impact_list
result["intelligence"] = intel_dict
else:
result["intelligence"] = None
return result
# ---------------------------------------------------------------------------
# Trend Summaries (Requirement 11.1)
# ---------------------------------------------------------------------------
@app.get("/api/trends")
async def list_trends(
ticker: Optional[str] = None,
entity_type: str = "company",
window: Optional[str] = None,
limit: int = Query(default=50, le=200),
offset: int = 0,
):
"""List trend summaries with optional filters."""
conditions = ["entity_type = $1"]
params: list[Any] = [entity_type]
idx = 2
if ticker:
conditions.append(f"entity_id = ${idx}")
params.append(ticker.upper())
idx += 1
if window:
conditions.append(f"\"window\" = ${idx}")
params.append(window)
idx += 1
where = " AND ".join(conditions)
rows = await pool.fetch(
f"""SELECT id, entity_type, entity_id, "window", trend_direction,
trend_strength, confidence, top_supporting_evidence,
top_opposing_evidence, dominant_catalysts, material_risks,
contradiction_score, market_context, generated_at
FROM trend_windows
WHERE {where}
ORDER BY generated_at DESC
LIMIT ${idx} OFFSET ${idx + 1}""",
*params, limit, offset,
)
results = []
for r in rows:
d = _row_to_dict(r)
for jsonb_field in (
"top_supporting_evidence", "top_opposing_evidence",
"dominant_catalysts", "material_risks", "market_context",
):
d[jsonb_field] = _parse_jsonb(d.get(jsonb_field))
results.append(d)
return results
@app.get("/api/trends/{trend_id}")
async def get_trend(trend_id: str):
"""Get a single trend summary by ID."""
row = await pool.fetchrow(
"""SELECT id, entity_type, entity_id, "window", trend_direction,
trend_strength, confidence, top_supporting_evidence,
top_opposing_evidence, dominant_catalysts, material_risks,
contradiction_score, market_context, generated_at, created_at
FROM trend_windows WHERE id = $1""",
trend_id,
)
if not row:
raise HTTPException(404, "Trend not found")
d = _row_to_dict(row)
for jsonb_field in (
"top_supporting_evidence", "top_opposing_evidence",
"dominant_catalysts", "material_risks", "market_context",
):
d[jsonb_field] = _parse_jsonb(d.get(jsonb_field))
return d
# ---------------------------------------------------------------------------
# Recommendations (Requirement 11.1, 11.2)
# ---------------------------------------------------------------------------
@app.get("/api/recommendations")
async def list_recommendations(
ticker: Optional[str] = None,
action: Optional[str] = None,
mode: Optional[str] = None,
since: Optional[str] = None,
limit: int = Query(default=50, le=200),
offset: int = 0,
):
"""List recommendations with optional filters."""
conditions: list[str] = []
params: list[Any] = []
idx = 1
if ticker:
conditions.append(f"r.ticker = ${idx}")
params.append(ticker.upper())
idx += 1
if action:
conditions.append(f"r.action = ${idx}")
params.append(action)
idx += 1
if mode:
conditions.append(f"r.mode = ${idx}")
params.append(mode)
idx += 1
if since:
conditions.append(f"r.generated_at >= ${idx}::timestamptz")
params.append(since)
idx += 1
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
rows = await pool.fetch(
f"""SELECT r.id, r.ticker, r.action, r.mode, r.confidence,
r.time_horizon, r.thesis, r.invalidation_conditions,
r.portfolio_pct, r.max_loss_pct, r.model_version,
r.risk_classification, r.generated_at
FROM recommendations r
{where}
ORDER BY r.generated_at DESC
LIMIT ${idx} OFFSET ${idx + 1}""",
*params, limit, offset,
)
results = []
for r in rows:
d = _row_to_dict(r)
d["invalidation_conditions"] = _parse_jsonb(d.get("invalidation_conditions"))
results.append(d)
return results
@app.get("/api/recommendations/{recommendation_id}")
async def get_recommendation(recommendation_id: str):
"""Get a single recommendation with evidence and risk evaluation.
Requirement 11.2: display contributing intelligence objects, raw sources,
and market context that influenced the decision.
"""
row = await pool.fetchrow(
"""SELECT r.id, r.ticker, r.company_id, r.action, r.mode, r.confidence,
r.time_horizon, r.thesis, r.invalidation_conditions,
r.portfolio_pct, r.max_loss_pct, r.model_version,
r.model_provider, r.prompt_version, r.schema_version,
r.risk_classification, r.generated_at, r.created_at
FROM recommendations r WHERE r.id = $1""",
recommendation_id,
)
if not row:
raise HTTPException(404, "Recommendation not found")
result = _row_to_dict(row)
result["invalidation_conditions"] = _parse_jsonb(result.get("invalidation_conditions"))
# Evidence: linked documents and intelligence objects
evidence_rows = await pool.fetch(
"""SELECT re.id, re.document_id, re.intelligence_id, re.evidence_type, re.weight,
d.title, d.document_type, d.source_type, d.publisher, d.url,
d.published_at
FROM recommendation_evidence re
LEFT JOIN documents d ON d.id = re.document_id
WHERE re.recommendation_id = $1
ORDER BY re.weight DESC""",
recommendation_id,
)
result["evidence"] = [_row_to_dict(e) for e in evidence_rows]
# Risk evaluation
risk_row = await pool.fetchrow(
"""SELECT id, eligible, allowed_mode, rejection_reasons, risk_checks, evaluated_at
FROM risk_evaluations WHERE recommendation_id = $1
ORDER BY evaluated_at DESC LIMIT 1""",
recommendation_id,
)
if risk_row:
risk_dict = _row_to_dict(risk_row)
risk_dict["rejection_reasons"] = _parse_jsonb(risk_dict.get("rejection_reasons"))
risk_dict["risk_checks"] = _parse_jsonb(risk_dict.get("risk_checks"))
result["risk_evaluation"] = risk_dict
else:
result["risk_evaluation"] = None
return result
# ---------------------------------------------------------------------------
# Evidence Drill-Down (Requirement 11.2, 10.4)
# ---------------------------------------------------------------------------
@app.get("/api/recommendations/{recommendation_id}/evidence")
async def get_recommendation_evidence_drilldown(recommendation_id: str):
"""Full evidence drill-down linking a recommendation to source documents and raw artifacts.
Returns the complete provenance chain for each piece of evidence:
recommendation_evidence → document (with storage refs) → document_intelligence
→ document_impact_records, plus the trend window that fed the recommendation.
Requirements: 11.2, 10.4
Design: Section 9.1 (evidence drill-down and audit views)
"""
# Verify recommendation exists and get basic info
rec_row = await pool.fetchrow(
"""SELECT id, ticker, company_id, action, mode, confidence,
time_horizon, thesis, model_version, model_provider,
prompt_version, schema_version, generated_at
FROM recommendations WHERE id = $1""",
recommendation_id,
)
if not rec_row:
raise HTTPException(404, "Recommendation not found")
result: dict[str, Any] = {
"recommendation": _row_to_dict(rec_row),
"evidence": [],
"trend_window": None,
}
# Fetch evidence rows with full document details including storage refs
evidence_rows = await pool.fetch(
"""SELECT re.id AS evidence_id,
re.document_id,
re.intelligence_id,
re.evidence_type,
re.weight,
d.document_type,
d.source_type,
d.publisher,
d.url,
d.canonical_url,
d.title,
d.published_at,
d.retrieved_at,
d.language,
d.content_hash,
d.raw_storage_ref,
d.normalized_storage_ref,
d.parse_quality_score,
d.parse_confidence,
d.status AS document_status
FROM recommendation_evidence re
LEFT JOIN documents d ON d.id = re.document_id
WHERE re.recommendation_id = $1
ORDER BY re.weight DESC""",
recommendation_id,
)
for ev in evidence_rows:
ev_dict = _row_to_dict(ev)
ev_dict["intelligence"] = None
ev_dict["company_impacts"] = []
# Fetch intelligence extraction for this evidence
intel_id = ev["intelligence_id"]
doc_id = ev["document_id"]
# Use the linked intelligence_id if available, otherwise look up by document_id
intel_row = None
if intel_id:
intel_row = await pool.fetchrow(
"""SELECT id, document_id, summary, macro_themes, novelty_score,
source_credibility, extraction_warnings, confidence,
model_provider, model_name, prompt_version, schema_version,
raw_output_ref, prompt_ref, validation_status,
validation_errors, created_at
FROM document_intelligence WHERE id = $1""",
intel_id,
)
elif doc_id:
intel_row = await pool.fetchrow(
"""SELECT id, document_id, summary, macro_themes, novelty_score,
source_credibility, extraction_warnings, confidence,
model_provider, model_name, prompt_version, schema_version,
raw_output_ref, prompt_ref, validation_status,
validation_errors, created_at
FROM document_intelligence WHERE document_id = $1
ORDER BY created_at DESC LIMIT 1""",
doc_id,
)
if intel_row:
intel_dict = _row_to_dict(intel_row)
for jf in ("macro_themes", "extraction_warnings", "validation_errors"):
intel_dict[jf] = _parse_jsonb(intel_dict.get(jf))
ev_dict["intelligence"] = intel_dict
# Fetch per-company impact records for this intelligence
impacts = await pool.fetch(
"""SELECT dir.company_id, dir.ticker, dir.relevance, dir.sentiment,
dir.impact_score, dir.impact_horizon, dir.catalyst_type,
dir.key_facts, dir.risks, dir.evidence_spans,
c.legal_name
FROM document_impact_records dir
JOIN companies c ON c.id = dir.company_id
WHERE dir.intelligence_id = $1""",
intel_row["id"],
)
impact_list = []
for imp in impacts:
imp_dict = _row_to_dict(imp)
for jf in ("key_facts", "risks", "evidence_spans"):
imp_dict[jf] = _parse_jsonb(imp_dict.get(jf))
impact_list.append(imp_dict)
ev_dict["company_impacts"] = impact_list
result["evidence"].append(ev_dict)
# Fetch the most recent trend window for this ticker to show market context
ticker = rec_row["ticker"]
generated_at = rec_row["generated_at"]
if ticker and generated_at:
trend_row = await pool.fetchrow(
"""SELECT id, entity_type, entity_id, "window", trend_direction,
trend_strength, confidence, top_supporting_evidence,
top_opposing_evidence, dominant_catalysts, material_risks,
contradiction_score, market_context, generated_at
FROM trend_windows
WHERE entity_id = $1 AND entity_type = 'company'
AND generated_at <= $2
ORDER BY generated_at DESC LIMIT 1""",
ticker, generated_at,
)
if trend_row:
trend_dict = _row_to_dict(trend_row)
for jf in (
"top_supporting_evidence", "top_opposing_evidence",
"dominant_catalysts", "material_risks", "market_context",
):
trend_dict[jf] = _parse_jsonb(trend_dict.get(jf))
# Include trend evidence linkage: documents that contributed to this trend
trend_ev_rows = await pool.fetch(
"""SELECT te.id, te.document_id, te.evidence_type, te.rank_score,
te.weight_component, te.impact_component,
te.recency_component, te.confidence_component,
te.sentiment_value,
d.title, d.document_type, d.source_type, d.publisher,
d.url, d.published_at, d.raw_storage_ref,
d.normalized_storage_ref
FROM trend_evidence te
LEFT JOIN documents d ON d.id = te.document_id
WHERE te.trend_window_id = $1
ORDER BY te.rank_score DESC""",
trend_row["id"],
)
trend_dict["evidence"] = [_row_to_dict(te) for te in trend_ev_rows]
result["trend_window"] = trend_dict
return result
# ---------------------------------------------------------------------------
# Trend Evidence Drill-Down (Requirement 10.4)
# ---------------------------------------------------------------------------
@app.get("/api/trends/{trend_id}/evidence")
async def get_trend_evidence_drilldown(trend_id: str):
"""Drill down from a trend window to its contributing documents and raw artifacts.
Returns the trend summary plus each contributing document with storage refs,
intelligence extraction, and impact records — full provenance chain.
Requirements: 10.4, 6.5
"""
trend_row = await pool.fetchrow(
"""SELECT id, entity_type, entity_id, "window", trend_direction,
trend_strength, confidence, top_supporting_evidence,
top_opposing_evidence, dominant_catalysts, material_risks,
contradiction_score, market_context, generated_at
FROM trend_windows WHERE id = $1""",
trend_id,
)
if not trend_row:
raise HTTPException(404, "Trend not found")
trend_dict = _row_to_dict(trend_row)
for jf in (
"top_supporting_evidence", "top_opposing_evidence",
"dominant_catalysts", "material_risks", "market_context",
):
trend_dict[jf] = _parse_jsonb(trend_dict.get(jf))
# Fetch trend evidence with full document details
evidence_rows = await pool.fetch(
"""SELECT te.id AS evidence_id,
te.document_id,
te.evidence_type,
te.rank_score,
te.weight_component,
te.impact_component,
te.recency_component,
te.confidence_component,
te.sentiment_value,
d.document_type,
d.source_type,
d.publisher,
d.url,
d.canonical_url,
d.title,
d.published_at,
d.retrieved_at,
d.content_hash,
d.raw_storage_ref,
d.normalized_storage_ref,
d.parse_quality_score,
d.parse_confidence,
d.status AS document_status
FROM trend_evidence te
LEFT JOIN documents d ON d.id = te.document_id
WHERE te.trend_window_id = $1
ORDER BY te.rank_score DESC""",
trend_id,
)
evidence_list = []
for ev in evidence_rows:
ev_dict = _row_to_dict(ev)
ev_dict["intelligence"] = None
ev_dict["company_impacts"] = []
doc_id = ev["document_id"]
if doc_id:
intel_row = await pool.fetchrow(
"""SELECT id, document_id, summary, macro_themes, novelty_score,
source_credibility, extraction_warnings, confidence,
model_provider, model_name, prompt_version, schema_version,
raw_output_ref, prompt_ref, validation_status,
validation_errors, created_at
FROM document_intelligence WHERE document_id = $1
ORDER BY created_at DESC LIMIT 1""",
doc_id,
)
if intel_row:
intel_dict = _row_to_dict(intel_row)
for jf in ("macro_themes", "extraction_warnings", "validation_errors"):
intel_dict[jf] = _parse_jsonb(intel_dict.get(jf))
ev_dict["intelligence"] = intel_dict
impacts = await pool.fetch(
"""SELECT dir.company_id, dir.ticker, dir.relevance, dir.sentiment,
dir.impact_score, dir.impact_horizon, dir.catalyst_type,
dir.key_facts, dir.risks, dir.evidence_spans,
c.legal_name
FROM document_impact_records dir
JOIN companies c ON c.id = dir.company_id
WHERE dir.intelligence_id = $1""",
intel_row["id"],
)
for imp in impacts:
imp_dict = _row_to_dict(imp)
for jf in ("key_facts", "risks", "evidence_spans"):
imp_dict[jf] = _parse_jsonb(imp_dict.get(jf))
ev_dict["company_impacts"].append(imp_dict)
evidence_list.append(ev_dict)
return {
"trend": trend_dict,
"evidence": evidence_list,
}
# ---------------------------------------------------------------------------
# Order History (Requirement 11.1, 11.3)
# ---------------------------------------------------------------------------
@app.get("/api/orders")
async def list_orders(
ticker: Optional[str] = None,
status: Optional[str] = None,
side: Optional[str] = None,
since: Optional[str] = None,
limit: int = Query(default=50, le=200),
offset: int = 0,
):
"""List orders with optional filters."""
conditions: list[str] = []
params: list[Any] = []
idx = 1
if ticker:
conditions.append(f"o.ticker = ${idx}")
params.append(ticker.upper())
idx += 1
if status:
conditions.append(f"o.status = ${idx}")
params.append(status)
idx += 1
if side:
conditions.append(f"o.side = ${idx}")
params.append(side)
idx += 1
if since:
conditions.append(f"o.created_at >= ${idx}::timestamptz")
params.append(since)
idx += 1
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
rows = await pool.fetch(
f"""SELECT o.id, o.recommendation_id, o.broker_account_id, o.ticker,
o.side, o.order_type, o.quantity, o.limit_price, o.stop_price,
o.status, o.broker_order_id, o.submitted_at, o.acknowledged_at,
o.filled_at, o.cancelled_at, o.rejected_at, o.rejection_reason,
o.fill_price, o.fill_quantity, o.created_at
FROM orders o
{where}
ORDER BY o.created_at DESC
LIMIT ${idx} OFFSET ${idx + 1}""",
*params, limit, offset,
)
return [_row_to_dict(r) for r in rows]
@app.get("/api/orders/{order_id}")
async def get_order(order_id: str):
"""Get a single order with its events, decision trace, and full audit trail.
Requirement 11.3: expose full audit trail from ingestion through broker
execution and eventual market outcome.
"""
row = await pool.fetchrow(
"""SELECT o.id, o.recommendation_id, o.broker_account_id, o.ticker,
o.side, o.order_type, o.quantity, o.limit_price, o.stop_price,
o.status, o.idempotency_key, o.broker_order_id,
o.decision_trace, o.submitted_at, o.acknowledged_at,
o.filled_at, o.cancelled_at, o.rejected_at, o.rejection_reason,
o.fill_price, o.fill_quantity, o.created_at, o.updated_at
FROM orders o WHERE o.id = $1""",
order_id,
)
if not row:
raise HTTPException(404, "Order not found")
result = _row_to_dict(row)
result["decision_trace"] = _parse_jsonb(result.get("decision_trace"))
# Order events
events = await pool.fetch(
"""SELECT id, event_type, data, broker_timestamp, created_at
FROM order_events WHERE order_id = $1 ORDER BY created_at ASC""",
order_id,
)
result["events"] = []
for ev in events:
ev_dict = _row_to_dict(ev)
ev_dict["data"] = _parse_jsonb(ev_dict.get("data"))
result["events"].append(ev_dict)
# Full audit trail (Requirement 11.3)
recommendation_id = str(row["recommendation_id"]) if row["recommendation_id"] else None
result["audit_trail"] = await get_order_audit_trail(pool, order_id, recommendation_id)
return result
# ---------------------------------------------------------------------------
# Positions (Requirement 11.1)
# ---------------------------------------------------------------------------
@app.get("/api/positions")
async def list_positions(
ticker: Optional[str] = None,
):
"""List current positions."""
if ticker:
rows = await pool.fetch(
"""SELECT p.id, p.broker_account_id, p.ticker, p.quantity,
p.avg_entry_price, p.current_price,
p.unrealized_pnl, p.realized_pnl, p.updated_at
FROM positions p WHERE p.ticker = $1 ORDER BY p.ticker""",
ticker.upper(),
)
else:
rows = await pool.fetch(
"""SELECT p.id, p.broker_account_id, p.ticker, p.quantity,
p.avg_entry_price, p.current_price,
p.unrealized_pnl, p.realized_pnl, p.updated_at
FROM positions p ORDER BY p.ticker""",
)
return [_row_to_dict(r) for r in rows]
# ---------------------------------------------------------------------------
# Audit Trail (Requirement 11.3)
# ---------------------------------------------------------------------------
@app.get("/api/audit/{entity_type}/{entity_id}")
async def get_audit_trail(entity_type: str, entity_id: str):
"""Get audit events for any entity type and ID."""
events = await get_entity_audit_trail(pool, entity_type, entity_id)
if not events:
raise HTTPException(404, "No audit events found")
return events
# ---------------------------------------------------------------------------
# Admin: Source Health (Requirement 11.1 - source health)
# ---------------------------------------------------------------------------
@app.get("/api/admin/sources/health")
async def get_source_health(
source_type: Optional[str] = None,
company_id: Optional[str] = None,
active_only: bool = True,
):
"""Source health overview: each source with its latest ingestion status and failure counts.
Design: Section 9.1 (source health and job state)
"""
conditions = []
params: list[Any] = []
idx = 1
if active_only:
conditions.append(f"s.active = ${idx}")
params.append(True)
idx += 1
if source_type:
conditions.append(f"s.source_type = ${idx}")
params.append(source_type)
idx += 1
if company_id:
conditions.append(f"s.company_id = ${idx}")
params.append(company_id)
idx += 1
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
rows = await pool.fetch(
f"""SELECT s.id AS source_id, s.source_type, s.source_name,
s.credibility_score, s.active,
c.ticker, c.legal_name, c.id AS company_id,
latest.status AS last_run_status,
latest.started_at AS last_run_at,
latest.error_message AS last_error,
latest.items_fetched AS last_items_fetched,
latest.items_new AS last_items_new,
COALESCE(stats.total_runs, 0) AS total_runs_24h,
COALESCE(stats.failed_runs, 0) AS failed_runs_24h,
COALESCE(stats.total_items, 0) AS total_items_24h
FROM sources s
JOIN companies c ON c.id = s.company_id
LEFT JOIN LATERAL (
SELECT ir.status, ir.started_at, ir.error_message,
ir.items_fetched, ir.items_new
FROM ingestion_runs ir
WHERE ir.source_id = s.id
ORDER BY ir.started_at DESC
LIMIT 1
) latest ON TRUE
LEFT JOIN LATERAL (
SELECT COUNT(*) AS total_runs,
COUNT(*) FILTER (WHERE ir2.status = 'failed') AS failed_runs,
COALESCE(SUM(ir2.items_fetched), 0) AS total_items
FROM ingestion_runs ir2
WHERE ir2.source_id = s.id
AND ir2.started_at >= NOW() - INTERVAL '24 hours'
) stats ON TRUE
{where}
ORDER BY c.ticker, s.source_type""",
*params,
)
return [_row_to_dict(r) for r in rows]
@app.get("/api/admin/sources/{source_id}/runs")
async def get_source_runs(
source_id: str,
limit: int = Query(default=20, le=100),
offset: int = 0,
):
"""Recent ingestion runs for a specific source."""
rows = await pool.fetch(
"""SELECT id, source_id, company_id, source_type, status,
started_at, completed_at, items_fetched, items_new,
error_message, retry_count, next_retry_at
FROM ingestion_runs
WHERE source_id = $1
ORDER BY started_at DESC
LIMIT $2 OFFSET $3""",
source_id, limit, offset,
)
return [_row_to_dict(r) for r in rows]
@app.put("/api/admin/sources/{source_id}/toggle")
async def toggle_source(source_id: str, active: bool = True):
"""Enable or disable a source."""
row = await pool.fetchrow(
"""UPDATE sources SET active = $2, updated_at = NOW()
WHERE id = $1
RETURNING id, source_type, source_name, active""",
source_id, active,
)
if not row:
raise HTTPException(404, "Source not found")
return _row_to_dict(row)
@app.put("/api/admin/sources/{source_id}/credibility")
async def update_source_credibility(source_id: str, credibility_score: float = Query(ge=0.0, le=1.0)):
"""Update a source's credibility score."""
row = await pool.fetchrow(
"""UPDATE sources SET credibility_score = $2, updated_at = NOW()
WHERE id = $1
RETURNING id, source_type, source_name, credibility_score""",
source_id, credibility_score,
)
if not row:
raise HTTPException(404, "Source not found")
return _row_to_dict(row)
# ---------------------------------------------------------------------------
# Admin: Symbol Configs (Requirement 11.1 - symbol configs)
# ---------------------------------------------------------------------------
@app.put("/api/admin/companies/{company_id}/toggle")
async def toggle_company(company_id: str, active: bool = True):
"""Enable or disable a tracked company."""
row = await pool.fetchrow(
"""UPDATE companies SET active = $2, updated_at = NOW()
WHERE id = $1
RETURNING id, ticker, legal_name, active""",
company_id, active,
)
if not row:
raise HTTPException(404, "Company not found")
return _row_to_dict(row)
@app.put("/api/admin/companies/{company_id}/sector")
async def update_company_sector(
company_id: str,
sector: str = Query(...),
industry: Optional[str] = None,
):
"""Update a company's sector and industry classification."""
if industry is not None:
row = await pool.fetchrow(
"""UPDATE companies SET sector = $2, industry = $3, updated_at = NOW()
WHERE id = $1
RETURNING id, ticker, legal_name, sector, industry""",
company_id, sector, industry,
)
else:
row = await pool.fetchrow(
"""UPDATE companies SET sector = $2, updated_at = NOW()
WHERE id = $1
RETURNING id, ticker, legal_name, sector, industry""",
company_id, sector,
)
if not row:
raise HTTPException(404, "Company not found")
return _row_to_dict(row)
@app.get("/api/admin/companies/coverage")
async def get_symbol_coverage():
"""Overview of source coverage per active company.
Shows how many active sources of each type are configured per symbol,
useful for identifying coverage gaps.
"""
rows = await pool.fetch(
"""SELECT c.id AS company_id, c.ticker, c.legal_name, c.sector,
c.active,
COUNT(s.id) FILTER (WHERE s.active) AS active_sources,
COUNT(s.id) FILTER (WHERE s.source_type = 'market_api' AND s.active) AS market_sources,
COUNT(s.id) FILTER (WHERE s.source_type = 'news_api' AND s.active) AS news_sources,
COUNT(s.id) FILTER (WHERE s.source_type = 'filings_api' AND s.active) AS filings_sources,
COUNT(s.id) FILTER (WHERE s.source_type = 'web_scrape' AND s.active) AS web_scrape_sources,
COUNT(s.id) FILTER (WHERE s.source_type = 'broker' AND s.active) AS broker_sources
FROM companies c
LEFT JOIN sources s ON s.company_id = c.id
WHERE c.active = TRUE
GROUP BY c.id, c.ticker, c.legal_name, c.sector, c.active
ORDER BY c.ticker""",
)
return [_row_to_dict(r) for r in rows]
# ---------------------------------------------------------------------------
# Admin: Trading Mode (Requirement 8.1, 8.2, 11.1)
# ---------------------------------------------------------------------------
@app.get("/api/admin/trading/config")
async def get_trading_config():
"""Get the current active risk/trading configuration."""
row = await pool.fetchrow(
"""SELECT id, name, trading_mode, config, active, created_at, updated_at
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1""",
)
if not row:
return {"trading_mode": "paper", "config": {}, "message": "No active config found, using defaults"}
result = _row_to_dict(row)
result["config"] = _parse_jsonb(result.get("config"))
return result
@app.put("/api/admin/trading/mode")
async def set_trading_mode(mode: str = Query(..., pattern="^(paper|live|disabled)$")):
"""Switch the active trading mode.
Requirement 8.1: support paper and live as separate execution environments.
Requirement 8.2: live mode requires operator approval controls.
"""
row = await pool.fetchrow(
"""UPDATE risk_configs SET trading_mode = $1, updated_at = NOW()
WHERE active = TRUE
RETURNING id, name, trading_mode""",
mode,
)
if not row:
# No active config exists yet — create one with the requested mode
row = await pool.fetchrow(
"""INSERT INTO risk_configs (name, trading_mode, config, active)
VALUES ('default', $1, '{}', TRUE)
RETURNING id, name, trading_mode""",
mode,
)
return _row_to_dict(row)
@app.put("/api/admin/trading/config")
async def update_trading_config(config: dict[str, Any]):
"""Update the active risk configuration JSON.
Accepts a partial or full risk config object. The config is stored
as JSONB alongside the trading_mode in risk_configs.
"""
config_json = json.dumps(config)
row = await pool.fetchrow(
"""UPDATE risk_configs SET config = $1::jsonb, updated_at = NOW()
WHERE active = TRUE
RETURNING id, name, trading_mode, config""",
config_json,
)
if not row:
row = await pool.fetchrow(
"""INSERT INTO risk_configs (name, trading_mode, config, active)
VALUES ('default', 'paper', $1::jsonb, TRUE)
RETURNING id, name, trading_mode, config""",
config_json,
)
result = _row_to_dict(row)
result["config"] = _parse_jsonb(result.get("config"))
return result
@app.get("/api/admin/trading/approvals")
async def list_pending_approvals():
"""List pending operator approval requests for live trading orders."""
rows = await pool.fetch(
"""SELECT id, order_job, recommendation_id, ticker, side, quantity,
estimated_value, status, risk_evaluation_id, requested_by,
reviewed_by, review_note, expires_at, requested_at, reviewed_at
FROM operator_approvals
WHERE status = 'pending'
ORDER BY requested_at ASC""",
)
results = []
for r in rows:
d = _row_to_dict(r)
d["order_job"] = _parse_jsonb(d.get("order_job"))
results.append(d)
return results
@app.put("/api/admin/trading/approvals/{approval_id}")
async def review_approval_request(
approval_id: str,
approved: bool = Query(...),
reviewed_by: str = "operator",
review_note: str = "",
):
"""Approve or reject a pending operator approval request.
Requirement 8.2: live orders require operator approval controls.
"""
now = datetime.now(timezone.utc)
new_status = "approved" if approved else "rejected"
row = await pool.fetchrow(
"""UPDATE operator_approvals
SET status = $2, reviewed_by = $3, review_note = $4,
reviewed_at = $5, updated_at = NOW()
WHERE id = $1::uuid AND status = 'pending'
RETURNING id, ticker, status, reviewed_by""",
approval_id, new_status, reviewed_by, review_note, now,
)
if not row:
raise HTTPException(404, "Approval not found or no longer pending")
return _row_to_dict(row)
@app.get("/api/admin/trading/lockouts")
async def list_active_lockouts():
"""List active symbol lockouts (news-shock, cooldown)."""
rows = await pool.fetch(
"""SELECT id, ticker, lockout_type, reason, expires_at, created_at
FROM symbol_lockouts
WHERE expires_at > NOW()
ORDER BY expires_at ASC""",
)
return [_row_to_dict(r) for r in rows]
# ---------------------------------------------------------------------------
# Operational Dashboard (Requirement 12.1, 12.2, 12.3)
# ---------------------------------------------------------------------------
@app.get("/api/ops/ingestion/throughput")
async def get_ingestion_throughput(
hours: int = Query(default=24, ge=1, le=168),
bucket: str = Query(default="1h", pattern="^(15m|1h|6h|1d)$"),
):
"""Ingestion throughput over time, bucketed by interval.
Returns document counts and item counts per time bucket, broken down
by source type. Powers the ingestion throughput chart.
Requirements: 12.1, 12.3
"""
bucket_interval = {
"15m": "15 minutes",
"1h": "1 hour",
"6h": "6 hours",
"1d": "1 day",
}[bucket]
rows = await pool.fetch(
f"""SELECT
date_trunc('hour', ir.started_at)
- (EXTRACT(minute FROM ir.started_at)::int
% EXTRACT(epoch FROM INTERVAL '{bucket_interval}')::int / 60)
* INTERVAL '1 minute' AS bucket_start,
ir.source_type,
COUNT(*) AS run_count,
COUNT(*) FILTER (WHERE ir.status = 'completed') AS completed,
COUNT(*) FILTER (WHERE ir.status = 'failed') AS failed,
COALESCE(SUM(ir.items_fetched), 0) AS items_fetched,
COALESCE(SUM(ir.items_new), 0) AS items_new
FROM ingestion_runs ir
WHERE ir.started_at >= NOW() - INTERVAL '1 hour' * $1
GROUP BY bucket_start, ir.source_type
ORDER BY bucket_start DESC, ir.source_type""",
hours,
)
return [_row_to_dict(r) for r in rows]
@app.get("/api/ops/ingestion/summary")
async def get_ingestion_summary(
hours: int = Query(default=24, ge=1, le=168),
):
"""High-level ingestion summary for the operational dashboard.
Returns total runs, success/failure counts, items processed, and
per-source-type breakdown for the given time window.
Requirements: 12.1
"""
row = await pool.fetchrow(
"""SELECT
COUNT(*) AS total_runs,
COUNT(*) FILTER (WHERE status = 'completed') AS completed,
COUNT(*) FILTER (WHERE status = 'failed') AS failed,
COUNT(*) FILTER (WHERE status = 'pending') AS pending,
COUNT(*) FILTER (WHERE status = 'running') AS running,
COALESCE(SUM(items_fetched), 0) AS total_items_fetched,
COALESCE(SUM(items_new), 0) AS total_items_new,
COUNT(DISTINCT source_id) AS active_sources,
COUNT(DISTINCT company_id) AS active_companies
FROM ingestion_runs
WHERE started_at >= NOW() - INTERVAL '1 hour' * $1""",
hours,
)
by_type = await pool.fetch(
"""SELECT
source_type,
COUNT(*) AS runs,
COUNT(*) FILTER (WHERE status = 'completed') AS completed,
COUNT(*) FILTER (WHERE status = 'failed') AS failed,
COALESCE(SUM(items_fetched), 0) AS items_fetched,
COALESCE(SUM(items_new), 0) AS items_new
FROM ingestion_runs
WHERE started_at >= NOW() - INTERVAL '1 hour' * $1
GROUP BY source_type
ORDER BY runs DESC""",
hours,
)
result = _row_to_dict(row) if row else {}
result["by_source_type"] = [_row_to_dict(r) for r in by_type]
result["hours"] = hours
return result
@app.get("/api/ops/model/failures")
async def get_model_failures(
hours: int = Query(default=24, ge=1, le=168),
limit: int = Query(default=50, le=200),
):
"""Recent model extraction failures with error details.
Returns individual failed extraction attempts for debugging.
Requirements: 12.2
"""
rows = await pool.fetch(
"""SELECT
mpm.id, mpm.document_id, mpm.ticker, mpm.model_name,
mpm.prompt_version, mpm.schema_version,
mpm.attempt_count, mpm.total_duration_ms,
mpm.validation_status, mpm.validation_error_count,
mpm.validation_errors, mpm.retry_count,
mpm.confidence, mpm.recorded_at,
d.title AS document_title, d.document_type, d.source_type
FROM model_performance_metrics mpm
LEFT JOIN documents d ON d.id = mpm.document_id
WHERE mpm.success = FALSE
AND mpm.recorded_at >= NOW() - INTERVAL '1 hour' * $1
ORDER BY mpm.recorded_at DESC
LIMIT $2""",
hours, limit,
)
results = []
for r in rows:
d = _row_to_dict(r)
d["validation_errors"] = _parse_jsonb(d.get("validation_errors"))
results.append(d)
return results
@app.get("/api/ops/model/performance")
async def get_model_performance(
hours: int = Query(default=24, ge=1, le=168),
model_name: Optional[str] = None,
):
"""Aggregated model performance metrics for the operational dashboard.
Returns success rate, latency percentiles, retry rate, confidence
distribution, and token usage for the given time window.
Requirements: 12.2
"""
return await get_model_performance_summary(
pool,
model_name=model_name,
hours=hours,
)
@app.get("/api/ops/pipeline/health")
async def get_pipeline_health(
hours: int = Query(default=24, ge=1, le=168),
):
"""Pipeline stage health summary across ingestion, parsing, extraction, and aggregation.
Shows document counts at each processing stage and identifies bottlenecks.
Requirements: 12.1
"""
# Document status distribution (pipeline stages)
doc_stages = await pool.fetch(
"""SELECT
status,
COUNT(*) AS doc_count
FROM documents
WHERE created_at >= NOW() - INTERVAL '1 hour' * $1
GROUP BY status
ORDER BY doc_count DESC""",
hours,
)
# Parsing quality distribution
parse_quality = await pool.fetchrow(
"""SELECT
COUNT(*) AS total_parsed,
COUNT(*) FILTER (WHERE parse_confidence = 'high') AS high_confidence,
COUNT(*) FILTER (WHERE parse_confidence = 'medium') AS medium_confidence,
COUNT(*) FILTER (WHERE parse_confidence = 'low') AS low_confidence,
COUNT(*) FILTER (WHERE parse_confidence = 'unknown' OR parse_confidence IS NULL) AS unknown_confidence,
ROUND(AVG(parse_quality_score)::numeric, 3) AS avg_quality_score
FROM documents
WHERE created_at >= NOW() - INTERVAL '1 hour' * $1
AND status IN ('parsed', 'extracted', 'aggregated')""",
hours,
)
# Extraction validation distribution
extraction_stats = await pool.fetchrow(
"""SELECT
COUNT(*) AS total_extractions,
COUNT(*) FILTER (WHERE validation_status = 'valid') AS valid,
COUNT(*) FILTER (WHERE validation_status = 'failed') AS failed,
COUNT(*) FILTER (WHERE validation_status = 'pending') AS pending,
ROUND(AVG(confidence)::numeric, 3) AS avg_confidence,
ROUND(AVG(retry_count)::numeric, 2) AS avg_retries
FROM document_intelligence
WHERE created_at >= NOW() - INTERVAL '1 hour' * $1""",
hours,
)
# Aggregation output (trend windows generated)
trend_stats = await pool.fetchrow(
"""SELECT
COUNT(*) AS trends_generated,
COUNT(DISTINCT entity_id) AS symbols_covered,
ROUND(AVG(confidence)::numeric, 3) AS avg_trend_confidence,
ROUND(AVG(contradiction_score)::numeric, 3) AS avg_contradiction
FROM trend_windows
WHERE created_at >= NOW() - INTERVAL '1 hour' * $1""",
hours,
)
return {
"hours": hours,
"document_stages": [_row_to_dict(r) for r in doc_stages],
"parsing": _row_to_dict(parse_quality) if parse_quality else {},
"extraction": _row_to_dict(extraction_stats) if extraction_stats else {},
"aggregation": _row_to_dict(trend_stats) if trend_stats else {},
}
@app.get("/api/ops/sources/coverage-gaps")
async def get_source_coverage_gaps():
"""Identify symbols with missing or insufficient source coverage.
Returns companies that lack one or more expected source types
(market_api, news_api, filings_api), or have sources that haven't
produced successful ingestion runs recently.
Requirements: 12.3
"""
# Companies missing expected source types
missing_types = await pool.fetch(
"""SELECT
c.id AS company_id, c.ticker, c.legal_name, c.sector,
ARRAY_AGG(DISTINCT s.source_type) FILTER (WHERE s.active) AS active_types,
ARRAY['market_api', 'news_api', 'filings_api'] AS expected_types
FROM companies c
LEFT JOIN sources s ON s.company_id = c.id AND s.active = TRUE
WHERE c.active = TRUE
GROUP BY c.id, c.ticker, c.legal_name, c.sector
HAVING NOT ARRAY['market_api', 'news_api', 'filings_api'] <@ ARRAY_AGG(DISTINCT s.source_type) FILTER (WHERE s.active)
OR ARRAY_AGG(DISTINCT s.source_type) FILTER (WHERE s.active) IS NULL
ORDER BY c.ticker""",
)
# Sources with no successful runs in the last 24 hours
stale_sources = await pool.fetch(
"""SELECT
s.id AS source_id, s.source_type, s.source_name,
c.ticker, c.legal_name,
MAX(ir.started_at) FILTER (WHERE ir.status = 'completed') AS last_success,
MAX(ir.started_at) AS last_attempt,
COUNT(*) FILTER (WHERE ir.status = 'failed'
AND ir.started_at >= NOW() - INTERVAL '24 hours') AS recent_failures
FROM sources s
JOIN companies c ON c.id = s.company_id
LEFT JOIN ingestion_runs ir ON ir.source_id = s.id
WHERE s.active = TRUE AND c.active = TRUE
GROUP BY s.id, s.source_type, s.source_name, c.ticker, c.legal_name
HAVING MAX(ir.started_at) FILTER (WHERE ir.status = 'completed')
< NOW() - INTERVAL '24 hours'
OR MAX(ir.started_at) FILTER (WHERE ir.status = 'completed') IS NULL
ORDER BY c.ticker, s.source_type""",
)
return {
"missing_source_types": [_row_to_dict(r) for r in missing_types],
"stale_sources": [_row_to_dict(r) for r in stale_sources],
}
# ---------------------------------------------------------------------------
# Analytics: Trino SQL Proxy (Requirement 10.1, 10.3, 13.7)
# ---------------------------------------------------------------------------
@app.post("/api/analytics/query")
async def analytics_query(body: dict[str, Any]):
"""Proxy SQL to Trino, enforce row limits, return structured results.
Design: Section 9.3 (API proxy for Trino)
Requirements: 10.1, 10.3, 13.7
"""
sql = body.get("sql", "").strip()
if not sql:
raise HTTPException(400, "sql is required")
limit = min(int(body.get("limit", 1000)), 10000)
trino_host = config.trino.host
trino_port = config.trino.port
trino_catalog = config.trino.catalog
trino_schema = config.trino.schema
trino_url = f"http://{trino_host}:{trino_port}/v1/statement"
headers = {
"X-Trino-User": "stonks-dashboard",
"X-Trino-Catalog": trino_catalog,
"X-Trino-Schema": trino_schema,
}
start = _time.monotonic()
try:
async with httpx.AsyncClient(timeout=60.0) as client:
# Submit query
resp = await client.post(trino_url, content=sql, headers=headers)
if resp.status_code != 200:
raise HTTPException(502, f"Trino error: {resp.text[:500]}")
result = resp.json()
columns: list[dict[str, str]] = []
all_rows: list[list[Any]] = []
# Extract columns from first response
if "columns" in result:
columns = [{"name": c["name"], "type": c.get("type", "unknown")} for c in result["columns"]]
if "data" in result:
all_rows.extend(result["data"])
# Follow nextUri to get all results
while "nextUri" in result and len(all_rows) < limit:
next_url = result["nextUri"]
resp = await client.get(next_url, headers=headers)
if resp.status_code != 200:
break
result = resp.json()
if "columns" in result and not columns:
columns = [{"name": c["name"], "type": c.get("type", "unknown")} for c in result["columns"]]
if "data" in result:
all_rows.extend(result["data"])
elapsed_ms = round((_time.monotonic() - start) * 1000)
all_rows = all_rows[:limit]
return {
"columns": columns,
"rows": all_rows,
"row_count": len(all_rows),
"elapsed_ms": elapsed_ms,
}
except httpx.ConnectError:
raise HTTPException(502, "Cannot connect to Trino")
except httpx.TimeoutException:
raise HTTPException(504, "Trino query timed out")
@app.get("/api/analytics/schema")
async def analytics_schema():
"""Return Trino catalog/schema/table/column metadata for the schema browser.
Requirements: 13.7
"""
trino_host = config.trino.host
trino_port = config.trino.port
trino_catalog = config.trino.catalog
trino_schema = config.trino.schema
trino_url = f"http://{trino_host}:{trino_port}/v1/statement"
headers = {
"X-Trino-User": "stonks-dashboard",
"X-Trino-Catalog": trino_catalog,
"X-Trino-Schema": trino_schema,
}
async def _run_trino_query(sql: str) -> list[list[Any]]:
rows: list[list[Any]] = []
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(trino_url, content=sql, headers=headers)
if resp.status_code != 200:
return rows
result = resp.json()
if "data" in result:
rows.extend(result["data"])
while "nextUri" in result:
resp = await client.get(result["nextUri"], headers=headers)
if resp.status_code != 200:
break
result = resp.json()
if "data" in result:
rows.extend(result["data"])
return rows
try:
# Get tables
table_rows = await _run_trino_query(
f"SELECT table_name FROM information_schema.tables WHERE table_schema = '{trino_schema}' ORDER BY table_name"
)
tables = []
for tr in table_rows:
table_name = tr[0] if tr else None
if not table_name:
continue
# Get columns for each table
col_rows = await _run_trino_query(
f"SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = '{trino_schema}' AND table_name = '{table_name}' ORDER BY ordinal_position"
)
columns = [{"name": cr[0], "type": cr[1]} for cr in col_rows if cr]
tables.append({"name": table_name, "columns": columns})
return {
"catalog": trino_catalog,
"schema": trino_schema,
"tables": tables,
}
except Exception:
return {"catalog": trino_catalog, "schema": trino_schema, "tables": []}
# ---------------------------------------------------------------------------
# Analytics: Saved Queries (Requirement 13.7)
# ---------------------------------------------------------------------------
class SavedQueryBody(BaseModel):
name: str
description: str = ""
sql_text: str
@app.get("/api/analytics/saved-queries")
async def list_saved_queries():
"""List all saved queries."""
rows = await pool.fetch(
"SELECT id, name, description, sql_text, created_by, created_at, updated_at FROM saved_queries ORDER BY updated_at DESC"
)
return [_row_to_dict(r) for r in rows]
@app.post("/api/analytics/saved-queries", status_code=201)
async def create_saved_query(body: SavedQueryBody):
"""Save a new query."""
row = await pool.fetchrow(
"""INSERT INTO saved_queries (name, description, sql_text)
VALUES ($1, $2, $3)
RETURNING id, name, description, sql_text, created_by, created_at""",
body.name, body.description, body.sql_text,
)
return _row_to_dict(row)
@app.delete("/api/analytics/saved-queries/{query_id}")
async def delete_saved_query(query_id: str):
"""Delete a saved query."""
result = await pool.execute("DELETE FROM saved_queries WHERE id = $1::uuid", query_id)
if result == "DELETE 0":
raise HTTPException(404, "Query not found")
return {"status": "deleted"}