c4666c071b
Replaced the Gmail API (OAuth2) notification delivery with plain SMTP using a Gmail app password. Much simpler setup — no Google Cloud project, no OAuth2 flow, no extra dependencies. - Rewrote _send_gmail() to use smtplib with smtp.gmail.com:587 TLS - Added stonks-gmail-secrets to Helm chart (GMAIL_SENDER, GMAIL_RECIPIENT, GMAIL_APP_PASSWORD) - Added gmail secret to trading-engine deployment - Updated runmefirst.sh to read gmail.app from kube dir - Sender/recipient: celes@celestium.life
329 lines
12 KiB
Python
329 lines
12 KiB
Python
"""Notification dispatch for the autonomous trading engine.
|
|
|
|
Handles actual delivery of notifications via AWS SNS (SMS) and Gmail SMTP
|
|
(email), with rate limiting via Redis, retry with exponential backoff,
|
|
and persistence to the notifications table.
|
|
|
|
Task 31: Wire notification dispatch.
|
|
|
|
Gmail uses SMTP with an app password (smtp.gmail.com:587 STARTTLS).
|
|
boto3 is an optional dependency for SNS — the module degrades gracefully
|
|
if not installed.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import smtplib
|
|
from datetime import datetime, timedelta, timezone
|
|
from email.mime.text import MIMEText
|
|
|
|
from services.shared.config import TradingConfig
|
|
from services.shared.redis_keys import trading_notification_rate_key
|
|
|
|
# Module-level logger shared by every notification code path.
logger = logging.getLogger("trading_engine.notifications")

# boto3 is optional: record whether it could be imported so the SNS channel
# can refuse to send at call time instead of the module failing at import time.
try:
    import boto3
except ImportError:
    _HAS_BOTO3 = False
    logger.info("boto3 not installed — SNS notifications disabled")
else:
    _HAS_BOTO3 = True
|
|
|
|
|
|
class NotificationDispatcher:
    """Deliver trading-engine notifications over SMS (AWS SNS) and email (Gmail SMTP).

    Every delivery is rate-limited through a per-channel Redis counter, retried
    with exponential backoff (1s, 2s, 4s), and recorded in the ``notifications``
    table. Redis and the database are both best-effort: when ``redis`` or
    ``pool`` is ``None``, or a call to them fails, the notification is still
    sent.

    Args:
        pool: Database pool exposing an awaitable ``execute(sql, *args)``
            (asyncpg-style), or ``None`` to disable persistence.
        redis: Async Redis client (``redis.asyncio``-style), or ``None`` to
            disable rate limiting.
        config: Trading configuration; this class reads ``sns_topic_arn``,
            ``sns_phone_number``, ``gmail_sender`` and ``gmail_recipient``.
    """

    # Rate limits per hour
    SMS_RATE_LIMIT = 10
    EMAIL_RATE_LIMIT = 20
    RATE_WINDOW_SECONDS = 3600  # length of one rate-limit window

    # Retry config
    MAX_RETRIES = 3
    RETRY_DELAYS = [1, 2, 4]  # seconds

    # Gmail SMTP endpoint (STARTTLS submission port)
    SMTP_HOST = "smtp.gmail.com"
    SMTP_PORT = 587
    SMTP_TIMEOUT = 30  # seconds; keeps a stalled connection from pinning a worker thread

    # Wall-clock (hour, minute) in the ET zone at which the daily summary is sent
    DAILY_SUMMARY_TIME = (16, 30)

    def __init__(
        self,
        pool: object,
        redis: object,
        config: TradingConfig,
    ) -> None:
        self.pool = pool
        self.redis = redis
        self.config = config
        # Strong references to in-flight dispatch tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def dispatch(self, event_type: str, message: str) -> None:
        """Route a notification to all enabled channels without blocking.

        Delivery runs in a background task so it never blocks trading; this
        coroutine returns as soon as the task has been scheduled.

        Args:
            event_type: Machine-readable event name (e.g. ``"daily_summary"``);
                used to build the subject line.
            message: Notification body.
        """
        task = asyncio.create_task(self._dispatch_impl(event_type, message))
        self._background_tasks.add(task)
        task.add_done_callback(self._on_dispatch_done)

    def _on_dispatch_done(self, task: asyncio.Task) -> None:
        """Release a finished dispatch task and log any unexpected crash."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        # Retrieving the exception also silences asyncio's
        # "Task exception was never retrieved" warning.
        exc = task.exception()
        if exc is not None:
            logger.error("Notification dispatch task crashed: %s", exc, exc_info=exc)

    async def _dispatch_impl(self, event_type: str, message: str) -> None:
        """Send the notification to each configured channel, one after another."""
        cfg = self.config

        # SNS (SMS): _send_sns_sync publishes to the topic and/or the phone
        # number independently, so either setting enables the channel.
        if cfg.sns_topic_arn or cfg.sns_phone_number:
            await self._deliver_with_retry("sms", event_type, message, self._send_sns)

        # Gmail (email)
        if cfg.gmail_recipient:
            await self._deliver_with_retry("email", event_type, message, self._send_gmail)

    def _retry_delay(self, attempt: int) -> float:
        """Return the backoff delay in seconds after the given 0-based attempt.

        The last configured delay is reused when ``MAX_RETRIES`` exceeds the
        length of ``RETRY_DELAYS``; an empty list means "no delay".
        """
        delays = self.RETRY_DELAYS
        if not delays:
            return 0
        return delays[min(attempt, len(delays) - 1)]

    async def _deliver_with_retry(
        self,
        channel: str,
        event_type: str,
        message: str,
        send_fn,
    ) -> None:
        """Deliver with rate limiting and exponential backoff retry.

        Args:
            channel: ``"sms"`` or ``"email"``; selects the rate limit and is
                stored with the persisted record.
            event_type: Event name passed through to ``send_fn``.
            message: Notification body passed through to ``send_fn``.
            send_fn: Coroutine function ``(event_type, message)`` that performs
                the delivery and raises on failure.

        Never raises for delivery failures: the outcome is persisted as
        ``rate_limited``, ``delivered`` or ``failed``.
        """
        # Rate limit check
        if not await self._check_rate_limit(channel):
            await self._persist_notification(
                channel, event_type, message, "rate_limited"
            )
            logger.info("Notification rate-limited: channel=%s event=%s", channel, event_type)
            return

        last_error = ""
        for attempt in range(self.MAX_RETRIES):
            try:
                await send_fn(event_type, message)
            except Exception as exc:
                # Some exceptions stringify to "", so fall back to the repr.
                last_error = str(exc) or repr(exc)
                logger.warning(
                    "Notification delivery failed (attempt %d/%d): %s",
                    attempt + 1,
                    self.MAX_RETRIES,
                    last_error,
                )
                if attempt < self.MAX_RETRIES - 1:
                    await asyncio.sleep(self._retry_delay(attempt))
            else:
                # Success: only delivered notifications count against the limit.
                await self._increment_rate_counter(channel)
                await self._persist_notification(
                    channel, event_type, message, "delivered", retry_count=attempt
                )
                return

        # All retries exhausted
        await self._persist_notification(
            channel,
            event_type,
            message,
            "failed",
            retry_count=self.MAX_RETRIES,
            error_message=last_error,
        )
        logger.error(
            "Notification delivery failed after %d retries: channel=%s event=%s error=%s",
            self.MAX_RETRIES,
            channel,
            event_type,
            last_error,
        )

    # ------------------------------------------------------------------
    # Channel implementations
    # ------------------------------------------------------------------

    async def _send_sns(self, event_type: str, message: str) -> None:
        """Send SMS via AWS SNS.

        Raises:
            RuntimeError: If boto3 is not installed.
        """
        if not _HAS_BOTO3:
            raise RuntimeError("boto3 is not installed — cannot send SNS notification")

        # Run the blocking boto3 call in the default executor so the event
        # loop stays responsive.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._send_sns_sync, event_type, message)

    def _send_sns_sync(self, event_type: str, message: str) -> None:
        """Synchronous SNS publish (runs in executor).

        Publishes to the configured topic and, independently, sends a direct
        SMS to the configured phone number.
        """
        client = boto3.client("sns")
        subject = f"[Stonks] {event_type.replace('_', ' ').title()}"

        if self.config.sns_topic_arn:
            client.publish(
                TopicArn=self.config.sns_topic_arn,
                Message=message,
                Subject=subject[:100],  # SNS subject max 100 chars
            )

        if self.config.sns_phone_number:
            client.publish(
                PhoneNumber=self.config.sns_phone_number,
                Message=message[:160],  # SMS max 160 chars
            )

    async def _send_gmail(self, event_type: str, message: str) -> None:
        """Send email via Gmail SMTP with app password (TLS on port 587)."""
        # smtplib is blocking, so run it in the default executor.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._send_gmail_sync, event_type, message)

    def _send_gmail_sync(self, event_type: str, message: str) -> None:
        """Synchronous Gmail SMTP send (runs in executor).

        Sender and recipient come from the config, falling back to the
        ``GMAIL_SENDER`` / ``GMAIL_RECIPIENT`` environment variables; the app
        password is read from ``GMAIL_APP_PASSWORD`` only.

        Raises:
            RuntimeError: If sender, recipient or app password is missing.
            smtplib.SMTPException, OSError: On connection, TLS or login failure.
        """
        import os
        import ssl

        sender = self.config.gmail_sender or os.getenv("GMAIL_SENDER", "")
        recipient = self.config.gmail_recipient or os.getenv("GMAIL_RECIPIENT", "")
        app_password = os.getenv("GMAIL_APP_PASSWORD", "")

        if not all([sender, recipient, app_password]):
            raise RuntimeError(
                "Gmail SMTP not configured — need GMAIL_SENDER, GMAIL_RECIPIENT, "
                "and GMAIL_APP_PASSWORD env vars"
            )

        subject = f"[Stonks Alert] {event_type.replace('_', ' ').title()}"
        mime_msg = MIMEText(message)
        mime_msg["To"] = recipient
        mime_msg["From"] = sender
        mime_msg["Subject"] = subject

        # An explicit default context enforces certificate and hostname
        # verification before the app password is sent over the connection.
        tls_context = ssl.create_default_context()
        with smtplib.SMTP(self.SMTP_HOST, self.SMTP_PORT, timeout=self.SMTP_TIMEOUT) as server:
            server.starttls(context=tls_context)
            server.login(sender, app_password)
            server.send_message(mime_msg)

        logger.info("Email sent: %s → %s (%s)", sender, recipient, event_type)

    # ------------------------------------------------------------------
    # Rate limiting via Redis
    # ------------------------------------------------------------------

    async def _check_rate_limit(self, channel: str) -> bool:
        """Return True if the channel is within its rate limit.

        Fails open: with no Redis client, or when Redis cannot be read, the
        notification is allowed.
        """
        if self.redis is None:
            return True  # No Redis — allow all

        limit = self.SMS_RATE_LIMIT if channel == "sms" else self.EMAIL_RATE_LIMIT
        key = trading_notification_rate_key(channel)

        try:
            count = await self.redis.get(key)
            return count is None or int(count) < limit
        except Exception:
            # Logged at warning level so an unenforced limit is visible.
            logger.warning(
                "Could not check rate limit — allowing notification", exc_info=True
            )
            return True

    async def _increment_rate_counter(self, channel: str) -> None:
        """Increment the hourly rate counter for a channel.

        The TTL is applied only when the key has none, i.e. when a new window
        starts. Refreshing it on every increment would keep the counter alive
        for as long as notifications keep arriving, so it would never reset.
        """
        if self.redis is None:
            return

        key = trading_notification_rate_key(channel)
        try:
            pipe = self.redis.pipeline()
            pipe.incr(key)
            pipe.ttl(key)
            _, ttl = await pipe.execute()
            # TTL reports a negative value when the key has no expiry.
            if ttl is None or ttl < 0:
                await self.redis.expire(key, self.RATE_WINDOW_SECONDS)
        except Exception:
            logger.warning(
                "Could not increment rate counter for %s", channel, exc_info=True
            )

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    async def _persist_notification(
        self,
        channel: str,
        event_type: str,
        message: str,
        delivery_status: str,
        retry_count: int = 0,
        error_message: str | None = None,
    ) -> None:
        """Persist notification record to the notifications table.

        Best-effort: does nothing without a pool, and a database error is
        logged rather than raised so it cannot disturb delivery.

        Args:
            channel: ``"sms"`` or ``"email"``.
            event_type: Event name of the notification.
            message: Notification body; truncated to 2000 characters.
            delivery_status: ``"delivered"``, ``"failed"`` or ``"rate_limited"``.
            retry_count: Number of retries recorded with the row.
            error_message: Last delivery error, if any.
        """
        if self.pool is None:
            return

        try:
            now = datetime.now(tz=timezone.utc)
            delivered_at = now if delivery_status == "delivered" else None

            await self.pool.execute(
                "INSERT INTO notifications "
                "(channel, event_type, message, delivery_status, "
                "retry_count, error_message, created_at, delivered_at) "
                "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
                channel,
                event_type,
                message[:2000],  # Truncate long messages
                delivery_status,
                retry_count,
                error_message,
                now,
                delivered_at,
            )
        except Exception:
            logger.warning("Could not persist notification record", exc_info=True)

    # ------------------------------------------------------------------
    # Daily summary scheduler
    # ------------------------------------------------------------------

    @classmethod
    def _next_summary_target(cls, et_now: datetime) -> datetime:
        """Return the next weekday summary time strictly after ``et_now``.

        ``et_now`` must be timezone-aware; the result carries the same tzinfo.
        Weekends are skipped; market holidays are not.
        """
        hour, minute = cls.DAILY_SUMMARY_TIME
        target = et_now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if et_now >= target:
            target += timedelta(days=1)

        # Skip weekends
        while target.weekday() > 4:
            target += timedelta(days=1)
        return target

    @staticmethod
    def _build_daily_summary(engine) -> str:
        """Format the one-line daily summary from the engine's portfolio state."""
        summary_parts = ["Daily Trading Summary"]
        ps = engine.portfolio_state
        if ps is not None:
            summary_parts.extend([
                f"Portfolio Value: ${ps.total_value:,.2f}",
                f"Active Pool: ${ps.active_pool:,.2f}",
                f"Reserve Pool: ${ps.reserve_pool:,.2f}",
                f"Open Positions: {ps.open_position_count}",
                f"Portfolio Heat: {ps.portfolio_heat:.2%}",
                f"Risk Tier: {engine.config.risk_tier}",
            ])
        return " | ".join(summary_parts)

    async def daily_summary_scheduler(self, engine) -> None:
        """Sleep until 16:30 ET each weekday and dispatch a daily summary.

        Task 31.6: Runs as a background coroutine until ``engine.running``
        becomes false or the task is cancelled. Unexpected errors are logged
        and the loop resumes after 60 seconds.

        Args:
            engine: Trading engine; this method reads ``running``,
                ``portfolio_state`` and ``config.risk_tier``.
        """
        from services.trading.trading_window import ET

        last_sent_for = None  # ET calendar date of the most recent summary

        while engine.running:
            try:
                now_utc = datetime.now(tz=timezone.utc)
                target = self._next_summary_target(now_utc.astimezone(ET))
                if last_sent_for is not None and target.date() <= last_sent_for:
                    # Woke marginally before the target we already served:
                    # move on to the following slot instead of re-sending.
                    target = self._next_summary_target(target)

                # Subtract in UTC. Two datetimes sharing one tzinfo subtract as
                # naive wall-clock values, which is an hour off whenever a DST
                # change falls inside the sleep.
                sleep_seconds = (target.astimezone(timezone.utc) - now_utc).total_seconds()
                if sleep_seconds > 0:
                    await asyncio.sleep(sleep_seconds)

                if not engine.running:
                    break

                await self.dispatch("daily_summary", self._build_daily_summary(engine))
                last_sent_for = target.date()

            except asyncio.CancelledError:
                break
            except Exception:
                logger.exception("Error in daily summary scheduler")
                if engine.running:
                    await asyncio.sleep(60)
|