feat: stage-isolated infrastructure — separate Postgres DBs, Redis DBs, and MinIO bucket prefixes per stage

This commit is contained in:
Celes Renata
2026-04-19 22:20:03 +00:00
parent 2621b3c5c5
commit 5c63264393
10 changed files with 96 additions and 30 deletions
+3 -1
View File
@@ -12,6 +12,8 @@ from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from services.shared.storage import _prefixed
@dataclass
class AdapterResult:
@@ -71,7 +73,7 @@ class BaseAdapter(ABC):
Override in subclasses if the bucket differs from the default pattern.
"""
return f"stonks-raw-{self.source_type().replace('_api', '').replace('_', '-')}"
return _prefixed(f"stonks-raw-{self.source_type().replace('_api', '').replace('_', '-')}")
def artifact_path(self, ticker: str, document_id: str, now: datetime) -> str:
"""Build the MinIO object path for a raw artifact.
+2 -1
View File
@@ -20,6 +20,7 @@ import httpx
from bs4 import BeautifulSoup
from services.shared.content import content_hash, normalize_url
from services.shared.storage import _prefixed
from .base import AdapterResult, BaseAdapter
@@ -172,7 +173,7 @@ class WebScrapeAdapter(BaseAdapter):
def bucket_name(self) -> str:
"""Web scrape artifacts go to the news raw bucket."""
return "stonks-raw-news"
return _prefixed("stonks-raw-news")
async def fetch(self, ticker: str, config: dict[str, Any]) -> AdapterResult:
"""Fetch HTML from curated URLs for a given ticker.
+3 -3
View File
@@ -30,7 +30,7 @@ from services.shared.schemas import (
ModelMetadata,
SeverityLevel,
)
from services.shared.storage import upload_artifact
from services.shared.storage import _prefixed, upload_artifact
logger = logging.getLogger("event_classifier")
@@ -381,7 +381,7 @@ def _upload_classification_prompt(
f"{document_id}/prompt.json"
)
return upload_artifact(
minio_client, "stonks-llm-prompts", path, payload,
minio_client, _prefixed("stonks-llm-prompts"), path, payload,
content_type="application/json",
)
@@ -421,7 +421,7 @@ def _upload_classification_result(
f"{document_id}/result.json"
)
return upload_artifact(
minio_client, "stonks-llm-results", path, payload,
minio_client, _prefixed("stonks-llm-results"), path, payload,
content_type="application/json",
)
+3 -1
View File
@@ -23,7 +23,9 @@ import uuid
from dataclasses import dataclass, field
from datetime import date, datetime
LAKEHOUSE_BUCKET = "stonks-lakehouse"
from services.shared.storage import _prefixed
LAKEHOUSE_BUCKET = _prefixed("stonks-lakehouse")
WAREHOUSE_PREFIX = "warehouse"
+14 -8
View File
@@ -99,15 +99,21 @@ class RetentionConfig:
# Map bucket names to RetentionConfig field names
# Uses _bucket() so retention cleanup targets the correct stage-specific buckets
def _bucket(name: str) -> str:
"""Apply DEPLOY_STAGE prefix to a bucket name."""
prefix = os.getenv("DEPLOY_STAGE", "")
return f"{prefix}-{name}" if prefix else name
BUCKET_RETENTION_FIELDS: dict[str, str] = {
"stonks-raw-market": "raw_market_days",
"stonks-raw-news": "raw_news_days",
"stonks-raw-filings": "raw_filings_days",
"stonks-normalized": "normalized_days",
"stonks-llm-prompts": "llm_prompts_days",
"stonks-llm-results": "llm_results_days",
"stonks-lakehouse": "lakehouse_days",
"stonks-audit": "audit_days",
_bucket("stonks-raw-market"): "raw_market_days",
_bucket("stonks-raw-news"): "raw_news_days",
_bucket("stonks-raw-filings"): "raw_filings_days",
_bucket("stonks-normalized"): "normalized_days",
_bucket("stonks-llm-prompts"): "llm_prompts_days",
_bucket("stonks-llm-results"): "llm_results_days",
_bucket("stonks-lakehouse"): "lakehouse_days",
_bucket("stonks-audit"): "audit_days",
}
+33 -16
View File
@@ -21,6 +21,7 @@ Requirements: 3.1, 3.2, 3.3, 9.1
"""
import io
import logging
import os
from datetime import datetime, timezone
from typing import Mapping
@@ -29,8 +30,13 @@ from minio.error import S3Error
logger = logging.getLogger("storage")
# All known buckets the platform uses
ALL_BUCKETS = [
# Optional bucket prefix for stage isolation (e.g. "beta", "paper").
# When set, all bucket names become "beta-stonks-raw-market", etc.
# Reads from DEPLOY_STAGE env var set by the Helm configmap.
_BUCKET_PREFIX = os.getenv("DEPLOY_STAGE", "")
# All known base bucket names the platform uses
_BASE_BUCKETS = [
"stonks-raw-market",
"stonks-raw-news",
"stonks-raw-filings",
@@ -41,14 +47,25 @@ ALL_BUCKETS = [
"stonks-audit",
]
# Map source_type to the correct raw bucket
def _prefixed(bucket: str) -> str:
"""Apply the stage prefix to a bucket name."""
if _BUCKET_PREFIX:
return f"{_BUCKET_PREFIX}-{bucket}"
return bucket
# Public list with prefix applied
ALL_BUCKETS = [_prefixed(b) for b in _BASE_BUCKETS]
# Map source_type to the correct raw bucket (with prefix)
SOURCE_BUCKET_MAP: dict[str, str] = {
"market_api": "stonks-raw-market",
"news_api": "stonks-raw-news",
"filings_api": "stonks-raw-filings",
"web_scrape": "stonks-raw-news",
"broker": "stonks-raw-market",
"macro_news": "stonks-raw-news",
"market_api": _prefixed("stonks-raw-market"),
"news_api": _prefixed("stonks-raw-news"),
"filings_api": _prefixed("stonks-raw-filings"),
"web_scrape": _prefixed("stonks-raw-news"),
"broker": _prefixed("stonks-raw-market"),
"macro_news": _prefixed("stonks-raw-news"),
}
# Map artifact type to content type and file extension
@@ -62,7 +79,7 @@ ARTIFACT_CONTENT_TYPES: dict[str, tuple[str, str]] = {
def bucket_for_source(source_type: str) -> str:
"""Return the MinIO bucket name for a given source type."""
return SOURCE_BUCKET_MAP.get(source_type, "stonks-raw-market")
return SOURCE_BUCKET_MAP.get(source_type, _prefixed("stonks-raw-market"))
def build_artifact_path(
@@ -227,7 +244,7 @@ def upload_normalized_text(
f"{document_id}/normalized.txt"
)
return upload_artifact(
client, "stonks-normalized", path, text_bytes,
client, _prefixed("stonks-normalized"), path, text_bytes,
content_type="text/plain", metadata=metadata,
)
@@ -251,7 +268,7 @@ def upload_parser_output(
f"{document_id}/parser_output.json"
)
return upload_artifact(
client, "stonks-normalized", path, output_bytes,
client, _prefixed("stonks-normalized"), path, output_bytes,
content_type="application/json", metadata=metadata,
)
@@ -275,7 +292,7 @@ def upload_extraction_prompt(
f"{document_id}/prompt.json"
)
return upload_artifact(
client, "stonks-llm-prompts", path, prompt_data,
client, _prefixed("stonks-llm-prompts"), path, prompt_data,
content_type="application/json", metadata=metadata,
)
@@ -300,7 +317,7 @@ def upload_extraction_raw_output(
f"{document_id}/raw_output_{attempt_index}.json"
)
return upload_artifact(
client, "stonks-llm-results", path, output_data,
client, _prefixed("stonks-llm-results"), path, output_data,
content_type="application/json", metadata=metadata,
)
@@ -324,7 +341,7 @@ def upload_extraction_validation(
f"{document_id}/validation.json"
)
return upload_artifact(
client, "stonks-llm-results", path, validation_data,
client, _prefixed("stonks-llm-results"), path, validation_data,
content_type="application/json", metadata=metadata,
)
@@ -348,7 +365,7 @@ def upload_extraction_intelligence(
f"{document_id}/intelligence.json"
)
return upload_artifact(
client, "stonks-llm-results", path, intelligence_data,
client, _prefixed("stonks-llm-results"), path, intelligence_data,
content_type="application/json", metadata=metadata,
)