"""Reset low-quality filing documents to 'ingested' and enqueue a batch for parsing.

Connection settings are read from the environment:
POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD,
REDIS_HOST, REDIS_PORT and (optionally) REDIS_PASSWORD.
"""

import asyncio
import json
import os

import asyncpg
import redis

PARSING_QUEUE = "stonks:queue:parsing"
DEFAULT_BATCH_SIZE = 20  # Start with 20 to test

RESET_SQL = (
    "UPDATE documents SET status = 'ingested', parse_quality_score = NULL, parse_confidence = NULL "
    "WHERE source_type = 'filings_api' AND status = 'low_quality' AND url IS NOT NULL"
)

# NOTE(review): the LEFT JOIN yields one row per (document, mention) pair, so a
# document with several mentions can appear more than once within the limit,
# and without ORDER BY the selected rows are arbitrary. Confirm this is the
# intended behaviour before widening the batch.
SELECT_SQL = (
    "SELECT d.id, dcm.ticker FROM documents d "
    "LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id "
    "WHERE d.source_type = 'filings_api' AND d.status = 'ingested' "
    "LIMIT $1"
)


async def main(batch_size: int = DEFAULT_BATCH_SIZE) -> None:
    """Reset filing documents and push up to *batch_size* parsing jobs to Redis.

    Steps:
      1. Set every 'low_quality' filings_api document that has a URL back to
         'ingested' and clear its parse score/confidence.
      2. Select up to ``batch_size`` ingested filings_api rows (with ticker,
         if a company mention exists).
      3. RPUSH one JSON job per row onto ``PARSING_QUEUE``.

    Note that step 1 is not limited by ``batch_size``: all matching documents
    are reset, but only the selected batch is enqueued.

    Args:
        batch_size: Maximum number of rows to enqueue (default 20).

    Raises:
        KeyError: If a required environment variable is missing.
        ValueError: If POSTGRES_PORT or REDIS_PORT is not an integer.
    """
    # `async with` guarantees the pool is closed even if a query or the
    # Redis push fails part-way through.
    async with asyncpg.create_pool(
        host=os.environ["POSTGRES_HOST"],
        port=int(os.environ["POSTGRES_PORT"]),
        database=os.environ["POSTGRES_DB"],
        user=os.environ["POSTGRES_USER"],
        password=os.environ["POSTGRES_PASSWORD"],
    ) as pool:
        # Pass credentials as keyword arguments rather than building a URL:
        # a password containing '@', '/', '#', etc. would corrupt the URL.
        # An empty/missing password means "no AUTH", as in the URL form.
        queue_client = redis.Redis(
            host=os.environ["REDIS_HOST"],
            port=int(os.environ["REDIS_PORT"]),
            db=0,
            password=os.environ.get("REDIS_PASSWORD") or None,
        )
        try:
            # Reset filing docs to ingested
            await pool.execute(RESET_SQL)

            rows = await pool.fetch(SELECT_SQL, batch_size)

            jobs = [
                json.dumps({
                    "document_id": str(row["id"]),
                    "ticker": row["ticker"] or "",  # NULL ticker -> empty string
                })
                for row in rows
            ]
            # RPUSH accepts multiple values, so the whole batch goes out in a
            # single round trip; it rejects an empty value list, hence the guard.
            if jobs:
                queue_client.rpush(PARSING_QUEUE, *jobs)

            print(f"Enqueued {len(rows)} filing docs for parsing (test batch)")
        finally:
            queue_client.close()


if __name__ == "__main__":
    asyncio.run(main())