• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / b7e09b55-9333-4c49-b273-87a69e5c463f

05 Sep 2023 11:42PM UTC coverage: 74.515% (-19.0%) from 93.55%
b7e09b55-9333-4c49-b273-87a69e5c463f

Pull #1050

circle-ci

jiashenC
fix: add missing needed file (#1046)
Pull Request #1050: feat: sync master staging

768 of 768 new or added lines in 96 files covered. (100.0%)

8757 of 11752 relevant lines covered (74.51%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.05
/evadb/executor/create_function_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import os
1✔
16
from pathlib import Path
1✔
17
from typing import Dict, List
1✔
18

19
import pandas as pd
1✔
20

21
from evadb.catalog.catalog_utils import get_metadata_properties
1✔
22
from evadb.catalog.models.function_catalog import FunctionCatalogEntry
1✔
23
from evadb.catalog.models.function_io_catalog import FunctionIOCatalogEntry
1✔
24
from evadb.catalog.models.function_metadata_catalog import FunctionMetadataCatalogEntry
1✔
25
from evadb.configuration.constants import (
1✔
26
    DEFAULT_TRAIN_TIME_LIMIT,
27
    EvaDB_INSTALLATION_DIR,
28
)
29
from evadb.database import EvaDBDatabase
1✔
30
from evadb.executor.abstract_executor import AbstractExecutor
1✔
31
from evadb.functions.decorators.utils import load_io_from_function_decorators
1✔
32
from evadb.models.storage.batch import Batch
1✔
33
from evadb.plan_nodes.create_function_plan import CreateFunctionPlan
1✔
34
from evadb.third_party.huggingface.create import gen_hf_io_catalog_entries
1✔
35
from evadb.utils.errors import FunctionIODefinitionError
1✔
36
from evadb.utils.generic_utils import (
1✔
37
    load_function_class_from_file,
38
    try_to_import_ludwig,
39
    try_to_import_torch,
40
    try_to_import_ultralytics,
41
)
42
from evadb.utils.logging_manager import logger
1✔
43

44

45
class CreateFunctionExecutor(AbstractExecutor):
    """Executor for the CREATE FUNCTION statement.

    Depending on ``node.function_type`` it delegates to one of the
    ``handle_*_function`` methods, each of which returns the tuple
    ``(name, impl_path, function_type, io_list, metadata)``; that tuple is then
    inserted into the catalog.
    """

    def __init__(self, db: EvaDBDatabase, node: CreateFunctionPlan):
        super().__init__(db, node)
        # Directory of function implementation files under the EvaDB install dir.
        self.function_dir = Path(EvaDB_INSTALLATION_DIR) / "functions"

    def handle_huggingface_function(self):
        """Handle HuggingFace Functions

        HuggingFace Functions are special Functions that are not loaded from a
        file, so the setup method is not called on them as it is for other
        Functions. The implementation path is fixed to the abstract HuggingFace
        function file and the IO definitions are generated from the metadata.

        Returns:
            Tuple of (name, impl_path, function_type, io_list, metadata).
        """
        # We need at least one deep learning framework for HuggingFace
        # Torch or Tensorflow
        try_to_import_torch()
        # NOTE(review): unlike the other handlers this path is not passed
        # through Path.absolute().as_posix(); kept as-is to preserve behavior.
        impl_path = f"{self.function_dir}/abstract/hf_abstract_function.py"
        io_list = gen_hf_io_catalog_entries(self.node.name, self.node.metadata)
        return (
            self.node.name,
            impl_path,
            self.node.function_type,
            io_list,
            self.node.metadata,
        )

    def handle_ludwig_function(self):
        """Handle ludwig Functions

        Use ludwig's auto_train engine to train/tune models on the data
        produced by the single child executor, save the best model under the
        configured model directory, and record its location in the metadata.

        Returns:
            Tuple of (name, impl_path, function_type, io_list, metadata).

        Raises:
            AssertionError: If the executor does not have exactly one child.
            KeyError: If the metadata has no "predict" entry.
        """
        try_to_import_ludwig()
        # Imported lazily: ludwig is an optional dependency.
        from ludwig.automl import auto_train

        assert (
            len(self.children) == 1
        ), f"Create ludwig function expects 1 child, finds {len(self.children)}."

        # Materialize every batch from the child and merge them into one
        # training dataset.
        training_batch = Batch.concat(list(self.children[0].exec()), copy=False)
        training_batch.drop_column_alias()

        arg_map = {arg.key: arg.value for arg in self.node.metadata}
        auto_train_results = auto_train(
            dataset=training_batch.frames,
            target=arg_map["predict"],
            tune_for_memory=arg_map.get("tune_for_memory", False),
            time_limit_s=arg_map.get("time_limit", DEFAULT_TRAIN_TIME_LIMIT),
            output_directory=self.db.config.get_value("storage", "tmp_dir"),
        )
        model_path = os.path.join(
            self.db.config.get_value("storage", "model_dir"), self.node.name
        )
        auto_train_results.best_model.save(model_path)
        # Persist the model location alongside the user-supplied metadata.
        self.node.metadata.append(
            FunctionMetadataCatalogEntry("model_path", model_path)
        )

        impl_path = (self.function_dir / "ludwig.py").absolute().as_posix()
        # No function object is loaded for ludwig, hence None.
        io_list = self._resolve_function_io(None)
        return (
            self.node.name,
            impl_path,
            self.node.function_type,
            io_list,
            self.node.metadata,
        )

    def handle_ultralytics_function(self):
        """Handle Ultralytics Functions

        Initializes the YOLO object detector implementation with the metadata
        properties of the plan node and resolves its IO definitions.

        Returns:
            Tuple of (name, impl_path, function_type, io_list, metadata).
        """
        try_to_import_ultralytics()

        impl_path = (
            (self.function_dir / "yolo_object_detector.py").absolute().as_posix()
        )
        function = self._try_initializing_function(
            impl_path, function_args=get_metadata_properties(self.node)
        )
        io_list = self._resolve_function_io(function)
        return (
            self.node.name,
            impl_path,
            self.node.function_type,
            io_list,
            self.node.metadata,
        )

    def handle_generic_function(self):
        """Handle generic Functions

        Generic Functions are loaded from a file. We check for inputs passed by
        the user during CREATE or try to load io from decorators.

        Returns:
            Tuple of (name, impl_path, function_type, io_list, metadata).
        """
        impl_path = self.node.impl_path.absolute().as_posix()
        function = self._try_initializing_function(impl_path)
        io_list = self._resolve_function_io(function)

        return (
            self.node.name,
            impl_path,
            self.node.function_type,
            io_list,
            self.node.metadata,
        )

    def exec(self, *args, **kwargs):
        """Create function executor

        Calls the catalog to insert a function catalog entry.

        Yields:
            Batch: A single-message batch reporting either that the function
            was added or, with IF NOT EXISTS, that it already existed.

        Raises:
            RuntimeError: If the function already exists and IF NOT EXISTS was
            not specified.
        """
        function_name = self.node.name

        # check catalog if it already has this function entry
        if self.catalog().get_function_catalog_entry_by_name(function_name):
            if self.node.if_not_exists:
                msg = f"Function {function_name} already exists, nothing added."
                yield Batch(pd.DataFrame([msg]))
                return
            msg = f"Function {function_name} already exists."
            logger.error(msg)
            raise RuntimeError(msg)

        # Function types with dedicated handling; anything else is treated as
        # a generic, file-backed function.
        handlers = {
            "HuggingFace": self.handle_huggingface_function,
            "ultralytics": self.handle_ultralytics_function,
            "Ludwig": self.handle_ludwig_function,
        }
        handler = handlers.get(self.node.function_type, self.handle_generic_function)
        name, impl_path, function_type, io_list, metadata = handler()

        self.catalog().insert_function_catalog_entry(
            name, impl_path, function_type, io_list, metadata
        )
        yield Batch(
            pd.DataFrame(
                [f"Function {self.node.name} successfully added to the database."]
            )
        )

    def _try_initializing_function(
        self, impl_path: str, function_args: Dict = None
    ) -> FunctionCatalogEntry:
        """Attempts to initialize function given the implementation file path and arguments.

        Args:
            impl_path (str): The file path of the function implementation file.
            function_args (Dict, optional): Dictionary of keyword arguments to
                pass to the Function. Defaults to None, meaning no arguments.

        Returns:
            The object returned by ``load_function_class_from_file`` (the loaded
            function class; the instance created here is discarded).
            NOTE(review): the return annotation says FunctionCatalogEntry, which
            does not appear to match what is actually returned - confirm.

        Raises:
            RuntimeError: If an error occurs while loading or initializing the
            Function. The original exception is chained as the cause.
        """
        # A None default avoids sharing one mutable dict across calls.
        if function_args is None:
            function_args = {}

        try:
            # loading the function class from the file
            function = load_function_class_from_file(impl_path, self.node.name)
            # initializing the function class calls the setup method internally
            function(**function_args)
        except Exception as e:
            err_msg = f"Error creating Function: {e}"
            raise RuntimeError(err_msg) from e

        return function

    def _resolve_function_io(
        self, function: FunctionCatalogEntry
    ) -> List[FunctionIOCatalogEntry]:
        """Private method that resolves the input/output definitions for a given Function.

        Definitions given in the CREATE statement take precedence. When the
        statement has none for inputs (or outputs), they are loaded from the
        function's decorators instead.

        Args:
            function: The function for which to resolve input and output
                definitions; only used when falling back to decorators.

        Returns:
            A List of FunctionIOCatalogEntry objects that represent the resolved
            input definitions followed by the resolved output definitions.

        Raises:
            RuntimeError: If the decorator-based input/output definitions are
            incorrect. The original exception is chained as the cause.
        """
        io_list = []
        try:
            # the inputs from CREATE statement take precedence over decorators
            io_list.extend(
                self.node.inputs
                or load_io_from_function_decorators(function, is_input=True)
            )
            # the outputs from CREATE statement take precedence over decorators
            io_list.extend(
                self.node.outputs
                or load_io_from_function_decorators(function, is_input=False)
            )
        except FunctionIODefinitionError as e:
            err_msg = (
                f"Error creating Function, input/output definition incorrect: {e}"
            )
            logger.error(err_msg)
            raise RuntimeError(err_msg) from e

        return io_list
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc