1107d34027
- Strip SQL comments (-- and /* */) before checking for SELECT, so queries with leading comments don't get rejected - Show the actual error detail from the API response instead of generic 'API error 400' in the SQL Explorer UI
2354 lines
84 KiB
Python
2354 lines
84 KiB
Python
"""Query API - FastAPI application for analytics, evidence drill-down, and admin controls.
|
|
|
|
Exposes read-only endpoints for:
|
|
- Companies and watchlists (proxied from symbol registry data)
|
|
- Document timelines with intelligence
|
|
- Trend summaries
|
|
- Recommendation history with evidence
|
|
- Order history with audit trails
|
|
|
|
Requirements: 11.1, 11.2, 11.3
|
|
Design: Section 9.1 (Operational API)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import time as _time
|
|
from contextlib import asynccontextmanager
|
|
from dataclasses import asdict
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Optional
|
|
|
|
import asyncpg
|
|
import httpx
|
|
from fastapi import FastAPI, HTTPException, Query, Request
|
|
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
|
|
from pydantic import BaseModel
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
from starlette.responses import Response
|
|
|
|
from services.aggregation.pattern_matcher import (
|
|
find_cross_company_patterns,
|
|
find_self_patterns,
|
|
)
|
|
from services.extractor.metrics import get_model_performance_summary
|
|
from services.shared.audit import get_entity_audit_trail, get_order_audit_trail, record_audit_event
|
|
from services.shared.config import load_config
|
|
from services.shared.db import get_pg_pool
|
|
from services.shared.logging import new_trace_id, set_trace_context, setup_logging
|
|
from services.shared.schemas import MAJOR_DECISION_CATALYSTS
|
|
|
|
# Logger used by this service's handlers.
logger = logging.getLogger("query_api")

# Service configuration, loaded once when the module is imported.
config = load_config()

# Shared asyncpg connection pool; None until the lifespan handler assigns it.
pool: Optional[asyncpg.Pool] = None
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage process-wide resources for the lifetime of the application.

    On startup this configures logging and creates the shared database pool
    (stored in the module-level ``pool``). On shutdown the pool is closed.

    The ``yield`` is wrapped in ``try``/``finally`` so the pool is closed even
    when an exception or cancellation is thrown into the generator while the
    application is running; without it the close call would be skipped and
    the connections leaked.
    """
    global pool
    setup_logging("query_api", level=config.log_level, json_output=config.json_logs)
    pool = await get_pg_pool(config)
    try:
        yield
    finally:
        await pool.close()
|
|
|
|
|
|
# ASGI application object; startup and shutdown are handled by `lifespan`.
app = FastAPI(
    title="Stonks Oracle - Query API",
    lifespan=lifespan,
)
|
|
|
|
|
|
class TraceMiddleware(BaseHTTPMiddleware):
    """Attach a trace ID to every HTTP request/response pair.

    The ID comes from the incoming ``x-trace-id`` header when that header is
    present and non-empty; otherwise a fresh one is generated. It is stored
    in the trace context before the request is handled and echoed back in
    the response's ``x-trace-id`` header.
    """

    async def dispatch(self, request: Request, call_next):
        supplied = request.headers.get("x-trace-id")
        trace_id = supplied if supplied else new_trace_id()
        set_trace_context(trace_id=trace_id)

        response = await call_next(request)
        response.headers["x-trace-id"] = trace_id
        return response
|
|
|
|
|
|
# Register trace propagation for all routes.
app.add_middleware(TraceMiddleware)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _row_to_dict(row: asyncpg.Record) -> dict[str, Any]:
|
|
"""Convert an asyncpg Record to a JSON-safe dict."""
|
|
d: dict[str, Any] = {}
|
|
for key, val in dict(row).items():
|
|
if isinstance(val, datetime):
|
|
d[key] = val.isoformat()
|
|
elif hasattr(val, "__str__") and not isinstance(val, (str, int, float, bool, list, dict, type(None))):
|
|
d[key] = str(val)
|
|
else:
|
|
d[key] = val
|
|
return d
|
|
|
|
|
|
def _parse_jsonb(val: Any) -> Any:
|
|
"""Parse a JSONB value that may come back as str or already-decoded."""
|
|
if val is None:
|
|
return None
|
|
if isinstance(val, (dict, list)):
|
|
return val
|
|
try:
|
|
return json.loads(val)
|
|
except (json.JSONDecodeError, TypeError):
|
|
return val
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Health
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/health")
async def health():
    """Health probe backed by a trivial database round-trip.

    Returns:
        ``{"status": "ok"}`` when ``SELECT 1`` succeeds on the shared pool.

    Raises:
        HTTPException: 503 when the query fails for any reason (including
            the pool not being initialised yet).
    """
    try:
        await pool.fetchval("SELECT 1")
    except Exception as exc:
        # Boundary handler: every failure maps to "unhealthy", but the cause
        # is logged and chained so an outage can actually be diagnosed.
        logger.warning("Health check failed: %s", exc, exc_info=True)
        raise HTTPException(503, "Database unavailable") from exc
    return {"status": "ok"}
|
|
|
|
|
|
@app.get("/metrics")
async def metrics():
    """Serve the current Prometheus metrics in the exposition format.

    Requirements: 12.1, 12.2
    """
    payload = generate_latest()
    return Response(content=payload, media_type=CONTENT_TYPE_LATEST)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Companies (Requirement 11.1)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.get("/api/companies")
async def list_companies(
    active: bool = True,
    sector: Optional[str] = None,
    ticker: Optional[str] = None,
):
    """Return tracked companies ordered by ticker.

    Rows are always filtered on ``active``. ``sector`` and ``ticker`` are
    applied only when given (and non-empty); the ticker is upper-cased
    before comparison.
    """
    clauses = ["c.active = $1"]
    args: list[Any] = [active]

    # Optional equality filters, in the order their placeholders are numbered.
    optional_filters = (
        ("c.sector", sector),
        ("c.ticker", ticker.upper() if ticker else None),
    )
    for column, value in optional_filters:
        if value:
            args.append(value)
            clauses.append(f"{column} = ${len(args)}")

    where = " AND ".join(clauses)
    company_rows = await pool.fetch(
        f"""SELECT c.id, c.ticker, c.legal_name, c.exchange, c.sector,
                   c.industry, c.market_cap_bucket, c.active,
                   c.created_at, c.updated_at
            FROM companies c
            WHERE {where}
            ORDER BY c.ticker""",
        *args,
    )
    return [_row_to_dict(company) for company in company_rows]
|
|
|
|
|
|
@app.get("/api/companies/{company_id}")
async def get_company(company_id: str):
    """Return one company with its aliases and its number of active sources.

    Raises:
        HTTPException: 404 when no company row matches ``company_id``.
    """
    company_row = await pool.fetchrow(
        """SELECT id, ticker, legal_name, exchange, sector, industry,
                  market_cap_bucket, active, created_at, updated_at
           FROM companies WHERE id = $1""",
        company_id,
    )
    if not company_row:
        raise HTTPException(404, "Company not found")

    payload = _row_to_dict(company_row)

    alias_rows = await pool.fetch(
        "SELECT id, alias, alias_type FROM company_aliases WHERE company_id = $1",
        company_id,
    )
    payload["aliases"] = list(map(dict, alias_rows))

    payload["active_source_count"] = await pool.fetchval(
        "SELECT COUNT(*) FROM sources WHERE company_id = $1 AND active = true",
        company_id,
    )
    return payload
|
|
|
|
|
|
@app.get("/api/companies/{company_id}/sources")
async def list_company_sources(company_id: str):
    """Return every source row configured for a company, ordered by source type."""
    source_rows = await pool.fetch(
        """SELECT id, source_type, source_name, config, credibility_score,
                  retention_days, access_policy, active
           FROM sources WHERE company_id = $1 ORDER BY source_type""",
        company_id,
    )
    return list(map(_row_to_dict, source_rows))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Document Timelines (Requirement 11.1, 11.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/api/documents")
async def list_documents(
    ticker: Optional[str] = None,
    company_id: Optional[str] = None,
    document_type: Optional[str] = None,
    status: Optional[str] = None,
    since: Optional[str] = None,
    limit: int = Query(default=50, le=200),
    offset: int = 0,
):
    """List documents, newest ``published_at`` first (NULLs last).

    Args:
        ticker: Only documents mentioning this ticker (upper-cased).
        company_id: Only documents mentioning this company.
        document_type: Exact match on ``document_type``.
        status: Exact match on ``status``.
        since: ISO 8601 timestamp; only documents published at or after it.
            A trailing ``Z`` is accepted, and a value without a UTC offset
            is interpreted as UTC.
        limit: Page size (at most 200).
        offset: Number of rows to skip.

    Raises:
        HTTPException: 400 when ``since`` is not a valid ISO 8601 timestamp.
    """
    conditions: list[str] = []
    params: list[Any] = []

    if ticker:
        params.append(ticker.upper())
        conditions.append(
            f"""d.id IN (
                SELECT document_id FROM document_company_mentions WHERE ticker = ${len(params)}
            )"""
        )
    if company_id:
        params.append(company_id)
        conditions.append(
            f"""d.id IN (
                SELECT document_id FROM document_company_mentions WHERE company_id = ${len(params)}
            )"""
        )
    if document_type:
        params.append(document_type)
        conditions.append(f"d.document_type = ${len(params)}")
    if status:
        params.append(status)
        conditions.append(f"d.status = ${len(params)}")
    if since:
        # asyncpg sends timestamptz parameters in binary form and rejects a
        # plain string, so the value must be parsed into a datetime first.
        iso_text = (since[:-1] + "+00:00") if since.endswith(("Z", "z")) else since
        try:
            since_dt = datetime.fromisoformat(iso_text)
        except ValueError:
            raise HTTPException(
                400, "Invalid 'since' value; expected an ISO 8601 timestamp"
            ) from None
        if since_dt.tzinfo is None:
            # Pin naive input to UTC instead of the server's local timezone.
            since_dt = since_dt.replace(tzinfo=timezone.utc)
        params.append(since_dt)
        conditions.append(f"d.published_at >= ${len(params)}::timestamptz")

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    limit_pos = len(params) + 1
    offset_pos = limit_pos + 1

    rows = await pool.fetch(
        f"""SELECT d.id, d.document_type, d.source_type, d.publisher, d.url,
                   d.title, d.published_at, d.retrieved_at, d.language,
                   d.content_hash, d.parse_quality_score, d.parse_confidence,
                   d.status, d.created_at
            FROM documents d
            {where}
            ORDER BY d.published_at DESC NULLS LAST
            LIMIT ${limit_pos} OFFSET ${offset_pos}""",
        *params, limit, offset,
    )
    return [_row_to_dict(r) for r in rows]
|
|
|
|
|
|
@app.get("/api/documents/{document_id}")
async def get_document(document_id: str):
    """Return one document with its company mentions and latest intelligence.

    The result carries the document columns plus:

    - ``company_mentions``: mention rows joined with the company legal name.
    - ``intelligence``: the most recently created ``document_intelligence``
      row for the document (JSONB columns decoded) with a
      ``company_impacts`` list, or ``None`` when no extraction exists.

    Raises:
        HTTPException: 404 when the document does not exist.
    """
    document_row = await pool.fetchrow(
        """SELECT id, document_type, source_type, publisher, url, canonical_url,
                  title, published_at, retrieved_at, language, content_hash,
                  raw_storage_ref, normalized_storage_ref,
                  parse_quality_score, parse_confidence, status,
                  created_at, updated_at
           FROM documents WHERE id = $1""",
        document_id,
    )
    if not document_row:
        raise HTTPException(404, "Document not found")

    payload = _row_to_dict(document_row)

    # Companies mentioned in the document.
    mention_rows = await pool.fetch(
        """SELECT dcm.company_id, dcm.ticker, dcm.mention_type, dcm.confidence,
                  c.legal_name
           FROM document_company_mentions dcm
           JOIN companies c ON c.id = dcm.company_id
           WHERE dcm.document_id = $1""",
        document_id,
    )
    payload["company_mentions"] = list(map(_row_to_dict, mention_rows))

    # Latest intelligence extraction, if any.
    intel_row = await pool.fetchrow(
        """SELECT id, summary, macro_themes, novelty_score, source_credibility,
                  extraction_warnings, confidence, model_provider, model_name,
                  prompt_version, schema_version, validation_status,
                  validation_errors, created_at
           FROM document_intelligence WHERE document_id = $1
           ORDER BY created_at DESC LIMIT 1""",
        document_id,
    )
    if not intel_row:
        payload["intelligence"] = None
        return payload

    intelligence = _row_to_dict(intel_row)
    for column in ("macro_themes", "extraction_warnings", "validation_errors"):
        intelligence[column] = _parse_jsonb(intelligence.get(column))

    # Per-company impact records attached to that extraction.
    impact_rows = await pool.fetch(
        """SELECT dir.company_id, dir.ticker, dir.relevance, dir.sentiment,
                  dir.impact_score, dir.impact_horizon, dir.catalyst_type,
                  dir.key_facts, dir.risks, dir.evidence_spans,
                  c.legal_name
           FROM document_impact_records dir
           JOIN companies c ON c.id = dir.company_id
           WHERE dir.intelligence_id = $1""",
        intel_row["id"],
    )
    company_impacts = []
    for impact_row in impact_rows:
        impact = _row_to_dict(impact_row)
        for column in ("key_facts", "risks", "evidence_spans"):
            impact[column] = _parse_jsonb(impact.get(column))
        company_impacts.append(impact)

    intelligence["company_impacts"] = company_impacts
    payload["intelligence"] = intelligence
    return payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Trend Summaries (Requirement 11.1)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.get("/api/trends")
async def list_trends(
    ticker: Optional[str] = None,
    entity_type: str = "company",
    window: Optional[str] = None,
    limit: int = Query(default=50, le=200),
    offset: int = 0,
):
    """Return trend windows, most recently generated first.

    Rows are always filtered on ``entity_type``. ``ticker`` (upper-cased,
    matched against ``entity_id``) and ``window`` are applied when given.
    When at least one trend is returned, each carries a ``projection`` key
    holding its most recently computed projection, or ``None`` if it has none.
    """
    clauses = ["entity_type = $1"]
    args: list[Any] = [entity_type]

    if ticker:
        args.append(ticker.upper())
        clauses.append(f"entity_id = ${len(args)}")
    if window:
        args.append(window)
        clauses.append(f'"window" = ${len(args)}')

    where = " AND ".join(clauses)
    limit_pos = len(args) + 1
    offset_pos = limit_pos + 1

    rows = await pool.fetch(
        f"""SELECT id, entity_type, entity_id, "window", trend_direction,
                   trend_strength, confidence, top_supporting_evidence,
                   top_opposing_evidence, dominant_catalysts, material_risks,
                   contradiction_score, market_context, generated_at
            FROM trend_windows
            WHERE {where}
            ORDER BY generated_at DESC
            LIMIT ${limit_pos} OFFSET ${offset_pos}""",
        *args, limit, offset,
    )

    jsonb_columns = (
        "top_supporting_evidence", "top_opposing_evidence",
        "dominant_catalysts", "material_risks", "market_context",
    )
    trends = []
    for trend_row in rows:
        trend = _row_to_dict(trend_row)
        for column in jsonb_columns:
            trend[column] = _parse_jsonb(trend.get(column))
        trends.append(trend)

    if not trends:
        return trends

    # Attach the latest projection per trend (Requirement 12.10).
    projection_rows = await pool.fetch(
        """SELECT DISTINCT ON (trend_window_id)
                  trend_window_id, projected_direction, projected_strength,
                  projected_confidence, projection_horizon,
                  macro_contribution_pct, diverges_from_current
           FROM trend_projections
           WHERE trend_window_id = ANY($1::uuid[])
           ORDER BY trend_window_id, computed_at DESC""",
        [trend_row["id"] for trend_row in rows],
    )
    projections_by_trend = {
        str(projection["trend_window_id"]): _row_to_dict(projection)
        for projection in projection_rows
    }
    for trend in trends:
        trend["projection"] = projections_by_trend.get(trend["id"])

    return trends
|
|
|
|
|
|
@app.get("/api/trends/{trend_id}")
async def get_trend(trend_id: str):
    """Return a single trend window with its JSONB columns decoded.

    Raises:
        HTTPException: 404 when no trend window has the given ID.
    """
    trend_row = await pool.fetchrow(
        """SELECT id, entity_type, entity_id, "window", trend_direction,
                  trend_strength, confidence, top_supporting_evidence,
                  top_opposing_evidence, dominant_catalysts, material_risks,
                  contradiction_score, market_context, generated_at, created_at
           FROM trend_windows WHERE id = $1""",
        trend_id,
    )
    if not trend_row:
        raise HTTPException(404, "Trend not found")

    trend = _row_to_dict(trend_row)
    jsonb_columns = (
        "top_supporting_evidence", "top_opposing_evidence",
        "dominant_catalysts", "material_risks", "market_context",
    )
    for column in jsonb_columns:
        trend[column] = _parse_jsonb(trend.get(column))
    return trend
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Recommendations (Requirement 11.1, 11.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/api/recommendations")
async def list_recommendations(
    ticker: Optional[str] = None,
    action: Optional[str] = None,
    mode: Optional[str] = None,
    since: Optional[str] = None,
    limit: int = Query(default=50, le=200),
    offset: int = 0,
):
    """List recommendations, most recently generated first.

    Args:
        ticker: Exact ticker match (upper-cased).
        action: Exact match on ``action``.
        mode: Exact match on ``mode``.
        since: ISO 8601 timestamp; only recommendations generated at or
            after it. A trailing ``Z`` is accepted, and a value without a
            UTC offset is interpreted as UTC.
        limit: Page size (at most 200).
        offset: Number of rows to skip.

    Raises:
        HTTPException: 400 when ``since`` is not a valid ISO 8601 timestamp.
    """
    conditions: list[str] = []
    params: list[Any] = []

    if ticker:
        params.append(ticker.upper())
        conditions.append(f"r.ticker = ${len(params)}")
    if action:
        params.append(action)
        conditions.append(f"r.action = ${len(params)}")
    if mode:
        params.append(mode)
        conditions.append(f"r.mode = ${len(params)}")
    if since:
        # asyncpg sends timestamptz parameters in binary form and rejects a
        # plain string, so the value must be parsed into a datetime first.
        iso_text = (since[:-1] + "+00:00") if since.endswith(("Z", "z")) else since
        try:
            since_dt = datetime.fromisoformat(iso_text)
        except ValueError:
            raise HTTPException(
                400, "Invalid 'since' value; expected an ISO 8601 timestamp"
            ) from None
        if since_dt.tzinfo is None:
            # Pin naive input to UTC instead of the server's local timezone.
            since_dt = since_dt.replace(tzinfo=timezone.utc)
        params.append(since_dt)
        conditions.append(f"r.generated_at >= ${len(params)}::timestamptz")

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    limit_pos = len(params) + 1
    offset_pos = limit_pos + 1

    rows = await pool.fetch(
        f"""SELECT r.id, r.ticker, r.action, r.mode, r.confidence,
                   r.time_horizon, r.thesis, r.invalidation_conditions,
                   r.portfolio_pct, r.max_loss_pct, r.model_version,
                   r.risk_classification, r.generated_at
            FROM recommendations r
            {where}
            ORDER BY r.generated_at DESC
            LIMIT ${limit_pos} OFFSET ${offset_pos}""",
        *params, limit, offset,
    )

    results = []
    for r in rows:
        d = _row_to_dict(r)
        d["invalidation_conditions"] = _parse_jsonb(d.get("invalidation_conditions"))
        results.append(d)
    return results
|
|
|
|
|
|
@app.get("/api/recommendations/{recommendation_id}")
async def get_recommendation(recommendation_id: str):
    """Return one recommendation with its evidence and latest risk evaluation.

    The result carries the recommendation columns plus:

    - ``evidence``: ``recommendation_evidence`` rows left-joined with their
      documents, highest weight first.
    - ``risk_evaluation``: the most recent ``risk_evaluations`` row with its
      JSONB columns decoded, or ``None`` when none exists.

    Requirement 11.2: display contributing intelligence objects, raw sources,
    and market context that influenced the decision.

    Raises:
        HTTPException: 404 when the recommendation does not exist.
    """
    recommendation_row = await pool.fetchrow(
        """SELECT r.id, r.ticker, r.company_id, r.action, r.mode, r.confidence,
                  r.time_horizon, r.thesis, r.invalidation_conditions,
                  r.portfolio_pct, r.max_loss_pct, r.model_version,
                  r.model_provider, r.prompt_version, r.schema_version,
                  r.risk_classification, r.generated_at, r.created_at
           FROM recommendations r WHERE r.id = $1""",
        recommendation_id,
    )
    if not recommendation_row:
        raise HTTPException(404, "Recommendation not found")

    payload = _row_to_dict(recommendation_row)
    payload["invalidation_conditions"] = _parse_jsonb(
        payload.get("invalidation_conditions")
    )

    # Linked documents and intelligence objects.
    linked_evidence = await pool.fetch(
        """SELECT re.id, re.document_id, re.intelligence_id, re.evidence_type, re.weight,
                  d.title, d.document_type, d.source_type, d.publisher, d.url,
                  d.published_at
           FROM recommendation_evidence re
           LEFT JOIN documents d ON d.id = re.document_id
           WHERE re.recommendation_id = $1
           ORDER BY re.weight DESC""",
        recommendation_id,
    )
    payload["evidence"] = list(map(_row_to_dict, linked_evidence))

    # Most recent risk evaluation.
    latest_risk = await pool.fetchrow(
        """SELECT id, eligible, allowed_mode, rejection_reasons, risk_checks, evaluated_at
           FROM risk_evaluations WHERE recommendation_id = $1
           ORDER BY evaluated_at DESC LIMIT 1""",
        recommendation_id,
    )
    if not latest_risk:
        payload["risk_evaluation"] = None
        return payload

    risk = _row_to_dict(latest_risk)
    for column in ("rejection_reasons", "risk_checks"):
        risk[column] = _parse_jsonb(risk.get(column))
    payload["risk_evaluation"] = risk
    return payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Evidence Drill-Down (Requirement 11.2, 10.4)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/api/recommendations/{recommendation_id}/evidence")
async def get_recommendation_evidence_drilldown(recommendation_id: str):
    """Trace a recommendation back to the documents and artifacts behind it.

    The response has three keys:

    - ``recommendation``: the recommendation's core columns.
    - ``evidence``: one entry per ``recommendation_evidence`` row (highest
      weight first) with the joined document columns, including storage
      refs. Each entry has ``intelligence`` (the linked
      ``document_intelligence`` row, or the document's latest one when no
      intelligence ID is linked) and ``company_impacts`` (that
      intelligence's ``document_impact_records``).
    - ``trend_window``: the latest company trend window for the ticker
      generated at or before the recommendation, including the documents
      that fed it, or ``None``.

    Requirements: 11.2, 10.4
    Design: Section 9.1 (evidence drill-down and audit views)

    Raises:
        HTTPException: 404 when the recommendation does not exist.
    """
    recommendation_row = await pool.fetchrow(
        """SELECT id, ticker, company_id, action, mode, confidence,
                  time_horizon, thesis, model_version, model_provider,
                  prompt_version, schema_version, generated_at
           FROM recommendations WHERE id = $1""",
        recommendation_id,
    )
    if not recommendation_row:
        raise HTTPException(404, "Recommendation not found")

    # Evidence rows with full document details, including storage refs.
    evidence_rows = await pool.fetch(
        """SELECT re.id AS evidence_id,
                  re.document_id,
                  re.intelligence_id,
                  re.evidence_type,
                  re.weight,
                  d.document_type,
                  d.source_type,
                  d.publisher,
                  d.url,
                  d.canonical_url,
                  d.title,
                  d.published_at,
                  d.retrieved_at,
                  d.language,
                  d.content_hash,
                  d.raw_storage_ref,
                  d.normalized_storage_ref,
                  d.parse_quality_score,
                  d.parse_confidence,
                  d.status AS document_status
           FROM recommendation_evidence re
           LEFT JOIN documents d ON d.id = re.document_id
           WHERE re.recommendation_id = $1
           ORDER BY re.weight DESC""",
        recommendation_id,
    )

    evidence_items: list[dict[str, Any]] = []
    for evidence_row in evidence_rows:
        item = _row_to_dict(evidence_row)
        item["intelligence"] = None
        item["company_impacts"] = []

        linked_intel_id = evidence_row["intelligence_id"]
        linked_doc_id = evidence_row["document_id"]

        # Prefer the explicitly linked extraction; otherwise fall back to the
        # document's most recent one.
        if linked_intel_id:
            intel_row = await pool.fetchrow(
                """SELECT id, document_id, summary, macro_themes, novelty_score,
                          source_credibility, extraction_warnings, confidence,
                          model_provider, model_name, prompt_version, schema_version,
                          raw_output_ref, prompt_ref, validation_status,
                          validation_errors, created_at
                   FROM document_intelligence WHERE id = $1""",
                linked_intel_id,
            )
        elif linked_doc_id:
            intel_row = await pool.fetchrow(
                """SELECT id, document_id, summary, macro_themes, novelty_score,
                          source_credibility, extraction_warnings, confidence,
                          model_provider, model_name, prompt_version, schema_version,
                          raw_output_ref, prompt_ref, validation_status,
                          validation_errors, created_at
                   FROM document_intelligence WHERE document_id = $1
                   ORDER BY created_at DESC LIMIT 1""",
                linked_doc_id,
            )
        else:
            intel_row = None

        if intel_row:
            intelligence = _row_to_dict(intel_row)
            for column in ("macro_themes", "extraction_warnings", "validation_errors"):
                intelligence[column] = _parse_jsonb(intelligence.get(column))
            item["intelligence"] = intelligence

            # Per-company impact records for this extraction.
            impact_rows = await pool.fetch(
                """SELECT dir.company_id, dir.ticker, dir.relevance, dir.sentiment,
                          dir.impact_score, dir.impact_horizon, dir.catalyst_type,
                          dir.key_facts, dir.risks, dir.evidence_spans,
                          c.legal_name
                   FROM document_impact_records dir
                   JOIN companies c ON c.id = dir.company_id
                   WHERE dir.intelligence_id = $1""",
                intel_row["id"],
            )
            company_impacts = []
            for impact_row in impact_rows:
                impact = _row_to_dict(impact_row)
                for column in ("key_facts", "risks", "evidence_spans"):
                    impact[column] = _parse_jsonb(impact.get(column))
                company_impacts.append(impact)
            item["company_impacts"] = company_impacts

        evidence_items.append(item)

    # Most recent trend window at or before the recommendation, for context.
    trend_window = None
    rec_ticker = recommendation_row["ticker"]
    rec_generated_at = recommendation_row["generated_at"]
    if rec_ticker and rec_generated_at:
        trend_row = await pool.fetchrow(
            """SELECT id, entity_type, entity_id, "window", trend_direction,
                      trend_strength, confidence, top_supporting_evidence,
                      top_opposing_evidence, dominant_catalysts, material_risks,
                      contradiction_score, market_context, generated_at
               FROM trend_windows
               WHERE entity_id = $1 AND entity_type = 'company'
                 AND generated_at <= $2
               ORDER BY generated_at DESC LIMIT 1""",
            rec_ticker, rec_generated_at,
        )
        if trend_row:
            trend_window = _row_to_dict(trend_row)
            trend_jsonb_columns = (
                "top_supporting_evidence", "top_opposing_evidence",
                "dominant_catalysts", "material_risks", "market_context",
            )
            for column in trend_jsonb_columns:
                trend_window[column] = _parse_jsonb(trend_window.get(column))

            # Documents that contributed to this trend window.
            trend_evidence_rows = await pool.fetch(
                """SELECT te.id, te.document_id, te.evidence_type, te.rank_score,
                          te.weight_component, te.impact_component,
                          te.recency_component, te.confidence_component,
                          te.sentiment_value,
                          d.title, d.document_type, d.source_type, d.publisher,
                          d.url, d.published_at, d.raw_storage_ref,
                          d.normalized_storage_ref
                   FROM trend_evidence te
                   LEFT JOIN documents d ON d.id = te.document_id
                   WHERE te.trend_window_id = $1
                   ORDER BY te.rank_score DESC""",
                trend_row["id"],
            )
            trend_window["evidence"] = list(map(_row_to_dict, trend_evidence_rows))

    return {
        "recommendation": _row_to_dict(recommendation_row),
        "evidence": evidence_items,
        "trend_window": trend_window,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Trend Evidence Drill-Down (Requirement 10.4)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/api/trends/{trend_id}/evidence")
async def get_trend_evidence_drilldown(trend_id: str):
    """Trace a trend window back to its contributing documents.

    Returns ``{"trend": ..., "evidence": [...]}``. ``trend`` is the trend
    window with its JSONB columns decoded. ``evidence`` holds one entry per
    ``trend_evidence`` row (highest rank score first) with the joined
    document columns, including storage refs. Each entry has
    ``intelligence`` (the document's latest ``document_intelligence`` row,
    or ``None``) and ``company_impacts`` (that intelligence's
    ``document_impact_records``).

    Requirements: 10.4, 6.5

    Raises:
        HTTPException: 404 when the trend window does not exist.
    """
    trend_row = await pool.fetchrow(
        """SELECT id, entity_type, entity_id, "window", trend_direction,
                  trend_strength, confidence, top_supporting_evidence,
                  top_opposing_evidence, dominant_catalysts, material_risks,
                  contradiction_score, market_context, generated_at
           FROM trend_windows WHERE id = $1""",
        trend_id,
    )
    if not trend_row:
        raise HTTPException(404, "Trend not found")

    trend = _row_to_dict(trend_row)
    trend_jsonb_columns = (
        "top_supporting_evidence", "top_opposing_evidence",
        "dominant_catalysts", "material_risks", "market_context",
    )
    for column in trend_jsonb_columns:
        trend[column] = _parse_jsonb(trend.get(column))

    # Trend evidence with full document details.
    evidence_rows = await pool.fetch(
        """SELECT te.id AS evidence_id,
                  te.document_id,
                  te.evidence_type,
                  te.rank_score,
                  te.weight_component,
                  te.impact_component,
                  te.recency_component,
                  te.confidence_component,
                  te.sentiment_value,
                  d.document_type,
                  d.source_type,
                  d.publisher,
                  d.url,
                  d.canonical_url,
                  d.title,
                  d.published_at,
                  d.retrieved_at,
                  d.content_hash,
                  d.raw_storage_ref,
                  d.normalized_storage_ref,
                  d.parse_quality_score,
                  d.parse_confidence,
                  d.status AS document_status
           FROM trend_evidence te
           LEFT JOIN documents d ON d.id = te.document_id
           WHERE te.trend_window_id = $1
           ORDER BY te.rank_score DESC""",
        trend_id,
    )

    evidence_items = []
    for evidence_row in evidence_rows:
        item = _row_to_dict(evidence_row)
        item["intelligence"] = None
        item["company_impacts"] = []

        source_doc_id = evidence_row["document_id"]
        intel_row = None
        if source_doc_id:
            intel_row = await pool.fetchrow(
                """SELECT id, document_id, summary, macro_themes, novelty_score,
                          source_credibility, extraction_warnings, confidence,
                          model_provider, model_name, prompt_version, schema_version,
                          raw_output_ref, prompt_ref, validation_status,
                          validation_errors, created_at
                   FROM document_intelligence WHERE document_id = $1
                   ORDER BY created_at DESC LIMIT 1""",
                source_doc_id,
            )

        if intel_row:
            intelligence = _row_to_dict(intel_row)
            for column in ("macro_themes", "extraction_warnings", "validation_errors"):
                intelligence[column] = _parse_jsonb(intelligence.get(column))
            item["intelligence"] = intelligence

            impact_rows = await pool.fetch(
                """SELECT dir.company_id, dir.ticker, dir.relevance, dir.sentiment,
                          dir.impact_score, dir.impact_horizon, dir.catalyst_type,
                          dir.key_facts, dir.risks, dir.evidence_spans,
                          c.legal_name
                   FROM document_impact_records dir
                   JOIN companies c ON c.id = dir.company_id
                   WHERE dir.intelligence_id = $1""",
                intel_row["id"],
            )
            for impact_row in impact_rows:
                impact = _row_to_dict(impact_row)
                for column in ("key_facts", "risks", "evidence_spans"):
                    impact[column] = _parse_jsonb(impact.get(column))
                item["company_impacts"].append(impact)

        evidence_items.append(item)

    return {"trend": trend, "evidence": evidence_items}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Order History (Requirement 11.1, 11.3)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/api/orders")
async def list_orders(
    ticker: Optional[str] = None,
    status: Optional[str] = None,
    side: Optional[str] = None,
    since: Optional[str] = None,
    limit: int = Query(default=50, le=200),
    offset: int = 0,
):
    """List orders, most recently created first.

    Args:
        ticker: Exact ticker match (upper-cased).
        status: Exact match on ``status``.
        side: Exact match on ``side``.
        since: ISO 8601 timestamp; only orders created at or after it.
            A trailing ``Z`` is accepted, and a value without a UTC offset
            is interpreted as UTC.
        limit: Page size (at most 200).
        offset: Number of rows to skip.

    Raises:
        HTTPException: 400 when ``since`` is not a valid ISO 8601 timestamp.
    """
    conditions: list[str] = []
    params: list[Any] = []

    if ticker:
        params.append(ticker.upper())
        conditions.append(f"o.ticker = ${len(params)}")
    if status:
        params.append(status)
        conditions.append(f"o.status = ${len(params)}")
    if side:
        params.append(side)
        conditions.append(f"o.side = ${len(params)}")
    if since:
        # asyncpg sends timestamptz parameters in binary form and rejects a
        # plain string, so the value must be parsed into a datetime first.
        iso_text = (since[:-1] + "+00:00") if since.endswith(("Z", "z")) else since
        try:
            since_dt = datetime.fromisoformat(iso_text)
        except ValueError:
            raise HTTPException(
                400, "Invalid 'since' value; expected an ISO 8601 timestamp"
            ) from None
        if since_dt.tzinfo is None:
            # Pin naive input to UTC instead of the server's local timezone.
            since_dt = since_dt.replace(tzinfo=timezone.utc)
        params.append(since_dt)
        conditions.append(f"o.created_at >= ${len(params)}::timestamptz")

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    limit_pos = len(params) + 1
    offset_pos = limit_pos + 1

    rows = await pool.fetch(
        f"""SELECT o.id, o.recommendation_id, o.broker_account_id, o.ticker,
                   o.side, o.order_type, o.quantity, o.limit_price, o.stop_price,
                   o.status, o.broker_order_id, o.submitted_at, o.acknowledged_at,
                   o.filled_at, o.cancelled_at, o.rejected_at, o.rejection_reason,
                   o.fill_price, o.fill_quantity, o.created_at
            FROM orders o
            {where}
            ORDER BY o.created_at DESC
            LIMIT ${limit_pos} OFFSET ${offset_pos}""",
        *params, limit, offset,
    )
    return [_row_to_dict(r) for r in rows]
|
|
|
|
|
|
@app.get("/api/orders/{order_id}")
async def get_order(order_id: str):
    """Return one order with its events, decision trace and audit trail.

    The result carries the order columns (``decision_trace`` decoded) plus:

    - ``events``: ``order_events`` rows, oldest first, with ``data`` decoded.
    - ``audit_trail``: whatever ``get_order_audit_trail`` returns for this
      order and its recommendation ID (``None`` when the order has none).

    Requirement 11.3: expose full audit trail from ingestion through broker
    execution and eventual market outcome.

    Raises:
        HTTPException: 404 when the order does not exist.
    """
    order_row = await pool.fetchrow(
        """SELECT o.id, o.recommendation_id, o.broker_account_id, o.ticker,
                  o.side, o.order_type, o.quantity, o.limit_price, o.stop_price,
                  o.status, o.idempotency_key, o.broker_order_id,
                  o.decision_trace, o.submitted_at, o.acknowledged_at,
                  o.filled_at, o.cancelled_at, o.rejected_at, o.rejection_reason,
                  o.fill_price, o.fill_quantity, o.created_at, o.updated_at
           FROM orders o WHERE o.id = $1""",
        order_id,
    )
    if not order_row:
        raise HTTPException(404, "Order not found")

    payload = _row_to_dict(order_row)
    payload["decision_trace"] = _parse_jsonb(payload.get("decision_trace"))

    # Lifecycle events in chronological order.
    event_rows = await pool.fetch(
        """SELECT id, event_type, data, broker_timestamp, created_at
           FROM order_events WHERE order_id = $1 ORDER BY created_at ASC""",
        order_id,
    )
    order_events = []
    for event_row in event_rows:
        event = _row_to_dict(event_row)
        event["data"] = _parse_jsonb(event.get("data"))
        order_events.append(event)
    payload["events"] = order_events

    # Full audit trail (Requirement 11.3).
    linked_recommendation = order_row["recommendation_id"]
    recommendation_ref = str(linked_recommendation) if linked_recommendation else None
    payload["audit_trail"] = await get_order_audit_trail(
        pool, order_id, recommendation_ref
    )
    return payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Positions (Requirement 11.1)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/api/positions")
async def list_positions(
    ticker: Optional[str] = None,
):
    """Return current positions ordered by ticker.

    When ``ticker`` is given (and non-empty) only positions for that ticker,
    upper-cased, are returned; otherwise all positions are returned.
    """
    if not ticker:
        all_positions = await pool.fetch(
            """SELECT p.id, p.broker_account_id, p.ticker, p.quantity,
                      p.avg_entry_price, p.current_price,
                      p.unrealized_pnl, p.realized_pnl, p.updated_at
               FROM positions p ORDER BY p.ticker""",
        )
        return list(map(_row_to_dict, all_positions))

    ticker_positions = await pool.fetch(
        """SELECT p.id, p.broker_account_id, p.ticker, p.quantity,
                  p.avg_entry_price, p.current_price,
                  p.unrealized_pnl, p.realized_pnl, p.updated_at
           FROM positions p WHERE p.ticker = $1 ORDER BY p.ticker""",
        ticker.upper(),
    )
    return list(map(_row_to_dict, ticker_positions))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Audit Trail (Requirement 11.3)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/api/audit/{entity_type}/{entity_id}")
async def get_audit_trail(entity_type: str, entity_id: str):
    """Return the audit events recorded for an entity.

    Delegates to ``get_entity_audit_trail`` and passes its result through.

    Raises:
        HTTPException: 404 when the lookup yields no events.
    """
    audit_events = await get_entity_audit_trail(pool, entity_type, entity_id)
    if audit_events:
        return audit_events
    raise HTTPException(404, "No audit events found")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Admin: Source Health (Requirement 11.1 - source health)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/api/admin/sources/health")
async def get_source_health(
    source_type: Optional[str] = None,
    company_id: Optional[str] = None,
    active_only: bool = True,
):
    """Report every source with its latest ingestion run and 24-hour run stats.

    Optional filters (``active_only``, ``source_type``, ``company_id``) are
    combined with AND and bound as positional query parameters. For each
    source the query attaches the most recent ``ingestion_runs`` row and
    aggregate counts over the last 24 hours.

    Design: Section 9.1 (source health and job state)
    """
    # Collect (column, value) pairs first; placeholders are numbered afterwards
    # so the $n indexes always line up with the bound values.
    filters: list[tuple[str, Any]] = []
    if active_only:
        filters.append(("s.active", True))
    if source_type:
        filters.append(("s.source_type", source_type))
    if company_id:
        filters.append(("s.company_id", company_id))

    clauses = [
        f"{column} = ${position}"
        for position, (column, _value) in enumerate(filters, start=1)
    ]
    bind_values: list[Any] = [value for _column, value in filters]
    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""

    query = f"""SELECT s.id AS source_id, s.source_type, s.source_name,
                  s.credibility_score, s.active,
                  c.ticker, c.legal_name, c.id AS company_id,
                  latest.status AS last_run_status,
                  latest.started_at AS last_run_at,
                  latest.error_message AS last_error,
                  latest.items_fetched AS last_items_fetched,
                  latest.items_new AS last_items_new,
                  COALESCE(stats.total_runs, 0) AS total_runs_24h,
                  COALESCE(stats.failed_runs, 0) AS failed_runs_24h,
                  COALESCE(stats.total_items, 0) AS total_items_24h
           FROM sources s
           JOIN companies c ON c.id = s.company_id
           LEFT JOIN LATERAL (
               SELECT ir.status, ir.started_at, ir.error_message,
                      ir.items_fetched, ir.items_new
               FROM ingestion_runs ir
               WHERE ir.source_id = s.id
               ORDER BY ir.started_at DESC
               LIMIT 1
           ) latest ON TRUE
           LEFT JOIN LATERAL (
               SELECT COUNT(*) AS total_runs,
                      COUNT(*) FILTER (WHERE ir2.status = 'failed') AS failed_runs,
                      COALESCE(SUM(ir2.items_fetched), 0) AS total_items
               FROM ingestion_runs ir2
               WHERE ir2.source_id = s.id
                 AND ir2.started_at >= NOW() - INTERVAL '24 hours'
           ) stats ON TRUE
           {where}
           ORDER BY c.ticker, s.source_type"""

    records = await pool.fetch(query, *bind_values)
    return [_row_to_dict(record) for record in records]
|
|
|
|
|
|
@app.get("/api/admin/sources/{source_id}/runs")
async def get_source_runs(
    source_id: str,
    limit: int = Query(default=20, ge=0, le=100),
    offset: int = Query(default=0, ge=0),
):
    """Return recent ingestion runs for one source, newest first.

    Args:
        source_id: Identifier matched against ``ingestion_runs.source_id``.
        limit: Page size (0-100). Negative values are rejected at validation
            time because PostgreSQL refuses a negative LIMIT.
        offset: Number of rows to skip; must be non-negative for the same
            reason.

    Returns:
        A list of run dictionaries produced by ``_row_to_dict``.
    """
    rows = await pool.fetch(
        """SELECT id, source_id, company_id, source_type, status,
                  started_at, completed_at, items_fetched, items_new,
                  error_message, retry_count, next_retry_at
           FROM ingestion_runs
           WHERE source_id = $1
           ORDER BY started_at DESC
           LIMIT $2 OFFSET $3""",
        source_id, limit, offset,
    )
    return [_row_to_dict(r) for r in rows]
|
|
|
|
|
|
@app.put("/api/admin/sources/{source_id}/toggle")
async def toggle_source(source_id: str, active: bool = True):
    """Set a source's ``active`` flag and return the updated row.

    Responds with 404 when no source has the given id.
    """
    statement = """UPDATE sources SET active = $2, updated_at = NOW()
           WHERE id = $1
           RETURNING id, source_type, source_name, active"""
    updated = await pool.fetchrow(statement, source_id, active)
    if updated:
        return _row_to_dict(updated)
    raise HTTPException(404, "Source not found")
|
|
|
|
|
|
@app.put("/api/admin/sources/{source_id}/credibility")
async def update_source_credibility(source_id: str, credibility_score: float = Query(ge=0.0, le=1.0)):
    """Store a new credibility score (0.0-1.0) for a source.

    Returns the updated row, or responds with 404 when the source id is
    unknown.
    """
    statement = """UPDATE sources SET credibility_score = $2, updated_at = NOW()
           WHERE id = $1
           RETURNING id, source_type, source_name, credibility_score"""
    updated = await pool.fetchrow(statement, source_id, credibility_score)
    if updated:
        return _row_to_dict(updated)
    raise HTTPException(404, "Source not found")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Admin: Symbol Configs (Requirement 11.1 - symbol configs)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.put("/api/admin/companies/{company_id}/toggle")
async def toggle_company(company_id: str, active: bool = True):
    """Set a tracked company's ``active`` flag and return the updated row.

    Responds with 404 when no company has the given id.
    """
    statement = """UPDATE companies SET active = $2, updated_at = NOW()
           WHERE id = $1
           RETURNING id, ticker, legal_name, active"""
    updated = await pool.fetchrow(statement, company_id, active)
    if updated:
        return _row_to_dict(updated)
    raise HTTPException(404, "Company not found")
|
|
|
|
|
|
@app.put("/api/admin/companies/{company_id}/sector")
async def update_company_sector(
    company_id: str,
    sector: str = Query(...),
    industry: Optional[str] = None,
):
    """Change a company's sector and, when supplied, its industry.

    ``industry`` is only written when the caller passes it; omitting it
    leaves the stored industry untouched. Responds with 404 when the company
    id is unknown.
    """
    if industry is None:
        # Sector-only update: the industry column is not part of the SET list.
        statement = """UPDATE companies SET sector = $2, updated_at = NOW()
               WHERE id = $1
               RETURNING id, ticker, legal_name, sector, industry"""
        bind_values = (company_id, sector)
    else:
        statement = """UPDATE companies SET sector = $2, industry = $3, updated_at = NOW()
               WHERE id = $1
               RETURNING id, ticker, legal_name, sector, industry"""
        bind_values = (company_id, sector, industry)

    updated = await pool.fetchrow(statement, *bind_values)
    if updated:
        return _row_to_dict(updated)
    raise HTTPException(404, "Company not found")
|
|
|
|
|
|
@app.get("/api/admin/companies/coverage")
async def get_symbol_coverage():
    """Summarise active source counts per active company.

    For every active company the query counts its active sources in total and
    per source type (market_api, news_api, filings_api, web_scrape, broker),
    which makes coverage gaps easy to spot.
    """
    coverage_query = """SELECT c.id AS company_id, c.ticker, c.legal_name, c.sector,
                  c.active,
                  COUNT(s.id) FILTER (WHERE s.active) AS active_sources,
                  COUNT(s.id) FILTER (WHERE s.source_type = 'market_api' AND s.active) AS market_sources,
                  COUNT(s.id) FILTER (WHERE s.source_type = 'news_api' AND s.active) AS news_sources,
                  COUNT(s.id) FILTER (WHERE s.source_type = 'filings_api' AND s.active) AS filings_sources,
                  COUNT(s.id) FILTER (WHERE s.source_type = 'web_scrape' AND s.active) AS web_scrape_sources,
                  COUNT(s.id) FILTER (WHERE s.source_type = 'broker' AND s.active) AS broker_sources
           FROM companies c
           LEFT JOIN sources s ON s.company_id = c.id
           WHERE c.active = TRUE
           GROUP BY c.id, c.ticker, c.legal_name, c.sector, c.active
           ORDER BY c.ticker"""
    records = await pool.fetch(coverage_query)
    return [_row_to_dict(record) for record in records]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Admin: Trading Mode (Requirement 8.1, 8.2, 11.1)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/api/admin/trading/config")
async def get_trading_config():
    """Return the most recently updated active risk/trading configuration.

    Falls back to a paper-mode placeholder with an explanatory message when
    no active ``risk_configs`` row exists.
    """
    active_config = await pool.fetchrow(
        """SELECT id, name, trading_mode, config, active, created_at, updated_at
           FROM risk_configs
           WHERE active = TRUE
           ORDER BY updated_at DESC
           LIMIT 1""",
    )
    if active_config:
        payload = _row_to_dict(active_config)
        # Decode the stored JSONB column via the shared helper.
        payload["config"] = _parse_jsonb(payload.get("config"))
        return payload

    return {"trading_mode": "paper", "config": {}, "message": "No active config found, using defaults"}
|
|
|
|
|
|
@app.put("/api/admin/trading/mode")
async def set_trading_mode(mode: str = Query(..., pattern="^(paper|live|disabled)$")):
    """Switch the trading mode stored on the active risk configuration.

    The mode is written to the ``risk_configs`` rows flagged active; if none
    exists, a ``default`` config with an empty JSON body is created carrying
    the requested mode.

    Requirement 8.1: support paper and live as separate execution environments.
    Requirement 8.2: live mode requires operator approval controls.
    """
    updated = await pool.fetchrow(
        """UPDATE risk_configs SET trading_mode = $1, updated_at = NOW()
           WHERE active = TRUE
           RETURNING id, name, trading_mode""",
        mode,
    )
    if updated:
        return _row_to_dict(updated)

    # Nothing was updated, so there is no active config yet: seed one.
    created = await pool.fetchrow(
        """INSERT INTO risk_configs (name, trading_mode, config, active)
           VALUES ('default', $1, '{}', TRUE)
           RETURNING id, name, trading_mode""",
        mode,
    )
    return _row_to_dict(created)
|
|
|
|
|
|
@app.put("/api/admin/trading/config")
async def update_trading_config(config: dict[str, Any]):
    """Store the submitted object as the active risk configuration JSON.

    The request body is serialised and written to ``risk_configs.config``
    (JSONB) on the active rows. Note that the UPDATE assigns the column, so
    the submitted object replaces whatever was stored before rather than
    being merged into it. When no active row exists, a ``default`` paper-mode
    config is inserted with this body.
    """
    serialized = json.dumps(config)

    stored = await pool.fetchrow(
        """UPDATE risk_configs SET config = $1::jsonb, updated_at = NOW()
           WHERE active = TRUE
           RETURNING id, name, trading_mode, config""",
        serialized,
    )
    if not stored:
        # No active config to update: create one in paper mode.
        stored = await pool.fetchrow(
            """INSERT INTO risk_configs (name, trading_mode, config, active)
               VALUES ('default', 'paper', $1::jsonb, TRUE)
               RETURNING id, name, trading_mode, config""",
            serialized,
        )

    payload = _row_to_dict(stored)
    payload["config"] = _parse_jsonb(payload.get("config"))
    return payload
|
|
|
|
|
|
@app.get("/api/admin/trading/approvals")
async def list_pending_approvals():
    """Return operator approval requests still in ``pending`` status.

    Requests are ordered oldest first and each ``order_job`` JSONB value is
    decoded with ``_parse_jsonb``.
    """
    pending = await pool.fetch(
        """SELECT id, order_job, recommendation_id, ticker, side, quantity,
                  estimated_value, status, risk_evaluation_id, requested_by,
                  reviewed_by, review_note, expires_at, requested_at, reviewed_at
           FROM operator_approvals
           WHERE status = 'pending'
           ORDER BY requested_at ASC""",
    )

    def _decode(record):
        # Convert the record and expand its order_job payload.
        item = _row_to_dict(record)
        item["order_job"] = _parse_jsonb(item.get("order_job"))
        return item

    return [_decode(record) for record in pending]
|
|
|
|
|
|
@app.put("/api/admin/trading/approvals/{approval_id}")
async def review_approval_request(
    approval_id: str,
    approved: bool = Query(...),
    reviewed_by: str = "operator",
    review_note: str = "",
):
    """Approve or reject a pending operator approval request.

    Args:
        approval_id: UUID of the ``operator_approvals`` row.
        approved: ``True`` stores status ``approved``, ``False`` ``rejected``.
        reviewed_by: Name recorded as the reviewer.
        review_note: Free-text note stored with the decision.

    Returns:
        The updated row (id, ticker, status, reviewed_by) as a dict.

    Raises:
        HTTPException: 404 when the id is not a valid UUID, does not exist,
            or the request is no longer pending.

    Requirement 8.2: live orders require operator approval controls.
    """
    import uuid

    # A malformed id can never match a row; reject it up front instead of
    # letting the driver fail while encoding the uuid argument (HTTP 500).
    try:
        approval_uuid = uuid.UUID(approval_id)
    except ValueError:
        raise HTTPException(404, "Approval not found or no longer pending") from None

    now = datetime.now(timezone.utc)
    new_status = "approved" if approved else "rejected"

    # The status = 'pending' guard makes a second review of the same request
    # a no-op that falls through to the 404 below.
    row = await pool.fetchrow(
        """UPDATE operator_approvals
           SET status = $2, reviewed_by = $3, review_note = $4,
               reviewed_at = $5, updated_at = NOW()
           WHERE id = $1::uuid AND status = 'pending'
           RETURNING id, ticker, status, reviewed_by""",
        approval_uuid, new_status, reviewed_by, review_note, now,
    )
    if not row:
        raise HTTPException(404, "Approval not found or no longer pending")
    return _row_to_dict(row)
|
|
|
|
|
|
@app.get("/api/admin/trading/lockouts")
async def list_active_lockouts():
    """Return symbol lockouts that have not yet expired, soonest expiry first."""
    lockout_query = """SELECT id, ticker, lockout_type, reason, expires_at, created_at
           FROM symbol_lockouts
           WHERE expires_at > NOW()
           ORDER BY expires_at ASC"""
    records = await pool.fetch(lockout_query)
    return [_row_to_dict(record) for record in records]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Operational Dashboard (Requirement 12.1, 12.2, 12.3)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/api/ops/ingestion/throughput")
async def get_ingestion_throughput(
    hours: int = Query(default=24, ge=1, le=168),
    bucket: str = Query(default="1h", pattern="^(15m|1h|6h|1d)$"),
):
    """Ingestion throughput over time, bucketed by interval.

    Returns run counts and item counts per time bucket, broken down by
    source type. Powers the ingestion throughput chart.

    Args:
        hours: Size of the look-back window in hours (1-168).
        bucket: Bucket width; one of ``15m``, ``1h``, ``6h`` or ``1d``.

    Returns:
        A list of dicts with ``bucket_start``, ``source_type``, run counts by
        status and summed item counts, newest bucket first.

    Requirements: 12.1, 12.3
    """
    bucket_seconds = {
        "15m": 15 * 60,
        "1h": 60 * 60,
        "6h": 6 * 60 * 60,
        "1d": 24 * 60 * 60,
    }[bucket]

    # Floor each run's epoch to a multiple of the bucket width. The earlier
    # expression (minute % bucket_seconds / 60) always evaluated to 0 under
    # integer arithmetic, so every bucket size degenerated to hourly buckets.
    # Binding the width as a parameter also keeps the SQL free of
    # interpolated text.
    rows = await pool.fetch(
        """SELECT
               to_timestamp(
                   (floor(EXTRACT(epoch FROM ir.started_at) / $2::bigint)
                    * $2::bigint)::double precision
               ) AS bucket_start,
               ir.source_type,
               COUNT(*) AS run_count,
               COUNT(*) FILTER (WHERE ir.status = 'completed') AS completed,
               COUNT(*) FILTER (WHERE ir.status = 'failed') AS failed,
               COALESCE(SUM(ir.items_fetched), 0) AS items_fetched,
               COALESCE(SUM(ir.items_new), 0) AS items_new
           FROM ingestion_runs ir
           WHERE ir.started_at >= NOW() - INTERVAL '1 hour' * $1
           GROUP BY bucket_start, ir.source_type
           ORDER BY bucket_start DESC, ir.source_type""",
        hours, bucket_seconds,
    )
    return [_row_to_dict(r) for r in rows]
|
|
|
|
|
|
@app.get("/api/ops/ingestion/summary")
async def get_ingestion_summary(
    hours: int = Query(default=24, ge=1, le=168),
):
    """Summarise ingestion activity over the last ``hours`` hours.

    The response holds overall run counts by status, item totals, distinct
    source/company counts, a ``by_source_type`` breakdown and the ``hours``
    window that was used.

    Requirements: 12.1
    """
    totals = await pool.fetchrow(
        """SELECT
               COUNT(*) AS total_runs,
               COUNT(*) FILTER (WHERE status = 'completed') AS completed,
               COUNT(*) FILTER (WHERE status = 'failed') AS failed,
               COUNT(*) FILTER (WHERE status = 'pending') AS pending,
               COUNT(*) FILTER (WHERE status = 'running') AS running,
               COALESCE(SUM(items_fetched), 0) AS total_items_fetched,
               COALESCE(SUM(items_new), 0) AS total_items_new,
               COUNT(DISTINCT source_id) AS active_sources,
               COUNT(DISTINCT company_id) AS active_companies
           FROM ingestion_runs
           WHERE started_at >= NOW() - INTERVAL '1 hour' * $1""",
        hours,
    )

    per_source_type = await pool.fetch(
        """SELECT
               source_type,
               COUNT(*) AS runs,
               COUNT(*) FILTER (WHERE status = 'completed') AS completed,
               COUNT(*) FILTER (WHERE status = 'failed') AS failed,
               COALESCE(SUM(items_fetched), 0) AS items_fetched,
               COALESCE(SUM(items_new), 0) AS items_new
           FROM ingestion_runs
           WHERE started_at >= NOW() - INTERVAL '1 hour' * $1
           GROUP BY source_type
           ORDER BY runs DESC""",
        hours,
    )

    summary = {} if not totals else _row_to_dict(totals)
    summary["by_source_type"] = [_row_to_dict(record) for record in per_source_type]
    summary["hours"] = hours
    return summary
|
|
|
|
|
|
@app.get("/api/ops/model/failures")
async def get_model_failures(
    hours: int = Query(default=24, ge=1, le=168),
    limit: int = Query(default=50, ge=0, le=200),
):
    """Recent model extraction failures with error details.

    Returns individual failed extraction attempts for debugging.

    Args:
        hours: Look-back window in hours (1-168).
        limit: Maximum number of failures returned (0-200). Negative values
            are rejected at validation time because PostgreSQL refuses a
            negative LIMIT.

    Returns:
        A list of dicts, newest first, each combining the metric row with the
        joined document's title and types; ``validation_errors`` is decoded
        with ``_parse_jsonb``.

    Requirements: 12.2
    """
    rows = await pool.fetch(
        """SELECT
               mpm.id, mpm.document_id, mpm.ticker, mpm.model_name,
               mpm.prompt_version, mpm.schema_version,
               mpm.attempt_count, mpm.total_duration_ms,
               mpm.validation_status, mpm.validation_error_count,
               mpm.validation_errors, mpm.retry_count,
               mpm.confidence, mpm.recorded_at,
               d.title AS document_title, d.document_type, d.source_type
           FROM model_performance_metrics mpm
           LEFT JOIN documents d ON d.id = mpm.document_id
           WHERE mpm.success = FALSE
             AND mpm.recorded_at >= NOW() - INTERVAL '1 hour' * $1
           ORDER BY mpm.recorded_at DESC
           LIMIT $2""",
        hours, limit,
    )
    results = []
    for r in rows:
        d = _row_to_dict(r)
        d["validation_errors"] = _parse_jsonb(d.get("validation_errors"))
        results.append(d)
    return results
|
|
|
|
|
|
@app.get("/api/ops/model/performance")
async def get_model_performance(
    hours: int = Query(default=24, ge=1, le=168),
    model_name: Optional[str] = None,
):
    """Return aggregated model performance metrics for the dashboard.

    Thin wrapper that forwards the time window and optional model filter to
    ``get_model_performance_summary`` and returns its result unchanged.

    Requirements: 12.2
    """
    summary = await get_model_performance_summary(pool, model_name=model_name, hours=hours)
    return summary
|
|
|
|
|
|
@app.get("/api/ops/pipeline/health")
async def get_pipeline_health(
    hours: int = Query(default=24, ge=1, le=168),
):
    """Summarise pipeline health over the last ``hours`` hours.

    Four independent queries feed the response: document counts per status,
    parse-confidence distribution, extraction validation statistics and
    trend-window output. Each single-row section falls back to ``{}`` when
    its query returns no row.

    Requirements: 12.1
    """

    def _dict_or_empty(record):
        # Single-row sections degrade to an empty object.
        return _row_to_dict(record) if record else {}

    # Stage distribution: how many documents sit in each status.
    stage_counts = await pool.fetch(
        """SELECT
               status,
               COUNT(*) AS doc_count
           FROM documents
           WHERE created_at >= NOW() - INTERVAL '1 hour' * $1
           GROUP BY status
           ORDER BY doc_count DESC""",
        hours,
    )

    # Parse quality for documents that reached at least the parsed stage.
    parsing_row = await pool.fetchrow(
        """SELECT
               COUNT(*) AS total_parsed,
               COUNT(*) FILTER (WHERE parse_confidence = 'high') AS high_confidence,
               COUNT(*) FILTER (WHERE parse_confidence = 'medium') AS medium_confidence,
               COUNT(*) FILTER (WHERE parse_confidence = 'low') AS low_confidence,
               COUNT(*) FILTER (WHERE parse_confidence = 'unknown' OR parse_confidence IS NULL) AS unknown_confidence,
               ROUND(AVG(parse_quality_score)::numeric, 3) AS avg_quality_score
           FROM documents
           WHERE created_at >= NOW() - INTERVAL '1 hour' * $1
             AND status IN ('parsed', 'extracted', 'aggregated')""",
        hours,
    )

    # Extraction validation outcomes.
    extraction_row = await pool.fetchrow(
        """SELECT
               COUNT(*) AS total_extractions,
               COUNT(*) FILTER (WHERE validation_status = 'valid') AS valid,
               COUNT(*) FILTER (WHERE validation_status = 'failed') AS failed,
               COUNT(*) FILTER (WHERE validation_status = 'pending') AS pending,
               ROUND(AVG(confidence)::numeric, 3) AS avg_confidence,
               ROUND(AVG(retry_count)::numeric, 2) AS avg_retries
           FROM document_intelligence
           WHERE created_at >= NOW() - INTERVAL '1 hour' * $1""",
        hours,
    )

    # Aggregation output: trend windows produced in the window.
    aggregation_row = await pool.fetchrow(
        """SELECT
               COUNT(*) AS trends_generated,
               COUNT(DISTINCT entity_id) AS symbols_covered,
               ROUND(AVG(confidence)::numeric, 3) AS avg_trend_confidence,
               ROUND(AVG(contradiction_score)::numeric, 3) AS avg_contradiction
           FROM trend_windows
           WHERE created_at >= NOW() - INTERVAL '1 hour' * $1""",
        hours,
    )

    return {
        "hours": hours,
        "document_stages": [_row_to_dict(record) for record in stage_counts],
        "parsing": _dict_or_empty(parsing_row),
        "extraction": _dict_or_empty(extraction_row),
        "aggregation": _dict_or_empty(aggregation_row),
    }
|
|
|
|
|
|
@app.get("/api/ops/sources/coverage-gaps")
async def get_source_coverage_gaps():
    """Report active companies with missing source types and stale sources.

    ``missing_source_types`` lists active companies whose active sources do
    not cover all of market_api, news_api and filings_api (or that have no
    active source at all). ``stale_sources`` lists active sources of active
    companies whose last completed ingestion run is older than 24 hours or
    that never completed one.

    Requirements: 12.3
    """
    missing_query = """SELECT
               c.id AS company_id, c.ticker, c.legal_name, c.sector,
               ARRAY_AGG(DISTINCT s.source_type) FILTER (WHERE s.active) AS active_types,
               ARRAY['market_api', 'news_api', 'filings_api'] AS expected_types
           FROM companies c
           LEFT JOIN sources s ON s.company_id = c.id AND s.active = TRUE
           WHERE c.active = TRUE
           GROUP BY c.id, c.ticker, c.legal_name, c.sector
           HAVING NOT ARRAY['market_api', 'news_api', 'filings_api']::text[] <@ ARRAY_AGG(DISTINCT s.source_type::text) FILTER (WHERE s.active)
              OR ARRAY_AGG(DISTINCT s.source_type::text) FILTER (WHERE s.active) IS NULL
           ORDER BY c.ticker"""

    stale_query = """SELECT
               s.id AS source_id, s.source_type, s.source_name,
               c.ticker, c.legal_name,
               MAX(ir.started_at) FILTER (WHERE ir.status = 'completed') AS last_success,
               MAX(ir.started_at) AS last_attempt,
               COUNT(*) FILTER (WHERE ir.status = 'failed'
                   AND ir.started_at >= NOW() - INTERVAL '24 hours') AS recent_failures
           FROM sources s
           JOIN companies c ON c.id = s.company_id
           LEFT JOIN ingestion_runs ir ON ir.source_id = s.id
           WHERE s.active = TRUE AND c.active = TRUE
           GROUP BY s.id, s.source_type, s.source_name, c.ticker, c.legal_name
           HAVING MAX(ir.started_at) FILTER (WHERE ir.status = 'completed')
                  < NOW() - INTERVAL '24 hours'
              OR MAX(ir.started_at) FILTER (WHERE ir.status = 'completed') IS NULL
           ORDER BY c.ticker, s.source_type"""

    companies_missing_types = await pool.fetch(missing_query)
    sources_without_recent_success = await pool.fetch(stale_query)

    return {
        "missing_source_types": [_row_to_dict(record) for record in companies_missing_types],
        "stale_sources": [_row_to_dict(record) for record in sources_without_recent_success],
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Analytics: Trino SQL Proxy (Requirement 10.1, 10.3, 13.7)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.post("/api/analytics/query")
async def analytics_query(body: dict[str, Any]):
    """Proxy SQL to Trino, enforce row limits, return structured results.

    Args:
        body: JSON object with a required ``sql`` string and an optional
            integer ``limit`` (default 1000, capped at 10000).

    Returns:
        A dict with ``columns`` (name/type pairs), ``rows``, ``row_count``
        and ``elapsed_ms``.

    Raises:
        HTTPException: 400 for a missing/invalid ``sql`` or ``limit`` and for
            query errors reported by Trino; 502 when Trino cannot be reached
            or rejects the submission; 504 on timeout.

    Design: Section 9.3 (API proxy for Trino)
    Requirements: 10.1, 10.3, 13.7
    """
    raw_sql = body.get("sql")
    # Guard against non-string payloads (null, numbers), which previously
    # crashed on .strip() and surfaced as HTTP 500.
    if not isinstance(raw_sql, str) or not raw_sql.strip():
        raise HTTPException(400, "sql is required")
    sql = raw_sql.strip()

    try:
        requested_limit = int(body.get("limit", 1000))
    except (TypeError, ValueError):
        raise HTTPException(400, "limit must be an integer") from None
    if requested_limit < 0:
        # A negative limit would make the final slice drop rows from the end.
        raise HTTPException(400, "limit must not be negative")
    limit = min(requested_limit, 10000)

    trino_catalog = config.trino.catalog
    trino_schema = config.trino.schema
    trino_url = f"http://{config.trino.host}:{config.trino.port}/v1/statement"
    headers = {
        "X-Trino-User": "stonks-dashboard",
        "X-Trino-Catalog": trino_catalog,
        "X-Trino-Schema": trino_schema,
    }

    columns: list[dict[str, str]] = []
    all_rows: list[list[Any]] = []

    def _consume(page: dict[str, Any]) -> None:
        """Fold one Trino response page into ``columns`` / ``all_rows``."""
        nonlocal columns
        failure = page.get("error")
        if failure:
            # Trino reports query failures inside a 200 response; surface the
            # message instead of returning an empty, successful-looking result.
            if isinstance(failure, dict):
                message = str(failure.get("message", "unknown error"))
            else:
                message = str(failure)
            raise HTTPException(400, f"Trino query failed: {message[:500]}")
        if not columns and "columns" in page:
            columns = [
                {"name": c["name"], "type": c.get("type", "unknown")}
                for c in page["columns"]
            ]
        if "data" in page:
            all_rows.extend(page["data"])

    start = _time.monotonic()
    try:
        async with httpx.AsyncClient(timeout=60.0) as client:
            # Submit query
            resp = await client.post(trino_url, content=sql, headers=headers)
            if resp.status_code != 200:
                raise HTTPException(502, f"Trino error: {resp.text[:500]}")
            result = resp.json()
            _consume(result)

            # Follow nextUri until the result is complete or the limit is hit
            while "nextUri" in result and len(all_rows) < limit:
                resp = await client.get(result["nextUri"], headers=headers)
                if resp.status_code != 200:
                    break
                result = resp.json()
                _consume(result)

            if "nextUri" in result:
                # We stopped before the query finished; tell Trino to cancel
                # it so it does not keep running with no consumer.
                try:
                    await client.delete(result["nextUri"], headers=headers)
                except httpx.HTTPError:
                    logger.warning("Failed to cancel unfinished Trino query", exc_info=True)

        elapsed_ms = round((_time.monotonic() - start) * 1000)
        limited_rows = all_rows[:limit]

        return {
            "columns": columns,
            "rows": limited_rows,
            "row_count": len(limited_rows),
            "elapsed_ms": elapsed_ms,
        }

    except httpx.ConnectError as exc:
        raise HTTPException(502, "Cannot connect to Trino") from exc
    except httpx.TimeoutException as exc:
        raise HTTPException(504, "Trino query timed out") from exc
|
|
|
|
|
|
@app.get("/api/analytics/schema")
async def analytics_schema():
    """Return Trino catalog/schema/table/column metadata for the schema browser.

    Tables and columns of the configured schema are read from Trino's
    ``information_schema`` with two statements (one for tables, one for all
    columns) over a single HTTP client. This is best-effort: on any failure
    the error is logged and an empty table list is returned.

    Returns:
        A dict with ``catalog``, ``schema`` and ``tables``, where each table
        is ``{"name": ..., "columns": [{"name": ..., "type": ...}, ...]}``.

    Requirements: 13.7
    """
    trino_catalog = config.trino.catalog
    trino_schema = config.trino.schema
    trino_url = f"http://{config.trino.host}:{config.trino.port}/v1/statement"
    headers = {
        "X-Trino-User": "stonks-dashboard",
        "X-Trino-Catalog": trino_catalog,
        "X-Trino-Schema": trino_schema,
    }

    async def _run_trino_query(client: httpx.AsyncClient, sql: str) -> list[list[Any]]:
        """Run one statement and collect every data page; [] on HTTP failure."""
        rows: list[list[Any]] = []
        resp = await client.post(trino_url, content=sql, headers=headers)
        if resp.status_code != 200:
            return rows
        result = resp.json()
        if "data" in result:
            rows.extend(result["data"])
        while "nextUri" in result:
            resp = await client.get(result["nextUri"], headers=headers)
            if resp.status_code != 200:
                break
            result = resp.json()
            if "data" in result:
                rows.extend(result["data"])
        return rows

    # The statement API takes plain SQL text, so the schema name has to be
    # embedded as a literal; double any single quote so it cannot terminate
    # the literal early.
    schema_literal = str(trino_schema).replace("'", "''")

    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            table_rows = await _run_trino_query(
                client,
                f"SELECT table_name FROM information_schema.tables WHERE table_schema = '{schema_literal}' ORDER BY table_name",
            )
            # One statement for all columns instead of one per table.
            col_rows = await _run_trino_query(
                client,
                f"SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_schema = '{schema_literal}' ORDER BY table_name, ordinal_position",
            )

        columns_by_table: dict[str, list[dict[str, Any]]] = {}
        for cr in col_rows:
            if cr:
                columns_by_table.setdefault(cr[0], []).append({"name": cr[1], "type": cr[2]})

        tables = [
            {"name": tr[0], "columns": columns_by_table.get(tr[0], [])}
            for tr in table_rows
            if tr and tr[0]
        ]

        return {
            "catalog": trino_catalog,
            "schema": trino_schema,
            "tables": tables,
        }
    except Exception:
        # Deliberately best-effort for the schema browser, but leave a trace
        # so an unreachable or misbehaving Trino is diagnosable.
        logger.exception("Failed to load Trino schema metadata")
        return {"catalog": trino_catalog, "schema": trino_schema, "tables": []}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Analytics: PostgreSQL Direct Query (Schema browser + read-only SQL)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.get("/api/analytics/pg-schema")
async def pg_schema():
    """Return table/column metadata of the PostgreSQL ``public`` schema.

    Base tables are read from ``information_schema`` and their columns are
    grouped per table in ordinal order; each column carries its data type and
    a boolean ``nullable`` flag.
    """
    metadata_rows = await pool.fetch("""
        SELECT t.table_name, c.column_name, c.data_type, c.is_nullable
        FROM information_schema.tables t
        JOIN information_schema.columns c
            ON t.table_name = c.table_name AND t.table_schema = c.table_schema
        WHERE t.table_schema = 'public' AND t.table_type = 'BASE TABLE'
        ORDER BY t.table_name, c.ordinal_position
    """)

    grouped: dict[str, dict[str, Any]] = {}
    for record in metadata_rows:
        table_name = record["table_name"]
        # setdefault creates the table entry the first time it is seen.
        entry = grouped.setdefault(table_name, {"name": table_name, "columns": []})
        column = {
            "name": record["column_name"],
            "type": record["data_type"],
            "nullable": record["is_nullable"] == "YES",
        }
        entry["columns"].append(column)

    return {"catalog": "postgresql", "schema": "public", "tables": list(grouped.values())}
|
|
|
|
|
|
@app.post("/api/analytics/pg-query")
async def pg_query(body: dict[str, Any]):
    """Run read-only SQL against PostgreSQL directly.

    Args:
        body: JSON object with a required ``sql`` string and an optional
            integer ``limit`` (default 1000, capped at 10000).

    Returns:
        A dict with ``columns`` (derived from the first row, typed ``text``),
        ``rows`` (every value stringified), ``row_count`` and ``elapsed_ms``.

    Raises:
        HTTPException: 400 for a missing/invalid ``sql`` or ``limit``, for
            statements that do not start with SELECT, and for any
            PostgreSQL error raised while running the query.
    """
    import re

    raw_sql = body.get("sql")
    # Guard against non-string payloads (null, numbers), which previously
    # crashed on .strip() and surfaced as HTTP 500.
    if not isinstance(raw_sql, str) or not raw_sql.strip():
        raise HTTPException(400, "sql is required")
    sql = raw_sql.strip()

    try:
        requested_limit = int(body.get("limit", 1000))
    except (TypeError, ValueError):
        raise HTTPException(400, "limit must be an integer") from None
    if requested_limit < 0:
        raise HTTPException(400, "limit must not be negative")
    limit = min(requested_limit, 10000)

    # Safety: only allow SELECT statements
    # Strip SQL comments (-- and /* */) and whitespace before checking
    stripped = re.sub(r'--[^\n]*', '', sql)  # remove -- comments
    stripped = re.sub(r'/\*.*?\*/', '', stripped, flags=re.DOTALL)  # remove /* */ comments
    stripped = stripped.strip()
    if not stripped.upper().startswith("SELECT"):
        raise HTTPException(400, "Only SELECT queries are allowed")

    start = _time.monotonic()
    try:
        async with pool.acquire() as conn:
            # The prefix check alone does not stop writes (SELECT ... INTO,
            # side-effecting functions); a read-only transaction makes the
            # server reject them.
            async with conn.transaction(readonly=True):
                # Cap rows with a cursor instead of appending "LIMIT n" to
                # the text: the textual approach broke on trailing ';' or
                # '--' comments and was skipped whenever the substring
                # "LIMIT" appeared anywhere (e.g. a column named rate_limit).
                cursor = await conn.cursor(sql)
                rows = await cursor.fetch(limit) if limit > 0 else []
        elapsed_ms = round((_time.monotonic() - start) * 1000)
        columns = [{"name": k, "type": "text"} for k in rows[0].keys()] if rows else []
        return {
            "columns": columns,
            "rows": [[str(v) for v in row.values()] for row in rows],
            "row_count": len(rows),
            "elapsed_ms": elapsed_ms,
        }
    except asyncpg.PostgresSyntaxError as exc:
        raise HTTPException(400, f"SQL syntax error: {exc}")
    except asyncpg.UndefinedTableError as exc:
        raise HTTPException(400, f"Table not found: {exc}")
    except asyncpg.PostgresError as exc:
        raise HTTPException(400, f"Query error: {exc}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Analytics: Saved Queries (Requirement 13.7)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class SavedQueryBody(BaseModel):
    # Request body for creating a saved analytics query.

    # Display name of the saved query (required).
    name: str
    # Optional free-text description; defaults to an empty string.
    description: str = ""
    # The SQL text to store (required).
    sql_text: str
|
|
|
|
|
|
@app.get("/api/analytics/saved-queries")
async def list_saved_queries():
    """Return every saved query, most recently updated first."""
    listing_query = "SELECT id, name, description, sql_text, created_by, created_at, updated_at FROM saved_queries ORDER BY updated_at DESC"
    records = await pool.fetch(listing_query)
    return [_row_to_dict(record) for record in records]
|
|
|
|
|
|
@app.post("/api/analytics/saved-queries", status_code=201)
async def create_saved_query(body: SavedQueryBody):
    """Insert a saved query and return the stored row."""
    insert_statement = """INSERT INTO saved_queries (name, description, sql_text)
           VALUES ($1, $2, $3)
           RETURNING id, name, description, sql_text, created_by, created_at"""
    created = await pool.fetchrow(
        insert_statement,
        body.name,
        body.description,
        body.sql_text,
    )
    return _row_to_dict(created)
|
|
|
|
|
|
@app.delete("/api/analytics/saved-queries/{query_id}")
async def delete_saved_query(query_id: str):
    """Delete a saved query.

    Args:
        query_id: UUID of the ``saved_queries`` row to remove.

    Returns:
        ``{"status": "deleted"}`` when a row was removed.

    Raises:
        HTTPException: 404 when the id is not a valid UUID or matches no row.
    """
    import uuid

    # A malformed id can never match a row; reject it up front instead of
    # letting the driver fail while encoding the uuid argument (HTTP 500).
    try:
        query_uuid = uuid.UUID(query_id)
    except ValueError:
        raise HTTPException(404, "Query not found") from None

    # execute() returns the command status tag; "DELETE 0" means no row matched.
    result = await pool.execute("DELETE FROM saved_queries WHERE id = $1::uuid", query_uuid)
    if result == "DELETE 0":
        raise HTTPException(404, "Query not found")
    return {"status": "deleted"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Admin: Macro Signal Layer Toggle (Requirement 11.1, 11.5, 11.7)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class MacroToggleBody(BaseModel):
    # Request body for switching the macro signal layer on or off.

    # Desired state of the macro layer (required).
    enabled: bool
    # Name recorded as the actor in the audit event; defaults to "operator".
    operator: str = "operator"
|
|
|
|
|
|
@app.get("/api/admin/macro/status")
async def get_macro_status():
    """Report whether the macro signal layer is currently enabled.

    The flag is read from the ``macro_enabled`` key of the most recently
    updated active ``risk_configs`` row. When there is no such row, or the
    key is absent, the layer is reported as enabled with source ``default``.

    Requirements: 11.1, 11.5
    """
    record = await pool.fetchrow(
        """SELECT config->>'macro_enabled' AS macro_enabled
           FROM risk_configs
           WHERE active = TRUE
           ORDER BY updated_at DESC
           LIMIT 1""",
    )
    stored_flag = None if record is None else record["macro_enabled"]
    if stored_flag is None:
        return {"macro_enabled": True, "source": "default"}

    # ->> yields text, so compare case-insensitively against "true".
    is_enabled = stored_flag.lower() == "true"
    return {
        "macro_enabled": is_enabled,
        "source": "risk_configs",
    }
|
|
|
|
|
|
@app.put("/api/admin/macro/toggle")
async def toggle_macro_layer(body: MacroToggleBody):
    """Switch the macro signal layer on or off and audit the change.

    The new state is merged into the ``config`` JSONB of the most recently
    updated active ``risk_configs`` row (a ``default`` paper-mode row is
    created first when none exists). An audit event
    ``macro.layer_toggled`` records the previous state, the new state and the
    operator.

    The toggle state is read from PostgreSQL at the start of each aggregation
    cycle (no caching), so changes take effect on the next cycle.

    Requirements: 11.1, 11.5, 11.7
    """
    # The flag is stored as the text "true"/"false" inside the JSON object.
    flag_patch = json.dumps({"macro_enabled": str(body.enabled).lower()})

    active_row = await pool.fetchrow(
        """SELECT id, config->>'macro_enabled' AS macro_enabled
           FROM risk_configs
           WHERE active = TRUE
           ORDER BY updated_at DESC
           LIMIT 1""",
    )

    if active_row is not None:
        stored_flag = active_row["macro_enabled"]
        # A missing/empty flag means the layer was running on its default (on).
        previous_enabled = stored_flag.lower() == "true" if stored_flag else True
    else:
        # No active config exists yet: create one carrying the requested flag.
        active_row = await pool.fetchrow(
            """INSERT INTO risk_configs (name, trading_mode, config, active)
               VALUES ('default', 'paper', $1::jsonb, TRUE)
               RETURNING id, config->>'macro_enabled' AS macro_enabled""",
            flag_patch,
        )
        previous_enabled = True  # default was enabled

    row_id = active_row["id"]

    # Merge the flag into the existing config object.
    await pool.execute(
        """UPDATE risk_configs
           SET config = config || $2::jsonb, updated_at = NOW()
           WHERE id = $1""",
        row_id,
        flag_patch,
    )

    # Record audit event (Requirement 11.7)
    audit_payload = {
        "previous_enabled": previous_enabled,
        "new_enabled": body.enabled,
    }
    await record_audit_event(
        pool,
        event_type="macro.layer_toggled",
        entity_type="risk_config",
        entity_id=str(row_id),
        data=audit_payload,
        actor=body.operator,
    )

    return {
        "macro_enabled": body.enabled,
        "previous_enabled": previous_enabled,
        "toggled_by": body.operator,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Macro Events and Impacts (Requirement 8.1, 8.2, 12.10)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/api/macro/events")
async def list_macro_events(
    severity: Optional[str] = None,
    region: Optional[str] = None,
    sector: Optional[str] = None,
    since: Optional[str] = None,
    until: Optional[str] = None,
    limit: int = Query(default=50, le=200),
    offset: int = 0,
):
    """List global events, newest first, with optional filters.

    Args:
        severity: Exact match on ``global_events.severity``.
        region: Value that must appear in ``affected_regions``.
        sector: Value that must appear in ``affected_sectors``.
        since: Inclusive lower bound on ``created_at`` (ISO 8601 string).
        until: Inclusive upper bound on ``created_at`` (ISO 8601 string).
        limit: Page size (at most 200, enforced by the ``Query`` validator).
        offset: Number of rows to skip.

    Returns:
        A list of event dicts; ``key_facts`` is passed through ``_parse_jsonb``.

    Raises:
        HTTPException: 400 if ``since`` or ``until`` is not a valid ISO 8601
            timestamp.

    Requirements: 8.1
    """
    conditions: list[str] = []
    params: list[Any] = []

    def bind(value: Any) -> str:
        """Register a query argument and return its ``$n`` placeholder."""
        params.append(value)
        return f"${len(params)}"

    if severity:
        conditions.append(f"ge.severity = {bind(severity)}")
    if region:
        conditions.append(f"{bind(region)} = ANY(ge.affected_regions)")
    if sector:
        conditions.append(f"{bind(sector)} = ANY(ge.affected_sectors)")

    for label, raw, op in (("since", since, ">="), ("until", until, "<=")):
        if not raw:
            continue
        text = raw.strip()
        # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11.
        if text.endswith(("Z", "z")):
            text = text[:-1] + "+00:00"
        try:
            bound = datetime.fromisoformat(text)
        except ValueError:
            raise HTTPException(
                400, f"Invalid '{label}' timestamp: expected ISO 8601"
            ) from None
        # asyncpg requires a datetime (not a str) for a timestamptz argument.
        # Naive inputs are treated as UTC so the result does not depend on the
        # server's local timezone.
        if bound.tzinfo is None:
            bound = bound.replace(tzinfo=timezone.utc)
        conditions.append(f"ge.created_at {op} {bind(bound)}::timestamptz")

    where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
    limit_ph = bind(limit)
    offset_ph = bind(offset)

    query = f"""SELECT ge.id, ge.event_types, ge.severity, ge.affected_regions,
                  ge.affected_sectors, ge.affected_commodities, ge.summary,
                  ge.key_facts, ge.estimated_duration, ge.confidence,
                  ge.source_document_id, ge.created_at
           FROM global_events ge
           {where}
           ORDER BY ge.created_at DESC
           LIMIT {limit_ph} OFFSET {offset_ph}"""
    rows = await pool.fetch(query, *params)

    results = []
    for r in rows:
        d = _row_to_dict(r)
        d["key_facts"] = _parse_jsonb(d.get("key_facts"))
        results.append(d)
    return results
|
|
|
|
|
|
@app.get("/api/macro/events/{event_id}")
async def get_macro_event(event_id: str):
    """Return one global event together with the companies it affects.

    The response is the ``global_events`` row (``key_facts`` passed through
    ``_parse_jsonb``) plus an ``affected_companies`` list built from
    ``macro_impact_records`` joined to ``companies``, ordered by
    ``macro_impact_score`` descending.

    Raises:
        HTTPException: 404 when no event has the given id.

    Requirements: 8.2
    """
    event_row = await pool.fetchrow(
        """SELECT id, event_types, severity, affected_regions, affected_sectors,
                  affected_commodities, summary, key_facts, estimated_duration,
                  confidence, source_document_id, model_provider, model_name,
                  prompt_version, schema_version, created_at
           FROM global_events WHERE id = $1""",
        event_id,
    )
    if not event_row:
        raise HTTPException(404, "Global event not found")

    event = _row_to_dict(event_row)
    event["key_facts"] = _parse_jsonb(event.get("key_facts"))

    # Per-company impact records for this event, strongest impact first.
    impact_rows = await pool.fetch(
        """SELECT mir.id, mir.company_id, mir.ticker, mir.macro_impact_score,
                  mir.impact_direction, mir.contributing_factors, mir.confidence,
                  mir.computed_at, c.legal_name, c.sector
           FROM macro_impact_records mir
           JOIN companies c ON c.id = mir.company_id
           WHERE mir.event_id = $1
           ORDER BY mir.macro_impact_score DESC""",
        event_id,
    )
    event["affected_companies"] = [
        {
            **impact,
            "contributing_factors": _parse_jsonb(impact.get("contributing_factors")),
        }
        for impact in map(_row_to_dict, impact_rows)
    ]

    return event
|
|
|
|
|
|
@app.get("/api/macro/impacts/{ticker}")
async def get_macro_impacts_for_ticker(
    ticker: str,
    since: Optional[str] = None,
    limit: int = Query(default=50, le=200),
    offset: int = 0,
):
    """List macro impact records for one company, newest first.

    Args:
        ticker: Company ticker; upper-cased before matching ``mir.ticker``.
        since: Inclusive lower bound on ``computed_at`` (ISO 8601 string).
        limit: Page size (at most 200, enforced by the ``Query`` validator).
        offset: Number of rows to skip.

    Returns:
        A list of impact dicts joined with summary fields of the originating
        global event; ``contributing_factors`` is passed through
        ``_parse_jsonb``.

    Raises:
        HTTPException: 400 if ``since`` is not a valid ISO 8601 timestamp.

    Requirements: 8.2
    """
    conditions = ["mir.ticker = $1"]
    params: list[Any] = [ticker.upper()]

    if since:
        text = since.strip()
        # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11.
        if text.endswith(("Z", "z")):
            text = text[:-1] + "+00:00"
        try:
            since_dt = datetime.fromisoformat(text)
        except ValueError:
            raise HTTPException(
                400, "Invalid 'since' timestamp: expected ISO 8601"
            ) from None
        # asyncpg requires a datetime (not a str) for a timestamptz argument.
        # Naive inputs are treated as UTC so the result does not depend on the
        # server's local timezone.
        if since_dt.tzinfo is None:
            since_dt = since_dt.replace(tzinfo=timezone.utc)
        params.append(since_dt)
        conditions.append(f"mir.computed_at >= ${len(params)}::timestamptz")

    where = " AND ".join(conditions)
    limit_idx = len(params) + 1

    rows = await pool.fetch(
        f"""SELECT mir.id, mir.event_id, mir.company_id, mir.ticker,
                   mir.macro_impact_score, mir.impact_direction,
                   mir.contributing_factors, mir.confidence, mir.computed_at,
                   ge.summary AS event_summary, ge.severity AS event_severity,
                   ge.event_types AS event_types, ge.affected_regions
            FROM macro_impact_records mir
            JOIN global_events ge ON ge.id = mir.event_id
            WHERE {where}
            ORDER BY mir.computed_at DESC
            LIMIT ${limit_idx} OFFSET ${limit_idx + 1}""",
        *params, limit, offset,
    )

    results = []
    for r in rows:
        d = _row_to_dict(r)
        d["contributing_factors"] = _parse_jsonb(d.get("contributing_factors"))
        results.append(d)
    return results
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Trend Projections (Requirement 12.10)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/api/trends/{trend_id}/projection")
async def get_trend_projection(trend_id: str):
    """Return the most recently computed projection for a trend window.

    Responds with ``{"trend_window_id": ..., "projection": None}`` when the
    trend exists but has no projection rows. Otherwise returns the newest
    ``trend_projections`` row with ``driving_factors`` passed through
    ``_parse_jsonb``.

    Raises:
        HTTPException: 404 when no ``trend_windows`` row has the given id.

    Requirements: 12.10
    """
    # Distinguish "unknown trend" (404) from "trend without a projection".
    exists = await pool.fetchrow(
        "SELECT id FROM trend_windows WHERE id = $1", trend_id,
    )
    if not exists:
        raise HTTPException(404, "Trend not found")

    latest = await pool.fetchrow(
        """SELECT id, trend_window_id, projected_direction, projected_strength,
                  projected_confidence, projection_horizon, driving_factors,
                  macro_contribution_pct, diverges_from_current, computed_at
           FROM trend_projections WHERE trend_window_id = $1
           ORDER BY computed_at DESC LIMIT 1""",
        trend_id,
    )
    if latest:
        projection = _row_to_dict(latest)
        projection["driving_factors"] = _parse_jsonb(projection.get("driving_factors"))
        return projection

    return {"trend_window_id": trend_id, "projection": None}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Competitive Layer Toggle (Requirements 6.1, 6.2, 6.3, 6.4, 6.5, 6.7)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class CompetitiveToggleBody(BaseModel):
    """Request body for ``PUT /api/admin/competitive/toggle``."""

    # Desired state of the competitive signal layer.
    enabled: bool
    # Recorded as the actor of the audit event; generic default when omitted.
    operator: str = "operator"
|
|
|
|
|
|
@app.get("/api/admin/competitive/status")
async def get_competitive_status():
    """Report whether the competitive signal layer is enabled.

    Reads ``competitive_enabled`` from the JSONB ``config`` of the most
    recently updated active ``risk_configs`` row. When there is no active row,
    or the key is absent, the layer is reported as enabled with
    ``source`` set to ``"default"``.

    Requirements: 6.1, 6.5
    """
    active = await pool.fetchrow(
        """SELECT config->>'competitive_enabled' AS competitive_enabled
           FROM risk_configs
           WHERE active = TRUE
           ORDER BY updated_at DESC
           LIMIT 1""",
    )
    stored_flag = active["competitive_enabled"] if active is not None else None

    if stored_flag is None:
        return {"competitive_enabled": True, "source": "default"}

    # ->> yields text, so compare case-insensitively against "true".
    is_enabled = stored_flag.lower() == "true"
    return {"competitive_enabled": is_enabled, "source": "risk_configs"}
|
|
|
|
|
|
@app.put("/api/admin/competitive/toggle")
async def toggle_competitive_layer(body: CompetitiveToggleBody):
    """Switch the competitive signal layer on or off.

    Merges the new ``competitive_enabled`` value into the JSONB ``config`` of
    the active ``risk_configs`` row (creating a default row if none is
    active), then records a ``competitive.layer_toggled`` audit event holding
    the previous state, the new state, and the operator.

    Per the design notes, the toggle is read from PostgreSQL at the start of
    each aggregation cycle (no caching), so a change takes effect on the next
    cycle. While disabled, pattern mining stays queryable via the API but
    signal propagation is skipped; once re-enabled, signals are computed from
    the latest historical data, including intelligence ingested while
    disabled.

    Returns:
        A dict with ``competitive_enabled``, ``previous_enabled`` and
        ``toggled_by``.

    Requirements: 6.1, 6.2, 6.3, 6.4, 6.5, 6.7
    """
    # The flag is stored as the JSON string "true"/"false".
    flag_patch = json.dumps({"competitive_enabled": str(body.enabled).lower()})

    active = await pool.fetchrow(
        """SELECT id, config->>'competitive_enabled' AS competitive_enabled
           FROM risk_configs
           WHERE active = TRUE
           ORDER BY updated_at DESC
           LIMIT 1""",
    )

    if active is not None:
        stored_flag = active["competitive_enabled"]
        # A missing key means the layer was running with its default (enabled).
        previous_enabled = stored_flag.lower() == "true" if stored_flag else True
    else:
        # No active config yet: create one that already carries the new flag.
        active = await pool.fetchrow(
            """INSERT INTO risk_configs (name, trading_mode, config, active)
               VALUES ('default', 'paper', $1::jsonb, TRUE)
               RETURNING id, config->>'competitive_enabled' AS competitive_enabled""",
            flag_patch,
        )
        previous_enabled = True  # default was enabled

    row_id = active["id"]
    config_id = str(row_id)

    # JSONB concatenation overwrites only the competitive_enabled key.
    await pool.execute(
        """UPDATE risk_configs
           SET config = config || $2::jsonb, updated_at = NOW()
           WHERE id = $1""",
        row_id,
        flag_patch,
    )

    # Audit trail (Requirement 6.7)
    await record_audit_event(
        pool,
        event_type="competitive.layer_toggled",
        entity_type="risk_config",
        entity_id=config_id,
        data={
            "previous_enabled": previous_enabled,
            "new_enabled": body.enabled,
        },
        actor=body.operator,
    )

    return {
        "competitive_enabled": body.enabled,
        "previous_enabled": previous_enabled,
        "toggled_by": body.operator,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Historical Pattern & Competitive Signal Query Endpoints
|
|
# (Requirements 10.1, 10.2, 10.3, 10.4, 11.4, 11.6)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _pattern_to_dict(p) -> dict[str, Any]:
|
|
"""Convert a HistoricalPattern dataclass to a JSON-safe dict."""
|
|
d = asdict(p)
|
|
for key, val in d.items():
|
|
if isinstance(val, datetime):
|
|
d[key] = val.isoformat()
|
|
return d
|
|
|
|
|
|
@app.get("/api/patterns/{ticker}")
async def get_patterns_for_ticker(
    ticker: str,
    catalyst_type: Optional[str] = None,
    time_horizon: Optional[str] = None,
):
    """Return historical self-patterns for a company.

    When ``catalyst_type`` is given, only that catalyst is queried. Otherwise
    every distinct catalyst type found in the company's valid, non-rejected
    document history is queried in turn. ``time_horizon`` narrows the lookup
    to a single horizon.

    Returns:
        A dict with ``ticker``, the serialised ``patterns`` and their ``count``.

    Requirements: 10.1, 10.3
    """
    horizons = [time_horizon] if time_horizon else None

    if catalyst_type:
        catalyst_types = [catalyst_type]
    else:
        # No filter: discover every catalyst type present in the company's history.
        type_rows = await pool.fetch(
            """SELECT DISTINCT di.catalyst_type
               FROM document_impact_records dir
               JOIN document_intelligence di ON di.document_id = dir.document_id
               JOIN documents d ON d.id = dir.document_id
               WHERE dir.ticker = $1
                 AND di.validation_status = 'valid'
                 AND d.status != 'rejected'
                 AND di.catalyst_type IS NOT NULL""",
            ticker,
        )
        catalyst_types = [type_row["catalyst_type"] for type_row in type_rows]

    found = []
    for current_type in catalyst_types:
        found.extend(
            await find_self_patterns(pool, ticker, current_type, horizons=horizons)
        )

    return {
        "ticker": ticker,
        "patterns": [_pattern_to_dict(item) for item in found],
        "count": len(found),
    }
|
|
|
|
|
|
@app.get("/api/patterns/{ticker}/competitors")
async def get_competitor_patterns(
    ticker: str,
    catalyst_type: Optional[str] = None,
    time_horizon: Optional[str] = None,
):
    """Return cross-company patterns between a company and its competitors.

    Competitors come from active ``competitor_relationships`` rows in which
    the ticker appears on either side. For each competitor and each catalyst
    type (the one requested, or all types found in the company's valid,
    non-rejected document history), ``find_cross_company_patterns`` is queried.

    Returns:
        A dict with ``ticker``, the serialised ``cross_company_patterns`` and
        their ``count``.

    Requirements: 10.2, 10.3
    """
    horizons = [time_horizon] if time_horizon else None

    # The relationship is stored as a pair; pick whichever side is not `ticker`.
    relationship_rows = await pool.fetch(
        """SELECT DISTINCT
                  CASE WHEN ca.ticker = $1 THEN cb.ticker ELSE ca.ticker END AS competitor_ticker
           FROM competitor_relationships cr
           JOIN companies ca ON ca.id = cr.company_a_id
           JOIN companies cb ON cb.id = cr.company_b_id
           WHERE cr.active = TRUE
             AND (ca.ticker = $1 OR cb.ticker = $1)""",
        ticker,
    )

    if not catalyst_type:
        type_rows = await pool.fetch(
            """SELECT DISTINCT di.catalyst_type
               FROM document_impact_records dir
               JOIN document_intelligence di ON di.document_id = dir.document_id
               JOIN documents d ON d.id = dir.document_id
               WHERE dir.ticker = $1
                 AND di.validation_status = 'valid'
                 AND d.status != 'rejected'
                 AND di.catalyst_type IS NOT NULL""",
            ticker,
        )
        catalyst_types = [type_row["catalyst_type"] for type_row in type_rows]
    else:
        catalyst_types = [catalyst_type]

    competitor_tickers = [rel["competitor_ticker"] for rel in relationship_rows]

    collected = []
    for rival in competitor_tickers:
        for current_type in catalyst_types:
            collected.extend(
                await find_cross_company_patterns(
                    pool, ticker, rival, current_type, horizons=horizons,
                )
            )

    return {
        "ticker": ticker,
        "cross_company_patterns": [_pattern_to_dict(item) for item in collected],
        "count": len(collected),
    }
|
|
|
|
|
|
@app.get("/api/patterns/{ticker}/competitive-signals")
async def get_competitive_signals(ticker: str):
    """Return up to 100 of the newest competitive signals targeting a company.

    Rows come from ``competitive_signal_records`` where ``target_ticker``
    matches, ordered by ``computed_at`` descending.

    Requirements: 10.4
    """
    signal_rows = await pool.fetch(
        """SELECT id, source_document_id, source_ticker, target_ticker,
                  catalyst_type, pattern_confidence, signal_direction,
                  signal_strength, relationship_strength, computed_at
           FROM competitive_signal_records
           WHERE target_ticker = $1
           ORDER BY computed_at DESC
           LIMIT 100""",
        ticker,
    )
    signals = [_row_to_dict(signal_row) for signal_row in signal_rows]
    return {
        "ticker": ticker,
        "competitive_signals": signals,
        "count": len(signal_rows),
    }
|
|
|
|
|
|
@app.get("/api/patterns/{ticker}/decisions")
async def get_decision_history(
    ticker: str,
    time_horizon: Optional[str] = None,
):
    """Return major corporate decisions for a company with pattern statistics.

    Fetches up to 50 of the newest valid, non-rejected document impact records
    whose catalyst type is in ``MAJOR_DECISION_CATALYSTS``, and attaches to
    each one the self-pattern statistics for its catalyst type.

    Args:
        ticker: Company ticker, matched against ``document_impact_records``.
        time_horizon: Optional single horizon to restrict pattern statistics.

    Returns:
        A dict with ``ticker``, the ``decisions`` list (each with a
        ``pattern_statistics`` entry) and their ``count``.

    Requirements: 11.4, 11.6
    """
    major_types = list(MAJOR_DECISION_CATALYSTS)
    horizons = [time_horizon] if time_horizon else None

    # Fetch major decision records for this ticker
    rows = await pool.fetch(
        """SELECT dir.id, dir.document_id, dir.ticker,
                  di.catalyst_type, di.summary,
                  dir.impact_score, dir.created_at,
                  d.published_at
           FROM document_impact_records dir
           JOIN document_intelligence di ON di.document_id = dir.document_id
           JOIN documents d ON d.id = dir.document_id
           WHERE dir.ticker = $1
             AND di.validation_status = 'valid'
             AND d.status != 'rejected'
             AND di.catalyst_type = ANY($2)
           ORDER BY dir.created_at DESC
           LIMIT 50""",
        ticker,
        major_types,
    )

    # Pattern statistics depend only on (ticker, catalyst_type, horizons), so
    # look them up once per distinct catalyst type instead of once per row.
    patterns_by_catalyst: dict[Any, Any] = {}

    decisions = []
    for row in rows:
        decision = _row_to_dict(row)

        ct = row["catalyst_type"]
        if ct not in patterns_by_catalyst:
            patterns_by_catalyst[ct] = await find_self_patterns(
                pool, ticker, ct, horizons=horizons,
            )
        # Serialise per decision so response entries never share dict objects.
        decision["pattern_statistics"] = [
            _pattern_to_dict(p) for p in patterns_by_catalyst[ct]
        ]

        decisions.append(decision)

    return {
        "ticker": ticker,
        "decisions": decisions,
        "count": len(decisions),
    }
|