async def cleanup_all_tables(pool: asyncpg.Pool) -> None:
    """Run retention cleanup across all tables that grow unbounded.

    Called periodically by the scheduler loop (the original note says
    roughly every 25 minutes). Each table has its own retention window,
    expressed in days and compared against that table's timestamp column.

    The cleanup is best-effort: a failure on one table never prevents the
    remaining tables from being processed, and nothing is raised to the
    caller. Unlike a silent skip, every failure is logged so that a wrong
    column name, a missing table, or a database outage is visible.

    Args:
        pool: Connection pool used to execute the DELETE statements.
    """
    # (table, time_column, retention_days, description)
    # NOTE(review): "recommendations" is kept for 30 days while
    # "recommendation_evidence" is kept for 60. If evidence rows reference
    # recommendations through a non-cascading foreign key, the
    # recommendations delete will fail — confirm against the schema.
    cleanups = (
        ("competitive_signal_records", "computed_at", 30, "competitive signals"),
        ("ingestion_runs", "started_at", 14, "ingestion runs"),
        ("trading_decisions", "created_at", 90, "trading decisions"),
        ("risk_evaluations", "evaluated_at", 30, "risk evaluations"),
        ("audit_events", "created_at", 90, "audit events"),
        ("macro_impact_records", "computed_at", 60, "macro impact records"),
        ("recommendation_evidence", "created_at", 60, "recommendation evidence"),
        ("recommendations", "generated_at", 30, "old recommendations"),
        ("order_events", "created_at", 90, "order events"),
        ("model_performance_metrics", "recorded_at", 30, "model metrics"),
    )

    total = 0
    for table, col, days, desc in cleanups:
        try:
            # Identifiers cannot be bound as query parameters; table and col
            # come only from the hard-coded tuple above, never from input.
            status = await pool.execute(
                f"DELETE FROM {table} WHERE {col} < NOW() - INTERVAL '1 day' * $1",  # noqa: S608
                days,
            )
        except (asyncpg.UndefinedTableError, asyncpg.UndefinedColumnError):
            # Expected when the schema differs from the list above: skip the
            # table, but say so — otherwise it would grow unbounded unnoticed.
            logger.warning(
                "Skipping cleanup of %s: table %s or column %s does not exist",
                desc,
                table,
                col,
            )
            continue
        except Exception:
            # Best-effort boundary: record the failure with its traceback and
            # carry on with the remaining tables.
            logger.exception("Cleanup of %s (table %s) failed", desc, table)
            continue

        # The command status is a string such as "DELETE 42"; the trailing
        # token is the affected-row count. Anything else counts as zero.
        parts = status.split() if status else []
        count = int(parts[-1]) if parts and parts[-1].isdigit() else 0
        if count > 0:
            total += count
            logger.info("Cleaned %d %s (>%dd old)", count, desc, days)

    if total > 0:
        logger.info("Total cleanup: %d rows deleted across all tables", total)
if __name__ == "__main__":
    # Script entry point: run the scheduler's async main() to completion.
    asyncio.run(main())