f11aa0a1ee
- Add dedup check in recommendation worker: skip generation when latest rec for same ticker+window has identical action/mode/confidence - Widen position sizing range (1-10% portfolio, 0.3-2% max loss) and factor in trend strength + evidence count for differentiated sizing - API returns only latest recommendation per ticker by default (DISTINCT ON) to eliminate duplicate rows in the frontend list view
899 lines
31 KiB
Python
899 lines
31 KiB
Python
"""Recommendation worker - generates explainable trade recommendations from trend data.
|
|
|
|
Fetches the latest trend summaries for a ticker, evaluates eligibility
|
|
using deterministic rules, builds Recommendation objects with thesis
|
|
and evidence citations, and persists them to PostgreSQL.
|
|
|
|
Requirements: 7.1, 7.2, 7.3, 7.4
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
|
|
import asyncpg
|
|
from minio import Minio
|
|
|
|
from services.aggregation.projection import TrendProjection
|
|
from services.lake_publisher.worker import publish_recommendation_facts
|
|
from services.recommendation.eligibility import (
|
|
EligibilityConfig,
|
|
EligibilityResult,
|
|
evaluate_eligibility,
|
|
)
|
|
from services.recommendation.suppression import (
|
|
DataQualityContext,
|
|
SuppressionConfig,
|
|
SuppressionResult,
|
|
evaluate_suppression,
|
|
)
|
|
from services.recommendation.thesis_llm import (
|
|
THESIS_PROMPT_VERSION,
|
|
rewrite_thesis_with_llm,
|
|
)
|
|
from services.shared.config import OllamaConfig
|
|
from services.shared.metrics import (
|
|
RECOMMENDATION_CONFIDENCE,
|
|
RECOMMENDATION_GENERATED,
|
|
RECOMMENDATION_SUPPRESSED,
|
|
)
|
|
from services.shared.schemas import (
|
|
ModelMetadata,
|
|
PositionSizing,
|
|
Recommendation,
|
|
RecommendationMode,
|
|
TrendDirection,
|
|
TrendSummary,
|
|
TrendWindow,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch latest trend summary for a ticker + window
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch data quality context for suppression checks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_DATA_QUALITY_QUERY = """
|
|
WITH latest_trend AS (
|
|
SELECT top_supporting_evidence, top_opposing_evidence
|
|
FROM trend_windows
|
|
WHERE entity_id = $1 AND "window" = $2
|
|
ORDER BY generated_at DESC
|
|
LIMIT 1
|
|
),
|
|
evidence_ids AS (
|
|
SELECT jsonb_array_elements_text(
|
|
COALESCE(lt.top_supporting_evidence, '[]'::jsonb)
|
|
|| COALESCE(lt.top_opposing_evidence, '[]'::jsonb)
|
|
) AS eid
|
|
FROM latest_trend lt
|
|
)
|
|
SELECT
|
|
COUNT(*) AS total_documents,
|
|
COUNT(*) FILTER (WHERE di.validation_status = 'valid') AS valid_documents,
|
|
COUNT(*) FILTER (WHERE di.validation_status = 'failed') AS failed_documents,
|
|
AVG(di.confidence) FILTER (WHERE di.validation_status = 'valid') AS avg_extraction_confidence,
|
|
MAX(d.published_at) AS newest_evidence_at,
|
|
ARRAY_AGG(DISTINCT d.source_type) FILTER (WHERE d.source_type IS NOT NULL) AS source_types
|
|
FROM documents d
|
|
JOIN document_intelligence di ON di.document_id = d.id
|
|
WHERE d.id::text IN (SELECT eid FROM evidence_ids)
|
|
"""
|
|
|
|
|
|
async def fetch_data_quality_context(
|
|
pool: asyncpg.Pool,
|
|
ticker: str,
|
|
window: str,
|
|
) -> DataQualityContext | None:
|
|
"""Fetch data quality metrics for the documents underlying a trend.
|
|
|
|
Returns None if the query fails or returns no data, in which case
|
|
the suppression module will fall back to summary-based estimation.
|
|
"""
|
|
try:
|
|
row = await pool.fetchrow(_DATA_QUALITY_QUERY, ticker, window)
|
|
if row is None or row["total_documents"] == 0:
|
|
return None
|
|
|
|
source_types_raw = row["source_types"]
|
|
source_types = set(source_types_raw) if source_types_raw else set()
|
|
|
|
return DataQualityContext(
|
|
total_documents=int(row["total_documents"]),
|
|
valid_documents=int(row["valid_documents"] or 0),
|
|
failed_documents=int(row["failed_documents"] or 0),
|
|
avg_extraction_confidence=float(row["avg_extraction_confidence"] or 0.0),
|
|
newest_evidence_at=row["newest_evidence_at"],
|
|
source_types=source_types,
|
|
)
|
|
except Exception:
|
|
logger.warning(
|
|
"Failed to fetch data quality context for %s/%s — will use summary fallback",
|
|
ticker, window, exc_info=True,
|
|
)
|
|
return None
|
|
|
|
|
|
_LATEST_TREND_QUERY = """
|
|
SELECT
|
|
entity_type, entity_id, "window", trend_direction, trend_strength,
|
|
confidence, top_supporting_evidence, top_opposing_evidence,
|
|
dominant_catalysts, material_risks, contradiction_score,
|
|
disagreement_details, market_context, generated_at
|
|
FROM trend_windows
|
|
WHERE entity_id = $1 AND "window" = $2
|
|
ORDER BY generated_at DESC
|
|
LIMIT 1
|
|
"""
|
|
|
|
|
|
def _parse_trend_row(row: asyncpg.Record) -> TrendSummary:
|
|
"""Convert a trend_windows row into a TrendSummary."""
|
|
supporting = row["top_supporting_evidence"]
|
|
if isinstance(supporting, str):
|
|
supporting = json.loads(supporting)
|
|
|
|
opposing = row["top_opposing_evidence"]
|
|
if isinstance(opposing, str):
|
|
opposing = json.loads(opposing)
|
|
|
|
catalysts = row["dominant_catalysts"]
|
|
if isinstance(catalysts, str):
|
|
catalysts = json.loads(catalysts)
|
|
|
|
risks = row["material_risks"]
|
|
if isinstance(risks, str):
|
|
risks = json.loads(risks)
|
|
|
|
return TrendSummary(
|
|
entity_type=row["entity_type"],
|
|
entity_id=row["entity_id"],
|
|
window=TrendWindow(row["window"]),
|
|
trend_direction=TrendDirection(row["trend_direction"]),
|
|
trend_strength=float(row["trend_strength"]),
|
|
confidence=float(row["confidence"]),
|
|
top_supporting_evidence=supporting or [],
|
|
top_opposing_evidence=opposing or [],
|
|
dominant_catalysts=catalysts or [],
|
|
material_risks=risks or [],
|
|
contradiction_score=float(row["contradiction_score"] or 0.0),
|
|
generated_at=row["generated_at"],
|
|
)
|
|
|
|
|
|
async def fetch_latest_trend(
|
|
pool: asyncpg.Pool,
|
|
ticker: str,
|
|
window: str,
|
|
) -> TrendSummary | None:
|
|
"""Fetch the most recent trend summary for a ticker and window."""
|
|
row = await pool.fetchrow(_LATEST_TREND_QUERY, ticker, window)
|
|
if row is None:
|
|
return None
|
|
return _parse_trend_row(row)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch latest trend projection for a ticker + window
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_LATEST_PROJECTION_QUERY = """
|
|
SELECT
|
|
tp.projected_direction, tp.projected_strength, tp.projected_confidence,
|
|
tp.projection_horizon, tp.driving_factors, tp.macro_contribution_pct,
|
|
tp.diverges_from_current, tp.computed_at
|
|
FROM trend_projections tp
|
|
JOIN trend_windows tw ON tw.id = tp.trend_window_id
|
|
WHERE tw.entity_id = $1 AND tw."window" = $2
|
|
ORDER BY tp.computed_at DESC
|
|
LIMIT 1
|
|
"""
|
|
|
|
|
|
async def fetch_latest_projection(
|
|
pool: asyncpg.Pool,
|
|
ticker: str,
|
|
window: str,
|
|
) -> TrendProjection | None:
|
|
"""Fetch the most recent trend projection for a ticker and window.
|
|
|
|
Returns None if no projection exists. Low-confidence projections
|
|
are returned with low_confidence=True so callers can decide whether
|
|
to use them (Requirement 12.9).
|
|
"""
|
|
try:
|
|
row = await pool.fetchrow(_LATEST_PROJECTION_QUERY, ticker, window)
|
|
if row is None:
|
|
return None
|
|
|
|
driving_factors = row["driving_factors"]
|
|
if isinstance(driving_factors, str):
|
|
driving_factors = json.loads(driving_factors)
|
|
|
|
proj = TrendProjection(
|
|
projected_direction=row["projected_direction"],
|
|
projected_strength=float(row["projected_strength"]),
|
|
projected_confidence=float(row["projected_confidence"]),
|
|
projection_horizon=row["projection_horizon"],
|
|
driving_factors=driving_factors or [],
|
|
macro_contribution_pct=float(row["macro_contribution_pct"] or 0.0),
|
|
diverges_from_current=bool(row["diverges_from_current"]),
|
|
computed_at=row["computed_at"],
|
|
low_confidence=float(row["projected_confidence"]) < 0.3,
|
|
)
|
|
return proj
|
|
except Exception:
|
|
logger.warning(
|
|
"Failed to fetch projection for %s/%s — continuing without projection",
|
|
ticker, window, exc_info=True,
|
|
)
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Build thesis from trend summary (deterministic, no LLM)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def build_thesis(
|
|
summary: TrendSummary,
|
|
result: EligibilityResult,
|
|
projection: TrendProjection | None = None,
|
|
) -> str:
|
|
"""Generate a deterministic thesis string from trend data.
|
|
|
|
This is the descriptive analysis portion (Requirement 7.2).
|
|
The LLM wording layer is a separate optional task.
|
|
|
|
When a TrendProjection is provided and is not low-confidence,
|
|
the thesis incorporates the projected direction and key driving
|
|
factors (Requirement 12.8).
|
|
"""
|
|
direction = summary.trend_direction.value
|
|
ticker = summary.entity_id
|
|
window = summary.window.value
|
|
strength = summary.trend_strength
|
|
confidence = summary.confidence
|
|
|
|
parts: list[str] = []
|
|
|
|
# Opening: direction and strength
|
|
parts.append(
|
|
f"{ticker} shows a {direction} trend over the {window} window "
|
|
+ f"with strength {strength:.2f} and confidence {confidence:.2f}."
|
|
)
|
|
|
|
# Catalysts
|
|
if summary.dominant_catalysts:
|
|
catalyst_str = ", ".join(summary.dominant_catalysts[:3])
|
|
parts.append(f"Dominant catalysts: {catalyst_str}.")
|
|
|
|
# Contradiction note (Requirement 7.2 — separate descriptive from prescriptive)
|
|
if summary.contradiction_score > 0.15:
|
|
parts.append(
|
|
"Notable signal disagreement detected "
|
|
+ f"(contradiction score: {summary.contradiction_score:.2f})."
|
|
)
|
|
|
|
# Trend projection (Requirement 12.8)
|
|
if projection is not None and not projection.low_confidence:
|
|
proj_dir = projection.projected_direction
|
|
proj_str = projection.projected_strength
|
|
parts.append(
|
|
f"Forward projection ({projection.projection_horizon}): "
|
|
f"{proj_dir} at strength {proj_str:.2f}."
|
|
)
|
|
# Include top driving factors
|
|
non_divergence_factors = [
|
|
f for f in projection.driving_factors
|
|
if not f.startswith("DIVERGENCE:")
|
|
]
|
|
if non_divergence_factors:
|
|
factors_str = "; ".join(non_divergence_factors[:2])
|
|
parts.append(f"Key drivers: {factors_str}.")
|
|
if projection.diverges_from_current:
|
|
parts.append(
|
|
f"Note: projection diverges from current {direction} trend."
|
|
)
|
|
|
|
# Risks
|
|
if summary.material_risks:
|
|
risk_str = "; ".join(summary.material_risks[:2])
|
|
parts.append(f"Key risks: {risk_str}.")
|
|
|
|
# Evidence count
|
|
supporting_count = len(summary.top_supporting_evidence)
|
|
opposing_count = len(summary.top_opposing_evidence)
|
|
parts.append(
|
|
f"Based on {supporting_count} supporting and "
|
|
+ f"{opposing_count} opposing evidence documents."
|
|
)
|
|
|
|
# Prescriptive action (separated per Requirement 7.2)
|
|
action = result.action.value.upper()
|
|
mode = result.mode.value.replace("_", " ")
|
|
parts.append(f"Recommendation: {action} ({mode}).")
|
|
|
|
return " ".join(parts)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Build risk classification (Requirement 7.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def classify_risk(
|
|
summary: TrendSummary,
|
|
result: EligibilityResult,
|
|
) -> str:
|
|
"""Assign a risk classification label based on signal quality.
|
|
|
|
Returns one of: low, moderate, high, very_high.
|
|
"""
|
|
score = 0.0
|
|
|
|
# Contradiction raises risk
|
|
score += summary.contradiction_score * 2.0
|
|
|
|
# Low confidence raises risk
|
|
score += (1.0 - summary.confidence) * 1.5
|
|
|
|
# Low evidence count raises risk
|
|
evidence_count = len(summary.top_supporting_evidence) + len(summary.top_opposing_evidence)
|
|
if evidence_count < 3:
|
|
score += 1.0
|
|
elif evidence_count < 5:
|
|
score += 0.5
|
|
|
|
# Rejection reasons raise risk
|
|
score += len(result.rejection_reasons) * 0.5
|
|
|
|
if score >= 3.0:
|
|
return "very_high"
|
|
if score >= 2.0:
|
|
return "high"
|
|
if score >= 1.0:
|
|
return "moderate"
|
|
return "low"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Build Recommendation from eligibility result
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def build_recommendation(
|
|
summary: TrendSummary,
|
|
result: EligibilityResult,
|
|
reference_time: datetime | None = None,
|
|
llm_thesis: str | None = None,
|
|
suppression_result: SuppressionResult | None = None,
|
|
projection: TrendProjection | None = None,
|
|
) -> Recommendation:
|
|
"""Assemble a Recommendation object from a trend summary and eligibility result.
|
|
|
|
Combines all evidence refs (supporting + opposing) into the recommendation
|
|
so the full decision trace is available (Requirement 8.3).
|
|
|
|
If ``llm_thesis`` is provided (from the optional LLM wording layer),
|
|
it replaces the deterministic thesis text while preserving the risk
|
|
classification prefix.
|
|
|
|
If ``suppression_result`` indicates suppression, a suppression note
|
|
is appended to the thesis for audit visibility (Requirement 7.4).
|
|
|
|
If ``projection`` is provided and is not low-confidence, the thesis
|
|
incorporates projected direction and driving factors (Requirement 12.8).
|
|
The time_horizon may be refined based on the projection horizon.
|
|
"""
|
|
if reference_time is None:
|
|
reference_time = datetime.now(timezone.utc)
|
|
|
|
# Combine evidence refs — supporting first, then opposing
|
|
evidence_refs = list(summary.top_supporting_evidence) + list(summary.top_opposing_evidence)
|
|
|
|
deterministic_thesis = build_thesis(summary, result, projection=projection)
|
|
risk_class = classify_risk(summary, result)
|
|
|
|
# Use LLM-rewritten thesis if available, otherwise deterministic
|
|
thesis_body = llm_thesis if llm_thesis else deterministic_thesis
|
|
|
|
# Append suppression note if suppressed (Requirement 7.4)
|
|
if suppression_result and suppression_result.suppressed:
|
|
reason_strs = [r.value for r in suppression_result.reasons]
|
|
thesis_body += (
|
|
f" [SUPPRESSED: data quality below threshold "
|
|
f"(score={suppression_result.data_quality_score:.2f}, "
|
|
f"reasons={', '.join(reason_strs)})]"
|
|
)
|
|
|
|
# Determine time_horizon — refine with projection horizon if available
|
|
# (Requirement 12.8)
|
|
time_horizon = result.time_horizon
|
|
if projection is not None and not projection.low_confidence:
|
|
# Append projection horizon context to time_horizon
|
|
time_horizon = f"{result.time_horizon} (proj:{projection.projection_horizon})"
|
|
|
|
# Track whether the thesis was LLM-generated for audit
|
|
if llm_thesis:
|
|
provider = "ollama"
|
|
model_name = "thesis-rewrite"
|
|
prompt_version = THESIS_PROMPT_VERSION
|
|
else:
|
|
provider = "deterministic"
|
|
model_name = "eligibility-v1"
|
|
prompt_version = ""
|
|
|
|
return Recommendation(
|
|
ticker=summary.entity_id,
|
|
action=result.action,
|
|
mode=result.mode,
|
|
confidence=summary.confidence,
|
|
time_horizon=time_horizon,
|
|
thesis=f"[risk:{risk_class}] {thesis_body}",
|
|
invalidation_conditions=result.invalidation_conditions,
|
|
position_sizing=PositionSizing(
|
|
portfolio_pct=result.position_sizing.portfolio_pct,
|
|
max_loss_pct=result.position_sizing.max_loss_pct,
|
|
),
|
|
evidence_refs=evidence_refs,
|
|
model_metadata=ModelMetadata(
|
|
provider=provider,
|
|
model_name=model_name,
|
|
prompt_version=prompt_version,
|
|
schema_version="1.0.0",
|
|
),
|
|
generated_at=reference_time,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Persist recommendation to PostgreSQL
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_INSERT_RECOMMENDATION = """
|
|
INSERT INTO recommendations (
|
|
ticker, action, mode, confidence, time_horizon,
|
|
thesis, invalidation_conditions, portfolio_pct, max_loss_pct,
|
|
model_version, model_provider, prompt_version, schema_version,
|
|
risk_classification, generated_at
|
|
) VALUES (
|
|
$1, $2, $3, $4, $5,
|
|
$6, $7::jsonb, $8, $9,
|
|
$10, $11, $12, $13,
|
|
$14, $15
|
|
)
|
|
RETURNING id
|
|
"""
|
|
|
|
_INSERT_REC_EVIDENCE = """
|
|
INSERT INTO recommendation_evidence (
|
|
recommendation_id, document_id, evidence_type, weight
|
|
) VALUES ($1, $2::uuid, $3, $4)
|
|
"""
|
|
|
|
_INSERT_RISK_EVALUATION = """
|
|
INSERT INTO risk_evaluations (
|
|
recommendation_id, eligible, allowed_mode, rejection_reasons, risk_checks, evaluated_at
|
|
) VALUES ($1::uuid, $2, $3, $4::jsonb, $5::jsonb, $6)
|
|
"""
|
|
|
|
_FETCH_RECOMMENDATION = """
|
|
SELECT
|
|
id, ticker, action, mode, confidence, time_horizon,
|
|
thesis, invalidation_conditions, portfolio_pct, max_loss_pct,
|
|
model_version, model_provider, prompt_version, schema_version,
|
|
risk_classification, generated_at
|
|
FROM recommendations
|
|
WHERE id = $1::uuid
|
|
"""
|
|
|
|
_FETCH_REC_EVIDENCE = """
|
|
SELECT document_id, evidence_type, weight
|
|
FROM recommendation_evidence
|
|
WHERE recommendation_id = $1::uuid
|
|
ORDER BY evidence_type, weight DESC
|
|
"""
|
|
|
|
_FETCH_LATEST_RECS_FOR_TICKER = """
|
|
SELECT
|
|
id, ticker, action, mode, confidence, time_horizon,
|
|
thesis, invalidation_conditions, portfolio_pct, max_loss_pct,
|
|
model_version, model_provider, prompt_version, schema_version,
|
|
risk_classification, generated_at
|
|
FROM recommendations
|
|
WHERE ticker = $1
|
|
ORDER BY generated_at DESC
|
|
LIMIT $2
|
|
"""
|
|
|
|
|
|
def _extract_risk_classification(thesis: str) -> str:
|
|
"""Extract the risk classification from the thesis prefix."""
|
|
if thesis.startswith("[risk:"):
|
|
end = thesis.find("]")
|
|
if end > 6:
|
|
return thesis[6:end]
|
|
return "moderate"
|
|
|
|
|
|
async def persist_recommendation(
|
|
pool: asyncpg.Pool,
|
|
rec: Recommendation,
|
|
supporting_ids: list[str],
|
|
opposing_ids: list[str],
|
|
eligibility_result: EligibilityResult | None = None,
|
|
) -> str:
|
|
"""Insert a recommendation, evidence citations, and risk evaluation.
|
|
|
|
Persists the full model metadata and risk classification for audit
|
|
trail (Requirement 8.3). Also writes the eligibility decision to
|
|
the risk_evaluations table when provided.
|
|
|
|
Returns the recommendation UUID.
|
|
"""
|
|
risk_class = _extract_risk_classification(rec.thesis)
|
|
|
|
row = await pool.fetchrow(
|
|
_INSERT_RECOMMENDATION,
|
|
rec.ticker,
|
|
rec.action.value,
|
|
rec.mode.value,
|
|
rec.confidence,
|
|
rec.time_horizon,
|
|
rec.thesis,
|
|
json.dumps(rec.invalidation_conditions),
|
|
rec.position_sizing.portfolio_pct,
|
|
rec.position_sizing.max_loss_pct,
|
|
rec.model_metadata.model_name,
|
|
rec.model_metadata.provider,
|
|
rec.model_metadata.prompt_version,
|
|
rec.model_metadata.schema_version,
|
|
risk_class,
|
|
rec.generated_at,
|
|
)
|
|
rec_id = str(row["id"])
|
|
|
|
# Insert evidence citations with position-based weighting
|
|
evidence_rows: list[tuple[str, str, str, float]] = []
|
|
for idx, doc_id in enumerate(supporting_ids):
|
|
weight = round(1.0 / (1.0 + idx * 0.1), 4) # rank decay
|
|
evidence_rows.append((rec_id, doc_id, "supporting", weight))
|
|
for idx, doc_id in enumerate(opposing_ids):
|
|
weight = round(1.0 / (1.0 + idx * 0.1), 4)
|
|
evidence_rows.append((rec_id, doc_id, "opposing", weight))
|
|
|
|
if evidence_rows:
|
|
await pool.executemany(_INSERT_REC_EVIDENCE, evidence_rows)
|
|
|
|
# Persist the eligibility/risk evaluation for audit trail
|
|
if eligibility_result is not None:
|
|
rejection_reasons_json = json.dumps(
|
|
[r.value for r in eligibility_result.rejection_reasons]
|
|
)
|
|
risk_checks = {
|
|
"time_horizon": eligibility_result.time_horizon,
|
|
"position_sizing": {
|
|
"portfolio_pct": eligibility_result.position_sizing.portfolio_pct,
|
|
"max_loss_pct": eligibility_result.position_sizing.max_loss_pct,
|
|
},
|
|
"invalidation_conditions": eligibility_result.invalidation_conditions,
|
|
"risk_classification": risk_class,
|
|
}
|
|
await pool.execute(
|
|
_INSERT_RISK_EVALUATION,
|
|
rec_id,
|
|
eligibility_result.eligible,
|
|
eligibility_result.mode.value,
|
|
rejection_reasons_json,
|
|
json.dumps(risk_checks),
|
|
rec.generated_at,
|
|
)
|
|
|
|
return rec_id
|
|
|
|
|
|
async def fetch_recommendation_by_id(
|
|
pool: asyncpg.Pool,
|
|
recommendation_id: str,
|
|
) -> dict[str, object] | None:
|
|
"""Fetch a persisted recommendation with its evidence citations.
|
|
|
|
Returns a dict with the recommendation fields and an 'evidence' list,
|
|
or None if not found.
|
|
"""
|
|
row = await pool.fetchrow(_FETCH_RECOMMENDATION, recommendation_id)
|
|
if row is None:
|
|
return None
|
|
|
|
rec_dict = dict(row)
|
|
# Parse JSONB fields
|
|
if isinstance(rec_dict.get("invalidation_conditions"), str):
|
|
rec_dict["invalidation_conditions"] = json.loads(rec_dict["invalidation_conditions"])
|
|
|
|
# Fetch evidence
|
|
evidence_rows = await pool.fetch(_FETCH_REC_EVIDENCE, recommendation_id)
|
|
rec_dict["evidence"] = [
|
|
{
|
|
"document_id": str(e["document_id"]),
|
|
"evidence_type": e["evidence_type"],
|
|
"weight": float(e["weight"]),
|
|
}
|
|
for e in evidence_rows
|
|
]
|
|
|
|
return rec_dict
|
|
|
|
|
|
async def fetch_latest_recommendations(
|
|
pool: asyncpg.Pool,
|
|
ticker: str,
|
|
limit: int = 10,
|
|
) -> list[dict[str, object]]:
|
|
"""Fetch the most recent recommendations for a ticker.
|
|
|
|
Returns a list of recommendation dicts (without evidence — use
|
|
fetch_recommendation_by_id for full detail).
|
|
"""
|
|
rows = await pool.fetch(_FETCH_LATEST_RECS_FOR_TICKER, ticker, limit)
|
|
results = []
|
|
for row in rows:
|
|
rec_dict = dict(row)
|
|
if isinstance(rec_dict.get("invalidation_conditions"), str):
|
|
rec_dict["invalidation_conditions"] = json.loads(rec_dict["invalidation_conditions"])
|
|
results.append(rec_dict)
|
|
return results
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main entry point: generate recommendation for a ticker
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
_DEDUP_CHECK_QUERY = """
|
|
SELECT r.id, r.confidence, r.action, r.mode, r.generated_at
|
|
FROM recommendations r
|
|
WHERE r.ticker = $1
|
|
AND r.time_horizon LIKE $2 || '%'
|
|
ORDER BY r.generated_at DESC
|
|
LIMIT 1
|
|
"""
|
|
|
|
|
|
async def _is_duplicate_recommendation(
|
|
pool: asyncpg.Pool,
|
|
ticker: str,
|
|
summary: TrendSummary,
|
|
result: EligibilityResult,
|
|
) -> bool:
|
|
"""Check if the latest recommendation for this ticker+window is effectively identical.
|
|
|
|
Compares confidence, action, and mode. If they match, the underlying
|
|
trend data hasn't changed meaningfully and we skip regeneration.
|
|
"""
|
|
horizon_prefix = _map_time_horizon_prefix(summary.window.value)
|
|
row = await pool.fetchrow(_DEDUP_CHECK_QUERY, ticker, horizon_prefix)
|
|
if row is None:
|
|
return False
|
|
|
|
# If the previous recommendation has the same action, mode, and confidence
|
|
# (within a small tolerance), it's a duplicate
|
|
prev_confidence = float(row["confidence"])
|
|
prev_action = row["action"]
|
|
prev_mode = row["mode"]
|
|
|
|
same_action = prev_action == result.action.value
|
|
same_mode = prev_mode == result.mode.value
|
|
same_confidence = abs(prev_confidence - summary.confidence) < 0.01
|
|
|
|
if same_action and same_mode and same_confidence:
|
|
logger.info(
|
|
"Skipping duplicate recommendation for %s — action=%s mode=%s "
|
|
"confidence=%.3f matches previous",
|
|
ticker, prev_action, prev_mode, prev_confidence,
|
|
)
|
|
return True
|
|
return False
|
|
|
|
|
|
def _map_time_horizon_prefix(window: str) -> str:
|
|
"""Map window to the time_horizon prefix for dedup matching."""
|
|
mapping = {
|
|
"intraday": "intraday",
|
|
"1d": "swing_1d",
|
|
"7d": "swing_1d",
|
|
"30d": "position_10d",
|
|
"90d": "position_30d",
|
|
}
|
|
return mapping.get(window, "window_")
|
|
|
|
|
|
async def generate_recommendation(
|
|
pool: asyncpg.Pool,
|
|
ticker: str,
|
|
window: str = TrendWindow.SEVEN_DAY.value,
|
|
config: EligibilityConfig | None = None,
|
|
reference_time: datetime | None = None,
|
|
ollama_config: OllamaConfig | None = None,
|
|
suppression_config: SuppressionConfig | None = None,
|
|
minio_client: Minio | None = None,
|
|
) -> Recommendation | None:
|
|
"""Generate and persist a recommendation for a ticker from its latest trend.
|
|
|
|
Steps:
|
|
1. Fetch the latest trend summary for the ticker + window.
|
|
1b. Skip if the latest recommendation for this ticker is effectively identical.
|
|
2. Fetch the latest trend projection (Requirement 12.8, 12.9).
|
|
3. Evaluate data quality suppression (Requirement 7.4).
|
|
4. Evaluate eligibility using deterministic rules.
|
|
5. Build a Recommendation object with thesis and evidence.
|
|
- If ``ollama_config`` is provided, the deterministic thesis is
|
|
rewritten into analyst-quality prose via the LLM wording layer.
|
|
6. Persist the recommendation and evidence citations.
|
|
|
|
Returns the Recommendation, or None if no trend data exists or recommendation
|
|
is a duplicate of the most recent one.
|
|
"""
|
|
if reference_time is None:
|
|
reference_time = datetime.now(timezone.utc)
|
|
|
|
cfg = config or EligibilityConfig()
|
|
sup_cfg = suppression_config or SuppressionConfig()
|
|
|
|
# 1. Fetch latest trend
|
|
summary = await fetch_latest_trend(pool, ticker, window)
|
|
if summary is None:
|
|
logger.info("No trend data for %s/%s — skipping recommendation", ticker, window)
|
|
return None
|
|
|
|
# 1b. Check for duplicate: evaluate eligibility early for dedup comparison
|
|
preliminary_result = evaluate_eligibility(summary, cfg)
|
|
if await _is_duplicate_recommendation(pool, ticker, summary, preliminary_result):
|
|
return None
|
|
|
|
# 2. Fetch latest trend projection (Requirement 12.8, 12.9)
|
|
projection = await fetch_latest_projection(pool, ticker, window)
|
|
# Exclude low-confidence projections from influencing recommendation
|
|
# eligibility (Requirement 12.9). The projection is still passed to
|
|
# build_recommendation for informational display, but marked as
|
|
# low_confidence so it won't affect thesis or time_horizon.
|
|
effective_projection = projection
|
|
if projection is not None and projection.low_confidence:
|
|
effective_projection = projection # still passed, but build_thesis checks low_confidence
|
|
|
|
# 3. Evaluate data quality suppression (Requirement 7.4)
|
|
quality_ctx = await fetch_data_quality_context(pool, ticker, window)
|
|
suppression = evaluate_suppression(
|
|
summary, quality_ctx=quality_ctx, config=sup_cfg, reference_time=reference_time,
|
|
)
|
|
|
|
# 4. Evaluate eligibility (use preliminary result, already computed for dedup)
|
|
result = preliminary_result
|
|
|
|
# Apply suppression: force mode to informational if suppressed
|
|
if suppression.suppressed:
|
|
result = EligibilityResult(
|
|
eligible=False,
|
|
action=result.action,
|
|
mode=RecommendationMode.INFORMATIONAL,
|
|
position_sizing=result.position_sizing,
|
|
rejection_reasons=result.rejection_reasons,
|
|
time_horizon=result.time_horizon,
|
|
invalidation_conditions=result.invalidation_conditions,
|
|
)
|
|
|
|
# 5. Optional LLM thesis rewrite
|
|
llm_thesis: str | None = None
|
|
if ollama_config is not None:
|
|
deterministic_thesis = build_thesis(summary, result, projection=effective_projection)
|
|
llm_thesis = await rewrite_thesis_with_llm(
|
|
deterministic_thesis=deterministic_thesis,
|
|
summary=summary,
|
|
config=ollama_config,
|
|
)
|
|
# If the LLM returned the same text as the deterministic thesis,
|
|
# treat it as a no-op (fallback was used).
|
|
if llm_thesis == deterministic_thesis:
|
|
llm_thesis = None
|
|
|
|
# 6. Build recommendation
|
|
rec = build_recommendation(
|
|
summary, result, reference_time, llm_thesis=llm_thesis,
|
|
suppression_result=suppression,
|
|
projection=effective_projection,
|
|
)
|
|
|
|
# 7. Persist recommendation, evidence citations, and risk evaluation
|
|
rec_id = await persist_recommendation(
|
|
pool,
|
|
rec,
|
|
supporting_ids=list(summary.top_supporting_evidence),
|
|
opposing_ids=list(summary.top_opposing_evidence),
|
|
eligibility_result=result,
|
|
)
|
|
|
|
# 8. Publish prediction facts to analytical tables (Requirement 9.4)
|
|
if minio_client is not None:
|
|
try:
|
|
lake_refs = publish_recommendation_facts(
|
|
minio_client,
|
|
rec,
|
|
trend_direction=summary.trend_direction.value,
|
|
trend_strength=summary.trend_strength,
|
|
)
|
|
logger.info(
|
|
"Published analytical facts for %s: %s",
|
|
ticker, lake_refs,
|
|
)
|
|
except Exception:
|
|
logger.warning(
|
|
"Failed to publish analytical facts for %s/%s — recommendation "
|
|
"persisted but lake publication failed",
|
|
ticker, rec_id, exc_info=True,
|
|
)
|
|
|
|
logger.info(
|
|
"Generated recommendation %s for %s: action=%s mode=%s confidence=%.3f "
|
|
"eligible=%s suppressed=%s quality_score=%.3f llm_thesis=%s projection=%s",
|
|
rec_id, ticker, rec.action.value, rec.mode.value, rec.confidence,
|
|
result.eligible, suppression.suppressed, suppression.data_quality_score,
|
|
llm_thesis is not None,
|
|
projection.projected_direction if projection else "none",
|
|
)
|
|
|
|
# Prometheus metrics
|
|
RECOMMENDATION_GENERATED.labels(action=rec.action.value, mode=rec.mode.value).inc()
|
|
RECOMMENDATION_CONFIDENCE.observe(rec.confidence)
|
|
if suppression.suppressed:
|
|
RECOMMENDATION_SUPPRESSED.inc()
|
|
|
|
return rec
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Batch: generate recommendations for multiple tickers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def generate_recommendations_batch(
|
|
pool: asyncpg.Pool,
|
|
tickers: list[str],
|
|
window: str = TrendWindow.SEVEN_DAY.value,
|
|
config: EligibilityConfig | None = None,
|
|
ollama_config: OllamaConfig | None = None,
|
|
suppression_config: SuppressionConfig | None = None,
|
|
minio_client: Minio | None = None,
|
|
) -> list[Recommendation]:
|
|
"""Generate recommendations for a list of tickers.
|
|
|
|
Processes each ticker sequentially. Returns only the successfully
|
|
generated recommendations (tickers with no trend data are skipped).
|
|
|
|
If ``ollama_config`` is provided, each recommendation's thesis will
|
|
be rewritten using the LLM wording layer.
|
|
"""
|
|
results: list[Recommendation] = []
|
|
reference_time = datetime.now(timezone.utc)
|
|
|
|
for ticker in tickers:
|
|
rec = await generate_recommendation(
|
|
pool, ticker, window, config, reference_time,
|
|
ollama_config=ollama_config,
|
|
suppression_config=suppression_config,
|
|
minio_client=minio_client,
|
|
)
|
|
if rec is not None:
|
|
results.append(rec)
|
|
|
|
logger.info(
|
|
"Batch recommendation: %d/%d tickers produced recommendations",
|
|
len(results), len(tickers),
|
|
)
|
|
return results
|