• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #754

04 Sep 2023 09:54PM UTC coverage: 74.807% (-5.5%) from 80.336%
#754

push

circle-ci

jiashenC
update case

8727 of 11666 relevant lines covered (74.81%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.47
/evadb/optimizer/optimizer_tasks.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from __future__ import annotations
1✔
16

17
from abc import abstractmethod
1✔
18
from enum import IntEnum, auto
1✔
19
from typing import TYPE_CHECKING, List
1✔
20

21
from evadb.optimizer.binder import Binder
1✔
22
from evadb.optimizer.group import Group
1✔
23
from evadb.optimizer.group_expression import GroupExpression
1✔
24
from evadb.optimizer.property import PropertyType
1✔
25
from evadb.optimizer.rules.rules_base import Rule
1✔
26
from evadb.utils.logging_manager import logger
1✔
27

28
if TYPE_CHECKING:
29
    from evadb.optimizer.optimizer_context import OptimizerContext
30

31

32
class OptimizerTaskType(IntEnum):
    """Integer tag identifying each kind of task the optimizer can schedule."""

    # auto() numbers the members in declaration order starting at 1, so the
    # order below determines each member's integer value.
    TOP_DOWN_REWRITE = auto()
    BOTTOM_UP_REWRITE = auto()
    OPTIMIZE_EXPRESSION = auto()
    OPTIMIZE_GROUP = auto()
    OPTIMIZE_INPUTS = auto()
    APPLY_RULE = auto()
    EXPLORE_GROUP = auto()
1✔
42

43

44
class OptimizerTask:
    """Common base for the tasks scheduled by the optimizer.

    Stores the optimizer context and the task's type tag. Concrete tasks
    provide the actual work by overriding ``execute``.
    """

    def __init__(
        self, optimizer_context: OptimizerContext, task_type: OptimizerTaskType
    ):
        self._optimizer_context = optimizer_context
        self._task_type = task_type

    @property
    def optimizer_context(self):
        """The optimizer context this task was constructed with."""
        return self._optimizer_context

    @abstractmethod
    def execute(self):
        """Run the task; every concrete task must override this method."""
        # NOTE: this class does not use an ABC metaclass, so @abstractmethod is
        # not enforced at instantiation time; this raise is the actual guard.
        raise NotImplementedError()
58

59

60
class TopDownRewrite(OptimizerTask):
    """Rewrite task that processes an expression before its children.

    Every not-yet-explored rule in ``rule_set`` whose ``top_match`` accepts the
    root operator is tried on ``root_expr``. As soon as a rule yields a
    replacement plan, the plan replaces the expression in its group and a new
    ``TopDownRewrite`` is queued for the resulting expression. When no rule
    rewrites the root, one ``TopDownRewrite`` is queued per child group.

    Args:
        root_expr: group expression to rewrite.
        rule_set: rewrite rules to consider.
        optimizer_context: shared optimizer state (memo, task stack, ...).
    """

    def __init__(
        self,
        root_expr: GroupExpression,
        rule_set: List[Rule],
        optimizer_context: OptimizerContext,
    ):
        self.root_expr = root_expr
        self.rule_set = rule_set
        super().__init__(optimizer_context, OptimizerTaskType.TOP_DOWN_REWRITE)

    def execute(self):
        """Try the applicable rules on the root, then descend into its children.

        Raises:
            AssertionError: if a rewrite rule yields more than one plan.
        """
        context = self.optimizer_context
        root = self.root_expr

        valid_rules = [
            rule
            for rule in self.rule_set
            if not root.is_rule_explored(rule.rule_type)
            and rule.top_match(root.opr)
        ]

        # sort the rules by promise
        valid_rules.sort(key=lambda rule: rule.promise())
        for rule in valid_rules:
            binder = Binder(root, rule.pattern, context.memo)
            for match in binder:
                if not rule.check(match, context):
                    continue
                plans = list(rule.apply(match, context))
                assert (
                    len(plans) <= 1
                ), "Rewrite rule cannot generate more than one alternate plan."
                if plans:
                    new_expr = context.replace_expression(plans[0], root.group_id)
                    context.task_stack.push(
                        TopDownRewrite(new_expr, self.rule_set, context)
                    )
                    # The root has changed so we cannot apply more rules to the
                    # same root, hence return
                    return

            # Record the rule once every binding has been tried, whether or not
            # any binding passed the check. This matches how BottomUpRewrite
            # and ApplyRule mark rules after attempting them.
            root.mark_rule_explored(rule.rule_type)

        # Nothing rewrote the root: continue the pass on each child group,
        # using the first logical expression of that group.
        for child in root.children:
            child_expr = context.memo.groups[child].logical_exprs[0]
            context.task_stack.push(
                TopDownRewrite(child_expr, self.rule_set, context)
            )
108

109

110
class BottomUpRewrite(OptimizerTask):
    """Rewrite task that processes an expression after its children.

    On the first visit (``children_explored`` is False) the task re-queues
    itself with ``children_explored=True`` and then queues one
    ``BottomUpRewrite`` per child group. Presumably ``task_stack`` is LIFO, so
    the children run before the re-queued task; confirm against the task stack
    implementation. On the second visit the applicable rules are tried on the
    root; the first rule that yields a plan replaces the expression and a
    fresh ``BottomUpRewrite`` is queued for the new expression.

    Args:
        root_expr: group expression to rewrite.
        rule_set: rewrite rules to consider.
        optimizer_context: shared optimizer state (memo, task stack, ...).
        children_explored: True once tasks for the children have been queued.
    """

    def __init__(
        self,
        root_expr: GroupExpression,
        rule_set: List[Rule],
        optimizer_context: OptimizerContext,
        children_explored=False,
    ):
        super().__init__(optimizer_context, OptimizerTaskType.BOTTOM_UP_REWRITE)
        self._children_explored = children_explored
        self.root_expr = root_expr
        self.rule_set = rule_set

    def execute(self):
        """Queue the children first, or rewrite the root on the second visit.

        Raises:
            AssertionError: if a rewrite rule yields more than one plan.
        """
        context = self.optimizer_context
        root = self.root_expr

        if not self._children_explored:
            # Re-queue this expression before queueing its children.
            context.task_stack.push(
                BottomUpRewrite(root, self.rule_set, context, True)
            )
            for child in root.children:
                child_expr = context.memo.groups[child].logical_exprs[0]
                context.task_stack.push(
                    BottomUpRewrite(child_expr, self.rule_set, context)
                )
            return

        valid_rules = [
            rule
            for rule in self.rule_set
            if not root.is_rule_explored(rule.rule_type)
            and rule.top_match(root.opr)
        ]

        # sort the rules by promise
        # (the original called sorted() and discarded the result, so the
        # rules were never actually reordered)
        valid_rules.sort(key=lambda rule: rule.promise())
        for rule in valid_rules:
            binder = Binder(root, rule.pattern, context.memo)
            for match in binder:
                if not rule.check(match, context):
                    continue
                logger.info(
                    "In BottomUp, Rule {} matched for {}".format(rule, self.root_expr)
                )
                plans = list(rule.apply(match, context))
                assert (
                    len(plans) <= 1
                ), "Rewrite rule cannot generate more than one alternate plan."
                if plans:
                    new_expr = context.replace_expression(plans[0], root.group_id)
                    logger.info("After rewriting {}".format(self.root_expr))
                    context.task_stack.push(
                        BottomUpRewrite(new_expr, self.rule_set, context)
                    )
                    # The root has changed so we cannot apply more rules to the
                    # same root, hence return
                    return
            # Every binding for this rule has been tried without a rewrite.
            root.mark_rule_explored(rule.rule_type)
1✔
170

171

172
class OptimizeExpression(OptimizerTask):
    """Queue an ``ApplyRule`` task for every rule that fits an expression.

    Args:
        root_expr: group expression to optimize.
        optimizer_context: shared optimizer state (rules manager, task stack).
        explore: when True only logical rules are considered; otherwise the
            implementation rules are considered as well.
    """

    def __init__(
        self,
        root_expr: GroupExpression,
        optimizer_context: OptimizerContext,
        explore: bool,
    ):
        self.root_expr = root_expr
        self.explore = explore
        super().__init__(optimizer_context, OptimizerTaskType.OPTIMIZE_EXPRESSION)

    def execute(self):
        """Select the rules matching the root operator and queue them."""
        context = self.optimizer_context
        manager = context.rules_manager

        candidates = manager.logical_rules
        # if exploring, we don't need to consider implementation rules
        if not self.explore:
            # Concatenate into a new list instead of extending in place, so
            # the rules manager's own logical rule list is not modified.
            candidates = manager.logical_rules + manager.implementation_rules

        matching = [rule for rule in candidates if rule.top_match(self.root_expr.opr)]
        matching.sort(key=lambda rule: rule.promise())

        for rule in matching:
            # apply the rule
            apply_task = ApplyRule(rule, self.root_expr, context, self.explore)
            context.task_stack.push(apply_task)
205

206
            # explore the input group if necessary
207
            # for idx, child in enumerate(rule.pattern.children):
208
            #    if len(child.children):
209
            #        child_grp_id = self.root_expr.children[idx]
210
            #        group = self.optimizer_context.memo.get_group_by_id(child_grp_id)
211
            #        self.optimizer_context.task_stack.push(
212
            #            ExploreGroup(group, self.optimizer_context)
213
            #        )
214

215

216
class ApplyRule(OptimizerTask):
    """Apply one transformation or implementation rule to an expression.

    Args:
        rule: the rule to apply.
        root_expr: group expression the rule is bound against.
        optimizer_context: shared optimizer state (memo, task stack, ...).
        explore: forwarded to the ``OptimizeExpression`` tasks queued for any
            new logical expression.
    """

    def __init__(
        self,
        rule: Rule,
        root_expr: GroupExpression,
        optimizer_context: OptimizerContext,
        explore: bool,
    ):
        self.rule = rule
        self.root_expr = root_expr
        self.explore = explore
        super().__init__(optimizer_context, OptimizerTaskType.APPLY_RULE)

    def execute(self):
        """Bind the rule, add each resulting plan to the group, queue follow-ups."""
        rule_type = self.rule.rule_type
        # return if already explored
        if self.root_expr.is_rule_explored(rule_type):
            return

        context = self.optimizer_context
        bindings = Binder(self.root_expr, self.rule.pattern, context.memo)
        for binding in bindings:
            if not self.rule.check(binding, context):
                continue
            for plan in self.rule.apply(binding, context):
                new_expr = context.add_opr_to_group(plan, self.root_expr.group_id)
                if new_expr.is_logical():
                    # optimize expressions
                    follow_up = OptimizeExpression(new_expr, context, self.explore)
                else:
                    # cost the physical expressions
                    follow_up = OptimizeInputs(new_expr, context)
                context.task_stack.push(follow_up)

        self.root_expr.mark_rule_explored(rule_type)
1✔
259

260

261
class OptimizeGroup(OptimizerTask):
    """Queue optimization tasks for every expression in a group.

    Args:
        group: the group to optimize.
        optimizer_context: shared optimizer state (task stack, ...).
    """

    def __init__(self, group: Group, optimizer_context: OptimizerContext):
        self.group = group
        super().__init__(optimizer_context, OptimizerTaskType.OPTIMIZE_GROUP)

    def execute(self):
        """Do nothing if a best expression exists; otherwise queue the work."""
        # Todo: Get the property from the context
        already_optimized = self.group.get_best_expr(PropertyType.DEFAULT)
        if already_optimized:
            return

        context = self.optimizer_context

        # optimize all the logical exprs with the same context
        for logical_expr in self.group.logical_exprs:
            task = OptimizeExpression(logical_expr, context, explore=False)
            context.task_stack.push(task)

        # cost all the physical exprs with the same context
        for physical_expr in self.group.physical_exprs:
            task = OptimizeInputs(physical_expr, context)
            context.task_stack.push(task)
282

283

284
class OptimizeInputs(OptimizerTask):
    """Cost an expression once all of its child groups have a best expression.

    Args:
        root_expr: group expression to cost.
        optimizer_context: shared optimizer state (memo, cost model, ...).
    """

    def __init__(self, root_expr: GroupExpression, optimizer_context: OptimizerContext):
        self.root_expr = root_expr
        super().__init__(optimizer_context, OptimizerTaskType.OPTIMIZE_INPUTS)

    def execute(self):
        """Sum the children's best costs plus this expression's own cost.

        If a child group has no best expression yet, this task is re-queued
        together with an ``OptimizeGroup`` task for that child, and the method
        returns without recording a cost.
        """
        context = self.optimizer_context
        total_cost = 0
        memo = context.memo
        own_group = memo.get_group_by_id(self.root_expr.group_id)

        for child_id in self.root_expr.children:
            child_group = memo.get_group_by_id(child_id)
            if not child_group.get_best_expr(PropertyType.DEFAULT):
                # Child not optimized yet: queue this task again, then the
                # child's optimization, and stop for now.
                context.task_stack.push(OptimizeInputs(self.root_expr, context))
                context.task_stack.push(OptimizeGroup(child_group, context))
                return
            # Note: May never get hit when using EvaDB on Ray
            total_cost += child_group.get_best_expr_cost(PropertyType.DEFAULT)

        total_cost += context.cost_model.calculate_cost(self.root_expr)
        own_group.add_expr_cost(self.root_expr, PropertyType.DEFAULT, total_cost)
1✔
309

310

311
# class ExploreGroup(OptimizerTask):
312
#     """
313
#     Derive all logical group-expression for matching a pattern
314
#     """
315

316
#     def __init__(self, group: Group, optimizer_context: OptimizerContext):
317
#         self.group = group
318
#         super().__init__(optimizer_context, OptimizerTaskType.EXPLORE_GROUP)
319

320
#     def execute(self):
321
#         # return if the group is already explored
322
#         if self.group.is_explored():
323
#             return
324

325
#         # explore all the logical expression
326
#         for expr in self.group.logical_exprs:
327
#             self.optimizer_context.task_stack.push(
328
#                 OptimizeExpression(expr, self.optimizer_context, explore=True)
329
#             )
330

331
#         # mark the group explored
332
#         self.group.mark_explored()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc