f9ee1532dc
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
Qwen3.5 in thinking mode emits <think>...</think> chain-of-thought before the actual response. The thesis rewriter was returning the raw output including the entire reasoning block. Now strips thinking tags from both Ollama and vLLM response paths.
407 lines
14 KiB
Python
407 lines
14 KiB
Python
"""Optional LLM wording layer for thesis generation.
|
|
|
|
Takes a deterministic thesis string (built from trend data and eligibility
rules) and rewrites it into natural, analyst-quality prose using a local
Ollama model or, when the resolved agent configuration selects that
provider, a vLLM server. The deterministic thesis is always preserved as
the fallback and audit reference.
|
|
|
|
This module is opt-in: callers must explicitly request LLM rewriting.
|
|
If the LLM call fails or is disabled, the original deterministic thesis
|
|
is returned unchanged.
|
|
|
|
Requirements: 7.1, 7.2
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
|
|
import asyncpg
|
|
import httpx
|
|
|
|
from services.shared.agent_config import AgentConfigResolver, ResolvedAgentConfig
|
|
from services.shared.config import OllamaConfig, VLLMConfig
|
|
from services.shared.schemas import TrendSummary
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Version tag for the rewrite prompt below (not referenced elsewhere in the
# visible part of this module).
THESIS_PROMPT_VERSION = "thesis-rewrite-v1"

# Default system prompt for thesis rewriting. rewrite_thesis_with_llm replaces
# it with the resolved agent config's system_prompt when one is set.
# Built from adjacent literals so each rule sits on its own source line; the
# resulting string has no trailing newline.
THESIS_SYSTEM_PROMPT = (
    "You are a concise financial analyst. You rewrite structured trade thesis "
    "summaries into clear, professional prose suitable for an internal research note.\n"
    "\n"
    "STRICT RULES:\n"
    "1. Do NOT add any information that is not present in the input.\n"
    "2. Do NOT fabricate numbers, dates, company names, or analyst opinions.\n"
    "3. Keep the rewrite under 150 words.\n"
    "4. Preserve all factual claims, risk notes, and evidence counts from the input.\n"
    "5. Use a neutral, professional tone. Avoid hype or marketing language.\n"
    "6. Return ONLY the rewritten thesis text. No JSON, no markdown, no commentary."
)
|
|
|
|
|
|
def build_thesis_rewrite_prompt(
    deterministic_thesis: str,
    summary: TrendSummary,
) -> dict[str, str]:
    """Assemble the system and user prompts for a thesis rewrite.

    The user prompt wraps the deterministic thesis and a short block of
    trend context (ticker, window, direction, strength, confidence,
    contradiction score, plus up to three catalysts and two risks when
    present) in delimiter lines.

    Args:
        deterministic_thesis: The rule-based thesis text to be rewritten.
        summary: Trend summary whose fields populate the context block.

    Returns:
        A dict with two keys: "system" (the module's default system prompt)
        and "user" (the assembled rewrite request).
    """
    context_lines = [
        f"Ticker: {summary.entity_id}",
        f"Window: {summary.window.value}",
        f"Direction: {summary.trend_direction.value}",
        f"Strength: {summary.trend_strength:.2f}",
        f"Confidence: {summary.confidence:.2f}",
        f"Contradiction score: {summary.contradiction_score:.2f}",
    ]

    # Optional lines: only emitted when the summary carries the data, and
    # capped so the context stays short.
    catalysts = summary.dominant_catalysts
    if catalysts:
        context_lines.append(f"Catalysts: {', '.join(catalysts[:3])}")
    risks = summary.material_risks
    if risks:
        context_lines.append(f"Risks: {'; '.join(risks[:2])}")

    # One entry per output line; empty strings become the blank separator
    # lines between sections. The thesis goes through an f-string so it is
    # formatted exactly as an interpolated placeholder would be.
    prompt_lines = [
        "Rewrite the following structured thesis into clear, professional analyst prose.",
        "",
        "--- STRUCTURED THESIS ---",
        f"{deterministic_thesis}",
        "--- END STRUCTURED THESIS ---",
        "",
        "--- CONTEXT ---",
        "\n".join(context_lines),
        "--- END CONTEXT ---",
        "",
        "Return ONLY the rewritten thesis. No other text.",
    ]

    return {
        "system": THESIS_SYSTEM_PROMPT,
        "user": "\n".join(prompt_lines),
    }
|
|
|
|
|
|
async def rewrite_thesis_with_llm(
    deterministic_thesis: str,
    summary: TrendSummary,
    config: OllamaConfig,
    http_client: httpx.AsyncClient | None = None,
    pool: asyncpg.Pool | None = None,
) -> str:
    """Rewrite a deterministic thesis with an LLM, falling back to the input.

    When ``pool`` is given, the "thesis-rewriter" agent config is resolved
    from the database. A resolved config overrides the model, timeout,
    max_retries, max_tokens and (for Ollama) context window, may replace the
    system prompt, and selects the provider: ``model_provider == "vllm"``
    routes the call to vLLM, anything else to Ollama. If resolution or
    config construction fails, the passed ``config`` is used with Ollama.

    A single LLM call is made; this function contains no retry loop. If the
    call raises or returns an empty string, the original thesis is returned
    so the LLM layer never blocks recommendation generation. The same
    happens, without calling the model, when the resolved variant has
    already used up its hourly token budget.

    Args:
        deterministic_thesis: The rule-based thesis string.
        summary: The trend summary that produced the thesis.
        config: Ollama connection and model configuration used as the
            baseline (and as the fallback when resolution fails).
        http_client: Optional shared HTTP client. When omitted, a client is
            created with the effective config's timeout and closed before
            returning. A shared client keeps its own timeout settings.
        pool: Optional asyncpg pool for config resolution, token-budget
            checks and performance logging.

    Returns:
        The LLM-rewritten thesis on success, or ``deterministic_thesis``
        unchanged on failure, empty output or exhausted budget.
    """
    start_time = time.monotonic()

    # Resolve thesis-rewriter config from DB for variant override.
    resolved: ResolvedAgentConfig | None = None
    effective_config: OllamaConfig | VLLMConfig = config
    use_vllm = False
    if pool is not None:
        try:
            resolver = AgentConfigResolver(pool, ttl_seconds=60)
            resolved = await resolver.resolve("thesis-rewriter")
            if resolved is not None:
                provider = (resolved.model_provider or "").strip().lower()
                if provider == "vllm":
                    # load_config supplies the vLLM base_url; imported here
                    # rather than at module level, as in the original.
                    from services.shared.config import load_config as _load_config

                    _cfg = _load_config()
                    effective_config = VLLMConfig(
                        base_url=_cfg.vllm.base_url,
                        model=resolved.model_name,
                        timeout=resolved.timeout_seconds,
                        max_retries=resolved.max_retries,
                        max_tokens=resolved.max_tokens,
                        temperature=0.0,
                    )
                    # Set the flag only after the vLLM config exists. If the
                    # lines above raise, the handler below falls back to the
                    # passed Ollama config, and that config must not be sent
                    # down the vLLM code path.
                    use_vllm = True
                else:
                    effective_config = OllamaConfig(
                        base_url=config.base_url,
                        model=resolved.model_name,
                        timeout=resolved.timeout_seconds,
                        max_retries=resolved.max_retries,
                        retry_base_delay=config.retry_base_delay,
                        retry_max_delay=config.retry_max_delay,
                        retry_backoff_multiplier=config.retry_backoff_multiplier,
                        max_tokens=resolved.max_tokens,
                        context_window=resolved.context_window,
                    )
                logger.info(
                    "Thesis rewriter using resolved config: model=%s variant=%s provider=%s",
                    resolved.model_name, resolved.variant_id, provider or "ollama",
                )
        except Exception:
            logger.warning(
                "Failed to resolve thesis-rewriter config — using passed config",
                exc_info=True,
            )

    # Token budget enforcement: sum the variant's logged tokens over the
    # last hour and skip the LLM call once the budget is reached. A failed
    # budget query is logged and does not block the rewrite.
    if (
        resolved is not None
        and resolved.token_budget > 0
        and resolved.variant_id is not None
        and pool is not None
    ):
        try:
            row = await pool.fetchrow(
                """SELECT COALESCE(SUM(input_tokens + output_tokens), 0) AS total_tokens
                   FROM agent_performance_log
                   WHERE variant_id = $1
                     AND recorded_at >= NOW() - INTERVAL '1 hour'""",
                resolved.variant_id,
            )
            used = int(row["total_tokens"]) if row else 0
            if used >= resolved.token_budget:
                logger.warning(
                    "Token budget exceeded for thesis-rewriter variant %s: used %d / budget %d",
                    resolved.variant_id, used, resolved.token_budget,
                )
                return deterministic_thesis
        except Exception:
            logger.warning("Failed to check token budget for thesis-rewriter", exc_info=True)

    prompts = build_thesis_rewrite_prompt(deterministic_thesis, summary)

    # Override system prompt from resolved config if available.
    if resolved is not None and resolved.system_prompt:
        prompts["system"] = resolved.system_prompt

    # Only a client created here is closed here; a caller-supplied client
    # stays open for reuse.
    owns_client = http_client is None
    client = (
        httpx.AsyncClient(timeout=effective_config.timeout)
        if http_client is None
        else http_client
    )

    async def _record(
        *,
        success: bool,
        duration_ms: int,
        output_tokens: int = 0,
        error_message: str | None = None,
    ) -> None:
        """Write one performance row; a no-op without a pool and resolved config."""
        if pool is None or resolved is None:
            return
        await _log_thesis_performance(
            pool,
            resolved=resolved,
            ticker=summary.entity_id,
            success=success,
            duration_ms=duration_ms,
            # Token counts are approximated as character count // 4
            # (presumably a chars-per-token heuristic; only the thesis text
            # is counted, not the full prompt).
            input_tokens=len(deterministic_thesis) // 4,
            output_tokens=output_tokens,
            error_message=error_message,
        )

    try:
        if use_vllm:
            rewritten = await _call_vllm_thesis(client, effective_config, prompts)  # type: ignore[arg-type]
        else:
            rewritten = await _call_ollama_thesis(client, effective_config, prompts)  # type: ignore[arg-type]
        duration_ms = int((time.monotonic() - start_time) * 1000)

        if rewritten:
            logger.info(
                "LLM thesis rewrite succeeded for %s (%d chars → %d chars)",
                summary.entity_id,
                len(deterministic_thesis),
                len(rewritten),
            )
            await _record(
                success=True,
                duration_ms=duration_ms,
                output_tokens=len(rewritten) // 4,
            )
            return rewritten

        logger.warning(
            "LLM thesis rewrite returned empty for %s — using deterministic thesis",
            summary.entity_id,
        )
        await _record(
            success=False,
            duration_ms=duration_ms,
            error_message="empty_response",
        )
        return deterministic_thesis
    except Exception:
        # Any failure in the call (or in the success-path bookkeeping) ends
        # here: log it, record it, and hand back the deterministic thesis.
        duration_ms = int((time.monotonic() - start_time) * 1000)
        logger.exception(
            "LLM thesis rewrite failed for %s — using deterministic thesis",
            summary.entity_id,
        )
        await _record(
            success=False,
            duration_ms=duration_ms,
            error_message="exception",
        )
        return deterministic_thesis
    finally:
        if owns_client:
            await client.aclose()
|
|
|
|
|
|
async def _log_thesis_performance(
    pool: asyncpg.Pool,
    *,
    resolved: ResolvedAgentConfig,
    ticker: str,
    success: bool,
    duration_ms: int,
    input_tokens: int = 0,
    output_tokens: int = 0,
    error_message: str | None = None,
) -> None:
    """Record one thesis-rewriter call in ``agent_performance_log``.

    Best-effort: any exception raised while building or executing the
    insert is logged as a warning and swallowed, so a logging failure never
    propagates to the caller.

    Args:
        pool: asyncpg pool used to execute the insert.
        resolved: Resolved agent config supplying ``agent_id`` and
            ``variant_id`` for the row.
        ticker: Ticker the rewrite was generated for.
        success: Whether the rewrite produced usable text.
        duration_ms: Elapsed time of the rewrite attempt in milliseconds.
        input_tokens: Input token count to record.
        output_tokens: Output token count to record.
        error_message: Short failure tag, or None.
    """
    insert_sql = """INSERT INTO agent_performance_log
               (agent_id, variant_id, document_id, ticker, success,
                duration_ms, confidence, retry_count,
                input_tokens, output_tokens, error_message)
               VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8, $9, $10, $11)"""
    try:
        # Positional values in the same order as the column list above.
        row_values = (
            resolved.agent_id,
            resolved.variant_id,
            None,  # no document_id for thesis rewrites
            ticker,
            success,
            duration_ms,
            0.0,  # no confidence score for rewrites
            0,  # retry_count is always written as 0
            input_tokens,
            output_tokens,
            error_message,
        )
        await pool.execute(insert_sql, *row_values)
    except Exception:
        logger.warning("Failed to log thesis-rewriter performance", exc_info=True)
|
|
|
|
|
|
async def _call_ollama_thesis(
    client: httpx.AsyncClient,
    config: OllamaConfig,
    prompts: dict[str, str],
) -> str:
    """Make a single Ollama chat call for thesis rewriting.

    Posts a non-streaming request to ``{base_url}/api/chat`` and extracts
    ``message.content`` from the JSON response.

    Args:
        client: HTTP client used for the request.
        config: Supplies ``base_url``, ``model`` and ``context_window``.
        prompts: Dict with "system" and "user" prompt strings.

    Returns:
        The model's text with any ``<think>`` reasoning removed, or an empty
        string when the response carries no textual content.

    Raises:
        httpx.HTTPStatusError: On a 4xx/5xx response (via
            ``raise_for_status``). Transport errors from ``client.post`` and
            JSON decoding errors also propagate to the caller.
    """
    start = time.monotonic()

    payload: dict[str, object] = {
        "model": config.model,
        "messages": [
            {"role": "system", "content": prompts["system"]},
            {"role": "user", "content": prompts["user"]},
        ],
        "stream": False,
    }

    # Support context_window override via num_ctx (Requirement 10.4).
    if config.context_window > 0:
        payload["options"] = {"num_ctx": config.context_window}
    # NOTE(review): the caller builds OllamaConfig with max_tokens, but no
    # output cap (Ollama's num_predict option) is sent here — confirm whether
    # that is intentional.

    resp = await client.post(
        f"{config.base_url}/api/chat",
        json=payload,
    )
    _ = resp.raise_for_status()

    duration_ms = int((time.monotonic() - start) * 1000)

    body: dict[str, object] = resp.json()
    msg = body.get("message")
    # Treat a missing, null or non-string content as "no text" instead of
    # letting len()/strip() raise on it.
    raw_content = msg.get("content") if isinstance(msg, dict) else None
    content: str = raw_content if isinstance(raw_content, str) else ""

    logger.debug(
        "Ollama thesis call completed in %dms, response length=%d",
        duration_ms,
        len(content),
    )

    return _strip_thinking_block(content.strip())
|
|
|
|
|
|
def _strip_thinking_block(text: str) -> str:
|
|
"""Remove <think>...</think> reasoning blocks from model output.
|
|
|
|
Some models (e.g. Qwen) emit chain-of-thought in <think> tags before
|
|
the actual response. This strips that prefix to return only the final
|
|
thesis text.
|
|
"""
|
|
import re
|
|
# Remove <think>...</think> blocks (greedy, handles multiline)
|
|
cleaned = re.sub(r"<think>.*?</think>\s*", "", text, flags=re.DOTALL)
|
|
# Also handle unclosed <think> tag (model cut off mid-thought)
|
|
cleaned = re.sub(r"<think>.*", "", cleaned, flags=re.DOTALL)
|
|
return cleaned.strip()
|
|
|
|
|
|
async def _call_vllm_thesis(
    client: httpx.AsyncClient,
    config: VLLMConfig,
    prompts: dict[str, str],
) -> str:
    """Make a vLLM chat completion call for thesis rewriting.

    Uses the OpenAI-compatible ``/v1/chat/completions`` endpoint with a
    non-streaming request and reads the first choice's ``message.content``.

    Args:
        client: HTTP client used for the request.
        config: Supplies ``base_url``, ``model``, ``max_tokens``,
            ``temperature`` and an optional ``api_key``.
        prompts: Dict with "system" and "user" prompt strings.

    Returns:
        The model's text with any ``<think>`` reasoning removed, or an empty
        string when the response carries no textual content.

    Raises:
        httpx.HTTPStatusError: On a 4xx/5xx response (via
            ``raise_for_status``). Transport errors from ``client.post`` and
            JSON decoding errors also propagate to the caller.
    """
    start = time.monotonic()

    payload: dict[str, object] = {
        "model": config.model,
        "messages": [
            {"role": "system", "content": prompts["system"]},
            {"role": "user", "content": prompts["user"]},
        ],
        "max_tokens": config.max_tokens,
        "temperature": config.temperature,
        "stream": False,
    }

    headers: dict[str, str] = {"Content-Type": "application/json"}
    # The bearer token is sent only when an API key is configured.
    if config.api_key:
        headers["Authorization"] = f"Bearer {config.api_key}"

    resp = await client.post(
        f"{config.base_url}/v1/chat/completions",
        json=payload,
        headers=headers,
    )
    _ = resp.raise_for_status()

    duration_ms = int((time.monotonic() - start) * 1000)

    body: dict[str, object] = resp.json()
    choices = body.get("choices", [])
    content: str = ""
    if isinstance(choices, list) and choices:
        first_choice = choices[0]
        msg = first_choice.get("message") if isinstance(first_choice, dict) else None
        # Chat-completion messages may carry "content": null; treat that (or
        # any non-string) as "no text" instead of letting len()/strip() raise.
        raw_content = msg.get("content") if isinstance(msg, dict) else None
        if isinstance(raw_content, str):
            content = raw_content

    logger.debug(
        "vLLM thesis call completed in %dms, response length=%d",
        duration_ms,
        len(content),
    )

    return _strip_thinking_block(content.strip())
|