"""Input Normalizer — fetches and assembles NormalizedInput for a single tick.

Queries multiple data sources (market snapshots, trend windows, earnings
calendar, macro impact records, position stop levels) and assembles them
into a single ``NormalizedInput`` consumed by both pipelines.

Missing data sources produce sentinel values (``None`` / empty list) with a
logged warning — the normalizer never crashes on unavailable data.

Requirements: 1.1, 1.2, 1.3, 1.4, 1.5
"""

from __future__ import annotations

import asyncio
import json
import logging
from datetime import datetime, timezone

import asyncpg

from .config import SignalEngineConfig
from .models import NormalizedInput, OHLCVBar, OpenPositionState

logger = logging.getLogger(__name__)

# Timeframes the signal engine evaluates, ordered shortest → longest.
TIMEFRAMES = ("M30", "H1", "H4", "D", "W", "M")

# ---------------------------------------------------------------------------
# Direction → numeric bias mapping (same semantics as aggregation worker)
# ---------------------------------------------------------------------------
_DIRECTION_TO_BIAS: dict[str, float] = {
    "positive": 1.0,
    "negative": -1.0,
    "mixed": 0.0,
    "neutral": 0.0,
}


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _validate_monotonic_timestamps(
    bars: list[OHLCVBar],
    timeframe: str,
    ticker: str,
) -> list[OHLCVBar]:
    """Return *bars* ordered by timestamp, warning on non-monotonic input.

    If timestamps are already strictly increasing (or there are fewer than
    two bars) the very same list object is returned.  Otherwise a warning is
    logged and a new, timestamp-sorted list is returned.  Bars sharing a
    timestamp count as non-monotonic; they are kept, not de-duplicated.
    """
    if len(bars) <= 1:
        return bars

    strictly_increasing = all(
        earlier.timestamp < later.timestamp
        for earlier, later in zip(bars, bars[1:])
    )
    if strictly_increasing:
        return bars

    logger.warning(
        "%s/%s: OHLCV timestamps not monotonically increasing — sorting",
        ticker,
        timeframe,
    )
    return sorted(bars, key=lambda b: b.timestamp)


def _polygon_bar_to_ohlcv(row: asyncpg.Record) -> OHLCVBar | None:
    """Convert a market_snapshots row (JSONB data column) to an OHLCVBar.

    Polygon bar format stored in ``data``:
        t — timestamp in epoch milliseconds
        o — open
        h — high
        l — low
        c — close
        v — volume

    asyncpg hands JSON/JSONB values back as text unless a type codec has
    been registered on the connection, so a ``str``/``bytes`` payload is
    decoded here before use.  Missing o/h/l/c/v keys default to ``0``.

    Returns ``None`` if the row cannot be parsed.
    """
    data = row["data"]

    if isinstance(data, (str, bytes, bytearray)):
        try:
            data = json.loads(data)
        except ValueError:  # JSONDecodeError / UnicodeDecodeError
            return None

    if not isinstance(data, dict):
        return None

    ts_ms = data.get("t")
    if ts_ms is None:
        return None

    try:
        return OHLCVBar(
            timestamp=datetime.fromtimestamp(int(ts_ms) / 1000, tz=timezone.utc),
            open=float(data.get("o", 0)),
            high=float(data.get("h", 0)),
            low=float(data.get("l", 0)),
            close=float(data.get("c", 0)),
            volume=float(data.get("v", 0)),
        )
    except (TypeError, ValueError, OverflowError, OSError):
        # fromtimestamp() may raise OverflowError or OSError for timestamps
        # outside the platform's supported range.
        return None


def _rows_to_bars(rows: list[asyncpg.Record]) -> list[OHLCVBar]:
    """Parse snapshot rows into bars, dropping rows that fail to parse."""
    parsed = (_polygon_bar_to_ohlcv(row) for row in rows)
    return [bar for bar in parsed if bar is not None]


# ---------------------------------------------------------------------------
# Data-source fetchers
# ---------------------------------------------------------------------------


async def _fetch_bars(
    pool: asyncpg.Pool,
    ticker: str,
) -> dict[str, list[OHLCVBar]]:
    """Fetch OHLCV bars from ``market_snapshots`` for all timeframes.

    The current database stores daily bars (``snapshot_type = 'bar'``) from
    Polygon.  Intraday bars are stored with ``snapshot_type = 'intraday_bar'``
    when available.

    For timeframes that have no dedicated data yet (H4, W, M) we derive them
    from daily bars where possible:

    - **W** (weekly): group daily bars by ISO week.
    - **M** (monthly): group daily bars by calendar month.
    - **H4 / H1 / M30**: sourced from intraday snapshots when present;
      otherwise left empty.

    A failed query is logged as a warning and leaves the affected timeframes
    empty.

    Returns a dict keyed by timeframe label; every label in ``TIMEFRAMES``
    is present, possibly mapped to an empty list.
    """
    bars: dict[str, list[OHLCVBar]] = {tf: [] for tf in TIMEFRAMES}

    # --- Daily bars --------------------------------------------------------
    try:
        rows = await pool.fetch(
            "SELECT data FROM market_snapshots "
            "WHERE ticker = $1 AND snapshot_type = 'bar' "
            "ORDER BY captured_at ASC",
            ticker,
        )
        bars["D"] = _rows_to_bars(rows)
    except Exception:
        logger.warning("%s: failed to fetch daily bars", ticker, exc_info=True)

    # --- Intraday bars (M30, H1) ------------------------------------------
    try:
        intraday_rows = await pool.fetch(
            "SELECT data FROM market_snapshots "
            "WHERE ticker = $1 AND snapshot_type = 'intraday_bar' "
            "ORDER BY captured_at ASC",
            ticker,
        )
        intraday = _rows_to_bars(intraday_rows)

        # Assign intraday bars to M30 and H1 buckets.
        # The actual timespan depends on the source config; we store them
        # under M30 (shortest) and duplicate to H1 for now.  When dedicated
        # H1 bars are ingested they will replace this.
        if intraday:
            bars["M30"] = intraday
            # Separate list object so mutating one timeframe's list can
            # never silently alter the other.
            bars["H1"] = list(intraday)
    except Exception:
        logger.warning("%s: failed to fetch intraday bars", ticker, exc_info=True)

    # --- Derive H4 from intraday (4-hour grouping) ------------------------
    # Left empty when no intraday data — sentinel value per Req 1.3.

    # --- Derive weekly / monthly bars from daily --------------------------
    if bars["D"]:
        bars["W"] = _aggregate_bars_by_period(bars["D"], period="week")
        bars["M"] = _aggregate_bars_by_period(bars["D"], period="month")

    return bars


def _aggregate_bars_by_period(
    daily_bars: list[OHLCVBar],
    period: str,
) -> list[OHLCVBar]:
    """Aggregate daily bars into weekly or monthly bars.

    Groups by ISO (year, week) when ``period == "week"``; any other value
    groups by calendar (year, month).  For each group:

    - ``timestamp`` / ``open`` come from the earliest bar,
    - ``close`` comes from the latest bar,
    - ``high`` / ``low`` are the extremes, ``volume`` is the sum.

    The input is ordered by timestamp before grouping, because open/close
    are picked by position and callers fetch rows ordered by capture time,
    which need not match bar time.  The input list itself is not modified.

    Returns the aggregated bars in chronological order.
    """
    ordered = sorted(daily_bars, key=lambda b: b.timestamp)

    # Plain dicts preserve insertion order, so groups come out chronological.
    groups: dict[tuple[int, int], list[OHLCVBar]] = {}
    for bar in ordered:
        if period == "week":
            iso = bar.timestamp.isocalendar()
            key = (iso[0], iso[1])  # (ISO year, ISO week)
        else:
            key = (bar.timestamp.year, bar.timestamp.month)
        groups.setdefault(key, []).append(bar)

    return [
        OHLCVBar(
            timestamp=members[0].timestamp,  # period open timestamp
            open=members[0].open,
            high=max(b.high for b in members),
            low=min(b.low for b in members),
            close=members[-1].close,
            volume=sum(b.volume for b in members),
        )
        for members in groups.values()
    ]


async def _fetch_fundamentals(
    pool: asyncpg.Pool,
    ticker: str,
) -> tuple[float | None, int | None]:
    """Fetch valuation_score and earnings_proximity_days.

    - **valuation_score**: derived from the latest ``trend_windows``
      confidence for the ticker (entity_type='company', entity_id=ticker).
    - **earnings_proximity_days**: days until the next earnings date from
      ``earnings_calendar`` (never negative).

    Returns ``(valuation_score, earnings_proximity_days)`` with ``None``
    sentinels for unavailable data.  Query failures are logged as warnings
    and leave the corresponding value as ``None``.
    """
    valuation_score: float | None = None
    earnings_proximity_days: int | None = None

    # --- Valuation score from trend_windows --------------------------------
    try:
        row = await pool.fetchrow(
            "SELECT confidence FROM trend_windows "
            "WHERE entity_type = 'company' AND entity_id = $1 "
            "ORDER BY generated_at DESC LIMIT 1",
            ticker,
        )
        if row is not None:
            valuation_score = float(row["confidence"])
        else:
            logger.warning("%s: no trend_windows data — valuation_score=None", ticker)
    except Exception:
        logger.warning(
            "%s: failed to fetch valuation_score", ticker, exc_info=True
        )

    # --- Earnings proximity from earnings_calendar -------------------------
    try:
        row = await pool.fetchrow(
            "SELECT earnings_date FROM earnings_calendar "
            "WHERE ticker = $1 AND earnings_date >= CURRENT_DATE "
            "ORDER BY earnings_date ASC LIMIT 1",
            ticker,
        )
        if row is not None:
            delta = row["earnings_date"] - datetime.now(timezone.utc).date()
            # The query filters on the database's CURRENT_DATE while the
            # delta uses the UTC date; around a day boundary these can
            # differ by one, so clamp instead of reporting -1 days.
            earnings_proximity_days = max(delta.days, 0)
        else:
            logger.warning(
                "%s: no upcoming earnings in calendar — earnings_proximity_days=None",
                ticker,
            )
    except Exception:
        logger.warning(
            "%s: failed to fetch earnings_proximity_days", ticker, exc_info=True
        )

    return valuation_score, earnings_proximity_days


async def _fetch_macro_bias(
    pool: asyncpg.Pool,
    ticker: str,
) -> float:
    """Compute macro_bias for *ticker* from recent ``macro_impact_records``.

    Takes the 10 most recent records, maps each ``impact_direction`` to a
    numeric bias via ``_DIRECTION_TO_BIAS`` (unknown or missing directions
    count as neutral), and averages the biases weighted by
    ``macro_impact_score * confidence``.  A NULL score is treated as 0.0
    and a NULL confidence as 0.5; an explicit confidence of 0 is honoured
    and gives the record zero weight.

    Returns a value clamped to [-1.0, 1.0], or 0.0 (neutral) when no records
    are found, the total weight is zero, or on error.
    """
    try:
        rows = await pool.fetch(
            "SELECT impact_direction, macro_impact_score, confidence "
            "FROM macro_impact_records "
            "WHERE ticker = $1 "
            "ORDER BY computed_at DESC LIMIT 10",
            ticker,
        )
        if not rows:
            logger.warning("%s: no macro_impact_records — macro_bias=0.0", ticker)
            return 0.0

        weighted_sum = 0.0
        weight_total = 0.0
        for row in rows:
            direction = row["impact_direction"] or "neutral"
            bias = _DIRECTION_TO_BIAS.get(direction, 0.0)
            score = float(row["macro_impact_score"] or 0.0)
            raw_conf = row["confidence"]
            # Only a missing confidence gets the 0.5 default; `or 0.5`
            # would also overwrite a genuine 0.0.
            conf = 0.5 if raw_conf is None else float(raw_conf)
            weight = score * conf
            weighted_sum += bias * weight
            weight_total += weight

        if weight_total == 0.0:
            return 0.0

        # Clamp to [-1.0, 1.0]
        return max(-1.0, min(1.0, weighted_sum / weight_total))

    except Exception:
        logger.warning("%s: failed to fetch macro_bias", ticker, exc_info=True)
        return 0.0


async def _fetch_open_positions(
    pool: asyncpg.Pool,
    ticker: str,
) -> list[OpenPositionState]:
    """Fetch open positions for *ticker* from ``position_stop_levels``.

    Joins with ``positions`` for current_price, falling back to the entry
    price when no current price is available.

    Returns an empty list on error or when no positions exist.
    """
    try:
        rows = await pool.fetch(
            "SELECT psl.id, psl.ticker, psl.entry_price, "
            " psl.stop_loss_price, psl.take_profit_price, "
            " psl.trailing_stop_active, psl.atr_value, "
            " psl.atr_multiplier, psl.reward_risk_ratio, "
            " COALESCE(p.current_price, psl.entry_price) AS current_price "
            "FROM position_stop_levels psl "
            "LEFT JOIN positions p ON p.ticker = psl.ticker "
            "WHERE psl.ticker = $1 AND psl.active = TRUE",
            ticker,
        )

        positions: list[OpenPositionState] = []
        for row in rows:
            entry = float(row["entry_price"])
            current = float(row["current_price"])
            stop = float(row["stop_loss_price"])
            tp = float(row["take_profit_price"])
            # NULL or zero ATR is reported as None.
            atr = float(row["atr_value"]) if row["atr_value"] else None
            # NULL or zero reward-risk ratio falls back to 2.0.
            rr = float(row["reward_risk_ratio"]) if row["reward_risk_ratio"] else 2.0

            # Derive target_2 from reward-risk ratio if only one TP level
            target_1 = tp
            target_2 = entry + (tp - entry) * rr if rr > 1.0 else tp

            positions.append(
                OpenPositionState(
                    position_id=str(row["id"]),
                    ticker=row["ticker"],
                    entry_price=entry,
                    current_price=current,
                    stop_loss=stop,
                    target_1=target_1,
                    target_2=target_2,
                    trailing_stop=None,  # computed by exit engine at runtime
                    # NOTE(review): trailing_stop_active is used as the
                    # partial-exit flag — confirm this proxy is intended.
                    partial_exit_done=bool(row["trailing_stop_active"]),
                    atr=atr,
                )
            )
        return positions

    except Exception:
        logger.warning(
            "%s: failed to fetch open positions", ticker, exc_info=True
        )
        return []


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------


async def normalize_input(
    pool: asyncpg.Pool,
    ticker: str,
    config: SignalEngineConfig,
) -> NormalizedInput:
    """Fetch and assemble all data needed for a single evaluation tick.

    Sources:
    - OHLCV bars from ``market_snapshots`` (M30, H1, H4, D, W, M)
    - Fundamental metrics from ``trend_windows`` + ``earnings_calendar``
    - Macro context from ``macro_impact_records``
    - Open position state from ``position_stop_levels`` + ``positions``

    Missing data sources produce sentinel values (``None`` / empty list)
    with a logged warning: each fetcher catches its own query errors, so
    a failing source does not prevent a ``NormalizedInput`` from being
    returned.

    *config* is accepted for interface compatibility; it is not read here.

    Requirements: 1.1, 1.2, 1.3, 1.4, 1.5
    """
    now = datetime.now(timezone.utc)

    # Fetch all data sources concurrently for efficiency.
    # Each fetcher handles its own errors and returns sentinels on failure.
    (
        bars,
        (valuation_score, earnings_proximity_days),
        macro_bias,
        open_positions,
    ) = await asyncio.gather(
        _fetch_bars(pool, ticker),
        _fetch_fundamentals(pool, ticker),
        _fetch_macro_bias(pool, ticker),
        _fetch_open_positions(pool, ticker),
    )

    # Validate monotonic timestamps within each timeframe (Req 1.4)
    for tf in TIMEFRAMES:
        bars[tf] = _validate_monotonic_timestamps(bars[tf], tf, ticker)

    # Compute closing_prices and returns from daily bars for regime
    # classification (used by the probabilistic pipeline).
    closing_prices: list[float] = [bar.close for bar in bars["D"]]
    # Simple period-over-period returns; a zero previous close yields 0.0
    # rather than dividing by zero.
    returns: list[float] = [
        (cur - prev) / prev if prev != 0 else 0.0
        for prev, cur in zip(closing_prices, closing_prices[1:])
    ]

    # Determine current_price from the latest close of the shortest
    # available timeframe (TIMEFRAMES is ordered shortest first).
    current_price: float | None = next(
        (bars[tf][-1].close for tf in TIMEFRAMES if bars[tf]),
        None,
    )

    return NormalizedInput(
        ticker=ticker,
        evaluated_at=now,
        bars=bars,
        valuation_score=valuation_score,
        earnings_proximity_days=earnings_proximity_days,
        macro_bias=macro_bias,
        open_positions=open_positions,
        closing_prices=closing_prices,
        returns=returns,
        current_price=current_price,
    )