Files
stonks-oracle/scripts/run_reclassify_and_reaggregate.sh
T
Celes Renata 1842b2039c ops: add SQL query test script and macro reclassify/reaggregate runner
- scripts/test_saved_queries.py: tests all 24 saved SQL explorer queries
  against the live Trino API (all 24 pass)
- scripts/run_reclassify_and_reaggregate.sh: self-contained script to
  re-classify macro events with updated prompts and re-aggregate all
  tickers. Scales aggregation to 16 pods, monitors queues, scales back.
2026-04-17 08:03:14 +00:00

105 lines
4.3 KiB
Bash
Executable File

#!/bin/bash
# Re-classify macro events and re-aggregate all tickers.
# Run from gremlin-1 (or anywhere with kubectl access to the cluster).
set -euo pipefail
NS="stonks-oracle"
SCHED_POD=$(kubectl get pods -n $NS -l app=scheduler -o jsonpath='{.items[0].metadata.name}')
echo "Using scheduler pod: $SCHED_POD"
echo ""
echo "=== Step 1: Re-classify macro events ==="
echo "Deleting old global_events + macro_impact_records, then enqueuing docs..."
kubectl exec -n $NS "$SCHED_POD" -- python -c "
import asyncio, json, logging
import asyncpg, redis.asyncio as aioredis
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger('reclassify')
async def run():
pool = await asyncpg.create_pool(dsn='postgresql://stonks:St0nks0racl3!@postgresql-rw.postgresql-service.svc.cluster.local:5432/stonks', min_size=1, max_size=4)
r = aioredis.from_url('redis://redis-master.redis-service.svc.cluster.local:6379/0')
ic = await pool.fetchval('SELECT count(*) FROM macro_impact_records')
ec = await pool.fetchval('SELECT count(*) FROM global_events')
dc = await pool.fetchval(\"SELECT count(*) FROM documents WHERE document_type = 'macro_event' AND status != 'rejected'\")
log.info('Current: %d events, %d impacts, %d macro docs', ec, ic, dc)
await pool.execute('DELETE FROM macro_impact_records')
log.info('Deleted macro_impact_records')
await pool.execute('DELETE FROM global_events')
log.info('Deleted global_events')
rows = await pool.fetch(\"SELECT id FROM documents WHERE document_type = 'macro_event' AND status != 'rejected' ORDER BY published_at DESC\")
n = 0
for row in rows:
await r.rpush('stonks:queue:macro_classification', json.dumps({'document_id': str(row['id'])}))
n += 1
log.info('Enqueued %d macro re-classification jobs', n)
log.info('Queue depth: %d', await r.llen('stonks:queue:macro_classification'))
await pool.close()
await r.aclose()
asyncio.run(run())
"
echo ""
echo "=== Step 2: Wait for macro classification queue to drain ==="
echo "Extractor pods will process these. Polling queue depth..."
while true; do
DEPTH=$(kubectl exec -n $NS "$SCHED_POD" -- python -c "
import redis; r=redis.from_url('redis://redis-master.redis-service.svc.cluster.local:6379/0'); print(r.llen('stonks:queue:macro_classification'))
" 2>/dev/null)
echo " Macro queue: $DEPTH"
if [ "$DEPTH" = "0" ]; then
echo "Macro queue drained!"
break
fi
sleep 15
done
echo ""
echo "=== Step 3: Scale aggregation to 16 pods ==="
kubectl scale deployment/aggregation --replicas=16 -n $NS
kubectl rollout status deployment/aggregation -n $NS --timeout=120s
echo ""
echo "=== Step 4: Enqueue re-aggregation for all tickers ==="
kubectl exec -n $NS "$SCHED_POD" -- python -c "
import asyncio, json, logging
import asyncpg, redis.asyncio as aioredis
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger('reaggregate')
async def run():
pool = await asyncpg.create_pool(dsn='postgresql://stonks:St0nks0racl3!@postgresql-rw.postgresql-service.svc.cluster.local:5432/stonks', min_size=1, max_size=4)
r = aioredis.from_url('redis://redis-master.redis-service.svc.cluster.local:6379/0')
rows = await pool.fetch('SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker')
n = 0
for row in rows:
await r.rpush('stonks:queue:aggregation', json.dumps({'ticker': row['ticker'], 'reaggregate': True}))
n += 1
log.info('Enqueued %d aggregation jobs', n)
log.info('Queue depth: %d', await r.llen('stonks:queue:aggregation'))
await pool.close()
await r.aclose()
asyncio.run(run())
"
echo ""
echo "=== Step 5: Monitor aggregation queue ==="
while true; do
DEPTH=$(kubectl exec -n $NS "$SCHED_POD" -- python -c "
import redis; r=redis.from_url('redis://redis-master.redis-service.svc.cluster.local:6379/0'); print(r.llen('stonks:queue:aggregation'))
" 2>/dev/null)
echo " Aggregation queue: $DEPTH"
if [ "$DEPTH" = "0" ]; then
echo "Aggregation queue drained!"
break
fi
sleep 10
done
echo ""
echo "=== Step 6: Scale aggregation back to 4 pods ==="
kubectl scale deployment/aggregation --replicas=4 -n $NS
echo "Done! Re-classification and re-aggregation complete."