Files
stonks-oracle/services/shared/storage.py
T
Celes Renata 326c409d63 fix: use _global fallback for empty ticker in MinIO storage paths
Macro news documents have no ticker, causing upload_normalized_text
and upload_parser_output to produce paths like parsed//2026/...
which MinIO rejects as XMinioInvalidObjectName. Use '_global' as
the path segment when ticker is empty, matching the existing
macro prefix pattern in upload_raw_document.
2026-04-15 19:25:23 +00:00

364 lines
11 KiB
Python

"""Raw artifact upload to MinIO.
Provides a reusable storage layer for uploading raw artifacts (API payloads,
HTML, normalized text, model outputs) to MinIO with consistent path conventions,
bucket management, and content-type handling.
Bucket layout follows the design spec:
- stonks-raw-market — raw market API payloads
- stonks-raw-news — raw news API payloads and article HTML
- stonks-raw-filings — raw filings and issuer event payloads
- stonks-normalized — cleaned text and parser outputs
- stonks-llm-prompts — prompts and schemas used
- stonks-llm-results — raw model outputs and validation reports
- stonks-lakehouse — partitioned analytical datasets and table metadata
- stonks-audit — execution traces and exported reports
Object path pattern:
/{stage}/{symbol}/{yyyy}/{mm}/{dd}/{document_id}/{artifact_type}.{ext}
Requirements: 3.1, 3.2, 3.3, 9.1
"""
import io
import logging
from datetime import datetime, timezone
from typing import Mapping
from minio import Minio
from minio.error import S3Error
# Module-level logger; registered under the fixed name "storage".
logger = logging.getLogger("storage")

# All known buckets the platform uses. ensure_buckets() provisions this whole
# list when the caller does not name specific buckets.
ALL_BUCKETS = [
    "stonks-raw-market",
    "stonks-raw-news",
    "stonks-raw-filings",
    "stonks-normalized",
    "stonks-llm-prompts",
    "stonks-llm-results",
    "stonks-lakehouse",
    "stonks-audit",
]

# Map source_type to the correct raw bucket. Source types that are not listed
# here are routed to "stonks-raw-market" by bucket_for_source().
SOURCE_BUCKET_MAP: dict[str, str] = {
    "market_api": "stonks-raw-market",
    "news_api": "stonks-raw-news",
    "filings_api": "stonks-raw-filings",
    "web_scrape": "stonks-raw-news",
    "broker": "stonks-raw-market",
    "macro_news": "stonks-raw-news",
}

# Map artifact type to a (content type, file extension) pair. Artifact types
# that are not listed here are stored by upload_raw_artifact() as
# ("application/octet-stream", "bin").
ARTIFACT_CONTENT_TYPES: dict[str, tuple[str, str]] = {
    "raw_json": ("application/json", "json"),
    "raw_html": ("text/html", "html"),
    "raw_text": ("text/plain", "txt"),
    "raw_payload": ("application/octet-stream", "bin"),
}
def bucket_for_source(source_type: str) -> str:
    """Resolve the raw-artifact bucket that stores payloads of a source type.

    Args:
        source_type: Source type key looked up in ``SOURCE_BUCKET_MAP``.

    Returns:
        The mapped bucket name, or ``"stonks-raw-market"`` when the source
        type has no entry in the map.
    """
    try:
        return SOURCE_BUCKET_MAP[source_type]
    except KeyError:
        # Unmapped source types share the market bucket.
        return "stonks-raw-market"
def build_artifact_path(
source_type: str,
ticker: str,
document_id: str,
artifact_name: str = "raw",
ext: str = "json",
timestamp: datetime | None = None,
) -> str:
"""Build a MinIO object path following the design convention.
Pattern: {source_type}/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/{artifact_name}.{ext}
For macro_news sources, uses macro/ prefix instead of ticker:
macro/{yyyy}/{mm}/{dd}/{document_id}/{artifact_name}.{ext}
"""
ts = timestamp or datetime.now(timezone.utc)
# Macro sources use macro/ prefix instead of ticker (Requirement 1.1)
path_prefix = "macro" if source_type == "macro_news" else f"{source_type}/{ticker}"
return (
f"{path_prefix}/"
f"{ts.year}/{ts.month:02d}/{ts.day:02d}/"
f"{document_id}/{artifact_name}.{ext}"
)
def storage_ref(bucket: str, path: str) -> str:
    """Return the ``s3://`` URI that addresses ``path`` inside ``bucket``.

    Args:
        bucket: Bucket name.
        path: Object path within the bucket.

    Returns:
        A string of the form ``s3://{bucket}/{path}``.
    """
    return "s3://{}/{}".format(bucket, path)
def ensure_buckets(client: Minio, buckets: list[str] | None = None) -> list[str]:
    """Make sure every requested bucket exists, creating the missing ones.

    Args:
        client: MinIO client instance.
        buckets: Bucket names to check. ``None`` or an empty list means all
            buckets in ``ALL_BUCKETS``.

    Returns:
        Names of the buckets that this call created, in processing order.

    Raises:
        S3Error: Re-raised after logging when checking or creating a bucket
            fails; buckets later in the list are not processed.
    """
    wanted = ALL_BUCKETS if not buckets else buckets
    newly_created: list[str] = []
    for name in wanted:
        try:
            if client.bucket_exists(name):
                continue
            client.make_bucket(name)
        except S3Error as exc:
            logger.error("Failed to ensure bucket %s: %s", name, exc)
            raise
        newly_created.append(name)
        logger.info("Created bucket: %s", name)
    return newly_created
def upload_artifact(
    client: Minio,
    bucket: str,
    path: str,
    data: bytes,
    content_type: str = "application/json",
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Store a byte payload as a MinIO object and report where it landed.

    Args:
        client: MinIO client instance.
        bucket: Target bucket name.
        path: Object path within the bucket.
        data: Raw bytes to upload.
        content_type: MIME type recorded for the object.
        metadata: Optional user metadata to attach to the object.

    Returns:
        s3:// URI pointing to the uploaded object.
    """
    size = len(data)
    # put_object reads from a stream, so wrap the in-memory payload.
    stream = io.BytesIO(data)
    client.put_object(
        bucket,
        path,
        stream,
        length=size,
        content_type=content_type,
        metadata=metadata,
    )
    uri = storage_ref(bucket, path)
    logger.debug("Uploaded %d bytes to %s", size, uri)
    return uri
def upload_raw_artifact(
    client: Minio,
    source_type: str,
    ticker: str,
    document_id: str,
    data: bytes,
    artifact_type: str = "raw_json",
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Store a raw payload, deriving bucket, path and content type by convention.

    The bucket comes from ``bucket_for_source``, the object path from
    ``build_artifact_path`` (artifact name ``raw``), and the content type and
    file extension from ``ARTIFACT_CONTENT_TYPES``.

    Args:
        client: MinIO client instance.
        source_type: One of market_api, news_api, filings_api, web_scrape,
            broker, macro_news.
        ticker: Company ticker symbol.
        document_id: Unique document or run identifier.
        data: Raw bytes to upload.
        artifact_type: One of raw_json, raw_html, raw_text, raw_payload. Any
            other value is stored as application/octet-stream with a ``bin``
            extension.
        timestamp: Override timestamp for path generation (defaults to now UTC).
        metadata: Optional user metadata dict.

    Returns:
        s3:// URI pointing to the uploaded object.
    """
    fallback_kind = ("application/octet-stream", "bin")
    mime_type, extension = ARTIFACT_CONTENT_TYPES.get(artifact_type, fallback_kind)
    target_bucket = bucket_for_source(source_type)
    object_path = build_artifact_path(
        source_type,
        ticker,
        document_id,
        artifact_name="raw",
        ext=extension,
        timestamp=timestamp,
    )
    return upload_artifact(
        client,
        target_bucket,
        object_path,
        data,
        content_type=mime_type,
        metadata=metadata,
    )
def upload_html_artifact(
    client: Minio,
    ticker: str,
    document_id: str,
    html_bytes: bytes,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Store the raw HTML of a scraped web page.

    The object goes to the bucket mapped for the ``web_scrape`` source type,
    at the ``web_scrape`` artifact path, as ``raw.html`` with content type
    ``text/html``.

    Args:
        client: MinIO client instance.
        ticker: Company ticker symbol.
        document_id: Unique document or run identifier.
        html_bytes: Raw HTML bytes to upload.
        timestamp: Override timestamp for path generation (defaults to now UTC).
        metadata: Optional user metadata dict.

    Returns:
        s3:// URI pointing to the uploaded object.
    """
    scrape_source = "web_scrape"
    target_bucket = bucket_for_source(scrape_source)
    object_path = build_artifact_path(
        scrape_source,
        ticker,
        document_id,
        artifact_name="raw",
        ext="html",
        timestamp=timestamp,
    )
    return upload_artifact(
        client,
        target_bucket,
        object_path,
        html_bytes,
        content_type="text/html",
        metadata=metadata,
    )
def upload_normalized_text(
    client: Minio,
    ticker: str,
    document_id: str,
    text_bytes: bytes,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Store normalized (parsed) text in the stonks-normalized bucket.

    Object path:
        parsed/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/normalized.txt

    Args:
        client: MinIO client instance.
        ticker: Company ticker symbol; an empty ticker is stored under the
            ``_global`` segment.
        document_id: Unique document identifier.
        text_bytes: Normalized text, already encoded to bytes.
        timestamp: Override timestamp for path generation (defaults to now UTC).
        metadata: Optional user metadata dict.

    Returns:
        s3:// URI pointing to the uploaded object.
    """
    when = timestamp if timestamp else datetime.now(timezone.utc)
    # Documents without a ticker must not leave an empty path segment.
    ticker_segment = ticker if ticker else "_global"
    object_path = "parsed/{}/{}/{:02d}/{:02d}/{}/normalized.txt".format(
        ticker_segment, when.year, when.month, when.day, document_id
    )
    return upload_artifact(
        client,
        "stonks-normalized",
        object_path,
        text_bytes,
        content_type="text/plain",
        metadata=metadata,
    )
def upload_parser_output(
    client: Minio,
    ticker: str,
    document_id: str,
    output_bytes: bytes,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Store structured parser output JSON in the stonks-normalized bucket.

    Object path:
        parsed/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/parser_output.json

    Args:
        client: MinIO client instance.
        ticker: Company ticker symbol; an empty ticker is stored under the
            ``_global`` segment.
        document_id: Unique document identifier.
        output_bytes: Parser output, already serialized to bytes.
        timestamp: Override timestamp for path generation (defaults to now UTC).
        metadata: Optional user metadata dict.

    Returns:
        s3:// URI pointing to the uploaded object.
    """
    when = timestamp if timestamp else datetime.now(timezone.utc)
    # Documents without a ticker must not leave an empty path segment.
    ticker_segment = ticker if ticker else "_global"
    date_part = "{}/{:02d}/{:02d}".format(when.year, when.month, when.day)
    object_path = f"parsed/{ticker_segment}/{date_part}/{document_id}/parser_output.json"
    return upload_artifact(
        client,
        "stonks-normalized",
        object_path,
        output_bytes,
        content_type="application/json",
        metadata=metadata,
    )
def upload_extraction_prompt(
    client: Minio,
    ticker: str,
    document_id: str,
    prompt_data: bytes,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Store the extraction prompt and schema in the stonks-llm-prompts bucket.

    Object path:
        extraction/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/prompt.json

    Args:
        client: MinIO client instance.
        ticker: Company ticker symbol; an empty ticker is stored under the
            ``_unknown`` segment.
        document_id: Unique document identifier.
        prompt_data: Prompt payload, already serialized to bytes.
        timestamp: Override timestamp for path generation (defaults to now UTC).
        metadata: Optional user metadata dict.

    Returns:
        s3:// URI pointing to the uploaded object.
    """
    when = timestamp if timestamp else datetime.now(timezone.utc)
    ticker_segment = ticker if ticker else "_unknown"
    object_path = "extraction/{}/{}/{:02d}/{:02d}/{}/prompt.json".format(
        ticker_segment, when.year, when.month, when.day, document_id
    )
    return upload_artifact(
        client,
        "stonks-llm-prompts",
        object_path,
        prompt_data,
        content_type="application/json",
        metadata=metadata,
    )
def upload_extraction_raw_output(
    client: Minio,
    ticker: str,
    document_id: str,
    output_data: bytes,
    attempt_index: int = 0,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Store one raw model output in the stonks-llm-results bucket.

    Object path:
        extraction/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/raw_output_{attempt}.json

    Args:
        client: MinIO client instance.
        ticker: Company ticker symbol; an empty ticker is stored under the
            ``_unknown`` segment.
        document_id: Unique document identifier.
        output_data: Model output, already serialized to bytes.
        attempt_index: Attempt number embedded in the file name, so each
            attempt gets its own object key.
        timestamp: Override timestamp for path generation (defaults to now UTC).
        metadata: Optional user metadata dict.

    Returns:
        s3:// URI pointing to the uploaded object.
    """
    when = timestamp if timestamp else datetime.now(timezone.utc)
    ticker_segment = ticker if ticker else "_unknown"
    folder = "extraction/{}/{}/{:02d}/{:02d}/{}".format(
        ticker_segment, when.year, when.month, when.day, document_id
    )
    object_path = f"{folder}/raw_output_{attempt_index}.json"
    return upload_artifact(
        client,
        "stonks-llm-results",
        object_path,
        output_data,
        content_type="application/json",
        metadata=metadata,
    )
def upload_extraction_validation(
    client: Minio,
    ticker: str,
    document_id: str,
    validation_data: bytes,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Store a validation report in the stonks-llm-results bucket.

    Object path:
        extraction/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/validation.json

    Args:
        client: MinIO client instance.
        ticker: Company ticker symbol; an empty ticker is stored under the
            ``_unknown`` segment.
        document_id: Unique document identifier.
        validation_data: Validation report, already serialized to bytes.
        timestamp: Override timestamp for path generation (defaults to now UTC).
        metadata: Optional user metadata dict.

    Returns:
        s3:// URI pointing to the uploaded object.
    """
    when = timestamp if timestamp else datetime.now(timezone.utc)
    ticker_segment = ticker if ticker else "_unknown"
    date_part = "{}/{:02d}/{:02d}".format(when.year, when.month, when.day)
    object_path = f"extraction/{ticker_segment}/{date_part}/{document_id}/validation.json"
    return upload_artifact(
        client,
        "stonks-llm-results",
        object_path,
        validation_data,
        content_type="application/json",
        metadata=metadata,
    )
def upload_extraction_intelligence(
    client: Minio,
    ticker: str,
    document_id: str,
    intelligence_data: bytes,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Store the final intelligence object in the stonks-llm-results bucket.

    Object path:
        extraction/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/intelligence.json

    Args:
        client: MinIO client instance.
        ticker: Company ticker symbol; an empty ticker is stored under the
            ``_unknown`` segment.
        document_id: Unique document identifier.
        intelligence_data: Intelligence object, already serialized to bytes.
        timestamp: Override timestamp for path generation (defaults to now UTC).
        metadata: Optional user metadata dict.

    Returns:
        s3:// URI pointing to the uploaded object.
    """
    when = timestamp if timestamp else datetime.now(timezone.utc)
    ticker_segment = ticker if ticker else "_unknown"
    object_path = "extraction/{}/{}/{:02d}/{:02d}/{}/intelligence.json".format(
        ticker_segment, when.year, when.month, when.day, document_id
    )
    return upload_artifact(
        client,
        "stonks-llm-results",
        object_path,
        intelligence_data,
        content_type="application/json",
        metadata=metadata,
    )
def download_artifact(client: Minio, bucket: str, path: str) -> bytes:
    """Fetch an object from MinIO and return its full content as bytes.

    Args:
        client: MinIO client instance.
        bucket: Bucket that holds the object.
        path: Object path within the bucket.

    Returns:
        The complete object body, read into memory in one call.
    """
    obj = client.get_object(bucket, path)
    try:
        payload = obj.read()
    finally:
        # Always close the response and release its connection, even when
        # the read fails.
        obj.close()
        obj.release_conn()
    return payload