Files
stonks-oracle/services/aggregation/main.py
T

58 lines
1.6 KiB
Python

"""Aggregation worker entrypoint - polls Redis for aggregation jobs."""
from __future__ import annotations
import asyncio
import json
import logging
import asyncpg
from services.aggregation.worker import aggregate_company
from services.shared.config import load_config
from services.shared.logging import setup_logging
from services.shared.redis_keys import QUEUE_AGGREGATION, queue_key
logger = logging.getLogger("aggregation_main")
async def main() -> None:
config = load_config()
setup_logging("aggregation", level=config.log_level, json_output=config.json_logs)
pool = await asyncpg.create_pool(dsn=config.postgres.dsn, min_size=2, max_size=8)
import redis.asyncio as aioredis
redis_client = aioredis.from_url(config.redis.url)
queue = queue_key(QUEUE_AGGREGATION)
logger.info("Aggregation worker started, polling %s", queue)
try:
while True:
raw = await redis_client.lpop(queue)
if raw is None:
await asyncio.sleep(1)
continue
payload = raw
job = json.loads(payload)
ticker = job.get("ticker", "")
logger.info("Processing aggregation job for %s", ticker)
try:
summaries = await aggregate_company(pool, ticker)
logger.info(
"Aggregation complete for %s: %d windows",
ticker, len(summaries),
)
except Exception:
logger.exception("Aggregation failed for %s", ticker)
finally:
await pool.close()
await redis_client.close()
if __name__ == "__main__":
asyncio.run(main())