135 lines
5.1 KiB
Python
135 lines
5.1 KiB
Python
"""Hive-compatible partition layout conventions for the MinIO lakehouse.

Centralizes partition path generation, partition column injection, and
bucket provisioning so that all lake publisher writers produce layouts
that Trino's Hive and Iceberg connectors can discover and prune.

Design ref: Section 5.2, 5.3 (Lakehouse model)
Requirements: 9.4, 9.5, N4, N6

Layout convention:
    s3://stonks-lakehouse/warehouse/{table_name}/dt={YYYY-MM-DD}[/{extra_key}={value}]/part-{uuid}.parquet

Rules:
    - Every fact table is partitioned by ``dt`` (DATE) derived from the row timestamp.
    - Some tables have a second partition key (e.g. ``model_version``).
    - Partition columns MUST appear in the Parquet file so Trino can read them
      without relying solely on path parsing.
    - File names use a UUID-derived suffix to avoid collisions on concurrent writes.
"""
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from datetime import date, datetime
|
|
|
|
from services.shared.storage import _prefixed
|
|
|
|
# Bucket that holds the lakehouse objects. The base name is passed through
# ``_prefixed`` from services.shared.storage (presumably it applies an
# environment- or deployment-specific prefix — confirm in that module).
LAKEHOUSE_BUCKET = _prefixed("stonks-lakehouse")
|
|
# First path segment of every object key built by ``partition_path``.
WAREHOUSE_PREFIX = "warehouse"
|
|
|
|
|
|
@dataclass(frozen=True)
class PartitionSpec:
    """Immutable description of how one fact table is partitioned.

    Only the table-specific keys are stored; the ``dt`` key is implicit and
    is prepended by :attr:`all_keys`.
    """

    # Logical name of the fact table.
    table_name: str
    # Partition keys beyond ``dt``, in declaration order; empty by default.
    extra_keys: tuple[str, ...] = field(default_factory=tuple)

    @property
    def all_keys(self) -> tuple[str, ...]:
        """Return every partition key in order: ``dt`` first, then the extras."""
        ordered = ["dt"]
        ordered.extend(self.extra_keys)
        return tuple(ordered)
|
|
|
|
|
|
# Single source of truth for the analytical fact tables and their partition
# keys — DDL, publisher, and tests should agree with this registry.
# Every entry is keyed by its own spec's ``table_name``.
TABLE_PARTITIONS: dict[str, PartitionSpec] = {
    spec.table_name: spec
    for spec in (
        PartitionSpec("market_bars"),
        PartitionSpec("market_quotes"),
        PartitionSpec("company_events"),
        PartitionSpec("documents"),
        PartitionSpec("document_extractions", extra_keys=("model_version",)),
        PartitionSpec("trade_signals"),
        PartitionSpec("trade_orders"),
        PartitionSpec("trade_fills"),
        PartitionSpec("positions_daily"),
        PartitionSpec("pnl_daily"),
        PartitionSpec("prediction_vs_outcome", extra_keys=("model_version",)),
        PartitionSpec("model_performance", extra_keys=("model_version",)),
        PartitionSpec("global_events"),
        PartitionSpec("macro_impacts", extra_keys=("ticker",)),
        PartitionSpec("trend_projections", extra_keys=("ticker",)),
        PartitionSpec("competitor_relationships"),
        PartitionSpec("competitive_signals", extra_keys=("target_ticker",)),
    )
}
|
|
|
|
|
|
def partition_path(
    table_name: str,
    dt: datetime | date,
    extra_partitions: dict[str, str] | None = None,
    file_id: str | None = None,
) -> str:
    """Build a Hive-compatible object path for a Parquet file.

    Args:
        table_name: Logical fact table name (must be in TABLE_PARTITIONS).
        dt: Row timestamp or date used to derive the ``dt=`` partition.
        extra_partitions: Additional partition key/value pairs (e.g.
            model_version). Only keys declared by the table's spec are used;
            a declared key that is absent or ``None`` is written as
            ``__NONE__``. Values are percent-encoded so that characters such
            as ``/`` or ``=`` cannot add or alter directory levels.
        file_id: Optional override for the file suffix. When omitted or
            empty, the first 16 hex characters of a fresh UUID4 are used.

    Returns:
        Object key relative to the bucket root, e.g.
        ``warehouse/trade_signals/dt=2026-04-11/part-<uuid>.parquet``

    Raises:
        ValueError: If ``table_name`` is not registered in TABLE_PARTITIONS.
    """
    # Local import keeps this block self-contained; the module is stdlib.
    from urllib.parse import quote

    spec = TABLE_PARTITIONS.get(table_name)
    if spec is None:
        raise ValueError(f"Unknown table: {table_name}. Register it in TABLE_PARTITIONS.")

    # datetime is a subclass of date, so it must be checked first. Reducing
    # both inputs to a date gives one zero-padded ISO rendering for either type.
    day = dt.date() if isinstance(dt, datetime) else dt
    segments = [WAREHOUSE_PREFIX, table_name, f"dt={day.isoformat()}"]

    # Append extra partition directories in the order declared by the spec.
    extras = extra_partitions or {}
    for key in spec.extra_keys:
        value = extras.get(key)
        if value is None:
            # Absent and explicit-None share one placeholder, so a None value
            # never becomes the literal directory "key=None".
            value = "__NONE__"
        # NOTE(review): Hive-style readers are expected to decode
        # percent-escapes in partition directory names — confirm against the
        # Trino connector in use.
        segments.append(f"{key}={quote(str(value), safe='')}")

    suffix = file_id or uuid.uuid4().hex[:16]
    segments.append(f"part-{suffix}.parquet")

    return "/".join(segments)
|
|
|
|
|
|
def partition_values(
    dt: datetime | date,
    extra_partitions: dict[str, str] | None = None,
) -> dict[str, object]:
    """Return partition column values to inject into Parquet row data.

    Trino's Hive connector can read partition values from the directory path,
    but embedding them in the Parquet file as well ensures compatibility with
    engines that don't parse Hive paths (e.g. plain PyArrow reads, DuckDB).

    Args:
        dt: Row timestamp or date; a ``datetime`` is reduced to its date.
        extra_partitions: Additional partition key/value pairs, copied into
            the result as-is. An entry named ``dt`` is ignored so that it
            cannot overwrite the date derived from the ``dt`` argument.

    Returns:
        A dict like ``{"dt": date(2026, 4, 11), "model_version": "v2"}``,
        with ``dt`` always the first key.
    """
    # datetime is a subclass of date, so it must be checked first.
    dt_date = dt.date() if isinstance(dt, datetime) else dt

    values: dict[str, object] = {"dt": dt_date}
    if extra_partitions:
        # The derived date is authoritative: an extra called "dt" would make
        # the embedded column disagree with the date passed by the caller.
        values.update(
            (key, value) for key, value in extra_partitions.items() if key != "dt"
        )
    return values
|
|
|
|
|
|
def s3_uri(path: str) -> str:
    """Return the ``s3://`` URI for *path*, a key relative to the lakehouse bucket root."""
    return "s3://{}/{}".format(LAKEHOUSE_BUCKET, path)
|