• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #725

pending completion
#725

push

circle-ci

web-flow
chore: reducing coverage loss (#619)

* adding delete operation

* Adding Insert Statement

* checkpoint

* supporting multiple entries

* implemented for structured data error

* adding parser visitor for delete

* delete executor

* delete plan and rules

* adding delete to plan executor

* change position of LogicalDelete

* logical delimiter

* delete test case

* adding test case

* adding test case

* adding delete testcase

* adding predicate to delete executor

* adding delete to Image storage

* bug fix in delete

* fixing testcase

* adding test case for insert statement

* remove order_by from statement_binder.py

* better variable names, using Batch

* error message for insert

* removing order_by and limit from delete

* remove order_by and limit

* use f-string

* adding to changelog

* removing commit messages

* formatting

* fixing comments

* formatting

* eva insert f32 values

* fix: should delete range

* delete multiple rows

* udf bootstrap

* try to run tests in parallel

* minor fix for ray to work

* ray fixes

---------

Co-authored-by: Aryan-Rajoria <aryanrajoria1003@gmail.com>
Co-authored-by: Gaurav <gaurav21776@gmail.com>

236 of 236 new or added lines in 53 files covered. (100.0%)

7003 of 9005 relevant lines covered (77.77%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.26
/eva/expression/function_expression.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from typing import Callable, List
1✔
16

17
from eva.catalog.models.udf_catalog import UdfCatalogEntry
1✔
18
from eva.catalog.models.udf_io_catalog import UdfIOCatalogEntry
1✔
19
from eva.constants import NO_GPU
1✔
20
from eva.executor.execution_context import Context
1✔
21
from eva.expression.abstract_expression import AbstractExpression, ExpressionType
1✔
22
from eva.models.storage.batch import Batch
1✔
23
from eva.parser.alias import Alias
1✔
24
from eva.udfs.gpu_compatible import GPUCompatible
1✔
25
from eva.utils.stats import UDFStats
1✔
26

27

28
class FunctionExpression(AbstractExpression):
    """
    Consider FunctionExpression: ObjDetector -> (labels, boxes)

    `output`: If the user wants only a subset of outputs. Eg, for
    `ObjDetector.labels` the parser sets output to 'labels'.

    `output_objs`: It is populated by the binder. In case the
    output is None, the binder sets output_objs to list of all
    output columns of the FunctionExpression. Eg, ['labels',
    'boxes']. Otherwise, only the requested output columns.

    FunctionExpression also needs to prepend its alias to all the
    projected columns. This is important as other parts of the query
    might be accessing the results using alias. Eg,
    `Select OD.labels FROM Video JOIN LATERAL ObjDetector AS OD;`
    """

    def __init__(
        self,
        func: Callable,
        name: str,
        output: str = None,
        alias: Alias = None,
        **kwargs,
    ):
        super().__init__(ExpressionType.FUNCTION_EXPRESSION, **kwargs)
        self._context = Context()
        self._name = name
        # `func` is called with no arguments in `_gpu_enabled_function` to
        # build the callable that is applied to batches.
        self._function = func
        # Lazily-created result of `self.function()`; cached across calls.
        self._function_instance = None
        self._output = output
        self.alias = alias
        # Populated later (by the binder, per the class docstring).
        self.udf_obj: UdfCatalogEntry = None
        self.output_objs: List[UdfIOCatalogEntry] = []
        # Columns kept from the function's output in `evaluate`.
        self.projection_columns: List[str] = []
        self._stats = UDFStats()

    @property
    def name(self):
        """Name of the function."""
        return self._name

    @property
    def output(self):
        """Requested output column, or None when all outputs are wanted."""
        return self._output

    @property
    def col_alias(self):
        """Return the alias column names qualified with the alias name.

        Returns:
            List[str]: entries of the form ``"<alias_name>.<col>"`` for every
            column in ``self.alias.col_names``; an empty list when no alias
            is set.
        """
        if self.alias is None:
            return []
        return [f"{self.alias.alias_name}.{col}" for col in self.alias.col_names]

    @property
    def function(self):
        """Factory that `_gpu_enabled_function` calls to build the UDF."""
        return self._function

    @function.setter
    def function(self, func: Callable):
        self._function = func
        # Drop the cached instance so the next evaluation builds it from the
        # new factory instead of silently reusing the old one.
        self._function_instance = None

    def persist_stats(self):
        """Persist the average per-row cost of this UDF to the catalog.

        Does nothing when the expression is not bound to a catalog UDF entry
        (`udf_obj` is None) or when no rows have been processed yet. The
        catalog is only updated when the new cost differs from the last
        persisted cost by more than 10% of the new cost.
        """
        from eva.catalog.catalog_manager import CatalogManager

        if self.udf_obj is None:
            return
        # num_calls accumulates len(batch) in `evaluate`, so it stays 0 after
        # empty batches; skip instead of raising ZeroDivisionError.
        if not self._stats.num_calls:
            return
        udf_id = self.udf_obj.row_id
        cost_per_func_call = (
            self._stats.timer.total_elapsed_time / self._stats.num_calls
        )

        # persist stats to catalog only if it differs by more than 10% from
        # the previous value
        if abs(self._stats.prev_cost - cost_per_func_call) > cost_per_func_call / 10:
            CatalogManager().upsert_udf_cost_catalog_entry(
                udf_id, self.udf_obj.name, cost_per_func_call
            )
            self._stats.prev_cost = cost_per_func_call

    def evaluate(self, batch: Batch, **kwargs) -> Batch:
        """Apply the function to `batch` and return the projected result.

        Children (the function arguments) are evaluated first. Their results
        must all have the same length and are merged column-wise to form the
        function input; without children, `batch` itself is the input.

        Args:
            batch: input batch.
            **kwargs: forwarded to each child's `evaluate`.

        Returns:
            Batch: the function output restricted to `projection_columns`,
            with column aliases applied from `self.alias`.
        """
        new_batch = batch
        child_batches = [child.evaluate(batch, **kwargs) for child in self.children]
        if child_batches:
            batch_sizes = [len(child_batch) for child_batch in child_batches]
            are_all_equal_length = all(batch_sizes[0] == x for x in batch_sizes)
            assert (
                are_all_equal_length is True
            ), "All columns in batch must have equal elements"
            new_batch = Batch.merge_column_wise(child_batches)

        func = self._gpu_enabled_function()
        outcomes = new_batch

        # record the time taken for the udf execution
        with self._stats.timer:
            # apply the function and project the required columns
            # NOTE(review): the return value is ignored, so this relies on
            # apply_function_expression updating `outcomes` in place; with no
            # children that is the caller's `batch` — confirm this is intended.
            outcomes.apply_function_expression(func)
            outcomes = outcomes.project(self.projection_columns)
            outcomes.modify_column_alias(self.alias)

        # record the number of function calls (one per input row)
        self._stats.num_calls += len(batch)

        # persist the stats to catalog
        self.persist_stats()

        return outcomes

    def signature(self) -> str:
        """Construct the signature of the function expression.

        The children (function arguments) are traversed and each one's
        signature is computed. The output has the form
        `udf_name(arg1,arg2,...)`.

        Returns:
            str: signature string
        """
        child_sigs = ",".join(child.signature() for child in self.children)
        return f"{self.name}({child_sigs})"

    def _gpu_enabled_function(self):
        """Return the cached function instance, creating it on first use.

        The instance is built by calling `self.function()`. If it is
        `GPUCompatible` and the context reports a GPU device other than
        `NO_GPU`, it is replaced by the result of `to_device(device)`.
        """
        if self._function_instance is None:
            self._function_instance = self.function()
            if isinstance(self._function_instance, GPUCompatible):
                device = self._context.gpu_device()
                if device != NO_GPU:
                    self._function_instance = self._function_instance.to_device(device)
        return self._function_instance

    def __str__(self) -> str:
        return f"{self.name}()"

    def __eq__(self, other):
        if not isinstance(other, FunctionExpression):
            return False
        return (
            super().__eq__(other)
            and self.name == other.name
            and self.output == other.output
            and self.alias == other.alias
            and self.function == other.function
            and self.output_objs == other.output_objs
        )

    def __hash__(self) -> int:
        # Hashes the same fields that __eq__ compares.
        return hash(
            (
                super().__hash__(),
                self.name,
                self.output,
                self.alias,
                self.function,
                tuple(self.output_objs),
            )
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc