• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

georgia-tech-db / eva / a75d4cdb-5478-400d-88b6-c219d38db53a

27 Oct 2023 06:59AM UTC coverage: 72.758% (+72.8%) from 0.0%
a75d4cdb-5478-400d-88b6-c219d38db53a

Pull #1308

circle-ci

dungnmaster
linting changes
Pull Request #1308: Job scheduler changes

289 of 289 new or added lines in 17 files covered. (100.0%)

9332 of 12826 relevant lines covered (72.76%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.88
/evadb/executor/create_job_executor.py
1
# coding=utf-8
2
# Copyright 2018-2023 EvaDB
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
import re
1✔
16
from datetime import datetime
1✔
17

18
import pandas as pd
1✔
19

20
from evadb.database import EvaDBDatabase
1✔
21
from evadb.executor.abstract_executor import AbstractExecutor
1✔
22
from evadb.executor.executor_utils import ExecutorError
1✔
23
from evadb.models.storage.batch import Batch
1✔
24
from evadb.parser.create_statement import CreateJobStatement
1✔
25
from evadb.utils.logging_manager import logger
1✔
26

27

28
class CreateJobExecutor(AbstractExecutor):
    """Executor for a ``CreateJobStatement``.

    Validates the schedule described by the statement (start/end time and
    repeat interval) and registers the job in the catalog.
    """

    # Accepted textual formats for job start/end times.
    _DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
    _DATE_FORMAT = "%Y-%m-%d"

    # Length of every supported repeat period, in seconds.
    # A "month" is approximated as 30 days (30 * 86400 = 2592000).
    _UNIT_TO_SECONDS = {
        "second": 1,
        "minute": 60,
        "minutes": 60,
        "min": 60,
        "hour": 3600,
        "hours": 3600,
        "day": 86400,
        "days": 86400,
        "week": 604800,
        "weeks": 604800,
        "month": 2592000,
        "months": 2592000,
    }

    def __init__(self, db: EvaDBDatabase, node: CreateJobStatement):
        super().__init__(db, node)

    def _parse_datetime_str(self, datetime_str: str) -> datetime:
        """Parse a ``YYYY-MM-DD HH:MM:SS`` or ``YYYY-MM-DD`` string.

        Args:
            datetime_str: textual date or datetime taken from the statement.

        Returns:
            The parsed ``datetime``; a date-only string yields midnight of
            that day (``strptime`` default for missing time fields).

        Raises:
            ExecutorError: the string starts like a date/datetime but
                ``strptime`` rejects it (e.g. month 13, trailing characters).
            ValueError: the string does not start with either pattern.
        """
        # The datetime pattern must be tested first: the date pattern is a
        # prefix of it and ``re.match`` only anchors at the start.
        if re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", datetime_str):
            try:
                return datetime.strptime(datetime_str, self._DATETIME_FORMAT)
            except ValueError as err:
                raise ExecutorError(
                    f"{datetime_str} is not in the correct datetime format. expected format: {self._DATETIME_FORMAT}."
                ) from err
        elif re.match(r"\d{4}-\d{2}-\d{2}", datetime_str):
            try:
                return datetime.strptime(datetime_str, self._DATE_FORMAT)
            except ValueError as err:
                raise ExecutorError(
                    f"{datetime_str} is not in the correct date format. expected format: {self._DATE_FORMAT}."
                ) from err
        else:
            raise ValueError(
                f"{datetime_str} does not match the expected date or datetime format"
            )

    def _get_repeat_time_interval_seconds(
        self, repeat_interval: int, repeat_period: str
    ) -> int:
        """Convert a repeat interval and period into a number of seconds.

        Args:
            repeat_interval: how many periods between runs; ``None`` is
                treated as 1.
            repeat_period: unit name (one of the ``_UNIT_TO_SECONDS`` keys),
                or ``None`` when the job does not repeat.

        Returns:
            ``repeat_interval`` multiplied by the period length in seconds,
            or 0 when ``repeat_period`` is ``None``.

        Raises:
            AssertionError: ``repeat_period`` is not a supported unit.
        """
        unit_to_seconds = self._UNIT_TO_SECONDS

        # Raised explicitly (instead of using an ``assert`` statement) so the
        # validation is not stripped when Python runs with -O; the exception
        # type is kept for backward compatibility.
        if repeat_period is not None and repeat_period not in unit_to_seconds:
            raise AssertionError(
                "repeat period should be one of these values: "
                + " | ".join(unit_to_seconds)
            )

        repeat_interval = 1 if repeat_interval is None else repeat_interval
        return repeat_interval * unit_to_seconds.get(repeat_period, 0)

    def exec(self, *args, **kwargs):
        """Create the job described by ``self.node`` in the catalog.

        Yields:
            A single ``Batch`` holding a one-cell status message: either that
            the job was created, or (with ``if_not_exists``) that a job of
            that name already exists and nothing was added.

        Raises:
            ExecutorError: a job with the same name already exists and
                ``if_not_exists`` is not set, or a start/end time is malformed.
        """
        # Check if the job already exists.
        job_catalog_entry = self.catalog().get_job_catalog_entry(self.node.job_name)

        if job_catalog_entry is not None:
            if self.node.if_not_exists:
                msg = f"A job with name {self.node.job_name} already exists, nothing added."
                yield Batch(pd.DataFrame([msg]))
                return
            else:
                raise ExecutorError(
                    f"A job with name {self.node.job_name} already exists."
                )

        logger.debug(f"Creating job {self.node}")

        job_name = self.node.job_name
        queries = [str(q) for q in self.node.queries]
        # ``datetime`` here is the class imported via
        # ``from datetime import datetime``, so ``now`` is called on it
        # directly; a missing start time means "start immediately".
        start_time = (
            self._parse_datetime_str(self.node.start_time)
            if self.node.start_time is not None
            else datetime.now()
        )
        end_time = (
            self._parse_datetime_str(self.node.end_time)
            if self.node.end_time is not None
            else None
        )
        repeat_interval = self._get_repeat_time_interval_seconds(
            self.node.repeat_interval, self.node.repeat_period
        )
        active = True
        # The first scheduled run coincides with the start time.
        next_schedule_run = start_time

        self.catalog().insert_job_catalog_entry(
            job_name,
            queries,
            start_time,
            end_time,
            repeat_interval,
            active,
            next_schedule_run,
        )

        yield Batch(
            pd.DataFrame(
                [f"The job {self.node.job_name} has been successfully created."]
            )
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc