• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

speedyk-005 / chunklet-py / 20378511984

19 Dec 2025 06:09PM UTC coverage: 86.588% (+4.8%) from 81.75%
20378511984

Pull #7

github

web-flow
Merge 81717401a into aeb37fd6a
Pull Request #7: Merge develop branch to main

464 of 550 new or added lines in 17 files covered. (84.36%)

1 existing line in 1 file now uncovered.

1317 of 1521 relevant lines covered (86.59%)

4.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.81
/src/chunklet/code_chunker/_code_structure_extractor.py
1
"""
2
Code Structure Extractor
3

4
Internal module for extracting code structures from source code.
5
Split from CodeChunker for modularity.
6
"""
7

8
from pathlib import Path
5✔
9
from itertools import accumulate
5✔
10
import regex as re
5✔
11
from collections import defaultdict, namedtuple
5✔
12

13
try:
5✔
14
    from charset_normalizer import from_path
5✔
15
    from littletree import Node
5✔
16
    import defusedxml.ElementTree as ET
5✔
NEW
17
except ImportError:
×
NEW
18
    from_path, Node, ET = None, None, None
×
19

20
from loguru import logger
5✔
21

22
from chunklet.code_chunker.patterns import (
5✔
23
    SINGLE_LINE_COMMENT,
24
    MULTI_LINE_COMMENT,
25
    DOCSTRING_STYLE_ONE,
26
    DOCSTRING_STYLE_TWO,
27
    FUNCTION_DECLARATION,
28
    NAMESPACE_DECLARATION,
29
    METADATA,
30
    OPENER,
31
    CLOSURE,
32
)
33
from chunklet.code_chunker.helpers import is_binary_file, is_python_code
5✔
34
from chunklet.common.path_utils import is_path_like
5✔
35
from chunklet.common.validation import validate_input
5✔
36
from chunklet.exceptions import FileProcessingError
5✔
37

38

39
# Record describing one source line while structures are being extracted:
# its line number, its text, the width of its leading whitespace, and the
# matched function-declaration text when the line starts a function
# (otherwise None).
CodeLine = namedtuple(
    "CodeLine",
    "line_number content indent_level func_partial_signature",
)
42

43

44
class CodeStructureExtractor:
5✔
45
    """
46
    Internal class for extracting structural units from source code.
47
    """
48

49
    @validate_input
    def __init__(self, verbose: bool = False):
        """
        Create an extractor.

        Args:
            verbose (bool): When True, informational messages are logged
                while reading sources and after extraction. Defaults to False.
        """
        self.verbose = verbose
5✔
52

53
    def _replace_with_newlines(self, match: re.Match) -> str:
5✔
54
        """Replaces the matched content with an equivalent number of newlines."""
55
        matched_text = match.group(0)
5✔
56

57
        # To preserve the line count when replacing a multi-line block,
58
        # we need to replace N lines of content with N-1 newline characters.
59
        # This is because N-1 newlines create N empty lines in the context of the surrounding text.
60
        num_newlines = max(0, len(matched_text.splitlines()) - 1)
5✔
61

62
        return "\n" * num_newlines
5✔
63

64
    def _read_source(self, source: str | Path) -> str:
        """Retrieve source code from file or treat input as raw string.

        Args:
            source (str | Path): File path or raw code string.

        Returns:
            str: Source code content. For a path this is the decoded file
                text (empty string when no encoding could be detected);
                otherwise the input is returned unchanged.

        Raises:
            ImportError: When charset-normalizer is not installed.
            FileProcessingError: When the file does not exist or is binary.
        """
        if from_path is None:
            raise ImportError(
                "The 'charset-normalizer' library is not installed. "
                "Please install it with 'pip install charset-normalizer>=3.4.0' "
                "or install the code processing extras with 'pip install chunklet-py[code]'"
            )

        # Anything that is not a path is treated as raw code.
        if not (isinstance(source, Path) or is_path_like(source)):
            return source

        path = Path(source)
        if not path.exists():
            raise FileProcessingError(f"File does not exist: {path}")
        if is_binary_file(path):
            raise FileProcessingError(f"Binary file not supported: {path}")

        # best() yields the most plausible decoding, or None when detection fails.
        match = from_path(str(path)).best()
        content = str(match) if match else ""
        if self.verbose:
            # loguru formats with str.format-style braces; a printf-style
            # "%d" would be emitted literally and shift the arguments.
            logger.info(
                "Successfully read {} characters from {} using charset detection",
                len(content),
                path,
            )
        return content
5✔
100

101
    def _annotate_block(self, tag: str, match: re.Match) -> str:
5✔
102
        """Prefix each line in a matched block with a tag for tracking.
103

104
        Args:
105
            tag (str): Tag identifier for the block type.
106
            match (re.Match): Regex match object for the block.
107

108
        Returns:
109
            str: Annotated block with tag prefixes.
110
        """
111
        lines = match.group(0).splitlines()
5✔
112
        return "\n".join(f"(-- {tag} -->) {line}" for line in lines)
5✔
113

114
    def _summarize_docstring_style_one(self, match: re.Match) -> str:
5✔
115
        """
116
        Extracts the first line from a block-style documentation string.
117

118
        Args:
119
            match (re.Match): Regex match object for the docstring with captured groups.
120

121
        Returns:
122
            str: The summarized docstring line.
123
        """
124
        # HACK: The `DOCSTRING_STYLE_ONE` regex contains multiple alternative patterns,
125
        # which results in `None` values for the capturing groups that did not match.
126
        # This list comprehension filters out the `None` values to reliably extract
127
        # the matched content (indent, delimiters, and docstring text).
128
        groups = [g for g in match.groups() if g is not None]
5✔
129
        indent = groups[0]
5✔
130
        l_end = groups[1]
5✔
131
        doc = groups[2].strip()
5✔
132
        r_end = groups[3]
5✔
133

134
        first_line = ""
5✔
135
        for line in doc.splitlines():
5✔
136
            stripped_line = line.strip()
5✔
137
            if stripped_line:
5✔
138
                first_line = stripped_line
5✔
139
                break
5✔
140

141
        summarized_line_content = f"{indent}{l_end}{first_line}{r_end}".strip()
5✔
142
        padding_count = len(match.group(0).splitlines()) - 1
5✔
143
        return summarized_line_content + "\n" * padding_count
5✔
144

145
    def _summarize_docstring_style_two(self, match: re.Match) -> str:
5✔
146
        """
147
        Extracts a summary from line-prefixed documentation comments.
148

149
        Attempts to parse <summary> XML tags; falls back to the first meaningful ine if parsing fails.
150

151
        Args:
152
            match (re.Match): Regex match object for line-based docstring.
153

154
        Returns:
155
            str: The summarized docstring line(s).
156
        """
157
        if not ET:
5✔
NEW
158
            raise ImportError(
×
159
                "The 'defusedxml' library is not installed. "
160
                "Please install it with 'pip install 'defusedxml>=0.7.1'' or install the code processing extras "
161
                "with 'pip install 'chunklet-py[code]''"
162
            )
163

164
        indent = match.group(1)
5✔
165
        raw_doc = match.group(0)
5✔
166
        prefix = re.match(r"^\s*(//[/!])\s*", raw_doc).group(1)
5✔
167

168
        # Remove leading '///' or '//!' and optional spaces at start of each line
169
        clean_doc = re.sub(rf"(?m)^\s*{prefix}\s*", "", raw_doc)
5✔
170
        try:
5✔
171
            # Try parsing it as XML
172
            wrapped = f"<root>{clean_doc}</root>"
5✔
173
            root = ET.fromstring(wrapped)
5✔
174
            summary_elem = root.find("summary")
5✔
175
            if summary_elem is not None:
5✔
176
                summary = ET.tostring(summary_elem, encoding="unicode").strip("\n")
5✔
177
            else:
NEW
178
                raise ET.ParseError
×
NEW
179
        except ET.ParseError:
×
180
            # Fallback: first meaningful line in plain text
NEW
181
            summary = ""
×
NEW
182
            for line in clean_doc.splitlines():
×
183
                # Skip lines that contain *only tags* (with optional whitespace)
NEW
184
                stripped_line = line.strip()
×
NEW
185
                if stripped_line and not re.fullmatch(r"\s*<[^>]*>\s*", stripped_line):
×
NEW
186
                    summary = stripped_line
×
NEW
187
                    break
×
188

189
        # Construct the summarized docstring line
190
        summarized_line_content = "".join(
5✔
191
            f"{indent}{prefix} {line}" for line in summary.splitlines() if line.strip()
192
        ).lstrip()
193

194
        padding_count = (
5✔
195
            len(raw_doc.splitlines()) - len(summarized_line_content.splitlines()) - 1
196
        )
197

198
        return summarized_line_content + "\n" * padding_count
5✔
199

200
    def _preprocess(
        self, code: str, include_comments: bool, docstring_mode: str = "all"
    ) -> tuple[str, tuple[int, ...]]:
        """
        Prepare raw code for line-by-line structure extraction.

        Steps, in order:
          - Record cumulative line lengths of the untouched code
          - Blank out comments when they are not wanted
          - Summarize or blank out docstrings depending on the mode
          - Tag remaining comment, docstring and metadata lines for later detection

        Args:
            code (str): Source code to preprocess.
            include_comments (bool): Whether to include comments in output.
            docstring_mode (str): "summary" shortens docstrings, "excluded"
                blanks them out, any other value leaves them untouched.

        Returns:
            tuple[str, tuple[int, ...]]: The annotated code and the cumulative
                line lengths of the original code.
        """
        # Measured first: every later step rewrites the text, so character
        # counts taken afterwards would no longer describe the original code.
        cumulative_lengths = tuple(
            accumulate(map(len, code.splitlines(keepends=True)))
        )

        if not include_comments:
            for comment_pattern in (SINGLE_LINE_COMMENT, MULTI_LINE_COMMENT):
                code = comment_pattern.sub(self._replace_with_newlines, code)

        # Choose the replacement callbacks for the two docstring styles;
        # None means docstrings are kept as they are.
        if docstring_mode == "summary":
            style_one_handler = self._summarize_docstring_style_one
            style_two_handler = self._summarize_docstring_style_two
        elif docstring_mode == "excluded":
            style_one_handler = style_two_handler = self._replace_with_newlines
        else:
            style_one_handler = style_two_handler = None

        if style_one_handler is not None:
            code = DOCSTRING_STYLE_ONE.sub(style_one_handler, code)
            code = DOCSTRING_STYLE_TWO.sub(style_two_handler, code)

        # Each pattern is paired with the tag written in front of its lines.
        tagging_plan = (
            (SINGLE_LINE_COMMENT, "COMM"),
            (MULTI_LINE_COMMENT, "COMM"),
            (DOCSTRING_STYLE_ONE, "DOC"),
            (DOCSTRING_STYLE_TWO, "DOC"),
            (METADATA, "META"),
        )
        for pattern, tag in tagging_plan:
            # Bind the tag as a default so each callback keeps its own value.
            code = pattern.sub(
                lambda found, label=tag: self._annotate_block(label, found), code
            )

        return code, cumulative_lengths
5✔
268

269
    def _post_processing(self, snippet_dicts: list[dict]):
        """
        Add namespace relations to every snippet and tidy blank lines.

        Snippets are visited in order while a stack of currently open
        namespaces/functions is maintained; after each snippet the current
        tree is stored on it under "relations". Finally, runs of three or more
        newlines in each snippet's content are reduced to two.

        Args:
            snippet_dicts (list[dict]): List of extracted code snippets.

        Returns:
            list[dict]: The same list, with each snippet updated in place.

        Raises:
            ImportError: When littletree is not installed.
        """
        if not Node:
            raise ImportError(
                "The 'littletree' library is not installed. "
                "Please install it with 'pip install littletree>=0.8.4' or install the code processing extras "
                "with 'pip install 'chunklet-py[code]''"
            )

        # Everything hangs below a root node called 'global'.
        root = Node(identifier="global")

        # Stack of (node, indent_level) pairs for the scopes that are still
        # open. The root carries -1, so non-negative indents never close it.
        open_scopes = [(root, -1)]

        def open_scope(name, indent_level):
            # Attach a new node under the innermost open scope and make it
            # the innermost scope itself.
            child = Node(identifier=name)
            open_scopes[-1][0].add_child(child)
            open_scopes.append((child, indent_level))

        for snippet in snippet_dicts:
            level = snippet["indent_level"]

            # Close every scope opened at the same or a deeper indent.
            while open_scopes and level <= open_scopes[-1][1]:
                closed_node, _ = open_scopes.pop()
                if closed_node is not root:
                    closed_node.detach()

            namespace_match = NAMESPACE_DECLARATION.search(snippet["content"])
            if namespace_match:
                open_scope(name=namespace_match.group(1), indent_level=level)

            signature = snippet.get("func_partial_signature")
            if signature:
                open_scope(name=signature.strip(), indent_level=level)

            # Snapshot of the tree as it stands for this snippet.
            snippet["relations"] = list(root.to_relations())

        # Collapse runs of blank lines inside each snippet.
        for snippet in snippet_dicts:
            snippet["content"] = re.sub(r"\n{3,}", "\n\n", snippet["content"])

        return snippet_dicts
5✔
333

334
    def _flush_snippet(
        self,
        curr_struct: list[CodeLine],
        snippet_dicts: list[dict],
        buffer: dict[list],
    ) -> None:
        """
        Merge the current structure with all buffered lines into one snippet dict.

        Does nothing when `curr_struct` is empty. Otherwise the merged lines
        are ordered by line number, appended to `snippet_dicts` as a single
        dict, and both `curr_struct` and `buffer` are emptied in place.

        Args:
            curr_struct (list[CodeLine]): Accumulated code lines, each holding
                (line_number, content, indent_level, func_partial_signature).
            snippet_dicts (list[dict]): The list the new snippet dict is appended to.
            buffer (dict[list]): Buffered lines grouped by key; every group is
                merged into the snippet.
        """
        if not curr_struct:
            return

        buffered = [entry for entries in buffer.values() for entry in entries]
        ordered = sorted(buffered + curr_struct, key=lambda entry: entry.line_number)
        first, last = ordered[0], ordered[-1]

        # Only the structure itself is searched for a signature; the first
        # non-empty one wins.
        signature = None
        for entry in curr_struct:
            if entry.func_partial_signature:
                signature = entry.func_partial_signature
                break

        snippet = {
            "content": "\n".join(entry.content for entry in ordered),
            "indent_level": first.indent_level,
            "start_line": first.line_number,
            "end_line": last.line_number,
            "func_partial_signature": signature,
        }
        snippet_dicts.append(snippet)

        curr_struct.clear()
        buffer.clear()
5✔
378

379
    def _handle_annotated_line(
        self,
        line: str,
        line_no: int,
        matched: re.Match,
        indent_level: int,
        buffer: dict[list],
        state: dict,
    ):
        """
        Buffer a tagged line (comment, docstring, metadata) under its tag.

        Before buffering, the current structure is flushed when exactly one
        "META" line is already buffered, or when the last buffered "DOC" line
        is not the line directly above this one.

        Args:
            line (str): The annotated line detected.
            line_no (int): One-based number of the line.
            matched (re.Match): Regex match of the annotation marker; group 1 is the tag.
            indent_level (int): Indentation recorded for the buffered line.
            buffer (dict[list]): Buffer for intermediate processing, keyed by tag.
            state (dict): The state dictionary that holds info about current structure,
                last indentation level, function scope, and the snippet dicts.
        """
        # Evaluated in this order so the "DOC" entry is only looked at when
        # the "META" check did not already decide.
        needs_flush = len(buffer["META"]) == 1
        if not needs_flush:
            needs_flush = bool(
                buffer["DOC"] and buffer["DOC"][-1].line_number != line_no - 1
            )

        if needs_flush:
            self._flush_snippet(state["curr_struct"], state["snippet_dicts"], buffer)
            state["inside_func"] = False

        # Cut the marker out of the line and keep the text around it.
        before_marker = line[: matched.start()]
        after_marker = line[matched.end() :]
        entry = CodeLine(line_no, before_marker + after_marker, indent_level, None)
        buffer[matched.group(1)].append(entry)
5✔
414

415
    def _handle_block_start(
        self,
        line: str,
        indent_level: int,
        buffer: dict[list],
        state: dict,
        source: str | Path,
        func_start: str | None = None,
    ):
        """
        React to a line that opens a namespace or a function.

        When the line opens a namespace, or opens a function while no function
        is currently open and no "META" line is buffered, the indent is
        recorded as the block's indent and any structure in progress is
        flushed. A function start always marks the state as inside a function.

        Args:
            line (str): The line being examined.
            indent_level (int): Indentation of the line.
            buffer (dict[list]): Buffer for intermediate processing.
            state (dict): The state dictionary that holds info about current structure,
                last indentation level, function scope, and the snippet dicts.
            source (str | Path): Raw code string or Path to source file.
            func_start (str, optional): Function partial signature when the line
                declares a function.
        """
        opens_namespace = NAMESPACE_DECLARATION.match(line)

        # A buffered decorator/attribute means the unit already began there,
        # so the function line itself must not trigger a flush.
        opens_unit = opens_namespace or (
            func_start and not state["inside_func"] and not buffer["META"]
        )

        if opens_unit:
            state["last_indent"] = indent_level

            if state["curr_struct"]:
                # Python gets a full flush. For other languages the buffered
                # docstring is held back so it can accompany the block
                # definition that follows it.
                hold_back_doc = not is_python_code(source)
                if hold_back_doc:
                    pending_doc = buffer.pop("DOC", [])

                self._flush_snippet(
                    state["curr_struct"], state["snippet_dicts"], buffer
                )

                if hold_back_doc:
                    buffer.clear()
                    # NOTE(review): stored under lowercase "doc" although it was
                    # taken from "DOC"; flushing merges every buffer entry, but
                    # lookups of buffer["DOC"] will not see these lines.
                    # Confirm whether this is intended.
                    buffer["doc"] = pending_doc

        # Nested blocks are not extracted separately.
        if func_start:
            state["inside_func"] = True
5✔
463

464
    def extract_code_structure(
        self,
        source: str | Path,
        include_comments: bool,
        docstring_mode: str,
    ) -> tuple[list[dict], tuple[int, ...]]:
        """
        Read, preprocess and split source into structural snippets.

        Lines are scanned one by one. Tagged lines are buffered; all other
        lines are accumulated into the current structure, which is flushed
        whenever a new block begins or a line returns to the block's indent.

        Args:
            source (str | Path): Raw code string or Path to source file.
            include_comments (bool): Whether to include comments in output.
            docstring_mode (Literal["summary", "all", "excluded"]): How to handle docstrings.

        Returns:
            tuple[list[dict], tuple[int, ...]]: The extracted snippet dicts and
                the cumulative line lengths of the original source. Both are
                empty when the source has no content.
        """
        source_code = self._read_source(source)
        if not source_code:
            return [], ()

        source_code, cumulative_lengths = self._preprocess(
            source_code, include_comments, docstring_mode
        )

        state = {
            "curr_struct": [],
            "last_indent": 0,
            "inside_func": False,
            "snippet_dicts": [],
        }
        buffer = defaultdict(list)

        # Marker written in front of tagged lines during preprocessing.
        annotation_marker = re.compile(r"\(-- ([A-Z]+) -->\) ")

        for line_no, line in enumerate(source_code.splitlines(), start=1):
            indent_level = len(line) - len(line.lstrip())

            # Tagged lines are buffered and take no part in block tracking.
            matched = annotation_marker.search(line)
            if matched:
                self._handle_annotated_line(
                    line=line,
                    line_no=line_no,
                    indent_level=indent_level,
                    matched=matched,
                    buffer=buffer,
                    state=state,
                )
                continue

            func_start = FUNCTION_DECLARATION.match(line)
            signature = func_start.group(0) if func_start else None

            self._handle_block_start(
                line=line,
                indent_level=indent_level,
                buffer=buffer,
                state=state,
                source=source,
                func_start=signature,
            )

            # Nothing accumulated yet: this line opens a fresh structure.
            if not state["curr_struct"]:
                state["curr_struct"] = [
                    CodeLine(line_no, line, indent_level, signature)
                ]
                continue

            # A non-blank line at or left of the block's indent ends the
            # block, unless it is merely an opener or closure line.
            ends_block = (
                line.strip()
                and indent_level <= state["last_indent"]
                and not OPENER.match(line)
                and not CLOSURE.match(line)
            )
            if not ends_block:
                state["curr_struct"].append(CodeLine(line_no, line, indent_level, None))
                continue

            self._flush_snippet(state["curr_struct"], state["snippet_dicts"], buffer)
            state["curr_struct"] = [CodeLine(line_no, line, indent_level, signature)]
            state["last_indent"] = 0
            state["inside_func"] = False

        # Whatever is still accumulated becomes the final snippet.
        if state["curr_struct"]:
            self._flush_snippet(state["curr_struct"], state["snippet_dicts"], buffer)

        snippet_dicts = self._post_processing(state["snippet_dicts"])
        if self.verbose:
            logger.info(
                "Extracted {} structural blocks from source", len(snippet_dicts)
            )

        return snippet_dicts, cumulative_lengths
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc