"""Raw artifact upload to MinIO.

Provides a reusable storage layer for uploading raw artifacts (API payloads,
HTML, normalized text, model outputs) to MinIO with consistent path
conventions, bucket management, and content-type handling.

Bucket layout follows the design spec:

- stonks-raw-market   — raw market API payloads
- stonks-raw-news     — raw news API payloads and article HTML
- stonks-raw-filings  — raw filings and issuer event payloads
- stonks-normalized   — cleaned text and parser outputs
- stonks-llm-prompts  — prompts and schemas used
- stonks-llm-results  — raw model outputs and validation reports
- stonks-lakehouse    — partitioned analytical datasets and table metadata
- stonks-audit        — execution traces and exported reports

Object key pattern (relative to the bucket; keys have no leading slash):

    {stage}/{symbol}/{yyyy}/{mm}/{dd}/{document_id}/{artifact_type}.{ext}

``stage`` is the source type for raw artifacts (e.g. ``news_api``),
``parsed`` for normalized text / parser output, and ``extraction`` for LLM
prompts and results. Macro news has no symbol, so its keys use
``macro/{yyyy}/{mm}/{dd}/{document_id}/{artifact_type}.{ext}`` instead.

Requirements: 3.1, 3.2, 3.3, 9.1
"""

import io
import logging
from datetime import datetime, timezone
from typing import Mapping

from minio import Minio
from minio.error import S3Error

logger = logging.getLogger("storage")

# All known buckets the platform uses (the default set for ensure_buckets()).
ALL_BUCKETS = [
    "stonks-raw-market",
    "stonks-raw-news",
    "stonks-raw-filings",
    "stonks-normalized",
    "stonks-llm-prompts",
    "stonks-llm-results",
    "stonks-lakehouse",
    "stonks-audit",
]

# Map source_type to the correct raw bucket.
SOURCE_BUCKET_MAP: dict[str, str] = {
    "market_api": "stonks-raw-market",
    "news_api": "stonks-raw-news",
    "filings_api": "stonks-raw-filings",
    "web_scrape": "stonks-raw-news",
    "broker": "stonks-raw-market",
    "macro_news": "stonks-raw-news",
}

# Map artifact type to (content type, file extension).
ARTIFACT_CONTENT_TYPES: dict[str, tuple[str, str]] = {
    "raw_json": ("application/json", "json"),
    "raw_html": ("text/html", "html"),
    "raw_text": ("text/plain", "txt"),
    "raw_payload": ("application/octet-stream", "bin"),
}


def bucket_for_source(source_type: str) -> str:
    """Return the MinIO bucket name for a given source type.

    Source types not present in SOURCE_BUCKET_MAP fall back to
    ``stonks-raw-market`` rather than raising, so a misspelled source type is
    stored in the market bucket without any error.
    """
    return SOURCE_BUCKET_MAP.get(source_type, "stonks-raw-market")
def _partitioned_path(
    prefix: str,
    document_id: str,
    filename: str,
    timestamp: datetime | None = None,
) -> str:
    """Return the object key ``{prefix}/{yyyy}/{mm}/{dd}/{document_id}/{filename}``.

    Shared by every path builder in this module so the date partitioning is
    defined in exactly one place.

    Args:
        prefix: Leading key segment(s), e.g. ``"parsed/AAPL"`` or ``"macro"``.
        document_id: Unique document or run identifier.
        filename: Final key component, including its extension.
        timestamp: Date used for the yyyy/mm/dd partition. Defaults to the
            current UTC time. A supplied value is used as given, with no
            timezone conversion.

    Returns:
        Object key relative to the bucket (no leading slash).
    """
    ts = datetime.now(timezone.utc) if timestamp is None else timestamp
    return (
        f"{prefix}/"
        f"{ts.year}/{ts.month:02d}/{ts.day:02d}/"
        f"{document_id}/{filename}"
    )


def build_artifact_path(
    source_type: str,
    ticker: str,
    document_id: str,
    artifact_name: str = "raw",
    ext: str = "json",
    timestamp: datetime | None = None,
) -> str:
    """Build a MinIO object path following the design convention.

    Pattern: ``{source_type}/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/{artifact_name}.{ext}``

    For ``macro_news`` sources the ticker is ignored and a ``macro/`` prefix
    is used instead:
    ``macro/{yyyy}/{mm}/{dd}/{document_id}/{artifact_name}.{ext}``

    Args:
        source_type: Ingestion source type (e.g. ``news_api``, ``macro_news``).
        ticker: Company ticker symbol (unused for ``macro_news``).
        document_id: Unique document or run identifier.
        artifact_name: Base file name of the artifact.
        ext: File extension, without the leading dot.
        timestamp: Partition date. Defaults to now (UTC).

    Returns:
        Object key relative to the bucket (no leading slash).
    """
    # NOTE(review): an aware timestamp in a non-UTC zone is partitioned by its
    # own local date (no conversion happens) — confirm callers pass UTC.
    # Macro sources use macro/ prefix instead of ticker (Requirement 1.1).
    prefix = "macro" if source_type == "macro_news" else f"{source_type}/{ticker}"
    return _partitioned_path(prefix, document_id, f"{artifact_name}.{ext}", timestamp)


def storage_ref(bucket: str, path: str) -> str:
    """Build an s3:// URI for a stored artifact."""
    return f"s3://{bucket}/{path}"


def ensure_buckets(client: Minio, buckets: list[str] | None = None) -> list[str]:
    """Create any missing buckets.

    Args:
        client: MinIO client instance.
        buckets: Buckets to ensure. ``None`` (the default) means ALL_BUCKETS.
            An explicitly empty list ensures nothing.

    Returns:
        The buckets this call actually created, in order.

    Raises:
        S3Error: If checking or creating a bucket fails. The one exception is
            the error for a bucket that already exists and is owned by the
            caller, which is treated as success.
    """
    # `is None` rather than `or`: an empty list must not mean "every bucket".
    target_buckets = ALL_BUCKETS if buckets is None else buckets
    created: list[str] = []
    for bucket in target_buckets:
        try:
            if client.bucket_exists(bucket):
                continue
            client.make_bucket(bucket)
        except S3Error as e:
            if e.code == "BucketAlreadyOwnedByYou":
                # Another worker created it between our existence check and
                # make_bucket; the bucket exists, which is all we need.
                logger.debug("Bucket %s was created concurrently", bucket)
                continue
            logger.error("Failed to ensure bucket %s: %s", bucket, e)
            raise
        created.append(bucket)
        logger.info("Created bucket: %s", bucket)
    return created


def upload_artifact(
    client: Minio,
    bucket: str,
    path: str,
    data: bytes,
    content_type: str = "application/json",
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Upload raw bytes to MinIO and return the s3:// storage reference.

    Args:
        client: MinIO client instance.
        bucket: Target bucket name.
        path: Object path within the bucket.
        data: Raw bytes to upload.
        content_type: MIME type for the object.
        metadata: Optional user metadata to attach to the object.

    Returns:
        s3:// URI pointing to the uploaded object.

    Any exception raised by ``client.put_object`` propagates unchanged.
    """
    client.put_object(
        bucket,
        path,
        io.BytesIO(data),
        length=len(data),
        content_type=content_type,
        metadata=metadata,
    )
    ref = storage_ref(bucket, path)
    logger.debug("Uploaded %d bytes to %s", len(data), ref)
    return ref


def _upload_partitioned(
    client: Minio,
    bucket: str,
    prefix: str,
    document_id: str,
    filename: str,
    data: bytes,
    *,
    content_type: str,
    timestamp: datetime | None,
    metadata: Mapping[str, str] | None,
) -> str:
    """Upload ``data`` to ``{prefix}/{yyyy}/{mm}/{dd}/{document_id}/{filename}`` in ``bucket``."""
    path = _partitioned_path(prefix, document_id, filename, timestamp)
    return upload_artifact(
        client, bucket, path, data, content_type=content_type, metadata=metadata
    )


def upload_raw_artifact(
    client: Minio,
    source_type: str,
    ticker: str,
    document_id: str,
    data: bytes,
    artifact_type: str = "raw_json",
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Upload a raw artifact using standard conventions for bucket, path, and content type.

    This is the primary entry point for ingestion workers to store raw payloads.

    Args:
        client: MinIO client instance.
        source_type: One of market_api, news_api, filings_api, web_scrape,
            broker, macro_news. It selects both the bucket and the key prefix.
        ticker: Company ticker symbol (ignored in the key for macro_news).
        document_id: Unique document or run identifier.
        data: Raw bytes to upload.
        artifact_type: One of raw_json, raw_html, raw_text, raw_payload.
            Unknown values are stored as application/octet-stream with a
            ``.bin`` extension.
        timestamp: Override timestamp for path generation (defaults to now UTC).
        metadata: Optional user metadata dict.

    Returns:
        s3:// URI pointing to the uploaded object.
    """
    bucket = bucket_for_source(source_type)
    ct, ext = ARTIFACT_CONTENT_TYPES.get(artifact_type, ("application/octet-stream", "bin"))
    path = build_artifact_path(
        source_type=source_type,
        ticker=ticker,
        document_id=document_id,
        artifact_name="raw",
        ext=ext,
        timestamp=timestamp,
    )
    return upload_artifact(client, bucket, path, data, content_type=ct, metadata=metadata)


def upload_html_artifact(
    client: Minio,
    ticker: str,
    document_id: str,
    html_bytes: bytes,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Upload raw HTML for a scraped web page.

    Stores in stonks-raw-news under the web_scrape source path, i.e.
    ``web_scrape/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/raw.html`` with
    content type text/html.
    """
    # Same bucket/path/content type as a web_scrape "raw_html" raw artifact,
    # so reuse that code path instead of duplicating it.
    return upload_raw_artifact(
        client,
        "web_scrape",
        ticker,
        document_id,
        html_bytes,
        artifact_type="raw_html",
        timestamp=timestamp,
        metadata=metadata,
    )


def upload_normalized_text(
    client: Minio,
    ticker: str,
    document_id: str,
    text_bytes: bytes,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Upload normalized (parsed) text to the stonks-normalized bucket.

    Stores under parsed/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/normalized.txt
    """
    return _upload_partitioned(
        client,
        "stonks-normalized",
        f"parsed/{ticker}",
        document_id,
        "normalized.txt",
        text_bytes,
        content_type="text/plain",
        timestamp=timestamp,
        metadata=metadata,
    )


def upload_parser_output(
    client: Minio,
    ticker: str,
    document_id: str,
    output_bytes: bytes,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Upload structured parser output JSON to the stonks-normalized bucket.

    Stores under parsed/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/parser_output.json
    """
    return _upload_partitioned(
        client,
        "stonks-normalized",
        f"parsed/{ticker}",
        document_id,
        "parser_output.json",
        output_bytes,
        content_type="application/json",
        timestamp=timestamp,
        metadata=metadata,
    )


def upload_extraction_prompt(
    client: Minio,
    ticker: str,
    document_id: str,
    prompt_data: bytes,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Upload the extraction prompt and schema to stonks-llm-prompts.

    Stores under extraction/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/prompt.json
    """
    return _upload_partitioned(
        client,
        "stonks-llm-prompts",
        f"extraction/{ticker}",
        document_id,
        "prompt.json",
        prompt_data,
        content_type="application/json",
        timestamp=timestamp,
        metadata=metadata,
    )


def upload_extraction_raw_output(
    client: Minio,
    ticker: str,
    document_id: str,
    output_data: bytes,
    attempt_index: int = 0,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Upload a raw model output to stonks-llm-results.

    Stores under
    extraction/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/raw_output_{attempt_index}.json,
    so each attempt for the same document gets its own object.
    """
    return _upload_partitioned(
        client,
        "stonks-llm-results",
        f"extraction/{ticker}",
        document_id,
        f"raw_output_{attempt_index}.json",
        output_data,
        content_type="application/json",
        timestamp=timestamp,
        metadata=metadata,
    )


def upload_extraction_validation(
    client: Minio,
    ticker: str,
    document_id: str,
    validation_data: bytes,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Upload a validation report to stonks-llm-results.

    Stores under extraction/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/validation.json
    """
    return _upload_partitioned(
        client,
        "stonks-llm-results",
        f"extraction/{ticker}",
        document_id,
        "validation.json",
        validation_data,
        content_type="application/json",
        timestamp=timestamp,
        metadata=metadata,
    )


def upload_extraction_intelligence(
    client: Minio,
    ticker: str,
    document_id: str,
    intelligence_data: bytes,
    timestamp: datetime | None = None,
    metadata: Mapping[str, str] | None = None,
) -> str:
    """Upload the final intelligence object to stonks-llm-results.

    Stores under extraction/{ticker}/{yyyy}/{mm}/{dd}/{document_id}/intelligence.json
    """
    return _upload_partitioned(
        client,
        "stonks-llm-results",
        f"extraction/{ticker}",
        document_id,
        "intelligence.json",
        intelligence_data,
        content_type="application/json",
        timestamp=timestamp,
        metadata=metadata,
    )


def download_artifact(client: Minio, bucket: str, path: str) -> bytes:
    """Download an artifact from MinIO and return its full contents as bytes.

    The response is closed and ``release_conn()`` is called even if reading
    the body raises.
    """
    response = client.get_object(bucket, path)
    try:
        return response.read()
    finally:
        response.close()
        response.release_conn()