diff --git a/scripts/run_reclassify_and_reaggregate.sh b/scripts/run_reclassify_and_reaggregate.sh
new file mode 100755
index 0000000..660d553
--- /dev/null
+++ b/scripts/run_reclassify_and_reaggregate.sh
@@ -0,0 +1,144 @@
+#!/bin/bash
+# Re-classify macro events and re-aggregate all tickers.
+# Run from gremlin-1 (or anywhere with kubectl access to the cluster).
+#
+# Optional environment overrides: NS, PG_DSN, REDIS_URL, AGG_REPLICAS_UP,
+# AGG_REPLICAS_DOWN, DRAIN_TIMEOUT_SECS (0 = wait forever).
+set -euo pipefail
+
+NS="${NS:-stonks-oracle}"
+# NOTE(review): this DSN embeds a plaintext password in version control; move
+# it to a Kubernetes secret / pod environment and rotate the credential.
+PG_DSN="${PG_DSN:-postgresql://stonks:St0nks0racl3!@postgresql-rw.postgresql-service.svc.cluster.local:5432/stonks}"
+REDIS_URL="${REDIS_URL:-redis://redis-master.redis-service.svc.cluster.local:6379/0}"
+AGG_REPLICAS_UP="${AGG_REPLICAS_UP:-16}"
+AGG_REPLICAS_DOWN="${AGG_REPLICAS_DOWN:-4}"
+DRAIN_TIMEOUT_SECS="${DRAIN_TIMEOUT_SECS:-0}"
+
+# Print the length of Redis list $1, queried from inside the scheduler pod.
+queue_depth() {
+    kubectl exec -n "$NS" "$SCHED_POD" -- python -c "
+import redis; r=redis.from_url('$REDIS_URL'); print(r.llen('$1'))
+" 2>/dev/null
+}
+
+# Poll Redis list $1 (display label $2) every $3 seconds until it is empty.
+# Returns 1 if DRAIN_TIMEOUT_SECS is positive and elapses first.
+wait_for_drain() {
+    local queue="$1" label="$2" interval="$3"
+    local depth waited=0
+    while true; do
+        # A failed exec must neither abort the script under 'set -e' nor be
+        # mistaken for an empty queue, so report it and poll again.
+        depth=$(queue_depth "$queue") || depth="unavailable"
+        echo "  $label queue: $depth"
+        if [ "$depth" = "0" ]; then
+            echo "$label queue drained!"
+            return 0
+        fi
+        if [ "$DRAIN_TIMEOUT_SECS" -gt 0 ] && [ "$waited" -ge "$DRAIN_TIMEOUT_SECS" ]; then
+            echo "ERROR: $label queue not drained after ${waited}s" >&2
+            return 1
+        fi
+        sleep "$interval"
+        waited=$((waited + interval))
+    done
+}
+
+# Scale aggregation back down on any exit (error, Ctrl-C) after scale-up.
+SCALED_UP=0
+restore_replicas() {
+    if [ "$SCALED_UP" = "1" ]; then
+        echo "Restoring aggregation to $AGG_REPLICAS_DOWN pods after early exit..."
+        kubectl scale deployment/aggregation --replicas="$AGG_REPLICAS_DOWN" -n "$NS" || true
+    fi
+}
+trap restore_replicas EXIT
+
+# Only consider Running pods so 'kubectl exec' does not target a dead one.
+SCHED_POD=$(kubectl get pods -n "$NS" -l app=scheduler --field-selector=status.phase=Running -o jsonpath='{.items[0].metadata.name}')
+echo "Using scheduler pod: $SCHED_POD"
+
+echo ""
+echo "=== Step 1: Re-classify macro events ==="
+echo "Deleting old global_events + macro_impact_records, then enqueuing docs..."
+kubectl exec -n "$NS" "$SCHED_POD" -- python -c "
+import asyncio, json, logging
+import asyncpg, redis.asyncio as aioredis
+logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
+log = logging.getLogger('reclassify')
+
+async def run():
+    pool = await asyncpg.create_pool(dsn='$PG_DSN', min_size=1, max_size=4)
+    r = aioredis.from_url('$REDIS_URL')
+    try:
+        # Verify Redis is reachable before deleting anything.
+        await r.ping()
+        ic = await pool.fetchval('SELECT count(*) FROM macro_impact_records')
+        ec = await pool.fetchval('SELECT count(*) FROM global_events')
+        dc = await pool.fetchval(\"SELECT count(*) FROM documents WHERE document_type = 'macro_event' AND status != 'rejected'\")
+        log.info('Current: %d events, %d impacts, %d macro docs', ec, ic, dc)
+        # Delete both tables atomically so a failure cannot leave one wiped.
+        async with pool.acquire() as conn:
+            async with conn.transaction():
+                await conn.execute('DELETE FROM macro_impact_records')
+                await conn.execute('DELETE FROM global_events')
+        log.info('Deleted macro_impact_records')
+        log.info('Deleted global_events')
+        rows = await pool.fetch(\"SELECT id FROM documents WHERE document_type = 'macro_event' AND status != 'rejected' ORDER BY published_at DESC\")
+        for row in rows:
+            await r.rpush('stonks:queue:macro_classification', json.dumps({'document_id': str(row['id'])}))
+        log.info('Enqueued %d macro re-classification jobs', len(rows))
+        log.info('Queue depth: %d', await r.llen('stonks:queue:macro_classification'))
+    finally:
+        await pool.close()
+        await r.aclose()
+
+asyncio.run(run())
+"
+
+echo ""
+echo "=== Step 2: Wait for macro classification queue to drain ==="
+echo "Extractor pods will process these. Polling queue depth..."
+# NOTE(review): a zero depth only shows no jobs are pending; confirm workers have finished in-flight jobs.
+wait_for_drain 'stonks:queue:macro_classification' 'Macro' 15
+
+echo ""
+echo "=== Step 3: Scale aggregation to $AGG_REPLICAS_UP pods ==="
+SCALED_UP=1
+kubectl scale deployment/aggregation --replicas="$AGG_REPLICAS_UP" -n "$NS"
+kubectl rollout status deployment/aggregation -n "$NS" --timeout=120s
+
+echo ""
+echo "=== Step 4: Enqueue re-aggregation for all tickers ==="
+kubectl exec -n "$NS" "$SCHED_POD" -- python -c "
+import asyncio, json, logging
+import asyncpg, redis.asyncio as aioredis
+logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
+log = logging.getLogger('reaggregate')
+
+async def run():
+    pool = await asyncpg.create_pool(dsn='$PG_DSN', min_size=1, max_size=4)
+    r = aioredis.from_url('$REDIS_URL')
+    try:
+        rows = await pool.fetch('SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker')
+        for row in rows:
+            await r.rpush('stonks:queue:aggregation', json.dumps({'ticker': row['ticker'], 'reaggregate': True}))
+        log.info('Enqueued %d aggregation jobs', len(rows))
+        log.info('Queue depth: %d', await r.llen('stonks:queue:aggregation'))
+    finally:
+        await pool.close()
+        await r.aclose()
+
+asyncio.run(run())
+"
+
+echo ""
+echo "=== Step 5: Monitor aggregation queue ==="
+wait_for_drain 'stonks:queue:aggregation' 'Aggregation' 10
+
+echo ""
+echo "=== Step 6: Scale aggregation back to $AGG_REPLICAS_DOWN pods ==="
+kubectl scale deployment/aggregation --replicas="$AGG_REPLICAS_DOWN" -n "$NS"
+SCALED_UP=0
+echo "Done! Re-classification and re-aggregation complete."
diff --git a/scripts/test_saved_queries.py b/scripts/test_saved_queries.py
new file mode 100644
index 0000000..8922a7c
--- /dev/null
+++ b/scripts/test_saved_queries.py
@@ -0,0 +1,98 @@
+#!/usr/bin/env python3
+"""Test all saved SQL explorer queries against the live API.
+
+Fetches the saved-query list, runs each query with a row limit of 5 and prints
+one OK/FAIL line per query followed by a summary.  Exits with status 1 when the
+list cannot be fetched or any query fails, 0 otherwise.
+"""
+import json
+import subprocess
+import sys
+
+API = "https://stonks-api.celestium.life"
+
+
+def curl(args, timeout):
+    """Run ``curl -sk`` with extra *args* and return its stdout as text.
+
+    Raises ``subprocess.TimeoutExpired`` if curl runs longer than *timeout*
+    seconds.  NOTE(review): ``-k`` disables TLS certificate verification.
+    """
+    proc = subprocess.run(
+        ["curl", "-sk", *args],
+        capture_output=True, text=True, timeout=timeout,
+    )
+    return proc.stdout
+
+
+def run_query(sql):
+    """POST *sql* to the analytics query endpoint.
+
+    Returns ``(True, summary)`` on success and ``(False, error_text)`` when the
+    request times out, the body is not a JSON object, or the API reports an
+    ``error``/``detail`` field.
+    """
+    payload = json.dumps({"sql": sql, "limit": 5})
+    try:
+        out = curl(
+            ["-X", "POST", f"{API}/api/analytics/query",
+             "-H", "Content-Type: application/json", "-d", payload],
+            timeout=30,
+        )
+    except subprocess.TimeoutExpired:
+        # One slow query must not abort the whole run.
+        return False, "timed out after 30s"
+    try:
+        resp = json.loads(out)
+    except json.JSONDecodeError:
+        return False, f"(not JSON) {out[:100]}"
+    if not isinstance(resp, dict):
+        return False, f"unexpected response: {out[:100]}"
+    if "error" in resp or "detail" in resp:
+        return False, str(resp.get("detail", resp.get("error", "?")))[:120]
+    rows = len(resp.get("rows", []))
+    cols = len(resp.get("columns", []))
+    ms = resp.get("elapsed_ms", "?")
+    return True, f"({rows} rows, {cols} cols, {ms}ms)"
+
+
+def main():
+    """Run every saved query and return the process exit status."""
+    try:
+        body = curl([f"{API}/api/analytics/saved-queries"], timeout=15)
+        queries = json.loads(body)
+    except (subprocess.TimeoutExpired, json.JSONDecodeError) as exc:
+        print(f"Could not fetch saved queries: {exc}", file=sys.stderr)
+        return 1
+    if not isinstance(queries, list):
+        print(f"Unexpected saved-queries response: {body[:100]}", file=sys.stderr)
+        return 1
+
+    total = len(queries)
+    print(f"Found {total} saved queries\n")
+
+    errors = []
+    for i, q in enumerate(queries, 1):
+        name = q["name"]
+        ok, detail = run_query(q["sql_text"])
+        # Show the real total instead of a hard-coded count.
+        prefix = f"  [{i:2d}/{total}]"
+        if ok:
+            print(f"{prefix} OK   {name} {detail}")
+        else:
+            print(f"{prefix} FAIL {name}")
+            print(f"          {detail}")
+            errors.append((name, detail))
+
+    failed = len(errors)
+    print(f"\n{'='*60}")
+    print(f"Results: {total - failed} passed, {failed} failed out of {total}")
+    if errors:
+        print("\nFailed:")
+        for n, e in errors:
+            print(f"  - {n}: {e}")
+    return 1 if errors else 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())