feat: add live queue depths to pipeline health API and dashboard

This commit is contained in:
Celes Renata
2026-04-16 07:49:07 +00:00
parent 8050f4a03b
commit cdc825619e
2 changed files with 45 additions and 2 deletions
+28
View File
@@ -2,6 +2,16 @@ import { useState } from 'react';
import { usePipelineHealth } from '../api/hooks'; import { usePipelineHealth } from '../api/hooks';
import { LoadingSpinner, DateRangeSelector, Card } from '../components/ui'; import { LoadingSpinner, DateRangeSelector, Card } from '../components/ui';
const QUEUE_LABELS: Record<string, string> = {
ingestion: 'Ingestion',
parsing: 'Parsing',
extraction: 'Extraction',
macro_classification: 'Macro Classification',
aggregation: 'Aggregation',
recommendation: 'Recommendation',
broker_orders: 'Broker Orders',
};
export function OpsPipelinePage() { export function OpsPipelinePage() {
const [hours, setHours] = useState(24); const [hours, setHours] = useState(24);
const { data, isLoading } = usePipelineHealth(hours); const { data, isLoading } = usePipelineHealth(hours);
@@ -12,6 +22,7 @@ export function OpsPipelinePage() {
const parsing = (data?.parsing ?? {}) as Record<string, unknown>; const parsing = (data?.parsing ?? {}) as Record<string, unknown>;
const extraction = (data?.extraction ?? {}) as Record<string, unknown>; const extraction = (data?.extraction ?? {}) as Record<string, unknown>;
const aggregation = (data?.aggregation ?? {}) as Record<string, unknown>; const aggregation = (data?.aggregation ?? {}) as Record<string, unknown>;
const queueDepths = (data?.queue_depths ?? {}) as Record<string, number>;
return ( return (
<div className="space-y-6"> <div className="space-y-6">
@@ -20,6 +31,23 @@ export function OpsPipelinePage() {
<DateRangeSelector value={hours} onChange={setHours} /> <DateRangeSelector value={hours} onChange={setHours} />
</div> </div>
{/* Queue Depths */}
<Card>
<h2 className="mb-3 text-sm font-medium text-gray-400">Queue Depths (live)</h2>
<div className="grid grid-cols-2 gap-3 sm:grid-cols-4 lg:grid-cols-7">
{Object.entries(QUEUE_LABELS).map(([key, label]) => {
const depth = queueDepths[key] ?? 0;
const color = depth > 100 ? 'text-amber-400' : depth > 0 ? 'text-blue-400' : 'text-gray-500';
return (
<div key={key} className="rounded-lg border border-surface-700 bg-surface-950 p-3 text-center">
<div className={`text-2xl font-bold ${color}`}>{depth}</div>
<div className="text-xs text-gray-500">{label}</div>
</div>
);
})}
</div>
</Card>
{/* Document Stage Counts */} {/* Document Stage Counts */}
<Card> <Card>
<h2 className="mb-3 text-sm font-medium text-gray-400">Document Stages</h2> <h2 className="mb-3 text-sm font-medium text-gray-400">Document Stages</h2>
+17 -2
View File
@@ -22,6 +22,7 @@ from typing import Any, Optional
import asyncpg import asyncpg
import httpx import httpx
import redis.asyncio as aioredis
from fastapi import FastAPI, HTTPException, Query, Request from fastapi import FastAPI, HTTPException, Query, Request
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from pydantic import BaseModel from pydantic import BaseModel
@@ -35,7 +36,7 @@ from services.aggregation.pattern_matcher import (
from services.extractor.metrics import get_model_performance_summary from services.extractor.metrics import get_model_performance_summary
from services.shared.audit import get_entity_audit_trail, get_order_audit_trail, record_audit_event from services.shared.audit import get_entity_audit_trail, get_order_audit_trail, record_audit_event
from services.shared.config import load_config from services.shared.config import load_config
from services.shared.db import get_pg_pool from services.shared.db import get_pg_pool, get_redis
from services.shared.logging import new_trace_id, set_trace_context, setup_logging from services.shared.logging import new_trace_id, set_trace_context, setup_logging
from services.shared.schemas import MAJOR_DECISION_CATALYSTS from services.shared.schemas import MAJOR_DECISION_CATALYSTS
@@ -43,15 +44,18 @@ logger = logging.getLogger("query_api")
config = load_config() config = load_config()
pool: Optional[asyncpg.Pool] = None pool: Optional[asyncpg.Pool] = None
rds: Optional[aioredis.Redis] = None
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
global pool global pool, rds
setup_logging("query_api", level=config.log_level, json_output=config.json_logs) setup_logging("query_api", level=config.log_level, json_output=config.json_logs)
pool = await get_pg_pool(config) pool = await get_pg_pool(config)
rds = get_redis(config)
yield yield
await pool.close() await pool.close()
await rds.close()
app = FastAPI(title="Stonks Oracle - Query API", lifespan=lifespan) app = FastAPI(title="Stonks Oracle - Query API", lifespan=lifespan)
@@ -1474,12 +1478,23 @@ async def get_pipeline_health(
hours, hours,
) )
# Queue depths from Redis
queue_depths: dict[str, int] = {}
if rds:
for qname in ("ingestion", "parsing", "extraction", "macro_classification", "aggregation", "recommendation", "broker_orders"):
try:
depth = await rds.llen(f"stonks:queue:{qname}")
queue_depths[qname] = depth
except Exception:
queue_depths[qname] = -1
return { return {
"hours": hours, "hours": hours,
"document_stages": [_row_to_dict(r) for r in doc_stages], "document_stages": [_row_to_dict(r) for r in doc_stages],
"parsing": _row_to_dict(parse_quality) if parse_quality else {}, "parsing": _row_to_dict(parse_quality) if parse_quality else {},
"extraction": _row_to_dict(extraction_stats) if extraction_stats else {}, "extraction": _row_to_dict(extraction_stats) if extraction_stats else {},
"aggregation": _row_to_dict(trend_stats) if trend_stats else {}, "aggregation": _row_to_dict(trend_stats) if trend_stats else {},
"queue_depths": queue_depths,
} }