"""Integration tests for the TradingEngine wiring and end-to-end decision flow. Tests verify that the engine correctly delegates to sub-components and that the full recommendation → evaluation → sizing → decision pipeline works end-to-end with concrete values. No DB/Redis needed — all components are pure computation. """ from __future__ import annotations from datetime import datetime, timedelta, timezone import pytest from services.shared.config import TradingConfig from services.trading.circuit_breaker import CircuitBreaker from services.trading.correlation import CorrelationMatrix from services.trading.engine import TradingEngine from services.trading.models import ( CircuitBreakerState, OpenPosition, PerformanceMetrics, PortfolioState, RiskTierConfig, StopLevels, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_engine(**overrides) -> TradingEngine: """Create a TradingEngine with sensible test defaults.""" config_kwargs = { "enabled": True, "micro_trading_enabled": False, } config_kwargs.update(overrides) config = TradingConfig(**config_kwargs) return TradingEngine(pool=None, redis=None, config=config) def _moderate_tier() -> RiskTierConfig: return RiskTierConfig( name="moderate", min_confidence=0.55, max_position_pct=0.10, stop_loss_atr_multiplier=2.0, reward_risk_ratio=1.5, max_sector_pct=0.30, max_portfolio_heat=0.20, ) def _portfolio_state(**overrides) -> PortfolioState: defaults = { "positions": [], "total_value": 500.0, "cash": 400.0, "active_pool": 400.0, "reserve_pool": 100.0, "sector_exposure": {}, "portfolio_heat": 0.0, "open_position_count": 0, } defaults.update(overrides) return PortfolioState(**defaults) def _inactive_cb() -> CircuitBreakerState: return CircuitBreakerState(active=False) def _within_trading_window() -> datetime: """Return a datetime that is within the trading window (Wed 11:00 AM ET).""" from zoneinfo import ZoneInfo return datetime(2025, 1, 15, 11, 0, 0, tzinfo=ZoneInfo("America/New_York")) # --------------------------------------------------------------------------- # Test 1: Full cycle — recommendation → evaluation → position sizing → act # --------------------------------------------------------------------------- class TestFullDecisionCycle: """Verify the complete recommendation-to-decision pipeline.""" def test_high_confidence_recommendation_produces_act_decision(self): engine = _make_engine(absolute_position_cap=100.0) tier = _moderate_tier() portfolio = _portfolio_state(active_pool=1000.0, total_value=1200.0) cb_state = _inactive_cb() corr = CorrelationMatrix() now = _within_trading_window() rec = { "recommendation_id": "rec-001", "ticker": "AAPL", "confidence": 0.80, "sector": "Technology", "current_price": 25.0, # cheap enough to buy at least 1 share "action": "buy", } decision = engine.evaluate_recommendation( rec=rec, portfolio_state=portfolio, risk_tier=tier, circuit_breaker_state=cb_state, correlation_matrix=corr, earnings_calendar={}, now=now, ) assert decision.decision == "act" assert decision.ticker == "AAPL" assert decision.computed_position_size is not None assert decision.computed_position_size > 0 assert decision.computed_share_quantity is not None assert decision.computed_share_quantity > 0 assert decision.risk_tier_at_decision == "moderate" assert decision.circuit_breaker_status == "inactive" def test_low_confidence_recommendation_produces_skip_decision(self): engine = _make_engine() tier = _moderate_tier() portfolio = _portfolio_state() cb_state = _inactive_cb() corr = CorrelationMatrix() now = _within_trading_window() rec = { "recommendation_id": "rec-002", "ticker": "MSFT", "confidence": 0.30, # below moderate min_confidence of 0.55 "sector": "Technology", "current_price": 400.0, "action": "buy", } decision = engine.evaluate_recommendation( rec=rec, portfolio_state=portfolio, risk_tier=tier, circuit_breaker_state=cb_state, correlation_matrix=corr, earnings_calendar={}, now=now, ) assert decision.decision == "skip" assert "insufficient_confidence" in decision.skip_reason def test_duplicate_recommendation_is_skipped(self): engine = _make_engine(absolute_position_cap=100.0) tier = _moderate_tier() portfolio = _portfolio_state(active_pool=1000.0, total_value=1200.0) cb_state = _inactive_cb() corr = CorrelationMatrix() now = _within_trading_window() rec = { "recommendation_id": "rec-dup", "ticker": "GOOG", "confidence": 0.80, "sector": "Technology", "current_price": 25.0, "action": "buy", } # First evaluation should act d1 = engine.evaluate_recommendation( rec=rec, portfolio_state=portfolio, risk_tier=tier, circuit_breaker_state=cb_state, correlation_matrix=corr, earnings_calendar={}, now=now, ) assert d1.decision == "act" # Second evaluation of same rec should skip as duplicate d2 = engine.evaluate_recommendation( rec=rec, portfolio_state=portfolio, risk_tier=tier, circuit_breaker_state=cb_state, correlation_matrix=corr, earnings_calendar={}, now=now, ) assert d2.decision == "skip" assert "duplicate" in d2.skip_reason # --------------------------------------------------------------------------- # Test 2: Stop-loss crossing → StopTrigger list # --------------------------------------------------------------------------- class TestStopLossCrossings: """Verify stop-loss crossing detection via engine wiring.""" def test_stop_loss_triggered(self): engine = _make_engine() positions = [ OpenPosition( ticker="AAPL", quantity=10, entry_price=150.0, current_price=140.0, unrealized_pnl=-100.0, market_value=1400.0, sector="Technology", stop_loss_price=145.0, take_profit_price=165.0, signal_confidence=0.75, ), ] prices = {"AAPL": 144.0} # below stop_loss_price of 145.0 stop_levels = { "AAPL": StopLevels( stop_loss_price=145.0, take_profit_price=165.0, trailing_stop_active=False, atr_value=3.0, atr_multiplier=2.0, reward_risk_ratio=1.5, ), } triggers = engine.check_stop_loss_crossings(positions, prices, stop_levels) assert len(triggers) == 1 assert triggers[0].ticker == "AAPL" assert triggers[0].trigger_type == "stop_loss" assert triggers[0].current_price == 144.0 def test_take_profit_triggered(self): engine = _make_engine() positions = [ OpenPosition( ticker="MSFT", quantity=5, entry_price=400.0, current_price=420.0, unrealized_pnl=100.0, market_value=2100.0, sector="Technology", stop_loss_price=390.0, take_profit_price=415.0, signal_confidence=0.80, ), ] prices = {"MSFT": 416.0} # above take_profit_price of 415.0 stop_levels = { "MSFT": StopLevels( stop_loss_price=390.0, take_profit_price=415.0, trailing_stop_active=False, atr_value=5.0, atr_multiplier=2.0, reward_risk_ratio=1.5, ), } triggers = engine.check_stop_loss_crossings(positions, prices, stop_levels) assert len(triggers) == 1 assert triggers[0].ticker == "MSFT" assert triggers[0].trigger_type == "take_profit" def test_no_crossing_returns_empty(self): engine = _make_engine() positions = [ OpenPosition( ticker="GOOG", quantity=3, entry_price=170.0, current_price=172.0, unrealized_pnl=6.0, market_value=516.0, sector="Technology", stop_loss_price=165.0, take_profit_price=180.0, signal_confidence=0.70, ), ] prices = {"GOOG": 172.0} # between stop and take-profit stop_levels = { "GOOG": StopLevels( stop_loss_price=165.0, take_profit_price=180.0, trailing_stop_active=False, atr_value=3.0, atr_multiplier=2.0, reward_risk_ratio=1.5, ), } triggers = engine.check_stop_loss_crossings(positions, prices, stop_levels) assert len(triggers) == 0 # --------------------------------------------------------------------------- # Test 3: Reserve pool siphoning on profitable close # --------------------------------------------------------------------------- class TestReservePoolSiphoning: """Verify reserve pool siphoning via engine wiring.""" def test_profitable_close_siphons_20_percent(self): engine = _make_engine(reserve_siphon_pct=0.20) transfer, new_balance = engine.handle_position_close( realized_profit=100.0, reserve_balance=50.0, ) assert transfer == pytest.approx(20.0) # 20% of $100 assert new_balance == pytest.approx(70.0) # $50 + $20 def test_loss_does_not_siphon(self): engine = _make_engine(reserve_siphon_pct=0.20) transfer, new_balance = engine.handle_position_close( realized_profit=-30.0, reserve_balance=50.0, ) assert transfer == 0.0 assert new_balance == 50.0 def test_zero_profit_does_not_siphon(self): engine = _make_engine(reserve_siphon_pct=0.20) transfer, new_balance = engine.handle_position_close( realized_profit=0.0, reserve_balance=50.0, ) assert transfer == 0.0 assert new_balance == 50.0 # --------------------------------------------------------------------------- # Test 4: Circuit breaker trigger → halt → cooldown → resume # --------------------------------------------------------------------------- class TestCircuitBreakerFlow: """Verify circuit breaker integration with the decision loop.""" def test_active_circuit_breaker_skips_recommendation(self): engine = _make_engine() tier = _moderate_tier() portfolio = _portfolio_state() now = _within_trading_window() # Circuit breaker active with future cooldown cb_state = CircuitBreakerState( active=True, trigger_type="daily_loss", triggered_at=now - timedelta(minutes=30), cooldown_expires=now + timedelta(hours=2), ) rec = { "recommendation_id": "rec-cb-1", "ticker": "AAPL", "confidence": 0.90, "sector": "Technology", "current_price": 150.0, "action": "buy", } decision = engine.evaluate_recommendation( rec=rec, portfolio_state=portfolio, risk_tier=tier, circuit_breaker_state=cb_state, correlation_matrix=CorrelationMatrix(), earnings_calendar={}, now=now, ) assert decision.decision == "skip" assert "circuit_breaker_active" in decision.skip_reason assert decision.circuit_breaker_status == "active" def test_expired_circuit_breaker_allows_trading(self): engine = _make_engine(absolute_position_cap=100.0) tier = _moderate_tier() portfolio = _portfolio_state(active_pool=1000.0, total_value=1200.0) now = _within_trading_window() # Circuit breaker was active but cooldown has expired cb_state = CircuitBreakerState( active=True, trigger_type="daily_loss", triggered_at=now - timedelta(hours=3), cooldown_expires=now - timedelta(hours=1), # expired ) rec = { "recommendation_id": "rec-cb-2", "ticker": "AAPL", "confidence": 0.80, "sector": "Technology", "current_price": 25.0, "action": "buy", } decision = engine.evaluate_recommendation( rec=rec, portfolio_state=portfolio, risk_tier=tier, circuit_breaker_state=cb_state, correlation_matrix=CorrelationMatrix(), earnings_calendar={}, now=now, ) # Should proceed since cooldown expired assert decision.decision == "act" def test_circuit_breaker_daily_loss_detection(self): """Verify the CircuitBreaker component detects daily loss threshold.""" cb = CircuitBreaker(daily_loss_pct=0.05) # 6% loss on $500 portfolio → should trigger assert cb.check_daily_loss(daily_pnl=-30.0, portfolio_value=500.0) is True # 3% loss → should not trigger assert cb.check_daily_loss(daily_pnl=-15.0, portfolio_value=500.0) is False def test_circuit_breaker_cooldown_computation(self): """Verify cooldown expiry is computed correctly.""" cb = CircuitBreaker(volatility_pause_hours=2, ticker_cooldown_hours=48) now = datetime(2025, 1, 15, 12, 0, 0, tzinfo=timezone.utc) vol_expiry = cb.compute_cooldown_expiry("volatility", now) assert vol_expiry == now + timedelta(hours=2) pos_expiry = cb.compute_cooldown_expiry("single_position", now) assert pos_expiry == now + timedelta(hours=48) # --------------------------------------------------------------------------- # Test 5: Engine startup state # --------------------------------------------------------------------------- class TestEngineStartup: """Verify engine startup state and lifecycle.""" def test_engine_starts_not_running(self): engine = _make_engine() assert engine.running is False @pytest.mark.asyncio async def test_start_sets_running_flag(self): engine = _make_engine() await engine.start() assert engine.running is True @pytest.mark.asyncio async def test_stop_clears_running_flag(self): engine = _make_engine() await engine.start() assert engine.running is True await engine.stop() assert engine.running is False def test_engine_initializes_all_sub_components(self): engine = _make_engine() assert engine.position_sizer is not None assert engine.stop_loss_manager is not None assert engine.circuit_breaker is not None assert engine.reserve_pool_controller is not None assert engine.risk_tier_controller is not None assert engine.correlation_matrix is not None assert engine.notification_service is not None assert engine.micro_trading_module is not None assert engine.rebalancer is not None def test_engine_starts_with_empty_processed_ids(self): engine = _make_engine() assert len(engine.processed_recommendation_ids) == 0 # --------------------------------------------------------------------------- # Test 6: Risk tier evaluation wiring # --------------------------------------------------------------------------- class TestRiskTierEvaluation: """Verify risk tier evaluation via engine wiring.""" def test_downgrade_on_poor_performance(self): engine = _make_engine() metrics = PerformanceMetrics( total_portfolio_value=500.0, active_pool=400.0, reserve_pool=100.0, unrealized_pnl=-20.0, realized_pnl=-50.0, daily_pnl=-10.0, win_count=3, loss_count=7, win_rate=0.30, # below 40% → downgrade avg_win=10.0, avg_loss=-8.0, profit_factor=0.5, sharpe_ratio=-0.5, max_drawdown=0.20, current_drawdown_pct=0.18, # above 15% → also triggers downgrade portfolio_heat=0.10, ) new_tier = engine.evaluate_risk_tier("moderate", metrics, reserve_pct=0.20) assert new_tier == "conservative" def test_upgrade_on_strong_performance(self): engine = _make_engine() metrics = PerformanceMetrics( total_portfolio_value=600.0, active_pool=400.0, reserve_pool=200.0, unrealized_pnl=30.0, realized_pnl=100.0, daily_pnl=5.0, win_count=7, loss_count=3, win_rate=0.70, # above 55% avg_win=15.0, avg_loss=-5.0, profit_factor=2.1, sharpe_ratio=1.5, max_drawdown=0.05, current_drawdown_pct=0.02, # below 5% portfolio_heat=0.08, ) # reserve_pct > 0.20 and win_rate > 0.55 and drawdown < 0.05 new_tier = engine.evaluate_risk_tier("moderate", metrics, reserve_pct=0.33) assert new_tier == "aggressive" def test_no_change_when_metrics_are_neutral(self): engine = _make_engine() metrics = PerformanceMetrics( total_portfolio_value=500.0, active_pool=400.0, reserve_pool=100.0, unrealized_pnl=5.0, realized_pnl=20.0, daily_pnl=2.0, win_count=5, loss_count=5, win_rate=0.50, # between 40% and 55% avg_win=10.0, avg_loss=-8.0, profit_factor=1.2, sharpe_ratio=0.5, max_drawdown=0.08, current_drawdown_pct=0.06, portfolio_heat=0.10, ) new_tier = engine.evaluate_risk_tier("moderate", metrics, reserve_pct=0.20) assert new_tier is None # --------------------------------------------------------------------------- # Test 7: Rebalancing wiring # --------------------------------------------------------------------------- class TestRebalancingWiring: """Verify portfolio rebalancing via engine wiring.""" def test_over_concentrated_position_generates_sell_order(self): engine = _make_engine() tier = _moderate_tier() # max_position_pct = 0.10 # market_value $200 exceeds 10% of $400 = $40 by $160 # sell_qty = int(160 / 20) = 8 shares positions = [ OpenPosition( ticker="AAPL", quantity=10, entry_price=18.0, current_price=20.0, unrealized_pnl=20.0, market_value=200.0, sector="Technology", stop_loss_price=16.0, take_profit_price=25.0, signal_confidence=0.80, ), ] orders = engine.evaluate_rebalancing(positions, tier, active_pool=400.0) assert len(orders) >= 1 assert orders[0].ticker == "AAPL" assert orders[0].action == "sell" def test_balanced_portfolio_generates_no_orders(self): engine = _make_engine() tier = _moderate_tier() positions = [ OpenPosition( ticker="AAPL", quantity=1, entry_price=30.0, current_price=30.0, unrealized_pnl=0.0, market_value=30.0, # $30 < 10% of $400 = $40 sector="Technology", stop_loss_price=28.0, take_profit_price=35.0, signal_confidence=0.80, ), ] orders = engine.evaluate_rebalancing(positions, tier, active_pool=400.0) assert len(orders) == 0 # --------------------------------------------------------------------------- # Test 8: Notification wiring # --------------------------------------------------------------------------- class TestNotificationWiring: """Verify notification creation via engine wiring.""" def test_create_alert_returns_notification_record(self): engine = _make_engine() record = engine.create_alert( event_type="circuit_breaker_triggered", details="Daily loss exceeded 5% threshold", ) assert record.event_type == "circuit_breaker_triggered" assert record.channel == "email" assert "Circuit Breaker Triggered" in record.message assert "Daily loss exceeded 5% threshold" in record.message assert record.delivery_status == "pending" def test_create_alert_for_risk_tier_change(self): engine = _make_engine() record = engine.create_alert( event_type="risk_tier_changed", details="moderate → conservative due to drawdown", ) assert record.event_type == "risk_tier_changed" assert "Risk Tier Changed" in record.message # --------------------------------------------------------------------------- # Test 9: Micro-trading constraint wiring # --------------------------------------------------------------------------- class TestMicroTradingConstraints: """Verify micro-trading constraint checking via engine wiring.""" def test_disabled_micro_trading_rejects(self): engine = _make_engine(micro_trading_enabled=False) allowed, reason = engine.check_micro_trade_constraints( daily_count=0, is_within_window=True, circuit_breaker_active=False, portfolio_heat_pct=0.05, max_heat=0.20, ) assert allowed is False assert reason == "micro_trading_disabled" def test_enabled_micro_trading_allows(self): engine = _make_engine(micro_trading_enabled=True) allowed, reason = engine.check_micro_trade_constraints( daily_count=0, is_within_window=True, circuit_breaker_active=False, portfolio_heat_pct=0.05, max_heat=0.20, ) assert allowed is True assert reason == "ok" def test_circuit_breaker_blocks_micro_trade(self): engine = _make_engine(micro_trading_enabled=True) allowed, reason = engine.check_micro_trade_constraints( daily_count=0, is_within_window=True, circuit_breaker_active=True, portfolio_heat_pct=0.05, max_heat=0.20, ) assert allowed is False assert reason == "circuit_breaker_active" def test_daily_limit_blocks_micro_trade(self): engine = _make_engine( micro_trading_enabled=True, micro_trading_max_daily=10, ) allowed, reason = engine.check_micro_trade_constraints( daily_count=10, is_within_window=True, circuit_breaker_active=False, portfolio_heat_pct=0.05, max_heat=0.20, ) assert allowed is False assert reason == "daily_limit_reached"