• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akira / exq / a6b020b0543299a4d61fbcdaa14cbd0e58ea393e

24 Aug 2025 03:59AM UTC coverage: 90.729% (-1.7%) from 92.382%
a6b020b0543299a4d61fbcdaa14cbd0e58ea393e

push

github

ananthakumaran
bump version

1194 of 1316 relevant lines covered (90.73%)

706.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.0
/lib/exq/node/server.ex
1
defmodule Exq.Node.Server do
2
  use GenServer
3
  require Logger
4
  alias Exq.Support.Config
5
  alias Exq.Support.Time
6
  alias Exq.Redis.JobStat
7
  alias Exq.Support.Node
8

9
  defmodule State do
10
    defstruct [
11
      :node,
12
      :interval,
13
      :namespace,
14
      :redis,
15
      :node_id,
16
      :manager,
17
      :workers_sup,
18
      ping_count: 0
19
    ]
20
  end
21

22
  def start_link(options) do
23
    node_id = Keyword.get(options, :node_id, Config.node_identifier().node_id())
120✔
24

25
    GenServer.start_link(
120✔
26
      __MODULE__,
27
      %State{
28
        manager: Keyword.fetch!(options, :manager),
29
        workers_sup: Keyword.fetch!(options, :workers_sup),
30
        node_id: node_id,
31
        node: build_node(node_id),
32
        namespace: Keyword.fetch!(options, :namespace),
33
        redis: Keyword.fetch!(options, :redis),
34
        interval: 5000
35
      },
36
      []
37
    )
38
  end
39

40
  def init(state) do
41
    :ok = schedule_ping(state.interval)
120✔
42
    {:ok, state}
43
  end
44

45
  def handle_info(
46
        :ping,
47
        %{
48
          node: node,
49
          namespace: namespace,
50
          redis: redis,
51
          manager: manager,
52
          workers_sup: workers_sup
53
        } = state
54
      ) do
55
    {:ok, queues} = Exq.subscriptions(manager)
×
56
    busy = Exq.Worker.Supervisor.workers_count(workers_sup)
×
57
    node = %{node | queues: queues, busy: busy, quiet: Enum.empty?(queues)}
×
58

59
    :ok =
×
60
      JobStat.node_ping(redis, namespace, node)
61
      |> process_signal(state)
62

63
    if Integer.mod(state.ping_count, 10) == 0 do
×
64
      JobStat.prune_dead_nodes(redis, namespace)
×
65
    end
66

67
    :ok = schedule_ping(state.interval)
×
68
    {:noreply, %{state | ping_count: state.ping_count + 1}}
×
69
  end
70

71
  def handle_info(msg, state) do
72
    Logger.error("Received unexpected info message in #{__MODULE__} #{inspect(msg)}")
×
73
    {:noreply, state}
74
  end
75

76
  defp process_signal(nil, _), do: :ok
×
77

78
  defp process_signal("TSTP", state) do
79
    Logger.info("Received TSTP, unsubscribing from all queues")
×
80
    :ok = Exq.unsubscribe_all(state.manager)
×
81
  end
82

83
  defp process_signal(unknown, _) do
84
    Logger.warning("Received unsupported signal #{unknown}")
×
85
    :ok
86
  end
87

88
  defp schedule_ping(interval) do
89
    _reference = Process.send_after(self(), :ping, interval)
120✔
90
    :ok
91
  end
92

93
  defp build_node(node_id) do
94
    {:ok, hostname} = :inet.gethostname()
120✔
95

96
    %Node{
120✔
97
      hostname: to_string(hostname),
120✔
98
      started_at: Time.unix_seconds(),
99
      pid: List.to_string(:os.getpid()),
100
      identity: node_id
101
    }
102
  end
103
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc