Files
Celes Renata 2f2a7665e7
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
fix: suppress take-profit when active buy signal has high confidence
Profit-taking was selling positions that still had strong bullish
signals (0.93 confidence buy recommendations), then the engine would
immediately rebuy at a worse price.

Now checks for recent high-confidence buy recommendations (>=0.80)
before executing a take-profit sell. If the signal says keep holding,
the take-profit is suppressed. Stop-losses still fire unconditionally.
2026-05-13 20:19:47 +00:00

1994 lines
84 KiB
Python

"""Core autonomous trading engine — decision loop and pre-trade evaluation.
Feature: autonomous-trading-engine
Coordinates all trading sub-components (PositionSizer, StopLossManager,
CircuitBreaker, ReservePoolController, RiskTierController, CorrelationMatrix)
to evaluate recommendations and produce TradingDecision records.
The ``evaluate_recommendation`` method is deliberately synchronous-compatible
so that it can be tested without real DB/Redis connections. The async
``start`` / ``stop`` methods manage the live decision loop, stop-loss
monitor, performance metrics loop, and the risk-tier and rebalance schedulers.
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from datetime import datetime, timedelta, timezone
import httpx
try:
import numpy as np # noqa: F401
_HAS_NUMPY = True
except ImportError:
_HAS_NUMPY = False
from services.shared.config import TradingConfig
from services.shared.redis_keys import (
QUEUE_BROKER,
queue_key,
trading_dedupe_key,
)
from services.trading.circuit_breaker import CircuitBreaker
from services.trading.correlation import CorrelationMatrix
from services.trading.micro_trading import MicroTradeConfig, MicroTradingModule
from services.trading.models import (
RISK_TIER_DEFAULTS,
CircuitBreakerState,
OpenPosition,
PerformanceMetrics,
PortfolioState,
PositionSizeResult,
RiskTierConfig,
StopLevels,
StopTrigger,
TradingDecision,
)
from services.trading.notifications import NotificationRecord, NotificationService
from services.trading.performance_tracker import PerformanceComputer
from services.trading.position_sizer import PositionSizer
from services.trading.rebalancer import PortfolioRebalancer
from services.trading.reserve_pool import ReservePoolController
from services.trading.risk_tier_controller import RiskTierController
from services.trading.stop_loss_manager import StopLossManager
from services.trading.trading_window import is_market_open, is_within_trading_window
logger = logging.getLogger("trading_engine")
class TradingEngine:
"""Main autonomous trading engine.
Manages the decision loop, coordinates all sub-components,
and maintains runtime state.
Parameters
----------
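pool:
asyncpg connection pool (used by async lifecycle methods). May be
``None``, in which case database loading and polling are skipped.
redis:
Redis client (used for deduplication and pub/sub). May be ``None``,
in which case Redis deduplication and broker-queue pushes are skipped.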
config:
Trading engine configuration.
"""
def __init__(
    self,
    pool: object,
    redis: object,
    config: TradingConfig,
) -> None:
    """Wire up the engine's collaborators and initialise runtime state.

    The handles are only stored here; no database or Redis call is made.

    Parameters
    ----------
    pool:
        Database connection pool handle, stored as ``self.pool``.
    redis:
        Redis client handle, stored as ``self.redis``.
    config:
        Trading engine configuration. ``reserve_siphon_pct``,
        ``reserve_high_water_pct`` and ``risk_tier`` are read here.
    """
    # External handles and configuration
    self.pool, self.redis, self.config = pool, redis, config

    # Collaborating sub-components
    self.position_sizer = PositionSizer()
    self.stop_loss_manager = StopLossManager()
    self.circuit_breaker = CircuitBreaker()
    reserve_settings = {
        "siphon_pct": config.reserve_siphon_pct,
        "high_water_pct": config.reserve_high_water_pct,
    }
    self.reserve_pool_controller = ReservePoolController(**reserve_settings)
    self.risk_tier_controller = RiskTierController()
    self.correlation_matrix = CorrelationMatrix()
    self.notification_service = NotificationService()
    self.micro_trading_module = MicroTradingModule()
    self.rebalancer = PortfolioRebalancer()
    self.performance_computer = PerformanceComputer()

    # Mutable runtime state
    self.running: bool = False
    self.portfolio_state: PortfolioState | None = None
    self.processed_recommendation_ids: set[str] = set()

    # Handles of the spawned worker loops (Task 27.6)
    self._tasks: list[asyncio.Task] = []  # type: ignore[type-arg]

    # Active risk tier: the configured tier name, or "moderate" when the
    # configured name has no entry in the defaults table
    configured_tier = config.risk_tier
    self._active_risk_tier: RiskTierConfig = RISK_TIER_DEFAULTS.get(
        configured_tier, RISK_TIER_DEFAULTS["moderate"]
    )

    # Circuit breaker runtime state
    self._cb_state: CircuitBreakerState = CircuitBreakerState()

    # Earnings calendar cache
    self._earnings_calendar: dict = {}

    # Poll high-water mark starts 24 h in the past so the first poll
    # picks up recent recommendations
    startup_lookback = timedelta(hours=24)
    self._last_poll_timestamp: datetime = (
        datetime.now(tz=timezone.utc) - startup_lookback
    )

    # Per-ticker time of the last successful price fetch, used by the
    # safety sell (Task 28.4)
    self._last_price_timestamps: dict[str, datetime] = {}
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def start(self) -> None:
    """Load portfolio state and launch the background worker loops.

    With ``self.pool`` set to ``None`` (unit-test / lightweight mode) the
    database load is skipped. A failure while loading is logged and the
    engine carries on with defaults. In either case an empty
    ``PortfolioState`` is installed if none was loaded, ``running`` is set,
    and five named worker tasks are created.
    """
    # --- Initial state from PostgreSQL (graceful degradation) ---
    if self.pool is not None:
        try:
            await self._load_initial_state()
        except Exception:
            logger.exception("Failed to load initial state from DB — starting with defaults")

    # A portfolio state must always exist before the loops run
    if self.portfolio_state is None:
        self.portfolio_state = PortfolioState()

    self.running = True

    # (task name, coroutine factory) for every worker loop, in spawn order
    worker_specs = (
        ("decision_loop", self._decision_loop),
        ("stop_loss_monitor", self._stop_loss_monitor),
        ("performance_loop", self._performance_loop),
        ("risk_tier_scheduler", self._risk_tier_scheduler),
        ("rebalance_scheduler", self._rebalance_scheduler),
    )
    self._tasks = [
        asyncio.create_task(factory(), name=label)
        for label, factory in worker_specs
    ]
    logger.info("Trading engine started with %d worker tasks", len(self._tasks))
async def stop(self) -> None:
    """Shut the engine down by cancelling every worker task.

    Clears the ``running`` flag, cancels all tasks recorded in
    ``self._tasks``, waits for them to finish (exceptions, including the
    cancellation itself, are collected rather than raised) and empties
    the task list.
    """
    self.running = False

    workers = self._tasks
    if workers:
        for worker in workers:
            worker.cancel()
        # return_exceptions=True keeps CancelledError from propagating here
        await asyncio.gather(*workers, return_exceptions=True)
    workers.clear()

    logger.info("Trading engine stopped")
# ------------------------------------------------------------------
# Core evaluation logic (synchronous-compatible for testing)
# ------------------------------------------------------------------
def evaluate_recommendation(
    self,
    rec: dict,
    portfolio_state: PortfolioState,
    risk_tier: RiskTierConfig,
    circuit_breaker_state: CircuitBreakerState,
    correlation_matrix: CorrelationMatrix,
    earnings_calendar: dict,
    now: datetime | None = None,
) -> TradingDecision:
    """Gate a recommendation through every pre-trade check, then size it.

    Checks run in a fixed order and the first one that fails produces a
    ``skip`` decision: circuit breaker, trading window, confidence floor,
    in-memory de-duplication, declining-positions halt, max open
    positions. When all pass, the PositionSizer is invoked; a rejection
    there is also a ``skip``, otherwise the recommendation id is recorded
    as processed and an ``act`` decision is returned.

    Parameters
    ----------
    rec:
        Recommendation dict. ``recommendation_id``, ``ticker``,
        ``confidence``, ``sector`` and ``current_price`` are read.
    portfolio_state:
        Current portfolio snapshot.
    risk_tier:
        Active risk tier configuration.
    circuit_breaker_state:
        Current circuit breaker state.
    correlation_matrix:
        Correlation matrix instance; its raw data is handed to the sizer.
    earnings_calendar:
        Earnings calendar mapping passed through to the sizer.
    now:
        Optional override for the current timestamp (for testing).

    Returns
    -------
    TradingDecision
        Built by ``_skip_decision`` or ``_act_decision``.
    """
    if not now:
        now = datetime.now(tz=timezone.utc)

    rec_id = rec.get("recommendation_id")
    ticker = rec.get("ticker", "")
    confidence = rec.get("confidence", 0.0)
    sector = rec.get("sector", "")
    current_price = rec.get("current_price", 0.0)
    reasoning: list[str] = []

    def reject(skip_reason: str, note: str) -> TradingDecision:
        """Record *note* in the reasoning trail and build the skip decision."""
        reasoning.append(note)
        return self._skip_decision(
            rec_id=rec_id,
            ticker=ticker,
            skip_reason=skip_reason,
            risk_tier=risk_tier,
            portfolio_state=portfolio_state,
            circuit_breaker_state=circuit_breaker_state,
            reasoning=reasoning,
            now=now,
        )

    # a. Circuit breaker
    if self.circuit_breaker.is_active(circuit_breaker_state, now=now):
        return reject("circuit_breaker_active", "Circuit breaker is active")

    # b. Trading window
    if not is_within_trading_window(now):
        return reject("outside_trading_window", "Outside trading window")

    # c. Confidence floor (cheap early gate before the sizer runs)
    tier_floor = risk_tier.min_confidence
    if confidence < tier_floor:
        return reject(
            "insufficient_confidence",
            f"Confidence {confidence:.4f} below tier minimum {tier_floor}",
        )

    # d. In-memory de-duplication
    if rec_id and rec_id in self.processed_recommendation_ids:
        return reject(
            "duplicate_recommendation",
            f"Recommendation {rec_id} already processed",
        )

    # e. Too many positions already losing money
    if self.check_declining_positions(portfolio_state.positions):
        return reject(
            "multiple_declining_positions",
            "Multiple declining positions — halting new entries",
        )

    # f. Open-position ceiling (10 when the config does not define one)
    max_positions = getattr(self.config, "max_open_positions", 10)
    open_now = portfolio_state.open_position_count
    if self.check_max_positions(open_now, max_positions):
        return reject(
            "max_positions_reached",
            f"At max open positions ({open_now}/{max_positions})",
        )

    # g. Position sizing
    reasoning.append("All pre-trade checks passed — computing position size")

    # The sizer takes the raw correlation dict, not the matrix object
    corr_dict: dict[tuple[str, str], float] = correlation_matrix._data

    size_result: PositionSizeResult = self.position_sizer.compute(
        confidence=confidence,
        ticker=ticker,
        sector=sector,
        current_price=current_price,
        active_pool=portfolio_state.active_pool,
        risk_tier=risk_tier,
        portfolio_state=portfolio_state,
        correlation_matrix=corr_dict,
        earnings_calendar=earnings_calendar,
        absolute_position_cap=self.config.absolute_position_cap,
        active_pool_minimum=self.config.active_pool_minimum,
    )
    reasoning.extend(size_result.adjustments)

    if size_result.rejected:
        why = size_result.rejection_reason
        return reject(
            f"position_sizer_rejected: {why}",
            f"Position sizer rejected: {why}",
        )

    # Only an accepted recommendation is remembered as processed
    if rec_id:
        self.processed_recommendation_ids.add(rec_id)

    return self._act_decision(
        rec_id=rec_id,
        ticker=ticker,
        size_result=size_result,
        risk_tier=risk_tier,
        portfolio_state=portfolio_state,
        circuit_breaker_state=circuit_breaker_state,
        reasoning=reasoning,
        now=now,
    )
# ------------------------------------------------------------------
# Helper checks
# ------------------------------------------------------------------
def check_declining_positions(
    self,
    positions: list[OpenPosition],
    threshold_pct: float = 0.50,
    decline_pct: float = 0.02,
) -> bool:
    """Tell whether too large a share of the open positions is losing money.

    A position counts as declining when its unrealized loss, as a
    fraction of its entry value (``entry_price * quantity``), is strictly
    greater than ``decline_pct``. Positions with a non-positive entry
    value never count as declining but still count towards the total.

    Parameters
    ----------
    positions:
        Currently open positions.
    threshold_pct:
        Fraction of all positions that must be exceeded by the declining
        ones for the check to fire (default 0.50 = 50%).
    decline_pct:
        Loss fraction a position must exceed to count as declining
        (default 0.02 = 2%).

    Returns
    -------
    bool
        ``True`` when the declining count is strictly greater than
        ``threshold_pct * len(positions)``; ``False`` for no positions.
    """
    if not positions:
        return False

    def is_declining(position: OpenPosition) -> bool:
        cost_basis = position.entry_price * position.quantity
        if cost_basis <= 0:
            return False
        return -position.unrealized_pnl / cost_basis > decline_pct

    losers = sum(1 for position in positions if is_declining(position))
    return losers > threshold_pct * len(positions)
def check_max_positions(
    self,
    open_count: int,
    max_positions: int = 10,
) -> bool:
    """Tell whether the open-position ceiling has been reached.

    Returns ``True`` when ``open_count`` is equal to or greater than
    ``max_positions`` (default 10), ``False`` otherwise.
    """
    ceiling = max_positions
    return ceiling <= open_count
# ------------------------------------------------------------------
# Integration wiring — thin wrappers for the decision loop
# ------------------------------------------------------------------
def check_stop_loss_crossings(
    self,
    positions: list[OpenPosition],
    prices: dict[str, float],
    stop_levels: dict[str, StopLevels],
) -> list[StopTrigger]:
    """Ask the StopLossManager which positions crossed a stop level.

    Thin wrapper: forwards ``positions``, ``prices`` and ``stop_levels``
    unchanged to ``StopLossManager.check_price_crossings`` and returns
    its result. The stop-loss monitor loop calls this once per check
    interval.
    """
    manager = self.stop_loss_manager
    triggered = manager.check_price_crossings(positions, prices, stop_levels)
    return triggered
def handle_position_close(
    self,
    realized_profit: float,
    reserve_balance: float,
) -> tuple[float, float]:
    """Route the profit of a closed position through the reserve pool.

    Thin wrapper: forwards both arguments unchanged to
    ``ReservePoolController.siphon_profit`` and returns its two-element
    result as-is. Intended to be invoked when a position close is
    detected from a broker fill event.
    """
    controller = self.reserve_pool_controller
    outcome = controller.siphon_profit(realized_profit, reserve_balance)
    return outcome
def evaluate_risk_tier(
    self,
    current_tier: str,
    metrics: PerformanceMetrics,
    reserve_pct: float,
) -> str | None:
    """Ask the RiskTierController whether the risk tier should change.

    Thin wrapper: forwards ``current_tier``, ``metrics`` and
    ``reserve_pct`` unchanged to ``RiskTierController.evaluate`` and
    returns its result. Meant to be run on a schedule at daily market
    close.
    """
    controller = self.risk_tier_controller
    verdict = controller.evaluate(current_tier, metrics, reserve_pct)
    return verdict
def evaluate_rebalancing(
    self,
    positions: list[OpenPosition],
    risk_tier: RiskTierConfig,
    active_pool: float,
) -> list:
    """Ask the PortfolioRebalancer for its rebalancing proposals.

    Thin wrapper: forwards ``positions``, ``risk_tier`` and
    ``active_pool`` unchanged to ``PortfolioRebalancer.evaluate`` and
    returns its result. Meant to be run weekly at Monday market open.
    """
    rebalancer = self.rebalancer
    proposals = rebalancer.evaluate(positions, risk_tier, active_pool)
    return proposals
def create_alert(
    self,
    event_type: str,
    details: str,
) -> NotificationRecord:
    """Build a notification record for a critical event.

    The message text comes from ``NotificationService.format_alert`` and
    the record from ``NotificationService.create_notification`` with the
    channel fixed to ``"email"``. Nothing is delivered here; the caller
    is responsible for actual delivery.
    """
    service = self.notification_service
    alert_text = service.format_alert(event_type, details)
    record = service.create_notification(
        channel="email",
        event_type=event_type,
        message=alert_text,
    )
    return record
def check_micro_trade_constraints(
    self,
    daily_count: int,
    is_within_window: bool,
    circuit_breaker_active: bool,
    portfolio_heat_pct: float,
    max_heat: float,
) -> tuple[bool, str]:
    """Check whether a micro-trade is currently permitted.

    Builds a ``MicroTradeConfig`` from the four ``micro_trading_*``
    settings on the engine config, then forwards it together with every
    argument, unchanged, to ``MicroTradingModule.check_constraints`` and
    returns that call's result. Meant for the decision loop when
    micro-trading is enabled.
    """
    settings = self.config
    micro_config = MicroTradeConfig(
        enabled=settings.micro_trading_enabled,
        allocation_cap_pct=settings.micro_trading_allocation_cap_pct,
        max_daily=settings.micro_trading_max_daily,
        max_hold_minutes=settings.micro_trading_max_hold_minutes,
    )
    verdict = self.micro_trading_module.check_constraints(
        config=micro_config,
        daily_count=daily_count,
        is_within_window=is_within_window,
        circuit_breaker_active=circuit_breaker_active,
        portfolio_heat_pct=portfolio_heat_pct,
        max_heat=max_heat,
    )
    return verdict
# ------------------------------------------------------------------
# Async worker loops
# ------------------------------------------------------------------
async def _load_initial_state(self) -> None:
    """Load reserve pool, circuit-breaker status and portfolio state from the DB.

    Each of the four queries is independent and best-effort: a failure is
    logged (with traceback) and the corresponding default is used instead,
    so a missing table never prevents start-up. The defaults are a reserve
    balance of 0.0, an inactive circuit breaker, 100000.0 of capital and
    no open positions.

    On completion ``self.portfolio_state`` is replaced, ``self._cb_state``
    is replaced when an unresolved circuit-breaker event exists, and the
    correlation matrix computation is started as a background task whose
    handle is kept in ``self._correlation_task``.

    Does nothing when ``self.pool`` is ``None``.
    """
    if self.pool is None:
        return

    # --- Reserve pool balance: latest ledger row ---
    reserve_balance = 0.0
    try:
        row = await self.pool.fetchrow(
            "SELECT balance_after FROM reserve_pool_ledger ORDER BY created_at DESC LIMIT 1"
        )
        if row:
            reserve_balance = float(row["balance_after"])
    except Exception:
        logger.warning("Could not load reserve pool balance — using 0.0", exc_info=True)

    # --- Circuit breaker: most recent unresolved event, if any ---
    try:
        cb_row = await self.pool.fetchrow(
            "SELECT trigger_type, triggered_at, cooldown_expires "
            "FROM circuit_breaker_events WHERE resolved_at IS NULL "
            "ORDER BY created_at DESC LIMIT 1"
        )
        if cb_row:
            self._cb_state = CircuitBreakerState(
                active=True,
                trigger_type=cb_row["trigger_type"],
                triggered_at=cb_row["triggered_at"],
                cooldown_expires=cb_row["cooldown_expires"],
            )
    except Exception:
        logger.warning("Could not load circuit breaker state — using inactive", exc_info=True)

    # --- Capital: taken from the active broker account's config blob ---
    initial_capital = 100000.0
    try:
        acct_row = await self.pool.fetchrow(
            "SELECT config FROM broker_accounts WHERE active = TRUE ORDER BY created_at DESC LIMIT 1"
        )
        if acct_row and acct_row["config"]:
            cfg = acct_row["config"]
            # The driver may hand a JSON/JSONB column back as text rather
            # than a dict; decode it instead of silently discarding the
            # account and trading on the hard-coded default.
            if isinstance(cfg, (str, bytes, bytearray)):
                cfg = json.loads(cfg)
            if not isinstance(cfg, dict):
                cfg = {}
            # Prefer portfolio_value, then cash, then the default
            initial_capital = float(cfg.get("portfolio_value", cfg.get("cash", 100000.0)))
    except Exception:
        logger.warning("Could not load broker account — using default capital", exc_info=True)

    # --- Invested amount: market value of every open position ---
    invested = 0.0
    open_count = 0
    try:
        pos_rows = await self.pool.fetch(
            "SELECT quantity, avg_entry_price, current_price, unrealized_pnl FROM positions WHERE quantity > 0"
        )
        for pr in pos_rows:
            # Fall back to the entry price when no current price is stored
            mark = pr["current_price"] or pr["avg_entry_price"]
            invested += float(pr["quantity"]) * float(mark)
            open_count += 1
    except Exception:
        logger.warning("Could not load positions — assuming no invested capital", exc_info=True)

    # Available pool = account value minus invested market value, floored at 0.
    # NOTE(review): the periodic sync in _performance_loop also subtracts
    # reserve_pool when computing the available pool; here the reserve is
    # not subtracted — confirm which of the two is intended.
    available = max(0.0, initial_capital - invested)

    self.portfolio_state = PortfolioState(
        reserve_pool=reserve_balance,
        active_pool=available,
        total_value=initial_capital,
        open_position_count=open_count,
    )
    logger.info(
        "Portfolio state: total=$%.2f invested=$%.2f available=$%.2f reserve=$%.2f positions=%d",
        initial_capital, invested, available, reserve_balance, open_count,
    )

    # Compute the correlation matrix in the background so start-up is not
    # blocked. The task handle is kept on the instance because the event
    # loop only holds a weak reference to tasks; an unreferenced task can
    # be garbage-collected before it finishes.
    self._correlation_task = asyncio.create_task(
        self._compute_correlation_matrix(), name="correlation_matrix"
    )
async def _decision_loop(self) -> None:
"""Poll recommendations and evaluate them in a continuous loop.
Task 27.3: Main decision loop that polls the recommendations table,
checks Redis deduplication, evaluates each recommendation, and
pushes "act" decisions to the broker queue.
Each cycle sleeps for ``config.polling_interval_seconds``, fetches up to
50 recent buy/sell recommendations, resolves a current price for each
ticker, and then handles every recommendation individually: sells are
turned directly into a market order for the whole held quantity, buys go
through ``evaluate_recommendation``. An error while handling one
recommendation is logged and does not stop the rest of the batch. The
loop ends when ``self.running`` is cleared or the task is cancelled.
"""
while self.running:
try:
# Sleep first, so the first poll happens one interval after start-up
await asyncio.sleep(self.config.polling_interval_seconds)
if not self.running:
break
# No database handle: nothing to poll, idle until the next interval
if self.pool is None:
continue
# Poll recommendations from PostgreSQL
recs: list[dict] = []
try:
# Buy/sell rows generated within the last two hours AND strictly
# after the previous poll's high-water mark, best confidence first
rows = await self.pool.fetch(
"SELECT * FROM recommendations "
"WHERE action IN ('buy','sell') "
"AND mode IN ('paper_eligible','live_eligible') "
"AND generated_at > NOW() - INTERVAL '2 hours' "
"AND generated_at > $1 "
"ORDER BY confidence DESC "
"LIMIT 50",
self._last_poll_timestamp,
)
if rows:
# Advance the high-water mark to the newest generated_at in this
# batch so the next poll only returns later recommendations.
# NOTE(review): rows are ordered by confidence and capped at 50, so
# a row left out by the LIMIT but older than the new mark will not
# be returned by a later poll — confirm this is intended.
last_gen = max(r["generated_at"] for r in rows)
self._last_poll_timestamp = last_gen
recs = [dict(r) for r in rows]
if recs:
logger.info(
"Polled %d recommendations (highest confidence=%.3f)",
len(recs), recs[0].get("confidence", 0),
)
# Fetch current prices for all tickers in this batch
# (de-duplicated through a set; rows without a ticker are left out)
batch_tickers = list({r.get("ticker", "") for r in recs if r.get("ticker")})
price_map: dict[str, float] = {}
if batch_tickers and self.pool is not None:
try:
# Latest 'bar' snapshot per ticker; the price is the 'c' field of
# the snapshot's JSON payload
price_rows = await self.pool.fetch(
"""SELECT DISTINCT ON (ticker) ticker, (data->>'c')::float AS price
FROM market_snapshots
WHERE ticker = ANY($1) AND snapshot_type = 'bar'
ORDER BY ticker, captured_at DESC""",
batch_tickers,
)
# Falsy prices (NULL or 0) are dropped so those tickers count as missing
price_map = {r["ticker"]: r["price"] for r in price_rows if r["price"]}
except Exception:
logger.debug("Could not fetch prices from market_snapshots")
# Fall back to Polygon API for any missing prices
missing = [t for t in batch_tickers if t not in price_map]
if missing:
fetched = await self._fetch_current_prices(missing)
price_map.update(fetched)
except Exception:
# Any failure while polling or pricing abandons this cycle
logger.debug("Could not poll recommendations — table may not exist yet")
continue
for rec in recs:
try:
# Prefer recommendation_id, fall back to id; always handled as a string
rec_id = str(rec.get("recommendation_id", rec.get("id", "")))
ticker = rec.get("ticker", "")
# Inject current price from market data
# (a price already present on the recommendation is kept)
if not rec.get("current_price") and ticker in price_map:
rec["current_price"] = price_map[ticker]
# Skip if no price available
if not rec.get("current_price") or rec["current_price"] <= 0:
continue
# Redis deduplication check
if self.redis is not None:
dedupe_key = trading_dedupe_key(rec_id)
already = await self.redis.get(dedupe_key)
if already:
continue
# Ensure portfolio state exists
if self.portfolio_state is None:
self.portfolio_state = PortfolioState()
action = rec.get("action", "buy")
# --- Sell path: skip position sizing, look up existing position ---
if action == "sell":
# Check trading window for sells too
now_check = datetime.now(tz=timezone.utc)
if not is_within_trading_window(now_check):
continue
pos_row = None
try:
pos_row = await self.pool.fetchrow(
"SELECT quantity, avg_entry_price, current_price "
"FROM positions WHERE ticker = $1 AND quantity > 0",
ticker,
)
except Exception:
logger.debug("Could not look up position for sell: %s", ticker)
# Nothing held (or the lookup failed): there is nothing to sell
if pos_row is None:
continue
# Set dedup key only after confirming we have a position to sell
# (the key expires after 86400 seconds = 24 hours)
if self.redis is not None:
await self.redis.set(trading_dedupe_key(rec_id), "1", ex=86400)
# The whole held quantity is sold; int() truncates fractional shares
sell_qty = int(pos_row["quantity"])
sell_price = rec.get("current_price", 0.0)
# Proceeds are an estimate: the order is a market order
estimated_proceeds = sell_qty * sell_price
order_job = {
"trading_decision_id": str(uuid.uuid4()),
"ticker": ticker,
"action": "sell",
"side": "sell",
"quantity": sell_qty,
"order_type": "market",
"estimated_value": estimated_proceeds,
"source": "trading_engine",
}
if self.redis is not None:
broker_queue = queue_key(QUEUE_BROKER)
await self.redis.rpush(broker_queue, json.dumps(order_job))
logger.info(
"Pushed sell order for %s (%d shares, ~$%.2f) to broker queue",
ticker, sell_qty, estimated_proceeds,
)
# Update portfolio state
# (optimistic: applied when the order is queued, not when it fills)
if self.portfolio_state:
self.portfolio_state.open_position_count = max(
0, self.portfolio_state.open_position_count - 1
)
self.portfolio_state.active_pool += estimated_proceeds
# Persist sell decision for audit trail
# (the pool figures are read after the update above)
sell_decision = TradingDecision(
id=order_job["trading_decision_id"],
recommendation_id=rec_id,
decision="act",
skip_reason=None,
ticker=ticker,
computed_position_size=estimated_proceeds,
computed_share_quantity=sell_qty,
risk_tier_at_decision=self._active_risk_tier.name,
portfolio_heat_at_decision=self.portfolio_state.portfolio_heat if self.portfolio_state else 0,
active_pool_at_decision=self.portfolio_state.active_pool if self.portfolio_state else 0,
reserve_pool_at_decision=self.portfolio_state.reserve_pool if self.portfolio_state else 0,
circuit_breaker_status="active" if self._cb_state.active else "inactive",
decision_trace={"action": "sell", "reasoning": [f"Sell {sell_qty} shares of {ticker}"]},
)
await self._persist_decision(sell_decision)
# Deactivate stop levels for sold position
# (best-effort: a failure here is ignored without logging)
try:
await self.pool.execute(
"UPDATE position_stop_levels SET active = FALSE, updated_at = NOW() WHERE ticker = $1 AND active = TRUE",
ticker,
)
except Exception:
pass
if rec_id:
self.processed_recommendation_ids.add(rec_id)
# Sell handled; move on to the next recommendation
continue
# --- Buy path ---
# Check if we already hold this ticker — don't double up
# (if the lookup itself fails, the error is ignored and evaluation proceeds)
try:
existing_pos = await self.pool.fetchrow(
"SELECT quantity FROM positions WHERE ticker = $1 AND quantity > 0",
ticker,
)
if existing_pos:
# Permanent skip — safe to dedup
if self.redis is not None:
await self.redis.set(trading_dedupe_key(rec_id), "1", ex=86400)
continue
except Exception:
pass
# Evaluate recommendation through position sizer
decision = self.evaluate_recommendation(
rec=rec,
portfolio_state=self.portfolio_state,
risk_tier=self._active_risk_tier,
circuit_breaker_state=self._cb_state,
correlation_matrix=self.correlation_matrix,
earnings_calendar=self._earnings_calendar,
)
logger.info(
"Decision for %s: %s (reason=%s, size=$%.2f, shares=%d)",
decision.ticker,
decision.decision,
decision.skip_reason or "n/a",
decision.computed_position_size or 0,
decision.computed_share_quantity or 0,
)
# For "act" decisions: push order to broker queue
if decision.decision == "act":
# Deduct from active pool immediately to prevent over-allocation
# (cost = share quantity × the price used for this evaluation)
order_cost = (decision.computed_share_quantity or 0) * rec.get("current_price", 0)
if self.portfolio_state and order_cost > 0:
self.portfolio_state.active_pool = max(0.0, self.portfolio_state.active_pool - order_cost)
self.portfolio_state.open_position_count += 1
order_job = {
"trading_decision_id": decision.id,
"ticker": decision.ticker,
"action": rec.get("action", "buy"),
"quantity": decision.computed_share_quantity,
"order_type": "market",
"source": "trading_engine",
}
if self.redis is not None:
broker_queue = queue_key(QUEUE_BROKER)
await self.redis.rpush(broker_queue, json.dumps(order_job))
logger.info(
"Pushed order for %s (%d shares, $%.2f) to broker queue — remaining pool: $%.2f",
decision.ticker,
decision.computed_share_quantity or 0,
order_cost,
self.portfolio_state.active_pool if self.portfolio_state else 0,
)
# Create stop-loss levels for the new position
if self.pool is not None:
try:
price = rec.get("current_price", 0.0)
atr_est = price * 0.02 # 2% ATR estimate
tier = self._active_risk_tier
# Stop sits 2% × the tier's ATR multiplier below the entry price; the
# target sits that same distance × the reward/risk ratio above it
sl_price = price * (1 - 0.02 * tier.stop_loss_atr_multiplier)
tp_price = price * (1 + 0.02 * tier.stop_loss_atr_multiplier * tier.reward_risk_ratio)
await self.pool.execute(
"INSERT INTO position_stop_levels "
"(ticker, entry_price, stop_loss_price, take_profit_price, "
"trailing_stop_active, atr_value, atr_multiplier, "
"reward_risk_ratio, signal_confidence, is_micro_trade, active) "
"VALUES ($1, $2, $3, $4, FALSE, $5, $6, $7, $8, FALSE, TRUE)",
decision.ticker, price, sl_price, tp_price,
atr_est, tier.stop_loss_atr_multiplier,
tier.reward_risk_ratio,
rec.get("confidence", 0.8),
)
except Exception:
logger.debug("Could not create stop levels for %s", decision.ticker)
# Persist decision
await self._persist_decision(decision)
# Set dedup key only for permanent outcomes (act or
# non-retryable skips). Do NOT dedup
# outside_trading_window — those should be retried
# when the market opens.
# NOTE(review): the poll above only returns rows generated after the
# high-water mark, which has already moved past this recommendation,
# so a later poll does not appear to return it again — confirm how the
# retry is expected to happen.
retryable_skips = {"outside_trading_window"}
if decision.skip_reason not in retryable_skips:
if self.redis is not None:
await self.redis.set(
trading_dedupe_key(rec_id), "1", ex=86400,
)
if rec_id:
self.processed_recommendation_ids.add(rec_id)
except Exception:
# One bad recommendation must not abort the rest of the batch
logger.exception("Error evaluating recommendation %s", rec.get("recommendation_id", "?"))
except asyncio.CancelledError:
# Task cancelled: leave the loop
break
except Exception:
logger.exception("Unexpected error in decision loop")
# Brief back-off before the next cycle after an unexpected failure
if self.running:
await asyncio.sleep(5)
async def _stop_loss_monitor(self) -> None:
    """Monitor open positions for stop-loss and take-profit crossings.

    Task 28.1: every ``config.stop_loss_check_interval_seconds`` seconds,
    while the market is open and a database pool is available, load the
    open positions and their stop levels, fetch current prices and submit
    a sell order for each triggered level.

    Two special cases:

    * Safety sell (Task 28.4) — a position whose price is missing from
      the fetch, and whose last successful fetch is more than 15 minutes
      old, is sold. A ticker that has never returned a price is not sold.
    * Take-profit suppression — a ``take_profit`` trigger is skipped when
      a recent high-confidence buy recommendation exists for the ticker.
      Other trigger types are never suppressed.

    The loop ends when ``self.running`` is cleared or the task is
    cancelled; any other error is logged and followed by a 5 s back-off.
    """
    stale_price_limit = timedelta(minutes=15)

    while self.running:
        try:
            await asyncio.sleep(self.config.stop_loss_check_interval_seconds)
            if not self.running:
                break

            now = datetime.now(tz=timezone.utc)

            # Nothing to do outside market hours or without a database
            if not is_market_open(now):
                continue
            if self.pool is None:
                continue

            # Load positions first; stop levels are only needed when
            # there is at least one position to check
            positions = await self._load_open_positions()
            if not positions:
                continue
            stop_levels = await self._load_stop_levels()

            # Fetch current prices
            tickers = [p.ticker for p in positions]
            prices = await self._fetch_current_prices(tickers)

            # Remember when each ticker last returned a price
            for ticker in tickers:
                if ticker in prices:
                    self._last_price_timestamps[ticker] = now

            # Safety sell for missing price data (Task 28.4)
            for pos in positions:
                if pos.ticker in prices:
                    continue
                last_ts = self._last_price_timestamps.get(pos.ticker)
                if last_ts and (now - last_ts) > stale_price_limit:
                    logger.warning(
                        "No price data for %s for >15 min — submitting safety sell",
                        pos.ticker,
                    )
                    await self._submit_sell_order(
                        pos.ticker, pos.quantity, "safety_sell_missing_price"
                    )

            # Check crossings
            triggers = self.check_stop_loss_crossings(positions, prices, stop_levels)
            for trigger in triggers:
                # The matching position supplies the quantity to sell
                pos_match = next(
                    (p for p in positions if p.ticker == trigger.ticker), None
                )
                if pos_match is None:
                    continue

                # Only take-profits may be held back by an active buy signal
                if (
                    trigger.trigger_type == "take_profit"
                    and await self._strong_buy_signal_active(trigger.ticker)
                ):
                    continue

                await self._submit_sell_order(
                    trigger.ticker,
                    pos_match.quantity,
                    f"{trigger.trigger_type}_triggered",
                )
                logger.info(
                    "Stop-loss monitor: %s triggered for %s at %.2f (trigger: %.2f)",
                    trigger.trigger_type,
                    trigger.ticker,
                    trigger.current_price,
                    trigger.trigger_price,
                )

        except asyncio.CancelledError:
            break
        except Exception:
            logger.exception("Unexpected error in stop-loss monitor")
            if self.running:
                await asyncio.sleep(5)

async def _strong_buy_signal_active(
    self,
    ticker: str,
    min_confidence: float = 0.80,
) -> bool:
    """Return True when a recent high-confidence buy signal exists for *ticker*.

    Looks up the highest-confidence eligible ``buy`` recommendation
    generated in the last two hours and compares it with
    ``min_confidence``. Returns ``False`` when there is no pool, no such
    recommendation, or the lookup fails. A failed lookup is logged so it
    is visible, and the caller then proceeds with the take-profit.
    """
    if self.pool is None:
        return False
    try:
        active_buy = await self.pool.fetchrow(
            "SELECT confidence FROM recommendations "
            "WHERE ticker = $1 AND action = 'buy' "
            "AND mode IN ('paper_eligible', 'live_eligible') "
            "AND generated_at > NOW() - INTERVAL '2 hours' "
            "ORDER BY confidence DESC LIMIT 1",
            ticker,
        )
        if not active_buy:
            return False
        signal_confidence = float(active_buy["confidence"])
    except Exception:
        logger.warning(
            "Could not check active buy signal for %s — proceeding with take-profit",
            ticker,
            exc_info=True,
        )
        return False

    if signal_confidence < min_confidence:
        return False
    logger.info(
        "Suppressing take-profit for %s — active buy signal (confidence=%.3f)",
        ticker,
        signal_confidence,
    )
    return True
async def _performance_loop(self) -> None:
    """Periodically refresh portfolio metrics and persist snapshots.

    Task 29.1: wakes every 5 minutes. While the market is open it syncs
    ``self.portfolio_state`` (active pool, open-position count) with the
    ``positions`` table, computes metrics, refreshes the correlation
    matrix, runs the profit-taking and daily-loss circuit-breaker checks,
    and upserts a snapshot for the performance tab.

    Task 29.2: while the market is closed it persists one end-of-day
    snapshot per weekday once the Eastern-time clock is at or past 16:00.

    Every step is best-effort: a failing step is logged and the remaining
    steps still run. The loop ends when ``self.running`` becomes false or
    the task is cancelled.
    """
    last_snapshot_date: str | None = None
    while self.running:
        try:
            await asyncio.sleep(300)  # 5 minutes
            if not self.running:
                break
            now = datetime.now(tz=timezone.utc)

            if not is_market_open(now):
                # Market closed: the only work left is the end-of-day
                # snapshot (Task 29.2).
                from services.trading.trading_window import ET

                et_now = now.astimezone(ET)
                today_str = et_now.strftime("%Y-%m-%d")
                # Weekends are not trading days, so they must not produce
                # an "end of trading day" snapshot.
                is_weekday = et_now.weekday() < 5  # Saturday=5, Sunday=6
                if is_weekday and et_now.hour >= 16 and last_snapshot_date != today_str:
                    await self._persist_daily_snapshot(now)
                    last_snapshot_date = today_str
                continue

            if self.portfolio_state is None:
                continue

            # Sync active_pool / open_position_count with the positions table.
            try:
                if self.pool is not None:
                    pos_rows = await self.pool.fetch(
                        "SELECT quantity, avg_entry_price, current_price FROM positions WHERE quantity > 0"
                    )
                    # Value each position at its current price, falling
                    # back to the entry price when no current price is stored.
                    invested = sum(
                        float(r["quantity"]) * float(r["current_price"] or r["avg_entry_price"])
                        for r in pos_rows
                    )
                    available = max(
                        0.0,
                        self.portfolio_state.total_value
                        - self.portfolio_state.reserve_pool
                        - invested,
                    )
                    self.portfolio_state.active_pool = available
                    self.portfolio_state.open_position_count = len(pos_rows)
            except Exception:
                logger.debug("Could not sync positions for portfolio state", exc_info=True)

            # Update portfolio heat and metrics from current positions.
            try:
                metrics = self.performance_computer.compute_metrics(
                    closed_trades=[],
                    portfolio_value=self.portfolio_state.total_value,
                    active_pool=self.portfolio_state.active_pool,
                    reserve_pool=self.portfolio_state.reserve_pool,
                    daily_pnl=0.0,
                    unrealized_pnl=sum(
                        p.unrealized_pnl for p in self.portfolio_state.positions
                    ),
                    portfolio_heat=self.portfolio_state.portfolio_heat,
                    daily_returns=[],
                )
                logger.debug(
                    "Performance update: value=%.2f heat=%.4f",
                    metrics.total_portfolio_value,
                    metrics.portfolio_heat,
                )
            except Exception:
                logger.debug("Could not compute performance metrics", exc_info=True)

            # Refresh correlation matrix every 5 minutes.
            try:
                await self._compute_correlation_matrix()
            except Exception:
                logger.debug("Could not refresh correlation matrix", exc_info=True)

            # Profit-taking: sell positions that exceeded the take-profit threshold.
            try:
                await self._check_profit_taking()
            except Exception:
                logger.debug("Could not run profit-taking check", exc_info=True)

            # Circuit breaker: check daily loss threshold.
            try:
                await self._check_circuit_breaker_daily_loss()
            except Exception:
                logger.debug("Could not run circuit breaker daily loss check", exc_info=True)

            # Persist a snapshot every cycle (during market hours) for the performance tab.
            try:
                await self._persist_daily_snapshot(now)
            except Exception:
                logger.debug("Could not persist snapshot", exc_info=True)
        except asyncio.CancelledError:
            break
        except Exception:
            logger.exception("Unexpected error in performance loop")
            if self.running:
                # Brief back-off so a persistent failure cannot spin the loop.
                await asyncio.sleep(5)
async def _risk_tier_scheduler(self) -> None:
    """Evaluate the risk tier once per weekday at 16:00 ET.

    Task 30.1: sleeps until the next weekday 16:00 Eastern, loads the most
    recent metrics from ``portfolio_snapshots`` (falling back to freshly
    computed metrics), derives ``reserve_pct`` and calls
    ``evaluate_risk_tier()``. When the tier changes, the change is written
    to ``risk_tier_history``, applied to ``self.config`` and
    ``self._active_risk_tier``, and announced through ``create_alert``.

    The loop ends when ``self.running`` becomes false or the task is
    cancelled; any other error is logged and retried after 60 seconds.
    """
    from services.trading.trading_window import ET

    while self.running:
        try:
            # Work out the next weekday 16:00 ET.
            now_utc = datetime.now(tz=timezone.utc)
            et_now = now_utc.astimezone(ET)
            target_today = et_now.replace(hour=16, minute=0, second=0, microsecond=0)
            if et_now >= target_today:
                # Already past 16:00 ET today, so target tomorrow.
                target = target_today + timedelta(days=1)
            else:
                target = target_today
            # Skip weekends.
            while target.weekday() > 4:  # Saturday=5, Sunday=6
                target += timedelta(days=1)

            # Subtract against the UTC instant: subtracting two datetimes
            # that share a tzinfo is plain wall-clock arithmetic and would
            # be off by an hour when a DST change falls inside the interval.
            sleep_seconds = (target - now_utc).total_seconds()
            if sleep_seconds > 0:
                await asyncio.sleep(sleep_seconds)
            if not self.running:
                break
            if self.portfolio_state is None:
                continue

            # Load latest PerformanceMetrics from portfolio_snapshots.
            metrics: PerformanceMetrics | None = None
            if self.pool is not None:
                try:
                    row = await self.pool.fetchrow(
                        "SELECT metrics FROM portfolio_snapshots "
                        "ORDER BY snapshot_date DESC LIMIT 1"
                    )
                    if row and row["metrics"]:
                        # The column may arrive as a JSON string or as an
                        # already-decoded mapping.
                        m = json.loads(row["metrics"]) if isinstance(row["metrics"], str) else row["metrics"]
                        if m:
                            metrics = PerformanceMetrics(
                                total_portfolio_value=m.get("total_portfolio_value", self.portfolio_state.total_value),
                                active_pool=m.get("active_pool", self.portfolio_state.active_pool),
                                reserve_pool=m.get("reserve_pool", self.portfolio_state.reserve_pool),
                                unrealized_pnl=m.get("unrealized_pnl", 0.0),
                                realized_pnl=m.get("realized_pnl", 0.0),
                                daily_pnl=m.get("daily_pnl", 0.0),
                                win_count=m.get("win_count", 0),
                                loss_count=m.get("loss_count", 0),
                                win_rate=m.get("win_rate", 0.0),
                                avg_win=m.get("avg_win", 0.0),
                                avg_loss=m.get("avg_loss", 0.0),
                                profit_factor=m.get("profit_factor", 0.0),
                                sharpe_ratio=m.get("sharpe_ratio", 0.0),
                                max_drawdown=m.get("max_drawdown", 0.0),
                                current_drawdown_pct=m.get("current_drawdown_pct", 0.0),
                                portfolio_heat=m.get("portfolio_heat", 0.0),
                            )
                except Exception:
                    logger.debug("Could not load metrics from portfolio_snapshots", exc_info=True)

            # Fall back to computing fresh metrics.
            if metrics is None:
                metrics = self.performance_computer.compute_metrics(
                    closed_trades=[],
                    portfolio_value=self.portfolio_state.total_value,
                    active_pool=self.portfolio_state.active_pool,
                    reserve_pool=self.portfolio_state.reserve_pool,
                    daily_pnl=0.0,
                    unrealized_pnl=sum(
                        p.unrealized_pnl for p in self.portfolio_state.positions
                    ),
                    portfolio_heat=self.portfolio_state.portfolio_heat,
                    daily_returns=[],
                )

            # Share of the portfolio held in reserve (0.0 for an empty portfolio).
            total_value = self.portfolio_state.total_value
            reserve_pct = (
                self.portfolio_state.reserve_pool / total_value
                if total_value > 0
                else 0.0
            )

            current_tier = self.config.risk_tier
            new_tier = self.evaluate_risk_tier(current_tier, metrics, reserve_pct)
            if new_tier is not None and new_tier != current_tier:
                # Persist to risk_tier_history (best-effort).
                if self.pool is not None:
                    try:
                        await self.pool.execute(
                            "INSERT INTO risk_tier_history "
                            "(previous_tier, new_tier, trigger_source, trigger_metrics, created_at) "
                            "VALUES ($1, $2, $3, $4, $5)",
                            current_tier,
                            new_tier,
                            "auto_adjustment",
                            json.dumps({
                                "win_rate": metrics.win_rate,
                                "current_drawdown_pct": metrics.current_drawdown_pct,
                                "reserve_pct": reserve_pct,
                                "sharpe_ratio": metrics.sharpe_ratio,
                            }),
                            datetime.now(tz=timezone.utc),
                        )
                    except Exception:
                        logger.debug("Could not persist risk tier change", exc_info=True)

                # Apply the new tier; unknown tier names fall back to "moderate".
                self.config.risk_tier = new_tier
                self._active_risk_tier = RISK_TIER_DEFAULTS.get(
                    new_tier, RISK_TIER_DEFAULTS["moderate"]
                )

                self.create_alert(
                    "risk_tier_changed",
                    f"Risk tier changed from {current_tier} to {new_tier} "
                    f"(win_rate={metrics.win_rate:.2%}, "
                    f"drawdown={metrics.current_drawdown_pct:.2%}, "
                    f"reserve={reserve_pct:.2%})",
                )
                logger.info(
                    "Risk tier changed: %s → %s (win_rate=%.2f, drawdown=%.2f, reserve=%.2f)",
                    current_tier,
                    new_tier,
                    metrics.win_rate,
                    metrics.current_drawdown_pct,
                    reserve_pct,
                )
        except asyncio.CancelledError:
            break
        except Exception:
            logger.exception("Unexpected error in risk tier scheduler")
            if self.running:
                await asyncio.sleep(60)
async def _rebalance_scheduler(self) -> None:
    """Evaluate portfolio rebalancing weekly, on Monday at the window open.

    Task 30.2: sleeps until the next Monday at ``WINDOW_OPEN`` Eastern time
    (documented as 09:45 ET), then loads the open positions, asks
    ``self.rebalancer.evaluate()`` for orders and pushes each one to the
    broker queue in Redis. The cycle is skipped while the circuit breaker
    is active, when no portfolio state exists, or when there are no
    positions.

    The loop ends when ``self.running`` becomes false or the task is
    cancelled; any other error is logged and retried after 60 seconds.
    """
    from services.trading.trading_window import ET, WINDOW_OPEN

    while self.running:
        try:
            now_utc = datetime.now(tz=timezone.utc)
            et_now = now_utc.astimezone(ET)
            today_open = et_now.replace(
                hour=WINDOW_OPEN.hour,
                minute=WINDOW_OPEN.minute,
                second=0,
                microsecond=0,
            )

            # Days until the next Monday (0 when today is Monday).
            days_until_monday = (7 - et_now.weekday()) % 7
            if days_until_monday == 0 and et_now >= today_open:
                # Already past the window open on Monday, so target next Monday.
                days_until_monday = 7
            target = today_open + timedelta(days=days_until_monday)

            # Subtract against the UTC instant: subtracting two datetimes
            # that share a tzinfo is plain wall-clock arithmetic and would
            # be off by an hour when a DST change falls inside the interval.
            sleep_seconds = (target - now_utc).total_seconds()
            if sleep_seconds > 0:
                await asyncio.sleep(sleep_seconds)
            if not self.running:
                break

            # Respect circuit breaker status.
            if self.circuit_breaker.is_active(self._cb_state, now=datetime.now(tz=timezone.utc)):
                logger.info("Rebalance skipped — circuit breaker is active")
                continue
            if self.portfolio_state is None:
                continue

            # Prefer positions from the database; keep the in-memory list
            # when the load itself raises.
            positions = self.portfolio_state.positions
            if self.pool is not None:
                try:
                    positions = await self._load_open_positions()
                except Exception:
                    logger.debug("Could not load positions for rebalancing", exc_info=True)
            if not positions:
                continue

            max_positions = getattr(self.config, "max_open_positions", 10)
            rebalance_orders = self.rebalancer.evaluate(
                positions,
                self._active_risk_tier,
                self.portfolio_state.active_pool,
                max_positions,
            )

            # Push rebalance orders to the broker queue.
            for order in rebalance_orders:
                order_job = {
                    "ticker": order.ticker,
                    "action": order.action,
                    "quantity": order.quantity,
                    "order_type": "market",
                    "source": "trading_engine",
                    "reason": order.reason,
                    "tag": order.tag,
                }
                if self.redis is None:
                    logger.info(
                        "Rebalance (no redis): %s %s %d shares — %s",
                        order.action,
                        order.ticker,
                        order.quantity,
                        order.reason,
                    )
                    continue
                try:
                    broker_queue = queue_key(QUEUE_BROKER)
                    await self.redis.rpush(broker_queue, json.dumps(order_job))
                    logger.info(
                        "Rebalance: pushed %s order for %s (%d shares)",
                        order.action,
                        order.ticker,
                        order.quantity,
                    )
                except Exception:
                    # One failed push must not block the remaining orders.
                    logger.exception("Failed to push rebalance order for %s", order.ticker)

            if rebalance_orders:
                logger.info("Rebalance cycle completed: %d orders generated", len(rebalance_orders))
            else:
                logger.debug("Rebalance cycle completed: no orders needed")
        except asyncio.CancelledError:
            break
        except Exception:
            logger.exception("Unexpected error in rebalance scheduler")
            if self.running:
                await asyncio.sleep(60)
# ------------------------------------------------------------------
# Correlation matrix computation
# ------------------------------------------------------------------
async def _compute_correlation_matrix(self) -> None:
    """Rebuild ``self.correlation_matrix`` from recent bar snapshots.

    Reads the last 30 days of ``bar`` rows from ``market_snapshots`` for
    active companies, keeps one close per ticker per date, turns the closes
    into day-over-day returns on a shared date axis, and stores the Pearson
    correlation of every ticker pair that has at least 5 overlapping
    returns. numpy is used when ``_HAS_NUMPY`` is set; otherwise the
    coefficient is computed by hand.

    Does nothing when there is no pool, the query fails or times out
    (30 s), no rows come back, or fewer than two tickers have returns.
    """
    if self.pool is None:
        return

    query = (
        "SELECT ms.ticker, ms.captured_at::date AS dt, (ms.data->>'c')::float AS close "
        "FROM market_snapshots ms "
        "JOIN companies c ON c.ticker = ms.ticker AND c.active = TRUE "
        "WHERE ms.snapshot_type = 'bar' AND ms.captured_at > NOW() - INTERVAL '30 days' "
        "ORDER BY ms.ticker, ms.captured_at"
    )
    try:
        rows = await asyncio.wait_for(self.pool.fetch(query), timeout=30.0)
    except asyncio.TimeoutError:
        logger.warning("Correlation matrix query timed out — skipping")
        return
    except Exception:
        logger.debug("Could not query market_snapshots for correlation matrix")
        return
    if not rows:
        return

    # ticker -> {date: close}; rows are ordered by captured_at, so a later
    # bar on the same date overwrites an earlier one.
    closes_by_ticker: dict[str, dict] = {}
    for record in rows:
        close_value = record["close"]
        if close_value is None:
            continue
        closes_by_ticker.setdefault(record["ticker"], {})[record["dt"]] = close_value

    # Keep only tickers that yield at least one return, i.e. that have a
    # positive close on some date other than their last one.
    seen_dates: set = set()
    tickers: list[str] = []
    for symbol, closes in closes_by_ticker.items():
        own_dates = sorted(closes)
        seen_dates.update(own_dates)
        if any(closes[day] > 0 for day in own_dates[:-1]):
            tickers.append(symbol)
    if len(tickers) < 2:
        return

    # Returns on the shared date axis: None marks a step where the ticker
    # lacks a close on either side; a non-positive base yields 0.0.
    timeline = sorted(seen_dates)
    aligned_returns: dict[str, list] = {}
    for symbol in tickers:
        closes = closes_by_ticker[symbol]
        series = []
        for prev_day, day in zip(timeline, timeline[1:]):
            if prev_day in closes and day in closes:
                base = closes[prev_day]
                series.append((closes[day] - base) / base if base > 0 else 0.0)
            else:
                series.append(None)
        aligned_returns[symbol] = series

    if _HAS_NUMPY:
        import numpy as _np

    corr_data: dict[tuple[str, str], float] = {}
    for idx, left in enumerate(tickers):
        for right in tickers[idx + 1:]:
            # Use only the steps where both tickers have a return.
            overlap = [
                (x, y)
                for x, y in zip(aligned_returns[left], aligned_returns[right])
                if x is not None and y is not None
            ]
            if len(overlap) < 5:
                continue
            xs = [pair[0] for pair in overlap]
            ys = [pair[1] for pair in overlap]

            if _HAS_NUMPY:
                coeff = float(_np.corrcoef(_np.array(xs), _np.array(ys))[0, 1])
                # corrcoef yields NaN for a constant series; skip those pairs.
                if not _np.isnan(coeff):
                    corr_data[(left, right)] = coeff
            else:
                # Manual Pearson correlation fallback.
                count = len(xs)
                mean_x = sum(xs) / count
                mean_y = sum(ys) / count
                cov = sum((x - mean_x) * (y - mean_y) for x, y in overlap)
                spread_x = sum((x - mean_x) ** 2 for x in xs) ** 0.5
                spread_y = sum((y - mean_y) ** 2 for y in ys) ** 0.5
                if spread_x > 0 and spread_y > 0:
                    corr_data[(left, right)] = cov / (spread_x * spread_y)

    self.correlation_matrix.load(corr_data)
    logger.info("Correlation matrix loaded: %d pairs", len(corr_data))
# ------------------------------------------------------------------
# Async helpers
# ------------------------------------------------------------------
async def _persist_decision(self, decision: TradingDecision) -> None:
    """Log a trading decision and store it in ``trading_decisions``.

    Task 27.5: the decision is always logged. When ``self.pool`` is None
    the INSERT is skipped; when the INSERT (or the JSON encoding of its
    arguments) fails, the failure is logged at debug level and swallowed.
    """
    short_id = decision.id[:8]
    logger.info(
        "Decision: %s %s ticker=%s reason=%s",
        decision.decision,
        short_id,
        decision.ticker,
        decision.skip_reason or "",
    )
    if self.pool is None:
        return

    insert_sql = (
        "INSERT INTO trading_decisions "
        "(id, recommendation_id, decision, skip_reason, ticker, "
        "computed_position_size, computed_share_quantity, "
        "risk_tier_at_decision, portfolio_heat_at_decision, "
        "active_pool_at_decision, reserve_pool_at_decision, "
        "circuit_breaker_status, correlation_check_result, "
        "sector_exposure_check_result, earnings_proximity_flag, "
        "is_micro_trade, decision_trace, created_at) "
        "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, "
        "$11, $12, $13, $14, $15, $16, $17, $18)"
    )
    try:
        # Built inside the try so a non-serialisable trace is handled the
        # same way as a database error.
        values = (
            decision.id,
            decision.recommendation_id,
            decision.decision,
            decision.skip_reason,
            decision.ticker,
            decision.computed_position_size,
            decision.computed_share_quantity,
            decision.risk_tier_at_decision,
            decision.portfolio_heat_at_decision,
            decision.active_pool_at_decision,
            decision.reserve_pool_at_decision,
            decision.circuit_breaker_status,
            json.dumps(decision.correlation_check_result),
            json.dumps(decision.sector_exposure_check_result),
            decision.earnings_proximity_flag,
            decision.is_micro_trade,
            json.dumps(decision.decision_trace),
            decision.created_at,
        )
        await self.pool.execute(insert_sql, *values)
    except Exception:
        logger.debug("Could not persist decision %s — table may not exist", decision.id[:8])
async def _sync_positions_and_siphon(self) -> None:
    """Refresh positions from the DB and siphon profit from closed ones.

    Task 27.4: reloads the open positions, treats every ticker that was in
    ``self.portfolio_state.positions`` but is no longer returned as closed,
    and for each closed position with positive ``unrealized_pnl`` asks the
    reserve-pool controller how much to move into the reserve. Transfers
    are recorded in ``reserve_pool_ledger`` on a best-effort basis.
    Finally the portfolio state is replaced with the freshly loaded list.

    No-op when there is no pool or no portfolio state; any error is logged
    and swallowed.
    """
    if self.pool is None or self.portfolio_state is None:
        return
    try:
        positions = await self._load_open_positions()
        old_tickers = {held.ticker for held in self.portfolio_state.positions}
        new_tickers = {held.ticker for held in positions}

        # Tickers that disappeared since the previous sync count as closed.
        # NOTE(review): an empty list from a failed load would make every
        # held ticker look closed here — confirm _load_open_positions'
        # error behaviour is acceptable for this caller.
        for ticker in old_tickers - new_tickers:
            closed_pos = None
            for held in self.portfolio_state.positions:
                if held.ticker == ticker:
                    closed_pos = held
                    break
            if not (closed_pos and closed_pos.unrealized_pnl > 0):
                continue

            transfer, new_balance = self.reserve_pool_controller.siphon_profit(
                closed_pos.unrealized_pnl,
                self.portfolio_state.reserve_pool,
            )
            if not transfer > 0:
                continue

            self.portfolio_state.reserve_pool = new_balance
            # Persist to reserve_pool_ledger.
            try:
                await self.pool.execute(
                    "INSERT INTO reserve_pool_ledger "
                    "(amount, balance_after, trigger_type, reference_id, notes, created_at) "
                    "VALUES ($1, $2, 'profit_siphon', $3, $4, $5)",
                    transfer,
                    new_balance,
                    ticker,
                    f"Siphoned from {ticker} close",
                    datetime.now(tz=timezone.utc),
                )
            except Exception:
                logger.debug("Could not persist siphon event for %s", ticker)
            logger.info(
                "Siphoned $%.2f from %s close → reserve now $%.2f",
                transfer,
                ticker,
                new_balance,
            )

        # Update portfolio state.
        self.portfolio_state.positions = positions
        self.portfolio_state.open_position_count = len(positions)
    except Exception:
        logger.exception("Error syncing positions")
async def _fetch_current_prices(self, tickers: list[str]) -> dict[str, float]:
    """Fetch the latest trade price for each ticker from the Polygon API.

    Task 28.2: issues one snapshot request for all tickers via httpx.

    Args:
        tickers: Symbols to look up. An empty list short-circuits to ``{}``.

    Returns:
        Mapping of ticker → last trade price. Tickers with a missing,
        malformed or non-positive price are omitted. The mapping is empty
        when no API key is configured, the request fails, or the API
        answers with a non-200 status.
    """
    if not tickers:
        return {}
    prices: dict[str, float] = {}

    # Use the market data config for the API key and base URL.
    api_key = ""
    base_url = "https://api.polygon.io"
    try:
        from services.shared.config import load_config

        app_config = load_config()
        api_key = app_config.market_data.api_key
        # Keep the default when the configured base URL is empty.
        base_url = app_config.market_data.base_url or base_url
    except Exception:
        logger.debug("Could not load market data config", exc_info=True)
    if not api_key:
        logger.debug("No Polygon API key configured — skipping price fetch")
        return prices

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            url = f"{base_url}/v2/snapshot/locale/us/markets/stocks/tickers"
            params = {"tickers": ",".join(tickers), "apiKey": api_key}
            resp = await client.get(url, params=params)
            if resp.status_code != 200:
                logger.warning("Polygon API returned status %d", resp.status_code)
                return prices
            payload = resp.json()
    except Exception as exc:
        # Log only the exception type: the request URL carries the API key
        # and must not leak into logs through an exception message.
        logger.warning("Failed to fetch prices from Polygon API: %s", type(exc).__name__)
        return prices

    items = payload.get("tickers") if isinstance(payload, dict) else None
    for item in items or []:
        # Validate each entry on its own so one malformed item cannot drop
        # the prices of every ticker after it.
        if not isinstance(item, dict):
            continue
        symbol = item.get("ticker") or ""
        last_trade = item.get("lastTrade")
        if not symbol or not isinstance(last_trade, dict):
            continue
        try:
            price = float(last_trade.get("p", 0.0))
        except (TypeError, ValueError):
            continue
        if price > 0:
            prices[symbol] = price
    return prices
async def _load_open_positions(self) -> list[OpenPosition]:
    """Build ``OpenPosition`` objects from active ``position_stop_levels`` rows.

    Task 28.3: each active row becomes one position whose current price and
    market value are both set to the entry price and whose unrealized P&L
    is 0.0.

    Returns an empty list when there is no pool. If the query or a row
    conversion fails, the positions built so far are returned and the
    failure is logged at debug level.

    NOTE(review): quantity is a hard-coded placeholder of 1 and sector is
    left empty — callers that sell ``pos.quantity`` shares should confirm
    this is intended.
    """
    loaded: list[OpenPosition] = []
    if self.pool is None:
        return loaded
    try:
        rows = await self.pool.fetch(
            "SELECT ticker, entry_price, stop_loss_price, take_profit_price, "
            "signal_confidence, is_micro_trade "
            "FROM position_stop_levels WHERE active = TRUE"
        )
        for record in rows:
            entry = float(record["entry_price"])
            position = OpenPosition(
                ticker=record["ticker"],
                quantity=1,  # Default; real quantity from orders table
                entry_price=entry,
                current_price=entry,
                unrealized_pnl=0.0,
                market_value=entry,
                sector="",
                stop_loss_price=float(record["stop_loss_price"]),
                take_profit_price=float(record["take_profit_price"]),
                signal_confidence=float(record["signal_confidence"]),
                is_micro_trade=bool(record["is_micro_trade"]),
            )
            loaded.append(position)
    except Exception:
        logger.debug("Could not load open positions — table may not exist")
    return loaded
async def _load_stop_levels(self) -> dict[str, StopLevels]:
    """Return the active stop-loss/take-profit levels keyed by ticker.

    Task 28.3: reads ``position_stop_levels WHERE active = TRUE``. When a
    ticker appears in several rows the last row wins. Returns an empty
    dict when there is no pool or when the query or any row conversion
    fails.
    """
    if self.pool is None:
        return {}
    try:
        rows = await self.pool.fetch(
            "SELECT ticker, stop_loss_price, take_profit_price, "
            "trailing_stop_active, atr_value, atr_multiplier, "
            "reward_risk_ratio, updated_at "
            "FROM position_stop_levels WHERE active = TRUE"
        )
        return {
            record["ticker"]: StopLevels(
                stop_loss_price=float(record["stop_loss_price"]),
                take_profit_price=float(record["take_profit_price"]),
                trailing_stop_active=bool(record["trailing_stop_active"]),
                atr_value=float(record["atr_value"]),
                atr_multiplier=float(record["atr_multiplier"]),
                reward_risk_ratio=float(record["reward_risk_ratio"]),
            )
            for record in rows
        }
    except Exception:
        logger.debug("Could not load stop levels — table may not exist")
        return {}
async def _check_circuit_breaker_daily_loss(self) -> None:
    """Activate the circuit breaker when unrealized losses are too large.

    Sums ``unrealized_pnl`` over all open rows in ``positions`` and divides
    the loss by ``self.portfolio_state.total_value``. When that ratio
    reaches ``config.circuit_breaker_daily_loss_pct`` (default 5%), the
    breaker state is set to active with a cooldown of
    ``config.circuit_breaker_volatility_pause_hours`` (default 2), the
    event is written to ``circuit_breaker_events`` (best-effort), and an
    alert is created.

    No-op when there is no pool or portfolio state, when the breaker is
    already active, or when the portfolio value is not positive. Errors
    are logged and swallowed.
    """
    if self.pool is None or self.portfolio_state is None:
        return
    # Already active — nothing to do.
    if self._cb_state.active:
        return
    try:
        row = await self.pool.fetchrow(
            "SELECT COALESCE(SUM(unrealized_pnl), 0) AS total_pnl FROM positions WHERE quantity > 0"
        )
        total_pnl = float(row["total_pnl"]) if row else 0.0
        total_value = self.portfolio_state.total_value
        if total_value <= 0:
            return

        # Only a net loss counts; a net gain gives a loss ratio of 0.
        daily_loss_pct = abs(min(0, total_pnl)) / total_value

        # Config values may arrive as strings; normalise to float.
        threshold = float(getattr(self.config, "circuit_breaker_daily_loss_pct", 0.05) or 0.05)
        if not daily_loss_pct >= threshold:
            return

        now = datetime.now(tz=timezone.utc)
        # float() rather than int() so a fractional setting such as "1.5"
        # is honoured instead of raising and leaving the breaker off.
        cooldown_hours = float(
            getattr(self.config, "circuit_breaker_volatility_pause_hours", 2) or 2
        )
        cooldown_expires = now + timedelta(hours=cooldown_hours)
        self._cb_state = CircuitBreakerState(
            active=True,
            trigger_type="daily_loss_limit",
            triggered_at=now,
            cooldown_expires=cooldown_expires,
        )

        # Persist to DB (best-effort; the in-memory state is already set).
        try:
            await self.pool.execute(
                "INSERT INTO circuit_breaker_events "
                "(trigger_type, triggered_at, cooldown_expires, trigger_data) "
                "VALUES ($1, $2, $3, $4)",
                "daily_loss_limit",
                now,
                cooldown_expires,
                json.dumps({
                    "daily_loss_pct": round(daily_loss_pct, 4),
                    "threshold": threshold,
                    "total_pnl": round(total_pnl, 2),
                    "total_value": round(total_value, 2),
                }),
            )
        except Exception:
            logger.debug("Could not persist circuit breaker event", exc_info=True)

        logger.warning(
            "CIRCUIT BREAKER ACTIVATED: daily loss %.1f%% exceeds %.1f%% threshold "
            "(P&L=$%.2f, value=$%.2f). Trading halted until %s",
            daily_loss_pct * 100, threshold * 100,
            total_pnl, total_value, cooldown_expires.isoformat(),
        )

        # The alert labels the time "ET", so convert from UTC first.
        from services.trading.trading_window import ET

        expires_et = cooldown_expires.astimezone(ET)
        self.create_alert(
            "circuit_breaker_activated",
            f"Daily loss {daily_loss_pct:.1%} exceeded {threshold:.1%} threshold. "
            f"Trading halted until {expires_et.strftime('%H:%M ET')}.",
        )
    except Exception:
        # A broken safety check must be visible above debug level.
        logger.warning("Could not check circuit breaker daily loss", exc_info=True)
async def _check_profit_taking(self) -> None:
    """Sell open positions whose gain has reached the take-profit target.

    A position is sold in full when its gain over the average entry price
    reaches the lower of 10% and the tier target
    (``reward_risk_ratio * stop_loss_atr_multiplier * 0.02``). Only runs
    inside the trading window and when a pool is available. After each
    sell order is submitted, the in-memory portfolio state is adjusted
    (one fewer open position, proceeds added to the active pool).
    """
    if self.pool is None:
        return
    # Only sell during the trading window.
    now = datetime.now(tz=timezone.utc)
    if not is_within_trading_window(now):
        return
    try:
        rows = await self.pool.fetch(
            "SELECT ticker, quantity, avg_entry_price, current_price, unrealized_pnl "
            "FROM positions WHERE quantity > 0"
        )
    except Exception:
        logger.debug("Could not load positions for profit-taking", exc_info=True)
        return

    # Tier-based target: reward_risk_ratio * stop_loss_atr_multiplier * ~2% base ATR.
    tier_target = (
        self._active_risk_tier.reward_risk_ratio
        * self._active_risk_tier.stop_loss_atr_multiplier
        * 0.02
    )
    # Selling on "gain >= 10% OR gain >= tier target" means the lower of
    # the two is the threshold that actually applies.
    effective_target = min(0.10, tier_target)

    for row in rows:
        ticker = row["ticker"]
        qty = int(row["quantity"])
        entry = float(row["avg_entry_price"])
        current = float(row["current_price"] or 0)
        if entry <= 0 or current <= 0 or qty <= 0:
            continue

        gain_pct = (current - entry) / entry
        if not (gain_pct >= 0.10 or gain_pct >= tier_target):
            continue

        logger.info(
            "Profit-taking: %s gained %.1f%% (target=%.1f%%) — selling %d shares",
            ticker, gain_pct * 100, effective_target * 100, qty,
        )
        await self._submit_sell_order(ticker, qty, f"profit_taking_{gain_pct:.1%}")

        # Update portfolio state.
        if self.portfolio_state:
            self.portfolio_state.open_position_count = max(
                0, self.portfolio_state.open_position_count - 1
            )
            self.portfolio_state.active_pool += current * qty
async def _submit_sell_order(
    self, ticker: str, quantity: int, reason: str,
    estimated_value: float = 0.0,
) -> None:
    """Queue a market sell order for the broker.

    The order is JSON-encoded and appended to the broker queue in Redis.
    Without a Redis client the order is only logged. A failed push is
    logged with its traceback and not re-raised.

    Args:
        ticker: Symbol to sell.
        quantity: Number of shares to sell.
        reason: Free-form tag recorded on the order and in the log.
        estimated_value: Value estimate copied into the order payload.
    """
    if self.redis is None:
        logger.info("Sell order (no redis): %s %d shares — %s", ticker, quantity, reason)
        return

    payload = dict(
        ticker=ticker,
        action="sell",
        side="sell",
        quantity=quantity,
        order_type="market",
        estimated_value=estimated_value,
        source="trading_engine",
        reason=reason,
    )
    try:
        destination = queue_key(QUEUE_BROKER)
        await self.redis.rpush(destination, json.dumps(payload))
        logger.info("Submitted sell order for %s (%d shares): %s", ticker, quantity, reason)
    except Exception:
        logger.exception("Failed to push sell order for %s", ticker)
async def _persist_daily_snapshot(self, now: datetime) -> None:
    """Upsert the portfolio snapshot for the Eastern-time date of ``now``.

    Task 29.2: inserts one row per ``snapshot_date`` into
    ``portfolio_snapshots``. When a row for that date already exists, only
    ``portfolio_value``, ``active_pool``, ``reserve_pool`` and
    ``updated_at`` are refreshed. Return, win/loss, Sharpe and drawdown
    columns are written as zero placeholders, and ``positions`` /
    ``metrics`` as empty JSON.

    No-op without a pool or portfolio state; a failed write is logged at
    debug level and swallowed.
    """
    if self.pool is None or self.portfolio_state is None:
        return
    from services.trading.trading_window import ET

    snapshot_date = now.astimezone(ET).date()
    upsert_sql = (
        "INSERT INTO portfolio_snapshots "
        "(snapshot_date, portfolio_value, active_pool, reserve_pool, "
        "daily_return, cumulative_return, unrealized_pnl, realized_pnl, "
        "win_count, loss_count, win_rate, sharpe_ratio, max_drawdown, "
        "current_drawdown_pct, portfolio_heat, risk_tier, "
        "positions, metrics, created_at) "
        "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, "
        "$11, $12, $13, $14, $15, $16, $17, $18, $19) "
        "ON CONFLICT (snapshot_date) DO UPDATE SET "
        "portfolio_value = EXCLUDED.portfolio_value, "
        "active_pool = EXCLUDED.active_pool, "
        "reserve_pool = EXCLUDED.reserve_pool, "
        "updated_at = NOW()"
    )
    try:
        state = self.portfolio_state
        values = (
            snapshot_date,
            state.total_value,
            state.active_pool,
            state.reserve_pool,
            0.0,  # daily_return
            0.0,  # cumulative_return
            sum(p.unrealized_pnl for p in state.positions),
            0.0,  # realized_pnl
            0,  # win_count
            0,  # loss_count
            0.0,  # win_rate
            0.0,  # sharpe_ratio
            0.0,  # max_drawdown
            0.0,  # current_drawdown_pct
            state.portfolio_heat,
            self.config.risk_tier,
            json.dumps([]),  # positions
            json.dumps({}),  # metrics
            now,
        )
        await self.pool.execute(upsert_sql, *values)
        logger.info("Persisted daily snapshot for %s", snapshot_date)
    except Exception:
        logger.debug("Could not persist daily snapshot — table may not exist")
# ------------------------------------------------------------------
# Decision builders
# ------------------------------------------------------------------
def _skip_decision(
    self,
    *,
    rec_id: str | None,
    ticker: str,
    skip_reason: str,
    risk_tier: RiskTierConfig,
    portfolio_state: PortfolioState,
    circuit_breaker_state: CircuitBreakerState,
    reasoning: list[str],
    now: datetime,
) -> TradingDecision:
    """Build a ``"skip"`` decision with a fresh UUID.

    Position size and share quantity are left as None. The risk tier name,
    portfolio heat, pool balances and circuit-breaker status are captured
    as they are at decision time, and ``reasoning`` is stored under the
    ``"reasoning"`` key of the decision trace.
    """
    breaker_status = "active" if circuit_breaker_state.active else "inactive"
    trace = {"reasoning": reasoning}
    decision = TradingDecision(
        id=str(uuid.uuid4()),
        recommendation_id=rec_id,
        ticker=ticker,
        decision="skip",
        skip_reason=skip_reason,
        computed_position_size=None,
        computed_share_quantity=None,
        risk_tier_at_decision=risk_tier.name,
        portfolio_heat_at_decision=portfolio_state.portfolio_heat,
        active_pool_at_decision=portfolio_state.active_pool,
        reserve_pool_at_decision=portfolio_state.reserve_pool,
        circuit_breaker_status=breaker_status,
        decision_trace=trace,
        created_at=now,
    )
    return decision
def _act_decision(
    self,
    *,
    rec_id: str | None,
    ticker: str,
    size_result: PositionSizeResult,
    risk_tier: RiskTierConfig,
    portfolio_state: PortfolioState,
    circuit_breaker_state: CircuitBreakerState,
    reasoning: list[str],
    now: datetime,
) -> TradingDecision:
    """Build an ``"act"`` decision with a fresh UUID.

    The dollar amount and share quantity come from ``size_result``; there
    is no skip reason. The risk tier name, portfolio heat, pool balances
    and circuit-breaker status are captured as they are at decision time,
    and ``reasoning`` is stored under the ``"reasoning"`` key of the
    decision trace.
    """
    breaker_status = "active" if circuit_breaker_state.active else "inactive"
    trace = {"reasoning": reasoning}
    decision = TradingDecision(
        id=str(uuid.uuid4()),
        recommendation_id=rec_id,
        ticker=ticker,
        decision="act",
        skip_reason=None,
        computed_position_size=size_result.dollar_amount,
        computed_share_quantity=size_result.share_quantity,
        risk_tier_at_decision=risk_tier.name,
        portfolio_heat_at_decision=portfolio_state.portfolio_heat,
        active_pool_at_decision=portfolio_state.active_pool,
        reserve_pool_at_decision=portfolio_state.reserve_pool,
        circuit_breaker_status=breaker_status,
        decision_trace=trace,
        created_at=now,
    )
    return decision