• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 8ac704ce-924d-4415-96d0-a7a53cd460d1

pending completion
8ac704ce-924d-4415-96d0-a7a53cd460d1

Pull #566

circle-ci

xzdandy
Merge branch 'obj-tracking' of github.com:georgia-tech-db/eva into obj-tracking
Pull Request #566: feat: object tracking

155 of 155 new or added lines in 16 files covered. (100.0%)

9371 of 9588 relevant lines covered (97.74%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.83
/eva/optimizer/rules/rules.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from __future__ import annotations
1✔
16

17
from typing import TYPE_CHECKING
1✔
18

19
from eva.catalog.catalog_manager import CatalogManager
1✔
20
from eva.catalog.catalog_type import TableType
1✔
21
from eva.catalog.catalog_utils import is_video_table
1✔
22
from eva.constants import CACHEABLE_UDFS
1✔
23
from eva.expression.expression_utils import (
1✔
24
    conjunction_list_to_expression_tree,
25
    to_conjunction_list,
26
)
27
from eva.expression.function_expression import FunctionExpression
1✔
28
from eva.expression.tuple_value_expression import TupleValueExpression
1✔
29
from eva.optimizer.optimizer_utils import (
1✔
30
    check_expr_validity_for_cache,
31
    enable_cache,
32
    enable_cache_on_expression_tree,
33
    extract_equi_join_keys,
34
    extract_pushdown_predicate,
35
    extract_pushdown_predicate_for_alias,
36
    get_expression_execution_cost,
37
)
38
from eva.optimizer.rules.pattern import Pattern
1✔
39
from eva.optimizer.rules.rules_base import Promise, Rule, RuleType
1✔
40
from eva.parser.types import JoinType, ParserOrderBySortType
1✔
41
from eva.plan_nodes.apply_and_merge_plan import ApplyAndMergePlan
1✔
42
from eva.plan_nodes.create_mat_view_plan import CreateMaterializedViewPlan
1✔
43
from eva.plan_nodes.explain_plan import ExplainPlan
1✔
44
from eva.plan_nodes.hash_join_build_plan import HashJoinBuildPlan
1✔
45
from eva.plan_nodes.nested_loop_join_plan import NestedLoopJoinPlan
1✔
46
from eva.plan_nodes.predicate_plan import PredicatePlan
1✔
47
from eva.plan_nodes.project_plan import ProjectPlan
1✔
48
from eva.plan_nodes.show_info_plan import ShowInfoPlan
1✔
49

50
if TYPE_CHECKING:
51
    from eva.optimizer.optimizer_context import OptimizerContext
52

53
from eva.optimizer.operators import (
1✔
54
    Dummy,
55
    LogicalApplyAndMerge,
56
    LogicalCreate,
57
    LogicalCreateIndex,
58
    LogicalCreateMaterializedView,
59
    LogicalCreateUDF,
60
    LogicalDelete,
61
    LogicalDrop,
62
    LogicalDropUDF,
63
    LogicalExplain,
64
    LogicalExtractObject,
65
    LogicalFaissIndexScan,
66
    LogicalFilter,
67
    LogicalFunctionScan,
68
    LogicalGet,
69
    LogicalGroupBy,
70
    LogicalInsert,
71
    LogicalJoin,
72
    LogicalLimit,
73
    LogicalLoadData,
74
    LogicalOrderBy,
75
    LogicalProject,
76
    LogicalQueryDerivedGet,
77
    LogicalRename,
78
    LogicalSample,
79
    LogicalShow,
80
    LogicalUnion,
81
    Operator,
82
    OperatorType,
83
)
84
from eva.plan_nodes.create_index_plan import CreateIndexPlan
1✔
85
from eva.plan_nodes.create_plan import CreatePlan
1✔
86
from eva.plan_nodes.create_udf_plan import CreateUDFPlan
1✔
87
from eva.plan_nodes.delete_plan import DeletePlan
1✔
88
from eva.plan_nodes.drop_plan import DropPlan
1✔
89
from eva.plan_nodes.drop_udf_plan import DropUDFPlan
1✔
90
from eva.plan_nodes.faiss_index_scan_plan import FaissIndexScanPlan
1✔
91
from eva.plan_nodes.function_scan_plan import FunctionScanPlan
1✔
92
from eva.plan_nodes.groupby_plan import GroupByPlan
1✔
93
from eva.plan_nodes.hash_join_probe_plan import HashJoinProbePlan
1✔
94
from eva.plan_nodes.insert_plan import InsertPlan
1✔
95
from eva.plan_nodes.lateral_join_plan import LateralJoinPlan
1✔
96
from eva.plan_nodes.limit_plan import LimitPlan
1✔
97
from eva.plan_nodes.load_data_plan import LoadDataPlan
1✔
98
from eva.plan_nodes.orderby_plan import OrderByPlan
1✔
99
from eva.plan_nodes.rename_plan import RenamePlan
1✔
100
from eva.plan_nodes.seq_scan_plan import SeqScanPlan
1✔
101
from eva.plan_nodes.storage_plan import StoragePlan
1✔
102
from eva.plan_nodes.union_plan import UnionPlan
1✔
103

104
##############################################
105
# REWRITE RULES START
106

107

108
class EmbedFilterIntoGet(Rule):
    """Push the supported part of a filter predicate into the LogicalGet below it.

    Matches ``LogicalFilter -> LogicalGet``. Only video tables are eligible, and
    only the part of the predicate on the ``<video alias>.id`` column that
    ``extract_pushdown_predicate`` can extract is embedded into the get. Any
    remaining conjuncts stay in a LogicalFilter placed on top of the new get.
    """

    def __init__(self):
        filter_pattern = Pattern(OperatorType.LOGICALFILTER)
        filter_pattern.append_child(Pattern(OperatorType.LOGICALGET))
        super().__init__(RuleType.EMBED_FILTER_INTO_GET, filter_pattern)

    def promise(self):
        return Promise.EMBED_FILTER_INTO_GET

    def check(self, before: LogicalFilter, context: OptimizerContext):
        get_opr: LogicalGet = before.children[0]
        # Predicate pushdown is only supported while reading video data.
        if not before.predicate or not is_video_table(get_opr.table_obj):
            return False
        # Only basic range predicates on the id column can be pushed.
        id_column = f"{get_opr.video.alias}.id"
        supported, _ = extract_pushdown_predicate(before.predicate, id_column)
        return bool(supported)

    def apply(self, before: LogicalFilter, context: OptimizerContext):
        get_opr = before.children[0]
        id_column = f"{get_opr.video.alias}.id"
        supported, remaining = extract_pushdown_predicate(
            before.predicate, id_column
        )
        if not supported:
            # Nothing can be embedded; ``check`` normally rules this case out.
            yield before
            return

        result = LogicalGet(
            get_opr.video,
            get_opr.table_obj,
            alias=get_opr.alias,
            predicate=supported,
            target_list=get_opr.target_list,
            sampling_rate=get_opr.sampling_rate,
            sampling_type=get_opr.sampling_type,
            children=get_opr.children,
        )
        if remaining:
            # Keep the conjuncts the get cannot evaluate in a filter above it.
            residual_filter = LogicalFilter(remaining)
            residual_filter.append_child(result)
            result = residual_filter
        yield result
157

158

159
class EmbedSampleIntoGet(Rule):
    """Fold a LogicalSample into the LogicalGet directly beneath it.

    Matches ``LogicalSample -> LogicalGet``. For video tables it produces a
    single LogicalGet that carries the sample frequency and optional sample
    type, with every other field copied from the original get.
    """

    def __init__(self):
        sample_pattern = Pattern(OperatorType.LOGICALSAMPLE)
        sample_pattern.append_child(Pattern(OperatorType.LOGICALGET))
        super().__init__(RuleType.EMBED_SAMPLE_INTO_GET, sample_pattern)

    def promise(self):
        return Promise.EMBED_SAMPLE_INTO_GET

    def check(self, before: LogicalSample, context: OptimizerContext):
        # Sample pushdown is only supported while reading video data.
        get_opr: LogicalGet = before.children[0]
        if get_opr.table_obj.table_type != TableType.VIDEO_DATA:
            return False
        return True

    def apply(self, before: LogicalSample, context: OptimizerContext):
        sampling_rate = before.sample_freq.value
        # The sample type is optional; unwrap its nested value when present.
        if before.sample_type:
            sampling_type = before.sample_type.value.value
        else:
            sampling_type = None
        get_opr: LogicalGet = before.children[0]
        yield LogicalGet(
            get_opr.video,
            get_opr.table_obj,
            alias=get_opr.alias,
            predicate=get_opr.predicate,
            target_list=get_opr.target_list,
            sampling_rate=sampling_rate,
            sampling_type=sampling_type,
            children=get_opr.children,
        )
190

191

192
class CacheFunctionExpressionInProject(Rule):
    """Enable caching for eligible function expressions in a projection.

    Matches ``LogicalProject -> <any>``. The rule applies when some top-level
    FunctionExpression in the target list contains at least one
    FunctionExpression accepted by ``check_expr_validity_for_cache``.
    """

    def __init__(self):
        project_pattern = Pattern(OperatorType.LOGICALPROJECT)
        project_pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(
            RuleType.CACHE_FUNCTION_EXPRESISON_IN_PROJECT, project_pattern
        )

    def promise(self):
        return Promise.CACHE_FUNCTION_EXPRESISON_IN_PROJECT

    def check(self, before: LogicalProject, context: OptimizerContext):
        num_cacheable = 0
        for target in before.target_list:
            # Only top-level function expressions are inspected.
            if not isinstance(target, FunctionExpression):
                continue
            candidates = list(target.find_all(FunctionExpression))
            num_cacheable += sum(
                1
                for candidate in candidates
                if check_expr_validity_for_cache(candidate)
            )
        return num_cacheable > 0

    def apply(self, before: LogicalProject, context: OptimizerContext):
        # Work on copies of the target expressions: copy all of them first,
        # then enable caching on each copy.
        cached_targets = [target.copy() for target in before.target_list]
        for target in cached_targets:
            enable_cache_on_expression_tree(target)
        yield LogicalProject(target_list=cached_targets, children=before.children)
220

221

222
class CacheFunctionExpressionInFilter(Rule):
    """Enable caching for eligible function expressions in a filter predicate.

    Matches ``LogicalFilter -> <any>``. The rule applies when at least one
    FunctionExpression in the predicate passes ``check_expr_validity_for_cache``.
    """

    def __init__(self):
        pat = Pattern(OperatorType.LOGICALFILTER)
        pat.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.CACHE_FUNCTION_EXPRESISON_IN_FILTER, pat)

    def promise(self):
        return Promise.CACHE_FUNCTION_EXPRESISON_IN_FILTER

    def check(self, before: LogicalFilter, context: OptimizerContext):
        candidates = list(before.predicate.find_all(FunctionExpression))
        cacheable = [c for c in candidates if check_expr_validity_for_cache(c)]
        return bool(cacheable)

    def apply(self, before: LogicalFilter, context: OptimizerContext):
        # With n eligible function expressions there are 2^n enable/disable
        # combinations. Only the variant that enables caching for every
        # eligible expression is generated for now.
        cached_predicate = before.predicate.copy()
        enable_cache_on_expression_tree(cached_predicate)
        yield LogicalFilter(predicate=cached_predicate, children=before.children)
252

253

254
class CacheFunctionExpressionInApply(Rule):
    """Enable result caching for the UDF evaluated by a LogicalApplyAndMerge.

    Matches ``LogicalApplyAndMerge -> <any>``. The UDF must be listed in
    ``CACHEABLE_UDFS`` and must not already have a cache. Its call must also
    take exactly one argument, and that argument must be a
    TupleValueExpression.
    """

    def __init__(self):
        pattern = Pattern(OperatorType.LOGICAL_APPLY_AND_MERGE)
        pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.CACHE_FUNCTION_EXPRESISON_IN_APPLY, pattern)

    def promise(self):
        return Promise.CACHE_FUNCTION_EXPRESISON_IN_APPLY

    def check(self, before: LogicalApplyAndMerge, context: OptimizerContext):
        expr = before.func_expr
        # Skip expressions that already have a cache and UDFs not known to be
        # cacheable. TODO: replace the CACHEABLE_UDFS lookup once cacheability
        # is a property of the UDF itself.
        if expr.has_cache() or expr.name not in CACHEABLE_UDFS:
            return False
        # Caching is supported only for single-argument calls over a
        # TupleValueExpression. Calls with several arguments or with nested
        # function expressions are rejected. Testing ``!= 1`` (rather than
        # ``> 1``) also rejects zero-argument calls instead of raising
        # IndexError on ``expr.children[0]``.
        if len(expr.children) != 1 or not isinstance(
            expr.children[0], TupleValueExpression
        ):
            return False
        return True

    def apply(self, before: LogicalApplyAndMerge, context: OptimizerContext):
        # TODO: this creates a catalog entry even for EXPLAIN commands; it
        # should run conditionally.
        new_func_expr = enable_cache(before.func_expr)
        after = LogicalApplyAndMerge(
            func_expr=new_func_expr, alias=before.alias, do_unnest=before.do_unnest
        )
        after.append_child(before.children[0])
        yield after
285

286

287
# Join Queries
288
class PushDownFilterThroughJoin(Rule):
    """Push single-side conjuncts of a filter below the join it sits on.

    Matches ``LogicalFilter -> LogicalJoin(<any>, <any>)``. Conjuncts that
    reference only the aliases of the left input become a LogicalFilter on that
    input. The same is done for the right input. Whatever remains is AND-ed into
    the join predicate of the new join node.
    """

    def __init__(self):
        pattern = Pattern(OperatorType.LOGICALFILTER)
        pattern_join = Pattern(OperatorType.LOGICALJOIN)
        pattern_join.append_child(Pattern(OperatorType.DUMMY))
        pattern_join.append_child(Pattern(OperatorType.DUMMY))
        pattern.append_child(pattern_join)
        super().__init__(RuleType.PUSHDOWN_FILTER_THROUGH_JOIN, pattern)

    def promise(self):
        return Promise.PUSHDOWN_FILTER_THROUGH_JOIN

    def check(self, before: Operator, context: OptimizerContext):
        return True

    @staticmethod
    def _filtered(child, predicate):
        """Return ``child`` under a new LogicalFilter on ``predicate``, or
        ``child`` itself when there is no predicate to push."""
        if not predicate:
            return child
        filter_node = LogicalFilter(predicate=predicate)
        filter_node.append_child(child)
        return filter_node

    def apply(self, before: LogicalFilter, context: OptimizerContext):
        predicate = before.predicate
        join: LogicalJoin = before.children[0]
        left: Dummy = join.children[0]
        right: Dummy = join.children[1]

        left_group_aliases = context.memo.get_group_by_id(left.group_id).aliases
        right_group_aliases = context.memo.get_group_by_id(right.group_id).aliases

        # Peel off the conjuncts that touch only the left, then only the right.
        left_pushdown_pred, rem_pred = extract_pushdown_predicate_for_alias(
            predicate, left_group_aliases
        )
        right_pushdown_pred, rem_pred = extract_pushdown_predicate_for_alias(
            rem_pred, right_group_aliases
        )

        # The leftover conjuncts stay with the join. Merge them with the
        # existing join predicate, skipping it when absent. Pass the result to
        # the constructor rather than patching the private ``_join_predicate``
        # attribute afterwards.
        join_predicate = join.join_predicate
        if rem_pred:
            join_predicate = conjunction_list_to_expression_tree(
                [pred for pred in (rem_pred, join_predicate) if pred is not None]
            )

        new_join_node = LogicalJoin(
            join.join_type,
            join_predicate,
            join.left_keys,
            join.right_keys,
        )
        new_join_node.append_child(self._filtered(left, left_pushdown_pred))
        new_join_node.append_child(self._filtered(right, right_pushdown_pred))
        yield new_join_node
345

346

347
class XformLateralJoinToLinearFlow(Rule):
    """Replace a lateral join over a function scan with a linear ApplyAndMerge.

    When the inner side of a lateral join is a function-valued expression, the
    join node is removed and the function becomes the parent of the outer
    input. This is a common pattern in the system. Making the data flow linear
    simplifies other optimizations, such as UDF reuse and parallelized plans,
    because there is no join left to reason about.
    """

    def __init__(self):
        join_pattern = Pattern(OperatorType.LOGICALJOIN)
        join_pattern.append_child(Pattern(OperatorType.DUMMY))
        join_pattern.append_child(Pattern(OperatorType.LOGICALFUNCTIONSCAN))
        super().__init__(RuleType.XFORM_LATERAL_JOIN_TO_LINEAR_FLOW, join_pattern)

    def promise(self):
        return Promise.XFORM_LATERAL_JOIN_TO_LINEAR_FLOW

    def check(self, before: LogicalJoin, context: OptimizerContext):
        # Only a plain lateral join (no join predicate and no join projection)
        # can be flattened.
        return (
            before.join_type == JoinType.LATERAL_JOIN
            and before.join_predicate is None
            and not before.join_project
        )

    def apply(self, before: LogicalJoin, context: OptimizerContext):
        #     LogicalJoin(Lateral)              LogicalApplyAndMerge
        #     /           \                 ->       |
        #    A        LogicalFunctionScan            A
        outer: Dummy = before.children[0]
        func_scan: LogicalFunctionScan = before.children[1]
        apply_merge = LogicalApplyAndMerge(
            func_scan.func_expr,
            func_scan.alias,
            func_scan.do_unnest,
        )
        apply_merge.append_child(outer)
        yield apply_merge
384

385

386
class PushDownFilterThroughApplyAndMerge(Rule):
    """If it is feasible to partially or fully push the predicate contained within the
    logical filter through the ApplyAndMerge operator, we should do so. This is often
    beneficial, for instance, in order to prevent decoding additional frames beyond
    those that satisfy the predicate.
    Eg:

    Filter(id < 10 and func.label = 'car')           Filter(func.label = 'car')
            |                                                   |
        ApplyAndMerge(func)                  ->          ApplyAndMerge(func)
            |                                                   |
            A                                            Filter(id < 10)
                                                                |
                                                                A

    """

    def __init__(self):
        apply_merge_pattern = Pattern(OperatorType.LOGICAL_APPLY_AND_MERGE)
        apply_merge_pattern.append_child(Pattern(OperatorType.DUMMY))
        pattern = Pattern(OperatorType.LOGICALFILTER)
        pattern.append_child(apply_merge_pattern)
        super().__init__(RuleType.PUSHDOWN_FILTER_THROUGH_APPLY_AND_MERGE, pattern)

    def promise(self):
        return Promise.PUSHDOWN_FILTER_THROUGH_APPLY_AND_MERGE

    def check(self, before: LogicalFilter, context: OptimizerContext):
        return True

    def apply(self, before: LogicalFilter, context: OptimizerContext):
        apply_and_merge: LogicalApplyAndMerge = before.children[0]
        child: Dummy = apply_and_merge.children[0]
        aliases = context.memo.get_group_by_id(child.group_id).aliases
        pushdown_pred, rem_pred = extract_pushdown_predicate_for_alias(
            before.predicate, aliases
        )

        # Return no new plan when nothing can be pushed; otherwise this rule
        # would keep re-applying to its own output.
        if not pushdown_pred:
            return

        # Insert a filter with the pushable conjuncts between the
        # ApplyAndMerge and its input.
        pushdown_filter = LogicalFilter(predicate=pushdown_pred)
        pushdown_filter.append_child(child)

        # Build a fresh ApplyAndMerge rather than reassigning ``children`` on
        # the matched operator, so the rule's input is not mutated.
        new_apply_and_merge = LogicalApplyAndMerge(
            func_expr=apply_and_merge.func_expr,
            alias=apply_and_merge.alias,
            do_unnest=apply_and_merge.do_unnest,
        )
        new_apply_and_merge.append_child(pushdown_filter)

        # Any conjuncts that could not be pushed stay in a filter at the root.
        root_node = new_apply_and_merge
        if rem_pred:
            root_node = LogicalFilter(predicate=rem_pred)
            root_node.append_child(new_apply_and_merge)

        yield root_node
444

445

446
class XformExtractObjectToLinearFlow(Rule):
    """Turn a lateral join over an ExtractObject into detector -> tracker flow.

    When the inner side of a lateral join is an Extract_Object function-valued
    expression, the join node is removed. A detector ApplyAndMerge runs over
    the outer input, and a tracker ApplyAndMerge runs on top of the detector,
    which yields a linear data flow path.
    TODO: We need to add a sorting operation after detector to ensure we always provide tracker data in order.
    """

    #                                          LogicalApplyAndMerge(tracker)
    #     LogicalJoin(Lateral)                         |
    #     /           \                 ->    LogicalApplyAndMerge(detector)
    #    A        LogicalExtractObject                 |
    #                                                  A

    def __init__(self):
        join_pattern = Pattern(OperatorType.LOGICALJOIN)
        join_pattern.append_child(Pattern(OperatorType.DUMMY))
        join_pattern.append_child(Pattern(OperatorType.LOGICAL_EXTRACT_OBJECT))
        super().__init__(RuleType.XFORM_EXTRACT_OBJECT_TO_LINEAR_FLOW, join_pattern)

    def promise(self):
        return Promise.XFORM_EXTRACT_OBJECT_TO_LINEAR_FLOW

    def check(self, before: LogicalJoin, context: OptimizerContext):
        # Only lateral joins are rewritten.
        return before.join_type == JoinType.LATERAL_JOIN

    def apply(self, before: LogicalJoin, context: OptimizerContext):
        outer: Dummy = before.children[0]
        extract_obj: LogicalExtractObject = before.children[1]

        # The detector runs directly over the outer input ...
        detector_node = LogicalApplyAndMerge(
            extract_obj.detector, alias=extract_obj.detector.alias
        )
        detector_node.append_child(outer)

        # ... and the tracker consumes the detector's output.
        tracker_node = LogicalApplyAndMerge(
            extract_obj.tracker,
            alias=extract_obj.alias,
            do_unnest=extract_obj.do_unnest,
        )
        tracker_node.append_child(detector_node)
        yield tracker_node
488

489

490
class CombineSimilarityOrderByAndLimitToFaissIndexScan(Rule):
    """
    This rule currently rewrites Order By + Limit to a Faiss index scan.
    Because Faiss index only works for similarity search, the rule will
    only be applied when the Order By is on Similarity expression. For
    simplicity, we also only enable this rule when the Similarity expression
    applies to the full table. Predicated query will yield incorrect results
    if we use an index scan. The ascending Similarity expression must also be
    the only ORDER BY key: the index scan cannot honour any other sort key.

    Limit(10)
        |
    OrderBy(func)        ->        IndexScan(10)
        |                               |
        A                               A
    """

    def __init__(self):
        pattern = Pattern(OperatorType.LOGICALLIMIT)
        orderby_pattern = Pattern(OperatorType.LOGICALORDERBY)
        orderby_pattern.append_child(Pattern(OperatorType.DUMMY))
        pattern.append_child(orderby_pattern)
        super().__init__(
            RuleType.COMBINE_SIMILARITY_ORDERBY_AND_LIMIT_TO_FAISS_INDEX_SCAN, pattern
        )

        # Entries populate after rule eligibility validation.
        self._index_catalog_entry = None
        self._query_func_expr = None

    def promise(self):
        return Promise.COMBINE_SIMILARITY_ORDERBY_AND_LIMIT_TO_FAISS_INDEX_SCAN

    def check(self, before: LogicalLimit, context: OptimizerContext):
        # Eligibility is validated in ``apply``, which yields nothing when the
        # rewrite does not apply.
        return True

    def apply(self, before: LogicalLimit, context: OptimizerContext):
        # Get corresponding nodes.
        limit_node = before
        orderby_node = before.children[0]
        sub_tree_root = orderby_node.children[0]

        # Check if predicate exists on table.
        def _exists_predicate(opr):
            if isinstance(opr, LogicalGet):
                return opr.predicate is not None
            # Any other operator (e.g. LogicalFilter) is treated as predicated.
            return True

        if _exists_predicate(sub_tree_root.opr):
            return

        # The only accepted ORDER BY is a single ascending Similarity(...)
        # key. With extra keys, e.g. ``ORDER BY col, Similarity(...)``, the
        # index scan would return rows in a different order than requested.
        orderby_list = list(orderby_node.orderby_list)
        if len(orderby_list) != 1:
            return
        func_orderby_expr, sort_type = orderby_list[0]
        if (
            not isinstance(func_orderby_expr, FunctionExpression)
            or sort_type != ParserOrderBySortType.ASC
            or func_orderby_expr.name != "Similarity"
        ):
            return

        # Similarity(query, base): the query side is handed to the index scan.
        # The base side identifies the column (and optional UDF) the index
        # was built on.
        query_func_expr, base_func_expr = func_orderby_expr.children

        # Get table and column of orderby by following first arguments down
        # to a column reference.
        tv_expr = base_func_expr
        while not isinstance(tv_expr, TupleValueExpression):
            if not tv_expr.children:
                # No column reference to look an index up by.
                return
            tv_expr = tv_expr.children[0]

        # Get column catalog entry and udf_signature.
        column_catalog_entry = tv_expr.col_object
        udf_signature = (
            None
            if isinstance(base_func_expr, TupleValueExpression)
            else base_func_expr.signature()
        )

        # Get index catalog. Check if an index exists for matching
        # udf signature and table columns.
        catalog_manager = CatalogManager()
        index_catalog_entry = (
            catalog_manager.get_index_catalog_entry_by_column_and_udf_signature(
                column_catalog_entry, udf_signature
            )
        )
        if not index_catalog_entry:
            return

        # Construct the Faiss index scan plan.
        faiss_index_scan_node = LogicalFaissIndexScan(
            index_catalog_entry.name,
            limit_node.limit_count,
            query_func_expr,
        )
        for child in orderby_node.children:
            faiss_index_scan_node.append_child(child)
        yield faiss_index_scan_node
590

591

592
# REWRITE RULES END
593
##############################################
594

595
##############################################
596
# LOGICAL RULES START
597

598

599
class LogicalInnerJoinCommutativity(Rule):
    """Swap the two inputs of an inner join.

    Produces a new LogicalJoin with the same join type and predicate whose left
    and right children are exchanged.
    """

    def __init__(self):
        join_pattern = Pattern(OperatorType.LOGICALJOIN)
        for _ in range(2):
            join_pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_INNER_JOIN_COMMUTATIVITY, join_pattern)

    def promise(self):
        return Promise.LOGICAL_INNER_JOIN_COMMUTATIVITY

    def check(self, before: LogicalJoin, context: OptimizerContext):
        # Commutativity is only applied to inner joins.
        return before.join_type == JoinType.INNER_JOIN

    def apply(self, before: LogicalJoin, context: OptimizerContext):
        #     LogicalJoin(Inner)            LogicalJoin(Inner)
        #     /           \        ->       /               \
        #    A             B               B                A
        swapped = LogicalJoin(before.join_type, before.join_predicate)
        swapped.append_child(before.rhs())
        swapped.append_child(before.lhs())
        yield swapped
622

623

624
class ReorderPredicates(Rule):
    """
    Order the conjuncts of a filter predicate by their individual cost.

    Conjuncts with no FunctionExpression keep their original relative order
    and come first. Conjuncts containing a FunctionExpression follow, sorted
    in ascending order of ``get_expression_execution_cost``; ties keep their
    original order. OR clauses are not optimized yet. Predicates that are not
    user-defined functions are not reordered, because we assume they will be
    pushed to the underlying relational database, which optimizes them itself.
    """

    def __init__(self):
        filter_pattern = Pattern(OperatorType.LOGICALFILTER)
        filter_pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.REORDER_PREDICATES, filter_pattern)

    def promise(self):
        return Promise.REORDER_PREDICATES

    def check(self, before: LogicalFilter, context: OptimizerContext):
        # Applicable when at least one FunctionExpression appears in the predicate.
        return any(True for _ in before.predicate.find_all(FunctionExpression))

    def apply(self, before: LogicalFilter, context: OptimizerContext):
        conjuncts = to_conjunction_list(before.predicate)

        # Partition the conjuncts, preserving their order within each group.
        plain_conjuncts = []
        udf_conjuncts = []
        for conjunct in conjuncts:
            if list(conjunct.find_all(FunctionExpression)):
                udf_conjuncts.append(conjunct)
            else:
                plain_conjuncts.append(conjunct)

        # Stable sort of the UDF conjuncts by their estimated execution cost.
        costed = sorted(
            ((expr, get_expression_execution_cost(expr)) for expr in udf_conjuncts),
            key=lambda pair: pair[1],
        )
        ordered_conjuncts = plain_conjuncts + [expr for expr, _ in costed]

        # Emit nothing when the order is unchanged, so the rule does not keep
        # re-applying to its own output.
        if ordered_conjuncts == conjuncts:
            return

        reordered_filter = LogicalFilter(
            predicate=conjunction_list_to_expression_tree(ordered_conjuncts)
        )
        reordered_filter.append_child(before.children[0])
        yield reordered_filter
680

681

682
# LOGICAL RULES END
683
##############################################
684

685

686
##############################################
687
# IMPLEMENTATION RULES START
688

689

690
class LogicalCreateToPhysical(Rule):
    """Implement a LogicalCreate operator as a CreatePlan."""

    def __init__(self):
        super().__init__(
            RuleType.LOGICAL_CREATE_TO_PHYSICAL, Pattern(OperatorType.LOGICALCREATE)
        )

    def promise(self):
        return Promise.LOGICAL_CREATE_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        # Always applicable once the pattern matches.
        return True

    def apply(self, before: LogicalCreate, context: OptimizerContext):
        yield CreatePlan(before.video, before.column_list, before.if_not_exists)
704

705

706
class LogicalRenameToPhysical(Rule):
    """Implement a LogicalRename operator as a RenamePlan."""

    def __init__(self):
        super().__init__(
            RuleType.LOGICAL_RENAME_TO_PHYSICAL, Pattern(OperatorType.LOGICALRENAME)
        )

    def promise(self):
        return Promise.LOGICAL_RENAME_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        # Always applicable once the pattern matches.
        return True

    def apply(self, before: LogicalRename, context: OptimizerContext):
        yield RenamePlan(before.old_table_ref, before.new_name)
720

721

722
class LogicalDropToPhysical(Rule):
    """Implementation rule: lower a ``LogicalDrop`` node into a ``DropPlan``."""

    def __init__(self):
        drop_pattern = Pattern(OperatorType.LOGICALDROP)
        super().__init__(RuleType.LOGICAL_DROP_TO_PHYSICAL, drop_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_DROP_TO_PHYSICAL``."""
        return Promise.LOGICAL_DROP_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalDrop, context: OptimizerContext):
        """Yield one ``DropPlan`` for the target tables and the IF EXISTS flag."""
        yield DropPlan(before.table_infos, before.if_exists)
736

737

738
class LogicalCreateUDFToPhysical(Rule):
    """Implementation rule: lower ``LogicalCreateUDF`` into a ``CreateUDFPlan``."""

    def __init__(self):
        udf_pattern = Pattern(OperatorType.LOGICALCREATEUDF)
        super().__init__(RuleType.LOGICAL_CREATE_UDF_TO_PHYSICAL, udf_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_CREATE_UDF_TO_PHYSICAL``."""
        return Promise.LOGICAL_CREATE_UDF_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalCreateUDF, context: OptimizerContext):
        """Yield one ``CreateUDFPlan`` copying every UDF attribute, in order."""
        yield CreateUDFPlan(
            before.name,
            before.if_not_exists,
            before.inputs,
            before.outputs,
            before.impl_path,
            before.udf_type,
            before.metadata,
        )
760

761

762
class LogicalCreateIndexToFaiss(Rule):
    """Implementation rule: lower ``LogicalCreateIndex`` into a ``CreateIndexPlan``."""

    def __init__(self):
        index_pattern = Pattern(OperatorType.LOGICALCREATEINDEX)
        super().__init__(RuleType.LOGICAL_CREATE_INDEX_TO_FAISS, index_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_CREATE_INDEX_TO_FAISS``."""
        return Promise.LOGICAL_CREATE_INDEX_TO_FAISS

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalCreateIndex, context: OptimizerContext):
        """Yield one ``CreateIndexPlan`` built from the index definition."""
        yield CreateIndexPlan(
            before.name,
            before.table_ref,
            before.col_list,
            before.index_type,
            before.udf_func,
        )
782

783

784
class LogicalDropUDFToPhysical(Rule):
    """Implementation rule: lower a ``LogicalDropUDF`` node into a ``DropUDFPlan``."""

    def __init__(self):
        drop_udf_pattern = Pattern(OperatorType.LOGICALDROPUDF)
        super().__init__(RuleType.LOGICAL_DROP_UDF_TO_PHYSICAL, drop_udf_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_DROP_UDF_TO_PHYSICAL``."""
        return Promise.LOGICAL_DROP_UDF_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalDropUDF, context: OptimizerContext):
        """Yield one ``DropUDFPlan`` for the named UDF and the IF EXISTS flag."""
        yield DropUDFPlan(before.name, before.if_exists)
798

799

800
class LogicalInsertToPhysical(Rule):
    """Implementation rule: lower a ``LogicalInsert`` node into an ``InsertPlan``."""

    def __init__(self):
        insert_pattern = Pattern(OperatorType.LOGICALINSERT)
        super().__init__(RuleType.LOGICAL_INSERT_TO_PHYSICAL, insert_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_INSERT_TO_PHYSICAL``."""
        return Promise.LOGICAL_INSERT_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalInsert, context: OptimizerContext):
        """Yield one ``InsertPlan`` with the target table, columns and values."""
        yield InsertPlan(before.table, before.column_list, before.value_list)
814

815

816
class LogicalDeleteToPhysical(Rule):
    """Implementation rule: lower a ``LogicalDelete`` node into a ``DeletePlan``."""

    def __init__(self):
        delete_pattern = Pattern(OperatorType.LOGICALDELETE)
        super().__init__(RuleType.LOGICAL_DELETE_TO_PHYSICAL, delete_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_DELETE_TO_PHYSICAL``."""
        return Promise.LOGICAL_DELETE_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalDelete, context: OptimizerContext):
        """Yield one ``DeletePlan`` for the table reference and its WHERE clause."""
        yield DeletePlan(before.table_ref, before.where_clause)
830

831

832
class LogicalLoadToPhysical(Rule):
    """Implementation rule: lower a ``LogicalLoadData`` node into a ``LoadDataPlan``."""

    def __init__(self):
        load_pattern = Pattern(OperatorType.LOGICALLOADDATA)
        super().__init__(RuleType.LOGICAL_LOAD_TO_PHYSICAL, load_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_LOAD_TO_PHYSICAL``."""
        return Promise.LOGICAL_LOAD_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalLoadData, context: OptimizerContext):
        """Yield one ``LoadDataPlan`` with the table, source path, columns and options."""
        yield LoadDataPlan(
            before.table_info,
            before.path,
            before.column_list,
            before.file_options,
        )
851

852

853
class LogicalGetToSeqScan(Rule):
    """Implementation rule: lower ``LogicalGet`` into ``SeqScanPlan`` over ``StoragePlan``."""

    def __init__(self):
        get_pattern = Pattern(OperatorType.LOGICALGET)
        super().__init__(RuleType.LOGICAL_GET_TO_SEQSCAN, get_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_GET_TO_SEQSCAN``."""
        return Promise.LOGICAL_GET_TO_SEQSCAN

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalGet, context: OptimizerContext):
        """Yield a predicate-free ``SeqScanPlan`` whose only child is a ``StoragePlan``.

        The get's predicate and sampling settings are pushed into the storage
        node. The scan itself keeps only the target list and alias.
        """
        # NOTE(review): no read batch size is configured here; tuning it is
        # still an open TODO.
        scan = SeqScanPlan(None, before.target_list, before.alias)
        storage = StoragePlan(
            before.table_obj,
            before.video,
            predicate=before.predicate,
            sampling_rate=before.sampling_rate,
            sampling_type=before.sampling_type,
        )
        scan.append_child(storage)
        yield scan
879

880

881
class LogicalDerivedGetToPhysical(Rule):
    """Implementation rule: lower ``LogicalQueryDerivedGet`` into a ``SeqScanPlan``."""

    def __init__(self):
        derived_pattern = Pattern(OperatorType.LOGICALQUERYDERIVEDGET)
        derived_pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_DERIVED_GET_TO_PHYSICAL, derived_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_DERIVED_GET_TO_PHYSICAL``."""
        return Promise.LOGICAL_DERIVED_GET_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalQueryDerivedGet, context: OptimizerContext):
        """Yield a ``SeqScanPlan`` that filters and projects the single child subquery."""
        scan = SeqScanPlan(before.predicate, before.target_list, before.alias)
        subquery = before.children[0]
        scan.append_child(subquery)
        yield scan
897

898

899
class LogicalUnionToPhysical(Rule):
    """Implementation rule: lower a binary ``LogicalUnion`` into a ``UnionPlan``."""

    def __init__(self):
        union_pattern = Pattern(OperatorType.LOGICALUNION)
        # A union always has two inputs, so match two placeholder children.
        for _ in range(2):
            union_pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_UNION_TO_PHYSICAL, union_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_UNION_TO_PHYSICAL``."""
        return Promise.LOGICAL_UNION_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalUnion, context: OptimizerContext):
        """Yield a ``UnionPlan`` (honouring UNION ALL) over the same children."""
        union_plan = UnionPlan(before.all)
        for input_node in before.children:
            union_plan.append_child(input_node)
        yield union_plan
918

919

920
class LogicalGroupByToPhysical(Rule):
    """Implementation rule: lower a ``LogicalGroupBy`` node into a ``GroupByPlan``."""

    def __init__(self):
        groupby_pattern = Pattern(OperatorType.LOGICALGROUPBY)
        groupby_pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_GROUPBY_TO_PHYSICAL, groupby_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_GROUPBY_TO_PHYSICAL``."""
        return Promise.LOGICAL_GROUPBY_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalGroupBy, context: OptimizerContext):
        """Yield a ``GroupByPlan`` for the GROUP BY clause over the same children."""
        groupby_plan = GroupByPlan(before.groupby_clause)
        for input_node in before.children:
            groupby_plan.append_child(input_node)
        yield groupby_plan
937

938

939
class LogicalOrderByToPhysical(Rule):
    """Implementation rule: lower a ``LogicalOrderBy`` node into an ``OrderByPlan``."""

    def __init__(self):
        orderby_pattern = Pattern(OperatorType.LOGICALORDERBY)
        orderby_pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_ORDERBY_TO_PHYSICAL, orderby_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_ORDERBY_TO_PHYSICAL``."""
        return Promise.LOGICAL_ORDERBY_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalOrderBy, context: OptimizerContext):
        """Yield an ``OrderByPlan`` for the ORDER BY list over the same children."""
        orderby_plan = OrderByPlan(before.orderby_list)
        for input_node in before.children:
            orderby_plan.append_child(input_node)
        yield orderby_plan
956

957

958
class LogicalLimitToPhysical(Rule):
    """Implementation rule: lower a ``LogicalLimit`` node into a ``LimitPlan``."""

    def __init__(self):
        limit_pattern = Pattern(OperatorType.LOGICALLIMIT)
        limit_pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_LIMIT_TO_PHYSICAL, limit_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_LIMIT_TO_PHYSICAL``."""
        return Promise.LOGICAL_LIMIT_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalLimit, context: OptimizerContext):
        """Yield a ``LimitPlan`` for the row limit over the same children."""
        limit_plan = LimitPlan(before.limit_count)
        for input_node in before.children:
            limit_plan.append_child(input_node)
        yield limit_plan
975

976

977
class LogicalFunctionScanToPhysical(Rule):
    """Implementation rule: lower ``LogicalFunctionScan`` into a ``FunctionScanPlan``."""

    def __init__(self):
        scan_pattern = Pattern(OperatorType.LOGICALFUNCTIONSCAN)
        super().__init__(RuleType.LOGICAL_FUNCTION_SCAN_TO_PHYSICAL, scan_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_FUNCTION_SCAN_TO_PHYSICAL``."""
        return Promise.LOGICAL_FUNCTION_SCAN_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally."""
        return True

    def apply(self, before: LogicalFunctionScan, context: OptimizerContext):
        """Yield one ``FunctionScanPlan`` for the function expression and unnest flag."""
        yield FunctionScanPlan(before.func_expr, before.do_unnest)
991

992

993
class LogicalLateralJoinToPhysical(Rule):
    """Implementation rule: lower a lateral ``LogicalJoin`` into a ``LateralJoinPlan``."""

    def __init__(self):
        join_pattern = Pattern(OperatorType.LOGICALJOIN)
        # Match the two join inputs with placeholder children.
        for _ in range(2):
            join_pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_LATERAL_JOIN_TO_PHYSICAL, join_pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_LATERAL_JOIN_TO_PHYSICAL``."""
        return Promise.LOGICAL_LATERAL_JOIN_TO_PHYSICAL

    def check(self, before: Operator, context: OptimizerContext):
        """Apply only to joins whose type is ``JoinType.LATERAL_JOIN``."""
        is_lateral = before.join_type == JoinType.LATERAL_JOIN
        return is_lateral

    def apply(self, join_node: LogicalJoin, context: OptimizerContext):
        """Yield a ``LateralJoinPlan`` that keeps the predicate, projection and inputs."""
        plan = LateralJoinPlan(join_node.join_predicate)
        plan.join_project = join_node.join_project
        plan.append_child(join_node.lhs())
        plan.append_child(join_node.rhs())
        yield plan
1012

1013

1014
class LogicalJoinToPhysicalHashJoin(Rule):
    """Implementation rule: lower an inner equi-join into a hash build/probe pair."""

    def __init__(self):
        pattern = Pattern(OperatorType.LOGICALJOIN)
        pattern.append_child(Pattern(OperatorType.DUMMY))
        pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_JOIN_TO_PHYSICAL_HASH_JOIN, pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_JOIN_TO_PHYSICAL_HASH_JOIN``."""
        return Promise.LOGICAL_JOIN_TO_PHYSICAL_HASH_JOIN

    def check(self, before: Operator, context: OptimizerContext):
        """Decide whether this join may be implemented as a hash join.

        The rule applies only to inner joins that have a join predicate whose
        first child is not a ``FunctionExpression``. ``FuzzDistance``
        predicates (fuzzy joins) are handled by the nested-loop join rule.

        Returns:
            bool: ``True`` when the hash-join rewrite should be attempted.
        """
        if before.join_predicate is None:
            return False
        j_child = before.join_predicate.children[0]

        if isinstance(j_child, FunctionExpression):
            # Any function expression on the predicate's left side disables the
            # hash join. FuzzDistance goes to LogicalJoinToPhysicalNestedLoopJoin.
            # Other functions are rejected as well; the explicit False keeps the
            # earlier (falsy) outcome.
            # TODO(review): confirm whether non-Fuzz function predicates should
            # be allowed once equi-key extraction supports them.
            return False
        return before.join_type == JoinType.INNER_JOIN

    def apply(self, join_node: LogicalJoin, context: OptimizerContext):
        """Split the join into a build side (lhs) and a probe side (rhs).

        Equi-join keys are extracted separately for each input, using the table
        aliases recorded in the memo group of that input.
        """
        #          HashJoinPlan                       HashJoinProbePlan
        #          /           \     ->                  /               \
        #         A             B        HashJoinBuildPlan               B
        #                                              /
        #                                            A

        a: Dummy = join_node.lhs()
        b: Dummy = join_node.rhs()
        a_table_aliases = context.memo.get_group_by_id(a.group_id).aliases
        b_table_aliases = context.memo.get_group_by_id(b.group_id).aliases
        join_predicates = join_node.join_predicate
        a_join_keys, b_join_keys = extract_equi_join_keys(
            join_predicates, a_table_aliases, b_table_aliases
        )

        build_plan = HashJoinBuildPlan(join_node.join_type, a_join_keys)
        build_plan.append_child(a)
        probe_side = HashJoinProbePlan(
            join_node.join_type,
            b_join_keys,
            join_predicates,
            join_node.join_project,
        )
        probe_side.append_child(build_plan)
        probe_side.append_child(b)
        yield probe_side
1068

1069

1070
class LogicalJoinToPhysicalNestedLoopJoin(Rule):
    """Implementation rule: run ``FuzzDistance`` inner joins as nested-loop joins."""

    def __init__(self):
        join_pattern = Pattern(OperatorType.LOGICALJOIN)
        for _ in range(2):
            join_pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(
            RuleType.LOGICAL_JOIN_TO_PHYSICAL_NESTED_LOOP_JOIN, join_pattern
        )

    def promise(self):
        """Return this rule's promise for nested-loop join lowering."""
        return Promise.LOGICAL_JOIN_TO_PHYSICAL_NESTED_LOOP_JOIN

    def check(self, before: LogicalJoin, context: OptimizerContext):
        """Apply only to fuzzy joins.

        A fuzzy join here is an inner join whose predicate's first child is a
        ``FunctionExpression`` with a name starting with ``"FuzzDistance"``.
        """
        predicate = before.join_predicate
        if predicate is None:
            return False
        lhs_expr = predicate.children[0]
        return (
            isinstance(lhs_expr, FunctionExpression)
            and before.join_type == JoinType.INNER_JOIN
            and lhs_expr.name.startswith("FuzzDistance")
        )

    def apply(self, join_node: LogicalJoin, context: OptimizerContext):
        """Yield a ``NestedLoopJoinPlan`` over the join's two inputs."""
        plan = NestedLoopJoinPlan(join_node.join_type, join_node.join_predicate)
        plan.append_child(join_node.lhs())
        plan.append_child(join_node.rhs())
        yield plan
1101

1102

1103
class LogicalCreateMaterializedViewToPhysical(Rule):
    """Implementation rule: lower ``LogicalCreateMaterializedView`` to its plan."""

    def __init__(self):
        pattern = Pattern(OperatorType.LOGICAL_CREATE_MATERIALIZED_VIEW)
        pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_MATERIALIZED_VIEW_TO_PHYSICAL, pattern)

    def promise(self):
        """Return this rule's promise for materialized-view lowering."""
        return Promise.LOGICAL_MATERIALIZED_VIEW_TO_PHYSICAL

    def check(self, grp_id: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally.

        The first argument is annotated ``Operator`` rather than ``int`` to
        match the ``check(before: Operator, ...)`` signature used by the other
        rules in this module. Its value is not inspected here.
        """
        return True

    def apply(self, before: LogicalCreateMaterializedView, context: OptimizerContext):
        """Yield a ``CreateMaterializedViewPlan`` over the view's source child(ren)."""
        after = CreateMaterializedViewPlan(
            before.view,
            columns=before.col_list,
            if_not_exists=before.if_not_exists,
        )
        for child in before.children:
            after.append_child(child)
        yield after
1124

1125

1126
class LogicalFilterToPhysical(Rule):
    """Implementation rule: lower a ``LogicalFilter`` node into a ``PredicatePlan``."""

    def __init__(self):
        pattern = Pattern(OperatorType.LOGICALFILTER)
        pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_FILTER_TO_PHYSICAL, pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_FILTER_TO_PHYSICAL``."""
        return Promise.LOGICAL_FILTER_TO_PHYSICAL

    def check(self, grp_id: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally.

        The first argument is annotated ``Operator`` rather than ``int`` to
        match the ``check(before: Operator, ...)`` signature used by the other
        rules in this module. Its value is not inspected here.
        """
        return True

    def apply(self, before: LogicalFilter, context: OptimizerContext):
        """Yield a ``PredicatePlan`` for the filter predicate over the same children."""
        after = PredicatePlan(before.predicate)
        for child in before.children:
            after.append_child(child)
        yield after
1143

1144

1145
class LogicalProjectToPhysical(Rule):
    """Implementation rule: lower a ``LogicalProject`` node into a ``ProjectPlan``."""

    def __init__(self):
        pattern = Pattern(OperatorType.LOGICALPROJECT)
        pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_PROJECT_TO_PHYSICAL, pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_PROJECT_TO_PHYSICAL``."""
        return Promise.LOGICAL_PROJECT_TO_PHYSICAL

    def check(self, grp_id: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally.

        The first argument is annotated ``Operator`` rather than ``int`` to
        match the ``check(before: Operator, ...)`` signature used by the other
        rules in this module. Its value is not inspected here.
        """
        return True

    def apply(self, before: LogicalProject, context: OptimizerContext):
        """Yield a ``ProjectPlan`` for the target list over the same children."""
        after = ProjectPlan(before.target_list)
        for child in before.children:
            after.append_child(child)
        yield after
1162

1163

1164
class LogicalShowToPhysical(Rule):
    """Implementation rule: lower a ``LogicalShow`` node into a ``ShowInfoPlan``."""

    def __init__(self):
        pattern = Pattern(OperatorType.LOGICAL_SHOW)
        super().__init__(RuleType.LOGICAL_SHOW_TO_PHYSICAL, pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_SHOW_TO_PHYSICAL``."""
        return Promise.LOGICAL_SHOW_TO_PHYSICAL

    def check(self, grp_id: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally.

        The first argument is annotated ``Operator`` rather than ``int`` to
        match the ``check(before: Operator, ...)`` signature used by the other
        rules in this module. Its value is not inspected here.
        """
        return True

    def apply(self, before: LogicalShow, context: OptimizerContext):
        """Yield one ``ShowInfoPlan`` for the requested SHOW type."""
        after = ShowInfoPlan(before.show_type)
        yield after
1178

1179

1180
class LogicalExplainToPhysical(Rule):
    """Implementation rule: lower a ``LogicalExplain`` node into an ``ExplainPlan``."""

    def __init__(self):
        pattern = Pattern(OperatorType.LOGICALEXPLAIN)
        pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_EXPLAIN_TO_PHYSICAL, pattern)

    def promise(self):
        """Return this rule's promise, ``Promise.LOGICAL_EXPLAIN_TO_PHYSICAL``."""
        return Promise.LOGICAL_EXPLAIN_TO_PHYSICAL

    def check(self, grp_id: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally.

        The first argument is annotated ``Operator`` rather than ``int`` to
        match the ``check(before: Operator, ...)`` signature used by the other
        rules in this module. Its value is not inspected here.
        """
        return True

    def apply(self, before: LogicalExplain, context: OptimizerContext):
        """Yield an ``ExplainPlan`` for the explained operator over the same children."""
        after = ExplainPlan(before.explainable_opr)
        for child in before.children:
            after.append_child(child)
        yield after
1197

1198

1199
class LogicalApplyAndMergeToPhysical(Rule):
    """Implementation rule: lower ``LogicalApplyAndMerge`` into ``ApplyAndMergePlan``."""

    def __init__(self):
        pattern = Pattern(OperatorType.LOGICAL_APPLY_AND_MERGE)
        pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_APPLY_AND_MERGE_TO_PHYSICAL, pattern)

    def promise(self):
        """Return this rule's promise for apply-and-merge lowering."""
        return Promise.LOGICAL_APPLY_AND_MERGE_TO_PHYSICAL

    def check(self, grp_id: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally.

        The first argument is annotated ``Operator`` rather than ``int`` to
        match the ``check(before: Operator, ...)`` signature used by the other
        rules in this module. Its value is not inspected here.
        """
        return True

    def apply(self, before: LogicalApplyAndMerge, context: OptimizerContext):
        """Yield an ``ApplyAndMergePlan`` (function, alias, unnest flag) over the children."""
        after = ApplyAndMergePlan(before.func_expr, before.alias, before.do_unnest)
        for child in before.children:
            after.append_child(child)
        yield after
1216

1217

1218
class LogicalFaissIndexScanToPhysical(Rule):
    """Implementation rule: lower ``LogicalFaissIndexScan`` into ``FaissIndexScanPlan``."""

    def __init__(self):
        pattern = Pattern(OperatorType.LOGICALFAISSINDEXSCAN)
        pattern.append_child(Pattern(OperatorType.DUMMY))
        super().__init__(RuleType.LOGICAL_FAISS_INDEX_SCAN_TO_PHYSICAL, pattern)

    def promise(self):
        """Return this rule's promise for FAISS index-scan lowering."""
        return Promise.LOGICAL_FAISS_INDEX_SCAN_TO_PHYSICAL

    def check(self, grp_id: Operator, context: OptimizerContext):
        """Accept every matched operator unconditionally.

        The first argument is annotated ``Operator`` rather than ``int`` to
        match the ``check(before: Operator, ...)`` signature used by the other
        rules in this module. Its value is not inspected here.
        """
        return True

    def apply(self, before: LogicalFaissIndexScan, context: OptimizerContext):
        """Yield a ``FaissIndexScanPlan`` (index, limit, query expression) over the children."""
        after = FaissIndexScanPlan(
            before.index_name, before.limit_count, before.search_query_expr
        )
        for child in before.children:
            after.append_child(child)
        yield after
1237

1238

1239
# IMPLEMENTATION RULES END
1240
##############################################
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc