Files

494 lines
14 KiB
Python

"""Execution audit trail - records every step from recommendation to market outcome.
Writes structured audit events to the audit_events table so the full
decision chain is traceable: recommendation → risk evaluation → order
submission → broker response → fill/rejection/cancellation.
Each event captures the entity type, entity ID, event type, actor,
and a JSONB data payload with stage-specific details.
Requirements: 8.3, 11.3
Design: Section 4.9 (Broker Adapter), Section 6.1 (PostgreSQL audit_events)
"""
from __future__ import annotations
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any
import asyncpg
logger = logging.getLogger("audit")
# ---------------------------------------------------------------------------
# Event type constants
# ---------------------------------------------------------------------------
# Recommendation stage
AUDIT_RECOMMENDATION_GENERATED = "recommendation.generated"
AUDIT_RECOMMENDATION_SUPPRESSED = "recommendation.suppressed"
# Risk evaluation stage
AUDIT_RISK_EVALUATED = "risk.evaluated"
AUDIT_RISK_REJECTED = "risk.rejected"
# Order lifecycle
AUDIT_ORDER_SUBMITTED = "order.submitted"
AUDIT_ORDER_ACCEPTED = "order.accepted"
AUDIT_ORDER_FILLED = "order.filled"
AUDIT_ORDER_REJECTED = "order.rejected"
AUDIT_ORDER_CANCELLED = "order.cancelled"
AUDIT_ORDER_DUPLICATE = "order.duplicate_prevented"
# Position changes
AUDIT_POSITION_OPENED = "position.opened"
AUDIT_POSITION_CLOSED = "position.closed"
AUDIT_POSITION_UPDATED = "position.updated"
# Trading mode changes
AUDIT_TRADING_MODE_CHANGED = "trading.mode_changed"
# Operator approval workflow
AUDIT_APPROVAL_REQUESTED = "approval.requested"
AUDIT_APPROVAL_APPROVED = "approval.approved"
AUDIT_APPROVAL_REJECTED = "approval.rejected"
AUDIT_APPROVAL_EXPIRED = "approval.expired"
# ---------------------------------------------------------------------------
# Core audit writer
# ---------------------------------------------------------------------------
_INSERT_AUDIT_EVENT = """
INSERT INTO audit_events (id, event_type, entity_type, entity_id, actor, data, created_at)
VALUES ($1::uuid, $2, $3, $4::uuid, $5, $6::jsonb, $7)
"""
async def record_audit_event(
pool: asyncpg.Pool,
event_type: str,
entity_type: str,
entity_id: str,
data: dict[str, Any],
actor: str = "system",
timestamp: datetime | None = None,
) -> str:
"""Write a single audit event to PostgreSQL.
Returns the audit event UUID.
"""
event_id = str(uuid.uuid4())
ts = timestamp or datetime.now(timezone.utc)
try:
await pool.execute(
_INSERT_AUDIT_EVENT,
event_id,
event_type,
entity_type,
entity_id,
actor,
json.dumps(data, default=str),
ts,
)
except Exception:
logger.warning(
"Failed to write audit event %s for %s/%s",
event_type, entity_type, entity_id,
exc_info=True,
)
return ""
return event_id
# ---------------------------------------------------------------------------
# Convenience helpers for each execution stage
# ---------------------------------------------------------------------------
async def audit_recommendation_generated(
pool: asyncpg.Pool,
recommendation_id: str,
ticker: str,
action: str,
mode: str,
confidence: float,
evidence_count: int,
suppressed: bool = False,
) -> str:
"""Record that a recommendation was generated."""
event_type = AUDIT_RECOMMENDATION_SUPPRESSED if suppressed else AUDIT_RECOMMENDATION_GENERATED
return await record_audit_event(
pool,
event_type=event_type,
entity_type="recommendation",
entity_id=recommendation_id,
data={
"ticker": ticker,
"action": action,
"mode": mode,
"confidence": confidence,
"evidence_count": evidence_count,
"suppressed": suppressed,
},
actor="recommendation_worker",
)
async def audit_risk_evaluated(
pool: asyncpg.Pool,
evaluation_id: str,
recommendation_id: str | None,
ticker: str,
eligible: bool,
allowed_mode: str,
rejection_reasons: list[str],
check_count: int,
) -> str:
"""Record a risk evaluation result."""
event_type = AUDIT_RISK_REJECTED if not eligible else AUDIT_RISK_EVALUATED
return await record_audit_event(
pool,
event_type=event_type,
entity_type="risk_evaluation",
entity_id=evaluation_id,
data={
"recommendation_id": recommendation_id,
"ticker": ticker,
"eligible": eligible,
"allowed_mode": allowed_mode,
"rejection_reasons": rejection_reasons,
"check_count": check_count,
},
actor="risk_engine",
)
async def audit_order_submitted(
pool: asyncpg.Pool,
order_id: str,
ticker: str,
side: str,
quantity: float,
order_type: str,
idempotency_key: str,
recommendation_id: str | None = None,
evaluation_id: str | None = None,
) -> str:
"""Record that an order was submitted to the broker."""
return await record_audit_event(
pool,
event_type=AUDIT_ORDER_SUBMITTED,
entity_type="order",
entity_id=order_id,
data={
"ticker": ticker,
"side": side,
"quantity": quantity,
"order_type": order_type,
"idempotency_key": idempotency_key,
"recommendation_id": recommendation_id,
"evaluation_id": evaluation_id,
},
actor="broker_service",
)
async def audit_order_filled(
pool: asyncpg.Pool,
order_id: str,
ticker: str,
side: str,
fill_quantity: float,
fill_price: float | None,
broker_order_id: str,
) -> str:
"""Record that an order was filled by the broker."""
return await record_audit_event(
pool,
event_type=AUDIT_ORDER_FILLED,
entity_type="order",
entity_id=order_id,
data={
"ticker": ticker,
"side": side,
"fill_quantity": fill_quantity,
"fill_price": fill_price,
"broker_order_id": broker_order_id,
},
actor="broker_service",
)
async def audit_order_rejected(
pool: asyncpg.Pool,
order_id: str,
ticker: str,
reason: str,
source: str = "broker",
) -> str:
"""Record that an order was rejected (by risk engine or broker)."""
return await record_audit_event(
pool,
event_type=AUDIT_ORDER_REJECTED,
entity_type="order",
entity_id=order_id,
data={
"ticker": ticker,
"reason": reason,
"rejection_source": source,
},
actor="broker_service",
)
async def audit_order_cancelled(
pool: asyncpg.Pool,
order_id: str,
ticker: str,
broker_order_id: str,
) -> str:
"""Record that an order was cancelled."""
return await record_audit_event(
pool,
event_type=AUDIT_ORDER_CANCELLED,
entity_type="order",
entity_id=order_id,
data={
"ticker": ticker,
"broker_order_id": broker_order_id,
},
actor="broker_service",
)
async def audit_duplicate_prevented(
pool: asyncpg.Pool,
order_id: str,
ticker: str,
idempotency_key: str,
detected_via: str,
) -> str:
"""Record that a duplicate order was prevented."""
return await record_audit_event(
pool,
event_type=AUDIT_ORDER_DUPLICATE,
entity_type="order",
entity_id=order_id,
data={
"ticker": ticker,
"idempotency_key": idempotency_key,
"detected_via": detected_via,
},
actor="broker_service",
)
async def audit_position_change(
pool: asyncpg.Pool,
order_id: str,
ticker: str,
side: str,
quantity_before: float,
quantity_after: float,
avg_entry_before: float,
avg_entry_after: float,
) -> str:
"""Record a position change resulting from a fill."""
if quantity_before == 0 and quantity_after > 0:
event_type = AUDIT_POSITION_OPENED
elif quantity_after == 0:
event_type = AUDIT_POSITION_CLOSED
else:
event_type = AUDIT_POSITION_UPDATED
return await record_audit_event(
pool,
event_type=event_type,
entity_type="position",
entity_id=order_id,
data={
"ticker": ticker,
"side": side,
"quantity_before": quantity_before,
"quantity_after": quantity_after,
"avg_entry_before": avg_entry_before,
"avg_entry_after": avg_entry_after,
},
actor="broker_service",
)
async def audit_approval_requested(
pool: asyncpg.Pool,
approval_id: str,
ticker: str,
side: str,
quantity: float,
estimated_value: float,
recommendation_id: str | None = None,
expires_at: str | None = None,
) -> str:
"""Record that an operator approval was requested for a live order."""
return await record_audit_event(
pool,
event_type=AUDIT_APPROVAL_REQUESTED,
entity_type="approval",
entity_id=approval_id,
data={
"ticker": ticker,
"side": side,
"quantity": quantity,
"estimated_value": estimated_value,
"recommendation_id": recommendation_id,
"expires_at": expires_at,
},
actor="broker_service",
)
async def audit_approval_reviewed(
pool: asyncpg.Pool,
approval_id: str,
ticker: str,
approved: bool,
reviewed_by: str = "operator",
review_note: str = "",
) -> str:
"""Record that an operator reviewed an approval request."""
event_type = AUDIT_APPROVAL_APPROVED if approved else AUDIT_APPROVAL_REJECTED
return await record_audit_event(
pool,
event_type=event_type,
entity_type="approval",
entity_id=approval_id,
data={
"ticker": ticker,
"approved": approved,
"reviewed_by": reviewed_by,
"review_note": review_note,
},
actor=reviewed_by,
)
async def audit_approval_expired(
pool: asyncpg.Pool,
approval_id: str,
ticker: str,
) -> str:
"""Record that an approval request expired without review."""
return await record_audit_event(
pool,
event_type=AUDIT_APPROVAL_EXPIRED,
entity_type="approval",
entity_id=approval_id,
data={"ticker": ticker},
actor="system",
)
async def audit_trading_mode_changed(
pool: asyncpg.Pool,
config_id: str,
old_mode: str,
new_mode: str,
actor: str = "operator",
) -> str:
"""Record a trading mode change."""
return await record_audit_event(
pool,
event_type=AUDIT_TRADING_MODE_CHANGED,
entity_type="risk_config",
entity_id=config_id,
data={
"old_mode": old_mode,
"new_mode": new_mode,
},
actor=actor,
)
# ---------------------------------------------------------------------------
# Query helpers for audit trail retrieval (Requirement 11.3)
# ---------------------------------------------------------------------------
_FETCH_AUDIT_TRAIL_FOR_ORDER = """
SELECT id, event_type, entity_type, entity_id, actor, data, created_at
FROM audit_events
WHERE entity_id = $1::uuid
OR data->>'recommendation_id' = $2
OR data->>'order_id' = $2
ORDER BY created_at ASC
"""
_FETCH_AUDIT_TRAIL_BY_ENTITY = """
SELECT id, event_type, entity_type, entity_id, actor, data, created_at
FROM audit_events
WHERE entity_type = $1 AND entity_id = $2::uuid
ORDER BY created_at ASC
"""
_FETCH_FULL_EXECUTION_TRAIL = """
SELECT id, event_type, entity_type, entity_id, actor, data, created_at
FROM audit_events
WHERE entity_id = $1::uuid
OR entity_id IN (
SELECT entity_id FROM audit_events
WHERE data->>'recommendation_id' = $2
)
ORDER BY created_at ASC
"""
async def get_order_audit_trail(
pool: asyncpg.Pool,
order_id: str,
recommendation_id: str | None = None,
) -> list[dict[str, Any]]:
"""Fetch the full audit trail for an order, including related recommendation and risk events.
Returns events ordered chronologically so the full decision chain
is visible: recommendation → risk → order → fill/reject.
"""
ref_id = recommendation_id or order_id
rows = await pool.fetch(_FETCH_AUDIT_TRAIL_FOR_ORDER, order_id, ref_id)
return [
{
"id": str(row["id"]),
"event_type": row["event_type"],
"entity_type": row["entity_type"],
"entity_id": str(row["entity_id"]),
"actor": row["actor"],
"data": row["data"] if isinstance(row["data"], dict) else json.loads(row["data"]),
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
}
for row in rows
]
async def get_entity_audit_trail(
pool: asyncpg.Pool,
entity_type: str,
entity_id: str,
) -> list[dict[str, Any]]:
"""Fetch all audit events for a specific entity."""
rows = await pool.fetch(_FETCH_AUDIT_TRAIL_BY_ENTITY, entity_type, entity_id)
return [
{
"id": str(row["id"]),
"event_type": row["event_type"],
"entity_type": row["entity_type"],
"entity_id": str(row["entity_id"]),
"actor": row["actor"],
"data": row["data"] if isinstance(row["data"], dict) else json.loads(row["data"]),
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
}
for row in rows
]