Files
Celes Renata 6880f11c26
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
fix: add /no_think inline tag to disable Qwen3 thinking mode
chat_template_kwargs isn't being respected by the vLLM deployment.
Qwen3 models support /no_think as an inline suffix in the user message
to disable thinking mode. This is the most reliable method across all
serving backends (vLLM, Ollama, SGLang).
2026-04-29 16:11:44 +00:00

439 lines
16 KiB
Python

"""Optional LLM wording layer for thesis generation.
Takes a deterministic thesis string (built from trend data and eligibility
rules) and rewrites it into natural, analyst-quality prose using a local
Ollama model. The deterministic thesis is always preserved as the fallback
and audit reference.
This module is opt-in: callers must explicitly request LLM rewriting.
If the LLM call fails or is disabled, the original deterministic thesis
is returned unchanged.
Requirements: 7.1, 7.2
"""
from __future__ import annotations
import logging
import time
import asyncpg
import httpx
from services.shared.agent_config import AgentConfigResolver, ResolvedAgentConfig
from services.shared.config import OllamaConfig, VLLMConfig
from services.shared.schemas import TrendSummary
logger = logging.getLogger(__name__)
THESIS_PROMPT_VERSION = "thesis-rewrite-v1"
THESIS_SYSTEM_PROMPT = """\
You are a concise financial analyst. You rewrite structured trade thesis \
summaries into clear, professional prose suitable for an internal research note.
STRICT RULES:
1. Do NOT add any information that is not present in the input.
2. Do NOT fabricate numbers, dates, company names, or analyst opinions.
3. Keep the rewrite under 150 words.
4. Preserve all factual claims, risk notes, and evidence counts from the input.
5. Use a neutral, professional tone. Avoid hype or marketing language.
6. Return ONLY the rewritten thesis text. No JSON, no markdown, no commentary.
7. Do NOT show your thinking process. Do NOT include any reasoning steps. Output ONLY the final rewritten text."""
def build_thesis_rewrite_prompt(
deterministic_thesis: str,
summary: TrendSummary,
) -> dict[str, str]:
"""Build system and user prompts for thesis rewriting.
Provides the model with the deterministic thesis and key trend
context so it can produce a natural-language version.
"""
context_parts = [
f"Ticker: {summary.entity_id}",
f"Window: {summary.window.value}",
f"Direction: {summary.trend_direction.value}",
f"Strength: {summary.trend_strength:.2f}",
f"Confidence: {summary.confidence:.2f}",
f"Contradiction score: {summary.contradiction_score:.2f}",
]
if summary.dominant_catalysts:
context_parts.append(f"Catalysts: {', '.join(summary.dominant_catalysts[:3])}")
if summary.material_risks:
context_parts.append(f"Risks: {'; '.join(summary.material_risks[:2])}")
context_block = "\n".join(context_parts)
user_prompt = f"""\
Rewrite the following structured thesis into clear, professional analyst prose.
--- STRUCTURED THESIS ---
{deterministic_thesis}
--- END STRUCTURED THESIS ---
--- CONTEXT ---
{context_block}
--- END CONTEXT ---
Return ONLY the rewritten thesis. No other text. /no_think"""
return {
"system": THESIS_SYSTEM_PROMPT,
"user": user_prompt,
}
async def rewrite_thesis_with_llm(
deterministic_thesis: str,
summary: TrendSummary,
config: OllamaConfig,
http_client: httpx.AsyncClient | None = None,
pool: asyncpg.Pool | None = None,
) -> str:
"""Rewrite a deterministic thesis using a local Ollama model.
If the LLM call fails for any reason, returns the original
deterministic thesis unchanged. This ensures the LLM layer is
purely additive and never blocks recommendation generation.
Resolves runtime config for the "thesis-rewriter" agent slug from
the database when a pool is provided, preferring an active variant's
model, timeout, and max_retries. Falls back to the passed
OllamaConfig if resolution fails.
Args:
deterministic_thesis: The rule-based thesis string.
summary: The trend summary that produced the thesis.
config: Ollama connection and model configuration.
http_client: Optional shared HTTP client for connection reuse.
pool: Optional asyncpg pool for config resolution and performance logging.
Returns:
The LLM-rewritten thesis on success, or the original on failure.
"""
start_time = time.monotonic()
# Resolve thesis-rewriter config from DB for variant override
resolved: ResolvedAgentConfig | None = None
effective_config: OllamaConfig | VLLMConfig = config
use_vllm = False
if pool is not None:
try:
resolver = AgentConfigResolver(pool, ttl_seconds=60)
resolved = await resolver.resolve("thesis-rewriter")
if resolved is not None:
provider = (resolved.model_provider or "").strip().lower()
if provider == "vllm":
use_vllm = True
# Import load_config to get vllm base_url from env
from services.shared.config import load_config as _load_config
_cfg = _load_config()
effective_config = VLLMConfig(
base_url=_cfg.vllm.base_url,
model=resolved.model_name,
timeout=resolved.timeout_seconds,
max_retries=resolved.max_retries,
max_tokens=resolved.max_tokens,
temperature=0.0,
)
else:
effective_config = OllamaConfig(
base_url=config.base_url,
model=resolved.model_name,
timeout=resolved.timeout_seconds,
max_retries=resolved.max_retries,
retry_base_delay=config.retry_base_delay,
retry_max_delay=config.retry_max_delay,
retry_backoff_multiplier=config.retry_backoff_multiplier,
max_tokens=resolved.max_tokens,
context_window=resolved.context_window,
)
logger.info(
"Thesis rewriter using resolved config: model=%s variant=%s provider=%s",
resolved.model_name, resolved.variant_id, provider or "ollama",
)
except Exception:
logger.warning(
"Failed to resolve thesis-rewriter config — using passed config",
exc_info=True,
)
# Token budget enforcement
if (
resolved is not None
and resolved.token_budget > 0
and resolved.variant_id is not None
and pool is not None
):
try:
row = await pool.fetchrow(
"""SELECT COALESCE(SUM(input_tokens + output_tokens), 0) AS total_tokens
FROM agent_performance_log
WHERE variant_id = $1
AND recorded_at >= NOW() - INTERVAL '1 hour'""",
resolved.variant_id,
)
used = int(row["total_tokens"]) if row else 0
if used >= resolved.token_budget:
logger.warning(
"Token budget exceeded for thesis-rewriter variant %s: used %d / budget %d",
resolved.variant_id, used, resolved.token_budget,
)
return deterministic_thesis
except Exception:
logger.warning("Failed to check token budget for thesis-rewriter", exc_info=True)
prompts = build_thesis_rewrite_prompt(deterministic_thesis, summary)
# Override system prompt from resolved config if available
if resolved is not None and resolved.system_prompt:
prompts["system"] = resolved.system_prompt
owns_client = http_client is None
client = http_client or httpx.AsyncClient(timeout=effective_config.timeout)
try:
if use_vllm:
rewritten = await _call_vllm_thesis(client, effective_config, prompts) # type: ignore[arg-type]
else:
rewritten = await _call_ollama_thesis(client, effective_config, prompts) # type: ignore[arg-type]
duration_ms = int((time.monotonic() - start_time) * 1000)
if rewritten:
logger.info(
"LLM thesis rewrite succeeded for %s (%d chars → %d chars)",
summary.entity_id,
len(deterministic_thesis),
len(rewritten),
)
# Log success to agent_performance_log
if pool is not None and resolved is not None:
await _log_thesis_performance(
pool,
resolved=resolved,
ticker=summary.entity_id,
success=True,
duration_ms=duration_ms,
input_tokens=len(deterministic_thesis) // 4,
output_tokens=len(rewritten) // 4,
)
return rewritten
logger.warning(
"LLM thesis rewrite returned empty for %s — using deterministic thesis",
summary.entity_id,
)
# Log failure to agent_performance_log
if pool is not None and resolved is not None:
await _log_thesis_performance(
pool,
resolved=resolved,
ticker=summary.entity_id,
success=False,
duration_ms=duration_ms,
input_tokens=len(deterministic_thesis) // 4,
output_tokens=0,
error_message="empty_response",
)
return deterministic_thesis
except Exception:
duration_ms = int((time.monotonic() - start_time) * 1000)
logger.exception(
"LLM thesis rewrite failed for %s — using deterministic thesis",
summary.entity_id,
)
if pool is not None and resolved is not None:
await _log_thesis_performance(
pool,
resolved=resolved,
ticker=summary.entity_id,
success=False,
duration_ms=duration_ms,
input_tokens=len(deterministic_thesis) // 4,
output_tokens=0,
error_message="exception",
)
return deterministic_thesis
finally:
if owns_client:
await client.aclose()
async def _log_thesis_performance(
pool: asyncpg.Pool,
*,
resolved: ResolvedAgentConfig,
ticker: str,
success: bool,
duration_ms: int,
input_tokens: int = 0,
output_tokens: int = 0,
error_message: str | None = None,
) -> None:
"""Insert a performance log entry for the thesis rewriter agent."""
try:
await pool.execute(
"""INSERT INTO agent_performance_log
(agent_id, variant_id, document_id, ticker, success,
duration_ms, confidence, retry_count,
input_tokens, output_tokens, error_message)
VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8, $9, $10, $11)""",
resolved.agent_id,
resolved.variant_id,
None, # no document_id for thesis rewrites
ticker,
success,
duration_ms,
0.0, # no confidence score for rewrites
0,
input_tokens,
output_tokens,
error_message,
)
except Exception:
logger.warning("Failed to log thesis-rewriter performance", exc_info=True)
async def _call_ollama_thesis(
client: httpx.AsyncClient,
config: OllamaConfig,
prompts: dict[str, str],
) -> str:
"""Make a single Ollama chat call for thesis rewriting.
Returns the model's text response, or empty string on failure.
"""
start = time.monotonic()
payload = {
"model": config.model,
"messages": [
{"role": "system", "content": prompts["system"]},
{"role": "user", "content": prompts["user"]},
],
"stream": False,
}
# Support context_window override via num_ctx (Requirement 10.4)
options: dict[str, object] = {}
if config.context_window > 0:
options["num_ctx"] = config.context_window
# Disable thinking/reasoning mode for models that support it (e.g. Qwen3)
options["num_predict"] = options.get("num_predict", 512)
if options:
payload["options"] = options
# Qwen3 thinking mode control: /no_think suffix or think parameter
payload["think"] = False
resp = await client.post(
f"{config.base_url}/api/chat",
json=payload,
)
_ = resp.raise_for_status()
duration_ms = int((time.monotonic() - start) * 1000)
body: dict[str, object] = resp.json()
msg = body.get("message")
content: str = msg.get("content", "") if isinstance(msg, dict) else ""
logger.debug(
"Ollama thesis call completed in %dms, response length=%d",
duration_ms,
len(content),
)
return _strip_thinking_block(content.strip())
def _strip_thinking_block(text: str) -> str:
"""Remove thinking/reasoning blocks from model output.
Some models (e.g. Qwen) emit chain-of-thought either in <think> XML tags
or as plain-text "Thinking Process:" blocks before the actual response.
This strips both patterns to return only the final thesis text.
"""
import re
# Remove <think>...</think> blocks (greedy, handles multiline)
cleaned = re.sub(r"<think>.*?</think>\s*", "", text, flags=re.DOTALL)
# Handle unclosed <think> tag (model cut off mid-thought)
cleaned = re.sub(r"<think>.*", "", cleaned, flags=re.DOTALL)
# Remove plain-text "Thinking Process:" blocks followed by the actual thesis
# Pattern: everything from "Thinking Process:" up to "</think>" or the final
# clean thesis (identified by the last paragraph that doesn't start with numbering/bullets)
cleaned = re.sub(
r"(?:Thinking Process:|Thought Process:|Chain of Thought:).*?(?=\n[A-Z]{2,5}\s+(?:shows?|demonstrates?|exhibits?|displays?|maintains?))",
"",
cleaned,
flags=re.DOTALL | re.IGNORECASE,
)
# Fallback: if "Thinking Process:" still present, take only text after last "</think>" or
# after the thinking block ends (heuristic: last substantial paragraph)
if "thinking process:" in cleaned.lower():
# Find the actual thesis — it's typically the last coherent paragraph
# that starts with a ticker symbol pattern
match = re.search(
r"\n([A-Z]{1,5}\s+(?:shows?|demonstrates?|exhibits?|displays?|maintains?)\s.+)",
cleaned,
flags=re.DOTALL,
)
if match:
cleaned = match.group(1)
return cleaned.strip()
async def _call_vllm_thesis(
client: httpx.AsyncClient,
config: VLLMConfig,
prompts: dict[str, str],
) -> str:
"""Make a vLLM chat completion call for thesis rewriting.
Uses the OpenAI-compatible /v1/chat/completions endpoint.
Returns the model's text response, or empty string on failure.
"""
start = time.monotonic()
payload: dict[str, object] = {
"model": config.model,
"messages": [
{"role": "system", "content": prompts["system"]},
{"role": "user", "content": prompts["user"]},
],
"max_tokens": config.max_tokens,
"temperature": config.temperature,
"stream": False,
# Disable thinking/reasoning mode for Qwen3 models on vLLM
"chat_template_kwargs": {"enable_thinking": False},
}
headers: dict[str, str] = {"Content-Type": "application/json"}
if config.api_key:
headers["Authorization"] = f"Bearer {config.api_key}"
resp = await client.post(
f"{config.base_url}/v1/chat/completions",
json=payload,
headers=headers,
)
_ = resp.raise_for_status()
duration_ms = int((time.monotonic() - start) * 1000)
body: dict[str, object] = resp.json()
choices = body.get("choices", [])
content: str = ""
if choices and isinstance(choices, list):
msg = choices[0].get("message", {}) # type: ignore[union-attr]
content = msg.get("content", "") if isinstance(msg, dict) else ""
logger.debug(
"vLLM thesis call completed in %dms, response length=%d",
duration_ms,
len(content),
)
return _strip_thinking_block(content.strip())