diff --git a/services/aggregation/main.py b/services/aggregation/main.py index f1e564c..0fd2e7f 100644 --- a/services/aggregation/main.py +++ b/services/aggregation/main.py @@ -177,9 +177,15 @@ async def main() -> None: ticker, ) - # Enqueue recommendation job for each window that produced a trend + # Enqueue recommendation job for each window that produced a trend. + # Use Redis dedup to avoid flooding the recommendation queue + # with duplicate ticker+window jobs that haven't been processed yet. for summary in summaries: if summary.trend_strength > 0: + dedup_key = f"stonks:rec_dedup:{ticker}:{summary.window.value}" + already = await redis_client.get(dedup_key) + if already: + continue await redis_client.rpush( rec_queue, json.dumps(inject_trace_context({ @@ -187,6 +193,9 @@ async def main() -> None: "window": summary.window.value, })), ) + # Mark as enqueued for 5 minutes — enough time for + # the recommendation worker to process it. + await redis_client.set(dedup_key, "1", ex=300) except Exception: logger.exception("Aggregation failed for %s", ticker) finally: