• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / e4f99d26-a857-4f78-b9e8-041e4254b9e1

18 Aug 2023 05:49AM UTC coverage: 94.434% (-0.8%) from 95.277%
e4f99d26-a857-4f78-b9e8-041e4254b9e1

Pull #935

circle-ci

xzdandy
remove print
Pull Request #935: Ludwig-based model train and tune support.

92 of 92 new or added lines in 10 files covered. (100.0%)

10264 of 10869 relevant lines covered (94.43%)

1.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.78
/evadb/executor/create_udf_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import os
2✔
16
from pathlib import Path
2✔
17
from typing import Dict, List, Optional
2✔
18

19
import pandas as pd
2✔
20

21
from evadb.catalog.catalog_utils import get_metadata_properties
2✔
22
from evadb.catalog.models.udf_catalog import UdfCatalogEntry
2✔
23
from evadb.catalog.models.udf_io_catalog import UdfIOCatalogEntry
2✔
24
from evadb.catalog.models.udf_metadata_catalog import UdfMetadataCatalogEntry
2✔
25
from evadb.configuration.constants import EvaDB_INSTALLATION_DIR
2✔
26
from evadb.database import EvaDBDatabase
2✔
27
from evadb.executor.abstract_executor import AbstractExecutor
2✔
28
from evadb.models.storage.batch import Batch
2✔
29
from evadb.plan_nodes.create_udf_plan import CreateUDFPlan
2✔
30
from evadb.third_party.huggingface.create import gen_hf_io_catalog_entries
2✔
31
from evadb.udfs.decorators.utils import load_io_from_udf_decorators
2✔
32
from evadb.utils.errors import UDFIODefinitionError
2✔
33
from evadb.utils.generic_utils import (
2✔
34
    load_udf_class_from_file,
35
    try_to_import_ludwig,
36
    try_to_import_torch,
37
    try_to_import_ultralytics,
38
)
39
from evadb.utils.logging_manager import logger
2✔
40

41

42
class CreateUDFExecutor(AbstractExecutor):
    """Executor for ``CREATE UDF`` plans.

    Resolves the implementation path, input/output definitions and metadata of
    the UDF described by the ``CreateUDFPlan`` node, then inserts a UDF entry
    into the catalog.
    """

    def __init__(self, db: EvaDBDatabase, node: CreateUDFPlan):
        super().__init__(db, node)
        # Directory holding EvaDB's built-in UDF implementation files.
        self.udf_dir = Path(EvaDB_INSTALLATION_DIR) / "udfs"

    def handle_huggingface_udf(self):
        """Handle HuggingFace UDFs.

        HuggingFace UDFs are special UDFs that are not loaded from a user file,
        so we do not call the setup method on them like we do for other UDFs.

        Returns:
            tuple: ``(name, impl_path, udf_type, io_list, metadata)`` for
            ``insert_udf_catalog_entry``.
        """
        # We need at least one deep learning framework for HuggingFace
        # (Torch or TensorFlow); only Torch is checked here.
        try_to_import_torch()
        impl_path = f"{self.udf_dir}/abstract/hf_abstract_udf.py"
        io_list = gen_hf_io_catalog_entries(self.node.name, self.node.metadata)
        return (
            self.node.name,
            impl_path,
            self.node.udf_type,
            io_list,
            self.node.metadata,
        )

    def handle_ludwig_udf(self):
        """Handle Ludwig UDFs.

        Uses Ludwig's ``auto_train`` to train/tune a model on the rows produced
        by this executor's single child. The best model is saved to
        ``<storage.model_dir>/<udf name>``, and that path is appended to the
        node's metadata under the key ``model_path``.

        Returns:
            tuple: ``(name, impl_path, udf_type, io_list, metadata)`` for
            ``insert_udf_catalog_entry``; ``io_list`` is currently ``None``.

        Raises:
            RuntimeError: if the plan does not have exactly one child, or if the
                ``predict`` metadata entry is missing.
        """
        try_to_import_ludwig()
        from evadb.configuration.constants import DEFAULT_TRAIN_TIME_LIMIT
        from ludwig.automl import auto_train

        # Validate explicitly (not with ``assert``) so the checks survive
        # ``python -O``, and before running the child plan so bad input fails
        # fast without executing the training query.
        num_children = len(self.children)
        if num_children != 1:
            err_msg = "Create ludwig UDF expects 1 child, finds {}.".format(
                num_children
            )
            logger.error(err_msg)
            raise RuntimeError(err_msg)

        arg_map = {arg.key: arg.value for arg in self.node.metadata}
        if "predict" not in arg_map:
            err_msg = "Create ludwig UDF expects 'predict' metadata."
            logger.error(err_msg)
            raise RuntimeError(err_msg)

        # Materialize the child's full output into a single Batch for training.
        aggregated_batch = Batch.concat(list(self.children[0].exec()), copy=False)
        aggregated_batch.drop_column_alias()

        auto_train_results = auto_train(
            dataset=aggregated_batch.frames,
            target=arg_map["predict"],
            # NOTE(review): metadata values may arrive as strings from the
            # parser; confirm how "tune_for_memory"/"time_limit" are typed.
            tune_for_memory=arg_map.get("tune_for_memory", False),
            time_limit_s=arg_map.get("time_limit", DEFAULT_TRAIN_TIME_LIMIT),
            output_directory=self.db.config.get_value("storage", "tmp_dir"),
        )
        model_path = os.path.join(
            self.db.config.get_value("storage", "model_dir"), self.node.name
        )
        auto_train_results.best_model.save(model_path)
        # Record the saved model's location; this mutates the plan node's
        # metadata list, which is also what gets stored in the catalog.
        self.node.metadata.append(UdfMetadataCatalogEntry("model_path", model_path))

        impl_path = Path(f"{self.udf_dir}/ludwig.py").absolute().as_posix()
        # TODO figure out the io
        io_list = None
        return (
            self.node.name,
            impl_path,
            self.node.udf_type,
            io_list,
            self.node.metadata,
        )

    def handle_ultralytics_udf(self):
        """Handle Ultralytics UDFs.

        Loads the built-in ``yolo_object_detector.py`` implementation and
        instantiates it once with ``get_metadata_properties(self.node)`` as
        keyword arguments to validate it. Then resolves its inputs/outputs.

        Returns:
            tuple: ``(name, impl_path, udf_type, io_list, metadata)`` for
            ``insert_udf_catalog_entry``.
        """
        try_to_import_ultralytics()

        impl_path = (
            Path(f"{self.udf_dir}/yolo_object_detector.py").absolute().as_posix()
        )
        udf = self._try_initializing_udf(
            impl_path, udf_args=get_metadata_properties(self.node)
        )
        io_list = self._resolve_udf_io(udf)
        return (
            self.node.name,
            impl_path,
            self.node.udf_type,
            io_list,
            self.node.metadata,
        )

    def handle_generic_udf(self):
        """Handle generic UDFs.

        Generic UDFs are loaded from the user-supplied implementation file. The
        inputs/outputs given in the CREATE statement are used if present;
        otherwise they are loaded from the UDF's decorators.

        Returns:
            tuple: ``(name, impl_path, udf_type, io_list, metadata)`` for
            ``insert_udf_catalog_entry``.
        """
        impl_path = self.node.impl_path.absolute().as_posix()
        udf = self._try_initializing_udf(impl_path)
        io_list = self._resolve_udf_io(udf)

        return (
            self.node.name,
            impl_path,
            self.node.udf_type,
            io_list,
            self.node.metadata,
        )

    def exec(self, *args, **kwargs):
        """Create the UDF and insert its entry into the catalog.

        Yields:
            Batch: a single-row batch containing a status message.

        Raises:
            RuntimeError: if a UDF with this name already exists and
                ``IF NOT EXISTS`` was not given, or if creating the UDF fails.
        """
        # Check whether the catalog already has an entry for this UDF.
        if self.catalog().get_udf_catalog_entry_by_name(self.node.name):
            if self.node.if_not_exists:
                msg = f"UDF {self.node.name} already exists, nothing added."
                yield Batch(pd.DataFrame([msg]))
                return
            msg = f"UDF {self.node.name} already exists."
            logger.error(msg)
            raise RuntimeError(msg)

        # Special UDF types get dedicated handlers (which may override the
        # impl_path); everything else is loaded from the user's file.
        handlers = {
            "HuggingFace": self.handle_huggingface_udf,
            "ultralytics": self.handle_ultralytics_udf,
            "ludwig": self.handle_ludwig_udf,
        }
        handler = handlers.get(self.node.udf_type, self.handle_generic_udf)
        name, impl_path, udf_type, io_list, metadata = handler()

        self.catalog().insert_udf_catalog_entry(
            name, impl_path, udf_type, io_list, metadata
        )
        yield Batch(
            pd.DataFrame([f"UDF {self.node.name} successfully added to the database."])
        )

    def _try_initializing_udf(
        self, impl_path: str, udf_args: Optional[Dict] = None
    ) -> type:
        """Load the UDF class from ``impl_path`` and instantiate it once to validate it.

        Args:
            impl_path (str): The file path of the UDF implementation file.
            udf_args (Dict, optional): Keyword arguments passed to the UDF
                constructor. Defaults to no arguments.

        Returns:
            type: The UDF class returned by ``load_udf_class_from_file`` (not an
            instance). The trial instance created here is discarded.

        Raises:
            RuntimeError: If loading or initializing the UDF fails. The original
                exception is chained as ``__cause__``.
        """
        if udf_args is None:
            udf_args = {}
        try:
            # Load the udf class from the file.
            udf = load_udf_class_from_file(impl_path, self.node.name)
            # Initializing the udf class calls the setup method internally;
            # doing it here surfaces setup errors at CREATE time.
            udf(**udf_args)
        except Exception as e:
            err_msg = f"Error creating UDF: {str(e)}"
            raise RuntimeError(err_msg) from e

        return udf

    def _resolve_udf_io(self, udf: type) -> List[UdfIOCatalogEntry]:
        """Resolve the input/output definitions for a given UDF.

        Inputs and outputs from the CREATE statement take precedence. Whichever
        side is missing is loaded from the UDF's decorators.

        Args:
            udf (type): The UDF class, as returned by ``_try_initializing_udf``.

        Returns:
            List[UdfIOCatalogEntry]: The input entries followed by the output
            entries.

        Raises:
            RuntimeError: If the decorators' input/output definitions are
                invalid. The original ``UDFIODefinitionError`` is chained as
                ``__cause__``.
        """
        io_list = []
        try:
            if self.node.inputs:
                io_list.extend(self.node.inputs)
            else:
                io_list.extend(load_io_from_udf_decorators(udf, is_input=True))

            if self.node.outputs:
                io_list.extend(self.node.outputs)
            else:
                io_list.extend(load_io_from_udf_decorators(udf, is_input=False))

        except UDFIODefinitionError as e:
            err_msg = f"Error creating UDF, input/output definition incorrect: {str(e)}"
            logger.error(err_msg)
            raise RuntimeError(err_msg) from e

        return io_list
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc