"""Notification dispatch for the autonomous trading engine. Handles actual delivery of notifications via AWS SNS (SMS) and Gmail SMTP (email), with rate limiting via Redis, retry with exponential backoff, and persistence to the notifications table. Task 31: Wire notification dispatch. Gmail uses SMTP with an app password (smtp.gmail.com:587 STARTTLS). boto3 is an optional dependency for SNS — the module degrades gracefully if not installed. """ from __future__ import annotations import asyncio import logging import smtplib from datetime import datetime, timedelta, timezone from email.mime.text import MIMEText from services.shared.config import TradingConfig from services.shared.redis_keys import trading_notification_rate_key logger = logging.getLogger("trading_engine.notifications") # Conditionally import boto3 try: import boto3 _HAS_BOTO3 = True except ImportError: _HAS_BOTO3 = False logger.info("boto3 not installed — SNS notifications disabled") class NotificationDispatcher: """Routes notification delivery to enabled channels. Accepts pool (asyncpg), redis (redis.asyncio), and TradingConfig. Persists every notification attempt to the ``notifications`` table. Rate-limits via Redis counters with 1-hour TTL. Retries failed deliveries with exponential backoff (1s, 2s, 4s). """ # Rate limits per hour SMS_RATE_LIMIT = 10 EMAIL_RATE_LIMIT = 20 # Retry config MAX_RETRIES = 3 RETRY_DELAYS = [1, 2, 4] # seconds def __init__( self, pool: object, redis: object, config: TradingConfig, ) -> None: self.pool = pool self.redis = redis self.config = config # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ async def dispatch(self, event_type: str, message: str) -> None: """Route notification to all enabled channels. Runs delivery in a background task so it never blocks trading. """ asyncio.create_task(self._dispatch_impl(event_type, message)) async def _dispatch_impl(self, event_type: str, message: str) -> None: """Internal dispatch — sends to enabled channels.""" # SNS (SMS) if self.config.sns_topic_arn: await self._deliver_with_retry("sms", event_type, message, self._send_sns) # Gmail (email) if self.config.gmail_recipient: await self._deliver_with_retry("email", event_type, message, self._send_gmail) async def _deliver_with_retry( self, channel: str, event_type: str, message: str, send_fn, ) -> None: """Deliver with rate limiting and exponential backoff retry.""" # Rate limit check if not await self._check_rate_limit(channel): await self._persist_notification( channel, event_type, message, "rate_limited" ) logger.info("Notification rate-limited: channel=%s event=%s", channel, event_type) return last_error = "" for attempt in range(self.MAX_RETRIES): try: await send_fn(event_type, message) # Success — increment rate counter and persist await self._increment_rate_counter(channel) await self._persist_notification( channel, event_type, message, "delivered" ) return except Exception as exc: last_error = str(exc) logger.warning( "Notification delivery failed (attempt %d/%d): %s", attempt + 1, self.MAX_RETRIES, last_error, ) if attempt < self.MAX_RETRIES - 1: await asyncio.sleep(self.RETRY_DELAYS[attempt]) # All retries exhausted await self._persist_notification( channel, event_type, message, "failed", retry_count=self.MAX_RETRIES, error_message=last_error, ) logger.error( "Notification delivery failed after %d retries: channel=%s event=%s error=%s", self.MAX_RETRIES, channel, event_type, last_error, ) # ------------------------------------------------------------------ # Channel implementations # ------------------------------------------------------------------ async def _send_sns(self, event_type: str, message: str) -> None: """Send SMS via AWS SNS.""" if not _HAS_BOTO3: raise RuntimeError("boto3 is not installed — cannot send SNS notification") loop = asyncio.get_event_loop() # Run boto3 call in executor to avoid blocking the event loop await loop.run_in_executor(None, self._send_sns_sync, event_type, message) def _send_sns_sync(self, event_type: str, message: str) -> None: """Synchronous SNS publish (runs in executor).""" client = boto3.client("sns") subject = f"[Stonks] {event_type.replace('_', ' ').title()}" if self.config.sns_topic_arn: client.publish( TopicArn=self.config.sns_topic_arn, Message=message, Subject=subject[:100], # SNS subject max 100 chars ) if self.config.sns_phone_number: client.publish( PhoneNumber=self.config.sns_phone_number, Message=message[:160], # SMS max 160 chars ) async def _send_gmail(self, event_type: str, message: str) -> None: """Send email via Gmail SMTP with app password (TLS on port 587).""" loop = asyncio.get_event_loop() await loop.run_in_executor(None, self._send_gmail_sync, event_type, message) def _send_gmail_sync(self, event_type: str, message: str) -> None: """Synchronous Gmail SMTP send (runs in executor).""" import os sender = self.config.gmail_sender or os.getenv("GMAIL_SENDER", "") recipient = self.config.gmail_recipient or os.getenv("GMAIL_RECIPIENT", "") app_password = os.getenv("GMAIL_APP_PASSWORD", "") if not all([sender, recipient, app_password]): raise RuntimeError( "Gmail SMTP not configured — need GMAIL_SENDER, GMAIL_RECIPIENT, " "and GMAIL_APP_PASSWORD env vars" ) subject = f"[Stonks Alert] {event_type.replace('_', ' ').title()}" mime_msg = MIMEText(message) mime_msg["To"] = recipient mime_msg["From"] = sender mime_msg["Subject"] = subject with smtplib.SMTP("smtp.gmail.com", 587) as server: server.starttls() server.login(sender, app_password) server.send_message(mime_msg) logger.info("Email sent: %s → %s (%s)", sender, recipient, event_type) # ------------------------------------------------------------------ # Rate limiting via Redis # ------------------------------------------------------------------ async def _check_rate_limit(self, channel: str) -> bool: """Return True if the channel is within its rate limit.""" if self.redis is None: return True # No Redis — allow all limit = self.SMS_RATE_LIMIT if channel == "sms" else self.EMAIL_RATE_LIMIT key = trading_notification_rate_key(channel) try: count = await self.redis.get(key) if count is not None and int(count) >= limit: return False except Exception: logger.debug("Could not check rate limit — allowing notification") return True async def _increment_rate_counter(self, channel: str) -> None: """Increment the hourly rate counter for a channel.""" if self.redis is None: return key = trading_notification_rate_key(channel) try: pipe = self.redis.pipeline() pipe.incr(key) pipe.expire(key, 3600) # 1-hour TTL await pipe.execute() except Exception: logger.debug("Could not increment rate counter for %s", channel) # ------------------------------------------------------------------ # Persistence # ------------------------------------------------------------------ async def _persist_notification( self, channel: str, event_type: str, message: str, delivery_status: str, retry_count: int = 0, error_message: str | None = None, ) -> None: """Persist notification record to the notifications table.""" if self.pool is None: return try: now = datetime.now(tz=timezone.utc) delivered_at = now if delivery_status == "delivered" else None await self.pool.execute( "INSERT INTO notifications " "(channel, event_type, message, delivery_status, " "retry_count, error_message, created_at, delivered_at) " "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", channel, event_type, message[:2000], # Truncate long messages delivery_status, retry_count, error_message, now, delivered_at, ) except Exception: logger.debug("Could not persist notification record") # ------------------------------------------------------------------ # Daily summary scheduler # ------------------------------------------------------------------ async def daily_summary_scheduler(self, engine) -> None: """Sleep until 16:30 ET each trading day and dispatch a daily summary. Task 31.6: Runs as a background coroutine. """ from services.trading.trading_window import ET while engine.running: try: now_utc = datetime.now(tz=timezone.utc) et_now = now_utc.astimezone(ET) # Target 16:30 ET target = et_now.replace(hour=16, minute=30, second=0, microsecond=0) if et_now >= target: target += timedelta(days=1) # Skip weekends while target.weekday() > 4: target += timedelta(days=1) sleep_seconds = (target - et_now).total_seconds() if sleep_seconds > 0: await asyncio.sleep(sleep_seconds) if not engine.running: break # Build summary message summary_parts = ["Daily Trading Summary"] if engine.portfolio_state is not None: ps = engine.portfolio_state summary_parts.extend([ f"Portfolio Value: ${ps.total_value:,.2f}", f"Active Pool: ${ps.active_pool:,.2f}", f"Reserve Pool: ${ps.reserve_pool:,.2f}", f"Open Positions: {ps.open_position_count}", f"Portfolio Heat: {ps.portfolio_heat:.2%}", f"Risk Tier: {engine.config.risk_tier}", ]) summary_message = " | ".join(summary_parts) await self.dispatch("daily_summary", summary_message) except asyncio.CancelledError: break except Exception: logger.exception("Error in daily summary scheduler") if engine.running: await asyncio.sleep(60)