# File: stonks-oracle/services/recommendation/worker.py
# Snapshot: 2026-04-11 12:10:01 -07:00 (721 lines, 24 KiB, Python)

"""Recommendation worker - generates explainable trade recommendations from trend data.
Fetches the latest trend summaries for a ticker, evaluates eligibility
using deterministic rules, builds Recommendation objects with thesis
and evidence citations, and persists them to PostgreSQL.
Requirements: 7.1, 7.2, 7.3, 7.4
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
import asyncpg
from minio import Minio
from services.lake_publisher.worker import publish_recommendation_facts
from services.recommendation.eligibility import (
EligibilityConfig,
EligibilityResult,
evaluate_eligibility,
)
from services.recommendation.suppression import (
DataQualityContext,
SuppressionConfig,
SuppressionResult,
evaluate_suppression,
)
from services.recommendation.thesis_llm import (
THESIS_PROMPT_VERSION,
rewrite_thesis_with_llm,
)
from services.shared.config import OllamaConfig
from services.shared.metrics import (
RECOMMENDATION_CONFIDENCE,
RECOMMENDATION_GENERATED,
RECOMMENDATION_SUPPRESSED,
)
from services.shared.schemas import (
ModelMetadata,
PositionSizing,
Recommendation,
RecommendationMode,
TrendDirection,
TrendSummary,
TrendWindow,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Fetch latest trend summary for a ticker + window
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Fetch data quality context for suppression checks
# ---------------------------------------------------------------------------
_DATA_QUALITY_QUERY = """
SELECT
COUNT(*) AS total_documents,
COUNT(*) FILTER (WHERE di.validation_status = 'valid') AS valid_documents,
COUNT(*) FILTER (WHERE di.validation_status = 'failed') AS failed_documents,
AVG(di.confidence) FILTER (WHERE di.validation_status = 'valid') AS avg_extraction_confidence,
MAX(d.published_at) AS newest_evidence_at,
ARRAY_AGG(DISTINCT s.source_class) FILTER (WHERE s.source_class IS NOT NULL) AS source_types
FROM documents d
JOIN document_intelligence di ON di.document_id = d.id
LEFT JOIN sources s ON d.source_id = s.id
WHERE d.id = ANY(
SELECT UNNEST(
COALESCE(tw.top_supporting_evidence, '[]'::jsonb)
|| COALESCE(tw.top_opposing_evidence, '[]'::jsonb)
)::uuid
FROM trend_windows tw
WHERE tw.entity_id = $1 AND tw.window = $2
ORDER BY tw.generated_at DESC
LIMIT 1
)
"""
async def fetch_data_quality_context(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
) -> DataQualityContext | None:
    """Summarise extraction quality for the documents cited by the latest trend.

    Runs ``_DATA_QUALITY_QUERY`` for ``ticker``/``window`` and packs the
    aggregate row into a ``DataQualityContext``. An empty result (no row, or
    zero documents) yields None. So does any exception raised while querying
    or converting; the exception is logged as a warning with its traceback.
    A None result tells the suppression module to estimate quality from the
    trend summary instead.
    """
    try:
        stats = await pool.fetchrow(_DATA_QUALITY_QUERY, ticker, window)
        if stats is None or stats["total_documents"] == 0:
            return None
        # NULL / empty aggregates (e.g. no valid documents) collapse to zero
        # or an empty set.
        kinds = set(stats["source_types"] or ())
        return DataQualityContext(
            total_documents=int(stats["total_documents"]),
            valid_documents=int(stats["valid_documents"] or 0),
            failed_documents=int(stats["failed_documents"] or 0),
            avg_extraction_confidence=float(stats["avg_extraction_confidence"] or 0.0),
            newest_evidence_at=stats["newest_evidence_at"],
            source_types=kinds,
        )
    except Exception:
        # Deliberately best-effort: quality context is optional input.
        logger.warning(
            "Failed to fetch data quality context for %s/%s — will use summary fallback",
            ticker, window, exc_info=True,
        )
        return None
_LATEST_TREND_QUERY = """
SELECT
entity_type, entity_id, window, trend_direction, trend_strength,
confidence, top_supporting_evidence, top_opposing_evidence,
dominant_catalysts, material_risks, contradiction_score,
disagreement_details, market_context, generated_at
FROM trend_windows
WHERE entity_id = $1 AND window = $2
ORDER BY generated_at DESC
LIMIT 1
"""
def _parse_trend_row(row: asyncpg.Record) -> TrendSummary:
    """Build a ``TrendSummary`` from one ``trend_windows`` record.

    The four JSON list columns may come back as raw JSON text; such values
    are decoded here. Falsy values (e.g. NULL) in those columns become empty
    lists, and a falsy contradiction score becomes 0.0.
    """

    def decoded(column: str):
        # Decode JSON text; pass through anything already decoded.
        value = row[column]
        if isinstance(value, str):
            return json.loads(value)
        return value

    supporting, opposing, catalysts, risks = (
        decoded(column)
        for column in (
            "top_supporting_evidence",
            "top_opposing_evidence",
            "dominant_catalysts",
            "material_risks",
        )
    )
    return TrendSummary(
        entity_type=row["entity_type"],
        entity_id=row["entity_id"],
        window=TrendWindow(row["window"]),
        trend_direction=TrendDirection(row["trend_direction"]),
        trend_strength=float(row["trend_strength"]),
        confidence=float(row["confidence"]),
        top_supporting_evidence=supporting or [],
        top_opposing_evidence=opposing or [],
        dominant_catalysts=catalysts or [],
        material_risks=risks or [],
        contradiction_score=float(row["contradiction_score"] or 0.0),
        generated_at=row["generated_at"],
    )
async def fetch_latest_trend(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
) -> TrendSummary | None:
    """Return the newest ``TrendSummary`` for ``ticker``/``window``, or None if there is none."""
    record = await pool.fetchrow(_LATEST_TREND_QUERY, ticker, window)
    return None if record is None else _parse_trend_row(record)
# ---------------------------------------------------------------------------
# Build thesis from trend summary (deterministic, no LLM)
# ---------------------------------------------------------------------------
def build_thesis(
    summary: TrendSummary,
    result: EligibilityResult,
) -> str:
    """Compose the deterministic, template-based thesis text.

    The output is a space-joined sequence of sentences:
    - direction, strength and confidence;
    - up to three dominant catalysts;
    - a disagreement note when the contradiction score exceeds 0.15;
    - up to two key risks;
    - supporting/opposing evidence counts;
    - finally the prescriptive action and mode.

    Descriptive analysis and the prescriptive action stay separate
    (Requirement 7.2). The optional LLM wording layer is applied elsewhere.
    """
    sentences = [
        f"{summary.entity_id} shows a {summary.trend_direction.value} trend "
        f"over the {summary.window.value} window with strength "
        f"{summary.trend_strength:.2f} and confidence {summary.confidence:.2f}."
    ]

    if summary.dominant_catalysts:
        sentences.append(
            "Dominant catalysts: " + ", ".join(summary.dominant_catalysts[:3]) + "."
        )

    # Flag disagreement between signals (descriptive, not prescriptive).
    if summary.contradiction_score > 0.15:
        sentences.append(
            "Notable signal disagreement detected "
            f"(contradiction score: {summary.contradiction_score:.2f})."
        )

    if summary.material_risks:
        sentences.append("Key risks: " + "; ".join(summary.material_risks[:2]) + ".")

    sentences.append(
        f"Based on {len(summary.top_supporting_evidence)} supporting and "
        f"{len(summary.top_opposing_evidence)} opposing evidence documents."
    )

    # Prescriptive part last, e.g. "paper_trade" -> "paper trade".
    action_label = result.action.value.upper()
    mode_label = result.mode.value.replace("_", " ")
    sentences.append(f"Recommendation: {action_label} ({mode_label}).")
    return " ".join(sentences)
# ---------------------------------------------------------------------------
# Build risk classification (Requirement 7.2)
# ---------------------------------------------------------------------------
def classify_risk(
    summary: TrendSummary,
    result: EligibilityResult,
) -> str:
    """Map signal-quality weaknesses to a coarse risk label.

    Penalty points accrue for four things:
    - contradiction (x2.0);
    - missing confidence (x1.5 of ``1 - confidence``);
    - thin evidence (+1.0 below 3 documents, +0.5 below 5);
    - each eligibility rejection reason (+0.5).

    The total is bucketed as ``very_high`` (>= 3), ``high`` (>= 2),
    ``moderate`` (>= 1) or ``low``.
    """
    contradiction_penalty = summary.contradiction_score * 2.0
    uncertainty_penalty = (1.0 - summary.confidence) * 1.5

    n_evidence = len(summary.top_supporting_evidence) + len(summary.top_opposing_evidence)
    if n_evidence < 3:
        thin_evidence_penalty = 1.0
    elif n_evidence < 5:
        thin_evidence_penalty = 0.5
    else:
        thin_evidence_penalty = 0.0

    rejection_penalty = len(result.rejection_reasons) * 0.5

    # Added left-to-right, in the same order as a running total, so the
    # float result (and therefore bucket boundaries) is unchanged.
    total = (
        0.0
        + contradiction_penalty
        + uncertainty_penalty
        + thin_evidence_penalty
        + rejection_penalty
    )

    for floor, label in ((3.0, "very_high"), (2.0, "high"), (1.0, "moderate")):
        if total >= floor:
            return label
    return "low"
# ---------------------------------------------------------------------------
# Build Recommendation from eligibility result
# ---------------------------------------------------------------------------
def build_recommendation(
    summary: TrendSummary,
    result: EligibilityResult,
    reference_time: datetime | None = None,
    llm_thesis: str | None = None,
    suppression_result: SuppressionResult | None = None,
) -> Recommendation:
    """Assemble the final ``Recommendation`` for a trend summary.

    - Evidence references are the supporting ids followed by the opposing
      ids, so the whole decision trace travels with the recommendation
      (Requirement 8.3).
    - The thesis always starts with a ``[risk:<label>]`` prefix from
      ``classify_risk``. Its body is ``llm_thesis`` when that is non-empty,
      otherwise the deterministic text from ``build_thesis``.
    - When ``suppression_result`` reports suppression, a ``[SUPPRESSED: ...]``
      note is appended for audit visibility (Requirement 7.4).
    - Model metadata records whether the LLM or the deterministic path
      produced the text.
    - ``reference_time`` defaults to the current UTC time.
    """
    generated_at = datetime.now(timezone.utc) if reference_time is None else reference_time

    # Supporting evidence first, opposing after it.
    cited_ids = [*summary.top_supporting_evidence, *summary.top_opposing_evidence]

    template_text = build_thesis(summary, result)
    risk_label = classify_risk(summary, result)

    use_llm_text = bool(llm_thesis)
    body = llm_thesis if use_llm_text else template_text

    if suppression_result and suppression_result.suppressed:
        reason_list = ", ".join(reason.value for reason in suppression_result.reasons)
        body = (
            f"{body} [SUPPRESSED: data quality below threshold "
            f"(score={suppression_result.data_quality_score:.2f}, "
            f"reasons={reason_list})]"
        )

    # Audit which path produced the thesis wording.
    provider, model_name, prompt_version = (
        ("ollama", "thesis-rewrite", THESIS_PROMPT_VERSION)
        if use_llm_text
        else ("deterministic", "eligibility-v1", "")
    )

    sizing = result.position_sizing
    return Recommendation(
        ticker=summary.entity_id,
        action=result.action,
        mode=result.mode,
        confidence=summary.confidence,
        time_horizon=result.time_horizon,
        thesis=f"[risk:{risk_label}] {body}",
        invalidation_conditions=result.invalidation_conditions,
        position_sizing=PositionSizing(
            portfolio_pct=sizing.portfolio_pct,
            max_loss_pct=sizing.max_loss_pct,
        ),
        evidence_refs=cited_ids,
        model_metadata=ModelMetadata(
            provider=provider,
            model_name=model_name,
            prompt_version=prompt_version,
            schema_version="1.0.0",
        ),
        generated_at=generated_at,
    )
# ---------------------------------------------------------------------------
# Persist recommendation to PostgreSQL
# ---------------------------------------------------------------------------
# Insert one recommendation row and return its generated id.
# $7 receives a JSON-encoded string and is cast to jsonb.
# NOTE: the model_version column ($10) is bound to ModelMetadata.model_name
# by persist_recommendation, and $11 to ModelMetadata.provider.
_INSERT_RECOMMENDATION = """
INSERT INTO recommendations (
ticker, action, mode, confidence, time_horizon,
thesis, invalidation_conditions, portfolio_pct, max_loss_pct,
model_version, model_provider, prompt_version, schema_version,
risk_classification, generated_at
) VALUES (
$1, $2, $3, $4, $5,
$6, $7::jsonb, $8, $9,
$10, $11, $12, $13,
$14, $15
)
RETURNING id
"""
# One evidence citation per (recommendation, document). The document id is
# passed as a string and cast to uuid. evidence_type is "supporting" or
# "opposing" and weight is the rank-decay weight computed by
# persist_recommendation.
_INSERT_REC_EVIDENCE = """
INSERT INTO recommendation_evidence (
recommendation_id, document_id, evidence_type, weight
) VALUES ($1, $2::uuid, $3, $4)
"""
# Audit record of the eligibility decision for a recommendation.
# rejection_reasons ($4) and risk_checks ($5) are JSON-encoded strings cast
# to jsonb.
_INSERT_RISK_EVALUATION = """
INSERT INTO risk_evaluations (
recommendation_id, eligible, allowed_mode, rejection_reasons, risk_checks, evaluated_at
) VALUES ($1::uuid, $2, $3, $4::jsonb, $5::jsonb, $6)
"""
# Load a single persisted recommendation by id (id passed as a string).
_FETCH_RECOMMENDATION = """
SELECT
id, ticker, action, mode, confidence, time_horizon,
thesis, invalidation_conditions, portfolio_pct, max_loss_pct,
model_version, model_provider, prompt_version, schema_version,
risk_classification, generated_at
FROM recommendations
WHERE id = $1::uuid
"""
# Evidence citations for one recommendation, ordered by evidence_type and
# then by descending weight.
_FETCH_REC_EVIDENCE = """
SELECT document_id, evidence_type, weight
FROM recommendation_evidence
WHERE recommendation_id = $1::uuid
ORDER BY evidence_type, weight DESC
"""
# Newest-first recommendations for a ticker; $2 is the maximum row count.
_FETCH_LATEST_RECS_FOR_TICKER = """
SELECT
id, ticker, action, mode, confidence, time_horizon,
thesis, invalidation_conditions, portfolio_pct, max_loss_pct,
model_version, model_provider, prompt_version, schema_version,
risk_classification, generated_at
FROM recommendations
WHERE ticker = $1
ORDER BY generated_at DESC
LIMIT $2
"""
def _extract_risk_classification(thesis: str) -> str:
"""Extract the risk classification from the thesis prefix."""
if thesis.startswith("[risk:"):
end = thesis.find("]")
if end > 6:
return thesis[6:end]
return "moderate"
def _evidence_weight(rank: int) -> float:
    """Return the rank-decay citation weight for 0-based position ``rank``.

    Position 0 gets 1.0 and later positions decay as ``1 / (1 + 0.1 * rank)``,
    rounded to 4 decimals.
    """
    return round(1.0 / (1.0 + rank * 0.1), 4)


async def persist_recommendation(
    pool: asyncpg.Pool,
    rec: Recommendation,
    supporting_ids: list[str],
    opposing_ids: list[str],
    eligibility_result: EligibilityResult | None = None,
) -> str:
    """Insert a recommendation, its evidence citations, and its risk evaluation.

    Persists the full model metadata and risk classification for the audit
    trail (Requirement 8.3). When ``eligibility_result`` is provided, the
    eligibility decision is also written to ``risk_evaluations``.

    All writes run on one pooled connection inside a single transaction. A
    failure part-way through therefore rolls back the recommendation row too,
    instead of leaving a recommendation without its citations or audit
    record. Examples of such failures are an evidence id that cannot be cast
    to uuid, or an audit payload that cannot be JSON-encoded.

    Args:
        pool: asyncpg connection pool.
        rec: Recommendation to persist. Its thesis should start with
            ``[risk:<label>]``; otherwise ``moderate`` is stored as the risk
            classification.
        supporting_ids: Supporting evidence document ids, in rank order
            (earlier entries receive a higher weight).
        opposing_ids: Opposing evidence document ids, in rank order.
        eligibility_result: Optional eligibility decision to record.

    Returns:
        The new recommendation's id as a string.
    """
    risk_class = _extract_risk_classification(rec.thesis)

    # Serialize the audit payload before touching the database so a
    # serialization error cannot abort the transaction half-way.
    risk_eval_args: tuple[object, ...] | None = None
    if eligibility_result is not None:
        risk_checks = {
            "time_horizon": eligibility_result.time_horizon,
            "position_sizing": {
                "portfolio_pct": eligibility_result.position_sizing.portfolio_pct,
                "max_loss_pct": eligibility_result.position_sizing.max_loss_pct,
            },
            "invalidation_conditions": eligibility_result.invalidation_conditions,
            "risk_classification": risk_class,
        }
        risk_eval_args = (
            eligibility_result.eligible,
            eligibility_result.mode.value,
            json.dumps([r.value for r in eligibility_result.rejection_reasons]),
            json.dumps(risk_checks),
        )

    async with pool.acquire() as conn:
        async with conn.transaction():
            row = await conn.fetchrow(
                _INSERT_RECOMMENDATION,
                rec.ticker,
                rec.action.value,
                rec.mode.value,
                rec.confidence,
                rec.time_horizon,
                rec.thesis,
                json.dumps(rec.invalidation_conditions),
                rec.position_sizing.portfolio_pct,
                rec.position_sizing.max_loss_pct,
                rec.model_metadata.model_name,
                rec.model_metadata.provider,
                rec.model_metadata.prompt_version,
                rec.model_metadata.schema_version,
                risk_class,
                rec.generated_at,
            )
            rec_id = str(row["id"])

            # Evidence citations with position-based (rank decay) weighting.
            evidence_rows = [
                (rec_id, doc_id, "supporting", _evidence_weight(rank))
                for rank, doc_id in enumerate(supporting_ids)
            ] + [
                (rec_id, doc_id, "opposing", _evidence_weight(rank))
                for rank, doc_id in enumerate(opposing_ids)
            ]
            if evidence_rows:
                await conn.executemany(_INSERT_REC_EVIDENCE, evidence_rows)

            # Eligibility/risk evaluation for the audit trail.
            if risk_eval_args is not None:
                await conn.execute(
                    _INSERT_RISK_EVALUATION,
                    rec_id,
                    *risk_eval_args,
                    rec.generated_at,
                )
    return rec_id
async def fetch_recommendation_by_id(
    pool: asyncpg.Pool,
    recommendation_id: str,
) -> dict[str, object] | None:
    """Load one persisted recommendation together with its evidence citations.

    The recommendation's columns are returned as a plain dict.
    ``invalidation_conditions`` is JSON-decoded when it arrives as text. An
    extra ``"evidence"`` key lists ``{"document_id", "evidence_type",
    "weight"}`` entries, with document ids stringified and weights as floats.
    Returns None when no recommendation has that id.
    """
    record = await pool.fetchrow(_FETCH_RECOMMENDATION, recommendation_id)
    if record is None:
        return None

    payload = dict(record)
    conditions = payload.get("invalidation_conditions")
    if isinstance(conditions, str):
        payload["invalidation_conditions"] = json.loads(conditions)

    citations = await pool.fetch(_FETCH_REC_EVIDENCE, recommendation_id)
    payload["evidence"] = [
        {
            "document_id": str(citation["document_id"]),
            "evidence_type": citation["evidence_type"],
            "weight": float(citation["weight"]),
        }
        for citation in citations
    ]
    return payload
async def fetch_latest_recommendations(
    pool: asyncpg.Pool,
    ticker: str,
    limit: int = 10,
) -> list[dict[str, object]]:
    """Return up to ``limit`` of the newest recommendations for ``ticker``.

    Each entry is a plain dict of recommendation columns with
    ``invalidation_conditions`` JSON-decoded when it arrives as text.
    Evidence is not included; use ``fetch_recommendation_by_id`` for the
    full detail.
    """

    def as_dict(record: asyncpg.Record) -> dict[str, object]:
        item = dict(record)
        conditions = item.get("invalidation_conditions")
        if isinstance(conditions, str):
            item["invalidation_conditions"] = json.loads(conditions)
        return item

    records = await pool.fetch(_FETCH_LATEST_RECS_FOR_TICKER, ticker, limit)
    return [as_dict(record) for record in records]
# ---------------------------------------------------------------------------
# Main entry point: generate recommendation for a ticker
# ---------------------------------------------------------------------------
async def generate_recommendation(
    pool: asyncpg.Pool,
    ticker: str,
    window: str = TrendWindow.SEVEN_DAY.value,
    config: EligibilityConfig | None = None,
    reference_time: datetime | None = None,
    ollama_config: OllamaConfig | None = None,
    suppression_config: SuppressionConfig | None = None,
    minio_client: Minio | None = None,
) -> Recommendation | None:
    """Generate and persist a recommendation for a ticker from its latest trend.

    Steps:
    1. Fetch the latest trend summary for the ticker + window.
    2. Evaluate data quality suppression (Requirement 7.4).
    3. Evaluate eligibility using deterministic rules. If the data was
       suppressed, the result is marked ineligible and forced to
       informational mode.
    4. If ``ollama_config`` is provided, rewrite the deterministic thesis
       into analyst-quality prose via the LLM wording layer. An empty
       rewrite, or one identical to the deterministic text, counts as no
       rewrite.
    5. Build a Recommendation object with thesis and evidence.
    6. Persist the recommendation, evidence citations, and risk evaluation.
    7. If ``minio_client`` is provided, publish analytical facts to the
       lake. This step is best-effort: failures are logged, not raised.

    Prometheus metrics are updated after the recommendation is persisted.

    Args:
        pool: asyncpg connection pool.
        ticker: Entity id whose latest trend is used.
        window: Trend window value (defaults to the 7-day window).
        config: Eligibility rules; defaults to ``EligibilityConfig()``.
        reference_time: Timestamp for the run; defaults to now (UTC).
        ollama_config: Enables the optional LLM thesis rewrite.
        suppression_config: Suppression thresholds; defaults to
            ``SuppressionConfig()``.
        minio_client: Enables lake publication of analytical facts.

    Returns:
        The Recommendation, or None if no trend data exists.
    """
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)
    cfg = config or EligibilityConfig()
    sup_cfg = suppression_config or SuppressionConfig()

    # 1. Fetch latest trend
    summary = await fetch_latest_trend(pool, ticker, window)
    if summary is None:
        logger.info("No trend data for %s/%s — skipping recommendation", ticker, window)
        return None

    # 2. Evaluate data quality suppression (Requirement 7.4)
    quality_ctx = await fetch_data_quality_context(pool, ticker, window)
    suppression = evaluate_suppression(
        summary, quality_ctx=quality_ctx, config=sup_cfg, reference_time=reference_time,
    )

    # 3. Evaluate eligibility
    result = evaluate_eligibility(summary, cfg)
    # Suppression overrides eligibility. The proposed action, sizing, horizon
    # and invalidation conditions are kept for the audit trail, but the
    # result becomes ineligible and informational-only.
    if suppression.suppressed:
        result = EligibilityResult(
            eligible=False,
            action=result.action,
            mode=RecommendationMode.INFORMATIONAL,
            position_sizing=result.position_sizing,
            rejection_reasons=result.rejection_reasons,
            time_horizon=result.time_horizon,
            invalidation_conditions=result.invalidation_conditions,
        )

    # 4. Optional LLM thesis rewrite
    llm_thesis: str | None = None
    if ollama_config is not None:
        deterministic_thesis = build_thesis(summary, result)
        rewritten = await rewrite_thesis_with_llm(
            deterministic_thesis=deterministic_thesis,
            summary=summary,
            config=ollama_config,
        )
        # Identical text means the LLM layer fell back. An empty rewrite is
        # unusable too, and build_recommendation ignores falsy llm_thesis
        # values. Normalizing both to None keeps the "llm_thesis" log flag
        # consistent with the thesis and metadata actually persisted.
        if rewritten and rewritten != deterministic_thesis:
            llm_thesis = rewritten

    # 5. Build recommendation
    rec = build_recommendation(
        summary, result, reference_time, llm_thesis=llm_thesis,
        suppression_result=suppression,
    )

    # 6. Persist recommendation, evidence citations, and risk evaluation
    rec_id = await persist_recommendation(
        pool,
        rec,
        supporting_ids=list(summary.top_supporting_evidence),
        opposing_ids=list(summary.top_opposing_evidence),
        eligibility_result=result,
    )

    # 7. Publish prediction facts to analytical tables (Requirement 9.4).
    # Best-effort: the recommendation is already persisted at this point.
    if minio_client is not None:
        try:
            lake_refs = publish_recommendation_facts(
                minio_client,
                rec,
                trend_direction=summary.trend_direction.value,
                trend_strength=summary.trend_strength,
            )
            logger.info(
                "Published analytical facts for %s: %s",
                ticker, lake_refs,
            )
        except Exception:
            logger.warning(
                "Failed to publish analytical facts for %s/%s — recommendation "
                "persisted but lake publication failed",
                ticker, rec_id, exc_info=True,
            )

    logger.info(
        "Generated recommendation %s for %s: action=%s mode=%s confidence=%.3f "
        "eligible=%s suppressed=%s quality_score=%.3f llm_thesis=%s",
        rec_id, ticker, rec.action.value, rec.mode.value, rec.confidence,
        result.eligible, suppression.suppressed, suppression.data_quality_score,
        llm_thesis is not None,
    )

    # Prometheus metrics
    RECOMMENDATION_GENERATED.labels(action=rec.action.value, mode=rec.mode.value).inc()
    RECOMMENDATION_CONFIDENCE.observe(rec.confidence)
    if suppression.suppressed:
        RECOMMENDATION_SUPPRESSED.inc()

    return rec
# ---------------------------------------------------------------------------
# Batch: generate recommendations for multiple tickers
# ---------------------------------------------------------------------------
async def generate_recommendations_batch(
    pool: asyncpg.Pool,
    tickers: list[str],
    window: str = TrendWindow.SEVEN_DAY.value,
    config: EligibilityConfig | None = None,
    ollama_config: OllamaConfig | None = None,
    suppression_config: SuppressionConfig | None = None,
    minio_client: Minio | None = None,
    *,
    continue_on_error: bool = False,
) -> list[Recommendation]:
    """Generate recommendations for a list of tickers.

    Tickers are processed sequentially and share one ``reference_time``
    captured at the start of the batch. The result contains only the
    recommendations actually generated; tickers with no trend data are
    skipped. If ``ollama_config`` is provided, each recommendation's thesis
    is rewritten using the LLM wording layer.

    Args:
        continue_on_error: By default (False) the first exception raised
            for a ticker propagates and aborts the batch. Recommendations
            already persisted for earlier tickers stay in the database. When
            True, the failure is logged with its traceback, the ticker is
            skipped, and the batch continues. This avoids re-running (and
            re-persisting) earlier tickers after a single bad one.

    Returns:
        The successfully generated recommendations, in input order.
    """
    results: list[Recommendation] = []
    failed: list[str] = []
    reference_time = datetime.now(timezone.utc)
    for ticker in tickers:
        try:
            rec = await generate_recommendation(
                pool, ticker, window, config, reference_time,
                ollama_config=ollama_config,
                suppression_config=suppression_config,
                minio_client=minio_client,
            )
        except Exception:
            if not continue_on_error:
                raise
            logger.exception(
                "Recommendation generation failed for %s — skipping ticker", ticker,
            )
            failed.append(ticker)
            continue
        if rec is not None:
            results.append(rec)

    logger.info(
        "Batch recommendation: %d/%d tickers produced recommendations",
        len(results), len(tickers),
    )
    if failed:
        logger.warning(
            "Batch recommendation: %d tickers failed: %s",
            len(failed), ", ".join(failed),
        )
    return results