Files
stonks-oracle/services/trading/notification_dispatch.py
T
Celes Renata 70bad7709a feat: wire live decision loop and enable paper trading
Phase 2 of the autonomous trading engine:

- Replace start()/stop() stubs with real async implementations
- Decision loop: polls recommendations from PostgreSQL, deduplicates
  via Redis, evaluates through the full pipeline, submits orders to
  stonks:queue:broker_orders
- Stop-loss monitor: fetches prices from Polygon API, checks crossings,
  submits immediate sell orders, safety sell after 15 min without data
- Performance loop: computes metrics every 5 min during market hours,
  persists daily snapshots at market close
- Risk tier scheduler: evaluates daily at 16:00 ET, persists tier changes
- Rebalance scheduler: evaluates Monday 09:45 ET, respects circuit breaker
- Notification dispatch: SNS + Gmail with rate limiting and retry
- Backtest replay: fetches historical data, simulates decisions, persists
- Real asyncpg/redis connections in FastAPI lifespan (graceful degradation)
- Migration 019: enable paper trading with conservative tier, 5 cap
- Added max_open_positions to TradingConfig with env var loading
- Phase 2 tasks added to autonomous-trading-engine spec
2026-04-15 20:52:28 +00:00

349 lines
12 KiB
Python

"""Notification dispatch for the autonomous trading engine.
Handles actual delivery of notifications via AWS SNS (SMS) and Gmail API
(email), with rate limiting via Redis, retry with exponential backoff,
and persistence to the notifications table.
Task 31: Wire notification dispatch.
boto3 and google-api-python-client are optional dependencies — the module
logs an info-level message at import time and degrades gracefully if they
are not installed.
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from services.shared.config import TradingConfig
from services.shared.redis_keys import trading_notification_rate_key
logger = logging.getLogger("trading_engine.notifications")

# Standard-library helpers used to build the Gmail payload. They cannot be
# missing, so they live outside the optional-dependency guards below and each
# ``try`` only covers the third-party package it is named after.
import base64
from email.mime.text import MIMEText

# Conditionally import boto3 (needed for the SNS channel only).
try:
    import boto3

    _HAS_BOTO3 = True
except ImportError:
    _HAS_BOTO3 = False
    logger.info("boto3 not installed — SNS notifications disabled")

# Conditionally import the Google API client (needed for the Gmail channel only).
try:
    from google.oauth2.credentials import Credentials
    from googleapiclient.discovery import build as google_build

    _HAS_GOOGLE = True
except ImportError:
    _HAS_GOOGLE = False
    logger.info("google-api-python-client not installed — Gmail notifications disabled")
class NotificationDispatcher:
    """Route trading-engine notifications to the enabled delivery channels.

    Collaborators (all supplied by the caller):

    * ``pool``   -- database pool exposing an awaitable ``execute`` (asyncpg).
      May be ``None``, in which case nothing is persisted.
    * ``redis``  -- ``redis.asyncio`` client used for the rate counters.
      May be ``None``, in which case rate limiting is disabled.
    * ``config`` -- ``TradingConfig``; this class reads ``sns_topic_arn``,
      ``sns_phone_number``, ``gmail_recipient`` and ``gmail_sender``.

    Every delivery outcome (``delivered``, ``failed``, ``rate_limited``) is
    written to the ``notifications`` table.  Rate limiting uses one Redis
    counter per channel with a one-hour TTL.  A failed delivery is attempted
    up to ``MAX_RETRIES`` times, sleeping ``RETRY_DELAYS[i]`` seconds after
    the i-th failed attempt (no sleep after the final attempt).
    """

    # Rate limits per hour
    SMS_RATE_LIMIT = 10
    EMAIL_RATE_LIMIT = 20

    # Retry config
    MAX_RETRIES = 3
    RETRY_DELAYS = [1, 2, 4]  # seconds

    # Lifetime of a rate counter, measured from its first increment.
    RATE_WINDOW_SECONDS = 3600

    def __init__(
        self,
        pool: object,
        redis: object,
        config: TradingConfig,
    ) -> None:
        self.pool = pool
        self.redis = redis
        self.config = config
        # The event loop only keeps weak references to tasks; holding them
        # here prevents an in-flight dispatch from being garbage-collected.
        self._background_tasks: set[asyncio.Task] = set()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    async def dispatch(self, event_type: str, message: str) -> None:
        """Schedule delivery of *message* to all enabled channels.

        Delivery runs in a background task so the caller (the trading loop)
        is never blocked by network I/O or retry back-off.  The method
        returns as soon as the task has been scheduled.
        """
        task = asyncio.create_task(self._dispatch_impl(event_type, message))
        self._background_tasks.add(task)
        task.add_done_callback(self._on_dispatch_done)

    def _on_dispatch_done(self, task: asyncio.Task) -> None:
        """Release a finished dispatch task and surface any unexpected error."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Notification dispatch task failed", exc_info=exc)

    async def _dispatch_impl(self, event_type: str, message: str) -> None:
        """Send to each channel that has a destination configured."""
        # SNS (SMS): _send_sns_sync publishes to the topic and/or the phone
        # number, so either setting is enough to enable the channel.
        if self.config.sns_topic_arn or self.config.sns_phone_number:
            await self._deliver_with_retry("sms", event_type, message, self._send_sns)
        # Gmail (email)
        if self.config.gmail_recipient:
            await self._deliver_with_retry("email", event_type, message, self._send_gmail)

    async def _deliver_with_retry(
        self,
        channel: str,
        event_type: str,
        message: str,
        send_fn,
    ) -> None:
        """Deliver via *send_fn* with rate limiting and retry.

        *send_fn* is an async callable taking ``(event_type, message)``.
        Any exception it raises counts as a failed attempt.  The outcome is
        persisted as ``rate_limited``, ``delivered`` or ``failed``; this
        method itself does not propagate delivery errors.
        """
        # Rate limit check
        if not await self._check_rate_limit(channel):
            await self._persist_notification(
                channel, event_type, message, "rate_limited"
            )
            logger.info("Notification rate-limited: channel=%s event=%s", channel, event_type)
            return

        last_error = ""
        delays = self.RETRY_DELAYS
        for attempt in range(self.MAX_RETRIES):
            try:
                await send_fn(event_type, message)
            except Exception as exc:
                last_error = str(exc)
                logger.warning(
                    "Notification delivery failed (attempt %d/%d): %s",
                    attempt + 1,
                    self.MAX_RETRIES,
                    last_error,
                )
                if attempt < self.MAX_RETRIES - 1 and delays:
                    # Clamp the index so raising MAX_RETRIES without extending
                    # RETRY_DELAYS reuses the last delay instead of crashing.
                    await asyncio.sleep(delays[min(attempt, len(delays) - 1)])
            else:
                # Success — only successful sends count against the rate limit.
                await self._increment_rate_counter(channel)
                await self._persist_notification(
                    channel, event_type, message, "delivered"
                )
                return

        # All retries exhausted
        await self._persist_notification(
            channel,
            event_type,
            message,
            "failed",
            retry_count=self.MAX_RETRIES,
            error_message=last_error,
        )
        logger.error(
            "Notification delivery failed after %d retries: channel=%s event=%s error=%s",
            self.MAX_RETRIES,
            channel,
            event_type,
            last_error,
        )

    # ------------------------------------------------------------------
    # Channel implementations
    # ------------------------------------------------------------------
    async def _send_sns(self, event_type: str, message: str) -> None:
        """Send SMS via AWS SNS.

        Raises:
            RuntimeError: if boto3 is not installed.
        """
        if not _HAS_BOTO3:
            raise RuntimeError("boto3 is not installed — cannot send SNS notification")
        loop = asyncio.get_running_loop()
        # Run boto3 call in executor to avoid blocking the event loop
        await loop.run_in_executor(None, self._send_sns_sync, event_type, message)

    def _send_sns_sync(self, event_type: str, message: str) -> None:
        """Synchronous SNS publish (runs in executor).

        Publishes to the configured topic and, independently, to the
        configured phone number; whichever of the two is unset is skipped.
        """
        client = boto3.client("sns")
        subject = f"[Stonks] {event_type.replace('_', ' ').title()}"
        if self.config.sns_topic_arn:
            client.publish(
                TopicArn=self.config.sns_topic_arn,
                Message=message,
                Subject=subject[:100],  # SNS subject max 100 chars
            )
        if self.config.sns_phone_number:
            client.publish(
                PhoneNumber=self.config.sns_phone_number,
                Message=message[:160],  # SMS max 160 chars
            )

    async def _send_gmail(self, event_type: str, message: str) -> None:
        """Send email via Gmail API.

        Raises:
            RuntimeError: if the Google API client is not installed, or
                (from the sync helper) if OAuth2 credentials are missing.
        """
        if not _HAS_GOOGLE:
            raise RuntimeError(
                "google-api-python-client not installed — cannot send Gmail notification"
            )
        loop = asyncio.get_running_loop()
        # The Google client is blocking, so it runs in the default executor.
        await loop.run_in_executor(None, self._send_gmail_sync, event_type, message)

    def _send_gmail_sync(self, event_type: str, message: str) -> None:
        """Synchronous Gmail send (runs in executor).

        OAuth2 credentials are read from the ``GMAIL_REFRESH_TOKEN``,
        ``GMAIL_CLIENT_ID`` and ``GMAIL_CLIENT_SECRET`` environment variables.

        Raises:
            RuntimeError: if any of the three variables is unset or empty.
        """
        import os

        refresh_token = os.getenv("GMAIL_REFRESH_TOKEN", "")
        client_id = os.getenv("GMAIL_CLIENT_ID", "")
        client_secret = os.getenv("GMAIL_CLIENT_SECRET", "")
        if not all([refresh_token, client_id, client_secret]):
            raise RuntimeError("Gmail OAuth2 credentials not configured")

        # No access token is supplied; only the refresh token and client
        # credentials are passed to the Credentials object.
        creds = Credentials(
            token=None,
            refresh_token=refresh_token,
            client_id=client_id,
            client_secret=client_secret,
            token_uri="https://oauth2.googleapis.com/token",
        )
        service = google_build("gmail", "v1", credentials=creds)

        subject = f"[Stonks Alert] {event_type.replace('_', ' ').title()}"
        mime_msg = MIMEText(message)
        mime_msg["to"] = self.config.gmail_recipient
        mime_msg["from"] = self.config.gmail_sender or "me"
        mime_msg["subject"] = subject

        # The Gmail API expects the RFC 2822 message as URL-safe base64 text.
        raw = base64.urlsafe_b64encode(mime_msg.as_bytes()).decode()
        service.users().messages().send(
            userId="me", body={"raw": raw}
        ).execute()

    # ------------------------------------------------------------------
    # Rate limiting via Redis
    # ------------------------------------------------------------------
    async def _check_rate_limit(self, channel: str) -> bool:
        """Return True if the channel is within its rate limit.

        Fails open: with no Redis client, or when the lookup raises, the
        notification is allowed.
        """
        if self.redis is None:
            return True  # No Redis — allow all
        limit = self.SMS_RATE_LIMIT if channel == "sms" else self.EMAIL_RATE_LIMIT
        key = trading_notification_rate_key(channel)
        try:
            count = await self.redis.get(key)
            if count is not None and int(count) >= limit:
                return False
        except Exception:
            logger.warning(
                "Could not check rate limit — allowing notification", exc_info=True
            )
        return True

    async def _increment_rate_counter(self, channel: str) -> None:
        """Increment the hourly rate counter for a channel.

        The TTL is applied only when the key has none (first increment of a
        window, or a key left without expiry).  Re-applying it on every
        increment would keep pushing the expiry forward, so the counter would
        never reset while notifications keep flowing.
        """
        if self.redis is None:
            return
        key = trading_notification_rate_key(channel)
        try:
            pipe = self.redis.pipeline()
            pipe.incr(key)
            pipe.ttl(key)
            _count, ttl = await pipe.execute()
            # TTL replies with a negative value when the key has no expiry.
            if ttl is None or ttl < 0:
                await self.redis.expire(key, self.RATE_WINDOW_SECONDS)
        except Exception:
            logger.warning(
                "Could not increment rate counter for %s", channel, exc_info=True
            )

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    async def _persist_notification(
        self,
        channel: str,
        event_type: str,
        message: str,
        delivery_status: str,
        retry_count: int = 0,
        error_message: str | None = None,
    ) -> None:
        """Persist notification record to the notifications table.

        Best-effort: a missing pool is a no-op and database errors are logged
        rather than raised, so persistence never breaks delivery.
        ``delivered_at`` is set only when *delivery_status* is ``"delivered"``.
        """
        if self.pool is None:
            return
        try:
            now = datetime.now(tz=timezone.utc)
            delivered_at = now if delivery_status == "delivered" else None
            await self.pool.execute(
                "INSERT INTO notifications "
                "(channel, event_type, message, delivery_status, "
                "retry_count, error_message, created_at, delivered_at) "
                "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
                channel,
                event_type,
                message[:2000],  # Truncate long messages
                delivery_status,
                retry_count,
                error_message,
                now,
                delivered_at,
            )
        except Exception:
            logger.warning("Could not persist notification record", exc_info=True)

    # ------------------------------------------------------------------
    # Daily summary scheduler
    # ------------------------------------------------------------------
    @staticmethod
    def _seconds_until_next_summary(now_utc: datetime, et_tz) -> float:
        """Return seconds from *now_utc* until the next weekday 16:30 in *et_tz*.

        *now_utc* must be timezone-aware.  The target is today's 16:30 if it
        has not passed yet, otherwise the next day's, skipping Saturday and
        Sunday.
        """
        et_now = now_utc.astimezone(et_tz)
        target = et_now.replace(hour=16, minute=30, second=0, microsecond=0)
        if et_now >= target:
            target += timedelta(days=1)
        # Skip weekends
        while target.weekday() > 4:
            target += timedelta(days=1)
        # Subtract in UTC: subtracting two datetimes that share one tzinfo
        # ignores their UTC offsets, which is an hour off across a DST change.
        return (target.astimezone(timezone.utc) - now_utc).total_seconds()

    @staticmethod
    def _build_daily_summary(engine) -> str:
        """Format the daily summary line from ``engine.portfolio_state``."""
        summary_parts = ["Daily Trading Summary"]
        ps = engine.portfolio_state
        if ps is not None:
            summary_parts.extend([
                f"Portfolio Value: ${ps.total_value:,.2f}",
                f"Active Pool: ${ps.active_pool:,.2f}",
                f"Reserve Pool: ${ps.reserve_pool:,.2f}",
                f"Open Positions: {ps.open_position_count}",
                f"Portfolio Heat: {ps.portfolio_heat:.2%}",
                f"Risk Tier: {engine.config.risk_tier}",
            ])
        return " | ".join(summary_parts)

    async def daily_summary_scheduler(self, engine) -> None:
        """Sleep until 16:30 ET each weekday and dispatch a daily summary.

        Task 31.6: Runs as a background coroutine while ``engine.running`` is
        truthy.  Cancellation ends the loop quietly; any other error is
        logged and the loop resumes after a 60-second pause.
        """
        from services.trading.trading_window import ET

        while engine.running:
            try:
                now_utc = datetime.now(tz=timezone.utc)
                sleep_seconds = self._seconds_until_next_summary(now_utc, ET)
                if sleep_seconds > 0:
                    await asyncio.sleep(sleep_seconds)
                # The engine may have been stopped while we were asleep.
                if not engine.running:
                    break
                summary_message = self._build_daily_summary(engine)
                await self.dispatch("daily_summary", summary_message)
            except asyncio.CancelledError:
                break
            except Exception:
                logger.exception("Error in daily summary scheduler")
                if engine.running:
                    await asyncio.sleep(60)