• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akira / exq / 16bf7540e38e434e6ed4fadc1f40894960dd12db-PR-500

07 Aug 2025 11:50AM UTC coverage: 90.805% (-1.6%) from 92.382%
16bf7540e38e434e6ed4fadc1f40894960dd12db-PR-500

Pull #500

github

ananthakumaran
Run coveralls on one build only
Pull Request #500: Add ability to snooze job

15 of 15 new or added lines in 2 files covered. (100.0%)

18 existing lines in 13 files now uncovered.

1195 of 1316 relevant lines covered (90.81%)

706.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.6
/lib/exq/redis/job_queue.ex
1
defmodule Exq.Redis.JobQueue do
  @moduledoc """
  The JobQueue module is the main abstraction of a job queue on top of Redis.

  It provides functionality for:
    * Storing jobs in Redis
    * Fetching the next job(s) to be executed (and storing a backup of these).
    * Scheduling future jobs in Redis
    * Fetching scheduling jobs and moving them to current job list
    * Retrying or failing a job
    * Re-hydrating jobs from a backup queue

  """

  require Logger

  alias Exq.Redis.Connection
  alias Exq.Redis.Script
  alias Exq.Support.Job
  alias Exq.Support.Config
  alias Exq.Support.Time

  # Default page size used by the list/zset browsing functions (jobs/4,
  # scheduled_jobs/4, failed/3) when no :size option is given.
  @default_size 100

  @doc """
  Serializes and enqueues a job onto `queue` for immediate execution.

  Returns `{:ok, jid}` on success, or the error/conflict tuple produced by
  the underlying enqueue (e.g. `{:conflict, old_jid}` when a unique lock is
  already held, or `{:error, :timeout}` on a Redis timeout).
  """
  def enqueue(redis, namespace, queue, worker, args, options) do
    {jid, job, job_serialized} = to_job_serialized(queue, worker, args, options)

    case do_enqueue(redis, namespace, queue, job, job_serialized, unique_check: true) do
      :ok -> {:ok, jid}
      other -> other
    end
  end

  # Runs the :enqueue Lua script. Per the case clauses below, the script
  # replies 0 when the job was pushed, or [1, old_jid] when a unique lock
  # is held by a previously enqueued job. :exit (e.g. Redix timeout) is
  # caught at this boundary and normalized to {:error, :timeout}.
  defp do_enqueue(redis, namespace, queue, job, job_serialized, options \\ []) do
    try do
      [unlocks_in, unique_key] = unique_args(namespace, job, options)

      # The unique-lock key is only appended when uniqueness applies (nil otherwise).
      keys = keys_list([full_key(namespace, "queues"), queue_key(namespace, queue)], unique_key)

      response =
        Script.eval!(
          redis,
          :enqueue,
          keys,
          [queue, job_serialized, job.jid, unlocks_in]
        )

      case response do
        {:ok, 0} -> :ok
        {:ok, [1, old_jid]} -> {:conflict, old_jid}
        error -> error
      end
    catch
      :exit, e ->
        Logger.info("Error enqueueing -  #{Kernel.inspect(e)}")
        {:error, :timeout}
    end
  end

  @doc """
  Releases the unique locks for every raw (serialized) job in `raw_jobs`
  that carries a `unique_token`. Jobs without a token are skipped.
  """
  def unlock_jobs(redis, namespace, raw_jobs) do
    for job <- raw_jobs,
        unique_token = Job.decode(job).unique_token do
      unlock(redis, namespace, unique_token)
    end
  end

  @doc """
  Deletes the unique-lock key for `unique_token`, retrying the Redis
  connection up to 3 times on connection errors.
  """
  def unlock(redis, namespace, unique_token) do
    Connection.del!(redis, unique_key(namespace, unique_token), retry_on_connection_error: 3)
  end

  @doc """
  Schedules a job to run `offset` seconds from now (delegates to `enqueue_at/7`).
  """
  def enqueue_in(redis, namespace, queue, offset, worker, args, options)
      when is_integer(offset) do
    time = Time.offset_from_now(offset)
    enqueue_at(redis, namespace, queue, time, worker, args, options)
  end

  @doc """
  Schedules a job to run at `time` by placing it on the scheduled zset,
  with the unique-lock check enabled.
  """
  def enqueue_at(redis, namespace, queue, time, worker, args, options) do
    {jid, job, job_serialized} =
      to_job_serialized(queue, worker, args, options, Time.unix_seconds(time))

    do_enqueue_job_at(
      redis,
      namespace,
      job,
      job_serialized,
      jid,
      time,
      scheduled_queue_key(namespace),
      unique_check: true
    )
  end

  @doc """
  Adds `job_serialized` to the sorted set `scheduled_queue` scored by `time`
  via the :enqueue_at Lua script.

  Returns `{:ok, jid}` on success, `{:conflict, old_jid}` when a unique lock
  is already held, or `{:error, :timeout}` if the Redis call exits.
  """
  def do_enqueue_job_at(
        redis,
        namespace,
        job,
        job_serialized,
        jid,
        time,
        scheduled_queue,
        options \\ []
      ) do
    score = Time.time_to_score(time)

    try do
      [unlocks_in, unique_key] = unique_args(namespace, job, options)

      keys = keys_list([scheduled_queue], unique_key)

      response =
        Script.eval!(redis, :enqueue_at, keys, [
          job_serialized,
          score,
          jid,
          unlocks_in
        ])

      # Same script reply convention as do_enqueue/6: 0 = stored, [1, old_jid] = unique conflict.
      case response do
        {:ok, 0} -> {:ok, jid}
        {:ok, [1, old_jid]} -> {:conflict, old_jid}
        error -> error
      end
    catch
      :exit, e ->
        Logger.info("Error enqueueing -  #{Kernel.inspect(e)}")
        {:error, :timeout}
    end
  end

  @doc """
  Enqueues multiple jobs atomically through the :enqueue_all Lua script.

  Each element of `jobs` is a `[queue, worker, args, options]` list; an
  optional `options[:schedule]` of `{:at, time}` or `{:in, offset}` schedules
  the job instead of enqueueing it immediately (see
  `extract_enqueue_all_keys_and_args/2`).

  Returns `{:ok, results}` where each result is `{:ok, jid}` or
  `{:conflict, jid}` (per-job status 0 or 1 from the script), or
  `{:error, :timeout}` if the Redis call exits.
  """
  def enqueue_all(redis, namespace, jobs) do
    {keys, args} = extract_enqueue_all_keys_and_args(namespace, jobs)

    try do
      response =
        Script.eval!(
          redis,
          :enqueue_all,
          [scheduled_queue_key(namespace), full_key(namespace, "queues")] ++ keys,
          args
        )

      case response do
        {:ok, result} ->
          {
            :ok,
            Enum.map(result, fn [status, jid] ->
              case status do
                0 -> {:ok, jid}
                1 -> {:conflict, jid}
              end
            end)
          }

        error ->
          error
      end
    catch
      :exit, e ->
        Logger.info("Error enqueueing -  #{Kernel.inspect(e)}")
        {:error, :timeout}
    end
  end

  @doc """
  Dequeue jobs for available queues
  """
  def dequeue(redis, namespace, node_id, queues) when is_list(queues) do
    dequeue_multiple(redis, namespace, node_id, queues)
  end

  # No queues to poll: nothing to do.
  defp dequeue_multiple(_redis, _namespace, _node_id, []) do
    {:ok, {:none, nil}}
  end

  # Pops one job per queue in a single pipeline. RPOPLPUSH atomically moves
  # the popped job onto the per-node backup queue so it can be re-enqueued
  # if this node dies before acking (see re_enqueue_backup/4).
  defp dequeue_multiple(redis, namespace, node_id, queues) do
    deq_commands =
      Enum.map(queues, fn queue ->
        ["RPOPLPUSH", queue_key(namespace, queue), backup_queue_key(namespace, node_id, queue)]
      end)

    resp = Connection.qp(redis, deq_commands)

    case resp do
      {:error, reason} ->
        [{:error, reason}]

      {:ok, success} ->
        success
        |> Enum.zip(queues)
        |> Enum.map(fn {resp, queue} ->
          case resp do
            # :undefined and nil both mean the queue was empty.
            :undefined -> {:ok, {:none, queue}}
            nil -> {:ok, {:none, queue}}
            %Redix.Error{} = error -> {:error, {error, queue}}
            value -> {:ok, {value, queue}}
          end
        end)
    end
  end

  @doc """
  Moves jobs from the per-node backup queue back onto the live queue
  (re-hydration after a crash/restart).

  The :mlpop_rpush script moves up to 10 jobs per call and reports
  `[remaining, moved]`; the function recurses until the backup is drained.
  """
  def re_enqueue_backup(redis, namespace, node_id, queue) do
    resp =
      Script.eval!(
        redis,
        :mlpop_rpush,
        [backup_queue_key(namespace, node_id, queue), queue_key(namespace, queue)],
        [10]
      )

    case resp do
      {:ok, [remaining, moved]} ->
        if moved > 0 do
          Logger.info(
            "Re-enqueued #{moved} job(s) from backup for node_id [#{node_id}] and queue [#{queue}]"
          )
        end

        if remaining > 0 do
          re_enqueue_backup(redis, namespace, node_id, queue)
        end

      # Errors are swallowed deliberately; re-hydration is best-effort.
      _ ->
        nil
    end
  end

  @doc """
  Acks a job by removing its serialized form from the node's backup queue
  (removes at most 1 matching entry; retries connection errors 3 times).
  """
  def remove_job_from_backup(redis, namespace, node_id, queue, job_serialized) do
    Connection.lrem!(redis, backup_queue_key(namespace, node_id, queue), job_serialized, 1,
      retry_on_connection_error: 3
    )
  end

  @doc """
  Moves all jobs that are due now from the schedule/retry zsets onto their
  live queues.
  """
  def scheduler_dequeue(redis, namespace) do
    scheduler_dequeue(redis, namespace, Time.time_to_score())
  end

  @doc """
  Moves all jobs scored at or below `max_score` from the schedule and retry
  zsets onto their live queues. Returns the total number of jobs moved.
  """
  def scheduler_dequeue(redis, namespace, max_score) do
    schedule_queues(namespace)
    |> Enum.map(
      &do_scheduler_dequeue(redis, namespace, &1, max_score, Config.get(:scheduler_page_size), 0)
    )
    |> Enum.sum()
  end

  # Pages through the zset :scheduler_page_size jobs at a time; when a full
  # page was moved there may be more due jobs, so recurse with the running
  # count. On error, logs and contributes 0 to the sum.
  defp do_scheduler_dequeue(redis, namespace, queue, max_score, limit, acc) do
    case Script.eval!(redis, :scheduler_dequeue, [queue], [
           limit,
           max_score,
           full_key(namespace, "")
         ]) do
      {:ok, count} ->
        if count == limit do
          do_scheduler_dequeue(redis, namespace, queue, max_score, limit, count + acc)
        else
          count + acc
        end

      {:error, reason} ->
        Logger.warning(
          "Error dequeueing jobs from scheduler queue #{queue} - #{Kernel.inspect(reason)}"
        )

        0
    end
  end

  # Appends the optional unique-lock key to a non-empty KEYS list; nil means
  # no uniqueness check, so the list is returned unchanged.
  defp keys_list([_hd | _tl] = keys, nil), do: keys
  defp keys_list([_hd | _tl] = keys, key), do: keys ++ [key]

  @doc """
  Builds a namespaced Redis key; an empty or nil namespace yields the bare key.
  """
  def full_key("", key), do: key
  def full_key(nil, key), do: key

  def full_key(namespace, key) do
    "#{namespace}:#{key}"
  end

  @doc """
  Key of the live (list) queue for `queue`.
  """
  def queue_key(namespace, queue) do
    full_key(namespace, "queue:#{queue}")
  end

  @doc """
  Key of the unique-lock entry for `unique_token`.
  """
  def unique_key(namespace, unique_token) do
    full_key(namespace, "unique:#{unique_token}")
  end

  @doc """
  Key of the per-node backup queue used to guard in-flight jobs.
  """
  def backup_queue_key(namespace, node_id, queue) do
    full_key(namespace, "queue:backup::#{node_id}::#{queue}")
  end

  @doc """
  The two zsets the scheduler drains: scheduled jobs and retries.
  """
  def schedule_queues(namespace) do
    [scheduled_queue_key(namespace), retry_queue_key(namespace)]
  end

  def scheduled_queue_key(namespace) do
    full_key(namespace, "schedule")
  end

  def retry_queue_key(namespace) do
    full_key(namespace, "retry")
  end

  @doc """
  Key of the dead-set zset holding permanently failed jobs.
  """
  def failed_queue_key(namespace) do
    full_key(namespace, "dead")
  end

  @doc """
  Returns true when the job has exhausted its retries and should be moved
  to the dead set.

  A positive integer `retry` caps the attempts at that value; `retry: true`
  uses the configured `:max_retries`; anything else means no retries at all.
  """
  def dead?(job) do
    retry_count = (job.retry_count || 0) + 1

    case job do
      %{retry: retry} when is_integer(retry) and retry > 0 ->
        retry_count > retry

      %{retry: true} ->
        retry_count > get_max_retries()

      _ ->
        true
    end
  end

  @doc """
  Re-schedules `job` on the retry zset to run `offset` seconds from now,
  without counting the attempt against its retry budget (retry_count is
  left untouched; only the error message and failure timestamps change).
  """
  def snooze_job(redis, namespace, job, offset) do
    job =
      %{job | error_message: "Snoozed for #{offset} seconds"}
      |> add_failure_timestamp()

    time = Time.offset_from_now(offset)
    Logger.info("Queueing job #{job.jid} to retry in #{offset} seconds")

    {:ok, _jid} =
      do_enqueue_job_at(
        redis,
        namespace,
        job,
        Job.encode(job),
        job.jid,
        time,
        retry_queue_key(namespace)
      )
  end

  @doc """
  Retries `job` with an incremented retry count, or moves it to the dead
  set when its retries are exhausted (see `dead?/1`).
  """
  def retry_or_fail_job(redis, namespace, job, error) do
    if dead?(job) do
      Logger.info("Max retries on job #{job.jid} exceeded")
      fail_job(redis, namespace, job, error)
    else
      retry_count = (job.retry_count || 0) + 1
      retry_job(redis, namespace, job, retry_count, error)
    end
  end

  @doc """
  Places `job` on the retry zset with the given `retry_count` and `error`
  message, delayed by the configured backoff strategy.
  """
  def retry_job(redis, namespace, job, retry_count, error) do
    job =
      %{job | retry_count: retry_count, error_message: error}
      |> add_failure_timestamp()

    offset = Config.backoff().offset(job)
    time = Time.offset_from_now(offset)
    Logger.info("Queueing job #{job.jid} to retry in #{offset} seconds")

    {:ok, _jid} =
      do_enqueue_job_at(
        redis,
        namespace,
        job,
        Job.encode(job),
        job.jid,
        time,
        retry_queue_key(namespace)
      )
  end

  @doc """
  Manually retries a job from the retry zset: removes it there and enqueues
  it immediately on its original queue (no unique check).
  """
  def retry_job(redis, namespace, job) do
    remove_retry(redis, namespace, job.jid)
    :ok = do_enqueue(redis, namespace, job.queue, job, Job.encode(job))
  end

  @doc """
  Moves `job` to the dead set with `error` recorded, then trims the set by
  both age (`:dead_timeout_in_seconds`) and size (`:dead_max_jobs`).
  """
  def fail_job(redis, namespace, job, error) do
    job =
      %{
        job
        | retry_count: job.retry_count || 0,
          error_class: "ExqGenericError",
          error_message: error
      }
      |> add_failure_timestamp()

    job_serialized = Job.encode(job)
    key = failed_queue_key(namespace)

    now = Time.unix_seconds()

    commands = [
      ["ZADD", key, Time.time_to_score(), job_serialized],
      # Drop entries older than the configured dead timeout...
      ["ZREMRANGEBYSCORE", key, "-inf", now - Config.get(:dead_timeout_in_seconds)],
      # ...and keep at most :dead_max_jobs newest entries.
      ["ZREMRANGEBYRANK", key, 0, -Config.get(:dead_max_jobs) - 1]
    ]

    Connection.qp!(redis, commands, retry_on_connection_error: 3)
  end

  @doc """
  Returns `{queue, size}` pairs for every known queue.
  """
  def queue_size(redis, namespace) do
    queues = list_queues(redis, namespace)
    for q <- queues, do: {q, queue_size(redis, namespace, q)}
  end

  def queue_size(redis, namespace, :scheduled) do
    Connection.zcard!(redis, scheduled_queue_key(namespace))
  end

  def queue_size(redis, namespace, :retry) do
    Connection.zcard!(redis, retry_queue_key(namespace))
  end

  def queue_size(redis, namespace, queue) do
    Connection.llen!(redis, queue_key(namespace, queue))
  end

  @doc """
  Deletes the Redis key for `queue` under `namespace`.
  """
  def delete_queue(redis, namespace, queue) do
    Connection.del!(redis, full_key(namespace, queue))
  end

  @doc """
  Returns `{queue, jobs}` pairs for every known queue.
  """
  def jobs(redis, namespace) do
    queues = list_queues(redis, namespace)
    for q <- queues, do: {q, jobs(redis, namespace, q)}
  end

  @doc """
  Lists jobs on `queue`, paged via `:offset` (default 0) and `:size`
  (default #{@default_size}). Pass `raw: true` to skip decoding.
  """
  def jobs(redis, namespace, queue, options \\ []) do
    range_start = Keyword.get(options, :offset, 0)
    range_end = range_start + Keyword.get(options, :size, @default_size) - 1

    Connection.lrange!(redis, queue_key(namespace, queue), range_start, range_end)
    |> maybe_decode(options)
  end

  @doc """
  Lists jobs from a scheduled zset, paged via `:offset`/`:size`.
  With `score: true`, returns `{job, score}` tuples instead.
  """
  def scheduled_jobs(redis, namespace, queue, options \\ []) do
    if Keyword.get(options, :score, false) do
      scheduled_jobs_with_scores(redis, namespace, queue, options)
    else
      Connection.zrangebyscorewithlimit!(
        redis,
        full_key(namespace, queue),
        Keyword.get(options, :offset, 0),
        Keyword.get(options, :size, @default_size)
      )
      |> maybe_decode(options)
    end
  end

  @doc """
  Lists `{job, score}` tuples from a scheduled zset, paged via
  `:offset`/`:size`. Pass `raw: true` to skip decoding the job.
  """
  def scheduled_jobs_with_scores(redis, namespace, queue, options \\ []) do
    Connection.zrangebyscorewithscoreandlimit!(
      redis,
      full_key(namespace, queue),
      Keyword.get(options, :offset, 0),
      Keyword.get(options, :size, @default_size)
    )
    |> decode_zset_withscores(options)
  end

  @doc """
  Lists jobs from the dead set, newest first, paged via `:offset`/`:size`.
  With `score: true`, returns `{job, score}` tuples.
  """
  def failed(redis, namespace, options \\ []) do
    if Keyword.get(options, :score, false) do
      Connection.zrevrangebyscorewithscoreandlimit!(
        redis,
        failed_queue_key(namespace),
        Keyword.get(options, :offset, 0),
        Keyword.get(options, :size, @default_size)
      )
      |> decode_zset_withscores(options)
    else
      Connection.zrevrangebyscorewithlimit!(
        redis,
        failed_queue_key(namespace),
        Keyword.get(options, :offset, 0),
        Keyword.get(options, :size, @default_size)
      )
      |> maybe_decode(options)
    end
  end

  def retry_size(redis, namespace) do
    Connection.zcard!(redis, retry_queue_key(namespace))
  end

  def scheduled_size(redis, namespace) do
    Connection.zcard!(redis, scheduled_queue_key(namespace))
  end

  def failed_size(redis, namespace) do
    Connection.zcard!(redis, failed_queue_key(namespace))
  end

  @doc """
  Removes the given raw (serialized) jobs from a live queue.
  """
  def remove_enqueued_jobs(redis, namespace, queue, raw_jobs) do
    Connection.lrem!(redis, queue_key(namespace, queue), raw_jobs)
  end

  @doc """
  Finds the job with `jid` on `queue` and removes its serialized form.
  """
  def remove_job(redis, namespace, queue, jid) do
    {:ok, job} = find_job(redis, namespace, jid, queue, false)
    Connection.lrem!(redis, queue_key(namespace, queue), job)
  end

  @doc """
  Finds the job with `jid` on the retry zset and removes it.
  """
  def remove_retry(redis, namespace, jid) do
    {:ok, job} = find_job(redis, namespace, jid, :retry, false)
    Connection.zrem!(redis, retry_queue_key(namespace), job)
  end

  def remove_retry_jobs(redis, namespace, raw_jobs) do
    Connection.zrem!(redis, retry_queue_key(namespace), raw_jobs)
  end

  @doc """
  Moves the given raw jobs from the retry zset onto their live queues.
  """
  def dequeue_retry_jobs(redis, namespace, raw_jobs) do
    dequeue_scheduled_jobs(redis, namespace, retry_queue_key(namespace), raw_jobs)
  end

  @doc """
  Finds the job with `jid` on the scheduled zset and removes it.
  """
  def remove_scheduled(redis, namespace, jid) do
    {:ok, job} = find_job(redis, namespace, jid, :scheduled, false)
    Connection.zrem!(redis, scheduled_queue_key(namespace), job)
  end

  def remove_scheduled_jobs(redis, namespace, raw_jobs) do
    Connection.zrem!(redis, scheduled_queue_key(namespace), raw_jobs)
  end

  @doc """
  Moves the given raw jobs from the scheduled zset onto their live queues.
  """
  def dequeue_scheduled_jobs(redis, namespace, raw_jobs) do
    dequeue_scheduled_jobs(redis, namespace, scheduled_queue_key(namespace), raw_jobs)
  end

  def remove_failed_jobs(redis, namespace, raw_jobs) do
    Connection.zrem!(redis, failed_queue_key(namespace), raw_jobs)
  end

  @doc """
  Moves the given raw jobs from the dead set onto their live queues.
  """
  def dequeue_failed_jobs(redis, namespace, raw_jobs) do
    dequeue_scheduled_jobs(redis, namespace, failed_queue_key(namespace), raw_jobs)
  end

  @doc """
  Returns the set of queue names that have had jobs enqueued.
  """
  def list_queues(redis, namespace) do
    Connection.smembers!(redis, full_key(namespace, "queues"))
  end

  @doc """
  Find a current job by job id (but do not pop it)
  """
  def find_job(redis, namespace, jid, queue) do
    find_job(redis, namespace, jid, queue, true)
  end

  # When `convert` is true the result is a decoded Job struct; when false,
  # the raw serialized string (useful for subsequent LREM/ZREM calls).
  def find_job(redis, namespace, jid, :scheduled, convert) do
    redis
    |> Connection.zrangebyscore!(scheduled_queue_key(namespace))
    |> search_jobs(jid, convert)
  end

  def find_job(redis, namespace, jid, :retry, convert) do
    redis
    |> Connection.zrangebyscore!(retry_queue_key(namespace))
    |> search_jobs(jid, convert)
  end

  def find_job(redis, namespace, jid, queue, convert) do
    redis
    |> Connection.lrange!(queue_key(namespace, queue))
    |> search_jobs(jid, convert)
  end

  def search_jobs(jobs_serialized, jid) do
    search_jobs(jobs_serialized, jid, true)
  end

  @doc """
  Scans a list of serialized jobs for `jid`; returns `{:ok, job}` (decoded
  when the third argument is true, raw string otherwise) or `{:ok, nil}`
  when not found.
  """
  def search_jobs(jobs_serialized, jid, true) do
    found_job =
      jobs_serialized
      |> Enum.map(&Job.decode/1)
      |> Enum.find(fn job -> job.jid == jid end)

    {:ok, found_job}
  end

  def search_jobs(jobs_serialized, jid, false) do
    found_job =
      jobs_serialized
      |> Enum.find(fn job_serialized ->
        job = Job.decode(job_serialized)
        job.jid == jid
      end)

    {:ok, found_job}
  end

  def to_job_serialized(queue, worker, args, options) do
    to_job_serialized(queue, worker, args, options, Time.unix_seconds())
  end

  # Atom workers are converted to strings...
  def to_job_serialized(queue, worker, args, options, enqueued_at) when is_atom(worker) do
    to_job_serialized(queue, to_string(worker), args, options, enqueued_at)
  end

  # ...and the "Elixir." prefix is stripped for Sidekiq-style class names.
  def to_job_serialized(queue, "Elixir." <> worker, args, options, enqueued_at) do
    to_job_serialized(queue, worker, args, options, enqueued_at)
  end

  @doc """
  Builds the job map and its serialized form.

  Options: `:jid` (default: a new UUID), `:max_retries` (default: the
  configured `:max_retries`), plus the uniqueness options consumed by
  `add_unique_attributes/2`. Returns `{jid, job_map, serialized}`.
  """
  def to_job_serialized(queue, worker, args, options, enqueued_at) do
    jid = Keyword.get_lazy(options, :jid, fn -> UUID.uuid4() end)
    retry = Keyword.get_lazy(options, :max_retries, fn -> get_max_retries() end)

    job =
      %{
        queue: queue,
        retry: retry,
        class: worker,
        args: args,
        jid: jid,
        enqueued_at: enqueued_at
      }
      |> add_unique_attributes(options)

    {jid, job, Config.serializer().encode!(job)}
  end

  # Removes the given raw jobs from `queue_key` and pushes them onto their
  # live queues via the :scheduler_dequeue_jobs script.
  defp dequeue_scheduled_jobs(redis, namespace, queue_key, raw_jobs) do
    Script.eval!(redis, :scheduler_dequeue_jobs, [queue_key, full_key(namespace, "")], raw_jobs)
  end

  defp get_max_retries do
    :max_retries
    |> Config.get()
    |> Exq.Support.Coercion.to_integer()
  end

  # Stamps retried_at on every call; failed_at only on the first failure.
  defp add_failure_timestamp(job) do
    timestamp = Time.unix_seconds()

    job =
      if !job.failed_at do
        %{job | failed_at: timestamp}
      else
        job
      end

    %{job | retried_at: timestamp}
  end

  # Turns a flat [job, score, job, score, ...] zset reply into
  # {job, score} tuples, decoding the job unless `raw: true`.
  defp decode_zset_withscores(list, options) do
    raw? = Keyword.get(options, :raw, false)

    Enum.chunk_every(list, 2)
    |> Enum.map(fn [job, score] ->
      if raw? do
        {job, score}
      else
        {Job.decode(job), score}
      end
    end)
  end

  # Decodes each serialized job unless the caller asked for `raw: true`.
  defp maybe_decode(list, options) do
    if Keyword.get(options, :raw, false) do
      list
    else
      Enum.map(list, &Job.decode/1)
    end
  end

  # When :unique_for is set, attaches the uniqueness attributes to the job:
  # the lock token (explicit :unique_token, or derived from queue, class and
  # hashed args), :unique_until (defaults to :success), and the absolute
  # :unlocks_at timestamp.
  defp add_unique_attributes(job, options) do
    unique_for = Keyword.get(options, :unique_for, nil)

    if unique_for do
      unique_token =
        Keyword.get_lazy(options, :unique_token, fn ->
          string =
            Enum.join([job.queue, job.class] ++ Enum.map(job.args, &:erlang.phash2(&1)), ":")

          :crypto.hash(:sha256, string) |> Base.encode64()
        end)

      Map.put(job, :unique_for, unique_for)
      |> Map.put(:unique_until, to_string(Keyword.get(options, :unique_until, :success)))
      |> Map.put(:unique_token, unique_token)
      |> Map.put(:unlocks_at, job.enqueued_at + unique_for)
    else
      job
    end
  end

  # Computes the [unlocks_in_ms, unique_key] argument pair for the enqueue
  # scripts. Returns [nil, nil] when no unique check was requested or the
  # job carries no uniqueness attributes; unlocks_in is clamped at 0.
  defp unique_args(namespace, job, options) do
    if Keyword.get(options, :unique_check, false) do
      case job do
        %{unlocks_at: unlocks_at, unique_token: unique_token} ->
          unlocks_in = Enum.max([trunc((unlocks_at - Time.unix_seconds()) * 1000), 0])
          [unlocks_in, unique_key(namespace, unique_token)]

        _ ->
          [nil, nil]
      end
    else
      [nil, nil]
    end
  end

  # Returns
  # {
  #   [
  #     job_1_unique_key, job_1_queue_key,
  #     job_2_unique_key, job_2_queue_key,
  #     ...
  #   ],
  #   [
  #     job_1_jid, job_1_queue, job_1_score, job_1_job_serialized, job_1_unlocks_in,
  #     job_2_jid, job_2_queue, job_2_score, job_2_job_serialized, job_2_unlocks_in,
  #     ...
  #   ]
  # }
  defp extract_enqueue_all_keys_and_args(namespace, jobs) do
    {keys, job_attrs} =
      Enum.reduce(jobs, {[], []}, fn job, {keys_acc, job_attrs_acc} ->
        [queue, worker, args, options] = job

        # A score of "0" means "due immediately" to the enqueue_all script —
        # NOTE(review): inferred from the fallback clause; confirm against
        # the Lua script's handling of score 0.
        {score, enqueued_at} =
          case options[:schedule] do
            {:at, at_time} ->
              {Time.time_to_score(at_time), Time.unix_seconds(at_time)}

            {:in, offset} ->
              at_time = Time.offset_from_now(offset)
              {Time.time_to_score(at_time), Time.unix_seconds(at_time)}

            _ ->
              {"0", Time.unix_seconds()}
          end

        {jid, job_data, job_serialized} =
          to_job_serialized(queue, worker, args, options, enqueued_at)

        [unlocks_in, unique_key] = unique_args(namespace, job_data, unique_check: true)

        # accumulating in reverse order for efficiency
        {
          [queue_key(namespace, queue), unique_key] ++ keys_acc,
          [unlocks_in, job_serialized, score, queue, jid] ++ job_attrs_acc
        }
      end)

    {Enum.reverse(keys), Enum.reverse(job_attrs)}
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc