• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / a75d4cdb-5478-400d-88b6-c219d38db53a

27 Oct 2023 06:59AM UTC coverage: 72.758% (+72.8%) from 0.0%
a75d4cdb-5478-400d-88b6-c219d38db53a

Pull #1308

circle-ci

dungnmaster
linting changes
Pull Request #1308: Job scheduler changes

289 of 289 new or added lines in 17 files covered. (100.0%)

9332 of 12826 relevant lines covered (72.76%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.24
/evadb/utils/job_scheduler.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16
import datetime
1✔
17
import sys
1✔
18
import time
1✔
19

20
from evadb.catalog.models.utils import JobCatalogEntry
1✔
21
from evadb.database import EvaDBDatabase
1✔
22
from evadb.server.command_handler import execute_query
1✔
23
from evadb.utils.generic_utils import parse_config_yml
1✔
24
from evadb.utils.logging_manager import logger
1✔
25

26

27
class JobScheduler:
    """Poll the job catalog and run scheduled jobs whose trigger time has passed.

    One pass of the scheduler loop does the following:

    * read jobs with next trigger in the past
    * execute the job handed back by the catalog (intended to be the one with
      the oldest trigger date; the ordering is decided by the catalog, not here)
    * update the next trigger date
      (TODO: update job history in one transaction)
    * sleep till next wakeup time
    """

    # Seconds between catalog polls when the configuration supplies no "jobs" section.
    _DEFAULT_POLL_INTERVAL = 30

    def __init__(self, evadb: "EvaDBDatabase") -> None:
        """Load the scheduler configuration and remember the database handle.

        Args:
            evadb: database handle used for catalog access and query execution.
        """
        config_object = parse_config_yml()
        # Fall back to the defaults both when there is no config at all and
        # when the config exists but has no "jobs" section; the latter used
        # to raise KeyError.
        if config_object is not None and "jobs" in config_object:
            self.jobs_config = config_object["jobs"]
        else:
            self.jobs_config = {"poll_interval": self._DEFAULT_POLL_INTERVAL}
        self._evadb = evadb

    def _update_next_schedule_run(self, job_catalog_entry: "JobCatalogEntry") -> bool:
        """Persist the next trigger time and active flag of an executed job.

        A job stays active only when it has a positive repeat interval and its
        next trigger time falls before its end time.
        NOTE(review): a missing (None) ``repeat_interval`` is treated as
        "do not repeat" and a missing ``end_time`` as "never expires" --
        confirm that the catalog can actually store these as None.

        Args:
            job_catalog_entry: the job that has just been executed.

        Returns:
            True if the job remains active (it was rescheduled), else False.
        """
        repeat_interval = job_catalog_entry.repeat_interval
        job_end_time = job_catalog_entry.end_time

        # Default: keep the stored trigger time and deactivate the job.
        next_trigger_time = job_catalog_entry.next_scheduled_run
        active_status = False

        if repeat_interval is not None and repeat_interval > 0:
            candidate_time = datetime.datetime.now() + datetime.timedelta(
                seconds=repeat_interval
            )
            if job_end_time is None or candidate_time < job_end_time:
                next_trigger_time = candidate_time
                active_status = True

        self._evadb.catalog().update_job_catalog_entry(
            job_catalog_entry.name,
            next_trigger_time,
            active_status,
        )
        return active_status

    def _run_due_jobs(self) -> None:
        """Execute every job whose trigger time is in the past, one at a time."""

        def fetch_due_job():
            return self._evadb.catalog().get_next_executable_job(only_past_jobs=True)

        # iter(callable, None) keeps fetching until the catalog reports that
        # no due job is left.
        for next_executable_job in iter(fetch_due_job, None):
            execution_results = [
                execute_query(self._evadb, query)
                for query in next_executable_job.queries
            ]
            logger.debug(
                f"Exection result for job: {next_executable_job.name} results: {execution_results}"
            )
            self._update_next_schedule_run(next_executable_job)

    def _seconds_until_next_wakeup(self):
        """Return how many seconds the scheduler should sleep before polling again.

        The result is the configured poll interval, shortened when the next
        job is due sooner, and never negative.
        """
        poll_interval = self.jobs_config["poll_interval"]
        upcoming_job = self._evadb.catalog().get_next_executable_job(
            only_past_jobs=False
        )
        if upcoming_job is None:
            # Nothing is scheduled: just wait for the next regular poll
            # instead of failing on the missing entry.
            return poll_interval

        seconds_until_due = (
            upcoming_job.next_scheduled_run - datetime.datetime.now()
        ).total_seconds()
        # Clamp at zero: time.sleep() rejects negative values, and a job that
        # is already due must be picked up without waiting.
        return max(0, min(poll_interval, seconds_until_due))

    def _scan_and_execute_jobs(self):
        """Run the scheduler loop forever: execute due jobs, then sleep.

        Any exception raised during a pass is logged and the loop resumes
        after a short back-off (20% of the poll interval), so a single failing
        job does not stop the scheduler.
        """
        while True:
            try:
                self._run_due_jobs()

                sleep_time = self._seconds_until_next_wakeup()
                if sleep_time > 0:
                    logger.debug(
                        f"Job scheduler process sleeping for {sleep_time} seconds"
                    )
                    time.sleep(sleep_time)
            except Exception as e:
                logger.error(f"Got an exception in job scheduler: {str(e)}")
                time.sleep(self.jobs_config["poll_interval"] * 0.2)

    def execute(self):
        """Run the scheduler until the process is interrupted.

        A KeyboardInterrupt is logged and turned into a normal interpreter
        exit via ``sys.exit()``.
        """
        try:
            self._scan_and_execute_jobs()
        except KeyboardInterrupt:
            logger.debug("Exiting the job scheduler process due to interrupt")
            sys.exit()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc