feat: periodic aggregation every 15 minutes during market hours
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build-2 Pipeline was successful
ci/woodpecker/push/build-3 Pipeline was successful
ci/woodpecker/push/build-1 Pipeline was successful
ci/woodpecker/push/finalize Pipeline was successful
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.adapters.broker_adapter name:broker-adapter]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.aggregation.worker name:aggregation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.extractor.worker name:extractor]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.ingestion.worker name:ingestion]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.lake_publisher.worker name:lake-publisher]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.parser.worker name:parser]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.recommendation.worker name:recommendation]) (push) Has been cancelled
Build and Push / build-services (map[cmd:python -m services.scheduler.app name:scheduler]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.api.app:app --host 0.0.0.0 --port 8000 name:query-api]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.risk.app:app --host 0.0.0.0 --port 8000 name:risk]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.symbol_registry.app:app --host 0.0.0.0 --port 8000 name:symbol-registry]) (push) Has been cancelled
Build and Push / build-services (map[cmd:uvicorn services.trading.app:app --host 0.0.0.0 --port 8000 name:trading-engine]) (push) Has been cancelled
Build and Push / build-dashboard (push) Has been cancelled
Build and Push / build-superset (push) Has been cancelled
Build and Push / integration-test (push) Has been cancelled
Build and Push / beta-gate (push) Has been cancelled
The aggregation engine only ran when new documents were ingested, leaving intraday trend data stale for long periods. Now the scheduler enqueues all 50 tickers for re-aggregation every ~15 minutes during US market hours (Mon-Fri, 6:30 AM - 1:30 PM PT). This ensures continuous intraday trend updates based on existing signals and market price changes.
This commit is contained in:
@@ -21,6 +21,7 @@ from services.shared.db import get_pg_pool, get_redis
|
||||
from services.shared.logging import setup_logging
|
||||
from services.shared.redis_keys import (
|
||||
PIPELINE_ENABLED_KEY,
|
||||
QUEUE_AGGREGATION,
|
||||
QUEUE_EXTRACTION,
|
||||
QUEUE_INGESTION,
|
||||
QUEUE_MACRO_CLASSIFICATION,
|
||||
@@ -82,6 +83,10 @@ MAX_RETRY_COUNT: int = 10
|
||||
# Main loop interval (seconds)
|
||||
SCHEDULER_TICK: int = 15
|
||||
|
||||
# Periodic aggregation: re-aggregate all tickers every N cycles during market hours
|
||||
# 15s tick × 60 cycles = 15 minutes
|
||||
AGGREGATION_CYCLE_INTERVAL: int = 60
|
||||
|
||||
|
||||
def get_cadence_for_source(source_type: str, config: Optional[dict[str, Any]]) -> int:
|
||||
"""Return the polling interval for a source.
|
||||
@@ -493,6 +498,42 @@ async def schedule_cycle(pool: asyncpg.Pool, rds: aioredis.Redis) -> int:
|
||||
return enqueued
|
||||
|
||||
|
||||
async def enqueue_periodic_aggregation(pool: asyncpg.Pool, rds: aioredis.Redis) -> int:
|
||||
"""Enqueue aggregation jobs for all active tickers.
|
||||
|
||||
Runs periodically during market hours to ensure trend data stays fresh
|
||||
even when no new documents are being ingested. This gives the intraday
|
||||
and 1d windows continuous updates based on existing signals and market
|
||||
price changes.
|
||||
"""
|
||||
# Only run during US market hours (Mon-Fri, 6:30 AM - 1:30 PM PT / 13:30-20:30 UTC)
|
||||
from datetime import datetime, timezone
|
||||
now = datetime.now(timezone.utc)
|
||||
weekday = now.weekday() # 0=Mon, 6=Sun
|
||||
hour_utc = now.hour + now.minute / 60.0
|
||||
|
||||
if weekday >= 5: # Weekend
|
||||
return 0
|
||||
if hour_utc < 13.5 or hour_utc > 20.5: # Outside market hours (with 30min buffer)
|
||||
return 0
|
||||
|
||||
# Fetch all active tickers
|
||||
rows = await pool.fetch(
|
||||
"SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker"
|
||||
)
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
agg_queue = queue_key(QUEUE_AGGREGATION)
|
||||
count = 0
|
||||
for row in rows:
|
||||
await rds.rpush(agg_queue, json.dumps({"ticker": row["ticker"]}))
|
||||
count += 1
|
||||
|
||||
logger.info("Periodic aggregation: enqueued %d tickers for re-aggregation", count)
|
||||
return count
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
config = load_config()
|
||||
setup_logging("scheduler", level=config.log_level, json_output=config.json_logs)
|
||||
@@ -513,6 +554,7 @@ async def main() -> None:
|
||||
recovery_counter = 0
|
||||
retry_counter = 0
|
||||
cleanup_counter = 0
|
||||
aggregation_counter = 0
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
@@ -540,6 +582,11 @@ async def main() -> None:
|
||||
if cleanup_counter >= CLEANUP_CYCLE_INTERVAL:
|
||||
cleanup_counter = 0
|
||||
await cleanup_all_tables(pool)
|
||||
# Periodic aggregation during market hours (~15 minutes)
|
||||
aggregation_counter += 1
|
||||
if aggregation_counter >= AGGREGATION_CYCLE_INTERVAL:
|
||||
aggregation_counter = 0
|
||||
await enqueue_periodic_aggregation(pool, rds)
|
||||
finally:
|
||||
await release_lock(rds, "scheduler_cycle")
|
||||
except Exception:
|
||||
|
||||
Reference in New Issue
Block a user