"""Tests for lake publisher job runner — dispatching operational data to analytical facts.""" from __future__ import annotations from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock import pytest from services.lake_publisher.jobs import ( _jsonb_to_str, dispatch_job, publish_document_job, publish_extraction_job, publish_fills_job, publish_market_snapshot_job, publish_order_job, publish_pnl_job, publish_positions_job, ) NOW = datetime(2026, 4, 11, 14, 30, 0, tzinfo=timezone.utc) # --------------------------------------------------------------------------- # _jsonb_to_str # --------------------------------------------------------------------------- def test_jsonb_to_str_list(): assert _jsonb_to_str(["a", "b", "c"]) == "a, b, c" def test_jsonb_to_str_json_string(): assert _jsonb_to_str('["x", "y"]') == "x, y" def test_jsonb_to_str_plain_string(): assert _jsonb_to_str("hello") == "hello" def test_jsonb_to_str_none(): assert _jsonb_to_str(None) == "" # --------------------------------------------------------------------------- # publish_document_job # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_publish_document_job_found(): pool = AsyncMock() pool.fetchrow.return_value = { "id": "doc-uuid-1", "document_type": "article", "source_type": "news_api", "publisher": "Reuters", "title": "Test Article", "url": "https://example.com/article", "canonical_url": "https://example.com/article", "language": "en", "published_at": NOW, "retrieved_at": NOW, "content_hash": "abc123", "parse_quality_score": 0.85, "ticker": "AAPL", } minio_client = MagicMock() ref = await publish_document_job(pool, minio_client, "doc-uuid-1") assert ref.startswith("s3://stonks-lakehouse/warehouse/documents/") assert minio_client.put_object.call_count == 1 @pytest.mark.asyncio async def test_publish_document_job_not_found(): pool = AsyncMock() pool.fetchrow.return_value = None minio_client = MagicMock() ref = await publish_document_job(pool, minio_client, "missing-uuid") assert ref == "" assert minio_client.put_object.call_count == 0 # --------------------------------------------------------------------------- # publish_extraction_job # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_publish_extraction_job(): pool = AsyncMock() pool.fetch.return_value = [ { "document_id": "doc-uuid-1", "ticker": "AAPL", "relevance": 0.9, "sentiment": "positive", "impact_score": 0.7, "impact_horizon": "1d_30d", "catalyst_type": "earnings", "confidence": 0.85, "novelty_score": 0.6, "source_credibility": 0.8, "key_facts": ["strong earnings"], "risks": ["regulatory"], "macro_themes": ["ai_capex"], "model_name": "gpt-oss:20b", "prompt_version": "document-intel-v2", "schema_version": "2.0.0", "extraction_at": NOW, "company_name": "Apple Inc.", }, ] minio_client = MagicMock() refs = await publish_extraction_job(pool, minio_client, "doc-uuid-1") assert len(refs) == 1 assert refs[0].startswith("s3://stonks-lakehouse/warehouse/document_extractions/") assert minio_client.put_object.call_count == 1 @pytest.mark.asyncio async def test_publish_extraction_job_empty(): pool = AsyncMock() pool.fetch.return_value = [] minio_client = MagicMock() refs = await publish_extraction_job(pool, minio_client, "doc-uuid-1") assert refs == [] # --------------------------------------------------------------------------- # publish_market_snapshot_job # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_publish_market_snapshot_bar(): pool = AsyncMock() pool.fetchrow.return_value = { "ticker": "AAPL", "snapshot_type": "bar", "data": {"open": 150.0, "high": 155.0, "low": 149.0, "close": 153.0, "volume": 1000000, "vwap": 152.0, "trade_count": 5000}, "source_provider": "polygon", "captured_at": NOW, } minio_client = MagicMock() refs = await publish_market_snapshot_job(pool, minio_client, "snap-uuid-1") assert len(refs) == 1 assert refs[0].startswith("s3://stonks-lakehouse/warehouse/market_bars/") @pytest.mark.asyncio async def test_publish_market_snapshot_quote(): pool = AsyncMock() pool.fetchrow.return_value = { "ticker": "AAPL", "snapshot_type": "quote", "data": {"bid_price": 150.0, "ask_price": 150.5, "last_price": 150.25}, "source_provider": "polygon", "captured_at": NOW, } minio_client = MagicMock() refs = await publish_market_snapshot_job(pool, minio_client, "snap-uuid-1") assert len(refs) == 1 assert refs[0].startswith("s3://stonks-lakehouse/warehouse/market_quotes/") # --------------------------------------------------------------------------- # publish_order_job # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_publish_order_job(): pool = AsyncMock() pool.fetchrow.return_value = { "id": "ord-uuid-1", "recommendation_id": "rec-uuid-1", "ticker": "AAPL", "side": "buy", "order_type": "market", "quantity": 10, "limit_price": None, "status": "filled", "submitted_at": NOW, "fill_price": 150.25, "fill_quantity": 10, "filled_at": NOW, "broker_account": "acct-001", "execution_mode": "paper", } minio_client = MagicMock() ref = await publish_order_job(pool, minio_client, "ord-uuid-1") assert ref.startswith("s3://stonks-lakehouse/warehouse/trade_orders/") assert minio_client.put_object.call_count == 1 # --------------------------------------------------------------------------- # publish_fills_job # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_publish_fills_job(): pool = AsyncMock() pool.fetch.return_value = [ { "fill_id": "fill-uuid-1", "order_id": "ord-uuid-1", "data": {"fill_price": 150.25, "fill_quantity": 10, "commission": 0.5}, "broker_timestamp": NOW, "ticker": "AAPL", "side": "buy", "broker_account": "acct-001", }, ] minio_client = MagicMock() refs = await publish_fills_job(pool, minio_client, "ord-uuid-1") assert len(refs) == 1 assert refs[0].startswith("s3://stonks-lakehouse/warehouse/trade_fills/") # --------------------------------------------------------------------------- # publish_positions_job # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_publish_positions_job(): pool = AsyncMock() pool.fetch.return_value = [ { "ticker": "AAPL", "quantity": 100, "avg_entry_price": 145.0, "current_price": 150.0, "unrealized_pnl": 500.0, "realized_pnl": 0, "broker_account": "acct-001", "execution_mode": "paper", }, ] minio_client = MagicMock() ref = await publish_positions_job(pool, minio_client, "acct-uuid-1") assert ref.startswith("s3://stonks-lakehouse/warehouse/positions_daily/") assert minio_client.put_object.call_count == 1 # --------------------------------------------------------------------------- # publish_pnl_job # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_publish_pnl_job(): pool = AsyncMock() pool.fetch.return_value = [ { "ticker": "AAPL", "quantity": 100, "avg_entry_price": 145.0, "current_price": 150.0, "unrealized_pnl": 500.0, "realized_pnl": 200.0, "broker_account": "acct-001", "execution_mode": "paper", }, ] minio_client = MagicMock() refs = await publish_pnl_job(pool, minio_client, "acct-uuid-1") assert len(refs) == 1 assert refs[0].startswith("s3://stonks-lakehouse/warehouse/pnl_daily/") # --------------------------------------------------------------------------- # dispatch_job # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_dispatch_unknown_job_type(): pool = AsyncMock() minio_client = MagicMock() result = await dispatch_job(pool, minio_client, {"job_type": "unknown", "entity_id": "x"}) assert result["error"] is not None assert "Unknown" in str(result["error"]) @pytest.mark.asyncio async def test_dispatch_document_job(): pool = AsyncMock() pool.fetchrow.return_value = { "id": "doc-uuid-1", "document_type": "article", "source_type": "news_api", "publisher": "Reuters", "title": "Test", "url": "", "canonical_url": "", "language": "en", "published_at": NOW, "retrieved_at": NOW, "content_hash": "abc", "parse_quality_score": 0.8, "ticker": "AAPL", } minio_client = MagicMock() result = await dispatch_job( pool, minio_client, {"job_type": "document", "entity_id": "doc-uuid-1"}, ) assert result["error"] is None refs = result["refs"] assert isinstance(refs, list) assert len(refs) == 1 @pytest.mark.asyncio async def test_dispatch_job_handles_exception(): pool = AsyncMock() pool.fetchrow.side_effect = Exception("DB down") minio_client = MagicMock() result = await dispatch_job( pool, minio_client, {"job_type": "document", "entity_id": "doc-uuid-1"}, ) assert result["error"] is not None assert "DB down" in str(result["error"])