• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akira / exq / 8eccca269612abd7fd80016b24a2c7e92f25c16f-PR-507

02 Nov 2025 09:35AM UTC coverage: 87.509% (-4.0%) from 91.502%
8eccca269612abd7fd80016b24a2c7e92f25c16f-PR-507

Pull #507

github

ananthakumaran
Add ability to cancel running jobs
Pull Request #507: Add ability to cancel running jobs

13 of 40 new or added lines in 5 files covered. (32.5%)

29 existing lines in 10 files now uncovered.

1177 of 1345 relevant lines covered (87.51%)

614.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.0
/lib/exq/node/server.ex
1
defmodule Exq.Node.Server do
  @moduledoc """
  GenServer that periodically reports this node to Redis and acts on the
  signals that come back.

  Every `interval` milliseconds (currently fixed at 5000) the server:

    * asks the manager for its subscribed queues and the worker supervisor
      for its worker count,
    * passes the refreshed node record to `Exq.Redis.JobStat.node_ping/3`,
    * handles each signal returned by that call (`"TSTP"` and
      `"CANCEL:<json>"` are understood; anything else is logged),
    * on the first ping and on every tenth ping after it, calls
      `Exq.Redis.JobStat.prune_dead_nodes/2`.
  """

  use GenServer
  require Logger
  alias Exq.Support.Config
  alias Exq.Support.Time
  alias Exq.Redis.JobStat
  alias Exq.Support.Node
  alias Exq.Serializers.JsonSerializer
  alias Exq.Worker.Metadata
  alias Exq.Support.Job
  alias Exq.Worker.Server

  # Upper bound on the number of links a process may have and still be
  # considered as a cancel target.
  # NOTE(review): presumably a sanity check that the pid really is a worker
  # process and a cap on metadata lookups - confirm with the author.
  @max_worker_links 10

  defmodule State do
    defstruct [
      :node,
      :interval,
      :namespace,
      :redis,
      :node_id,
      :manager,
      :metadata,
      :workers_sup,
      ping_count: 0
    ]
  end

  @doc """
  Starts the node server.

  `options` must contain `:manager`, `:metadata`, `:workers_sup`,
  `:namespace` and `:redis`; a missing key raises `KeyError`. `:node_id` is
  optional and defaults to `Config.node_identifier().node_id()`.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(options) do
    node_id = Keyword.get(options, :node_id, Config.node_identifier().node_id())

    GenServer.start_link(
      __MODULE__,
      %State{
        manager: Keyword.fetch!(options, :manager),
        metadata: Keyword.fetch!(options, :metadata),
        workers_sup: Keyword.fetch!(options, :workers_sup),
        node_id: node_id,
        node: build_node(node_id),
        namespace: Keyword.fetch!(options, :namespace),
        redis: Keyword.fetch!(options, :redis),
        interval: 5000
      },
      []
    )
  end

  @impl true
  def init(state) do
    :ok = schedule_ping(state.interval)
    {:ok, state}
  end

  @impl true
  def handle_info(
        :ping,
        %{
          node: node,
          namespace: namespace,
          redis: redis,
          manager: manager,
          workers_sup: workers_sup
        } = state
      ) do
    {:ok, queues} = Exq.subscriptions(manager)
    busy = Exq.Worker.Supervisor.workers_count(workers_sup)
    node = %{node | queues: queues, busy: busy, quiet: Enum.empty?(queues)}

    # The ping doubles as the signal channel: whatever node_ping/3 returns is
    # treated as the list of pending signals for this node.
    signals = JobStat.node_ping(redis, namespace, node)
    :ok = process_signals(signals, state)

    # ping_count starts at 0, so pruning runs on the first ping and then on
    # every tenth ping after it.
    if Integer.mod(state.ping_count, 10) == 0 do
      JobStat.prune_dead_nodes(redis, namespace)
    end

    :ok = schedule_ping(state.interval)
    {:noreply, %{state | ping_count: state.ping_count + 1}}
  end

  def handle_info(msg, state) do
    Logger.error("Received unexpected info message in #{__MODULE__} #{inspect(msg)}")
    {:noreply, state}
  end

  # Handles every signal in order; always returns :ok.
  defp process_signals(signals, state) do
    Enum.each(signals, fn signal ->
      :ok = process_signal(signal, state)
    end)

    :ok
  end

  defp process_signal("TSTP", state) do
    Logger.info("Received TSTP, unsubscribing from all queues")
    :ok = Exq.unsubscribe_all(state.manager)
  end

  # Make sure the process is running the jid before canceling the
  # job. We don't want to send cancel message to unknown process,
  # which could happen if we process the signals after a restart, in
  # that case, the pid could point to a completely unrelated process.
  defp process_signal("CANCEL:" <> args, state) do
    case JsonSerializer.decode(args) do
      {:ok, %{"pid" => "#PID" <> worker_pid_string, "jid" => jid}} ->
        cancel_job(worker_pid_string, jid, state)

      _ ->
        Logger.warning("Received invalid args for cancel, args: #{args}")
    end

    :ok
  end

  defp process_signal(unknown, _) do
    Logger.warning("Received unsupported signal #{unknown}")
    :ok
  end

  # Cancels the worker identified by `worker_pid_string`, but only when one of
  # the processes linked to it is registered in the metadata store as running
  # `jid`. Every failure mode (unparsable or non-local pid, dead process, too
  # many links, no linked process running the jid) ends in the same warning
  # instead of crashing the node server.
  defp cancel_job(worker_pid_string, jid, state) do
    with {:ok, worker_pid} <- parse_local_pid(worker_pid_string),
         {:links, links} when length(links) <= @max_worker_links <-
           Process.info(worker_pid, :links),
         true <-
           Enum.any?(links, fn link ->
             match?(%Job{jid: ^jid}, Metadata.lookup(state.metadata, link))
           end) do
      Server.cancel(worker_pid)
      Logger.info("Canceled jid #{jid}")
    else
      _ -> Logger.warning("Not able to find worker process to cancel")
    end
  end

  # Turns the "<a.b.c>" part of an inspected pid into a pid on this node.
  # Returns :error for malformed text (list_to_pid/1 raises ArgumentError) and
  # for pids that belong to another node, which Process.info/2 cannot inspect.
  defp parse_local_pid(pid_string) do
    pid = :erlang.list_to_pid(String.to_charlist(pid_string))

    if node(pid) == node() do
      {:ok, pid}
    else
      :error
    end
  rescue
    ArgumentError -> :error
  end

  # Arms the timer for the next :ping message to this process.
  defp schedule_ping(interval) do
    _reference = Process.send_after(self(), :ping, interval)
    :ok
  end

  # Builds the initial node record from the local hostname, the current time
  # and the OS process id; queues/busy/quiet are filled in on each ping.
  defp build_node(node_id) do
    {:ok, hostname} = :inet.gethostname()

    %Node{
      hostname: to_string(hostname),
      started_at: Time.unix_seconds(),
      pid: List.to_string(:os.getpid()),
      identity: node_id
    }
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc