• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akira / exq / 16bf7540e38e434e6ed4fadc1f40894960dd12db-PR-500

07 Aug 2025 11:50AM UTC coverage: 90.805% (-1.6%) from 92.382%
16bf7540e38e434e6ed4fadc1f40894960dd12db-PR-500

Pull #500

github

ananthakumaran
Run coveralls on one build only
Pull Request #500: Add ability to snooze job

15 of 15 new or added lines in 2 files covered. (100.0%)

18 existing lines in 13 files now uncovered.

1195 of 1316 relevant lines covered (90.81%)

706.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.0
/lib/exq/redis/job_stat.ex
1
defmodule Exq.Redis.JobStat do
2
  @moduledoc """
3
  The JobStat module encapsulates storing system-wide stats on top of Redis
4
  It aims to be compatible with the Sidekiq stats format.
5
  """
6

7
  require Logger
8
  alias Exq.Support.{Binary, Process, Job, Time, Node}
9
  alias Exq.Redis.{Connection, JobQueue}
10

11
  # Builds the Redis command list that bumps the processed-job counters:
  # the all-time total, a short-lived (120s TTL) realtime counter keyed by
  # time, and a per-date counter. The job argument is unused but kept for
  # interface symmetry with record_failure_commands/4.
  def record_processed_commands(namespace, _job, current_date \\ DateTime.utc_now()) do
    {time, date} = Time.format_current_date(current_date)

    total_key = JobQueue.full_key(namespace, "stat:processed")
    realtime_key = JobQueue.full_key(namespace, "stat:processed_rt:#{time}")
    date_key = JobQueue.full_key(namespace, "stat:processed:#{date}")

    [
      ["INCR", total_key],
      ["INCR", realtime_key],
      ["EXPIRE", realtime_key, 120],
      ["INCR", date_key]
    ]
  end
21

22
  # Applies the processed-job counter commands in one pipeline and returns
  # {:ok, count} where count is the updated all-time processed total.
  def record_processed(redis, namespace, job, current_date \\ DateTime.utc_now()) do
    commands = record_processed_commands(namespace, job, current_date)
    {:ok, [count | _rest]} = Connection.qp(redis, commands)
    {:ok, count}
  end
27

28
  # Builds the Redis command list that bumps the failed-job counters:
  # the all-time total, a short-lived (120s TTL) realtime counter keyed by
  # time, and a per-date counter. The error/job arguments are unused but
  # kept for interface symmetry with record_processed_commands/3.
  def record_failure_commands(namespace, _error, _job, current_date \\ DateTime.utc_now()) do
    {time, date} = Time.format_current_date(current_date)

    total_key = JobQueue.full_key(namespace, "stat:failed")
    realtime_key = JobQueue.full_key(namespace, "stat:failed_rt:#{time}")
    date_key = JobQueue.full_key(namespace, "stat:failed:#{date}")

    [
      ["INCR", total_key],
      ["INCR", realtime_key],
      ["EXPIRE", realtime_key, 120],
      ["INCR", date_key]
    ]
  end
38

39
  # Applies the failed-job counter commands in one pipeline and returns
  # {:ok, count} where count is the updated all-time failed total.
  def record_failure(redis, namespace, error, job, current_date \\ DateTime.utc_now()) do
    commands = record_failure_commands(namespace, error, job, current_date)
    {:ok, [count | _rest]} = Connection.qp(redis, commands)
    {:ok, count}
  end
44

45
  # Builds the command that registers a worker process in its host's
  # workers hash (field = pid). Accepts an optional pre-serialized process
  # so callers that already encoded it avoid a second encode.
  def add_process_commands(namespace, process_info, serialized_process \\ nil) do
    serialized =
      case serialized_process do
        nil -> Exq.Support.Process.encode(process_info)
        encoded -> encoded
      end

    key = workers_key(namespace, process_info.host)
    [["HSET", key, process_info.pid, serialized]]
  end
49

50
  # Registers a worker process in Redis. Raises on Redis errors
  # (Connection.qp!); returns :ok otherwise.
  def add_process(redis, namespace, process_info, serialized_process \\ nil) do
    commands = add_process_commands(namespace, process_info, serialized_process)
    Connection.qp!(redis, commands)
    :ok
  end
55

56
  # Builds the command that deregisters a worker process from its host's
  # workers hash.
  def remove_process_commands(namespace, process_info) do
    key = workers_key(namespace, process_info.host)
    [["HDEL", key, process_info.pid]]
  end
59

60
  # Deregisters a worker process from Redis. Raises on Redis errors
  # (Connection.qp!); returns :ok otherwise.
  def remove_process(redis, namespace, process_info) do
    commands = remove_process_commands(namespace, process_info)
    Connection.qp!(redis, commands)
    :ok
  end
65

66
  # Drops the entire workers hash for `host`, forgetting every process
  # registered on that node. Always returns :ok.
  def cleanup_processes(redis, namespace, host) do
    key = workers_key(namespace, host)
    Connection.del!(redis, key)
    :ok
  end
70

71
  # Heartbeat for a node. In a single MULTI/EXEC transaction this:
  #   * adds the node's identity to the namespace's node set,
  #   * refreshes the node's info hash (encoded info, busy count,
  #     heartbeat timestamp, quiet flag),
  #   * re-arms a 60s TTL on the info hash so a crashed node's entry
  #     expires (see prune_dead_nodes/2),
  #   * pops one pending signal from the node's "#{key}-signals" list.
  # Returns the popped signal (or nil when the list is empty), or nil
  # after logging if Redis replies with anything unexpected.
  def node_ping(redis, namespace, node) do
    key = node_info_key(namespace, node.identity)

    case Connection.qp(
           redis,
           [
             ["MULTI"],
             ["SADD", nodes_key(namespace), node.identity],
             [
               "HMSET",
               key,
               "info",
               Node.encode(node),
               "busy",
               node.busy,
               "beat",
               Time.unix_seconds(),
               "quiet",
               node.quiet
             ],
             ["EXPIRE", key, 60],
             ["RPOP", "#{key}-signals"],
             ["EXEC"]
           ]
         ) do
      # EXEC reply is [sadd, "OK", 1, signal]: SADD is matched loosely (`_`)
      # because it returns 0 on repeat pings; EXPIRE must be 1 since the key
      # was just written by HMSET.
      {:ok, ["OK", "QUEUED", "QUEUED", "QUEUED", "QUEUED", [_, "OK", 1, signal]]} ->
        signal

      error ->
        Logger.error("Failed to send node stats. Unexpected error from redis: #{inspect(error)}")

        nil
    end
  end
105

106
  # Queues a signal (by name) for a node: LPUSHes onto the node's signal
  # list inside MULTI/EXEC, with a 60s TTL so unconsumed signals expire.
  # The target node drains signals one at a time via RPOP in node_ping/3.
  # Returns :ok on the expected reply, or the raw error reply otherwise.
  def node_signal(redis, namespace, node_id, signal_name) do
    key = node_info_key(namespace, node_id)
    signal_key = "#{key}-signals"

    case Connection.qp(redis, [
           ["MULTI"],
           ["LPUSH", signal_key, signal_name],
           ["EXPIRE", signal_key, 60],
           ["EXEC"]
         ]) do
      # EXEC reply is [lpush, expire]: LPUSH is matched loosely (`_`, it
      # returns the new list length); EXPIRE must be 1 since the key exists.
      {:ok, ["OK", "QUEUED", "QUEUED", [_, 1]]} -> :ok
      error -> error
    end
  end
120

121
  # Lists the identities of every node currently registered in the
  # namespace's node set.
  def node_ids(redis, namespace) do
    key = nodes_key(namespace)
    Connection.smembers!(redis, key)
  end
124

125
  # Fetches and decodes the stored info for every registered node,
  # skipping nodes whose info hash has already expired (HGET -> nil/"").
  def nodes(redis, namespace) do
    case node_ids(redis, namespace) do
      [] ->
        []

      ids ->
        commands = Enum.map(ids, &["HGET", node_info_key(namespace, &1), "info"])

        Connection.qp!(redis, commands)
        |> Enum.reject(&(&1 in [nil, ""]))
        |> Enum.map(&Node.decode/1)
    end
  end
143

144
  # Removes nodes whose info hash has expired (i.e. missed heartbeats —
  # node_ping/3 re-arms a 60s TTL) from the node registry, along with the
  # workers hashes of those dead nodes.
  #
  # Returns [] when there are no registered nodes; otherwise the result of
  # the cleanup pipeline (or nil when every node is alive).
  def prune_dead_nodes(redis, namespace) do
    node_ids = node_ids(redis, namespace)

    commands =
      node_ids
      |> Enum.map(fn node_id -> ["HEXISTS", node_info_key(namespace, node_id), "info"] end)

    if Enum.empty?(commands) do
      []
    else
      dead_node_ids =
        Connection.qp!(redis, commands)
        |> Enum.zip(node_ids)
        |> Enum.flat_map(fn {exists, node_id} ->
          # HEXISTS == 0 means the info hash expired -> node is dead.
          if exists == 0 do
            [node_id]
          else
            []
          end
        end)

      if !Enum.empty?(dead_node_ids) do
        commands = [
          ["SREM", nodes_key(namespace)] ++ dead_node_ids,
          # BUG FIX: delete only the dead nodes' workers hashes. Previously
          # this mapped over all node_ids, wiping in-flight worker info for
          # live nodes whenever any node was pruned.
          ["DEL"] ++ Enum.map(dead_node_ids, &workers_key(namespace, &1))
        ]

        Connection.qp(redis, commands)
      end
    end
  end
175

176
  # Total number of busy workers summed across all registered nodes.
  # Nodes whose info hash expired contribute 0 (HGET -> nil).
  def busy(redis, namespace) do
    case node_ids(redis, namespace) do
      [] ->
        0

      ids ->
        commands = Enum.map(ids, &["HGET", node_info_key(namespace, &1), "busy"])

        Connection.qp!(redis, commands)
        |> Enum.map(&decode_integer/1)
        |> Enum.sum()
    end
  end
188

189
  # Returns every decoded worker process registered across all nodes'
  # workers hashes.
  def processes(redis, namespace) do
    case node_ids(redis, namespace) do
      [] ->
        []

      ids ->
        commands = Enum.map(ids, &["HVALS", workers_key(namespace, &1)])

        Connection.qp!(redis, commands)
        |> Enum.flat_map(fn values -> Enum.map(values, &Process.decode/1) end)
    end
  end
202

203
  # Searches the entire dead set for a job with the given jid.
  def find_failed(redis, namespace, jid) do
    dead_set = JobQueue.full_key(namespace, "dead")

    redis
    |> Connection.zrange!(dead_set, 0, -1)
    |> JobQueue.search_jobs(jid)
  end
208

209
  # Finds a dead job by its zset score and jid; see find_by_score_and_jid/5.
  def find_failed(redis, namespace, score, jid, options) do
    key = JobQueue.full_key(namespace, "dead")
    find_by_score_and_jid(redis, key, score, jid, options)
  end
212

213
  # Finds a retry job by its zset score and jid; see find_by_score_and_jid/5.
  def find_retry(redis, namespace, score, jid, options) do
    key = JobQueue.full_key(namespace, "retry")
    find_by_score_and_jid(redis, key, score, jid, options)
  end
216

217
  # Finds a scheduled job by its zset score and jid; see find_by_score_and_jid/5.
  def find_scheduled(redis, namespace, score, jid, options) do
    key = JobQueue.full_key(namespace, "schedule")
    find_by_score_and_jid(redis, key, score, jid, options)
  end
220

221
  # Removes a queue: drops it from the queues set and deletes its job list.
  def remove_queue(redis, namespace, queue) do
    commands = [
      ["SREM", JobQueue.full_key(namespace, "queues"), queue],
      ["DEL", JobQueue.queue_key(namespace, queue)]
    ]

    Connection.qp(redis, commands)
  end
227

228
  # Removes a single failed job (looked up by jid) from the dead set and
  # decrements the all-time failed counter to match.
  def remove_failed(redis, namespace, jid) do
    {:ok, failure} = find_failed(redis, namespace, jid)

    commands = [
      ["DECR", JobQueue.full_key(namespace, "stat:failed")],
      ["ZREM", JobQueue.full_key(namespace, "dead"), Job.encode(failure)]
    ]

    Connection.qp(redis, commands)
  end
236

237
  # Resets the all-time failed counter to 0 and deletes the dead set.
  def clear_failed(redis, namespace) do
    commands = [
      ["SET", JobQueue.full_key(namespace, "stat:failed"), 0],
      ["DEL", JobQueue.full_key(namespace, "dead")]
    ]

    Connection.qp(redis, commands)
  end
243

244
  # Deletes every node's workers hash. Returns 0 when no nodes are
  # registered, otherwise the pipeline result (kept for compatibility).
  def clear_processes(redis, namespace) do
    case node_ids(redis, namespace) do
      [] ->
        0

      ids ->
        commands = Enum.map(ids, &["DEL", workers_key(namespace, &1)])
        Connection.qp!(redis, commands)
    end
  end
255

256
  # Collects the short-lived realtime counters: scans for failed_rt:* and
  # processed_rt:* keys, then pairs each key's timestamp suffix with its
  # count. Returns {:ok, failures, successes} as [{timestamp, count}] lists.
  def realtime_stats(redis, namespace) do
    format = realtime_stats_formatter(redis, namespace)

    failures =
      redis
      |> realtime_stats_scanner(JobQueue.full_key(namespace, "stat:failed_rt:*"))
      |> format.("stat:failed_rt:")

    successes =
      redis
      |> realtime_stats_scanner(JobQueue.full_key(namespace, "stat:processed_rt:*"))
      |> format.("stat:processed_rt:")

    {:ok, failures, successes}
  end
268

269
  # Runs the first SCAN page for `pattern` (a MATCH glob, e.g.
  # "ns:stat:failed_rt:*") and hands the cursor to
  # realtime_stats_scan_keys/4 to accumulate any remaining pages.
  defp realtime_stats_scanner(redis, pattern) do
    {:ok, [[cursor, keys]]} =
      Connection.qp(redis, [["SCAN", 0, "MATCH", pattern, "COUNT", 1_000]])

    realtime_stats_scan_keys(redis, pattern, cursor, keys)
  end
275

276
  # Recursively follows the SCAN cursor, accumulating matched keys.
  # A cursor of "0" means the iteration is complete.
  defp realtime_stats_scan_keys(_redis, _pattern, "0", acc), do: acc

  defp realtime_stats_scan_keys(redis, pattern, cursor, acc) do
    {:ok, [[next_cursor, keys]]} =
      Connection.qp(redis, [["SCAN", cursor, "MATCH", pattern, "COUNT", 1_000]])

    realtime_stats_scan_keys(redis, pattern, next_cursor, acc ++ keys)
  end
286

287
  # Returns a fun that, given realtime stat keys and their key prefix
  # (e.g. "stat:failed_rt:"), GETs each counter and pairs the timestamp
  # suffix of every key with its count.
  defp realtime_stats_formatter(redis, namespace) do
    fn
      [], _prefix ->
        []

      keys, prefix ->
        {:ok, counts} = Connection.qp(redis, Enum.map(keys, &["GET", &1]))
        full_prefix = JobQueue.full_key(namespace, prefix)

        keys
        |> Enum.map(&Binary.take_prefix(&1, full_prefix))
        |> Enum.zip(counts)
    end
  end
299

300
  # Reads a single "stat:<key>" counter, returning 0 when it is unset.
  def get_count(redis, namespace, key) do
    redis
    |> Connection.get!(JobQueue.full_key(namespace, "stat:#{key}"))
    |> decode_integer()
  end
304

305
  # Reads multiple "stat:<key>" counters in one MGET, preserving order;
  # unset counters come back as 0.
  def get_counts(redis, namespace, keys) do
    stat_keys = Enum.map(keys, &JobQueue.full_key(namespace, "stat:#{&1}"))
    {:ok, results} = Connection.q(redis, ["MGET" | stat_keys])
    Enum.map(results, &decode_integer/1)
  end
311

UNCOV
312
  # Normalizes a Redis counter reply to an integer. Absent values
  # (:undefined from the Erlang client, nil from redix) count as 0;
  # binaries are parsed, keeping only the leading integer portion.
  def decode_integer(:undefined), do: 0
  def decode_integer(nil), do: 0
  def decode_integer(count) when is_integer(count), do: count

  def decode_integer(count) when is_binary(count) do
    {parsed, _remainder} = Integer.parse(count)
    parsed
  end
320

321
  # Looks up jobs at an exact zset score, then searches them for `jid`.
  # The :raw option (default false) controls whether search_jobs decodes
  # the matched job.
  defp find_by_score_and_jid(redis, zset, score, jid, options) do
    raw = Keyword.get(options, :raw, false)

    redis
    |> Connection.zrangebyscore!(zset, score, score)
    |> JobQueue.search_jobs(jid, !raw)
  end
326

327
  # Key of the hash mapping pid -> serialized process info for one node.
  defp workers_key(namespace, node_id) do
    JobQueue.full_key(namespace, "#{node_id}:workers")
  end
330

331
  # Key of the set holding all registered node identities; named
  # "processes" for compatibility with the Sidekiq stats format
  # (see @moduledoc).
  defp nodes_key(namespace) do
    "#{namespace}:processes"
  end
334

335
  # Key of a node's info hash (fields: info, busy, beat, quiet); expires
  # 60s after the last heartbeat (see node_ping/3).
  defp node_info_key(namespace, node_id) do
    "#{namespace}:#{node_id}"
  end
338
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc