fix: use queue_key() for stage-prefixed Redis queue names in pipeline endpoints
The pipeline health, SSE stream, and retry endpoints were hardcoding
'stonks:queue:{name}' but services use DEPLOY_STAGE prefix
('stonks:paper:queue:{name}'). Now uses queue_key() from redis_keys.py.
This commit is contained in:
+7
-6
@@ -41,6 +41,7 @@ from services.shared.audit import get_entity_audit_trail, get_order_audit_trail,
|
|||||||
from services.shared.config import load_config
|
from services.shared.config import load_config
|
||||||
from services.shared.db import get_pg_pool, get_redis
|
from services.shared.db import get_pg_pool, get_redis
|
||||||
from services.shared.logging import new_trace_id, set_trace_context, setup_logging
|
from services.shared.logging import new_trace_id, set_trace_context, setup_logging
|
||||||
|
from services.shared.redis_keys import QUEUE_PREFIX, queue_key
|
||||||
from services.shared.schemas import MAJOR_DECISION_CATALYSTS
|
from services.shared.schemas import MAJOR_DECISION_CATALYSTS
|
||||||
|
|
||||||
logger = logging.getLogger("query_api")
|
logger = logging.getLogger("query_api")
|
||||||
@@ -1770,7 +1771,7 @@ async def get_pipeline_health(
|
|||||||
"trade", "trading_decisions", "broker_orders",
|
"trade", "trading_decisions", "broker_orders",
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
depth = await rds.llen(f"stonks:queue:{qname}")
|
depth = await rds.llen(queue_key(qname))
|
||||||
queue_depths[qname] = depth
|
queue_depths[qname] = depth
|
||||||
except Exception:
|
except Exception:
|
||||||
queue_depths[qname] = -1
|
queue_depths[qname] = -1
|
||||||
@@ -1780,7 +1781,7 @@ async def get_pipeline_health(
|
|||||||
"recommendation", "broker_orders",
|
"recommendation", "broker_orders",
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
depth = await rds.llen(f"stonks:dlq:{qname}")
|
depth = await rds.llen(f"{QUEUE_PREFIX}:dlq:{qname}")
|
||||||
if depth > 0:
|
if depth > 0:
|
||||||
queue_depths[f"dlq:{qname}"] = depth
|
queue_depths[f"dlq:{qname}"] = depth
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -1834,12 +1835,12 @@ async def pipeline_stream(request: Request):
|
|||||||
if rds:
|
if rds:
|
||||||
for qname in PIPELINE_QUEUES:
|
for qname in PIPELINE_QUEUES:
|
||||||
try:
|
try:
|
||||||
depths[qname] = await rds.llen(f"stonks:queue:{qname}")
|
depths[qname] = await rds.llen(queue_key(qname))
|
||||||
except Exception:
|
except Exception:
|
||||||
depths[qname] = -1
|
depths[qname] = -1
|
||||||
for qname in PIPELINE_DLQS:
|
for qname in PIPELINE_DLQS:
|
||||||
try:
|
try:
|
||||||
d = await rds.llen(f"stonks:dlq:{qname}")
|
d = await rds.llen(f"{QUEUE_PREFIX}:dlq:{qname}")
|
||||||
if d > 0:
|
if d > 0:
|
||||||
depths[f"dlq:{qname}"] = d
|
depths[f"dlq:{qname}"] = d
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -1893,9 +1894,9 @@ async def retry_failed_extractions_endpoint():
|
|||||||
for row in rows:
|
for row in rows:
|
||||||
doc_type = row["document_type"]
|
doc_type = row["document_type"]
|
||||||
if doc_type == "macro_event":
|
if doc_type == "macro_event":
|
||||||
target = "stonks:queue:macro_classification"
|
target = queue_key("macro_classification")
|
||||||
else:
|
else:
|
||||||
target = "stonks:queue:extraction"
|
target = queue_key("extraction")
|
||||||
|
|
||||||
await rds.rpush(target, json.dumps({
|
await rds.rpush(target, json.dumps({
|
||||||
"document_id": str(row["id"]),
|
"document_id": str(row["id"]),
|
||||||
|
|||||||
Reference in New Issue
Block a user