• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

speedyk-005 / chunklet-py / 22647611846

03 Mar 2026 11:29PM UTC coverage: 90.659% (-0.01%) from 90.671%
22647611846

Pull #14

github

web-flow
Merge a4751ffc1 into 4c6b47c93
Pull Request #14: Refactor method ordering to follow Step-down Rule

379 of 398 new or added lines in 8 files covered. (95.23%)

2 existing lines in 2 files now uncovered.

1349 of 1488 relevant lines covered (90.66%)

4.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.05
/src/chunklet/code_chunker/_code_structure_extractor.py
1
"""
2
Internal module for extracting code structures from source code files.
3

4
Provides functionality to parse and analyze code syntax trees, identifying functions,
5
classes, namespaces, and other structural elements.
6
This module is used by CodeChunker to understand code structure before
7
splitting into chunks.
8
"""
9

10
from collections import defaultdict, namedtuple
5✔
11
from itertools import accumulate
5✔
12
from pathlib import Path
5✔
13

14
import regex as re
5✔
15

16
try:
5✔
17
    import defusedxml.ElementTree as ET
5✔
18
    from littletree import Node
5✔
19
except ImportError:  # pragma: no cover
20
    Node, ET = None, None
21

22

23
from chunklet.code_chunker.patterns import (
5✔
24
    ALL_SINGLE_LINE_COMM,
25
    CLOSER,
26
    DOCSTRING_STYLE_ONE,
27
    DOCSTRING_STYLE_TWO,
28
    FULL_LINE_SINGLE_COMM,
29
    FUNCTION_DECLARATION,
30
    METADATA,
31
    MULTI_LINE_COMM,
32
    MULTI_LINE_STRING_ASSIGN,
33
    NAMESPACE_DECLARATION,
34
    OPENER,
35
)
36
from chunklet.common.logging_utils import log_info
5✔
37
from chunklet.common.validation import validate_input
5✔
38

39
# One tracked source line: its 1-based line number, its text, the width of its
# leading whitespace, and the matched function-declaration prefix (None when the
# line does not start a function).
CodeLine = namedtuple(
    "CodeLine",
    ("line_number", "content", "indent_level", "func_partial_signature"),
)
42

43

44
class CodeStructureExtractor:
5✔
45
    """Extracts structural units from source code.
46

47
    This class provides functionality to parse source code files and identify functions,
48
    classes, namespaces, and other structural elements using a language-agnostic approach.
49
    """
50

51
    @validate_input
    def __init__(self, verbose: bool = False):
        """Create an extractor.

        Args:
            verbose (bool): Stored on the instance and passed to `log_info`
                when the number of extracted blocks is reported.
        """
        self.verbose = verbose
54

55
    def extract_code_structure(
        self,
        code: str,
        include_comments: bool,
        docstring_mode: str,
        is_python_code: bool = False,
    ) -> tuple[list[dict], tuple[int, ...]]:
        """
        Split source code into structural snippets.

        The code is first preprocessed (comments/docstrings handled and
        annotated), then walked line by line. Annotated lines are buffered,
        while ordinary lines are accumulated into the current structure until
        a block start or block end causes it to be flushed as a snippet.

        Args:
            code (str): Raw code string.
            include_comments (bool): Whether to include comments in output.
            docstring_mode (Literal["summary", "all", "excluded"]): How to handle docstrings.
            is_python_code (bool): Whether the code is Python.

        Returns:
            tuple[list[dict], tuple[int, ...]]: The post-processed snippet dicts
                and the cumulative line lengths computed by `_preprocess`.
                Empty input yields `([], ())`.
        """
        if not code:
            return [], ()

        code, cumulative_lengths = self._preprocess(
            code, include_comments, docstring_mode
        )

        buffer = defaultdict(list)
        state = {
            "curr_struct": [],
            "block_indent_level": 0,
            "snippet_dicts": [],
        }

        for line_no, line in enumerate(code.splitlines(), start=1):
            # Lines tagged during preprocessing are routed to the buffer.
            annotation = re.search(r"\(-- ([A-Z]+) -->\) ", line)
            if annotation is not None:
                self._handle_annotated_line(
                    line=line,
                    line_no=line_no,
                    matched=annotation,
                    buffer=buffer,
                    state=state,
                )
                continue

            indent_level = len(line) - len(line.lstrip())

            # A buffered multi-line string ends at the first unannotated line:
            # emit it on its own, without touching the current structure.
            if buffer["STR"]:
                self._flush_snippet([], state["snippet_dicts"], buffer)

            declaration = FUNCTION_DECLARATION.match(line)
            func_start = None if declaration is None else declaration.group(0)
            entry = CodeLine(line_no, line, indent_level, func_start)

            # Nothing accumulated yet: this line opens a fresh block.
            if not state["curr_struct"]:
                state["curr_struct"] = [entry]
                state["block_indent_level"] = indent_level
                continue

            # Function / namespace starts may flush the current structure.
            # This must run before the block-end check below, because it can
            # update the block indentation that check relies on.
            self._handle_block_start(
                line=line,
                indent_level=indent_level,
                buffer=buffer,
                state=state,
                code=code,
                func_start=func_start,
                is_python_code=is_python_code,
            )

            # Block end: a non-blank line at or left of the block indentation
            # that is not just an opener / closer line.
            if line.strip() and indent_level <= state["block_indent_level"]:
                if OPENER.match(line) is None and CLOSER.match(line) is None:
                    state["block_indent_level"] = indent_level
                    self._flush_snippet(
                        state["curr_struct"], state["snippet_dicts"], buffer
                    )

            state["curr_struct"].append(entry)

        # Emit whatever is still being accumulated.
        if state["curr_struct"]:
            self._flush_snippet(state["curr_struct"], state["snippet_dicts"], buffer)

        snippet_dicts = self._post_processing(state["snippet_dicts"])
        log_info(
            self.verbose, "Extracted {} structural blocks from code", len(snippet_dicts)
        )

        return snippet_dicts, cumulative_lengths
157

158
    def _preprocess(
        self, code: str, include_comments: bool, docstring_mode: str = "all"
    ) -> tuple[str, tuple[int, ...]]:
        """
        Prepare the code for line-by-line extraction.

        Steps, in order:
          1. Record cumulative line lengths of the untouched code.
          2. Blank out comments when they are not wanted.
          3. Summarize or blank out docstrings, depending on the mode.
          4. Prefix multi-line strings, comments, docstrings and metadata
             with a `(-- TAG -->) ` marker so they can be detected later.

        Args:
            code (str): Source code to preprocess.
            include_comments (bool): Whether to include comments in output.
            docstring_mode (str): How to handle docstrings ("summary",
                "excluded"; any other value leaves docstrings untouched).

        Returns:
            tuple[str, tuple[int, ...]]: The annotated code and the cumulative
                line lengths. The lengths are taken from the original code,
                before any removal, summarizing or annotation changes the
                character counts.
        """
        # Must be computed before any alteration to keep spans accurate.
        # The leading 0 makes cumulative_lengths[line_number - 1] the start
        # character offset of that line.
        cumulative_lengths = tuple(
            accumulate(map(len, code.splitlines(keepends=True)), initial=0)
        )

        blank_out = self._replace_with_newlines

        # Drop comments when the caller does not want them.
        if not include_comments:
            for comment_pattern in (ALL_SINGLE_LINE_COMM, MULTI_LINE_COMM):
                code = comment_pattern.sub(blank_out, code)

        # Pick the docstring handlers for the requested mode ("all": none).
        if docstring_mode == "summary":
            doc_handlers = (
                self._summarize_docstring_style_one,
                self._summarize_docstring_style_two,
            )
        elif docstring_mode == "excluded":
            doc_handlers = (blank_out, blank_out)
        else:
            doc_handlers = ()

        for doc_pattern, handler in zip(
            (DOCSTRING_STYLE_ONE, DOCSTRING_STYLE_TWO), doc_handlers
        ):
            code = doc_pattern.sub(handler, code)

        def _tagger(tag):
            """Build a substitution callback that annotates matches with `tag`."""
            return lambda found: self._annotate_block(tag, found)

        # Every pattern paired with the tag used to annotate its matches.
        patterns_n_tags = (
            (MULTI_LINE_STRING_ASSIGN, "STR"),
            (FULL_LINE_SINGLE_COMM, "COMM"),
            (MULTI_LINE_COMM, "COMM"),
            (DOCSTRING_STYLE_ONE, "DOC"),
            (DOCSTRING_STYLE_TWO, "DOC"),
            (METADATA, "META"),
        )
        for pattern, tag in patterns_n_tags:
            code = pattern.sub(_tagger(tag), code)

        return code, cumulative_lengths
226

227
    def _summarize_docstring_style_one(self, match: re.Match) -> str:
5✔
228
        """
229
        Extracts the first line from a block-style documentation string.
230

231
        Args:
232
            match (re.Match): Regex match object for the docstring with captured groups.
233

234
        Returns:
235
            str: The summarized docstring line.
236
        """
237
        # HACK: The `DOCSTRING_STYLE_ONE` regex contains multiple alternative patterns,
238
        # which results in `None` values for the capturing groups that did not match.
239
        # This list comprehension filters out the `None` values to reliably extract
240
        # the matched content (indent, delimiters, and docstring text).
241
        groups = [g for g in match.groups() if g is not None]
5✔
242
        indent = groups[0]
5✔
243
        l_end = groups[1]
5✔
244
        doc = groups[2].strip()
5✔
245
        r_end = groups[3]
5✔
246

247
        first_line = ""
5✔
248
        for line in doc.splitlines():
5✔
249
            stripped_line = line.strip()
5✔
250
            if stripped_line:
5✔
251
                first_line = stripped_line
5✔
252
                break
5✔
253

254
        summarized_line_content = f"{indent}{l_end}{first_line}{r_end}".strip()
5✔
255
        padding_count = len(match.group(0).splitlines()) - 1
5✔
256
        return summarized_line_content + "\n" * padding_count
5✔
257

258
    def _summarize_docstring_style_two(self, match: re.Match) -> str:
5✔
259
        """
260
        Extracts a summary from line-prefixed documentation comments.
261

262
        Attempts to parse <summary> XML tags; falls back to the first meaningful line if parsing fails.
263

264
        Args:
265
            match (re.Match): Regex match object for line-based docstring.
266

267
        Returns:
268
            str: The summarized docstring line(s).
269
        """
270
        if not ET:
5✔
UNCOV
271
            raise ImportError(
×
272
                "The 'defusedxml' library is not installed. "
273
                "Please install it with 'pip install 'defusedxml>=0.7.1'' or install the code processing extras "
274
                "with 'pip install 'chunklet-py[code]''"
275
            )
276

277
        indent = match.group(1)
5✔
278
        raw_doc = match.group(0)
5✔
279
        prefix = re.match(r"^\s*(//[/!]|%%|##)\s*", raw_doc).group(1)
5✔
280

281
        # Remove leading '///' '%%', '##' or '//!' and optional spaces at start of each line
282
        clean_doc = re.sub(rf"(?m)^\s*{prefix}\s*", "", raw_doc)
5✔
283
        try:
5✔
284
            # Try parsing it as XML
285
            wrapped = f"<root>{clean_doc}</root>"
5✔
286
            root = ET.fromstring(wrapped)
5✔
287
            summary_elem = root.find("summary")
5✔
288
            if summary_elem is not None:
5✔
NEW
289
                summary = ET.tostring(summary_elem, encoding="unicode").strip("\n")
×
290
            else:
291
                raise ET.ParseError
5✔
292
        except ET.ParseError:
5✔
293
            # Fallback: first meaningful line in plain text
294
            summary = ""
5✔
295
            for line in clean_doc.splitlines():
5✔
296
                # Skip lines that contain *only tags* (with optional whitespace)
297
                stripped_line = line.strip()
5✔
298
                if stripped_line and not re.fullmatch(r"\s*<[^>]*>\s*", stripped_line):
5✔
299
                    summary = stripped_line
5✔
300
                    break
5✔
301

302
        # Construct the summarized docstring line
303
        summarized_line_content = "".join(
5✔
304
            f"{indent}{prefix} {line}" for line in summary.splitlines() if line.strip()
305
        ).lstrip()
306

307
        padding_count = (
5✔
308
            len(raw_doc.splitlines()) - len(summarized_line_content.splitlines()) - 1
309
        )
310

311
        return summarized_line_content + "\n" * padding_count
5✔
312

313
    def _handle_annotated_line(
        self,
        line: str,
        line_no: int,
        matched: re.Match,
        buffer: dict,
        state: dict,
    ):
        """
        Buffer an annotated line (comment, docstring, metadata, ...).

        The annotation marker is cut out of the line and the remaining text is
        stored in `buffer` under its tag. Before storing, the current structure
        and buffer are flushed when the line is the first buffered metadata
        line, or when it does not directly follow the last buffered docstring
        line.

        Args:
            line (str): The annotated line detected.
            line_no (int): The number of the line based on one index.
            matched (re.Match): Regex match object for the annotation marker;
                group 1 is the tag.
            buffer (dict): Buffer for intermediate processing, keyed by tag.
                Missing tags are read directly, so it is expected to create
                empty lists on demand.
            state (dict): The state dictionary that holds info about current structure, last indentation level,
                function scope, and the snippet dicts (extracted blocks).
        """
        kind = matched.group(1)

        # Cut the annotation marker out of the line.
        plain_line = "".join((line[: matched.start()], line[matched.end() :]))

        # Indentation is only meaningful once the marker is gone.
        indent_level = len(plain_line) - len(plain_line.lstrip())

        opens_metadata_run = kind == "META" and not buffer["META"]

        buffered_docs = buffer["DOC"]
        continues_docstring = (
            bool(buffered_docs) and buffered_docs[-1].line_number == line_no - 1
        )

        if opens_metadata_run or not continues_docstring:
            self._flush_snippet(state["curr_struct"], state["snippet_dicts"], buffer)

        buffer[kind].append(CodeLine(line_no, plain_line, indent_level, None))
351

352
    def _flush_snippet(
        self,
        curr_struct: list[CodeLine],
        snippet_dicts: list[dict],
        buffer: dict[str, list],
    ) -> None:
        """
        Merge the current structure and all buffered lines into one snippet dict.

        The merged lines are ordered by line number and appended to
        `snippet_dicts` as a single dict. Afterwards both `curr_struct` and
        `buffer` are cleared in place. Nothing happens (and nothing is cleared)
        when there is no line to emit.

        Args:
            curr_struct (list[CodeLine]): Accumulated code lines, each holding
                (line_number, content, indent_level, func_partial_signature).
            snippet_dicts (list[dict]): The list to which the new snippet dict
                is appended.
            buffer (dict[str, list]): Buffered annotated lines, keyed by tag.
        """
        pending = [entry for entries in buffer.values() for entry in entries]
        pending.extend(curr_struct)
        if not pending:
            return

        pending.sort(key=lambda entry: entry.line_number)

        # Indentation and signature come from the code lines only, never from
        # the buffered annotated lines.
        indent_level = 0
        for entry in curr_struct:
            if entry.content:
                indent_level = entry.indent_level
                break

        func_partial_signature = None
        for entry in curr_struct:
            if entry.func_partial_signature:
                func_partial_signature = entry.func_partial_signature
                break

        snippet = {
            "content": "\n".join(entry.content for entry in pending),
            "indent_level": indent_level,
            "start_line": pending[0].line_number,
            "end_line": pending[-1].line_number,
            "func_partial_signature": func_partial_signature,
        }
        snippet_dicts.append(snippet)

        curr_struct.clear()
        buffer.clear()
399

400
    def _handle_block_start(
        self,
        line: str,
        indent_level: int,
        buffer: dict,
        state: dict,
        code: str | Path,
        func_start: str | None = None,
        is_python_code: bool = False,
    ):
        """
        Detects top-level namespace or function starts and performs language-aware flushing.

        When the line starts a namespace or a function that should become its
        own block, the current structure is flushed and the block indentation
        is updated. For non-Python code the buffered docstring lines are kept
        in the buffer so they end up in the block that follows them.

        Args:
            line (str): The current (non-annotated) line.
            indent_level (int): The level of indentation detected.
            buffer (dict): Buffer for intermediate processing, keyed by tag.
            state (dict): The state dictionary that holds info about current structure, last indentation level,
                function scope, and the snippet dicts (extracted blocks).
            code (str | Path): Raw code string or Path to code file. Not used
                by this method; kept for interface compatibility.
            func_start (str, optional): The function partial signature matched
                on this line, if any.
            is_python_code (bool): Whether the code is Python.
        """
        is_namespace = bool(NAMESPACE_DECLARATION.match(line))
        func_count = sum(
            1 for code_line in state["curr_struct"] if code_line.func_partial_signature
        )
        is_nested = indent_level > state["block_indent_level"]

        if func_start:
            has_decorators = bool(buffer["META"])

            # Nested functions stay inside the enclosing function's block:
            # emitting them as separate blocks would be clunky.
            if is_nested and func_count != 0:
                return

            # A function preceded by buffered decorators is already handled
            # with them; only the block indentation is updated.
            if has_decorators and func_count == 0:
                state["block_indent_level"] = indent_level
                return

        if is_namespace and is_nested:
            return

        if is_namespace or func_start:
            if state["curr_struct"]:
                if is_python_code:
                    # Python: everything buffered belongs to what came before.
                    self._flush_snippet(
                        state["curr_struct"], state["snippet_dicts"], buffer
                    )
                else:
                    # Other languages: hold the docstring back so that it is
                    # emitted with the block definition that follows it.
                    doc = buffer.pop("DOC", [])
                    self._flush_snippet(
                        state["curr_struct"], state["snippet_dicts"], buffer
                    )
                    # The flush above cleared the buffer (curr_struct is
                    # non-empty here). Put the docstring back under the same
                    # "DOC" key every other reader and writer uses.
                    buffer["DOC"] = doc

            state["block_indent_level"] = indent_level
462

463
    def _post_processing(self, snippet_dicts: list[dict]):
        """
        Attach the namespace tree (as a list of relations) to every snippet.

        The snippets are visited in order while a stack of open namespace /
        function nodes is maintained from their indentation levels. After each
        snippet the relations of the tree as it stands are stored under its
        "relations" key. Finally, runs of three or more newlines in each
        snippet's content are collapsed to two.

        Args:
            snippet_dicts (list[dict]): List of extracted code snippets.

        Returns:
            list[dict]: The same snippets, updated in place.

        Raises:
            ImportError: If `littletree` is not installed.
        """
        if not Node:
            raise ImportError(
                "The 'littletree' library is not installed. "
                "Please install it with 'pip install littletree>=0.8.4' or install the code processing extras "
                "with 'pip install 'chunklet-py[code]''"
            )

        # 'global' is the root; its level of -1 sits below any indentation.
        tree_root = Node(identifier="global")

        # Stack of (node, indent_level) for the currently open scopes.
        namespaces_stack = [(tree_root, -1)]

        def _open_scope(name, indent_level):
            """Hang a new node under the innermost open scope and push it."""
            child = Node(identifier=name)
            parent, _ = namespaces_stack[-1]
            parent.add_child(child)
            namespaces_stack.append((child, indent_level))

        for snippet_dict in snippet_dicts:
            level = snippet_dict["indent_level"]

            # Close every scope that is not shallower than this snippet.
            while namespaces_stack and level <= namespaces_stack[-1][1]:
                closed_node, _ = namespaces_stack.pop()
                if closed_node is not tree_root:
                    closed_node.detach()

            namespace_match = NAMESPACE_DECLARATION.search(snippet_dict["content"])
            if namespace_match:
                _open_scope(name=namespace_match.group(1), indent_level=level)

            signature = snippet_dict.get("func_partial_signature")
            if signature:
                _open_scope(name=signature.strip(), indent_level=level)

            # Snapshot of the tree for this snippet.
            snippet_dict["relations"] = list(tree_root.to_relations())

        # Normalize newlines in each chunk, in place.
        for snippet_dict in snippet_dicts:
            snippet_dict["content"] = re.sub(r"\n{3,}", "\n\n", snippet_dict["content"])

        return snippet_dicts
525

526
    def _replace_with_newlines(self, match: re.Match) -> str:
5✔
527
        """Replaces the matched content with an equivalent number of newlines."""
528
        matched_text = match.group(0)
5✔
529

530
        # To preserve the line count when replacing a multi-line block,
531
        # we need to replace N lines of content with N-1 newline characters.
532
        # This is because N-1 newlines create N empty lines in the context of the surrounding text.
533
        num_newlines = max(0, len(matched_text.splitlines()) - 1)
5✔
534

535
        return "\n" * num_newlines
5✔
536

537
    def _annotate_block(self, tag: str, match: re.Match) -> str:
5✔
538
        """Prefix each line in a matched block with a tag for tracking.
539

540
        Args:
541
            tag (str): Tag identifier for the block type.
542
            match (re.Match): Regex match object for the block.
543

544
        Returns:
545
            str: Annotated block with tag prefixes.
546
        """
547
        lines = match.group(0).splitlines()
5✔
548
        return "\n".join(f"(-- {tag} -->) {line}" for line in lines)
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc