"""Delta Analyzer — compares heuristic and probabilistic pipeline verdicts. Computes agreement flags, confidence deltas, disagreement reasons, and tracks a rolling 100-evaluation agreement rate per ticker in Redis. Requirements: 9.1, 9.2, 9.3, 9.4, 9.5, 9.6 """ from __future__ import annotations import logging import redis.asyncio from services.signal_engine.models import ( DeltaResult, HeuristicResult, ProbabilisticResult, Verdict, ) logger = logging.getLogger(__name__) # Redis key pattern for rolling agreement tracking _AGREEMENT_KEY_PREFIX = "stonks:signal_engine:agreement" # Maximum number of evaluations to track for rolling agreement rate _ROLLING_WINDOW = 100 # Agreement rate threshold below which a warning is logged _AGREEMENT_WARNING_THRESHOLD = 0.50 def _compute_disagreement_reasons( heuristic: HeuristicResult, probabilistic: ProbabilisticResult, ) -> list[str]: """Identify reasons for pipeline disagreement. Compares which conditions each pipeline met or failed to produce human-readable disagreement reasons for training signal generation. """ reasons: list[str] = [] if heuristic.verdict == probabilistic.verdict: return reasons # Heuristic-side reasons if heuristic.confidence < 0.70: reasons.append("heuristic_confidence_below_threshold") if heuristic.s_total < 1.2: reasons.append("heuristic_s_total_below_threshold") # Probabilistic-side reasons if probabilistic.p_up < 0.60: reasons.append("probabilistic_p_up_below_threshold") if probabilistic.entropy > 0.90: reasons.append("probabilistic_entropy_too_high") if probabilistic.ev_r < 1.5: reasons.append("EV_R_below_threshold") # Verdict-specific context if heuristic.verdict == Verdict.BUY and probabilistic.verdict != Verdict.BUY: reasons.append("heuristic_buy_probabilistic_disagrees") elif probabilistic.verdict == Verdict.BUY and heuristic.verdict != Verdict.BUY: reasons.append("probabilistic_buy_heuristic_disagrees") return reasons async def analyze_delta( heuristic: HeuristicResult, probabilistic: ProbabilisticResult, redis_client: redis.asyncio.Redis, ticker: str, ) -> DeltaResult: """Compare pipeline verdicts and track agreement metrics. 1. Compute agreement flag (both verdicts identical). 2. Compute confidence delta: ``|heuristic_confidence - probabilistic_P_up|``. 3. Record disagreement reasons when verdicts differ. 4. Track rolling 100-evaluation agreement rate in Redis. 5. Log warning when agreement rate drops below 0.50. Returns a ``DeltaResult`` with all computed fields. """ # Step 1: Agreement flag agreement = heuristic.verdict == probabilistic.verdict # Step 2: Confidence delta confidence_delta = abs(heuristic.confidence - probabilistic.p_up) # Step 3: Disagreement reasons disagreement_reasons = _compute_disagreement_reasons(heuristic, probabilistic) # Step 4: Rolling agreement rate in Redis rolling_agreement_rate: float | None = None agreement_key = f"{_AGREEMENT_KEY_PREFIX}:{ticker}" try: # Push the agreement result (1 for agree, 0 for disagree) await redis_client.lpush(agreement_key, "1" if agreement else "0") # Trim to the last _ROLLING_WINDOW evaluations await redis_client.ltrim(agreement_key, 0, _ROLLING_WINDOW - 1) # Compute the rolling agreement rate values = await redis_client.lrange(agreement_key, 0, _ROLLING_WINDOW - 1) if values: agree_count = sum(1 for v in values if v == b"1" or v == "1") rolling_agreement_rate = agree_count / len(values) except Exception: logger.warning( "Failed to update rolling agreement rate in Redis for %s", ticker, exc_info=True, ) # Step 5: Log warning when agreement rate drops below threshold if ( rolling_agreement_rate is not None and rolling_agreement_rate < _AGREEMENT_WARNING_THRESHOLD ): logger.warning( "Persistent pipeline disagreement for %s: rolling agreement rate %.2f " "(below %.2f threshold over last %d evaluations)", ticker, rolling_agreement_rate, _AGREEMENT_WARNING_THRESHOLD, _ROLLING_WINDOW, ) # Step 6: Return DeltaResult return DeltaResult( agreement=agreement, confidence_delta=round(confidence_delta, 6), heuristic_verdict=heuristic.verdict.value, probabilistic_verdict=probabilistic.verdict.value, disagreement_reasons=disagreement_reasons, rolling_agreement_rate=rolling_agreement_rate, )