4e010bc048
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
Implement full probabilistic signal processing pipeline gated behind probabilistic_scoring_enabled feature flag in risk_configs: - Bayesian log-likelihood accumulator with Beta posterior and entropy - Regime detector (trend-following, panic, mean-reversion, uncertainty) - Source accuracy tracker with per-source historical prediction accuracy - Sigmoid confidence gate replacing binary gate - Information gain surprise weighting for rare events - Adaptive recency decay with event-specific half-lives - Regime multiplier replacing market context multiplier - Weighted disagreement entropy for contradiction detection - Multiplicative macro exposure with conditional integration - Graph-distance attenuated competitive signal propagation - Exponentially weighted momentum with volatility scaling - Expected value recommendation gate All changes backward-compatible: flag=false preserves exact current behavior. New outputs stored in existing JSONB columns (no schema changes except source_accuracy table via migration 034). Tests: 26 property-based tests (14 correctness properties), 99 unit tests, 1789 total tests passing with zero regressions.
928 lines
32 KiB
Python
928 lines
32 KiB
Python
"""Recommendation worker - generates explainable trade recommendations from trend data.
|
|
|
|
Fetches the latest trend summaries for a ticker, evaluates eligibility
|
|
using deterministic rules, builds Recommendation objects with thesis
|
|
and evidence citations, and persists them to PostgreSQL.
|
|
|
|
Requirements: 7.1, 7.2, 7.3, 7.4
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import uuid as _uuid
|
|
from datetime import datetime, timezone
|
|
|
|
import asyncpg
|
|
from minio import Minio
|
|
|
|
from services.aggregation.projection import TrendProjection
|
|
from services.lake_publisher.worker import publish_recommendation_facts
|
|
from services.recommendation.eligibility import (
|
|
EligibilityConfig,
|
|
EligibilityResult,
|
|
evaluate_eligibility,
|
|
)
|
|
from services.recommendation.suppression import (
|
|
DataQualityContext,
|
|
SuppressionConfig,
|
|
SuppressionResult,
|
|
evaluate_suppression,
|
|
)
|
|
from services.recommendation.thesis_llm import (
|
|
THESIS_PROMPT_VERSION,
|
|
rewrite_thesis_with_llm,
|
|
)
|
|
from services.shared.config import OllamaConfig
|
|
from services.shared.metrics import (
|
|
RECOMMENDATION_CONFIDENCE,
|
|
RECOMMENDATION_GENERATED,
|
|
RECOMMENDATION_SUPPRESSED,
|
|
)
|
|
from services.shared.schemas import (
|
|
ModelMetadata,
|
|
PositionSizing,
|
|
Recommendation,
|
|
RecommendationMode,
|
|
TrendDirection,
|
|
TrendSummary,
|
|
TrendWindow,
|
|
)
|
|
|
|
# Module-level logger used by every function in this worker.
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch latest trend summary for a ticker + window
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch data quality context for suppression checks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_DATA_QUALITY_QUERY = """
|
|
WITH latest_trend AS (
|
|
SELECT top_supporting_evidence, top_opposing_evidence
|
|
FROM trend_windows
|
|
WHERE entity_id = $1 AND "window" = $2
|
|
ORDER BY generated_at DESC
|
|
LIMIT 1
|
|
),
|
|
evidence_ids AS (
|
|
SELECT jsonb_array_elements_text(
|
|
COALESCE(lt.top_supporting_evidence, '[]'::jsonb)
|
|
|| COALESCE(lt.top_opposing_evidence, '[]'::jsonb)
|
|
) AS eid
|
|
FROM latest_trend lt
|
|
)
|
|
SELECT
|
|
COUNT(*) AS total_documents,
|
|
COUNT(*) FILTER (WHERE di.validation_status = 'valid') AS valid_documents,
|
|
COUNT(*) FILTER (WHERE di.validation_status = 'failed') AS failed_documents,
|
|
AVG(di.confidence) FILTER (WHERE di.validation_status = 'valid') AS avg_extraction_confidence,
|
|
MAX(d.published_at) AS newest_evidence_at,
|
|
ARRAY_AGG(DISTINCT d.source_type) FILTER (WHERE d.source_type IS NOT NULL) AS source_types
|
|
FROM documents d
|
|
JOIN document_intelligence di ON di.document_id = d.id
|
|
WHERE d.id::text IN (SELECT eid FROM evidence_ids)
|
|
"""
|
|
|
|
|
|
async def fetch_data_quality_context(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
) -> DataQualityContext | None:
    """Summarise extraction quality for the evidence behind the latest trend.

    Args:
        pool: asyncpg connection pool.
        ticker: Entity ID whose latest trend row is inspected.
        window: Trend window value (e.g. ``"7d"``).

    Returns:
        A ``DataQualityContext``, or ``None`` when no evidence documents are
        found or anything goes wrong. In both cases the suppression module
        falls back to estimating quality from the trend summary itself.
    """
    try:
        record = await pool.fetchrow(_DATA_QUALITY_QUERY, ticker, window)
        if record is None or record["total_documents"] == 0:
            return None

        # ARRAY_AGG yields NULL when no row has a source_type.
        return DataQualityContext(
            total_documents=int(record["total_documents"]),
            valid_documents=int(record["valid_documents"] or 0),
            failed_documents=int(record["failed_documents"] or 0),
            avg_extraction_confidence=float(record["avg_extraction_confidence"] or 0.0),
            newest_evidence_at=record["newest_evidence_at"],
            source_types=set(record["source_types"] or ()),
        )
    except Exception:
        # Best-effort: a quality lookup failure must not block recommendations.
        logger.warning(
            "Failed to fetch data quality context for %s/%s — will use summary fallback",
            ticker, window, exc_info=True,
        )
        return None
|
|
|
|
|
|
_LATEST_TREND_QUERY = """
|
|
SELECT
|
|
entity_type, entity_id, "window", trend_direction, trend_strength,
|
|
confidence, top_supporting_evidence, top_opposing_evidence,
|
|
dominant_catalysts, material_risks, contradiction_score,
|
|
disagreement_details, market_context, generated_at
|
|
FROM trend_windows
|
|
WHERE entity_id = $1 AND "window" = $2
|
|
ORDER BY generated_at DESC
|
|
LIMIT 1
|
|
"""
|
|
|
|
|
|
def _parse_trend_row(row: asyncpg.Record) -> TrendSummary:
    """Build a ``TrendSummary`` from one ``trend_windows`` record.

    The four JSONB list columns may come back either already decoded or as
    raw JSON text; text is decoded here. Missing or empty values become
    ``[]``, and a NULL contradiction score becomes ``0.0``.
    """

    def _json_list(column: str) -> list:
        # Decode JSON text if needed, normalising falsy values to [].
        value = row[column]
        if isinstance(value, str):
            value = json.loads(value)
        return value or []

    # Decode the JSON columns before building enums/floats below.
    supporting, opposing, catalysts, risks = (
        _json_list(name)
        for name in (
            "top_supporting_evidence",
            "top_opposing_evidence",
            "dominant_catalysts",
            "material_risks",
        )
    )

    return TrendSummary(
        entity_type=row["entity_type"],
        entity_id=row["entity_id"],
        window=TrendWindow(row["window"]),
        trend_direction=TrendDirection(row["trend_direction"]),
        trend_strength=float(row["trend_strength"]),
        confidence=float(row["confidence"]),
        top_supporting_evidence=supporting,
        top_opposing_evidence=opposing,
        dominant_catalysts=catalysts,
        material_risks=risks,
        contradiction_score=float(row["contradiction_score"] or 0.0),
        generated_at=row["generated_at"],
    )
|
|
|
|
|
|
async def fetch_latest_trend(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
) -> TrendSummary | None:
    """Return the newest trend summary for ``ticker``/``window``, or ``None`` if absent."""
    record = await pool.fetchrow(_LATEST_TREND_QUERY, ticker, window)
    return None if record is None else _parse_trend_row(record)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fetch latest trend projection for a ticker + window
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_LATEST_PROJECTION_QUERY = """
|
|
SELECT
|
|
tp.projected_direction, tp.projected_strength, tp.projected_confidence,
|
|
tp.projection_horizon, tp.driving_factors, tp.macro_contribution_pct,
|
|
tp.diverges_from_current, tp.computed_at
|
|
FROM trend_projections tp
|
|
JOIN trend_windows tw ON tw.id = tp.trend_window_id
|
|
WHERE tw.entity_id = $1 AND tw."window" = $2
|
|
ORDER BY tp.computed_at DESC
|
|
LIMIT 1
|
|
"""
|
|
|
|
|
|
async def fetch_latest_projection(
    pool: asyncpg.Pool,
    ticker: str,
    window: str,
) -> TrendProjection | None:
    """Return the newest trend projection for ``ticker``/``window``.

    Projections with ``projected_confidence`` below 0.3 are still returned,
    but are flagged ``low_confidence=True`` so callers can decide whether to
    use them (Requirement 12.9). Any failure is logged and reported as
    ``None``, so recommendation generation proceeds without a projection.
    """
    try:
        record = await pool.fetchrow(_LATEST_PROJECTION_QUERY, ticker, window)
        if record is None:
            return None

        factors = record["driving_factors"]
        if isinstance(factors, str):
            # JSONB may arrive as undecoded text.
            factors = json.loads(factors)

        confidence = float(record["projected_confidence"])
        return TrendProjection(
            projected_direction=record["projected_direction"],
            projected_strength=float(record["projected_strength"]),
            projected_confidence=confidence,
            projection_horizon=record["projection_horizon"],
            driving_factors=factors or [],
            macro_contribution_pct=float(record["macro_contribution_pct"] or 0.0),
            diverges_from_current=bool(record["diverges_from_current"]),
            computed_at=record["computed_at"],
            low_confidence=confidence < 0.3,
        )
    except Exception:
        logger.warning(
            "Failed to fetch projection for %s/%s — continuing without projection",
            ticker, window, exc_info=True,
        )
        return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Build thesis from trend summary (deterministic, no LLM)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def build_thesis(
    summary: TrendSummary,
    result: EligibilityResult,
    projection: TrendProjection | None = None,
) -> str:
    """Render the deterministic, rule-based thesis for a trend (no LLM involved).

    Sentences are emitted descriptive-first and prescriptive-last
    (Requirement 7.2). The order is:

    1. trend headline;
    2. up to three catalysts;
    3. a signal-disagreement note when the contradiction score exceeds 0.15;
    4. forward projection sentences;
    5. up to two risks;
    6. evidence counts;
    7. the recommended action and mode.

    The projection sentences appear only when a projection is given and is
    not flagged ``low_confidence`` (Requirement 12.8). They include up to two
    driving factors, skipping any that start with ``"DIVERGENCE:"``.

    Returns:
        All sentences joined by single spaces.
    """
    trend_label = summary.trend_direction.value
    sentences: list[str] = [
        f"{summary.entity_id} shows a {trend_label} trend over the "
        f"{summary.window.value} window with strength "
        f"{summary.trend_strength:.2f} and confidence {summary.confidence:.2f}."
    ]

    catalysts = summary.dominant_catalysts
    if catalysts:
        sentences.append("Dominant catalysts: " + ", ".join(catalysts[:3]) + ".")

    disagreement = summary.contradiction_score
    if disagreement > 0.15:
        sentences.append(
            "Notable signal disagreement detected "
            f"(contradiction score: {disagreement:.2f})."
        )

    if projection is not None and not projection.low_confidence:
        sentences.append(
            f"Forward projection ({projection.projection_horizon}): "
            f"{projection.projected_direction} at strength "
            f"{projection.projected_strength:.2f}."
        )
        drivers = [
            factor
            for factor in projection.driving_factors
            if not factor.startswith("DIVERGENCE:")
        ][:2]
        if drivers:
            sentences.append("Key drivers: " + "; ".join(drivers) + ".")
        if projection.diverges_from_current:
            sentences.append(
                f"Note: projection diverges from current {trend_label} trend."
            )

    risks = summary.material_risks
    if risks:
        sentences.append("Key risks: " + "; ".join(risks[:2]) + ".")

    sentences.append(
        f"Based on {len(summary.top_supporting_evidence)} supporting and "
        f"{len(summary.top_opposing_evidence)} opposing evidence documents."
    )

    # Prescriptive part, kept last (Requirement 7.2).
    action_label = result.action.value.upper()
    mode_label = result.mode.value.replace("_", " ")
    sentences.append(f"Recommendation: {action_label} ({mode_label}).")

    return " ".join(sentences)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Build risk classification (Requirement 7.2)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def classify_risk(
    summary: TrendSummary,
    result: EligibilityResult,
) -> str:
    """Map signal quality onto a coarse risk label.

    The score is the sum of four terms:

    * ``contradiction_score * 2.0``;
    * ``(1 - confidence) * 1.5``;
    * a thin-evidence penalty: +1.0 for fewer than 3 evidence documents,
      +0.5 for fewer than 5;
    * +0.5 per eligibility rejection reason.

    Returns:
        ``"very_high"`` for a score >= 3, ``"high"`` for >= 2, ``"moderate"``
        for >= 1, otherwise ``"low"``.
    """
    n_evidence = len(summary.top_supporting_evidence) + len(summary.top_opposing_evidence)
    if n_evidence < 3:
        thin_evidence_penalty = 1.0
    elif n_evidence < 5:
        thin_evidence_penalty = 0.5
    else:
        thin_evidence_penalty = 0.0

    # Summed left-to-right in the same order as the terms listed above.
    score = (
        0.0
        + summary.contradiction_score * 2.0
        + (1.0 - summary.confidence) * 1.5
        + thin_evidence_penalty
        + len(result.rejection_reasons) * 0.5
    )

    for floor, label in ((3.0, "very_high"), (2.0, "high"), (1.0, "moderate")):
        if score >= floor:
            return label
    return "low"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Build Recommendation from eligibility result
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def build_recommendation(
    summary: TrendSummary,
    result: EligibilityResult,
    reference_time: datetime | None = None,
    llm_thesis: str | None = None,
    suppression_result: SuppressionResult | None = None,
    projection: TrendProjection | None = None,
) -> Recommendation:
    """Assemble the final ``Recommendation`` from a trend and its eligibility decision.

    Evidence refs are the supporting IDs followed by the opposing IDs, so the
    full decision trace travels with the recommendation (Requirement 8.3).

    Thesis text:

    * A non-empty ``llm_thesis`` replaces the deterministic thesis. The
      ``[risk:<label>]`` prefix from ``classify_risk`` is always kept.
    * A ``[SUPPRESSED: ...]`` note is appended when ``suppression_result``
      reports suppression (Requirement 7.4).
    * A ``projection`` that is not low-confidence feeds the deterministic
      thesis and appends ``(proj:<horizon>)`` to the time horizon
      (Requirement 12.8).

    ``reference_time`` becomes ``generated_at`` and defaults to the current
    UTC time. Model metadata records whether the LLM or the deterministic
    path produced the thesis.
    """
    generated_at = datetime.now(timezone.utc) if reference_time is None else reference_time
    evidence_refs = [*summary.top_supporting_evidence, *summary.top_opposing_evidence]

    rule_based_thesis = build_thesis(summary, result, projection=projection)
    risk_class = classify_risk(summary, result)
    body = llm_thesis or rule_based_thesis

    if suppression_result and suppression_result.suppressed:
        reasons = ", ".join(reason.value for reason in suppression_result.reasons)
        score = suppression_result.data_quality_score
        body = (
            f"{body} [SUPPRESSED: data quality below threshold "
            f"(score={score:.2f}, reasons={reasons})]"
        )

    use_projection = projection is not None and not projection.low_confidence
    horizon = (
        f"{result.time_horizon} (proj:{projection.projection_horizon})"
        if use_projection
        else result.time_horizon
    )

    # Audit trail: record which path produced the thesis wording.
    if llm_thesis:
        provider, model_name, prompt_version = "ollama", "thesis-rewrite", THESIS_PROMPT_VERSION
    else:
        provider, model_name, prompt_version = "deterministic", "eligibility-v1", ""

    sizing = result.position_sizing
    return Recommendation(
        ticker=summary.entity_id,
        action=result.action,
        mode=result.mode,
        confidence=summary.confidence,
        time_horizon=horizon,
        thesis=f"[risk:{risk_class}] {body}",
        invalidation_conditions=result.invalidation_conditions,
        position_sizing=PositionSizing(
            portfolio_pct=sizing.portfolio_pct,
            max_loss_pct=sizing.max_loss_pct,
        ),
        evidence_refs=evidence_refs,
        model_metadata=ModelMetadata(
            provider=provider,
            model_name=model_name,
            prompt_version=prompt_version,
            schema_version="1.0.0",
        ),
        generated_at=generated_at,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Persist recommendation to PostgreSQL
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Insert one recommendations row and return its generated id. The positional
# parameters must match the argument order in persist_recommendation; $7
# (invalidation_conditions) arrives as a JSON string and is cast to jsonb.
# NOTE: the model_version column is populated from ModelMetadata.model_name.
_INSERT_RECOMMENDATION = """
INSERT INTO recommendations (
    ticker, action, mode, confidence, time_horizon,
    thesis, invalidation_conditions, portfolio_pct, max_loss_pct,
    model_version, model_provider, prompt_version, schema_version,
    risk_classification, generated_at
) VALUES (
    $1, $2, $3, $4, $5,
    $6, $7::jsonb, $8, $9,
    $10, $11, $12, $13,
    $14, $15
)
RETURNING id
"""

# One evidence citation per row: (recommendation_id, document_id as uuid text,
# "supporting" | "opposing", rank-decayed weight).
_INSERT_REC_EVIDENCE = """
INSERT INTO recommendation_evidence (
    recommendation_id, document_id, evidence_type, weight
) VALUES ($1, $2::uuid, $3, $4)
"""

# Audit record of the eligibility decision; $4 and $5 are JSON strings.
_INSERT_RISK_EVALUATION = """
INSERT INTO risk_evaluations (
    recommendation_id, eligible, allowed_mode, rejection_reasons, risk_checks, evaluated_at
) VALUES ($1::uuid, $2, $3, $4::jsonb, $5::jsonb, $6)
"""

# Single recommendation by id ($1 is cast to uuid, so a malformed id string
# raises in the database rather than returning no row).
_FETCH_RECOMMENDATION = """
SELECT
    id, ticker, action, mode, confidence, time_horizon,
    thesis, invalidation_conditions, portfolio_pct, max_loss_pct,
    model_version, model_provider, prompt_version, schema_version,
    risk_classification, generated_at
FROM recommendations
WHERE id = $1::uuid
"""

# Evidence citations for one recommendation, grouped by type, heaviest first.
_FETCH_REC_EVIDENCE = """
SELECT document_id, evidence_type, weight
FROM recommendation_evidence
WHERE recommendation_id = $1::uuid
ORDER BY evidence_type, weight DESC
"""

# Newest $2 recommendations for ticker $1 (no evidence join).
_FETCH_LATEST_RECS_FOR_TICKER = """
SELECT
    id, ticker, action, mode, confidence, time_horizon,
    thesis, invalidation_conditions, portfolio_pct, max_loss_pct,
    model_version, model_provider, prompt_version, schema_version,
    risk_classification, generated_at
FROM recommendations
WHERE ticker = $1
ORDER BY generated_at DESC
LIMIT $2
"""
|
|
|
|
|
|
def _extract_risk_classification(thesis: str) -> str:
|
|
"""Extract the risk classification from the thesis prefix."""
|
|
if thesis.startswith("[risk:"):
|
|
end = thesis.find("]")
|
|
if end > 6:
|
|
return thesis[6:end]
|
|
return "moderate"
|
|
|
|
|
|
async def persist_recommendation(
    pool: asyncpg.Pool,
    rec: Recommendation,
    supporting_ids: list[str],
    opposing_ids: list[str],
    eligibility_result: EligibilityResult | None = None,
) -> str:
    """Write a recommendation, its evidence citations and its risk evaluation.

    Model metadata and the risk label parsed from the thesis prefix are
    stored for the audit trail (Requirement 8.3). Evidence inserts are
    best-effort: a failure is logged and does not abort the call. The
    ``risk_evaluations`` row is written only when ``eligibility_result`` is
    given.

    Returns:
        The new recommendation's id as a string.
    """
    risk_class = _extract_risk_classification(rec.thesis)
    sizing = rec.position_sizing
    meta = rec.model_metadata

    inserted = await pool.fetchrow(
        _INSERT_RECOMMENDATION,
        rec.ticker,
        rec.action.value,
        rec.mode.value,
        rec.confidence,
        rec.time_horizon,
        rec.thesis,
        json.dumps(rec.invalidation_conditions),
        sizing.portfolio_pct,
        sizing.max_loss_pct,
        meta.model_name,
        meta.provider,
        meta.prompt_version,
        meta.schema_version,
        risk_class,
        rec.generated_at,
    )
    rec_id = str(inserted["id"])

    def _citations(doc_ids: list[str], evidence_type: str):
        # Skip non-UUID IDs (e.g. synthetic "pattern:..." IDs from competitive
        # signal propagation): recommendation_evidence needs a valid UUID FK.
        # The rank still counts skipped IDs, so weights reflect list position.
        for rank, doc_id in enumerate(doc_ids):
            try:
                _uuid.UUID(doc_id)
            except (ValueError, AttributeError):
                continue
            yield (rec_id, doc_id, evidence_type, round(1.0 / (1.0 + rank * 0.1), 4))

    evidence_rows: list[tuple[str, str, str, float]] = [
        *_citations(supporting_ids, "supporting"),
        *_citations(opposing_ids, "opposing"),
    ]

    if evidence_rows:
        try:
            await pool.executemany(_INSERT_REC_EVIDENCE, evidence_rows)
        except Exception:
            logger.warning(
                "Failed to insert %d evidence rows for recommendation %s",
                len(evidence_rows),
                rec_id,
                exc_info=True,
            )

    if eligibility_result is None:
        return rec_id

    decision = eligibility_result
    rejection_reasons_json = json.dumps([reason.value for reason in decision.rejection_reasons])
    risk_checks = {
        "time_horizon": decision.time_horizon,
        "position_sizing": {
            "portfolio_pct": decision.position_sizing.portfolio_pct,
            "max_loss_pct": decision.position_sizing.max_loss_pct,
        },
        "invalidation_conditions": decision.invalidation_conditions,
        "risk_classification": risk_class,
    }
    if decision.pipeline_mode == "probabilistic":
        # Probabilistic EV outputs go into the risk_checks JSONB (Req 16.2).
        # NOTE(review): ev_threshold is hard-coded here; confirm it matches the
        # threshold the eligibility EV gate actually applies.
        risk_checks.update(
            ev=decision.ev_value,
            p_bull=decision.p_bull,
            pipeline_mode=decision.pipeline_mode,
            ev_threshold=0.005,
        )

    await pool.execute(
        _INSERT_RISK_EVALUATION,
        rec_id,
        decision.eligible,
        decision.mode.value,
        rejection_reasons_json,
        json.dumps(risk_checks),
        rec.generated_at,
    )
    return rec_id
|
|
|
|
|
|
async def fetch_recommendation_by_id(
|
|
pool: asyncpg.Pool,
|
|
recommendation_id: str,
|
|
) -> dict[str, object] | None:
|
|
"""Fetch a persisted recommendation with its evidence citations.
|
|
|
|
Returns a dict with the recommendation fields and an 'evidence' list,
|
|
or None if not found.
|
|
"""
|
|
row = await pool.fetchrow(_FETCH_RECOMMENDATION, recommendation_id)
|
|
if row is None:
|
|
return None
|
|
|
|
rec_dict = dict(row)
|
|
# Parse JSONB fields
|
|
if isinstance(rec_dict.get("invalidation_conditions"), str):
|
|
rec_dict["invalidation_conditions"] = json.loads(rec_dict["invalidation_conditions"])
|
|
|
|
# Fetch evidence
|
|
evidence_rows = await pool.fetch(_FETCH_REC_EVIDENCE, recommendation_id)
|
|
rec_dict["evidence"] = [
|
|
{
|
|
"document_id": str(e["document_id"]),
|
|
"evidence_type": e["evidence_type"],
|
|
"weight": float(e["weight"]),
|
|
}
|
|
for e in evidence_rows
|
|
]
|
|
|
|
return rec_dict
|
|
|
|
|
|
async def fetch_latest_recommendations(
|
|
pool: asyncpg.Pool,
|
|
ticker: str,
|
|
limit: int = 10,
|
|
) -> list[dict[str, object]]:
|
|
"""Fetch the most recent recommendations for a ticker.
|
|
|
|
Returns a list of recommendation dicts (without evidence — use
|
|
fetch_recommendation_by_id for full detail).
|
|
"""
|
|
rows = await pool.fetch(_FETCH_LATEST_RECS_FOR_TICKER, ticker, limit)
|
|
results = []
|
|
for row in rows:
|
|
rec_dict = dict(row)
|
|
if isinstance(rec_dict.get("invalidation_conditions"), str):
|
|
rec_dict["invalidation_conditions"] = json.loads(rec_dict["invalidation_conditions"])
|
|
results.append(rec_dict)
|
|
return results
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main entry point: generate recommendation for a ticker
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
_DEDUP_CHECK_QUERY = """
|
|
SELECT r.id, r.confidence, r.action, r.mode, r.generated_at
|
|
FROM recommendations r
|
|
WHERE r.ticker = $1
|
|
AND r.time_horizon LIKE $2 || '%'
|
|
ORDER BY r.generated_at DESC
|
|
LIMIT 1
|
|
"""
|
|
|
|
|
|
async def _is_duplicate_recommendation(
    pool: asyncpg.Pool,
    ticker: str,
    summary: TrendSummary,
    result: EligibilityResult,
) -> bool:
    """Report whether the newest stored recommendation already says the same thing.

    The stored recommendation is the latest one for this ticker whose
    time-horizon prefix matches the summary's window. It counts as a
    duplicate when action and mode are equal and confidence differs by less
    than 0.01, i.e. the trend has not moved meaningfully.
    """
    latest = await pool.fetchrow(
        _DEDUP_CHECK_QUERY, ticker, _map_time_horizon_prefix(summary.window.value),
    )
    if latest is None:
        return False

    prev_confidence = float(latest["confidence"])
    checks = (
        latest["action"] == result.action.value,
        latest["mode"] == result.mode.value,
        abs(prev_confidence - summary.confidence) < 0.01,
    )
    if not all(checks):
        return False

    logger.info(
        "Skipping duplicate recommendation for %s — action=%s mode=%s "
        "confidence=%.3f matches previous",
        ticker, latest["action"], latest["mode"], prev_confidence,
    )
    return True
|
|
|
|
|
|
def _map_time_horizon_prefix(window: str) -> str:
|
|
"""Map window to the time_horizon prefix for dedup matching."""
|
|
mapping = {
|
|
"intraday": "intraday",
|
|
"1d": "swing_1d",
|
|
"7d": "swing_1d",
|
|
"30d": "position_10d",
|
|
"90d": "position_30d",
|
|
}
|
|
return mapping.get(window, "window_")
|
|
|
|
|
|
def _force_informational(result: EligibilityResult) -> EligibilityResult:
    """Return ``result`` downgraded to a non-eligible, informational-only decision.

    Applied when data-quality suppression fires (Requirement 7.4). For
    dataclass results, ``dataclasses.replace`` carries every other field
    over unchanged. That includes the probabilistic ``pipeline_mode`` /
    ``ev_value`` / ``p_bull`` attributes that ``persist_recommendation``
    records, so suppressed recommendations keep their EV audit data.
    Otherwise the result is rebuilt from its core fields.
    """
    import dataclasses

    if dataclasses.is_dataclass(result) and not isinstance(result, type):
        return dataclasses.replace(
            result, eligible=False, mode=RecommendationMode.INFORMATIONAL,
        )
    return EligibilityResult(
        eligible=False,
        action=result.action,
        mode=RecommendationMode.INFORMATIONAL,
        position_sizing=result.position_sizing,
        rejection_reasons=result.rejection_reasons,
        time_horizon=result.time_horizon,
        invalidation_conditions=result.invalidation_conditions,
    )


async def generate_recommendation(
    pool: asyncpg.Pool,
    ticker: str,
    window: str = TrendWindow.SEVEN_DAY.value,
    config: EligibilityConfig | None = None,
    reference_time: datetime | None = None,
    ollama_config: OllamaConfig | None = None,
    suppression_config: SuppressionConfig | None = None,
    minio_client: Minio | None = None,
) -> Recommendation | None:
    """Generate and persist a recommendation for a ticker from its latest trend.

    Steps:
        1. Fetch the latest trend summary for the ticker + window.
        2. Evaluate eligibility using deterministic rules.
        3. Evaluate data-quality suppression (Requirement 7.4). If suppressed,
           the decision is forced to a non-eligible informational one.
        4. Skip if the latest stored recommendation for this ticker/horizon
           is effectively identical. This runs *after* suppression so the
           compared mode is the one that actually gets persisted.
        5. Fetch the latest trend projection (Requirement 12.8, 12.9).
        6. If ``ollama_config`` is provided and the recommendation is
           trading-eligible, rewrite the deterministic thesis with the LLM
           wording layer.
        7. Build the Recommendation and persist it with evidence citations
           and the risk evaluation.
        8. Publish analytical facts to the lake when ``minio_client`` is set
           (best-effort).

    Returns:
        The Recommendation, or ``None`` if no trend data exists or the
        recommendation duplicates the most recent one.
    """
    if reference_time is None:
        reference_time = datetime.now(timezone.utc)

    cfg = config or EligibilityConfig()
    sup_cfg = suppression_config or SuppressionConfig()

    # 1. Latest trend
    summary = await fetch_latest_trend(pool, ticker, window)
    if summary is None:
        logger.info("No trend data for %s/%s — skipping recommendation", ticker, window)
        return None

    # 2. Eligibility
    result = evaluate_eligibility(summary, cfg)

    # 3. Data-quality suppression (Requirement 7.4)
    quality_ctx = await fetch_data_quality_context(pool, ticker, window)
    suppression = evaluate_suppression(
        summary, quality_ctx=quality_ctx, config=sup_cfg, reference_time=reference_time,
    )
    if suppression.suppressed:
        result = _force_informational(result)

    # 4. Dedup against the last persisted recommendation. This must compare
    #    the post-suppression mode; otherwise a persistently suppressed ticker
    #    never matches and a new row is written on every run.
    if await _is_duplicate_recommendation(pool, ticker, summary, result):
        return None

    # 5. Projection (Requirement 12.8, 12.9). Low-confidence projections are
    #    still passed along; build_thesis/build_recommendation ignore them.
    projection = await fetch_latest_projection(pool, ticker, window)

    # 6. Optional LLM thesis rewrite, only for trading-eligible
    #    recommendations, to avoid the LLM bottleneck on informational ones.
    llm_thesis: str | None = None
    if ollama_config is not None and result.eligible and not suppression.suppressed:
        deterministic_thesis = build_thesis(summary, result, projection=projection)
        llm_thesis = await rewrite_thesis_with_llm(
            deterministic_thesis=deterministic_thesis,
            summary=summary,
            config=ollama_config,
            pool=pool,
        )
        # Identical text means the rewrite fell back; treat as no rewrite.
        if llm_thesis == deterministic_thesis:
            llm_thesis = None

    # 7. Build and persist
    rec = build_recommendation(
        summary, result, reference_time, llm_thesis=llm_thesis,
        suppression_result=suppression,
        projection=projection,
    )
    rec_id = await persist_recommendation(
        pool,
        rec,
        supporting_ids=list(summary.top_supporting_evidence),
        opposing_ids=list(summary.top_opposing_evidence),
        eligibility_result=result,
    )

    # 8. Analytical facts (Requirement 9.4), best-effort
    if minio_client is not None:
        try:
            lake_refs = publish_recommendation_facts(
                minio_client,
                rec,
                trend_direction=summary.trend_direction.value,
                trend_strength=summary.trend_strength,
            )
            logger.info(
                "Published analytical facts for %s: %s",
                ticker, lake_refs,
            )
        except Exception:
            logger.warning(
                "Failed to publish analytical facts for %s/%s — recommendation "
                "persisted but lake publication failed",
                ticker, rec_id, exc_info=True,
            )

    logger.info(
        "Generated recommendation %s for %s: action=%s mode=%s confidence=%.3f "
        "eligible=%s suppressed=%s quality_score=%.3f llm_thesis=%s projection=%s",
        rec_id, ticker, rec.action.value, rec.mode.value, rec.confidence,
        result.eligible, suppression.suppressed, suppression.data_quality_score,
        llm_thesis is not None,
        projection.projected_direction if projection else "none",
    )

    # Prometheus metrics
    RECOMMENDATION_GENERATED.labels(action=rec.action.value, mode=rec.mode.value).inc()
    RECOMMENDATION_CONFIDENCE.observe(rec.confidence)
    if suppression.suppressed:
        RECOMMENDATION_SUPPRESSED.inc()

    return rec
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Batch: generate recommendations for multiple tickers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def generate_recommendations_batch(
    pool: asyncpg.Pool,
    tickers: list[str],
    window: str = TrendWindow.SEVEN_DAY.value,
    config: EligibilityConfig | None = None,
    ollama_config: OllamaConfig | None = None,
    suppression_config: SuppressionConfig | None = None,
    minio_client: Minio | None = None,
) -> list[Recommendation]:
    """Generate recommendations for a list of tickers, sequentially.

    All tickers share one ``reference_time``. Only successfully generated
    recommendations are returned. Tickers with no trend data, or whose
    latest recommendation is a duplicate, are skipped. A ticker whose
    generation raises is logged and skipped, so one bad ticker does not
    abort the rest of the batch.

    If ``ollama_config`` is provided, eligible recommendations have their
    thesis rewritten by the LLM wording layer.
    """
    results: list[Recommendation] = []
    failed: list[str] = []
    reference_time = datetime.now(timezone.utc)

    for ticker in tickers:
        try:
            rec = await generate_recommendation(
                pool, ticker, window, config, reference_time,
                ollama_config=ollama_config,
                suppression_config=suppression_config,
                minio_client=minio_client,
            )
        except Exception:
            # Isolate per-ticker failures (bad rows, transient DB errors) so
            # the remaining tickers are still processed. CancelledError is not
            # an Exception subclass and still propagates.
            failed.append(ticker)
            logger.exception(
                "Recommendation generation failed for %s/%s — continuing batch",
                ticker, window,
            )
            continue
        if rec is not None:
            results.append(rec)

    if failed:
        logger.warning(
            "Batch recommendation: %d tickers failed: %s",
            len(failed), ", ".join(failed),
        )
    logger.info(
        "Batch recommendation: %d/%d tickers produced recommendations",
        len(results), len(tickers),
    )
    return results
|