307 lines
9.5 KiB
Python
307 lines
9.5 KiB
Python
"""Data retention and lifecycle controls for raw and derived artifacts.

Provides configurable per-bucket retention policies, expired object cleanup
from MinIO, and expired metadata cleanup from PostgreSQL.

Requirements: N3 (preserve source metadata, access policy, and retention policy)
Design ref: Section 5.2 (MinIO bucket layout), Section 10 (Reliability and Safety)
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import asyncpg
|
|
from minio import Minio
|
|
|
|
from services.shared.config import BUCKET_RETENTION_FIELDS, RetentionConfig
|
|
from services.shared.storage import ALL_BUCKETS
|
|
|
|
logger = logging.getLogger("retention")
|
|
|
|
|
|
@dataclass
class RetentionPolicy:
    """Effective retention settings for one storage bucket.

    Instances are built either from config defaults or from rows of the
    ``retention_policies`` table.
    """

    # Name of the bucket this policy applies to.
    bucket_name: str

    # Age in days after which an object in the bucket counts as expired.
    retention_days: int

    # Carried over from the database row when present; the cleanup code
    # visible in this module does not read it.
    archive_before_delete: bool = False
|
|
|
|
|
|
@dataclass
class CleanupResult:
    """Counters describing one cleanup pass over a single bucket.

    Every counter starts at zero so a freshly constructed instance can be
    used to represent a run that did nothing (or failed before doing work).
    """

    # Bucket the counters belong to.
    bucket_name: str

    # Number of objects considered by the run.
    objects_scanned: int = 0

    # Number of objects that were actually removed.
    objects_deleted: int = 0

    # Bytes reclaimed by the run.
    bytes_freed: int = 0

    # Rows removed from PostgreSQL as part of the run.
    db_rows_deleted: int = 0
|
|
|
|
|
|
def default_retention_days(bucket: str, config: RetentionConfig) -> int:
    """Look up the configured default retention (in days) for ``bucket``.

    The bucket name is mapped to a config attribute name through
    ``BUCKET_RETENTION_FIELDS``. Buckets without a mapping, and mapped
    attributes missing from ``config``, fall back to 365 days.
    """
    attr_name = BUCKET_RETENTION_FIELDS.get(bucket)
    if not attr_name:
        # Unknown bucket (or empty mapping): use the one-year fallback.
        return 365
    return getattr(config, attr_name, 365)
|
|
|
|
|
|
def resolve_policies(config: RetentionConfig) -> list[RetentionPolicy]:
    """Create one config-derived retention policy per entry in ``ALL_BUCKETS``.

    The returned list follows the iteration order of ``ALL_BUCKETS``; each
    policy keeps the default ``archive_before_delete`` value.
    """
    policies: list[RetentionPolicy] = []
    for name in ALL_BUCKETS:
        days = default_retention_days(name, config)
        policies.append(RetentionPolicy(bucket_name=name, retention_days=days))
    return policies
|
|
|
|
|
|
async def load_db_policies(pool: asyncpg.Pool) -> dict[str, RetentionPolicy]:
    """Fetch active retention policy overrides stored in PostgreSQL.

    Only rows with ``active = TRUE`` and ``artifact_class = 'default'`` are
    read. The result maps each bucket name to its policy; if several rows
    share a bucket name, the last one returned by the query wins.
    """
    query = """SELECT bucket_name, retention_days, archive_before_delete
               FROM retention_policies
               WHERE active = TRUE AND artifact_class = 'default'"""
    records = await pool.fetch(query)

    overrides: dict[str, RetentionPolicy] = {}
    for record in records:
        name = record["bucket_name"]
        overrides[name] = RetentionPolicy(
            bucket_name=name,
            retention_days=record["retention_days"],
            archive_before_delete=record["archive_before_delete"],
        )
    return overrides
|
|
|
|
|
|
def merge_policies(
    config_policies: list[RetentionPolicy],
    db_policies: dict[str, RetentionPolicy],
) -> list[RetentionPolicy]:
    """Overlay database overrides onto the config-derived policies.

    The output has one entry per config policy, in the same order. Whenever
    ``db_policies`` holds an entry for that bucket, the database policy
    replaces the config one. Database entries for buckets that are not in
    ``config_policies`` are ignored.
    """
    return [
        db_policies.get(default.bucket_name, default)
        for default in config_policies
    ]
|
|
|
|
|
|
def cutoff_date(retention_days: int, now: datetime | None = None) -> datetime:
    """Calculate the cutoff datetime. Objects older than this are expired.

    ``now`` is the reference time; when omitted, the current timezone-aware
    UTC time is used. The result is ``now`` minus ``retention_days`` days,
    so a retention of ``0`` yields the reference time itself.

    Raises ValueError when ``retention_days`` is negative: that would place
    the cutoff in the future and make every existing object and row look
    expired to the destructive cleanup steps that rely on this value.
    """
    if retention_days < 0:
        raise ValueError(
            f"retention_days must be non-negative, got {retention_days}"
        )
    # Explicit None check: only a missing reference time triggers the default.
    ref = datetime.now(timezone.utc) if now is None else now
    return ref - timedelta(days=retention_days)
|
|
|
|
|
|
def list_expired_objects(
    client: Minio,
    bucket: str,
    retention_days: int,
    batch_size: int = 1000,
    now: datetime | None = None,
) -> list[str]:
    """List object names in a bucket that are older than the retention cutoff.

    Uses each object's ``last_modified`` timestamp as reported by
    ``client.list_objects(bucket, recursive=True)``. Entries without a
    timestamp or without a name are skipped.

    Returns at most ``batch_size`` object names; a ``batch_size`` of zero or
    less returns an empty list without contacting MinIO.

    Listing is best-effort: any exception raised while iterating is logged
    and the names collected so far are returned.
    """
    cutoff = cutoff_date(retention_days, now)
    expired: list[str] = []

    # Without this guard the limit check below, which only runs after an
    # append, would still let one object through for batch_size <= 0.
    if batch_size <= 0:
        return expired

    try:
        for obj in client.list_objects(bucket, recursive=True):
            if not (obj.last_modified and obj.last_modified < cutoff):
                continue
            if not obj.object_name:
                continue
            expired.append(obj.object_name)
            if len(expired) >= batch_size:
                break
    except Exception:
        logger.exception("Error listing objects in bucket %s", bucket)

    return expired
|
|
|
|
|
|
def delete_expired_objects(
    client: Minio,
    bucket: str,
    object_names: list[str],
) -> int:
    """Remove the named objects from ``bucket`` one at a time.

    A failure on one object is logged as a warning (with traceback) and does
    not stop the remaining deletions.

    Returns how many ``remove_object`` calls completed without raising.
    """
    removed = 0
    for object_name in object_names:
        try:
            client.remove_object(bucket, object_name)
        except Exception:
            logger.warning("Failed to delete %s/%s", bucket, object_name, exc_info=True)
        else:
            removed += 1
    return removed
|
|
|
|
|
|
def cleanup_bucket(
    client: Minio,
    policy: RetentionPolicy,
    batch_size: int = 1000,
    now: datetime | None = None,
) -> CleanupResult:
    """Apply one retention policy to its bucket.

    Collects up to ``batch_size`` expired object names and deletes them in a
    single pass. In the returned CleanupResult, ``objects_scanned`` is the
    number of expired names found and ``objects_deleted`` the number of
    successful removals; the other counters are left at zero.
    """
    bucket = policy.bucket_name
    days = policy.retention_days
    outcome = CleanupResult(bucket_name=bucket)

    expired_names = list_expired_objects(
        client, bucket, days, batch_size=batch_size, now=now,
    )
    outcome.objects_scanned = len(expired_names)

    # Nothing to delete: note it at debug level and finish early.
    if not expired_names:
        logger.debug("Bucket %s: no expired objects (retention=%dd)", bucket, days)
        return outcome

    outcome.objects_deleted = delete_expired_objects(client, bucket, expired_names)
    logger.info(
        "Bucket %s: scanned=%d deleted=%d (retention=%dd)",
        bucket, outcome.objects_scanned, outcome.objects_deleted, days,
    )
    return outcome
|
|
|
|
|
|
# --- PostgreSQL metadata cleanup ---

# (table name, DELETE statement) pairs run during metadata cleanup. Each
# statement takes the cutoff timestamp as its only parameter ($1) and removes
# rows whose timestamp column (started_at / captured_at) is older than it.
DB_CLEANUP_QUERIES: list[tuple[str, str]] = [
    ("ingestion_runs", "DELETE FROM ingestion_runs WHERE started_at < $1"),
    ("market_snapshots", "DELETE FROM market_snapshots WHERE captured_at < $1"),
]
|
|
|
|
|
|
async def cleanup_expired_db_records(
    pool: asyncpg.Pool,
    retention_days: int,
    now: datetime | None = None,
) -> int:
    """Purge operational metadata rows older than the retention cutoff.

    Runs every statement in ``DB_CLEANUP_QUERIES`` on one pooled connection,
    passing the cutoff derived from ``retention_days`` and ``now``. A failure
    on one table is logged and the remaining tables are still processed.

    Returns the total number of rows deleted across all tables.
    """
    threshold = cutoff_date(retention_days, now)
    deleted_rows = 0

    async with pool.acquire() as connection:
        for table, statement in DB_CLEANUP_QUERIES:
            try:
                status_tag = await connection.execute(statement, threshold)
                if not status_tag:
                    continue
                # The status string looks like "DELETE N"; N is the row count.
                affected = int(status_tag.split()[-1])
                deleted_rows += affected
                if affected > 0:
                    logger.info("Cleaned %d expired rows from %s (cutoff=%s)",
                                affected, table, threshold.isoformat())
            except Exception:
                logger.exception("Error cleaning table %s", table)

    return deleted_rows
|
|
|
|
|
|
async def record_retention_run(
    pool: asyncpg.Pool,
    bucket_name: str,
    result: CleanupResult,
    status: str = "completed",
    error_message: str | None = None,
) -> None:
    """Insert one row describing a cleanup run into ``retention_runs``.

    The row stores the counters from ``result`` together with ``status`` and
    ``error_message``; ``completed_at`` is set by the database via ``NOW()``.
    """
    insert_sql = """INSERT INTO retention_runs
               (bucket_name, objects_scanned, objects_deleted, bytes_freed,
                db_rows_deleted, completed_at, status, error_message)
           VALUES ($1, $2, $3, $4, $5, NOW(), $6, $7)"""
    # Positional parameters, in $1..$7 order.
    params = (
        bucket_name,
        result.objects_scanned,
        result.objects_deleted,
        result.bytes_freed,
        result.db_rows_deleted,
        status,
        error_message,
    )
    await pool.execute(insert_sql, *params)
|
|
|
|
|
|
async def _record_run_safely(
    pool: asyncpg.Pool,
    bucket_name: str,
    result: CleanupResult,
    status: str = "completed",
    error_message: str | None = None,
) -> None:
    """Record a retention run, logging instead of raising if the insert fails."""
    try:
        await record_retention_run(
            pool, bucket_name, result,
            status=status, error_message=error_message,
        )
    except Exception:
        logger.exception("Could not record retention run for bucket %s", bucket_name)


async def run_retention_cleanup(
    minio_client: Minio,
    pool: asyncpg.Pool,
    config: RetentionConfig,
    now: datetime | None = None,
) -> list[CleanupResult]:
    """Run the full retention cleanup cycle.

    1. Resolve policies from config defaults + DB overrides
    2. Clean up expired MinIO objects per bucket
    3. Record each bucket run for observability
    4. Clean up expired PostgreSQL metadata

    A bucket whose cleanup raises is logged, recorded with status "failed",
    and represented by an all-zero CleanupResult. Recording is best-effort:
    a failure to write the run record is logged and neither changes the
    bucket's result nor stops the remaining buckets.

    Returns exactly one CleanupResult per policy processed, in policy order.
    """
    # Resolve policies; DB overrides are optional, so fall back to config.
    config_policies = resolve_policies(config)
    try:
        db_policies = await load_db_policies(pool)
    except Exception:
        logger.warning(
            "Could not load DB retention policies, using config defaults",
            exc_info=True,
        )
        db_policies = {}

    policies = merge_policies(config_policies, db_policies)
    results: list[CleanupResult] = []

    # Clean up MinIO objects per bucket. Only the cleanup itself is inside
    # the try, so a recording error can never be mistaken for a cleanup
    # failure or produce a second result for the same bucket.
    for policy in policies:
        status = "completed"
        error_message: str | None = None
        try:
            result = cleanup_bucket(
                minio_client, policy,
                batch_size=config.batch_size, now=now,
            )
        except Exception:
            logger.exception("Retention cleanup failed for bucket %s", policy.bucket_name)
            result = CleanupResult(bucket_name=policy.bucket_name)
            status = "failed"
            error_message = "See logs"

        results.append(result)
        await _record_run_safely(
            pool, policy.bucket_name, result,
            status=status, error_message=error_message,
        )

    # With no policies there is no retention period to derive a DB cutoff
    # from (min() of an empty sequence would raise), so skip the DB step.
    if not policies:
        logger.debug("No retention policies resolved; skipping DB cleanup")
        return results

    # Clean up expired DB records using the shortest retention period
    min_retention = min(p.retention_days for p in policies)
    try:
        db_deleted = await cleanup_expired_db_records(pool, min_retention, now=now)
        if db_deleted > 0:
            logger.info("Total DB rows cleaned: %d", db_deleted)
    except Exception:
        logger.exception("DB retention cleanup failed")

    return results
|