phase 17: switch to qwen3.5:9b-fast (32k context), add queue management scripts

This commit is contained in:
Celes Renata
2026-04-12 10:19:28 -07:00
parent 1993bfdf3e
commit 7ee1d0f050
3 changed files with 39 additions and 1 deletions
+10
View File
@@ -0,0 +1,10 @@
import redis, os
r = redis.from_url(f"redis://:{os.environ.get('REDIS_PASSWORD','')}@{os.environ['REDIS_HOST']}:{os.environ['REDIS_PORT']}/0")
for q in ["ingestion","parsing","extraction","aggregation","recommendation","lake_publish","broker_orders"]:
key = f"stonks:queue:{q}"
depth = r.llen(key)
if depth > 0:
r.delete(key)
print(f" Flushed {key}: {depth} jobs")
else:
print(f" {key}: empty")
+28
View File
@@ -0,0 +1,28 @@
import asyncio, asyncpg, json, os, redis
async def main():
pool = await asyncpg.create_pool(
host=os.environ["POSTGRES_HOST"],
port=int(os.environ["POSTGRES_PORT"]),
database=os.environ["POSTGRES_DB"],
user=os.environ["POSTGRES_USER"],
password=os.environ["POSTGRES_PASSWORD"],
)
r = redis.from_url(f"redis://:{os.environ.get('REDIS_PASSWORD','')}@{os.environ['REDIS_HOST']}:{os.environ['REDIS_PORT']}/0")
rows = await pool.fetch(
"SELECT d.id, dcm.ticker FROM documents d "
"LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id "
"WHERE d.status = 'parsed'"
)
for row in rows:
r.rpush("stonks:queue:extraction", json.dumps({
"document_id": str(row["id"]),
"ticker": row["ticker"] or "",
}))
print(f"Enqueued {len(rows)} parsed docs for extraction")
await pool.close()
asyncio.run(main())