Files

307 lines
9.5 KiB
Python

"""Data retention and lifecycle controls for raw and derived artifacts.
Provides configurable per-bucket retention policies, expired object cleanup
from MinIO, and expired metadata cleanup from PostgreSQL.
Requirements: N3 (preserve source metadata, access policy, and retention policy)
Design ref: Section 5.2 (MinIO bucket layout), Section 10 (Reliability and Safety)
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import asyncpg
from minio import Minio
from services.shared.config import BUCKET_RETENTION_FIELDS, RetentionConfig
from services.shared.storage import ALL_BUCKETS
# Module-level logger; every message in this module is emitted under the "retention" logger name.
logger = logging.getLogger("retention")
@dataclass()
class RetentionPolicy:
    """Effective retention settings for one bucket.

    Instances come either from config defaults (``resolve_policies``) or from
    an active row of the ``retention_policies`` table (``load_db_policies``).
    """

    # Bucket the policy applies to.
    bucket_name: str
    # Age threshold in days; older objects are treated as expired.
    retention_days: int
    # Copied from the DB row when present; config-derived policies keep False.
    archive_before_delete: bool = False
@dataclass()
class CleanupResult:
    """Counters describing one retention pass over a bucket.

    Every counter starts at zero; ``record_retention_run`` persists them to
    the ``retention_runs`` table.
    """

    # Bucket the counters belong to.
    bucket_name: str
    # Set by cleanup_bucket to the number of expired objects it found.
    objects_scanned: int = 0
    # Number of objects actually removed from the bucket.
    objects_deleted: int = 0
    # NOTE(review): not assigned anywhere in the visible code, so it stays 0 — confirm.
    bytes_freed: int = 0
    # NOTE(review): not assigned anywhere in the visible code, so it stays 0 — confirm.
    db_rows_deleted: int = 0
def default_retention_days(bucket: str, config: RetentionConfig) -> int:
    """Return the configured default retention, in days, for ``bucket``.

    ``BUCKET_RETENTION_FIELDS`` maps a bucket name to the ``config`` attribute
    holding its retention. Buckets without a mapping, or whose mapped
    attribute is absent from ``config``, fall back to 365 days.
    """
    fallback_days = 365
    attr_name = BUCKET_RETENTION_FIELDS.get(bucket)
    if not attr_name:
        return fallback_days
    return getattr(config, attr_name, fallback_days)
def resolve_policies(config: RetentionConfig) -> list[RetentionPolicy]:
    """Build one config-default policy per bucket in ``ALL_BUCKETS``.

    Policies are returned in ``ALL_BUCKETS`` order. ``archive_before_delete``
    keeps its dataclass default because config supplies only a day count.
    """
    policies: list[RetentionPolicy] = []
    for bucket_name in ALL_BUCKETS:
        days = default_retention_days(bucket_name, config)
        policies.append(RetentionPolicy(bucket_name=bucket_name, retention_days=days))
    return policies
_ACTIVE_DEFAULT_POLICIES_SQL = """SELECT bucket_name, retention_days, archive_before_delete
FROM retention_policies
WHERE active = TRUE AND artifact_class = 'default'"""


async def load_db_policies(pool: asyncpg.Pool) -> dict[str, RetentionPolicy]:
    """Fetch active retention-policy overrides from PostgreSQL.

    Only rows with ``active = TRUE`` and ``artifact_class = 'default'`` are
    read. The result is keyed by bucket name; if several rows share a bucket
    name, the last row returned wins. Callers give these overrides precedence
    over config defaults (see ``merge_policies``).
    """
    overrides: dict[str, RetentionPolicy] = {}
    for record in await pool.fetch(_ACTIVE_DEFAULT_POLICIES_SQL):
        name = record["bucket_name"]
        overrides[name] = RetentionPolicy(
            bucket_name=name,
            retention_days=record["retention_days"],
            archive_before_delete=record["archive_before_delete"],
        )
    return overrides
def merge_policies(
    config_policies: list[RetentionPolicy],
    db_policies: dict[str, RetentionPolicy],
) -> list[RetentionPolicy]:
    """Overlay DB overrides onto config defaults, keeping config order.

    For each config policy, the DB policy with the same bucket name replaces
    it when one exists. DB entries for buckets absent from ``config_policies``
    are ignored.
    """
    return [db_policies.get(policy.bucket_name, policy) for policy in config_policies]
def cutoff_date(retention_days: int, now: datetime | None = None) -> datetime:
    """Return the instant before which objects/rows count as expired.

    Args:
        retention_days: Retention window in days; must be non-negative.
        now: Reference time. Defaults to the current time in UTC.
            NOTE(review): presumably callers pass a timezone-aware value; a
            naive datetime cannot be compared with aware timestamps.

    Returns:
        ``now`` minus ``retention_days`` days.

    Raises:
        ValueError: If ``retention_days`` is negative. A negative window would
            place the cutoff in the future, so every existing object or row
            would qualify for deletion.
    """
    if retention_days < 0:
        raise ValueError(
            f"retention_days must be non-negative, got {retention_days!r}"
        )
    # Explicit None check: only substitute "now" when no reference was given.
    ref = datetime.now(timezone.utc) if now is None else now
    return ref - timedelta(days=retention_days)
def list_expired_objects(
    client: Minio,
    bucket: str,
    retention_days: int,
    batch_size: int = 1000,
    now: datetime | None = None,
) -> list[str]:
    """List names of objects in ``bucket`` older than the retention cutoff.

    Age is judged by each object's ``last_modified`` value as returned by
    ``client.list_objects(bucket, recursive=True)``. Collection stops once
    ``batch_size`` names are gathered, so a single call yields at most one
    batch. A non-positive ``batch_size`` returns ``[]`` without contacting
    MinIO (previously it still returned one object).

    Listing errors are logged and the names collected so far are returned, so
    the caller sees a failure as a short (possibly empty) batch.
    """
    if batch_size <= 0:
        return []
    cutoff = cutoff_date(retention_days, now)
    expired: list[str] = []
    try:
        # The try also wraps the iteration, so errors raised while walking
        # the listing are caught, not just errors from the initial call.
        for obj in client.list_objects(bucket, recursive=True):
            if not (obj.last_modified and obj.last_modified < cutoff):
                continue
            if not obj.object_name:
                continue
            expired.append(obj.object_name)
            if len(expired) >= batch_size:
                break
    except Exception:
        logger.exception("Error listing objects in bucket %s", bucket)
    return expired
def delete_expired_objects(
    client: Minio,
    bucket: str,
    object_names: list[str],
) -> int:
    """Remove each named object from ``bucket``, one call per object.

    A failed removal is logged as a warning (with traceback) and skipped; it
    does not stop the remaining deletions.

    Returns:
        How many ``remove_object`` calls completed without raising.
    """
    removed_count = 0
    for object_name in object_names:
        try:
            client.remove_object(bucket, object_name)
        except Exception:
            logger.warning("Failed to delete %s/%s", bucket, object_name, exc_info=True)
        else:
            removed_count += 1
    return removed_count
def cleanup_bucket(
    client: Minio,
    policy: RetentionPolicy,
    batch_size: int = 1000,
    now: datetime | None = None,
) -> CleanupResult:
    """Run one retention pass over a single bucket.

    Lists up to ``batch_size`` expired objects and deletes them. Only one
    batch is processed per call; any further expired objects are left for a
    later call.

    If ``policy.archive_before_delete`` is set, nothing is deleted. This
    module has no archiving step, so deleting would discard data the policy
    requires to be archived first. The expired objects are still counted and
    a warning is logged.

    Returns:
        CleanupResult whose ``objects_scanned`` is the number of expired
        objects found (not the bucket's total object count) and whose
        ``objects_deleted`` is the number actually removed.
        NOTE(review): ``bytes_freed`` is not populated here because the
        listing yields names only.
    """
    result = CleanupResult(bucket_name=policy.bucket_name)
    expired = list_expired_objects(
        client, policy.bucket_name, policy.retention_days,
        batch_size=batch_size, now=now,
    )
    result.objects_scanned = len(expired)

    if not expired:
        logger.debug("Bucket %s: no expired objects (retention=%dd)",
                     policy.bucket_name, policy.retention_days)
        return result

    if policy.archive_before_delete:
        # Fail safe: deleting without the required archive copy is irreversible.
        logger.warning(
            "Bucket %s: keeping %d expired objects because archive_before_delete "
            "is set and no archiving step is implemented (retention=%dd)",
            policy.bucket_name, result.objects_scanned, policy.retention_days,
        )
        return result

    result.objects_deleted = delete_expired_objects(client, policy.bucket_name, expired)
    logger.info(
        "Bucket %s: scanned=%d deleted=%d (retention=%dd)",
        policy.bucket_name, result.objects_scanned,
        result.objects_deleted, policy.retention_days,
    )
    return result
# --- PostgreSQL metadata cleanup ---
# (table name, DELETE statement) pairs run by cleanup_expired_db_records.
# Each statement binds the cutoff datetime as $1 and removes rows whose
# timestamp column (started_at / captured_at) is older than that cutoff.
DB_CLEANUP_QUERIES: list[tuple[str, str]] = [
    ("ingestion_runs", "DELETE FROM ingestion_runs WHERE started_at < $1"),
    ("market_snapshots", "DELETE FROM market_snapshots WHERE captured_at < $1"),
]
async def cleanup_expired_db_records(
    pool: asyncpg.Pool,
    retention_days: int,
    now: datetime | None = None,
) -> int:
    """Delete operational metadata rows older than ``retention_days``.

    Each statement in ``DB_CLEANUP_QUERIES`` runs on a single pooled
    connection with the computed cutoff. A failing statement is logged and
    skipped, so the other tables are still cleaned. The caller chooses
    ``retention_days``; ``run_retention_cleanup`` passes the shortest
    retention among its resolved policies.

    Returns:
        Total number of rows deleted across all tables.
    """
    threshold = cutoff_date(retention_days, now)
    removed_total = 0
    async with pool.acquire() as conn:
        for table, statement in DB_CLEANUP_QUERIES:
            try:
                status_tag = await conn.execute(statement, threshold)
                # asyncpg returns the command tag, e.g. "DELETE 42".
                removed = int(status_tag.split()[-1]) if status_tag else 0
            except Exception:
                logger.exception("Error cleaning table %s", table)
                continue
            removed_total += removed
            if removed > 0:
                logger.info("Cleaned %d expired rows from %s (cutoff=%s)",
                            removed, table, threshold.isoformat())
    return removed_total
_INSERT_RETENTION_RUN_SQL = """INSERT INTO retention_runs
(bucket_name, objects_scanned, objects_deleted, bytes_freed,
db_rows_deleted, completed_at, status, error_message)
VALUES ($1, $2, $3, $4, $5, NOW(), $6, $7)"""


async def record_retention_run(
    pool: asyncpg.Pool,
    bucket_name: str,
    result: CleanupResult,
    status: str = "completed",
    error_message: str | None = None,
) -> None:
    """Insert one ``retention_runs`` row describing a bucket cleanup.

    The counters are copied from ``result``. ``completed_at`` is stamped by
    the database via ``NOW()``, not by this process.
    """
    counters = (
        result.objects_scanned,
        result.objects_deleted,
        result.bytes_freed,
        result.db_rows_deleted,
    )
    await pool.execute(
        _INSERT_RETENTION_RUN_SQL, bucket_name, *counters, status, error_message
    )
async def _cleanup_and_record(
    minio_client: Minio,
    pool: asyncpg.Pool,
    policy: RetentionPolicy,
    batch_size: int,
    now: datetime | None,
) -> CleanupResult:
    """Clean one bucket, then record the outcome; ordinary exceptions are logged, not raised.

    Cleanup and recording are guarded separately. A DB write failure after a
    successful cleanup therefore neither mislabels the run as "failed" nor
    duplicates the bucket's result, and one bad bucket cannot abort the rest.
    """
    try:
        result = cleanup_bucket(minio_client, policy, batch_size=batch_size, now=now)
    except Exception:
        logger.exception("Retention cleanup failed for bucket %s", policy.bucket_name)
        result = CleanupResult(bucket_name=policy.bucket_name)
        status, error_message = "failed", "See logs"
    else:
        status, error_message = "completed", None

    try:
        await record_retention_run(
            pool, policy.bucket_name, result,
            status=status, error_message=error_message,
        )
    except Exception:
        logger.exception("Could not record retention run for bucket %s", policy.bucket_name)
    return result


async def run_retention_cleanup(
    minio_client: Minio,
    pool: asyncpg.Pool,
    config: RetentionConfig,
    now: datetime | None = None,
) -> list[CleanupResult]:
    """Run the full retention cleanup cycle.

    1. Resolve policies from config defaults + DB overrides (falling back to
       config defaults alone if the DB overrides cannot be loaded).
    2. Clean up expired MinIO objects per bucket.
    3. Clean up expired PostgreSQL metadata.
    4. Record each bucket run in ``retention_runs`` for observability;
       recording is best-effort and never aborts the loop.

    Returns:
        Exactly one CleanupResult per resolved policy, in policy order.
    """
    config_policies = resolve_policies(config)
    try:
        db_policies = await load_db_policies(pool)
    except Exception:
        logger.warning(
            "Could not load DB retention policies, using config defaults",
            exc_info=True,
        )
        db_policies = {}
    policies = merge_policies(config_policies, db_policies)

    results: list[CleanupResult] = []
    for policy in policies:
        results.append(
            await _cleanup_and_record(minio_client, pool, policy, config.batch_size, now)
        )

    if not policies:
        # min() below would raise on an empty sequence.
        logger.warning("No retention policies resolved; skipping DB metadata cleanup")
        return results

    # NOTE(review): this is the minimum over all resolved buckets, not only
    # raw ones as the original comment implied — confirm that is intended.
    min_retention = min(p.retention_days for p in policies)
    try:
        db_deleted = await cleanup_expired_db_records(pool, min_retention, now=now)
        if db_deleted > 0:
            logger.info("Total DB rows cleaned: %d", db_deleted)
    except Exception:
        logger.exception("DB retention cleanup failed")
    return results