stonks-oracle/docs/services.md
Celes Renata 88ad1e8d99 feat: comprehensive docs, unit tests, docker-compose app services
- Add scheduler and ingestion unit tests (test_scheduler_unit.py, test_ingestion_unit.py)
- Add all 13 app services + dashboard to docker-compose.yml
- Add full documentation suite: API reference, Helm reference, Docker deployment guide,
  3 architecture diagrams (K8s, Docker Compose, data pipeline), AI agent guide,
  backup/restore guide, observability/metrics reference, per-service docs
- Add intelligence pipeline deep-dive docs with Mermaid diagrams
- Update README with documentation index and links
- Add specs for comprehensive-quality-docs, intelligence-pipeline-deep-dive,
  sanitized-pipeline-docs
2026-04-22 02:56:41 +00:00


Service Documentation

Stonks Oracle is composed of 13 services that form an end-to-end AI market intelligence and paper-trading pipeline. This document describes each service's purpose, entry point, configuration, database tables, Redis queue interactions, and key features.

For services that expose HTTP endpoints, see the API Reference.


Table of Contents

  1. Queue Topology
  2. Scheduler
  3. Symbol Registry
  4. Ingestion
  5. Parser
  6. Extractor
  7. Aggregation
  8. Recommendation
  9. Trading Engine
  10. Risk Engine
  11. Broker Adapter
  12. Lake Publisher
  13. Query API
  14. Dashboard
  15. Signal Layers
  16. Trading Engine Features

Queue Topology

All queues use the stonks:queue:<name> key pattern (configurable via DEPLOY_STAGE prefix). Dead-letter queues follow the pattern stonks:dlq:<name>.

Queue Name Full Redis Key Producer(s) Consumer
ingestion stonks:queue:ingestion Scheduler Ingestion
parsing stonks:queue:parsing Ingestion Parser
extraction stonks:queue:extraction Parser, Scheduler (recovery) Extractor
macro_classification stonks:queue:macro_classification Parser, Scheduler (recovery) Extractor
aggregation stonks:queue:aggregation Extractor Aggregation
recommendation stonks:queue:recommendation Aggregation Recommendation
broker_orders stonks:queue:broker_orders Trading Engine, Trading API Broker Adapter
lake_publish stonks:queue:lake_publish Various services Lake Publisher
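The key patterns above can be captured in two small helpers so producers and consumers never hand-build key strings. This is a hypothetical sketch. The function names are illustrative, and it assumes the DEPLOY_STAGE setting only swaps the leading "stonks" prefix. The exact prefix rule is not specified here.

```python
# Hypothetical helpers mirroring the documented stonks:queue:<name> and
# stonks:dlq:<name> patterns. Assumption: DEPLOY_STAGE configuration only
# changes the leading prefix.
QUEUE_NAMES = (
    "ingestion", "parsing", "extraction", "macro_classification",
    "aggregation", "recommendation", "broker_orders", "lake_publish",
)


def queue_key(name: str, prefix: str = "stonks") -> str:
    """Return the Redis key for a work queue, e.g. stonks:queue:parsing."""
    if name not in QUEUE_NAMES:
        raise ValueError(f"unknown queue: {name}")
    return f"{prefix}:queue:{name}"


def dlq_key(name: str, prefix: str = "stonks") -> str:
    """Return the dead-letter key for a queue, e.g. stonks:dlq:parsing."""
    if name not in QUEUE_NAMES:
        raise ValueError(f"unknown queue: {name}")
    return f"{prefix}:dlq:{name}"
```

Validating against a fixed tuple means a typo raises an error. Without the check, a misspelled name would quietly create a queue that nothing consumes.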

Queue Message Schemas

Ingestion Job (stonks:queue:ingestion):

{
  "source_id": "uuid",
  "company_id": "uuid | null",
  "ticker": "AAPL",
  "legal_name": "Apple Inc.",
  "aliases": ["Apple", "AAPL"],
  "source_type": "news_api",
  "source_name": "Polygon News",
  "config": {},
  "credibility_score": 0.5,
  "scheduled_at": "2025-01-01T00:00:00+00:00"
}

Parsing Job (stonks:queue:parsing):

{
  "document_id": "uuid",
  "ticker": "AAPL",
  "source_type": "news_api"
}

Extraction Job (stonks:queue:extraction):

{
  "document_id": "uuid",
  "ticker": "AAPL",
  "normalized_text": "Article text content..."
}

Macro Classification Job (stonks:queue:macro_classification):

{
  "document_id": "uuid",
  "ticker": "",
  "normalized_text": "Global event text..."
}

Aggregation Job (stonks:queue:aggregation):

{
  "ticker": "AAPL",
  "macro_event_id": "uuid (optional)"
}

Recommendation Job (stonks:queue:recommendation):

{
  "ticker": "AAPL",
  "window": "7d"
}

Broker Order Job (stonks:queue:broker_orders):

{
  "ticker": "AAPL",
  "side": "buy",
  "quantity": 10.0,
  "order_type": "market",
  "limit_price": null,
  "stop_price": null,
  "recommendation_id": "uuid",
  "confidence": 0.75,
  "estimated_value": 1500.0,
  "sector": "Technology",
  "source": "trading_engine",
  "idempotency_key": "optional-explicit-key"
}

Lake Publish Job (stonks:queue:lake_publish):

{
  "job_type": "document | document_extraction | market_snapshot | trade_order | trade_fill | positions_snapshot | pnl_snapshot | global_event | macro_impact | trend_projection | competitor_relationship | competitive_signal | bulk_documents | bulk_extractions",
  "entity_id": "uuid or ticker",
  "dt": "2025-01-01T00:00:00+00:00",
  "since": "2025-01-01T00:00:00+00:00 (for bulk jobs)"
}
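One way to keep producers and consumers in agreement on these payloads is a typed wrapper per message. Below is a hypothetical sketch for the broker order job. The class name and the validation rules are assumptions: the schema above only shows "buy", and "sell" is assumed as the other valid side.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class BrokerOrderJob:
    """Hypothetical typed view of the stonks:queue:broker_orders payload."""
    ticker: str
    side: str                       # assumed: "buy" or "sell"
    quantity: float
    order_type: str = "market"
    limit_price: Optional[float] = None
    stop_price: Optional[float] = None
    recommendation_id: Optional[str] = None
    confidence: Optional[float] = None
    estimated_value: Optional[float] = None
    sector: Optional[str] = None
    source: str = "trading_engine"
    idempotency_key: Optional[str] = None

    def __post_init__(self) -> None:
        # Reject obviously malformed orders before they reach the broker queue.
        if self.side not in ("buy", "sell"):
            raise ValueError(f"invalid side: {self.side}")
        if self.quantity <= 0:
            raise ValueError("quantity must be positive")

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "BrokerOrderJob":
        # Unknown keys raise TypeError, which surfaces schema drift early.
        return cls(**json.loads(raw))
```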

1. Scheduler

Purpose: Triggers ingestion cycles for tracked companies and sources on a configurable cadence. Polls the symbol registry for active companies and their configured sources, respects per-source polling intervals and backoff windows, coordinates rate limits across source types, and enqueues ingestion jobs for downstream workers. Also runs periodic maintenance: stale document recovery, failed extraction retries, and data retention cleanup.

Entry Point: services.scheduler.app

Tier: Orchestration (no HTTP endpoints)

Configuration

Environment Variable Default Description
POSTGRES_HOST localhost PostgreSQL host
POSTGRES_PORT 5432 PostgreSQL port
POSTGRES_DB stonks Database name
POSTGRES_USER stonks Database user
POSTGRES_PASSWORD stonks_dev Database password
REDIS_HOST localhost Redis host
REDIS_PORT 6379 Redis port
REDIS_PASSWORD (none) Redis password
LOG_LEVEL INFO Logging level
JSON_LOGS true Structured JSON logging
PIPELINE_DEFAULT_OFF (none) If true, initializes the pipeline toggle to OFF on first boot

Database Tables

Table Access Purpose
sources Read Active sources with polling config
companies Read Active companies and tickers
company_aliases Read Aliases for entity matching
ingestion_runs Read Last run status, timing, retry state
documents Read/Write Stale document recovery, status reset
document_intelligence Write (delete) Clear failed extractions for retry
competitive_signal_records Write (delete) Retention cleanup
trading_decisions Write (delete) Retention cleanup
risk_evaluations Write (delete) Retention cleanup
audit_events Write (delete) Retention cleanup
macro_impact_records Write (delete) Retention cleanup
recommendation_evidence Write (delete) Retention cleanup
recommendations Write (delete) Retention cleanup
order_events Write (delete) Retention cleanup
model_performance_metrics Write (delete) Retention cleanup

Redis Queues

Direction Queue Purpose
Publish stonks:queue:ingestion Enqueue ingestion jobs for due sources
Read stonks:pipeline:enabled Pipeline toggle (skip cycle if "0")
Read/Write stonks:lock:scheduler_cycle Distributed lock for single-writer
Read/Write stonks:ratelimit:* Per-source-type and global Polygon rate limits
Read/Write stonks:queue:enqueued:* Dedup markers for recovery re-enqueue
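The stonks:ratelimit:* keys back per-type limits plus a shared Polygon budget. Per the Key Behaviors list below, these are market_api: 20/min and 45/min globally across market_api and news_api. A fixed-window counter is one plausible shape for this. The in-memory sketch below is an assumption: a dict stands in for Redis counters with expiry, and the key names are illustrative.

```python
import time
from collections import defaultdict
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """In-memory stand-in for per-minute stonks:ratelimit:* counters.

    Assumption: one counter per (key, window); in Redis this would be an
    increment on a key that expires when the window ends. Old windows are
    not pruned here.
    """

    def __init__(self, limits: Dict[str, int], window_seconds: int = 60,
                 clock: Callable[[], float] = time.time) -> None:
        self.limits = limits
        self.window = window_seconds
        self.clock = clock
        self.counts: Dict[Tuple[str, int], int] = defaultdict(int)

    def try_acquire(self, *keys: str) -> bool:
        """Consume one slot from every key, or from none if any is exhausted."""
        bucket = int(self.clock() // self.window)
        if any(self.counts[(k, bucket)] >= self.limits[k] for k in keys):
            return False
        for k in keys:
            self.counts[(k, bucket)] += 1
        return True
```

A market_api fetch would call `try_acquire("market_api", "polygon_global")`, so one request is charged against both its own limit and the shared Polygon budget.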

Key Behaviors

  • Polling cadences: Default intervals per source type — market_api: 300s, news_api: 300s, filings_api: 3600s, web_scrape: 1800s, broker: 30s, macro_news: 600s. Overridable per-source via config.polling_interval_seconds.
  • Rate limiting: Per-type limits (e.g., market_api: 20/min) plus a global Polygon limit of 45/min across market_api + news_api.
  • Backoff: Exponential backoff on failures (base 60s, max 3600s, max 10 retries).
  • Stale document recovery: Every ~5 minutes, re-enqueues documents stuck in parsed status for >240 minutes.
  • Failed extraction retry: Every ~10 minutes, re-enqueues extraction_failed documents older than 60 minutes.
  • Data retention cleanup: Every ~25 minutes, deletes old rows from 10 tables, with configurable retention windows of 14 to 90 days.
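The cadence and backoff rules above can be combined into a single "is this source due?" check. This is a sketch under stated assumptions. The doubling schedule (60s, 120s, 240s, ... capped at 3600s) and the rule "wait for the longer of the polling interval and the backoff" are guesses at how the documented parameters combine. Stopping after 10 failures is also assumed.

```python
from datetime import datetime, timedelta
from typing import Optional

# Default polling cadences from the Key Behaviors list (seconds).
DEFAULT_INTERVALS = {
    "market_api": 300, "news_api": 300, "filings_api": 3600,
    "web_scrape": 1800, "broker": 30, "macro_news": 600,
}
BACKOFF_BASE_S, BACKOFF_MAX_S, MAX_RETRIES = 60, 3600, 10


def backoff_seconds(consecutive_failures: int) -> int:
    """Assumed doubling schedule: 60s, 120s, 240s, ... capped at 3600s."""
    if consecutive_failures <= 0:
        return 0
    return min(BACKOFF_BASE_S * 2 ** (consecutive_failures - 1), BACKOFF_MAX_S)


def is_due(source_type: str, config: dict, last_run: Optional[datetime],
           consecutive_failures: int, now: datetime) -> bool:
    """Decide whether a source should be enqueued on this scheduler cycle."""
    if consecutive_failures >= MAX_RETRIES:
        return False  # assumed: stop retrying until the failure count is reset
    if last_run is None:
        return True  # never polled before
    interval = config.get("polling_interval_seconds",
                          DEFAULT_INTERVALS[source_type])
    wait = max(interval, backoff_seconds(consecutive_failures))
    return now - last_run >= timedelta(seconds=wait)
```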

2. Symbol Registry

Purpose: Manages the tracked universe of companies, their aliases, watchlists, data sources, exposure profiles, and competitor relationships. Provides a CRUD API for the symbol registry used by all other services.

Entry Point: services.symbol_registry.app (FastAPI)

Tier: API (HTTP endpoints, see API Reference)

Configuration

Environment Variable Default Description
POSTGRES_HOST localhost PostgreSQL host
POSTGRES_PORT 5432 PostgreSQL port
POSTGRES_DB stonks Database name
POSTGRES_USER stonks Database user
POSTGRES_PASSWORD stonks_dev Database password
LOG_LEVEL INFO Logging level
JSON_LOGS true Structured JSON logging

Database Tables

Table Access Purpose
companies CRUD Tracked companies (ticker, legal name, sector, industry)
company_aliases CRUD Alternative names for entity matching
watchlists CRUD Named groups of companies
watchlist_members CRUD Company membership in watchlists
sources CRUD Data source configurations per company
exposure_profiles CRUD Geographic/commodity exposure for macro interpolation
competitor_relationships CRUD Competitor pairs with relationship type and strength

Redis Queues

None — this service is purely HTTP-driven.


3. Ingestion

Purpose: Fetches raw data from external sources (Polygon market data, Polygon news, SEC EDGAR filings, web scraping, Alpaca broker, macro news). Stores raw payloads in MinIO, deduplicates content, persists document metadata to PostgreSQL, and enqueues new documents for parsing.

Entry Point: services.ingestion.worker

Tier: Pipeline (queue-driven worker)

Configuration

Environment Variable Default Description
POSTGRES_* (see shared) PostgreSQL connection
REDIS_* (see shared) Redis connection
MINIO_ENDPOINT localhost:9000 MinIO endpoint
MINIO_ACCESS_KEY minioadmin MinIO access key
MINIO_SECRET_KEY minioadmin MinIO secret key
MARKET_DATA_API_KEY (empty) Polygon.io API key
MARKET_DATA_BASE_URL https://api.polygon.io Polygon base URL
BROKER_API_KEY (none) Alpaca API key
BROKER_API_SECRET (none) Alpaca API secret
BROKER_BASE_URL (none) Alpaca base URL
BROKER_MODE paper Broker mode (paper or live)
LOG_LEVEL INFO Logging level

Database Tables

Table Access Purpose
ingestion_runs Write Record each ingestion attempt with status
documents Write Persist document metadata
document_company_mentions Write Link documents to mentioned companies
sources Write Update last_published_at in source config

Redis Queues

Direction Queue Purpose
Consume stonks:queue:ingestion Receive ingestion jobs from scheduler
Publish stonks:queue:parsing Enqueue newly stored documents for parsing
Read/Write stonks:dedupe:* Content hash deduplication markers (24h TTL)
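The content-hash deduplication with a 24h TTL can be sketched as "hash the content, then record a marker only if none exists yet". In this sketch, SHA-256 over whitespace-normalized text and the exact key suffix are assumptions. A dict with expiry times stands in for Redis. The Redis equivalent would be `SET key 1 NX EX 86400`.

```python
import hashlib
import time
from typing import Callable, Dict

DEDUPE_TTL_SECONDS = 24 * 60 * 60  # 24h, per the table above


def content_hash(text: str) -> str:
    """Assumed: SHA-256 over whitespace-normalized content."""
    normalized = " ".join(text.split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


class DedupeStore:
    """In-memory stand-in for stonks:dedupe:* markers with a TTL."""

    def __init__(self, clock: Callable[[], float] = time.time) -> None:
        self.clock = clock
        self.expires: Dict[str, float] = {}

    def first_seen(self, text: str) -> bool:
        """Return True and set a marker if this content is new within the TTL."""
        key = f"stonks:dedupe:{content_hash(text)}"  # hypothetical key layout
        now = self.clock()
        if self.expires.get(key, 0.0) > now:
            return False  # duplicate: marker still live
        self.expires[key] = now + DEDUPE_TTL_SECONDS
        return True
```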

MinIO Buckets

  • stonks-raw-market — Raw market data JSON
  • stonks-raw-news — Raw news article JSON
  • stonks-raw-filings — Raw SEC filing data
  • stonks-normalized — Normalized text (written by parser)

Adapters

Source Type Adapter Class External API
market_api PolygonMarketAdapter Polygon.io
news_api PolygonNewsAdapter Polygon.io
filings_api SECEdgarAdapter SEC EDGAR
web_scrape WebScrapeAdapter Direct HTTP
broker AlpacaBrokerAdapter Alpaca
macro_news MacroNewsAdapter Polygon.io

4. Parser

Purpose: Converts raw HTML/text into normalized, quality-scored documents. Uses BeautifulSoup for HTML parsing, extracts metadata (title, author, publisher, canonical URL), detects company mentions via alias matching, computes quality scores, and routes documents to the appropriate extraction queue.

Entry Point: services.parser.worker

Tier: Pipeline (queue-driven worker)

Configuration

Environment Variable Default Description
POSTGRES_* (see shared) PostgreSQL connection
REDIS_* (see shared) Redis connection
MINIO_* (see shared) MinIO connection
LOG_LEVEL INFO Logging level

Database Tables

Table Access Purpose
documents Read/Write Fetch URL, update parse results and status
company_aliases Read Alias lookup for company mention detection
companies Read Ticker and legal name for mention detection
document_company_mentions Write Persist detected company mentions

Redis Queues

Direction Queue Purpose
Consume stonks:queue:parsing Receive documents from ingestion
Publish stonks:queue:extraction Enqueue standard documents for LLM extraction
Publish stonks:queue:macro_classification Route macro_event documents to event classifier

MinIO Buckets

  • stonks-normalized — Normalized text output
  • stonks-audit — Structured parser output JSON (metadata, quality signals, mentions)

Key Behaviors

  • Fetches article HTML via httpx if a URL is available
  • Enriches short articles (<500 chars) with Polygon description from raw payload
  • Quality scoring with confidence levels (high, medium, low)
  • Low-quality documents (confidence = "low") are marked but not sent for extraction
  • Routes macro_event documents to the macro classification queue instead of standard extraction
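The enrichment and routing rules above reduce to two small decisions. In this hypothetical sketch, the "description" key in the raw payload and the check order are assumptions. In particular, it assumes low-confidence documents stop before the macro/standard split is applied.

```python
from typing import Optional

SHORT_ARTICLE_CHARS = 500  # articles shorter than this get enriched


def enrich_text(text: str, raw_payload: dict) -> str:
    """Append the Polygon description when the article body is short."""
    description = raw_payload.get("description") or ""  # assumed payload key
    if len(text) < SHORT_ARTICLE_CHARS and description and description not in text:
        return f"{text}\n\n{description}".strip()
    return text


def route_document(document_type: str, confidence: str) -> Optional[str]:
    """Return the next queue name for a parsed document, or None to stop."""
    if confidence == "low":
        return None  # marked low quality; not sent for extraction
    if document_type == "macro_event":
        return "macro_classification"
    return "extraction"
```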

5. Extractor

Purpose: Performs LLM-based intelligence extraction from documents using Ollama. Handles two pipelines: (1) standard document extraction producing DocumentIntelligence with per-company impact records, and (2) macro event classification producing GlobalEventSchema with company-level macro impact interpolation. Supports AI agent configuration with variant-based A/B testing.

Entry Point: services.extractor.main

Tier: Pipeline (queue-driven worker)

Configuration

Environment Variable Default Description
POSTGRES_* (see shared) PostgreSQL connection
REDIS_* (see shared) Redis connection
MINIO_* (see shared) MinIO connection
OLLAMA_BASE_URL http://localhost:11434 Ollama API endpoint
OLLAMA_MODEL qwen3.5:9b Default LLM model
OLLAMA_TIMEOUT 120 Request timeout (seconds)
OLLAMA_MAX_RETRIES 2 Max retry attempts
MACRO_CONFIDENCE_THRESHOLD 0.4 Minimum confidence for macro event inclusion
LOG_LEVEL INFO Logging level

Database Tables

Table Access Purpose
documents Read/Write Fetch document type, normalized text ref; update status
document_intelligence Write Persist extraction results
document_impact_records Write Per-company impact scores
companies Read Ticker-to-company-id mapping
global_events Write Persist classified macro events
macro_impact_records Write Per-company macro impact scores
exposure_profiles Read Company exposure profiles for interpolation
ai_agents Read Agent configuration (model, prompts)
agent_variants Read Active variant overrides for A/B testing
agent_performance_log Write Performance metrics per extraction
model_performance_metrics Write Extraction quality metrics

Redis Queues

Direction Queue Purpose
Consume stonks:queue:extraction Standard document extraction jobs
Consume stonks:queue:macro_classification Macro event classification jobs
Publish stonks:queue:aggregation Trigger aggregation after extraction

Key Behaviors

  • Alternates between macro and extraction queues (1 macro per 3 jobs) to prevent starvation
  • Resolves agent configuration from DB with 60-second TTL cache (AgentConfigResolver)
  • Supports separate models for document extraction and event classification
  • Token budget enforcement per variant (hourly limit)
  • Input token limit truncation (configurable per variant)
  • Refreshes company map and agent config every 100 jobs
  • Consecutive macro classification failures trigger operator alerts (threshold: 3)
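The "1 macro per 3 jobs" alternation can be expressed as a job-count check with a fallback, so neither queue idles while the other has work. This is a sketch. The function name and the pop callables are hypothetical stand-ins for non-blocking reads from the two Redis queues.

```python
from typing import Callable, Optional

MACRO_EVERY_N_JOBS = 3  # "1 macro per 3 jobs"


def next_job(job_count: int,
             pop_macro: Callable[[], Optional[dict]],
             pop_extraction: Callable[[], Optional[dict]]) -> Optional[dict]:
    """Prefer the macro queue on every third job and the extraction queue
    otherwise, falling back to the other queue when the preferred one is empty."""
    if job_count % MACRO_EVERY_N_JOBS == 0:
        order = (pop_macro, pop_extraction)
    else:
        order = (pop_extraction, pop_macro)
    for pop in order:
        job = pop()
        if job is not None:
            return job
    return None  # both queues empty
```

The fallback matters because a strict ratio would leave the worker idle whenever the preferred queue happens to be empty.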

6. Aggregation

Purpose: Computes rolling-window trend summaries for each company by merging signals from three layers: company-specific document intelligence, macro impact records, and competitive signal propagation. Produces TrendSummary objects with direction, strength, confidence, evidence rankings, and contradiction analysis.

Entry Point: services.aggregation.main

Tier: Pipeline (queue-driven worker)

Configuration

Environment Variable Default Description
POSTGRES_* (see shared) PostgreSQL connection
REDIS_* (see shared) Redis connection
MACRO_SIGNAL_WEIGHT 0.3 Relative weight of macro signals
MACRO_ENABLED true Enable/disable macro signal layer
COMPETITIVE_SIGNAL_WEIGHT 0.2 Relative weight of competitive signals
COMPETITIVE_ENABLED true Enable/disable competitive signal layer
COMPETITIVE_PATTERN_CONFIDENCE_THRESHOLD 0.3 Minimum pattern confidence
COMPETITIVE_PROPAGATION_STRENGTH_THRESHOLD 0.2 Minimum propagation strength
COMPETITIVE_PROPAGATION_FAILURE_THRESHOLD 5 Consecutive failures before alert
LOG_LEVEL INFO Logging level

Database Tables

Table Access Purpose
document_impact_records Read Company-specific signals from extraction
document_intelligence Read Extraction metadata and confidence
documents Read Document publication dates and status
macro_impact_records Read Macro impact scores per company
global_events Read Source document for macro events
competitive_signal_records Read/Write Competitive signals targeting a ticker
competitor_relationships Read Competitor pairs for signal propagation
trend_windows Write (upsert) Current trend summaries per ticker/window
trend_history Write Time-series snapshots for charting
trend_evidence Write Detailed evidence rankings per trend
trend_projections Write Forward-looking trend projections
risk_configs Read Runtime toggle state for macro/competitive layers
market_snapshots Read Market context (price, volume, volatility)

Redis Queues

Direction Queue Purpose
Consume stonks:queue:aggregation Receive aggregation jobs from extractor
Publish stonks:queue:recommendation Enqueue recommendation jobs per ticker/window
Read/Write stonks:rec_dedup:* Dedup markers for recommendation queue (5-min TTL)

Key Behaviors

  • Computes trend summaries across 5 windows: intraday, 1d, 7d, 30d, 90d
  • Merges three signal layers via the WeightedSignal abstraction (see Signal Layers)
  • Triggers competitive signal propagation after aggregation when the competitive layer is enabled
  • Reads toggle state from risk_configs table on each cycle (no restart needed)
  • Contradiction detection identifies disagreements between signals
  • Evidence ranking uses composite scoring (weight, impact, recency, confidence)
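Evidence ranking combines weight, impact, recency and confidence into one composite score. The sketch below shows one way to do that. The coefficients and the 72-hour recency half-life are hypothetical placeholders, not the service's actual values.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List


@dataclass
class Evidence:
    document_id: str
    weight: float        # signal weight, 0..1
    impact: float        # signed impact score, -1..1
    confidence: float    # extraction confidence, 0..1
    published_at: datetime


# Hypothetical blend; the real coefficients are not documented here.
COEFFS = {"weight": 0.3, "impact": 0.3, "recency": 0.2, "confidence": 0.2}
RECENCY_HALF_LIFE_HOURS = 72.0


def recency(published_at: datetime, now: datetime) -> float:
    """Exponential decay: 1.0 when fresh, 0.5 after one half-life."""
    age_h = max((now - published_at).total_seconds() / 3600.0, 0.0)
    return 0.5 ** (age_h / RECENCY_HALF_LIFE_HOURS)


def composite_score(e: Evidence, now: datetime) -> float:
    return (COEFFS["weight"] * e.weight
            + COEFFS["impact"] * abs(e.impact)  # magnitude, either direction
            + COEFFS["recency"] * recency(e.published_at, now)
            + COEFFS["confidence"] * e.confidence)


def rank_evidence(items: List[Evidence], now: datetime) -> List[Evidence]:
    """Highest composite score first."""
    return sorted(items, key=lambda e: composite_score(e, now), reverse=True)
```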

7. Recommendation

Purpose: Generates actionable trading recommendations from trend summaries. Builds a thesis with supporting evidence, classifies risk level, determines action (buy/sell/hold/watch) and mode (informational/paper_eligible/live_eligible), and optionally rewrites the thesis using an LLM.

Entry Point: services.recommendation.main

Tier: Pipeline (queue-driven worker)

Configuration

Environment Variable Default Description
POSTGRES_* (see shared) PostgreSQL connection
REDIS_* (see shared) Redis connection
MINIO_* (see shared) MinIO connection
OLLAMA_BASE_URL http://localhost:11434 Ollama API (for thesis rewriting)
OLLAMA_MODEL qwen3.5:9b Default model
LOG_LEVEL INFO Logging level

Database Tables

Table Access Purpose
trend_windows Read Latest trend summaries per ticker/window
trend_projections Read Forward-looking projections
recommendations Write Persist generated recommendations
recommendation_evidence Write Link recommendations to source documents
ai_agents Read Thesis rewriter agent config
agent_variants Read Active variant for thesis rewriter

Redis Queues

Direction Queue Purpose
Consume stonks:queue:recommendation Receive recommendation jobs from aggregation

Key Behaviors

  • Resolves thesis-rewriter agent config from DB (60-second TTL cache)
  • Refreshes agent config every 50 jobs
  • Deduplicates recommendations to avoid generating identical entries
  • Classifies recommendations into risk tiers based on confidence and signal strength
  • Mode classification: informational (low confidence), paper_eligible (medium), live_eligible (high)
  • Pattern-only and macro-only trend shifts are forced to informational mode (suppression safety)
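The mode rules above can be read as a confidence ladder with a suppression override. In this sketch the 0.5 and 0.8 cut-offs and the layer labels are hypothetical. The document only names the three modes and the pattern-only/macro-only rule.

```python
from typing import Set

# Hypothetical confidence cut-offs; the document only names the tiers.
PAPER_ELIGIBLE_MIN = 0.5
LIVE_ELIGIBLE_MIN = 0.8


def classify_mode(confidence: float, contributing_layers: Set[str]) -> str:
    """Map confidence to a recommendation mode.

    Shifts driven only by historical patterns or only by macro signals are
    forced to informational (suppression safety), whatever their confidence.
    """
    if contributing_layers in ({"pattern"}, {"macro"}):
        return "informational"
    if confidence >= LIVE_ELIGIBLE_MIN:
        return "live_eligible"
    if confidence >= PAPER_ELIGIBLE_MIN:
        return "paper_eligible"
    return "informational"
```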

8. Trading Engine

Purpose: Autonomous trading engine that polls for new recommendations, evaluates position sizing, manages circuit breakers and reserve pools, auto-adjusts risk tiers, runs backtests, and sends notifications. Exposes an HTTP API for engine control, decision audit, performance metrics, backtesting, and manual override orders.

Entry Point: services.trading.app (FastAPI)

Tier: Trading (HTTP endpoints, see API Reference)

Configuration

Environment Variable Default Description
POSTGRES_* (see shared) PostgreSQL connection
REDIS_* (see shared) Redis connection
TRADING_ENABLED false Enable autonomous trading
TRADING_RISK_TIER moderate Risk tier (conservative, moderate, aggressive)
TRADING_RESERVE_SIPHON_PCT 0.20 Fraction of profits siphoned to the reserve pool (0.20 = 20%)
TRADING_POLLING_INTERVAL_SECONDS 60 Recommendation polling interval
TRADING_STOP_LOSS_CHECK_INTERVAL_SECONDS 300 Stop-loss monitoring interval
TRADING_FAST_STOP_LOSS_INTERVAL_SECONDS 60 Fast stop-loss check interval
TRADING_GRADUAL_ENTRY_TRANCHES 3 Number of tranches for gradual entry
TRADING_GRADUAL_ENTRY_THRESHOLD_DOLLARS 30.0 Minimum order value for gradual entry
TRADING_ABSOLUTE_POSITION_CAP 50.0 Maximum position size (dollars)
TRADING_ACTIVE_POOL_MINIMUM 100.0 Minimum active pool balance
TRADING_EMERGENCY_DRAWDOWN_THRESHOLD_PCT 0.40 Emergency drawdown threshold
TRADING_RESERVE_HIGH_WATER_PCT 0.30 Reserve pool high-water mark
TRADING_MICRO_TRADING_ENABLED false Enable micro-trading mode
TRADING_MICRO_TRADING_INTERVAL_SECONDS 300 Micro-trading polling interval
TRADING_MICRO_TRADING_ALLOCATION_CAP_PCT 0.03 Max allocation per micro-trade
TRADING_MICRO_TRADING_MAX_DAILY 10 Max micro-trades per day
TRADING_MICRO_TRADING_MAX_HOLD_MINUTES 120 Max hold time for micro-trades
TRADING_MAX_OPEN_POSITIONS 10 Maximum concurrent open positions
TRADING_SNS_TOPIC_ARN (empty) AWS SNS topic ARN for SMS notifications
TRADING_SNS_PHONE_NUMBER (empty) Phone number for SMS notifications
TRADING_GMAIL_SENDER (empty) Gmail sender address
TRADING_GMAIL_RECIPIENT (empty) Gmail recipient address
BROKER_API_KEY (none) Alpaca API key
BROKER_API_SECRET (none) Alpaca API secret
BROKER_BASE_URL (none) Alpaca base URL
SYMBOL_REGISTRY_URL http://symbol-registry:8000 Symbol registry service URL
LOG_LEVEL INFO Logging level

Database Tables

Table Access Purpose
recommendations Read Poll for new actionable recommendations
trading_decisions Write Persist every decision (act/skip) with full trace
portfolio_snapshots Read/Write Daily portfolio performance snapshots
reserve_pool_ledger Read/Write Reserve pool transactions
risk_tier_history Read/Write Risk tier change audit trail
circuit_breaker_events Read/Write Circuit breaker trigger/reset events
positions Read Current open positions
position_stop_levels Read/Write Stop-loss and take-profit levels
orders Read Order history for dedup
backtest_runs Read/Write Backtest configuration and results
backtest_trades Read/Write Individual trades within a backtest
notifications Write Notification history

Redis Queues

Direction Queue Purpose
Publish stonks:queue:broker_orders Submit orders to broker adapter
Read/Write stonks:dedupe:trading:* Recommendation dedup markers
Read/Write stonks:trading:circuit_breaker:* Circuit breaker state
Read/Write stonks:trading:notification_rate:* Notification rate limiting

See Trading Engine Features for detailed feature documentation.


9. Risk Engine

Purpose: Evaluates proposed orders against portfolio risk rules and manages an operator approval workflow. Provides an HTTP API for order evaluation, pending approval listing, approval review, and expiration of stale approvals.

Entry Point: services.risk.app (FastAPI)

Tier: Trading (HTTP endpoints, see API Reference)

Configuration

Environment Variable Default Description
POSTGRES_* (see shared) PostgreSQL connection
LOG_LEVEL INFO Logging level

Database Tables

Table Access Purpose
risk_configs Read Active risk configuration
risk_evaluations Write Persist evaluation results
approval_requests Read/Write Operator approval workflow
daily_risk_snapshots Read Daily portfolio risk state

Redis Queues

None — called synchronously by the broker adapter and via HTTP.

Key Behaviors

  • Evaluates orders against configurable risk checks (position limits, sector concentration, daily loss limits, portfolio heat)
  • Returns eligible / not eligible with detailed check results and rejection reasons
  • Approval workflow: orders requiring approval are held until an operator reviews them
  • Stale approvals can be expired via the /approvals/expire endpoint
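The evaluation contract above returns an eligibility verdict plus per-check results and rejection reasons. It can be sketched as a list of named checks that each record a pass/fail. The limits below are hypothetical; real values come from risk_configs. Portfolio heat is left out for brevity.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class RiskResult:
    eligible: bool
    checks: Dict[str, bool] = field(default_factory=dict)
    reasons: List[str] = field(default_factory=list)


# Hypothetical limits; the service reads its limits from risk_configs.
LIMITS = {"max_position_value": 50.0, "max_sector_pct": 0.40, "max_daily_loss": 25.0}


def evaluate_order(order_value: float, sector: str,
                   sector_exposure: Dict[str, float], portfolio_value: float,
                   realized_loss_today: float) -> RiskResult:
    result = RiskResult(eligible=True)

    def check(name: str, passed: bool, reason: str) -> None:
        # Record every check so callers see the full picture, not just the first failure.
        result.checks[name] = passed
        if not passed:
            result.eligible = False
            result.reasons.append(reason)

    check("position_limit", order_value <= LIMITS["max_position_value"],
          f"order value {order_value:.2f} exceeds position limit")
    new_sector_value = sector_exposure.get(sector, 0.0) + order_value
    check("sector_concentration",
          portfolio_value > 0
          and new_sector_value / portfolio_value <= LIMITS["max_sector_pct"],
          f"{sector} exposure would exceed {LIMITS['max_sector_pct']:.0%}")
    check("daily_loss", realized_loss_today < LIMITS["max_daily_loss"],
          "daily loss limit reached")
    return result
```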

10. Broker Adapter

Purpose: Processes order requests from the broker queue, evaluates them through the risk engine, submits them to Alpaca's paper trading API, and persists the full audit trail. Order submission is idempotent: duplicates are detected with a Redis fast path backed by a durable PostgreSQL check. Periodically syncs positions and order statuses from Alpaca.

Entry Point: services.adapters.broker_service

Tier: Trading (queue-driven worker)

Configuration

Environment Variable Default Description
POSTGRES_* (see shared) PostgreSQL connection
REDIS_* (see shared) Redis connection
MINIO_* (see shared) MinIO connection (for lake publishing)
BROKER_API_KEY (none) Alpaca API key
BROKER_API_SECRET (none) Alpaca API secret
BROKER_BASE_URL (none) Alpaca base URL
BROKER_MODE paper Trading mode (paper or live)
LOG_LEVEL INFO Logging level

Database Tables

Table Access Purpose
orders Write Persist submitted/rejected/filled orders
order_events Write Order lifecycle events (submitted, fill, rejected)
risk_evaluations Write Risk evaluation results per order
positions Write (upsert) Sync positions from Alpaca
broker_accounts Write (upsert) Register/update broker account
daily_risk_snapshots Read Daily portfolio state for risk evaluation
risk_configs Read Active risk configuration
approval_requests Write Create approval requests for gated orders
audit_events Write Full audit trail

Redis Queues

Direction Queue Purpose
Consume stonks:queue:broker_orders Receive order jobs from trading engine
Read/Write stonks:order_idempotency:* Idempotency markers (24h TTL)

Key Behaviors

  • Idempotent submission: Deterministic key generation from job attributes; Redis fast-path + PostgreSQL durable fallback
  • Risk evaluation: Every order is evaluated through the risk engine before submission
  • Approval gate: Orders requiring operator approval are held (not submitted) until reviewed
  • Position sync: Every 60 seconds, syncs positions and order statuses from Alpaca
  • Lake publishing: Publishes order facts, fill facts, and position snapshots to the analytical lake
  • Audit trail: Records every step (risk evaluation, submission, fill, rejection, duplicate prevention)
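Deterministic key generation plus a two-tier duplicate check can be sketched as follows. The choice of hashed fields and the callable that stands in for the PostgreSQL lookup are hypothetical. A set stands in for the TTL-bound stonks:order_idempotency:* markers.

```python
import hashlib
import json
from typing import Callable, Set

# Hypothetical choice of job attributes that define "the same order".
KEY_FIELDS = ("ticker", "side", "quantity", "order_type", "limit_price",
              "stop_price", "recommendation_id")


def idempotency_key(job: dict) -> str:
    """Use an explicit key if the producer supplied one, else hash stable fields."""
    if job.get("idempotency_key"):
        return job["idempotency_key"]
    # sort_keys makes the hash independent of dict ordering.
    canonical = json.dumps({f: job.get(f) for f in KEY_FIELDS}, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def is_duplicate(key: str, redis_markers: Set[str],
                 durable_lookup: Callable[[str], bool]) -> bool:
    """Check the fast (TTL-bound) markers first, then the durable store."""
    if key in redis_markers:
        return True
    if durable_lookup(key):
        redis_markers.add(key)  # re-warm the fast path after a marker expired
        return True
    return False
```

The durable fallback covers the case where the 24h Redis marker has expired or Redis was flushed. Without it, a replayed job could be submitted twice.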

11. Lake Publisher

Purpose: Transforms operational data from PostgreSQL into analytical Parquet fact tables stored in MinIO. Supports 14 job types covering documents, extractions, market data, orders, fills, positions, PnL, global events, macro impacts, trend projections, competitor relationships, competitive signals, and bulk document and extraction jobs. Data is queryable via Trino and visualized in Superset and the React dashboard.

Entry Point: services.lake_publisher.jobs

Tier: Analytics (queue-driven worker)

Configuration

Environment Variable Default Description
POSTGRES_* (see shared) PostgreSQL connection
REDIS_* (see shared) Redis connection
MINIO_* (see shared) MinIO connection
LOG_LEVEL INFO Logging level

Database Tables

Table Access Purpose
documents Read Document metadata for fact publishing
document_intelligence Read Extraction results
document_impact_records Read Per-company impact scores
market_snapshots Read Market bar/quote data
orders Read Trade orders
order_events Read Fill events
positions Read Current positions
broker_accounts Read Broker account metadata
global_events Read Macro event classifications
macro_impact_records Read Macro impact scores
trend_projections Read Trend projections
competitor_relationships Read Competitor pairs
competitive_signal_records Read Competitive signals
companies Read Company names for enrichment

Redis Queues

Direction Queue Purpose
Consume stonks:queue:lake_publish Receive lake publish jobs

MinIO Buckets

  • stonks-lakehouse — Partitioned Parquet fact tables

Supported Job Types

document, document_extraction, market_snapshot, trade_order, trade_fill, positions_snapshot, pnl_snapshot, global_event, macro_impact, trend_projection, competitor_relationship, competitive_signal, bulk_documents, bulk_extractions
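A publisher loop would typically validate each job before dispatching it, then write to a date-partitioned location. In this sketch, the requirement that bulk jobs carry "since" follows the schema note above. The Hive-style dt= layout and the s3:// URI form are hypothetical illustrations of a partition layout inside stonks-lakehouse.

```python
from datetime import datetime

SUPPORTED_JOB_TYPES = frozenset({
    "document", "document_extraction", "market_snapshot", "trade_order",
    "trade_fill", "positions_snapshot", "pnl_snapshot", "global_event",
    "macro_impact", "trend_projection", "competitor_relationship",
    "competitive_signal", "bulk_documents", "bulk_extractions",
})
BULK_JOB_TYPES = frozenset({"bulk_documents", "bulk_extractions"})


def validate_job(job: dict) -> None:
    """Raise ValueError for unknown job types or bulk jobs missing 'since'."""
    job_type = job.get("job_type")
    if job_type not in SUPPORTED_JOB_TYPES:
        raise ValueError(f"unsupported job_type: {job_type!r}")
    if job_type in BULK_JOB_TYPES and not job.get("since"):
        raise ValueError(f"{job_type} requires 'since'")


def partition_path(fact_table: str, dt_iso: str) -> str:
    """Hypothetical Hive-style layout: <table>/dt=YYYY-MM-DD/ in stonks-lakehouse."""
    day = datetime.fromisoformat(dt_iso).date().isoformat()
    return f"s3://stonks-lakehouse/{fact_table}/dt={day}/"
```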


12. Query API

Purpose: Primarily read-only FastAPI service for analytics, evidence drill-down, and admin controls (admin features such as pipeline control, dead-letter replay, and saved queries modify state). Serves the React dashboard and external integrations with endpoints for companies, documents, trends, recommendations, orders, positions, portfolio metrics, global events, macro impacts, competitive signals, trend projections, AI agents, dead-letter queues, pipeline control, SQL explorer, saved queries, audit trail, DevOps metrics, and Prometheus metrics.

Entry Point: services.api.app (FastAPI)

Tier: API (HTTP endpoints, see API Reference)

Configuration

Environment Variable Default Description
POSTGRES_* (see shared) PostgreSQL connection
REDIS_* (see shared) Redis connection
MINIO_* (see shared) MinIO connection
TRINO_HOST localhost Trino host
TRINO_PORT 8080 Trino port
TRINO_CATALOG lakehouse Trino catalog
TRINO_SCHEMA stonks Trino schema
LOG_LEVEL INFO Logging level

Database Tables

The Query API reads from nearly all tables in the database, including:

Table Purpose
companies, company_aliases Company listings and details
sources Source configurations
documents, document_company_mentions Document timelines
document_intelligence, document_impact_records Intelligence extraction results
trend_windows, trend_history, trend_projections Trend summaries and projections
recommendations, recommendation_evidence Recommendation history with evidence
risk_evaluations Risk evaluation results
orders, order_events Order history and lifecycle
positions, portfolio_snapshots Portfolio state
global_events, macro_impact_records Macro event data
competitive_signal_records, competitor_relationships Competitive intelligence
ai_agents, agent_variants, agent_performance_log AI agent management
audit_events Audit trail
market_snapshots Market price data
watchlists, watchlist_members Watchlist data

Redis Queues

Direction Queue Purpose
Read/Write stonks:pipeline:enabled Pipeline toggle control
Read stonks:queue:* Queue depth monitoring for DLQ and DevOps metrics
Read stonks:dlq:* Dead-letter queue inspection and replay

Key Behaviors

  • Exposes /metrics endpoint for Prometheus scraping
  • Trace context propagation via x-trace-id header middleware
  • SQL explorer endpoint for ad-hoc Trino queries
  • Dead-letter queue management (list, inspect, replay)
  • Pipeline control (enable/disable via Redis toggle)
  • Saved queries with CRUD operations
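Trace context propagation via the x-trace-id header can be done with a small pure-ASGI middleware. It reuses an incoming ID or mints one, exposes it through a context variable for logging, and echoes it on the response. This is a sketch rather than the service's middleware. The class and variable names are hypothetical. With Starlette or FastAPI, a class of this shape can typically be registered via `app.add_middleware(TraceIdMiddleware)`.

```python
import contextvars
import uuid

# Hypothetical context variable that log formatters could read.
trace_id_var: contextvars.ContextVar = contextvars.ContextVar("trace_id", default="")


class TraceIdMiddleware:
    """Reuse an incoming x-trace-id header or mint one, and echo it back."""

    def __init__(self, app, header: bytes = b"x-trace-id") -> None:
        self.app = app
        self.header = header  # ASGI header names are lowercase bytes

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        incoming = dict(scope.get("headers", [])).get(self.header)
        trace_id = incoming.decode("latin-1") if incoming else uuid.uuid4().hex
        token = trace_id_var.set(trace_id)

        async def send_with_trace(message):
            # Attach the trace ID to the response headers.
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                headers.append((self.header, trace_id.encode("latin-1")))
                message = {**message, "headers": headers}
            await send(message)

        try:
            await self.app(scope, receive, send_with_trace)
        finally:
            trace_id_var.reset(token)  # avoid leaking the ID into other requests
```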

13. Dashboard

Purpose: React frontend serving the web dashboard via nginx. Provides a visual interface for monitoring the platform: company overviews, document timelines, trend charts, recommendation cards, trading performance, portfolio state, and administrative controls.

Entry Point: frontend/Dockerfile (nginx on port 8080)

Tier: Frontend

Configuration

The dashboard is a static React build served by nginx. It proxies API requests:

Proxy Path Backend Service
/api/ Query API (port 8000)
/registry/ Symbol Registry (port 8000)
/risk/ Risk Engine (port 8000)

Technology Stack

  • React 19, TypeScript (strict mode)
  • Tailwind CSS for styling
  • TanStack Router and TanStack Query for routing and data fetching
  • Recharts for data visualization
  • MSW (Mock Service Worker) for testing

Signal Layers

The aggregation engine merges three independent signal layers into a unified WeightedSignal abstraction. The macro and competitive layers can be toggled at runtime via the risk_configs table (no service restart required); the company-specific layer is always active.

Layer 1: Company-Specific Signals

Data flow: Documents → Ingestion → Parsing → Extraction → document_impact_records → WeightedSignal → trend_windows

  • Source: document_impact_records joined with document_intelligence and documents
  • Signals include: sentiment, impact score, catalyst type, confidence, novelty, source credibility
  • Weight computation factors: recency decay, source credibility, novelty score, extraction confidence
  • Always active (no toggle — this is the base layer)
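The four weight factors listed for this layer can be combined multiplicatively, so a weak factor pulls the whole weight down. This is a sketch. The multiplicative form, the [0, 1] clamping and the 48-hour recency half-life are all assumptions; the document names the factors but not the formula.

```python
from datetime import datetime

HALF_LIFE_HOURS = 48.0  # hypothetical recency half-life


def _clamp01(x: float) -> float:
    return min(max(x, 0.0), 1.0)


def company_signal_weight(published_at: datetime, now: datetime,
                          source_credibility: float, novelty: float,
                          extraction_confidence: float) -> float:
    """Multiply recency decay by credibility, novelty and confidence."""
    age_hours = max((now - published_at).total_seconds() / 3600.0, 0.0)
    recency = 0.5 ** (age_hours / HALF_LIFE_HOURS)  # 1.0 fresh, 0.5 at half-life
    return (recency * _clamp01(source_credibility) * _clamp01(novelty)
            * _clamp01(extraction_confidence))
```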

Layer 2: Macro Signals

Data flow: Macro news → Ingestion → Parsing → Event Classification (global_events) → Macro Interpolation (macro_impact_records) → WeightedSignal → trend_windows

  • Source: macro_impact_records joined with global_events
  • Each global event is classified by severity, affected regions/sectors/commodities, and estimated duration
  • Impact interpolation uses company exposure profiles (exposure_profiles) to compute per-company macro impact scores
  • Signals are weighted by MACRO_SIGNAL_WEIGHT (default: 0.3)

Toggle: macro_enabled in risk_configs table (default: true)

  • When disabled: ingestion and classification continue (data preserved), but macro signals are excluded from aggregation
  • When re-enabled: resumes using the most recent events, including those classified while disabled
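The toggles and relative weights can be illustrated as a weighted average of per-layer scores. A disabled or empty layer simply drops out of the average, and its stored data is untouched. This is a sketch under assumptions. It treats the company layer as weight 1.0, uses MACRO_SIGNAL_WEIGHT and COMPETITIVE_SIGNAL_WEIGHT as relative weights, and averages each layer's scores first. The actual WeightedSignal merge may differ.

```python
from typing import Dict, List


def merge_layers(scores: Dict[str, List[float]], macro_enabled: bool = True,
                 competitive_enabled: bool = True, macro_weight: float = 0.3,
                 competitive_weight: float = 0.2) -> float:
    """Weighted average of per-layer mean scores; disabled/empty layers drop out."""
    layer_weights = {"company": 1.0}  # assumed base weight for the company layer
    if macro_enabled:
        layer_weights["macro"] = macro_weight
    if competitive_enabled:
        layer_weights["competitive"] = competitive_weight
    total, weight_sum = 0.0, 0.0
    for layer, w in layer_weights.items():
        values = scores.get(layer) or []
        if values and w > 0:
            total += w * sum(values) / len(values)
            weight_sum += w
    return total / weight_sum if weight_sum else 0.0
```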

Weight configuration:

| Environment Variable | Default | Description |
| --- | --- | --- |
| MACRO_SIGNAL_WEIGHT | 0.3 | Relative weight of macro signals versus company signals |
| MACRO_ENABLED | true | Runtime toggle (also overridable via the risk_configs DB table) |
| MACRO_CONFIDENCE_THRESHOLD | 0.4 | Minimum confidence for event inclusion |
| MACRO_SHORT_TERM_STALENESS_HOURS | 48 | Staleness window (hours) for accelerated decay of short-term events |
| PROJECTION_CONFIDENCE_THRESHOLD | 0.3 | Minimum confidence for projections |
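The following is a minimal sketch of how the toggle and threshold above might be resolved. It makes two assumptions:

- A non-null value in risk_configs takes precedence over the environment variable, as the "also overridable" note suggests.
- "Minimum confidence" means an inclusive comparison.

The layer_enabled and passes_macro_threshold helpers are hypothetical. The same layer_enabled helper could resolve competitive_enabled against COMPETITIVE_ENABLED.

```python
import os
from typing import Mapping, Optional


def _parse_bool(raw: str) -> bool:
    """Interpret common truthy spellings found in environment variables."""
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def layer_enabled(toggle: str, env_var: str,
                  db_overrides: Mapping[str, Optional[bool]],
                  default: bool = True) -> bool:
    """A non-null risk_configs value wins; otherwise fall back to the env variable."""
    db_value = db_overrides.get(toggle)
    if db_value is not None:
        return db_value
    raw = os.environ.get(env_var)
    return default if raw is None else _parse_bool(raw)


def passes_macro_threshold(event_confidence: float) -> bool:
    """Keep a classified event only if its confidence meets MACRO_CONFIDENCE_THRESHOLD."""
    threshold = float(os.environ.get("MACRO_CONFIDENCE_THRESHOLD", "0.4"))
    return event_confidence >= threshold
```

The sketch re-reads the toggle on every call, so flipping macro_enabled in risk_configs takes effect on the next aggregation pass without a restart.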

Layer 3: Competitive Signals

Data flow: Document intelligence → Pattern Matcher → Signal Propagation (competitive_signal_records) → WeightedSignal → trend_windows

  • Source: competitive_signal_records targeting a ticker
  • After aggregation completes for a company, the system propagates signals to competitors via competitor_relationships
  • Historical pattern matching (find_self_patterns, find_cross_company_patterns) informs propagation confidence
  • Signals are weighted by COMPETITIVE_SIGNAL_WEIGHT (default: 0.2)

Toggle: competitive_enabled in risk_configs table (default: true)

  • When disabled: signal propagation is skipped; aggregation uses only company + macro signals
  • When re-enabled: resumes propagation using existing competitor relationships

Weight configuration:

| Environment Variable | Default | Description |
| --- | --- | --- |
| COMPETITIVE_SIGNAL_WEIGHT | 0.2 | Relative weight of competitive signals |
| COMPETITIVE_ENABLED | true | Runtime toggle (also overridable via the risk_configs DB table) |
| COMPETITIVE_PATTERN_CONFIDENCE_THRESHOLD | 0.3 | Minimum pattern confidence |
| COMPETITIVE_PROPAGATION_STRENGTH_THRESHOLD | 0.2 | Minimum propagation strength |
| COMPETITIVE_ROUTINE_LOOKBACK_DAYS | 180 | Lookback (days) for routine signal patterns |
| COMPETITIVE_MAJOR_DECISION_LOOKBACK_DAYS | 365 | Lookback (days) for major corporate decisions |
| COMPETITIVE_MAJOR_DECISION_WEIGHT_MULTIPLIER | 1.3 | Weight boost for major decisions |
| COMPETITIVE_STALENESS_WINDOW_DAYS | 180 | Staleness window (days) for pattern decay |
| COMPETITIVE_STALENESS_RECENT_DAYS | 90 | Recent window (days) in which no decay is applied |
| COMPETITIVE_STALENESS_DECAY_PENALTY | 0.5 | Decay penalty for stale patterns |
| COMPETITIVE_MIN_PATTERN_SAMPLES | 3 | Minimum samples for pattern confidence |
| COMPETITIVE_PROPAGATION_FAILURE_THRESHOLD | 5 | Consecutive propagation failures before an operator alert |
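The competitive settings interact, so a sketch of one plausible combination may help. Everything here is an assumption:

- The Pattern record is hypothetical.
- Patterns older than COMPETITIVE_STALENESS_RECENT_DAYS take a flat COMPETITIVE_STALENESS_DECAY_PENALTY.
- Patterns past their lookback are dropped.
- COMPETITIVE_STALENESS_WINDOW_DAYS is not modeled separately, because at the defaults it coincides with the routine lookback.

```python
from dataclasses import dataclass

# Defaults copied from the table above.
ROUTINE_LOOKBACK_DAYS = 180
MAJOR_DECISION_LOOKBACK_DAYS = 365
MAJOR_DECISION_WEIGHT_MULTIPLIER = 1.3
STALENESS_RECENT_DAYS = 90
STALENESS_DECAY_PENALTY = 0.5
MIN_PATTERN_SAMPLES = 3
PATTERN_CONFIDENCE_THRESHOLD = 0.3


@dataclass
class Pattern:
    """Hypothetical historical pattern record used only for this sketch."""
    confidence: float
    age_days: float
    samples: int
    major_decision: bool


def pattern_weight(p: Pattern) -> float:
    """Effective weight of a pattern, or 0.0 when it should not inform propagation."""
    lookback = MAJOR_DECISION_LOOKBACK_DAYS if p.major_decision else ROUTINE_LOOKBACK_DAYS
    if p.age_days > lookback or p.samples < MIN_PATTERN_SAMPLES:
        return 0.0
    if p.confidence < PATTERN_CONFIDENCE_THRESHOLD:
        return 0.0
    weight = p.confidence
    if p.age_days > STALENESS_RECENT_DAYS:
        # Assumed: anything older than the recent window takes a flat penalty.
        weight *= 1.0 - STALENESS_DECAY_PENALTY
    if p.major_decision:
        weight *= MAJOR_DECISION_WEIGHT_MULTIPLIER
    return weight
```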

Safety Mechanisms

  • Pattern-only suppression: Trend shifts driven solely by competitive patterns are forced to informational mode
  • Macro-only suppression: Trend shifts driven solely by macro signals are forced to informational mode
  • Consecutive failure alerting: Both macro classification and competitive propagation track consecutive failures and emit CRITICAL alerts when thresholds are exceeded
  • Graceful degradation: If a signal layer fails, the system continues with the remaining layers
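The layer weights, the toggles, and the two suppression rules can be sketched together. The sketch makes these assumptions:

- The company layer acts as the reference weight of 1.0, so MACRO_SIGNAL_WEIGHT and COMPETITIVE_SIGNAL_WEIGHT are relative to it.
- A failed or empty layer is modeled as None.
- The mode labels "informational" and "actionable" are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional

COMPANY_SIGNAL_WEIGHT = 1.0      # assumption: company layer is the reference weight
MACRO_SIGNAL_WEIGHT = 0.3        # MACRO_SIGNAL_WEIGHT default
COMPETITIVE_SIGNAL_WEIGHT = 0.2  # COMPETITIVE_SIGNAL_WEIGHT default


@dataclass
class LayerScores:
    """Per-layer scores for one ticker; None means the layer produced nothing or failed."""
    company: Optional[float]
    macro: Optional[float]
    competitive: Optional[float]


def blend(scores: LayerScores, macro_on: bool, competitive_on: bool) -> tuple[float, str]:
    """Blend the enabled layers and decide whether the trend shift may drive trades."""
    parts: dict[str, tuple[float, float]] = {}
    if scores.company is not None:
        parts["company"] = (scores.company, COMPANY_SIGNAL_WEIGHT)
    if macro_on and scores.macro is not None:
        parts["macro"] = (scores.macro, MACRO_SIGNAL_WEIGHT)
    if competitive_on and scores.competitive is not None:
        parts["competitive"] = (scores.competitive, COMPETITIVE_SIGNAL_WEIGHT)
    if not parts:
        return 0.0, "informational"
    total_weight = sum(w for _, w in parts.values())
    score = sum(s * w for s, w in parts.values()) / total_weight
    # Pattern-only and macro-only shifts are forced to informational mode.
    single_non_company = set(parts) in ({"macro"}, {"competitive"})
    return score, ("informational" if single_non_company else "actionable")
```

Treating a failed layer as None gives graceful degradation in this sketch: the weighted mean is taken over whichever layers remain.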

Trading Engine Features

Position Sizing

The trading engine uses a multi-factor position sizing algorithm:

  • Risk-tier-based allocation: Each risk tier (conservative, moderate, aggressive) defines maximum position size as a percentage of the active pool
  • Gradual entry: Large positions are split into configurable tranches (TRADING_GRADUAL_ENTRY_TRANCHES, default: 3) to reduce timing risk
  • Absolute position cap: Hard limit on any single position (TRADING_ABSOLUTE_POSITION_CAP, default: $50)
  • Active pool minimum: Orders are rejected if the active pool falls below TRADING_ACTIVE_POOL_MINIMUM (default: $100)
  • Max open positions: Configurable limit on concurrent positions (TRADING_MAX_OPEN_POSITIONS, default: 10)
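The sizing rules can be sketched using the defaults above and the Max Position % row from the Risk Tier Defaults table. The following are assumptions:

- the function name;
- the order in which the caps are applied;
- computing the tier limit against the active pool.

The sketch also splits every order into tranches, whereas the engine splits only large positions.

```python
TIER_MAX_POSITION_PCT = {"conservative": 0.05, "moderate": 0.10, "aggressive": 0.15}
ABSOLUTE_POSITION_CAP = 50.0  # TRADING_ABSOLUTE_POSITION_CAP default, dollars
ACTIVE_POOL_MINIMUM = 100.0   # TRADING_ACTIVE_POOL_MINIMUM default, dollars
MAX_OPEN_POSITIONS = 10       # TRADING_MAX_OPEN_POSITIONS default
ENTRY_TRANCHES = 3            # TRADING_GRADUAL_ENTRY_TRANCHES default


def size_position(active_pool: float, open_positions: int, tier: str) -> list[float]:
    """Return the dollar size of each entry tranche, or [] when the order is rejected."""
    if active_pool < ACTIVE_POOL_MINIMUM or open_positions >= MAX_OPEN_POSITIONS:
        return []
    # Tier limit first, then the hard absolute cap.
    target = min(active_pool * TIER_MAX_POSITION_PCT[tier], ABSOLUTE_POSITION_CAP)
    tranche = round(target / ENTRY_TRANCHES, 2)
    # The last tranche absorbs the rounding remainder so the tranches sum to target.
    last = round(target - tranche * (ENTRY_TRANCHES - 1), 2)
    return [tranche] * (ENTRY_TRANCHES - 1) + [last]
```

For example, a $1,000 active pool on the moderate tier allows $100, which the $50 absolute cap reduces to $50. That amount is then entered as roughly $16.67, $16.67, and $16.66.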

Circuit Breakers

Circuit breakers automatically halt trading when risk thresholds are breached:

| Trigger Type | Description |
| --- | --- |
| daily_loss | Triggered when daily portfolio loss exceeds the emergency drawdown threshold (TRADING_EMERGENCY_DRAWDOWN_THRESHOLD_PCT, default: 40%) |
| single_position | Triggered when a single position loss exceeds configured limits |
| volatility | Triggered during extreme market volatility |
| manual | Operator-triggered via the API |

Circuit breaker events are persisted to circuit_breaker_events and state is tracked in Redis (stonks:trading:circuit_breaker:*).
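The daily_loss check reduces to a percentage comparison. This sketch makes two assumptions:

- The loss is measured against the portfolio value at the start of the trading day.
- The threshold is the 40% default of TRADING_EMERGENCY_DRAWDOWN_THRESHOLD_PCT.

Persistence to circuit_breaker_events and the Redis state keys are omitted. The function name is hypothetical.

```python
from enum import Enum
from typing import Optional


class Trigger(str, Enum):
    """Trigger types from the table above."""
    DAILY_LOSS = "daily_loss"
    SINGLE_POSITION = "single_position"
    VOLATILITY = "volatility"
    MANUAL = "manual"


def daily_loss_trigger(start_of_day_value: float, current_value: float,
                       threshold_pct: float = 40.0) -> Optional[Trigger]:
    """Return DAILY_LOSS when today's loss exceeds threshold_pct percent, else None."""
    if start_of_day_value <= 0:
        return None  # nothing meaningful to compare against
    loss_pct = (start_of_day_value - current_value) / start_of_day_value * 100
    return Trigger.DAILY_LOSS if loss_pct > threshold_pct else None
```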

Reserve Pool

The reserve pool is a capital buffer that protects against drawdowns:

  • Profit siphoning: A configurable percentage of realized profits (TRADING_RESERVE_SIPHON_PCT, default: 20%) is automatically transferred to the reserve pool
  • High-water mark: When the reserve pool exceeds TRADING_RESERVE_HIGH_WATER_PCT (default: 30%) of total portfolio value, excess is returned to the active pool
  • Emergency liquidation: During severe drawdowns, the reserve pool can be used to cover losses
  • Ledger tracking: All reserve pool transactions are recorded in reserve_pool_ledger with trigger type and notes
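The siphon and high-water-mark rules are simple arithmetic. This sketch assumes total portfolio value is the active pool plus the reserve, ignoring open positions. The function names are hypothetical, and ledger writes to reserve_pool_ledger are omitted.

```python
def siphon_profit(realized_profit: float, siphon_pct: float = 0.20) -> float:
    """Amount moved to the reserve pool from one realized profit; losses move nothing."""
    return max(realized_profit, 0.0) * siphon_pct


def apply_high_water_mark(active: float, reserve: float,
                          high_water_pct: float = 0.30) -> tuple[float, float]:
    """Return (active, reserve) after sending reserve above the high-water mark back."""
    total = active + reserve  # assumption: portfolio value = active pool + reserve
    cap = total * high_water_pct
    if reserve > cap:
        return active + (reserve - cap), cap
    return active, reserve
```

For example, with $600 active and $400 in reserve, the high-water mark is 30% of $1,000 ($300), so $100 flows back to the active pool.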

Risk Tier Auto-Adjustment

The engine periodically evaluates portfolio performance and adjusts the risk tier:

  • Tiers: conservative (lowest risk), moderate (default), aggressive (highest risk)
  • Evaluation: Based on recent win rate, drawdown, and portfolio heat
  • History: All tier changes are recorded in risk_tier_history for audit
  • Configuration: Tier can be set manually via PUT /api/trading/config or auto-adjusted by the engine

Risk Tier Defaults:

| Parameter | Conservative | Moderate | Aggressive |
| --- | --- | --- | --- |
| Min Confidence | 0.75 | 0.55 | 0.40 |
| Max Position % | 5% | 10% | 15% |
| Stop-Loss ATR Multiplier | 1.5 | 2.0 | 2.5 |
| Reward/Risk Ratio | 2.0 | 1.5 | 1.2 |
| Max Sector % | 20% | 30% | 40% |
| Max Portfolio Heat | 10% | 20% | 30% |
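The defaults in the table can be captured as data. The long_exit_levels helper is hypothetical. It shows one common way an ATR multiplier and a reward/risk ratio combine into stop-loss and take-profit prices for a long position. That usage is an assumption, not a description of the engine's code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TierDefaults:
    min_confidence: float
    max_position_pct: float
    stop_loss_atr_mult: float
    reward_risk_ratio: float
    max_sector_pct: float
    max_portfolio_heat_pct: float


# Values transcribed from the Risk Tier Defaults table (percentages as fractions).
RISK_TIERS = {
    "conservative": TierDefaults(0.75, 0.05, 1.5, 2.0, 0.20, 0.10),
    "moderate": TierDefaults(0.55, 0.10, 2.0, 1.5, 0.30, 0.20),
    "aggressive": TierDefaults(0.40, 0.15, 2.5, 1.2, 0.40, 0.30),
}


def long_exit_levels(entry: float, atr: float, tier: str) -> tuple[float, float]:
    """Assumed usage: stop = entry - ATR * multiplier; target = entry + risk * reward/risk."""
    t = RISK_TIERS[tier]
    risk = atr * t.stop_loss_atr_mult
    return entry - risk, entry + risk * t.reward_risk_ratio
```

For example, take an entry of $100 and an ATR of $2 on the moderate tier. The stop sits 2.0 × $2 = $4 below entry, and the target sits 1.5 × $4 = $6 above it.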

Backtesting

The trading engine supports historical backtesting:

  • Launch: POST /api/trading/backtest with start/end dates, initial capital, and risk tier
  • Execution: Runs asynchronously using BacktestReplay against historical recommendation data
  • Results: Stored in backtest_runs (summary metrics) and backtest_trades (individual trades)
  • Metrics: Total return, Sharpe ratio, max drawdown, win rate, profit factor, equity curve
  • Polling: GET /api/trading/backtest/{id} to check status and retrieve results
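The summary metrics follow standard definitions, sketched below for reference. The Sharpe ratio here assumes 252 trading days for annualization and a zero risk-free rate. The engine's BacktestReplay may compute these metrics differently.

```python
import math
from statistics import mean, stdev


def total_return(equity: list[float]) -> float:
    """Fractional return from the first to the last equity-curve point."""
    return equity[-1] / equity[0] - 1.0


def max_drawdown(equity: list[float]) -> float:
    """Largest peak-to-trough decline, as a fraction of the running peak."""
    peak, worst = equity[0], 0.0
    for value in equity:
        peak = max(peak, value)
        worst = max(worst, (peak - value) / peak)
    return worst


def sharpe_ratio(daily_returns: list[float], periods_per_year: int = 252) -> float:
    """Annualized Sharpe ratio, assuming a zero risk-free rate."""
    sd = stdev(daily_returns)  # needs at least two returns
    return 0.0 if sd == 0 else mean(daily_returns) / sd * math.sqrt(periods_per_year)


def win_rate_and_profit_factor(trade_pnls: list[float]) -> tuple[float, float]:
    """Win rate = winners / all trades; profit factor = gross profit / gross loss."""
    wins = [p for p in trade_pnls if p > 0]
    losses = [-p for p in trade_pnls if p < 0]
    profit_factor = sum(wins) / sum(losses) if losses else math.inf
    return len(wins) / len(trade_pnls), profit_factor
```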

Notifications

The engine supports two notification channels:

| Channel | Configuration | Rate Limiting |
| --- | --- | --- |
| SMS (AWS SNS) | TRADING_SNS_TOPIC_ARN, TRADING_SNS_PHONE_NUMBER | Redis-based per-channel rate limiting |
| Email (Gmail) | TRADING_GMAIL_SENDER, TRADING_GMAIL_RECIPIENT | Redis-based per-channel rate limiting |

Notification configuration can be updated at runtime via PUT /api/trading/notifications/config. History is available via GET /api/trading/notifications/history.
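The docs say only that rate limiting is Redis-based and per channel. One common way to do that is a fixed window. The limiter INCRs a per-channel key for the current window and EXPIREs that key after the window ends.

The sketch below mimics that pattern with an in-memory dict, so it runs without Redis. The class, the key layout, and the limits are all hypothetical.

```python
import time
from typing import Callable


class FixedWindowLimiter:
    """In-memory stand-in for a Redis fixed-window limiter (INCR + EXPIRE per key)."""

    def __init__(self, max_per_window: int, window_seconds: int,
                 clock: Callable[[], float] = time.time) -> None:
        self.max_per_window = max_per_window
        self.window_seconds = window_seconds
        self.clock = clock
        self._counts: dict[str, int] = {}  # Redis would expire old keys; this sketch keeps them

    def allow(self, channel: str) -> bool:
        """Count one send attempt for `channel` and report whether it is within the limit."""
        window = int(self.clock() // self.window_seconds)
        key = f"ratelimit:{channel}:{window}"  # hypothetical key layout
        self._counts[key] = self._counts.get(key, 0) + 1
        return self._counts[key] <= self.max_per_window
```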

Micro-Trading

The engine offers an optional micro-trading mode for small, frequent trades, controlled by the following settings:

| Setting | Default | Description |
| --- | --- | --- |
| TRADING_MICRO_TRADING_ENABLED | false | Enable micro-trading |
| TRADING_MICRO_TRADING_INTERVAL_SECONDS | 300 | Polling interval (seconds) |
| TRADING_MICRO_TRADING_ALLOCATION_CAP_PCT | 0.03 | Maximum allocation per trade (0.03 = 3%) |
| TRADING_MICRO_TRADING_MAX_DAILY | 10 | Maximum trades per day |
| TRADING_MICRO_TRADING_MAX_HOLD_MINUTES | 120 | Maximum hold time (minutes) |
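The micro-trading limits can be read as two guards: one applied before opening a trade and one applied while holding it. This sketch makes two assumptions:

- The allocation cap is a fraction of the active pool.
- The hold limit is inclusive.

The function names are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class MicroTradingLimits:
    """Defaults mirror the table above."""
    enabled: bool = False
    allocation_cap_pct: float = 0.03
    max_daily: int = 10
    max_hold_minutes: int = 120


def can_open(limits: MicroTradingLimits, trades_today: int,
             active_pool: float, amount: float) -> bool:
    """Gate a new micro-trade on the toggle, the daily count, and the allocation cap."""
    return (limits.enabled
            and trades_today < limits.max_daily
            and amount <= active_pool * limits.allocation_cap_pct)  # cap base assumed


def hold_expired(limits: MicroTradingLimits, opened_at: datetime, now: datetime) -> bool:
    """True once a position has been held for max_hold_minutes or longer."""
    return now - opened_at >= timedelta(minutes=limits.max_hold_minutes)
```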

Engine Control

| Endpoint | Method | Description |
| --- | --- | --- |
| /api/trading/status | GET | Current engine state |
| /api/trading/config | PUT | Update configuration |
| /api/trading/pause | POST | Pause the engine |
| /api/trading/resume | POST | Resume the engine |
| /api/trading/reset | POST | Full paper trading reset |
| /api/trading/debug | GET | Diagnostic state dump |
| /api/trading/override/order | POST | Submit a manual override order |
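The following is a minimal client for the control endpoints, using only the standard library. It makes two assumptions:

- The base URL is a hypothetical placeholder, because this section does not say which host serves /api/trading.
- The endpoints return JSON.

Request bodies for PUT /api/trading/config and the override order are not shown, because their schemas are not described in this section.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # hypothetical: wherever /api/trading is reachable


def trading_request(path: str, method: str = "GET") -> urllib.request.Request:
    """Build (without sending) a request for an engine control endpoint."""
    return urllib.request.Request(f"{BASE_URL}/api/trading/{path}", method=method)


def send(req: urllib.request.Request) -> dict:
    """Send the request and decode the body, assuming the endpoint returns JSON."""
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Usage against a running engine would look like send(trading_request("pause", "POST")), followed by send(trading_request("status")) to confirm the new state.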

Decision Loop

The engine runs five concurrent async tasks:

  1. Decision loop: Polls recommendations, evaluates position sizing, submits orders
  2. Stop-loss monitor: Checks positions against stop-loss and take-profit levels at configurable intervals
  3. Performance loop: Computes daily snapshots, siphons profits to reserve pool
  4. Risk tier scheduler: Periodically evaluates and adjusts risk tier based on performance
  5. Rebalance scheduler: Evaluates portfolio rebalancing needs and computes position correlation matrix
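The five loops can run as independent asyncio tasks that share a stop event. This sketch uses placeholder loop bodies and intervals; the real intervals are configurable. The periodic and run_engine helpers are hypothetical.

```python
import asyncio


async def periodic(name: str, interval: float, stop: asyncio.Event, log: list[str]) -> None:
    """Run one loop body, then pause `interval` seconds, until `stop` is set."""
    while not stop.is_set():
        log.append(name)  # placeholder for the real loop body
        try:
            # Waiting on the event (not asyncio.sleep) lets shutdown interrupt the pause.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


async def run_engine(run_for: float) -> list[str]:
    """Start the five loops concurrently, let them run, then stop them cleanly."""
    stop = asyncio.Event()
    log: list[str] = []
    intervals = {  # placeholder intervals in seconds
        "decision": 0.01, "stop_loss": 0.01, "performance": 0.02,
        "risk_tier": 0.05, "rebalance": 0.05,
    }
    tasks = [asyncio.create_task(periodic(n, s, stop, log)) for n, s in intervals.items()]
    await asyncio.sleep(run_for)
    stop.set()
    await asyncio.gather(*tasks)
    return log
```

Because each loop waits on the stop event rather than sleeping, a shutdown request wakes every loop immediately instead of after its full interval.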

Shared Configuration Reference

All services load configuration from environment variables via services/shared/config.py. The following variables are common across most services:

PostgreSQL

| Variable | Default | Description |
| --- | --- | --- |
| POSTGRES_HOST | localhost | Database host |
| POSTGRES_PORT | 5432 | Database port |
| POSTGRES_DB | stonks | Database name (auto-derived from DEPLOY_STAGE if empty) |
| POSTGRES_USER | stonks | Database user |
| POSTGRES_PASSWORD | stonks_dev | Database password |

Redis

| Variable | Default | Description |
| --- | --- | --- |
| REDIS_HOST | localhost | Redis host |
| REDIS_PORT | 6379 | Redis port |
| REDIS_DB | 0 | Redis database number |
| REDIS_PASSWORD | (none) | Redis password |

MinIO

| Variable | Default | Description |
| --- | --- | --- |
| MINIO_ENDPOINT | localhost:9000 | MinIO endpoint |
| MINIO_ACCESS_KEY | minioadmin | Access key |
| MINIO_SECRET_KEY | minioadmin | Secret key |
| MINIO_SECURE | false | Use HTTPS |

Ollama

| Variable | Default | Description |
| --- | --- | --- |
| OLLAMA_BASE_URL | http://localhost:11434 | Ollama API endpoint |
| OLLAMA_MODEL | qwen3.5:9b | Default model |
| OLLAMA_TIMEOUT | 120 | Request timeout (seconds) |
| OLLAMA_MAX_RETRIES | 2 | Max retry attempts |

Observability

| Variable | Default | Description |
| --- | --- | --- |
| LOG_LEVEL | INFO | Logging level |
| JSON_LOGS | true | Enable structured JSON logging |
| DEPLOY_STAGE | (empty) | Stage prefix for Redis keys and MinIO buckets |
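The following sketch shows how a service might read a subset of these variables with the documented defaults. It is not the contents of services/shared/config.py. The SharedConfig dataclass and load_config function are hypothetical. The POSTGRES_DB derivation from DEPLOY_STAGE is omitted because its rule is not described here.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class SharedConfig:
    postgres_host: str
    postgres_port: int
    redis_host: str
    redis_port: int
    redis_password: Optional[str]
    minio_secure: bool
    ollama_base_url: str
    ollama_timeout: int
    deploy_stage: str


def load_config(env: Mapping[str, str] = os.environ) -> SharedConfig:
    """Read a subset of the shared variables, falling back to the documented defaults."""
    return SharedConfig(
        postgres_host=env.get("POSTGRES_HOST", "localhost"),
        postgres_port=int(env.get("POSTGRES_PORT", "5432")),
        redis_host=env.get("REDIS_HOST", "localhost"),
        redis_port=int(env.get("REDIS_PORT", "6379")),
        redis_password=env.get("REDIS_PASSWORD") or None,  # empty string means no password
        minio_secure=env.get("MINIO_SECURE", "false").lower() == "true",
        ollama_base_url=env.get("OLLAMA_BASE_URL", "http://localhost:11434"),
        ollama_timeout=int(env.get("OLLAMA_TIMEOUT", "120")),
        deploy_stage=env.get("DEPLOY_STAGE", ""),
    )
```

Passing a plain dict instead of os.environ makes the loader easy to exercise in unit tests.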