"""Structured logging and distributed tracing for all Stonks Oracle services.

Provides:

- JSON-formatted structured log output for machine-parseable log aggregation
- Trace context (trace_id, span_id, service) propagated through log records
- Context manager for creating trace spans within a service
- Helper to configure logging for any service worker or API

Requirements: 12.1
Design: Section 12 (Observability and Operations)
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import time
|
|
import uuid
|
|
from contextvars import ContextVar
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Trace context stored in contextvars for async-safe propagation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Trace the current execution context belongs to; "" means no trace is active.
_trace_id: ContextVar[str] = ContextVar("trace_id", default="")
# Span currently active in this context; "" means no span is active.
_span_id: ContextVar[str] = ContextVar("span_id", default="")
# Name of the running service; reads as "unknown" until a value is set.
_service_name: ContextVar[str] = ContextVar("service_name", default="unknown")
|
|
|
|
|
|
def get_trace_id() -> str:
    """Return the trace ID bound to the current context ("" if none is set)."""
    current = _trace_id.get()
    return current
|
|
|
|
|
|
def get_span_id() -> str:
    """Return the span ID bound to the current context ("" if none is set)."""
    current = _span_id.get()
    return current
|
|
|
|
|
|
def get_service_name() -> str:
    """Return the service name bound to the current context ("unknown" if unset)."""
    current = _service_name.get()
    return current
|
|
|
|
|
|
def set_trace_context(
    trace_id: str | None = None,
    span_id: str | None = None,
    service: str | None = None,
) -> None:
    """Update the trace context of the current async task / thread.

    Each argument left as ``None`` leaves the corresponding context
    variable untouched; any other value (including ``""``) overwrites it.

    Args:
        trace_id: New trace identifier, or ``None`` to keep the current one.
        span_id: New span identifier, or ``None`` to keep the current one.
        service: New service name, or ``None`` to keep the current one.
    """
    pending = (
        (_trace_id, trace_id),
        (_span_id, span_id),
        (_service_name, service),
    )
    for context_var, new_value in pending:
        if new_value is None:
            continue
        context_var.set(new_value)
|
|
|
|
|
|
def new_trace_id() -> str:
    """Return a new trace ID: the first 16 hex characters of a random UUID4."""
    random_hex = uuid.uuid4().hex
    return random_hex[:16]
|
|
|
|
|
|
def new_span_id() -> str:
    """Return a new span ID: the first 8 hex characters of a random UUID4."""
    random_hex = uuid.uuid4().hex
    return random_hex[:8]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Span context manager for tracing within a service
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class Span:
    """Lightweight span for distributed tracing.

    Usage::

        with Span("process_document", ticker="AAPL") as span:
            # ... do work ...
            span.set_attribute("doc_count", 5)

    On exit the span logs its duration and attributes as a structured event
    (message ``"span.end"``) and restores the trace/span ids that were
    active before the ``with`` block was entered.

    The parent span id, the trace id and the logger are captured when the
    object is constructed, not when the ``with`` block is entered.
    """

    def __init__(self, operation: str, **attributes: Any) -> None:
        """Create a span for *operation*.

        Args:
            operation: Name reported as ``span_operation`` in the end event.
            **attributes: Initial attributes reported as ``span_attributes``.
        """
        self.operation = operation
        # Whatever span is active right now becomes this span's parent.
        self.parent_span_id = get_span_id()
        self.span_id = new_span_id()
        # Join the active trace if there is one, otherwise start a new trace.
        self.trace_id = get_trace_id() or new_trace_id()
        self.attributes: dict[str, Any] = dict(attributes)
        self.start_time: float = 0.0
        self.duration_ms: float = 0.0
        # Tokens returned by ContextVar.set(); needed to undo it on exit.
        self._token_trace: Any = None
        self._token_span: Any = None
        # Falls back to "tracing" only when the service name is an empty string.
        self._logger = logging.getLogger(get_service_name() or "tracing")

    def set_attribute(self, key: str, value: Any) -> None:
        """Attach (or overwrite) an attribute reported when the span ends."""
        self.attributes[key] = value

    def __enter__(self) -> Span:
        """Start timing and make this span the active one in the context."""
        self.start_time = time.monotonic()
        self._token_trace = _trace_id.set(self.trace_id)
        self._token_span = _span_id.set(self.span_id)
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Log the ``span.end`` event and restore the previous trace context.

        Returns ``None``, so an exception raised inside the ``with`` block
        is never suppressed; it is only reflected as ``span_status="error"``.
        """
        self.duration_ms = (time.monotonic() - self.start_time) * 1000
        status = "error" if exc_type else "ok"

        try:
            # Logged before the context is restored so the record is emitted
            # while this span's trace_id / span_id are still the active ones.
            self._logger.info(
                "span.end",
                extra={
                    "span_operation": self.operation,
                    "span_status": status,
                    "span_duration_ms": round(self.duration_ms, 2),
                    "span_parent_id": self.parent_span_id,
                    "span_attributes": self.attributes,
                },
            )
        finally:
            # Restore parent span context even if logging raised, so a failed
            # log call cannot leave this span's ids in the context.
            # Tokens are single-use, so drop them once consumed.
            if self._token_span is not None:
                _span_id.reset(self._token_span)
                self._token_span = None
            if self._token_trace is not None:
                _trace_id.reset(self._token_trace)
                self._token_trace = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JSON log formatter
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class JSONFormatter(logging.Formatter):
    """Render every log record as one JSON object per line, with trace context."""

    # Record attributes (set by Span or via ``extra={...}``) that are copied
    # into the output when present and not None. Order is the output order.
    _OPTIONAL_FIELDS = (
        "span_operation", "span_status", "span_duration_ms",
        "span_parent_id", "span_attributes",
        "ticker", "document_id", "source_type", "job_id",
        "duration_ms", "error", "count",
    )

    def format(self, record: logging.LogRecord) -> str:
        """Serialize *record* to a JSON string.

        The output always carries timestamp (UTC, ISO 8601), level, logger,
        message, service, trace_id and span_id. Whitelisted extra fields are
        added when set, and a formatted traceback is added under
        ``"exception"`` when the record carries an exception. Values that
        ``json`` cannot encode natively are converted with ``str``.
        """
        created_at = datetime.fromtimestamp(record.created, tz=timezone.utc)
        payload: dict[str, Any] = {
            "timestamp": created_at.isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": get_service_name(),
            "trace_id": get_trace_id(),
            "span_id": get_span_id(),
        }

        # Merge extra fields from Span or manual extra={} usage
        candidates = (
            (name, getattr(record, name, None)) for name in self._OPTIONAL_FIELDS
        )
        payload.update((name, value) for name, value in candidates if value is not None)

        exc_info = record.exc_info
        if exc_info and exc_info[1]:
            payload["exception"] = self.formatException(exc_info)

        return json.dumps(payload, default=str)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Setup helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class _TraceContextFilter(logging.Filter):
    """Copy service name and current trace context onto each log record.

    %-style format strings can only read attributes stored on the record,
    so the context-variable values must be attached before formatting.
    """

    def __init__(self, service: str) -> None:
        super().__init__()
        self._service = service

    def filter(self, record: logging.LogRecord) -> bool:
        # setdefault keeps any value the caller passed explicitly via extra={...}.
        fields = record.__dict__
        fields.setdefault("service", self._service)
        fields.setdefault("trace_id", get_trace_id())
        fields.setdefault("span_id", get_span_id())
        return True  # never drops a record; this filter only annotates


def setup_logging(
    service_name: str,
    level: str = "INFO",
    json_output: bool = True,
) -> None:
    """Configure structured logging for a service.

    Call this once at service startup (before any log calls). All handlers
    already attached to the root logger are removed and replaced by a
    single stream handler.

    Args:
        service_name: Identifies this service in log output (e.g. "ingestion_worker").
        level: Log level string (DEBUG, INFO, WARNING, ERROR). Case-insensitive;
            anything that does not name an integer level falls back to INFO.
        json_output: If True, emit JSON lines. If False, use a human-readable format.
    """
    _service_name.set(service_name)

    root = logging.getLogger()
    # getattr alone could return a non-level attribute (e.g. logging.BASIC_FORMAT
    # is a string), which setLevel would reject; only accept integer levels.
    resolved_level = getattr(logging, level.upper(), None)
    if not isinstance(resolved_level, int):
        resolved_level = logging.INFO
    root.setLevel(resolved_level)

    # Remove existing handlers to avoid duplicate output
    root.handlers.clear()

    handler = logging.StreamHandler()
    if json_output:
        handler.setFormatter(JSONFormatter())
    else:
        # The filter supplies service / trace_id / span_id for every record, so
        # the text output shows the live trace context instead of empty
        # placeholders, and no Formatter ``defaults`` are needed.
        handler.addFilter(_TraceContextFilter(service_name))
        handler.setFormatter(logging.Formatter(
            "%(asctime)s [%(levelname)s] %(name)s (%(service)s) "
            "trace=%(trace_id)s span=%(span_id)s — %(message)s",
        ))
    root.addHandler(handler)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Trace context propagation through job payloads
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def inject_trace_context(payload: dict[str, Any]) -> dict[str, Any]:
    """Stamp the current trace ID onto a job payload dict.

    Call this before enqueuing a job to Redis so the downstream
    worker can continue the same trace.

    The payload is modified in place (key ``"_trace_id"``) and the same
    dict is returned. When no trace is active the payload is left unchanged.
    """
    active_trace = get_trace_id()
    if not active_trace:
        return payload
    payload["_trace_id"] = active_trace
    return payload
|
|
|
|
|
|
def extract_trace_context(payload: dict[str, Any]) -> None:
    """Adopt the trace context carried by an incoming job payload.

    Call this at the start of job processing. The trace ID is read from
    the payload's ``"_trace_id"`` key; if it is missing or empty, a new
    trace ID is generated. A fresh span ID is always assigned.
    """
    incoming = payload.get("_trace_id")
    trace_id = incoming if incoming else new_trace_id()
    span_id = new_span_id()
    set_trace_context(trace_id=trace_id, span_id=span_id)
|