"""Integration tests for cross-service signal flow contracts. These tests validate the end-to-end data flow that the trading engine depends on. They catch schema drift and contract violations between services that unit tests miss. Flow under test: 1. Symbol Registry has companies with exposure profiles and competitors 2. Query API returns trends with correct schema for trading engine consumption 3. Risk engine evaluates orders using data from query API 4. Trading engine receives valid recommendation payloads These are the "beta gate" tests — if any fail, promotion to paper is blocked. """ import pytest pytestmark = pytest.mark.asyncio # --------------------------------------------------------------------------- # Contract: Symbol Registry → Query API company data consistency # --------------------------------------------------------------------------- class TestRegistryToQueryContract: """Verify that company data in the registry matches what query API exposes.""" async def test_company_ids_consistent(self, registry_client, query_client, seed_ids): """Company IDs from registry match those returned by query API.""" reg_resp = await registry_client.get("/companies") assert reg_resp.status_code == 200 reg_companies = {c["id"]: c["ticker"] for c in reg_resp.json()} query_resp = await query_client.get("/api/companies") assert query_resp.status_code == 200 query_companies = {c["id"]: c["ticker"] for c in query_resp.json()} # Every company in registry should appear in query API for cid, ticker in reg_companies.items(): assert cid in query_companies, ( f"Company {ticker} ({cid}) in registry but missing from query API" ) assert query_companies[cid] == ticker async def test_exposure_profiles_accessible(self, registry_client, seed_ids): """Exposure profiles required by macro signal layer are accessible.""" company_id = seed_ids["companies"]["AAPL"] resp = await registry_client.get(f"/companies/{company_id}/exposure") assert resp.status_code == 200 data = resp.json() # Trading engine needs these fields for macro impact scoring assert "geographic_revenue_mix" in data assert "supply_chain_regions" in data assert "key_input_commodities" in data assert "market_position_tier" in data assert "export_dependency_pct" in data # Values must be valid types assert isinstance(data["geographic_revenue_mix"], dict) assert isinstance(data["supply_chain_regions"], list) assert isinstance(data["export_dependency_pct"], (int, float)) assert 0 <= data["export_dependency_pct"] <= 1 async def test_competitor_relationships_bidirectional(self, registry_client, seed_ids): """Competitor relationships are queryable from both sides.""" aapl_id = seed_ids["companies"]["AAPL"] msft_id = seed_ids["companies"]["MSFT"] # Query from AAPL side resp_a = await registry_client.get(f"/companies/{aapl_id}/competitors") assert resp_a.status_code == 200 aapl_competitors = resp_a.json() # Query from MSFT side resp_b = await registry_client.get(f"/companies/{msft_id}/competitors") assert resp_b.status_code == 200 msft_competitors = resp_b.json() # AAPL should see MSFT and vice versa aapl_partner_ids = set() for rel in aapl_competitors: if rel.get("company_a_id") == aapl_id: aapl_partner_ids.add(rel["company_b_id"]) else: aapl_partner_ids.add(rel["company_a_id"]) msft_partner_ids = set() for rel in msft_competitors: if rel.get("company_a_id") == msft_id: msft_partner_ids.add(rel["company_b_id"]) else: msft_partner_ids.add(rel["company_a_id"]) assert msft_id in aapl_partner_ids, "MSFT not in AAPL's competitors" assert aapl_id in msft_partner_ids, "AAPL not in MSFT's competitors" # --------------------------------------------------------------------------- # Contract: Query API → Trading Engine trend data # --------------------------------------------------------------------------- class TestTrendToTradingContract: """Verify trend data has the schema the trading engine expects.""" async def test_trend_has_required_trading_fields(self, query_client, seed_ids): """Trends must include fields the trading engine uses for decisions.""" resp = await query_client.get("/api/trends") assert resp.status_code == 200 trends = resp.json() assert len(trends) >= 1 for trend in trends: # Fields the trading engine reads assert "id" in trend assert "trend_direction" in trend assert "confidence" in trend assert "trend_strength" in trend # Direction must be a valid enum value assert trend["trend_direction"] in ( "bullish", "bearish", "mixed", "neutral", ), f"Invalid direction: {trend['trend_direction']}" # Confidence and strength must be normalized [0, 1] assert 0 <= trend["confidence"] <= 1, ( f"Confidence out of range: {trend['confidence']}" ) assert 0 <= trend["trend_strength"] <= 1, ( f"Strength out of range: {trend['trend_strength']}" ) async def test_trend_detail_has_evidence(self, query_client, seed_ids): """Individual trend detail includes evidence the trading engine logs.""" trend_id = seed_ids["trends"]["TREND_01"] resp = await query_client.get(f"/api/trends/{trend_id}") assert resp.status_code == 200 data = resp.json() # Trading engine logs these for audit trail assert "top_supporting_evidence" in data assert "top_opposing_evidence" in data assert "dominant_catalysts" in data assert isinstance(data["top_supporting_evidence"], list) assert isinstance(data["top_opposing_evidence"], list) assert isinstance(data["dominant_catalysts"], list) # --------------------------------------------------------------------------- # Contract: Recommendation → Risk Engine → Trading Engine # --------------------------------------------------------------------------- class TestRecommendationToRiskContract: """Verify recommendations produce valid risk evaluation inputs.""" async def test_recommendation_has_risk_fields(self, query_client, seed_ids): """Recommendations include fields needed for risk evaluation.""" resp = await query_client.get( "/api/recommendations", params={"latest": "false"}, ) assert resp.status_code == 200 recs = resp.json() assert len(recs) >= 1 for rec in recs: assert "ticker" in rec assert "action" in rec assert "confidence" in rec assert "mode" in rec # Action must be valid assert rec["action"] in ("buy", "sell", "hold", "watch") # Mode determines if it reaches trading engine assert isinstance(rec["mode"], str) and len(rec["mode"]) > 0 # Confidence must be normalized assert 0 <= rec["confidence"] <= 1 async def test_risk_evaluation_schema(self, risk_client): """Risk engine returns evaluation with all fields trading engine needs.""" payload = { "order": { "ticker": "AAPL", "action": "buy", "quantity": 5, "estimated_value": 925.0, "confidence": 0.75, "recommendation_id": None, "sector": "Technology", }, } resp = await risk_client.post("/evaluate", json=payload) assert resp.status_code == 200 data = resp.json() # Trading engine reads these fields from risk evaluation assert "evaluation_id" in data assert "ticker" in data assert "eligible" in data assert "rejection_reasons" in data assert "checks" in data assert "evaluated_at" in data # Types assert isinstance(data["eligible"], bool) assert isinstance(data["rejection_reasons"], list) assert isinstance(data["checks"], list) # Each check should have name and passed for check in data["checks"]: assert "check_name" in check or "name" in check assert "result" in check or "passed" in check async def test_risk_rejects_oversized_order(self, risk_client): """Risk engine correctly rejects an order exceeding position cap.""" payload = { "order": { "ticker": "AAPL", "action": "buy", "quantity": 1000, "estimated_value": 185000.0, "confidence": 0.9, "recommendation_id": None, "sector": "Technology", }, "config": { "absolute_position_cap": 10000.0, }, } resp = await risk_client.post("/evaluate", json=payload) assert resp.status_code == 200 data = resp.json() # Should be rejected due to position cap assert data["eligible"] is False assert len(data["rejection_reasons"]) > 0 # --------------------------------------------------------------------------- # Contract: Trading Engine state consistency # --------------------------------------------------------------------------- class TestTradingEngineState: """Verify trading engine exposes consistent state for the promotion gate.""" async def test_status_reflects_config(self, trading_client): """Engine status fields are consistent with each other.""" resp = await trading_client.get("/api/trading/status") assert resp.status_code == 200 data = resp.json() # If paused, open_positions should still be reported assert "open_positions" in data assert isinstance(data["open_positions"], int) assert data["open_positions"] >= 0 # Risk tier must be valid assert data["risk_tier"] in ("conservative", "moderate", "aggressive") # Pool values must be non-negative assert data["active_pool"] >= 0 assert data["reserve_pool"] >= 0 async def test_decisions_have_audit_fields(self, trading_client, seed_ids): """Trading decisions include full audit trail fields.""" resp = await trading_client.get("/api/trading/decisions") assert resp.status_code == 200 decisions = resp.json() if len(decisions) > 0: d = decisions[0] assert "id" in d assert "decision" in d assert "ticker" in d assert "created_at" in d # Decision type must be valid assert d["decision"] in ("act", "skip", "execute") async def test_metrics_numeric_consistency(self, trading_client): """Portfolio metrics are all numeric and internally consistent.""" resp = await trading_client.get("/api/trading/metrics") assert resp.status_code == 200 data = resp.json() # All values must be numeric numeric_fields = [ "total_portfolio_value", "active_pool", "reserve_pool", "unrealized_pnl", "realized_pnl", "daily_pnl", "win_rate", "sharpe_ratio", "max_drawdown", "portfolio_heat", ] for field in numeric_fields: assert field in data, f"Missing field: {field}" assert isinstance(data[field], (int, float)), ( f"{field} should be numeric, got {type(data[field])}" ) # Win rate and portfolio heat should be bounded assert 0 <= data["win_rate"] <= 1 or data["win_rate"] == 0 assert 0 <= data["portfolio_heat"] <= 1 or data["portfolio_heat"] == 0 # Total portfolio = active + reserve + unrealized (approximately) # Allow some tolerance for rounding expected_total = data["active_pool"] + data["reserve_pool"] + data["unrealized_pnl"] if data["total_portfolio_value"] > 0: diff = abs(data["total_portfolio_value"] - expected_total) assert diff < data["total_portfolio_value"] * 0.1, ( f"Portfolio value inconsistency: total={data['total_portfolio_value']}, " f"active+reserve+unrealized={expected_total}" ) # --------------------------------------------------------------------------- # Contract: Approval workflow integration # --------------------------------------------------------------------------- class TestApprovalWorkflowContract: """Verify the approval workflow is accessible and returns valid schemas.""" async def test_pending_approvals_schema(self, risk_client): """Pending approvals list returns valid schema (may be empty).""" resp = await risk_client.get("/approvals/pending") assert resp.status_code == 200 data = resp.json() assert isinstance(data, list) # If there are pending approvals, validate schema for approval in data: assert "id" in approval assert "status" in approval assert "ticker" in approval assert "side" in approval assert "quantity" in approval assert "created_at" in approval async def test_approval_not_found_returns_404(self, risk_client): """Non-existent approval ID returns 404, not 500.""" fake_id = "00000000-0000-4000-ffff-ffffffffffff" resp = await risk_client.get(f"/approvals/{fake_id}") assert resp.status_code == 404 # --------------------------------------------------------------------------- # Contract: Cross-service health (all services must be up for paper trading) # --------------------------------------------------------------------------- class TestCrossServiceHealth: """All services must be healthy before promotion to paper trading.""" async def test_all_services_healthy( self, query_client, registry_client, risk_client, trading_client, ): """Every service responds to health check.""" services = { "query-api": query_client, "symbol-registry": registry_client, "risk-engine": risk_client, "trading-engine": trading_client, } for name, client in services.items(): resp = await client.get("/health") assert resp.status_code == 200, ( f"{name} health check failed with status {resp.status_code}" ) data = resp.json() assert data.get("status") == "ok", ( f"{name} reported unhealthy: {data}" ) async def test_trading_engine_ready(self, trading_client): """Trading engine readiness probe passes (DB + Redis connected).""" resp = await trading_client.get("/ready") assert resp.status_code == 200 data = resp.json() assert data["ready"] is True, ( f"Trading engine not ready: {data}" )