• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #850

08 Nov 2023 08:36PM UTC coverage: 0.0% (-77.0%) from 76.982%
#850

push

circleci

americast
fix metrics logic

0 of 1 new or added line in 1 file covered. (0.0%)

9789 existing lines in 252 files now uncovered.

0 of 12428 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/evadb/expression/function_expression.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
UNCOV
15
from dataclasses import dataclass
×
UNCOV
16
from typing import Callable, List, Tuple
×
17

UNCOV
18
import numpy as np
×
UNCOV
19
import pandas as pd
×
20

UNCOV
21
from evadb.catalog.models.function_catalog import FunctionCatalogEntry
×
UNCOV
22
from evadb.catalog.models.function_io_catalog import FunctionIOCatalogEntry
×
UNCOV
23
from evadb.constants import NO_GPU
×
UNCOV
24
from evadb.executor.execution_context import Context
×
UNCOV
25
from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
×
UNCOV
26
from evadb.functions.gpu_compatible import GPUCompatible
×
UNCOV
27
from evadb.models.storage.batch import Batch
×
UNCOV
28
from evadb.parser.alias import Alias
×
UNCOV
29
from evadb.utils.kv_cache import DiskKVCache
×
UNCOV
30
from evadb.utils.logging_manager import logger
×
UNCOV
31
from evadb.utils.stats import FunctionStats
×
32

33

UNCOV
34
class FunctionExpression(AbstractExpression):
    """Expression that applies a catalog function to the outputs of its children.

    Consider FunctionExpression: ObjDetector -> (labels, boxes)

    `output`: set by the parser when the user wants only a subset of the
    outputs, e.g. for `ObjDetector.labels` the parser sets output to 'labels'.

    `output_objs`: populated by the binder. If `output` is None, the binder
    sets output_objs to the list of all output columns of the
    FunctionExpression, e.g. ['labels', 'boxes']; otherwise it contains only
    the requested output columns.

    FunctionExpression also needs to prepend its alias to all the projected
    columns, because other parts of the query might access the results using
    the alias, e.g.

    `SELECT Detector.labels
     FROM Video JOIN LATERAL ObjDetector AS Detector;`
    """

    def __init__(
        self,
        func: Callable,
        name: str,
        output: str = None,
        alias: Alias = None,
        **kwargs,
    ):
        """
        Args:
            func: zero-argument callable that returns the function instance
                to apply (it is invoked lazily in `_gpu_enabled_function`).
            name: function name, used by `signature` and `__str__`.
            output: the single requested output column, if any.
            alias: optional alias whose name is prepended to output columns.
            **kwargs: forwarded to `AbstractExpression.__init__`.
        """
        super().__init__(ExpressionType.FUNCTION_EXPRESSION, **kwargs)
        self._context = Context()
        self._name = name
        self._function = func
        # Created on first evaluation (and possibly moved to a GPU device).
        self._function_instance = None
        self._output = output
        self.alias = alias
        # Catalog metadata; filled in after construction (the class docstring
        # attributes output_objs to the binder — presumably function_obj too).
        self.function_obj: FunctionCatalogEntry = None
        self.output_objs: List[FunctionIOCatalogEntry] = []
        # Columns kept from the function output in `evaluate`.
        self.projection_columns: List[str] = []
        self._cache: "FunctionExpressionCache" = None
        self._stats = FunctionStats()

    @property
    def name(self):
        return self._name

    @property
    def output(self):
        return self._output

    @property
    def col_alias(self):
        """Alias-qualified column names such as `Detector.labels`.

        Returns an empty list when no alias is set.
        """
        if self.alias is None:
            return []
        return [f"{self.alias.alias_name}.{col}" for col in self.alias.col_names]

    @property
    def function(self):
        return self._function

    @function.setter
    def function(self, func: Callable):
        self._function = func

    def enable_cache(self, cache: "FunctionExpressionCache"):
        """Attach a result cache and return `self` (fluent style)."""
        self._cache = cache
        return self

    def has_cache(self):
        """Return True if a cache has been attached via `enable_cache`."""
        return self._cache is not None

    def consolidate_stats(self):
        """Refresh `self._stats.prev_cost` with the observed cost per call.

        The cost is the timer's total elapsed time divided by the number of
        rows that actually reached the function: the cache-miss count when a
        cache is attached and has missed at least once, otherwise the total
        number of rows evaluated. The stored estimate is updated only when it
        differs from the new value by more than 10% of the new value.

        Does nothing when `function_obj` is unset or no rows have been
        evaluated yet (previously the latter raised ZeroDivisionError).
        """
        if self.function_obj is None:
            return

        # With a cache, only cache misses actually invoke the function, so
        # approximate the per-call cost using the miss count.
        if self.has_cache() and self._stats.cache_misses > 0:
            num_invocations = self._stats.cache_misses
        else:
            num_invocations = self._stats.num_calls

        # e.g. only empty batches seen so far: no meaningful cost yet.
        if num_invocations == 0:
            return

        cost_per_func_call = self._stats.timer.total_elapsed_time / num_invocations

        # Avoid churning the estimate on small fluctuations (<= 10%).
        if abs(self._stats.prev_cost - cost_per_func_call) > cost_per_func_call / 10:
            self._stats.prev_cost = cost_per_func_call

    def evaluate(self, batch: Batch, **kwargs) -> Batch:
        """Apply the function to `batch` and return the projected, aliased result.

        Execution time (including any cache lookups) is recorded by the stats
        timer, and the row count of `batch` is added to the call counter.
        """
        func = self._gpu_enabled_function()
        # Record the time taken for the function execution; note the function
        # might be served (partially) from the cache.
        with self._stats.timer:
            outcomes = self._apply_function_expression(func, batch, **kwargs)

            # Project and alias only when there is some output.
            if not outcomes.frames.empty:
                outcomes = outcomes.project(self.projection_columns)
                outcomes.modify_column_alias(self.alias)

        # Counts rows, not distinct function invocations.
        self._stats.num_calls += len(batch)

        # Stats bookkeeping is best-effort: never fail the query because of it.
        try:
            self.consolidate_stats()
        except Exception as e:
            logger.warning(
                "Persisting FunctionExpression %s stats failed with %s", self, e
            )

        return outcomes

    def signature(self) -> str:
        """Construct the signature of the function expression.

        Children (the function arguments) are traversed recursively and their
        signatures joined, producing `function_name[row_id](arg1,arg2,...)`.

        Returns:
            str: signature string
        """
        child_sigs = ",".join(child.signature() for child in self.children)
        return f"{self.name}[{self.function_obj.row_id}]({child_sigs})"

    def _gpu_enabled_function(self):
        """Return the function instance, creating it on first use.

        If the instance is `GPUCompatible` and the execution context reports a
        GPU device, the instance is moved to that device.
        """
        if self._function_instance is None:
            self._function_instance = self.function()
            if isinstance(self._function_instance, GPUCompatible):
                device = self._context.gpu_device()
                if device != NO_GPU:
                    self._function_instance = self._function_instance.to_device(device)
        return self._function_instance

    def _apply_function_expression(self, func: Callable, batch: Batch, **kwargs):
        """Evaluate the children and apply `func`, consulting the cache if any.

        If the cache is not enabled, call `func` on the batch and return.
        If the cache is enabled:
        (1) iterate over the input batch rows and check whether the value is
            in the cache;
        (2) call `func` on all cache-miss rows;
        (3) store the result of each cache-miss row in the cache;
        (4) stitch the cached results together with the new results.
        """
        func_args = Batch.merge_column_wise(
            [child.evaluate(batch, **kwargs) for child in self.children]
        )

        if not self._cache:
            return func_args.apply_function_expression(func)

        output_cols = [obj.name for obj in self.function_obj.outputs]

        # 1. Check the cache. We have to iterate over the batch row by row,
        # stitching columns together into row tuples, which can hurt
        # performance. Is there an alternative approach we can take?
        results = np.full([len(batch), len(output_cols)], None)

        # Cache keys can differ from func_args (see optimize_cache_key).
        cache_keys = func_args
        if self._cache.key:
            cache_keys = Batch.merge_column_wise(
                [child.evaluate(batch, **kwargs) for child in self._cache.key]
            )
            assert len(cache_keys) == len(batch), "Not all rows have the cache key"

        cache_miss = np.full(len(batch), True)
        for idx, (_, key) in enumerate(cache_keys.iterrows()):
            val = self._cache.store.get(key.to_numpy())
            results[idx] = val
            cache_miss[idx] = val is None

        # Log the cache misses (kept as a plain int).
        self._stats.cache_misses += int(cache_miss.sum())

        if cache_miss.any():
            # 2. Call func only for the cache-miss rows.
            func_args = func_args[list(cache_miss)]
            cache_miss_results = func_args.apply_function_expression(func)

            # 3. Populate the cache with the new results.
            missing_keys = cache_keys[list(cache_miss)]
            for (_, key), (_, value) in zip(
                missing_keys.iterrows(), cache_miss_results.iterrows()
            ):
                self._cache.store.set(key.to_numpy(), value.to_numpy())

            # 4. Merge the new results into the cached ones.
            results[cache_miss] = cache_miss_results.to_numpy()

        # 5. Return the stitched batch.
        return Batch(pd.DataFrame(results, columns=output_cols))

    def __str__(self) -> str:
        args = ",".join(str(child) for child in self.children)
        return f"{self.name}({args})"

    def __eq__(self, other):
        if not isinstance(other, FunctionExpression):
            return False
        return (
            super().__eq__(other)
            and self.name == other.name
            and self.output == other.output
            and self.alias == other.alias
            and self.function == other.function
            and self.output_objs == other.output_objs
            and self._cache == other._cache
        )

    def __hash__(self) -> int:
        return hash(
            (
                super().__hash__(),
                self.name,
                self.output,
                self.alias,
                self.function,
                tuple(self.output_objs),
                self._cache,
            )
        )
265

266

UNCOV
267
@dataclass(frozen=True)
class FunctionExpressionCache:
    """Immutable bundle of cache-related attributes for a FunctionExpression.

    Args:
        key (`Tuple[AbstractExpression, ...]`): expressions evaluated to
            produce the cache key for each row. If empty or `None`, the
            function arguments themselves are used as the key. This is useful
            when the system wants to key on logically equivalent columns
            (e.g., frame number instead of frame data).
        store (`DiskKVCache`): the cache object used to get/set key-value
            pairs. Defaults to `None`.
    """

    # Any number of key expressions, not a single-element tuple.
    key: Tuple[AbstractExpression, ...]
    store: DiskKVCache = None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc