6880f11c26
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
`chat_template_kwargs` (`enable_thinking: False`) isn't being respected by the vLLM deployment, so thinking mode stays on. Qwen3 models also support `/no_think` as an inline suffix in the user message to disable thinking mode; this is the most reliable method across all serving backends (vLLM, Ollama, SGLang), so the suffix is now appended to the user prompt.
439 lines
16 KiB
Python
439 lines
16 KiB
Python
"""Optional LLM wording layer for thesis generation.

Takes a deterministic thesis string (built from trend data and eligibility
rules) and rewrites it into natural, analyst-quality prose using an LLM:
an Ollama model by default, or a vLLM endpoint when the resolved
"thesis-rewriter" agent config selects the ``vllm`` provider. The
deterministic thesis is always preserved as the fallback and audit reference.

This module is opt-in: callers must explicitly request LLM rewriting.
If the LLM call fails, returns empty text, or the resolved variant's hourly
token budget is exhausted, the original deterministic thesis is returned
unchanged.

Requirements: 7.1, 7.2
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
|
|
import asyncpg
|
|
import httpx
|
|
|
|
from services.shared.agent_config import AgentConfigResolver, ResolvedAgentConfig
|
|
from services.shared.config import OllamaConfig, VLLMConfig
|
|
from services.shared.schemas import TrendSummary
|
|
|
|
logger = logging.getLogger(__name__)

# Identifier for the current revision of the thesis-rewrite prompt.
THESIS_PROMPT_VERSION = "thesis-rewrite-v1"

# Default system prompt for the rewrite call. A non-empty system_prompt on the
# resolved "thesis-rewriter" agent config replaces it at call time.
THESIS_SYSTEM_PROMPT = (
    "You are a concise financial analyst. You rewrite structured trade thesis "
    "summaries into clear, professional prose suitable for an internal research note.\n"
    "\n"
    "STRICT RULES:\n"
    "1. Do NOT add any information that is not present in the input.\n"
    "2. Do NOT fabricate numbers, dates, company names, or analyst opinions.\n"
    "3. Keep the rewrite under 150 words.\n"
    "4. Preserve all factual claims, risk notes, and evidence counts from the input.\n"
    "5. Use a neutral, professional tone. Avoid hype or marketing language.\n"
    "6. Return ONLY the rewritten thesis text. No JSON, no markdown, no commentary.\n"
    "7. Do NOT show your thinking process. Do NOT include any reasoning steps. "
    "Output ONLY the final rewritten text."
)
|
|
|
|
|
|
def build_thesis_rewrite_prompt(
    deterministic_thesis: str,
    summary: TrendSummary,
) -> dict[str, str]:
    """Assemble the system and user prompts for a thesis rewrite.

    The user prompt embeds the deterministic thesis verbatim plus a context
    block describing the trend (ticker, window, direction, strength,
    confidence, contradiction score, and up to three catalysts and two
    risks when present). It ends with the ``/no_think`` suffix so Qwen3
    models skip thinking mode even when the serving backend ignores other
    controls.

    Args:
        deterministic_thesis: The rule-based thesis text to be rewritten.
        summary: The trend summary the thesis was derived from.

    Returns:
        A dict with keys "system" and "user".
    """
    context_lines = [
        f"Ticker: {summary.entity_id}",
        f"Window: {summary.window.value}",
        f"Direction: {summary.trend_direction.value}",
        f"Strength: {summary.trend_strength:.2f}",
        f"Confidence: {summary.confidence:.2f}",
        f"Contradiction score: {summary.contradiction_score:.2f}",
    ]

    catalysts = summary.dominant_catalysts
    if catalysts:
        context_lines.append("Catalysts: " + ", ".join(catalysts[:3]))

    risks = summary.material_risks
    if risks:
        context_lines.append("Risks: " + "; ".join(risks[:2]))

    prompt_lines = [
        "Rewrite the following structured thesis into clear, professional analyst prose.",
        "",
        "--- STRUCTURED THESIS ---",
        deterministic_thesis,
        "--- END STRUCTURED THESIS ---",
        "",
        "--- CONTEXT ---",
        *context_lines,
        "--- END CONTEXT ---",
        "",
        "Return ONLY the rewritten thesis. No other text. /no_think",
    ]

    return {"system": THESIS_SYSTEM_PROMPT, "user": "\n".join(prompt_lines)}
|
|
|
|
|
|
async def rewrite_thesis_with_llm(
    deterministic_thesis: str,
    summary: TrendSummary,
    config: OllamaConfig,
    http_client: httpx.AsyncClient | None = None,
    pool: asyncpg.Pool | None = None,
) -> str:
    """Rewrite a deterministic thesis using an LLM (Ollama or vLLM).

    If the LLM call fails for any reason, returns the original
    deterministic thesis unchanged. This ensures the LLM layer is
    purely additive and never blocks recommendation generation.

    When a pool is provided, runtime config for the "thesis-rewriter"
    agent slug is resolved from the database. The resolved provider
    selects Ollama or vLLM, the resolved model, timeout, max_retries and
    max_tokens are copied into the provider config, and a non-empty
    resolved system prompt replaces the default one. If resolution fails
    at any step, the passed OllamaConfig is used unchanged and the call is
    not attributed to any variant.

    Args:
        deterministic_thesis: The rule-based thesis string.
        summary: The trend summary that produced the thesis.
        config: Ollama connection and model configuration (used as-is when
            no agent config is resolved).
        http_client: Optional shared HTTP client for connection reuse.
        pool: Optional asyncpg pool for config resolution, token-budget
            checks, and performance logging.

    Returns:
        The LLM-rewritten thesis on success. The original thesis when the
        call fails, returns empty text, or the resolved variant's hourly
        token budget has already been used up.
    """
    start_time = time.monotonic()

    resolved, effective_config, use_vllm = await _resolve_thesis_config(pool, config)

    if (
        pool is not None
        and resolved is not None
        and await _thesis_token_budget_exceeded(pool, resolved)
    ):
        return deterministic_thesis

    prompts = build_thesis_rewrite_prompt(deterministic_thesis, summary)

    # A system prompt stored on the resolved agent config takes precedence.
    if resolved is not None and resolved.system_prompt:
        prompts["system"] = resolved.system_prompt

    # Rough token estimate (~4 characters per token). These numbers are what
    # the hourly budget check sums from agent_performance_log.
    input_tokens = len(deterministic_thesis) // 4

    async def _record(
        *,
        success: bool,
        duration_ms: int,
        output_tokens: int,
        error_message: str | None,
    ) -> None:
        # Performance rows are keyed by agent/variant, so they need a resolved
        # config; nothing is logged when running on the passed config.
        if pool is None or resolved is None:
            return
        await _log_thesis_performance(
            pool,
            resolved=resolved,
            ticker=summary.entity_id,
            success=success,
            duration_ms=duration_ms,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            error_message=error_message,
        )

    owns_client = http_client is None
    client = (
        http_client
        if http_client is not None
        else httpx.AsyncClient(timeout=effective_config.timeout)
    )

    try:
        if use_vllm:
            rewritten = await _call_vllm_thesis(client, effective_config, prompts)  # type: ignore[arg-type]
        else:
            rewritten = await _call_ollama_thesis(client, effective_config, prompts)  # type: ignore[arg-type]
        duration_ms = int((time.monotonic() - start_time) * 1000)

        if rewritten:
            logger.info(
                "LLM thesis rewrite succeeded for %s (%d chars → %d chars)",
                summary.entity_id,
                len(deterministic_thesis),
                len(rewritten),
            )
            await _record(
                success=True,
                duration_ms=duration_ms,
                output_tokens=len(rewritten) // 4,
                error_message=None,
            )
            return rewritten

        logger.warning(
            "LLM thesis rewrite returned empty for %s — using deterministic thesis",
            summary.entity_id,
        )
        await _record(
            success=False,
            duration_ms=duration_ms,
            output_tokens=0,
            error_message="empty_response",
        )
        return deterministic_thesis
    except Exception:
        duration_ms = int((time.monotonic() - start_time) * 1000)
        logger.exception(
            "LLM thesis rewrite failed for %s — using deterministic thesis",
            summary.entity_id,
        )
        await _record(
            success=False,
            duration_ms=duration_ms,
            output_tokens=0,
            error_message="exception",
        )
        return deterministic_thesis
    finally:
        if owns_client:
            await client.aclose()


async def _resolve_thesis_config(
    pool: asyncpg.Pool | None,
    config: OllamaConfig,
) -> tuple[ResolvedAgentConfig | None, OllamaConfig | VLLMConfig, bool]:
    """Resolve the "thesis-rewriter" agent and build its provider config.

    Returns ``(resolved, effective_config, use_vllm)``. When no pool is given,
    no agent config is resolved, or any step fails (lookup, env config load,
    config construction), returns ``(None, config, False)`` so the caller
    runs entirely on the passed Ollama config and attributes nothing to a
    variant.
    """
    if pool is None:
        return None, config, False

    try:
        # NOTE(review): a fresh resolver per call means its 60s TTL cache is
        # never reused across calls; consider sharing one resolver per pool.
        resolver = AgentConfigResolver(pool, ttl_seconds=60)
        resolved = await resolver.resolve("thesis-rewriter")
        if resolved is None:
            return None, config, False

        provider = (resolved.model_provider or "").strip().lower()
        effective_config: OllamaConfig | VLLMConfig
        if provider == "vllm":
            # Local import: the vLLM base URL and API key come from env config.
            from services.shared.config import load_config as _load_config

            env_vllm = _load_config().vllm
            effective_config = VLLMConfig(
                base_url=env_vllm.base_url,
                # Carry the key over; otherwise authenticated vLLM servers
                # reject the request.
                api_key=env_vllm.api_key,
                model=resolved.model_name,
                timeout=resolved.timeout_seconds,
                max_retries=resolved.max_retries,
                max_tokens=resolved.max_tokens,
                temperature=0.0,
            )
        else:
            effective_config = OllamaConfig(
                base_url=config.base_url,
                model=resolved.model_name,
                timeout=resolved.timeout_seconds,
                max_retries=resolved.max_retries,
                retry_base_delay=config.retry_base_delay,
                retry_max_delay=config.retry_max_delay,
                retry_backoff_multiplier=config.retry_backoff_multiplier,
                max_tokens=resolved.max_tokens,
                context_window=resolved.context_window,
            )

        logger.info(
            "Thesis rewriter using resolved config: model=%s variant=%s provider=%s",
            resolved.model_name, resolved.variant_id, provider or "ollama",
        )
        # Only report vLLM once a matching VLLMConfig exists.
        return resolved, effective_config, provider == "vllm"
    except Exception:
        logger.warning(
            "Failed to resolve thesis-rewriter config — using passed config",
            exc_info=True,
        )
        return None, config, False


async def _thesis_token_budget_exceeded(
    pool: asyncpg.Pool,
    resolved: ResolvedAgentConfig,
) -> bool:
    """Return True when the resolved variant has used up its hourly token budget.

    Sums ``input_tokens + output_tokens`` from ``agent_performance_log`` for
    the variant over the last hour. A missing or non-positive budget, or a
    missing variant id, disables the check. Query failures are logged and
    treated as "not exceeded" so the rewrite still runs.
    """
    budget = resolved.token_budget or 0
    if budget <= 0 or resolved.variant_id is None:
        return False

    try:
        row = await pool.fetchrow(
            """SELECT COALESCE(SUM(input_tokens + output_tokens), 0) AS total_tokens
               FROM agent_performance_log
               WHERE variant_id = $1
                 AND recorded_at >= NOW() - INTERVAL '1 hour'""",
            resolved.variant_id,
        )
        used = int(row["total_tokens"]) if row else 0
    except Exception:
        logger.warning("Failed to check token budget for thesis-rewriter", exc_info=True)
        return False

    if used >= budget:
        logger.warning(
            "Token budget exceeded for thesis-rewriter variant %s: used %d / budget %d",
            resolved.variant_id, used, budget,
        )
        return True
    return False
|
|
|
|
|
|
async def _log_thesis_performance(
|
|
pool: asyncpg.Pool,
|
|
*,
|
|
resolved: ResolvedAgentConfig,
|
|
ticker: str,
|
|
success: bool,
|
|
duration_ms: int,
|
|
input_tokens: int = 0,
|
|
output_tokens: int = 0,
|
|
error_message: str | None = None,
|
|
) -> None:
|
|
"""Insert a performance log entry for the thesis rewriter agent."""
|
|
try:
|
|
await pool.execute(
|
|
"""INSERT INTO agent_performance_log
|
|
(agent_id, variant_id, document_id, ticker, success,
|
|
duration_ms, confidence, retry_count,
|
|
input_tokens, output_tokens, error_message)
|
|
VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8, $9, $10, $11)""",
|
|
resolved.agent_id,
|
|
resolved.variant_id,
|
|
None, # no document_id for thesis rewrites
|
|
ticker,
|
|
success,
|
|
duration_ms,
|
|
0.0, # no confidence score for rewrites
|
|
0,
|
|
input_tokens,
|
|
output_tokens,
|
|
error_message,
|
|
)
|
|
except Exception:
|
|
logger.warning("Failed to log thesis-rewriter performance", exc_info=True)
|
|
|
|
|
|
async def _call_ollama_thesis(
    client: httpx.AsyncClient,
    config: OllamaConfig,
    prompts: dict[str, str],
) -> str:
    """Make a single Ollama ``/api/chat`` call for thesis rewriting.

    Thinking mode is disabled with the top-level ``think`` flag. Any reasoning
    the model still emits is removed by ``_strip_thinking_block``.

    Args:
        client: HTTP client used for the request.
        config: Ollama base URL, model, timeout, and generation limits.
        prompts: Mapping with "system" and "user" prompt text.

    Returns:
        The model's text with reasoning removed, or an empty string when the
        response carries no string message content.

    Raises:
        httpx.HTTPStatusError: If Ollama answers with a non-2xx status.
        httpx.HTTPError: On transport failures or timeouts.
    """
    start = time.monotonic()

    # Cap generated tokens: honour the configured max_tokens (as the vLLM path
    # does) and fall back to 512 when it is unset or non-positive.
    max_tokens = config.max_tokens
    options: dict[str, object] = {
        "num_predict": max_tokens if max_tokens and max_tokens > 0 else 512,
    }
    # Support context_window override via num_ctx (Requirement 10.4)
    if config.context_window > 0:
        options["num_ctx"] = config.context_window

    payload: dict[str, object] = {
        "model": config.model,
        "messages": [
            {"role": "system", "content": prompts["system"]},
            {"role": "user", "content": prompts["user"]},
        ],
        "stream": False,
        "options": options,
        # Qwen3 thinking-mode control (the user prompt may also carry /no_think).
        "think": False,
    }

    resp = await client.post(
        f"{config.base_url}/api/chat",
        json=payload,
        # Apply the per-agent timeout even when the caller shares a client.
        timeout=config.timeout,
    )
    _ = resp.raise_for_status()

    duration_ms = int((time.monotonic() - start) * 1000)

    body: object = resp.json()
    msg = body.get("message") if isinstance(body, dict) else None
    raw_content = msg.get("content") if isinstance(msg, dict) else None
    # Tolerate a missing or null content field instead of crashing on it.
    content = raw_content if isinstance(raw_content, str) else ""

    logger.debug(
        "Ollama thesis call completed in %dms, response length=%d",
        duration_ms,
        len(content),
    )

    return _strip_thinking_block(content.strip())
|
|
|
|
|
|
def _strip_thinking_block(text: str) -> str:
|
|
"""Remove thinking/reasoning blocks from model output.
|
|
|
|
Some models (e.g. Qwen) emit chain-of-thought either in <think> XML tags
|
|
or as plain-text "Thinking Process:" blocks before the actual response.
|
|
This strips both patterns to return only the final thesis text.
|
|
"""
|
|
import re
|
|
# Remove <think>...</think> blocks (greedy, handles multiline)
|
|
cleaned = re.sub(r"<think>.*?</think>\s*", "", text, flags=re.DOTALL)
|
|
# Handle unclosed <think> tag (model cut off mid-thought)
|
|
cleaned = re.sub(r"<think>.*", "", cleaned, flags=re.DOTALL)
|
|
# Remove plain-text "Thinking Process:" blocks followed by the actual thesis
|
|
# Pattern: everything from "Thinking Process:" up to "</think>" or the final
|
|
# clean thesis (identified by the last paragraph that doesn't start with numbering/bullets)
|
|
cleaned = re.sub(
|
|
r"(?:Thinking Process:|Thought Process:|Chain of Thought:).*?(?=\n[A-Z]{2,5}\s+(?:shows?|demonstrates?|exhibits?|displays?|maintains?))",
|
|
"",
|
|
cleaned,
|
|
flags=re.DOTALL | re.IGNORECASE,
|
|
)
|
|
# Fallback: if "Thinking Process:" still present, take only text after last "</think>" or
|
|
# after the thinking block ends (heuristic: last substantial paragraph)
|
|
if "thinking process:" in cleaned.lower():
|
|
# Find the actual thesis — it's typically the last coherent paragraph
|
|
# that starts with a ticker symbol pattern
|
|
match = re.search(
|
|
r"\n([A-Z]{1,5}\s+(?:shows?|demonstrates?|exhibits?|displays?|maintains?)\s.+)",
|
|
cleaned,
|
|
flags=re.DOTALL,
|
|
)
|
|
if match:
|
|
cleaned = match.group(1)
|
|
return cleaned.strip()
|
|
|
|
|
|
async def _call_vllm_thesis(
    client: httpx.AsyncClient,
    config: VLLMConfig,
    prompts: dict[str, str],
) -> str:
    """Make a vLLM chat completion call for thesis rewriting.

    Uses the OpenAI-compatible ``/v1/chat/completions`` endpoint and asks the
    chat template to disable thinking. Any reasoning the model still emits is
    removed by ``_strip_thinking_block``.

    Args:
        client: HTTP client used for the request.
        config: vLLM base URL, API key, model, timeout, and sampling settings.
        prompts: Mapping with "system" and "user" prompt text.

    Returns:
        The model's text with reasoning removed. Empty string when the response
        has no usable content, e.g. ``"content": null``.

    Raises:
        httpx.HTTPStatusError: If vLLM answers with a non-2xx status.
        httpx.HTTPError: On transport failures or timeouts.
    """
    start = time.monotonic()

    payload: dict[str, object] = {
        "model": config.model,
        "messages": [
            {"role": "system", "content": prompts["system"]},
            {"role": "user", "content": prompts["user"]},
        ],
        "max_tokens": config.max_tokens,
        "temperature": config.temperature,
        "stream": False,
        # Disable thinking/reasoning mode for Qwen3 models on vLLM. Some
        # deployments ignore this, so the prompt may also carry /no_think.
        "chat_template_kwargs": {"enable_thinking": False},
    }

    headers: dict[str, str] = {"Content-Type": "application/json"}
    if config.api_key:
        headers["Authorization"] = f"Bearer {config.api_key}"

    resp = await client.post(
        f"{config.base_url}/v1/chat/completions",
        json=payload,
        headers=headers,
        # Apply the per-agent timeout even when the caller shares a client.
        timeout=config.timeout,
    )
    _ = resp.raise_for_status()

    duration_ms = int((time.monotonic() - start) * 1000)

    body: object = resp.json()
    content = ""
    choices = body.get("choices") if isinstance(body, dict) else None
    if isinstance(choices, list) and choices:
        first = choices[0]
        msg = first.get("message") if isinstance(first, dict) else None
        raw_content = msg.get("content") if isinstance(msg, dict) else None
        # OpenAI-compatible servers may return "content": null.
        if isinstance(raw_content, str):
            content = raw_content

    logger.debug(
        "vLLM thesis call completed in %dms, response length=%d",
        duration_ms,
        len(content),
    )

    return _strip_thinking_block(content.strip())
|