• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akira / exq / 16bf7540e38e434e6ed4fadc1f40894960dd12db-PR-500

07 Aug 2025 11:50AM UTC coverage: 90.805% (-1.6%) from 92.382%
16bf7540e38e434e6ed4fadc1f40894960dd12db-PR-500

Pull #500

github

ananthakumaran
Run coveralls on one build only
Pull Request #500: Add ability to snooze job

15 of 15 new or added lines in 2 files covered. (100.0%)

18 existing lines in 13 files now uncovered.

1195 of 1316 relevant lines covered (90.81%)

706.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.96
/lib/exq/redis/heartbeat.ex
1
defmodule Exq.Redis.Heartbeat do
2
  require Logger
3
  alias Exq.Redis.Connection
4
  alias Exq.Redis.JobQueue
5
  alias Exq.Redis.Script
6

7
  def register(redis, namespace, node_id) do
8
    score = DateTime.to_unix(DateTime.utc_now(), :millisecond) / 1000
400✔
9

10
    case Connection.qp(redis, [
400✔
11
           ["MULTI"],
12
           ["ZREM", sorted_set_key(namespace), node_id],
13
           ["ZADD", sorted_set_key(namespace), score, node_id],
14
           ["EXEC"]
15
         ]) do
16
      {:ok, ["OK", "QUEUED", "QUEUED", [_, 1]]} ->
393✔
17
        :ok
18

19
      error ->
20
        Logger.error("Failed to send heartbeat. Unexpected error from redis: #{inspect(error)}")
3✔
21
        error
3✔
22
    end
23
  end
24

25
  def unregister(redis, namespace, node_id) do
26
    case Connection.zrem(redis, sorted_set_key(namespace), node_id) do
11✔
27
      {:ok, _} ->
11✔
28
        :ok
29

30
      error ->
31
        Logger.error(
×
32
          "Failed to clear old heartbeat. Unexpected error from redis: #{inspect(error)}"
33
        )
34

35
        error
×
36
    end
37
  end
38

39
  def re_enqueue_backup(redis, namespace, node_id, queue, current_score) do
40
    resp =
14✔
41
      Script.eval!(
42
        redis,
43
        :heartbeat_re_enqueue_backup,
44
        [
45
          JobQueue.backup_queue_key(namespace, node_id, queue),
46
          JobQueue.queue_key(namespace, queue),
47
          sorted_set_key(namespace)
48
        ],
49
        [node_id, current_score, 10]
50
      )
51

52
    case resp do
14✔
53
      {:ok, [remaining, moved]} ->
54
        if moved > 0 do
14✔
55
          Logger.info(
4✔
56
            "Re-enqueued #{moved} job(s) from backup for node_id [#{node_id}] and queue [#{queue}]"
4✔
57
          )
58
        end
59

60
        if remaining > 0 do
14✔
61
          re_enqueue_backup(redis, namespace, node_id, queue, current_score)
1✔
62
        end
63

UNCOV
64
      _ ->
×
65
        nil
66
    end
67
  end
68

69
  def dead_nodes(redis, namespace, interval, missed_heartbeats_allowed) do
70
    score = DateTime.to_unix(DateTime.utc_now(), :millisecond) / 1000
180✔
71
    cutoff = score - interval / 1000 * (missed_heartbeats_allowed + 1)
180✔
72
    cutoff = Enum.max([0, cutoff])
180✔
73

74
    with {:ok, results} <-
180✔
75
           Connection.zrangebyscorewithscore(redis, sorted_set_key(namespace), 0, cutoff) do
76
      {:ok,
77
       Enum.chunk_every(results, 2)
78
       |> Map.new(fn [k, v] -> {k, v} end)}
13✔
79
    end
80
  end
81

82
  defp sorted_set_key(namespace) do
83
    "#{namespace}:heartbeats"
1,005✔
84
  end
85
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc