"""Position sizing engine for the autonomous trading system.

Computes dollar allocation and share quantity for a trade by applying a
sequential adjustment pipeline, in this order: active-pool minimum check,
confidence gate, base sizing, correlation reduction, sector exposure
reduction, diversification bonus, earnings proximity, absolute cap
enforcement, portfolio heat check, and share rounding.
"""

from __future__ import annotations

import math
from datetime import datetime, timezone

from services.trading.models import (
    OpenPosition,
    PortfolioState,
    PositionSizeResult,
    RiskTierConfig,
)


class PositionSizer:
    """Compute position size through a multi-step adjustment pipeline."""

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def compute(
        self,
        confidence: float,
        ticker: str,
        sector: str,
        current_price: float,
        active_pool: float,
        risk_tier: RiskTierConfig,
        portfolio_state: PortfolioState,
        correlation_matrix: dict[tuple[str, str], float],
        earnings_calendar: dict[str, datetime],
        absolute_position_cap: float = 50.0,
        active_pool_minimum: float = 100.0,
    ) -> PositionSizeResult:
        """Run the full adjustment pipeline and return a sizing result.

        Args:
            confidence: Signal confidence; must be at least
                ``risk_tier.min_confidence`` or the trade is rejected.
            ticker: Symbol being sized.
            sector: Sector of ``ticker``, used for the sector cap and the
                diversification bonus.
            current_price: Price used to convert dollars into whole shares.
            active_pool: Dollar pool that percentage limits are applied to.
            risk_tier: Tier limits (position, sector, heat, confidence).
            portfolio_state: Current positions, sector exposure and heat.
            correlation_matrix: Pairwise correlations keyed by ticker pair;
                each pair is looked up in either order.
            earnings_calendar: Ticker -> upcoming earnings datetime. Naive
                datetimes are interpreted as UTC.
            absolute_position_cap: Hard dollar ceiling for a single position.
            active_pool_minimum: Minimum ``active_pool`` required to trade.

        Returns:
            A ``PositionSizeResult``. When rejected, the dollar amount, share
            quantity and allocation are zero and ``rejection_reason`` is set.
            ``adjustments`` lists the steps that changed the size, in order.
        """
        adjustments: list[str] = []

        # ---- 1. Active pool minimum check (early reject) -------------
        if active_pool < active_pool_minimum:
            return self._rejected(
                f"Active pool ${active_pool:.2f} below minimum ${active_pool_minimum:.2f}",
                adjustments,
            )

        # ---- 2. Confidence gate --------------------------------------
        if confidence < risk_tier.min_confidence:
            return self._rejected(
                f"Confidence {confidence:.4f} below tier minimum {risk_tier.min_confidence}",
                adjustments,
            )

        # ---- 3. Base sizing formula ----------------------------------
        # Half of the tier maximum at exactly the minimum confidence, scaling
        # linearly with confidence and clamped at the tier maximum.
        base_allocation_pct = risk_tier.max_position_pct * 0.5
        multiplier = 1.0  # default multiplier
        raw_pct = (
            base_allocation_pct
            * (confidence / risk_tier.min_confidence)
            * multiplier
        )
        clamped_pct = min(raw_pct, risk_tier.max_position_pct)
        dollar_amount = active_pool * clamped_pct
        dollar_amount = min(dollar_amount, absolute_position_cap)
        adjustments.append(
            f"Base sizing: raw_pct={raw_pct:.6f}, clamped_pct={clamped_pct:.6f}, "
            f"dollar=${dollar_amount:.2f}"
        )

        # ---- 4. Correlation reduction --------------------------------
        dollar_amount, clamped_pct = self._apply_correlation_reduction(
            ticker,
            dollar_amount,
            clamped_pct,
            portfolio_state,
            correlation_matrix,
            adjustments,
        )
        if dollar_amount == 0.0:
            return self._rejected(adjustments[-1], adjustments)

        # ---- 5. Sector exposure reduction ----------------------------
        dollar_amount, clamped_pct = self._apply_sector_exposure_reduction(
            sector,
            dollar_amount,
            clamped_pct,
            active_pool,
            risk_tier,
            portfolio_state,
            adjustments,
        )
        # The amount was non-zero after step 4, so zero here can only mean the
        # helper hit its "at limit" branch. Reject now with that message
        # instead of falling through to a misleading "zero shares" rejection.
        if dollar_amount == 0.0:
            return self._rejected(adjustments[-1], adjustments)

        # ---- 6. Diversification bonus --------------------------------
        pre_bonus_amount = dollar_amount
        dollar_amount, clamped_pct = self._apply_diversification_bonus(
            sector,
            dollar_amount,
            clamped_pct,
            risk_tier,
            portfolio_state,
            adjustments,
            active_pool=active_pool,
        )
        if dollar_amount > pre_bonus_amount:
            # The bonus must not undo the sector limit enforced in step 5.
            dollar_amount, clamped_pct = self._apply_sector_exposure_reduction(
                sector,
                dollar_amount,
                clamped_pct,
                active_pool,
                risk_tier,
                portfolio_state,
                adjustments,
            )

        # ---- 7. Earnings proximity -----------------------------------
        result = self._apply_earnings_proximity(
            ticker,
            dollar_amount,
            clamped_pct,
            earnings_calendar,
            adjustments,
        )
        if isinstance(result, PositionSizeResult):
            return result
        dollar_amount, clamped_pct = result

        # ---- 8. Absolute cap enforcement (re-apply after adjustments) -
        if dollar_amount > absolute_position_cap:
            dollar_amount = absolute_position_cap
            clamped_pct = dollar_amount / active_pool if active_pool > 0 else 0.0
            adjustments.append(
                f"Absolute cap enforced: capped to ${absolute_position_cap:.2f}"
            )

        # ---- 9. Portfolio heat check ---------------------------------
        # NOTE(review): the 0.02 factor presumably approximates ATR as 2% of
        # price, and portfolio_heat is compared as a dollar figure here —
        # confirm both against the model definitions.
        stop_loss_distance_pct = risk_tier.stop_loss_atr_multiplier * 0.02
        new_position_heat = dollar_amount * stop_loss_distance_pct
        max_heat_dollars = risk_tier.max_portfolio_heat * active_pool
        current_heat = portfolio_state.portfolio_heat
        if current_heat + new_position_heat > max_heat_dollars:
            return self._rejected(
                f"Portfolio heat would exceed limit: current={current_heat:.2f} + "
                f"new={new_position_heat:.2f} > max={max_heat_dollars:.2f}",
                adjustments,
            )

        # ---- 10. Share rounding --------------------------------------
        if current_price <= 0:
            return self._rejected("Invalid current price", adjustments)

        share_quantity = math.floor(dollar_amount / current_price)
        if share_quantity == 0:
            return self._rejected(
                f"Zero shares after rounding: ${dollar_amount:.2f} / ${current_price:.2f}",
                adjustments,
            )

        # Final dollar amount based on whole shares
        final_dollar = share_quantity * current_price
        final_pct = final_dollar / active_pool if active_pool > 0 else 0.0
        adjustments.append(
            f"Final: {share_quantity} shares @ ${current_price:.2f} = ${final_dollar:.2f} "
            f"({final_pct:.4%} of active pool)"
        )

        return PositionSizeResult(
            dollar_amount=final_dollar,
            share_quantity=share_quantity,
            allocation_pct=final_pct,
            adjustments=adjustments,
            rejected=False,
            rejection_reason="",
        )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _rejected(reason: str, adjustments: list[str]) -> PositionSizeResult:
        """Build a zero-sized, rejected result carrying ``reason``."""
        return PositionSizeResult(
            dollar_amount=0.0,
            share_quantity=0,
            allocation_pct=0.0,
            adjustments=adjustments,
            rejected=True,
            rejection_reason=reason,
        )

    @staticmethod
    def _apply_correlation_reduction(
        ticker: str,
        dollar_amount: float,
        allocation_pct: float,
        portfolio_state: PortfolioState,
        correlation_matrix: dict[tuple[str, str], float],
        adjustments: list[str],
    ) -> tuple[float, float]:
        """Reduce or reject based on weighted average correlation.

        The average is weighted by each open position's ``market_value``;
        pairs missing from ``correlation_matrix`` count as 0.0. Above 0.8 the
        trade is zeroed (``(0.0, 0.0)``); between 0.5 and 0.8 it is scaled
        down linearly. A message is appended to ``adjustments`` whenever the
        size changes.
        """
        positions: list[OpenPosition] = portfolio_state.positions
        if not positions:
            return dollar_amount, allocation_pct

        total_weight = 0.0
        weighted_corr = 0.0
        for pos in positions:
            # The matrix may store a pair in either order.
            corr = correlation_matrix.get(
                (ticker, pos.ticker),
                correlation_matrix.get((pos.ticker, ticker), 0.0),
            )
            weight = pos.market_value
            weighted_corr += corr * weight
            total_weight += weight

        if total_weight == 0.0:
            return dollar_amount, allocation_pct

        avg_corr = weighted_corr / total_weight

        if avg_corr > 0.8:
            adjustments.append(
                f"Correlation rejection: avg={avg_corr:.4f} > 0.8"
            )
            return 0.0, 0.0

        if avg_corr > 0.5:
            # Reduce proportionally: scale factor goes from 1.0 at 0.5 to 0.0 at 0.8
            reduction = (avg_corr - 0.5) / (0.8 - 0.5)
            factor = 1.0 - reduction
            new_dollar = dollar_amount * factor
            new_pct = allocation_pct * factor
            adjustments.append(
                f"Correlation reduction: avg={avg_corr:.4f}, factor={factor:.4f}, "
                f"${dollar_amount:.2f} -> ${new_dollar:.2f}"
            )
            return new_dollar, new_pct

        return dollar_amount, allocation_pct

    @staticmethod
    def _apply_sector_exposure_reduction(
        sector: str,
        dollar_amount: float,
        allocation_pct: float,
        active_pool: float,
        risk_tier: RiskTierConfig,
        portfolio_state: PortfolioState,
        adjustments: list[str],
    ) -> tuple[float, float]:
        """Reduce allocation if sector would exceed max_sector_pct.

        Returns the amount trimmed to the remaining sector headroom, or
        ``(0.0, 0.0)`` when the sector has no headroom left. A message is
        appended to ``adjustments`` in both cases.
        """
        max_sector_dollars = risk_tier.max_sector_pct * active_pool
        current_sector_exposure = portfolio_state.sector_exposure.get(sector, 0.0)

        if current_sector_exposure + dollar_amount <= max_sector_dollars:
            return dollar_amount, allocation_pct

        available = max(max_sector_dollars - current_sector_exposure, 0.0)
        if available <= 0:
            adjustments.append(
                f"Sector exposure at limit: {sector} "
                f"${current_sector_exposure:.2f} >= max ${max_sector_dollars:.2f}"
            )
            return 0.0, 0.0

        new_pct = available / active_pool if active_pool > 0 else 0.0
        adjustments.append(
            f"Sector exposure reduction: {sector} "
            f"${current_sector_exposure:.2f} + ${dollar_amount:.2f} > "
            f"max ${max_sector_dollars:.2f}, reduced to ${available:.2f}"
        )
        return available, new_pct

    @staticmethod
    def _apply_diversification_bonus(
        sector: str,
        dollar_amount: float,
        allocation_pct: float,
        risk_tier: RiskTierConfig,
        portfolio_state: PortfolioState,
        adjustments: list[str],
        active_pool: float | None = None,
    ) -> tuple[float, float]:
        """Apply 1.2x bonus for under-represented sectors when < 3 sectors held.

        The bonus applies only when ``sector`` is not already a key of
        ``portfolio_state.sector_exposure``. The boosted amount is re-clamped
        to ``risk_tier.max_position_pct`` of the pool.

        Args:
            active_pool: Pool to clamp against. When omitted, the helper falls
                back to ``portfolio_state.active_pool`` (or 1.0 if that is not
                positive) for backward compatibility.
        """
        existing_sectors = set(portfolio_state.sector_exposure)
        if len(existing_sectors) >= 3 or sector in existing_sectors:
            return dollar_amount, allocation_pct

        bonus = 1.2
        new_dollar = dollar_amount * bonus
        new_pct = allocation_pct * bonus

        if active_pool is not None:
            # Clamp against the same pool every other pipeline step uses.
            pool = active_pool
        elif portfolio_state.active_pool > 0:
            pool = portfolio_state.active_pool
        else:
            pool = 1.0

        # Re-clamp to max_position_pct after bonus
        max_dollar = risk_tier.max_position_pct * pool
        if new_dollar > max_dollar:
            new_dollar = max_dollar
            new_pct = risk_tier.max_position_pct

        adjustments.append(
            f"Diversification bonus: 1.2x applied for new sector '{sector}' "
            f"(portfolio has {len(existing_sectors)} sectors), "
            f"${dollar_amount:.2f} -> ${new_dollar:.2f}"
        )
        return new_dollar, new_pct

    @staticmethod
    def _apply_earnings_proximity(
        ticker: str,
        dollar_amount: float,
        allocation_pct: float,
        earnings_calendar: dict[str, datetime],
        adjustments: list[str],
    ) -> tuple[float, float] | PositionSizeResult:
        """Reduce by 50% within 3 days of earnings; reject within 1 day.

        NOTE: despite the "trading day" wording in the messages, the distance
        is measured in elapsed calendar days (seconds / 86400); weekends and
        holidays are not skipped.

        Returns:
            ``(dollar_amount, allocation_pct)`` (possibly halved), or a
            rejected ``PositionSizeResult`` when earnings are within one day.
        """
        earnings_dt = earnings_calendar.get(ticker)
        if earnings_dt is None:
            return dollar_amount, allocation_pct

        # Compare in aware UTC. Naive calendar entries are interpreted as UTC,
        # which matches the previous utcnow()-based arithmetic, while aware
        # entries no longer raise TypeError on subtraction.
        if earnings_dt.tzinfo is None or earnings_dt.utcoffset() is None:
            earnings_dt = earnings_dt.replace(tzinfo=timezone.utc)
        now = datetime.now(timezone.utc)
        delta = earnings_dt - now

        # Use total_seconds for precise fractional-day comparison
        trading_days_until = delta.total_seconds() / 86400.0

        if trading_days_until < 0:
            # Earnings already passed
            return dollar_amount, allocation_pct

        if trading_days_until <= 1:
            adjustments.append(
                f"Earnings rejection: {ticker} earnings in {trading_days_until:.1f} day(s)"
            )
            return PositionSizer._rejected(
                f"Earnings within 1 trading day for {ticker}",
                adjustments,
            )

        if trading_days_until <= 3:
            new_dollar = dollar_amount * 0.5
            new_pct = allocation_pct * 0.5
            adjustments.append(
                f"Earnings proximity: {ticker} earnings in {trading_days_until:.1f} days, "
                f"50% reduction: ${dollar_amount:.2f} -> ${new_dollar:.2f}"
            )
            return new_dollar, new_pct

        return dollar_amount, allocation_pct