# Data Pipeline Architecture — Stonks Oracle

This document describes the end-to-end data pipeline from external data sources through signal processing to trade execution. The pipeline is queue-driven, with Redis lists connecting each stage and PostgreSQL/MinIO providing durable storage at every step.

All queue names follow the convention `stonks:queue:` followed by the queue name, e.g. `stonks:queue:ingestion` (see `services/shared/redis_keys.py`). Dead-letter queues mirror the pattern under the `stonks:dlq:` prefix.

## Pipeline Overview

```mermaid
flowchart TB
    %% ── External Data Sources ─────────────────────────────────────
    subgraph sources ["External Data Sources"]
        direction LR
        polygon["Polygon.io<br/>News, Market Bars,<br/>Grouped Daily"]
        sec["SEC EDGAR<br/>10-K, 10-Q Filings"]
        macro_src["Macro News APIs<br/>Geopolitical &<br/>Economic Events"]
        market_src["Market Data API<br/>Intraday Bars,<br/>Grouped Daily"]
    end

    %% ── Scheduler ─────────────────────────────────────────────────
    scheduler["Scheduler<br/>services.scheduler.app<br/>Cadence polling, rate limiting,<br/>backoff & stale recovery"]
    sources -.->|"API polling<br/>on cadence"| scheduler

    %% ── Ingestion Queue ───────────────────────────────────────────
    q_ingestion[["stonks:queue:ingestion"]]
    scheduler -->|"rpush job"| q_ingestion

    %% ── Ingestion Worker ──────────────────────────────────────────
    ingestion["Ingestion<br/>services.ingestion.worker<br/>Adapter dispatch, dedupe,<br/>raw artifact upload"]
    q_ingestion -->|"lpop"| ingestion

    %% ── Raw Storage ───────────────────────────────────────────────
    minio_raw[("MinIO<br/>Raw Artifacts<br/>JSON / HTML")]
    pg_docs[("PostgreSQL<br/>documents,<br/>ingestion_runs")]
    redis_dedupe[("Redis<br/>Dedupe Markers<br/>stonks:dedupe:*")]
    ingestion -->|"upload raw payload"| minio_raw
    ingestion -->|"persist metadata"| pg_docs
    ingestion -->|"set content hash"| redis_dedupe

    %% ── Parsing Queue ─────────────────────────────────────────────
    q_parsing[["stonks:queue:parsing"]]
    ingestion -->|"rpush<br/>(news, filings,<br/>web_scrape)"| q_parsing

    %% ── Parser Worker ─────────────────────────────────────────────
    parser["Parser<br/>services.parser.worker<br/>HTML parsing, quality scoring,<br/>company mention detection"]
    q_parsing -->|"lpop"| parser
    minio_norm[("MinIO<br/>Normalized Text<br/>Parser Output JSON")]
    parser -->|"upload normalized text"| minio_norm
    parser -->|"update document status,<br/>insert mentions"| pg_docs
```

## Three Signal Layers

The parser routes each document into one of two extraction paths based on `document_type`: `macro_event` documents go to `stonks:queue:macro_classification`, and standard documents go to `stonks:queue:extraction`. The extractor turns these into company signals (Layer 1) and macro signals (Layer 2). Competitive signals (Layer 3) are produced later, when the aggregation worker triggers signal propagation (if `competitive_enabled` is set). All three layers converge at the aggregation stage through the shared `WeightedSignal` abstraction.

```mermaid
flowchart TB
    %% ── Parser Output ─────────────────────────────────────────────
    parser(("Parser"))

    %% ── Extraction Queues ─────────────────────────────────────────
    q_extraction[["stonks:queue:extraction"]]
    q_macro[["stonks:queue:macro_classification"]]
    parser -->|"rpush<br/>(standard docs)"| q_extraction
    parser -->|"rpush<br/>(macro_event docs)"| q_macro

    %% ── Extractor Worker ──────────────────────────────────────────
    subgraph extractor_svc ["Extractor Service"]
        direction TB
        ext_main["Extractor<br/>services.extractor.main<br/>Alternates between queues<br/>(2 extraction : 1 macro)"]
    end
    q_extraction -->|"lpop"| ext_main
    q_macro -->|"lpop"| ext_main

    %% ── Ollama LLM ───────────────────────────────────────────────
    ollama["Ollama<br/>LLM Inference<br/>document-extractor agent<br/>event-classifier agent"]
    ext_main <-->|"HTTP /api/generate"| ollama

    %% ── Signal Layer 1: Company ───────────────────────────────────
    subgraph layer1 ["Layer 1 — Company Signals"]
        direction LR
        di["document_intelligence<br/>document_impact_records"]
    end
    ext_main -->|"persist extraction<br/>(standard docs)"| di

    %% ── Signal Layer 2: Macro ─────────────────────────────────────
    subgraph layer2 ["Layer 2 — Macro Signals"]
        direction LR
        ge["global_events"]
        mir["macro_impact_records<br/>per-company interpolation"]
        ge --> mir
    end
    ext_main -->|"classify & persist<br/>(macro_event docs)"| ge
    ext_main -->|"compute_macro_impact<br/>for all tracked companies"| mir

    %% ── Aggregation Queue ─────────────────────────────────────────
    q_agg[["stonks:queue:aggregation"]]
    ext_main -->|"rpush<br/>(per ticker)"| q_agg

    %% ── Aggregation Worker ────────────────────────────────────────
    aggregation["Aggregation<br/>services.aggregation.main<br/>Trend windows, scoring,<br/>contradiction detection"]
    q_agg -->|"lpop"| aggregation

    %% ── Signal Layer 3: Competitive ──────────────────────────────
    subgraph layer3 ["Layer 3 — Competitive Signals"]
        direction LR
        pm["pattern_matcher<br/>historical patterns"]
        sp["signal_propagation<br/>cross-company signals"]
        csr["competitive_signal_records"]
        pm --> sp --> csr
    end
    aggregation -->|"trigger_signal_propagation<br/>(when competitive_enabled)"| layer3

    %% ── All layers merge ──────────────────────────────────────────
    pg_trends[("PostgreSQL<br/>trend_windows,<br/>trend_history,<br/>trend_projections")]
    di -->|"WeightedSignal"| aggregation
    mir -->|"WeightedSignal"| aggregation
    csr -->|"WeightedSignal"| aggregation
    aggregation -->|"persist trend summaries"| pg_trends
```

## Recommendation → Trading → Broker

```mermaid
flowchart TB
    %% ── Recommendation Queue ──────────────────────────────────────
    q_rec[["stonks:queue:recommendation"]]
    aggregation(("Aggregation")) -->|"rpush<br/>(ticker + window,<br/>dedup 5 min TTL)"| q_rec

    %% ── Recommendation Worker ─────────────────────────────────────
    recommendation["Recommendation<br/>services.recommendation.main<br/>Eligibility, suppression,<br/>thesis generation"]
    q_rec -->|"lpop"| recommendation
    ollama_thesis["Ollama<br/>thesis-rewriter agent<br/>(optional LLM rewrite)"]
    recommendation <-->|"rewrite thesis<br/>(trading-eligible only)"| ollama_thesis
    pg_recs[("PostgreSQL<br/>recommendations,<br/>recommendation_evidence,<br/>risk_evaluations")]
    recommendation -->|"persist recommendation<br/>+ evidence + risk eval"| pg_recs

    %% ── Trading Engine ────────────────────────────────────────────
    subgraph trading_loop ["Trading Engine Decision Loop"]
        direction TB
        poll["Poll recommendations<br/>action IN (buy, sell)<br/>mode IN (paper, live)<br/>generated_at > last_poll"]
        dedup_check["Redis dedup check<br/>stonks:dedupe:trading:*"]
        evaluate["evaluate_recommendation<br/>Circuit breaker check<br/>Trading window check<br/>Confidence gate<br/>Sector exposure check<br/>Correlation check<br/>Earnings blackout"]
        size["Position sizing<br/>Kelly criterion,<br/>risk tier limits"]
        decide{{"Decision"}}
        poll --> dedup_check --> evaluate --> size --> decide
    end
    pg_recs -->|"SELECT recent<br/>recommendations"| poll

    %% ── Broker Queue ──────────────────────────────────────────────
    q_broker[["stonks:queue:broker_orders"]]
    pg_decisions[("PostgreSQL<br/>trading_decisions")]
    decide -->|"act → rpush<br/>order job"| q_broker
    decide -->|"skip → persist<br/>decision only"| pg_decisions

    %% ── Broker Adapter ────────────────────────────────────────────
    broker["Broker Adapter<br/>services.adapters.broker_service<br/>Risk evaluation, idempotency,<br/>order submission, fill tracking"]
    q_broker -->|"lpop"| broker

    %% ── Risk Engine ───────────────────────────────────────────────
    risk["Risk Engine<br/>services.risk.app<br/>POST /evaluate<br/>Approval workflow"]
    broker <-->|"evaluate order"| risk

    %% ── Alpaca ────────────────────────────────────────────────────
    alpaca["Alpaca<br/>Paper Trading API<br/>Order submission,<br/>position sync"]
    broker <-->|"submit order /<br/>sync positions"| alpaca
    pg_orders[("PostgreSQL<br/>orders, order_events,<br/>positions,<br/>portfolio_snapshots")]
    broker -->|"persist order,<br/>events, positions"| pg_orders

    %% ── Notifications ─────────────────────────────────────────────
    subgraph notifications ["Notifications"]
        direction LR
        sns["AWS SNS<br/>SMS alerts"]
        gmail["Gmail SMTP<br/>Email alerts"]
    end
    trading_loop -->|"circuit breaker trips,<br/>order fills,<br/>stop-loss triggers"| notifications
```

## Analytical Branch — Lake Publisher

The lake publisher runs as a separate worker, consuming from its own queue and writing partitioned Parquet fact tables to MinIO for analytical queries.

```mermaid
flowchart LR
    %% ── Lake Publish Queue ────────────────────────────────────────
    q_lake[["stonks:queue:lake_publish"]]
    various(("Various Services<br/>ingestion, extractor,<br/>recommendation,<br/>broker adapter"))
    various -->|"enqueue_lake_job"| q_lake

    %% ── Lake Publisher Worker ─────────────────────────────────────
    lake["Lake Publisher<br/>services.lake_publisher.jobs<br/>Transforms operational data<br/>into analytical facts"]
    q_lake -->|"lpop"| lake
    pg_source[("PostgreSQL<br/>Operational Tables<br/>documents, extractions,<br/>orders, positions, events")]
    lake -->|"query source data"| pg_source

    %% ── MinIO Parquet ─────────────────────────────────────────────
    minio_lake[("MinIO<br/>Lakehouse Bucket<br/>Partitioned Parquet<br/>/year=/month=/day=")]
    lake -->|"write Parquet files"| minio_lake

    %% ── Trino ─────────────────────────────────────────────────────
    trino["Trino<br/>SQL Query Engine<br/>Hive connector → MinIO"]
    minio_lake -->|"read via<br/>Hive Metastore"| trino
    hive["Hive Metastore<br/>Schema catalog"]
    trino <-->|"table metadata"| hive
    hive -->|"location refs"| minio_lake

    %% ── Visualization ─────────────────────────────────────────────
    superset["Superset<br/>Dashboards &<br/>SQL Lab"]
    dashboard["React Dashboard<br/>frontend<br/>Charts, portfolio,<br/>recommendations"]
    query_api["Query API<br/>services.api.app"]
    trino --> superset
    trino --> query_api
    query_api --> dashboard
```

## Complete Queue Topology

| Queue | Full Key | Producer(s) | Consumer |
|-------|----------|-------------|----------|
| Ingestion | `stonks:queue:ingestion` | Scheduler | Ingestion Worker |
| Parsing | `stonks:queue:parsing` | Ingestion Worker | Parser Worker |
| Extraction | `stonks:queue:extraction` | Parser (standard docs) | Extractor Worker |
| Macro Classification | `stonks:queue:macro_classification` | Parser (macro_event docs), Scheduler | Extractor Worker |
| Aggregation | `stonks:queue:aggregation` | Extractor Worker | Aggregation Worker |
| Recommendation | `stonks:queue:recommendation` | Aggregation Worker | Recommendation Worker |
| Broker Orders | `stonks:queue:broker_orders` | Trading Engine, Trading API (manual overrides) | Broker Adapter |
| Lake Publish | `stonks:queue:lake_publish` | Ingestion, Extractor, Recommendation, Broker Adapter (via `enqueue_lake_job`) | Lake Publisher |

Dead-letter queues use the `stonks:dlq:` prefix and are populated when a job exhausts its retry budget.
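The queue contract in this table can be sketched in Python: producers `rpush`, consumers `lpop`, and exhausted jobs move to `stonks:dlq:`. This is a minimal illustration under stated assumptions, not code from `services/shared/redis_keys.py` or any worker. The helpers `queue_key`, `dlq_key`, `enqueue` and `process_one`, the JSON job encoding, the `attempts` field and `MAX_ATTEMPTS = 3` are all hypothetical. The client only needs `rpush` and `lpop`, both of which redis-py's `redis.Redis` provides. The `InMemoryLists` stand-in keeps the example runnable without a server.

```python
import json
from collections import defaultdict, deque
from typing import Any, Callable, Optional

# Assumed retry budget for illustration; not taken from the services' config.
MAX_ATTEMPTS = 3


def queue_key(name: str) -> str:
    """Hypothetical helper for the stonks:queue:<name> convention."""
    return f"stonks:queue:{name}"


def dlq_key(name: str) -> str:
    """Hypothetical helper for the matching stonks:dlq:<name> key."""
    return f"stonks:dlq:{name}"


class InMemoryLists:
    """Minimal stand-in for redis.Redis: only the list commands used here."""

    def __init__(self) -> None:
        self._lists: dict[str, deque] = defaultdict(deque)

    def rpush(self, key: str, *values: str) -> int:
        self._lists[key].extend(values)
        return len(self._lists[key])

    def lpop(self, key: str) -> Optional[str]:
        items = self._lists[key]
        return items.popleft() if items else None

    def llen(self, key: str) -> int:
        return len(self._lists[key])


def enqueue(client: Any, queue: str, job: dict) -> None:
    """Producer side: append a JSON-encoded job to the tail of the list."""
    client.rpush(queue_key(queue), json.dumps(job))


def process_one(client: Any, queue: str, handler: Callable[[dict], Any]) -> Optional[str]:
    """Consumer side: pop one job from the head and run it.

    On failure the job's attempt counter is incremented; the job is re-pushed
    to the same queue until MAX_ATTEMPTS is reached, then moved to the DLQ.
    Returns "ok", "retried", "dead_lettered", or None if the queue was empty.
    """
    raw = client.lpop(queue_key(queue))
    if raw is None:
        return None
    job = json.loads(raw)
    try:
        handler(job)
        return "ok"
    except Exception:  # any handler failure counts against the retry budget
        job["attempts"] = job.get("attempts", 0) + 1
        if job["attempts"] >= MAX_ATTEMPTS:
            client.rpush(dlq_key(queue), json.dumps(job))
            return "dead_lettered"
        client.rpush(queue_key(queue), json.dumps(job))
        return "retried"
```

Suppose the handler always raises. The job is re-pushed twice and lands in `stonks:dlq:parsing` on its third failure. Re-pushing to the tail sends a failing job behind newer work rather than retrying it in a tight loop. Plain `LPOP` also loses a job if the worker crashes inside the handler. Redis provides `BLPOP` (blocking pop) and `LMOVE` (atomic move onto a processing list, Redis 6.2+) for those cases.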
## Data Store Summary

| Store | Role | Key Tables / Buckets |
|-------|------|---------------------|
| **PostgreSQL** | Structured operational data | `documents`, `document_intelligence`, `document_impact_records`, `global_events`, `macro_impact_records`, `competitive_signal_records`, `trend_windows`, `trend_history`, `trend_projections`, `recommendations`, `recommendation_evidence`, `risk_evaluations`, `orders`, `order_events`, `positions`, `portfolio_snapshots`, `trading_decisions` |
| **Redis** | Queues, dedup markers, rate limits, circuit breaker state | `stonks:queue:*`, `stonks:dedupe:*`, `stonks:ratelimit:*`, `stonks:trading:circuit_breaker:*`, `stonks:dlq:*` |
| **MinIO** | Object storage for raw artifacts, normalized text, and analytical Parquet files | Raw artifacts bucket, normalized text bucket, lakehouse bucket (partitioned Parquet) |

## External Integration Points

| Integration | Service | Protocol | Purpose |
|-------------|---------|----------|---------|
| **Polygon.io** | Ingestion (via adapters) | HTTPS REST | News articles, market bars, grouped daily data |
| **SEC EDGAR** | Ingestion (via FilingsDataAdapter) | HTTPS REST | 10-K, 10-Q filings |
| **Ollama** | Extractor, Recommendation | HTTP `/api/generate` | LLM inference for document extraction, event classification, thesis rewriting |
| **Alpaca** | Broker Adapter | HTTPS REST | Paper trading order submission, position sync, account state |
| **AWS SNS** | Trading Engine (notifications) | boto3 SDK | SMS alerts for circuit breaker trips, order fills, stop-loss triggers |
| **Gmail** | Trading Engine (notifications) | SMTP (port 587 STARTTLS) | Email alerts for trading events |
| **Trino** | Query API, Superset | JDBC / HTTP | SQL queries over lakehouse Parquet files |
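The `stonks:dedupe:*` markers in the Redis row of the Data Store Summary serve several stages: the ingestion worker's content hashes, the 5-minute dedup on recommendation jobs, and the trading engine's `stonks:dedupe:trading:*` check. Each reduces to an atomic "set if absent, with expiry". Below is a minimal sketch, assuming SHA-256 over the payload bytes and a hypothetical `stonks:dedupe:<scope>:<hash>` key layout. `dedupe_key`, `first_time_seen` and `InMemoryTTLStore` are illustrative names, not the services' API. Against real Redis, the same call is redis-py's `client.set(key, "1", nx=True, ex=ttl_seconds)`. It returns `True` when the key is created and `None` when the key already exists. The in-memory stand-in reproduces that behaviour with an injectable clock.

```python
import hashlib
import time
from typing import Any, Callable, Dict, Optional, Tuple


class InMemoryTTLStore:
    """Stand-in for redis.Redis.set(key, value, nx=True, ex=seconds).

    The clock is injectable so expiry can be exercised without sleeping.
    """

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._data: Dict[str, Tuple[str, float]] = {}

    def set(self, key: str, value: str, nx: bool = False,
            ex: Optional[int] = None) -> Optional[bool]:
        now = self._clock()
        entry = self._data.get(key)
        if entry is not None and entry[1] <= now:
            del self._data[key]  # lazily drop an expired marker
            entry = None
        if nx and entry is not None:
            return None  # mirrors redis-py: NX blocked the write
        expires_at = now + ex if ex is not None else float("inf")
        self._data[key] = (value, expires_at)
        return True


def dedupe_key(scope: str, payload: bytes) -> str:
    """Hypothetical layout: stonks:dedupe:<scope>:<sha256 hex of payload>."""
    return f"stonks:dedupe:{scope}:{hashlib.sha256(payload).hexdigest()}"


def first_time_seen(client: Any, scope: str, payload: bytes, ttl_seconds: int) -> bool:
    """True only for the first caller that claims this payload within the TTL."""
    marker = client.set(dedupe_key(scope, payload), "1", nx=True, ex=ttl_seconds)
    return bool(marker)
```

The existence check and the write happen in one command, so two workers racing on the same payload cannot both be told it is new. A separate `GET` followed by `SET` would leave that window open.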