diff --git a/infra/helm/stonks-oracle/templates/deployments.yaml b/infra/helm/stonks-oracle/templates/deployments.yaml index 4e56fe0..b0d3fd2 100644 --- a/infra/helm/stonks-oracle/templates/deployments.yaml +++ b/infra/helm/stonks-oracle/templates/deployments.yaml @@ -90,6 +90,80 @@ spec: volumeMounts: - name: tmp mountPath: /tmp + - name: backfill-market-data + image: {{ $root.Values.image.registry }}/{{ $svc.image }}:{{ $root.Values.image.tag }} + imagePullPolicy: {{ $root.Values.image.pullPolicy }} + command: ["sh", "-c"] + args: + - | + COUNT=$(PGPASSWORD="$POSTGRES_PASSWORD" psql \ + -h "$POSTGRES_HOST" -p "$POSTGRES_PORT" \ + -U "$POSTGRES_USER" -d "$POSTGRES_DB" \ + -tAc "SELECT count(*) FROM market_snapshots WHERE snapshot_type = 'bar'" 2>/dev/null || echo "0") + if [ "$COUNT" -lt "50" ] && [ -n "$MARKET_DATA_API_KEY" ]; then + echo "Only $COUNT market bars found — backfilling 90 days from Polygon..." + python -c " +import asyncio, asyncpg, hashlib, httpx, json, os +from datetime import date, datetime, timedelta, timezone + +async def backfill(): + api_key = os.environ.get('MARKET_DATA_API_KEY', '') + base_url = os.environ.get('MARKET_DATA_BASE_URL', 'https://api.polygon.io') + dsn = f'postgresql://{os.environ[\"POSTGRES_USER\"]}:{os.environ[\"POSTGRES_PASSWORD\"]}@{os.environ[\"POSTGRES_HOST\"]}:{os.environ[\"POSTGRES_PORT\"]}/{os.environ[\"POSTGRES_DB\"]}' + pool = await asyncpg.create_pool(dsn=dsn) + tickers = [r['ticker'] for r in await pool.fetch('SELECT ticker FROM companies WHERE active = TRUE ORDER BY ticker')] + to_d = date.today().isoformat() + from_d = (date.today() - timedelta(days=90)).isoformat() + total = 0 + async with httpx.AsyncClient(timeout=30) as client: + for ticker in tickers: + url = f'{base_url}/v2/aggs/ticker/{ticker}/range/1/day/{from_d}/{to_d}' + try: + resp = await client.get(url, params={'apiKey': api_key, 'adjusted': 'true', 'sort': 'asc', 'limit': '500'}) + resp.raise_for_status() + bars = resp.json().get('results', []) + except Exception as e: + print(f' {ticker}: fetch failed ({e})') + continue + existing = {r['bar_ts'] for r in await pool.fetch(\"SELECT DISTINCT (data->>'t')::bigint AS bar_ts FROM market_snapshots WHERE ticker = \$1 AND snapshot_type = 'bar'\", ticker) if r['bar_ts']} + co = await pool.fetchrow('SELECT id FROM companies WHERE ticker = \$1', ticker) + cid = co['id'] if co else None + inserted = 0 + for bar in bars: + bar_ts = bar.get('t') + if not bar_ts or bar_ts in existing: + continue + bj = json.dumps(bar) + ch = hashlib.sha256(bj.encode()).hexdigest() + ca = datetime.fromtimestamp(bar_ts / 1000, tz=timezone.utc) + await pool.execute('INSERT INTO market_snapshots (company_id, ticker, snapshot_type, data, source_provider, captured_at, content_hash) VALUES (\$1, \$2, \'bar\', \$3::jsonb, \'polygon_backfill\', \$4, \$5)', cid, ticker, bj, ca, ch) + existing.add(bar_ts) + inserted += 1 + total += inserted + if inserted: + print(f' {ticker}: {inserted} bars') + await pool.close() + print(f'Backfill complete: {total} bars across {len(tickers)} tickers') +asyncio.run(backfill()) +" + else + echo "Market data has $COUNT bars — skipping backfill." + fi + securityContext: + {{- include "stonks.containerSecurityContext" $root | nindent 12 }} + envFrom: + - configMapRef: + name: stonks-config + {{- range $svc.secrets }} + - secretRef: + name: {{ . }} + {{- end }} + resources: + requests: { cpu: 50m, memory: 64Mi } + limits: { cpu: 200m, memory: 256Mi } + volumeMounts: + - name: tmp + mountPath: /tmp {{- end }} containers: - name: {{ $svc.image }} diff --git a/services/trading/engine.py b/services/trading/engine.py index 5c9001b..fc47f59 100644 --- a/services/trading/engine.py +++ b/services/trading/engine.py @@ -778,10 +778,6 @@ class TradingEngine: continue # --- Buy path --- - # Set dedup key for buys - if self.redis is not None: - await self.redis.set(trading_dedupe_key(rec_id), "1", ex=86400) - # Check if we already hold this ticker — don't double up try: existing_pos = await self.pool.fetchrow( @@ -789,6 +785,9 @@ class TradingEngine: ticker, ) if existing_pos: + # Permanent skip — safe to dedup + if self.redis is not None: + await self.redis.set(trading_dedupe_key(rec_id), "1", ex=86400) continue except Exception: pass @@ -864,6 +863,19 @@ class TradingEngine: # Persist decision await self._persist_decision(decision) + # Set dedup key only for permanent outcomes (act or + # non-retryable skips). Do NOT dedup + # outside_trading_window — those should be retried + # when the market opens. + retryable_skips = {"outside_trading_window"} + if decision.skip_reason not in retryable_skips: + if self.redis is not None: + await self.redis.set( + trading_dedupe_key(rec_id), "1", ex=86400, + ) + if rec_id: + self.processed_recommendation_ids.add(rec_id) + except Exception: logger.exception("Error evaluating recommendation %s", rec.get("recommendation_id", "?"))