# Source: stonks-oracle/services/aggregation/main.py (Python, 208 lines, 7.5 KiB)

"""Aggregation worker entrypoint - polls Redis for aggregation jobs.
After computing trend summaries for a ticker, the worker also triggers
competitive signal propagation for the ticker's competitors when the
competitive layer is enabled. This ensures that document intelligence
for one company produces competitive signals for related companies.
Requirements: 4.1, 5.1, 9.4
"""
from __future__ import annotations
import asyncio
import json
import logging
import asyncpg
import redis.asyncio as aioredis
from services.aggregation.signal_propagation import propagate_signals
from services.aggregation.worker import aggregate_company, fetch_competitive_enabled
from services.shared.config import CompetitiveConfig, load_config
from services.shared.logging import inject_trace_context, setup_logging
from services.shared.redis_keys import (
QUEUE_AGGREGATION,
QUEUE_RECOMMENDATION,
queue_key,
)
# Module-level logger. It uses the fixed name "aggregation_main" rather than
# __name__, so records carry that name however this module is imported or run.
logger = logging.getLogger("aggregation_main")
# ---------------------------------------------------------------------------
# Query to fetch recent document intelligence records for a ticker.
# Used to trigger signal propagation after aggregation completes.
#
# $1 is the source ticker. A row is returned per impact record when:
#   * the linked intelligence record has validation_status = 'valid',
#   * the document is not 'rejected' and was created within the last hour,
#   * no competitive_signal_records row already exists for the same
#     document and source ticker (so a document is not propagated twice).
# At most 10 rows are returned, most recently published first.
#
# NOTE(review): in PostgreSQL, ORDER BY ... DESC places NULLs first, so
# documents with a NULL published_at would take priority within the LIMIT.
# Confirm published_at is NOT NULL, or add NULLS LAST.
# ---------------------------------------------------------------------------
_RECENT_INTELLIGENCE_QUERY = """
SELECT
di.document_id,
dir.catalyst_type,
dir.impact_score
FROM document_impact_records dir
JOIN document_intelligence di ON di.id = dir.intelligence_id
JOIN documents d ON d.id = di.document_id
WHERE dir.ticker = $1
AND di.validation_status = 'valid'
AND d.status != 'rejected'
AND d.created_at >= NOW() - INTERVAL '1 hour'
AND NOT EXISTS (
SELECT 1 FROM competitive_signal_records csr
WHERE csr.source_document_id = di.document_id
AND csr.source_ticker = $1
)
ORDER BY d.published_at DESC
LIMIT 10
"""
# Track consecutive propagation failures for alerting (Requirement 9.4).
# _trigger_signal_propagation increments this each time propagate_signals
# raises and resets it to 0 after any successful call. Because it is a module
# global, the count carries over from one job (and ticker) to the next.
_propagation_consecutive_failures: int = 0
async def _trigger_signal_propagation(
    pool: asyncpg.Pool,
    ticker: str,
    competitive_config: CompetitiveConfig,
) -> int:
    """Propagate competitive signals for a ticker's recent documents.

    Runs ``_RECENT_INTELLIGENCE_QUERY`` for ``ticker`` and calls
    ``propagate_signals`` once per returned row whose impact score is
    greater than zero. A missing catalyst type is passed on as ``"other"``
    and a missing impact score is treated as ``0.0`` (and therefore skipped).

    A failing ``propagate_signals`` call is logged and counted in the
    module-level consecutive-failure counter; any successful call resets
    that counter. Once the counter reaches
    ``competitive_config.propagation_failure_threshold`` a critical alert
    is logged and the remaining rows for this ticker are abandoned.

    Args:
        pool: Connection pool used for the query and handed to
            ``propagate_signals``.
        ticker: Source ticker whose documents are propagated.
        competitive_config: Competitive-layer settings; forwarded to
            ``propagate_signals`` and read for the failure threshold.

    Returns:
        The total number of records returned by the successful
        ``propagate_signals`` calls.
    """
    global _propagation_consecutive_failures

    recent_docs = await pool.fetch(_RECENT_INTELLIGENCE_QUERY, ticker)
    produced = 0

    for doc in recent_docs or ():
        doc_id = str(doc["document_id"])
        catalyst = doc["catalyst_type"] or "other"
        score = float(doc["impact_score"] or 0.0)

        # Only documents with a positive impact score are propagated.
        if score <= 0.0:
            continue

        try:
            signal_records = await propagate_signals(
                pool=pool,
                ticker=ticker,
                catalyst_type=catalyst,
                impact_score=score,
                document_id=doc_id,
                config=competitive_config,
            )
            produced += len(signal_records)
            # Any success ends the current failure streak.
            _propagation_consecutive_failures = 0
        except Exception:
            _propagation_consecutive_failures += 1
            logger.exception(
                "Signal propagation failed for %s doc %s/%s",
                ticker, doc_id, catalyst,
            )
            if (
                _propagation_consecutive_failures
                < competitive_config.propagation_failure_threshold
            ):
                # Below the alert threshold: move on to the next document.
                continue
            logger.critical(
                "ALERT: Sustained signal propagation failures (%d consecutive). "
                "Continuing with company-specific + macro signals only. "
                "Operator action required.",
                _propagation_consecutive_failures,
            )
            # Threshold reached: give up on the rest of this ticker's rows.
            break

    return produced
async def main() -> None:
    """Run the aggregation worker: poll Redis and process jobs until stopped.

    Each job is a JSON object carrying a ``ticker``. For every job the worker:

    1. calls ``aggregate_company`` to compute the ticker's trend summaries;
    2. if the competitive layer is enabled (the config value, overridden by
       the database toggle whenever ``fetch_competitive_enabled`` returns a
       non-``None`` value), triggers competitive signal propagation;
    3. pushes a recommendation job for each summary whose ``trend_strength``
       is positive, skipping ticker/window pairs that still have a live
       dedup key in Redis.

    A payload that is not valid JSON, is not a JSON object, or has no
    non-empty string ticker is logged and discarded instead of terminating
    the worker. An exception raised while processing a job is logged and the
    loop moves on to the next job. The PostgreSQL pool and the Redis client
    are both closed when the loop exits.
    """
    config = load_config()
    setup_logging("aggregation", level=config.log_level, json_output=config.json_logs)
    pool = await asyncpg.create_pool(dsn=config.postgres.dsn, min_size=2, max_size=8)
    redis_client = aioredis.from_url(config.redis.url)
    queue = queue_key(QUEUE_AGGREGATION)
    rec_queue = queue_key(QUEUE_RECOMMENDATION)
    competitive_config = config.competitive
    logger.info("Aggregation worker started, polling %s", queue)
    try:
        while True:
            raw = await redis_client.lpop(queue)
            if raw is None:
                # Queue is empty: back off briefly before polling again.
                await asyncio.sleep(1)
                continue

            # A single bad payload must not take the whole worker down.
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            try:
                job = json.loads(raw)
            except ValueError:
                logger.exception("Discarding malformed aggregation job: %r", raw)
                continue
            ticker = job.get("ticker") if isinstance(job, dict) else None
            if not isinstance(ticker, str) or not ticker:
                logger.error("Discarding aggregation job without a ticker: %r", raw)
                continue

            logger.info("Processing aggregation job for %s", ticker)
            try:
                summaries = await aggregate_company(pool, ticker)
                logger.info(
                    "Aggregation complete for %s: %d windows",
                    ticker, len(summaries),
                )

                # Trigger competitive signal propagation after aggregation
                # (Requirement 4.1): When new document intelligence is
                # produced for a company, propagate signals to competitors.
                # Check toggle state from DB (same pattern as macro toggle).
                competitive_enabled = competitive_config.competitive_enabled
                db_toggle = await fetch_competitive_enabled(pool)
                if db_toggle is not None:
                    competitive_enabled = db_toggle

                if competitive_enabled:
                    # Propagation is best-effort: a failure here must not
                    # prevent recommendation jobs from being enqueued below.
                    try:
                        sig_count = await _trigger_signal_propagation(
                            pool, ticker, competitive_config,
                        )
                        if sig_count > 0:
                            logger.info(
                                "Propagated %d competitive signals for %s",
                                sig_count, ticker,
                            )
                    except Exception:
                        logger.exception(
                            "Signal propagation failed for %s; "
                            "continuing with company+macro signals only",
                            ticker,
                        )

                # Enqueue recommendation job for each window that produced a trend.
                # Use Redis dedup to avoid flooding the recommendation queue
                # with duplicate ticker+window jobs that haven't been processed yet.
                for summary in summaries:
                    if summary.trend_strength > 0:
                        dedup_key = f"stonks:rec_dedup:{ticker}:{summary.window.value}"
                        already = await redis_client.get(dedup_key)
                        if already:
                            continue
                        await redis_client.rpush(
                            rec_queue,
                            json.dumps(inject_trace_context({
                                "ticker": ticker,
                                "window": summary.window.value,
                            })),
                        )
                        # Mark as enqueued for 5 minutes — enough time for
                        # the recommendation worker to process it.
                        await redis_client.set(dedup_key, "1", ex=300)
            except Exception:
                logger.exception("Aggregation failed for %s", ticker)
    finally:
        # Close Redis even if closing the database pool raises.
        try:
            await pool.close()
        finally:
            await redis_client.close()
# Script entry point: run the worker loop only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())