stonks-oracle/docs/architecture-data-pipeline.md
Celes Renata f468e30af0
2026-05-02 07:32:26 +00:00

Data Pipeline Architecture — Stonks Oracle

This document describes the end-to-end data pipeline from external data sources through signal processing to trade execution. The pipeline is queue-driven: Redis lists connect the stages (the main exception is the trading engine, which polls PostgreSQL instead of a queue), and PostgreSQL and MinIO provide durable storage at each step.

All queue names follow the convention stonks:queue:<name> (see services/shared/redis_keys.py). Dead-letter queues mirror the pattern as stonks:dlq:<name>.
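The convention is simple enough to express as a pair of string helpers. The sketch below is illustrative only: queue_key, dlq_key, and dlq_for are hypothetical names, and services/shared/redis_keys.py may define the keys as plain constants instead.

```python
# Hypothetical helpers for the stonks:queue:<name> / stonks:dlq:<name>
# convention; the real definitions in services/shared/redis_keys.py may differ.
QUEUE_PREFIX = "stonks:queue:"
DLQ_PREFIX = "stonks:dlq:"


def queue_key(name: str) -> str:
    """Redis list key for a work queue, e.g. queue_key("parsing")."""
    return f"{QUEUE_PREFIX}{name}"


def dlq_key(name: str) -> str:
    """Dead-letter list key that mirrors the work queue of the same name."""
    return f"{DLQ_PREFIX}{name}"


def dlq_for(full_queue_key: str) -> str:
    """Map a full queue key such as stonks:queue:parsing to its dead-letter key."""
    if not full_queue_key.startswith(QUEUE_PREFIX):
        raise ValueError(f"not a stonks queue key: {full_queue_key!r}")
    return dlq_key(full_queue_key[len(QUEUE_PREFIX):])


print(queue_key("macro_classification"))  # stonks:queue:macro_classification
print(dlq_for("stonks:queue:parsing"))    # stonks:dlq:parsing
```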

Pipeline Overview

flowchart TB
    %% ── External Data Sources ─────────────────────────────────────
    subgraph sources ["External Data Sources"]
        direction LR
        polygon["Polygon.io<br/><i>News, Market Bars,<br/>Grouped Daily</i>"]
        sec["SEC EDGAR<br/><i>10-K, 10-Q Filings</i>"]
        macro_src["Macro News APIs<br/><i>Geopolitical &amp;<br/>Economic Events</i>"]
        market_src["Market Data API<br/><i>Intraday Bars,<br/>Grouped Daily</i>"]
    end

    %% ── Scheduler ─────────────────────────────────────────────────
    scheduler["<b>Scheduler</b><br/><i>services.scheduler.app</i><br/>Cadence polling, rate limiting,<br/>backoff, stale recovery,<br/>periodic aggregation,<br/>report scheduling"]

    sources -.->|"API polling<br/>on cadence"| scheduler

    %% ── Ingestion Queue ───────────────────────────────────────────
    q_ingestion[["stonks:queue:ingestion"]]
    scheduler -->|"rpush job<br/>(company, macro,<br/>global market)"| q_ingestion

    %% ── Ingestion Worker ──────────────────────────────────────────
    ingestion["<b>Ingestion</b><br/><i>services.ingestion.worker</i><br/>Adapter dispatch, dedupe,<br/>raw artifact upload"]

    q_ingestion -->|"lpop"| ingestion

    %% ── Raw Storage ───────────────────────────────────────────────
    minio_raw[("MinIO<br/><i>Raw Artifacts</i><br/>JSON / HTML")]
    pg_docs[("PostgreSQL<br/><i>documents,<br/>ingestion_runs</i>")]
    redis_dedupe[("Redis<br/><i>Dedupe Markers</i><br/>stonks:dedupe:*")]

    ingestion -->|"upload raw payload"| minio_raw
    ingestion -->|"persist metadata"| pg_docs
    ingestion -->|"set content hash"| redis_dedupe

    %% ── Parsing Queue ─────────────────────────────────────────────
    q_parsing[["stonks:queue:parsing"]]
    ingestion -->|"rpush<br/>(news, filings,<br/>web_scrape, macro)"| q_parsing

    %% ── Parser Worker ─────────────────────────────────────────────
    parser["<b>Parser</b><br/><i>services.parser.worker</i><br/>HTML parsing, quality scoring,<br/>company mention detection"]

    q_parsing -->|"lpop"| parser

    minio_norm[("MinIO<br/><i>Normalized Text</i><br/><i>Parser Output JSON</i>")]
    parser -->|"upload normalized text<br/>+ structured output"| minio_norm
    parser -->|"update document status,<br/>insert mentions"| pg_docs
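The ingestion hop illustrates the basic queue-to-queue shape: LPOP a job, drop it if its content hash was already seen, persist it, and RPUSH a follow-up job for the next stage. The sketch below uses in-memory stand-ins (a set for the stonks:dedupe:* markers, deques for the Redis lists). The function names and the choice of SHA-256 over canonical JSON as the content hash are assumptions, not the ingestion worker's actual code.

```python
import hashlib
import json
from collections import deque

DEDUPE_PREFIX = "stonks:dedupe:"  # follows the stonks:dedupe:* pattern


def content_hash(payload: dict) -> str:
    """SHA-256 over canonical JSON, so key order does not affect the hash."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def handle_ingestion_job(job: dict, dedupe: set, parsing_queue: deque) -> bool:
    """Process one popped job; return True if a parsing job was enqueued.

    `dedupe` stands in for the Redis dedupe markers and `parsing_queue` for
    RPUSH onto stonks:queue:parsing (both are illustrative stand-ins).
    """
    key = DEDUPE_PREFIX + content_hash(job["payload"])
    if key in dedupe:  # duplicate content: nothing to do
        return False
    dedupe.add(key)
    # The real worker would upload the raw payload to MinIO and record
    # documents / ingestion_runs rows in PostgreSQL at this point.
    parsing_queue.append({"document_type": job["document_type"], "dedupe_key": key})
    return True


ingestion_queue = deque([
    {"document_type": "news", "payload": {"id": 1, "title": "A"}},
    {"document_type": "news", "payload": {"title": "A", "id": 1}},  # same content
    {"document_type": "filings", "payload": {"id": 2, "form": "10-K"}},
])
dedupe, parsing_queue = set(), deque()
while ingestion_queue:
    handle_ingestion_job(ingestion_queue.popleft(), dedupe, parsing_queue)  # LPOP
print(len(parsing_queue))  # 2
```

Against real Redis, the check and the marker write should be a single atomic SET with NX (redis-py returns a truthy value only when the key was newly set), so two workers cannot both claim the same payload.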

Three Signal Layers

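The parser routes documents into two extraction paths based on document_type: standard documents go to stonks:queue:extraction and feed Layer 1 (company signals), while macro_event documents go to stonks:queue:macro_classification and feed Layer 2 (macro signals). Layer 3 (competitive signals) is produced later, when the aggregation worker triggers signal propagation (if competitive_enabled is set). All three signal layers converge at the aggregation stage through the shared WeightedSignal abstraction.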
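The WeightedSignal contract is what lets aggregation treat the three layers uniformly. Its actual fields are not documented here. The sketch below assumes a minimal shape (layer, score, weight) and a weight-normalized mean as the combination rule, purely to show why a shared abstraction makes the merge straightforward.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WeightedSignal:
    """Assumed minimal shape; the real class may carry more fields."""
    layer: str     # "company", "macro", or "competitive"
    score: float   # directional score, e.g. -1.0 (bearish) to +1.0 (bullish)
    weight: float  # non-negative relevance or confidence weight


def combine(signals: list[WeightedSignal]) -> float:
    """Weight-normalized mean over all layers; 0.0 if total weight is zero."""
    total_weight = sum(s.weight for s in signals)
    if total_weight == 0:
        return 0.0
    return sum(s.score * s.weight for s in signals) / total_weight


signals = [
    WeightedSignal("company", 0.5, 2.0),
    WeightedSignal("macro", -0.25, 1.0),
    WeightedSignal("competitive", 0.25, 1.0),
]
print(combine(signals))  # 0.25
```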

flowchart TB
    %% ── Parser Output ─────────────────────────────────────────────
    parser(("Parser"))

    %% ── Extraction Queues ─────────────────────────────────────────
    q_extraction[["stonks:queue:extraction"]]
    q_macro[["stonks:queue:macro_classification"]]

    parser -->|"rpush<br/>(standard docs)"| q_extraction
    parser -->|"rpush<br/>(macro_event docs)"| q_macro

    %% ── Scheduler Recovery ────────────────────────────────────────
    scheduler_recovery(("Scheduler<br/><i>stale recovery &amp;<br/>failed retry</i>"))
    scheduler_recovery -.->|"re-enqueue orphaned<br/>parsed docs"| q_extraction
    scheduler_recovery -.->|"re-enqueue orphaned<br/>macro docs"| q_macro

    %% ── Extractor Worker ──────────────────────────────────────────
    subgraph extractor_svc ["Extractor Service"]
        direction TB
        ext_main["<b>Extractor</b><br/><i>services.extractor.main</i><br/>Alternates between queues<br/>(2 extraction : 1 macro)<br/>Token budget enforcement"]
    end

    q_extraction -->|"lpop"| ext_main
    q_macro -->|"lpop"| ext_main

    %% ── Ollama LLM ───────────────────────────────────────────────
    ollama["<b>Ollama / vLLM</b><br/><i>LLM Inference</i><br/>document-extractor agent<br/>event-classifier agent"]
    ext_main <-->|"HTTP /api/generate<br/>(AgentConfigResolver<br/>selects model + variant)"| ollama

    %% ── Signal Layer 1: Company ───────────────────────────────────
    subgraph layer1 ["Layer 1 — Company Signals"]
        direction LR
        di["document_intelligence<br/>document_impact_records"]
    end

    ext_main -->|"persist extraction<br/>(standard docs)"| di

    %% ── Signal Layer 2: Macro ─────────────────────────────────────
    subgraph layer2 ["Layer 2 — Macro Signals"]
        direction LR
        ge["global_events"]
        mir["macro_impact_records<br/><i>per-company interpolation<br/>via exposure profiles</i>"]
        ge --> mir
    end

    ext_main -->|"classify &amp; persist<br/>(macro_event docs)"| ge
    ext_main -->|"compute_macro_impact<br/>for all tracked companies"| mir

    %% ── Aggregation Queue ─────────────────────────────────────────
    q_agg[["stonks:queue:aggregation"]]
    ext_main -->|"rpush<br/>(per ticker)"| q_agg

    %% ── Scheduler Periodic Aggregation ────────────────────────────
    scheduler_agg(("Scheduler<br/><i>periodic aggregation<br/>every ~15 min</i>"))
    scheduler_agg -.->|"rpush all<br/>active tickers"| q_agg

    %% ── Aggregation Worker ────────────────────────────────────────
    aggregation["<b>Aggregation</b><br/><i>services.aggregation.main</i><br/>Trend windows, scoring,<br/>contradiction detection"]

    q_agg -->|"lpop"| aggregation

    %% ── Signal Layer 3: Competitive ──────────────────────────────
    subgraph layer3 ["Layer 3 — Competitive Signals"]
        direction LR
        pm["pattern_matcher<br/><i>historical patterns</i>"]
        sp["signal_propagation<br/><i>cross-company signals</i>"]
        csr["competitive_signal_records"]
        pm --> sp --> csr
    end

    aggregation -->|"trigger_signal_propagation<br/>(when competitive_enabled)"| layer3

    %% ── All layers merge ──────────────────────────────────────────
    pg_trends[("PostgreSQL<br/><i>trend_windows,<br/>trend_history,<br/>trend_projections</i>")]

    di -->|"WeightedSignal"| aggregation
    mir -->|"WeightedSignal"| aggregation
    csr -->|"WeightedSignal"| aggregation
    aggregation -->|"persist trend summaries"| pg_trends
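The extractor's 2 extraction : 1 macro alternation keeps the higher-volume company queue moving without starving macro classification. Below is a minimal sketch of that ratio. It assumes, without documentation to confirm it, that the worker falls back to the other queue when the scheduled one is empty. The drain function and the in-memory queues are hypothetical.

```python
from collections import deque
from itertools import cycle

# Two extraction pops for every macro_classification pop.
PATTERN = ("extraction", "extraction", "macro_classification")


def drain(queues: dict[str, deque], max_jobs: int) -> list[str]:
    """Pop up to max_jobs jobs following PATTERN, skipping empty queues."""
    order = cycle(PATTERN)
    popped: list[str] = []
    while len(popped) < max_jobs and any(queues.values()):
        name = next(order)
        if not queues[name]:
            # Assumed fallback: take from whichever queue still has work.
            name = next(n for n, q in queues.items() if q)
        popped.append(queues[name].popleft())  # LPOP
    return popped


queues = {
    "extraction": deque(["e1", "e2", "e3", "e4"]),
    "macro_classification": deque(["m1", "m2"]),
}
print(drain(queues, 6))  # ['e1', 'e2', 'm1', 'e3', 'e4', 'm2']
```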

Recommendation → Trading → Broker

The recommendation worker consumes from the recommendation queue. The trading engine does not consume from a queue — it polls the recommendations table in PostgreSQL on a configurable interval, evaluates each recommendation through its decision pipeline, and pushes "act" decisions to the broker queue.
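Because the trading engine polls rather than pops, it has to track its own high-water mark and guard against reprocessing. The sketch below applies the filter shown in the Trading Engine Decision Loop diagram to in-memory rows. The Rec dataclass, the per-id marker under stonks:dedupe:trading:, and advancing the mark to the newest matching row are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Rec:
    """Hypothetical subset of a recommendations row."""
    id: int
    ticker: str
    action: str            # e.g. "buy", "sell", "hold"
    mode: str              # e.g. "paper", "live"
    generated_at: datetime


def poll(rows: list[Rec], last_poll: datetime,
         seen: set[str]) -> tuple[list[Rec], datetime]:
    """Return newly actionable recommendations and the advanced high-water mark.

    Applies action IN (buy, sell), mode IN (paper, live), generated_at > last_poll,
    then skips ids already marked in `seen` (a stand-in for Redis markers).
    """
    fresh = sorted(
        (r for r in rows
         if r.action in ("buy", "sell")
         and r.mode in ("paper", "live")
         and r.generated_at > last_poll),
        key=lambda r: r.generated_at,
    )
    actionable = []
    for r in fresh:
        marker = f"stonks:dedupe:trading:{r.id}"
        if marker not in seen:
            seen.add(marker)
            actionable.append(r)
    new_mark = max((r.generated_at for r in fresh), default=last_poll)
    return actionable, new_mark


t0 = datetime(2026, 5, 1, 14, 0, tzinfo=timezone.utc)
rows = [
    Rec(1, "AAPL", "buy", "paper", t0.replace(minute=5)),
    Rec(2, "MSFT", "hold", "paper", t0.replace(minute=6)),  # filtered: not buy/sell
    Rec(3, "NVDA", "sell", "live", t0.replace(minute=7)),
    Rec(4, "TSLA", "buy", "off", t0.replace(minute=8)),     # filtered: hypothetical non-trading mode
]
seen: set[str] = set()
batch, mark = poll(rows, t0, seen)
print([r.ticker for r in batch], mark.minute)  # ['AAPL', 'NVDA'] 7
replay, _ = poll(rows, t0, seen)  # same window again: markers suppress repeats
print(replay)  # []
```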

flowchart TB
    %% ── Recommendation Queue ──────────────────────────────────────
    q_rec[["stonks:queue:recommendation"]]
    aggregation(("Aggregation")) -->|"rpush<br/>(ticker + window,<br/>dedup 5 min TTL)"| q_rec

    %% ── Recommendation Worker ─────────────────────────────────────
    recommendation["<b>Recommendation</b><br/><i>services.recommendation.main</i><br/>Eligibility, suppression,<br/>thesis generation"]

    q_rec -->|"lpop"| recommendation

    ollama_thesis["<b>Ollama / vLLM</b><br/><i>thesis-rewriter agent</i><br/>(AgentConfigResolver<br/>selects model + variant)"]
    recommendation <-->|"rewrite thesis<br/>(trading-eligible only)"| ollama_thesis

    pg_recs[("PostgreSQL<br/><i>recommendations,<br/>recommendation_evidence,<br/>risk_evaluations</i>")]
    recommendation -->|"persist recommendation<br/>+ evidence + risk eval"| pg_recs

    %% ── Lake Publication (inline) ─────────────────────────────────
    minio_rec_lake[("MinIO<br/><i>Lakehouse</i><br/>recommendation facts")]
    recommendation -->|"publish_recommendation_facts<br/>(Parquet)"| minio_rec_lake

    %% ── Trading Engine ────────────────────────────────────────────
    subgraph trading_loop ["Trading Engine Decision Loop"]
        direction TB
        poll["Poll recommendations<br/><i>action IN (buy, sell)<br/>mode IN (paper, live)<br/>generated_at &gt; last_poll</i>"]
        dedup_check["Redis dedup check<br/><i>stonks:dedupe:trading:*</i>"]
        evaluate["evaluate_recommendation<br/><i>Circuit breaker check<br/>Trading window check<br/>Confidence gate<br/>Sector exposure check<br/>Correlation check<br/>Earnings blackout<br/>Max positions check</i>"]
        size["Position sizing<br/><i>Kelly criterion,<br/>risk tier limits,<br/>micro-trade support</i>"]
        decide{{"Decision"}}
        poll --> dedup_check --> evaluate --> size --> decide
    end

    pg_recs -->|"SELECT recent<br/>recommendations"| poll

    %% ── Broker Queue ──────────────────────────────────────────────
    q_broker[["stonks:queue:broker_orders"]]
    decide -->|"act → rpush<br/>order job"| q_broker
    decide -->|"skip → persist<br/>decision only"| pg_decisions

    pg_decisions[("PostgreSQL<br/><i>trading_decisions</i>")]

    %% ── Manual Override ───────────────────────────────────────────
    trading_api(("Trading API<br/><i>POST /override/order</i>"))
    trading_api -->|"rpush<br/>manual order"| q_broker

    %% ── Broker Adapter ────────────────────────────────────────────
    broker["<b>Broker Adapter</b><br/><i>services.adapters.broker_service</i><br/>Idempotency, risk evaluation,<br/>approval gate, order submission,<br/>fill tracking, position sync"]

    q_broker -->|"lpop"| broker

    %% ── Risk Engine ───────────────────────────────────────────────
    risk["<b>Risk Engine</b><br/><i>services.risk.app</i><br/>evaluate_order()<br/>Position limits, sector exposure,<br/>daily loss caps, approval workflow"]
    broker -->|"evaluate order<br/>(inline call)"| risk

    %% ── Alpaca ────────────────────────────────────────────────────
    alpaca["<b>Alpaca</b><br/><i>Paper Trading API</i><br/>Order submission,<br/>position sync,<br/>account state"]
    broker <-->|"submit order /<br/>sync positions /<br/>sync order status"| alpaca

    pg_orders[("PostgreSQL<br/><i>orders, order_events,<br/>positions,<br/>portfolio_snapshots,<br/>broker_accounts</i>")]
    broker -->|"persist order,<br/>events, positions"| pg_orders

    %% ── Lake Publication (broker inline) ──────────────────────────
    minio_broker_lake[("MinIO<br/><i>Lakehouse</i><br/>order + fill + position facts")]
    broker -->|"publish_trade_order<br/>publish_trade_fill<br/>publish_positions_daily_batch<br/>(Parquet)"| minio_broker_lake

    %% ── Notifications ─────────────────────────────────────────────
    subgraph notifications ["Notifications"]
        direction LR
        sns["AWS SNS<br/><i>SMS alerts</i>"]
        gmail["Gmail SMTP<br/><i>Email alerts</i>"]
    end

    trading_loop -->|"circuit breaker trips,<br/>order fills,<br/>stop-loss triggers"| notifications
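The decision loop lists Kelly criterion sizing alongside risk tier limits. The classic Kelly fraction for a bet with win probability p and win/loss payoff ratio b is f* = p - (1 - p) / b. The sketch below applies it under assumptions that this document does not confirm: a half-Kelly scale factor, a single per-tier cap expressed as a fraction of equity, and fractional-share sizing as the meaning of micro-trade support.

```python
import math


def kelly_fraction(p_win: float, payoff_ratio: float) -> float:
    """Classic Kelly fraction f* = p - (1 - p) / b; a negative result means no edge."""
    return p_win - (1.0 - p_win) / payoff_ratio


def position_size(equity: float, price: float, p_win: float, payoff_ratio: float,
                  tier_cap: float, kelly_scale: float = 0.5,
                  allow_fractional: bool = False) -> float:
    """Shares to buy under scaled Kelly, capped at tier_cap of equity.

    kelly_scale (half-Kelly by default), the single tier_cap, and the
    fractional-share switch are illustrative assumptions.
    """
    fraction = max(0.0, kelly_fraction(p_win, payoff_ratio)) * kelly_scale
    fraction = min(fraction, tier_cap)          # risk tier limit
    dollars = equity * fraction
    if allow_fractional:                        # micro-trade style sizing (assumed)
        return round(dollars / price, 4)
    return float(math.floor(dollars / price))   # whole shares only


print(position_size(10_000, 50.0, p_win=0.6, payoff_ratio=2.0, tier_cap=0.05))  # 10.0
```

Here the raw Kelly fraction is 0.4, half-Kelly gives 0.2, and the 5% tier cap binds, so $500 buys 10 shares. Scaling Kelly down is a common hedge against overconfident win-probability estimates.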

Analytical Branch — Lake Publisher

The lake publisher runs as a separate worker, consuming from its own queue and writing partitioned Parquet fact tables to MinIO for analytical queries. Some services (broker adapter, recommendation worker) also publish facts directly to MinIO inline, bypassing the queue.

flowchart LR
    %% ── Lake Publish Queue ────────────────────────────────────────
    q_lake[["stonks:queue:lake_publish"]]

    various(("Upstream Services<br/><i>via enqueue_lake_job()</i>"))
    various -->|"rpush job<br/>(job_type + entity_id)"| q_lake

    %% ── Lake Publisher Worker ─────────────────────────────────────
    lake["<b>Lake Publisher</b><br/><i>services.lake_publisher.jobs</i><br/>Transforms operational data<br/>into analytical facts<br/><i>15 job types supported</i>"]

    q_lake -->|"lpop"| lake

    pg_source[("PostgreSQL<br/><i>Operational Tables</i><br/>documents, extractions,<br/>orders, positions, events,<br/>global_events, macro_impacts,<br/>competitive_signals")]
    lake -->|"query source data"| pg_source

    %% ── MinIO Parquet ─────────────────────────────────────────────
    minio_lake[("MinIO<br/><i>Lakehouse Bucket</i><br/>Partitioned Parquet<br/>/year=/month=/day=")]
    lake -->|"write Parquet files"| minio_lake

    %% ── Inline Publishers ─────────────────────────────────────────
    inline(("Inline Publishers<br/><i>broker adapter,<br/>recommendation worker</i>"))
    inline -->|"publish_* functions<br/>(direct Parquet write)"| minio_lake

    %% ── Trino ─────────────────────────────────────────────────────
    trino["<b>Trino</b><br/><i>SQL Query Engine</i><br/>Hive connector → MinIO"]
    minio_lake -->|"read via<br/>Hive Metastore"| trino

    hive["<b>Hive Metastore</b><br/><i>Schema catalog</i>"]
    trino <-->|"table metadata"| hive
    hive -->|"location refs"| minio_lake

    %% ── Visualization ─────────────────────────────────────────────
    superset["<b>Superset</b><br/><i>Dashboards &amp;<br/>SQL Lab</i>"]
    dashboard["<b>React Dashboard</b><br/><i>frontend</i><br/>Charts, portfolio,<br/>recommendations"]
    query_api["<b>Query API</b><br/><i>services.api.app</i>"]

    trino --> superset
    trino --> query_api
    query_api --> dashboard

Report Generation

The scheduler manages report generation as a sub-loop, enqueuing daily and weekly report jobs to a dedicated queue and consuming them inline.

flowchart LR
    scheduler["<b>Scheduler</b><br/><i>report schedule check</i><br/>daily @ 16:30 ET<br/>weekly @ Saturday"]

    q_report[["stonks:queue:report_generation"]]
    scheduler -->|"rpush<br/>(daily/weekly)"| q_report

    scheduler_consumer["<b>Scheduler</b><br/><i>report consumer loop</i><br/>pops up to 5 jobs/cycle"]
    q_report -->|"lpop"| scheduler_consumer

    generator["<b>Report Generator</b><br/><i>services.reporting.generator</i>"]
    scheduler_consumer -->|"process_report_job()"| generator

    pg_reports[("PostgreSQL<br/><i>trading_reports</i>")]
    generator -->|"persist report"| pg_reports
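The schedule check and the bounded consumer can be sketched as two small functions. The following behaviours are assumptions rather than documented ones. The caller passes a time already converted to US Eastern. Each report type runs at most once per calendar date. The daily report is not restricted to trading days, and the weekly report may fire at any time on Saturday.

```python
from collections import deque
from datetime import date, datetime, time
from typing import Callable, Optional

DAILY_AT = time(16, 30)   # 16:30 ET, per the diagram
SATURDAY = 5              # datetime.weekday(): Monday=0 ... Sunday=6
MAX_JOBS_PER_CYCLE = 5    # "pops up to 5 jobs/cycle"


def due_reports(now_et: datetime, last_daily: Optional[date],
                last_weekly: Optional[date]) -> list[str]:
    """Report types due at now_et, which must already be in Eastern time."""
    today = now_et.date()
    due = []
    if now_et.time() >= DAILY_AT and last_daily != today:
        due.append("daily")
    if now_et.weekday() == SATURDAY and last_weekly != today:
        due.append("weekly")
    return due


def consume(queue: deque, process: Callable[[object], None]) -> int:
    """LPOP and process at most MAX_JOBS_PER_CYCLE jobs; return how many ran."""
    ran = 0
    while queue and ran < MAX_JOBS_PER_CYCLE:
        process(queue.popleft())  # in the scheduler, each job goes to process_report_job()
        ran += 1
    return ran


print(due_reports(datetime(2026, 5, 2, 16, 45), None, None))  # ['daily', 'weekly']
jobs = deque(range(7))
print(consume(jobs, lambda job: None), len(jobs))  # 5 2
```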

Complete Queue Topology

| Queue | Full Key | Producer(s) | Consumer |
|---|---|---|---|
| Ingestion | `stonks:queue:ingestion` | Scheduler (company, macro, global market sources) | Ingestion Worker |
| Parsing | `stonks:queue:parsing` | Ingestion Worker (news, filings, web_scrape, macro) | Parser Worker |
| Extraction | `stonks:queue:extraction` | Parser (standard docs), Scheduler (stale recovery) | Extractor Worker |
| Macro Classification | `stonks:queue:macro_classification` | Parser (macro_event docs), Scheduler (stale/failed recovery) | Extractor Worker |
| Aggregation | `stonks:queue:aggregation` | Extractor Worker (per ticker), Scheduler (periodic, all tickers) | Aggregation Worker |
| Recommendation | `stonks:queue:recommendation` | Aggregation Worker (ticker + window, 5 min dedup TTL) | Recommendation Worker |
| Broker Orders | `stonks:queue:broker_orders` | Trading Engine (act decisions), Trading API (manual overrides) | Broker Adapter |
| Lake Publish | `stonks:queue:lake_publish` | Various services (via `enqueue_lake_job()`) | Lake Publisher |
| Report Generation | `stonks:queue:report_generation` | Scheduler (daily/weekly triggers) | Scheduler (inline consumer) |

Dead-letter queues follow the pattern stonks:dlq:<queue_name> and are populated when a job exhausts its retry budget.
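One way the retry budget and dead-letter hand-off can work is sketched below, with deques standing in for the Redis lists. The budget of three attempts and carrying the attempt count inside the job payload are assumptions. The real services may track retries separately; the data store summary lists stonks:retry:* keys.

```python
import json
from collections import deque
from typing import Callable

MAX_ATTEMPTS = 3  # hypothetical retry budget


def run_job(raw: str, queue: deque, dlq: deque, handler: Callable[[dict], None]) -> None:
    """Run one job; on failure requeue it, or dead-letter it once the budget is spent."""
    job = json.loads(raw)
    try:
        handler(job)
    except Exception as exc:  # any failure counts against the budget
        job["attempts"] = job.get("attempts", 0) + 1
        job["last_error"] = repr(exc)
        target = dlq if job["attempts"] >= MAX_ATTEMPTS else queue
        target.append(json.dumps(job))  # RPUSH to stonks:dlq:<name> or back onto the queue


def always_fails(job: dict) -> None:
    raise RuntimeError("upstream returned 503")


queue, dlq = deque([json.dumps({"ticker": "AAPL"})]), deque()
while queue:
    run_job(queue.popleft(), queue, dlq, always_fails)  # LPOP
print(len(dlq), json.loads(dlq[0])["attempts"])  # 1 3
```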

Data Store Summary

| Store | Role | Key Tables / Buckets |
|---|---|---|
| PostgreSQL | Structured operational data | documents, document_intelligence, document_impact_records, document_company_mentions, global_events, macro_impact_records, exposure_profiles, competitive_signal_records, competitor_relationships, trend_windows, trend_history, trend_projections, recommendations, recommendation_evidence, risk_evaluations, orders, order_events, positions, portfolio_snapshots, trading_decisions, circuit_breaker_events, reserve_pool_ledger, risk_tier_history, broker_accounts, ingestion_runs, sources, companies, company_aliases, ai_agents, agent_variants, agent_performance_log, risk_configs, trading_reports |
| Redis | Queues, dedup markers, rate limits, circuit breaker state, pipeline toggle | `stonks:queue:*` (9 queues), `stonks:dedupe:*`, `stonks:dedupe:trading:*`, `stonks:ratelimit:*`, `stonks:trading:circuit_breaker:*`, `stonks:trading:notification_rate:*`, `stonks:order_idempotency:*`, `stonks:lock:*`, `stonks:cache:*`, `stonks:retry:*`, `stonks:rec_dedup:*`, `stonks:pipeline:enabled`, `stonks:dlq:*` |
| MinIO | Object storage for raw artifacts, normalized text, and analytical Parquet files | Raw artifacts bucket, normalized text bucket, parser output bucket, lakehouse bucket (partitioned Parquet: documents, extractions, market bars/quotes, orders, fills, positions, PnL, global events, macro impacts, trend projections, competitive signals, competitor relationships, recommendations) |

External Integration Points

| Integration | Service | Protocol | Purpose |
|---|---|---|---|
| Polygon.io | Ingestion (via PolygonNewsAdapter, PolygonMarketAdapter) | HTTPS REST | News articles, market bars, grouped daily data, intraday bars |
| SEC EDGAR | Ingestion (via SECEdgarAdapter) | HTTPS REST | 10-K, 10-Q filings |
| Macro News | Ingestion (via MacroNewsAdapter) | HTTPS REST | Geopolitical and economic event articles |
| Ollama / vLLM | Extractor, Recommendation | HTTP `/api/generate` | LLM inference for document extraction (document-extractor agent), event classification (event-classifier agent), and thesis rewriting (thesis-rewriter agent). Model and variant are selected via AgentConfigResolver with a 60s TTL cache. |
| Alpaca | Broker Adapter | HTTPS REST | Paper/live trading: order submission, position sync, account state, order status polling |
| AWS SNS | Trading Engine (notifications) | boto3 SDK | SMS alerts for circuit breaker trips, order fills, stop-loss triggers |
| Gmail | Trading Engine (notifications) | SMTP (port 587, STARTTLS) | Email alerts for trading events |
| Trino | Query API, Superset | HTTP | SQL queries over lakehouse Parquet files via Hive Metastore |
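A 60s TTL cache in AgentConfigResolver presumably means a change to an agent's model or variant takes effect within about a minute without a restart, while avoiding a lookup on every LLM call. Below is a minimal TTL cache of that kind, with an injectable clock so expiry can be tested. The loader, the (model, variant) tuple, and the placeholder values are hypothetical; the resolver's real interface is not shown in this document.

```python
import time
from typing import Callable

ModelChoice = tuple[str, str]  # (model, variant); hypothetical shape


class TTLCache:
    """Per-agent cache that reloads entries older than `ttl` seconds."""

    def __init__(self, loader: Callable[[str], ModelChoice], ttl: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._loader = loader
        self._ttl = ttl
        self._clock = clock
        self._entries: dict[str, tuple[float, ModelChoice]] = {}

    def get(self, agent: str) -> ModelChoice:
        now = self._clock()
        cached = self._entries.get(agent)
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]
        value = self._loader(agent)  # e.g. a lookup against ai_agents / agent_variants
        self._entries[agent] = (now, value)
        return value


calls: list[str] = []


def load(agent: str) -> ModelChoice:
    calls.append(agent)
    return ("model-a", "variant-1")  # placeholder values


fake_now = [0.0]
cache = TTLCache(load, clock=lambda: fake_now[0])
cache.get("document-extractor")  # t=0: miss, loads
fake_now[0] = 59.0
cache.get("document-extractor")  # t=59: still fresh
fake_now[0] = 60.0
cache.get("document-extractor")  # t=60: expired, reloads
print(len(calls))  # 2
```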

Pipeline Toggle

The pipeline can be paused globally via the Redis key stonks:pipeline:enabled. When set to "0", all queue workers (ingestion, parser, extractor, aggregation, recommendation, broker adapter, lake publisher) enter a sleep loop and stop processing jobs. The scheduler also skips scheduling cycles when the toggle is off. The toggle can be set via the Query API's pipeline control endpoints.

Setting PIPELINE_DEFAULT_OFF=true on the scheduler initializes the toggle to OFF on first boot. This is useful for staged deployments where you want to verify the infrastructure before enabling the pipeline.
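The toggle semantics can be sketched with a dict standing in for Redis. The following are assumptions, not confirmed behaviour: a missing key counts as enabled, "1" is the enabled value, and "first boot" means the key is absent, so initialization behaves like SET NX and never overrides an operator's later choice. The function names are hypothetical.

```python
from collections import deque
from typing import Callable

TOGGLE_KEY = "stonks:pipeline:enabled"


def init_toggle(store: dict, default_off: bool) -> None:
    """Scheduler start-up: seed the toggle to "0" only if it is not already set.

    Mirrors SET NX (in redis-py: r.set(TOGGLE_KEY, "0", nx=True)).
    """
    if default_off:
        store.setdefault(TOGGLE_KEY, "0")


def pipeline_enabled(store: dict) -> bool:
    """Only an explicit "0" pauses the pipeline; a missing key counts as on."""
    return store.get(TOGGLE_KEY) != "0"


def worker_step(store: dict, queue: deque, handle: Callable[[object], None]) -> bool:
    """One worker iteration; False means paused or idle, so the caller should sleep."""
    if not pipeline_enabled(store) or not queue:
        return False
    handle(queue.popleft())
    return True


store: dict = {}
init_toggle(store, default_off=True)             # as if PIPELINE_DEFAULT_OFF=true
jobs = deque(["job-1"])
print(worker_step(store, jobs, lambda j: None))  # False (paused; job stays queued)
store[TOGGLE_KEY] = "1"                          # operator enables the pipeline
init_toggle(store, default_off=True)             # a later restart does not flip it back
print(worker_step(store, jobs, lambda j: None))  # True
```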