• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / cf748690-046a-4f45-983b-b5ca63416eda

pending completion
cf748690-046a-4f45-983b-b5ca63416eda

Pull #582

circle-ci

jarulraj
checkpoint
Pull Request #582: server: asyncio refactoring

150 of 150 new or added lines in 7 files covered. (100.0%)

7887 of 8618 relevant lines covered (91.52%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.51
/eva/binder/statement_binder.py
1
# coding=utf-8
2
# Copyright 2018-2022 EVA
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import sys
1✔
16

17
from eva.binder.binder_utils import (
1✔
18
    BinderError,
19
    bind_table_info,
20
    check_groupby_pattern,
21
    check_table_object_is_video,
22
    extend_star,
23
)
24
from eva.binder.statement_binder_context import StatementBinderContext
1✔
25
from eva.catalog.catalog_manager import CatalogManager
1✔
26
from eva.catalog.catalog_type import IndexType, NdArrayType, TableType
1✔
27
from eva.expression.abstract_expression import AbstractExpression
1✔
28
from eva.expression.function_expression import FunctionExpression
1✔
29
from eva.expression.tuple_value_expression import TupleValueExpression
1✔
30
from eva.parser.alias import Alias
1✔
31
from eva.parser.create_index_statement import CreateIndexStatement
1✔
32
from eva.parser.create_mat_view_statement import CreateMaterializedViewStatement
1✔
33
from eva.parser.explain_statement import ExplainStatement
1✔
34
from eva.parser.rename_statement import RenameTableStatement
1✔
35
from eva.parser.select_statement import SelectStatement
1✔
36
from eva.parser.statement import AbstractStatement
1✔
37
from eva.parser.table_ref import TableRef
1✔
38
from eva.utils.generic_utils import path_to_class
1✔
39
from eva.utils.logging_manager import logger
1✔
40

41
# functools.singledispatchmethod is only imported from the standard library on
# Python 3.8+; older interpreters get a small stand-in built on singledispatch.
if sys.version_info < (3, 8):
    # https://stackoverflow.com/questions/24601722/how-can-i-use-functools-singledispatch-with-instance-methods
    from functools import singledispatch, update_wrapper

    def singledispatchmethod(func):
        """Return a method wrapper that dispatches on its first non-self argument.

        Args:
            func: The default implementation, used when no registered type
                matches.

        Returns:
            A callable exposing ``register`` (taken from the underlying
            ``singledispatch`` dispatcher) and carrying ``func``'s metadata.
        """
        dispatcher = singledispatch(func)

        def wrapper(*args, **kw):
            # args[0] is the instance; the dispatch key is the class of args[1].
            implementation = dispatcher.dispatch(args[1].__class__)
            return implementation(*args, **kw)

        wrapper.register = dispatcher.register
        update_wrapper(wrapper, func)
        return wrapper

else:
    from functools import singledispatchmethod
56

57

58
class StatementBinder:
1✔
59
    def __init__(self, binder_context: StatementBinderContext):
        """Create a binder that works against ``binder_context``.

        Args:
            binder_context: Context the ``bind`` handlers register aliases in
                and resolve columns from.
        """
        self._catalog = CatalogManager()
        self._binder_context = binder_context
62

63
    @singledispatchmethod
    def bind(self, node):
        """Bind ``node``; handlers are selected by the type of ``node``.

        This body is the fallback that runs when no handler has been
        registered for the node's type.

        Raises:
            NotImplementedError: Always, naming the unsupported type.
        """
        node_type = type(node)
        raise NotImplementedError(f"Cannot bind {node_type}")
66

67
    @bind.register(AbstractStatement)
    def _bind_abstract_statement(self, node: AbstractStatement):
        """Accept a statement with no more specific handler; nothing is bound."""
        return None
70

71
    @bind.register(AbstractExpression)
    def _bind_abstract_expr(self, node: AbstractExpression):
        """Bind each child of an expression that has no more specific handler."""
        for child_expr in node.children:
            self.bind(child_expr)
75

76
    @bind.register(ExplainStatement)
    def _bind_explain_statement(self, node: ExplainStatement):
        """Bind the statement wrapped by an EXPLAIN statement."""
        wrapped_stmt = node.explainable_stmt
        self.bind(wrapped_stmt)
79

80
    @bind.register(CreateIndexStatement)
    def _bind_create_index_statement(self, node: CreateIndexStatement):
        """Bind a CREATE INDEX statement and validate the data being indexed.

        Binds the table reference and, when present, the UDF expression. The
        index input is either the single listed column (no UDF) or every
        output of the UDF; each input must be a 2-dimensional float32 array.

        Raises:
            AssertionError: If more than one column is listed or the table
                reference is not a table atom.
            BinderError: If ``IndexType.is_faiss_index_type`` rejects the
                index type, the listed column is not found among the table's
                columns, or an index input is not 2-dimensional float32.
        """
        self.bind(node.table_ref)
        if node.udf_func:
            self.bind(node.udf_func)

        # TODO: create index currently only supports single numpy column.
        assert len(node.col_list) == 1, "Index cannot be created on more than 1 column"

        # TODO: create index currently only works on TableInfo, but will extend later.
        assert node.table_ref.is_table_atom(), "Index can only be created on Tableinfo"

        if not IndexType.is_faiss_index_type(node.index_type):
            raise BinderError("Index type {} is not supported.".format(node.index_type))

        if node.udf_func:
            # Output of the UDF should be 2 dimension and float32 type.
            catalog_manager = CatalogManager()
            udf_obj = catalog_manager.get_udf_catalog_entry_by_name(
                node.udf_func.name
            )
            index_inputs = udf_obj.outputs
        else:
            # Feature table type needs to be float32 numpy array.
            col_name = node.col_list[0].name
            table_columns = node.table_ref.table.table_obj.columns
            column = next(
                (col for col in table_columns if col.name == col_name), None
            )
            if column is None:
                # Report an unknown column as a binder error rather than
                # letting an IndexError escape from a list lookup.
                raise BinderError(
                    "Column {} does not exist in the table.".format(col_name)
                )
            index_inputs = [column]

        for index_input in index_inputs:
            if index_input.array_type != NdArrayType.FLOAT32:
                raise BinderError("Index input needs to be float32.")
            if len(index_input.array_dimensions) != 2:
                raise BinderError("Index input needs to be 2 dimensional.")
117

118
    @bind.register(SelectStatement)
    def _bind_select_statement(self, node: SelectStatement):
        """Bind the clauses of a SELECT statement.

        Clauses are bound in this order: FROM, WHERE, target list, GROUP BY,
        ORDER BY, then the linked UNION statement. A target list consisting
        of a single ``*`` column is first replaced by the result of
        ``extend_star``. The UNION branch is bound in a fresh
        ``StatementBinderContext``; the current context is put back
        afterwards, even when binding the branch raises.
        """
        self.bind(node.from_table)
        if node.where_clause:
            self.bind(node.where_clause)
        if node.target_list:
            # SELECT * support
            targets = node.target_list
            is_select_star = (
                len(targets) == 1
                and isinstance(targets[0], TupleValueExpression)
                and targets[0].col_name == "*"
            )
            if is_select_star:
                node.target_list = extend_star(self._binder_context)
            for target_expr in node.target_list:
                self.bind(target_expr)
        if node.groupby_clause:
            self.bind(node.groupby_clause)
            check_groupby_pattern(node.groupby_clause.value)
            check_table_object_is_video(node.from_table)
        if node.orderby_list:
            for order_entry in node.orderby_list:
                # Only the first element of each ORDER BY entry is bound.
                self.bind(order_entry[0])
        if node.union_link:
            outer_context = self._binder_context
            self._binder_context = StatementBinderContext()
            try:
                self.bind(node.union_link)
            finally:
                # Restore the outer context even if the branch fails to bind,
                # so the binder is not left pointing at the temporary context.
                self._binder_context = outer_context
145

146
    @bind.register(CreateMaterializedViewStatement)
    def _bind_create_mat_statement(self, node: CreateMaterializedViewStatement):
        """Bind the query that defines a materialized view."""
        view_query = node.query
        self.bind(view_query)
        # TODO: verify that the number of projected columns matches the table.
150

151
    @bind.register(RenameTableStatement)
    def _bind_rename_table_statement(self, node: RenameTableStatement):
        """Bind the table being renamed and reject unsupported table types.

        Raises:
            BinderError: If the bound table's ``table_type`` is
                ``TableType.STRUCTURED_DATA``.
        """
        self.bind(node.old_table_ref)
        table_type = node.old_table_ref.table.table_obj.table_type
        if table_type == TableType.STRUCTURED_DATA:
            err_msg = "Rename not yet supported on structured data"
            # No exception is being handled here, so log with error() rather
            # than exception(), which would append an empty traceback.
            logger.error(err_msg)
            raise BinderError(err_msg)
158

159
    @bind.register(TableRef)
    def _bind_tableref(self, node: TableRef):
        """Bind a table reference according to what it wraps.

        - Table atom: registers the table alias and calls ``bind_table_info``.
        - Sub-select: binds the select statement in a fresh
          ``StatementBinderContext``, restores the current context (also on
          failure), then registers the select's target list as a derived
          table.
        - Join: binds the left side, the right side and, if present, the
          join predicate.
        - Table-valued expression: binds the function expression with the
          reference's alias and registers its outputs as a derived table.

        Raises:
            BinderError: If the reference is none of the above.
        """
        if node.is_table_atom():
            # Table
            self._binder_context.add_table_alias(
                node.alias.alias_name, node.table.table_name
            )
            bind_table_info(node.table)
        elif node.is_select():
            outer_context = self._binder_context
            self._binder_context = StatementBinderContext()
            try:
                self.bind(node.select_statement)
            finally:
                # Restore the outer context even if the sub-select fails to
                # bind, so the binder is not left on the temporary context.
                self._binder_context = outer_context
            self._binder_context.add_derived_table_alias(
                node.alias.alias_name, node.select_statement.target_list
            )
        elif node.is_join():
            join = node.join_node
            self.bind(join.left)
            self.bind(join.right)
            if join.predicate:
                self.bind(join.predicate)
        elif node.is_table_valued_expr():
            func_expr = node.table_valued_expr.func_expr
            func_expr.alias = node.alias
            self.bind(func_expr)
            # Read the alias only after binding: binding the function
            # expression may replace ``func_expr.alias``.
            table_alias = func_expr.alias.alias_name
            output_cols = [
                TupleValueExpression(
                    col_name=col_name,
                    table_alias=table_alias,
                    col_object=output_obj,
                    col_alias="{}.{}".format(table_alias, col_name),
                )
                for output_obj, col_name in zip(
                    func_expr.output_objs, func_expr.alias.col_names
                )
            ]
            self._binder_context.add_derived_table_alias(table_alias, output_cols)
        else:
            raise BinderError(f"Unsupported node {type(node)}")
199

200
    @bind.register(TupleValueExpression)
    def _bind_tuple_expr(self, node: TupleValueExpression):
        """Resolve a column reference through the binder context.

        Stores the resolved column object on the node and sets its alias to
        ``<table alias>.<lower-cased column name>``.
        """
        resolved_alias, column_object = self._binder_context.get_binded_column(
            node.col_name, node.table_alias
        )
        lowered_name = node.col_name.lower()
        node.col_alias = f"{resolved_alias}.{lowered_name}"
        node.col_object = column_object
207

208
    @bind.register(FunctionExpression)
    def _bind_func_expr(self, node: FunctionExpression):
        """Bind a function (UDF) expression.

        Binds the children, looks the UDF up in the catalog by name, loads
        its implementation with ``path_to_class``, selects the output
        object(s), and fills in ``node.alias`` with lower-cased column names
        (defaulting to the lower-cased output names).

        Raises:
            BinderError: If the UDF is not in the catalog, its implementation
                cannot be loaded, the requested output does not exist, or the
                number of alias columns differs from the number of outputs.
        """
        # bind all the children
        for child in node.children:
            self.bind(child)

        udf_obj = self._catalog.get_udf_catalog_entry_by_name(node.name)
        if udf_obj is None:
            err_msg = (
                f"UDF with name {node.name} does not exist in the catalog. "
                "Please create the UDF using CREATE UDF command."
            )
            logger.error(err_msg)
            raise BinderError(err_msg)

        try:
            node.function = path_to_class(udf_obj.impl_file_path, udf_obj.name)
        except Exception as e:
            err_msg = (
                f"{str(e)}. Please verify that the UDF class name in the "
                "implementation file matches the UDF name."
            )
            logger.error(err_msg)
            raise BinderError(err_msg) from e

        node.udf_obj = udf_obj
        output_objs = self._catalog.get_udf_io_catalog_output_entries(udf_obj)
        if node.output:
            # Decide from this lookup alone whether the output exists, not
            # from whatever ``node.output_objs`` held before binding.
            matching_objs = [
                obj for obj in output_objs if obj.name.lower() == node.output
            ]
            if not matching_objs:
                err_msg = f"Output {node.output} does not exist for {udf_obj.name}."
                logger.error(err_msg)
                raise BinderError(err_msg)
            # When several outputs share the name, the last one wins.
            node.output_objs = [matching_objs[-1]]
            node.projection_columns = [node.output]
        else:
            node.output_objs = output_objs
            node.projection_columns = [obj.name.lower() for obj in output_objs]

        default_alias_name = node.name.lower()
        default_output_col_aliases = [str(obj.name.lower()) for obj in node.output_objs]
        if not node.alias:
            node.alias = Alias(default_alias_name, default_output_col_aliases)
        elif not node.alias.col_names:
            # Alias name given without column names: keep the name, use the
            # default column aliases.
            node.alias = Alias(node.alias.alias_name, default_output_col_aliases)
        else:
            output_aliases = [
                str(col_name.lower()) for col_name in node.alias.col_names
            ]
            node.alias = Alias(node.alias.alias_name, output_aliases)

        if len(node.alias.col_names) != len(node.output_objs):
            err_msg = (
                f"Expected {len(node.output_objs)} output columns for "
                f"{node.alias.alias_name}, got {len(node.alias.col_names)}."
            )
            logger.error(err_msg)
            raise BinderError(err_msg)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc