feat: retry failed extractions button on pipeline page

- POST /api/ops/pipeline/retry-failed endpoint resets extraction_failed
  docs to parsed, deletes failed intelligence rows, and re-enqueues
  them (batch of 200)
- Scheduler now auto-retries extraction_failed docs every ~10 minutes
  (100 per cycle, 60-min cooldown per doc)
- Pipeline page shows 'Retry Failed (N)' button when extraction_failed
  count > 0, with pending/success/error states
This commit is contained in:
Celes Renata
2026-04-20 08:09:29 +00:00
parent 5289f0f195
commit de35279269
5 changed files with 152 additions and 2 deletions
+8
View File
@@ -525,6 +525,14 @@ export function usePipelineHealth(hours = 24) {
return useGet<Record<string, unknown>>(['pipeline-health', hours], 'query', `/api/ops/pipeline/health?hours=${hours}`); return useGet<Record<string, unknown>>(['pipeline-health', hours], 'query', `/api/ops/pipeline/health?hours=${hours}`);
} }
/**
 * Mutation hook behind the pipeline page's "Retry Failed" action.
 *
 * Triggering the mutation POSTs an empty body to
 * `/api/ops/pipeline/retry-failed`; the response carries the number of
 * re-enqueued items (`retried`) and a human-readable `message`. Once the call
 * succeeds, the `['pipeline-health']` queries are invalidated so the page
 * refetches its counts.
 */
export function useRetryFailedExtractions() {
  const queryClient = useQueryClient();

  // The mutation takes no variables: the backend decides which documents to retry.
  const requestRetry = () =>
    apiPost<{ retried: number; message: string }>('query', '/api/ops/pipeline/retry-failed', {});

  const refreshPipelineHealth = () =>
    queryClient.invalidateQueries({ queryKey: ['pipeline-health'] });

  return useMutation({
    mutationFn: requestRetry,
    onSuccess: refreshPipelineHealth,
  });
}
export function useIngestionSummary(hours = 24) { export function useIngestionSummary(hours = 24) {
return useGet<Record<string, unknown>>(['ingestion-summary', hours], 'query', `/api/ops/ingestion/summary?hours=${hours}`); return useGet<Record<string, unknown>>(['ingestion-summary', hours], 'query', `/api/ops/ingestion/summary?hours=${hours}`);
} }
+20 -1
View File
@@ -1,5 +1,5 @@
import { useState, useEffect } from 'react'; import { useState, useEffect } from 'react';
import { usePipelineHealth } from '../api/hooks'; import { usePipelineHealth, useRetryFailedExtractions } from '../api/hooks';
import { LoadingSpinner, DateRangeSelector, Card } from '../components/ui'; import { LoadingSpinner, DateRangeSelector, Card } from '../components/ui';
const QUEUE_LABELS: Record<string, string> = { const QUEUE_LABELS: Record<string, string> = {
@@ -53,6 +53,7 @@ export function OpsPipelinePage() {
const [hours, setHours] = useState(24); const [hours, setHours] = useState(24);
const { data, isLoading } = usePipelineHealth(hours); const { data, isLoading } = usePipelineHealth(hours);
const stream = usePipelineStream(); const stream = usePipelineStream();
const retryMutation = useRetryFailedExtractions();
if (isLoading) return <LoadingSpinner />; if (isLoading) return <LoadingSpinner />;
@@ -70,6 +71,8 @@ export function OpsPipelinePage() {
.map((s) => [s.status, s.doc_count]), .map((s) => [s.status, s.doc_count]),
); );
const failedCount = docStages['extraction_failed'] ?? 0;
// Separate DLQ entries from regular queues // Separate DLQ entries from regular queues
const dlqEntries = Object.entries(queueDepths).filter(([k]) => k.startsWith('dlq:')); const dlqEntries = Object.entries(queueDepths).filter(([k]) => k.startsWith('dlq:'));
const regularQueues = Object.entries(QUEUE_LABELS); const regularQueues = Object.entries(QUEUE_LABELS);
@@ -79,6 +82,22 @@ export function OpsPipelinePage() {
<div className="flex items-center justify-between"> <div className="flex items-center justify-between">
<h1 className="text-xl font-semibold text-gray-100">Pipeline Health</h1> <h1 className="text-xl font-semibold text-gray-100">Pipeline Health</h1>
<div className="flex items-center gap-3"> <div className="flex items-center gap-3">
{failedCount > 0 && (
<button
type="button"
onClick={() => retryMutation.mutate()}
disabled={retryMutation.isPending}
className="rounded-md bg-amber-600 px-3 py-1.5 text-xs font-medium text-white hover:bg-amber-500 disabled:opacity-50"
>
{retryMutation.isPending ? 'Retrying…' : `Retry Failed (${failedCount})`}
</button>
)}
{retryMutation.isSuccess && (
<span className="text-xs text-green-400">{retryMutation.data.message}</span>
)}
{retryMutation.isError && (
<span className="text-xs text-red-400">Retry failed</span>
)}
{stream && ( {stream && (
<span className="flex items-center gap-1.5 text-xs text-green-400"> <span className="flex items-center gap-1.5 text-xs text-green-400">
<span className="inline-block h-2 w-2 rounded-full bg-green-400 animate-pulse" aria-hidden="true" /> <span className="inline-block h-2 w-2 rounded-full bg-green-400 animate-pulse" aria-hidden="true" />
+2 -1
View File
@@ -104,7 +104,8 @@ export const handlers = [
return HttpResponse.json({ id: 'lockout-new', ticker: body.ticker, reason: body.reason, lockout_type: body.lockout_type ?? 'manual', expires_at: new Date(Date.now() + ((body.duration_minutes as number) ?? 60) * 60000).toISOString(), created_at: new Date().toISOString() }, { status: 201 }); return HttpResponse.json({ id: 'lockout-new', ticker: body.ticker, reason: body.reason, lockout_type: body.lockout_type ?? 'manual', expires_at: new Date(Date.now() + ((body.duration_minutes as number) ?? 60) * 60000).toISOString(), created_at: new Date().toISOString() }, { status: 201 });
}), }),
http.delete('/api/admin/trading/lockouts/:id', () => HttpResponse.json({ status: 'deleted' })), http.delete('/api/admin/trading/lockouts/:id', () => HttpResponse.json({ status: 'deleted' })),
http.get('/api/ops/pipeline/health', () => HttpResponse.json({ hours: 24, document_stages: [{ status: 'extracted', doc_count: 5 }], parsing: {}, extraction: {}, aggregation: {} })), http.get('/api/ops/pipeline/health', () => HttpResponse.json({ hours: 24, document_stages: [{ status: 'extracted', doc_count: 5 }], parsing: {}, extraction: {}, aggregation: {}, queue_depths: {} })),
http.post('/api/ops/pipeline/retry-failed', () => HttpResponse.json({ retried: 10, message: 'Re-enqueued 10 documents for extraction' })),
http.get('/api/ops/ingestion/summary', () => HttpResponse.json({ total_runs: 10, completed: 8, failed: 2, total_items_fetched: 50, total_items_new: 12, by_source_type: [] })), http.get('/api/ops/ingestion/summary', () => HttpResponse.json({ total_runs: 10, completed: 8, failed: 2, total_items_fetched: 50, total_items_new: 12, by_source_type: [] })),
http.get('/api/ops/ingestion/throughput', () => HttpResponse.json([])), http.get('/api/ops/ingestion/throughput', () => HttpResponse.json([])),
http.get('/api/ops/model/performance', () => HttpResponse.json({ total_extractions: 20, success_rate: 0.9, avg_duration_ms: 1500, retry_rate: 0.05, avg_confidence: 0.8 })), http.get('/api/ops/model/performance', () => HttpResponse.json({ total_extractions: 20, success_rate: 0.9, avg_duration_ms: 1500, retry_rate: 0.05, avg_confidence: 0.8 })),
+52
View File
@@ -1869,6 +1869,58 @@ async def pipeline_stream(request: Request):
) )
@app.post("/api/ops/pipeline/retry-failed")
async def retry_failed_extractions_endpoint():
    """Re-enqueue documents stuck in extraction_failed for another attempt.

    Selects up to 200 extraction_failed rows (oldest ``updated_at`` first),
    resets those documents to 'parsed', deletes their failed intelligence
    rows, and pushes one message per selected row onto the extraction queue
    (or the macro-classification queue for ``macro_event`` documents).

    The database changes are committed in a single transaction *before*
    anything is pushed to Redis, so a worker never dequeues a job for a
    document that is still marked extraction_failed. The UPDATE re-checks the
    status and only the documents it actually changed are enqueued, so a
    concurrent retry (a second click, or the scheduler) cannot enqueue the
    same document twice.

    Returns:
        dict with ``retried`` (number of queue messages pushed) and a
        human-readable ``message``.
    """
    rows = await pool.fetch(
        """SELECT d.id, d.document_type, dcm.ticker
           FROM documents d
           LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id
           WHERE d.status = 'extraction_failed'
           ORDER BY d.updated_at ASC
           LIMIT 200""",
    )
    if not rows:
        return {"retried": 0, "message": "No extraction-failed documents to retry"}

    # The join may return the same document more than once (presumably one row
    # per company mention -- verify against the schema); de-duplicate the ids
    # while keeping their order.
    candidate_ids = list(dict.fromkeys(row["id"] for row in rows))

    async with pool.acquire() as conn:
        async with conn.transaction():
            # Claim the documents: only rows that are *still* extraction_failed
            # are reset, and RETURNING tells us which ones we own.
            claimed = await conn.fetch(
                """UPDATE documents
                   SET status = 'parsed', updated_at = NOW()
                   WHERE id = ANY($1::uuid[])
                     AND status = 'extraction_failed'
                   RETURNING id""",
                candidate_ids,
            )
            claimed_ids = [record["id"] for record in claimed]
            if claimed_ids:
                # Delete failed intelligence rows so extractor starts fresh
                await conn.execute(
                    """DELETE FROM document_intelligence
                       WHERE document_id = ANY($1::uuid[])
                         AND validation_status = 'failed'""",
                    claimed_ids,
                )

    if not claimed_ids:
        # Another retry claimed every candidate between our SELECT and UPDATE.
        return {"retried": 0, "message": "No extraction-failed documents to retry"}

    # Enqueue only after the reset is committed. If a push fails part-way the
    # remaining documents are left in 'parsed' rather than extraction_failed
    # with a job already queued.
    claimed_lookup = set(claimed_ids)
    retried = 0
    for row in rows:
        if row["id"] not in claimed_lookup:
            continue
        if row["document_type"] == "macro_event":
            target = "stonks:queue:macro_classification"
        else:
            target = "stonks:queue:extraction"
        await rds.rpush(target, json.dumps({
            "document_id": str(row["id"]),
            "ticker": row["ticker"] or "",
        }))
        retried += 1

    return {"retried": retried, "message": f"Re-enqueued {retried} documents for extraction"}
@app.get("/api/ops/sources/coverage-gaps") @app.get("/api/ops/sources/coverage-gaps")
async def get_source_coverage_gaps(): async def get_source_coverage_gaps():
"""Identify symbols with missing or insufficient source coverage. """Identify symbols with missing or insufficient source coverage.
+70
View File
@@ -499,6 +499,7 @@ async def main() -> None:
logger.info("Scheduler started (tick=%ds)", SCHEDULER_TICK) logger.info("Scheduler started (tick=%ds)", SCHEDULER_TICK)
recovery_counter = 0 recovery_counter = 0
retry_counter = 0
cleanup_counter = 0 cleanup_counter = 0
try: try:
while True: while True:
@@ -511,6 +512,11 @@ async def main() -> None:
if recovery_counter >= 20: if recovery_counter >= 20:
recovery_counter = 0 recovery_counter = 0
await recover_stale_documents(pool, rds) await recover_stale_documents(pool, rds)
# Retry extraction failures every ~40 cycles (~10 minutes)
retry_counter += 1
if retry_counter >= 40:
retry_counter = 0
await retry_failed_extractions(pool, rds)
# Run signal cleanup periodically (~25 minutes) # Run signal cleanup periodically (~25 minutes)
cleanup_counter += 1 cleanup_counter += 1
if cleanup_counter >= CLEANUP_CYCLE_INTERVAL: if cleanup_counter >= CLEANUP_CYCLE_INTERVAL:
@@ -529,6 +535,9 @@ async def main() -> None:
# How long a document can sit in "parsed" before we consider it orphaned # How long a document can sit in "parsed" before we consider it orphaned
STALE_PARSED_THRESHOLD_MINUTES: int = 30 STALE_PARSED_THRESHOLD_MINUTES: int = 30
# Cooldown, in minutes, before an 'extraction_failed' document is retried:
# retry_failed_extractions() only selects documents whose updated_at is at
# least this old, to avoid tight retry loops.
EXTRACTION_FAILED_RETRY_MINUTES: int = 60
async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> int: async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> int:
"""Re-enqueue documents stuck in 'parsed' status for extraction. """Re-enqueue documents stuck in 'parsed' status for extraction.
@@ -584,6 +593,67 @@ async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> in
return enqueued return enqueued
async def retry_failed_extractions(pool: asyncpg.Pool, rds: aioredis.Redis) -> int:
    """Re-enqueue documents stuck in 'extraction_failed' for another attempt.

    Selects up to 100 extraction_failed rows (oldest ``updated_at`` first)
    whose ``updated_at`` is at least EXTRACTION_FAILED_RETRY_MINUTES old, to
    avoid tight retry loops. The selected documents are reset to 'parsed',
    their failed intelligence rows are deleted so the extractor treats them as
    fresh, and one message per selected row is pushed onto the extraction
    queue (or the macro-classification queue for ``macro_event`` documents).

    The database changes are committed in a single transaction *before*
    anything is pushed to Redis, so a worker never dequeues a job for a
    document that is still marked extraction_failed. The UPDATE re-checks the
    status and only the documents it actually changed are enqueued, so a
    concurrent retry cannot enqueue the same document twice.

    Args:
        pool: asyncpg connection pool for the documents database.
        rds: Redis client used to push the queue messages.

    Returns:
        The number of queue messages pushed.
    """
    rows = await pool.fetch(
        """SELECT d.id, d.document_type, dcm.ticker
           FROM documents d
           LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id
           WHERE d.status = 'extraction_failed'
             AND d.updated_at < NOW() - INTERVAL '1 minute' * $1
           ORDER BY d.updated_at ASC
           LIMIT 100""",
        EXTRACTION_FAILED_RETRY_MINUTES,
    )
    if not rows:
        return 0

    # The join may return the same document more than once (presumably one row
    # per company mention -- verify against the schema); de-duplicate the ids
    # while keeping their order.
    candidate_ids = list(dict.fromkeys(row["id"] for row in rows))

    async with pool.acquire() as conn:
        async with conn.transaction():
            # Claim the documents: only rows that are *still* extraction_failed
            # are reset, and RETURNING tells us which ones we own.
            claimed = await conn.fetch(
                """UPDATE documents
                   SET status = 'parsed', updated_at = NOW()
                   WHERE id = ANY($1::uuid[])
                     AND status = 'extraction_failed'
                   RETURNING id""",
                candidate_ids,
            )
            claimed_ids = [record["id"] for record in claimed]
            if claimed_ids:
                # Delete failed intelligence rows so extractor starts fresh
                await conn.execute(
                    """DELETE FROM document_intelligence
                       WHERE document_id = ANY($1::uuid[])
                         AND validation_status = 'failed'""",
                    claimed_ids,
                )

    if not claimed_ids:
        # Another retry claimed every candidate between our SELECT and UPDATE.
        return 0

    # Enqueue only after the reset is committed. If a push fails part-way the
    # remaining documents are left in 'parsed' rather than extraction_failed
    # with a job already queued.
    claimed_lookup = set(claimed_ids)
    enqueued = 0
    for row in rows:
        if row["id"] not in claimed_lookup:
            continue
        if row["document_type"] == "macro_event":
            target = queue_key(QUEUE_MACRO_CLASSIFICATION)
        else:
            target = queue_key(QUEUE_EXTRACTION)
        await rds.rpush(target, json.dumps({
            "document_id": str(row["id"]),
            "ticker": row["ticker"] or "",
        }))
        enqueued += 1

    logger.info("Retried %d extraction-failed documents", enqueued)
    return enqueued
# How often to run competitive signal cleanup (every ~100 cycles = ~25 minutes) # How often to run competitive signal cleanup (every ~100 cycles = ~25 minutes)
CLEANUP_CYCLE_INTERVAL: int = 100 CLEANUP_CYCLE_INTERVAL: int = 100
# Keep competitive signals for this many days # Keep competitive signals for this many days