diff --git a/services/aggregation/worker.py b/services/aggregation/worker.py index 39cab62..102c657 100644 --- a/services/aggregation/worker.py +++ b/services/aggregation/worker.py @@ -841,7 +841,11 @@ async def persist_trend_evidence( supporting: list[RankedEvidence], opposing: list[RankedEvidence], ) -> int: - """Insert evidence mapping rows for a trend window. Returns count inserted.""" + """Insert evidence mapping rows for a trend window. Returns count inserted. + + Deletes any existing evidence for this trend window first to prevent + duplicate accumulation across aggregation cycles. + """ rows: list[tuple[str, str, str, float, float, float, float, float, float]] = [] for ev in supporting: # Skip non-UUID document IDs (e.g. pattern signal synthetic IDs) @@ -861,6 +865,12 @@ async def persist_trend_evidence( ev.recency_component, ev.confidence_component, ev.sentiment_value, )) + # Clear stale evidence before inserting fresh rows + await pool.execute( + "DELETE FROM trend_evidence WHERE trend_window_id = $1", + trend_window_id, + ) + if not rows: return 0