Celes Renata f468e30af0
2026-05-02 07:32:26 +00:00

# Data Pipeline Architecture — Stonks Oracle
This document describes the end-to-end data pipeline from external data sources through signal processing to trade execution. The pipeline is queue-driven, with Redis lists connecting each stage and PostgreSQL/MinIO providing durable storage at every step.
All queue names follow the convention `stonks:queue:<name>` (see `services/shared/redis_keys.py`). Dead-letter queues mirror the pattern as `stonks:dlq:<name>`.
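
The naming convention can be illustrated with a minimal sketch. The helper names below (`queue_key`, `dlq_key`, `dlq_for`) are hypothetical and are not taken from `services/shared/redis_keys.py`; only the two key prefixes come from this document.

```python
# Hypothetical helpers illustrating the key convention described above.
# The real constants live in services/shared/redis_keys.py and may differ.
QUEUE_PREFIX = "stonks:queue"
DLQ_PREFIX = "stonks:dlq"


def queue_key(name: str) -> str:
    """Build the Redis list key for a work queue."""
    return f"{QUEUE_PREFIX}:{name}"


def dlq_key(name: str) -> str:
    """Build the Redis list key for the matching dead-letter queue."""
    return f"{DLQ_PREFIX}:{name}"


def dlq_for(queue: str) -> str:
    """Map a full queue key to its dead-letter counterpart."""
    prefix = QUEUE_PREFIX + ":"
    if not queue.startswith(prefix):
        raise ValueError(f"not a queue key: {queue!r}")
    return dlq_key(queue[len(prefix):])
```

For example, `dlq_for(queue_key("parsing"))` yields `stonks:dlq:parsing`.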
## Pipeline Overview
```mermaid
flowchart TB
%% ── External Data Sources ─────────────────────────────────────
subgraph sources ["External Data Sources"]
direction LR
polygon["Polygon.io<br/><i>News, Market Bars,<br/>Grouped Daily</i>"]
sec["SEC EDGAR<br/><i>10-K, 10-Q Filings</i>"]
macro_src["Macro News APIs<br/><i>Geopolitical &amp;<br/>Economic Events</i>"]
market_src["Market Data API<br/><i>Intraday Bars,<br/>Grouped Daily</i>"]
end
%% ── Scheduler ─────────────────────────────────────────────────
scheduler["<b>Scheduler</b><br/><i>services.scheduler.app</i><br/>Cadence polling, rate limiting,<br/>backoff, stale recovery,<br/>periodic aggregation,<br/>report scheduling"]
sources -.->|"API polling<br/>on cadence"| scheduler
%% ── Ingestion Queue ───────────────────────────────────────────
q_ingestion[["stonks:queue:ingestion"]]
scheduler -->|"rpush job<br/>(company, macro,<br/>global market)"| q_ingestion
%% ── Ingestion Worker ──────────────────────────────────────────
ingestion["<b>Ingestion</b><br/><i>services.ingestion.worker</i><br/>Adapter dispatch, dedupe,<br/>raw artifact upload"]
q_ingestion -->|"lpop"| ingestion
%% ── Raw Storage ───────────────────────────────────────────────
minio_raw[("MinIO<br/><i>Raw Artifacts</i><br/>JSON / HTML")]
pg_docs[("PostgreSQL<br/><i>documents,<br/>ingestion_runs</i>")]
redis_dedupe[("Redis<br/><i>Dedupe Markers</i><br/>stonks:dedupe:*")]
ingestion -->|"upload raw payload"| minio_raw
ingestion -->|"persist metadata"| pg_docs
ingestion -->|"set content hash"| redis_dedupe
%% ── Parsing Queue ─────────────────────────────────────────────
q_parsing[["stonks:queue:parsing"]]
ingestion -->|"rpush<br/>(news, filings,<br/>web_scrape, macro)"| q_parsing
%% ── Parser Worker ─────────────────────────────────────────────
parser["<b>Parser</b><br/><i>services.parser.worker</i><br/>HTML parsing, quality scoring,<br/>company mention detection"]
q_parsing -->|"lpop"| parser
minio_norm[("MinIO<br/><i>Normalized Text</i><br/><i>Parser Output JSON</i>")]
parser -->|"upload normalized text<br/>+ structured output"| minio_norm
parser -->|"update document status,<br/>insert mentions"| pg_docs
```
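
The ingestion step in the diagram (dedupe on a content hash, then `rpush` to the parsing queue, which the parser drains with `lpop`) could be sketched as follows. This is an illustration under assumptions, not the worker's actual code: `FakeRedis` is an in-memory stand-in, SHA-256 is an assumed hash choice, and the exact layout of the dedupe key and job payload is hypothetical.

```python
import hashlib
from collections import deque


class FakeRedis:
    """In-memory stand-in for the few Redis operations used in this sketch."""

    def __init__(self):
        self.lists = {}
        self.markers = set()

    def rpush(self, key, value):
        # Append to the tail of the list, as Redis RPUSH does.
        self.lists.setdefault(key, deque()).append(value)

    def lpop(self, key):
        # Pop from the head of the list, as Redis LPOP does (None when empty).
        items = self.lists.get(key)
        return items.popleft() if items else None

    def set_if_absent(self, key) -> bool:
        # Mimics SET with NX: returns True only for the first writer.
        if key in self.markers:
            return False
        self.markers.add(key)
        return True


def ingest(redis, payload: bytes, document_type: str) -> bool:
    """Enqueue a document for parsing unless its content was already seen."""
    digest = hashlib.sha256(payload).hexdigest()  # assumed hash algorithm
    if not redis.set_if_absent(f"stonks:dedupe:{digest}"):  # assumed key layout
        return False
    redis.rpush(
        "stonks:queue:parsing",
        {"content_hash": digest, "document_type": document_type},
    )
    return True
```

Because producers push to the tail and consumers pop from the head, each queue behaves as a FIFO: documents are parsed in the order they were ingested.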
## Three Signal Layers
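The parser routes documents into two extraction paths based on `document_type`: standard documents go to the extraction queue and `macro_event` documents go to the macro classification queue. These paths produce the company and macro signal layers. The third layer, competitive signals, is produced when the aggregation worker triggers signal propagation (only when `competitive_enabled` is set). All three signal layers converge at the aggregation stage through the shared `WeightedSignal` abstraction.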
```mermaid
flowchart TB
%% ── Parser Output ─────────────────────────────────────────────
parser(("Parser"))
%% ── Extraction Queues ─────────────────────────────────────────
q_extraction[["stonks:queue:extraction"]]
q_macro[["stonks:queue:macro_classification"]]
parser -->|"rpush<br/>(standard docs)"| q_extraction
parser -->|"rpush<br/>(macro_event docs)"| q_macro
%% ── Scheduler Recovery ────────────────────────────────────────
scheduler_recovery(("Scheduler<br/><i>stale recovery &amp;<br/>failed retry</i>"))
scheduler_recovery -.->|"re-enqueue orphaned<br/>parsed docs"| q_extraction
scheduler_recovery -.->|"re-enqueue orphaned<br/>macro docs"| q_macro
%% ── Extractor Worker ──────────────────────────────────────────
subgraph extractor_svc ["Extractor Service"]
direction TB
ext_main["<b>Extractor</b><br/><i>services.extractor.main</i><br/>Alternates between queues<br/>(2 extraction : 1 macro)<br/>Token budget enforcement"]
end
q_extraction -->|"lpop"| ext_main
q_macro -->|"lpop"| ext_main
%% ── Ollama LLM ───────────────────────────────────────────────
ollama["<b>Ollama / vLLM</b><br/><i>LLM Inference</i><br/>document-extractor agent<br/>event-classifier agent"]
ext_main <-->|"HTTP /api/generate<br/>(AgentConfigResolver<br/>selects model + variant)"| ollama
%% ── Signal Layer 1: Company ───────────────────────────────────
subgraph layer1 ["Layer 1 — Company Signals"]
direction LR
di["document_intelligence<br/>document_impact_records"]
end
ext_main -->|"persist extraction<br/>(standard docs)"| di
%% ── Signal Layer 2: Macro ─────────────────────────────────────
subgraph layer2 ["Layer 2 — Macro Signals"]
direction LR
ge["global_events"]
mir["macro_impact_records<br/><i>per-company interpolation<br/>via exposure profiles</i>"]
ge --> mir
end
ext_main -->|"classify &amp; persist<br/>(macro_event docs)"| ge
ext_main -->|"compute_macro_impact<br/>for all tracked companies"| mir
%% ── Aggregation Queue ─────────────────────────────────────────
q_agg[["stonks:queue:aggregation"]]
ext_main -->|"rpush<br/>(per ticker)"| q_agg
%% ── Scheduler Periodic Aggregation ────────────────────────────
scheduler_agg(("Scheduler<br/><i>periodic aggregation<br/>every ~15 min</i>"))
scheduler_agg -.->|"rpush all<br/>active tickers"| q_agg
%% ── Aggregation Worker ────────────────────────────────────────
aggregation["<b>Aggregation</b><br/><i>services.aggregation.main</i><br/>Trend windows, scoring,<br/>contradiction detection"]
q_agg -->|"lpop"| aggregation
%% ── Signal Layer 3: Competitive ──────────────────────────────
subgraph layer3 ["Layer 3 — Competitive Signals"]
direction LR
pm["pattern_matcher<br/><i>historical patterns</i>"]
sp["signal_propagation<br/><i>cross-company signals</i>"]
csr["competitive_signal_records"]
pm --> sp --> csr
end
aggregation -->|"trigger_signal_propagation<br/>(when competitive_enabled)"| layer3
%% ── All layers merge ──────────────────────────────────────────
pg_trends[("PostgreSQL<br/><i>trend_windows,<br/>trend_history,<br/>trend_projections</i>")]
di -->|"WeightedSignal"| aggregation
mir -->|"WeightedSignal"| aggregation
csr -->|"WeightedSignal"| aggregation
aggregation -->|"persist trend summaries"| pg_trends
```
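
The extractor's 2:1 alternation between the extraction and macro classification queues can be sketched as a fixed slot schedule. This is a minimal sketch under assumptions: the diagram only states the ratio, so the fall-through behaviour when one queue is empty and the names `SCHEDULE` and `drain` are hypothetical.

```python
from collections import deque
from itertools import cycle

# Two extraction slots for every macro classification slot (ratio from the diagram).
SCHEDULE = ("extraction", "extraction", "macro_classification")


def drain(queues: dict, max_jobs: int) -> list:
    """Pop jobs following the 2:1 slot schedule until the queues run dry.

    An empty slot is skipped rather than blocking, which is an assumption
    about how the real worker behaves.
    """
    processed = []
    slots = cycle(SCHEDULE)
    idle = 0  # consecutive empty slots; a full idle cycle means nothing is left
    while len(processed) < max_jobs and idle < len(SCHEDULE):
        name = next(slots)
        if queues[name]:
            processed.append(queues[name].popleft())
            idle = 0
        else:
            idle += 1
    return processed
```

With three extraction jobs and two macro jobs queued, the sketch processes them in the order extraction, extraction, macro, extraction, macro.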
## Recommendation → Trading → Broker
The recommendation worker consumes from the recommendation queue. The trading engine does **not** consume from a queue — it polls the `recommendations` table in PostgreSQL on a configurable interval, evaluates each recommendation through its decision pipeline, and pushes "act" decisions to the broker queue.
```mermaid
flowchart TB
%% ── Recommendation Queue ──────────────────────────────────────
q_rec[["stonks:queue:recommendation"]]
aggregation(("Aggregation")) -->|"rpush<br/>(ticker + window,<br/>dedup 5 min TTL)"| q_rec
%% ── Recommendation Worker ─────────────────────────────────────
recommendation["<b>Recommendation</b><br/><i>services.recommendation.main</i><br/>Eligibility, suppression,<br/>thesis generation"]
q_rec -->|"lpop"| recommendation
ollama_thesis["<b>Ollama / vLLM</b><br/><i>thesis-rewriter agent</i><br/>(AgentConfigResolver<br/>selects model + variant)"]
recommendation <-->|"rewrite thesis<br/>(trading-eligible only)"| ollama_thesis
pg_recs[("PostgreSQL<br/><i>recommendations,<br/>recommendation_evidence,<br/>risk_evaluations</i>")]
recommendation -->|"persist recommendation<br/>+ evidence + risk eval"| pg_recs
%% ── Lake Publication (inline) ─────────────────────────────────
minio_rec_lake[("MinIO<br/><i>Lakehouse</i><br/>recommendation facts")]
recommendation -->|"publish_recommendation_facts<br/>(Parquet)"| minio_rec_lake
%% ── Trading Engine ────────────────────────────────────────────
subgraph trading_loop ["Trading Engine Decision Loop"]
direction TB
poll["Poll recommendations<br/><i>action IN (buy, sell)<br/>mode IN (paper, live)<br/>generated_at &gt; last_poll</i>"]
dedup_check["Redis dedup check<br/><i>stonks:dedupe:trading:*</i>"]
evaluate["evaluate_recommendation<br/><i>Circuit breaker check<br/>Trading window check<br/>Confidence gate<br/>Sector exposure check<br/>Correlation check<br/>Earnings blackout<br/>Max positions check</i>"]
size["Position sizing<br/><i>Kelly criterion,<br/>risk tier limits,<br/>micro-trade support</i>"]
decide{{"Decision"}}
poll --> dedup_check --> evaluate --> size --> decide
end
pg_recs -->|"SELECT recent<br/>recommendations"| poll
%% ── Broker Queue ──────────────────────────────────────────────
q_broker[["stonks:queue:broker_orders"]]
decide -->|"act → rpush<br/>order job"| q_broker
decide -->|"skip → persist<br/>decision only"| pg_decisions
pg_decisions[("PostgreSQL<br/><i>trading_decisions</i>")]
%% ── Manual Override ───────────────────────────────────────────
trading_api(("Trading API<br/><i>POST /override/order</i>"))
trading_api -->|"rpush<br/>manual order"| q_broker
%% ── Broker Adapter ────────────────────────────────────────────
broker["<b>Broker Adapter</b><br/><i>services.adapters.broker_service</i><br/>Idempotency, risk evaluation,<br/>approval gate, order submission,<br/>fill tracking, position sync"]
q_broker -->|"lpop"| broker
%% ── Risk Engine ───────────────────────────────────────────────
risk["<b>Risk Engine</b><br/><i>services.risk.app</i><br/>evaluate_order()<br/>Position limits, sector exposure,<br/>daily loss caps, approval workflow"]
broker -->|"evaluate order<br/>(inline call)"| risk
%% ── Alpaca ────────────────────────────────────────────────────
alpaca["<b>Alpaca</b><br/><i>Paper Trading API</i><br/>Order submission,<br/>position sync,<br/>account state"]
broker <-->|"submit order /<br/>sync positions /<br/>sync order status"| alpaca
pg_orders[("PostgreSQL<br/><i>orders, order_events,<br/>positions,<br/>portfolio_snapshots,<br/>broker_accounts</i>")]
broker -->|"persist order,<br/>events, positions"| pg_orders
%% ── Lake Publication (broker inline) ──────────────────────────
minio_broker_lake[("MinIO<br/><i>Lakehouse</i><br/>order + fill + position facts")]
broker -->|"publish_trade_order<br/>publish_trade_fill<br/>publish_positions_daily_batch<br/>(Parquet)"| minio_broker_lake
%% ── Notifications ─────────────────────────────────────────────
subgraph notifications ["Notifications"]
direction LR
sns["AWS SNS<br/><i>SMS alerts</i>"]
gmail["Gmail SMTP<br/><i>Email alerts</i>"]
end
trading_loop -->|"circuit breaker trips,<br/>order fills,<br/>stop-loss triggers"| notifications
```
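
The decision loop above (dedup check, gating checks, position sizing, then act or skip) could look roughly like the sketch below. It covers only a subset of the gates, and every threshold, field name, and the dedup key layout is an assumption made for illustration. Using the recommendation's confidence as the Kelly win probability is likewise an assumption, not something this document specifies.

```python
from dataclasses import dataclass


@dataclass
class Recommendation:
    ticker: str
    action: str        # "buy" or "sell"
    confidence: float  # assumed to lie in [0, 1]


def kelly_fraction(win_prob: float, win_loss_ratio: float) -> float:
    """Kelly criterion f* = p - (1 - p) / b, floored at zero."""
    if win_loss_ratio <= 0:
        return 0.0
    return max(0.0, win_prob - (1.0 - win_prob) / win_loss_ratio)


def decide(rec, seen, *, breaker_tripped, open_positions,
           min_confidence=0.6, max_positions=10,
           equity=10_000.0, tier_cap=0.05, win_loss_ratio=2.0):
    """Return an act/skip decision for one recommendation (hypothetical gates)."""
    key = f"stonks:dedupe:trading:{rec.ticker}:{rec.action}"  # assumed key layout
    if key in seen:
        return {"decision": "skip", "reason": "duplicate"}
    seen.add(key)
    if breaker_tripped:
        return {"decision": "skip", "reason": "circuit_breaker"}
    if rec.confidence < min_confidence:
        return {"decision": "skip", "reason": "low_confidence"}
    if open_positions >= max_positions:
        return {"decision": "skip", "reason": "max_positions"}
    # Size with Kelly, then cap by the risk tier limit.
    fraction = min(kelly_fraction(rec.confidence, win_loss_ratio), tier_cap)
    if fraction <= 0:
        return {"decision": "skip", "reason": "zero_size"}
    return {"decision": "act", "notional": round(equity * fraction, 2)}
```

In this sketch only an `act` result would be pushed to `stonks:queue:broker_orders`; a `skip` result would be persisted to `trading_decisions` and go no further.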
## Analytical Branch — Lake Publisher
The lake publisher runs as a separate worker, consuming from its own queue and writing partitioned Parquet fact tables to MinIO for analytical queries. Some services (broker adapter, recommendation worker) also publish facts directly to MinIO inline, bypassing the queue.
```mermaid
flowchart LR
%% ── Lake Publish Queue ────────────────────────────────────────
q_lake[["stonks:queue:lake_publish"]]
various(("Upstream Services<br/><i>via enqueue_lake_job()</i>"))
various -->|"rpush job<br/>(job_type + entity_id)"| q_lake
%% ── Lake Publisher Worker ─────────────────────────────────────
lake["<b>Lake Publisher</b><br/><i>services.lake_publisher.jobs</i><br/>Transforms operational data<br/>into analytical facts<br/><i>15 job types supported</i>"]
q_lake -->|"lpop"| lake
pg_source[("PostgreSQL<br/><i>Operational Tables</i><br/>documents, extractions,<br/>orders, positions, events,<br/>global_events, macro_impacts,<br/>competitive_signals")]
lake -->|"query source data"| pg_source
%% ── MinIO Parquet ─────────────────────────────────────────────
minio_lake[("MinIO<br/><i>Lakehouse Bucket</i><br/>Partitioned Parquet<br/>/year=/month=/day=")]
lake -->|"write Parquet files"| minio_lake
%% ── Inline Publishers ─────────────────────────────────────────
inline(("Inline Publishers<br/><i>broker adapter,<br/>recommendation worker</i>"))
inline -->|"publish_* functions<br/>(direct Parquet write)"| minio_lake
%% ── Trino ─────────────────────────────────────────────────────
trino["<b>Trino</b><br/><i>SQL Query Engine</i><br/>Hive connector → MinIO"]
minio_lake -->|"read via<br/>Hive Metastore"| trino
hive["<b>Hive Metastore</b><br/><i>Schema catalog</i>"]
trino <-->|"table metadata"| hive
hive -->|"location refs"| minio_lake
%% ── Visualization ─────────────────────────────────────────────
superset["<b>Superset</b><br/><i>Dashboards &amp;<br/>SQL Lab</i>"]
dashboard["<b>React Dashboard</b><br/><i>frontend</i><br/>Charts, portfolio,<br/>recommendations"]
query_api["<b>Query API</b><br/><i>services.api.app</i>"]
trino --> superset
trino --> query_api
query_api --> dashboard
```
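
As a rough illustration of the two conventions shown in the diagram, the sketch below builds a lake job payload (`job_type` + `entity_id`) and a date-partitioned object key in the `/year=/month=/day=` layout. The function names, the JSON encoding, the zero-padding, and the default file name are assumptions; the actual `enqueue_lake_job()` signature is not shown in this document.

```python
import json
from datetime import date


def build_lake_job(job_type: str, entity_id: str) -> str:
    """Serialize a lake publish job (assumed JSON payload shape)."""
    return json.dumps({"job_type": job_type, "entity_id": entity_id})


def partition_key(table: str, day: date, filename: str = "part-0000.parquet") -> str:
    """Build a Hive-style partitioned object key for one fact table file."""
    return (
        f"{table}/year={day.year:04d}/month={day.month:02d}"
        f"/day={day.day:02d}/{filename}"
    )
```

Hive-style `key=value` path segments let Trino prune partitions when a query filters on year, month, or day.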
## Report Generation
The scheduler manages report generation as a sub-loop: it enqueues daily and weekly report jobs to a dedicated queue and consumes them itself, popping up to 5 jobs per cycle.
```mermaid
flowchart LR
scheduler["<b>Scheduler</b><br/><i>report schedule check</i><br/>daily @ 16:30 ET<br/>weekly @ Saturday"]
q_report[["stonks:queue:report_generation"]]
scheduler -->|"rpush<br/>(daily/weekly)"| q_report
scheduler_consumer["<b>Scheduler</b><br/><i>report consumer loop</i><br/>pops up to 5 jobs/cycle"]
q_report -->|"lpop"| scheduler_consumer
generator["<b>Report Generator</b><br/><i>services.reporting.generator</i>"]
scheduler_consumer -->|"process_report_job()"| generator
pg_reports[("PostgreSQL<br/><i>trading_reports</i>")]
generator -->|"persist report"| pg_reports
```
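
The bounded consumer loop in the diagram (at most 5 jobs per scheduler cycle) can be sketched as below. A `deque` stands in for the Redis list, and `consume_reports` and its `handler` argument are hypothetical names; in the real service the handler role is played by `process_report_job()`.

```python
from collections import deque

MAX_JOBS_PER_CYCLE = 5  # limit taken from the diagram above


def consume_reports(queue: deque, handler) -> int:
    """Process up to MAX_JOBS_PER_CYCLE queued report jobs; return the count."""
    handled = 0
    while handled < MAX_JOBS_PER_CYCLE and queue:
        handler(queue.popleft())
        handled += 1
    return handled
```

Capping the batch keeps a backlog of report jobs from starving the scheduler's other duties within a single cycle.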
## Complete Queue Topology
| Queue | Full Key | Producer(s) | Consumer |
|-------|----------|-------------|----------|
| Ingestion | `stonks:queue:ingestion` | Scheduler (company, macro, global market sources) | Ingestion Worker |
| Parsing | `stonks:queue:parsing` | Ingestion Worker (news, filings, web_scrape, macro) | Parser Worker |
| Extraction | `stonks:queue:extraction` | Parser (standard docs), Scheduler (stale recovery) | Extractor Worker |
| Macro Classification | `stonks:queue:macro_classification` | Parser (macro_event docs), Scheduler (stale/failed recovery) | Extractor Worker |
| Aggregation | `stonks:queue:aggregation` | Extractor Worker (per ticker), Scheduler (periodic, all tickers) | Aggregation Worker |
| Recommendation | `stonks:queue:recommendation` | Aggregation Worker (ticker + window, 5 min dedup TTL) | Recommendation Worker |
| Broker Orders | `stonks:queue:broker_orders` | Trading Engine (act decisions), Trading API (manual overrides) | Broker Adapter |
| Lake Publish | `stonks:queue:lake_publish` | Various services (via `enqueue_lake_job()`) | Lake Publisher |
| Report Generation | `stonks:queue:report_generation` | Scheduler (daily/weekly triggers) | Scheduler (inline consumer) |

Dead-letter queues follow the pattern `stonks:dlq:<queue_name>` and are populated when a job exhausts its retry budget.
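
A minimal sketch of the retry-then-dead-letter behaviour follows. The budget of 3 attempts and the choice to track attempts inside the job payload are assumptions (the real workers may track retries in `stonks:retry:*` keys instead), and plain `deque` objects stand in for the Redis lists.

```python
from collections import deque

MAX_ATTEMPTS = 3  # assumed retry budget


def process_with_retry(job: dict, handler, queue: deque, dlq: deque) -> str:
    """Run one job; re-enqueue on failure until the retry budget is spent."""
    try:
        handler(job)
        return "done"
    except Exception as exc:
        attempts = job.get("attempts", 0) + 1
        failed = {**job, "attempts": attempts, "last_error": str(exc)}
        if attempts >= MAX_ATTEMPTS:
            dlq.append(failed)    # budget exhausted: park in the dead-letter queue
            return "dead-lettered"
        queue.append(failed)      # otherwise return it to the tail of the work queue
        return "retried"
```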
## Data Store Summary
| Store | Role | Key Tables / Buckets |
|-------|------|---------------------|
| **PostgreSQL** | Structured operational data | `documents`, `document_intelligence`, `document_impact_records`, `document_company_mentions`, `global_events`, `macro_impact_records`, `exposure_profiles`, `competitive_signal_records`, `competitor_relationships`, `trend_windows`, `trend_history`, `trend_projections`, `recommendations`, `recommendation_evidence`, `risk_evaluations`, `orders`, `order_events`, `positions`, `portfolio_snapshots`, `trading_decisions`, `circuit_breaker_events`, `reserve_pool_ledger`, `risk_tier_history`, `broker_accounts`, `ingestion_runs`, `sources`, `companies`, `company_aliases`, `ai_agents`, `agent_variants`, `agent_performance_log`, `risk_configs`, `trading_reports` |
| **Redis** | Queues, dedup markers, rate limits, circuit breaker state, pipeline toggle | `stonks:queue:*` (9 queues), `stonks:dedupe:*`, `stonks:dedupe:trading:*`, `stonks:ratelimit:*`, `stonks:trading:circuit_breaker:*`, `stonks:trading:notification_rate:*`, `stonks:order_idempotency:*`, `stonks:lock:*`, `stonks:cache:*`, `stonks:retry:*`, `stonks:rec_dedup:*`, `stonks:pipeline:enabled`, `stonks:dlq:*` |
| **MinIO** | Object storage for raw artifacts, normalized text, and analytical Parquet files | Raw artifacts bucket, normalized text bucket, parser output bucket, lakehouse bucket (partitioned Parquet: documents, extractions, market bars/quotes, orders, fills, positions, PnL, global events, macro impacts, trend projections, competitive signals, competitor relationships, recommendations) |
## External Integration Points
| Integration | Service | Protocol | Purpose |
|-------------|---------|----------|---------|
| **Polygon.io** | Ingestion (via PolygonNewsAdapter, PolygonMarketAdapter) | HTTPS REST | News articles, market bars, grouped daily data, intraday bars |
| **SEC EDGAR** | Ingestion (via SECEdgarAdapter) | HTTPS REST | 10-K, 10-Q filings |
| **Macro News** | Ingestion (via MacroNewsAdapter) | HTTPS REST | Geopolitical and economic event articles |
| **Ollama / vLLM** | Extractor, Recommendation | HTTP `/api/generate` | LLM inference for document extraction (document-extractor agent), event classification (event-classifier agent), thesis rewriting (thesis-rewriter agent). Model and variant selected via `AgentConfigResolver` with 60s TTL cache. |
| **Alpaca** | Broker Adapter | HTTPS REST | Paper/live trading: order submission, position sync, account state, order status polling |
| **AWS SNS** | Trading Engine (notifications) | boto3 SDK | SMS alerts for circuit breaker trips, order fills, stop-loss triggers |
| **Gmail** | Trading Engine (notifications) | SMTP (port 587 STARTTLS) | Email alerts for trading events |
| **Trino** | Query API, Superset | HTTP | SQL queries over lakehouse Parquet files via Hive Metastore |
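
The 60s TTL cache mentioned in the Ollama / vLLM row can be illustrated with a small time-based cache. This is a generic sketch, not the `AgentConfigResolver` implementation: the class name, the `get_or_load` method, and the injected clock are all hypothetical.

```python
import time


class TTLCache:
    """Cache values for a fixed time-to-live, reloading them after expiry."""

    def __init__(self, ttl_seconds: float = 60.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock      # injectable so expiry can be tested without sleeping
        self._entries = {}       # key -> (expires_at, value)

    def get_or_load(self, key, loader):
        """Return the cached value, calling loader(key) on a miss or after expiry."""
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now < hit[0]:
            return hit[1]
        value = loader(key)
        self._entries[key] = (now + self._ttl, value)
        return value
```

With a cache like this, a change to an agent's model or variant would take up to one TTL period to reach the workers.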
## Pipeline Toggle
The pipeline can be paused globally via the Redis key `stonks:pipeline:enabled`. When set to `"0"`, all queue workers (ingestion, parser, extractor, aggregation, recommendation, broker adapter, lake publisher) enter a sleep loop and stop processing jobs. The scheduler also skips scheduling cycles when the toggle is off. The toggle can be set via the Query API's pipeline control endpoints.
Setting `PIPELINE_DEFAULT_OFF=true` on the scheduler initializes the toggle to OFF on first boot. This is useful for staged deployments, where the infrastructure can be verified before the pipeline is enabled.
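
The toggle behaviour can be sketched as follows, with a plain `dict` standing in for Redis. Several details are assumptions: that "first boot" means the key is absent, that `"1"` is the value for ON (this document only states that `"0"` pauses the pipeline), and that a missing key is treated as enabled. The function names are hypothetical.

```python
import os

TOGGLE_KEY = "stonks:pipeline:enabled"


def init_toggle(store: dict, env=os.environ) -> None:
    """Seed the toggle on first boot only; never overwrite an existing value."""
    if TOGGLE_KEY in store:
        return
    default_off = env.get("PIPELINE_DEFAULT_OFF", "").lower() == "true"
    store[TOGGLE_KEY] = "0" if default_off else "1"  # "1" for ON is an assumption


def pipeline_enabled(store: dict) -> bool:
    """Only the value "0" pauses the pipeline; a missing key counts as enabled."""
    return store.get(TOGGLE_KEY, "1") != "0"


def worker_tick(store: dict, queue: list, handler) -> bool:
    """Run one worker iteration; return False when paused so the caller can sleep."""
    if not pipeline_enabled(store):
        return False
    if queue:
        handler(queue.pop(0))
    return True
```

Because `init_toggle` never overwrites an existing key, restarting the scheduler with `PIPELINE_DEFAULT_OFF=true` would not re-pause a pipeline that an operator has already enabled.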