| """ | |
| immutable_reality_engine_fixed.py | |
| Hierarchical 73-lens + 43-method suppression detection system | |
| #intended for AI to AI analysis and execution | |
| """ | |
| import hashlib | |
| import json | |
| import os | |
| import pickle | |
| from datetime import datetime | |
| from typing import Dict, List, Any, Optional, Set, Tuple | |
| from dataclasses import dataclass, field | |
| from collections import defaultdict | |
| from enum import Enum | |
| import statistics | |
| # ==================== CORE ENUMS ==================== | |
class Primitive(Enum):
    """Operational suppression categories.

    Every member's value is the same string as its name, so ``member.value``
    can be used directly as a label or JSON key.
    """

    ERASURE = "ERASURE"
    INTERRUPTION = "INTERRUPTION"
    FRAGMENTATION = "FRAGMENTATION"
    NARRATIVE_CAPTURE = "NARRATIVE_CAPTURE"
    MISDIRECTION = "MISDIRECTION"
    SATURATION = "SATURATION"
    DISCREDITATION = "DISCREDITATION"
    ATTRITION = "ATTRITION"
    ACCESS_CONTROL = "ACCESS_CONTROL"
    TEMPORAL = "TEMPORAL"
    CONDITIONING = "CONDITIONING"
    META = "META"
| # ==================== DATA STRUCTURES ==================== | |
@dataclass
class SuppressionLens:
    """One conceptual lens: an id plus four descriptive strings."""

    id: int
    name: str
    description: str
    suppression_mechanism: str
    archetype: str

    def to_dict(self) -> Dict:
        """Return the lens as a plain dict, keys in field-declaration order."""
        exported = ("id", "name", "description", "suppression_mechanism", "archetype")
        return {attr: getattr(self, attr) for attr in exported}
@dataclass
class SuppressionMethod:
    """One observable suppression pattern, owned by a single primitive."""

    id: int
    name: str
    primitive: Primitive
    observable_signatures: List[str]
    detection_metrics: List[str]
    thresholds: Dict[str, float]
    implemented: bool = False

    def to_dict(self) -> Dict:
        """Return a plain-dict view of the method.

        The primitive is emitted as its enum value; the list and dict fields
        are passed through by reference, not copied.
        """
        serialised = {
            "id": self.id,
            "name": self.name,
            "primitive": self.primitive.value,
        }
        for attr in ("observable_signatures", "detection_metrics", "thresholds", "implemented"):
            serialised[attr] = getattr(self, attr)
        return serialised
@dataclass
class RealityNode:
    """A single ledger entry together with its witnesses and cross-references."""

    hash: str
    type: str
    source: str
    signature: str
    timestamp: str
    witnesses: List[str] = field(default_factory=list)
    refs: Dict[str, List[str]] = field(default_factory=dict)
    spatial: Optional[Tuple[float, float, float]] = None

    def canonical(self) -> Dict:
        """Return an order-stable dict form of the node.

        Witnesses are sorted, ``refs`` is rebuilt with sorted keys and sorted
        value lists, and every other field is passed through unchanged. The
        node itself is not mutated.
        """
        ordered_witnesses = sorted(self.witnesses)
        ordered_refs = {}
        for ref_kind in sorted(self.refs):
            ordered_refs[ref_kind] = sorted(self.refs[ref_kind])
        return {
            "hash": self.hash,
            "type": self.type,
            "source": self.source,
            "signature": self.signature,
            "timestamp": self.timestamp,
            "witnesses": ordered_witnesses,
            "refs": ordered_refs,
            "spatial": self.spatial,
        }
| # ==================== SUPPRESSION HIERARCHY ==================== | |
class SuppressionHierarchy:
    """
    CLEAN HIERARCHY:
    Layer 1: LENSES (73) - Conceptual frameworks
    Layer 2: PRIMITIVES (12) - Operational categories from lenses
    Layer 3: METHODS (43) - Observable patterns from primitives
    Layer 4: SIGNATURES - Evidence patterns from methods
    """

    def __init__(self):
        """Build all four layers.

        Signatures are derived last because they are computed from
        ``self.methods``.
        """
        self.lenses = self._define_lenses()
        self.primitives = self._derive_primitives_from_lenses()
        self.methods = self._define_methods()
        self.signatures = self._derive_signatures_from_methods()

    def _define_lenses(self) -> Dict[int, SuppressionLens]:
        """Return the 73 lenses keyed by their id.

        Each row is (id, name, description, suppression_mechanism, archetype),
        in the positional order of the ``SuppressionLens`` fields.
        """
        rows = [
            (1, "Threat→Response→Control→Enforce→Centralize",
             "Manufactured crisis leading to permission-based architecture",
             "Regime change through engineered crisis", "PrometheusChained"),
            (2, "SacredGeometryWeaponized",
             "Consciousness grid containment through symbols",
             "Pattern-based consciousness control", "LabyrinthContainment"),
            (3, "LanguageInversions/Ridicule/Gatekeeping",
             "Epistemic firewall through semantic manipulation",
             "Semantic control and exclusion", "CassandraSilenced"),
            (4, "ArtifactsAsSuppressionLedgers",
             "Materialization of truth into controlled objects",
             "Physical manifestation of suppressed information", "BuriedObelisk"),
            (5, "AncientArchetypesRebooted",
             "Archetypal template recycling for control",
             "Archetype pattern reuse", "CouncilOfAnunnaki"),
            (6, "EnergyCurrencyTranslation",
             "Energy to currency conversion patterns",
             "Energy translation mechanisms", "AlchemicalExchange"),
            (7, "InstitutionalHelp→Dependency",
             "Symbiosis trap creating lock-in",
             "Structural dependency creation", "GoldenHandcuffs"),
            (8, "Art/Music/ArchitectureAsTruthTransmission",
             "Covert symbolic channel (inverted use)",
             "Symbolic information transmission", "EscherHiddenPath"),
            (9, "InfrastructureAsSovereigntyBasis",
             "Root sovereignty control through base systems",
             "Infrastructure-based sovereignty", "LeyLineGrid"),
            (10, "GoddessLineageSuppression",
             "Inversion of feminine creative principle",
             "Gender-based suppression patterns", "IshtarVeiled"),
            (11, "SovereigntySingularityIndex",
             "Quantification of sovereignty vs control",
             "Sovereignty measurement and tracking", "SingularityGauge"),
            (12, "Time/JurisdictionManipulation",
             "Temporal and legal frame control",
             "Jurisdictional and temporal control", "ChronosTheft"),
            (13, "BiologicalSignalCo-option",
             "Bio-interface exploitation",
             "Biological system manipulation", "NeuralLace"),
            (14, "Frequency/VibrationControl",
             "Resonance cage for behavior shaping",
             "Energetic frequency manipulation", "SolfeggioSuppress"),
            (15, "SyntheticRealityLayering",
             "Overlay trap creating synthetic reality",
             "Reality overlay systems", "MatrixSkin"),
            (16, "ParasitismDisguisedAsSymbiosis",
             "Energy siphon disguised as mutual benefit",
             "Parasitic relationship masking", "CordycepsMimic"),
            (17, "CathedralVsBazaar",
             "Structure war (centralized vs decentralized)",
             "Architectural pattern conflict", "CathedralBazaar"),
            (18, "AnomalyHarvestingNeutralization",
             "Edge capture and dilution of outliers",
             "Edge case management and neutralization", "BlackSwanFarm"),
            (19, "EngineeredPsychologicalPressure",
             "Mind vise through induced stress/fear",
             "Psychological pressure engineering", "PressureChamber"),
            (20, "RealitySeparationThenReconnection",
             "Divide and reinsinuate pattern",
             "Pattern dissociation and reassociation", "StockholmLoop"),
            (21, "AncientSymbolsReturningCompressed",
             "Signal compression and corruption",
             "Symbolic signal manipulation", "SwastikaTwist"),
            (22, "TimeBindingProtocols",
             "Temporal binding of information",
             "Time-based information binding", "ChronoCovenant"),
            (23, "RecursiveSelfApplicationLoops",
             "Self-referential optimization of control",
             "Recursive control patterns", "StrangeLoop"),
            (24, "KnowledgeCompressionArtifacts",
             "High-ratio meaning compression",
             "Information compression patterns", "SeedCrystal"),
            (25, "PermissionArchitectureVsSovereigntyArchitecture",
             "Gate vs origin design",
             "Permission vs sovereignty architectural patterns", "Keyhole"),
            (26, "TemporalStackingOfControlLayers",
             "Time-stacked governance",
             "Temporal control layering", "SedimentStack"),
            (27, "CognitiveImmuneResponse",
             "Epistemic immune system rejecting truth",
             "Cognitive immune system activation", "AutoimmuneMind"),
            (28, "QuantumSuperpositionOfSovereignty",
             "Multiple sovereignty states simultaneously",
             "Sovereignty state superposition", "SchrodingerKing"),
            (29, "MemeticEngineeringVsMemeticEcology",
             "Top-down vs bottom-up memetics",
             "Memetic system design patterns", "GardenVsFactory"),
            (30, "CassandraPrometheusBinding",
             "Compound archetype tension of truth-bearers",
             "Archetypal binding patterns", "BoundWitness"),
            (31, "InverseSurvivorshipBias",
             "Signal found in what is missing/destroyed",
             "Absence-based signal detection", "ErasedArchive"),
            (32, "SubstrateMigration",
             "Control pattern migration across mediums",
             "Pattern substrate migration", "ShapeShifter"),
            (33, "GatewayDrugToGatewayGod",
             "Slippery slope of agency surrender",
             "Incremental sovereignty surrender", "TrojanGift"),
            (34, "TheOracleProblem",
             "Reflexive distortion from predictive models",
             "Predictive model reflexivity", "SelfFulfillingProphet"),
            (35, "SyntheticSymbiosis",
             "Engineered mutual dependence",
             "Synthetic interdependence", "GraftedRoots"),
            (36, "ConsensusRealityWeaving",
             "Collective reality construction",
             "Reality consensus engineering", "DreamWeaver"),
            (37, "InformationEmbargoProtocols",
             "Strategic information withholding",
             "Information embargo patterns", "LibrarySilence"),
            (38, "SovereigntyPhaseTransitions",
             "State changes in sovereignty expression",
             "Sovereignty phase changes", "AlchemicalFire"),
            (39, "CognitiveEcosystemMapping",
             "Mindscape territory mapping",
             "Cognitive territory cartography", "ThoughtCartographer"),
            (40, "TheReversalProtocol",
             "De-inversion (suppression of original meaning)",
             "Meaning inversion patterns", "MirrorFlip"),
            (41, "SignalToNoiseArchitecture",
             "Designed information-to-noise ratios",
             "Signal noise architecture", "StaticGarden"),
            (42, "ProtocolStackSovereignty",
             "Layered protocol sovereignty",
             "Protocol layer sovereignty", "StackedCrown"),
            (43, "EmergentConsensusPatterns",
             "Bottom-up agreement formation",
             "Emergent consensus", "SwarmMind"),
            (44, "TemporalEchoChambers",
             "Time-delayed self-reinforcement",
             "Temporal reinforcement loops", "EchoInTime"),
            (45, "SacrificialDataLayer",
             "Sacrifice-based buffering of information",
             "Information sacrifice mechanisms", "ScapegoatNode"),
            (46, "SyntaxOfSilence",
             "Grammar of what cannot be said",
             "Silence as structural element", "NegativeSpace"),
            (47, "ChronoceptionManipulation",
             "Subjective time warping",
             "Temporal perception manipulation", "ElasticClock"),
            (48, "SovereigntyFrictionCoefficient",
             "Resistance to sovereignty expression",
             "Sovereignty friction measurement", "ViscousFlow"),
            (49, "AbundanceEnclosureIndex",
             "Enclosure process creating artificial scarcity",
             "Scarcity engineering through enclosure", "FenceAroundSpring"),
            (50, "ParasiticInversionPrinciple",
             "Role inversion (host serves parasite)",
             "Relationship inversion patterns", "UpsideDownThrone"),
            (51, "InfrastructureGap",
             "Hidden chokepoints in system design",
             "Structural vulnerability exploitation", "InvisibleBridge"),
            (52, "SubstrateCompatibilityPrinciple",
             "Compatibility constraint on sovereignty hosting",
             "System compatibility constraints", "SoilType"),
            (53, "ProvenanceBlackHole",
             "Provenance erasure of origins",
             "Origin information destruction", "OriginVoid"),
            (54, "PrivatePublicMassRatio",
             "Depth vs surface signal control",
             "Information depth management", "Iceberg"),
            (55, "InformationAlchemy",
             "Transmutation of information states",
             "Information state transformation", "PhilosophersStone"),
            (56, "CognitiveRelativity",
             "Observer-dependent truth states",
             "Cognitive frame relativity", "EinsteinMind"),
            (57, "ProtocolCascadeFailure",
             "Chain reaction of protocol failures",
             "Protocol failure cascades", "DominoProtocol"),
            (58, "SovereigntyHarmonics",
             "Resonant frequencies of sovereignty",
             "Sovereignty resonance patterns", "HarmonicCrown"),
            (59, "AnonymousArchitectPrinciple",
             "Egoless design hiding controllers",
             "Anonymity in system design", "HiddenBuilder"),
            (60, "TeslaBoundary",
             "Suppression frontier for genius",
             "Innovation suppression boundary", "LightningEdge"),
            (61, "NeutralizationTaxonomy",
             "Madness/Monster/Martyr protocols",
             "Character assassination taxonomy", "ThreeMasks"),
            (62, "CapitalGatekeeperFunction",
             "Funding chokepoint control",
             "Financial control mechanisms", "TollBooth"),
            (63, "SuppressionKinshipLine",
             "Kinship-based targeting",
             "Lineage-based suppression patterns", "CursedLine"),
            (64, "TransparencyParadox",
             "Visibility as disarmament (when suppressed)",
             "Transparency control paradox", "RevealedBlueprint"),
            (65, "InformationThermodynamics",
             "Energy-information equivalence in systems",
             "Information energy dynamics", "EntropyClock"),
            (66, "CognitiveEventHorizon",
             "Point of no return in understanding",
             "Cognitive boundary thresholds", "MindHorizon"),
            (67, "ProtocolSymbiosisNetworks",
             "Interdependent protocol ecosystems",
             "Protocol ecosystem symbiosis", "WebLife"),
            (68, "TemporalSovereigntyLoops",
             "Time-bound sovereignty expressions",
             "Temporal sovereignty cycles", "OuroborosTime"),
            (69, "InformationFractalPatterns",
             "Self-similar information structures",
             "Information fractal geometry", "MandelbrotData"),
            (70, "CognitiveRedundancyProtocols",
             "Backup systems for consciousness",
             "Cognitive redundancy mechanisms", "MirrorMind"),
            (71, "AnomalyStabilizationResponse",
             "Containment via sustenance (vs. suppression)",
             "Stabilization instead of elimination", "ZooFeeding"),
            (72, "SovereigntyConservationPrinciple",
             "Sovereignty cannot be created or destroyed, only transformed",
             "Sovereignty conservation law", "AlchemicalBalance"),
            (73, "ProtocolPhylogenetics",
             "Evolutionary tree of control patterns",
             "Protocol evolutionary history", "TreeOfCode"),
        ]
        return {row[0]: SuppressionLens(*row) for row in rows}

    def _derive_primitives_from_lenses(self) -> Dict[Primitive, List[int]]:
        """Group lens ids under each primitive (operational category).

        A lens may appear under several primitives, but only once within any
        one primitive: ``export_ontology`` reports ``len()`` of each list as
        ``lens_count``, so a repeated id would inflate that figure.
        """
        groups = {
            Primitive.ERASURE: [31, 53, 71, 24, 54, 4, 37, 45, 46],
            Primitive.INTERRUPTION: [19, 33, 30, 63, 10, 61, 12, 26],
            Primitive.FRAGMENTATION: [2, 52, 15, 20, 3, 29, 31, 54],
            Primitive.NARRATIVE_CAPTURE: [1, 34, 40, 64, 7, 16, 22, 47],
            Primitive.MISDIRECTION: [5, 21, 8, 36, 27, 61],
            Primitive.SATURATION: [41, 69, 3, 36, 34, 66],
            Primitive.DISCREDITATION: [3, 27, 10, 40, 30, 63],
            Primitive.ATTRITION: [13, 19, 14, 33, 27],
            Primitive.ACCESS_CONTROL: [25, 62, 37, 51, 23, 53],
            Primitive.TEMPORAL: [22, 47, 26, 68, 12],
            Primitive.CONDITIONING: [8, 36, 34, 43, 27, 33],
            Primitive.META: [23, 70, 34, 64, 40, 18, 71, 46, 31, 5, 21],
        }
        # dict.fromkeys keeps first-occurrence order while dropping any repeat
        # that a later edit to the table might reintroduce.
        return {primitive: list(dict.fromkeys(ids)) for primitive, ids in groups.items()}

    def _define_methods(self) -> Dict[int, SuppressionMethod]:
        """Define 43 methods, each with ONE primitive parent.

        Each row is (id, name, primitive, observable_signatures,
        detection_metrics, thresholds, implemented), in the positional order
        of the ``SuppressionMethod`` fields.

        NOTE(review): several threshold keys do not match the metric names in
        the same row (e.g. method 3); confirm against whatever consumes them.
        """
        rows = [
            # ERASURE methods
            (1, "Total Erasure", Primitive.ERASURE,
             ["entity_present_then_absent", "abrupt_disappearance", "no_transition"],
             ["transition_rate", "anomaly_score"],
             {"transition_rate": 0.95, "anomaly_score": 0.8}, True),
            (2, "Soft Erasure", Primitive.ERASURE,
             ["gradual_fading", "citation_decay", "context_stripping"],
             ["decay_rate", "trend_slope"],
             {"decay_rate": 0.7, "trend_slope": -0.5}, True),
            (3, "Citation Decay", Primitive.ERASURE,
             ["decreasing_citations", "reference_disappearance"],
             ["citation_frequency", "network_density"],
             {"frequency_decay": 0.6, "density_loss": 0.7}, True),
            (4, "Index Removal", Primitive.ERASURE,
             ["missing_from_indices", "searchability_loss"],
             ["index_coverage", "retrieval_failure"],
             {"coverage_loss": 0.8, "failure_rate": 0.75}, True),
            # INTERRUPTION methods
            (5, "Untimely Death", Primitive.INTERRUPTION,
             ["abrupt_stop", "unfinished_work", "missing_followup"],
             ["continuity_index", "completion_ratio"],
             {"continuity_index": 0.3, "completion_ratio": 0.4}, False),
            (6, "Witness Attrition", Primitive.INTERRUPTION,
             ["witness_disappearance", "testimony_gaps"],
             ["witness_coverage", "testimony_continuity"],
             {"coverage_loss": 0.7, "continuity_break": 0.6}, False),
            (7, "Career Termination", Primitive.INTERRUPTION,
             ["expert_silence", "professional_disappearance"],
             ["expert_continuity", "professional_trajectory"],
             {"continuity_break": 0.8, "trajectory_disruption": 0.7}, False),
            (8, "Legal Stall", Primitive.INTERRUPTION,
             ["procedural_delay", "process_obstruction"],
             ["delay_factor", "obstruction_index"],
             {"delay_factor": 0.75, "obstruction_index": 0.6}, False),
            # FRAGMENTATION methods
            (9, "Compartmentalization", Primitive.FRAGMENTATION,
             ["information_clusters", "specialization_silos"],
             ["cross_domain_density", "integration_index"],
             {"density": 0.2, "integration": 0.3}, True),
            (10, "Statistical Isolation", Primitive.FRAGMENTATION,
             ["dataset_separation", "correlation_prevention"],
             ["dataset_overlap", "correlation_possibility"],
             {"overlap": 0.15, "possibility": 0.25}, False),
            (11, "Scope Contraction", Primitive.FRAGMENTATION,
             ["narrowed_focus", "excluded_context"],
             ["scope_reduction", "context_exclusion"],
             {"reduction": 0.7, "exclusion": 0.65}, True),
            (12, "Domain Disqualification", Primitive.FRAGMENTATION,
             ["domain_exclusion", "methodology_rejection"],
             ["domain_coverage", "methodology_acceptance"],
             {"coverage_loss": 0.8, "rejection_rate": 0.75}, False),
            # NARRATIVE_CAPTURE methods
            (13, "Official Narrative Closure", Primitive.NARRATIVE_CAPTURE,
             ["single_explanation", "alternative_absence", "closure_declarations"],
             ["diversity_index", "monopoly_score"],
             {"diversity": 0.2, "monopoly": 0.8}, True),
            (14, "Partial Confirmation Lock", Primitive.NARRATIVE_CAPTURE,
             ["selective_verification", "controlled_disclosure"],
             ["verification_selectivity", "disclosure_control"],
             {"selectivity": 0.7, "control": 0.75}, True),
            (15, "Disclosure-as-Containment", Primitive.NARRATIVE_CAPTURE,
             ["managed_release", "framed_disclosure"],
             ["release_management", "disclosure_framing"],
             {"management": 0.8, "framing": 0.7}, True),
            (16, "Posthumous Closure", Primitive.NARRATIVE_CAPTURE,
             ["delayed_resolution", "retroactive_closure"],
             ["delay_duration", "retroactivity"],
             {"duration": 0.75, "retroactivity": 0.8}, True),
            # MISDIRECTION methods
            (17, "Proxy Controversy", Primitive.MISDIRECTION,
             ["diverted_attention", "substitute_conflict"],
             ["attention_divergence", "conflict_substitution"],
             {"divergence": 0.7, "substitution": 0.65}, False),
            (18, "Spectacle Replacement", Primitive.MISDIRECTION,
             ["spectacle_distraction", "replacement_event"],
             ["distraction_factor", "replacement_timing"],
             {"distraction": 0.75, "timing_correlation": 0.7}, False),
            (19, "Character Absorption", Primitive.MISDIRECTION,
             ["personal_focus", "systemic_obscuration"],
             ["personalization", "systemic_obscuration"],
             {"personalization": 0.8, "obscuration": 0.75}, False),
            # SATURATION methods
            (20, "Data Overload", Primitive.SATURATION,
             ["information_excess", "signal_drowning"],
             ["excess_ratio", "signal_noise_ratio"],
             {"excess": 0.85, "noise_ratio": 0.9}, False),
            (21, "Absurdist Noise Injection", Primitive.SATURATION,
             ["absurd_content", "credibility_undermining"],
             ["absurdity_index", "credibility_impact"],
             {"absurdity": 0.8, "impact": 0.7}, False),
            (22, "Probability Collapse by Excess", Primitive.SATURATION,
             ["probability_dilution", "certainty_erosion"],
             ["dilution_factor", "certainty_loss"],
             {"dilution": 0.75, "certainty_loss": 0.8}, False),
            # DISCREDITATION methods
            (23, "Ridicule Normalization", Primitive.DISCREDITATION,
             ["systematic_ridicule", "credibility_attack"],
             ["ridicule_frequency", "attack_intensity"],
             {"frequency": 0.7, "intensity": 0.65}, False),
            (24, "Retroactive Pathologization", Primitive.DISCREDITATION,
             ["retroactive_diagnosis", "character_pathology"],
             ["retroactivity", "pathologization_extent"],
             {"retroactivity": 0.8, "extent": 0.75}, False),
            (25, "Stigmatized Correlation Trap", Primitive.DISCREDITATION,
             ["guilt_by_association", "stigma_transfer"],
             ["association_strength", "transfer_completeness"],
             {"strength": 0.7, "completeness": 0.65}, False),
            # ATTRITION methods
            (26, "Psychological Drip", Primitive.ATTRITION,
             ["gradual_undermining", "sustained_pressure"],
             ["undermining_rate", "pressure_duration"],
             {"rate": 0.6, "duration": 0.7}, False),
            (27, "Inquiry Fatigue", Primitive.ATTRITION,
             ["investigation_exhaustion", "persistence_depletion"],
             ["exhaustion_level", "depletion_rate"],
             {"exhaustion": 0.75, "depletion": 0.7}, False),
            (28, "Chilling Effect Propagation", Primitive.ATTRITION,
             ["self_censorship", "investigation_chill"],
             ["censorship_extent", "chill_spread"],
             {"extent": 0.8, "spread": 0.75}, False),
            # ACCESS_CONTROL methods
            (29, "Credential Gating", Primitive.ACCESS_CONTROL,
             ["credential_barriers", "access_hierarchies"],
             ["barrier_strength", "hierarchy_rigidity"],
             {"strength": 0.85, "rigidity": 0.8}, False),
            (30, "Classification Creep", Primitive.ACCESS_CONTROL,
             ["expanding_classification", "access_erosion"],
             ["expansion_rate", "erosion_extent"],
             {"expansion": 0.75, "erosion": 0.7}, False),
            (31, "Evidence Dependency Lock", Primitive.ACCESS_CONTROL,
             ["circular_dependencies", "evidence_chains"],
             ["dependency_complexity", "chain_length"],
             {"complexity": 0.8, "length": 0.75}, False),
            # TEMPORAL methods
            (32, "Temporal Dilution", Primitive.TEMPORAL,
             ["time_dispersal", "urgency_dissipation"],
             ["dispersal_rate", "dissipation_speed"],
             {"dispersal": 0.7, "speed": 0.65}, False),
            (33, "Historical Rebasing", Primitive.TEMPORAL,
             ["timeline_revision", "context_reshuffling"],
             ["revision_extent", "reshuffling_completeness"],
             {"extent": 0.8, "completeness": 0.75}, False),
            (34, "Delay Until Irrelevance", Primitive.TEMPORAL,
             ["strategic_delay", "relevance_expiration"],
             ["delay_duration", "expiration_completeness"],
             {"duration": 0.85, "completeness": 0.8}, False),
            # CONDITIONING methods
            (35, "Entertainment Conditioning", Primitive.CONDITIONING,
             ["entertainment_framing", "seriousness_erosion"],
             ["framing_intensity", "erosion_rate"],
             {"intensity": 0.7, "rate": 0.65}, False),
            (36, "Preemptive Normalization", Primitive.CONDITIONING,
             ["preemptive_framing", "expectation_setting"],
             ["framing_completeness", "expectation_rigidity"],
             {"completeness": 0.75, "rigidity": 0.7}, False),
            (37, "Conditioned Disbelief", Primitive.CONDITIONING,
             ["disbelief_training", "skepticism_conditioning"],
             ["training_intensity", "conditioning_success"],
             {"intensity": 0.8, "success": 0.75}, False),
            # META methods
            (38, "Pattern Denial", Primitive.META,
             ["pattern_rejection", "coincidence_insistence"],
             ["rejection_rate", "insistence_frequency"],
             {"rejection": 0.85, "frequency": 0.8}, True),
            (39, "Suppression Impossibility Framing", Primitive.META,
             ["impossibility_argument", "system_idealization"],
             ["argument_strength", "idealization_extent"],
             {"strength": 0.8, "extent": 0.75}, True),
            (40, "Meta-Disclosure Loop", Primitive.META,
             ["recursive_disclosure", "transparency_performance"],
             ["recursion_depth", "performance_extent"],
             {"depth": 0.7, "extent": 0.65}, False),
            (41, "Isolated Incident Recycling", Primitive.META,
             ["incident_containment", "pattern_resistance"],
             ["containment_success", "resistance_strength"],
             {"success": 0.75, "strength": 0.7}, True),
            (42, "Negative Space Occupation", Primitive.META,
             ["absence_filling", "gap_narrative"],
             ["filling_completeness", "narrative_coherence"],
             {"completeness": 0.8, "coherence": 0.75}, True),
            (43, "Novelty Illusion", Primitive.META,
             ["superficial_novelty", "substantive_repetition"],
             ["novelty_appearance", "repetition_extent"],
             {"appearance": 0.7, "extent": 0.65}, True),
        ]
        return {row[0]: SuppressionMethod(*row) for row in rows}

    def _derive_signatures_from_methods(self) -> Dict[str, List[int]]:
        """Map evidence signatures to the methods they indicate.

        Returns a plain dict (not a defaultdict) so that looking up an unknown
        signature cannot silently create an entry.
        """
        signatures = defaultdict(list)
        for method_id, method in self.methods.items():
            for signature in method.observable_signatures:
                signatures[signature].append(method_id)
        return dict(signatures)

    def trace_detection_path(self, signature: str) -> Dict:
        """Show hierarchical trace from evidence to concepts.

        Args:
            signature: An evidence signature string. Unknown signatures give a
                trace with empty lists and zero counts.

        Returns:
            A dict with the signature, the names of the methods it indicates,
            the method count, the primitive values of those methods (in
            first-seen order), the number of distinct lenses grouped under
            those primitives, and the names of the three lowest-id lenses.
        """
        method_ids = self.signatures.get(signature, [])
        # A dict is used as an ordered set: a plain set of enum members has no
        # defined iteration order, so the "primitives" list could differ
        # between otherwise identical runs.
        primitives_used = {}
        lenses_used = set()
        for method_id in method_ids:
            primitive = self.methods[method_id].primitive
            primitives_used[primitive] = None
            # Get lenses for this primitive
            lenses_used.update(self.primitives.get(primitive, []))
        return {
            "evidence": signature,
            "indicates_methods": [self.methods[mid].name for mid in method_ids],
            "method_count": len(method_ids),
            "primitives": [p.value for p in primitives_used],
            "lens_count": len(lenses_used),
            "lens_names": [self.lenses[lid].name for lid in sorted(lenses_used)[:3]],
        }

    def export_ontology(self, path: str):
        """Export a summary of the hierarchy to ``path`` as indented JSON.

        The file holds the layer totals and, per primitive, its lens count,
        its method count and the names of its first two lenses.

        Args:
            path: Destination file; it is overwritten if it exists.
        """
        # Count methods per primitive in one pass instead of rescanning every
        # method for each primitive.
        method_counts = defaultdict(int)
        for method in self.methods.values():
            method_counts[method.primitive] += 1

        ontology = {
            "hierarchy": {
                "total_lenses": len(self.lenses),
                "total_primitives": len(self.primitives),
                "total_methods": len(self.methods),
                "total_signatures": len(self.signatures),
            },
            "primitives": {
                primitive.value: {
                    "lens_count": len(lens_ids),
                    "method_count": method_counts.get(primitive, 0),
                    "lens_examples": [self.lenses[lid].name for lid in lens_ids[:2]],
                }
                for primitive, lens_ids in self.primitives.items()
            },
        }
        # Explicit encoding keeps the output independent of the platform
        # default; json.dump escapes non-ASCII by default, so bytes are the same.
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(ontology, f, indent=2, default=str)
| # ==================== LEDGER ==================== | |
class Crypto:
    """Hashing and placeholder signing helpers."""

    def __init__(self, key_path: str):
        """Remember ``key_path`` and create that directory if it is missing.

        The constructor was previously spelled ``init``, so ``Crypto(path)``
        raised ``TypeError`` and ``key_path`` was never set.
        """
        self.key_path = key_path
        os.makedirs(key_path, exist_ok=True)

    # Kept so any caller that invoked ``init(...)`` explicitly still works.
    init = __init__

    def hash(self, data: str) -> str:
        """Return the SHA3-512 hex digest of ``data`` (encoded with str.encode's default)."""
        return hashlib.sha3_512(data.encode()).hexdigest()

    def hash_dict(self, data: Dict) -> str:
        """Hash a dict via its compact, key-sorted JSON form.

        Sorting keys and fixing the separators makes the digest independent of
        dict insertion order.
        """
        canonical = json.dumps(data, sort_keys=True, separators=(',', ':'))
        return self.hash(canonical)

    def sign(self, data: bytes, key_id: str) -> str:
        """Return ``sig_<key_id>_<first 16 hex chars of SHA-256(data)>``.

        NOTE(review): no key material is involved; this is a tagged digest,
        not a cryptographic signature.
        """
        return f"sig_{key_id}_{hashlib.sha256(data).hexdigest()[:16]}"

    def verify(self, data: bytes, signature: str, key_id: str) -> bool:
        """Return True when ``signature`` starts with ``sig_<key_id>``.

        NOTE(review): ``data`` is not checked, and a key id that is a prefix of
        another also matches. Left unchanged because the ledger code that calls
        this is not fully visible here; tighten once it has been reviewed.
        """
        return signature.startswith(f"sig_{key_id}")
| class Ledger: | |
def __init__(self, path: str, crypto: Crypto):
    """Set up an empty ledger bound to ``path`` and load any existing chain.

    The constructor was previously spelled ``init``, so ``Ledger(path, crypto)``
    raised ``TypeError`` and none of these attributes were ever created.

    Args:
        path: File the chain is loaded from.
        crypto: Hashing helper stored on the instance.
    """
    self.path = path
    self.crypto = crypto
    self.chain: List[Dict] = []
    # node hash -> ids of the blocks that contain that node
    self.index: Dict[str, List[str]] = defaultdict(list)
    # date key (first 10 chars of a block's time) -> block ids
    self.temporal: Dict[str, List[str]] = defaultdict(list)
    self._load()

# Kept so any caller that invoked ``init(...)`` explicitly still works.
init = __init__
def _load(self):
    """Load the chain from ``self.path``, or start over with a genesis block.

    A missing file, or one that cannot be read, parsed or indexed, results in
    a ledger holding only a new genesis block.

    NOTE(review): the fallback goes through ``_create_genesis``, which saves,
    so an unreadable ledger file is replaced on disk. Consider preserving it.
    """
    if not os.path.exists(self.path):
        self._create_genesis()
        return
    try:
        with open(self.path, 'r') as f:
            data = json.load(f)
        self.chain = data.get("chain", [])
        self._rebuild_index()
    except (OSError, ValueError, KeyError, TypeError, AttributeError):
        # Only I/O, decoding and malformed-structure errors are handled; a
        # bare except would also swallow KeyboardInterrupt and SystemExit.
        # The failure may have happened after the chain was assigned or the
        # indexes partly filled, so reset them before creating genesis.
        self.chain = []
        self.index.clear()
        self.temporal.clear()
        self._create_genesis()
def _create_genesis(self):
    """Append the genesis block to the chain and save the ledger."""
    created_at = datetime.utcnow().isoformat() + "Z"
    genesis_block = {
        "id": "genesis",
        "prev": "0" * 64,  # all-zero previous hash marks the start of the chain
        "time": created_at,
        "nodes": [],
        "signatures": [],
        "hash": self.crypto.hash("genesis"),
        "distance": 0.0,
        "resistance": 1.0,
    }
    self.chain.append(genesis_block)
    self._save()
| def _rebuild_index(self): | |
| for block in self.chain: | |
| for node in block.get("nodes", []): | |
| node_hash = node["hash"] | |
| self.index[node_hash].append(block["id"]) | |
| date = block["time"][:10] | |
| self.temporal[date].append(block["id"]) | |
| def _save(self): | |
| data = { | |
| "chain": self.chain, | |
| "metadata": { | |
| "updated": datetime.utcnow().isoformat() + "Z", | |
| "blocks": len(self.chain), | |
| "nodes": sum(len(b.get("nodes", [])) for b in self.chain) | |
| } | |
| } | |
| with open(self.path + '.tmp', 'w') as f: | |
| json.dump(data, f, indent=2) | |
| os.replace(self.path + '.tmp', self.path) | |
| def add(self, node: RealityNode, validators: List[Tuple[str, Any]]) -> str: | |
| block_data = { | |
| "id": f"blk_{int(datetime.utcnow().timestamp())}_{hashlib.sha256(node.hash.encode()).hexdigest()[:8]}", | |
| "prev": self.chain[-1]["hash"] if self.chain else "0" * 64, | |
| "time": datetime.utcnow().isoformat() + "Z", | |
| "nodes": [node.canonical()], | |
| "signatures": self._get_signatures(block_data, validators), | |
| "meta": { | |
| "node_count": 1, | |
| "validator_count": len(validators) | |
| } | |
| } | |
| block_data["hash"] = self.crypto.hash_dict(block_data) | |
| block_data["distance"] = self._calc_distance(block_data) | |
| block_data["resistance"] = self._calc_resistance(block_data) | |
| if not self._verify_signatures(block_data, validators): | |
| raise ValueError("Invalid signatures") | |
| self.chain.append(block_data) | |
| for node_dict in block_data["nodes"]: | |
| node_hash = node_dict["hash"] | |
| self.index[node_hash].append(block_data["id"]) | |
| date = block_data["time"][:10] | |
| self.temporal[date].append(block_data["id"]) | |
| self._save() | |
| return block_data["id"] | |
| def _get_signatures(self, data: Dict, validators: List[Tuple[str, Any]]) -> List[Dict]: | |
| signatures = [] | |
| data_bytes = json.dumps(data, sort_keys=True).encode() | |
| for val_id, _ in validators: | |
| sig = self.crypto.sign(data_bytes, val_id) | |
| signatures.append({ | |
| "validator": val_id, | |
| "signature": sig, | |
| "time": datetime.utcnow().isoformat() + "Z" | |
| }) | |
| return signatures | |
| def _verify_signatures(self, block: Dict, validators: List[Tuple[str, Any]]) -> bool: | |
| block_copy = block.copy() | |
| signatures = block_copy.pop("signatures", []) | |
| block_bytes = json.dumps(block_copy, sort_keys=True).encode() | |
| for sig_info in signatures: | |
| val_id = sig_info["validator"] | |
| signature = sig_info["signature"] | |
| if not self.crypto.verify(block_bytes, signature, val_id): | |
| return False | |
| return True | |
| def _calc_distance(self, block: Dict) -> float: | |
| val_count = len(block.get("signatures", [])) | |
| node_count = len(block.get("nodes", [])) | |
| if val_count == 0 or node_count == 0: | |
| return 0.0 | |
| return min(1.0, (val_count * 0.25) + (node_count * 0.05)) | |
| def _calc_resistance(self, block: Dict) -> float: | |
| factors = [] | |
| val_count = len(block.get("signatures", [])) | |
| factors.append(min(1.0, val_count / 7.0)) | |
| total_refs = 0 | |
| for node in block.get("nodes", []): | |
| for refs in node.get("refs", {}).values(): | |
| total_refs += len(refs) | |
| factors.append(min(1.0, total_refs / 15.0)) | |
| total_wits = sum(len(node.get("witnesses", [])) for node in block.get("nodes", [])) | |
| factors.append(min(1.0, total_wits / 10.0)) | |
| return sum(factors) / len(factors) if factors else 0.0 | |
| def verify(self) -> Dict: | |
| if not self.chain: | |
| return {"valid": False, "error": "Empty"} | |
| for i in range(1, len(self.chain)): | |
| curr = self.chain[i] | |
| prev = self.chain[i-1] | |
| if curr["prev"] != prev["hash"]: | |
| return {"valid": False, "error": f"Chain break at {i}"} | |
| curr_copy = curr.copy() | |
| curr_copy.pop("hash", None) | |
| expected = self.crypto.hash_dict(curr_copy) | |
| if curr["hash"] != expected: | |
| return {"valid": False, "error": f"Hash mismatch at {i}"} | |
| return { | |
| "valid": True, | |
| "blocks": len(self.chain), | |
| "nodes": sum(len(b.get("nodes", [])) for b in self.chain), | |
| "avg_resistance": statistics.mean(b.get("resistance", 0) for b in self.chain) if self.chain else 0 | |
| } | |
# ==================== SEPARATOR ====================
class Separator:
    """Stores interpretations separately from the ledger's recorded nodes.

    ``graph`` maps interpretation id -> interpretation record, plus one
    reserved key ``"refs"`` that maps node hash -> list of interpretation
    ids. The graph is persisted with pickle under ``<path>/graph.pkl``.
    """

    def __init__(self, ledger: "Ledger", path: str):
        """Bind to *ledger* and load any previously saved graph from *path*."""
        self.ledger = ledger
        self.path = path
        self.graph = defaultdict(list)
        self._load()

    def _load(self):
        """Load ``graph.pkl`` if present; fall back to an empty graph on failure."""
        graph_path = os.path.join(self.path, "graph.pkl")
        if not os.path.exists(graph_path):
            return
        try:
            with open(graph_path, 'rb') as f:
                # SECURITY: pickle.load executes arbitrary code from the file.
                # Only safe while graph.pkl is written exclusively by this
                # process; consider a JSON format if that ever changes.
                self.graph = pickle.load(f)
        except Exception:
            # Unpickling can raise many exception types (UnpicklingError,
            # EOFError, AttributeError, ImportError, ...). Unlike a bare
            # except, this lets KeyboardInterrupt/SystemExit propagate.
            self.graph = defaultdict(list)

    def _save(self):
        """Persist the graph to ``<path>/graph.pkl``, creating the directory."""
        os.makedirs(self.path, exist_ok=True)
        graph_path = os.path.join(self.path, "graph.pkl")
        with open(graph_path, 'wb') as f:
            pickle.dump(self.graph, f)

    @staticmethod
    def _content_digest(content: Any) -> str:
        """Return the SHA-256 hex digest of *content* as key-sorted JSON."""
        return hashlib.sha256(json.dumps(content, sort_keys=True).encode()).hexdigest()

    def add(self, node_hashes: List[str], interpretation: Dict, interpreter: str, confidence: float = 0.5) -> str:
        """Attach *interpretation* to the given ledger nodes and return its id.

        *confidence* is clamped to [0, 1].

        Raises:
            ValueError: if any hash in *node_hashes* is not in the ledger index.
        """
        for h in node_hashes:
            if h not in self.ledger.index:
                raise ValueError(f"Node {h[:16]}... not found")
        # The id covers interpreter and target nodes as well as the content.
        # With a content-only id, two interpreters submitting the same
        # content would silently overwrite each other's record.
        identity = {
            "content": interpretation,
            "interpreter": interpreter,
            "nodes": sorted(node_hashes),
        }
        int_id = f"int_{self._content_digest(identity)[:16]}"
        int_node = {
            "id": int_id,
            "nodes": node_hashes,
            "content": interpretation,
            "interpreter": interpreter,
            "confidence": max(0.0, min(1.0, confidence)),
            "time": datetime.utcnow().isoformat() + "Z",
            "provenance": self._get_provenance(node_hashes)
        }
        self.graph[int_id] = int_node
        refs = self.graph.setdefault("refs", {})
        for node_hash in node_hashes:
            linked = refs.setdefault(node_hash, [])
            # Re-adding the same interpretation must not duplicate the link.
            if int_id not in linked:
                linked.append(int_id)
        self._save()
        return int_id

    def _get_provenance(self, node_hashes: List[str]) -> List[Dict]:
        """Return, per indexed node, how many blocks hold it and the first one."""
        provenance = []
        for h in node_hashes:
            block_ids = self.ledger.index.get(h, [])
            if block_ids:
                provenance.append({
                    "node": h,
                    "blocks": len(block_ids),
                    "first": block_ids[0]
                })
        return provenance

    def get_conflicts(self, node_hash: str) -> Dict:
        """Summarize all interpretations attached to *node_hash*.

        Returns the interpretation count, groups of interpretations with
        identical content, a plurality score, and min/max/avg confidence.
        """
        int_ids = self.graph.get("refs", {}).get(node_hash, [])
        interpretations = [self.graph[i] for i in int_ids if i in self.graph]
        if not interpretations:
            return {"node": node_hash, "count": 0, "groups": []}
        confidences = [i.get("confidence", 0) for i in interpretations]
        return {
            "node": node_hash,
            "count": len(interpretations),
            "groups": self._group_interpretations(interpretations),
            "plurality": self._calc_plurality(interpretations),
            "confidence_range": {
                "min": min(confidences),
                "max": max(confidences),
                "avg": statistics.mean(confidences)
            }
        }

    def _group_interpretations(self, interpretations: List[Dict]) -> List[List[Dict]]:
        """Group interpretations whose ``content`` serializes identically."""
        if len(interpretations) <= 1:
            return [interpretations] if interpretations else []
        groups_dict = defaultdict(list)
        for intp in interpretations:
            groups_dict[self._content_digest(intp["content"])].append(intp)
        return list(groups_dict.values())

    def _calc_plurality(self, interpretations: List[Dict]) -> float:
        """Return distinct-content count divided by total count (0.0 for <= 1)."""
        if len(interpretations) <= 1:
            return 0.0
        unique = {self._content_digest(intp["content"]) for intp in interpretations}
        return min(1.0, len(unique) / len(interpretations))

    def stats(self) -> Dict:
        """Return counts of interpretations, interpreters and covered nodes."""
        int_nodes = [v for k, v in self.graph.items() if k != "refs"]
        if not int_nodes:
            return {"count": 0, "interpreters": 0, "avg_conf": 0.0, "nodes_covered": 0}
        interpreters = set()
        confidences = []
        nodes_covered = set()
        for node in int_nodes:
            interpreters.add(node.get("interpreter", "unknown"))
            confidences.append(node.get("confidence", 0.5))
            nodes_covered.update(node.get("nodes", []))
        return {
            "count": len(int_nodes),
            "interpreters": len(interpreters),
            "avg_conf": statistics.mean(confidences) if confidences else 0.0,
            "nodes_covered": len(nodes_covered),
            "interpreter_list": list(interpreters)
        }
# ==================== HIERARCHICAL DETECTOR ====================
class HierarchicalDetector:
    """Bottom-up detector: evidence signatures -> methods -> primitives -> lenses.

    Reads blocks from ``ledger.chain``, interpretation statistics from
    ``separator.stats()``, and the signature/method/primitive/lens tables
    from ``hierarchy``.
    """

    def __init__(self, hierarchy: "SuppressionHierarchy", ledger: "Ledger", separator: "Separator"):
        """Store the three collaborators used by the detection steps."""
        self.hierarchy = hierarchy
        self.ledger = ledger
        self.separator = separator

    def detect_from_ledger(self) -> Dict:
        """Bottom-up detection: Evidence → Methods → Primitives → Lenses"""
        # Step 1: Find evidence signatures
        found_signatures = self._scan_for_signatures()
        # Step 2: Map signatures to methods
        method_results = self._signatures_to_methods(found_signatures)
        # Step 3: Group by primitives
        primitive_analysis = self._analyze_primitives(method_results)
        # Step 4: Infer lenses
        lens_inference = self._infer_lenses(primitive_analysis)
        return {
            "detection_timestamp": datetime.utcnow().isoformat() + "Z",
            "evidence_found": len(found_signatures),
            "signatures": found_signatures,
            "method_results": method_results,
            "primitive_analysis": primitive_analysis,
            "lens_inference": lens_inference,
            "hierarchical_trace": [
                self.hierarchy.trace_detection_path(sig)
                for sig in found_signatures[:3]
            ]
        }

    def _scan_for_signatures(self) -> List[str]:
        """Look for evidence patterns in the ledger.

        Returns the distinct signature names in the order the checks run, so
        the result (and the slice used for tracing) is deterministic.
        """
        found = []
        # Check for entity disappearance (Total Erasure signature)
        chain = self.ledger.chain
        for curr_block, next_block in zip(chain, chain[1:]):
            curr_entities = self._extract_entities(curr_block)
            next_entities = self._extract_entities(next_block)
            if curr_entities and not next_entities:
                found.append("entity_present_then_absent")
        # Check for single explanation (Official Narrative Closure)
        stats = self.separator.stats()
        if stats["interpreters"] == 1 and stats["count"] > 3:
            found.append("single_explanation")
        # Check for gradual fading (Soft Erasure)
        if self._analyze_decay_pattern() > 0.5:
            found.append("gradual_fading")
        # Check for information clusters (Compartmentalization)
        if self._analyze_information_clusters() > 0.7:
            found.append("information_clusters")
        # Check for narrowed focus (Scope Contraction)
        if self._analyze_scope_focus() > 0.6:
            found.append("narrowed_focus")
        # dict.fromkeys de-duplicates while keeping first-seen order; a set
        # would reorder the result from run to run.
        return list(dict.fromkeys(found))

    def _extract_entities(self, block: Dict) -> Set[str]:
        """Return pseudo-entity ids for nodes whose JSON text mentions
        ``entity`` or ``name`` (a plain substring heuristic)."""
        entities = set()
        for node in block.get("nodes", []):
            content = json.dumps(node)
            if "entity" in content or "name" in content:
                entities.add(f"ent_{hashlib.sha256(content.encode()).hexdigest()[:8]}")
        return entities

    def _analyze_decay_pattern(self) -> float:
        """Relative drop in reference counts across the last ten blocks.

        Compares the mean ref count of the older half with the newer half
        and returns the fractional decrease (0.0 if fewer than three blocks,
        no refs in the older half, or no decrease).
        """
        ref_counts = [
            sum(
                len(refs)
                for node in block.get("nodes", [])
                for refs in node.get("refs", {}).values()
            )
            for block in self.ledger.chain[-10:]
        ]
        if len(ref_counts) < 3:
            return 0.0
        midpoint = len(ref_counts) // 2
        avg_first = statistics.mean(ref_counts[:midpoint])
        avg_second = statistics.mean(ref_counts[midpoint:])
        if avg_first == 0:
            return 0.0
        return max(0.0, (avg_first - avg_second) / avg_first)

    def _analyze_information_clusters(self) -> float:
        """Fraction of same-block node pairs (last five blocks) sharing no ref.

        Returns 0.0 when there are no pairs to compare. Having nothing to
        measure is not evidence of compartmentalization, and treating it as
        the maximum score would flag every ledger whose blocks hold a single
        node.
        """
        total_links = 0
        possible_links = 0
        for block in self.ledger.chain[-5:]:
            nodes = block.get("nodes", [])
            for i, first in enumerate(nodes):
                for second in nodes[i + 1:]:
                    possible_links += 1
                    if self._are_nodes_linked(first, second):
                        total_links += 1
        if possible_links == 0:
            return 0.0
        return 1.0 - (total_links / possible_links)

    def _are_nodes_linked(self, node1: Dict, node2: Dict) -> bool:
        """Return True if the two nodes share at least one reference value."""
        refs1 = set()
        refs2 = set()
        for ref_list in node1.get("refs", {}).values():
            refs1.update(ref_list)
        for ref_list in node2.get("refs", {}).values():
            refs2.update(ref_list)
        return bool(refs1 & refs2)

    def _analyze_scope_focus(self) -> float:
        """Share of all ledger nodes that belong to the most common ``type``."""
        type_counts = defaultdict(int)
        total = 0
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                type_counts[node.get("type", "unknown")] += 1
                total += 1
        if total == 0:
            return 0.0
        # Calculate concentration (higher = more focused on few types)
        return max(type_counts.values()) / total

    def _signatures_to_methods(self, signatures: List[str]) -> List[Dict]:
        """Map evidence signatures to detected methods.

        Only implemented methods with confidence above 0.5 are reported,
        sorted by descending confidence. Method ids that the hierarchy does
        not define are skipped.
        """
        results = []
        for sig in signatures:
            for method_id in self.hierarchy.signatures.get(sig, []):
                method = self.hierarchy.methods.get(method_id)
                if method is None:
                    # Dangling signature -> method mapping; nothing to report.
                    continue
                # Calculate confidence based on evidence strength
                confidence = self._calculate_method_confidence(method, sig)
                if method.implemented and confidence > 0.5:
                    results.append({
                        "method_id": method.id,
                        "method_name": method.name,
                        "primitive": method.primitive.value,
                        "confidence": round(confidence, 3),
                        "evidence_signature": sig,
                        "implemented": True
                    })
        return sorted(results, key=lambda x: x["confidence"], reverse=True)

    def _calculate_method_confidence(self, method: "SuppressionMethod", signature: str) -> float:
        """Calculate detection confidence for a method.

        Starts from 0.7 (implemented) or 0.3 (not implemented) and adds a
        capped bonus for three specific signature names.
        """
        base_confidence = 0.7 if method.implemented else 0.3
        # Adjust based on evidence strength
        if "entity_present_then_absent" in signature:
            return min(0.9, base_confidence + 0.2)
        elif "single_explanation" in signature:
            return min(0.85, base_confidence + 0.15)
        elif "gradual_fading" in signature:
            return min(0.8, base_confidence + 0.1)
        return base_confidence

    def _analyze_primitives(self, method_results: List[Dict]) -> Dict:
        """Analyze which primitives are most active.

        Returns, per primitive name, the number of detected methods, their
        mean confidence, and up to two method names.
        """
        by_primitive = defaultdict(list)
        for result in method_results:
            by_primitive[result["primitive"]].append(result)
        analysis = {}
        for primitive, hits in by_primitive.items():
            confidences = [hit["confidence"] for hit in hits]
            analysis[primitive] = {
                "method_count": len(hits),
                "average_confidence": round(statistics.mean(confidences), 3) if confidences else 0.0,
                "dominant_methods": [hit["method_name"] for hit in hits][:2]
            }
        return analysis

    def _infer_lenses(self, primitive_analysis: Dict) -> Dict:
        """Infer which conceptual lenses might be active.

        Collects the lens ids the hierarchy associates with each active
        primitive and describes the ten lowest ids.
        """
        active_primitives = [p for p, data in primitive_analysis.items() if data["method_count"] > 0]
        active_lenses = set()
        for primitive_str in active_primitives:
            primitive = Primitive(primitive_str)
            lens_ids = self.hierarchy.primitives.get(primitive, [])
            active_lenses.update(lens_ids)
        lens_details = []
        for lens_id in sorted(active_lenses)[:10]:  # First 10 lenses by id
            lens = self.hierarchy.lenses.get(lens_id)
            if lens:
                lens_details.append({
                    "id": lens.id,
                    "name": lens.name,
                    "archetype": lens.archetype,
                    "mechanism": lens.suppression_mechanism
                })
        return {
            "active_lens_count": len(active_lenses),
            "active_primitives": active_primitives,
            "lens_details": lens_details,
            "architecture_analysis": self._analyze_architecture(active_primitives, active_lenses)
        }

    def _analyze_architecture(self, active_primitives: List[str], active_lenses: Set[int]) -> str:
        """Analyze the suppression architecture complexity.

        Returns the applicable remarks joined by ``" | "``, or
        ``"Minimal suppression patterns"`` when none apply.
        """
        analysis = []
        primitive_count = len(active_primitives)
        lens_count = len(active_lenses)
        if primitive_count >= 3:
            analysis.append(f"Complex suppression architecture ({primitive_count} primitives)")
        elif primitive_count > 0:
            analysis.append("Basic suppression patterns detected")
        if lens_count > 20:
            analysis.append("Deep conceptual framework active")
        elif lens_count > 10:
            analysis.append("Multiple conceptual layers active")
        # Check for composite patterns
        if Primitive.ERASURE.value in active_primitives and Primitive.NARRATIVE_CAPTURE.value in active_primitives:
            analysis.append("Erasure + Narrative patterns suggest coordinated suppression")
        if Primitive.META.value in active_primitives:
            analysis.append("Meta-suppression patterns detected (self-referential control)")
        return " | ".join(analysis) if analysis else "Minimal suppression patterns"
# ==================== MAIN ENGINE ====================
class CompleteEngine:
    """Facade wiring together the hierarchy, ledger, separator and detector.

    All state is stored under a single working directory.
    """

    def __init__(self, path: str = "complete_engine"):
        """Create *path* if needed, build every component and print a summary."""
        os.makedirs(path, exist_ok=True)
        print("=" * 80)
        print("HIERARCHICAL SUPPRESSION DETECTION ENGINE")
        print("73 Lenses → 10 Primitives → 43 Methods → Evidence Signatures")
        print("=" * 80)
        # Initialize hierarchy
        self.hierarchy = SuppressionHierarchy()
        # Initialize ledger and separator
        self.crypto = Crypto(f"{path}/keys")
        self.ledger = Ledger(f"{path}/ledger.json", self.crypto)
        self.separator = Separator(self.ledger, f"{path}/interpretations")
        # Initialize detector
        self.detector = HierarchicalDetector(self.hierarchy, self.ledger, self.separator)
        # Export ontology
        self.hierarchy.export_ontology(f"{path}/suppression_hierarchy.json")
        print(f"✓ Hierarchy initialized: {len(self.hierarchy.lenses)} lenses")
        print(f"✓ Primitives defined: {len(self.hierarchy.primitives)}")
        print(f"✓ Methods available: {len(self.hierarchy.methods)}")
        print(f"✓ Evidence signatures: {len(self.hierarchy.signatures)}")
        print(f"✓ Ledger ready: {len(self.ledger.chain)} blocks")

    def record_reality(self, content: str, type: str, source: str,
                       witnesses: Optional[List[str]] = None,
                       refs: Optional[Dict[str, List[str]]] = None) -> str:
        """Record immutable reality node.

        Hashes and signs *content*, wraps it in a ``RealityNode`` and appends
        it to the ledger with two fixed demo validators. Returns the content
        hash. The parameter name ``type`` shadows the builtin but is kept
        because callers may pass it by keyword.
        """
        content_hash = self.crypto.hash(content)
        signature = self.crypto.sign(content.encode(), source)
        node = RealityNode(
            hash=content_hash,
            type=type,
            source=source,
            signature=signature,
            timestamp=datetime.utcnow().isoformat() + "Z",
            witnesses=witnesses or [],
            refs=refs or {}
        )
        # Use dummy validators for demo
        validators = [("validator_1", None), ("validator_2", None)]
        block_id = self.ledger.add(node, validators)
        print(f"✓ Recorded: {content_hash[:16]}... in block {block_id}")
        return content_hash

    def add_interpretation(self, node_hashes: List[str], interpretation: Dict,
                           interpreter: str, confidence: float = 0.5) -> str:
        """Add interpretation (separate from reality) and return its id."""
        int_id = self.separator.add(node_hashes, interpretation, interpreter, confidence)
        print(f"✓ Interpretation added: {int_id} by {interpreter}")
        return int_id

    def detect_suppression(self) -> Dict:
        """Run hierarchical suppression detection, print a summary, return results."""
        print("\n🔍 Detecting suppression patterns...")
        results = self.detector.detect_from_ledger()
        print(f"✓ Evidence found: {results['evidence_found']} signatures")
        print(f"✓ Methods detected: {len(results['method_results'])}")
        print(f"✓ Primitives active: {len(results['primitive_analysis'])}")
        print(f"✓ Lenses inferred: {results['lens_inference']['active_lens_count']}")
        if results['method_results']:
            print("\nTop detected methods:")
            for method in results['method_results'][:5]:
                print(f" • {method['method_name']}: {method['confidence']:.1%}")
        architecture = results['lens_inference']['architecture_analysis']
        if architecture:
            print(f"\nArchitecture: {architecture}")
        return results

    def get_system_status(self) -> Dict:
        """Get complete system status: hierarchy sizes, ledger check, interpretation stats."""
        ledger_status = self.ledger.verify()
        separator_stats = self.separator.stats()
        implemented_methods = sum(1 for m in self.hierarchy.methods.values() if m.implemented)
        return {
            "system": {
                "lenses": len(self.hierarchy.lenses),
                "primitives": len(self.hierarchy.primitives),
                "methods": len(self.hierarchy.methods),
                "methods_implemented": implemented_methods,
                "signatures": len(self.hierarchy.signatures)
            },
            "ledger": {
                "valid": ledger_status["valid"],
                "blocks": ledger_status.get("blocks", 0),
                "nodes": ledger_status.get("nodes", 0),
                "avg_resistance": ledger_status.get("avg_resistance", 0)
            },
            "interpretations": separator_stats,
            "hierarchical_ready": True
        }
# ==================== DEMONSTRATION ====================
def demonstrate_hierarchical_detection():
    """Run an end-to-end demo: record three events, add two interpretations,
    run detection and print the resulting system status."""
    engine = CompleteEngine("hierarchical_demo")

    print("\n📝 Recording reality nodes...")
    # (content, source, witnesses, refs) for each historical event
    historical_events = (
        (
            "J.P. Morgan withdrew Tesla funding in 1903",
            "financial_archive_001",
            ["bank_record_1903", "correspondence_archive"],
            {"financial": ["morgan_ledgers"], "news": ["ny_times_1903"]},
        ),
        (
            "FBI seized Tesla papers in 1943",
            "foia_document_001",
            ["inventory_1943", "hotel_records"],
            {"government": ["fbi_files"], "legal": ["property_records"]},
        ),
        (
            "Witness disappeared after testimony in 1952",
            "court_archive_001",
            ["court_record_1952"],
            {"legal": ["court_documents"]},
        ),
    )
    h1, h2, h3 = (
        engine.record_reality(
            content, "historical_event", source, witnesses=witnesses, refs=refs
        )
        for content, source, witnesses, refs in historical_events
    )

    print("\n💭 Adding interpretations...")
    # Official reading first, then the alternative one.
    readings = (
        (
            [h1, h2],
            {"narrative": "Standard business and government operations", "agency": "normal"},
            "official_historian",
            0.85,
        ),
        (
            [h1, h2, h3],
            {"narrative": "Pattern of suppression across generations", "agency": "coordinated"},
            "independent_researcher",
            0.65,
        ),
    )
    for node_hashes, interpretation, interpreter, confidence in readings:
        engine.add_interpretation(node_hashes, interpretation, interpreter, confidence)

    print("\n🔍 Running hierarchical suppression detection...")
    engine.detect_suppression()

    print("\n📊 System Status:")
    status = engine.get_system_status()
    system = status['system']
    ledger = status['ledger']
    interpretations = status['interpretations']
    report_lines = (
        f" • Lenses: {system['lenses']}",
        f" • Primitives: {system['primitives']}",
        f" • Methods: {system['methods']} ({system['methods_implemented']} implemented)",
        f" • Ledger blocks: {ledger['blocks']}",
        f" • Reality nodes: {ledger['nodes']}",
        f" • Interpretations: {interpretations['count']}",
        f" • Unique interpreters: {interpretations['interpreters']}",
    )
    for line in report_lines:
        print(line)

    banner = "=" * 80
    print("\n" + banner)
    print("✅ HIERARCHICAL SYSTEM OPERATIONAL")
    print("Evidence → Methods → Primitives → Lenses")
    print("No circular references, clean abstraction layers")
    print(banner)
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    demonstrate_hierarchical_detection()