Celes Renata 88ad1e8d99 feat: comprehensive docs, unit tests, docker-compose app services
- Add scheduler and ingestion unit tests (test_scheduler_unit.py, test_ingestion_unit.py)
- Add all 13 app services + dashboard to docker-compose.yml
- Add full documentation suite: API reference, Helm reference, Docker deployment guide,
  3 architecture diagrams (K8s, Docker Compose, data pipeline), AI agent guide,
  backup/restore guide, observability/metrics reference, per-service docs
- Add intelligence pipeline deep-dive docs with Mermaid diagrams
- Update README with documentation index and links
- Add specs for comprehensive-quality-docs, intelligence-pipeline-deep-dive,
  sanitized-pipeline-docs
2026-04-22 02:56:41 +00:00


Page 1 — Data Ingestion and Preparation

Every signal that Stonks Oracle eventually acts on begins its life as raw data pulled from an external source. Before any AI agent can extract structured intelligence, before any trend can accumulate, and before any trade can be placed, the platform must first discover new content, fetch it reliably, eliminate duplicates, store the raw artifacts for audit, and normalize the text into a form suitable for downstream processing. This page traces that journey from external API to parser output, covering the Scheduler, Ingestion Worker, deduplication layer, raw storage, and Parser in detail.

For a visual overview of the full flow described here, see the Ingestion to Extraction Flow diagram.


Four Categories of Input Data

Stonks Oracle tracks 50 companies across 10 sectors, and it draws intelligence from four distinct categories of external data. Each category has its own adapter, its own API conventions, and its own scheduling cadence, but all of them feed into the same ingestion pipeline.

The first category is company news, sourced from the Polygon.io ticker news endpoint (/v2/reference/news). The PolygonNewsAdapter in services/adapters/news_adapter.py fetches articles linked to a specific ticker, returning structured results that include title, publisher, article URL, description, keywords, and publication timestamp. Each request can return up to 1,000 articles, though the default limit is 20 per fetch. The adapter tracks the most recent published_utc value and uses it on subsequent fetches to avoid re-retrieving articles the system has already seen.

The second category is SEC filings, sourced from the SEC EDGAR full-text search system (EFTS). The SECEdgarAdapter in services/adapters/filings_adapter.py queries the /LATEST/search-index endpoint for 8-K, 10-Q, 10-K, and other form types associated with a company's ticker or CIK number. Unlike the Polygon endpoints, EDGAR is a public API that requires no key — only a descriptive User-Agent header per the SEC's fair-access policy. The adapter deduplicates results by accession number (adsh), filters out non-primary documents like XML fragments and graphics, and constructs the SEC EDGAR filing index URL for each hit so downstream services can fetch the full document.

The third category is market data, also sourced from Polygon.io. The PolygonMarketAdapter in services/adapters/market_adapter.py supports multiple endpoints: previous-day aggregate bars (/v2/aggs/ticker/{ticker}/prev), range bars for custom date windows, intraday hourly bars, grouped daily bars that return data for all tickers in a single call (/v2/aggs/grouped/locale/us/market/stocks/{date}), and ticker detail lookups. Market data follows a different path than textual content — it does not pass through the Parser or Extractor, since the structured numeric data is already in a usable form.

The fourth category is macro and geopolitical news, fetched by the MacroNewsAdapter in services/adapters/macro_news_adapter.py. Unlike the other three categories, macro news is not company-specific. These sources have source_type='macro_news' in the sources database table and may have a NULL company_id. The adapter fetches from a configurable HTTP endpoint (typically the Polygon news API filtered for broad market topics) and returns articles that describe global events — trade policy shifts, central bank decisions, geopolitical conflicts — rather than company-specific developments. Macro news articles are eventually classified by the Global Event Classifier agent and routed through a separate queue, as described in Page 2.

All four adapter classes inherit from BaseAdapter defined in services/adapters/base.py and return an AdapterResult dataclass containing the raw payload bytes, a SHA-256 content hash, a list of parsed item dicts, HTTP metadata (status code, response time), and an error field that is None on success. This uniform interface allows the Ingestion Worker to handle all source types through a single dispatch mechanism.
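The shared contract can be sketched as below. This is a minimal illustration: only `BaseAdapter`, `AdapterResult`, `fetch()`, and the list of carried fields come from the text. The exact field names, the `ok` property, the `hash_payload` helper, and the toy `StaticAdapter` are assumptions added for illustration.

```python
import hashlib
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class AdapterResult:
    """Uniform adapter return type (field names are assumptions)."""
    raw_payload: bytes
    content_hash: str
    items: list[dict[str, Any]] = field(default_factory=list)
    status_code: Optional[int] = None
    response_time_ms: Optional[float] = None
    error: Optional[str] = None  # None on success

    @property
    def ok(self) -> bool:
        return self.error is None


class BaseAdapter(ABC):
    """Hypothetical shape of the base class in services/adapters/base.py."""

    @abstractmethod
    def fetch(self, ticker: Optional[str], config: dict[str, Any]) -> AdapterResult:
        ...

    @staticmethod
    def hash_payload(raw: bytes) -> str:
        # SHA-256 hex digest of the raw response bytes, later used for dedupe
        return hashlib.sha256(raw).hexdigest()


class StaticAdapter(BaseAdapter):
    """Toy adapter returning a canned payload, for illustration only."""

    def fetch(self, ticker: Optional[str], config: dict[str, Any]) -> AdapterResult:
        raw = b'{"results": []}'
        return AdapterResult(raw_payload=raw, content_hash=self.hash_payload(raw),
                             items=[], status_code=200, response_time_ms=1.0)
```

Because every adapter returns the same type, the worker only has to check `error` and never needs to branch on the source type when handling results.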


The Scheduler: Orchestrating Ingestion Cycles

The Scheduler (services/scheduler/app.py) is the heartbeat of the ingestion pipeline. It runs a continuous loop that ticks every 15 seconds (SCHEDULER_TICK = 15), and on each tick it evaluates which sources are due for their next fetch. The Scheduler does not fetch data itself — it enqueues jobs onto the stonks:queue:ingestion Redis list for the Ingestion Worker to process.

Each source type has a default polling cadence defined in the DEFAULT_CADENCES dictionary:

| Source Type | Default Cadence |
|---|---|
| market_api | 300 seconds |
| news_api | 300 seconds |
| filings_api | 3,600 seconds |
| macro_news | 600 seconds |
| web_scrape | 1,800 seconds |
| broker | 30 seconds |

Individual sources can override their cadence via the polling_interval_seconds field in their config JSONB column in the sources table. The get_cadence_for_source() function checks for this override first, falling back to the default if none is set, and enforces a minimum interval of 10 seconds.
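The override-then-default lookup with a 10-second floor can be sketched as follows. It is a sketch under assumptions: `DEFAULT_CADENCES` and the `polling_interval_seconds` key come from the text, while `MIN_INTERVAL_SECONDS` and the fallback for unknown source types (`FALLBACK_CADENCE`) are hypothetical names.

```python
from typing import Any, Optional

DEFAULT_CADENCES = {
    "market_api": 300,
    "news_api": 300,
    "filings_api": 3600,
    "macro_news": 600,
    "web_scrape": 1800,
    "broker": 30,
}
MIN_INTERVAL_SECONDS = 10  # floor described in the text; constant name is hypothetical
FALLBACK_CADENCE = 300     # hypothetical default for unknown source types


def get_cadence_for_source(source_type: str, config: Optional[dict[str, Any]]) -> int:
    # 1. per-source override from the sources.config JSONB column
    override = (config or {}).get("polling_interval_seconds")
    if override is not None:
        cadence = int(override)
    else:
        # 2. fall back to the per-type default
        cadence = DEFAULT_CADENCES.get(source_type, FALLBACK_CADENCE)
    # 3. never poll more often than the minimum interval
    return max(cadence, MIN_INTERVAL_SECONDS)
```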

The Scheduler determines whether a source is due by calling is_source_due(), which considers several conditions. If a source has never run before (no entry in the ingestion_runs table), it is immediately due. If the last run failed, the Scheduler respects an exponential backoff computed by compute_backoff(): the delay starts at 60 seconds (DEFAULT_BACKOFF_BASE) and doubles with each retry up to a maximum of 3,600 seconds (MAX_BACKOFF). If a source has failed 10 consecutive times (MAX_RETRY_COUNT), the Scheduler stops scheduling it entirely until an operator manually resets the retry state. If the last run is still marked as running, the source is skipped to prevent double-scheduling. Otherwise, the Scheduler checks whether enough time has elapsed since the last completed run based on the source's cadence.
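The decision sequence above can be sketched as follows. The constants come from the text. The `LastRun` record, which merges the latest `ingestion_runs` row with the source's retry counter, is a hypothetical simplification. Treating the first failure as a 60-second wait (`retry_count` starting at 1) is also an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

DEFAULT_BACKOFF_BASE = 60
MAX_BACKOFF = 3600
MAX_RETRY_COUNT = 10


def compute_backoff(retry_count: int) -> int:
    """Seconds to wait after `retry_count` consecutive failures (assumed >= 1)."""
    exponent = max(retry_count - 1, 0)
    return min(DEFAULT_BACKOFF_BASE * (2 ** exponent), MAX_BACKOFF)


@dataclass
class LastRun:  # hypothetical view of the latest run plus the source's retry counter
    status: str  # "running", "completed", or "failed"
    finished_at: datetime
    retry_count: int = 0


def is_source_due(last: Optional[LastRun], cadence: int, now: datetime) -> bool:
    if last is None:
        return True  # never run before
    if last.status == "running":
        return False  # avoid double-scheduling
    if last.status == "failed":
        if last.retry_count >= MAX_RETRY_COUNT:
            return False  # parked until an operator resets the retry state
        wait = timedelta(seconds=compute_backoff(last.retry_count))
        return now - last.finished_at >= wait
    # completed: wait one full cadence since the last run
    return now - last.finished_at >= timedelta(seconds=cadence)
```

With these constants the delay sequence is 60, 120, 240, 480, 960, 1,920 seconds. From the seventh consecutive failure on, it is capped at 3,600 seconds.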

Rate limiting adds another layer of protection. The check_rate_limit() function enforces two constraints. First, each source type has a per-type limit defined in DEFAULT_RATE_LIMITS — for example, market_api and news_api are each capped at 20 requests per minute, while filings_api and macro_news are capped at 10. Second, because market_api and news_api both use the same Polygon.io API key, a global Polygon rate limit of 45 requests per minute (POLYGON_GLOBAL_RATE_LIMIT) is enforced across both types combined. Rate limit state is tracked in Redis using keys of the form stonks:ratelimit:{source_type}:{window}, where the window is a minute-granularity timestamp. If a source type exceeds its limit, the Scheduler logs a warning and skips that source for the current tick.
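The two-layer, fixed-window check can be sketched as below, with an in-memory counter standing in for Redis. Several details are assumptions: the epoch-minute window format, the `polygon_global` key name, and the order of checking before incrementing. This read-then-increment version is not atomic. Against real Redis, a pipeline or Lua script (plus an expiry on each key) would be the safer choice.

```python
import time
from collections import defaultdict
from typing import Optional

DEFAULT_RATE_LIMITS = {"market_api": 20, "news_api": 20, "filings_api": 10, "macro_news": 10}
POLYGON_GLOBAL_RATE_LIMIT = 45
POLYGON_TYPES = {"market_api", "news_api"}  # both share one Polygon.io API key


class FakeCounterStore:
    """In-memory stand-in for Redis counters on per-minute keys (no expiry)."""

    def __init__(self) -> None:
        self.counts: dict[str, int] = defaultdict(int)

    def get(self, key: str) -> int:
        return self.counts[key]

    def incr(self, key: str) -> int:
        self.counts[key] += 1
        return self.counts[key]


def check_rate_limit(store: FakeCounterStore, source_type: str,
                     now: Optional[float] = None) -> bool:
    """Return True and count one request if the type is under both limits."""
    window = int((now if now is not None else time.time()) // 60)  # minute bucket
    type_key = f"stonks:ratelimit:{source_type}:{window}"
    global_key = f"stonks:ratelimit:polygon_global:{window}"  # hypothetical key name
    limit = DEFAULT_RATE_LIMITS.get(source_type)
    if limit is not None and store.get(type_key) >= limit:
        return False  # per-type cap reached
    is_polygon = source_type in POLYGON_TYPES
    if is_polygon and store.get(global_key) >= POLYGON_GLOBAL_RATE_LIMIT:
        return False  # shared Polygon key cap reached
    store.incr(type_key)
    if is_polygon:
        store.incr(global_key)
    return True
```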

The Scheduler handles three categories of sources in each cycle. First, it fetches all active company-specific sources (excluding macro_news) by joining the sources and companies tables. Second, it fetches active macro news sources separately, since these may not have a company_id. Third, it fetches global market sources — those with source_type='market_api' and company_id IS NULL — which represent endpoints like the grouped daily bars that return data for all tickers in a single API call. For intraday bar sources, the Scheduler expands a single global source into per-ticker jobs for every active company.

Each enqueued job payload includes the source_id, company_id, ticker, legal_name, source_type, source_name, config, credibility_score, a list of company aliases (fetched from the company_aliases table), and a scheduled_at timestamp. The job is pushed onto stonks:queue:ingestion via Redis RPUSH.
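The job construction and the intraday fan-out can be sketched as follows. A plain Python list stands in for the Redis list, so appending plays the role of RPUSH. The payload field names follow the text. The row shapes (`id`, `name`, and so on) and the `endpoint == "intraday"` marker used to recognize intraday bar sources are hypothetical.

```python
import json
from datetime import datetime, timezone
from typing import Any, Optional

Row = dict[str, Any]


def build_job(source: Row, company: Optional[Row], aliases: list[str]) -> Row:
    return {
        "source_id": source["id"],
        "company_id": company["id"] if company else None,
        "ticker": company["ticker"] if company else None,
        "legal_name": company["legal_name"] if company else None,
        "source_type": source["source_type"],
        "source_name": source["name"],
        "config": source.get("config") or {},
        "credibility_score": source.get("credibility_score"),
        "aliases": aliases,
        "scheduled_at": datetime.now(timezone.utc).isoformat(),
    }


def jobs_for_source(source: Row, companies: list[Row],
                    aliases_by_company: dict[int, list[str]]) -> list[Row]:
    cfg = source.get("config") or {}
    if source["company_id"] is None:
        if cfg.get("endpoint") == "intraday":  # hypothetical marker for intraday bars
            # fan one global source out into a job per active company
            return [build_job(source, c, aliases_by_company.get(c["id"], []))
                    for c in companies]
        return [build_job(source, None, [])]  # e.g. grouped daily bars, macro news
    company = next((c for c in companies if c["id"] == source["company_id"]), None)
    if company is None:
        return []  # company not active
    return [build_job(source, company, aliases_by_company.get(company["id"], []))]


def enqueue(queue: list[str], jobs: list[Row]) -> None:
    for job in jobs:
        queue.append(json.dumps(job))  # stands in for RPUSH stonks:queue:ingestion
```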

Beyond scheduling, the Scheduler also performs periodic maintenance. Every ~20 cycles (~5 minutes), it runs recover_stale_documents() to re-enqueue documents that have been stuck in parsed status for longer than 240 minutes — a safety net for cases where Redis loses queue entries due to pod restarts or OOM events. Every ~40 cycles (~10 minutes), it runs retry_failed_extractions() to give documents in extraction_failed status another chance, resetting them to parsed and deleting the failed document_intelligence row so the Extractor treats them as fresh. Every ~100 cycles (~25 minutes), it runs cleanup_all_tables() to enforce retention policies across tables like competitive_signal_records (30 days), ingestion_runs (14 days), and trading_decisions (90 days).
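The cycle-based maintenance cadence is simple arithmetic on the 15-second tick. The sketch below assumes a modulo-on-cycle-counter schedule that skips cycle 0, and an `updated_at` timestamp for staleness. Both assumptions, like the helper names `tasks_due`, `is_stale_parsed`, and `retention_cutoff`, are hypothetical. The task names, intervals, and retention periods are from the text.

```python
from datetime import datetime, timedelta

SCHEDULER_TICK = 15  # seconds between scheduler cycles
STALE_PARSED_AFTER = timedelta(minutes=240)

# (every N cycles, task); the modulo schedule itself is an assumption
MAINTENANCE_TASKS = (
    (20, "recover_stale_documents"),   # 20 x 15 s = 5 min
    (40, "retry_failed_extractions"),  # 40 x 15 s = 10 min
    (100, "cleanup_all_tables"),       # 100 x 15 s = 25 min
)

RETENTION_DAYS = {
    "competitive_signal_records": 30,
    "ingestion_runs": 14,
    "trading_decisions": 90,
}


def tasks_due(cycle: int) -> list[str]:
    if cycle <= 0:
        return []
    return [name for every, name in MAINTENANCE_TASKS if cycle % every == 0]


def is_stale_parsed(status: str, updated_at: datetime, now: datetime) -> bool:
    # candidate for re-enqueueing if stuck in "parsed" for over 240 minutes
    return status == "parsed" and now - updated_at > STALE_PARSED_AFTER


def retention_cutoff(table: str, now: datetime) -> datetime:
    # rows older than this timestamp would be deleted by cleanup_all_tables()
    return now - timedelta(days=RETENTION_DAYS[table])
```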

For more detail on the Scheduler's configuration and operational behavior, see the Services Reference.


The Ingestion Worker: Adapter Dispatch and Persistence

The Ingestion Worker (services/ingestion/worker.py) is a long-running process that continuously pops jobs from the stonks:queue:ingestion Redis list and processes them. On startup, it initializes one instance of each adapter class and stores them in a dispatch dictionary keyed by source_type:

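```python
# Built once at worker startup: one adapter instance per source_type.
# Constructor arguments (shown as ...) are elided in this excerpt.
adapters = {
    "market_api":  PolygonMarketAdapter(...),
    "news_api":    PolygonNewsAdapter(...),
    "filings_api": SECEdgarAdapter(),
    "web_scrape":  WebScrapeAdapter(),
    "broker":      AlpacaBrokerAdapter(...),
    "macro_news":  MacroNewsAdapter(...),
}
```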

When a job arrives, the process_job() function looks up the appropriate adapter by source_type and calls its fetch() method with the ticker and source config. Before fetching, it records a new row in the ingestion_runs table with status running. If the adapter returns an error, the worker calls record_retrieval_failure() to update the run status and increment the source's retry counter with exponential backoff timing.
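The dispatch-and-record flow can be sketched with in-memory stand-ins. A list of dicts plays the `ingestion_runs` table, and `FetchResult` is a trimmed version of `AdapterResult`. `StubAdapter` is a hypothetical test double. How the worker treats a job whose `source_type` has no adapter is not described in the text, so marking the run failed here is an assumption.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class FetchResult:  # trimmed stand-in for AdapterResult
    items: list[dict[str, Any]] = field(default_factory=list)
    error: Optional[str] = None


class StubAdapter:
    """Hypothetical adapter that returns a preset result."""

    def __init__(self, result: FetchResult) -> None:
        self.result = result

    def fetch(self, ticker: Optional[str], config: dict[str, Any]) -> FetchResult:
        return self.result


def process_job(raw_job: str, adapters: dict[str, Any],
                runs: list[dict[str, Any]]) -> dict[str, Any]:
    job = json.loads(raw_job)
    run = {"source_id": job["source_id"], "status": "running"}
    runs.append(run)  # stands in for inserting an ingestion_runs row
    adapter = adapters.get(job["source_type"])
    if adapter is None:
        run.update(status="failed", error=f"no adapter for {job['source_type']}")
        return run
    result = adapter.fetch(job.get("ticker"), job.get("config") or {})
    if result.error is not None:
        # the real worker calls record_retrieval_failure() at this point
        run.update(status="failed", error=result.error)
        return run
    # upload, dedupe, and persistence steps would follow here
    run.update(status="completed", items_fetched=len(result.items))
    return run
```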

On a successful fetch, the worker performs several steps in sequence. First, it uploads the raw payload to MinIO via upload_raw_artifact() in services/shared/storage.py. The target bucket is determined by the source type through the SOURCE_BUCKET_MAP: market_api payloads go to stonks-raw-market, news_api and macro_news payloads go to stonks-raw-news, and filings_api payloads go to stonks-raw-filings. Objects are stored under a path that encodes the source type, ticker, date hierarchy, and ingestion run ID, for example news_api/AAPL/2025/01/15/{run_id}/raw.json.
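The bucket selection and object-key layout can be sketched as a pure function. The bucket map and path shape follow the text. The function name `raw_object_location` and the `GLOBAL` placeholder for jobs without a ticker (such as macro news) are hypothetical, and the MinIO upload call itself is omitted.

```python
from datetime import date
from typing import Optional

SOURCE_BUCKET_MAP = {
    "market_api": "stonks-raw-market",
    "news_api": "stonks-raw-news",
    "macro_news": "stonks-raw-news",
    "filings_api": "stonks-raw-filings",
}


def raw_object_location(source_type: str, ticker: Optional[str],
                        day: date, run_id: str) -> tuple[str, str]:
    """Return (bucket, object key) for a raw payload."""
    bucket = SOURCE_BUCKET_MAP[source_type]
    owner = ticker or "GLOBAL"  # hypothetical placeholder for ticker-less jobs
    key = f"{source_type}/{owner}/{day:%Y/%m/%d}/{run_id}/raw.json"
    return bucket, key
```

Putting the date hierarchy in the key keeps raw artifacts listable by prefix (for example, everything for one ticker on one day) without a database lookup.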


Content Deduplication via Redis

After storing the raw artifact, the Ingestion Worker checks for duplicate content. Deduplication operates at two levels.

At the payload level, the worker checks the overall content_hash (a SHA-256 digest of the raw API response) against Redis. The key pattern is stonks:dedupe:{content_hash} with a 24-hour TTL (86,400 seconds). If the hash is already present, the entire payload is skipped — the ingestion_runs row is marked as completed with items_new=0, and no downstream jobs are enqueued. If the hash is new, the worker sets the marker in Redis so future fetches of identical content are caught.
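The payload-level check can be sketched with an in-memory store that mimics Redis's set-if-absent-with-expiry semantics. The text describes a check followed by a separate set. The sketch folds the two into one atomic operation, which is a design choice that avoids races between concurrent workers. `TTLStore`, `set_nx_ex`, and `is_new_payload` are hypothetical names.

```python
import time
from typing import Callable

DEDUPE_TTL_SECONDS = 86_400  # 24 hours


class TTLStore:
    """In-memory stand-in for Redis SET key value NX EX ttl."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._expiry: dict[str, float] = {}

    def set_nx_ex(self, key: str, ttl: int) -> bool:
        now = self._clock()
        expires_at = self._expiry.get(key)
        if expires_at is not None and expires_at > now:
            return False  # live marker exists: duplicate
        self._expiry[key] = now + ttl
        return True


def is_new_payload(store: TTLStore, content_hash: str) -> bool:
    # True means "first time seen in 24 h"; the marker is set in the same step
    return store.set_nx_ex(f"stonks:dedupe:{content_hash}", DEDUPE_TTL_SECONDS)
```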

At the individual item level, for source types other than market_api and broker, the worker calls dedupe_items() from services/shared/dedupe.py. This function checks each item against a layered deduplication strategy. The fast path checks Redis for both content-hash markers (stonks:dedupe:{hash}) and canonical-URL markers (stonks:dedupe:url:{url_hash}), both with 24-hour TTLs. If the Redis check misses, the function falls back to PostgreSQL, querying the documents table by content_hash or canonical_url for durable cross-source matching. When a duplicate is found through the PostgreSQL fallback, the function warms the Redis cache so subsequent checks are fast.
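The layered lookup (Redis fast path, PostgreSQL fallback, cache warming) can be sketched with a set standing in for Redis (TTLs omitted) and a list of dicts standing in for the documents table. Hashing the canonical URL with SHA-256 is an assumption. So is returning the matched row, which `dedupe_items()` may or may not do, so that a caller can check which company the original was ingested for.

```python
import hashlib
from typing import Any, Optional

Item = dict[str, Any]


def url_hash(url: str) -> str:
    # hashing scheme for canonical URLs is an assumption
    return hashlib.sha256(url.encode("utf-8")).hexdigest()


def dedupe_items(items: list[Item], redis_keys: set[str], documents: list[Item]
                 ) -> tuple[list[Item], list[tuple[Item, Optional[Item]]]]:
    new_items: list[Item] = []
    duplicates: list[tuple[Item, Optional[Item]]] = []
    for item in items:
        digest = item["content_hash"]
        url = item.get("canonical_url")
        keys = [f"stonks:dedupe:{digest}"]
        if url:
            keys.append(f"stonks:dedupe:url:{url_hash(url)}")
        # 1. fast path: any live Redis marker means a recent duplicate
        if any(key in redis_keys for key in keys):
            duplicates.append((item, None))
            continue
        # 2. durable fallback: match on content_hash or canonical_url
        match = next((doc for doc in documents
                      if doc["content_hash"] == digest
                      or (url and doc.get("canonical_url") == url)), None)
        if match is not None:
            redis_keys.update(keys)  # 3. warm the cache for the next check
            duplicates.append((item, match))
        else:
            new_items.append(item)  # markers are set later, after persistence
    return new_items, duplicates
```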

Items identified as duplicates are not discarded entirely. If the duplicate document was originally ingested for a different company, the worker creates a cross-source mention link in the document_company_mentions table via persist_document_company_mention(). This ensures that a news article mentioning both Apple and Microsoft is linked to both companies even if it was first ingested through Apple's news source.

New (non-duplicate) items are persisted to PostgreSQL through persist_ingestion_items() in services/shared/metadata.py, which inserts rows into the documents table and records company mentions in document_company_mentions. Each new document ID is then pushed onto stonks:queue:parsing for the Parser to process. After persistence, the worker calls mark_as_seen() to set Redis dedupe markers for both the content hash and canonical URL of each new item, ensuring that the next fetch cycle's deduplication checks are fast.

On successful completion, the worker updates the ingestion_runs row with the final counts (items_fetched, items_new) and calls reset_source_retry_state() to clear any accumulated backoff from previous failures. For news-type sources (news_api and macro_news), the worker also updates the source's config JSONB column with the latest published_utc value, so the next fetch only retrieves newer articles.
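The published_utc cursor can be sketched as a pure update to the config dict. The config key name `last_published_utc` and the helper names are hypothetical. Polygon's news endpoint accepts range filters on `published_utc` (such as `published_utc.gt`), but whether the adapter uses a strict or inclusive comparison is an assumption.

```python
from datetime import datetime
from typing import Any

CURSOR_KEY = "last_published_utc"  # hypothetical config key name


def _parse_utc(ts: str) -> datetime:
    # older Python versions reject a trailing "Z" in fromisoformat
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def advance_cursor(config: dict[str, Any], items: list[dict[str, Any]]) -> dict[str, Any]:
    stamps = [item["published_utc"] for item in items if item.get("published_utc")]
    if config.get(CURSOR_KEY):
        stamps.append(config[CURSOR_KEY])  # never move the cursor backwards
    if not stamps:
        return dict(config)
    newest = max(stamps, key=_parse_utc)
    return {**config, CURSOR_KEY: newest}  # new dict; input is not mutated


def news_query_params(ticker: str, config: dict[str, Any], limit: int = 20) -> dict[str, Any]:
    params: dict[str, Any] = {"ticker": ticker, "limit": limit}
    if config.get(CURSOR_KEY):
        params["published_utc.gt"] = config[CURSOR_KEY]  # only newer articles
    return params
```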


The Parser: Normalization, Quality Scoring, and Routing

Documents that pass through ingestion arrive on the stonks:queue:parsing Redis list as JSON payloads containing a document_id, ticker, and source_type. The Parser Worker (services/parser/worker.py) pops these jobs and transforms raw HTML or text into normalized, quality-scored documents ready for AI extraction.

The parsing pipeline begins with HTML fetching. If the document has a URL (looked up from the documents table if not present in the job payload), the worker calls fetch_html() to retrieve the page content. SEC EDGAR URLs receive a specialized User-Agent header to comply with the SEC's fair-access policy. The raw HTML is then passed to parse_html() in services/parser/html_parser.py, which runs a multi-stage extraction pipeline.

The HTML parser first strips non-content tags — script, style, nav, footer, header, aside, iframe, and others — and removes boilerplate containers identified by CSS class or ID patterns (sidebars, ad slots, newsletter signups, social share bars, and similar UI elements). It then searches for the article body using a priority list of semantic selectors (article, [role='main'], .article-body, .post-content, and others). If no semantic match is found, it falls back to text-density scoring across candidate div, section, and td elements, selecting the block with the highest composite score based on text density, link density, paragraph count, and word count. The extracted text undergoes further cleaning: regex-based removal of residual boilerplate phrases (copyright notices, "subscribe to our newsletter" prompts, "share this article" fragments), removal of short orphan lines that are likely UI fragments, detection and collapse of repeated template blocks, and whitespace normalization.
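The text-density fallback can be illustrated on pre-extracted candidate features. This is a generic density heuristic: it multiplies text-to-markup density by the share of non-link text, scales by log word count, and adds a paragraph bonus. The weights and the `Candidate` fields are hypothetical, not the parser's actual formula.

```python
import math
from dataclasses import dataclass
from typing import Optional


@dataclass
class Candidate:
    text: str         # visible text inside the element
    link_text: str    # visible text inside <a> descendants
    html_length: int  # length of the element's serialized HTML
    paragraphs: int   # number of <p> children


def density_score(c: Candidate) -> float:
    words = len(c.text.split())
    if words == 0 or c.html_length == 0:
        return 0.0
    text_density = len(c.text) / c.html_length           # text vs. markup
    link_density = len(c.link_text) / max(len(c.text), 1)  # share of text in links
    # illustrative weights: penalize link-heavy blocks, reward length and paragraphs
    return (text_density * (1.0 - min(link_density, 1.0)) * math.log1p(words)
            + 0.5 * c.paragraphs)


def pick_body(candidates: list[Candidate]) -> Optional[Candidate]:
    return max(candidates, key=density_score) if candidates else None
```

A navigation bar made entirely of links scores zero no matter how much text it holds. That is why link density matters as much as raw length.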

Metadata extraction pulls the document title (from og:title or <title>), author, publisher (from og:site_name or hostname), publication date (from article:published_time or JSON-LD datePublished), canonical URL, language, description, and keywords from the HTML head elements.

If the parsed body text is shorter than 500 characters, the worker attempts to enrich it by reading the raw API payload from MinIO and extracting the Polygon article description, keywords, and author fields for the matching article. This enrichment step ensures that even articles with minimal scrapeable HTML still have enough textual content for meaningful AI extraction.
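The enrichment step can be sketched as below. It assumes the raw payload is a Polygon news response with a `results` list whose entries carry `article_url`, `description`, `keywords`, and `author`. The matching rule (exact `article_url` equality) and the layout of the appended text are assumptions, and `enrich_from_polygon` is a hypothetical name.

```python
import json

MIN_BODY_CHARS = 500


def enrich_from_polygon(body: str, raw_payload: bytes, article_url: str) -> str:
    if len(body) >= MIN_BODY_CHARS:
        return body  # long enough already
    payload = json.loads(raw_payload)
    match = next((a for a in payload.get("results", [])
                  if a.get("article_url") == article_url), None)
    if match is None:
        return body
    extras = []
    if match.get("description"):
        extras.append(match["description"])
    if match.get("keywords"):
        extras.append("Keywords: " + ", ".join(match["keywords"]))
    if match.get("author"):
        extras.append("Author: " + match["author"])
    # join non-empty parts with blank lines so they read as separate paragraphs
    return "\n\n".join(part for part in [body, *extras] if part)
```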

Quality scoring is performed by score_parse_quality() in services/parser/html_parser.py, which evaluates six weighted signals to produce a composite score between 0 and 0.95:

| Signal | Weight | What It Measures |
|---|---|---|
| word_count | 0.30 | Length of extracted text (thresholds at 20, 50, 150, 300 words) |
| body_found | 0.20 | Whether a semantic article body element was located |
| diversity | 0.15 | Vocabulary richness (unique words / total words) |
| sentence | 0.15 | Presence of proper sentence structure (terminal punctuation) |
| paragraph | 0.10 | Multi-paragraph structure (blocks separated by blank lines) |
| metadata | 0.10 | Presence of title, author, publisher, and publication date |

The composite score maps to a confidence label: scores below 0.35 are labeled low, scores between 0.35 and 0.65 are medium, and scores 0.65 and above are high. Documents with low confidence are marked with status low_quality in the documents table and are not enqueued for extraction — they are effectively filtered out of the pipeline at this stage.
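The weighted composite and the label thresholds can be sketched as below. The weights, the 0.95 cap, and the 0.35/0.65 boundaries come from the text. The per-signal sub-scores are assumptions: the step values for the word-count thresholds, a binary sentence check, saturation at three paragraphs, and the metadata key names. The function is a simplified stand-in for `score_parse_quality()`, not its actual body.

```python
import re
from typing import Any

WEIGHTS = {"word_count": 0.30, "body_found": 0.20, "diversity": 0.15,
           "sentence": 0.15, "paragraph": 0.10, "metadata": 0.10}
MAX_SCORE = 0.95
META_KEYS = ("title", "author", "publisher", "published")  # key names assumed


def _word_count_signal(n: int) -> float:
    # step function over the documented thresholds; step values are assumptions
    for threshold, value in ((300, 1.0), (150, 0.75), (50, 0.5), (20, 0.25)):
        if n >= threshold:
            return value
    return 0.0


def score_parse_quality(text: str, body_found: bool, meta: dict[str, Any]) -> float:
    words = re.findall(r"\w+", text.lower())
    blocks = [b for b in re.split(r"\n\s*\n", text) if b.strip()]
    signals = {
        "word_count": _word_count_signal(len(words)),
        "body_found": 1.0 if body_found else 0.0,
        "diversity": len(set(words)) / len(words) if words else 0.0,
        "sentence": 1.0 if re.search(r"[.!?](?:\s|$)", text) else 0.0,
        "paragraph": min(len(blocks) / 3, 1.0),
        "metadata": sum(bool(meta.get(k)) for k in META_KEYS) / len(META_KEYS),
    }
    raw = sum(WEIGHTS[name] * value for name, value in signals.items())
    return min(raw, MAX_SCORE)  # a perfect document still tops out at 0.95


def confidence_label(score: float) -> str:
    if score < 0.35:
        return "low"
    if score < 0.65:
        return "medium"
    return "high"
```

Under these assumptions, a 60-word single-paragraph snippet with no detected body and no metadata scores about 0.48 and lands in the medium band.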

Company mention detection runs next. The worker fetches all known aliases from the company_aliases table (plus tickers and legal names from the companies table) and calls detect_company_mentions() in services/parser/html_parser.py. The matching strategy varies by alias length: one-to-two character aliases use case-sensitive word-boundary matching to avoid false positives (the letter "A" should not match every occurrence of the word "a"), three-to-four character aliases use case-insensitive word-boundary matching (standard ticker format), and aliases of five or more characters use case-insensitive substring matching (company names and brands). Confidence scores vary by alias type: ticker matches receive 0.9, legal name matches 0.85, general aliases 0.7, and brand matches 0.6. Multiple alias hits for the same company are deduplicated, keeping the highest-confidence match and summing match counts. Detected mentions are persisted to the document_company_mentions table.
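The length-dependent matching rules and the keep-best merge can be sketched with the standard `re` module. The length thresholds and per-type confidences come from the text. The `Alias` record, the alias-kind labels, and the shape of the returned dicts are hypothetical.

```python
import re
from dataclasses import dataclass

TYPE_CONFIDENCE = {"ticker": 0.9, "legal_name": 0.85, "alias": 0.7, "brand": 0.6}


@dataclass(frozen=True)
class Alias:
    company_id: int
    text: str
    kind: str  # "ticker", "legal_name", "alias", or "brand"


def _count_matches(alias: str, text: str) -> int:
    escaped = re.escape(alias)
    if len(alias) <= 2:
        pattern = re.compile(rf"\b{escaped}\b")  # case-sensitive, whole word
    elif len(alias) <= 4:
        pattern = re.compile(rf"\b{escaped}\b", re.IGNORECASE)  # ticker-style
    else:
        pattern = re.compile(escaped, re.IGNORECASE)  # names and brands: substring
    return len(pattern.findall(text))


def detect_company_mentions(text: str, aliases: list[Alias]) -> list[dict]:
    best: dict[int, dict] = {}
    for alias in aliases:
        count = _count_matches(alias.text, text)
        if count == 0:
            continue
        entry = best.setdefault(alias.company_id, {
            "company_id": alias.company_id, "confidence": 0.0,
            "match_count": 0, "matched": None})
        entry["match_count"] += count  # sum hits across all aliases
        conf = TYPE_CONFIDENCE[alias.kind]
        if conf > entry["confidence"]:  # keep the highest-confidence alias
            entry["confidence"] = conf
            entry["matched"] = alias.text
    return sorted(best.values(), key=lambda m: -m["confidence"])
```

In the example below, the one-letter ticker "A" does not fire on the lowercase word "a", because short aliases are matched case-sensitively.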

The normalized text and a structured parser output JSON (containing all metadata, quality signals, warnings, outbound links, tags, and mentions) are uploaded to the stonks-normalized MinIO bucket. The documents row is updated with the normalized storage reference, parser output reference, quality score, and confidence level.

Finally, the Parser makes a routing decision. If the document's document_type is macro_event, it is pushed onto stonks:queue:macro_classification for the Global Event Classifier agent. All other documents are pushed onto stonks:queue:extraction for the Document Intelligence Extractor agent. Both queues feed into the Extractor service described in Page 2. The job payload includes the document_id, ticker, and the first 32,000 characters of the normalized text, giving the downstream agent immediate access to the content without needing to fetch it from MinIO.
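The routing rule and payload truncation can be sketched as a pure function that returns the target queue and a serialized job. The low-confidence short-circuit mirrors the filtering described earlier. The function name and the payload key `text` are hypothetical, and the actual Redis push is omitted.

```python
import json
from typing import Optional

EXTRACTION_QUEUE = "stonks:queue:extraction"
MACRO_QUEUE = "stonks:queue:macro_classification"
MAX_TEXT_CHARS = 32_000


def route_document(document_id: str, ticker: Optional[str], document_type: str,
                   normalized_text: str, confidence: str) -> Optional[tuple[str, str]]:
    if confidence == "low":
        return None  # stored as low_quality and never enqueued
    queue = MACRO_QUEUE if document_type == "macro_event" else EXTRACTION_QUEUE
    payload = {
        "document_id": document_id,
        "ticker": ticker,
        "text": normalized_text[:MAX_TEXT_CHARS],  # payload key name is assumed
    }
    return queue, json.dumps(payload)
```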

For additional detail on queue topology and data store layout, see the Data Pipeline Architecture documentation.


What Comes Next

At this point, raw data has been fetched from the four categories of external sources, deduplicated, and stored in MinIO, and the textual documents among it have been parsed into normalized text, scored for quality, tagged with company mentions, and routed to the appropriate extraction queue. The documents sitting on stonks:queue:extraction and stonks:queue:macro_classification are clean, quality-filtered, and ready for AI processing. Page 2 — AI Agent Processing and Structured Extraction picks up the story from here, explaining how the Document Intelligence Extractor and Global Event Classifier agents use LLM inference to transform these normalized documents into the structured JSON intelligence that feeds the rest of the pipeline.