"""Execution audit trail - records every step from recommendation to market outcome. Writes structured audit events to the audit_events table so the full decision chain is traceable: recommendation → risk evaluation → order submission → broker response → fill/rejection/cancellation. Each event captures the entity type, entity ID, event type, actor, and a JSONB data payload with stage-specific details. Requirements: 8.3, 11.3 Design: Section 4.9 (Broker Adapter), Section 6.1 (PostgreSQL audit_events) """ from __future__ import annotations import json import logging import uuid from datetime import datetime, timezone from typing import Any import asyncpg logger = logging.getLogger("audit") # --------------------------------------------------------------------------- # Event type constants # --------------------------------------------------------------------------- # Recommendation stage AUDIT_RECOMMENDATION_GENERATED = "recommendation.generated" AUDIT_RECOMMENDATION_SUPPRESSED = "recommendation.suppressed" # Risk evaluation stage AUDIT_RISK_EVALUATED = "risk.evaluated" AUDIT_RISK_REJECTED = "risk.rejected" # Order lifecycle AUDIT_ORDER_SUBMITTED = "order.submitted" AUDIT_ORDER_ACCEPTED = "order.accepted" AUDIT_ORDER_FILLED = "order.filled" AUDIT_ORDER_REJECTED = "order.rejected" AUDIT_ORDER_CANCELLED = "order.cancelled" AUDIT_ORDER_DUPLICATE = "order.duplicate_prevented" # Position changes AUDIT_POSITION_OPENED = "position.opened" AUDIT_POSITION_CLOSED = "position.closed" AUDIT_POSITION_UPDATED = "position.updated" # Trading mode changes AUDIT_TRADING_MODE_CHANGED = "trading.mode_changed" # Operator approval workflow AUDIT_APPROVAL_REQUESTED = "approval.requested" AUDIT_APPROVAL_APPROVED = "approval.approved" AUDIT_APPROVAL_REJECTED = "approval.rejected" AUDIT_APPROVAL_EXPIRED = "approval.expired" # --------------------------------------------------------------------------- # Core audit writer # --------------------------------------------------------------------------- _INSERT_AUDIT_EVENT = """ INSERT INTO audit_events (id, event_type, entity_type, entity_id, actor, data, created_at) VALUES ($1::uuid, $2, $3, $4::uuid, $5, $6::jsonb, $7) """ async def record_audit_event( pool: asyncpg.Pool, event_type: str, entity_type: str, entity_id: str, data: dict[str, Any], actor: str = "system", timestamp: datetime | None = None, ) -> str: """Write a single audit event to PostgreSQL. Returns the audit event UUID. """ event_id = str(uuid.uuid4()) ts = timestamp or datetime.now(timezone.utc) try: await pool.execute( _INSERT_AUDIT_EVENT, event_id, event_type, entity_type, entity_id, actor, json.dumps(data, default=str), ts, ) except Exception: logger.warning( "Failed to write audit event %s for %s/%s", event_type, entity_type, entity_id, exc_info=True, ) return "" return event_id # --------------------------------------------------------------------------- # Convenience helpers for each execution stage # --------------------------------------------------------------------------- async def audit_recommendation_generated( pool: asyncpg.Pool, recommendation_id: str, ticker: str, action: str, mode: str, confidence: float, evidence_count: int, suppressed: bool = False, ) -> str: """Record that a recommendation was generated.""" event_type = AUDIT_RECOMMENDATION_SUPPRESSED if suppressed else AUDIT_RECOMMENDATION_GENERATED return await record_audit_event( pool, event_type=event_type, entity_type="recommendation", entity_id=recommendation_id, data={ "ticker": ticker, "action": action, "mode": mode, "confidence": confidence, "evidence_count": evidence_count, "suppressed": suppressed, }, actor="recommendation_worker", ) async def audit_risk_evaluated( pool: asyncpg.Pool, evaluation_id: str, recommendation_id: str | None, ticker: str, eligible: bool, allowed_mode: str, rejection_reasons: list[str], check_count: int, ) -> str: """Record a risk evaluation result.""" event_type = AUDIT_RISK_REJECTED if not eligible else AUDIT_RISK_EVALUATED return await record_audit_event( pool, event_type=event_type, entity_type="risk_evaluation", entity_id=evaluation_id, data={ "recommendation_id": recommendation_id, "ticker": ticker, "eligible": eligible, "allowed_mode": allowed_mode, "rejection_reasons": rejection_reasons, "check_count": check_count, }, actor="risk_engine", ) async def audit_order_submitted( pool: asyncpg.Pool, order_id: str, ticker: str, side: str, quantity: float, order_type: str, idempotency_key: str, recommendation_id: str | None = None, evaluation_id: str | None = None, ) -> str: """Record that an order was submitted to the broker.""" return await record_audit_event( pool, event_type=AUDIT_ORDER_SUBMITTED, entity_type="order", entity_id=order_id, data={ "ticker": ticker, "side": side, "quantity": quantity, "order_type": order_type, "idempotency_key": idempotency_key, "recommendation_id": recommendation_id, "evaluation_id": evaluation_id, }, actor="broker_service", ) async def audit_order_filled( pool: asyncpg.Pool, order_id: str, ticker: str, side: str, fill_quantity: float, fill_price: float | None, broker_order_id: str, ) -> str: """Record that an order was filled by the broker.""" return await record_audit_event( pool, event_type=AUDIT_ORDER_FILLED, entity_type="order", entity_id=order_id, data={ "ticker": ticker, "side": side, "fill_quantity": fill_quantity, "fill_price": fill_price, "broker_order_id": broker_order_id, }, actor="broker_service", ) async def audit_order_rejected( pool: asyncpg.Pool, order_id: str, ticker: str, reason: str, source: str = "broker", ) -> str: """Record that an order was rejected (by risk engine or broker).""" return await record_audit_event( pool, event_type=AUDIT_ORDER_REJECTED, entity_type="order", entity_id=order_id, data={ "ticker": ticker, "reason": reason, "rejection_source": source, }, actor="broker_service", ) async def audit_order_cancelled( pool: asyncpg.Pool, order_id: str, ticker: str, broker_order_id: str, ) -> str: """Record that an order was cancelled.""" return await record_audit_event( pool, event_type=AUDIT_ORDER_CANCELLED, entity_type="order", entity_id=order_id, data={ "ticker": ticker, "broker_order_id": broker_order_id, }, actor="broker_service", ) async def audit_duplicate_prevented( pool: asyncpg.Pool, order_id: str, ticker: str, idempotency_key: str, detected_via: str, ) -> str: """Record that a duplicate order was prevented.""" return await record_audit_event( pool, event_type=AUDIT_ORDER_DUPLICATE, entity_type="order", entity_id=order_id, data={ "ticker": ticker, "idempotency_key": idempotency_key, "detected_via": detected_via, }, actor="broker_service", ) async def audit_position_change( pool: asyncpg.Pool, order_id: str, ticker: str, side: str, quantity_before: float, quantity_after: float, avg_entry_before: float, avg_entry_after: float, ) -> str: """Record a position change resulting from a fill.""" if quantity_before == 0 and quantity_after > 0: event_type = AUDIT_POSITION_OPENED elif quantity_after == 0: event_type = AUDIT_POSITION_CLOSED else: event_type = AUDIT_POSITION_UPDATED return await record_audit_event( pool, event_type=event_type, entity_type="position", entity_id=order_id, data={ "ticker": ticker, "side": side, "quantity_before": quantity_before, "quantity_after": quantity_after, "avg_entry_before": avg_entry_before, "avg_entry_after": avg_entry_after, }, actor="broker_service", ) async def audit_approval_requested( pool: asyncpg.Pool, approval_id: str, ticker: str, side: str, quantity: float, estimated_value: float, recommendation_id: str | None = None, expires_at: str | None = None, ) -> str: """Record that an operator approval was requested for a live order.""" return await record_audit_event( pool, event_type=AUDIT_APPROVAL_REQUESTED, entity_type="approval", entity_id=approval_id, data={ "ticker": ticker, "side": side, "quantity": quantity, "estimated_value": estimated_value, "recommendation_id": recommendation_id, "expires_at": expires_at, }, actor="broker_service", ) async def audit_approval_reviewed( pool: asyncpg.Pool, approval_id: str, ticker: str, approved: bool, reviewed_by: str = "operator", review_note: str = "", ) -> str: """Record that an operator reviewed an approval request.""" event_type = AUDIT_APPROVAL_APPROVED if approved else AUDIT_APPROVAL_REJECTED return await record_audit_event( pool, event_type=event_type, entity_type="approval", entity_id=approval_id, data={ "ticker": ticker, "approved": approved, "reviewed_by": reviewed_by, "review_note": review_note, }, actor=reviewed_by, ) async def audit_approval_expired( pool: asyncpg.Pool, approval_id: str, ticker: str, ) -> str: """Record that an approval request expired without review.""" return await record_audit_event( pool, event_type=AUDIT_APPROVAL_EXPIRED, entity_type="approval", entity_id=approval_id, data={"ticker": ticker}, actor="system", ) async def audit_trading_mode_changed( pool: asyncpg.Pool, config_id: str, old_mode: str, new_mode: str, actor: str = "operator", ) -> str: """Record a trading mode change.""" return await record_audit_event( pool, event_type=AUDIT_TRADING_MODE_CHANGED, entity_type="risk_config", entity_id=config_id, data={ "old_mode": old_mode, "new_mode": new_mode, }, actor=actor, ) # --------------------------------------------------------------------------- # Query helpers for audit trail retrieval (Requirement 11.3) # --------------------------------------------------------------------------- _FETCH_AUDIT_TRAIL_FOR_ORDER = """ SELECT id, event_type, entity_type, entity_id, actor, data, created_at FROM audit_events WHERE entity_id = $1::uuid OR data->>'recommendation_id' = $2 OR data->>'order_id' = $2 ORDER BY created_at ASC """ _FETCH_AUDIT_TRAIL_BY_ENTITY = """ SELECT id, event_type, entity_type, entity_id, actor, data, created_at FROM audit_events WHERE entity_type = $1 AND entity_id = $2::uuid ORDER BY created_at ASC """ _FETCH_FULL_EXECUTION_TRAIL = """ SELECT id, event_type, entity_type, entity_id, actor, data, created_at FROM audit_events WHERE entity_id = $1::uuid OR entity_id IN ( SELECT entity_id FROM audit_events WHERE data->>'recommendation_id' = $2 ) ORDER BY created_at ASC """ async def get_order_audit_trail( pool: asyncpg.Pool, order_id: str, recommendation_id: str | None = None, ) -> list[dict[str, Any]]: """Fetch the full audit trail for an order, including related recommendation and risk events. Returns events ordered chronologically so the full decision chain is visible: recommendation → risk → order → fill/reject. """ ref_id = recommendation_id or order_id rows = await pool.fetch(_FETCH_AUDIT_TRAIL_FOR_ORDER, order_id, ref_id) return [ { "id": str(row["id"]), "event_type": row["event_type"], "entity_type": row["entity_type"], "entity_id": str(row["entity_id"]), "actor": row["actor"], "data": row["data"] if isinstance(row["data"], dict) else json.loads(row["data"]), "created_at": row["created_at"].isoformat() if row["created_at"] else None, } for row in rows ] async def get_entity_audit_trail( pool: asyncpg.Pool, entity_type: str, entity_id: str, ) -> list[dict[str, Any]]: """Fetch all audit events for a specific entity.""" rows = await pool.fetch(_FETCH_AUDIT_TRAIL_BY_ENTITY, entity_type, entity_id) return [ { "id": str(row["id"]), "event_type": row["event_type"], "entity_type": row["entity_type"], "entity_id": str(row["entity_id"]), "actor": row["actor"], "data": row["data"] if isinstance(row["data"], dict) else json.loads(row["data"]), "created_at": row["created_at"].isoformat() if row["created_at"] else None, } for row in rows ]