diff --git a/scripts/run_reclassify_and_reaggregate.sh b/scripts/run_reclassify_and_reaggregate.sh index 660d553..386d1a2 100755 --- a/scripts/run_reclassify_and_reaggregate.sh +++ b/scripts/run_reclassify_and_reaggregate.sh @@ -1,6 +1,7 @@ #!/bin/bash # Re-classify macro events and re-aggregate all tickers. # Run from gremlin-1 (or anywhere with kubectl access to the cluster). +# Pulls DB/Redis credentials from the running pod's environment. set -euo pipefail NS="stonks-oracle" @@ -11,14 +12,21 @@ echo "" echo "=== Step 1: Re-classify macro events ===" echo "Deleting old global_events + macro_impact_records, then enqueuing docs..." kubectl exec -n $NS "$SCHED_POD" -- python -c " -import asyncio, json, logging +import asyncio, json, logging, os import asyncpg, redis.asyncio as aioredis logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') log = logging.getLogger('reclassify') async def run(): - pool = await asyncpg.create_pool(dsn='postgresql://stonks:St0nks0racl3!@postgresql-rw.postgresql-service.svc.cluster.local:5432/stonks', min_size=1, max_size=4) - r = aioredis.from_url('redis://redis-master.redis-service.svc.cluster.local:6379/0') + pg_host = os.environ.get('POSTGRES_HOST', 'postgresql-rw.postgresql-service.svc.cluster.local') + pg_user = os.environ.get('POSTGRES_USER', 'stonks') + pg_pass = os.environ.get('POSTGRES_PASSWORD', '') + pg_db = os.environ.get('POSTGRES_DB', 'stonks') + redis_url = os.environ.get('REDIS_URL', 'redis://redis-master.redis-service.svc.cluster.local:6379/0') + dsn = f'postgresql://{pg_user}:{pg_pass}@{pg_host}:5432/{pg_db}' + + pool = await asyncpg.create_pool(dsn=dsn, min_size=1, max_size=4) + r = aioredis.from_url(redis_url) ic = await pool.fetchval('SELECT count(*) FROM macro_impact_records') ec = await pool.fetchval('SELECT count(*) FROM global_events') dc = await pool.fetchval(\"SELECT count(*) FROM documents WHERE document_type = 'macro_event' AND status != 'rejected'\") @@ -45,7 +53,9 @@ echo "=== Step 2: Wait for macro classification queue to drain ===" echo "Extractor pods will process these. Polling queue depth..." while true; do DEPTH=$(kubectl exec -n $NS "$SCHED_POD" -- python -c " -import redis; r=redis.from_url('redis://redis-master.redis-service.svc.cluster.local:6379/0'); print(r.llen('stonks:queue:macro_classification')) +import os, redis +url = os.environ.get('REDIS_URL', 'redis://redis-master.redis-service.svc.cluster.local:6379/0') +r = redis.from_url(url); print(r.llen('stonks:queue:macro_classification')) " 2>/dev/null) echo " Macro queue: $DEPTH" if [ "$DEPTH" = "0" ]; then @@ -63,14 +73,21 @@ kubectl rollout status deployment/aggregation -n $NS --timeout=120s echo "" echo "=== Step 4: Enqueue re-aggregation for all tickers ===" kubectl exec -n $NS "$SCHED_POD" -- python -c " -import asyncio, json, logging +import asyncio, json, logging, os import asyncpg, redis.asyncio as aioredis logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') log = logging.getLogger('reaggregate') async def run(): - pool = await asyncpg.create_pool(dsn='postgresql://stonks:St0nks0racl3!@postgresql-rw.postgresql-service.svc.cluster.local:5432/stonks', min_size=1, max_size=4) - r = aioredis.from_url('redis://redis-master.redis-service.svc.cluster.local:6379/0') + pg_host = os.environ.get('POSTGRES_HOST', 'postgresql-rw.postgresql-service.svc.cluster.local') + pg_user = os.environ.get('POSTGRES_USER', 'stonks') + pg_pass = os.environ.get('POSTGRES_PASSWORD', '') + pg_db = os.environ.get('POSTGRES_DB', 'stonks') + redis_url = os.environ.get('REDIS_URL', 'redis://redis-master.redis-service.svc.cluster.local:6379/0') + dsn = f'postgresql://{pg_user}:{pg_pass}@{pg_host}:5432/{pg_db}' + + pool = await asyncpg.create_pool(dsn=dsn, min_size=1, max_size=4) + r = aioredis.from_url(redis_url) rows = await pool.fetch('SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker') n = 0 for row in rows: @@ -88,7 +105,9 @@ echo "" echo "=== Step 5: Monitor aggregation queue ===" while true; do DEPTH=$(kubectl exec -n $NS "$SCHED_POD" -- python -c " -import redis; r=redis.from_url('redis://redis-master.redis-service.svc.cluster.local:6379/0'); print(r.llen('stonks:queue:aggregation')) +import os, redis +url = os.environ.get('REDIS_URL', 'redis://redis-master.redis-service.svc.cluster.local:6379/0') +r = redis.from_url(url); print(r.llen('stonks:queue:aggregation')) " 2>/dev/null) echo " Aggregation queue: $DEPTH" if [ "$DEPTH" = "0" ]; then