stonks-oracle/docs/intelligence-pipeline-deep-dive/02-ai-agent-processing-and-extraction.md
Celes Renata 88ad1e8d99 feat: comprehensive docs, unit tests, docker-compose app services
- Add scheduler and ingestion unit tests (test_scheduler_unit.py, test_ingestion_unit.py)
- Add all 13 app services + dashboard to docker-compose.yml
- Add full documentation suite: API reference, Helm reference, Docker deployment guide,
  3 architecture diagrams (K8s, Docker Compose, data pipeline), AI agent guide,
  backup/restore guide, observability/metrics reference, per-service docs
- Add intelligence pipeline deep-dive docs with Mermaid diagrams
- Update README with documentation index and links
- Add specs for comprehensive-quality-docs, intelligence-pipeline-deep-dive,
  sanitized-pipeline-docs
2026-04-22 02:56:41 +00:00


Page 2 — AI Agent Processing and Structured Extraction

Documents that arrive on the stonks:queue:extraction and stonks:queue:macro_classification Redis queues are clean, quality-filtered, and normalized — but they are still unstructured text. The job of the Extractor service is to transform that text into structured JSON intelligence that the rest of the pipeline can reason about quantitatively. Two AI agents share this responsibility: the Document Intelligence Extractor handles company-specific news, filings, and transcripts, while the Global Event Classifier handles macro-level geopolitical and economic events. Both agents run through the same Ollama-based inference infrastructure, share a common JSON repair pipeline, and persist their results to PostgreSQL and MinIO for downstream consumption and audit.

This page explains how each agent works, what schemas they produce, how the system validates and repairs LLM output, how runtime configuration is resolved from the database, and how the final structured records are persisted. For a visual overview of the full flow from ingestion through extraction, see the Ingestion to Extraction Flow diagram. For reference-level detail on agent configuration and the variant management API, see the AI Agents Guide.


The Document Intelligence Extractor

The Document Intelligence Extractor is the primary AI agent in the pipeline. Registered under the slug document-extractor in the ai_agents database table, it processes every non-macro document that passes through the Parser — news articles, SEC filings, earnings transcripts, and press releases. Its purpose is to read a normalized document and produce a structured JSON object that captures the document's summary, the companies it affects, the sentiment and impact for each company, the catalysts driving that impact, and the evidence supporting the analysis.

The entry point is services/extractor/main.py, which runs a continuous worker loop polling the stonks:queue:extraction Redis list. When a job arrives, the worker extracts the document_id, ticker, and text fields from the JSON payload. If the job payload does not include the document text directly, the worker fetches it from MinIO using the normalized_storage_ref stored in the documents table — the Parser uploaded the normalized text to the stonks-normalized bucket during the previous pipeline stage (see Page 1).

The actual LLM inference is handled by OllamaClient in services/extractor/client.py. The client sends the document to a local Ollama instance via the /api/chat HTTP endpoint with stream=False and think=False. The think=False flag is a deliberate performance choice — it disables the model's chain-of-thought reasoning phase, which would otherwise add two to four minutes of latency per document. The client does not use Ollama's format parameter for structured output because of a known Ollama bug (#14645) where the format constraint is silently ignored when think=False on qwen3.5 models. Instead, the system relies on prompt engineering to produce JSON and repairs any syntax issues after the fact.
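The request body might look like the sketch below. It assumes the documented Ollama /api/chat request fields (model, messages, stream, think, options). build_chat_payload, the default temperature, and the model tag are illustrative placeholders, not the project's code. The per-variant context_window override maps to options.num_ctx.

```python
import json
from typing import Any, Optional


def build_chat_payload(
    model: str,
    system_prompt: str,
    user_prompt: str,
    temperature: float = 0.0,  # assumed default, not taken from the project
    num_ctx: Optional[int] = None,
) -> dict[str, Any]:
    """Build a non-streaming /api/chat request body (hypothetical helper)."""
    options: dict[str, Any] = {"temperature": temperature}
    if num_ctx is not None:
        # Per-variant context_window override, passed as Ollama's num_ctx option.
        options["num_ctx"] = num_ctx
    return {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "stream": False,  # return one complete response object
        "think": False,   # skip the reasoning phase to cut latency
        # No "format" key: JSON shape comes from the prompt plus post-hoc repair.
        "options": options,
    }


body = json.dumps(build_chat_payload("qwen3.5", "You are...", "Document: ...", num_ctx=16384))
```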

The prompt sent to the model has two parts. The system prompt, defined in services/extractor/prompts.py, establishes the model's role as a financial document analyst and sets strict output rules: return only a single JSON object, no markdown fences, no explanation text, every schema field is required, use "other" for catalyst_type when unsure, keep evidence spans under 20 words, and limit key facts to three to five items. The user prompt, built by build_extraction_prompt() in the same module, provides the document text along with document-type-specific guidance. Four guidance variants exist — one each for articles, filings, transcripts, and press releases — each calibrated to the conventions and biases of that document type. For example, the filing guidance instructs the model to preserve the precise legal language of SEC documents, while the press release guidance warns that sentiment may be biased positive and directs the model to focus on concrete metrics rather than marketing language.

The user prompt also includes a list of all tracked tickers from the companies table, along with rules for how the model should use them. If a tracked ticker appears verbatim in the text, the model must include it in the output with at least one evidence span. If the article discusses a sector or theme that clearly affects a tracked company (oil prices affecting XOM, AI chip demand affecting NVDA), the model should include that company as well. The model is explicitly told not to invent tickers that are not in the provided list. Documents longer than 8,000 characters are truncated before being included in the prompt, with a [... truncated for extraction ...] marker appended.
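The prompt assembly can be sketched as follows. This is a minimal illustration that assumes a dictionary of per-type guidance strings. GUIDANCE, truncate_document, build_user_prompt, and all guidance wording are hypothetical. Only the 8,000-character limit and the marker text come from the description above, and the newline before the marker is an assumption.

```python
MAX_PROMPT_CHARS = 8_000
TRUNCATION_MARKER = "[... truncated for extraction ...]"

# Hypothetical one-line stand-ins for the four guidance variants.
GUIDANCE = {
    "article": "Focus on facts reported in the article, not speculation.",
    "filing": "Preserve the precise legal language of the filing.",
    "transcript": "Separate management commentary from analyst questions.",
    "press_release": "Sentiment may be biased positive; prefer concrete metrics.",
}


def truncate_document(text: str, limit: int = MAX_PROMPT_CHARS) -> str:
    """Cut text to `limit` characters and append the truncation marker."""
    if len(text) <= limit:
        return text
    return text[:limit] + "\n" + TRUNCATION_MARKER


def build_user_prompt(text: str, doc_type: str, tracked_tickers: list[str]) -> str:
    """Combine type-specific guidance, the ticker universe, and the document."""
    guidance = GUIDANCE.get(doc_type, GUIDANCE["article"])
    tickers = ", ".join(sorted(tracked_tickers))
    return (
        f"Document type guidance: {guidance}\n"
        f"Tracked tickers (use only these): {tickers}\n"
        "If a tracked ticker appears verbatim, include it with at least one evidence span.\n"
        "Do not invent tickers that are not in the list.\n\n"
        f"Document:\n{truncate_document(text)}"
    )
```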

The OllamaClient also supports a context_window override via the Ollama num_ctx option, which can be configured per agent variant through the AgentConfigResolver mechanism described later on this page.


The ExtractionResult Schema

The structured output that the Document Intelligence Extractor produces is defined by the ExtractionResult Pydantic model in services/extractor/schemas.py. Every field is required (the model declares no defaults). As a result, the generated JSON schema marks every field as required, and the prompt asks the LLM to emit each one explicitly. The top-level fields are:

summary — a concise one-to-three sentence summary of the document's main point. This becomes the human-readable description stored in the document_intelligence table.

companies — an array of CompanyExtractionItem objects, one per affected company. Each company entry contains:

  • ticker — the stock ticker symbol (validated against a regex pattern of one to five uppercase letters).
  • company_name — the full company name as referenced in the document.
  • relevance — a float between 0.0 and 1.0 indicating how relevant the document is to this company, where 0 means tangential and 1 means the company is the primary subject.
  • sentiment — one of positive, negative, neutral, or mixed, representing the overall sentiment toward this company in the document.
  • impact_score — a float between 0.0 and 1.0 estimating the magnitude of impact, where 0 is negligible and 1 is highly material.
  • impact_horizon — one of intraday, 1d, 1d_7d, 1d_30d, 30d_90d, or 90d_plus, indicating the expected timeframe over which the impact will play out.
  • catalyst_type — exactly one of earnings, product, legal, macro, supply_chain, m_and_a, rating_change, or other. The prompt instructs the model to use other when none of the specific categories fit.
  • key_facts — a list of facts explicitly stated in the document. The prompt emphasizes that the model must not infer or fabricate facts.
  • risks — a list of risks explicitly mentioned in the document.
  • evidence_spans — short verbatim quotes from the document supporting the analysis. The prompt requests these be kept under 20 words each.

macro_themes — a list of broad economic or market themes mentioned in the document, such as rates, inflation, or ai_capex.

novelty_score — a float between 0.0 and 1.0 indicating how novel or surprising the information is. Routine earnings reports score low; unexpected regulatory actions score high. This value feeds into the novelty bonus component of the signal weighting formula described in Page 3.

confidence — a float between 0.0 and 1.0 representing the model's confidence in the accuracy of its extraction. Lower values indicate ambiguous or incomplete source text. This value becomes the confidence gate input for signal scoring.

extraction_warnings — a list of issues encountered during extraction, such as ambiguous_ticker, incomplete_text, or low_confidence. These warnings are persisted alongside the intelligence record for operational monitoring.
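Here is an illustrative payload that satisfies the field list above, with a small stdlib check for missing keys. Every value is invented, including the fictional ACME ticker. missing_fields is a hypothetical helper, not the Pydantic validation the service actually runs.

```python
import json

# Illustrative only: every value below is invented to show the shape.
example_output = """
{
  "summary": "Acme Corp reported quarterly revenue above its prior guidance.",
  "companies": [
    {
      "ticker": "ACME",
      "company_name": "Acme Corp",
      "relevance": 0.95,
      "sentiment": "positive",
      "impact_score": 0.6,
      "impact_horizon": "1d_7d",
      "catalyst_type": "earnings",
      "key_facts": ["Quarterly revenue exceeded prior guidance"],
      "risks": ["Margin pressure from input costs"],
      "evidence_spans": ["revenue above its prior guidance"]
    }
  ],
  "macro_themes": ["inflation"],
  "novelty_score": 0.3,
  "confidence": 0.85,
  "extraction_warnings": []
}
"""

TOP_LEVEL = {"summary", "companies", "macro_themes", "novelty_score",
             "confidence", "extraction_warnings"}
COMPANY_FIELDS = {"ticker", "company_name", "relevance", "sentiment",
                  "impact_score", "impact_horizon", "catalyst_type",
                  "key_facts", "risks", "evidence_spans"}


def missing_fields(data: dict) -> list[str]:
    """Return required field names absent from a parsed extraction (hypothetical)."""
    missing = sorted(TOP_LEVEL - set(data))
    for i, company in enumerate(data.get("companies", [])):
        missing += [f"companies[{i}].{f}" for f in sorted(COMPANY_FIELDS - set(company))]
    return missing


parsed = json.loads(example_output)
```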

The JSON schema is generated programmatically from the Pydantic models via generate_json_schema() in services/extractor/schemas.py, which calls Pydantic's model_json_schema() and then inlines all $defs references so the schema is self-contained and Ollama-friendly.
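The inlining step might look like the stdlib sketch below. It assumes Pydantic v2 style local references of the form "#/$defs/Name" and no self-referencing models. inline_defs is a hypothetical name, not the project's generate_json_schema().

```python
import copy
from typing import Any


def inline_defs(schema: dict[str, Any]) -> dict[str, Any]:
    """Replace every {"$ref": "#/$defs/Name"} node with a copy of that definition.

    Assumes local refs only and no recursive models (a self-referencing
    model would recurse forever here). Sibling keys next to a $ref are dropped.
    """
    defs = schema.get("$defs", {})

    def resolve(node: Any) -> Any:
        if isinstance(node, dict):
            ref = node.get("$ref")
            if isinstance(ref, str) and ref.startswith("#/$defs/"):
                target = copy.deepcopy(defs[ref.split("/")[-1]])
                return resolve(target)  # the definition may itself contain refs
            return {k: resolve(v) for k, v in node.items() if k != "$defs"}
        if isinstance(node, list):
            return [resolve(item) for item in node]
        return node

    return resolve(schema)
```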


The Global Event Classifier

Not all documents describe company-specific developments. Macro news articles — those tagged with document_type='macro_event' by the Parser — describe events that affect entire markets, sectors, or economies: trade wars, central bank rate decisions, commodity supply disruptions, geopolitical conflicts. These documents are routed to the stonks:queue:macro_classification Redis queue and processed by the Global Event Classifier agent, registered under the slug event-classifier in the ai_agents table.

The classifier is implemented in services/extractor/event_classifier.py. When the extractor worker in services/extractor/main.py pops a job and determines that the document type is macro_event (either because the job came from the macro queue or because the documents table records it as such), it routes the document to _process_macro_classification() instead of the standard extraction pipeline. This function calls classify_global_event(), which builds a dedicated prompt, sends it to Ollama through the same OllamaClient infrastructure, parses the response, and persists the result.

The classifier's system prompt is distinct from the extractor's. It establishes the model's role as a macro-level news classifier and includes explicit anti-hallucination rules that are critical to preventing the classifier from overreaching. The prompt states that the model should only classify articles about macro events that affect entire markets, sectors, or economies — trade wars, interest rate changes, commodity supply disruptions, regulatory changes, geopolitical conflicts, natural disasters. It explicitly lists what should not be classified as macro events: individual company earnings, lawsuits against a single company, single-company management changes, individual stock analysis, company-specific debt or bankruptcy, and product launches by one company. For these company-specific articles that were incorrectly routed, the model is instructed to set severity to "low", confidence below 0.3, and leave the affected_regions, affected_sectors, and affected_commodities arrays empty.

The user prompt, built by build_event_classification_prompt(), reinforces these anti-hallucination rules and provides additional guidance. It instructs the model to only extract facts explicitly stated in the text, to set confidence below 0.4 for vague or speculative content, to distinguish announced policy from rumored policy, and to reserve "critical" severity for events affecting multiple countries or entire global markets. Articles longer than 6,000 characters are truncated before inclusion in the prompt.

The output schema is the GlobalEvent dataclass, which contains:

  • event_types — a list of impact type strings, drawn from a fixed set: supply_disruption, demand_shift, cost_increase, regulatory_pressure, currency_impact, commodity_shock, trade_barrier, and geopolitical_risk. The model is instructed to include all applicable types rather than collapsing to a single category.
  • severity — one of low, moderate, high, or critical.
  • affected_regions — ISO 3166-1 alpha-2 country codes or region names (e.g., US, CN, EU, GB, JP). Only regions explicitly mentioned or clearly implied should be included.
  • affected_sectors — GICS sector identifiers such as Energy, Financials, Information Technology, or Industrials.
  • affected_commodities — commodity identifiers like crude_oil, natural_gas, gold, copper, wheat, lithium, or semiconductors. An empty list if no commodities are directly affected.
  • summary — a one-to-three sentence summary of the event and its market implications.
  • key_facts — facts explicitly stated in the article, limited to three to five items.
  • estimated_duration — one of short_term (days to weeks), medium_term (weeks to months), or long_term (months to years).
  • confidence — a float between 0.0 and 1.0, clamped during parsing.
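A dataclass mirroring the field list above, with a tolerant parser, could look like this. Several behaviors are assumptions made for the sketch:

- The field defaults.
- Falling back to low for unknown severities.
- Dropping unrecognized event types.
- Capping key_facts at five.

model_metadata and source_document_id are omitted for brevity, and parse_global_event is a hypothetical name.

```python
from dataclasses import dataclass, field

SEVERITIES = {"low", "moderate", "high", "critical"}
EVENT_TYPES = {"supply_disruption", "demand_shift", "cost_increase",
               "regulatory_pressure", "currency_impact", "commodity_shock",
               "trade_barrier", "geopolitical_risk"}
DURATIONS = {"short_term", "medium_term", "long_term"}


@dataclass
class GlobalEvent:
    event_types: list[str]
    severity: str
    affected_regions: list[str] = field(default_factory=list)
    affected_sectors: list[str] = field(default_factory=list)
    affected_commodities: list[str] = field(default_factory=list)
    summary: str = ""
    key_facts: list[str] = field(default_factory=list)
    estimated_duration: str = "short_term"
    confidence: float = 0.0


def parse_global_event(data: dict) -> GlobalEvent:
    """Build a GlobalEvent from parsed JSON, clamping confidence to [0, 1]."""
    severity = str(data.get("severity", "low")).lower()
    duration = data.get("estimated_duration", "short_term")
    return GlobalEvent(
        event_types=[t for t in data.get("event_types", []) if t in EVENT_TYPES],
        severity=severity if severity in SEVERITIES else "low",
        affected_regions=list(data.get("affected_regions", [])),
        affected_sectors=list(data.get("affected_sectors", [])),
        affected_commodities=list(data.get("affected_commodities", [])),
        summary=str(data.get("summary", "")),
        key_facts=list(data.get("key_facts", []))[:5],
        estimated_duration=duration if duration in DURATIONS else "short_term",
        confidence=min(1.0, max(0.0, float(data.get("confidence", 0.0)))),
    )
```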

Each GlobalEvent also carries a model_metadata object recording the provider (ollama), model name, prompt version (event-classification-v1), and schema version (1.0.0), plus a source_document_id linking back to the originating document.

After a successful classification, the system computes macro impact records for all tracked companies using the exposure-based interpolation engine in services/aggregation/interpolation.py. Each company's exposure profile — geographic revenue mix, supply chain regions, key input commodities, regulatory jurisdictions, and market position tier — determines how much a given macro event affects that company. Companies with non-zero macro impact scores get macro_impact_records rows persisted to PostgreSQL, and aggregation jobs are enqueued to stonks:queue:aggregation for each affected ticker. The extractor worker tracks consecutive macro classification failures and emits a critical-level alert after three consecutive failures, continuing with company-only signals in the meantime.
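The consecutive-failure rule can be sketched as a small counter. MacroFailureTracker is hypothetical. The threshold of three comes from the text. Logging through the standard logging module, rather than whatever alerting channel the worker really uses, is an assumption. This version logs again on every failure past the threshold.

```python
import logging

logger = logging.getLogger("extractor.macro")


class MacroFailureTracker:
    """Count consecutive macro classification failures (hypothetical helper)."""

    def __init__(self, threshold: int = 3) -> None:
        self.threshold = threshold
        self.consecutive_failures = 0

    def record_success(self) -> None:
        # Any success breaks the streak.
        self.consecutive_failures = 0

    def record_failure(self) -> bool:
        """Return True when this failure reaches (or exceeds) the alert threshold."""
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.threshold:
            logger.critical(
                "Macro classification failed %d times in a row; "
                "continuing with company-only signals",
                self.consecutive_failures,
            )
            return True
        return False
```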


The JSON Repair Pipeline

LLM output is inherently unreliable at the syntactic level. Models sometimes wrap JSON in markdown fences, produce trailing commas, leave strings unterminated, or truncate output mid-object when they hit token limits. The extractor addresses this with a three-stage JSON repair pipeline implemented across services/extractor/client.py and services/extractor/schemas.py.

The first stage is a direct json.loads() call. If the raw model output is already valid JSON, no repair is needed and the pipeline moves straight to validation. This is the fast path for well-behaved model responses.

The second stage strips markdown fences. Models frequently wrap their output in ```json ... ``` blocks despite being told not to. The _strip_markdown_fences() function in services/extractor/client.py uses a regex to detect and remove these wrappers before attempting another parse.

The third stage invokes the json-repair library as a fallback. The _repair_json() function in services/extractor/client.py calls repair_json() with return_objects=False to get a repaired JSON string. This library handles a wide range of common LLM JSON errors — trailing commas, missing quotes, unescaped characters — that would otherwise require custom repair logic.
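The three stages can be sketched as a single function. The fence regex and function names are assumptions, not the code in services/extractor/client.py. json-repair is a third-party package (pip install json-repair), so the sketch imports it lazily and only needs it when stages one and two fail.

```python
import json
import re
from typing import Any

# Matches a Markdown code fence (optionally tagged json) around the whole payload.
_FENCE_RE = re.compile(r"^\s*`{3}(?:json)?\s*(.*?)\s*`{3}\s*$", re.DOTALL | re.IGNORECASE)


def strip_markdown_fences(raw: str) -> str:
    match = _FENCE_RE.match(raw)
    return match.group(1) if match else raw


def parse_llm_json(raw: str) -> Any:
    """Stage 1: json.loads; stage 2: strip fences; stage 3: json-repair."""
    try:
        return json.loads(raw)  # fast path for well-behaved output
    except json.JSONDecodeError:
        pass
    stripped = strip_markdown_fences(raw)
    try:
        return json.loads(stripped)
    except json.JSONDecodeError:
        pass
    try:
        from json_repair import repair_json  # third-party dependency
    except ImportError as exc:
        raise ValueError("unparseable output and json-repair is not installed") from exc
    # return_objects=False yields a repaired JSON string rather than a Python object.
    return json.loads(repair_json(stripped, return_objects=False))
```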

The services/extractor/schemas.py module contains an additional layer of repair logic in its own _repair_json() function, which handles cases that the library might miss. It strips non-JSON prefixes (models sometimes prepend explanatory text before the opening brace), removes control characters that break parsing, fixes trailing commas before closing brackets, and as a last resort calls _repair_truncated_json() — a state-machine parser that walks the string tracking bracket depth and string state, then appends the necessary closing tokens to complete a truncated JSON object.
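The truncation-repair idea can be illustrated with a single pass over the string. repair_truncated_json is a hypothetical sketch, not the project's _repair_truncated_json(). It performs three steps:

- Closes an open string.
- Drops a dangling trailing comma.
- Appends closers in reverse order of opening.

It does not handle truncation right after a key or colon (for example '{"a":'), which would need an extra rule.

```python
def repair_truncated_json(text: str) -> str:
    """Close an unterminated string and any open brackets.

    Walks the text once, tracking whether we are inside a string (and whether
    the previous character was a backslash escape) plus a stack of expected
    closers, then appends the tokens needed to complete the value.
    """
    stack: list[str] = []
    in_string = False
    escaped = False
    for ch in text:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            stack.append("}" if ch == "{" else "]")
        elif ch in "}]" and stack:
            stack.pop()

    repaired = text
    if in_string:
        if escaped:  # drop a dangling backslash so the closing quote is not escaped
            repaired = repaired[:-1]
        repaired += '"'
    repaired = repaired.rstrip()
    if repaired.endswith(","):
        repaired = repaired[:-1]  # a trailing comma cannot precede a closer
    return repaired + "".join(reversed(stack))
```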

For the Global Event Classifier, the _parse_classification_response() function in services/extractor/event_classifier.py reuses the same _strip_markdown_fences() and _repair_json() functions from the client module, and additionally handles the case where the model wraps the output object in a single-element list — a quirk observed with some model configurations.


Structural and Semantic Validation

Repairing JSON syntax is only the first step. The validate_extraction() function in services/extractor/schemas.py performs both structural and semantic validation on the parsed output, and the distinction between the two is important for understanding the retry logic.

Structural validation begins with normalization. The _normalize_extraction_data() function fills in missing top-level fields with sensible defaults (empty summary, empty companies array, 0.5 novelty score, 0.3 confidence), clamps numeric fields to the [0.0, 1.0] range, and normalizes per-company fields. Catalyst types that the model produces as free-text alternatives — "strategic pivot", "acquisition", "lawsuit", "inflation", "launch" — are mapped to their canonical enum values through a comprehensive alias dictionary. Impact horizons like "long-term", "short", "immediate", or "near-term" are similarly mapped to the valid set (intraday, 1d, 1d_7d, 1d_30d, 30d_90d, 90d_plus). After normalization, the data is validated against the ExtractionResult Pydantic model, which enforces type constraints, enum membership, and range bounds.
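Normalization can be sketched as below. The canonical enum sets come from the schema description. Several parts are assumptions:

- The alias entries and their targets are a small hypothetical sample; the real dictionary is larger and may map some aliases differently.
- The empty-list defaults for macro_themes and extraction_warnings.
- Passing unknown horizons through unchanged so later validation can reject them.

```python
CATALYST_TYPES = {"earnings", "product", "legal", "macro", "supply_chain",
                  "m_and_a", "rating_change", "other"}

# Hypothetical subsets of the alias dictionaries; real targets may differ.
CATALYST_ALIASES = {"acquisition": "m_and_a", "merger": "m_and_a",
                    "lawsuit": "legal", "inflation": "macro", "launch": "product"}
HORIZON_ALIASES = {"immediate": "intraday", "short": "1d_7d",
                   "near-term": "1d_30d", "long-term": "90d_plus"}


def clamp01(value: object, default: float) -> float:
    """Coerce to float and clamp to [0.0, 1.0]; fall back to default if not numeric."""
    try:
        return min(1.0, max(0.0, float(value)))
    except (TypeError, ValueError):
        return default


def normalize_extraction(data: dict) -> dict:
    out = {
        "summary": str(data.get("summary") or ""),
        "companies": [],
        "macro_themes": list(data.get("macro_themes") or []),
        "novelty_score": clamp01(data.get("novelty_score", 0.5), 0.5),
        "confidence": clamp01(data.get("confidence", 0.3), 0.3),
        "extraction_warnings": list(data.get("extraction_warnings") or []),
    }
    for company in data.get("companies") or []:
        if not isinstance(company, dict):
            continue
        item = dict(company)
        for key in ("relevance", "impact_score"):
            item[key] = clamp01(item.get(key, 0.0), 0.0)
        catalyst = str(item.get("catalyst_type", "other")).strip().lower()
        catalyst = CATALYST_ALIASES.get(catalyst, catalyst)
        item["catalyst_type"] = catalyst if catalyst in CATALYST_TYPES else "other"
        horizon = str(item.get("impact_horizon", "")).strip().lower()
        # Unknown horizons are left as-is for later validation to reject.
        item["impact_horizon"] = HORIZON_ALIASES.get(horizon, horizon)
        out["companies"].append(item)
    return out
```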

Semantic validation catches issues that are structurally valid but logically suspect. The _semantic_checks() function runs a series of cross-field consistency checks that produce either errors (which trigger a retry) or warnings (which are logged but do not block acceptance). Semantic errors include duplicate tickers across company entries, missing ticker fields, and invalid impact horizon values. Semantic warnings include empty summaries, low confidence with companies present, invalid ticker formats (not matching the one-to-five uppercase letter pattern), missing evidence spans, evidence spans that are too short (under 8 characters) or too long (over 500 characters), high impact scores with no supporting key facts, very low relevance scores, and strong sentiment paired with negligible impact scores.

When the original document text is available, the validator also performs an evidence grounding check: each evidence span is searched for in the source text (case-insensitive), and spans not found in the document are flagged with a warning. This helps detect hallucinated evidence — quotes the model fabricated rather than extracted from the actual text.
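A subset of the semantic checks, including the case-insensitive grounding test, might be sketched like this. SemanticReport and semantic_checks are hypothetical names. The ticker pattern and the 8/500 character span bounds come from the text, and the warning messages are invented.

```python
import re
from dataclasses import dataclass, field
from typing import Optional

TICKER_RE = re.compile(r"^[A-Z]{1,5}$")


@dataclass
class SemanticReport:
    errors: list[str] = field(default_factory=list)    # trigger a retry
    warnings: list[str] = field(default_factory=list)  # logged only

    @property
    def is_valid(self) -> bool:
        return not self.errors


def semantic_checks(data: dict, source_text: Optional[str] = None) -> SemanticReport:
    """Run a few cross-field checks plus an optional evidence grounding check."""
    report = SemanticReport()
    seen: set[str] = set()
    haystack = source_text.lower() if source_text else None
    for i, company in enumerate(data.get("companies", [])):
        ticker = company.get("ticker")
        if not ticker:
            report.errors.append(f"companies[{i}]: missing ticker")
            continue
        if ticker in seen:
            report.errors.append(f"duplicate ticker {ticker}")
        seen.add(ticker)
        if not TICKER_RE.match(ticker):
            report.warnings.append(f"{ticker}: invalid ticker format")
        spans = company.get("evidence_spans") or []
        if not spans:
            report.warnings.append(f"{ticker}: no evidence spans")
        for span in spans:
            if len(span) < 8 or len(span) > 500:
                report.warnings.append(f"{ticker}: evidence span length {len(span)}")
            if haystack is not None and span.lower() not in haystack:
                report.warnings.append(f"{ticker}: span not found in source: {span!r}")
    return report
```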

If validation produces any semantic errors, the ValidationReport is marked as invalid and the OllamaClient retry loop treats it as a failed attempt. The retry logic uses exponential backoff with configurable parameters: a base delay (default from OllamaConfig), a multiplier applied on each retry, and a maximum delay cap. The number of retries is configurable per agent through the max_retries field in the ai_agents or agent_variants table. Non-retryable errors — HTTP 400, 401, 403, 404, and 422 responses from Ollama — short-circuit the retry loop immediately, since these indicate a problem with the request itself rather than a transient model failure.
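The retry schedule and the short-circuit rule fit in a few lines. Three things here are assumptions. The multiplier and cap defaults are placeholders, since the real values come from OllamaConfig. A missing status code, such as a timeout, is treated as retryable. Each retry after the initial attempt gets one delay.

```python
from typing import Optional

NON_RETRYABLE_STATUS = {400, 401, 403, 404, 422}


def backoff_delays(max_retries: int, base_delay: float,
                   multiplier: float = 2.0, max_delay: float = 30.0) -> list[float]:
    """Delay before each retry: base_delay * multiplier**n, capped at max_delay."""
    return [min(base_delay * multiplier ** n, max_delay) for n in range(max_retries)]


def is_retryable(status_code: Optional[int]) -> bool:
    """Request-level client errors are permanent; 5xx and timeouts are retried."""
    return status_code not in NON_RETRYABLE_STATUS
```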

Every attempt, whether successful or not, is recorded in an ExtractionAttempt dataclass that captures the raw output, validation report, error description, duration in milliseconds, model name, and whether the error was retryable. The full list of attempts is preserved in the ExtractionResponse for audit purposes and uploaded to MinIO by the persistence layer.


The AgentConfigResolver: Hot-Swapping Models and Prompts

Both the Document Intelligence Extractor and the Global Event Classifier resolve their runtime configuration through the AgentConfigResolver in services/shared/agent_config.py. This mechanism allows operators to change models, prompts, timeouts, retry counts, and token budgets without restarting any service. The resolver caches each configuration for 60 seconds, so the next resolution after that window sees a database change. A running worker applies the change the next time it re-resolves its configuration.

The resolver works by querying the ai_agents and agent_variants PostgreSQL tables with a single SQL statement that uses COALESCE to prefer variant values over base agent values. When the extractor worker starts, it creates an AgentConfigResolver instance with a 60-second TTL cache and calls resolver.resolve("document-extractor") to get the active configuration. If an active variant exists for the agent (enforced by a unique partial index on agent_variants that allows at most one active variant per agent), the variant's model_name, system_prompt, temperature, max_tokens, context_window, timeout_seconds, and max_retries override the base agent's values wherever the variant provides a non-NULL value. If no active variant exists, the base agent's configuration is used. If the database query fails entirely, the resolver returns None and the worker falls back to environment-variable-based OllamaConfig defaults.
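The resolution query can be demonstrated end to end with an in-memory SQLite database, which supports both COALESCE and partial unique indexes. The table layouts and the is_active column are simplified assumptions. The production tables live in PostgreSQL, where the query would use %s placeholders instead of ?.

```python
import sqlite3

SCHEMA = """
CREATE TABLE ai_agents (
    id INTEGER PRIMARY KEY, slug TEXT UNIQUE NOT NULL,
    model_name TEXT NOT NULL, system_prompt TEXT, temperature REAL, max_retries INTEGER
);
CREATE TABLE agent_variants (
    id INTEGER PRIMARY KEY, agent_id INTEGER NOT NULL REFERENCES ai_agents(id),
    model_name TEXT, system_prompt TEXT, temperature REAL, max_retries INTEGER,
    is_active INTEGER NOT NULL DEFAULT 0
);
-- Partial unique index: at most one active variant per agent.
CREATE UNIQUE INDEX one_active_variant ON agent_variants(agent_id) WHERE is_active = 1;
"""

# Variant values win wherever they are non-NULL; otherwise the base agent's apply.
RESOLVE_SQL = """
SELECT a.id, v.id,
       COALESCE(v.model_name, a.model_name),
       COALESCE(v.system_prompt, a.system_prompt),
       COALESCE(v.temperature, a.temperature),
       COALESCE(v.max_retries, a.max_retries)
FROM ai_agents a
LEFT JOIN agent_variants v ON v.agent_id = a.id AND v.is_active = 1
WHERE a.slug = ?
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute("INSERT INTO ai_agents VALUES (1, 'document-extractor', 'base-model', 'base prompt', 0.1, 3)")
# This variant overrides only the model; its NULL columns fall back to the base agent.
conn.execute("INSERT INTO agent_variants VALUES (10, 1, 'variant-model', NULL, NULL, NULL, 1)")
row = conn.execute(RESOLVE_SQL, ("document-extractor",)).fetchone()
```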

The resolved configuration is captured in a ResolvedAgentConfig frozen dataclass that includes the agent_id, variant_id (if any), model_provider, model_name, system_prompt, user_prompt_template, prompt_version, temperature, max_tokens, context_window, input_token_limit, token_budget, timeout_seconds, and max_retries. The extractor worker uses this to build an OllamaConfig that is passed to the OllamaClient.

The 60-second TTL cache means the resolver queries the database at most once per minute per agent slug. Cache entries are keyed by slug and timestamped with time.monotonic(). When a cached entry expires, the next resolve() call re-queries the database and refreshes the cache. The invalidate() method can clear a single slug or the entire cache, though in practice TTL-based expiry is sufficient for normal operations.
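A TTL cache with an injectable clock is one way to implement this behavior. TTLCache is a hypothetical sketch, not the AgentConfigResolver itself. It defaults to time.monotonic() as described. As a design choice, it does not cache failed (None) lookups, so the next call retries the database.

```python
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class TTLCache(Generic[T]):
    """Per-slug cache with monotonic-clock expiry (hypothetical sketch)."""

    def __init__(self, loader: Callable[[str], Optional[T]], ttl_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict[str, tuple[float, T]] = {}

    def resolve(self, slug: str) -> Optional[T]:
        now = self._clock()
        cached = self._entries.get(slug)
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]  # still fresh: no database round trip
        value = self._loader(slug)  # e.g. run the COALESCE query
        if value is not None:  # design choice: do not cache failed lookups
            self._entries[slug] = (now, value)
        return value

    def invalidate(self, slug: Optional[str] = None) -> None:
        if slug is None:
            self._entries.clear()
        else:
            self._entries.pop(slug, None)
```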

The extractor worker re-resolves its configuration every 100 jobs. If the resolved model name has changed (for example, because an operator activated a variant that uses a different model), the worker closes the old OllamaClient and creates a new one with the updated configuration. The event classifier is resolved separately and can use a different model than the document extractor — the worker maintains two independent OllamaClient instances when the models differ.

Token budget enforcement adds another layer of control. If a variant specifies a token_budget (total tokens per hour), the worker checks the agent_performance_log table before each invocation to see whether the budget has been exceeded. If so, the invocation is skipped entirely. Input token limits work similarly: if a variant sets an input_token_limit, the worker truncates the document text to approximately that many tokens (estimated at four characters per token) before sending it to the model.
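The budget and truncation rules reduce to simple arithmetic under the four-characters-per-token estimate. The helper names are hypothetical. The usage query's column names (input_tokens, output_tokens, variant_id, created_at) are assumptions about agent_performance_log.

```python
from typing import Optional

CHARS_PER_TOKEN = 4  # rough estimate, not a real tokenizer

# Hypothetical query shape for "tokens used in the last hour" (PostgreSQL syntax).
HOURLY_USAGE_SQL = """
SELECT COALESCE(SUM(input_tokens + output_tokens), 0)
FROM agent_performance_log
WHERE variant_id = %s AND created_at > now() - interval '1 hour'
"""


def estimate_tokens(text: str) -> int:
    return len(text) // CHARS_PER_TOKEN


def truncate_to_token_limit(text: str, input_token_limit: Optional[int]) -> str:
    """Keep roughly input_token_limit tokens' worth of characters."""
    if not input_token_limit:
        return text
    return text[: input_token_limit * CHARS_PER_TOKEN]


def budget_exhausted(tokens_used_last_hour: int, token_budget: Optional[int]) -> bool:
    """True when the hourly budget is already spent; None means unlimited."""
    return token_budget is not None and tokens_used_last_hour >= token_budget
```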

For a complete guide to creating variants, activating them, and comparing their performance, see the AI Agents Guide.


Persistence: From Extraction to Database

Once the LLM produces a valid extraction and it passes validation, the persist_extraction() function in services/extractor/worker.py orchestrates the full persistence pipeline. This function writes to both MinIO (for audit) and PostgreSQL (for downstream consumption), ensuring that every extraction attempt is fully traceable.

The MinIO persistence layer uploads four kinds of artifact per extraction, all stored under date-partitioned paths in dedicated buckets. The prompt metadata (prompt version, schema version, model name) goes to stonks-llm-prompts. The raw model output for every attempt, including failed ones, goes to stonks-llm-results, preserving the full retry history. A validation report summarizing the final attempt's status, errors, and warnings is uploaded alongside the raw output. On success, the final parsed intelligence object (the ExtractionResult serialized as JSON) is uploaded to a separate path for easy retrieval.

The PostgreSQL persistence writes to two tables. The document_intelligence table receives one row per document, containing the summary, macro themes, novelty score, source credibility, extraction warnings, confidence, model metadata (provider, model name, prompt version, schema version), references to the MinIO artifacts (raw output ref, prompt ref), validation status (valid or failed), validation errors, and retry count. This row is the authoritative record of what the AI extracted from the document.

The document_impact_records table receives one row per company mention within the extraction. Each impact record is linked to the parent document_intelligence row via intelligence_id and to the companies table via company_id. The record captures the ticker, relevance, sentiment, impact score, impact horizon, catalyst type, key facts, risks, and evidence spans for that specific company. The company_id is resolved from a ticker-to-UUID mapping that the worker maintains by querying the companies table (refreshed every 100 jobs). If a ticker in the extraction output does not match any tracked company, the impact record is skipped with a warning — the system only persists impact records for companies in its tracked universe.
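Building the impact rows might look like the sketch below. build_impact_rows and the row dictionary layout are hypothetical. The skip-with-warning behavior for untracked tickers follows the description above.

```python
import logging
import uuid
from typing import Any

logger = logging.getLogger("extractor.persist")

IMPACT_FIELDS = ("relevance", "sentiment", "impact_score", "impact_horizon",
                 "catalyst_type", "key_facts", "risks", "evidence_spans")


def build_impact_rows(intelligence_id: uuid.UUID, companies: list[dict[str, Any]],
                      ticker_to_company_id: dict[str, uuid.UUID]) -> list[dict[str, Any]]:
    """One row per company mention; tickers outside the tracked universe are skipped."""
    rows = []
    for company in companies:
        ticker = company.get("ticker", "")
        company_id = ticker_to_company_id.get(ticker)
        if company_id is None:
            logger.warning("Skipping impact record for untracked ticker %r", ticker)
            continue
        row = {"intelligence_id": intelligence_id, "company_id": company_id, "ticker": ticker}
        row.update({f: company.get(f) for f in IMPACT_FIELDS})
        rows.append(row)
    return rows
```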

After persisting the intelligence and impact records, the worker updates the document's status in the documents table to extracted (or extraction_failed if all retry attempts were exhausted). Even failed extractions get a document_intelligence row with validation_status='failed', empty summary, zero confidence, and the accumulated error messages — this ensures the failure is visible in the database rather than silently lost.

Performance metrics are collected for every extraction via collect_metrics() in services/extractor/metrics.py and persisted to a metrics table. Prometheus counters and histograms track extraction attempts, duration, retries, confidence distribution, validation errors, and estimated token usage (input and output, estimated at four characters per token). When a resolved agent config is available, the worker also logs to the agent_performance_log table with variant attribution, enabling the A/B comparison queries described in the AI Agents Guide.

For the Global Event Classifier, persistence follows a parallel path. The prompt and raw output are uploaded to MinIO under an event_classification/macro/ path prefix. The parsed GlobalEvent is persisted to the global_events PostgreSQL table, which stores the event types, severity, affected regions, affected sectors, affected commodities, summary, key facts, estimated duration, confidence, source document ID, and model metadata. Downstream, the macro interpolation engine computes macro_impact_records for each affected company and persists those as well.


Enqueuing Aggregation Jobs

The final step in the extraction pipeline is to notify the downstream aggregation engine that new intelligence is available. After a successful document extraction, the worker pushes a job onto the stonks:queue:aggregation Redis list containing the ticker of the affected company. The aggregation engine (described in Page 3) will pick up this job and recompute the weighted signals and trend summaries for that ticker, incorporating the freshly extracted intelligence.

For macro events, the enqueue logic is more expansive. After the Global Event Classifier produces a GlobalEvent and the interpolation engine computes macro impact records, the worker enqueues an aggregation job for every ticker that received a non-zero macro impact score. A single macro event — say, a new tariff announcement affecting the Energy and Industrials sectors — can trigger aggregation recomputation for dozens of tickers simultaneously. The aggregation job payload includes both the ticker and the macro_event_id, so the aggregation engine knows to incorporate the new macro signals.
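The macro fan-out reduces to one payload per ticker with a non-zero score. macro_aggregation_jobs is a hypothetical helper. The payload keys ticker and macro_event_id come from the text. The push direction (RPUSH versus LPUSH) is an assumption and must match how the aggregation engine pops.

```python
import json
from typing import Mapping


def macro_aggregation_jobs(macro_event_id: str, impact_scores: Mapping[str, float]) -> list[str]:
    """Serialize one aggregation job per ticker with a non-zero macro impact score."""
    return [
        json.dumps({"ticker": ticker, "macro_event_id": macro_event_id})
        for ticker, score in sorted(impact_scores.items())
        if score != 0  # negative scores still count as impact
    ]

# With redis-py the payloads could then be pushed onto the list, for example:
#   for job in jobs:
#       client.rpush("stonks:queue:aggregation", job)
```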

The worker interleaves the extraction and macro classification queues to prevent starvation. One job in three is taken preferentially from stonks:queue:macro_classification, and the other two from stonks:queue:extraction. If the preferred queue is empty, the worker falls back to the other queue, so neither pipeline stalls while the other has work available.
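The one-in-three preference with fallback can be sketched with in-memory deques standing in for the Redis lists. Which slot in each cycle goes to the macro queue is an assumption. With Redis itself, BLPOP accepts several keys and pops from the first non-empty one in the order given, which is one way to get the same fallback.

```python
from collections import deque
from typing import Optional

EXTRACTION_QUEUE = "stonks:queue:extraction"
MACRO_QUEUE = "stonks:queue:macro_classification"


def pick_queue_order(job_counter: int) -> tuple[str, str]:
    """Every third job prefers the macro queue; the other two prefer extraction."""
    if job_counter % 3 == 2:
        return MACRO_QUEUE, EXTRACTION_QUEUE
    return EXTRACTION_QUEUE, MACRO_QUEUE


def next_job(queues: dict[str, deque], job_counter: int) -> Optional[tuple[str, str]]:
    """Pop from the preferred queue, falling back to the other if it is empty."""
    for name in pick_queue_order(job_counter):
        if queues[name]:
            return name, queues[name].popleft()
    return None  # both queues empty
```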


What Comes Next

At this point, documents have been transformed from unstructured text into structured JSON intelligence — ExtractionResult objects for company-specific documents and GlobalEvent objects for macro news. These structured records are persisted in PostgreSQL and their tickers have been enqueued for aggregation. But raw extraction output is not yet actionable for trading decisions. The extraction tells us that a document is bearish for AAPL with an impact score of 0.7 and a confidence of 0.8, but it does not tell us how much weight that signal should carry relative to other signals about AAPL, or how it compares to signals from different sources, time periods, or market conditions. Page 3 — Signal Scoring and the WeightedSignal Abstraction picks up the story from here, explaining how the aggregation engine transforms these raw extraction outputs into weighted signals through confidence gating, recency decay, source credibility scoring, novelty bonuses, and market context multipliers.