From 82da8af02b030bee5924c498a0ee0bc893b6a956 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Fri, 17 Apr 2026 22:11:59 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20dedup=20recommendation=20queue=20at=20ag?= =?UTF-8?q?gregation=20level=20=E2=80=94=20prevent=20duplicate=20ticker+wi?= =?UTF-8?q?ndow=20flooding?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/aggregation/main.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/services/aggregation/main.py b/services/aggregation/main.py index f1e564c..0fd2e7f 100644 --- a/services/aggregation/main.py +++ b/services/aggregation/main.py @@ -177,9 +177,15 @@ async def main() -> None: ticker, ) - # Enqueue recommendation job for each window that produced a trend + # Enqueue recommendation job for each window that produced a trend. + # Use Redis dedup to avoid flooding the recommendation queue + # with duplicate ticker+window jobs that haven't been processed yet. for summary in summaries: if summary.trend_strength > 0: + dedup_key = f"stonks:rec_dedup:{ticker}:{summary.window.value}" + already = await redis_client.get(dedup_key) + if already: + continue await redis_client.rpush( rec_queue, json.dumps(inject_trace_context({ @@ -187,6 +193,9 @@ async def main() -> None: "window": summary.window.value, })), ) + # Mark as enqueued for 5 minutes — enough time for + # the recommendation worker to process it. + await redis_client.set(dedup_key, "1", ex=300) except Exception: logger.exception("Aggregation failed for %s", ticker) finally: