• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / a75d4cdb-5478-400d-88b6-c219d38db53a

27 Oct 2023 06:59AM UTC coverage: 72.758% (+72.8%) from 0.0%
a75d4cdb-5478-400d-88b6-c219d38db53a

Pull #1308

circle-ci

dungnmaster
linting changes
Pull Request #1308: Job scheduler changes

289 of 289 new or added lines in 17 files covered. (100.0%)

9332 of 12826 relevant lines covered (72.76%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.43
/evadb/catalog/catalog_manager.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import datetime
1✔
16
import shutil
1✔
17
from pathlib import Path
1✔
18
from typing import List
1✔
19

20
from evadb.catalog.catalog_type import (
1✔
21
    ColumnType,
22
    TableType,
23
    VectorStoreType,
24
    VideoColumnName,
25
)
26
from evadb.catalog.catalog_utils import (
1✔
27
    cleanup_storage,
28
    construct_function_cache_catalog_entry,
29
    get_document_table_column_definitions,
30
    get_image_table_column_definitions,
31
    get_pdf_table_column_definitions,
32
    get_video_table_column_definitions,
33
    xform_column_definitions_to_catalog_entries,
34
)
35
from evadb.catalog.models.utils import (
36
    ColumnCatalogEntry,
37
    DatabaseCatalogEntry,
38
    FunctionCacheCatalogEntry,
39
    FunctionCatalogEntry,
40
    FunctionCostCatalogEntry,
41
    FunctionIOCatalogEntry,
42
    FunctionMetadataCatalogEntry,
43
    IndexCatalogEntry,
44
    JobCatalogEntry,
45
    TableCatalogEntry,
46
    drop_all_tables_except_catalog,
47
    init_db,
48
    truncate_catalog_tables,
49
)
50
from evadb.catalog.services.column_catalog_service import ColumnCatalogService
1✔
51
from evadb.catalog.services.database_catalog_service import DatabaseCatalogService
1✔
52
from evadb.catalog.services.function_cache_catalog_service import (
1✔
53
    FunctionCacheCatalogService,
54
)
55
from evadb.catalog.services.function_catalog_service import FunctionCatalogService
1✔
56
from evadb.catalog.services.function_cost_catalog_service import (
1✔
57
    FunctionCostCatalogService,
58
)
59
from evadb.catalog.services.function_io_catalog_service import FunctionIOCatalogService
1✔
60
from evadb.catalog.services.function_metadata_catalog_service import (
1✔
61
    FunctionMetadataCatalogService,
62
)
63
from evadb.catalog.services.index_catalog_service import IndexCatalogService
1✔
64
from evadb.catalog.services.job_catalog_service import JobCatalogService
1✔
65
from evadb.catalog.services.table_catalog_service import TableCatalogService
1✔
66
from evadb.catalog.sql_config import IDENTIFIER_COLUMN, SQLConfig
1✔
67
from evadb.configuration.configuration_manager import ConfigurationManager
1✔
68
from evadb.expression.function_expression import FunctionExpression
1✔
69
from evadb.parser.create_statement import ColumnDefinition
1✔
70
from evadb.parser.table_ref import TableInfo
1✔
71
from evadb.parser.types import FileFormatType
1✔
72
from evadb.third_party.databases.interface import get_database_handler
1✔
73
from evadb.utils.generic_utils import generate_file_path, get_file_checksum
1✔
74
from evadb.utils.logging_manager import logger
1✔
75

76

77
class CatalogManager(object):
    """Single entry point to the catalog services.

    One instance owns a ``SQLConfig`` built from ``db_uri`` and hands its
    session to every catalog service (database, job, table, column, function,
    function cost, function IO, function metadata, index and function cache).
    The public methods are thin wrappers that delegate to those services.
    """

    def __init__(self, db_uri: str, config: ConfigurationManager):
        """Create the SQL configuration, bootstrap the catalog and build services.

        Args:
            db_uri: URI handed to ``SQLConfig``.
            config: configuration manager used for storage/dataset locations.
        """
        self._db_uri = db_uri
        self._sql_config = SQLConfig(db_uri)
        self._config = config
        # The catalog tables must exist before any service is used.
        self._bootstrap_catalog()
        self._db_catalog_service = DatabaseCatalogService(self._sql_config.session)
        self._job_catalog_service = JobCatalogService(self._sql_config.session)
        self._table_catalog_service = TableCatalogService(self._sql_config.session)
        self._column_service = ColumnCatalogService(self._sql_config.session)
        self._function_service = FunctionCatalogService(self._sql_config.session)
        self._function_cost_catalog_service = FunctionCostCatalogService(
            self._sql_config.session
        )
        self._function_io_service = FunctionIOCatalogService(self._sql_config.session)
        self._function_metadata_service = FunctionMetadataCatalogService(
            self._sql_config.session
        )
        self._index_service = IndexCatalogService(self._sql_config.session)
        self._function_cache_service = FunctionCacheCatalogService(
            self._sql_config.session
        )

    @property
    def sql_config(self):
        """The ``SQLConfig`` instance backing this catalog manager."""
        return self._sql_config

    def reset(self):
        """Reset the catalog state.

        Clears the contents of the catalog tables and any storage data.
        Used by test cases to reset the database state.
        """
        self._clear_catalog_contents()

    def close(self):
        """Close all connections by disposing the engine of the SQL config."""
        if self.sql_config is not None:
            sqlalchemy_engine = self.sql_config.engine
            sqlalchemy_engine.dispose()

    def _bootstrap_catalog(self):
        """Bootstrap the catalog.

        Runs all tasks required for using the catalog. Currently this is a
        single task: initializing the database, i.e. creating the catalog
        database and tables if they do not exist.
        """
        logger.info("Bootstrapping catalog")
        init_db(self._sql_config.engine)

    def _clear_catalog_contents(self):
        """Clear the contents of the catalog.

        Clears the tuples in the catalog tables, the indexes, and cached data.
        """
        logger.info("Clearing catalog")
        # drop tables which are not part of catalog
        drop_all_tables_except_catalog(self._sql_config.engine)
        # truncate the catalog tables
        truncate_catalog_tables(self._sql_config.engine)
        # clean up the dataset, index, and cache directories
        cleanup_storage(self._config)

    # Database catalog services

    def insert_database_catalog_entry(self, name: str, engine: str, params: dict):
        """Persist a new entry in the database catalog.

        Args:
            name: database name
            engine: engine name
            params: required params as a dictionary for the database
        """
        self._db_catalog_service.insert_entry(name, engine, params)

    def get_database_catalog_entry(self, database_name: str) -> DatabaseCatalogEntry:
        """Return the database catalog entry for the given database name.

        Arguments:
            database_name (str): name of the database

        Returns:
            DatabaseCatalogEntry
        """

        table_entry = self._db_catalog_service.get_entry_by_name(database_name)

        return table_entry

    def drop_database_catalog_entry(self, database_entry: DatabaseCatalogEntry) -> bool:
        """Delete the database from the catalog.

        Arguments:
           database_entry: database catalog entry to remove

        Returns:
           True if successfully deleted else False
        """
        # todo: do we need to remove also the associated tables etc or that will be
        # taken care by the underlying db
        return self._db_catalog_service.delete_entry(database_entry)

    def check_native_table_exists(self, table_name: str, database_name: str):
        """Check that the database is registered and contains the table.

        Arguments:
            table_name (str): name of the table to look for
            database_name (str): name of the database catalog entry

        Returns:
            False if the database has no catalog entry or the table is not
            listed by the database handler, True otherwise.

        Raises:
            Exception: if the handler reports an error while listing tables.
        """
        db_catalog_entry = self.get_database_catalog_entry(database_name)

        if db_catalog_entry is None:
            return False

        with get_database_handler(
            db_catalog_entry.engine, **db_catalog_entry.params
        ) as handler:
            # Get table definition.
            resp = handler.get_tables()

            if resp.error is not None:
                raise Exception(resp.error)

            # Check table existence.
            table_df = resp.data
            if table_name not in table_df["table_name"].values:
                return False

        return True

    # Job catalog services

    def insert_job_catalog_entry(
        self,
        name: str,
        queries: str,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        repeat_interval: int,
        active: bool,
        next_schedule_run: datetime.datetime,
    ) -> JobCatalogEntry:
        """Persist a new entry in the job catalog.

        Args:
            name: job name
            queries: job's queries
            start_time: job start time
            end_time: job end time
            repeat_interval: job repeat interval
            active: job status
            next_schedule_run: next run time as per schedule

        Returns:
            The entry returned by the job catalog service.
        """
        job_entry = self._job_catalog_service.insert_entry(
            name,
            queries,
            start_time,
            end_time,
            repeat_interval,
            active,
            next_schedule_run,
        )

        return job_entry

    def get_job_catalog_entry(self, job_name: str) -> JobCatalogEntry:
        """Return the job catalog entry for the given job name.

        Arguments:
            job_name (str): name of the job

        Returns:
            JobCatalogEntry
        """

        table_entry = self._job_catalog_service.get_entry_by_name(job_name)

        return table_entry

    def drop_job_catalog_entry(self, job_entry: JobCatalogEntry) -> bool:
        """Delete the job from the catalog.

        Arguments:
           job_entry: job catalog entry to remove

        Returns:
           True if successfully deleted else False
        """
        return self._job_catalog_service.delete_entry(job_entry)

    def get_next_executable_job(self, only_past_jobs: bool = False) -> JobCatalogEntry:
        """Get the oldest job that is ready to be triggered by trigger time.

        Arguments:
            only_past_jobs: boolean flag to denote if only jobs with trigger time in
                past should be considered

        Returns:
            Returns the first job to be triggered
        """
        return self._job_catalog_service.get_next_executable_job(only_past_jobs)

    def update_job_catalog_entry(
        self, job_name: str, next_scheduled_run: datetime.datetime, active: bool
    ):
        """Update the next_scheduled_run and active columns of a job.

        Arguments:
            job_name (str): job which should be updated

            next_scheduled_run (datetime): the next trigger time for the job

            active (bool): the active status for the job
        """
        self._job_catalog_service.update_next_scheduled_run(
            job_name, next_scheduled_run, active
        )

    # Table catalog services

    def insert_table_catalog_entry(
        self,
        name: str,
        file_url: str,
        column_list: List[ColumnCatalogEntry],
        identifier_column="id",
        table_type=TableType.VIDEO_DATA,
    ) -> TableCatalogEntry:
        """Add a new entry to the table catalog and persist it in the database.

        The schema field is set before the object is returned.

        Args:
            name: table name
            file_url: location string stored with the table entry
            column_list: list of columns
            identifier_column (str):  A unique identifier column for each row
            table_type (TableType): type of the table, video, images etc
        Returns:
            The persisted TableCatalogEntry object with the id field populated.
        """

        # Prepend the system identifier column to the table column list.
        column_list = [
            ColumnCatalogEntry(name=IDENTIFIER_COLUMN, type=ColumnType.INTEGER)
        ] + column_list

        table_entry = self._table_catalog_service.insert_entry(
            name,
            file_url,
            identifier_column=identifier_column,
            table_type=table_type,
            column_list=column_list,
        )

        return table_entry

    def get_table_catalog_entry(
        self, table_name: str, database_name: str = None
    ) -> TableCatalogEntry:
        """Return the table catalog entry for the given table name.

        Arguments:
            table_name (str): name of the table
            database_name (str, optional): forwarded to the table catalog
                service together with the table name. Defaults to None.

        Returns:
            TableCatalogEntry
        """

        table_entry = self._table_catalog_service.get_entry_by_name(
            database_name, table_name
        )

        return table_entry

    def delete_table_catalog_entry(self, table_entry: TableCatalogEntry) -> bool:
        """Delete the table along with its columns.

        The table is removed from the table catalog and its columns from the
        column catalog.

        Arguments:
           table_entry: table catalog entry to remove

        Returns:
           True if successfully deleted else False
        """
        return self._table_catalog_service.delete_entry(table_entry)

    def rename_table_catalog_entry(
        self, curr_table: TableCatalogEntry, new_name: TableInfo
    ):
        """Rename ``curr_table`` to the table name carried by ``new_name``."""
        return self._table_catalog_service.rename_entry(curr_table, new_name.table_name)

    def check_table_exists(self, table_name: str, database_name: str = None):
        """Return whether the table exists.

        When ``database_name`` is given the table is treated as a native table
        and looked up through the database handler; otherwise the table
        catalog is consulted.
        """
        is_native_table = database_name is not None

        if is_native_table:
            return self.check_native_table_exists(table_name, database_name)
        else:
            table_entry = self._table_catalog_service.get_entry_by_name(
                database_name, table_name
            )
            return table_entry is not None

    def get_all_table_catalog_entries(self):
        """Return all entries of the table catalog."""
        return self._table_catalog_service.get_all_entries()

    # Column catalog services

    def get_column_catalog_entry(
        self, table_obj: TableCatalogEntry, col_name: str
    ) -> ColumnCatalogEntry:
        """Return the column entry named ``col_name`` of ``table_obj``.

        Returns:
            The matching ColumnCatalogEntry; a dummy NDARRAY entry when the
            column is not found but is the video audio column; otherwise None.
        """
        col_obj = self._column_service.filter_entry_by_table_id_and_name(
            table_obj.row_id, col_name
        )
        if col_obj:
            return col_obj
        else:
            # return a dummy column catalog entry for audio, even though it is
            # not defined for videos
            if col_name == VideoColumnName.audio:
                return ColumnCatalogEntry(
                    col_name,
                    ColumnType.NDARRAY,
                    table_id=table_obj.row_id,
                    table_name=table_obj.name,
                )
            return None

    def get_column_catalog_entries_by_table(self, table_obj: TableCatalogEntry):
        """Return the column catalog entries that belong to ``table_obj``."""
        col_entries = self._column_service.filter_entries_by_table(table_obj)
        return col_entries

    # Function catalog services

    def insert_function_catalog_entry(
        self,
        name: str,
        impl_file_path: str,
        type: str,
        function_io_list: List[FunctionIOCatalogEntry],
        function_metadata_list: List[FunctionMetadataCatalogEntry],
    ) -> FunctionCatalogEntry:
        """Insert a function catalog entry with its IO and metadata entries.

        It persists the entry to the database.

        Arguments:
            name(str): name of the function
            impl_file_path(str): implementation path of the function
            type(str): what kind of function operator like classification,
                                                        detection etc
            function_io_list(List[FunctionIOCatalogEntry]): input/output function info list
            function_metadata_list(List[FunctionMetadataCatalogEntry]): metadata
                entries of the function

        Returns:
            The persisted FunctionCatalogEntry object.
        """

        checksum = get_file_checksum(impl_file_path)
        function_entry = self._function_service.insert_entry(
            name, impl_file_path, type, checksum
        )
        # Link the IO and metadata entries to the freshly inserted function
        # before persisting them.
        for function_io in function_io_list:
            function_io.function_id = function_entry.row_id
        self._function_io_service.insert_entries(function_io_list)
        for function_metadata in function_metadata_list:
            function_metadata.function_id = function_entry.row_id
        self._function_metadata_service.insert_entries(function_metadata_list)
        return function_entry

    def get_function_catalog_entry_by_name(self, name: str) -> FunctionCatalogEntry:
        """Get the function information based on name.

        Arguments:
             name (str): name of the function

        Returns:
            FunctionCatalogEntry object
        """
        return self._function_service.get_entry_by_name(name)

    def delete_function_catalog_entry_by_name(self, function_name: str) -> bool:
        """Delete the function catalog entry named ``function_name``."""
        return self._function_service.delete_entry_by_name(function_name)

    def get_all_function_catalog_entries(self):
        """Return all entries of the function catalog."""
        return self._function_service.get_all_entries()

    # Function cost catalog services

    def upsert_function_cost_catalog_entry(
        self, function_id: int, name: str, cost: int
    ) -> FunctionCostCatalogEntry:
        """Upsert a function cost catalog entry.

        Arguments:
            function_id(int): unique function id
            name(str): the name of the function
            cost(int): cost of this function

        Returns:
            Whatever the cost catalog service's ``upsert_entry`` returns.
            NOTE(review): expected to be the persisted FunctionCostCatalogEntry;
            confirm against the service implementation.
        """

        # Forward the service result so the declared return value is honoured
        # instead of always returning None.
        return self._function_cost_catalog_service.upsert_entry(
            function_id, name, cost
        )

    def get_function_cost_catalog_entry(self, name: str):
        """Return the function cost catalog entry for the function ``name``."""
        return self._function_cost_catalog_service.get_entry_by_name(name)

    # FunctionIO services

    def get_function_io_catalog_input_entries(
        self, function_obj: FunctionCatalogEntry
    ) -> List[FunctionIOCatalogEntry]:
        """Return the input IO entries of ``function_obj``."""
        return self._function_io_service.get_input_entries_by_function_id(
            function_obj.row_id
        )

    def get_function_io_catalog_output_entries(
        self, function_obj: FunctionCatalogEntry
    ) -> List[FunctionIOCatalogEntry]:
        """Return the output IO entries of ``function_obj``."""
        return self._function_io_service.get_output_entries_by_function_id(
            function_obj.row_id
        )

    # Index related services

    def insert_index_catalog_entry(
        self,
        name: str,
        save_file_path: str,
        vector_store_type: VectorStoreType,
        feat_column: ColumnCatalogEntry,
        function_signature: str,
        index_def: str,
    ) -> IndexCatalogEntry:
        """Insert an index catalog entry and return what the service returns."""
        index_catalog_entry = self._index_service.insert_entry(
            name,
            save_file_path,
            vector_store_type,
            feat_column,
            function_signature,
            index_def,
        )
        return index_catalog_entry

    def get_index_catalog_entry_by_name(self, name: str) -> IndexCatalogEntry:
        """Return the index catalog entry named ``name``."""
        return self._index_service.get_entry_by_name(name)

    def get_index_catalog_entry_by_column_and_function_signature(
        self, column: ColumnCatalogEntry, function_signature: str
    ):
        """Return the index entry matching ``column`` and ``function_signature``."""
        return self._index_service.get_entry_by_column_and_function_signature(
            column, function_signature
        )

    def drop_index_catalog_entry(self, index_name: str) -> bool:
        """Delete the index catalog entry named ``index_name``."""
        return self._index_service.delete_entry_by_name(index_name)

    def get_all_index_catalog_entries(self):
        """Return all entries of the index catalog."""
        return self._index_service.get_all_entries()

    # Function cache related services

    def insert_function_cache_catalog_entry(self, func_expr: FunctionExpression):
        """Build a cache entry for ``func_expr`` and insert it into the catalog.

        The cache directory is read from the ``storage.cache_dir`` config value.
        """
        cache_dir = self._config.get_value("storage", "cache_dir")
        entry = construct_function_cache_catalog_entry(func_expr, cache_dir=cache_dir)
        return self._function_cache_service.insert_entry(entry)

    def get_function_cache_catalog_entry_by_name(
        self, name: str
    ) -> FunctionCacheCatalogEntry:
        """Return the function cache catalog entry named ``name``."""
        return self._function_cache_service.get_entry_by_name(name)

    def drop_function_cache_catalog_entry(
        self, entry: FunctionCacheCatalogEntry
    ) -> bool:
        """Remove the cache directory of ``entry`` and delete its catalog entry."""
        # remove the data structure associated with the entry
        if entry:
            shutil.rmtree(entry.cache_path)
        return self._function_cache_service.delete_entry(entry)

    # Function metadata catalog

    def get_function_metadata_entries_by_function_name(
        self, function_name: str
    ) -> List[FunctionMetadataCatalogEntry]:
        """Get the function metadata information for the provided function.

        Arguments:
             function_name (str): name of the function

        Returns:
            FunctionMetadataCatalogEntry objects; an empty list when no
            function with that name is found.
        """
        function_entry = self.get_function_catalog_entry_by_name(function_name)
        if function_entry:
            entries = self._function_metadata_service.get_entries_by_function_id(
                function_entry.row_id
            )
            return entries
        else:
            return []

    # Utils

    def create_and_insert_table_catalog_entry(
        self,
        table_info: TableInfo,
        columns: List[ColumnDefinition],
        identifier_column: str = None,
        table_type: TableType = TableType.STRUCTURED_DATA,
    ) -> TableCatalogEntry:
        """Create a valid table catalog tuple and insert into the table

        Args:
            table_info (TableInfo): table info object
            columns (List[ColumnDefinition]): columns definitions of the table
            identifier_column (str, optional): Specify unique columns. Defaults to None.
            table_type (TableType, optional): table type. Defaults to TableType.STRUCTURED_DATA.

        Returns:
            TableCatalogEntry: entry that has been inserted into the table catalog
        """
        table_name = table_info.table_name
        column_catalog_entries = xform_column_definitions_to_catalog_entries(columns)

        # The table's file_url is a path generated under the configured
        # datasets directory.
        dataset_location = self._config.get_value("core", "datasets_dir")
        file_url = str(generate_file_path(dataset_location, table_name))
        table_catalog_entry = self.insert_table_catalog_entry(
            table_name,
            file_url,
            column_catalog_entries,
            identifier_column=identifier_column,
            table_type=table_type,
        )
        return table_catalog_entry

    def create_and_insert_multimedia_table_catalog_entry(
        self, name: str, format_type: FileFormatType
    ) -> TableCatalogEntry:
        """Create a table catalog entry for the multimedia table.
        Depending on the type of multimedia, the appropriate "create catalog entry" command is called.

        Args:
            name (str):  name of the table catalog entry
            format_type (FileFormatType): media type

        Raises:
            AssertionError: if format_type is not supported

        Returns:
            TableCatalogEntry: newly inserted table catalog entry
        """
        if format_type is FileFormatType.VIDEO:
            columns = get_video_table_column_definitions()
            table_type = TableType.VIDEO_DATA
        elif format_type is FileFormatType.IMAGE:
            columns = get_image_table_column_definitions()
            table_type = TableType.IMAGE_DATA
        elif format_type is FileFormatType.DOCUMENT:
            columns = get_document_table_column_definitions()
            table_type = TableType.DOCUMENT_DATA
        elif format_type is FileFormatType.PDF:
            columns = get_pdf_table_column_definitions()
            table_type = TableType.PDF_DATA
        else:
            # Raised explicitly (instead of via an `assert` statement) so the
            # check still runs under `python -O`; exception type and message
            # are the same as before.
            raise AssertionError(f"Format Type {format_type} is not supported")

        return self.create_and_insert_table_catalog_entry(
            TableInfo(name), columns, table_type=table_type
        )

    def get_multimedia_metadata_table_catalog_entry(
        self, input_table: TableCatalogEntry
    ) -> TableCatalogEntry:
        """Get table catalog entry for multimedia metadata table.

        Args:
            input_table (TableCatalogEntry): input media table

        Raises:
            AssertionError: if the metadata table does not exist

        Returns:
            TableCatalogEntry: metainfo table entry which is maintained by the system
        """
        # use file_url as the metadata table name
        media_metadata_name = Path(input_table.file_url).stem
        obj = self.get_table_catalog_entry(media_metadata_name)
        if obj is None:
            # Explicit raise keeps the check active under `python -O`.
            raise AssertionError(
                f"Table with name {media_metadata_name} does not exist in catalog"
            )

        return obj

    def create_and_insert_multimedia_metadata_table_catalog_entry(
        self, input_table: TableCatalogEntry
    ) -> TableCatalogEntry:
        """Create and insert table catalog entry for multimedia metadata table.
         This table is used to store all media filenames and related information. In
         order to prevent direct access or modification by users, it should be
         designated as a SYSTEM_STRUCTURED_DATA type.
         **Note**: this table is managed by the storage engine, so it should not be
         called elsewhere.
        Args:
            input_table (TableCatalogEntry): input video table

        Raises:
            AssertionError: if the metadata table already exists

        Returns:
            TableCatalogEntry: metainfo table entry which is maintained by the system
        """
        # use file_url as the metadata table name
        media_metadata_name = Path(input_table.file_url).stem
        obj = self.get_table_catalog_entry(media_metadata_name)
        if obj is not None:
            # The message is an f-string so the table name is actually
            # interpolated; explicit raise keeps the check under `python -O`.
            raise AssertionError(
                f"Table with name {media_metadata_name} already exists"
            )

        columns = [ColumnDefinition("file_url", ColumnType.TEXT, None, None)]
        obj = self.create_and_insert_table_catalog_entry(
            TableInfo(media_metadata_name),
            columns,
            identifier_column=columns[0].name,
            table_type=TableType.SYSTEM_STRUCTURED_DATA,
        )
        return obj
700

701

702
#### get catalog instance
703
# This function plays a crucial role in ensuring that different threads do
704
# not share the same catalog object, as it can result in serialization issues and
705
# incorrect behavior with SQLAlchemy. Therefore, whenever a catalog instance is
706
# required, we create a new one. One possible optimization is to share the catalog
707
# instance across all objects within the same thread. It is worth investigating whether
708
# SQLAlchemy already handles this optimization for us, which will be explored at a
709
# later time.
710
def get_catalog_instance(db_uri: str, config: ConfigurationManager):
    """Return a newly constructed CatalogManager for ``db_uri`` and ``config``.

    Every call builds a fresh instance; nothing is cached or shared.
    """
    catalog = CatalogManager(db_uri, config)
    return catalog
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc