4e010bc048
Implement full probabilistic signal processing pipeline gated behind
probabilistic_scoring_enabled feature flag in risk_configs:

- Bayesian log-likelihood accumulator with Beta posterior and entropy
- Regime detector (trend-following, panic, mean-reversion, uncertainty)
- Source accuracy tracker with per-source historical prediction accuracy
- Sigmoid confidence gate replacing binary gate
- Information gain surprise weighting for rare events
- Adaptive recency decay with event-specific half-lives
- Regime multiplier replacing market context multiplier
- Weighted disagreement entropy for contradiction detection
- Multiplicative macro exposure with conditional integration
- Graph-distance attenuated competitive signal propagation
- Exponentially weighted momentum with volatility scaling
- Expected value recommendation gate

All changes backward-compatible: flag=false preserves exact current behavior.
New outputs stored in existing JSONB columns (no schema changes except
source_accuracy table via migration 034).

Tests: 26 property-based tests (14 correctness properties), 99 unit tests,
1789 total tests passing with zero regressions.
239 lines
7.6 KiB
Python
"""Contradiction detection and disagreement representation.

Analyses weighted signals to detect and represent disagreement explicitly,
rather than collapsing contradictory evidence into a single unsupported
conclusion.

Requirements: 6.4, 6.5, 15.1–15.7
"""

from __future__ import annotations

import math
from dataclasses import dataclass

from services.aggregation.scoring import WeightedSignal
from services.shared.schemas import DisagreementDetail


@dataclass
class CatalystEntry:
    """Lightweight carrier for per-document catalyst info needed by
    contradiction detection. Avoids importing ImpactRow and creating
    a circular dependency with worker.py."""

    document_id: str
    catalyst_type: str


@dataclass
class ContradictionResult:
    """Full contradiction analysis output."""

    score: float  # 0-1, same semantics as existing compute_contradiction_score
    details: list[DisagreementDetail]


def detect_contradictions(
    signals: list[WeightedSignal],
    catalyst_entries: list[CatalystEntry] | None = None,
    *,
    probabilistic: bool = False,
    w_threshold: float = 5.0,
) -> ContradictionResult:
    """Run contradiction detection across multiple dimensions.

    Analyses:
      1. Sentiment disagreement — the core positive-vs-negative split
      2. Catalyst disagreement — same catalyst type with opposing sentiment

    When ``probabilistic`` is True, the overall score uses weighted
    disagreement entropy (Req 15.1–15.7) instead of the minority/majority
    ratio. When False, the existing ratio formula is preserved exactly.

    Args:
        signals: Weighted signals to analyse.
        catalyst_entries: Optional catalyst metadata for per-catalyst analysis.
        probabilistic: Use entropy-based scoring when True.
        w_threshold: Evidence mass threshold for entropy weighting (default 5.0).

    Returns a ContradictionResult with an overall score and per-dimension
    disagreement details.
    """
    details: list[DisagreementDetail] = []

    sentiment_detail = _detect_sentiment_disagreement(signals)
    if sentiment_detail is not None:
        details.append(sentiment_detail)

    if catalyst_entries:
        catalyst_details = _detect_catalyst_disagreement(signals, catalyst_entries)
        details.extend(catalyst_details)

    if probabilistic:
        score = _compute_entropy_score(signals, w_threshold)
    else:
        score = _compute_overall_score(signals)

    return ContradictionResult(score=score, details=details)


def _compute_overall_score(signals: list[WeightedSignal]) -> float:
    """Minority/majority weight ratio — backward-compatible formula."""
    if not signals:
        return 0.0

    pos_weight = 0.0
    neg_weight = 0.0
    for sig in signals:
        w = sig.weight.combined * sig.impact_score
        if sig.sentiment_value > 0:
            pos_weight += w
        elif sig.sentiment_value < 0:
            neg_weight += w

    total = pos_weight + neg_weight
    if total == 0.0:
        return 0.0

    minority = min(pos_weight, neg_weight)
    return round(minority / total, 4)


def _compute_entropy_score(
    signals: list[WeightedSignal],
    w_threshold: float = 5.0,
) -> float:
    """Weighted disagreement entropy — probabilistic contradiction score.

    Computes Shannon entropy over the positive/negative weight distribution,
    weighted by evidence mass relative to a configurable threshold.

    Formula:
        f_pos = W_pos / (W_pos + W_neg)
        f_neg = 1 - f_pos
        H = -f_pos·log₂(f_pos) - f_neg·log₂(f_neg)   (in [0, 1])
        score = H · min(1.0, (W_pos + W_neg) / W_threshold)

    Returns 0.0 when only one direction exists (no disagreement).

    Requirements: 15.1–15.7
    """
    if not signals:
        return 0.0

    pos_weight = 0.0
    neg_weight = 0.0
    for sig in signals:
        w = sig.weight.combined * sig.impact_score
        if sig.sentiment_value > 0:
            pos_weight += w
        elif sig.sentiment_value < 0:
            neg_weight += w

    # No disagreement when only one direction exists (Req 15.5)
    if pos_weight <= 0.0 or neg_weight <= 0.0:
        return 0.0

    total = pos_weight + neg_weight

    # Compute weight fractions (Req 15.2)
    f_pos = pos_weight / total
    f_neg = neg_weight / total  # = 1 - f_pos

    # Shannon entropy H = -f_pos·log₂(f_pos) - f_neg·log₂(f_neg) (Req 15.3)
    # Guard against log₂(0) — already handled by the early return above
    h_contradiction = -f_pos * math.log2(f_pos) - f_neg * math.log2(f_neg)

    # Weight by evidence mass (Req 15.4)
    evidence_factor = min(1.0, total / w_threshold) if w_threshold > 0.0 else 1.0
    score = h_contradiction * evidence_factor

    return round(score, 4)


def _detect_sentiment_disagreement(
    signals: list[WeightedSignal],
) -> DisagreementDetail | None:
    """Detect when both positive and negative sentiment signals exist."""
    pos_ids: list[str] = []
    neg_ids: list[str] = []
    pos_weight = 0.0
    neg_weight = 0.0

    for sig in signals:
        w = sig.weight.combined * sig.impact_score
        if w <= 0:
            continue
        if sig.sentiment_value > 0:
            pos_ids.append(sig.document_id)
            pos_weight += w
        elif sig.sentiment_value < 0:
            neg_ids.append(sig.document_id)
            neg_weight += w

    if not pos_ids or not neg_ids:
        return None

    total = pos_weight + neg_weight
    minority_pct = min(pos_weight, neg_weight) / total if total > 0 else 0.0

    return DisagreementDetail(
        dimension="sentiment",
        positive_doc_ids=pos_ids,
        negative_doc_ids=neg_ids,
        positive_weight=round(pos_weight, 4),
        negative_weight=round(neg_weight, 4),
        description=(
            f"Sentiment split: {len(pos_ids)} positive vs {len(neg_ids)} negative signals "
            f"(minority weight ratio {minority_pct:.0%})"
        ),
    )


def _detect_catalyst_disagreement(
    signals: list[WeightedSignal],
    catalyst_entries: list[CatalystEntry],
) -> list[DisagreementDetail]:
    """Detect when the same catalyst type has both positive and negative signals."""
    # Build lookup: document_id → (sentiment_value, combined_weight)
    sig_lookup: dict[str, tuple[float, float]] = {}
    for sig in signals:
        w = sig.weight.combined * sig.impact_score
        if w > 0:
            sig_lookup[sig.document_id] = (sig.sentiment_value, w)

    # Group by catalyst type
    from collections import defaultdict
    catalyst_groups: dict[str, list[tuple[str, float, float]]] = defaultdict(list)
    for entry in catalyst_entries:
        if entry.document_id in sig_lookup:
            sent_val, weight = sig_lookup[entry.document_id]
            if sent_val != 0.0:
                catalyst_groups[entry.catalyst_type].append(
                    (entry.document_id, sent_val, weight)
                )

    details: list[DisagreementDetail] = []
    for catalyst, entries in catalyst_groups.items():
        pos_ids = [doc_id for doc_id, sv, _ in entries if sv > 0]
        neg_ids = [doc_id for doc_id, sv, _ in entries if sv < 0]
        if not pos_ids or not neg_ids:
            continue

        pos_w = sum(w for _, sv, w in entries if sv > 0)
        neg_w = sum(w for _, sv, w in entries if sv < 0)

        details.append(DisagreementDetail(
            dimension=f"catalyst:{catalyst}",
            positive_doc_ids=pos_ids,
            negative_doc_ids=neg_ids,
            positive_weight=round(pos_w, 4),
            negative_weight=round(neg_w, 4),
            description=(
                f"Catalyst '{catalyst}' has {len(pos_ids)} positive and "
                f"{len(neg_ids)} negative signals"
            ),
        ))

    return details
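
To illustrate how the two scoring modes in this module diverge, here is a minimal standalone sketch of both formulas applied to already-summed positive and negative weights. The helpers `ratio_score` and `entropy_score` are hypothetical names introduced only for this illustration; they are not part of the module, and they skip the per-signal accumulation over `WeightedSignal` objects.

```python
import math


def ratio_score(pos_weight: float, neg_weight: float) -> float:
    """Hypothetical stand-in for the minority/majority ratio (flag off)."""
    total = pos_weight + neg_weight
    if total == 0.0:
        return 0.0
    return round(min(pos_weight, neg_weight) / total, 4)


def entropy_score(
    pos_weight: float, neg_weight: float, w_threshold: float = 5.0
) -> float:
    """Hypothetical stand-in for the weighted disagreement entropy (flag on)."""
    # One-sided evidence means no disagreement; this also avoids log2(0).
    if pos_weight <= 0.0 or neg_weight <= 0.0:
        return 0.0
    total = pos_weight + neg_weight
    f_pos = pos_weight / total
    f_neg = neg_weight / total
    # Binary Shannon entropy in bits, which lies in [0, 1].
    h = -f_pos * math.log2(f_pos) - f_neg * math.log2(f_neg)
    # Scale down when total evidence mass is below the threshold.
    evidence_factor = min(1.0, total / w_threshold) if w_threshold > 0.0 else 1.0
    return round(h * evidence_factor, 4)


# Illustrative (made-up) weight totals: (positive, negative).
cases = {
    "even split, ample evidence": (3.0, 3.0),
    "even split, thin evidence": (0.5, 0.5),
    "lopsided, ample evidence": (9.0, 1.0),
    "one-sided": (4.0, 0.0),
}

for label, (pos, neg) in cases.items():
    print(f"{label}: ratio={ratio_score(pos, neg)} entropy={entropy_score(pos, neg)}")
```

Under these assumptions, the ratio tops out at 0.5 for an even split regardless of how much evidence backs it, while the entropy form reaches 1.0 for an even split only once the total weight meets `w_threshold`, and shrinks proportionally below that.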