Files
stonks-oracle/services/trading/notifications.py
T
Celes Renata c85c0068a2 fix: clean up utcnow deprecation warnings, fix 12 failing tests, add CI/CD pipeline manifests
- Replace all datetime.utcnow() with datetime.now(tz=timezone.utc) across 8 files
- Fix 12 failing tests to match current implementation behavior
- Fix pytest_plugins in non-top-level conftest (moved to root conftest.py)
- Auto-fix 189 lint issues (import sorting, unused imports)
- Add CI/CD pipeline infrastructure (ARC, ArgoCD, Kargo manifests)
- Add values-beta.yaml and values-paper.yaml for staged deployments
- Update GitHub Actions workflow to use self-hosted-gremlin runners
- Add integration-test job to CI pipeline

Result: 1596 passed, 0 failed, 0 warnings
2026-04-18 03:59:28 +00:00

164 lines
5.3 KiB
Python

"""Notification service for the autonomous trading engine.
Pure computation module for notification logic. Actual delivery (AWS SNS,
Gmail API) is deferred to integration code — this module handles formatting,
rate-limit decisions, and record creation.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from services.trading.models import PerformanceMetrics
# ---------------------------------------------------------------------------
# Supported event types
# ---------------------------------------------------------------------------
# Closed, immutable set of event-type names that the docstrings in this
# module refer to as "supported event types".
SUPPORTED_EVENT_TYPES: frozenset[str] = frozenset(
    (
        "circuit_breaker_triggered",
        "circuit_breaker_resumed",
        "risk_tier_changed",
        "emergency_liquidation",
        "large_trade_pnl",
        "daily_summary",
        "weekly_digest",
    )
)
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(tz=timezone.utc)


@dataclass
class NotificationRecord:
    """One notification, captured as a record for audit persistence.

    Only ``channel``, ``event_type`` and ``message`` are required; the
    remaining fields start from defaults describing a freshly created,
    not-yet-delivered notification.
    """

    # Delivery channel name, e.g. "sms" or "email".
    channel: str
    # Name of the event that produced this notification.
    event_type: str
    # Fully formatted message body.
    message: str
    # New records start out as "pending".
    delivery_status: str = "pending"
    # Starts at zero for a new record.
    retry_count: int = 0
    # Timezone-aware UTC timestamp taken when the record is instantiated.
    created_at: datetime = field(default_factory=_utc_now)
# ---------------------------------------------------------------------------
# Notification service (pure computation)
# ---------------------------------------------------------------------------
class NotificationService:
    """Format, rate-limit, and record notifications without delivering them.

    Everything in this class is pure computation: it decides whether a
    message may go out on a channel, renders message text, and builds
    audit records. Handing a message to a transport is left to the caller.
    """

    def __init__(
        self,
        sms_enabled: bool = False,
        email_enabled: bool = False,
        rate_limit_sms_per_hour: int = 10,
        rate_limit_email_per_hour: int = 20,
    ) -> None:
        """Store the per-channel switches and hourly limits.

        Args:
            sms_enabled: Whether the ``"sms"`` channel may be used at all.
            email_enabled: Whether the ``"email"`` channel may be used at all.
            rate_limit_sms_per_hour: Hourly cap applied to ``"sms"``.
            rate_limit_email_per_hour: Hourly cap applied to ``"email"``.
        """
        # SMS channel configuration.
        self.sms_enabled = sms_enabled
        self.rate_limit_sms_per_hour = rate_limit_sms_per_hour
        # Email channel configuration.
        self.email_enabled = email_enabled
        self.rate_limit_email_per_hour = rate_limit_email_per_hour

    # ------------------------------------------------------------------
    # Rate limiting
    # ------------------------------------------------------------------
    def should_send(self, channel: str, current_hour_count: int) -> bool:
        """Decide whether one more notification may go out on *channel*.

        Args:
            channel: ``"sms"`` or ``"email"``; any other value is refused.
            current_hour_count: How many notifications this channel has
                already sent in the current hour window.

        Returns:
            ``True`` only when the channel is enabled and
            ``current_hour_count`` is strictly below that channel's hourly
            limit; ``False`` for a disabled or unrecognised channel, or
            once the limit has been reached.
        """
        if channel == "sms" and self.sms_enabled:
            return current_hour_count < self.rate_limit_sms_per_hour
        if channel == "email" and self.email_enabled:
            return current_hour_count < self.rate_limit_email_per_hour
        # Unknown channel, or a known channel that is switched off.
        return False

    # ------------------------------------------------------------------
    # Formatting helpers
    # ------------------------------------------------------------------
    def format_daily_summary(self, metrics: PerformanceMetrics) -> str:
        """Render the one-line daily performance summary.

        Args:
            metrics: Performance snapshot supplying ``daily_pnl``,
                ``total_portfolio_value``, ``active_pool``,
                ``reserve_pool``, ``win_count``, ``loss_count``,
                ``win_rate`` and ``portfolio_heat``.

        Returns:
            A single line of ``" | "``-separated fields prefixed with
            ``"Daily Summary — "``.
        """
        # The trade count shown is wins plus losses.
        trade_count = metrics.win_count + metrics.loss_count
        segments = [
            # P&L always carries an explicit sign ("+" or "-").
            f"P&L: ${metrics.daily_pnl:+.2f}",
            f"Portfolio: ${metrics.total_portfolio_value:,.2f}",
            f"Active: ${metrics.active_pool:,.2f}",
            f"Reserve: ${metrics.reserve_pool:,.2f}",
            f"Trades: {trade_count}",
            f"Win Rate: {metrics.win_rate:.0%}",
            f"Heat: {metrics.portfolio_heat:.2%}",
        ]
        return "Daily Summary — " + " | ".join(segments)

    def format_alert(self, event_type: str, details: str) -> str:
        """Render an alert line for *event_type*.

        Args:
            event_type: Event name in snake_case, e.g.
                ``"circuit_breaker_triggered"``.
            details: Free-form text appended after the heading.

        Returns:
            ``"[Stonks Alert] <Heading>: <details>"``, where the heading is
            *event_type* with underscores turned into spaces and
            title-cased.
        """
        heading = event_type.replace("_", " ").title()
        return f"[Stonks Alert] {heading}: {details}"

    # ------------------------------------------------------------------
    # Record creation
    # ------------------------------------------------------------------
    def create_notification(
        self,
        channel: str,
        event_type: str,
        message: str,
    ) -> NotificationRecord:
        """Build a fresh ``NotificationRecord`` for the given message.

        Args:
            channel: ``"sms"`` or ``"email"``.
            event_type: Name of the event being reported.
            message: The already-formatted message body.

        Returns:
            A new ``NotificationRecord`` with ``delivery_status="pending"``
            and ``retry_count=0``.
        """
        # State is spelled out explicitly rather than left to the record's
        # own defaults.
        record = NotificationRecord(
            channel=channel,
            event_type=event_type,
            message=message,
            delivery_status="pending",
            retry_count=0,
        )
        return record