diff --git a/services/adapters/broker_service.py b/services/adapters/broker_service.py index eba4012..624a324 100644 --- a/services/adapters/broker_service.py +++ b/services/adapters/broker_service.py @@ -76,7 +76,7 @@ from services.shared.metrics import ( RISK_CHECK_FAILURES, RISK_EVALUATIONS_TOTAL, ) -from services.shared.redis_keys import QUEUE_BROKER, queue_key +from services.shared.redis_keys import QUEUE_BROKER, is_pipeline_enabled, queue_key logger = logging.getLogger("broker_service") @@ -923,6 +923,9 @@ async def main() -> None: try: while True: + if not await is_pipeline_enabled(rds): + await asyncio.sleep(2) + continue result = await rds.lpop(queue) raw = str(result) if result else None if raw: diff --git a/services/aggregation/main.py b/services/aggregation/main.py index 0fd2e7f..9b9a568 100644 --- a/services/aggregation/main.py +++ b/services/aggregation/main.py @@ -23,6 +23,7 @@ from services.shared.logging import inject_trace_context, setup_logging from services.shared.redis_keys import ( QUEUE_AGGREGATION, QUEUE_RECOMMENDATION, + is_pipeline_enabled, queue_key, ) @@ -134,6 +135,10 @@ async def main() -> None: try: while True: + if not await is_pipeline_enabled(redis_client): + await asyncio.sleep(1) + continue + raw = await redis_client.lpop(queue) if raw is None: await asyncio.sleep(1) diff --git a/services/api/app.py b/services/api/app.py index ea19c58..c5ae23e 100644 --- a/services/api/app.py +++ b/services/api/app.py @@ -41,7 +41,7 @@ from services.shared.audit import get_entity_audit_trail, get_order_audit_trail, from services.shared.config import load_config from services.shared.db import get_pg_pool, get_redis from services.shared.logging import new_trace_id, set_trace_context, setup_logging -from services.shared.redis_keys import PREFIX, QUEUE_BROKER, QUEUE_PREFIX, queue_key +from services.shared.redis_keys import PIPELINE_ENABLED_KEY, QUEUE_BROKER, QUEUE_PREFIX, queue_key from services.shared.schemas import MAJOR_DECISION_CATALYSTS logger = 
logging.getLogger("query_api") @@ -1948,7 +1948,7 @@ async def retry_failed_extractions_endpoint(): # Pipeline On/Off Toggle # --------------------------------------------------------------------------- -_PIPELINE_ENABLED_KEY = f"{PREFIX}:pipeline:enabled" +_PIPELINE_ENABLED_KEY = PIPELINE_ENABLED_KEY @app.get("/api/ops/pipeline/toggle") @@ -1966,10 +1966,33 @@ async def set_pipeline_toggle(body: dict[str, Any]): Accepts: { "enabled": true/false } Workers check this flag before processing jobs. + When disabling, optionally flush all pipeline queues so in-flight + work stops immediately. """ enabled = body.get("enabled", True) + flush = body.get("flush", not enabled) # default: flush when disabling await rds.set(_PIPELINE_ENABLED_KEY, "1" if enabled else "0") - return {"pipeline_enabled": enabled, "message": f"Pipeline {'enabled' if enabled else 'disabled'}"} + + flushed_counts: dict[str, int] = {} + if flush and not enabled: + from services.shared.redis_keys import QUEUE_PREFIX + # Flush all pipeline queues + queue_names = [ + "ingestion", "parsing", "extraction", "macro_classification", + "aggregation", "recommendation", "lake_publish", + ] + for qname in queue_names: + qkey = f"{QUEUE_PREFIX}:{qname}" + count = await rds.llen(qkey) + if count > 0: + await rds.delete(qkey) + flushed_counts[qname] = count + + msg = f"Pipeline {'enabled' if enabled else 'disabled'}" + if flushed_counts: + total = sum(flushed_counts.values()) + msg += f" — flushed {total} queued jobs" + return {"pipeline_enabled": enabled, "flushed": flushed_counts, "message": msg} @app.get("/api/ops/sources/coverage-gaps") diff --git a/services/extractor/main.py b/services/extractor/main.py index 464e136..8fabe85 100644 --- a/services/extractor/main.py +++ b/services/extractor/main.py @@ -27,6 +27,7 @@ from services.shared.redis_keys import ( QUEUE_AGGREGATION, QUEUE_EXTRACTION, QUEUE_MACRO_CLASSIFICATION, + is_pipeline_enabled, queue_key, ) @@ -421,6 +422,10 @@ async def main() -> None: try: while 
True: + if not await is_pipeline_enabled(redis_client): + await asyncio.sleep(1) + continue + # Alternate: every 3rd job from macro queue, rest from extraction # This prevents macro events from starving regular extractions raw = None diff --git a/services/ingestion/worker.py b/services/ingestion/worker.py index a502c63..c2e56ef 100644 --- a/services/ingestion/worker.py +++ b/services/ingestion/worker.py @@ -41,6 +41,7 @@ from services.shared.redis_keys import ( QUEUE_INGESTION, QUEUE_PARSING, dedupe_key, + is_pipeline_enabled, queue_key, ) from services.shared.storage import ( @@ -265,6 +266,9 @@ async def main(): try: while True: + if not await is_pipeline_enabled(rds): + await asyncio.sleep(2) + continue raw = await rds.lpop(queue) if raw: job = json.loads(raw) diff --git a/services/lake_publisher/jobs.py b/services/lake_publisher/jobs.py index e74583a..eb9bc3a 100644 --- a/services/lake_publisher/jobs.py +++ b/services/lake_publisher/jobs.py @@ -54,7 +54,7 @@ from services.lake_publisher.worker import ( from services.shared.config import load_config from services.shared.db import get_minio, get_pg_pool, get_redis from services.shared.logging import setup_logging -from services.shared.redis_keys import QUEUE_LAKE_PUBLISH, queue_key +from services.shared.redis_keys import QUEUE_LAKE_PUBLISH, is_pipeline_enabled, queue_key logger = logging.getLogger(__name__) @@ -865,6 +865,9 @@ async def run_worker( logger.info("Lake publisher worker started, listening on %s", queue) while True: + if not await is_pipeline_enabled(rds): + await asyncio.sleep(poll_interval) + continue raw = await rds.lpop(queue) # type: ignore[misc] if raw is None: await asyncio.sleep(poll_interval) diff --git a/services/parser/worker.py b/services/parser/worker.py index 32e6e58..fa3b9a8 100644 --- a/services/parser/worker.py +++ b/services/parser/worker.py @@ -35,7 +35,13 @@ from services.shared.metrics import ( PARSE_LOW_QUALITY_TOTAL, PARSE_QUALITY_SCORE, ) -from services.shared.redis_keys import 
QUEUE_EXTRACTION, QUEUE_MACRO_CLASSIFICATION, QUEUE_PARSING, queue_key +from services.shared.redis_keys import ( + QUEUE_EXTRACTION, + QUEUE_MACRO_CLASSIFICATION, + QUEUE_PARSING, + is_pipeline_enabled, + queue_key, +) from services.shared.storage import upload_normalized_text, upload_parser_output logger = logging.getLogger("parser_worker") @@ -260,6 +266,9 @@ async def main() -> None: try: while True: + if not await is_pipeline_enabled(rds): + await asyncio.sleep(2) + continue raw = await rds.lpop(queue) if raw: job = json.loads(raw) diff --git a/services/recommendation/main.py b/services/recommendation/main.py index fb0d6c2..4ef51b1 100644 --- a/services/recommendation/main.py +++ b/services/recommendation/main.py @@ -12,7 +12,7 @@ from services.recommendation.worker import generate_recommendation from services.shared.agent_config import AgentConfigResolver from services.shared.config import OllamaConfig, load_config from services.shared.logging import setup_logging -from services.shared.redis_keys import QUEUE_RECOMMENDATION, queue_key +from services.shared.redis_keys import QUEUE_RECOMMENDATION, is_pipeline_enabled, queue_key logger = logging.getLogger("recommendation_main") @@ -62,6 +62,10 @@ async def main() -> None: try: while True: + if not await is_pipeline_enabled(redis_client): + await asyncio.sleep(1) + continue + raw = await redis_client.lpop(queue) if raw is None: await asyncio.sleep(1) diff --git a/services/scheduler/app.py b/services/scheduler/app.py index faa3547..f958e80 100644 --- a/services/scheduler/app.py +++ b/services/scheduler/app.py @@ -20,7 +20,7 @@ from services.shared.config import load_config from services.shared.db import get_pg_pool, get_redis from services.shared.logging import setup_logging from services.shared.redis_keys import ( - PREFIX, + PIPELINE_ENABLED_KEY, QUEUE_EXTRACTION, QUEUE_INGESTION, QUEUE_MACRO_CLASSIFICATION, @@ -501,7 +501,7 @@ async def main() -> None: rds = get_redis(config) logger.info("Scheduler started 
(tick=%ds)", SCHEDULER_TICK)
 
-    pipeline_key = f"{PREFIX}:pipeline:enabled"
+    pipeline_key = PIPELINE_ENABLED_KEY
 
     # If PIPELINE_DEFAULT_OFF is set, initialize the toggle to OFF on first boot
     # (only if the key doesn't already exist — preserves manual overrides)
diff --git a/services/shared/redis_keys.py b/services/shared/redis_keys.py
index 16f6c87..e176486 100644
--- a/services/shared/redis_keys.py
+++ b/services/shared/redis_keys.py
@@ -89,3 +89,25 @@ def trading_cb_key(trigger_type: str) -> str:
 def trading_notification_rate_key(channel: str) -> str:
     """Return the notification rate-limit key for a given channel."""
     return f"{TRADING_NOTIFICATION_RATE}:{channel}"
+
+
+# --- Pipeline toggle ---
+PIPELINE_ENABLED_KEY = f"{PREFIX}:pipeline:enabled"
+
+
+async def is_pipeline_enabled(rds: "redis.asyncio.Redis") -> bool:  # type: ignore[name-defined]  # noqa: F821
+    """Check whether the pipeline is enabled via the Redis toggle.
+
+    Reads ``PIPELINE_ENABLED_KEY`` and returns True (enabled) when the key
+    is absent or set to anything other than ``"0"``.  Workers should call
+    this at the top of each loop iteration and sleep when it returns False.
+
+    The stored value is compared as text whether the client returns ``str``
+    (``decode_responses=True``) or raw ``bytes`` (the redis-py default), so
+    a ``b"0"`` reply is treated as disabled instead of silently comparing
+    unequal to ``"0"`` and leaving the pipeline running.
+    """
+    val = await rds.get(PIPELINE_ENABLED_KEY)
+    if isinstance(val, bytes):
+        val = val.decode("utf-8", errors="replace")
+    return val != "0"