• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.65
/localstack-core/localstack/utils/scheduler.py
1
import queue
1✔
2
import threading
1✔
3
import time
1✔
4
from collections.abc import Mapping
1✔
5
from concurrent.futures import Executor
1✔
6
from typing import Any, Callable, Optional, Union
1✔
7

8

9
class ScheduledTask:
1✔
10
    """
11
    Internal representation of a task (a callable) and its scheduling parameters.
12
    """
13

14
    def __init__(
1✔
15
        self,
16
        task: Callable,
17
        period: Optional[float] = None,
18
        fixed_rate: bool = True,
19
        start: Optional[float] = None,
20
        on_error: Callable[[Exception], None] = None,
21
        args: Optional[Union[tuple, list]] = None,
22
        kwargs: Optional[Mapping[str, Any]] = None,
23
    ) -> None:
24
        super().__init__()
1✔
25
        self.task = task
1✔
26
        self.fixed_rate = fixed_rate
1✔
27
        self.period = period
1✔
28
        self.start = start
1✔
29
        self.on_error = on_error
1✔
30
        self.args = args or tuple()
1✔
31
        self.kwargs = kwargs or dict()
1✔
32

33
        self.deadline = None
1✔
34
        self.error = None
1✔
35
        self._cancelled = False
1✔
36

37
    @property
1✔
38
    def is_periodic(self) -> bool:
1✔
39
        return self.period is not None
1✔
40

41
    @property
1✔
42
    def is_cancelled(self) -> bool:
1✔
43
        return self._cancelled
1✔
44

45
    def set_next_deadline(self):
1✔
46
        """
47
        Internal method to update the next deadline of this task based on the period and the current time.
48
        """
49
        if not self.deadline:
1✔
UNCOV
50
            raise ValueError("Deadline was not initialized")
×
51

52
        if self.fixed_rate:
1✔
53
            self.deadline = self.deadline + self.period
1✔
54
        else:
55
            self.deadline = time.time() + self.period
1✔
56

57
    def cancel(self):
1✔
58
        self._cancelled = True
1✔
59

60
    def run(self):
1✔
61
        """
62
        Executes the task function. If the function raises and Exception, ``on_error`` is called (if set).
63
        """
64
        try:
1✔
65
            self.task(*self.args, **self.kwargs)
1✔
66
        except Exception as e:
1✔
67
            if self.on_error:
1✔
68
                self.on_error(e)
1✔
69

70

71
class Scheduler:
1✔
72
    """
73
    An event-loop based task scheduler that can manage multiple scheduled tasks with different periods,
74
    can be parallelized with an executor.
75
    """
76

77
    POISON = (-1, "__POISON__")
1✔
78

79
    def __init__(self, executor: Optional[Executor] = None) -> None:
1✔
80
        """
81
        Creates a new Scheduler. If an executor is passed, then that executor will be used to run the scheduled tasks
82
        asynchronously, otherwise they will be executed synchronously inside the event loop. Running tasks
83
        asynchronously in an executor means that they will be effectively executed at a fixed rate (scheduling with
84
        ``fixed_rate = False``, will have no effect).
85

86
        :param executor: an optional executor that tasks will be submitted to.
87
        """
88
        super().__init__()
1✔
89
        self.executor = executor
1✔
90

91
        self._queue = queue.PriorityQueue()
1✔
92
        self._condition = threading.Condition()
1✔
93

94
    def schedule(
1✔
95
        self,
96
        func: Callable,
97
        period: Optional[float] = None,
98
        fixed_rate: bool = True,
99
        start: Optional[float] = None,
100
        on_error: Callable[[Exception], None] = None,
101
        args: Optional[Union[tuple, list[Any]]] = None,
102
        kwargs: Optional[Mapping[str, Any]] = None,
103
    ) -> ScheduledTask:
104
        """
105
        Schedules a given task (function call).
106

107
        :param func: the task to schedule
108
        :param period: the period in which to run the task (in seconds). if not set, task will run once
109
        :param fixed_rate: whether the to run at a fixed rate (neglecting execution duration of the task)
110
        :param start: start time
111
        :param on_error: error callback
112
        :param args: additional positional arguments to pass to the function
113
        :param kwargs: additional keyword arguments to pass to the function
114
        :return: a ScheduledTask instance
115
        """
116
        st = ScheduledTask(
1✔
117
            func,
118
            period=period,
119
            fixed_rate=fixed_rate,
120
            start=start,
121
            on_error=on_error,
122
            args=args,
123
            kwargs=kwargs,
124
        )
125
        self.schedule_task(st)
1✔
126
        return st
1✔
127

128
    def schedule_task(self, task: ScheduledTask) -> None:
1✔
129
        """
130
        Schedules the given task and sets the deadline of the task to either ``task.start`` or the current time.
131

132
        :param task: the task to schedule
133
        """
134
        task.deadline = max(task.start or 0, time.time())
1✔
135
        self.add(task)
1✔
136

137
    def add(self, task: ScheduledTask) -> None:
1✔
138
        """
139
        Schedules the given task. Requires that the task has a deadline set. It's better to use ``schedule_task``.
140

141
        :param task: the task to schedule.
142
        """
143
        if task.deadline is None:
1✔
UNCOV
144
            raise ValueError
×
145

146
        task._cancelled = False
1✔
147

148
        with self._condition:
1✔
149
            self._queue.put((task.deadline, task))
1✔
150
            self._condition.notify()
1✔
151

152
    def close(self) -> None:
1✔
153
        """
154
        Terminates the run loop.
155
        """
156
        with self._condition:
1✔
157
            self._queue.put(self.POISON)
1✔
158
            self._condition.notify()
1✔
159

160
    def run(self):
1✔
161
        q = self._queue
1✔
162
        cond = self._condition
1✔
163
        executor = self.executor
1✔
164
        poison = self.POISON
1✔
165

166
        task: ScheduledTask
167
        while True:
1✔
168
            deadline, task = q.get()
1✔
169

170
            if (deadline, task) == poison:
1✔
171
                break
1✔
172

173
            if task.is_cancelled:
1✔
174
                continue
1✔
175

176
            # wait until the task should be executed
177
            wait = max(0, deadline - time.time())
1✔
178
            if wait > 0:
1✔
179
                with cond:
1✔
180
                    interrupted = cond.wait(timeout=wait)
1✔
181
                    if interrupted:
1✔
182
                        # something with a potentially earlier deadline has arrived while waiting, so we re-queue and
183
                        # continue. this could be optimized by checking the deadline of the added element(s) first,
184
                        # but that would be fairly involved. the assumption is that `schedule` is not invoked frequently
185
                        q.put((task.deadline, task))
1✔
186
                        continue
1✔
187

188
            # run or submit the task
189
            if not task.is_cancelled:
1✔
190
                if executor:
1✔
191
                    executor.submit(task.run)
1✔
192
                else:
193
                    task.run()
1✔
194

195
            if task.is_periodic:
1✔
196
                try:
1✔
197
                    task.set_next_deadline()
1✔
UNCOV
198
                except ValueError:
×
199
                    # task deadline couldn't be set because it was cancelled
UNCOV
200
                    continue
×
201
                q.put((task.deadline, task))
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc