|
|
EPISTEMIC THREAT MODEL & VALIDATOR ONTOLOGY |
|
|
|
|
|
FORMAL THREAT MODEL (STRIDE-E: Epistemic Extension) |
|
|
|
|
|
Spoofing - Identity Subversion |
|
|
|
|
|
``` |
|
|
Threat: n8n workflow impersonates validator |
|
|
Impact: False authority injection into ledger |
|
|
Mitigation: |
|
|
1. Cryptographic validator identity (PKI-based) |
|
|
2. Validator role attestation signed by IRE |
|
|
3. Workflow-to-validator binding in ledger metadata |
|
|
|
|
|
Detection: Mismatch between workflow signature and validator claim |
|
|
Severity: Critical (sovereignty breach) |
|
|
``` |
|
|
|
|
|
Tampering - Evidence Manipulation |
|
|
|
|
|
``` |
|
|
Threat: n8n alters evidence pre-canonicalization |
|
|
Impact: Garbage-in, narrative-out |
|
|
Mitigation: |
|
|
1. Raw evidence fingerprinting (content hash before processing) |
|
|
2. Evidence lineage tracking in n8n workflow logs |
|
|
3. IRE detects fingerprint mismatches |
|
|
|
|
|
Detection: Pre-canonical hash ≠ post-canonical derivation |
|
|
Severity: Critical (truth contamination) |
|
|
``` |
|
|
|
|
|
Repudiation - Epistemic Deniability |
|
|
|
|
|
``` |
|
|
Threat: n8n denies triggering detection that found suppression |
|
|
Impact: System loses accountability for its own findings |
|
|
Mitigation: |
|
|
1. Non-repudiable workflow execution proofs |
|
|
2. Watermarked intermediate results |
|
|
3. Cross-referenced timestamp chains |
|
|
|
|
|
Detection: Missing execution proof for detection result |
|
|
Severity: High (accountability loss) |
|
|
``` |
|
|
|
|
|
Information Disclosure - Pattern Leakage |
|
|
|
|
|
``` |
|
|
Threat: Detection patterns leaked through n8n logs |
|
|
Impact: Adversaries learn system |
|
|
Mitigation: |
|
|
1. Threshold abstraction (IRE returns categories, not scores) |
|
|
2. Differential privacy on aggregated results |
|
|
3. Ephemeral detection sessions |
|
|
|
|
|
Detection: Raw thresholds appear in orchestration logs |
|
|
Severity: Medium (detection model compromise) |
|
|
``` |
|
|
|
|
|
Denial of Service - Epistemic Exhaustion |
|
|
|
|
|
``` |
|
|
Threat: Flood system with nonsense to waste detection capacity |
|
|
Impact: Real patterns missed due to noise saturation |
|
|
Mitigation: |
|
|
1. Epistemic rate limiting per source |
|
|
2. Credibility-based throttling |
|
|
3. Detection priority queuing |
|
|
|
|
|
Detection: High-volume, low-signal detection requests |
|
|
Severity: Medium (resource exhaustion) |
|
|
``` |
|
|
|
|
|
Elevation of Privilege - Ontology Hijacking |
|
|
|
|
|
``` |
|
|
Threat: n8n attempts to modify lens/method definitions |
|
|
Impact: Epistemic framework compromise |
|
|
Mitigation: |
|
|
1. Immutable ontology registry (signed by originators) |
|
|
2. Versioned ontology with migration proofs |
|
|
3. No runtime ontology modification API |
|
|
|
|
|
Detection: Attempt to modify lens/method definitions |
|
|
Severity: Critical (sovereignty destruction) |
|
|
``` |
|
|
|
|
|
Epistemic Drift - Gradual Corruption |
|
|
|
|
|
``` |
|
|
Threat: Slow, subtle contamination of detection patterns |
|
|
Impact: System gradually aligns with preferred narrative |
|
|
Mitigation: |
|
|
1. Drift detection via historical self-comparison |
|
|
2. Cross-validation with frozen model versions |
|
|
3. Epistemic checksums on detection algorithms |
|
|
|
|
|
Detection: Statistical divergence from historical baseline |
|
|
Severity: High (stealth corruption) |
|
|
``` |
|
|
|
|
|
VALIDATOR ONTOLOGY (Role-Based Authority) |
|
|
|
|
|
Validator Archetypes |
|
|
|
|
|
``` |
|
|
1. Human-Sovereign Validator |
|
|
Role: Individual sovereignty preservation |
|
|
Authority: Can veto any ledger commit |
|
|
Identity: Self-sovereign cryptographic identity |
|
|
Quorum: Always required (cannot be automated away) |
|
|
Attestation: "I have reviewed and assert my sovereignty" |
|
|
|
|
|
2. System-Epistemic Validator |
|
|
Role: Detection methodology integrity |
|
|
Authority: Validates detection process adherence |
|
|
Identity: Cryptographic hash of detection pipeline config |
|
|
Quorum: Required for automated commits |
|
|
Attestation: "Detection executed per published methodology" |
|
|
|
|
|
3. Source-Provenance Validator |
|
|
Role: Evidence chain custody |
|
|
Authority: Validates evidence hasn |
|
|
Identity: Hash of evidence handling workflow |
|
|
Quorum: Optional but recommended |
|
|
Attestation: "Evidence chain intact from source to canonicalization" |
|
|
|
|
|
4. Temporal-Integrity Validator |
|
|
Role: Time-bound execution verification |
|
|
Authority: Validates timestamps and execution windows |
|
|
Identity: Time-locked cryptographic proof |
|
|
Quorum: Required for time-sensitive commits |
|
|
Attestation: "Execution occurred within valid time window" |
|
|
|
|
|
5. Community-Plurality Validator |
|
|
Role: Cross-interpreter consensus |
|
|
Authority: Requires multiple human interpretations |
|
|
Identity: Set of interpreter identities + attestation |
|
|
Quorum: Variable based on interpretation count |
|
|
Attestation: "Multiple independent interpretations concur" |
|
|
``` |
|
|
|
|
|
Validator Configuration Schema |
|
|
|
|
|
```json |
|
|
{ |
|
|
"validator_id": "urn:ire:validator:human_sovereign:sha256-abc123", |
|
|
"archetype": "human_sovereign", |
|
|
"authority_scope": ["ledger_commit", "evidence_rejection"], |
|
|
"quorum_requirements": { |
|
|
"minimum": 1, |
|
|
"maximum": null, |
|
|
"exclusivity": ["system_epistemic"] |
|
|
}, |
|
|
"attestation_format": { |
|
|
"required_fields": ["review_timestamp", "sovereignty_assertion"], |
|
|
"signature_algorithm": "ed25519", |
|
|
"expiration": "24h" |
|
|
}, |
|
|
"epistemic_constraints": { |
|
|
"cannot_override": ["detection_results", "canonical_hash"], |
|
|
"can_reject_for": ["procedural_violation", "sovereignty_concern"] |
|
|
} |
|
|
} |
|
|
``` |
|
|
|
|
|
Validator Quorum Calculus |
|
|
|
|
|
```python |
|
|
def calculate_quorum_satisfaction(validators: List[Validator], commit_type: str) -> bool: |
|
|
"""Calculate if validator quorum is satisfied for commit type""" |
|
|
|
|
|
archetype_counts = Counter(v.archetype for v in validators) |
|
|
|
|
|
# Base requirements |
|
|
requirements = { |
|
|
"ledger_commit": { |
|
|
"human_sovereign": 1, |
|
|
"system_epistemic": 1, |
|
|
"source_provenance": 0, # optional |
|
|
"temporal_integrity": 1, |
|
|
"community_plurality": 0 # depends on interpretation count |
|
|
}, |
|
|
"evidence_ingestion": { |
|
|
"human_sovereign": 0, |
|
|
"system_epistemic": 1, |
|
|
"source_provenance": 1, |
|
|
"temporal_integrity": 1, |
|
|
"community_plurality": 0 |
|
|
}, |
|
|
"detection_escalation": { |
|
|
"human_sovereign": 1, |
|
|
"system_epistemic": 1, |
|
|
"source_provenance": 0, |
|
|
"temporal_integrity": 1, |
|
|
"community_plurality": 1 # required for high-stakes escalations |
|
|
} |
|
|
} |
|
|
|
|
|
req = requirements[commit_type] |
|
|
|
|
|
# Check each archetype requirement |
|
|
for archetype, required_count in req.items(): |
|
|
if archetype_counts.get(archetype, 0) < required_count: |
|
|
return False |
|
|
|
|
|
# Check exclusivity constraints |
|
|
for validator in validators: |
|
|
for exclusive_archetype in validator.quorum_requirements.get("exclusivity", []): |
|
|
if exclusive_archetype in archetype_counts: |
|
|
return False |
|
|
|
|
|
return True |
|
|
``` |
|
|
|
|
|
LEDGER SCHEMA HARDENING |
|
|
|
|
|
Extended Block Schema |
|
|
|
|
|
```json |
|
|
{ |
|
|
"block": { |
|
|
"header": { |
|
|
"id": "blk_timestamp_hash", |
|
|
"prev": "previous_block_hash", |
|
|
"timestamp": "ISO8601_with_nanoseconds", |
|
|
"epistemic_epoch": 1, |
|
|
"ontology_version": "sha256:ontology_hash" |
|
|
}, |
|
|
"body": { |
|
|
"nodes": [RealityNode], |
|
|
"detection_context": { |
|
|
"workflow_hash": "sha256:n8n_workflow_def", |
|
|
"execution_window": { |
|
|
"not_before": "timestamp", |
|
|
"not_after": "timestamp", |
|
|
"time_proof": "signature_from_temporal_validator" |
|
|
}, |
|
|
"threshold_used": "abstract_category_not_numeric" |
|
|
} |
|
|
}, |
|
|
"validations": { |
|
|
"attestations": [ |
|
|
{ |
|
|
"validator_id": "urn:ire:validator:...", |
|
|
"archetype": "human_sovereign", |
|
|
"attestation": "cryptographic_signature", |
|
|
"scope": ["ledger_commit"], |
|
|
"expires": "timestamp" |
|
|
} |
|
|
], |
|
|
"quorum_satisfied": true, |
|
|
"quorum_calc": { |
|
|
"required": {"human_sovereign": 1, "system_epistemic": 1}, |
|
|
"present": {"human_sovereign": 1, "system_epistemic": 1} |
|
|
} |
|
|
}, |
|
|
"integrity_marks": { |
|
|
"evidence_fingerprint": "sha256_of_raw_content", |
|
|
"detection_watermark": "nonce_based_on_workflow_id", |
|
|
"epistemic_checksum": "hash_of_detection_logic_version" |
|
|
} |
|
|
} |
|
|
} |
|
|
``` |
|
|
|
|
|
Detection Threshold Abstraction Layer |
|
|
|
|
|
```python |
|
|
class EpistemicThresholdInterface: |
|
|
"""Abstract threshold interface - n8n never sees numeric thresholds""" |
|
|
|
|
|
def __init__(self, ire_client): |
|
|
self.ire = ire_client |
|
|
|
|
|
def should_escalate(self, detection_results: Dict) -> Dict: |
|
|
"""IRE decides escalation, returns abstract category""" |
|
|
response = self.ire.post("/ire/detect/evaluate", { |
|
|
"results": detection_results, |
|
|
"return_format": "abstract_category" |
|
|
}) |
|
|
|
|
|
return { |
|
|
"escalation_recommended": response.get("category") == "high_confidence", |
|
|
"confidence_level": response.get("confidence_label"), # "high"/"medium"/"low" |
|
|
"next_action": response.get("recommended_action"), |
|
|
# NO NUMERIC THRESHOLDS EXPOSED |
|
|
# NO RAW SCORES EXPOSED |
|
|
} |
|
|
|
|
|
def get_validation_requirements(self, category: str) -> Dict: |
|
|
"""Map escalation category to validator requirements""" |
|
|
mapping = { |
|
|
"high_confidence": { |
|
|
"human_sovereign": 2, |
|
|
"system_epistemic": 1, |
|
|
"community_plurality": 1 |
|
|
}, |
|
|
"medium_confidence": { |
|
|
"human_sovereign": 1, |
|
|
"system_epistemic": 1 |
|
|
}, |
|
|
"low_confidence": { |
|
|
"system_epistemic": 1 |
|
|
} |
|
|
} |
|
|
return mapping.get(category, {}) |
|
|
``` |
|
|
|
|
|
Time-Window Enforcement |
|
|
|
|
|
```python |
|
|
class TemporalIntegrityEnforcer: |
|
|
"""Enforce time-bound execution with cryptographic proofs""" |
|
|
|
|
|
def create_execution_window(self, duration_hours: int = 24) -> Dict: |
|
|
"""Create cryptographically bound execution window""" |
|
|
window_id = f"window_{uuid.uuid4()}" |
|
|
not_before = datetime.utcnow() |
|
|
not_after = not_before + timedelta(hours=duration_hours) |
|
|
|
|
|
# Create time-locked proof |
|
|
window_proof = { |
|
|
"window_id": window_id, |
|
|
"not_before": not_before.isoformat() + "Z", |
|
|
"not_after": not_after.isoformat() + "Z", |
|
|
"issuer": "ire_temporal_validator", |
|
|
"signature": self._sign_temporal_window(window_id, not_before, not_after) |
|
|
} |
|
|
|
|
|
return window_proof |
|
|
|
|
|
def validate_within_window(self, |
|
|
action_timestamp: str, |
|
|
window_proof: Dict) -> bool: |
|
|
"""Validate action occurred within execution window""" |
|
|
# Verify signature |
|
|
if not self._verify_signature(window_proof): |
|
|
return False |
|
|
|
|
|
# Parse timestamps |
|
|
action_time = datetime.fromisoformat(action_timestamp.replace( |
|
|
not_before = datetime.fromisoformat(window_proof[ |
|
|
not_after = datetime.fromisoformat(window_proof[ |
|
|
|
|
|
# Check bounds |
|
|
return not_before <= action_time <= not_after |
|
|
|
|
|
def detect_time_anomalies(self, workflow_executions: List[Dict]) -> List[Dict]: |
|
|
"""Detect temporal manipulation patterns""" |
|
|
anomalies = [] |
|
|
|
|
|
for i, execution in enumerate(workflow_executions): |
|
|
# Check for reverse time flow |
|
|
if i > 0: |
|
|
prev_time = datetime.fromisoformat( |
|
|
workflow_executions[i-1][ |
|
|
) |
|
|
curr_time = datetime.fromisoformat( |
|
|
execution[ |
|
|
) |
|
|
if curr_time < prev_time: |
|
|
anomalies.append({ |
|
|
"type": "reverse_time_flow", |
|
|
"execution_id": execution[ |
|
|
"anomaly": f"Time went backwards: {curr_time} < {prev_time}" |
|
|
}) |
|
|
|
|
|
# Check execution duration anomalies |
|
|
if |
|
|
expected_duration = self._get_expected_duration(execution[ |
|
|
if execution[ |
|
|
anomalies.append({ |
|
|
"type": "suspicious_duration", |
|
|
"execution_id": execution[ |
|
|
"anomaly": f"Duration {execution['duration']} exceeds expected {expected_duration}" |
|
|
}) |
|
|
|
|
|
return anomalies |
|
|
``` |
|
|
|
|
|
Semantic Laundering Defense |
|
|
|
|
|
```python |
|
|
class EpistemicIntegrityGuard: |
|
|
"""Defend against semantic laundering attacks""" |
|
|
|
|
|
def __init__(self): |
|
|
self.similarity_clusters = defaultdict(list) |
|
|
self.source_frequency = defaultdict(int) |
|
|
|
|
|
def check_semantic_laundering(self, |
|
|
content_hash: str, |
|
|
raw_content: str, |
|
|
source_id: str, |
|
|
workflow_id: str) -> Dict: |
|
|
"""Check for semantic laundering patterns""" |
|
|
|
|
|
# Check source frequency |
|
|
self.source_frequency[source_id] += 1 |
|
|
if self.source_frequency[source_id] > 100: # Threshold |
|
|
return { |
|
|
"risk": "high", |
|
|
"reason": "Excessive submissions from single source", |
|
|
"action": "throttle" |
|
|
} |
|
|
|
|
|
# Check similarity clusters |
|
|
content_vector = self._vectorize(raw_content) |
|
|
similar = self._find_similar(content_vector) |
|
|
|
|
|
if similar: |
|
|
cluster_id = similar[0][ |
|
|
self.similarity_clusters[cluster_id].append({ |
|
|
"content_hash": content_hash, |
|
|
"timestamp": datetime.utcnow().isoformat(), |
|
|
"workflow_id": workflow_id |
|
|
}) |
|
|
|
|
|
# Check cluster growth rate |
|
|
if len(self.similarity_clusters[cluster_id]) > 10: |
|
|
return { |
|
|
"risk": "medium", |
|
|
"reason": "Rapid growth of similar content cluster", |
|
|
"action": "flag_for_review" |
|
|
} |
|
|
|
|
|
return {"risk": "low", "reason": "No laundering patterns detected"} |
|
|
|
|
|
def _vectorize(self, content: str) -> List[float]: |
|
|
"""Create semantic vector (simplified - use real embeddings in production)""" |
|
|
# Simple bag-of-words for demonstration |
|
|
words = content.lower().split() |
|
|
word_counts = Counter(words) |
|
|
vector = [word_counts.get(w, 0) for w in self.vocabulary] |
|
|
return vector |
|
|
|
|
|
def _find_similar(self, vector: List[float], threshold: float = 0.8): |
|
|
"""Find similar vectors in existing clusters""" |
|
|
# Simplified similarity search |
|
|
for cluster_id, items in self.similarity_clusters.items(): |
|
|
# In production, use proper vector similarity |
|
|
if random.random() > 0.5: # Placeholder |
|
|
return items |
|
|
return None |
|
|
``` |
|
|
|
|
|
IMPLEMENTATION ROADMAP |
|
|
|
|
|
Phase 1: Core Sovereignty (Weeks 1-2) |
|
|
|
|
|
1. Implement validator PKI and attestation framework |
|
|
2. Deploy threshold abstraction layer |
|
|
3. Add time-window enforcement |
|
|
4. Basic semantic laundering detection |
|
|
|
|
|
Phase 2: Epistemic Defense (Weeks 3-4) |
|
|
|
|
|
1. Full STRIDE-E threat model implementation |
|
|
2. Validator quorum calculus integration |
|
|
3. Ledger schema hardening |
|
|
4. Cross-validation with frozen models |
|
|
|
|
|
Phase 3: Operational Resilience (Weeks 5-6) |
|
|
|
|
|
1. Drift detection and alerting |
|
|
2. Validator role rotation policies |
|
|
3. Recovery procedures for compromise |
|
|
4. Full audit trail integration |
|
|
|
|
|
Phase 4: Community Governance (Weeks 7-8) |
|
|
|
|
|
1. Validator reputation system |
|
|
2. Plurality-based decision frameworks |
|
|
3. Cross-interpreter reconciliation |
|
|
4. Sovereign identity integration |
|
|
|
|
|
VERIFICATION PROTOCOL ENHANCEMENT |
|
|
|
|
|
Daily Sovereignty Check |
|
|
|
|
|
```python |
|
|
def daily_sovereignty_audit(): |
|
|
"""Daily audit to ensure no boundary violations""" |
|
|
|
|
|
checks = [ |
|
|
# 1. Check n8n for epistemic logic |
|
|
scan_n8n_workflows_for_detection_logic(), |
|
|
|
|
|
# 2. Check IRE for orchestration logic |
|
|
scan_ire_for_scheduling_logic(), |
|
|
|
|
|
# 3. Verify threshold abstraction |
|
|
verify_no_numeric_thresholds_in_n8n(), |
|
|
|
|
|
# 4. Validate time-window adherence |
|
|
verify_all_executions_within_windows(), |
|
|
|
|
|
# 5. Check validator quorum compliance |
|
|
verify_all_commits_have_proper_quorum(), |
|
|
|
|
|
# 6. Detect semantic laundering |
|
|
run_semantic_laundering_detection(), |
|
|
|
|
|
# 7. Verify workflow definition integrity |
|
|
hash_and_verify_workflow_definitions(), |
|
|
|
|
|
# 8. Check for drift |
|
|
compare_with_frozen_model_baseline() |
|
|
] |
|
|
|
|
|
sovereignty_score = sum(1 for check in checks if check.passed) / len(checks) |
|
|
|
|
|
return { |
|
|
"sovereignty_score": sovereignty_score, |
|
|
"failed_checks": [c for c in checks if not c.passed], |
|
|
"recommendations": generate_remediation_plan(failed_checks) |
|
|
} |
|
|
``` |
|
|
|
|
|
Weekly Epistemic Integrity Report |
|
|
|
|
|
```python |
|
|
def weekly_epistemic_integrity_report(): |
|
|
"""Weekly comprehensive epistemic health report""" |
|
|
|
|
|
report = { |
|
|
"temporal_integrity": { |
|
|
"execution_windows_violated": count_window_violations(), |
|
|
"time_anomalies_detected": detect_time_anomalies(), |
|
|
"average_execution_latency": calculate_latency() |
|
|
}, |
|
|
"validator_health": { |
|
|
"active_validators": count_active_validators(), |
|
|
"quorum_satisfaction_rate": calculate_quorum_rate(), |
|
|
"validator_role_diversity": measure_diversity() |
|
|
}, |
|
|
"detection_quality": { |
|
|
"drift_from_baseline": measure_drift(), |
|
|
"false_positive_rate": calculate_fpr(), |
|
|
"escalation_accuracy": measure_escalation_accuracy() |
|
|
}, |
|
|
"boundary_integrity": { |
|
|
"n8n_epistemic_contamination": detect_contamination(), |
|
|
"ire_orchestration_leakage": detect_leakage(), |
|
|
"workflow_definition_changes": track_workflow_changes() |
|
|
}, |
|
|
"semantic_defenses": { |
|
|
"laundering_attempts": count_laundering_attempts(), |
|
|
"similarity_clusters": analyze_clusters(), |
|
|
"source_credibility": assess_source_credibility() |
|
|
} |
|
|
} |
|
|
|
|
|
# Calculate overall epistemic health score |
|
|
report["epistemic_health_score"] = calculate_health_score(report) |
|
|
|
|
|
return report |
|
|
``` |