• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / e6161546-9e33-42e7-a2b6-f8fbe6aa8255

08 Sep 2023 02:22AM UTC coverage: 80.449% (-12.5%) from 92.929%
e6161546-9e33-42e7-a2b6-f8fbe6aa8255

push

circle-ci

jiashenC
fix lint

13 of 13 new or added lines in 8 files covered. (100.0%)

9398 of 11682 relevant lines covered (80.45%)

1.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.16
/evadb/executor/orderby_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Iterator
2✔
16

17
from evadb.database import EvaDBDatabase
2✔
18
from evadb.executor.abstract_executor import AbstractExecutor
2✔
19
from evadb.executor.executor_utils import ExecutorError
2✔
20
from evadb.expression.function_expression import FunctionExpression
2✔
21
from evadb.expression.tuple_value_expression import TupleValueExpression
2✔
22
from evadb.models.storage.batch import Batch
2✔
23
from evadb.parser.types import ParserOrderBySortType
2✔
24
from evadb.plan_nodes.orderby_plan import OrderByPlan
2✔
25

26

27
class OrderByExecutor(AbstractExecutor):
2✔
28
    """
29
    Sort the frames which satisfy the condition
30

31
    Arguments:
32
        node (AbstractPlan): The OrderBy Plan
33

34
    """
35

36
    def __init__(self, db: EvaDBDatabase, node: OrderByPlan):
2✔
37
        super().__init__(db, node)
2✔
38
        self._orderby_list = node.orderby_list
2✔
39
        self._columns = node.columns
2✔
40
        self._sort_types = node.sort_types
2✔
41
        self.batch_sizes = []
2✔
42

43
    def _extract_column_name(self, col):
2✔
44
        col_name = []
2✔
45
        if isinstance(col, TupleValueExpression):
2✔
46
            col_name += [col.col_alias]
2✔
47
        elif isinstance(col, FunctionExpression):
×
48
            col_name += col.col_alias
×
49
        else:
50
            raise ExecutorError(
51
                "Expression type {} is not supported.".format(type(col))
52
            )
53
        return col_name
2✔
54

55
    def extract_column_names(self):
2✔
56
        """extracts the string name of the column"""
57
        # self._columns: List[TupleValueExpression]
58
        col_name_list = []
2✔
59
        for col in self._columns:
2✔
60
            col_name_list += self._extract_column_name(col)
2✔
61
        return col_name_list
2✔
62

63
    def extract_sort_types(self):
2✔
64
        """extracts the sort type for the column"""
65
        # self._sort_types: List[ParserOrderBySortType]
66
        sort_type_bools = []
2✔
67
        for st in self._sort_types:
2✔
68
            if st is ParserOrderBySortType.ASC:
2✔
69
                sort_type_bools.append(True)
2✔
70
            else:
71
                sort_type_bools.append(False)
2✔
72
        return sort_type_bools
2✔
73

74
    def exec(self, *args, **kwargs) -> Iterator[Batch]:
2✔
75
        child_executor = self.children[0]
2✔
76
        aggregated_batch_list = []
2✔
77

78
        # aggregates the batches into one large batch
79
        for batch in child_executor.exec(**kwargs):
2✔
80
            self.batch_sizes.append(len(batch))
2✔
81
            aggregated_batch_list.append(batch)
2✔
82
        aggregated_batch = Batch.concat(aggregated_batch_list, copy=False)
2✔
83

84
        # nothing to order by
85
        if not len(aggregated_batch):
2✔
86
            return
×
87

88
        # Column can be a functional expression, so if it
89
        # is not in columns, it needs to be re-evaluated.
90
        merge_batch_list = [aggregated_batch]
2✔
91
        for col in self._columns:
2✔
92
            col_name_list = self._extract_column_name(col)
2✔
93
            for col_name in col_name_list:
2✔
94
                if col_name not in aggregated_batch.columns:
2✔
95
                    batch = col.evaluate(aggregated_batch)
×
96
                    merge_batch_list.append(batch)
×
97
        if len(merge_batch_list) > 1:
2✔
98
            aggregated_batch = Batch.merge_column_wise(merge_batch_list)
×
99

100
        # sorts the batch
101
        try:
2✔
102
            aggregated_batch.sort_orderby(
2✔
103
                by=self.extract_column_names(),
104
                sort_type=self.extract_sort_types(),
105
            )
106
        except KeyError:
107
            # raise ExecutorError(str(e))
108
            pass
109

110
        # split the aggregated batch into smaller ones based
111
        #  on self.batch_sizes which holds the input batches sizes
112
        index = 0
2✔
113
        for i in self.batch_sizes:
2✔
114
            batch = aggregated_batch[index : index + i]
2✔
115
            batch.reset_index()
2✔
116
            index += i
2✔
117
            yield batch
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc