From 288c5333b528a63c49ecb23fec86fe9c888c2c6c Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Mon, 20 Apr 2026 13:16:11 +0000 Subject: [PATCH] fix: use queue_key() for stage-prefixed Redis queue names in pipeline endpoints The pipeline health, SSE stream, and retry endpoints were hardcoding 'stonks:queue:{name}' but services use DEPLOY_STAGE prefix ('stonks:paper:queue:{name}'). Now uses queue_key() from redis_keys.py. --- services/api/app.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/services/api/app.py b/services/api/app.py index fac2dde..1275b15 100644 --- a/services/api/app.py +++ b/services/api/app.py @@ -41,6 +41,7 @@ from services.shared.audit import get_entity_audit_trail, get_order_audit_trail, from services.shared.config import load_config from services.shared.db import get_pg_pool, get_redis from services.shared.logging import new_trace_id, set_trace_context, setup_logging +from services.shared.redis_keys import QUEUE_PREFIX, queue_key from services.shared.schemas import MAJOR_DECISION_CATALYSTS logger = logging.getLogger("query_api") @@ -1770,7 +1771,7 @@ async def get_pipeline_health( "trade", "trading_decisions", "broker_orders", ): try: - depth = await rds.llen(f"stonks:queue:{qname}") + depth = await rds.llen(queue_key(qname)) queue_depths[qname] = depth except Exception: queue_depths[qname] = -1 @@ -1780,7 +1781,7 @@ async def get_pipeline_health( "recommendation", "broker_orders", ): try: - depth = await rds.llen(f"stonks:dlq:{qname}") + depth = await rds.llen(f"{QUEUE_PREFIX}:dlq:{qname}") if depth > 0: queue_depths[f"dlq:{qname}"] = depth except Exception: @@ -1834,12 +1835,12 @@ async def pipeline_stream(request: Request): if rds: for qname in PIPELINE_QUEUES: try: - depths[qname] = await rds.llen(f"stonks:queue:{qname}") + depths[qname] = await rds.llen(queue_key(qname)) except Exception: depths[qname] = -1 for qname in PIPELINE_DLQS: try: - d = await rds.llen(f"stonks:dlq:{qname}") + d = await rds.llen(f"{QUEUE_PREFIX}:dlq:{qname}") if d > 0: depths[f"dlq:{qname}"] = d except Exception: @@ -1893,9 +1894,9 @@ async def retry_failed_extractions_endpoint(): for row in rows: doc_type = row["document_type"] if doc_type == "macro_event": - target = "stonks:queue:macro_classification" + target = queue_key("macro_classification") else: - target = "stonks:queue:extraction" + target = queue_key("extraction") await rds.rpush(target, json.dumps({ "document_id": str(row["id"]),