Files
stonks-oracle/tests/test_lake_publication_validation.py
Celes Renata c85c0068a2 fix: clean up utcnow deprecation warnings, fix 12 failing tests, add CI/CD pipeline manifests
- Replace all datetime.utcnow() with datetime.now(tz=timezone.utc) across 8 files
- Fix 12 failing tests to match current implementation behavior
- Fix pytest_plugins in non-top-level conftest (moved to root conftest.py)
- Auto-fix 189 lint issues (import sorting, unused imports)
- Add CI/CD pipeline infrastructure (ARC, ArgoCD, Kargo manifests)
- Add values-beta.yaml and values-paper.yaml for staged deployments
- Update GitHub Actions workflow to use self-hosted-gremlin runners
- Add integration-test job to CI pipeline

Result: 1596 passed, 0 failed, 0 warnings
2026-04-18 03:59:28 +00:00

598 lines
22 KiB
Python

"""Validate lake publication and the preconditions for correct Trino queries over
partitioned MinIO datasets.

These are static consistency checks: publishers write to a mocked MinIO client and
no Trino or MinIO instance is contacted. The tests ensure that:

- PyArrow schemas in worker.py match the lakehouse DDL column definitions
- Iceberg DDL generated from PyArrow schemas is consistent with lakehouse DDL
- Partition layouts are Hive-compatible (``key=value`` path segments) so Trino can
  discover them
- Published Parquet files embed partition columns in the data
- Cross-table join keys used by views are present and type-consistent
- All 12 analytical fact tables have aligned schema definitions across layers

Requirements: 9.4, 9.5, 10.1, 10.3, N4, N6
Design ref: Section 5.2, 5.3, 7, 8.4
"""
from __future__ import annotations
import re
from datetime import date, datetime, timezone
from pathlib import Path
from unittest.mock import MagicMock
import pyarrow as pa
import pyarrow.parquet as pq
from services.lake_publisher.iceberg import (
TABLE_SCHEMAS,
_arrow_type_to_trino,
get_all_table_defs,
)
from services.lake_publisher.partitions import (
LAKEHOUSE_BUCKET,
TABLE_PARTITIONS,
WAREHOUSE_PREFIX,
partition_path,
partition_values,
)
from services.lake_publisher.worker import (
COMPANY_EVENTS_SCHEMA,
DOCUMENT_EXTRACTIONS_SCHEMA,
DOCUMENTS_SCHEMA,
MARKET_BARS_SCHEMA,
MARKET_QUOTES_SCHEMA,
MODEL_PERFORMANCE_SCHEMA,
PNL_DAILY_SCHEMA,
POSITIONS_DAILY_SCHEMA,
PREDICTION_VS_OUTCOME_SCHEMA,
TRADE_FILLS_SCHEMA,
TRADE_ORDERS_SCHEMA,
TRADE_SIGNALS_SCHEMA,
publish_company_event,
publish_document_extraction,
publish_document_fact,
publish_market_bar,
publish_market_quote,
publish_model_performance,
publish_pnl_daily,
publish_position_daily,
publish_prediction_fact,
publish_trade_fill,
publish_trade_order,
)
from services.shared.schemas import (
ActionType,
ModelMetadata,
PositionSizing,
Recommendation,
RecommendationMode,
)
# Fixed, timezone-aware reference instant shared by the partition/publish tests so
# the derived ``dt`` partition is deterministic (2026-04-11).
NOW = datetime(2026, 4, 11, 14, 30, 0, tzinfo=timezone.utc)

# Lakehouse DDL directory. Resolve it relative to this file rather than the current
# working directory, so the DDL checks do not silently find nothing when pytest is
# launched from elsewhere. Assumes the layout <root>/tests/ + <root>/lakehouse/schemas/
# (TODO confirm). Falls back to the historical CWD-relative path when that
# location does not exist.
_FILE_RELATIVE_DDL_DIR = Path(__file__).resolve().parent.parent / "lakehouse" / "schemas"
LAKEHOUSE_DDL_DIR = (
    _FILE_RELATIVE_DDL_DIR
    if _FILE_RELATIVE_DDL_DIR.is_dir()
    else Path("lakehouse/schemas")
)

# All 12 expected analytical fact tables
ALL_TABLES = [
    "market_bars",
    "market_quotes",
    "company_events",
    "documents",
    "document_extractions",
    "trade_signals",
    "trade_orders",
    "trade_fills",
    "positions_daily",
    "pnl_daily",
    "prediction_vs_outcome",
    "model_performance",
]

# Map table names to their PyArrow schemas (from worker.py) for direct reference
PYARROW_SCHEMAS: dict[str, pa.Schema] = {
    "market_bars": MARKET_BARS_SCHEMA,
    "market_quotes": MARKET_QUOTES_SCHEMA,
    "company_events": COMPANY_EVENTS_SCHEMA,
    "documents": DOCUMENTS_SCHEMA,
    "document_extractions": DOCUMENT_EXTRACTIONS_SCHEMA,
    "trade_signals": TRADE_SIGNALS_SCHEMA,
    "trade_orders": TRADE_ORDERS_SCHEMA,
    "trade_fills": TRADE_FILLS_SCHEMA,
    "positions_daily": POSITIONS_DAILY_SCHEMA,
    "pnl_daily": PNL_DAILY_SCHEMA,
    "prediction_vs_outcome": PREDICTION_VS_OUTCOME_SCHEMA,
    "model_performance": MODEL_PERFORMANCE_SCHEMA,
}
# ---------------------------------------------------------------------------
# Helpers: parse lakehouse DDL SQL files
# ---------------------------------------------------------------------------
def _parse_ddl_columns(sql_path: Path) -> list[tuple[str, str]]:
"""Parse column definitions from a lakehouse DDL SQL file.
Returns list of (column_name, trino_type) tuples in declaration order.
Includes partition columns from the partitioned_by clause appended at the end,
since Hive DDL separates them but PyArrow/Iceberg schemas include them inline.
"""
text = sql_path.read_text()
# Extract the column block — match balanced parens for the CREATE TABLE body.
# The column block ends at the closing ) before WITH.
match = re.search(
r"CREATE TABLE[^(]+\((.*)\)\s*WITH",
text, re.DOTALL | re.IGNORECASE,
)
if not match:
return []
col_block = match.group(1)
columns = []
for line in col_block.strip().split("\n"):
line = line.strip().rstrip(",")
if not line or line.startswith("--"):
continue
# Split only on first whitespace to keep multi-word types intact
parts = line.split(None, 1)
if len(parts) >= 2:
col_name = parts[0].lower()
col_type = parts[1].upper().strip()
columns.append((col_name, col_type))
return columns
def _parse_ddl_partitions(sql_path: Path) -> list[str]:
"""Parse partition keys from a lakehouse DDL SQL file."""
text = sql_path.read_text()
match = re.search(r"partitioned_by\s*=\s*ARRAY\[([^\]]+)\]", text, re.IGNORECASE)
if not match:
return []
raw = match.group(1)
return [k.strip().strip("'\"") for k in raw.split(",")]
# ---------------------------------------------------------------------------
# 1. All 12 tables are registered across all layers
# ---------------------------------------------------------------------------
def test_all_tables_in_partition_registry():
    """Each name listed in ALL_TABLES must be a key of the TABLE_PARTITIONS registry."""
    for name in ALL_TABLES:
        registered = name in TABLE_PARTITIONS
        assert registered, f"{name} missing from TABLE_PARTITIONS"
def test_all_tables_in_schema_registry():
    """Each name listed in ALL_TABLES must be a key of the TABLE_SCHEMAS registry."""
    for name in ALL_TABLES:
        registered = name in TABLE_SCHEMAS
        assert registered, f"{name} missing from TABLE_SCHEMAS"
def test_all_tables_have_ddl_files():
    """Each name in ALL_TABLES must have a ``<table>.sql`` file under LAKEHOUSE_DDL_DIR."""
    for name in ALL_TABLES:
        sql_file = LAKEHOUSE_DDL_DIR.joinpath(f"{name}.sql")
        assert sql_file.exists(), f"Missing DDL file: {sql_file}"
def test_all_tables_have_iceberg_defs():
    """Every table in ALL_TABLES appears among the defs returned by get_all_table_defs().

    Only inclusion is checked; get_all_table_defs() may also return other tables.
    """
    def_names = {table_def.table_name for table_def in get_all_table_defs()}
    for table in ALL_TABLES:
        assert table in def_names, f"{table} missing from Iceberg table defs"
# ---------------------------------------------------------------------------
# 2. PyArrow schema ↔ Lakehouse DDL column alignment
# ---------------------------------------------------------------------------
def test_pyarrow_columns_match_ddl():
    """PyArrow schema column names and order match the lakehouse DDL for every table.

    A missing DDL file fails the test instead of being skipped, so the comparison
    cannot pass vacuously (e.g. when the DDL directory is not where it is expected).
    """
    for table in ALL_TABLES:
        ddl_path = LAKEHOUSE_DDL_DIR / f"{table}.sql"
        assert ddl_path.exists(), f"Missing DDL file: {ddl_path}"
        ddl_col_names = [name for name, _ in _parse_ddl_columns(ddl_path)]
        arrow_col_names = list(PYARROW_SCHEMAS[table].names)
        assert arrow_col_names == ddl_col_names, (
            f"Column mismatch for {table}:\n"
            f" PyArrow: {arrow_col_names}\n"
            f" DDL: {ddl_col_names}"
        )
def test_pyarrow_types_compatible_with_ddl():
    """PyArrow types map (via _arrow_type_to_trino) to the Trino types in the lakehouse DDL.

    A missing DDL file, or a PyArrow column absent from the DDL, is reported
    explicitly rather than skipped or shown as a mismatch against an empty type.
    """
    for table in ALL_TABLES:
        ddl_path = LAKEHOUSE_DDL_DIR / f"{table}.sql"
        assert ddl_path.exists(), f"Missing DDL file: {ddl_path}"
        ddl_type_map = dict(_parse_ddl_columns(ddl_path))
        for field in PYARROW_SCHEMAS[table]:
            col_name = field.name
            assert col_name in ddl_type_map, (
                f"Column {table}.{col_name} is in the PyArrow schema but not in the DDL"
            )
            trino_type = _arrow_type_to_trino(field.type)
            ddl_type = ddl_type_map[col_name]
            assert trino_type == ddl_type, (
                f"Type mismatch for {table}.{col_name}: "
                f"PyArrow→Trino={trino_type}, DDL={ddl_type}"
            )
# ---------------------------------------------------------------------------
# 3. Partition key alignment across layers
# ---------------------------------------------------------------------------
def test_partition_keys_match_ddl():
    """Partition keys in TABLE_PARTITIONS match the DDL ``partitioned_by`` clause, in order.

    A missing DDL file fails the test instead of being skipped, so the comparison
    cannot pass vacuously.
    """
    for table in ALL_TABLES:
        ddl_path = LAKEHOUSE_DDL_DIR / f"{table}.sql"
        assert ddl_path.exists(), f"Missing DDL file: {ddl_path}"
        ddl_parts = _parse_ddl_partitions(ddl_path)
        arrow_parts = list(TABLE_PARTITIONS[table].all_keys)
        assert arrow_parts == ddl_parts, (
            f"Partition key mismatch for {table}: "
            f"TABLE_PARTITIONS={arrow_parts}, DDL={ddl_parts}"
        )
def test_iceberg_partition_keys_match():
    """Iceberg DDL partition keys match TABLE_PARTITIONS for every table.

    Tables with partition keys must emit ``partitioning = ARRAY[...]`` listing
    exactly those keys in order. Tables without partition keys must not emit a
    partitioning clause at all.
    """
    pattern = re.compile(r"partitioning\s*=\s*ARRAY\[([^\]]+)\]", re.IGNORECASE)
    for td in get_all_table_defs():
        expected_keys = list(TABLE_PARTITIONS[td.table_name].all_keys)
        match = pattern.search(td.create_table_sql())
        if not expected_keys:
            assert match is None, (
                f"Unexpected partitioning clause for unpartitioned {td.table_name}"
            )
            continue
        assert match is not None, f"No partitioning clause for {td.table_name}"
        # Strip both quote styles, matching how the lakehouse DDL keys are parsed.
        parsed = [k.strip().strip("'\"") for k in match.group(1).split(",")]
        assert parsed == expected_keys, (
            f"Iceberg partition mismatch for {td.table_name}: "
            f"expected={expected_keys}, got={parsed}"
        )
# ---------------------------------------------------------------------------
# 4. Partition columns are embedded in PyArrow schemas
# ---------------------------------------------------------------------------
def test_partition_columns_in_pyarrow_schemas():
    """Every partition key of a table (dt, model_version, ...) is a real column of its
    PyArrow schema, so the value is written into the Parquet data and not only
    implied by the object path."""
    for name in ALL_TABLES:
        present = set(PYARROW_SCHEMAS[name].names)
        for partition_key in TABLE_PARTITIONS[name].all_keys:
            assert partition_key in present, (
                f"Partition column '{partition_key}' missing from PyArrow schema for {name}"
            )
# ---------------------------------------------------------------------------
# 5. Hive-compatible partition path format
# ---------------------------------------------------------------------------
def test_partition_paths_are_hive_compatible():
    """Partition paths follow the Hive ``key=value`` directory convention.

    Each partition must be a complete directory segment (not merely a substring of
    the path). The partition segments must also appear in the same order as the
    table's partition keys (TABLE_PARTITIONS ``all_keys``), which a separate test
    pins to the DDL ``partitioned_by`` order.
    """
    for table in ALL_TABLES:
        spec = TABLE_PARTITIONS[table]
        extras = {k: "test_val" for k in spec.extra_keys}
        path = partition_path(table, NOW, extras)
        # Must start with warehouse prefix
        assert path.startswith(f"{WAREHOUSE_PREFIX}/{table}/"), (
            f"Path for {table} doesn't start with warehouse prefix: {path}"
        )
        # Must end with .parquet
        assert path.endswith(".parquet"), f"Path for {table} doesn't end with .parquet: {path}"
        # Directory segments only: drop the trailing file name.
        segments = path.split("/")[:-1]
        # Must contain the dt= partition as a whole segment
        assert "dt=2026-04-11" in segments, f"Missing dt partition in path for {table}: {path}"
        # Extra partition keys must appear as whole segments
        for key in spec.extra_keys:
            assert f"{key}=test_val" in segments, (
                f"Missing extra partition {key} in path for {table}: {path}"
            )
        # Partition directories must be nested in partition-key order
        keys_in_path = [seg.split("=", 1)[0] for seg in segments if "=" in seg]
        assert keys_in_path == list(spec.all_keys), (
            f"Partition segment order for {table} is {keys_in_path}, "
            f"expected {list(spec.all_keys)}: {path}"
        )
def test_partition_path_dt_from_date_object():
    """partition_path derives the same ``dt`` partition from a date and from a datetime."""
    from_date = partition_path("market_bars", date(2026, 4, 11))
    from_datetime = partition_path("market_bars", NOW)
    assert "dt=2026-04-11" in from_date
    assert "dt=2026-04-11" in from_datetime
# ---------------------------------------------------------------------------
# 6. Published Parquet files contain partition columns in data
# ---------------------------------------------------------------------------
def _capture_parquet(mock_client: MagicMock) -> pa.Table:
    """Read back the Parquet table a publisher uploaded through a mocked MinIO client.

    Uses the most recent ``put_object`` call. The payload buffer comes from the
    ``data`` keyword if the publisher passed it by name. Otherwise it is the third
    positional argument, as in ``put_object(bucket_name, object_name, data, ...)``.

    Raises:
        AssertionError: If ``put_object`` was never called on the client.
    """
    put_call = mock_client.put_object.call_args
    assert put_call is not None, "publisher never called put_object on the MinIO client"
    args, kwargs = put_call
    buf = kwargs["data"] if "data" in kwargs else args[2]
    # Rewind: the stream may still be positioned where the publisher left it.
    buf.seek(0)
    return pq.read_table(buf)
def test_published_market_bar_has_dt_column():
    """A published market bar carries a ``dt`` column equal to the date of ``bar_timestamp``."""
    mock_minio = MagicMock()
    publish_market_bar(
        mock_minio,
        ticker="AAPL",
        open_price=150.0,
        high_price=155.0,
        low_price=149.0,
        close_price=153.0,
        volume=1000000,
        bar_timestamp=NOW,
        source="test",
    )
    written = _capture_parquet(mock_minio)
    assert "dt" in written.column_names
    first_dt = written.column("dt")[0].as_py()
    assert first_dt == date(2026, 4, 11)
def test_published_document_extraction_has_partition_columns():
    """A published extraction carries ``dt`` (from ``extraction_at``) and
    ``model_version`` (from ``schema_version``) columns."""
    mock_minio = MagicMock()
    publish_document_extraction(
        mock_minio,
        document_id="doc-1",
        ticker="AAPL",
        sentiment="positive",
        impact_score=0.7,
        catalyst_type="earnings",
        confidence=0.85,
        extraction_at=NOW,
        model_name="test-model",
        prompt_version="v1",
        schema_version="2.0.0",
    )
    written = _capture_parquet(mock_minio)
    for column in ("dt", "model_version"):
        assert column in written.column_names
    assert written.column("dt")[0].as_py() == date(2026, 4, 11)
    assert written.column("model_version")[0].as_py() == "2.0.0"
def test_published_prediction_vs_outcome_has_partition_columns():
    """A published prediction fact carries both ``dt`` and ``model_version`` columns."""
    mock_minio = MagicMock()
    recommendation = Recommendation(
        recommendation_id="rec-001",
        ticker="AAPL",
        action=ActionType.BUY,
        mode=RecommendationMode.PAPER_ELIGIBLE,
        confidence=0.72,
        time_horizon="swing_1d_10d",
        thesis="test",
        invalidation_conditions=["x"],
        position_sizing=PositionSizing(portfolio_pct=0.02, max_loss_pct=0.005),
        evidence_refs=["doc1"],
        model_metadata=ModelMetadata(provider="ollama", model_name="test-v1"),
        generated_at=NOW,
    )
    publish_prediction_fact(mock_minio, recommendation)
    columns = _capture_parquet(mock_minio).column_names
    for column in ("dt", "model_version"):
        assert column in columns
def test_published_model_performance_has_partition_columns():
    """A published model-performance row carries ``dt`` and a ``model_version`` equal
    to the supplied ``schema_version``."""
    mock_minio = MagicMock()
    publish_model_performance(
        mock_minio,
        document_id="doc-1",
        model_name="gpt-oss:20b",
        success=True,
        total_duration_ms=1500,
        recorded_at=NOW,
        schema_version="2.0.0",
    )
    written = _capture_parquet(mock_minio)
    for column in ("dt", "model_version"):
        assert column in written.column_names
    assert written.column("model_version")[0].as_py() == "2.0.0"
# ---------------------------------------------------------------------------
# 7. Parquet schema matches PyArrow schema for every publisher
# ---------------------------------------------------------------------------
def _publish_and_verify_schema(table_name: str, publish_fn, expected_schema: pa.Schema):
    """Publish one row through ``publish_fn`` and check the Parquet it uploaded.

    Args:
        table_name: Table name, used only in failure messages.
        publish_fn: Callable that takes the mocked MinIO client and publishes a row.
        expected_schema: The PyArrow schema the written file is expected to carry.

    Asserts that the column names (in order) read back from the Parquet payload
    match ``expected_schema``, and that each column's Arrow type matches too.
    Nullability is not compared.
    """
    client = MagicMock()
    publish_fn(client)
    table = _capture_parquet(client)
    expected_names = list(expected_schema.names)
    assert list(table.column_names) == expected_names, (
        f"Parquet column mismatch for {table_name}: "
        f"got={list(table.column_names)}, expected={expected_names}"
    )
    for field in expected_schema:
        actual_type = table.schema.field(field.name).type
        assert actual_type == field.type, (
            f"Parquet type mismatch for {table_name}.{field.name}: "
            f"got={actual_type}, expected={field.type}"
        )
def test_parquet_schema_market_bars():
    """Round-trip a market bar and compare its Parquet columns to MARKET_BARS_SCHEMA."""
    def publish(client):
        return publish_market_bar(
            client, "AAPL", 150.0, 155.0, 149.0, 153.0, 1000000, NOW, "test",
        )

    _publish_and_verify_schema("market_bars", publish, MARKET_BARS_SCHEMA)
def test_parquet_schema_market_quotes():
    """Round-trip a market quote and compare its Parquet columns to MARKET_QUOTES_SCHEMA."""
    def publish(client):
        return publish_market_quote(client, "AAPL", 150.0, 150.5, 150.25, NOW, "test")

    _publish_and_verify_schema("market_quotes", publish, MARKET_QUOTES_SCHEMA)
def test_parquet_schema_company_events():
    """Round-trip a company event and compare its Parquet columns to COMPANY_EVENTS_SCHEMA."""
    def publish(client):
        return publish_company_event(
            client, "evt-1", "AAPL", "earnings", "Q1 Earnings", NOW, "test",
        )

    _publish_and_verify_schema("company_events", publish, COMPANY_EVENTS_SCHEMA)
def test_parquet_schema_documents():
    """Round-trip a document fact and compare its Parquet columns to DOCUMENTS_SCHEMA."""
    def publish(client):
        return publish_document_fact(
            client, "doc-1", "article", "news_api", "AAPL", "Reuters", "Test", NOW, "hash123",
        )

    _publish_and_verify_schema("documents", publish, DOCUMENTS_SCHEMA)
def test_parquet_schema_trade_orders():
    """Round-trip a trade order and compare its Parquet columns to TRADE_ORDERS_SCHEMA."""
    def publish(client):
        return publish_trade_order(
            client, "ord-1", "AAPL", "buy", "market", 10.0, None, "filled", "acct-1", NOW,
        )

    _publish_and_verify_schema("trade_orders", publish, TRADE_ORDERS_SCHEMA)
def test_parquet_schema_trade_fills():
    """Round-trip a trade fill and compare its Parquet columns to TRADE_FILLS_SCHEMA."""
    def publish(client):
        return publish_trade_fill(
            client, "fill-1", "ord-1", "AAPL", "buy", 150.25, 10.0, "acct-1", NOW,
        )

    _publish_and_verify_schema("trade_fills", publish, TRADE_FILLS_SCHEMA)
def test_parquet_schema_positions_daily():
    """Round-trip a daily position and compare its Parquet columns to POSITIONS_DAILY_SCHEMA."""
    def publish(client):
        return publish_position_daily(
            client, "AAPL", 100.0, 145.0, 150.0, 500.0, "acct-1", NOW,
        )

    _publish_and_verify_schema("positions_daily", publish, POSITIONS_DAILY_SCHEMA)
def test_parquet_schema_pnl_daily():
    """Round-trip a daily P&L row and compare its Parquet columns to PNL_DAILY_SCHEMA."""
    def publish(client):
        return publish_pnl_daily(client, "AAPL", 200.0, 500.0, 700.0, "acct-1", NOW)

    _publish_and_verify_schema("pnl_daily", publish, PNL_DAILY_SCHEMA)
# ---------------------------------------------------------------------------
# 8. Cross-table join keys for views
# ---------------------------------------------------------------------------
def test_prediction_accuracy_view_join_keys():
    """The prediction_accuracy view joins prediction_vs_outcome to trade_signals on
    (recommendation_id, dt), so both schemas must expose those columns."""
    outcome_columns = set(PREDICTION_VS_OUTCOME_SCHEMA.names)
    signal_columns = set(TRADE_SIGNALS_SCHEMA.names)
    for join_key in ("recommendation_id", "dt"):
        assert join_key in outcome_columns
        assert join_key in signal_columns
def test_paper_trade_scorecard_view_join_keys():
    """The paper_trade_scorecard view joins pnl_daily to trade_orders on
    (ticker, broker_account, dt); both schemas must contain all three keys."""
    pnl_columns = set(PNL_DAILY_SCHEMA.names)
    order_columns = set(TRADE_ORDERS_SCHEMA.names)
    for join_key in ("ticker", "broker_account", "dt"):
        assert join_key in pnl_columns, f"pnl_daily missing join key: {join_key}"
        assert join_key in order_columns, f"trade_orders missing join key: {join_key}"
def test_paper_trade_detail_view_join_keys():
    """paper_trade_detail joins trade_orders, trade_fills, and prediction_vs_outcome.

    orders ↔ fills join on (order_id, dt). orders ↔ prediction_vs_outcome join on
    (recommendation_id, dt). Every table in each join must expose both keys.
    """
    ord_cols = set(TRADE_ORDERS_SCHEMA.names)
    fill_cols = set(TRADE_FILLS_SCHEMA.names)
    pvo_cols = set(PREDICTION_VS_OUTCOME_SCHEMA.names)
    # orders ↔ fills on order_id, dt
    for key in ("order_id", "dt"):
        assert key in ord_cols, f"trade_orders missing join key: {key}"
        assert key in fill_cols, f"trade_fills missing join key: {key}"
    # orders ↔ prediction_vs_outcome on recommendation_id, dt
    for key in ("recommendation_id", "dt"):
        assert key in ord_cols, f"trade_orders missing join key: {key}"
        assert key in pvo_cols, f"prediction_vs_outcome missing join key: {key}"
def test_signal_hit_rate_view_columns():
    """The signal_hit_rate view groups prediction_vs_outcome by dt and model_version and
    reads its outcome, confidence, and move columns; all must exist in the schema."""
    available = set(PREDICTION_VS_OUTCOME_SCHEMA.names)
    required = ("dt", "model_version", "outcome", "predicted_confidence", "actual_move_pct")
    for column in required:
        assert column in available
# ---------------------------------------------------------------------------
# 9. Iceberg DDL consistency with lakehouse DDL
# ---------------------------------------------------------------------------
def test_iceberg_ddl_columns_match_lakehouse_ddl():
    """Iceberg CREATE TABLE columns match the lakehouse DDL columns for every table.

    Tables listed in ALL_TABLES must have a DDL file; a missing file fails the test
    instead of being silently skipped. Other tables returned by get_all_table_defs()
    without a DDL file are skipped.
    """
    for td in get_all_table_defs():
        ddl_path = LAKEHOUSE_DDL_DIR / f"{td.table_name}.sql"
        if not ddl_path.exists():
            assert td.table_name not in ALL_TABLES, f"Missing DDL file: {ddl_path}"
            continue
        ddl_col_names = [name for name, _ in _parse_ddl_columns(ddl_path)]
        iceberg_sql = td.create_table_sql()
        # Greedy ``(.*)`` so a ``TIMESTAMP(6) WITH TIME ZONE`` column does not end
        # the column block early; same pattern as the lakehouse DDL parser.
        match = re.search(
            r"CREATE TABLE[^(]+\((.*)\)\s*WITH",
            iceberg_sql,
            re.DOTALL | re.IGNORECASE,
        )
        assert match is not None, f"Could not parse Iceberg DDL for {td.table_name}"
        iceberg_col_names = []
        for raw_line in match.group(1).strip().split("\n"):
            # Ignore ``--`` comments so they are not mistaken for column names.
            line = raw_line.split("--", 1)[0].strip().rstrip(",")
            if line:
                iceberg_col_names.append(line.split()[0].lower())
        assert iceberg_col_names == ddl_col_names, (
            f"Iceberg DDL column mismatch for {td.table_name}:\n"
            f" Iceberg: {iceberg_col_names}\n"
            f" DDL: {ddl_col_names}"
        )
# ---------------------------------------------------------------------------
# 10. MinIO bucket and path conventions
# ---------------------------------------------------------------------------
def test_lakehouse_bucket_name():
    """The lakehouse bucket constant is pinned to ``stonks-lakehouse``."""
    expected_bucket = "stonks-lakehouse"
    assert LAKEHOUSE_BUCKET == expected_bucket
def test_warehouse_prefix():
    """The warehouse object prefix constant is pinned to ``warehouse``."""
    expected_prefix = "warehouse"
    assert WAREHOUSE_PREFIX == expected_prefix
def test_all_paths_use_warehouse_prefix():
    """Each table's generated partition path begins with the literal ``warehouse/{table}/``."""
    for name in ALL_TABLES:
        extra_values = dict.fromkeys(TABLE_PARTITIONS[name].extra_keys, "v")
        generated = partition_path(name, NOW, extra_values)
        assert generated.startswith(f"warehouse/{name}/"), (
            f"Path for {name} doesn't follow convention: {generated}"
        )
# ---------------------------------------------------------------------------
# 11. Iceberg table locations point to correct MinIO paths
# ---------------------------------------------------------------------------
def test_iceberg_locations_match_ddl_external_locations():
    """Every Iceberg table location is ``s3a://{LAKEHOUSE_BUCKET}/{WAREHOUSE_PREFIX}/{table}/``.

    The check is against the bucket/prefix constants. It does not read the
    ``external_location`` property from the lakehouse DDL files.
    """
    for td in get_all_table_defs():
        expected = f"s3a://{LAKEHOUSE_BUCKET}/{WAREHOUSE_PREFIX}/{td.table_name}/"
        assert td.location == expected, (
            f"Iceberg location mismatch for {td.table_name}: "
            f"got={td.location}, expected={expected}"
        )
# ---------------------------------------------------------------------------
# 12. Partition values are injected correctly
# ---------------------------------------------------------------------------
def test_partition_values_dt_only():
    """Without extras, partition_values yields only the ``dt`` date of the timestamp."""
    expected = {"dt": date(2026, 4, 11)}
    assert partition_values(NOW) == expected
def test_partition_values_with_model_version():
    """Extra partition values are merged alongside the derived ``dt`` date."""
    expected = {"dt": date(2026, 4, 11), "model_version": "2.0.0"}
    assert partition_values(NOW, {"model_version": "2.0.0"}) == expected
def test_partition_values_from_date():
    """A plain ``date`` input is used directly as the ``dt`` partition value."""
    day = date(2026, 4, 11)
    assert partition_values(day) == {"dt": day}