• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akira / exq / 16bf7540e38e434e6ed4fadc1f40894960dd12db-PR-500

07 Aug 2025 11:50AM UTC coverage: 90.805% (-1.6%) from 92.382%
16bf7540e38e434e6ed4fadc1f40894960dd12db-PR-500

Pull #500

github

ananthakumaran
Run coveralls on one build only
Pull Request #500: Add ability to snooze job

15 of 15 new or added lines in 2 files covered. (100.0%)

18 existing lines in 13 files now uncovered.

1195 of 1316 relevant lines covered (90.81%)

706.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.0
/lib/exq/node/server.ex
1
defmodule Exq.Node.Server do
2
  use GenServer
3
  require Logger
4
  alias Exq.Support.Config
5
  alias Exq.Support.Time
6
  alias Exq.Redis.JobStat
7
  alias Exq.Support.Node
8

9
  defmodule State do
10
    defstruct [
11
      :node,
12
      :interval,
13
      :namespace,
14
      :redis,
15
      :node_id,
16
      :manager,
17
      :workers_sup,
18
      ping_count: 0
19
    ]
20
  end
21

22
  def start_link(options) do
23
    node_id = Keyword.get(options, :node_id, Config.node_identifier().node_id())
121✔
24

25
    GenServer.start_link(
121✔
26
      __MODULE__,
27
      %State{
28
        manager: Keyword.fetch!(options, :manager),
29
        workers_sup: Keyword.fetch!(options, :workers_sup),
30
        node_id: node_id,
31
        node: build_node(node_id),
32
        namespace: Keyword.fetch!(options, :namespace),
33
        redis: Keyword.fetch!(options, :redis),
34
        interval: 5000
35
      },
36
      []
37
    )
38
  end
39

40
  def init(state) do
41
    :ok = schedule_ping(state.interval)
121✔
42
    {:ok, state}
43
  end
44

45
  def handle_info(
46
        :ping,
47
        %{
48
          node: node,
49
          namespace: namespace,
50
          redis: redis,
51
          manager: manager,
52
          workers_sup: workers_sup
53
        } = state
54
      ) do
55
    {:ok, queues} = Exq.subscriptions(manager)
×
56
    busy = Exq.Worker.Supervisor.workers_count(workers_sup)
×
57
    node = %{node | queues: queues, busy: busy, quiet: Enum.empty?(queues)}
×
58

59
    :ok =
×
60
      JobStat.node_ping(redis, namespace, node)
61
      |> process_signal(state)
62

63
    if Integer.mod(state.ping_count, 10) == 0 do
×
64
      JobStat.prune_dead_nodes(redis, namespace)
×
65
    end
66

67
    :ok = schedule_ping(state.interval)
×
68
    {:noreply, %{state | ping_count: state.ping_count + 1}}
×
69
  end
70

71
  def handle_info(msg, state) do
72
    Logger.error("Received unexpected info message in #{__MODULE__} #{inspect(msg)}")
×
73
    {:noreply, state}
74
  end
75

UNCOV
76
  defp process_signal(nil, _), do: :ok
×
77

78
  defp process_signal("TSTP", state) do
79
    Logger.info("Received TSTP, unsubscribing from all queues")
×
80
    :ok = Exq.unsubscribe_all(state.manager)
×
81
  end
82

83
  defp process_signal(unknown, _) do
84
    Logger.warning("Received unsupported signal #{unknown}")
×
85
    :ok
86
  end
87

88
  defp schedule_ping(interval) do
89
    _reference = Process.send_after(self(), :ping, interval)
121✔
90
    :ok
91
  end
92

93
  defp build_node(node_id) do
94
    {:ok, hostname} = :inet.gethostname()
121✔
95

96
    %Node{
121✔
97
      hostname: to_string(hostname),
121✔
98
      started_at: Time.unix_seconds(),
99
      pid: List.to_string(:os.getpid()),
100
      identity: node_id
101
    }
102
  end
103
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc