From 6f54fd07fa4d856775cc9346d6bfaf30e22a6748 Mon Sep 17 00:00:00 2001 From: Celes Renata Date: Wed, 29 Apr 2026 18:27:49 +0000 Subject: [PATCH] feat: periodic aggregation every 15 minutes during market hours The aggregation engine only ran when new documents were ingested, leaving intraday trend data stale for long periods. Now the scheduler enqueues all 50 tickers for re-aggregation every ~15 minutes during US market hours (Mon-Fri, 6:30 AM - 1:30 PM PT). This ensures continuous intraday trend updates based on existing signals and market price changes. --- services/scheduler/app.py | 47 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/services/scheduler/app.py b/services/scheduler/app.py index f958e80..919f0b6 100644 --- a/services/scheduler/app.py +++ b/services/scheduler/app.py @@ -21,6 +21,7 @@ from services.shared.db import get_pg_pool, get_redis from services.shared.logging import setup_logging from services.shared.redis_keys import ( PIPELINE_ENABLED_KEY, + QUEUE_AGGREGATION, QUEUE_EXTRACTION, QUEUE_INGESTION, QUEUE_MACRO_CLASSIFICATION, @@ -82,6 +83,10 @@ MAX_RETRY_COUNT: int = 10 # Main loop interval (seconds) SCHEDULER_TICK: int = 15 +# Periodic aggregation: re-aggregate all tickers every N cycles during market hours +# 15s tick × 60 cycles = 15 minutes +AGGREGATION_CYCLE_INTERVAL: int = 60 + def get_cadence_for_source(source_type: str, config: Optional[dict[str, Any]]) -> int: """Return the polling interval for a source. @@ -493,6 +498,42 @@ async def schedule_cycle(pool: asyncpg.Pool, rds: aioredis.Redis) -> int: return enqueued +async def enqueue_periodic_aggregation(pool: asyncpg.Pool, rds: aioredis.Redis) -> int: + """Enqueue aggregation jobs for all active tickers. + + Runs periodically during market hours to ensure trend data stays fresh + even when no new documents are being ingested. This gives the intraday + and 1d windows continuous updates based on existing signals and market + price changes. Returns the number of tickers enqueued (0 when skipped).
+ """ + # Only run during US market hours (Mon-Fri, 9:30 AM - 4:30 PM ET / 6:30 AM - 1:30 PM PT) + from datetime import datetime + from zoneinfo import ZoneInfo + now = datetime.now(ZoneInfo("America/New_York")) # Exchange-local time, so DST shifts are followed + hour_et = now.hour + now.minute / 60.0 + + if now.weekday() >= 5: # Weekend (0=Mon, 6=Sun) + return 0 + if hour_et < 9.5 or hour_et > 16.5: # Outside market hours (with 30min buffer) + return 0 + + # Fetch all active tickers + rows = await pool.fetch( + "SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker" + ) + if not rows: + return 0 + + agg_queue = queue_key(QUEUE_AGGREGATION) + # One variadic RPUSH: a single round trip instead of one per ticker, same queue order + payloads = [json.dumps({"ticker": row["ticker"]}) for row in rows] + await rds.rpush(agg_queue, *payloads) + count = len(payloads) + + logger.info("Periodic aggregation: enqueued %d tickers for re-aggregation", count) + return count + + async def main() -> None: config = load_config() setup_logging("scheduler", level=config.log_level, json_output=config.json_logs) @@ -513,6 +554,7 @@ async def main() -> None: recovery_counter = 0 retry_counter = 0 cleanup_counter = 0 + aggregation_counter = 0 try: while True: try: @@ -540,6 +582,11 @@ async def main() -> None: if cleanup_counter >= CLEANUP_CYCLE_INTERVAL: cleanup_counter = 0 await cleanup_all_tables(pool) + # Periodic aggregation during market hours (~15 minutes) + aggregation_counter += 1 + if aggregation_counter >= AGGREGATION_CYCLE_INTERVAL: + aggregation_counter = 0 + await enqueue_periodic_aggregation(pool, rds) finally: await release_lock(rds, "scheduler_cycle") except Exception: