21 KiB
Implementation Plan: Competitive Intelligence & Historical Pattern Matching Layer
Overview
This plan implements a third signal layer for the Stonks Oracle aggregation engine: competitive intelligence and historical pattern matching. The layer mines existing PostgreSQL data (document_impact_records, trend_windows, document_company_mentions) to identify how similar catalyst types resolved historically for a company and its competitors, then feeds pattern-based signals into the aggregation engine alongside company-specific (layer 1) and macro (layer 2) signals. All modules extend existing services — no new Kubernetes deployments required. Tasks are ordered so each step builds on the previous, with property-based tests validating core logic early.
Tasks
-
1. Database migration and shared schemas
-
1.1 Create PostgreSQL migration
infra/migrations/017_competitive_historical_patterns.sql- Add
competitor_relationshipstable with id (UUID PK), company_a_id (FK companies), company_b_id (FK companies), relationship_type (VARCHAR CHECK direct_rival|same_sector|overlapping_products|supply_chain_adjacent), strength (FLOAT CHECK [0,1]), bidirectional (BOOLEAN), source (VARCHAR CHECK manual|inferred), active (BOOLEAN), created_at, updated_at - Add
competitive_signal_recordstable with id (UUID PK), source_document_id (FK documents), source_ticker, target_ticker, catalyst_type, pattern_confidence, signal_direction, signal_strength, relationship_strength, computed_at - Add CHECK constraint preventing self-referencing relationships (company_a_id != company_b_id)
- Add unique index on (LEAST(company_a_id, company_b_id), GREATEST(company_a_id, company_b_id)) WHERE active = TRUE to prevent duplicate active pairs
- Add indexes: idx_competitor_rel_company_a, idx_competitor_rel_company_b (both WHERE active = TRUE), idx_competitive_signals_target (target_ticker, computed_at DESC), idx_competitive_signals_source (source_ticker, computed_at DESC)
- Requirements: 7.1, 7.2
- Add
-
1.2 Add new Pydantic schemas and enums to
services/shared/schemas.py- Add
RelationshipTypeenum (direct_rival, same_sector, overlapping_products, supply_chain_adjacent) - Add
CatalystTierenum (major_corporate_decision, routine_signal) - Add
MAJOR_DECISION_CATALYSTSfrozenset (m_and_a, legal, restructuring, leadership_change, strategic_pivot, buyback, dividend_change) - Add
CompetitorRelationshipSchema,CompetitiveSignalRecordSchema,HistoricalPatternSchemaPydantic models - Requirements: 1.1, 4.4, 7.1, 7.2, 11.1
- Add
-
1.3 Add competitive configuration fields to
services/shared/config.py- Add
CompetitiveConfigdataclass with fields: competitive_signal_weight (0.2), competitive_enabled (True), pattern_confidence_threshold (0.3), propagation_strength_threshold (0.2), routine_lookback_days (180), major_decision_lookback_days (365), major_decision_weight_multiplier (1.3), staleness_window_days (180), staleness_recent_days (90), staleness_decay_penalty (0.5), min_pattern_samples (3) - Add
competitive: CompetitiveConfigtoAppConfigwith env var loading inload_config() - Requirements: 5.6, 6.1, 9.1, 9.2, 11.2, 11.3
- Add
-
-
2. Checkpoint — Ensure migration and schemas are consistent
- Ensure all tests pass, ask the user if questions arise.
-
3. Competitor Registry and auto-inference
-
3.1 Implement
services/symbol_registry/competitors.py- Implement
CompetitorRelationshipCreateandCompetitorRelationshipPydantic models for API request/response - Implement
POST /companies/{company_id}/competitors— create relationship with audit event - Implement
GET /companies/{company_id}/competitors— list active relationships ordered by strength descending - Implement
PUT /companies/{company_id}/competitors/{relationship_id}— update relationship with audit event recording previous state - Implement
DELETE /companies/{company_id}/competitors/{relationship_id}— soft-delete (set active=False), preserve row - Register routes as a FastAPI router on the Symbol Registry app
- Handle error cases: self-referencing (400), duplicate active pair (409), non-existent company (404)
- Requirements: 1.1, 1.2, 1.3, 1.4, 1.5
- Implement
-
3.2 Write property test for competitor relationship persistence round-trip
- Property 1: Competitor relationship persistence round-trip
- Validates: Requirements 1.1, 7.1
-
3.3 Write property test for competitor query completeness and ordering
- Property 2: Competitor query completeness and ordering
- Validates: Requirements 1.2
-
3.4 Write property test for soft-delete preserves row
- Property 3: Soft-delete preserves row
- Validates: Requirements 1.3
-
3.5 Implement
services/symbol_registry/competitor_inference.py- Implement
infer_competitors(pool, company_id) -> list[CompetitorRelationship] - Query companies sharing the same sector and industry
- Rank candidates by co-mention frequency in
document_company_mentions - Compute strength =
0.3 * sector_match + 0.7 * normalized_co_mention_count - Upsert relationships with
source='inferred', refreshing strength on re-inference (no duplicates) - Implement
POST /companies/{company_id}/competitors/inferendpoint returning candidate relationships - Requirements: 2.1, 2.2, 2.3, 2.4, 2.5
- Implement
-
3.6 Write property test for auto-inference produces valid candidates
- Property 4: Auto-inference produces valid candidates
- Validates: Requirements 2.1, 2.3
-
3.7 Write property test for auto-inference ranks by co-mention frequency
- Property 5: Auto-inference ranks by co-mention frequency
- Validates: Requirements 2.2
-
3.8 Write property test for auto-inference idempotence
- Property 6: Auto-inference idempotence
- Validates: Requirements 2.4
-
-
4. Checkpoint — Ensure competitor registry and inference work correctly
- Ensure all tests pass, ask the user if questions arise.
-
5. Pattern Matcher — core historical pattern mining
-
5.1 Implement
services/aggregation/pattern_matcher.py- Implement
HistoricalPatterndataclass matching the design specification - Implement
classify_catalyst_tier(catalyst_type) -> str— deterministic mapping of major_corporate_decision vs routine_signal catalyst types - Implement
compute_pattern_confidence(sample_count, outcome_consistency, data_recency_days, tier) -> floatusing the formula:sample_factor * 0.4 + consistency * 0.4 + recency_factor * 0.2, with 1.3× multiplier for major decisions - Implement
find_self_patterns(pool, ticker, catalyst_type, horizons) -> list[HistoricalPattern]— query document_impact_records joined with trend_windows for same company-catalyst pair across configurable time horizons (1d, 7d, 30d) - Implement
find_cross_company_patterns(pool, source_ticker, target_ticker, catalyst_type, horizons) -> list[HistoricalPattern]— query cross-company historical patterns - Only consider records linked to document_intelligence with validation_status='valid' and documents with status != 'rejected'
- Apply insufficient data threshold: when sample_count < 3, cap confidence at 0.25 and set insufficient_data=True
- Apply staleness decay: when no instances in last 90 days and all data older than 180 days, apply 0.5 decay penalty
- Use 365-day lookback for major_corporate_decision catalysts, 180-day for routine_signal
- Compute separate HistoricalPatterns for each catalyst tier
- Requirements: 3.1, 3.2, 3.3, 3.4, 3.5, 11.1, 11.2, 11.3, 11.5
- Implement
-
5.2 Write property test for pattern computation correctness
- Property 7: Pattern computation correctness
- Validates: Requirements 3.1, 3.2, 4.2
-
5.3 Write property test for pattern confidence monotonicity
- Property 8: Pattern confidence monotonicity
- Validates: Requirements 3.3, 11.2
-
5.4 Write property test for insufficient data threshold
- Property 9: Insufficient data threshold
- Validates: Requirements 3.4
-
5.5 Write property test for valid-only data filtering
- Property 10: Valid-only data filtering
- Validates: Requirements 3.5
-
5.6 Write property test for catalyst tier classification determinism
- Property 19: Catalyst tier classification determinism
- Validates: Requirements 11.1
-
5.7 Write property test for major decision extended lookback
- Property 20: Major decision extended lookback
- Validates: Requirements 11.3, 11.5
-
-
6. Checkpoint — Ensure pattern matcher and property tests pass
- Ensure all tests pass, ask the user if questions arise.
-
7. Signal Propagation Engine
-
7.1 Implement
services/aggregation/signal_propagation.py- Implement
CompetitiveSignalRecorddataclass matching the design specification - Implement
propagate_signals(pool, ticker, catalyst_type, impact_score, document_id, config) -> list[CompetitiveSignalRecord]— look up competitors, query cross-company patterns, produce weighted competitive signals - Signal weighting:
signal_strength = pattern.avg_strength * relationship.strength * pattern.pattern_confidence * source_impact_score - Signal direction: bullish if pattern.bullish_pct > bearish_pct, else bearish
- Skip propagation when relationship.strength < propagation_strength_threshold (default 0.2), log skip reason
- Exclude patterns with pattern_confidence < pattern_confidence_threshold (default 0.3), log exclusion reason
- Persist CompetitiveSignalRecord objects to the competitive_signal_records PostgreSQL table
- Implement
build_pattern_weighted_signals(patterns, competitive_signals, reference_time, window, config) -> list[WeightedSignal]— convert pattern/competitive signals to WeightedSignal objects for aggregation - Requirements: 4.1, 4.2, 4.3, 4.4, 4.5, 9.1
- Implement
-
7.2 Write property test for competitive signal strength monotonicity
- Property 11: Competitive signal strength monotonicity
- Validates: Requirements 4.3
-
7.3 Write property test for signal propagation threshold gating
- Property 12: Signal propagation threshold gating
- Validates: Requirements 4.5, 9.1
-
7.4 Write property test for pattern signal to WeightedSignal conversion
- Property 13: Pattern signal to WeightedSignal conversion
- Validates: Requirements 5.2
-
7.5 Write property test for competitive signal persistence round-trip
- Property 21: Competitive signal persistence round-trip
- Validates: Requirements 4.4, 7.2
-
-
8. Checkpoint — Ensure signal propagation and property tests pass
- Ensure all tests pass, ask the user if questions arise.
-
9. Aggregation engine integration
-
9.1 Extend
services/aggregation/worker.pyto incorporate pattern-based and competitive signals- Add
competitive_signal_weightandcompetitive_enabledfields toAggregationConfig - In
aggregate_company_window, check competitive toggle state fromrisk_configstable (same pattern as macro toggle) - When competitive layer is enabled: query self-company historical patterns for active catalyst types in the window, query competitive signals targeting this ticker
- Convert each pattern signal to a
WeightedSignalusing: document_id = source document, sentiment_value = +1.0 (bullish) or -1.0 (bearish), impact_score = signal_strength × competitive_signal_weight, recency decay from source document publication time, confidence gating from pattern_confidence - Merge pattern/competitive signals with company-specific and macro signals before computing trend direction, strength, confidence, and contradiction score
- Include contributing source_document_ids in evidence references for traceability
- When competitive layer is disabled or no pattern data exists, produce identical output to company+macro-only aggregation
- Requirements: 5.1, 5.2, 5.3, 5.4, 5.5, 5.6
- Add
-
9.2 Write property test for pattern-company contradiction detection
- Property 14: Pattern-company contradiction detection
- Validates: Requirements 5.3
-
9.3 Write property test for pattern evidence traceability
- Property 15: Pattern evidence traceability
- Validates: Requirements 5.4
-
9.4 Write property test for no-degradation and disabled-layer equivalence
- Property 16: No-degradation and disabled-layer equivalence
- Validates: Requirements 5.5, 6.2
-
9.5 Write property test for staleness decay penalty
- Property 17: Staleness decay penalty
- Validates: Requirements 9.2
-
-
10. Checkpoint — Ensure aggregation integration works correctly
- Ensure all tests pass, ask the user if questions arise.
-
11. Pattern-only suppression and safety
-
11.1 Extend
services/recommendation/suppression.pywith pattern-only suppression- Add
PATTERN_ONLY_SIGNAL = "pattern_only_signal"toSuppressionReasonenum - Implement
evaluate_pattern_only_suppression(summary, pattern_signal_count, company_signal_count, macro_signal_count) -> bool - When pattern-based signals are the sole basis for a trend direction change, force recommendation to
mode='informational'and append pattern-only caveat to thesis - Requirements: 9.3
- Add
-
11.2 Write property test for pattern-only suppression
- Property 18: Pattern-only suppression
- Validates: Requirements 9.3
-
-
12. Competitive layer toggle and API endpoints
-
12.1 Implement competitive toggle and status endpoints in
services/api/app.py- Add
GET /api/admin/competitive/statusreturning current enabled/disabled state fromrisk_configstable - Add
PUT /api/admin/competitive/toggleto switch competitive layer on/off, persisting torisk_configsand recording an audit event with previous state, new state, and operator - Toggle state is read from PostgreSQL at the start of each aggregation cycle (no caching)
- When disabled, pattern mining remains queryable via API but signal propagation is skipped during aggregation
- When re-enabled, resume computing signals using latest historical data including intelligence ingested while disabled
- Requirements: 6.1, 6.2, 6.3, 6.4, 6.5, 6.7
- Add
-
12.2 Implement pattern and competitive signal query endpoints in
services/api/app.py- Add
GET /api/patterns/{ticker}— historical patterns for a company, filterable by catalyst_type and time_horizon - Add
GET /api/patterns/{ticker}/competitors— cross-company patterns showing how this company's catalysts affected competitors - Add
GET /api/patterns/{ticker}/competitive-signals— recent competitive signals targeting this company - Add
GET /api/patterns/{ticker}/decisions— major corporate decision history with trend outcomes and pattern statistics - Include sample_count, outcome distribution, pattern_confidence, and date range in all responses
- Requirements: 10.1, 10.2, 10.3, 10.4, 11.4, 11.6
- Add
-
-
13. Checkpoint — Ensure API endpoints and toggle logic work correctly
- Ensure all tests pass, ask the user if questions arise.
-
14. Lake publisher extensions
- 14.1 Add competitive fact publishers to the lake publisher service
- Implement
publish_competitor_relationship_factwriting partitioned Parquet datasets tostonks-lakehouse/warehouse/competitor_relationships/dt={date}/ - Implement
publish_competitive_signal_factwriting partitioned Parquet datasets tostonks-lakehouse/warehouse/competitive_signals/dt={date}/target_ticker={ticker}/ - Register new fact types in the lake publisher's job processing loop
- Requirements: 7.3, 7.4
- Implement
- 14.1 Add competitive fact publishers to the lake publisher service
-
15. Signal propagation wiring into aggregation pipeline
-
15.1 Wire signal propagation into the aggregation worker
- After document intelligence is produced for a company, trigger signal propagation for the company's competitors
- In the aggregation cycle, call
propagate_signalsfor each new document intelligence record when competitive layer is enabled - Handle sustained propagation errors: after configurable threshold (default 5 consecutive failures), alert operators and continue with company-specific + macro signals only
- Requirements: 4.1, 9.4
-
15.2 Wire pattern mining into the aggregation cycle
- During
aggregate_company_window, call pattern matcher for self-company patterns and collect competitive signals for the ticker - Merge resulting WeightedSignals into the signal list before trend computation
- Ensure evidence references include pattern signal source document IDs
- Requirements: 5.1, 5.4
- During
-
-
16. Checkpoint — Ensure full backend pipeline works end-to-end
- Ensure all tests pass, ask the user if questions arise.
-
17. Dashboard — Competitors panel and historical patterns
-
17.1 Add competitors panel to Company Detail page
- On
frontend/src/pages/CompanyDetail.tsx, add a Competitors tab showing active competitor relationships with ticker, relationship_type, strength score, source (manual/inferred) - Add API hooks for
GET /companies/{company_id}/competitorsinfrontend/src/api/hooks.ts - Add infer button triggering
POST /companies/{company_id}/competitors/infer - Requirements: 8.1
- On
-
17.2 Add historical patterns panel to Company Detail page
- On
frontend/src/pages/CompanyDetail.tsx, add a Historical Patterns tab showing recent patterns: catalyst_type, outcome distribution (bullish_pct, bearish_pct), sample_count, pattern_confidence - Add API hook for
GET /api/patterns/{ticker} - Requirements: 8.2
- On
-
17.3 Add competitive signals panel to Company Detail page
- On
frontend/src/pages/CompanyDetail.tsx, add a Competitive Signals tab showing incoming signals: source ticker, catalyst_type, signal_direction, signal_strength - Add API hook for
GET /api/patterns/{ticker}/competitive-signals - Click-through on a signal shows full detail: source company, source document, catalyst_type, historical pattern statistics, competitor relationship
- Requirements: 8.5, 8.4
- On
-
17.4 Add corporate decision timeline to Company Detail page
- On
frontend/src/pages/CompanyDetail.tsx, add a Decisions tab showing major_corporate_decision events: catalyst type, date, summary, trend outcome that followed - Add API hook for
GET /api/patterns/{ticker}/decisions - Requirements: 11.4
- On
-
17.5 Add pattern-based evidence indicators to Trend detail page
- On
frontend/src/pages/TrendDetail.tsx, visually distinguish pattern-based and competitive signal evidence from company-specific and macro evidence (badge/icon differentiation) - Requirements: 8.3
- On
-
17.6 Add competitive toggle to Trading Controls page
- On
frontend/src/pages/Trading.tsx, add competitive signal layer enable/disable switch alongside existing macro toggle, with confirmation dialog - Add API hooks for
GET /api/admin/competitive/statusandPUT /api/admin/competitive/toggle - Requirements: 6.6
- On
-
-
18. Checkpoint — Ensure frontend pages render and integrate with API
- Ensure all tests pass, ask the user if questions arise.
-
19. Integration wiring and final validation
-
19.1 Write integration tests for competitive pipeline end-to-end
- Test document intelligence → pattern mining → signal propagation → aggregation flow
- Test lake publisher writes correct Parquet partitions for competitor relationships and competitive signals
- Test competitive toggle state change propagates to next aggregation cycle
- Test toggle disable/re-enable cycle preserves data integrity
- Requirements: 4.1, 5.1, 6.1, 6.4, 7.3
-
19.2 Write unit tests for API endpoints and dashboard components
- Test competitor CRUD endpoints return correct data and error codes (400, 404, 409)
- Test pattern query endpoints return correct data with filtering
- Test competitive toggle endpoint persists state and records audit event
- Test auto-inference endpoint with empty data, single company, no co-mentions
- Add MSW handlers for competitive endpoints in
frontend/src/test/mocks/handlers.ts - Test competitors panel, historical patterns panel, competitive signals panel, and decision timeline render correctly
- Requirements: 1.4, 2.5, 6.5, 8.1, 8.2, 8.5, 10.1, 10.4
-
-
20. Final checkpoint — Ensure all tests pass
- Ensure all tests pass, ask the user if questions arise.
Notes
- Tasks marked with
*are optional and can be skipped for faster MVP - Each task references specific requirements for traceability
- Checkpoints ensure incremental validation after each major phase
- Property tests validate the 21 correctness properties from the design using Hypothesis
- The design uses Python throughout — no language selection needed
- No new Kubernetes deployments required; all modules extend existing services
- Next migration number is 017 (016 is global-news-interpolation)
- Competitive layer follows the same toggle/suppression/aggregation pattern as the macro layer for consistency