Files
Celes Renata 76f6bd5677
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
fix: quote reserved keyword 'window' in prediction_snapshots SQL
2026-05-01 03:40:48 +00:00

541 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Prediction Snapshot Writer — captures immutable prediction state at generation time.
Creates frozen records of every recommendation with prices, evidence links,
duplicate detection, and contribution scores so that predictions can be
evaluated against future outcomes without hindsight bias.
Requirements: 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 3.1, 3.2, 3.3, 3.4
"""
from __future__ import annotations
import hashlib
import json
import logging
import urllib.parse
import uuid
from dataclasses import dataclass, field
from datetime import datetime
import asyncpg
from services.shared.schemas import Recommendation, TrendSummary
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Maps a company's sector name (the value read from ``companies.sector``) to
# the ticker of the sector ETF whose price is captured with each prediction.
# The keys follow the 11-sector naming scheme (Consumer Cyclical, Financial
# Services, ...); "Basic Materials" was previously missing, so companies in
# that sector never received a sector ETF price.
SECTOR_ETF_MAP: dict[str, str] = {
    "Technology": "XLK",
    "Consumer Cyclical": "XLY",
    "Financial Services": "XLF",
    "Healthcare": "XLV",
    "Energy": "XLE",
    "Communication Services": "XLC",
    "Industrials": "XLI",
    "Consumer Defensive": "XLP",
    "Real Estate": "XLRE",
    "Utilities": "XLU",
    "Basic Materials": "XLB",
}

# Horizon labels for prediction evaluation.
# NOTE(review): not referenced by the code visible in this module; presumably
# consumed by the evaluation side — confirm against callers.
EVALUATION_HORIZONS: list[str] = ["1h", "6h", "1d", "7d", "30d"]

# Upper bound applied to the weight of any single evidence signal so that one
# document cannot dominate a prediction's contribution scores.
MAX_SINGLE_DOCUMENT_WEIGHT: float = 1.0
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
@dataclass
class PredictionSnapshot:
    """Record of a single prediction as it stood when it was generated.

    Described as immutable, although the dataclass is not declared frozen:
    instances should be treated as write-once by convention.
    """

    # --- identity and timing ---
    id: str  # UUID rendered as a string
    generated_at: datetime
    ticker: str
    window: str
    horizon: str

    # --- categorical outcome of the recommendation ---
    direction: str  # bullish/bearish/mixed/neutral
    action: str  # buy/sell/hold/watch
    mode: str  # informational/paper_eligible/live_eligible

    # --- numeric signal quality ---
    strength: float
    confidence: float
    contradiction: float
    p_bull: float | None
    p_bear: float | None

    # --- share of evidence weight per layer ---
    score_company: float
    score_macro: float
    score_competitive: float

    # --- evidence bookkeeping ---
    evidence_count: int
    unique_source_count: int
    duplicate_evidence_count: int

    # --- reference prices; None when no price was available ---
    price_at_prediction: float | None
    spy_price_at_prediction: float | None
    sector_etf_price_at_prediction: float | None

    # Free-form extra context; each instance gets its own dict.
    metadata: dict = field(default_factory=dict)
@dataclass
class SignalEvidenceLink:
    """One piece of evidence (document + signal) attached to a prediction."""

    # --- identifiers ---
    id: str  # UUID rendered as a string
    prediction_id: str
    document_id: str
    signal_id: str
    ticker: str

    # --- provenance and classification of the signal ---
    source: str
    source_type: str
    catalyst_type: str
    sentiment: str

    # --- numeric attributes ---
    impact: float
    extraction_confidence: float
    weight: float  # clamped to MAX_SINGLE_DOCUMENT_WEIGHT

    # --- deduplication ---
    is_duplicate: bool
    canonical_evidence_key: str

    # weight / total_weight; the scores of one prediction sum to 1.0
    contribution_score: float

    # Free-form extra context; each instance gets its own dict.
    metadata: dict = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Canonical evidence key computation (Requirements 2.3, 17.4)
# ---------------------------------------------------------------------------
def compute_canonical_evidence_key(title: str, url: str) -> str:
    """Return the SHA-256 hex digest of normalized(title) + normalized(url).

    Normalization:
      - Title: leading/trailing whitespace stripped, then lowercased.
      - URL: lowercased as a whole, then reduced to scheme, netloc and path;
        path parameters, the query string and the fragment are dropped.

    ``None`` is accepted for either argument and treated as an empty string,
    so callers that read optional fields (e.g. ``doc.get("title")`` where the
    key is present but null) do not crash. Hashes for string inputs are
    unchanged by this tolerance.

    Args:
        title: Document title (may be empty or None).
        url: Document URL (may be empty or None).

    Returns:
        A 64-character lowercase hexadecimal digest.
    """
    normalized_title = (title or "").strip().lower()

    # Lowercasing before parsing also lowercases the path, which is part of
    # the established key format and must stay that way for key stability.
    parsed = urllib.parse.urlparse((url or "").lower())
    normalized_url = urllib.parse.urlunparse(
        (parsed.scheme, parsed.netloc, parsed.path, "", "", "")
    )

    combined = normalized_title + normalized_url
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()
# ---------------------------------------------------------------------------
# Contribution score computation (Requirements 2.5, 17.7)
# ---------------------------------------------------------------------------
def compute_contribution_scores(weights: list[float]) -> list[float]:
    """Normalize *weights* so that each entry becomes its share of the total.

    Every score is ``weight_i / sum(weights)``. When the weights sum to zero
    the total is split evenly instead, so the result still sums to 1.0
    (within floating-point tolerance).

    Args:
        weights: Raw weights, one per evidence item.

    Returns:
        One score per input weight, in the same order; an empty list when
        *weights* is empty.
    """
    if not weights:
        return []

    denominator = sum(weights)
    if denominator != 0.0:
        return [weight / denominator for weight in weights]

    # Nothing to apportion by weight: give every item an equal share.
    count = len(weights)
    return [1.0 / count] * count
# ---------------------------------------------------------------------------
# Price fetching (Requirements 1.2, 1.3, 1.4, 1.5)
# ---------------------------------------------------------------------------
# Newest 'bar' snapshot for a ticker that carries a close value ('c' key of
# the JSON ``data`` column), returned as a float column named ``close``.
_LATEST_CLOSE_SQL = """
SELECT (data->>'c')::float AS close
FROM market_snapshots
WHERE ticker = $1 AND snapshot_type = 'bar' AND data->>'c' IS NOT NULL
ORDER BY captured_at DESC
LIMIT 1
"""


async def fetch_latest_close_price(
    pool: asyncpg.Pool,
    ticker: str,
) -> float | None:
    """Look up the most recent close price recorded for *ticker*.

    Args:
        pool: Connection pool used to run the query.
        ticker: Symbol to look up in ``market_snapshots``.

    Returns:
        The ``close`` value of the newest matching row, or ``None`` when the
        query yields no row for the ticker.
    """
    latest_bar = await pool.fetchrow(_LATEST_CLOSE_SQL, ticker)
    return None if latest_bar is None else latest_bar["close"]
# ---------------------------------------------------------------------------
# Sector ETF lookup
# ---------------------------------------------------------------------------
# Sector of the active company row for a ticker (at most one row).
_COMPANY_SECTOR_SQL = """
SELECT sector FROM companies WHERE ticker = $1 AND active = TRUE LIMIT 1
"""


async def _fetch_sector_etf_ticker(pool: asyncpg.Pool, ticker: str) -> str | None:
    """Resolve the sector ETF symbol that corresponds to a company ticker.

    Args:
        pool: Connection pool used to run the query.
        ticker: Company symbol to look up in ``companies``.

    Returns:
        The ETF ticker from ``SECTOR_ETF_MAP`` for the company's sector, or
        ``None`` when there is no active company row, the sector is NULL, or
        the sector has no entry in the map.
    """
    company = await pool.fetchrow(_COMPANY_SECTOR_SQL, ticker)
    sector = company["sector"] if company is not None else None
    if sector is None:
        return None
    return SECTOR_ETF_MAP.get(sector)
# ---------------------------------------------------------------------------
# Layer score computation
# ---------------------------------------------------------------------------
def _compute_layer_scores(
    evidence_signals: list[dict],
) -> tuple[float, float, float]:
    """Split the total evidence weight across company, macro and competitive layers.

    Each signal is assigned to exactly one layer, checked in this order:
      - macro: ``source_type`` contains ``"macro"`` or ``catalyst_type``
        equals ``"macro"`` (both compared case-insensitively)
      - competitive: ``source_type`` contains ``"competitive"`` or ``"pattern"``
      - company: everything else

    Missing or ``None`` values are tolerated: a missing/None ``weight`` counts
    as 0.0 and a missing/None ``source_type`` / ``catalyst_type`` is treated
    as an empty string (previously a present-but-null value raised).

    Args:
        evidence_signals: Signal dicts; only the ``weight``, ``source_type``
            and ``catalyst_type`` keys are read.

    Returns:
        ``(score_company, score_macro, score_competitive)``, each rounded to
        six decimals. The fractions sum to 1.0 up to that rounding, except
        when the total weight is zero, in which case ``(0.0, 0.0, 0.0)`` is
        returned.
    """
    company_weight = 0.0
    macro_weight = 0.0
    competitive_weight = 0.0

    for sig in evidence_signals:
        # ``or`` covers both an absent key and an explicit None value.
        weight = sig.get("weight") or 0.0
        source_type = (sig.get("source_type") or "").lower()
        catalyst_type = (sig.get("catalyst_type") or "").lower()

        if "macro" in source_type or catalyst_type == "macro":
            macro_weight += weight
        elif "competitive" in source_type or "pattern" in source_type:
            competitive_weight += weight
        else:
            company_weight += weight

    total = company_weight + macro_weight + competitive_weight
    if total == 0.0:
        return (0.0, 0.0, 0.0)

    return (
        round(company_weight / total, 6),
        round(macro_weight / total, 6),
        round(competitive_weight / total, 6),
    )
# ---------------------------------------------------------------------------
# SQL statements
# ---------------------------------------------------------------------------
# Parameterized INSERT for one row of ``prediction_snapshots``.
# The 23 placeholders ($1..$23) are bound positionally in the same order as
# the column list; $1 is cast to uuid and $23 (metadata) to jsonb, so callers
# pass those two as strings. "window" is double-quoted because it is a
# reserved SQL keyword.
_INSERT_SNAPSHOT_SQL = """
INSERT INTO prediction_snapshots (
id, generated_at, ticker, "window", horizon, direction, action, mode,
strength, confidence, contradiction, p_bull, p_bear,
score_company, score_macro, score_competitive,
evidence_count, unique_source_count, duplicate_evidence_count,
price_at_prediction, spy_price_at_prediction, sector_etf_price_at_prediction,
metadata
) VALUES (
$1::uuid, $2, $3, $4, $5, $6, $7, $8,
$9, $10, $11, $12, $13,
$14, $15, $16,
$17, $18, $19,
$20, $21, $22,
$23::jsonb
)
"""
# Parameterized INSERT for one row of ``signal_evidence_links``.
# The 16 placeholders ($1..$16) follow the column list order; $1 (id) and
# $2 (prediction_id) are cast to uuid and $16 (metadata) to jsonb.
_INSERT_EVIDENCE_LINK_SQL = """
INSERT INTO signal_evidence_links (
id, prediction_id, document_id, signal_id, ticker,
source, source_type, catalyst_type, sentiment,
impact, extraction_confidence, weight,
is_duplicate, canonical_evidence_key, contribution_score,
metadata
) VALUES (
$1::uuid, $2::uuid, $3, $4, $5,
$6, $7, $8, $9,
$10, $11, $12,
$13, $14, $15,
$16::jsonb
)
"""
# ---------------------------------------------------------------------------
# Main entry point (Requirements 1.1-1.7, 2.1-2.6, 3.1-3.4)
# ---------------------------------------------------------------------------
async def _snapshot_fetch_prices(
    pool: asyncpg.Pool,
    ticker: str,
) -> tuple[float | None, float | None, str | None, float | None]:
    """Fetch ticker, SPY and sector-ETF prices, logging a warning for each gap.

    Returns ``(ticker_price, spy_price, sector_etf_ticker, sector_etf_price)``;
    any element may be ``None`` when the corresponding data is unavailable.
    """
    ticker_price = await fetch_latest_close_price(pool, ticker)
    if ticker_price is None:
        logger.warning("No market price available for %s at snapshot time", ticker)

    spy_price = await fetch_latest_close_price(pool, "SPY")
    if spy_price is None:
        logger.warning("No SPY price available at snapshot time")

    sector_etf_ticker = await _fetch_sector_etf_ticker(pool, ticker)
    sector_etf_price: float | None = None
    if sector_etf_ticker is None:
        logger.warning("No sector ETF mapping found for ticker %s", ticker)
    else:
        sector_etf_price = await fetch_latest_close_price(pool, sector_etf_ticker)
        if sector_etf_price is None:
            logger.warning(
                "No sector ETF price available for %s (%s) at snapshot time",
                sector_etf_ticker,
                ticker,
            )

    return ticker_price, spy_price, sector_etf_ticker, sector_etf_price


def _snapshot_evidence_key(sig: dict, doc_meta: dict) -> str:
    """Return the canonical key used to detect duplicate evidence for a signal."""
    title = doc_meta.get("title") or ""
    url = doc_meta.get("url") or ""
    if title.strip() or url.strip():
        return compute_canonical_evidence_key(title, url)

    # Without a title or URL every signal would hash to the same key (the
    # digest of the empty string) and unrelated evidence would be flagged as
    # duplicates of each other. Key on the document (or signal) id instead so
    # only repeats of the same document collapse.
    identifier = sig.get("document_id") or sig.get("signal_id")
    if identifier:
        return hashlib.sha256(f"id:{identifier}".encode("utf-8")).hexdigest()

    # Nothing at all identifies this evidence; keep the historical key.
    return compute_canonical_evidence_key("", "")


def _snapshot_build_links(
    ticker: str,
    evidence_signals: list[dict],
    evidence_docs: list[dict],
) -> list[dict]:
    """Turn raw signals into link dicts with dedup flags, clamped weights and scores.

    The returned dicts use exactly the ``SignalEvidenceLink`` field names
    (everything except ``prediction_id`` and ``metadata``).
    """
    doc_lookup: dict = {doc.get("document_id", ""): doc for doc in evidence_docs}

    links: list[dict] = []
    seen_keys: set[str] = set()
    for sig in evidence_signals:
        doc_id = sig.get("document_id", "")
        canonical_key = _snapshot_evidence_key(sig, doc_lookup.get(doc_id) or {})

        # The first occurrence of a key is the canonical one; later ones are
        # duplicates.
        is_duplicate = canonical_key in seen_keys
        seen_keys.add(canonical_key)

        # Clamp weight to MAX_SINGLE_DOCUMENT_WEIGHT (Requirement 3.3); a
        # missing or null weight counts as 0.0.
        raw_weight = sig.get("weight")
        if raw_weight is None:
            raw_weight = 0.0

        links.append({
            "id": str(uuid.uuid4()),
            "document_id": doc_id,
            "signal_id": sig.get("signal_id", ""),
            "ticker": sig.get("ticker", ticker),
            "source": sig.get("source", ""),
            "source_type": sig.get("source_type", ""),
            "catalyst_type": sig.get("catalyst_type", ""),
            "sentiment": sig.get("sentiment", ""),
            "impact": sig.get("impact", 0.0),
            "extraction_confidence": sig.get("extraction_confidence", 0.0),
            "weight": min(raw_weight, MAX_SINGLE_DOCUMENT_WEIGHT),
            "is_duplicate": is_duplicate,
            "canonical_evidence_key": canonical_key,
            "contribution_score": 0.0,
        })

    # One vote per canonical key (Requirement 3.4): only non-duplicates share
    # the weight pool; duplicates keep a contribution of 0.0.
    voters = [link for link in links if not link["is_duplicate"]]
    scores = compute_contribution_scores([link["weight"] for link in voters])
    for link, score in zip(voters, scores):
        link["contribution_score"] = score

    return links


def _snapshot_build_metadata(
    trend_summary: TrendSummary,
    sector_etf_ticker: str | None,
) -> dict:
    """Collect the JSON-serializable context stored with the snapshot (Requirement 1.7)."""
    metadata: dict = {}
    context = trend_summary.market_context
    if context is not None:
        metadata["market_context"] = {
            "ticker": context.ticker,
            "price_change_pct": context.price_change_pct,
            "avg_volume": context.avg_volume,
            "volume_change_pct": context.volume_change_pct,
            "volatility": context.volatility,
            "latest_close": context.latest_close,
            "bars_available": context.bars_available,
        }
    if sector_etf_ticker is not None:
        metadata["sector_etf_ticker"] = sector_etf_ticker
    return metadata


async def _snapshot_persist(
    pool: asyncpg.Pool,
    snapshot: PredictionSnapshot,
    links: list[SignalEvidenceLink],
) -> None:
    """Insert the snapshot row and all of its evidence links in one transaction."""
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                _INSERT_SNAPSHOT_SQL,
                snapshot.id,
                snapshot.generated_at,
                snapshot.ticker,
                snapshot.window,
                snapshot.horizon,
                snapshot.direction,
                snapshot.action,
                snapshot.mode,
                snapshot.strength,
                snapshot.confidence,
                snapshot.contradiction,
                snapshot.p_bull,
                snapshot.p_bear,
                snapshot.score_company,
                snapshot.score_macro,
                snapshot.score_competitive,
                snapshot.evidence_count,
                snapshot.unique_source_count,
                snapshot.duplicate_evidence_count,
                snapshot.price_at_prediction,
                snapshot.spy_price_at_prediction,
                snapshot.sector_etf_price_at_prediction,
                json.dumps(snapshot.metadata),
            )
            for link in links:
                await conn.execute(
                    _INSERT_EVIDENCE_LINK_SQL,
                    link.id,
                    link.prediction_id,
                    link.document_id,
                    link.signal_id,
                    link.ticker,
                    link.source,
                    link.source_type,
                    link.catalyst_type,
                    link.sentiment,
                    link.impact,
                    link.extraction_confidence,
                    link.weight,
                    link.is_duplicate,
                    link.canonical_evidence_key,
                    link.contribution_score,
                    json.dumps(link.metadata),
                )


async def create_prediction_snapshot(
    pool: asyncpg.Pool,
    recommendation: Recommendation,
    trend_summary: TrendSummary,
    evidence_signals: list[dict],
    evidence_docs: list[dict],
) -> PredictionSnapshot:
    """Create and persist a prediction snapshot together with its evidence links.

    Steps:
      1. Fetch current prices (ticker, SPY, sector ETF) from market_snapshots;
         missing prices are stored as NULL and logged (Requirement 1.5).
      2. Compute canonical evidence keys and flag duplicates. Evidence whose
         document has neither title nor URL is keyed by its document/signal
         id, so unrelated items with missing metadata are no longer treated
         as duplicates of one another.
      3. Clamp each signal weight to MAX_SINGLE_DOCUMENT_WEIGHT.
      4. Compute contribution scores with one vote per canonical key;
         duplicates receive 0.0.
      5. Persist the snapshot and all evidence links in a single transaction.

    Args:
        pool: asyncpg connection pool.
        recommendation: The generated Recommendation object.
        trend_summary: The TrendSummary used to generate the recommendation.
        evidence_signals: Dicts with signal fields (source, source_type,
            catalyst_type, sentiment, impact, extraction_confidence, weight,
            document_id, signal_id, ticker). Missing keys fall back to
            defaults; a null weight counts as 0.0.
        evidence_docs: Dicts with document metadata (title, url, document_id).

    Returns:
        The persisted PredictionSnapshot.

    Raises:
        Errors raised by the pool/connection while fetching or inserting
        propagate to the caller; because the inserts share one transaction,
        a failure there leaves neither the snapshot nor its links stored.
    """
    ticker = recommendation.ticker

    # 1. Reference prices (each may be None).
    ticker_price, spy_price, sector_etf_ticker, sector_etf_price = (
        await _snapshot_fetch_prices(pool, ticker)
    )

    # 2-4. Dedup, clamp and score the evidence.
    processed_links = _snapshot_build_links(ticker, evidence_signals, evidence_docs)

    # Deduplication quality metrics (Requirements 3.1, 3.2).
    unique_source_count = len(
        {link["source"] for link in processed_links if not link["is_duplicate"]}
    )
    duplicate_evidence_count = sum(
        1 for link in processed_links if link["is_duplicate"]
    )

    # NOTE(review): layer scores are computed from the raw signals, i.e. with
    # unclamped weights and duplicates included, unlike the contribution
    # scores above — confirm this asymmetry is intended.
    score_company, score_macro, score_competitive = _compute_layer_scores(
        evidence_signals
    )

    metadata = _snapshot_build_metadata(trend_summary, sector_etf_ticker)

    p_bull = trend_summary.p_bull
    snapshot_id = str(uuid.uuid4())
    snapshot = PredictionSnapshot(
        id=snapshot_id,
        generated_at=recommendation.generated_at,
        ticker=ticker,
        window=trend_summary.window.value,
        horizon=recommendation.time_horizon,
        direction=trend_summary.trend_direction.value,
        action=recommendation.action.value,
        mode=recommendation.mode.value,
        strength=trend_summary.trend_strength,
        confidence=recommendation.confidence,
        contradiction=trend_summary.contradiction_score,
        p_bull=p_bull,
        # p_bear is the complement of p_bull when a probability is available.
        p_bear=1.0 - p_bull if p_bull is not None else None,
        score_company=score_company,
        score_macro=score_macro,
        score_competitive=score_competitive,
        evidence_count=len(processed_links),
        unique_source_count=unique_source_count,
        duplicate_evidence_count=duplicate_evidence_count,
        price_at_prediction=ticker_price,
        spy_price_at_prediction=spy_price,
        sector_etf_price_at_prediction=sector_etf_price,
        metadata=metadata,
    )

    # The link dicts carry exactly the SignalEvidenceLink field names.
    evidence_link_objects = [
        SignalEvidenceLink(prediction_id=snapshot_id, **link)
        for link in processed_links
    ]

    # 5. Persist in a transaction (Requirements 1.6, 2.6).
    await _snapshot_persist(pool, snapshot, evidence_link_objects)

    logger.info(
        "Created prediction snapshot %s for %s: %d evidence links "
        "(%d unique sources, %d duplicates), prices: ticker=%s spy=%s sector_etf=%s",
        snapshot_id,
        ticker,
        len(evidence_link_objects),
        unique_source_count,
        duplicate_evidence_count,
        ticker_price,
        spy_price,
        sector_etf_price,
    )
    return snapshot