diff --git a/frontend/src/pages/OpsPipeline.tsx b/frontend/src/pages/OpsPipeline.tsx index 0a77a4d..e0da0ba 100644 --- a/frontend/src/pages/OpsPipeline.tsx +++ b/frontend/src/pages/OpsPipeline.tsx @@ -2,6 +2,16 @@ import { useState } from 'react'; import { usePipelineHealth } from '../api/hooks'; import { LoadingSpinner, DateRangeSelector, Card } from '../components/ui'; +const QUEUE_LABELS: Record = { + ingestion: 'Ingestion', + parsing: 'Parsing', + extraction: 'Extraction', + macro_classification: 'Macro Classification', + aggregation: 'Aggregation', + recommendation: 'Recommendation', + broker_orders: 'Broker Orders', +}; + export function OpsPipelinePage() { const [hours, setHours] = useState(24); const { data, isLoading } = usePipelineHealth(hours); @@ -12,6 +22,7 @@ export function OpsPipelinePage() { const parsing = (data?.parsing ?? {}) as Record; const extraction = (data?.extraction ?? {}) as Record; const aggregation = (data?.aggregation ?? {}) as Record; + const queueDepths = (data?.queue_depths ?? {}) as Record; return (
@@ -20,6 +31,23 @@ export function OpsPipelinePage() {
+ {/* Queue Depths */} + +

Queue Depths (live)

+
+ {Object.entries(QUEUE_LABELS).map(([key, label]) => { + const depth = queueDepths[key] ?? 0; + const color = depth > 100 ? 'text-amber-400' : depth > 0 ? 'text-blue-400' : 'text-gray-500'; + return ( +
+
{depth}
+
{label}
+
+ ); + })} +
+
+ {/* Document Stage Counts */}

Document Stages

diff --git a/services/api/app.py b/services/api/app.py index 65ca1ad..125a6e0 100644 --- a/services/api/app.py +++ b/services/api/app.py @@ -22,6 +22,7 @@ from typing import Any, Optional import asyncpg import httpx +import redis.asyncio as aioredis from fastapi import FastAPI, HTTPException, Query, Request from prometheus_client import CONTENT_TYPE_LATEST, generate_latest from pydantic import BaseModel @@ -35,7 +36,7 @@ from services.aggregation.pattern_matcher import ( from services.extractor.metrics import get_model_performance_summary from services.shared.audit import get_entity_audit_trail, get_order_audit_trail, record_audit_event from services.shared.config import load_config -from services.shared.db import get_pg_pool +from services.shared.db import get_pg_pool, get_redis from services.shared.logging import new_trace_id, set_trace_context, setup_logging from services.shared.schemas import MAJOR_DECISION_CATALYSTS @@ -43,15 +44,18 @@ logger = logging.getLogger("query_api") config = load_config() pool: Optional[asyncpg.Pool] = None +rds: Optional[aioredis.Redis] = None @asynccontextmanager async def lifespan(app: FastAPI): - global pool + global pool, rds setup_logging("query_api", level=config.log_level, json_output=config.json_logs) pool = await get_pg_pool(config) + rds = get_redis(config) yield await pool.close() + await rds.close() app = FastAPI(title="Stonks Oracle - Query API", lifespan=lifespan) @@ -1474,12 +1478,23 @@ async def get_pipeline_health( hours, ) + # Queue depths from Redis + queue_depths: dict[str, int] = {} + if rds: + for qname in ("ingestion", "parsing", "extraction", "macro_classification", "aggregation", "recommendation", "broker_orders"): + try: + depth = await rds.llen(f"stonks:queue:{qname}") + queue_depths[qname] = depth + except Exception: + queue_depths[qname] = -1 + return { "hours": hours, "document_stages": [_row_to_dict(r) for r in doc_stages], "parsing": _row_to_dict(parse_quality) if parse_quality else {}, "extraction": _row_to_dict(extraction_stats) if extraction_stats else {}, "aggregation": _row_to_dict(trend_stats) if trend_stats else {}, + "queue_depths": queue_depths, }