Files
stonks-oracle/services/lake_publisher/jobs.py
T
2026-04-14 19:48:19 +00:00

914 lines
31 KiB
Python

"""Lake publisher async job runner — transforms operational data into analytical facts.
Reads jobs from the QUEUE_LAKE_PUBLISH Redis queue, queries PostgreSQL for
operational records, and publishes them as partitioned Parquet files to MinIO
via the existing publish_* functions in worker.py.
Job message format:
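{"job_type": "<job type>", "entity_id": "<uuid>", "since": "2026-04-11T..."}
"entity_id" is cast to uuid by every lookup query. "since" (an ISO-8601 cutoff)
is read only by the bulk_* job types, which ignore entity_id.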
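Supported job types (the id in parentheses is what entity_id must hold):
- document: publish a single document metadata fact (documents.id)
- document_extraction: publish valid extraction facts for a document (document id)
- market_snapshot: publish market bars/quotes from a snapshot (market_snapshots.id)
- trade_order: publish an order fact (orders.id)
- trade_fill: publish fill facts for an order (orders.id)
- positions_snapshot: publish a daily snapshot of a broker account's non-zero positions (broker_accounts.id)
- pnl_snapshot: publish daily PnL for a broker account's non-zero positions (broker_accounts.id)
- bulk_documents: publish up to 500 parsed/extracted documents created since the cutoff
- bulk_extractions: publish up to 500 valid extractions created since the cutoff
- global_event: publish a global event fact (global_events.id)
- macro_impact: publish macro impact facts for a global event (global_events.id)
- trend_projection: publish the projection for a trend window (trend_windows.id)
- competitor_relationship: publish a competitor relationship fact (competitor_relationships.id)
- competitive_signal: publish competitive signal facts for a source document (document id)
- company_event: listed in JOB_TYPES but has no handler yet; such jobs end with an error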
Requirements: 9.4, 9.5, 10.1
Design ref: Section 4.10 (Lake Publisher), Section 8.4 (Lake publication flow)
"""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime, timezone
import asyncpg
import redis.asyncio as aioredis
from minio import Minio
from services.lake_publisher.partitions import partition_values
from services.lake_publisher.worker import (
publish_competitive_signal_fact,
publish_competitor_relationship_fact,
publish_document_extraction,
publish_document_extractions_batch,
publish_document_fact,
publish_documents_batch,
publish_global_event_fact,
publish_macro_impact_fact,
publish_market_bar,
publish_market_quote,
publish_pnl_daily,
publish_positions_daily_batch,
publish_trade_fill,
publish_trade_order,
publish_trend_projection_fact,
)
from services.shared.config import load_config
from services.shared.db import get_minio, get_pg_pool, get_redis
from services.shared.logging import setup_logging
from services.shared.redis_keys import QUEUE_LAKE_PUBLISH, queue_key
# Module-level logger; the job handlers and worker loop below report skips,
# empty results and failures through it.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# SQL queries for fetching operational data
# ---------------------------------------------------------------------------
# One document's metadata by id ($1 = documents.id). `ticker` is taken from a
# document_company_mentions row for the document, or '' when there is none.
# NOTE(review): the ticker subquery uses LIMIT 1 without ORDER BY, so when a
# document mentions several companies which ticker is chosen is not defined.
_FETCH_DOCUMENT = """
SELECT
d.id, d.document_type, d.source_type, d.publisher, d.title,
d.url, d.canonical_url, d.language, d.published_at, d.retrieved_at,
d.content_hash, d.parse_quality_score,
COALESCE(
(SELECT dcm.ticker FROM document_company_mentions dcm
WHERE dcm.document_id = d.id LIMIT 1),
''
) AS ticker
FROM documents d
WHERE d.id = $1::uuid
"""
# Per-company impact rows for one document ($1 = document id), restricted to
# intelligence records whose validation_status is 'valid'.
_FETCH_EXTRACTIONS = """
SELECT
di.document_id, dir.ticker, dir.relevance, dir.sentiment,
dir.impact_score, dir.impact_horizon, dir.catalyst_type,
di.confidence, di.novelty_score, di.source_credibility,
dir.key_facts, dir.risks, di.macro_themes,
di.model_name, di.prompt_version, di.schema_version,
di.created_at AS extraction_at,
COALESCE(c.legal_name, '') AS company_name
FROM document_intelligence di
JOIN document_impact_records dir ON dir.intelligence_id = di.id
LEFT JOIN companies c ON c.id = dir.company_id
WHERE di.document_id = $1::uuid
AND di.validation_status = 'valid'
"""
# One market_snapshots row by id ($1); its `data` payload is decoded and
# interpreted according to `snapshot_type` by the market snapshot job.
_FETCH_MARKET_SNAPSHOT = """
SELECT
ms.ticker, ms.snapshot_type, ms.data, ms.source_provider, ms.captured_at
FROM market_snapshots ms
WHERE ms.id = $1::uuid
"""
# One order by id ($1) plus its broker account id and execution mode; the
# mode falls back to 'paper' when no broker_accounts row joins.
_FETCH_ORDER = """
SELECT
o.id, o.recommendation_id, o.ticker, o.side, o.order_type,
o.quantity, o.limit_price, o.status, o.submitted_at,
o.fill_price, o.fill_quantity, o.filled_at,
COALESCE(ba.account_id, '') AS broker_account,
COALESCE(ba.mode, 'paper') AS execution_mode
FROM orders o
LEFT JOIN broker_accounts ba ON ba.id = o.broker_account_id
WHERE o.id = $1::uuid
"""
# All order_events of type 'fill' for one order ($1 = orders.id); ticker and
# side come from the parent order.
_FETCH_ORDER_FILLS = """
SELECT
oe.id AS fill_id, oe.order_id, oe.data, oe.broker_timestamp,
o.ticker, o.side,
COALESCE(ba.account_id, '') AS broker_account
FROM order_events oe
JOIN orders o ON o.id = oe.order_id
LEFT JOIN broker_accounts ba ON ba.id = o.broker_account_id
WHERE oe.order_id = $1::uuid AND oe.event_type = 'fill'
"""
# Positions of one broker account ($1 = broker_accounts.id) with a non-zero
# quantity. Used by both the positions snapshot and the PnL job, so positions
# whose quantity is 0 are excluded from both outputs.
_FETCH_POSITIONS = """
SELECT
p.ticker, p.quantity, p.avg_entry_price, p.current_price,
p.unrealized_pnl, p.realized_pnl,
COALESCE(ba.account_id, '') AS broker_account,
COALESCE(ba.mode, 'paper') AS execution_mode
FROM positions p
LEFT JOIN broker_accounts ba ON ba.id = p.broker_account_id
WHERE p.broker_account_id = $1::uuid AND p.quantity != 0
"""
# Documents created at or after $1 with status 'parsed' or 'extracted',
# oldest first and capped at 500 rows; rows beyond the cap are not returned
# by a single call.
_FETCH_BULK_DOCUMENTS = """
SELECT
d.id, d.document_type, d.source_type, d.publisher, d.title,
d.url, d.canonical_url, d.language, d.published_at, d.retrieved_at,
d.content_hash, d.parse_quality_score,
COALESCE(
(SELECT dcm.ticker FROM document_company_mentions dcm
WHERE dcm.document_id = d.id LIMIT 1),
''
) AS ticker
FROM documents d
WHERE d.created_at >= $1
AND d.status IN ('parsed', 'extracted')
ORDER BY d.created_at
LIMIT 500
"""
# Valid extraction impact rows whose intelligence record was created at or
# after $1, oldest first and capped at 500 rows per call.
_FETCH_BULK_EXTRACTIONS = """
SELECT
di.document_id, dir.ticker, dir.relevance, dir.sentiment,
dir.impact_score, dir.impact_horizon, dir.catalyst_type,
di.confidence, di.novelty_score, di.source_credibility,
dir.key_facts, dir.risks, di.macro_themes,
di.model_name, di.prompt_version, di.schema_version,
di.created_at AS extraction_at,
COALESCE(c.legal_name, '') AS company_name
FROM document_intelligence di
JOIN document_impact_records dir ON dir.intelligence_id = di.id
LEFT JOIN companies c ON c.id = dir.company_id
WHERE di.created_at >= $1
AND di.validation_status = 'valid'
ORDER BY di.created_at
LIMIT 500
"""
# One global_events row by id ($1).
_FETCH_GLOBAL_EVENT = """
SELECT
ge.id, ge.event_types, ge.severity, ge.affected_regions,
ge.affected_sectors, ge.affected_commodities, ge.summary,
ge.estimated_duration, ge.confidence, ge.source_document_id,
ge.created_at
FROM global_events ge
WHERE ge.id = $1::uuid
"""
# Every macro_impact_records row attached to one global event ($1 = event id).
_FETCH_MACRO_IMPACTS_FOR_EVENT = """
SELECT
mir.event_id, mir.company_id, mir.ticker,
mir.macro_impact_score, mir.impact_direction,
mir.contributing_factors, mir.confidence, mir.computed_at
FROM macro_impact_records mir
WHERE mir.event_id = $1::uuid
"""
_FETCH_TREND_PROJECTION = """
SELECT
tp.id, tp.trend_window_id, tp.projected_direction,
tp.projected_strength, tp.projected_confidence,
tp.projection_horizon, tp.driving_factors,
tp.macro_contribution_pct, tp.diverges_from_current,
tp.computed_at,
tw.ticker
FROM trend_projections tp
JOIN trend_windows tw ON tw.id = tp.trend_window_id
WHERE tp.trend_window_id = $1::uuid
"""
# One competitor_relationships row by id ($1).
_FETCH_COMPETITOR_RELATIONSHIP = """
SELECT
cr.id, cr.company_a_id, cr.company_b_id,
cr.relationship_type, cr.strength, cr.bidirectional,
cr.source, cr.active, cr.created_at
FROM competitor_relationships cr
WHERE cr.id = $1::uuid
"""
# Every competitive signal derived from one source document ($1 = document id).
_FETCH_COMPETITIVE_SIGNALS_FOR_DOCUMENT = """
SELECT
csr.id, csr.source_document_id, csr.source_ticker,
csr.target_ticker, csr.catalyst_type, csr.pattern_confidence,
csr.signal_direction, csr.signal_strength,
csr.relationship_strength, csr.computed_at
FROM competitive_signal_records csr
WHERE csr.source_document_id = $1::uuid
"""
# ---------------------------------------------------------------------------
# Job handlers — each transforms operational rows into lake facts
# ---------------------------------------------------------------------------
def _jsonb_to_str(val: object) -> str:
"""Convert a JSONB column value (list or str) to a comma-separated string."""
if val is None:
return ""
if isinstance(val, str):
try:
parsed = json.loads(val)
if isinstance(parsed, list):
return ", ".join(str(x) for x in parsed)
return val
except (json.JSONDecodeError, TypeError):
return val
if isinstance(val, list):
return ", ".join(str(x) for x in val)
return str(val)
async def publish_document_job(
    pool: asyncpg.Pool,
    minio_client: Minio,
    entity_id: str,
) -> str:
    """Write one document's metadata fact to the lake.

    Args:
        pool: PostgreSQL connection pool.
        minio_client: MinIO client used to write the Parquet fact.
        entity_id: ``documents.id`` of the document to publish.

    Returns:
        The reference produced by ``publish_document_fact``, or ``""`` (after
        a warning) when the document does not exist.
    """
    doc = await pool.fetchrow(_FETCH_DOCUMENT, entity_id)
    if doc is None:
        logger.warning("Document %s not found, skipping lake publish", entity_id)
        return ""

    retrieved = doc["retrieved_at"]
    # Without a source publish date, the retrieval time stands in for it.
    effective_published = doc["published_at"] or retrieved

    return publish_document_fact(
        client=minio_client,
        document_id=str(doc["id"]),
        document_type=doc["document_type"],
        source_type=doc["source_type"],
        ticker=doc["ticker"] or "",
        publisher=doc["publisher"] or "",
        title=doc["title"] or "",
        published_at=effective_published,
        content_hash=doc["content_hash"],
        url=doc["url"] or "",
        canonical_url=doc["canonical_url"] or "",
        language=doc["language"] or "en",
        confidence=float(doc["parse_quality_score"] or 0.0),
        retrieved_at=retrieved,
    )
async def publish_extraction_job(
    pool: asyncpg.Pool,
    minio_client: Minio,
    entity_id: str,
) -> list[str]:
    """Write one extraction fact per valid impact record of a document.

    Args:
        pool: PostgreSQL connection pool.
        minio_client: MinIO client used to write the Parquet facts.
        entity_id: id of the document whose extractions are published.

    Returns:
        One reference per published fact, in query order; ``[]`` (after an
        info log) when the document has no valid extractions.
    """
    records = await pool.fetch(_FETCH_EXTRACTIONS, entity_id)
    if not records:
        logger.info("No valid extractions for document %s", entity_id)
        return []

    # NULL scores/labels are replaced with neutral defaults; JSONB list
    # columns are flattened to comma-separated strings.
    return [
        publish_document_extraction(
            client=minio_client,
            document_id=str(rec["document_id"]),
            ticker=rec["ticker"],
            sentiment=rec["sentiment"] or "neutral",
            impact_score=float(rec["impact_score"] or 0.0),
            catalyst_type=rec["catalyst_type"] or "other",
            confidence=float(rec["confidence"] or 0.0),
            extraction_at=rec["extraction_at"],
            model_name=rec["model_name"] or "",
            prompt_version=rec["prompt_version"] or "",
            company_name=rec["company_name"] or "",
            relevance=float(rec["relevance"] or 0.0),
            impact_horizon=rec["impact_horizon"] or "",
            novelty_score=float(rec["novelty_score"] or 0.0),
            source_credibility=float(rec["source_credibility"] or 0.0),
            key_facts=_jsonb_to_str(rec["key_facts"]),
            risks=_jsonb_to_str(rec["risks"]),
            macro_themes=_jsonb_to_str(rec["macro_themes"]),
            schema_version=rec["schema_version"] or "",
        )
        for rec in records
    ]
async def publish_market_snapshot_job(
pool: asyncpg.Pool,
minio_client: Minio,
entity_id: str,
) -> list[str]:
"""Publish market bar/quote facts from a market_snapshots row."""
row = await pool.fetchrow(_FETCH_MARKET_SNAPSHOT, entity_id)
if row is None:
logger.warning("Market snapshot %s not found", entity_id)
return []
ticker = row["ticker"]
data = row["data"] if isinstance(row["data"], dict) else json.loads(row["data"])
source = row["source_provider"] or ""
captured_at = row["captured_at"]
snapshot_type = row["snapshot_type"]
refs: list[str] = []
if snapshot_type == "bar" or snapshot_type == "bars":
# Single bar or list of bars
bars = data.get("bars", [data]) if "bars" in data else [data]
for bar in bars:
ref = publish_market_bar(
client=minio_client,
ticker=ticker,
open_price=float(bar.get("open", bar.get("o", 0))),
high_price=float(bar.get("high", bar.get("h", 0))),
low_price=float(bar.get("low", bar.get("l", 0))),
close_price=float(bar.get("close", bar.get("c", 0))),
volume=int(bar.get("volume", bar.get("v", 0))),
bar_timestamp=captured_at,
source=source,
vwap=float(bar.get("vwap", bar.get("vw", 0))),
trade_count=int(bar.get("trade_count", bar.get("n", 0))),
bar_interval=bar.get("interval", "1d"),
)
refs.append(ref)
elif snapshot_type == "quote" or snapshot_type == "quotes":
ref = publish_market_quote(
client=minio_client,
ticker=ticker,
bid_price=float(data.get("bid_price", data.get("bp", 0))),
ask_price=float(data.get("ask_price", data.get("ap", 0))),
last_price=float(data.get("last_price", data.get("lp", 0))),
quote_at=captured_at,
source=source,
bid_size=int(data.get("bid_size", data.get("bs", 0))),
ask_size=int(data.get("ask_size", data.get("as", 0))),
last_size=int(data.get("last_size", data.get("ls", 0))),
)
refs.append(ref)
return refs
async def publish_order_job(
    pool: asyncpg.Pool,
    minio_client: Minio,
    entity_id: str,
) -> str:
    """Publish one trade order fact from PostgreSQL to the lake.

    Args:
        pool: PostgreSQL connection pool.
        minio_client: MinIO client used to write the Parquet fact.
        entity_id: ``orders.id`` of the order to publish.

    Returns:
        The reference produced by ``publish_trade_order``, or ``""`` (after a
        warning) when the order does not exist.
    """
    row = await pool.fetchrow(_FETCH_ORDER, entity_id)
    if row is None:
        logger.warning("Order %s not found", entity_id)
        return ""

    # A NULL submitted_at falls back to the publish time.
    submitted_at = row["submitted_at"] or datetime.now(timezone.utc)
    # Compare with None instead of relying on truthiness: a zero limit price
    # is falsy and would otherwise be published as "no limit price".
    limit_price = row["limit_price"]
    recommendation_id = row["recommendation_id"]

    return publish_trade_order(
        client=minio_client,
        order_id=str(row["id"]),
        ticker=row["ticker"],
        side=row["side"],
        order_type=row["order_type"],
        quantity=float(row["quantity"]),
        limit_price=float(limit_price) if limit_price is not None else None,
        status=row["status"],
        broker_account=row["broker_account"],
        submitted_at=submitted_at,
        recommendation_id=str(recommendation_id) if recommendation_id is not None else "",
        execution_mode=row["execution_mode"],
    )
async def publish_fills_job(
    pool: asyncpg.Pool,
    minio_client: Minio,
    entity_id: str,
) -> list[str]:
    """Write one trade-fill fact per 'fill' event of an order.

    Each event's ``data`` payload (decoded from JSON text when needed) supplies
    price, quantity and commission under either long or short key names.

    Args:
        pool: PostgreSQL connection pool.
        minio_client: MinIO client used to write the Parquet facts.
        entity_id: ``orders.id`` of the order whose fills are published.

    Returns:
        One reference per fill event; ``[]`` (after an info log) when there are
        no fill events.
    """
    fill_rows = await pool.fetch(_FETCH_ORDER_FILLS, entity_id)
    if not fill_rows:
        logger.info("No fill events for order %s", entity_id)
        return []

    published: list[str] = []
    for fill in fill_rows:
        raw_payload = fill["data"]
        if isinstance(raw_payload, dict):
            payload = raw_payload
        else:
            payload = json.loads(raw_payload or "{}")
        # Without a broker timestamp, the publish time stands in for the fill time.
        fill_time = fill["broker_timestamp"] or datetime.now(timezone.utc)
        published.append(
            publish_trade_fill(
                client=minio_client,
                fill_id=str(fill["fill_id"]),
                order_id=str(fill["order_id"]),
                ticker=fill["ticker"],
                side=fill["side"],
                fill_price=float(payload.get("fill_price", payload.get("price", 0))),
                fill_quantity=float(payload.get("fill_quantity", payload.get("qty", 0))),
                broker_account=fill["broker_account"],
                filled_at=fill_time,
                commission=float(payload.get("commission", 0)),
            )
        )
    return published
async def publish_positions_job(
    pool: asyncpg.Pool,
    minio_client: Minio,
    entity_id: str,
) -> str:
    """Publish one daily snapshot batch of a broker account's open positions.

    Args:
        pool: PostgreSQL connection pool.
        minio_client: MinIO client used to write the Parquet batch.
        entity_id: broker account id whose non-zero positions are published.

    Returns:
        The batch reference, or ``""`` (after an info log) when the account
        has no non-zero positions.
    """
    holdings = await pool.fetch(_FETCH_POSITIONS, entity_id)
    if not holdings:
        logger.info("No open positions for account %s", entity_id)
        return ""

    snapshot_at = datetime.now(timezone.utc)
    positions: list[dict[str, object]] = []
    for holding in holdings:
        positions.append(
            {
                "ticker": holding["ticker"],
                "quantity": float(holding["quantity"]),
                "avg_entry_price": float(holding["avg_entry_price"] or 0),
                # current_price is published under the "close_price" key.
                "close_price": float(holding["current_price"] or 0),
                "unrealized_pnl": float(holding["unrealized_pnl"] or 0),
            }
        )

    # The query filters on a single broker account, so any row names it.
    return publish_positions_daily_batch(
        client=minio_client,
        positions=positions,
        broker_account=holdings[0]["broker_account"],
        snapshot_at=snapshot_at,
    )
async def publish_pnl_job(
    pool: asyncpg.Pool,
    minio_client: Minio,
    entity_id: str,
) -> list[str]:
    """Publish one daily PnL fact per non-zero position of a broker account.

    Rows come from ``_FETCH_POSITIONS``. Total PnL is realized plus unrealized
    (NULLs count as 0), and every fact shares one ``dt`` taken before the first
    publish.

    Args:
        pool: PostgreSQL connection pool.
        minio_client: MinIO client used to write the Parquet facts.
        entity_id: broker account id whose positions are published.

    Returns:
        One reference per position; ``[]`` (after an info log) when there are
        no positions.
    """
    positions = await pool.fetch(_FETCH_POSITIONS, entity_id)
    if not positions:
        logger.info("No positions for PnL snapshot, account %s", entity_id)
        return []

    as_of = datetime.now(timezone.utc)

    def publish_one(position) -> str:
        realized = float(position["realized_pnl"] or 0)
        unrealized = float(position["unrealized_pnl"] or 0)
        return publish_pnl_daily(
            client=minio_client,
            ticker=position["ticker"],
            realized_pnl=realized,
            unrealized_pnl=unrealized,
            total_pnl=realized + unrealized,
            broker_account=position["broker_account"],
            dt=as_of,
            execution_mode=position["execution_mode"],
        )

    return [publish_one(position) for position in positions]
async def publish_bulk_documents_job(
    pool: asyncpg.Pool,
    minio_client: Minio,
    since: datetime,
) -> list[str]:
    """Publish documents created since a cutoff as one Parquet batch.

    Args:
        pool: PostgreSQL connection pool.
        minio_client: MinIO client used to write the batch.
        since: creation-time cutoff passed to ``_FETCH_BULK_DOCUMENTS`` (which
            caps the result at 500 rows).

    Returns:
        ``[ref]`` for the written batch, or ``[]`` when nothing matched or the
        batch writer returned an empty reference.
    """
    rows = await pool.fetch(_FETCH_BULK_DOCUMENTS, since)
    if not rows:
        logger.info("No documents to bulk-publish since %s", since)
        return []

    def to_record(doc) -> dict[str, object]:
        # Without a source publish date, the retrieval time stands in for it.
        effective_published = doc["published_at"] or doc["retrieved_at"]
        record: dict[str, object] = {
            "document_id": str(doc["id"]),
            "document_type": doc["document_type"],
            "source_type": doc["source_type"],
            "ticker": doc["ticker"] or "",
            "publisher": doc["publisher"] or "",
            "title": doc["title"] or "",
            "url": doc["url"] or "",
            "canonical_url": doc["canonical_url"] or "",
            "language": doc["language"] or "en",
            "published_at": effective_published,
            "retrieved_at": doc["retrieved_at"],
            "content_hash": doc["content_hash"],
            "confidence": float(doc["parse_quality_score"] or 0.0),
        }
        # Partition columns are merged last so they win on any key clash.
        record.update(partition_values(effective_published))
        return record

    batch_ref = publish_documents_batch(
        minio_client, [to_record(doc) for doc in rows], since
    )
    return [batch_ref] if batch_ref else []
async def publish_bulk_extractions_job(
    pool: asyncpg.Pool,
    minio_client: Minio,
    since: datetime,
) -> list[str]:
    """Publish extractions created since a cutoff as one Parquet batch.

    Each record carries partition columns derived from its ``extraction_at``
    and a per-row model version (schema_version, else prompt_version, else "").

    Args:
        pool: PostgreSQL connection pool.
        minio_client: MinIO client used to write the batch.
        since: creation-time cutoff passed to ``_FETCH_BULK_EXTRACTIONS``
            (which caps the result at 500 rows).

    Returns:
        ``[ref]`` for the written batch, or ``[]`` when nothing matched or the
        batch writer returned an empty reference.
    """
    rows = await pool.fetch(_FETCH_BULK_EXTRACTIONS, since)
    if not rows:
        logger.info("No extractions to bulk-publish since %s", since)
        return []

    def to_record(ext) -> dict[str, object]:
        row_model_version = ext["schema_version"] or ext["prompt_version"] or ""
        record: dict[str, object] = {
            "document_id": str(ext["document_id"]),
            "ticker": ext["ticker"],
            "company_name": ext["company_name"] or "",
            "relevance": float(ext["relevance"] or 0.0),
            "sentiment": ext["sentiment"] or "neutral",
            "impact_score": float(ext["impact_score"] or 0.0),
            "impact_horizon": ext["impact_horizon"] or "",
            "catalyst_type": ext["catalyst_type"] or "other",
            "confidence": float(ext["confidence"] or 0.0),
            "novelty_score": float(ext["novelty_score"] or 0.0),
            "source_credibility": float(ext["source_credibility"] or 0.0),
            "key_facts": _jsonb_to_str(ext["key_facts"]),
            "risks": _jsonb_to_str(ext["risks"]),
            "macro_themes": _jsonb_to_str(ext["macro_themes"]),
            "model_name": ext["model_name"] or "",
            "prompt_version": ext["prompt_version"] or "",
            "schema_version": ext["schema_version"] or "",
            "extraction_at": ext["extraction_at"],
        }
        # Partition columns are merged last so they win on any key clash.
        record.update(
            partition_values(ext["extraction_at"], {"model_version": row_model_version})
        )
        return record

    records = [to_record(ext) for ext in rows]
    # The batch is labelled with the first record's "model_version" partition
    # value (empty when partition_values did not emit that key).
    batch_model_version = records[0].get("model_version", "")
    batch_ref = publish_document_extractions_batch(
        minio_client,
        records,
        since,
        model_version=str(batch_model_version),
    )
    return [batch_ref] if batch_ref else []
async def publish_global_event_job(
    pool: asyncpg.Pool,
    minio_client: Minio,
    entity_id: str,
) -> str:
    """Write one global-event fact to the lake.

    Array columns are copied into plain lists (NULL becomes ``[]``) and NULL
    scalars get defaults (severity "low", duration "short_term", confidence 0).

    Args:
        pool: PostgreSQL connection pool.
        minio_client: MinIO client used to write the Parquet fact.
        entity_id: ``global_events.id`` of the event to publish.

    Returns:
        The reference produced by ``publish_global_event_fact``, or ``""``
        (after a warning) when the event does not exist.
    """
    event = await pool.fetchrow(_FETCH_GLOBAL_EVENT, entity_id)
    if event is None:
        logger.warning("Global event %s not found, skipping lake publish", entity_id)
        return ""

    def as_list(column: str) -> list:
        return list(event[column] or [])

    source_doc = event["source_document_id"]
    return publish_global_event_fact(
        client=minio_client,
        event_id=str(event["id"]),
        event_types=as_list("event_types"),
        severity=event["severity"] or "low",
        affected_regions=as_list("affected_regions"),
        affected_sectors=as_list("affected_sectors"),
        affected_commodities=as_list("affected_commodities"),
        summary=event["summary"] or "",
        estimated_duration=event["estimated_duration"] or "short_term",
        confidence=float(event["confidence"] or 0.0),
        source_document_id="" if not source_doc else str(source_doc),
        created_at=event["created_at"],
    )
async def publish_macro_impacts_job(
pool: asyncpg.Pool,
minio_client: Minio,
entity_id: str,
) -> list[str]:
"""Publish macro impact facts for a global event from PostgreSQL to the lake."""
rows = await pool.fetch(_FETCH_MACRO_IMPACTS_FOR_EVENT, entity_id)
if not rows:
logger.info("No macro impact records for event %s", entity_id)
return []
refs: list[str] = []
for row in rows:
factors = row["contributing_factors"]
if isinstance(factors, str):
try:
factors = json.loads(factors)
except (json.JSONDecodeError, TypeError):
factors = [factors] if factors else []
elif factors is None:
factors = []
ref = publish_macro_impact_fact(
client=minio_client,
event_id=str(row["event_id"]),
company_id=str(row["company_id"]),
ticker=row["ticker"],
macro_impact_score=float(row["macro_impact_score"] or 0.0),
impact_direction=row["impact_direction"] or "neutral",
contributing_factors=list(factors),
confidence=float(row["confidence"] or 0.0),
computed_at=row["computed_at"],
)
refs.append(ref)
return refs
async def publish_trend_projection_job(
pool: asyncpg.Pool,
minio_client: Minio,
entity_id: str,
) -> str:
"""Publish a trend projection fact from PostgreSQL to the lake."""
row = await pool.fetchrow(_FETCH_TREND_PROJECTION, entity_id)
if row is None:
logger.warning("Trend projection for window %s not found", entity_id)
return ""
factors = row["driving_factors"]
if isinstance(factors, str):
try:
factors = json.loads(factors)
except (json.JSONDecodeError, TypeError):
factors = [factors] if factors else []
elif factors is None:
factors = []
return publish_trend_projection_fact(
client=minio_client,
trend_window_id=str(row["trend_window_id"]),
ticker=row["ticker"] or "",
projected_direction=row["projected_direction"] or "neutral",
projected_strength=float(row["projected_strength"] or 0.0),
projected_confidence=float(row["projected_confidence"] or 0.0),
projection_horizon=row["projection_horizon"] or "7d",
driving_factors=list(factors),
macro_contribution_pct=float(row["macro_contribution_pct"] or 0.0),
diverges_from_current=bool(row["diverges_from_current"]),
computed_at=row["computed_at"],
)
async def publish_competitor_relationship_job(
    pool: asyncpg.Pool,
    minio_client: Minio,
    entity_id: str,
) -> str:
    """Write one competitor-relationship fact to the lake.

    Args:
        pool: PostgreSQL connection pool.
        minio_client: MinIO client used to write the Parquet fact.
        entity_id: ``competitor_relationships.id`` of the relationship.

    Returns:
        The reference produced by ``publish_competitor_relationship_fact``, or
        ``""`` (after a warning) when the relationship does not exist.
    """
    rel = await pool.fetchrow(_FETCH_COMPETITOR_RELATIONSHIP, entity_id)
    if rel is None:
        logger.warning(
            "Competitor relationship %s not found, skipping lake publish", entity_id
        )
        return ""

    fact_fields = {
        "relationship_id": str(rel["id"]),
        "company_a_id": str(rel["company_a_id"]),
        "company_b_id": str(rel["company_b_id"]),
        "relationship_type": rel["relationship_type"],
        "strength": float(rel["strength"]),
        "bidirectional": bool(rel["bidirectional"]),
        "source": rel["source"],
        "active": bool(rel["active"]),
        "created_at": rel["created_at"],
    }
    return publish_competitor_relationship_fact(client=minio_client, **fact_fields)
async def publish_competitive_signals_job(
    pool: asyncpg.Pool,
    minio_client: Minio,
    entity_id: str,
) -> list[str]:
    """Write one competitive-signal fact per signal derived from a document.

    Args:
        pool: PostgreSQL connection pool.
        minio_client: MinIO client used to write the Parquet facts.
        entity_id: id of the source document whose signals are published.

    Returns:
        One reference per signal, in query order; ``[]`` (after an info log)
        when the document produced no signals.
    """
    signals = await pool.fetch(_FETCH_COMPETITIVE_SIGNALS_FOR_DOCUMENT, entity_id)
    if not signals:
        logger.info("No competitive signals for document %s", entity_id)
        return []

    return [
        publish_competitive_signal_fact(
            client=minio_client,
            signal_id=str(sig["id"]),
            source_document_id=str(sig["source_document_id"]),
            source_ticker=sig["source_ticker"],
            target_ticker=sig["target_ticker"],
            catalyst_type=sig["catalyst_type"],
            pattern_confidence=float(sig["pattern_confidence"]),
            signal_direction=sig["signal_direction"],
            signal_strength=float(sig["signal_strength"]),
            relationship_strength=float(sig["relationship_strength"]),
            computed_at=sig["computed_at"],
        )
        for sig in signals
    ]
# ---------------------------------------------------------------------------
# Job dispatcher
# ---------------------------------------------------------------------------
# Every job_type name this module advertises.
# NOTE: "company_event" has no branch in dispatch_job, so such jobs end with an
# error result instead of publishing anything.
JOB_TYPES = {
    # Documents and their derived intelligence
    "document",
    "document_extraction",
    "competitive_signal",
    "bulk_documents",
    "bulk_extractions",
    # Market data
    "market_snapshot",
    # Trading / account state
    "trade_order",
    "trade_fill",
    "positions_snapshot",
    "pnl_snapshot",
    # Company, macro and trend analytics
    "company_event",
    "global_event",
    "macro_impact",
    "trend_projection",
    "competitor_relationship",
}
async def dispatch_job(
pool: asyncpg.Pool,
minio_client: Minio,
job: dict[str, str],
) -> dict[str, object]:
"""Dispatch a lake publish job to the appropriate handler.
Args:
pool: PostgreSQL connection pool.
minio_client: MinIO client for writing Parquet files.
job: Job dict with at least 'job_type' and 'entity_id'.
Returns:
A result dict with 'job_type', 'entity_id', 'refs' (list of s3 URIs),
and 'error' (None on success).
"""
job_type = job.get("job_type", "")
entity_id = job.get("entity_id", "")
since_str = job.get("since")
result: dict[str, object] = {
"job_type": job_type,
"entity_id": entity_id,
"refs": [],
"error": None,
}
try:
if job_type == "document":
ref = await publish_document_job(pool, minio_client, entity_id)
result["refs"] = [ref] if ref else []
elif job_type == "document_extraction":
refs = await publish_extraction_job(pool, minio_client, entity_id)
result["refs"] = refs
elif job_type == "market_snapshot":
refs = await publish_market_snapshot_job(pool, minio_client, entity_id)
result["refs"] = refs
elif job_type == "trade_order":
ref = await publish_order_job(pool, minio_client, entity_id)
result["refs"] = [ref] if ref else []
elif job_type == "trade_fill":
refs = await publish_fills_job(pool, minio_client, entity_id)
result["refs"] = refs
elif job_type == "positions_snapshot":
ref = await publish_positions_job(pool, minio_client, entity_id)
result["refs"] = [ref] if ref else []
elif job_type == "pnl_snapshot":
refs = await publish_pnl_job(pool, minio_client, entity_id)
result["refs"] = refs
elif job_type == "bulk_documents":
since = datetime.fromisoformat(since_str) if since_str else datetime.now(timezone.utc)
refs = await publish_bulk_documents_job(pool, minio_client, since)
result["refs"] = refs
elif job_type == "bulk_extractions":
since = datetime.fromisoformat(since_str) if since_str else datetime.now(timezone.utc)
refs = await publish_bulk_extractions_job(pool, minio_client, since)
result["refs"] = refs
elif job_type == "global_event":
ref = await publish_global_event_job(pool, minio_client, entity_id)
result["refs"] = [ref] if ref else []
elif job_type == "macro_impact":
refs = await publish_macro_impacts_job(pool, minio_client, entity_id)
result["refs"] = refs
elif job_type == "trend_projection":
ref = await publish_trend_projection_job(pool, minio_client, entity_id)
result["refs"] = [ref] if ref else []
elif job_type == "competitor_relationship":
ref = await publish_competitor_relationship_job(pool, minio_client, entity_id)
result["refs"] = [ref] if ref else []
elif job_type == "competitive_signal":
refs = await publish_competitive_signals_job(pool, minio_client, entity_id)
result["refs"] = refs
else:
result["error"] = f"Unknown job_type: {job_type}"
logger.warning("Unknown lake publish job type: %s", job_type)
except Exception as exc:
result["error"] = str(exc)
logger.exception("Lake publish job failed: %s/%s", job_type, entity_id)
return result
# ---------------------------------------------------------------------------
# Async worker loop
# ---------------------------------------------------------------------------
async def run_worker(
    pool: asyncpg.Pool,
    rds: aioredis.Redis,
    minio_client: Minio,
    poll_interval: float = 2.0,
) -> None:
    """Main worker loop — reads jobs from Redis and dispatches them.

    Runs indefinitely until cancelled. Jobs are taken one at a time with LPOP
    and processed sequentially to keep MinIO write ordering predictable; when
    the queue is empty the loop sleeps for ``poll_interval`` seconds.

    A job is removed from the queue before it runs, so malformed payloads and
    failed jobs are logged and dropped, not requeued.

    Args:
        pool: PostgreSQL connection pool passed to every handler.
        rds: Redis client holding the lake publish queue.
        minio_client: MinIO client passed to every handler.
        poll_interval: Seconds to sleep when the queue is empty.
    """
    queue = queue_key(QUEUE_LAKE_PUBLISH)
    logger.info("Lake publisher worker started, listening on %s", queue)
    while True:
        raw = await rds.lpop(queue)  # type: ignore[misc]
        if raw is None:
            await asyncio.sleep(poll_interval)
            continue
        try:
            # json.loads accepts str, bytes and bytearray directly. str(bytes)
            # would yield "b'...'", which is not JSON, so every job would be
            # dropped whenever the client returns bytes (redis-py without
            # decode_responses=True).
            job = json.loads(raw)
        except (json.JSONDecodeError, TypeError, UnicodeDecodeError):
            logger.error("Invalid lake publish job payload: %s", raw)
            continue
        if not isinstance(job, dict):
            # Valid JSON that is not an object (list, string, number...).
            logger.error("Invalid lake publish job payload: %s", raw)
            continue
        result = await dispatch_job(pool, minio_client, job)
        refs = result.get("refs") or []
        error = result.get("error")
        if error:
            logger.error(
                "Lake publish job %s/%s failed: %s",
                result["job_type"], result["entity_id"], error,
            )
        else:
            ref_count = len(refs) if isinstance(refs, list) else 0
            logger.info(
                "Lake publish job %s/%s completed: %d facts written",
                result["job_type"], result["entity_id"], ref_count,
            )
async def main() -> None:
    """Entry point for the lake publisher worker process.

    Builds the PostgreSQL pool, Redis client and MinIO client from the loaded
    config, runs the worker loop, and on exit closes the Redis client and the
    pool. Each resource is released even if a later setup step or an earlier
    close raises.
    """
    config = load_config()
    pool = await get_pg_pool(config)
    try:
        rds = get_redis(config)
        try:
            minio_client = get_minio(config)
            await run_worker(pool, rds, minio_client)
        finally:
            await rds.close()
    finally:
        # Runs even if Redis/MinIO setup or rds.close() raised.
        await pool.close()
if __name__ == "__main__":
    # Configure logging before the event loop starts so startup messages from
    # main() are formatted consistently.
    _settings = load_config()
    setup_logging(
        "lake_publisher",
        level=_settings.log_level,
        json_output=_settings.json_logs,
    )
    asyncio.run(main())