71 lines
2.2 KiB
Python
71 lines
2.2 KiB
Python
"""Aggregation worker entrypoint - polls Redis for aggregation jobs."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
|
|
import asyncpg
|
|
import redis.asyncio as aioredis
|
|
|
|
from services.aggregation.worker import aggregate_company
|
|
from services.shared.config import load_config
|
|
from services.shared.logging import inject_trace_context, setup_logging
|
|
from services.shared.redis_keys import (
|
|
QUEUE_AGGREGATION,
|
|
QUEUE_RECOMMENDATION,
|
|
queue_key,
|
|
)
|
|
|
|
logger = logging.getLogger("aggregation_main")
|
|
|
|
|
|
async def main() -> None:
|
|
config = load_config()
|
|
setup_logging("aggregation", level=config.log_level, json_output=config.json_logs)
|
|
|
|
pool = await asyncpg.create_pool(dsn=config.postgres.dsn, min_size=2, max_size=8)
|
|
redis_client = aioredis.from_url(config.redis.url)
|
|
queue = queue_key(QUEUE_AGGREGATION)
|
|
rec_queue = queue_key(QUEUE_RECOMMENDATION)
|
|
logger.info("Aggregation worker started, polling %s", queue)
|
|
|
|
try:
|
|
while True:
|
|
raw = await redis_client.lpop(queue)
|
|
if raw is None:
|
|
await asyncio.sleep(1)
|
|
continue
|
|
|
|
job = json.loads(raw)
|
|
ticker = job.get("ticker", "")
|
|
|
|
logger.info("Processing aggregation job for %s", ticker)
|
|
|
|
try:
|
|
summaries = await aggregate_company(pool, ticker)
|
|
logger.info(
|
|
"Aggregation complete for %s: %d windows",
|
|
ticker, len(summaries),
|
|
)
|
|
|
|
# Enqueue recommendation job for each window that produced a trend
|
|
for summary in summaries:
|
|
if summary.trend_strength > 0:
|
|
await redis_client.rpush(
|
|
rec_queue,
|
|
json.dumps(inject_trace_context({
|
|
"ticker": ticker,
|
|
"window": summary.window.value,
|
|
})),
|
|
)
|
|
except Exception:
|
|
logger.exception("Aggregation failed for %s", ticker)
|
|
finally:
|
|
await pool.close()
|
|
await redis_client.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|