173 lines
6.0 KiB
Python
173 lines
6.0 KiB
Python
"""Tests for data retention and lifecycle controls.

Validates retention policy resolution, expired object detection,
cleanup logic, and DB record cleanup.

Requirements: N3
"""
|
|
from datetime import datetime, timedelta, timezone
|
|
from unittest.mock import MagicMock
|
|
|
|
from services.shared.config import RetentionConfig
|
|
from services.shared.retention import (
|
|
RetentionPolicy,
|
|
cleanup_bucket,
|
|
cutoff_date,
|
|
default_retention_days,
|
|
delete_expired_objects,
|
|
list_expired_objects,
|
|
merge_policies,
|
|
resolve_policies,
|
|
)
|
|
|
|
|
|
class TestDefaultRetentionDays:
    """Checks the retention period that default_retention_days reports per bucket."""

    def test_known_buckets(self):
        """A default RetentionConfig yields the expected days for each known bucket."""
        cfg = RetentionConfig()
        expected_days = {
            "stonks-raw-market": 90,
            "stonks-raw-news": 180,
            "stonks-raw-filings": 365,
            "stonks-lakehouse": 730,
            "stonks-audit": 730,
        }
        for bucket, days in expected_days.items():
            assert default_retention_days(bucket, cfg) == days

    def test_unknown_bucket_defaults_to_365(self):
        """A bucket name that is not recognised resolves to 365 days."""
        days = default_retention_days("unknown-bucket", RetentionConfig())
        assert days == 365

    def test_custom_config_values(self):
        """Values passed to RetentionConfig replace the built-in defaults."""
        cfg = RetentionConfig(raw_market_days=30, audit_days=1000)
        overrides = (
            ("stonks-raw-market", 30),
            ("stonks-audit", 1000),
        )
        for bucket, days in overrides:
            assert default_retention_days(bucket, cfg) == days
|
|
|
|
|
|
class TestResolvePolicies:
    """Checks the list of policies that resolve_policies builds from a config."""

    def test_returns_policy_per_bucket(self):
        """The default config resolves to eight policies, including the known buckets."""
        resolved = resolve_policies(RetentionConfig())
        names = {policy.bucket_name for policy in resolved}
        assert "stonks-raw-market" in names
        assert "stonks-lakehouse" in names
        assert len(resolved) == 8

    def test_uses_config_values(self):
        """A customised config value is carried into the matching policy."""
        resolved = resolve_policies(RetentionConfig(raw_news_days=60))
        news_matches = [
            policy for policy in resolved if policy.bucket_name == "stonks-raw-news"
        ]
        # Only the first match is inspected.
        assert news_matches[0].retention_days == 60
|
|
|
|
|
|
class TestMergePolicies:
    """Checks how merge_policies combines config policies with DB overrides."""

    def test_db_overrides_config(self):
        """A DB policy wins for its bucket; other buckets keep the config value."""
        from_config = [
            RetentionPolicy("stonks-raw-market", 90),
            RetentionPolicy("stonks-raw-news", 180),
        ]
        from_db = {"stonks-raw-market": RetentionPolicy("stonks-raw-market", 30)}

        merged = merge_policies(from_config, from_db)

        def policy_for(bucket):
            # First merged policy whose bucket name matches.
            return next(p for p in merged if p.bucket_name == bucket)

        market_policy = policy_for("stonks-raw-market")
        news_policy = policy_for("stonks-raw-news")
        assert market_policy.retention_days == 30  # DB override
        assert news_policy.retention_days == 180  # config default

    def test_empty_db_uses_all_config(self):
        """With no DB overrides the config policies come back unchanged in count and value."""
        audit_policy = RetentionPolicy("stonks-audit", 730)
        merged = merge_policies([audit_policy], {})
        assert len(merged) == 1
        assert merged[0].retention_days == 730
|
|
|
|
|
|
class TestCutoffDate:
    """Checks cutoff_date with an explicit reference time and with the implicit clock."""

    def test_calculates_cutoff(self):
        """The cutoff is the supplied reference time minus the retention days."""
        now = datetime(2026, 4, 11, 12, 0, 0, tzinfo=timezone.utc)
        cutoff = cutoff_date(90, now)
        assert cutoff == now - timedelta(days=90)

    def test_uses_current_time_when_none(self):
        """Without a reference time the cutoff is 30 days before the current UTC time."""
        retention = timedelta(days=30)
        before = datetime.now(timezone.utc)
        cutoff = cutoff_date(30)
        after = datetime.now(timezone.utc)
        # A bare `cutoff < now` check would pass for any retention period,
        # so bracket the call with two clock reads to pin the cutoff to
        # "current time minus 30 days".
        assert before - retention <= cutoff <= after - retention
|
|
|
|
|
|
class TestListExpiredObjects:
    """Checks list_expired_objects against a mocked storage client."""

    def test_finds_expired_objects(self):
        """Only the object older than the 90-day window is reported."""
        reference = datetime(2026, 4, 11, 12, 0, 0, tzinfo=timezone.utc)
        stale = MagicMock(
            object_name="old/file.json",
            last_modified=reference - timedelta(days=100),
        )
        fresh = MagicMock(
            object_name="new/file.json",
            last_modified=reference - timedelta(days=10),
        )
        storage = MagicMock()
        storage.list_objects.return_value = [stale, fresh]

        result = list_expired_objects(storage, "stonks-raw-market", 90, now=reference)

        assert result == ["old/file.json"]

    def test_respects_batch_size(self):
        """With ten expired objects and batch_size=3, three names come back."""
        reference = datetime(2026, 4, 11, 12, 0, 0, tzinfo=timezone.utc)
        modified_at = reference - timedelta(days=200)
        storage = MagicMock()
        storage.list_objects.return_value = [
            MagicMock(object_name=f"file_{index}.json", last_modified=modified_at)
            for index in range(10)
        ]

        result = list_expired_objects(
            storage, "test-bucket", 90, batch_size=3, now=reference
        )

        assert len(result) == 3

    def test_handles_list_error(self):
        """A listing failure produces an empty result instead of propagating."""
        storage = MagicMock()
        storage.list_objects.side_effect = Exception("connection error")

        assert list_expired_objects(storage, "test-bucket", 90) == []
|
|
|
|
|
|
class TestDeleteExpiredObjects:
    """Checks the deletion count reported by delete_expired_objects."""

    def test_deletes_all(self):
        """Two names in, two removals issued, count of two returned."""
        storage = MagicMock()
        names = ["a.json", "b.json"]

        deleted = delete_expired_objects(storage, "test-bucket", names)

        assert deleted == 2
        assert storage.remove_object.call_count == 2

    def test_handles_partial_failure(self):
        """A failing removal in the middle is skipped; the other two are counted."""
        removal_outcomes = [None, Exception("fail"), None]
        storage = MagicMock()
        storage.remove_object.side_effect = removal_outcomes

        deleted = delete_expired_objects(storage, "test-bucket", ["a", "b", "c"])

        assert deleted == 2
|
|
|
|
|
|
class TestCleanupBucket:
    """Checks the result object that cleanup_bucket returns for a mocked client."""

    def test_full_cleanup_flow(self):
        """One expired object is scanned and deleted for the policy's bucket."""
        reference = datetime(2026, 4, 11, 12, 0, 0, tzinfo=timezone.utc)
        expired = MagicMock(
            object_name="expired.json",
            last_modified=reference - timedelta(days=200),
        )
        storage = MagicMock()
        storage.list_objects.return_value = [expired]
        market_policy = RetentionPolicy("stonks-raw-market", 90)

        outcome = cleanup_bucket(storage, market_policy, now=reference)

        assert outcome.bucket_name == "stonks-raw-market"
        assert outcome.objects_scanned == 1
        assert outcome.objects_deleted == 1

    def test_no_expired_objects(self):
        """An empty bucket listing yields zero scanned and zero deleted."""
        storage = MagicMock()
        storage.list_objects.return_value = []
        news_policy = RetentionPolicy("stonks-raw-news", 180)

        outcome = cleanup_bucket(storage, news_policy)

        assert outcome.objects_scanned == 0
        assert outcome.objects_deleted == 0
|