• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

h3llrais3r / Auto-Subliminal / 9993891536

18 Jul 2024 03:00PM UTC coverage: 45.146% (-0.02%) from 45.17%
9993891536

Pull #1751

github

web-flow
Merge 43ef12d45 into 14056858b
Pull Request #1751: Update dependency primeng to v17.18.5

3725 of 8251 relevant lines covered (45.15%)

3.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.43
/autosubliminal/core/scheduler.py
1
# coding=utf-8
2

3
import datetime
8✔
4
import logging
8✔
5
import os
8✔
6
import threading
8✔
7
import time
8✔
8
import traceback
8✔
9
from abc import ABC, abstractmethod
8✔
10
from typing import Any, Callable, Dict
8✔
11

12
import autosubliminal
8✔
13
from autosubliminal.core.queue import get_wanted_queue_lock, release_wanted_queue_lock
8✔
14
from autosubliminal.util.common import camelize, to_dict
8✔
15
from autosubliminal.util.websocket import SCHEDULER_FINISH, SCHEDULER_START, send_websocket_event
8✔
16

17
# Module-level logger, named after this module
log = logging.getLogger(__name__)
8✔
18

19

20
class Scheduler(object):
    """
    Scheduler class.

    Runs a ScheduledProcess on a dedicated thread. The thread wakes up every second and executes the process
    when the scheduler is active and either a run was forced or the interval since the last run has elapsed.
    Every scheduler registers itself in ``autosubliminal.SCHEDULERS`` under a unique name.

    :param name: name of the thread to schedule
    :type name: str
    :param process: process to schedule
    :type process: ScheduledProcess
    :param interval: interval in hours between scheduled runs
    :type interval: int
    :param active: indication if the scheduler is active or not
    :type active: bool
    """

    def __init__(self, name: str, process: 'ScheduledProcess', interval: int, active: bool = True) -> None:
        self.name = name
        self.process = process
        self.interval = datetime.timedelta(hours=interval).total_seconds()  # Convert to seconds
        self.active = active
        self.last_run: float = 0  # Timestamp (seconds) of the last completed run, 0 when it never ran
        self._initial_run: float = 0  # Only used to allow delay of initial run
        self._delay: int = 0  # Seconds to sleep before the next run is attempted
        self._force_run: bool = False
        self._force_stop: bool = False

        # Register scheduler (this may rename the scheduler to keep its name unique)
        self._register_scheduler()

        # Create the worker thread (it is only started by start())
        self._thread = threading.Thread(name=self.name, target=self._schedule_process)

    def _register_scheduler(self) -> None:
        """Add the scheduler to the dict of schedulers under a unique name.

        When the name is already taken, a numeric ``-<n>`` suffix is added (or an existing numeric suffix is
        incremented) until the name is free. The resulting name is stored back in ``self.name``.
        """
        scheduler_name = self.name
        while scheduler_name in autosubliminal.SCHEDULERS:
            # Add suffix in case of multiple schedulers with same name (but this shouldn't occur)
            suffix = 1
            suffix_index = scheduler_name.rfind('-')
            if suffix_index > 0:
                scheduler_name_suffix = scheduler_name[suffix_index + 1 :]
                try:
                    # Existing numeric suffix: increment it
                    suffix = int(scheduler_name_suffix)
                    suffix += 1
                    scheduler_name = scheduler_name[:suffix_index] + '-' + str(suffix)
                except ValueError:
                    # The part after the last '-' is not a number: append a fresh suffix
                    scheduler_name = scheduler_name + '-' + str(suffix)
            else:
                scheduler_name = scheduler_name + '-' + str(suffix)
        self.name = scheduler_name
        autosubliminal.SCHEDULERS[scheduler_name] = self

    def _deregister_scheduler(self) -> None:
        """Remove the scheduler from the dict of schedulers (no-op when it is no longer registered)."""
        autosubliminal.SCHEDULERS.pop(self.name, None)

    def start(self, now: bool = True, wait: bool = False) -> None:
        """Start the scheduler.

        :param now: run the process immediately; when False the first run is postponed by one interval
        :type now: bool
        :param wait: block the caller until the first run has completed (only when active and now are set)
        :type wait: bool
        """
        log.info('Starting %s thread', self.name)

        # If the process should not run now, set initial_run to now to postpone till next run
        if not now:
            self._initial_run = time.time()

        # Start thread
        self._thread.start()

        # Wait and block caller thread until process is executed the first time when needed
        if self.active and now and wait:
            log.debug('Waiting for initial run of %s thread to be completed', self.name)
            # Also give up when the scheduler is stopped: last_run would never be set in that case
            while not self.last_run and not self._force_stop:
                time.sleep(1)

    def _schedule_process(self) -> None:
        """Thread target: poll once per second and run the process when a run is needed."""
        while True:
            # Check for stop
            if self._force_stop:
                break

            # Check if we need to run the process (only check if the scheduler is marked as active)
            run_needed = False
            current_time = time.time()
            if self.active:
                if self._force_run:
                    run_needed = True
                elif current_time - (self.last_run or self._initial_run) > self.interval:
                    run_needed = True
            elif self.last_run:
                # Clear last_run if not active
                self.last_run = 0

            # Run if needed
            if run_needed:
                if self._delay:
                    log.debug('Delaying %s thread process with %s seconds', self.name, self._delay)
                    time.sleep(self._delay)
                self._run_process(current_time)

            time.sleep(1)

    def _run_process(self, current_time: float) -> None:
        """Run the process once and update the scheduler state.

        Any error raised while running terminates the whole application with exit code 1.

        :param current_time: timestamp that is stored as last_run when the run succeeds
        :type current_time: float
        """
        # Check if the run needs a lock
        run_lock = self.process.force_run_lock if self._force_run else self.process.run_lock

        # Delay process if lock cannot be acquired
        if run_lock and not get_wanted_queue_lock():
            # Increase delay with 1 second each time the process cannot yet run
            self._delay += 1
            return

        try:
            # Mark as running
            self.process.running = True
            send_websocket_event(SCHEDULER_START, data=self.to_dict(camelize))

            log.debug('Running %s thread process', self.name)
            self.process.run(self._force_run)

            # Update process properties after process run
            self.last_run = current_time
            self._delay = 0
            if self._force_run:
                self._force_run = False

            # Mark as finished
            self.process.running = False
            send_websocket_event(SCHEDULER_FINISH, data=self.to_dict(camelize))

        except BaseException:  # Deliberately catches everything: a failing process stops the application
            # os._exit() does not flush stdio buffers, so flush explicitly or the traceback can get lost
            print(traceback.format_exc(), flush=True)
            self.process.running = False
            os._exit(1)

        finally:
            # Release lock if needed
            if run_lock:
                release_wanted_queue_lock()

    def stop(self) -> None:
        """Stop the scheduler and remove it from the dict of schedulers."""
        log.info('Stopping %s thread', self.name)

        self._force_stop = True
        # Joining a thread that was never started raises a RuntimeError, so only join a live thread
        if self._thread.is_alive():
            self._thread.join(10)
        self._deregister_scheduler()

    def activate(self) -> None:
        """Activate the scheduler."""
        log.info('Activating %s scheduler', self.name)
        self.active = True

    def deactivate(self) -> None:
        """Deactivate the scheduler."""
        log.info('Deactivating %s scheduler', self.name)
        self.active = False

    def run(self, delay: int = 0) -> None:
        """Force run the scheduler.

        :param delay: number of seconds to wait before the forced run is executed
        :type delay: int
        """
        log.info('Running %s thread', self.name)

        self._force_run = True
        self._delay = delay

    def to_dict(self, key_fn: Callable, *args: Any, **kwargs: Any) -> Dict[str, Any]:
        """Convert the object to its dict representation.

        :param key_fn: the function that is executed on the keys when creating the dict
        :type key_fn: function
        :param args: optional list of attributes not to include in the conversion
        :type args: tuple
        :param kwargs: optional dict with custom attributes to include in the conversion
        :type kwargs: dict
        :return: the json dict
        :rtype: dict
        """
        exclude_args = ['process', 'last_run', 'next_run']
        if args:
            exclude_args.extend(list(args))

        # Define kwargs to include
        last_run_in_ms = self.last_run * 1000  # convert to ms for javascript date compatibility
        next_run_in_ms = self.next_run * 1000  # convert to ms for javascript date compatibility
        include_kwargs = {'last_run': last_run_in_ms, 'next_run': next_run_in_ms}
        if kwargs:
            include_kwargs.update(kwargs)

        return to_dict(self, key_fn, *exclude_args, **include_kwargs)

    @property
    def alive(self) -> bool:
        """Indication if the scheduler thread is alive."""
        return self._thread.is_alive()

    @property
    def next_run(self) -> float:
        """Timestamp (seconds) of the next scheduled run, or 0 when the process has not run yet."""
        if self.last_run:
            return self.last_run + self.interval
        else:
            return 0

    @property
    def running(self) -> bool:
        """Indication if the scheduled process is currently running."""
        return self.process.running
221

222

223
class ScheduledProcess(ABC):
    """
    Base class for all scheduled processes.

    :param run_lock: indication if a lock is needed for a normal (scheduled) run
    :type run_lock: bool
    :param force_run_lock: indication if a lock is needed for a forced run
    :type force_run_lock: bool
    """

    def __init__(self, run_lock: bool = True, force_run_lock: bool = True) -> None:
        self.running = False  # A new process starts out as not running
        self.run_lock = run_lock
        self.force_run_lock = force_run_lock

    @abstractmethod
    def run(self, force_run: bool) -> None:
        """Execute the process (to be implemented by every subclass).

        :param force_run: indication if the run was forced
        :type force_run: bool
        """
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc