From 5c63264393a245a6b949cff794ef4e183adbf158 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Sun, 19 Apr 2026 22:20:03 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20stage-isolated=20infrastructure=20?= =?UTF-8?q?=E2=80=94=20separate=20Postgres=20DBs,=20Redis=20DBs,=20and=20M?= =?UTF-8?q?inIO=20bucket=20prefixes=20per=20stage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- infra/helm/stonks-oracle/values-beta.yaml | 3 ++ infra/helm/stonks-oracle/values-paper.yaml | 3 ++ infra/helm/stonks-oracle/values.yaml | 1 + pipelines/runmefirst.sh | 31 ++++++++++++++ services/adapters/base.py | 4 +- services/adapters/web_scrape_adapter.py | 3 +- services/extractor/event_classifier.py | 6 +-- services/lake_publisher/partitions.py | 4 +- services/shared/config.py | 22 ++++++---- services/shared/storage.py | 49 +++++++++++++++------- 10 files changed, 96 insertions(+), 30 deletions(-) diff --git a/infra/helm/stonks-oracle/values-beta.yaml b/infra/helm/stonks-oracle/values-beta.yaml index 7c567bc..710347f 100644 --- a/infra/helm/stonks-oracle/values-beta.yaml +++ b/infra/helm/stonks-oracle/values-beta.yaml @@ -12,6 +12,9 @@ config: BROKER_PROVIDER: "mock" LOG_LEVEL: "DEBUG" TRADING_ENABLED: "false" + POSTGRES_DB: "stonks_beta" + REDIS_DB: "1" + DEPLOY_STAGE: "beta" ## All services pinned to 1 replica with lighter resource limits services: diff --git a/infra/helm/stonks-oracle/values-paper.yaml b/infra/helm/stonks-oracle/values-paper.yaml index 0eb217c..6778bba 100644 --- a/infra/helm/stonks-oracle/values-paper.yaml +++ b/infra/helm/stonks-oracle/values-paper.yaml @@ -12,6 +12,9 @@ config: BROKER_PROVIDER: "alpaca" LOG_LEVEL: "INFO" TRADING_ENABLED: "true" + POSTGRES_DB: "stonks_paper" + REDIS_DB: "2" + DEPLOY_STAGE: "paper" ## Secrets override: Alpaca paper trading API endpoint secrets: diff --git a/infra/helm/stonks-oracle/values.yaml b/infra/helm/stonks-oracle/values.yaml index a47d8a7..6904e47 100644 --- a/infra/helm/stonks-oracle/values.yaml +++ b/infra/helm/stonks-oracle/values.yaml @@ -189,6 +189,7 @@ config: RETENTION_BATCH_SIZE: "1000" LOG_LEVEL: "INFO" JSON_LOGS: "true" + DEPLOY_STAGE: "" ALERT_SOURCE_FAILURE_THRESHOLD: "3" ALERT_SOURCE_FAILURE_WINDOW_HOURS: "6" ALERT_SCHEMA_FAILURE_RATE_THRESHOLD: "0.3" diff --git a/pipelines/runmefirst.sh b/pipelines/runmefirst.sh index b846f4c..44cc4df 100755 --- a/pipelines/runmefirst.sh +++ b/pipelines/runmefirst.sh @@ -299,6 +299,37 @@ for ns in stonks-beta stonks-paper stonks-oracle; do done echo "" +# ------------------------------------------------------- +# 6d. Create isolated databases for beta and paper stages +# ------------------------------------------------------- +echo "--- Step 6d: Creating stage-isolated databases ---" +PG_POD=$(kubectl get pods -n postgresql-service -l cnpg.io/cluster=postgresql,role=primary -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "postgresql-1") + +# Create databases (idempotent) +for db in stonks_beta stonks_paper; do + kubectl exec -n postgresql-service "$PG_POD" -c postgres -- \ + psql -U postgres -tc "SELECT 1 FROM pg_database WHERE datname='$db'" | grep -q 1 || \ + kubectl exec -n postgresql-service "$PG_POD" -c postgres -- \ + psql -U postgres -c "CREATE DATABASE $db OWNER stonks;" + echo " ✓ Database $db exists" +done + +# Run migrations on beta and paper databases +REPO_ROOT="${SCRIPT_DIR}/../stonks-oracle" +if [ -d "$REPO_ROOT/infra/migrations" ]; then + for db in stonks_beta stonks_paper; do + echo " Running migrations on $db..." + for migration in $(ls "$REPO_ROOT/infra/migrations/"*.sql 2>/dev/null | sort); do + kubectl exec -n postgresql-service "$PG_POD" -c postgres -- \ + psql -U stonks -d "$db" -f - < "$migration" > /dev/null 2>&1 || true + done + echo " ✓ Migrations applied to $db" + done +else + echo " ⚠ Migrations directory not found at $REPO_ROOT/infra/migrations — skipping" +fi +echo "" + # ------------------------------------------------------- # 7. Install Kargo via Helm # ------------------------------------------------------- diff --git a/services/adapters/base.py b/services/adapters/base.py index 6ca6768..8f5df3b 100644 --- a/services/adapters/base.py +++ b/services/adapters/base.py @@ -12,6 +12,8 @@ from dataclasses import dataclass, field from datetime import datetime from typing import Any +from services.shared.storage import _prefixed + @dataclass class AdapterResult: @@ -71,7 +73,7 @@ class BaseAdapter(ABC): Override in subclasses if the bucket differs from the default pattern. """ - return f"stonks-raw-{self.source_type().replace('_api', '').replace('_', '-')}" + return _prefixed(f"stonks-raw-{self.source_type().replace('_api', '').replace('_', '-')}") def artifact_path(self, ticker: str, document_id: str, now: datetime) -> str: """Build the MinIO object path for a raw artifact. diff --git a/services/adapters/web_scrape_adapter.py b/services/adapters/web_scrape_adapter.py index 0203f3f..06d769a 100644 --- a/services/adapters/web_scrape_adapter.py +++ b/services/adapters/web_scrape_adapter.py @@ -20,6 +20,7 @@ import httpx from bs4 import BeautifulSoup from services.shared.content import content_hash, normalize_url +from services.shared.storage import _prefixed from .base import AdapterResult, BaseAdapter @@ -172,7 +173,7 @@ class WebScrapeAdapter(BaseAdapter): def bucket_name(self) -> str: """Web scrape artifacts go to the news raw bucket.""" - return "stonks-raw-news" + return _prefixed("stonks-raw-news") async def fetch(self, ticker: str, config: dict[str, Any]) -> AdapterResult: """Fetch HTML from curated URLs for a given ticker. diff --git a/services/extractor/event_classifier.py b/services/extractor/event_classifier.py index 382f03c..88b490a 100644 --- a/services/extractor/event_classifier.py +++ b/services/extractor/event_classifier.py @@ -30,7 +30,7 @@ from services.shared.schemas import ( ModelMetadata, SeverityLevel, ) -from services.shared.storage import upload_artifact +from services.shared.storage import _prefixed, upload_artifact logger = logging.getLogger("event_classifier") @@ -381,7 +381,7 @@ def _upload_classification_prompt( f"{document_id}/prompt.json" ) return upload_artifact( - minio_client, "stonks-llm-prompts", path, payload, + minio_client, _prefixed("stonks-llm-prompts"), path, payload, content_type="application/json", ) @@ -421,7 +421,7 @@ def _upload_classification_result( f"{document_id}/result.json" ) return upload_artifact( - minio_client, "stonks-llm-results", path, payload, + minio_client, _prefixed("stonks-llm-results"), path, payload, content_type="application/json", ) diff --git a/services/lake_publisher/partitions.py b/services/lake_publisher/partitions.py index e341948..13e58f0 100644 --- a/services/lake_publisher/partitions.py +++ b/services/lake_publisher/partitions.py @@ -23,7 +23,9 @@ import uuid from dataclasses import dataclass, field from datetime import date, datetime -LAKEHOUSE_BUCKET = "stonks-lakehouse" +from services.shared.storage import _prefixed + +LAKEHOUSE_BUCKET = _prefixed("stonks-lakehouse") WAREHOUSE_PREFIX = "warehouse" diff --git a/services/shared/config.py b/services/shared/config.py index 8175c97..ec6b6c3 100644 --- a/services/shared/config.py +++ b/services/shared/config.py @@ -99,15 +99,21 @@ class RetentionConfig: # Map bucket names to RetentionConfig field names +# Uses _bucket() so retention cleanup targets the correct stage-specific buckets +def _bucket(name: str) -> str: + """Apply DEPLOY_STAGE prefix to a bucket name.""" + prefix = os.getenv("DEPLOY_STAGE", "") + return f"{prefix}-{name}" if prefix else name + BUCKET_RETENTION_FIELDS: dict[str, str] = { - "stonks-raw-market": "raw_market_days", - "stonks-raw-news": "raw_news_days", - "stonks-raw-filings": "raw_filings_days", - "stonks-normalized": "normalized_days", - "stonks-llm-prompts": "llm_prompts_days", - "stonks-llm-results": "llm_results_days", - "stonks-lakehouse": "lakehouse_days", - "stonks-audit": "audit_days", + _bucket("stonks-raw-market"): "raw_market_days", + _bucket("stonks-raw-news"): "raw_news_days", + _bucket("stonks-raw-filings"): "raw_filings_days", + _bucket("stonks-normalized"): "normalized_days", + _bucket("stonks-llm-prompts"): "llm_prompts_days", + _bucket("stonks-llm-results"): "llm_results_days", + _bucket("stonks-lakehouse"): "lakehouse_days", + _bucket("stonks-audit"): "audit_days", } diff --git a/services/shared/storage.py b/services/shared/storage.py index 69567a6..da8155b 100644 --- a/services/shared/storage.py +++ b/services/shared/storage.py @@ -21,6 +21,7 @@ Requirements: 3.1, 3.2, 3.3, 9.1 """ import io import logging +import os from datetime import datetime, timezone from typing import Mapping @@ -29,8 +30,13 @@ from minio.error import S3Error logger = logging.getLogger("storage") -# All known buckets the platform uses -ALL_BUCKETS = [ +# Optional bucket prefix for stage isolation (e.g. "beta", "paper"). +# When set, all bucket names become "beta-stonks-raw-market", etc. +# Reads from DEPLOY_STAGE env var set by the Helm configmap. +_BUCKET_PREFIX = os.getenv("DEPLOY_STAGE", "") + +# All known base bucket names the platform uses +_BASE_BUCKETS = [ "stonks-raw-market", "stonks-raw-news", "stonks-raw-filings", @@ -41,14 +47,25 @@ ALL_BUCKETS = [ "stonks-audit", ] -# Map source_type to the correct raw bucket + +def _prefixed(bucket: str) -> str: + """Apply the stage prefix to a bucket name.""" + if _BUCKET_PREFIX: + return f"{_BUCKET_PREFIX}-{bucket}" + return bucket + + +# Public list with prefix applied +ALL_BUCKETS = [_prefixed(b) for b in _BASE_BUCKETS] + +# Map source_type to the correct raw bucket (with prefix) SOURCE_BUCKET_MAP: dict[str, str] = { - "market_api": "stonks-raw-market", - "news_api": "stonks-raw-news", - "filings_api": "stonks-raw-filings", - "web_scrape": "stonks-raw-news", - "broker": "stonks-raw-market", - "macro_news": "stonks-raw-news", + "market_api": _prefixed("stonks-raw-market"), + "news_api": _prefixed("stonks-raw-news"), + "filings_api": _prefixed("stonks-raw-filings"), + "web_scrape": _prefixed("stonks-raw-news"), + "broker": _prefixed("stonks-raw-market"), + "macro_news": _prefixed("stonks-raw-news"), } # Map artifact type to content type and file extension @@ -62,7 +79,7 @@ ARTIFACT_CONTENT_TYPES: dict[str, tuple[str, str]] = { def bucket_for_source(source_type: str) -> str: """Return the MinIO bucket name for a given source type.""" - return SOURCE_BUCKET_MAP.get(source_type, "stonks-raw-market") + return SOURCE_BUCKET_MAP.get(source_type, _prefixed("stonks-raw-market")) def build_artifact_path( @@ -227,7 +244,7 @@ def upload_normalized_text( f"{document_id}/normalized.txt" ) return upload_artifact( - client, "stonks-normalized", path, text_bytes, + client, _prefixed("stonks-normalized"), path, text_bytes, content_type="text/plain", metadata=metadata, ) @@ -251,7 +268,7 @@ def upload_parser_output( f"{document_id}/parser_output.json" ) return upload_artifact( - client, "stonks-normalized", path, output_bytes, + client, _prefixed("stonks-normalized"), path, output_bytes, content_type="application/json", metadata=metadata, ) @@ -275,7 +292,7 @@ def upload_extraction_prompt( f"{document_id}/prompt.json" ) return upload_artifact( - client, "stonks-llm-prompts", path, prompt_data, + client, _prefixed("stonks-llm-prompts"), path, prompt_data, content_type="application/json", metadata=metadata, ) @@ -300,7 +317,7 @@ def upload_extraction_raw_output( f"{document_id}/raw_output_{attempt_index}.json" ) return upload_artifact( - client, "stonks-llm-results", path, output_data, + client, _prefixed("stonks-llm-results"), path, output_data, content_type="application/json", metadata=metadata, ) @@ -324,7 +341,7 @@ def upload_extraction_validation( f"{document_id}/validation.json" ) return upload_artifact( - client, "stonks-llm-results", path, validation_data, + client, _prefixed("stonks-llm-results"), path, validation_data, content_type="application/json", metadata=metadata, ) @@ -348,7 +365,7 @@ def upload_extraction_intelligence( f"{document_id}/intelligence.json" ) return upload_artifact( - client, "stonks-llm-results", path, intelligence_data, + client, _prefixed("stonks-llm-results"), path, intelligence_data, content_type="application/json", metadata=metadata, )