
georgia-tech-db / eva, build #754 (push via circle-ci)
04 Sep 2023 09:54PM UTC. Coverage: 74.807% (-5.5%) from 80.336%
Commit by jiashenC: update case
8727 of 11666 relevant lines covered (74.81%)
0.75 hits per line
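The headline percentage is simply covered lines over relevant lines; a quick sanity check in plain Python (not part of the report):

covered, relevant = 8727, 11666
print(f"{covered / relevant:.3%}")  # 74.807%, matching the figure above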

Source File: /evadb/expression/function_expression.py (73.6% of lines covered)
Uncovered regions in this build include col_alias, enable_cache, signature, and the cache-enabled paths of consolidate_stats and _apply_function_expression.
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from typing import Callable, List, Tuple

import numpy as np
import pandas as pd

from evadb.catalog.models.function_catalog import FunctionCatalogEntry
from evadb.catalog.models.function_io_catalog import FunctionIOCatalogEntry
from evadb.constants import NO_GPU
from evadb.executor.execution_context import Context
from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
from evadb.functions.gpu_compatible import GPUCompatible
from evadb.models.storage.batch import Batch
from evadb.parser.alias import Alias
from evadb.utils.kv_cache import DiskKVCache
from evadb.utils.logging_manager import logger
from evadb.utils.stats import FunctionStats


class FunctionExpression(AbstractExpression):
    """
    Consider FunctionExpression: ObjDetector -> (labels, boxes)

    `output`: set when the user wants only a subset of the outputs. E.g., for
    ObjDetector.labels the parser sets output to 'labels'.

    `output_objs`: populated by the binder. If output is None, the binder sets
    output_objs to the list of all output columns of the FunctionExpression,
    e.g., ['labels', 'boxes']. Otherwise, it contains only the selected output
    columns.

    FunctionExpression also needs to prepend its alias to all the
    projected columns. This is important as other parts of the query
    might be accessing the results using the alias. E.g.,

    `Select Detector.labels
     FROM Video JOIN LATERAL ObjDetector AS Detector;`
    """

    def __init__(
        self,
        func: Callable,
        name: str,
        output: str = None,
        alias: Alias = None,
        **kwargs,
    ):
        super().__init__(ExpressionType.FUNCTION_EXPRESSION, **kwargs)
        self._context = Context()
        self._name = name
        self._function = func
        self._function_instance = None
        self._output = output
        self.alias = alias
        self.function_obj: FunctionCatalogEntry = None
        self.output_objs: List[FunctionIOCatalogEntry] = []
        self.projection_columns: List[str] = []
        self._cache: FunctionExpressionCache = None
        self._stats = FunctionStats()

    @property
    def name(self):
        return self._name

    @property
    def output(self):
        return self._output

    @property
    def col_alias(self):
        col_alias_list = []
        if self.alias is not None:
            for col in self.alias.col_names:
                col_alias_list.append("{}.{}".format(self.alias.alias_name, col))
        return col_alias_list
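The alias handling described in the class docstring is what col_alias implements. A minimal standalone sketch of that prefixing, using SimpleNamespace as a stand-in for evadb.parser.alias.Alias (only the alias_name and col_names attributes read above are modeled, and the values are hypothetical):

from types import SimpleNamespace

# Hypothetical alias with two output columns.
alias = SimpleNamespace(alias_name="detector", col_names=["labels", "bboxes"])

# Mirrors the loop in col_alias: prepend the alias name to every column.
col_alias_list = ["{}.{}".format(alias.alias_name, col) for col in alias.col_names]
print(col_alias_list)  # ['detector.labels', 'detector.bboxes']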

    @property
    def function(self):
        return self._function

    @function.setter
    def function(self, func: Callable):
        self._function = func

    def enable_cache(self, cache: "FunctionExpressionCache"):
        self._cache = cache
        return self

    def has_cache(self):
        return self._cache is not None

    def consolidate_stats(self):
        if self.function_obj is None:
            return

        # if the function expression supports caching, approximate the cost
        # using only the cache-miss entries.
        if self.has_cache() and self._stats.cache_misses > 0:
            cost_per_func_call = (
                self._stats.timer.total_elapsed_time / self._stats.cache_misses
            )
        else:
            cost_per_func_call = self._stats.timer.total_elapsed_time / (
                self._stats.num_calls
            )

        if abs(self._stats.prev_cost - cost_per_func_call) > cost_per_func_call / 10:
            self._stats.prev_cost = cost_per_func_call
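To make the update rule above concrete, a small worked example with made-up numbers (when caching is enabled, only the cache-miss rows actually paid for a function call, so they form the denominator):

total_elapsed_time = 9.0  # seconds recorded by the stats timer
cache_misses = 30         # rows that actually invoked the function
prev_cost = 0.5           # previously recorded cost per call

cost_per_func_call = total_elapsed_time / cache_misses  # 0.3 s per call
# |0.5 - 0.3| = 0.2 exceeds the 10% band (0.03), so the stored cost is refreshed.
if abs(prev_cost - cost_per_func_call) > cost_per_func_call / 10:
    prev_cost = cost_per_func_call
print(prev_cost)  # 0.3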

    def evaluate(self, batch: Batch, **kwargs) -> Batch:
        func = self._gpu_enabled_function()
        # record the time taken for the function execution
        # note that the function might be using the cache
        with self._stats.timer:
            # apply the function and project the required columns
            outcomes = self._apply_function_expression(func, batch, **kwargs)

            # process outcomes only if output is not empty
            if outcomes.frames.empty is False:
                outcomes = outcomes.project(self.projection_columns)
                outcomes.modify_column_alias(self.alias)

        # record the number of function calls
        self._stats.num_calls += len(batch)

        # try persisting the stats to the catalog and do not crash if we fail in doing so
        try:
            self.consolidate_stats()
        except Exception as e:
            logger.warn(
                f"Persisting FunctionExpression {str(self)} stats failed with {str(e)}"
            )

        return outcomes

    def signature(self) -> str:
        """It constructs the signature of the function expression.
        It traverses the children (function arguments) and computes the signature for
        each child. The output is in the form `function_name[row_id](arg1, arg2, ...)`.

        Returns:
            str: signature string
        """
        child_sigs = []
        for child in self.children:
            child_sigs.append(child.signature())

        func_sig = f"{self.name}[{self.function_obj.row_id}]({','.join(child_sigs)})"
        return func_sig
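For illustration, with a hypothetical catalog row_id of 3 and a single child whose signature is Video.data (both values assumed, not taken from this build), the construction above yields:

name, row_id, child_sigs = "ObjDetector", 3, ["Video.data"]  # hypothetical values
print(f"{name}[{row_id}]({','.join(child_sigs)})")  # ObjDetector[3](Video.data)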

    def _gpu_enabled_function(self):
        if self._function_instance is None:
            self._function_instance = self.function()
            if isinstance(self._function_instance, GPUCompatible):
                device = self._context.gpu_device()
                if device != NO_GPU:
                    self._function_instance = self._function_instance.to_device(device)
        return self._function_instance

    def _apply_function_expression(self, func: Callable, batch: Batch, **kwargs):
        """
        If cache is not enabled, call the func on the batch and return.
        If cache is enabled:
        (1) iterate over the input batch rows and check if we have the value in the
        cache;
        (2) for all cache miss rows, call the func;
        (3) iterate over each cache miss row and store the results in the cache;
        (4) stitch back the partial cache results with the new func calls.
        """
        func_args = Batch.merge_column_wise(
            [child.evaluate(batch, **kwargs) for child in self.children]
        )

        if not self._cache:
            return func_args.apply_function_expression(func)

        output_cols = [obj.name for obj in self.function_obj.outputs]

        # 1. check cache
        # We are required to iterate over the batch row by row and check the cache.
        # This can hurt performance, as we have to stitch together columns to generate
        # row tuples. Is there an alternative approach we can take?

        results = np.full([len(batch), len(output_cols)], None)
        cache_keys = func_args
        # cache keys can be different from func_args
        # see optimize_cache_key
        if self._cache.key:
            cache_keys = Batch.merge_column_wise(
                [child.evaluate(batch, **kwargs) for child in self._cache.key]
            )
            assert len(cache_keys) == len(batch), "Not all rows have the cache key"

        cache_miss = np.full(len(batch), True)
        for idx, (_, key) in enumerate(cache_keys.iterrows()):
            val = self._cache.store.get(key.to_numpy())
            results[idx] = val
            cache_miss[idx] = val is None

        # log the cache misses
        self._stats.cache_misses += sum(cache_miss)

        # 2. call func for cache miss rows
        if cache_miss.any():
            func_args = func_args[list(cache_miss)]
            cache_miss_results = func_args.apply_function_expression(func)

            # 3. set the cache results
            missing_keys = cache_keys[list(cache_miss)]
            for key, value in zip(
                missing_keys.iterrows(), cache_miss_results.iterrows()
            ):
                self._cache.store.set(key[1].to_numpy(), value[1].to_numpy())

            # 4. merge the cache results
            results[cache_miss] = cache_miss_results.to_numpy()

        # 5. return the correct batch
        return Batch(pd.DataFrame(results, columns=output_cols))
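The four numbered steps above can be exercised in isolation. The following standalone sketch swaps EvaDB's Batch and DiskKVCache for a pandas DataFrame and a plain dict, so it illustrates the pattern rather than the actual API:

import numpy as np
import pandas as pd

def detect(args: pd.DataFrame) -> pd.DataFrame:
    # stand-in for the user-defined function
    return pd.DataFrame({"label": ["obj_" + str(v) for v in args["frame_id"]]})

func_args = pd.DataFrame({"frame_id": [1, 2, 1, 3]})
output_cols = ["label"]

# stand-in for DiskKVCache, pre-populated so the sketch shows a cache hit
store = {np.array([1], dtype=np.int64).tobytes(): np.array(["obj_cached"], dtype=object)}

# 1. check the cache row by row
results = np.full([len(func_args), len(output_cols)], None)
cache_miss = np.full(len(func_args), True)
for idx, (_, key) in enumerate(func_args.iterrows()):
    val = store.get(key.to_numpy().tobytes())
    results[idx] = val
    cache_miss[idx] = val is None

# 2. call the function only for the cache-miss rows
if cache_miss.any():
    miss_args = func_args[cache_miss]
    miss_results = detect(miss_args)

    # 3. store the fresh results under their keys
    for (_, key), (_, value) in zip(miss_args.iterrows(), miss_results.iterrows()):
        store[key.to_numpy().tobytes()] = value.to_numpy()

    # 4. stitch cached and fresh rows back together
    results[cache_miss] = miss_results.to_numpy()

print(pd.DataFrame(results, columns=output_cols))  # obj_cached, obj_2, obj_cached, obj_3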

    def __str__(self) -> str:
        args = [str(child) for child in self.children]
        expr_str = f"{self.name}({','.join(args)})"
        return expr_str

    def __eq__(self, other):
        is_subtree_equal = super().__eq__(other)
        if not isinstance(other, FunctionExpression):
            return False
        return (
            is_subtree_equal
            and self.name == other.name
            and self.output == other.output
            and self.alias == other.alias
            and self.function == other.function
            and self.output_objs == other.output_objs
            and self._cache == other._cache
        )

    def __hash__(self) -> int:
        return hash(
            (
                super().__hash__(),
                self.name,
                self.output,
                self.alias,
                self.function,
                tuple(self.output_objs),
                self._cache,
            )
        )


@dataclass(frozen=True)
class FunctionExpressionCache:
    """dataclass for cache-related attributes

    Args:
        key (`Tuple[AbstractExpression]`): the abstract expressions to evaluate to
            obtain the key. If `None`, the function arguments are used as the key.
            This is useful when the system wants to use logically equivalent columns
            as the key (e.g., frame number instead of frame data).
        store (`DiskKVCache`): the cache object to get/set key-value pairs
    """

    key: Tuple[AbstractExpression]
    store: DiskKVCache = None
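Finally, a rough sketch of how a cache gets attached to an expression. The optimizer normally does this; the store below is a stand-in that only models the get/set interface used by _apply_function_expression, since DiskKVCache's constructor is not shown in this file:

from evadb.expression.function_expression import FunctionExpressionCache

class InMemoryKVStore:
    """Stand-in for evadb.utils.kv_cache.DiskKVCache (get/set on numpy-array keys)."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key.tobytes())

    def set(self, key, value):
        self._data[key.tobytes()] = value

# key=None means the function arguments themselves serve as the cache key,
# as the docstring above describes.
cache = FunctionExpressionCache(key=None, store=InMemoryKVStore())
# An already-bound FunctionExpression `expr` (hypothetical here) would opt in via:
# expr.enable_cache(cache)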