phase 17: revert to qwen3.5:9b (9b-fast was removed from Ollama), add retry script
This commit is contained in:
@@ -0,0 +1,41 @@
|
||||
from minio import Minio
|
||||
import os, json
|
||||
|
||||
mc = Minio(os.environ["MINIO_ENDPOINT"], access_key=os.environ["MINIO_ACCESS_KEY"], secret_key=os.environ["MINIO_SECRET_KEY"], secure=False)
|
||||
|
||||
# Find the failed doc's LLM result
|
||||
target = "b6003360"
|
||||
objs = [o for o in mc.list_objects("stonks-llm-results", recursive=True) if target in o.object_name]
|
||||
if not objs:
|
||||
# Search all recent results
|
||||
all_objs = sorted(mc.list_objects("stonks-llm-results", recursive=True), key=lambda o: o.last_modified, reverse=True)
|
||||
for o in all_objs[:5]:
|
||||
if target in o.object_name:
|
||||
objs.append(o)
|
||||
|
||||
for o in objs:
|
||||
data = json.loads(mc.get_object("stonks-llm-results", o.object_name).read())
|
||||
print(f"success: {data.get('success')}")
|
||||
print(f"total_duration_ms: {data.get('total_duration_ms')}")
|
||||
for i, att in enumerate(data.get("attempts", [])):
|
||||
print(f"\n attempt {i+1}:")
|
||||
print(f" error: {att.get('error')}")
|
||||
print(f" duration_ms: {att.get('duration_ms')}")
|
||||
print(f" retryable: {att.get('retryable')}")
|
||||
raw = att.get("raw_output", "")
|
||||
if raw:
|
||||
print(f" output ({len(raw)} chars): {raw[:200]}")
|
||||
else:
|
||||
print(f" output: (empty)")
|
||||
|
||||
if not objs:
|
||||
print(f"No LLM result found for {target}")
|
||||
print("Checking most recent failed result instead...")
|
||||
all_objs = sorted(mc.list_objects("stonks-llm-results", recursive=True), key=lambda o: o.last_modified, reverse=True)
|
||||
for o in all_objs[:10]:
|
||||
data = json.loads(mc.get_object("stonks-llm-results", o.object_name).read())
|
||||
if not data.get("success"):
|
||||
print(f"\nFailed: {o.object_name}")
|
||||
for i, att in enumerate(data.get("attempts", [])):
|
||||
print(f" attempt {i+1}: error={att.get('error')} duration={att.get('duration_ms')}ms")
|
||||
break
|
||||
@@ -0,0 +1,39 @@
|
||||
import asyncio, asyncpg, json, os, redis
|
||||
|
||||
async def main():
|
||||
pool = await asyncpg.create_pool(
|
||||
host=os.environ["POSTGRES_HOST"],
|
||||
port=int(os.environ["POSTGRES_PORT"]),
|
||||
database=os.environ["POSTGRES_DB"],
|
||||
user=os.environ["POSTGRES_USER"],
|
||||
password=os.environ["POSTGRES_PASSWORD"],
|
||||
)
|
||||
r = redis.from_url(f"redis://:{os.environ.get('REDIS_PASSWORD','')}@{os.environ['REDIS_HOST']}:{os.environ['REDIS_PORT']}/0")
|
||||
|
||||
# Delete failed intelligence records so they don't block re-extraction
|
||||
await pool.execute(
|
||||
"DELETE FROM document_intelligence WHERE document_id IN "
|
||||
"(SELECT id FROM documents WHERE status = 'extraction_failed')"
|
||||
)
|
||||
|
||||
# Reset status back to parsed
|
||||
count = await pool.execute(
|
||||
"UPDATE documents SET status = 'parsed' WHERE status = 'extraction_failed'"
|
||||
)
|
||||
print(f"Reset {count}")
|
||||
|
||||
# Re-enqueue
|
||||
rows = await pool.fetch(
|
||||
"SELECT d.id, dcm.ticker FROM documents d "
|
||||
"LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id "
|
||||
"WHERE d.status = 'parsed'"
|
||||
)
|
||||
for row in rows:
|
||||
r.rpush("stonks:queue:extraction", json.dumps({
|
||||
"document_id": str(row["id"]),
|
||||
"ticker": row["ticker"] or "",
|
||||
}))
|
||||
print(f"Enqueued {len(rows)} docs for extraction")
|
||||
await pool.close()
|
||||
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user