phase 16: React dashboard with full platform control and analytics
This commit is contained in:
@@ -20,6 +20,7 @@ from typing import Any, Optional
|
||||
|
||||
import asyncpg
|
||||
from fastapi import FastAPI, HTTPException, Query, Request
|
||||
from pydantic import BaseModel
|
||||
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
from starlette.responses import Response
|
||||
@@ -1505,3 +1506,185 @@ async def get_source_coverage_gaps():
|
||||
"missing_source_types": [_row_to_dict(r) for r in missing_types],
|
||||
"stale_sources": [_row_to_dict(r) for r in stale_sources],
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Analytics: Trino SQL Proxy (Requirement 10.1, 10.3, 13.7)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
import time as _time
|
||||
import httpx
|
||||
|
||||
|
||||
@app.post("/api/analytics/query")
|
||||
async def analytics_query(body: dict[str, Any]):
|
||||
"""Proxy SQL to Trino, enforce row limits, return structured results.
|
||||
|
||||
Design: Section 9.3 (API proxy for Trino)
|
||||
Requirements: 10.1, 10.3, 13.7
|
||||
"""
|
||||
sql = body.get("sql", "").strip()
|
||||
if not sql:
|
||||
raise HTTPException(400, "sql is required")
|
||||
|
||||
limit = min(int(body.get("limit", 1000)), 10000)
|
||||
|
||||
trino_host = config.trino.host
|
||||
trino_port = config.trino.port
|
||||
trino_catalog = config.trino.catalog
|
||||
trino_schema = config.trino.schema
|
||||
|
||||
trino_url = f"http://{trino_host}:{trino_port}/v1/statement"
|
||||
headers = {
|
||||
"X-Trino-User": "stonks-dashboard",
|
||||
"X-Trino-Catalog": trino_catalog,
|
||||
"X-Trino-Schema": trino_schema,
|
||||
}
|
||||
|
||||
start = _time.monotonic()
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=60.0) as client:
|
||||
# Submit query
|
||||
resp = await client.post(trino_url, content=sql, headers=headers)
|
||||
if resp.status_code != 200:
|
||||
raise HTTPException(502, f"Trino error: {resp.text[:500]}")
|
||||
|
||||
result = resp.json()
|
||||
columns: list[dict[str, str]] = []
|
||||
all_rows: list[list[Any]] = []
|
||||
|
||||
# Extract columns from first response
|
||||
if "columns" in result:
|
||||
columns = [{"name": c["name"], "type": c.get("type", "unknown")} for c in result["columns"]]
|
||||
|
||||
if "data" in result:
|
||||
all_rows.extend(result["data"])
|
||||
|
||||
# Follow nextUri to get all results
|
||||
while "nextUri" in result and len(all_rows) < limit:
|
||||
next_url = result["nextUri"]
|
||||
resp = await client.get(next_url, headers=headers)
|
||||
if resp.status_code != 200:
|
||||
break
|
||||
result = resp.json()
|
||||
if "columns" in result and not columns:
|
||||
columns = [{"name": c["name"], "type": c.get("type", "unknown")} for c in result["columns"]]
|
||||
if "data" in result:
|
||||
all_rows.extend(result["data"])
|
||||
|
||||
elapsed_ms = round((_time.monotonic() - start) * 1000)
|
||||
all_rows = all_rows[:limit]
|
||||
|
||||
return {
|
||||
"columns": columns,
|
||||
"rows": all_rows,
|
||||
"row_count": len(all_rows),
|
||||
"elapsed_ms": elapsed_ms,
|
||||
}
|
||||
|
||||
except httpx.ConnectError:
|
||||
raise HTTPException(502, "Cannot connect to Trino")
|
||||
except httpx.TimeoutException:
|
||||
raise HTTPException(504, "Trino query timed out")
|
||||
|
||||
|
||||
@app.get("/api/analytics/schema")
|
||||
async def analytics_schema():
|
||||
"""Return Trino catalog/schema/table/column metadata for the schema browser.
|
||||
|
||||
Requirements: 13.7
|
||||
"""
|
||||
trino_host = config.trino.host
|
||||
trino_port = config.trino.port
|
||||
trino_catalog = config.trino.catalog
|
||||
trino_schema = config.trino.schema
|
||||
|
||||
trino_url = f"http://{trino_host}:{trino_port}/v1/statement"
|
||||
headers = {
|
||||
"X-Trino-User": "stonks-dashboard",
|
||||
"X-Trino-Catalog": trino_catalog,
|
||||
"X-Trino-Schema": trino_schema,
|
||||
}
|
||||
|
||||
async def _run_trino_query(sql: str) -> list[list[Any]]:
|
||||
rows: list[list[Any]] = []
|
||||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||
resp = await client.post(trino_url, content=sql, headers=headers)
|
||||
if resp.status_code != 200:
|
||||
return rows
|
||||
result = resp.json()
|
||||
if "data" in result:
|
||||
rows.extend(result["data"])
|
||||
while "nextUri" in result:
|
||||
resp = await client.get(result["nextUri"], headers=headers)
|
||||
if resp.status_code != 200:
|
||||
break
|
||||
result = resp.json()
|
||||
if "data" in result:
|
||||
rows.extend(result["data"])
|
||||
return rows
|
||||
|
||||
try:
|
||||
# Get tables
|
||||
table_rows = await _run_trino_query(
|
||||
f"SELECT table_name FROM information_schema.tables WHERE table_schema = '{trino_schema}' ORDER BY table_name"
|
||||
)
|
||||
tables = []
|
||||
for tr in table_rows:
|
||||
table_name = tr[0] if tr else None
|
||||
if not table_name:
|
||||
continue
|
||||
# Get columns for each table
|
||||
col_rows = await _run_trino_query(
|
||||
f"SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = '{trino_schema}' AND table_name = '{table_name}' ORDER BY ordinal_position"
|
||||
)
|
||||
columns = [{"name": cr[0], "type": cr[1]} for cr in col_rows if cr]
|
||||
tables.append({"name": table_name, "columns": columns})
|
||||
|
||||
return {
|
||||
"catalog": trino_catalog,
|
||||
"schema": trino_schema,
|
||||
"tables": tables,
|
||||
}
|
||||
except Exception:
|
||||
return {"catalog": trino_catalog, "schema": trino_schema, "tables": []}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Analytics: Saved Queries (Requirement 13.7)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class SavedQueryBody(BaseModel):
|
||||
name: str
|
||||
description: str = ""
|
||||
sql_text: str
|
||||
|
||||
|
||||
@app.get("/api/analytics/saved-queries")
|
||||
async def list_saved_queries():
|
||||
"""List all saved queries."""
|
||||
rows = await pool.fetch(
|
||||
"SELECT id, name, description, sql_text, created_by, created_at, updated_at FROM saved_queries ORDER BY updated_at DESC"
|
||||
)
|
||||
return [_row_to_dict(r) for r in rows]
|
||||
|
||||
|
||||
@app.post("/api/analytics/saved-queries", status_code=201)
|
||||
async def create_saved_query(body: SavedQueryBody):
|
||||
"""Save a new query."""
|
||||
row = await pool.fetchrow(
|
||||
"""INSERT INTO saved_queries (name, description, sql_text)
|
||||
VALUES ($1, $2, $3)
|
||||
RETURNING id, name, description, sql_text, created_by, created_at""",
|
||||
body.name, body.description, body.sql_text,
|
||||
)
|
||||
return _row_to_dict(row)
|
||||
|
||||
|
||||
@app.delete("/api/analytics/saved-queries/{query_id}")
|
||||
async def delete_saved_query(query_id: str):
|
||||
"""Delete a saved query."""
|
||||
result = await pool.execute("DELETE FROM saved_queries WHERE id = $1::uuid", query_id)
|
||||
if result == "DELETE 0":
|
||||
raise HTTPException(404, "Query not found")
|
||||
return {"status": "deleted"}
|
||||
|
||||
Reference in New Issue
Block a user