speedyk-005 / chunklet-py / 20155307936

12 Dec 2025 03:28AM UTC coverage: 81.694% (-0.06%) from 81.75%

Build 20155307936 · push · github · speedyk-005
feat(chunker): Refactor Line De-annotation and Fix Late-Binding Closure Bug

Lint says your code was sus, but now it's iconic. This commit performs critical maintenance and feature stabilization for the code chunker, primarily focusing on eliminating redundant de-annotation and resolving a latent closure bug.

### Bug Fixes

* **Line De-annotation Duplication:** Removed redundant string slicing logic from the chunker's internal processing. The line de-annotation logic was accidentally invoked twice: once during the initial regex substitution step, and again via a manual string slice later on. This duplication created ambiguity and was a potential vector for `ghost slicing`, where lines could be misinterpreted. De-annotation now relies **only** on the regex substitution, simplifying the control flow (see the sketch at the end of this section).

* **Late-Binding Closure Fix:** Fixed a classic Python late-binding closure bug in the code annotation loop. In the original construct `pattern.sub(lambda match: self._annotate_block(tag, match), code)`, the lambda closed over the loop variable `tag` and resolved it at call time, so it could end up using the final value of `tag` from the loop rather than the tag intended for its pattern.

    * **The Fix:** Changed to `pattern.sub(lambda match, tag=tag: self._annotate_block(tag, match), code)`. This uses the default argument trick to *capture the current value* of `tag` at definition time, ensuring each regex substitution uses the correct annotation tag.
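
The sketch below distills both fixes outside the chunker. It is an illustration only, not the project's actual code path: the default-argument trick freezes each `tag` at lambda-definition time, and de-annotation is shown as a single regex-driven step, per the description above.

```python
import re

def annotate_block(tag: str, match: re.Match) -> str:
    # Prefix every line of the matched block with its tag (same shape as _annotate_block).
    return "\n".join(f"(-- {tag} -->) {line}" for line in match.group(0).splitlines())

patterns_n_tags = [
    (re.compile(r"#[^\n]*"), "COMM"),
    (re.compile(r"@\w+"), "META"),
]

# Classic late-binding pitfall, distilled: closures built in a loop but invoked later.
buggy = [lambda m: annotate_block(tag, m) for _, tag in patterns_n_tags]
fixed = [lambda m, tag=tag: annotate_block(tag, m) for _, tag in patterns_n_tags]

m = re.match(r"#[^\n]*", "# a comment")
print(buggy[0](m))  # (-- META -->) # a comment  <- wrong: saw the loop's final tag
print(fixed[0](m))  # (-- COMM -->) # a comment  <- right: tag captured at definition time

# De-annotation happens once, via the regex, instead of an extra manual slice.
print(re.sub(r"\(-- [A-Z]+ -->\) ", "", fixed[0](m)))  # "# a comment"
```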

### Refactoring

* **Default Comments:** Set the `include_comments` parameter to `True` by default in the chunking utility. This aligns with most developer expectations for comprehensive code processing (see the usage sketch after this list).

* **Code Cleanup:** Minor internal refactoring and code style cleanup (e.g., using explicit variable names for clarity) to enhance readability and maintainability.
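
For context, here is a minimal usage sketch of the chunking utility with the new default. It is illustrative only: the import path is assumed from the source layout shown further down, and `example.py` is a hypothetical input file.

```python
# Import path assumed from src/chunklet/code_chunker/code_chunker.py;
# the package may also re-export CodeChunker at a higher level.
from chunklet.code_chunker.code_chunker import CodeChunker

chunker = CodeChunker(verbose=True)

# include_comments now defaults to True, so comments stay in the chunks
# unless explicitly disabled. "example.py" is a hypothetical path.
chunks = chunker.chunk(
    "example.py",
    max_lines=80,
    docstring_mode="summary",
    strict=False,
)

for chunk in chunks:
    print(chunk.metadata.start_line, chunk.metadata.end_line)
    print(chunk.content)
```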

7 of 10 new or added lines in 3 files covered. (70.0%)

1 existing line in 1 file now uncovered.

1080 of 1322 relevant lines covered (81.69%)

4.08 hits per line

Source File: /src/chunklet/code_chunker/code_chunker.py (88.71% covered)

1
"""
2
Author: Speedyk-005 | Copyright (c) 2025 | License: MIT
3

4
Language-Agnostic Code Chunking Utility
5

6
This module provides a robust, convention-aware engine for segmenting source code into
7
semantic units ("chunks") such as functions, classes, namespaces, and logical blocks.
8
Unlike purely heuristic or grammar-dependent parsers, the `CodeChunker` relies on
9
anchored, multi-language regex patterns and indentation rules to identify structures
10
consistently across a variety of programming languages.
11

12
Limitations
13
-----------
14
`CodeChunker` assumes syntactically conventional code. Highly obfuscated, minified,
15
or macro-generated sources may not fully respect its boundary patterns, though such
16
cases fall outside its intended domain.
17

18
Inspired by:
19
    - Camel.utils.chunker.CodeChunker (@ CAMEL-AI.org)
20
    - code-chunker by JimAiMoment
21
    - whats_that_code by matthewdeanmartin
22
    - CintraAI Code Chunker
23
"""
24

25
import sys
5✔
26
from pathlib import Path
5✔
27
from typing import Any, Literal, Callable, Generator, Annotated
5✔
28
from functools import partial
5✔
29
from itertools import chain, accumulate
5✔
30
from more_itertools import unique_everseen
5✔
31
import regex as re
5✔
32
from pydantic import Field
5✔
33
from collections import defaultdict, namedtuple
5✔
34
from box import Box
5✔
35

36
try:
5✔
37
    from littletree import Node
5✔
38
    import defusedxml.ElementTree as ET
5✔
39
except ImportError:
×
40
    Node = None
×
41
    ET = None
×
42

43
from loguru import logger
5✔
44

45
from chunklet.code_chunker.patterns import (
5✔
46
    SINGLE_LINE_COMMENT,
47
    MULTI_LINE_COMMENT,
48
    DOCSTRING_STYLE_ONE,
49
    DOCSTRING_STYLE_TWO,
50
    FUNCTION_DECLARATION,
51
    NAMESPACE_DECLARATION,
52
    METADATA,
53
    OPENER,
54
    CLOSURE,
55
)
56
from chunklet.code_chunker.helpers import is_binary_file, is_python_code
5✔
57
from chunklet.common.path_utils import is_path_like
5✔
58
from chunklet.common.batch_runner import run_in_batch
5✔
59
from chunklet.common.validation import validate_input, restricted_iterable
5✔
60
from chunklet.common.token_utils import count_tokens
5✔
61
from chunklet.exceptions import (
5✔
62
    InvalidInputError,
63
    FileProcessingError,
64
    MissingTokenCounterError,
65
    TokenLimitError,
66
)
67

68

69
CodeLine = namedtuple(
5✔
70
    "CodeLine", ["line_number", "content", "indent_level", "func_partial_signature"]
71
)
72

73

74
class CodeChunker:
5✔
75
    """
76
    Language-agnostic code chunking utility for semantic code segmentation.
77

78
    Extracts structural units (functions, classes, namespaces) from source code
79
    across multiple programming languages using pattern-based detection and
80
    token-aware segmentation.
81

82
    Key Features:
83
        - Cross-language support (Python, C/C++, Java, C#, JavaScript, Go, etc.)
84
        - Structural analysis with namespace hierarchy tracking
85
        - Configurable token limits with strict/lenient overflow handling
86
        - Flexible docstring and comment processing modes
87
        - Accurate line number preservation and source tracking
88
        - Parallel batch processing for multiple files
89
        - Comprehensive logging and progress tracking
90
    """
91

92
    @validate_input
5✔
93
    def __init__(
5✔
94
        self,
95
        verbose: bool = False,
96
        token_counter: Callable[[str], int] | None = None,
97
    ):
98
        """
99
        Initialize the CodeChunker with optional token counter and verbosity control.
100

101
        Args:
102
            verbose (bool): Enable verbose logging.
103
            token_counter (Callable[[str], int] | None): Function that counts tokens in text.
104
                If None, must be provided when calling chunk() methods.
105
        """
106
        self.token_counter = token_counter
5✔
107
        self.verbose = verbose
5✔
108

109
    def _replace_with_newlines(self, match: re.Match) -> str:
5✔
110
        """Replaces the matched content with an equivalent number of newlines."""
111
        matched_text = match.group(0)
5✔
112

113
        # To preserve the line count when replacing a multi-line block,
114
        # we need to replace N lines of content with N-1 newline characters.
115
        # This is because N-1 newlines create N empty lines in the context of the surrounding text.
116
        num_newlines = max(0, len(matched_text.splitlines()) - 1)
5✔
117

118
        return "\n" * num_newlines
5✔
119

120
    def _read_source(self, source: str | Path) -> str:
5✔
121
        """Retrieve source code from file or treat input as raw string.
122

123
        Args:
124
            source (str | Path): File path or raw code string.
125

126
        Returns:
127
            str: Source code content.
128

129
        Raises:
130
            FileProcessingError: When file cannot be read or doesn't exist.
131
        """
132
        if isinstance(source, Path) or is_path_like(source):
5✔
133
            path = Path(source)
5✔
134
            if not path.exists():
5✔
135
                raise FileProcessingError(f"File does not exist: {path}")
5✔
136
            if is_binary_file(path):
×
137
                raise FileProcessingError(f"Binary file not supported: {path}")
×
138
            try:
×
139
                with open(path, "r", encoding="utf-8", errors="replace") as f:
×
140
                    content = f.read()
×
141
                    if self.verbose:
×
142
                        logger.info(
×
143
                            "Successfully read %d characters from {}",
144
                            len(content),
145
                            path,
146
                        )
147
                    return content
×
148
            except Exception as e:
×
149
                raise FileProcessingError(f"Failed to read file: {path}") from e
×
150
        return source
5✔
151

152
    def _annotate_block(self, tag: str, match: re.Match) -> str:
5✔
153
        """Prefix each line in a matched block with a tag for tracking.
154

155
        Args:
156
            tag (str): Tag identifier for the block type.
157
            match (re.Match): Regex match object for the block.
158

159
        Returns:
160
            str: Annotated block with tag prefixes.
161
        """
162
        lines = match.group(0).splitlines()
5✔
163
        return "\n".join(f"(-- {tag} -->) {line}" for line in lines)
5✔
164

165
    def _summarize_docstring_style_one(self, match: re.Match) -> str:
5✔
166
        """
167
        Extracts the first line from a block-style documentation string.
168

169
        Args:
170
            match (re.Match): Regex match object for the docstring with captured groups.
171

172
        Returns:
173
            str: The summarized docstring line.
174
        """
175
        # HACK: The `DOCSTRING_STYLE_ONE` regex contains multiple alternative patterns,
176
        # which results in `None` values for the capturing groups that did not match.
177
        # This list comprehension filters out the `None` values to reliably extract
178
        # the matched content (indent, delimiters, and docstring text).
179
        groups = [g for g in match.groups() if g is not None]
5✔
180
        indent = groups[0]
5✔
181
        l_end = groups[1]
5✔
182
        doc = groups[2].strip()
5✔
183
        r_end = groups[3]
5✔
184

185
        first_line = ""
5✔
186
        for line in doc.splitlines():
5✔
187
            stripped_line = line.strip()
5✔
188
            if stripped_line:
5✔
189
                first_line = stripped_line
5✔
190
                break
5✔
191

192
        summarized_line_content = f"{indent}{l_end}{first_line}{r_end}".strip()
5✔
193
        padding_count = len(match.group(0).splitlines()) - 1
5✔
194
        return summarized_line_content + "\n" * padding_count
5✔
195

196
    def _summarize_docstring_style_two(self, match: re.Match) -> str:
5✔
197
        """
198
        Extracts a summary from line-prefixed documentation comments.
199

200
        Attempts to parse <summary> XML tags; falls back to the first meaningful line if parsing fails.
201

202
        Args:
203
            match (re.Match): Regex match object for line-based docstring.
204

205
        Returns:
206
            str: The summarized docstring line(s).
207
        """
208
        if not ET:
5✔
209
            raise ImportError(
×
210
                "The 'defusedxml' library is not installed. "
211
                "Please install it with 'pip install 'defusedxml>=0.7.1'' or install the code processing extras "
212
                "with 'pip install 'chunklet-py[code]''"
213
            )
214

215
        indent = match.group(1)
5✔
216
        raw_doc = match.group(0)
5✔
217
        prefix = re.match(r"^\s*(//[/!])\s*", raw_doc).group(1)
5✔
218

219
        # Remove leading '///' or '//!' and optional spaces at start of each line
220
        clean_doc = re.sub(rf"(?m)^\s*{prefix}\s*", "", raw_doc)
5✔
221
        try:
5✔
222
            # Try parsing it as XML
223
            wrapped = f"<root>{clean_doc}</root>"
5✔
224
            root = ET.fromstring(wrapped)
5✔
225
            summary_elem = root.find("summary")
5✔
226
            if summary_elem is not None:
5✔
227
                summary = ET.tostring(summary_elem, encoding="unicode").strip("\n")
5✔
228
            else:
229
                raise ET.ParseError
×
230
        except ET.ParseError:
×
231
            # Fallback: first meaningful line in plain text
232
            summary = ""
×
233
            for line in clean_doc.splitlines():
×
234
                # Skip lines that contain *only tags* (with optional whitespace)
235
                stripped_line = line.strip()
×
236
                if stripped_line and not re.fullmatch(r"<.*>\s*", stripped_line):
×
237
                    summary = stripped_line
×
238
                    break
×
239

240
        # Construct the summarized docstring line
241
        summarized_line_content = "".join(
5✔
242
            f"{indent}{prefix} {line}" for line in summary.splitlines() if line.strip()
243
        ).lstrip()
244

245
        padding_count = (
5✔
246
            len(raw_doc.splitlines()) - len(summarized_line_content.splitlines()) - 1
247
        )
248

249
        return summarized_line_content + "\n" * padding_count
5✔
250

251
    def _merge_tree(self, relations_list: list[list]) -> str:
5✔
252
        """
253
        Merges multiple sets of parent-child relation dictionaries into a single tree
254
        then returns its string representation.
255

256
        Args:
257
            relations_list (list[list]): A list containing relation lists.
258

259
        Returns:
260
            str: The string representation of the tree
261
        """
262
        if not relations_list:
5✔
263
            return "global"
×
264

265
        # Flatten the set of lists into a single iterable
266
        all_relations_flat = chain.from_iterable(relations_list)
5✔
267

268
        # Deduplicate relations
269
        def relation_key(relation: dict):
5✔
270
            return tuple(sorted(relation.items()))
5✔
271

272
        unique_relations = list(unique_everseen(all_relations_flat, key=relation_key))
5✔
273

274
        if not unique_relations:
5✔
275
            return "global"
5✔
276

277
        merged_tree = Node().from_relations(unique_relations, root="global")
5✔
278

279
        return merged_tree.to_string()
5✔
280

281
    def _preprocess(
5✔
282
        self, code: str, include_comments: bool, docstring_mode: str = "all"
283
    ) -> tuple[str, tuple[int, ...]]:
284
        """
285
        Preprocess the code before extraction.
286

287
        Processing steps:
288
          - Optionally remove comments
289
          - Replace docstrings according to mode
290
          - Annotate comments, docstrings, and annotations for later detection
291

292
        Args:
293
            code (str): Source code to preprocess.
294
            include_comments (bool): Whether to include comments in output.
295
            docstring_mode (str): How to handle docstrings.
296

297
        Returns:
298
            tuple[str, tuple[int, ...]]: Preprocessed code with annotations and a tuple of cumulative line lengths.
299
                The `cumulative_lengths` are pre-calculated on the original code because altering the code
300
                (e.g., via removal, summary, or annotations) would cause character counts to vary.
301
        """
302
        # Call at first before any code altering
303
        cumulative_lengths = tuple(
5✔
304
            accumulate(len(line) for line in code.splitlines(keepends=True))
305
        )
306

307
        # Remove comments if not required
308
        if not include_comments:
5✔
309
            code = SINGLE_LINE_COMMENT.sub(
5✔
310
                lambda m: self._replace_with_newlines(m), code
311
            )
312
            code = MULTI_LINE_COMMENT.sub(
5✔
313
                lambda m: self._replace_with_newlines(m), code
314
            )
315

316
        # Process docstrings according to mode
317
        if docstring_mode == "summary":
5✔
318
            code = DOCSTRING_STYLE_ONE.sub(
5✔
319
                lambda m: self._summarize_docstring_style_one(m), code
320
            )
321
            code = DOCSTRING_STYLE_TWO.sub(
5✔
322
                lambda m: self._summarize_docstring_style_two(m), code
323
            )
324
        elif docstring_mode == "excluded":
5✔
325
            code = DOCSTRING_STYLE_ONE.sub(
5✔
326
                lambda m: self._replace_with_newlines(m), code
327
            )
328
            code = DOCSTRING_STYLE_TWO.sub(
5✔
329
                lambda m: self._replace_with_newlines(m), code
330
            )
331
        # else "all": do nothing
332

333
        # List of all regex patterns with the tag to annotate them
334
        patterns_n_tags = [
5✔
335
            (SINGLE_LINE_COMMENT, "COMM"),
336
            (MULTI_LINE_COMMENT, "COMM"),
337
            (DOCSTRING_STYLE_ONE, "DOC"),
338
            (DOCSTRING_STYLE_TWO, "DOC"),
339
            (METADATA, "META"),
340
        ]
341

342
        # Apply _annotate_block to all matches for each pattern
343
        for pattern, tag in patterns_n_tags:
5✔
344
            code = pattern.sub(lambda match, tag=tag: self._annotate_block(tag, match), code)
5✔
345

346
        return code, cumulative_lengths
5✔
347

348
    def _post_processing(self, snippet_dicts: list[dict]):
5✔
349
        """
350
        Attach a namespace tree structure (as a list of relations) to each snippet incrementally.
351

352
        Args:
353
            snippet_dicts (list[dict]): List of extracted code snippets.
354

355
        Returns:
356
            list[dict]: Snippets with attached namespace trees (as relations).
357
        """
358
        if not Node:
5✔
359
            raise ImportError(
×
360
                "The 'littletree' library is not installed. "
361
                "Please install it with 'pip install littletree>=0.8.4' or install the code processing extras "
362
                "with 'pip install 'chunklet-py[code]''"
363
            )
364

365
        def _add_namespace_node(name, indent_level):
5✔
366
            new_node = Node(identifier=name)
5✔
367

368
            current_parent_node, _ = namespaces_stack[-1]
5✔
369
            current_parent_node.add_child(new_node)
5✔
370

371
            namespaces_stack.append((new_node, indent_level))
5✔
372

373
        # The root node will be 'global'
374
        tree_root = Node(identifier="global")
5✔
375

376
        # namespaces_stack: [ (node_reference, indent_level) ]
377
        namespaces_stack = [(tree_root, -1)]
5✔
378

379
        for snippet_dict in snippet_dicts:
5✔
380
            # Remove namespaces until we find the appropriate parent level
381
            while (
5✔
382
                namespaces_stack
383
                and snippet_dict["indent_level"] <= namespaces_stack[-1][1]
384
            ):
385
                node_to_detach, _ = namespaces_stack.pop()
5✔
386
                if node_to_detach is not tree_root:
5✔
387
                    node_to_detach.detach()
5✔
388

389
            # Handle Namespace Declaration
390
            matched = NAMESPACE_DECLARATION.search(snippet_dict["content"])
5✔
391
            if matched:
5✔
392
                namespace_name = matched.group(1)
5✔
393
                _add_namespace_node(
5✔
394
                    name=namespace_name, indent_level=snippet_dict["indent_level"]
395
                )
396

397
            # Handle Partial Function Signature
398
            if snippet_dict.get("func_partial_signature"):
5✔
399
                _add_namespace_node(
5✔
400
                    name=snippet_dict["func_partial_signature"].strip(),
401
                    indent_level=snippet_dict["indent_level"],
402
                )
403

404
            # Attach the current tree structure as relations
405
            snippet_dict["relations"] = list(tree_root.to_relations())
5✔
406

407
        # Normalize newlines in chunk in place
408
        for snippet_dict in snippet_dicts:
5✔
409
            snippet_dict["content"] = re.sub(r"\n{3,}", "\n\n", snippet_dict["content"])
5✔
410

411
        return snippet_dicts
5✔
412

413
    def _flush_snippet(
5✔
414
        self,
415
        curr_struct: list[CodeLine],
416
        snippet_dicts: list[dict],
417
        buffer: dict[list],
418
    ) -> None:
419
        """
420
        Consolidate the current structure and any buffered content into a snippet dict and append it to snippet_dicts.
421

422
        Args:
423
            curr_struct (list[CodeLine]): Accumulated code lines and metadata,
424
                where each element is a tuple containing:
425
                (line_number, line_content, indent_level, func_partial_signature).
426
            snippet_dicts (list[dict]): The list to which the newly created snippet dict will be appended.
427
            buffer (dict[str, list]): Buffer of annotated lines keyed by tag; merged into the snippet and then cleared.
428
        """
429
        if not curr_struct:
5✔
430
            return
×
431

432
        candidates = [entry for v in buffer.values() for entry in v] + curr_struct
5✔
433
        sorted_candidates = sorted(candidates, key=lambda x: x.line_number)
5✔
434

435
        if not sorted_candidates:
5✔
436
            return
×
437

438
        content = "\n".join(c.content for c in sorted_candidates)
5✔
439
        start_line = sorted_candidates[0].line_number
5✔
440
        end_line = sorted_candidates[-1].line_number
5✔
441
        indent_level = sorted_candidates[0].indent_level
5✔
442

443
        # Capture the first func_partial_signature
444
        match = next(
5✔
445
            (c.func_partial_signature for c in curr_struct if c.func_partial_signature),
446
            None,
447
        )
448

449
        snippet_dicts.append(
5✔
450
            {
451
                "content": content,
452
                "indent_level": indent_level,
453
                "start_line": start_line,
454
                "end_line": end_line,
455
                "func_partial_signature": match,
456
            }
457
        )
458
        curr_struct.clear()
5✔
459
        buffer.clear()
5✔
460

461
    def _extract_code_structures(
5✔
462
        self,
463
        source: str | Path,
464
        include_comments: bool,
465
        docstring_mode: str,
466
    ) -> tuple[list[dict], tuple[int, ...]]:
467
        """
468
        Preprocess and parse source into individual snippet boxes.
469

470
        This function-first extraction identifies functions as primary units
471
        while implicitly handling other structures within the function context.
472

473
        Args:
474
            source (str | Path): Raw code string or Path to source file.
475
            include_comments (bool): Whether to include comments in output.
476
            docstring_mode (Literal["summary", "all", "excluded"]): How to handle docstrings.
477

478
        Returns:
479
            tuple[list[dict], tuple[int, ...]]: A tuple containing the list of extracted snippet dicts and the cumulative line lengths.
480
        """
481
        source_code = self._read_source(source)
5✔
482
        if not source_code:
5✔
483
            return [], ()
×
484

485
        source_code, cumulative_lengths = self._preprocess(
5✔
486
            source_code, include_comments, docstring_mode
487
        )
488

489
        curr_struct = []
5✔
490
        buffer = defaultdict(list)
5✔
491
        last_indent = None
5✔
492
        inside_func = False
5✔
493
        snippet_dicts = []
5✔
494

495
        for line_no, line in enumerate(source_code.splitlines(), start=1):
5✔
496
            indent_level = len(line) - len(line.lstrip())
5✔
497

498
            # Detect annotated lines
499
            matched = re.search(r"\(-- ([A-Z]+) -->\) ", line)
5✔
500
            if matched:
5✔
501
                # Flush DOC buffer if not consecutive
502
                # Prevent storing multiple docstrings in the same buffer
503
                if buffer["DOC"] and buffer["DOC"][-1][0] != line_no - 1:
5✔
504
                    self._flush_snippet(curr_struct, snippet_dicts, buffer)
×
505
                    inside_func = False
×
506

507
                tag = matched.group(1)
5✔
508
                deannoted_line = (
5✔
509
                    line[: matched.start()] + line[matched.end() :]
510
                )  # slice off the annotation
511
                buffer[tag].append(
5✔
512
                    CodeLine(line_no, deannoted_line, indent_level, None)
513
                )
514
                continue
5✔
515

516
            # Top-level block detection
517
            namespace_start = NAMESPACE_DECLARATION.match(line)
5✔
518
            func_start = FUNCTION_DECLARATION.match(line)
5✔
519
            if namespace_start or (func_start and not inside_func):
5✔
520
                last_indent = indent_level
5✔
521

522
                # If it is a Python code, we can flush everything, else we won't flush the docstring yet
523
                # This helps including the docstring that is on top of block definition in the other languages
524
                if curr_struct:
5✔
525
                    if is_python_code(source):
5✔
526
                        self._flush_snippet(curr_struct, snippet_dicts, buffer)
5✔
527
                    else:
528
                        doc = buffer.pop("DOC", [])
5✔
529
                        self._flush_snippet(curr_struct, snippet_dicts, buffer)
5✔
530
                        buffer.clear()
5✔
531
                        buffer["doc"] = doc
5✔
532

533
            # We don't want to extract nested blocks
534
            if func_start:
5✔
535
                inside_func = True
5✔
536

537
            # Manage block accumulation
538
            if curr_struct:
5✔
539
                last_indent = last_indent or 0
5✔
540
                if (
5✔
541
                    line.strip()
542
                    and indent_level <= last_indent
543
                    and not (OPENER.match(line) or CLOSURE.match(line))
544
                ):  # Block end
545
                    self._flush_snippet(curr_struct, snippet_dicts, buffer)
5✔
546
                    curr_struct = [
5✔
547
                        CodeLine(
548
                            line_no,
549
                            line,
550
                            indent_level,
551
                            func_start.group(0) if func_start else None,
552
                        )
553
                    ]
554
                    last_indent = None
5✔
555
                    inside_func = False
5✔
556
                else:
557
                    curr_struct.append(CodeLine(line_no, line, indent_level, None))
5✔
558
            else:
559
                curr_struct = [
5✔
560
                    CodeLine(
561
                        line_no,
562
                        line,
563
                        indent_level,
564
                        func_start.group(0) if func_start else None,
565
                    )
566
                ]
567

568
        # Append last snippet
569
        if curr_struct:
5✔
570
            self._flush_snippet(curr_struct, snippet_dicts, buffer)
5✔
571

572
        return self._post_processing(snippet_dicts), cumulative_lengths
5✔
573

574
    def _split_oversized(
5✔
575
        self,
576
        snippet_dict: dict,
577
        max_tokens: int,
578
        max_lines: int,
579
        source: str | Path,
580
        token_counter: Callable | None,
581
        cumulative_lengths: tuple[int, ...],
582
    ):
583
        """
584
        Split an oversized structural block into smaller sub-chunks.
585

586
        This helper is used when a single code block exceeds the maximum
587
        token limit and `strict_mode` is disabled. It divides the block's
588
        content into token-bounded fragments while preserving line order
589
        and basic metadata.
590

591
        Args:
592
            snippet_dict (dict): The oversized snippet to split.
593
            max_tokens (int): Maximum tokens per sub-chunk.
594
            max_lines (int): Maximum lines per sub-chunk.
595
            source (str | Path): The source of the code.
596
            token_counter (Callable | None): The token counting function.
597
            cumulative_lengths (tuple[int, ...]): The cumulative lengths of the lines in the source code.
598

599
        Returns:
600
            list[Box]: A list of sub-chunks derived from the original block.
601
        """
602
        sub_boxes = []
5✔
603
        curr_chunk = []
5✔
604
        token_count = 0
5✔
605
        line_count = 0
5✔
606

607
        # Iterate through each line in the snippet_dict content
608
        for line_no, line in enumerate(
5✔
609
            snippet_dict["content"].splitlines(), start=snippet_dict["start_line"]
610
        ):
611
            line_tokens = (
5✔
612
                count_tokens(line, token_counter) if max_tokens != sys.maxsize else 0
613
            )
614

615
            # If adding this line would exceed either max_tokens or max_lines, commit current chunk
616
            if (token_count + line_tokens > max_tokens) or (line_count + 1 > max_lines):
5✔
617
                if curr_chunk:  # avoid empty chunk creation
5✔
618
                    start_line = line_no - len(curr_chunk)
5✔
619
                    end_line = line_no - 1
5✔
620
                    start_span = (
5✔
621
                        0 if start_line == 1 else cumulative_lengths[start_line - 2]
622
                    )
623
                    end_span = cumulative_lengths[end_line - 1]
5✔
624
                    tree = Node.from_relations(snippet_dict["relations"]).to_string()
5✔
625
                    sub_boxes.append(
5✔
626
                        Box(
627
                            {
628
                                "content": "\n".join(curr_chunk),
629
                                "metadata": {
630
                                    "tree": tree,
631
                                    "start_line": start_line,
632
                                    "end_line": end_line,
633
                                    "span": (start_span, end_span),
634
                                    "source": (
635
                                        str(source)
636
                                        if isinstance(source, (str, Path))
637
                                        else "N/A"
638
                                    ),
639
                                },
640
                            }
641
                        )
642
                    )
643
                curr_chunk.clear()
5✔
644
                token_count = 0
5✔
645
                line_count = 0
5✔
646

647
            curr_chunk.append(line)
5✔
648
            token_count += line_tokens
5✔
649
            line_count += 1
5✔
650

651
        # Add any remaining chunk at the end
652
        if curr_chunk:
5✔
653
            start_line = snippet_dict["end_line"] - len(curr_chunk) + 1
5✔
654
            end_line = snippet_dict["end_line"]
5✔
655
            start_span = 0 if start_line == 1 else cumulative_lengths[start_line - 2]
5✔
656
            end_span = cumulative_lengths[end_line - 1]
5✔
657
            tree = Node.from_relations(snippet_dict["relations"]).to_string()
5✔
658
            sub_boxes.append(
5✔
659
                Box(
660
                    {
661
                        "content": "\n".join(curr_chunk),
662
                        "metadata": {
663
                            "tree": tree,
664
                            "start_line": start_line,
665
                            "end_line": end_line,
666
                            "span": (start_span, end_span),
667
                            "source": (
668
                                str(source)
669
                                if (isinstance(source, Path) or is_path_like(source))
670
                                else "N/A"
671
                            ),
672
                        },
673
                    }
674
                )
675
            )
676

677
        return sub_boxes
5✔
678

679
    def _validate_constraints(
5✔
680
        self,
681
        max_tokens: int | None,
682
        max_lines: int | None,
683
        max_functions: int | None,
684
        token_counter: Callable[[str], int] | None,
685
    ) -> tuple[int, int, int]:
686
        """
687
        Validates that at least one chunking constraint is provided and sets default values.
688

689
        Args:
690
            max_tokens (int | None): Maximum number of tokens per chunk.
691
            max_lines (int | None): Maximum number of lines per chunk.
692
            max_functions (int | None): Maximum number of functions per chunk.
693
            token_counter (Callable[[str], int] | None): Function that counts tokens in text.
694

695
        Returns:
696
            tuple[int, int, int]: Adjusted max_tokens, max_lines, and max_functions values.
697

698
        Raises:
699
            InvalidInputError: If no chunking constraints are provided.
700
            MissingTokenCounterError: If `max_tokens` is provided but no `token_counter` is provided.
701
        """
702
        if not any((max_tokens, max_lines, max_functions)):
5✔
703
            raise InvalidInputError(
5✔
704
                "At least one of 'max_tokens', 'max_lines', or 'max_functions' must be provided."
705
            )
706

707
        # If token_counter is required but not provided
708
        if max_tokens is not None and not (token_counter or self.token_counter):
5✔
709
            raise MissingTokenCounterError()
5✔
710

711
        # Adjust limits for internal use
712
        if max_tokens is None:
5✔
713
            max_tokens = sys.maxsize
5✔
714
        if max_lines is None:
5✔
715
            max_lines = sys.maxsize
5✔
716
        if max_functions is None:
5✔
717
            max_functions = sys.maxsize
5✔
718

719
        return max_tokens, max_lines, max_functions
5✔
720

721
    @validate_input
5✔
722
    def chunk(
5✔
723
        self,
724
        source: str | Path,
725
        *,
726
        max_tokens: Annotated[int | None, Field(ge=12)] = None,
727
        max_lines: Annotated[int | None, Field(ge=5)] = None,
728
        max_functions: Annotated[int | None, Field(ge=1)] = None,
729
        token_counter: Callable[[str], int] | None = None,
730
        include_comments: bool = True,
731
        docstring_mode: Literal["summary", "all", "excluded"] = "all",
732
        strict: bool = True,
733
    ) -> list[Box]:
734
        """
735
        Extract semantic code chunks from source using multi-dimensional analysis.
736

737
        Processes source code by identifying structural boundaries (functions, classes,
738
        namespaces) and grouping content based on multiple constraints including
739
        tokens, lines, and logical units while preserving semantic coherence.
740

741
        Args:
742
            source (str | Path): Raw code string or file path to process.
743
            max_tokens (int, optional): Maximum tokens per chunk. Must be >= 12.
744
            max_lines (int, optional): Maximum number of lines per chunk. Must be >= 5.
745
            max_functions (int, optional): Maximum number of functions per chunk. Must be >= 1.
746
            token_counter (Callable, optional): Token counting function. Uses instance
747
                counter if None. Required for token-based chunking.
748
            include_comments (bool): Include comments in output chunks. Default: True.
749
            docstring_mode(Literal["summary", "all", "excluded"]): Docstring processing strategy:
750
                - "summary": Include only first line of docstrings
751
                - "all": Include complete docstrings
752
                - "excluded": Remove all docstrings
753
                Defaults to "all"
754
            strict (bool): If True, raise error when structural blocks exceed
755
                max_tokens. If False, split oversized blocks. Default: True.
756

757
        Returns:
758
            list[Box]: List of code chunks with metadata. Each Box contains:
759
                - content (str): Code content
760
                - tree (str): Namespace hierarchy
761
                - start_line (int): Starting line in original source
762
                - end_line (int): Ending line in original source
763
                - span (tuple[int, int]): Character-level span (start and end offsets) in the original source.
764
                - source_path (str): Source file path or "N/A"
765

766
        Raises:
767
            InvalidInputError: Invalid configuration parameters.
768
            MissingTokenCounterError: No token counter available.
769
            FileProcessingError: Source file cannot be read.
770
            TokenLimitError: Structural block exceeds max_tokens in strict mode.
771
            CallbackError: If the token counter fails or returns an invalid type.
772
        """
773
        max_tokens, max_lines, max_functions = self._validate_constraints(
5✔
774
            max_tokens, max_lines, max_functions, token_counter
775
        )
776
        token_counter = token_counter or self.token_counter
5✔
777

778
        if not source.strip():
5✔
NEW
779
            if self.verbose:
×
NEW
780
                logger.info("Input source is empty. Returning empty list.")
×
NEW
781
            return []
×
782

783
        if self.verbose:
5✔
784
            logger.info(
×
785
                "Starting chunk processing for {}",
786
                (
787
                    f"source: {str(Path)}"
788
                    if (isinstance(source, Path) or is_path_like(source))
789
                    else f"code starting with:\n```\n{source[:100]}...\n```\n"
790
                ),
791
            )
792

793
        snippet_dicts, cumulative_lengths = self._extract_code_structures(
5✔
794
            source, include_comments, docstring_mode
795
        )
796

797
        if self.verbose:
5✔
798
            logger.info(
×
799
                "Extracted {} structural blocks from source", len(snippet_dicts)
800
            )
801

802
        # Grouping logic
803

804
        merged_content = []
5✔
805
        relations_list = []
5✔
806
        start_line = None
5✔
807
        end_line = None
5✔
808
        token_count = 0
5✔
809
        line_count = 0
5✔
810
        function_count = 0
5✔
811
        result_chunks = []
5✔
812

813
        index = 0
5✔
814
        while index < len(snippet_dicts):
5✔
815
            snippet_dict = snippet_dicts[index]
5✔
816
            box_tokens = (
5✔
817
                count_tokens(snippet_dict["content"], token_counter)
818
                if max_tokens != sys.maxsize
819
                else 0
820
            )
821
            box_lines = snippet_dict["content"].count("\n") + (
5✔
822
                1 if snippet_dict["content"] else 0
823
            )
824
            is_function = bool(snippet_dict.get("func_partial_signature"))
5✔
825

826
            # Check if adding this snippet exceeds any limits
827
            token_limit_reached = token_count + box_tokens > max_tokens
5✔
828
            line_limit_reached = line_count + box_lines > max_lines
5✔
829
            function_limit_reached = is_function and (
5✔
830
                function_count + 1 > max_functions
831
            )
832

833
            if not (
5✔
834
                token_limit_reached or line_limit_reached or function_limit_reached
835
            ):
836
                # Fits: merge normally
837
                merged_content.append(snippet_dict["content"])
5✔
838
                relations_list.append(snippet_dict["relations"])
5✔
839
                token_count += box_tokens
5✔
840
                line_count += box_lines
5✔
841
                if is_function:
5✔
842
                    function_count += 1
5✔
843

844
                if start_line is None:
5✔
845
                    start_line = snippet_dict["start_line"]
5✔
846
                end_line = snippet_dict["end_line"]
5✔
847
                index += 1
5✔
848

849
            elif not merged_content:
5✔
850
                # Too big and nothing merged yet: handle oversize
851
                if strict:
5✔
852
                    raise TokenLimitError(
5✔
853
                        f"Structural block exceeds maximum limit (tokens: {box_tokens} > {max_tokens}, "
854
                        f"lines: {box_lines} > {max_lines}, or functions: {int(is_function)} > {max_functions}).\n"
855
                        f"Content starting with: \n```\n{snippet_dict['content'][:100]}...\n```\n"
856
                        "Reason: Prevent splitting inside interest points (function, class, region, ...)\n"
857
                        "💡Hint: Consider increasing 'max_tokens', 'max_lines', or 'max_functions', "
858
                        "refactoring the oversized block, or setting 'strict=False' to allow automatic splitting of oversized blocks."
859
                    )
860
                else:  # Else split further
861
                    if self.verbose:
5✔
862
                        logger.warning(
×
863
                            "Splitting oversized block (tokens: {} lines: {}) into sub-chunks",
864
                            box_tokens,
865
                            box_lines,
866
                        )
867

868
                    sub_chunks = self._split_oversized(
5✔
869
                        snippet_dict,
870
                        max_tokens,
871
                        max_lines,
872
                        source,
873
                        token_counter,
874
                        cumulative_lengths,
875
                    )
876

877
                    for sub_chunk in sub_chunks:
5✔
878
                        sub_chunk.metadata.chunk_num = len(result_chunks) + 1
5✔
879
                        result_chunks.append(sub_chunk)
5✔
880
                    index += 1
5✔
881
            else:
882
                # Flush current merged content as a chunk
883
                start_span = (
5✔
884
                    0 if start_line == 1 else cumulative_lengths[start_line - 2]
885
                )
886
                end_span = cumulative_lengths[end_line - 1]
5✔
887
                merged_chunk = Box(
5✔
888
                    {
889
                        "content": "\n".join(merged_content),
890
                        "metadata": {
891
                            "chunk_num": len(result_chunks) + 1,
892
                            "tree": self._merge_tree(relations_list),
893
                            "start_line": start_line,
894
                            "end_line": end_line,
895
                            "span": (start_span, end_span),
896
                            "source": (
897
                                str(source)
898
                                if (isinstance(source, Path) or is_path_like(source))
899
                                else "N/A"
900
                            ),
901
                        },
902
                    }
903
                )
904
                result_chunks.append(merged_chunk)
5✔
905

906
                # Reset for next chunk
907
                merged_content.clear()
5✔
908
                relations_list.clear()
5✔
909
                start_line = None
5✔
910
                end_line = None
5✔
911
                token_count = 0
5✔
912
                line_count = 0
5✔
913
                function_count = 0
5✔
914

915
        # Flush remaining content
916
        if merged_content:
5✔
917
            start_span = 0 if start_line == 1 else cumulative_lengths[start_line - 2]
5✔
918
            end_span = cumulative_lengths[end_line - 1]
5✔
919
            merged_chunk = Box(
5✔
920
                {
921
                    "content": "\n".join(merged_content),
922
                    "metadata": {
923
                        "chunk_num": len(result_chunks) + 1,
924
                        "tree": self._merge_tree(relations_list),
925
                        "start_line": start_line,
926
                        "end_line": end_line,
927
                        "span": (start_span, end_span),
928
                        "source": (
929
                            str(source)
930
                            if (isinstance(source, Path) or is_path_like(source))
931
                            else "N/A"
932
                        ),
933
                    },
934
                }
935
            )
936
            result_chunks.append(merged_chunk)
5✔
937

938
        if self.verbose:
5✔
939
            logger.info(
×
940
                "Generated {} chunk(s) for the {}",
941
                len(result_chunks),
942
                (
943
                    f"source: {str(Path)}"
944
                    if (isinstance(source, Path) or is_path_like(source))
945
                    else f"code starting with:\n```\n{source[:100]}..\n```\n"
946
                ),
947
            )
948

949
        return result_chunks
5✔
950

951
    @validate_input
5✔
952
    def batch_chunk(
5✔
953
        self,
954
        sources: restricted_iterable(str | Path),
955
        *,
956
        max_tokens: Annotated[int | None, Field(ge=12)] = None,
957
        max_lines: Annotated[int | None, Field(ge=5)] = None,
958
        max_functions: Annotated[int | None, Field(ge=1)] = None,
959
        token_counter: Callable[[str], int] | None = None,
960
        separator: Any = None,
961
        include_comments: bool = True,
962
        docstring_mode: Literal["summary", "all", "excluded"] = "all",
963
        strict: bool = True,
964
        n_jobs: Annotated[int, Field(ge=1)] | None = None,
965
        show_progress: bool = True,
966
        on_errors: Literal["raise", "skip", "break"] = "raise",
967
    ) -> Generator[Box, None, None]:
968
        """
969
        Process multiple source files or code strings in parallel.
970

971
        Leverages multiprocessing to efficiently chunk multiple code sources,
972
        applying consistent chunking rules across all inputs.
973

974
        Args:
975
            sources (restricted_iterable[str | Path]): A restricted iterable of file paths or raw code strings to process.
976
            max_tokens (int, optional): Maximum tokens per chunk. Must be >= 12.
977
            max_lines (int, optional): Maximum number of lines per chunk. Must be >= 5.
978
            max_functions (int, optional): Maximum number of functions per chunk. Must be >= 1.
979
            token_counter (Callable | None): Token counting function. Uses instance
980
                counter if None. Required for token-based chunking.
981
            separator (Any): A value to be yielded after the chunks of each text are processed.
982
                Note: None cannot be used as a separator.
983
            include_comments (bool): Include comments in output chunks. Default: True.
984
            docstring_mode(Literal["summary", "all", "excluded"]): Docstring processing strategy:
985
                - "summary": Include only first line of docstrings
986
                - "all": Include complete docstrings
987
                - "excluded": Remove all docstrings
988
                Defaults to "all"
989
            strict (bool): If True, raise error when structural blocks exceed
990
                max_tokens. If False, split oversized blocks. Default: True.
991
            n_jobs (int | None): Number of parallel workers. Uses all available CPUs if None.
992
            show_progress (bool): Display progress bar during processing. Defaults to True.
993
            on_errors (Literal["raise", "skip", "break"]):
994
                How to handle errors during processing. Defaults to 'raise'.
995

996
        Yields:
997
            Box: `Box` object, representing a chunk with its content and metadata.
998
                Includes:
999
                - content (str): Code content
1000
                - tree (str): Namespace hierarchy
1001
                - start_line (int): Starting line in original source
1002
                - end_line (int): Ending line in original source
1003
                - span (tuple[int, int]): Character-level span (start and end offsets) in the original source.
1004
                - source_path (str): Source file path or "N/A"
1005

1006
        Raises:
1007
            InvalidInputError: Invalid input parameters.
1008
            MissingTokenCounterError: No token counter available.
1009
            FileProcessingError: Source file cannot be read.
1010
            TokenLimitError: Structural block exceeds max_tokens in strict mode.
1011
            CallbackError: If the token counter fails or returns an invalid type.
1012
        """
1013
        chunk_func = partial(
5✔
1014
            self.chunk,
1015
            max_tokens=max_tokens,
1016
            max_lines=max_lines,
1017
            max_functions=max_functions,
1018
            token_counter=token_counter or self.token_counter,
1019
            include_comments=include_comments,
1020
            docstring_mode=docstring_mode,
1021
            strict=strict,
1022
        )
1023

1024
        yield from run_in_batch(
5✔
1025
            func=chunk_func,
1026
            iterable_of_args=sources,
1027
            iterable_name="sources",
1028
            separator=separator,
1029
            n_jobs=n_jobs,
1030
            show_progress=show_progress,
1031
            on_errors=on_errors,
1032
            verbose=self.verbose,
1033
        )