"""Elliott Wave signal evaluator. Detects Elliott Wave patterns — impulse waves (5-wave structure) and corrective waves (3-wave structure) — using a simplified zigzag pivot filter. Produces a signal with the current wave position and projected direction. Wave detection algorithm (simplified): 1. Find significant pivot points (local highs and lows) using a zigzag filter that identifies reversals of at least X% of the price range. 2. Count alternating pivots to identify wave structure. 3. Five alternating pivots = impulse wave (bullish if trending up, bearish if trending down). 4. Three alternating pivots after an impulse = corrective wave. Signal logic: - Impulse wave 3 or 5: strong signal in the trend direction. - Corrective wave (A, B, C): signal in the opposite direction (anticipating next impulse). - Ambiguous wave count: return ``None``. """ from __future__ import annotations from services.signal_engine.models import OHLCVBar, SignalDirection, SignalResult from services.signal_engine.signals.base import validate_lookback # Default minimum number of bars required for evaluation DEFAULT_MIN_BARS: int = 30 # Minimum zigzag reversal threshold as a fraction of the price range _DEFAULT_ZIGZAG_PCT: float = 0.05 # 5% # Wave type labels WAVE_TYPE_IMPULSE: str = "impulse" WAVE_TYPE_CORRECTIVE: str = "corrective" # Impulse wave positions (1-indexed) _IMPULSE_WAVE_COUNT: int = 5 # Corrective wave positions _CORRECTIVE_WAVE_COUNT: int = 3 # Confidence multiplier for wave clarity _CONFIDENCE_MULTIPLIER: float = 0.85 class ElliottWaveEvaluator: """Elliott Wave pattern signal evaluator. Satisfies the :class:`~services.signal_engine.signals.base.SignalEvaluator` protocol. Parameters ---------- min_bars: Minimum number of OHLCV bars required before the evaluator will produce a signal. Defaults to ``30``. zigzag_pct: Minimum reversal threshold as a fraction of the overall price range for the zigzag filter. Defaults to ``0.05`` (5%). """ def __init__( self, min_bars: int = DEFAULT_MIN_BARS, zigzag_pct: float = _DEFAULT_ZIGZAG_PCT, ) -> None: self.min_bars = min_bars self.zigzag_pct = zigzag_pct # ------------------------------------------------------------------ # Public API (SignalEvaluator protocol) # ------------------------------------------------------------------ def evaluate( self, bars: list[OHLCVBar], timeframe: str, ) -> SignalResult | None: """Evaluate Elliott Wave pattern on *bars* for *timeframe*. Returns ``None`` when there are fewer than :pyattr:`min_bars` bars, when the market is flat (no price range), or when the wave count is ambiguous. """ if not validate_lookback(bars, self.min_bars): return None # Compute overall price range for the zigzag threshold overall_high = max(b.high for b in bars) overall_low = min(b.low for b in bars) price_range = overall_high - overall_low if price_range <= 0: return None # flat market zigzag_threshold = price_range * self.zigzag_pct # Find zigzag pivots pivots = _find_zigzag_pivots(bars, zigzag_threshold) if len(pivots) < _CORRECTIVE_WAVE_COUNT: return None # not enough pivots for any wave structure # Try to identify wave structure from the pivots wave_info = _classify_waves(pivots, price_range) if wave_info is None: return None # ambiguous wave count wave_type = wave_info["wave_type"] current_position = wave_info["current_position"] trend_up = wave_info["trend_up"] clarity = wave_info["clarity"] # Determine direction and strength based on wave type and position direction: SignalDirection strength: float if wave_type == WAVE_TYPE_IMPULSE: # Impulse wave: signal in the trend direction direction = SignalDirection.BULLISH if trend_up else SignalDirection.BEARISH # Waves 3 and 5 are the strongest signal points if current_position in (3, 5): strength = min(1.0, clarity * 1.0) else: strength = min(1.0, clarity * 0.6) else: # Corrective wave: signal opposite to the correction # (anticipating next impulse in the original trend direction) direction = SignalDirection.BULLISH if trend_up else SignalDirection.BEARISH strength = min(1.0, clarity * 0.7) confidence = min(1.0, clarity * _CONFIDENCE_MULTIPLIER) # Build pivot list for metadata (index, price, type) pivot_meta = [ {"index": p["index"], "price": p["price"], "type": p["type"]} for p in pivots ] return SignalResult( signal_type="elliott_wave", timeframe=timeframe, strength=strength, direction=direction, confidence=confidence, metadata={ "wave_count": len(pivots), "wave_type": wave_type, "current_wave_position": current_position, "trend_up": trend_up, "clarity": round(clarity, 4), "pivots": pivot_meta, }, ) # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _find_zigzag_pivots( bars: list[OHLCVBar], threshold: float, ) -> list[dict]: """Find significant pivot points using a zigzag filter. A pivot is a local high or low where the price reverses by at least *threshold* from the last confirmed pivot. Returns a list of dicts with keys: ``index``, ``price``, ``type`` (``"high"`` or ``"low"``). """ if len(bars) < 2: return [] pivots: list[dict] = [] # Seed with the first bar's high and low as candidates last_high_idx = 0 last_high = bars[0].high last_low_idx = 0 last_low = bars[0].low # Direction: 1 = looking for a high (trending up), -1 = looking for a low # Start by determining initial direction from first two bars if bars[1].close >= bars[0].close: direction = 1 # trending up, looking for a high else: direction = -1 # trending down, looking for a low for i in range(1, len(bars)): bar = bars[i] if direction == 1: # Trending up — track the highest high if bar.high >= last_high: last_high = bar.high last_high_idx = i # Check for reversal: price dropped by threshold from the high if last_high - bar.low >= threshold: # Confirm the high as a pivot pivots.append({ "index": last_high_idx, "price": last_high, "type": "high", }) # Switch direction: now looking for a low direction = -1 last_low = bar.low last_low_idx = i else: # Trending down — track the lowest low if bar.low <= last_low: last_low = bar.low last_low_idx = i # Check for reversal: price rose by threshold from the low if bar.high - last_low >= threshold: # Confirm the low as a pivot pivots.append({ "index": last_low_idx, "price": last_low, "type": "low", }) # Switch direction: now looking for a high direction = 1 last_high = bar.high last_high_idx = i # Add the final unconfirmed pivot (the current trend endpoint) if direction == 1 and (not pivots or pivots[-1]["type"] != "high"): pivots.append({ "index": last_high_idx, "price": last_high, "type": "high", }) elif direction == -1 and (not pivots or pivots[-1]["type"] != "low"): pivots.append({ "index": last_low_idx, "price": last_low, "type": "low", }) return pivots def _classify_waves( pivots: list[dict], price_range: float, ) -> dict | None: """Classify the pivot sequence as impulse or corrective waves. Returns a dict with ``wave_type``, ``current_position``, ``trend_up``, and ``clarity``, or ``None`` if the wave count is ambiguous. """ n = len(pivots) if n < _CORRECTIVE_WAVE_COUNT: return None # Determine overall trend from first to last pivot first_price = pivots[0]["price"] last_price = pivots[-1]["price"] trend_up = last_price > first_price # Try impulse wave (5 pivots) first, then corrective (3 pivots) if n >= _IMPULSE_WAVE_COUNT: # Use the last 5 pivots for impulse wave detection impulse_pivots = pivots[-_IMPULSE_WAVE_COUNT:] impulse_result = _check_impulse(impulse_pivots, trend_up, price_range) if impulse_result is not None: return impulse_result # Check if there's a corrective wave after an impulse # (need at least 5 + 3 = 8 pivots for impulse + corrective) if n >= _IMPULSE_WAVE_COUNT + _CORRECTIVE_WAVE_COUNT: # Check if the first 5 pivots form an impulse early_impulse = pivots[:_IMPULSE_WAVE_COUNT] early_result = _check_impulse(early_impulse, trend_up, price_range) if early_result is not None: # The remaining pivots may form a corrective wave corrective_pivots = pivots[_IMPULSE_WAVE_COUNT:_IMPULSE_WAVE_COUNT + _CORRECTIVE_WAVE_COUNT] corrective_result = _check_corrective( corrective_pivots, trend_up, price_range, ) if corrective_result is not None: return corrective_result # Try corrective wave (3 pivots) from the tail if n >= _CORRECTIVE_WAVE_COUNT: corrective_pivots = pivots[-_CORRECTIVE_WAVE_COUNT:] corrective_result = _check_corrective( corrective_pivots, trend_up, price_range, ) if corrective_result is not None: return corrective_result return None # ambiguous def _check_impulse( pivots: list[dict], trend_up: bool, price_range: float, ) -> dict | None: """Check if 5 pivots form a valid impulse wave. For a bullish impulse (trend_up=True): - Wave 1 (low→high): price rises - Wave 2 (high→low): price falls but stays above wave 1 start - Wave 3 (low→high): price rises above wave 1 high (wave 3 is longest) - Wave 4 (high→low): price falls but stays above wave 1 high - Wave 5 (low→high): price rises to new high For bearish impulse, the pattern is inverted. """ if len(pivots) != _IMPULSE_WAVE_COUNT: return None prices = [p["price"] for p in pivots] if trend_up: # Bullish impulse: alternating low-high-low-high-low or high-low-high-low-high # Check for generally ascending pattern with higher highs valid = _validate_bullish_impulse(prices) else: # Bearish impulse: generally descending pattern with lower lows valid = _validate_bearish_impulse(prices) if not valid: return None # Compute clarity: how clean the wave structure is clarity = _compute_impulse_clarity(prices, trend_up, price_range) # Current position is wave 5 (the last wave in the impulse) return { "wave_type": WAVE_TYPE_IMPULSE, "current_position": 5, "trend_up": trend_up, "clarity": clarity, } def _validate_bullish_impulse(prices: list[float]) -> bool: """Validate a 5-pivot sequence as a bullish impulse. Simplified rules: - The overall trend is up (last > first). - Wave 3 (pivot 2 to pivot 3) should be the largest move or at least not the shortest. - Wave 2 should not retrace below wave 1 start. - Wave 4 should not overlap wave 1 end. """ if len(prices) != 5: return False # Overall upward trend if prices[-1] <= prices[0]: return False # Compute wave magnitudes waves = [abs(prices[i + 1] - prices[i]) for i in range(4)] # Wave 3 (index 2) should not be the shortest impulse wave # Impulse waves are waves 0, 2, 4 (odd-indexed moves in 0-based) impulse_waves = [waves[0], waves[2]] if len(waves) > 3: impulse_waves.append(waves[3]) # Wave 3 (waves[2]) should be significant if waves[2] < min(waves[0], waves[2]) * 0.5: return False # The pattern should show alternating direction # Check that consecutive pivots alternate in direction for i in range(3): move_a = prices[i + 1] - prices[i] move_b = prices[i + 2] - prices[i + 1] # Consecutive moves should be in opposite directions if move_a * move_b >= 0: return False return True def _validate_bearish_impulse(prices: list[float]) -> bool: """Validate a 5-pivot sequence as a bearish impulse. Mirror of bullish validation with inverted price direction. """ if len(prices) != 5: return False # Overall downward trend if prices[-1] >= prices[0]: return False # Compute wave magnitudes waves = [abs(prices[i + 1] - prices[i]) for i in range(4)] # Wave 3 (waves[2]) should be significant if waves[2] < min(waves[0], waves[2]) * 0.5: return False # Check alternating direction for i in range(3): move_a = prices[i + 1] - prices[i] move_b = prices[i + 2] - prices[i + 1] if move_a * move_b >= 0: return False return True def _compute_impulse_clarity( prices: list[float], trend_up: bool, price_range: float, ) -> float: """Compute wave clarity for an impulse wave. Clarity is based on: - How well the pivots alternate (already validated). - How proportional the wave magnitudes are. - How significant the waves are relative to the price range. """ if price_range <= 0: return 0.0 waves = [abs(prices[i + 1] - prices[i]) for i in range(4)] total_movement = sum(waves) # Significance: total wave movement relative to price range significance = min(1.0, total_movement / (price_range * 2.0)) # Proportionality: wave 3 should be the largest or close to it max_wave = max(waves) if max_wave <= 0: return 0.0 wave3_ratio = waves[2] / max_wave # 1.0 if wave 3 is the largest # Overall clarity clarity = 0.5 * significance + 0.5 * wave3_ratio return max(0.0, min(1.0, clarity)) def _check_corrective( pivots: list[dict], trend_up: bool, price_range: float, ) -> dict | None: """Check if 3 pivots form a valid corrective wave (A-B-C). A corrective wave moves against the main trend: - For a bullish main trend: corrective wave moves down (A down, B up, C down). - For a bearish main trend: corrective wave moves up (A up, B down, C up). """ if len(pivots) != _CORRECTIVE_WAVE_COUNT: return None prices = [p["price"] for p in pivots] # Check alternating direction move_a = prices[1] - prices[0] move_b = prices[2] - prices[1] # Moves must be in opposite directions if move_a * move_b >= 0: return None # For a bullish main trend, the corrective wave should move down overall if trend_up: if prices[2] >= prices[0]: return None # not a downward correction else: if prices[2] <= prices[0]: return None # not an upward correction # Compute clarity waves = [abs(prices[1] - prices[0]), abs(prices[2] - prices[1])] total_movement = sum(waves) if price_range <= 0: return 0.0 significance = min(1.0, total_movement / price_range) clarity = significance * 0.8 # corrective waves are inherently less clear # Current position is wave C (the last wave in the correction) return { "wave_type": WAVE_TYPE_CORRECTIVE, "current_position": 3, # wave C "trend_up": trend_up, "clarity": max(0.0, min(1.0, clarity)), }