feat: pipeline on/off toggle with per-stage Helm control
- Added pipelineEnabled flag to Helm values (default: true)
- Worker services (scheduler, ingestion, parser, extractor, aggregation, recommendation, broker-adapter, lake-publisher) scale to 0 when disabled
- API services always run regardless of the toggle
- Redis-based runtime toggle: POST /api/ops/pipeline/toggle
- Scheduler checks the flag before each cycle
- Frontend: green/red Pipeline ON/OFF button on the pipeline page
- Beta defaults to pipelineEnabled: false
- Base values.yaml: blanked external URLs (Ollama, Polygon, Alpaca) so stages only connect to what they explicitly configure
This commit is contained in:
+34
-1
@@ -41,7 +41,7 @@ from services.shared.audit import get_entity_audit_trail, get_order_audit_trail,
|
||||
from services.shared.config import load_config
|
||||
from services.shared.db import get_pg_pool, get_redis
|
||||
from services.shared.logging import new_trace_id, set_trace_context, setup_logging
|
||||
from services.shared.redis_keys import QUEUE_PREFIX, queue_key
|
||||
from services.shared.redis_keys import PREFIX, QUEUE_PREFIX, queue_key
|
||||
from services.shared.schemas import MAJOR_DECISION_CATALYSTS
|
||||
|
||||
logger = logging.getLogger("query_api")
|
||||
@@ -1787,8 +1787,13 @@ async def get_pipeline_health(
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Pipeline enabled flag
|
||||
pipeline_flag = await rds.get(_PIPELINE_ENABLED_KEY) if rds else None
|
||||
pipeline_enabled = pipeline_flag != "0" if pipeline_flag is not None else True
|
||||
|
||||
return {
|
||||
"hours": hours,
|
||||
"pipeline_enabled": pipeline_enabled,
|
||||
"document_stages": [_row_to_dict(r) for r in doc_stages],
|
||||
"parsing": _row_to_dict(parse_quality) if parse_quality else {},
|
||||
"extraction": _row_to_dict(extraction_stats) if extraction_stats else {},
|
||||
@@ -1927,6 +1932,34 @@ async def retry_failed_extractions_endpoint():
|
||||
return {"retried": len(doc_ids), "message": f"Re-enqueued {len(doc_ids)} documents for extraction"}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pipeline On/Off Toggle
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Redis key holding the runtime pipeline flag. set_pipeline_toggle writes
# "1" (enabled) or "0" (disabled); readers in this module treat only "0" as
# disabled, so a missing key means the pipeline is enabled.
_PIPELINE_ENABLED_KEY = f"{PREFIX}:pipeline:enabled"
|
||||
|
||||
|
||||
@app.get("/api/ops/pipeline/toggle")
async def get_pipeline_toggle():
    """Get the current pipeline enabled/disabled state.

    Returns:
        ``{"pipeline_enabled": bool}``. The pipeline is reported as disabled
        only when the stored flag is ``"0"``; a missing key, or an
        uninitialised Redis client, counts as enabled.
    """
    # Same guard get_pipeline_health uses: without a Redis client fall back
    # to the default instead of failing with AttributeError.
    val = await rds.get(_PIPELINE_ENABLED_KEY) if rds else None
    # Accept both str and bytes so the answer does not depend on whether the
    # Redis client decodes responses. None (key absent) falls through to True.
    enabled = val not in ("0", b"0")
    return {"pipeline_enabled": enabled}
|
||||
|
||||
|
||||
@app.post("/api/ops/pipeline/toggle")
async def set_pipeline_toggle(body: dict[str, Any]):
    """Toggle the pipeline on or off.

    Accepts: { "enabled": true/false }
    Workers check this flag before processing jobs.

    Args:
        body: Parsed JSON request body. ``enabled`` defaults to ``True`` when
            the key is absent. String values are interpreted textually, so
            ``"false"``, ``"0"``, ``"off"``, ``"no"`` and ``""`` disable the
            pipeline instead of being treated as truthy.

    Returns:
        ``{"pipeline_enabled": bool, "message": str}`` describing the state
        that was stored.
    """
    raw = body.get("enabled", True)
    if isinstance(raw, str):
        # A plain truthiness test would turn the pipeline ON for "false"/"0".
        enabled = raw.strip().lower() not in {"", "0", "false", "off", "no"}
    else:
        enabled = bool(raw)

    # Stored as "1"/"0"; readers treat only "0" as disabled.
    await rds.set(_PIPELINE_ENABLED_KEY, "1" if enabled else "0")
    return {"pipeline_enabled": enabled, "message": f"Pipeline {'enabled' if enabled else 'disabled'}"}
|
||||
|
||||
|
||||
@app.get("/api/ops/sources/coverage-gaps")
|
||||
async def get_source_coverage_gaps():
|
||||
"""Identify symbols with missing or insufficient source coverage.
|
||||
|
||||
@@ -19,6 +19,7 @@ from services.shared.config import load_config
|
||||
from services.shared.db import get_pg_pool, get_redis
|
||||
from services.shared.logging import setup_logging
|
||||
from services.shared.redis_keys import (
|
||||
PREFIX,
|
||||
QUEUE_EXTRACTION,
|
||||
QUEUE_INGESTION,
|
||||
QUEUE_MACRO_CLASSIFICATION,
|
||||
@@ -499,12 +500,19 @@ async def main() -> None:
|
||||
rds = get_redis(config)
|
||||
|
||||
logger.info("Scheduler started (tick=%ds)", SCHEDULER_TICK)
|
||||
pipeline_key = f"{PREFIX}:pipeline:enabled"
|
||||
recovery_counter = 0
|
||||
retry_counter = 0
|
||||
cleanup_counter = 0
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
# Check pipeline toggle — skip cycle if disabled
|
||||
flag = await rds.get(pipeline_key)
|
||||
if flag == "0":
|
||||
await asyncio.sleep(SCHEDULER_TICK)
|
||||
continue
|
||||
|
||||
if await acquire_lock(rds, "scheduler_cycle", ttl=30):
|
||||
try:
|
||||
await schedule_cycle(pool, rds)
|
||||
|
||||
@@ -1,77 +0,0 @@
|
||||
"""Database migration runner using asyncpg.
|
||||
|
||||
Applies all SQL migration files from infra/migrations/ in sorted order.
|
||||
Each file is split on semicolons and executed statement-by-statement.
|
||||
Idempotent — migrations use IF NOT EXISTS / CREATE OR REPLACE patterns.
|
||||
|
||||
Usage:
|
||||
python -m services.shared.migrate
|
||||
"""
|
||||
import asyncio
|
||||
import glob
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
import asyncpg
|
||||
|
||||
logger = logging.getLogger("migrate")
|
||||
|
||||
|
||||
def _dollar_quote_at(sql: str, pos: int) -> str:
    """Return the dollar-quote delimiter (``$$`` or ``$tag$``) at *pos*, or ""."""
    close = sql.find("$", pos + 1)
    if close == -1:
        return ""
    tag = sql[pos + 1:close]
    # An empty tag ("$$") is valid; otherwise the tag must look like an identifier.
    if tag and not tag.isidentifier():
        return ""
    return sql[pos:close + 1]


def _split_sql_statements(sql: str) -> list:
    """Split *sql* on semicolons that are outside quotes, comments and $-quotes."""
    statements = []
    length = len(sql)
    start = pos = 0

    def _skip_past(token: str, search_from: int) -> int:
        # Index just after the next occurrence of token, or end of input.
        found = sql.find(token, search_from)
        return length if found == -1 else found + len(token)

    while pos < length:
        ch = sql[pos]
        if ch == ";":
            stmt = sql[start:pos].strip()
            if stmt:
                statements.append(stmt)
            pos += 1
            start = pos
        elif ch in "'\"":
            # A doubled quote ('') is consumed as two adjacent literals,
            # which skips the same text.
            pos = _skip_past(ch, pos + 1)
        elif sql.startswith("--", pos):
            pos = _skip_past("\n", pos + 2)
        elif sql.startswith("/*", pos):
            pos = _skip_past("*/", pos + 2)
        elif ch == "$":
            delim = _dollar_quote_at(sql, pos)
            pos = _skip_past(delim, pos + len(delim)) if delim else pos + 1
        else:
            pos += 1

    tail = sql[start:].strip()
    if tail:
        statements.append(tail)
    return statements


async def run_migrations() -> None:
    """Apply every ``*.sql`` file in ``infra/migrations`` in sorted order.

    Connection settings are read from the ``POSTGRES_HOST``, ``POSTGRES_PORT``,
    ``POSTGRES_USER``, ``POSTGRES_PASSWORD`` and ``POSTGRES_DB`` environment
    variables. Each file is split into statements, which are executed one at
    a time. Splitting ignores semicolons inside string literals, quoted
    identifiers, comments and dollar-quoted bodies, so function definitions
    are not cut in half.

    Execution is best-effort: when a statement fails, the rest of that file
    is skipped, a warning is logged, and the run continues with the next
    file. Files that failed are summarised in a final warning.

    Raises:
        SystemExit: With status 1 when the migrations directory is missing.
    """
    host = os.getenv("POSTGRES_HOST", "localhost")
    port = int(os.getenv("POSTGRES_PORT", "5432"))
    user = os.getenv("POSTGRES_USER", "stonks")
    password = os.getenv("POSTGRES_PASSWORD", "")
    database = os.getenv("POSTGRES_DB", "stonks")

    migrations_dir = os.path.join(
        os.path.dirname(__file__), "..", "..", "infra", "migrations"
    )
    migrations_dir = os.path.normpath(migrations_dir)

    if not os.path.isdir(migrations_dir):
        logger.error("Migrations directory not found: %s", migrations_dir)
        sys.exit(1)

    files = sorted(glob.glob(os.path.join(migrations_dir, "*.sql")))
    if not files:
        logger.warning("No migration files found in %s", migrations_dir)
        return

    logger.info("Connecting to %s@%s:%d/%s", user, host, port, database)
    conn = await asyncpg.connect(
        host=host, port=port, user=user, password=password, database=database
    )

    failed = []
    try:
        for path in files:
            name = os.path.basename(path)
            # Explicit encoding so the result does not depend on the locale.
            with open(path, encoding="utf-8") as f:
                sql = f.read()
            statements = _split_sql_statements(sql)
            try:
                for stmt in statements:
                    await conn.execute(stmt)
                logger.info(" ✓ %s (%d statements)", name, len(statements))
            except Exception as exc:
                # Deliberately best-effort: record the failure and move on.
                logger.warning(" ⚠ %s: %s", name, exc)
                failed.append(name)
    finally:
        await conn.close()

    logger.info("Migrations complete (%d files)", len(files))
    if failed:
        logger.warning(
            "%d migration file(s) reported errors: %s",
            len(failed),
            ", ".join(failed),
        )
|
||||
|
||||
|
||||
def main() -> None:
    """Configure INFO-level logging, then run the migrations to completion."""
    log_settings = {
        "level": logging.INFO,
        "format": "%(asctime)s %(name)s %(message)s",
        "datefmt": "%H:%M:%S",
    }
    logging.basicConfig(**log_settings)

    migration_run = run_migrations()
    asyncio.run(migration_run)
|
||||
|
||||
|
||||
# Run the migrations only when executed as a script or via
# `python -m services.shared.migrate`, not when imported.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user