"""Structured logging and distributed tracing for all Stonks Oracle services. Provides: - JSON-formatted structured log output for machine-parseable log aggregation - Trace context (trace_id, span_id, service) propagated through log records - Context manager for creating trace spans within a service - Helper to configure logging for any service worker or API Requirements: 12.1 Design: Section 12 (Observability and Operations) """ from __future__ import annotations import json import logging import time import uuid from contextvars import ContextVar from datetime import datetime, timezone from typing import Any # --------------------------------------------------------------------------- # Trace context stored in contextvars for async-safe propagation # --------------------------------------------------------------------------- _trace_id: ContextVar[str] = ContextVar("trace_id", default="") _span_id: ContextVar[str] = ContextVar("span_id", default="") _service_name: ContextVar[str] = ContextVar("service_name", default="unknown") def get_trace_id() -> str: return _trace_id.get() def get_span_id() -> str: return _span_id.get() def get_service_name() -> str: return _service_name.get() def set_trace_context( trace_id: str | None = None, span_id: str | None = None, service: str | None = None, ) -> None: """Set trace context for the current async task / thread.""" if trace_id is not None: _trace_id.set(trace_id) if span_id is not None: _span_id.set(span_id) if service is not None: _service_name.set(service) def new_trace_id() -> str: return uuid.uuid4().hex[:16] def new_span_id() -> str: return uuid.uuid4().hex[:8] # --------------------------------------------------------------------------- # Span context manager for tracing within a service # --------------------------------------------------------------------------- class Span: """Lightweight span for distributed tracing. Usage:: with Span("process_document", ticker="AAPL") as span: # ... do work ... span.set_attribute("doc_count", 5) On exit the span logs its duration and attributes as a structured event. """ def __init__(self, operation: str, **attributes: Any) -> None: self.operation = operation self.parent_span_id = get_span_id() self.span_id = new_span_id() self.trace_id = get_trace_id() or new_trace_id() self.attributes: dict[str, Any] = dict(attributes) self.start_time: float = 0.0 self.duration_ms: float = 0.0 self._token_trace: Any = None self._token_span: Any = None self._logger = logging.getLogger(get_service_name() or "tracing") def set_attribute(self, key: str, value: Any) -> None: self.attributes[key] = value def __enter__(self) -> Span: self.start_time = time.monotonic() self._token_trace = _trace_id.set(self.trace_id) self._token_span = _span_id.set(self.span_id) return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: self.duration_ms = (time.monotonic() - self.start_time) * 1000 status = "error" if exc_type else "ok" self._logger.info( "span.end", extra={ "span_operation": self.operation, "span_status": status, "span_duration_ms": round(self.duration_ms, 2), "span_parent_id": self.parent_span_id, "span_attributes": self.attributes, }, ) # Restore parent span context if self._token_span is not None: _span_id.reset(self._token_span) if self._token_trace is not None: _trace_id.reset(self._token_trace) # --------------------------------------------------------------------------- # JSON log formatter # --------------------------------------------------------------------------- class JSONFormatter(logging.Formatter): """Emit each log record as a single JSON line with trace context.""" def format(self, record: logging.LogRecord) -> str: log_entry: dict[str, Any] = { "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(), "level": record.levelname, "logger": record.name, "message": record.getMessage(), "service": get_service_name(), "trace_id": get_trace_id(), "span_id": get_span_id(), } # Merge extra fields from Span or manual extra={} usage for key in ( "span_operation", "span_status", "span_duration_ms", "span_parent_id", "span_attributes", "ticker", "document_id", "source_type", "job_id", "duration_ms", "error", "count", ): val = getattr(record, key, None) if val is not None: log_entry[key] = val if record.exc_info and record.exc_info[1]: log_entry["exception"] = self.formatException(record.exc_info) return json.dumps(log_entry, default=str) # --------------------------------------------------------------------------- # Setup helper # --------------------------------------------------------------------------- def setup_logging( service_name: str, level: str = "INFO", json_output: bool = True, ) -> None: """Configure structured logging for a service. Call this once at service startup (before any log calls). Args: service_name: Identifies this service in log output (e.g. "ingestion_worker"). level: Log level string (DEBUG, INFO, WARNING, ERROR). json_output: If True, emit JSON lines. If False, use a human-readable format. """ _service_name.set(service_name) root = logging.getLogger() root.setLevel(getattr(logging, level.upper(), logging.INFO)) # Remove existing handlers to avoid duplicate output root.handlers.clear() handler = logging.StreamHandler() if json_output: handler.setFormatter(JSONFormatter()) else: handler.setFormatter(logging.Formatter( "%(asctime)s [%(levelname)s] %(name)s (%(service)s) " "trace=%(trace_id)s span=%(span_id)s — %(message)s", defaults={"service": service_name, "trace_id": "", "span_id": ""}, )) root.addHandler(handler) # --------------------------------------------------------------------------- # Trace context propagation through job payloads # --------------------------------------------------------------------------- def inject_trace_context(payload: dict[str, Any]) -> dict[str, Any]: """Inject current trace context into a job payload dict. Call this before enqueuing a job to Redis so the downstream worker can continue the same trace. """ trace_id = get_trace_id() if trace_id: payload["_trace_id"] = trace_id return payload def extract_trace_context(payload: dict[str, Any]) -> None: """Extract and set trace context from an incoming job payload. Call this at the start of job processing. If no trace context is present, generates a new trace_id. """ trace_id = payload.get("_trace_id") or new_trace_id() set_trace_context(trace_id=trace_id, span_id=new_span_id())