"""Prometheus metrics for all Stonks Oracle pipeline stages. Provides counters, histograms, and gauges covering: - Ingestion: items fetched, new items, errors, adapter latency - Parsing: documents parsed, quality scores, low-quality flags - Extraction: attempts, successes, failures, latency, confidence, retries - Aggregation: trend windows computed, signal counts, contradiction scores - Lake publication: facts published per table, write latency - Trading: orders submitted, rejected, filled, risk evaluations Requirements: 12.1, 12.2 Design: Section 12 (Observability and Operations) """ from __future__ import annotations from prometheus_client import Counter, Gauge, Histogram, Info # --------------------------------------------------------------------------- # Service info # --------------------------------------------------------------------------- SERVICE_INFO = Info("stonks_oracle", "Stonks Oracle service metadata") # --------------------------------------------------------------------------- # Ingestion metrics # --------------------------------------------------------------------------- INGESTION_JOBS_TOTAL = Counter( "stonks_ingestion_jobs_total", "Total ingestion jobs processed", ["source_type", "status"], ) INGESTION_ITEMS_FETCHED = Counter( "stonks_ingestion_items_fetched_total", "Total items fetched from external sources", ["source_type"], ) INGESTION_ITEMS_NEW = Counter( "stonks_ingestion_items_new_total", "New (non-duplicate) items ingested", ["source_type"], ) INGESTION_ITEMS_DEDUPED = Counter( "stonks_ingestion_items_deduped_total", "Items skipped due to deduplication", ["source_type"], ) INGESTION_ERRORS = Counter( "stonks_ingestion_errors_total", "Ingestion errors by source type", ["source_type"], ) INGESTION_ADAPTER_DURATION = Histogram( "stonks_ingestion_adapter_duration_seconds", "Adapter fetch latency in seconds", ["source_type"], buckets=(0.1, 0.5, 1, 2, 5, 10, 30, 60), ) # 
# ---------------------------------------------------------------------------
# Shared histogram bucket layouts
# ---------------------------------------------------------------------------

# Upper bounds in steps of 0.1 up to 1.0; used by the parse-quality,
# extraction-confidence, and recommendation-confidence histograms.
_SCORE_BUCKETS = (0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0)

# Upper bounds in seconds (0.05 s to 10 s); used by the parse and
# aggregation duration histograms.
_JOB_DURATION_BUCKETS = (0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10)

# ---------------------------------------------------------------------------
# Parsing metrics
# ---------------------------------------------------------------------------

# Parse jobs, split by status.
PARSE_JOBS_TOTAL = Counter(
    "stonks_parse_jobs_total",
    documentation="Total parse jobs processed",
    labelnames=["status"],
)

# Parser quality scores (unlabelled).
PARSE_QUALITY_SCORE = Histogram(
    "stonks_parse_quality_score",
    documentation="Distribution of parser quality scores",
    buckets=_SCORE_BUCKETS,
)

# Documents the parser flagged as low quality (unlabelled).
PARSE_LOW_QUALITY_TOTAL = Counter(
    "stonks_parse_low_quality_total",
    documentation="Documents flagged as low quality by the parser",
)

# Parse job duration (unlabelled).
PARSE_DURATION = Histogram(
    "stonks_parse_duration_seconds",
    documentation="Parse job duration in seconds",
    buckets=_JOB_DURATION_BUCKETS,
)

# ---------------------------------------------------------------------------
# Extraction metrics
# ---------------------------------------------------------------------------

# Extraction jobs, split by status.
EXTRACTION_JOBS_TOTAL = Counter(
    "stonks_extraction_jobs_total",
    documentation="Total extraction jobs processed",
    labelnames=["status"],
)

# Every extraction attempt, retries included (unlabelled).
EXTRACTION_ATTEMPTS = Counter(
    "stonks_extraction_attempts_total",
    documentation="Total Ollama extraction attempts (including retries)",
)

# Retries only (unlabelled).
EXTRACTION_RETRIES = Counter(
    "stonks_extraction_retries_total",
    documentation="Extraction retry count",
)

# Total extraction duration; bucket upper bounds run from 1 s to 120 s.
EXTRACTION_DURATION = Histogram(
    "stonks_extraction_duration_seconds",
    documentation="Extraction total duration in seconds",
    buckets=(1, 2, 5, 10, 20, 30, 60, 120),
)

# Extraction confidence scores (unlabelled).
EXTRACTION_CONFIDENCE = Histogram(
    "stonks_extraction_confidence",
    documentation="Distribution of extraction confidence scores",
    buckets=_SCORE_BUCKETS,
)

# Validation errors summed over extractions (unlabelled).
EXTRACTION_VALIDATION_ERRORS = Counter(
    "stonks_extraction_validation_errors_total",
    documentation="Total validation errors across extractions",
)

# Estimated token usage, split by the "direction" label.
EXTRACTION_TOKEN_ESTIMATE = Counter(
    "stonks_extraction_tokens_total",
    documentation="Estimated token usage",
    labelnames=["direction"],
)

# ---------------------------------------------------------------------------
# Aggregation metrics
# ---------------------------------------------------------------------------

# Trend windows computed, per window.
AGGREGATION_WINDOWS_COMPUTED = Counter(
    "stonks_aggregation_windows_total",
    documentation="Trend windows computed",
    labelnames=["window"],
)

# Signals processed during aggregation, per window.
AGGREGATION_SIGNALS_PROCESSED = Counter(
    "stonks_aggregation_signals_total",
    documentation="Signals processed during aggregation",
    labelnames=["window"],
)

# Contradiction scores; the layout includes an explicit 0.0 bucket and is
# finer below 0.2 than above it.
AGGREGATION_CONTRADICTION_SCORE = Histogram(
    "stonks_aggregation_contradiction_score",
    documentation="Distribution of contradiction scores in trend windows",
    buckets=(0.0, 0.05, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5, 0.75, 1.0),
)

# Aggregation job duration, per window.
AGGREGATION_DURATION = Histogram(
    "stonks_aggregation_duration_seconds",
    documentation="Aggregation job duration in seconds",
    labelnames=["window"],
    buckets=_JOB_DURATION_BUCKETS,
)

# ---------------------------------------------------------------------------
# Recommendation metrics
# ---------------------------------------------------------------------------

# Recommendations generated, split by action and mode.
RECOMMENDATION_GENERATED = Counter(
    "stonks_recommendations_total",
    documentation="Recommendations generated",
    labelnames=["action", "mode"],
)

# Recommendations suppressed for low data quality (unlabelled).
RECOMMENDATION_SUPPRESSED = Counter(
    "stonks_recommendations_suppressed_total",
    documentation="Recommendations suppressed due to low data quality",
)

# Recommendation confidence scores (unlabelled).
RECOMMENDATION_CONFIDENCE = Histogram(
    "stonks_recommendation_confidence",
    documentation="Distribution of recommendation confidence scores",
    buckets=_SCORE_BUCKETS,
)

# ---------------------------------------------------------------------------
# Lake publication metrics
# ---------------------------------------------------------------------------

# Facts published, per table.
LAKE_FACTS_PUBLISHED = Counter(
    "stonks_lake_facts_published_total",
    documentation="Analytical facts published to the lakehouse",
    labelnames=["table_name"],
)

# Write latency per table; bucket upper bounds run from 0.01 s to 5 s.
LAKE_PUBLISH_DURATION = Histogram(
    "stonks_lake_publish_duration_seconds",
    documentation="Lake publication write latency in seconds",
    labelnames=["table_name"],
    buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5),
)

# Publication errors, per table.
LAKE_PUBLISH_ERRORS = Counter(
    "stonks_lake_publish_errors_total",
    documentation="Lake publication errors",
    labelnames=["table_name"],
)

# Bytes written, per table.
LAKE_PUBLISH_BYTES = Counter(
    "stonks_lake_publish_bytes_total",
    documentation="Total bytes written to the lakehouse",
    labelnames=["table_name"],
)
# ---------------------------------------------------------------------------
# Trading / broker metrics
# ---------------------------------------------------------------------------

# Orders submitted to the broker, split by side, order type, and mode.
ORDERS_SUBMITTED = Counter(
    "stonks_orders_submitted_total",
    documentation="Orders submitted to broker",
    labelnames=["side", "order_type", "mode"],
)

# Orders rejected before reaching the broker, per reason category.
ORDERS_REJECTED = Counter(
    "stonks_orders_rejected_total",
    documentation="Orders rejected before broker submission",
    labelnames=["reason_category"],
)

# Orders filled by the broker, per side.
ORDERS_FILLED = Counter(
    "stonks_orders_filled_total",
    documentation="Orders filled by broker",
    labelnames=["side"],
)

# Duplicate orders stopped by idempotency checks, per detection path.
ORDERS_DUPLICATES_PREVENTED = Counter(
    "stonks_orders_duplicates_prevented_total",
    documentation="Duplicate orders prevented by idempotency checks",
    labelnames=["detected_via"],
)

# Risk evaluations performed, per result.
RISK_EVALUATIONS_TOTAL = Counter(
    "stonks_risk_evaluations_total",
    documentation="Risk evaluations performed",
    labelnames=["result"],
)

# Individual risk check failures, per check name.
RISK_CHECK_FAILURES = Counter(
    "stonks_risk_check_failures_total",
    documentation="Individual risk check failures",
    labelnames=["check_name"],
)

# Completed position sync operations (unlabelled).
POSITIONS_SYNCED = Counter(
    "stonks_positions_synced_total",
    documentation="Position sync operations completed",
)

# ---------------------------------------------------------------------------
# Active gauges
# ---------------------------------------------------------------------------

# Jobs currently being processed, per stage.
ACTIVE_JOBS = Gauge(
    "stonks_active_jobs",
    documentation="Currently processing jobs by stage",
    labelnames=["stage"],
)

# ---------------------------------------------------------------------------
# Alerting metrics
# ---------------------------------------------------------------------------

# Alerts fired, split by rule and severity.
ALERTS_FIRED = Counter(
    "stonks_alerts_fired_total",
    documentation="Total alerts fired by rule",
    labelnames=["rule", "severity"],
)

# Alerts resolved, per rule.
ALERTS_RESOLVED = Counter(
    "stonks_alerts_resolved_total",
    documentation="Total alerts resolved by rule",
    labelnames=["rule"],
)

# Duration of one alert evaluation cycle; bucket upper bounds run from
# 0.01 s to 5 s.
ALERT_CHECK_DURATION = Histogram(
    "stonks_alert_check_duration_seconds",
    documentation="Duration of alert evaluation cycle",
    buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5),
)

# Per-rule flag: 1 while the rule is firing, 0 once resolved.
ALERT_ACTIVE = Gauge(
    "stonks_alert_active",
    documentation="Whether an alert rule is currently firing (1) or resolved (0)",
    labelnames=["rule"],
)
# ---------------------------------------------------------------------------
# Dead-letter queue metrics
# ---------------------------------------------------------------------------

# Jobs sent to a dead-letter queue, per queue.
DLQ_ITEMS_TOTAL = Counter(
    "stonks_dlq_items_total",
    documentation="Jobs sent to dead-letter queues",
    labelnames=["queue"],
)

# Jobs replayed out of a dead-letter queue, per queue.
DLQ_REPLAYED_TOTAL = Counter(
    "stonks_dlq_replayed_total",
    documentation="Jobs replayed from dead-letter queues",
    labelnames=["queue"],
)

# Current depth of each dead-letter queue.
DLQ_DEPTH = Gauge(
    "stonks_dlq_depth",
    documentation="Current dead-letter queue depth",
    labelnames=["queue"],
)