diff --git a/infra/helm/stonks-oracle/values.yaml b/infra/helm/stonks-oracle/values.yaml index 29ef19b..71d5181 100644 --- a/infra/helm/stonks-oracle/values.yaml +++ b/infra/helm/stonks-oracle/values.yaml @@ -159,7 +159,7 @@ config: MINIO_ENDPOINT: "minio.minio-service.svc.cluster.local:80" MINIO_SECURE: "false" OLLAMA_BASE_URL: "http://ollama.ollama-service.svc.cluster.local:11434" - OLLAMA_MODEL: "qwen3.5:9b" + OLLAMA_MODEL: "qwen3.5:9b-fast" OLLAMA_TIMEOUT: "120" OLLAMA_MAX_RETRIES: "2" OLLAMA_RETRY_BASE_DELAY: "1.0" diff --git a/scripts/flush_queues.py b/scripts/flush_queues.py new file mode 100644 index 0000000..087a276 --- /dev/null +++ b/scripts/flush_queues.py @@ -0,0 +1,10 @@ +import redis, os +r = redis.from_url(f"redis://:{os.environ.get('REDIS_PASSWORD','')}@{os.environ['REDIS_HOST']}:{os.environ['REDIS_PORT']}/0") +for q in ["ingestion","parsing","extraction","aggregation","recommendation","lake_publish","broker_orders"]: + key = f"stonks:queue:{q}" + depth = r.llen(key) + if depth > 0: + r.delete(key) + print(f" Flushed {key}: {depth} jobs") + else: + print(f" {key}: empty") diff --git a/scripts/reenqueue_all_parsed.py b/scripts/reenqueue_all_parsed.py new file mode 100644 index 0000000..01fe8fc --- /dev/null +++ b/scripts/reenqueue_all_parsed.py @@ -0,0 +1,28 @@ +import asyncio, asyncpg, json, os, redis + +async def main(): + pool = await asyncpg.create_pool( + host=os.environ["POSTGRES_HOST"], + port=int(os.environ["POSTGRES_PORT"]), + database=os.environ["POSTGRES_DB"], + user=os.environ["POSTGRES_USER"], + password=os.environ["POSTGRES_PASSWORD"], + ) + r = redis.from_url(f"redis://:{os.environ.get('REDIS_PASSWORD','')}@{os.environ['REDIS_HOST']}:{os.environ['REDIS_PORT']}/0") + + rows = await pool.fetch( + "SELECT d.id, dcm.ticker FROM documents d " + "LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id " + "WHERE d.status = 'parsed'" + ) + + for row in rows: + r.rpush("stonks:queue:extraction", json.dumps({ + "document_id": str(row["id"]), + "ticker": row["ticker"] or "", + })) + + print(f"Enqueued {len(rows)} parsed docs for extraction") + await pool.close() + +asyncio.run(main())