• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

meltwater / gen_rmq / 723

pending completion
723

Pull #189

travis-ci

web-flow
Merge 538eedfe8 into 7d88f8b40
Pull Request #189: Updated telemetry events

70 of 70 new or added lines in 4 files covered. (100.0%)

267 of 296 relevant lines covered (90.2%)

202.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.18
/lib/consumer.ex
1
defmodule GenRMQ.Consumer do
2
  @moduledoc """
3
  A behaviour module for implementing the RabbitMQ consumer.
4

5
  It will:
6
  * setup RabbitMQ connection / channel and keep them in a state
7
  * create (if does not exist) a queue and bind it to an exchange
8
  * create deadletter queue and exchange
9
  * handle reconnections
10
  * call `handle_message` callback on every message delivery
11
  * call `handle_error` callback whenever `handle_message` fails to process or times out
12
  """
13

14
  use GenServer
15
  use AMQP
16

17
  require Logger
18

19
  alias GenRMQ.Consumer.{
20
    MessageTask,
21
    QueueConfiguration,
22
    Telemetry
23
  }
24

25
  alias GenRMQ.Message
26

27
  ##############################################################################
28
  # GenRMQ.Consumer callbacks
29
  ##############################################################################
30

31
  @doc """
32
  Invoked to provide consumer configuration
33

34
  ## Return values
35
  ### Mandatory:
36

37
  `connection` - RabbitMQ connection options. Accepts same arguments as AMQP-library's [Connection.open/2](https://hexdocs.pm/amqp/AMQP.Connection.html#open/2).
38

39
  `queue` - the name of the queue to consume. If it does not exist, it will be created.
40

41
  `exchange` - Name or `{type, name}` of the exchange to which `queue` should be bound. If it does not exist, it will be created.
42
  For valid exchange types see `GenRMQ.Binding`.
43

44
  `routing_key` - queue binding key, can also be a list.
45

46
  `prefetch_count` - limit the number of unacknowledged messages.
47

48
  ### Optional:
49

50
  `uri` - RabbitMQ uri. Deprecated. Please use `connection`.
51

52
  `queue_options` - Queue options as declared in
53
  [AMQP.Queue.declare/3](https://hexdocs.pm/amqp/AMQP.Queue.html#declare/3).
54

55
  If argument 'x-expires' is given to arguments, then it will be used instead
56
  of `queue_ttl`.
57

58
  If argument 'x-max-priority' is given to arguments, then it will be used
59
  instead of `queue_max_priority`.
60

61
  `queue_ttl` - controls for how long a queue can be unused before it is
62
  automatically deleted. Unused means the queue has no consumers,
63
  the queue has not been redeclared, and basic.get has not been invoked
64
  for a duration of at least the expiration period
65

66
  `queue_max_priority` - defines if a declared queue should be a priority queue.
67
  Should be set to a value from `1..255` range. If it is greater than `255`, queue
68
  max priority will be set to `255`. Values between `1` and `10` are
69
  [recommended](https://www.rabbitmq.com/priority.html#resource-usage).
70

71
  `concurrency` - defines if `handle_message` callback is called
72
  in separate process using [supervised task](https://hexdocs.pm/elixir/Task.Supervisor.html).
73
  By default concurrency is enabled. To disable, set it to `false`
74

75
  `terminate_timeout` - defines how long the consumer will wait for in-flight Tasks to
76
  complete before terminating the process. The value is in milliseconds and the default
77
  is 5_000 milliseconds.
78

79
  `handle_message_timeout` - defines how long the `handle_message` callback will execute
80
  within a supervised task. The value is in milliseconds and the default is 5_000
81
  milliseconds.
82

83
  `retry_delay_function` - custom retry delay function. Called when the connection to
84
  the broker cannot be established. Receives the connection attempt as an argument (>= 1)
85
  and is expected to wait for some time.
86
  With this callback you can for example do exponential backoff.
87
  The default implementation is a linear delay starting with 1 second step.
88

89
  `reconnect` - defines if consumer should reconnect on connection termination.
90
  By default reconnection is enabled.
91

92
  `deadletter` - defines if consumer should setup deadletter exchange and queue.
93
  (**Default:** `true`).
94

95
  `deadletter_queue` - defines name of the deadletter queue (**Default:** Same as queue name suffixed by `_error`).
96

97
  `deadletter_queue_options` - Queue options for the deadletter queue as declared in [AMQP.Queue.declare/3](https://hexdocs.pm/amqp/AMQP.Queue.html#declare/3).
98

99
  If argument 'x-expires' is given to arguments, then it will be used instead of `queue_ttl`.
100

101
  If argument 'x-max-priority' is given to arguments, then it will be used instead of `queue_max_priority`.
102

103
  `deadletter_exchange` - defines name of the deadletter exchange (**Default:** Same as exchange name suffixed by `.deadletter`).
104

105
  `deadletter_routing_key` - defines name of the deadletter routing key (**Default:** `#`).
106

107
  ## Examples:
108
  ```
109
  def init() do
110
    [
111
      connection: "amqp://guest:guest@localhost:5672",
112
      queue: "gen_rmq_in_queue",
113
      queue_options: [
114
        durable: true,
115
        passive: true,
116
        arguments: [
117
          {"x-queue-type", :longstr ,"quorum"}
118
        ]
119
      ]
120
      exchange: "gen_rmq_exchange",
121
      routing_key: "#",
122
      prefetch_count: "10",
123
      uri: "amqp://guest:guest@localhost:5672",
124
      concurrency: true,
125
      terminate_timeout: 5_000,
126
      handle_message_timeout: 5_000,
127
      queue_ttl: 5_000,
128
      retry_delay_function: fn attempt -> :timer.sleep(1000 * attempt) end,
129
      reconnect: true,
130
      deadletter: true,
131
      deadletter_queue: "gen_rmq_in_queue_error",
132
      deadletter_queue_options: [
133
        arguments: [
134
          {"x-queue-type", :longstr ,"quorum"}
135
        ]
136
      ]
137
      deadletter_exchange: "gen_rmq_exchange.deadletter",
138
      deadletter_routing_key: "#",
139
      queue_max_priority: 10
140
    ]
141
  end
142
  ```
143

144
  """
145
  @callback init() :: [
146
              connection: keyword | {String.t(), String.t()} | :undefined | keyword,
147
              queue: String.t(),
148
              queue_options: keyword,
149
              exchange: GenRMQ.Binding.exchange(),
150
              routing_key: [String.t()] | String.t(),
151
              prefetch_count: String.t(),
152
              uri: String.t(),
153
              concurrency: boolean,
154
              terminate_timeout: integer,
155
              handle_message_timeout: integer,
156
              queue_ttl: integer,
157
              retry_delay_function: function,
158
              reconnect: boolean,
159
              deadletter: boolean,
160
              deadletter_queue: String.t(),
161
              deadletter_queue_options: keyword,
162
              deadletter_exchange: String.t(),
163
              deadletter_routing_key: String.t(),
164
              queue_max_priority: integer
165
            ]
166

167
  @doc """
168
  Invoked to provide consumer [tag](https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.consumer-tag)
169

170
  ## Examples:
171
  ```
172
  def consumer_tag() do
173
    "hostname-app-version-consumer"
174
  end
175
  ```
176

177
  """
178
  @callback consumer_tag() :: String.t()
179

180
  @doc """
181
  Invoked on message delivery
182

183
  `message` - `GenRMQ.Message` struct
184

185
  ## Examples:
186
  ```
187
  def handle_message(message) do
188
    # Do something with message and acknowledge it
189
    GenRMQ.Consumer.ack(message)
190
  end
191
  ```
192

193
  """
194
  @callback handle_message(message :: %GenRMQ.Message{}) :: :ok
195

196
  @doc """
197
  Invoked when an error or timeout is encountered while executing `handle_message` callback
198

199
  `message` - `GenRMQ.Message` struct
200
  `reason` - the information regarding the error
201

202
  ## Examples:
203
  To reject the message that caused the Task to fail you can do something like so:
204
  ```
205
  def handle_error(message, reason) do
206
    # Do something with message and reject it
207
    Logger.warn("Failed to process message: #\{inspect(message)}")
208

209
    GenRMQ.Consumer.reject(message)
210
  end
211
  ```
212

213
  The `reason` argument will either be the atom `:killed` if the Task timed out and needed
214
  to be stopped. Or it will be a 2 elementr tuple where the first element is the error stuct
215
  and the second element is the stacktrace:
216

217
  ```
218
  {
219
    %RuntimeError{message: "Can't divide by zero!"},
220
    [
221
      {TestConsumer.ErrorWithoutConcurrency, :handle_message, 1, [file: 'test/support/test_consumers.ex', line: 98]},
222
      {GenRMQ.Consumer, :handle_message, 2, [file: 'lib/consumer.ex', line: 519]},
223
      {GenRMQ.Consumer, :handle_info, 2, [file: 'lib/consumer.ex', line: 424]},
224
      {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]},
225
      {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]},
226
      {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}
227
    ]
228
  }
229
  ```
230
  """
231
  @callback handle_error(message :: %GenRMQ.Message{}, reason :: atom()) :: :ok
232

233
  ##############################################################################
234
  # GenRMQ.Consumer API
235
  ##############################################################################
236

237
  @doc """
238
  Starts `GenRMQ.Consumer` process with given callback module linked to the current
239
  process
240

241
  `module` - callback module implementing `GenRMQ.Consumer` behaviour
242

243
  ## Options
244
   * `:name` - used for name registration
245

246
  ## Return values
247
  If the consumer is successfully created and initialized, this function returns
248
  `{:ok, pid}`, where `pid` is the PID of the consumer. If a process with the
249
  specified consumer name already exists, this function returns
250
  `{:error, {:already_started, pid}}` with the PID of that process.
251

252
  ## Examples:
253
  ```
254
  GenRMQ.Consumer.start_link(Consumer, name: :consumer)
255
  ```
256

257
  """
258
  @spec start_link(module :: module(), options :: Keyword.t()) :: {:ok, pid} | {:error, term}
259
  def start_link(module, options \\ []) do
260
    GenServer.start_link(__MODULE__, %{module: module}, options)
402✔
261
  end
262

263
  @doc """
264
  Synchronously stops the consumer with a given reason
265

266
  `name` - pid or name of the consumer to stop
267
  `reason` - reason of the termination
268

269
  ## Examples:
270
  ```
271
  GenRMQ.Consumer.stop(:consumer, :normal)
272
  ```
273

274
  """
275
  @spec stop(name :: atom | pid, reason :: term) :: :ok
276
  def stop(name, reason) do
277
    GenServer.stop(name, reason)
24✔
278
  end
279

280
  @doc """
281
  Acknowledges given message
282

283
  `message` - `GenRMQ.Message` struct
284
  """
285
  @spec ack(message :: %GenRMQ.Message{}) :: :ok
286
  def ack(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}} = message) do
287
    Telemetry.emit_message_ack_event(message)
120✔
288

289
    Basic.ack(channel, tag)
120✔
290
  end
291

292
  @doc """
293
  Requeues / rejects given message
294

295
  `message` - `GenRMQ.Message` struct
296

297
  `requeue` - indicates if message should be requeued
298
  """
299
  @spec reject(message :: %GenRMQ.Message{}, requeue :: boolean) :: :ok
300
  def reject(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}} = message, requeue \\ false) do
301
    Telemetry.emit_message_reject_event(message, requeue)
66✔
302

303
    Basic.reject(channel, tag, requeue: requeue)
66✔
304
  end
305

306
  ##############################################################################
307
  # GenServer callbacks
308
  ##############################################################################
309

310
  @doc false
311
  @impl GenServer
312
  def init(%{module: module} = initial_state) do
313
    Process.flag(:trap_exit, true)
402✔
314
    config = apply(module, :init, [])
402✔
315
    parsed_config = parse_config(config)
402✔
316
    terminate_timeout = Keyword.get(parsed_config, :terminate_timeout, 5_000)
402✔
317
    handle_message_timeout = Keyword.get(parsed_config, :handle_message_timeout, 5_000)
402✔
318

319
    state =
402✔
320
      initial_state
321
      |> Map.put(:config, parsed_config)
322
      |> Map.put(:reconnect_attempt, 0)
323
      |> Map.put(:running_tasks, %{})
324
      |> Map.put(:terminate_timeout, terminate_timeout)
325
      |> Map.put(:handle_message_timeout, handle_message_timeout)
326

327
    {:ok, state, {:continue, :init}}
402✔
328
  end
329

330
  @doc false
331
  @impl GenServer
332
  def handle_continue(:init, state) do
333
    state =
402✔
334
      state
335
      |> get_connection()
336
      |> open_channels()
337
      |> setup_consumer()
338
      |> setup_task_supervisor()
339

340
    {:noreply, state}
341
  end
342

343
  @doc false
344
  @impl GenServer
345
  def handle_call({:recover, requeue}, _from, %{in: channel} = state) do
346
    {:reply, Basic.recover(channel, requeue: requeue), state}
×
347
  end
348

349
  @doc false
350
  @impl GenServer
351
  def handle_info(
352
        {:DOWN, ref, :process, _pid, reason},
353
        %{module: module, config: config, running_tasks: running_tasks} = state
354
      ) do
355
    case Map.get(running_tasks, ref) do
66✔
356
      %MessageTask{message: message, timeout_reference: timeout_reference, start_time: start_time} ->
357
        Logger.info("[#{module}]: Task failed to handle message. Reason: #{inspect(reason)}")
12✔
358

359
        # Cancel timeout timer, emit telemetry event, and invoke user's `handle_error` callback
360
        Process.cancel_timer(timeout_reference)
12✔
361
        updated_state = %{state | running_tasks: Map.delete(running_tasks, ref)}
12✔
362
        Telemetry.emit_message_error_event(module, reason, message, start_time)
12✔
363
        apply(module, :handle_error, [message, reason])
12✔
364

365
        {:noreply, updated_state}
366

367
      _ ->
368
        Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}")
54✔
369

370
        Telemetry.emit_connection_down_event(module, reason)
54✔
371

372
        config
373
        |> Keyword.get(:reconnect, true)
374
        |> handle_reconnect(state)
54✔
375
    end
376
  end
377

378
  @doc false
379
  @impl GenServer
380
  def handle_info({ref, _task_result}, %{running_tasks: running_tasks} = state) when is_reference(ref) do
381
    # Task completed successfully, update the running task map and state
382
    Process.demonitor(ref, [:flush])
149✔
383

384
    updated_state =
149✔
385
      case Map.get(running_tasks, ref) do
386
        %MessageTask{} = message_task ->
387
          Process.cancel_timer(message_task.timeout_reference)
149✔
388
          %{state | running_tasks: Map.delete(running_tasks, ref)}
149✔
389

390
        _ ->
391
          state
×
392
      end
393

394
    {:noreply, updated_state}
395
  end
396

397
  @doc false
398
  @impl GenServer
399
  def handle_info({:kill, task_reference}, %{running_tasks: running_tasks} = state) when is_reference(task_reference) do
400
    # The task has timed out, kill the Task process which will trigger a :DOWN event that
401
    # is handled by a previous `handle_info/2` callback
402
    %MessageTask{task: %Task{pid: pid}} = Map.get(running_tasks, task_reference)
6✔
403
    Process.exit(pid, :kill)
6✔
404

405
    {:noreply, state}
406
  end
407

408
  @doc false
409
  @impl GenServer
410
  def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, %{module: module} = state) do
411
    Logger.info("[#{module}]: Broker confirmed consumer with tag #{consumer_tag}")
426✔
412
    {:noreply, state}
413
  end
414

415
  @doc false
416
  @impl GenServer
417
  def handle_info({:basic_cancel, %{consumer_tag: consumer_tag}}, %{module: module} = state) do
418
    Logger.warn("[#{module}]: The consumer was unexpectedly cancelled, tag: #{consumer_tag}")
126✔
419
    {:stop, :cancelled, state}
126✔
420
  end
421

422
  @doc false
423
  @impl GenServer
424
  def handle_info({:basic_cancel_ok, %{consumer_tag: consumer_tag}}, %{module: module} = state) do
425
    Logger.info("[#{module}]: Consumer was cancelled, tag: #{consumer_tag}")
×
426
    {:noreply, state}
427
  end
428

429
  @doc false
430
  @impl GenServer
431
  def handle_info(
432
        {:basic_deliver, payload, attributes},
433
        %{module: module, running_tasks: running_tasks, handle_message_timeout: handle_message_timeout} = state
434
      ) do
435
    %{delivery_tag: tag, routing_key: routing_key, redelivered: redelivered} = attributes
186✔
436
    Logger.debug("[#{module}]: Received message. Tag: #{tag}, routing key: #{routing_key}, redelivered: #{redelivered}")
186✔
437

438
    if redelivered do
186✔
439
      Logger.debug("[#{module}]: Redelivered payload for message. Tag: #{tag}, payload: #{payload}")
×
440
    end
441

442
    message = Message.create(attributes, payload, state)
186✔
443

444
    updated_state =
186✔
445
      case handle_message(message, state) do
446
        %Task{ref: task_reference} = task ->
447
          timeout_reference = Process.send_after(self(), {:kill, task_reference}, handle_message_timeout)
168✔
448
          message_task = MessageTask.create(task, timeout_reference, message)
168✔
449
          %{state | running_tasks: Map.put(running_tasks, task_reference, message_task)}
168✔
450

451
        _ ->
452
          state
18✔
453
      end
454

455
    {:noreply, updated_state}
456
  end
457

458
  @doc false
459
  @impl GenServer
460
  def terminate(:connection_closed = reason, %{module: module} = state) do
461
    await_running_tasks(state)
12✔
462

463
    # Since connection has been closed no need to clean it up
464
    Logger.debug("[#{module}]: Terminating consumer, reason: #{inspect(reason)}")
12✔
465
  end
466

467
  @doc false
468
  @impl GenServer
469
  def terminate(reason, %{module: module, conn: conn, in: in_chan, out: out_chan} = state) do
470
    await_running_tasks(state)
384✔
471

472
    Logger.debug("[#{module}]: Terminating consumer, reason: #{inspect(reason)}")
384✔
473
    Channel.close(in_chan)
384✔
474
    Channel.close(out_chan)
384✔
475
    Connection.close(conn)
384✔
476
  end
477

478
  @doc false
479
  @impl GenServer
480
  def terminate({{:shutdown, {:server_initiated_close, error_code, reason}}, _}, %{module: module} = state) do
481
    await_running_tasks(state)
6✔
482

483
    Logger.error("[#{module}]: Terminating consumer, error_code: #{inspect(error_code)}, reason: #{inspect(reason)}")
6✔
484
  end
485

486
  @doc false
487
  @impl GenServer
488
  def terminate(reason, %{module: module} = state) do
489
    await_running_tasks(state)
×
490

491
    Logger.error("[#{module}]: Terminating consumer, unexpected reason: #{inspect(reason)}")
×
492
  end
493

494
  ##############################################################################
495
  # Helpers
496
  ##############################################################################
497

498
  defp await_running_tasks(%{running_tasks: running_tasks, terminate_timeout: terminate_timeout}) do
499
    # Await for all in-flight tasks for the configured amount of time and cancel
500
    # their individual timeout timers
501
    running_tasks
502
    |> Map.values()
503
    |> Enum.map(fn %MessageTask{} = message_task ->
504
      Process.cancel_timer(message_task.timeout_reference)
7✔
505
      message_task.task
7✔
506
    end)
507
    |> Task.yield_many(terminate_timeout)
402✔
508
  end
509

510
  defp parse_config(config) do
511
    queue_name = Keyword.fetch!(config, :queue)
402✔
512

513
    config
514
    |> Keyword.put(:queue, QueueConfiguration.setup(queue_name, config))
515
    |> Keyword.put(:connection, Keyword.get(config, :connection, config[:uri]))
402✔
516
  end
517

518
  defp handle_message(message, %{module: module, task_supervisor: task_supervisor_pid})
519
       when is_pid(task_supervisor_pid) do
520
    Task.Supervisor.async_nolink(
168✔
521
      task_supervisor_pid,
522
      fn ->
523
        start_time = System.monotonic_time()
168✔
524

525
        Telemetry.emit_message_start_event(message, module)
168✔
526
        result = apply(module, :handle_message, [message])
168✔
527
        Telemetry.emit_message_stop_event(start_time, message, module)
156✔
528

529
        result
156✔
530
      end,
531
      shutdown: :brutal_kill
532
    )
533
  end
534

535
  defp handle_message(message, %{module: module}) do
536
    start_time = System.monotonic_time()
18✔
537
    Telemetry.emit_message_start_event(message, module)
18✔
538

539
    try do
18✔
540
      result = apply(module, :handle_message, [message])
18✔
541
      Telemetry.emit_message_stop_event(start_time, message, module)
12✔
542

543
      result
12✔
544
    rescue
545
      reason ->
6✔
546
        full_error = {reason, __STACKTRACE__}
6✔
547
        Telemetry.emit_message_error_event(module, full_error, message, start_time)
6✔
548
        apply(module, :handle_error, [message, full_error])
6✔
549
        :error
550
    end
551
  end
552

553
  defp handle_reconnect(false, %{module: module} = state) do
554
    Logger.info("[#{module}]: Reconnection is disabled. Terminating consumer.")
12✔
555
    {:stop, :connection_closed, state}
12✔
556
  end
557

558
  defp handle_reconnect(_, state) do
559
    new_state =
42✔
560
      state
561
      |> Map.put(:reconnect_attempt, 0)
562
      |> get_connection()
563
      |> open_channels()
564
      |> setup_consumer()
565

566
    {:noreply, new_state}
567
  end
568

569
  defp get_connection(%{config: config, module: module, reconnect_attempt: attempt} = state) do
570
    start_time = System.monotonic_time()
444✔
571
    queue = config[:queue]
444✔
572
    exchange = config[:exchange]
444✔
573
    routing_key = config[:routing_key]
444✔
574

575
    Telemetry.emit_connection_start_event(module, attempt, queue, exchange, routing_key)
444✔
576

577
    case Connection.open(config[:connection]) do
444✔
578
      {:ok, conn} ->
579
        Telemetry.emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key)
444✔
580
        Process.monitor(conn.pid)
444✔
581
        Map.put(state, :conn, conn)
444✔
582

583
      {:error, e} ->
584
        Logger.error(
×
585
          "[#{module}]: Failed to connect to RabbitMQ with settings: " <>
×
586
            "#{inspect(strip_key(config, :connection))}, reason #{inspect(e)}"
587
        )
588

589
        Telemetry.emit_connection_error_event(start_time, module, attempt, queue, exchange, routing_key, e)
×
590

591
        retry_delay_fn = config[:retry_delay_function] || (&linear_delay/1)
×
592
        next_attempt = attempt + 1
×
593
        retry_delay_fn.(next_attempt)
×
594

595
        state
596
        |> Map.put(:reconnect_attempt, next_attempt)
597
        |> get_connection()
×
598
    end
599
  end
600

601
  defp open_channels(%{conn: conn} = state) do
602
    {:ok, chan} = Channel.open(conn)
444✔
603
    {:ok, out_chan} = Channel.open(conn)
444✔
604
    Map.merge(state, %{in: chan, out: out_chan})
444✔
605
  end
606

607
  defp setup_task_supervisor(%{config: config} = state) do
608
    if Keyword.get(config, :concurrency, true) do
396✔
609
      {:ok, pid} = Task.Supervisor.start_link(max_restarts: 0)
372✔
610

611
      Map.put(state, :task_supervisor, pid)
372✔
612
    else
613
      Map.put(state, :task_supervisor, nil)
24✔
614
    end
615
  end
616

617
  defp setup_consumer(%{in: chan, config: config, module: module} = state) do
618
    queue_config = config[:queue]
444✔
619
    prefetch_count = String.to_integer(config[:prefetch_count])
444✔
620

621
    if queue_config.dead_letter[:create] do
444✔
622
      setup_deadletter(chan, queue_config.dead_letter)
396✔
623
    end
624

625
    Basic.qos(chan, prefetch_count: prefetch_count)
444✔
626
    setup_queue(queue_config.name, queue_config.options, chan, config[:exchange], config[:routing_key])
444✔
627
    consumer_tag = apply(module, :consumer_tag, [])
438✔
628
    {:ok, _consumer_tag} = Basic.consume(chan, queue_config.name, nil, consumer_tag: consumer_tag)
438✔
629
    state
438✔
630
  end
631

632
  defp setup_deadletter(chan, config) do
633
    setup_queue(config[:name], config[:options], chan, config[:exchange], config[:routing_key])
396✔
634
  end
635

636
  defp setup_queue(name, options, chan, exchange, routing_key) do
637
    Queue.declare(chan, name, options)
840✔
638
    GenRMQ.Binding.bind_exchange_and_queue(chan, exchange, name, routing_key)
840✔
639
  end
640

641
  defp strip_key(keyword_list, key) do
642
    keyword_list
643
    |> Keyword.delete(key)
644
    |> Keyword.put(key, "[FILTERED]")
×
645
  end
646

647
  defp linear_delay(attempt), do: :timer.sleep(attempt * 1_000)
×
648

649
  ##############################################################################
650
  ##############################################################################
651
  ##############################################################################
652
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc