From f11aa0a1ee5bd1153323f0fae683a54fba8dc291 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Fri, 17 Apr 2026 00:15:32 +0000 Subject: [PATCH] fix: deduplicate recommendations and widen position sizing range - Add dedup check in recommendation worker: skip generation when latest rec for same ticker+window has identical action/mode/confidence - Widen position sizing range (1-10% portfolio, 0.3-2% max loss) and factor in trend strength + evidence count for differentiated sizing - API returns only latest recommendation per ticker by default (DISTINCT ON) to eliminate duplicate rows in the frontend list view --- services/api/app.py | 48 ++++++++++++----- services/recommendation/eligibility.py | 50 ++++++++++-------- services/recommendation/worker.py | 71 ++++++++++++++++++++++++-- 3 files changed, 134 insertions(+), 35 deletions(-) diff --git a/services/api/app.py b/services/api/app.py index 9ea6afd..2ce1cf3 100644 --- a/services/api/app.py +++ b/services/api/app.py @@ -450,8 +450,14 @@ async def list_recommendations( since: Optional[str] = None, limit: int = Query(default=50, le=200), offset: int = 0, + latest: bool = Query(default=True, description="Return only the latest recommendation per ticker"), ): - """List recommendations with optional filters.""" + """List recommendations with optional filters. + + By default (latest=true), returns only the most recent recommendation + per ticker to avoid showing duplicate/stale entries. Set latest=false + to see the full history. + """ conditions: list[str] = [] params: list[Any] = [] idx = 1 @@ -475,17 +481,35 @@ async def list_recommendations( where = ("WHERE " + " AND ".join(conditions)) if conditions else "" - rows = await pool.fetch( - f"""SELECT r.id, r.ticker, r.action, r.mode, r.confidence, - r.time_horizon, r.thesis, r.invalidation_conditions, - r.portfolio_pct, r.max_loss_pct, r.model_version, - r.risk_classification, r.generated_at - FROM recommendations r - {where} - ORDER BY r.generated_at DESC - LIMIT ${idx} OFFSET ${idx + 1}""", - *params, limit, offset, - ) + if latest: + # Use DISTINCT ON to get only the latest recommendation per ticker + rows = await pool.fetch( + f"""SELECT DISTINCT ON (r.ticker) + r.id, r.ticker, r.action, r.mode, r.confidence, + r.time_horizon, r.thesis, r.invalidation_conditions, + r.portfolio_pct, r.max_loss_pct, r.model_version, + r.risk_classification, r.generated_at + FROM recommendations r + {where} + ORDER BY r.ticker, r.generated_at DESC""", + *params, + ) + # Apply limit/offset manually after DISTINCT ON + # Sort by generated_at DESC for the final result + rows = sorted(rows, key=lambda r: r["generated_at"], reverse=True) + rows = rows[offset:offset + limit] + else: + rows = await pool.fetch( + f"""SELECT r.id, r.ticker, r.action, r.mode, r.confidence, + r.time_horizon, r.thesis, r.invalidation_conditions, + r.portfolio_pct, r.max_loss_pct, r.model_version, + r.risk_classification, r.generated_at + FROM recommendations r + {where} + ORDER BY r.generated_at DESC + LIMIT ${idx} OFFSET ${idx + 1}""", + *params, limit, offset, + ) results = [] for r in rows: d = _row_to_dict(r) diff --git a/services/recommendation/eligibility.py b/services/recommendation/eligibility.py index cb83cd9..27edd5a 100644 --- a/services/recommendation/eligibility.py +++ b/services/recommendation/eligibility.py @@ -66,17 +66,17 @@ class EligibilityConfig: # --- Position sizing rules (Requirement 7.3) --- # Base portfolio allocation percentage - base_portfolio_pct: float = 0.02 + base_portfolio_pct: float = 0.01 # Maximum portfolio allocation percentage - max_portfolio_pct: float = 0.05 + max_portfolio_pct: float = 0.10 # Base max loss percentage - base_max_loss_pct: float = 0.005 + base_max_loss_pct: float = 0.003 # Maximum max loss percentage - max_max_loss_pct: float = 0.01 + max_max_loss_pct: float = 0.02 # Confidence scaling: higher confidence → larger position (linear) - confidence_sizing_weight: float = 0.5 + confidence_sizing_weight: float = 0.8 # Contradiction penalty: higher contradiction → smaller position - contradiction_sizing_penalty: float = 0.3 + contradiction_sizing_penalty: float = 0.5 DEFAULT_ELIGIBILITY_CONFIG = EligibilityConfig() @@ -216,30 +216,40 @@ def _compute_position_sizing( ) -> PositionSizing: """Compute position sizing guidance from portfolio rules and signal quality. - Higher confidence → larger allocation (up to max). + Higher confidence and trend strength → larger allocation (up to max). Higher contradiction → smaller allocation (penalty). + Low evidence count further reduces allocation. """ - # Start from base allocation - confidence_scale = config.base_portfolio_pct + ( - config.confidence_sizing_weight - * summary.confidence - * (config.max_portfolio_pct - config.base_portfolio_pct) - ) + # Confidence-based scaling over the full range + confidence_factor = config.confidence_sizing_weight * summary.confidence + # Trend strength multiplier — stronger trends justify larger positions + strength_factor = 0.5 + 0.5 * summary.trend_strength # range [0.5, 1.0] + + portfolio_range = config.max_portfolio_pct - config.base_portfolio_pct + raw_portfolio = config.base_portfolio_pct + confidence_factor * strength_factor * portfolio_range # Apply contradiction penalty contradiction_penalty = config.contradiction_sizing_penalty * summary.contradiction_score - portfolio_pct = confidence_scale * (1.0 - contradiction_penalty) + portfolio_pct = raw_portfolio * (1.0 - contradiction_penalty) + + # Evidence count penalty — fewer sources = less confidence in sizing + evidence_count = len(summary.top_supporting_evidence) + len(summary.top_opposing_evidence) + if evidence_count < 3: + portfolio_pct *= 0.5 + elif evidence_count < 5: + portfolio_pct *= 0.75 # Clamp to bounds portfolio_pct = max(config.base_portfolio_pct * 0.5, min(portfolio_pct, config.max_portfolio_pct)) # Max loss scales similarly - loss_scale = config.base_max_loss_pct + ( - config.confidence_sizing_weight - * summary.confidence - * (config.max_max_loss_pct - config.base_max_loss_pct) - ) - max_loss_pct = loss_scale * (1.0 - contradiction_penalty) + loss_range = config.max_max_loss_pct - config.base_max_loss_pct + raw_loss = config.base_max_loss_pct + confidence_factor * strength_factor * loss_range + max_loss_pct = raw_loss * (1.0 - contradiction_penalty) + if evidence_count < 3: + max_loss_pct *= 0.5 + elif evidence_count < 5: + max_loss_pct *= 0.75 max_loss_pct = max(config.base_max_loss_pct * 0.5, min(max_loss_pct, config.max_max_loss_pct)) return PositionSizing( diff --git a/services/recommendation/worker.py b/services/recommendation/worker.py index 0a38df8..a4180c6 100644 --- a/services/recommendation/worker.py +++ b/services/recommendation/worker.py @@ -656,6 +656,64 @@ async def fetch_latest_recommendations( # --------------------------------------------------------------------------- +_DEDUP_CHECK_QUERY = """ +SELECT r.id, r.confidence, r.action, r.mode, r.generated_at +FROM recommendations r +WHERE r.ticker = $1 + AND r.time_horizon LIKE $2 || '%' +ORDER BY r.generated_at DESC +LIMIT 1 +""" + + +async def _is_duplicate_recommendation( + pool: asyncpg.Pool, + ticker: str, + summary: TrendSummary, + result: EligibilityResult, +) -> bool: + """Check if the latest recommendation for this ticker+window is effectively identical. + + Compares confidence, action, and mode. If they match, the underlying + trend data hasn't changed meaningfully and we skip regeneration. + """ + horizon_prefix = _map_time_horizon_prefix(summary.window.value) + row = await pool.fetchrow(_DEDUP_CHECK_QUERY, ticker, horizon_prefix) + if row is None: + return False + + # If the previous recommendation has the same action, mode, and confidence + # (within a small tolerance), it's a duplicate + prev_confidence = float(row["confidence"]) + prev_action = row["action"] + prev_mode = row["mode"] + + same_action = prev_action == result.action.value + same_mode = prev_mode == result.mode.value + same_confidence = abs(prev_confidence - summary.confidence) < 0.01 + + if same_action and same_mode and same_confidence: + logger.info( + "Skipping duplicate recommendation for %s — action=%s mode=%s " + "confidence=%.3f matches previous", + ticker, prev_action, prev_mode, prev_confidence, + ) + return True + return False + + +def _map_time_horizon_prefix(window: str) -> str: + """Map window to the time_horizon prefix for dedup matching.""" + mapping = { + "intraday": "intraday", + "1d": "swing_1d", + "7d": "swing_1d", + "30d": "position_10d", + "90d": "position_30d", + } + return mapping.get(window, "window_") + + async def generate_recommendation( pool: asyncpg.Pool, ticker: str, @@ -670,6 +728,7 @@ async def generate_recommendation( Steps: 1. Fetch the latest trend summary for the ticker + window. + 1b. Skip if the latest recommendation for this ticker is effectively identical. 2. Fetch the latest trend projection (Requirement 12.8, 12.9). 3. Evaluate data quality suppression (Requirement 7.4). 4. Evaluate eligibility using deterministic rules. @@ -678,7 +737,8 @@ async def generate_recommendation( rewritten into analyst-quality prose via the LLM wording layer. 6. Persist the recommendation and evidence citations. - Returns the Recommendation, or None if no trend data exists. + Returns the Recommendation, or None if no trend data exists or recommendation + is a duplicate of the most recent one. """ if reference_time is None: reference_time = datetime.now(timezone.utc) @@ -692,6 +752,11 @@ async def generate_recommendation( logger.info("No trend data for %s/%s — skipping recommendation", ticker, window) return None + # 1b. Check for duplicate: evaluate eligibility early for dedup comparison + preliminary_result = evaluate_eligibility(summary, cfg) + if await _is_duplicate_recommendation(pool, ticker, summary, preliminary_result): + return None + # 2. Fetch latest trend projection (Requirement 12.8, 12.9) projection = await fetch_latest_projection(pool, ticker, window) # Exclude low-confidence projections from influencing recommendation @@ -708,8 +773,8 @@ async def generate_recommendation( summary, quality_ctx=quality_ctx, config=sup_cfg, reference_time=reference_time, ) - # 4. Evaluate eligibility - result = evaluate_eligibility(summary, cfg) + # 4. Evaluate eligibility (use preliminary result, already computed for dedup) + result = preliminary_result # Apply suppression: force mode to informational if suppressed if suppression.suppressed: