# Data Pipeline Architecture — Stonks Oracle

This document describes the end-to-end data pipeline from external data sources through signal processing to trade execution. The pipeline is queue-driven, with Redis lists connecting each stage and PostgreSQL/MinIO providing durable storage at every step.

All queue names follow the convention `stonks:queue:` (see `services/shared/redis_keys.py`). Dead-letter queues mirror the pattern as `stonks:dlq:`.

## Pipeline Overview

```mermaid
flowchart TB
    %% ── External Data Sources ─────────────────────────────────────
    subgraph sources ["External Data Sources"]
        direction LR
        polygon["Polygon.io<br/>News, Market Bars,<br/>Grouped Daily"]
        sec["SEC EDGAR<br/>10-K, 10-Q Filings"]
        macro_src["Macro News APIs<br/>Geopolitical &<br/>Economic Events"]
        market_src["Market Data API<br/>Intraday Bars,<br/>Grouped Daily"]
    end

    %% ── Scheduler ─────────────────────────────────────────────────
    scheduler["Scheduler<br/>services.scheduler.app<br/>Cadence polling, rate limiting,<br/>backoff, stale recovery,<br/>periodic aggregation,<br/>report scheduling"]
    sources -.->|"API polling<br/>on cadence"| scheduler

    %% ── Ingestion Queue ───────────────────────────────────────────
    q_ingestion[["stonks:queue:ingestion"]]
    scheduler -->|"rpush job<br/>(company, macro,<br/>global market)"| q_ingestion

    %% ── Ingestion Worker ──────────────────────────────────────────
    ingestion["Ingestion<br/>services.ingestion.worker<br/>Adapter dispatch, dedupe,<br/>raw artifact upload"]
    q_ingestion -->|"lpop"| ingestion

    %% ── Raw Storage ───────────────────────────────────────────────
    minio_raw[("MinIO<br/>Raw Artifacts<br/>JSON / HTML")]
    pg_docs[("PostgreSQL<br/>documents,<br/>ingestion_runs")]
    redis_dedupe[("Redis<br/>Dedupe Markers<br/>stonks:dedupe:*")]
    ingestion -->|"upload raw payload"| minio_raw
    ingestion -->|"persist metadata"| pg_docs
    ingestion -->|"set content hash"| redis_dedupe

    %% ── Parsing Queue ─────────────────────────────────────────────
    q_parsing[["stonks:queue:parsing"]]
    ingestion -->|"rpush<br/>(news, filings,<br/>web_scrape, macro)"| q_parsing

    %% ── Parser Worker ─────────────────────────────────────────────
    parser["Parser<br/>services.parser.worker<br/>HTML parsing, quality scoring,<br/>company mention detection"]
    q_parsing -->|"lpop"| parser
    minio_norm[("MinIO<br/>Normalized Text<br/>Parser Output JSON")]
    parser -->|"upload normalized text<br/>+ structured output"| minio_norm
    parser -->|"update document status,<br/>insert mentions"| pg_docs
```

## Three Signal Layers

The parser routes documents into two extraction paths based on `document_type`. All three signal layers converge at the aggregation stage through the shared `WeightedSignal` abstraction.

```mermaid
flowchart TB
    %% ── Parser Output ─────────────────────────────────────────────
    parser(("Parser"))

    %% ── Extraction Queues ─────────────────────────────────────────
    q_extraction[["stonks:queue:extraction"]]
    q_macro[["stonks:queue:macro_classification"]]
    parser -->|"rpush<br/>(standard docs)"| q_extraction
    parser -->|"rpush<br/>(macro_event docs)"| q_macro

    %% ── Scheduler Recovery ────────────────────────────────────────
    scheduler_recovery(("Scheduler<br/>stale recovery &<br/>failed retry"))
    scheduler_recovery -.->|"re-enqueue orphaned<br/>parsed docs"| q_extraction
    scheduler_recovery -.->|"re-enqueue orphaned<br/>macro docs"| q_macro

    %% ── Extractor Worker ──────────────────────────────────────────
    subgraph extractor_svc ["Extractor Service"]
        direction TB
        ext_main["Extractor<br/>services.extractor.main<br/>Alternates between queues<br/>(2 extraction : 1 macro)<br/>Token budget enforcement"]
    end
    q_extraction -->|"lpop"| ext_main
    q_macro -->|"lpop"| ext_main

    %% ── Ollama LLM ────────────────────────────────────────────────
    ollama["Ollama / vLLM<br/>LLM Inference<br/>document-extractor agent<br/>event-classifier agent"]
    ext_main <-->|"HTTP /api/generate<br/>(AgentConfigResolver<br/>selects model + variant)"| ollama

    %% ── Signal Layer 1: Company ───────────────────────────────────
    subgraph layer1 ["Layer 1 — Company Signals"]
        direction LR
        di["document_intelligence<br/>document_impact_records"]
    end
    ext_main -->|"persist extraction<br/>(standard docs)"| di

    %% ── Signal Layer 2: Macro ─────────────────────────────────────
    subgraph layer2 ["Layer 2 — Macro Signals"]
        direction LR
        ge["global_events"]
        mir["macro_impact_records<br/>per-company interpolation<br/>via exposure profiles"]
        ge --> mir
    end
    ext_main -->|"classify & persist<br/>(macro_event docs)"| ge
    ext_main -->|"compute_macro_impact<br/>for all tracked companies"| mir

    %% ── Aggregation Queue ─────────────────────────────────────────
    q_agg[["stonks:queue:aggregation"]]
    ext_main -->|"rpush<br/>(per ticker)"| q_agg

    %% ── Scheduler Periodic Aggregation ────────────────────────────
    scheduler_agg(("Scheduler<br/>periodic aggregation<br/>every ~15 min"))
    scheduler_agg -.->|"rpush all<br/>active tickers"| q_agg

    %% ── Aggregation Worker ────────────────────────────────────────
    aggregation["Aggregation<br/>services.aggregation.main<br/>Trend windows, scoring,<br/>contradiction detection"]
    q_agg -->|"lpop"| aggregation

    %% ── Signal Layer 3: Competitive ───────────────────────────────
    subgraph layer3 ["Layer 3 — Competitive Signals"]
        direction LR
        pm["pattern_matcher<br/>historical patterns"]
        sp["signal_propagation<br/>cross-company signals"]
        csr["competitive_signal_records"]
        pm --> sp --> csr
    end
    aggregation -->|"trigger_signal_propagation<br/>(when competitive_enabled)"| layer3

    %% ── All layers merge ──────────────────────────────────────────
    pg_trends[("PostgreSQL<br/>trend_windows,<br/>trend_history,<br/>trend_projections")]
    di -->|"WeightedSignal"| aggregation
    mir -->|"WeightedSignal"| aggregation
    csr -->|"WeightedSignal"| aggregation
    aggregation -->|"persist trend summaries"| pg_trends
```

## Recommendation → Trading → Broker

The recommendation worker consumes from the recommendation queue. The trading engine does **not** consume from a queue — it polls the `recommendations` table in PostgreSQL on a configurable interval, evaluates each recommendation through its decision pipeline, and pushes "act" decisions to the broker queue.

```mermaid
flowchart TB
    %% ── Recommendation Queue ──────────────────────────────────────
    q_rec[["stonks:queue:recommendation"]]
    aggregation(("Aggregation")) -->|"rpush<br/>(ticker + window,<br/>dedup 5 min TTL)"| q_rec

    %% ── Recommendation Worker ─────────────────────────────────────
    recommendation["Recommendation<br/>services.recommendation.main<br/>Eligibility, suppression,<br/>thesis generation"]
    q_rec -->|"lpop"| recommendation
    ollama_thesis["Ollama / vLLM<br/>thesis-rewriter agent<br/>(AgentConfigResolver<br/>selects model + variant)"]
    recommendation <-->|"rewrite thesis<br/>(trading-eligible only)"| ollama_thesis
    pg_recs[("PostgreSQL<br/>recommendations,<br/>recommendation_evidence,<br/>risk_evaluations")]
    recommendation -->|"persist recommendation<br/>+ evidence + risk eval"| pg_recs

    %% ── Lake Publication (inline) ─────────────────────────────────
    minio_rec_lake[("MinIO<br/>Lakehouse<br/>recommendation facts")]
    recommendation -->|"publish_recommendation_facts<br/>(Parquet)"| minio_rec_lake

    %% ── Trading Engine ────────────────────────────────────────────
    subgraph trading_loop ["Trading Engine Decision Loop"]
        direction TB
        poll["Poll recommendations<br/>action IN (buy, sell)<br/>mode IN (paper, live)<br/>generated_at > last_poll"]
        dedup_check["Redis dedup check<br/>stonks:dedupe:trading:*"]
        evaluate["evaluate_recommendation<br/>Circuit breaker check<br/>Trading window check<br/>Confidence gate<br/>Sector exposure check<br/>Correlation check<br/>Earnings blackout<br/>Max positions check"]
        size["Position sizing<br/>Kelly criterion,<br/>risk tier limits,<br/>micro-trade support"]
        decide{{"Decision"}}
        poll --> dedup_check --> evaluate --> size --> decide
    end
    pg_recs -->|"SELECT recent<br/>recommendations"| poll

    %% ── Broker Queue ──────────────────────────────────────────────
    q_broker[["stonks:queue:broker_orders"]]
    decide -->|"act → rpush<br/>order job"| q_broker
    decide -->|"skip → persist<br/>decision only"| pg_decisions
    pg_decisions[("PostgreSQL<br/>trading_decisions")]

    %% ── Manual Override ───────────────────────────────────────────
    trading_api(("Trading API<br/>POST /override/order"))
    trading_api -->|"rpush<br/>manual order"| q_broker

    %% ── Broker Adapter ────────────────────────────────────────────
    broker["Broker Adapter<br/>services.adapters.broker_service<br/>Idempotency, risk evaluation,<br/>approval gate, order submission,<br/>fill tracking, position sync"]
    q_broker -->|"lpop"| broker

    %% ── Risk Engine ───────────────────────────────────────────────
    risk["Risk Engine<br/>services.risk.app<br/>evaluate_order()<br/>Position limits, sector exposure,<br/>daily loss caps, approval workflow"]
    broker -->|"evaluate order<br/>(inline call)"| risk

    %% ── Alpaca ────────────────────────────────────────────────────
    alpaca["Alpaca<br/>Paper Trading API<br/>Order submission,<br/>position sync,<br/>account state"]
    broker <-->|"submit order /<br/>sync positions /<br/>sync order status"| alpaca
    pg_orders[("PostgreSQL<br/>orders, order_events,<br/>positions,<br/>portfolio_snapshots,<br/>broker_accounts")]
    broker -->|"persist order,<br/>events, positions"| pg_orders

    %% ── Lake Publication (broker inline) ──────────────────────────
    minio_broker_lake[("MinIO<br/>Lakehouse<br/>order + fill + position facts")]
    broker -->|"publish_trade_order<br/>publish_trade_fill<br/>publish_positions_daily_batch<br/>(Parquet)"| minio_broker_lake

    %% ── Notifications ─────────────────────────────────────────────
    subgraph notifications ["Notifications"]
        direction LR
        sns["AWS SNS<br/>SMS alerts"]
        gmail["Gmail SMTP<br/>Email alerts"]
    end
    trading_loop -->|"circuit breaker trips,<br/>order fills,<br/>stop-loss triggers"| notifications
```

## Analytical Branch — Lake Publisher

The lake publisher runs as a separate worker, consuming from its own queue and writing partitioned Parquet fact tables to MinIO for analytical queries. Some services (broker adapter, recommendation worker) also publish facts directly to MinIO inline, bypassing the queue.

```mermaid
flowchart LR
    %% ── Lake Publish Queue ────────────────────────────────────────
    q_lake[["stonks:queue:lake_publish"]]
    various(("Upstream Services<br/>via enqueue_lake_job()"))
    various -->|"rpush job<br/>(job_type + entity_id)"| q_lake

    %% ── Lake Publisher Worker ─────────────────────────────────────
    lake["Lake Publisher<br/>services.lake_publisher.jobs<br/>Transforms operational data<br/>into analytical facts<br/>15 job types supported"]
    q_lake -->|"lpop"| lake
    pg_source[("PostgreSQL<br/>Operational Tables<br/>documents, extractions,<br/>orders, positions, events,<br/>global_events, macro_impacts,<br/>competitive_signals")]
    lake -->|"query source data"| pg_source

    %% ── MinIO Parquet ─────────────────────────────────────────────
    minio_lake[("MinIO<br/>Lakehouse Bucket<br/>Partitioned Parquet<br/>/year=/month=/day=")]
    lake -->|"write Parquet files"| minio_lake

    %% ── Inline Publishers ─────────────────────────────────────────
    inline(("Inline Publishers<br/>broker adapter,<br/>recommendation worker"))
    inline -->|"publish_* functions<br/>(direct Parquet write)"| minio_lake

    %% ── Trino ─────────────────────────────────────────────────────
    trino["Trino<br/>SQL Query Engine<br/>Hive connector → MinIO"]
    minio_lake -->|"read via<br/>Hive Metastore"| trino
    hive["Hive Metastore<br/>Schema catalog"]
    trino <-->|"table metadata"| hive
    hive -->|"location refs"| minio_lake

    %% ── Visualization ─────────────────────────────────────────────
    superset["Superset<br/>Dashboards &<br/>SQL Lab"]
    dashboard["React Dashboard<br/>frontend<br/>Charts, portfolio,<br/>recommendations"]
    query_api["Query API<br/>services.api.app"]
    trino --> superset
    trino --> query_api
    query_api --> dashboard
```

## Report Generation

The scheduler manages report generation as a sub-loop, enqueuing daily and weekly report jobs to a dedicated queue and consuming them inline.

```mermaid
flowchart LR
    scheduler["Scheduler<br/>report schedule check<br/>daily @ 16:30 ET<br/>weekly @ Saturday"]
    q_report[["stonks:queue:report_generation"]]
    scheduler -->|"rpush<br/>(daily/weekly)"| q_report
    scheduler_consumer["Scheduler<br/>report consumer loop<br/>pops up to 5 jobs/cycle"]
    q_report -->|"lpop"| scheduler_consumer
    generator["Report Generator<br/>services.reporting.generator"]
    scheduler_consumer -->|"process_report_job()"| generator
    pg_reports[("PostgreSQL<br/>trading_reports")]
    generator -->|"persist report"| pg_reports
```

## Complete Queue Topology

| Queue | Full Key | Producer(s) | Consumer |
|-------|----------|-------------|----------|
| Ingestion | `stonks:queue:ingestion` | Scheduler (company, macro, global market sources) | Ingestion Worker |
| Parsing | `stonks:queue:parsing` | Ingestion Worker (news, filings, web_scrape, macro) | Parser Worker |
| Extraction | `stonks:queue:extraction` | Parser (standard docs), Scheduler (stale recovery) | Extractor Worker |
| Macro Classification | `stonks:queue:macro_classification` | Parser (macro_event docs), Scheduler (stale/failed recovery) | Extractor Worker |
| Aggregation | `stonks:queue:aggregation` | Extractor Worker (per ticker), Scheduler (periodic, all tickers) | Aggregation Worker |
| Recommendation | `stonks:queue:recommendation` | Aggregation Worker (ticker + window, 5 min dedup TTL) | Recommendation Worker |
| Broker Orders | `stonks:queue:broker_orders` | Trading Engine (act decisions), Trading API (manual overrides) | Broker Adapter |
| Lake Publish | `stonks:queue:lake_publish` | Various services (via `enqueue_lake_job()`) | Lake Publisher |
| Report Generation | `stonks:queue:report_generation` | Scheduler (daily/weekly triggers) | Scheduler (inline consumer) |

Dead-letter queues follow the pattern `stonks:dlq:` and are populated when a job exhausts its retry budget.
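
The producer/consumer pattern in the table (`rpush` on one side, `lpop` on the other, dead-lettering once retries run out) can be sketched as below. This is a minimal illustration, not the project's worker code: `InMemoryLists` is a hypothetical stand-in for a Redis client, `MAX_ATTEMPTS` and the `attempts` field stored in the job payload are assumptions, and the real services may track retries differently (the key list includes `stonks:retry:*`, for example).

```python
import json
from collections import defaultdict, deque

QUEUE_PREFIX = "stonks:queue:"
DLQ_PREFIX = "stonks:dlq:"
MAX_ATTEMPTS = 3  # assumed retry budget; the source does not state the real value


def queue_key(name: str) -> str:
    return f"{QUEUE_PREFIX}{name}"


def dlq_key(name: str) -> str:
    return f"{DLQ_PREFIX}{name}"


class InMemoryLists:
    """Hypothetical stand-in exposing the list commands used here."""

    def __init__(self) -> None:
        self._lists = defaultdict(deque)

    def rpush(self, key: str, *values: str) -> int:
        self._lists[key].extend(values)
        return len(self._lists[key])

    def lpop(self, key: str):
        items = self._lists[key]
        return items.popleft() if items else None

    def llen(self, key: str) -> int:
        return len(self._lists[key])


def process_one(client, name: str, handler) -> str:
    """Pop one job from the head of the queue and run the handler on it.

    On failure the job goes back to the tail of the queue until it has
    failed MAX_ATTEMPTS times, after which it moves to the dead-letter queue.
    """
    raw = client.lpop(queue_key(name))
    if raw is None:
        return "empty"
    job = json.loads(raw)
    try:
        handler(job)
        return "ok"
    except Exception as exc:
        job["attempts"] = job.get("attempts", 0) + 1
        job["last_error"] = str(exc)
        if job["attempts"] >= MAX_ATTEMPTS:
            client.rpush(dlq_key(name), json.dumps(job))
            return "dead_lettered"
        client.rpush(queue_key(name), json.dumps(job))
        return "retried"
```

Re-queueing a failed job at the tail keeps one bad job from blocking the jobs behind it; whether the actual workers retry this way is not stated in this document.
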

## Data Store Summary

| Store | Role | Key Tables / Buckets |
|-------|------|---------------------|
| **PostgreSQL** | Structured operational data | `documents`, `document_intelligence`, `document_impact_records`, `document_company_mentions`, `global_events`, `macro_impact_records`, `exposure_profiles`, `competitive_signal_records`, `competitor_relationships`, `trend_windows`, `trend_history`, `trend_projections`, `recommendations`, `recommendation_evidence`, `risk_evaluations`, `orders`, `order_events`, `positions`, `portfolio_snapshots`, `trading_decisions`, `circuit_breaker_events`, `reserve_pool_ledger`, `risk_tier_history`, `broker_accounts`, `ingestion_runs`, `sources`, `companies`, `company_aliases`, `ai_agents`, `agent_variants`, `agent_performance_log`, `risk_configs`, `trading_reports` |
| **Redis** | Queues, dedup markers, rate limits, circuit breaker state, pipeline toggle | `stonks:queue:*` (9 queues), `stonks:dedupe:*`, `stonks:dedupe:trading:*`, `stonks:ratelimit:*`, `stonks:trading:circuit_breaker:*`, `stonks:trading:notification_rate:*`, `stonks:order_idempotency:*`, `stonks:lock:*`, `stonks:cache:*`, `stonks:retry:*`, `stonks:rec_dedup:*`, `stonks:pipeline:enabled`, `stonks:dlq:*` |
| **MinIO** | Object storage for raw artifacts, normalized text, and analytical Parquet files | Raw artifacts bucket, normalized text bucket, parser output bucket, lakehouse bucket (partitioned Parquet: documents, extractions, market bars/quotes, orders, fills, positions, PnL, global events, macro impacts, trend projections, competitive signals, competitor relationships, recommendations) |

## External Integration Points

| Integration | Service | Protocol | Purpose |
|-------------|---------|----------|---------|
| **Polygon.io** | Ingestion (via PolygonNewsAdapter, PolygonMarketAdapter) | HTTPS REST | News articles, market bars, grouped daily data, intraday bars |
| **SEC EDGAR** | Ingestion (via SECEdgarAdapter) | HTTPS REST | 10-K, 10-Q filings |
| **Macro News** | Ingestion (via MacroNewsAdapter) | HTTPS REST | Geopolitical and economic event articles |
| **Ollama / vLLM** | Extractor, Recommendation | HTTP `/api/generate` | LLM inference for document extraction (document-extractor agent), event classification (event-classifier agent), thesis rewriting (thesis-rewriter agent). Model and variant selected via `AgentConfigResolver` with 60s TTL cache. |
| **Alpaca** | Broker Adapter | HTTPS REST | Paper/live trading: order submission, position sync, account state, order status polling |
| **AWS SNS** | Trading Engine (notifications) | boto3 SDK | SMS alerts for circuit breaker trips, order fills, stop-loss triggers |
| **Gmail** | Trading Engine (notifications) | SMTP (port 587 STARTTLS) | Email alerts for trading events |
| **Trino** | Query API, Superset | HTTP | SQL queries over lakehouse Parquet files via Hive Metastore |

## Pipeline Toggle

The pipeline can be paused globally via the Redis key `stonks:pipeline:enabled`. When set to `"0"`, all queue workers (ingestion, parser, extractor, aggregation, recommendation, broker adapter, lake publisher) enter a sleep loop and stop processing jobs. The scheduler also skips scheduling cycles when the toggle is off. The toggle can be set via the Query API's pipeline control endpoints.

Setting `PIPELINE_DEFAULT_OFF=true` on the scheduler initializes the toggle to OFF on first boot, useful for staged deployments where you want to verify infrastructure before enabling the pipeline.
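
A worker honoring the toggle might look roughly like the sketch below. The key name and the `"0"` value come from this document; everything else is assumed for illustration: `InMemoryKV` is a hypothetical stand-in for a Redis client that returns strings, a missing key is treated as enabled, and `init_toggle`, `pipeline_enabled`, and `run_worker` are made-up helper names rather than functions from the codebase.

```python
import time

PIPELINE_TOGGLE_KEY = "stonks:pipeline:enabled"


class InMemoryKV:
    """Hypothetical stand-in exposing get/set with an nx (set-if-absent) flag."""

    def __init__(self) -> None:
        self._data = {}

    def get(self, key: str):
        return self._data.get(key)

    def set(self, key: str, value: str, nx: bool = False):
        if nx and key in self._data:
            return None  # key already exists, leave it untouched
        self._data[key] = value
        return True


def init_toggle(client, env: dict) -> None:
    """On boot, write OFF only if requested and no value exists yet."""
    if env.get("PIPELINE_DEFAULT_OFF", "").lower() == "true":
        client.set(PIPELINE_TOGGLE_KEY, "0", nx=True)


def pipeline_enabled(client) -> bool:
    # Assumption: only the literal "0" pauses; a missing key means enabled.
    return client.get(PIPELINE_TOGGLE_KEY) != "0"


def run_worker(client, fetch_job, handle_job, max_cycles: int,
               idle_seconds: float = 0.0) -> int:
    """Run a bounded worker loop and return the number of jobs handled."""
    processed = 0
    for _ in range(max_cycles):
        if not pipeline_enabled(client):
            time.sleep(idle_seconds)  # paused: leave the queue untouched
            continue
        job = fetch_job()
        if job is None:
            time.sleep(idle_seconds)  # nothing queued this cycle
            continue
        handle_job(job)
        processed += 1
    return processed
```

Checking the toggle before popping means a paused worker never removes a job it will not process, so queued work is still there when the pipeline is re-enabled.
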