• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sqlfluff / sqlfluff / 7229139733

11 Dec 2023 09:15PM UTC coverage: 99.914% (-0.09%) from 100.0%
7229139733

push

github

web-flow
Clickhouse Dialect - Support BackQuoted Identifiers (#5457)

18694 of 18710 relevant lines covered (99.91%)

2.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.66
/src/sqlfluff/core/templaters/python.py
1
"""Defines the templaters."""
3✔
2

3
import ast
3✔
4
from string import Formatter
3✔
5
from typing import (
3✔
6
    Any,
7
    Callable,
8
    Dict,
9
    Iterable,
10
    Iterator,
11
    List,
12
    NamedTuple,
13
    Optional,
14
    Tuple,
15
)
16

17
from sqlfluff.core.errors import SQLTemplaterError
3✔
18
from sqlfluff.core.helpers.slice import offset_slice, zero_slice
3✔
19
from sqlfluff.core.helpers.string import findall
3✔
20
from sqlfluff.core.templaters.base import (
3✔
21
    RawFileSlice,
22
    RawTemplater,
23
    TemplatedFile,
24
    TemplatedFileSlice,
25
    large_file_check,
26
    templater_logger,
27
)
28

29

30
class IntermediateFileSlice(NamedTuple):
    """An intermediate representation of a partially sliced File.

    Pairs a span of the source file with a span of the templated file,
    together with the raw slices which have not yet been resolved into
    individual templated slices.
    """

    # Label for this slice. `_trim_end` always produces "compound".
    intermediate_type: str
    # Position of this slice in the source (un-rendered) string.
    source_slice: slice
    # Position of this slice in the templated (rendered) string.
    templated_slice: slice
    # The raw slices which are still to be resolved within this span.
    slice_buffer: List[RawFileSlice]

    def _trim_end(
        self, templated_str: str, target_end: str = "head"
    ) -> Tuple["IntermediateFileSlice", List[TemplatedFileSlice]]:
        """Trim the ends of an intermediate segment.

        Peels literals, blocks and comments off one end of the buffer for as
        long as they can be positioned unambiguously, and converts each one
        into a `TemplatedFileSlice`.

        Args:
            templated_str (str): The full templated string, used to check
                that a literal really does appear at the position implied.
            target_end (str): "head" to trim from the start. Any other
                value trims from the end.

        Returns:
            A tuple of the remaining (always "compound") intermediate slice
            and the list of trimmed slices in file order.
        """
        target_idx = 0 if target_end == "head" else -1
        # NOTE: A one-element tuple needs the trailing comma. Without it the
        # parentheses are only grouping and the membership test further down
        # becomes a substring test against a plain string.
        terminator_types = ("block_start",) if target_end == "head" else ("block_end",)
        main_source_slice = self.source_slice
        main_templated_slice = self.templated_slice
        # Work on a copy so that popping elements doesn't silently change
        # the buffer of the slice we were called on.
        slice_buffer = list(self.slice_buffer)

        end_buffer: List[TemplatedFileSlice] = []

        # Yield any leading literals, comments or blocks.
        while len(slice_buffer) > 0 and slice_buffer[target_idx].slice_type in (
            "literal",
            "block_start",
            "block_end",
            "comment",
        ):
            focus = slice_buffer[target_idx]
            templater_logger.debug("            %s Focus: %s", target_end, focus)
            # Is it a zero length item?
            if focus.slice_type in ("block_start", "block_end", "comment"):
                # Only add the length in the source space.
                templated_len = 0
            else:
                # Assume it's a literal, check the literal actually matches.
                templated_len = len(focus.raw)
                if target_end == "head":
                    check_slice = offset_slice(
                        main_templated_slice.start,
                        templated_len,
                    )
                else:
                    check_slice = slice(
                        main_templated_slice.stop - templated_len,
                        main_templated_slice.stop,
                    )

                if templated_str[check_slice] != focus.raw:
                    # It doesn't match, we can't use it. break
                    templater_logger.debug("                Nope")
                    break

            # If it does match, set up the new slices
            if target_end == "head":
                division = (
                    main_source_slice.start + len(focus.raw),
                    main_templated_slice.start + templated_len,
                )
                new_slice = TemplatedFileSlice(
                    focus.slice_type,
                    slice(main_source_slice.start, division[0]),
                    slice(main_templated_slice.start, division[1]),
                )
                end_buffer.append(new_slice)
                main_source_slice = slice(division[0], main_source_slice.stop)
                main_templated_slice = slice(division[1], main_templated_slice.stop)
            else:
                division = (
                    main_source_slice.stop - len(focus.raw),
                    main_templated_slice.stop - templated_len,
                )
                new_slice = TemplatedFileSlice(
                    focus.slice_type,
                    slice(division[0], main_source_slice.stop),
                    slice(division[1], main_templated_slice.stop),
                )
                # Trimming from the tail works backwards, so insert at the
                # front to keep the trimmed slices in file order.
                end_buffer.insert(0, new_slice)
                main_source_slice = slice(main_source_slice.start, division[0])
                main_templated_slice = slice(main_templated_slice.start, division[1])

            slice_buffer.pop(target_idx)
            # Stop once we've consumed the block boundary for this end.
            if focus.slice_type in terminator_types:
                break
        # Return a new Intermediate slice and the buffer.
        # NB: Don't check size of slice buffer here. We can do that later.
        new_intermediate = self.__class__(
            "compound", main_source_slice, main_templated_slice, slice_buffer
        )
        return new_intermediate, end_buffer

    def trim_ends(
        self, templated_str: str
    ) -> Tuple[
        List[TemplatedFileSlice], "IntermediateFileSlice", List[TemplatedFileSlice]
    ]:
        """Trim both ends of an intermediate slice.

        Args:
            templated_str (str): The full templated string.

        Returns:
            A tuple of the slices trimmed from the head, the remaining
            intermediate slice, and the slices trimmed from the tail.
        """
        # Trim start:
        new_slice, head_buffer = self._trim_end(
            templated_str=templated_str, target_end="head"
        )
        # Trim end:
        new_slice, tail_buffer = new_slice._trim_end(
            templated_str=templated_str, target_end="tail"
        )
        # Return
        return head_buffer, new_slice, tail_buffer

    def try_simple(self) -> TemplatedFileSlice:
        """Try to turn this intermediate slice into a simple slice.

        Returns:
            A `TemplatedFileSlice` typed after the only buffered raw slice.

        Raises:
            ValueError: If the buffer doesn't hold exactly one raw slice.
        """
        # Yield anything simple
        if len(self.slice_buffer) == 1:
            return TemplatedFileSlice(
                self.slice_buffer[0].slice_type,
                self.source_slice,
                self.templated_slice,
            )
        else:
            raise ValueError("IntermediateFileSlice is not simple!")

    def coalesce(self) -> TemplatedFileSlice:
        """Coalesce this whole slice into a single one. Brutally.

        The resulting slice type is whatever
        `PythonTemplater._coalesce_types` picks for the buffered raw slices.
        """
        return TemplatedFileSlice(
            PythonTemplater._coalesce_types(self.slice_buffer),
            self.source_slice,
            self.templated_slice,
        )
156

157

158
class PythonTemplater(RawTemplater):
3✔
159
    """A templater using python format strings.
3✔
160

161
    See: https://docs.python.org/3/library/string.html#format-string-syntax
162

163
    For the python templater we don't allow functions or macros because there isn't
164
    a good way of doing it securely. Use the jinja templater for this.
165

166
    The python templater also defines a lot of the logic for how
167
    to allow fixing and translation in a templated file.
168
    """
169

170
    name = "python"
3✔
171

172
    def __init__(self, override_context=None, **kwargs) -> None:
3✔
173
        self.default_context = dict(test_value="__test__")
3✔
174
        self.override_context = override_context or {}
3✔
175

176
    @staticmethod
3✔
177
    def infer_type(s) -> Any:
3✔
178
        """Infer a python type from a string and convert.
179

180
        Given a string value, convert it to a more specific built-in Python type
181
        (e.g. int, float, list, dictionary) if possible.
182

183
        """
184
        try:
2✔
185
            return ast.literal_eval(s)
2✔
186
        except (SyntaxError, ValueError):
2✔
187
            return s
2✔
188

189
    def get_context(self, fname=None, config=None, **kw) -> Dict:
3✔
190
        """Get the templating context from the config.
191

192
        This function retrieves the templating context from the config by
193
        loading the config and updating the live_context dictionary with the
194
        loaded_context and other predefined context dictionaries. It then goes
195
        through the loaded_context dictionary and infers the types of the values
196
        before returning the live_context dictionary.
197

198
        Args:
199
            fname (str, optional): The file name.
200
            config (dict, optional): The config dictionary.
201
            **kw: Additional keyword arguments.
202

203
        Returns:
204
            dict: The templating context.
205
        """
206
        # TODO: The config loading should be done outside the templater code. Here
207
        # is a silly place.
208
        if config:
2✔
209
            # This is now a nested section
210
            loaded_context = (
2✔
211
                config.get_section((self.templater_selector, self.name, "context"))
212
                or {}
213
            )
214
        else:
215
            loaded_context = {}
1✔
216
        live_context = {}
2✔
217
        live_context.update(self.default_context)
2✔
218
        live_context.update(loaded_context)
2✔
219
        live_context.update(self.override_context)
2✔
220

221
        # Infer types
222
        for k in loaded_context:
2✔
223
            live_context[k] = self.infer_type(live_context[k])
2✔
224
        return live_context
2✔
225

226
    @large_file_check
3✔
227
    def process(
        self, *, in_str: str, fname: str, config=None, formatter=None
    ) -> Tuple[Optional[TemplatedFile], List]:
        """Render a string with python format syntax into a TemplatedFile.

        All arguments are keyword-only, because different templaters may
        declare their `process` arguments in a different order. For
        example, a templater which can only read from a file would need:
            process(*, fname, in_str=None, config=None)

        Args:
            in_str (:obj:`str`): The input string.
            fname (:obj:`str`, optional): The filename of this string. This is
                mostly for loading config files at runtime.
            config (:obj:`FluffConfig`): A specific config to use for this
                templating operation. Only necessary for some templaters.
            formatter (:obj:`CallbackFormatter`): Optional object for output.
                Not used by this method.

        Returns:
            A tuple of the `TemplatedFile` and a list, which is always empty
            here.

        Raises:
            SQLTemplaterError: If the template references a key which is not
                present in the context.
        """
        context = self.get_context(fname=fname, config=config)

        def render_func(raw_str: str) -> str:
            """Format the string using the context captured above."""
            try:
                return raw_str.format(**context)
            except KeyError as err:
                raise SQLTemplaterError(
                    "Failure in Python templating: {}. Have you configured your "
                    "variables? https://docs.sqlfluff.com/en/stable/"
                    "configuration.html#templating-configuration".format(err)
                )

        raw_sliced, sliced_file, templated_str = self.slice_file(
            in_str,
            render_func=render_func,
            config=config,
        )
        templated_file = TemplatedFile(
            source_str=in_str,
            templated_str=templated_str,
            fname=fname,
            sliced_file=sliced_file,
            raw_sliced=raw_sliced,
        )
        return templated_file, []
278

279
    def slice_file(
        self, raw_str: str, render_func: Callable[[str], str], config=None, **kwargs
    ) -> Tuple[List[RawFileSlice], List[TemplatedFileSlice], str]:
        """Slice the file to determine regions where we can fix.

        Args:
            raw_str (str): The un-rendered source string.
            render_func (Callable): Called with `raw_str` to produce the
                templated string.
            config (optional): If given, its `unwrap_wrapped_queries` value
                (in the `templater` section, default True) is passed on to
                `_check_for_wrapped`.
            **kwargs: Not used by this method.

        Returns:
            A tuple of the raw slices, the templated slices and the final
            templated string.
        """
        templater_logger.info("Slicing File Template")
        templater_logger.debug("    Raw String: %r", raw_str)
        # Render the templated string.
        # NOTE: This seems excessive in this simple example, but for other templating
        # engines we need more control over the rendering so may need to call this
        # method more than once.
        templated_str = render_func(raw_str)
        templater_logger.debug("    Templated String: %r", templated_str)

        # Slice the raw file
        raw_sliced = list(self._slice_template(raw_str))
        templater_logger.debug("    Raw Sliced:")
        for position, element in enumerate(raw_sliced):
            templater_logger.debug("        %s: %r", position, element)

        # Find the literals
        literals = []
        for element in raw_sliced:
            if element.slice_type == "literal":
                literals.append(element.raw)
        templater_logger.debug("    Literals: %s", literals)

        # At most two passes: the second only happens if the wrapper check
        # handed back a different templated string on the first.
        for loop_idx in range(2):
            templater_logger.debug("    # Slice Loop %s", loop_idx)
            # Calculate occurrences
            raw_occurrences = self._substring_occurrences(raw_str, literals)
            templated_occurrences = self._substring_occurrences(templated_str, literals)
            templater_logger.debug(
                "    Occurrences: Raw: %s, Templated: %s",
                raw_occurrences,
                templated_occurrences,
            )

            # Split on invariants
            invariant_split = self._split_invariants(
                raw_sliced,
                literals,
                raw_occurrences,
                templated_occurrences,
                templated_str,
            )
            split_sliced = list(invariant_split)
            templater_logger.debug("    Split Sliced:")
            for position, element in enumerate(split_sliced):
                templater_logger.debug("        %s: %r", position, element)

            # Deal with uniques and coalesce the rest
            unique_split = self._split_uniques_coalesce_rest(
                split_sliced, raw_occurrences, templated_occurrences, templated_str
            )
            sliced_file = list(unique_split)
            templater_logger.debug("    Fully Sliced:")
            for position, element in enumerate(sliced_file):
                templater_logger.debug("        %s: %r", position, element)

            if config is None:
                unwrap_wrapped = True
            else:
                unwrap_wrapped = config.get(
                    "unwrap_wrapped_queries", section="templater", default=True
                )
            sliced_file, new_templated_str = self._check_for_wrapped(
                sliced_file, templated_str, unwrap_wrapped=unwrap_wrapped
            )
            if new_templated_str == templated_str:
                # If we didn't change it then we're done.
                break
            # If it's not equal, loop around
            templated_str = new_templated_str
        return raw_sliced, sliced_file, new_templated_str
2✔
352

353
    @classmethod
3✔
354
    def _check_for_wrapped(
3✔
355
        cls,
356
        slices: List[TemplatedFileSlice],
357
        templated_str: str,
358
        unwrap_wrapped: bool = True,
359
    ) -> Tuple[List[TemplatedFileSlice], str]:
360
        """Identify a wrapped query (e.g. dbt test) and handle it.
361

362
        If unwrap_wrapped is true, we trim the wrapping from the templated
363
        file.
364
        If unwrap_wrapped is false, we add a slice at start and end.
365
        """
366
        if not slices:
2✔
367
            # If there are no slices, return
368
            return slices, templated_str
1✔
369
        first_slice = slices[0]
2✔
370
        last_slice = slices[-1]
2✔
371

372
        if unwrap_wrapped:
2✔
373
            # If we're unwrapping, there is no need to edit the slices, but we do need
374
            # to trim the templated string. We should expect that the template will need
375
            # to be re-sliced but we should assume that the function calling this one
376
            # will deal with that eventuality.
377
            return (
2✔
378
                slices,
379
                templated_str[
380
                    first_slice.templated_slice.start : last_slice.templated_slice.stop
381
                ],
382
            )
383

384
        if (
1✔
385
            first_slice.source_slice.start == 0
386
            and first_slice.templated_slice.start != 0
387
        ):
388
            # This means that there is text at the start of the templated file which
389
            # doesn't exist in the raw file. Handle this by adding a templated slice
390
            # (though it's not really templated) between 0 and 0 in the raw, and 0 and
391
            # the current first slice start index in the templated.
392
            slices.insert(
1✔
393
                0,
394
                TemplatedFileSlice(
395
                    "templated",
396
                    slice(0, 0),
397
                    slice(0, first_slice.templated_slice.start),
398
                ),
399
            )
400
        if last_slice.templated_slice.stop != len(templated_str):
1✔
401
            # This means that there is text at the end of the templated file which
402
            # doesn't exist in the raw file. Handle this by adding a templated slice
403
            # beginning and ending at the end of the raw, and the current last slice
404
            # stop and file end in the templated.
405
            slices.append(
1✔
406
                TemplatedFileSlice(
407
                    "templated",
408
                    zero_slice(last_slice.source_slice.stop),
409
                    slice(last_slice.templated_slice.stop, len(templated_str)),
410
                )
411
            )
412
        return slices, templated_str
1✔
413

414
    @classmethod
3✔
415
    def _substring_occurrences(
        cls, in_str: str, substrings: Iterable[str]
    ) -> Dict[str, List[int]]:
        """Find every occurrence of the given substrings.

        Returns:
            A dict keyed by substring, where each value is the list of
            results from `findall` for that substring within `in_str`.
        """
        return {
            substring: list(findall(substring, in_str)) for substring in substrings
        }
2✔
423

424
    @staticmethod
3✔
425
    def _sorted_occurrence_tuples(
3✔
426
        occurrences: Dict[str, List[int]]
427
    ) -> List[Tuple[str, int]]:
428
        """Sort a dict of occurrences into a sorted list of tuples."""
429
        return sorted(
2✔
430
            ((raw, idx) for raw in occurrences.keys() for idx in occurrences[raw]),
431
            # Sort first by position, then by lexical (for stability)
432
            key=lambda x: (x[1], x[0]),
433
        )
434

435
    @classmethod
3✔
436
    def _slice_template(cls, in_str: str) -> Iterator[RawFileSlice]:
        """Slice a templated python string into token tuples.

        This uses Formatter() as per:
        https://docs.python.org/3/library/string.html#string.Formatter

        Args:
            in_str (str): The raw (un-rendered) format string.

        Yields:
            `RawFileSlice` elements of type "literal", "escaped" or
            "templated", each tagged with a running offset into the source.
        """
        fmt = Formatter()
        in_idx = 0
        for literal_text, field_name, format_spec, conversion in fmt.parse(in_str):
            if literal_text:
                escape_chars = cls._sorted_occurrence_tuples(
                    cls._substring_occurrences(literal_text, ["}", "{"])
                )
                idx = 0
                # Walk the braces from left to right. (Taking them from the
                # end of the sorted list would handle the right-most brace
                # first, which only gives the right answer when there is
                # at most one brace in the literal.)
                for brace, brace_pos in escape_chars:
                    # Is there a literal first?
                    if brace_pos > idx:
                        yield RawFileSlice(
                            literal_text[idx:brace_pos], "literal", in_idx
                        )
                        in_idx += brace_pos - idx
                    # Add the escaped
                    idx = brace_pos + len(brace)
                    # We double them here to make the raw
                    yield RawFileSlice(brace * 2, "escaped", in_idx)
                    # Will always be 2 in this case.
                    # This is because ALL escape sequences in the python formatter
                    # are two characters which reduce to one.
                    in_idx += 2
                # Deal with last one (if present)
                if literal_text[idx:]:
                    yield RawFileSlice(literal_text[idx:], "literal", in_idx)
                    in_idx += len(literal_text) - idx
            # Deal with fields
            if field_name:
                # Rebuild the text of the replacement field from its parts.
                constructed_token = "{{{field_name}{conv}{spec}}}".format(
                    field_name=field_name,
                    conv=f"!{conversion}" if conversion else "",
                    spec=f":{format_spec}" if format_spec else "",
                )
                yield RawFileSlice(constructed_token, "templated", in_idx)
                in_idx += len(constructed_token)
2✔
481

482
    @classmethod
3✔
483
    def _split_invariants(
3✔
484
        cls,
485
        raw_sliced: List[RawFileSlice],
486
        literals: List[str],
487
        raw_occurrences: Dict[str, List[int]],
488
        templated_occurrences: Dict[str, List[int]],
489
        templated_str: str,
490
    ) -> Iterator[IntermediateFileSlice]:
491
        """Split a sliced file on its invariant literals.
492

493
        We prioritise the _longest_ invariants first as they
494
        are more likely to the the anchors.
495
        """
496
        # Calculate invariants
497
        invariants = [
2✔
498
            literal
499
            for literal in literals
500
            if len(raw_occurrences[literal]) == 1
501
            and len(templated_occurrences[literal]) == 1
502
        ]
503
        # Work through the invariants and make sure they appear
504
        # in order.
505
        for linv in sorted(invariants, key=len, reverse=True):
2✔
506
            # Any invariants which have templated positions, relative
507
            # to source positions, which aren't in order, should be
508
            # ignored.
509

510
            # Is this one still relevant?
511
            if linv not in invariants:
2✔
512
                continue  # pragma: no cover
513

514
            source_pos, templ_pos = raw_occurrences[linv], templated_occurrences[linv]
2✔
515
            # Copy the list before iterating because we're going to edit it.
516
            for tinv in invariants.copy():
2✔
517
                if tinv != linv:
2✔
518
                    src_dir = source_pos > raw_occurrences[tinv]
2✔
519
                    tmp_dir = templ_pos > templated_occurrences[tinv]
2✔
520
                    # If it's not in the same direction in the source and template
521
                    # remove it.
522
                    if src_dir != tmp_dir:  # pragma: no cover
523
                        templater_logger.debug(
524
                            "          Invariant found out of order: %r", tinv
525
                        )
526
                        invariants.remove(tinv)
527

528
        # Set up some buffers
529
        buffer: List[RawFileSlice] = []
2✔
530
        idx: Optional[int] = None
2✔
531
        templ_idx = 0
2✔
532
        # Loop through
533
        for raw_file_slice in raw_sliced:
2✔
534
            if raw_file_slice.raw in invariants:
2✔
535
                if buffer:
2✔
536
                    yield IntermediateFileSlice(
2✔
537
                        "compound",
538
                        slice(idx, raw_file_slice.source_idx),
539
                        slice(templ_idx, templated_occurrences[raw_file_slice.raw][0]),
540
                        buffer,
541
                    )
542
                buffer = []
2✔
543
                idx = None
2✔
544
                yield IntermediateFileSlice(
2✔
545
                    "invariant",
546
                    offset_slice(
547
                        raw_file_slice.source_idx,
548
                        len(raw_file_slice.raw),
549
                    ),
550
                    offset_slice(
551
                        templated_occurrences[raw_file_slice.raw][0],
552
                        len(raw_file_slice.raw),
553
                    ),
554
                    [
555
                        RawFileSlice(
556
                            raw_file_slice.raw,
557
                            raw_file_slice.slice_type,
558
                            templated_occurrences[raw_file_slice.raw][0],
559
                        )
560
                    ],
561
                )
562
                templ_idx = templated_occurrences[raw_file_slice.raw][0] + len(
2✔
563
                    raw_file_slice.raw
564
                )
565
            else:
566
                buffer.append(
2✔
567
                    RawFileSlice(
568
                        raw_file_slice.raw,
569
                        raw_file_slice.slice_type,
570
                        raw_file_slice.source_idx,
571
                    )
572
                )
573
                if idx is None:
2✔
574
                    idx = raw_file_slice.source_idx
2✔
575
        # If we have a final buffer, yield it
576
        if buffer:
2✔
577
            yield IntermediateFileSlice(
1✔
578
                "compound",
579
                slice((idx or 0), (idx or 0) + sum(len(slc.raw) for slc in buffer)),
580
                slice(templ_idx, len(templated_str)),
581
                buffer,
582
            )
583

584
    @staticmethod
3✔
585
    def _filter_occurrences(
3✔
586
        file_slice: slice, occurrences: Dict[str, List[int]]
587
    ) -> Dict[str, List[int]]:
588
        """Filter a dict of occurrences to just those within a slice."""
589
        filtered = {
1✔
590
            key: [
591
                pos
592
                for pos in occurrences[key]
593
                if pos >= file_slice.start and pos < file_slice.stop
594
            ]
595
            for key in occurrences.keys()
596
        }
597
        return {key: filtered[key] for key in filtered.keys() if filtered[key]}
1✔
598

599
    @staticmethod
3✔
600
    def _coalesce_types(elems: List[RawFileSlice]) -> str:
3✔
601
        """Coalesce to the priority type."""
602
        # Make a set of types
603
        types = {elem.slice_type for elem in elems}
1✔
604
        # Replace block types with templated
605
        for typ in list(types):
1✔
606
            if typ.startswith("block_"):  # pragma: no cover
607
                types.remove(typ)
608
                types.add("templated")
609
        # Take the easy route if they're all the same type
610
        if len(types) == 1:
1✔
611
            return types.pop()
1✔
612
        # Then deal with priority
613
        priority = ["templated", "escaped", "literal"]
1✔
614
        for p in priority:
1✔
615
            if p in types:
1✔
616
                return p
1✔
617
        raise RuntimeError(
618
            f"Exhausted priorities in _coalesce_types! {types!r}"
619
        )  # pragma: no cover
620

621
    @classmethod
3✔
622
    def _split_uniques_coalesce_rest(
3✔
623
        cls,
624
        split_file: List[IntermediateFileSlice],
625
        raw_occurrences: Dict[str, List[int]],
626
        templ_occurrences: Dict[str, List[int]],
627
        templated_str: str,
628
    ) -> Iterator[TemplatedFileSlice]:
629
        """Within each of the compound sections split on unique literals.
630

631
        For everything else we coalesce to the dominant type.
632

633
        Returns:
634
            Iterable of the type of segment, the slice in the raw file
635
                and the slice in the templated file.
636

637
        """
638
        # A buffer to capture tail segments
639
        tail_buffer: List[TemplatedFileSlice] = []
2✔
640

641
        templater_logger.debug("    _split_uniques_coalesce_rest: %s", split_file)
2✔
642

643
        for int_file_slice in split_file:
2✔
644
            # Yield anything from the tail buffer
645
            if tail_buffer:  # pragma: no cover
646
                templater_logger.debug(
647
                    "        Yielding Tail Buffer [start]: %s", tail_buffer
648
                )
649
                yield from tail_buffer
650
                tail_buffer = []
651

652
            # Check whether we're handling a zero length slice.
653
            if (
654
                int_file_slice.templated_slice.stop
655
                - int_file_slice.templated_slice.start
656
                == 0
657
            ):  # pragma: no cover
658
                point_combo = int_file_slice.coalesce()
659
                templater_logger.debug(
660
                    "        Yielding Point Combination: %s", point_combo
661
                )
662
                yield point_combo
663
                continue
664

665
            # Yield anything simple
666
            try:
2✔
667
                simple_elem = int_file_slice.try_simple()
2✔
668
                templater_logger.debug("        Yielding Simple: %s", simple_elem)
2✔
669
                yield simple_elem
2✔
670
                continue
2✔
671
            except ValueError:
1✔
672
                pass
1✔
673

674
            # Trim ends and overwrite the current working copy.
675
            head_buffer, int_file_slice, tail_buffer = int_file_slice.trim_ends(
1✔
676
                templated_str=templated_str
677
            )
678
            if head_buffer:
1✔
679
                yield from head_buffer  # pragma: no cover
680
            # Have we consumed the whole thing?
681
            if not int_file_slice.slice_buffer:
1✔
682
                continue  # pragma: no cover
683

684
            # Try to yield simply again (post trim)
685
            try:  # pragma: no cover
686
                simple_elem = int_file_slice.try_simple()
687
                templater_logger.debug("        Yielding Simple: %s", simple_elem)
688
                yield simple_elem
689
                continue
690
            except ValueError:
1✔
691
                pass
1✔
692

693
            templater_logger.debug("        Intermediate Slice: %s", int_file_slice)
1✔
694
            # Generate the coalesced version in case we need it
695
            coalesced = int_file_slice.coalesce()
1✔
696

697
            # Look for anchors
698
            raw_occs = cls._filter_occurrences(
1✔
699
                int_file_slice.source_slice, raw_occurrences
700
            )
701
            templ_occs = cls._filter_occurrences(
1✔
702
                int_file_slice.templated_slice, templ_occurrences
703
            )
704
            # Do we have any uniques to split on?
705
            # NB: We use `get` on the templated occurrences, because it's possible
706
            # that because of an if statement, something is in the source, but
707
            # not in the templated at all. In that case, we shouldn't use it.
708
            one_way_uniques = [
1✔
709
                key
710
                for key in raw_occs.keys()
711
                if len(raw_occs[key]) == 1 and len(templ_occs.get(key, [])) >= 1
712
            ]
713
            two_way_uniques = [
1✔
714
                key for key in one_way_uniques if len(templ_occs[key]) == 1
715
            ]
716
            # if we don't have anything to anchor on, then just return (coalescing
717
            # types)
718
            if not raw_occs or not templ_occs or not one_way_uniques:
1✔
719
                templater_logger.debug(
1✔
720
                    "        No Anchors or Uniques. Yielding Whole: %s", coalesced
721
                )
722
                yield coalesced
1✔
723
                continue
1✔
724

725
            # Deal with the inner segment itself.
726
            templater_logger.debug(
1✔
727
                "        Intermediate Slice [post trim]: %s: %r",
728
                int_file_slice,
729
                templated_str[int_file_slice.templated_slice],
730
            )
731
            templater_logger.debug("        One Way Uniques: %s", one_way_uniques)
1✔
732
            templater_logger.debug("        Two Way Uniques: %s", two_way_uniques)
1✔
733

734
            # Hang onto the starting position, which we'll advance as we go.
735
            starts = (
1✔
736
                int_file_slice.source_slice.start,
737
                int_file_slice.templated_slice.start,
738
            )
739

740
            # Deal with two way uniques first, because they are easier.
741
            # If we do find any we use recursion, because we'll want to do
742
            # all of the above checks again.
743
            if two_way_uniques:
1✔
744
                # Yield the uniques and coalesce anything between.
745
                bookmark_idx = 0
1✔
746
                for idx, raw_slice in enumerate(int_file_slice.slice_buffer):
1✔
747
                    pos = 0
1✔
748
                    unq: Optional[str] = None
1✔
749
                    # Does this element contain one of our uniques? If so, where?
750
                    for unique in two_way_uniques:
1✔
751
                        if unique in raw_slice.raw:
1✔
752
                            pos = raw_slice.raw.index(unique)
1✔
753
                            unq = unique
1✔
754

755
                    if unq:
1✔
756
                        # Yes it does. Handle it.
757

758
                        # Get the position of the unique section.
759
                        unique_position = (
1✔
760
                            raw_occs[unq][0],
761
                            templ_occs[unq][0],
762
                        )
763
                        templater_logger.debug(
1✔
764
                            "            Handling Unique: %r, %s, %s, %r",
765
                            unq,
766
                            pos,
767
                            unique_position,
768
                            raw_slice,
769
                        )
770

771
                        # Handle full slices up to this one
772
                        if idx > bookmark_idx:
1✔
773
                            # Recurse to deal with any loops separately
774
                            yield from cls._split_uniques_coalesce_rest(
1✔
775
                                [
776
                                    IntermediateFileSlice(
777
                                        "compound",
778
                                        # slice up to this unique
779
                                        slice(starts[0], unique_position[0] - pos),
780
                                        slice(starts[1], unique_position[1] - pos),
781
                                        int_file_slice.slice_buffer[bookmark_idx:idx],
782
                                    )
783
                                ],
784
                                raw_occs,
785
                                templ_occs,
786
                                templated_str,
787
                            )
788

789
                        # Handle any potential partial slice if we're part way through
790
                        # this one.
791
                        if pos > 0:
1✔
792
                            yield TemplatedFileSlice(
1✔
793
                                raw_slice.slice_type,
794
                                slice(unique_position[0] - pos, unique_position[0]),
795
                                slice(unique_position[1] - pos, unique_position[1]),
796
                            )
797

798
                        # Handle the unique itself and update the bookmark
799
                        starts = (
1✔
800
                            unique_position[0] + len(unq),
801
                            unique_position[1] + len(unq),
802
                        )
803
                        yield TemplatedFileSlice(
1✔
804
                            raw_slice.slice_type,
805
                            slice(unique_position[0], starts[0]),
806
                            slice(unique_position[1], starts[1]),
807
                        )
808
                        # Move the bookmark after this position
809
                        bookmark_idx = idx + 1
1✔
810

811
                        # Handle any remnant after the unique.
812
                        if raw_slice.raw[pos + len(unq) :]:
1✔
813
                            remnant_length = len(raw_slice.raw) - (len(unq) + pos)
1✔
814
                            _starts = starts
1✔
815
                            starts = (
1✔
816
                                starts[0] + remnant_length,
817
                                starts[1] + remnant_length,
818
                            )
819
                            yield TemplatedFileSlice(
1✔
820
                                raw_slice.slice_type,
821
                                slice(_starts[0], starts[0]),
822
                                slice(_starts[1], starts[1]),
823
                            )
824

825
                if bookmark_idx == 0:  # pragma: no cover
826
                    # This is a SAFETY VALVE. In Theory we should never be here
827
                    # and if we are it implies an error elsewhere. This clause
828
                    # should stop any potential infinite recursion in its tracks
829
                    # by simply classifying the whole of the current block as
830
                    # templated and just stopping here.
831
                    # Bugs triggering this eventuality have been observed in 0.4.0.
832
                    templater_logger.info(
833
                        "        Safety Value Info: %s, %r",
834
                        two_way_uniques,
835
                        templated_str[int_file_slice.templated_slice],
836
                    )
837
                    templater_logger.warning(
838
                        "        Python templater safety value unexpectedly triggered. "
839
                        "Please report your raw and compiled query on github for "
840
                        "debugging."
841
                    )
842
                    # NOTE: If a bug is reported here, this will incorrectly
843
                    # classify more of the query as "templated" than it should.
844
                    yield coalesced
845
                    continue
846

847
                # At the end of the loop deal with any remaining slices.
848
                # The above "Safety Valve"TM should keep us safe from infinite
849
                # recursion.
850
                if len(int_file_slice.slice_buffer) > bookmark_idx:
1✔
851
                    # Recurse to deal with any loops separately
852
                    yield from cls._split_uniques_coalesce_rest(
1✔
853
                        [
854
                            IntermediateFileSlice(
855
                                "compound",
856
                                # Slicing is easy here, we have no choice
857
                                slice(starts[0], int_file_slice.source_slice.stop),
858
                                slice(starts[1], int_file_slice.templated_slice.stop),
859
                                # Calculate the subsection to deal with.
860
                                int_file_slice.slice_buffer[
861
                                    bookmark_idx : len(int_file_slice.slice_buffer)
862
                                ],
863
                            )
864
                        ],
865
                        raw_occs,
866
                        templ_occs,
867
                        templated_str,
868
                    )
869
                # We continue here because the buffer should be exhausted,
870
                # and if there's more to do we'll do it in the recursion.
871
                continue
1✔
872

873
            # If we get here, then there ARE uniques, but they are only ONE WAY.
874
            # This means loops. Loops are tricky.
875
            # We're very unlikely to get here (impossible?) with just python
876
            # formatting, but this class is also the base for the jinja templater
877
            # (and others?) so it may be used there.
878
            # One way uniques give us landmarks to try and estimate what to do with
879
            # them.
880
            owu_templ_tuples = cls._sorted_occurrence_tuples(  # pragma: no cover
881
                {key: templ_occs[key] for key in one_way_uniques}
882
            )
883

884
            templater_logger.debug(  # pragma: no cover
885
                "        Handling One Way Uniques: %s", owu_templ_tuples
886
            )
887

888
            # Hang onto our *ending* position too from here.
889
            stops = (  # pragma: no cover
890
                int_file_slice.source_slice.stop,
891
                int_file_slice.templated_slice.stop,
892
            )
893

894
            # OWU in this context refers to "One Way Unique"
895
            this_owu_idx: Optional[int] = None  # pragma: no cover
896
            last_owu_idx: Optional[int] = None  # pragma: no cover
897
            # Iterate through occurrence tuples of the one-way uniques.
898
            for raw, template_idx in owu_templ_tuples:  # pragma: no cover
899
                raw_idx = raw_occs[raw][0]
900
                raw_len = len(raw)
901

902
                # Find the index of this owu in the slice_buffer, store the previous
903
                last_owu_idx = this_owu_idx
904
                try:
905
                    this_owu_idx = next(
906
                        idx
907
                        for idx, slc in enumerate(int_file_slice.slice_buffer)
908
                        if slc.raw == raw
909
                    )
910
                except StopIteration:  # pragma: no cover
911
                    # This can happen if the unique was detected, but was introduced
912
                    # by a templater step. This is a false positive. Skip and move on.
913
                    templater_logger.info(
914
                        "One Way Unique %r not found in slice buffer. Skipping...", raw
915
                    )
916
                    continue
917

918
                templater_logger.debug(
919
                    "        Handling OWU: %r @%s (raw @%s) [this_owu_idx: %s, "
920
                    "last_owu_dx: %s]",
921
                    raw,
922
                    template_idx,
923
                    raw_idx,
924
                    this_owu_idx,
925
                    last_owu_idx,
926
                )
927

928
                if template_idx > starts[1]:
929
                    # Yield the bit before this literal. We yield it
930
                    # all as a tuple, because if we could do any better
931
                    # we would have done it by now.
932

933
                    # Can we identify a meaningful portion of the patch
934
                    # to recurse a split?
935
                    sub_section: Optional[List[RawFileSlice]] = None
936
                    # If it's the start, the slicing is easy
937
                    if (
938
                        starts[1] == int_file_slice.templated_slice.stop
939
                    ):  # pragma: no cover TODO?
940
                        sub_section = int_file_slice.slice_buffer[:this_owu_idx]
941
                    # If we are AFTER the previous in the template, then it's
942
                    # also easy. [assuming it's not the same owu]
943
                    elif (
944
                        raw_idx > starts[0] and last_owu_idx != this_owu_idx
945
                    ):  # pragma: no cover
946
                        if last_owu_idx:
947
                            sub_section = int_file_slice.slice_buffer[
948
                                last_owu_idx + 1 : this_owu_idx
949
                            ]
950
                        else:
951
                            sub_section = int_file_slice.slice_buffer[:this_owu_idx]
952

953
                    # If we succeeded in one of the above, we can also recurse
954
                    # and be more intelligent with the other sections.
955
                    if sub_section:
956
                        templater_logger.debug(
957
                            "        Attempting Subsplit [pre]: %s, %r",
958
                            sub_section,
959
                            templated_str[slice(starts[1], template_idx)],
960
                        )
961
                        yield from cls._split_uniques_coalesce_rest(
962
                            [
963
                                IntermediateFileSlice(
964
                                    "compound",
965
                                    # Slicing is easy here, we have no choice
966
                                    slice(starts[0], raw_idx),
967
                                    slice(starts[1], template_idx),
968
                                    sub_section,
969
                                )
970
                            ],
971
                            raw_occs,
972
                            templ_occs,
973
                            templated_str,
974
                        )
975
                    # Otherwise, it's the tricky case.
976
                    else:
977
                        # In this case we've found a literal, coming AFTER another
978
                        # in the templated version, but BEFORE (or the same) in the
979
                        # raw version. This only happens during loops, but it means
980
                        # that identifying exactly what the intervening bit refers
981
                        # to is a bit arbitrary. In this case we're going to OVER
982
                        # estimate and refer to the whole loop segment.
983

984
                        # TODO: Maybe this should make two chunks instead, one
985
                        # working backward, and one working forward. But that's
986
                        # a job for another day.
987

988
                        # First find where we are starting this remainder
989
                        # in the template (as an index in the buffer).
990
                        # Any segments *after* cur_idx are involved.
991
                        if last_owu_idx is None or last_owu_idx + 1 >= len(
992
                            int_file_slice.slice_buffer
993
                        ):
994
                            cur_idx = 0
995
                        else:
996
                            cur_idx = last_owu_idx + 1
×
997

998
                        # We need to know how many block_ends are after this.
999
                        block_ends = sum(
×
1000
                            slc.slice_type == "block_end"
1001
                            for slc in int_file_slice.slice_buffer[cur_idx:]
1002
                        )
1003
                        # We can allow up to this number of preceding block starts
1004
                        block_start_indices = [
×
1005
                            idx
1006
                            for idx, slc in enumerate(
1007
                                int_file_slice.slice_buffer[:cur_idx]
1008
                            )
1009
                            if slc.slice_type == "block_start"
1010
                        ]
1011

1012
                        # Trim anything which we're not allowed to use.
1013
                        if len(block_start_indices) > block_ends:  # pragma: no cover
1014
                            offset = block_start_indices[-1 - block_ends] + 1
1015
                            elem_sub_buffer = int_file_slice.slice_buffer[offset:]
1016
                            cur_idx -= offset
1017
                        else:
1018
                            elem_sub_buffer = int_file_slice.slice_buffer
×
1019

1020
                        # We also need to know whether any of the *starting*
1021
                        # segments are involved.
1022
                        # Anything up to start_idx (exclusive) is included.
1023
                        include_start = raw_idx > elem_sub_buffer[0].source_idx
×
1024

1025
                        # The ending point of this slice is already decided.
1026
                        end_point = elem_sub_buffer[-1].end_source_idx()
×
1027

1028
                        # If start_idx is None, we're in luck. We don't need to include
1029
                        # the beginning.
1030
                        if include_start:
×
1031
                            start_point = elem_sub_buffer[0].source_idx
×
1032
                        # Otherwise we know it's looped round, we need to include the
1033
                        # whole slice.
1034
                        else:  # pragma: no cover
1035
                            start_point = elem_sub_buffer[cur_idx].source_idx
1036

1037
                        tricky = TemplatedFileSlice(
×
1038
                            "templated",
1039
                            slice(start_point, end_point),
1040
                            slice(starts[1], template_idx),
1041
                        )
1042

1043
                        templater_logger.debug(
×
1044
                            "        Yielding Tricky Case : %s",
1045
                            tricky,
1046
                        )
1047

1048
                        yield tricky
×
1049

1050
                # Yield the literal
1051
                owu_literal_slice = TemplatedFileSlice(
×
1052
                    "literal",
1053
                    offset_slice(raw_idx, raw_len),
1054
                    offset_slice(template_idx, raw_len),
1055
                )
1056
                templater_logger.debug(
×
1057
                    "    Yielding Unique: %r, %s",
1058
                    raw,
1059
                    owu_literal_slice,
1060
                )
1061
                yield owu_literal_slice
×
1062
                # Update our bookmark
1063
                starts = (
×
1064
                    raw_idx + raw_len,
1065
                    template_idx + raw_len,
1066
                )
1067

1068
            if starts[1] < stops[1] and last_owu_idx is not None:  # pragma: no cover
1069
                # Yield the end bit
1070
                templater_logger.debug("        Attempting Subsplit [post].")
1071
                yield from cls._split_uniques_coalesce_rest(
1072
                    [
1073
                        IntermediateFileSlice(
1074
                            "compound",
1075
                            # Slicing is easy here, we have no choice
1076
                            slice(raw_idx + raw_len, stops[0]),
1077
                            slice(starts[1], stops[1]),
1078
                            int_file_slice.slice_buffer[last_owu_idx + 1 :],
1079
                        )
1080
                    ],
1081
                    raw_occs,
1082
                    templ_occs,
1083
                    templated_str,
1084
                )
1085

1086
        # Yield anything from the tail buffer
1087
        if tail_buffer:  # pragma: no cover
1088
            templater_logger.debug(
1089
                "        Yielding Tail Buffer [end]: %s", tail_buffer
1090
            )
1091
            yield from tail_buffer
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc