"""Query API - FastAPI application for analytics, evidence drill-down, and admin controls.

Exposes read endpoints for:
- Companies and watchlists (proxied from symbol registry data)
- Document timelines with intelligence
- Trend summaries
- Recommendation history with evidence
- Order history with audit trails

The ``/api/admin/...`` routes additionally expose ``PUT`` endpoints that
modify sources, companies, and the trading configuration, so the API as a
whole is not read-only.

Requirements: 11.1, 11.2, 11.3
Design: Section 9.1 (Operational API)
"""

from __future__ import annotations

import json
import logging
import time as _time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any, Optional

import asyncpg
import httpx
from fastapi import FastAPI, HTTPException, Query, Request
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from pydantic import BaseModel
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response

from services.extractor.metrics import get_model_performance_summary
from services.shared.audit import get_entity_audit_trail, get_order_audit_trail
from services.shared.config import load_config
from services.shared.db import get_pg_pool
from services.shared.logging import new_trace_id, set_trace_context, setup_logging

logger = logging.getLogger("query_api")

# Loaded once at import time; read by the lifespan handler below.
config = load_config()

# Shared connection pool. It is None until the lifespan handler has run.
pool: Optional[asyncpg.Pool] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Configure logging and own the database pool for the app's lifetime.

    The pool is created before the application starts serving and is closed
    when the application shuts down. The close happens in a ``finally``
    clause so it also runs when an exception or cancellation is thrown into
    this generator at the ``yield``.
    """
    global pool
    setup_logging("query_api", level=config.log_level, json_output=config.json_logs)
    pool = await get_pg_pool(config)
    try:
        yield
    finally:
        await pool.close()


app = FastAPI(title="Stonks Oracle - Query API", lifespan=lifespan)


class TraceMiddleware(BaseHTTPMiddleware):
    """Inject trace context for every incoming HTTP request."""

    async def dispatch(self, request: Request, call_next):
        """Propagate or mint a trace id and echo it on the response.

        The incoming ``x-trace-id`` header is reused when present and
        non-empty; otherwise a new id is generated. The id is stored in the
        logging trace context before the request is handled and returned to
        the caller in the ``x-trace-id`` response header.
        """
        trace_id = request.headers.get("x-trace-id") or new_trace_id()
        set_trace_context(trace_id=trace_id)
        response = await call_next(request)
        response.headers["x-trace-id"] = trace_id
        return response


app.add_middleware(TraceMiddleware)
--------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _row_to_dict(row: asyncpg.Record) -> dict[str, Any]: """Convert an asyncpg Record to a JSON-safe dict.""" d: dict[str, Any] = {} for key, val in dict(row).items(): if isinstance(val, datetime): d[key] = val.isoformat() elif hasattr(val, "__str__") and not isinstance(val, (str, int, float, bool, list, dict, type(None))): d[key] = str(val) else: d[key] = val return d def _parse_jsonb(val: Any) -> Any: """Parse a JSONB value that may come back as str or already-decoded.""" if val is None: return None if isinstance(val, (dict, list)): return val try: return json.loads(val) except (json.JSONDecodeError, TypeError): return val # --------------------------------------------------------------------------- # Health # --------------------------------------------------------------------------- @app.get("/health") async def health(): try: await pool.fetchval("SELECT 1") return {"status": "ok"} except Exception: raise HTTPException(503, "Database unavailable") @app.get("/metrics") async def metrics(): """Expose Prometheus metrics for scraping. 
Requirements: 12.1, 12.2 """ return Response( content=generate_latest(), media_type=CONTENT_TYPE_LATEST, ) # --------------------------------------------------------------------------- # Companies (Requirement 11.1) # --------------------------------------------------------------------------- @app.get("/api/companies") async def list_companies( active: bool = True, sector: Optional[str] = None, ticker: Optional[str] = None, ): """List tracked companies with optional filters.""" conditions = ["c.active = $1"] params: list[Any] = [active] idx = 2 if sector: conditions.append(f"c.sector = ${idx}") params.append(sector) idx += 1 if ticker: conditions.append(f"c.ticker = ${idx}") params.append(ticker.upper()) idx += 1 where = " AND ".join(conditions) rows = await pool.fetch( f"""SELECT c.id, c.ticker, c.legal_name, c.exchange, c.sector, c.industry, c.market_cap_bucket, c.active, c.created_at, c.updated_at FROM companies c WHERE {where} ORDER BY c.ticker""", *params, ) return [_row_to_dict(r) for r in rows] @app.get("/api/companies/{company_id}") async def get_company(company_id: str): """Get a single company with aliases and source count.""" row = await pool.fetchrow( """SELECT id, ticker, legal_name, exchange, sector, industry, market_cap_bucket, active, created_at, updated_at FROM companies WHERE id = $1""", company_id, ) if not row: raise HTTPException(404, "Company not found") result = _row_to_dict(row) aliases = await pool.fetch( "SELECT id, alias, alias_type FROM company_aliases WHERE company_id = $1", company_id, ) result["aliases"] = [dict(a) for a in aliases] source_count = await pool.fetchval( "SELECT COUNT(*) FROM sources WHERE company_id = $1 AND active = true", company_id, ) result["active_source_count"] = source_count return result @app.get("/api/companies/{company_id}/sources") async def list_company_sources(company_id: str): """List sources configured for a company.""" rows = await pool.fetch( """SELECT id, source_type, source_name, config, 
credibility_score, retention_days, access_policy, active FROM sources WHERE company_id = $1 ORDER BY source_type""", company_id, ) return [_row_to_dict(r) for r in rows] # --------------------------------------------------------------------------- # Document Timelines (Requirement 11.1, 11.2) # --------------------------------------------------------------------------- @app.get("/api/documents") async def list_documents( ticker: Optional[str] = None, company_id: Optional[str] = None, document_type: Optional[str] = None, status: Optional[str] = None, since: Optional[str] = None, limit: int = Query(default=50, le=200), offset: int = 0, ): """List documents with optional filters, ordered by published_at desc.""" conditions: list[str] = [] params: list[Any] = [] idx = 1 if ticker: conditions.append(f"""d.id IN ( SELECT document_id FROM document_company_mentions WHERE ticker = ${idx} )""") params.append(ticker.upper()) idx += 1 if company_id: conditions.append(f"""d.id IN ( SELECT document_id FROM document_company_mentions WHERE company_id = ${idx} )""") params.append(company_id) idx += 1 if document_type: conditions.append(f"d.document_type = ${idx}") params.append(document_type) idx += 1 if status: conditions.append(f"d.status = ${idx}") params.append(status) idx += 1 if since: conditions.append(f"d.published_at >= ${idx}::timestamptz") params.append(since) idx += 1 where = ("WHERE " + " AND ".join(conditions)) if conditions else "" rows = await pool.fetch( f"""SELECT d.id, d.document_type, d.source_type, d.publisher, d.url, d.title, d.published_at, d.retrieved_at, d.language, d.content_hash, d.parse_quality_score, d.parse_confidence, d.status, d.created_at FROM documents d {where} ORDER BY d.published_at DESC NULLS LAST LIMIT ${idx} OFFSET ${idx + 1}""", *params, limit, offset, ) return [_row_to_dict(r) for r in rows] @app.get("/api/documents/{document_id}") async def get_document(document_id: str): """Get a single document with its intelligence extraction and 
company mentions.""" row = await pool.fetchrow( """SELECT id, document_type, source_type, publisher, url, canonical_url, title, published_at, retrieved_at, language, content_hash, raw_storage_ref, normalized_storage_ref, parse_quality_score, parse_confidence, status, created_at, updated_at FROM documents WHERE id = $1""", document_id, ) if not row: raise HTTPException(404, "Document not found") result = _row_to_dict(row) # Company mentions mentions = await pool.fetch( """SELECT dcm.company_id, dcm.ticker, dcm.mention_type, dcm.confidence, c.legal_name FROM document_company_mentions dcm JOIN companies c ON c.id = dcm.company_id WHERE dcm.document_id = $1""", document_id, ) result["company_mentions"] = [_row_to_dict(m) for m in mentions] # Intelligence extraction intel = await pool.fetchrow( """SELECT id, summary, macro_themes, novelty_score, source_credibility, extraction_warnings, confidence, model_provider, model_name, prompt_version, schema_version, validation_status, validation_errors, created_at FROM document_intelligence WHERE document_id = $1 ORDER BY created_at DESC LIMIT 1""", document_id, ) if intel: intel_dict = _row_to_dict(intel) intel_dict["macro_themes"] = _parse_jsonb(intel_dict.get("macro_themes")) intel_dict["extraction_warnings"] = _parse_jsonb(intel_dict.get("extraction_warnings")) intel_dict["validation_errors"] = _parse_jsonb(intel_dict.get("validation_errors")) # Impact records per company impacts = await pool.fetch( """SELECT dir.company_id, dir.ticker, dir.relevance, dir.sentiment, dir.impact_score, dir.impact_horizon, dir.catalyst_type, dir.key_facts, dir.risks, dir.evidence_spans, c.legal_name FROM document_impact_records dir JOIN companies c ON c.id = dir.company_id WHERE dir.intelligence_id = $1""", intel["id"], ) impact_list = [] for imp in impacts: imp_dict = _row_to_dict(imp) imp_dict["key_facts"] = _parse_jsonb(imp_dict.get("key_facts")) imp_dict["risks"] = _parse_jsonb(imp_dict.get("risks")) imp_dict["evidence_spans"] = 
_parse_jsonb(imp_dict.get("evidence_spans")) impact_list.append(imp_dict) intel_dict["company_impacts"] = impact_list result["intelligence"] = intel_dict else: result["intelligence"] = None return result # --------------------------------------------------------------------------- # Trend Summaries (Requirement 11.1) # --------------------------------------------------------------------------- @app.get("/api/trends") async def list_trends( ticker: Optional[str] = None, entity_type: str = "company", window: Optional[str] = None, limit: int = Query(default=50, le=200), offset: int = 0, ): """List trend summaries with optional filters.""" conditions = ["entity_type = $1"] params: list[Any] = [entity_type] idx = 2 if ticker: conditions.append(f"entity_id = ${idx}") params.append(ticker.upper()) idx += 1 if window: conditions.append(f"window = ${idx}") params.append(window) idx += 1 where = " AND ".join(conditions) rows = await pool.fetch( f"""SELECT id, entity_type, entity_id, window, trend_direction, trend_strength, confidence, top_supporting_evidence, top_opposing_evidence, dominant_catalysts, material_risks, contradiction_score, market_context, generated_at FROM trend_windows WHERE {where} ORDER BY generated_at DESC LIMIT ${idx} OFFSET ${idx + 1}""", *params, limit, offset, ) results = [] for r in rows: d = _row_to_dict(r) for jsonb_field in ( "top_supporting_evidence", "top_opposing_evidence", "dominant_catalysts", "material_risks", "market_context", ): d[jsonb_field] = _parse_jsonb(d.get(jsonb_field)) results.append(d) return results @app.get("/api/trends/{trend_id}") async def get_trend(trend_id: str): """Get a single trend summary by ID.""" row = await pool.fetchrow( """SELECT id, entity_type, entity_id, window, trend_direction, trend_strength, confidence, top_supporting_evidence, top_opposing_evidence, dominant_catalysts, material_risks, contradiction_score, market_context, generated_at, created_at FROM trend_windows WHERE id = $1""", trend_id, ) if not row: 
raise HTTPException(404, "Trend not found") d = _row_to_dict(row) for jsonb_field in ( "top_supporting_evidence", "top_opposing_evidence", "dominant_catalysts", "material_risks", "market_context", ): d[jsonb_field] = _parse_jsonb(d.get(jsonb_field)) return d # --------------------------------------------------------------------------- # Recommendations (Requirement 11.1, 11.2) # --------------------------------------------------------------------------- @app.get("/api/recommendations") async def list_recommendations( ticker: Optional[str] = None, action: Optional[str] = None, mode: Optional[str] = None, since: Optional[str] = None, limit: int = Query(default=50, le=200), offset: int = 0, ): """List recommendations with optional filters.""" conditions: list[str] = [] params: list[Any] = [] idx = 1 if ticker: conditions.append(f"r.ticker = ${idx}") params.append(ticker.upper()) idx += 1 if action: conditions.append(f"r.action = ${idx}") params.append(action) idx += 1 if mode: conditions.append(f"r.mode = ${idx}") params.append(mode) idx += 1 if since: conditions.append(f"r.generated_at >= ${idx}::timestamptz") params.append(since) idx += 1 where = ("WHERE " + " AND ".join(conditions)) if conditions else "" rows = await pool.fetch( f"""SELECT r.id, r.ticker, r.action, r.mode, r.confidence, r.time_horizon, r.thesis, r.invalidation_conditions, r.portfolio_pct, r.max_loss_pct, r.model_version, r.risk_classification, r.generated_at FROM recommendations r {where} ORDER BY r.generated_at DESC LIMIT ${idx} OFFSET ${idx + 1}""", *params, limit, offset, ) results = [] for r in rows: d = _row_to_dict(r) d["invalidation_conditions"] = _parse_jsonb(d.get("invalidation_conditions")) results.append(d) return results @app.get("/api/recommendations/{recommendation_id}") async def get_recommendation(recommendation_id: str): """Get a single recommendation with evidence and risk evaluation. 
Requirement 11.2: display contributing intelligence objects, raw sources, and market context that influenced the decision. """ row = await pool.fetchrow( """SELECT r.id, r.ticker, r.company_id, r.action, r.mode, r.confidence, r.time_horizon, r.thesis, r.invalidation_conditions, r.portfolio_pct, r.max_loss_pct, r.model_version, r.model_provider, r.prompt_version, r.schema_version, r.risk_classification, r.generated_at, r.created_at FROM recommendations r WHERE r.id = $1""", recommendation_id, ) if not row: raise HTTPException(404, "Recommendation not found") result = _row_to_dict(row) result["invalidation_conditions"] = _parse_jsonb(result.get("invalidation_conditions")) # Evidence: linked documents and intelligence objects evidence_rows = await pool.fetch( """SELECT re.id, re.document_id, re.intelligence_id, re.evidence_type, re.weight, d.title, d.document_type, d.source_type, d.publisher, d.url, d.published_at FROM recommendation_evidence re LEFT JOIN documents d ON d.id = re.document_id WHERE re.recommendation_id = $1 ORDER BY re.weight DESC""", recommendation_id, ) result["evidence"] = [_row_to_dict(e) for e in evidence_rows] # Risk evaluation risk_row = await pool.fetchrow( """SELECT id, eligible, allowed_mode, rejection_reasons, risk_checks, evaluated_at FROM risk_evaluations WHERE recommendation_id = $1 ORDER BY evaluated_at DESC LIMIT 1""", recommendation_id, ) if risk_row: risk_dict = _row_to_dict(risk_row) risk_dict["rejection_reasons"] = _parse_jsonb(risk_dict.get("rejection_reasons")) risk_dict["risk_checks"] = _parse_jsonb(risk_dict.get("risk_checks")) result["risk_evaluation"] = risk_dict else: result["risk_evaluation"] = None return result # --------------------------------------------------------------------------- # Evidence Drill-Down (Requirement 11.2, 10.4) # --------------------------------------------------------------------------- @app.get("/api/recommendations/{recommendation_id}/evidence") async def 
get_recommendation_evidence_drilldown(recommendation_id: str): """Full evidence drill-down linking a recommendation to source documents and raw artifacts. Returns the complete provenance chain for each piece of evidence: recommendation_evidence → document (with storage refs) → document_intelligence → document_impact_records, plus the trend window that fed the recommendation. Requirements: 11.2, 10.4 Design: Section 9.1 (evidence drill-down and audit views) """ # Verify recommendation exists and get basic info rec_row = await pool.fetchrow( """SELECT id, ticker, company_id, action, mode, confidence, time_horizon, thesis, model_version, model_provider, prompt_version, schema_version, generated_at FROM recommendations WHERE id = $1""", recommendation_id, ) if not rec_row: raise HTTPException(404, "Recommendation not found") result: dict[str, Any] = { "recommendation": _row_to_dict(rec_row), "evidence": [], "trend_window": None, } # Fetch evidence rows with full document details including storage refs evidence_rows = await pool.fetch( """SELECT re.id AS evidence_id, re.document_id, re.intelligence_id, re.evidence_type, re.weight, d.document_type, d.source_type, d.publisher, d.url, d.canonical_url, d.title, d.published_at, d.retrieved_at, d.language, d.content_hash, d.raw_storage_ref, d.normalized_storage_ref, d.parse_quality_score, d.parse_confidence, d.status AS document_status FROM recommendation_evidence re LEFT JOIN documents d ON d.id = re.document_id WHERE re.recommendation_id = $1 ORDER BY re.weight DESC""", recommendation_id, ) for ev in evidence_rows: ev_dict = _row_to_dict(ev) ev_dict["intelligence"] = None ev_dict["company_impacts"] = [] # Fetch intelligence extraction for this evidence intel_id = ev["intelligence_id"] doc_id = ev["document_id"] # Use the linked intelligence_id if available, otherwise look up by document_id intel_row = None if intel_id: intel_row = await pool.fetchrow( """SELECT id, document_id, summary, macro_themes, novelty_score, 
source_credibility, extraction_warnings, confidence, model_provider, model_name, prompt_version, schema_version, raw_output_ref, prompt_ref, validation_status, validation_errors, created_at FROM document_intelligence WHERE id = $1""", intel_id, ) elif doc_id: intel_row = await pool.fetchrow( """SELECT id, document_id, summary, macro_themes, novelty_score, source_credibility, extraction_warnings, confidence, model_provider, model_name, prompt_version, schema_version, raw_output_ref, prompt_ref, validation_status, validation_errors, created_at FROM document_intelligence WHERE document_id = $1 ORDER BY created_at DESC LIMIT 1""", doc_id, ) if intel_row: intel_dict = _row_to_dict(intel_row) for jf in ("macro_themes", "extraction_warnings", "validation_errors"): intel_dict[jf] = _parse_jsonb(intel_dict.get(jf)) ev_dict["intelligence"] = intel_dict # Fetch per-company impact records for this intelligence impacts = await pool.fetch( """SELECT dir.company_id, dir.ticker, dir.relevance, dir.sentiment, dir.impact_score, dir.impact_horizon, dir.catalyst_type, dir.key_facts, dir.risks, dir.evidence_spans, c.legal_name FROM document_impact_records dir JOIN companies c ON c.id = dir.company_id WHERE dir.intelligence_id = $1""", intel_row["id"], ) impact_list = [] for imp in impacts: imp_dict = _row_to_dict(imp) for jf in ("key_facts", "risks", "evidence_spans"): imp_dict[jf] = _parse_jsonb(imp_dict.get(jf)) impact_list.append(imp_dict) ev_dict["company_impacts"] = impact_list result["evidence"].append(ev_dict) # Fetch the most recent trend window for this ticker to show market context ticker = rec_row["ticker"] generated_at = rec_row["generated_at"] if ticker and generated_at: trend_row = await pool.fetchrow( """SELECT id, entity_type, entity_id, window, trend_direction, trend_strength, confidence, top_supporting_evidence, top_opposing_evidence, dominant_catalysts, material_risks, contradiction_score, market_context, generated_at FROM trend_windows WHERE entity_id = $1 AND 
entity_type = 'company' AND generated_at <= $2 ORDER BY generated_at DESC LIMIT 1""", ticker, generated_at, ) if trend_row: trend_dict = _row_to_dict(trend_row) for jf in ( "top_supporting_evidence", "top_opposing_evidence", "dominant_catalysts", "material_risks", "market_context", ): trend_dict[jf] = _parse_jsonb(trend_dict.get(jf)) # Include trend evidence linkage: documents that contributed to this trend trend_ev_rows = await pool.fetch( """SELECT te.id, te.document_id, te.evidence_type, te.rank_score, te.weight_component, te.impact_component, te.recency_component, te.confidence_component, te.sentiment_value, d.title, d.document_type, d.source_type, d.publisher, d.url, d.published_at, d.raw_storage_ref, d.normalized_storage_ref FROM trend_evidence te LEFT JOIN documents d ON d.id = te.document_id WHERE te.trend_window_id = $1 ORDER BY te.rank_score DESC""", trend_row["id"], ) trend_dict["evidence"] = [_row_to_dict(te) for te in trend_ev_rows] result["trend_window"] = trend_dict return result # --------------------------------------------------------------------------- # Trend Evidence Drill-Down (Requirement 10.4) # --------------------------------------------------------------------------- @app.get("/api/trends/{trend_id}/evidence") async def get_trend_evidence_drilldown(trend_id: str): """Drill down from a trend window to its contributing documents and raw artifacts. Returns the trend summary plus each contributing document with storage refs, intelligence extraction, and impact records — full provenance chain. 
Requirements: 10.4, 6.5 """ trend_row = await pool.fetchrow( """SELECT id, entity_type, entity_id, window, trend_direction, trend_strength, confidence, top_supporting_evidence, top_opposing_evidence, dominant_catalysts, material_risks, contradiction_score, market_context, generated_at FROM trend_windows WHERE id = $1""", trend_id, ) if not trend_row: raise HTTPException(404, "Trend not found") trend_dict = _row_to_dict(trend_row) for jf in ( "top_supporting_evidence", "top_opposing_evidence", "dominant_catalysts", "material_risks", "market_context", ): trend_dict[jf] = _parse_jsonb(trend_dict.get(jf)) # Fetch trend evidence with full document details evidence_rows = await pool.fetch( """SELECT te.id AS evidence_id, te.document_id, te.evidence_type, te.rank_score, te.weight_component, te.impact_component, te.recency_component, te.confidence_component, te.sentiment_value, d.document_type, d.source_type, d.publisher, d.url, d.canonical_url, d.title, d.published_at, d.retrieved_at, d.content_hash, d.raw_storage_ref, d.normalized_storage_ref, d.parse_quality_score, d.parse_confidence, d.status AS document_status FROM trend_evidence te LEFT JOIN documents d ON d.id = te.document_id WHERE te.trend_window_id = $1 ORDER BY te.rank_score DESC""", trend_id, ) evidence_list = [] for ev in evidence_rows: ev_dict = _row_to_dict(ev) ev_dict["intelligence"] = None ev_dict["company_impacts"] = [] doc_id = ev["document_id"] if doc_id: intel_row = await pool.fetchrow( """SELECT id, document_id, summary, macro_themes, novelty_score, source_credibility, extraction_warnings, confidence, model_provider, model_name, prompt_version, schema_version, raw_output_ref, prompt_ref, validation_status, validation_errors, created_at FROM document_intelligence WHERE document_id = $1 ORDER BY created_at DESC LIMIT 1""", doc_id, ) if intel_row: intel_dict = _row_to_dict(intel_row) for jf in ("macro_themes", "extraction_warnings", "validation_errors"): intel_dict[jf] = _parse_jsonb(intel_dict.get(jf)) 
ev_dict["intelligence"] = intel_dict impacts = await pool.fetch( """SELECT dir.company_id, dir.ticker, dir.relevance, dir.sentiment, dir.impact_score, dir.impact_horizon, dir.catalyst_type, dir.key_facts, dir.risks, dir.evidence_spans, c.legal_name FROM document_impact_records dir JOIN companies c ON c.id = dir.company_id WHERE dir.intelligence_id = $1""", intel_row["id"], ) for imp in impacts: imp_dict = _row_to_dict(imp) for jf in ("key_facts", "risks", "evidence_spans"): imp_dict[jf] = _parse_jsonb(imp_dict.get(jf)) ev_dict["company_impacts"].append(imp_dict) evidence_list.append(ev_dict) return { "trend": trend_dict, "evidence": evidence_list, } # --------------------------------------------------------------------------- # Order History (Requirement 11.1, 11.3) # --------------------------------------------------------------------------- @app.get("/api/orders") async def list_orders( ticker: Optional[str] = None, status: Optional[str] = None, side: Optional[str] = None, since: Optional[str] = None, limit: int = Query(default=50, le=200), offset: int = 0, ): """List orders with optional filters.""" conditions: list[str] = [] params: list[Any] = [] idx = 1 if ticker: conditions.append(f"o.ticker = ${idx}") params.append(ticker.upper()) idx += 1 if status: conditions.append(f"o.status = ${idx}") params.append(status) idx += 1 if side: conditions.append(f"o.side = ${idx}") params.append(side) idx += 1 if since: conditions.append(f"o.created_at >= ${idx}::timestamptz") params.append(since) idx += 1 where = ("WHERE " + " AND ".join(conditions)) if conditions else "" rows = await pool.fetch( f"""SELECT o.id, o.recommendation_id, o.broker_account_id, o.ticker, o.side, o.order_type, o.quantity, o.limit_price, o.stop_price, o.status, o.broker_order_id, o.submitted_at, o.acknowledged_at, o.filled_at, o.cancelled_at, o.rejected_at, o.rejection_reason, o.fill_price, o.fill_quantity, o.created_at FROM orders o {where} ORDER BY o.created_at DESC LIMIT ${idx} OFFSET ${idx + 
1}""", *params, limit, offset, ) return [_row_to_dict(r) for r in rows] @app.get("/api/orders/{order_id}") async def get_order(order_id: str): """Get a single order with its events, decision trace, and full audit trail. Requirement 11.3: expose full audit trail from ingestion through broker execution and eventual market outcome. """ row = await pool.fetchrow( """SELECT o.id, o.recommendation_id, o.broker_account_id, o.ticker, o.side, o.order_type, o.quantity, o.limit_price, o.stop_price, o.status, o.idempotency_key, o.broker_order_id, o.decision_trace, o.submitted_at, o.acknowledged_at, o.filled_at, o.cancelled_at, o.rejected_at, o.rejection_reason, o.fill_price, o.fill_quantity, o.created_at, o.updated_at FROM orders o WHERE o.id = $1""", order_id, ) if not row: raise HTTPException(404, "Order not found") result = _row_to_dict(row) result["decision_trace"] = _parse_jsonb(result.get("decision_trace")) # Order events events = await pool.fetch( """SELECT id, event_type, data, broker_timestamp, created_at FROM order_events WHERE order_id = $1 ORDER BY created_at ASC""", order_id, ) result["events"] = [] for ev in events: ev_dict = _row_to_dict(ev) ev_dict["data"] = _parse_jsonb(ev_dict.get("data")) result["events"].append(ev_dict) # Full audit trail (Requirement 11.3) recommendation_id = str(row["recommendation_id"]) if row["recommendation_id"] else None result["audit_trail"] = await get_order_audit_trail(pool, order_id, recommendation_id) return result # --------------------------------------------------------------------------- # Positions (Requirement 11.1) # --------------------------------------------------------------------------- @app.get("/api/positions") async def list_positions( ticker: Optional[str] = None, ): """List current positions.""" if ticker: rows = await pool.fetch( """SELECT p.id, p.broker_account_id, p.ticker, p.quantity, p.avg_entry_price, p.current_price, p.unrealized_pnl, p.realized_pnl, p.updated_at FROM positions p WHERE p.ticker = $1 ORDER 
BY p.ticker""", ticker.upper(), ) else: rows = await pool.fetch( """SELECT p.id, p.broker_account_id, p.ticker, p.quantity, p.avg_entry_price, p.current_price, p.unrealized_pnl, p.realized_pnl, p.updated_at FROM positions p ORDER BY p.ticker""", ) return [_row_to_dict(r) for r in rows] # --------------------------------------------------------------------------- # Audit Trail (Requirement 11.3) # --------------------------------------------------------------------------- @app.get("/api/audit/{entity_type}/{entity_id}") async def get_audit_trail(entity_type: str, entity_id: str): """Get audit events for any entity type and ID.""" events = await get_entity_audit_trail(pool, entity_type, entity_id) if not events: raise HTTPException(404, "No audit events found") return events # --------------------------------------------------------------------------- # Admin: Source Health (Requirement 11.1 - source health) # --------------------------------------------------------------------------- @app.get("/api/admin/sources/health") async def get_source_health( source_type: Optional[str] = None, company_id: Optional[str] = None, active_only: bool = True, ): """Source health overview: each source with its latest ingestion status and failure counts. 
Design: Section 9.1 (source health and job state) """ conditions = [] params: list[Any] = [] idx = 1 if active_only: conditions.append(f"s.active = ${idx}") params.append(True) idx += 1 if source_type: conditions.append(f"s.source_type = ${idx}") params.append(source_type) idx += 1 if company_id: conditions.append(f"s.company_id = ${idx}") params.append(company_id) idx += 1 where = ("WHERE " + " AND ".join(conditions)) if conditions else "" rows = await pool.fetch( f"""SELECT s.id AS source_id, s.source_type, s.source_name, s.credibility_score, s.active, c.ticker, c.legal_name, c.id AS company_id, latest.status AS last_run_status, latest.started_at AS last_run_at, latest.error_message AS last_error, latest.items_fetched AS last_items_fetched, latest.items_new AS last_items_new, COALESCE(stats.total_runs, 0) AS total_runs_24h, COALESCE(stats.failed_runs, 0) AS failed_runs_24h, COALESCE(stats.total_items, 0) AS total_items_24h FROM sources s JOIN companies c ON c.id = s.company_id LEFT JOIN LATERAL ( SELECT ir.status, ir.started_at, ir.error_message, ir.items_fetched, ir.items_new FROM ingestion_runs ir WHERE ir.source_id = s.id ORDER BY ir.started_at DESC LIMIT 1 ) latest ON TRUE LEFT JOIN LATERAL ( SELECT COUNT(*) AS total_runs, COUNT(*) FILTER (WHERE ir2.status = 'failed') AS failed_runs, COALESCE(SUM(ir2.items_fetched), 0) AS total_items FROM ingestion_runs ir2 WHERE ir2.source_id = s.id AND ir2.started_at >= NOW() - INTERVAL '24 hours' ) stats ON TRUE {where} ORDER BY c.ticker, s.source_type""", *params, ) return [_row_to_dict(r) for r in rows] @app.get("/api/admin/sources/{source_id}/runs") async def get_source_runs( source_id: str, limit: int = Query(default=20, le=100), offset: int = 0, ): """Recent ingestion runs for a specific source.""" rows = await pool.fetch( """SELECT id, source_id, company_id, source_type, status, started_at, completed_at, items_fetched, items_new, error_message, retry_count, next_retry_at FROM ingestion_runs WHERE source_id = $1 ORDER 
BY started_at DESC LIMIT $2 OFFSET $3""", source_id, limit, offset, ) return [_row_to_dict(r) for r in rows] @app.put("/api/admin/sources/{source_id}/toggle") async def toggle_source(source_id: str, active: bool = True): """Enable or disable a source.""" row = await pool.fetchrow( """UPDATE sources SET active = $2, updated_at = NOW() WHERE id = $1 RETURNING id, source_type, source_name, active""", source_id, active, ) if not row: raise HTTPException(404, "Source not found") return _row_to_dict(row) @app.put("/api/admin/sources/{source_id}/credibility") async def update_source_credibility(source_id: str, credibility_score: float = Query(ge=0.0, le=1.0)): """Update a source's credibility score.""" row = await pool.fetchrow( """UPDATE sources SET credibility_score = $2, updated_at = NOW() WHERE id = $1 RETURNING id, source_type, source_name, credibility_score""", source_id, credibility_score, ) if not row: raise HTTPException(404, "Source not found") return _row_to_dict(row) # --------------------------------------------------------------------------- # Admin: Symbol Configs (Requirement 11.1 - symbol configs) # --------------------------------------------------------------------------- @app.put("/api/admin/companies/{company_id}/toggle") async def toggle_company(company_id: str, active: bool = True): """Enable or disable a tracked company.""" row = await pool.fetchrow( """UPDATE companies SET active = $2, updated_at = NOW() WHERE id = $1 RETURNING id, ticker, legal_name, active""", company_id, active, ) if not row: raise HTTPException(404, "Company not found") return _row_to_dict(row) @app.put("/api/admin/companies/{company_id}/sector") async def update_company_sector( company_id: str, sector: str = Query(...), industry: Optional[str] = None, ): """Update a company's sector and industry classification.""" if industry is not None: row = await pool.fetchrow( """UPDATE companies SET sector = $2, industry = $3, updated_at = NOW() WHERE id = $1 RETURNING id, ticker, 
legal_name, sector, industry""", company_id, sector, industry, ) else: row = await pool.fetchrow( """UPDATE companies SET sector = $2, updated_at = NOW() WHERE id = $1 RETURNING id, ticker, legal_name, sector, industry""", company_id, sector, ) if not row: raise HTTPException(404, "Company not found") return _row_to_dict(row) @app.get("/api/admin/companies/coverage") async def get_symbol_coverage(): """Overview of source coverage per active company. Shows how many active sources of each type are configured per symbol, useful for identifying coverage gaps. """ rows = await pool.fetch( """SELECT c.id AS company_id, c.ticker, c.legal_name, c.sector, c.active, COUNT(s.id) FILTER (WHERE s.active) AS active_sources, COUNT(s.id) FILTER (WHERE s.source_type = 'market_api' AND s.active) AS market_sources, COUNT(s.id) FILTER (WHERE s.source_type = 'news_api' AND s.active) AS news_sources, COUNT(s.id) FILTER (WHERE s.source_type = 'filings_api' AND s.active) AS filings_sources, COUNT(s.id) FILTER (WHERE s.source_type = 'web_scrape' AND s.active) AS web_scrape_sources, COUNT(s.id) FILTER (WHERE s.source_type = 'broker' AND s.active) AS broker_sources FROM companies c LEFT JOIN sources s ON s.company_id = c.id WHERE c.active = TRUE GROUP BY c.id, c.ticker, c.legal_name, c.sector, c.active ORDER BY c.ticker""", ) return [_row_to_dict(r) for r in rows] # --------------------------------------------------------------------------- # Admin: Trading Mode (Requirement 8.1, 8.2, 11.1) # --------------------------------------------------------------------------- @app.get("/api/admin/trading/config") async def get_trading_config(): """Get the current active risk/trading configuration.""" row = await pool.fetchrow( """SELECT id, name, trading_mode, config, active, created_at, updated_at FROM risk_configs WHERE active = TRUE ORDER BY updated_at DESC LIMIT 1""", ) if not row: return {"trading_mode": "paper", "config": {}, "message": "No active config found, using defaults"} result = 
_row_to_dict(row) result["config"] = _parse_jsonb(result.get("config")) return result @app.put("/api/admin/trading/mode") async def set_trading_mode(mode: str = Query(..., pattern="^(paper|live|disabled)$")): """Switch the active trading mode. Requirement 8.1: support paper and live as separate execution environments. Requirement 8.2: live mode requires operator approval controls. """ row = await pool.fetchrow( """UPDATE risk_configs SET trading_mode = $1, updated_at = NOW() WHERE active = TRUE RETURNING id, name, trading_mode""", mode, ) if not row: # No active config exists yet — create one with the requested mode row = await pool.fetchrow( """INSERT INTO risk_configs (name, trading_mode, config, active) VALUES ('default', $1, '{}', TRUE) RETURNING id, name, trading_mode""", mode, ) return _row_to_dict(row) @app.put("/api/admin/trading/config") async def update_trading_config(config: dict[str, Any]): """Update the active risk configuration JSON. Accepts a partial or full risk config object. The config is stored as JSONB alongside the trading_mode in risk_configs. 
""" config_json = json.dumps(config) row = await pool.fetchrow( """UPDATE risk_configs SET config = $1::jsonb, updated_at = NOW() WHERE active = TRUE RETURNING id, name, trading_mode, config""", config_json, ) if not row: row = await pool.fetchrow( """INSERT INTO risk_configs (name, trading_mode, config, active) VALUES ('default', 'paper', $1::jsonb, TRUE) RETURNING id, name, trading_mode, config""", config_json, ) result = _row_to_dict(row) result["config"] = _parse_jsonb(result.get("config")) return result @app.get("/api/admin/trading/approvals") async def list_pending_approvals(): """List pending operator approval requests for live trading orders.""" rows = await pool.fetch( """SELECT id, order_job, recommendation_id, ticker, side, quantity, estimated_value, status, risk_evaluation_id, requested_by, reviewed_by, review_note, expires_at, requested_at, reviewed_at FROM operator_approvals WHERE status = 'pending' ORDER BY requested_at ASC""", ) results = [] for r in rows: d = _row_to_dict(r) d["order_job"] = _parse_jsonb(d.get("order_job")) results.append(d) return results @app.put("/api/admin/trading/approvals/{approval_id}") async def review_approval_request( approval_id: str, approved: bool = Query(...), reviewed_by: str = "operator", review_note: str = "", ): """Approve or reject a pending operator approval request. Requirement 8.2: live orders require operator approval controls. 
""" now = datetime.now(timezone.utc) new_status = "approved" if approved else "rejected" row = await pool.fetchrow( """UPDATE operator_approvals SET status = $2, reviewed_by = $3, review_note = $4, reviewed_at = $5, updated_at = NOW() WHERE id = $1::uuid AND status = 'pending' RETURNING id, ticker, status, reviewed_by""", approval_id, new_status, reviewed_by, review_note, now, ) if not row: raise HTTPException(404, "Approval not found or no longer pending") return _row_to_dict(row) @app.get("/api/admin/trading/lockouts") async def list_active_lockouts(): """List active symbol lockouts (news-shock, cooldown).""" rows = await pool.fetch( """SELECT id, ticker, lockout_type, reason, expires_at, created_at FROM symbol_lockouts WHERE expires_at > NOW() ORDER BY expires_at ASC""", ) return [_row_to_dict(r) for r in rows] # --------------------------------------------------------------------------- # Operational Dashboard (Requirement 12.1, 12.2, 12.3) # --------------------------------------------------------------------------- @app.get("/api/ops/ingestion/throughput") async def get_ingestion_throughput( hours: int = Query(default=24, ge=1, le=168), bucket: str = Query(default="1h", pattern="^(15m|1h|6h|1d)$"), ): """Ingestion throughput over time, bucketed by interval. Returns document counts and item counts per time bucket, broken down by source type. Powers the ingestion throughput chart. 
Requirements: 12.1, 12.3 """ bucket_interval = { "15m": "15 minutes", "1h": "1 hour", "6h": "6 hours", "1d": "1 day", }[bucket] rows = await pool.fetch( f"""SELECT date_trunc('hour', ir.started_at) - (EXTRACT(minute FROM ir.started_at)::int % EXTRACT(epoch FROM INTERVAL '{bucket_interval}')::int / 60) * INTERVAL '1 minute' AS bucket_start, ir.source_type, COUNT(*) AS run_count, COUNT(*) FILTER (WHERE ir.status = 'completed') AS completed, COUNT(*) FILTER (WHERE ir.status = 'failed') AS failed, COALESCE(SUM(ir.items_fetched), 0) AS items_fetched, COALESCE(SUM(ir.items_new), 0) AS items_new FROM ingestion_runs ir WHERE ir.started_at >= NOW() - INTERVAL '1 hour' * $1 GROUP BY bucket_start, ir.source_type ORDER BY bucket_start DESC, ir.source_type""", hours, ) return [_row_to_dict(r) for r in rows] @app.get("/api/ops/ingestion/summary") async def get_ingestion_summary( hours: int = Query(default=24, ge=1, le=168), ): """High-level ingestion summary for the operational dashboard. Returns total runs, success/failure counts, items processed, and per-source-type breakdown for the given time window. 
Requirements: 12.1 """ row = await pool.fetchrow( """SELECT COUNT(*) AS total_runs, COUNT(*) FILTER (WHERE status = 'completed') AS completed, COUNT(*) FILTER (WHERE status = 'failed') AS failed, COUNT(*) FILTER (WHERE status = 'pending') AS pending, COUNT(*) FILTER (WHERE status = 'running') AS running, COALESCE(SUM(items_fetched), 0) AS total_items_fetched, COALESCE(SUM(items_new), 0) AS total_items_new, COUNT(DISTINCT source_id) AS active_sources, COUNT(DISTINCT company_id) AS active_companies FROM ingestion_runs WHERE started_at >= NOW() - INTERVAL '1 hour' * $1""", hours, ) by_type = await pool.fetch( """SELECT source_type, COUNT(*) AS runs, COUNT(*) FILTER (WHERE status = 'completed') AS completed, COUNT(*) FILTER (WHERE status = 'failed') AS failed, COALESCE(SUM(items_fetched), 0) AS items_fetched, COALESCE(SUM(items_new), 0) AS items_new FROM ingestion_runs WHERE started_at >= NOW() - INTERVAL '1 hour' * $1 GROUP BY source_type ORDER BY runs DESC""", hours, ) result = _row_to_dict(row) if row else {} result["by_source_type"] = [_row_to_dict(r) for r in by_type] result["hours"] = hours return result @app.get("/api/ops/model/failures") async def get_model_failures( hours: int = Query(default=24, ge=1, le=168), limit: int = Query(default=50, le=200), ): """Recent model extraction failures with error details. Returns individual failed extraction attempts for debugging. 
Requirements: 12.2 """ rows = await pool.fetch( """SELECT mpm.id, mpm.document_id, mpm.ticker, mpm.model_name, mpm.prompt_version, mpm.schema_version, mpm.attempt_count, mpm.total_duration_ms, mpm.validation_status, mpm.validation_error_count, mpm.validation_errors, mpm.retry_count, mpm.confidence, mpm.recorded_at, d.title AS document_title, d.document_type, d.source_type FROM model_performance_metrics mpm LEFT JOIN documents d ON d.id = mpm.document_id WHERE mpm.success = FALSE AND mpm.recorded_at >= NOW() - INTERVAL '1 hour' * $1 ORDER BY mpm.recorded_at DESC LIMIT $2""", hours, limit, ) results = [] for r in rows: d = _row_to_dict(r) d["validation_errors"] = _parse_jsonb(d.get("validation_errors")) results.append(d) return results @app.get("/api/ops/model/performance") async def get_model_performance( hours: int = Query(default=24, ge=1, le=168), model_name: Optional[str] = None, ): """Aggregated model performance metrics for the operational dashboard. Returns success rate, latency percentiles, retry rate, confidence distribution, and token usage for the given time window. Requirements: 12.2 """ return await get_model_performance_summary( pool, model_name=model_name, hours=hours, ) @app.get("/api/ops/pipeline/health") async def get_pipeline_health( hours: int = Query(default=24, ge=1, le=168), ): """Pipeline stage health summary across ingestion, parsing, extraction, and aggregation. Shows document counts at each processing stage and identifies bottlenecks. 
Requirements: 12.1 """ # Document status distribution (pipeline stages) doc_stages = await pool.fetch( """SELECT status, COUNT(*) AS doc_count FROM documents WHERE created_at >= NOW() - INTERVAL '1 hour' * $1 GROUP BY status ORDER BY doc_count DESC""", hours, ) # Parsing quality distribution parse_quality = await pool.fetchrow( """SELECT COUNT(*) AS total_parsed, COUNT(*) FILTER (WHERE parse_confidence = 'high') AS high_confidence, COUNT(*) FILTER (WHERE parse_confidence = 'medium') AS medium_confidence, COUNT(*) FILTER (WHERE parse_confidence = 'low') AS low_confidence, COUNT(*) FILTER (WHERE parse_confidence = 'unknown' OR parse_confidence IS NULL) AS unknown_confidence, ROUND(AVG(parse_quality_score)::numeric, 3) AS avg_quality_score FROM documents WHERE created_at >= NOW() - INTERVAL '1 hour' * $1 AND status IN ('parsed', 'extracted', 'aggregated')""", hours, ) # Extraction validation distribution extraction_stats = await pool.fetchrow( """SELECT COUNT(*) AS total_extractions, COUNT(*) FILTER (WHERE validation_status = 'valid') AS valid, COUNT(*) FILTER (WHERE validation_status = 'failed') AS failed, COUNT(*) FILTER (WHERE validation_status = 'pending') AS pending, ROUND(AVG(confidence)::numeric, 3) AS avg_confidence, ROUND(AVG(retry_count)::numeric, 2) AS avg_retries FROM document_intelligence WHERE created_at >= NOW() - INTERVAL '1 hour' * $1""", hours, ) # Aggregation output (trend windows generated) trend_stats = await pool.fetchrow( """SELECT COUNT(*) AS trends_generated, COUNT(DISTINCT entity_id) AS symbols_covered, ROUND(AVG(confidence)::numeric, 3) AS avg_trend_confidence, ROUND(AVG(contradiction_score)::numeric, 3) AS avg_contradiction FROM trend_windows WHERE created_at >= NOW() - INTERVAL '1 hour' * $1""", hours, ) return { "hours": hours, "document_stages": [_row_to_dict(r) for r in doc_stages], "parsing": _row_to_dict(parse_quality) if parse_quality else {}, "extraction": _row_to_dict(extraction_stats) if extraction_stats else {}, "aggregation": 
_row_to_dict(trend_stats) if trend_stats else {}, } @app.get("/api/ops/sources/coverage-gaps") async def get_source_coverage_gaps(): """Identify symbols with missing or insufficient source coverage. Returns companies that lack one or more expected source types (market_api, news_api, filings_api), or have sources that haven't produced successful ingestion runs recently. Requirements: 12.3 """ # Companies missing expected source types missing_types = await pool.fetch( """SELECT c.id AS company_id, c.ticker, c.legal_name, c.sector, ARRAY_AGG(DISTINCT s.source_type) FILTER (WHERE s.active) AS active_types, ARRAY['market_api', 'news_api', 'filings_api'] AS expected_types FROM companies c LEFT JOIN sources s ON s.company_id = c.id AND s.active = TRUE WHERE c.active = TRUE GROUP BY c.id, c.ticker, c.legal_name, c.sector HAVING NOT ARRAY['market_api', 'news_api', 'filings_api'] <@ ARRAY_AGG(DISTINCT s.source_type) FILTER (WHERE s.active) OR ARRAY_AGG(DISTINCT s.source_type) FILTER (WHERE s.active) IS NULL ORDER BY c.ticker""", ) # Sources with no successful runs in the last 24 hours stale_sources = await pool.fetch( """SELECT s.id AS source_id, s.source_type, s.source_name, c.ticker, c.legal_name, MAX(ir.started_at) FILTER (WHERE ir.status = 'completed') AS last_success, MAX(ir.started_at) AS last_attempt, COUNT(*) FILTER (WHERE ir.status = 'failed' AND ir.started_at >= NOW() - INTERVAL '24 hours') AS recent_failures FROM sources s JOIN companies c ON c.id = s.company_id LEFT JOIN ingestion_runs ir ON ir.source_id = s.id WHERE s.active = TRUE AND c.active = TRUE GROUP BY s.id, s.source_type, s.source_name, c.ticker, c.legal_name HAVING MAX(ir.started_at) FILTER (WHERE ir.status = 'completed') < NOW() - INTERVAL '24 hours' OR MAX(ir.started_at) FILTER (WHERE ir.status = 'completed') IS NULL ORDER BY c.ticker, s.source_type""", ) return { "missing_source_types": [_row_to_dict(r) for r in missing_types], "stale_sources": [_row_to_dict(r) for r in stale_sources], } # 
# ---------------------------------------------------------------------------
# Analytics: Trino SQL Proxy (Requirement 10.1, 10.3, 13.7)
# ---------------------------------------------------------------------------


def _trino_request_context() -> tuple[str, dict[str, str]]:
    """Return the Trino statement URL and session headers built from config."""
    url = f"http://{config.trino.host}:{config.trino.port}/v1/statement"
    headers = {
        "X-Trino-User": "stonks-dashboard",
        "X-Trino-Catalog": config.trino.catalog,
        "X-Trino-Schema": config.trino.schema,
    }
    return url, headers


def _trino_columns(result: dict[str, Any]) -> list[dict[str, str]]:
    """Extract ``{"name", "type"}`` column descriptors from a Trino response page."""
    return [
        {"name": c["name"], "type": c.get("type", "unknown")}
        for c in result.get("columns") or []
    ]


def _raise_for_trino_error(result: dict[str, Any]) -> None:
    """Raise an HTTPException if a Trino response page reports a failed query."""
    error = result.get("error")
    if not error:
        return
    if isinstance(error, dict):
        message = str(error.get("message") or "query failed")
        # Errors caused by the submitted SQL are the caller's fault (400);
        # anything else is treated as an upstream failure (502).
        status = 400 if error.get("errorType") == "USER_ERROR" else 502
    else:
        message = str(error)
        status = 502
    raise HTTPException(status, f"Trino query failed: {message[:500]}")


async def _fetch_trino_rows(
    client: httpx.AsyncClient,
    url: str,
    headers: dict[str, str],
    sql: str,
) -> list[list[Any]]:
    """Run ``sql`` on Trino and collect all data rows, best-effort.

    A non-200 response ends collection and whatever rows were gathered so
    far are returned; this is only used for the schema browser.
    """
    rows: list[list[Any]] = []
    resp = await client.post(url, content=sql, headers=headers)
    if resp.status_code != 200:
        return rows
    result = resp.json()
    rows.extend(result.get("data") or [])
    while "nextUri" in result:
        resp = await client.get(result["nextUri"], headers=headers)
        if resp.status_code != 200:
            break
        result = resp.json()
        rows.extend(result.get("data") or [])
    return rows


@app.post("/api/analytics/query")
async def analytics_query(body: dict[str, Any]):
    """Proxy SQL to Trino, enforce row limits, return structured results.

    Request body keys:
        sql: the statement to run (required, non-empty string).
        limit: maximum rows to return (default 1000, capped at 10000).

    Returns a dict with ``columns``, ``rows``, ``row_count`` and
    ``elapsed_ms``.

    Raises:
        HTTPException: 400 for a missing/invalid ``sql`` or ``limit`` or a
            query Trino rejects as a user error; 502 when Trino is
            unreachable, answers with a non-200 status, returns an invalid
            payload, or fails the query for a non-user reason; 504 on
            timeout.

    Design: Section 9.3 (API proxy for Trino)
    Requirements: 10.1, 10.3, 13.7
    """
    raw_sql = body.get("sql")
    if not isinstance(raw_sql, str) or not raw_sql.strip():
        raise HTTPException(400, "sql is required")
    sql = raw_sql.strip()

    try:
        requested_limit = int(body.get("limit", 1000))
    except (TypeError, ValueError):
        raise HTTPException(400, "limit must be an integer") from None
    if requested_limit < 0:
        # A negative limit would slice rows off the end of the result.
        raise HTTPException(400, "limit must be non-negative")
    limit = min(requested_limit, 10000)

    trino_url, headers = _trino_request_context()
    columns: list[dict[str, str]] = []
    all_rows: list[list[Any]] = []

    start = _time.monotonic()
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            # Submit query
            resp = await client.post(trino_url, content=sql, headers=headers)
            if resp.status_code != 200:
                raise HTTPException(502, f"Trino error: {resp.text[:500]}")
            result = resp.json()

            while True:
                # Trino reports query failures inside a 200 response body.
                _raise_for_trino_error(result)
                if not columns:
                    columns = _trino_columns(result)
                all_rows.extend(result.get("data") or [])

                next_uri = result.get("nextUri")
                if not next_uri:
                    break
                if len(all_rows) >= limit:
                    # Enough rows collected: cancel the rest of the query so
                    # it does not keep running on the cluster.
                    try:
                        await client.delete(next_uri, headers=headers)
                    except httpx.HTTPError:
                        logger.warning("Failed to cancel Trino query at %s", next_uri)
                    break

                # Follow nextUri to get the next page of results
                resp = await client.get(next_uri, headers=headers)
                if resp.status_code != 200:
                    raise HTTPException(502, f"Trino error: {resp.text[:500]}")
                result = resp.json()

        elapsed_ms = round((_time.monotonic() - start) * 1000)
        all_rows = all_rows[:limit]

        return {
            "columns": columns,
            "rows": all_rows,
            "row_count": len(all_rows),
            "elapsed_ms": elapsed_ms,
        }
    except httpx.ConnectError as exc:
        raise HTTPException(502, "Cannot connect to Trino") from exc
    except httpx.TimeoutException as exc:
        raise HTTPException(504, "Trino query timed out") from exc
    except httpx.HTTPError as exc:
        raise HTTPException(502, "Trino request failed") from exc
    except ValueError as exc:
        # resp.json() raises a ValueError subclass on a non-JSON payload.
        raise HTTPException(502, "Trino returned an invalid response") from exc


@app.get("/api/analytics/schema")
async def analytics_schema():
    """Return Trino catalog/schema/table/column metadata for the schema browser.

    Tables and their columns are read from ``information_schema`` for the
    configured schema. This endpoint is best-effort: on any failure the
    error is logged and an empty ``tables`` list is returned.

    Requirements: 13.7
    """
    trino_catalog = config.trino.catalog
    trino_schema = config.trino.schema
    trino_url, headers = _trino_request_context()

    try:
        # The schema name is embedded in a SQL string literal, so double
        # any single quotes it contains.
        schema_literal = str(trino_schema).replace("'", "''")

        async with httpx.AsyncClient(timeout=30.0) as client:
            table_rows = await _fetch_trino_rows(
                client,
                trino_url,
                headers,
                "SELECT table_name FROM information_schema.tables "
                f"WHERE table_schema = '{schema_literal}' ORDER BY table_name",
            )
            # Fetch every column of the schema in one query rather than
            # issuing one query per table.
            column_rows = await _fetch_trino_rows(
                client,
                trino_url,
                headers,
                "SELECT table_name, column_name, data_type FROM information_schema.columns "
                f"WHERE table_schema = '{schema_literal}' ORDER BY table_name, ordinal_position",
            )

        columns_by_table: dict[str, list[dict[str, Any]]] = {}
        for cr in column_rows:
            if cr:
                columns_by_table.setdefault(cr[0], []).append({"name": cr[1], "type": cr[2]})

        tables = [
            {"name": tr[0], "columns": columns_by_table.get(tr[0], [])}
            for tr in table_rows
            if tr and tr[0]
        ]

        return {
            "catalog": trino_catalog,
            "schema": trino_schema,
            "tables": tables,
        }
    except Exception:
        logger.exception("Failed to load Trino schema metadata")
        return {"catalog": trino_catalog, "schema": trino_schema, "tables": []}
# ---------------------------------------------------------------------------
# Analytics: Saved Queries (Requirement 13.7)
# ---------------------------------------------------------------------------


class SavedQueryBody(BaseModel):
    """Request payload for creating a saved analytics query."""

    # Display name of the saved query.
    name: str
    # Optional free-text description; defaults to an empty string.
    description: str = ""
    # The SQL text to store.
    sql_text: str


@app.get("/api/analytics/saved-queries")
async def list_saved_queries():
    """List all saved queries, most recently updated first."""
    rows = await pool.fetch(
        "SELECT id, name, description, sql_text, created_by, created_at, updated_at FROM saved_queries ORDER BY updated_at DESC"
    )
    return [_row_to_dict(r) for r in rows]


@app.post("/api/analytics/saved-queries", status_code=201)
async def create_saved_query(body: SavedQueryBody):
    """Save a new query and return the inserted row."""
    row = await pool.fetchrow(
        "INSERT INTO saved_queries (name, description, sql_text) "
        "VALUES ($1, $2, $3) "
        "RETURNING id, name, description, sql_text, created_by, created_at",
        body.name,
        body.description,
        body.sql_text,
    )
    return _row_to_dict(row)


@app.delete("/api/analytics/saved-queries/{query_id}")
async def delete_saved_query(query_id: str):
    """Delete a saved query.

    Raises:
        HTTPException: 404 when ``query_id`` is not a valid UUID or no
            saved query has that id.
    """
    import uuid

    # A malformed id can never match a row; reject it here instead of
    # letting the driver fail on the ``$1::uuid`` argument (HTTP 500).
    try:
        query_uuid = str(uuid.UUID(query_id))
    except ValueError:
        raise HTTPException(404, "Query not found") from None

    result = await pool.execute("DELETE FROM saved_queries WHERE id = $1::uuid", query_uuid)
    if result == "DELETE 0":
        raise HTTPException(404, "Query not found")
    return {"status": "deleted"}