"""Validate lake publication and Trino query correctness over partitioned MinIO datasets. Ensures that: - PyArrow schemas in worker.py match the lakehouse DDL column definitions - Iceberg DDL generated from PyArrow schemas is consistent with lakehouse DDL - Partition layouts are Hive-compatible and discoverable by Trino - Published Parquet files embed partition columns in the data - Cross-table join keys used by views are present and type-consistent - All 12 analytical fact tables have aligned schema definitions across layers Requirements: 9.4, 9.5, 10.1, 10.3, N4, N6 Design ref: Section 5.2, 5.3, 7, 8.4 """ from __future__ import annotations import re from datetime import date, datetime, timezone from pathlib import Path from unittest.mock import MagicMock import pyarrow as pa import pyarrow.parquet as pq from services.lake_publisher.iceberg import ( TABLE_SCHEMAS, _arrow_type_to_trino, get_all_table_defs, ) from services.lake_publisher.partitions import ( LAKEHOUSE_BUCKET, TABLE_PARTITIONS, WAREHOUSE_PREFIX, partition_path, partition_values, ) from services.lake_publisher.worker import ( COMPANY_EVENTS_SCHEMA, DOCUMENT_EXTRACTIONS_SCHEMA, DOCUMENTS_SCHEMA, MARKET_BARS_SCHEMA, MARKET_QUOTES_SCHEMA, MODEL_PERFORMANCE_SCHEMA, PNL_DAILY_SCHEMA, POSITIONS_DAILY_SCHEMA, PREDICTION_VS_OUTCOME_SCHEMA, TRADE_FILLS_SCHEMA, TRADE_ORDERS_SCHEMA, TRADE_SIGNALS_SCHEMA, publish_company_event, publish_document_extraction, publish_document_fact, publish_market_bar, publish_market_quote, publish_model_performance, publish_pnl_daily, publish_position_daily, publish_prediction_fact, publish_trade_fill, publish_trade_order, ) from services.shared.schemas import ( ActionType, ModelMetadata, PositionSizing, Recommendation, RecommendationMode, ) NOW = datetime(2026, 4, 11, 14, 30, 0, tzinfo=timezone.utc) LAKEHOUSE_DDL_DIR = Path("lakehouse/schemas") # All 12 expected analytical fact tables ALL_TABLES = [ "market_bars", "market_quotes", "company_events", "documents", 
"document_extractions", "trade_signals", "trade_orders", "trade_fills", "positions_daily", "pnl_daily", "prediction_vs_outcome", "model_performance", ] # Map table names to their PyArrow schemas for direct reference PYARROW_SCHEMAS: dict[str, pa.Schema] = { "market_bars": MARKET_BARS_SCHEMA, "market_quotes": MARKET_QUOTES_SCHEMA, "company_events": COMPANY_EVENTS_SCHEMA, "documents": DOCUMENTS_SCHEMA, "document_extractions": DOCUMENT_EXTRACTIONS_SCHEMA, "trade_signals": TRADE_SIGNALS_SCHEMA, "trade_orders": TRADE_ORDERS_SCHEMA, "trade_fills": TRADE_FILLS_SCHEMA, "positions_daily": POSITIONS_DAILY_SCHEMA, "pnl_daily": PNL_DAILY_SCHEMA, "prediction_vs_outcome": PREDICTION_VS_OUTCOME_SCHEMA, "model_performance": MODEL_PERFORMANCE_SCHEMA, } # --------------------------------------------------------------------------- # Helpers: parse lakehouse DDL SQL files # --------------------------------------------------------------------------- def _parse_ddl_columns(sql_path: Path) -> list[tuple[str, str]]: """Parse column definitions from a lakehouse DDL SQL file. Returns list of (column_name, trino_type) tuples in declaration order. Includes partition columns from the partitioned_by clause appended at the end, since Hive DDL separates them but PyArrow/Iceberg schemas include them inline. """ text = sql_path.read_text() # Extract the column block — match balanced parens for the CREATE TABLE body. # The column block ends at the closing ) before WITH. 
match = re.search( r"CREATE TABLE[^(]+\((.*)\)\s*WITH", text, re.DOTALL | re.IGNORECASE, ) if not match: return [] col_block = match.group(1) columns = [] for line in col_block.strip().split("\n"): line = line.strip().rstrip(",") if not line or line.startswith("--"): continue # Split only on first whitespace to keep multi-word types intact parts = line.split(None, 1) if len(parts) >= 2: col_name = parts[0].lower() col_type = parts[1].upper().strip() columns.append((col_name, col_type)) return columns def _parse_ddl_partitions(sql_path: Path) -> list[str]: """Parse partition keys from a lakehouse DDL SQL file.""" text = sql_path.read_text() match = re.search(r"partitioned_by\s*=\s*ARRAY\[([^\]]+)\]", text, re.IGNORECASE) if not match: return [] raw = match.group(1) return [k.strip().strip("'\"") for k in raw.split(",")] # --------------------------------------------------------------------------- # 1. All 12 tables are registered across all layers # --------------------------------------------------------------------------- def test_all_tables_in_partition_registry(): """Every expected analytical table is registered in TABLE_PARTITIONS.""" for table in ALL_TABLES: assert table in TABLE_PARTITIONS, f"{table} missing from TABLE_PARTITIONS" def test_all_tables_in_schema_registry(): """Every expected analytical table has a PyArrow schema in TABLE_SCHEMAS.""" for table in ALL_TABLES: assert table in TABLE_SCHEMAS, f"{table} missing from TABLE_SCHEMAS" def test_all_tables_have_ddl_files(): """Every expected analytical table has a lakehouse DDL SQL file.""" for table in ALL_TABLES: ddl_path = LAKEHOUSE_DDL_DIR / f"{table}.sql" assert ddl_path.exists(), f"Missing DDL file: {ddl_path}" def test_all_tables_have_iceberg_defs(): """Every table in TABLE_PARTITIONS produces a valid IcebergTableDef.""" defs = get_all_table_defs() def_names = {d.table_name for d in defs} for table in ALL_TABLES: assert table in def_names, f"{table} missing from Iceberg table defs" # 
# ---------------------------------------------------------------------------
# 2. PyArrow schema ↔ Lakehouse DDL column alignment
# ---------------------------------------------------------------------------


def test_pyarrow_columns_match_ddl():
    """Column names, in order, agree between each PyArrow schema and its DDL file.

    Tables whose DDL file does not exist are skipped here.
    """
    for table in ALL_TABLES:
        ddl_path = LAKEHOUSE_DDL_DIR / f"{table}.sql"
        if ddl_path.exists():
            ddl_col_names = [name for name, _ in _parse_ddl_columns(ddl_path)]
            arrow_col_names = list(PYARROW_SCHEMAS[table].names)
            assert arrow_col_names == ddl_col_names, (
                f"Column mismatch for {table}:\n"
                f" PyArrow: {arrow_col_names}\n"
                f" DDL: {ddl_col_names}"
            )


def test_pyarrow_types_compatible_with_ddl():
    """Each PyArrow field type, mapped to Trino, equals the type declared in the DDL.

    Tables whose DDL file does not exist are skipped here; a column absent
    from the DDL is compared against the empty string.
    """
    for table in ALL_TABLES:
        ddl_path = LAKEHOUSE_DDL_DIR / f"{table}.sql"
        if ddl_path.exists():
            ddl_type_map = dict(_parse_ddl_columns(ddl_path))
            for field in PYARROW_SCHEMAS[table]:
                col_name = field.name
                trino_type = _arrow_type_to_trino(field.type)
                ddl_type = ddl_type_map.get(col_name, "")
                assert trino_type == ddl_type, (
                    f"Type mismatch for {table}.{col_name}: "
                    f"PyArrow→Trino={trino_type}, DDL={ddl_type}"
                )
# ---------------------------------------------------------------------------
# 3. Partition key alignment across layers
# ---------------------------------------------------------------------------


def test_partition_keys_match_ddl():
    """TABLE_PARTITIONS keys equal the DDL partitioned_by entries, in order.

    Tables whose DDL file does not exist are skipped here.
    """
    for table in ALL_TABLES:
        ddl_path = LAKEHOUSE_DDL_DIR / f"{table}.sql"
        if ddl_path.exists():
            ddl_parts = _parse_ddl_partitions(ddl_path)
            arrow_parts = list(TABLE_PARTITIONS[table].all_keys)
            assert arrow_parts == ddl_parts, (
                f"Partition key mismatch for {table}: "
                f"TABLE_PARTITIONS={arrow_parts}, DDL={ddl_parts}"
            )


def test_iceberg_partition_keys_match():
    """The partitioning clause of each generated Iceberg DDL equals TABLE_PARTITIONS.

    Tables with no partition keys are not checked.
    """
    for td in get_all_table_defs():
        expected_keys = list(TABLE_PARTITIONS[td.table_name].all_keys)

        # Parse from the generated SQL
        found = re.search(r"partitioning = ARRAY\[([^\]]+)\]", td.create_table_sql())
        if not expected_keys:
            continue

        assert found is not None, f"No partitioning clause for {td.table_name}"
        parsed = [item.strip().strip("'") for item in found.group(1).split(",")]
        assert parsed == expected_keys, (
            f"Iceberg partition mismatch for {td.table_name}: "
            f"expected={expected_keys}, got={parsed}"
        )


# ---------------------------------------------------------------------------
# 4. Partition columns are embedded in PyArrow schemas
# ---------------------------------------------------------------------------


def test_partition_columns_in_pyarrow_schemas():
    """Every partition key of a table is also a field of its PyArrow schema.

    This keeps partition columns (dt, model_version, etc.) written into the
    Parquet files rather than only inferred from paths.
    """
    for table in ALL_TABLES:
        col_names = set(PYARROW_SCHEMAS[table].names)
        for key in TABLE_PARTITIONS[table].all_keys:
            assert key in col_names, (
                f"Partition column '{key}' missing from PyArrow schema for {table}"
            )
# ---------------------------------------------------------------------------
# 5. Hive-compatible partition path format
# ---------------------------------------------------------------------------


def test_partition_paths_are_hive_compatible():
    """Generated object paths use the Hive key=value directory layout."""
    for table in ALL_TABLES:
        spec = TABLE_PARTITIONS[table]
        # Give every extra partition key the same placeholder value.
        extras = dict.fromkeys(spec.extra_keys, "test_val") if spec.extra_keys else {}
        path = partition_path(table, NOW, extras)

        # Must start with warehouse prefix
        expected_root = f"{WAREHOUSE_PREFIX}/{table}/"
        assert path.startswith(expected_root), (
            f"Path for {table} doesn't start with warehouse prefix: {path}"
        )

        # Must contain dt= partition
        assert "dt=2026-04-11" in path, f"Missing dt partition in path for {table}: {path}"

        # Must end with .parquet
        assert path.endswith(".parquet"), f"Path for {table} doesn't end with .parquet: {path}"

        # Extra partition keys must appear
        for key in spec.extra_keys:
            segment = f"{key}=test_val"
            assert segment in path, (
                f"Missing extra partition {key} in path for {table}: {path}"
            )


def test_partition_path_dt_from_date_object():
    """partition_path accepts a plain date as well as a datetime."""
    path = partition_path("market_bars", date(2026, 4, 11))
    assert "dt=2026-04-11" in path
# ---------------------------------------------------------------------------
# 6. Published Parquet files contain partition columns in data
# ---------------------------------------------------------------------------


def _capture_parquet(mock_client: MagicMock) -> pa.Table:
    """Extract the Parquet table from a MagicMock MinIO client's put_object call.

    Args:
        mock_client: Mock that was handed to a publish function.

    Returns:
        The table read back from the buffer given to the most recent
        ``put_object`` call.

    Raises:
        AssertionError: If ``put_object`` was never called on the mock.
    """
    put_call = mock_client.put_object.call_args
    assert put_call is not None, "put_object was never called on the MinIO client"

    # call_args is an (args, kwargs) pair. MinIO's put_object takes the payload
    # as its third positional parameter, named ``data``; accept either form.
    positional, keyword = put_call[0], put_call[1]
    buf = positional[2] if len(positional) > 2 else keyword["data"]
    buf.seek(0)
    return pq.read_table(buf)


def test_published_market_bar_has_dt_column():
    """A published market bar carries its dt partition value as a column."""
    client = MagicMock()
    publish_market_bar(
        client,
        ticker="AAPL",
        open_price=150.0,
        high_price=155.0,
        low_price=149.0,
        close_price=153.0,
        volume=1000000,
        bar_timestamp=NOW,
        source="test",
    )
    table = _capture_parquet(client)
    assert "dt" in table.column_names
    assert table.column("dt")[0].as_py() == date(2026, 4, 11)


def test_published_document_extraction_has_partition_columns():
    """A published document extraction carries dt and model_version columns."""
    client = MagicMock()
    publish_document_extraction(
        client,
        document_id="doc-1",
        ticker="AAPL",
        sentiment="positive",
        impact_score=0.7,
        catalyst_type="earnings",
        confidence=0.85,
        extraction_at=NOW,
        model_name="test-model",
        prompt_version="v1",
        schema_version="2.0.0",
    )
    table = _capture_parquet(client)
    assert "dt" in table.column_names
    assert "model_version" in table.column_names
    assert table.column("dt")[0].as_py() == date(2026, 4, 11)
    assert table.column("model_version")[0].as_py() == "2.0.0"


def test_published_prediction_vs_outcome_has_partition_columns():
    """A published prediction fact carries dt and model_version columns."""
    client = MagicMock()
    rec = Recommendation(
        recommendation_id="rec-001",
        ticker="AAPL",
        action=ActionType.BUY,
        mode=RecommendationMode.PAPER_ELIGIBLE,
        confidence=0.72,
        time_horizon="swing_1d_10d",
        thesis="test",
        invalidation_conditions=["x"],
        position_sizing=PositionSizing(portfolio_pct=0.02, max_loss_pct=0.005),
        evidence_refs=["doc1"],
        model_metadata=ModelMetadata(provider="ollama", model_name="test-v1"),
        generated_at=NOW,
    )
    publish_prediction_fact(client, rec)
    table = _capture_parquet(client)
    assert "dt" in table.column_names
    assert "model_version" in table.column_names


def test_published_model_performance_has_partition_columns():
    """A published model-performance row carries dt and model_version columns."""
    client = MagicMock()
    publish_model_performance(
        client,
        document_id="doc-1",
        model_name="gpt-oss:20b",
        success=True,
        total_duration_ms=1500,
        recorded_at=NOW,
        schema_version="2.0.0",
    )
    table = _capture_parquet(client)
    assert "dt" in table.column_names
    assert "model_version" in table.column_names
    assert table.column("model_version")[0].as_py() == "2.0.0"


# ---------------------------------------------------------------------------
# 7. Parquet schema matches PyArrow schema for every publisher
# ---------------------------------------------------------------------------
# NOTE(review): the tests in this section cover 8 tables; document_extractions,
# trade_signals, prediction_vs_outcome and model_performance have no column
# round-trip test here — confirm whether that gap is intentional.


def _publish_and_verify_schema(table_name: str, publish_fn, expected_schema: pa.Schema):
    """Call a publish function, read back the Parquet, verify column names match.

    Args:
        table_name: Table label used only in the failure message.
        publish_fn: Callable taking the mock client and performing one publish.
        expected_schema: Schema whose field names, in order, the written
            Parquet columns must equal.
    """
    client = MagicMock()
    publish_fn(client)
    table = _capture_parquet(client)
    expected_names = list(expected_schema.names)
    assert list(table.column_names) == expected_names, (
        f"Parquet column mismatch for {table_name}: "
        f"got={list(table.column_names)}, expected={expected_names}"
    )


def test_parquet_schema_market_bars():
    """market_bars Parquet columns equal MARKET_BARS_SCHEMA field names."""
    _publish_and_verify_schema(
        "market_bars",
        lambda c: publish_market_bar(
            c, "AAPL", 150.0, 155.0, 149.0, 153.0, 1000000, NOW, "test",
        ),
        MARKET_BARS_SCHEMA,
    )


def test_parquet_schema_market_quotes():
    """market_quotes Parquet columns equal MARKET_QUOTES_SCHEMA field names."""
    _publish_and_verify_schema(
        "market_quotes",
        lambda c: publish_market_quote(
            c, "AAPL", 150.0, 150.5, 150.25, NOW, "test",
        ),
        MARKET_QUOTES_SCHEMA,
    )


def test_parquet_schema_company_events():
    """company_events Parquet columns equal COMPANY_EVENTS_SCHEMA field names."""
    _publish_and_verify_schema(
        "company_events",
        lambda c: publish_company_event(
            c, "evt-1", "AAPL", "earnings", "Q1 Earnings", NOW, "test",
        ),
        COMPANY_EVENTS_SCHEMA,
    )


def test_parquet_schema_documents():
    """documents Parquet columns equal DOCUMENTS_SCHEMA field names."""
    _publish_and_verify_schema(
        "documents",
        lambda c: publish_document_fact(
            c, "doc-1", "article", "news_api", "AAPL", "Reuters", "Test", NOW, "hash123",
        ),
        DOCUMENTS_SCHEMA,
    )


def test_parquet_schema_trade_orders():
    """trade_orders Parquet columns equal TRADE_ORDERS_SCHEMA field names."""
    _publish_and_verify_schema(
        "trade_orders",
        lambda c: publish_trade_order(
            c, "ord-1", "AAPL", "buy", "market", 10.0, None, "filled", "acct-1", NOW,
        ),
        TRADE_ORDERS_SCHEMA,
    )


def test_parquet_schema_trade_fills():
    """trade_fills Parquet columns equal TRADE_FILLS_SCHEMA field names."""
    _publish_and_verify_schema(
        "trade_fills",
        lambda c: publish_trade_fill(
            c, "fill-1", "ord-1", "AAPL", "buy", 150.25, 10.0, "acct-1", NOW,
        ),
        TRADE_FILLS_SCHEMA,
    )


def test_parquet_schema_positions_daily():
    """positions_daily Parquet columns equal POSITIONS_DAILY_SCHEMA field names."""
    _publish_and_verify_schema(
        "positions_daily",
        lambda c: publish_position_daily(
            c, "AAPL", 100.0, 145.0, 150.0, 500.0, "acct-1", NOW,
        ),
        POSITIONS_DAILY_SCHEMA,
    )


def test_parquet_schema_pnl_daily():
    """pnl_daily Parquet columns equal PNL_DAILY_SCHEMA field names."""
    _publish_and_verify_schema(
        "pnl_daily",
        lambda c: publish_pnl_daily(
            c, "AAPL", 200.0, 500.0, 700.0, "acct-1", NOW,
        ),
        PNL_DAILY_SCHEMA,
    )


# ---------------------------------------------------------------------------
# 8. Cross-table join keys for views
# ---------------------------------------------------------------------------


def test_prediction_accuracy_view_join_keys():
    """prediction_accuracy view joins prediction_vs_outcome with trade_signals
    on recommendation_id and dt — both tables must have these columns."""
    pvo_cols = set(PREDICTION_VS_OUTCOME_SCHEMA.names)
    ts_cols = set(TRADE_SIGNALS_SCHEMA.names)
    assert "recommendation_id" in pvo_cols
    assert "recommendation_id" in ts_cols
    assert "dt" in pvo_cols
    assert "dt" in ts_cols


def test_paper_trade_scorecard_view_join_keys():
    """paper_trade_scorecard joins pnl_daily with trade_orders on ticker, broker_account, and dt."""
    pnl_cols = set(PNL_DAILY_SCHEMA.names)
    ord_cols = set(TRADE_ORDERS_SCHEMA.names)
    for key in ["ticker", "broker_account", "dt"]:
        assert key in pnl_cols, f"pnl_daily missing join key: {key}"
        assert key in ord_cols, f"trade_orders missing join key: {key}"


def test_paper_trade_detail_view_join_keys():
    """paper_trade_detail joins trade_orders, trade_fills, and prediction_vs_outcome."""
    ord_cols = set(TRADE_ORDERS_SCHEMA.names)
    fill_cols = set(TRADE_FILLS_SCHEMA.names)
    pvo_cols = set(PREDICTION_VS_OUTCOME_SCHEMA.names)

    # orders ↔ fills on order_id, dt
    assert "order_id" in ord_cols
    assert "order_id" in fill_cols
    assert "dt" in ord_cols
    assert "dt" in fill_cols

    # orders ↔ prediction_vs_outcome on recommendation_id, dt
    # ("dt" on the orders side is already asserted above)
    assert "recommendation_id" in ord_cols
    assert "recommendation_id" in pvo_cols
    assert "dt" in pvo_cols


def test_signal_hit_rate_view_columns():
    """signal_hit_rate groups by dt and model_version from prediction_vs_outcome."""
    pvo_cols = set(PREDICTION_VS_OUTCOME_SCHEMA.names)
    assert "dt" in pvo_cols
    assert "model_version" in pvo_cols
    assert "outcome" in pvo_cols
    assert "predicted_confidence" in pvo_cols
    assert "actual_move_pct" in pvo_cols
# ---------------------------------------------------------------------------
# 9. Iceberg DDL consistency with lakehouse DDL
# ---------------------------------------------------------------------------


def test_iceberg_ddl_columns_match_lakehouse_ddl():
    """Column names in each generated Iceberg DDL equal those in the lakehouse DDL file.

    Tables whose lakehouse DDL file does not exist are skipped here.
    """
    for td in get_all_table_defs():
        ddl_path = LAKEHOUSE_DDL_DIR / f"{td.table_name}.sql"
        if not ddl_path.exists():
            continue

        ddl_col_names = [name for name, _ in _parse_ddl_columns(ddl_path)]

        # Extract column block from Iceberg DDL (greedy to handle nested parens)
        found = re.search(
            r"CREATE TABLE[^(]+\((.*)\)\s*WITH",
            td.create_table_sql(),
            re.DOTALL,
        )
        assert found is not None, f"Could not parse Iceberg DDL for {td.table_name}"

        # First whitespace-separated token of every non-empty line is the name.
        trimmed = (ln.strip().rstrip(",") for ln in found.group(1).strip().split("\n"))
        tokenized = (ln.split() for ln in trimmed)
        iceberg_col_names = [tokens[0].lower() for tokens in tokenized if tokens]

        assert iceberg_col_names == ddl_col_names, (
            f"Iceberg DDL column mismatch for {td.table_name}:\n"
            f" Iceberg: {iceberg_col_names}\n"
            f" DDL: {ddl_col_names}"
        )


# ---------------------------------------------------------------------------
# 10. MinIO bucket and path conventions
# ---------------------------------------------------------------------------


def test_lakehouse_bucket_name():
    """The lakehouse bucket constant keeps its agreed literal name."""
    assert LAKEHOUSE_BUCKET == "stonks-lakehouse"


def test_warehouse_prefix():
    """The warehouse prefix constant keeps its agreed literal value."""
    assert WAREHOUSE_PREFIX == "warehouse"


def test_all_paths_use_warehouse_prefix():
    """Every table's partition path starts with warehouse/{table_name}/."""
    for table in ALL_TABLES:
        extras = dict.fromkeys(TABLE_PARTITIONS[table].extra_keys, "v")
        path = partition_path(table, NOW, extras)
        assert path.startswith(f"warehouse/{table}/"), (
            f"Path for {table} doesn't follow convention: {path}"
        )
# ---------------------------------------------------------------------------
# 11. Iceberg table locations point to correct MinIO paths
# ---------------------------------------------------------------------------


def test_iceberg_locations_match_ddl_external_locations():
    """Each Iceberg table location is s3a://<bucket>/<warehouse prefix>/<table>/.

    NOTE(review): the expected value is built from LAKEHOUSE_BUCKET and
    WAREHOUSE_PREFIX; the lakehouse DDL files' external_location is not read
    by this test despite its name.
    """
    for td in get_all_table_defs():
        expected = f"s3a://{LAKEHOUSE_BUCKET}/{WAREHOUSE_PREFIX}/{td.table_name}/"
        assert td.location == expected, (
            f"Iceberg location mismatch for {td.table_name}: "
            f"got={td.location}, expected={expected}"
        )


# ---------------------------------------------------------------------------
# 12. Partition values are injected correctly
# ---------------------------------------------------------------------------


def test_partition_values_dt_only():
    """A datetime alone yields only the dt key, as a date."""
    expected = {"dt": date(2026, 4, 11)}
    assert partition_values(NOW) == expected


def test_partition_values_with_model_version():
    """Extra keys are returned alongside dt."""
    expected = {"dt": date(2026, 4, 11), "model_version": "2.0.0"}
    assert partition_values(NOW, {"model_version": "2.0.0"}) == expected


def test_partition_values_from_date():
    """A plain date input yields that same date under dt."""
    day = date(2026, 4, 11)
    assert partition_values(day) == {"dt": date(2026, 4, 11)}