"""Prediction Snapshot Writer — captures immutable prediction state at generation time. Creates frozen records of every recommendation with prices, evidence links, duplicate detection, and contribution scores so that predictions can be evaluated against future outcomes without hindsight bias. Requirements: 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 3.1, 3.2, 3.3, 3.4 """ from __future__ import annotations import hashlib import json import logging import urllib.parse import uuid from dataclasses import dataclass, field from datetime import datetime import asyncpg from services.shared.schemas import Recommendation, TrendSummary logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- SECTOR_ETF_MAP: dict[str, str] = { "Technology": "XLK", "Consumer Cyclical": "XLY", "Financial Services": "XLF", "Healthcare": "XLV", "Energy": "XLE", "Communication Services": "XLC", "Industrials": "XLI", "Consumer Defensive": "XLP", "Real Estate": "XLRE", "Utilities": "XLU", } EVALUATION_HORIZONS: list[str] = ["1h", "6h", "1d", "7d", "30d"] MAX_SINGLE_DOCUMENT_WEIGHT: float = 1.0 # --------------------------------------------------------------------------- # Dataclasses # --------------------------------------------------------------------------- @dataclass class PredictionSnapshot: """Immutable snapshot of a prediction at generation time.""" id: str # UUID generated_at: datetime ticker: str window: str horizon: str direction: str # bullish/bearish/mixed/neutral action: str # buy/sell/hold/watch mode: str # informational/paper_eligible/live_eligible strength: float confidence: float contradiction: float p_bull: float | None p_bear: float | None score_company: float score_macro: float score_competitive: float evidence_count: int unique_source_count: int duplicate_evidence_count: int price_at_prediction: float | None spy_price_at_prediction: float | None sector_etf_price_at_prediction: float | None metadata: dict = field(default_factory=dict) @dataclass class SignalEvidenceLink: """Link between a prediction and a contributing evidence document.""" id: str # UUID prediction_id: str document_id: str signal_id: str ticker: str source: str source_type: str catalyst_type: str sentiment: str impact: float extraction_confidence: float weight: float # clamped to MAX_SINGLE_DOCUMENT_WEIGHT is_duplicate: bool canonical_evidence_key: str contribution_score: float # weight / total_weight, sums to 1.0 metadata: dict = field(default_factory=dict) # --------------------------------------------------------------------------- # Canonical evidence key computation (Requirements 2.3, 17.4) # --------------------------------------------------------------------------- def compute_canonical_evidence_key(title: str, url: str) -> str: """SHA256 of normalized(title) + normalized(url). Normalization: - Title: lowercase, strip leading/trailing whitespace - URL: lowercase, strip query parameters (keep scheme, netloc, path) """ normalized_title = title.strip().lower() parsed = urllib.parse.urlparse(url.lower()) normalized_url = urllib.parse.urlunparse( (parsed.scheme, parsed.netloc, parsed.path, "", "", "") ) combined = normalized_title + normalized_url return hashlib.sha256(combined.encode("utf-8")).hexdigest() # --------------------------------------------------------------------------- # Contribution score computation (Requirements 2.5, 17.7) # --------------------------------------------------------------------------- def compute_contribution_scores(weights: list[float]) -> list[float]: """Compute contribution scores: each score = weight_i / sum(weights). All scores are in [0.0, 1.0] and sum to 1.0 (within floating-point tolerance). Returns an empty list for empty input. """ if not weights: return [] total = sum(weights) if total == 0.0: # All weights are zero — distribute equally n = len(weights) return [1.0 / n] * n return [w / total for w in weights] # --------------------------------------------------------------------------- # Price fetching (Requirements 1.2, 1.3, 1.4, 1.5) # --------------------------------------------------------------------------- _LATEST_CLOSE_SQL = """ SELECT (data->>'c')::float AS close FROM market_snapshots WHERE ticker = $1 AND snapshot_type = 'bar' AND data->>'c' IS NOT NULL ORDER BY captured_at DESC LIMIT 1 """ async def fetch_latest_close_price( pool: asyncpg.Pool, ticker: str, ) -> float | None: """Fetch most recent close price from market_snapshots for a ticker. Returns None if no market data is available for the ticker. """ row = await pool.fetchrow(_LATEST_CLOSE_SQL, ticker) if row is None: return None return row["close"] # --------------------------------------------------------------------------- # Sector ETF lookup # --------------------------------------------------------------------------- _COMPANY_SECTOR_SQL = """ SELECT sector FROM companies WHERE ticker = $1 AND active = TRUE LIMIT 1 """ async def _fetch_sector_etf_ticker(pool: asyncpg.Pool, ticker: str) -> str | None: """Look up the sector ETF ticker for a company ticker.""" row = await pool.fetchrow(_COMPANY_SECTOR_SQL, ticker) if row is None or row["sector"] is None: return None return SECTOR_ETF_MAP.get(row["sector"]) # --------------------------------------------------------------------------- # Layer score computation # --------------------------------------------------------------------------- def _compute_layer_scores( evidence_signals: list[dict], ) -> tuple[float, float, float]: """Compute company, macro, and competitive layer scores from evidence signals. Each signal's source_type determines its layer: - company: news_api, filings_api, web_scrape - macro: macro events (source_type containing 'macro') - competitive: competitive signals (source_type containing 'competitive' or 'pattern') Returns (score_company, score_macro, score_competitive) as fractions summing to 1.0. """ company_weight = 0.0 macro_weight = 0.0 competitive_weight = 0.0 for sig in evidence_signals: w = sig.get("weight", 0.0) source_type = sig.get("source_type", "").lower() catalyst_type = sig.get("catalyst_type", "").lower() if "macro" in source_type or catalyst_type == "macro": macro_weight += w elif "competitive" in source_type or "pattern" in source_type: competitive_weight += w else: company_weight += w total = company_weight + macro_weight + competitive_weight if total == 0.0: return (0.0, 0.0, 0.0) return ( round(company_weight / total, 6), round(macro_weight / total, 6), round(competitive_weight / total, 6), ) # --------------------------------------------------------------------------- # SQL statements # --------------------------------------------------------------------------- _INSERT_SNAPSHOT_SQL = """ INSERT INTO prediction_snapshots ( id, generated_at, ticker, "window", horizon, direction, action, mode, strength, confidence, contradiction, p_bull, p_bear, score_company, score_macro, score_competitive, evidence_count, unique_source_count, duplicate_evidence_count, price_at_prediction, spy_price_at_prediction, sector_etf_price_at_prediction, metadata ) VALUES ( $1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23::jsonb ) """ _INSERT_EVIDENCE_LINK_SQL = """ INSERT INTO signal_evidence_links ( id, prediction_id, document_id, signal_id, ticker, source, source_type, catalyst_type, sentiment, impact, extraction_confidence, weight, is_duplicate, canonical_evidence_key, contribution_score, metadata ) VALUES ( $1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16::jsonb ) """ # --------------------------------------------------------------------------- # Main entry point (Requirements 1.1–1.7, 2.1–2.6, 3.1–3.4) # --------------------------------------------------------------------------- async def create_prediction_snapshot( pool: asyncpg.Pool, recommendation: Recommendation, trend_summary: TrendSummary, evidence_signals: list[dict], evidence_docs: list[dict], ) -> PredictionSnapshot: """Create and persist a prediction snapshot with evidence links. Steps: 1. Fetch current prices (ticker, SPY, sector ETF) from market_snapshots 2. Compute canonical evidence keys and detect duplicates 3. Clamp individual document weights to MAX_SINGLE_DOCUMENT_WEIGHT 4. Compute contribution scores (one-vote-per-canonical-key dedup) 5. Persist snapshot and evidence links in a transaction Args: pool: asyncpg connection pool. recommendation: The generated Recommendation object. trend_summary: The TrendSummary used to generate the recommendation. evidence_signals: List of dicts with signal fields (source, source_type, catalyst_type, sentiment, impact, extraction_confidence, weight, document_id, signal_id, ticker). evidence_docs: List of dicts with document metadata (title, url, document_id). Returns: The persisted PredictionSnapshot. """ ticker = recommendation.ticker # 1. Fetch prices — handle NULL gracefully (Requirement 1.5) ticker_price = await fetch_latest_close_price(pool, ticker) if ticker_price is None: logger.warning("No market price available for %s at snapshot time", ticker) spy_price = await fetch_latest_close_price(pool, "SPY") if spy_price is None: logger.warning("No SPY price available at snapshot time") sector_etf_ticker = await _fetch_sector_etf_ticker(pool, ticker) sector_etf_price: float | None = None if sector_etf_ticker is not None: sector_etf_price = await fetch_latest_close_price(pool, sector_etf_ticker) if sector_etf_price is None: logger.warning( "No sector ETF price available for %s (%s) at snapshot time", sector_etf_ticker, ticker, ) else: logger.warning("No sector ETF mapping found for ticker %s", ticker) # 2. Build a doc lookup for canonical key computation doc_lookup: dict[str, dict] = {} for doc in evidence_docs: doc_id = doc.get("document_id", "") doc_lookup[doc_id] = doc # 3. Process evidence signals: compute canonical keys, detect duplicates, # clamp weights processed_links: list[dict] = [] seen_canonical_keys: dict[str, int] = {} # canonical_key -> first index for sig in evidence_signals: doc_id = sig.get("document_id", "") doc_meta = doc_lookup.get(doc_id, {}) title = doc_meta.get("title", "") url = doc_meta.get("url", "") canonical_key = compute_canonical_evidence_key(title, url) # Detect duplicates: same canonical key for same ticker is_duplicate = canonical_key in seen_canonical_keys if not is_duplicate: seen_canonical_keys[canonical_key] = len(processed_links) # Clamp weight to MAX_SINGLE_DOCUMENT_WEIGHT (Requirement 3.3) raw_weight = sig.get("weight", 0.0) clamped_weight = min(raw_weight, MAX_SINGLE_DOCUMENT_WEIGHT) processed_links.append({ "id": str(uuid.uuid4()), "document_id": doc_id, "signal_id": sig.get("signal_id", ""), "ticker": sig.get("ticker", ticker), "source": sig.get("source", ""), "source_type": sig.get("source_type", ""), "catalyst_type": sig.get("catalyst_type", ""), "sentiment": sig.get("sentiment", ""), "impact": sig.get("impact", 0.0), "extraction_confidence": sig.get("extraction_confidence", 0.0), "weight": clamped_weight, "is_duplicate": is_duplicate, "canonical_evidence_key": canonical_key, }) # 4. Compute contribution scores — one vote per canonical key (Requirement 3.4) # Only non-duplicate links contribute to the weight pool non_dup_weights = [ link["weight"] for link in processed_links if not link["is_duplicate"] ] non_dup_scores = compute_contribution_scores(non_dup_weights) # Assign contribution scores: non-duplicates get their computed score, # duplicates get 0.0 score_idx = 0 for link in processed_links: if not link["is_duplicate"]: link["contribution_score"] = non_dup_scores[score_idx] score_idx += 1 else: link["contribution_score"] = 0.0 # 5. Compute deduplication quality metrics (Requirements 3.1, 3.2) unique_sources = { link["source"] for link in processed_links if not link["is_duplicate"] } unique_source_count = len(unique_sources) duplicate_evidence_count = sum( 1 for link in processed_links if link["is_duplicate"] ) # 6. Compute layer scores from evidence signals score_company, score_macro, score_competitive = _compute_layer_scores( evidence_signals ) # 7. Build metadata from trend summary context (Requirement 1.7) metadata: dict = {} if trend_summary.market_context is not None: metadata["market_context"] = { "ticker": trend_summary.market_context.ticker, "price_change_pct": trend_summary.market_context.price_change_pct, "avg_volume": trend_summary.market_context.avg_volume, "volume_change_pct": trend_summary.market_context.volume_change_pct, "volatility": trend_summary.market_context.volatility, "latest_close": trend_summary.market_context.latest_close, "bars_available": trend_summary.market_context.bars_available, } if sector_etf_ticker is not None: metadata["sector_etf_ticker"] = sector_etf_ticker # 8. Build the snapshot snapshot_id = str(uuid.uuid4()) snapshot = PredictionSnapshot( id=snapshot_id, generated_at=recommendation.generated_at, ticker=ticker, window=trend_summary.window.value, horizon=recommendation.time_horizon, direction=trend_summary.trend_direction.value, action=recommendation.action.value, mode=recommendation.mode.value, strength=trend_summary.trend_strength, confidence=recommendation.confidence, contradiction=trend_summary.contradiction_score, p_bull=trend_summary.p_bull, p_bear=1.0 - trend_summary.p_bull if trend_summary.p_bull is not None else None, score_company=score_company, score_macro=score_macro, score_competitive=score_competitive, evidence_count=len(processed_links), unique_source_count=unique_source_count, duplicate_evidence_count=duplicate_evidence_count, price_at_prediction=ticker_price, spy_price_at_prediction=spy_price, sector_etf_price_at_prediction=sector_etf_price, metadata=metadata, ) # 9. Build evidence link objects evidence_link_objects: list[SignalEvidenceLink] = [] for link in processed_links: evidence_link_objects.append( SignalEvidenceLink( id=link["id"], prediction_id=snapshot_id, document_id=link["document_id"], signal_id=link["signal_id"], ticker=link["ticker"], source=link["source"], source_type=link["source_type"], catalyst_type=link["catalyst_type"], sentiment=link["sentiment"], impact=link["impact"], extraction_confidence=link["extraction_confidence"], weight=link["weight"], is_duplicate=link["is_duplicate"], canonical_evidence_key=link["canonical_evidence_key"], contribution_score=link["contribution_score"], ) ) # 10. Persist in a transaction (Requirements 1.6, 2.6) async with pool.acquire() as conn: async with conn.transaction(): await conn.execute( _INSERT_SNAPSHOT_SQL, snapshot.id, snapshot.generated_at, snapshot.ticker, snapshot.window, snapshot.horizon, snapshot.direction, snapshot.action, snapshot.mode, snapshot.strength, snapshot.confidence, snapshot.contradiction, snapshot.p_bull, snapshot.p_bear, snapshot.score_company, snapshot.score_macro, snapshot.score_competitive, snapshot.evidence_count, snapshot.unique_source_count, snapshot.duplicate_evidence_count, snapshot.price_at_prediction, snapshot.spy_price_at_prediction, snapshot.sector_etf_price_at_prediction, json.dumps(snapshot.metadata), ) for link in evidence_link_objects: await conn.execute( _INSERT_EVIDENCE_LINK_SQL, link.id, link.prediction_id, link.document_id, link.signal_id, link.ticker, link.source, link.source_type, link.catalyst_type, link.sentiment, link.impact, link.extraction_confidence, link.weight, link.is_duplicate, link.canonical_evidence_key, link.contribution_score, json.dumps(link.metadata), ) logger.info( "Created prediction snapshot %s for %s: %d evidence links " "(%d unique sources, %d duplicates), prices: ticker=%s spy=%s sector_etf=%s", snapshot_id, ticker, len(evidence_link_objects), unique_source_count, duplicate_evidence_count, ticker_price, spy_price, sector_etf_price, ) return snapshot