Observability and Metrics Reference
This document covers the full observability stack for Stonks Oracle: Prometheus metrics, operational alerting, structured logging, dead-letter queues, and recommended monitoring queries.
Prometheus Metrics Endpoint
The Query API exposes a /metrics endpoint that returns all registered Prometheus metrics in the standard text exposition format.
Endpoint: GET /metrics on the Query API service (port 8000)
Response: text/plain; version=0.0.4; charset=utf-8 — standard Prometheus scrape format via prometheus_client.generate_latest().
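You can check what a scrape returns without running Prometheus by parsing the exposition text directly. The following is a minimal, stdlib-only sketch. `parse_exposition` is a hypothetical helper, not part of the codebase, and it deliberately ignores timestamps, exemplars, and label-value escaping.

```python
import re

# One sample line: metric name, optional {labels}, then the value (any timestamp is ignored).
SAMPLE_RE = re.compile(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)')
# One label pair inside the braces, e.g. queue="ingestion".
LABEL_RE = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="((?:[^"\\]|\\.)*)"')


def parse_exposition(text):
    """Return a list of (name, labels_dict, value) tuples from Prometheus text format."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and # HELP / # TYPE comments
        match = SAMPLE_RE.match(line)
        if match is None:
            continue
        name, raw_labels, raw_value = match.groups()
        labels = dict(LABEL_RE.findall(raw_labels or ""))
        samples.append((name, labels, float(raw_value)))  # float() also accepts +Inf and NaN
    return samples
```

For a live check, the input text could come from `urllib.request.urlopen("http://localhost:8000/metrics").read().decode()`. This assumes the Query API port is published locally, as in Docker Compose.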
Prometheus Scrape Configuration
Add the following job to your prometheus.yml:
```yaml
scrape_configs:
  - job_name: "stonks-oracle"
    scrape_interval: 15s
    scrape_timeout: 10s
    metrics_path: /metrics
    static_configs:
      - targets:
          # Docker Compose
          - "query-api:8000"
          # Kubernetes
          # - "query-api.stonks-oracle.svc.cluster.local:8000"
```
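For Kubernetes deployments, you can also use a ServiceMonitor resource if the Prometheus Operator is installed. Its selector matches labels on the Query API Service (not on the pods), and `port` refers to the Service's named port: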
```yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: stonks-oracle
  namespace: stonks-oracle
spec:
  selector:
    matchLabels:
      app: query-api
  endpoints:
    - port: http
      path: /metrics
      interval: 15s
```
Prometheus Metrics Reference
All metrics are defined in services/shared/metrics.py. Metric names use the stonks_ prefix.
Service Info
| Metric | Type | Description |
|---|---|---|
| `stonks_oracle_info` | Info | Service metadata (build version, etc.) |
Ingestion Metrics
| Metric | Type | Labels | Description |
|---|---|---|---|
| `stonks_ingestion_jobs_total` | Counter | `source_type`, `status` | Total ingestion jobs processed |
| `stonks_ingestion_items_fetched_total` | Counter | `source_type` | Total items fetched from external sources |
| `stonks_ingestion_items_new_total` | Counter | `source_type` | New (non-duplicate) items ingested |
| `stonks_ingestion_items_deduped_total` | Counter | `source_type` | Items skipped due to deduplication |
| `stonks_ingestion_errors_total` | Counter | `source_type` | Ingestion errors by source type |
| `stonks_ingestion_adapter_duration_seconds` | Histogram | `source_type` | Adapter fetch latency (buckets: 0.1, 0.5, 1, 2, 5, 10, 30, 60s) |
Parsing Metrics
| Metric | Type | Labels | Description |
|---|---|---|---|
| `stonks_parse_jobs_total` | Counter | `status` | Total parse jobs processed |
| `stonks_parse_quality_score` | Histogram | none | Distribution of parser quality scores (buckets: 0.1–1.0 in 0.1 steps) |
| `stonks_parse_low_quality_total` | Counter | none | Documents flagged as low quality by the parser |
| `stonks_parse_duration_seconds` | Histogram | none | Parse job duration (buckets: 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10s) |
Extraction Metrics
| Metric | Type | Labels | Description |
|---|---|---|---|
| `stonks_extraction_jobs_total` | Counter | `status` | Total extraction jobs processed |
| `stonks_extraction_attempts_total` | Counter | none | Total Ollama extraction attempts (including retries) |
| `stonks_extraction_retries_total` | Counter | none | Extraction retry count |
| `stonks_extraction_duration_seconds` | Histogram | none | Total extraction duration (buckets: 1, 2, 5, 10, 20, 30, 60, 120s) |
| `stonks_extraction_confidence` | Histogram | none | Distribution of extraction confidence scores (buckets: 0.1–1.0) |
| `stonks_extraction_validation_errors_total` | Counter | none | Total validation errors across extractions |
| `stonks_extraction_tokens_total` | Counter | `direction` | Estimated token usage (`direction` is `input` or `output`) |
Aggregation Metrics
| Metric | Type | Labels | Description |
|---|---|---|---|
| `stonks_aggregation_windows_total` | Counter | `window` | Trend windows computed |
| `stonks_aggregation_signals_total` | Counter | `window` | Signals processed during aggregation |
| `stonks_aggregation_contradiction_score` | Histogram | none | Distribution of contradiction scores in trend windows (buckets: 0.0–1.0) |
| `stonks_aggregation_duration_seconds` | Histogram | `window` | Aggregation job duration (buckets: 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10s) |
Recommendation Metrics
| Metric | Type | Labels | Description |
|---|---|---|---|
| `stonks_recommendations_total` | Counter | `action`, `mode` | Recommendations generated |
| `stonks_recommendations_suppressed_total` | Counter | none | Recommendations suppressed due to low data quality |
| `stonks_recommendation_confidence` | Histogram | none | Distribution of recommendation confidence scores (buckets: 0.1–1.0) |
Lake Publication Metrics
| Metric | Type | Labels | Description |
|---|---|---|---|
| `stonks_lake_facts_published_total` | Counter | `table_name` | Analytical facts published to the lakehouse |
| `stonks_lake_publish_duration_seconds` | Histogram | `table_name` | Lake publication write latency (buckets: 0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5s) |
| `stonks_lake_publish_errors_total` | Counter | `table_name` | Lake publication errors |
| `stonks_lake_publish_bytes_total` | Counter | `table_name` | Total bytes written to the lakehouse |
Trading and Broker Metrics
| Metric | Type | Labels | Description |
|---|---|---|---|
| `stonks_orders_submitted_total` | Counter | `side`, `order_type`, `mode` | Orders submitted to the broker |
| `stonks_orders_rejected_total` | Counter | `reason_category` | Orders rejected before broker submission |
| `stonks_orders_filled_total` | Counter | `side` | Orders filled by the broker |
| `stonks_orders_duplicates_prevented_total` | Counter | `detected_via` | Duplicate orders prevented by idempotency checks |
| `stonks_risk_evaluations_total` | Counter | `result` | Risk evaluations performed |
| `stonks_risk_check_failures_total` | Counter | `check_name` | Individual risk check failures |
| `stonks_positions_synced_total` | Counter | none | Position sync operations completed |
Alerting Metrics
| Metric | Type | Labels | Description |
|---|---|---|---|
| `stonks_alerts_fired_total` | Counter | `rule`, `severity` | Total alerts fired by rule |
| `stonks_alerts_resolved_total` | Counter | `rule` | Total alerts resolved by rule |
| `stonks_alert_check_duration_seconds` | Histogram | none | Duration of the alert evaluation cycle (buckets: 0.01–5s) |
| `stonks_alert_active` | Gauge | `rule` | Whether an alert rule is currently firing (1) or resolved (0) |
Dead-Letter Queue Metrics
| Metric | Type | Labels | Description |
|---|---|---|---|
| `stonks_dlq_items_total` | Counter | `queue` | Jobs sent to dead-letter queues |
| `stonks_dlq_replayed_total` | Counter | `queue` | Jobs replayed from dead-letter queues |
| `stonks_dlq_depth` | Gauge | `queue` | Current dead-letter queue depth |
Active Jobs Gauge
| Metric | Type | Labels | Description |
|---|---|---|---|
| `stonks_active_jobs` | Gauge | `stage` | Jobs currently being processed, by pipeline stage |
Alerting Module
The alerting module (services/shared/alerting.py) evaluates four operational alert rules against PostgreSQL state on a configurable interval. When a threshold is breached, the module emits structured log events and increments Prometheus counters. When a previously firing alert clears, it logs a resolution event.
Alert Rules
1. source_failures — Sustained Source Retrieval Failures
Detects sources where the last N ingestion runs all failed within the lookback window.
| Parameter | ConfigMap Variable | Default | Description |
|---|---|---|---|
| Consecutive failure threshold | `ALERT_SOURCE_FAILURE_THRESHOLD` | 3 | Number of consecutive failures before the alert fires |
| Lookback window | `ALERT_SOURCE_FAILURE_WINDOW_HOURS` | 6 hours | How far back to check `ingestion_runs` |
Severity: warning
Query: Checks ingestion_runs for sources where the most recent N runs (within the window) all have status = 'failed'.
Details emitted: source_id, source_type, source_name, ticker, consecutive_failures
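The rule's condition is easier to check against an example when restated outside SQL. The sketch below works over an in-memory list of `(source_id, started_at, status)` tuples. That tuple shape and the `failing_sources` name are assumptions for illustration, not the `ingestion_runs` schema or the alerting module's query. Treating sources with fewer than N runs in the window as not alerting is also an assumption.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def failing_sources(runs, threshold=3, window_hours=6, now=None):
    """Return source IDs whose most recent `threshold` runs inside the window all failed.

    `runs` is an iterable of (source_id, started_at, status) tuples (hypothetical shape).
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(hours=window_hours)
    by_source = defaultdict(list)
    for source_id, started_at, status in runs:
        if started_at >= cutoff:  # only runs inside the lookback window count
            by_source[source_id].append((started_at, status))

    alerting = []
    for source_id, source_runs in by_source.items():
        source_runs.sort(reverse=True)  # newest first
        recent = source_runs[:threshold]
        # A full set of N runs is required; fewer runs cannot trigger the alert.
        if len(recent) == threshold and all(status == "failed" for _, status in recent):
            alerting.append(source_id)
    return sorted(alerting)
```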
2. schema_failure_spike — Extraction Validation Failure Rate
Detects when the extraction schema validation failure rate exceeds a threshold.
| Parameter | ConfigMap Variable | Default | Description |
|---|---|---|---|
| Failure rate threshold | `ALERT_SCHEMA_FAILURE_RATE_THRESHOLD` | 0.3 (30%) | Failure rate that triggers the alert |
| Lookback window | `ALERT_SCHEMA_FAILURE_WINDOW_HOURS` | 1 hour | Window for computing the failure rate |
Severity: warning if rate ≥ 30%, critical if rate ≥ 50%
Query: Computes failed / total from model_performance_metrics within the window.
Details emitted: total_extractions, failed_extractions, failure_rate, threshold, window_hours
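The two-level severity can be expressed as a small pure function. This is a sketch rather than the alerting module's code. The function name is hypothetical, and returning no alert when the window contains zero extractions is an assumption. The 0.5 critical cut-off comes from the severity line above.

```python
def schema_failure_severity(total, failed, threshold=0.3, critical_rate=0.5):
    """Classify the extraction validation failure rate.

    Returns None (no alert), "warning", or "critical".
    """
    if total == 0:
        return None  # assumption: an empty window produces no alert
    rate = failed / total
    if rate >= critical_rate:
        return "critical"
    if rate >= threshold:
        return "warning"
    return None
```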
3. analytical_lag — Lake Publication Lag
Detects when lake publication has not completed within the threshold for any table.
| Parameter | ConfigMap Variable | Default | Description |
|---|---|---|---|
| Lag threshold | `ALERT_LAKE_LAG_THRESHOLD_MINUTES` | 60 minutes | Maximum acceptable time since the last successful publish |
Severity: warning
Query: Checks audit_events for the most recent successful lake_publish event per table and alerts if any are older than the threshold.
Details emitted: table_name, last_publish, lag_minutes, threshold_minutes
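The lag check reduces to comparing each table's last successful publish time with the current time. In this sketch, a dict mapping table names to timezone-aware publish times stands in for the `audit_events` query. `lagging_tables` and the table names in any example are hypothetical, and tables that have never published are not covered.

```python
from datetime import datetime, timezone


def lagging_tables(last_publish, threshold_minutes=60, now=None):
    """Return {table_name: lag_minutes} for tables whose last publish is older than the threshold.

    `last_publish` maps table name -> timezone-aware datetime of the most recent
    successful lake_publish event (a hypothetical in-memory stand-in for audit_events).
    """
    now = now or datetime.now(timezone.utc)
    lagging = {}
    for table_name, published_at in last_publish.items():
        lag_minutes = (now - published_at).total_seconds() / 60
        if lag_minutes > threshold_minutes:
            lagging[table_name] = round(lag_minutes, 1)
    return lagging
```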
4. broker_issues — Consecutive Broker Errors
Detects consecutive broker submission errors (rejections, timeouts, connection failures).
| Parameter | ConfigMap Variable | Default | Description |
|---|---|---|---|
| Error threshold | `ALERT_BROKER_ERROR_THRESHOLD` | 3 | Consecutive broker errors before the alert fires |
| Lookback window | `ALERT_BROKER_ERROR_WINDOW_HOURS` | 1 hour | Window for checking `order_events` |
Severity: critical
Query: Counts recent order_events with event_type IN ('broker_error', 'broker_timeout', 'connection_failed').
Details emitted: error_count, threshold, window_hours
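As described, the query counts matching error events inside the window. It does not check that they occurred back to back, so other order events in between do not reset the count. The sketch mirrors that counting behavior over an in-memory list of `(event_type, created_at)` tuples. The tuple shape and the `broker_alert` name are assumptions.

```python
from datetime import datetime, timedelta, timezone

BROKER_ERROR_TYPES = {"broker_error", "broker_timeout", "connection_failed"}


def broker_alert(events, threshold=3, window_hours=1, now=None):
    """Return (firing, error_count) for broker error events inside the window.

    `events` is an iterable of (event_type, created_at) tuples, a hypothetical
    in-memory stand-in for rows of order_events.
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(hours=window_hours)
    error_count = sum(
        1
        for event_type, created_at in events
        if event_type in BROKER_ERROR_TYPES and created_at >= cutoff
    )
    return error_count >= threshold, error_count
```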
Evaluation Cycle
The alerting module runs on a configurable interval (default: every 120 seconds, controlled by ALERT_CHECK_INTERVAL_SECONDS). Each cycle:
- Runs all four alert rules against PostgreSQL
- Compares the results to the current `AlertState` to detect new firings and resolutions
- For new firings: increments `stonks_alerts_fired_total`, sets the `stonks_alert_active` gauge to 1, and logs a `WARNING`
- For resolutions: increments `stonks_alerts_resolved_total`, sets the `stonks_alert_active` gauge to 0, and logs an `INFO` event
- Records the evaluation duration in `stonks_alert_check_duration_seconds`
Each rule check is wrapped in a try/except so a failure in one rule does not block the others.
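The cycle above amounts to a set difference between the rules firing now and the rules that were firing after the previous cycle. The following sketch models that with plain sets. `evaluate_cycle` and the one-callable-per-rule shape are hypothetical. Keeping a rule's previous state when its check raises is an assumption, chosen so that a database hiccup does not produce a spurious resolution.

```python
import logging

logger = logging.getLogger("alerting")


def evaluate_cycle(rules, firing):
    """Run every rule once and diff the results against the previously firing set.

    `rules` maps rule name -> zero-argument callable returning True when breached;
    `firing` is the set of rule names firing after the previous cycle (a simplified
    stand-in for AlertState). Returns (new_firings, resolutions, now_firing).
    """
    now_firing = set()
    for name, check in rules.items():
        try:
            if check():
                now_firing.add(name)
        except Exception:
            # One broken rule must not block the others; keep its previous state.
            logger.exception("alert rule %s failed", name)
            if name in firing:
                now_firing.add(name)
    new_firings = now_firing - firing
    resolutions = firing - now_firing
    for name in sorted(new_firings):
        # The real module also increments stonks_alerts_fired_total here.
        logger.warning("alert fired: %s", name)
    for name in sorted(resolutions):
        # The real module also increments stonks_alerts_resolved_total here.
        logger.info("alert resolved: %s", name)
    return new_firings, resolutions, now_firing
```

The function returns the new state instead of mutating a shared object, which keeps it easy to test. The caller stores `now_firing` for the next cycle.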
ConfigMap Variables Summary
| Variable | Default | Description |
|---|---|---|
| `ALERT_SOURCE_FAILURE_THRESHOLD` | 3 | Consecutive source failures before alert |
| `ALERT_SOURCE_FAILURE_WINDOW_HOURS` | 6 | Source failure lookback window (hours) |
| `ALERT_SCHEMA_FAILURE_RATE_THRESHOLD` | 0.3 | Extraction failure rate threshold (0.0–1.0) |
| `ALERT_SCHEMA_FAILURE_WINDOW_HOURS` | 1 | Schema failure lookback window (hours) |
| `ALERT_LAKE_LAG_THRESHOLD_MINUTES` | 60 | Max minutes since last lake publish |
| `ALERT_BROKER_ERROR_THRESHOLD` | 3 | Consecutive broker errors before alert |
| `ALERT_BROKER_ERROR_WINDOW_HOURS` | 1 | Broker error lookback window (hours) |
| `ALERT_CHECK_INTERVAL_SECONDS` | 120 | Seconds between alert evaluation cycles |
Structured Logging
All services use structured JSON logging configured via services/shared/logging.py. Call setup_logging(service_name) once at service startup.
JSON Log Format
Each log line is a single JSON object with the following fields:
```json
{
  "timestamp": "2025-01-15T12:34:56.789012+00:00",
  "level": "INFO",
  "logger": "ingestion_worker",
  "message": "Processed job for AAPL",
  "service": "ingestion_worker",
  "trace_id": "a1b2c3d4e5f67890",
  "span_id": "1a2b3c4d"
}
```
| Field | Type | Description |
|---|---|---|
| `timestamp` | string (ISO 8601) | UTC timestamp of the log event |
| `level` | string | Log level: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` |
| `logger` | string | Python logger name |
| `message` | string | Human-readable log message |
| `service` | string | Service name set at startup (e.g., `ingestion_worker`, `scheduler`) |
| `trace_id` | string | 16-character hex trace ID for distributed tracing |
| `span_id` | string | 8-character hex span ID for the current operation |
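Here is one way to produce records of this shape with the standard library: a `logging.Formatter` subclass that serializes the core fields plus any known `extra={}` keys. `JsonFormatter` is a hypothetical sketch, not the class behind `setup_logging()`. In particular, it assumes `trace_id` and `span_id` arrive via `extra` and emits `null` when they are absent, whereas the real module presumably reads them from the active trace context.

```python
import json
import logging
from datetime import datetime, timezone

# Optional fields copied from `extra={...}` when a call site supplies them.
EXTRA_FIELDS = ("ticker", "document_id", "source_type", "job_id", "duration_ms", "error", "count")


class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object."""

    def __init__(self, service_name):
        super().__init__()
        self.service_name = service_name

    def format(self, record):
        entry = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": self.service_name,
            # In this sketch, trace and span IDs come from `extra`; null when absent.
            "trace_id": getattr(record, "trace_id", None),
            "span_id": getattr(record, "span_id", None),
        }
        for key in EXTRA_FIELDS:
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(entry)


# Usage: attach the formatter to a handler once at service startup.
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter("ingestion_worker"))
log = logging.getLogger("ingestion_worker")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.info("Processed job for AAPL", extra={"ticker": "AAPL", "trace_id": "a1b2c3d4e5f67890"})
```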
Additional Context Fields
When present, these fields are merged into the JSON output:
| Field | Source | Description |
|---|---|---|
| `span_operation` | `Span` context manager | Name of the traced operation |
| `span_status` | `Span` context manager | `ok` or `error` |
| `span_duration_ms` | `Span` context manager | Duration of the span in milliseconds |
| `span_parent_id` | `Span` context manager | Parent span ID for nested spans |
| `span_attributes` | `Span` context manager | Arbitrary key-value attributes set on the span |
| `ticker` | Manual `extra={}` | Company ticker symbol |
| `document_id` | Manual `extra={}` | Document UUID |
| `source_type` | Manual `extra={}` | Source type (e.g., `polygon`, `news_api`) |
| `job_id` | Manual `extra={}` | Job identifier |
| `duration_ms` | Manual `extra={}` | Operation duration in milliseconds |
| `error` | Manual `extra={}` | Error description |
| `count` | Manual `extra={}` | Item count |
| `exception` | Automatic | Formatted exception traceback (when `exc_info` is set) |
Trace Context Propagation
Trace context flows through the pipeline via job payloads:
- Inject: Before enqueuing a job to Redis, call `inject_trace_context(payload)` to add `_trace_id` to the payload dict.
- Extract: At the start of job processing, call `extract_trace_context(payload)` to restore the trace context (or generate a new one if absent).
- Span: Use the `Span` context manager to create child spans within a service:
```python
from services.shared.logging import Span

with Span("process_document", ticker="AAPL") as span:
    # ... do work ...
    span.set_attribute("doc_count", 5)
```
This produces a structured log entry on span exit with duration, status, and attributes.
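The inject/extract handshake can be pictured with a `contextvars.ContextVar`, which gives each asyncio task its own copy of the current trace ID. This is a hypothetical sketch of the pattern, not the code in `services.shared.logging`. The helper names `add_trace` and `restore_trace` are invented here, and only the `_trace_id` key and the 16-hex-character format come from the text.

```python
import contextvars
import secrets

# Hypothetical stand-in for the per-task trace context kept by services.shared.logging.
current_trace_id = contextvars.ContextVar("current_trace_id", default=None)


def add_trace(payload):
    """Producer side: stamp the active trace ID onto a job payload before enqueueing."""
    trace_id = current_trace_id.get()
    if trace_id is None:
        trace_id = secrets.token_hex(8)  # 8 random bytes -> 16 hex characters
        current_trace_id.set(trace_id)
    payload["_trace_id"] = trace_id
    return payload


def restore_trace(payload):
    """Consumer side: adopt the propagated trace ID, or start a new trace if absent."""
    trace_id = payload.get("_trace_id") or secrets.token_hex(8)
    current_trace_id.set(trace_id)
    return trace_id
```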
Log Querying
To trace a request through the pipeline, filter by trace_id:
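```bash
# Kubernetes: find all logs for a specific trace.
# With a label selector, kubectl logs shows only the last 10 lines per container
# unless --tail is set; --tail=-1 returns all lines.
kubectl logs -n stonks-oracle -l app.kubernetes.io/part-of=stonks-oracle \
  --all-containers --tail=-1 \
  | jq -r 'select(.trace_id == "a1b2c3d4e5f67890")'

# Docker Compose: search across all services
# (the pattern matches JSON with or without a space after the colon)
docker compose logs --no-color | grep -E '"trace_id": ?"a1b2c3d4e5f67890"'
```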
To find errors in a specific service:
```bash
# Kubernetes
kubectl logs -n stonks-oracle deployment/extractor --tail=500 \
  | jq 'select(.level == "ERROR")'

# Docker Compose (--no-log-prefix drops the "service |" prefix so each line is plain JSON)
docker compose logs extractor --no-color --no-log-prefix --tail=500 \
  | jq 'select(.level == "ERROR")'
```
To find slow extraction spans:
```bash
kubectl logs -n stonks-oracle deployment/extractor --tail=1000 \
  | jq 'select(.span_operation == "extract_document" and .span_duration_ms > 30000)'
```
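For ad-hoc analysis outside `jq`, you can apply the same field-equality filtering in Python. In this sketch, `filter_logs` is a hypothetical name. It skips lines that are not JSON objects, such as output from processes that bypass the logger. A plain `jq` filter, by contrast, stops at the first line it cannot parse.

```python
import json


def filter_logs(lines, **criteria):
    """Yield parsed JSON log entries whose fields equal every given criterion."""
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # not JSON: skip instead of aborting the scan
        if isinstance(entry, dict) and all(entry.get(k) == v for k, v in criteria.items()):
            yield entry
```

For example, a small script could call `filter_logs(sys.stdin, trace_id="a1b2c3d4e5f67890")` on piped `kubectl logs` output and print each match.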
Dead-Letter Queue System
When a worker fails to process a job after exhausting retries (default: 3 attempts), the job is pushed to a per-queue dead-letter list in Redis. The DLQ system is implemented in services/shared/dead_letter.py.
Queue Names
Dead-letter queues are Redis lists named after their source queue, using the pattern `stonks:dlq:<queue_name>`:
| DLQ Key | Source Queue | Description |
|---|---|---|
| `stonks:dlq:ingestion` | `stonks:queue:ingestion` | Failed ingestion jobs (adapter errors, API failures) |
| `stonks:dlq:parsing` | `stonks:queue:parsing` | Failed parse jobs |
| `stonks:dlq:extraction` | `stonks:queue:extraction` | Failed extraction jobs (LLM errors, validation failures) |
| `stonks:dlq:aggregation` | `stonks:queue:aggregation` | Failed aggregation jobs |
| `stonks:dlq:recommendation` | `stonks:queue:recommendation` | Failed recommendation jobs |
| `stonks:dlq:broker_orders` | `stonks:queue:broker_orders` | Failed broker order submissions |
When `DEPLOY_STAGE` is set, the stage is inserted after the `stonks` prefix, giving keys of the form `stonks:<stage>:dlq:<queue_name>`.
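The key layout fits in a small helper. `dlq_key` is hypothetical. It reads `DEPLOY_STAGE` from the environment only when no stage is passed explicitly, which is an assumption about how the stage is supplied.

```python
import os


def dlq_key(queue_name, stage=None):
    """Build the Redis key for a queue's dead-letter list."""
    stage = stage if stage is not None else os.environ.get("DEPLOY_STAGE")
    if stage:
        return f"stonks:{stage}:dlq:{queue_name}"
    return f"stonks:dlq:{queue_name}"
```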
DLQ Entry Format
Each DLQ entry wraps the original job payload with failure metadata:
```json
{
  "original_payload": {
    "source_id": "...",
    "source_type": "polygon",
    "ticker": "AAPL",
    "company_id": "...",
    "config": {}
  },
  "queue": "ingestion",
  "error": "ConnectionError: API timeout after 30s",
  "attempt": 3,
  "worker": "ingestion_worker",
  "dead_lettered_at": "2025-01-15T12:34:56.789012+00:00"
}
```
| Field | Type | Description |
|---|---|---|
| `original_payload` | object | The original job payload as it was enqueued |
| `queue` | string | Source queue name |
| `error` | string | Error message from the final failed attempt |
| `attempt` | integer | Number of attempts made before dead-lettering |
| `worker` | string | Identifier of the worker that dead-lettered the job |
| `dead_lettered_at` | string (ISO 8601) | UTC timestamp when the job was dead-lettered |
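The following sketch builds that envelope before it is pushed to Redis. `make_dlq_entry` is hypothetical, and the real `send_to_dlq()` may serialize differently. The important property is that `original_payload` is stored unchanged, so a replay can re-enqueue it without reconstruction.

```python
import json
from datetime import datetime, timezone


def make_dlq_entry(queue_name, original_payload, error, attempt, worker):
    """Wrap a failed job in the documented DLQ envelope and serialize it for Redis."""
    entry = {
        "original_payload": original_payload,  # unchanged, so a replay can re-enqueue it as-is
        "queue": queue_name,
        "error": error,
        "attempt": attempt,
        "worker": worker,
        "dead_lettered_at": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(entry)
```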
Routing
Jobs are routed to the DLQ by calling send_to_dlq() from worker code after retry exhaustion:
```python
from services.shared.dead_letter import send_to_dlq

# Inside an async worker, after the final retry attempt has failed:
await send_to_dlq(
    rds=redis_client,
    queue_name="ingestion",
    original_payload=job,
    error=str(exception),
    attempt=3,
    worker="ingestion_worker",
)
```
The default maximum attempts before dead-lettering is DEFAULT_MAX_ATTEMPTS = 3.
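The text does not show where workers retry, so what follows is only a sketch of the retry-then-dead-letter flow. `process_with_dlq`, the `dead_letter` callback, and the linear backoff are hypothetical. Only `DEFAULT_MAX_ATTEMPTS = 3` comes from the text.

```python
import asyncio

DEFAULT_MAX_ATTEMPTS = 3


async def process_with_dlq(job, handler, dead_letter, max_attempts=DEFAULT_MAX_ATTEMPTS, backoff_seconds=0.0):
    """Try `await handler(job)` up to `max_attempts` times; dead-letter the job on the last failure.

    `dead_letter` is an async callable taking (payload, error, attempt); in a worker it
    would wrap send_to_dlq(). Returns the handler result, or None once dead-lettered.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await handler(job)
        except Exception as exc:
            if attempt == max_attempts:
                await dead_letter(job, str(exc), attempt)
                return None
            await asyncio.sleep(backoff_seconds * attempt)  # assumed linear backoff
```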
Replay Tooling
The services/shared/dead_letter.py module provides functions for inspecting and replaying DLQ items:
| Function | Description |
|---|---|
| `peek_dlq(rds, queue_name, start=0, count=10)` | Inspect DLQ entries without removing them |
| `replay_one(rds, queue_name)` | Pop the oldest DLQ entry and re-enqueue its original payload to the source queue |
| `replay_all(rds, queue_name)` | Replay every item in the DLQ back to the source queue; returns the count replayed |
| `dlq_length(rds, queue_name)` | Return the number of items in the DLQ |
| `dlq_summary(rds, queue_names)` | Return a mapping of `queue_name` → DLQ depth for multiple queues |
| `purge_dlq(rds, queue_name)` | Delete all items from the DLQ; returns the count removed |
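The text does not say which end of the Redis list entries are pushed to and popped from. This sketch therefore assumes the common convention of `LPUSH` to add and `RPOP` to take the oldest entry. `FakeRedisLists` is an in-memory stand-in that uses the same method names as redis-py's synchronous client (`lpush`, `rpop`, `llen`). `replay_one_sketch` and `replay_all_sketch` are hypothetical and ignore `DEPLOY_STAGE` prefixes.

```python
import json


class FakeRedisLists:
    """Tiny in-memory stand-in for the Redis list commands used below."""

    def __init__(self):
        self.lists = {}

    def lpush(self, key, value):
        self.lists.setdefault(key, []).insert(0, value)
        return len(self.lists[key])

    def rpop(self, key):
        items = self.lists.get(key)
        return items.pop() if items else None

    def llen(self, key):
        return len(self.lists.get(key, []))


def replay_one_sketch(rds, queue_name):
    """Move the oldest DLQ entry's original payload back onto its source queue."""
    raw = rds.rpop(f"stonks:dlq:{queue_name}")  # oldest entry sits at the right end
    if raw is None:
        return False
    entry = json.loads(raw)
    rds.lpush(f"stonks:queue:{queue_name}", json.dumps(entry["original_payload"]))
    return True


def replay_all_sketch(rds, queue_name):
    """Replay at most the entries present at call time; returns the count replayed."""
    replayed = 0
    for _ in range(rds.llen(f"stonks:dlq:{queue_name}")):
        if not replay_one_sketch(rds, queue_name):
            break
        replayed += 1
    return replayed
```

`replay_all_sketch` is bounded by the length measured up front. As a result, a job that fails again and is dead-lettered during the replay is not picked up a second time in the same call.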
Monitoring DLQ Depth
Use the scripts/check_queues.py script to inspect queue and DLQ depths from the command line:
```bash
# Docker Compose
REDIS_HOST=localhost REDIS_PORT=6379 REDIS_PASSWORD="" \
  python scripts/check_queues.py

# Kubernetes
kubectl exec -n stonks-oracle deployment/query-api -- \
  python scripts/check_queues.py
```
The Query API also exposes DLQ depths in the /api/ops/pipeline/stream SSE endpoint and the DevOps metrics endpoints, reporting dlq:<queue_name> keys alongside regular queue depths.
The stonks_dlq_depth Prometheus gauge tracks DLQ depth per queue for dashboard alerting.
Recommended Prometheus/Grafana Queries
Ingestion Throughput
```promql
# Ingestion jobs per minute by source type and status
sum(rate(stonks_ingestion_jobs_total[5m])) by (source_type, status) * 60

# New items ingested per minute
sum(rate(stonks_ingestion_items_new_total[5m])) * 60

# Deduplication ratio (higher = more duplicates being filtered)
sum(rate(stonks_ingestion_items_deduped_total[5m]))
  / sum(rate(stonks_ingestion_items_fetched_total[5m]))

# Adapter latency p95 by source type
histogram_quantile(0.95, sum(rate(stonks_ingestion_adapter_duration_seconds_bucket[5m])) by (le, source_type))

# Ingestion error rate (errors per second)
sum(rate(stonks_ingestion_errors_total[5m])) by (source_type)
```
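Every throughput query here is built on `rate()`. It turns an ever-growing counter into a per-second figure and treats any decrease as a counter reset (for example, after a worker restart). `simple_rate` is a hypothetical, simplified model of that calculation. Unlike Prometheus, it does not extrapolate to the edges of the range window, so its results differ slightly from real query output.

```python
def simple_rate(samples):
    """Per-second increase of a counter over (timestamp_seconds, value) samples.

    A drop in value is treated as a counter reset, so the post-reset value counts as
    fresh increase. Timestamps must be strictly increasing.
    """
    if len(samples) < 2:
        return None  # rate() needs at least two samples in the window
    increase = 0.0
    for (_, prev), (_, cur) in zip(samples, samples[1:]):
        increase += cur - prev if cur >= prev else cur  # reset: counter restarted from 0
    elapsed = samples[-1][0] - samples[0][0]
    return increase / elapsed
```

Multiplying the result by 60 gives the per-minute figures used in the queries above.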
Extraction Latency and Quality
```promql
# Extraction duration p50 and p95
histogram_quantile(0.5, sum(rate(stonks_extraction_duration_seconds_bucket[5m])) by (le))
histogram_quantile(0.95, sum(rate(stonks_extraction_duration_seconds_bucket[5m])) by (le))

# Extraction success rate
sum(rate(stonks_extraction_jobs_total{status="success"}[5m]))
  / sum(rate(stonks_extraction_jobs_total[5m]))

# Median (p50) extraction confidence
histogram_quantile(0.5, sum(rate(stonks_extraction_confidence_bucket[5m])) by (le))

# Validation error rate
sum(rate(stonks_extraction_validation_errors_total[5m]))

# Token usage rate (input vs output)
sum(rate(stonks_extraction_tokens_total[5m])) by (direction)
```
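The `histogram_quantile` queries estimate percentiles from cumulative bucket counts rather than from raw observations. The sketch below reimplements the core of that estimate in Python for illustration only; it is not Prometheus code. It finds the bucket that contains the target rank and interpolates linearly within it, assuming observations are spread evenly inside each bucket.

```python
import math


def histogram_quantile(q, buckets):
    """Estimate quantile q from cumulative (upper_bound, count) buckets.

    `buckets` must be sorted by upper bound and end with (math.inf, total_count),
    mirroring the `le` series of a Prometheus histogram.
    """
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    lower_bound, lower_count = 0.0, 0.0
    for upper_bound, count in buckets:
        if count >= rank:
            if math.isinf(upper_bound):
                return lower_bound  # rank in the +Inf bucket: report the highest finite bound
            if count == lower_count:
                return upper_bound
            fraction = (rank - lower_count) / (count - lower_count)
            return lower_bound + (upper_bound - lower_bound) * fraction
        lower_bound, lower_count = upper_bound, count
    return lower_bound
```

Because of the interpolation, a result is only as precise as the bucket layout. With the extraction duration buckets listed earlier, a p95 anywhere between 30 and 60 seconds only means the 95th-percentile observation fell in that bucket. It is also why the p50 queries report a median, not a mean.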
Aggregation Volume
```promql
# Trend windows computed per minute by window size
sum(rate(stonks_aggregation_windows_total[5m])) by (window) * 60

# Signals processed per minute
sum(rate(stonks_aggregation_signals_total[5m])) by (window) * 60

# Median (p50) contradiction score (higher = more conflicting signals)
histogram_quantile(0.5, sum(rate(stonks_aggregation_contradiction_score_bucket[5m])) by (le))

# Aggregation duration p95
histogram_quantile(0.95, sum(rate(stonks_aggregation_duration_seconds_bucket[5m])) by (le, window))
```
Recommendation Generation
```promql
# Recommendations generated per minute by action and mode
sum(rate(stonks_recommendations_total[5m])) by (action, mode) * 60

# Suppression rate
sum(rate(stonks_recommendations_suppressed_total[5m]))
  / sum(rate(stonks_recommendations_total[5m]))

# Median (p50) recommendation confidence
histogram_quantile(0.5, sum(rate(stonks_recommendation_confidence_bucket[5m])) by (le))
```
Trading Engine Activity
```promql
# Orders submitted per minute by side and mode
sum(rate(stonks_orders_submitted_total[5m])) by (side, mode) * 60

# Order rejection rate by reason
sum(rate(stonks_orders_rejected_total[5m])) by (reason_category)

# Fill rate
sum(rate(stonks_orders_filled_total[5m]))
  / sum(rate(stonks_orders_submitted_total[5m]))

# Duplicate orders prevented
sum(rate(stonks_orders_duplicates_prevented_total[5m])) by (detected_via)

# Risk evaluation outcomes
sum(rate(stonks_risk_evaluations_total[5m])) by (result)

# Risk check failure breakdown
sum(rate(stonks_risk_check_failures_total[5m])) by (check_name)
```
Lake Publication
```promql
# Facts published per minute by table
sum(rate(stonks_lake_facts_published_total[5m])) by (table_name) * 60

# Write latency p95 by table
histogram_quantile(0.95, sum(rate(stonks_lake_publish_duration_seconds_bucket[5m])) by (le, table_name))

# Publication error rate
sum(rate(stonks_lake_publish_errors_total[5m])) by (table_name)

# Bytes written per minute
sum(rate(stonks_lake_publish_bytes_total[5m])) by (table_name) * 60
```
Alerting Health
```promql
# Currently active alerts by rule
stonks_alert_active

# Alert firing rate
sum(rate(stonks_alerts_fired_total[1h])) by (rule, severity)

# Alert evaluation duration
histogram_quantile(0.95, sum(rate(stonks_alert_check_duration_seconds_bucket[5m])) by (le))
```
Dead-Letter Queue Health
```promql
# Current DLQ depth by queue
stonks_dlq_depth

# DLQ inflow rate (jobs dead-lettered per minute)
sum(rate(stonks_dlq_items_total[5m])) by (queue) * 60

# DLQ replay rate
sum(rate(stonks_dlq_replayed_total[5m])) by (queue) * 60
```
Pipeline Overview (Active Jobs)
```promql
# Currently active jobs by pipeline stage
stonks_active_jobs

# Median (p50) parse quality score
histogram_quantile(0.5, sum(rate(stonks_parse_quality_score_bucket[5m])) by (le))

# Low quality document rate
sum(rate(stonks_parse_low_quality_total[5m]))
  / sum(rate(stonks_parse_jobs_total[5m]))
```
Recommended Grafana Alert Rules
| Alert | Expression | For | Severity |
|---|---|---|---|
| High DLQ depth | `stonks_dlq_depth > 10` | 5m | warning |
| Ingestion error spike | `sum(rate(stonks_ingestion_errors_total[5m])) > 0.5` | 5m | warning |
| Extraction latency high | `histogram_quantile(0.95, sum(rate(stonks_extraction_duration_seconds_bucket[5m])) by (le)) > 60` | 10m | warning |
| Lake publication stale | `stonks_alert_active{rule="analytical_lag"} == 1` | 5m | warning |
| Broker errors active | `stonks_alert_active{rule="broker_issues"} == 1` | 1m | critical |
| Zero ingestion throughput | `(sum(rate(stonks_ingestion_jobs_total[15m])) or vector(0)) == 0` | 15m | critical |

The `or vector(0)` fallback makes the zero-throughput rule fire even when no ingestion series exist at all. Without it, an empty result never compares equal to 0.
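The For column is a hold duration. The expression must stay true on every evaluation for that long before the alert fires, and until then the alert is pending. The following simplified model replays a series of evaluation results. `alert_states` is hypothetical and assumes a fixed evaluation interval, whereas Grafana and Prometheus track the timestamp at which the condition first became true.

```python
def alert_states(evaluations, for_seconds, interval_seconds):
    """Map a sequence of boolean rule results to "inactive" / "pending" / "firing".

    The rule fires only after being breached continuously for `for_seconds`;
    any False result resets it to "inactive".
    """
    states = []
    breached_for = None  # seconds the condition has held; None while inactive
    for breached in evaluations:
        if not breached:
            breached_for = None
            states.append("inactive")
            continue
        breached_for = 0 if breached_for is None else breached_for + interval_seconds
        states.append("firing" if breached_for >= for_seconds else "pending")
    return states
```

This has a consequence for the zero-throughput rule. `rate(...[15m])` only drops to zero once the counter has not increased for 15 minutes, and the 15m hold then adds another 15 minutes. Roughly 30 minutes of silence therefore pass before it fires.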