Files
stonks-oracle/services/reporting/summarizer.py
T
Celes Renata bc077bfcc8
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
feat: trading feedback engine — periodic performance reports with AI summarization
- Migration 038: trading_reports table + report-summarizer agent seed
- 6 reporting modules: models, collector, sections, validator, summarizer, generator
- API endpoints: GET /api/reports (paginated, filterable), GET /api/reports/{id}
- Frontend hooks: useReports, useReport with TanStack Query
- Scheduler: daily (after 16:30 ET) and weekly (Saturday) report triggers
- Redis queue consumer for async report generation with retry/dedup
- 5 property-based tests (chunking, serialization, validation, accuracy, deltas)
- 109 unit/integration tests across all modules
- 6 frontend hook tests with MSW mocks
2026-05-01 22:13:09 +00:00

438 lines
15 KiB
Python

"""AI-powered report summarizer with chunking and deterministic fallback.
Generates natural-language summaries for trading performance report sections
using the Report_Summarizer_Agent (resolved via AgentConfigResolver + llm_factory).
Data is chunked to fit within the 8k-token context window of the local model.
Requirements: 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 3.6
Design: AI Summarizer
"""
from __future__ import annotations
import json
import logging
import time
import asyncpg
from services.extractor.llm_factory import build_llm_client
from services.shared.agent_config import AgentConfigResolver, ResolvedAgentConfig
from services.shared.config import load_config
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Default upper bound on chunk length used by chunk_data(). Measured in
# characters, not tokens.
CHUNK_SIZE_LIMIT = 6000 # characters per chunk
# Word cap for a single section summary: quoted in the merge prompt and
# enforced by truncation in summarize_section().
MAX_SUMMARY_WORDS = 200 # per section summary
# Word cap for the executive summary: quoted in its prompt and enforced by
# truncation in generate_executive_summary().
MAX_EXECUTIVE_SUMMARY_WORDS = 300
# Slug handed to AgentConfigResolver.resolve() to look up the summarizer agent.
_REPORT_SUMMARIZER_SLUG = "report-summarizer"
# ---------------------------------------------------------------------------
# Chunking
# ---------------------------------------------------------------------------
def chunk_data(serialized: str, max_chars: int = CHUNK_SIZE_LIMIT) -> list[str]:
    """Partition *serialized* into newline-aligned chunks.

    Lines are packed greedily: a chunk is closed as soon as adding the next
    line would push it past *max_chars*. A line is never split, so a single
    line longer than *max_chars* becomes a chunk of its own and is the only
    case in which a chunk may exceed the limit.

    Guarantees:
      * the result always holds at least one chunk (``[""]`` for empty input);
      * ``"".join(chunk_data(s, n)) == s`` for every *s*, because each line
        keeps its trailing newline.
    """
    if not serialized:
        return [""]

    # Cut after every "\n". Every piece except the last is re-attached to its
    # newline; the last piece is the unterminated tail and is empty when the
    # text ends with "\n".
    pieces = serialized.split("\n")
    lines = [piece + "\n" for piece in pieces[:-1]]
    if pieces[-1]:
        lines.append(pieces[-1])

    chunks: list[str] = []
    bucket: list[str] = []
    bucket_len = 0
    for line in lines:
        would_overflow = bucket_len + len(line) > max_chars
        if bucket and would_overflow:
            # Close the current chunk before starting a new one with `line`.
            chunks.append("".join(bucket))
            bucket, bucket_len = [], 0
        bucket.append(line)
        bucket_len += len(line)

    # Whatever is still buffered forms the final chunk.
    if bucket:
        chunks.append("".join(bucket))
    return chunks or [""]
# ---------------------------------------------------------------------------
# Performance logging
# ---------------------------------------------------------------------------
async def _log_performance(
    pool: asyncpg.Pool,
    resolved: ResolvedAgentConfig,
    success: bool,
    duration_ms: int,
    input_text: str,
    output_text: str,
    error_message: str | None = None,
) -> None:
    """Record one summarizer invocation in ``agent_performance_log``.

    The row carries the agent and variant ids from *resolved*, the outcome
    flag, the elapsed time and an optional error message. ``document_id`` and
    ``ticker`` are stored as NULL, confidence as 0.0 and retry count as 0.
    Token counts are estimated as ``len(text) // 4``.

    Logging is best-effort: any exception raised while building or executing
    the insert is logged at warning level and swallowed, so a logging failure
    never breaks report generation.
    """
    insert_sql = (
        "INSERT INTO agent_performance_log\n"
        "(agent_id, variant_id, document_id, ticker, success,\n"
        "duration_ms, confidence, retry_count,\n"
        "input_tokens, output_tokens, error_message)\n"
        "VALUES ($1::uuid, $2::uuid, $3, $4, $5, $6, $7, $8, $9, $10, $11)"
    )
    try:
        row = (
            resolved.agent_id,
            resolved.variant_id,
            None,  # no document_id for report summaries
            None,  # no ticker for report summaries
            success,
            duration_ms,
            0.0,  # no confidence score for summaries
            0,
            len(input_text) // 4,  # token estimate
            len(output_text) // 4,  # token estimate
            error_message,
        )
        await pool.execute(insert_sql, *row)
    except Exception:
        logger.warning("Failed to log summarizer performance", exc_info=True)
# ---------------------------------------------------------------------------
# LLM summarization helpers
# ---------------------------------------------------------------------------
async def _summarize_chunk(
    resolved: ResolvedAgentConfig,
    section_name: str,
    chunk: str,
) -> str:
    """Ask the summarizer agent for a summary of one data chunk.

    A fresh LLM client is built from the loaded config for this call and is
    always closed afterwards, whether or not the call succeeds.

    Returns the model's raw output with surrounding whitespace removed.
    Raises ``RuntimeError`` when the attempt reports an error or the output is
    blank, so the caller can decide on a fallback.
    """
    config = load_config()
    llm = build_llm_client(resolved, config.ollama, config.vllm)
    try:
        messages = {
            "system": resolved.system_prompt,
            "user": f"Summarize this {section_name} data:\n{chunk}",
        }
        result = await llm.call_llm(
            prompts=messages,
            json_schema={},  # plain text, no structured output
            document_text="",
        )
        if result.error:
            raise RuntimeError(f"LLM error: {result.error}")
        text = result.raw_output.strip()
        if not text:
            raise RuntimeError("LLM returned empty response")
        return text
    finally:
        await llm.close()
async def _merge_summaries(
    resolved: ResolvedAgentConfig,
    section_name: str,
    summaries: list[str],
) -> str:
    """Fold several chunk summaries into one via a further LLM call.

    The partial summaries are joined with blank lines and sent with a prompt
    that asks for at most ``MAX_SUMMARY_WORDS`` words. The client built for
    this call is always closed afterwards.

    Returns the stripped model output. Raises ``RuntimeError`` when the
    attempt reports an error or the output is blank.
    """
    joined = "\n\n".join(summaries)
    config = load_config()
    llm = build_llm_client(resolved, config.ollama, config.vllm)
    try:
        instruction = (
            f"Merge these {section_name} summaries into a single coherent "
            f"summary of no more than {MAX_SUMMARY_WORDS} words:\n{joined}"
        )
        messages = {"system": resolved.system_prompt, "user": instruction}
        result = await llm.call_llm(
            prompts=messages,
            json_schema={},
            document_text="",
        )
        if result.error:
            raise RuntimeError(f"LLM merge error: {result.error}")
        text = result.raw_output.strip()
        if not text:
            raise RuntimeError("LLM returned empty merge response")
        return text
    finally:
        await llm.close()
# ---------------------------------------------------------------------------
# Section summarization
# ---------------------------------------------------------------------------
async def summarize_section(
    pool: asyncpg.Pool,
    resolver: AgentConfigResolver,
    section_name: str,
    section_data: dict,
) -> str:
    """Generate an AI summary for one report section.

    Steps:
      1. Resolve the summarizer agent; if it is not found, return the
         deterministic summary immediately.
      2. Serialize *section_data* to indented JSON and split it with
         ``chunk_data``.
      3. Summarize every chunk. When there is more than one chunk, merge the
         partial summaries with one more LLM call; if only the merge fails,
         the partial summaries are joined with newlines instead.
      4. Cap the result at ``MAX_SUMMARY_WORDS`` words, preferring to stop at
         the last sentence end in the second half of the capped text.
      5. Write one ``agent_performance_log`` row for the whole call (success
         or failure).
      6. On any exception from steps 3-4, log it and return the deterministic
         summary.

    Returns the summary text; never raises for LLM failures.
    """
    resolved = await resolver.resolve(_REPORT_SUMMARIZER_SLUG)
    if resolved is None:
        logger.error(
            "Report summarizer agent not found (slug=%s) — using deterministic fallback",
            _REPORT_SUMMARIZER_SLUG,
        )
        return build_deterministic_summary(section_name, section_data)

    serialized = json.dumps(section_data, indent=2, default=str)
    chunks = chunk_data(serialized)
    start = time.monotonic()
    try:
        # Summarize each chunk
        chunk_summaries: list[str] = []
        for chunk in chunks:
            summary = await _summarize_chunk(resolved, section_name, chunk)
            chunk_summaries.append(summary)

        # Merge if multiple chunks
        if len(chunk_summaries) > 1:
            try:
                final_summary = await _merge_summaries(
                    resolved, section_name, chunk_summaries,
                )
            except Exception:
                # Merge failed — fall back to concatenation of chunk summaries.
                # exc_info keeps the cause visible instead of discarding it.
                logger.warning(
                    "Chunk merge LLM call failed for section %s — concatenating summaries",
                    section_name,
                    exc_info=True,
                )
                final_summary = "\n".join(chunk_summaries)
        else:
            final_summary = chunk_summaries[0]

        # Truncate to MAX_SUMMARY_WORDS at sentence boundary
        words = final_summary.split()
        if len(words) > MAX_SUMMARY_WORDS:
            truncated = " ".join(words[:MAX_SUMMARY_WORDS])
            # A sentence ends at a period that is followed by a space or that
            # closes the text. A plain rfind(".") would also hit decimal
            # points ("12.5%") and cut the summary in the middle of a number.
            # Words are joined with single spaces, so ". " is sufficient.
            sentence_end = truncated.rfind(". ")
            if truncated.endswith("."):
                sentence_end = len(truncated) - 1
            # Only honour the boundary if it keeps more than half of the text.
            if sentence_end > len(truncated) // 2:
                truncated = truncated[: sentence_end + 1]
            final_summary = truncated

        duration_ms = int((time.monotonic() - start) * 1000)
        await _log_performance(
            pool, resolved, True, duration_ms, serialized, final_summary,
        )
        return final_summary
    except Exception as exc:
        duration_ms = int((time.monotonic() - start) * 1000)
        logger.warning(
            "AI summarization failed for section %s: %s — using deterministic fallback",
            section_name,
            exc,
        )
        await _log_performance(
            pool, resolved, False, duration_ms, serialized, "",
            error_message=str(exc),
        )
        return build_deterministic_summary(section_name, section_data)
# ---------------------------------------------------------------------------
# Deterministic fallback summaries
# ---------------------------------------------------------------------------
# Fallback summary templates, keyed by section name. Each value is a
# str.format() template that build_deterministic_summary() fills from the
# section's data dict, so every placeholder must be present as a key there.
# Two placeholders are not read from the data directly but derived by
# build_deterministic_summary(): {position_count} is the length of the
# "positions" list and {window_count} is the length of the "windows" list.
_DETERMINISTIC_TEMPLATES: dict[str, str] = {
    "pnl": (
        "P&L Summary: Realized P&L ${realized_pnl}, unrealized ${unrealized_pnl}, "
        "daily return {daily_return}%, win rate {win_rate}%."
    ),
    "recommendation_accuracy": (
        "Recommendation Accuracy: {total_evaluated} evaluated, "
        "{act_count} acted ({acted_win_rate}% win rate), "
        "{skip_count} skipped. "
        "Avg confidence acted {avg_confidence_acted}, skipped {avg_confidence_skipped}."
    ),
    "position_performance": (
        "Position Performance: {position_count} positions tracked during the period."
    ),
    "risk_metrics": (
        "Risk Metrics: Risk tier {current_risk_tier}, portfolio heat {portfolio_heat}, "
        "max drawdown {max_drawdown}, current drawdown {current_drawdown_pct}%, "
        "reserve pool ${reserve_pool_balance}, "
        "{circuit_breaker_event_count} circuit breaker events."
    ),
    "model_quality": (
        "Model Quality: {window_count} lookback windows evaluated."
    ),
}
def build_deterministic_summary(section_name: str, section_data: dict) -> str:
    """Render a template-based summary for a section without using the LLM.

    Sections with no entry in ``_DETERMINISTIC_TEMPLATES`` get a generic line
    reporting how many metrics the section holds. Known sections have their
    template filled from a copy of *section_data*; if the data is missing a
    placeholder or cannot be formatted, a warning is logged and a fixed
    "template formatting failed" line is returned instead.
    """
    if section_name not in _DETERMINISTIC_TEMPLATES:
        # Generic fallback for unknown sections
        return f"{section_name} summary: {len(section_data)} metrics reported."

    template = _DETERMINISTIC_TEMPLATES[section_name]
    # section -> (list-valued source key, placeholder receiving its length)
    derived_counts = {
        "position_performance": ("positions", "position_count"),
        "model_quality": ("windows", "window_count"),
    }
    try:
        # Work on a copy so the caller's dict is never mutated.
        values = dict(section_data)
        if section_name in derived_counts:
            source_key, count_key = derived_counts[section_name]
            values[count_key] = len(values.get(source_key, []))
        return template.format(**values)
    except (KeyError, ValueError, TypeError) as err:
        logger.warning(
            "Deterministic summary template failed for %s: %s",
            section_name,
            err,
        )
        return f"{section_name} summary: data available but template formatting failed."
# ---------------------------------------------------------------------------
# Executive summary
# ---------------------------------------------------------------------------
async def generate_executive_summary(
    pool: asyncpg.Pool,
    resolver: AgentConfigResolver,
    section_summaries: dict[str, str],
) -> str:
    """Generate the executive summary from all section summaries.

    The section summaries are joined as ``"name: summary"`` paragraphs. If the
    joined text spans several chunks, each chunk is summarized first and the
    partial summaries become the input of the final synthesis call. The result
    is capped at ``MAX_EXECUTIVE_SUMMARY_WORDS`` words, preferring to stop at
    the last sentence end in the second half of the capped text.

    One ``agent_performance_log`` row is written per call (success or
    failure).

    Returns:
        The synthesized summary. Falls back to the joined section summaries
        when the agent cannot be resolved or any LLM step fails, and returns
        an empty string when *section_summaries* is empty.
    """
    if not section_summaries:
        # Nothing to synthesize: skip the LLM rather than prompt it with an
        # empty body. "" is also what the failure fallback would return here.
        return ""

    resolved = await resolver.resolve(_REPORT_SUMMARIZER_SLUG)
    concatenated = "\n\n".join(
        f"{name}: {summary}" for name, summary in section_summaries.items()
    )
    if resolved is None:
        logger.error(
            "Report summarizer agent not found — using concatenated summaries as executive summary",
        )
        return concatenated

    chunks = chunk_data(concatenated)
    start = time.monotonic()
    try:
        # Summarize chunks if needed
        if len(chunks) > 1:
            chunk_summaries: list[str] = []
            for chunk in chunks:
                summary = await _summarize_chunk(resolved, "executive", chunk)
                chunk_summaries.append(summary)
            input_text = "\n\n".join(chunk_summaries)
        else:
            input_text = chunks[0]

        # Final executive summary call
        cfg = load_config()
        client = build_llm_client(resolved, cfg.ollama, cfg.vllm)
        try:
            prompts = {
                "system": resolved.system_prompt,
                "user": (
                    f"Synthesize these trading performance section summaries into "
                    f"a concise executive summary of no more than "
                    f"{MAX_EXECUTIVE_SUMMARY_WORDS} words:\n{input_text}"
                ),
            }
            attempt = await client.call_llm(
                prompts=prompts,
                json_schema={},
                document_text="",
            )
        finally:
            await client.close()

        if attempt.error:
            raise RuntimeError(f"Executive summary LLM error: {attempt.error}")
        if not attempt.raw_output.strip():
            raise RuntimeError("Executive summary LLM returned empty response")
        executive = attempt.raw_output.strip()

        # Truncate to MAX_EXECUTIVE_SUMMARY_WORDS at sentence boundary
        words = executive.split()
        if len(words) > MAX_EXECUTIVE_SUMMARY_WORDS:
            truncated = " ".join(words[:MAX_EXECUTIVE_SUMMARY_WORDS])
            # A sentence ends at a period that is followed by a space or that
            # closes the text. A plain rfind(".") would also hit decimal
            # points ("12.5%") and cut the summary in the middle of a number.
            # Words are joined with single spaces, so ". " is sufficient.
            sentence_end = truncated.rfind(". ")
            if truncated.endswith("."):
                sentence_end = len(truncated) - 1
            # Only honour the boundary if it keeps more than half of the text.
            if sentence_end > len(truncated) // 2:
                truncated = truncated[: sentence_end + 1]
            executive = truncated

        duration_ms = int((time.monotonic() - start) * 1000)
        await _log_performance(
            pool, resolved, True, duration_ms, concatenated, executive,
        )
        return executive
    except Exception as exc:
        duration_ms = int((time.monotonic() - start) * 1000)
        logger.warning(
            "Executive summary generation failed: %s — using concatenated summaries",
            exc,
        )
        await _log_performance(
            pool, resolved, False, duration_ms, concatenated, "",
            error_message=str(exc),
        )
        return concatenated