• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akira / exq / 451629f84b9baf30739069c96facb5ab3f6407b8-PR-483

pending completion
451629f84b9baf30739069c96facb5ab3f6407b8-PR-483

Pull #483

github

ananthakumaran
Pull Request #483: Add enqueue_all for enqueuing a batch of jobs atomically

48 of 48 new or added lines in 4 files covered. (100.0%)

1169 of 1291 relevant lines covered (90.55%)

704.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.33
/lib/exq/redis/job_queue.ex
1
defmodule Exq.Redis.JobQueue do
  @moduledoc """
  The JobQueue module is the main abstraction of a job queue on top of Redis.

  It provides functionality for:
    * Storing jobs in Redis
    * Fetching the next job(s) to be executed (and storing a backup of these).
    * Scheduling future jobs in Redis
    * Fetching scheduling jobs and moving them to current job list
    * Retrying or failing a job
    * Re-hydrating jobs from a backup queue

  """

  require Logger

  alias Exq.Redis.Connection
  alias Exq.Redis.Script
  alias Exq.Support.Job
  alias Exq.Support.Config
  alias Exq.Support.Time

  # Default page size when listing jobs from a queue or sorted set.
  @default_size 100

  @doc """
  Enqueue a job onto `queue`.

  Returns `{:ok, jid}` on success, `{:conflict, old_jid}` when a uniqueness
  lock is already held for an equivalent job, or an error tuple.
  """
  def enqueue(redis, namespace, queue, worker, args, options) do
    {jid, job, job_serialized} = to_job_serialized(queue, worker, args, options)

    case do_enqueue(redis, namespace, queue, job, job_serialized, unique_check: true) do
      :ok -> {:ok, jid}
      other -> other
    end
  end

  # Atomically push a serialized job using the :enqueue Lua script
  # (registers the queue in the "queues" set and honors the unique lock).
  # Returns :ok, {:conflict, old_jid} or {:error, reason}.
  defp do_enqueue(redis, namespace, queue, job, job_serialized, options \\ []) do
    try do
      [unlocks_in, unique_key] = unique_args(namespace, job, options)

      keys = keys_list([full_key(namespace, "queues"), queue_key(namespace, queue)], unique_key)

      response =
        Script.eval!(
          redis,
          :enqueue,
          keys,
          [queue, job_serialized, job.jid, unlocks_in]
        )

      case response do
        {:ok, 0} -> :ok
        {:ok, [1, old_jid]} -> {:conflict, old_jid}
        error -> error
      end
    catch
      # Redix exits on connection failures/timeouts; normalize to an error tuple.
      :exit, e ->
        Logger.info("Error enqueueing -  #{Kernel.inspect(e)}")
        {:error, :timeout}
    end
  end

  @doc """
  Release the uniqueness lock held for `unique_token`.
  """
  def unlock(redis, namespace, unique_token) do
    Connection.del!(redis, unique_key(namespace, unique_token), retry_on_connection_error: 3)
  end

  @doc """
  Enqueue a job to run `offset` seconds from now.
  """
  def enqueue_in(redis, namespace, queue, offset, worker, args, options)
      when is_integer(offset) do
    time = Time.offset_from_now(offset)
    enqueue_at(redis, namespace, queue, time, worker, args, options)
  end

  @doc """
  Enqueue a job to run at the given `time` (added to the schedule sorted set).
  """
  def enqueue_at(redis, namespace, queue, time, worker, args, options) do
    {jid, job, job_serialized} =
      to_job_serialized(queue, worker, args, options, Time.unix_seconds(time))

    do_enqueue_job_at(
      redis,
      namespace,
      job,
      job_serialized,
      jid,
      time,
      scheduled_queue_key(namespace),
      unique_check: true
    )
  end

  @doc """
  Add an already-serialized job to `scheduled_queue` scored by `time`.

  Returns `{:ok, jid}`, `{:conflict, old_jid}` when a uniqueness lock is held,
  or an error tuple. Also used internally for retries (with the retry queue).
  """
  def do_enqueue_job_at(
        redis,
        namespace,
        job,
        job_serialized,
        jid,
        time,
        scheduled_queue,
        options \\ []
      ) do
    score = Time.time_to_score(time)

    try do
      [unlocks_in, unique_key] = unique_args(namespace, job, options)

      keys = keys_list([scheduled_queue], unique_key)

      response =
        Script.eval!(redis, :enqueue_at, keys, [
          job_serialized,
          score,
          jid,
          unlocks_in
        ])

      case response do
        {:ok, 0} -> {:ok, jid}
        {:ok, [1, old_jid]} -> {:conflict, old_jid}
        error -> error
      end
    catch
      # Same normalization as do_enqueue/6: connection exits become {:error, :timeout}.
      :exit, e ->
        Logger.info("Error enqueueing -  #{Kernel.inspect(e)}")
        {:error, :timeout}
    end
  end

  @doc """
  Enqueue a batch of jobs atomically via the :enqueue_all Lua script.

  Each element of `jobs` is `[queue, worker, args, options]`; an optional
  `options[:schedule]` of `{:at, time}` or `{:in, offset}` schedules the job
  instead of enqueueing it immediately.

  Returns `{:ok, results}` where each result is `{:ok, jid}` or
  `{:conflict, jid}`, in the same order as `jobs`.
  """
  def enqueue_all(redis, namespace, jobs) do
    {keys, args} = extract_enqueue_all_keys_and_args(namespace, jobs)

    try do
      response =
        Script.eval!(
          redis,
          :enqueue_all,
          [scheduled_queue_key(namespace), full_key(namespace, "queues")] ++ keys,
          args
        )

      case response do
        {:ok, result} ->
          {
            :ok,
            Enum.map(result, fn [status, jid] ->
              case status do
                0 -> {:ok, jid}
                1 -> {:conflict, jid}
              end
            end)
          }

        error ->
          error
      end
    catch
      :exit, e ->
        Logger.info("Error enqueueing -  #{Kernel.inspect(e)}")
        {:error, :timeout}
    end
  end

  @doc """
  Dequeue jobs for available queues
  """
  def dequeue(redis, namespace, node_id, queues) when is_list(queues) do
    dequeue_multiple(redis, namespace, node_id, queues)
  end

  defp dequeue_multiple(_redis, _namespace, _node_id, []) do
    {:ok, {:none, nil}}
  end

  # Pop at most one job per queue in a single pipeline, moving each popped job
  # to this node's backup queue so it can be re-enqueued after a crash.
  defp dequeue_multiple(redis, namespace, node_id, queues) do
    deq_commands =
      Enum.map(queues, fn queue ->
        ["RPOPLPUSH", queue_key(namespace, queue), backup_queue_key(namespace, node_id, queue)]
      end)

    resp = Connection.qp(redis, deq_commands)

    case resp do
      {:error, reason} ->
        [{:error, reason}]

      {:ok, success} ->
        success
        |> Enum.zip(queues)
        |> Enum.map(fn {resp, queue} ->
          case resp do
            :undefined -> {:ok, {:none, queue}}
            nil -> {:ok, {:none, queue}}
            %Redix.Error{} = error -> {:error, {error, queue}}
            value -> {:ok, {value, queue}}
          end
        end)
    end
  end

  @doc """
  Move jobs left in this node's backup queue back onto the live queue,
  recursing in batches of 10 until the backup queue is drained.
  """
  def re_enqueue_backup(redis, namespace, node_id, queue) do
    resp =
      Script.eval!(
        redis,
        :mlpop_rpush,
        [backup_queue_key(namespace, node_id, queue), queue_key(namespace, queue)],
        [10]
      )

    case resp do
      {:ok, [remaining, moved]} ->
        if moved > 0 do
          Logger.info(
            "Re-enqueued #{moved} job(s) from backup for node_id [#{node_id}] and queue [#{queue}]"
          )
        end

        if remaining > 0 do
          re_enqueue_backup(redis, namespace, node_id, queue)
        end

      _ ->
        nil
    end
  end

  @doc """
  Remove a single copy of `job_serialized` from the node's backup queue
  (called after a job finishes).
  """
  def remove_job_from_backup(redis, namespace, node_id, queue, job_serialized) do
    Connection.lrem!(redis, backup_queue_key(namespace, node_id, queue), job_serialized, 1,
      retry_on_connection_error: 3
    )
  end

  @doc """
  Move all scheduled/retry jobs that are due (score <= now) onto their queues.
  """
  def scheduler_dequeue(redis, namespace) do
    scheduler_dequeue(redis, namespace, Time.time_to_score())
  end

  @doc """
  Move scheduled/retry jobs with score <= `max_score` onto their queues.
  Returns the total number of jobs moved.
  """
  def scheduler_dequeue(redis, namespace, max_score) do
    schedule_queues(namespace)
    |> Enum.map(
      &do_scheduler_dequeue(redis, namespace, &1, max_score, Config.get(:scheduler_page_size), 0)
    )
    |> Enum.sum()
  end

  # Page through due jobs `limit` at a time; recurse while a full page was
  # returned (there may be more), accumulating the moved-job count.
  defp do_scheduler_dequeue(redis, namespace, queue, max_score, limit, acc) do
    case Script.eval!(redis, :scheduler_dequeue, [queue], [
           limit,
           max_score,
           full_key(namespace, "")
         ]) do
      {:ok, count} ->
        if count == limit do
          do_scheduler_dequeue(redis, namespace, queue, max_score, limit, count + acc)
        else
          count + acc
        end

      {:error, reason} ->
        # Logger.warn/1 is deprecated since Elixir 1.15; use Logger.warning/1.
        Logger.warning(
          "Error dequeueing jobs from scheduler queue #{queue} - #{Kernel.inspect(reason)}"
        )

        0
    end
  end

  # Append the optional unique-lock key to the script KEYS list.
  defp keys_list([_hd | _tl] = keys, nil), do: keys
  defp keys_list([_hd | _tl] = keys, key), do: keys ++ [key]

  def full_key("", key), do: key
  def full_key(nil, key), do: key

  @doc """
  Prefix `key` with the configured namespace (`"namespace:key"`).
  """
  def full_key(namespace, key) do
    "#{namespace}:#{key}"
  end

  def queue_key(namespace, queue) do
    full_key(namespace, "queue:#{queue}")
  end

  def unique_key(namespace, unique_token) do
    full_key(namespace, "unique:#{unique_token}")
  end

  def backup_queue_key(namespace, node_id, queue) do
    full_key(namespace, "queue:backup::#{node_id}::#{queue}")
  end

  def schedule_queues(namespace) do
    [scheduled_queue_key(namespace), retry_queue_key(namespace)]
  end

  def scheduled_queue_key(namespace) do
    full_key(namespace, "schedule")
  end

  def retry_queue_key(namespace) do
    full_key(namespace, "retry")
  end

  def failed_queue_key(namespace) do
    full_key(namespace, "dead")
  end

  @doc """
  Returns true when the job has exhausted its retries (or retries are
  disabled) and should be moved to the dead set.
  """
  def dead?(job) do
    retry_count = (job.retry_count || 0) + 1

    case job do
      %{retry: retry} when is_integer(retry) and retry > 0 ->
        retry_count > retry

      %{retry: true} ->
        retry_count > get_max_retries()

      _ ->
        true
    end
  end

  @doc """
  Either schedule a retry for `job` or move it to the dead set, depending
  on `dead?/1`.
  """
  def retry_or_fail_job(redis, namespace, job, error) do
    if dead?(job) do
      Logger.info("Max retries on job #{job.jid} exceeded")
      fail_job(redis, namespace, job, error)
    else
      retry_count = (job.retry_count || 0) + 1
      retry_job(redis, namespace, job, retry_count, error)
    end
  end

  @doc """
  Schedule `job` onto the retry queue with a backoff-computed delay.
  """
  def retry_job(redis, namespace, job, retry_count, error) do
    job =
      %{job | retry_count: retry_count, error_message: error}
      |> add_failure_timestamp()

    offset = Config.backoff().offset(job)
    time = Time.offset_from_now(offset)
    Logger.info("Queueing job #{job.jid} to retry in #{offset} seconds")

    {:ok, _jid} =
      do_enqueue_job_at(
        redis,
        namespace,
        job,
        Job.encode(job),
        job.jid,
        time,
        retry_queue_key(namespace)
      )
  end

  @doc """
  Immediately re-enqueue `job`, removing it from the retry set first.
  """
  def retry_job(redis, namespace, job) do
    remove_retry(redis, namespace, job.jid)
    :ok = do_enqueue(redis, namespace, job.queue, job, Job.encode(job))
  end

  @doc """
  Record `job` in the dead set, trimming entries older than
  `:dead_timeout_in_seconds` and beyond `:dead_max_jobs`.
  """
  def fail_job(redis, namespace, job, error) do
    job =
      %{
        job
        | retry_count: job.retry_count || 0,
          error_class: "ExqGenericError",
          error_message: error
      }
      |> add_failure_timestamp()

    job_serialized = Job.encode(job)
    key = failed_queue_key(namespace)

    now = Time.unix_seconds()

    commands = [
      ["ZADD", key, Time.time_to_score(), job_serialized],
      ["ZREMRANGEBYSCORE", key, "-inf", now - Config.get(:dead_timeout_in_seconds)],
      ["ZREMRANGEBYRANK", key, 0, -Config.get(:dead_max_jobs) - 1]
    ]

    Connection.qp!(redis, commands, retry_on_connection_error: 3)
  end

  @doc """
  Returns `{queue, size}` for every known queue.
  """
  def queue_size(redis, namespace) do
    queues = list_queues(redis, namespace)
    for q <- queues, do: {q, queue_size(redis, namespace, q)}
  end

  def queue_size(redis, namespace, :scheduled) do
    Connection.zcard!(redis, scheduled_queue_key(namespace))
  end

  def queue_size(redis, namespace, :retry) do
    Connection.zcard!(redis, retry_queue_key(namespace))
  end

  def queue_size(redis, namespace, queue) do
    Connection.llen!(redis, queue_key(namespace, queue))
  end

  def delete_queue(redis, namespace, queue) do
    Connection.del!(redis, full_key(namespace, queue))
  end

  @doc """
  Returns `{queue, jobs}` for every known queue.
  """
  def jobs(redis, namespace) do
    queues = list_queues(redis, namespace)
    for q <- queues, do: {q, jobs(redis, namespace, q)}
  end

  @doc """
  List jobs on `queue`. Options: `:offset` (default 0), `:size`
  (default #{@default_size}), `:raw` (skip decoding).
  """
  def jobs(redis, namespace, queue, options \\ []) do
    range_start = Keyword.get(options, :offset, 0)
    range_end = range_start + Keyword.get(options, :size, @default_size) - 1

    Connection.lrange!(redis, queue_key(namespace, queue), range_start, range_end)
    |> maybe_decode(options)
  end

  @doc """
  List jobs from a scheduled sorted set. With `score: true` each entry
  is returned together with its score.
  """
  def scheduled_jobs(redis, namespace, queue, options \\ []) do
    if Keyword.get(options, :score, false) do
      scheduled_jobs_with_scores(redis, namespace, queue, options)
    else
      Connection.zrangebyscorewithlimit!(
        redis,
        full_key(namespace, queue),
        Keyword.get(options, :offset, 0),
        Keyword.get(options, :size, @default_size)
      )
      |> maybe_decode(options)
    end
  end

  def scheduled_jobs_with_scores(redis, namespace, queue, options \\ []) do
    Connection.zrangebyscorewithscoreandlimit!(
      redis,
      full_key(namespace, queue),
      Keyword.get(options, :offset, 0),
      Keyword.get(options, :size, @default_size)
    )
    |> decode_zset_withscores(options)
  end

  @doc """
  List jobs in the dead set, newest first. With `score: true` each entry
  is returned together with its score.
  """
  def failed(redis, namespace, options \\ []) do
    if Keyword.get(options, :score, false) do
      Connection.zrevrangebyscorewithscoreandlimit!(
        redis,
        failed_queue_key(namespace),
        Keyword.get(options, :offset, 0),
        Keyword.get(options, :size, @default_size)
      )
      |> decode_zset_withscores(options)
    else
      Connection.zrevrangebyscorewithlimit!(
        redis,
        failed_queue_key(namespace),
        Keyword.get(options, :offset, 0),
        Keyword.get(options, :size, @default_size)
      )
      |> maybe_decode(options)
    end
  end

  def retry_size(redis, namespace) do
    Connection.zcard!(redis, retry_queue_key(namespace))
  end

  def scheduled_size(redis, namespace) do
    Connection.zcard!(redis, scheduled_queue_key(namespace))
  end

  def failed_size(redis, namespace) do
    Connection.zcard!(redis, failed_queue_key(namespace))
  end

  def remove_enqueued_jobs(redis, namespace, queue, raw_jobs) do
    Connection.lrem!(redis, queue_key(namespace, queue), raw_jobs)
  end

  def remove_job(redis, namespace, queue, jid) do
    {:ok, job} = find_job(redis, namespace, jid, queue, false)
    Connection.lrem!(redis, queue_key(namespace, queue), job)
  end

  def remove_retry(redis, namespace, jid) do
    {:ok, job} = find_job(redis, namespace, jid, :retry, false)
    Connection.zrem!(redis, retry_queue_key(namespace), job)
  end

  def remove_retry_jobs(redis, namespace, raw_jobs) do
    Connection.zrem!(redis, retry_queue_key(namespace), raw_jobs)
  end

  @doc """
  Atomically remove `raw_jobs` from the retry set and enqueue them.
  """
  def dequeue_retry_jobs(redis, namespace, raw_jobs) do
    dequeue_scheduled_jobs(redis, namespace, retry_queue_key(namespace), raw_jobs)
  end

  def remove_scheduled(redis, namespace, jid) do
    {:ok, job} = find_job(redis, namespace, jid, :scheduled, false)
    Connection.zrem!(redis, scheduled_queue_key(namespace), job)
  end

  def remove_scheduled_jobs(redis, namespace, raw_jobs) do
    Connection.zrem!(redis, scheduled_queue_key(namespace), raw_jobs)
  end

  @doc """
  Atomically remove `raw_jobs` from the schedule set and enqueue them.
  """
  def dequeue_scheduled_jobs(redis, namespace, raw_jobs) do
    dequeue_scheduled_jobs(redis, namespace, scheduled_queue_key(namespace), raw_jobs)
  end

  def remove_failed_jobs(redis, namespace, raw_jobs) do
    Connection.zrem!(redis, failed_queue_key(namespace), raw_jobs)
  end

  @doc """
  Atomically remove `raw_jobs` from the dead set and enqueue them.
  """
  def dequeue_failed_jobs(redis, namespace, raw_jobs) do
    dequeue_scheduled_jobs(redis, namespace, failed_queue_key(namespace), raw_jobs)
  end

  def list_queues(redis, namespace) do
    Connection.smembers!(redis, full_key(namespace, "queues"))
  end

  @doc """
  Find a current job by job id (but do not pop it)
  """
  def find_job(redis, namespace, jid, queue) do
    find_job(redis, namespace, jid, queue, true)
  end

  def find_job(redis, namespace, jid, :scheduled, convert) do
    redis
    |> Connection.zrangebyscore!(scheduled_queue_key(namespace))
    |> search_jobs(jid, convert)
  end

  def find_job(redis, namespace, jid, :retry, convert) do
    redis
    |> Connection.zrangebyscore!(retry_queue_key(namespace))
    |> search_jobs(jid, convert)
  end

  def find_job(redis, namespace, jid, queue, convert) do
    redis
    |> Connection.lrange!(queue_key(namespace, queue))
    |> search_jobs(jid, convert)
  end

  def search_jobs(jobs_serialized, jid) do
    search_jobs(jobs_serialized, jid, true)
  end

  @doc """
  Find the job with `jid` among `jobs_serialized`.

  With `convert` true the decoded job is returned; otherwise the original
  serialized string (useful for ZREM/LREM). Returns `{:ok, job | nil}`.
  """
  def search_jobs(jobs_serialized, jid, true) do
    found_job =
      jobs_serialized
      |> Enum.map(&Job.decode/1)
      |> Enum.find(fn job -> job.jid == jid end)

    {:ok, found_job}
  end

  def search_jobs(jobs_serialized, jid, false) do
    found_job =
      jobs_serialized
      |> Enum.find(fn job_serialized ->
        job = Job.decode(job_serialized)
        job.jid == jid
      end)

    {:ok, found_job}
  end

  def to_job_serialized(queue, worker, args, options) do
    to_job_serialized(queue, worker, args, options, Time.unix_seconds())
  end

  def to_job_serialized(queue, worker, args, options, enqueued_at) when is_atom(worker) do
    to_job_serialized(queue, to_string(worker), args, options, enqueued_at)
  end

  # Strip the "Elixir." prefix so the stored class name matches Sidekiq-style naming.
  def to_job_serialized(queue, "Elixir." <> worker, args, options, enqueued_at) do
    to_job_serialized(queue, worker, args, options, enqueued_at)
  end

  @doc """
  Build the job map and its serialized form.

  Options: `:jid` (default: new UUID), `:max_retries` (default: configured
  max), plus the uniqueness options consumed by `add_unique_attributes/2`.
  Returns `{jid, job_map, serialized}`.
  """
  def to_job_serialized(queue, worker, args, options, enqueued_at) do
    jid = Keyword.get_lazy(options, :jid, fn -> UUID.uuid4() end)
    retry = Keyword.get_lazy(options, :max_retries, fn -> get_max_retries() end)

    job =
      %{
        queue: queue,
        retry: retry,
        class: worker,
        args: args,
        jid: jid,
        enqueued_at: enqueued_at
      }
      |> add_unique_attributes(options)

    {jid, job, Config.serializer().encode!(job)}
  end

  defp dequeue_scheduled_jobs(redis, namespace, queue_key, raw_jobs) do
    Script.eval!(redis, :scheduler_dequeue_jobs, [queue_key, full_key(namespace, "")], raw_jobs)
  end

  defp get_max_retries do
    :max_retries
    |> Config.get()
    |> Exq.Support.Coercion.to_integer()
  end

  # Stamp failed_at (only the first failure) and retried_at (every failure).
  defp add_failure_timestamp(job) do
    timestamp = Time.unix_seconds()

    job =
      if !job.failed_at do
        %{job | failed_at: timestamp}
      else
        job
      end

    %{job | retried_at: timestamp}
  end

  # ZRANGEBYSCORE ... WITHSCORES returns a flat [member, score, ...] list;
  # pair them up, optionally decoding each member.
  defp decode_zset_withscores(list, options) do
    raw? = Keyword.get(options, :raw, false)

    Enum.chunk_every(list, 2)
    |> Enum.map(fn [job, score] ->
      if raw? do
        {job, score}
      else
        {Job.decode(job), score}
      end
    end)
  end

  defp maybe_decode(list, options) do
    if Keyword.get(options, :raw, false) do
      list
    else
      Enum.map(list, &Job.decode/1)
    end
  end

  # When :unique_for is set, attach the uniqueness bookkeeping fields.
  # The default token is a hash of queue + class + args, so identical jobs collide.
  defp add_unique_attributes(job, options) do
    unique_for = Keyword.get(options, :unique_for, nil)

    if unique_for do
      unique_token =
        Keyword.get_lazy(options, :unique_token, fn ->
          string =
            Enum.join([job.queue, job.class] ++ Enum.map(job.args, &:erlang.phash2(&1)), ":")

          :crypto.hash(:sha256, string) |> Base.encode64()
        end)

      Map.put(job, :unique_for, unique_for)
      |> Map.put(:unique_until, to_string(Keyword.get(options, :unique_until, :success)))
      |> Map.put(:unique_token, unique_token)
      |> Map.put(:unlocks_at, job.enqueued_at + unique_for)
    else
      job
    end
  end

  # Compute [unlocks_in_ms, unique_key] for the Lua scripts, or [nil, nil]
  # when the job is not unique or unique_check was not requested.
  defp unique_args(namespace, job, options) do
    if Keyword.get(options, :unique_check, false) do
      case job do
        %{unlocks_at: unlocks_at, unique_token: unique_token} ->
          unlocks_in = Enum.max([trunc((unlocks_at - Time.unix_seconds()) * 1000), 0])
          [unlocks_in, unique_key(namespace, unique_token)]

        _ ->
          [nil, nil]
      end
    else
      [nil, nil]
    end
  end

  # Returns
  # {
  #   [
  #     job_1_unique_key, job_1_queue_key,
  #     job_2_unique_key, job_2_queue_key,
  #     ...
  #   ],
  #   [
  #     job_1_jid, job_1_queue, job_1_score, job_1_job_serialized, job_1_unlocks_in,
  #     job_2_jid, job_2_queue, job_2_score, job_2_job_serialized, job_2_unlocks_in,
  #     ...
  #   ]
  # }
  defp extract_enqueue_all_keys_and_args(namespace, jobs) do
    {keys, job_attrs} =
      Enum.reduce(jobs, {[], []}, fn job, {keys_acc, job_attrs_acc} ->
        [queue, worker, args, options] = job

        {score, enqueued_at} =
          case options[:schedule] do
            {:at, at_time} ->
              {Time.time_to_score(at_time), Time.unix_seconds(at_time)}

            {:in, offset} ->
              at_time = Time.offset_from_now(offset)
              {Time.time_to_score(at_time), Time.unix_seconds(at_time)}

            _ ->
              # Score "0" sorts before any real timestamp, signalling
              # "enqueue immediately" to the Lua script.
              {"0", Time.unix_seconds()}
          end

        {jid, job_data, job_serialized} =
          to_job_serialized(queue, worker, args, options, enqueued_at)

        [unlocks_in, unique_key] = unique_args(namespace, job_data, unique_check: true)

        # accumulating in reverse order for efficiency
        {
          [queue_key(namespace, queue), unique_key] ++ keys_acc,
          [unlocks_in, job_serialized, score, queue, jid] ++ job_attrs_acc
        }
      end)

    {Enum.reverse(keys), Enum.reverse(job_attrs)}
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc