"""Tests for the EndpointProfiler timing wrapper.""" from __future__ import annotations import json import tempfile from pathlib import Path import httpx import pytest from tests.integration.profiler import SLOW_THRESHOLD_MS, EndpointProfiler # --------------------------------------------------------------------------- # Percentile calculation # --------------------------------------------------------------------------- class TestPercentile: """Unit tests for EndpointProfiler.percentile.""" def test_single_value(self) -> None: assert EndpointProfiler.percentile([42.0], 50) == 42.0 assert EndpointProfiler.percentile([42.0], 99) == 42.0 def test_empty_list(self) -> None: assert EndpointProfiler.percentile([], 50) == 0.0 def test_two_values(self) -> None: p50 = EndpointProfiler.percentile([10.0, 20.0], 50) assert 10.0 <= p50 <= 20.0 def test_known_distribution(self) -> None: """100 evenly spaced values — P50 ≈ 50, P95 ≈ 95, P99 ≈ 99.""" values = [float(i) for i in range(1, 101)] p50 = EndpointProfiler.percentile(values, 50) p95 = EndpointProfiler.percentile(values, 95) p99 = EndpointProfiler.percentile(values, 99) assert 45 <= p50 <= 55 assert 90 <= p95 <= 100 assert 95 <= p99 <= 100 def test_unsorted_input(self) -> None: """Percentile should sort internally.""" values = [100.0, 1.0, 50.0, 25.0, 75.0] p50 = EndpointProfiler.percentile(values, 50) assert 25.0 <= p50 <= 75.0 # --------------------------------------------------------------------------- # Record / track # --------------------------------------------------------------------------- class TestRecord: """Tests for manual recording.""" def test_record_adds_timing(self) -> None: p = EndpointProfiler() p.record("GET /api/foo", 12.5) p.record("GET /api/foo", 15.0) assert len(p._timings["GET /api/foo"]) == 2 def test_record_multiple_endpoints(self) -> None: p = EndpointProfiler() p.record("GET /a", 10.0) p.record("GET /b", 20.0) assert "GET /a" in p._timings assert "GET /b" in p._timings @pytest.mark.asyncio class TestTrack: """Tests for the async context manager.""" async def test_track_records_positive_time(self) -> None: p = EndpointProfiler() async with p.track("GET /api/test"): pass # near-zero but positive assert len(p._timings["GET /api/test"]) == 1 assert p._timings["GET /api/test"][0] >= 0 async def test_track_records_on_exception(self) -> None: """Timing is recorded even if the wrapped code raises.""" p = EndpointProfiler() with pytest.raises(ValueError): async with p.track("GET /api/fail"): raise ValueError("boom") assert len(p._timings["GET /api/fail"]) == 1 # --------------------------------------------------------------------------- # Summary # --------------------------------------------------------------------------- class TestSummary: """Tests for the summary dict.""" def test_empty_profiler(self) -> None: p = EndpointProfiler() s = p.summary() assert s["endpoints"] == {} assert s["slow_endpoints"] == [] assert s["total_requests"] == 0 assert s["total_duration_ms"] == 0.0 def test_summary_structure(self) -> None: p = EndpointProfiler() for ms in [10, 20, 30, 40, 50]: p.record("GET /api/companies", float(ms)) s = p.summary() ep = s["endpoints"]["GET /api/companies"] assert ep["count"] == 5 assert "p50_ms" in ep assert "p95_ms" in ep assert "p99_ms" in ep assert "mean_ms" in ep assert s["total_requests"] == 5 def test_slow_endpoint_flagged(self) -> None: p = EndpointProfiler() p.record("GET /slow", SLOW_THRESHOLD_MS + 100) s = p.summary() assert "GET /slow" in s["slow_endpoints"] def test_fast_endpoint_not_flagged(self) -> None: p = EndpointProfiler() p.record("GET /fast", 10.0) s = p.summary() assert s["slow_endpoints"] == [] def test_total_duration(self) -> None: p = EndpointProfiler() p.record("GET /a", 100.0) p.record("GET /b", 200.0) s = p.summary() assert s["total_duration_ms"] == 300.0 # --------------------------------------------------------------------------- # print_summary (smoke test — just ensure no crash) # --------------------------------------------------------------------------- class TestPrintSummary: def test_print_empty(self, capsys: pytest.CaptureFixture[str]) -> None: p = EndpointProfiler() p.print_summary() out = capsys.readouterr().out assert "No profiling data" in out def test_print_with_data(self, capsys: pytest.CaptureFixture[str]) -> None: p = EndpointProfiler() p.record("GET /api/companies", 12.0) p.record("GET /api/companies", 25.0) p.record("GET /slow", 600.0) p.print_summary() out = capsys.readouterr().out assert "GET /api/companies" in out assert "SLOW" in out # --------------------------------------------------------------------------- # write_json # --------------------------------------------------------------------------- class TestWriteJson: def test_write_creates_file(self) -> None: p = EndpointProfiler() p.record("GET /api/test", 42.0) with tempfile.TemporaryDirectory() as tmpdir: path = Path(tmpdir) / "report.json" p.write_json(path) assert path.exists() data = json.loads(path.read_text()) assert "endpoints" in data assert "GET /api/test" in data["endpoints"] def test_write_creates_parent_dirs(self) -> None: p = EndpointProfiler() p.record("GET /x", 1.0) with tempfile.TemporaryDirectory() as tmpdir: path = Path(tmpdir) / "sub" / "dir" / "report.json" p.write_json(path) assert path.exists() def test_json_matches_summary(self) -> None: p = EndpointProfiler() p.record("GET /a", 10.0) p.record("GET /a", 20.0) with tempfile.TemporaryDirectory() as tmpdir: path = Path(tmpdir) / "out.json" p.write_json(path) from_file = json.loads(path.read_text()) assert from_file == p.summary() # --------------------------------------------------------------------------- # ProfiledAsyncClient # --------------------------------------------------------------------------- @pytest.mark.asyncio class TestProfiledAsyncClient: """Tests for the ProfiledAsyncClient wrapper in conftest.""" async def test_get_records_timing(self) -> None: """GET requests are timed and recorded in the profiler.""" from unittest.mock import AsyncMock from tests.integration.conftest import ProfiledAsyncClient mock_response = AsyncMock() mock_client = AsyncMock(spec=httpx.AsyncClient) mock_client.get = AsyncMock(return_value=mock_response) profiler = EndpointProfiler() wrapped = ProfiledAsyncClient(mock_client, profiler) await wrapped.get("/api/companies") mock_client.get.assert_awaited_once_with("/api/companies") assert "GET /api/companies" in profiler._timings assert len(profiler._timings["GET /api/companies"]) == 1 assert profiler._timings["GET /api/companies"][0] >= 0 async def test_post_records_timing(self) -> None: """POST requests are timed and recorded in the profiler.""" from unittest.mock import AsyncMock from tests.integration.conftest import ProfiledAsyncClient mock_response = AsyncMock() mock_client = AsyncMock(spec=httpx.AsyncClient) mock_client.post = AsyncMock(return_value=mock_response) profiler = EndpointProfiler() wrapped = ProfiledAsyncClient(mock_client, profiler) await wrapped.post("/api/orders", json={"ticker": "AAPL"}) mock_client.post.assert_awaited_once_with( "/api/orders", json={"ticker": "AAPL"}, ) assert "POST /api/orders" in profiler._timings assert len(profiler._timings["POST /api/orders"]) == 1 async def test_put_records_timing(self) -> None: """PUT requests are timed and recorded.""" from unittest.mock import AsyncMock from tests.integration.conftest import ProfiledAsyncClient mock_client = AsyncMock(spec=httpx.AsyncClient) mock_client.put = AsyncMock(return_value=AsyncMock()) profiler = EndpointProfiler() wrapped = ProfiledAsyncClient(mock_client, profiler) await wrapped.put("/api/config", json={"key": "val"}) assert "PUT /api/config" in profiler._timings async def test_delete_records_timing(self) -> None: """DELETE requests are timed and recorded.""" from unittest.mock import AsyncMock from tests.integration.conftest import ProfiledAsyncClient mock_client = AsyncMock(spec=httpx.AsyncClient) mock_client.delete = AsyncMock(return_value=AsyncMock()) profiler = EndpointProfiler() wrapped = ProfiledAsyncClient(mock_client, profiler) await wrapped.delete("/api/items/123") assert "DELETE /api/items/123" in profiler._timings async def test_patch_records_timing(self) -> None: """PATCH requests are timed and recorded.""" from unittest.mock import AsyncMock from tests.integration.conftest import ProfiledAsyncClient mock_client = AsyncMock(spec=httpx.AsyncClient) mock_client.patch = AsyncMock(return_value=AsyncMock()) profiler = EndpointProfiler() wrapped = ProfiledAsyncClient(mock_client, profiler) await wrapped.patch("/api/items/123", json={"name": "new"}) assert "PATCH /api/items/123" in profiler._timings async def test_multiple_requests_accumulate(self) -> None: """Multiple requests to the same endpoint accumulate timings.""" from unittest.mock import AsyncMock from tests.integration.conftest import ProfiledAsyncClient mock_client = AsyncMock(spec=httpx.AsyncClient) mock_client.get = AsyncMock(return_value=AsyncMock()) profiler = EndpointProfiler() wrapped = ProfiledAsyncClient(mock_client, profiler) await wrapped.get("/api/companies") await wrapped.get("/api/companies") await wrapped.get("/api/companies") assert len(profiler._timings["GET /api/companies"]) == 3 async def test_attribute_forwarding(self) -> None: """Non-HTTP attributes are forwarded to the underlying client.""" from unittest.mock import AsyncMock, PropertyMock from tests.integration.conftest import ProfiledAsyncClient mock_client = AsyncMock(spec=httpx.AsyncClient) type(mock_client).base_url = PropertyMock( return_value="http://localhost:8000", ) profiler = EndpointProfiler() wrapped = ProfiledAsyncClient(mock_client, profiler) assert wrapped.base_url == "http://localhost:8000" async def test_summary_reflects_profiled_requests(self) -> None: """The profiler summary includes data from profiled client requests.""" from unittest.mock import AsyncMock from tests.integration.conftest import ProfiledAsyncClient mock_client = AsyncMock(spec=httpx.AsyncClient) mock_client.get = AsyncMock(return_value=AsyncMock()) profiler = EndpointProfiler() wrapped = ProfiledAsyncClient(mock_client, profiler) await wrapped.get("/api/companies") await wrapped.get("/api/trends") summary = profiler.summary() assert "GET /api/companies" in summary["endpoints"] assert "GET /api/trends" in summary["endpoints"] assert summary["total_requests"] == 2 # --------------------------------------------------------------------------- # Profiling plugin (conftest_profiling.py) # --------------------------------------------------------------------------- class TestProfilingPlugin: """Tests for the pytest profiling plugin hooks.""" def test_plugin_registers_profiling_output_option(self, pytestconfig: pytest.Config) -> None: """The --profiling-output option is registered by the plugin.""" # The option should be available since conftest_profiling is loaded val = pytestconfig.getoption("profiling_output", None) # Default value or whatever was passed on the CLI assert val is not None def test_profiler_fixture_returns_endpoint_profiler(self, profiler: EndpointProfiler) -> None: """The session-scoped profiler fixture returns an EndpointProfiler.""" assert isinstance(profiler, EndpointProfiler) def test_profiler_fixture_is_shared(self, profiler: EndpointProfiler) -> None: """Recording data via the fixture is visible in the summary.""" profiler.record("TEST /plugin-check", 1.0) assert "TEST /plugin-check" in profiler._timings