From be526ae6143a3d429aaa60d9948ad639b94be20e Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Tue, 21 Apr 2026 00:21:53 +0000 Subject: [PATCH] feat: pipeline on/off toggle with per-stage Helm control - Added pipelineEnabled flag to Helm values (default: true) - Worker services (scheduler, ingestion, parser, extractor, aggregation, recommendation, broker-adapter, lake-publisher) scale to 0 when disabled - API services always run regardless of toggle - Redis-based runtime toggle: POST /api/ops/pipeline/toggle - Scheduler checks the flag before each cycle - Frontend: green/red Pipeline ON/OFF button on the pipeline page - Beta defaults to pipelineEnabled: false - Base values.yaml: blanked external URLs (Ollama, Polygon, Alpaca) so stages only connect to what they explicitly configure --- .github/workflows/build.yml | 52 +++ .../unicode_data/15.0.0/codec-utf-8.json.gz | Bin 60 -> 60 bytes frontend/src/api/hooks.ts | 8 + frontend/src/pages/OpsPipeline.tsx | 12 +- frontend/src/test/mocks/handlers.ts | 2 + .../stonks-oracle/templates/deployments.yaml | 2 +- infra/helm/stonks-oracle/values-beta.yaml | 28 +- infra/helm/stonks-oracle/values.yaml | 13 + infra/inttest/promote.sh | 409 ++++++++++++++++++ pipelines/harbor/values.yaml | 5 + services/api/app.py | 35 +- services/scheduler/app.py | 8 + services/shared/migrate.py | 77 ---- tests/integration/test_signal_flow.py | 376 ++++++++++++++++ 14 files changed, 923 insertions(+), 104 deletions(-) create mode 100755 infra/inttest/promote.sh delete mode 100644 services/shared/migrate.py create mode 100644 tests/integration/test_signal_flow.py diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index a48d8b1..2f04339 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -227,3 +227,55 @@ jobs: with: name: inttest-results path: inttest-results.json + + beta-gate: + needs: [integration-test] + if: github.event_name == 'push' && github.ref == 'refs/heads/main' + runs-on: self-hosted-gremlin + permissions: + contents: read + packages: read + steps: + - uses: actions/checkout@v5 + + - name: Install kubectl + run: | + if ! command -v kubectl &> /dev/null; then + curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" + chmod +x kubectl + sudo mv kubectl /usr/local/bin/kubectl + fi + kubectl version --client + + - name: Install Helm + run: | + if ! command -v helm &> /dev/null; then + curl -fsSL https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | sudo bash + fi + helm version + + - name: Configure kubectl + run: | + if [ -f /var/run/secrets/kubernetes.io/serviceaccount/token ]; then + kubectl config set-cluster in-cluster \ + --server=https://kubernetes.default.svc \ + --certificate-authority=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt + kubectl config set-credentials runner \ + --token="$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)" + kubectl config set-context runner --cluster=in-cluster --user=runner + kubectl config use-context runner + fi + kubectl cluster-info || echo "WARNING: kubectl cannot reach cluster API" + + - name: Run beta gate (deploy → test → promote) + run: | + bash infra/inttest/promote.sh \ + --image-tag ${{ github.sha }} \ + --results-file beta-gate-results.json + + - name: Upload beta gate results + if: always() + uses: actions/upload-artifact@v4 + with: + name: beta-gate-results + path: beta-gate-results.json diff --git a/.hypothesis/unicode_data/15.0.0/codec-utf-8.json.gz b/.hypothesis/unicode_data/15.0.0/codec-utf-8.json.gz index aa18e8c97a6d81c34c91b2c75f1e18c03b70c826..e278eb9d0155c5bf809f61edcdd62187c1997feb 100644 GIT binary patch delta 27 icmcDq5tZ-e;9z86U|{-Rl3S2&mTgjyla(@2R2=|T!3RzN delta 27 icmcDq5tZ-e;9z86U|{-Rl3S2umQzs_pHwhWR2=|Vdk2RA diff --git a/frontend/src/api/hooks.ts b/frontend/src/api/hooks.ts index f44a707..f1b9532 100644 --- a/frontend/src/api/hooks.ts +++ b/frontend/src/api/hooks.ts @@ -533,6 +533,14 @@ export function useRetryFailedExtractions() { }); } +export function usePipelineToggle() { + const qc = useQueryClient(); + return useMutation({ + mutationFn: (enabled: boolean) => apiPost<{ pipeline_enabled: boolean }>('query', '/api/ops/pipeline/toggle', { enabled }), + onSuccess: () => qc.invalidateQueries({ queryKey: ['pipeline-health'] }), + }); +} + export function useIngestionSummary(hours = 24) { return useGet>(['ingestion-summary', hours], 'query', `/api/ops/ingestion/summary?hours=${hours}`); } diff --git a/frontend/src/pages/OpsPipeline.tsx b/frontend/src/pages/OpsPipeline.tsx index 10f673b..c8b4156 100644 --- a/frontend/src/pages/OpsPipeline.tsx +++ b/frontend/src/pages/OpsPipeline.tsx @@ -1,5 +1,5 @@ import { useState, useEffect } from 'react'; -import { usePipelineHealth, useRetryFailedExtractions } from '../api/hooks'; +import { usePipelineHealth, useRetryFailedExtractions, usePipelineToggle } from '../api/hooks'; import { LoadingSpinner, DateRangeSelector, Card } from '../components/ui'; const QUEUE_LABELS: Record = { @@ -54,12 +54,14 @@ export function OpsPipelinePage() { const { data, isLoading } = usePipelineHealth(hours); const stream = usePipelineStream(); const retryMutation = useRetryFailedExtractions(); + const toggleMutation = usePipelineToggle(); if (isLoading) return ; const parsing = (data?.parsing ?? {}) as Record; const extraction = (data?.extraction ?? {}) as Record; const aggregation = (data?.aggregation ?? {}) as Record; + const pipelineEnabled = (data?.pipeline_enabled ?? true) as boolean; // Prefer live stream data for queue depths and doc stages, fall back to initial fetch const queueDepths = stream?.queue_depths @@ -82,6 +84,14 @@ export function OpsPipelinePage() {

Pipeline Health

+ {failedCount > 0 && (