678072e96b
Redis uses separate env vars, not a single REDIS_URL. Script now builds the connection string from REDIS_HOST, REDIS_PORT, REDIS_DB, and REDIS_PASSWORD — matching how services/shared/config.py does it.
144 lines
5.7 KiB
Bash
Executable File
144 lines
5.7 KiB
Bash
Executable File
#!/bin/bash
# Re-classify macro events and re-aggregate all tickers.
# Run from gremlin-1 (or anywhere with kubectl access to the cluster).
# Pulls DB/Redis credentials from the running pod's environment.

# Abort on the first failing command, unset variable, or failed pipeline stage.
set -euo pipefail

# Namespace used for every kubectl call in this script.
NS="stonks-oracle"

# Every in-cluster Python snippet below is run inside this pod via `kubectl exec`.
# The lookup is restricted to pods in phase Running so that a Pending or Failed
# replica that happens to be listed first is never chosen as the exec target.
SCHED_POD=$(kubectl get pods -n "$NS" -l app=scheduler \
    --field-selector=status.phase=Running \
    -o jsonpath='{.items[0].metadata.name}') || {
    echo "ERROR: could not find a Running scheduler pod (label app=scheduler) in namespace $NS" >&2
    exit 1
}
if [ -z "$SCHED_POD" ]; then
    echo "ERROR: scheduler pod lookup in namespace $NS returned an empty name" >&2
    exit 1
fi
echo "Using scheduler pod: $SCHED_POD"
|
|
|
|
echo ""
echo "=== Step 1: Re-classify macro events ==="
echo "Deleting old global_events + macro_impact_records, then enqueuing docs..."
# The Python program is passed inline inside a double-quoted bash string, so it
# sticks to single-quoted literals and backslash-escapes the double quotes it needs.
kubectl exec -n "$NS" "$SCHED_POD" -- python -c "
import asyncio, json, logging, os
from urllib.parse import quote

import asyncpg, redis.asyncio as aioredis

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger('reclassify')

QUEUE = 'stonks:queue:macro_classification'
# Number of jobs pushed per RPUSH call.
BATCH = 1000


def enc(value):
    # Percent-encode a credential so characters like @ : / # cannot break URL parsing.
    return quote(value, safe='')


async def run():
    # Connection settings come from the pod environment, with in-cluster defaults.
    pg_host = os.environ.get('POSTGRES_HOST', 'postgresql-rw.postgresql-service.svc.cluster.local')
    pg_user = os.environ.get('POSTGRES_USER', 'stonks')
    pg_pass = os.environ.get('POSTGRES_PASSWORD', '')
    pg_db = os.environ.get('POSTGRES_DB', 'stonks')
    dsn = f'postgresql://{enc(pg_user)}:{enc(pg_pass)}@{pg_host}:5432/{pg_db}'

    rh = os.environ.get('REDIS_HOST', 'redis-master.redis-service.svc.cluster.local')
    rp = os.environ.get('REDIS_PORT', '6379')
    rd = os.environ.get('REDIS_DB', '0')
    rpw = os.environ.get('REDIS_PASSWORD', '')
    # Omit the auth section entirely when no password is configured.
    rauth = f':{enc(rpw)}@' if rpw else ''
    redis_url = f'redis://{rauth}{rh}:{rp}/{rd}'

    pool = await asyncpg.create_pool(dsn=dsn, min_size=1, max_size=4)
    r = aioredis.from_url(redis_url)
    try:
        ic = await pool.fetchval('SELECT count(*) FROM macro_impact_records')
        ec = await pool.fetchval('SELECT count(*) FROM global_events')
        dc = await pool.fetchval(\"SELECT count(*) FROM documents WHERE document_type = 'macro_event' AND status != 'rejected'\")
        log.info('Current: %d events, %d impacts, %d macro docs', ec, ic, dc)

        # Run both deletes in one transaction so a failure cannot leave the
        # two tables half-cleared.
        async with pool.acquire() as conn:
            async with conn.transaction():
                await conn.execute('DELETE FROM macro_impact_records')
                await conn.execute('DELETE FROM global_events')
        log.info('Deleted macro_impact_records')
        log.info('Deleted global_events')

        rows = await pool.fetch(\"SELECT id FROM documents WHERE document_type = 'macro_event' AND status != 'rejected' ORDER BY published_at DESC\")
        payloads = [json.dumps({'document_id': str(row['id'])}) for row in rows]
        # Push in batches: same queue order as one RPUSH per job, far fewer round trips.
        for start in range(0, len(payloads), BATCH):
            await r.rpush(QUEUE, *payloads[start:start + BATCH])
        log.info('Enqueued %d macro re-classification jobs', len(payloads))
        log.info('Queue depth: %d', await r.llen(QUEUE))
    finally:
        # Release both connections even when a step above raised.
        await pool.close()
        await r.aclose()


asyncio.run(run())
"
|
|
|
|
echo ""
echo "=== Step 2: Wait for macro classification queue to drain ==="
echo "Extractor pods will process these. Polling queue depth..."
# A failed poll (kubectl exec error, Redis error, non-numeric output) is retried
# instead of silently killing the script via set -e; the loop gives up with an
# explicit error after this many consecutive failures.
MACRO_POLL_FAILURES=0
MACRO_POLL_MAX_FAILURES=5
while true; do
    # Running the assignment inside the if-condition keeps set -e from aborting
    # on a non-zero exit; the regex check rejects empty or garbled output.
    if DEPTH=$(kubectl exec -n "$NS" "$SCHED_POD" -- python -c "
import os, redis
from urllib.parse import quote
rh = os.environ.get('REDIS_HOST', 'redis-master.redis-service.svc.cluster.local')
rp = os.environ.get('REDIS_PORT', '6379')
rd = os.environ.get('REDIS_DB', '0')
rpw = os.environ.get('REDIS_PASSWORD', '')
# Percent-encode the password so characters like @ : / # cannot break URL parsing.
rpw_enc = quote(rpw, safe='')
rauth = f':{rpw_enc}@' if rpw else ''
r = redis.from_url(f'redis://{rauth}{rh}:{rp}/{rd}')
print(r.llen('stonks:queue:macro_classification'))
" 2>/dev/null) && [[ "$DEPTH" =~ ^[0-9]+$ ]]; then
        MACRO_POLL_FAILURES=0
        echo " Macro queue: $DEPTH"
        if [ "$DEPTH" = "0" ]; then
            echo "Macro queue drained!"
            break
        fi
    else
        MACRO_POLL_FAILURES=$((MACRO_POLL_FAILURES + 1))
        echo " Macro queue: depth check failed ($MACRO_POLL_FAILURES/$MACRO_POLL_MAX_FAILURES), retrying..." >&2
        if [ "$MACRO_POLL_FAILURES" -ge "$MACRO_POLL_MAX_FAILURES" ]; then
            echo "ERROR: unable to read macro queue depth via pod $SCHED_POD; aborting." >&2
            exit 1
        fi
    fi
    sleep 15
done
|
|
|
|
echo ""
# Replica count used while re-aggregating; override by exporting
# AGG_BURST_REPLICAS before running the script (defaults to 16).
AGG_BURST_REPLICAS="${AGG_BURST_REPLICAS:-16}"
echo "=== Step 3: Scale aggregation to $AGG_BURST_REPLICAS pods ==="
kubectl scale deployment/aggregation --replicas="$AGG_BURST_REPLICAS" -n "$NS"
# Wait for the rollout to finish before the next step enqueues work; kubectl
# exits non-zero if it has not completed within 120s, which aborts the script.
kubectl rollout status deployment/aggregation -n "$NS" --timeout=120s
|
|
|
|
echo ""
echo "=== Step 4: Enqueue re-aggregation for all tickers ==="
# The Python program is passed inline inside a double-quoted bash string, so it
# sticks to single-quoted literals throughout.
kubectl exec -n "$NS" "$SCHED_POD" -- python -c "
import asyncio, json, logging, os
from urllib.parse import quote

import asyncpg, redis.asyncio as aioredis

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger('reaggregate')

QUEUE = 'stonks:queue:aggregation'
# Number of jobs pushed per RPUSH call.
BATCH = 1000


def enc(value):
    # Percent-encode a credential so characters like @ : / # cannot break URL parsing.
    return quote(value, safe='')


async def run():
    # Connection settings come from the pod environment, with in-cluster defaults.
    pg_host = os.environ.get('POSTGRES_HOST', 'postgresql-rw.postgresql-service.svc.cluster.local')
    pg_user = os.environ.get('POSTGRES_USER', 'stonks')
    pg_pass = os.environ.get('POSTGRES_PASSWORD', '')
    pg_db = os.environ.get('POSTGRES_DB', 'stonks')
    dsn = f'postgresql://{enc(pg_user)}:{enc(pg_pass)}@{pg_host}:5432/{pg_db}'

    rh = os.environ.get('REDIS_HOST', 'redis-master.redis-service.svc.cluster.local')
    rp = os.environ.get('REDIS_PORT', '6379')
    rd = os.environ.get('REDIS_DB', '0')
    rpw = os.environ.get('REDIS_PASSWORD', '')
    # Omit the auth section entirely when no password is configured.
    rauth = f':{enc(rpw)}@' if rpw else ''
    redis_url = f'redis://{rauth}{rh}:{rp}/{rd}'

    pool = await asyncpg.create_pool(dsn=dsn, min_size=1, max_size=4)
    r = aioredis.from_url(redis_url)
    try:
        rows = await pool.fetch('SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker')
        payloads = [json.dumps({'ticker': row['ticker'], 'reaggregate': True}) for row in rows]
        # Push in batches: same queue order as one RPUSH per job, far fewer round trips.
        for start in range(0, len(payloads), BATCH):
            await r.rpush(QUEUE, *payloads[start:start + BATCH])
        log.info('Enqueued %d aggregation jobs', len(payloads))
        log.info('Queue depth: %d', await r.llen(QUEUE))
    finally:
        # Release both connections even when a step above raised.
        await pool.close()
        await r.aclose()


asyncio.run(run())
"
|
|
|
|
echo ""
echo "=== Step 5: Monitor aggregation queue ==="
# A failed poll (kubectl exec error, Redis error, non-numeric output) is retried
# instead of silently killing the script via set -e; the loop gives up with an
# explicit error after this many consecutive failures.
AGG_POLL_FAILURES=0
AGG_POLL_MAX_FAILURES=5
while true; do
    # Running the assignment inside the if-condition keeps set -e from aborting
    # on a non-zero exit; the regex check rejects empty or garbled output.
    if DEPTH=$(kubectl exec -n "$NS" "$SCHED_POD" -- python -c "
import os, redis
from urllib.parse import quote
rh = os.environ.get('REDIS_HOST', 'redis-master.redis-service.svc.cluster.local')
rp = os.environ.get('REDIS_PORT', '6379')
rd = os.environ.get('REDIS_DB', '0')
rpw = os.environ.get('REDIS_PASSWORD', '')
# Percent-encode the password so characters like @ : / # cannot break URL parsing.
rpw_enc = quote(rpw, safe='')
rauth = f':{rpw_enc}@' if rpw else ''
r = redis.from_url(f'redis://{rauth}{rh}:{rp}/{rd}')
print(r.llen('stonks:queue:aggregation'))
" 2>/dev/null) && [[ "$DEPTH" =~ ^[0-9]+$ ]]; then
        AGG_POLL_FAILURES=0
        echo " Aggregation queue: $DEPTH"
        if [ "$DEPTH" = "0" ]; then
            echo "Aggregation queue drained!"
            break
        fi
    else
        AGG_POLL_FAILURES=$((AGG_POLL_FAILURES + 1))
        echo " Aggregation queue: depth check failed ($AGG_POLL_FAILURES/$AGG_POLL_MAX_FAILURES), retrying..." >&2
        if [ "$AGG_POLL_FAILURES" -ge "$AGG_POLL_MAX_FAILURES" ]; then
            echo "ERROR: unable to read aggregation queue depth via pod $SCHED_POD; aborting." >&2
            # Exiting here skips the scale-down step at the end of the script.
            echo "NOTE: deployment/aggregation in namespace $NS has NOT been scaled back down." >&2
            exit 1
        fi
    fi
    sleep 10
done
|
|
|
|
echo ""
# Replica count to return to once re-aggregation is finished; override by
# exporting AGG_BASE_REPLICAS before running the script (defaults to 4).
AGG_BASE_REPLICAS="${AGG_BASE_REPLICAS:-4}"
echo "=== Step 6: Scale aggregation back to $AGG_BASE_REPLICAS pods ==="
kubectl scale deployment/aggregation --replicas="$AGG_BASE_REPLICAS" -n "$NS"
echo "Done! Re-classification and re-aggregation complete."
|