From a3e8009fa98456462bb9681b30eac9c91982cae3 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Sun, 12 Apr 2026 10:58:53 -0700 Subject: [PATCH] phase 17: revert to qwen3.5:9b (9b-fast was removed from Ollama), add retry script --- infra/helm/stonks-oracle/values.yaml | 2 +- scripts/check_failure.py | 41 ++++++++++++++++++++++++++++ scripts/retry_failed.py | 39 ++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 scripts/check_failure.py create mode 100644 scripts/retry_failed.py diff --git a/infra/helm/stonks-oracle/values.yaml b/infra/helm/stonks-oracle/values.yaml index 423390b..ad8870b 100644 --- a/infra/helm/stonks-oracle/values.yaml +++ b/infra/helm/stonks-oracle/values.yaml @@ -159,7 +159,7 @@ config: MINIO_ENDPOINT: "minio.minio-service.svc.cluster.local:80" MINIO_SECURE: "false" OLLAMA_BASE_URL: "http://ollama.ollama-service.svc.cluster.local:11434" - OLLAMA_MODEL: "qwen3.5:9b-fast" + OLLAMA_MODEL: "qwen3.5:9b" OLLAMA_TIMEOUT: "300" OLLAMA_MAX_RETRIES: "2" OLLAMA_RETRY_BASE_DELAY: "1.0" diff --git a/scripts/check_failure.py b/scripts/check_failure.py new file mode 100644 index 0000000..43ef73c --- /dev/null +++ b/scripts/check_failure.py @@ -0,0 +1,41 @@ +from minio import Minio +import os, json + +mc = Minio(os.environ["MINIO_ENDPOINT"], access_key=os.environ["MINIO_ACCESS_KEY"], secret_key=os.environ["MINIO_SECRET_KEY"], secure=False) + +# Find the failed doc's LLM result +target = "b6003360" +objs = [o for o in mc.list_objects("stonks-llm-results", recursive=True) if target in o.object_name] +if not objs: + # Search all recent results + all_objs = sorted(mc.list_objects("stonks-llm-results", recursive=True), key=lambda o: o.last_modified, reverse=True) + for o in all_objs[:5]: + if target in o.object_name: + objs.append(o) + +for o in objs: + data = json.loads(mc.get_object("stonks-llm-results", o.object_name).read()) + print(f"success: {data.get('success')}") + print(f"total_duration_ms: {data.get('total_duration_ms')}") + for i, att in enumerate(data.get("attempts", [])): + print(f"\n attempt {i+1}:") + print(f" error: {att.get('error')}") + print(f" duration_ms: {att.get('duration_ms')}") + print(f" retryable: {att.get('retryable')}") + raw = att.get("raw_output", "") + if raw: + print(f" output ({len(raw)} chars): {raw[:200]}") + else: + print(f" output: (empty)") + +if not objs: + print(f"No LLM result found for {target}") + print("Checking most recent failed result instead...") + all_objs = sorted(mc.list_objects("stonks-llm-results", recursive=True), key=lambda o: o.last_modified, reverse=True) + for o in all_objs[:10]: + data = json.loads(mc.get_object("stonks-llm-results", o.object_name).read()) + if not data.get("success"): + print(f"\nFailed: {o.object_name}") + for i, att in enumerate(data.get("attempts", [])): + print(f" attempt {i+1}: error={att.get('error')} duration={att.get('duration_ms')}ms") + break diff --git a/scripts/retry_failed.py b/scripts/retry_failed.py new file mode 100644 index 0000000..250da02 --- /dev/null +++ b/scripts/retry_failed.py @@ -0,0 +1,39 @@ +import asyncio, asyncpg, json, os, redis + +async def main(): + pool = await asyncpg.create_pool( + host=os.environ["POSTGRES_HOST"], + port=int(os.environ["POSTGRES_PORT"]), + database=os.environ["POSTGRES_DB"], + user=os.environ["POSTGRES_USER"], + password=os.environ["POSTGRES_PASSWORD"], + ) + r = redis.from_url(f"redis://:{os.environ.get('REDIS_PASSWORD','')}@{os.environ['REDIS_HOST']}:{os.environ['REDIS_PORT']}/0") + + # Delete failed intelligence records so they don't block re-extraction + await pool.execute( + "DELETE FROM document_intelligence WHERE document_id IN " + "(SELECT id FROM documents WHERE status = 'extraction_failed')" + ) + + # Reset status back to parsed + count = await pool.execute( + "UPDATE documents SET status = 'parsed' WHERE status = 'extraction_failed'" + ) + print(f"Reset {count}") + + # Re-enqueue + rows = await pool.fetch( + "SELECT d.id, dcm.ticker FROM documents d " + "LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id " + "WHERE d.status = 'parsed'" + ) + for row in rows: + r.rpush("stonks:queue:extraction", json.dumps({ + "document_id": str(row["id"]), + "ticker": row["ticker"] or "", + })) + print(f"Enqueued {len(rows)} docs for extraction") + await pool.close() + +asyncio.run(main())