76f6bd5677
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
541 lines
18 KiB
Python
541 lines
18 KiB
Python
"""Prediction Snapshot Writer — captures immutable prediction state at generation time.
|
||
|
||
Creates frozen records of every recommendation with prices, evidence links,
|
||
duplicate detection, and contribution scores so that predictions can be
|
||
evaluated against future outcomes without hindsight bias.
|
||
|
||
Requirements: 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 3.1, 3.2, 3.3, 3.4
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import hashlib
|
||
import json
|
||
import logging
|
||
import urllib.parse
|
||
import uuid
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime
|
||
|
||
import asyncpg
|
||
|
||
from services.shared.schemas import Recommendation, TrendSummary
|
||
|
||
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Company sector name (the value read from ``companies.sector``) -> sector ETF
# whose latest close is captured as the sector benchmark at prediction time.
# Keys use the Yahoo Finance / Morningstar sector naming ("Consumer Cyclical",
# "Financial Services", "Consumer Defensive", ...).
# NOTE(review): confirm this matches the values stored in ``companies.sector``.
SECTOR_ETF_MAP: dict[str, str] = {
    "Technology": "XLK",
    "Consumer Cyclical": "XLY",
    "Financial Services": "XLF",
    "Healthcare": "XLV",
    "Energy": "XLE",
    "Communication Services": "XLC",
    "Industrials": "XLI",
    "Consumer Defensive": "XLP",
    "Real Estate": "XLRE",
    "Utilities": "XLU",
    # Eleventh sector of this naming scheme; without it every materials
    # company silently received no sector benchmark price.
    "Basic Materials": "XLB",
}

# Horizons at which predictions are presumably evaluated against outcomes
# (not referenced by the code in this module itself).
EVALUATION_HORIZONS: list[str] = ["1h", "6h", "1d", "7d", "30d"]

# Upper bound applied to each evidence signal's weight before contribution
# scores are computed (Requirement 3.3).
MAX_SINGLE_DOCUMENT_WEIGHT: float = 1.0


# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------


@dataclass
class PredictionSnapshot:
    """Point-in-time record of a prediction, persisted to ``prediction_snapshots``.

    A snapshot is built once by ``create_prediction_snapshot`` and stored
    together with its evidence links. It is intended to be write-once, but
    the dataclass is *not* frozen, so immutability is a convention callers
    must respect rather than something this class enforces.
    """

    id: str  # UUID in string form; primary key of the snapshot row
    generated_at: datetime  # taken from Recommendation.generated_at
    ticker: str
    window: str  # TrendSummary window value
    horizon: str  # Recommendation time horizon
    direction: str  # bullish/bearish/mixed/neutral
    action: str  # buy/sell/hold/watch
    mode: str  # informational/paper_eligible/live_eligible
    strength: float  # TrendSummary trend strength
    confidence: float  # Recommendation confidence
    contradiction: float  # TrendSummary contradiction score
    p_bull: float | None
    p_bear: float | None  # 1 - p_bull when p_bull is known, else None
    # Fractions of evidence weight per layer (company / macro / competitive).
    score_company: float
    score_macro: float
    score_competitive: float
    evidence_count: int  # number of evidence links, duplicates included
    unique_source_count: int  # distinct sources among non-duplicate links
    duplicate_evidence_count: int
    # Baseline prices; None when no market data was available at snapshot time.
    price_at_prediction: float | None
    spy_price_at_prediction: float | None
    sector_etf_price_at_prediction: float | None
    metadata: dict = field(default_factory=dict)


@dataclass
class SignalEvidenceLink:
    """One evidence signal attached to a prediction snapshot.

    Persisted to ``signal_evidence_links``; ``prediction_id`` references the
    owning ``PredictionSnapshot.id``.
    """

    id: str  # UUID in string form
    prediction_id: str  # id of the owning PredictionSnapshot
    document_id: str
    signal_id: str
    ticker: str
    source: str
    source_type: str
    catalyst_type: str
    sentiment: str
    impact: float
    extraction_confidence: float
    weight: float  # clamped to at most MAX_SINGLE_DOCUMENT_WEIGHT
    is_duplicate: bool  # True when an earlier link had the same canonical key
    canonical_evidence_key: str  # see compute_canonical_evidence_key
    # Share of the prediction's evidence weight: this link's weight divided by
    # the total weight of the NON-duplicate links. Duplicate links always score
    # 0.0, so all links of one prediction sum to 1.0 (when at least one exists).
    contribution_score: float
    metadata: dict = field(default_factory=dict)


# ---------------------------------------------------------------------------
# Canonical evidence key computation (Requirements 2.3, 17.4)
# ---------------------------------------------------------------------------


def compute_canonical_evidence_key(title: str, url: str) -> str:
    """Return a hex SHA-256 digest that identifies a piece of evidence.

    The digest covers the normalized title immediately followed by the
    normalized URL, with no separator between them:

    * title -- surrounding whitespace removed, then lowercased;
    * url -- lowercased as a whole, then reduced to scheme, netloc and path
      (params, query string and fragment are discarded).

    Documents that differ only in letter case, query parameters or anchors
    therefore share a key.
    """
    title_part = title.strip().lower()

    pieces = urllib.parse.urlparse(url.lower())
    url_part = pieces._replace(params="", query="", fragment="").geturl()

    digest = hashlib.sha256()
    # Feeding the two UTF-8 encoded parts sequentially hashes exactly the
    # same bytes as hashing their concatenation.
    digest.update(title_part.encode("utf-8"))
    digest.update(url_part.encode("utf-8"))
    return digest.hexdigest()


# ---------------------------------------------------------------------------
# Contribution score computation (Requirements 2.5, 17.7)
# ---------------------------------------------------------------------------


def compute_contribution_scores(weights: list[float]) -> list[float]:
    """Normalize evidence weights into contribution scores.

    Each score is ``w_i / sum(w)`` after every weight that is not strictly
    positive (negative, zero or NaN) has been treated as ``0.0``.

    Args:
        weights: Per-evidence weights, in the order the scores are returned.

    Returns:
        One score per input weight, each in [0.0, 1.0], summing to 1.0 within
        floating-point tolerance. When every (floored) weight is zero, the
        scores are split equally. An empty input returns an empty list.
    """
    if not weights:
        return []

    # A negative weight would yield a negative score, and mixed signs can
    # cancel to a zero total. Either breaks the [0, 1] / sum-to-1 contract,
    # so floor at zero first. The `w > 0.0` test also maps NaN to 0.0.
    floored = [w if w > 0.0 else 0.0 for w in weights]

    total = sum(floored)
    if total == 0.0:
        # Nothing to proportion by: distribute equally.
        share = 1.0 / len(floored)
        return [share] * len(floored)

    return [w / total for w in floored]


# ---------------------------------------------------------------------------
# Price fetching (Requirements 1.2, 1.3, 1.4, 1.5)
# ---------------------------------------------------------------------------

_LATEST_CLOSE_SQL = """
    SELECT (data->>'c')::float AS close
    FROM market_snapshots
    WHERE ticker = $1 AND snapshot_type = 'bar' AND data->>'c' IS NOT NULL
    ORDER BY captured_at DESC
    LIMIT 1
"""


async def fetch_latest_close_price(
    pool: asyncpg.Pool,
    ticker: str,
) -> float | None:
    """Return the newest bar close recorded for *ticker*, or ``None``.

    Reads the ``c`` field of the most recently captured ``bar`` row in
    ``market_snapshots``. ``None`` means no such bar exists for the ticker.
    """
    latest = await pool.fetchrow(_LATEST_CLOSE_SQL, ticker)
    return None if latest is None else latest["close"]


# ---------------------------------------------------------------------------
# Sector ETF lookup
# ---------------------------------------------------------------------------

_COMPANY_SECTOR_SQL = """
    SELECT sector FROM companies WHERE ticker = $1 AND active = TRUE LIMIT 1
"""


async def _fetch_sector_etf_ticker(pool: asyncpg.Pool, ticker: str) -> str | None:
    """Map *ticker* to its sector ETF symbol via the ``companies`` table.

    Returns ``None`` when there is no active company row for the ticker, when
    the row's sector is NULL, or when the sector has no ``SECTOR_ETF_MAP``
    entry.
    """
    company = await pool.fetchrow(_COMPANY_SECTOR_SQL, ticker)
    sector = None if company is None else company["sector"]
    if sector is None:
        return None
    return SECTOR_ETF_MAP.get(sector)


# ---------------------------------------------------------------------------
# Layer score computation
# ---------------------------------------------------------------------------


def _compute_layer_scores(
    evidence_signals: list[dict],
) -> tuple[float, float, float]:
    """Split evidence weight into company / macro / competitive fractions.

    Each signal is assigned to exactly one layer, checked in this order:

    * macro -- ``source_type`` contains ``"macro"`` or ``catalyst_type`` equals
      ``"macro"`` (both compared case-insensitively);
    * competitive -- ``source_type`` contains ``"competitive"`` or ``"pattern"``;
    * company -- everything else (e.g. news_api, filings_api, web_scrape).

    Args:
        evidence_signals: Dicts with optional ``weight``, ``source_type`` and
            ``catalyst_type`` keys. Missing or ``None`` values are treated as
            ``0.0`` / ``""``. Weights that are not strictly positive (including
            NaN) are treated as ``0.0``.

    Returns:
        ``(score_company, score_macro, score_competitive)``, each rounded to 6
        decimals. They sum to 1.0 up to that rounding. When the total weight
        is zero (including empty input), ``(0.0, 0.0, 0.0)`` is returned.
    """
    company_weight = 0.0
    macro_weight = 0.0
    competitive_weight = 0.0

    for sig in evidence_signals:
        # `or` also covers keys that are present but hold None, which the
        # `.get(key, default)` form would pass straight through.
        w = sig.get("weight") or 0.0
        # A negative weight would drag a layer below zero and push the others
        # above one; treat it (and NaN) like a missing weight.
        w = w if w > 0.0 else 0.0
        source_type = (sig.get("source_type") or "").lower()
        catalyst_type = (sig.get("catalyst_type") or "").lower()

        if "macro" in source_type or catalyst_type == "macro":
            macro_weight += w
        elif "competitive" in source_type or "pattern" in source_type:
            competitive_weight += w
        else:
            company_weight += w

    total = company_weight + macro_weight + competitive_weight
    if total == 0.0:
        return (0.0, 0.0, 0.0)

    return (
        round(company_weight / total, 6),
        round(macro_weight / total, 6),
        round(competitive_weight / total, 6),
    )


# ---------------------------------------------------------------------------
# SQL statements
# ---------------------------------------------------------------------------

# Inserts one prediction_snapshots row. Placeholders $1..$23 are bound
# positionally in column-list order. "window" is double-quoted because WINDOW
# is a reserved word in PostgreSQL. $1 is cast to uuid; $23 is cast to jsonb
# and is bound to a JSON-encoded string (json.dumps of the metadata dict).
_INSERT_SNAPSHOT_SQL = """
    INSERT INTO prediction_snapshots (
        id, generated_at, ticker, "window", horizon, direction, action, mode,
        strength, confidence, contradiction, p_bull, p_bear,
        score_company, score_macro, score_competitive,
        evidence_count, unique_source_count, duplicate_evidence_count,
        price_at_prediction, spy_price_at_prediction, sector_etf_price_at_prediction,
        metadata
    ) VALUES (
        $1::uuid, $2, $3, $4, $5, $6, $7, $8,
        $9, $10, $11, $12, $13,
        $14, $15, $16,
        $17, $18, $19,
        $20, $21, $22,
        $23::jsonb
    )
"""

# Inserts one signal_evidence_links row. $1 (link id) and $2 (owning
# prediction id) are cast to uuid; $16 is cast to jsonb and is bound to a
# JSON-encoded metadata string.
_INSERT_EVIDENCE_LINK_SQL = """
    INSERT INTO signal_evidence_links (
        id, prediction_id, document_id, signal_id, ticker,
        source, source_type, catalyst_type, sentiment,
        impact, extraction_confidence, weight,
        is_duplicate, canonical_evidence_key, contribution_score,
        metadata
    ) VALUES (
        $1::uuid, $2::uuid, $3, $4, $5,
        $6, $7, $8, $9,
        $10, $11, $12,
        $13, $14, $15,
        $16::jsonb
    )
"""


# ---------------------------------------------------------------------------
# Main entry point (Requirements 1.1–1.7, 2.1–2.6, 3.1–3.4)
# ---------------------------------------------------------------------------


async def _fetch_snapshot_prices(
    pool: asyncpg.Pool,
    ticker: str,
) -> tuple[float | None, float | None, str | None, float | None]:
    """Fetch the ticker, SPY and sector-ETF baseline prices for a snapshot.

    Missing data is logged and reported as ``None`` rather than raised
    (Requirement 1.5).

    Returns:
        ``(ticker_price, spy_price, sector_etf_ticker, sector_etf_price)``.
    """
    ticker_price = await fetch_latest_close_price(pool, ticker)
    if ticker_price is None:
        logger.warning("No market price available for %s at snapshot time", ticker)

    spy_price = await fetch_latest_close_price(pool, "SPY")
    if spy_price is None:
        logger.warning("No SPY price available at snapshot time")

    sector_etf_ticker = await _fetch_sector_etf_ticker(pool, ticker)
    sector_etf_price: float | None = None
    if sector_etf_ticker is None:
        logger.warning("No sector ETF mapping found for ticker %s", ticker)
    else:
        sector_etf_price = await fetch_latest_close_price(pool, sector_etf_ticker)
        if sector_etf_price is None:
            logger.warning(
                "No sector ETF price available for %s (%s) at snapshot time",
                sector_etf_ticker,
                ticker,
            )

    return ticker_price, spy_price, sector_etf_ticker, sector_etf_price


def _evidence_key(doc_meta: dict, doc_id: str, signal_id: str) -> str:
    """Return the canonical evidence key for one signal's source document.

    Uses ``compute_canonical_evidence_key`` whenever the document has a title
    or URL. Without either, hashing two empty strings would give *every*
    metadata-less signal the same key and mark all but the first as
    duplicates. In that case the key is derived from the document id instead,
    then the signal id, then a random UUID, so distinct documents stay
    distinct.
    """
    title = doc_meta.get("title") or ""
    url = doc_meta.get("url") or ""
    if title.strip() or url.strip():
        return compute_canonical_evidence_key(title, url)

    fallback_id = doc_id or signal_id or str(uuid.uuid4())
    return hashlib.sha256(f"document:{fallback_id}".encode("utf-8")).hexdigest()


def _build_evidence_links(
    evidence_signals: list[dict],
    evidence_docs: list[dict],
    default_ticker: str,
) -> list[dict]:
    """Turn raw signals into link dicts with canonical keys, duplicate flags,
    clamped weights and contribution scores (Requirements 2.3-2.5, 3.3, 3.4).

    The returned dicts carry exactly the ``SignalEvidenceLink`` field names,
    except ``prediction_id`` and ``metadata``.
    """
    # Documents without an id cannot be matched to a signal. Keying them under
    # "" would hand an arbitrary document's title/URL to every id-less signal.
    doc_lookup = {
        doc["document_id"]: doc for doc in evidence_docs if doc.get("document_id")
    }

    links: list[dict] = []
    seen_keys: set[str] = set()
    for sig in evidence_signals:
        doc_id = sig.get("document_id") or ""
        signal_id = sig.get("signal_id") or ""
        canonical_key = _evidence_key(doc_lookup.get(doc_id, {}), doc_id, signal_id)

        # One vote per canonical key: the first signal with a given key counts
        # and later ones are flagged as duplicates. All signals here belong to
        # one prediction, so the ticker is not part of the comparison.
        is_duplicate = canonical_key in seen_keys
        seen_keys.add(canonical_key)

        # Clamp into [0, MAX_SINGLE_DOCUMENT_WEIGHT] (Requirement 3.3). The
        # lower bound (which also maps NaN to 0) keeps contribution and layer
        # scores inside [0, 1].
        raw_weight = float(sig.get("weight") or 0.0)
        weight = min(raw_weight if raw_weight > 0.0 else 0.0, MAX_SINGLE_DOCUMENT_WEIGHT)

        links.append(
            {
                "id": str(uuid.uuid4()),
                "document_id": doc_id,
                "signal_id": signal_id,
                "ticker": sig.get("ticker") or default_ticker,
                "source": sig.get("source") or "",
                "source_type": sig.get("source_type") or "",
                "catalyst_type": sig.get("catalyst_type") or "",
                "sentiment": sig.get("sentiment") or "",
                "impact": sig.get("impact") or 0.0,
                "extraction_confidence": sig.get("extraction_confidence") or 0.0,
                "weight": weight,
                "is_duplicate": is_duplicate,
                "canonical_evidence_key": canonical_key,
            }
        )

    # Only non-duplicate links share the weight pool; duplicates score 0.0.
    scores = iter(
        compute_contribution_scores(
            [link["weight"] for link in links if not link["is_duplicate"]]
        )
    )
    for link in links:
        link["contribution_score"] = 0.0 if link["is_duplicate"] else next(scores)

    return links


def _build_snapshot_metadata(
    trend_summary: TrendSummary,
    sector_etf_ticker: str | None,
) -> dict:
    """Collect market context and the sector ETF used for the snapshot (Requirement 1.7)."""
    metadata: dict = {}
    ctx = trend_summary.market_context
    if ctx is not None:
        metadata["market_context"] = {
            "ticker": ctx.ticker,
            "price_change_pct": ctx.price_change_pct,
            "avg_volume": ctx.avg_volume,
            "volume_change_pct": ctx.volume_change_pct,
            "volatility": ctx.volatility,
            "latest_close": ctx.latest_close,
            "bars_available": ctx.bars_available,
        }
    if sector_etf_ticker is not None:
        metadata["sector_etf_ticker"] = sector_etf_ticker
    return metadata


async def _persist_snapshot(
    pool: asyncpg.Pool,
    snapshot: PredictionSnapshot,
    links: list[SignalEvidenceLink],
) -> None:
    """Insert the snapshot and all of its evidence links in one transaction."""
    snapshot_args = (
        snapshot.id,
        snapshot.generated_at,
        snapshot.ticker,
        snapshot.window,
        snapshot.horizon,
        snapshot.direction,
        snapshot.action,
        snapshot.mode,
        snapshot.strength,
        snapshot.confidence,
        snapshot.contradiction,
        snapshot.p_bull,
        snapshot.p_bear,
        snapshot.score_company,
        snapshot.score_macro,
        snapshot.score_competitive,
        snapshot.evidence_count,
        snapshot.unique_source_count,
        snapshot.duplicate_evidence_count,
        snapshot.price_at_prediction,
        snapshot.spy_price_at_prediction,
        snapshot.sector_etf_price_at_prediction,
        json.dumps(snapshot.metadata),
    )
    link_rows = [
        (
            link.id,
            link.prediction_id,
            link.document_id,
            link.signal_id,
            link.ticker,
            link.source,
            link.source_type,
            link.catalyst_type,
            link.sentiment,
            link.impact,
            link.extraction_confidence,
            link.weight,
            link.is_duplicate,
            link.canonical_evidence_key,
            link.contribution_score,
            json.dumps(link.metadata),
        )
        for link in links
    ]

    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(_INSERT_SNAPSHOT_SQL, *snapshot_args)
            if link_rows:
                # Batched insert instead of one awaited round trip per link.
                await conn.executemany(_INSERT_EVIDENCE_LINK_SQL, link_rows)


async def create_prediction_snapshot(
    pool: asyncpg.Pool,
    recommendation: Recommendation,
    trend_summary: TrendSummary,
    evidence_signals: list[dict],
    evidence_docs: list[dict],
) -> PredictionSnapshot:
    """Create and persist a prediction snapshot with evidence links.

    Steps:
        1. Fetch current prices (ticker, SPY, sector ETF) from market_snapshots.
        2. Compute canonical evidence keys and flag duplicates (the first
           occurrence of a key wins).
        3. Clamp each signal's weight into [0, MAX_SINGLE_DOCUMENT_WEIGHT].
        4. Compute contribution scores over non-duplicate links only (one vote
           per canonical key); duplicates score 0.0.
        5. Derive quality metrics and layer scores from the same clamped,
           de-duplicated evidence.
        6. Persist the snapshot and evidence links in a single transaction.

    Args:
        pool: asyncpg connection pool.
        recommendation: The generated Recommendation object.
        trend_summary: The TrendSummary used to generate the recommendation.
        evidence_signals: List of dicts with signal fields (source, source_type,
            catalyst_type, sentiment, impact, extraction_confidence, weight,
            document_id, signal_id, ticker).
        evidence_docs: List of dicts with document metadata (title, url,
            document_id).

    Returns:
        The persisted PredictionSnapshot.

    Raises:
        Database errors from asyncpg propagate. The inserts run inside one
        transaction, so a failure stores neither the snapshot nor its links.
    """
    ticker = recommendation.ticker

    # 1. Baseline prices; any of them may be None (Requirement 1.5).
    (
        ticker_price,
        spy_price,
        sector_etf_ticker,
        sector_etf_price,
    ) = await _fetch_snapshot_prices(pool, ticker)

    # 2-4. Canonical keys, duplicate flags, clamped weights, contribution scores.
    links = _build_evidence_links(evidence_signals, evidence_docs, ticker)
    non_duplicates = [link for link in links if not link["is_duplicate"]]

    # 5a. Deduplication quality metrics (Requirements 3.1, 3.2).
    unique_source_count = len({link["source"] for link in non_duplicates})
    duplicate_evidence_count = len(links) - len(non_duplicates)

    # 5b. Layer scores come from the clamped, de-duplicated links, the same
    # evidence that drives the contribution scores. Raw signals would let an
    # unclamped or repeated document skew the layer mix.
    score_company, score_macro, score_competitive = _compute_layer_scores(
        non_duplicates
    )

    snapshot_id = str(uuid.uuid4())
    p_bull = trend_summary.p_bull
    snapshot = PredictionSnapshot(
        id=snapshot_id,
        generated_at=recommendation.generated_at,
        ticker=ticker,
        window=trend_summary.window.value,
        horizon=recommendation.time_horizon,
        direction=trend_summary.trend_direction.value,
        action=recommendation.action.value,
        mode=recommendation.mode.value,
        strength=trend_summary.trend_strength,
        confidence=recommendation.confidence,
        contradiction=trend_summary.contradiction_score,
        p_bull=p_bull,
        p_bear=None if p_bull is None else 1.0 - p_bull,
        score_company=score_company,
        score_macro=score_macro,
        score_competitive=score_competitive,
        evidence_count=len(links),
        unique_source_count=unique_source_count,
        duplicate_evidence_count=duplicate_evidence_count,
        price_at_prediction=ticker_price,
        spy_price_at_prediction=spy_price,
        sector_etf_price_at_prediction=sector_etf_price,
        metadata=_build_snapshot_metadata(trend_summary, sector_etf_ticker),
    )

    evidence_links = [
        SignalEvidenceLink(
            id=link["id"],
            prediction_id=snapshot_id,
            document_id=link["document_id"],
            signal_id=link["signal_id"],
            ticker=link["ticker"],
            source=link["source"],
            source_type=link["source_type"],
            catalyst_type=link["catalyst_type"],
            sentiment=link["sentiment"],
            impact=link["impact"],
            extraction_confidence=link["extraction_confidence"],
            weight=link["weight"],
            is_duplicate=link["is_duplicate"],
            canonical_evidence_key=link["canonical_evidence_key"],
            contribution_score=link["contribution_score"],
        )
        for link in links
    ]

    # 6. Persist atomically (Requirements 1.6, 2.6).
    await _persist_snapshot(pool, snapshot, evidence_links)

    logger.info(
        "Created prediction snapshot %s for %s: %d evidence links "
        "(%d unique sources, %d duplicates), prices: ticker=%s spy=%s sector_etf=%s",
        snapshot_id,
        ticker,
        len(evidence_links),
        unique_source_count,
        duplicate_evidence_count,
        ticker_price,
        spy_price,
        sector_etf_price,
    )

    return snapshot