8c3c1aab43
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
Workers (ingestion, parser, extractor, aggregation, recommendation, broker, lake-publisher) now check the pipeline:enabled Redis flag on each loop iteration and sleep when disabled. The toggle endpoint flushes all pipeline queues on disable so queued jobs don't resume when workers eventually check. Broker/trading queues are excluded from flush to avoid dropping in-flight orders.
125 lines
4.6 KiB
Python
125 lines
4.6 KiB
Python
"""Recommendation worker entrypoint - polls Redis for recommendation jobs."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
|
|
import asyncpg
|
|
from minio import Minio
|
|
|
|
from services.recommendation.worker import generate_recommendation
|
|
from services.shared.agent_config import AgentConfigResolver
|
|
from services.shared.config import OllamaConfig, load_config
|
|
from services.shared.logging import setup_logging
|
|
from services.shared.redis_keys import QUEUE_RECOMMENDATION, is_pipeline_enabled, queue_key
|
|
|
|
# Module-level logger for this entrypoint; registered under the fixed name
# "recommendation_main" rather than the module's ``__name__``.
logger = logging.getLogger("recommendation_main")
|
|
|
|
|
|
def _parse_job(raw: str | bytes) -> tuple[str, str]:
    """Decode one queue payload into a ``(ticker, window)`` pair.

    Args:
        raw: The value popped from the Redis queue; expected to be a JSON
            object with a ``"ticker"`` key and an optional ``"window"`` key.

    Returns:
        The ticker and the window; the window defaults to ``"7d"`` when the
        job does not specify one.

    Raises:
        ValueError: If the payload is not valid JSON (``JSONDecodeError`` and
            ``UnicodeDecodeError`` are both ``ValueError`` subclasses), is not
            a JSON object, or carries no ticker.
    """
    job = json.loads(raw)
    if not isinstance(job, dict):
        raise ValueError(f"expected a JSON object, got {type(job).__name__}")
    ticker = job.get("ticker", "")
    if not ticker:
        raise ValueError("job has no ticker")
    return ticker, job.get("window", "7d")


def _build_ollama_config(base_url, resolved) -> "OllamaConfig":
    """Build the thesis-rewriter ``OllamaConfig`` from a resolved agent config.

    Args:
        base_url: Ollama base URL taken from the service configuration.
        resolved: The non-``None`` result of
            ``AgentConfigResolver.resolve("thesis-rewriter")``.

    Returns:
        An ``OllamaConfig`` carrying the resolved model name, timeout, retry
        count and token limit.
    """
    return OllamaConfig(
        base_url=base_url,
        model=resolved.model_name,
        timeout=resolved.timeout_seconds,
        max_retries=resolved.max_retries,
        max_tokens=resolved.max_tokens,
    )


async def main() -> None:
    """Run the recommendation worker loop.

    Loads configuration, sets up logging, opens the Postgres pool, the MinIO
    client and the Redis client, resolves the optional thesis-rewriter config,
    and then polls the recommendation queue forever. Each job is passed to
    ``generate_recommendation``; failures of a single job are logged and do
    not stop the worker. While the pipeline flag is disabled, or the queue is
    empty, the loop sleeps for one second between checks.

    The Postgres pool and the Redis client are closed when the loop exits for
    any reason, including failures during start-up after the pool was created.
    """
    import redis.asyncio as aioredis

    config = load_config()
    setup_logging("recommendation", level=config.log_level, json_output=config.json_logs)

    pool = await asyncpg.create_pool(dsn=config.postgres.dsn, min_size=2, max_size=8)
    redis_client = None
    try:
        # Everything after pool creation lives inside the try so that a
        # failure while building the other clients cannot leak the pool.
        minio_client = Minio(
            config.minio.endpoint,
            access_key=config.minio.access_key,
            secret_key=config.minio.secret_key,
            secure=config.minio.secure,
        )
        redis_client = aioredis.from_url(config.redis.url)
        queue = queue_key(QUEUE_RECOMMENDATION)

        # Resolve thesis rewriter config from DB. Any failure here only
        # disables thesis rewriting; the worker still starts.
        resolver = AgentConfigResolver(pool, ttl_seconds=60)
        ollama_config: OllamaConfig | None = None
        try:
            resolved = await resolver.resolve("thesis-rewriter")
            if resolved is not None:
                ollama_config = _build_ollama_config(config.ollama.base_url, resolved)
                logger.info(
                    "Thesis rewriter enabled: model=%s variant=%s",
                    resolved.model_name, resolved.variant_id,
                )
            else:
                logger.info("No DB config for thesis-rewriter — thesis rewriting disabled")
        except Exception:
            logger.warning(
                "Failed to resolve thesis-rewriter config — thesis rewriting disabled",
                exc_info=True,
            )

        logger.info("Recommendation worker started, polling %s", queue)

        refresh_counter = 0

        while True:
            # Honour the pipeline kill switch on every iteration.
            if not await is_pipeline_enabled(redis_client):
                await asyncio.sleep(1)
                continue

            raw = await redis_client.lpop(queue)
            if raw is None:
                await asyncio.sleep(1)
                continue

            # The job has already been removed from the queue, so a bad
            # payload must be dropped with a warning instead of being allowed
            # to raise out of the loop and take the whole worker down.
            try:
                ticker, window = _parse_job(raw)
            except ValueError as exc:
                logger.warning(
                    "Skipping malformed recommendation job %r: %s", raw[:200], exc,
                )
                continue

            logger.info("Processing recommendation job for %s/%s", ticker, window)

            # Refresh resolver every 50 jobs to pick up config changes
            refresh_counter += 1
            if refresh_counter % 50 == 0:
                try:
                    resolved = await resolver.resolve("thesis-rewriter")
                    if resolved is not None:
                        new_config = _build_ollama_config(config.ollama.base_url, resolved)
                        if ollama_config is None or new_config.model != ollama_config.model:
                            logger.info(
                                "Thesis rewriter config updated: model=%s",
                                resolved.model_name,
                            )
                        # Always adopt the fresh config so that non-model
                        # changes (timeout, retries, token limit) apply too.
                        ollama_config = new_config
                    elif ollama_config is not None:
                        logger.info("Thesis rewriter disabled — skipping LLM thesis rewrite")
                        ollama_config = None
                except Exception:
                    # Keep using the previous config when a refresh fails.
                    logger.warning("Failed to refresh thesis-rewriter config", exc_info=True)

            try:
                rec = await generate_recommendation(
                    pool, ticker, window,
                    minio_client=minio_client,
                    ollama_config=ollama_config,
                )
                if rec:
                    logger.info(
                        "Recommendation generated for %s: %s %s",
                        ticker, rec.action.value, rec.mode.value,
                    )
                else:
                    logger.info("No recommendation generated for %s (no trend data)", ticker)
            except Exception:
                # One failing job must not stop the worker.
                logger.exception("Recommendation failed for %s", ticker)
    finally:
        # Close Redis even if closing the pool raises.
        try:
            await pool.close()
        finally:
            if redis_client is not None:
                await redis_client.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: drive the async worker on a fresh event loop.
    asyncio.run(main())
|