• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / #758

04 Sep 2023 08:37PM UTC coverage: 0.0% (-78.3%) from 78.333%
#758

push

circle-ci

hershd23
Increased underline length at line 75 in text_summarization.rst
	modified:   docs/source/benchmarks/text_summarization.rst

0 of 11303 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/evadb/catalog/catalog_manager.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import shutil
×
16
from pathlib import Path
×
17
from typing import List
×
18

19
from evadb.catalog.catalog_type import (
×
20
    ColumnType,
21
    TableType,
22
    VectorStoreType,
23
    VideoColumnName,
24
)
25
from evadb.catalog.catalog_utils import (
×
26
    cleanup_storage,
27
    construct_udf_cache_catalog_entry,
28
    get_document_table_column_definitions,
29
    get_image_table_column_definitions,
30
    get_pdf_table_column_definitions,
31
    get_video_table_column_definitions,
32
    xform_column_definitions_to_catalog_entries,
33
)
34
from evadb.catalog.models.utils import (
35
    ColumnCatalogEntry,
36
    DatabaseCatalogEntry,
37
    IndexCatalogEntry,
38
    TableCatalogEntry,
39
    UdfCacheCatalogEntry,
40
    UdfCatalogEntry,
41
    UdfCostCatalogEntry,
42
    UdfIOCatalogEntry,
43
    UdfMetadataCatalogEntry,
44
    drop_all_tables_except_catalog,
45
    init_db,
46
    truncate_catalog_tables,
47
)
48
from evadb.catalog.services.column_catalog_service import ColumnCatalogService
×
49
from evadb.catalog.services.database_catalog_service import DatabaseCatalogService
×
50
from evadb.catalog.services.index_catalog_service import IndexCatalogService
×
51
from evadb.catalog.services.table_catalog_service import TableCatalogService
×
52
from evadb.catalog.services.udf_cache_catalog_service import UdfCacheCatalogService
×
53
from evadb.catalog.services.udf_catalog_service import UdfCatalogService
×
54
from evadb.catalog.services.udf_cost_catalog_service import UdfCostCatalogService
×
55
from evadb.catalog.services.udf_io_catalog_service import UdfIOCatalogService
×
56
from evadb.catalog.services.udf_metadata_catalog_service import (
×
57
    UdfMetadataCatalogService,
58
)
59
from evadb.catalog.sql_config import IDENTIFIER_COLUMN, SQLConfig
×
60
from evadb.configuration.configuration_manager import ConfigurationManager
×
61
from evadb.expression.function_expression import FunctionExpression
×
62
from evadb.parser.create_statement import ColumnDefinition
×
63
from evadb.parser.table_ref import TableInfo
×
64
from evadb.parser.types import FileFormatType
×
65
from evadb.utils.generic_utils import generate_file_path, get_file_checksum
×
66
from evadb.utils.logging_manager import logger
×
67

68

69
class CatalogManager(object):
    """Single entry point to the catalog services.

    An instance builds a ``SQLConfig`` from ``db_uri``, bootstraps the catalog
    and then creates one service object per catalog entity (database, table,
    column, udf, udf cost, udf io, udf metadata, index and udf cache), each
    constructed with the session exposed by that ``SQLConfig``. Most public
    methods simply forward to the matching service.
    """

    def __init__(self, db_uri: str, config: ConfigurationManager):
        """Set up the SQL configuration, bootstrap the catalog and create services.

        Args:
            db_uri: database URI handed to ``SQLConfig``
            config: configuration manager used for storage/dataset locations
        """
        self._db_uri = db_uri
        self._sql_config = SQLConfig(db_uri)
        self._config = config
        # Bootstrapping runs before any of the services are constructed.
        self._bootstrap_catalog()
        self._db_catalog_service = DatabaseCatalogService(self._sql_config.session)
        self._table_catalog_service = TableCatalogService(self._sql_config.session)
        self._column_service = ColumnCatalogService(self._sql_config.session)
        self._udf_service = UdfCatalogService(self._sql_config.session)
        self._udf_cost_catalog_service = UdfCostCatalogService(self._sql_config.session)
        self._udf_io_service = UdfIOCatalogService(self._sql_config.session)
        self._udf_metadata_service = UdfMetadataCatalogService(self._sql_config.session)
        self._index_service = IndexCatalogService(self._sql_config.session)
        self._udf_cache_service = UdfCacheCatalogService(self._sql_config.session)

    @property
    def sql_config(self):
        """The ``SQLConfig`` object created for this manager's ``db_uri``."""
        return self._sql_config

    def reset(self):
        """Reset the catalog state.

        Clears the contents of the catalog tables and any storage data by
        delegating to ``_clear_catalog_contents``. Used by testcases to reset
        the db state before they run.
        """
        self._clear_catalog_contents()

    def close(self):
        """Close all the connections by disposing the SQLAlchemy engine."""
        if self.sql_config is not None:
            sqlalchemy_engine = self.sql_config.engine
            sqlalchemy_engine.dispose()

    def _bootstrap_catalog(self):
        """Bootstraps catalog.

        This method runs all tasks required for using catalog. Currently,
        it includes only one task, i.e. initializing the database. It creates
        the catalog database and tables if they do not exist.
        """
        logger.info("Bootstrapping catalog")
        init_db(self._sql_config.engine)

    def _clear_catalog_contents(self):
        """Clear the contents of the catalog.

        It clears the tuples in the catalog tables, indexes, and cached data.
        """
        logger.info("Clearing catalog")
        # drop tables which are not part of catalog
        drop_all_tables_except_catalog(self._sql_config.engine)
        # truncate the catalog tables
        truncate_catalog_tables(self._sql_config.engine)
        # clean up the dataset, index, and cache directories
        cleanup_storage(self._config)

    # ------------------------------------------------------------------
    # Database catalog services
    # ------------------------------------------------------------------

    def insert_database_catalog_entry(self, name: str, engine: str, params: dict):
        """A new entry is persisted in the database catalog.

        Args:
            name: database name
            engine: engine name
            params: required params as a dictionary for the database
        """
        self._db_catalog_service.insert_entry(name, engine, params)

    def get_database_catalog_entry(self, database_name: str) -> DatabaseCatalogEntry:
        """Return the database catalog entry for the given database_name.

        Arguments:
            database_name (str): name of the database

        Returns:
            DatabaseCatalogEntry
        """
        database_entry = self._db_catalog_service.get_entry_by_name(database_name)
        return database_entry

    def delete_database_catalog_entry(
        self, database_entry: DatabaseCatalogEntry
    ) -> bool:
        """Delete the database from the catalog.

        Arguments:
           database_entry: database catalog entry to remove

        Returns:
           True if successfully deleted else False
        """
        # todo: do we need to remove also the associated tables etc or that will be
        # taken care by the underlying db
        return self._db_catalog_service.delete_entry(database_entry)

    # ------------------------------------------------------------------
    # Table catalog services
    # ------------------------------------------------------------------

    def insert_table_catalog_entry(
        self,
        name: str,
        file_url: str,
        column_list: List[ColumnCatalogEntry],
        identifier_column="id",
        table_type=TableType.VIDEO_DATA,
    ) -> TableCatalogEntry:
        """A new entry is added to the table catalog and persisted in the database.

        The identifier (row id) column is placed in front of ``column_list``
        before the entry is handed to the table catalog service.

        Args:
            name: table name
            file_url: location associated with the table; forwarded unchanged
                to the table catalog service
            column_list: list of columns
            identifier_column (str):  A unique identifier column for each row
            table_type (TableType): type of the table, video, images etc
        Returns:
            The TableCatalogEntry returned by the table catalog service.
        """
        # Prepend the row-id column to the table column list. A new list is
        # built, so the caller's list is not mutated.
        column_list = [
            ColumnCatalogEntry(name=IDENTIFIER_COLUMN, type=ColumnType.INTEGER)
        ] + column_list

        table_entry = self._table_catalog_service.insert_entry(
            name,
            file_url,
            identifier_column=identifier_column,
            table_type=table_type,
            column_list=column_list,
        )

        return table_entry

    def get_table_catalog_entry(
        self, table_name: str, database_name: str = None
    ) -> TableCatalogEntry:
        """Return the table catalog entry for the given table name.

        Arguments:
            table_name (str): name of the table
            database_name (str, optional): name of the database, forwarded to
                the table catalog service. Defaults to None.

        Returns:
            TableCatalogEntry
        """
        table_entry = self._table_catalog_service.get_entry_by_name(
            database_name, table_name
        )

        return table_entry

    def delete_table_catalog_entry(self, table_entry: TableCatalogEntry) -> bool:
        """Delete the table along with its columns.

        The table is removed from the table catalog and its columns from the
        column catalog; the work is delegated to the table catalog service.

        Arguments:
           table_entry: table catalog entry to remove

        Returns:
           True if successfully deleted else False
        """
        return self._table_catalog_service.delete_entry(table_entry)

    def rename_table_catalog_entry(
        self, curr_table: TableCatalogEntry, new_name: TableInfo
    ):
        """Rename ``curr_table`` to ``new_name.table_name``.

        Arguments:
            curr_table: catalog entry of the table to rename
            new_name: table info object carrying the new table name

        Returns:
            Whatever the table catalog service's ``rename_entry`` returns.
        """
        return self._table_catalog_service.rename_entry(curr_table, new_name.table_name)

    def check_table_exists(self, table_name: str, database_name: str = None) -> bool:
        """Tell whether the table catalog has an entry for ``table_name``.

        Arguments:
            table_name (str): name of the table
            database_name (str, optional): name of the database. Defaults to None.

        Returns:
            True if the lookup yields an entry, False if it yields None.
        """
        table_entry = self._table_catalog_service.get_entry_by_name(
            database_name, table_name
        )
        return table_entry is not None

    def get_all_table_catalog_entries(self):
        """Return all entries known to the table catalog service."""
        return self._table_catalog_service.get_all_entries()

    # ------------------------------------------------------------------
    # Column catalog services
    # ------------------------------------------------------------------

    def get_column_catalog_entry(
        self, table_obj: TableCatalogEntry, col_name: str
    ) -> ColumnCatalogEntry:
        """Look up a column of ``table_obj`` by name.

        Arguments:
            table_obj: catalog entry of the table owning the column
            col_name: name of the column

        Returns:
            The matching ColumnCatalogEntry; a placeholder NDARRAY entry when
            nothing matches and ``col_name`` is the video audio column;
            otherwise None.
        """
        col_obj = self._column_service.filter_entry_by_table_id_and_name(
            table_obj.row_id, col_name
        )
        if col_obj:
            return col_obj

        # Return a dummy column catalog entry for audio, even though it is
        # not defined in the catalog for videos.
        if col_name == VideoColumnName.audio:
            return ColumnCatalogEntry(
                col_name,
                ColumnType.NDARRAY,
                table_id=table_obj.row_id,
                table_name=table_obj.name,
            )
        return None

    def get_column_catalog_entries_by_table(self, table_obj: TableCatalogEntry):
        """Return the column catalog entries that belong to ``table_obj``."""
        col_entries = self._column_service.filter_entries_by_table(table_obj)
        return col_entries

    # ------------------------------------------------------------------
    # UDF catalog services
    # ------------------------------------------------------------------

    def insert_udf_catalog_entry(
        self,
        name: str,
        impl_file_path: str,
        type: str,
        udf_io_list: List[UdfIOCatalogEntry],
        udf_metadata_list: List[UdfMetadataCatalogEntry],
    ) -> UdfCatalogEntry:
        """Inserts a UDF catalog entry along with UDF_IO and metadata entries.

        It persists the entry to the database.

        Arguments:
            name(str): name of the udf
            impl_file_path(str): implementation path of the udf
            type(str): what kind of udf operator like classification,
                detection etc
            udf_io_list(List[UdfIOCatalogEntry]): input/output udf info list
            udf_metadata_list(List[UdfMetadataCatalogEntry]): metadata entries
                to store for the udf

        Returns:
            The persisted UdfCatalogEntry object.
        """
        checksum = get_file_checksum(impl_file_path)
        udf_entry = self._udf_service.insert_entry(name, impl_file_path, type, checksum)
        # The io and metadata entries are linked to the new udf through its
        # row id; note that the passed-in objects are updated in place.
        for udf_io in udf_io_list:
            udf_io.udf_id = udf_entry.row_id
        self._udf_io_service.insert_entries(udf_io_list)
        for udf_metadata in udf_metadata_list:
            udf_metadata.udf_id = udf_entry.row_id
        self._udf_metadata_service.insert_entries(udf_metadata_list)
        return udf_entry

    def get_udf_catalog_entry_by_name(self, name: str) -> UdfCatalogEntry:
        """Get the UDF information based on name.

        Arguments:
             name (str): name of the UDF

        Returns:
            UdfCatalogEntry object
        """
        return self._udf_service.get_entry_by_name(name)

    def delete_udf_catalog_entry_by_name(self, udf_name: str) -> bool:
        """Delete the UDF catalog entry named ``udf_name`` via the udf service."""
        return self._udf_service.delete_entry_by_name(udf_name)

    def get_all_udf_catalog_entries(self):
        """Return all entries known to the udf catalog service."""
        return self._udf_service.get_all_entries()

    # ------------------------------------------------------------------
    # UDF cost catalog services
    # ------------------------------------------------------------------

    def upsert_udf_cost_catalog_entry(
        self, udf_id: int, name: str, cost: int
    ) -> None:
        """Upserts UDF cost catalog entry.

        Arguments:
            udf_id(int): unique udf id
            name(str): the name of the udf
            cost(int): cost of this UDF

        Returns:
            None. The result of the underlying service call is not propagated.
        """
        self._udf_cost_catalog_service.upsert_entry(udf_id, name, cost)

    def get_udf_cost_catalog_entry(self, name: str):
        """Return the UDF cost catalog entry stored under ``name``."""
        return self._udf_cost_catalog_service.get_entry_by_name(name)

    # ------------------------------------------------------------------
    # UdfIO services
    # ------------------------------------------------------------------

    def get_udf_io_catalog_input_entries(
        self, udf_obj: UdfCatalogEntry
    ) -> List[UdfIOCatalogEntry]:
        """Return the input IO entries registered for ``udf_obj``."""
        return self._udf_io_service.get_input_entries_by_udf_id(udf_obj.row_id)

    def get_udf_io_catalog_output_entries(
        self, udf_obj: UdfCatalogEntry
    ) -> List[UdfIOCatalogEntry]:
        """Return the output IO entries registered for ``udf_obj``."""
        return self._udf_io_service.get_output_entries_by_udf_id(udf_obj.row_id)

    # ------------------------------------------------------------------
    # Index related services
    # ------------------------------------------------------------------

    def insert_index_catalog_entry(
        self,
        name: str,
        save_file_path: str,
        vector_store_type: VectorStoreType,
        feat_column: ColumnCatalogEntry,
        udf_signature: str,
    ) -> IndexCatalogEntry:
        """Insert an index catalog entry.

        All arguments are forwarded, in order, to the index catalog service.

        Arguments:
            name: name of the index
            save_file_path: path associated with the index
            vector_store_type: vector store type of the index
            feat_column: column catalog entry the index is built on
            udf_signature: signature of the udf associated with the index

        Returns:
            The IndexCatalogEntry returned by the index catalog service.
        """
        index_catalog_entry = self._index_service.insert_entry(
            name, save_file_path, vector_store_type, feat_column, udf_signature
        )
        return index_catalog_entry

    def get_index_catalog_entry_by_name(self, name: str) -> IndexCatalogEntry:
        """Return the index catalog entry stored under ``name``."""
        return self._index_service.get_entry_by_name(name)

    def get_index_catalog_entry_by_column_and_udf_signature(
        self, column: ColumnCatalogEntry, udf_signature: str
    ):
        """Return the index catalog entry matching ``column`` and ``udf_signature``."""
        return self._index_service.get_entry_by_column_and_udf_signature(
            column, udf_signature
        )

    def drop_index_catalog_entry(self, index_name: str) -> bool:
        """Delete the index catalog entry named ``index_name``."""
        return self._index_service.delete_entry_by_name(index_name)

    def get_all_index_catalog_entries(self):
        """Return all entries known to the index catalog service."""
        return self._index_service.get_all_entries()

    # ------------------------------------------------------------------
    # Udf cache related
    # ------------------------------------------------------------------

    def insert_udf_cache_catalog_entry(self, func_expr: FunctionExpression):
        """Build a cache catalog entry for ``func_expr`` and insert it.

        The cache directory is read from the ``storage``/``cache_dir``
        configuration value.

        Returns:
            Whatever the udf cache service's ``insert_entry`` returns.
        """
        cache_dir = self._config.get_value("storage", "cache_dir")
        entry = construct_udf_cache_catalog_entry(func_expr, cache_dir=cache_dir)
        return self._udf_cache_service.insert_entry(entry)

    def get_udf_cache_catalog_entry_by_name(self, name: str) -> UdfCacheCatalogEntry:
        """Return the udf cache catalog entry stored under ``name``."""
        return self._udf_cache_service.get_entry_by_name(name)

    def drop_udf_cache_catalog_entry(self, entry: UdfCacheCatalogEntry) -> bool:
        """Remove the on-disk cache of ``entry`` and delete its catalog entry.

        The directory tree at ``entry.cache_path`` is removed only when
        ``entry`` is truthy; ``delete_entry`` is called with ``entry`` either way.
        """
        # remove the data structure associated with the entry
        if entry:
            shutil.rmtree(entry.cache_path)
        return self._udf_cache_service.delete_entry(entry)

    # ------------------------------------------------------------------
    # UDF Metadata Catalog
    # ------------------------------------------------------------------

    def get_udf_metadata_entries_by_udf_name(
        self, udf_name: str
    ) -> List[UdfMetadataCatalogEntry]:
        """Get the UDF metadata information for the provided udf.

        Arguments:
             udf_name (str): name of the UDF

        Returns:
            UdfMetadataCatalogEntry objects; an empty list when no udf with
            that name is found.
        """
        udf_entry = self.get_udf_catalog_entry_by_name(udf_name)
        if udf_entry:
            entries = self._udf_metadata_service.get_entries_by_udf_id(udf_entry.row_id)
            return entries
        else:
            return []

    # ------------------------------------------------------------------
    # Utils
    # ------------------------------------------------------------------

    def create_and_insert_table_catalog_entry(
        self,
        table_info: TableInfo,
        columns: List[ColumnDefinition],
        identifier_column: str = None,
        table_type: TableType = TableType.STRUCTURED_DATA,
    ) -> TableCatalogEntry:
        """Create a valid table catalog tuple and insert into the table

        Args:
            table_info (TableInfo): table info object
            columns (List[ColumnDefinition]): columns definitions of the table
            identifier_column (str, optional): Specify unique columns. Defaults to None.
            table_type (TableType, optional): table type. Defaults to TableType.STRUCTURED_DATA.

        Returns:
            TableCatalogEntry: entry that has been inserted into the table catalog
        """
        table_name = table_info.table_name
        column_catalog_entries = xform_column_definitions_to_catalog_entries(columns)

        # The table's file_url is derived from the configured datasets
        # directory and the table name.
        dataset_location = self._config.get_value("core", "datasets_dir")
        file_url = str(generate_file_path(dataset_location, table_name))
        table_catalog_entry = self.insert_table_catalog_entry(
            table_name,
            file_url,
            column_catalog_entries,
            identifier_column=identifier_column,
            table_type=table_type,
        )
        return table_catalog_entry

    def create_and_insert_multimedia_table_catalog_entry(
        self, name: str, format_type: FileFormatType
    ) -> TableCatalogEntry:
        """Create a table catalog entry for the multimedia table.

        Depending on the type of multimedia, the appropriate column
        definitions and table type are selected.

        Args:
            name (str):  name of the table catalog entry
            format_type (FileFormatType): media type

        Raises:
            AssertionError: if format_type is not supported

        Returns:
            TableCatalogEntry: newly inserted table catalog entry
        """
        assert format_type in [
            FileFormatType.VIDEO,
            FileFormatType.IMAGE,
            FileFormatType.DOCUMENT,
            FileFormatType.PDF,
        ], f"Format Type {format_type} is not supported"

        if format_type is FileFormatType.VIDEO:
            columns = get_video_table_column_definitions()
            table_type = TableType.VIDEO_DATA
        elif format_type is FileFormatType.IMAGE:
            columns = get_image_table_column_definitions()
            table_type = TableType.IMAGE_DATA
        elif format_type is FileFormatType.DOCUMENT:
            columns = get_document_table_column_definitions()
            table_type = TableType.DOCUMENT_DATA
        elif format_type is FileFormatType.PDF:
            columns = get_pdf_table_column_definitions()
            table_type = TableType.PDF_DATA

        return self.create_and_insert_table_catalog_entry(
            TableInfo(name), columns, table_type=table_type
        )

    def get_multimedia_metadata_table_catalog_entry(
        self, input_table: TableCatalogEntry
    ) -> TableCatalogEntry:
        """Get table catalog entry for multimedia metadata table.

        Args:
            input_table (TableCatalogEntry): input media table

        Raises:
            AssertionError: if the metadata table does not exist in the catalog

        Returns:
            TableCatalogEntry: metainfo table entry which is maintained by the system
        """
        # the stem of file_url is used as the metadata table name
        media_metadata_name = Path(input_table.file_url).stem
        obj = self.get_table_catalog_entry(media_metadata_name)
        assert (
            obj is not None
        ), f"Table with name {media_metadata_name} does not exist in catalog"

        return obj

    def create_and_insert_multimedia_metadata_table_catalog_entry(
        self, input_table: TableCatalogEntry
    ) -> TableCatalogEntry:
        """Create and insert table catalog entry for multimedia metadata table.

        This table is used to store all media filenames and related
        information. In order to prevent direct access or modification by
        users, it should be designated as a SYSTEM_STRUCTURED_DATA type.
        **Note**: this table is managed by the storage engine, so it should
        not be called elsewhere.

        Args:
            input_table (TableCatalogEntry): input video table

        Raises:
            AssertionError: if a table with the metadata name already exists

        Returns:
            TableCatalogEntry: metainfo table entry which is maintained by the system
        """
        # the stem of file_url is used as the metadata table name
        media_metadata_name = Path(input_table.file_url).stem
        obj = self.get_table_catalog_entry(media_metadata_name)
        # f-string so the offending table name actually shows up in the message
        assert obj is None, f"Table with name {media_metadata_name} already exists"

        # A single text column that also serves as the identifier column.
        columns = [ColumnDefinition("file_url", ColumnType.TEXT, None, None)]
        obj = self.create_and_insert_table_catalog_entry(
            TableInfo(media_metadata_name),
            columns,
            identifier_column=columns[0].name,
            table_type=TableType.SYSTEM_STRUCTURED_DATA,
        )
        return obj
×
555

556

557
#### get catalog instance
558
# This function plays a crucial role in ensuring that different threads do
559
# not share the same catalog object, as it can result in serialization issues and
560
# incorrect behavior with SQLAlchemy. Therefore, whenever a catalog instance is
561
# required, we create a new one. One possible optimization is to share the catalog
562
# instance across all objects within the same thread. It is worth investigating whether
563
# SQLAlchemy already handles this optimization for us, which will be explored at a
564
# later time.
565
def get_catalog_instance(db_uri: str, config: ConfigurationManager):
    """Construct and return a fresh ``CatalogManager``.

    Every call builds a new manager from ``db_uri`` and ``config``; nothing
    is cached or shared between calls.
    """
    catalog = CatalogManager(db_uri, config)
    return catalog
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc