• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 1f0358a5-d952-4fc5-b05b-9ffc47a04733

26 Aug 2023 02:06PM UTC coverage: 93.649% (-0.3%) from 93.98%
1f0358a5-d952-4fc5-b05b-9ffc47a04733

push

circle-ci

web-flow
feat: improve circle ci (#949)

Test folder reorg
```
test
|__ unit_tests
|__ integration_tests
     |__ short
     |__ long
```

For open PRs, we will run the unit tests and the short integration tests.

Once a PR is merged to the staging branch, we will run the long integration
tests across different versions and with Ray both disabled and enabled.

Once those pass, the change will be merged to master, which will run the full
test suite (unit tests + all integration tests) with different Python versions
and with Ray both disabled and enabled.

Testing for an open PR now takes around 8 minutes, but the majority of that is
spent installing packages. I also plan to add caching for that, but it is not
really working yet.

---------

Co-authored-by: xzdandy <xzdandy@gmail.com>

10470 of 11180 relevant lines covered (93.65%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.39
/evadb/executor/plan_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator, Union
1✔
16

17
from evadb.database import EvaDBDatabase
1✔
18
from evadb.executor.abstract_executor import AbstractExecutor
1✔
19
from evadb.executor.apply_and_merge_executor import ApplyAndMergeExecutor
1✔
20
from evadb.executor.create_database_executor import CreateDatabaseExecutor
1✔
21
from evadb.executor.create_executor import CreateExecutor
1✔
22
from evadb.executor.create_index_executor import CreateIndexExecutor
1✔
23
from evadb.executor.create_udf_executor import CreateUDFExecutor
1✔
24
from evadb.executor.delete_executor import DeleteExecutor
1✔
25
from evadb.executor.drop_object_executor import DropObjectExecutor
1✔
26
from evadb.executor.exchange_executor import ExchangeExecutor
1✔
27
from evadb.executor.executor_utils import ExecutorError
1✔
28
from evadb.executor.explain_executor import ExplainExecutor
1✔
29
from evadb.executor.function_scan_executor import FunctionScanExecutor
1✔
30
from evadb.executor.groupby_executor import GroupByExecutor
1✔
31
from evadb.executor.hash_join_executor import HashJoinExecutor
1✔
32
from evadb.executor.insert_executor import InsertExecutor
1✔
33
from evadb.executor.join_build_executor import BuildJoinExecutor
1✔
34
from evadb.executor.lateral_join_executor import LateralJoinExecutor
1✔
35
from evadb.executor.limit_executor import LimitExecutor
1✔
36
from evadb.executor.load_executor import LoadDataExecutor
1✔
37
from evadb.executor.nested_loop_join_executor import NestedLoopJoinExecutor
1✔
38
from evadb.executor.orderby_executor import OrderByExecutor
1✔
39
from evadb.executor.pp_executor import PPExecutor
1✔
40
from evadb.executor.predicate_executor import PredicateExecutor
1✔
41
from evadb.executor.project_executor import ProjectExecutor
1✔
42
from evadb.executor.rename_executor import RenameExecutor
1✔
43
from evadb.executor.sample_executor import SampleExecutor
1✔
44
from evadb.executor.seq_scan_executor import SequentialScanExecutor
1✔
45
from evadb.executor.show_info_executor import ShowInfoExecutor
1✔
46
from evadb.executor.storage_executor import StorageExecutor
1✔
47
from evadb.executor.union_executor import UnionExecutor
1✔
48
from evadb.executor.use_executor import UseExecutor
1✔
49
from evadb.executor.vector_index_scan_executor import VectorIndexScanExecutor
1✔
50
from evadb.models.storage.batch import Batch
1✔
51
from evadb.parser.create_statement import CreateDatabaseStatement
1✔
52
from evadb.parser.statement import AbstractStatement
1✔
53
from evadb.parser.use_statement import UseStatement
1✔
54
from evadb.plan_nodes.abstract_plan import AbstractPlan
1✔
55
from evadb.plan_nodes.types import PlanOprType
1✔
56
from evadb.utils.logging_manager import logger
1✔
57

58

59
class PlanExecutor:
    """
    This is an interface between plan tree and execution tree.
    We traverse the plan tree and build execution tree from it

    Arguments:
        evadb (EvaDBDatabase): database to execute the query on
        plan (AbstractPlan): Physical plan tree which needs to be executed
    """

    def __init__(self, evadb: EvaDBDatabase, plan: AbstractPlan):
        self._db = evadb
        self._plan = plan

    def _build_execution_tree(
        self, plan: Union[AbstractPlan, AbstractStatement]
    ) -> AbstractExecutor:
        """Build the execution tree from the plan tree.

        Each plan node is mapped to the executor registered for its
        ``opr_type``; the children of the plan node are then converted
        recursively and appended to that executor.

        Arguments:
            plan {AbstractPlan} -- Input plan tree. May also be a
                ``CreateDatabaseStatement`` or ``UseStatement``, which are
                parser statements handled directly.

        Returns:
            AbstractExecutor -- Compiled execution tree, or ``None`` when
                ``plan`` is ``None``.

        Raises:
            ExecutorError -- if the plan's operator type has no executor.
        """
        if plan is None:
            return None

        # First handle cases when the plan is actually a parser statement.
        # These return immediately: no children are attached to them here.
        if isinstance(plan, CreateDatabaseStatement):
            return CreateDatabaseExecutor(db=self._db, node=plan)
        elif isinstance(plan, UseStatement):
            return UseExecutor(db=self._db, node=plan)

        # Get plan node type
        plan_opr_type = plan.opr_type

        if plan_opr_type == PlanOprType.SEQUENTIAL_SCAN:
            executor_node = SequentialScanExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.UNION:
            executor_node = UnionExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.STORAGE_PLAN:
            executor_node = StorageExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.PP_FILTER:
            executor_node = PPExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.CREATE:
            executor_node = CreateExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.RENAME:
            executor_node = RenameExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.DROP_OBJECT:
            executor_node = DropObjectExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.INSERT:
            executor_node = InsertExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.CREATE_UDF:
            executor_node = CreateUDFExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.LOAD_DATA:
            executor_node = LoadDataExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.GROUP_BY:
            executor_node = GroupByExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.ORDER_BY:
            executor_node = OrderByExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.LIMIT:
            executor_node = LimitExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.SAMPLE:
            executor_node = SampleExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.NESTED_LOOP_JOIN:
            executor_node = NestedLoopJoinExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.LATERAL_JOIN:
            # The trailing space keeps the two sentences from running
            # together once the adjacent literals are concatenated.
            # NOTE(review): assumes `logger` exposes the standard
            # `warning` method (stdlib `warn` is a deprecated alias).
            logger.warning(
                "LateralJoin Executor should not be part of the execution plan. "
                "Please raise an issue with the current query. Thanks!"
            )
            executor_node = LateralJoinExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.HASH_JOIN:
            executor_node = HashJoinExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.HASH_BUILD:
            executor_node = BuildJoinExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.FUNCTION_SCAN:
            executor_node = FunctionScanExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.EXCHANGE:
            executor_node = ExchangeExecutor(db=self._db, node=plan)
            # The exchange node carries its own inner plan, which is compiled
            # separately and handed to the exchange executor.
            inner_executor = self._build_execution_tree(plan.inner_plan)
            executor_node.build_inner_executor(inner_executor)
        elif plan_opr_type == PlanOprType.PROJECT:
            executor_node = ProjectExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.PREDICATE_FILTER:
            executor_node = PredicateExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.SHOW_INFO:
            executor_node = ShowInfoExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.EXPLAIN:
            executor_node = ExplainExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.CREATE_INDEX:
            executor_node = CreateIndexExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.APPLY_AND_MERGE:
            executor_node = ApplyAndMergeExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.VECTOR_INDEX_SCAN:
            executor_node = VectorIndexScanExecutor(db=self._db, node=plan)
        elif plan_opr_type == PlanOprType.DELETE:
            executor_node = DeleteExecutor(db=self._db, node=plan)
        else:
            # Fail with a clear message; without this branch an unrecognised
            # operator type would only surface further down as an
            # UnboundLocalError on `executor_node`.
            raise ExecutorError(
                f"Unsupported plan operator type: {plan_opr_type}"
            )

        # EXPLAIN does not need to build execution tree for its children
        if plan_opr_type != PlanOprType.EXPLAIN:
            # Build Executor Tree for children
            for child in plan.children:
                executor_node.append_child(self._build_execution_tree(child))

        return executor_node

    def execute_plan(
        self,
        do_not_raise_exceptions: bool = False,
        do_not_print_exceptions: bool = False,
    ) -> Iterator[Batch]:
        """Execute the plan tree and yield the batches it produces.

        The execution tree is built from the plan given at construction
        time and then executed. Because this is a generator, nothing runs
        until the caller starts iterating.

        Arguments:
            do_not_raise_exceptions (bool): when not ``False``, any exception
                raised while building or running the tree is suppressed and
                iteration simply ends.
            do_not_print_exceptions (bool): when ``False`` (and exceptions
                are being raised), the exception is logged before it is
                re-raised.

        Yields:
            Batch -- output of the root executor, if it produces any.

        Raises:
            ExecutorError -- wraps any exception raised during execution,
                unless ``do_not_raise_exceptions`` is set.
        """
        try:
            execution_tree = self._build_execution_tree(self._plan)
            output = execution_tree.exec()
            if output is not None:
                yield from output
        except Exception as e:
            if do_not_raise_exceptions is False:
                if do_not_print_exceptions is False:
                    logger.exception(str(e))
                # Chain explicitly so the original error is recorded as the
                # direct cause of the ExecutorError.
                raise ExecutorError(e) from e
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc