fix: dedup recommendation queue at aggregation level — prevent duplicate ticker+window flooding

This commit is contained in:
Celes Renata
2026-04-17 22:11:59 +00:00
parent a6189fce6b
commit 82da8af02b
+10 -1
View File
@@ -177,9 +177,15 @@ async def main() -> None:
ticker, ticker,
) )
# Enqueue recommendation job for each window that produced a trend # Enqueue recommendation job for each window that produced a trend.
# Use Redis dedup to avoid flooding the recommendation queue
# with duplicate ticker+window jobs that haven't been processed yet.
for summary in summaries: for summary in summaries:
if summary.trend_strength > 0: if summary.trend_strength > 0:
dedup_key = f"stonks:rec_dedup:{ticker}:{summary.window.value}"
already = await redis_client.get(dedup_key)
if already:
continue
await redis_client.rpush( await redis_client.rpush(
rec_queue, rec_queue,
json.dumps(inject_trace_context({ json.dumps(inject_trace_context({
@@ -187,6 +193,9 @@ async def main() -> None:
"window": summary.window.value, "window": summary.window.value,
})), })),
) )
# Mark as enqueued for 5 minutes — enough time for
# the recommendation worker to process it.
await redis_client.set(dedup_key, "1", ex=300)
except Exception: except Exception:
logger.exception("Aggregation failed for %s", ticker) logger.exception("Aggregation failed for %s", ticker)
finally: finally: