Implementation Plan: Competitive Intelligence & Historical Pattern Matching Layer

Overview

This plan implements a third signal layer for the Stonks Oracle aggregation engine: competitive intelligence and historical pattern matching. The layer mines existing PostgreSQL data (document_impact_records, trend_windows, document_company_mentions) to identify how similar catalyst types resolved historically for a company and its competitors, then feeds pattern-based signals into the aggregation engine alongside company-specific (layer 1) and macro (layer 2) signals. All modules extend existing services — no new Kubernetes deployments required. Tasks are ordered so each step builds on the previous, with property-based tests validating core logic early.

Tasks

  • 1. Database migration and shared schemas

    • 1.1 Create PostgreSQL migration infra/migrations/017_competitive_historical_patterns.sql

      • Add competitor_relationships table with id (UUID PK), company_a_id (FK companies), company_b_id (FK companies), relationship_type (VARCHAR CHECK direct_rival|same_sector|overlapping_products|supply_chain_adjacent), strength (FLOAT CHECK [0,1]), bidirectional (BOOLEAN), source (VARCHAR CHECK manual|inferred), active (BOOLEAN), created_at, updated_at
      • Add competitive_signal_records table with id (UUID PK), source_document_id (FK documents), source_ticker, target_ticker, catalyst_type, pattern_confidence, signal_direction, signal_strength, relationship_strength, computed_at
      • Add CHECK constraint preventing self-referencing relationships (company_a_id != company_b_id)
      • Add unique index on (LEAST(company_a_id, company_b_id), GREATEST(company_a_id, company_b_id)) WHERE active = TRUE to prevent duplicate active pairs
      • Add indexes: idx_competitor_rel_company_a, idx_competitor_rel_company_b (both WHERE active = TRUE), idx_competitive_signals_target (target_ticker, computed_at DESC), idx_competitive_signals_source (source_ticker, computed_at DESC)
      • Requirements: 7.1, 7.2
    • 1.2 Add new Pydantic schemas and enums to services/shared/schemas.py

      • Add RelationshipType enum (direct_rival, same_sector, overlapping_products, supply_chain_adjacent)
      • Add CatalystTier enum (major_corporate_decision, routine_signal)
      • Add MAJOR_DECISION_CATALYSTS frozenset (m_and_a, legal, restructuring, leadership_change, strategic_pivot, buyback, dividend_change)
      • Add CompetitorRelationshipSchema, CompetitiveSignalRecordSchema, HistoricalPatternSchema Pydantic models
      • Requirements: 1.1, 4.4, 7.1, 7.2, 11.1
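
The enums and the catalyst set from task 1.2 could look roughly like the sketch below. The string values come from this plan; the upper-case member names and the `str` mixin are assumptions, and the Pydantic models are left out so the example depends only on the standard library.

```python
from enum import Enum


class RelationshipType(str, Enum):
    # Values mirror the CHECK constraint on competitor_relationships.relationship_type.
    DIRECT_RIVAL = "direct_rival"
    SAME_SECTOR = "same_sector"
    OVERLAPPING_PRODUCTS = "overlapping_products"
    SUPPLY_CHAIN_ADJACENT = "supply_chain_adjacent"


class CatalystTier(str, Enum):
    MAJOR_CORPORATE_DECISION = "major_corporate_decision"
    ROUTINE_SIGNAL = "routine_signal"


# Catalyst types treated as major corporate decisions (list taken from the plan).
MAJOR_DECISION_CATALYSTS = frozenset(
    {
        "m_and_a",
        "legal",
        "restructuring",
        "leadership_change",
        "strategic_pivot",
        "buyback",
        "dividend_change",
    }
)
```

Using a `frozenset` keeps the membership check constant-time and prevents accidental mutation at runtime.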
    • 1.3 Add competitive configuration fields to services/shared/config.py

      • Add CompetitiveConfig dataclass with fields: competitive_signal_weight (0.2), competitive_enabled (True), pattern_confidence_threshold (0.3), propagation_strength_threshold (0.2), routine_lookback_days (180), major_decision_lookback_days (365), major_decision_weight_multiplier (1.3), staleness_window_days (180), staleness_recent_days (90), staleness_decay_penalty (0.5), min_pattern_samples (3)
      • Add competitive: CompetitiveConfig to AppConfig with env var loading in load_config()
      • Requirements: 5.6, 6.1, 9.1, 9.2, 11.2, 11.3
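
A minimal sketch of the configuration dataclass from task 1.3 follows. Field names and defaults are taken from the plan. The loader name `load_competitive_config` and the convention that each environment variable is the upper-cased field name are assumptions made for illustration; the real `load_config()` may use a different naming scheme.

```python
import os
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class CompetitiveConfig:
    competitive_signal_weight: float = 0.2
    competitive_enabled: bool = True
    pattern_confidence_threshold: float = 0.3
    propagation_strength_threshold: float = 0.2
    routine_lookback_days: int = 180
    major_decision_lookback_days: int = 365
    major_decision_weight_multiplier: float = 1.3
    staleness_window_days: int = 180
    staleness_recent_days: int = 90
    staleness_decay_penalty: float = 0.5
    min_pattern_samples: int = 3


def load_competitive_config(env=None) -> CompetitiveConfig:
    """Hypothetical loader: reads FIELD_NAME-style env vars, falls back to defaults."""
    env = os.environ if env is None else env
    overrides = {}
    for f in fields(CompetitiveConfig):
        raw = env.get(f.name.upper())
        if raw is None:
            continue
        if isinstance(f.default, bool):
            # bool must be checked before int, since bool is a subclass of int.
            overrides[f.name] = raw.strip().lower() in {"1", "true", "yes"}
        else:
            overrides[f.name] = type(f.default)(raw)
    return CompetitiveConfig(**overrides)
```

Passing a plain dict as `env` makes the loader easy to exercise in tests without touching the process environment.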
  • 2. Checkpoint — Ensure migration and schemas are consistent

    • Ensure all tests pass, ask the user if questions arise.
  • 3. Competitor Registry and auto-inference

    • 3.1 Implement services/symbol_registry/competitors.py

      • Implement CompetitorRelationshipCreate and CompetitorRelationship Pydantic models for API request/response
      • Implement POST /companies/{company_id}/competitors — create relationship with audit event
      • Implement GET /companies/{company_id}/competitors — list active relationships ordered by strength descending
      • Implement PUT /companies/{company_id}/competitors/{relationship_id} — update relationship with audit event recording previous state
      • Implement DELETE /companies/{company_id}/competitors/{relationship_id} — soft-delete (set active=False), preserve row
      • Register routes as a FastAPI router on the Symbol Registry app
      • Handle error cases: self-referencing (400), duplicate active pair (409), non-existent company (404)
      • Requirements: 1.1, 1.2, 1.3, 1.4, 1.5
    • 3.2 Write property test for competitor relationship persistence round-trip

      • Property 1: Competitor relationship persistence round-trip
      • Validates: Requirements 1.1, 7.1
    • 3.3 Write property test for competitor query completeness and ordering

      • Property 2: Competitor query completeness and ordering
      • Validates: Requirements 1.2
    • 3.4 Write property test for soft-delete preserves row

      • Property 3: Soft-delete preserves row
      • Validates: Requirements 1.3
    • 3.5 Implement services/symbol_registry/competitor_inference.py

      • Implement infer_competitors(pool, company_id) -> list[CompetitorRelationship]
      • Query companies sharing the same sector and industry
      • Rank candidates by co-mention frequency in document_company_mentions
      • Compute strength = 0.3 * sector_match + 0.7 * normalized_co_mention_count
      • Upsert relationships with source='inferred', refreshing strength on re-inference (no duplicates)
      • Implement POST /companies/{company_id}/competitors/infer endpoint returning candidate relationships
      • Requirements: 2.1, 2.2, 2.3, 2.4, 2.5
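
The strength formula in task 3.5 can be illustrated as below. The plan does not say how the co-mention count is normalized, so dividing by the largest count among the candidates is an assumption, as are the helper names `normalize_co_mentions`, `inferred_strength`, and `rank_candidates`.

```python
def normalize_co_mentions(counts: dict[str, int]) -> dict[str, float]:
    """Scale raw co-mention counts into [0, 1] by the largest count (assumed scheme)."""
    peak = max(counts.values(), default=0)
    if peak <= 0:
        return {ticker: 0.0 for ticker in counts}
    return {ticker: count / peak for ticker, count in counts.items()}


def inferred_strength(sector_match: bool, normalized_co_mention: float) -> float:
    """strength = 0.3 * sector_match + 0.7 * normalized_co_mention_count, clamped to [0, 1]."""
    score = 0.3 * (1.0 if sector_match else 0.0) + 0.7 * normalized_co_mention
    return min(1.0, max(0.0, score))


def rank_candidates(counts: dict[str, int], same_sector: set[str]) -> list[tuple[str, float]]:
    """Return (ticker, strength) pairs ordered by strength, strongest first."""
    normalized = normalize_co_mentions(counts)
    scored = [
        (ticker, inferred_strength(ticker in same_sector, value))
        for ticker, value in normalized.items()
    ]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)
```

Clamping keeps the result inside the `[0, 1]` range required by the CHECK constraint on `strength`.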
    • 3.6 Write property test for auto-inference produces valid candidates

      • Property 4: Auto-inference produces valid candidates
      • Validates: Requirements 2.1, 2.3
    • 3.7 Write property test for auto-inference ranks by co-mention frequency

      • Property 5: Auto-inference ranks by co-mention frequency
      • Validates: Requirements 2.2
    • 3.8 Write property test for auto-inference idempotence

      • Property 6: Auto-inference idempotence
      • Validates: Requirements 2.4
  • 4. Checkpoint — Ensure competitor registry and inference work correctly

    • Ensure all tests pass, ask the user if questions arise.
  • 5. Pattern Matcher — core historical pattern mining

    • 5.1 Implement services/aggregation/pattern_matcher.py

      • Implement HistoricalPattern dataclass matching the design specification
      • Implement classify_catalyst_tier(catalyst_type) -> str — deterministic mapping of a catalyst type to either major_corporate_decision or routine_signal
      • Implement compute_pattern_confidence(sample_count, outcome_consistency, data_recency_days, tier) -> float using the formula: sample_factor * 0.4 + consistency * 0.4 + recency_factor * 0.2, with 1.3× multiplier for major decisions
      • Implement find_self_patterns(pool, ticker, catalyst_type, horizons) -> list[HistoricalPattern] — query document_impact_records joined with trend_windows for same company-catalyst pair across configurable time horizons (1d, 7d, 30d)
      • Implement find_cross_company_patterns(pool, source_ticker, target_ticker, catalyst_type, horizons) -> list[HistoricalPattern] — query cross-company historical patterns
      • Only consider records linked to document_intelligence with validation_status='valid' and documents with status != 'rejected'
      • Apply insufficient data threshold: when sample_count < 3, cap confidence at 0.25 and set insufficient_data=True
      • Apply staleness decay: when no instances in last 90 days and all data older than 180 days, apply 0.5 decay penalty
      • Use 365-day lookback for major_corporate_decision catalysts, 180-day for routine_signal
      • Compute separate HistoricalPatterns for each catalyst tier
      • Requirements: 3.1, 3.2, 3.3, 3.4, 3.5, 11.1, 11.2, 11.3, 11.5
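
The confidence rules in task 5.1 can be sketched as follows. The weights, the 1.3× multiplier, the sample threshold of 3, the 0.25 cap, and the 0.5 decay come from the plan. The definitions of `sample_factor` (saturating at 10 samples) and `recency_factor` (linear decay over the lookback window) are assumptions, since the plan leaves them to the design document. The helpers `apply_insufficient_data_cap` and `apply_staleness_decay` are hypothetical names.

```python
MAJOR_DECISION_CATALYSTS = frozenset(
    {"m_and_a", "legal", "restructuring", "leadership_change",
     "strategic_pivot", "buyback", "dividend_change"}
)
MAJOR = "major_corporate_decision"
ROUTINE = "routine_signal"


def classify_catalyst_tier(catalyst_type: str) -> str:
    """Deterministic: the same catalyst type always maps to the same tier."""
    return MAJOR if catalyst_type in MAJOR_DECISION_CATALYSTS else ROUTINE


def compute_pattern_confidence(sample_count: int, outcome_consistency: float,
                               data_recency_days: float, tier: str) -> float:
    # Assumption: sample_factor grows linearly and saturates at 10 samples.
    sample_factor = min(sample_count / 10.0, 1.0)
    # Assumption: recency decays linearly to zero over the tier's lookback window.
    lookback_days = 365 if tier == MAJOR else 180
    recency_factor = max(0.0, 1.0 - data_recency_days / lookback_days)
    base = sample_factor * 0.4 + outcome_consistency * 0.4 + recency_factor * 0.2
    if tier == MAJOR:
        base *= 1.3  # major decisions carry more weight
    return min(base, 1.0)


def apply_insufficient_data_cap(confidence: float, sample_count: int,
                                min_samples: int = 3) -> tuple[float, bool]:
    """Cap confidence at 0.25 and flag the pattern when samples are too few."""
    if sample_count < min_samples:
        return min(confidence, 0.25), True
    return confidence, False


def apply_staleness_decay(confidence: float, days_since_latest: float,
                          window_days: int = 180, penalty: float = 0.5) -> float:
    """If even the newest instance is older than the window, halve the confidence."""
    return confidence * penalty if days_since_latest > window_days else confidence
```

Because the newest instance being older than 180 days implies there are none in the last 90 days, the staleness check here only needs the age of the most recent instance.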
    • 5.2 Write property test for pattern computation correctness

      • Property 7: Pattern computation correctness
      • Validates: Requirements 3.1, 3.2, 4.2
    • 5.3 Write property test for pattern confidence monotonicity

      • Property 8: Pattern confidence monotonicity
      • Validates: Requirements 3.3, 11.2
    • 5.4 Write property test for insufficient data threshold

      • Property 9: Insufficient data threshold
      • Validates: Requirements 3.4
    • 5.5 Write property test for valid-only data filtering

      • Property 10: Valid-only data filtering
      • Validates: Requirements 3.5
    • 5.6 Write property test for catalyst tier classification determinism

      • Property 19: Catalyst tier classification determinism
      • Validates: Requirements 11.1
    • 5.7 Write property test for major decision extended lookback

      • Property 20: Major decision extended lookback
      • Validates: Requirements 11.3, 11.5
  • 6. Checkpoint — Ensure pattern matcher and property tests pass

    • Ensure all tests pass, ask the user if questions arise.
  • 7. Signal Propagation Engine

    • 7.1 Implement services/aggregation/signal_propagation.py

      • Implement CompetitiveSignalRecord dataclass matching the design specification
      • Implement propagate_signals(pool, ticker, catalyst_type, impact_score, document_id, config) -> list[CompetitiveSignalRecord] — look up competitors, query cross-company patterns, produce weighted competitive signals
      • Signal weighting: signal_strength = pattern.avg_strength * relationship.strength * pattern.pattern_confidence * source_impact_score
      • Signal direction: bullish if pattern.bullish_pct > pattern.bearish_pct, else bearish
      • Skip propagation when relationship.strength < propagation_strength_threshold (default 0.2), log skip reason
      • Exclude patterns with pattern_confidence < pattern_confidence_threshold (default 0.3), log exclusion reason
      • Persist CompetitiveSignalRecord objects to the competitive_signal_records PostgreSQL table
      • Implement build_pattern_weighted_signals(patterns, competitive_signals, reference_time, window, config) -> list[WeightedSignal] — convert pattern/competitive signals to WeightedSignal objects for aggregation
      • Requirements: 4.1, 4.2, 4.3, 4.4, 4.5, 9.1
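
The weighting and gating rules in task 7.1 can be sketched as a pure function. `PatternStats` is a hypothetical stand-in holding only the pattern fields named in the plan, and `compute_competitive_signal` is an illustrative helper rather than the planned `propagate_signals`, which also handles database lookups and persistence.

```python
import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class PatternStats:
    """Hypothetical subset of HistoricalPattern used for signal weighting."""
    avg_strength: float
    bullish_pct: float
    bearish_pct: float
    pattern_confidence: float


def compute_competitive_signal(
    pattern: PatternStats,
    relationship_strength: float,
    source_impact_score: float,
    propagation_strength_threshold: float = 0.2,
    pattern_confidence_threshold: float = 0.3,
) -> Optional[tuple[str, float]]:
    """Return (direction, strength), or None when a threshold gates the signal."""
    if relationship_strength < propagation_strength_threshold:
        logger.info("skip: relationship strength %.2f below threshold", relationship_strength)
        return None
    if pattern.pattern_confidence < pattern_confidence_threshold:
        logger.info("exclude: pattern confidence %.2f below threshold", pattern.pattern_confidence)
        return None
    strength = (
        pattern.avg_strength
        * relationship_strength
        * pattern.pattern_confidence
        * source_impact_score
    )
    # A tie between bullish and bearish shares resolves to bearish, per the rule above.
    direction = "bullish" if pattern.bullish_pct > pattern.bearish_pct else "bearish"
    return direction, strength
```

Since every factor is non-negative, the product increases monotonically in each input, which is the behaviour the signal strength monotonicity property is meant to check.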
    • 7.2 Write property test for competitive signal strength monotonicity

      • Property 11: Competitive signal strength monotonicity
      • Validates: Requirements 4.3
    • 7.3 Write property test for signal propagation threshold gating

      • Property 12: Signal propagation threshold gating
      • Validates: Requirements 4.5, 9.1
    • 7.4 Write property test for pattern signal to WeightedSignal conversion

      • Property 13: Pattern signal to WeightedSignal conversion
      • Validates: Requirements 5.2
    • 7.5 Write property test for competitive signal persistence round-trip

      • Property 21: Competitive signal persistence round-trip
      • Validates: Requirements 4.4, 7.2
  • 8. Checkpoint — Ensure signal propagation and property tests pass

    • Ensure all tests pass, ask the user if questions arise.
  • 9. Aggregation engine integration

    • 9.1 Extend services/aggregation/worker.py to incorporate pattern-based and competitive signals

      • Add competitive_signal_weight and competitive_enabled fields to AggregationConfig
      • In aggregate_company_window, check competitive toggle state from risk_configs table (same pattern as macro toggle)
      • When competitive layer is enabled: query self-company historical patterns for active catalyst types in the window, query competitive signals targeting this ticker
      • Convert each pattern signal to a WeightedSignal using: document_id = source document, sentiment_value = +1.0 (bullish) or -1.0 (bearish), impact_score = signal_strength × competitive_signal_weight, recency decay from source document publication time, confidence gating from pattern_confidence
      • Merge pattern/competitive signals with company-specific and macro signals before computing trend direction, strength, confidence, and contradiction score
      • Include contributing source_document_ids in evidence references for traceability
      • When competitive layer is disabled or no pattern data exists, produce identical output to company+macro-only aggregation
      • Requirements: 5.1, 5.2, 5.3, 5.4, 5.5, 5.6
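
The field mapping described in task 9.1 might look like the sketch below. `WeightedSignalSketch` is a hypothetical stand-in for the engine's existing `WeightedSignal` type, whose real fields are not shown in this plan, and `to_weighted_signal` is an illustrative name. Recency decay and confidence gating are left to the existing aggregation code, so the sketch only carries the inputs those steps would need.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class WeightedSignalSketch:
    """Hypothetical stand-in; the real WeightedSignal lives in the aggregation engine."""
    document_id: str
    sentiment_value: float
    impact_score: float
    confidence: float
    published_at: datetime


def to_weighted_signal(
    source_document_id: str,
    signal_direction: str,
    signal_strength: float,
    pattern_confidence: float,
    published_at: datetime,
    competitive_signal_weight: float = 0.2,
) -> WeightedSignalSketch:
    # Direction becomes a signed unit sentiment: +1.0 bullish, -1.0 bearish.
    sentiment = 1.0 if signal_direction == "bullish" else -1.0
    return WeightedSignalSketch(
        document_id=source_document_id,  # keeps the evidence traceable
        sentiment_value=sentiment,
        impact_score=signal_strength * competitive_signal_weight,
        confidence=pattern_confidence,  # input to the engine's confidence gating
        published_at=published_at,  # input to the engine's recency decay
    )
```

Scaling `impact_score` by `competitive_signal_weight` is what keeps this layer subordinate to company-specific signals when all three layers are merged.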
    • 9.2 Write property test for pattern-company contradiction detection

      • Property 14: Pattern-company contradiction detection
      • Validates: Requirements 5.3
    • 9.3 Write property test for pattern evidence traceability

      • Property 15: Pattern evidence traceability
      • Validates: Requirements 5.4
    • 9.4 Write property test for no-degradation and disabled-layer equivalence

      • Property 16: No-degradation and disabled-layer equivalence
      • Validates: Requirements 5.5, 6.2
    • 9.5 Write property test for staleness decay penalty

      • Property 17: Staleness decay penalty
      • Validates: Requirements 9.2
  • 10. Checkpoint — Ensure aggregation integration works correctly

    • Ensure all tests pass, ask the user if questions arise.
  • 11. Pattern-only suppression and safety

    • 11.1 Extend services/recommendation/suppression.py with pattern-only suppression

      • Add PATTERN_ONLY_SIGNAL = "pattern_only_signal" to SuppressionReason enum
      • Implement evaluate_pattern_only_suppression(summary, pattern_signal_count, company_signal_count, macro_signal_count) -> bool
      • When pattern-based signals are the sole basis for a trend direction change, force recommendation to mode='informational' and append pattern-only caveat to thesis
      • Requirements: 9.3
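
A minimal sketch of the pattern-only check from task 11.1 follows. It treats "sole basis" as "pattern signals exist and no company or macro signals do", which is an assumption; the real rule may also inspect `summary` for an actual direction change. The caveat wording, the dict-shaped recommendation, and `apply_pattern_only_suppression` are hypothetical.

```python
PATTERN_ONLY_SIGNAL = "pattern_only_signal"
# Hypothetical caveat text; the real wording is not given in the plan.
PATTERN_ONLY_CAVEAT = "Note: this trend is supported only by historical pattern signals."


def evaluate_pattern_only_suppression(summary, pattern_signal_count: int,
                                      company_signal_count: int,
                                      macro_signal_count: int) -> bool:
    """True when pattern signals are the only evidence behind the trend."""
    # `summary` is accepted to match the planned signature but is unused in this sketch.
    return (
        pattern_signal_count > 0
        and company_signal_count == 0
        and macro_signal_count == 0
    )


def apply_pattern_only_suppression(recommendation: dict) -> dict:
    """Return a copy forced to informational mode with the caveat appended."""
    updated = dict(recommendation)
    updated["mode"] = "informational"
    updated["thesis"] = f"{recommendation.get('thesis', '')} {PATTERN_ONLY_CAVEAT}".strip()
    return updated
```

Returning a copy rather than mutating the input keeps the original recommendation available for audit logging.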
    • 11.2 Write property test for pattern-only suppression

      • Property 18: Pattern-only suppression
      • Validates: Requirements 9.3
  • 12. Competitive layer toggle and API endpoints

    • 12.1 Implement competitive toggle and status endpoints in services/api/app.py

      • Add GET /api/admin/competitive/status returning current enabled/disabled state from risk_configs table
      • Add PUT /api/admin/competitive/toggle to switch competitive layer on/off, persisting to risk_configs and recording an audit event with previous state, new state, and operator
      • Toggle state is read from PostgreSQL at the start of each aggregation cycle (no caching)
      • When disabled, pattern mining remains queryable via API but signal propagation is skipped during aggregation
      • When re-enabled, resume computing signals using latest historical data including intelligence ingested while disabled
      • Requirements: 6.1, 6.2, 6.3, 6.4, 6.5, 6.7
    • 12.2 Implement pattern and competitive signal query endpoints in services/api/app.py

      • Add GET /api/patterns/{ticker} — historical patterns for a company, filterable by catalyst_type and time_horizon
      • Add GET /api/patterns/{ticker}/competitors — cross-company patterns showing how this company's catalysts affected competitors
      • Add GET /api/patterns/{ticker}/competitive-signals — recent competitive signals targeting this company
      • Add GET /api/patterns/{ticker}/decisions — major corporate decision history with trend outcomes and pattern statistics
      • Include sample_count, outcome distribution, pattern_confidence, and date range in all responses
      • Requirements: 10.1, 10.2, 10.3, 10.4, 11.4, 11.6
  • 13. Checkpoint — Ensure API endpoints and toggle logic work correctly

    • Ensure all tests pass, ask the user if questions arise.
  • 14. Lake publisher extensions

    • 14.1 Add competitive fact publishers to the lake publisher service
      • Implement publish_competitor_relationship_fact writing partitioned Parquet datasets to stonks-lakehouse/warehouse/competitor_relationships/dt={date}/
      • Implement publish_competitive_signal_fact writing partitioned Parquet datasets to stonks-lakehouse/warehouse/competitive_signals/dt={date}/target_ticker={ticker}/
      • Register new fact types in the lake publisher's job processing loop
      • Requirements: 7.3, 7.4
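
The partition layouts in task 14.1 can be captured by two small path builders. The path templates are from the plan; the ISO `YYYY-MM-DD` date format and the function names are assumptions, and writing the Parquet files themselves is omitted.

```python
from datetime import date

WAREHOUSE_ROOT = "stonks-lakehouse/warehouse"


def competitor_relationship_partition(dt: date) -> str:
    """Partition prefix for competitor relationship facts, keyed by date."""
    return f"{WAREHOUSE_ROOT}/competitor_relationships/dt={dt.isoformat()}/"


def competitive_signal_partition(dt: date, target_ticker: str) -> str:
    """Partition prefix for competitive signal facts, keyed by date then target ticker."""
    return (
        f"{WAREHOUSE_ROOT}/competitive_signals/"
        f"dt={dt.isoformat()}/target_ticker={target_ticker}/"
    )
```

Keeping path construction in one place lets the integration tests assert on partitions without duplicating the templates.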
  • 15. Signal propagation wiring into aggregation pipeline

    • 15.1 Wire signal propagation into the aggregation worker

      • After document intelligence is produced for a company, trigger signal propagation for the company's competitors
      • In the aggregation cycle, call propagate_signals for each new document intelligence record when competitive layer is enabled
      • Handle sustained propagation errors: after configurable threshold (default 5 consecutive failures), alert operators and continue with company-specific + macro signals only
      • Requirements: 4.1, 9.4
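
The sustained-failure handling in task 15.1 amounts to counting consecutive errors. The sketch below uses a hypothetical `PropagationFailureTracker` class; how operators are actually alerted is outside its scope.

```python
class PropagationFailureTracker:
    """Counts consecutive propagation failures and signals when to alert."""

    def __init__(self, threshold: int = 5):
        self.threshold = threshold
        self.consecutive_failures = 0

    def record_success(self) -> None:
        # Any success breaks the streak.
        self.consecutive_failures = 0

    def record_failure(self) -> bool:
        """Record one failure; return True once the streak reaches the threshold."""
        self.consecutive_failures += 1
        return self.consecutive_failures >= self.threshold

    @property
    def degraded(self) -> bool:
        """True while aggregation should fall back to company-specific + macro signals."""
        return self.consecutive_failures >= self.threshold
```

A worker could call `record_failure()` in its exception handler and raise an alert when it returns `True`, while continuing the cycle without competitive signals.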
    • 15.2 Wire pattern mining into the aggregation cycle

      • During aggregate_company_window, call pattern matcher for self-company patterns and collect competitive signals for the ticker
      • Merge resulting WeightedSignals into the signal list before trend computation
      • Ensure evidence references include pattern signal source document IDs
      • Requirements: 5.1, 5.4
  • 16. Checkpoint — Ensure full backend pipeline works end-to-end

    • Ensure all tests pass, ask the user if questions arise.
  • 17. Dashboard — Competitors panel and historical patterns

    • 17.1 Add competitors panel to Company Detail page

      • On frontend/src/pages/CompanyDetail.tsx, add a Competitors tab showing active competitor relationships with ticker, relationship_type, strength score, source (manual/inferred)
      • Add API hooks for GET /companies/{company_id}/competitors in frontend/src/api/hooks.ts
      • Add infer button triggering POST /companies/{company_id}/competitors/infer
      • Requirements: 8.1
    • 17.2 Add historical patterns panel to Company Detail page

      • On frontend/src/pages/CompanyDetail.tsx, add a Historical Patterns tab showing recent patterns: catalyst_type, outcome distribution (bullish_pct, bearish_pct), sample_count, pattern_confidence
      • Add API hook for GET /api/patterns/{ticker}
      • Requirements: 8.2
    • 17.3 Add competitive signals panel to Company Detail page

      • On frontend/src/pages/CompanyDetail.tsx, add a Competitive Signals tab showing incoming signals: source ticker, catalyst_type, signal_direction, signal_strength
      • Add API hook for GET /api/patterns/{ticker}/competitive-signals
      • Click-through on a signal shows full detail: source company, source document, catalyst_type, historical pattern statistics, competitor relationship
      • Requirements: 8.4, 8.5
    • 17.4 Add corporate decision timeline to Company Detail page

      • On frontend/src/pages/CompanyDetail.tsx, add a Decisions tab showing major_corporate_decision events: catalyst type, date, summary, trend outcome that followed
      • Add API hook for GET /api/patterns/{ticker}/decisions
      • Requirements: 11.4
    • 17.5 Add pattern-based evidence indicators to Trend detail page

      • On frontend/src/pages/TrendDetail.tsx, visually distinguish pattern-based and competitive signal evidence from company-specific and macro evidence (badge/icon differentiation)
      • Requirements: 8.3
    • 17.6 Add competitive toggle to Trading Controls page

      • On frontend/src/pages/Trading.tsx, add competitive signal layer enable/disable switch alongside existing macro toggle, with confirmation dialog
      • Add API hooks for GET /api/admin/competitive/status and PUT /api/admin/competitive/toggle
      • Requirements: 6.6
  • 18. Checkpoint — Ensure frontend pages render and integrate with API

    • Ensure all tests pass, ask the user if questions arise.
  • 19. Integration wiring and final validation

    • 19.1 Write integration tests for competitive pipeline end-to-end

      • Test document intelligence → pattern mining → signal propagation → aggregation flow
      • Test lake publisher writes correct Parquet partitions for competitor relationships and competitive signals
      • Test competitive toggle state change propagates to next aggregation cycle
      • Test toggle disable/re-enable cycle preserves data integrity
      • Requirements: 4.1, 5.1, 6.1, 6.4, 7.3
    • 19.2 Write unit tests for API endpoints and dashboard components

      • Test competitor CRUD endpoints return correct data and error codes (400, 404, 409)
      • Test pattern query endpoints return correct data with filtering
      • Test competitive toggle endpoint persists state and records audit event
      • Test auto-inference endpoint with empty data, single company, no co-mentions
      • Add MSW handlers for competitive endpoints in frontend/src/test/mocks/handlers.ts
      • Test competitors panel, historical patterns panel, competitive signals panel, and decision timeline render correctly
      • Requirements: 1.4, 2.5, 6.5, 8.1, 8.2, 8.5, 10.1, 10.4
  • 20. Final checkpoint — Ensure all tests pass

    • Ensure all tests pass, ask the user if questions arise.

Notes

  • Tasks marked with * are optional and can be skipped for a faster MVP
  • Each task references specific requirements for traceability
  • Checkpoints ensure incremental validation after each major phase
  • Property tests validate the 21 correctness properties from the design using Hypothesis
  • The design uses Python throughout — no language selection needed
  • No new Kubernetes deployments required; all modules extend existing services
  • Next migration number is 017 (016 is global-news-interpolation)
  • Competitive layer follows the same toggle/suppression/aggregation pattern as the macro layer for consistency