"""Reset documents stuck in ``extraction_failed`` and re-enqueue extraction jobs.

Steps performed by :func:`main`:

1. Delete ``document_intelligence`` rows belonging to documents whose status
   is ``extraction_failed``.
2. Set those documents' status back to ``parsed``.
3. Push one JSON job per row of the re-enqueue query onto the Redis list
   ``stonks:queue:extraction``.

Connection settings are read from the environment: ``POSTGRES_HOST``,
``POSTGRES_PORT``, ``POSTGRES_DB``, ``POSTGRES_USER``, ``POSTGRES_PASSWORD``,
``REDIS_HOST``, ``REDIS_PORT`` and (optionally) ``REDIS_PASSWORD``.
"""

import asyncio
import json
import os

import asyncpg
import redis

EXTRACTION_QUEUE = "stonks:queue:extraction"


async def main() -> None:
    """Reset failed documents to ``parsed`` and enqueue extraction jobs.

    Raises:
        KeyError: if a required environment variable is missing.
        Exception: connection/query errors from asyncpg or redis-py propagate
            unchanged; both clients are closed before the error surfaces.
    """
    # Pass the password as a keyword argument rather than embedding it in a
    # URL: characters such as "@", "/" or "#" in the password would otherwise
    # corrupt the URL. An empty/missing password means "no auth".
    r = redis.Redis(
        host=os.environ["REDIS_HOST"],
        port=int(os.environ["REDIS_PORT"]),
        db=0,
        password=os.environ.get("REDIS_PASSWORD") or None,
    )
    try:
        # Fail fast if Redis is unreachable, before touching the database.
        r.ping()

        pool = await asyncpg.create_pool(
            host=os.environ["POSTGRES_HOST"],
            port=int(os.environ["POSTGRES_PORT"]),
            database=os.environ["POSTGRES_DB"],
            user=os.environ["POSTGRES_USER"],
            password=os.environ["POSTGRES_PASSWORD"],
        )
        try:
            # Run the delete and the status reset on one connection inside a
            # single transaction so they apply (or roll back) together.
            async with pool.acquire() as conn:
                async with conn.transaction():
                    # Delete failed intelligence records so they don't block
                    # re-extraction
                    await conn.execute(
                        "DELETE FROM document_intelligence WHERE document_id IN "
                        "(SELECT id FROM documents WHERE status = 'extraction_failed')"
                    )

                    # Reset status back to parsed. execute() returns the
                    # command status string (e.g. "UPDATE 12"), not an int.
                    status = await conn.execute(
                        "UPDATE documents SET status = 'parsed' "
                        "WHERE status = 'extraction_failed'"
                    )
            print(f"Reset {status}")

            # Re-enqueue. Note: this selects every document whose status is
            # 'parsed' (not only the ones reset above), and the LEFT JOIN
            # yields one row per ticker mention, so a document with several
            # mentions produces several jobs.
            rows = await pool.fetch(
                "SELECT d.id, dcm.ticker FROM documents d "
                "LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id "
                "WHERE d.status = 'parsed'"
            )
            for row in rows:
                job = {
                    "document_id": str(row["id"]),
                    # Documents without a mention have a NULL ticker.
                    "ticker": row["ticker"] or "",
                }
                r.rpush(EXTRACTION_QUEUE, json.dumps(job))
            print(f"Enqueued {len(rows)} docs for extraction")
        finally:
            await pool.close()
    finally:
        r.close()


if __name__ == "__main__":
    asyncio.run(main())