# Data Pipeline Architecture — Stonks Oracle

This document describes the end-to-end data pipeline, from external data sources through signal processing to trade execution. The pipeline is queue-driven: Redis lists connect the stages (the one exception is the trading engine, which polls new recommendations from PostgreSQL), and PostgreSQL and MinIO provide durable storage at every step.

All queue names follow the convention `stonks:queue:<name>` (see `services/shared/redis_keys.py`). Dead-letter queues mirror the pattern as `stonks:dlq:<name>`.

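A minimal sketch of helpers that build keys under this convention. The function names (`queue_key`, `dlq_key`, `dlq_for`) and the rule that a bare queue name may not contain `:` are assumptions for illustration; they are not necessarily what `services/shared/redis_keys.py` exposes.

```python
# Hypothetical helpers mirroring the documented naming convention; the real
# services/shared/redis_keys.py may use different names.
QUEUE_PREFIX = "stonks:queue:"
DLQ_PREFIX = "stonks:dlq:"


def _check(name: str) -> str:
    # Assumption: bare queue names are non-empty and contain no ':' separator.
    if not name or ":" in name:
        raise ValueError(f"invalid queue name: {name!r}")
    return name


def queue_key(name: str) -> str:
    """Work-queue key, e.g. 'parsing' -> 'stonks:queue:parsing'."""
    return QUEUE_PREFIX + _check(name)


def dlq_key(name: str) -> str:
    """Dead-letter key that mirrors a work queue, e.g. 'stonks:dlq:parsing'."""
    return DLQ_PREFIX + _check(name)


def dlq_for(full_queue_key: str) -> str:
    """Map a full work-queue key to its dead-letter key."""
    if not full_queue_key.startswith(QUEUE_PREFIX):
        raise ValueError(f"not a work-queue key: {full_queue_key!r}")
    return dlq_key(full_queue_key[len(QUEUE_PREFIX):])
```

Deriving the dead-letter key from the work-queue key, rather than hard-coding both strings, keeps the two from drifting apart when a queue is renamed.
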
## Pipeline Overview

```mermaid
flowchart TB
    %% ── External Data Sources ─────────────────────────────────────
    subgraph sources ["External Data Sources"]
        direction LR
        polygon["Polygon.io<br/><i>News, Market Bars,<br/>Grouped Daily</i>"]
        sec["SEC EDGAR<br/><i>10-K, 10-Q Filings</i>"]
        macro_src["Macro News APIs<br/><i>Geopolitical &<br/>Economic Events</i>"]
        market_src["Market Data API<br/><i>Intraday Bars,<br/>Grouped Daily</i>"]
    end

    %% ── Scheduler ─────────────────────────────────────────────────
    scheduler["<b>Scheduler</b><br/><i>services.scheduler.app</i><br/>Cadence polling, rate limiting,<br/>backoff & stale recovery"]

    sources -.->|"API polling<br/>on cadence"| scheduler

    %% ── Ingestion Queue ───────────────────────────────────────────
    q_ingestion[["stonks:queue:ingestion"]]
    scheduler -->|"rpush job"| q_ingestion

    %% ── Ingestion Worker ──────────────────────────────────────────
    ingestion["<b>Ingestion</b><br/><i>services.ingestion.worker</i><br/>Adapter dispatch, dedupe,<br/>raw artifact upload"]

    q_ingestion -->|"lpop"| ingestion

    %% ── Raw Storage ───────────────────────────────────────────────
    minio_raw[("MinIO<br/><i>Raw Artifacts</i><br/>JSON / HTML")]
    pg_docs[("PostgreSQL<br/><i>documents,<br/>ingestion_runs</i>")]
    redis_dedupe[("Redis<br/><i>Dedupe Markers</i><br/>stonks:dedupe:*")]

    ingestion -->|"upload raw payload"| minio_raw
    ingestion -->|"persist metadata"| pg_docs
    ingestion -->|"set content hash"| redis_dedupe

    %% ── Parsing Queue ─────────────────────────────────────────────
    q_parsing[["stonks:queue:parsing"]]
    ingestion -->|"rpush<br/>(news, filings,<br/>web_scrape)"| q_parsing

    %% ── Parser Worker ─────────────────────────────────────────────
    parser["<b>Parser</b><br/><i>services.parser.worker</i><br/>HTML parsing, quality scoring,<br/>company mention detection"]

    q_parsing -->|"lpop"| parser

    minio_norm[("MinIO<br/><i>Normalized Text</i><br/><i>Parser Output JSON</i>")]
    parser -->|"upload normalized text"| minio_norm
    parser -->|"update document status,<br/>insert mentions"| pg_docs
```

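The ingestion stage can be sketched as a single pop, dedupe, forward step. This is a hedged illustration, not the worker's code: `FakeRedis` is an in-memory stand-in for the few redis-py calls used (`rpush`, `lpop`, `set(..., nx=True, ex=...)`), and the job fields (`source_type`, `payload`), the `stonks:dedupe:ingestion:<sha256>` key layout and the TTL are all assumptions. Only the queue names, the role of the Redis dedupe markers and the job types forwarded to parsing come from the diagram.

```python
import hashlib
import json
from collections import defaultdict, deque


class FakeRedis:
    """In-memory stand-in for the redis-py calls used below (TTLs are ignored)."""

    def __init__(self):
        self.lists = defaultdict(deque)
        self.kv = {}

    def rpush(self, key, value):
        self.lists[key].append(value)
        return len(self.lists[key])

    def lpop(self, key):
        items = self.lists[key]
        return items.popleft() if items else None

    def set(self, key, value, nx=False, ex=None):
        if nx and key in self.kv:
            return None  # redis-py also returns None when NX blocks the write
        self.kv[key] = value
        return True


# Job types the diagram shows being forwarded to the parser.
PARSEABLE_TYPES = {"news", "filings", "web_scrape"}
DEDUPE_TTL_SECONDS = 7 * 24 * 3600  # assumption: the real TTL may differ


def process_one(r) -> str:
    """Pop one ingestion job, drop duplicates by content hash, forward to parsing."""
    raw = r.lpop("stonks:queue:ingestion")
    if raw is None:
        return "idle"
    job = json.loads(raw)
    digest = hashlib.sha256(job["payload"].encode("utf-8")).hexdigest()
    # Hypothetical key layout under the documented stonks:dedupe:* prefix.
    marker = f"stonks:dedupe:ingestion:{digest}"
    if not r.set(marker, 1, nx=True, ex=DEDUPE_TTL_SECONDS):
        return "duplicate"
    # The MinIO upload and PostgreSQL metadata insert would happen here.
    if job["source_type"] in PARSEABLE_TYPES:
        r.rpush("stonks:queue:parsing", json.dumps({"content_hash": digest}))
        return "queued_for_parsing"
    return "stored_only"
```

Using `SET ... NX` makes the check-and-mark a single atomic Redis operation, so two ingestion workers that pop copies of the same payload cannot both forward it to parsing.
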
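## Three Signal Layers

The parser routes each document into one of two extraction paths based on its `document_type`: standard documents go to `stonks:queue:extraction` and feed Layer 1 (company signals), while `macro_event` documents go to `stonks:queue:macro_classification` and feed Layer 2 (macro signals). Layer 3 (competitive signals) is not fed by the parser; the aggregation worker triggers it when `competitive_enabled` is set. All three layers converge at the aggregation stage through the shared `WeightedSignal` abstraction.
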
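The routing rule above and the 2:1 polling ratio shown on the extractor node can be sketched together. Plain `deque`s stand in for the two Redis lists, and skipping an empty queue's slot so the extractor keeps working is an assumption about behaviour the diagram does not specify.

```python
from collections import deque
from itertools import cycle

EXTRACTION = "stonks:queue:extraction"
MACRO = "stonks:queue:macro_classification"


def route(document: dict) -> str:
    """Parser side: macro_event documents go to macro classification."""
    if document.get("document_type") == "macro_event":
        return MACRO
    return EXTRACTION


# Extractor side: two extraction pops for every macro pop.
POLL_PATTERN = (EXTRACTION, EXTRACTION, MACRO)


def drain(queues: dict[str, deque], max_jobs: int) -> list:
    """Pop up to max_jobs items following the 2:1 pattern.

    Assumption: an empty queue's slot is skipped rather than waited on; the
    loop stops after one full pattern cycle finds no work.
    """
    processed = []
    misses = 0
    for name in cycle(POLL_PATTERN):
        if len(processed) >= max_jobs or misses >= len(POLL_PATTERN):
            break
        if queues[name]:
            processed.append(queues[name].popleft())
            misses = 0
        else:
            misses += 1
    return processed
```

With four news documents and two macro events queued, `drain` yields `n0, n1, m0, n2, n3, m1`: macro work is never starved, but standard extraction gets twice the throughput.
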
```mermaid
flowchart TB
    %% ── Parser Output ─────────────────────────────────────────────
    parser(("Parser"))

    %% ── Extraction Queues ─────────────────────────────────────────
    q_extraction[["stonks:queue:extraction"]]
    q_macro[["stonks:queue:macro_classification"]]

    parser -->|"rpush<br/>(standard docs)"| q_extraction
    parser -->|"rpush<br/>(macro_event docs)"| q_macro

    %% ── Extractor Worker ──────────────────────────────────────────
    subgraph extractor_svc ["Extractor Service"]
        direction TB
        ext_main["<b>Extractor</b><br/><i>services.extractor.main</i><br/>Alternates between queues<br/>(2 extraction : 1 macro)"]
    end

    q_extraction -->|"lpop"| ext_main
    q_macro -->|"lpop"| ext_main

    %% ── Ollama LLM ───────────────────────────────────────────────
    ollama["<b>Ollama</b><br/><i>LLM Inference</i><br/>document-extractor agent<br/>event-classifier agent"]
    ext_main <-->|"HTTP /api/generate"| ollama

    %% ── Signal Layer 1: Company ───────────────────────────────────
    subgraph layer1 ["Layer 1 — Company Signals"]
        direction LR
        di["document_intelligence<br/>document_impact_records"]
    end

    ext_main -->|"persist extraction<br/>(standard docs)"| di

    %% ── Signal Layer 2: Macro ─────────────────────────────────────
    subgraph layer2 ["Layer 2 — Macro Signals"]
        direction LR
        ge["global_events"]
        mir["macro_impact_records<br/><i>per-company interpolation</i>"]
        ge --> mir
    end

    ext_main -->|"classify & persist<br/>(macro_event docs)"| ge
    ext_main -->|"compute_macro_impact<br/>for all tracked companies"| mir

    %% ── Aggregation Queue ─────────────────────────────────────────
    q_agg[["stonks:queue:aggregation"]]
    ext_main -->|"rpush<br/>(per ticker)"| q_agg

    %% ── Aggregation Worker ────────────────────────────────────────
    aggregation["<b>Aggregation</b><br/><i>services.aggregation.main</i><br/>Trend windows, scoring,<br/>contradiction detection"]

    q_agg -->|"lpop"| aggregation

    %% ── Signal Layer 3: Competitive ──────────────────────────────
    subgraph layer3 ["Layer 3 — Competitive Signals"]
        direction LR
        pm["pattern_matcher<br/><i>historical patterns</i>"]
        sp["signal_propagation<br/><i>cross-company signals</i>"]
        csr["competitive_signal_records"]
        pm --> sp --> csr
    end

    aggregation -->|"trigger_signal_propagation<br/>(when competitive_enabled)"| layer3

    %% ── All layers merge ──────────────────────────────────────────
    pg_trends[("PostgreSQL<br/><i>trend_windows,<br/>trend_history,<br/>trend_projections</i>")]

    di -->|"WeightedSignal"| aggregation
    mir -->|"WeightedSignal"| aggregation
    csr -->|"WeightedSignal"| aggregation
    aggregation -->|"persist trend summaries"| pg_trends
```

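To illustrate what converging through a shared type buys, here is a sketch in which every layer emits the same `WeightedSignal` shape and aggregation reduces the signals with a weighted mean. The fields (`layer`, `score`, `weight`) and the weighted-mean reduction are assumptions; the real aggregation worker also builds trend windows and runs contradiction detection, which this ignores.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WeightedSignal:
    """Hypothetical shape; the project's real WeightedSignal may differ."""
    layer: str     # "company", "macro" or "competitive"
    score: float   # assumed signed view: -1.0 bearish .. +1.0 bullish
    weight: float  # assumed non-negative strength or confidence


def combine(signals: list) -> float:
    """Weighted mean of scores across all layers; 0.0 if nothing carries weight."""
    usable = [s for s in signals if s.weight > 0]
    total = sum(s.weight for s in usable)
    if total == 0:
        return 0.0
    return sum(s.score * s.weight for s in usable) / total
```

For example, a company signal of +0.8 with weight 3, a macro signal of -0.5 with weight 1 and a competitive signal of +0.2 with weight 1 combine to about 0.42. Because the reducer only sees the common shape, enabling or disabling a layer changes its inputs, not the aggregation code.
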
## Recommendation → Trading → Broker

```mermaid
flowchart TB
    %% ── Recommendation Queue ──────────────────────────────────────
    q_rec[["stonks:queue:recommendation"]]
    aggregation(("Aggregation")) -->|"rpush<br/>(ticker + window,<br/>dedup 5 min TTL)"| q_rec

    %% ── Recommendation Worker ─────────────────────────────────────
    recommendation["<b>Recommendation</b><br/><i>services.recommendation.main</i><br/>Eligibility, suppression,<br/>thesis generation"]

    q_rec -->|"lpop"| recommendation

    ollama_thesis["<b>Ollama</b><br/><i>thesis-rewriter agent</i><br/>(optional LLM rewrite)"]
    recommendation <-->|"rewrite thesis<br/>(trading-eligible only)"| ollama_thesis

    pg_recs[("PostgreSQL<br/><i>recommendations,<br/>recommendation_evidence,<br/>risk_evaluations</i>")]
    recommendation -->|"persist recommendation<br/>+ evidence + risk eval"| pg_recs

    %% ── Trading Engine ────────────────────────────────────────────
    subgraph trading_loop ["Trading Engine Decision Loop"]
        direction TB
        poll["Poll recommendations<br/><i>action IN (buy, sell)<br/>mode IN (paper, live)<br/>generated_at > last_poll</i>"]
        dedup_check["Redis dedup check<br/><i>stonks:dedupe:trading:*</i>"]
        evaluate["evaluate_recommendation<br/><i>Circuit breaker check<br/>Trading window check<br/>Confidence gate<br/>Sector exposure check<br/>Correlation check<br/>Earnings blackout</i>"]
        size["Position sizing<br/><i>Kelly criterion,<br/>risk tier limits</i>"]
        decide{{"Decision"}}
        poll --> dedup_check --> evaluate --> size --> decide
    end

    pg_recs -->|"SELECT recent<br/>recommendations"| poll

    %% ── Broker Queue ──────────────────────────────────────────────
    q_broker[["stonks:queue:broker_orders"]]
    pg_decisions[("PostgreSQL<br/><i>trading_decisions</i>")]

    decide -->|"act → rpush<br/>order job"| q_broker
    decide -->|"skip → persist<br/>decision only"| pg_decisions

    %% ── Broker Adapter ────────────────────────────────────────────
    broker["<b>Broker Adapter</b><br/><i>services.adapters.broker_service</i><br/>Risk evaluation, idempotency,<br/>order submission, fill tracking"]

    q_broker -->|"lpop"| broker

    %% ── Risk Engine ───────────────────────────────────────────────
    risk["<b>Risk Engine</b><br/><i>services.risk.app</i><br/>POST /evaluate<br/>Approval workflow"]
    broker <-->|"evaluate order"| risk

    %% ── Alpaca ────────────────────────────────────────────────────
    alpaca["<b>Alpaca</b><br/><i>Paper Trading API</i><br/>Order submission,<br/>position sync"]
    broker <-->|"submit order /<br/>sync positions"| alpaca

    pg_orders[("PostgreSQL<br/><i>orders, order_events,<br/>positions,<br/>portfolio_snapshots</i>")]
    broker -->|"persist order,<br/>events, positions"| pg_orders

    %% ── Notifications ─────────────────────────────────────────────
    subgraph notifications ["Notifications"]
        direction LR
        sns["AWS SNS<br/><i>SMS alerts</i>"]
        gmail["Gmail SMTP<br/><i>Email alerts</i>"]
    end

    trading_loop -->|"circuit breaker trips,<br/>order fills,<br/>stop-loss triggers"| notifications
```

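The decision loop's gate-then-size structure can be sketched as follows. Only two of the listed gates are modelled (circuit breaker and confidence); the others would slot into the same `gates` list. The thresholds, the tier caps, the `Rec` fields and the choice to treat recommendation confidence as the win probability are assumptions for illustration. Sizing uses the standard Kelly fraction `f* = p - (1 - p) / b` for a bet that pays `b` per unit risked, clipped at zero and capped by a per-tier limit.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class Rec:
    """Hypothetical view of a recommendation row."""
    ticker: str
    action: str          # "buy" or "sell"
    confidence: float    # assumed 0..1 and reused as the win probability p
    payoff_ratio: float  # assumed b: average win / average loss
    risk_tier: str       # assumed key into TIER_CAPS


# A gate returns None to pass, or a human-readable reason to skip.
Gate = Callable[[Rec], Optional[str]]

MIN_CONFIDENCE = 0.6                                     # assumption
TIER_CAPS = {"low": 0.02, "medium": 0.05, "high": 0.10}  # assumption: max equity fraction


def circuit_breaker_gate(is_tripped: Callable[[], bool]) -> Gate:
    # The real breaker state lives in Redis; here it is injected as a callable.
    return lambda rec: "circuit breaker open" if is_tripped() else None


def confidence_gate(rec: Rec) -> Optional[str]:
    return None if rec.confidence >= MIN_CONFIDENCE else "confidence below gate"


def kelly_fraction(p: float, b: float) -> float:
    """Full Kelly f* = p - (1 - p) / b, floored at 0 when there is no edge."""
    if b <= 0:
        return 0.0
    return max(0.0, p - (1.0 - p) / b)


def decide(rec: Rec, gates: list, equity: float) -> dict:
    """Run gates in order; size survivors with Kelly capped by the risk tier."""
    for gate in gates:
        reason = gate(rec)
        if reason is not None:
            return {"decision": "skip", "reason": reason}
    fraction = min(kelly_fraction(rec.confidence, rec.payoff_ratio), TIER_CAPS[rec.risk_tier])
    if fraction <= 0:
        return {"decision": "skip", "reason": "no positive edge"}
    return {"decision": "act", "notional": round(equity * fraction, 2)}
```

Capping full Kelly with a tier limit is a common guard, since full Kelly is very sensitive to errors in the estimates of `p` and `b`. In the sketch, a medium-tier buy with confidence 0.65 and payoff ratio 1.5 has a raw Kelly fraction near 0.42 but is capped at 5% of equity.
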
## Analytical Branch — Lake Publisher

The lake publisher runs as a separate worker, consuming from its own queue and writing partitioned Parquet fact tables to MinIO for analytical queries.

```mermaid
flowchart LR
    %% ── Lake Publish Queue ────────────────────────────────────────
    q_lake[["stonks:queue:lake_publish"]]

    various(("Various Services<br/><i>ingestion, extractor,<br/>recommendation,<br/>broker adapter</i>"))
    various -->|"enqueue_lake_job"| q_lake

    %% ── Lake Publisher Worker ─────────────────────────────────────
    lake["<b>Lake Publisher</b><br/><i>services.lake_publisher.jobs</i><br/>Transforms operational data<br/>into analytical facts"]

    q_lake -->|"lpop"| lake

    pg_source[("PostgreSQL<br/><i>Operational Tables</i><br/>documents, extractions,<br/>orders, positions, events")]
    lake -->|"query source data"| pg_source

    %% ── MinIO Parquet ─────────────────────────────────────────────
    minio_lake[("MinIO<br/><i>Lakehouse Bucket</i><br/>Partitioned Parquet<br/>/year=/month=/day=")]
    lake -->|"write Parquet files"| minio_lake

    %% ── Trino ─────────────────────────────────────────────────────
    trino["<b>Trino</b><br/><i>SQL Query Engine</i><br/>Hive connector → MinIO"]
    minio_lake -->|"read via<br/>Hive Metastore"| trino

    hive["<b>Hive Metastore</b><br/><i>Schema catalog</i>"]
    trino <-->|"table metadata"| hive
    hive -->|"location refs"| minio_lake

    %% ── Visualization ─────────────────────────────────────────────
    superset["<b>Superset</b><br/><i>Dashboards &<br/>SQL Lab</i>"]
    dashboard["<b>React Dashboard</b><br/><i>frontend</i><br/>Charts, portfolio,<br/>recommendations"]
    query_api["<b>Query API</b><br/><i>services.api.app</i>"]

    trino --> superset
    trino --> query_api
    query_api --> dashboard
```

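The `/year=/month=/day=` layout in the lakehouse bucket is Hive-style partitioning: each path segment is a `column=value` pair that Trino's Hive connector can expose as a partition column and use to prune files. The sketch below builds and parses such keys; the table name and the zero-padding are assumptions. New partitions generally also have to be registered in the Hive Metastore (for example with Trino's `system.sync_partition_metadata` procedure) before queries see them; how this deployment handles that is not covered here.

```python
from datetime import date


def partition_prefix(table: str, day: date) -> str:
    # Assumption: month and day are zero-padded; the doc only shows /year=/month=/day=.
    return f"{table}/year={day.year:04d}/month={day.month:02d}/day={day.day:02d}/"


def partition_values(object_key: str) -> dict[str, str]:
    """Recover partition columns from an object key, as Hive-style discovery does."""
    return dict(seg.split("=", 1) for seg in object_key.split("/") if "=" in seg)
```

Zero-padded values keep lexicographic order equal to date order, which helps whenever partitions are listed or compared as strings.
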
## Complete Queue Topology

| Queue | Full Key | Producer(s) | Consumer |
|-------|----------|-------------|----------|
| Ingestion | `stonks:queue:ingestion` | Scheduler | Ingestion Worker |
| Parsing | `stonks:queue:parsing` | Ingestion Worker | Parser Worker |
| Extraction | `stonks:queue:extraction` | Parser (standard docs) | Extractor Worker |
| Macro Classification | `stonks:queue:macro_classification` | Parser (macro_event docs), Scheduler | Extractor Worker |
| Aggregation | `stonks:queue:aggregation` | Extractor Worker | Aggregation Worker |
| Recommendation | `stonks:queue:recommendation` | Aggregation Worker | Recommendation Worker |
| Broker Orders | `stonks:queue:broker_orders` | Trading Engine, Trading API (manual overrides) | Broker Adapter |
| Lake Publish | `stonks:queue:lake_publish` | Ingestion, Extractor, Recommendation, Broker Adapter | Lake Publisher |

Dead-letter queues follow the pattern `stonks:dlq:<queue_name>` and are populated when a job exhausts its retry budget.

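A minimal sketch of how a retry budget can feed a dead-letter queue. A `defaultdict(deque)` stands in for Redis lists (`popleft` for LPOP, `append` for RPUSH), and `MAX_ATTEMPTS`, the `attempts` and `last_error` fields and re-queuing at the tail are assumptions, not the documented worker behaviour.

```python
import json
from collections import defaultdict, deque
from typing import Callable

MAX_ATTEMPTS = 3  # assumption: the real retry budget is configured elsewhere


def make_lists() -> defaultdict:
    """In-memory stand-in for Redis lists, keyed by full queue key."""
    return defaultdict(deque)


def process_with_retry(lists, queue: str, handler: Callable[[dict], None]) -> str:
    """Pop one job from stonks:queue:<queue>; retry or dead-letter on failure."""
    src = f"stonks:queue:{queue}"
    if not lists[src]:
        return "idle"
    job = json.loads(lists[src].popleft())
    try:
        handler(job["payload"])
        return "ok"
    except Exception as exc:  # any handler failure consumes one attempt
        job["attempts"] = job.get("attempts", 0) + 1
        job["last_error"] = repr(exc)
        if job["attempts"] >= MAX_ATTEMPTS:
            lists[f"stonks:dlq:{queue}"].append(json.dumps(job))
            return "dead_lettered"
        lists[src].append(json.dumps(job))  # back of the line, like RPUSH
        return "retried"
```

Recording `last_error` on the dead-lettered job keeps the failure reason next to the payload, which makes triage and replay from `stonks:dlq:<queue_name>` easier.
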
## Data Store Summary

| Store | Role | Key Tables / Buckets |
|-------|------|---------------------|
| **PostgreSQL** | Structured operational data | `documents`, `document_intelligence`, `document_impact_records`, `global_events`, `macro_impact_records`, `competitive_signal_records`, `trend_windows`, `trend_history`, `trend_projections`, `recommendations`, `recommendation_evidence`, `risk_evaluations`, `orders`, `order_events`, `positions`, `portfolio_snapshots`, `trading_decisions` |
| **Redis** | Queues, dedup markers, rate limits, circuit breaker state | `stonks:queue:*`, `stonks:dedupe:*`, `stonks:ratelimit:*`, `stonks:trading:circuit_breaker:*`, `stonks:dlq:*` |
| **MinIO** | Object storage for raw artifacts, normalized text, and analytical Parquet files | Raw artifacts bucket, normalized text bucket, lakehouse bucket (partitioned Parquet) |

## External Integration Points

| Integration | Service | Protocol | Purpose |
|-------------|---------|----------|---------|
| **Polygon.io** | Ingestion (via adapters) | HTTPS REST | News articles, market bars, grouped daily data |
| **SEC EDGAR** | Ingestion (via FilingsDataAdapter) | HTTPS REST | 10-K, 10-Q filings |
| **Ollama** | Extractor, Recommendation | HTTP `/api/generate` | LLM inference for document extraction, event classification, thesis rewriting |
| **Alpaca** | Broker Adapter | HTTPS REST | Paper trading order submission, position sync, account state |
| **AWS SNS** | Trading Engine (notifications) | boto3 SDK | SMS alerts for circuit breaker trips, order fills, stop-loss triggers |
| **Gmail** | Trading Engine (notifications) | SMTP (port 587 STARTTLS) | Email alerts for trading events |
| **Trino** | Query API, Superset | JDBC / HTTP | SQL queries over lakehouse Parquet files |

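For reference, a minimal non-streaming call to Ollama's `/api/generate` endpoint using only the standard library. The request body (`model`, `prompt`, `stream: false`) and the `response` field of the reply follow Ollama's documented API, and `11434` is Ollama's default port; the base URL, the model name and whether the extractor's agents map directly to Ollama model names are assumptions.

```python
import json
import urllib.request

# Assumption: Ollama on its default port; the deployment may use another host.
OLLAMA_BASE_URL = "http://localhost:11434"


def build_generate_request(model: str, prompt: str,
                           base_url: str = OLLAMA_BASE_URL) -> urllib.request.Request:
    """Build a non-streaming POST /api/generate request."""
    body = json.dumps({"model": model, "prompt": prompt, "stream": False})
    return urllib.request.Request(
        f"{base_url}/api/generate",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def generate(model: str, prompt: str, timeout: float = 120.0) -> str:
    """Send the request and return the generated text from the 'response' field."""
    request = build_generate_request(model, prompt)
    with urllib.request.urlopen(request, timeout=timeout) as reply:
        return json.loads(reply.read().decode("utf-8"))["response"]
```

With `stream` set to false, Ollama returns one JSON object instead of newline-delimited chunks, which keeps the client simple at the cost of waiting for the full generation before anything is returned.
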