# Observability and Metrics Reference
This document covers the full observability stack for Stonks Oracle: Prometheus metrics, operational alerting, structured logging, dead-letter queues, and recommended monitoring queries.
## Prometheus Metrics Endpoint
The Query API exposes a `/metrics` endpoint that returns all registered Prometheus metrics in the standard text exposition format.
**Endpoint**: `GET /metrics` on the Query API service (port 8000)
**Response**: `text/plain; version=0.0.4; charset=utf-8` — standard Prometheus scrape format via `prometheus_client.generate_latest()`.
### Prometheus Scrape Configuration
Add the following job to your `prometheus.yml`:
```yaml
scrape_configs:
  - job_name: "stonks-oracle"
    scrape_interval: 15s
    scrape_timeout: 10s
    metrics_path: /metrics
    static_configs:
      - targets:
          # Docker Compose
          - "query-api:8000"
          # Kubernetes
          # - "query-api.stonks-oracle.svc.cluster.local:8000"
```
For Kubernetes deployments, you can also use a `ServiceMonitor` resource if the Prometheus Operator is installed:
```yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: stonks-oracle
  namespace: stonks-oracle
spec:
  selector:
    matchLabels:
      app: query-api
  endpoints:
    - port: http
      path: /metrics
      interval: 15s
```
---
## Prometheus Metrics Reference
All metrics are defined in `services/shared/metrics.py`. Metric names use the `stonks_` prefix.
### Service Info
| Metric | Type | Description |
|--------|------|-------------|
| `stonks_oracle_info` | Info | Service metadata (build version, etc.) |
### Ingestion Metrics
| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_ingestion_jobs_total` | Counter | `source_type`, `status` | Total ingestion jobs processed |
| `stonks_ingestion_items_fetched_total` | Counter | `source_type` | Total items fetched from external sources |
| `stonks_ingestion_items_new_total` | Counter | `source_type` | New (non-duplicate) items ingested |
| `stonks_ingestion_items_deduped_total` | Counter | `source_type` | Items skipped due to deduplication |
| `stonks_ingestion_errors_total` | Counter | `source_type` | Ingestion errors by source type |
| `stonks_ingestion_adapter_duration_seconds` | Histogram | `source_type` | Adapter fetch latency (buckets: 0.1, 0.5, 1, 2, 5, 10, 30, 60s) |
### Parsing Metrics
| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_parse_jobs_total` | Counter | `status` | Total parse jobs processed |
| `stonks_parse_quality_score` | Histogram | — | Distribution of parser quality scores (buckets: 0.1 to 1.0 in 0.1 steps) |
| `stonks_parse_low_quality_total` | Counter | — | Documents flagged as low quality by the parser |
| `stonks_parse_duration_seconds` | Histogram | — | Parse job duration (buckets: 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10s) |
### Extraction Metrics
| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_extraction_jobs_total` | Counter | `status` | Total extraction jobs processed |
| `stonks_extraction_attempts_total` | Counter | — | Total Ollama extraction attempts (including retries) |
| `stonks_extraction_retries_total` | Counter | — | Extraction retry count |
| `stonks_extraction_duration_seconds` | Histogram | — | Extraction total duration (buckets: 1, 2, 5, 10, 20, 30, 60, 120s) |
| `stonks_extraction_confidence` | Histogram | — | Distribution of extraction confidence scores (buckets: 0.1 to 1.0) |
| `stonks_extraction_validation_errors_total` | Counter | — | Total validation errors across extractions |
| `stonks_extraction_tokens_total` | Counter | `direction` | Estimated token usage (labels: `input`, `output`) |
### Aggregation Metrics
| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_aggregation_windows_total` | Counter | `window` | Trend windows computed |
| `stonks_aggregation_signals_total` | Counter | `window` | Signals processed during aggregation |
| `stonks_aggregation_contradiction_score` | Histogram | — | Distribution of contradiction scores in trend windows (buckets: 0.0 to 1.0) |
| `stonks_aggregation_duration_seconds` | Histogram | `window` | Aggregation job duration (buckets: 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10s) |
### Recommendation Metrics
| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_recommendations_total` | Counter | `action`, `mode` | Recommendations generated |
| `stonks_recommendations_suppressed_total` | Counter | — | Recommendations suppressed due to low data quality |
| `stonks_recommendation_confidence` | Histogram | — | Distribution of recommendation confidence scores (buckets: 0.1 to 1.0) |
### Lake Publication Metrics
| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_lake_facts_published_total` | Counter | `table_name` | Analytical facts published to the lakehouse |
| `stonks_lake_publish_duration_seconds` | Histogram | `table_name` | Lake publication write latency (buckets: 0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5s) |
| `stonks_lake_publish_errors_total` | Counter | `table_name` | Lake publication errors |
| `stonks_lake_publish_bytes_total` | Counter | `table_name` | Total bytes written to the lakehouse |
### Trading and Broker Metrics
| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_orders_submitted_total` | Counter | `side`, `order_type`, `mode` | Orders submitted to broker |
| `stonks_orders_rejected_total` | Counter | `reason_category` | Orders rejected before broker submission |
| `stonks_orders_filled_total` | Counter | `side` | Orders filled by broker |
| `stonks_orders_duplicates_prevented_total` | Counter | `detected_via` | Duplicate orders prevented by idempotency checks |
| `stonks_orders_clamped_total` | Counter | — | Orders auto-clamped to fit within position limits |
| `stonks_risk_evaluations_total` | Counter | `result` | Risk evaluations performed |
| `stonks_risk_check_failures_total` | Counter | `check_name` | Individual risk check failures |
| `stonks_positions_synced_total` | Counter | — | Position sync operations completed |
### Alerting Metrics
| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_alerts_fired_total` | Counter | `rule`, `severity` | Total alerts fired by rule |
| `stonks_alerts_resolved_total` | Counter | `rule` | Total alerts resolved by rule |
| `stonks_alert_check_duration_seconds` | Histogram | — | Duration of alert evaluation cycle (buckets: 0.01 to 5s) |
| `stonks_alert_active` | Gauge | `rule` | Whether an alert rule is currently firing (1) or resolved (0) |
### Dead-Letter Queue Metrics
| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_dlq_items_total` | Counter | `queue` | Jobs sent to dead-letter queues |
| `stonks_dlq_replayed_total` | Counter | `queue` | Jobs replayed from dead-letter queues |
| `stonks_dlq_depth` | Gauge | `queue` | Current dead-letter queue depth |
### Active Jobs Gauge
| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `stonks_active_jobs` | Gauge | `stage` | Currently processing jobs by pipeline stage |
---
## Alerting Module
The alerting module (`services/shared/alerting.py`) evaluates four operational alert rules against PostgreSQL state on a configurable interval. When a threshold is breached, the module emits structured log events and increments Prometheus counters. When a previously firing alert clears, it logs a resolution event.
### Alert Rules
#### 1. `source_failures` — Sustained Source Retrieval Failures
Detects sources where the last N ingestion runs all failed within the lookback window.
| Parameter | ConfigMap Variable | Default | Description |
|-----------|--------------------|---------|-------------|
| Consecutive failure threshold | `ALERT_SOURCE_FAILURE_THRESHOLD` | `3` | Number of consecutive failures before alert fires |
| Lookback window | `ALERT_SOURCE_FAILURE_WINDOW_HOURS` | `6` hours | How far back to check ingestion_runs |
**Severity**: `warning`
**Query**: Checks `ingestion_runs` for sources where the most recent N runs (within the window) all have `status = 'failed'`.
**Details emitted**: `source_id`, `source_type`, `source_name`, `ticker`, `consecutive_failures`
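
The "last N runs all failed" condition can be sketched in a few lines. This is an illustration only, not the code in `services/shared/alerting.py`: the function name is hypothetical, the statuses are assumed to be ordered newest first and already limited to the lookback window, and a source with fewer than N runs in the window is assumed not to fire.

```python
from typing import Sequence


def has_sustained_failures(statuses: Sequence[str], threshold: int = 3) -> bool:
    """Return True when the most recent `threshold` runs all failed.

    `statuses` holds ingestion run statuses, newest first, already
    restricted to the lookback window (assumption of this sketch).
    """
    recent = statuses[:threshold]
    # Assumption: too few runs in the window means "not enough evidence".
    return len(recent) == threshold and all(s == "failed" for s in recent)


# Three failures in a row at the head of the history: fires.
print(has_sustained_failures(["failed", "failed", "failed", "success"]))
# A success inside the most recent three runs: does not fire.
print(has_sustained_failures(["failed", "success", "failed", "failed"]))
```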
#### 2. `schema_failure_spike` — Extraction Validation Failure Rate
Detects when the extraction schema validation failure rate exceeds a threshold.
| Parameter | ConfigMap Variable | Default | Description |
|-----------|--------------------|---------|-------------|
| Failure rate threshold | `ALERT_SCHEMA_FAILURE_RATE_THRESHOLD` | `0.3` (30%) | Failure rate that triggers the alert |
| Lookback window | `ALERT_SCHEMA_FAILURE_WINDOW_HOURS` | `1` hour | Window for computing failure rate |
**Severity**: `warning` if rate ≥ 30%, `critical` if rate ≥ 50%
**Query**: Computes `failed / total` from `model_performance_metrics` within the window.
**Details emitted**: `total_extractions`, `failed_extractions`, `failure_rate`, `threshold`, `window_hours`
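
The mapping from failure rate to severity described above can be sketched as follows. The function name and the `critical_threshold` parameter are hypothetical; the source only states the 30% and 50% cut-offs, and returning `None` for an empty window is an assumption of this sketch.

```python
from typing import Optional


def schema_failure_severity(
    failed: int,
    total: int,
    warn_threshold: float = 0.3,      # ALERT_SCHEMA_FAILURE_RATE_THRESHOLD default
    critical_threshold: float = 0.5,  # assumed fixed cut-off for `critical`
) -> Optional[str]:
    """Return "critical", "warning", or None for a failed/total count."""
    if total == 0:
        # Assumption: no extractions in the window means nothing to alert on.
        return None
    rate = failed / total
    if rate >= critical_threshold:
        return "critical"
    if rate >= warn_threshold:
        return "warning"
    return None


print(schema_failure_severity(failed=4, total=10))
print(schema_failure_severity(failed=6, total=10))
```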
#### 3. `analytical_lag` — Lake Publication Lag
Detects when lake publication has not completed within the threshold for any table.
| Parameter | ConfigMap Variable | Default | Description |
|-----------|--------------------|---------|-------------|
| Lag threshold | `ALERT_LAKE_LAG_THRESHOLD_MINUTES` | `60` minutes | Maximum acceptable time since last successful publish |
**Severity**: `warning`
**Query**: Checks `audit_events` for the most recent successful `lake_publish` event per table, alerts if any are older than the threshold.
**Details emitted**: `table_name`, `last_publish`, `lag_minutes`, `threshold_minutes`
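
As a rough sketch of the lag check, the function below takes the latest successful publish time per table and reports those older than the threshold. The function name and the table names in the example are hypothetical, and the real rule reads its timestamps from `audit_events` rather than from a dict.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List


def find_lagging_tables(
    last_publish: Dict[str, datetime],
    now: datetime,
    threshold_minutes: int = 60,  # ALERT_LAKE_LAG_THRESHOLD_MINUTES default
) -> List[dict]:
    """Return one details dict per table whose last publish is too old."""
    lagging = []
    for table_name, published_at in sorted(last_publish.items()):
        lag_minutes = (now - published_at).total_seconds() / 60
        if lag_minutes > threshold_minutes:
            lagging.append({
                "table_name": table_name,
                "last_publish": published_at.isoformat(),
                "lag_minutes": round(lag_minutes, 1),
                "threshold_minutes": threshold_minutes,
            })
    return lagging


now = datetime(2025, 1, 15, 12, 0, tzinfo=timezone.utc)
publishes = {
    "example_fact_a": now - timedelta(minutes=90),  # hypothetical table, stale
    "example_fact_b": now - timedelta(minutes=10),  # hypothetical table, fresh
}
print(find_lagging_tables(publishes, now))
```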
#### 4. `broker_issues` — Consecutive Broker Errors
Detects consecutive broker submission errors (rejections, timeouts, connection failures).
| Parameter | ConfigMap Variable | Default | Description |
|-----------|--------------------|---------|-------------|
| Error threshold | `ALERT_BROKER_ERROR_THRESHOLD` | `3` | Consecutive broker errors before alert fires |
| Lookback window | `ALERT_BROKER_ERROR_WINDOW_HOURS` | `1` hour | Window for checking order_events |
**Severity**: `critical`
**Query**: Counts recent `order_events` with `event_type IN ('broker_error', 'broker_timeout', 'connection_failed')`.
**Details emitted**: `error_count`, `threshold`, `window_hours`
### Evaluation Cycle
The alerting module runs on a configurable interval (default: every 120 seconds, controlled by `ALERT_CHECK_INTERVAL_SECONDS`). Each cycle:
1. Runs all four alert rules against PostgreSQL
2. Compares results to the current `AlertState` to detect new firings and resolutions
3. For new firings: increments `stonks_alerts_fired_total`, sets `stonks_alert_active` gauge to 1, logs a `WARNING`
4. For resolutions: increments `stonks_alerts_resolved_total`, sets `stonks_alert_active` gauge to 0, logs an `INFO`
5. Records the evaluation duration in `stonks_alert_check_duration_seconds`
Each rule check is wrapped in a try/except so a failure in one rule does not block the others.
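
The cycle above can be sketched as a state diff over the set of firing rules. This is a simplified illustration, not the module's code: `evaluate_cycle` and `RuleCheck` are hypothetical names, the Prometheus updates are reduced to comments, and keeping a rule's previous state when its check raises is an assumption.

```python
import logging
import time
from typing import Callable, Dict, Optional, Set

logger = logging.getLogger("alerting_sketch")

# A rule returns a details dict when its threshold is breached, else None.
RuleCheck = Callable[[], Optional[dict]]


def evaluate_cycle(rules: Dict[str, RuleCheck], active: Set[str]) -> Set[str]:
    """Run every rule once and return the rules firing after this cycle."""
    started = time.monotonic()
    firing: Set[str] = set()
    for name, check in rules.items():
        try:
            if check() is not None:
                firing.add(name)
        except Exception:
            # One broken rule must not block the others.
            logger.exception("alert rule %s failed to evaluate", name)
            if name in active:  # assumption: keep the previous state on error
                firing.add(name)
    for name in sorted(firing - active):
        # Real module: increment the fired counter, set the active gauge to 1.
        logger.warning("alert fired: %s", name)
    for name in sorted(active - firing):
        # Real module: increment the resolved counter, set the gauge to 0.
        logger.info("alert resolved: %s", name)
    logger.debug("alert check took %.3fs", time.monotonic() - started)
    return firing


def broken_check() -> Optional[dict]:
    raise RuntimeError("database unavailable")


rules: Dict[str, RuleCheck] = {
    "source_failures": lambda: {"consecutive_failures": 3},
    "broker_issues": lambda: None,
    "analytical_lag": broken_check,
}
state = evaluate_cycle(rules, set())
print(sorted(state))
```

Passing the returned set into the next call is what lets the function distinguish a new firing from an alert that is still active.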
### ConfigMap Variables Summary
| Variable | Default | Description |
|----------|---------|-------------|
| `ALERT_SOURCE_FAILURE_THRESHOLD` | `3` | Consecutive source failures before alert |
| `ALERT_SOURCE_FAILURE_WINDOW_HOURS` | `6` | Source failure lookback window (hours) |
| `ALERT_SCHEMA_FAILURE_RATE_THRESHOLD` | `0.3` | Extraction failure rate threshold (0.01.0) |
| `ALERT_SCHEMA_FAILURE_WINDOW_HOURS` | `1` | Schema failure lookback window (hours) |
| `ALERT_LAKE_LAG_THRESHOLD_MINUTES` | `60` | Max minutes since last lake publish |
| `ALERT_BROKER_ERROR_THRESHOLD` | `3` | Consecutive broker errors before alert |
| `ALERT_BROKER_ERROR_WINDOW_HOURS` | `1` | Broker error lookback window (hours) |
| `ALERT_CHECK_INTERVAL_SECONDS` | `120` | Seconds between alert evaluation cycles |
---
## Structured Logging
All services use structured JSON logging configured via `services/shared/logging.py`. Call `setup_logging(service_name)` once at service startup.
### JSON Log Format
Each log line is a single JSON object with the following fields:
```json
{
  "timestamp": "2025-01-15T12:34:56.789012+00:00",
  "level": "INFO",
  "logger": "ingestion_worker",
  "message": "Processed job for AAPL",
  "service": "ingestion_worker",
  "trace_id": "a1b2c3d4e5f67890",
  "span_id": "1a2b3c4d"
}
```
| Field | Type | Description |
|-------|------|-------------|
| `timestamp` | string (ISO 8601) | UTC timestamp of the log event |
| `level` | string | Log level: `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL` |
| `logger` | string | Python logger name |
| `message` | string | Human-readable log message |
| `service` | string | Service name set at startup (e.g., `ingestion_worker`, `scheduler`) |
| `trace_id` | string | 16-character hex trace ID for distributed tracing |
| `span_id` | string | 8-character hex span ID for the current operation |
### Additional Context Fields
When present, these fields are merged into the JSON output:
| Field | Source | Description |
|-------|--------|-------------|
| `span_operation` | `Span` context manager | Name of the traced operation |
| `span_status` | `Span` context manager | `ok` or `error` |
| `span_duration_ms` | `Span` context manager | Duration of the span in milliseconds |
| `span_parent_id` | `Span` context manager | Parent span ID for nested spans |
| `span_attributes` | `Span` context manager | Arbitrary key-value attributes set on the span |
| `ticker` | Manual `extra={}` | Company ticker symbol |
| `document_id` | Manual `extra={}` | Document UUID |
| `source_type` | Manual `extra={}` | Source type (e.g., `polygon`, `news_api`) |
| `job_id` | Manual `extra={}` | Job identifier |
| `duration_ms` | Manual `extra={}` | Operation duration |
| `error` | Manual `extra={}` | Error description |
| `count` | Manual `extra={}` | Item count |
| `exception` | Automatic | Formatted exception traceback (when `exc_info` is set) |
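
For orientation, a formatter producing this shape can be built on the standard `logging` module. The sketch below is not the implementation in `services/shared/logging.py`: the class name and the `CONTEXT_FIELDS` tuple are hypothetical, and `trace_id`, `span_id`, and the span fields are left out to keep it short.

```python
import json
import logging
from datetime import datetime, timezone

# Manual `extra={}` fields copied into the output when present.
CONTEXT_FIELDS = (
    "ticker", "document_id", "source_type", "job_id",
    "duration_ms", "error", "count",
)


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def __init__(self, service: str) -> None:
        super().__init__()
        self.service = service

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": self.service,
        }
        for field in CONTEXT_FIELDS:
            if hasattr(record, field):
                entry[field] = getattr(record, field)
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(entry)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter("ingestion_worker"))
log = logging.getLogger("ingestion_worker")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.info("Processed job for %s", "AAPL", extra={"ticker": "AAPL", "count": 12})
```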
### Trace Context Propagation
Trace context flows through the pipeline via job payloads:
1. **Inject**: Before enqueuing a job to Redis, call `inject_trace_context(payload)` to add `_trace_id` to the payload dict.
2. **Extract**: At the start of job processing, call `extract_trace_context(payload)` to restore the trace context (or generate a new one if absent).
3. **Span**: Use the `Span` context manager to create child spans within a service:
```python
from services.shared.logging import Span

with Span("process_document", ticker="AAPL") as span:
    # ... do work ...
    span.set_attribute("doc_count", 5)
```
This produces a structured log entry on span exit with duration, status, and attributes.
### Log Querying
To trace a request through the pipeline, filter by `trace_id`:
```bash
# Kubernetes — find all logs for a specific trace
kubectl logs -n stonks-oracle -l app.kubernetes.io/part-of=stonks-oracle --all-containers \
| jq -r 'select(.trace_id == "a1b2c3d4e5f67890")'
# Docker Compose — search across all services
docker compose logs --no-color | grep 'a1b2c3d4e5f67890'
```
To find errors in a specific service:
```bash
# Kubernetes
kubectl logs -n stonks-oracle deployment/extractor --tail=500 \
| jq 'select(.level == "ERROR")'
# Docker Compose
docker compose logs --no-color --no-log-prefix --tail=500 extractor \
| jq 'select(.level == "ERROR")'
```
To find slow extraction spans:
```bash
kubectl logs -n stonks-oracle deployment/extractor --tail=1000 \
| jq 'select(.span_operation == "extract_document" and .span_duration_ms > 30000)'
```
---
## Dead-Letter Queue System
When a worker fails to process a job after exhausting retries (default: 3 attempts), the job is pushed to a per-queue dead-letter list in Redis. The DLQ system is implemented in `services/shared/dead_letter.py`.
### Queue Names
Dead-letter queues follow the naming pattern `stonks:dlq:<queue_name>`:
| DLQ Key | Source Queue | Description |
|---------|-------------|-------------|
| `stonks:dlq:ingestion` | `stonks:queue:ingestion` | Failed ingestion jobs (adapter errors, API failures) |
| `stonks:dlq:parsing` | `stonks:queue:parsing` | Failed parse jobs |
| `stonks:dlq:extraction` | `stonks:queue:extraction` | Failed extraction jobs (LLM errors, validation failures) |
| `stonks:dlq:aggregation` | `stonks:queue:aggregation` | Failed aggregation jobs |
| `stonks:dlq:recommendation` | `stonks:queue:recommendation` | Failed recommendation jobs |
| `stonks:dlq:broker_orders` | `stonks:queue:broker_orders` | Failed broker order submissions |
When `DEPLOY_STAGE` is set, the prefix becomes `stonks:<stage>:dlq:<queue_name>`.
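
The naming rule amounts to a small key builder. The helper below is a hypothetical illustration; the actual function in `services/shared/dead_letter.py` may be named and structured differently.

```python
import os
from typing import Optional


def dlq_key(queue_name: str, stage: Optional[str] = None) -> str:
    """Build the Redis key for a dead-letter queue.

    When `stage` is not passed, it is read from DEPLOY_STAGE (assumption:
    an unset or empty variable means "no stage prefix").
    """
    if stage is None:
        stage = os.environ.get("DEPLOY_STAGE", "")
    if stage:
        return f"stonks:{stage}:dlq:{queue_name}"
    return f"stonks:dlq:{queue_name}"


print(dlq_key("ingestion", stage=""))
print(dlq_key("ingestion", stage="beta"))
```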
### DLQ Entry Format
Each DLQ entry wraps the original job payload with failure metadata:
```json
{
  "original_payload": {
    "source_id": "...",
    "source_type": "polygon",
    "ticker": "AAPL",
    "company_id": "...",
    "config": {}
  },
  "queue": "ingestion",
  "error": "ConnectionError: API timeout after 30s",
  "attempt": 3,
  "worker": "ingestion_worker",
  "dead_lettered_at": "2025-01-15T12:34:56.789012+00:00"
}
```
| Field | Type | Description |
|-------|------|-------------|
| `original_payload` | object | The original job payload as it was enqueued |
| `queue` | string | Source queue name |
| `error` | string | Error message from the final failed attempt |
| `attempt` | integer | Number of attempts made before dead-lettering |
| `worker` | string | Worker identifier that dead-lettered the job |
| `dead_lettered_at` | string (ISO 8601) | UTC timestamp when the job was dead-lettered |
### Routing
Jobs are routed to the DLQ by calling `send_to_dlq()` from worker code after retry exhaustion:
```python
from services.shared.dead_letter import send_to_dlq

await send_to_dlq(
    rds=redis_client,
    queue_name="ingestion",
    original_payload=job,
    error=str(exception),
    attempt=3,
    worker="ingestion_worker",
)
```
The default maximum attempts before dead-lettering is `DEFAULT_MAX_ATTEMPTS = 3`.
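
To show where dead-lettering sits relative to retries, here is a synchronous sketch with a plain list standing in for the Redis dead-letter list. `build_dlq_entry` and `run_with_retries` are hypothetical helpers, not functions from `services/shared/dead_letter.py`; real workers are asynchronous and call `send_to_dlq()` instead.

```python
import json
from datetime import datetime, timezone
from typing import Callable, List

DEFAULT_MAX_ATTEMPTS = 3


def build_dlq_entry(payload: dict, queue: str, error: str,
                    attempt: int, worker: str) -> dict:
    """Wrap a failed job in the failure metadata of the DLQ entry format."""
    return {
        "original_payload": payload,
        "queue": queue,
        "error": error,
        "attempt": attempt,
        "worker": worker,
        "dead_lettered_at": datetime.now(timezone.utc).isoformat(),
    }


def run_with_retries(job: dict, handler: Callable[[dict], None],
                     dlq: List[str], queue: str, worker: str,
                     max_attempts: int = DEFAULT_MAX_ATTEMPTS) -> bool:
    """Return True on success; dead-letter the job and return False otherwise."""
    last_error = ""
    for _ in range(max_attempts):
        try:
            handler(job)
            return True
        except Exception as exc:  # sketch only: real workers may be narrower
            last_error = f"{type(exc).__name__}: {exc}"
    # `dlq` is an in-memory stand-in for the Redis dead-letter list.
    entry = build_dlq_entry(job, queue, last_error, max_attempts, worker)
    dlq.append(json.dumps(entry))
    return False


def always_times_out(job: dict) -> None:
    raise ConnectionError("API timeout after 30s")


dead_letters: List[str] = []
ok = run_with_retries({"ticker": "AAPL"}, always_times_out,
                      dead_letters, "ingestion", "ingestion_worker")
print(ok, json.loads(dead_letters[0])["error"])
```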
### Replay Tooling
The `services/shared/dead_letter.py` module provides functions for inspecting and replaying DLQ items:
| Function | Description |
|----------|-------------|
| `peek_dlq(rds, queue_name, start=0, count=10)` | Inspect DLQ entries without removing them |
| `replay_one(rds, queue_name)` | Pop the oldest DLQ entry and re-enqueue its original payload to the source queue |
| `replay_all(rds, queue_name)` | Replay every item in the DLQ back to the source queue. Returns the count replayed |
| `dlq_length(rds, queue_name)` | Return the number of items in the DLQ |
| `dlq_summary(rds, queue_names)` | Return a mapping of queue_name → DLQ depth for multiple queues |
| `purge_dlq(rds, queue_name)` | Delete all items from the DLQ. Returns count removed |
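
The replay semantics can be illustrated with in-memory deques in place of Redis lists. The function names below are deliberately different from the real `replay_one` and `replay_all`, and treating the left end of the deque as the oldest entry is an assumption of this sketch.

```python
import json
from collections import deque
from typing import Deque, Optional


def replay_oldest(dlq: Deque[str], source_queue: Deque[str]) -> Optional[dict]:
    """Move the oldest DLQ entry's original payload back to the source queue."""
    if not dlq:
        return None
    entry = json.loads(dlq.popleft())  # assumption: oldest entry is leftmost
    # Only the original payload is re-enqueued; failure metadata is dropped.
    source_queue.append(json.dumps(entry["original_payload"]))
    return entry


def replay_everything(dlq: Deque[str], source_queue: Deque[str]) -> int:
    """Replay every DLQ entry and return how many were moved."""
    replayed = 0
    while replay_oldest(dlq, source_queue) is not None:
        replayed += 1
    return replayed


dlq: Deque[str] = deque([
    json.dumps({"original_payload": {"ticker": "AAPL"}, "queue": "ingestion"}),
    json.dumps({"original_payload": {"ticker": "MSFT"}, "queue": "ingestion"}),
])
source: Deque[str] = deque()
print(replay_everything(dlq, source), len(dlq), len(source))
```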
### Monitoring DLQ Depth
Use the `scripts/check_queues.py` script to inspect queue and DLQ depths from the command line:
```bash
# Docker Compose
REDIS_HOST=localhost REDIS_PORT=6379 REDIS_PASSWORD="" \
python scripts/check_queues.py
# Kubernetes
kubectl exec -n stonks-oracle deployment/query-api -- \
python scripts/check_queues.py
```
The Query API also exposes DLQ depths in the `/api/ops/pipeline/stream` SSE endpoint and the DevOps metrics endpoints, reporting `dlq:<queue_name>` keys alongside regular queue depths.
The `stonks_dlq_depth` Prometheus gauge tracks DLQ depth per queue for dashboard alerting.
---
## Recommended Prometheus/Grafana Queries
### Ingestion Throughput
```promql
# Ingestion jobs per minute by source type and status
sum(rate(stonks_ingestion_jobs_total[5m])) by (source_type, status) * 60
# New items ingested per minute
sum(rate(stonks_ingestion_items_new_total[5m])) * 60
# Deduplication ratio (higher = more duplicates being filtered)
sum(rate(stonks_ingestion_items_deduped_total[5m]))
/ sum(rate(stonks_ingestion_items_fetched_total[5m]))
# Adapter latency p95 by source type
histogram_quantile(0.95, sum(rate(stonks_ingestion_adapter_duration_seconds_bucket[5m])) by (le, source_type))
# Ingestion error rate
sum(rate(stonks_ingestion_errors_total[5m])) by (source_type)
```
### Extraction Latency and Quality
```promql
# Extraction duration p50 and p95
histogram_quantile(0.5, sum(rate(stonks_extraction_duration_seconds_bucket[5m])) by (le))
histogram_quantile(0.95, sum(rate(stonks_extraction_duration_seconds_bucket[5m])) by (le))
# Extraction success rate
sum(rate(stonks_extraction_jobs_total{status="success"}[5m]))
/ sum(rate(stonks_extraction_jobs_total[5m]))
# Median extraction confidence (p50 estimated from histogram buckets)
histogram_quantile(0.5, sum(rate(stonks_extraction_confidence_bucket[5m])) by (le))
# Validation error rate
sum(rate(stonks_extraction_validation_errors_total[5m]))
# Token usage rate (input vs output)
sum(rate(stonks_extraction_tokens_total[5m])) by (direction)
```
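
The quantile queries above return estimates interpolated from cumulative bucket counts, not exact values. The sketch below reproduces that estimate in simplified form for classic histograms, using the extraction duration buckets; the function name and the counts are made up for illustration, and edge cases handled by Prometheus (such as `q` outside 0 to 1) are reduced to a `ValueError`.

```python
import math
from typing import List, Tuple


def estimate_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate a quantile from (upper_bound, cumulative_count) pairs.

    `buckets` must be sorted by bound and end with the +Inf bucket.
    """
    if not 0 < q <= 1:
        raise ValueError("this sketch only handles 0 < q <= 1")
    if len(buckets) < 2 or buckets[-1][1] == 0:
        return math.nan
    rank = q * buckets[-1][1]
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank:
            if math.isinf(bound):
                # Quantile falls in +Inf: report the highest finite bound.
                return buckets[-2][0]
            # Linear interpolation inside the bucket that contains the rank.
            fraction = (rank - prev_count) / (count - prev_count)
            return prev_bound + (bound - prev_bound) * fraction
        prev_bound, prev_count = bound, count
    return math.nan


# Hypothetical cumulative counts for the 1, 2, 5, 10, 20, 30, 60, 120s buckets.
extraction_buckets = [
    (1, 0), (2, 2), (5, 10), (10, 40), (20, 70),
    (30, 90), (60, 98), (120, 100), (math.inf, 100),
]
print(estimate_quantile(0.5, extraction_buckets))
print(estimate_quantile(0.95, extraction_buckets))
```

Because the result is interpolated within a bucket, wide buckets such as 60 to 120s give coarse estimates at the tail; this is worth keeping in mind when setting latency alert thresholds near a bucket boundary.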
### Aggregation Volume
```promql
# Trend windows computed per minute by window size
sum(rate(stonks_aggregation_windows_total[5m])) by (window) * 60
# Signals processed per minute
sum(rate(stonks_aggregation_signals_total[5m])) by (window) * 60
# Median contradiction score (higher = more conflicting signals)
histogram_quantile(0.5, sum(rate(stonks_aggregation_contradiction_score_bucket[5m])) by (le))
# Aggregation duration p95
histogram_quantile(0.95, sum(rate(stonks_aggregation_duration_seconds_bucket[5m])) by (le, window))
```
### Recommendation Generation
```promql
# Recommendations generated per minute by action
sum(rate(stonks_recommendations_total[5m])) by (action, mode) * 60
# Suppression rate
sum(rate(stonks_recommendations_suppressed_total[5m]))
/ sum(rate(stonks_recommendations_total[5m]))
# Median recommendation confidence
histogram_quantile(0.5, sum(rate(stonks_recommendation_confidence_bucket[5m])) by (le))
```
### Trading Engine Activity
```promql
# Orders submitted per minute by side
sum(rate(stonks_orders_submitted_total[5m])) by (side, mode) * 60
# Order rejection rate by reason
sum(rate(stonks_orders_rejected_total[5m])) by (reason_category)
# Fill rate
sum(rate(stonks_orders_filled_total[5m]))
/ sum(rate(stonks_orders_submitted_total[5m]))
# Duplicate orders prevented
sum(rate(stonks_orders_duplicates_prevented_total[5m])) by (detected_via)
# Risk evaluation outcomes
sum(rate(stonks_risk_evaluations_total[5m])) by (result)
# Risk check failure breakdown
sum(rate(stonks_risk_check_failures_total[5m])) by (check_name)
```
### Lake Publication
```promql
# Facts published per minute by table
sum(rate(stonks_lake_facts_published_total[5m])) by (table_name) * 60
# Write latency p95 by table
histogram_quantile(0.95, sum(rate(stonks_lake_publish_duration_seconds_bucket[5m])) by (le, table_name))
# Publication error rate
sum(rate(stonks_lake_publish_errors_total[5m])) by (table_name)
# Bytes written per minute
sum(rate(stonks_lake_publish_bytes_total[5m])) by (table_name) * 60
```
### Alerting Health
```promql
# Currently active alerts by rule
stonks_alert_active
# Alert firing rate
sum(rate(stonks_alerts_fired_total[1h])) by (rule, severity)
# Alert evaluation duration
histogram_quantile(0.95, sum(rate(stonks_alert_check_duration_seconds_bucket[5m])) by (le))
```
### Dead-Letter Queue Health
```promql
# Current DLQ depth by queue
stonks_dlq_depth
# DLQ inflow rate (jobs dead-lettered per minute)
sum(rate(stonks_dlq_items_total[5m])) by (queue) * 60
# DLQ replay rate
sum(rate(stonks_dlq_replayed_total[5m])) by (queue) * 60
```
### Pipeline Overview (Active Jobs)
```promql
# Currently active jobs by pipeline stage
stonks_active_jobs
# Median parse quality score
histogram_quantile(0.5, sum(rate(stonks_parse_quality_score_bucket[5m])) by (le))
# Low quality document rate
sum(rate(stonks_parse_low_quality_total[5m]))
/ sum(rate(stonks_parse_jobs_total[5m]))
```
### Recommended Grafana Alert Rules
| Alert | Expression | For | Severity |
|-------|-----------|-----|----------|
| High DLQ depth | `stonks_dlq_depth > 10` | 5m | warning |
| Ingestion error spike | `sum(rate(stonks_ingestion_errors_total[5m])) > 0.5` | 5m | warning |
| Extraction latency high | `histogram_quantile(0.95, sum(rate(stonks_extraction_duration_seconds_bucket[5m])) by (le)) > 60` | 10m | warning |
| Lake publication stale | `stonks_alert_active{rule="analytical_lag"} == 1` | 5m | warning |
| Broker errors active | `stonks_alert_active{rule="broker_issues"} == 1` | 1m | critical |
| Zero ingestion throughput | `sum(rate(stonks_ingestion_jobs_total[15m])) == 0` | 15m | critical |