Files
stonks-oracle/tests/test_pbt_trading_window.py
Celes Renata 4ffde8cc06 feat: autonomous trading engine — full implementation
- Database migration 018 with 13 tables for trading engine state
- Trading engine service (services/trading/) with 12 pure computation modules:
  position sizer, stop-loss manager, reserve pool, circuit breaker,
  risk tier controller, correlation matrix, tax lots, trading window,
  gradual entry, notifications, micro-trading, backtester
- Core TradingEngine with pre-trade evaluation pipeline and integration wiring
- FastAPI HTTP service with 14 endpoints (health, config, decisions, metrics, backtest)
- Performance tracker with Sharpe ratio, drawdown, profit factor computation
- 194 Python tests (165 property-based + 29 integration)
- Frontend: 13 TanStack Query hooks, 7 dashboard panels, tabbed Trading Engine page
- Helm chart entry, network policy, nginx proxy, ingress for trading-engine
- Shared infrastructure: enums, Redis keys, TradingConfig in AppConfig
2026-04-15 16:12:22 +00:00

318 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Property-based tests for Trading Window and Gradual Entry.
Feature: autonomous-trading-engine
Tests properties 20 and 21 from the design specification, covering
trading window determination (9:45 AM 3:45 PM ET on weekdays) and
gradual entry tranche splitting.
"""
from __future__ import annotations
from datetime import datetime, time
from hypothesis import given, settings
from hypothesis import strategies as st
from services.trading.gradual_entry import (
create_tranches,
should_use_gradual_entry,
split_into_tranches,
)
from services.trading.trading_window import (
ET,
MARKET_OPEN,
WINDOW_CLOSE,
WINDOW_OPEN,
is_market_open,
is_within_trading_window,
next_window_open,
)
# ---------------------------------------------------------------------------
# Hypothesis strategies
# ---------------------------------------------------------------------------
_WEEKDAYS = range(0, 5) # MonFri
def _et_datetime_in_window() -> st.SearchStrategy[datetime]:
"""Generate a timezone-aware datetime that is inside the trading window."""
return (
st.dates(
min_value=datetime(2024, 1, 1).date(),
max_value=datetime(2025, 12, 31).date(),
)
.filter(lambda d: d.weekday() in _WEEKDAYS)
.flatmap(
lambda d: st.times(
min_value=WINDOW_OPEN,
max_value=time(15, 44, 59),
).map(lambda t: datetime.combine(d, t, tzinfo=ET))
)
)
def _et_datetime_outside_window_weekday() -> st.SearchStrategy[datetime]:
"""Generate a weekday datetime that is outside the trading window.
Either before 9:45 AM ET or at/after 3:45 PM ET.
"""
before_open = st.times(min_value=time(0, 0), max_value=time(9, 44, 59))
after_close = st.times(min_value=WINDOW_CLOSE, max_value=time(23, 59, 59))
return (
st.dates(
min_value=datetime(2024, 1, 1).date(),
max_value=datetime(2025, 12, 31).date(),
)
.filter(lambda d: d.weekday() in _WEEKDAYS)
.flatmap(
lambda d: st.one_of(before_open, after_close).map(
lambda t: datetime.combine(d, t, tzinfo=ET)
)
)
)
def _et_datetime_weekend() -> st.SearchStrategy[datetime]:
"""Generate a weekend datetime (Saturday or Sunday)."""
return (
st.dates(
min_value=datetime(2024, 1, 1).date(),
max_value=datetime(2025, 12, 31).date(),
)
.filter(lambda d: d.weekday() >= 5)
.flatmap(
lambda d: st.times().map(
lambda t: datetime.combine(d, t, tzinfo=ET)
)
)
)
# ---------------------------------------------------------------------------
# Property 20: Trading window determination
# **Validates: Requirements 11.1**
# ---------------------------------------------------------------------------
class TestProperty20TradingWindowDetermination:
"""Property 20: Trading window determination.
**Validates: Requirements 11.1**
"""
@settings(max_examples=100)
@given(dt=_et_datetime_in_window())
def test_within_window_on_weekday(self, dt: datetime) -> None:
"""Timestamps between 9:45 AM and 3:45 PM ET on weekdays are within the window."""
assert is_within_trading_window(dt) is True, (
f"Expected within window: {dt} (ET weekday={dt.weekday()}, time={dt.time()})"
)
@settings(max_examples=100)
@given(dt=_et_datetime_outside_window_weekday())
def test_outside_window_on_weekday(self, dt: datetime) -> None:
"""Weekday timestamps before 9:45 AM or at/after 3:45 PM ET are outside the window."""
assert is_within_trading_window(dt) is False, (
f"Expected outside window: {dt} (ET time={dt.time()})"
)
@settings(max_examples=100)
@given(dt=_et_datetime_weekend())
def test_outside_window_on_weekend(self, dt: datetime) -> None:
"""Weekend timestamps are always outside the trading window."""
assert is_within_trading_window(dt) is False, (
f"Expected outside window on weekend: {dt} (weekday={dt.weekday()})"
)
@settings(max_examples=100)
@given(dt=_et_datetime_outside_window_weekday())
def test_next_window_open_is_in_future(self, dt: datetime) -> None:
"""next_window_open always returns a time >= the input when outside the window."""
nwo = next_window_open(dt)
et_dt = dt.astimezone(ET)
et_nwo = nwo.astimezone(ET)
# If we're past today's open, next open must be a future day
if et_dt.time() >= WINDOW_OPEN:
assert et_nwo.date() > et_dt.date(), (
f"Expected future date: nwo={et_nwo}, dt={et_dt}"
)
assert et_nwo.time().hour == WINDOW_OPEN.hour
assert et_nwo.time().minute == WINDOW_OPEN.minute
@settings(max_examples=100)
@given(dt=_et_datetime_weekend())
def test_next_window_open_skips_weekends(self, dt: datetime) -> None:
"""next_window_open from a weekend returns a weekday."""
nwo = next_window_open(dt)
et_nwo = nwo.astimezone(ET)
assert et_nwo.weekday() in _WEEKDAYS, (
f"Expected weekday, got {et_nwo.weekday()} for {et_nwo}"
)
@settings(max_examples=100)
@given(
dt=st.dates(
min_value=datetime(2024, 1, 1).date(),
max_value=datetime(2025, 12, 31).date(),
)
.filter(lambda d: d.weekday() in _WEEKDAYS)
.flatmap(
lambda d: st.times(
min_value=MARKET_OPEN,
max_value=time(15, 59, 59),
).map(lambda t: datetime.combine(d, t, tzinfo=ET))
),
)
def test_is_market_open_during_market_hours(self, dt: datetime) -> None:
"""Timestamps between 9:30 AM and 4:00 PM ET on weekdays are market-open."""
assert is_market_open(dt) is True, (
f"Expected market open: {dt} (time={dt.time()})"
)
@settings(max_examples=100)
@given(dt=_et_datetime_weekend())
def test_is_market_closed_on_weekends(self, dt: datetime) -> None:
"""Weekend timestamps always have market closed."""
assert is_market_open(dt) is False
# ---------------------------------------------------------------------------
# Property 21: Gradual entry tranche splitting
# **Validates: Requirements 11.3, 11.5**
# ---------------------------------------------------------------------------
class TestProperty21GradualEntryTrancheSplitting:
"""Property 21: Gradual entry tranche splitting.
**Validates: Requirements 11.3, 11.5**
"""
# -- should_use_gradual_entry ------------------------------------------
@settings(max_examples=100)
@given(
active_pool=st.floats(min_value=100.0, max_value=100_000.0, allow_nan=False, allow_infinity=False),
threshold_dollars=st.floats(min_value=1.0, max_value=1000.0, allow_nan=False, allow_infinity=False),
excess=st.floats(min_value=0.01, max_value=1000.0, allow_nan=False, allow_infinity=False),
)
def test_gradual_entry_triggered_above_threshold(
self, active_pool: float, threshold_dollars: float, excess: float,
) -> None:
"""Gradual entry is used when position size exceeds min(threshold, 5% of pool)."""
effective = min(threshold_dollars, 0.05 * active_pool)
position_size = effective + excess
assert should_use_gradual_entry(position_size, active_pool, threshold_dollars) is True
@settings(max_examples=100)
@given(
active_pool=st.floats(min_value=100.0, max_value=100_000.0, allow_nan=False, allow_infinity=False),
threshold_dollars=st.floats(min_value=1.0, max_value=1000.0, allow_nan=False, allow_infinity=False),
fraction=st.floats(min_value=0.0, max_value=1.0, allow_nan=False, allow_infinity=False),
)
def test_gradual_entry_not_triggered_at_or_below_threshold(
self, active_pool: float, threshold_dollars: float, fraction: float,
) -> None:
"""Gradual entry is NOT used when position size <= effective threshold."""
effective = min(threshold_dollars, 0.05 * active_pool)
position_size = effective * fraction
assert should_use_gradual_entry(position_size, active_pool, threshold_dollars) is False
# -- split_into_tranches -----------------------------------------------
@settings(max_examples=100)
@given(
total_quantity=st.integers(min_value=1, max_value=10_000),
num_tranches=st.integers(min_value=1, max_value=20),
)
def test_tranche_sum_equals_total(
self, total_quantity: int, num_tranches: int,
) -> None:
"""Sum of all tranches must equal the original total quantity."""
tranches = split_into_tranches(total_quantity, num_tranches)
assert sum(tranches) == total_quantity
@settings(max_examples=100)
@given(
total_quantity=st.integers(min_value=1, max_value=10_000),
num_tranches=st.integers(min_value=1, max_value=20),
)
def test_tranche_sizes_approximately_equal(
self, total_quantity: int, num_tranches: int,
) -> None:
"""All tranche sizes differ by at most 1."""
tranches = split_into_tranches(total_quantity, num_tranches)
assert max(tranches) - min(tranches) <= 1
@settings(max_examples=100)
@given(
total_quantity=st.integers(min_value=1, max_value=10_000),
num_tranches=st.integers(min_value=1, max_value=20),
)
def test_tranche_count_matches_requested(
self, total_quantity: int, num_tranches: int,
) -> None:
"""Number of tranches returned matches the requested count."""
tranches = split_into_tranches(total_quantity, num_tranches)
assert len(tranches) == num_tranches
# -- create_tranches ---------------------------------------------------
@settings(max_examples=100)
@given(
total_quantity=st.integers(min_value=1, max_value=10_000),
parent_id=st.text(min_size=1, max_size=36, alphabet=st.characters(whitelist_categories=("L", "N", "Pd"))),
num_tranches=st.integers(min_value=1, max_value=10),
)
def test_all_tranches_reference_same_parent_decision_id(
self, total_quantity: int, parent_id: str, num_tranches: int,
) -> None:
"""Every tranche references the same parent decision ID."""
tranches = create_tranches(total_quantity, parent_id, num_tranches)
for t in tranches:
assert t.parent_decision_id == parent_id
@settings(max_examples=100)
@given(
total_quantity=st.integers(min_value=1, max_value=10_000),
parent_id=st.text(min_size=1, max_size=36, alphabet=st.characters(whitelist_categories=("L", "N", "Pd"))),
num_tranches=st.integers(min_value=1, max_value=10),
)
def test_create_tranches_quantity_sum(
self, total_quantity: int, parent_id: str, num_tranches: int,
) -> None:
"""Sum of tranche quantities from create_tranches equals total."""
tranches = create_tranches(total_quantity, parent_id, num_tranches)
assert sum(t.quantity for t in tranches) == total_quantity
@settings(max_examples=100)
@given(
total_quantity=st.integers(min_value=1, max_value=10_000),
parent_id=st.text(min_size=1, max_size=36, alphabet=st.characters(whitelist_categories=("L", "N", "Pd"))),
num_tranches=st.integers(min_value=1, max_value=10),
)
def test_create_tranches_indices_sequential(
self, total_quantity: int, parent_id: str, num_tranches: int,
) -> None:
"""Tranche indices are sequential starting from 0."""
tranches = create_tranches(total_quantity, parent_id, num_tranches)
for i, t in enumerate(tranches):
assert t.tranche_index == i
@settings(max_examples=100)
@given(
total_quantity=st.integers(min_value=1, max_value=10_000),
parent_id=st.text(min_size=1, max_size=36, alphabet=st.characters(whitelist_categories=("L", "N", "Pd"))),
num_tranches=st.integers(min_value=1, max_value=10),
)
def test_create_tranches_default_status_pending(
self, total_quantity: int, parent_id: str, num_tranches: int,
) -> None:
"""All tranches start with status 'pending'."""
tranches = create_tranches(total_quantity, parent_id, num_tranches)
for t in tranches:
assert t.status == "pending"