• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / 1b281547-b06e-4454-874a-06ef3843dfed

pending completion
1b281547-b06e-4454-874a-06ef3843dfed

Pull #717

circle-ci

jarulraj
updates
Pull Request #717: ci: updates to ci pipeline

10 of 10 new or added lines in 2 files covered. (100.0%)

9243 of 9431 relevant lines covered (98.01%)

0.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.63
/eva/executor/create_udf_executor.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import os
1✔
16
from pathlib import Path
1✔
17
from typing import Dict, List
1✔
18

19
import pandas as pd
1✔
20

21
from eva.catalog.catalog_manager import CatalogManager
1✔
22
from eva.catalog.catalog_utils import get_metadata_properties
1✔
23
from eva.catalog.models.udf_catalog import UdfCatalogEntry
1✔
24
from eva.catalog.models.udf_io_catalog import UdfIOCatalogEntry
1✔
25
from eva.configuration.constants import EVA_DEFAULT_DIR
1✔
26
from eva.executor.abstract_executor import AbstractExecutor
1✔
27
from eva.models.storage.batch import Batch
1✔
28
from eva.plan_nodes.create_udf_plan import CreateUDFPlan
1✔
29
from eva.third_party.huggingface.create import gen_hf_io_catalog_entries
1✔
30
from eva.udfs.decorators.utils import load_io_from_udf_decorators
1✔
31
from eva.utils.errors import UDFIODefinitionError
1✔
32
from eva.utils.generic_utils import load_udf_class_from_file
1✔
33
from eva.utils.logging_manager import logger
1✔
34

35

36
class CreateUDFExecutor(AbstractExecutor):
1✔
37
    def __init__(self, node: CreateUDFPlan):
1✔
38
        super().__init__(node)
1✔
39

40
    def handle_huggingface_udf(self):
1✔
41
        """Handle HuggingFace UDFs
42

43
        HuggingFace UDFs are special UDFs that are not loaded from a file.
44
        So we do not need to call the setup method on them like we do for other UDFs.
45
        """
46
        impl_path = f"{EVA_DEFAULT_DIR}/udfs/abstract/hf_abstract_udf.py"
1✔
47
        io_list = gen_hf_io_catalog_entries(self.node.name, self.node.metadata)
1✔
48
        return (
1✔
49
            self.node.name,
50
            impl_path,
51
            self.node.udf_type,
52
            io_list,
53
            self.node.metadata,
54
        )
55

56
    def suffix_pytest_xdist_worker_id_to_dir(self, path: Path):
1✔
57
        try:
1✔
58
            worker_id = os.environ["PYTEST_XDIST_WORKER"]
1✔
59
            path = path / str(worker_id)
×
60
        except KeyError:
61
            pass
62
        return path
1✔
63

64
    def handle_ultralytics_udf(self):
1✔
65
        """Handle Ultralytics UDFs"""
66
        initial_eva_config_dir = Path(EVA_DEFAULT_DIR)
1✔
67
        updated_eva_config_dir = self.suffix_pytest_xdist_worker_id_to_dir(
1✔
68
            initial_eva_config_dir
69
        )
70
        impl_path = (
1✔
71
            Path(f"{updated_eva_config_dir}/udfs/yolo_object_detector.py")
72
            .absolute()
73
            .as_posix()
74
        )
75
        udf = self._try_initializing_udf(
1✔
76
            impl_path, udf_args=get_metadata_properties(self.node)
77
        )
78
        io_list = self._resolve_udf_io(udf)
1✔
79
        return (
1✔
80
            self.node.name,
81
            impl_path,
82
            self.node.udf_type,
83
            io_list,
84
            self.node.metadata,
85
        )
86

87
    def handle_generic_udf(self):
1✔
88
        """Handle generic UDFs
89

90
        Generic UDFs are loaded from a file. We check for inputs passed by the user during CREATE or try to load io from decorators.
91
        """
92
        impl_path = self.node.impl_path.absolute().as_posix()
1✔
93
        udf = self._try_initializing_udf(impl_path)
1✔
94
        io_list = self._resolve_udf_io(udf)
1✔
95

96
        return (
1✔
97
            self.node.name,
98
            impl_path,
99
            self.node.udf_type,
100
            io_list,
101
            self.node.metadata,
102
        )
103

104
    def exec(self, *args, **kwargs):
1✔
105
        """Create udf executor
106

107
        Calls the catalog to insert a udf catalog entry.
108
        """
109
        catalog_manager = CatalogManager()
1✔
110
        # check catalog if it already has this udf entry
111
        if catalog_manager.get_udf_catalog_entry_by_name(self.node.name):
1✔
112
            if self.node.if_not_exists:
1✔
113
                msg = f"UDF {self.node.name} already exists, nothing added."
1✔
114
                logger.warn(msg)
1✔
115
                yield Batch(pd.DataFrame([msg]))
1✔
116
                return
1✔
117
            else:
118
                msg = f"UDF {self.node.name} already exists."
1✔
119
                logger.error(msg)
1✔
120
                raise RuntimeError(msg)
121

122
        # if it's a type of HuggingFaceModel, override the impl_path
123
        if self.node.udf_type == "HuggingFace":
1✔
124
            name, impl_path, udf_type, io_list, metadata = self.handle_huggingface_udf()
1✔
125
        elif self.node.udf_type == "ultralytics":
1✔
126
            name, impl_path, udf_type, io_list, metadata = self.handle_ultralytics_udf()
1✔
127
        else:
128
            name, impl_path, udf_type, io_list, metadata = self.handle_generic_udf()
1✔
129

130
        catalog_manager.insert_udf_catalog_entry(
1✔
131
            name, impl_path, udf_type, io_list, metadata
132
        )
133
        yield Batch(
1✔
134
            pd.DataFrame([f"UDF {self.node.name} successfully added to the database."])
135
        )
136

137
    def _try_initializing_udf(
1✔
138
        self, impl_path: str, udf_args: Dict = {}
139
    ) -> UdfCatalogEntry:
140
        """Attempts to initialize UDF given the implementation file path and arguments.
141

142
        Args:
143
            impl_path (str): The file path of the UDF implementation file.
144
            udf_args (Dict, optional): Dictionary of arguments to pass to the UDF.
145
            Defaults to {}.
146

147
        Returns:
148
            UdfCatalogEntry: A UdfCatalogEntry object that represents the initialized
149
            UDF.
150

151
        Raises:
152
            RuntimeError: If an error occurs while initializing the UDF.
153
        """
154

155
        # load the udf class from the file
156
        try:
1✔
157
            # loading the udf class from the file
158
            udf = load_udf_class_from_file(impl_path, self.node.name)
1✔
159
            # initializing the udf class calls the setup method internally
160
            udf(**udf_args)
1✔
161
        except Exception as e:
162
            err_msg = f"Error creating UDF: {str(e)}"
163
            logger.error(err_msg)
164
            raise RuntimeError(err_msg)
165

166
        return udf
1✔
167

168
    def _resolve_udf_io(self, udf: UdfCatalogEntry) -> List[UdfIOCatalogEntry]:
1✔
169
        """Private method that resolves the input/output definitions for a given UDF.
170
        It first searches for the input/outputs in the CREATE statement. If not found, it resolves them using decorators. If not found there as well, it raises an error.
171

172
        Args:
173
            udf (UdfCatalogEntry): The UDF for which to resolve input and output definitions.
174

175
        Returns:
176
            A List of UdfIOCatalogEntry objects that represent the resolved input and
177
            output definitions for the UDF.
178

179
        Raises:
180
            RuntimeError: If an error occurs while resolving the UDF input/output
181
            definitions.
182
        """
183
        io_list = []
1✔
184
        try:
1✔
185
            if self.node.inputs:
1✔
186
                io_list.extend(self.node.inputs)
1✔
187
            else:
188
                # try to load the inputs from decorators, the inputs from CREATE statement take precedence
189
                io_list.extend(load_io_from_udf_decorators(udf, is_input=True))
1✔
190

191
            if self.node.outputs:
1✔
192
                io_list.extend(self.node.outputs)
1✔
193
            else:
194
                # try to load the outputs from decorators, the outputs from CREATE statement take precedence
195
                io_list.extend(load_io_from_udf_decorators(udf, is_input=False))
1✔
196

197
        except UDFIODefinitionError as e:
198
            err_msg = f"Error creating UDF, input/output definition incorrect: {str(e)}"
199
            logger.error(err_msg)
200
            raise RuntimeError(err_msg)
201

202
        return io_list
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc