• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

speedyk-005 / chunklet-py / 24798516591

22 Apr 2026 07:19PM UTC coverage: 90.606% (-0.2%) from 90.758%
24798516591

push

github

speedyk-005
refactor: remove redundant type hints from docstrings

- Strip (type) from Args/Returns where signature already has types
- Simplify Returns format to prose description
- Run clean_docstrings.py on src/chunklet (26 files)
- Add ExtractionState TypedDict for type safety (from earlier refactor)

1360 of 1501 relevant lines covered (90.61%)

3.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.98
/src/chunklet/code_chunker/_code_structure_extractor.py
1
"""
2
Internal module for extracting code structures from source code files.
3

4
Provides functionality to parse and analyze code syntax trees, identifying functions,
5
classes, namespaces, and other structural elements.
6
This module is used by CodeChunker to understand code structure before
7
splitting into chunks.
8
"""
9

10
import re
4✔
11
from collections import defaultdict, namedtuple
4✔
12
from itertools import accumulate
4✔
13
from pathlib import Path
4✔
14
from typing import TypedDict
4✔
15

16
try:
4✔
17
    import defusedxml.ElementTree as ET
4✔
18
    from littletree import Node
4✔
19
except ImportError:  # pragma: no cover
20
    Node, ET = None, None
21

22
from loguru import logger
4✔
23

24
from chunklet.code_chunker.patterns import (
4✔
25
    ALL_SINGLE_LINE_COMM,
26
    CLOSER,
27
    DOCSTRING_STYLE_ONE,
28
    DOCSTRING_STYLE_TWO,
29
    FULL_LINE_SINGLE_COMM,
30
    FUNCTION_DECLARATION,
31
    METADATA,
32
    MULTI_LINE_COMM,
33
    MULTI_LINE_STRING_ASSIGN,
34
    NAMESPACE_DECLARATION,
35
    OPENER,
36
)
37
from chunklet.common.validation import validate_input
4✔
38
from chunklet.common.logging_utils import log_info
4✔
39

40

41
# Record for one source line tracked during extraction: its 1-based line
# number, its text, the width of its leading whitespace, and the matched
# function-declaration prefix (None when the line declares no function).
CodeLine = namedtuple(
    "CodeLine",
    "line_number content indent_level func_partial_signature",
)
44

45

46
class ExtractionState(TypedDict):
    """Mutable bookkeeping shared by the extraction loop and its helper methods."""

    # Lines accumulated for the block currently being built.
    curr_struct: list[CodeLine]
    # Indentation of the block being built; a line indented deeper than this
    # is treated as nested inside it.
    block_indent_level: int
    # Snippet dictionaries emitted so far, in the order they were flushed.
    snippet_dicts: list[dict]
3✔
50

51

52
class CodeStructureExtractor:
4✔
53
    """Extracts structural units from source code.
54

55
    This class provides functionality to parse source code files and identify functions,
56
    classes, namespaces, and other structural elements using a language-agnostic approach.
57
    """
58

59
    @validate_input
    def __init__(self, verbose: bool = False):
        """Create an extractor.

        Args:
            verbose: Stored on the instance and forwarded to `log_info` when the
                number of extracted blocks is reported.
        """
        self.verbose = verbose
4✔
62

63
    def _replace_with_newlines(self, match: re.Match) -> str:
4✔
64
        """Replaces the matched content with an equivalent number of newlines."""
65
        matched_text = match.group(0)
4✔
66

67
        # To preserve the line count when replacing a multi-line block,
68
        # we need to replace N lines of content with N-1 newline characters.
69
        # This is because N-1 newlines create N empty lines in the context of the surrounding text.
70
        num_newlines = max(0, len(matched_text.splitlines()) - 1)
4✔
71

72
        return "\n" * num_newlines
4✔
73

74
    def _annotate_block(self, tag: str, match: re.Match) -> str:
4✔
75
        """Prefix each line in a matched block with a tag for tracking.
76

77
        Args:
78
            tag: Tag identifier for the block type.
79
            match: Regex match object for the block.
80

81
        Returns:
82
            Annotated block with tag prefixes.
83
        """
84
        lines = match.group(0).splitlines()
4✔
85
        return "\n".join(f"(-- {tag} -->) {line}" for line in lines)
4✔
86

87
    def _summarize_docstring_style_one(self, match: re.Match) -> str:
4✔
88
        """
89
        Extracts the first line from a block-style documentation string.
90

91
        Args:
92
            match: Regex match object for the docstring with captured groups.
93

94
        Returns:
95
            The summarized docstring line.
96
        """
97
        # The `DOCSTRING_STYLE_ONE` regex contains multiple alternative patterns,
98
        # which results in `None` values for the capturing groups that did not match.
99
        # filters out the `None` values to reliably extract while preserving the empty string indent
100
        groups = [g for g in match.groups() if g is not None]
4✔
101
        indent, l_end, doc, r_end = groups
4✔
102

103
        first_line = next(
4✔
104
            (line.strip() for line in doc.strip().splitlines() if line.strip()), ""
105
        )
106

107
        summarized_line_content = f"{indent}{l_end}{first_line}{r_end}".strip()
4✔
108
        padding_count = len(match.group(0).splitlines()) - 1
4✔
109
        return summarized_line_content + "\n" * padding_count
4✔
110

111
    def _summarize_docstring_style_two(self, match: re.Match) -> str:
        """
        Extracts a summary from line-prefixed documentation comments.

        Attempts to parse a <summary> XML element; falls back to the first
        meaningful line when the comment is not well-formed XML or has no
        top-level <summary> element.

        Args:
            match: Regex match object for line-based docstring. Group 1 is used
                as the indentation, and the matched text must begin (after
                optional whitespace) with one of the prefixes `///`, `//!`,
                `%%` or `##`.

        Returns:
            The summarized docstring line(s), each re-prefixed with the comment
            marker, followed by padding newlines.

        Raises:
            ImportError: If the `defusedxml` library is not installed.
        """
        if not ET:
            raise ImportError(
                "The 'defusedxml' library is not installed. "
                "Please install it with 'pip install 'defusedxml>=0.7.1'' or install the code processing extras "
                "with 'pip install 'chunklet-py[code]''"
            )

        indent = match.group(1)
        raw_doc = match.group(0)
        prefix = re.match(r"^\s*(//[/!]|%%|##)\s*", raw_doc).group(1)

        # Remove the leading comment marker and optional spaces at the start of
        # each line. The marker is escaped so it is always matched literally.
        clean_doc = re.sub(rf"(?m)^\s*{re.escape(prefix)}\s*", "", raw_doc)

        # First choice: treat the comment body as XML and keep its <summary>.
        summary = None
        try:
            root = ET.fromstring(f"<root>{clean_doc}</root>")
        except ET.ParseError:
            root = None
        if root is not None:
            summary_elem = root.find("summary")
            if summary_elem is not None:
                summary = ET.tostring(summary_elem, encoding="unicode").strip("\n")

        # Fallback: first meaningful line in plain text.
        if summary is None:
            summary = ""
            for line in clean_doc.splitlines():
                # Skip lines that contain *only tags* (with optional whitespace)
                stripped_line = line.strip()
                if stripped_line and not re.fullmatch(r"\s*<[^>]*>\s*", stripped_line):
                    summary = stripped_line
                    break

        # One output line per non-blank summary line. Joining with a newline
        # keeps a multi-line <summary> readable instead of gluing its lines
        # (and their comment markers) together; the padding below already
        # accounts for the number of summary lines, so the total newline count
        # of the replacement is the same either way.
        summarized_line_content = "\n".join(
            f"{indent}{prefix} {line}" for line in summary.splitlines() if line.strip()
        ).lstrip()

        padding_count = (
            len(raw_doc.splitlines()) - len(summarized_line_content.splitlines()) - 1
        )

        return summarized_line_content + "\n" * padding_count
4✔
164

165
    def _preprocess(
        self, code: str, include_comments: bool, docstring_mode: str = "all"
    ) -> tuple[str, tuple[int, ...]]:
        """
        Prepare source code for line-by-line structure extraction.

        Steps, in order:
          - Record cumulative line lengths of the untouched input
          - Blank out comments when they are not wanted
          - Summarize or blank out docstrings, depending on the mode
          - Tag strings, comments, docstrings and metadata lines for later detection

        Args:
            code: Source code to preprocess.
            include_comments: Whether to include comments in output.
            docstring_mode: How to handle docstrings: "summary" and "excluded"
                rewrite them; any other value leaves them as they are.

        Returns:
            The annotated code and a tuple of cumulative line lengths.
                The lengths are measured on the original code, before any removal,
                summarizing or tagging changes the character counts, and start
                with 0 so that `cumulative_lengths[line_number - 1]` is the start
                offset of that line.
        """
        # Measure first: every later step alters the text.
        running_totals = accumulate(map(len, code.splitlines(keepends=True)))
        cumulative_lengths = (0, *running_totals)

        # Comments are replaced by newlines so line numbers stay aligned.
        if not include_comments:
            for comment_pattern in (ALL_SINGLE_LINE_COMM, MULTI_LINE_COMM):
                code = comment_pattern.sub(self._replace_with_newlines, code)

        # Choose how each docstring flavour is rewritten; None means untouched.
        if docstring_mode == "summary":
            style_one_repl = self._summarize_docstring_style_one
            style_two_repl = self._summarize_docstring_style_two
        elif docstring_mode == "excluded":
            style_one_repl = style_two_repl = self._replace_with_newlines
        else:
            style_one_repl = style_two_repl = None

        if style_one_repl is not None:
            code = DOCSTRING_STYLE_ONE.sub(style_one_repl, code)
            code = DOCSTRING_STYLE_TWO.sub(style_two_repl, code)

        # Tagging passes, applied in this order.
        tagging_passes = (
            (MULTI_LINE_STRING_ASSIGN, "STR"),
            (FULL_LINE_SINGLE_COMM, "COMM"),
            (MULTI_LINE_COMM, "COMM"),
            (DOCSTRING_STYLE_ONE, "DOC"),
            (DOCSTRING_STYLE_TWO, "DOC"),
            (METADATA, "META"),
        )
        for pattern, tag in tagging_passes:

            def add_tag(found, _tag=tag):
                return self._annotate_block(_tag, found)

            code = pattern.sub(add_tag, code)

        return code, cumulative_lengths
4✔
233

234
    def _post_processing(self, snippet_dicts: list[dict]):
        """
        Give every snippet the namespace tree that is open at its position.

        The snippets are walked in order while a stack of open namespaces and
        functions is maintained by indentation. Each snippet receives the
        current tree as a list of relations under the "relations" key, and runs
        of three or more newlines in its content are collapsed to two.

        Args:
            snippet_dicts: List of extracted code snippets; modified in place.

        Returns:
            The same list, with "relations" attached to each snippet.

        Raises:
            ImportError: If the `littletree` library is not installed.
        """
        if not Node:
            raise ImportError(
                "The 'littletree' library is not installed. "
                "Please install it with 'pip install littletree>=0.8.4' or install the code processing extras "
                "with 'pip install 'chunklet-py[code]''"
            )

        root = Node(identifier="global")
        # Stack of (node, indent_level); the root uses -1 so it is never popped
        # by a snippet with a non-negative indentation.
        open_scopes = [(root, -1)]

        def push_scope(label, level):
            child = Node(identifier=label)
            open_scopes[-1][0].add_child(child)
            open_scopes.append((child, level))

        for snippet in snippet_dicts:
            level = snippet["indent_level"]

            # Close every scope that is not shallower than this snippet.
            while open_scopes and level <= open_scopes[-1][1]:
                closed, _ = open_scopes.pop()
                if closed is not root:
                    closed.detach()

            declaration = NAMESPACE_DECLARATION.search(snippet["content"])
            if declaration:
                push_scope(declaration.group(1), level)

            signature = snippet.get("func_partial_signature")
            if signature:
                push_scope(signature.strip(), level)

            # Snapshot of the tree as it stands for this snippet.
            snippet["relations"] = list(root.to_relations())

            # Normalize newlines in the chunk in place.
            snippet["content"] = re.sub(r"\n{3,}", "\n\n", snippet["content"])

        return snippet_dicts
4✔
292

293
    def _flush_snippet(
4✔
294
        self,
295
        curr_struct: list[CodeLine],
296
        snippet_dicts: list[dict],
297
        annotated_lines_buffer: dict[str, list],
298
    ) -> None:
299
        """
300
        Consolidate the current structure and any annotated_lines_buffered content into a DotDict and append it to snippets.
301

302
        It automatically flushs the annotated_lines_buffer.
303

304
        Args:
305
            curr_struct: Accumulated code lines and metadata,
306
                where each element is a tuple containing:
307
                (line_number, line_content, indent_level, func_partial_signature).
308
            snippets: The list to which the newly created DotDict will be appended.
309
            annotated_lines_buffer: Buffer for intermediate processing (default: empty list).
310
        """
311
        if not (curr_struct or annotated_lines_buffer):
4✔
312
            return
4✔
313

314
        candidates = [entry for v in annotated_lines_buffer.values() for entry in v] + curr_struct
4✔
315
        sorted_candidates = sorted(candidates, key=lambda x: x.line_number)
4✔
316

317
        if not sorted_candidates:
4✔
318
            return
4✔
319

320
        content = "\n".join(c.content for c in sorted_candidates)
4✔
321
        start_line = sorted_candidates[0].line_number
4✔
322
        end_line = sorted_candidates[-1].line_number
4✔
323
        indent_level = next((c.indent_level for c in curr_struct if c.content), 0)
4✔
324
        func_partial_signature = next(
4✔
325
            (c.func_partial_signature for c in curr_struct if c.func_partial_signature),
326
            None,
327
        )
328

329
        snippet_dicts.append(
4✔
330
            {
331
                "content": content,
332
                "indent_level": indent_level,
333
                "start_line": start_line,
334
                "end_line": end_line,
335
                "func_partial_signature": func_partial_signature,
336
            }
337
        )
338
        curr_struct.clear()
4✔
339
        annotated_lines_buffer.clear()
4✔
340

341
    def _handle_annotated_line(
        self,
        line: str,
        line_no: int,
        matched: re.Match,
        annotated_lines_buffer: dict[str, list],
        state: ExtractionState,
    ):
        """
        Buffer one annotated line (comment, docstring, metadata, string block).

        Before buffering, everything pending (current structure and buffer) is
        flushed unless this line directly follows the last buffered docstring
        line. The first metadata line to enter an empty "META" bucket always
        triggers the flush.

        Args:
            line: The annotated line detected.
            line_no: The one-based number of the line.
            matched: Regex match of the annotation marker inside `line`; group 1
                is the tag.
            annotated_lines_buffer: Tag -> buffered annotated lines.
            state: The state dictionary that holds the current structure, the
                block indentation level and the snippet dicts (extracted blocks).
        """
        tag = matched.group(1)

        # Cut the annotation marker out of the line.
        before_marker = line[: matched.start()]
        after_marker = line[matched.end() :]
        deannotated_line = before_marker + after_marker
        indent_level = len(deannotated_line) - len(deannotated_line.lstrip())

        starts_metadata_run = tag == "META" and not annotated_lines_buffer["META"]
        doc_lines = annotated_lines_buffer["DOC"]
        extends_doc_run = (
            bool(doc_lines) and doc_lines[-1].line_number == line_no - 1
        )

        if starts_metadata_run or not extends_doc_run:
            self._flush_snippet(
                state["curr_struct"], state["snippet_dicts"], annotated_lines_buffer
            )

        entry = CodeLine(line_no, deannotated_line, indent_level, None)
        annotated_lines_buffer[tag].append(entry)
4✔
377

378
    def _handle_block_start(
        self,
        line: str,
        indent_level: int,
        annotated_lines_buffer: dict[str, list],
        state: ExtractionState,
        code: str | Path,
        func_start: str | None = None,
        is_python_code: bool = False,
    ):
        """
        Detects top-level namespace or function starts and performs language-aware flushing.

        When `line` opens a new namespace or function that is not skipped (see
        the inline comments), the structure accumulated so far is flushed and
        `state["block_indent_level"]` is set to the indentation of `line`.

        Args:
            line: The current source line.
            indent_level: The level of indentation detected.
            annotated_lines_buffer: Tag -> buffered annotated lines.
            state: The state dictionary that holds the current structure, the
                block indentation level and the snippet dicts (extracted blocks).
            code: Raw code string or Path to code file. Not used by this method;
                kept so existing callers keep working.
            func_start: The matched function partial signature, or None when
                `line` does not declare a function.
            is_python_code: Whether the code is Python.
        """
        is_namespace = bool(NAMESPACE_DECLARATION.match(line))
        is_nested = indent_level > state["block_indent_level"]

        if func_start:
            func_count = sum(
                1 for entry in state["curr_struct"] if entry.func_partial_signature
            )
            has_decorators = bool(annotated_lines_buffer["META"])

            # Nested functions stay inside their enclosing block: splitting
            # them out into their own block is clunky.
            if is_nested and func_count != 0:
                return

            # A function preceded by buffered decorators was already given a
            # fresh block when the decorators were seen; only track its indent.
            if has_decorators and func_count == 0:
                state["block_indent_level"] = indent_level
                return

        if is_namespace and is_nested:
            return

        if not (is_namespace or func_start):
            return

        if state["curr_struct"]:
            if is_python_code:
                # Python: everything pending can be flushed together.
                self._flush_snippet(
                    state["curr_struct"], state["snippet_dicts"], annotated_lines_buffer
                )
            else:
                # Other languages: hold the docstring back so it is not emitted
                # with the previous structure. `_flush_snippet` empties the
                # buffer, after which the docstring lines are put back under
                # the same "DOC" key every other reader of the buffer uses.
                doc = annotated_lines_buffer.pop("DOC", [])
                self._flush_snippet(
                    state["curr_struct"], state["snippet_dicts"], annotated_lines_buffer
                )
                annotated_lines_buffer["DOC"] = doc

        state["block_indent_level"] = indent_level
4✔
440

441
    def extract_code_structure(
        self,
        code: str,
        include_comments: bool,
        docstring_mode: str,
        is_python_code: bool = False,
    ) -> tuple[list[dict], tuple[int, ...]]:
        """
        Preprocess and parse code into individual snippets.

        This function-first extraction identifies functions as primary units
        while implicitly handling other structures within the function context.

        Args:
            code: Raw code string.
            include_comments: Whether to include comments in output.
            docstring_mode: How to handle docstrings.
            is_python_code: Whether the code is Python.

        Returns:
            A tuple containing the list of extracted code snippets and the
            cumulative line lengths of the original code. Empty input yields
            `([], ())`.
        """
        if not code:
            return [], ()

        code, cumulative_lengths = self._preprocess(
            code, include_comments, docstring_mode
        )

        state: ExtractionState = {
            "curr_struct": [],
            "block_indent_level": 0,
            "snippet_dicts": [],
        }
        annotated_lines_buffer = defaultdict(list)

        # Marker written by `_annotate_block`; compiled once for the whole loop.
        annotation_marker = re.compile(r"\(-- ([A-Z]+) -->\) ")

        for line_no, line in enumerate(code.splitlines(), start=1):
            indent_level = len(line) - len(line.lstrip())

            # Detect annotated lines
            matched = annotation_marker.search(line)
            if matched:
                self._handle_annotated_line(
                    line=line,
                    line_no=line_no,
                    matched=matched,
                    annotated_lines_buffer=annotated_lines_buffer,
                    state=state,
                )
                continue

            # A buffered multi-line string assignment ends at the first
            # non-annotated line, so emit it now.
            if annotated_lines_buffer["STR"]:
                self._flush_snippet([], state["snippet_dicts"], annotated_lines_buffer)

            # -- Manage block accumulation logic --

            func_match = FUNCTION_DECLARATION.match(line)
            func_start = func_match.group(0) if func_match else None

            if not state["curr_struct"]:  # Fresh block
                state["curr_struct"] = [
                    CodeLine(line_no, line, indent_level, func_start)
                ]
                state["block_indent_level"] = indent_level
                continue

            # Block start triggered by function or namespace identification
            self._handle_block_start(
                line=line,
                indent_level=indent_level,
                annotated_lines_buffer=annotated_lines_buffer,
                state=state,
                code=code,
                func_start=func_start,
                is_python_code=is_python_code,
            )

            if (
                line.strip()
                and indent_level <= state["block_indent_level"]
                and not (OPENER.match(line) or CLOSER.match(line))
            ):  # Block end
                state["block_indent_level"] = indent_level
                self._flush_snippet(
                    state["curr_struct"], state["snippet_dicts"], annotated_lines_buffer
                )

            state["curr_struct"].append(
                CodeLine(line_no, line, indent_level, func_start)
            )

        # Emit whatever is still pending. This is done unconditionally so that
        # annotated lines buffered at the very end of the input (for example a
        # trailing comment or multi-line string) are not lost when no code line
        # follows them; `_flush_snippet` does nothing when there are no lines.
        self._flush_snippet(
            state["curr_struct"], state["snippet_dicts"], annotated_lines_buffer
        )

        snippet_dicts = self._post_processing(state["snippet_dicts"])
        log_info(
            self.verbose, "Extracted {} structural blocks from code", len(snippet_dicts)
        )

        return snippet_dicts, cumulative_lengths
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc