Files
stonks-oracle/services/lake_publisher/worker.py
T

1599 lines
48 KiB
Python

"""Lake publisher worker - writes partitioned Parquet facts to MinIO for Trino/Superset.
Transforms operational recommendation and trend data into analytical fact datasets
stored as Parquet files in Hive-compatible partition layouts on MinIO.
Requirements: 9.4, 9.5, 10.1
Design ref: Section 4.10 (Lake Publisher), Section 7 (Analytical Lake Datasets)
"""
from __future__ import annotations
import io
import logging
import re
import time
from datetime import datetime, timezone
import pyarrow as pa
import pyarrow.parquet as pq
from minio import Minio
from services.lake_publisher.partitions import (
LAKEHOUSE_BUCKET,
partition_path,
partition_values,
s3_uri,
)
from services.shared.metrics import (
LAKE_FACTS_PUBLISHED,
LAKE_PUBLISH_BYTES,
LAKE_PUBLISH_DURATION,
)
from services.shared.schemas import Recommendation
logger = logging.getLogger(__name__)
# --- market_bars fact table ---
# Arrow schema for bar facts. Publishers merge partition_values() into each
# row, which is where the trailing `dt` column is expected to come from.
MARKET_BARS_SCHEMA = pa.schema([
    pa.field("ticker", pa.string()),
    pa.field("open_price", pa.float64()),
    pa.field("high_price", pa.float64()),
    pa.field("low_price", pa.float64()),
    pa.field("close_price", pa.float64()),
    pa.field("volume", pa.int64()),
    pa.field("vwap", pa.float64()),
    pa.field("trade_count", pa.int64()),
    pa.field("bar_timestamp", pa.timestamp("us", tz="UTC")),
    pa.field("bar_interval", pa.string()),
    pa.field("source", pa.string()),
    pa.field("dt", pa.date32()),
])
# --- market_quotes fact table ---
# Arrow schema for quote facts (bid/ask/last prices and sizes per ticker).
MARKET_QUOTES_SCHEMA = pa.schema([
    pa.field("ticker", pa.string()),
    pa.field("bid_price", pa.float64()),
    pa.field("ask_price", pa.float64()),
    pa.field("bid_size", pa.int64()),
    pa.field("ask_size", pa.int64()),
    pa.field("last_price", pa.float64()),
    pa.field("last_size", pa.int64()),
    pa.field("source", pa.string()),
    pa.field("quote_at", pa.timestamp("us", tz="UTC")),
    pa.field("dt", pa.date32()),
])
# --- company_events fact table ---
# Arrow schema for company event facts; carries both the event time and the
# ingestion time as UTC microsecond timestamps.
COMPANY_EVENTS_SCHEMA = pa.schema([
    pa.field("event_id", pa.string()),
    pa.field("ticker", pa.string()),
    pa.field("event_type", pa.string()),
    pa.field("event_subtype", pa.string()),
    pa.field("title", pa.string()),
    pa.field("description", pa.string()),
    pa.field("source", pa.string()),
    pa.field("source_url", pa.string()),
    pa.field("event_at", pa.timestamp("us", tz="UTC")),
    pa.field("ingested_at", pa.timestamp("us", tz="UTC")),
    pa.field("dt", pa.date32()),
])
# --- documents fact table ---
# Arrow schema for document metadata facts (no document body is stored here).
DOCUMENTS_SCHEMA = pa.schema([
    pa.field("document_id", pa.string()),
    pa.field("document_type", pa.string()),
    pa.field("source_type", pa.string()),
    pa.field("ticker", pa.string()),
    pa.field("publisher", pa.string()),
    pa.field("title", pa.string()),
    pa.field("url", pa.string()),
    pa.field("canonical_url", pa.string()),
    pa.field("language", pa.string()),
    pa.field("published_at", pa.timestamp("us", tz="UTC")),
    pa.field("retrieved_at", pa.timestamp("us", tz="UTC")),
    pa.field("content_hash", pa.string()),
    pa.field("confidence", pa.float64()),
    pa.field("dt", pa.date32()),
])
# --- document_extractions fact table ---
# Arrow schema for extraction facts. `key_facts`, `risks` and `macro_themes`
# are stored as plain strings; `model_version` is the extra partition column
# supplied alongside `dt` by the publisher.
DOCUMENT_EXTRACTIONS_SCHEMA = pa.schema([
    pa.field("document_id", pa.string()),
    pa.field("ticker", pa.string()),
    pa.field("company_name", pa.string()),
    pa.field("relevance", pa.float64()),
    pa.field("sentiment", pa.string()),
    pa.field("impact_score", pa.float64()),
    pa.field("impact_horizon", pa.string()),
    pa.field("catalyst_type", pa.string()),
    pa.field("confidence", pa.float64()),
    pa.field("novelty_score", pa.float64()),
    pa.field("source_credibility", pa.float64()),
    pa.field("key_facts", pa.string()),
    pa.field("risks", pa.string()),
    pa.field("macro_themes", pa.string()),
    pa.field("model_name", pa.string()),
    pa.field("prompt_version", pa.string()),
    pa.field("schema_version", pa.string()),
    pa.field("extraction_at", pa.timestamp("us", tz="UTC")),
    pa.field("dt", pa.date32()),
    pa.field("model_version", pa.string()),
])
# --- trade_signals fact table ---
# Arrow schema for trade signal facts built from a Recommendation plus its
# trend context (see build_trade_signal_row).
TRADE_SIGNALS_SCHEMA = pa.schema([
    pa.field("signal_id", pa.string()),
    pa.field("ticker", pa.string()),
    pa.field("trend_direction", pa.string()),
    pa.field("trend_strength", pa.float64()),
    pa.field("confidence", pa.float64()),
    pa.field("contradiction_score", pa.float64()),
    pa.field("dominant_catalysts", pa.string()),
    pa.field("material_risks", pa.string()),
    pa.field("action", pa.string()),
    pa.field("time_horizon", pa.string()),
    pa.field("recommendation_id", pa.string()),
    pa.field("generated_at", pa.timestamp("us", tz="UTC")),
    pa.field("dt", pa.date32()),
])
def build_trade_signal_row(
    rec: Recommendation,
    trend_direction: str = "",
    trend_strength: float = 0.0,
    contradiction_score: float = 0.0,
    dominant_catalysts: str = "",
    material_risks: str = "",
) -> dict[str, object]:
    """Assemble one trade_signals row from a Recommendation and trend context.

    The recommendation id is used for both ``signal_id`` and
    ``recommendation_id``. Partition columns derived from
    ``rec.generated_at`` are merged in last.
    """
    signal_row: dict[str, object] = {
        "signal_id": rec.recommendation_id,
        "ticker": rec.ticker,
        "trend_direction": trend_direction,
        "trend_strength": trend_strength,
        "confidence": rec.confidence,
        "contradiction_score": contradiction_score,
        "dominant_catalysts": dominant_catalysts,
        "material_risks": material_risks,
        "action": rec.action.value,
        "time_horizon": rec.time_horizon,
        "recommendation_id": rec.recommendation_id,
        "generated_at": rec.generated_at,
    }
    # Partition values are applied last, so they win on any key collision.
    signal_row.update(partition_values(rec.generated_at))
    return signal_row
def _write_parquet_bytes(table: pa.Table) -> bytes:
    """Return the Parquet encoding of ``table`` as an in-memory byte string."""
    with io.BytesIO() as sink:
        pq.write_table(table, sink)
        return sink.getvalue()
def _put_lakehouse_object(
    client: Minio,
    table_name: str,
    path: str,
    parquet_bytes: bytes,
) -> None:
    """Upload one Parquet payload to the lakehouse bucket and update metrics.

    Metrics are only recorded after ``put_object`` returns; if the upload
    raises, nothing is observed or incremented. The published-facts counter
    is incremented by exactly one per call.
    """
    payload_size = len(parquet_bytes)
    started = time.monotonic()
    client.put_object(
        LAKEHOUSE_BUCKET,
        path,
        io.BytesIO(parquet_bytes),
        length=payload_size,
        content_type="application/octet-stream",
    )
    elapsed = time.monotonic() - started
    LAKE_PUBLISH_DURATION.labels(table_name=table_name).observe(elapsed)
    LAKE_FACTS_PUBLISHED.labels(table_name=table_name).inc()
    LAKE_PUBLISH_BYTES.labels(table_name=table_name).inc(payload_size)
def _partition_path(
    table_name: str,
    dt: datetime,
    extra_partitions: dict[str, str] | None = None,
) -> str:
    """Return the Hive-compatible object path for ``table_name`` at ``dt``.

    Thin shim over ``services.lake_publisher.partitions.partition_path``,
    retained so existing callers of the private name keep working.
    """
    object_path = partition_path(table_name, dt, extra_partitions)
    return object_path
def publish_trade_signal(
    client: Minio,
    rec: Recommendation,
    trend_direction: str = "",
    trend_strength: float = 0.0,
    contradiction_score: float = 0.0,
    dominant_catalysts: str = "",
    material_risks: str = "",
) -> str:
    """Write one recommendation to MinIO as a single-row trade_signals file.

    The object lands in the Hive-compatible layout, e.g.
        s3://stonks-lakehouse/warehouse/trade_signals/dt={date}/part-{ts}.parquet

    Returns the s3:// URI of the written object.
    """
    fact_row = build_trade_signal_row(
        rec,
        trend_direction=trend_direction,
        trend_strength=trend_strength,
        contradiction_score=contradiction_score,
        dominant_catalysts=dominant_catalysts,
        material_risks=material_risks,
    )
    payload = _write_parquet_bytes(
        pa.Table.from_pylist([fact_row], schema=TRADE_SIGNALS_SCHEMA)
    )
    object_path = _partition_path("trade_signals", rec.generated_at)
    _put_lakehouse_object(client, "trade_signals", object_path, payload)
    uri = s3_uri(object_path)
    logger.info("Published trade_signal fact for %s: %s", rec.ticker, uri)
    return uri
# --- prediction_vs_outcome fact table (skeleton for Phase 10+) ---
# Arrow schema pairing a prediction with its (later backfilled) outcome.
PREDICTION_VS_OUTCOME_SCHEMA = pa.schema([
    pa.field("recommendation_id", pa.string()),
    pa.field("ticker", pa.string()),
    pa.field("predicted_action", pa.string()),
    pa.field("predicted_confidence", pa.float64()),
    pa.field("actual_move_pct", pa.float64()),
    pa.field("outcome", pa.string()),
    pa.field("horizon_days", pa.int32()),
    pa.field("predicted_at", pa.timestamp("us", tz="UTC")),
    pa.field("evaluated_at", pa.timestamp("us", tz="UTC")),
    pa.field("model_version", pa.string()),
    pa.field("dt", pa.date32()),
])
def publish_prediction_fact(
    client: Minio,
    rec: Recommendation,
    trend_direction: str = "",
    trend_strength: float = 0.0,
) -> str:
    """Write the prediction half of a prediction_vs_outcome fact.

    Outcome columns are placeholders: ``actual_move_pct`` and
    ``evaluated_at`` are null and ``outcome`` is ``"pending"`` until market
    outcomes are known and backfilled.

    ``trend_direction`` and ``trend_strength`` are accepted for signature
    parity with the other publishers but are not used here.

    Returns the s3:// URI of the written Parquet file.
    """
    # e.g. "swing_1d_10d" -> 10
    horizon_days = _parse_horizon_days(rec.time_horizon)

    # The model name doubles as the model_version partition value.
    if rec.model_metadata:
        model_ver = getattr(rec.model_metadata, "model_name", "")
    else:
        model_ver = ""
    extra = {"model_version": model_ver}

    prediction_row = {
        "recommendation_id": rec.recommendation_id,
        "ticker": rec.ticker,
        "predicted_action": rec.action.value,
        "predicted_confidence": rec.confidence,
        "actual_move_pct": None,
        "outcome": "pending",
        "horizon_days": horizon_days,
        "predicted_at": rec.generated_at,
        "evaluated_at": None,
    }
    prediction_row.update(partition_values(rec.generated_at, extra))

    payload = _write_parquet_bytes(
        pa.Table.from_pylist([prediction_row], schema=PREDICTION_VS_OUTCOME_SCHEMA)
    )
    object_path = _partition_path("prediction_vs_outcome", rec.generated_at, extra)
    _put_lakehouse_object(client, "prediction_vs_outcome", object_path, payload)
    uri = s3_uri(object_path)
    logger.info("Published prediction_vs_outcome fact for %s: %s", rec.ticker, uri)
    return uri
def _parse_horizon_days(time_horizon: str) -> int:
"""Extract the max horizon days from a time_horizon string.
Examples:
"swing_1d_10d" -> 10
"position_10d_30d" -> 30
"scalp_intraday" -> 1
"" -> 0
"""
if not time_horizon:
return 0
if "intraday" in time_horizon:
return 1
numbers = re.findall(r"(\d+)", time_horizon)
if numbers:
return max(int(n) for n in numbers)
return 0
def publish_recommendation_facts(
    client: Minio,
    rec: Recommendation,
    trend_direction: str = "",
    trend_strength: float = 0.0,
    contradiction_score: float = 0.0,
    dominant_catalysts: str = "",
    material_risks: str = "",
) -> dict[str, str]:
    """Publish every analytical fact derived from one recommendation.

    The trade_signals fact is written first, then the prediction_vs_outcome
    fact. Returns a mapping of table name to the s3:// URI written for it.
    """
    signal_uri = publish_trade_signal(
        client,
        rec,
        trend_direction,
        trend_strength,
        contradiction_score,
        dominant_catalysts,
        material_risks,
    )
    prediction_uri = publish_prediction_fact(
        client,
        rec,
        trend_direction,
        trend_strength,
    )
    return {
        "trade_signals": signal_uri,
        "prediction_vs_outcome": prediction_uri,
    }
# --- trade_orders fact table ---
# Arrow schema for submitted order facts (see build_trade_order_row).
TRADE_ORDERS_SCHEMA = pa.schema([
    pa.field("order_id", pa.string()),
    pa.field("recommendation_id", pa.string()),
    pa.field("ticker", pa.string()),
    pa.field("side", pa.string()),
    pa.field("order_type", pa.string()),
    pa.field("quantity", pa.float64()),
    pa.field("limit_price", pa.float64()),
    pa.field("status", pa.string()),
    pa.field("execution_mode", pa.string()),
    pa.field("broker_account", pa.string()),
    pa.field("submitted_at", pa.timestamp("us", tz="UTC")),
    pa.field("dt", pa.date32()),
])
def build_trade_order_row(
    order_id: str,
    ticker: str,
    side: str,
    order_type: str,
    quantity: float,
    limit_price: float | None,
    status: str,
    broker_account: str,
    submitted_at: datetime,
    recommendation_id: str = "",
    execution_mode: str = "paper",
) -> dict[str, object]:
    """Assemble one trade_orders row keyed by the schema's column names.

    ``limit_price`` may be None and is passed through unchanged. Partition
    columns derived from ``submitted_at`` are merged in last.
    """
    order_columns: dict[str, object] = {
        "order_id": order_id,
        "recommendation_id": recommendation_id,
        "ticker": ticker,
        "side": side,
        "order_type": order_type,
        "quantity": quantity,
        "limit_price": limit_price,
        "status": status,
        "execution_mode": execution_mode,
        "broker_account": broker_account,
        "submitted_at": submitted_at,
    }
    return {**order_columns, **partition_values(submitted_at)}
def publish_trade_order(
    client: Minio,
    order_id: str,
    ticker: str,
    side: str,
    order_type: str,
    quantity: float,
    limit_price: float | None,
    status: str,
    broker_account: str,
    submitted_at: datetime,
    recommendation_id: str = "",
    execution_mode: str = "paper",
) -> str:
    """Write one order to MinIO as a single-row trade_orders Parquet file.

    The object is partitioned by ``submitted_at``.
    Returns the s3:// URI of the written object.

    Requirements: 9.4, 9.5
    Design ref: Section 7 (lake.trade_orders)
    """
    order_row = build_trade_order_row(
        order_id=order_id,
        ticker=ticker,
        side=side,
        order_type=order_type,
        quantity=quantity,
        limit_price=limit_price,
        status=status,
        broker_account=broker_account,
        submitted_at=submitted_at,
        recommendation_id=recommendation_id,
        execution_mode=execution_mode,
    )
    payload = _write_parquet_bytes(
        pa.Table.from_pylist([order_row], schema=TRADE_ORDERS_SCHEMA)
    )
    object_path = _partition_path("trade_orders", submitted_at)
    _put_lakehouse_object(client, "trade_orders", object_path, payload)
    uri = s3_uri(object_path)
    logger.info("Published trade_order fact for %s: %s", ticker, uri)
    return uri
# --- trade_fills fact table ---
# Arrow schema for execution fill facts (see build_trade_fill_row).
TRADE_FILLS_SCHEMA = pa.schema([
    pa.field("fill_id", pa.string()),
    pa.field("order_id", pa.string()),
    pa.field("ticker", pa.string()),
    pa.field("side", pa.string()),
    pa.field("fill_price", pa.float64()),
    pa.field("fill_quantity", pa.float64()),
    pa.field("commission", pa.float64()),
    pa.field("broker_account", pa.string()),
    pa.field("filled_at", pa.timestamp("us", tz="UTC")),
    pa.field("dt", pa.date32()),
])
def build_trade_fill_row(
    fill_id: str,
    order_id: str,
    ticker: str,
    side: str,
    fill_price: float,
    fill_quantity: float,
    broker_account: str,
    filled_at: datetime,
    commission: float = 0.0,
) -> dict[str, object]:
    """Assemble one trade_fills row keyed by the schema's column names.

    Partition columns derived from ``filled_at`` are merged in last.
    """
    fill_row: dict[str, object] = {
        "fill_id": fill_id,
        "order_id": order_id,
        "ticker": ticker,
        "side": side,
        "fill_price": fill_price,
        "fill_quantity": fill_quantity,
        "commission": commission,
        "broker_account": broker_account,
        "filled_at": filled_at,
    }
    fill_row.update(partition_values(filled_at))
    return fill_row
def publish_trade_fill(
    client: Minio,
    fill_id: str,
    order_id: str,
    ticker: str,
    side: str,
    fill_price: float,
    fill_quantity: float,
    broker_account: str,
    filled_at: datetime,
    commission: float = 0.0,
) -> str:
    """Write one fill to MinIO as a single-row trade_fills Parquet file.

    The object is partitioned by ``filled_at``.
    Returns the s3:// URI of the written object.

    Requirements: 9.4, 9.5
    Design ref: Section 7 (lake.trade_fills)
    """
    fill_row = build_trade_fill_row(
        fill_id=fill_id,
        order_id=order_id,
        ticker=ticker,
        side=side,
        fill_price=fill_price,
        fill_quantity=fill_quantity,
        broker_account=broker_account,
        filled_at=filled_at,
        commission=commission,
    )
    payload = _write_parquet_bytes(
        pa.Table.from_pylist([fill_row], schema=TRADE_FILLS_SCHEMA)
    )
    object_path = _partition_path("trade_fills", filled_at)
    _put_lakehouse_object(client, "trade_fills", object_path, payload)
    uri = s3_uri(object_path)
    logger.info("Published trade_fill fact for %s: %s", ticker, uri)
    return uri
# --- positions_daily fact table ---
# Arrow schema for per-ticker position snapshot facts.
POSITIONS_DAILY_SCHEMA = pa.schema([
    pa.field("ticker", pa.string()),
    pa.field("quantity", pa.float64()),
    pa.field("avg_entry_price", pa.float64()),
    pa.field("close_price", pa.float64()),
    pa.field("market_value", pa.float64()),
    pa.field("unrealized_pnl", pa.float64()),
    pa.field("broker_account", pa.string()),
    pa.field("execution_mode", pa.string()),
    pa.field("snapshot_at", pa.timestamp("us", tz="UTC")),
    pa.field("dt", pa.date32()),
])
def build_position_daily_row(
    ticker: str,
    quantity: float,
    avg_entry_price: float,
    close_price: float,
    unrealized_pnl: float,
    broker_account: str,
    snapshot_at: datetime,
    market_value: float = 0.0,
    execution_mode: str = "paper",
) -> dict[str, object]:
    """Assemble one positions_daily row keyed by the schema's column names.

    ``market_value`` is stored exactly as given (default 0.0); it is not
    derived from quantity and close price here. Partition columns derived
    from ``snapshot_at`` are merged in last.
    """
    snapshot_row: dict[str, object] = {
        "ticker": ticker,
        "quantity": quantity,
        "avg_entry_price": avg_entry_price,
        "close_price": close_price,
        "market_value": market_value,
        "unrealized_pnl": unrealized_pnl,
        "broker_account": broker_account,
        "execution_mode": execution_mode,
        "snapshot_at": snapshot_at,
    }
    snapshot_row.update(partition_values(snapshot_at))
    return snapshot_row
def publish_position_daily(
    client: Minio,
    ticker: str,
    quantity: float,
    avg_entry_price: float,
    close_price: float,
    unrealized_pnl: float,
    broker_account: str,
    snapshot_at: datetime,
    market_value: float = 0.0,
    execution_mode: str = "paper",
) -> str:
    """Publish a single position snapshot as a positions_daily fact to MinIO.

    ``market_value`` and ``execution_mode`` are optional and forwarded to
    ``build_position_daily_row``. Previously this publisher could not set
    them, so every row was written with 0.0 and "paper". The defaults keep
    that behaviour for existing callers.

    Returns the s3:// URI of the written object.

    Requirements: 9.4, 9.5
    Design ref: Section 7 (lake.positions_daily)
    """
    row = build_position_daily_row(
        ticker=ticker,
        quantity=quantity,
        avg_entry_price=avg_entry_price,
        close_price=close_price,
        unrealized_pnl=unrealized_pnl,
        broker_account=broker_account,
        snapshot_at=snapshot_at,
        market_value=market_value,
        execution_mode=execution_mode,
    )
    table = pa.Table.from_pylist([row], schema=POSITIONS_DAILY_SCHEMA)
    parquet_bytes = _write_parquet_bytes(table)
    path = _partition_path("positions_daily", snapshot_at)
    _put_lakehouse_object(client, "positions_daily", path, parquet_bytes)
    ref = s3_uri(path)
    logger.info("Published positions_daily fact for %s: %s", ticker, ref)
    return ref
def publish_positions_daily_batch(
    client: Minio,
    positions: list[dict],
    broker_account: str,
    snapshot_at: datetime,
) -> str:
    """Publish a batch of position snapshots as a single Parquet file.

    Each dict in ``positions`` must have: ticker, quantity, avg_entry_price,
    close_price, unrealized_pnl. Two optional keys are also honoured:
    ``market_value`` (default 0.0) and ``execution_mode`` (default "paper").
    Previously these were dropped and the defaults were always written.

    The write goes through ``_publish_batch`` like the other batch
    publishers, so the published-facts metric counts one per row rather
    than one per file.

    Returns the s3:// URI of the written object, or "" when ``positions``
    is empty.
    """
    if not positions:
        logger.info("No positions to publish for positions_daily")
        return ""
    rows = [
        build_position_daily_row(
            ticker=p["ticker"],
            quantity=p["quantity"],
            avg_entry_price=p["avg_entry_price"],
            close_price=p["close_price"],
            unrealized_pnl=p["unrealized_pnl"],
            broker_account=broker_account,
            snapshot_at=snapshot_at,
            market_value=p.get("market_value", 0.0),
            execution_mode=p.get("execution_mode", "paper"),
        )
        for p in positions
    ]
    # Rows already carry their partition columns; _publish_batch only fills
    # in partition keys that are missing.
    return _publish_batch(
        client, "positions_daily", rows, POSITIONS_DAILY_SCHEMA, snapshot_at,
    )
# --- pnl_daily fact table ---
# Arrow schema for daily PnL facts; there is no timestamp column, only `dt`.
PNL_DAILY_SCHEMA = pa.schema([
    pa.field("ticker", pa.string()),
    pa.field("realized_pnl", pa.float64()),
    pa.field("unrealized_pnl", pa.float64()),
    pa.field("total_pnl", pa.float64()),
    pa.field("fees", pa.float64()),
    pa.field("net_pnl", pa.float64()),
    pa.field("broker_account", pa.string()),
    pa.field("execution_mode", pa.string()),
    pa.field("dt", pa.date32()),
])
def build_pnl_daily_row(
    ticker: str,
    realized_pnl: float,
    unrealized_pnl: float,
    total_pnl: float,
    broker_account: str,
    dt: datetime | None = None,
    fees: float = 0.0,
    net_pnl: float | None = None,
    execution_mode: str = "paper",
) -> dict[str, object]:
    """Assemble one pnl_daily row keyed by the schema's column names.

    When ``dt`` is not supplied the current UTC time is used for the
    partition columns. When ``net_pnl`` is None it is computed as
    ``total_pnl - fees``.
    """
    if not dt:
        effective_dt = datetime.now(timezone.utc)
    else:
        effective_dt = dt

    if net_pnl is None:
        effective_net = total_pnl - fees
    else:
        effective_net = net_pnl

    pnl_row: dict[str, object] = {
        "ticker": ticker,
        "realized_pnl": realized_pnl,
        "unrealized_pnl": unrealized_pnl,
        "total_pnl": total_pnl,
        "fees": fees,
        "net_pnl": effective_net,
        "broker_account": broker_account,
        "execution_mode": execution_mode,
    }
    pnl_row.update(partition_values(effective_dt))
    return pnl_row
def publish_pnl_daily(
    client: Minio,
    ticker: str,
    realized_pnl: float,
    unrealized_pnl: float,
    total_pnl: float,
    broker_account: str,
    dt: datetime,
    fees: float = 0.0,
    net_pnl: float | None = None,
    execution_mode: str = "paper",
) -> str:
    """Write one daily PnL record to MinIO as a single-row Parquet file.

    The object is partitioned by ``dt``.
    Returns the s3:// URI of the written object.

    Requirements: 9.4, 9.5
    Design ref: Section 7 (lake.pnl_daily)
    """
    pnl_row = build_pnl_daily_row(
        ticker,
        realized_pnl,
        unrealized_pnl,
        total_pnl,
        broker_account,
        dt=dt,
        fees=fees,
        net_pnl=net_pnl,
        execution_mode=execution_mode,
    )
    payload = _write_parquet_bytes(
        pa.Table.from_pylist([pnl_row], schema=PNL_DAILY_SCHEMA)
    )
    object_path = _partition_path("pnl_daily", dt)
    _put_lakehouse_object(client, "pnl_daily", object_path, payload)
    uri = s3_uri(object_path)
    logger.info("Published pnl_daily fact for %s: %s", ticker, uri)
    return uri
# --- market_bars publisher ---
def publish_market_bar(
    client: Minio,
    ticker: str,
    open_price: float,
    high_price: float,
    low_price: float,
    close_price: float,
    volume: int,
    bar_timestamp: datetime,
    source: str,
    vwap: float = 0.0,
    trade_count: int = 0,
    bar_interval: str = "1d",
) -> str:
    """Write one OHLCV bar to MinIO as a single-row market_bars Parquet file.

    The object is partitioned by ``bar_timestamp``.
    Returns the s3:// URI of the written object.

    Requirements: 2.1, 9.4, 9.5
    Design ref: Section 7 (lake.market_bars)
    """
    bar_row: dict[str, object] = {
        "ticker": ticker,
        "open_price": open_price,
        "high_price": high_price,
        "low_price": low_price,
        "close_price": close_price,
        "volume": volume,
        "vwap": vwap,
        "trade_count": trade_count,
        "bar_timestamp": bar_timestamp,
        "bar_interval": bar_interval,
        "source": source,
    }
    bar_row.update(partition_values(bar_timestamp))

    payload = _write_parquet_bytes(
        pa.Table.from_pylist([bar_row], schema=MARKET_BARS_SCHEMA)
    )
    object_path = _partition_path("market_bars", bar_timestamp)
    _put_lakehouse_object(client, "market_bars", object_path, payload)
    uri = s3_uri(object_path)
    logger.info("Published market_bar fact for %s: %s", ticker, uri)
    return uri
# --- market_quotes publisher ---
def publish_market_quote(
    client: Minio,
    ticker: str,
    bid_price: float,
    ask_price: float,
    last_price: float,
    quote_at: datetime,
    source: str,
    bid_size: int = 0,
    ask_size: int = 0,
    last_size: int = 0,
) -> str:
    """Write one quote to MinIO as a single-row market_quotes Parquet file.

    The object is partitioned by ``quote_at``.
    Returns the s3:// URI of the written object.

    Requirements: 2.1, 9.4, 9.5
    Design ref: Section 7 (lake.market_quotes)
    """
    quote_row: dict[str, object] = {
        "ticker": ticker,
        "bid_price": bid_price,
        "ask_price": ask_price,
        "bid_size": bid_size,
        "ask_size": ask_size,
        "last_price": last_price,
        "last_size": last_size,
        "source": source,
        "quote_at": quote_at,
    }
    quote_row.update(partition_values(quote_at))

    payload = _write_parquet_bytes(
        pa.Table.from_pylist([quote_row], schema=MARKET_QUOTES_SCHEMA)
    )
    object_path = _partition_path("market_quotes", quote_at)
    _put_lakehouse_object(client, "market_quotes", object_path, payload)
    uri = s3_uri(object_path)
    logger.info("Published market_quote fact for %s: %s", ticker, uri)
    return uri
# --- company_events publisher ---
def publish_company_event(
    client: Minio,
    event_id: str,
    ticker: str,
    event_type: str,
    title: str,
    event_at: datetime,
    source: str,
    event_subtype: str = "",
    description: str = "",
    source_url: str = "",
    ingested_at: datetime | None = None,
) -> str:
    """Write one company event to MinIO as a single-row Parquet file.

    ``ingested_at`` falls back to the current UTC time when not supplied.
    The object is partitioned by ``event_at`` (not by ingestion time).
    Returns the s3:// URI of the written object.

    Requirements: 2.3, 9.4, 9.5
    Design ref: Section 7 (lake.company_events)
    """
    if not ingested_at:
        effective_ingested_at = datetime.now(timezone.utc)
    else:
        effective_ingested_at = ingested_at

    event_row: dict[str, object] = {
        "event_id": event_id,
        "ticker": ticker,
        "event_type": event_type,
        "event_subtype": event_subtype,
        "title": title,
        "description": description,
        "source": source,
        "source_url": source_url,
        "event_at": event_at,
        "ingested_at": effective_ingested_at,
    }
    event_row.update(partition_values(event_at))

    payload = _write_parquet_bytes(
        pa.Table.from_pylist([event_row], schema=COMPANY_EVENTS_SCHEMA)
    )
    object_path = _partition_path("company_events", event_at)
    _put_lakehouse_object(client, "company_events", object_path, payload)
    uri = s3_uri(object_path)
    logger.info("Published company_event fact for %s: %s", ticker, uri)
    return uri
# --- documents publisher ---
def publish_document_fact(
    client: Minio,
    document_id: str,
    document_type: str,
    source_type: str,
    ticker: str,
    publisher: str,
    title: str,
    published_at: datetime,
    content_hash: str,
    url: str = "",
    canonical_url: str = "",
    language: str = "en",
    confidence: float = 0.0,
    retrieved_at: datetime | None = None,
) -> str:
    """Write one document's metadata to MinIO as a single-row Parquet file.

    ``retrieved_at`` falls back to the current UTC time when not supplied.
    The object is partitioned by ``published_at``.
    Returns the s3:// URI of the written object.

    Requirements: 3.1, 3.3, 9.4, 9.5
    Design ref: Section 6.2, Section 7 (lake.documents)
    """
    if not retrieved_at:
        effective_retrieved_at = datetime.now(timezone.utc)
    else:
        effective_retrieved_at = retrieved_at

    document_row: dict[str, object] = {
        "document_id": document_id,
        "document_type": document_type,
        "source_type": source_type,
        "ticker": ticker,
        "publisher": publisher,
        "title": title,
        "url": url,
        "canonical_url": canonical_url,
        "language": language,
        "published_at": published_at,
        "retrieved_at": effective_retrieved_at,
        "content_hash": content_hash,
        "confidence": confidence,
    }
    document_row.update(partition_values(published_at))

    payload = _write_parquet_bytes(
        pa.Table.from_pylist([document_row], schema=DOCUMENTS_SCHEMA)
    )
    object_path = _partition_path("documents", published_at)
    _put_lakehouse_object(client, "documents", object_path, payload)
    uri = s3_uri(object_path)
    logger.info("Published document fact for %s: %s", ticker, uri)
    return uri
# --- document_extractions publisher ---
def publish_document_extraction(
    client: Minio,
    document_id: str,
    ticker: str,
    sentiment: str,
    impact_score: float,
    catalyst_type: str,
    confidence: float,
    extraction_at: datetime,
    model_name: str,
    prompt_version: str,
    company_name: str = "",
    relevance: float = 0.0,
    impact_horizon: str = "",
    novelty_score: float = 0.0,
    source_credibility: float = 0.0,
    key_facts: str = "",
    risks: str = "",
    macro_themes: str = "",
    schema_version: str = "",
) -> str:
    """Write one document extraction to MinIO as a single-row Parquet file.

    The ``model_version`` partition value is ``schema_version`` when set,
    otherwise ``prompt_version``. The object is partitioned by
    ``extraction_at`` plus that ``model_version``.
    Returns the s3:// URI of the written object.

    Requirements: 5.3, 5.5, 9.4, 9.5
    Design ref: Section 6.3, Section 7 (lake.document_extractions)
    """
    # NOTE(review): unlike build_model_performance_row there is no final
    # fallback to model_name, so the partition value can be "" when both
    # versions are empty. Confirm whether that is intended.
    if schema_version:
        model_ver = schema_version
    else:
        model_ver = prompt_version
    extra = {"model_version": model_ver}

    extraction_row: dict[str, object] = {
        "document_id": document_id,
        "ticker": ticker,
        "company_name": company_name,
        "relevance": relevance,
        "sentiment": sentiment,
        "impact_score": impact_score,
        "impact_horizon": impact_horizon,
        "catalyst_type": catalyst_type,
        "confidence": confidence,
        "novelty_score": novelty_score,
        "source_credibility": source_credibility,
        "key_facts": key_facts,
        "risks": risks,
        "macro_themes": macro_themes,
        "model_name": model_name,
        "prompt_version": prompt_version,
        "schema_version": schema_version,
        "extraction_at": extraction_at,
    }
    extraction_row.update(partition_values(extraction_at, extra))

    payload = _write_parquet_bytes(
        pa.Table.from_pylist([extraction_row], schema=DOCUMENT_EXTRACTIONS_SCHEMA)
    )
    object_path = _partition_path(
        "document_extractions",
        extraction_at,
        extra_partitions=extra,
    )
    _put_lakehouse_object(client, "document_extractions", object_path, payload)
    uri = s3_uri(object_path)
    logger.info("Published document_extraction fact for %s/%s: %s", ticker, document_id, uri)
    return uri
# --- model_performance fact table ---
# Arrow schema for per-document model run metrics. Durations and counts are
# 32-bit integers; `model_version` is the extra partition column.
MODEL_PERFORMANCE_SCHEMA = pa.schema([
    pa.field("document_id", pa.string()),
    pa.field("ticker", pa.string()),
    pa.field("model_name", pa.string()),
    pa.field("prompt_version", pa.string()),
    pa.field("schema_version", pa.string()),
    pa.field("success", pa.bool_()),
    pa.field("attempt_count", pa.int32()),
    pa.field("total_duration_ms", pa.int32()),
    pa.field("first_attempt_duration_ms", pa.int32()),
    pa.field("final_attempt_duration_ms", pa.int32()),
    pa.field("confidence", pa.float64()),
    pa.field("validation_status", pa.string()),
    pa.field("validation_error_count", pa.int32()),
    pa.field("validation_warning_count", pa.int32()),
    pa.field("retry_count", pa.int32()),
    pa.field("input_token_estimate", pa.int32()),
    pa.field("output_token_estimate", pa.int32()),
    pa.field("company_count", pa.int32()),
    pa.field("recorded_at", pa.timestamp("us", tz="UTC")),
    pa.field("dt", pa.date32()),
    pa.field("model_version", pa.string()),
])
def build_model_performance_row(
    document_id: str,
    model_name: str,
    success: bool,
    total_duration_ms: int,
    recorded_at: datetime,
    ticker: str = "",
    prompt_version: str = "",
    schema_version: str = "",
    attempt_count: int = 1,
    first_attempt_duration_ms: int = 0,
    final_attempt_duration_ms: int = 0,
    confidence: float = 0.0,
    validation_status: str = "unknown",
    validation_error_count: int = 0,
    validation_warning_count: int = 0,
    retry_count: int = 0,
    input_token_estimate: int = 0,
    output_token_estimate: int = 0,
    company_count: int = 0,
) -> dict[str, object]:
    """Assemble one model_performance row keyed by the schema's column names.

    The ``model_version`` partition value is the first non-empty of
    ``schema_version``, ``prompt_version`` and ``model_name``. Partition
    columns derived from ``recorded_at`` are merged in last.
    """
    model_ver = schema_version or prompt_version or model_name
    metrics_row: dict[str, object] = {
        "document_id": document_id,
        "ticker": ticker,
        "model_name": model_name,
        "prompt_version": prompt_version,
        "schema_version": schema_version,
        "success": success,
        "attempt_count": attempt_count,
        "total_duration_ms": total_duration_ms,
        "first_attempt_duration_ms": first_attempt_duration_ms,
        "final_attempt_duration_ms": final_attempt_duration_ms,
        "confidence": confidence,
        "validation_status": validation_status,
        "validation_error_count": validation_error_count,
        "validation_warning_count": validation_warning_count,
        "retry_count": retry_count,
        "input_token_estimate": input_token_estimate,
        "output_token_estimate": output_token_estimate,
        "company_count": company_count,
        "recorded_at": recorded_at,
    }
    extra = {"model_version": model_ver}
    metrics_row.update(partition_values(recorded_at, extra))
    return metrics_row
def publish_model_performance(
    client: Minio,
    document_id: str,
    model_name: str,
    success: bool,
    total_duration_ms: int,
    recorded_at: datetime,
    ticker: str = "",
    prompt_version: str = "",
    schema_version: str = "",
    attempt_count: int = 1,
    first_attempt_duration_ms: int = 0,
    final_attempt_duration_ms: int = 0,
    confidence: float = 0.0,
    validation_status: str = "unknown",
    validation_error_count: int = 0,
    validation_warning_count: int = 0,
    retry_count: int = 0,
    input_token_estimate: int = 0,
    output_token_estimate: int = 0,
    company_count: int = 0,
) -> str:
    """Write one model run's metrics to MinIO as a single-row Parquet file.

    The object is partitioned by ``recorded_at`` plus a ``model_version``
    derived with the same fallback chain the row builder uses
    (schema_version, then prompt_version, then model_name).
    Returns the s3:// URI of the written object.

    Requirements: 12.1, 12.2, 9.4, 9.5
    Design ref: Section 7 (lake.model_performance)
    """
    metrics_row = build_model_performance_row(
        document_id=document_id,
        model_name=model_name,
        success=success,
        total_duration_ms=total_duration_ms,
        recorded_at=recorded_at,
        ticker=ticker,
        prompt_version=prompt_version,
        schema_version=schema_version,
        attempt_count=attempt_count,
        first_attempt_duration_ms=first_attempt_duration_ms,
        final_attempt_duration_ms=final_attempt_duration_ms,
        confidence=confidence,
        validation_status=validation_status,
        validation_error_count=validation_error_count,
        validation_warning_count=validation_warning_count,
        retry_count=retry_count,
        input_token_estimate=input_token_estimate,
        output_token_estimate=output_token_estimate,
        company_count=company_count,
    )
    # Must mirror the fallback in build_model_performance_row so the path's
    # partition matches the row's model_version column.
    extra = {"model_version": schema_version or prompt_version or model_name}

    payload = _write_parquet_bytes(
        pa.Table.from_pylist([metrics_row], schema=MODEL_PERFORMANCE_SCHEMA)
    )
    object_path = _partition_path(
        "model_performance",
        recorded_at,
        extra_partitions=extra,
    )
    _put_lakehouse_object(client, "model_performance", object_path, payload)
    uri = s3_uri(object_path)
    logger.info("Published model_performance fact for %s/%s: %s", model_name, document_id, uri)
    return uri
# ---------------------------------------------------------------------------
# Batch publish helpers
# ---------------------------------------------------------------------------
def _publish_batch(
    client: Minio,
    table_name: str,
    rows: list[dict[str, object]],
    schema: pa.Schema,
    dt: datetime,
    extra_partitions: dict[str, str] | None = None,
) -> str:
    """Write ``rows`` to MinIO as one Parquet file under ``table_name``.

    Partition columns computed from ``dt`` / ``extra_partitions`` are added
    only to rows that lack them; values already present in a row are kept.
    The published-facts metric is incremented by the number of rows.

    Returns the s3:// URI of the written object, or "" if ``rows`` is empty.
    """
    if not rows:
        logger.info("No rows to publish for %s", table_name)
        return ""

    # Fill in missing partition columns without overriding caller values.
    defaults = partition_values(dt, extra_partitions)
    enriched = [
        {**row, **{key: val for key, val in defaults.items() if key not in row}}
        for row in rows
    ]
    row_count = len(enriched)

    payload = _write_parquet_bytes(pa.Table.from_pylist(enriched, schema=schema))
    payload_size = len(payload)
    object_path = _partition_path(table_name, dt, extra_partitions)

    started = time.monotonic()
    client.put_object(
        LAKEHOUSE_BUCKET,
        object_path,
        io.BytesIO(payload),
        length=payload_size,
        content_type="application/octet-stream",
    )
    elapsed = time.monotonic() - started
    LAKE_PUBLISH_DURATION.labels(table_name=table_name).observe(elapsed)
    LAKE_FACTS_PUBLISHED.labels(table_name=table_name).inc(row_count)
    LAKE_PUBLISH_BYTES.labels(table_name=table_name).inc(payload_size)

    uri = s3_uri(object_path)
    logger.info("Published %d %s facts: %s", row_count, table_name, uri)
    return uri
def publish_market_bars_batch(
    client: Minio,
    bars: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Write many market bar rows as one Parquet file partitioned by ``dt``.

    Each dict should use MARKET_BARS_SCHEMA field names. Returns the s3://
    URI, or "" when ``bars`` is empty.
    """
    return _publish_batch(
        client,
        table_name="market_bars",
        rows=bars,
        schema=MARKET_BARS_SCHEMA,
        dt=dt,
    )
def publish_market_quotes_batch(
    client: Minio,
    quotes: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Write many market quote rows as one Parquet file partitioned by ``dt``.

    Returns the s3:// URI, or "" when ``quotes`` is empty.
    """
    return _publish_batch(
        client,
        table_name="market_quotes",
        rows=quotes,
        schema=MARKET_QUOTES_SCHEMA,
        dt=dt,
    )
def publish_company_events_batch(
    client: Minio,
    events: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Write many company event rows as one Parquet file partitioned by ``dt``.

    Returns the s3:// URI, or "" when ``events`` is empty.
    """
    return _publish_batch(
        client,
        table_name="company_events",
        rows=events,
        schema=COMPANY_EVENTS_SCHEMA,
        dt=dt,
    )
def publish_documents_batch(
    client: Minio,
    docs: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Write many document metadata rows as one Parquet file partitioned by ``dt``.

    Returns the s3:// URI, or "" when ``docs`` is empty.
    """
    return _publish_batch(
        client,
        table_name="documents",
        rows=docs,
        schema=DOCUMENTS_SCHEMA,
        dt=dt,
    )
def publish_document_extractions_batch(
    client: Minio,
    extractions: list[dict[str, object]],
    dt: datetime,
    model_version: str = "",
) -> str:
    """Write many document extraction rows as one Parquet file.

    A ``model_version`` extra partition is applied only when a non-empty
    value is given. Returns the s3:// URI, or "" when ``extractions`` is
    empty.
    """
    extra = None
    if model_version:
        extra = {"model_version": model_version}
    return _publish_batch(
        client,
        table_name="document_extractions",
        rows=extractions,
        schema=DOCUMENT_EXTRACTIONS_SCHEMA,
        dt=dt,
        extra_partitions=extra,
    )
def publish_trade_signals_batch(
    client: Minio,
    signals: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Write many trade signal rows as one Parquet file partitioned by ``dt``.

    Returns the s3:// URI, or "" when ``signals`` is empty.
    """
    return _publish_batch(
        client,
        table_name="trade_signals",
        rows=signals,
        schema=TRADE_SIGNALS_SCHEMA,
        dt=dt,
    )
def publish_trade_orders_batch(
    client: Minio,
    orders: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Write many trade order rows as one Parquet file partitioned by ``dt``.

    Returns the s3:// URI, or "" when ``orders`` is empty.
    """
    return _publish_batch(
        client,
        table_name="trade_orders",
        rows=orders,
        schema=TRADE_ORDERS_SCHEMA,
        dt=dt,
    )
def publish_trade_fills_batch(
    client: Minio,
    fills: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Write many trade fill rows as one Parquet file partitioned by ``dt``.

    Returns the s3:// URI, or "" when ``fills`` is empty.
    """
    return _publish_batch(
        client,
        table_name="trade_fills",
        rows=fills,
        schema=TRADE_FILLS_SCHEMA,
        dt=dt,
    )
def publish_pnl_daily_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Write many daily PnL rows as one Parquet file partitioned by ``dt``.

    Returns the s3:// URI, or "" when ``rows`` is empty.
    """
    return _publish_batch(
        client,
        table_name="pnl_daily",
        rows=rows,
        schema=PNL_DAILY_SCHEMA,
        dt=dt,
    )
def publish_model_performance_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
    model_version: str = "",
) -> str:
    """Write model performance rows to the lake as one Parquet file.

    Forwards ``rows`` to ``_publish_batch`` for the ``model_performance``
    table, typed by ``MODEL_PERFORMANCE_SCHEMA``.

    Args:
        client: MinIO client handed through to ``_publish_batch``.
        rows: Row dicts to publish.
        dt: Datetime handed through to ``_publish_batch``.
        model_version: When non-empty, passed on as a
            ``{"model_version": ...}`` mapping; otherwise ``None`` is passed.

    Returns:
        The string returned by ``_publish_batch``.
    """
    # An empty model version means "no extra mapping" rather than an empty value.
    if model_version:
        extra_partitions = {"model_version": model_version}
    else:
        extra_partitions = None
    return _publish_batch(
        client,
        "model_performance",
        rows,
        MODEL_PERFORMANCE_SCHEMA,
        dt,
        extra_partitions,
    )
def publish_prediction_vs_outcome_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Write prediction-vs-outcome rows to the lake as one Parquet file.

    Thin wrapper that forwards ``rows`` to ``_publish_batch`` for the
    ``prediction_vs_outcome`` table, typed by ``PREDICTION_VS_OUTCOME_SCHEMA``.

    Args:
        client: MinIO client handed through to ``_publish_batch``.
        rows: Row dicts to publish.
        dt: Datetime handed through to ``_publish_batch``.

    Returns:
        The string returned by ``_publish_batch``.
    """
    table_name = "prediction_vs_outcome"
    published_ref = _publish_batch(
        client, table_name, rows, PREDICTION_VS_OUTCOME_SCHEMA, dt
    )
    return published_ref
# --- global_events fact table ---
# Arrow column layout for global event facts. The list-like attributes
# (event_types, affected_*) are plain string columns; the single-fact
# publisher in this module writes them as ", "-joined strings.
GLOBAL_EVENTS_SCHEMA = pa.schema(
    [
        pa.field("event_id", pa.string()),
        pa.field("event_types", pa.string()),
        pa.field("severity", pa.string()),
        pa.field("affected_regions", pa.string()),
        pa.field("affected_sectors", pa.string()),
        pa.field("affected_commodities", pa.string()),
        pa.field("summary", pa.string()),
        pa.field("estimated_duration", pa.string()),
        pa.field("confidence", pa.float64()),
        pa.field("source_document_id", pa.string()),
        pa.field("created_at", pa.timestamp("us", tz="UTC")),
        pa.field("dt", pa.date32()),
    ]
)
def publish_global_event_fact(
    client: Minio,
    event_id: str,
    event_types: list[str],
    severity: str,
    affected_regions: list[str],
    affected_sectors: list[str],
    affected_commodities: list[str],
    summary: str,
    estimated_duration: str,
    confidence: float,
    source_document_id: str,
    created_at: datetime,
) -> str:
    """Write one global event as a single-row Parquet fact on MinIO.

    Target object:
        s3://stonks-lakehouse/warehouse/global_events/dt={date}/part-{uuid}.parquet

    The list-valued arguments are flattened into ", "-separated strings
    because ``GLOBAL_EVENTS_SCHEMA`` stores them as string columns.

    Returns:
        The s3:// URI of the object that was written.

    Requirements: 7.3, 12.6
    Design ref: Analytical Lake Datasets (lake.global_events)
    """
    # Flatten the list arguments first, in the same order as the columns.
    joined_types = ", ".join(event_types)
    joined_regions = ", ".join(affected_regions)
    joined_sectors = ", ".join(affected_sectors)
    joined_commodities = ", ".join(affected_commodities)

    record: dict[str, object] = {
        "event_id": event_id,
        "event_types": joined_types,
        "severity": severity,
        "affected_regions": joined_regions,
        "affected_sectors": joined_sectors,
        "affected_commodities": joined_commodities,
        "summary": summary,
        "estimated_duration": estimated_duration,
        "confidence": confidence,
        "source_document_id": source_document_id,
        "created_at": created_at,
    }
    # Partition columns are merged last so they take precedence on key clashes.
    record.update(partition_values(created_at))

    fact_table = pa.Table.from_pylist([record], schema=GLOBAL_EVENTS_SCHEMA)
    payload = _write_parquet_bytes(fact_table)
    object_path = _partition_path("global_events", created_at)
    _put_lakehouse_object(client, "global_events", object_path, payload)

    uri = s3_uri(object_path)
    logger.info("Published global_event fact %s: %s", event_id, uri)
    return uri
# --- macro_impacts fact table ---
# Arrow column layout for macro impact facts. contributing_factors is a
# string column; the single-fact publisher in this module writes it as a
# ", "-joined string.
MACRO_IMPACTS_SCHEMA = pa.schema(
    [
        pa.field("event_id", pa.string()),
        pa.field("company_id", pa.string()),
        pa.field("ticker", pa.string()),
        pa.field("macro_impact_score", pa.float64()),
        pa.field("impact_direction", pa.string()),
        pa.field("contributing_factors", pa.string()),
        pa.field("confidence", pa.float64()),
        pa.field("computed_at", pa.timestamp("us", tz="UTC")),
        pa.field("dt", pa.date32()),
    ]
)
def publish_macro_impact_fact(
    client: Minio,
    event_id: str,
    company_id: str,
    ticker: str,
    macro_impact_score: float,
    impact_direction: str,
    contributing_factors: list[str],
    confidence: float,
    computed_at: datetime,
) -> str:
    """Write one macro impact as a single-row Parquet fact on MinIO.

    Target object:
        s3://stonks-lakehouse/warehouse/macro_impacts/dt={date}/ticker={ticker}/part-{uuid}.parquet

    ``contributing_factors`` is flattened into a ", "-separated string
    because ``MACRO_IMPACTS_SCHEMA`` stores it as a string column.

    Returns:
        The s3:// URI of the object that was written.

    Requirements: 7.3, 12.6
    Design ref: Analytical Lake Datasets (lake.macro_impacts)
    """
    # The same mapping feeds both the row's partition columns and the path.
    ticker_partition = {"ticker": ticker}
    joined_factors = ", ".join(contributing_factors)

    record: dict[str, object] = {
        "event_id": event_id,
        "company_id": company_id,
        "ticker": ticker,
        "macro_impact_score": macro_impact_score,
        "impact_direction": impact_direction,
        "contributing_factors": joined_factors,
        "confidence": confidence,
        "computed_at": computed_at,
    }
    # Partition columns are merged last so they take precedence on key clashes.
    record.update(partition_values(computed_at, ticker_partition))

    fact_table = pa.Table.from_pylist([record], schema=MACRO_IMPACTS_SCHEMA)
    payload = _write_parquet_bytes(fact_table)
    object_path = _partition_path(
        "macro_impacts", computed_at, extra_partitions=ticker_partition
    )
    _put_lakehouse_object(client, "macro_impacts", object_path, payload)

    uri = s3_uri(object_path)
    logger.info("Published macro_impact fact for %s/%s: %s", ticker, event_id, uri)
    return uri
# --- trend_projections fact table ---
# Arrow column layout for trend projection facts. driving_factors is a
# string column; the single-fact publisher in this module writes it as a
# ", "-joined string.
TREND_PROJECTIONS_SCHEMA = pa.schema(
    [
        pa.field("trend_window_id", pa.string()),
        pa.field("ticker", pa.string()),
        pa.field("projected_direction", pa.string()),
        pa.field("projected_strength", pa.float64()),
        pa.field("projected_confidence", pa.float64()),
        pa.field("projection_horizon", pa.string()),
        pa.field("driving_factors", pa.string()),
        pa.field("macro_contribution_pct", pa.float64()),
        pa.field("diverges_from_current", pa.bool_()),
        pa.field("computed_at", pa.timestamp("us", tz="UTC")),
        pa.field("dt", pa.date32()),
    ]
)
def publish_trend_projection_fact(
    client: Minio,
    trend_window_id: str,
    ticker: str,
    projected_direction: str,
    projected_strength: float,
    projected_confidence: float,
    projection_horizon: str,
    driving_factors: list[str],
    macro_contribution_pct: float,
    diverges_from_current: bool,
    computed_at: datetime,
) -> str:
    """Write one trend projection as a single-row Parquet fact on MinIO.

    Target object:
        s3://stonks-lakehouse/warehouse/trend_projections/dt={date}/ticker={ticker}/part-{uuid}.parquet

    ``driving_factors`` is flattened into a ", "-separated string because
    ``TREND_PROJECTIONS_SCHEMA`` stores it as a string column.

    Returns:
        The s3:// URI of the object that was written.

    Requirements: 7.3, 12.6
    Design ref: Analytical Lake Datasets (lake.trend_projections)
    """
    # The same mapping feeds both the row's partition columns and the path.
    ticker_partition = {"ticker": ticker}
    joined_factors = ", ".join(driving_factors)

    record: dict[str, object] = {
        "trend_window_id": trend_window_id,
        "ticker": ticker,
        "projected_direction": projected_direction,
        "projected_strength": projected_strength,
        "projected_confidence": projected_confidence,
        "projection_horizon": projection_horizon,
        "driving_factors": joined_factors,
        "macro_contribution_pct": macro_contribution_pct,
        "diverges_from_current": diverges_from_current,
        "computed_at": computed_at,
    }
    # Partition columns are merged last so they take precedence on key clashes.
    record.update(partition_values(computed_at, ticker_partition))

    fact_table = pa.Table.from_pylist([record], schema=TREND_PROJECTIONS_SCHEMA)
    payload = _write_parquet_bytes(fact_table)
    object_path = _partition_path(
        "trend_projections", computed_at, extra_partitions=ticker_partition
    )
    _put_lakehouse_object(client, "trend_projections", object_path, payload)

    uri = s3_uri(object_path)
    logger.info("Published trend_projection fact for %s: %s", ticker, uri)
    return uri
# --- Batch publishers for macro fact tables ---
def publish_global_events_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Write global event rows to the lake as one Parquet file.

    Thin wrapper that forwards ``rows`` to ``_publish_batch`` for the
    ``global_events`` table, typed by ``GLOBAL_EVENTS_SCHEMA``.

    Args:
        client: MinIO client handed through to ``_publish_batch``.
        rows: Row dicts to publish.
        dt: Datetime handed through to ``_publish_batch``.

    Returns:
        The string returned by ``_publish_batch``.
    """
    table_name = "global_events"
    published_ref = _publish_batch(client, table_name, rows, GLOBAL_EVENTS_SCHEMA, dt)
    return published_ref
def publish_macro_impacts_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
    ticker: str = "",
) -> str:
    """Write macro impact rows to the lake as one Parquet file.

    Forwards ``rows`` to ``_publish_batch`` for the ``macro_impacts`` table,
    typed by ``MACRO_IMPACTS_SCHEMA``.

    Args:
        client: MinIO client handed through to ``_publish_batch``.
        rows: Row dicts to publish.
        dt: Datetime handed through to ``_publish_batch``.
        ticker: When non-empty, passed on as a ``{"ticker": ...}`` mapping;
            otherwise ``None`` is passed.

    Returns:
        The string returned by ``_publish_batch``.
    """
    # An empty ticker means "no extra mapping" rather than an empty value.
    if ticker:
        extra_partitions = {"ticker": ticker}
    else:
        extra_partitions = None
    return _publish_batch(
        client,
        "macro_impacts",
        rows,
        MACRO_IMPACTS_SCHEMA,
        dt,
        extra_partitions,
    )
def publish_trend_projections_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
    ticker: str = "",
) -> str:
    """Write trend projection rows to the lake as one Parquet file.

    Forwards ``rows`` to ``_publish_batch`` for the ``trend_projections``
    table, typed by ``TREND_PROJECTIONS_SCHEMA``.

    Args:
        client: MinIO client handed through to ``_publish_batch``.
        rows: Row dicts to publish.
        dt: Datetime handed through to ``_publish_batch``.
        ticker: When non-empty, passed on as a ``{"ticker": ...}`` mapping;
            otherwise ``None`` is passed.

    Returns:
        The string returned by ``_publish_batch``.
    """
    # An empty ticker means "no extra mapping" rather than an empty value.
    if ticker:
        extra_partitions = {"ticker": ticker}
    else:
        extra_partitions = None
    return _publish_batch(
        client,
        "trend_projections",
        rows,
        TREND_PROJECTIONS_SCHEMA,
        dt,
        extra_partitions,
    )
# --- competitor_relationships fact table ---
# Arrow column layout for competitor relationship facts: one row per
# relationship between company_a_id and company_b_id.
COMPETITOR_RELATIONSHIPS_SCHEMA = pa.schema(
    [
        pa.field("id", pa.string()),
        pa.field("company_a_id", pa.string()),
        pa.field("company_b_id", pa.string()),
        pa.field("relationship_type", pa.string()),
        pa.field("strength", pa.float64()),
        pa.field("bidirectional", pa.bool_()),
        pa.field("source", pa.string()),
        pa.field("active", pa.bool_()),
        pa.field("created_at", pa.timestamp("us", tz="UTC")),
        pa.field("dt", pa.date32()),
    ]
)
def publish_competitor_relationship_fact(
    client: Minio,
    relationship_id: str,
    company_a_id: str,
    company_b_id: str,
    relationship_type: str,
    strength: float,
    bidirectional: bool,
    source: str,
    active: bool,
    created_at: datetime,
) -> str:
    """Write one competitor relationship as a single-row Parquet fact on MinIO.

    Target object:
        s3://stonks-lakehouse/warehouse/competitor_relationships/dt={date}/part-{uuid}.parquet

    ``relationship_id`` is stored in the schema's ``id`` column.

    Returns:
        The s3:// URI of the object that was written.

    Requirements: 7.3
    Design ref: Analytical Lake Datasets (lake.competitor_relationships)
    """
    record: dict[str, object] = dict(
        id=relationship_id,
        company_a_id=company_a_id,
        company_b_id=company_b_id,
        relationship_type=relationship_type,
        strength=strength,
        bidirectional=bidirectional,
        source=source,
        active=active,
        created_at=created_at,
    )
    # Partition columns are merged last so they take precedence on key clashes.
    record.update(partition_values(created_at))

    fact_table = pa.Table.from_pylist(
        [record], schema=COMPETITOR_RELATIONSHIPS_SCHEMA
    )
    payload = _write_parquet_bytes(fact_table)
    object_path = _partition_path("competitor_relationships", created_at)
    _put_lakehouse_object(client, "competitor_relationships", object_path, payload)

    uri = s3_uri(object_path)
    logger.info("Published competitor_relationship fact %s: %s", relationship_id, uri)
    return uri
def publish_competitor_relationships_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
) -> str:
    """Write competitor relationship rows to the lake as one Parquet file.

    Thin wrapper that forwards ``rows`` to ``_publish_batch`` for the
    ``competitor_relationships`` table, typed by
    ``COMPETITOR_RELATIONSHIPS_SCHEMA``.

    Args:
        client: MinIO client handed through to ``_publish_batch``.
        rows: Row dicts to publish.
        dt: Datetime handed through to ``_publish_batch``.

    Returns:
        The string returned by ``_publish_batch``.
    """
    table_name = "competitor_relationships"
    published_ref = _publish_batch(
        client, table_name, rows, COMPETITOR_RELATIONSHIPS_SCHEMA, dt
    )
    return published_ref
# --- competitive_signals fact table ---
# Arrow column layout for competitive signal facts: one row per signal from
# source_ticker to target_ticker.
COMPETITIVE_SIGNALS_SCHEMA = pa.schema(
    [
        pa.field("id", pa.string()),
        pa.field("source_document_id", pa.string()),
        pa.field("source_ticker", pa.string()),
        pa.field("target_ticker", pa.string()),
        pa.field("catalyst_type", pa.string()),
        pa.field("pattern_confidence", pa.float64()),
        pa.field("signal_direction", pa.string()),
        pa.field("signal_strength", pa.float64()),
        pa.field("relationship_strength", pa.float64()),
        pa.field("computed_at", pa.timestamp("us", tz="UTC")),
        pa.field("dt", pa.date32()),
    ]
)
def publish_competitive_signal_fact(
    client: Minio,
    signal_id: str,
    source_document_id: str,
    source_ticker: str,
    target_ticker: str,
    catalyst_type: str,
    pattern_confidence: float,
    signal_direction: str,
    signal_strength: float,
    relationship_strength: float,
    computed_at: datetime,
) -> str:
    """Publish a single competitive signal fact to MinIO.

    Writes a Parquet file to:
        s3://stonks-lakehouse/warehouse/competitive_signals/dt={date}/target_ticker={ticker}/part-{uuid}.parquet

    ``signal_id`` is stored in the schema's ``id`` column, and the object is
    partitioned by ``target_ticker`` in addition to the date.

    Returns:
        The s3:// URI of the written object.

    Requirements: 7.4
    Design ref: Analytical Lake Datasets (lake.competitive_signals)
    """
    # The same mapping feeds both the row's partition columns and the path.
    extra = {"target_ticker": target_ticker}
    row: dict[str, object] = {
        "id": signal_id,
        "source_document_id": source_document_id,
        "source_ticker": source_ticker,
        "target_ticker": target_ticker,
        "catalyst_type": catalyst_type,
        "pattern_confidence": pattern_confidence,
        "signal_direction": signal_direction,
        "signal_strength": signal_strength,
        "relationship_strength": relationship_strength,
        "computed_at": computed_at,
        **partition_values(computed_at, extra),
    }
    table = pa.Table.from_pylist([row], schema=COMPETITIVE_SIGNALS_SCHEMA)
    parquet_bytes = _write_parquet_bytes(table)
    path = _partition_path("competitive_signals", computed_at, extra_partitions=extra)
    _put_lakehouse_object(client, "competitive_signals", path, parquet_bytes)
    ref = s3_uri(path)
    # Separate the two tickers explicitly; without a delimiter the log line
    # reads as one fused symbol (e.g. "AAPLMSFT") and cannot be parsed back.
    logger.info(
        "Published competitive_signal fact for %s->%s: %s",
        source_ticker,
        target_ticker,
        ref,
    )
    return ref
def publish_competitive_signals_batch(
    client: Minio,
    rows: list[dict[str, object]],
    dt: datetime,
    target_ticker: str = "",
) -> str:
    """Write competitive signal rows to the lake as one Parquet file.

    Forwards ``rows`` to ``_publish_batch`` for the ``competitive_signals``
    table, typed by ``COMPETITIVE_SIGNALS_SCHEMA``.

    Args:
        client: MinIO client handed through to ``_publish_batch``.
        rows: Row dicts to publish.
        dt: Datetime handed through to ``_publish_batch``.
        target_ticker: When non-empty, passed on as a
            ``{"target_ticker": ...}`` mapping; otherwise ``None`` is passed.

    Returns:
        The string returned by ``_publish_batch``.
    """
    # An empty ticker means "no extra mapping" rather than an empty value.
    if target_ticker:
        extra_partitions = {"target_ticker": target_ticker}
    else:
        extra_partitions = None
    return _publish_batch(
        client,
        "competitive_signals",
        rows,
        COMPETITIVE_SIGNALS_SCHEMA,
        dt,
        extra_partitions,
    )