Files
stonks-oracle/tests/test_lake_publisher_jobs.py
T
Celes Renata c85c0068a2 fix: clean up utcnow deprecation warnings, fix 12 failing tests, add CI/CD pipeline manifests
- Replace all datetime.utcnow() with datetime.now(tz=timezone.utc) across 8 files
- Fix 12 failing tests to match current implementation behavior
- Fix pytest_plugins in non-top-level conftest (moved to root conftest.py)
- Auto-fix 189 lint issues (import sorting, unused imports)
- Add CI/CD pipeline infrastructure (ARC, ArgoCD, Kargo manifests)
- Add values-beta.yaml and values-paper.yaml for staged deployments
- Update GitHub Actions workflow to use self-hosted-gremlin runners
- Add integration-test job to CI pipeline

Result: 1596 passed, 0 failed, 0 warnings
2026-04-18 03:59:28 +00:00

353 lines
10 KiB
Python

"""Tests for lake publisher job runner — dispatching operational data to analytical facts."""
from __future__ import annotations
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
from services.lake_publisher.jobs import (
_jsonb_to_str,
dispatch_job,
publish_document_job,
publish_extraction_job,
publish_fills_job,
publish_market_snapshot_job,
publish_order_job,
publish_pnl_job,
publish_positions_job,
)
# Fixed, timezone-aware reference instant used for every timestamp field in
# the fixture rows below, so test data is identical from run to run.
NOW = datetime(2026, 4, 11, 14, 30, tzinfo=timezone.utc)
# ---------------------------------------------------------------------------
# _jsonb_to_str
# ---------------------------------------------------------------------------
def test_jsonb_to_str_list():
    """A Python list is rendered as its items joined with ``", "``."""
    rendered = _jsonb_to_str(["a", "b", "c"])
    assert rendered == "a, b, c"
def test_jsonb_to_str_json_string():
    """A JSON-array string is rendered the same way as an equivalent list."""
    expected = "x, y"
    assert _jsonb_to_str('["x", "y"]') == expected
def test_jsonb_to_str_plain_string():
    """A plain (non-JSON) string passes through unchanged."""
    text = "hello"
    assert _jsonb_to_str(text) == "hello"
def test_jsonb_to_str_none():
    """``None`` is rendered as the empty string."""
    rendered = _jsonb_to_str(None)
    assert rendered == ""
# ---------------------------------------------------------------------------
# publish_document_job
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_publish_document_job_found():
    """An existing document row is published as exactly one documents/ object."""
    document_row = {
        "id": "doc-uuid-1",
        "document_type": "article",
        "source_type": "news_api",
        "publisher": "Reuters",
        "title": "Test Article",
        "url": "https://example.com/article",
        "canonical_url": "https://example.com/article",
        "language": "en",
        "published_at": NOW,
        "retrieved_at": NOW,
        "content_hash": "abc123",
        "parse_quality_score": 0.85,
        "ticker": "AAPL",
    }
    pool = AsyncMock()
    pool.fetchrow.return_value = document_row
    minio_client = MagicMock()

    ref = await publish_document_job(pool, minio_client, "doc-uuid-1")

    assert ref.startswith("s3://stonks-lakehouse/warehouse/documents/")
    minio_client.put_object.assert_called_once()
@pytest.mark.asyncio
async def test_publish_document_job_not_found():
    """A missing document yields an empty ref and makes no put_object call."""
    pool = AsyncMock()
    # fetchrow returning None simulates "no such document".
    pool.fetchrow.return_value = None
    minio_client = MagicMock()

    ref = await publish_document_job(pool, minio_client, "missing-uuid")

    assert ref == ""
    minio_client.put_object.assert_not_called()
# ---------------------------------------------------------------------------
# publish_extraction_job
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_publish_extraction_job():
    """One extraction row produces one document_extractions/ object and ref."""
    extraction_row = {
        "document_id": "doc-uuid-1",
        "ticker": "AAPL",
        "relevance": 0.9,
        "sentiment": "positive",
        "impact_score": 0.7,
        "impact_horizon": "1d_30d",
        "catalyst_type": "earnings",
        "confidence": 0.85,
        "novelty_score": 0.6,
        "source_credibility": 0.8,
        "key_facts": ["strong earnings"],
        "risks": ["regulatory"],
        "macro_themes": ["ai_capex"],
        "model_name": "gpt-oss:20b",
        "prompt_version": "document-intel-v2",
        "schema_version": "2.0.0",
        "extraction_at": NOW,
        "company_name": "Apple Inc.",
    }
    pool = AsyncMock()
    pool.fetch.return_value = [extraction_row]
    minio_client = MagicMock()

    refs = await publish_extraction_job(pool, minio_client, "doc-uuid-1")

    assert len(refs) == 1
    first_ref = refs[0]
    assert first_ref.startswith("s3://stonks-lakehouse/warehouse/document_extractions/")
    minio_client.put_object.assert_called_once()
@pytest.mark.asyncio
async def test_publish_extraction_job_empty():
    """A document with no extraction rows returns no refs and writes nothing."""
    pool = AsyncMock()
    pool.fetch.return_value = []
    minio_client = MagicMock()

    refs = await publish_extraction_job(pool, minio_client, "doc-uuid-1")

    assert refs == []
    # An empty result must not leave an empty or partial object in the lake;
    # this matches the no-write check in the document "not found" test.
    minio_client.put_object.assert_not_called()
# ---------------------------------------------------------------------------
# publish_market_snapshot_job
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_publish_market_snapshot_bar():
    """A ``bar`` snapshot is published as one object under market_bars/.

    Besides the returned ref, also pin the single object write, as the other
    single-ref publish tests do.
    """
    pool = AsyncMock()
    pool.fetchrow.return_value = {
        "ticker": "AAPL",
        "snapshot_type": "bar",
        "data": {"open": 150.0, "high": 155.0, "low": 149.0, "close": 153.0,
                 "volume": 1000000, "vwap": 152.0, "trade_count": 5000},
        "source_provider": "polygon",
        "captured_at": NOW,
    }
    minio_client = MagicMock()

    refs = await publish_market_snapshot_job(pool, minio_client, "snap-uuid-1")

    assert len(refs) == 1
    assert refs[0].startswith("s3://stonks-lakehouse/warehouse/market_bars/")
    minio_client.put_object.assert_called_once()
@pytest.mark.asyncio
async def test_publish_market_snapshot_quote():
    """A ``quote`` snapshot is published as one object under market_quotes/.

    Besides the returned ref, also pin the single object write, as the other
    single-ref publish tests do.
    """
    pool = AsyncMock()
    pool.fetchrow.return_value = {
        "ticker": "AAPL",
        "snapshot_type": "quote",
        "data": {"bid_price": 150.0, "ask_price": 150.5, "last_price": 150.25},
        "source_provider": "polygon",
        "captured_at": NOW,
    }
    minio_client = MagicMock()

    refs = await publish_market_snapshot_job(pool, minio_client, "snap-uuid-1")

    assert len(refs) == 1
    assert refs[0].startswith("s3://stonks-lakehouse/warehouse/market_quotes/")
    minio_client.put_object.assert_called_once()
# ---------------------------------------------------------------------------
# publish_order_job
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_publish_order_job():
    """A filled paper market order is published as one trade_orders/ object."""
    order_row = {
        "id": "ord-uuid-1",
        "recommendation_id": "rec-uuid-1",
        "ticker": "AAPL",
        "side": "buy",
        "order_type": "market",
        "quantity": 10,
        "limit_price": None,
        "status": "filled",
        "submitted_at": NOW,
        "fill_price": 150.25,
        "fill_quantity": 10,
        "filled_at": NOW,
        "broker_account": "acct-001",
        "execution_mode": "paper",
    }
    pool = AsyncMock()
    pool.fetchrow.return_value = order_row
    minio_client = MagicMock()

    ref = await publish_order_job(pool, minio_client, "ord-uuid-1")

    assert ref.startswith("s3://stonks-lakehouse/warehouse/trade_orders/")
    minio_client.put_object.assert_called_once()
# ---------------------------------------------------------------------------
# publish_fills_job
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_publish_fills_job():
    """One fill row for an order is published as one trade_fills/ object.

    Besides the returned ref, also pin the single object write, as the other
    single-ref publish tests do.
    """
    pool = AsyncMock()
    pool.fetch.return_value = [
        {
            "fill_id": "fill-uuid-1",
            "order_id": "ord-uuid-1",
            "data": {"fill_price": 150.25, "fill_quantity": 10, "commission": 0.5},
            "broker_timestamp": NOW,
            "ticker": "AAPL",
            "side": "buy",
            "broker_account": "acct-001",
        },
    ]
    minio_client = MagicMock()

    refs = await publish_fills_job(pool, minio_client, "ord-uuid-1")

    assert len(refs) == 1
    assert refs[0].startswith("s3://stonks-lakehouse/warehouse/trade_fills/")
    minio_client.put_object.assert_called_once()
# ---------------------------------------------------------------------------
# publish_positions_job
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_publish_positions_job():
    """An account's position rows are published as one positions_daily/ object."""
    position_row = {
        "ticker": "AAPL",
        "quantity": 100,
        "avg_entry_price": 145.0,
        "current_price": 150.0,
        "unrealized_pnl": 500.0,
        "realized_pnl": 0,
        "broker_account": "acct-001",
        "execution_mode": "paper",
    }
    pool = AsyncMock()
    pool.fetch.return_value = [position_row]
    minio_client = MagicMock()

    ref = await publish_positions_job(pool, minio_client, "acct-uuid-1")

    assert ref.startswith("s3://stonks-lakehouse/warehouse/positions_daily/")
    minio_client.put_object.assert_called_once()
# ---------------------------------------------------------------------------
# publish_pnl_job
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_publish_pnl_job():
    """An account's position rows yield one pnl_daily/ object and ref.

    Besides the returned ref, also pin the single object write, as the other
    single-ref publish tests do.
    """
    pool = AsyncMock()
    pool.fetch.return_value = [
        {
            "ticker": "AAPL",
            "quantity": 100,
            "avg_entry_price": 145.0,
            "current_price": 150.0,
            "unrealized_pnl": 500.0,
            "realized_pnl": 200.0,
            "broker_account": "acct-001",
            "execution_mode": "paper",
        },
    ]
    minio_client = MagicMock()

    refs = await publish_pnl_job(pool, minio_client, "acct-uuid-1")

    assert len(refs) == 1
    assert refs[0].startswith("s3://stonks-lakehouse/warehouse/pnl_daily/")
    minio_client.put_object.assert_called_once()
# ---------------------------------------------------------------------------
# dispatch_job
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_dispatch_unknown_job_type():
    """An unrecognised ``job_type`` is reported in ``error`` rather than raised."""
    job = {"job_type": "unknown", "entity_id": "x"}

    result = await dispatch_job(AsyncMock(), MagicMock(), job)

    error = result["error"]
    assert error is not None
    assert "Unknown" in str(error)
@pytest.mark.asyncio
async def test_dispatch_document_job():
    """A ``document`` job completes without error and returns a one-item ref list."""
    doc_row = {
        "id": "doc-uuid-1",
        "document_type": "article",
        "source_type": "news_api",
        "publisher": "Reuters",
        "title": "Test",
        "url": "",
        "canonical_url": "",
        "language": "en",
        "published_at": NOW,
        "retrieved_at": NOW,
        "content_hash": "abc",
        "parse_quality_score": 0.8,
        "ticker": "AAPL",
    }
    pool = AsyncMock()
    pool.fetchrow.return_value = doc_row
    minio_client = MagicMock()
    job = {"job_type": "document", "entity_id": "doc-uuid-1"}

    result = await dispatch_job(pool, minio_client, job)

    assert result["error"] is None
    refs = result["refs"]
    assert isinstance(refs, list)
    assert len(refs) == 1
@pytest.mark.asyncio
async def test_dispatch_job_handles_exception():
    """A failure inside a job is captured in ``error`` instead of propagating.

    The row lookup itself fails, so nothing should reach the object store
    either. This guards against partial writes on error.
    """
    pool = AsyncMock()
    pool.fetchrow.side_effect = Exception("DB down")
    minio_client = MagicMock()

    result = await dispatch_job(
        pool, minio_client,
        {"job_type": "document", "entity_id": "doc-uuid-1"},
    )

    assert result["error"] is not None
    assert "DB down" in str(result["error"])
    minio_client.put_object.assert_not_called()