"""Recommendation worker - generates explainable trade recommendations from trend data. Fetches the latest trend summaries for a ticker, evaluates eligibility using deterministic rules, builds Recommendation objects with thesis and evidence citations, and persists them to PostgreSQL. Requirements: 7.1, 7.2, 7.3, 7.4 """ from __future__ import annotations import json import logging from datetime import datetime, timezone import asyncpg from minio import Minio from services.lake_publisher.worker import publish_recommendation_facts from services.recommendation.eligibility import ( EligibilityConfig, EligibilityResult, evaluate_eligibility, ) from services.recommendation.suppression import ( DataQualityContext, SuppressionConfig, SuppressionResult, evaluate_suppression, ) from services.recommendation.thesis_llm import ( THESIS_PROMPT_VERSION, rewrite_thesis_with_llm, ) from services.shared.config import OllamaConfig from services.shared.metrics import ( RECOMMENDATION_CONFIDENCE, RECOMMENDATION_GENERATED, RECOMMENDATION_SUPPRESSED, ) from services.shared.schemas import ( ModelMetadata, PositionSizing, Recommendation, RecommendationMode, TrendDirection, TrendSummary, TrendWindow, ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Fetch latest trend summary for a ticker + window # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # Fetch data quality context for suppression checks # --------------------------------------------------------------------------- _DATA_QUALITY_QUERY = """ WITH latest_trend AS ( SELECT top_supporting_evidence, top_opposing_evidence FROM trend_windows WHERE entity_id = $1 AND "window" = $2 ORDER BY generated_at DESC LIMIT 1 ), evidence_ids AS ( SELECT jsonb_array_elements_text( COALESCE(lt.top_supporting_evidence, '[]'::jsonb) || COALESCE(lt.top_opposing_evidence, '[]'::jsonb) ) AS eid FROM latest_trend lt ) SELECT COUNT(*) AS total_documents, COUNT(*) FILTER (WHERE di.validation_status = 'valid') AS valid_documents, COUNT(*) FILTER (WHERE di.validation_status = 'failed') AS failed_documents, AVG(di.confidence) FILTER (WHERE di.validation_status = 'valid') AS avg_extraction_confidence, MAX(d.published_at) AS newest_evidence_at, ARRAY_AGG(DISTINCT d.source_type) FILTER (WHERE d.source_type IS NOT NULL) AS source_types FROM documents d JOIN document_intelligence di ON di.document_id = d.id WHERE d.id::text IN (SELECT eid FROM evidence_ids) """ async def fetch_data_quality_context( pool: asyncpg.Pool, ticker: str, window: str, ) -> DataQualityContext | None: """Fetch data quality metrics for the documents underlying a trend. Returns None if the query fails or returns no data, in which case the suppression module will fall back to summary-based estimation. """ try: row = await pool.fetchrow(_DATA_QUALITY_QUERY, ticker, window) if row is None or row["total_documents"] == 0: return None source_types_raw = row["source_types"] source_types = set(source_types_raw) if source_types_raw else set() return DataQualityContext( total_documents=int(row["total_documents"]), valid_documents=int(row["valid_documents"] or 0), failed_documents=int(row["failed_documents"] or 0), avg_extraction_confidence=float(row["avg_extraction_confidence"] or 0.0), newest_evidence_at=row["newest_evidence_at"], source_types=source_types, ) except Exception: logger.warning( "Failed to fetch data quality context for %s/%s — will use summary fallback", ticker, window, exc_info=True, ) return None _LATEST_TREND_QUERY = """ SELECT entity_type, entity_id, "window", trend_direction, trend_strength, confidence, top_supporting_evidence, top_opposing_evidence, dominant_catalysts, material_risks, contradiction_score, disagreement_details, market_context, generated_at FROM trend_windows WHERE entity_id = $1 AND "window" = $2 ORDER BY generated_at DESC LIMIT 1 """ def _parse_trend_row(row: asyncpg.Record) -> TrendSummary: """Convert a trend_windows row into a TrendSummary.""" supporting = row["top_supporting_evidence"] if isinstance(supporting, str): supporting = json.loads(supporting) opposing = row["top_opposing_evidence"] if isinstance(opposing, str): opposing = json.loads(opposing) catalysts = row["dominant_catalysts"] if isinstance(catalysts, str): catalysts = json.loads(catalysts) risks = row["material_risks"] if isinstance(risks, str): risks = json.loads(risks) return TrendSummary( entity_type=row["entity_type"], entity_id=row["entity_id"], window=TrendWindow(row["window"]), trend_direction=TrendDirection(row["trend_direction"]), trend_strength=float(row["trend_strength"]), confidence=float(row["confidence"]), top_supporting_evidence=supporting or [], top_opposing_evidence=opposing or [], dominant_catalysts=catalysts or [], material_risks=risks or [], contradiction_score=float(row["contradiction_score"] or 0.0), generated_at=row["generated_at"], ) async def fetch_latest_trend( pool: asyncpg.Pool, ticker: str, window: str, ) -> TrendSummary | None: """Fetch the most recent trend summary for a ticker and window.""" row = await pool.fetchrow(_LATEST_TREND_QUERY, ticker, window) if row is None: return None return _parse_trend_row(row) # --------------------------------------------------------------------------- # Build thesis from trend summary (deterministic, no LLM) # --------------------------------------------------------------------------- def build_thesis( summary: TrendSummary, result: EligibilityResult, ) -> str: """Generate a deterministic thesis string from trend data. This is the descriptive analysis portion (Requirement 7.2). The LLM wording layer is a separate optional task. """ direction = summary.trend_direction.value ticker = summary.entity_id window = summary.window.value strength = summary.trend_strength confidence = summary.confidence parts: list[str] = [] # Opening: direction and strength parts.append( f"{ticker} shows a {direction} trend over the {window} window " + f"with strength {strength:.2f} and confidence {confidence:.2f}." ) # Catalysts if summary.dominant_catalysts: catalyst_str = ", ".join(summary.dominant_catalysts[:3]) parts.append(f"Dominant catalysts: {catalyst_str}.") # Contradiction note (Requirement 7.2 — separate descriptive from prescriptive) if summary.contradiction_score > 0.15: parts.append( "Notable signal disagreement detected " + f"(contradiction score: {summary.contradiction_score:.2f})." ) # Risks if summary.material_risks: risk_str = "; ".join(summary.material_risks[:2]) parts.append(f"Key risks: {risk_str}.") # Evidence count supporting_count = len(summary.top_supporting_evidence) opposing_count = len(summary.top_opposing_evidence) parts.append( f"Based on {supporting_count} supporting and " + f"{opposing_count} opposing evidence documents." ) # Prescriptive action (separated per Requirement 7.2) action = result.action.value.upper() mode = result.mode.value.replace("_", " ") parts.append(f"Recommendation: {action} ({mode}).") return " ".join(parts) # --------------------------------------------------------------------------- # Build risk classification (Requirement 7.2) # --------------------------------------------------------------------------- def classify_risk( summary: TrendSummary, result: EligibilityResult, ) -> str: """Assign a risk classification label based on signal quality. Returns one of: low, moderate, high, very_high. """ score = 0.0 # Contradiction raises risk score += summary.contradiction_score * 2.0 # Low confidence raises risk score += (1.0 - summary.confidence) * 1.5 # Low evidence count raises risk evidence_count = len(summary.top_supporting_evidence) + len(summary.top_opposing_evidence) if evidence_count < 3: score += 1.0 elif evidence_count < 5: score += 0.5 # Rejection reasons raise risk score += len(result.rejection_reasons) * 0.5 if score >= 3.0: return "very_high" if score >= 2.0: return "high" if score >= 1.0: return "moderate" return "low" # --------------------------------------------------------------------------- # Build Recommendation from eligibility result # --------------------------------------------------------------------------- def build_recommendation( summary: TrendSummary, result: EligibilityResult, reference_time: datetime | None = None, llm_thesis: str | None = None, suppression_result: SuppressionResult | None = None, ) -> Recommendation: """Assemble a Recommendation object from a trend summary and eligibility result. Combines all evidence refs (supporting + opposing) into the recommendation so the full decision trace is available (Requirement 8.3). If ``llm_thesis`` is provided (from the optional LLM wording layer), it replaces the deterministic thesis text while preserving the risk classification prefix. If ``suppression_result`` indicates suppression, a suppression note is appended to the thesis for audit visibility (Requirement 7.4). """ if reference_time is None: reference_time = datetime.now(timezone.utc) # Combine evidence refs — supporting first, then opposing evidence_refs = list(summary.top_supporting_evidence) + list(summary.top_opposing_evidence) deterministic_thesis = build_thesis(summary, result) risk_class = classify_risk(summary, result) # Use LLM-rewritten thesis if available, otherwise deterministic thesis_body = llm_thesis if llm_thesis else deterministic_thesis # Append suppression note if suppressed (Requirement 7.4) if suppression_result and suppression_result.suppressed: reason_strs = [r.value for r in suppression_result.reasons] thesis_body += ( f" [SUPPRESSED: data quality below threshold " f"(score={suppression_result.data_quality_score:.2f}, " f"reasons={', '.join(reason_strs)})]" ) # Track whether the thesis was LLM-generated for audit if llm_thesis: provider = "ollama" model_name = "thesis-rewrite" prompt_version = THESIS_PROMPT_VERSION else: provider = "deterministic" model_name = "eligibility-v1" prompt_version = "" return Recommendation( ticker=summary.entity_id, action=result.action, mode=result.mode, confidence=summary.confidence, time_horizon=result.time_horizon, thesis=f"[risk:{risk_class}] {thesis_body}", invalidation_conditions=result.invalidation_conditions, position_sizing=PositionSizing( portfolio_pct=result.position_sizing.portfolio_pct, max_loss_pct=result.position_sizing.max_loss_pct, ), evidence_refs=evidence_refs, model_metadata=ModelMetadata( provider=provider, model_name=model_name, prompt_version=prompt_version, schema_version="1.0.0", ), generated_at=reference_time, ) # --------------------------------------------------------------------------- # Persist recommendation to PostgreSQL # --------------------------------------------------------------------------- _INSERT_RECOMMENDATION = """ INSERT INTO recommendations ( ticker, action, mode, confidence, time_horizon, thesis, invalidation_conditions, portfolio_pct, max_loss_pct, model_version, model_provider, prompt_version, schema_version, risk_classification, generated_at ) VALUES ( $1, $2, $3, $4, $5, $6, $7::jsonb, $8, $9, $10, $11, $12, $13, $14, $15 ) RETURNING id """ _INSERT_REC_EVIDENCE = """ INSERT INTO recommendation_evidence ( recommendation_id, document_id, evidence_type, weight ) VALUES ($1, $2::uuid, $3, $4) """ _INSERT_RISK_EVALUATION = """ INSERT INTO risk_evaluations ( recommendation_id, eligible, allowed_mode, rejection_reasons, risk_checks, evaluated_at ) VALUES ($1::uuid, $2, $3, $4::jsonb, $5::jsonb, $6) """ _FETCH_RECOMMENDATION = """ SELECT id, ticker, action, mode, confidence, time_horizon, thesis, invalidation_conditions, portfolio_pct, max_loss_pct, model_version, model_provider, prompt_version, schema_version, risk_classification, generated_at FROM recommendations WHERE id = $1::uuid """ _FETCH_REC_EVIDENCE = """ SELECT document_id, evidence_type, weight FROM recommendation_evidence WHERE recommendation_id = $1::uuid ORDER BY evidence_type, weight DESC """ _FETCH_LATEST_RECS_FOR_TICKER = """ SELECT id, ticker, action, mode, confidence, time_horizon, thesis, invalidation_conditions, portfolio_pct, max_loss_pct, model_version, model_provider, prompt_version, schema_version, risk_classification, generated_at FROM recommendations WHERE ticker = $1 ORDER BY generated_at DESC LIMIT $2 """ def _extract_risk_classification(thesis: str) -> str: """Extract the risk classification from the thesis prefix.""" if thesis.startswith("[risk:"): end = thesis.find("]") if end > 6: return thesis[6:end] return "moderate" async def persist_recommendation( pool: asyncpg.Pool, rec: Recommendation, supporting_ids: list[str], opposing_ids: list[str], eligibility_result: EligibilityResult | None = None, ) -> str: """Insert a recommendation, evidence citations, and risk evaluation. Persists the full model metadata and risk classification for audit trail (Requirement 8.3). Also writes the eligibility decision to the risk_evaluations table when provided. Returns the recommendation UUID. """ risk_class = _extract_risk_classification(rec.thesis) row = await pool.fetchrow( _INSERT_RECOMMENDATION, rec.ticker, rec.action.value, rec.mode.value, rec.confidence, rec.time_horizon, rec.thesis, json.dumps(rec.invalidation_conditions), rec.position_sizing.portfolio_pct, rec.position_sizing.max_loss_pct, rec.model_metadata.model_name, rec.model_metadata.provider, rec.model_metadata.prompt_version, rec.model_metadata.schema_version, risk_class, rec.generated_at, ) rec_id = str(row["id"]) # Insert evidence citations with position-based weighting evidence_rows: list[tuple[str, str, str, float]] = [] for idx, doc_id in enumerate(supporting_ids): weight = round(1.0 / (1.0 + idx * 0.1), 4) # rank decay evidence_rows.append((rec_id, doc_id, "supporting", weight)) for idx, doc_id in enumerate(opposing_ids): weight = round(1.0 / (1.0 + idx * 0.1), 4) evidence_rows.append((rec_id, doc_id, "opposing", weight)) if evidence_rows: await pool.executemany(_INSERT_REC_EVIDENCE, evidence_rows) # Persist the eligibility/risk evaluation for audit trail if eligibility_result is not None: rejection_reasons_json = json.dumps( [r.value for r in eligibility_result.rejection_reasons] ) risk_checks = { "time_horizon": eligibility_result.time_horizon, "position_sizing": { "portfolio_pct": eligibility_result.position_sizing.portfolio_pct, "max_loss_pct": eligibility_result.position_sizing.max_loss_pct, }, "invalidation_conditions": eligibility_result.invalidation_conditions, "risk_classification": risk_class, } await pool.execute( _INSERT_RISK_EVALUATION, rec_id, eligibility_result.eligible, eligibility_result.mode.value, rejection_reasons_json, json.dumps(risk_checks), rec.generated_at, ) return rec_id async def fetch_recommendation_by_id( pool: asyncpg.Pool, recommendation_id: str, ) -> dict[str, object] | None: """Fetch a persisted recommendation with its evidence citations. Returns a dict with the recommendation fields and an 'evidence' list, or None if not found. """ row = await pool.fetchrow(_FETCH_RECOMMENDATION, recommendation_id) if row is None: return None rec_dict = dict(row) # Parse JSONB fields if isinstance(rec_dict.get("invalidation_conditions"), str): rec_dict["invalidation_conditions"] = json.loads(rec_dict["invalidation_conditions"]) # Fetch evidence evidence_rows = await pool.fetch(_FETCH_REC_EVIDENCE, recommendation_id) rec_dict["evidence"] = [ { "document_id": str(e["document_id"]), "evidence_type": e["evidence_type"], "weight": float(e["weight"]), } for e in evidence_rows ] return rec_dict async def fetch_latest_recommendations( pool: asyncpg.Pool, ticker: str, limit: int = 10, ) -> list[dict[str, object]]: """Fetch the most recent recommendations for a ticker. Returns a list of recommendation dicts (without evidence — use fetch_recommendation_by_id for full detail). """ rows = await pool.fetch(_FETCH_LATEST_RECS_FOR_TICKER, ticker, limit) results = [] for row in rows: rec_dict = dict(row) if isinstance(rec_dict.get("invalidation_conditions"), str): rec_dict["invalidation_conditions"] = json.loads(rec_dict["invalidation_conditions"]) results.append(rec_dict) return results # --------------------------------------------------------------------------- # Main entry point: generate recommendation for a ticker # --------------------------------------------------------------------------- async def generate_recommendation( pool: asyncpg.Pool, ticker: str, window: str = TrendWindow.SEVEN_DAY.value, config: EligibilityConfig | None = None, reference_time: datetime | None = None, ollama_config: OllamaConfig | None = None, suppression_config: SuppressionConfig | None = None, minio_client: Minio | None = None, ) -> Recommendation | None: """Generate and persist a recommendation for a ticker from its latest trend. Steps: 1. Fetch the latest trend summary for the ticker + window. 2. Evaluate data quality suppression (Requirement 7.4). 3. Evaluate eligibility using deterministic rules. 4. Build a Recommendation object with thesis and evidence. - If ``ollama_config`` is provided, the deterministic thesis is rewritten into analyst-quality prose via the LLM wording layer. 5. Persist the recommendation and evidence citations. Returns the Recommendation, or None if no trend data exists. """ if reference_time is None: reference_time = datetime.now(timezone.utc) cfg = config or EligibilityConfig() sup_cfg = suppression_config or SuppressionConfig() # 1. Fetch latest trend summary = await fetch_latest_trend(pool, ticker, window) if summary is None: logger.info("No trend data for %s/%s — skipping recommendation", ticker, window) return None # 2. Evaluate data quality suppression (Requirement 7.4) quality_ctx = await fetch_data_quality_context(pool, ticker, window) suppression = evaluate_suppression( summary, quality_ctx=quality_ctx, config=sup_cfg, reference_time=reference_time, ) # 3. Evaluate eligibility result = evaluate_eligibility(summary, cfg) # Apply suppression: force mode to informational if suppressed if suppression.suppressed: result = EligibilityResult( eligible=False, action=result.action, mode=RecommendationMode.INFORMATIONAL, position_sizing=result.position_sizing, rejection_reasons=result.rejection_reasons, time_horizon=result.time_horizon, invalidation_conditions=result.invalidation_conditions, ) # 4. Optional LLM thesis rewrite llm_thesis: str | None = None if ollama_config is not None: deterministic_thesis = build_thesis(summary, result) llm_thesis = await rewrite_thesis_with_llm( deterministic_thesis=deterministic_thesis, summary=summary, config=ollama_config, ) # If the LLM returned the same text as the deterministic thesis, # treat it as a no-op (fallback was used). if llm_thesis == deterministic_thesis: llm_thesis = None # 5. Build recommendation rec = build_recommendation( summary, result, reference_time, llm_thesis=llm_thesis, suppression_result=suppression, ) # 6. Persist recommendation, evidence citations, and risk evaluation rec_id = await persist_recommendation( pool, rec, supporting_ids=list(summary.top_supporting_evidence), opposing_ids=list(summary.top_opposing_evidence), eligibility_result=result, ) # 7. Publish prediction facts to analytical tables (Requirement 9.4) if minio_client is not None: try: lake_refs = publish_recommendation_facts( minio_client, rec, trend_direction=summary.trend_direction.value, trend_strength=summary.trend_strength, ) logger.info( "Published analytical facts for %s: %s", ticker, lake_refs, ) except Exception: logger.warning( "Failed to publish analytical facts for %s/%s — recommendation " "persisted but lake publication failed", ticker, rec_id, exc_info=True, ) logger.info( "Generated recommendation %s for %s: action=%s mode=%s confidence=%.3f " "eligible=%s suppressed=%s quality_score=%.3f llm_thesis=%s", rec_id, ticker, rec.action.value, rec.mode.value, rec.confidence, result.eligible, suppression.suppressed, suppression.data_quality_score, llm_thesis is not None, ) # Prometheus metrics RECOMMENDATION_GENERATED.labels(action=rec.action.value, mode=rec.mode.value).inc() RECOMMENDATION_CONFIDENCE.observe(rec.confidence) if suppression.suppressed: RECOMMENDATION_SUPPRESSED.inc() return rec # --------------------------------------------------------------------------- # Batch: generate recommendations for multiple tickers # --------------------------------------------------------------------------- async def generate_recommendations_batch( pool: asyncpg.Pool, tickers: list[str], window: str = TrendWindow.SEVEN_DAY.value, config: EligibilityConfig | None = None, ollama_config: OllamaConfig | None = None, suppression_config: SuppressionConfig | None = None, minio_client: Minio | None = None, ) -> list[Recommendation]: """Generate recommendations for a list of tickers. Processes each ticker sequentially. Returns only the successfully generated recommendations (tickers with no trend data are skipped). If ``ollama_config`` is provided, each recommendation's thesis will be rewritten using the LLM wording layer. """ results: list[Recommendation] = [] reference_time = datetime.now(timezone.utc) for ticker in tickers: rec = await generate_recommendation( pool, ticker, window, config, reference_time, ollama_config=ollama_config, suppression_config=suppression_config, minio_client=minio_client, ) if rec is not None: results.append(rec) logger.info( "Batch recommendation: %d/%d tickers produced recommendations", len(results), len(tickers), ) return results