Files
stonks-oracle/services/recommendation/main.py
T

72 lines
2.2 KiB
Python

"""Recommendation worker entrypoint - polls Redis for recommendation jobs."""
from __future__ import annotations
import asyncio
import json
import logging
import asyncpg
from minio import Minio
from services.recommendation.worker import generate_recommendation
from services.shared.config import load_config
from services.shared.logging import setup_logging
from services.shared.redis_keys import QUEUE_RECOMMENDATION, queue_key
logger = logging.getLogger("recommendation_main")
async def main() -> None:
config = load_config()
setup_logging("recommendation", level=config.log_level, json_output=config.json_logs)
pool = await asyncpg.create_pool(dsn=config.postgres.dsn, min_size=2, max_size=8)
minio_client = Minio(
config.minio.endpoint,
access_key=config.minio.access_key,
secret_key=config.minio.secret_key,
secure=config.minio.secure,
)
import redis.asyncio as aioredis
redis_client = aioredis.from_url(config.redis.url)
queue = queue_key(QUEUE_RECOMMENDATION)
logger.info("Recommendation worker started, polling %s", queue)
try:
while True:
raw = await redis_client.lpop(queue)
if raw is None:
await asyncio.sleep(1)
continue
payload = raw
job = json.loads(payload)
ticker = job.get("ticker", "")
window = job.get("window", "7d")
logger.info("Processing recommendation job for %s/%s", ticker, window)
try:
rec = await generate_recommendation(
pool, ticker, window,
minio_client=minio_client,
)
if rec:
logger.info(
"Recommendation generated for %s: %s %s",
ticker, rec.action.value, rec.mode.value,
)
else:
logger.info("No recommendation generated for %s (no trend data)", ticker)
except Exception:
logger.exception("Recommendation failed for %s", ticker)
finally:
await pool.close()
await redis_client.close()
if __name__ == "__main__":
asyncio.run(main())