feat: add SSE stream for live pipeline status, add all 10 queues + DLQs, configure nginx for SSE
This commit is contained in:
+91
-2
@@ -12,6 +12,7 @@ Design: Section 9.1 (Operational API)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time as _time
|
||||
@@ -27,7 +28,7 @@ from fastapi import FastAPI, HTTPException, Query, Request
|
||||
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
|
||||
from pydantic import BaseModel
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
from starlette.responses import Response
|
||||
from starlette.responses import Response, StreamingResponse
|
||||
|
||||
from services.aggregation.pattern_matcher import (
|
||||
find_cross_company_patterns,
|
||||
@@ -1481,12 +1482,27 @@ async def get_pipeline_health(
|
||||
# Queue depths from Redis
|
||||
queue_depths: dict[str, int] = {}
|
||||
if rds:
|
||||
for qname in ("ingestion", "parsing", "extraction", "macro_classification", "aggregation", "recommendation", "broker_orders"):
|
||||
for qname in (
|
||||
"ingestion", "parsing", "extraction", "macro_classification",
|
||||
"aggregation", "recommendation", "lake_publish",
|
||||
"trade", "trading_decisions", "broker_orders",
|
||||
):
|
||||
try:
|
||||
depth = await rds.llen(f"stonks:queue:{qname}")
|
||||
queue_depths[qname] = depth
|
||||
except Exception:
|
||||
queue_depths[qname] = -1
|
||||
# Also check dead-letter queues
|
||||
for qname in (
|
||||
"ingestion", "parsing", "extraction", "aggregation",
|
||||
"recommendation", "broker_orders",
|
||||
):
|
||||
try:
|
||||
depth = await rds.llen(f"stonks:dlq:{qname}")
|
||||
if depth > 0:
|
||||
queue_depths[f"dlq:{qname}"] = depth
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
"hours": hours,
|
||||
@@ -1498,6 +1514,79 @@ async def get_pipeline_health(
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SSE: Live Pipeline Stream
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
PIPELINE_QUEUES = (
|
||||
"ingestion", "parsing", "extraction", "macro_classification",
|
||||
"aggregation", "recommendation", "lake_publish",
|
||||
"trade", "trading_decisions", "broker_orders",
|
||||
)
|
||||
|
||||
PIPELINE_DLQS = (
|
||||
"ingestion", "parsing", "extraction", "aggregation",
|
||||
"recommendation", "broker_orders",
|
||||
)
|
||||
|
||||
|
||||
@app.get("/api/ops/pipeline/stream")
|
||||
async def pipeline_stream(request: Request):
|
||||
"""Server-Sent Events stream of live pipeline status.
|
||||
|
||||
Pushes queue depths and document stage counts every 3 seconds.
|
||||
The browser can consume this with EventSource for real-time updates
|
||||
without polling.
|
||||
"""
|
||||
|
||||
async def event_generator():
|
||||
while True:
|
||||
# Check if client disconnected
|
||||
if await request.is_disconnected():
|
||||
break
|
||||
|
||||
data: dict[str, Any] = {}
|
||||
|
||||
# Queue depths
|
||||
depths: dict[str, int] = {}
|
||||
if rds:
|
||||
for qname in PIPELINE_QUEUES:
|
||||
try:
|
||||
depths[qname] = await rds.llen(f"stonks:queue:{qname}")
|
||||
except Exception:
|
||||
depths[qname] = -1
|
||||
for qname in PIPELINE_DLQS:
|
||||
try:
|
||||
d = await rds.llen(f"stonks:dlq:{qname}")
|
||||
if d > 0:
|
||||
depths[f"dlq:{qname}"] = d
|
||||
except Exception:
|
||||
pass
|
||||
data["queue_depths"] = depths
|
||||
|
||||
# Document stage counts (lightweight query)
|
||||
try:
|
||||
stages = await pool.fetch(
|
||||
"SELECT status, count(*) AS doc_count FROM documents GROUP BY status"
|
||||
)
|
||||
data["document_stages"] = {r["status"]: r["doc_count"] for r in stages}
|
||||
except Exception:
|
||||
data["document_stages"] = {}
|
||||
|
||||
yield f"data: {json.dumps(data)}\n\n"
|
||||
await asyncio.sleep(3)
|
||||
|
||||
return StreamingResponse(
|
||||
event_generator(),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@app.get("/api/ops/sources/coverage-gaps")
|
||||
async def get_source_coverage_gaps():
|
||||
"""Identify symbols with missing or insufficient source coverage.
|
||||
|
||||
Reference in New Issue
Block a user