From 6bab19915923d97b6ddc361b3fe31422589ccd0e Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Thu, 16 Apr 2026 14:32:24 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20trend=5Fwindows=20now=20upserts=20instea?= =?UTF-8?q?d=20of=20accumulating=20(7.5GB=E2=86=924MB),=20add=20competitiv?= =?UTF-8?q?e=20signal=20retention=20cleanup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../023_trend_windows_upsert_and_cleanup.sql | 23 +++++++++++++++ services/aggregation/worker.py | 12 ++++++++ services/scheduler/app.py | 29 +++++++++++++++++++ 3 files changed, 64 insertions(+) create mode 100644 infra/migrations/023_trend_windows_upsert_and_cleanup.sql diff --git a/infra/migrations/023_trend_windows_upsert_and_cleanup.sql b/infra/migrations/023_trend_windows_upsert_and_cleanup.sql new file mode 100644 index 0000000..8f3d6e2 --- /dev/null +++ b/infra/migrations/023_trend_windows_upsert_and_cleanup.sql @@ -0,0 +1,23 @@ +-- Fix trend_windows to upsert instead of accumulating rows. +-- Add unique constraint so ON CONFLICT works, then deduplicate existing data. + +-- Step 1: Keep only the most recent row per (entity_type, entity_id, window) +DELETE FROM trend_windows +WHERE id NOT IN ( + SELECT DISTINCT ON (entity_type, entity_id, "window") id + FROM trend_windows + ORDER BY entity_type, entity_id, "window", generated_at DESC +); + +-- Step 2: Add unique constraint for upsert +CREATE UNIQUE INDEX IF NOT EXISTS idx_trend_windows_entity_window + ON trend_windows (entity_type, entity_id, "window"); + +-- Step 3: Clean up old competitive signal records (keep last 30 days) +DELETE FROM competitive_signal_records +WHERE computed_at < NOW() - INTERVAL '30 days'; + +-- Step 4: Add a partial index to speed up the NOT EXISTS check in the +-- aggregation propagation query +CREATE INDEX IF NOT EXISTS idx_competitive_signals_source_doc_ticker + ON competitive_signal_records (source_document_id, source_ticker); diff --git a/services/aggregation/worker.py b/services/aggregation/worker.py index 7f9aaa9..a648cd5 100644 --- a/services/aggregation/worker.py +++ b/services/aggregation/worker.py @@ -716,6 +716,18 @@ INSERT INTO trend_windows ( $9::jsonb, $10::jsonb, $11, $12::jsonb, $13::jsonb, $14 ) +ON CONFLICT (entity_type, entity_id, "window") DO UPDATE SET + trend_direction = EXCLUDED.trend_direction, + trend_strength = EXCLUDED.trend_strength, + confidence = EXCLUDED.confidence, + top_supporting_evidence = EXCLUDED.top_supporting_evidence, + top_opposing_evidence = EXCLUDED.top_opposing_evidence, + dominant_catalysts = EXCLUDED.dominant_catalysts, + material_risks = EXCLUDED.material_risks, + contradiction_score = EXCLUDED.contradiction_score, + disagreement_details = EXCLUDED.disagreement_details, + market_context = EXCLUDED.market_context, + generated_at = EXCLUDED.generated_at RETURNING id """ diff --git a/services/scheduler/app.py b/services/scheduler/app.py index 792f92e..c1e8668 100644 --- a/services/scheduler/app.py +++ b/services/scheduler/app.py @@ -475,6 +475,7 @@ async def main() -> None: logger.info("Scheduler started (tick=%ds)", SCHEDULER_TICK) recovery_counter = 0 + cleanup_counter = 0 try: while True: try: @@ -486,6 +487,11 @@ async def main() -> None: if recovery_counter >= 20: recovery_counter = 0 await recover_stale_documents(pool, rds) + # Run signal cleanup periodically (~25 minutes) + cleanup_counter += 1 + if cleanup_counter >= CLEANUP_CYCLE_INTERVAL: + cleanup_counter = 0 + await cleanup_old_signals(pool) finally: await release_lock(rds, "scheduler_cycle") except Exception: @@ -542,5 +548,28 @@ async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> in return enqueued +# How often to run competitive signal cleanup (every ~100 cycles = ~25 minutes) +CLEANUP_CYCLE_INTERVAL: int = 100 +# Keep competitive signals for this many days +COMPETITIVE_SIGNAL_RETENTION_DAYS: int = 30 + + +async def cleanup_old_signals(pool: asyncpg.Pool) -> int: + """Delete competitive signal records older than the retention window. + + Prevents the competitive_signal_records table from growing unbounded. + Returns the number of rows deleted. + """ + result = await pool.execute( + "DELETE FROM competitive_signal_records WHERE computed_at < NOW() - INTERVAL '1 day' * $1", + COMPETITIVE_SIGNAL_RETENTION_DAYS, + ) + # result is like "DELETE 1234" + count = int(result.split()[-1]) if result else 0 + if count > 0: + logger.info("Cleaned up %d old competitive signal records", count) + return count + + if __name__ == "__main__": asyncio.run(main())