• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 14361258787

09 Apr 2025 03:37PM UTC coverage: 48.77%. Remained the same
14361258787

push

github

web-flow
fix: use `LOCATION_ID` for cluster identification (#639)

It will fall back to `REGION` if there is no `LOCATION_ID`. It also adds
support for `CLUSTER_ID` to specify the cluster independently of the
above variables.

991 of 2032 relevant lines covered (48.77%)

661.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.52
/lib/supavisor/client_handler.ex
1
defmodule Supavisor.ClientHandler do
2
  @moduledoc """
3
  This module is responsible for handling incoming connections to the Supavisor server. It is
4
  implemented as a Ranch protocol behavior and a gen_statem behavior. It handles SSL negotiation,
5
  user authentication, tenant subscription, and dispatching of messages to the appropriate tenant
6
  supervisor. Each client connection is assigned to a specific tenant supervisor.
7
  """
8

9
  require Logger
10

11
  @behaviour :ranch_protocol
12
  @behaviour :gen_statem
13
  @proto [:tcp, :ssl]
14
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
15
  @subscribe_retries Application.compile_env(:supavisor, :subscribe_retries)
16
  @timeout_subscribe 500
17
  @clients_registry Supavisor.Registry.TenantClients
18
  @proxy_clients_registry Supavisor.Registry.TenantProxyClients
19

20
  alias Supavisor.{
21
    DbHandler,
22
    HandlerHelpers,
23
    Helpers,
24
    Monitoring.Telem,
25
    Protocol.Client,
26
    Tenants
27
  }
28

29
  require Supavisor.Protocol.Server, as: Server
30

31
  @impl true
32
  def start_link(ref, transport, opts) do
    # Ranch protocol entry point: spawn a linked process that runs `init/3`
    # (which enters the gen_statem loop) and hand its pid back to Ranch.
    {:ok, :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts])}
  end
36

37
  @impl true
38
  def callback_mode, do: [:handle_event_function]
203✔
39

40
  @spec db_status(pid(), :ready_for_query | :read_sql_error, binary()) :: :ok
41
  def db_status(pid, status, bin) do
    # Asynchronous notification; it is consumed by the
    # `handle_event(:cast, {:db_status, status, bin}, ...)` clause of this module.
    event = {:db_status, status, bin}
    :gen_statem.cast(pid, event)
  end
563✔
42

43
  @impl true
44
  def init(_), do: :ignore
×
45

46
  def init(ref, trans, opts) do
47
    Process.flag(:trap_exit, true)
203✔
48
    Helpers.set_max_heap_size(90)
203✔
49

50
    {:ok, sock} = :ranch.handshake(ref)
203✔
51

52
    :ok =
203✔
53
      trans.setopts(sock,
54
        # mode: :binary,
55
        # packet: :raw,
56
        # recbuf: 8192,
57
        # sndbuf: 8192,
58
        # # backlog: 2048,
59
        # send_timeout: 120,
60
        # keepalive: true,
61
        # nodelay: true,
62
        # nopush: true,
63
        active: true
64
      )
65

66
    Logger.debug("ClientHandler is: #{inspect(self())}")
203✔
67

68
    data = %{
203✔
69
      id: nil,
70
      sock: {:gen_tcp, sock},
71
      trans: trans,
72
      db_pid: nil,
73
      tenant: nil,
74
      user: nil,
75
      pool: nil,
76
      manager: nil,
77
      query_start: nil,
78
      timeout: nil,
79
      ps: nil,
80
      ssl: false,
81
      auth_secrets: nil,
82
      proxy_type: nil,
83
      mode: opts.mode,
203✔
84
      stats: %{},
85
      idle_timeout: 0,
86
      db_name: nil,
87
      last_query: nil,
88
      heartbeat_interval: 0,
89
      connection_start: System.monotonic_time(),
90
      log_level: nil,
91
      auth: %{},
92
      tenant_availability_zone: nil,
93
      local: opts[:local] || false,
203✔
94
      active_count: 0,
95
      peer_ip: Helpers.peer_ip(sock),
96
      app_name: nil,
97
      subscribe_retries: 0
98
    }
99

100
    :gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :exchange, data)
203✔
101
  end
102

103
  @impl true
104
  def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :exchange, data) do
105
    Logger.debug("ClientHandler: Client is trying to request HTTP")
1✔
106

107
    HandlerHelpers.sock_send(
1✔
108
      data.sock,
1✔
109
      "HTTP/1.1 204 OK\r\nx-app-version: #{Application.spec(:supavisor, :vsn)}\r\n\r\n"
1✔
110
    )
111

112
    {:stop, {:shutdown, :http_request}}
113
  end
114

115
  # cancel request
116
  def handle_event(:info, {_, _, Server.cancel_message(pid, key)}, _, _) do
117
    Logger.debug("ClientHandler: Got cancel query for #{inspect({pid, key})}")
21✔
118
    :ok = HandlerHelpers.send_cancel_query(pid, key)
21✔
119
    {:stop, {:shutdown, :cancel_query}}
120
  end
121

122
  # send cancel request to db
123
  def handle_event(:info, :cancel_query, :busy, data) do
124
    key = {data.tenant, data.db_pid}
3✔
125
    Logger.debug("ClientHandler: Cancel query for #{inspect(key)}")
3✔
126
    {_pool, db_pid, _db_sock} = data.db_pid
3✔
127

128
    case db_pid_meta(key) do
3✔
129
      [{^db_pid, meta}] ->
130
        :ok = HandlerHelpers.cancel_query(meta.host, meta.port, meta.ip_ver, meta.pid, meta.key)
3✔
131

132
      error ->
133
        Logger.error(
×
134
          "ClientHandler: Received cancel but no proc was found #{inspect(key)} #{inspect(error)}"
135
        )
136
    end
137

138
    :keep_state_and_data
139
  end
140

141
  def handle_event(:info, {:tcp, _, <<_::64>>}, :exchange, %{sock: sock} = data) do
    Logger.debug("ClientHandler: Client is trying to connect with SSL")

    cert = Helpers.downstream_cert()
    key = Helpers.downstream_key()

    # SSL negotiation, S/N/Error: reply "S" and upgrade the socket when both the
    # downstream cert and key are configured, otherwise reply "N" and keep the
    # current state and socket.
    if cert && key do
      # Passive mode while `:ssl.handshake/2` runs; active mode is restored on
      # the upgraded socket once the handshake succeeds.
      :ok = HandlerHelpers.setopts(sock, active: false)
      :ok = HandlerHelpers.sock_send(sock, "S")

      handshake_opts = [verify: :verify_none, certfile: cert, keyfile: key]

      case :ssl.handshake(elem(sock, 1), handshake_opts) do
        {:ok, ssl_sock} ->
          upgraded = {:ssl, ssl_sock}
          :ok = HandlerHelpers.setopts(upgraded, active: true)
          {:keep_state, %{data | sock: upgraded, ssl: true}}

        failure ->
          Logger.error("ClientHandler: SSL handshake error: #{inspect(failure)}")
          Telem.client_join(:fail, data.id)
          {:stop, {:shutdown, :ssl_handshake_error}}
      end
    else
      Logger.error(
        "ClientHandler: User requested SSL connection but no downstream cert/key found"
      )

      :ok = HandlerHelpers.sock_send(data.sock, "N")
      :keep_state_and_data
    end
  end
178

179
  def handle_event(:info, {_, _, bin}, :exchange, _) when byte_size(bin) > 1024 do
180
    Logger.error("ClientHandler: Startup packet too large #{byte_size(bin)}")
×
181
    {:stop, {:shutdown, :startup_packet_too_large}}
182
  end
183

184
  def handle_event(:info, {_, _, bin}, :exchange, data) do
185
    case Server.decode_startup_packet(bin) do
179✔
186
      {:ok, hello} ->
187
        Logger.debug("ClientHandler: Client startup message: #{inspect(hello)}")
177✔
188
        {type, {user, tenant_or_alias, db_name}} = HandlerHelpers.parse_user_info(hello.payload)
177✔
189

190
        if Helpers.validate_name(user) and Helpers.validate_name(db_name) do
177✔
191
          log_level = maybe_change_log(hello)
176✔
192
          search_path = hello.payload["options"]["--search_path"]
176✔
193
          event = {:hello, {type, {user, tenant_or_alias, db_name, search_path}}}
176✔
194
          app_name = app_name(hello.payload["application_name"])
176✔
195

196
          {:keep_state, %{data | log_level: log_level, app_name: app_name},
176✔
197
           {:next_event, :internal, event}}
198
        else
199
          reason = "Invalid format for user or db_name"
1✔
200
          Logger.error("ClientHandler: #{inspect(reason)} #{inspect({user, db_name})}")
1✔
201
          Telem.client_join(:fail, tenant_or_alias)
1✔
202

203
          HandlerHelpers.send_error(
1✔
204
            data.sock,
1✔
205
            "XX000",
206
            "Authentication error, reason: #{inspect(reason)}"
207
          )
208

209
          {:stop, {:shutdown, :invalid_format}}
210
        end
211

212
      {:error, error} ->
213
        Logger.error("ClientHandler: Client startup message error: #{inspect(error)}")
2✔
214
        Telem.client_join(:fail, data.id)
2✔
215
        {:stop, {:shutdown, :startup_packet_error}}
216
    end
217
  end
218

219
  def handle_event(
220
        :internal,
221
        {:hello, {type, {user, tenant_or_alias, db_name, search_path}}},
222
        :exchange,
223
        %{sock: sock} = data
224
      ) do
225
    sni_hostname = HandlerHelpers.try_get_sni(sock)
176✔
226

227
    case Tenants.get_user_cache(type, user, tenant_or_alias, sni_hostname) do
176✔
228
      {:ok, info} ->
229
        db_name = db_name || info.tenant.db_database
176✔
230

231
        id =
176✔
232
          Supavisor.id(
233
            {type, tenant_or_alias},
234
            user,
235
            data.mode,
176✔
236
            info.user.mode_type,
176✔
237
            db_name,
238
            search_path
239
          )
240

241
        mode = Supavisor.mode(id)
176✔
242

243
        Logger.metadata(
176✔
244
          project: tenant_or_alias,
245
          user: user,
246
          mode: mode,
247
          type: type,
248
          db_name: db_name,
249
          app_name: data.app_name,
176✔
250
          peer_ip: data.peer_ip,
176✔
251
          local: data.local
176✔
252
        )
253

254
        {:ok, addr} = HandlerHelpers.addr_from_sock(sock)
176✔
255

256
        cond do
174✔
257
          !data.local and info.tenant.enforce_ssl and !data.ssl ->
174✔
258
            Logger.error(
×
259
              "ClientHandler: Tenant is not allowed to connect without SSL, user #{user}"
×
260
            )
261

262
            :ok = HandlerHelpers.send_error(sock, "XX000", "SSL connection is required")
×
263
            Telem.client_join(:fail, id)
×
264
            {:stop, {:shutdown, :ssl_required}}
265

266
          HandlerHelpers.filter_cidrs(info.tenant.allow_list, addr) == [] ->
174✔
267
            message = "Address not in tenant allow_list: " <> inspect(addr)
×
268
            Logger.error("ClientHandler: #{message}")
×
269
            :ok = HandlerHelpers.send_error(sock, "XX000", message)
×
270

271
            Telem.client_join(:fail, id)
×
272
            {:stop, {:shutdown, :address_not_allowed}}
273

274
          true ->
174✔
275
            new_data = update_user_data(data, info, user, id, db_name, mode)
174✔
276

277
            key = {:secrets, tenant_or_alias, user}
174✔
278

279
            case auth_secrets(info, user, key, :timer.hours(24)) do
174✔
280
              {:ok, auth_secrets} ->
281
                Logger.debug("ClientHandler: Authentication method: #{inspect(auth_secrets)}")
174✔
282

283
                {:keep_state, new_data, {:next_event, :internal, {:handle, auth_secrets, info}}}
174✔
284

285
              {:error, reason} ->
286
                Logger.error(
×
287
                  "ClientHandler: Authentication auth_secrets error: #{inspect(reason)}"
288
                )
289

290
                :ok =
×
291
                  HandlerHelpers.send_error(
292
                    sock,
293
                    "XX000",
294
                    "Authentication error, reason: #{inspect(reason)}"
295
                  )
296

297
                Telem.client_join(:fail, id)
×
298
                {:stop, {:shutdown, :auth_secrets_error}}
299
            end
300
        end
301

302
      {:error, reason} ->
303
        Logger.error(
×
304
          "ClientHandler: User not found: #{inspect(reason)} #{inspect({type, user, tenant_or_alias})}"
305
        )
306

307
        :ok = HandlerHelpers.send_error(sock, "XX000", "Tenant or user not found")
×
308
        Telem.client_join(:fail, data.id)
×
309
        {:stop, {:shutdown, :user_not_found}}
310
    end
311
  end
312

313
  def handle_event(
314
        :internal,
315
        {:handle, {method, secrets}, info},
316
        _,
317
        %{sock: sock} = data
318
      ) do
319
    Logger.debug("ClientHandler: Handle exchange, auth method: #{inspect(method)}")
174✔
320

321
    case handle_exchange(sock, {method, secrets}) do
174✔
322
      {:error, reason} ->
323
        Logger.error(
2✔
324
          "ClientHandler: Exchange error: #{inspect(reason)} when method #{inspect(method)}"
325
        )
326

327
        msg =
2✔
328
          if method == :auth_query_md5,
329
            do: Server.error_message("XX000", reason),
×
330
            else: Server.exchange_message(:final, "e=#{reason}")
2✔
331

332
        key = {:secrets_check, data.tenant, data.user}
2✔
333

334
        if method != :password and reason == "Wrong password" and
2✔
335
             Cachex.get(Supavisor.Cache, key) == {:ok, nil} do
1✔
336
          case auth_secrets(info, data.user, key, 15_000) do
1✔
337
            {:ok, {method2, secrets2}} = value ->
338
              if method != method2 or Map.delete(secrets.(), :client_key) != secrets2.() do
1✔
339
                Logger.warning("ClientHandler: Update secrets and terminate pool")
1✔
340

341
                Cachex.update(
1✔
342
                  Supavisor.Cache,
343
                  {:secrets, data.tenant, data.user},
1✔
344
                  {:cached, value}
345
                )
346

347
                Supavisor.stop(data.id)
1✔
348
              else
349
                Logger.debug("ClientHandler: Cache the same #{inspect(key)}")
×
350
              end
351

352
            other ->
353
              Logger.error("ClientHandler: Auth secrets check error: #{inspect(other)}")
×
354
          end
355
        else
356
          Logger.debug("ClientHandler: Cache hit for #{inspect(key)}")
1✔
357
        end
358

359
        HandlerHelpers.sock_send(sock, msg)
2✔
360
        Telem.client_join(:fail, data.id)
2✔
361
        {:stop, {:shutdown, :exchange_error}}
362

363
      {:ok, client_key} ->
364
        secrets =
172✔
365
          if client_key,
366
            do: fn -> Map.put(secrets.(), :client_key, client_key) end,
144✔
367
            else: secrets
28✔
368

369
        Logger.debug("ClientHandler: Exchange success")
172✔
370
        :ok = HandlerHelpers.sock_send(sock, Server.authentication_ok())
172✔
371
        Telem.client_join(:ok, data.id)
172✔
372

373
        auth = Map.merge(data.auth, %{secrets: secrets, method: method})
172✔
374

375
        conn_type =
172✔
376
          if data.mode == :proxy,
172✔
377
            do: :connect_db,
378
            else: :subscribe
379

380
        {:keep_state, %{data | auth_secrets: {method, secrets}, auth: auth},
172✔
381
         {:next_event, :internal, conn_type}}
382
    end
383
  end
384

385
  def handle_event(:internal, :subscribe, _, data) do
386
    Logger.debug("ClientHandler: Subscribe to tenant #{inspect(data.id)}")
172✔
387

388
    with {:ok, sup} <-
172✔
389
           Supavisor.start_dist(data.id, data.auth_secrets,
172✔
390
             log_level: data.log_level,
172✔
391
             availability_zone: data.tenant_availability_zone
172✔
392
           ),
393
         true <-
172✔
394
           if(node(sup) != node() and data.mode in [:transaction, :session],
172✔
395
             do: :proxy,
396
             else: true
397
           ),
398
         {:ok, opts} <- Supavisor.subscribe(sup, data.id) do
172✔
399
      manager_ref = Process.monitor(opts.workers.manager)
171✔
400
      data = Map.merge(data, opts.workers)
171✔
401
      db_pid = db_checkout(:both, :on_connect, data)
171✔
402
      data = %{data | manager: manager_ref, db_pid: db_pid, idle_timeout: opts.idle_timeout}
170✔
403

404
      Registry.register(@clients_registry, data.id, [])
170✔
405

406
      next =
170✔
407
        if opts.ps == [],
170✔
408
          do: {:timeout, 10_000, :wait_ps},
18✔
409
          else: {:next_event, :internal, {:greetings, opts.ps}}
152✔
410

411
      {:keep_state, data, next}
170✔
412
    else
413
      {:error, :max_clients_reached} ->
414
        msg = "Max client connections reached"
1✔
415
        Logger.error("ClientHandler: #{msg}")
1✔
416
        :ok = HandlerHelpers.send_error(data.sock, "XX000", msg)
1✔
417
        Telem.client_join(:fail, data.id)
1✔
418
        {:stop, {:shutdown, :max_clients_reached}}
419

420
      {:error, :max_pools_reached} ->
421
        msg = "Max pools count reached"
×
422
        Logger.error("ClientHandler: #{msg}")
×
423
        :ok = HandlerHelpers.send_error(data.sock, "XX000", msg)
×
424
        Telem.client_join(:fail, data.id)
×
425
        {:stop, {:shutdown, :max_pools_reached}}
426

427
      :proxy ->
428
        case Supavisor.get_pool_ranch(data.id) do
×
429
          {:ok, %{port: port, host: host}} ->
430
            auth =
×
431
              Map.merge(data.auth, %{
×
432
                port: port,
433
                host: to_charlist(host),
434
                ip_version: :inet,
435
                upstream_ssl: false,
436
                upstream_tls_ca: nil,
437
                upstream_verify: nil
438
              })
439

440
            Logger.metadata(proxy: true)
×
441
            Registry.register(@proxy_clients_registry, data.id, [])
×
442

443
            {:keep_state, %{data | auth: auth}, {:next_event, :internal, :connect_db}}
×
444

445
          other ->
446
            Logger.error("ClientHandler: Subscribe proxy error: #{inspect(other)}")
×
447
            timeout_subscribe_or_terminate(data)
×
448
        end
449

450
      error ->
451
        Logger.error("ClientHandler: Subscribe error: #{inspect(error)}")
×
452
        timeout_subscribe_or_terminate(data)
×
453
    end
454
  end
455

456
  def handle_event(:internal, :connect_db, _, data) do
457
    Logger.debug("ClientHandler: Trying to connect to DB")
×
458

459
    args = %{
×
460
      id: data.id,
×
461
      auth: data.auth,
×
462
      user: data.user,
×
463
      tenant: {:single, data.tenant},
×
464
      replica_type: :write,
465
      mode: :proxy,
466
      proxy: true,
467
      log_level: data.log_level,
×
468
      caller: self(),
469
      client_sock: data.sock
×
470
    }
471

472
    {:ok, db_pid} = DbHandler.start_link(args)
×
473
    db_sock = :gen_statem.call(db_pid, {:checkout, data.sock, self()})
×
474
    {:keep_state, %{data | db_pid: {nil, db_pid, db_sock}, mode: :proxy}}
475
  end
476

477
  def handle_event(:internal, {:greetings, ps}, _, %{sock: sock} = data) do
478
    {header, <<pid::32, key::32>> = payload} = Server.backend_key_data()
170✔
479
    msg = [ps, [header, payload], Server.ready_for_query()]
170✔
480
    :ok = HandlerHelpers.listen_cancel_query(pid, key)
170✔
481
    :ok = HandlerHelpers.sock_send(sock, msg)
170✔
482
    Telem.client_connection_time(data.connection_start, data.id)
170✔
483
    {:next_state, :idle, data, handle_actions(data)}
170✔
484
  end
485

486
  def handle_event(:timeout, :subscribe, _, _),
×
487
    do: {:keep_state_and_data, {:next_event, :internal, :subscribe}}
488

489
  # Fired by the `{:timeout, 10_000, :wait_ps}` action armed in the `:subscribe`
  # handler when the pool returned no parameter status. Falls back to the
  # tenant's default parameter status (`data.ps`) and continues with greetings.
  def handle_event(:timeout, :wait_ps, _, data) do
    Logger.error(
      "ClientHandler: Wait parameter status timeout, send default #{inspect(data.ps)}"
    )

    ps = Server.encode_parameter_status(data.ps)
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
  end
497

498
  def handle_event(:timeout, :idle_terminate, _, data) do
499
    Logger.warning("ClientHandler: Terminate an idle connection by #{data.idle_timeout} timeout")
×
500
    {:stop, {:shutdown, :idle_terminate}}
501
  end
502

503
  def handle_event(:timeout, :heartbeat_check, _, data) do
504
    Logger.debug("ClientHandler: Send heartbeat to client")
×
505
    HandlerHelpers.sock_send(data.sock, Server.application_name())
×
506
    {:keep_state_and_data, {:timeout, data.heartbeat_interval, :heartbeat_check}}
×
507
  end
508

509
  def handle_event(kind, {proto, socket, msg}, state, data)
510
      when proto in @proto and is_binary(msg) do
511
    with {:next_state, next_state, new_data, actions} <- handle_data(kind, msg, state, data) do
4,830✔
512
      new_actions =
641✔
513
        actions
514
        |> List.wrap()
515
        |> Enum.map(fn
516
          {:next_event, type, bin} when is_binary(bin) ->
517
            {:next_event, type, {proto, socket, bin}}
641✔
518

519
          other ->
520
            other
×
521
        end)
522

523
      {:next_state, next_state, new_data, new_actions}
641✔
524
    end
525
  end
526

527
  def handle_event(:info, {:parameter_status, :updated}, _, _) do
528
    Logger.warning("ClientHandler: Parameter status is updated")
×
529
    {:stop, {:shutdown, :parameter_status_updated}}
530
  end
531

532
  def handle_event(:info, {:parameter_status, ps}, :exchange, _),
18✔
533
    do: {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
534

535
  def handle_event(:info, {:ssl_error, sock, reason}, _, %{sock: {_, sock}}) do
536
    Logger.error("ClientHandler: TLS error #{inspect(reason)}")
1✔
537
    :keep_state_and_data
538
  end
539

540
  # client closed connection
541
  def handle_event(_, {closed, _}, _, data)
542
      when closed in [:tcp_closed, :ssl_closed] do
543
    Logger.debug("ClientHandler: #{closed} socket closed for #{inspect(data.tenant)}")
146✔
544
    {:stop, {:shutdown, :socket_closed}}
545
  end
546

547
  # linked DbHandler went down
548
  def handle_event(:info, {:EXIT, db_pid, reason}, _, data) do
549
    Logger.error("ClientHandler: DbHandler #{inspect(db_pid)} exited #{inspect(reason)}")
×
550
    HandlerHelpers.sock_send(data.sock, Server.error_message("XX000", "DbHandler exited"))
×
551
    {:stop, {:shutdown, :db_handler_exit}}
552
  end
553

554
  # pool's manager went down
555
  # Handles the `:DOWN` message for the monitor stored in `data.manager`.
  #
  #   * reason `:shutdown`      -> stop with `:manager_shutdown`
  #   * `:idle`, other reasons  -> re-run `:subscribe` to attach to a new pool
  #   * any other state         -> stop with `:manager_down`
  def handle_event(:info, {:DOWN, ref, _, _, reason}, state, %{manager: ref} = data) do
    Logger.error(
      "ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}"
    )

    case {state, reason} do
      {_, :shutdown} ->
        {:stop, {:shutdown, :manager_shutdown}}

      {:idle, _} ->
        {:keep_state_and_data, {:next_event, :internal, :subscribe}}

      {:busy, _} ->
        {:stop, {:shutdown, :manager_down}}

      # The monitor is installed in the `:subscribe` handler while the state is
      # still `:exchange` (e.g. while waiting for parameter status), so a DOWN
      # can arrive outside of `:idle`/`:busy`. Stop cleanly instead of raising
      # a CaseClauseError.
      {_, _} ->
        {:stop, {:shutdown, :manager_down}}
    end
  end
566

567
  def handle_event(:info, {:disconnect, reason}, _, _data) do
568
    Logger.warning("ClientHandler: Disconnected due to #{inspect(reason)}")
×
569
    {:stop, {:shutdown, {:disconnect, reason}}}
570
  end
571

572
  # emulate handle_cast
573
  def handle_event(:cast, {:db_status, status, bin}, :busy, data) do
574
    case status do
563✔
575
      :ready_for_query ->
576
        Logger.debug("ClientHandler: Client is ready")
563✔
577

578
        :ok = HandlerHelpers.sock_send(data.sock, bin)
563✔
579

580
        db_pid = handle_db_pid(data.mode, data.pool, data.db_pid)
563✔
581

582
        {_, stats} =
563✔
583
          if not data.local,
563✔
584
            do: Telem.network_usage(:client, data.sock, data.id, data.stats),
559✔
585
            else: {nil, data.stats}
4✔
586

587
        Telem.client_query_time(data.query_start, data.id)
563✔
588

589
        {:next_state, :idle,
563✔
590
         %{data | db_pid: db_pid, stats: stats, active_count: reset_active_count(data)},
591
         handle_actions(data)}
592

593
      :read_sql_error ->
594
        Logger.error(
×
595
          "ClientHandler: read only sql transaction, rerunning the query to write pool"
596
        )
597

598
        # release the read pool
599
        _ = handle_db_pid(data.mode, data.pool, data.db_pid)
×
600

601
        ts = System.monotonic_time()
×
602
        db_pid = db_checkout(:write, :on_query, data)
×
603

604
        {:keep_state, %{data | db_pid: db_pid, query_start: ts},
×
605
         {:next_event, :internal, {:tcp, nil, data.last_query}}}
×
606
    end
607
  end
608

609
  def handle_event(:info, {sock_error, _sock, msg}, _state, _data)
610
      when sock_error in [:tcp_error, :ssl_error] do
611
    Logger.error("ClientHandler: Socket error: #{inspect(msg)}")
×
612

613
    {:stop, {:shutdown, {:socket_error, msg}}}
614
  end
615

616
  def handle_event(type, content, _state, _data) do
617
    msg = [
18✔
618
      {"type", type},
619
      {"content", content}
620
    ]
621

622
    Logger.error("ClientHandler: Undefined msg: #{inspect(msg, pretty: true)}")
18✔
623

624
    :keep_state_and_data
625
  end
626

627
  @impl true
628
  # Termination caused by a timed-out call whose request was
  # `{:checkout, _, _}` (presumably the pool / DbHandler checkout made from
  # `db_checkout/3` - verify against the callee). Reports the failure to the
  # client before the process goes away.
  def terminate(
        {:timeout, {_, _, [_, {:checkout, _, _}, _]}},
        _,
        data
      ) do
    msg =
      case data.mode do
        :session ->
          "MaxClientsInSessionMode: max clients reached - in Session mode max clients are limited to pool_size"

        # `:transaction`, plus any other mode this module can set (such as
        # `:proxy`): raising a CaseClauseError inside `terminate/3` would keep
        # the client from ever receiving the error message below.
        _ ->
          "Unable to check out process from the pool due to timeout"
      end

    Logger.error("ClientHandler: #{msg}")
    HandlerHelpers.sock_send(data.sock, Server.error_message("XX000", msg))
    :ok
  end
646

647
  def terminate(reason, _state, %{db_pid: {_, pid, _}}) do
648
    db_info =
79✔
649
      with {:ok, {state, mode} = resp} <- DbHandler.get_state_and_mode(pid) do
×
650
        if state == :busy or mode == :session, do: DbHandler.stop(pid)
79✔
651
        resp
79✔
652
      end
653

654
    Logger.debug(
79✔
655
      "ClientHandler: socket closed with reason #{inspect(reason)}, DbHandler #{inspect({pid, db_info})}"
656
    )
657

658
    :ok
659
  end
660

661
  def terminate(reason, _state, _data) do
662
    Logger.debug("ClientHandler: socket closed with reason #{inspect(reason)}")
123✔
663
    :ok
664
  end
665

666
  ## Internal functions
667

668
  @spec handle_exchange(Supavisor.sock(), {atom(), fun()}) ::
669
          {:ok, binary() | nil} | {:error, String.t()}
670
  def handle_exchange({_, socket} = sock, {:auth_query_md5 = method, secrets}) do
671
    salt = :crypto.strong_rand_bytes(4)
×
672
    :ok = HandlerHelpers.sock_send(sock, Server.md5_request(salt))
×
673

674
    with {:ok,
×
675
          %{
676
            tag: :password_message,
677
            payload: {:md5, client_md5}
678
          }, _} <- receive_next(socket, "Timeout while waiting for the md5 exchange"),
×
679
         {:ok, key} <- authenticate_exchange(method, client_md5, secrets.().secret, salt) do
×
680
      {:ok, key}
681
    else
682
      {:error, message} -> {:error, message}
×
683
      other -> {:error, "Unexpected message #{inspect(other)}"}
×
684
    end
685
  end
686

687
  def handle_exchange({_, socket} = sock, {method, secrets}) do
688
    :ok = HandlerHelpers.sock_send(sock, Server.scram_request())
174✔
689

690
    with {:ok,
174✔
691
          %{
692
            tag: :password_message,
693
            payload: {:scram_sha_256, %{"n" => user, "r" => nonce, "c" => channel}}
694
          },
695
          _} <-
174✔
696
           receive_next(
697
             socket,
698
             "Timeout while waiting for the first password message"
699
           ),
700
         {:ok, signatures} = reply_first_exchange(sock, method, secrets, channel, nonce, user),
174✔
701
         {:ok,
702
          %{
703
            tag: :password_message,
704
            payload: {:first_msg_response, %{"p" => p}}
705
          },
706
          _} <-
174✔
707
           receive_next(
708
             socket,
709
             "Timeout while waiting for the second password message"
710
           ),
711
         {:ok, key} <- authenticate_exchange(method, secrets, signatures, p) do
174✔
712
      message = "v=#{Base.encode64(signatures.server)}"
172✔
713
      :ok = HandlerHelpers.sock_send(sock, Server.exchange_message(:final, message))
172✔
714
      {:ok, key}
715
    else
716
      {:error, message} -> {:error, message}
2✔
717
      other -> {:error, "Unexpected message #{inspect(other)}"}
×
718
    end
719
  end
720

721
  defp receive_next(socket, timeout_message) do
722
    receive do
348✔
723
      {_proto, ^socket, bin} ->
724
        Server.decode_pkt(bin)
348✔
725

726
      other ->
×
727
        {:error, "Unexpected message in receive_next/2 #{inspect(other)}"}
728
    after
729
      15_000 -> {:error, timeout_message}
×
730
    end
731
  end
732

733
  # Builds the first server message of the exchange via `exchange_first/5`,
  # sends it to the client and returns the signatures needed for the final step.
  defp reply_first_exchange(sock, method, secrets, channel, nonce, user) do
    {first_message, signatures} = exchange_first(method, secrets, nonce, user, channel)
    reply = Server.exchange_message(:first, first_message)
    :ok = HandlerHelpers.sock_send(sock, reply)
    {:ok, signatures}
  end
738

739
  # `:password` method: the proof sent by the client must equal the client
  # signature computed on our side. No client key is produced in this mode.
  defp authenticate_exchange(:password, _secrets, signatures, p) do
    if p == signatures.client do
      {:ok, nil}
    else
      {:error, "Wrong password"}
    end
  end
744

745
  # `:auth_query` method: recover the client key by XOR-ing the base64-decoded
  # client proof `p` with the client signature, then compare its hash with the
  # stored key.
  #
  # Returns `{:ok, client_key}` on success and `{:error, "Wrong password"}`
  # otherwise. `p` comes straight from the client, so a proof that is not valid
  # base64 or has the wrong length is reported as a failed authentication
  # instead of raising from `Base.decode64!/1` / `:crypto.exor/2`.
  defp authenticate_exchange(:auth_query, secrets, signatures, p) do
    with {:ok, proof} <- Base.decode64(p),
         # `:crypto.exor/2` requires both arguments to have the same size.
         true <- byte_size(proof) == IO.iodata_length(signatures.client) do
      client_key = :crypto.exor(proof, signatures.client)

      if Helpers.hash(client_key) == secrets.().stored_key do
        {:ok, client_key}
      else
        {:error, "Wrong password"}
      end
    else
      _malformed_proof -> {:error, "Wrong password"}
    end
  end
754

755
  # `:auth_query_md5` method: the client must send "md5" followed by the MD5 of
  # the server-side hash and the salt issued for this connection.
  defp authenticate_exchange(:auth_query_md5, client_hash, server_hash, salt) do
    expected = "md5" <> Helpers.md5([server_hash, salt])

    if expected == client_hash do
      {:ok, nil}
    else
      {:error, "Wrong password"}
    end
  end
760

761
  @spec db_checkout(:write | :read | :both, :on_connect | :on_query, map) ::
762
          {pid, pid, Supavisor.sock()} | nil
763
  defp db_checkout(_, _, %{mode: mode, db_pid: {pool, db_pid, db_sock}})
764
       when is_pid(db_pid) and mode in [:session, :proxy] do
765
    {pool, db_pid, db_sock}
76✔
766
  end
767

768
  defp db_checkout(_, :on_connect, %{mode: :transaction}), do: nil
93✔
769

770
  # Checks out a DbHandler for an explicit `:write` / `:read` query type.
  #
  # Returns `{pool, db_pid, db_sock}`, the same shape as the generic clause
  # below and the shape every consumer of `data.db_pid` in this module pattern
  # matches on (`handle_db_pid/3`, `terminate/3`, the `:cancel_query` handler).
  #
  # NOTE(review): `data.pool` is used as a single pool by the generic clause; a
  # per-type collection (`data.pool[query_type]`) is still honoured when present.
  # Confirm which shape `Supavisor.subscribe/2` actually provides.
  defp db_checkout(query_type, _, data) when query_type in [:write, :read] do
    pool =
      case data.pool do
        pool_pid when is_pid(pool_pid) -> pool_pid
        pools -> Enum.random(pools[query_type])
      end

    {time, db_pid} = :timer.tc(:poolboy, :checkout, [pool, true, data.timeout])
    Process.link(db_pid)
    db_sock = DbHandler.checkout(db_pid, data.sock, self())
    same_box = if node(db_pid) == node(), do: :local, else: :remote
    Telem.pool_checkout_time(time, data.id, same_box)
    {pool, db_pid, db_sock}
  end
781

782
  defp db_checkout(_, _, data) do
783
    start = System.monotonic_time(:microsecond)
643✔
784
    db_pid = :poolboy.checkout(data.pool, true, data.timeout)
643✔
785
    Process.link(db_pid)
642✔
786
    db_sock = DbHandler.checkout(db_pid, data.sock, self())
642✔
787
    same_box = if node(db_pid) == node(), do: :local, else: :remote
642✔
788
    Telem.pool_checkout_time(System.monotonic_time(:microsecond) - start, data.id, same_box)
642✔
789
    {data.pool, db_pid, db_sock}
642✔
790
  end
791

792
  @spec handle_db_pid(:transaction, pid(), pid() | nil) :: nil
793
  @spec handle_db_pid(:session, pid(), pid()) :: pid()
794
  @spec handle_db_pid(:proxy, pid(), pid()) :: pid()
795
  defp handle_db_pid(:transaction, _pool, nil), do: nil
×
796

797
  defp handle_db_pid(:transaction, pool, {_, db_pid, _}) do
798
    Process.unlink(db_pid)
563✔
799
    :poolboy.checkin(pool, db_pid)
563✔
800
    nil
801
  end
802

803
  defp handle_db_pid(:session, _, db_pid), do: db_pid
×
804
  defp handle_db_pid(:proxy, _, db_pid), do: db_pid
×
805

806
  # Copies tenant/user settings from `info` into the connection state `data`.
  #
  # Builds the `auth` map describing the upstream database (host, port,
  # credentials, TLS settings) and updates the existing keys of `data` with the
  # tenant id, timeouts, parameter status, mode and that `auth` map.
  defp update_user_data(data, info, user, id, db_name, mode) do
    tenant = info.tenant

    proxy_type = if tenant.require_user, do: :password, else: :auth_query

    # Only converted to a charlist when the tenant has an SNI hostname set.
    sni_hostname =
      case tenant.sni_hostname do
        nil -> nil
        hostname -> to_charlist(hostname)
      end

    auth = %{
      application_name: data[:app_name] || "Supavisor",
      database: db_name,
      host: to_charlist(tenant.db_host),
      sni_hostname: sni_hostname,
      port: tenant.db_port,
      user: user,
      password: info.user.db_password,
      require_user: tenant.require_user,
      upstream_ssl: tenant.upstream_ssl,
      upstream_tls_ca: tenant.upstream_tls_ca,
      upstream_verify: tenant.upstream_verify
    }

    %{
      data
      | tenant: tenant.external_id,
        user: user,
        timeout: info.user.pool_checkout_timeout,
        ps: tenant.default_parameter_status,
        proxy_type: proxy_type,
        id: id,
        # The tenant value is multiplied by 1000 before being stored.
        heartbeat_interval: tenant.client_heartbeat_interval * 1000,
        db_name: db_name,
        mode: mode,
        auth: auth,
        tenant_availability_zone: tenant.availability_zone
    }
  end
842

843
  @doc """
  Returns the secrets used to authenticate a client.

  When the tenant has `require_user: true`, the secrets are built directly from
  the configured user (`:password` method). Otherwise they are obtained through
  `get_secrets/2` and cached in `Supavisor.Cache` under `key` with the given
  `ttl`; `{:error, _}` results from `get_secrets/2` are returned but not cached.

  The secrets themselves are wrapped in a zero-arity function.
  """
  @spec auth_secrets(map, String.t(), term(), non_neg_integer()) ::
          {:ok, Supavisor.secrets()} | {:error, term()}
  ## password secrets
  def auth_secrets(%{user: user, tenant: %{require_user: true}}, _, _, _) do
    secrets = %{db_user: user.db_user, password: user.db_password, alias: user.db_user_alias}

    {:ok, {:password, fn -> secrets end}}
  end

  ## auth_query secrets
  def auth_secrets(info, db_user, key, ttl) do
    fetch = fn _key ->
      case get_secrets(info, db_user) do
        {:ok, _} = resp -> {:commit, {:cached, resp}, ttl: ttl}
        {:error, _} = resp -> {:ignore, resp}
      end
    end

    case Cachex.fetch(Supavisor.Cache, key, fetch) do
      {:ok, {:cached, value}} -> value
      {:commit, {:cached, value}, _opts} -> value
      {:ignore, resp} -> resp
      # The cache itself can report a failure (for example when the fallback
      # raises); surface it as the `{:error, term()}` promised by the spec
      # instead of crashing with a CaseClauseError.
      {:error, _reason} = error -> error
    end
  end
867

868
  @doc """
  Fetches the authentication secret of `db_user` from the tenant database.

  Opens a short-lived Postgrex connection with the tenant's host, port and
  database and the configured user's credentials, then runs the tenant's
  `auth_query` through `Helpers.get_user_secret/3`.

  On success returns `{:ok, {method, fun}}` where `method` is `:auth_query_md5`
  when the secret's digest is `:md5` and `:auth_query` otherwise, and `fun`
  returns the secret with the user's alias added. Any other result of
  `Helpers.get_user_secret/3` is returned unchanged.

  Exceptions raised while querying are logged and re-raised. The connection is
  stopped in every case.
  """
  @spec get_secrets(map, String.t()) ::
          {:ok, {:auth_query | :auth_query_md5, fun()}} | {:error, term()}
  def get_secrets(%{user: user, tenant: tenant}, db_user) do
    ssl_opts =
      if tenant.upstream_ssl and tenant.upstream_verify == :peer do
        [
          verify: :verify_peer,
          cacerts: [Helpers.upstream_cert(tenant.upstream_tls_ca)],
          server_name_indication: String.to_charlist(tenant.db_host),
          # The certificate chain is verified against the tenant CA, but the
          # hostname check is replaced by a function that always accepts.
          customize_hostname_check: [{:match_fun, fn _, _ -> true end}]
        ]
      else
        [
          verify: :verify_none
        ]
      end

    {:ok, conn} =
      Postgrex.start_link(
        hostname: tenant.db_host,
        port: tenant.db_port,
        database: tenant.db_database,
        password: user.db_password,
        username: user.db_user,
        parameters: [application_name: "Supavisor auth_query"],
        ssl: tenant.upstream_ssl,
        socket_options: [
          Helpers.ip_version(tenant.ip_version, tenant.db_host)
        ],
        queue_target: 1_000,
        queue_interval: 5_000,
        ssl_opts: ssl_opts
      )

    try do
      Logger.debug(
        "ClientHandler: Connected to db #{tenant.db_host} #{tenant.db_port} #{tenant.db_database} #{user.db_user}"
      )

      resp =
        with {:ok, secret} <- Helpers.get_user_secret(conn, tenant.auth_query, db_user) do
          t = if secret.digest == :md5, do: :auth_query_md5, else: :auth_query
          {:ok, {t, fn -> Map.put(secret, :alias, user.db_user_alias) end}}
        end

      Logger.info("ClientHandler: Get secrets finished")
      resp
    rescue
      exception ->
        Logger.error("ClientHandler: Couldn't fetch user secrets from #{tenant.db_host}")
        reraise exception, __STACKTRACE__
    after
      # Always release the temporary connection, whether the query succeeded or raised.
      GenServer.stop(conn, :normal, 5_000)
    end
  end
922

923
  # Builds the server-first message for the authentication exchange and the
  # signatures used later to verify the client.
  #
  # `secret` is a zero-arity function returning the stored secrets. Returns
  # `{server_first_message, signatures}`.
  @spec exchange_first(:password | :auth_query, fun(), binary(), binary(), binary()) ::
          {binary(), map()}
  defp exchange_first(:password, secret, nonce, user, channel) do
    server_first = Server.exchange_first_message(nonce)
    parsed_first = Helpers.parse_server_first(server_first, nonce)
    password = secret.().password

    {client_final, server_proof} =
      Helpers.get_client_final(:password, password, parsed_first, nonce, user, channel)

    # Only the last element of the client-final message is kept as the client signature.
    signatures = %{client: List.last(client_final), server: server_proof}

    {server_first, signatures}
  end

  defp exchange_first(:auth_query, secret, nonce, user, channel) do
    stored = secret.()
    server_first = Server.exchange_first_message(nonce, stored.salt)
    parsed_first = Helpers.parse_server_first(server_first, nonce)

    signatures =
      Helpers.signatures(
        stored.stored_key,
        stored.server_key,
        parsed_first,
        nonce,
        user,
        channel
      )

    {server_first, signatures}
  end
964

965
  @doc """
  Returns the SNI hostname of an SSL socket as a string.

  Returns `nil` for non-SSL sockets and whenever the connection information
  does not contain exactly one `:sni_hostname` entry.
  """
  @spec try_get_sni(Supavisor.sock()) :: String.t() | nil
  def try_get_sni({:ssl, sock}) do
    with {:ok, [sni_hostname: hostname]} <- :ssl.connection_information(sock, [:sni_hostname]) do
      List.to_string(hostname)
    else
      _ -> nil
    end
  end

  def try_get_sni(_), do: nil
×
974

975
  # Looks up the `Supavisor.Registry.PoolPids` entries for the worker `pid`.
  #
  # The lookup runs locally when the pid belongs to this node; otherwise it is
  # performed on the pid's node through `:erpc` with a 15 second timeout.
  defp db_pid_meta({_, {_, pid, _}}) do
    registry = Supavisor.Registry.PoolPids

    case node(pid) do
      owner when owner == node() ->
        Registry.lookup(registry, pid)

      owner ->
        :erpc.call(owner, Registry, :lookup, [registry, pid], 15_000)
    end
  end
985

986
  # Dispatches a client message according to the current state.
  #
  # Arguments: the event kind, the raw message, the current state name and the
  # connection data. The result is a `:gen_statem` event handler result, whose
  # type parameter is the state type.
  @spec handle_data(kind :: atom(), bin :: binary(), state, data :: term()) ::
          :gen_statem.event_handler_result(state)
        when state: atom() | term()

  # handle Terminate message
  # A Terminate coming from a connection flagged `local: true` is only logged;
  # the process keeps its state and data.
  defp handle_data(:info, <<?X, 4::32>>, :idle, %{local: true}) do
    Logger.info("ClientHandler: Terminate received from proxy client")
    :keep_state_and_data
  end

  # Any other idle client sending Terminate stops this process with a shutdown reason.
  defp handle_data(:info, <<?X, 4::32>>, :idle, _data) do
    Logger.info("ClientHandler: Terminate received from client")
    {:stop, {:shutdown, :terminate_received}}
  end
1001

1002
  # Sync received while idle.
  defp handle_data(:info, <<?S, 4::32, _::binary>> = msg, :idle, data) do
    Logger.debug("ClientHandler: Receive sync")

    # db_pid can be nil in transaction mode, so we will send ready_for_query
    # without checking out a direct connection. If there is a linked db_pid,
    # we will forward the message to it
    :ok =
      case data.db_pid do
        nil -> HandlerHelpers.sock_send(data.sock, Server.ready_for_query())
        _linked -> sock_send_maybe_active_once(msg, data)
      end

    refreshed = %{data | active_count: reset_active_count(data)}
    {:keep_state, refreshed, handle_actions(data)}
  end

  # Sync received in any other state: always forwarded to the database socket.
  defp handle_data(:info, <<?S, 4::32, _::binary>> = msg, _, data) do
    Logger.debug("ClientHandler: Receive sync while not idle")
    :ok = sock_send_maybe_active_once(msg, data)

    refreshed = %{data | active_count: reset_active_count(data)}
    {:keep_state, refreshed, handle_actions(data)}
  end

  # handle Flush message
  # This clause matches in every state (the state argument is `_`), even though
  # the log line says "while not idle".
  defp handle_data(:info, <<?H, 4::32, _::binary>> = msg, _, data) do
    Logger.debug("ClientHandler: Receive flush while not idle")
    :ok = sock_send_maybe_active_once(msg, data)

    refreshed = %{data | active_count: reset_active_count(data)}
    {:keep_state, refreshed, handle_actions(data)}
  end
1027

1028
  # incoming query with a single pool
  # Checks out a connection, lets `handle_prepared_statements/3` inspect the
  # message, then moves to `:busy` and re-injects the message as an internal event.
  defp handle_data(:info, bin, :idle, %{pool: pid} = data) when is_pid(pid) do
    Logger.debug("ClientHandler: Receive query #{inspect(bin)}")

    conn = db_checkout(:both, :on_query, data)
    handle_prepared_statements(conn, bin, data)

    busy_data = %{data | db_pid: conn, query_start: System.monotonic_time()}
    {:next_state, :busy, busy_data, {:next_event, :internal, bin}}
  end

  # Proxy mode: no checkout happens here, only the query start time is recorded.
  defp handle_data(:info, bin, _, %{mode: :proxy} = data) do
    busy_data = %{data | query_start: System.monotonic_time()}
    {:next_state, :busy, busy_data, {:next_event, :internal, bin}}
  end

  # incoming query with read/write pools
  # The query is classified as `:read` or `:write` to choose the checkout type;
  # anything that cannot be parsed is treated as a write.
  defp handle_data(:info, bin, :idle, data) do
    query_type =
      with {:ok, payload} <- Client.get_payload(bin),
           {:ok, statements} <- Supavisor.PgParser.statements(payload) do
        Logger.debug(
          "ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}"
        )

        # naive check for read only queries
        if statements == ["SelectStmt"], do: :read, else: :write
      else
        other ->
          Logger.error("ClientHandler: Receive query error: #{inspect(other)}")
          :write
      end

    # The start time is taken before the checkout in this clause.
    started_at = System.monotonic_time()
    conn = db_checkout(query_type, :on_query, data)

    busy_data = %{data | db_pid: conn, query_start: started_at, last_query: bin}
    {:next_state, :busy, busy_data, {:next_event, :internal, bin}}
  end
1069

1070
  # forward query to db
  # On success the active counter is incremented. On a send failure the client
  # is sent an XX000 error message and the process stops with a shutdown reason.
  defp handle_data(_, bin, :busy, data) do
    Logger.debug("ClientHandler: Forward query to db #{inspect(bin)} #{inspect(data.db_pid)}")

    with :ok <- sock_send_maybe_active_once(bin, data) do
      {:keep_state, %{data | active_count: data.active_count + 1}}
    else
      error ->
        Logger.error("ClientHandler: error while sending query: #{inspect(error)}")

        failure = Server.error_message("XX000", "Error while sending query")
        HandlerHelpers.sock_send(data.sock, failure)

        {:stop, {:shutdown, :send_query_error}}
    end
  end
1089

1090
  # In transaction mode, propagates PREPARE / DEALLOCATE statements to the other
  # workers of the pool.
  #
  # When the message parses to exactly one `PrepareStmt` or `DeallocateStmt`,
  # every `Supavisor.DbHandler` worker reported by the pool, except the one
  # checked out for this client, is sent `{:handle_ps, payload, bin}`. Any
  # other message is skipped with a debug log. Outside transaction mode nothing
  # is done and `nil` is returned.
  @spec handle_prepared_statements({pid, pid, Supavisor.sock()}, binary, map) :: :ok | nil
  defp handle_prepared_statements({_, pid, _}, bin, %{mode: :transaction} = data) do
    with {:ok, payload} <- Client.get_payload(bin),
         {:ok, statements} <- Supavisor.PgParser.statements(payload),
         true <- statements in [["PrepareStmt"], ["DeallocateStmt"]] do
      Logger.info("ClientHandler: Handle prepared statement #{inspect(payload)}")

      workers = GenServer.call(data.pool, :get_all_workers)

      Enum.each(workers, fn {_, worker, _, [Supavisor.DbHandler]} ->
        if worker == pid do
          # The checked-out worker will receive the statement through the normal query path.
          Logger.debug("ClientHandler: Linked DbHandler #{inspect(worker)}")
          nil
        else
          Logger.debug(
            "ClientHandler: Sending prepared statement change #{inspect(payload)} to #{inspect(worker)}"
          )

          send(worker, {:handle_ps, payload, bin})
        end
      end)
    else
      skipped ->
        Logger.debug("ClientHandler: Skip prepared statement #{inspect(skipped)}")
    end
  end

  defp handle_prepared_statements(_, _, _), do: nil
76✔
1117

1118
  # Builds the list of `:timeout` actions returned alongside a state transition.
  #
  # An `:idle_timeout` entry is included when `data.idle_timeout` is positive
  # and a `:heartbeat_check` entry when `data.heartbeat_interval` is positive;
  # the idle entry comes first.
  #
  # NOTE(review): both entries use the plain `{:timeout, time, content}` form.
  # Confirm against the gen_statem docs whether two such actions in one list
  # can coexist or whether the later one replaces the earlier one.
  @spec handle_actions(map) :: [{:timeout, non_neg_integer, atom}]
  defp handle_actions(%{} = data) do
    heartbeat_actions =
      case data.heartbeat_interval do
        interval when interval > 0 -> [{:timeout, interval, :heartbeat_check}]
        _ -> []
      end

    idle_actions =
      case data.idle_timeout do
        limit when limit > 0 -> [{:timeout, limit, :idle_timeout}]
        _ -> []
      end

    idle_actions ++ heartbeat_actions
  end
1129

1130
  @doc """
  Normalises the application name supplied by a client.

  A binary is returned unchanged; any other value is logged at debug level and
  replaced by `"Supavisor"`.
  """
  @spec app_name(any()) :: String.t()
  def app_name(name) do
    if is_binary(name) do
      name
    else
      Logger.debug("ClientHandler: Invalid application name #{inspect(name)}")
      "Supavisor"
    end
  end
1137

1138
  @doc """
  Applies the log level requested in the `"log_level"` connection option.

  When the option is one of `"debug"`, `"info"`, `"notice"`, `"warning"` or
  `"error"`, the level is set through `Helpers.set_log_level/1` and returned as
  an atom. A missing, unknown or non-string value is ignored and `nil` is
  returned. Input without a `"payload" => %{"options" => _}` entry returns `:ok`.
  """
  @spec maybe_change_log(map()) :: atom() | nil
  def maybe_change_log(%{"payload" => %{"options" => options}}) do
    case requested_log_level(options["log_level"]) do
      nil ->
        nil

      level ->
        Helpers.set_log_level(level)
        level
    end
  end

  def maybe_change_log(_), do: :ok

  # Maps the client-supplied string to a supported level. The value comes from
  # the client, so an explicit whitelist is used instead of converting arbitrary
  # input to an atom, which would raise on unknown names.
  defp requested_log_level("debug"), do: :debug
  defp requested_log_level("info"), do: :info
  defp requested_log_level("notice"), do: :notice
  defp requested_log_level("warning"), do: :warning
  defp requested_log_level("error"), do: :error
  defp requested_log_level(_), do: nil
176✔
1149

1150
  @doc """
  Sends `bin` to the database socket held in `data.db_pid`.

  Before sending, the client socket is switched to active-once when
  `data.active_count` exceeds the compile-time `:switch_active_count` setting.
  Returns the result of the send.
  """
  @spec sock_send_maybe_active_once(binary(), map()) :: :ok | {:error, term()}
  def sock_send_maybe_active_once(bin, data) do
    Logger.debug("ClientHandler: Send maybe active once")
    count = data.active_count

    if count > @switch_active_count do
      Logger.debug("ClientHandler: Activate socket #{inspect(count)}")
      HandlerHelpers.active_once(data.sock)
    end

    # The database socket is the third element of the checked-out connection tuple.
    data.db_pid
    |> elem(2)
    |> HandlerHelpers.sock_send(bin)
  end
1162

1163
  @doc """
  Schedules another subscribe attempt or gives up.

  While the retry counter is below the compile-time `:subscribe_retries` limit,
  the counter is incremented and a `:subscribe` timeout is scheduled. Otherwise
  the process stops with `{:shutdown, :subscribe_retries}`.
  """
  @spec timeout_subscribe_or_terminate(map()) :: :gen_statem.handle_event_result()
  def timeout_subscribe_or_terminate(%{subscribe_retries: attempts} = data) do
    cond do
      attempts < @subscribe_retries ->
        Logger.warning("ClientHandler: Retry subscribe #{inspect(attempts)}")

        retry = {:timeout, @timeout_subscribe, :subscribe}
        {:keep_state, %{data | subscribe_retries: attempts + 1}, retry}

      true ->
        Logger.error("ClientHandler: Terminate after retries")
        {:stop, {:shutdown, :subscribe_retries}}
    end
  end
1175

1176
  @doc """
  Calls `HandlerHelpers.activate/1` on the client socket and returns `0`, the
  value callers store as the new `active_count`.
  """
  @spec reset_active_count(map()) :: 0
  def reset_active_count(data) do
    _ = HandlerHelpers.activate(data.sock)
    0
  end
1181
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc