199 lines
6.9 KiB
Python
199 lines
6.9 KiB
Python
"""Aggregation worker entrypoint — polls Redis for aggregation jobs.

For each job, the worker computes trend summaries for the job's ticker and
enqueues a recommendation job for every window with a positive trend
strength. When the competitive layer is enabled, it also triggers
competitive signal propagation for the ticker's competitors, so that
document intelligence about one company produces competitive signals for
related companies.

Requirements: 4.1, 5.1, 9.4
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
|
|
import asyncpg
|
|
import redis.asyncio as aioredis
|
|
|
|
from services.aggregation.signal_propagation import propagate_signals
|
|
from services.aggregation.worker import aggregate_company, fetch_competitive_enabled
|
|
from services.shared.config import CompetitiveConfig, load_config
|
|
from services.shared.logging import inject_trace_context, setup_logging
|
|
from services.shared.redis_keys import (
|
|
QUEUE_AGGREGATION,
|
|
QUEUE_RECOMMENDATION,
|
|
queue_key,
|
|
)
|
|
|
|
# Logger for this worker entrypoint, registered under the fixed name
# "aggregation_main" rather than this module's __name__.
logger = logging.getLogger("aggregation_main")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Query to fetch recent document intelligence records for a ticker.
|
|
# Used to trigger signal propagation after aggregation completes.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_RECENT_INTELLIGENCE_QUERY = """
|
|
SELECT
|
|
di.document_id,
|
|
dir.catalyst_type,
|
|
dir.impact_score
|
|
FROM document_impact_records dir
|
|
JOIN document_intelligence di ON di.id = dir.intelligence_id
|
|
JOIN documents d ON d.id = di.document_id
|
|
WHERE dir.ticker = $1
|
|
AND di.validation_status = 'valid'
|
|
AND d.status != 'rejected'
|
|
AND d.created_at >= NOW() - INTERVAL '1 hour'
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM competitive_signal_records csr
|
|
WHERE csr.source_document_id = di.document_id
|
|
AND csr.source_ticker = $1
|
|
)
|
|
ORDER BY d.published_at DESC
|
|
LIMIT 10
|
|
"""
|
|
|
|
|
|
# Track consecutive propagation failures for alerting (Requirement 9.4).
# The counter is module-level, so the streak carries across jobs and tickers
# for the life of the worker process. _trigger_signal_propagation increments
# it on each failed propagate_signals call and resets it to 0 on each success.
_propagation_consecutive_failures: int = 0
|
|
|
|
|
|
async def _trigger_signal_propagation(
    pool: asyncpg.Pool,
    ticker: str,
    competitive_config: CompetitiveConfig,
) -> int:
    """Propagate competitive signals from a ticker's recent, unpropagated documents.

    Runs ``_RECENT_INTELLIGENCE_QUERY`` for ``ticker``. Each returned impact
    row with a positive impact score is passed to ``propagate_signals``.

    A failed call is logged and bumps the module-wide consecutive-failure
    counter. Once that counter reaches
    ``competitive_config.propagation_failure_threshold``, a critical alert is
    logged and the remaining rows for this ticker are skipped. Any successful
    call resets the counter to zero.

    Returns:
        Total number of competitive signal records produced for this ticker.
    """
    global _propagation_consecutive_failures

    impact_rows = await pool.fetch(_RECENT_INTELLIGENCE_QUERY, ticker)
    if not impact_rows:
        return 0

    produced = 0
    for impact in impact_rows:
        doc_id = str(impact["document_id"])
        catalyst = impact["catalyst_type"] or "other"
        score = float(impact["impact_score"] or 0.0)

        # Rows without a positive impact have nothing to propagate.
        if score <= 0.0:
            continue

        try:
            produced += len(
                await propagate_signals(
                    pool=pool,
                    ticker=ticker,
                    catalyst_type=catalyst,
                    impact_score=score,
                    document_id=doc_id,
                    config=competitive_config,
                )
            )
        except Exception:
            _propagation_consecutive_failures += 1
            logger.exception(
                "Signal propagation failed for %s doc %s/%s",
                ticker, doc_id, catalyst,
            )
            streak = _propagation_consecutive_failures
            if streak >= competitive_config.propagation_failure_threshold:
                logger.critical(
                    "ALERT: Sustained signal propagation failures (%d consecutive). "
                    "Continuing with company-specific + macro signals only. "
                    "Operator action required.",
                    streak,
                )
                # Give up on the rest of this ticker's rows once the
                # failure streak crosses the threshold.
                break
        else:
            # A success ends the failure streak.
            _propagation_consecutive_failures = 0

    return produced
|
|
|
|
|
|
def _parse_ticker(raw: bytes | str) -> str | None:
    """Extract the ticker from a raw aggregation job payload.

    Returns ``None`` (after logging) for payloads that are not valid JSON, are
    not a JSON object, or lack a non-empty string ``ticker``. Dropping such a
    payload keeps one poison message from raising out of the poll loop and
    terminating the whole worker.
    """
    try:
        job = json.loads(raw)
    except (TypeError, ValueError):  # JSONDecodeError/UnicodeDecodeError are ValueErrors
        logger.error("Dropping malformed aggregation job: %r", raw)
        return None
    if not isinstance(job, dict):
        logger.error("Dropping aggregation job that is not a JSON object: %r", raw)
        return None
    ticker = job.get("ticker", "")
    if not isinstance(ticker, str) or not ticker:
        logger.warning("Dropping aggregation job without a valid ticker: %r", raw)
        return None
    return ticker


async def _competitive_layer_enabled(
    pool: asyncpg.Pool,
    competitive_config: CompetitiveConfig,
) -> bool:
    """Return whether competitive propagation is on.

    A non-None DB toggle overrides the static config value, following the
    same pattern as the macro toggle.
    """
    db_toggle = await fetch_competitive_enabled(pool)
    if db_toggle is not None:
        return db_toggle
    return competitive_config.competitive_enabled


async def _process_ticker(
    pool: asyncpg.Pool,
    redis_client: aioredis.Redis,
    rec_queue,
    ticker: str,
    competitive_config: CompetitiveConfig,
) -> None:
    """Aggregate one ticker, propagate competitive signals, enqueue recommendations.

    Errors from ``aggregate_company`` or from pushing recommendation jobs
    propagate to the caller. Competitive propagation is best-effort: any
    failure there, including reading the DB toggle, is logged, and the
    recommendation jobs are still enqueued.
    """
    summaries = await aggregate_company(pool, ticker)
    logger.info(
        "Aggregation complete for %s: %d windows",
        ticker, len(summaries),
    )

    # Trigger competitive signal propagation after aggregation
    # (Requirement 4.1): when new document intelligence is produced for a
    # company, propagate signals to its competitors. The toggle lookup sits
    # inside this try so a DB error there cannot block recommendations.
    try:
        if await _competitive_layer_enabled(pool, competitive_config):
            sig_count = await _trigger_signal_propagation(
                pool, ticker, competitive_config,
            )
            if sig_count > 0:
                logger.info(
                    "Propagated %d competitive signals for %s",
                    sig_count, ticker,
                )
    except Exception:
        logger.exception(
            "Signal propagation failed for %s — "
            "continuing with company+macro signals only",
            ticker,
        )

    # Enqueue a recommendation job for each window that produced a trend.
    for summary in summaries:
        if summary.trend_strength > 0:
            await redis_client.rpush(
                rec_queue,
                json.dumps(inject_trace_context({
                    "ticker": ticker,
                    "window": summary.window.value,
                })),
            )


async def main() -> None:
    """Run the aggregation worker loop until the task is cancelled.

    Polls the aggregation queue in Redis once per second while it is empty.
    Each job is handled by ``_process_ticker``. A failure in one job is logged
    and the loop moves on to the next job. The Postgres pool and Redis client
    are closed on exit.
    """
    config = load_config()
    setup_logging("aggregation", level=config.log_level, json_output=config.json_logs)

    pool = await asyncpg.create_pool(dsn=config.postgres.dsn, min_size=2, max_size=8)
    redis_client = aioredis.from_url(config.redis.url)
    queue = queue_key(QUEUE_AGGREGATION)
    rec_queue = queue_key(QUEUE_RECOMMENDATION)
    competitive_config = config.competitive
    logger.info("Aggregation worker started, polling %s", queue)

    try:
        while True:
            raw = await redis_client.lpop(queue)
            if raw is None:
                await asyncio.sleep(1)
                continue

            ticker = _parse_ticker(raw)
            if ticker is None:
                continue

            logger.info("Processing aggregation job for %s", ticker)
            try:
                await _process_ticker(
                    pool, redis_client, rec_queue, ticker, competitive_config,
                )
            except Exception:
                logger.exception("Aggregation failed for %s", ticker)
    finally:
        await pool.close()
        # NOTE(review): redis-py >= 5.0.1 deprecates close() in favour of
        # aclose(); switch once the pinned version is confirmed.
        await redis_client.close()
|
|
|
|
|
|
# Script entry point: run the async worker loop on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
|