"""Lake publisher worker - writes partitioned Parquet facts to MinIO for Trino/Superset. Transforms operational recommendation and trend data into analytical fact datasets stored as Parquet files in Hive-compatible partition layouts on MinIO. Requirements: 9.4, 9.5, 10.1 Design ref: Section 4.10 (Lake Publisher), Section 7 (Analytical Lake Datasets) """ from __future__ import annotations import io import logging import re import time from datetime import datetime, timezone import pyarrow as pa import pyarrow.parquet as pq from minio import Minio from services.lake_publisher.partitions import ( LAKEHOUSE_BUCKET, partition_path, partition_values, s3_uri, ) from services.shared.metrics import ( LAKE_FACTS_PUBLISHED, LAKE_PUBLISH_BYTES, LAKE_PUBLISH_DURATION, ) from services.shared.schemas import Recommendation logger = logging.getLogger(__name__) # --- market_bars fact table --- MARKET_BARS_SCHEMA = pa.schema([ ("ticker", pa.string()), ("open_price", pa.float64()), ("high_price", pa.float64()), ("low_price", pa.float64()), ("close_price", pa.float64()), ("volume", pa.int64()), ("vwap", pa.float64()), ("trade_count", pa.int64()), ("bar_timestamp", pa.timestamp("us", tz="UTC")), ("bar_interval", pa.string()), ("source", pa.string()), ("dt", pa.date32()), ]) # --- market_quotes fact table --- MARKET_QUOTES_SCHEMA = pa.schema([ ("ticker", pa.string()), ("bid_price", pa.float64()), ("ask_price", pa.float64()), ("bid_size", pa.int64()), ("ask_size", pa.int64()), ("last_price", pa.float64()), ("last_size", pa.int64()), ("source", pa.string()), ("quote_at", pa.timestamp("us", tz="UTC")), ("dt", pa.date32()), ]) # --- company_events fact table --- COMPANY_EVENTS_SCHEMA = pa.schema([ ("event_id", pa.string()), ("ticker", pa.string()), ("event_type", pa.string()), ("event_subtype", pa.string()), ("title", pa.string()), ("description", pa.string()), ("source", pa.string()), ("source_url", pa.string()), ("event_at", pa.timestamp("us", tz="UTC")), ("ingested_at", pa.timestamp("us", 
tz="UTC")), ("dt", pa.date32()), ]) # --- documents fact table --- DOCUMENTS_SCHEMA = pa.schema([ ("document_id", pa.string()), ("document_type", pa.string()), ("source_type", pa.string()), ("ticker", pa.string()), ("publisher", pa.string()), ("title", pa.string()), ("url", pa.string()), ("canonical_url", pa.string()), ("language", pa.string()), ("published_at", pa.timestamp("us", tz="UTC")), ("retrieved_at", pa.timestamp("us", tz="UTC")), ("content_hash", pa.string()), ("confidence", pa.float64()), ("dt", pa.date32()), ]) # --- document_extractions fact table --- DOCUMENT_EXTRACTIONS_SCHEMA = pa.schema([ ("document_id", pa.string()), ("ticker", pa.string()), ("company_name", pa.string()), ("relevance", pa.float64()), ("sentiment", pa.string()), ("impact_score", pa.float64()), ("impact_horizon", pa.string()), ("catalyst_type", pa.string()), ("confidence", pa.float64()), ("novelty_score", pa.float64()), ("source_credibility", pa.float64()), ("key_facts", pa.string()), ("risks", pa.string()), ("macro_themes", pa.string()), ("model_name", pa.string()), ("prompt_version", pa.string()), ("schema_version", pa.string()), ("extraction_at", pa.timestamp("us", tz="UTC")), ("dt", pa.date32()), ("model_version", pa.string()), ]) # --- trade_signals fact table --- TRADE_SIGNALS_SCHEMA = pa.schema([ ("signal_id", pa.string()), ("ticker", pa.string()), ("trend_direction", pa.string()), ("trend_strength", pa.float64()), ("confidence", pa.float64()), ("contradiction_score", pa.float64()), ("dominant_catalysts", pa.string()), ("material_risks", pa.string()), ("action", pa.string()), ("time_horizon", pa.string()), ("recommendation_id", pa.string()), ("generated_at", pa.timestamp("us", tz="UTC")), ("dt", pa.date32()), ]) def build_trade_signal_row( rec: Recommendation, trend_direction: str = "", trend_strength: float = 0.0, contradiction_score: float = 0.0, dominant_catalysts: str = "", material_risks: str = "", ) -> dict[str, object]: """Build a single trade_signals fact row from a 
Recommendation and its trend context.""" return { "signal_id": rec.recommendation_id, "ticker": rec.ticker, "trend_direction": trend_direction, "trend_strength": trend_strength, "confidence": rec.confidence, "contradiction_score": contradiction_score, "dominant_catalysts": dominant_catalysts, "material_risks": material_risks, "action": rec.action.value, "time_horizon": rec.time_horizon, "recommendation_id": rec.recommendation_id, "generated_at": rec.generated_at, **partition_values(rec.generated_at), } def _write_parquet_bytes(table: pa.Table) -> bytes: """Serialize a PyArrow table to Parquet bytes.""" buf = io.BytesIO() pq.write_table(table, buf) return buf.getvalue() def _put_lakehouse_object( client: Minio, table_name: str, path: str, parquet_bytes: bytes, ) -> None: """Write a Parquet file to MinIO and record Prometheus metrics.""" _start = time.monotonic() client.put_object( LAKEHOUSE_BUCKET, path, io.BytesIO(parquet_bytes), length=len(parquet_bytes), content_type="application/octet-stream", ) LAKE_PUBLISH_DURATION.labels(table_name=table_name).observe(time.monotonic() - _start) LAKE_FACTS_PUBLISHED.labels(table_name=table_name).inc() LAKE_PUBLISH_BYTES.labels(table_name=table_name).inc(len(parquet_bytes)) def _partition_path(table_name: str, dt: datetime, extra_partitions: dict[str, str] | None = None) -> str: """Build a Hive-compatible partition path. Delegates to services.lake_publisher.partitions for the canonical implementation. Kept for backward compatibility with existing callers. """ return partition_path(table_name, dt, extra_partitions) def publish_trade_signal( client: Minio, rec: Recommendation, trend_direction: str = "", trend_strength: float = 0.0, contradiction_score: float = 0.0, dominant_catalysts: str = "", material_risks: str = "", ) -> str: """Publish a single recommendation as a trade_signals fact to MinIO. 
Writes a Parquet file to the Hive-compatible partition layout: s3://stonks-lakehouse/warehouse/trade_signals/dt={date}/part-{ts}.parquet Returns the s3:// URI of the written object. """ row = build_trade_signal_row( rec, trend_direction, trend_strength, contradiction_score, dominant_catalysts, material_risks, ) table = pa.Table.from_pylist([row], schema=TRADE_SIGNALS_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path("trade_signals", rec.generated_at) _put_lakehouse_object(client, "trade_signals", path, parquet_bytes) ref = s3_uri(path) logger.info("Published trade_signal fact for %s: %s", rec.ticker, ref) return ref # --- prediction_vs_outcome fact table (skeleton for Phase 10+) --- PREDICTION_VS_OUTCOME_SCHEMA = pa.schema([ ("recommendation_id", pa.string()), ("ticker", pa.string()), ("predicted_action", pa.string()), ("predicted_confidence", pa.float64()), ("actual_move_pct", pa.float64()), ("outcome", pa.string()), ("horizon_days", pa.int32()), ("predicted_at", pa.timestamp("us", tz="UTC")), ("evaluated_at", pa.timestamp("us", tz="UTC")), ("model_version", pa.string()), ("dt", pa.date32()), ]) def publish_prediction_fact( client: Minio, rec: Recommendation, trend_direction: str = "", trend_strength: float = 0.0, ) -> str: """Publish a prediction fact for a recommendation. This writes the prediction side of the prediction_vs_outcome table. The outcome fields (actual_move_pct, outcome, evaluated_at) are left as placeholders — they get backfilled when market outcomes are known. Returns the s3:// URI of the written Parquet file. """ # Parse horizon days from time_horizon string (e.g. 
"swing_1d_10d" -> 10) horizon_days = _parse_horizon_days(rec.time_horizon) model_ver = getattr(rec.model_metadata, "model_name", "") if rec.model_metadata else "" extra = {"model_version": model_ver} row = { "recommendation_id": rec.recommendation_id, "ticker": rec.ticker, "predicted_action": rec.action.value, "predicted_confidence": rec.confidence, "actual_move_pct": None, "outcome": "pending", "horizon_days": horizon_days, "predicted_at": rec.generated_at, "evaluated_at": None, **partition_values(rec.generated_at, extra), } table = pa.Table.from_pylist([row], schema=PREDICTION_VS_OUTCOME_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path("prediction_vs_outcome", rec.generated_at, extra) _put_lakehouse_object(client, "prediction_vs_outcome", path, parquet_bytes) ref = s3_uri(path) logger.info("Published prediction_vs_outcome fact for %s: %s", rec.ticker, ref) return ref def _parse_horizon_days(time_horizon: str) -> int: """Extract the max horizon days from a time_horizon string. Examples: "swing_1d_10d" -> 10 "position_10d_30d" -> 30 "scalp_intraday" -> 1 "" -> 0 """ if not time_horizon: return 0 if "intraday" in time_horizon: return 1 numbers = re.findall(r"(\d+)", time_horizon) if numbers: return max(int(n) for n in numbers) return 0 def publish_recommendation_facts( client: Minio, rec: Recommendation, trend_direction: str = "", trend_strength: float = 0.0, contradiction_score: float = 0.0, dominant_catalysts: str = "", material_risks: str = "", ) -> dict[str, str]: """Publish all analytical facts for a recommendation. Writes both trade_signals and prediction_vs_outcome facts. Returns a dict mapping table name to s3:// URI. 
""" refs: dict[str, str] = {} refs["trade_signals"] = publish_trade_signal( client, rec, trend_direction, trend_strength, contradiction_score, dominant_catalysts, material_risks, ) refs["prediction_vs_outcome"] = publish_prediction_fact( client, rec, trend_direction, trend_strength, ) return refs # --- trade_orders fact table --- TRADE_ORDERS_SCHEMA = pa.schema([ ("order_id", pa.string()), ("recommendation_id", pa.string()), ("ticker", pa.string()), ("side", pa.string()), ("order_type", pa.string()), ("quantity", pa.float64()), ("limit_price", pa.float64()), ("status", pa.string()), ("execution_mode", pa.string()), ("broker_account", pa.string()), ("submitted_at", pa.timestamp("us", tz="UTC")), ("dt", pa.date32()), ]) def build_trade_order_row( order_id: str, ticker: str, side: str, order_type: str, quantity: float, limit_price: float | None, status: str, broker_account: str, submitted_at: datetime, recommendation_id: str = "", execution_mode: str = "paper", ) -> dict[str, object]: """Build a single trade_orders fact row.""" return { "order_id": order_id, "recommendation_id": recommendation_id, "ticker": ticker, "side": side, "order_type": order_type, "quantity": quantity, "limit_price": limit_price, "status": status, "execution_mode": execution_mode, "broker_account": broker_account, "submitted_at": submitted_at, **partition_values(submitted_at), } def publish_trade_order( client: Minio, order_id: str, ticker: str, side: str, order_type: str, quantity: float, limit_price: float | None, status: str, broker_account: str, submitted_at: datetime, recommendation_id: str = "", execution_mode: str = "paper", ) -> str: """Publish a single order as a trade_orders fact to MinIO. Returns the s3:// URI of the written object. 
Requirements: 9.4, 9.5 Design ref: Section 7 (lake.trade_orders) """ row = build_trade_order_row( order_id, ticker, side, order_type, quantity, limit_price, status, broker_account, submitted_at, recommendation_id, execution_mode, ) table = pa.Table.from_pylist([row], schema=TRADE_ORDERS_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path("trade_orders", submitted_at) _put_lakehouse_object(client, "trade_orders", path, parquet_bytes) ref = s3_uri(path) logger.info("Published trade_order fact for %s: %s", ticker, ref) return ref # --- trade_fills fact table --- TRADE_FILLS_SCHEMA = pa.schema([ ("fill_id", pa.string()), ("order_id", pa.string()), ("ticker", pa.string()), ("side", pa.string()), ("fill_price", pa.float64()), ("fill_quantity", pa.float64()), ("commission", pa.float64()), ("broker_account", pa.string()), ("filled_at", pa.timestamp("us", tz="UTC")), ("dt", pa.date32()), ]) def build_trade_fill_row( fill_id: str, order_id: str, ticker: str, side: str, fill_price: float, fill_quantity: float, broker_account: str, filled_at: datetime, commission: float = 0.0, ) -> dict[str, object]: """Build a single trade_fills fact row.""" return { "fill_id": fill_id, "order_id": order_id, "ticker": ticker, "side": side, "fill_price": fill_price, "fill_quantity": fill_quantity, "commission": commission, "broker_account": broker_account, "filled_at": filled_at, **partition_values(filled_at), } def publish_trade_fill( client: Minio, fill_id: str, order_id: str, ticker: str, side: str, fill_price: float, fill_quantity: float, broker_account: str, filled_at: datetime, commission: float = 0.0, ) -> str: """Publish a single fill as a trade_fills fact to MinIO. Returns the s3:// URI of the written object. 
Requirements: 9.4, 9.5 Design ref: Section 7 (lake.trade_fills) """ row = build_trade_fill_row( fill_id, order_id, ticker, side, fill_price, fill_quantity, broker_account, filled_at, commission, ) table = pa.Table.from_pylist([row], schema=TRADE_FILLS_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path("trade_fills", filled_at) _put_lakehouse_object(client, "trade_fills", path, parquet_bytes) ref = s3_uri(path) logger.info("Published trade_fill fact for %s: %s", ticker, ref) return ref # --- positions_daily fact table --- POSITIONS_DAILY_SCHEMA = pa.schema([ ("ticker", pa.string()), ("quantity", pa.float64()), ("avg_entry_price", pa.float64()), ("close_price", pa.float64()), ("market_value", pa.float64()), ("unrealized_pnl", pa.float64()), ("broker_account", pa.string()), ("execution_mode", pa.string()), ("snapshot_at", pa.timestamp("us", tz="UTC")), ("dt", pa.date32()), ]) def build_position_daily_row( ticker: str, quantity: float, avg_entry_price: float, close_price: float, unrealized_pnl: float, broker_account: str, snapshot_at: datetime, market_value: float = 0.0, execution_mode: str = "paper", ) -> dict[str, object]: """Build a single positions_daily fact row.""" return { "ticker": ticker, "quantity": quantity, "avg_entry_price": avg_entry_price, "close_price": close_price, "market_value": market_value, "unrealized_pnl": unrealized_pnl, "broker_account": broker_account, "execution_mode": execution_mode, "snapshot_at": snapshot_at, **partition_values(snapshot_at), } def publish_position_daily( client: Minio, ticker: str, quantity: float, avg_entry_price: float, close_price: float, unrealized_pnl: float, broker_account: str, snapshot_at: datetime, ) -> str: """Publish a single position snapshot as a positions_daily fact to MinIO. Returns the s3:// URI of the written object. 
Requirements: 9.4, 9.5 Design ref: Section 7 (lake.positions_daily) """ row = build_position_daily_row( ticker, quantity, avg_entry_price, close_price, unrealized_pnl, broker_account, snapshot_at, ) table = pa.Table.from_pylist([row], schema=POSITIONS_DAILY_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path("positions_daily", snapshot_at) _put_lakehouse_object(client, "positions_daily", path, parquet_bytes) ref = s3_uri(path) logger.info("Published positions_daily fact for %s: %s", ticker, ref) return ref def publish_positions_daily_batch( client: Minio, positions: list[dict], broker_account: str, snapshot_at: datetime, ) -> str: """Publish a batch of position snapshots as a single Parquet file. Each dict in positions should have: ticker, quantity, avg_entry_price, close_price, unrealized_pnl. Returns the s3:// URI of the written object. """ rows = [ build_position_daily_row( ticker=p["ticker"], quantity=p["quantity"], avg_entry_price=p["avg_entry_price"], close_price=p["close_price"], unrealized_pnl=p["unrealized_pnl"], broker_account=broker_account, snapshot_at=snapshot_at, ) for p in positions ] if not rows: logger.info("No positions to publish for positions_daily") return "" table = pa.Table.from_pylist(rows, schema=POSITIONS_DAILY_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path("positions_daily", snapshot_at) _put_lakehouse_object(client, "positions_daily", path, parquet_bytes) ref = s3_uri(path) logger.info("Published %d positions_daily facts: %s", len(rows), ref) return ref # --- pnl_daily fact table --- PNL_DAILY_SCHEMA = pa.schema([ ("ticker", pa.string()), ("realized_pnl", pa.float64()), ("unrealized_pnl", pa.float64()), ("total_pnl", pa.float64()), ("fees", pa.float64()), ("net_pnl", pa.float64()), ("broker_account", pa.string()), ("execution_mode", pa.string()), ("dt", pa.date32()), ]) def build_pnl_daily_row( ticker: str, realized_pnl: float, unrealized_pnl: float, total_pnl: float, broker_account: 
str, dt: datetime | None = None, fees: float = 0.0, net_pnl: float | None = None, execution_mode: str = "paper", ) -> dict[str, object]: """Build a single pnl_daily fact row.""" row_dt = dt or datetime.now(timezone.utc) return { "ticker": ticker, "realized_pnl": realized_pnl, "unrealized_pnl": unrealized_pnl, "total_pnl": total_pnl, "fees": fees, "net_pnl": net_pnl if net_pnl is not None else total_pnl - fees, "broker_account": broker_account, "execution_mode": execution_mode, **partition_values(row_dt), } def publish_pnl_daily( client: Minio, ticker: str, realized_pnl: float, unrealized_pnl: float, total_pnl: float, broker_account: str, dt: datetime, fees: float = 0.0, net_pnl: float | None = None, execution_mode: str = "paper", ) -> str: """Publish a single pnl_daily fact to MinIO. Returns the s3:// URI of the written object. Requirements: 9.4, 9.5 Design ref: Section 7 (lake.pnl_daily) """ row = build_pnl_daily_row( ticker, realized_pnl, unrealized_pnl, total_pnl, broker_account, dt=dt, fees=fees, net_pnl=net_pnl, execution_mode=execution_mode, ) table = pa.Table.from_pylist([row], schema=PNL_DAILY_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path("pnl_daily", dt) _put_lakehouse_object(client, "pnl_daily", path, parquet_bytes) ref = s3_uri(path) logger.info("Published pnl_daily fact for %s: %s", ticker, ref) return ref # --- market_bars publisher --- def publish_market_bar( client: Minio, ticker: str, open_price: float, high_price: float, low_price: float, close_price: float, volume: int, bar_timestamp: datetime, source: str, vwap: float = 0.0, trade_count: int = 0, bar_interval: str = "1d", ) -> str: """Publish a single market bar fact to MinIO. 
Requirements: 2.1, 9.4, 9.5 Design ref: Section 7 (lake.market_bars) """ row: dict[str, object] = { "ticker": ticker, "open_price": open_price, "high_price": high_price, "low_price": low_price, "close_price": close_price, "volume": volume, "vwap": vwap, "trade_count": trade_count, "bar_timestamp": bar_timestamp, "bar_interval": bar_interval, "source": source, **partition_values(bar_timestamp), } table = pa.Table.from_pylist([row], schema=MARKET_BARS_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path("market_bars", bar_timestamp) _put_lakehouse_object(client, "market_bars", path, parquet_bytes) ref = s3_uri(path) logger.info("Published market_bar fact for %s: %s", ticker, ref) return ref # --- market_quotes publisher --- def publish_market_quote( client: Minio, ticker: str, bid_price: float, ask_price: float, last_price: float, quote_at: datetime, source: str, bid_size: int = 0, ask_size: int = 0, last_size: int = 0, ) -> str: """Publish a single market quote fact to MinIO. 
Requirements: 2.1, 9.4, 9.5 Design ref: Section 7 (lake.market_quotes) """ row: dict[str, object] = { "ticker": ticker, "bid_price": bid_price, "ask_price": ask_price, "bid_size": bid_size, "ask_size": ask_size, "last_price": last_price, "last_size": last_size, "source": source, "quote_at": quote_at, **partition_values(quote_at), } table = pa.Table.from_pylist([row], schema=MARKET_QUOTES_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path("market_quotes", quote_at) _put_lakehouse_object(client, "market_quotes", path, parquet_bytes) ref = s3_uri(path) logger.info("Published market_quote fact for %s: %s", ticker, ref) return ref # --- company_events publisher --- def publish_company_event( client: Minio, event_id: str, ticker: str, event_type: str, title: str, event_at: datetime, source: str, event_subtype: str = "", description: str = "", source_url: str = "", ingested_at: datetime | None = None, ) -> str: """Publish a single company event fact to MinIO. Requirements: 2.3, 9.4, 9.5 Design ref: Section 7 (lake.company_events) """ row: dict[str, object] = { "event_id": event_id, "ticker": ticker, "event_type": event_type, "event_subtype": event_subtype, "title": title, "description": description, "source": source, "source_url": source_url, "event_at": event_at, "ingested_at": ingested_at or datetime.now(timezone.utc), **partition_values(event_at), } table = pa.Table.from_pylist([row], schema=COMPANY_EVENTS_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path("company_events", event_at) _put_lakehouse_object(client, "company_events", path, parquet_bytes) ref = s3_uri(path) logger.info("Published company_event fact for %s: %s", ticker, ref) return ref # --- documents publisher --- def publish_document_fact( client: Minio, document_id: str, document_type: str, source_type: str, ticker: str, publisher: str, title: str, published_at: datetime, content_hash: str, url: str = "", canonical_url: str = "", language: str = "en", 
confidence: float = 0.0, retrieved_at: datetime | None = None, ) -> str: """Publish a single document metadata fact to MinIO. Requirements: 3.1, 3.3, 9.4, 9.5 Design ref: Section 6.2, Section 7 (lake.documents) """ row: dict[str, object] = { "document_id": document_id, "document_type": document_type, "source_type": source_type, "ticker": ticker, "publisher": publisher, "title": title, "url": url, "canonical_url": canonical_url, "language": language, "published_at": published_at, "retrieved_at": retrieved_at or datetime.now(timezone.utc), "content_hash": content_hash, "confidence": confidence, **partition_values(published_at), } table = pa.Table.from_pylist([row], schema=DOCUMENTS_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path("documents", published_at) _put_lakehouse_object(client, "documents", path, parquet_bytes) ref = s3_uri(path) logger.info("Published document fact for %s: %s", ticker, ref) return ref # --- document_extractions publisher --- def publish_document_extraction( client: Minio, document_id: str, ticker: str, sentiment: str, impact_score: float, catalyst_type: str, confidence: float, extraction_at: datetime, model_name: str, prompt_version: str, company_name: str = "", relevance: float = 0.0, impact_horizon: str = "", novelty_score: float = 0.0, source_credibility: float = 0.0, key_facts: str = "", risks: str = "", macro_themes: str = "", schema_version: str = "", ) -> str: """Publish a single document extraction fact to MinIO. 
Requirements: 5.3, 5.5, 9.4, 9.5 Design ref: Section 6.3, Section 7 (lake.document_extractions) """ model_ver = schema_version or prompt_version extra = {"model_version": model_ver} row: dict[str, object] = { "document_id": document_id, "ticker": ticker, "company_name": company_name, "relevance": relevance, "sentiment": sentiment, "impact_score": impact_score, "impact_horizon": impact_horizon, "catalyst_type": catalyst_type, "confidence": confidence, "novelty_score": novelty_score, "source_credibility": source_credibility, "key_facts": key_facts, "risks": risks, "macro_themes": macro_themes, "model_name": model_name, "prompt_version": prompt_version, "schema_version": schema_version, "extraction_at": extraction_at, **partition_values(extraction_at, extra), } table = pa.Table.from_pylist([row], schema=DOCUMENT_EXTRACTIONS_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path( "document_extractions", extraction_at, extra_partitions=extra, ) _put_lakehouse_object(client, "document_extractions", path, parquet_bytes) ref = s3_uri(path) logger.info("Published document_extraction fact for %s/%s: %s", ticker, document_id, ref) return ref # --- model_performance fact table --- MODEL_PERFORMANCE_SCHEMA = pa.schema([ ("document_id", pa.string()), ("ticker", pa.string()), ("model_name", pa.string()), ("prompt_version", pa.string()), ("schema_version", pa.string()), ("success", pa.bool_()), ("attempt_count", pa.int32()), ("total_duration_ms", pa.int32()), ("first_attempt_duration_ms", pa.int32()), ("final_attempt_duration_ms", pa.int32()), ("confidence", pa.float64()), ("validation_status", pa.string()), ("validation_error_count", pa.int32()), ("validation_warning_count", pa.int32()), ("retry_count", pa.int32()), ("input_token_estimate", pa.int32()), ("output_token_estimate", pa.int32()), ("company_count", pa.int32()), ("recorded_at", pa.timestamp("us", tz="UTC")), ("dt", pa.date32()), ("model_version", pa.string()), ]) def build_model_performance_row( 
document_id: str, model_name: str, success: bool, total_duration_ms: int, recorded_at: datetime, ticker: str = "", prompt_version: str = "", schema_version: str = "", attempt_count: int = 1, first_attempt_duration_ms: int = 0, final_attempt_duration_ms: int = 0, confidence: float = 0.0, validation_status: str = "unknown", validation_error_count: int = 0, validation_warning_count: int = 0, retry_count: int = 0, input_token_estimate: int = 0, output_token_estimate: int = 0, company_count: int = 0, ) -> dict[str, object]: """Build a single model_performance fact row.""" model_ver = schema_version or prompt_version or model_name return { "document_id": document_id, "ticker": ticker, "model_name": model_name, "prompt_version": prompt_version, "schema_version": schema_version, "success": success, "attempt_count": attempt_count, "total_duration_ms": total_duration_ms, "first_attempt_duration_ms": first_attempt_duration_ms, "final_attempt_duration_ms": final_attempt_duration_ms, "confidence": confidence, "validation_status": validation_status, "validation_error_count": validation_error_count, "validation_warning_count": validation_warning_count, "retry_count": retry_count, "input_token_estimate": input_token_estimate, "output_token_estimate": output_token_estimate, "company_count": company_count, "recorded_at": recorded_at, **partition_values(recorded_at, {"model_version": model_ver}), } def publish_model_performance( client: Minio, document_id: str, model_name: str, success: bool, total_duration_ms: int, recorded_at: datetime, ticker: str = "", prompt_version: str = "", schema_version: str = "", attempt_count: int = 1, first_attempt_duration_ms: int = 0, final_attempt_duration_ms: int = 0, confidence: float = 0.0, validation_status: str = "unknown", validation_error_count: int = 0, validation_warning_count: int = 0, retry_count: int = 0, input_token_estimate: int = 0, output_token_estimate: int = 0, company_count: int = 0, ) -> str: """Publish a single model performance 
fact to MinIO. Requirements: 12.1, 12.2, 9.4, 9.5 Design ref: Section 7 (lake.model_performance) """ row = build_model_performance_row( document_id=document_id, model_name=model_name, success=success, total_duration_ms=total_duration_ms, recorded_at=recorded_at, ticker=ticker, prompt_version=prompt_version, schema_version=schema_version, attempt_count=attempt_count, first_attempt_duration_ms=first_attempt_duration_ms, final_attempt_duration_ms=final_attempt_duration_ms, confidence=confidence, validation_status=validation_status, validation_error_count=validation_error_count, validation_warning_count=validation_warning_count, retry_count=retry_count, input_token_estimate=input_token_estimate, output_token_estimate=output_token_estimate, company_count=company_count, ) model_ver = schema_version or prompt_version or model_name table = pa.Table.from_pylist([row], schema=MODEL_PERFORMANCE_SCHEMA) parquet_bytes = _write_parquet_bytes(table) path = _partition_path( "model_performance", recorded_at, extra_partitions={"model_version": model_ver}, ) _put_lakehouse_object(client, "model_performance", path, parquet_bytes) ref = s3_uri(path) logger.info("Published model_performance fact for %s/%s: %s", model_name, document_id, ref) return ref # --------------------------------------------------------------------------- # Batch publish helpers # --------------------------------------------------------------------------- def _publish_batch( client: Minio, table_name: str, rows: list[dict[str, object]], schema: pa.Schema, dt: datetime, extra_partitions: dict[str, str] | None = None, ) -> str: """Generic batch publisher — writes a list of row dicts as a single Parquet file. Returns the s3:// URI of the written object, or "" if rows is empty. """ if not rows: logger.info("No rows to publish for %s", table_name) return "" # Inject partition columns into rows that don't already have them. 
pv = partition_values(dt, extra_partitions) enriched = [] for row in rows: merged = {**row} for k, v in pv.items(): if k not in merged: merged[k] = v enriched.append(merged) table = pa.Table.from_pylist(enriched, schema=schema) parquet_bytes = _write_parquet_bytes(table) path = _partition_path(table_name, dt, extra_partitions) _pub_start = time.monotonic() client.put_object( LAKEHOUSE_BUCKET, path, io.BytesIO(parquet_bytes), length=len(parquet_bytes), content_type="application/octet-stream", ) LAKE_PUBLISH_DURATION.labels(table_name=table_name).observe(time.monotonic() - _pub_start) LAKE_FACTS_PUBLISHED.labels(table_name=table_name).inc(len(enriched)) LAKE_PUBLISH_BYTES.labels(table_name=table_name).inc(len(parquet_bytes)) ref = s3_uri(path) logger.info("Published %d %s facts: %s", len(enriched), table_name, ref) return ref def publish_market_bars_batch( client: Minio, bars: list[dict[str, object]], dt: datetime, ) -> str: """Publish a batch of market bar rows as a single Parquet file. Each dict should match MARKET_BARS_SCHEMA field names. 
"""
    return _publish_batch(client, "market_bars", bars, MARKET_BARS_SCHEMA, dt)


def publish_market_quotes_batch(
    client: Minio,
    quotes: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Publish a batch of market quote rows as a single Parquet file.

    Thin wrapper: forwards *quotes* to ``_publish_batch`` under the
    ``market_quotes`` dataset name with ``MARKET_QUOTES_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        quotes: Row dicts; presumably keyed by MARKET_QUOTES_SCHEMA column
            names (confirm against ``_publish_batch``).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    return _publish_batch(client, "market_quotes", quotes, MARKET_QUOTES_SCHEMA, dt)


def publish_company_events_batch(
    client: Minio,
    events: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Publish a batch of company event rows as a single Parquet file.

    Thin wrapper: forwards *events* to ``_publish_batch`` under the
    ``company_events`` dataset name with ``COMPANY_EVENTS_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        events: Row dicts; presumably keyed by COMPANY_EVENTS_SCHEMA column
            names (confirm against ``_publish_batch``).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    return _publish_batch(client, "company_events", events, COMPANY_EVENTS_SCHEMA, dt)


def publish_documents_batch(
    client: Minio,
    docs: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Publish a batch of document metadata rows as a single Parquet file.

    Thin wrapper: forwards *docs* to ``_publish_batch`` under the
    ``documents`` dataset name with ``DOCUMENTS_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        docs: Row dicts; presumably keyed by DOCUMENTS_SCHEMA column names
            (confirm against ``_publish_batch``).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    return _publish_batch(client, "documents", docs, DOCUMENTS_SCHEMA, dt)


def publish_document_extractions_batch(
    client: Minio,
    extractions: list[dict[str, object]],
    dt: datetime,
    model_version: str = "",
) -> str:
    """Publish a batch of document extraction rows as a single Parquet file.

    Forwards *extractions* to ``_publish_batch`` under the
    ``document_extractions`` dataset name with
    ``DOCUMENT_EXTRACTIONS_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        extractions: Row dicts; presumably keyed by
            DOCUMENT_EXTRACTIONS_SCHEMA column names (confirm).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).
        model_version: When non-empty, forwarded as the extra partition
            mapping ``{"model_version": model_version}``. When empty
            (the default), ``None`` is forwarded instead.

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    # NOTE(review): an empty model_version means no model_version partition,
    # so objects for the same table may land under two different partition
    # layouts depending on the caller; confirm this is intended.
    extra = {"model_version": model_version} if model_version else None
    return _publish_batch(client, "document_extractions", extractions, DOCUMENT_EXTRACTIONS_SCHEMA, dt, extra)


def publish_trade_signals_batch(
    client: Minio,
    signals: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Publish a batch of trade signal rows as a single Parquet file.

    Thin wrapper: forwards *signals* to ``_publish_batch`` under the
    ``trade_signals`` dataset name with ``TRADE_SIGNALS_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        signals: Row dicts; presumably keyed by TRADE_SIGNALS_SCHEMA column
            names (confirm).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    return _publish_batch(client, "trade_signals", signals, TRADE_SIGNALS_SCHEMA, dt)


def publish_trade_orders_batch(
    client: Minio,
    orders: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Publish a batch of trade order rows as a single Parquet file.

    Thin wrapper: forwards *orders* to ``_publish_batch`` under the
    ``trade_orders`` dataset name with ``TRADE_ORDERS_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        orders: Row dicts; presumably keyed by TRADE_ORDERS_SCHEMA column
            names (confirm).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    return _publish_batch(client, "trade_orders", orders, TRADE_ORDERS_SCHEMA, dt)


def publish_trade_fills_batch(
    client: Minio,
    fills: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Publish a batch of trade fill rows as a single Parquet file.

    Thin wrapper: forwards *fills* to ``_publish_batch`` under the
    ``trade_fills`` dataset name with ``TRADE_FILLS_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        fills: Row dicts; presumably keyed by TRADE_FILLS_SCHEMA column
            names (confirm).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    return _publish_batch(client, "trade_fills", fills, TRADE_FILLS_SCHEMA, dt)


def publish_pnl_daily_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Publish a batch of PnL daily rows as a single Parquet file.

    Thin wrapper: forwards *rows* to ``_publish_batch`` under the
    ``pnl_daily`` dataset name with ``PNL_DAILY_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        rows: Row dicts; presumably keyed by PNL_DAILY_SCHEMA column names
            (confirm).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    return _publish_batch(client, "pnl_daily", rows, PNL_DAILY_SCHEMA, dt)


def publish_model_performance_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
    model_version: str = "",
) -> str:
    """Publish a batch of model performance rows as a single Parquet file.

    Forwards *rows* to ``_publish_batch`` under the ``model_performance``
    dataset name with ``MODEL_PERFORMANCE_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        rows: Row dicts; presumably keyed by MODEL_PERFORMANCE_SCHEMA column
            names (confirm).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).
        model_version: When non-empty, forwarded as the extra partition
            mapping ``{"model_version": model_version}``. When empty
            (the default), ``None`` is forwarded instead.

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    # NOTE(review): an empty model_version means no model_version partition,
    # so objects for the same table may land under two different partition
    # layouts depending on the caller; confirm this is intended.
    extra = {"model_version": model_version} if model_version else None
    return _publish_batch(client, "model_performance", rows, MODEL_PERFORMANCE_SCHEMA, dt, extra)


def publish_prediction_vs_outcome_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Publish a batch of prediction vs outcome rows as a single Parquet file.

    Thin wrapper: forwards *rows* to ``_publish_batch`` under the
    ``prediction_vs_outcome`` dataset name with
    ``PREDICTION_VS_OUTCOME_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        rows: Row dicts; presumably keyed by PREDICTION_VS_OUTCOME_SCHEMA
            column names (confirm).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    return _publish_batch(client, "prediction_vs_outcome", rows, PREDICTION_VS_OUTCOME_SCHEMA, dt)


# --- global_events fact table ---
# Column layout for lake.global_events. The list-valued inputs of
# publish_global_event_fact (event_types and the affected_* lists) are stored
# as single ", "-joined strings, not Arrow list columns. "dt" has no explicit
# key in the row publish_global_event_fact builds; it is presumably supplied
# by partition_values(created_at) (confirm).
GLOBAL_EVENTS_SCHEMA = pa.schema([
    ("event_id", pa.string()),
    ("event_types", pa.string()),
    ("severity", pa.string()),
    ("affected_regions", pa.string()),
    ("affected_sectors", pa.string()),
    ("affected_commodities", pa.string()),
    ("summary", pa.string()),
    ("estimated_duration", pa.string()),
    ("confidence", pa.float64()),
    ("source_document_id", pa.string()),
    ("created_at", pa.timestamp("us", tz="UTC")),
    ("dt", pa.date32()),
])


def publish_global_event_fact(
    client: Minio,
    event_id: str,
    event_types: list[str],
    severity: str,
    affected_regions: list[str],
    affected_sectors: list[str],
    affected_commodities: list[str],
    summary: str,
    estimated_duration: str,
    confidence: float,
    source_document_id: str,
    created_at: datetime,
) -> str:
    """Publish a single global event fact to MinIO.

    Builds a one-row table matching GLOBAL_EVENTS_SCHEMA and serializes it
    with ``_write_parquet_bytes``. It then uploads the bytes via
    ``_put_lakehouse_object`` under the path from
    ``_partition_path("global_events", created_at)`` and logs the result
    at INFO.

    Writes a Parquet file to:
    s3://stonks-lakehouse/warehouse/global_events/dt={date}/part-{uuid}.parquet

    Args:
        client: MinIO client handed to ``_put_lakehouse_object``.
        event_id: Stored as-is; also used in the log message.
        event_types: Joined with ", " into the ``event_types`` string column.
        severity: Stored as-is.
        affected_regions: Joined with ", " into one string column.
        affected_sectors: Joined with ", " into one string column.
        affected_commodities: Joined with ", " into one string column.
        summary: Stored as-is.
        estimated_duration: Stored as-is (string column).
        confidence: Stored in a float64 column.
        source_document_id: Stored as-is.
        created_at: Stored in the ``created_at`` column and passed to both
            ``partition_values`` and ``_partition_path`` to derive the
            partition.

    Returns the s3:// URI of the written object, built by ``s3_uri(path)``.

    Requirements: 7.3, 12.6
    Design ref: Analytical Lake Datasets (lake.global_events)
    """
    row: dict[str, object] = {
        "event_id": event_id,
        # List inputs are flattened to one ", "-joined string. An element that
        # itself contains ", " cannot be split back unambiguously.
        "event_types": ", ".join(event_types),
        "severity": severity,
        "affected_regions": ", ".join(affected_regions),
        "affected_sectors": ", ".join(affected_sectors),
        "affected_commodities": ", ".join(affected_commodities),
        "summary": summary,
        "estimated_duration": estimated_duration,
        "confidence": confidence,
        "source_document_id": source_document_id,
        "created_at": created_at,
        # Partition column values derived from created_at (presumably "dt").
        **partition_values(created_at),
    }
    # One-row table; values are converted to the schema's declared types.
    table = pa.Table.from_pylist([row], schema=GLOBAL_EVENTS_SCHEMA)
    parquet_bytes = _write_parquet_bytes(table)
    path = _partition_path("global_events", created_at)
    _put_lakehouse_object(client, "global_events", path, parquet_bytes)
    ref = s3_uri(path)
    logger.info("Published global_event fact %s: %s", event_id, ref)
    return ref


# --- macro_impacts fact table ---
# Column layout for lake.macro_impacts. contributing_factors is stored as one
# ", "-joined string. publish_macro_impact_fact also partitions by ticker.
MACRO_IMPACTS_SCHEMA = pa.schema([
    ("event_id", pa.string()),
    ("company_id", pa.string()),
    ("ticker", pa.string()),
    ("macro_impact_score", pa.float64()),
    ("impact_direction", pa.string()),
    ("contributing_factors", pa.string()),
    ("confidence", pa.float64()),
    ("computed_at", pa.timestamp("us", tz="UTC")),
    ("dt", pa.date32()),
])


def publish_macro_impact_fact(
    client: Minio,
    event_id: str,
    company_id: str,
    ticker: str,
    macro_impact_score: float,
    impact_direction: str,
    contributing_factors: list[str],
    confidence: float,
    computed_at: datetime,
) -> str:
    """Publish a single macro impact fact to MinIO.

    Builds a one-row table matching MACRO_IMPACTS_SCHEMA and serializes it
    with ``_write_parquet_bytes``. It then uploads the bytes via
    ``_put_lakehouse_object`` and logs the result at INFO. The object path
    is partitioned by ``computed_at`` plus an extra ``{"ticker": ticker}``
    partition.

    Writes a Parquet file to:
    s3://stonks-lakehouse/warehouse/macro_impacts/dt={date}/ticker={ticker}/part-{uuid}.parquet

    Args:
        client: MinIO client handed to ``_put_lakehouse_object``.
        event_id: Stored as-is; also used in the log message.
        company_id: Stored as-is.
        ticker: Stored in the ``ticker`` column and used as the extra
            partition value.
        macro_impact_score: Stored in a float64 column.
        impact_direction: Stored as-is.
        contributing_factors: Joined with ", " into one string column.
        confidence: Stored in a float64 column.
        computed_at: Stored in ``computed_at`` and used to derive the
            partition.

    Returns the s3:// URI of the written object, built by ``s3_uri(path)``.

    Requirements: 7.3, 12.6
    Design ref: Analytical Lake Datasets (lake.macro_impacts)
    """
    # The same extra mapping feeds both the row's partition values and the path.
    extra = {"ticker": ticker}
    row: dict[str, object] = {
        "event_id": event_id,
        "company_id": company_id,
        "ticker": ticker,
        "macro_impact_score": macro_impact_score,
        "impact_direction": impact_direction,
        # Lossy flattening: an element containing ", " cannot be recovered.
        "contributing_factors": ", ".join(contributing_factors),
        "confidence": confidence,
        "computed_at": computed_at,
        # Merged last, so any key it returns overrides the explicit entries above.
        **partition_values(computed_at, extra),
    }
    table = pa.Table.from_pylist([row], schema=MACRO_IMPACTS_SCHEMA)
    parquet_bytes = _write_parquet_bytes(table)
    path = _partition_path("macro_impacts", computed_at, extra_partitions=extra)
    _put_lakehouse_object(client, "macro_impacts", path, parquet_bytes)
    ref = s3_uri(path)
    logger.info("Published macro_impact fact for %s/%s: %s", ticker, event_id, ref)
    return ref


# --- trend_projections fact table ---
# Column layout for lake.trend_projections. driving_factors is stored as one
# ", "-joined string. publish_trend_projection_fact also partitions by ticker.
TREND_PROJECTIONS_SCHEMA = pa.schema([
    ("trend_window_id", pa.string()),
    ("ticker", pa.string()),
    ("projected_direction", pa.string()),
    ("projected_strength", pa.float64()),
    ("projected_confidence", pa.float64()),
    ("projection_horizon", pa.string()),
    ("driving_factors", pa.string()),
    ("macro_contribution_pct", pa.float64()),
    ("diverges_from_current", pa.bool_()),
    ("computed_at", pa.timestamp("us", tz="UTC")),
    ("dt", pa.date32()),
])


def publish_trend_projection_fact(
    client: Minio,
    trend_window_id: str,
    ticker: str,
    projected_direction: str,
    projected_strength: float,
    projected_confidence: float,
    projection_horizon: str,
    driving_factors: list[str],
    macro_contribution_pct: float,
    diverges_from_current: bool,
    computed_at: datetime,
) -> str:
    """Publish a single trend projection fact to MinIO.

    Builds a one-row table matching TREND_PROJECTIONS_SCHEMA and serializes
    it with ``_write_parquet_bytes``. It then uploads the bytes via
    ``_put_lakehouse_object`` and logs the result at INFO. The object path
    is partitioned by ``computed_at`` plus an extra ``{"ticker": ticker}``
    partition.

    Writes a Parquet file to:
    s3://stonks-lakehouse/warehouse/trend_projections/dt={date}/ticker={ticker}/part-{uuid}.parquet

    Args:
        client: MinIO client handed to ``_put_lakehouse_object``.
        trend_window_id: Stored as-is.
        ticker: Stored in the ``ticker`` column, used as the extra partition
            value and included in the log message.
        projected_direction: Stored as-is.
        projected_strength: Stored in a float64 column.
        projected_confidence: Stored in a float64 column.
        projection_horizon: Stored as-is (string column).
        driving_factors: Joined with ", " into one string column.
        macro_contribution_pct: Stored in a float64 column (units not
            established here; presumably a percentage, confirm).
        diverges_from_current: Stored in a bool column.
        computed_at: Stored in ``computed_at`` and used to derive the
            partition.

    Returns the s3:// URI of the written object, built by ``s3_uri(path)``.

    Requirements: 7.3, 12.6
    Design ref: Analytical Lake Datasets (lake.trend_projections)
    """
    # The same extra mapping feeds both the row's partition values and the path.
    extra = {"ticker": ticker}
    row: dict[str, object] = {
        "trend_window_id": trend_window_id,
        "ticker": ticker,
        "projected_direction": projected_direction,
        "projected_strength": projected_strength,
        "projected_confidence": projected_confidence,
        "projection_horizon": projection_horizon,
        # Lossy flattening: an element containing ", " cannot be recovered.
        "driving_factors": ", ".join(driving_factors),
        "macro_contribution_pct": macro_contribution_pct,
        "diverges_from_current": diverges_from_current,
        "computed_at": computed_at,
        **partition_values(computed_at, extra),
    }
    table = pa.Table.from_pylist([row], schema=TREND_PROJECTIONS_SCHEMA)
    parquet_bytes = _write_parquet_bytes(table)
    path = _partition_path("trend_projections", computed_at, extra_partitions=extra)
    _put_lakehouse_object(client, "trend_projections", path, parquet_bytes)
    ref = s3_uri(path)
    logger.info("Published trend_projection fact for %s: %s", ticker, ref)
    return ref


# --- Batch publishers for macro fact tables ---

def publish_global_events_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Publish a batch of global event rows as a single Parquet file.

    Thin wrapper: forwards *rows* to ``_publish_batch`` under the
    ``global_events`` dataset name with ``GLOBAL_EVENTS_SCHEMA``. Unlike
    ``publish_global_event_fact``, no list flattening happens here. List-type
    columns must already be ", "-joined strings in *rows*.

    Args:
        client: MinIO client, passed through unchanged.
        rows: Row dicts; presumably keyed by GLOBAL_EVENTS_SCHEMA column
            names (confirm).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    return _publish_batch(client, "global_events", rows, GLOBAL_EVENTS_SCHEMA, dt)


def publish_macro_impacts_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
    ticker: str = "",
) -> str:
    """Publish a batch of macro impact rows as a single Parquet file.

    Forwards *rows* to ``_publish_batch`` under the ``macro_impacts``
    dataset name with ``MACRO_IMPACTS_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        rows: Row dicts; presumably keyed by MACRO_IMPACTS_SCHEMA column
            names (confirm).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).
        ticker: When non-empty, forwarded as the extra partition mapping
            ``{"ticker": ticker}``. When empty (the default), ``None`` is
            forwarded instead.

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    # NOTE(review): publish_macro_impact_fact always partitions by ticker, but
    # this batch path skips the ticker partition when ticker is empty. If
    # _publish_batch builds paths the same way, the table would end up with
    # mixed layouts (dt=... vs dt=.../ticker=...), which Hive-style partition
    # discovery (e.g. Trino) generally does not expect. Confirm callers always
    # pass a ticker.
    extra = {"ticker": ticker} if ticker else None
    return _publish_batch(client, "macro_impacts", rows, MACRO_IMPACTS_SCHEMA, dt, extra)


def publish_trend_projections_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
    ticker: str = "",
) -> str:
    """Publish a batch of trend projection rows as a single Parquet file.

    Forwards *rows* to ``_publish_batch`` under the ``trend_projections``
    dataset name with ``TREND_PROJECTIONS_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        rows: Row dicts; presumably keyed by TREND_PROJECTIONS_SCHEMA column
            names (confirm).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).
        ticker: When non-empty, forwarded as the extra partition mapping
            ``{"ticker": ticker}``. When empty (the default), ``None`` is
            forwarded instead.

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    # NOTE(review): publish_trend_projection_fact always partitions by ticker;
    # an empty ticker here presumably yields a different partition layout for
    # the same table. Confirm callers always pass a ticker.
    extra = {"ticker": ticker} if ticker else None
    return _publish_batch(client, "trend_projections", rows, TREND_PROJECTIONS_SCHEMA, dt, extra)


# --- competitor_relationships fact table ---
# Column layout for lake.competitor_relationships. The relationship's id is
# stored under the "id" column (publish_competitor_relationship_fact takes it
# as relationship_id).
COMPETITOR_RELATIONSHIPS_SCHEMA = pa.schema([
    ("id", pa.string()),
    ("company_a_id", pa.string()),
    ("company_b_id", pa.string()),
    ("relationship_type", pa.string()),
    ("strength", pa.float64()),
    ("bidirectional", pa.bool_()),
    ("source", pa.string()),
    ("active", pa.bool_()),
    ("created_at", pa.timestamp("us", tz="UTC")),
    ("dt", pa.date32()),
])


def publish_competitor_relationship_fact(
    client: Minio,
    relationship_id: str,
    company_a_id: str,
    company_b_id: str,
    relationship_type: str,
    strength: float,
    bidirectional: bool,
    source: str,
    active: bool,
    created_at: datetime,
) -> str:
    """Publish a single competitor relationship fact to MinIO.

    Builds a one-row table matching COMPETITOR_RELATIONSHIPS_SCHEMA and
    serializes it with ``_write_parquet_bytes``. It then uploads the bytes
    via ``_put_lakehouse_object`` under the path from
    ``_partition_path("competitor_relationships", created_at)`` and logs the
    result at INFO.

    Writes a Parquet file to:
    s3://stonks-lakehouse/warehouse/competitor_relationships/dt={date}/part-{uuid}.parquet

    Args:
        client: MinIO client handed to ``_put_lakehouse_object``.
        relationship_id: Stored in the ``id`` column; also logged.
        company_a_id: Stored as-is.
        company_b_id: Stored as-is.
        relationship_type: Stored as-is.
        strength: Stored in a float64 column.
        bidirectional: Stored in a bool column.
        source: Stored as-is.
        active: Stored in a bool column.
        created_at: Stored in ``created_at`` and used to derive the
            partition.

    Returns the s3:// URI of the written object, built by ``s3_uri(path)``.

    Requirements: 7.3
    Design ref: Analytical Lake Datasets (lake.competitor_relationships)
    """
    row: dict[str, object] = {
        "id": relationship_id,
        "company_a_id": company_a_id,
        "company_b_id": company_b_id,
        "relationship_type": relationship_type,
        "strength": strength,
        "bidirectional": bidirectional,
        "source": source,
        "active": active,
        "created_at": created_at,
        # Partition column values derived from created_at (presumably "dt").
        **partition_values(created_at),
    }
    table = pa.Table.from_pylist([row], schema=COMPETITOR_RELATIONSHIPS_SCHEMA)
    parquet_bytes = _write_parquet_bytes(table)
    path = _partition_path("competitor_relationships", created_at)
    _put_lakehouse_object(client, "competitor_relationships", path, parquet_bytes)
    ref = s3_uri(path)
    logger.info("Published competitor_relationship fact %s: %s", relationship_id, ref)
    return ref


def publish_competitor_relationships_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Publish a batch of competitor relationship rows as a single Parquet file.

    Thin wrapper: forwards *rows* to ``_publish_batch`` under the
    ``competitor_relationships`` dataset name with
    ``COMPETITOR_RELATIONSHIPS_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        rows: Row dicts; presumably keyed by COMPETITOR_RELATIONSHIPS_SCHEMA
            column names, i.e. the id goes under "id" (confirm).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    return _publish_batch(client, "competitor_relationships", rows, COMPETITOR_RELATIONSHIPS_SCHEMA, dt)


# --- competitive_signals fact table ---
# Column layout for lake.competitive_signals. publish_competitive_signal_fact
# also partitions by target_ticker.
COMPETITIVE_SIGNALS_SCHEMA = pa.schema([
    ("id", pa.string()),
    ("source_document_id", pa.string()),
    ("source_ticker", pa.string()),
    ("target_ticker", pa.string()),
    ("catalyst_type", pa.string()),
    ("pattern_confidence", pa.float64()),
    ("signal_direction", pa.string()),
    ("signal_strength", pa.float64()),
    ("relationship_strength", pa.float64()),
    ("computed_at", pa.timestamp("us", tz="UTC")),
    ("dt", pa.date32()),
])


def publish_competitive_signal_fact(
    client: Minio,
    signal_id: str,
    source_document_id: str,
    source_ticker: str,
    target_ticker: str,
    catalyst_type: str,
    pattern_confidence: float,
    signal_direction: str,
    signal_strength: float,
    relationship_strength: float,
    computed_at: datetime,
) -> str:
    """Publish a single competitive signal fact to MinIO.

    Builds a one-row table matching COMPETITIVE_SIGNALS_SCHEMA and serializes
    it with ``_write_parquet_bytes``. It then uploads the bytes via
    ``_put_lakehouse_object`` and logs the result at INFO. The object path
    is partitioned by ``computed_at`` plus an extra
    ``{"target_ticker": target_ticker}`` partition.

    Writes a Parquet file to:
    s3://stonks-lakehouse/warehouse/competitive_signals/dt={date}/target_ticker={ticker}/part-{uuid}.parquet

    Args:
        client: MinIO client handed to ``_put_lakehouse_object``.
        signal_id: Stored in the ``id`` column.
        source_document_id: Stored as-is.
        source_ticker: Stored as-is; also logged.
        target_ticker: Stored as-is, used as the extra partition value and
            logged.
        catalyst_type: Stored as-is.
        pattern_confidence: Stored in a float64 column.
        signal_direction: Stored as-is.
        signal_strength: Stored in a float64 column.
        relationship_strength: Stored in a float64 column.
        computed_at: Stored in ``computed_at`` and used to derive the
            partition.

    Returns the s3:// URI of the written object, built by ``s3_uri(path)``.

    Requirements: 7.4
    Design ref: Analytical Lake Datasets (lake.competitive_signals)
    """
    # The same extra mapping feeds both the row's partition values and the path.
    extra = {"target_ticker": target_ticker}
    row: dict[str, object] = {
        "id": signal_id,
        "source_document_id": source_document_id,
        "source_ticker": source_ticker,
        "target_ticker": target_ticker,
        "catalyst_type": catalyst_type,
        "pattern_confidence": pattern_confidence,
        "signal_direction": signal_direction,
        "signal_strength": signal_strength,
        "relationship_strength": relationship_strength,
        "computed_at": computed_at,
        **partition_values(computed_at, extra),
    }
    table = pa.Table.from_pylist([row], schema=COMPETITIVE_SIGNALS_SCHEMA)
    parquet_bytes = _write_parquet_bytes(table)
    path = _partition_path("competitive_signals", computed_at, extra_partitions=extra)
    _put_lakehouse_object(client, "competitive_signals", path, parquet_bytes)
    ref = s3_uri(path)
    logger.info("Published competitive_signal fact for %s→%s: %s", source_ticker, target_ticker, ref)
    return ref


def publish_competitive_signals_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
    target_ticker: str = "",
) -> str:
    """Publish a batch of competitive signal rows as a single Parquet file.

    Forwards *rows* to ``_publish_batch`` under the ``competitive_signals``
    dataset name with ``COMPETITIVE_SIGNALS_SCHEMA``.

    Args:
        client: MinIO client, passed through unchanged.
        rows: Row dicts; presumably keyed by COMPETITIVE_SIGNALS_SCHEMA
            column names (confirm).
        dt: Forwarded to ``_publish_batch``; presumably selects the ``dt=``
            partition (confirm).
        target_ticker: When non-empty, forwarded as the extra partition
            mapping ``{"target_ticker": target_ticker}``. When empty (the
            default), ``None`` is forwarded instead.

    Returns:
        The string returned by ``_publish_batch`` (presumably the s3:// URI
        of the written object; confirm).
    """
    # NOTE(review): publish_competitive_signal_fact always partitions by
    # target_ticker; an empty target_ticker here presumably yields a
    # different partition layout for the same table. Confirm intended.
    extra = {"target_ticker": target_ticker} if target_ticker else None
    return _publish_batch(client, "competitive_signals", rows, COMPETITIVE_SIGNALS_SCHEMA, dt, extra)