diff --git a/infra/migrations/026_ai_agents.sql b/infra/migrations/026_ai_agents.sql index fa70213..ed79dd0 100644 --- a/infra/migrations/026_ai_agents.sql +++ b/infra/migrations/026_ai_agents.sql @@ -8,7 +8,7 @@ CREATE TABLE IF NOT EXISTS ai_agents ( slug VARCHAR(100) NOT NULL UNIQUE, purpose TEXT NOT NULL DEFAULT '', model_provider VARCHAR(50) NOT NULL DEFAULT 'ollama', - model_name VARCHAR(200) NOT NULL DEFAULT 'llama3.1:8b', + model_name VARCHAR(200) NOT NULL DEFAULT 'qwen3.5:9b', system_prompt TEXT NOT NULL DEFAULT '', user_prompt_template TEXT NOT NULL DEFAULT '', prompt_version VARCHAR(100) NOT NULL DEFAULT '', @@ -34,7 +34,7 @@ SELECT * FROM (VALUES 'document-extractor', 'Extracts structured intelligence (sentiment, catalysts, impact scores, key facts, risks) from company news, SEC filings, earnings transcripts, and press releases.', 'ollama', - 'llama3.1:8b', + 'qwen3.5:9b', 'You are a financial document analyst. Extract structured data as JSON. Return ONLY a single JSON object. No markdown fences, no explanation, no text before or after the JSON. Every field in the schema is required. Use "other" for catalyst_type if unsure. Keep evidence_spans short (under 20 words each). Keep key_facts to 3-5 items max.', 'document-intel-v2', '2.0.0', @@ -45,8 +45,8 @@ SELECT * FROM (VALUES 'event-classifier', 'Classifies global/geopolitical news into structured macro events with impact type, severity, affected regions/sectors/commodities, and estimated duration.', 'ollama', - 'llama3.1:8b', - 'Classify this global news article as a macro event. Fill every field. RULES: - Only extract facts EXPLICITLY stated in the article - Do NOT infer geopolitical implications not stated - Distinguish between announced policy and rumored policy - If severity is unclear, default to "low" - confidence: 0.0-1.0 your confidence in this classification', + 'qwen3.5:9b', + 'Classify this global news article as a macro event. Fill every field. RULES: - Only extract facts EXPLICITLY stated in the article - Do NOT infer geopolitical implications not stated - Distinguish between announced policy and rumored policy - If severity is unclear, default to "low" - If the article is about a SINGLE COMPANY (not a sector or market), set severity to "low" and confidence below 0.3 - Only tag event_types DIRECTLY described, do NOT infer secondary effects - severity "critical" is reserved for events affecting multiple countries or entire global markets - confidence: 0.0-1.0 your confidence in this classification', 'event-classification-v1', '1.0.0', 'system' @@ -56,7 +56,7 @@ SELECT * FROM (VALUES 'thesis-rewriter', 'Rewrites deterministic trade thesis summaries into clear, professional analyst prose. Optional layer — system falls back to deterministic thesis if this fails.', 'ollama', - 'llama3.1:8b', + 'qwen3.5:9b', 'You are a concise financial analyst. You rewrite structured trade thesis summaries into clear, professional prose suitable for an internal research note. STRICT RULES: 1. Do NOT add any information not present in the input. 2. Do NOT fabricate numbers, dates, company names. 3. Keep under 150 words. 4. Preserve all factual claims, risk notes, evidence counts. 5. Neutral, professional tone. 6. Return ONLY the rewritten thesis text.', 'thesis-rewrite-v1', '1.0.0', diff --git a/services/extractor/event_classifier.py b/services/extractor/event_classifier.py index 29e7068..57a29b4 100644 --- a/services/extractor/event_classifier.py +++ b/services/extractor/event_classifier.py @@ -14,6 +14,7 @@ from __future__ import annotations import asyncio import json import logging +import time import uuid from dataclasses import dataclass, field from datetime import datetime, timezone @@ -22,6 +23,7 @@ from typing import Any import asyncpg from minio import Minio +from services.shared.agent_config import AgentConfigResolver, ResolvedAgentConfig from services.shared.schemas import ( EstimatedDuration, ImpactType, @@ -183,15 +185,28 @@ def get_event_json_schema() -> dict[str, Any]: # --------------------------------------------------------------------------- _SYSTEM_PROMPT = """\ -You classify global news into structured macro event JSON. \ +You classify MACRO-LEVEL global news into structured event JSON. \ Return ONLY a single JSON object. No markdown, no explanation. \ -Every field is required. Keep key_facts to 3-5 items. Keep summary under 3 sentences.""" +Every field is required. Keep key_facts to 3-5 items. Keep summary under 3 sentences. + +CRITICAL: Only classify articles about MACRO events that affect entire markets, \ +sectors, or economies. Examples: trade wars, interest rate changes, commodity \ +supply disruptions, regulatory changes, geopolitical conflicts, natural disasters. + +DO NOT classify as macro events: individual company earnings, lawsuits against \ +a single company, single-company management changes, individual stock analysis, \ +company-specific debt or bankruptcy, product launches by one company. \ +For these, set severity to "low", confidence below 0.3, and leave \ +affected_regions, affected_sectors, and affected_commodities as empty arrays.""" _ANTI_HALLUCINATION_RULES = """\ RULES: - Only extract facts EXPLICITLY stated in the text. Do NOT fabricate. - If vague or speculative, set confidence below 0.4. -- Distinguish announced policy from rumored policy.""" +- Distinguish announced policy from rumored policy. +- If the article is about a SINGLE COMPANY (not a sector or market), set severity to "low" and confidence below 0.3. +- Only tag event_types that are DIRECTLY described in the article. Do NOT infer secondary effects. +- severity "critical" is reserved for events affecting multiple countries or entire global markets.""" def build_event_classification_prompt(text: str) -> dict[str, str]: @@ -447,6 +462,11 @@ async def classify_global_event( dedicated event classification prompt and JSON schema. Follows the same retry policy as document extraction. + Resolves runtime config for the "event-classifier" agent slug from + the database, preferring an active variant's model_name and + system_prompt if one exists. Falls back to the OllamaClient's + existing config if resolution fails. + Persists prompt, raw output, and final event to MinIO and PostgreSQL when the respective clients are provided. @@ -464,10 +484,40 @@ async def classify_global_event( ValueError: If classification fails after all retries. """ ts = datetime.now(timezone.utc) + start_time = time.monotonic() + + # Resolve event-classifier config from DB for variant override + resolved: ResolvedAgentConfig | None = None + if pool is not None: + try: + resolver = AgentConfigResolver(pool, ttl_seconds=60) + resolved = await resolver.resolve("event-classifier") + except Exception: + logger.warning( + "Failed to resolve event-classifier config — using defaults", + exc_info=True, + ) + prompts = build_event_classification_prompt(normalized_text) json_schema = get_event_json_schema() model_name = ollama_client._config.model + # Override model_name and system_prompt from resolved config + if resolved is not None: + model_name = resolved.model_name + if resolved.system_prompt: + prompts["system"] = resolved.system_prompt + + # Input token limit truncation + if resolved is not None and resolved.input_token_limit > 0: + max_chars = resolved.input_token_limit * 4 + if len(normalized_text) > max_chars: + normalized_text = normalized_text[:max_chars] + # Rebuild prompts with truncated text + prompts = build_event_classification_prompt(normalized_text) + if resolved.system_prompt: + prompts["system"] = resolved.system_prompt + # Persist prompt to MinIO if minio_client: try: @@ -480,6 +530,8 @@ async def classify_global_event( # Call Ollama using the client's internal _call_ollama method # We reuse the retry logic pattern from OllamaClient.extract() max_retries = ollama_client._max_retries + if resolved is not None: + max_retries = resolved.max_retries last_error: str | None = None raw_output = "" @@ -515,6 +567,36 @@ async def classify_global_event( "Failed to persist global event for doc %s", document_id, ) + # Log to agent_performance_log with variant attribution + if pool is not None and resolved is not None: + duration_ms = int((time.monotonic() - start_time) * 1000) + output_tokens = len(raw_output) // 4 if raw_output else 0 + try: + await pool.execute( + """INSERT INTO agent_performance_log + (agent_id, variant_id, document_id, ticker, success, + duration_ms, confidence, retry_count, + input_tokens, output_tokens, error_message) + VALUES ($1::uuid, $2::uuid, $3::uuid, $4, $5, + $6, $7, $8, $9, $10, $11)""", + resolved.agent_id, + resolved.variant_id, + document_id, + "", + True, + duration_ms, + event.confidence, + attempt_num, + len(normalized_text) // 4, + output_tokens, + None, + ) + except Exception: + logger.warning( + "Failed to log event-classifier performance for doc %s", + document_id, exc_info=True, + ) + return event except (json.JSONDecodeError, KeyError, TypeError) as exc: @@ -538,7 +620,35 @@ async def classify_global_event( ) await asyncio.sleep(delay) - # All retries exhausted — persist failure and raise + # All retries exhausted — log failure performance and persist + if pool is not None and resolved is not None: + duration_ms = int((time.monotonic() - start_time) * 1000) + try: + await pool.execute( + """INSERT INTO agent_performance_log + (agent_id, variant_id, document_id, ticker, success, + duration_ms, confidence, retry_count, + input_tokens, output_tokens, error_message) + VALUES ($1::uuid, $2::uuid, $3::uuid, $4, $5, + $6, $7, $8, $9, $10, $11)""", + resolved.agent_id, + resolved.variant_id, + document_id, + "", + False, + duration_ms, + 0.0, + max_retries + 1, + len(normalized_text) // 4, + 0, + last_error, + ) + except Exception: + logger.warning( + "Failed to log event-classifier failure performance for doc %s", + document_id, exc_info=True, + ) + if minio_client: try: _upload_classification_result( diff --git a/services/shared/config.py b/services/shared/config.py index 7b74bff..8175c97 100644 --- a/services/shared/config.py +++ b/services/shared/config.py @@ -41,7 +41,7 @@ class MinioConfig: @dataclass class OllamaConfig: base_url: str = "http://localhost:11434" - model: str = "llama3.1:8b" + model: str = "qwen3.5:9b" timeout: int = 120 max_retries: int = 2 retry_base_delay: float = 1.0 @@ -51,6 +51,7 @@ class OllamaConfig: stall_timeout: float = 30.0 loop_window: int = 64 loop_threshold: float = 0.5 + context_window: int = 0 # Ollama num_ctx; 0 = use model default @dataclass @@ -240,7 +241,7 @@ def load_config() -> AppConfig: ), ollama=OllamaConfig( base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"), - model=os.getenv("OLLAMA_MODEL", "llama3.1:8b"), + model=os.getenv("OLLAMA_MODEL", "qwen3.5:9b"), timeout=int(os.getenv("OLLAMA_TIMEOUT", "120")), max_retries=int(os.getenv("OLLAMA_MAX_RETRIES", "2")), retry_base_delay=float(os.getenv("OLLAMA_RETRY_BASE_DELAY", "1.0")), diff --git a/tests/test_event_classifier.py b/tests/test_event_classifier.py index 1efb81e..fc14c8d 100644 --- a/tests/test_event_classifier.py +++ b/tests/test_event_classifier.py @@ -159,7 +159,7 @@ class TestBuildEventClassificationPrompt: def test_system_prompt_is_concise(self): result = build_event_classification_prompt("text") assert "JSON" in result["system"] - assert len(result["system"]) < 300 + assert len(result["system"]) < 1000 # expanded to include macro-vs-company filtering rules def test_user_prompt_lists_impact_types(self): result = build_event_classification_prompt("text")