From 58a05ca3227b12fe5884a1333809bb577276c0f6 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Thu, 16 Apr 2026 08:15:44 +0000 Subject: [PATCH] feat: add SSE stream for live pipeline status, add all 10 queues + DLQs, configure nginx for SSE --- frontend/nginx.conf | 13 ++++ frontend/src/pages/OpsPipeline.tsx | 105 ++++++++++++++++++++++++----- services/api/app.py | 93 ++++++++++++++++++++++++- 3 files changed, 194 insertions(+), 17 deletions(-) diff --git a/frontend/nginx.conf b/frontend/nginx.conf index 789ae5c..c3e8fe3 100644 --- a/frontend/nginx.conf +++ b/frontend/nginx.conf @@ -18,6 +18,19 @@ server { proxy_set_header X-Forwarded-Proto $scheme; } + # SSE stream endpoints — disable buffering for real-time delivery + location /api/ops/pipeline/stream { + proxy_pass http://query-api:8000; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + proxy_buffering off; + proxy_cache off; + proxy_read_timeout 86400s; + chunked_transfer_encoding off; + } + # Proxy Symbol Registry (companies CRUD, sources, watchlists, aliases) location /registry/ { proxy_pass http://symbol-registry:8000/; diff --git a/frontend/src/pages/OpsPipeline.tsx b/frontend/src/pages/OpsPipeline.tsx index e0da0ba..af2b0a5 100644 --- a/frontend/src/pages/OpsPipeline.tsx +++ b/frontend/src/pages/OpsPipeline.tsx @@ -1,4 +1,4 @@ -import { useState } from 'react'; +import { useState, useEffect } from 'react'; import { usePipelineHealth } from '../api/hooks'; import { LoadingSpinner, DateRangeSelector, Card } from '../components/ui'; @@ -6,58 +6,133 @@ const QUEUE_LABELS: Record = { ingestion: 'Ingestion', parsing: 'Parsing', extraction: 'Extraction', - macro_classification: 'Macro Classification', + macro_classification: 'Macro Classify', aggregation: 'Aggregation', recommendation: 'Recommendation', + lake_publish: 'Lake Publish', + trade: 'Trade', + trading_decisions: 'Trading Decisions', broker_orders: 'Broker Orders', }; +interface StreamData { + queue_depths: Record; + document_stages: Record; +} + +function usePipelineStream() { + const [data, setData] = useState(null); + + useEffect(() => { + const base = import.meta.env.VITE_QUERY_API_BASE || ''; + const url = `${base}/api/ops/pipeline/stream`; + const es = new EventSource(url); + + es.onmessage = (event) => { + try { + setData(JSON.parse(event.data)); + } catch { + // ignore parse errors + } + }; + + es.onerror = () => { + // EventSource auto-reconnects + }; + + return () => es.close(); + }, []); + + return data; +} + export function OpsPipelinePage() { const [hours, setHours] = useState(24); const { data, isLoading } = usePipelineHealth(hours); + const stream = usePipelineStream(); if (isLoading) return ; - const stages = (data?.document_stages as Array<{ status: string; doc_count: number }>) ?? []; const parsing = (data?.parsing ?? {}) as Record; const extraction = (data?.extraction ?? {}) as Record; const aggregation = (data?.aggregation ?? {}) as Record; - const queueDepths = (data?.queue_depths ?? {}) as Record; + + // Prefer live stream data for queue depths and doc stages, fall back to initial fetch + const queueDepths = stream?.queue_depths + ?? (data?.queue_depths as Record | undefined) + ?? {}; + const docStages = stream?.document_stages + ?? Object.fromEntries( + ((data?.document_stages as Array<{ status: string; doc_count: number }>) ?? []) + .map((s) => [s.status, s.doc_count]), + ); + + // Separate DLQ entries from regular queues + const dlqEntries = Object.entries(queueDepths).filter(([k]) => k.startsWith('dlq:')); + const regularQueues = Object.entries(QUEUE_LABELS); return (

Pipeline Health

- +
+ {stream && ( + + + )} + +
{/* Queue Depths */} -

Queue Depths (live)

-
- {Object.entries(QUEUE_LABELS).map(([key, label]) => { +

Queue Depths

+
+ {regularQueues.map(([key, label]) => { const depth = queueDepths[key] ?? 0; const color = depth > 100 ? 'text-amber-400' : depth > 0 ? 'text-blue-400' : 'text-gray-500'; return (
-
{depth}
+
{depth}
{label}
); })}
+ {dlqEntries.length > 0 && ( +
+

Dead Letter Queues

+
+ {dlqEntries.map(([key, depth]) => ( +
+
{depth}
+
{key.replace('dlq:', '')}
+
+ ))} +
+
+ )} {/* Document Stage Counts */}

Document Stages

- {stages.map((s) => ( -
-
{s.doc_count}
-
{s.status}
-
- ))} + {Object.entries(docStages).map(([status, count]) => { + const color = status === 'extracted' ? 'text-green-400' + : status === 'parsed' ? 'text-yellow-400' + : status === 'extraction_failed' ? 'text-red-400' + : status === 'low_quality' ? 'text-orange-400' + : 'text-gray-100'; + return ( +
+
{count}
+
{status.replace('_', ' ')}
+
+ ); + })}
diff --git a/services/api/app.py b/services/api/app.py index 125a6e0..78d1425 100644 --- a/services/api/app.py +++ b/services/api/app.py @@ -12,6 +12,7 @@ Design: Section 9.1 (Operational API) """ from __future__ import annotations +import asyncio import json import logging import time as _time @@ -27,7 +28,7 @@ from fastapi import FastAPI, HTTPException, Query, Request from prometheus_client import CONTENT_TYPE_LATEST, generate_latest from pydantic import BaseModel from starlette.middleware.base import BaseHTTPMiddleware -from starlette.responses import Response +from starlette.responses import Response, StreamingResponse from services.aggregation.pattern_matcher import ( find_cross_company_patterns, @@ -1481,12 +1482,27 @@ async def get_pipeline_health( # Queue depths from Redis queue_depths: dict[str, int] = {} if rds: - for qname in ("ingestion", "parsing", "extraction", "macro_classification", "aggregation", "recommendation", "broker_orders"): + for qname in ( + "ingestion", "parsing", "extraction", "macro_classification", + "aggregation", "recommendation", "lake_publish", + "trade", "trading_decisions", "broker_orders", + ): try: depth = await rds.llen(f"stonks:queue:{qname}") queue_depths[qname] = depth except Exception: queue_depths[qname] = -1 + # Also check dead-letter queues + for qname in ( + "ingestion", "parsing", "extraction", "aggregation", + "recommendation", "broker_orders", + ): + try: + depth = await rds.llen(f"stonks:dlq:{qname}") + if depth > 0: + queue_depths[f"dlq:{qname}"] = depth + except Exception: + pass return { "hours": hours, @@ -1498,6 +1514,79 @@ async def get_pipeline_health( } +# --------------------------------------------------------------------------- +# SSE: Live Pipeline Stream +# --------------------------------------------------------------------------- + +PIPELINE_QUEUES = ( + "ingestion", "parsing", "extraction", "macro_classification", + "aggregation", "recommendation", "lake_publish", + "trade", "trading_decisions", "broker_orders", +) + +PIPELINE_DLQS = ( + "ingestion", "parsing", "extraction", "aggregation", + "recommendation", "broker_orders", +) + + +@app.get("/api/ops/pipeline/stream") +async def pipeline_stream(request: Request): + """Server-Sent Events stream of live pipeline status. + + Pushes queue depths and document stage counts every 3 seconds. + The browser can consume this with EventSource for real-time updates + without polling. + """ + + async def event_generator(): + while True: + # Check if client disconnected + if await request.is_disconnected(): + break + + data: dict[str, Any] = {} + + # Queue depths + depths: dict[str, int] = {} + if rds: + for qname in PIPELINE_QUEUES: + try: + depths[qname] = await rds.llen(f"stonks:queue:{qname}") + except Exception: + depths[qname] = -1 + for qname in PIPELINE_DLQS: + try: + d = await rds.llen(f"stonks:dlq:{qname}") + if d > 0: + depths[f"dlq:{qname}"] = d + except Exception: + pass + data["queue_depths"] = depths + + # Document stage counts (lightweight query) + try: + stages = await pool.fetch( + "SELECT status, count(*) AS doc_count FROM documents GROUP BY status" + ) + data["document_stages"] = {r["status"]: r["doc_count"] for r in stages} + except Exception: + data["document_stages"] = {} + + yield f"data: {json.dumps(data)}\n\n" + await asyncio.sleep(3) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + @app.get("/api/ops/sources/coverage-gaps") async def get_source_coverage_gaps(): """Identify symbols with missing or insufficient source coverage.