Files
stonks-oracle/scripts/reenqueue_filings.py
T

36 lines
1.3 KiB
Python

import asyncio, asyncpg, json, os, redis
async def main(limit: int = 20) -> None:
    """Reset low-quality filing documents and enqueue a batch for re-parsing.

    Steps:
      1. Every ``documents`` row with ``source_type = 'filings_api'``,
         ``status = 'low_quality'`` and a non-NULL ``url`` is set back to
         ``status = 'ingested'`` with ``parse_quality_score`` and
         ``parse_confidence`` cleared.
      2. Up to ``limit`` *distinct* ``filings_api`` documents currently in
         ``'ingested'`` status are pushed onto the Redis list
         ``stonks:queue:parsing`` as JSON ``{"document_id": ..., "ticker": ...}``.

    Step 1 resets *all* matching documents while step 2 enqueues at most
    ``limit`` of them; the rest remain ``'ingested'`` and are picked up by a
    later run of this script.

    Connection settings are read from the ``POSTGRES_*`` and ``REDIS_*``
    environment variables (``REDIS_PASSWORD`` is optional).

    Args:
        limit: Maximum number of documents to enqueue (default 20, a small
            test batch).

    Raises:
        ValueError: If ``limit`` is negative.
        KeyError: If a required environment variable is missing.
    """
    if limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit}")

    # Pass the password as a keyword rather than splicing it into a URL: a
    # password containing '@', ':', '/' or '%' would corrupt a redis:// URL.
    # An empty/unset password means "no AUTH", as the old URL form did.
    # Constructing the client does not connect, so it cannot leak the pool.
    r = redis.Redis(
        host=os.environ["REDIS_HOST"],
        port=int(os.environ["REDIS_PORT"]),
        password=os.environ.get("REDIS_PASSWORD") or None,
        db=0,
    )
    pool = await asyncpg.create_pool(
        host=os.environ["POSTGRES_HOST"],
        port=int(os.environ["POSTGRES_PORT"]),
        database=os.environ["POSTGRES_DB"],
        user=os.environ["POSTGRES_USER"],
        password=os.environ["POSTGRES_PASSWORD"],
    )
    try:
        # Reset filing docs to ingested so the parser will reprocess them.
        await pool.execute(
            "UPDATE documents SET status = 'ingested', parse_quality_score = NULL, parse_confidence = NULL "
            "WHERE source_type = 'filings_api' AND status = 'low_quality' AND url IS NOT NULL"
        )
        # The LEFT JOIN yields one row per company mention; DISTINCT ON keeps a
        # single row per document so it is enqueued once and LIMIT counts
        # documents, not mentions. NULLS LAST prefers a real ticker when one
        # exists.
        rows = await pool.fetch(
            "SELECT DISTINCT ON (d.id) d.id, dcm.ticker FROM documents d "
            "LEFT JOIN document_company_mentions dcm ON d.id = dcm.document_id "
            "WHERE d.source_type = 'filings_api' AND d.status = 'ingested' "
            "ORDER BY d.id, dcm.ticker NULLS LAST "
            "LIMIT $1",
            limit,
        )
        payloads = [
            json.dumps({
                "document_id": str(row["id"]),
                # ticker is NULL for documents with no mention row.
                "ticker": row["ticker"] or "",
            })
            for row in rows
        ]
        # One multi-value RPUSH (order preserved) instead of N round trips;
        # RPUSH with no values is a Redis error, hence the guard.
        if payloads:
            r.rpush("stonks:queue:parsing", *payloads)
        print(f"Enqueued {len(payloads)} filing docs for parsing (test batch)")
    finally:
        await pool.close()
        r.connection_pool.disconnect()


if __name__ == "__main__":
    asyncio.run(main())