• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 88056f97-3ba0-4458-8b7e-20f8bb782cbe

pending completion
88056f97-3ba0-4458-8b7e-20f8bb782cbe

Pull #787

circle-ci

xzdandy
Check Notebook under ray
Pull Request #787: Enable ray be default

9700 of 10045 relevant lines covered (96.57%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.64
/eva/executor/plan_executor.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator
1✔
16

17
from eva.executor.abstract_executor import AbstractExecutor
1✔
18
from eva.executor.apply_and_merge_executor import ApplyAndMergeExecutor
1✔
19
from eva.executor.create_executor import CreateExecutor
1✔
20
from eva.executor.create_index_executor import CreateIndexExecutor
1✔
21
from eva.executor.create_mat_view_executor import CreateMaterializedViewExecutor
1✔
22
from eva.executor.create_udf_executor import CreateUDFExecutor
1✔
23
from eva.executor.delete_executor import DeleteExecutor
1✔
24
from eva.executor.drop_executor import DropExecutor
1✔
25
from eva.executor.drop_udf_executor import DropUDFExecutor
1✔
26
from eva.executor.executor_utils import ExecutorError
1✔
27
from eva.executor.explain_executor import ExplainExecutor
1✔
28
from eva.executor.function_scan_executor import FunctionScanExecutor
1✔
29
from eva.executor.groupby_executor import GroupByExecutor
1✔
30
from eva.executor.hash_join_executor import HashJoinExecutor
1✔
31
from eva.executor.insert_executor import InsertExecutor
1✔
32
from eva.executor.join_build_executor import BuildJoinExecutor
1✔
33
from eva.executor.lateral_join_executor import LateralJoinExecutor
1✔
34
from eva.executor.limit_executor import LimitExecutor
1✔
35
from eva.executor.load_executor import LoadDataExecutor
1✔
36
from eva.executor.nested_loop_join_executor import NestedLoopJoinExecutor
1✔
37
from eva.executor.orderby_executor import OrderByExecutor
1✔
38
from eva.executor.pp_executor import PPExecutor
1✔
39
from eva.executor.predicate_executor import PredicateExecutor
1✔
40
from eva.executor.project_executor import ProjectExecutor
1✔
41
from eva.executor.rename_executor import RenameExecutor
1✔
42
from eva.executor.sample_executor import SampleExecutor
1✔
43
from eva.executor.seq_scan_executor import SequentialScanExecutor
1✔
44
from eva.executor.show_info_executor import ShowInfoExecutor
1✔
45
from eva.executor.storage_executor import StorageExecutor
1✔
46
from eva.executor.union_executor import UnionExecutor
1✔
47
from eva.executor.vector_index_scan_executor import VectorIndexScanExecutor
1✔
48
from eva.experimental.parallel.executor.exchange_executor import ExchangeExecutor
1✔
49
from eva.models.storage.batch import Batch
1✔
50
from eva.plan_nodes.abstract_plan import AbstractPlan
1✔
51
from eva.plan_nodes.types import PlanOprType
1✔
52
from eva.utils.logging_manager import logger
1✔
53

54

55
class PlanExecutor:
    """
    Interface between the plan tree and the execution tree.

    The physical plan tree is traversed and a matching tree of executors is
    built from it; running the root executor produces the output batches.

    Arguments:
        plan (AbstractPlan): Physical plan tree which needs to be executed

    """

    def __init__(self, plan: AbstractPlan):
        self._plan = plan

    @staticmethod
    def _executor_classes() -> dict:
        """map every supported plan operator type to its executor class

        The table is built on each call (rather than once at class-definition
        time) so that the executor names are resolved when the method runs,
        exactly as the original if/elif chain did.

        Returns:
            dict -- PlanOprType member -> executor class
        """
        return {
            PlanOprType.SEQUENTIAL_SCAN: SequentialScanExecutor,
            PlanOprType.UNION: UnionExecutor,
            PlanOprType.STORAGE_PLAN: StorageExecutor,
            PlanOprType.PP_FILTER: PPExecutor,
            PlanOprType.CREATE: CreateExecutor,
            PlanOprType.RENAME: RenameExecutor,
            PlanOprType.DROP: DropExecutor,
            PlanOprType.INSERT: InsertExecutor,
            PlanOprType.CREATE_UDF: CreateUDFExecutor,
            PlanOprType.DROP_UDF: DropUDFExecutor,
            PlanOprType.LOAD_DATA: LoadDataExecutor,
            PlanOprType.GROUP_BY: GroupByExecutor,
            PlanOprType.ORDER_BY: OrderByExecutor,
            PlanOprType.LIMIT: LimitExecutor,
            PlanOprType.SAMPLE: SampleExecutor,
            PlanOprType.NESTED_LOOP_JOIN: NestedLoopJoinExecutor,
            PlanOprType.LATERAL_JOIN: LateralJoinExecutor,
            PlanOprType.HASH_JOIN: HashJoinExecutor,
            PlanOprType.HASH_BUILD: BuildJoinExecutor,
            PlanOprType.FUNCTION_SCAN: FunctionScanExecutor,
            PlanOprType.CREATE_MATERIALIZED_VIEW: CreateMaterializedViewExecutor,
            PlanOprType.EXCHANGE: ExchangeExecutor,
            PlanOprType.PROJECT: ProjectExecutor,
            PlanOprType.PREDICATE_FILTER: PredicateExecutor,
            PlanOprType.SHOW_INFO: ShowInfoExecutor,
            PlanOprType.EXPLAIN: ExplainExecutor,
            PlanOprType.CREATE_INDEX: CreateIndexExecutor,
            PlanOprType.APPLY_AND_MERGE: ApplyAndMergeExecutor,
            PlanOprType.VECTOR_INDEX_SCAN: VectorIndexScanExecutor,
            PlanOprType.DELETE: DeleteExecutor,
        }

    def _build_execution_tree(self, plan: AbstractPlan) -> AbstractExecutor:
        """build the execution tree from plan tree

        Arguments:
            plan {AbstractPlan} -- Input Plan tree

        Returns:
            AbstractExecutor -- Compiled Execution tree, or None when
            ``plan`` is None

        Raises:
            ExecutorError -- if the plan's operator type has no executor
        """
        if plan is None:
            return None

        # Get plan node type
        plan_opr_type = plan.opr_type

        executor_class = self._executor_classes().get(plan_opr_type)
        if executor_class is None:
            # Fail with a clear message instead of hitting an unbound
            # ``executor_node`` further down.
            raise ExecutorError(f"Unsupported plan operator type: {plan_opr_type}")

        if plan_opr_type == PlanOprType.LATERAL_JOIN:
            # A lateral join is not expected to reach execution; warn but
            # still build the executor.
            logger.warning(
                "LateralJoin Executor should not be part of the execution plan. "
                "Please raise an issue with the current query. Thanks!"
            )

        executor_node = executor_class(node=plan)

        if plan_opr_type == PlanOprType.EXCHANGE:
            # The exchange executor wraps an executor tree built from the
            # plan's inner plan, in addition to its regular children.
            inner_executor = self._build_execution_tree(plan.inner_plan)
            executor_node.build_inner_executor(inner_executor)

        # EXPLAIN does not need to build execution tree for its children
        if plan_opr_type != PlanOprType.EXPLAIN:
            # Build Executor Tree for children
            for child in plan.children:
                executor_node.append_child(self._build_execution_tree(child))

        return executor_node

    def _clean_execution_tree(self, tree_root: AbstractExecutor):
        """clean the execution tree from memory

        Currently a placeholder that does nothing.

        Arguments:
            tree_root {AbstractExecutor} -- root of execution tree to delete
        """
        # Todo
        # clear all the nodes from the execution tree

    def execute_plan(self) -> Iterator[Batch]:
        """execute the plan tree

        Builds the execution tree for the stored plan, runs its root
        executor and yields whatever that executor produces.

        Returns:
            Iterator[Batch] -- output of the root executor (nothing is
            yielded when the executor returns None)

        Raises:
            ExecutorError -- wraps any exception raised while building or
            running the execution tree
        """
        execution_tree = None
        try:
            execution_tree = self._build_execution_tree(self._plan)
            output = execution_tree.exec()
            if output is not None:
                yield from output
        except Exception as e:
            logger.exception(str(e))
            # Chain the original exception so its traceback is preserved.
            raise ExecutorError(e) from e
        finally:
            # Run cleanup on failure and on early generator close too,
            # not only after the output has been fully consumed.
            if execution_tree is not None:
                self._clean_execution_tree(execution_tree)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc