007189c0a5
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
The model outputs 'Thinking Process:' as plain text (not in <think> tags). Updated _strip_thinking_block to handle both XML tags and plain-text reasoning patterns. Also: - Added rule 7 to system prompt: 'Do NOT show your thinking process' - Set think=False in Ollama payload to disable Qwen3 thinking mode - Added fallback regex to extract thesis from after thinking blocks
437 lines
16 KiB
Python
437 lines
16 KiB
Python
"""Optional LLM wording layer for thesis generation.
|
|
|
|
Takes a deterministic thesis string (built from trend data and eligibility
|
|
rules) and rewrites it into natural, analyst-quality prose using a local
|
|
Ollama model. The deterministic thesis is always preserved as the fallback
|
|
and audit reference.
|
|
|
|
This module is opt-in: callers must explicitly request LLM rewriting.
|
|
If the LLM call fails or is disabled, the original deterministic thesis
|
|
is returned unchanged.
|
|
|
|
Requirements: 7.1, 7.2
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
|
|
import asyncpg
|
|
import httpx
|
|
|
|
from services.shared.agent_config import AgentConfigResolver, ResolvedAgentConfig
|
|
from services.shared.config import OllamaConfig, VLLMConfig
|
|
from services.shared.schemas import TrendSummary
|
|
|
|
# Module-level logger shared by the functions in this file.
logger = logging.getLogger(__name__)

# Identifier for the current prompt wording. It is not referenced anywhere in
# the visible part of this module.
# NOTE(review): presumably bumped whenever the prompt text changes — confirm.
THESIS_PROMPT_VERSION = "thesis-rewrite-v1"

# Default system prompt for thesis rewriting. rewrite_thesis_with_llm() replaces
# it with a resolved agent config's system_prompt when one is set.
# The trailing backslashes are line continuations inside the literal: the text
# starts without a leading newline and the first sentence stays on one line.
THESIS_SYSTEM_PROMPT = """\
You are a concise financial analyst. You rewrite structured trade thesis \
summaries into clear, professional prose suitable for an internal research note.

STRICT RULES:
1. Do NOT add any information that is not present in the input.
2. Do NOT fabricate numbers, dates, company names, or analyst opinions.
3. Keep the rewrite under 150 words.
4. Preserve all factual claims, risk notes, and evidence counts from the input.
5. Use a neutral, professional tone. Avoid hype or marketing language.
6. Return ONLY the rewritten thesis text. No JSON, no markdown, no commentary.
7. Do NOT show your thinking process. Do NOT include any reasoning steps. Output ONLY the final rewritten text."""
|
|
|
|
|
|
def build_thesis_rewrite_prompt(
    deterministic_thesis: str,
    summary: TrendSummary,
) -> dict[str, str]:
    """Assemble the chat prompts used to rewrite a deterministic thesis.

    The user prompt embeds the deterministic thesis verbatim, followed by a
    short context block derived from ``summary``: ticker, window, direction,
    strength, confidence and contradiction score, plus at most three dominant
    catalysts and at most two material risks when those lists are non-empty.

    Args:
        deterministic_thesis: The rule-based thesis text to be rewritten.
        summary: Trend summary supplying the context fields.

    Returns:
        A dict with two keys: ``"system"`` (the module-level
        ``THESIS_SYSTEM_PROMPT``) and ``"user"`` (the assembled user prompt).
    """
    facts = [
        f"Ticker: {summary.entity_id}",
        f"Window: {summary.window.value}",
        f"Direction: {summary.trend_direction.value}",
        f"Strength: {summary.trend_strength:.2f}",
        f"Confidence: {summary.confidence:.2f}",
        f"Contradiction score: {summary.contradiction_score:.2f}",
    ]

    # Optional lines: only emitted when the summary actually carries entries,
    # and capped so the context block stays short.
    catalysts = summary.dominant_catalysts
    if catalysts:
        facts.append(f"Catalysts: {', '.join(catalysts[:3])}")
    risks = summary.material_risks
    if risks:
        facts.append(f"Risks: {'; '.join(risks[:2])}")

    context_block = "\n".join(facts)

    user_prompt = f"""\
Rewrite the following structured thesis into clear, professional analyst prose.

--- STRUCTURED THESIS ---
{deterministic_thesis}
--- END STRUCTURED THESIS ---

--- CONTEXT ---
{context_block}
--- END CONTEXT ---

Return ONLY the rewritten thesis. No other text."""

    return {"system": THESIS_SYSTEM_PROMPT, "user": user_prompt}
|
|
|
|
|
|
async def _resolve_thesis_config(
    pool: asyncpg.Pool,
    config: OllamaConfig,
) -> tuple[ResolvedAgentConfig | None, OllamaConfig | VLLMConfig, bool]:
    """Resolve runtime config for the "thesis-rewriter" agent slug.

    Resolution is all-or-nothing: if the lookup or the construction of the
    provider-specific config raises, the caller gets ``(None, config, False)``
    so that no half-applied variant state (provider flag, system prompt,
    budget, performance attribution) leaks into the rewrite call.

    Args:
        pool: asyncpg pool handed to ``AgentConfigResolver``.
        config: The caller's Ollama config, used as the fallback and as the
            source of base_url and retry timing for a resolved Ollama variant.

    Returns:
        ``(resolved, effective_config, use_vllm)``. ``resolved`` is ``None``
        when nothing was resolved or resolution failed.
    """
    try:
        resolver = AgentConfigResolver(pool, ttl_seconds=60)
        resolved = await resolver.resolve("thesis-rewriter")
        if resolved is None:
            return None, config, False

        provider = (resolved.model_provider or "").strip().lower()
        use_vllm = provider == "vllm"
        effective_config: OllamaConfig | VLLMConfig
        if use_vllm:
            # Imported lazily; load_config supplies the vLLM base_url.
            from services.shared.config import load_config as _load_config

            _cfg = _load_config()
            effective_config = VLLMConfig(
                base_url=_cfg.vllm.base_url,
                model=resolved.model_name,
                timeout=resolved.timeout_seconds,
                max_retries=resolved.max_retries,
                max_tokens=resolved.max_tokens,
                temperature=0.0,
            )
        else:
            effective_config = OllamaConfig(
                base_url=config.base_url,
                model=resolved.model_name,
                timeout=resolved.timeout_seconds,
                max_retries=resolved.max_retries,
                retry_base_delay=config.retry_base_delay,
                retry_max_delay=config.retry_max_delay,
                retry_backoff_multiplier=config.retry_backoff_multiplier,
                max_tokens=resolved.max_tokens,
                context_window=resolved.context_window,
            )
        logger.info(
            "Thesis rewriter using resolved config: model=%s variant=%s provider=%s",
            resolved.model_name, resolved.variant_id, provider or "ollama",
        )
    except Exception:
        logger.warning(
            "Failed to resolve thesis-rewriter config — using passed config",
            exc_info=True,
        )
        return None, config, False
    return resolved, effective_config, use_vllm


async def _thesis_token_budget_exhausted(
    pool: asyncpg.Pool,
    resolved: ResolvedAgentConfig,
) -> bool:
    """Return True when the variant has used up its token budget for the last hour.

    Usage is the sum of input and output tokens logged in
    ``agent_performance_log`` for the variant during the past hour. The check
    fails open: if the query raises, a warning is logged and ``False`` is
    returned so the rewrite still proceeds.
    """
    # A missing or non-positive budget, or a config without a variant, means
    # there is nothing to enforce.
    if resolved.variant_id is None or not resolved.token_budget or resolved.token_budget <= 0:
        return False

    try:
        row = await pool.fetchrow(
            """SELECT COALESCE(SUM(input_tokens + output_tokens), 0) AS total_tokens
               FROM agent_performance_log
               WHERE variant_id = $1
                 AND recorded_at >= NOW() - INTERVAL '1 hour'""",
            resolved.variant_id,
        )
        used = int(row["total_tokens"]) if row else 0
    except Exception:
        logger.warning("Failed to check token budget for thesis-rewriter", exc_info=True)
        return False

    if used < resolved.token_budget:
        return False
    logger.warning(
        "Token budget exceeded for thesis-rewriter variant %s: used %d / budget %d",
        resolved.variant_id, used, resolved.token_budget,
    )
    return True


async def rewrite_thesis_with_llm(
    deterministic_thesis: str,
    summary: TrendSummary,
    config: OllamaConfig,
    http_client: httpx.AsyncClient | None = None,
    pool: asyncpg.Pool | None = None,
) -> str:
    """Rewrite a deterministic thesis using a local LLM (Ollama or vLLM).

    If the LLM call fails for any reason, or returns no text, the original
    deterministic thesis is returned unchanged, so this layer is purely
    additive and never blocks recommendation generation.

    When a pool is provided, runtime config for the "thesis-rewriter" agent
    slug is resolved from the database and, if found, overrides the model and
    provider (a provider of "vllm" selects the vLLM endpoint, anything else
    Ollama) and may override the system prompt. If resolution fails, the
    passed ``config`` is used as-is and no variant state is applied.

    Notes:
        * The resolved timeout only takes effect when this function creates
          its own HTTP client; a caller-supplied ``http_client`` keeps its own
          timeout settings.
        * ``max_retries`` is carried on the effective config but this function
          issues a single request; no retry loop is implemented here.
        * A variant whose token budget for the last hour is exhausted
          short-circuits to the deterministic thesis without calling the LLM.
        * Token counts written to the performance log are rough estimates
          (character count // 4), not tokenizer output.

    Args:
        deterministic_thesis: The rule-based thesis string.
        summary: The trend summary that produced the thesis.
        config: Ollama connection and model configuration.
        http_client: Optional shared HTTP client for connection reuse.
        pool: Optional asyncpg pool for config resolution and performance logging.

    Returns:
        The LLM-rewritten thesis on success, or the original on failure.
    """
    start_time = time.monotonic()

    resolved: ResolvedAgentConfig | None = None
    effective_config: OllamaConfig | VLLMConfig = config
    use_vllm = False
    if pool is not None:
        resolved, effective_config, use_vllm = await _resolve_thesis_config(pool, config)
        # Token budget enforcement
        if resolved is not None and await _thesis_token_budget_exhausted(pool, resolved):
            return deterministic_thesis

    prompts = build_thesis_rewrite_prompt(deterministic_thesis, summary)

    # Override system prompt from resolved config if available
    if resolved is not None and resolved.system_prompt:
        prompts["system"] = resolved.system_prompt

    async def _record(
        *,
        success: bool,
        duration_ms: int,
        output_tokens: int = 0,
        error_message: str | None = None,
    ) -> None:
        """Write one performance-log row; a no-op without a pool or resolved config."""
        if pool is None or resolved is None:
            return
        await _log_thesis_performance(
            pool,
            resolved=resolved,
            ticker=summary.entity_id,
            success=success,
            duration_ms=duration_ms,
            input_tokens=len(deterministic_thesis) // 4,
            output_tokens=output_tokens,
            error_message=error_message,
        )

    # Only close the client if it was created here; a shared client belongs
    # to the caller.
    owns_client = http_client is None
    client = http_client or httpx.AsyncClient(timeout=effective_config.timeout)

    try:
        if use_vllm:
            rewritten = await _call_vllm_thesis(client, effective_config, prompts)  # type: ignore[arg-type]
        else:
            rewritten = await _call_ollama_thesis(client, effective_config, prompts)  # type: ignore[arg-type]
        duration_ms = int((time.monotonic() - start_time) * 1000)

        if rewritten:
            logger.info(
                "LLM thesis rewrite succeeded for %s (%d chars → %d chars)",
                summary.entity_id,
                len(deterministic_thesis),
                len(rewritten),
            )
            await _record(
                success=True,
                duration_ms=duration_ms,
                output_tokens=len(rewritten) // 4,
            )
            return rewritten

        logger.warning(
            "LLM thesis rewrite returned empty for %s — using deterministic thesis",
            summary.entity_id,
        )
        await _record(
            success=False,
            duration_ms=duration_ms,
            error_message="empty_response",
        )
        return deterministic_thesis
    except Exception:
        duration_ms = int((time.monotonic() - start_time) * 1000)
        logger.exception(
            "LLM thesis rewrite failed for %s — using deterministic thesis",
            summary.entity_id,
        )
        await _record(
            success=False,
            duration_ms=duration_ms,
            error_message="exception",
        )
        return deterministic_thesis
    finally:
        if owns_client:
            await client.aclose()
|
|
|
|
|
|
async def _log_thesis_performance(
    pool: asyncpg.Pool,
    *,
    resolved: ResolvedAgentConfig,
    ticker: str,
    success: bool,
    duration_ms: int,
    input_tokens: int = 0,
    output_tokens: int = 0,
    error_message: str | None = None,
) -> None:
    """Record one thesis-rewriter call in ``agent_performance_log``.

    Best-effort: any exception raised while building or executing the insert
    is logged as a warning and swallowed, so logging never affects the caller.

    Args:
        pool: asyncpg pool used to execute the insert.
        resolved: Resolved agent config supplying ``agent_id`` and ``variant_id``.
        ticker: Ticker the rewrite was generated for.
        success: Whether the rewrite produced usable text.
        duration_ms: Duration of the rewrite attempt in milliseconds.
        input_tokens: Input token count to record.
        output_tokens: Output token count to record.
        error_message: Optional short failure reason.
    """
    insert_sql = """INSERT INTO agent_performance_log
           (agent_id, variant_id, document_id, ticker, success,
            duration_ms, confidence, retry_count,
            input_tokens, output_tokens, error_message)
           VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8, $9, $10, $11)"""
    try:
        row_values = (
            resolved.agent_id,
            resolved.variant_id,
            None,  # document_id: thesis rewrites are not tied to a document
            ticker,
            success,
            duration_ms,
            0.0,  # confidence: rewrites carry no confidence score
            0,  # retry_count: always recorded as zero here
            input_tokens,
            output_tokens,
            error_message,
        )
        await pool.execute(insert_sql, *row_values)
    except Exception:
        logger.warning("Failed to log thesis-rewriter performance", exc_info=True)
|
|
|
|
|
|
async def _call_ollama_thesis(
    client: httpx.AsyncClient,
    config: OllamaConfig,
    prompts: dict[str, str],
) -> str:
    """Make a single Ollama chat call for thesis rewriting.

    Posts a non-streaming request to ``{config.base_url}/api/chat`` with the
    system and user prompts, then returns the reply with any thinking or
    reasoning block stripped.

    Args:
        client: HTTP client used for the request.
        config: Ollama settings; ``base_url``, ``model`` and
            ``context_window`` are read here.
        prompts: Dict with ``"system"`` and ``"user"`` prompt strings.

    Returns:
        The cleaned response text. An empty string is returned when the
        response carries no string ``message.content``.

    Raises:
        Exception: Transport errors from ``client.post``, the HTTP status
            error from ``raise_for_status()``, and JSON decoding errors are
            not caught here; they propagate to the caller.
    """
    start = time.monotonic()

    options: dict[str, object] = {}
    # Support context_window override via num_ctx (Requirement 10.4)
    if config.context_window > 0:
        options["num_ctx"] = config.context_window
    # Cap generation length; a rewrite is limited to 150 words by the prompt.
    options["num_predict"] = 512

    payload: dict[str, object] = {
        "model": config.model,
        "messages": [
            {"role": "system", "content": prompts["system"]},
            {"role": "user", "content": prompts["user"]},
        ],
        "stream": False,
        "options": options,
        # Disable thinking/reasoning mode for models that support it (e.g. Qwen3)
        "think": False,
    }

    resp = await client.post(
        f"{config.base_url}/api/chat",
        json=payload,
    )
    _ = resp.raise_for_status()

    duration_ms = int((time.monotonic() - start) * 1000)

    # Tolerate malformed bodies: anything other than a string content
    # (missing key, JSON null, wrong type) is treated as an empty reply
    # instead of crashing on .strip().
    body = resp.json()
    msg = body.get("message") if isinstance(body, dict) else None
    raw_content = msg.get("content") if isinstance(msg, dict) else None
    content: str = raw_content if isinstance(raw_content, str) else ""

    logger.debug(
        "Ollama thesis call completed in %dms, response length=%d",
        duration_ms,
        len(content),
    )

    return _strip_thinking_block(content.strip())
|
|
|
|
|
|
def _strip_thinking_block(text: str) -> str:
    """Remove thinking/reasoning blocks from model output.

    Some models (e.g. Qwen) emit chain-of-thought either in <think> XML tags
    or as plain-text "Thinking Process:" blocks before the actual response.
    The following shapes are handled, in order:

    1. Complete ``<think>...</think>`` blocks are removed.
    2. A ``</think>`` with no opening tag: everything up to and including the
       last such tag is dropped.
    3. An unclosed ``<think>``: everything from the tag to the end is dropped.
    4. A plain-text block introduced by "Thinking Process:", "Thought
       Process:" or "Chain of Thought:" (any letter case) is removed up to
       the first line that starts with an upper-case ticker (2-5 letters)
       followed by a thesis verb such as "shows" or "maintains".
    5. Fallback: if one of those markers is still present, the text from the
       first line starting with a 1-5 letter upper-case ticker plus thesis
       verb is kept. If no such line exists the text is returned as-is.

    Args:
        text: Raw model output.

    Returns:
        The text with reasoning removed, stripped of surrounding whitespace.
    """
    import re

    # 1. Complete <think>...</think> blocks (non-greedy; DOTALL spans newlines)
    cleaned = re.sub(r"<think>.*?</think>\s*", "", text, flags=re.DOTALL)

    # 2. Closing tag without an opening one: whatever precedes it is reasoning.
    _, closing_tag, after_reasoning = cleaned.rpartition("</think>")
    if closing_tag:
        cleaned = after_reasoning

    # 3. Unclosed <think> tag (model cut off mid-thought)
    cleaned = re.sub(r"<think>.*", "", cleaned, flags=re.DOTALL)

    # 4. Plain-text reasoning block. Case-insensitivity is scoped to the
    # marker and the verb only: the ticker must stay upper-case, otherwise
    # ordinary words inside the reasoning ("it shows ...") would be taken for
    # the start of the thesis and the rest of the reasoning would leak through.
    cleaned = re.sub(
        r"(?i:Thinking Process:|Thought Process:|Chain of Thought:).*?"
        r"(?=\n[A-Z]{2,5}\s+(?i:shows?|demonstrates?|exhibits?|displays?|maintains?))",
        "",
        cleaned,
        flags=re.DOTALL,
    )

    # 5. Fallback for every marker handled above, not just "Thinking Process:".
    # It also accepts single-letter tickers, which step 4 does not.
    if re.search(r"(?i)thinking process:|thought process:|chain of thought:", cleaned):
        match = re.search(
            r"\n([A-Z]{1,5}\s+(?:shows?|demonstrates?|exhibits?|displays?|maintains?)\s.+)",
            cleaned,
            flags=re.DOTALL,
        )
        if match:
            cleaned = match.group(1)
    return cleaned.strip()
|
|
|
|
|
|
async def _call_vllm_thesis(
    client: httpx.AsyncClient,
    config: VLLMConfig,
    prompts: dict[str, str],
) -> str:
    """Make a vLLM chat completion call for thesis rewriting.

    Uses the OpenAI-compatible ``/v1/chat/completions`` endpoint with a
    non-streaming request and returns the first choice's message content with
    any thinking or reasoning block stripped.

    Args:
        client: HTTP client used for the request.
        config: vLLM settings; ``base_url``, ``model``, ``max_tokens``,
            ``temperature`` and ``api_key`` are read here.
        prompts: Dict with ``"system"`` and ``"user"`` prompt strings.

    Returns:
        The cleaned response text. An empty string is returned when the
        response has no choices or the first choice carries no string
        ``message.content`` (including a JSON ``null``).

    Raises:
        Exception: Transport errors from ``client.post``, the HTTP status
            error from ``raise_for_status()``, and JSON decoding errors are
            not caught here; they propagate to the caller.
    """
    start = time.monotonic()

    payload: dict[str, object] = {
        "model": config.model,
        "messages": [
            {"role": "system", "content": prompts["system"]},
            {"role": "user", "content": prompts["user"]},
        ],
        "max_tokens": config.max_tokens,
        "temperature": config.temperature,
        "stream": False,
    }

    headers: dict[str, str] = {"Content-Type": "application/json"}
    # The bearer token is only sent when an API key is configured.
    if config.api_key:
        headers["Authorization"] = f"Bearer {config.api_key}"

    resp = await client.post(
        f"{config.base_url}/v1/chat/completions",
        json=payload,
        headers=headers,
    )
    _ = resp.raise_for_status()

    duration_ms = int((time.monotonic() - start) * 1000)

    # Walk the response defensively: a missing or null field, or a value of
    # the wrong type, yields an empty reply instead of an AttributeError on
    # .get() or .strip().
    body = resp.json()
    choices = body.get("choices") if isinstance(body, dict) else None
    content: str = ""
    if isinstance(choices, list) and choices:
        first_choice = choices[0]
        msg = first_choice.get("message") if isinstance(first_choice, dict) else None
        raw_content = msg.get("content") if isinstance(msg, dict) else None
        if isinstance(raw_content, str):
            content = raw_content

    logger.debug(
        "vLLM thesis call completed in %dms, response length=%d",
        duration_ms,
        len(content),
    )

    return _strip_thinking_block(content.strip())
|