Files
stonks-oracle/services/recommendation/worker.py
T
Celes Renata f11aa0a1ee fix: deduplicate recommendations and widen position sizing range
- Add dedup check in recommendation worker: skip generation when latest
  rec for same ticker+window has identical action/mode/confidence
- Widen position sizing range (1-10% portfolio, 0.3-2% max loss) and
  factor in trend strength + evidence count for differentiated sizing
- API returns only latest recommendation per ticker by default (DISTINCT ON)
  to eliminate duplicate rows in the frontend list view
2026-04-17 00:15:32 +00:00

899 lines
31 KiB
Python

"""Recommendation worker - generates explainable trade recommendations from trend data.
Fetches the latest trend summaries for a ticker, evaluates eligibility
using deterministic rules, builds Recommendation objects with thesis
and evidence citations, and persists them to PostgreSQL.
Requirements: 7.1, 7.2, 7.3, 7.4
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
import asyncpg
from minio import Minio
from services.aggregation.projection import TrendProjection
from services.lake_publisher.worker import publish_recommendation_facts
from services.recommendation.eligibility import (
EligibilityConfig,
EligibilityResult,
evaluate_eligibility,
)
from services.recommendation.suppression import (
DataQualityContext,
SuppressionConfig,
SuppressionResult,
evaluate_suppression,
)
from services.recommendation.thesis_llm import (
THESIS_PROMPT_VERSION,
rewrite_thesis_with_llm,
)
from services.shared.config import OllamaConfig
from services.shared.metrics import (
RECOMMENDATION_CONFIDENCE,
RECOMMENDATION_GENERATED,
RECOMMENDATION_SUPPRESSED,
)
from services.shared.schemas import (
ModelMetadata,
PositionSizing,
Recommendation,
RecommendationMode,
TrendDirection,
TrendSummary,
TrendWindow,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Trend-data fetch helpers: data quality context, latest trend summary and
# latest trend projection for a ticker + window
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Fetch data quality context for suppression checks
# ---------------------------------------------------------------------------
# Aggregate quality metrics over the evidence documents cited by the most
# recent trend_windows row for ($1 = entity_id, $2 = window):
#   - total / valid / failed counts by document_intelligence.validation_status
#   - average extraction confidence over 'valid' rows only
#   - newest published_at among the cited documents
#   - distinct, non-NULL document source types
# Evidence ids come from the JSONB supporting + opposing arrays and are
# compared as text against documents.id::text. Because this is an aggregate
# without GROUP BY, it always returns one row (total_documents = 0 when there
# is no trend row or no matching documents).
# NOTE(review): the inner JOIN drops cited documents that have no
# document_intelligence row, and COUNT(*) counts joined rows, so a document
# with several document_intelligence rows would be counted more than once —
# confirm against the schema.
# NOTE(review): casting d.id to text likely prevents an index lookup on
# documents.id; verify with EXPLAIN if this query becomes slow.
_DATA_QUALITY_QUERY = """
WITH latest_trend AS (
SELECT top_supporting_evidence, top_opposing_evidence
FROM trend_windows
WHERE entity_id = $1 AND "window" = $2
ORDER BY generated_at DESC
LIMIT 1
),
evidence_ids AS (
SELECT jsonb_array_elements_text(
COALESCE(lt.top_supporting_evidence, '[]'::jsonb)
|| COALESCE(lt.top_opposing_evidence, '[]'::jsonb)
) AS eid
FROM latest_trend lt
)
SELECT
COUNT(*) AS total_documents,
COUNT(*) FILTER (WHERE di.validation_status = 'valid') AS valid_documents,
COUNT(*) FILTER (WHERE di.validation_status = 'failed') AS failed_documents,
AVG(di.confidence) FILTER (WHERE di.validation_status = 'valid') AS avg_extraction_confidence,
MAX(d.published_at) AS newest_evidence_at,
ARRAY_AGG(DISTINCT d.source_type) FILTER (WHERE d.source_type IS NOT NULL) AS source_types
FROM documents d
JOIN document_intelligence di ON di.document_id = d.id
WHERE d.id::text IN (SELECT eid FROM evidence_ids)
"""
async def fetch_data_quality_context(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
) -> DataQualityContext | None:
    """Load data-quality metrics for the evidence behind the latest trend.

    Runs ``_DATA_QUALITY_QUERY`` for the ticker/window pair and wraps the
    aggregate row in a :class:`DataQualityContext`.

    Returns:
        The populated context, or ``None`` when the query yields no row,
        finds no evidence documents, or raises. On ``None`` the suppression
        module falls back to estimating quality from the trend summary.
    """
    try:
        record = await pool.fetchrow(_DATA_QUALITY_QUERY, ticker, window)
        if record is None or record["total_documents"] == 0:
            return None
        raw_sources = record["source_types"]
        return DataQualityContext(
            total_documents=int(record["total_documents"]),
            valid_documents=int(record["valid_documents"] or 0),
            failed_documents=int(record["failed_documents"] or 0),
            avg_extraction_confidence=float(
                record["avg_extraction_confidence"] or 0.0
            ),
            newest_evidence_at=record["newest_evidence_at"],
            # ARRAY_AGG yields NULL when nothing matched the FILTER clause.
            source_types=set(raw_sources) if raw_sources else set(),
        )
    except Exception:
        # Deliberately best-effort: any failure degrades to the summary fallback.
        logger.warning(
            "Failed to fetch data quality context for %s/%s — will use summary fallback",
            ticker, window, exc_info=True,
        )
        return None
# Most recent trend_windows row for ($1 = entity_id, $2 = window).
# "window" is double-quoted because WINDOW is a reserved keyword in PostgreSQL.
# NOTE(review): disagreement_details and market_context are selected but are
# not read by _parse_trend_row — confirm whether they are still needed.
_LATEST_TREND_QUERY = """
SELECT
entity_type, entity_id, "window", trend_direction, trend_strength,
confidence, top_supporting_evidence, top_opposing_evidence,
dominant_catalysts, material_risks, contradiction_score,
disagreement_details, market_context, generated_at
FROM trend_windows
WHERE entity_id = $1 AND "window" = $2
ORDER BY generated_at DESC
LIMIT 1
"""
def _parse_trend_row(row: asyncpg.Record) -> TrendSummary:
    """Build a :class:`TrendSummary` from one ``trend_windows`` record.

    The JSONB list columns may arrive either as raw JSON text or already
    decoded; text values are decoded here. NULL (or JSON ``null``) lists
    become empty lists and a NULL contradiction score becomes ``0.0``.
    """

    def _maybe_json(column: str) -> object:
        raw = row[column]
        return json.loads(raw) if isinstance(raw, str) else raw

    supporting = _maybe_json("top_supporting_evidence")
    opposing = _maybe_json("top_opposing_evidence")
    catalysts = _maybe_json("dominant_catalysts")
    risks = _maybe_json("material_risks")

    return TrendSummary(
        entity_type=row["entity_type"],
        entity_id=row["entity_id"],
        window=TrendWindow(row["window"]),
        trend_direction=TrendDirection(row["trend_direction"]),
        trend_strength=float(row["trend_strength"]),
        confidence=float(row["confidence"]),
        top_supporting_evidence=supporting or [],
        top_opposing_evidence=opposing or [],
        dominant_catalysts=catalysts or [],
        material_risks=risks or [],
        contradiction_score=float(row["contradiction_score"] or 0.0),
        generated_at=row["generated_at"],
    )
async def fetch_latest_trend(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
) -> TrendSummary | None:
    """Return the newest trend summary for ``ticker``/``window``.

    Returns ``None`` when no ``trend_windows`` row exists. Database and
    row-parsing errors are not caught and propagate to the caller.
    """
    record = await pool.fetchrow(_LATEST_TREND_QUERY, ticker, window)
    return None if record is None else _parse_trend_row(record)
# ---------------------------------------------------------------------------
# Fetch latest trend projection for a ticker + window
# ---------------------------------------------------------------------------
# Newest trend_projections row (by the projection's computed_at) attached via
# trend_window_id to any trend_windows row for ($1 = entity_id, $2 = window).
# NOTE(review): this is not restricted to the latest trend_windows row, so a
# projection computed for an older trend window can be returned if it is the
# most recently computed one — confirm that is intended.
_LATEST_PROJECTION_QUERY = """
SELECT
tp.projected_direction, tp.projected_strength, tp.projected_confidence,
tp.projection_horizon, tp.driving_factors, tp.macro_contribution_pct,
tp.diverges_from_current, tp.computed_at
FROM trend_projections tp
JOIN trend_windows tw ON tw.id = tp.trend_window_id
WHERE tw.entity_id = $1 AND tw."window" = $2
ORDER BY tp.computed_at DESC
LIMIT 1
"""
async def fetch_latest_projection(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
) -> TrendProjection | None:
    """Return the most recent :class:`TrendProjection` for a ticker/window.

    Projections whose confidence is below 0.3 are still returned, but with
    ``low_confidence=True`` so callers can decide whether to use them
    (Requirement 12.9).

    Returns:
        The projection, or ``None`` when no projection exists or when
        fetching/parsing fails (the failure is logged, never raised).
    """
    try:
        record = await pool.fetchrow(_LATEST_PROJECTION_QUERY, ticker, window)
        if record is None:
            return None

        factors = record["driving_factors"]
        if isinstance(factors, str):
            factors = json.loads(factors)
        confidence = float(record["projected_confidence"])

        return TrendProjection(
            projected_direction=record["projected_direction"],
            projected_strength=float(record["projected_strength"]),
            projected_confidence=confidence,
            projection_horizon=record["projection_horizon"],
            driving_factors=factors or [],
            macro_contribution_pct=float(record["macro_contribution_pct"] or 0.0),
            diverges_from_current=bool(record["diverges_from_current"]),
            computed_at=record["computed_at"],
            low_confidence=confidence < 0.3,
        )
    except Exception:
        # Best-effort: a recommendation can still be built without a projection.
        logger.warning(
            "Failed to fetch projection for %s/%s — continuing without projection",
            ticker, window, exc_info=True,
        )
        return None
# ---------------------------------------------------------------------------
# Build thesis from trend summary (deterministic, no LLM)
# ---------------------------------------------------------------------------
def build_thesis(
    summary: TrendSummary,
    result: EligibilityResult,
    projection: TrendProjection | None = None,
) -> str:
    """Compose the deterministic, rule-based thesis text for a recommendation.

    The descriptive analysis (trend, catalysts, signal disagreement,
    projection, risks, evidence counts) comes first, and the prescriptive
    action comes last as a separate sentence (Requirement 7.2). No LLM is
    involved; the optional wording layer is applied elsewhere.

    A projection contributes its direction, strength, up to two driving
    factors not tagged ``DIVERGENCE:``, and a divergence note. It is used
    only when it is present and not flagged ``low_confidence``
    (Requirement 12.8).

    Returns:
        The sentences joined by single spaces.
    """
    trend = summary.trend_direction.value
    sentences = [
        f"{summary.entity_id} shows a {trend} trend over the "
        f"{summary.window.value} window with strength "
        f"{summary.trend_strength:.2f} and confidence {summary.confidence:.2f}."
    ]

    if summary.dominant_catalysts:
        sentences.append(
            "Dominant catalysts: " + ", ".join(summary.dominant_catalysts[:3]) + "."
        )

    # Descriptive note on signal disagreement (kept apart from the action).
    if summary.contradiction_score > 0.15:
        sentences.append(
            "Notable signal disagreement detected "
            f"(contradiction score: {summary.contradiction_score:.2f})."
        )

    if projection is not None and not projection.low_confidence:
        sentences.append(
            f"Forward projection ({projection.projection_horizon}): "
            f"{projection.projected_direction} at strength "
            f"{projection.projected_strength:.2f}."
        )
        # Factors tagged "DIVERGENCE:" are excluded from the drivers list.
        drivers = [
            factor
            for factor in projection.driving_factors
            if not factor.startswith("DIVERGENCE:")
        ][:2]
        if drivers:
            sentences.append("Key drivers: " + "; ".join(drivers) + ".")
        if projection.diverges_from_current:
            sentences.append(f"Note: projection diverges from current {trend} trend.")

    if summary.material_risks:
        sentences.append("Key risks: " + "; ".join(summary.material_risks[:2]) + ".")

    sentences.append(
        f"Based on {len(summary.top_supporting_evidence)} supporting and "
        f"{len(summary.top_opposing_evidence)} opposing evidence documents."
    )

    # Prescriptive part last, separated from the analysis (Requirement 7.2).
    action_label = result.action.value.upper()
    mode_label = result.mode.value.replace("_", " ")
    sentences.append(f"Recommendation: {action_label} ({mode_label}).")
    return " ".join(sentences)
# ---------------------------------------------------------------------------
# Build risk classification (Requirement 7.2)
# ---------------------------------------------------------------------------
def classify_risk(
    summary: TrendSummary,
    result: EligibilityResult,
) -> str:
    """Map signal-quality weaknesses to a coarse risk label.

    The label comes from an additive penalty score:
      * ``contradiction_score * 2.0``
      * ``(1 - confidence) * 1.5``
      * ``+1.0`` for fewer than 3 evidence documents, ``+0.5`` for 3-4
      * ``+0.5`` per eligibility rejection reason

    Returns:
        ``"very_high"`` (score >= 3), ``"high"`` (>= 2), ``"moderate"``
        (>= 1), otherwise ``"low"``.
    """
    evidence_total = len(summary.top_supporting_evidence) + len(summary.top_opposing_evidence)
    if evidence_total < 3:
        thin_evidence_penalty = 1.0
    elif evidence_total < 5:
        thin_evidence_penalty = 0.5
    else:
        thin_evidence_penalty = 0.0

    score = (
        summary.contradiction_score * 2.0
        + (1.0 - summary.confidence) * 1.5
        + thin_evidence_penalty
        + len(result.rejection_reasons) * 0.5
    )

    for threshold, label in ((3.0, "very_high"), (2.0, "high"), (1.0, "moderate")):
        if score >= threshold:
            return label
    return "low"
# ---------------------------------------------------------------------------
# Build Recommendation from eligibility result
# ---------------------------------------------------------------------------
def build_recommendation(
    summary: TrendSummary,
    result: EligibilityResult,
    reference_time: datetime | None = None,
    llm_thesis: str | None = None,
    suppression_result: SuppressionResult | None = None,
    projection: TrendProjection | None = None,
) -> Recommendation:
    """Assemble a :class:`Recommendation` from a trend summary and eligibility result.

    Args:
        summary: Trend the recommendation is derived from. All of its
            supporting and opposing evidence ids become ``evidence_refs``
            (supporting first) so the full decision trace is kept
            (Requirement 8.3).
        result: Eligibility outcome providing action, mode, sizing,
            horizon and invalidation conditions.
        reference_time: Timestamp for ``generated_at``; defaults to now (UTC).
        llm_thesis: Optional LLM-reworded thesis. When non-empty it replaces
            the deterministic thesis body; the ``[risk:...]`` prefix is
            always added, and model metadata records the LLM provider.
        suppression_result: When it reports suppression, a
            ``[SUPPRESSED: ...]`` note is appended to the thesis for audit
            visibility (Requirement 7.4).
        projection: Optional projection. If present and not low-confidence
            it shapes the deterministic thesis and appends
            ``(proj:<horizon>)`` to ``time_horizon`` (Requirement 12.8).
    """
    generated_at = reference_time if reference_time is not None else datetime.now(timezone.utc)

    # Full decision trace: supporting evidence first, then opposing.
    evidence_refs = [*summary.top_supporting_evidence, *summary.top_opposing_evidence]

    deterministic_thesis = build_thesis(summary, result, projection=projection)
    risk_class = classify_risk(summary, result)
    thesis_body = llm_thesis or deterministic_thesis

    if suppression_result and suppression_result.suppressed:
        reasons = ", ".join(reason.value for reason in suppression_result.reasons)
        thesis_body += (
            " [SUPPRESSED: data quality below threshold "
            f"(score={suppression_result.data_quality_score:.2f}, "
            f"reasons={reasons})]"
        )

    use_projection = projection is not None and not projection.low_confidence
    time_horizon = (
        f"{result.time_horizon} (proj:{projection.projection_horizon})"
        if use_projection
        else result.time_horizon
    )

    # Audit: record whether the thesis wording came from the LLM layer.
    if llm_thesis:
        provider, model_name, prompt_version = "ollama", "thesis-rewrite", THESIS_PROMPT_VERSION
    else:
        provider, model_name, prompt_version = "deterministic", "eligibility-v1", ""

    sizing = PositionSizing(
        portfolio_pct=result.position_sizing.portfolio_pct,
        max_loss_pct=result.position_sizing.max_loss_pct,
    )
    metadata = ModelMetadata(
        provider=provider,
        model_name=model_name,
        prompt_version=prompt_version,
        schema_version="1.0.0",
    )
    return Recommendation(
        ticker=summary.entity_id,
        action=result.action,
        mode=result.mode,
        confidence=summary.confidence,
        time_horizon=time_horizon,
        thesis=f"[risk:{risk_class}] {thesis_body}",
        invalidation_conditions=result.invalidation_conditions,
        position_sizing=sizing,
        evidence_refs=evidence_refs,
        model_metadata=metadata,
        generated_at=generated_at,
    )
# ---------------------------------------------------------------------------
# Persist recommendation to PostgreSQL
# ---------------------------------------------------------------------------
# Insert one recommendations row and return its generated id.
# $7 (invalidation_conditions) is passed as JSON text and cast to jsonb.
_INSERT_RECOMMENDATION = """
INSERT INTO recommendations (
ticker, action, mode, confidence, time_horizon,
thesis, invalidation_conditions, portfolio_pct, max_loss_pct,
model_version, model_provider, prompt_version, schema_version,
risk_classification, generated_at
) VALUES (
$1, $2, $3, $4, $5,
$6, $7::jsonb, $8, $9,
$10, $11, $12, $13,
$14, $15
)
RETURNING id
"""
# One citation row per evidence document; $2 must be a UUID string.
_INSERT_REC_EVIDENCE = """
INSERT INTO recommendation_evidence (
recommendation_id, document_id, evidence_type, weight
) VALUES ($1, $2::uuid, $3, $4)
"""
# Audit row for the eligibility decision; $4 and $5 are JSON text cast to jsonb.
_INSERT_RISK_EVALUATION = """
INSERT INTO risk_evaluations (
recommendation_id, eligible, allowed_mode, rejection_reasons, risk_checks, evaluated_at
) VALUES ($1::uuid, $2, $3, $4::jsonb, $5::jsonb, $6)
"""
# A single recommendation by id (without evidence).
_FETCH_RECOMMENDATION = """
SELECT
id, ticker, action, mode, confidence, time_horizon,
thesis, invalidation_conditions, portfolio_pct, max_loss_pct,
model_version, model_provider, prompt_version, schema_version,
risk_classification, generated_at
FROM recommendations
WHERE id = $1::uuid
"""
# Citations for one recommendation, ordered by evidence_type, then weight
# descending.
_FETCH_REC_EVIDENCE = """
SELECT document_id, evidence_type, weight
FROM recommendation_evidence
WHERE recommendation_id = $1::uuid
ORDER BY evidence_type, weight DESC
"""
# Newest-first recommendations for a ticker ($1), at most $2 rows, across all
# time horizons (no per-window de-duplication).
_FETCH_LATEST_RECS_FOR_TICKER = """
SELECT
id, ticker, action, mode, confidence, time_horizon,
thesis, invalidation_conditions, portfolio_pct, max_loss_pct,
model_version, model_provider, prompt_version, schema_version,
risk_classification, generated_at
FROM recommendations
WHERE ticker = $1
ORDER BY generated_at DESC
LIMIT $2
"""
def _extract_risk_classification(thesis: str) -> str:
"""Extract the risk classification from the thesis prefix."""
if thesis.startswith("[risk:"):
end = thesis.find("]")
if end > 6:
return thesis[6:end]
return "moderate"
async def persist_recommendation(
    pool: asyncpg.Pool,
    rec: Recommendation,
    supporting_ids: list[str],
    opposing_ids: list[str],
    eligibility_result: EligibilityResult | None = None,
) -> str:
    """Insert a recommendation, its evidence citations, and its risk evaluation.

    All writes run inside one transaction on a single pooled connection.
    A failure part-way through (e.g. an evidence id that is not a valid
    UUID) therefore rolls back the recommendation row. The database never
    holds a recommendation without its audit trail (Requirement 8.3).

    Args:
        pool: asyncpg connection pool.
        rec: Recommendation to store. Its risk classification is parsed
            from the ``[risk:...]`` prefix of ``rec.thesis``.
        supporting_ids: Supporting document ids (UUID strings), in rank order.
        opposing_ids: Opposing document ids (UUID strings), in rank order.
        eligibility_result: When provided, the eligibility decision is also
            written to ``risk_evaluations``.

    Returns:
        The new recommendation id as a string.

    Raises:
        Any asyncpg/database error; the transaction is rolled back first.
    """
    risk_class = _extract_risk_classification(rec.thesis)

    async with pool.acquire() as conn:
        async with conn.transaction():
            row = await conn.fetchrow(
                _INSERT_RECOMMENDATION,
                rec.ticker,
                rec.action.value,
                rec.mode.value,
                rec.confidence,
                rec.time_horizon,
                rec.thesis,
                json.dumps(rec.invalidation_conditions),
                rec.position_sizing.portfolio_pct,
                rec.position_sizing.max_loss_pct,
                rec.model_metadata.model_name,
                rec.model_metadata.provider,
                rec.model_metadata.prompt_version,
                rec.model_metadata.schema_version,
                risk_class,
                rec.generated_at,
            )
            rec_id = str(row["id"])

            # Rank-decayed citation weight: 1.0 for the top-ranked document,
            # then 1 / (1 + 0.1 * rank), rounded to 4 decimals. Supporting
            # rows are emitted before opposing rows.
            evidence_rows: list[tuple[str, str, str, float]] = [
                (rec_id, doc_id, evidence_type, round(1.0 / (1.0 + idx * 0.1), 4))
                for evidence_type, doc_ids in (
                    ("supporting", supporting_ids),
                    ("opposing", opposing_ids),
                )
                for idx, doc_id in enumerate(doc_ids)
            ]
            if evidence_rows:
                await conn.executemany(_INSERT_REC_EVIDENCE, evidence_rows)

            # Persist the eligibility/risk evaluation for the audit trail.
            if eligibility_result is not None:
                rejection_reasons_json = json.dumps(
                    [r.value for r in eligibility_result.rejection_reasons]
                )
                risk_checks = {
                    "time_horizon": eligibility_result.time_horizon,
                    "position_sizing": {
                        "portfolio_pct": eligibility_result.position_sizing.portfolio_pct,
                        "max_loss_pct": eligibility_result.position_sizing.max_loss_pct,
                    },
                    "invalidation_conditions": eligibility_result.invalidation_conditions,
                    "risk_classification": risk_class,
                }
                await conn.execute(
                    _INSERT_RISK_EVALUATION,
                    rec_id,
                    eligibility_result.eligible,
                    eligibility_result.mode.value,
                    rejection_reasons_json,
                    json.dumps(risk_checks),
                    rec.generated_at,
                )

    return rec_id
async def fetch_recommendation_by_id(
    pool: asyncpg.Pool,
    recommendation_id: str,
) -> dict[str, object] | None:
    """Load one persisted recommendation together with its evidence citations.

    Returns:
        ``None`` when no recommendation has the given id. Otherwise, a dict
        of every recommendation column plus an ``"evidence"`` list of
        ``{"document_id", "evidence_type", "weight"}`` dicts.
        ``invalidation_conditions`` is JSON-decoded when it arrives as text.
    """
    record = await pool.fetchrow(_FETCH_RECOMMENDATION, recommendation_id)
    if record is None:
        return None

    recommendation = dict(record)
    conditions = recommendation.get("invalidation_conditions")
    if isinstance(conditions, str):
        recommendation["invalidation_conditions"] = json.loads(conditions)

    citations = await pool.fetch(_FETCH_REC_EVIDENCE, recommendation_id)
    recommendation["evidence"] = [
        {
            "document_id": str(citation["document_id"]),
            "evidence_type": citation["evidence_type"],
            "weight": float(citation["weight"]),
        }
        for citation in citations
    ]
    return recommendation
async def fetch_latest_recommendations(
    pool: asyncpg.Pool,
    ticker: str,
    limit: int = 10,
) -> list[dict[str, object]]:
    """Return up to ``limit`` recommendations for ``ticker``, newest first.

    Evidence citations are not included; use ``fetch_recommendation_by_id``
    for the full record. ``invalidation_conditions`` is JSON-decoded when
    it arrives as text.
    """

    def _as_dict(record: asyncpg.Record) -> dict[str, object]:
        item = dict(record)
        conditions = item.get("invalidation_conditions")
        if isinstance(conditions, str):
            item["invalidation_conditions"] = json.loads(conditions)
        return item

    records = await pool.fetch(_FETCH_LATEST_RECS_FOR_TICKER, ticker, limit)
    return [_as_dict(record) for record in records]
# ---------------------------------------------------------------------------
# Main entry point: generate recommendation for a ticker
# ---------------------------------------------------------------------------
_DEDUP_CHECK_QUERY = """
SELECT r.id, r.confidence, r.action, r.mode, r.generated_at
FROM recommendations r
WHERE r.ticker = $1
AND r.time_horizon LIKE $2 || '%'
ORDER BY r.generated_at DESC
LIMIT 1
"""
async def _is_duplicate_recommendation(
    pool: asyncpg.Pool,
    ticker: str,
    summary: TrendSummary,
    result: EligibilityResult,
) -> bool:
    """Return True when the latest comparable recommendation is effectively identical.

    The comparable recommendation is the newest one for ``ticker`` whose
    stored ``time_horizon`` begins with ``result.time_horizon``. That is
    the value ``build_recommendation`` persists, either verbatim or with a
    ``" (proj:...)"`` suffix. The static window-to-prefix table is used
    only when the result carries no horizon; it maps "1d" and "7d" to the
    same prefix, so it cannot keep those windows apart.

    A match on action and mode, with confidence within 0.01, means the
    underlying trend has not changed meaningfully and regeneration can be
    skipped.
    """
    horizon_prefix = result.time_horizon or _map_time_horizon_prefix(summary.window.value)
    row = await pool.fetchrow(_DEDUP_CHECK_QUERY, ticker, horizon_prefix)
    if row is None:
        return False

    prev_confidence = float(row["confidence"])
    prev_action = row["action"]
    prev_mode = row["mode"]

    is_duplicate = (
        prev_action == result.action.value
        and prev_mode == result.mode.value
        and abs(prev_confidence - summary.confidence) < 0.01
    )
    if is_duplicate:
        logger.info(
            "Skipping duplicate recommendation for %s — action=%s mode=%s "
            "confidence=%.3f matches previous",
            ticker, prev_action, prev_mode, prev_confidence,
        )
    return is_duplicate
def _map_time_horizon_prefix(window: str) -> str:
"""Map window to the time_horizon prefix for dedup matching."""
mapping = {
"intraday": "intraday",
"1d": "swing_1d",
"7d": "swing_1d",
"30d": "position_10d",
"90d": "position_30d",
}
return mapping.get(window, "window_")
async def generate_recommendation(
    pool: asyncpg.Pool,
    ticker: str,
    window: str = TrendWindow.SEVEN_DAY.value,
    config: EligibilityConfig | None = None,
    reference_time: datetime | None = None,
    ollama_config: OllamaConfig | None = None,
    suppression_config: SuppressionConfig | None = None,
    minio_client: Minio | None = None,
) -> Recommendation | None:
    """Generate and persist a recommendation for a ticker from its latest trend.

    Steps:
      1. Fetch the latest trend summary for the ticker + window.
      2. Evaluate eligibility using deterministic rules.
      3. Fetch the latest trend projection (Requirements 12.8, 12.9).
      4. Evaluate data-quality suppression (Requirement 7.4). A suppressed
         result is marked ineligible and forced to informational mode.
      5. Skip if the latest comparable recommendation matches the *final*
         (post-suppression) action, mode and confidence.
      6. If ``ollama_config`` is provided, rewrite the deterministic thesis
         via the LLM wording layer.
      7. Build the Recommendation and persist it with its evidence
         citations and risk evaluation.
      8. If ``minio_client`` is provided, publish analytical facts. This is
         best-effort: failures are logged, not raised.

    Returns:
        The Recommendation, or ``None`` when no trend data exists or the
        result duplicates the most recent comparable recommendation.
    """
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)
    cfg = config or EligibilityConfig()
    sup_cfg = suppression_config or SuppressionConfig()

    # 1. Latest trend
    summary = await fetch_latest_trend(pool, ticker, window)
    if summary is None:
        logger.info("No trend data for %s/%s — skipping recommendation", ticker, window)
        return None

    # 2. Deterministic eligibility (the projection does not feed into this).
    result = evaluate_eligibility(summary, cfg)

    # 3. Latest projection. Low-confidence projections are still passed on:
    #    build_thesis / build_recommendation check ``low_confidence`` and
    #    leave the thesis and time_horizon untouched for them (Req. 12.9).
    projection = await fetch_latest_projection(pool, ticker, window)

    # 4. Data-quality suppression (Requirement 7.4)
    quality_ctx = await fetch_data_quality_context(pool, ticker, window)
    suppression = evaluate_suppression(
        summary, quality_ctx=quality_ctx, config=sup_cfg, reference_time=reference_time,
    )
    if suppression.suppressed:
        result = EligibilityResult(
            eligible=False,
            action=result.action,
            mode=RecommendationMode.INFORMATIONAL,
            position_sizing=result.position_sizing,
            rejection_reasons=result.rejection_reasons,
            time_horizon=result.time_horizon,
            invalidation_conditions=result.invalidation_conditions,
        )

    # 5. Dedup against the FINAL result. Persisted rows store the
    #    post-suppression mode, so comparing the pre-suppression result
    #    would (a) skip persisting a newly suppressed downgrade to
    #    informational, and (b) fail to dedup a ticker that stays
    #    suppressed unless eligibility itself chose informational mode.
    #    Running it before the LLM step avoids wasted rewrites.
    if await _is_duplicate_recommendation(pool, ticker, summary, result):
        return None

    # 6. Optional LLM thesis rewrite
    llm_thesis: str | None = None
    if ollama_config is not None:
        deterministic_thesis = build_thesis(summary, result, projection=projection)
        llm_thesis = await rewrite_thesis_with_llm(
            deterministic_thesis=deterministic_thesis,
            summary=summary,
            config=ollama_config,
        )
        # Identical text means the wording layer fell back; treat as no rewrite.
        if llm_thesis == deterministic_thesis:
            llm_thesis = None

    # 7. Build and persist recommendation, evidence citations, risk evaluation
    rec = build_recommendation(
        summary, result, reference_time, llm_thesis=llm_thesis,
        suppression_result=suppression,
        projection=projection,
    )
    rec_id = await persist_recommendation(
        pool,
        rec,
        supporting_ids=list(summary.top_supporting_evidence),
        opposing_ids=list(summary.top_opposing_evidence),
        eligibility_result=result,
    )

    # 8. Publish prediction facts to analytical tables (Requirement 9.4)
    if minio_client is not None:
        try:
            lake_refs = publish_recommendation_facts(
                minio_client,
                rec,
                trend_direction=summary.trend_direction.value,
                trend_strength=summary.trend_strength,
            )
            logger.info(
                "Published analytical facts for %s: %s",
                ticker, lake_refs,
            )
        except Exception:
            # Best-effort: the recommendation is already persisted.
            logger.warning(
                "Failed to publish analytical facts for %s/%s — recommendation "
                "persisted but lake publication failed",
                ticker, rec_id, exc_info=True,
            )

    logger.info(
        "Generated recommendation %s for %s: action=%s mode=%s confidence=%.3f "
        "eligible=%s suppressed=%s quality_score=%.3f llm_thesis=%s projection=%s",
        rec_id, ticker, rec.action.value, rec.mode.value, rec.confidence,
        result.eligible, suppression.suppressed, suppression.data_quality_score,
        llm_thesis is not None,
        projection.projected_direction if projection else "none",
    )

    # Prometheus metrics
    RECOMMENDATION_GENERATED.labels(action=rec.action.value, mode=rec.mode.value).inc()
    RECOMMENDATION_CONFIDENCE.observe(rec.confidence)
    if suppression.suppressed:
        RECOMMENDATION_SUPPRESSED.inc()
    return rec
# ---------------------------------------------------------------------------
# Batch: generate recommendations for multiple tickers
# ---------------------------------------------------------------------------
async def generate_recommendations_batch(
    pool: asyncpg.Pool,
    tickers: list[str],
    window: str = TrendWindow.SEVEN_DAY.value,
    config: EligibilityConfig | None = None,
    ollama_config: OllamaConfig | None = None,
    suppression_config: SuppressionConfig | None = None,
    minio_client: Minio | None = None,
) -> list[Recommendation]:
    """Generate recommendations for a list of tickers.

    Tickers are processed sequentially with one shared ``reference_time``.
    Only successfully generated recommendations are returned. The
    following tickers are skipped:
      * tickers with no trend data;
      * tickers whose result duplicates the latest recommendation;
      * tickers whose generation raises. The error is logged, and the rest
        of the batch continues.

    If ``ollama_config`` is provided, each recommendation's thesis is
    rewritten using the LLM wording layer.
    """
    results: list[Recommendation] = []
    failed: list[str] = []
    reference_time = datetime.now(timezone.utc)
    for ticker in tickers:
        try:
            rec = await generate_recommendation(
                pool, ticker, window, config, reference_time,
                ollama_config=ollama_config,
                suppression_config=suppression_config,
                minio_client=minio_client,
            )
        except Exception:
            # Isolate per-ticker failures (e.g. a DB error or malformed trend
            # row). Catching Exception, not BaseException, lets task
            # cancellation (asyncio.CancelledError on Python 3.8+) propagate.
            logger.exception(
                "Recommendation generation failed for %s/%s — skipping ticker",
                ticker, window,
            )
            failed.append(ticker)
            continue
        if rec is not None:
            results.append(rec)

    logger.info(
        "Batch recommendation: %d/%d tickers produced recommendations",
        len(results), len(tickers),
    )
    if failed:
        logger.warning(
            "Batch recommendation: %d/%d tickers failed: %s",
            len(failed), len(tickers), ", ".join(failed),
        )
    return results