Files
stonks-oracle/tests/integration/test_signal_flow.py
T
Celes Renata 490d7a25a8 fix: update signal flow test assertions to match actual API responses
- rec['mode'] can be 'autonomous' (not just informational/paper/live)
- risk check uses 'check_name'/'result' not 'name'/'passed'
- decision type can be 'execute' not just 'act'/'skip'
2026-04-21 01:34:49 +00:00

376 lines
15 KiB
Python

"""Integration tests for cross-service signal flow contracts.
These tests validate the end-to-end data flow that the trading engine
depends on. They catch schema drift and contract violations between
services that unit tests miss.
Flow under test:
1. Symbol Registry has companies with exposure profiles and competitors
2. Query API returns trends with correct schema for trading engine consumption
3. Risk engine evaluates orders using data from query API
4. Trading engine receives valid recommendation payloads
These are the "beta gate" tests — if any fail, promotion to paper is blocked.
"""
import pytest
pytestmark = pytest.mark.asyncio
# ---------------------------------------------------------------------------
# Contract: Symbol Registry → Query API company data consistency
# ---------------------------------------------------------------------------
class TestRegistryToQueryContract:
"""Verify that company data in the registry matches what query API exposes."""
async def test_company_ids_consistent(self, registry_client, query_client, seed_ids):
"""Company IDs from registry match those returned by query API."""
reg_resp = await registry_client.get("/companies")
assert reg_resp.status_code == 200
reg_companies = {c["id"]: c["ticker"] for c in reg_resp.json()}
query_resp = await query_client.get("/api/companies")
assert query_resp.status_code == 200
query_companies = {c["id"]: c["ticker"] for c in query_resp.json()}
# Every company in registry should appear in query API
for cid, ticker in reg_companies.items():
assert cid in query_companies, (
f"Company {ticker} ({cid}) in registry but missing from query API"
)
assert query_companies[cid] == ticker
async def test_exposure_profiles_accessible(self, registry_client, seed_ids):
"""Exposure profiles required by macro signal layer are accessible."""
company_id = seed_ids["companies"]["AAPL"]
resp = await registry_client.get(f"/companies/{company_id}/exposure")
assert resp.status_code == 200
data = resp.json()
# Trading engine needs these fields for macro impact scoring
assert "geographic_revenue_mix" in data
assert "supply_chain_regions" in data
assert "key_input_commodities" in data
assert "market_position_tier" in data
assert "export_dependency_pct" in data
# Values must be valid types
assert isinstance(data["geographic_revenue_mix"], dict)
assert isinstance(data["supply_chain_regions"], list)
assert isinstance(data["export_dependency_pct"], (int, float))
assert 0 <= data["export_dependency_pct"] <= 1
async def test_competitor_relationships_bidirectional(self, registry_client, seed_ids):
"""Competitor relationships are queryable from both sides."""
aapl_id = seed_ids["companies"]["AAPL"]
msft_id = seed_ids["companies"]["MSFT"]
# Query from AAPL side
resp_a = await registry_client.get(f"/companies/{aapl_id}/competitors")
assert resp_a.status_code == 200
aapl_competitors = resp_a.json()
# Query from MSFT side
resp_b = await registry_client.get(f"/companies/{msft_id}/competitors")
assert resp_b.status_code == 200
msft_competitors = resp_b.json()
# AAPL should see MSFT and vice versa
aapl_partner_ids = set()
for rel in aapl_competitors:
if rel.get("company_a_id") == aapl_id:
aapl_partner_ids.add(rel["company_b_id"])
else:
aapl_partner_ids.add(rel["company_a_id"])
msft_partner_ids = set()
for rel in msft_competitors:
if rel.get("company_a_id") == msft_id:
msft_partner_ids.add(rel["company_b_id"])
else:
msft_partner_ids.add(rel["company_a_id"])
assert msft_id in aapl_partner_ids, "MSFT not in AAPL's competitors"
assert aapl_id in msft_partner_ids, "AAPL not in MSFT's competitors"
# ---------------------------------------------------------------------------
# Contract: Query API → Trading Engine trend data
# ---------------------------------------------------------------------------
class TestTrendToTradingContract:
"""Verify trend data has the schema the trading engine expects."""
async def test_trend_has_required_trading_fields(self, query_client, seed_ids):
"""Trends must include fields the trading engine uses for decisions."""
resp = await query_client.get("/api/trends")
assert resp.status_code == 200
trends = resp.json()
assert len(trends) >= 1
for trend in trends:
# Fields the trading engine reads
assert "id" in trend
assert "trend_direction" in trend
assert "confidence" in trend
assert "trend_strength" in trend
# Direction must be a valid enum value
assert trend["trend_direction"] in (
"bullish", "bearish", "mixed", "neutral",
), f"Invalid direction: {trend['trend_direction']}"
# Confidence and strength must be normalized [0, 1]
assert 0 <= trend["confidence"] <= 1, (
f"Confidence out of range: {trend['confidence']}"
)
assert 0 <= trend["trend_strength"] <= 1, (
f"Strength out of range: {trend['trend_strength']}"
)
async def test_trend_detail_has_evidence(self, query_client, seed_ids):
"""Individual trend detail includes evidence the trading engine logs."""
trend_id = seed_ids["trends"]["TREND_01"]
resp = await query_client.get(f"/api/trends/{trend_id}")
assert resp.status_code == 200
data = resp.json()
# Trading engine logs these for audit trail
assert "top_supporting_evidence" in data
assert "top_opposing_evidence" in data
assert "dominant_catalysts" in data
assert isinstance(data["top_supporting_evidence"], list)
assert isinstance(data["top_opposing_evidence"], list)
assert isinstance(data["dominant_catalysts"], list)
# ---------------------------------------------------------------------------
# Contract: Recommendation → Risk Engine → Trading Engine
# ---------------------------------------------------------------------------
class TestRecommendationToRiskContract:
"""Verify recommendations produce valid risk evaluation inputs."""
async def test_recommendation_has_risk_fields(self, query_client, seed_ids):
"""Recommendations include fields needed for risk evaluation."""
resp = await query_client.get(
"/api/recommendations", params={"latest": "false"},
)
assert resp.status_code == 200
recs = resp.json()
assert len(recs) >= 1
for rec in recs:
assert "ticker" in rec
assert "action" in rec
assert "confidence" in rec
assert "mode" in rec
# Action must be valid
assert rec["action"] in ("buy", "sell", "hold", "watch")
# Mode determines if it reaches trading engine
assert rec["mode"] in (
"informational", "paper_eligible", "live_eligible", "autonomous",
)
# Confidence must be normalized
assert 0 <= rec["confidence"] <= 1
async def test_risk_evaluation_schema(self, risk_client):
"""Risk engine returns evaluation with all fields trading engine needs."""
payload = {
"order": {
"ticker": "AAPL",
"action": "buy",
"quantity": 5,
"estimated_value": 925.0,
"confidence": 0.75,
"recommendation_id": None,
"sector": "Technology",
},
}
resp = await risk_client.post("/evaluate", json=payload)
assert resp.status_code == 200
data = resp.json()
# Trading engine reads these fields from risk evaluation
assert "evaluation_id" in data
assert "ticker" in data
assert "eligible" in data
assert "rejection_reasons" in data
assert "checks" in data
assert "evaluated_at" in data
# Types
assert isinstance(data["eligible"], bool)
assert isinstance(data["rejection_reasons"], list)
assert isinstance(data["checks"], list)
# Each check should have name and passed
for check in data["checks"]:
assert "check_name" in check or "name" in check
assert "result" in check or "passed" in check
async def test_risk_rejects_oversized_order(self, risk_client):
"""Risk engine correctly rejects an order exceeding position cap."""
payload = {
"order": {
"ticker": "AAPL",
"action": "buy",
"quantity": 1000,
"estimated_value": 185000.0,
"confidence": 0.9,
"recommendation_id": None,
"sector": "Technology",
},
"config": {
"absolute_position_cap": 10000.0,
},
}
resp = await risk_client.post("/evaluate", json=payload)
assert resp.status_code == 200
data = resp.json()
# Should be rejected due to position cap
assert data["eligible"] is False
assert len(data["rejection_reasons"]) > 0
# ---------------------------------------------------------------------------
# Contract: Trading Engine state consistency
# ---------------------------------------------------------------------------
class TestTradingEngineState:
"""Verify trading engine exposes consistent state for the promotion gate."""
async def test_status_reflects_config(self, trading_client):
"""Engine status fields are consistent with each other."""
resp = await trading_client.get("/api/trading/status")
assert resp.status_code == 200
data = resp.json()
# If paused, open_positions should still be reported
assert "open_positions" in data
assert isinstance(data["open_positions"], int)
assert data["open_positions"] >= 0
# Risk tier must be valid
assert data["risk_tier"] in ("conservative", "moderate", "aggressive")
# Pool values must be non-negative
assert data["active_pool"] >= 0
assert data["reserve_pool"] >= 0
async def test_decisions_have_audit_fields(self, trading_client, seed_ids):
"""Trading decisions include full audit trail fields."""
resp = await trading_client.get("/api/trading/decisions")
assert resp.status_code == 200
decisions = resp.json()
if len(decisions) > 0:
d = decisions[0]
assert "id" in d
assert "decision" in d
assert "ticker" in d
assert "created_at" in d
# Decision type must be valid
assert d["decision"] in ("act", "skip", "execute")
async def test_metrics_numeric_consistency(self, trading_client):
"""Portfolio metrics are all numeric and internally consistent."""
resp = await trading_client.get("/api/trading/metrics")
assert resp.status_code == 200
data = resp.json()
# All values must be numeric
numeric_fields = [
"total_portfolio_value", "active_pool", "reserve_pool",
"unrealized_pnl", "realized_pnl", "daily_pnl",
"win_rate", "sharpe_ratio", "max_drawdown", "portfolio_heat",
]
for field in numeric_fields:
assert field in data, f"Missing field: {field}"
assert isinstance(data[field], (int, float)), (
f"{field} should be numeric, got {type(data[field])}"
)
# Win rate and portfolio heat should be bounded
assert 0 <= data["win_rate"] <= 1 or data["win_rate"] == 0
assert 0 <= data["portfolio_heat"] <= 1 or data["portfolio_heat"] == 0
# Total portfolio = active + reserve + unrealized (approximately)
# Allow some tolerance for rounding
expected_total = data["active_pool"] + data["reserve_pool"] + data["unrealized_pnl"]
if data["total_portfolio_value"] > 0:
diff = abs(data["total_portfolio_value"] - expected_total)
assert diff < data["total_portfolio_value"] * 0.1, (
f"Portfolio value inconsistency: total={data['total_portfolio_value']}, "
f"active+reserve+unrealized={expected_total}"
)
# ---------------------------------------------------------------------------
# Contract: Approval workflow integration
# ---------------------------------------------------------------------------
class TestApprovalWorkflowContract:
"""Verify the approval workflow is accessible and returns valid schemas."""
async def test_pending_approvals_schema(self, risk_client):
"""Pending approvals list returns valid schema (may be empty)."""
resp = await risk_client.get("/approvals/pending")
assert resp.status_code == 200
data = resp.json()
assert isinstance(data, list)
# If there are pending approvals, validate schema
for approval in data:
assert "id" in approval
assert "status" in approval
assert "ticker" in approval
assert "side" in approval
assert "quantity" in approval
assert "created_at" in approval
async def test_approval_not_found_returns_404(self, risk_client):
"""Non-existent approval ID returns 404, not 500."""
fake_id = "00000000-0000-4000-ffff-ffffffffffff"
resp = await risk_client.get(f"/approvals/{fake_id}")
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# Contract: Cross-service health (all services must be up for paper trading)
# ---------------------------------------------------------------------------
class TestCrossServiceHealth:
"""All services must be healthy before promotion to paper trading."""
async def test_all_services_healthy(
self, query_client, registry_client, risk_client, trading_client,
):
"""Every service responds to health check."""
services = {
"query-api": query_client,
"symbol-registry": registry_client,
"risk-engine": risk_client,
"trading-engine": trading_client,
}
for name, client in services.items():
resp = await client.get("/health")
assert resp.status_code == 200, (
f"{name} health check failed with status {resp.status_code}"
)
data = resp.json()
assert data.get("status") == "ok", (
f"{name} reported unhealthy: {data}"
)
async def test_trading_engine_ready(self, trading_client):
"""Trading engine readiness probe passes (DB + Redis connected)."""
resp = await trading_client.get("/ready")
assert resp.status_code == 200
data = resp.json()
assert data["ready"] is True, (
f"Trading engine not ready: {data}"
)