diff --git a/scripts/run_reclassify_and_reaggregate.sh b/scripts/run_reclassify_and_reaggregate.sh index 386d1a2..856a4ae 100755 --- a/scripts/run_reclassify_and_reaggregate.sh +++ b/scripts/run_reclassify_and_reaggregate.sh @@ -22,9 +22,15 @@ async def run(): pg_user = os.environ.get('POSTGRES_USER', 'stonks') pg_pass = os.environ.get('POSTGRES_PASSWORD', '') pg_db = os.environ.get('POSTGRES_DB', 'stonks') - redis_url = os.environ.get('REDIS_URL', 'redis://redis-master.redis-service.svc.cluster.local:6379/0') dsn = f'postgresql://{pg_user}:{pg_pass}@{pg_host}:5432/{pg_db}' + rh = os.environ.get('REDIS_HOST', 'redis-master.redis-service.svc.cluster.local') + rp = os.environ.get('REDIS_PORT', '6379') + rd = os.environ.get('REDIS_DB', '0') + rpw = os.environ.get('REDIS_PASSWORD', '') + rauth = f':{rpw}@' if rpw else '' + redis_url = f'redis://{rauth}{rh}:{rp}/{rd}' + pool = await asyncpg.create_pool(dsn=dsn, min_size=1, max_size=4) r = aioredis.from_url(redis_url) ic = await pool.fetchval('SELECT count(*) FROM macro_impact_records') @@ -54,8 +60,12 @@ echo "Extractor pods will process these. Polling queue depth..." while true; do DEPTH=$(kubectl exec -n $NS "$SCHED_POD" -- python -c " import os, redis -url = os.environ.get('REDIS_URL', 'redis://redis-master.redis-service.svc.cluster.local:6379/0') -r = redis.from_url(url); print(r.llen('stonks:queue:macro_classification')) +rh = os.environ.get('REDIS_HOST', 'redis-master.redis-service.svc.cluster.local') +rp = os.environ.get('REDIS_PORT', '6379') +rd = os.environ.get('REDIS_DB', '0') +rpw = os.environ.get('REDIS_PASSWORD', '') +rauth = f':{rpw}@' if rpw else '' +r = redis.from_url(f'redis://{rauth}{rh}:{rp}/{rd}'); print(r.llen('stonks:queue:macro_classification')) " 2>/dev/null) echo " Macro queue: $DEPTH" if [ "$DEPTH" = "0" ]; then @@ -83,9 +93,15 @@ async def run(): pg_user = os.environ.get('POSTGRES_USER', 'stonks') pg_pass = os.environ.get('POSTGRES_PASSWORD', '') pg_db = os.environ.get('POSTGRES_DB', 'stonks') - redis_url = os.environ.get('REDIS_URL', 'redis://redis-master.redis-service.svc.cluster.local:6379/0') dsn = f'postgresql://{pg_user}:{pg_pass}@{pg_host}:5432/{pg_db}' + rh = os.environ.get('REDIS_HOST', 'redis-master.redis-service.svc.cluster.local') + rp = os.environ.get('REDIS_PORT', '6379') + rd = os.environ.get('REDIS_DB', '0') + rpw = os.environ.get('REDIS_PASSWORD', '') + rauth = f':{rpw}@' if rpw else '' + redis_url = f'redis://{rauth}{rh}:{rp}/{rd}' + pool = await asyncpg.create_pool(dsn=dsn, min_size=1, max_size=4) r = aioredis.from_url(redis_url) rows = await pool.fetch('SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker') @@ -106,8 +122,12 @@ echo "=== Step 5: Monitor aggregation queue ===" while true; do DEPTH=$(kubectl exec -n $NS "$SCHED_POD" -- python -c " import os, redis -url = os.environ.get('REDIS_URL', 'redis://redis-master.redis-service.svc.cluster.local:6379/0') -r = redis.from_url(url); print(r.llen('stonks:queue:aggregation')) +rh = os.environ.get('REDIS_HOST', 'redis-master.redis-service.svc.cluster.local') +rp = os.environ.get('REDIS_PORT', '6379') +rd = os.environ.get('REDIS_DB', '0') +rpw = os.environ.get('REDIS_PASSWORD', '') +rauth = f':{rpw}@' if rpw else '' +r = redis.from_url(f'redis://{rauth}{rh}:{rp}/{rd}'); print(r.llen('stonks:queue:aggregation')) " 2>/dev/null) echo " Aggregation queue: $DEPTH" if [ "$DEPTH" = "0" ]; then