"""Lake publisher async job runner — transforms operational data into analytical facts. Reads jobs from the QUEUE_LAKE_PUBLISH Redis queue, queries PostgreSQL for operational records, and publishes them as partitioned Parquet files to MinIO via the existing publish_* functions in worker.py. Job message format: {"job_type": "", "entity_id": "", "dt": "2026-04-11T..."} Supported job types: - document: publish a single document metadata fact - document_extraction: publish extraction facts for a document - market_snapshot: publish market bars/quotes from a snapshot - trade_order: publish an order fact - trade_fill: publish fill facts for an order - positions_snapshot: publish daily position snapshots for a broker account - pnl_snapshot: publish daily PnL for a broker account - company_event: publish a company event fact - bulk_documents: publish all unpublished documents since a cutoff - bulk_extractions: publish all unpublished extractions since a cutoff Requirements: 9.4, 9.5, 10.1 Design ref: Section 4.10 (Lake Publisher), Section 8.4 (Lake publication flow) """ from __future__ import annotations import asyncio import json import logging from datetime import datetime, timezone import asyncpg import redis.asyncio as aioredis from minio import Minio from services.lake_publisher.partitions import partition_values from services.lake_publisher.worker import ( publish_competitive_signal_fact, publish_competitor_relationship_fact, publish_document_extraction, publish_document_extractions_batch, publish_document_fact, publish_documents_batch, publish_global_event_fact, publish_macro_impact_fact, publish_market_bar, publish_market_quote, publish_pnl_daily, publish_positions_daily_batch, publish_trade_fill, publish_trade_order, publish_trend_projection_fact, ) from services.shared.config import load_config from services.shared.db import get_minio, get_pg_pool, get_redis from services.shared.logging import setup_logging from services.shared.redis_keys import QUEUE_LAKE_PUBLISH, is_pipeline_enabled, queue_key logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # SQL queries for fetching operational data # --------------------------------------------------------------------------- _FETCH_DOCUMENT = """ SELECT d.id, d.document_type, d.source_type, d.publisher, d.title, d.url, d.canonical_url, d.language, d.published_at, d.retrieved_at, d.content_hash, d.parse_quality_score, COALESCE( (SELECT dcm.ticker FROM document_company_mentions dcm WHERE dcm.document_id = d.id LIMIT 1), '' ) AS ticker FROM documents d WHERE d.id = $1::uuid """ _FETCH_EXTRACTIONS = """ SELECT di.document_id, dir.ticker, dir.relevance, dir.sentiment, dir.impact_score, dir.impact_horizon, dir.catalyst_type, di.confidence, di.novelty_score, di.source_credibility, dir.key_facts, dir.risks, di.macro_themes, di.model_name, di.prompt_version, di.schema_version, di.created_at AS extraction_at, COALESCE(c.legal_name, '') AS company_name FROM document_intelligence di JOIN document_impact_records dir ON dir.intelligence_id = di.id LEFT JOIN companies c ON c.id = dir.company_id WHERE di.document_id = $1::uuid AND di.validation_status = 'valid' """ _FETCH_MARKET_SNAPSHOT = """ SELECT ms.ticker, ms.snapshot_type, ms.data, ms.source_provider, ms.captured_at FROM market_snapshots ms WHERE ms.id = $1::uuid """ _FETCH_ORDER = """ SELECT o.id, o.recommendation_id, o.ticker, o.side, o.order_type, o.quantity, o.limit_price, o.status, o.submitted_at, o.fill_price, o.fill_quantity, o.filled_at, COALESCE(ba.account_id, '') AS broker_account, COALESCE(ba.mode, 'paper') AS execution_mode FROM orders o LEFT JOIN broker_accounts ba ON ba.id = o.broker_account_id WHERE o.id = $1::uuid """ _FETCH_ORDER_FILLS = """ SELECT oe.id AS fill_id, oe.order_id, oe.data, oe.broker_timestamp, o.ticker, o.side, COALESCE(ba.account_id, '') AS broker_account FROM order_events oe JOIN orders o ON o.id = oe.order_id LEFT JOIN broker_accounts ba ON ba.id = o.broker_account_id WHERE oe.order_id = $1::uuid AND oe.event_type = 'fill' """ _FETCH_POSITIONS = """ SELECT p.ticker, p.quantity, p.avg_entry_price, p.current_price, p.unrealized_pnl, p.realized_pnl, COALESCE(ba.account_id, '') AS broker_account, COALESCE(ba.mode, 'paper') AS execution_mode FROM positions p LEFT JOIN broker_accounts ba ON ba.id = p.broker_account_id WHERE p.broker_account_id = $1::uuid AND p.quantity != 0 """ _FETCH_BULK_DOCUMENTS = """ SELECT d.id, d.document_type, d.source_type, d.publisher, d.title, d.url, d.canonical_url, d.language, d.published_at, d.retrieved_at, d.content_hash, d.parse_quality_score, COALESCE( (SELECT dcm.ticker FROM document_company_mentions dcm WHERE dcm.document_id = d.id LIMIT 1), '' ) AS ticker FROM documents d WHERE d.created_at >= $1 AND d.status IN ('parsed', 'extracted') ORDER BY d.created_at LIMIT 500 """ _FETCH_BULK_EXTRACTIONS = """ SELECT di.document_id, dir.ticker, dir.relevance, dir.sentiment, dir.impact_score, dir.impact_horizon, dir.catalyst_type, di.confidence, di.novelty_score, di.source_credibility, dir.key_facts, dir.risks, di.macro_themes, di.model_name, di.prompt_version, di.schema_version, di.created_at AS extraction_at, COALESCE(c.legal_name, '') AS company_name FROM document_intelligence di JOIN document_impact_records dir ON dir.intelligence_id = di.id LEFT JOIN companies c ON c.id = dir.company_id WHERE di.created_at >= $1 AND di.validation_status = 'valid' ORDER BY di.created_at LIMIT 500 """ _FETCH_GLOBAL_EVENT = """ SELECT ge.id, ge.event_types, ge.severity, ge.affected_regions, ge.affected_sectors, ge.affected_commodities, ge.summary, ge.estimated_duration, ge.confidence, ge.source_document_id, ge.created_at FROM global_events ge WHERE ge.id = $1::uuid """ _FETCH_MACRO_IMPACTS_FOR_EVENT = """ SELECT mir.event_id, mir.company_id, mir.ticker, mir.macro_impact_score, mir.impact_direction, mir.contributing_factors, mir.confidence, mir.computed_at FROM macro_impact_records mir WHERE mir.event_id = $1::uuid """ _FETCH_TREND_PROJECTION = """ SELECT tp.id, tp.trend_window_id, tp.projected_direction, tp.projected_strength, tp.projected_confidence, tp.projection_horizon, tp.driving_factors, tp.macro_contribution_pct, tp.diverges_from_current, tp.computed_at, tw.ticker FROM trend_projections tp JOIN trend_windows tw ON tw.id = tp.trend_window_id WHERE tp.trend_window_id = $1::uuid """ _FETCH_COMPETITOR_RELATIONSHIP = """ SELECT cr.id, cr.company_a_id, cr.company_b_id, cr.relationship_type, cr.strength, cr.bidirectional, cr.source, cr.active, cr.created_at FROM competitor_relationships cr WHERE cr.id = $1::uuid """ _FETCH_COMPETITIVE_SIGNALS_FOR_DOCUMENT = """ SELECT csr.id, csr.source_document_id, csr.source_ticker, csr.target_ticker, csr.catalyst_type, csr.pattern_confidence, csr.signal_direction, csr.signal_strength, csr.relationship_strength, csr.computed_at FROM competitive_signal_records csr WHERE csr.source_document_id = $1::uuid """ # --------------------------------------------------------------------------- # Job handlers — each transforms operational rows into lake facts # --------------------------------------------------------------------------- def _jsonb_to_str(val: object) -> str: """Convert a JSONB column value (list or str) to a comma-separated string.""" if val is None: return "" if isinstance(val, str): try: parsed = json.loads(val) if isinstance(parsed, list): return ", ".join(str(x) for x in parsed) return val except (json.JSONDecodeError, TypeError): return val if isinstance(val, list): return ", ".join(str(x) for x in val) return str(val) async def publish_document_job( pool: asyncpg.Pool, minio_client: Minio, entity_id: str, ) -> str: """Publish a single document metadata fact from PostgreSQL to the lake.""" row = await pool.fetchrow(_FETCH_DOCUMENT, entity_id) if row is None: logger.warning("Document %s not found, skipping lake publish", entity_id) return "" published_at = row["published_at"] or row["retrieved_at"] return publish_document_fact( client=minio_client, document_id=str(row["id"]), document_type=row["document_type"], source_type=row["source_type"], ticker=row["ticker"] or "", publisher=row["publisher"] or "", title=row["title"] or "", published_at=published_at, content_hash=row["content_hash"], url=row["url"] or "", canonical_url=row["canonical_url"] or "", language=row["language"] or "en", confidence=float(row["parse_quality_score"] or 0.0), retrieved_at=row["retrieved_at"], ) async def publish_extraction_job( pool: asyncpg.Pool, minio_client: Minio, entity_id: str, ) -> list[str]: """Publish document extraction facts for a document from PostgreSQL to the lake.""" rows = await pool.fetch(_FETCH_EXTRACTIONS, entity_id) if not rows: logger.info("No valid extractions for document %s", entity_id) return [] refs: list[str] = [] for row in rows: ref = publish_document_extraction( client=minio_client, document_id=str(row["document_id"]), ticker=row["ticker"], sentiment=row["sentiment"] or "neutral", impact_score=float(row["impact_score"] or 0.0), catalyst_type=row["catalyst_type"] or "other", confidence=float(row["confidence"] or 0.0), extraction_at=row["extraction_at"], model_name=row["model_name"] or "", prompt_version=row["prompt_version"] or "", company_name=row["company_name"] or "", relevance=float(row["relevance"] or 0.0), impact_horizon=row["impact_horizon"] or "", novelty_score=float(row["novelty_score"] or 0.0), source_credibility=float(row["source_credibility"] or 0.0), key_facts=_jsonb_to_str(row["key_facts"]), risks=_jsonb_to_str(row["risks"]), macro_themes=_jsonb_to_str(row["macro_themes"]), schema_version=row["schema_version"] or "", ) refs.append(ref) return refs async def publish_market_snapshot_job( pool: asyncpg.Pool, minio_client: Minio, entity_id: str, ) -> list[str]: """Publish market bar/quote facts from a market_snapshots row.""" row = await pool.fetchrow(_FETCH_MARKET_SNAPSHOT, entity_id) if row is None: logger.warning("Market snapshot %s not found", entity_id) return [] ticker = row["ticker"] data = row["data"] if isinstance(row["data"], dict) else json.loads(row["data"]) source = row["source_provider"] or "" captured_at = row["captured_at"] snapshot_type = row["snapshot_type"] refs: list[str] = [] if snapshot_type == "bar" or snapshot_type == "bars": # Single bar or list of bars bars = data.get("bars", [data]) if "bars" in data else [data] for bar in bars: ref = publish_market_bar( client=minio_client, ticker=ticker, open_price=float(bar.get("open", bar.get("o", 0))), high_price=float(bar.get("high", bar.get("h", 0))), low_price=float(bar.get("low", bar.get("l", 0))), close_price=float(bar.get("close", bar.get("c", 0))), volume=int(bar.get("volume", bar.get("v", 0))), bar_timestamp=captured_at, source=source, vwap=float(bar.get("vwap", bar.get("vw", 0))), trade_count=int(bar.get("trade_count", bar.get("n", 0))), bar_interval=bar.get("interval", "1d"), ) refs.append(ref) elif snapshot_type == "quote" or snapshot_type == "quotes": ref = publish_market_quote( client=minio_client, ticker=ticker, bid_price=float(data.get("bid_price", data.get("bp", 0))), ask_price=float(data.get("ask_price", data.get("ap", 0))), last_price=float(data.get("last_price", data.get("lp", 0))), quote_at=captured_at, source=source, bid_size=int(data.get("bid_size", data.get("bs", 0))), ask_size=int(data.get("ask_size", data.get("as", 0))), last_size=int(data.get("last_size", data.get("ls", 0))), ) refs.append(ref) return refs async def publish_order_job( pool: asyncpg.Pool, minio_client: Minio, entity_id: str, ) -> str: """Publish a trade order fact from PostgreSQL to the lake.""" row = await pool.fetchrow(_FETCH_ORDER, entity_id) if row is None: logger.warning("Order %s not found", entity_id) return "" submitted_at = row["submitted_at"] or datetime.now(timezone.utc) return publish_trade_order( client=minio_client, order_id=str(row["id"]), ticker=row["ticker"], side=row["side"], order_type=row["order_type"], quantity=float(row["quantity"]), limit_price=float(row["limit_price"]) if row["limit_price"] else None, status=row["status"], broker_account=row["broker_account"], submitted_at=submitted_at, recommendation_id=str(row["recommendation_id"]) if row["recommendation_id"] else "", execution_mode=row["execution_mode"], ) async def publish_fills_job( pool: asyncpg.Pool, minio_client: Minio, entity_id: str, ) -> list[str]: """Publish trade fill facts for an order from PostgreSQL to the lake.""" rows = await pool.fetch(_FETCH_ORDER_FILLS, entity_id) if not rows: logger.info("No fill events for order %s", entity_id) return [] refs: list[str] = [] for row in rows: data = row["data"] if isinstance(row["data"], dict) else json.loads(row["data"] or "{}") filled_at = row["broker_timestamp"] or datetime.now(timezone.utc) ref = publish_trade_fill( client=minio_client, fill_id=str(row["fill_id"]), order_id=str(row["order_id"]), ticker=row["ticker"], side=row["side"], fill_price=float(data.get("fill_price", data.get("price", 0))), fill_quantity=float(data.get("fill_quantity", data.get("qty", 0))), broker_account=row["broker_account"], filled_at=filled_at, commission=float(data.get("commission", 0)), ) refs.append(ref) return refs async def publish_positions_job( pool: asyncpg.Pool, minio_client: Minio, entity_id: str, ) -> str: """Publish daily position snapshots for a broker account.""" rows = await pool.fetch(_FETCH_POSITIONS, entity_id) if not rows: logger.info("No open positions for account %s", entity_id) return "" snapshot_at = datetime.now(timezone.utc) positions = [ { "ticker": row["ticker"], "quantity": float(row["quantity"]), "avg_entry_price": float(row["avg_entry_price"] or 0), "close_price": float(row["current_price"] or 0), "unrealized_pnl": float(row["unrealized_pnl"] or 0), } for row in rows ] broker_account = rows[0]["broker_account"] if rows else "" return publish_positions_daily_batch( client=minio_client, positions=positions, broker_account=broker_account, snapshot_at=snapshot_at, ) async def publish_pnl_job( pool: asyncpg.Pool, minio_client: Minio, entity_id: str, ) -> list[str]: """Publish daily PnL facts for a broker account's positions.""" rows = await pool.fetch(_FETCH_POSITIONS, entity_id) if not rows: logger.info("No positions for PnL snapshot, account %s", entity_id) return [] now = datetime.now(timezone.utc) refs: list[str] = [] for row in rows: realized = float(row["realized_pnl"] or 0) unrealized = float(row["unrealized_pnl"] or 0) total = realized + unrealized ref = publish_pnl_daily( client=minio_client, ticker=row["ticker"], realized_pnl=realized, unrealized_pnl=unrealized, total_pnl=total, broker_account=row["broker_account"], dt=now, execution_mode=row["execution_mode"], ) refs.append(ref) return refs async def publish_bulk_documents_job( pool: asyncpg.Pool, minio_client: Minio, since: datetime, ) -> list[str]: """Publish all documents created since a cutoff as a batch.""" rows = await pool.fetch(_FETCH_BULK_DOCUMENTS, since) if not rows: logger.info("No documents to bulk-publish since %s", since) return [] doc_rows: list[dict[str, object]] = [] for row in rows: published_at = row["published_at"] or row["retrieved_at"] doc_rows.append({ "document_id": str(row["id"]), "document_type": row["document_type"], "source_type": row["source_type"], "ticker": row["ticker"] or "", "publisher": row["publisher"] or "", "title": row["title"] or "", "url": row["url"] or "", "canonical_url": row["canonical_url"] or "", "language": row["language"] or "en", "published_at": published_at, "retrieved_at": row["retrieved_at"], "content_hash": row["content_hash"], "confidence": float(row["parse_quality_score"] or 0.0), **partition_values(published_at), }) ref = publish_documents_batch(minio_client, doc_rows, since) return [ref] if ref else [] async def publish_bulk_extractions_job( pool: asyncpg.Pool, minio_client: Minio, since: datetime, ) -> list[str]: """Publish all extractions created since a cutoff as a batch.""" rows = await pool.fetch(_FETCH_BULK_EXTRACTIONS, since) if not rows: logger.info("No extractions to bulk-publish since %s", since) return [] extraction_rows: list[dict[str, object]] = [] for row in rows: model_ver = row["schema_version"] or row["prompt_version"] or "" extraction_rows.append({ "document_id": str(row["document_id"]), "ticker": row["ticker"], "company_name": row["company_name"] or "", "relevance": float(row["relevance"] or 0.0), "sentiment": row["sentiment"] or "neutral", "impact_score": float(row["impact_score"] or 0.0), "impact_horizon": row["impact_horizon"] or "", "catalyst_type": row["catalyst_type"] or "other", "confidence": float(row["confidence"] or 0.0), "novelty_score": float(row["novelty_score"] or 0.0), "source_credibility": float(row["source_credibility"] or 0.0), "key_facts": _jsonb_to_str(row["key_facts"]), "risks": _jsonb_to_str(row["risks"]), "macro_themes": _jsonb_to_str(row["macro_themes"]), "model_name": row["model_name"] or "", "prompt_version": row["prompt_version"] or "", "schema_version": row["schema_version"] or "", "extraction_at": row["extraction_at"], **partition_values(row["extraction_at"], {"model_version": model_ver}), }) model_ver = extraction_rows[0].get("model_version", "") if extraction_rows else "" ref = publish_document_extractions_batch( minio_client, extraction_rows, since, model_version=str(model_ver), ) return [ref] if ref else [] async def publish_global_event_job( pool: asyncpg.Pool, minio_client: Minio, entity_id: str, ) -> str: """Publish a global event fact from PostgreSQL to the lake.""" row = await pool.fetchrow(_FETCH_GLOBAL_EVENT, entity_id) if row is None: logger.warning("Global event %s not found, skipping lake publish", entity_id) return "" event_types = row["event_types"] or [] affected_regions = row["affected_regions"] or [] affected_sectors = row["affected_sectors"] or [] affected_commodities = row["affected_commodities"] or [] return publish_global_event_fact( client=minio_client, event_id=str(row["id"]), event_types=list(event_types), severity=row["severity"] or "low", affected_regions=list(affected_regions), affected_sectors=list(affected_sectors), affected_commodities=list(affected_commodities), summary=row["summary"] or "", estimated_duration=row["estimated_duration"] or "short_term", confidence=float(row["confidence"] or 0.0), source_document_id=str(row["source_document_id"]) if row["source_document_id"] else "", created_at=row["created_at"], ) async def publish_macro_impacts_job( pool: asyncpg.Pool, minio_client: Minio, entity_id: str, ) -> list[str]: """Publish macro impact facts for a global event from PostgreSQL to the lake.""" rows = await pool.fetch(_FETCH_MACRO_IMPACTS_FOR_EVENT, entity_id) if not rows: logger.info("No macro impact records for event %s", entity_id) return [] refs: list[str] = [] for row in rows: factors = row["contributing_factors"] if isinstance(factors, str): try: factors = json.loads(factors) except (json.JSONDecodeError, TypeError): factors = [factors] if factors else [] elif factors is None: factors = [] ref = publish_macro_impact_fact( client=minio_client, event_id=str(row["event_id"]), company_id=str(row["company_id"]), ticker=row["ticker"], macro_impact_score=float(row["macro_impact_score"] or 0.0), impact_direction=row["impact_direction"] or "neutral", contributing_factors=list(factors), confidence=float(row["confidence"] or 0.0), computed_at=row["computed_at"], ) refs.append(ref) return refs async def publish_trend_projection_job( pool: asyncpg.Pool, minio_client: Minio, entity_id: str, ) -> str: """Publish a trend projection fact from PostgreSQL to the lake.""" row = await pool.fetchrow(_FETCH_TREND_PROJECTION, entity_id) if row is None: logger.warning("Trend projection for window %s not found", entity_id) return "" factors = row["driving_factors"] if isinstance(factors, str): try: factors = json.loads(factors) except (json.JSONDecodeError, TypeError): factors = [factors] if factors else [] elif factors is None: factors = [] return publish_trend_projection_fact( client=minio_client, trend_window_id=str(row["trend_window_id"]), ticker=row["ticker"] or "", projected_direction=row["projected_direction"] or "neutral", projected_strength=float(row["projected_strength"] or 0.0), projected_confidence=float(row["projected_confidence"] or 0.0), projection_horizon=row["projection_horizon"] or "7d", driving_factors=list(factors), macro_contribution_pct=float(row["macro_contribution_pct"] or 0.0), diverges_from_current=bool(row["diverges_from_current"]), computed_at=row["computed_at"], ) async def publish_competitor_relationship_job( pool: asyncpg.Pool, minio_client: Minio, entity_id: str, ) -> str: """Publish a competitor relationship fact from PostgreSQL to the lake.""" row = await pool.fetchrow(_FETCH_COMPETITOR_RELATIONSHIP, entity_id) if row is None: logger.warning("Competitor relationship %s not found, skipping lake publish", entity_id) return "" return publish_competitor_relationship_fact( client=minio_client, relationship_id=str(row["id"]), company_a_id=str(row["company_a_id"]), company_b_id=str(row["company_b_id"]), relationship_type=row["relationship_type"], strength=float(row["strength"]), bidirectional=bool(row["bidirectional"]), source=row["source"], active=bool(row["active"]), created_at=row["created_at"], ) async def publish_competitive_signals_job( pool: asyncpg.Pool, minio_client: Minio, entity_id: str, ) -> list[str]: """Publish competitive signal facts for a document from PostgreSQL to the lake.""" rows = await pool.fetch(_FETCH_COMPETITIVE_SIGNALS_FOR_DOCUMENT, entity_id) if not rows: logger.info("No competitive signals for document %s", entity_id) return [] refs: list[str] = [] for row in rows: ref = publish_competitive_signal_fact( client=minio_client, signal_id=str(row["id"]), source_document_id=str(row["source_document_id"]), source_ticker=row["source_ticker"], target_ticker=row["target_ticker"], catalyst_type=row["catalyst_type"], pattern_confidence=float(row["pattern_confidence"]), signal_direction=row["signal_direction"], signal_strength=float(row["signal_strength"]), relationship_strength=float(row["relationship_strength"]), computed_at=row["computed_at"], ) refs.append(ref) return refs # --------------------------------------------------------------------------- # Job dispatcher # --------------------------------------------------------------------------- JOB_TYPES = { "document", "document_extraction", "market_snapshot", "trade_order", "trade_fill", "positions_snapshot", "pnl_snapshot", "company_event", "bulk_documents", "bulk_extractions", "global_event", "macro_impact", "trend_projection", "competitor_relationship", "competitive_signal", } async def dispatch_job( pool: asyncpg.Pool, minio_client: Minio, job: dict[str, str], ) -> dict[str, object]: """Dispatch a lake publish job to the appropriate handler. Args: pool: PostgreSQL connection pool. minio_client: MinIO client for writing Parquet files. job: Job dict with at least 'job_type' and 'entity_id'. Returns: A result dict with 'job_type', 'entity_id', 'refs' (list of s3 URIs), and 'error' (None on success). """ job_type = job.get("job_type", "") entity_id = job.get("entity_id", "") since_str = job.get("since") result: dict[str, object] = { "job_type": job_type, "entity_id": entity_id, "refs": [], "error": None, } try: if job_type == "document": ref = await publish_document_job(pool, minio_client, entity_id) result["refs"] = [ref] if ref else [] elif job_type == "document_extraction": refs = await publish_extraction_job(pool, minio_client, entity_id) result["refs"] = refs elif job_type == "market_snapshot": refs = await publish_market_snapshot_job(pool, minio_client, entity_id) result["refs"] = refs elif job_type == "trade_order": ref = await publish_order_job(pool, minio_client, entity_id) result["refs"] = [ref] if ref else [] elif job_type == "trade_fill": refs = await publish_fills_job(pool, minio_client, entity_id) result["refs"] = refs elif job_type == "positions_snapshot": ref = await publish_positions_job(pool, minio_client, entity_id) result["refs"] = [ref] if ref else [] elif job_type == "pnl_snapshot": refs = await publish_pnl_job(pool, minio_client, entity_id) result["refs"] = refs elif job_type == "bulk_documents": since = datetime.fromisoformat(since_str) if since_str else datetime.now(timezone.utc) refs = await publish_bulk_documents_job(pool, minio_client, since) result["refs"] = refs elif job_type == "bulk_extractions": since = datetime.fromisoformat(since_str) if since_str else datetime.now(timezone.utc) refs = await publish_bulk_extractions_job(pool, minio_client, since) result["refs"] = refs elif job_type == "global_event": ref = await publish_global_event_job(pool, minio_client, entity_id) result["refs"] = [ref] if ref else [] elif job_type == "macro_impact": refs = await publish_macro_impacts_job(pool, minio_client, entity_id) result["refs"] = refs elif job_type == "trend_projection": ref = await publish_trend_projection_job(pool, minio_client, entity_id) result["refs"] = [ref] if ref else [] elif job_type == "competitor_relationship": ref = await publish_competitor_relationship_job(pool, minio_client, entity_id) result["refs"] = [ref] if ref else [] elif job_type == "competitive_signal": refs = await publish_competitive_signals_job(pool, minio_client, entity_id) result["refs"] = refs else: result["error"] = f"Unknown job_type: {job_type}" logger.warning("Unknown lake publish job type: %s", job_type) except Exception as exc: result["error"] = str(exc) logger.exception("Lake publish job failed: %s/%s", job_type, entity_id) return result # --------------------------------------------------------------------------- # Async worker loop # --------------------------------------------------------------------------- async def run_worker( pool: asyncpg.Pool, rds: aioredis.Redis, minio_client: Minio, poll_interval: float = 2.0, ) -> None: """Main worker loop — reads jobs from Redis and dispatches them. Runs indefinitely until cancelled. Each job is processed sequentially to keep MinIO write ordering predictable. """ queue = queue_key(QUEUE_LAKE_PUBLISH) logger.info("Lake publisher worker started, listening on %s", queue) while True: if not await is_pipeline_enabled(rds): await asyncio.sleep(poll_interval) continue raw = await rds.lpop(queue) # type: ignore[misc] if raw is None: await asyncio.sleep(poll_interval) continue try: job = json.loads(str(raw)) except (json.JSONDecodeError, TypeError): logger.error("Invalid lake publish job payload: %s", raw) continue result = await dispatch_job(pool, minio_client, job) refs = result.get("refs") or [] error = result.get("error") if error: logger.error( "Lake publish job %s/%s failed: %s", result["job_type"], result["entity_id"], error, ) else: ref_count = len(refs) if isinstance(refs, list) else 0 logger.info( "Lake publish job %s/%s completed: %d facts written", result["job_type"], result["entity_id"], ref_count, ) async def main() -> None: """Entry point for the lake publisher worker process.""" config = load_config() pool = await get_pg_pool(config) rds = get_redis(config) minio_client = get_minio(config) try: await run_worker(pool, rds, minio_client) finally: await pool.close() await rds.close() if __name__ == "__main__": cfg = load_config() setup_logging("lake_publisher", level=cfg.log_level, json_output=cfg.json_logs) asyncio.run(main())