"""Profiling utilities for integration test endpoint latency measurement. Records per-endpoint timing data and produces summary reports with P50/P95/P99 percentiles. Flags endpoints exceeding 500ms as slow. Usage as a pytest fixture (add to conftest.py): @pytest.fixture def profiler(): p = EndpointProfiler() yield p p.print_summary() Usage as a context manager around httpx calls: async with profiler.track("GET /api/companies"): resp = await client.get("/api/companies") """ from __future__ import annotations import json import statistics import time from collections import defaultdict from contextlib import asynccontextmanager from dataclasses import dataclass, field from pathlib import Path from typing import AsyncIterator SLOW_THRESHOLD_MS = 500.0 @dataclass class EndpointProfiler: """Collects per-endpoint latency samples and produces summary reports.""" _timings: dict[str, list[float]] = field( default_factory=lambda: defaultdict(list) ) @asynccontextmanager async def track(self, endpoint: str) -> AsyncIterator[None]: """Context manager that records wall-clock time for an endpoint call. Uses ``time.monotonic()`` for accurate, monotonically increasing measurements unaffected by system clock adjustments. """ start = time.monotonic() try: yield finally: elapsed_ms = (time.monotonic() - start) * 1000 self._timings[endpoint].append(elapsed_ms) def record(self, endpoint: str, elapsed_ms: float) -> None: """Manually record a timing sample for *endpoint*.""" self._timings[endpoint].append(elapsed_ms) # ------------------------------------------------------------------ # Percentile helpers # ------------------------------------------------------------------ @staticmethod def percentile(values: list[float], pct: float) -> float: """Compute the *pct*-th percentile from *values*. Uses the same interpolation method as ``statistics.quantiles`` (exclusive / Method 6) but works for any list length ≥ 1. """ if not values: return 0.0 sorted_vals = sorted(values) n = len(sorted_vals) if n == 1: return sorted_vals[0] # Use statistics.quantiles when we have enough data points # quantiles(n=100) gives 99 cut points; index pct-1 is the pct-th # percentile. For very small samples we fall back to simple # nearest-rank. if n >= 2: try: quantile_cuts = statistics.quantiles(sorted_vals, n=100) idx = max(0, min(int(pct) - 1, len(quantile_cuts) - 1)) return quantile_cuts[idx] except statistics.StatisticsError: pass # Fallback: nearest-rank rank = (pct / 100) * (n - 1) lower = int(rank) upper = min(lower + 1, n - 1) weight = rank - lower return sorted_vals[lower] * (1 - weight) + sorted_vals[upper] * weight # ------------------------------------------------------------------ # Summary / reporting # ------------------------------------------------------------------ def summary(self) -> dict: """Return a dict with per-endpoint stats and slow endpoint list. The returned structure matches the JSON contract from the design doc:: { "endpoints": { "GET /api/companies": { "p50_ms": 12, "p95_ms": 25, "p99_ms": 45, "count": 5, "mean_ms": 18 }, ... }, "slow_endpoints": ["POST /evaluate"], "total_requests": 150, "total_duration_ms": 4500.0 } """ endpoints: dict[str, dict] = {} slow_endpoints: list[str] = [] total_requests = 0 total_duration_ms = 0.0 for endpoint, timings in sorted(self._timings.items()): count = len(timings) mean_ms = statistics.mean(timings) if timings else 0.0 p50 = self.percentile(timings, 50) p95 = self.percentile(timings, 95) p99 = self.percentile(timings, 99) endpoints[endpoint] = { "p50_ms": round(p50, 2), "p95_ms": round(p95, 2), "p99_ms": round(p99, 2), "count": count, "mean_ms": round(mean_ms, 2), } if p99 > SLOW_THRESHOLD_MS: slow_endpoints.append(endpoint) total_requests += count total_duration_ms += sum(timings) return { "endpoints": endpoints, "slow_endpoints": slow_endpoints, "total_requests": total_requests, "total_duration_ms": round(total_duration_ms, 2), } def print_summary(self) -> None: """Print a human-readable summary table to stdout.""" data = self.summary() endpoints = data["endpoints"] if not endpoints: print("No profiling data recorded.") return # Header header = ( f"{'Endpoint':<40} {'Count':>5} {'P50':>7} {'P95':>7} " f"{'P99':>7} {'Slow?':>8}" ) separator = "\u2500" * len(header) print() print(header) print(separator) for name, stats in endpoints.items(): slow_marker = "\u26a0 SLOW" if name in data["slow_endpoints"] else "" print( f"{name:<40} {stats['count']:>5} " f"{stats['p50_ms']:>5.0f}ms " f"{stats['p95_ms']:>5.0f}ms " f"{stats['p99_ms']:>5.0f}ms " f"{slow_marker:>8}" ) print(separator) print( f"Total requests: {data['total_requests']} " f"Total duration: {data['total_duration_ms']:.0f}ms" ) if data["slow_endpoints"]: print( f"\u26a0 Slow endpoints (P99 > {SLOW_THRESHOLD_MS:.0f}ms): " + ", ".join(data["slow_endpoints"]) ) print() def write_json(self, path: str | Path) -> None: """Write the summary as JSON to *path*.""" dest = Path(path) dest.parent.mkdir(parents=True, exist_ok=True) dest.write_text(json.dumps(self.summary(), indent=2) + "\n")