• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 20a9a0f9-edcc-437c-815d-bcc1a2d22b17

10 Nov 2023 04:50AM UTC coverage: 66.644% (-10.2%) from 76.812%
20a9a0f9-edcc-437c-815d-bcc1a2d22b17

push

circleci

americast
update docs

0 of 1 new or added line in 1 file covered. (0.0%)

1354 existing lines in 113 files now uncovered.

8767 of 13155 relevant lines covered (66.64%)

0.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.97
/evadb/expression/function_expression.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
from dataclasses import dataclass
1✔
16
from typing import Callable, List, Tuple
1✔
17

18
import numpy as np
1✔
19
import pandas as pd
1✔
20

21
from evadb.catalog.models.function_catalog import FunctionCatalogEntry
1✔
22
from evadb.catalog.models.function_io_catalog import FunctionIOCatalogEntry
1✔
23
from evadb.constants import NO_GPU
1✔
24
from evadb.executor.execution_context import Context
1✔
25
from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
1✔
26
from evadb.functions.gpu_compatible import GPUCompatible
1✔
27
from evadb.models.storage.batch import Batch
1✔
28
from evadb.parser.alias import Alias
1✔
29
from evadb.utils.kv_cache import DiskKVCache
1✔
30
from evadb.utils.logging_manager import logger
1✔
31
from evadb.utils.stats import FunctionStats
1✔
32

33

34
class FunctionExpression(AbstractExpression):
    """
    Consider FunctionExpression: ObjDetector -> (labels, boxes)

    `output`: If the user wants only a subset of the outputs. E.g., for
    ObjDetector.labels the parser will set output to 'labels'.

    `output_objs`: It is populated by the binder. In case the
    output is None, the binder sets output_objs to the list of all
    output columns of the FunctionExpression. E.g., ['labels',
    'boxes']. Otherwise, only the requested output columns.

    FunctionExpression also needs to prepend its alias to all the
    projected columns. This is important as other parts of the query
    might be accessing the results using the alias. E.g.,

    `Select Detector.labels
     FROM Video JOIN LATERAL ObjDetector AS Detector;`
    """

    def __init__(
        self,
        func: Callable,
        name: str,
        output: str = None,
        alias: Alias = None,
        **kwargs,
    ):
        """Create a function expression.

        Args:
            func (Callable): zero-argument callable that returns the function
                instance to apply; it is invoked lazily on first evaluation.
            name (str): name of the function, used in `__str__` and `signature`.
            output (str): optional name of the single requested output.
            alias (Alias): optional alias applied to the projected columns.
            **kwargs: forwarded to `AbstractExpression.__init__`.
        """
        super().__init__(ExpressionType.FUNCTION_EXPRESSION, **kwargs)
        self._context = Context()
        self._name = name
        self._function = func
        # instantiated lazily by `_gpu_enabled_function`
        self._function_instance = None
        self._output = output
        self.alias = alias
        self.function_obj: FunctionCatalogEntry = None
        self.output_objs: List[FunctionIOCatalogEntry] = []
        self.projection_columns: List[str] = []
        # stays None until `enable_cache` is called
        self._cache: "FunctionExpressionCache" = None
        self._stats = FunctionStats()

    @property
    def name(self):
        """Name of the function."""
        return self._name

    @property
    def output(self):
        """Name of the requested output, or None if none was requested."""
        return self._output

    @property
    def col_alias(self):
        """List of `<alias_name>.<column>` strings, empty if there is no alias."""
        if self.alias is None:
            return []
        return [
            "{}.{}".format(self.alias.alias_name, col)
            for col in self.alias.col_names
        ]

    @property
    def function(self):
        """Callable that produces the function instance."""
        return self._function

    @function.setter
    def function(self, func: Callable):
        self._function = func

    def enable_cache(self, cache: "FunctionExpressionCache"):
        """Attach `cache` to this expression and return `self` for chaining."""
        self._cache = cache
        return self

    def has_cache(self):
        """Return True if a cache has been attached via `enable_cache`."""
        return self._cache is not None

    def consolidate_stats(self):
        """Refresh `prev_cost` in the stats from the accumulated timings.

        The per-call cost is the total elapsed time divided by the number of
        cache misses (when a cache is attached and misses were recorded) or by
        the number of calls otherwise. `prev_cost` is only overwritten when it
        differs from the new estimate by more than 10% of that estimate.

        Does nothing when `function_obj` is unset or when no invocation has
        been recorded yet, which avoids a division by zero.
        """
        if self.function_obj is None:
            return

        stats = self._stats
        # if the function expression supports cache, only approximate using
        # the cache_miss entries.
        if self.has_cache() and stats.cache_misses > 0:
            num_invocations = stats.cache_misses
        else:
            num_invocations = stats.num_calls

        # Only empty batches have been evaluated so far: there is no cost to
        # estimate, and dividing would raise ZeroDivisionError.
        if num_invocations == 0:
            return

        cost_per_func_call = stats.timer.total_elapsed_time / num_invocations
        if abs(stats.prev_cost - cost_per_func_call) > cost_per_func_call / 10:
            stats.prev_cost = cost_per_func_call

    def evaluate(self, batch: Batch, **kwargs) -> Batch:
        """Apply the function to `batch` and project the requested columns.

        Args:
            batch (Batch): input rows; the children are evaluated on it to
                build the function arguments.
            **kwargs: forwarded to the children's `evaluate`.

        Returns:
            Batch: the function output, projected onto `projection_columns`
            and renamed with `alias` when the output is not empty.
        """
        func = self._gpu_enabled_function()
        # record the time taken for the function execution
        # note the function might be using cache
        with self._stats.timer:
            # apply the function and project the required columns
            outcomes = self._apply_function_expression(func, batch, **kwargs)

            # process outcomes only if output is not empty
            if not outcomes.frames.empty:
                # Reuse the already-created instance: calling
                # `self._function()` here would construct a new function
                # object for every batch just to read its name.
                if func.name == "ForecastModel":
                    outcomes = outcomes.project(self.projection_columns, forecast=True)
                    outcomes.modify_column_alias(self.alias, forecast=True)
                else:
                    outcomes = outcomes.project(self.projection_columns)
                    outcomes.modify_column_alias(self.alias)

        # record the number of function calls
        self._stats.num_calls += len(batch)

        # try persisting the stats to catalog and do not crash if we fail in doing so
        try:
            self.consolidate_stats()
        except Exception as e:
            # `warn` is a deprecated alias of `warning`
            logger.warning(
                f"Persisting FunctionExpression {str(self)} stats failed with {str(e)}"
            )

        return outcomes

    def signature(self) -> str:
        """It constructs the signature of the function expression.
        It traverses the children (function arguments) and computes the signature
        for each child. The output is in the form
        `function_name[row_id](arg1, arg2, ...)`.

        Returns:
            str: signature string
        """
        child_sigs = [child.signature() for child in self.children]
        return f"{self.name}[{self.function_obj.row_id}]({','.join(child_sigs)})"

    def _gpu_enabled_function(self):
        """Return the function instance, creating it on first use.

        On creation, a `GPUCompatible` instance is moved to the device reported
        by the execution context unless that device is `NO_GPU`.
        """
        if self._function_instance is None:
            self._function_instance = self.function()
            if isinstance(self._function_instance, GPUCompatible):
                device = self._context.gpu_device()
                if device != NO_GPU:
                    self._function_instance = self._function_instance.to_device(device)
        return self._function_instance

    def _apply_function_expression(self, func: Callable, batch: Batch, **kwargs):
        """
        If cache is not enabled, call the func on the batch and return.
        If cache is enabled:
        (1) iterate over the input batch rows and check if we have the value in the
        cache;
        (2) for all cache miss rows, call the func;
        (3) iterate over each cache miss row and store the results in the cache;
        (4) stitch back the partial cache results with the new func calls.
        """
        func_args = Batch.merge_column_wise(
            [child.evaluate(batch, **kwargs) for child in self.children]
        )

        if not self._cache:
            return func_args.apply_function_expression(func)

        output_cols = [obj.name for obj in self.function_obj.outputs]

        # 1. check cache
        # We are required to iterate over the batch row by row and check the cache.
        # This can hurt performance, as we have to stitch together columns to generate
        # row tuples. Is there an alternative approach we can take?

        results = np.full([len(batch), len(output_cols)], None)
        cache_keys = func_args
        # cache keys can be different from func_args
        # see optimize_cache_key
        if self._cache.key:
            cache_keys = Batch.merge_column_wise(
                [child.evaluate(batch, **kwargs) for child in self._cache.key]
            )
            assert len(cache_keys) == len(batch), "Not all rows have the cache key"

        cache_miss = np.full(len(batch), True)
        for idx, (_, key) in enumerate(cache_keys.iterrows()):
            val = self._cache.store.get(key.to_numpy())
            results[idx] = val
            # a lookup that returns None is treated as a miss
            cache_miss[idx] = val is None

        # log the cache misses
        self._stats.cache_misses += int(cache_miss.sum())

        # 2. call func for cache miss rows
        if cache_miss.any():
            func_args = func_args[list(cache_miss)]
            cache_miss_results = func_args.apply_function_expression(func)

            # 3. set the cache results
            missing_keys = cache_keys[list(cache_miss)]
            for key, value in zip(
                missing_keys.iterrows(), cache_miss_results.iterrows()
            ):
                # iterrows yields (index, row) pairs; only the row is stored
                self._cache.store.set(key[1].to_numpy(), value[1].to_numpy())

            # 4. merge the cache results
            results[cache_miss] = cache_miss_results.to_numpy()

        # 5. return the correct batch
        return Batch(pd.DataFrame(results, columns=output_cols))

    def __str__(self) -> str:
        """Render as `name(arg1,arg2,...)` using the children's strings."""
        args = ",".join(str(child) for child in self.children)
        return f"{self.name}({args})"

    def __eq__(self, other):
        """Equal when `other` is a FunctionExpression with an equal subtree,
        name, output, alias, function, output objects and cache."""
        # Reject foreign types before comparing subtrees.
        if not isinstance(other, FunctionExpression):
            return False
        return (
            super().__eq__(other)
            and self.name == other.name
            and self.output == other.output
            and self.alias == other.alias
            and self.function == other.function
            and self.output_objs == other.output_objs
            and self._cache == other._cache
        )

    def __hash__(self) -> int:
        """Hash over the same fields that `__eq__` compares."""
        return hash(
            (
                super().__hash__(),
                self.name,
                self.output,
                self.alias,
                self.function,
                tuple(self.output_objs),
                self._cache,
            )
        )
269

270

271
@dataclass(frozen=True)
class FunctionExpressionCache:
    """dataclass for cache-related attributes

    Args:
        key (`Tuple[AbstractExpression, ...]`): the abstract expressions to
            evaluate to get the key. If `None` (or empty), use the function
            arguments as the key. This is useful when the system wants to use
            logically equivalent columns as the key (e.g., frame number
            instead of frame data).
        store (`DiskKVCache`): the cache object to get/set key-value pairs
    """

    # variable-length tuple: one expression per key column
    key: Tuple[AbstractExpression, ...]
    store: DiskKVCache = None
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc