• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

speedyk-005 / chunklet-py / 22270776479

22 Feb 2026 04:56AM UTC coverage: 87.005%. First build
22270776479

Pull #12

github

web-flow
Merge ec1528094 into 83dda3c2e
Pull Request #12: # v2.2.0: The Unification Edition

285 of 336 new or added lines in 24 files covered. (84.82%)

1406 of 1616 relevant lines covered (87.0%)

3.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.0
/src/chunklet/code_chunker/_code_structure_extractor.py
1
"""
Internal module for extracting code structures from source code files.

Provides functionality to parse and analyze code syntax trees, identifying functions,
classes, namespaces, and other structural elements.
This module is used by CodeChunker to understand code structure before
splitting into chunks.
"""
9

10
from collections import defaultdict, namedtuple
4✔
11
from itertools import accumulate
4✔
12
from pathlib import Path
4✔
13

14
import regex as re
4✔
15

16
try:
4✔
17
    import defusedxml.ElementTree as ET
4✔
18
    from littletree import Node
4✔
19
except ImportError:
×
NEW
20
    Node, ET = None, None
×
21

22
from loguru import logger
4✔
23

24
from chunklet.code_chunker.patterns import (
4✔
25
    ALL_SINGLE_LINE_COMM,
26
    CLOSER,
27
    DOCSTRING_STYLE_ONE,
28
    DOCSTRING_STYLE_TWO,
29
    FULL_LINE_SINGLE_COMM,
30
    FUNCTION_DECLARATION,
31
    METADATA,
32
    MULTI_LINE_COMM,
33
    MULTI_LINE_STRING_ASSIGN,
34
    NAMESPACE_DECLARATION,
35
    OPENER,
36
)
37
from chunklet.common.validation import validate_input
4✔
38
from chunklet.common.logging_utils import log_info
4✔
39

40
# Record for one source line tracked during extraction: its 1-based line
# number, its text, the width of its leading whitespace, and the matched
# function-declaration text when the line starts a function (otherwise None).
CodeLine = namedtuple(
    "CodeLine",
    "line_number content indent_level func_partial_signature",
)
43

44

45
class CodeStructureExtractor:
4✔
46
    """Extracts structural units from source code.
47

48
    This class provides functionality to parse source code files and identify functions,
49
    classes, namespaces, and other structural elements using a language-agnostic approach.
50
    """
51

52
    @validate_input
    def __init__(self, verbose: bool = False):
        """Create an extractor.

        Args:
            verbose (bool): Flag passed to `log_info` when the number of
                extracted blocks is reported. Defaults to False.
        """
        self.verbose = verbose
55

56
    def _replace_with_newlines(self, match: re.Match) -> str:
4✔
57
        """Replaces the matched content with an equivalent number of newlines."""
58
        matched_text = match.group(0)
4✔
59

60
        # To preserve the line count when replacing a multi-line block,
61
        # we need to replace N lines of content with N-1 newline characters.
62
        # This is because N-1 newlines create N empty lines in the context of the surrounding text.
63
        num_newlines = max(0, len(matched_text.splitlines()) - 1)
4✔
64

65
        return "\n" * num_newlines
4✔
66

67
    def _annotate_block(self, tag: str, match: re.Match) -> str:
4✔
68
        """Prefix each line in a matched block with a tag for tracking.
69

70
        Args:
71
            tag (str): Tag identifier for the block type.
72
            match (re.Match): Regex match object for the block.
73

74
        Returns:
75
            str: Annotated block with tag prefixes.
76
        """
77
        lines = match.group(0).splitlines()
4✔
78
        return "\n".join(f"(-- {tag} -->) {line}" for line in lines)
4✔
79

80
    def _summarize_docstring_style_one(self, match: re.Match) -> str:
4✔
81
        """
82
        Extracts the first line from a block-style documentation string.
83

84
        Args:
85
            match (re.Match): Regex match object for the docstring with captured groups.
86

87
        Returns:
88
            str: The summarized docstring line.
89
        """
90
        # HACK: The `DOCSTRING_STYLE_ONE` regex contains multiple alternative patterns,
91
        # which results in `None` values for the capturing groups that did not match.
92
        # This list comprehension filters out the `None` values to reliably extract
93
        # the matched content (indent, delimiters, and docstring text).
94
        groups = [g for g in match.groups() if g is not None]
4✔
95
        indent = groups[0]
4✔
96
        l_end = groups[1]
4✔
97
        doc = groups[2].strip()
4✔
98
        r_end = groups[3]
4✔
99

100
        first_line = ""
4✔
101
        for line in doc.splitlines():
4✔
102
            stripped_line = line.strip()
4✔
103
            if stripped_line:
4✔
104
                first_line = stripped_line
4✔
105
                break
4✔
106

107
        summarized_line_content = f"{indent}{l_end}{first_line}{r_end}".strip()
4✔
108
        padding_count = len(match.group(0).splitlines()) - 1
4✔
109
        return summarized_line_content + "\n" * padding_count
4✔
110

111
    def _summarize_docstring_style_two(self, match: re.Match) -> str:
4✔
112
        """
113
        Extracts a summary from line-prefixed documentation comments.
114

115
        Attempts to parse <summary> XML tags; falls back to the first meaningful ine if parsing fails.
116

117
        Args:
118
            match (re.Match): Regex match object for line-based docstring.
119

120
        Returns:
121
            str: The summarized docstring line(s).
122
        """
123
        if not ET:
4✔
124
            raise ImportError(
×
125
                "The 'defusedxml' library is not installed. "
126
                "Please install it with 'pip install 'defusedxml>=0.7.1'' or install the code processing extras "
127
                "with 'pip install 'chunklet-py[code]''"
128
            )
129

130
        indent = match.group(1)
4✔
131
        raw_doc = match.group(0)
4✔
132
        prefix = re.match(r"^\s*(//[/!]|%%|##)\s*", raw_doc).group(1)
4✔
133

134
        # Remove leading '///' '%%', '##' or '//!' and optional spaces at start of each line
135
        clean_doc = re.sub(rf"(?m)^\s*{prefix}\s*", "", raw_doc)
4✔
136
        try:
4✔
137
            # Try parsing it as XML
138
            wrapped = f"<root>{clean_doc}</root>"
4✔
139
            root = ET.fromstring(wrapped)
4✔
140
            summary_elem = root.find("summary")
4✔
141
            if summary_elem is not None:
4✔
142
                summary = ET.tostring(summary_elem, encoding="unicode").strip("\n")
×
143
            else:
144
                raise ET.ParseError
4✔
145
        except ET.ParseError:
4✔
146
            # Fallback: first meaningful line in plain text
147
            summary = ""
4✔
148
            for line in clean_doc.splitlines():
4✔
149
                # Skip lines that contain *only tags* (with optional whitespace)
150
                stripped_line = line.strip()
4✔
151
                if stripped_line and not re.fullmatch(r"\s*<[^>]*>\s*", stripped_line):
4✔
152
                    summary = stripped_line
4✔
153
                    break
4✔
154

155
        # Construct the summarized docstring line
156
        summarized_line_content = "".join(
4✔
157
            f"{indent}{prefix} {line}" for line in summary.splitlines() if line.strip()
158
        ).lstrip()
159

160
        padding_count = (
4✔
161
            len(raw_doc.splitlines()) - len(summarized_line_content.splitlines()) - 1
162
        )
163

164
        return summarized_line_content + "\n" * padding_count
4✔
165

166
    def _preprocess(
        self, code: str, include_comments: bool, docstring_mode: str = "all"
    ) -> tuple[str, tuple[int, ...]]:
        """
        Prepare source code for line-by-line structure extraction.

        Processing steps:
          - Record cumulative line lengths of the untouched source
          - Blank out comments when they are not wanted
          - Summarize or blank out docstrings, depending on the mode
          - Tag string assignments, comments, docstrings and metadata lines so the
            extraction loop can recognise them

        Args:
            code (str): Source code to preprocess.
            include_comments (bool): Whether to include comments in output.
            docstring_mode (str): "summary" shortens docstrings, "excluded" blanks
                them out; any other value leaves them untouched.

        Returns:
            tuple[str, tuple[int, ...]]: The annotated code and the cumulative line
                lengths of the original code, starting with a leading 0.
        """
        # Measure offsets before anything is rewritten: every later step changes
        # the text and would skew character counts. The leading 0 makes
        # cumulative_lengths[line_number - 1] the start offset of that line.
        line_sizes = (len(raw_line) for raw_line in code.splitlines(keepends=True))
        cumulative_lengths = (0, *accumulate(line_sizes))

        # Blank out comments when they are not wanted, keeping the line count
        if not include_comments:
            for comment_pattern in (ALL_SINGLE_LINE_COMM, MULTI_LINE_COMM):
                code = comment_pattern.sub(self._replace_with_newlines, code)

        # Docstring handling; any mode other than these two leaves the code as is
        if docstring_mode == "summary":
            code = DOCSTRING_STYLE_ONE.sub(self._summarize_docstring_style_one, code)
            code = DOCSTRING_STYLE_TWO.sub(self._summarize_docstring_style_two, code)
        elif docstring_mode == "excluded":
            for docstring_pattern in (DOCSTRING_STYLE_ONE, DOCSTRING_STYLE_TWO):
                code = docstring_pattern.sub(self._replace_with_newlines, code)

        # Each pattern paired with the tag its matches are annotated with,
        # applied in this order
        tagged_patterns = (
            (MULTI_LINE_STRING_ASSIGN, "STR"),
            (FULL_LINE_SINGLE_COMM, "COMM"),
            (MULTI_LINE_COMM, "COMM"),
            (DOCSTRING_STYLE_ONE, "DOC"),
            (DOCSTRING_STYLE_TWO, "DOC"),
            (METADATA, "META"),
        )
        for pattern, tag in tagged_patterns:
            # `tag=tag` binds the current tag to this callback
            code = pattern.sub(
                lambda found, tag=tag: self._annotate_block(tag, found), code
            )

        return code, cumulative_lengths
234

235
    def _post_processing(self, snippet_dicts: list[dict]):
        """
        Attach the namespace tree (as a list of relations) to every snippet.

        Snippets are visited in order. A stack of open namespaces is maintained from
        each snippet's indentation, namespace and function declarations are added
        as tree nodes, and the relations of the tree at that point are stored under
        the snippet's "relations" key. Finally, runs of three or more newlines in
        each snippet's content are collapsed to two.

        Args:
            snippet_dicts (list[dict]): List of extracted code snippets; modified
                in place.

        Returns:
            list[dict]: The same list, with relations attached and newlines normalized.

        Raises:
            ImportError: If the tree dependency could not be imported.
        """
        if not Node:
            raise ImportError(
                "The 'littletree' library is not installed. "
                "Please install it with 'pip install littletree>=0.8.4' or install the code processing extras "
                "with 'pip install 'chunklet-py[code]''"
            )

        # The root node will be 'global'
        tree_root = Node(identifier="global")

        # Stack of (node, indent_level) pairs for the namespaces currently open.
        # The root is stored with indent -1; indent levels computed in this file
        # are non-negative, so the loop below does not pop it.
        namespaces_stack = [(tree_root, -1)]

        def _add_namespace_node(name, indent_level):
            # Hang the new node under the innermost open namespace, then open it
            child = Node(identifier=name)
            parent, _ = namespaces_stack[-1]
            parent.add_child(child)
            namespaces_stack.append((child, indent_level))

        for snippet_dict in snippet_dicts:
            current_indent = snippet_dict["indent_level"]

            # Close every namespace that is not shallower than this snippet
            while namespaces_stack and current_indent <= namespaces_stack[-1][1]:
                closed_node, _ = namespaces_stack.pop()
                if closed_node is not tree_root:
                    closed_node.detach()

            namespace_match = NAMESPACE_DECLARATION.search(snippet_dict["content"])
            if namespace_match:
                _add_namespace_node(
                    name=namespace_match.group(1), indent_level=current_indent
                )

            signature = snippet_dict.get("func_partial_signature")
            if signature:
                _add_namespace_node(
                    name=snippet_dict["func_partial_signature"].strip(),
                    indent_level=current_indent,
                )

            # Snapshot of the tree as it stands for this snippet
            snippet_dict["relations"] = list(tree_root.to_relations())

        # Normalize newlines in each chunk in place
        for snippet_dict in snippet_dicts:
            snippet_dict["content"] = re.sub(r"\n{3,}", "\n\n", snippet_dict["content"])

        return snippet_dicts
297

298
    def _flush_snippet(
        self,
        curr_struct: list[CodeLine],
        snippet_dicts: list[dict],
        buffer: dict[str, list],
    ) -> None:
        """
        Merge the current structure and all buffered lines into one snippet dict.

        The merged lines are ordered by line number and appended to `snippet_dicts`
        as a dict with the keys "content", "indent_level", "start_line", "end_line"
        and "func_partial_signature". Afterwards `curr_struct` and `buffer` are both
        emptied in place. When there is no line to emit, nothing is appended and
        nothing is cleared.

        Args:
            curr_struct (list[CodeLine]): Accumulated code lines of the current block.
            snippet_dicts (list[dict]): The list the new snippet dict is appended to.
            buffer (dict[str, list]): Buffered lines grouped by key; the lines of
                every key are included.
        """
        if not curr_struct and not buffer:
            return

        buffered_lines = [entry for entries in buffer.values() for entry in entries]
        ordered = sorted(
            buffered_lines + curr_struct, key=lambda entry: entry.line_number
        )

        # The buffer may hold keys whose lists are all empty
        if not ordered:
            return

        # Indentation and signature are taken from `curr_struct` only: the first
        # line with content gives the indent (0 if there is none), the first line
        # carrying a signature gives the signature (None if there is none).
        indent_level = next(
            (entry.indent_level for entry in curr_struct if entry.content), 0
        )
        func_partial_signature = next(
            (
                entry.func_partial_signature
                for entry in curr_struct
                if entry.func_partial_signature
            ),
            None,
        )

        snippet = {
            "content": "\n".join(entry.content for entry in ordered),
            "indent_level": indent_level,
            "start_line": ordered[0].line_number,
            "end_line": ordered[-1].line_number,
            "func_partial_signature": func_partial_signature,
        }
        snippet_dicts.append(snippet)

        curr_struct.clear()
        buffer.clear()
345

346
    def _handle_annotated_line(
        self,
        line: str,
        line_no: int,
        matched: re.Match,
        buffer: dict,
        state: dict,
    ):
        """
        Buffer an annotated line (comment, docstring, metadata, ...) under its tag.

        Before buffering, the current structure and the buffer are flushed when the
        line is the first buffered metadata line, or when it does not directly
        follow the last buffered docstring line.

        Args:
            line (str): The annotated line detected.
            line_no (int): One-based number of the line.
            matched (re.Match): Regex match object for the annotation marker;
                group 1 is the tag.
            buffer (dict): Buffer for intermediate processing, keyed by tag.
            state (dict): The state dictionary; its "curr_struct" and "snippet_dicts"
                entries are passed to the flush.
        """
        tag = matched.group(1)

        # Cut the annotation marker out of the line; indentation can only be
        # measured correctly once the marker is gone.
        bare_line = line[: matched.start()] + line[matched.end() :]
        indent_level = len(bare_line) - len(bare_line.lstrip())

        is_first_meta = tag == "META" and not buffer["META"]

        doc_lines = buffer["DOC"]
        follows_doc_line = (
            bool(doc_lines) and doc_lines[-1].line_number == line_no - 1
        )

        if is_first_meta or not follows_doc_line:
            self._flush_snippet(state["curr_struct"], state["snippet_dicts"], buffer)

        buffer[tag].append(CodeLine(line_no, bare_line, indent_level, None))
384

385
    def _handle_block_start(
        self,
        line: str,
        indent_level: int,
        buffer: dict,
        state: dict,
        code: str | Path,
        func_start: str | None = None,
        is_python_code: bool = False,
    ):
        """
        React to a line that starts a namespace or a function.

        Depending on nesting, pending decorators and the language, this either does
        nothing, only records the new block indentation, or flushes the current
        structure before recording the new block indentation.

        Args:
            line (str): The line being examined.
            indent_level (int): The level of indentation detected.
            buffer (dict): Buffer for intermediate processing.
            state (dict): The state dictionary; "curr_struct", "block_indent_level"
                and "snippet_dicts" are read or updated here.
            code (str | Path): Raw code string or Path to code file (not used in
                this method).
            func_start (str, optional): Matched function declaration text when the
                line starts a function.
            is_python_code (bool): Whether the code is Python.
        """
        is_namespace = bool(NAMESPACE_DECLARATION.match(line))
        func_count = sum(
            1 for code_line in state["curr_struct"] if code_line.func_partial_signature
        )
        is_nested = indent_level > state["block_indent_level"]

        if func_start:
            has_decorators = bool(buffer["META"])

            if func_count:
                # A nested function inside a block that already holds a function
                # stays part of that block instead of becoming its own snippet.
                if is_nested:
                    return
            elif has_decorators:
                # The first function after buffered metadata lines: only the
                # block indentation is updated, nothing is flushed.
                state["block_indent_level"] = indent_level
                return

        # Nested namespaces stay inside the enclosing block
        if is_namespace and is_nested:
            return

        if not (is_namespace or func_start):
            return

        if state["curr_struct"]:
            if is_python_code:
                # Python: everything pending belongs to the block being closed
                self._flush_snippet(
                    state["curr_struct"], state["snippet_dicts"], buffer
                )
            else:
                # Other languages: hold back the buffered docstring lines so they
                # are not emitted with the block being closed.
                doc = buffer.pop("DOC", [])
                self._flush_snippet(
                    state["curr_struct"], state["snippet_dicts"], buffer
                )
                buffer.clear()
                # NOTE(review): the lines are stored under lowercase "doc", not
                # "DOC". The flush collects the lines of every key, so they are
                # still emitted later, while a later `buffer.pop("DOC")` here will
                # not take them again. Confirm the casing is intentional before
                # "fixing" it.
                buffer["doc"] = doc

        state["block_indent_level"] = indent_level
447

448
    def extract_code_structure(
        self,
        code: str,
        include_comments: bool,
        docstring_mode: str,
        is_python_code: bool = False,
    ) -> tuple[list[dict], tuple[int, ...]]:
        """
        Preprocess code and split it into structural snippet dicts.

        The code is annotated by `_preprocess`, then walked line by line. Annotated
        lines are buffered by tag, other lines accumulate into the current block,
        and a block is flushed into a snippet when a new function or namespace
        starts or when the indentation returns to the block's level.

        Args:
            code (str): Raw code string.
            include_comments (bool): Whether to include comments in output.
            docstring_mode (Literal["summary", "all", "excluded"]): How to handle docstrings.
            is_python_code (bool): Whether the code is Python.

        Returns:
            tuple[list[dict], tuple[int, ...]]: The extracted snippet dicts and the
                cumulative line lengths of the original code. Empty input yields
                `([], ())`.
        """
        if not code:
            return [], ()

        code, cumulative_lengths = self._preprocess(
            code, include_comments, docstring_mode
        )

        state = {
            "curr_struct": [],
            "block_indent_level": 0,
            "snippet_dicts": [],
        }
        buffer = defaultdict(list)

        for line_no, line in enumerate(code.splitlines(), start=1):
            indent_level = len(line) - len(line.lstrip())

            # Lines tagged during preprocessing are buffered, not accumulated
            annotation = re.search(r"\(-- ([A-Z]+) -->\) ", line)
            if annotation:
                self._handle_annotated_line(
                    line=line,
                    line_no=line_no,
                    matched=annotation,
                    buffer=buffer,
                    state=state,
                )
                continue

            # Buffered string-assignment lines are emitted on their own as soon
            # as a non-annotated line follows.
            if buffer["STR"]:
                self._flush_snippet([], state["snippet_dicts"], buffer)

            # -- Block accumulation logic --
            declaration = FUNCTION_DECLARATION.match(line)
            func_start = declaration.group(0) if declaration else None
            current = CodeLine(line_no, line, indent_level, func_start)

            if not state["curr_struct"]:  # Fresh block
                state["curr_struct"] = [current]
                state["block_indent_level"] = indent_level
                continue

            # Function/namespace starts are handled here, after the fresh-block
            # case, so only lines joining an existing block reach it.
            self._handle_block_start(
                line=line,
                indent_level=indent_level,
                buffer=buffer,
                state=state,
                code=code,
                func_start=func_start,
                is_python_code=is_python_code,
            )

            # A non-blank line at or above the block's indentation ends the block,
            # unless the OPENER or CLOSER pattern matches it.
            closes_block = (
                line.strip()
                and indent_level <= state["block_indent_level"]
                and not (OPENER.match(line) or CLOSER.match(line))
            )
            if closes_block:
                state["block_indent_level"] = indent_level
                self._flush_snippet(
                    state["curr_struct"], state["snippet_dicts"], buffer
                )

            state["curr_struct"].append(current)

        # Emit whatever is still accumulating at end of input
        if state["curr_struct"]:
            self._flush_snippet(state["curr_struct"], state["snippet_dicts"], buffer)

        snippet_dicts = self._post_processing(state["snippet_dicts"])
        log_info(
            self.verbose, "Extracted {} structural blocks from code", len(snippet_dicts)
        )

        return snippet_dicts, cumulative_lengths
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc