• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

speedyk-005 / chunklet-py / 24647245391

20 Apr 2026 03:37AM UTC coverage: 90.65% (-0.02%) from 90.671%
24647245391

push

github

speedyk-005
feat(document): update SECTION_BREAK_PATTERN with broader sectioning support

1367 of 1508 relevant lines covered (90.65%)

3.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.0
/src/chunklet/code_chunker/_code_structure_extractor.py
1
"""
2
Internal module for extracting code structures from source code files.
3

4
Provides functionality to parse and analyze code syntax trees, identifying functions,
5
classes, namespaces, and other structural elements.
6
This module is used by CodeChunker to understand code structure before
7
splitting into chunks.
8
"""
9

10
import re
4✔
11
from collections import defaultdict, namedtuple
4✔
12
from itertools import accumulate
4✔
13
from pathlib import Path
4✔
14

15
try:
4✔
16
    import defusedxml.ElementTree as ET
4✔
17
    from littletree import Node
4✔
18
except ImportError:  # pragma: no cover
19
    Node, ET = None, None
20

21
from loguru import logger
4✔
22

23
from chunklet.code_chunker.patterns import (
4✔
24
    ALL_SINGLE_LINE_COMM,
25
    CLOSER,
26
    DOCSTRING_STYLE_ONE,
27
    DOCSTRING_STYLE_TWO,
28
    FULL_LINE_SINGLE_COMM,
29
    FUNCTION_DECLARATION,
30
    METADATA,
31
    MULTI_LINE_COMM,
32
    MULTI_LINE_STRING_ASSIGN,
33
    NAMESPACE_DECLARATION,
34
    OPENER,
35
)
36
from chunklet.common.validation import validate_input
4✔
37
from chunklet.common.logging_utils import log_info
4✔
38

39
# Lightweight record describing one source line while structures are being
# assembled: its 1-based line number, its text, its indentation width and,
# when the line starts a function, the matched partial signature.
CodeLine = namedtuple(
    "CodeLine",
    "line_number content indent_level func_partial_signature",
)
42

43

44
class CodeStructureExtractor:
4✔
45
    """Extracts structural units from source code.
46

47
    This class provides functionality to parse source code files and identify functions,
48
    classes, namespaces, and other structural elements using a language-agnostic approach.
49
    """
50

51
    @validate_input
    def __init__(self, verbose: bool = False):
        """Create a new extractor.

        Args:
            verbose (bool): Stored on the instance and handed to ``log_info``
                when the number of extracted blocks is reported.
        """
        self.verbose = verbose
54

55
    def _replace_with_newlines(self, match: re.Match) -> str:
4✔
56
        """Replaces the matched content with an equivalent number of newlines."""
57
        matched_text = match.group(0)
4✔
58

59
        # To preserve the line count when replacing a multi-line block,
60
        # we need to replace N lines of content with N-1 newline characters.
61
        # This is because N-1 newlines create N empty lines in the context of the surrounding text.
62
        num_newlines = max(0, len(matched_text.splitlines()) - 1)
4✔
63

64
        return "\n" * num_newlines
4✔
65

66
    def _annotate_block(self, tag: str, match: re.Match) -> str:
4✔
67
        """Prefix each line in a matched block with a tag for tracking.
68

69
        Args:
70
            tag (str): Tag identifier for the block type.
71
            match (re.Match): Regex match object for the block.
72

73
        Returns:
74
            str: Annotated block with tag prefixes.
75
        """
76
        lines = match.group(0).splitlines()
4✔
77
        return "\n".join(f"(-- {tag} -->) {line}" for line in lines)
4✔
78

79
    def _summarize_docstring_style_one(self, match: re.Match) -> str:
4✔
80
        """
81
        Extracts the first line from a block-style documentation string.
82

83
        Args:
84
            match (re.Match): Regex match object for the docstring with captured groups.
85

86
        Returns:
87
            str: The summarized docstring line.
88
        """
89
        # The `DOCSTRING_STYLE_ONE` regex contains multiple alternative patterns,
90
        # which results in `None` values for the capturing groups that did not match.
91
        # filters out the `None` values to reliably extract while preserving the empty string indent
92
        groups = [g for g in match.groups() if g is not None]
4✔
93
        indent, l_end, doc, r_end = groups
4✔
94

95
        first_line = ""
4✔
96
        for line in doc.strip().splitlines():
4✔
97
            stripped_line = line.strip()
4✔
98
            if stripped_line:
4✔
99
                first_line = stripped_line
4✔
100
                break
4✔
101

102
        summarized_line_content = f"{indent}{l_end}{first_line}{r_end}".strip()
4✔
103
        padding_count = len(match.group(0).splitlines()) - 1
4✔
104
        return summarized_line_content + "\n" * padding_count
4✔
105

106
    def _summarize_docstring_style_two(self, match: re.Match) -> str:
        """
        Extracts a summary from line-prefixed documentation comments.

        Attempts to parse <summary> XML tags; falls back to the first meaningful line if parsing fails
        or no <summary> element is present.

        Args:
            match (re.Match): Regex match object for line-based docstring.

        Returns:
            str: The summarized docstring line(s), padded with newlines so the
                replacement contains as many line breaks as the matched block.

        Raises:
            ImportError: If the 'defusedxml' library is not installed.
        """
        if not ET:
            raise ImportError(
                "The 'defusedxml' library is not installed. "
                "Please install it with 'pip install 'defusedxml>=0.7.1'' or install the code processing extras "
                "with 'pip install 'chunklet-py[code]''"
            )

        indent = match.group(1)
        raw_doc = match.group(0)
        # NOTE(review): assumes the matched block always starts with one of
        # these prefixes; otherwise `re.match` returns None here - confirm
        # against DOCSTRING_STYLE_TWO.
        prefix = re.match(r"^\s*(//[/!]|%%|##)\s*", raw_doc).group(1)

        # Remove leading '///' '%%', '##' or '//!' and optional spaces at start of each line
        clean_doc = re.sub(rf"(?m)^\s*{prefix}\s*", "", raw_doc)
        try:
            # Try parsing it as XML
            wrapped = f"<root>{clean_doc}</root>"
            root = ET.fromstring(wrapped)
            summary_elem = root.find("summary")
            if summary_elem is not None:
                summary = ET.tostring(summary_elem, encoding="unicode").strip("\n")
            else:
                # No <summary> element: reuse the plain-text fallback below
                raise ET.ParseError
        except ET.ParseError:
            # Fallback: first meaningful line in plain text
            summary = ""
            for line in clean_doc.splitlines():
                # Skip lines that contain *only tags* (with optional whitespace)
                stripped_line = line.strip()
                if stripped_line and not re.fullmatch(r"\s*<[^>]*>\s*", stripped_line):
                    summary = stripped_line
                    break

        # Construct the summarized docstring line(s). Each kept line gets the
        # prefix back and the lines are joined with real line breaks, so a
        # multi-line <summary> is not glued together on a single line.
        summarized_line_content = "\n".join(
            f"{indent}{prefix} {line}" for line in summary.splitlines() if line.strip()
        ).lstrip()

        # The replacement must hold as many line breaks as the original block
        # (N lines -> N-1 breaks); the summary already contributes its own.
        padding_count = max(
            0, len(raw_doc.splitlines()) - 1 - summarized_line_content.count("\n")
        )

        return summarized_line_content + "\n" * padding_count
160

161
    def _preprocess(
        self, code: str, include_comments: bool, docstring_mode: str = "all"
    ) -> tuple[str, tuple[int, ...]]:
        """
        Prepare raw source text for line-by-line structure extraction.

        Steps, in order:
          - Record cumulative line lengths of the untouched input
          - Blank out comments when they are not wanted
          - Summarize or blank out docstrings depending on the mode
          - Tag strings, comments, docstrings and metadata lines for later detection

        Args:
            code (str): Source code to preprocess.
            include_comments (bool): Whether to include comments in output.
            docstring_mode (str): How to handle docstrings ("summary",
                "excluded"; any other value leaves them untouched).

        Returns:
            tuple[str, tuple[int, ...]]: The annotated code and the cumulative line lengths.
                The lengths are computed on the original text because every later
                step (removal, summary, annotation) changes the character counts.
        """
        # Measure first to keep span accuracy before any alteration.
        # The leading 0 makes cumulative_lengths[line_number - 1] the start offset of that line.
        line_lengths = (len(raw_line) for raw_line in code.splitlines(keepends=True))
        cumulative_lengths = (0, *accumulate(line_lengths))

        blank_out = self._replace_with_newlines

        # Remove comments if not required
        if not include_comments:
            for comment_pattern in (ALL_SINGLE_LINE_COMM, MULTI_LINE_COMM):
                code = comment_pattern.sub(blank_out, code)

        # Process docstrings according to mode ("all" falls through untouched)
        if docstring_mode == "summary":
            code = DOCSTRING_STYLE_ONE.sub(self._summarize_docstring_style_one, code)
            code = DOCSTRING_STYLE_TWO.sub(self._summarize_docstring_style_two, code)
        elif docstring_mode == "excluded":
            code = DOCSTRING_STYLE_ONE.sub(blank_out, code)
            code = DOCSTRING_STYLE_TWO.sub(blank_out, code)

        # Every pattern paired with the tag used to annotate its matches;
        # the order of this sequence is the order in which they are applied.
        tagged_patterns = (
            (MULTI_LINE_STRING_ASSIGN, "STR"),
            (FULL_LINE_SINGLE_COMM, "COMM"),
            (MULTI_LINE_COMM, "COMM"),
            (DOCSTRING_STYLE_ONE, "DOC"),
            (DOCSTRING_STYLE_TWO, "DOC"),
            (METADATA, "META"),
        )
        for pattern, tag in tagged_patterns:
            # `tag=tag` binds the current tag to this callback
            code = pattern.sub(
                lambda found, tag=tag: self._annotate_block(tag, found), code
            )

        return code, cumulative_lengths
229

230
    def _post_processing(self, snippet_dicts: list[dict]) -> list[dict]:
        """
        Attach a namespace tree structure (as a list of relations) to each snippet incrementally.

        Snippets are visited in order while a stack of open scopes is maintained.
        Scopes whose indentation is greater than or equal to the current snippet's
        are popped and detached from the tree, so each snippet receives the
        relations of the tree as it stands at that point. Afterwards, runs of
        three or more newlines in every snippet's content are collapsed to two.

        Args:
            snippet_dicts (list[dict]): List of extracted code snippets. Each one
                is modified in place ("relations" added, "content" normalized).

        Returns:
            list[dict]: Snippets with attached namespace trees (as relations).

        Raises:
            ImportError: If the 'littletree' library is not installed.
        """
        if not Node:
            # The requirement is quoted so that a shell does not treat '>' as a redirect.
            raise ImportError(
                "The 'littletree' library is not installed. "
                "Please install it with 'pip install 'littletree>=0.8.4'' or install the code processing extras "
                "with 'pip install 'chunklet-py[code]''"
            )

        def _add_namespace_node(name, indent_level):
            """Hang a new node under the innermost open scope and open it."""
            new_node = Node(identifier=name)

            current_parent_node, _ = namespaces_stack[-1]
            current_parent_node.add_child(new_node)

            namespaces_stack.append((new_node, indent_level))

        # The root node will be 'global'
        tree_root = Node(identifier="global")

        # namespaces_stack: [ (node_reference, indent_level) ]
        # The root sits at level -1 so it is never popped by the loop below
        # as long as indentation levels are non-negative.
        namespaces_stack = [(tree_root, -1)]

        for snippet_dict in snippet_dicts:
            # Remove namespaces until we find the appropriate parent level
            while (
                namespaces_stack
                and snippet_dict["indent_level"] <= namespaces_stack[-1][1]
            ):
                node_to_detach, _ = namespaces_stack.pop()
                if node_to_detach is not tree_root:
                    node_to_detach.detach()

            matched = NAMESPACE_DECLARATION.search(snippet_dict["content"])
            if matched:
                namespace_name = matched.group(1)
                _add_namespace_node(
                    name=namespace_name, indent_level=snippet_dict["indent_level"]
                )

            # A function signature is added after (and therefore below) a
            # namespace found in the same snippet.
            if snippet_dict.get("func_partial_signature"):
                _add_namespace_node(
                    name=snippet_dict["func_partial_signature"].strip(),
                    indent_level=snippet_dict["indent_level"],
                )

            # Attach the current tree structure as relation
            snippet_dict["relations"] = list(tree_root.to_relations())

        # Normalize newlines in chunk in place
        for snippet_dict in snippet_dicts:
            snippet_dict["content"] = re.sub(r"\n{3,}", "\n\n", snippet_dict["content"])

        return snippet_dicts
292

293
    def _flush_snippet(
4✔
294
        self,
295
        curr_struct: list[CodeLine],
296
        snippet_dicts: list[dict],
297
        buffer: dict[str, list],
298
    ) -> None:
299
        """
300
        Consolidate the current structure and any buffered content into a DotDict and append it to snippet_boxes.
301

302
        It automatically flushs the buffer.
303

304
        Args:
305
            curr_struct (list[tuple]): Accumulated code lines and metadata,
306
                where each element is a tuple containing:
307
                (line_number, line_content, indent_level, func_partial_signature).
308
            snippet_boxes (list[DotDict]): The list to which the newly created DotDict will be appended.
309
            buffer (dict[str, list]): Buffer for intermediate processing (default: empty list).
310
        """
311
        if not (curr_struct or buffer):
4✔
312
            return
4✔
313

314
        candidates = [entry for v in buffer.values() for entry in v] + curr_struct
4✔
315
        sorted_candidates = sorted(candidates, key=lambda x: x.line_number)
4✔
316

317
        if not sorted_candidates:
4✔
318
            return
4✔
319

320
        content = "\n".join(c.content for c in sorted_candidates)
4✔
321
        start_line = sorted_candidates[0].line_number
4✔
322
        end_line = sorted_candidates[-1].line_number
4✔
323
        indent_level = next((c.indent_level for c in curr_struct if c.content), 0)
4✔
324
        func_partial_signature = next(
4✔
325
            (c.func_partial_signature for c in curr_struct if c.func_partial_signature),
326
            None,
327
        )
328

329
        snippet_dicts.append(
4✔
330
            {
331
                "content": content,
332
                "indent_level": indent_level,
333
                "start_line": start_line,
334
                "end_line": end_line,
335
                "func_partial_signature": func_partial_signature,
336
            }
337
        )
338
        curr_struct.clear()
4✔
339
        buffer.clear()
4✔
340

341
    def _handle_annotated_line(
        self,
        line: str,
        line_no: int,
        matched: re.Match,
        buffer: dict,
        state: dict,
    ):
        """
        Buffer an annotated line (comment, docstring, metadata, ...) under its tag.

        Before buffering, everything accumulated so far (current structure and
        buffer) is flushed when the line is the first metadata line in the
        buffer, or when it does not directly follow the last buffered docstring line.

        Args:
            line (str): The annotated line detected.
            line_no (int): The one-based number of the line.
            matched (re.Match): Regex match object for the annotation marker;
                group 1 is the tag.
            buffer (dict): Buffer for intermediate processing, keyed by tag.
            state (dict): The state dictionary that holds info about current structure, last indentation level,
                function scope, and the snippet dicts (extracted blocks).
        """
        tag = matched.group(1)

        # Cut the annotation marker out of the line
        before_marker = line[: matched.start()]
        after_marker = line[matched.end() :]
        deannotated_line = before_marker + after_marker

        # Indentation can only be measured once the marker is gone
        indent_level = len(deannotated_line) - len(deannotated_line.lstrip())

        first_metadata = tag == "META" and not buffer["META"]

        doc_lines = buffer["DOC"]
        consecutive_docstrings = (
            bool(doc_lines) and doc_lines[-1].line_number == line_no - 1
        )

        should_flush = first_metadata or not consecutive_docstrings
        if should_flush:
            self._flush_snippet(state["curr_struct"], state["snippet_dicts"], buffer)

        buffer[tag].append(CodeLine(line_no, deannotated_line, indent_level, None))
4✔
379

380
    def _handle_block_start(
        self,
        line: str,
        indent_level: int,
        buffer: dict,
        state: dict,
        code: str | Path,
        func_start: str | None = None,
        is_python_code: bool = False,
    ):
        """
        React to a line that starts a namespace or a function by flushing the
        structure accumulated so far, in a language-aware way.

        No flush happens for a nested function when the current structure already
        holds a function, for a function preceded by buffered metadata when the
        current structure holds no function yet, or for a nested namespace.

        Args:
            line (str): The line being examined.
            indent_level (int): The level of indentation detected.
            buffer (dict): Buffer for intermediate processing.
            state (dict): The state dictionary that holds info about current structure, last indentation level,
                function scope, and the snippet dicts (extracted blocks).
            code (str | Path): Raw code string or Path to code file (not used by this method).
            func_start (str, optional): The function partial signature when the line starts a function.
            is_python_code (bool): Whether the code is Python.
        """
        curr_struct = state["curr_struct"]

        is_namespace = NAMESPACE_DECLARATION.match(line) is not None
        func_count = len(
            [entry for entry in curr_struct if entry.func_partial_signature]
        )
        is_nested = indent_level > state["block_indent_level"]

        if func_start:
            has_decorators = bool(buffer["META"])

            # Nested functions are not split into their own block
            if is_nested and func_count:
                return

            # The function belongs to the metadata buffered just before it:
            # only the block indentation is updated
            if has_decorators and not func_count:
                state["block_indent_level"] = indent_level
                return

        if is_namespace and is_nested:
            return

        if not (is_namespace or func_start):
            return

        if curr_struct:
            if is_python_code:
                # Python: everything buffered goes out together with the structure
                self._flush_snippet(curr_struct, state["snippet_dicts"], buffer)
            else:
                # Other languages: hold the docstring lines back so they are
                # not emitted with the structure that is being closed
                doc = buffer.pop("DOC", [])
                self._flush_snippet(curr_struct, state["snippet_dicts"], buffer)
                buffer.clear()
                # NOTE(review): the lines are stored under lowercase "doc" while
                # every other access uses "DOC" - confirm this is intended.
                buffer["doc"] = doc

        state["block_indent_level"] = indent_level
442

443
    def extract_code_structure(
        self,
        code: str,
        include_comments: bool,
        docstring_mode: str,
        is_python_code: bool = False,
    ) -> tuple[list[dict], tuple[int, ...]]:
        """
        Preprocess and parse code into individual snippet boxes.

        This function-first extraction identifies functions as primary units
        while implicitly handling other structures within the function context.

        Args:
            code (str): Raw code string.
            include_comments (bool): Whether to include comments in output.
            docstring_mode (Literal["summary", "all", "excluded"]): How to handle docstrings.
            is_python_code (bool): Whether the code is Python.

        Returns:
            tuple[list[dict], tuple[int, ...]]: A tuple containing the list of extracted code structure boxes
                and the cumulative line lengths of the original code. Empty input yields ``([], ())``.
        """
        if not code:
            return [], ()

        code, cumulative_lengths = self._preprocess(
            code, include_comments, docstring_mode
        )

        state = {
            "curr_struct": [],
            "block_indent_level": 0,
            "snippet_dicts": [],
        }
        buffer = defaultdict(list)

        # Marker written in front of annotated lines during preprocessing
        annotation_marker = re.compile(r"\(-- ([A-Z]+) -->\) ")

        for line_no, line in enumerate(code.splitlines(), start=1):
            indent_level = len(line) - len(line.lstrip())

            # Detect annotated lines
            matched = annotation_marker.search(line)
            if matched:
                self._handle_annotated_line(
                    line=line,
                    line_no=line_no,
                    matched=matched,
                    buffer=buffer,
                    state=state,
                )
                continue

            # A buffered multi-line string ends at the first plain line:
            # emit the buffered lines as their own snippet
            if buffer["STR"]:
                self._flush_snippet([], state["snippet_dicts"], buffer)

            # -- Manage block accumulation logic--

            func_start = FUNCTION_DECLARATION.match(line)
            func_start = func_start.group(0) if func_start else None

            if not state["curr_struct"]:  # Fresh block
                state["curr_struct"] = [
                    CodeLine(line_no, line, indent_level, func_start)
                ]
                state["block_indent_level"] = indent_level
                continue

            # Block start triggered by function or namespace identification.
            # It has to run before the block-end check below, which relies on
            # the block indentation level updated here.
            self._handle_block_start(
                line=line,
                indent_level=indent_level,
                buffer=buffer,
                state=state,
                code=code,
                func_start=func_start,
                is_python_code=is_python_code,
            )

            if (
                line.strip()
                and indent_level <= state["block_indent_level"]
                and not (OPENER.match(line) or CLOSER.match(line))
            ):  # Block end
                state["block_indent_level"] = indent_level
                self._flush_snippet(
                    state["curr_struct"], state["snippet_dicts"], buffer
                )

            state["curr_struct"].append(
                CodeLine(line_no, line, indent_level, func_start)
            )

        # Append last snippet. Annotated lines still waiting in the buffer
        # (e.g. comments or a multi-line string at the very end of the code)
        # must be emitted too, even when no plain code line follows them.
        if state["curr_struct"] or any(buffer.values()):
            self._flush_snippet(state["curr_struct"], state["snippet_dicts"], buffer)

        snippet_dicts = self._post_processing(state["snippet_dicts"])
        log_info(
            self.verbose, "Extracted {} structural blocks from code", len(snippet_dicts)
        )

        return snippet_dicts, cumulative_lengths
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc