# ── compress.py ── """One-function compression API for Headroom. The simplest way to use Headroom — no proxy, no config, just compress: from headroom import compress result = compress(messages, model="claude-sonnet-4-5-20250929") result.messages # Compressed messages (same format, fewer tokens) result.tokens_saved # Tokens saved result.compression_ratio # e.g., 0.35 means 65% saved Works with any LLM client, any proxy, any framework. Just compress the messages before sending them. Examples: # With Anthropic SDK from anthropic import Anthropic from headroom import compress client = Anthropic() messages = [{"role": "user", "content": huge_tool_output}] compressed = compress(messages, model="claude-sonnet-4-5-20250929") response = client.messages.create( model="claude-sonnet-4-5-20250929", messages=compressed.messages, ) # With OpenAI SDK from openai import OpenAI from headroom import compress client = OpenAI() messages = [{"role": "user", "content": "analyze this"}, {"role": "tool", "content": big_data}] compressed = compress(messages, model="gpt-4o") response = client.chat.completions.create(model="gpt-4o", messages=compressed.messages) # With LiteLLM import litellm from headroom import compress messages = [...] compressed = compress(messages, model="bedrock/claude-sonnet") response = litellm.completion(model="bedrock/claude-sonnet", messages=compressed.messages) # With any HTTP client import httpx from headroom import compress compressed = compress(messages, model="claude-sonnet-4-5-20250929") httpx.post("https://api.anthropic.com/v1/messages", json={ "model": "claude-sonnet-4-5-20250929", "messages": compressed.messages, }) """ from __future__ import annotations import logging import threading from dataclasses import dataclass, field from typing import Any from .observability import get_otel_metrics from .pipeline import PipelineExtensionManager, PipelineStage, summarize_routing_markers from .utils import extract_user_query as _extract_user_query logger = logging.getLogger(__name__) # Lazy-initialized singleton pipeline _pipeline = None _pipeline_lock = threading.Lock() @dataclass class CompressConfig: """User-facing compression options. Controls what gets compressed, how aggressively, and with which model. Pass to ``compress()`` or any integration that uses headroom. Examples:: # Coding agent (default — skip user messages, protect recent) compress(messages, model="gpt-4o") # Financial document (compress everything, keep 50%) compress(messages, model="claude-opus-4-20250514", compress_user_messages=True, target_ratio=0.5, protect_recent=0, ) # Aggressive (logs, search results) compress(messages, model="gpt-4o", target_ratio=0.2) """ # What to compress compress_user_messages: bool = False """Compress user messages too (default: skip them for coding agents). Set True for document compression, RAG pipelines, or when user messages contain large tool outputs.""" compress_system_messages: bool = True """Compress system messages (default: True). Set False to preserve system prompts exactly as-is. Useful for voice agents where tool definitions and instructions must not be altered.""" protect_recent: int = 4 """Don't compress the last N messages (they're the active conversation). Set 0 to compress everything.""" protect_analysis_context: bool = True """Detect 'analyze'/'review' intent and protect code from compression.""" # How aggressive target_ratio: float | None = None """Keep ratio for Kompress. None = model decides (~15% kept, aggressive). 0.5 = keep 50% (safe for documents). 0.7 = keep 70% (conservative). Only affects Kompress (text compression). SmartCrusher (JSON) has its own logic based on array dedup.""" min_tokens_to_compress: int = 250 """Minimum token count for a message to be compressed. Messages shorter than this are left unchanged. Default 250. Set lower for voice agents where turns are short.""" # Model variant kompress_model: str | None = None """Kompress model ID. None = default (chopratejas/kompress-base). Set to a HuggingFace model ID for domain-specific compression. Set to 'disabled' to skip ML compression entirely (only SmartCrusher + CacheAligner will run).""" @dataclass class CompressResult: """Result of compressing messages. Attributes: messages: The compressed messages (same format as input). tokens_before: Token count before compression. tokens_after: Token count after compression. tokens_saved: Tokens removed by compression. compression_ratio: Ratio of tokens saved (0.0 = no savings, 1.0 = 100% removed). transforms_applied: List of transforms that were applied. """ messages: list[dict[str, Any]] tokens_before: int = 0 tokens_after: int = 0 tokens_saved: int = 0 compression_ratio: float = 0.0 transforms_applied: list[str] = field(default_factory=list) def compress( messages: list[dict[str, Any]], model: str = "claude-sonnet-4-5-20250929", model_limit: int = 200000, optimize: bool = True, hooks: Any = None, config: CompressConfig | None = None, **kwargs: Any, ) -> CompressResult: """Compress messages using Headroom's full compression pipeline. This is the simplest way to use Headroom. No proxy, no config needed. Just pass messages and get compressed messages back. Args: messages: List of messages in Anthropic or OpenAI format. model: Model name (used for token counting and context limit). model_limit: Model's context window size in tokens. optimize: Whether to actually compress (False = passthrough for A/B testing). hooks: Optional CompressionHooks instance for custom behavior. config: Compression options (CompressConfig). Overrides defaults. **kwargs: Shorthand for CompressConfig fields. These override config: compress_user_messages, target_ratio, protect_recent, protect_analysis_context, kompress_model. Returns: CompressResult with compressed messages and metrics. Examples:: # Default (coding agent) result = compress(messages, model="gpt-4o") # Financial document (keep 50%, compress everything) result = compress(messages, model="claude-opus-4-20250514", compress_user_messages=True, target_ratio=0.5, protect_recent=0, ) """ if not messages or not optimize: return CompressResult(messages=messages) # Build config from explicit config + kwargs cfg = config or CompressConfig() config_fields = {f.name for f in cfg.__dataclass_fields__.values()} for key, value in kwargs.items(): if key in config_fields: setattr(cfg, key, value) pipeline = _get_pipeline() pipeline_extensions = PipelineExtensionManager(hooks=hooks, discover=False) try: # Compute biases from hooks if provided biases = None if hooks: from headroom.hooks import CompressContext ctx = CompressContext(model=model) messages = hooks.pre_compress(messages, ctx) biases = hooks.compute_biases(messages, ctx) received_event = pipeline_extensions.emit( PipelineStage.INPUT_RECEIVED, operation="compress", model=model, messages=messages, ) if received_event.messages is not None: messages = received_event.messages # Extract user query from messages so transforms can score by # relevance. Without this, SmartCrusher selects items by statistics # alone (position, anomaly) and may drop relevant content. context = _extract_user_query(messages) result = pipeline.apply( messages=messages, model=model, model_limit=model_limit, context=context, biases=biases, # Pass CompressConfig options through to transforms compress_user_messages=cfg.compress_user_messages, compress_system_messages=cfg.compress_system_messages, target_ratio=cfg.target_ratio, protect_recent=cfg.protect_recent, protect_analysis_context=cfg.protect_analysis_context, min_tokens_to_compress=cfg.min_tokens_to_compress, kompress_model=cfg.kompress_model, ) tokens_before = result.tokens_before tokens_after = result.tokens_after compressed_messages = result.messages routing_markers = summarize_routing_markers(result.transforms_applied) if routing_markers: routed_event = pipeline_extensions.emit( PipelineStage.INPUT_ROUTED, operation="compress", model=model, messages=compressed_messages, metadata={ "routing_markers": routing_markers, "transforms_applied": result.transforms_applied, }, ) if routed_event.messages is not None: compressed_messages = routed_event.messages compressed_event = pipeline_extensions.emit( PipelineStage.INPUT_COMPRESSED, operation="compress", model=model, messages=compressed_messages, metadata={ "tokens_before": tokens_before, "tokens_after": tokens_after, "transforms_applied": result.transforms_applied, }, ) if compressed_event.messages is not None: compressed_messages = compressed_event.messages tokens_saved = tokens_before - tokens_after ratio = tokens_saved / tokens_before if tokens_before > 0 else 0.0 # Post-compress hook if hooks and tokens_saved > 0: from headroom.hooks import CompressEvent hooks.post_compress( CompressEvent( tokens_before=tokens_before, tokens_after=tokens_after, tokens_saved=tokens_saved, compression_ratio=ratio, transforms_applied=result.transforms_applied, model=model, ) ) return CompressResult( messages=compressed_messages, tokens_before=tokens_before, tokens_after=tokens_after, tokens_saved=tokens_saved, compression_ratio=ratio, transforms_applied=result.transforms_applied, ) except Exception as e: get_otel_metrics().record_compression_failure( model=model, operation="compress", error_type=type(e).__name__, ) logger.warning("Compression failed, returning original messages: %s", e) return CompressResult( messages=messages, tokens_before=0, tokens_after=0, tokens_saved=0, compression_ratio=0.0, ) def _get_pipeline() -> Any: """Get or create the singleton compression pipeline.""" global _pipeline if _pipeline is not None: return _pipeline with _pipeline_lock: if _pipeline is not None: return _pipeline from headroom.transforms import TransformPipeline # Default pipeline: CacheAligner → ContentRouter # CacheAligner: stabilizes prefix for provider KV cache hits # ContentRouter: routes to the right compressor per content type # (SmartCrusher for JSON, CodeCompressor for code, Kompress for text) # Phase B PR-B1 retired the trailing context-management stage — # live-zone-only compression never drops messages. _pipeline = TransformPipeline() logger.debug("Headroom compression pipeline initialized") return _pipeline # ── pipeline.py ── """Canonical Headroom pipeline lifecycle and extension contracts.""" from __future__ import annotations import importlib.metadata import logging from dataclasses import dataclass, field from enum import Enum from typing import Any, Protocol log = logging.getLogger(__name__) ENTRY_POINT_GROUP = "headroom.pipeline_extension" class PipelineStage(str, Enum): """Stable lifecycle stages for the canonical Headroom pipeline.""" SETUP = "setup" PRE_START = "pre_start" POST_START = "post_start" INPUT_RECEIVED = "input_received" INPUT_CACHED = "input_cached" INPUT_ROUTED = "input_routed" INPUT_COMPRESSED = "input_compressed" INPUT_REMEMBERED = "input_remembered" PRE_SEND = "pre_send" POST_SEND = "post_send" RESPONSE_RECEIVED = "response_received" CANONICAL_PIPELINE_STAGES: tuple[PipelineStage, ...] = ( PipelineStage.SETUP, PipelineStage.PRE_START, PipelineStage.POST_START, PipelineStage.INPUT_RECEIVED, PipelineStage.INPUT_CACHED, PipelineStage.INPUT_ROUTED, PipelineStage.INPUT_COMPRESSED, PipelineStage.INPUT_REMEMBERED, PipelineStage.PRE_SEND, PipelineStage.POST_SEND, PipelineStage.RESPONSE_RECEIVED, ) @dataclass class PipelineEvent: """Event emitted at a canonical pipeline stage. Extensions may mutate ``messages``, ``tools``, ``headers``, or ``metadata`` in place, or return a replacement ``PipelineEvent`` from ``on_pipeline_event``. """ stage: PipelineStage operation: str request_id: str = "" provider: str = "" model: str = "" messages: list[dict[str, Any]] | None = None tools: list[dict[str, Any]] | None = None headers: dict[str, str] | None = None response: Any = None metadata: dict[str, Any] = field(default_factory=dict) class PipelineExtension(Protocol): """Request lifecycle extension contract for the canonical pipeline.""" def on_pipeline_event(self, event: PipelineEvent) -> PipelineEvent | None: """Handle a canonical pipeline event.""" def discover_pipeline_extensions() -> list[PipelineExtension]: """Load registered pipeline extensions from Python entry points.""" discovered: list[PipelineExtension] = [] try: entries = importlib.metadata.entry_points(group=ENTRY_POINT_GROUP) except Exception as exc: # noqa: BLE001 - importlib metadata varies by runtime log.debug("pipeline extensions: entry-point enumeration failed: %s", exc) return discovered for entry in entries: try: extension = entry.load() except Exception as exc: # noqa: BLE001 - third-party load failures are isolated log.warning("pipeline extension %r failed to load: %s", entry.name, exc) continue if isinstance(extension, type): try: extension = extension() except Exception as exc: # noqa: BLE001 log.warning("pipeline extension %r failed to initialize: %s", entry.name, exc) continue discovered.append(extension) return discovered def summarize_routing_markers(transforms_applied: list[str]) -> list[str]: """Return the routed transform markers emitted by ContentRouter.""" return [item for item in transforms_applied if item.startswith("router:")] class PipelineExtensionManager: """Dispatch canonical pipeline events to configured extensions.""" def __init__( self, *, hooks: Any = None, extensions: list[Any] | None = None, discover: bool = True, ) -> None: resolved: list[Any] = [] if hooks is not None and callable(getattr(hooks, "on_pipeline_event", None)): resolved.append(hooks) if extensions: resolved.extend(extensions) if discover: resolved.extend(discover_pipeline_extensions()) self._extensions = resolved @property def enabled(self) -> bool: return bool(self._extensions) def emit( self, stage: PipelineStage, *, operation: str, request_id: str = "", provider: str = "", model: str = "", messages: list[dict[str, Any]] | None = None, tools: list[dict[str, Any]] | None = None, headers: dict[str, str] | None = None, response: Any = None, metadata: dict[str, Any] | None = None, ) -> PipelineEvent: """Emit a canonical lifecycle event and return the final event state.""" event = PipelineEvent( stage=stage, operation=operation, request_id=request_id, provider=provider, model=model, messages=messages, tools=tools, headers=headers, response=response, metadata=metadata or {}, ) for extension in self._extensions: handler = getattr(extension, "on_pipeline_event", None) if not callable(handler): continue try: updated = handler(event) except Exception as exc: # noqa: BLE001 - preserve hook fail-open behavior log.warning( "pipeline extension %r failed during %s: %s", type(extension).__name__, stage.value, exc, ) continue if isinstance(updated, PipelineEvent): event = updated return event # ── compression/__init__.py ── """Universal compression with ML-based content detection. This module provides intelligent, automatic compression that: 1. Detects content type using ML (Magika) 2. Preserves structure (keys, signatures, templates) 3. Compresses content with Kompress 4. Enables retrieval via CCR Quick Start: # One-liner for simple use from headroom.compression import compress result = compress(content) # Or with configuration from headroom.compression import UniversalCompressor, UniversalCompressorConfig config = UniversalCompressorConfig(compression_ratio_target=0.5) compressor = UniversalCompressor(config=config) result = compressor.compress(content) """ from headroom.compression.detector import ContentType, MagikaDetector from headroom.compression.masks import StructureMask from headroom.compression.universal import ( CompressionResult, UniversalCompressor, UniversalCompressorConfig, compress, ) __all__ = [ # Simple API "compress", # Full API "UniversalCompressor", "UniversalCompressorConfig", "CompressionResult", # Advanced "MagikaDetector", "ContentType", "StructureMask", ] # ── compression/detector.py ── """ML-based content type detection using Google's Magika. Magika is a deep learning model for content type detection that: - Runs locally (~5ms latency) - Supports 100+ content types - Has 99%+ accuracy on supported types - Requires no configuration This replaces rule-based detection with learned detection. """ from __future__ import annotations import logging from dataclasses import dataclass, field from enum import Enum from typing import TYPE_CHECKING if TYPE_CHECKING: from magika import Magika from magika.types import MagikaResult logger = logging.getLogger(__name__) # Lazy-loaded Magika instance (singleton) _magika_instance: Magika | None = None class ContentType(Enum): """High-level content categories for compression routing.""" JSON = "json" CODE = "code" LOG = "log" DIFF = "diff" MARKDOWN = "markdown" TEXT = "text" UNKNOWN = "unknown" @dataclass class DetectionResult: """Result of ML-based content detection.""" content_type: ContentType confidence: float # 0.0 to 1.0 raw_label: str # Original Magika label language: str | None = None # For code: python, javascript, etc. metadata: dict = field(default_factory=dict) # Map Magika labels to our content types # This is the ONLY place where we map labels - no hardcoding elsewhere _CODE_LABELS = frozenset( { "python", "javascript", "typescript", "go", "rust", "java", "c", "cpp", "csharp", "ruby", "php", "swift", "kotlin", "scala", "shell", "bash", "powershell", "sql", "r", "perl", "lua", "haskell", "elixir", "erlang", "clojure", "ocaml", "fsharp", "dart", "julia", "zig", "nim", "crystal", "v", "solidity", "move", "cairo", "vyper", } ) _STRUCTURED_LABELS = frozenset( { "json", "jsonl", "yaml", "toml", "xml", "html", "csv", "tsv", "ini", "properties", } ) _LOG_LABELS = frozenset( { "log", "syslog", } ) _MARKDOWN_LABELS = frozenset( { "markdown", "rst", "asciidoc", "org", } ) def _get_magika() -> Magika: """Get or create the singleton Magika instance. Lazy-loads on first use to avoid import cost if not needed. """ global _magika_instance if _magika_instance is None: try: from magika import Magika _magika_instance = Magika() logger.debug("Magika model loaded successfully") except ImportError as e: raise ImportError( "Magika is required for ML-based content detection. " "Install with: pip install magika" ) from e return _magika_instance def _magika_available() -> bool: """Check if Magika is available without loading it.""" try: import magika # noqa: F401 return True except ImportError: return False class MagikaDetector: """ML-based content type detector using Google's Magika. This detector uses a deep learning model to identify content types without relying on file extensions or brittle regex patterns. Example: detector = MagikaDetector() result = detector.detect('def hello(): print("hi")') # result.content_type == ContentType.CODE # result.language == "python" """ def __init__(self, min_confidence: float = 0.5): """Initialize the detector. Args: min_confidence: Minimum confidence threshold. Below this, returns ContentType.UNKNOWN. """ self.min_confidence = min_confidence self._magika: Magika | None = None def _ensure_magika(self) -> Magika: """Ensure Magika is loaded.""" if self._magika is None: self._magika = _get_magika() return self._magika def detect(self, content: str) -> DetectionResult: """Detect content type using ML. Args: content: The content to analyze. Returns: DetectionResult with type, confidence, and metadata. Example: >>> detector = MagikaDetector() >>> result = detector.detect('{"users": [{"id": 1}]}') >>> result.content_type ContentType.JSON """ if not content or not content.strip(): return DetectionResult( content_type=ContentType.UNKNOWN, confidence=0.0, raw_label="empty", ) # Get Magika prediction magika = self._ensure_magika() result: MagikaResult = magika.identify_bytes(content.encode("utf-8")) raw_label = result.output.label confidence = result.score # Map to our content type content_type, language = self._map_label(raw_label) # Apply confidence threshold if confidence < self.min_confidence: content_type = ContentType.UNKNOWN return DetectionResult( content_type=content_type, confidence=confidence, raw_label=raw_label, language=language, metadata={ "magika_group": result.output.group, "magika_mime": result.output.mime_type, }, ) def detect_batch(self, contents: list[str]) -> list[DetectionResult]: """Detect content types for multiple contents. Args: contents: List of content strings to analyze. Returns: List of DetectionResults in same order as input. """ if not contents: return [] results = [] for content in contents: if not content or not content.strip(): results.append( DetectionResult( content_type=ContentType.UNKNOWN, confidence=0.0, raw_label="empty", ) ) continue magika_result = self._ensure_magika().identify_bytes(content.encode("utf-8")) raw_label = magika_result.output.label confidence = magika_result.score content_type, language = self._map_label(raw_label) if confidence < self.min_confidence: content_type = ContentType.UNKNOWN results.append( DetectionResult( content_type=content_type, confidence=confidence, raw_label=raw_label, language=language, metadata={ "magika_group": magika_result.output.group, "magika_mime": magika_result.output.mime_type, }, ) ) return results def _map_label(self, label: str) -> tuple[ContentType, str | None]: """Map Magika label to our ContentType. Args: label: Raw Magika label (e.g., "python", "json"). Returns: Tuple of (ContentType, optional language). """ label_lower = label.lower() # Check code languages if label_lower in _CODE_LABELS: return ContentType.CODE, label_lower # Check structured data if label_lower in _STRUCTURED_LABELS: # JSON gets its own type for specialized handling if label_lower in ("json", "jsonl"): return ContentType.JSON, None # Other structured data treated as JSON-like return ContentType.JSON, None # Check logs if label_lower in _LOG_LABELS: return ContentType.LOG, None # Check markdown/docs if label_lower in _MARKDOWN_LABELS: return ContentType.MARKDOWN, None # Diff format (Magika detects this with score=1.0) if label_lower == "diff": return ContentType.DIFF, None # Text types if label_lower in ("txt", "text", "ascii", "utf8", "empty"): return ContentType.TEXT, None # Default: treat as text return ContentType.TEXT, None @staticmethod def is_available() -> bool: """Check if Magika is available.""" return _magika_available() class FallbackDetector: """Simple fallback detector when Magika is not available. Uses basic heuristics - not as accurate but requires no dependencies. """ def __init__(self, min_confidence: float = 0.5): """Initialize the fallback detector.""" self.min_confidence = min_confidence def detect(self, content: str) -> DetectionResult: """Detect content type using simple heuristics. Args: content: The content to analyze. Returns: DetectionResult with type and confidence. """ if not content or not content.strip(): return DetectionResult( content_type=ContentType.UNKNOWN, confidence=0.0, raw_label="empty", ) stripped = content.strip() # JSON detection (simple but effective) if stripped.startswith(("{", "[")): try: import json json.loads(stripped) return DetectionResult( content_type=ContentType.JSON, confidence=1.0, raw_label="json", ) except (json.JSONDecodeError, ValueError): pass # Code detection (look for common patterns) code_indicators = [ "def ", "class ", "function ", "import ", "const ", "let ", "var ", "func ", "fn ", "pub ", "package ", ] if any(indicator in content for indicator in code_indicators): return DetectionResult( content_type=ContentType.CODE, confidence=0.7, raw_label="code", ) # Log detection log_indicators = ["ERROR", "WARN", "INFO", "DEBUG", "FATAL"] if any(indicator in content for indicator in log_indicators): return DetectionResult( content_type=ContentType.LOG, confidence=0.6, raw_label="log", ) # Default to text return DetectionResult( content_type=ContentType.TEXT, confidence=0.5, raw_label="text", ) def get_detector(prefer_magika: bool = True) -> MagikaDetector | FallbackDetector: """Get the best available detector. Args: prefer_magika: If True, use Magika if available. Returns: MagikaDetector if available and preferred, else FallbackDetector. """ if prefer_magika and MagikaDetector.is_available(): return MagikaDetector() return FallbackDetector() # ── compression/masks.py ── """Structure mask system for compression. A StructureMask identifies which parts of content are "structural" (should be preserved) vs "compressible" (can be compressed by Kompress). This separates the concerns of: 1. Structure detection (handlers) - What tokens are navigational? 2. Content compression (Kompress) - What tokens can be removed? The mask is content-agnostic - it's just a boolean array aligned to tokens. """ from __future__ import annotations from collections.abc import Callable, Sequence from dataclasses import dataclass, field @dataclass class StructureMask: """A mask identifying structural vs compressible tokens. The mask is aligned to a token sequence. True means "preserve this token" (it's structural/navigational), False means "compressible" (Kompress can potentially remove it). Attributes: tokens: The tokenized content (list of strings or token IDs). mask: Boolean array, True = preserve, False = compressible. metadata: Optional handler-specific metadata. """ tokens: Sequence[str | int] mask: list[bool] metadata: dict = field(default_factory=dict) def __post_init__(self) -> None: """Validate mask alignment.""" if len(self.tokens) != len(self.mask): raise ValueError( f"Mask length ({len(self.mask)}) must match tokens length ({len(self.tokens)})" ) @property def preservation_ratio(self) -> float: """Fraction of tokens marked for preservation.""" if not self.mask: return 0.0 return sum(self.mask) / len(self.mask) @property def structural_count(self) -> int: """Number of structural (preserved) tokens.""" return sum(self.mask) @property def compressible_count(self) -> int: """Number of compressible tokens.""" return len(self.mask) - sum(self.mask) def get_structural_tokens(self) -> list[str | int]: """Get list of tokens marked as structural.""" return [t for t, m in zip(self.tokens, self.mask) if m] def get_compressible_tokens(self) -> list[str | int]: """Get list of tokens marked as compressible.""" return [t for t, m in zip(self.tokens, self.mask) if not m] @classmethod def empty(cls, tokens: Sequence[str | int]) -> StructureMask: """Create a mask with no structural tokens (all compressible).""" return cls(tokens=tokens, mask=[False] * len(tokens)) @classmethod def full(cls, tokens: Sequence[str | int]) -> StructureMask: """Create a mask preserving all tokens (nothing compressible).""" return cls(tokens=tokens, mask=[True] * len(tokens)) def union(self, other: StructureMask) -> StructureMask: """Combine masks - preserve if EITHER mask says preserve. Useful when combining multiple structure detection strategies. Args: other: Another mask to combine with. Returns: New mask with union of preserved tokens. Raises: ValueError: If masks have different lengths. """ if len(self.mask) != len(other.mask): raise ValueError("Cannot union masks of different lengths") return StructureMask( tokens=self.tokens, mask=[a or b for a, b in zip(self.mask, other.mask)], metadata={"source": "union", **self.metadata, **other.metadata}, ) def intersection(self, other: StructureMask) -> StructureMask: """Combine masks - preserve only if BOTH masks say preserve. Useful for being more aggressive with compression. Args: other: Another mask to combine with. Returns: New mask with intersection of preserved tokens. Raises: ValueError: If masks have different lengths. """ if len(self.mask) != len(other.mask): raise ValueError("Cannot intersect masks of different lengths") return StructureMask( tokens=self.tokens, mask=[a and b for a, b in zip(self.mask, other.mask)], metadata={"source": "intersection", **self.metadata, **other.metadata}, ) @dataclass class MaskSpan: """A contiguous span in the mask. Useful for applying different compression strategies to different parts of the content. """ start: int end: int is_structural: bool label: str = "" # Optional label (e.g., "key", "value", "signature") @property def length(self) -> int: """Length of the span.""" return self.end - self.start def mask_to_spans(mask: StructureMask) -> list[MaskSpan]: """Convert a mask to a list of contiguous spans. This is useful for processing structural and compressible regions separately. Args: mask: The structure mask. Returns: List of MaskSpan objects representing contiguous regions. Example: >>> tokens = ["def", " ", "foo", "(", ")", ":", " ", "pass"] >>> mask = StructureMask(tokens, [True, True, True, True, True, True, False, False]) >>> spans = mask_to_spans(mask) >>> [(s.start, s.end, s.is_structural) for s in spans] [(0, 6, True), (6, 8, False)] """ if not mask.mask: return [] spans = [] current_start = 0 current_structural = mask.mask[0] for i, is_structural in enumerate(mask.mask[1:], start=1): if is_structural != current_structural: spans.append( MaskSpan( start=current_start, end=i, is_structural=current_structural, ) ) current_start = i current_structural = is_structural # Don't forget the last span spans.append( MaskSpan( start=current_start, end=len(mask.mask), is_structural=current_structural, ) ) return spans def apply_mask_to_text( text: str, mask: StructureMask, compress_fn: Callable[[str], str], tokenizer_decode: Callable[[Sequence[str | int]], str] | None = None, ) -> str: """Apply compression to non-structural regions of text. This is the core function that enables structure-preserving compression. Structural regions are kept verbatim, non-structural regions are passed to the compression function. Args: text: Original text. mask: Structure mask aligned to tokens. compress_fn: Function to compress text (e.g., Kompress). tokenizer_decode: Optional function to decode tokens to text. If not provided, assumes tokens are strings and joins them. Returns: Text with non-structural regions compressed. """ spans = mask_to_spans(mask) result_parts = [] if tokenizer_decode is None: # Default: assume tokens are strings def tokenizer_decode(tokens: Sequence[str | int]) -> str: return "".join(str(t) for t in tokens) for span in spans: span_tokens = mask.tokens[span.start : span.end] span_text = tokenizer_decode(span_tokens) if span.is_structural: # Keep structural regions verbatim result_parts.append(span_text) else: # Compress non-structural regions compressed = compress_fn(span_text) result_parts.append(compressed) return "".join(result_parts) @dataclass class EntropyScore: """Entropy-based preservation signal. High entropy content (UUIDs, hashes, random strings) should generally be preserved because: 1. They're information-dense (can't be reconstructed) 2. They're often identifiers (semantically important) 3. Token-level compressors may mangle them This is a self-signal - no external classifier needed. """ value: float # 0.0 to 1.0, normalized entropy should_preserve: bool # True if entropy above threshold @classmethod def compute(cls, text: str, threshold: float = 0.85) -> EntropyScore: """Compute entropy score for text. Args: text: Text to analyze. threshold: Entropy threshold for preservation (0.0-1.0). Higher = more selective. Returns: EntropyScore with value and preservation recommendation. """ if not text: return cls(value=0.0, should_preserve=False) # Calculate character entropy import math from collections import Counter # Count character frequencies counter = Counter(text) total = len(text) # Calculate Shannon entropy entropy = 0.0 for count in counter.values(): if count > 0: p = count / total entropy -= p * math.log2(p) # Normalize to 0-1 range # Maximum possible entropy for this alphabet size max_entropy = math.log2(len(counter)) if len(counter) > 1 else 1.0 normalized = entropy / max_entropy if max_entropy > 0 else 0.0 return cls( value=normalized, should_preserve=normalized >= threshold, ) def compute_entropy_mask( tokens: Sequence[str], threshold: float = 0.85, min_token_length: int = 8, ) -> StructureMask: """Create a mask preserving high-entropy tokens. This is a self-signal that doesn't require content classification. High-entropy tokens (UUIDs, hashes, etc.) are marked for preservation. Args: tokens: List of string tokens. threshold: Entropy threshold (0.0-1.0). Higher = more selective. min_token_length: Only check tokens this long or longer. Short tokens rarely have meaningful entropy. Returns: StructureMask with high-entropy tokens marked for preservation. Example: >>> tokens = ["user", ":", " ", "8f14e45f-ceea-4123-8f14-e45fceea4123"] >>> mask = compute_entropy_mask(tokens) >>> mask.mask [False, False, False, True] # UUID preserved """ mask = [] for token in tokens: if isinstance(token, int): # Token ID, can't compute entropy mask.append(False) continue token_str = str(token) # Skip short tokens if len(token_str) < min_token_length: mask.append(False) continue # Compute entropy score = EntropyScore.compute(token_str, threshold) mask.append(score.should_preserve) return StructureMask( tokens=tokens, mask=mask, metadata={"source": "entropy", "threshold": threshold}, ) # ── compression/universal.py ── """Universal compressor with ML-based detection and structure preservation. This is the main entry point for compression. It: 1. Detects content type using Magika (ML) 2. Extracts structure using appropriate handler 3. Compresses non-structural content with Kompress 4. Optionally stores original in CCR for retrieval Usage: compressor = UniversalCompressor() result = compressor.compress(content) # Result contains: # - compressed: The compressed content # - compression_ratio: original_tokens / compressed_tokens # - content_type: Detected content type # - preservation_ratio: Fraction of content preserved as structure """ from __future__ import annotations import logging from collections.abc import Callable from dataclasses import dataclass, field from typing import Any from headroom.compression.detector import ( ContentType, DetectionResult, FallbackDetector, get_detector, ) from headroom.compression.handlers.base import ( NoOpHandler, StructureHandler, ) from headroom.compression.handlers.code_handler import CodeStructureHandler from headroom.compression.handlers.json_handler import JSONStructureHandler from headroom.compression.masks import ( StructureMask, compute_entropy_mask, mask_to_spans, ) logger = logging.getLogger(__name__) @dataclass class UniversalCompressorConfig: """Configuration for UniversalCompressor. Attributes: use_magika: Use ML-based detection (requires magika package). use_kompress: Use Kompress for content compression. use_entropy_preservation: Preserve high-entropy tokens (UUIDs, etc.). entropy_threshold: Threshold for entropy-based preservation. min_content_length: Minimum content length to compress. compression_ratio_target: Target compression ratio (0.0-1.0). ccr_enabled: Store originals in CCR for retrieval. """ use_magika: bool = True use_kompress: bool = True use_entropy_preservation: bool = True entropy_threshold: float = 0.85 min_content_length: int = 100 compression_ratio_target: float = 0.3 # Target 70% reduction ccr_enabled: bool = True @dataclass class CompressionResult: """Result from compression. Attributes: compressed: The compressed content. original: The original content (for reference). compression_ratio: compressed_length / original_length. tokens_before: Estimated token count before compression. tokens_after: Estimated token count after compression. content_type: Detected content type. detection_confidence: Confidence of content type detection. handler_used: Name of structure handler used. preservation_ratio: Fraction of content marked as structural. ccr_key: CCR storage key (if CCR enabled). metadata: Additional metadata. """ compressed: str original: str compression_ratio: float tokens_before: int tokens_after: int content_type: ContentType detection_confidence: float handler_used: str preservation_ratio: float ccr_key: str | None = None metadata: dict = field(default_factory=dict) @property def tokens_saved(self) -> int: """Number of tokens saved.""" return max(0, self.tokens_before - self.tokens_after) @property def savings_percentage(self) -> float: """Percentage of tokens saved.""" if self.tokens_before == 0: return 0.0 return (self.tokens_saved / self.tokens_before) * 100 class UniversalCompressor: """Universal compressor with ML detection and structure preservation. This compressor automatically: 1. Detects content type (JSON, code, logs, text) using ML 2. Extracts structure (keys, signatures, templates) 3. Preserves structure while compressing content 4. Stores original for CCR retrieval Example: >>> compressor = UniversalCompressor() >>> result = compressor.compress('{"users": [{"id": 1, "name": "Alice"}]}') >>> print(result.content_type) # ContentType.JSON >>> print(result.compressed) # Structure preserved, values compressed """ def __init__( self, config: UniversalCompressorConfig | None = None, handlers: dict[ContentType, StructureHandler] | None = None, compress_fn: Callable[[str], str] | None = None, ): """Initialize the compressor. Args: config: Compression configuration. handlers: Custom handlers for content types. compress_fn: Custom compression function. If None, uses Kompress when available, else simple truncation. """ self.config = config or UniversalCompressorConfig() # Initialize detector if self.config.use_magika: self._detector = get_detector(prefer_magika=True) else: self._detector = FallbackDetector() # Initialize handlers self._handlers: dict[ContentType, StructureHandler] = handlers or { ContentType.JSON: JSONStructureHandler(), ContentType.CODE: CodeStructureHandler(), } self._noop_handler = NoOpHandler() # Initialize compression function self._compress_fn = compress_fn or self._get_default_compress_fn() # CCR store (lazy initialized) self._ccr_store: Any | None = None def _get_default_compress_fn(self) -> Callable[[str], str]: """Get default compression function. Returns Kompress wrapper if available, else simple truncation. """ if self.config.use_kompress: try: return self._kompress_compress except ImportError: logger.info("Kompress not available, using simple compression") return self._simple_compress def _kompress_compress(self, text: str) -> str: """Compress using Kompress. Args: text: Text to compress. Returns: Compressed text. """ try: from headroom.transforms.kompress_compressor import KompressCompressor compressor = KompressCompressor() result = compressor.compress(text) return result.compressed except ImportError: return self._simple_compress(text) except Exception as e: logger.warning("Kompress compression failed: %s", e) return self._simple_compress(text) def _simple_compress(self, text: str) -> str: """Simple compression fallback (truncation with indicator). Args: text: Text to compress. Returns: Truncated text with indicator. """ target_len = int(len(text) * self.config.compression_ratio_target) if len(text) <= target_len: return text # Keep first and last portions keep_start = target_len * 2 // 3 keep_end = target_len // 3 return text[:keep_start] + "\n...[compressed]...\n" + text[-keep_end:] def compress( self, content: str, content_type: ContentType | None = None, **kwargs: Any, ) -> CompressionResult: """Compress content with structure preservation. Args: content: Content to compress. content_type: Override content type detection. **kwargs: Handler-specific options. Returns: CompressionResult with compressed content and metadata. """ # Handle empty/short content if not content or len(content) < self.config.min_content_length: return CompressionResult( compressed=content, original=content, compression_ratio=1.0, tokens_before=self._estimate_tokens(content), tokens_after=self._estimate_tokens(content), content_type=ContentType.UNKNOWN, detection_confidence=0.0, handler_used="none", preservation_ratio=1.0, metadata={"skipped": "content too short"}, ) # Detect content type if content_type is None: detection = self._detector.detect(content) else: detection = DetectionResult( content_type=content_type, confidence=1.0, raw_label="override", ) # Get handler for content type handler = self._handlers.get(detection.content_type, self._noop_handler) # Tokenize content (character-level for masks) tokens = list(content) # Get structure mask from handler handler_result = handler.get_mask(content, tokens, **kwargs) structure_mask = handler_result.mask # Optionally add entropy-based preservation if self.config.use_entropy_preservation: entropy_mask = compute_entropy_mask( tokens, threshold=self.config.entropy_threshold, ) # Union: preserve if either mask says preserve structure_mask = structure_mask.union(entropy_mask) # Apply compression to non-structural parts compressed = self._compress_with_mask(content, structure_mask) # Estimate tokens tokens_before = self._estimate_tokens(content) tokens_after = self._estimate_tokens(compressed) # Store in CCR if enabled ccr_key = None if self.config.ccr_enabled: ccr_key = self._store_in_ccr(content, compressed) return CompressionResult( compressed=compressed, original=content, compression_ratio=len(compressed) / len(content) if content else 1.0, tokens_before=tokens_before, tokens_after=tokens_after, content_type=detection.content_type, detection_confidence=detection.confidence, handler_used=handler_result.handler_name, preservation_ratio=structure_mask.preservation_ratio, ccr_key=ccr_key, metadata={ "detection": { "raw_label": detection.raw_label, "language": detection.language, }, "handler": handler_result.metadata, }, ) def _compress_with_mask(self, content: str, mask: StructureMask) -> str: """Apply compression respecting structure mask. Args: content: Original content. mask: Structure mask. Returns: Compressed content with structure preserved. """ spans = mask_to_spans(mask) result_parts: list[str] = [] for span in spans: span_content = content[span.start : span.end] if span.is_structural: # Preserve structural content result_parts.append(span_content) else: # Compress non-structural content if len(span_content) > 50: # Only compress if substantial compressed = self._compress_fn(span_content) result_parts.append(compressed) else: result_parts.append(span_content) return "".join(result_parts) def _estimate_tokens(self, text: str) -> int: """Estimate token count. Uses simple heuristic: ~4 characters per token. Args: text: Text to estimate. Returns: Estimated token count. """ if not text: return 0 # Simple estimation: ~4 chars per token on average return len(text) // 4 def _store_in_ccr(self, original: str, compressed: str) -> str | None: """Store original in CCR for retrieval. Args: original: Original content. compressed: Compressed content. Returns: CCR key if stored, None otherwise. """ try: if self._ccr_store is None: from headroom.cache.compression_store import CompressionStore self._ccr_store = CompressionStore() key = self._ccr_store.store( original, compressed, original_tokens=self._estimate_tokens(original), compressed_tokens=self._estimate_tokens(compressed), ) return str(key) if key else None except ImportError: logger.debug("CCR store not available") return None except Exception as e: logger.warning("Failed to store in CCR: %s", e) return None def compress_batch( self, contents: list[str], **kwargs: Any, ) -> list[CompressionResult]: """Compress multiple contents. More efficient than calling compress() in a loop for ML detection. Args: contents: List of contents to compress. **kwargs: Handler-specific options. Returns: List of CompressionResults. """ if not contents: return [] # Batch detection if hasattr(self._detector, "detect_batch"): detections = self._detector.detect_batch(contents) else: detections = [self._detector.detect(c) for c in contents] # Compress each with detected type results = [] for content, detection in zip(contents, detections): result = self.compress( content, content_type=detection.content_type, **kwargs, ) results.append(result) return results def get_handler(self, content_type: ContentType) -> StructureHandler: """Get handler for content type. Args: content_type: Content type. Returns: Handler for the content type. """ return self._handlers.get(content_type, self._noop_handler) def register_handler( self, content_type: ContentType, handler: StructureHandler, ) -> None: """Register a custom handler for a content type. Args: content_type: Content type to handle. handler: Handler instance. """ self._handlers[content_type] = handler def compress(content: str, **kwargs: Any) -> CompressionResult: """Convenience function for one-off compression. Args: content: Content to compress. **kwargs: Passed to UniversalCompressor.compress(). Returns: CompressionResult. Example: >>> from headroom.compression import compress >>> result = compress('{"users": [{"id": 1}, {"id": 2}]}') >>> print(result.compressed) """ compressor = UniversalCompressor() return compressor.compress(content, **kwargs) # ── compression/handlers/__init__.py ── """Structure handlers for different content types. Each handler knows how to extract structural information from a specific content type and create a StructureMask marking what should be preserved. Handlers don't compress - they only identify structure. The actual compression is done by Kompress on the non-structural parts. """ from headroom.compression.handlers.base import ( HandlerResult, StructureHandler, ) from headroom.compression.handlers.code_handler import CodeStructureHandler from headroom.compression.handlers.json_handler import JSONStructureHandler __all__ = [ "StructureHandler", "HandlerResult", "JSONStructureHandler", "CodeStructureHandler", ] # ── compression/handlers/base.py ── """Base class and protocol for structure handlers. Structure handlers extract structural information from content and create masks identifying what should be preserved during compression. The handler protocol is simple: 1. get_mask(content) -> StructureMask 2. can_handle(content) -> bool (optional) Handlers are content-type specific but domain-agnostic. A JSONStructureHandler preserves JSON keys whether it's user data, search results, or config files. """ from __future__ import annotations from abc import ABC, abstractmethod from dataclasses import dataclass, field from typing import Any, Protocol, runtime_checkable from headroom.compression.masks import StructureMask @dataclass class HandlerResult: """Result from a structure handler. Contains the mask plus metadata about what was detected. """ mask: StructureMask handler_name: str confidence: float = 1.0 # How confident the handler is in its detection metadata: dict = field(default_factory=dict) @property def preservation_ratio(self) -> float: """Fraction of content marked for preservation.""" return self.mask.preservation_ratio @runtime_checkable class StructureHandler(Protocol): """Protocol for structure handlers. Any class implementing get_mask() can be used as a handler. """ @property def name(self) -> str: """Handler name for logging and metadata.""" ... def get_mask( self, content: str, tokens: list[str] | None = None, **kwargs: Any, ) -> HandlerResult: """Extract structure mask from content. Args: content: The content to analyze. tokens: Pre-tokenized content (optional). If not provided, handler should tokenize internally. **kwargs: Handler-specific options. Returns: HandlerResult with mask and metadata. """ ... def can_handle(self, content: str) -> bool: """Check if this handler can process the content. Default implementation returns True. Override for handlers that need to verify content format before processing. Args: content: The content to check. Returns: True if handler can process this content. """ ... class BaseStructureHandler(ABC): """Base implementation for structure handlers. Provides common functionality and enforces the handler interface. Subclasses must implement _extract_mask(). """ def __init__(self, name: str | None = None): """Initialize the handler. Args: name: Optional handler name. Defaults to class name. """ self._name = name or self.__class__.__name__ @property def name(self) -> str: """Handler name.""" return self._name def get_mask( self, content: str, tokens: list[str] | None = None, **kwargs: Any, ) -> HandlerResult: """Extract structure mask from content. This is the main entry point. It handles common logic like empty content and delegates to _extract_mask() for the content-specific logic. Args: content: The content to analyze. tokens: Pre-tokenized content (optional). **kwargs: Handler-specific options. Returns: HandlerResult with mask and metadata. """ # Handle empty content if not content or not content.strip(): tokens = tokens or [] return HandlerResult( mask=StructureMask.empty(tokens), handler_name=self.name, confidence=0.0, metadata={"empty": True}, ) # Tokenize if not provided if tokens is None: tokens = self._tokenize(content) # Delegate to subclass return self._extract_mask(content, tokens, **kwargs) def can_handle(self, content: str) -> bool: """Check if this handler can process the content. Default implementation returns True. Override for handlers that need to verify content format. Args: content: The content to check. Returns: True if handler can process this content. """ return True @abstractmethod def _extract_mask( self, content: str, tokens: list[str], **kwargs: Any, ) -> HandlerResult: """Extract structure mask from content. Subclasses implement this to provide content-specific logic. Args: content: The content to analyze (non-empty, stripped). tokens: Tokenized content. **kwargs: Handler-specific options. Returns: HandlerResult with mask and metadata. """ ... def _tokenize(self, content: str) -> list[str]: """Default tokenization - character-level. Subclasses may override for more sophisticated tokenization. For mask purposes, character-level is often sufficient and aligns well with token-level compression. Args: content: Content to tokenize. Returns: List of tokens (characters by default). """ # Simple character-level tokenization # This aligns well with structure detection (we mark ranges) return list(content) class NoOpHandler(BaseStructureHandler): """Handler that marks everything as compressible. Used as a fallback when no structure is detected. """ def __init__(self) -> None: """Initialize the no-op handler.""" super().__init__(name="noop") def _extract_mask( self, content: str, tokens: list[str], **kwargs: Any, ) -> HandlerResult: """Return mask with everything compressible.""" return HandlerResult( mask=StructureMask.empty(tokens), handler_name=self.name, confidence=1.0, metadata={"reason": "no structure detected"}, ) # ── compression/handlers/code_handler.py ── """Code structure handler using AST parsing. Extracts structural elements from source code: - Import statements - Function/method signatures - Class definitions - Type annotations - Decorators Function bodies are marked as compressible while preserving signatures. This enables the LLM to see all available functions/methods while body implementations are compressed. Uses tree-sitter for parsing when available, falls back to regex patterns. """ from __future__ import annotations import logging import re import threading from dataclasses import dataclass from enum import Enum from typing import Any from headroom.compression.handlers.base import BaseStructureHandler, HandlerResult from headroom.compression.masks import StructureMask logger = logging.getLogger(__name__) # Lazy-loaded tree-sitter _tree_sitter_available: bool | None = None _tree_sitter_parsers: dict[str, Any] = {} _tree_sitter_lock = threading.Lock() def _check_tree_sitter() -> bool: """Check if tree-sitter is available.""" global _tree_sitter_available if _tree_sitter_available is None: try: import tree_sitter_language_pack # noqa: F401 _tree_sitter_available = True except ImportError: _tree_sitter_available = False return _tree_sitter_available def _get_parser(language: str) -> Any: """Get tree-sitter parser for language.""" global _tree_sitter_parsers if not _check_tree_sitter(): raise ImportError("tree-sitter-language-pack not installed") with _tree_sitter_lock: if language not in _tree_sitter_parsers: from tree_sitter_language_pack import get_parser _tree_sitter_parsers[language] = get_parser(language) # type: ignore[arg-type] return _tree_sitter_parsers[language] class CodeLanguage(Enum): """Supported programming languages.""" PYTHON = "python" JAVASCRIPT = "javascript" TYPESCRIPT = "typescript" GO = "go" RUST = "rust" JAVA = "java" C = "c" CPP = "cpp" @dataclass class CodeSpan: """A span of code with its structural role.""" start: int end: int role: str # "import", "signature", "body", "decorator", etc. is_structural: bool # Language-specific AST node types that are structural _STRUCTURAL_NODE_TYPES: dict[str, set[str]] = { "python": { "import_statement", "import_from_statement", "function_definition", # Just the signature part "class_definition", "decorated_definition", "type_alias_statement", }, "javascript": { "import_statement", "export_statement", "function_declaration", "class_declaration", "method_definition", "arrow_function", # Signature only }, "typescript": { "import_statement", "export_statement", "function_declaration", "class_declaration", "method_definition", "interface_declaration", "type_alias_declaration", }, "go": { "import_declaration", "function_declaration", "method_declaration", "type_declaration", "interface_type", }, "rust": { "use_declaration", "function_item", "impl_item", "struct_item", "enum_item", "trait_item", }, "java": { "import_declaration", "class_declaration", "method_declaration", "interface_declaration", "annotation", }, } # Regex patterns for fallback detection _SIGNATURE_PATTERNS: dict[str, list[re.Pattern[str]]] = { "python": [ re.compile(r"^\s*(async\s+)?def\s+\w+\s*\([^)]*\)\s*(->\s*[^:]+)?:", re.MULTILINE), re.compile(r"^\s*class\s+\w+(\([^)]*\))?:", re.MULTILINE), re.compile(r"^\s*@\w+(\([^)]*\))?\s*$", re.MULTILINE), ], "javascript": [ re.compile(r"^\s*(async\s+)?function\s+\w+\s*\([^)]*\)", re.MULTILINE), re.compile(r"^\s*class\s+\w+(\s+extends\s+\w+)?", re.MULTILINE), re.compile(r"^\s*(const|let|var)\s+\w+\s*=\s*(async\s+)?\([^)]*\)\s*=>", re.MULTILINE), ], "typescript": [ re.compile(r"^\s*(async\s+)?function\s+\w+\s*(<[^>]+>)?\s*\([^)]*\)", re.MULTILINE), re.compile(r"^\s*class\s+\w+(<[^>]+>)?(\s+extends\s+\w+)?", re.MULTILINE), re.compile(r"^\s*interface\s+\w+(<[^>]+>)?", re.MULTILINE), re.compile(r"^\s*type\s+\w+(<[^>]+>)?\s*=", re.MULTILINE), ], "go": [ re.compile(r"^\s*func\s+(\([^)]+\)\s+)?\w+\s*\([^)]*\)", re.MULTILINE), re.compile(r"^\s*type\s+\w+\s+(struct|interface)", re.MULTILINE), ], "rust": [ re.compile(r"^\s*(pub\s+)?(async\s+)?fn\s+\w+\s*(<[^>]+>)?\s*\([^)]*\)", re.MULTILINE), re.compile(r"^\s*(pub\s+)?struct\s+\w+", re.MULTILINE), re.compile(r"^\s*(pub\s+)?enum\s+\w+", re.MULTILINE), re.compile(r"^\s*(pub\s+)?trait\s+\w+", re.MULTILINE), re.compile(r"^\s*impl(<[^>]+>)?\s+\w+", re.MULTILINE), ], "java": [ re.compile( r"^\s*(public|private|protected)?\s*(static\s+)?\w+\s+\w+\s*\([^)]*\)", re.MULTILINE ), re.compile(r"^\s*(public\s+)?(class|interface|enum)\s+\w+", re.MULTILINE), re.compile(r"^\s*@\w+(\([^)]*\))?\s*$", re.MULTILINE), ], } # Import patterns for fallback _IMPORT_PATTERNS: dict[str, re.Pattern[str]] = { "python": re.compile(r"^\s*(import\s+\w+|from\s+\w+\s+import)", re.MULTILINE), "javascript": re.compile(r"^\s*(import\s+.*from|require\s*\()", re.MULTILINE), "typescript": re.compile(r"^\s*(import\s+.*from|require\s*\()", re.MULTILINE), "go": re.compile(r'^\s*import\s+(\(|")', re.MULTILINE), "rust": re.compile(r"^\s*use\s+\w+", re.MULTILINE), "java": re.compile(r"^\s*import\s+[\w.]+;", re.MULTILINE), } class CodeStructureHandler(BaseStructureHandler): """Handler for source code. Preserves: - Import/use statements - Function/method signatures (not bodies) - Class/struct/interface definitions - Type declarations - Decorators/annotations Marks as compressible: - Function/method bodies - Comments (optionally preserved) - Whitespace Example: >>> handler = CodeStructureHandler() >>> code = ''' ... def hello(name: str) -> str: ... message = f"Hello, {name}!" ... return message ... ''' >>> result = handler.get_mask(code, language="python") >>> # Signature "def hello(name: str) -> str:" preserved >>> # Body content compressed """ def __init__( self, preserve_comments: bool = False, use_tree_sitter: bool = True, default_language: str = "python", ): """Initialize the code handler. Args: preserve_comments: Whether to preserve comments as structural. use_tree_sitter: Whether to use tree-sitter for parsing. Falls back to regex if False or unavailable. default_language: Default language when detection fails. """ super().__init__(name="code") self.preserve_comments = preserve_comments self.use_tree_sitter = use_tree_sitter self.default_language = default_language def can_handle(self, content: str) -> bool: """Check if content looks like source code.""" # Quick heuristic checks code_indicators = [ "def ", "class ", "function ", "import ", "const ", "let ", "var ", "func ", "fn ", "pub ", "package ", "struct ", "interface ", ] return any(indicator in content for indicator in code_indicators) def _extract_mask( self, content: str, tokens: list[str], language: str | None = None, **kwargs: Any, ) -> HandlerResult: """Extract structure mask from code. Args: content: Source code content. tokens: Character-level tokens. language: Programming language (auto-detected if None). **kwargs: Additional options. Returns: HandlerResult with mask marking structural elements. """ # Detect language if not provided if language is None: language = self._detect_language(content) # Try tree-sitter first if self.use_tree_sitter and _check_tree_sitter(): try: return self._extract_with_tree_sitter(content, tokens, language) except Exception as e: logger.debug("Tree-sitter parsing failed, using fallback: %s", e) # Fallback to regex return self._extract_with_regex(content, tokens, language) def _extract_with_tree_sitter( self, content: str, tokens: list[str], language: str, ) -> HandlerResult: """Extract structure using tree-sitter AST. Args: content: Source code. tokens: Character tokens. language: Language name. Returns: HandlerResult with mask. """ parser = _get_parser(language) tree = parser.parse(content.encode("utf-8")) # Collect structural spans spans: list[CodeSpan] = [] def visit_node(node: Any, depth: int = 0) -> None: """Visit AST node and collect structural spans.""" node_type = node.type structural_types = _STRUCTURAL_NODE_TYPES.get(language, set()) # Check if this is a structural node type if node_type in structural_types: # For functions, only the signature is structural if "function" in node_type or "method" in node_type: # Find the body node and exclude it body_node = None for child in node.children: if child.type in ("block", "statement_block", "compound_statement"): body_node = child break if body_node: # Signature is from start to body start spans.append( CodeSpan( start=node.start_byte, end=body_node.start_byte, role="signature", is_structural=True, ) ) # Body is compressible spans.append( CodeSpan( start=body_node.start_byte, end=body_node.end_byte, role="body", is_structural=False, ) ) else: # No body found, preserve whole thing spans.append( CodeSpan( start=node.start_byte, end=node.end_byte, role=node_type, is_structural=True, ) ) else: # Non-function structural nodes spans.append( CodeSpan( start=node.start_byte, end=node.end_byte, role=node_type, is_structural=True, ) ) elif node_type == "comment" and self.preserve_comments: spans.append( CodeSpan( start=node.start_byte, end=node.end_byte, role="comment", is_structural=True, ) ) # Recurse into children for child in node.children: visit_node(child, depth + 1) visit_node(tree.root_node) # Build mask from spans mask = self._spans_to_mask(spans, len(content)) return HandlerResult( mask=StructureMask(tokens=tokens, mask=mask), handler_name=self.name, confidence=0.95, metadata={ "language": language, "parser": "tree-sitter", "structural_spans": len([s for s in spans if s.is_structural]), }, ) def _extract_with_regex( self, content: str, tokens: list[str], language: str, ) -> HandlerResult: """Extract structure using regex patterns (fallback). Args: content: Source code. tokens: Character tokens. language: Language name. Returns: HandlerResult with mask. """ spans: list[CodeSpan] = [] # Match imports import_pattern = _IMPORT_PATTERNS.get(language) if import_pattern: for match in import_pattern.finditer(content): # Find end of import line end = content.find("\n", match.end()) if end == -1: end = len(content) spans.append( CodeSpan( start=match.start(), end=end, role="import", is_structural=True, ) ) # Match signatures signature_patterns = _SIGNATURE_PATTERNS.get(language, []) for pattern in signature_patterns: for match in pattern.finditer(content): spans.append( CodeSpan( start=match.start(), end=match.end(), role="signature", is_structural=True, ) ) # Build mask from spans mask = self._spans_to_mask(spans, len(content)) return HandlerResult( mask=StructureMask(tokens=tokens, mask=mask), handler_name=self.name, confidence=0.7, # Lower confidence for regex metadata={ "language": language, "parser": "regex", "structural_spans": len(spans), }, ) def _spans_to_mask(self, spans: list[CodeSpan], length: int) -> list[bool]: """Convert spans to character-level mask. Args: spans: List of code spans. length: Total content length. Returns: Boolean mask aligned to characters. """ mask = [False] * length for span in spans: if span.is_structural: for i in range(span.start, min(span.end, length)): mask[i] = True return mask def _detect_language(self, content: str) -> str: """Detect programming language from content. Args: content: Source code content. Returns: Language name (lowercase). """ # Check for language-specific markers markers = { "python": ["def ", "import ", "from ", "class ", "async def"], "javascript": ["function ", "const ", "let ", "var ", "=>"], "typescript": ["interface ", "type ", ": string", ": number"], "go": ["func ", "package ", "import (", "type "], "rust": ["fn ", "let mut", "impl ", "pub fn", "use "], "java": ["public class", "private ", "protected ", "void "], } scores: dict[str, int] = {} for lang, patterns in markers.items(): scores[lang] = sum(1 for p in patterns if p in content) if not scores or max(scores.values()) == 0: return self.default_language return max(scores, key=lambda k: scores[k]) def is_tree_sitter_available() -> bool: """Check if tree-sitter is available.""" return _check_tree_sitter() # ── compression/handlers/json_handler.py ── """JSON structure handler. Extracts structural elements from JSON content: - Keys (navigational - tells LLM what fields exist) - Brackets and colons (structural syntax) - Short values like booleans, nulls, small numbers Values (strings, long numbers, nested content) are marked as compressible. This enables the LLM to see the full schema while values are compressed. """ from __future__ import annotations import json from dataclasses import dataclass from enum import Enum from typing import Any, cast from headroom.compression.handlers.base import BaseStructureHandler, HandlerResult from headroom.compression.masks import EntropyScore, StructureMask class JSONTokenType(Enum): """Types of JSON tokens for structure detection.""" KEY = "key" # Object key (always structural) STRING_VALUE = "string_value" # String value (compressible) NUMBER = "number" # Numeric value (preserve if short) BOOLEAN = "boolean" # true/false (always structural) NULL = "null" # null (always structural) BRACKET = "bracket" # {, }, [, ] (always structural) COLON = "colon" # : (always structural) COMMA = "comma" # , (always structural) WHITESPACE = "whitespace" # spaces, newlines (compressible) @dataclass class JSONToken: """A token in JSON content with its type and position.""" text: str token_type: JSONTokenType start: int end: int @property def is_structural(self) -> bool: """Whether this token should be preserved.""" return self.token_type in ( JSONTokenType.KEY, JSONTokenType.BOOLEAN, JSONTokenType.NULL, JSONTokenType.BRACKET, JSONTokenType.COLON, JSONTokenType.COMMA, ) class JSONStructureHandler(BaseStructureHandler): """Handler for JSON content. Preserves: - All keys (navigational - LLM sees what fields exist) - Structural syntax ({, }, [, ], :, ,) - Booleans and nulls (small, semantically important) - High-entropy strings (UUIDs, hashes - identifiers) - Short numbers (often IDs or important values) Compresses: - Long string values (descriptions, content) - Whitespace - Redundant array elements (after first few) Example: >>> handler = JSONStructureHandler() >>> result = handler.get_mask('{"name": "Alice", "id": "usr_123"}') >>> # Keys "name" and "id" preserved, values may be compressed """ def __init__( self, preserve_short_values: bool = True, short_value_threshold: int = 20, preserve_high_entropy: bool = True, entropy_threshold: float = 0.85, max_array_items_full: int = 3, # Keep first N items fully max_number_digits: int = 10, # Preserve numbers up to N digits ): """Initialize the JSON handler. Args: preserve_short_values: Preserve short string values. short_value_threshold: Max length for "short" values. preserve_high_entropy: Preserve high-entropy strings (UUIDs, etc.). entropy_threshold: Entropy threshold for preservation. max_array_items_full: Number of array items to keep in full. max_number_digits: Max digits for numbers to preserve (often IDs). """ super().__init__(name="json") self.preserve_short_values = preserve_short_values self.short_value_threshold = short_value_threshold self.preserve_high_entropy = preserve_high_entropy self.entropy_threshold = entropy_threshold self.max_array_items_full = max_array_items_full self.max_number_digits = max_number_digits def can_handle(self, content: str) -> bool: """Check if content is valid JSON.""" stripped = content.strip() if not stripped.startswith(("{", "[")): return False try: json.loads(stripped) return True except (json.JSONDecodeError, ValueError): return False def _extract_mask( self, content: str, tokens: list[str], **kwargs: Any, ) -> HandlerResult: """Extract structure mask from JSON content. Args: content: JSON content. tokens: Character-level tokens. **kwargs: Additional options. Returns: HandlerResult with mask marking structural elements. """ # Tokenize JSON to identify structure json_tokens = self._tokenize_json(content) # Build character-level mask mask = [False] * len(content) # Track array depth for selective preservation array_depth = 0 array_item_counts: dict[int, int] = {} # depth -> count for token in json_tokens: # Track array items if token.token_type == JSONTokenType.BRACKET: if token.text == "[": array_depth += 1 array_item_counts[array_depth] = 0 elif token.text == "]": if array_depth in array_item_counts: del array_item_counts[array_depth] array_depth = max(0, array_depth - 1) # Count array items at commas if token.token_type == JSONTokenType.COMMA and array_depth > 0: array_item_counts[array_depth] = array_item_counts.get(array_depth, 0) + 1 # Determine if this token should be preserved preserve = self._should_preserve_token( token, array_depth, array_item_counts.get(array_depth, 0), ) # Mark in mask if preserve: for i in range(token.start, min(token.end, len(mask))): mask[i] = True # Convert to character tokens if needed char_tokens = list(content) if tokens == list(content) else tokens return HandlerResult( mask=StructureMask(tokens=char_tokens, mask=mask), handler_name=self.name, confidence=1.0, metadata={ "token_count": len(json_tokens), "key_count": sum(1 for t in json_tokens if t.token_type == JSONTokenType.KEY), }, ) def _should_preserve_token( self, token: JSONToken, array_depth: int, array_item_index: int, ) -> bool: """Determine if a token should be preserved. Args: token: The JSON token. array_depth: Current array nesting depth. array_item_index: Index of current item in array. Returns: True if token should be preserved. """ # Always preserve structural tokens if token.is_structural: return True # Whitespace is never preserved if token.token_type == JSONTokenType.WHITESPACE: return False # Numbers: preserve short ones (often IDs) if token.token_type == JSONTokenType.NUMBER: return len(token.text) <= self.max_number_digits # String values: selective preservation if token.token_type == JSONTokenType.STRING_VALUE: # Check if we're past the max array items threshold if array_depth > 0 and array_item_index >= self.max_array_items_full: # In deep array, be more aggressive return False # Preserve short values if self.preserve_short_values and len(token.text) <= self.short_value_threshold: return True # Preserve high-entropy values (UUIDs, hashes) if self.preserve_high_entropy: # Strip quotes for entropy calculation value = token.text.strip('"') score = EntropyScore.compute(value, self.entropy_threshold) if score.should_preserve: return True return False return False def _tokenize_json(self, content: str) -> list[JSONToken]: """Tokenize JSON content into typed tokens. This is a simple tokenizer that identifies JSON structure. It's not a full parser - just enough to identify keys vs values. Args: content: JSON content. Returns: List of JSONToken objects. """ tokens: list[JSONToken] = [] i = 0 n = len(content) # Track if we're expecting a key (after { or ,) expect_key = False brace_stack: list[str] = [] while i < n: char = content[i] # Whitespace if char in " \t\n\r": start = i while i < n and content[i] in " \t\n\r": i += 1 tokens.append(JSONToken(content[start:i], JSONTokenType.WHITESPACE, start, i)) continue # Brackets if char in "{}[]": tokens.append(JSONToken(char, JSONTokenType.BRACKET, i, i + 1)) if char == "{": brace_stack.append("{") expect_key = True elif char == "}": if brace_stack and brace_stack[-1] == "{": brace_stack.pop() expect_key = False elif char == "[": brace_stack.append("[") expect_key = False elif char == "]": if brace_stack and brace_stack[-1] == "[": brace_stack.pop() i += 1 continue # Colon if char == ":": tokens.append(JSONToken(char, JSONTokenType.COLON, i, i + 1)) expect_key = False i += 1 continue # Comma if char == ",": tokens.append(JSONToken(char, JSONTokenType.COMMA, i, i + 1)) # After comma in object, expect key if brace_stack and brace_stack[-1] == "{": expect_key = True i += 1 continue # String (key or value) if char == '"': start = i i += 1 while i < n and content[i] != '"': if content[i] == "\\": i += 2 # Skip escaped character else: i += 1 i += 1 # Include closing quote text = content[start:i] # Determine if this is a key or value # Look ahead for colon (skipping whitespace) j = i while j < n and content[j] in " \t\n\r": j += 1 is_key = j < n and content[j] == ":" and expect_key if is_key: tokens.append(JSONToken(text, JSONTokenType.KEY, start, i)) expect_key = False else: tokens.append(JSONToken(text, JSONTokenType.STRING_VALUE, start, i)) continue # Number if char in "-0123456789": start = i # Match JSON number pattern if char == "-": i += 1 while i < n and content[i] in "0123456789": i += 1 if i < n and content[i] == ".": i += 1 while i < n and content[i] in "0123456789": i += 1 if i < n and content[i] in "eE": i += 1 if i < n and content[i] in "+-": i += 1 while i < n and content[i] in "0123456789": i += 1 tokens.append(JSONToken(content[start:i], JSONTokenType.NUMBER, start, i)) continue # Boolean or null if content[i : i + 4] == "true": tokens.append(JSONToken("true", JSONTokenType.BOOLEAN, i, i + 4)) i += 4 continue if content[i : i + 5] == "false": tokens.append(JSONToken("false", JSONTokenType.BOOLEAN, i, i + 5)) i += 5 continue if content[i : i + 4] == "null": tokens.append(JSONToken("null", JSONTokenType.NULL, i, i + 4)) i += 4 continue # Unknown character - skip i += 1 return tokens def extract_json_schema(content: str) -> dict[str, Any] | list[Any]: """Extract the schema (keys only) from JSON content. Useful for understanding the structure without the values. Args: content: JSON content. Returns: Schema dictionary with keys and types (no values). Example: >>> extract_json_schema('{"name": "Alice", "age": 30}') {'name': 'string', 'age': 'number'} """ def _extract(obj: Any) -> Any: if isinstance(obj, dict): return {k: _extract(v) for k, v in obj.items()} elif isinstance(obj, list): if obj: return [_extract(obj[0])] # Schema of first item return [] elif isinstance(obj, str): return "string" elif isinstance(obj, bool): return "boolean" elif isinstance(obj, int): return "integer" elif isinstance(obj, float): return "number" elif obj is None: return "null" else: return "unknown" try: parsed = json.loads(content) result = _extract(parsed) if isinstance(result, dict): return cast(dict[str, Any], result) elif isinstance(result, list): return cast(list[Any], result) return {} except (json.JSONDecodeError, ValueError): return {}