• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akira / exq / 16bf7540e38e434e6ed4fadc1f40894960dd12db-PR-500

07 Aug 2025 11:50AM UTC coverage: 90.805% (-1.6%) from 92.382%
16bf7540e38e434e6ed4fadc1f40894960dd12db-PR-500

Pull #500

github

ananthakumaran
Run coveralls on one build only
Pull Request #500: Add ability to snooze job

15 of 15 new or added lines in 2 files covered. (100.0%)

18 existing lines in 13 files now uncovered.

1195 of 1316 relevant lines covered (90.81%)

706.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.21
/lib/exq/worker_drainer/server.ex
1
defmodule Exq.WorkerDrainer.Server do
  @moduledoc """
  The WorkerDrainer server is responsible for gracefully draining
  workers when the application is shutting down.

  When shutdown starts it instructs the Manager to stop accepting new jobs and
  then waits for all currently in progress jobs to complete.

  If the jobs do not complete within an allowed timeout the WorkerDrainer
  will shut down, allowing the rest of the supervision tree (including the
  remaining workers) to then shut down.

  The length of the grace period can be configured with the
  `shutdown_timeout` option, which defaults to 5000 ms.
  """

  use GenServer
  alias Exq.{Worker, Manager}

  defstruct name: Exq,
            shutdown_timeout: 5000

  @doc """
  Returns the registered process name of the WorkerDrainer for `name`.

  When `name` is `nil` (or `false`) the value of
  `Exq.Support.Config.get(:name)` is used instead.
  """
  def server_name(name) do
    name = name || Exq.Support.Config.get(:name)

    # NOTE(review): this creates an atom dynamically and atoms are never
    # garbage-collected; presumably `name` only ever comes from application
    # configuration and never from untrusted input -- confirm against callers.
    String.to_atom("#{name}.WorkerDrainer")
  end

  ## ===========================================================
  ## GenServer callbacks
  ## ===========================================================

  @doc """
  Starts the WorkerDrainer and registers it under `server_name(opts[:name])`.

  `opts` is used to populate the server state; the recognised keys are
  `:name` and `:shutdown_timeout`.
  """
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: server_name(opts[:name]))
  end

  @impl true
  def init(opts) do
    # Trap exits so that `terminate/2` is invoked when the supervisor shuts
    # this process down; the draining happens there.
    Process.flag(:trap_exit, true)

    # `struct/2` silently discards keys that are not fields of the struct, so
    # `opts` may carry options meant for other processes.
    state = struct(__MODULE__, opts)
    {:ok, state}
  end

  # Workers are drained only on an orderly stop; any other exit reason skips
  # the grace period entirely.
  @impl true
  def terminate(:shutdown, state), do: drain_workers(state)
  def terminate({:shutdown, _reason}, state), do: drain_workers(state)
  def terminate(:normal, state), do: drain_workers(state)
  def terminate(_reason, _state), do: :ok

  ## ===========================================================
  ## Internal Functions
  ## ===========================================================

  # Unsubscribes the Manager from all queues, monitors every worker reported by
  # the worker supervisor and blocks until they are all down or the grace
  # period (`state.shutdown_timeout` ms) has elapsed. Always returns `:ok`.
  defp drain_workers(state) do
    # The timer is started before unsubscribing so that the time spent talking
    # to the Manager counts against the grace period.
    timer_ref = :erlang.start_timer(state.shutdown_timeout, self(), :end_of_grace_period)

    :ok =
      state.name
      |> Manager.Server.server_name()
      |> Exq.unsubscribe_all()

    state.name
    |> Worker.Supervisor.supervisor_name()
    |> Worker.Supervisor.workers()
    # The second element of each worker entry is what gets monitored.
    |> MapSet.new(&Process.monitor(elem(&1, 1)))
    |> await_workers(timer_ref)
  end

  # Waits until every monitor reference in `worker_refs` has reported `:DOWN`,
  # or until the grace-period timer identified by `timer_ref` fires.
  #
  # Emptiness is checked through the public `MapSet.size/1` API rather than by
  # matching on the struct's private fields, since `MapSet` is an opaque type.
  #
  # The timer is not cancelled on early completion: this is only called from
  # `terminate/2` via `drain_workers/1`.
  defp await_workers(worker_refs, timer_ref) do
    if MapSet.size(worker_refs) == 0 do
      :ok
    else
      receive do
        {:DOWN, downed_ref, _, _, _} ->
          worker_refs
          |> MapSet.delete(downed_ref)
          |> await_workers(timer_ref)

        # Not all workers finished within grace period
        {:timeout, ^timer_ref, :end_of_grace_period} ->
          :ok
      end
    end
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc