immutable_reality_engine / n8n_int_spec
EPISTEMIC THREAT MODEL & VALIDATOR ONTOLOGY
FORMAL THREAT MODEL (STRIDE-E: Epistemic Extension)
Spoofing - Identity Subversion
```
Threat: n8n workflow impersonates validator
Impact: False authority injection into ledger
Mitigation:
1. Cryptographic validator identity (PKI-based)
2. Validator role attestation signed by IRE
3. Workflow-to-validator binding in ledger metadata
Detection: Mismatch between workflow signature and validator claim
Severity: Critical (sovereignty breach)
```
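The detection rule above (a mismatch between the workflow signature and the validator claim) can be sketched as follows. This is a minimal illustration, not the IRE implementation: HMAC-SHA256 from the standard library stands in for a PKI signature, and the names `workflow_hash`, `attest_binding`, and `verify_binding` are hypothetical.

```python
import hashlib
import hmac
import json


def workflow_hash(workflow_def: dict) -> str:
    """Hash a workflow definition independently of key order."""
    canonical = json.dumps(workflow_def, sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def attest_binding(key: bytes, validator_id: str, wf_hash: str) -> str:
    """IRE side: sign the (validator, workflow) pair.

    Assumption: HMAC is a stand-in here; a real deployment would use an
    asymmetric signature so that n8n never holds the signing key.
    """
    message = f"{validator_id}|{wf_hash}".encode("utf-8")
    return hmac.new(key, message, hashlib.sha256).hexdigest()


def verify_binding(key: bytes, validator_id: str,
                   workflow_def: dict, attestation: str) -> bool:
    """Reject any workflow whose hash does not match the attested binding."""
    expected = attest_binding(key, validator_id, workflow_hash(workflow_def))
    return hmac.compare_digest(expected, attestation)
```

A workflow that claims a validator identity it was not bound to, or whose definition changed after binding, fails `verify_binding`.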
Tampering - Evidence Manipulation
```
Threat: n8n alters evidence pre-canonicalization
Impact: Garbage-in, narrative-out
Mitigation:
1. Raw evidence fingerprinting (content hash before processing)
2. Evidence lineage tracking in n8n workflow logs
3. IRE detects fingerprint mismatches
Detection: Pre-canonical hash ≠ post-canonical derivation
Severity: Critical (truth contamination)
```
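A minimal sketch of raw evidence fingerprinting, assuming SHA-256 as the content hash (the ledger schema uses `sha256_of_raw_content`). The `canonicalize` step is a hypothetical placeholder; the real canonicalization is not defined in this spec.

```python
import hashlib


def fingerprint_raw(raw: bytes) -> str:
    """Hash the evidence bytes exactly as received, before any processing."""
    return "sha256:" + hashlib.sha256(raw).hexdigest()


def canonicalize(raw: bytes) -> str:
    """Hypothetical canonicalization: decode and collapse whitespace."""
    return " ".join(raw.decode("utf-8").split())


def ingest(raw: bytes) -> dict:
    """Record the fingerprint first, then derive the canonical form."""
    return {
        "evidence_fingerprint": fingerprint_raw(raw),
        "canonical": canonicalize(raw),
    }


def verify_lineage(record: dict, raw_as_received: bytes) -> bool:
    """IRE side: re-derive both values from the raw bytes and compare."""
    return (
        fingerprint_raw(raw_as_received) == record["evidence_fingerprint"]
        and canonicalize(raw_as_received) == record["canonical"]
    )
```

Either kind of tampering is caught: altered raw bytes change the fingerprint, and an altered canonical form no longer matches the derivation from the raw bytes.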
Repudiation - Epistemic Deniability
```
Threat: n8n denies triggering detection that found suppression
Impact: System loses accountability for its own findings
Mitigation:
1. Non-repudiable workflow execution proofs
2. Watermarked intermediate results
3. Cross-referenced timestamp chains
Detection: Missing execution proof for detection result
Severity: High (accountability loss)
```
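One way to approximate non-repudiable execution proofs with cross-referenced timestamps is a hash chain, sketched below. The entry fields and function names are assumptions for illustration. A hash chain alone makes tampering evident; true non-repudiation would additionally require each entry to be signed.

```python
import hashlib
import json

GENESIS = "0" * 64  # placeholder "previous proof" for the first entry


def _digest(entry: dict) -> str:
    payload = json.dumps(entry, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def append_proof(chain: list, execution_id: str,
                 timestamp: str, result_hash: str) -> dict:
    """Append an execution record that commits to the previous record."""
    entry = {
        "execution_id": execution_id,
        "timestamp": timestamp,
        "result_hash": result_hash,
        "prev": chain[-1]["proof"] if chain else GENESIS,
    }
    entry["proof"] = _digest(entry)  # digest is taken before "proof" is added
    chain.append(entry)
    return entry


def verify_chain(chain: list) -> bool:
    """Check every link and every entry digest."""
    prev = GENESIS
    for entry in chain:
        body = {k: v for k, v in entry.items() if k != "proof"}
        if entry["prev"] != prev or _digest(body) != entry["proof"]:
            return False
        prev = entry["proof"]
    return True


def has_proof(chain: list, result_hash: str) -> bool:
    """Detection rule: a result with no execution proof is suspect."""
    return any(e["result_hash"] == result_hash for e in chain)
```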
Information Disclosure - Pattern Leakage
```
Threat: Detection patterns leaked through n8n logs
Impact: Adversaries learn system's detection thresholds
Mitigation:
1. Threshold abstraction (IRE returns categories, not scores)
2. Differential privacy on aggregated results
3. Ephemeral detection sessions
Detection: Raw thresholds appear in orchestration logs
Severity: Medium (detection model compromise)
```
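The detection rule above (raw thresholds appearing in orchestration logs) could be approximated with a log scan like the one below. The key names in `SENSITIVE_KEYS` are assumptions; adjust them to whatever field names the deployment actually uses. A regex scan is a heuristic and will miss obfuscated values.

```python
import re

# Assumed field-name fragments that should never carry a numeric value in n8n logs
SENSITIVE_KEYS = ("threshold", "score", "confidence")

_PATTERN = re.compile(
    r"\b\w*?(" + "|".join(SENSITIVE_KEYS) + r")\w*\"?\s*[:=]\s*\"?(-?\d+(?:\.\d+)?)",
    re.IGNORECASE,
)


def find_numeric_leaks(log_lines):
    """Return (line_number, key_fragment, value) for each suspected leak."""
    leaks = []
    for lineno, line in enumerate(log_lines, start=1):
        for match in _PATTERN.finditer(line):
            leaks.append((lineno, match.group(1).lower(), match.group(2)))
    return leaks
```

Abstract labels such as `"confidence_label": "high"` pass the scan; only keys followed by a number are reported.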
Denial of Service - Epistemic Exhaustion
```
Threat: Flood system with nonsense to waste detection capacity
Impact: Real patterns missed due to noise saturation
Mitigation:
1. Epistemic rate limiting per source
2. Credibility-based throttling
3. Detection priority queuing
Detection: High-volume, low-signal detection requests
Severity: Medium (resource exhaustion)
```
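Per-source rate limiting and credibility-based throttling can be combined in a token bucket whose capacity and refill rate scale with source credibility. The sketch below is one possible design, not the IRE's; the class name, the default limits, and the `credibility` scale of 0 to 1 are all assumptions. The caller passes `now` explicitly so the behavior is deterministic.

```python
class EpistemicRateLimiter:
    """Token bucket per source; low-credibility sources get smaller buckets."""

    def __init__(self, base_capacity: float = 10.0,
                 base_refill_per_sec: float = 1.0):
        self.base_capacity = base_capacity
        self.base_refill = base_refill_per_sec
        self._state = {}  # source_id -> (tokens, last_timestamp)

    def allow(self, source_id: str, now: float,
              credibility: float = 1.0) -> bool:
        """Return True if the source may submit one detection request at `now`."""
        credibility = min(max(credibility, 0.0), 1.0)
        capacity = self.base_capacity * credibility
        tokens, last = self._state.get(source_id, (capacity, now))
        # Refill proportionally to elapsed time, capped at the bucket capacity
        elapsed = now - last
        tokens = min(capacity, tokens + elapsed * self.base_refill * credibility)
        allowed = tokens >= 1.0
        if allowed:
            tokens -= 1.0
        self._state[source_id] = (tokens, now)
        return allowed
```

With `base_capacity=3.0`, a fully credible source gets three immediate requests and then one per second; a source with credibility 0.5 gets a single immediate request.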
Elevation of Privilege - Ontology Hijacking
```
Threat: n8n attempts to modify lens/method definitions
Impact: Epistemic framework compromise
Mitigation:
1. Immutable ontology registry (signed by originators)
2. Versioned ontology with migration proofs
3. No runtime ontology modification API
Detection: Attempt to modify lens/method definitions
Severity: Critical (sovereignty destruction)
```
Epistemic Drift - Gradual Corruption
```
Threat: Slow, subtle contamination of detection patterns
Impact: System gradually aligns with preferred narrative
Mitigation:
1. Drift detection via historical self-comparison
2. Cross-validation with frozen model versions
3. Epistemic checksums on detection algorithms
Detection: Statistical divergence from historical baseline
Severity: High (stealth corruption)
```
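Statistical divergence from a historical baseline can be measured in many ways. The sketch below uses total variation distance between the distributions of abstract detection categories. The choice of metric, the function names, and the 0.2 cutoff are assumptions for illustration, not values from this spec.

```python
from collections import Counter


def category_distribution(labels):
    """Convert a list of category labels into relative frequencies."""
    if not labels:
        raise ValueError("labels must not be empty")
    counts = Counter(labels)
    total = sum(counts.values())
    return {label: n / total for label, n in counts.items()}


def total_variation(p, q):
    """Total variation distance between two discrete distributions (0 to 1)."""
    keys = set(p) | set(q)
    return 0.5 * sum(abs(p.get(k, 0.0) - q.get(k, 0.0)) for k in keys)


def detect_drift(baseline_labels, current_labels, max_divergence=0.2):
    """Compare current detection output against a frozen historical baseline."""
    divergence = total_variation(
        category_distribution(baseline_labels),
        category_distribution(current_labels),
    )
    return {"divergence": divergence, "drift_detected": divergence > max_divergence}
```

Working on category labels rather than raw scores keeps the drift check compatible with the threshold abstraction described in this document.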
VALIDATOR ONTOLOGY (Role-Based Authority)
Validator Archetypes
```
1. Human-Sovereign Validator
   Role: Individual sovereignty preservation
   Authority: Can veto any ledger commit
   Identity: Self-sovereign cryptographic identity
   Quorum: Always required (cannot be automated away)
   Attestation: "I have reviewed and assert my sovereignty"

2. System-Epistemic Validator
   Role: Detection methodology integrity
   Authority: Validates detection process adherence
   Identity: Cryptographic hash of detection pipeline config
   Quorum: Required for automated commits
   Attestation: "Detection executed per published methodology"

3. Source-Provenance Validator
   Role: Evidence chain custody
   Authority: Validates evidence hasn't been manipulated
   Identity: Hash of evidence handling workflow
   Quorum: Optional but recommended
   Attestation: "Evidence chain intact from source to canonicalization"

4. Temporal-Integrity Validator
   Role: Time-bound execution verification
   Authority: Validates timestamps and execution windows
   Identity: Time-locked cryptographic proof
   Quorum: Required for time-sensitive commits
   Attestation: "Execution occurred within valid time window"

5. Community-Plurality Validator
   Role: Cross-interpreter consensus
   Authority: Requires multiple human interpretations
   Identity: Set of interpreter identities + attestation
   Quorum: Variable based on interpretation count
   Attestation: "Multiple independent interpretations concur"
```
Validator Configuration Schema
```json
{
  "validator_id": "urn:ire:validator:human_sovereign:sha256-abc123",
  "archetype": "human_sovereign",
  "authority_scope": ["ledger_commit", "evidence_rejection"],
  "quorum_requirements": {
    "minimum": 1,
    "maximum": null,
    "exclusivity": ["system_epistemic"]
  },
  "attestation_format": {
    "required_fields": ["review_timestamp", "sovereignty_assertion"],
    "signature_algorithm": "ed25519",
    "expiration": "24h"
  },
  "epistemic_constraints": {
    "cannot_override": ["detection_results", "canonical_hash"],
    "can_reject_for": ["procedural_violation", "sovereignty_concern"]
  }
}
```
Validator Quorum Calculus
```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Validator:
    """Minimal stand-in; field names follow the configuration schema."""
    archetype: str
    quorum_requirements: Dict = field(default_factory=dict)


def calculate_quorum_satisfaction(validators: List[Validator], commit_type: str) -> bool:
    """Calculate if validator quorum is satisfied for commit type"""
    archetype_counts = Counter(v.archetype for v in validators)

    # Base requirements
    requirements = {
        "ledger_commit": {
            "human_sovereign": 1,
            "system_epistemic": 1,
            "source_provenance": 0,  # optional
            "temporal_integrity": 1,
            "community_plurality": 0  # depends on interpretation count
        },
        "evidence_ingestion": {
            "human_sovereign": 0,
            "system_epistemic": 1,
            "source_provenance": 1,
            "temporal_integrity": 1,
            "community_plurality": 0
        },
        "detection_escalation": {
            "human_sovereign": 1,
            "system_epistemic": 1,
            "source_provenance": 0,
            "temporal_integrity": 1,
            "community_plurality": 1  # required for high-stakes escalations
        }
    }
    if commit_type not in requirements:
        raise ValueError(f"Unknown commit type: {commit_type}")
    req = requirements[commit_type]

    # Check each archetype requirement
    for archetype, required_count in req.items():
        if archetype_counts.get(archetype, 0) < required_count:
            return False

    # Check exclusivity constraints
    # NOTE: a validator that excludes an archetype required above makes the
    # quorum unsatisfiable. The example schema does this: human_sovereign
    # excludes system_epistemic, yet ledger_commit requires both.
    for validator in validators:
        for exclusive_archetype in validator.quorum_requirements.get("exclusivity", []):
            if exclusive_archetype in archetype_counts:
                return False

    return True
```
LEDGER SCHEMA HARDENING
Extended Block Schema
```json
{
  "block": {
    "header": {
      "id": "blk_timestamp_hash",
      "prev": "previous_block_hash",
      "timestamp": "ISO8601_with_nanoseconds",
      "epistemic_epoch": 1,
      "ontology_version": "sha256:ontology_hash"
    },
    "body": {
      "nodes": [RealityNode],
      "detection_context": {
        "workflow_hash": "sha256:n8n_workflow_def",
        "execution_window": {
          "not_before": "timestamp",
          "not_after": "timestamp",
          "time_proof": "signature_from_temporal_validator"
        },
        "threshold_used": "abstract_category_not_numeric"
      }
    },
    "validations": {
      "attestations": [
        {
          "validator_id": "urn:ire:validator:...",
          "archetype": "human_sovereign",
          "attestation": "cryptographic_signature",
          "scope": ["ledger_commit"],
          "expires": "timestamp"
        }
      ],
      "quorum_satisfied": true,
      "quorum_calc": {
        "required": {"human_sovereign": 1, "system_epistemic": 1},
        "present": {"human_sovereign": 1, "system_epistemic": 1}
      }
    },
    "integrity_marks": {
      "evidence_fingerprint": "sha256_of_raw_content",
      "detection_watermark": "nonce_based_on_workflow_id",
      "epistemic_checksum": "hash_of_detection_logic_version"
    }
  }
}
```
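Because `quorum_satisfied` and `quorum_calc.present` are stored in the block, a verifier should recompute them from the attestations rather than trust the stored values. The sketch below does that for the inner `block` object, counting only unexpired attestations. It assumes `expires` is an ISO 8601 timestamp, which the schema does not specify; signature verification is out of scope here.

```python
from collections import Counter
from datetime import datetime, timezone


def _parse_expiry(value: str) -> datetime:
    """Assumption: `expires` is ISO 8601; a missing offset is read as UTC."""
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


def recompute_quorum(block: dict, now: datetime) -> dict:
    """Recount live attestations and compare with the block's own claim."""
    validations = block["validations"]
    live = [
        a for a in validations["attestations"]
        if _parse_expiry(a["expires"]) > now
    ]
    present = dict(Counter(a["archetype"] for a in live))
    required = validations["quorum_calc"]["required"]
    satisfied = all(present.get(k, 0) >= n for k, n in required.items())
    return {
        "present": present,
        "satisfied": satisfied,
        # False means the stored quorum_satisfied flag cannot be trusted
        "matches_claim": satisfied == validations["quorum_satisfied"],
    }
```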
Detection Threshold Abstraction Layer
```python
from typing import Dict


class EpistemicThresholdInterface:
    """Abstract threshold interface - n8n never sees numeric thresholds"""

    def __init__(self, ire_client):
        # ire_client is expected to expose post(path, payload) returning a dict
        self.ire = ire_client

    def should_escalate(self, detection_results: Dict) -> Dict:
        """IRE decides escalation, returns abstract category"""
        response = self.ire.post("/ire/detect/evaluate", {
            "results": detection_results,
            "return_format": "abstract_category"
        })
        return {
            "escalation_recommended": response.get("category") == "high_confidence",
            "confidence_level": response.get("confidence_label"),  # "high"/"medium"/"low"
            "next_action": response.get("recommended_action"),
            # NO NUMERIC THRESHOLDS EXPOSED
            # NO RAW SCORES EXPOSED
        }

    def get_validation_requirements(self, category: str) -> Dict:
        """Map escalation category to validator requirements"""
        mapping = {
            "high_confidence": {
                "human_sovereign": 2,
                "system_epistemic": 1,
                "community_plurality": 1
            },
            "medium_confidence": {
                "human_sovereign": 1,
                "system_epistemic": 1
            },
            "low_confidence": {
                "system_epistemic": 1
            }
        }
        # Unknown categories map to no requirements
        return mapping.get(category, {})
```
Time-Window Enforcement
```python
import uuid
from datetime import datetime, timedelta, timezone
from typing import Dict, List


def _parse_utc(timestamp: str) -> datetime:
    """Parse an ISO 8601 timestamp; a trailing 'Z' or missing offset is read as UTC"""
    parsed = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


class TemporalIntegrityEnforcer:
    """Enforce time-bound execution with cryptographic proofs"""

    def create_execution_window(self, duration_hours: int = 24) -> Dict:
        """Create cryptographically bound execution window"""
        window_id = f"window_{uuid.uuid4()}"
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
        not_before = datetime.now(timezone.utc)
        not_after = not_before + timedelta(hours=duration_hours)
        # Create time-locked proof
        window_proof = {
            "window_id": window_id,
            "not_before": not_before.isoformat().replace("+00:00", "Z"),
            "not_after": not_after.isoformat().replace("+00:00", "Z"),
            "issuer": "ire_temporal_validator",
            "signature": self._sign_temporal_window(window_id, not_before, not_after)
        }
        return window_proof

    def validate_within_window(self,
                               action_timestamp: str,
                               window_proof: Dict) -> bool:
        """Validate action occurred within execution window"""
        # Verify signature
        if not self._verify_signature(window_proof):
            return False
        # Parse timestamps
        action_time = _parse_utc(action_timestamp)
        not_before = _parse_utc(window_proof['not_before'])
        not_after = _parse_utc(window_proof['not_after'])
        # Check bounds
        return not_before <= action_time <= not_after

    def detect_time_anomalies(self, workflow_executions: List[Dict]) -> List[Dict]:
        """Detect temporal manipulation patterns"""
        anomalies = []
        for i, execution in enumerate(workflow_executions):
            # Check for reverse time flow
            if i > 0:
                prev_time = _parse_utc(workflow_executions[i-1]['timestamp'])
                curr_time = _parse_utc(execution['timestamp'])
                if curr_time < prev_time:
                    anomalies.append({
                        "type": "reverse_time_flow",
                        "execution_id": execution['id'],
                        "anomaly": f"Time went backwards: {curr_time} < {prev_time}"
                    })
            # Check execution duration anomalies
            if 'duration' in execution:
                expected_duration = self._get_expected_duration(execution['workflow_type'])
                if execution['duration'] > expected_duration * 2:
                    anomalies.append({
                        "type": "suspicious_duration",
                        "execution_id": execution['id'],
                        "anomaly": f"Duration {execution['duration']} exceeds expected {expected_duration}"
                    })
        return anomalies

    # The helpers below are not defined in this spec; supply real implementations.
    def _sign_temporal_window(self, window_id: str, not_before: datetime, not_after: datetime) -> str:
        raise NotImplementedError

    def _verify_signature(self, window_proof: Dict) -> bool:
        raise NotImplementedError

    def _get_expected_duration(self, workflow_type: str) -> float:
        raise NotImplementedError
```
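The enforcer relies on `_sign_temporal_window` and `_verify_signature`, which this spec leaves undefined. A minimal sketch of what they could do is shown below as standalone functions. It assumes a shared secret and HMAC-SHA256 from the standard library; an asymmetric scheme such as the `ed25519` algorithm named in the validator schema would be the more likely production choice.

```python
import hashlib
import hmac
import json


def sign_window(key: bytes, window_id: str,
                not_before: str, not_after: str) -> str:
    """Sign the window bounds so they cannot be widened after issuance."""
    payload = json.dumps(
        {"window_id": window_id, "not_before": not_before, "not_after": not_after},
        sort_keys=True,
        separators=(",", ":"),
    ).encode("utf-8")
    return hmac.new(key, payload, hashlib.sha256).hexdigest()


def verify_window(key: bytes, window_proof: dict) -> bool:
    """Recompute the signature from the proof's own fields and compare."""
    expected = sign_window(
        key,
        window_proof["window_id"],
        window_proof["not_before"],
        window_proof["not_after"],
    )
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(expected, window_proof.get("signature", ""))
```

Signing the ISO 8601 strings exactly as stored in the proof avoids mismatches caused by re-serializing `datetime` objects.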
Semantic Laundering Defense
```python
import math
from collections import Counter, defaultdict
from datetime import datetime, timezone
from typing import Dict, Optional


class EpistemicIntegrityGuard:
    """Defend against semantic laundering attacks"""

    def __init__(self):
        self.similarity_clusters = defaultdict(list)
        self.cluster_vectors = {}  # cluster_id -> vector of the cluster's first member
        self.source_frequency = defaultdict(int)

    def check_semantic_laundering(self,
                                  content_hash: str,
                                  raw_content: str,
                                  source_id: str,
                                  workflow_id: str) -> Dict:
        """Check for semantic laundering patterns"""
        # Check source frequency
        self.source_frequency[source_id] += 1
        if self.source_frequency[source_id] > 100:  # Threshold
            return {
                "risk": "high",
                "reason": "Excessive submissions from single source",
                "action": "throttle"
            }
        # Check similarity clusters
        content_vector = self._vectorize(raw_content)
        cluster_id = self._find_similar(content_vector)
        if cluster_id is None:
            # No match: this content seeds a new cluster
            cluster_id = f"cluster_{len(self.cluster_vectors)}"
            self.cluster_vectors[cluster_id] = content_vector
        self.similarity_clusters[cluster_id].append({
            "content_hash": content_hash,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "workflow_id": workflow_id
        })
        # Check cluster size (cumulative; a true growth rate would need a time window)
        if len(self.similarity_clusters[cluster_id]) > 10:
            return {
                "risk": "medium",
                "reason": "Rapid growth of similar content cluster",
                "action": "flag_for_review"
            }
        return {"risk": "low", "reason": "No laundering patterns detected"}

    def _vectorize(self, content: str) -> Dict[str, int]:
        """Create semantic vector (simplified - use real embeddings in production)"""
        # Sparse bag-of-words for demonstration; needs no fixed vocabulary
        return Counter(content.lower().split())

    def _find_similar(self, vector: Dict[str, int], threshold: float = 0.8) -> Optional[str]:
        """Return the first cluster whose seed vector has cosine similarity >= threshold"""
        # Linear scan; in production, use a proper vector index
        for cluster_id, seed in self.cluster_vectors.items():
            if self._cosine(vector, seed) >= threshold:
                return cluster_id
        return None

    @staticmethod
    def _cosine(a: Dict[str, int], b: Dict[str, int]) -> float:
        """Cosine similarity between two sparse word-count vectors"""
        dot = sum(count * b.get(word, 0) for word, count in a.items())
        norm = (math.sqrt(sum(c * c for c in a.values()))
                * math.sqrt(sum(c * c for c in b.values())))
        return dot / norm if norm else 0.0
```
IMPLEMENTATION ROADMAP
Phase 1: Core Sovereignty (Weeks 1-2)
1. Implement validator PKI and attestation framework
2. Deploy threshold abstraction layer
3. Add time-window enforcement
4. Basic semantic laundering detection
Phase 2: Epistemic Defense (Weeks 3-4)
1. Full STRIDE-E threat model implementation
2. Validator quorum calculus integration
3. Ledger schema hardening
4. Cross-validation with frozen models
Phase 3: Operational Resilience (Weeks 5-6)
1. Drift detection and alerting
2. Validator role rotation policies
3. Recovery procedures for compromise
4. Full audit trail integration
Phase 4: Community Governance (Weeks 7-8)
1. Validator reputation system
2. Plurality-based decision frameworks
3. Cross-interpreter reconciliation
4. Sovereign identity integration
VERIFICATION PROTOCOL ENHANCEMENT
Daily Sovereignty Check
```python
def daily_sovereignty_audit():
    """Daily audit to ensure no boundary violations"""
    # Each helper is assumed to return an object with a boolean `passed` attribute
    checks = [
        # 1. Check n8n for epistemic logic
        scan_n8n_workflows_for_detection_logic(),
        # 2. Check IRE for orchestration logic
        scan_ire_for_scheduling_logic(),
        # 3. Verify threshold abstraction
        verify_no_numeric_thresholds_in_n8n(),
        # 4. Validate time-window adherence
        verify_all_executions_within_windows(),
        # 5. Check validator quorum compliance
        verify_all_commits_have_proper_quorum(),
        # 6. Detect semantic laundering
        run_semantic_laundering_detection(),
        # 7. Verify workflow definition integrity
        hash_and_verify_workflow_definitions(),
        # 8. Check for drift
        compare_with_frozen_model_baseline()
    ]
    # Collect failures first so the remediation plan can reference them
    failed_checks = [c for c in checks if not c.passed]
    sovereignty_score = (len(checks) - len(failed_checks)) / len(checks)
    return {
        "sovereignty_score": sovereignty_score,
        "failed_checks": failed_checks,
        "recommendations": generate_remediation_plan(failed_checks)
    }
```
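The audit reads a `passed` attribute from each check result, but the result type is not defined in this spec. One possible shape is sketched below; `CheckResult`, `run_check`, and `sovereignty_score` are hypothetical names. Wrapping each check means a crashing check is recorded as a failure instead of aborting the whole audit.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class CheckResult:
    name: str
    passed: bool
    detail: str = ""


def run_check(name: str, fn: Callable[[], bool]) -> CheckResult:
    """Run one check; an exception counts as a failed check."""
    try:
        ok = bool(fn())
        return CheckResult(name, ok, "" if ok else "check returned False")
    except Exception as exc:
        return CheckResult(name, False, f"{type(exc).__name__}: {exc}")


def sovereignty_score(results: List[CheckResult]) -> float:
    """Fraction of checks that passed; 0.0 when no checks ran."""
    if not results:
        return 0.0
    return sum(r.passed for r in results) / len(results)
```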
Weekly Epistemic Integrity Report
```python
def weekly_epistemic_integrity_report():
    """Weekly comprehensive epistemic health report"""
    report = {
        "temporal_integrity": {
            "execution_windows_violated": count_window_violations(),
            "time_anomalies_detected": detect_time_anomalies(),
            "average_execution_latency": calculate_latency()
        },
        "validator_health": {
            "active_validators": count_active_validators(),
            "quorum_satisfaction_rate": calculate_quorum_rate(),
            "validator_role_diversity": measure_diversity()
        },
        "detection_quality": {
            "drift_from_baseline": measure_drift(),
            "false_positive_rate": calculate_fpr(),
            "escalation_accuracy": measure_escalation_accuracy()
        },
        "boundary_integrity": {
            "n8n_epistemic_contamination": detect_contamination(),
            "ire_orchestration_leakage": detect_leakage(),
            "workflow_definition_changes": track_workflow_changes()
        },
        "semantic_defenses": {
            "laundering_attempts": count_laundering_attempts(),
            "similarity_clusters": analyze_clusters(),
            "source_credibility": assess_source_credibility()
        }
    }
    # Calculate overall epistemic health score
    report["epistemic_health_score"] = calculate_health_score(report)
    return report
```