Files
Celes Renata c85c0068a2 fix: clean up utcnow deprecation warnings, fix 12 failing tests, add CI/CD pipeline manifests
- Replace all datetime.utcnow() with datetime.now(tz=timezone.utc) across 8 files
- Fix 12 failing tests to match current implementation behavior
- Fix pytest_plugins in non-top-level conftest (moved to root conftest.py)
- Auto-fix 189 lint issues (import sorting, unused imports)
- Add CI/CD pipeline infrastructure (ARC, ArgoCD, Kargo manifests)
- Add values-beta.yaml and values-paper.yaml for staged deployments
- Update GitHub Actions workflow to use self-hosted-gremlin runners
- Add integration-test job to CI pipeline

Result: 1596 passed, 0 failed, 0 warnings
2026-04-18 03:59:28 +00:00

377 lines
13 KiB
Python

"""Tests for the EndpointProfiler timing wrapper."""
from __future__ import annotations
import json
import tempfile
from pathlib import Path
import httpx
import pytest
from tests.integration.profiler import SLOW_THRESHOLD_MS, EndpointProfiler
# ---------------------------------------------------------------------------
# Percentile calculation
# ---------------------------------------------------------------------------
class TestPercentile:
"""Unit tests for EndpointProfiler.percentile."""
def test_single_value(self) -> None:
assert EndpointProfiler.percentile([42.0], 50) == 42.0
assert EndpointProfiler.percentile([42.0], 99) == 42.0
def test_empty_list(self) -> None:
assert EndpointProfiler.percentile([], 50) == 0.0
def test_two_values(self) -> None:
p50 = EndpointProfiler.percentile([10.0, 20.0], 50)
assert 10.0 <= p50 <= 20.0
def test_known_distribution(self) -> None:
"""100 evenly spaced values — P50 ≈ 50, P95 ≈ 95, P99 ≈ 99."""
values = [float(i) for i in range(1, 101)]
p50 = EndpointProfiler.percentile(values, 50)
p95 = EndpointProfiler.percentile(values, 95)
p99 = EndpointProfiler.percentile(values, 99)
assert 45 <= p50 <= 55
assert 90 <= p95 <= 100
assert 95 <= p99 <= 100
def test_unsorted_input(self) -> None:
"""Percentile should sort internally."""
values = [100.0, 1.0, 50.0, 25.0, 75.0]
p50 = EndpointProfiler.percentile(values, 50)
assert 25.0 <= p50 <= 75.0
# ---------------------------------------------------------------------------
# Record / track
# ---------------------------------------------------------------------------
class TestRecord:
"""Tests for manual recording."""
def test_record_adds_timing(self) -> None:
p = EndpointProfiler()
p.record("GET /api/foo", 12.5)
p.record("GET /api/foo", 15.0)
assert len(p._timings["GET /api/foo"]) == 2
def test_record_multiple_endpoints(self) -> None:
p = EndpointProfiler()
p.record("GET /a", 10.0)
p.record("GET /b", 20.0)
assert "GET /a" in p._timings
assert "GET /b" in p._timings
@pytest.mark.asyncio
class TestTrack:
"""Tests for the async context manager."""
async def test_track_records_positive_time(self) -> None:
p = EndpointProfiler()
async with p.track("GET /api/test"):
pass # near-zero but positive
assert len(p._timings["GET /api/test"]) == 1
assert p._timings["GET /api/test"][0] >= 0
async def test_track_records_on_exception(self) -> None:
"""Timing is recorded even if the wrapped code raises."""
p = EndpointProfiler()
with pytest.raises(ValueError):
async with p.track("GET /api/fail"):
raise ValueError("boom")
assert len(p._timings["GET /api/fail"]) == 1
# ---------------------------------------------------------------------------
# Summary
# ---------------------------------------------------------------------------
class TestSummary:
"""Tests for the summary dict."""
def test_empty_profiler(self) -> None:
p = EndpointProfiler()
s = p.summary()
assert s["endpoints"] == {}
assert s["slow_endpoints"] == []
assert s["total_requests"] == 0
assert s["total_duration_ms"] == 0.0
def test_summary_structure(self) -> None:
p = EndpointProfiler()
for ms in [10, 20, 30, 40, 50]:
p.record("GET /api/companies", float(ms))
s = p.summary()
ep = s["endpoints"]["GET /api/companies"]
assert ep["count"] == 5
assert "p50_ms" in ep
assert "p95_ms" in ep
assert "p99_ms" in ep
assert "mean_ms" in ep
assert s["total_requests"] == 5
def test_slow_endpoint_flagged(self) -> None:
p = EndpointProfiler()
p.record("GET /slow", SLOW_THRESHOLD_MS + 100)
s = p.summary()
assert "GET /slow" in s["slow_endpoints"]
def test_fast_endpoint_not_flagged(self) -> None:
p = EndpointProfiler()
p.record("GET /fast", 10.0)
s = p.summary()
assert s["slow_endpoints"] == []
def test_total_duration(self) -> None:
p = EndpointProfiler()
p.record("GET /a", 100.0)
p.record("GET /b", 200.0)
s = p.summary()
assert s["total_duration_ms"] == 300.0
# ---------------------------------------------------------------------------
# print_summary (smoke test — just ensure no crash)
# ---------------------------------------------------------------------------
class TestPrintSummary:
def test_print_empty(self, capsys: pytest.CaptureFixture[str]) -> None:
p = EndpointProfiler()
p.print_summary()
out = capsys.readouterr().out
assert "No profiling data" in out
def test_print_with_data(self, capsys: pytest.CaptureFixture[str]) -> None:
p = EndpointProfiler()
p.record("GET /api/companies", 12.0)
p.record("GET /api/companies", 25.0)
p.record("GET /slow", 600.0)
p.print_summary()
out = capsys.readouterr().out
assert "GET /api/companies" in out
assert "SLOW" in out
# ---------------------------------------------------------------------------
# write_json
# ---------------------------------------------------------------------------
class TestWriteJson:
def test_write_creates_file(self) -> None:
p = EndpointProfiler()
p.record("GET /api/test", 42.0)
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "report.json"
p.write_json(path)
assert path.exists()
data = json.loads(path.read_text())
assert "endpoints" in data
assert "GET /api/test" in data["endpoints"]
def test_write_creates_parent_dirs(self) -> None:
p = EndpointProfiler()
p.record("GET /x", 1.0)
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "sub" / "dir" / "report.json"
p.write_json(path)
assert path.exists()
def test_json_matches_summary(self) -> None:
p = EndpointProfiler()
p.record("GET /a", 10.0)
p.record("GET /a", 20.0)
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "out.json"
p.write_json(path)
from_file = json.loads(path.read_text())
assert from_file == p.summary()
# ---------------------------------------------------------------------------
# ProfiledAsyncClient
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestProfiledAsyncClient:
"""Tests for the ProfiledAsyncClient wrapper in conftest."""
async def test_get_records_timing(self) -> None:
"""GET requests are timed and recorded in the profiler."""
from unittest.mock import AsyncMock
from tests.integration.conftest import ProfiledAsyncClient
mock_response = AsyncMock()
mock_client = AsyncMock(spec=httpx.AsyncClient)
mock_client.get = AsyncMock(return_value=mock_response)
profiler = EndpointProfiler()
wrapped = ProfiledAsyncClient(mock_client, profiler)
await wrapped.get("/api/companies")
mock_client.get.assert_awaited_once_with("/api/companies")
assert "GET /api/companies" in profiler._timings
assert len(profiler._timings["GET /api/companies"]) == 1
assert profiler._timings["GET /api/companies"][0] >= 0
async def test_post_records_timing(self) -> None:
"""POST requests are timed and recorded in the profiler."""
from unittest.mock import AsyncMock
from tests.integration.conftest import ProfiledAsyncClient
mock_response = AsyncMock()
mock_client = AsyncMock(spec=httpx.AsyncClient)
mock_client.post = AsyncMock(return_value=mock_response)
profiler = EndpointProfiler()
wrapped = ProfiledAsyncClient(mock_client, profiler)
await wrapped.post("/api/orders", json={"ticker": "AAPL"})
mock_client.post.assert_awaited_once_with(
"/api/orders", json={"ticker": "AAPL"},
)
assert "POST /api/orders" in profiler._timings
assert len(profiler._timings["POST /api/orders"]) == 1
async def test_put_records_timing(self) -> None:
"""PUT requests are timed and recorded."""
from unittest.mock import AsyncMock
from tests.integration.conftest import ProfiledAsyncClient
mock_client = AsyncMock(spec=httpx.AsyncClient)
mock_client.put = AsyncMock(return_value=AsyncMock())
profiler = EndpointProfiler()
wrapped = ProfiledAsyncClient(mock_client, profiler)
await wrapped.put("/api/config", json={"key": "val"})
assert "PUT /api/config" in profiler._timings
async def test_delete_records_timing(self) -> None:
"""DELETE requests are timed and recorded."""
from unittest.mock import AsyncMock
from tests.integration.conftest import ProfiledAsyncClient
mock_client = AsyncMock(spec=httpx.AsyncClient)
mock_client.delete = AsyncMock(return_value=AsyncMock())
profiler = EndpointProfiler()
wrapped = ProfiledAsyncClient(mock_client, profiler)
await wrapped.delete("/api/items/123")
assert "DELETE /api/items/123" in profiler._timings
async def test_patch_records_timing(self) -> None:
"""PATCH requests are timed and recorded."""
from unittest.mock import AsyncMock
from tests.integration.conftest import ProfiledAsyncClient
mock_client = AsyncMock(spec=httpx.AsyncClient)
mock_client.patch = AsyncMock(return_value=AsyncMock())
profiler = EndpointProfiler()
wrapped = ProfiledAsyncClient(mock_client, profiler)
await wrapped.patch("/api/items/123", json={"name": "new"})
assert "PATCH /api/items/123" in profiler._timings
async def test_multiple_requests_accumulate(self) -> None:
"""Multiple requests to the same endpoint accumulate timings."""
from unittest.mock import AsyncMock
from tests.integration.conftest import ProfiledAsyncClient
mock_client = AsyncMock(spec=httpx.AsyncClient)
mock_client.get = AsyncMock(return_value=AsyncMock())
profiler = EndpointProfiler()
wrapped = ProfiledAsyncClient(mock_client, profiler)
await wrapped.get("/api/companies")
await wrapped.get("/api/companies")
await wrapped.get("/api/companies")
assert len(profiler._timings["GET /api/companies"]) == 3
async def test_attribute_forwarding(self) -> None:
"""Non-HTTP attributes are forwarded to the underlying client."""
from unittest.mock import AsyncMock, PropertyMock
from tests.integration.conftest import ProfiledAsyncClient
mock_client = AsyncMock(spec=httpx.AsyncClient)
type(mock_client).base_url = PropertyMock(
return_value="http://localhost:8000",
)
profiler = EndpointProfiler()
wrapped = ProfiledAsyncClient(mock_client, profiler)
assert wrapped.base_url == "http://localhost:8000"
async def test_summary_reflects_profiled_requests(self) -> None:
"""The profiler summary includes data from profiled client requests."""
from unittest.mock import AsyncMock
from tests.integration.conftest import ProfiledAsyncClient
mock_client = AsyncMock(spec=httpx.AsyncClient)
mock_client.get = AsyncMock(return_value=AsyncMock())
profiler = EndpointProfiler()
wrapped = ProfiledAsyncClient(mock_client, profiler)
await wrapped.get("/api/companies")
await wrapped.get("/api/trends")
summary = profiler.summary()
assert "GET /api/companies" in summary["endpoints"]
assert "GET /api/trends" in summary["endpoints"]
assert summary["total_requests"] == 2
# ---------------------------------------------------------------------------
# Profiling plugin (conftest_profiling.py)
# ---------------------------------------------------------------------------
class TestProfilingPlugin:
"""Tests for the pytest profiling plugin hooks."""
def test_plugin_registers_profiling_output_option(self, pytestconfig: pytest.Config) -> None:
"""The --profiling-output option is registered by the plugin."""
# The option should be available since conftest_profiling is loaded
val = pytestconfig.getoption("profiling_output", None)
# Default value or whatever was passed on the CLI
assert val is not None
def test_profiler_fixture_returns_endpoint_profiler(self, profiler: EndpointProfiler) -> None:
"""The session-scoped profiler fixture returns an EndpointProfiler."""
assert isinstance(profiler, EndpointProfiler)
def test_profiler_fixture_is_shared(self, profiler: EndpointProfiler) -> None:
"""Recording data via the fixture is visible in the summary."""
profiler.record("TEST /plugin-check", 1.0)
assert "TEST /plugin-check" in profiler._timings