• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

quantum-elixir / quantum-core / 12a0faacc0a2b110be8abde30228e658a4413b11-PR-661

02 Dec 2025 11:03AM UTC coverage: 88.189% (-0.5%) from 88.714%
12a0faacc0a2b110be8abde30228e658a4413b11-PR-661

Pull #661

github

web-flow
Bump step-security/harden-runner from 2.13.2 to 2.13.3

Bumps [step-security/harden-runner](https://github.com/step-security/harden-runner) from 2.13.2 to 2.13.3.
- [Release notes](https://github.com/step-security/harden-runner/releases)
- [Commits](https://github.com/step-security/harden-runner/compare/95d9a5ded...df199fb7b)

---
updated-dependencies:
- dependency-name: step-security/harden-runner
  dependency-version: 2.13.3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #661: Bump step-security/harden-runner from 2.13.2 to 2.13.3

336 of 381 relevant lines covered (88.19%)

22.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.71
/lib/quantum/execution_broadcaster.ex
1
defmodule Quantum.ExecutionBroadcaster do
2
  @moduledoc false
3

4
  # Receives Added / Removed Jobs, Broadcasts Executions of Jobs
5

6
  use GenStage
7

8
  require Logger
9

10
  alias Crontab.CronExpression
11
  alias Crontab.Scheduler, as: CrontabScheduler
12

13
  alias Quantum.ClockBroadcaster.Event, as: ClockEvent
14

15
  alias Quantum.ExecutionBroadcaster.Event, as: ExecuteEvent
16
  alias Quantum.ExecutionBroadcaster.InitOpts
17
  alias Quantum.ExecutionBroadcaster.State
18
  alias Quantum.Job
19

20
  alias __MODULE__.{InitOpts, StartOpts, State}
21

22
  @type event :: {:add, Job.t()} | {:execute, Job.t()}
23

24
  defmodule JobInPastError do
25
    @moduledoc false
26
    defexception message:
27
                   "The job was scheduled in the past. This must not happen to prevent infinite loops!"
28
  end
29

30
  # Start Stage
31
  @spec start_link(StartOpts.t()) :: GenServer.on_start()
32
  def start_link(%StartOpts{name: name} = opts) do
33
    __MODULE__
34
    |> GenStage.start_link(
35
      struct!(
36
        InitOpts,
37
        Map.take(opts, [
38
          :job_broadcaster_reference,
39
          :clock_broadcaster_reference,
40
          :storage,
41
          :scheduler,
42
          :debug_logging
43
        ])
44
      ),
45
      name: name
46
    )
47
    |> case do
24✔
48
      {:ok, pid} ->
24✔
49
        {:ok, pid}
50

51
      {:error, {:already_started, pid}} ->
52
        Process.monitor(pid)
×
53
        {:ok, pid}
54

55
      {:error, _reason} = error ->
56
        error
×
57
    end
58
  end
59

60
  @impl GenStage
61
  def init(%InitOpts{
62
        job_broadcaster_reference: job_broadcaster,
63
        clock_broadcaster_reference: clock_broadcaster,
64
        storage: storage,
65
        scheduler: scheduler,
66
        debug_logging: debug_logging
67
      }) do
68
    storage_pid = GenServer.whereis(Module.concat(scheduler, Storage))
24✔
69

70
    {:producer_consumer,
24✔
71
     %State{
72
       uninitialized_jobs: [],
73
       execution_timeline: [],
74
       storage: storage,
75
       storage_pid: storage_pid,
76
       scheduler: scheduler,
77
       debug_logging: debug_logging
78
     }, subscribe_to: [job_broadcaster, clock_broadcaster]}
79
  end
80

81
  @impl GenStage
82
  def handle_events(events, _, state) do
83
    {events, state} =
60✔
84
      Enum.reduce(events, {[], state}, fn event, {list, state} ->
85
        {new_events, state} = handle_event(event, state)
65✔
86
        {list ++ new_events, state}
87
      end)
88

89
    {:noreply, events, state}
57✔
90
  end
91

92
  def handle_event(
93
        {:add, %Job{schedule: %CronExpression{reboot: true}, name: name} = job},
94
        %State{uninitialized_jobs: uninitialized_jobs, debug_logging: debug_logging} = state
95
      ) do
96
    debug_logging &&
1✔
97
      Logger.debug(fn ->
1✔
98
        {"Scheduling job for single reboot execution", node: Node.self(), name: name}
99
      end)
100

101
    {[%ExecuteEvent{job: job}], %{state | uninitialized_jobs: [job | uninitialized_jobs]}}
102
  end
103

104
  def handle_event(
105
        {:add, %Job{name: name} = job},
106
        %State{uninitialized_jobs: uninitialized_jobs, debug_logging: debug_logging} = state
107
      ) do
108
    debug_logging &&
29✔
109
      Logger.debug(fn ->
29✔
110
        {"Adding job", node: Node.self(), name: name}
111
      end)
112

113
    {[], %{state | uninitialized_jobs: [job | uninitialized_jobs]}}
114
  end
115

116
  def handle_event(
117
        {:run, %Job{name: name} = job},
118
        %State{debug_logging: debug_logging} = state
119
      ) do
120
    debug_logging &&
1✔
121
      Logger.debug(fn ->
1✔
122
        {"Running job once", node: Node.self(), name: name}
123
      end)
124

125
    {[%ExecuteEvent{job: job}], state}
126
  end
127

128
  def handle_event(
129
        {:remove, name},
130
        %State{
131
          uninitialized_jobs: uninitialized_jobs,
132
          execution_timeline: execution_timeline,
133
          debug_logging: debug_logging
134
        } = state
135
      ) do
136
    debug_logging &&
7✔
137
      Logger.debug(fn ->
7✔
138
        {"Removing job", node: Node.self(), name: name}
139
      end)
140

141
    uninitialized_jobs = Enum.reject(uninitialized_jobs, &(&1.name == name))
6✔
142

143
    execution_timeline =
6✔
144
      execution_timeline
145
      |> Enum.map(fn {date, job_list} ->
2✔
146
        {date, Enum.reject(job_list, &match?(%Job{name: ^name}, &1))}
2✔
147
      end)
148
      |> Enum.reject(fn
149
        {_, []} -> true
1✔
150
        {_, _} -> false
1✔
151
      end)
152

153
    {[],
154
     %{state | uninitialized_jobs: uninitialized_jobs, execution_timeline: execution_timeline}}
155
  end
156

157
  def handle_event(
158
        %ClockEvent{time: time},
159
        state
160
      ) do
161
    state
162
    |> initialize_jobs(time)
163
    |> execute_events_to_fire(time)
27✔
164
  end
165

166
  @impl GenStage
167
  def handle_info(_message, state) do
168
    {:noreply, [], state}
×
169
  end
170

171
  defp initialize_jobs(%State{uninitialized_jobs: uninitialized_jobs} = state, time) do
172
    uninitialized_jobs
173
    |> Enum.reject(&match?(%Job{schedule: %CronExpression{reboot: true}}, &1))
16✔
174
    |> Enum.reduce(
175
      %{
176
        state
177
        | uninitialized_jobs:
178
            Enum.filter(
179
              uninitialized_jobs,
180
              &match?(%Job{schedule: %CronExpression{reboot: true}}, &1)
16✔
181
            )
182
      },
183
      &add_job_to_state(&1, &2, time)
15✔
184
    )
185
    |> sort_state
27✔
186
  end
187

188
  defp execute_events_to_fire(%State{execution_timeline: []} = state, _time), do: {[], state}
14✔
189

190
  defp execute_events_to_fire(
191
         %State{
192
           storage: storage,
193
           storage_pid: storage_pid,
194
           debug_logging: debug_logging,
195
           execution_timeline: [{time_to_execute, jobs} | tail]
196
         } = state,
197
         time
198
       ) do
199
    case DateTime.compare(time, time_to_execute) do
22✔
200
      :gt ->
201
        raise "Jobs were skipped"
×
202

203
      :lt ->
11✔
204
        {[], state}
205

206
      :eq ->
207
        :ok = storage.update_last_execution_date(storage_pid, time_to_execute)
11✔
208

209
        events =
11✔
210
          for %Job{name: job_name} = job <- jobs do
11✔
211
            debug_logging &&
11✔
212
              Logger.debug(fn ->
11✔
213
                {"Scheduling job for execution", node: Node.self(), name: job_name}
214
              end)
215

216
            %ExecuteEvent{job: job}
11✔
217
          end
218

219
        {next_events, new_state} =
11✔
220
          jobs
221
          |> Enum.reduce(
222
            %{state | execution_timeline: tail},
223
            &add_job_to_state(&1, &2, DateTime.add(time, 1, :second))
11✔
224
          )
225
          |> sort_state
226
          |> execute_events_to_fire(time)
227

228
        {events ++ next_events, new_state}
229
    end
230
  end
231

232
  defp add_job_to_state(
233
         %Job{schedule: schedule, timezone: timezone, name: name} = job,
234
         state,
235
         time
236
       ) do
237
    with {:ok, execution_date} <- get_next_execution_time(job, time) do
26✔
238
      add_to_state(state, time, execution_date, job)
22✔
239
    else
240
      {:error, :time_zone_not_found} ->
241
        Logger.error(
1✔
242
          "Invalid Timezone #{inspect(timezone)} provided for job #{inspect(name)}.",
243
          job: job,
244
          error: :time_zone_not_found
245
        )
246

247
        state
1✔
248

249
      {:error, _} ->
250
        Logger.warning(fn ->
1✔
251
          """
1✔
252
          Invalid Schedule #{inspect(schedule)} provided for job #{inspect(name)}.
253
          No matching dates found. The job was removed.
254
          """
255
        end)
256

257
        state
1✔
258
    end
259
  end
260

261
  defp get_next_execution_time(
262
         %Job{schedule: schedule, timezone: :utc},
263
         time
264
       ) do
265
    CrontabScheduler.get_next_run_date(schedule, time)
22✔
266
  end
267

268
  defp get_next_execution_time(
269
         %Job{schedule: schedule, timezone: timezone},
270
         time
271
       ) do
272
    with {:ok, localized_time} <- DateTime.shift_zone(time, timezone),
4✔
273
         {:ok, localized_execution_time} <-
3✔
274
           CrontabScheduler.get_next_run_date(schedule, localized_time) do
275
      DateTime.shift_zone(localized_execution_time, "Etc/UTC")
3✔
276
    end
277
  end
278

279
  defp sort_state(%State{execution_timeline: execution_timeline} = state) do
280
    %{state | execution_timeline: Enum.sort_by(execution_timeline, &elem(&1, 0), DateTime)}
36✔
281
  end
282

283
  defp add_to_state(%State{execution_timeline: execution_timeline} = state, time, date, job) do
284
    unless DateTime.compare(time, date) in [:lt, :eq] do
22✔
285
      raise Quantum.ExecutionBroadcaster.JobInPastError
×
286
    end
287

288
    %{state | execution_timeline: add_job_at_date(execution_timeline, date, job)}
22✔
289
  end
290

291
  defp add_job_at_date(execution_timeline, date, job) do
292
    case find_date_and_put_job(execution_timeline, date, job) do
22✔
293
      {:found, list} -> list
×
294
      {:not_found, list} -> [{date, [job]} | list]
22✔
295
    end
296
  end
297

298
  defp find_date_and_put_job([{date, jobs} | rest], date, job) do
×
299
    {:found, [{date, [job | jobs]} | rest]}
300
  end
301

302
  defp find_date_and_put_job([], _, _) do
22✔
303
    {:not_found, []}
304
  end
305

306
  defp find_date_and_put_job([head | rest], date, job) do
307
    {state, new_rest} = find_date_and_put_job(rest, date, job)
4✔
308
    {state, [head | new_rest]}
309
  end
310
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc