Files
stonks-oracle/services/api/app.py
T
Celes Renata bc077bfcc8
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: trading feedback engine — periodic performance reports with AI summarization
- Migration 038: trading_reports table + report-summarizer agent seed
- 6 reporting modules: models, collector, sections, validator, summarizer, generator
- API endpoints: GET /api/reports (paginated, filterable), GET /api/reports/{id}
- Frontend hooks: useReports, useReport with TanStack Query
- Scheduler: daily (after 16:30 ET) and weekly (Saturday) report triggers
- Redis queue consumer for async report generation with retry/dedup
- 5 property-based tests (chunking, serialization, validation, accuracy, deltas)
- 109 unit/integration tests across all modules
- 6 frontend hook tests with MSW mocks
2026-05-01 22:13:09 +00:00

4219 lines
154 KiB
Python

"""Query API - FastAPI application for analytics, evidence drill-down, and admin controls.
Exposes read-only endpoints for:
- Companies and watchlists (proxied from symbol registry data)
- Document timelines with intelligence
- Trend summaries
- Recommendation history with evidence
- Order history with audit trails
Requirements: 11.1, 11.2, 11.3
Design: Section 9.1 (Operational API)
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
import time as _time
from contextlib import asynccontextmanager
from dataclasses import asdict
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
import asyncpg
import httpx
import redis.asyncio as aioredis
from fastapi import FastAPI, HTTPException, Query, Request
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from pydantic import BaseModel
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response, StreamingResponse
from services.aggregation.pattern_matcher import (
find_cross_company_patterns,
find_self_patterns,
)
from services.extractor.metrics import get_model_performance_summary
from services.risk.engine import PortfolioRiskConfig
from services.shared.audit import get_entity_audit_trail, get_order_audit_trail, record_audit_event
from services.shared.config import load_config
from services.shared.db import get_pg_pool, get_redis
from services.shared.logging import new_trace_id, set_trace_context, setup_logging
from services.shared.redis_keys import PIPELINE_ENABLED_KEY, QUEUE_BROKER, QUEUE_PREFIX, queue_key
from services.shared.schemas import MAJOR_DECISION_CATALYSTS
from services.validation.attribution import (
compute_catalyst_attribution,
compute_layer_attribution,
compute_source_attribution,
)
logger = logging.getLogger("query_api")
config = load_config()
pool: Optional[asyncpg.Pool] = None
rds: Optional[aioredis.Redis] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global pool, rds
setup_logging("query_api", level=config.log_level, json_output=config.json_logs)
pool = await get_pg_pool(config)
rds = get_redis(config)
yield
await pool.close()
await rds.close()
app = FastAPI(title="Stonks Oracle - Query API", lifespan=lifespan)
class TraceMiddleware(BaseHTTPMiddleware):
"""Inject trace context for every incoming HTTP request."""
async def dispatch(self, request: Request, call_next):
trace_id = request.headers.get("x-trace-id") or new_trace_id()
set_trace_context(trace_id=trace_id)
response = await call_next(request)
response.headers["x-trace-id"] = trace_id
return response
app.add_middleware(TraceMiddleware)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _row_to_dict(row: asyncpg.Record) -> dict[str, Any]:
"""Convert an asyncpg Record to a JSON-safe dict."""
from decimal import Decimal
from uuid import UUID
d: dict[str, Any] = {}
for key, val in dict(row).items():
if isinstance(val, datetime):
d[key] = val.isoformat()
elif isinstance(val, Decimal):
d[key] = float(val)
elif isinstance(val, UUID):
d[key] = str(val)
elif hasattr(val, "__str__") and not isinstance(val, (str, int, float, bool, list, dict, type(None))):
d[key] = str(val)
else:
d[key] = val
return d
def _parse_jsonb(val: Any) -> Any:
"""Parse a JSONB value that may come back as str or already-decoded."""
if val is None:
return None
if isinstance(val, (dict, list)):
return val
try:
return json.loads(val)
except (json.JSONDecodeError, TypeError):
return val
# ---------------------------------------------------------------------------
# Health
# ---------------------------------------------------------------------------
@app.get("/health")
async def health():
try:
await pool.fetchval("SELECT 1")
return {"status": "ok"}
except Exception:
raise HTTPException(503, "Database unavailable")
@app.get("/metrics")
async def metrics():
"""Expose Prometheus metrics for scraping.
Requirements: 12.1, 12.2
"""
return Response(
content=generate_latest(),
media_type=CONTENT_TYPE_LATEST,
)
# ---------------------------------------------------------------------------
# Companies (Requirement 11.1)
# ---------------------------------------------------------------------------
@app.get("/api/companies")
async def list_companies(
active: bool = True,
sector: Optional[str] = None,
ticker: Optional[str] = None,
):
"""List tracked companies with optional filters."""
conditions = ["c.active = $1"]
params: list[Any] = [active]
idx = 2
if sector:
conditions.append(f"c.sector = ${idx}")
params.append(sector)
idx += 1
if ticker:
conditions.append(f"c.ticker = ${idx}")
params.append(ticker.upper())
idx += 1
where = " AND ".join(conditions)
rows = await pool.fetch(
f"""SELECT c.id, c.ticker, c.legal_name, c.exchange, c.sector,
c.industry, c.market_cap_bucket, c.active,
c.created_at, c.updated_at
FROM companies c
WHERE {where}
ORDER BY c.ticker""",
*params,
)
return [_row_to_dict(r) for r in rows]
@app.get("/api/companies/{company_id}")
async def get_company(company_id: str):
"""Get a single company with aliases and source count."""
row = await pool.fetchrow(
"""SELECT id, ticker, legal_name, exchange, sector, industry,
market_cap_bucket, active, created_at, updated_at
FROM companies WHERE id = $1""",
company_id,
)
if not row:
raise HTTPException(404, "Company not found")
result = _row_to_dict(row)
aliases = await pool.fetch(
"SELECT id, alias, alias_type FROM company_aliases WHERE company_id = $1",
company_id,
)
result["aliases"] = [dict(a) for a in aliases]
source_count = await pool.fetchval(
"SELECT COUNT(*) FROM sources WHERE company_id = $1 AND active = true",
company_id,
)
result["active_source_count"] = source_count
return result
@app.get("/api/companies/{company_id}/sources")
async def list_company_sources(company_id: str):
"""List sources configured for a company."""
rows = await pool.fetch(
"""SELECT id, source_type, source_name, config, credibility_score,
retention_days, access_policy, active
FROM sources WHERE company_id = $1 ORDER BY source_type""",
company_id,
)
return [_row_to_dict(r) for r in rows]
# ---------------------------------------------------------------------------
# Document Timelines (Requirement 11.1, 11.2)
# ---------------------------------------------------------------------------
@app.get("/api/documents")
async def list_documents(
ticker: Optional[str] = None,
company_id: Optional[str] = None,
document_type: Optional[str] = None,
status: Optional[str] = None,
since: Optional[str] = None,
limit: int = Query(default=50, le=200),
offset: int = 0,
):
"""List documents with optional filters, ordered by published_at desc."""
conditions: list[str] = []
params: list[Any] = []
idx = 1
if ticker:
conditions.append(f"""d.id IN (
SELECT document_id FROM document_company_mentions WHERE ticker = ${idx}
)""")
params.append(ticker.upper())
idx += 1
if company_id:
conditions.append(f"""d.id IN (
SELECT document_id FROM document_company_mentions WHERE company_id = ${idx}
)""")
params.append(company_id)
idx += 1
if document_type:
conditions.append(f"d.document_type = ${idx}")
params.append(document_type)
idx += 1
if status:
conditions.append(f"d.status = ${idx}")
params.append(status)
idx += 1
if since:
conditions.append(f"d.published_at >= ${idx}::timestamptz")
params.append(since)
idx += 1
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
rows = await pool.fetch(
f"""SELECT d.id, d.document_type, d.source_type, d.publisher, d.url,
d.title, d.published_at, d.retrieved_at, d.language,
d.content_hash, d.parse_quality_score, d.parse_confidence,
d.status, d.created_at
FROM documents d
{where}
ORDER BY d.published_at DESC NULLS LAST
LIMIT ${idx} OFFSET ${idx + 1}""",
*params, limit, offset,
)
return [_row_to_dict(r) for r in rows]
@app.get("/api/documents/{document_id}")
async def get_document(document_id: str):
"""Get a single document with its intelligence extraction and company mentions."""
row = await pool.fetchrow(
"""SELECT id, document_type, source_type, publisher, url, canonical_url,
title, published_at, retrieved_at, language, content_hash,
raw_storage_ref, normalized_storage_ref,
parse_quality_score, parse_confidence, status,
created_at, updated_at
FROM documents WHERE id = $1""",
document_id,
)
if not row:
raise HTTPException(404, "Document not found")
result = _row_to_dict(row)
# Company mentions
mentions = await pool.fetch(
"""SELECT dcm.company_id, dcm.ticker, dcm.mention_type, dcm.confidence,
c.legal_name
FROM document_company_mentions dcm
JOIN companies c ON c.id = dcm.company_id
WHERE dcm.document_id = $1""",
document_id,
)
result["company_mentions"] = [_row_to_dict(m) for m in mentions]
# Intelligence extraction
intel = await pool.fetchrow(
"""SELECT id, summary, macro_themes, novelty_score, source_credibility,
extraction_warnings, confidence, model_provider, model_name,
prompt_version, schema_version, validation_status,
validation_errors, created_at
FROM document_intelligence WHERE document_id = $1
ORDER BY created_at DESC LIMIT 1""",
document_id,
)
if intel:
intel_dict = _row_to_dict(intel)
intel_dict["macro_themes"] = _parse_jsonb(intel_dict.get("macro_themes"))
intel_dict["extraction_warnings"] = _parse_jsonb(intel_dict.get("extraction_warnings"))
intel_dict["validation_errors"] = _parse_jsonb(intel_dict.get("validation_errors"))
# Impact records per company
impacts = await pool.fetch(
"""SELECT dir.company_id, dir.ticker, dir.relevance, dir.sentiment,
dir.impact_score, dir.impact_horizon, dir.catalyst_type,
dir.key_facts, dir.risks, dir.evidence_spans,
c.legal_name
FROM document_impact_records dir
JOIN companies c ON c.id = dir.company_id
WHERE dir.intelligence_id = $1""",
intel["id"],
)
impact_list = []
for imp in impacts:
imp_dict = _row_to_dict(imp)
imp_dict["key_facts"] = _parse_jsonb(imp_dict.get("key_facts"))
imp_dict["risks"] = _parse_jsonb(imp_dict.get("risks"))
imp_dict["evidence_spans"] = _parse_jsonb(imp_dict.get("evidence_spans"))
impact_list.append(imp_dict)
intel_dict["company_impacts"] = impact_list
result["intelligence"] = intel_dict
else:
result["intelligence"] = None
return result
# ---------------------------------------------------------------------------
# Trend Summaries (Requirement 11.1)
# ---------------------------------------------------------------------------
@app.get("/api/trends")
async def list_trends(
ticker: Optional[str] = None,
entity_type: str = "company",
window: Optional[str] = None,
limit: int = Query(default=50, le=200),
offset: int = 0,
):
"""List trend summaries with optional filters."""
conditions = ["entity_type = $1"]
params: list[Any] = [entity_type]
idx = 2
if ticker:
conditions.append(f"entity_id = ${idx}")
params.append(ticker.upper())
idx += 1
if window:
conditions.append(f"\"window\" = ${idx}")
params.append(window)
idx += 1
where = " AND ".join(conditions)
rows = await pool.fetch(
f"""SELECT id, entity_type, entity_id, "window", trend_direction,
trend_strength, confidence, top_supporting_evidence,
top_opposing_evidence, dominant_catalysts, material_risks,
contradiction_score, market_context, generated_at
FROM trend_windows
WHERE {where}
ORDER BY generated_at DESC
LIMIT ${idx} OFFSET ${idx + 1}""",
*params, limit, offset,
)
results = []
for r in rows:
d = _row_to_dict(r)
for jsonb_field in (
"top_supporting_evidence", "top_opposing_evidence",
"dominant_catalysts", "material_risks", "market_context",
):
d[jsonb_field] = _parse_jsonb(d.get(jsonb_field))
results.append(d)
# Include projection data for each trend (Requirement 12.10)
if results:
trend_ids = [r["id"] for r in rows]
proj_rows = await pool.fetch(
"""SELECT DISTINCT ON (trend_window_id)
trend_window_id, projected_direction, projected_strength,
projected_confidence, projection_horizon,
macro_contribution_pct, diverges_from_current
FROM trend_projections
WHERE trend_window_id = ANY($1::uuid[])
ORDER BY trend_window_id, computed_at DESC""",
trend_ids,
)
proj_map = {str(p["trend_window_id"]): _row_to_dict(p) for p in proj_rows}
for d in results:
d["projection"] = proj_map.get(d["id"])
return results
@app.get("/api/trends/history")
async def list_trend_history(
ticker: Optional[str] = None,
window: Optional[str] = None,
limit: int = Query(default=200, le=1000),
):
"""Return historical trend snapshots for charting.
Unlike /api/trends which returns the latest snapshot per entity/window,
this endpoint returns the time series from the trend_history table.
"""
conditions: list[str] = []
params: list[Any] = []
idx = 1
if ticker:
conditions.append(f"entity_id = ${idx}")
params.append(ticker.upper())
idx += 1
if window:
conditions.append(f"\"window\" = ${idx}")
params.append(window)
idx += 1
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
try:
rows = await pool.fetch(
f"""SELECT id, entity_type, entity_id, "window", trend_direction,
trend_strength, confidence, contradiction_score,
dominant_catalysts, material_risks, generated_at
FROM trend_history
{where}
ORDER BY generated_at ASC
LIMIT ${idx}""",
*params, limit,
)
except Exception:
# Table may not exist yet (pre-migration 024)
return []
results = []
for r in rows:
d = _row_to_dict(r)
d["dominant_catalysts"] = _parse_jsonb(d.get("dominant_catalysts"))
d["material_risks"] = _parse_jsonb(d.get("material_risks"))
results.append(d)
return results
@app.get("/api/market/prices/{ticker}")
async def get_market_prices(
ticker: str,
limit: int = Query(default=200, le=500),
):
"""Return historical close prices for a ticker from market_snapshots.
Each row has a bar_date (from the Polygon bar timestamp) and OHLCV data.
Ordered oldest-first for chart rendering. Also returns 90-day high/low
computed from all bars in the last 90 days.
"""
ticker = ticker.upper()
rows = await pool.fetch(
"""SELECT
captured_at,
(data->>'c')::float AS close,
(data->>'o')::float AS open,
(data->>'h')::float AS high,
(data->>'l')::float AS low,
(data->>'v')::float AS volume,
(data->>'t')::bigint AS bar_timestamp
FROM market_snapshots
WHERE ticker = $1 AND snapshot_type = 'bar'
ORDER BY captured_at ASC
LIMIT $2""",
ticker, limit,
)
results = []
seen_dates: set[str] = set()
for r in rows:
# Deduplicate by bar_timestamp (same day bar captured multiple times)
bar_ts = r["bar_timestamp"]
if bar_ts is None:
continue
date_key = str(bar_ts)
if date_key in seen_dates:
continue
seen_dates.add(date_key)
results.append({
"ticker": ticker,
"close": r["close"],
"open": r["open"],
"high": r["high"],
"low": r["low"],
"volume": r["volume"],
"bar_timestamp": bar_ts,
"captured_at": r["captured_at"].isoformat() if r["captured_at"] else None,
})
# Compute 90-day high/low from all bars in the window
cutoff_90d = datetime.now(timezone.utc) - timedelta(days=90)
range_row = await pool.fetchrow(
"""SELECT
MIN((data->>'l')::float) AS low_90d,
MAX((data->>'h')::float) AS high_90d
FROM market_snapshots
WHERE ticker = $1 AND snapshot_type = 'bar'
AND captured_at >= $2""",
ticker, cutoff_90d,
)
low_90d = range_row["low_90d"] if range_row else None
high_90d = range_row["high_90d"] if range_row else None
return {
"bars": results,
"range_90d": {"low": low_90d, "high": high_90d},
}
@app.post("/api/market/backfill/{ticker}")
async def backfill_market_prices(ticker: str, days: int = Query(default=90, le=365)):
"""Backfill daily OHLCV bars from Polygon for the last N days.
Fetches daily aggregate bars from Polygon's range endpoint and inserts
any missing bars into market_snapshots (deduped by bar timestamp).
Returns the number of bars inserted.
"""
ticker = ticker.upper()
api_key = config.market_data.api_key
if not api_key:
raise HTTPException(503, "No market data API key configured")
import hashlib
from datetime import date, timedelta
import httpx
to_date = date.today().isoformat()
from_date = (date.today() - timedelta(days=days)).isoformat()
url = (
f"{config.market_data.base_url}/v2/aggs/ticker/{ticker}"
f"/range/1/day/{from_date}/{to_date}"
)
params = {"apiKey": api_key, "adjusted": "true", "sort": "asc", "limit": "500"}
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url, params=params)
resp.raise_for_status()
data = resp.json()
bars = data.get("results", [])
if not bars:
return {"ticker": ticker, "inserted": 0, "total_bars": 0}
# Find existing bar timestamps to avoid duplicates
existing = await pool.fetch(
"""SELECT DISTINCT (data->>'t')::bigint AS bar_ts
FROM market_snapshots
WHERE ticker = $1 AND snapshot_type = 'bar'""",
ticker,
)
existing_ts = {r["bar_ts"] for r in existing if r["bar_ts"] is not None}
# Look up company_id (nullable)
company_row = await pool.fetchrow(
"SELECT id FROM companies WHERE ticker = $1", ticker,
)
company_id = company_row["id"] if company_row else None
inserted = 0
for bar in bars:
bar_ts = bar.get("t")
if bar_ts is None or bar_ts in existing_ts:
continue
bar_json = json.dumps(bar)
content_hash = hashlib.sha256(bar_json.encode()).hexdigest()
captured_at = datetime.fromtimestamp(bar_ts / 1000, tz=timezone.utc)
await pool.execute(
"""INSERT INTO market_snapshots
(company_id, ticker, snapshot_type, data, source_provider, captured_at, content_hash)
VALUES ($1, $2, 'bar', $3::jsonb, 'polygon_backfill', $4, $5)""",
company_id, ticker, bar_json, captured_at, content_hash,
)
existing_ts.add(bar_ts)
inserted += 1
return {"ticker": ticker, "inserted": inserted, "total_bars": len(bars), "days": days}
@app.post("/api/market/backfill-all")
async def backfill_all_market_prices(days: int = Query(default=90, le=365)):
"""Backfill daily bars for ALL active companies from Polygon.
Iterates through all active tickers and calls the per-ticker backfill.
Returns a summary of results per ticker.
"""
api_key = config.market_data.api_key
if not api_key:
raise HTTPException(503, "No market data API key configured")
rows = await pool.fetch(
"SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker",
)
results = []
for row in rows:
ticker = row["ticker"]
try:
result = await backfill_market_prices(ticker, days)
results.append(result)
except Exception as e:
logger.warning("Backfill failed for %s: %s", ticker, e)
results.append({"ticker": ticker, "inserted": 0, "error": str(e)})
total_inserted = sum(r.get("inserted", 0) for r in results)
return {"total_inserted": total_inserted, "tickers": len(results), "details": results}
@app.get("/api/trends/{trend_id}")
async def get_trend(trend_id: str):
"""Get a single trend summary by ID."""
row = await pool.fetchrow(
"""SELECT id, entity_type, entity_id, "window", trend_direction,
trend_strength, confidence, top_supporting_evidence,
top_opposing_evidence, dominant_catalysts, material_risks,
contradiction_score, market_context, generated_at, created_at
FROM trend_windows WHERE id = $1""",
trend_id,
)
if not row:
raise HTTPException(404, "Trend not found")
d = _row_to_dict(row)
for jsonb_field in (
"top_supporting_evidence", "top_opposing_evidence",
"dominant_catalysts", "material_risks", "market_context",
):
d[jsonb_field] = _parse_jsonb(d.get(jsonb_field))
return d
# ---------------------------------------------------------------------------
# Recommendations (Requirement 11.1, 11.2)
# ---------------------------------------------------------------------------
@app.get("/api/recommendations")
async def list_recommendations(
ticker: Optional[str] = None,
action: Optional[str] = None,
mode: Optional[str] = None,
since: Optional[str] = None,
min_confidence: Optional[float] = Query(default=None, ge=0.0, le=1.0),
limit: int = Query(default=50, le=200),
offset: int = 0,
latest: bool = Query(default=True, description="Return only the latest recommendation per ticker"),
):
"""List recommendations with optional filters.
By default (latest=true), returns only the most recent recommendation
per ticker to avoid showing duplicate/stale entries. Set latest=false
to see the full history.
min_confidence filters to recommendations at or above the given threshold,
useful for showing only recs that would pass a specific risk tier gate.
"""
conditions: list[str] = []
params: list[Any] = []
idx = 1
if ticker:
conditions.append(f"r.ticker = ${idx}")
params.append(ticker.upper())
idx += 1
if action:
conditions.append(f"r.action = ${idx}")
params.append(action)
idx += 1
if mode:
conditions.append(f"r.mode = ${idx}")
params.append(mode)
idx += 1
if since:
conditions.append(f"r.generated_at >= ${idx}::timestamptz")
params.append(since)
idx += 1
if min_confidence is not None:
conditions.append(f"r.confidence >= ${idx}")
params.append(min_confidence)
idx += 1
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
if latest:
# Use DISTINCT ON to get only the latest recommendation per ticker
rows = await pool.fetch(
f"""SELECT DISTINCT ON (r.ticker)
r.id, r.ticker, r.action, r.mode, r.confidence,
r.time_horizon, r.thesis, r.invalidation_conditions,
r.portfolio_pct, r.max_loss_pct, r.model_version,
r.risk_classification, r.generated_at
FROM recommendations r
{where}
ORDER BY r.ticker, r.generated_at DESC""",
*params,
)
# Apply limit/offset manually after DISTINCT ON
# Sort by generated_at DESC for the final result
rows = sorted(rows, key=lambda r: r["generated_at"], reverse=True)
rows = rows[offset:offset + limit]
else:
rows = await pool.fetch(
f"""SELECT r.id, r.ticker, r.action, r.mode, r.confidence,
r.time_horizon, r.thesis, r.invalidation_conditions,
r.portfolio_pct, r.max_loss_pct, r.model_version,
r.risk_classification, r.generated_at
FROM recommendations r
{where}
ORDER BY r.generated_at DESC
LIMIT ${idx} OFFSET ${idx + 1}""",
*params, limit, offset,
)
results = []
for r in rows:
d = _row_to_dict(r)
d["invalidation_conditions"] = _parse_jsonb(d.get("invalidation_conditions"))
results.append(d)
return results
@app.get("/api/recommendations/{recommendation_id}")
async def get_recommendation(recommendation_id: str):
"""Get a single recommendation with evidence and risk evaluation.
Requirement 11.2: display contributing intelligence objects, raw sources,
and market context that influenced the decision.
"""
row = await pool.fetchrow(
"""SELECT r.id, r.ticker, r.company_id, r.action, r.mode, r.confidence,
r.time_horizon, r.thesis, r.invalidation_conditions,
r.portfolio_pct, r.max_loss_pct, r.model_version,
r.model_provider, r.prompt_version, r.schema_version,
r.risk_classification, r.generated_at, r.created_at
FROM recommendations r WHERE r.id = $1""",
recommendation_id,
)
if not row:
raise HTTPException(404, "Recommendation not found")
result = _row_to_dict(row)
result["invalidation_conditions"] = _parse_jsonb(result.get("invalidation_conditions"))
# Evidence: linked documents and intelligence objects
evidence_rows = await pool.fetch(
"""SELECT re.id, re.document_id, re.intelligence_id, re.evidence_type, re.weight,
d.title, d.document_type, d.source_type, d.publisher, d.url,
d.published_at
FROM recommendation_evidence re
LEFT JOIN documents d ON d.id = re.document_id
WHERE re.recommendation_id = $1
ORDER BY re.weight DESC""",
recommendation_id,
)
result["evidence"] = [_row_to_dict(e) for e in evidence_rows]
# Risk evaluation
risk_row = await pool.fetchrow(
"""SELECT id, eligible, allowed_mode, rejection_reasons, risk_checks, evaluated_at
FROM risk_evaluations WHERE recommendation_id = $1
ORDER BY evaluated_at DESC LIMIT 1""",
recommendation_id,
)
if risk_row:
risk_dict = _row_to_dict(risk_row)
risk_dict["rejection_reasons"] = _parse_jsonb(risk_dict.get("rejection_reasons"))
risk_dict["risk_checks"] = _parse_jsonb(risk_dict.get("risk_checks"))
result["risk_evaluation"] = risk_dict
else:
result["risk_evaluation"] = None
return result
# ---------------------------------------------------------------------------
# Evidence Drill-Down (Requirement 11.2, 10.4)
# ---------------------------------------------------------------------------
@app.get("/api/recommendations/{recommendation_id}/evidence")
async def get_recommendation_evidence_drilldown(recommendation_id: str):
"""Full evidence drill-down linking a recommendation to source documents and raw artifacts.
Returns the complete provenance chain for each piece of evidence:
recommendation_evidence → document (with storage refs) → document_intelligence
→ document_impact_records, plus the trend window that fed the recommendation.
Requirements: 11.2, 10.4
Design: Section 9.1 (evidence drill-down and audit views)
"""
# Verify recommendation exists and get basic info
rec_row = await pool.fetchrow(
"""SELECT id, ticker, company_id, action, mode, confidence,
time_horizon, thesis, model_version, model_provider,
prompt_version, schema_version, generated_at
FROM recommendations WHERE id = $1""",
recommendation_id,
)
if not rec_row:
raise HTTPException(404, "Recommendation not found")
result: dict[str, Any] = {
"recommendation": _row_to_dict(rec_row),
"evidence": [],
"trend_window": None,
}
# Fetch evidence rows with full document details including storage refs
evidence_rows = await pool.fetch(
"""SELECT re.id AS evidence_id,
re.document_id,
re.intelligence_id,
re.evidence_type,
re.weight,
d.document_type,
d.source_type,
d.publisher,
d.url,
d.canonical_url,
d.title,
d.published_at,
d.retrieved_at,
d.language,
d.content_hash,
d.raw_storage_ref,
d.normalized_storage_ref,
d.parse_quality_score,
d.parse_confidence,
d.status AS document_status
FROM recommendation_evidence re
LEFT JOIN documents d ON d.id = re.document_id
WHERE re.recommendation_id = $1
ORDER BY re.weight DESC""",
recommendation_id,
)
for ev in evidence_rows:
ev_dict = _row_to_dict(ev)
ev_dict["intelligence"] = None
ev_dict["company_impacts"] = []
# Fetch intelligence extraction for this evidence
intel_id = ev["intelligence_id"]
doc_id = ev["document_id"]
# Use the linked intelligence_id if available, otherwise look up by document_id
intel_row = None
if intel_id:
intel_row = await pool.fetchrow(
"""SELECT id, document_id, summary, macro_themes, novelty_score,
source_credibility, extraction_warnings, confidence,
model_provider, model_name, prompt_version, schema_version,
raw_output_ref, prompt_ref, validation_status,
validation_errors, created_at
FROM document_intelligence WHERE id = $1""",
intel_id,
)
elif doc_id:
intel_row = await pool.fetchrow(
"""SELECT id, document_id, summary, macro_themes, novelty_score,
source_credibility, extraction_warnings, confidence,
model_provider, model_name, prompt_version, schema_version,
raw_output_ref, prompt_ref, validation_status,
validation_errors, created_at
FROM document_intelligence WHERE document_id = $1
ORDER BY created_at DESC LIMIT 1""",
doc_id,
)
if intel_row:
intel_dict = _row_to_dict(intel_row)
for jf in ("macro_themes", "extraction_warnings", "validation_errors"):
intel_dict[jf] = _parse_jsonb(intel_dict.get(jf))
ev_dict["intelligence"] = intel_dict
# Fetch per-company impact records for this intelligence
impacts = await pool.fetch(
"""SELECT dir.company_id, dir.ticker, dir.relevance, dir.sentiment,
dir.impact_score, dir.impact_horizon, dir.catalyst_type,
dir.key_facts, dir.risks, dir.evidence_spans,
c.legal_name
FROM document_impact_records dir
JOIN companies c ON c.id = dir.company_id
WHERE dir.intelligence_id = $1""",
intel_row["id"],
)
impact_list = []
for imp in impacts:
imp_dict = _row_to_dict(imp)
for jf in ("key_facts", "risks", "evidence_spans"):
imp_dict[jf] = _parse_jsonb(imp_dict.get(jf))
impact_list.append(imp_dict)
ev_dict["company_impacts"] = impact_list
result["evidence"].append(ev_dict)
# Fetch the most recent trend window for this ticker to show market context
ticker = rec_row["ticker"]
generated_at = rec_row["generated_at"]
if ticker and generated_at:
trend_row = await pool.fetchrow(
"""SELECT id, entity_type, entity_id, "window", trend_direction,
trend_strength, confidence, top_supporting_evidence,
top_opposing_evidence, dominant_catalysts, material_risks,
contradiction_score, market_context, generated_at
FROM trend_windows
WHERE entity_id = $1 AND entity_type = 'company'
AND generated_at <= $2
ORDER BY generated_at DESC LIMIT 1""",
ticker, generated_at,
)
if trend_row:
trend_dict = _row_to_dict(trend_row)
for jf in (
"top_supporting_evidence", "top_opposing_evidence",
"dominant_catalysts", "material_risks", "market_context",
):
trend_dict[jf] = _parse_jsonb(trend_dict.get(jf))
# Include trend evidence linkage: documents that contributed to this trend
trend_ev_rows = await pool.fetch(
"""SELECT te.id, te.document_id, te.evidence_type, te.rank_score,
te.weight_component, te.impact_component,
te.recency_component, te.confidence_component,
te.sentiment_value,
d.title, d.document_type, d.source_type, d.publisher,
d.url, d.published_at, d.raw_storage_ref,
d.normalized_storage_ref
FROM trend_evidence te
LEFT JOIN documents d ON d.id = te.document_id
WHERE te.trend_window_id = $1
ORDER BY te.rank_score DESC""",
trend_row["id"],
)
trend_dict["evidence"] = [_row_to_dict(te) for te in trend_ev_rows]
result["trend_window"] = trend_dict
return result
# ---------------------------------------------------------------------------
# Trend Evidence Drill-Down (Requirement 10.4)
# ---------------------------------------------------------------------------
@app.get("/api/trends/{trend_id}/evidence")
async def get_trend_evidence_drilldown(trend_id: str):
"""Drill down from a trend window to its contributing documents and raw artifacts.
Returns the trend summary plus each contributing document with storage refs,
intelligence extraction, and impact records — full provenance chain.
Requirements: 10.4, 6.5
"""
trend_row = await pool.fetchrow(
"""SELECT id, entity_type, entity_id, "window", trend_direction,
trend_strength, confidence, top_supporting_evidence,
top_opposing_evidence, dominant_catalysts, material_risks,
contradiction_score, market_context, generated_at
FROM trend_windows WHERE id = $1""",
trend_id,
)
if not trend_row:
raise HTTPException(404, "Trend not found")
trend_dict = _row_to_dict(trend_row)
for jf in (
"top_supporting_evidence", "top_opposing_evidence",
"dominant_catalysts", "material_risks", "market_context",
):
trend_dict[jf] = _parse_jsonb(trend_dict.get(jf))
# Fetch trend evidence with full document details
evidence_rows = await pool.fetch(
"""SELECT te.id AS evidence_id,
te.document_id,
te.evidence_type,
te.rank_score,
te.weight_component,
te.impact_component,
te.recency_component,
te.confidence_component,
te.sentiment_value,
d.document_type,
d.source_type,
d.publisher,
d.url,
d.canonical_url,
d.title,
d.published_at,
d.retrieved_at,
d.content_hash,
d.raw_storage_ref,
d.normalized_storage_ref,
d.parse_quality_score,
d.parse_confidence,
d.status AS document_status
FROM trend_evidence te
LEFT JOIN documents d ON d.id = te.document_id
WHERE te.trend_window_id = $1
ORDER BY te.rank_score DESC""",
trend_id,
)
evidence_list = []
for ev in evidence_rows:
ev_dict = _row_to_dict(ev)
ev_dict["intelligence"] = None
ev_dict["company_impacts"] = []
doc_id = ev["document_id"]
if doc_id:
intel_row = await pool.fetchrow(
"""SELECT id, document_id, summary, macro_themes, novelty_score,
source_credibility, extraction_warnings, confidence,
model_provider, model_name, prompt_version, schema_version,
raw_output_ref, prompt_ref, validation_status,
validation_errors, created_at
FROM document_intelligence WHERE document_id = $1
ORDER BY created_at DESC LIMIT 1""",
doc_id,
)
if intel_row:
intel_dict = _row_to_dict(intel_row)
for jf in ("macro_themes", "extraction_warnings", "validation_errors"):
intel_dict[jf] = _parse_jsonb(intel_dict.get(jf))
ev_dict["intelligence"] = intel_dict
impacts = await pool.fetch(
"""SELECT dir.company_id, dir.ticker, dir.relevance, dir.sentiment,
dir.impact_score, dir.impact_horizon, dir.catalyst_type,
dir.key_facts, dir.risks, dir.evidence_spans,
c.legal_name
FROM document_impact_records dir
JOIN companies c ON c.id = dir.company_id
WHERE dir.intelligence_id = $1""",
intel_row["id"],
)
for imp in impacts:
imp_dict = _row_to_dict(imp)
for jf in ("key_facts", "risks", "evidence_spans"):
imp_dict[jf] = _parse_jsonb(imp_dict.get(jf))
ev_dict["company_impacts"].append(imp_dict)
evidence_list.append(ev_dict)
return {
"trend": trend_dict,
"evidence": evidence_list,
}
# ---------------------------------------------------------------------------
# Order History (Requirement 11.1, 11.3)
# ---------------------------------------------------------------------------
@app.get("/api/orders")
async def list_orders(
ticker: Optional[str] = None,
status: Optional[str] = None,
side: Optional[str] = None,
since: Optional[str] = None,
limit: int = Query(default=50, le=200),
offset: int = 0,
):
"""List orders with optional filters."""
conditions: list[str] = []
params: list[Any] = []
idx = 1
if ticker:
conditions.append(f"o.ticker = ${idx}")
params.append(ticker.upper())
idx += 1
if status:
conditions.append(f"o.status = ${idx}")
params.append(status)
idx += 1
if side:
conditions.append(f"o.side = ${idx}")
params.append(side)
idx += 1
if since:
conditions.append(f"o.created_at >= ${idx}::timestamptz")
params.append(since)
idx += 1
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
rows = await pool.fetch(
f"""SELECT o.id, o.recommendation_id, o.broker_account_id, o.ticker,
o.side, o.order_type, o.quantity, o.limit_price, o.stop_price,
o.status, o.broker_order_id, o.submitted_at, o.acknowledged_at,
o.filled_at, o.cancelled_at, o.rejected_at, o.rejection_reason,
o.fill_price, o.fill_quantity, o.created_at
FROM orders o
{where}
ORDER BY o.created_at DESC
LIMIT ${idx} OFFSET ${idx + 1}""",
*params, limit, offset,
)
return [_row_to_dict(r) for r in rows]
@app.get("/api/orders/{order_id}")
async def get_order(order_id: str):
"""Get a single order with its events, decision trace, and full audit trail.
Requirement 11.3: expose full audit trail from ingestion through broker
execution and eventual market outcome.
"""
row = await pool.fetchrow(
"""SELECT o.id, o.recommendation_id, o.broker_account_id, o.ticker,
o.side, o.order_type, o.quantity, o.limit_price, o.stop_price,
o.status, o.idempotency_key, o.broker_order_id,
o.decision_trace, o.submitted_at, o.acknowledged_at,
o.filled_at, o.cancelled_at, o.rejected_at, o.rejection_reason,
o.fill_price, o.fill_quantity, o.created_at, o.updated_at
FROM orders o WHERE o.id = $1""",
order_id,
)
if not row:
raise HTTPException(404, "Order not found")
result = _row_to_dict(row)
result["decision_trace"] = _parse_jsonb(result.get("decision_trace"))
# Order events
events = await pool.fetch(
"""SELECT id, event_type, data, broker_timestamp, created_at
FROM order_events WHERE order_id = $1 ORDER BY created_at ASC""",
order_id,
)
result["events"] = []
for ev in events:
ev_dict = _row_to_dict(ev)
ev_dict["data"] = _parse_jsonb(ev_dict.get("data"))
result["events"].append(ev_dict)
# Full audit trail (Requirement 11.3)
recommendation_id = str(row["recommendation_id"]) if row["recommendation_id"] else None
result["audit_trail"] = await get_order_audit_trail(pool, order_id, recommendation_id)
return result
# ---------------------------------------------------------------------------
# Positions (Requirement 11.1)
# ---------------------------------------------------------------------------
@app.get("/api/positions")
async def list_positions(
ticker: Optional[str] = None,
):
"""List current positions with Polygon market prices overlaid.
The current_price from the broker (Alpaca paper) can be stale or
inaccurate. We overlay the latest close from market_snapshots
(Polygon daily bars) and recompute unrealized P&L from that.
"""
if ticker:
rows = await pool.fetch(
"""SELECT p.id, p.broker_account_id, p.ticker, p.quantity,
p.avg_entry_price, p.current_price,
p.unrealized_pnl, p.realized_pnl, p.updated_at
FROM positions p WHERE p.ticker = $1 ORDER BY p.ticker""",
ticker.upper(),
)
else:
rows = await pool.fetch(
"""SELECT p.id, p.broker_account_id, p.ticker, p.quantity,
p.avg_entry_price, p.current_price,
p.unrealized_pnl, p.realized_pnl, p.updated_at
FROM positions p ORDER BY p.ticker""",
)
# Enrich with latest Polygon close for comparison.
# Use whichever price is more recent: broker sync or Polygon bar.
tickers = list({r["ticker"] for r in rows})
price_map: dict[str, float] = {}
if tickers:
price_rows = await pool.fetch(
"""SELECT DISTINCT ON (ticker) ticker, (data->>'c')::float AS close
FROM market_snapshots
WHERE ticker = ANY($1) AND snapshot_type = 'bar'
ORDER BY ticker, captured_at DESC""",
tickers,
)
price_map = {r["ticker"]: r["close"] for r in price_rows if r["close"]}
results = []
for r in rows:
d = _row_to_dict(r)
polygon_price = price_map.get(d["ticker"])
d["polygon_price"] = polygon_price
results.append(d)
return results
# ---------------------------------------------------------------------------
# Audit Trail (Requirement 11.3)
# ---------------------------------------------------------------------------
@app.get("/api/audit/{entity_type}/{entity_id}")
async def get_audit_trail(entity_type: str, entity_id: str):
"""Get audit events for any entity type and ID."""
events = await get_entity_audit_trail(pool, entity_type, entity_id)
if not events:
raise HTTPException(404, "No audit events found")
return events
# ---------------------------------------------------------------------------
# Admin: Source Health (Requirement 11.1 - source health)
# ---------------------------------------------------------------------------
@app.get("/api/admin/sources/health")
async def get_source_health(
source_type: Optional[str] = None,
company_id: Optional[str] = None,
active_only: bool = True,
):
"""Source health overview: each source with its latest ingestion status and failure counts.
Design: Section 9.1 (source health and job state)
"""
conditions = []
params: list[Any] = []
idx = 1
if active_only:
conditions.append(f"s.active = ${idx}")
params.append(True)
idx += 1
if source_type:
conditions.append(f"s.source_type = ${idx}")
params.append(source_type)
idx += 1
if company_id:
conditions.append(f"s.company_id = ${idx}")
params.append(company_id)
idx += 1
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
rows = await pool.fetch(
f"""SELECT s.id AS source_id, s.source_type, s.source_name,
s.credibility_score, s.active,
c.ticker, c.legal_name, c.id AS company_id,
latest.status AS last_run_status,
latest.started_at AS last_run_at,
latest.error_message AS last_error,
latest.items_fetched AS last_items_fetched,
latest.items_new AS last_items_new,
COALESCE(stats.total_runs, 0) AS total_runs_24h,
COALESCE(stats.failed_runs, 0) AS failed_runs_24h,
COALESCE(stats.total_items, 0) AS total_items_24h
FROM sources s
JOIN companies c ON c.id = s.company_id
LEFT JOIN LATERAL (
SELECT ir.status, ir.started_at, ir.error_message,
ir.items_fetched, ir.items_new
FROM ingestion_runs ir
WHERE ir.source_id = s.id
ORDER BY ir.started_at DESC
LIMIT 1
) latest ON TRUE
LEFT JOIN LATERAL (
SELECT COUNT(*) AS total_runs,
COUNT(*) FILTER (WHERE ir2.status = 'failed') AS failed_runs,
COALESCE(SUM(ir2.items_fetched), 0) AS total_items
FROM ingestion_runs ir2
WHERE ir2.source_id = s.id
AND ir2.started_at >= NOW() - INTERVAL '24 hours'
) stats ON TRUE
{where}
ORDER BY c.ticker, s.source_type""",
*params,
)
return [_row_to_dict(r) for r in rows]
@app.get("/api/admin/sources/{source_id}/runs")
async def get_source_runs(
source_id: str,
limit: int = Query(default=20, le=100),
offset: int = 0,
):
"""Recent ingestion runs for a specific source."""
rows = await pool.fetch(
"""SELECT id, source_id, company_id, source_type, status,
started_at, completed_at, items_fetched, items_new,
error_message, retry_count, next_retry_at
FROM ingestion_runs
WHERE source_id = $1
ORDER BY started_at DESC
LIMIT $2 OFFSET $3""",
source_id, limit, offset,
)
return [_row_to_dict(r) for r in rows]
@app.put("/api/admin/sources/{source_id}/toggle")
async def toggle_source(source_id: str, active: bool = True):
"""Enable or disable a source."""
row = await pool.fetchrow(
"""UPDATE sources SET active = $2, updated_at = NOW()
WHERE id = $1
RETURNING id, source_type, source_name, active""",
source_id, active,
)
if not row:
raise HTTPException(404, "Source not found")
return _row_to_dict(row)
@app.put("/api/admin/sources/{source_id}/credibility")
async def update_source_credibility(source_id: str, credibility_score: float = Query(ge=0.0, le=1.0)):
"""Update a source's credibility score."""
row = await pool.fetchrow(
"""UPDATE sources SET credibility_score = $2, updated_at = NOW()
WHERE id = $1
RETURNING id, source_type, source_name, credibility_score""",
source_id, credibility_score,
)
if not row:
raise HTTPException(404, "Source not found")
return _row_to_dict(row)
# ---------------------------------------------------------------------------
# Admin: Symbol Configs (Requirement 11.1 - symbol configs)
# ---------------------------------------------------------------------------
@app.put("/api/admin/companies/{company_id}/toggle")
async def toggle_company(company_id: str, active: bool = True):
"""Enable or disable a tracked company."""
row = await pool.fetchrow(
"""UPDATE companies SET active = $2, updated_at = NOW()
WHERE id = $1
RETURNING id, ticker, legal_name, active""",
company_id, active,
)
if not row:
raise HTTPException(404, "Company not found")
return _row_to_dict(row)
@app.put("/api/admin/companies/{company_id}/sector")
async def update_company_sector(
company_id: str,
sector: str = Query(...),
industry: Optional[str] = None,
):
"""Update a company's sector and industry classification."""
if industry is not None:
row = await pool.fetchrow(
"""UPDATE companies SET sector = $2, industry = $3, updated_at = NOW()
WHERE id = $1
RETURNING id, ticker, legal_name, sector, industry""",
company_id, sector, industry,
)
else:
row = await pool.fetchrow(
"""UPDATE companies SET sector = $2, updated_at = NOW()
WHERE id = $1
RETURNING id, ticker, legal_name, sector, industry""",
company_id, sector,
)
if not row:
raise HTTPException(404, "Company not found")
return _row_to_dict(row)
@app.get("/api/admin/companies/coverage")
async def get_symbol_coverage():
"""Overview of source coverage per active company.
Shows how many active sources of each type are configured per symbol,
useful for identifying coverage gaps.
"""
rows = await pool.fetch(
"""SELECT c.id AS company_id, c.ticker, c.legal_name, c.sector,
c.active,
COUNT(s.id) FILTER (WHERE s.active) AS active_sources,
COUNT(s.id) FILTER (WHERE s.source_type = 'market_api' AND s.active) AS market_sources,
COUNT(s.id) FILTER (WHERE s.source_type = 'news_api' AND s.active) AS news_sources,
COUNT(s.id) FILTER (WHERE s.source_type = 'filings_api' AND s.active) AS filings_sources,
COUNT(s.id) FILTER (WHERE s.source_type = 'web_scrape' AND s.active) AS web_scrape_sources,
COUNT(s.id) FILTER (WHERE s.source_type = 'broker' AND s.active) AS broker_sources
FROM companies c
LEFT JOIN sources s ON s.company_id = c.id
WHERE c.active = TRUE
GROUP BY c.id, c.ticker, c.legal_name, c.sector, c.active
ORDER BY c.ticker""",
)
return [_row_to_dict(r) for r in rows]
# ---------------------------------------------------------------------------
# Admin: Trading Mode (Requirement 8.1, 8.2, 11.1)
# ---------------------------------------------------------------------------
@app.get("/api/admin/trading/config")
async def get_trading_config():
"""Get the current active risk/trading configuration."""
row = await pool.fetchrow(
"""SELECT id, name, trading_mode, config, active, created_at, updated_at
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1""",
)
if not row:
return {"trading_mode": "paper", "config": {}, "message": "No active config found, using defaults"}
result = _row_to_dict(row)
result["config"] = _parse_jsonb(result.get("config"))
return result
@app.put("/api/admin/trading/mode")
async def set_trading_mode(mode: str = Query(..., pattern="^(paper|live|disabled)$")):
"""Switch the active trading mode.
Requirement 8.1: support paper and live as separate execution environments.
Requirement 8.2: live mode requires operator approval controls.
"""
row = await pool.fetchrow(
"""UPDATE risk_configs SET trading_mode = $1, updated_at = NOW()
WHERE active = TRUE
RETURNING id, name, trading_mode""",
mode,
)
if not row:
# No active config exists yet — create one with the requested mode
row = await pool.fetchrow(
"""INSERT INTO risk_configs (name, trading_mode, config, active)
VALUES ('default', $1, '{}', TRUE)
RETURNING id, name, trading_mode""",
mode,
)
return _row_to_dict(row)
@app.put("/api/admin/trading/config")
async def update_trading_config(config: dict[str, Any]):
"""Update the active risk configuration JSON.
Accepts a partial or full risk config object. The config is stored
as JSONB alongside the trading_mode in risk_configs.
"""
config_json = json.dumps(config)
row = await pool.fetchrow(
"""UPDATE risk_configs SET config = $1::jsonb, updated_at = NOW()
WHERE active = TRUE
RETURNING id, name, trading_mode, config""",
config_json,
)
if not row:
row = await pool.fetchrow(
"""INSERT INTO risk_configs (name, trading_mode, config, active)
VALUES ('default', 'paper', $1::jsonb, TRUE)
RETURNING id, name, trading_mode, config""",
config_json,
)
result = _row_to_dict(row)
result["config"] = _parse_jsonb(result.get("config"))
return result
@app.get("/api/admin/trading/approvals")
async def list_pending_approvals():
"""List pending operator approval requests for live trading orders."""
rows = await pool.fetch(
"""SELECT id, order_job, recommendation_id, ticker, side, quantity,
estimated_value, status, risk_evaluation_id, requested_by,
reviewed_by, review_note, expires_at, requested_at, reviewed_at
FROM operator_approvals
WHERE status = 'pending'
ORDER BY requested_at ASC""",
)
results = []
for r in rows:
d = _row_to_dict(r)
d["order_job"] = _parse_jsonb(d.get("order_job"))
results.append(d)
return results
@app.put("/api/admin/trading/approvals/{approval_id}")
async def review_approval_request(
approval_id: str,
approved: bool = Query(...),
reviewed_by: str = "operator",
review_note: str = "",
):
"""Approve or reject a pending operator approval request.
Requirement 8.2: live orders require operator approval controls.
"""
now = datetime.now(timezone.utc)
new_status = "approved" if approved else "rejected"
row = await pool.fetchrow(
"""UPDATE operator_approvals
SET status = $2, reviewed_by = $3, review_note = $4,
reviewed_at = $5, updated_at = NOW()
WHERE id = $1::uuid AND status = 'pending'
RETURNING id, ticker, status, reviewed_by, order_job""",
approval_id, new_status, reviewed_by, review_note, now,
)
if not row:
raise HTTPException(404, "Approval not found or no longer pending")
# Re-enqueue approved orders to the broker queue for execution
if approved and rds:
order_job = row["order_job"]
if isinstance(order_job, str):
job_payload = order_job
else:
job_payload = json.dumps(order_job)
await rds.rpush(queue_key(QUEUE_BROKER), job_payload)
result = _row_to_dict(row)
result.pop("order_job", None)
return result
@app.get("/api/admin/trading/lockouts")
async def list_active_lockouts():
"""List active symbol lockouts (news-shock, cooldown)."""
rows = await pool.fetch(
"""SELECT id, ticker, lockout_type, reason, expires_at, created_at
FROM symbol_lockouts
WHERE expires_at > NOW()
ORDER BY expires_at ASC""",
)
return [_row_to_dict(r) for r in rows]
@app.post("/api/admin/trading/lockouts")
async def create_lockout(body: dict[str, Any]):
"""Create a manual symbol lockout.
Accepts: { ticker: string, reason: string, duration_minutes: int, lockout_type?: string }
Computes expires_at = NOW() + duration_minutes.
Defaults lockout_type to "manual".
"""
ticker = body.get("ticker")
reason = body.get("reason")
duration_minutes = body.get("duration_minutes")
if not ticker or not isinstance(ticker, str):
raise HTTPException(400, "ticker is required and must be a string")
if not reason or not isinstance(reason, str):
raise HTTPException(400, "reason is required and must be a string")
if duration_minutes is None or not isinstance(duration_minutes, (int, float)) or duration_minutes <= 0:
raise HTTPException(400, "duration_minutes is required and must be a positive number")
lockout_type = body.get("lockout_type", "manual")
duration = timedelta(minutes=int(duration_minutes))
row = await pool.fetchrow(
"""INSERT INTO symbol_lockouts (ticker, lockout_type, reason, expires_at)
VALUES ($1, $2, $3, NOW() + $4::interval)
RETURNING id, ticker, lockout_type, reason, expires_at, created_at""",
ticker.upper(), lockout_type, reason, duration,
)
return _row_to_dict(row)
@app.delete("/api/admin/trading/lockouts/{lockout_id}")
async def delete_lockout(lockout_id: str):
"""Delete a symbol lockout by ID, allowing early removal."""
result = await pool.execute(
"DELETE FROM symbol_lockouts WHERE id = $1::uuid", lockout_id,
)
if result == "DELETE 0":
raise HTTPException(404, "Lockout not found")
return {"status": "deleted"}
@app.get("/api/admin/trading/approval-config")
async def get_approval_config():
"""Get the current operator approval settings from the active risk config.
Reads the active risk_configs row, parses the config JSONB with
PortfolioRiskConfig.from_db_json() to fill defaults for missing fields,
and returns the operator_approval sub-object.
"""
row = await pool.fetchrow(
"""SELECT config
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1""",
)
config_json = _parse_jsonb(row["config"]) if row else {}
if not isinstance(config_json, dict):
config_json = {}
risk_config = PortfolioRiskConfig.from_db_json(config_json)
approval = risk_config.operator_approval
return {
"auto_approve_paper": approval.auto_approve_paper,
"require_approval_for_live": approval.require_approval_for_live,
"approval_timeout_minutes": approval.approval_timeout_minutes,
}
@app.put("/api/admin/trading/approval-config")
async def update_approval_config(body: dict[str, Any]):
"""Update operator approval settings in the active risk config.
Reads the current config JSONB, merges the operator_approval sub-object,
and writes back the full config. This preserves all other config fields
(position_limits, sector_exposure, etc.) while only updating approval settings.
Accepts: { auto_approve_paper: bool, require_approval_for_live?: bool, approval_timeout_minutes?: int }
Bug fix: This endpoint allows the operator to set auto_approve_paper=False,
which was previously impossible because no UI/endpoint wrote operator_approval
into the risk config JSON.
"""
# Read current config
row = await pool.fetchrow(
"""SELECT config
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1""",
)
current_config = _parse_jsonb(row["config"]) if row else {}
if not isinstance(current_config, dict):
current_config = {}
# Build the operator_approval sub-object by merging with existing values
existing_approval = current_config.get("operator_approval", {})
if not isinstance(existing_approval, dict):
existing_approval = {}
# Only update fields that are provided in the request body
allowed_fields = {"auto_approve_paper", "require_approval_for_live", "approval_timeout_minutes"}
for field in allowed_fields:
if field in body:
existing_approval[field] = body[field]
# Merge back into the full config (preserves all other config fields)
current_config["operator_approval"] = existing_approval
config_json = json.dumps(current_config)
# Write back
updated_row = await pool.fetchrow(
"""UPDATE risk_configs SET config = $1::jsonb, updated_at = NOW()
WHERE active = TRUE
RETURNING id, name, trading_mode, config""",
config_json,
)
if not updated_row:
updated_row = await pool.fetchrow(
"""INSERT INTO risk_configs (name, trading_mode, config, active)
VALUES ('default', 'paper', $1::jsonb, TRUE)
RETURNING id, name, trading_mode, config""",
config_json,
)
# Return the updated approval settings
updated_config = _parse_jsonb(updated_row["config"]) if updated_row else current_config
if not isinstance(updated_config, dict):
updated_config = current_config
risk_config = PortfolioRiskConfig.from_db_json(updated_config)
approval = risk_config.operator_approval
return {
"auto_approve_paper": approval.auto_approve_paper,
"require_approval_for_live": approval.require_approval_for_live,
"approval_timeout_minutes": approval.approval_timeout_minutes,
}
# ---------------------------------------------------------------------------
# Operational Dashboard (Requirement 12.1, 12.2, 12.3)
# ---------------------------------------------------------------------------
@app.get("/api/ops/ingestion/throughput")
async def get_ingestion_throughput(
hours: int = Query(default=24, ge=1, le=168),
bucket: str = Query(default="1h", pattern="^(15m|1h|6h|1d)$"),
):
"""Ingestion throughput over time, bucketed by interval.
Returns document counts and item counts per time bucket, broken down
by source type. Powers the ingestion throughput chart.
Requirements: 12.1, 12.3
"""
bucket_interval = {
"15m": "15 minutes",
"1h": "1 hour",
"6h": "6 hours",
"1d": "1 day",
}[bucket]
rows = await pool.fetch(
f"""SELECT
date_trunc('hour', ir.started_at)
- (EXTRACT(minute FROM ir.started_at)::int
% EXTRACT(epoch FROM INTERVAL '{bucket_interval}')::int / 60)
* INTERVAL '1 minute' AS bucket_start,
ir.source_type,
COUNT(*) AS run_count,
COUNT(*) FILTER (WHERE ir.status = 'completed') AS completed,
COUNT(*) FILTER (WHERE ir.status = 'failed') AS failed,
COALESCE(SUM(ir.items_fetched), 0) AS items_fetched,
COALESCE(SUM(ir.items_new), 0) AS items_new
FROM ingestion_runs ir
WHERE ir.started_at >= NOW() - INTERVAL '1 hour' * $1
GROUP BY bucket_start, ir.source_type
ORDER BY bucket_start DESC, ir.source_type""",
hours,
)
return [_row_to_dict(r) for r in rows]
@app.get("/api/ops/ingestion/summary")
async def get_ingestion_summary(
hours: int = Query(default=24, ge=1, le=168),
):
"""High-level ingestion summary for the operational dashboard.
Returns total runs, success/failure counts, items processed, and
per-source-type breakdown for the given time window.
Requirements: 12.1
"""
row = await pool.fetchrow(
"""SELECT
COUNT(*) AS total_runs,
COUNT(*) FILTER (WHERE status = 'completed') AS completed,
COUNT(*) FILTER (WHERE status = 'failed') AS failed,
COUNT(*) FILTER (WHERE status = 'pending') AS pending,
COUNT(*) FILTER (WHERE status = 'running') AS running,
COALESCE(SUM(items_fetched), 0) AS total_items_fetched,
COALESCE(SUM(items_new), 0) AS total_items_new,
COUNT(DISTINCT source_id) AS active_sources,
COUNT(DISTINCT company_id) AS active_companies
FROM ingestion_runs
WHERE started_at >= NOW() - INTERVAL '1 hour' * $1""",
hours,
)
by_type = await pool.fetch(
"""SELECT
source_type,
COUNT(*) AS runs,
COUNT(*) FILTER (WHERE status = 'completed') AS completed,
COUNT(*) FILTER (WHERE status = 'failed') AS failed,
COALESCE(SUM(items_fetched), 0) AS items_fetched,
COALESCE(SUM(items_new), 0) AS items_new
FROM ingestion_runs
WHERE started_at >= NOW() - INTERVAL '1 hour' * $1
GROUP BY source_type
ORDER BY runs DESC""",
hours,
)
result = _row_to_dict(row) if row else {}
result["by_source_type"] = [_row_to_dict(r) for r in by_type]
result["hours"] = hours
return result
@app.get("/api/ops/model/failures")
async def get_model_failures(
hours: int = Query(default=24, ge=1, le=168),
limit: int = Query(default=50, le=200),
):
"""Recent model extraction failures with error details.
Returns individual failed extraction attempts for debugging.
Requirements: 12.2
"""
rows = await pool.fetch(
"""SELECT
mpm.id, mpm.document_id, mpm.ticker, mpm.model_name,
mpm.prompt_version, mpm.schema_version,
mpm.attempt_count, mpm.total_duration_ms,
mpm.validation_status, mpm.validation_error_count,
mpm.validation_errors, mpm.retry_count,
mpm.confidence, mpm.recorded_at,
d.title AS document_title, d.document_type, d.source_type
FROM model_performance_metrics mpm
LEFT JOIN documents d ON d.id = mpm.document_id
WHERE mpm.success = FALSE
AND mpm.recorded_at >= NOW() - INTERVAL '1 hour' * $1
ORDER BY mpm.recorded_at DESC
LIMIT $2""",
hours, limit,
)
results = []
for r in rows:
d = _row_to_dict(r)
d["validation_errors"] = _parse_jsonb(d.get("validation_errors"))
results.append(d)
return results
@app.get("/api/ops/model/performance")
async def get_model_performance(
hours: int = Query(default=24, ge=1, le=168),
model_name: Optional[str] = None,
):
"""Aggregated model performance metrics for the operational dashboard.
Returns success rate, latency percentiles, retry rate, confidence
distribution, and token usage for the given time window.
Requirements: 12.2
"""
return await get_model_performance_summary(
pool,
model_name=model_name,
hours=hours,
)
@app.get("/api/ops/pipeline/health")
async def get_pipeline_health(
hours: int = Query(default=24, ge=1, le=168),
):
"""Pipeline stage health summary across ingestion, parsing, extraction, and aggregation.
Shows document counts at each processing stage and identifies bottlenecks.
Requirements: 12.1
"""
# Document status distribution (pipeline stages)
doc_stages = await pool.fetch(
"""SELECT
status,
COUNT(*) AS doc_count
FROM documents
WHERE created_at >= NOW() - INTERVAL '1 hour' * $1
GROUP BY status
ORDER BY doc_count DESC""",
hours,
)
# Parsing quality distribution
parse_quality = await pool.fetchrow(
"""SELECT
COUNT(*) AS total_parsed,
COUNT(*) FILTER (WHERE parse_confidence = 'high') AS high_confidence,
COUNT(*) FILTER (WHERE parse_confidence = 'medium') AS medium_confidence,
COUNT(*) FILTER (WHERE parse_confidence = 'low') AS low_confidence,
COUNT(*) FILTER (WHERE parse_confidence = 'unknown' OR parse_confidence IS NULL) AS unknown_confidence,
ROUND(AVG(parse_quality_score)::numeric, 3) AS avg_quality_score
FROM documents
WHERE created_at >= NOW() - INTERVAL '1 hour' * $1
AND status IN ('parsed', 'extracted', 'aggregated')""",
hours,
)
# Extraction validation distribution
extraction_stats = await pool.fetchrow(
"""SELECT
COUNT(*) AS total_extractions,
COUNT(*) FILTER (WHERE validation_status = 'valid') AS valid,
COUNT(*) FILTER (WHERE validation_status = 'failed') AS failed,
COUNT(*) FILTER (WHERE validation_status = 'pending') AS pending,
ROUND(AVG(confidence)::numeric, 3) AS avg_confidence,
ROUND(AVG(retry_count)::numeric, 2) AS avg_retries
FROM document_intelligence
WHERE created_at >= NOW() - INTERVAL '1 hour' * $1""",
hours,
)
# Aggregation output (trend windows generated)
trend_stats = await pool.fetchrow(
"""SELECT
COUNT(*) AS trends_generated,
COUNT(DISTINCT entity_id) AS symbols_covered,
ROUND(AVG(confidence)::numeric, 3) AS avg_trend_confidence,
ROUND(AVG(contradiction_score)::numeric, 3) AS avg_contradiction
FROM trend_windows
WHERE created_at >= NOW() - INTERVAL '1 hour' * $1""",
hours,
)
# Queue depths from Redis
queue_depths: dict[str, int] = {}
if rds:
for qname in (
"ingestion", "parsing", "extraction", "macro_classification",
"aggregation", "recommendation", "lake_publish",
"trade", "trading_decisions", "broker_orders",
):
try:
depth = await rds.llen(queue_key(qname))
queue_depths[qname] = depth
except Exception:
queue_depths[qname] = -1
# Also check dead-letter queues
for qname in (
"ingestion", "parsing", "extraction", "aggregation",
"recommendation", "broker_orders",
):
try:
depth = await rds.llen(f"{QUEUE_PREFIX}:dlq:{qname}")
if depth > 0:
queue_depths[f"dlq:{qname}"] = depth
except Exception:
pass
# Pipeline enabled flag
pipeline_flag = await rds.get(_PIPELINE_ENABLED_KEY) if rds else None
pipeline_enabled = pipeline_flag != "0" if pipeline_flag is not None else True
return {
"hours": hours,
"pipeline_enabled": pipeline_enabled,
"document_stages": [_row_to_dict(r) for r in doc_stages],
"parsing": _row_to_dict(parse_quality) if parse_quality else {},
"extraction": _row_to_dict(extraction_stats) if extraction_stats else {},
"aggregation": _row_to_dict(trend_stats) if trend_stats else {},
"queue_depths": queue_depths,
}
# ---------------------------------------------------------------------------
# SSE: Live Pipeline Stream
# ---------------------------------------------------------------------------
PIPELINE_QUEUES = (
"ingestion", "parsing", "extraction", "macro_classification",
"aggregation", "recommendation", "lake_publish",
"trade", "trading_decisions", "broker_orders",
)
PIPELINE_DLQS = (
"ingestion", "parsing", "extraction", "aggregation",
"recommendation", "broker_orders",
)
@app.get("/api/ops/pipeline/stream")
async def pipeline_stream(request: Request):
"""Server-Sent Events stream of live pipeline status.
Pushes queue depths and document stage counts every 3 seconds.
The browser can consume this with EventSource for real-time updates
without polling.
"""
async def event_generator():
while True:
# Check if client disconnected
if await request.is_disconnected():
break
data: dict[str, Any] = {}
# Queue depths
depths: dict[str, int] = {}
if rds:
for qname in PIPELINE_QUEUES:
try:
depths[qname] = await rds.llen(queue_key(qname))
except Exception:
depths[qname] = -1
for qname in PIPELINE_DLQS:
try:
d = await rds.llen(f"{QUEUE_PREFIX}:dlq:{qname}")
if d > 0:
depths[f"dlq:{qname}"] = d
except Exception:
pass
data["queue_depths"] = depths
# Document stage counts (lightweight query)
try:
stages = await pool.fetch(
"SELECT status, count(*) AS doc_count FROM documents GROUP BY status"
)
data["document_stages"] = {r["status"]: r["doc_count"] for r in stages}
except Exception:
data["document_stages"] = {}
yield f"data: {json.dumps(data)}\n\n"
await asyncio.sleep(3)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.post("/api/ops/pipeline/retry-failed")
async def retry_failed_extractions_endpoint():
"""Re-enqueue documents stuck in extraction_failed for another attempt.
Resets up to 200 extraction_failed documents back to 'parsed',
deletes their failed intelligence rows, and pushes them onto the
extraction queue. Returns the count of documents re-enqueued.
"""
rows = await pool.fetch(
"""SELECT d.id, d.document_type, dcm.ticker
FROM documents d
LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id
WHERE d.status = 'extraction_failed'
ORDER BY d.updated_at ASC
LIMIT 200""",
)
if not rows:
return {"retried": 0, "message": "No extraction-failed documents to retry"}
doc_ids = []
enqueued_set_prefix = f"{QUEUE_PREFIX}:enqueued"
for row in rows:
doc_type = row["document_type"]
if doc_type == "macro_event":
target = queue_key("macro_classification")
else:
target = queue_key("extraction")
doc_id = str(row["id"])
marker = f"{enqueued_set_prefix}:{doc_id}"
added = await rds.set(marker, "1", nx=True, ex=3600)
if added:
await rds.rpush(target, json.dumps({
"document_id": doc_id,
"ticker": row["ticker"] or "",
}))
doc_ids.append(row["id"])
# Delete failed intelligence rows so extractor starts fresh
await pool.execute(
"""DELETE FROM document_intelligence
WHERE document_id = ANY($1::uuid[])
AND validation_status = 'failed'""",
doc_ids,
)
# Reset status to 'parsed' and touch updated_at
await pool.execute(
"""UPDATE documents
SET status = 'parsed', updated_at = NOW()
WHERE id = ANY($1::uuid[])""",
doc_ids,
)
return {"retried": len(doc_ids), "message": f"Re-enqueued {len(doc_ids)} documents for extraction"}
# ---------------------------------------------------------------------------
# Pipeline On/Off Toggle
# ---------------------------------------------------------------------------
_PIPELINE_ENABLED_KEY = PIPELINE_ENABLED_KEY
@app.get("/api/ops/pipeline/toggle")
async def get_pipeline_toggle():
"""Get the current pipeline enabled/disabled state."""
val = await rds.get(_PIPELINE_ENABLED_KEY)
# Default to enabled if key doesn't exist
enabled = val != "0"
return {"pipeline_enabled": enabled}
@app.post("/api/ops/pipeline/toggle")
async def set_pipeline_toggle(body: dict[str, Any]):
"""Toggle the pipeline on or off.
Accepts: { "enabled": true/false }
Workers check this flag before processing jobs.
When disabling, optionally flush all pipeline queues so in-flight
work stops immediately.
"""
enabled = body.get("enabled", True)
flush = body.get("flush", not enabled) # default: flush when disabling
await rds.set(_PIPELINE_ENABLED_KEY, "1" if enabled else "0")
flushed_counts: dict[str, int] = {}
if flush and not enabled:
from services.shared.redis_keys import QUEUE_PREFIX
# Flush all pipeline queues
queue_names = [
"ingestion", "parsing", "extraction", "macro_classification",
"aggregation", "recommendation", "lake_publish",
]
for qname in queue_names:
qkey = f"{QUEUE_PREFIX}:{qname}"
count = await rds.llen(qkey)
if count > 0:
await rds.delete(qkey)
flushed_counts[qname] = count
msg = f"Pipeline {'enabled' if enabled else 'disabled'}"
if flushed_counts:
total = sum(flushed_counts.values())
msg += f" — flushed {total} queued jobs"
return {"pipeline_enabled": enabled, "flushed": flushed_counts, "message": msg}
@app.get("/api/ops/sources/coverage-gaps")
async def get_source_coverage_gaps():
"""Identify symbols with missing or insufficient source coverage.
Returns companies that lack one or more expected source types
(market_api, news_api, filings_api), or have sources that haven't
produced successful ingestion runs recently.
Requirements: 12.3
"""
# Companies missing expected source types
missing_types = await pool.fetch(
"""SELECT
c.id AS company_id, c.ticker, c.legal_name, c.sector,
ARRAY_AGG(DISTINCT s.source_type) FILTER (WHERE s.active) AS active_types,
ARRAY['market_api', 'news_api', 'filings_api'] AS expected_types
FROM companies c
LEFT JOIN sources s ON s.company_id = c.id AND s.active = TRUE
WHERE c.active = TRUE
GROUP BY c.id, c.ticker, c.legal_name, c.sector
HAVING NOT ARRAY['market_api', 'news_api', 'filings_api']::text[] <@ ARRAY_AGG(DISTINCT s.source_type::text) FILTER (WHERE s.active)
OR ARRAY_AGG(DISTINCT s.source_type::text) FILTER (WHERE s.active) IS NULL
ORDER BY c.ticker""",
)
# Sources with no successful runs in the last 24 hours
stale_sources = await pool.fetch(
"""SELECT
s.id AS source_id, s.source_type, s.source_name,
c.ticker, c.legal_name,
MAX(ir.started_at) FILTER (WHERE ir.status = 'completed') AS last_success,
MAX(ir.started_at) AS last_attempt,
COUNT(*) FILTER (WHERE ir.status = 'failed'
AND ir.started_at >= NOW() - INTERVAL '24 hours') AS recent_failures
FROM sources s
JOIN companies c ON c.id = s.company_id
LEFT JOIN ingestion_runs ir ON ir.source_id = s.id
WHERE s.active = TRUE AND c.active = TRUE
GROUP BY s.id, s.source_type, s.source_name, c.ticker, c.legal_name
HAVING MAX(ir.started_at) FILTER (WHERE ir.status = 'completed')
< NOW() - INTERVAL '24 hours'
OR MAX(ir.started_at) FILTER (WHERE ir.status = 'completed') IS NULL
ORDER BY c.ticker, s.source_type""",
)
return {
"missing_source_types": [_row_to_dict(r) for r in missing_types],
"stale_sources": [_row_to_dict(r) for r in stale_sources],
}
# ---------------------------------------------------------------------------
# System: Rate Limit Info
# ---------------------------------------------------------------------------
@app.get("/api/system/rate-limits")
async def get_rate_limits():
"""Return current rate limit configuration and usage.
Exposes the scheduler's rate limits so the frontend can calculate
how many tickers can be refreshed within the configured cadence.
"""
from services.scheduler.app import (
DEFAULT_CADENCES,
DEFAULT_RATE_LIMITS,
POLYGON_GLOBAL_RATE_LIMIT,
POLYGON_SOURCE_TYPES,
)
# Count active market_api sources to report current load
market_count = await pool.fetchval(
"SELECT count(*) FROM sources WHERE active = TRUE AND source_type = 'market_api'"
)
news_count = await pool.fetchval(
"SELECT count(*) FROM sources WHERE active = TRUE AND source_type = 'news_api'"
)
market_cadence = DEFAULT_CADENCES.get("market_api", 300)
market_rate = DEFAULT_RATE_LIMITS.get("market_api", 20)
# How many tickers can we refresh within one cadence window?
# cadence_minutes * rate_per_minute = max tickers per cycle
cadence_minutes = market_cadence / 60
max_tickers_per_cycle = int(cadence_minutes * market_rate)
return {
"polygon_global_limit": POLYGON_GLOBAL_RATE_LIMIT,
"polygon_source_types": sorted(POLYGON_SOURCE_TYPES),
"per_type_limits": DEFAULT_RATE_LIMITS,
"cadences_seconds": DEFAULT_CADENCES,
"market_api": {
"rate_per_minute": market_rate,
"cadence_seconds": market_cadence,
"max_tickers_per_cycle": max_tickers_per_cycle,
"active_sources": market_count,
},
"news_api": {
"rate_per_minute": DEFAULT_RATE_LIMITS.get("news_api", 20),
"cadence_seconds": DEFAULT_CADENCES.get("news_api", 300),
"active_sources": news_count,
},
}
# ---------------------------------------------------------------------------
# Analytics: Trino SQL Proxy (Requirement 10.1, 10.3, 13.7)
# ---------------------------------------------------------------------------
@app.post("/api/analytics/query")
async def analytics_query(body: dict[str, Any]):
"""Proxy SQL to Trino, enforce row limits, return structured results.
Design: Section 9.3 (API proxy for Trino)
Requirements: 10.1, 10.3, 13.7
"""
sql = body.get("sql", "").strip()
if not sql:
raise HTTPException(400, "sql is required")
limit = min(int(body.get("limit", 1000)), 10000)
trino_host = config.trino.host
trino_port = config.trino.port
trino_catalog = config.trino.catalog
trino_schema = config.trino.schema
trino_url = f"http://{trino_host}:{trino_port}/v1/statement"
headers = {
"X-Trino-User": "stonks-dashboard",
"X-Trino-Catalog": trino_catalog,
"X-Trino-Schema": trino_schema,
}
start = _time.monotonic()
try:
async with httpx.AsyncClient(timeout=60.0) as client:
# Submit query
resp = await client.post(trino_url, content=sql, headers=headers)
if resp.status_code != 200:
raise HTTPException(502, f"Trino error: {resp.text[:500]}")
result = resp.json()
columns: list[dict[str, str]] = []
all_rows: list[list[Any]] = []
# Extract columns from first response
if "columns" in result:
columns = [{"name": c["name"], "type": c.get("type", "unknown")} for c in result["columns"]]
if "data" in result:
all_rows.extend(result["data"])
# Follow nextUri to get all results
while "nextUri" in result and len(all_rows) < limit:
next_url = result["nextUri"]
resp = await client.get(next_url, headers=headers)
if resp.status_code != 200:
break
result = resp.json()
if "columns" in result and not columns:
columns = [{"name": c["name"], "type": c.get("type", "unknown")} for c in result["columns"]]
if "data" in result:
all_rows.extend(result["data"])
elapsed_ms = round((_time.monotonic() - start) * 1000)
all_rows = all_rows[:limit]
return {
"columns": columns,
"rows": all_rows,
"row_count": len(all_rows),
"elapsed_ms": elapsed_ms,
}
except httpx.ConnectError:
raise HTTPException(502, "Cannot connect to Trino")
except httpx.TimeoutException:
raise HTTPException(504, "Trino query timed out")
@app.get("/api/analytics/schema")
async def analytics_schema():
"""Return Trino catalog/schema/table/column metadata for the schema browser.
Requirements: 13.7
"""
trino_host = config.trino.host
trino_port = config.trino.port
trino_catalog = config.trino.catalog
trino_schema = config.trino.schema
trino_url = f"http://{trino_host}:{trino_port}/v1/statement"
headers = {
"X-Trino-User": "stonks-dashboard",
"X-Trino-Catalog": trino_catalog,
"X-Trino-Schema": trino_schema,
}
async def _run_trino_query(sql: str) -> list[list[Any]]:
rows: list[list[Any]] = []
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(trino_url, content=sql, headers=headers)
if resp.status_code != 200:
return rows
result = resp.json()
if "data" in result:
rows.extend(result["data"])
while "nextUri" in result:
resp = await client.get(result["nextUri"], headers=headers)
if resp.status_code != 200:
break
result = resp.json()
if "data" in result:
rows.extend(result["data"])
return rows
try:
# Get tables
table_rows = await _run_trino_query(
f"SELECT table_name FROM information_schema.tables WHERE table_schema = '{trino_schema}' ORDER BY table_name"
)
tables = []
for tr in table_rows:
table_name = tr[0] if tr else None
if not table_name:
continue
# Get columns for each table
col_rows = await _run_trino_query(
f"SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = '{trino_schema}' AND table_name = '{table_name}' ORDER BY ordinal_position"
)
columns = [{"name": cr[0], "type": cr[1]} for cr in col_rows if cr]
tables.append({"name": table_name, "columns": columns})
return {
"catalog": trino_catalog,
"schema": trino_schema,
"tables": tables,
}
except Exception:
return {"catalog": trino_catalog, "schema": trino_schema, "tables": []}
# ---------------------------------------------------------------------------
# Analytics: PostgreSQL Direct Query (Schema browser + read-only SQL)
# ---------------------------------------------------------------------------
@app.get("/api/analytics/pg-schema")
async def pg_schema():
"""Return PostgreSQL table/column metadata for the schema browser."""
# Columns with ordinal position
col_rows = await pool.fetch("""
SELECT t.table_name, c.column_name, c.data_type, c.is_nullable,
c.column_default
FROM information_schema.tables t
JOIN information_schema.columns c
ON t.table_name = c.table_name AND t.table_schema = c.table_schema
WHERE t.table_schema = 'public' AND t.table_type = 'BASE TABLE'
ORDER BY t.table_name, c.ordinal_position
""")
# Primary key columns
pk_rows = await pool.fetch("""
SELECT kcu.table_name, kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
WHERE tc.table_schema = 'public' AND tc.constraint_type = 'PRIMARY KEY'
""")
pk_set: set[tuple[str, str]] = {(r["table_name"], r["column_name"]) for r in pk_rows}
# Foreign key columns with referenced table
fk_rows = await pool.fetch("""
SELECT kcu.table_name, kcu.column_name,
ccu.table_name AS foreign_table
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage ccu
ON tc.constraint_name = ccu.constraint_name
AND tc.table_schema = ccu.table_schema
WHERE tc.table_schema = 'public' AND tc.constraint_type = 'FOREIGN KEY'
""")
fk_map: dict[tuple[str, str], str] = {
(r["table_name"], r["column_name"]): r["foreign_table"] for r in fk_rows
}
# Approximate row counts from pg_stat
count_rows = await pool.fetch("""
SELECT relname AS table_name, reltuples::bigint AS row_estimate
FROM pg_class
WHERE relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')
AND relkind = 'r'
""")
row_counts: dict[str, int] = {r["table_name"]: max(0, r["row_estimate"]) for r in count_rows}
tables: dict[str, dict[str, Any]] = {}
for row in col_rows:
tname = row["table_name"]
if tname not in tables:
tables[tname] = {"name": tname, "row_estimate": row_counts.get(tname, 0), "columns": []}
col_info: dict[str, Any] = {
"name": row["column_name"],
"type": row["data_type"],
"nullable": row["is_nullable"] == "YES",
}
if (tname, row["column_name"]) in pk_set:
col_info["primary_key"] = True
fk_ref = fk_map.get((tname, row["column_name"]))
if fk_ref:
col_info["references"] = fk_ref
if row["column_default"] is not None:
col_info["has_default"] = True
tables[tname]["columns"].append(col_info)
return {"catalog": "postgresql", "schema": "public", "tables": list(tables.values())}
@app.post("/api/analytics/pg-query")
async def pg_query(body: dict[str, Any]):
"""Run read-only SQL against PostgreSQL directly."""
sql = body.get("sql", "").strip()
if not sql:
raise HTTPException(400, "sql is required")
limit = min(int(body.get("limit", 1000)), 10000)
# Safety: only allow SELECT statements
# Strip SQL comments (-- and /* */) and whitespace before checking
stripped = re.sub(r'--[^\n]*', '', sql) # remove -- comments
stripped = re.sub(r'/\*.*?\*/', '', stripped, flags=re.DOTALL) # remove /* */ comments
stripped = stripped.strip()
if not stripped.upper().startswith("SELECT"):
raise HTTPException(400, "Only SELECT queries are allowed")
# Add LIMIT if not present
if "LIMIT" not in sql.upper():
sql = f"{sql} LIMIT {limit}"
start = _time.monotonic()
try:
rows = await pool.fetch(sql)
elapsed_ms = round((_time.monotonic() - start) * 1000)
columns = [{"name": k, "type": "text"} for k in rows[0].keys()] if rows else []
return {
"columns": columns,
"rows": [[str(v) for v in row.values()] for row in rows],
"row_count": len(rows),
"elapsed_ms": elapsed_ms,
}
except asyncpg.PostgresSyntaxError as exc:
raise HTTPException(400, f"SQL syntax error: {exc}")
except asyncpg.UndefinedTableError as exc:
raise HTTPException(400, f"Table not found: {exc}")
except asyncpg.PostgresError as exc:
raise HTTPException(400, f"Query error: {exc}")
# ---------------------------------------------------------------------------
# Analytics: Saved Queries (Requirement 13.7)
# ---------------------------------------------------------------------------
class SavedQueryBody(BaseModel):
name: str
description: str = ""
sql_text: str
@app.get("/api/analytics/saved-queries")
async def list_saved_queries():
"""List all saved queries."""
rows = await pool.fetch(
"SELECT id, name, description, sql_text, created_by, created_at, updated_at FROM saved_queries ORDER BY updated_at DESC"
)
return [_row_to_dict(r) for r in rows]
@app.post("/api/analytics/saved-queries", status_code=201)
async def create_saved_query(body: SavedQueryBody):
"""Save a new query."""
row = await pool.fetchrow(
"""INSERT INTO saved_queries (name, description, sql_text)
VALUES ($1, $2, $3)
RETURNING id, name, description, sql_text, created_by, created_at""",
body.name, body.description, body.sql_text,
)
return _row_to_dict(row)
@app.delete("/api/analytics/saved-queries/{query_id}")
async def delete_saved_query(query_id: str):
"""Delete a saved query."""
result = await pool.execute("DELETE FROM saved_queries WHERE id = $1::uuid", query_id)
if result == "DELETE 0":
raise HTTPException(404, "Query not found")
return {"status": "deleted"}
# ---------------------------------------------------------------------------
# Admin: Macro Signal Layer Toggle (Requirement 11.1, 11.5, 11.7)
# ---------------------------------------------------------------------------
class MacroToggleBody(BaseModel):
enabled: bool
operator: str = "operator"
@app.get("/api/admin/macro/status")
async def get_macro_status():
"""Return the current macro signal layer enabled/disabled state.
Reads from the active risk_configs row's JSONB config field.
Requirements: 11.1, 11.5
"""
row = await pool.fetchrow(
"""SELECT config->>'macro_enabled' AS macro_enabled
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1""",
)
if row is None or row["macro_enabled"] is None:
return {"macro_enabled": True, "source": "default"}
return {
"macro_enabled": row["macro_enabled"].lower() == "true",
"source": "risk_configs",
}
@app.put("/api/admin/macro/toggle")
async def toggle_macro_layer(body: MacroToggleBody):
"""Toggle the macro signal layer on or off.
Persists the new state into the active risk_configs row's JSONB config
and records an audit event with previous state, new state, and operator.
The toggle state is read from PostgreSQL at the start of each aggregation
cycle (no caching), so changes take effect on the next cycle.
Requirements: 11.1, 11.5, 11.7
"""
# Read current state
current_row = await pool.fetchrow(
"""SELECT id, config->>'macro_enabled' AS macro_enabled
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1""",
)
if current_row is None:
# No active config exists — create one
new_config = json.dumps({"macro_enabled": str(body.enabled).lower()})
current_row = await pool.fetchrow(
"""INSERT INTO risk_configs (name, trading_mode, config, active)
VALUES ('default', 'paper', $1::jsonb, TRUE)
RETURNING id, config->>'macro_enabled' AS macro_enabled""",
new_config,
)
previous_enabled = True # default was enabled
else:
prev_val = current_row["macro_enabled"]
previous_enabled = prev_val.lower() == "true" if prev_val else True
config_id = str(current_row["id"])
# Update the config JSONB to set macro_enabled
await pool.execute(
"""UPDATE risk_configs
SET config = config || $2::jsonb, updated_at = NOW()
WHERE id = $1""",
current_row["id"],
json.dumps({"macro_enabled": str(body.enabled).lower()}),
)
# Record audit event (Requirement 11.7)
await record_audit_event(
pool,
event_type="macro.layer_toggled",
entity_type="risk_config",
entity_id=config_id,
data={
"previous_enabled": previous_enabled,
"new_enabled": body.enabled,
},
actor=body.operator,
)
return {
"macro_enabled": body.enabled,
"previous_enabled": previous_enabled,
"toggled_by": body.operator,
}
# ---------------------------------------------------------------------------
# Macro Events and Impacts (Requirement 8.1, 8.2, 12.10)
# ---------------------------------------------------------------------------
@app.get("/api/macro/events")
async def list_macro_events(
severity: Optional[str] = None,
region: Optional[str] = None,
sector: Optional[str] = None,
since: Optional[str] = None,
until: Optional[str] = None,
limit: int = Query(default=50, le=200),
offset: int = 0,
):
"""List recent global events with filtering by severity, region, sector, date range.
Requirements: 8.1
"""
conditions: list[str] = []
params: list[Any] = []
idx = 1
if severity:
conditions.append(f"ge.severity = ${idx}")
params.append(severity)
idx += 1
if region:
conditions.append(f"${idx} = ANY(ge.affected_regions)")
params.append(region)
idx += 1
if sector:
conditions.append(f"${idx} = ANY(ge.affected_sectors)")
params.append(sector)
idx += 1
if since:
conditions.append(f"ge.created_at >= ${idx}::timestamptz")
params.append(since)
idx += 1
if until:
conditions.append(f"ge.created_at <= ${idx}::timestamptz")
params.append(until)
idx += 1
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
rows = await pool.fetch(
f"""SELECT ge.id, ge.event_types, ge.severity, ge.affected_regions,
ge.affected_sectors, ge.affected_commodities, ge.summary,
ge.key_facts, ge.estimated_duration, ge.confidence,
ge.source_document_id, ge.created_at
FROM global_events ge
{where}
ORDER BY ge.created_at DESC
LIMIT ${idx} OFFSET ${idx + 1}""",
*params, limit, offset,
)
results = []
for r in rows:
d = _row_to_dict(r)
d["key_facts"] = _parse_jsonb(d.get("key_facts"))
results.append(d)
return results
@app.get("/api/macro/events/{event_id}")
async def get_macro_event(event_id: str):
"""Event detail with list of affected companies and their macro impact scores.
Requirements: 8.2
"""
row = await pool.fetchrow(
"""SELECT id, event_types, severity, affected_regions, affected_sectors,
affected_commodities, summary, key_facts, estimated_duration,
confidence, source_document_id, model_provider, model_name,
prompt_version, schema_version, created_at
FROM global_events WHERE id = $1""",
event_id,
)
if not row:
raise HTTPException(404, "Global event not found")
result = _row_to_dict(row)
result["key_facts"] = _parse_jsonb(result.get("key_facts"))
# Affected companies with macro impact scores
impacts = await pool.fetch(
"""SELECT mir.id, mir.company_id, mir.ticker, mir.macro_impact_score,
mir.impact_direction, mir.contributing_factors, mir.confidence,
mir.computed_at, c.legal_name, c.sector
FROM macro_impact_records mir
JOIN companies c ON c.id = mir.company_id
WHERE mir.event_id = $1
ORDER BY mir.macro_impact_score DESC""",
event_id,
)
impact_list = []
for imp in impacts:
imp_dict = _row_to_dict(imp)
imp_dict["contributing_factors"] = _parse_jsonb(imp_dict.get("contributing_factors"))
impact_list.append(imp_dict)
result["impacts"] = impact_list
return result
@app.get("/api/macro/impacts/{ticker}")
async def get_macro_impacts_for_ticker(
ticker: str,
since: Optional[str] = None,
limit: int = Query(default=50, le=200),
offset: int = 0,
):
"""Macro impacts and exposure profile for a specific company.
Returns { exposure_profile, impacts } matching the frontend
CompanyMacroImpacts type.
Requirements: 8.2
"""
ticker_upper = ticker.upper()
# Fetch exposure profile for this company
profile_row = await pool.fetchrow(
"""SELECT ep.id, ep.company_id, ep.geographic_revenue_mix,
ep.supply_chain_regions, ep.key_input_commodities,
ep.regulatory_jurisdictions, ep.market_position_tier,
ep.export_dependency_pct, ep.source, ep.confidence,
ep.version, ep.active, ep.created_at, ep.updated_at
FROM exposure_profiles ep
JOIN companies c ON c.id = ep.company_id
WHERE c.ticker = $1 AND ep.active = TRUE
ORDER BY ep.version DESC LIMIT 1""",
ticker_upper,
)
exposure_profile = _row_to_dict(profile_row) if profile_row else None
if exposure_profile:
exposure_profile["geographic_revenue_mix"] = _parse_jsonb(
exposure_profile.get("geographic_revenue_mix")
)
# Fetch macro impact records
conditions = ["mir.ticker = $1"]
params: list[Any] = [ticker_upper]
idx = 2
if since:
conditions.append(f"mir.computed_at >= ${idx}::timestamptz")
params.append(since)
idx += 1
where = " AND ".join(conditions)
rows = await pool.fetch(
f"""SELECT mir.id, mir.event_id, mir.company_id, mir.ticker,
mir.macro_impact_score, mir.impact_direction,
mir.contributing_factors, mir.confidence, mir.computed_at,
ge.summary AS event_summary, ge.severity AS event_severity,
ge.event_types AS event_types, ge.affected_regions
FROM macro_impact_records mir
JOIN global_events ge ON ge.id = mir.event_id
WHERE {where}
ORDER BY mir.computed_at DESC
LIMIT ${idx} OFFSET ${idx + 1}""",
*params, limit, offset,
)
impacts = []
for r in rows:
d = _row_to_dict(r)
d["contributing_factors"] = _parse_jsonb(d.get("contributing_factors"))
impacts.append(d)
return {
"exposure_profile": exposure_profile,
"impacts": impacts,
}
# ---------------------------------------------------------------------------
# Trend Projections (Requirement 12.10)
# ---------------------------------------------------------------------------
@app.get("/api/trends/{trend_id}/projection")
async def get_trend_projection(trend_id: str):
"""Trend projection for a specific trend window.
Requirements: 12.10
"""
# Verify trend exists
trend_row = await pool.fetchrow(
"SELECT id FROM trend_windows WHERE id = $1", trend_id,
)
if not trend_row:
raise HTTPException(404, "Trend not found")
row = await pool.fetchrow(
"""SELECT id, trend_window_id, projected_direction, projected_strength,
projected_confidence, projection_horizon, driving_factors,
macro_contribution_pct, diverges_from_current, computed_at
FROM trend_projections WHERE trend_window_id = $1
ORDER BY computed_at DESC LIMIT 1""",
trend_id,
)
if not row:
return {"trend_window_id": trend_id, "projection": None}
d = _row_to_dict(row)
d["driving_factors"] = _parse_jsonb(d.get("driving_factors"))
return d
# ---------------------------------------------------------------------------
# Competitive Layer Toggle (Requirements 6.1, 6.2, 6.3, 6.4, 6.5, 6.7)
# ---------------------------------------------------------------------------
class CompetitiveToggleBody(BaseModel):
enabled: bool
operator: str = "operator"
@app.get("/api/admin/competitive/status")
async def get_competitive_status():
"""Return the current competitive signal layer enabled/disabled state.
Reads from the active risk_configs row's JSONB config field.
Requirements: 6.1, 6.5
"""
row = await pool.fetchrow(
"""SELECT config->>'competitive_enabled' AS competitive_enabled
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1""",
)
if row is None or row["competitive_enabled"] is None:
return {"competitive_enabled": True, "source": "default"}
return {
"competitive_enabled": row["competitive_enabled"].lower() == "true",
"source": "risk_configs",
}
@app.put("/api/admin/competitive/toggle")
async def toggle_competitive_layer(body: CompetitiveToggleBody):
"""Toggle the competitive signal layer on or off.
Persists the new state into the active risk_configs row's JSONB config
and records an audit event with previous state, new state, and operator.
Toggle state is read from PostgreSQL at the start of each aggregation
cycle (no caching), so changes take effect on the next cycle.
When disabled, pattern mining remains queryable via API but signal
propagation is skipped during aggregation. When re-enabled, the engine
resumes computing signals using latest historical data including
intelligence ingested while disabled.
Requirements: 6.1, 6.2, 6.3, 6.4, 6.5, 6.7
"""
# Read current state
current_row = await pool.fetchrow(
"""SELECT id, config->>'competitive_enabled' AS competitive_enabled
FROM risk_configs
WHERE active = TRUE
ORDER BY updated_at DESC
LIMIT 1""",
)
if current_row is None:
# No active config exists — create one
new_config = json.dumps({"competitive_enabled": str(body.enabled).lower()})
current_row = await pool.fetchrow(
"""INSERT INTO risk_configs (name, trading_mode, config, active)
VALUES ('default', 'paper', $1::jsonb, TRUE)
RETURNING id, config->>'competitive_enabled' AS competitive_enabled""",
new_config,
)
previous_enabled = True # default was enabled
else:
prev_val = current_row["competitive_enabled"]
previous_enabled = prev_val.lower() == "true" if prev_val else True
config_id = str(current_row["id"])
# Update the config JSONB to set competitive_enabled
await pool.execute(
"""UPDATE risk_configs
SET config = config || $2::jsonb, updated_at = NOW()
WHERE id = $1""",
current_row["id"],
json.dumps({"competitive_enabled": str(body.enabled).lower()}),
)
# Record audit event (Requirement 6.7)
await record_audit_event(
pool,
event_type="competitive.layer_toggled",
entity_type="risk_config",
entity_id=config_id,
data={
"previous_enabled": previous_enabled,
"new_enabled": body.enabled,
},
actor=body.operator,
)
return {
"competitive_enabled": body.enabled,
"previous_enabled": previous_enabled,
"toggled_by": body.operator,
}
# ---------------------------------------------------------------------------
# Historical Pattern & Competitive Signal Query Endpoints
# (Requirements 10.1, 10.2, 10.3, 10.4, 11.4, 11.6)
# ---------------------------------------------------------------------------
def _pattern_to_dict(p) -> dict[str, Any]:
"""Convert a HistoricalPattern dataclass to a JSON-safe dict."""
d = asdict(p)
for key, val in d.items():
if isinstance(val, datetime):
d[key] = val.isoformat()
return d
@app.get("/api/patterns/{ticker}")
async def get_patterns_for_ticker(
ticker: str,
catalyst_type: Optional[str] = None,
time_horizon: Optional[str] = None,
):
"""Historical patterns for a company.
Filterable by catalyst_type and time_horizon.
Returns sample_count, outcome distribution, pattern_confidence,
and date range for each pattern.
Requirements: 10.1, 10.3
"""
horizons = [time_horizon] if time_horizon else None
if catalyst_type:
patterns = await find_self_patterns(pool, ticker, catalyst_type, horizons=horizons)
else:
# Query across all catalyst types present in the company's history
rows = await pool.fetch(
"""SELECT DISTINCT dir.catalyst_type
FROM document_impact_records dir
JOIN document_intelligence di ON di.id = dir.intelligence_id
JOIN documents d ON d.id = di.document_id
WHERE dir.ticker = $1
AND d.status != 'rejected'
AND dir.catalyst_type IS NOT NULL""",
ticker,
)
patterns = []
for row in rows:
ct = row["catalyst_type"]
patterns.extend(await find_self_patterns(pool, ticker, ct, horizons=horizons))
return {
"ticker": ticker,
"patterns": [_pattern_to_dict(p) for p in patterns],
"count": len(patterns),
}
@app.get("/api/patterns/{ticker}/competitors")
async def get_competitor_patterns(
ticker: str,
catalyst_type: Optional[str] = None,
time_horizon: Optional[str] = None,
):
"""Cross-company patterns showing how this company's catalysts affected competitors.
Requirements: 10.2, 10.3
"""
horizons = [time_horizon] if time_horizon else None
# Find active competitors for this ticker
comp_rows = await pool.fetch(
"""SELECT DISTINCT
CASE WHEN ca.ticker = $1 THEN cb.ticker ELSE ca.ticker END AS competitor_ticker
FROM competitor_relationships cr
JOIN companies ca ON ca.id = cr.company_a_id
JOIN companies cb ON cb.id = cr.company_b_id
WHERE cr.active = TRUE
AND (ca.ticker = $1 OR cb.ticker = $1)""",
ticker,
)
# Determine catalyst types to query
if catalyst_type:
catalyst_types = [catalyst_type]
else:
ct_rows = await pool.fetch(
"""SELECT DISTINCT di.catalyst_type
FROM document_impact_records dir
JOIN document_intelligence di ON di.document_id = dir.document_id
JOIN documents d ON d.id = dir.document_id
WHERE dir.ticker = $1
AND di.validation_status = 'valid'
AND d.status != 'rejected'
AND di.catalyst_type IS NOT NULL""",
ticker,
)
catalyst_types = [r["catalyst_type"] for r in ct_rows]
patterns = []
for comp_row in comp_rows:
comp_ticker = comp_row["competitor_ticker"]
for ct in catalyst_types:
cross = await find_cross_company_patterns(
pool, ticker, comp_ticker, ct, horizons=horizons,
)
patterns.extend(cross)
return {
"ticker": ticker,
"cross_company_patterns": [_pattern_to_dict(p) for p in patterns],
"count": len(patterns),
}
@app.get("/api/patterns/{ticker}/competitive-signals")
async def get_competitive_signals(ticker: str):
"""Recent competitive signals targeting this company.
Requirements: 10.4
"""
rows = await pool.fetch(
"""SELECT id, source_document_id, source_ticker, target_ticker,
catalyst_type, pattern_confidence, signal_direction,
signal_strength, relationship_strength, computed_at
FROM competitive_signal_records
WHERE target_ticker = $1
ORDER BY computed_at DESC
LIMIT 100""",
ticker,
)
return {
"ticker": ticker,
"competitive_signals": [_row_to_dict(r) for r in rows],
"count": len(rows),
}
@app.get("/api/patterns/{ticker}/decisions")
async def get_decision_history(
ticker: str,
time_horizon: Optional[str] = None,
):
"""Major corporate decision history with trend outcomes and pattern statistics.
Queries document_impact_records filtered by MAJOR_DECISION_CATALYSTS,
joined with trend_windows for outcome data.
Requirements: 11.4, 11.6
"""
major_types = list(MAJOR_DECISION_CATALYSTS)
horizons = [time_horizon] if time_horizon else None
# Fetch major decision records for this ticker
rows = await pool.fetch(
"""SELECT dir.id, dir.document_id, dir.ticker,
di.catalyst_type, di.summary,
dir.impact_score, dir.created_at,
d.published_at
FROM document_impact_records dir
JOIN document_intelligence di ON di.document_id = dir.document_id
JOIN documents d ON d.id = dir.document_id
WHERE dir.ticker = $1
AND di.validation_status = 'valid'
AND d.status != 'rejected'
AND di.catalyst_type = ANY($2)
ORDER BY dir.created_at DESC
LIMIT 50""",
ticker,
major_types,
)
decisions = []
for row in rows:
decision = _row_to_dict(row)
# Fetch pattern statistics for this catalyst type
ct = row["catalyst_type"]
patterns = await find_self_patterns(pool, ticker, ct, horizons=horizons)
decision["pattern_statistics"] = [_pattern_to_dict(p) for p in patterns]
decisions.append(decision)
return {
"ticker": ticker,
"decisions": decisions,
"count": len(decisions),
}
# ---------------------------------------------------------------------------
# AI Agents (Editable agent configurations + performance tracking)
# ---------------------------------------------------------------------------
class AgentUpdateBody(BaseModel):
name: Optional[str] = None
purpose: Optional[str] = None
model_provider: Optional[str] = None
model_name: Optional[str] = None
system_prompt: Optional[str] = None
user_prompt_template: Optional[str] = None
prompt_version: Optional[str] = None
schema_version: Optional[str] = None
temperature: Optional[float] = None
max_tokens: Optional[int] = None
timeout_seconds: Optional[int] = None
max_retries: Optional[int] = None
active: Optional[bool] = None
class AgentCreateBody(BaseModel):
name: str
slug: str | None = None
purpose: str = ""
model_provider: str = "ollama"
model_name: str = "llama3.1:8b"
system_prompt: str = ""
user_prompt_template: str = ""
prompt_version: str = ""
schema_version: str = "1.0.0"
temperature: float = 0.0
max_tokens: int = 32768
timeout_seconds: int = 120
max_retries: int = 2
# ---------------------------------------------------------------------------
# Variant Pydantic Models (Requirement 2, 3)
# ---------------------------------------------------------------------------
class VariantCreateBody(BaseModel):
variant_name: str
variant_slug: str | None = None
description: str = ""
model_provider: str = "ollama"
model_name: str
system_prompt: str = ""
user_prompt_template: str = ""
prompt_version: str = ""
temperature: float = 0.0
max_tokens: int = 32768
context_window: int = 0
input_token_limit: int = 0
token_budget: int = 0
timeout_seconds: int = 120
max_retries: int = 2
class VariantUpdateBody(BaseModel):
variant_name: str | None = None
description: str | None = None
model_provider: str | None = None
model_name: str | None = None
system_prompt: str | None = None
user_prompt_template: str | None = None
prompt_version: str | None = None
temperature: float | None = None
max_tokens: int | None = None
context_window: int | None = None
input_token_limit: int | None = None
token_budget: int | None = None
timeout_seconds: int | None = None
max_retries: int | None = None
class VariantCloneBody(BaseModel):
variant_name: str
variant_slug: str | None = None
description: str | None = None
model_provider: str | None = None
model_name: str | None = None
system_prompt: str | None = None
user_prompt_template: str | None = None
prompt_version: str | None = None
temperature: float | None = None
max_tokens: int | None = None
context_window: int | None = None
input_token_limit: int | None = None
token_budget: int | None = None
timeout_seconds: int | None = None
max_retries: int | None = None
def _slugify(name: str) -> str:
"""Generate a URL-safe slug from a variant name."""
slug = re.sub(r"[^a-z0-9]+", "-", name.lower())
return slug.strip("-")
@app.get("/api/agents")
async def list_agents(active_only: bool = False):
"""List all AI agent configurations."""
where = "WHERE active = TRUE" if active_only else ""
rows = await pool.fetch(
f"""SELECT id, name, slug, purpose, model_provider, model_name,
system_prompt, user_prompt_template, prompt_version,
schema_version, temperature, max_tokens, timeout_seconds,
max_retries, active, source, created_at, updated_at
FROM ai_agents {where}
ORDER BY source DESC, name ASC"""
)
return [_row_to_dict(r) for r in rows]
@app.get("/api/agents/{agent_id}")
async def get_agent(agent_id: str):
"""Get a single agent configuration."""
row = await pool.fetchrow(
"""SELECT id, name, slug, purpose, model_provider, model_name,
system_prompt, user_prompt_template, prompt_version,
schema_version, temperature, max_tokens, timeout_seconds,
max_retries, active, source, created_at, updated_at
FROM ai_agents WHERE id = $1""",
agent_id,
)
if not row:
raise HTTPException(404, "Agent not found")
return _row_to_dict(row)
@app.post("/api/agents", status_code=201)
async def create_agent(body: AgentCreateBody):
"""Create a new user-defined agent."""
slug = body.slug or body.name.lower().replace(" ", "-").replace("_", "-")
row = await pool.fetchrow(
"""INSERT INTO ai_agents (
name, slug, purpose, model_provider, model_name,
system_prompt, user_prompt_template, prompt_version,
schema_version, temperature, max_tokens, timeout_seconds,
max_retries, source
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, 'user')
RETURNING id, name, slug, source, created_at""",
body.name, slug, body.purpose, body.model_provider, body.model_name,
body.system_prompt, body.user_prompt_template, body.prompt_version,
body.schema_version, body.temperature, body.max_tokens, body.timeout_seconds,
body.max_retries,
)
return _row_to_dict(row)
@app.put("/api/agents/{agent_id}")
async def update_agent(agent_id: str, body: AgentUpdateBody):
"""Update an agent configuration.
Both system and user agents can be edited. User changes are preserved
across reinstalls because migration 026 only inserts system agents
that don't already exist (by slug).
"""
updates: list[str] = []
params: list[Any] = []
idx = 1
for field_name, value in body.model_dump(exclude_none=True).items():
updates.append(f"{field_name} = ${idx}")
params.append(value)
idx += 1
if not updates:
raise HTTPException(400, "No fields to update")
updates.append("updated_at = NOW()")
set_clause = ", ".join(updates)
params.append(agent_id)
row = await pool.fetchrow(
f"""UPDATE ai_agents SET {set_clause}
WHERE id = ${idx}
RETURNING id, name, slug, purpose, model_provider, model_name,
system_prompt, user_prompt_template, prompt_version,
schema_version, temperature, max_tokens, timeout_seconds,
max_retries, active, source, created_at, updated_at""",
*params,
)
if not row:
raise HTTPException(404, "Agent not found")
return _row_to_dict(row)
@app.delete("/api/agents/{agent_id}")
async def delete_agent(agent_id: str):
"""Delete a user-created agent. System agents cannot be deleted."""
row = await pool.fetchrow(
"SELECT source FROM ai_agents WHERE id = $1", agent_id,
)
if not row:
raise HTTPException(404, "Agent not found")
if row["source"] == "system":
raise HTTPException(403, "Cannot delete system agents — deactivate instead")
await pool.execute("DELETE FROM ai_agents WHERE id = $1", agent_id)
return {"deleted": True}
@app.get("/api/agents/{agent_id}/performance")
async def get_agent_performance(agent_id: str, hours: int = Query(default=24, le=720)):
"""Get aggregated performance metrics for an agent."""
row = await pool.fetchrow(
"""SELECT
COUNT(*) AS total_invocations,
COUNT(*) FILTER (WHERE success) AS successes,
COUNT(*) FILTER (WHERE NOT success) AS failures,
ROUND(AVG(duration_ms)::numeric) AS avg_duration_ms,
ROUND(PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration_ms)::numeric) AS p95_duration_ms,
ROUND(AVG(confidence)::numeric, 4) AS avg_confidence,
ROUND(AVG(retry_count)::numeric, 2) AS avg_retries,
SUM(input_tokens) AS total_input_tokens,
SUM(output_tokens) AS total_output_tokens
FROM agent_performance_log
WHERE agent_id = $1
AND recorded_at >= NOW() - make_interval(hours => $2)""",
agent_id, hours,
)
d = _row_to_dict(row) if row else {}
total = int(d.get("total_invocations", 0) or 0)
successes = int(d.get("successes", 0) or 0)
d["success_rate"] = round(successes / total, 4) if total > 0 else None
return d
@app.get("/api/agents/{agent_id}/performance/history")
async def get_agent_performance_history(
agent_id: str,
hours: int = Query(default=24, le=720),
):
"""Get hourly performance time-series for an agent."""
rows = await pool.fetch(
"""SELECT
date_trunc('hour', recorded_at) AS hour,
COUNT(*) AS invocations,
COUNT(*) FILTER (WHERE success) AS successes,
ROUND(AVG(duration_ms)::numeric) AS avg_duration_ms,
ROUND(AVG(confidence)::numeric, 4) AS avg_confidence
FROM agent_performance_log
WHERE agent_id = $1
AND recorded_at >= NOW() - make_interval(hours => $2)
GROUP BY 1 ORDER BY 1""",
agent_id, hours,
)
return [_row_to_dict(r) for r in rows]
# ---------------------------------------------------------------------------
# Agent Variants (Requirements 2, 3, 4, 6, 10)
# ---------------------------------------------------------------------------
@app.get("/api/agents/{agent_id}/variants")
async def list_variants(agent_id: str):
"""List all variants for an agent, ordered by created_at ascending.
Requirement 3.1
"""
rows = await pool.fetch(
"""SELECT id, agent_id, variant_name, variant_slug, description,
model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries,
is_active, created_at, updated_at
FROM agent_variants
WHERE agent_id = $1
ORDER BY created_at ASC""",
agent_id,
)
return [_row_to_dict(r) for r in rows]
@app.get("/api/agents/{agent_id}/variants/{variant_id}")
async def get_variant(agent_id: str, variant_id: str):
"""Get a single variant. Returns 404 if not found or agent mismatch.
Requirement 3.2
"""
row = await pool.fetchrow(
"""SELECT id, agent_id, variant_name, variant_slug, description,
model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries,
is_active, created_at, updated_at
FROM agent_variants
WHERE id = $1 AND agent_id = $2""",
variant_id, agent_id,
)
if not row:
raise HTTPException(404, "Variant not found")
return _row_to_dict(row)
@app.post("/api/agents/{agent_id}/variants", status_code=201)
async def create_variant(agent_id: str, body: VariantCreateBody):
"""Create a new variant for an agent.
Auto-generates slug from variant_name if not provided.
Returns 409 on duplicate slug.
Requirement 3
"""
slug = body.variant_slug or _slugify(body.variant_name)
try:
row = await pool.fetchrow(
"""INSERT INTO agent_variants (
agent_id, variant_name, variant_slug, description,
model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING id, agent_id, variant_name, variant_slug, description,
model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries,
is_active, created_at, updated_at""",
agent_id, body.variant_name, slug, body.description,
body.model_provider, body.model_name, body.system_prompt,
body.user_prompt_template, body.prompt_version, body.temperature,
body.max_tokens, body.context_window, body.input_token_limit,
body.token_budget, body.timeout_seconds, body.max_retries,
)
except asyncpg.UniqueViolationError:
raise HTTPException(409, f"Variant slug '{slug}' already exists for this agent")
return _row_to_dict(row)
@app.put("/api/agents/{agent_id}/variants/{variant_id}")
async def update_variant(agent_id: str, variant_id: str, body: VariantUpdateBody):
"""Partial update a variant. Sets updated_at = NOW().
Requirement 3.4
"""
updates: list[str] = []
params: list[Any] = []
idx = 1
for field_name, value in body.model_dump(exclude_none=True).items():
updates.append(f"{field_name} = ${idx}")
params.append(value)
idx += 1
if not updates:
raise HTTPException(400, "No fields to update")
updates.append("updated_at = NOW()")
set_clause = ", ".join(updates)
params.append(variant_id)
params.append(agent_id)
row = await pool.fetchrow(
f"""UPDATE agent_variants SET {set_clause}
WHERE id = ${idx} AND agent_id = ${idx + 1}
RETURNING id, agent_id, variant_name, variant_slug, description,
model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries,
is_active, created_at, updated_at""",
*params,
)
if not row:
raise HTTPException(404, "Variant not found")
return _row_to_dict(row)
@app.delete("/api/agents/{agent_id}/variants/{variant_id}")
async def delete_variant(agent_id: str, variant_id: str):
"""Delete a variant. Returns 400 if variant is currently active.
Requirement 3.5, 3.6
"""
row = await pool.fetchrow(
"SELECT is_active FROM agent_variants WHERE id = $1 AND agent_id = $2",
variant_id, agent_id,
)
if not row:
raise HTTPException(404, "Variant not found")
if row["is_active"]:
raise HTTPException(400, "Cannot delete active variant — deactivate it first")
await pool.execute(
"DELETE FROM agent_variants WHERE id = $1 AND agent_id = $2",
variant_id, agent_id,
)
return {"deleted": True}
# ---------------------------------------------------------------------------
# Clone Endpoints (Requirement 2)
# ---------------------------------------------------------------------------
@app.post("/api/agents/{agent_id}/clone", status_code=201)
async def clone_agent_as_variant(agent_id: str, body: VariantCloneBody):
"""Clone an agent's configuration as a new variant.
Copies the agent's model/prompt/parameter fields into a new variant,
with optional overrides from the request body.
Requirement 2.1, 2.3, 2.4, 2.5, 2.6
"""
agent = await pool.fetchrow(
"""SELECT model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, timeout_seconds, max_retries
FROM ai_agents WHERE id = $1""",
agent_id,
)
if not agent:
raise HTTPException(404, "Agent not found")
slug = body.variant_slug or _slugify(body.variant_name)
description = body.description if body.description is not None else ""
model_provider = body.model_provider if body.model_provider is not None else agent["model_provider"]
model_name = body.model_name if body.model_name is not None else agent["model_name"]
system_prompt = body.system_prompt if body.system_prompt is not None else agent["system_prompt"]
user_prompt_template = body.user_prompt_template if body.user_prompt_template is not None else agent["user_prompt_template"]
prompt_version = body.prompt_version if body.prompt_version is not None else agent["prompt_version"]
temperature = body.temperature if body.temperature is not None else agent["temperature"]
max_tokens = body.max_tokens if body.max_tokens is not None else agent["max_tokens"]
timeout_seconds = body.timeout_seconds if body.timeout_seconds is not None else agent["timeout_seconds"]
max_retries = body.max_retries if body.max_retries is not None else agent["max_retries"]
# ai_agents table doesn't have these columns — default to 0
context_window = body.context_window if body.context_window is not None else 0
input_token_limit = body.input_token_limit if body.input_token_limit is not None else 0
token_budget = body.token_budget if body.token_budget is not None else 0
try:
row = await pool.fetchrow(
"""INSERT INTO agent_variants (
agent_id, variant_name, variant_slug, description,
model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING id, agent_id, variant_name, variant_slug, description,
model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries,
is_active, created_at, updated_at""",
agent_id, body.variant_name, slug, description,
model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries,
)
except asyncpg.UniqueViolationError:
raise HTTPException(409, f"Variant slug '{slug}' already exists for this agent")
return _row_to_dict(row)
@app.post("/api/agents/{agent_id}/variants/{variant_id}/clone", status_code=201)
async def clone_variant(agent_id: str, variant_id: str, body: VariantCloneBody):
"""Clone an existing variant as a new variant under the same agent.
Copies all configuration fields from the source variant,
with optional overrides from the request body.
Requirement 2.2, 2.3, 2.4, 2.5, 2.6
"""
source = await pool.fetchrow(
"""SELECT model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries,
description
FROM agent_variants
WHERE id = $1 AND agent_id = $2""",
variant_id, agent_id,
)
if not source:
raise HTTPException(404, "Source variant not found")
slug = body.variant_slug or _slugify(body.variant_name)
description = body.description if body.description is not None else source["description"]
model_provider = body.model_provider if body.model_provider is not None else source["model_provider"]
model_name = body.model_name if body.model_name is not None else source["model_name"]
system_prompt = body.system_prompt if body.system_prompt is not None else source["system_prompt"]
user_prompt_template = body.user_prompt_template if body.user_prompt_template is not None else source["user_prompt_template"]
prompt_version = body.prompt_version if body.prompt_version is not None else source["prompt_version"]
temperature = body.temperature if body.temperature is not None else source["temperature"]
max_tokens = body.max_tokens if body.max_tokens is not None else source["max_tokens"]
context_window = body.context_window if body.context_window is not None else source["context_window"]
input_token_limit = body.input_token_limit if body.input_token_limit is not None else source["input_token_limit"]
token_budget = body.token_budget if body.token_budget is not None else source["token_budget"]
timeout_seconds = body.timeout_seconds if body.timeout_seconds is not None else source["timeout_seconds"]
max_retries = body.max_retries if body.max_retries is not None else source["max_retries"]
try:
row = await pool.fetchrow(
"""INSERT INTO agent_variants (
agent_id, variant_name, variant_slug, description,
model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING id, agent_id, variant_name, variant_slug, description,
model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries,
is_active, created_at, updated_at""",
agent_id, body.variant_name, slug, description,
model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries,
)
except asyncpg.UniqueViolationError:
raise HTTPException(409, f"Variant slug '{slug}' already exists for this agent")
return _row_to_dict(row)
# ---------------------------------------------------------------------------
# Activate / Deactivate Endpoints (Requirement 4)
# ---------------------------------------------------------------------------
@app.post("/api/agents/{agent_id}/variants/{variant_id}/activate")
async def activate_variant(agent_id: str, variant_id: str):
"""Set a variant as the active variant for its agent.
Within a single transaction: deactivate any currently active variant,
then activate the target variant.
Requirement 4.1, 4.5
"""
async with pool.acquire() as conn:
async with conn.transaction():
# Deactivate any currently active variant for this agent
await conn.execute(
"""UPDATE agent_variants SET is_active = FALSE, updated_at = NOW()
WHERE agent_id = $1 AND is_active = TRUE""",
agent_id,
)
# Activate the target variant
row = await conn.fetchrow(
"""UPDATE agent_variants SET is_active = TRUE, updated_at = NOW()
WHERE id = $1 AND agent_id = $2
RETURNING id, agent_id, variant_name, variant_slug, description,
model_provider, model_name, system_prompt, user_prompt_template,
prompt_version, temperature, max_tokens, context_window,
input_token_limit, token_budget, timeout_seconds, max_retries,
is_active, created_at, updated_at""",
variant_id, agent_id,
)
if not row:
raise HTTPException(404, "Variant not found")
return _row_to_dict(row)
@app.post("/api/agents/{agent_id}/variants/deactivate")
async def deactivate_variants(agent_id: str):
"""Deactivate the currently active variant for an agent.
The agent falls back to its base configuration.
Requirement 4.2
"""
await pool.execute(
"""UPDATE agent_variants SET is_active = FALSE, updated_at = NOW()
WHERE agent_id = $1 AND is_active = TRUE""",
agent_id,
)
return {"deactivated": True}
# ---------------------------------------------------------------------------
# Per-Variant Performance Endpoints (Requirement 6)
# ---------------------------------------------------------------------------
@app.get("/api/agents/{agent_id}/variants/{variant_id}/performance")
async def get_variant_performance(
agent_id: str,
variant_id: str,
hours: int = Query(default=24, le=720),
):
"""Aggregated performance metrics for a specific variant.
Requirement 6.3
"""
row = await pool.fetchrow(
"""SELECT
COUNT(*) AS total_invocations,
COUNT(*) FILTER (WHERE success) AS successes,
COUNT(*) FILTER (WHERE NOT success) AS failures,
ROUND(AVG(duration_ms)::numeric) AS avg_duration_ms,
ROUND(PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration_ms)::numeric) AS p95_duration_ms,
ROUND(AVG(confidence)::numeric, 4) AS avg_confidence,
ROUND(AVG(retry_count)::numeric, 2) AS avg_retries,
SUM(input_tokens) AS total_input_tokens,
SUM(output_tokens) AS total_output_tokens
FROM agent_performance_log
WHERE agent_id = $1
AND variant_id = $2
AND recorded_at >= NOW() - make_interval(hours => $3)""",
agent_id, variant_id, hours,
)
d = _row_to_dict(row) if row else {}
total = int(d.get("total_invocations", 0) or 0)
successes = int(d.get("successes", 0) or 0)
d["success_rate"] = round(successes / total, 4) if total > 0 else None
return d
@app.get("/api/agents/{agent_id}/variants/{variant_id}/performance/history")
async def get_variant_performance_history(
agent_id: str,
variant_id: str,
hours: int = Query(default=24, le=720),
):
"""Hourly performance time-series for a specific variant.
Requirement 6.4
"""
rows = await pool.fetch(
"""SELECT
date_trunc('hour', recorded_at) AS hour,
COUNT(*) AS invocations,
COUNT(*) FILTER (WHERE success) AS successes,
ROUND(AVG(duration_ms)::numeric) AS avg_duration_ms,
ROUND(AVG(confidence)::numeric, 4) AS avg_confidence
FROM agent_performance_log
WHERE agent_id = $1
AND variant_id = $2
AND recorded_at >= NOW() - make_interval(hours => $3)
GROUP BY 1 ORDER BY 1""",
agent_id, variant_id, hours,
)
return [_row_to_dict(r) for r in rows]
# ---------------------------------------------------------------------------
# Model Validation Dashboard (Requirements 12.1, 12.2, 12.3, 12.7)
# ---------------------------------------------------------------------------
_VALID_LOOKBACKS = {"7d", "30d", "90d", "all"}
_VALID_HORIZONS = {"1h", "6h", "1d", "7d", "30d"}
@app.get("/api/validation/summary")
async def get_validation_summary(
lookback: str = Query(default="30d"),
horizon: str = Query(default="7d"),
):
"""Latest model metric snapshot plus quality gate status.
Returns the most recent model_metric_snapshot for the given
lookback/horizon combination, along with the current gate status
from risk_configs.
Requirement 12.1
"""
if lookback not in _VALID_LOOKBACKS:
raise HTTPException(400, f"Invalid lookback: {lookback}. Must be one of {sorted(_VALID_LOOKBACKS)}")
if horizon not in _VALID_HORIZONS:
raise HTTPException(400, f"Invalid horizon: {horizon}. Must be one of {sorted(_VALID_HORIZONS)}")
# Latest metric snapshot for the requested lookback/horizon
snapshot_row = await pool.fetchrow(
"""SELECT id, generated_at, lookback_window, horizon,
prediction_count, win_rate, directional_accuracy,
information_coefficient, rank_information_coefficient,
avg_return, avg_excess_return_vs_spy, avg_excess_return_vs_sector,
calibration_error, brier_score,
buy_win_rate, sell_win_rate, hold_win_rate,
metadata
FROM model_metric_snapshots
WHERE lookback_window = $1 AND horizon = $2
ORDER BY generated_at DESC
LIMIT 1""",
lookback, horizon,
)
snapshot = None
if snapshot_row:
snapshot = _row_to_dict(snapshot_row)
snapshot["metadata"] = _parse_jsonb(snapshot.get("metadata"))
# Gate status from risk_configs
gate_row = await pool.fetchrow(
"SELECT config, updated_at FROM risk_configs WHERE name = 'model_quality_gate'",
)
gate_status = None
if gate_row:
gate_status = _parse_jsonb(gate_row["config"])
return {
"snapshot": snapshot,
"gate_status": gate_status,
}
@app.get("/api/validation/calibration")
async def get_validation_calibration(
lookback: str = Query(default="30d"),
horizon: str = Query(default="7d"),
):
"""Calibration table with confidence buckets.
Queries v_prediction_performance for the given lookback/horizon,
groups by confidence buckets, and computes avg_confidence,
observed_win_rate, count, and miscalibrated flag per bucket.
Requirement 12.2
"""
if lookback not in _VALID_LOOKBACKS:
raise HTTPException(400, f"Invalid lookback: {lookback}. Must be one of {sorted(_VALID_LOOKBACKS)}")
if horizon not in _VALID_HORIZONS:
raise HTTPException(400, f"Invalid horizon: {horizon}. Must be one of {sorted(_VALID_HORIZONS)}")
# Build lookback filter
lookback_condition = ""
params: list[Any] = [horizon]
idx = 2
if lookback != "all":
lookback_days = {"7d": 7, "30d": 30, "90d": 90}[lookback]
lookback_condition = f"AND generated_at >= NOW() - make_interval(days => ${idx})"
params.append(lookback_days)
idx += 1
rows = await pool.fetch(
f"""SELECT confidence, direction_correct
FROM v_prediction_performance
WHERE horizon = $1
{lookback_condition}
AND confidence IS NOT NULL""",
*params,
)
# Group into calibration buckets
buckets_def = [
(0.50, 0.60),
(0.60, 0.70),
(0.70, 0.80),
(0.80, 0.90),
(0.90, 1.00),
]
buckets = []
for low, high in buckets_def:
bucket_rows = []
for r in rows:
conf = float(r["confidence"])
if high == 1.00:
in_bucket = low <= conf <= high
else:
in_bucket = low <= conf < high
if in_bucket:
bucket_rows.append(r)
count = len(bucket_rows)
if count == 0:
buckets.append({
"bucket_low": low,
"bucket_high": high,
"avg_confidence": 0.0,
"observed_win_rate": 0.0,
"prediction_count": 0,
"miscalibrated": False,
})
continue
avg_conf = sum(float(r["confidence"]) for r in bucket_rows) / count
win_count = sum(1 for r in bucket_rows if r["direction_correct"] is True)
win_rate = win_count / count
diff = abs(avg_conf - win_rate)
buckets.append({
"bucket_low": low,
"bucket_high": high,
"avg_confidence": round(avg_conf, 4),
"observed_win_rate": round(win_rate, 4),
"prediction_count": count,
"miscalibrated": diff > 0.15,
})
return {"buckets": buckets, "lookback": lookback, "horizon": horizon}
@app.get("/api/validation/ic-by-horizon")
async def get_validation_ic_by_horizon(
lookback: str = Query(default="30d"),
):
"""IC and Rank IC per prediction horizon.
Queries the most recent model_metric_snapshot for the given lookback
across all 5 horizons, returning IC and Rank IC for each.
Requirement 12.3
"""
if lookback not in _VALID_LOOKBACKS:
raise HTTPException(400, f"Invalid lookback: {lookback}. Must be one of {sorted(_VALID_LOOKBACKS)}")
rows = await pool.fetch(
"""SELECT DISTINCT ON (horizon)
horizon,
information_coefficient,
rank_information_coefficient,
prediction_count,
generated_at
FROM model_metric_snapshots
WHERE lookback_window = $1
ORDER BY horizon, generated_at DESC""",
lookback,
)
horizons = []
for r in rows:
horizons.append({
"horizon": r["horizon"],
"information_coefficient": float(r["information_coefficient"]) if r["information_coefficient"] is not None else None,
"rank_information_coefficient": float(r["rank_information_coefficient"]) if r["rank_information_coefficient"] is not None else None,
"prediction_count": r["prediction_count"],
"generated_at": r["generated_at"].isoformat() if r["generated_at"] else None,
})
# Sort by canonical horizon order
horizon_order = {"1h": 0, "6h": 1, "1d": 2, "7d": 3, "30d": 4}
horizons.sort(key=lambda h: horizon_order.get(h["horizon"], 99))
return {"horizons": horizons, "lookback": lookback}
@app.get("/api/validation/gate-status")
async def get_validation_gate_status():
"""Quality gate evaluation detail.
Returns the stored gate evaluation result from risk_configs
where key = 'model_quality_gate'.
Requirement 12.7
"""
gate_row = await pool.fetchrow(
"SELECT config, updated_at FROM risk_configs WHERE name = 'model_quality_gate'",
)
if not gate_row:
return {
"gate_status": None,
"message": "No gate evaluation found. Model metrics may not have been computed yet.",
}
gate_data = _parse_jsonb(gate_row["config"])
updated_at = gate_row["updated_at"].isoformat() if gate_row.get("updated_at") else None
return {
"gate_status": gate_data,
"updated_at": updated_at,
}
# ---------------------------------------------------------------------------
# Attribution Endpoints (Requirements 12.4, 12.5, 12.6)
# ---------------------------------------------------------------------------
_LOOKBACK_TO_DAYS: dict[str, int] = {
"7d": 7,
"30d": 30,
"90d": 90,
"all": 3650,
}
@app.get("/api/validation/attribution/sources")
async def get_validation_attribution_sources(
lookback: str = Query(default="30d"),
horizon: str = Query(default="7d"),
):
"""Per-source performance metrics.
Returns win rate, IC, average return, duplicate rate, and other
attribution metrics for each source, computed over the given
lookback window and prediction horizon.
Requirement 12.4
"""
if lookback not in _VALID_LOOKBACKS:
raise HTTPException(400, f"Invalid lookback: {lookback}. Must be one of {sorted(_VALID_LOOKBACKS)}")
if horizon not in _VALID_HORIZONS:
raise HTTPException(400, f"Invalid horizon: {horizon}. Must be one of {sorted(_VALID_HORIZONS)}")
lookback_days = _LOOKBACK_TO_DAYS[lookback]
try:
results = await compute_source_attribution(pool, lookback_days=lookback_days, horizon=horizon)
except Exception:
logger.exception("Failed to compute source attribution")
raise HTTPException(500, "Failed to compute source attribution")
return {
"sources": [asdict(r) for r in results],
"lookback": lookback,
"horizon": horizon,
}
@app.get("/api/validation/attribution/catalysts")
async def get_validation_attribution_catalysts(
lookback: str = Query(default="30d"),
horizon: str = Query(default="7d"),
):
"""Per-catalyst-type performance metrics.
Returns win rate, IC, average return, and other attribution metrics
for each catalyst type, computed over the given lookback window
and prediction horizon.
Requirement 12.5
"""
if lookback not in _VALID_LOOKBACKS:
raise HTTPException(400, f"Invalid lookback: {lookback}. Must be one of {sorted(_VALID_LOOKBACKS)}")
if horizon not in _VALID_HORIZONS:
raise HTTPException(400, f"Invalid horizon: {horizon}. Must be one of {sorted(_VALID_HORIZONS)}")
lookback_days = _LOOKBACK_TO_DAYS[lookback]
try:
results = await compute_catalyst_attribution(pool, lookback_days=lookback_days, horizon=horizon)
except Exception:
logger.exception("Failed to compute catalyst attribution")
raise HTTPException(500, "Failed to compute catalyst attribution")
return {
"catalysts": [asdict(r) for r in results],
"lookback": lookback,
"horizon": horizon,
}
@app.get("/api/validation/attribution/layers")
async def get_validation_attribution_layers(
lookback: str = Query(default="30d"),
horizon: str = Query(default="7d"),
):
"""Per-signal-layer (company, macro, competitive) performance metrics.
Returns average contribution percentage, dominant win rate, and
dominant IC for each of the three signal layers, computed over
the given lookback window and prediction horizon.
Requirement 12.6
"""
if lookback not in _VALID_LOOKBACKS:
raise HTTPException(400, f"Invalid lookback: {lookback}. Must be one of {sorted(_VALID_LOOKBACKS)}")
if horizon not in _VALID_HORIZONS:
raise HTTPException(400, f"Invalid horizon: {horizon}. Must be one of {sorted(_VALID_HORIZONS)}")
lookback_days = _LOOKBACK_TO_DAYS[lookback]
try:
results = await compute_layer_attribution(pool, lookback_days=lookback_days, horizon=horizon)
except Exception:
logger.exception("Failed to compute layer attribution")
raise HTTPException(500, "Failed to compute layer attribution")
return {
"layers": [asdict(r) for r in results],
"lookback": lookback,
"horizon": horizon,
}
# ---------------------------------------------------------------------------
# Trading Reports
# ---------------------------------------------------------------------------
@app.get("/api/reports")
async def list_reports(
report_type: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
limit: int = Query(default=20, le=100),
offset: int = Query(default=0, ge=0),
):
"""Paginated list of trading reports with optional filtering.
Query params:
- report_type: 'daily' or 'weekly'
- start_date: ISO date (YYYY-MM-DD) — filter period_start >= this
- end_date: ISO date (YYYY-MM-DD) — filter period_end <= this
- limit: max results (default 20, max 100)
- offset: pagination offset (default 0)
Requirements: 5.4, 5.5, 5.6
"""
conditions: list[str] = []
params: list[Any] = []
idx = 1
if report_type:
if report_type not in ("daily", "weekly"):
raise HTTPException(400, "report_type must be 'daily' or 'weekly'")
conditions.append(f"report_type = ${idx}")
params.append(report_type)
idx += 1
if start_date:
try:
from datetime import date as _date
_date.fromisoformat(start_date)
except ValueError:
raise HTTPException(400, "start_date must be YYYY-MM-DD")
conditions.append(f"period_start >= ${idx}::date")
params.append(start_date)
idx += 1
if end_date:
try:
from datetime import date as _date
_date.fromisoformat(end_date)
except ValueError:
raise HTTPException(400, "end_date must be YYYY-MM-DD")
conditions.append(f"period_end <= ${idx}::date")
params.append(end_date)
idx += 1
where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
query = f"""
SELECT id, report_type, period_start, period_end,
validation_status, generated_at
FROM trading_reports
{where}
ORDER BY generated_at DESC
LIMIT ${idx} OFFSET ${idx + 1}
"""
params.extend([limit, offset])
rows = await pool.fetch(query, *params)
return [
{
"id": str(r["id"]),
"report_type": r["report_type"],
"period_start": r["period_start"].isoformat(),
"period_end": r["period_end"].isoformat(),
"validation_status": r["validation_status"],
"generated_at": r["generated_at"].isoformat(),
}
for r in rows
]
@app.get("/api/reports/{report_id}")
async def get_report(report_id: str):
"""Fetch a single report including full report_data JSONB.
Requirements: 5.4, 5.5
"""
row = await pool.fetchrow(
"""SELECT id, report_type, period_start, period_end,
report_data, validation_status, generated_at, created_at
FROM trading_reports
WHERE id = $1::uuid""",
report_id,
)
if row is None:
raise HTTPException(404, "Report not found")
return {
"id": str(row["id"]),
"report_type": row["report_type"],
"period_start": row["period_start"].isoformat(),
"period_end": row["period_end"].isoformat(),
"report_data": json.loads(row["report_data"]) if isinstance(row["report_data"], str) else row["report_data"],
"validation_status": row["validation_status"],
"generated_at": row["generated_at"].isoformat(),
"created_at": row["created_at"].isoformat(),
}