From de35279269e9c4bb1ba87eecb761ecbb21f6d2b9 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Mon, 20 Apr 2026 08:09:29 +0000 Subject: [PATCH] feat: retry failed extractions button on pipeline page - POST /api/ops/pipeline/retry-failed endpoint resets extraction_failed docs to parsed, deletes failed intelligence rows, and re-enqueues them (batch of 200) - Scheduler now auto-retries extraction_failed docs every ~10 minutes (100 per cycle, 60-min cooldown per doc) - Pipeline page shows 'Retry Failed (N)' button when extraction_failed count > 0, with pending/success/error states --- frontend/src/api/hooks.ts | 8 ++++ frontend/src/pages/OpsPipeline.tsx | 21 ++++++++- frontend/src/test/mocks/handlers.ts | 3 +- services/api/app.py | 52 +++++++++++++++++++++ services/scheduler/app.py | 70 +++++++++++++++++++++++++++++ 5 files changed, 152 insertions(+), 2 deletions(-) diff --git a/frontend/src/api/hooks.ts b/frontend/src/api/hooks.ts index 27ba7c3..f44a707 100644 --- a/frontend/src/api/hooks.ts +++ b/frontend/src/api/hooks.ts @@ -525,6 +525,14 @@ export function usePipelineHealth(hours = 24) { return useGet>(['pipeline-health', hours], 'query', `/api/ops/pipeline/health?hours=${hours}`); } +export function useRetryFailedExtractions() { + const qc = useQueryClient(); + return useMutation({ + mutationFn: () => apiPost<{ retried: number; message: string }>('query', '/api/ops/pipeline/retry-failed', {}), + onSuccess: () => qc.invalidateQueries({ queryKey: ['pipeline-health'] }), + }); +} + export function useIngestionSummary(hours = 24) { return useGet>(['ingestion-summary', hours], 'query', `/api/ops/ingestion/summary?hours=${hours}`); } diff --git a/frontend/src/pages/OpsPipeline.tsx b/frontend/src/pages/OpsPipeline.tsx index 724898b..10f673b 100644 --- a/frontend/src/pages/OpsPipeline.tsx +++ b/frontend/src/pages/OpsPipeline.tsx @@ -1,5 +1,5 @@ import { useState, useEffect } from 'react'; -import { usePipelineHealth } from '../api/hooks'; +import { usePipelineHealth, useRetryFailedExtractions } from '../api/hooks'; import { LoadingSpinner, DateRangeSelector, Card } from '../components/ui'; const QUEUE_LABELS: Record = { @@ -53,6 +53,7 @@ export function OpsPipelinePage() { const [hours, setHours] = useState(24); const { data, isLoading } = usePipelineHealth(hours); const stream = usePipelineStream(); + const retryMutation = useRetryFailedExtractions(); if (isLoading) return ; @@ -70,6 +71,8 @@ export function OpsPipelinePage() { .map((s) => [s.status, s.doc_count]), ); + const failedCount = docStages['extraction_failed'] ?? 0; + // Separate DLQ entries from regular queues const dlqEntries = Object.entries(queueDepths).filter(([k]) => k.startsWith('dlq:')); const regularQueues = Object.entries(QUEUE_LABELS); @@ -79,6 +82,22 @@ export function OpsPipelinePage() {

Pipeline Health

+ {failedCount > 0 && ( + + )} + {retryMutation.isSuccess && ( + {retryMutation.data.message} + )} + {retryMutation.isError && ( + Retry failed + )} {stream && (