akira / exq / 16bf7540e38e434e6ed4fadc1f40894960dd12db-PR-500

07 Aug 2025 11:50AM UTC coverage: 90.805% (-1.6%) from 92.382%

Pull Request #500: Add ability to snooze job
Commit 16bf7540e38e434e6ed4fadc1f40894960dd12db (ananthakumaran: "Run coveralls on one build only", via github)

15 of 15 new or added lines in 2 files covered. (100.0%)
18 existing lines in 13 files now uncovered.
1195 of 1316 relevant lines covered (90.81%)
706.93 hits per line

Source File: /lib/exq/manager/server.ex (88.0% covered)

defmodule Exq.Manager.Server do
  @moduledoc """
  The Manager module is the main orchestrator for the system.

  It is also the entry point Pid process used by the client to interact
  with the Exq system.

  Its responsibilities include:
    * Handle interaction with the client and delegate to the responsible sub-system.
    * Initial setup of the Redis connection (to be moved to supervisor?).
    * Setup and tracking of in-progress workers / jobs.
    * Poll Redis for new jobs for any queues that have available workers.
    * Handling of queue state and subscriptions (addition and removal; see the runtime example below).
    * Initial re-hydration of the backup queue on system restart to handle any
      orphan jobs from the last system stop.

  The Manager is a GenServer with a timed process loop.
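
  As a rough illustration, queues can also be subscribed to and unsubscribed from at
  runtime through the public Exq API (the queue name and concurrency below are examples):

      Exq.subscribe(Exq, "new_queue", 10)
      Exq.unsubscribe(Exq, "new_queue")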

  ## Options
    * `:concurrency` - Default max number of workers to use if not passed in for each queue.
    * `:genserver_timeout` - Timeout to use for GenServer calls.
    * `:max_retries` - Maximum number of times to retry a failed job.
    * `:name` - Name of the target registered process.
    * `:namespace` - Redis namespace to store all data under. Defaults to "exq".
    * `:queues` - List of queues to monitor. Can be an array of queue names such as ["q1", "q2"], or an
      array of tuples with queue name and max number of concurrent workers: [{"q1", 1}, {"q2", 20}].
      If only an array of names is passed in, the system will use the default `concurrency` value for each queue.
    * `:redis_timeout` - Timeout to use for Redis commands.
    * `:poll_timeout` - How often to poll Redis for jobs.
    * `:scheduler_enable` - Whether the scheduler / retry process should be enabled. Defaults
      to true. Note that if you turn this off, job retries will not be enqueued.
    * `:scheduler_poll_timeout` - How often to poll Redis for scheduled / retry jobs.
    * `:shutdown_timeout` - The number of milliseconds to wait for workers to finish processing jobs
      when the application is shutting down.
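
  As a rough illustration only (values are examples; see the Exq README for the
  authoritative list), these options are usually supplied via application config:

      # config/config.exs (illustrative values)
      config :exq,
        name: Exq,
        namespace: "exq",
        concurrency: 100,
        queues: [{"q1", 1}, {"q2", 20}],
        poll_timeout: 50,
        scheduler_enable: true,
        scheduler_poll_timeout: 200,
        max_retries: 25,
        shutdown_timeout: 5000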

  ## Redis Options (TODO - move to supervisor after refactor):
    * `:host` - Host name for Redis server (defaults to '127.0.0.1')
    * `:port` - Redis port (defaults to 6379)
    * `:database` - Redis Database number (used for isolation. Defaults to 0).
    * `:password` - Redis authentication password (optional, off by default).
    * `:redis_options` - Additional options provided to Redix
    * TODO: What about max_reconnection_attempts
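
  These too are normally supplied via application config; an illustrative sketch in the
  same config.exs style as above (values are examples):

      config :exq,
        host: "127.0.0.1",
        port: 6379,
        database: 0,
        password: nil,
        redis_options: []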

  ## Job lifecycle

  The job lifecycle starts with the enqueueing of a job. This can be done either
  via Exq or another system like Sidekiq / Resque.

  Note that the JobQueue encapsulates much of this logic.

  Client (Exq) -> Manager -> Enqueuer

  Assuming Exq is used to enqueue an immediate job, the flow is as follows:

    1. Client calls Exq.enqueue(Exq, "queue_name", Worker, ["arg1", "arg2"])
       (an example call is shown after this list).

    2. Manager delegates to the Enqueuer.

    3. The Enqueuer does the following:
      * Adds the queue to the "queues" list if not already there.
      * Prepares a job struct with a generated UUID and converts it to JSON.
      * Pushes the job into the correct queue.
      * Responds to the client with the generated job UUID.

  At this point the job is in the correct queue, ready to be dequeued.
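
  For example (`MyWorker` below is a hypothetical worker module; the generated job UUID
  is returned to the caller as described in step 3):

      {:ok, jid} = Exq.enqueue(Exq, "default", MyWorker, ["arg1", "arg2"])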

  Manager deq Redis -> Worker (decode & execute job) --> Manager (record)
                                                     |
                                                     --> Stats (record stats)

  The dequeueing of a job is as follows:
    1. The Manager is on a polling cycle, and the :timeout message fires.

    2. The Manager tabulates a list of active queues with available workers.

    3. It uses the JobQueue module to fetch jobs. The JobQueue module does this through
       a single MULTI RPOPLPUSH command issued to Redis with the targeted queue
       (a conceptual sketch follows this list).

       This command atomically pops an item off the queue and stores the item in a backup queue.
       The backup queue is keyed off the queue and node id, so each node has
       its own backup queue.

       Note that we cannot use a blocking pop, since BRPOPLPUSH (unlike BRPOP) is more
       limited and can only handle a single queue target (see filed issues in Redis / Sidekiq).

    4. Once the jobs are returned to the Manager, the Manager goes through each job
       and creates and kicks off an ephemeral Worker process that will handle the job.
       The Manager also does some tabulation to reduce the worker count for those queues.

    5. The Worker parses the JSON object and figures out the worker module to call.
       It also tells Stats to record the job as in progress.

    6. The Worker then calls "apply" on the correct target module, and tracks the failure
       or success of the job. Once the job is finished, it tells the Manager and Stats.

    7. If the job is successful, the Manager and Stats simply mark the success of the job.

       If the job fails, the Worker module uses the JobQueue module to retry the job if necessary.
       The retry is done by adding the job to a "retry" queue, which is a Sorted Set in Redis.
       The job is marked with the retry count and scheduled date (using exponential backoff).
       The job is then removed from the backup queue.

    8. If any jobs were fetched from Redis, the Manager will poll again immediately; otherwise
       it will use the poll_timeout for the next polling.
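
  As a rough conceptual sketch of step 3 (the key names here are illustrative only and are
  not Exq's exact Redis key schema; `conn` and `node_id` are assumed bindings):

      # Atomically move one job from the queue into this node's backup list
      Redix.command(conn, ["RPOPLPUSH", "exq:queue:q1", "exq:backup:" <> node_id <> ":q1"])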

  ## Retry / Schedule queue

  The retry / schedule queue provides functionality for scheduled jobs. It is used both
  by the `enqueue_in` method, which allows scheduling a job in the future (see the example
  below), and by the retry queue, which is used to retry jobs.
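
  For example (`MyWorker` is again a hypothetical worker module):

      # Schedule a job to run in 300 seconds
      {:ok, jid} = Exq.enqueue_in(Exq, "default", 300, MyWorker, ["arg1"])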
  """

  require Logger
  use GenServer
  alias Exq.Support.Config
  alias Exq.Support.Opts
  alias Exq.Redis.JobQueue
  alias Exq.Support.Redis

  @backoff_mult 10

  defmodule State do
    defstruct redis: nil,
              stats: nil,
              enqueuer: nil,
              pid: nil,
              node_id: nil,
              namespace: nil,
              dequeuers: nil,
              queues: nil,
              poll_timeout: nil,
              scheduler_poll_timeout: nil,
              workers_sup: nil,
              middleware: nil,
              metadata: nil,
              shutdown_timeout: nil
  end

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: server_name(opts[:name]))
  end

  def job_terminated(exq, queue, success) do
    GenServer.cast(exq, {:job_terminated, queue, success})
    :ok
  end

  def server_name(nil), do: Config.get(:name)
  def server_name(name), do: name

  ## ===========================================================
  ## gen server callbacks
  ## ===========================================================

  def init(opts) do
    # Cleanup stale stats
    GenServer.cast(self(), :cleanup_host_stats)

    # Set up dequeuers
    dequeuers = add_dequeuers(%{}, opts[:concurrency])

    state = %State{
      dequeuers: dequeuers,
      redis: opts[:redis],
      stats: opts[:stats],
      workers_sup: opts[:workers_sup],
      enqueuer: opts[:enqueuer],
      middleware: opts[:middleware],
      metadata: opts[:metadata],
      node_id: Config.node_identifier().node_id(),
      namespace: opts[:namespace],
      queues: opts[:queues],
      pid: self(),
      poll_timeout: opts[:poll_timeout],
      scheduler_poll_timeout: opts[:scheduler_poll_timeout],
      shutdown_timeout: opts[:shutdown_timeout]
    }

    check_redis_connection(opts)
    # Timeout of 0 triggers an immediate :timeout message, kicking off the first poll
    {:ok, state, 0}
  end

  def handle_call(:redis, _from, state) do
    {:reply, {state.redis, state.namespace}, state, 10}
  end

  def handle_call(:subscriptions, _from, state) do
    {:reply, {:ok, state.queues}, state, 0}
  end

  def handle_call({:subscribe, queue}, _from, state) do
    updated_state = add_queue(state, queue)
    {:reply, :ok, updated_state, 0}
  end

  def handle_call({:subscribe, queue, concurrency}, _from, state) do
    updated_state = add_queue(state, queue, concurrency)
    {:reply, :ok, updated_state, 0}
  end

  def handle_call({:unsubscribe, queue}, _from, state) do
    updated_state = remove_queue(state, queue)
    {:reply, :ok, updated_state, 0}
  end

  def handle_call(:unsubscribe_all, _from, state) do
    updated_state = remove_all_queues(state)
    {:reply, :ok, updated_state, 0}
  end

  def handle_cast({:re_enqueue_backup, queue}, state) do
    Redis.rescue_timeout(fn ->
      JobQueue.re_enqueue_backup(state.redis, state.namespace, state.node_id, queue)
    end)

    {:noreply, state, 0}
  end

  @doc """
  Cleanup host stats on boot
  """
  def handle_cast(:cleanup_host_stats, state) do
    Redis.rescue_timeout(fn ->
      Exq.Stats.Server.cleanup_host_stats(state.stats, state.namespace, state.node_id)
    end)

    {:noreply, state, 0}
  end

  def handle_cast({:job_terminated, queue, success}, state) do
    dequeuers =
      if success do
        maybe_call_dequeuer(state.dequeuers, queue, :processed)
      else
        maybe_call_dequeuer(state.dequeuers, queue, :failed)
      end

    {:noreply, %{state | dequeuers: dequeuers}, 0}
  end

  def handle_info(:timeout, state) do
    {updated_state, timeout} = dequeue_and_dispatch(state)
    {:noreply, updated_state, timeout}
  end

  def handle_info(_info, state) do
    {:noreply, state, state.poll_timeout}
  end

  def terminate(_reason, _state) do
    :ok
  end

  ## ===========================================================
  ## Internal Functions
  ## ===========================================================
  @doc """
  Dequeue jobs and dispatch to workers
  """
  def dequeue_and_dispatch(state) do
    case available_queues(state) do
      {[], state} ->
        {state, state.poll_timeout}

      {queues, state} ->
        result =
          Redis.rescue_timeout(
            fn ->
              Exq.Redis.JobQueue.dequeue(state.redis, state.namespace, state.node_id, queues)
            end,
            timeout_return_value: :timeout
          )

        case result do
          :timeout ->
            {state, state.poll_timeout}

          jobs ->
            {state, job_results} =
              Enum.reduce(jobs, {state, []}, fn potential_job, {state, results} ->
                {state, result} = dispatch_job(state, potential_job)
                {state, [result | results]}
              end)

            cond do
              Enum.any?(job_results, fn status -> elem(status, 1) == :dispatch end) ->
                {state, 0}

              Enum.any?(job_results, fn status -> elem(status, 0) == :error end) ->
                Logger.error("Redis Error #{Kernel.inspect(job_results)}. Backing off...")
                {state, state.poll_timeout * @backoff_mult}

              true ->
                {state, state.poll_timeout}
            end
        end
    end
  end

  @doc """
  Returns list of active queues with free workers
  """
  def available_queues(state) do
    Enum.reduce(state.queues, {[], state}, fn q, {queues, state} ->
      {available, dequeuers} =
        Map.get_and_update!(state.dequeuers, q, fn {module, state} ->
          {:ok, available, state} = module.available?(state)
          {available, {module, state}}
        end)

      state = %{state | dequeuers: dequeuers}

      if available do
        {[q | queues], state}
      else
        {queues, state}
      end
    end)
  end

  @doc """
  Dispatch job to worker if it is not empty
  Also update worker count for dispatched job
  """
  def dispatch_job(state, potential_job) do
    case potential_job do
      {:ok, {:none, _queue}} ->
        {state, {:ok, :none}}

      {:ok, {job, queue}} ->
        state = dispatch_job(state, job, queue)
        {state, {:ok, :dispatch}}

      {status, reason} ->
        {state, {:error, {status, reason}}}
    end
  end

  def dispatch_job(state, job, queue) do
    {:ok, worker} =
      Exq.Worker.Supervisor.start_child(
        state.workers_sup,
        [
          job,
          state.pid,
          queue,
          state.stats,
          state.namespace,
          state.node_id,
          state.redis,
          state.middleware,
          state.metadata
        ],
        shutdown_timeout: state.shutdown_timeout
      )

    Exq.Worker.Server.work(worker)
    %{state | dequeuers: maybe_call_dequeuer(state.dequeuers, queue, :dispatched)}
  end

  # Set up dequeuers from options / configs.

  # The following is done:
  #  * Sets up the queues data structure with proper concurrency settings
  #  * Sets up an :ets table for tracking workers
  #  * Re-enqueues any in-progress jobs in the queues that were not finished
  #  * Returns list of queues and work table
  # TODO: Refactor the way queues are set up

  defp add_dequeuers(dequeuers, specs) do
    Enum.into(specs, dequeuers, fn {queue, {module, opts}} ->
      GenServer.cast(self(), {:re_enqueue_backup, queue})
      {:ok, state} = module.init(%{queue: queue}, opts)
      {queue, {module, state}}
    end)
  end

  defp remove_dequeuers(dequeuers, queues) do
    Enum.reduce(queues, dequeuers, fn queue, dequeuers ->
      maybe_call_dequeuer(dequeuers, queue, :stop)
      |> Map.delete(queue)
    end)
  end

  defp maybe_call_dequeuer(dequeuers, queue, method) do
    if Map.has_key?(dequeuers, queue) do
      Map.update!(dequeuers, queue, fn {module, state} ->
        case apply(module, method, [state]) do
          {:ok, state} -> {module, state}
          :ok -> {module, nil}
        end
      end)
    else
      dequeuers
    end
  end

  defp add_queue(state, queue, concurrency \\ Config.get(:concurrency)) do
    queue_concurrency = {queue, Opts.cast_concurrency(concurrency)}

    %{
      state
      | queues: [queue | state.queues],
        dequeuers: add_dequeuers(state.dequeuers, [queue_concurrency])
    }
  end

  defp remove_queue(state, queue) do
    updated_queues = List.delete(state.queues, queue)
    %{state | queues: updated_queues, dequeuers: remove_dequeuers(state.dequeuers, [queue])}
  end

  defp remove_all_queues(state) do
    %{state | queues: [], dequeuers: remove_dequeuers(state.dequeuers, state.queues)}
  end

  # Check Redis connection using PING and raise exception with
  # user friendly error message if Redis is down.
  defp check_redis_connection(opts) do
    try do
      {:ok, _} = Exq.Redis.Connection.q(opts[:redis], ~w(PING))

      if Keyword.get(opts, :heartbeat_enable, false) do
        :ok =
          Exq.Redis.Heartbeat.register(
            opts[:redis],
            opts[:namespace],
            Config.node_identifier().node_id()
          )
      else
        :ok
      end
    catch
      err, reason ->
        opts = Exq.Support.Opts.redis_inspect_opts(opts)

        raise """
        \n\n\n#{String.duplicate("=", 100)}
        ERROR! Could not connect to Redis!

        Configuration passed in: #{opts}
        Error: #{inspect(err)}
        Reason: #{inspect(reason)}

        Make sure Redis is running, and your configuration matches Redis settings.
        #{String.duplicate("=", 100)}
        """
    end
  end
end