#!/bin/bash
# Re-classify macro events and re-aggregate all tickers.
# Run from gremlin-1 (or anywhere with kubectl access to the cluster).
# Pulls DB/Redis credentials from the running pod's environment.
#
# Steps:
#   1. Delete global_events + macro_impact_records and enqueue every
#      non-rejected macro_event document for re-classification.
#   2. Poll the macro classification queue until its length is 0.
#   3. Scale the aggregation deployment up.
#   4. Enqueue a re-aggregation job for every active ticker.
#   5. Poll the aggregation queue until its length is 0.
#   6. Scale the aggregation deployment back down.
#
# WARNING: step 1 is destructive (it empties two tables).
set -euo pipefail

readonly NS="stonks-oracle"
readonly MACRO_QUEUE="stonks:queue:macro_classification"
readonly AGG_QUEUE="stonks:queue:aggregation"
readonly AGG_REPLICAS_BURST=16
readonly AGG_REPLICAS_NORMAL=4
# Consecutive failed queue-depth reads tolerated before giving up.
readonly MAX_POLL_FAILURES=5

SCHED_POD=""
# Set to 1 once we have asked for the burst replica count, so the EXIT trap
# knows it has something to undo.
agg_scaled_up=0

die() {
  printf 'ERROR: %s\n' "$*" >&2
  exit 1
}

#######################################
# EXIT trap: if the script stops (error, Ctrl-C, rollout timeout, ...) while
# the aggregation deployment is scaled up, scale it back down so the cluster
# is not left at the burst size.
# Globals: agg_scaled_up (read), NS, AGG_REPLICAS_NORMAL
#######################################
restore_aggregation_replicas() {
  if [[ "$agg_scaled_up" == "1" ]]; then
    echo "Restoring aggregation to ${AGG_REPLICAS_NORMAL} pods after early exit..." >&2
    kubectl scale deployment/aggregation \
      --replicas="$AGG_REPLICAS_NORMAL" -n "$NS" >&2 \
      || echo "WARNING: could not scale aggregation back to ${AGG_REPLICAS_NORMAL}; fix manually." >&2
  fi
}
trap restore_aggregation_replicas EXIT

#######################################
# Run a Python program inside the scheduler pod.
# Arguments: $1 - Python source; remaining args become sys.argv[1:]
# Globals:   NS, SCHED_POD (read)
#######################################
sched_python() {
  local code=$1
  shift
  kubectl exec -n "$NS" "$SCHED_POD" -- python -c "$code" "$@"
}

# Python: wipe old macro classification results and enqueue every macro doc.
# sys.argv[1] is the Redis list to push onto.
py_reclassify() {
  cat <<'PY'
import asyncio, json, logging, os, sys
from urllib.parse import quote
import asyncpg, redis.asyncio as aioredis

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger('reclassify')

QUEUE = sys.argv[1]


async def run():
    pg_host = os.environ.get('POSTGRES_HOST', 'postgresql-rw.postgresql-service.svc.cluster.local')
    pg_user = os.environ.get('POSTGRES_USER', 'stonks')
    pg_pass = os.environ.get('POSTGRES_PASSWORD', '')
    pg_db = os.environ.get('POSTGRES_DB', 'stonks')
    # Percent-encode credentials so characters such as '@', ':' or '/' in a
    # password cannot corrupt the URL.
    pg_user_q = quote(pg_user, safe='')
    pg_pass_q = quote(pg_pass, safe='')
    dsn = f'postgresql://{pg_user_q}:{pg_pass_q}@{pg_host}:5432/{pg_db}'

    rh = os.environ.get('REDIS_HOST', 'redis-master.redis-service.svc.cluster.local')
    rp = os.environ.get('REDIS_PORT', '6379')
    rd = os.environ.get('REDIS_DB', '0')
    rpw = os.environ.get('REDIS_PASSWORD', '')
    rpw_q = quote(rpw, safe='')
    rauth = f':{rpw_q}@' if rpw else ''
    redis_url = f'redis://{rauth}{rh}:{rp}/{rd}'

    pool = await asyncpg.create_pool(dsn=dsn, min_size=1, max_size=4)
    r = aioredis.from_url(redis_url)
    try:
        ic = await pool.fetchval('SELECT count(*) FROM macro_impact_records')
        ec = await pool.fetchval('SELECT count(*) FROM global_events')
        dc = await pool.fetchval("SELECT count(*) FROM documents WHERE document_type = 'macro_event' AND status != 'rejected'")
        log.info('Current: %d events, %d impacts, %d macro docs', ec, ic, dc)

        # Both deletes commit together or not at all, so a failure between
        # them cannot leave the two tables half-cleared.
        async with pool.acquire() as conn:
            async with conn.transaction():
                await conn.execute('DELETE FROM macro_impact_records')
                await conn.execute('DELETE FROM global_events')
        log.info('Deleted macro_impact_records')
        log.info('Deleted global_events')

        rows = await pool.fetch("SELECT id FROM documents WHERE document_type = 'macro_event' AND status != 'rejected' ORDER BY published_at DESC")
        n = 0
        for row in rows:
            await r.rpush(QUEUE, json.dumps({'document_id': str(row['id'])}))
            n += 1
        log.info('Enqueued %d macro re-classification jobs', n)
        log.info('Queue depth: %d', await r.llen(QUEUE))
    finally:
        # Release connections even when a query or push fails.
        await pool.close()
        await r.aclose()


asyncio.run(run())
PY
}

# Python: enqueue one re-aggregation job per active company ticker.
# sys.argv[1] is the Redis list to push onto.
py_reaggregate() {
  cat <<'PY'
import asyncio, json, logging, os, sys
from urllib.parse import quote
import asyncpg, redis.asyncio as aioredis

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger('reaggregate')

QUEUE = sys.argv[1]


async def run():
    pg_host = os.environ.get('POSTGRES_HOST', 'postgresql-rw.postgresql-service.svc.cluster.local')
    pg_user = os.environ.get('POSTGRES_USER', 'stonks')
    pg_pass = os.environ.get('POSTGRES_PASSWORD', '')
    pg_db = os.environ.get('POSTGRES_DB', 'stonks')
    # Percent-encode credentials so special characters cannot corrupt the URL.
    pg_user_q = quote(pg_user, safe='')
    pg_pass_q = quote(pg_pass, safe='')
    dsn = f'postgresql://{pg_user_q}:{pg_pass_q}@{pg_host}:5432/{pg_db}'

    rh = os.environ.get('REDIS_HOST', 'redis-master.redis-service.svc.cluster.local')
    rp = os.environ.get('REDIS_PORT', '6379')
    rd = os.environ.get('REDIS_DB', '0')
    rpw = os.environ.get('REDIS_PASSWORD', '')
    rpw_q = quote(rpw, safe='')
    rauth = f':{rpw_q}@' if rpw else ''
    redis_url = f'redis://{rauth}{rh}:{rp}/{rd}'

    pool = await asyncpg.create_pool(dsn=dsn, min_size=1, max_size=4)
    r = aioredis.from_url(redis_url)
    try:
        rows = await pool.fetch('SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker')
        n = 0
        for row in rows:
            await r.rpush(QUEUE, json.dumps({'ticker': row['ticker'], 'reaggregate': True}))
            n += 1
        log.info('Enqueued %d aggregation jobs', n)
        log.info('Queue depth: %d', await r.llen(QUEUE))
    finally:
        # Release connections even when a query or push fails.
        await pool.close()
        await r.aclose()


asyncio.run(run())
PY
}

# Python: print the length of the Redis list named by sys.argv[1].
py_queue_depth() {
  cat <<'PY'
import os, sys
from urllib.parse import quote
import redis

rh = os.environ.get('REDIS_HOST', 'redis-master.redis-service.svc.cluster.local')
rp = os.environ.get('REDIS_PORT', '6379')
rd = os.environ.get('REDIS_DB', '0')
rpw = os.environ.get('REDIS_PASSWORD', '')
rpw_q = quote(rpw, safe='')
rauth = f':{rpw_q}@' if rpw else ''
r = redis.from_url(f'redis://{rauth}{rh}:{rp}/{rd}')
print(r.llen(sys.argv[1]))
PY
}

#######################################
# Poll a Redis list until its length is 0.
# A failed or non-numeric read is retried; MAX_POLL_FAILURES consecutive
# failures abort the script instead of looping (or dying) silently.
# NOTE(review): a length of 0 only shows that nothing is waiting in the list;
# whether workers are still processing jobs they already popped cannot be
# determined from here.
# Arguments: $1 - Redis list name
#            $2 - label used in progress output
#            $3 - seconds to sleep between polls
#######################################
wait_for_queue_drain() {
  local queue=$1 label=$2 interval=$3
  local depth="" failures=0

  while true; do
    # stderr is discarded so per-exec kubectl chatter does not flood the
    # progress output; failures are reported explicitly below instead.
    if depth=$(sched_python "$(py_queue_depth)" "$queue" 2>/dev/null) \
      && [[ "$depth" =~ ^[0-9]+$ ]]; then
      failures=0
      echo " ${label} queue: ${depth}"
      if [[ "$depth" == "0" ]]; then
        echo "${label} queue drained!"
        return 0
      fi
    else
      failures=$((failures + 1))
      echo " WARNING: could not read ${label} queue depth (${failures}/${MAX_POLL_FAILURES})" >&2
      if [[ "$failures" -ge "$MAX_POLL_FAILURES" ]]; then
        die "giving up polling ${queue}; check 'kubectl exec -n ${NS} ${SCHED_POD}' manually"
      fi
    fi
    sleep "$interval"
  done
}

main() {
  # Only consider pods in the Running phase so we do not try to exec into a
  # Pending/Failed pod that happens to be listed first.
  SCHED_POD=$(kubectl get pods -n "$NS" -l app=scheduler \
    --field-selector=status.phase=Running \
    -o jsonpath='{.items[0].metadata.name}') \
    || die "no running scheduler pod found in namespace ${NS}"
  [[ -n "$SCHED_POD" ]] || die "no running scheduler pod found in namespace ${NS}"
  echo "Using scheduler pod: $SCHED_POD"

  echo ""
  echo "=== Step 1: Re-classify macro events ==="
  echo "Deleting old global_events + macro_impact_records, then enqueuing docs..."
  sched_python "$(py_reclassify)" "$MACRO_QUEUE"

  echo ""
  echo "=== Step 2: Wait for macro classification queue to drain ==="
  echo "Extractor pods will process these. Polling queue depth..."
  wait_for_queue_drain "$MACRO_QUEUE" "Macro" 15

  echo ""
  echo "=== Step 3: Scale aggregation to ${AGG_REPLICAS_BURST} pods ==="
  # Flag first: even a partially applied scale-up must be undone on failure.
  agg_scaled_up=1
  kubectl scale deployment/aggregation --replicas="$AGG_REPLICAS_BURST" -n "$NS"
  kubectl rollout status deployment/aggregation -n "$NS" --timeout=120s

  echo ""
  echo "=== Step 4: Enqueue re-aggregation for all tickers ==="
  sched_python "$(py_reaggregate)" "$AGG_QUEUE"

  echo ""
  echo "=== Step 5: Monitor aggregation queue ==="
  wait_for_queue_drain "$AGG_QUEUE" "Aggregation" 10

  echo ""
  echo "=== Step 6: Scale aggregation back to ${AGG_REPLICAS_NORMAL} pods ==="
  kubectl scale deployment/aggregation --replicas="$AGG_REPLICAS_NORMAL" -n "$NS"
  agg_scaled_up=0

  echo "Done! Re-classification and re-aggregation complete."
}

main "$@"