fix: reclassify script reads DB/Redis creds from pod env vars

No more hardcoded passwords — pulls POSTGRES_HOST, POSTGRES_USER,
POSTGRES_PASSWORD, POSTGRES_DB, and REDIS_URL from the pod's
environment (injected by k8s secrets).
This commit is contained in:
Celes Renata
2026-04-17 08:19:23 +00:00
parent 9c4118c0e7
commit 06e1e7ea0f
+27 -8
View File
@@ -1,6 +1,7 @@
#!/bin/bash #!/bin/bash
# Re-classify macro events and re-aggregate all tickers. # Re-classify macro events and re-aggregate all tickers.
# Run from gremlin-1 (or anywhere with kubectl access to the cluster). # Run from gremlin-1 (or anywhere with kubectl access to the cluster).
# Pulls DB/Redis credentials from the running pod's environment.
set -euo pipefail set -euo pipefail
NS="stonks-oracle" NS="stonks-oracle"
@@ -11,14 +12,21 @@ echo ""
echo "=== Step 1: Re-classify macro events ===" echo "=== Step 1: Re-classify macro events ==="
echo "Deleting old global_events + macro_impact_records, then enqueuing docs..." echo "Deleting old global_events + macro_impact_records, then enqueuing docs..."
kubectl exec -n $NS "$SCHED_POD" -- python -c " kubectl exec -n $NS "$SCHED_POD" -- python -c "
import asyncio, json, logging import asyncio, json, logging, os
import asyncpg, redis.asyncio as aioredis import asyncpg, redis.asyncio as aioredis
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger('reclassify') log = logging.getLogger('reclassify')
async def run(): async def run():
pool = await asyncpg.create_pool(dsn='postgresql://stonks:St0nks0racl3!@postgresql-rw.postgresql-service.svc.cluster.local:5432/stonks', min_size=1, max_size=4) pg_host = os.environ.get('POSTGRES_HOST', 'postgresql-rw.postgresql-service.svc.cluster.local')
r = aioredis.from_url('redis://redis-master.redis-service.svc.cluster.local:6379/0') pg_user = os.environ.get('POSTGRES_USER', 'stonks')
pg_pass = os.environ.get('POSTGRES_PASSWORD', '')
pg_db = os.environ.get('POSTGRES_DB', 'stonks')
redis_url = os.environ.get('REDIS_URL', 'redis://redis-master.redis-service.svc.cluster.local:6379/0')
dsn = f'postgresql://{pg_user}:{pg_pass}@{pg_host}:5432/{pg_db}'
pool = await asyncpg.create_pool(dsn=dsn, min_size=1, max_size=4)
r = aioredis.from_url(redis_url)
ic = await pool.fetchval('SELECT count(*) FROM macro_impact_records') ic = await pool.fetchval('SELECT count(*) FROM macro_impact_records')
ec = await pool.fetchval('SELECT count(*) FROM global_events') ec = await pool.fetchval('SELECT count(*) FROM global_events')
dc = await pool.fetchval(\"SELECT count(*) FROM documents WHERE document_type = 'macro_event' AND status != 'rejected'\") dc = await pool.fetchval(\"SELECT count(*) FROM documents WHERE document_type = 'macro_event' AND status != 'rejected'\")
@@ -45,7 +53,9 @@ echo "=== Step 2: Wait for macro classification queue to drain ==="
echo "Extractor pods will process these. Polling queue depth..." echo "Extractor pods will process these. Polling queue depth..."
while true; do while true; do
DEPTH=$(kubectl exec -n $NS "$SCHED_POD" -- python -c " DEPTH=$(kubectl exec -n $NS "$SCHED_POD" -- python -c "
import redis; r=redis.from_url('redis://redis-master.redis-service.svc.cluster.local:6379/0'); print(r.llen('stonks:queue:macro_classification')) import os, redis
url = os.environ.get('REDIS_URL', 'redis://redis-master.redis-service.svc.cluster.local:6379/0')
r = redis.from_url(url); print(r.llen('stonks:queue:macro_classification'))
" 2>/dev/null) " 2>/dev/null)
echo " Macro queue: $DEPTH" echo " Macro queue: $DEPTH"
if [ "$DEPTH" = "0" ]; then if [ "$DEPTH" = "0" ]; then
@@ -63,14 +73,21 @@ kubectl rollout status deployment/aggregation -n $NS --timeout=120s
echo "" echo ""
echo "=== Step 4: Enqueue re-aggregation for all tickers ===" echo "=== Step 4: Enqueue re-aggregation for all tickers ==="
kubectl exec -n $NS "$SCHED_POD" -- python -c " kubectl exec -n $NS "$SCHED_POD" -- python -c "
import asyncio, json, logging import asyncio, json, logging, os
import asyncpg, redis.asyncio as aioredis import asyncpg, redis.asyncio as aioredis
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger('reaggregate') log = logging.getLogger('reaggregate')
async def run(): async def run():
pool = await asyncpg.create_pool(dsn='postgresql://stonks:St0nks0racl3!@postgresql-rw.postgresql-service.svc.cluster.local:5432/stonks', min_size=1, max_size=4) pg_host = os.environ.get('POSTGRES_HOST', 'postgresql-rw.postgresql-service.svc.cluster.local')
r = aioredis.from_url('redis://redis-master.redis-service.svc.cluster.local:6379/0') pg_user = os.environ.get('POSTGRES_USER', 'stonks')
pg_pass = os.environ.get('POSTGRES_PASSWORD', '')
pg_db = os.environ.get('POSTGRES_DB', 'stonks')
redis_url = os.environ.get('REDIS_URL', 'redis://redis-master.redis-service.svc.cluster.local:6379/0')
dsn = f'postgresql://{pg_user}:{pg_pass}@{pg_host}:5432/{pg_db}'
pool = await asyncpg.create_pool(dsn=dsn, min_size=1, max_size=4)
r = aioredis.from_url(redis_url)
rows = await pool.fetch('SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker') rows = await pool.fetch('SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker')
n = 0 n = 0
for row in rows: for row in rows:
@@ -88,7 +105,9 @@ echo ""
echo "=== Step 5: Monitor aggregation queue ===" echo "=== Step 5: Monitor aggregation queue ==="
while true; do while true; do
DEPTH=$(kubectl exec -n $NS "$SCHED_POD" -- python -c " DEPTH=$(kubectl exec -n $NS "$SCHED_POD" -- python -c "
import redis; r=redis.from_url('redis://redis-master.redis-service.svc.cluster.local:6379/0'); print(r.llen('stonks:queue:aggregation')) import os, redis
url = os.environ.get('REDIS_URL', 'redis://redis-master.redis-service.svc.cluster.local:6379/0')
r = redis.from_url(url); print(r.llen('stonks:queue:aggregation'))
" 2>/dev/null) " 2>/dev/null)
echo " Aggregation queue: $DEPTH" echo " Aggregation queue: $DEPTH"
if [ "$DEPTH" = "0" ]; then if [ "$DEPTH" = "0" ]; then