fix: trend_windows now upserts instead of accumulating (7.5GB→4MB), add competitive signal retention cleanup
This commit is contained in:
@@ -0,0 +1,23 @@
|
|||||||
|
-- Fix trend_windows to upsert instead of accumulating rows.
|
||||||
|
-- Add unique constraint so ON CONFLICT works, then deduplicate existing data.
|
||||||
|
|
||||||
|
-- Step 1: Keep only the most recent row per (entity_type, entity_id, window)
|
||||||
|
DELETE FROM trend_windows
|
||||||
|
WHERE id NOT IN (
|
||||||
|
SELECT DISTINCT ON (entity_type, entity_id, "window") id
|
||||||
|
FROM trend_windows
|
||||||
|
ORDER BY entity_type, entity_id, "window", generated_at DESC
|
||||||
|
);
|
||||||
|
|
||||||
|
-- Step 2: Add unique constraint for upsert
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_trend_windows_entity_window
|
||||||
|
ON trend_windows (entity_type, entity_id, "window");
|
||||||
|
|
||||||
|
-- Step 3: Clean up old competitive signal records (keep last 30 days)
|
||||||
|
DELETE FROM competitive_signal_records
|
||||||
|
WHERE computed_at < NOW() - INTERVAL '30 days';
|
||||||
|
|
||||||
|
-- Step 4: Add a partial index to speed up the NOT EXISTS check in the
|
||||||
|
-- aggregation propagation query
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_competitive_signals_source_doc_ticker
|
||||||
|
ON competitive_signal_records (source_document_id, source_ticker);
|
||||||
@@ -716,6 +716,18 @@ INSERT INTO trend_windows (
|
|||||||
$9::jsonb, $10::jsonb, $11,
|
$9::jsonb, $10::jsonb, $11,
|
||||||
$12::jsonb, $13::jsonb, $14
|
$12::jsonb, $13::jsonb, $14
|
||||||
)
|
)
|
||||||
|
ON CONFLICT (entity_type, entity_id, "window") DO UPDATE SET
|
||||||
|
trend_direction = EXCLUDED.trend_direction,
|
||||||
|
trend_strength = EXCLUDED.trend_strength,
|
||||||
|
confidence = EXCLUDED.confidence,
|
||||||
|
top_supporting_evidence = EXCLUDED.top_supporting_evidence,
|
||||||
|
top_opposing_evidence = EXCLUDED.top_opposing_evidence,
|
||||||
|
dominant_catalysts = EXCLUDED.dominant_catalysts,
|
||||||
|
material_risks = EXCLUDED.material_risks,
|
||||||
|
contradiction_score = EXCLUDED.contradiction_score,
|
||||||
|
disagreement_details = EXCLUDED.disagreement_details,
|
||||||
|
market_context = EXCLUDED.market_context,
|
||||||
|
generated_at = EXCLUDED.generated_at
|
||||||
RETURNING id
|
RETURNING id
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|||||||
@@ -475,6 +475,7 @@ async def main() -> None:
|
|||||||
|
|
||||||
logger.info("Scheduler started (tick=%ds)", SCHEDULER_TICK)
|
logger.info("Scheduler started (tick=%ds)", SCHEDULER_TICK)
|
||||||
recovery_counter = 0
|
recovery_counter = 0
|
||||||
|
cleanup_counter = 0
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@@ -486,6 +487,11 @@ async def main() -> None:
|
|||||||
if recovery_counter >= 20:
|
if recovery_counter >= 20:
|
||||||
recovery_counter = 0
|
recovery_counter = 0
|
||||||
await recover_stale_documents(pool, rds)
|
await recover_stale_documents(pool, rds)
|
||||||
|
# Run signal cleanup periodically (~25 minutes)
|
||||||
|
cleanup_counter += 1
|
||||||
|
if cleanup_counter >= CLEANUP_CYCLE_INTERVAL:
|
||||||
|
cleanup_counter = 0
|
||||||
|
await cleanup_old_signals(pool)
|
||||||
finally:
|
finally:
|
||||||
await release_lock(rds, "scheduler_cycle")
|
await release_lock(rds, "scheduler_cycle")
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -542,5 +548,28 @@ async def recover_stale_documents(pool: asyncpg.Pool, rds: aioredis.Redis) -> in
|
|||||||
return enqueued
|
return enqueued
|
||||||
|
|
||||||
|
|
||||||
|
# How often to run competitive signal cleanup (every ~100 cycles = ~25 minutes)
|
||||||
|
CLEANUP_CYCLE_INTERVAL: int = 100
|
||||||
|
# Keep competitive signals for this many days
|
||||||
|
COMPETITIVE_SIGNAL_RETENTION_DAYS: int = 30
|
||||||
|
|
||||||
|
|
||||||
|
async def cleanup_old_signals(pool: asyncpg.Pool) -> int:
|
||||||
|
"""Delete competitive signal records older than the retention window.
|
||||||
|
|
||||||
|
Prevents the competitive_signal_records table from growing unbounded.
|
||||||
|
Returns the number of rows deleted.
|
||||||
|
"""
|
||||||
|
result = await pool.execute(
|
||||||
|
"DELETE FROM competitive_signal_records WHERE computed_at < NOW() - INTERVAL '1 day' * $1",
|
||||||
|
COMPETITIVE_SIGNAL_RETENTION_DAYS,
|
||||||
|
)
|
||||||
|
# result is like "DELETE 1234"
|
||||||
|
count = int(result.split()[-1]) if result else 0
|
||||||
|
if count > 0:
|
||||||
|
logger.info("Cleaned up %d old competitive signal records", count)
|
||||||
|
return count
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
|||||||
Reference in New Issue
Block a user