"""Data retention and lifecycle controls for raw and derived artifacts. Provides configurable per-bucket retention policies, expired object cleanup from MinIO, and expired metadata cleanup from PostgreSQL. Requirements: N3 (preserve source metadata, access policy, and retention policy) Design ref: Section 5.2 (MinIO bucket layout), Section 10 (Reliability and Safety) """ from __future__ import annotations import logging from dataclasses import dataclass from datetime import datetime, timedelta, timezone import asyncpg from minio import Minio from services.shared.config import BUCKET_RETENTION_FIELDS, RetentionConfig from services.shared.storage import ALL_BUCKETS logger = logging.getLogger("retention") @dataclass class RetentionPolicy: """Resolved retention policy for a single bucket.""" bucket_name: str retention_days: int archive_before_delete: bool = False @dataclass class CleanupResult: """Result of a single bucket cleanup run.""" bucket_name: str objects_scanned: int = 0 objects_deleted: int = 0 bytes_freed: int = 0 db_rows_deleted: int = 0 def default_retention_days(bucket: str, config: RetentionConfig) -> int: """Get the default retention days for a bucket from config.""" field_name = BUCKET_RETENTION_FIELDS.get(bucket) if field_name: return getattr(config, field_name, 365) return 365 def resolve_policies(config: RetentionConfig) -> list[RetentionPolicy]: """Build retention policies for all known buckets from config defaults.""" return [ RetentionPolicy( bucket_name=bucket, retention_days=default_retention_days(bucket, config), ) for bucket in ALL_BUCKETS ] async def load_db_policies(pool: asyncpg.Pool) -> dict[str, RetentionPolicy]: """Load retention policy overrides from the database. Returns a dict keyed by bucket_name. DB policies take precedence over config defaults when active. """ rows = await pool.fetch( """SELECT bucket_name, retention_days, archive_before_delete FROM retention_policies WHERE active = TRUE AND artifact_class = 'default'""" ) return { row["bucket_name"]: RetentionPolicy( bucket_name=row["bucket_name"], retention_days=row["retention_days"], archive_before_delete=row["archive_before_delete"], ) for row in rows } def merge_policies( config_policies: list[RetentionPolicy], db_policies: dict[str, RetentionPolicy], ) -> list[RetentionPolicy]: """Merge config defaults with DB overrides. DB wins on conflict.""" merged: list[RetentionPolicy] = [] for policy in config_policies: if policy.bucket_name in db_policies: merged.append(db_policies[policy.bucket_name]) else: merged.append(policy) return merged def cutoff_date(retention_days: int, now: datetime | None = None) -> datetime: """Calculate the cutoff datetime. Objects older than this are expired.""" ref = now or datetime.now(timezone.utc) return ref - timedelta(days=retention_days) def list_expired_objects( client: Minio, bucket: str, retention_days: int, batch_size: int = 1000, now: datetime | None = None, ) -> list[str]: """List object names in a bucket that are older than the retention cutoff. Uses the object's last_modified timestamp from MinIO metadata. Returns at most batch_size object names. """ cutoff = cutoff_date(retention_days, now) expired: list[str] = [] try: objects = client.list_objects(bucket, recursive=True) for obj in objects: if obj.last_modified and obj.last_modified < cutoff: if obj.object_name: expired.append(obj.object_name) if len(expired) >= batch_size: break except Exception: logger.exception("Error listing objects in bucket %s", bucket) return expired def delete_expired_objects( client: Minio, bucket: str, object_names: list[str], ) -> int: """Delete a list of objects from a MinIO bucket. Returns the count of successfully deleted objects. """ deleted = 0 for name in object_names: try: client.remove_object(bucket, name) deleted += 1 except Exception: logger.warning("Failed to delete %s/%s", bucket, name, exc_info=True) return deleted def cleanup_bucket( client: Minio, policy: RetentionPolicy, batch_size: int = 1000, now: datetime | None = None, ) -> CleanupResult: """Run retention cleanup for a single bucket. Lists expired objects and deletes them in batches. Returns a CleanupResult with counts. """ result = CleanupResult(bucket_name=policy.bucket_name) expired = list_expired_objects( client, policy.bucket_name, policy.retention_days, batch_size=batch_size, now=now, ) result.objects_scanned = len(expired) if expired: result.objects_deleted = delete_expired_objects(client, policy.bucket_name, expired) logger.info( "Bucket %s: scanned=%d deleted=%d (retention=%dd)", policy.bucket_name, result.objects_scanned, result.objects_deleted, policy.retention_days, ) else: logger.debug("Bucket %s: no expired objects (retention=%dd)", policy.bucket_name, policy.retention_days) return result # --- PostgreSQL metadata cleanup --- # Tables with a created_at or retrieved_at column that should be cleaned up # when the corresponding MinIO artifacts are expired. DB_CLEANUP_QUERIES: list[tuple[str, str]] = [ ( "ingestion_runs", "DELETE FROM ingestion_runs WHERE started_at < $1", ), ( "market_snapshots", "DELETE FROM market_snapshots WHERE captured_at < $1", ), ] async def cleanup_expired_db_records( pool: asyncpg.Pool, retention_days: int, now: datetime | None = None, ) -> int: """Delete expired operational metadata from PostgreSQL. Uses the shortest raw retention period to clean up ingestion tracking and market snapshot records that are past their useful life. Returns total rows deleted. """ cutoff = cutoff_date(retention_days, now) total_deleted = 0 async with pool.acquire() as conn: for table_name, query in DB_CLEANUP_QUERIES: try: result = await conn.execute(query, cutoff) # asyncpg returns "DELETE N" count = int(result.split()[-1]) if result else 0 total_deleted += count if count > 0: logger.info("Cleaned %d expired rows from %s (cutoff=%s)", count, table_name, cutoff.isoformat()) except Exception: logger.exception("Error cleaning table %s", table_name) return total_deleted async def record_retention_run( pool: asyncpg.Pool, bucket_name: str, result: CleanupResult, status: str = "completed", error_message: str | None = None, ) -> None: """Record a retention cleanup run in the retention_runs table.""" await pool.execute( """INSERT INTO retention_runs (bucket_name, objects_scanned, objects_deleted, bytes_freed, db_rows_deleted, completed_at, status, error_message) VALUES ($1, $2, $3, $4, $5, NOW(), $6, $7)""", bucket_name, result.objects_scanned, result.objects_deleted, result.bytes_freed, result.db_rows_deleted, status, error_message, ) async def run_retention_cleanup( minio_client: Minio, pool: asyncpg.Pool, config: RetentionConfig, now: datetime | None = None, ) -> list[CleanupResult]: """Run the full retention cleanup cycle. 1. Resolve policies from config defaults + DB overrides 2. Clean up expired MinIO objects per bucket 3. Clean up expired PostgreSQL metadata 4. Record each run for observability Returns a list of CleanupResult for each bucket processed. """ # Resolve policies config_policies = resolve_policies(config) try: db_policies = await load_db_policies(pool) except Exception: logger.warning("Could not load DB retention policies, using config defaults") db_policies = {} policies = merge_policies(config_policies, db_policies) results: list[CleanupResult] = [] # Clean up MinIO objects per bucket for policy in policies: try: result = cleanup_bucket( minio_client, policy, batch_size=config.batch_size, now=now, ) results.append(result) await record_retention_run(pool, policy.bucket_name, result) except Exception: logger.exception("Retention cleanup failed for bucket %s", policy.bucket_name) empty = CleanupResult(bucket_name=policy.bucket_name) await record_retention_run( pool, policy.bucket_name, empty, status="failed", error_message="See logs", ) results.append(empty) # Clean up expired DB records using the shortest raw retention period min_retention = min(p.retention_days for p in policies) try: db_deleted = await cleanup_expired_db_records(pool, min_retention, now=now) if db_deleted > 0: logger.info("Total DB rows cleaned: %d", db_deleted) except Exception: logger.exception("DB retention cleanup failed") return results