diff --git a/services/scheduler/app.py b/services/scheduler/app.py index f958e80..919f0b6 100644 --- a/services/scheduler/app.py +++ b/services/scheduler/app.py @@ -21,6 +21,7 @@ from services.shared.db import get_pg_pool, get_redis from services.shared.logging import setup_logging from services.shared.redis_keys import ( PIPELINE_ENABLED_KEY, + QUEUE_AGGREGATION, QUEUE_EXTRACTION, QUEUE_INGESTION, QUEUE_MACRO_CLASSIFICATION, @@ -82,6 +83,10 @@ MAX_RETRY_COUNT: int = 10 # Main loop interval (seconds) SCHEDULER_TICK: int = 15 +# Periodic aggregation: re-aggregate all tickers every N cycles during market hours +# 15s tick × 60 cycles = 15 minutes +AGGREGATION_CYCLE_INTERVAL: int = 60 + def get_cadence_for_source(source_type: str, config: Optional[dict[str, Any]]) -> int: """Return the polling interval for a source. @@ -493,6 +498,42 @@ async def schedule_cycle(pool: asyncpg.Pool, rds: aioredis.Redis) -> int: return enqueued +async def enqueue_periodic_aggregation(pool: asyncpg.Pool, rds: aioredis.Redis) -> int: + """Enqueue aggregation jobs for all active tickers. + + Runs periodically during market hours to ensure trend data stays fresh + even when no new documents are being ingested. This gives the intraday + and 1d windows continuous updates based on existing signals and market + price changes. + """ + # Only run during US market hours (Mon-Fri, 6:30 AM - 1:30 PM PT / 13:30-20:30 UTC) + from datetime import datetime, timezone + now = datetime.now(timezone.utc) + weekday = now.weekday() # 0=Mon, 6=Sun + hour_utc = now.hour + now.minute / 60.0 + + if weekday >= 5: # Weekend + return 0 + if hour_utc < 13.5 or hour_utc > 20.5: # Outside market hours (with 30min buffer) + return 0 + + # Fetch all active tickers + rows = await pool.fetch( + "SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker" + ) + if not rows: + return 0 + + agg_queue = queue_key(QUEUE_AGGREGATION) + count = 0 + for row in rows: + await rds.rpush(agg_queue, json.dumps({"ticker": row["ticker"]})) + count += 1 + + logger.info("Periodic aggregation: enqueued %d tickers for re-aggregation", count) + return count + + async def main() -> None: config = load_config() setup_logging("scheduler", level=config.log_level, json_output=config.json_logs) @@ -513,6 +554,7 @@ async def main() -> None: recovery_counter = 0 retry_counter = 0 cleanup_counter = 0 + aggregation_counter = 0 try: while True: try: @@ -540,6 +582,11 @@ async def main() -> None: if cleanup_counter >= CLEANUP_CYCLE_INTERVAL: cleanup_counter = 0 await cleanup_all_tables(pool) + # Periodic aggregation during market hours (~15 minutes) + aggregation_counter += 1 + if aggregation_counter >= AGGREGATION_CYCLE_INTERVAL: + aggregation_counter = 0 + await enqueue_periodic_aggregation(pool, rds) finally: await release_lock(rds, "scheduler_cycle") except Exception: