• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / a75d4cdb-5478-400d-88b6-c219d38db53a

27 Oct 2023 06:59AM UTC coverage: 72.758% (+72.8%) from 0.0%
a75d4cdb-5478-400d-88b6-c219d38db53a

Pull #1308

circle-ci

dungnmaster
linting changes
Pull Request #1308: Job scheduler changes

289 of 289 new or added lines in 17 files covered. (100.0%)

9332 of 12826 relevant lines covered (72.76%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.19
/evadb/catalog/services/job_catalog_service.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16
import datetime
1✔
17
import json
1✔
18

19
from sqlalchemy import and_, true
1✔
20
from sqlalchemy.orm import Session
1✔
21
from sqlalchemy.sql.expression import select
1✔
22

23
from evadb.catalog.models.job_catalog import JobCatalog
1✔
24
from evadb.catalog.models.utils import JobCatalogEntry
1✔
25
from evadb.catalog.services.base_service import BaseService
1✔
26
from evadb.utils.errors import CatalogError
1✔
27
from evadb.utils.logging_manager import logger
1✔
28

29

30
class JobCatalogService(BaseService):
    """Service for creating, querying, updating and deleting job catalog rows.

    All queries go through ``self.session`` against ``self.model``; the model
    class passed to the base service is ``JobCatalog``.
    """

    def __init__(self, db_session: Session):
        super().__init__(JobCatalog, db_session)

    def insert_entry(
        self,
        name: str,
        queries: list,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        repeat_interval: int,
        active: bool,
        next_schedule_run: datetime.datetime,
    ) -> JobCatalogEntry:
        """Insert a new job into the catalog.

        Arguments:
            name (str): job name
            queries (list): queries of the job; stored JSON-encoded
            start_time (datetime.datetime): start time of the job
            end_time (datetime.datetime): end time of the job
            repeat_interval (int): repeat interval of the job
            active (bool): active status of the job
            next_schedule_run (datetime.datetime): next trigger time of the job
        Returns:
            JobCatalogEntry - dataclass view of the saved row
        Raises:
            CatalogError: if building or saving the row fails
        """
        try:
            job_catalog_obj = self.model(
                name=name,
                # The queries list is persisted as a JSON string.
                queries=json.dumps(queries),
                start_time=start_time,
                end_time=end_time,
                repeat_interval=repeat_interval,
                active=active,
                next_schedule_run=next_schedule_run,
            )
            job_catalog_obj = job_catalog_obj.save(self.session)

        except Exception as e:
            logger.exception(
                f"Failed to insert entry into job catalog with exception {str(e)}"
            )
            raise CatalogError(e) from e

        return job_catalog_obj.as_dataclass()

    def get_entry_by_name(self, job_name: str) -> JobCatalogEntry:
        """
        Get the job catalog entry with given job name.
        Arguments:
            job_name  (str): Job name
        Returns:
            JobCatalogEntry - catalog entry for given job name, or None when
            no job with that name exists
        """
        entry = self.session.execute(
            select(self.model).filter(self.model._name == job_name)
        ).scalar_one_or_none()
        if entry is None:
            return None
        return entry.as_dataclass()

    def delete_entry(self, job_entry: JobCatalogEntry):
        """Delete Job from the catalog
        Arguments:
            job_entry  (JobCatalogEntry): job to delete; looked up by row_id
        Returns:
            True if successfully removed
        Raises:
            CatalogError: if the job is not found or the delete fails
        """
        try:
            job_catalog_obj = self.session.execute(
                select(self.model).filter(self.model._row_id == job_entry.row_id)
            ).scalar_one_or_none()
            if job_catalog_obj is None:
                # Report a missing row explicitly instead of failing with an
                # AttributeError on None; it is still surfaced as CatalogError.
                raise ValueError("no matching job entry found in the catalog")
            job_catalog_obj.delete(self.session)
            return True
        except Exception as e:
            err_msg = f"Delete Job failed for {job_entry} with error {str(e)}."
            logger.exception(err_msg)
            raise CatalogError(err_msg) from e

    def get_all_overdue_jobs(self) -> list:
        """Get the list of jobs that are overdue to be triggered
        Arguments:
            None
        Returns:
            Returns the list of all active overdue jobs
        """
        # NOTE(review): compares against naive local time; confirm this matches
        # how next_scheduled_run values are stored.
        overdue_and_active = and_(
            self.model._next_scheduled_run <= datetime.datetime.now(),
            self.model._active == true(),
        )
        # scalars() unwraps each result Row into the ORM object; without it
        # the rows are tuples and have no as_dataclass().
        jobs = (
            self.session.execute(select(self.model).filter(overdue_and_active))
            .scalars()
            .all()
        )
        return [job.as_dataclass() for job in jobs]

    def get_next_executable_job(self, only_past_jobs: bool) -> JobCatalogEntry:
        """Get the oldest job that is ready to be triggered by trigger time
        Arguments:
            only_past_jobs (bool): boolean flag to denote if only jobs with trigger time in
                past should be considered
        Returns:
            Returns the first job to be triggered, or None when no job matches
        """
        is_active = self.model._active == true()
        if only_past_jobs:
            condition = and_(
                self.model._next_scheduled_run <= datetime.datetime.now(),
                is_active,
            )
        else:
            condition = is_active

        # Earliest next_scheduled_run first; only the top row is needed.
        entry = self.session.execute(
            select(self.model)
            .filter(condition)
            .order_by(self.model._next_scheduled_run.asc())
            .limit(1)
        ).scalar_one_or_none()
        if entry is None:
            return None
        return entry.as_dataclass()

    def update_next_scheduled_run(
        self, job_name: str, next_scheduled_run: datetime.datetime, active: bool
    ):
        """Update the next_scheduled_run and active column as per the provided values
        Arguments:
            job_name (str): job which should be updated

            next_scheduled_run (datetime.datetime): the next trigger time for the job

            active (bool): the active status for the job
        Returns:
            void; nothing is changed when no job has the given name
        """
        job = (
            self.session.query(self.model).filter(self.model._name == job_name).first()
        )
        if job:
            job._next_scheduled_run = next_scheduled_run
            job._active = active
            self.session.commit()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc