• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #754

04 Sep 2023 09:54PM UTC coverage: 74.807% (-5.5%) from 80.336%
#754

push

circle-ci

jiashenC
update case

8727 of 11666 relevant lines covered (74.81%)

0.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.84
/evadb/catalog/catalog_manager.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import shutil
1✔
16
from pathlib import Path
1✔
17
from typing import List
1✔
18

19
from evadb.catalog.catalog_type import (
1✔
20
    ColumnType,
21
    TableType,
22
    VectorStoreType,
23
    VideoColumnName,
24
)
25
from evadb.catalog.catalog_utils import (
1✔
26
    cleanup_storage,
27
    construct_function_cache_catalog_entry,
28
    get_document_table_column_definitions,
29
    get_image_table_column_definitions,
30
    get_pdf_table_column_definitions,
31
    get_video_table_column_definitions,
32
    xform_column_definitions_to_catalog_entries,
33
)
34
from evadb.catalog.models.utils import (
35
    ColumnCatalogEntry,
36
    DatabaseCatalogEntry,
37
    FunctionCacheCatalogEntry,
38
    FunctionCatalogEntry,
39
    FunctionCostCatalogEntry,
40
    FunctionIOCatalogEntry,
41
    FunctionMetadataCatalogEntry,
42
    IndexCatalogEntry,
43
    TableCatalogEntry,
44
    drop_all_tables_except_catalog,
45
    init_db,
46
    truncate_catalog_tables,
47
)
48
from evadb.catalog.services.column_catalog_service import ColumnCatalogService
1✔
49
from evadb.catalog.services.database_catalog_service import DatabaseCatalogService
1✔
50
from evadb.catalog.services.function_cache_catalog_service import (
1✔
51
    FunctionCacheCatalogService,
52
)
53
from evadb.catalog.services.function_catalog_service import FunctionCatalogService
1✔
54
from evadb.catalog.services.function_cost_catalog_service import (
1✔
55
    FunctionCostCatalogService,
56
)
57
from evadb.catalog.services.function_io_catalog_service import FunctionIOCatalogService
1✔
58
from evadb.catalog.services.function_metadata_catalog_service import (
1✔
59
    FunctionMetadataCatalogService,
60
)
61
from evadb.catalog.services.index_catalog_service import IndexCatalogService
1✔
62
from evadb.catalog.services.table_catalog_service import TableCatalogService
1✔
63
from evadb.catalog.sql_config import IDENTIFIER_COLUMN, SQLConfig
1✔
64
from evadb.configuration.configuration_manager import ConfigurationManager
1✔
65
from evadb.expression.function_expression import FunctionExpression
1✔
66
from evadb.parser.create_statement import ColumnDefinition
1✔
67
from evadb.parser.table_ref import TableInfo
1✔
68
from evadb.parser.types import FileFormatType
1✔
69
from evadb.utils.generic_utils import generate_file_path, get_file_checksum
1✔
70
from evadb.utils.logging_manager import logger
1✔
71

72

73
class CatalogManager(object):
1✔
74
    def __init__(self, db_uri: str, config: ConfigurationManager):
        """Build the SQL configuration, bootstrap the catalog, then create services.

        Args:
            db_uri: database URI handed to ``SQLConfig``.
            config: configuration manager; stored and later used for directory
                lookups and storage cleanup.
        """
        self._db_uri = db_uri
        self._sql_config = SQLConfig(db_uri)
        self._config = config
        # Bootstrapping happens before any catalog service is constructed.
        self._bootstrap_catalog()

        sql_config = self._sql_config
        # ``session`` is read once per service, in the same order as before.
        self._db_catalog_service = DatabaseCatalogService(sql_config.session)
        self._table_catalog_service = TableCatalogService(sql_config.session)
        self._column_service = ColumnCatalogService(sql_config.session)
        self._function_service = FunctionCatalogService(sql_config.session)
        self._function_cost_catalog_service = FunctionCostCatalogService(
            sql_config.session
        )
        self._function_io_service = FunctionIOCatalogService(sql_config.session)
        self._function_metadata_service = FunctionMetadataCatalogService(
            sql_config.session
        )
        self._index_service = IndexCatalogService(sql_config.session)
        self._function_cache_service = FunctionCacheCatalogService(
            sql_config.session
        )
94

95
    @property
1✔
96
    def sql_config(self):
1✔
97
        return self._sql_config
1✔
98

99
    def reset(self):
1✔
100
        """
101
        This method resets the state of the singleton instance.
102
        It should clear the contents of the catalog tables and any storage data
103
        Used by testcases to reset the db state before
104
        """
105
        self._clear_catalog_contents()
1✔
106

107
    def close(self):
1✔
108
        """
109
        This method closes all the connections
110
        """
111
        if self.sql_config is not None:
×
112
            sqlalchemy_engine = self.sql_config.engine
×
113
            sqlalchemy_engine.dispose()
×
114

115
    def _bootstrap_catalog(self):
        """Run the tasks required before the catalog can be used.

        Currently this is a single task: ``init_db`` is called with the SQL
        engine so that the catalog database and tables exist.
        """
        logger.info("Bootstrapping catalog")
        engine = self._sql_config.engine
        init_db(engine)
1✔
123

124
    def _clear_catalog_contents(self):
        """Clear the catalog tuples, non-catalog tables, and stored data.

        Steps run in this order: drop every table that is not part of the
        catalog, truncate the catalog tables, then clean up the dataset, index,
        and cache directories.
        """
        logger.info("Clearing catalog")
        sql_config = self._sql_config
        # Non-catalog tables go first, then the catalog tuples themselves.
        drop_all_tables_except_catalog(sql_config.engine)
        truncate_catalog_tables(sql_config.engine)
        # Finally remove on-disk data described by the configuration.
        cleanup_storage(self._config)
1✔
136

137
    "Database catalog services"
1✔
138

139
    def insert_database_catalog_entry(self, name: str, engine: str, params: dict):
1✔
140
        """A new entry is persisted in the database catalog."
141

142
        Args:
143
            name: database name
144
            engine: engine name
145
            params: required params as a dictionary for the database
146
        """
147
        self._db_catalog_service.insert_entry(name, engine, params)
×
148

149
    def get_database_catalog_entry(self, database_name: str) -> DatabaseCatalogEntry:
        """Look up the database catalog entry for ``database_name``.

        Arguments:
            database_name (str): name of the database

        Returns:
            DatabaseCatalogEntry: whatever the database catalog service returns
            for that name.
        """
        return self._db_catalog_service.get_entry_by_name(database_name)
×
162

163
    def delete_database_catalog_entry(
        self, database_entry: DatabaseCatalogEntry
    ) -> bool:
        """Delete a database from the catalog.

        Arguments:
            database_entry: database catalog entry to remove

        Returns:
            The result reported by the database catalog service (documented as
            True if successfully deleted, else False).
        """
        # TODO: decide whether associated tables must also be removed here or
        # whether the underlying db takes care of that.
        deleted = self._db_catalog_service.delete_entry(database_entry)
        return deleted
×
178

179
    "Table catalog services"
1✔
180

181
    def insert_table_catalog_entry(
        self,
        name: str,
        file_url: str,
        column_list: List[ColumnCatalogEntry],
        identifier_column="id",
        table_type=TableType.VIDEO_DATA,
    ) -> TableCatalogEntry:
        """Add a new entry to the table catalog and persist it.

        An integer ``IDENTIFIER_COLUMN`` column is placed in front of
        ``column_list`` before the entry is handed to the table catalog service.

        Args:
            name: table name
            file_url: file url recorded for the table
            column_list: list of columns
            identifier_column (str): a unique identifier column for each row
            table_type (TableType): type of the table, video, images etc

        Returns:
            The TableCatalogEntry returned by the table catalog service.
        """
        row_id_column = ColumnCatalogEntry(
            name=IDENTIFIER_COLUMN, type=ColumnType.INTEGER
        )
        # The row-id column always comes first in the stored column list.
        columns_with_row_id = [row_id_column] + column_list

        return self._table_catalog_service.insert_entry(
            name,
            file_url,
            identifier_column=identifier_column,
            table_type=table_type,
            column_list=columns_with_row_id,
        )
1✔
216

217
    def get_table_catalog_entry(
        self, table_name: str, database_name: str = None
    ) -> TableCatalogEntry:
        """Look up the table catalog entry for a table name.

        Arguments:
            table_name (str): name of the table
            database_name (str): optional database name forwarded to the table
                catalog service. Defaults to None.

        Returns:
            TableCatalogEntry: the lookup result of the table catalog service.
        """
        # The service takes the database name first, then the table name.
        return self._table_catalog_service.get_entry_by_name(database_name, table_name)
1✔
234

235
    def delete_table_catalog_entry(self, table_entry: TableCatalogEntry) -> bool:
        """Delete a table, along with its columns, from the catalog.

        Arguments:
            table_entry: table catalog entry to remove

        Returns:
            The result reported by the table catalog service (documented as
            True if successfully deleted, else False).
        """
        deleted = self._table_catalog_service.delete_entry(table_entry)
        return deleted
1✔
247

248
    def rename_table_catalog_entry(
        self, curr_table: TableCatalogEntry, new_name: TableInfo
    ):
        """Rename ``curr_table`` to the table name carried by ``new_name``.

        Arguments:
            curr_table: catalog entry of the table to rename
            new_name: table info whose ``table_name`` is the new name

        Returns:
            The result of the table catalog service's ``rename_entry``.
        """
        target_name = new_name.table_name
        return self._table_catalog_service.rename_entry(curr_table, target_name)
1✔
252

253
    def check_table_exists(self, table_name: str, database_name: str = None):
1✔
254
        table_entry = self._table_catalog_service.get_entry_by_name(
1✔
255
            database_name, table_name
256
        )
257
        if table_entry is None:
1✔
258
            return False
1✔
259
        else:
260
            return True
1✔
261

262
    def get_all_table_catalog_entries(self):
1✔
263
        return self._table_catalog_service.get_all_entries()
×
264

265
    "Column catalog services"
1✔
266

267
    def get_column_catalog_entry(
        self, table_obj: TableCatalogEntry, col_name: str
    ) -> ColumnCatalogEntry:
        """Find the column ``col_name`` of ``table_obj`` in the column catalog.

        Arguments:
            table_obj: table catalog entry that owns the column
            col_name: name of the column

        Returns:
            The entry found by the column service. If none is found and
            ``col_name`` equals ``VideoColumnName.audio``, a fresh NDARRAY
            ColumnCatalogEntry is returned instead; otherwise None.
        """
        stored_column = self._column_service.filter_entry_by_table_id_and_name(
            table_obj.row_id, col_name
        )
        if stored_column:
            return stored_column

        # The audio column is not stored in the catalog for videos, so a dummy
        # column catalog entry is returned for it.
        if col_name == VideoColumnName.audio:
            return ColumnCatalogEntry(
                col_name,
                ColumnType.NDARRAY,
                table_id=table_obj.row_id,
                table_name=table_obj.name,
            )
        return None
×
285

286
    def get_column_catalog_entries_by_table(self, table_obj: TableCatalogEntry):
        """Return the column entries the column service reports for ``table_obj``."""
        return self._column_service.filter_entries_by_table(table_obj)
×
289

290
    "function catalog services"
1✔
291

292
    def insert_function_catalog_entry(
        self,
        name: str,
        impl_file_path: str,
        type: str,
        function_io_list: List[FunctionIOCatalogEntry],
        function_metadata_list: List[FunctionMetadataCatalogEntry],
    ) -> FunctionCatalogEntry:
        """Insert a function catalog entry with its IO and metadata entries.

        The function entry is inserted first, together with a checksum of the
        implementation file. Its ``row_id`` is then written to every IO and
        metadata entry before those lists are inserted.

        Arguments:
            name(str): name of the function
            impl_file_path(str): implementation path of the function
            type(str): what kind of function operator like classification,
                detection etc
            function_io_list(List[FunctionIOCatalogEntry]): input/output
                function info list
            function_metadata_list(List[FunctionMetadataCatalogEntry]):
                metadata entries of the function

        Returns:
            The FunctionCatalogEntry returned by the function service.
        """
        file_checksum = get_file_checksum(impl_file_path)
        function_entry = self._function_service.insert_entry(
            name, impl_file_path, type, file_checksum
        )

        # IO entries are linked and inserted before the metadata entries.
        dependents = (
            (function_io_list, self._function_io_service),
            (function_metadata_list, self._function_metadata_service),
        )
        for entries, service in dependents:
            for entry in entries:
                entry.function_id = function_entry.row_id
            service.insert_entries(entries)
        return function_entry
1✔
325

326
    def get_function_catalog_entry_by_name(self, name: str) -> FunctionCatalogEntry:
        """Look up a function by its name.

        Arguments:
            name (str): name of the function

        Returns:
            FunctionCatalogEntry: the lookup result of the function service.
        """
        function_entry = self._function_service.get_entry_by_name(name)
        return function_entry
1✔
337

338
    def delete_function_catalog_entry_by_name(self, function_name: str) -> bool:
1✔
339
        return self._function_service.delete_entry_by_name(function_name)
1✔
340

341
    def get_all_function_catalog_entries(self):
1✔
342
        return self._function_service.get_all_entries()
1✔
343

344
    "function cost catalog services"
1✔
345

346
    def upsert_function_cost_catalog_entry(
1✔
347
        self, function_id: int, name: str, cost: int
348
    ) -> FunctionCostCatalogEntry:
349
        """Upserts function cost catalog entry.
350

351
        Arguments:
352
            function_id(int): unique function id
353
            name(str): the name of the function
354
            cost(int): cost of this function
355

356
        Returns:
357
            The persisted FunctionCostCatalogEntry object.
358
        """
359

360
        self._function_cost_catalog_service.upsert_entry(function_id, name, cost)
1✔
361

362
    def get_function_cost_catalog_entry(self, name: str):
1✔
363
        return self._function_cost_catalog_service.get_entry_by_name(name)
1✔
364

365
    "FunctionIO services"
1✔
366

367
    def get_function_io_catalog_input_entries(
        self, function_obj: FunctionCatalogEntry
    ) -> List[FunctionIOCatalogEntry]:
        """Return the input IO entries registered for ``function_obj``.

        The lookup is keyed by ``function_obj.row_id``.
        """
        function_id = function_obj.row_id
        return self._function_io_service.get_input_entries_by_function_id(function_id)
373

374
    def get_function_io_catalog_output_entries(
        self, function_obj: FunctionCatalogEntry
    ) -> List[FunctionIOCatalogEntry]:
        """Return the output IO entries registered for ``function_obj``.

        The lookup is keyed by ``function_obj.row_id``.
        """
        function_id = function_obj.row_id
        return self._function_io_service.get_output_entries_by_function_id(function_id)
380

381
    """ Index related services. """
1✔
382

383
    def insert_index_catalog_entry(
        self,
        name: str,
        save_file_path: str,
        vector_store_type: VectorStoreType,
        feat_column: ColumnCatalogEntry,
        function_signature: str,
    ) -> IndexCatalogEntry:
        """Insert an index entry through the index catalog service.

        Arguments:
            name: name of the index
            save_file_path: path recorded for the index
            vector_store_type: vector store type of the index
            feat_column: column catalog entry the index refers to
            function_signature: function signature recorded with the index

        Returns:
            The IndexCatalogEntry returned by the index service.
        """
        return self._index_service.insert_entry(
            name, save_file_path, vector_store_type, feat_column, function_signature
        )
×
395

396
    def get_index_catalog_entry_by_name(self, name: str) -> IndexCatalogEntry:
        """Look up the index entry called ``name`` via the index service."""
        index_entry = self._index_service.get_entry_by_name(name)
        return index_entry
×
398

399
    def get_index_catalog_entry_by_column_and_function_signature(
        self, column: ColumnCatalogEntry, function_signature: str
    ):
        """Look up an index entry by its column and function signature."""
        index_service = self._index_service
        return index_service.get_entry_by_column_and_function_signature(
            column, function_signature
        )
405

406
    def drop_index_catalog_entry(self, index_name: str) -> bool:
1✔
407
        return self._index_service.delete_entry_by_name(index_name)
×
408

409
    def get_all_index_catalog_entries(self):
1✔
410
        return self._index_service.get_all_entries()
×
411

412
    """ Function Cache related"""
1✔
413

414
    def insert_function_cache_catalog_entry(self, func_expr: FunctionExpression):
        """Build a cache catalog entry for ``func_expr`` and insert it.

        The cache directory is read from the ``storage`` / ``cache_dir``
        configuration value.

        Returns:
            The result of the function cache service's ``insert_entry``.
        """
        cache_entry = construct_function_cache_catalog_entry(
            func_expr, cache_dir=self._config.get_value("storage", "cache_dir")
        )
        return self._function_cache_service.insert_entry(cache_entry)
×
418

419
    def get_function_cache_catalog_entry_by_name(
        self, name: str
    ) -> FunctionCacheCatalogEntry:
        """Look up the function cache entry called ``name``."""
        cache_entry = self._function_cache_service.get_entry_by_name(name)
        return cache_entry
×
423

424
    def drop_function_cache_catalog_entry(
1✔
425
        self, entry: FunctionCacheCatalogEntry
426
    ) -> bool:
427
        # remove the data structure associated with the entry
428
        if entry:
×
429
            shutil.rmtree(entry.cache_path)
×
430
        return self._function_cache_service.delete_entry(entry)
×
431

432
    """ function Metadata Catalog"""
1✔
433

434
    def get_function_metadata_entries_by_function_name(
        self, function_name: str
    ) -> List[FunctionMetadataCatalogEntry]:
        """Fetch the metadata entries of the function called ``function_name``.

        Arguments:
            function_name (str): name of the function

        Returns:
            The FunctionMetadataCatalogEntry objects looked up by the function's
            ``row_id``, or an empty list when the function lookup is falsy.
        """
        function_entry = self.get_function_catalog_entry_by_name(function_name)
        if not function_entry:
            return []
        return self._function_metadata_service.get_entries_by_function_id(
            function_entry.row_id
        )
×
454

455
    """ Utils """
1✔
456

457
    def create_and_insert_table_catalog_entry(
        self,
        table_info: TableInfo,
        columns: List[ColumnDefinition],
        identifier_column: str = None,
        table_type: TableType = TableType.STRUCTURED_DATA,
    ) -> TableCatalogEntry:
        """Build a table catalog entry from column definitions and insert it.

        The file url is generated from the ``core`` / ``datasets_dir``
        configuration value and the table name.

        Args:
            table_info (TableInfo): table info object
            columns (List[ColumnDefinition]): columns definitions of the table
            identifier_column (str, optional): Specify unique columns.
                Defaults to None.
            table_type (TableType, optional): table type. Defaults to
                TableType.STRUCTURED_DATA.

        Returns:
            TableCatalogEntry: entry that has been inserted into the table catalog
        """
        table_name = table_info.table_name
        catalog_columns = xform_column_definitions_to_catalog_entries(columns)
        datasets_dir = self._config.get_value("core", "datasets_dir")

        return self.insert_table_catalog_entry(
            table_name,
            str(generate_file_path(datasets_dir, table_name)),
            catalog_columns,
            identifier_column=identifier_column,
            table_type=table_type,
        )
1✔
488

489
    def create_and_insert_multimedia_table_catalog_entry(
        self, name: str, format_type: FileFormatType
    ) -> TableCatalogEntry:
        """Create a table catalog entry for a multimedia table.

        The column definitions and table type are chosen from ``format_type``
        (video, image, document, or PDF) and then inserted through
        ``create_and_insert_table_catalog_entry``.

        Args:
            name (str):  name of the table catalog entry
            format_type (FileFormatType): media type

        Raises:
            AssertionError: if format_type is not supported. The check is an
                ``assert``, so it is skipped when Python runs with ``-O``.

        Returns:
            TableCatalogEntry: newly inserted table catalog entry
        """
        supported_formats = [
            FileFormatType.VIDEO,
            FileFormatType.IMAGE,
            FileFormatType.DOCUMENT,
            FileFormatType.PDF,
        ]
        assert (
            format_type in supported_formats
        ), f"Format Type {format_type} is not supported"

        if format_type is FileFormatType.VIDEO:
            columns, table_type = (
                get_video_table_column_definitions(),
                TableType.VIDEO_DATA,
            )
        elif format_type is FileFormatType.IMAGE:
            columns, table_type = (
                get_image_table_column_definitions(),
                TableType.IMAGE_DATA,
            )
        elif format_type is FileFormatType.DOCUMENT:
            columns, table_type = (
                get_document_table_column_definitions(),
                TableType.DOCUMENT_DATA,
            )
        elif format_type is FileFormatType.PDF:
            columns, table_type = (
                get_pdf_table_column_definitions(),
                TableType.PDF_DATA,
            )

        table_info = TableInfo(name)
        return self.create_and_insert_table_catalog_entry(
            table_info, columns, table_type=table_type
        )
528

529
    def get_multimedia_metadata_table_catalog_entry(
        self, input_table: TableCatalogEntry
    ) -> TableCatalogEntry:
        """Get the table catalog entry of a multimedia metadata table.

        The metadata table is named after the stem of ``input_table.file_url``.
        An assertion fails if no such table exists in the catalog.

        Args:
            input_table (TableCatalogEntry): input media table

        Returns:
            TableCatalogEntry: metainfo table entry which is maintained by the system
        """
        # The stem of file_url doubles as the metadata table name.
        media_metadata_name = Path(input_table.file_url).stem
        metadata_entry = self.get_table_catalog_entry(media_metadata_name)
        assert (
            metadata_entry is not None
        ), f"Table with name {media_metadata_name} does not exist in catalog"
        return metadata_entry
1✔
548

549
    def create_and_insert_multimedia_metadata_table_catalog_entry(
1✔
550
        self, input_table: TableCatalogEntry
551
    ) -> TableCatalogEntry:
552
        """Create and insert table catalog entry for multimedia metadata table.
553
         This table is used to store all media filenames and related information. In
554
         order to prevent direct access or modification by users, it should be
555
         designated as a SYSTEM_STRUCTURED_DATA type.
556
         **Note**: this table is managed by the storage engine, so it should not be
557
         called elsewhere.
558
        Args:
559
            input_table (TableCatalogEntry): input video table
560

561
        Returns:
562
            TableCatalogEntry: metainfo table entry which is maintained by the system
563
        """
564
        # use file_url as the metadata table name
565
        media_metadata_name = Path(input_table.file_url).stem
1✔
566
        obj = self.get_table_catalog_entry(media_metadata_name)
1✔
567
        assert obj is None, "Table with name {media_metadata_name} already exists"
1✔
568

569
        columns = [ColumnDefinition("file_url", ColumnType.TEXT, None, None)]
1✔
570
        obj = self.create_and_insert_table_catalog_entry(
1✔
571
            TableInfo(media_metadata_name),
572
            columns,
573
            identifier_column=columns[0].name,
574
            table_type=TableType.SYSTEM_STRUCTURED_DATA,
575
        )
576
        return obj
1✔
577

578

579
#### get catalog instance
580
# This function plays a crucial role in ensuring that different threads do
581
# not share the same catalog object, as it can result in serialization issues and
582
# incorrect behavior with SQLAlchemy. Therefore, whenever a catalog instance is
583
# required, we create a new one. One possible optimization is to share the catalog
584
# instance across all objects within the same thread. It is worth investigating whether
585
# SQLAlchemy already handles this optimization for us, which will be explored at a
586
# later time.
587
def get_catalog_instance(db_uri: str, config: ConfigurationManager):
    """Create and return a new ``CatalogManager`` for ``db_uri`` and ``config``.

    A fresh instance is built on every call; nothing is cached or shared.
    """
    catalog = CatalogManager(db_uri, config)
    return catalog
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc