• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 6af7f2db69bcd25b3c0152c1ae07f7f165c55681-PR-573

24 Jan 2025 12:13PM UTC coverage: 46.518% (-0.8%) from 47.304%
6af7f2db69bcd25b3c0152c1ae07f7f165c55681-PR-573

Pull #573

github

hauleth
chore: ignore tests support files in coverage reports
Pull Request #573: chore: ignore tests support files in coverage reports

962 of 2068 relevant lines covered (46.52%)

206.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.23
/lib/supavisor/client_handler.ex
1
defmodule Supavisor.ClientHandler do
2
  @moduledoc """
3
  This module is responsible for handling incoming connections to the Supavisor server. It is
4
  implemented as a Ranch protocol behavior and a gen_statem behavior. It handles SSL negotiation,
5
  user authentication, tenant subscription, and dispatching of messages to the appropriate tenant
6
  supervisor. Each client connection is assigned to a specific tenant supervisor.
7
  """
8

9
  require Logger
10

11
  @behaviour :ranch_protocol
12
  @behaviour :gen_statem
13
  @proto [:tcp, :ssl]
14
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
15
  @subscribe_retries Application.compile_env(:supavisor, :subscribe_retries)
16
  @timeout_subscribe 500
17
  @clients_registry Supavisor.Registry.TenantClients
18
  @proxy_clients_registry Supavisor.Registry.TenantProxyClients
19

20
  alias Supavisor.{
21
    DbHandler,
22
    HandlerHelpers,
23
    Helpers,
24
    Monitoring.Telem,
25
    Protocol.Client,
26
    Tenants
27
  }
28

29
  require Supavisor.Protocol.Server, as: Server
30

31
  @impl true
32
  # Ranch protocol entry point. Spawns a linked process through :proc_lib that
  # runs init/3 with the listener ref, the transport module and the options.
  def start_link(ref, transport, opts) do
    {:ok, :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts])}
  end
36

37
  @impl true
38
  # gen_statem callback: every event is dispatched to handle_event/4.
  def callback_mode do
    [:handle_event_function]
  end
220✔
39

40
  @doc """
  Casts a `{:db_status, status, bin}` event to the client handler `pid`.

  The cast is asynchronous, so `:ok` is returned immediately.
  """
  @spec db_status(pid(), :ready_for_query | :read_sql_error, binary()) :: :ok
  def db_status(pid, status, bin) do
    :gen_statem.cast(pid, {:db_status, status, bin})
  end
13✔
42

43
  @impl true
44
  # gen_statem init/1 callback. It always returns :ignore; the process is
  # started through start_link/3, which spawns init/3 instead.
  def init(_args) do
    :ignore
  end
×
45

46
  # Body of the process spawned by start_link/3.
  #
  # Traps exits, caps the heap size, completes the Ranch handshake, puts the
  # socket in active mode, builds the initial connection data and finally
  # enters the gen_statem loop in the :exchange state.
  def init(ref, trans, opts) do
    Process.flag(:trap_exit, true)
    Helpers.set_max_heap_size(90)

    {:ok, sock} = :ranch.handshake(ref)

    # Only `active: true` is applied. The following options are intentionally
    # left disabled, as in the original code:
    #   mode: :binary, packet: :raw, recbuf: 8192, sndbuf: 8192,
    #   backlog: 2048, send_timeout: 120, keepalive: true, nodelay: true,
    #   nopush: true
    :ok = trans.setopts(sock, active: true)

    Logger.debug("ClientHandler is: #{inspect(self())}")

    initial_data = %{
      # connection / socket
      sock: {:gen_tcp, sock},
      trans: trans,
      ssl: false,
      # filled in once the client has identified itself
      id: nil,
      tenant: nil,
      user: nil,
      db_name: nil,
      app_name: nil,
      log_level: nil,
      proxy_type: nil,
      auth: %{},
      auth_secrets: nil,
      tenant_availability_zone: nil,
      # pool related
      mode: opts.mode,
      pool: nil,
      manager: nil,
      db_pid: nil,
      timeout: nil,
      ps: nil,
      subscribe_retries: 0,
      # query bookkeeping
      query_start: nil,
      last_query: nil,
      stats: %{},
      active_count: 0,
      # timers
      idle_timeout: 0,
      heartbeat_interval: 0,
      connection_start: System.monotonic_time(),
      # peer information
      local: opts[:local] || false,
      peer_ip: Helpers.peer_ip(sock)
    }

    :gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :exchange, initial_data)
  end
102

103
  @impl true
104
  # A packet starting with "GET" during the exchange phase is treated as an
  # HTTP request: reply with the application version header and stop.
  def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :exchange, data) do
    Logger.debug("ClientHandler: Client is trying to request HTTP")

    response =
      "HTTP/1.1 204 OK\r\nx-app-version: #{Application.spec(:supavisor, :vsn)}\r\n\r\n"

    HandlerHelpers.sock_send(data.sock, response)

    {:stop, {:shutdown, :http_request}}
  end
114

115
  # cancel request
  #
  # Matches a cancel message in any state, forwards the {pid, key} pair through
  # HandlerHelpers.send_cancel_query/2 and then stops this connection.
  def handle_event(:info, {_proto, _sock, Server.cancel_message(pid, key)}, _state, _data) do
    target = {pid, key}
    Logger.debug("ClientHandler: Got cancel query for #{inspect(target)}")
    :ok = HandlerHelpers.send_cancel_query(pid, key)

    {:stop, {:shutdown, :cancel_query}}
  end
121

122
  # send cancel request to db
  #
  # Only handled while :busy. Uses db_pid_meta/1 to look up the metadata
  # (host, port, ip_ver, pid, key) for the checked-out DbHandler and passes it
  # to HandlerHelpers.cancel_query/5. State and data are left untouched.
  def handle_event(:info, :cancel_query, :busy, data) do
    lookup_key = {data.tenant, data.db_pid}
    Logger.debug("ClientHandler: Cancel query for #{inspect(lookup_key)}")
    {_pool, handler_pid, _db_sock} = data.db_pid

    case db_pid_meta(lookup_key) do
      [{^handler_pid, meta}] ->
        :ok = HandlerHelpers.cancel_query(meta.host, meta.port, meta.ip_ver, meta.pid, meta.key)

      unexpected ->
        Logger.error(
          "ClientHandler: Received cancel but no proc was found #{inspect(lookup_key)} #{inspect(unexpected)}"
        )
    end

    :keep_state_and_data
  end
140

141
  # An 8-byte TCP packet during the exchange phase is handled as an SSL request.
  #
  # With a downstream cert and key configured: reply "S", run the TLS handshake
  # on the raw socket (passive while handshaking, active again afterwards) and
  # keep the upgraded socket in the data. Without them: reply "N" and keep the
  # current state.
  def handle_event(:info, {:tcp, _, <<_::64>>}, :exchange, %{sock: sock} = data) do
    Logger.debug("ClientHandler: Client is trying to connect with SSL")

    cert = Helpers.downstream_cert()
    key = Helpers.downstream_key()

    # SSL negotiation, S/N/Error
    if cert && key do
      :ok = HandlerHelpers.setopts(sock, active: false)
      :ok = HandlerHelpers.sock_send(sock, "S")

      case :ssl.handshake(elem(sock, 1), certfile: cert, keyfile: key) do
        {:ok, ssl_sock} ->
          upgraded = {:ssl, ssl_sock}
          :ok = HandlerHelpers.setopts(upgraded, active: true)
          {:keep_state, %{data | sock: upgraded, ssl: true}}

        failure ->
          Logger.error("ClientHandler: SSL handshake error: #{inspect(failure)}")
          Telem.client_join(:fail, data.id)
          {:stop, {:shutdown, :ssl_handshake_error}}
      end
    else
      Logger.error(
        "ClientHandler: User requested SSL connection but no downstream cert/key found"
      )

      :ok = HandlerHelpers.sock_send(sock, "N")
      :keep_state_and_data
    end
  end
177

178
  # Rejects startup packets larger than 1024 bytes by stopping the connection.
  def handle_event(:info, {_proto, _sock, bin}, :exchange, _data) when byte_size(bin) > 1024 do
    size = byte_size(bin)
    Logger.error("ClientHandler: Startup packet too large #{size}")

    {:stop, {:shutdown, :startup_packet_too_large}}
  end
182

183
  # Decodes the client's startup packet.
  #
  # On success the user and database names are validated; valid input stores
  # the log level and application name and queues an internal :hello event,
  # invalid input sends an error to the client and stops. A packet that cannot
  # be decoded stops the connection as well.
  def handle_event(:info, {_proto, _sock, bin}, :exchange, data) do
    case Server.decode_startup_packet(bin) do
      {:error, error} ->
        Logger.error("ClientHandler: Client startup message error: #{inspect(error)}")
        Telem.client_join(:fail, data.id)
        {:stop, {:shutdown, :startup_packet_error}}

      {:ok, hello} ->
        Logger.debug("ClientHandler: Client startup message: #{inspect(hello)}")
        {type, {user, tenant_or_alias, db_name}} = HandlerHelpers.parse_user_info(hello.payload)

        names_valid? = Helpers.validate_name(user) and Helpers.validate_name(db_name)

        if names_valid? do
          log_level = maybe_change_log(hello)
          search_path = hello.payload["options"]["--search_path"]
          hello_event = {:hello, {type, {user, tenant_or_alias, db_name, search_path}}}
          app_name = app_name(hello.payload["application_name"])

          new_data = %{data | log_level: log_level, app_name: app_name}
          {:keep_state, new_data, {:next_event, :internal, hello_event}}
        else
          reason = "Invalid format for user or db_name"
          Logger.error("ClientHandler: #{inspect(reason)} #{inspect({user, db_name})}")
          Telem.client_join(:fail, tenant_or_alias)

          HandlerHelpers.send_error(
            data.sock,
            "XX000",
            "Authentication error, reason: #{inspect(reason)}"
          )

          {:stop, {:shutdown, :invalid_format}}
        end
    end
  end
217

218
  # Handles the internal :hello event produced from a valid startup packet.
  #
  # Looks up the tenant/user, builds the pool id and logger metadata, then
  # enforces the tenant's SSL requirement and address allow list. When both
  # checks pass, the connection data is updated and the auth secrets are
  # fetched; success queues the internal :handle event, every failure sends an
  # error to the client and stops.
  def handle_event(
        :internal,
        {:hello, {type, {user, tenant_or_alias, db_name, search_path}}},
        :exchange,
        %{sock: sock} = data
      ) do
    sni_hostname = HandlerHelpers.try_get_sni(sock)

    case Tenants.get_user_cache(type, user, tenant_or_alias, sni_hostname) do
      {:error, reason} ->
        Logger.error(
          "ClientHandler: User not found: #{inspect(reason)} #{inspect({type, user, tenant_or_alias})}"
        )

        :ok = HandlerHelpers.send_error(sock, "XX000", "Tenant or user not found")
        Telem.client_join(:fail, data.id)
        {:stop, {:shutdown, :user_not_found}}

      {:ok, info} ->
        # fall back to the tenant's database when the client did not name one
        db_name = db_name || info.tenant.db_database

        id =
          Supavisor.id(
            {type, tenant_or_alias},
            user,
            data.mode,
            info.user.mode_type,
            db_name,
            search_path
          )

        mode = Supavisor.mode(id)

        Logger.metadata(
          project: tenant_or_alias,
          user: user,
          mode: mode,
          type: type,
          db_name: db_name,
          app_name: data.app_name,
          peer_ip: data.peer_ip,
          local: data.local
        )

        {:ok, addr} = HandlerHelpers.addr_from_sock(sock)

        cond do
          !data.local and info.tenant.enforce_ssl and !data.ssl ->
            Logger.error(
              "ClientHandler: Tenant is not allowed to connect without SSL, user #{user}"
            )

            :ok = HandlerHelpers.send_error(sock, "XX000", "SSL connection is required")
            Telem.client_join(:fail, id)
            {:stop, {:shutdown, :ssl_required}}

          HandlerHelpers.filter_cidrs(info.tenant.allow_list, addr) == [] ->
            message = "Address not in tenant allow_list: #{inspect(addr)}"
            Logger.error("ClientHandler: #{message}")
            :ok = HandlerHelpers.send_error(sock, "XX000", message)

            Telem.client_join(:fail, id)
            {:stop, {:shutdown, :address_not_allowed}}

          true ->
            new_data = update_user_data(data, info, user, id, db_name, mode)
            cache_key = {:secrets, tenant_or_alias, user}

            case auth_secrets(info, user, cache_key, :timer.hours(24)) do
              {:ok, auth_secrets} ->
                Logger.debug("ClientHandler: Authentication method: #{inspect(auth_secrets)}")

                next_event = {:next_event, :internal, {:handle, auth_secrets, info}}
                {:keep_state, new_data, next_event}

              {:error, reason} ->
                Logger.error(
                  "ClientHandler: Authentication auth_secrets error: #{inspect(reason)}"
                )

                :ok =
                  HandlerHelpers.send_error(
                    sock,
                    "XX000",
                    "Authentication error, reason: #{inspect(reason)}"
                  )

                Telem.client_join(:fail, id)
                {:stop, {:shutdown, :auth_secrets_error}}
            end
        end
    end
  end
311

312
  # Runs the authentication exchange with the client.
  #
  # Success: sends authentication_ok, stores the (possibly client_key-enriched)
  # secrets and queues :connect_db for proxy mode or :subscribe otherwise.
  #
  # Failure: sends the error to the client and stops. For a "Wrong password"
  # on a non-:password method, and only when no :secrets_check entry is cached,
  # the secrets are re-fetched; if they differ from the ones used, the cached
  # secrets are replaced and the pool is stopped.
  def handle_event(
        :internal,
        {:handle, {method, secrets}, info},
        _,
        %{sock: sock} = data
      ) do
    Logger.debug("ClientHandler: Handle exchange, auth method: #{inspect(method)}")

    case handle_exchange(sock, {method, secrets}) do
      {:ok, client_key} ->
        final_secrets =
          if client_key do
            fn -> Map.put(secrets.(), :client_key, client_key) end
          else
            secrets
          end

        Logger.debug("ClientHandler: Exchange success")
        :ok = HandlerHelpers.sock_send(sock, Server.authentication_ok())
        Telem.client_join(:ok, data.id)

        auth = Map.merge(data.auth, %{secrets: final_secrets, method: method})

        conn_type =
          case data.mode do
            :proxy -> :connect_db
            _ -> :subscribe
          end

        new_data = %{data | auth_secrets: {method, final_secrets}, auth: auth}
        {:keep_state, new_data, {:next_event, :internal, conn_type}}

      {:error, reason} ->
        Logger.error(
          "ClientHandler: Exchange error: #{inspect(reason)} when method #{inspect(method)}"
        )

        msg =
          case method do
            :auth_query_md5 -> Server.error_message("XX000", reason)
            _ -> Server.exchange_message(:final, "e=#{reason}")
          end

        check_key = {:secrets_check, data.tenant, data.user}

        if method != :password and reason == "Wrong password" and
             Cachex.get(Supavisor.Cache, check_key) == {:ok, nil} do
          case auth_secrets(info, data.user, check_key, 15_000) do
            {:ok, {fresh_method, fresh_secrets}} = fresh ->
              # the stored secrets never carry :client_key, so drop it before comparing
              if method != fresh_method or
                   Map.delete(secrets.(), :client_key) != fresh_secrets.() do
                Logger.warning("ClientHandler: Update secrets and terminate pool")

                Cachex.update(
                  Supavisor.Cache,
                  {:secrets, data.tenant, data.user},
                  {:cached, fresh}
                )

                Supavisor.stop(data.id)
              else
                Logger.debug("ClientHandler: Cache the same #{inspect(check_key)}")
              end

            other ->
              Logger.error("ClientHandler: Auth secrets check error: #{inspect(other)}")
          end
        else
          Logger.debug("ClientHandler: Cache hit for #{inspect(check_key)}")
        end

        HandlerHelpers.sock_send(sock, msg)
        Telem.client_join(:fail, data.id)
        {:stop, {:shutdown, :exchange_error}}
    end
  end
383

384
  # Subscribes the client to its tenant pool.
  #
  # Starts (or finds) the pool supervisor, then either subscribes to it or,
  # when the supervisor is on another node and the mode is :transaction or
  # :session, switches to proxying through that node's Ranch listener.
  # Pool limits are reported to the client and stop the connection; any other
  # failure is delegated to timeout_subscribe_or_terminate/1.
  def handle_event(:internal, :subscribe, _, data) do
    Logger.debug("ClientHandler: Subscribe to tenant #{inspect(data.id)}")

    start_opts = [
      log_level: data.log_level,
      availability_zone: data.tenant_availability_zone
    ]

    with {:ok, sup} <- Supavisor.start_dist(data.id, data.auth_secrets, start_opts),
         :local <-
           if(node(sup) != node() and data.mode in [:transaction, :session],
             do: :proxy,
             else: :local
           ),
         {:ok, opts} <- Supavisor.subscribe(sup, data.id) do
      manager_ref = Process.monitor(opts.workers.manager)
      data = Map.merge(data, opts.workers)
      checked_out = db_checkout(:both, :on_connect, data)

      data = %{
        data
        | manager: manager_ref,
          db_pid: checked_out,
          idle_timeout: opts.idle_timeout
      }

      Registry.register(@clients_registry, data.id, [])

      # without parameter statuses, wait up to 10s for them before greeting the client
      next =
        case opts.ps do
          [] -> {:timeout, 10_000, :wait_ps}
          ps -> {:next_event, :internal, {:greetings, ps}}
        end

      {:keep_state, data, next}
    else
      {:error, limit} when limit in [:max_clients_reached, :max_pools_reached] ->
        msg =
          case limit do
            :max_clients_reached -> "Max client connections reached"
            :max_pools_reached -> "Max pools count reached"
          end

        Logger.error("ClientHandler: #{msg}")
        :ok = HandlerHelpers.send_error(data.sock, "XX000", msg)
        Telem.client_join(:fail, data.id)
        {:stop, {:shutdown, limit}}

      :proxy ->
        case Supavisor.get_pool_ranch(data.id) do
          {:ok, %{port: port, host: host}} ->
            proxy_target = %{
              port: port,
              host: to_charlist(host),
              ip_version: :inet,
              upstream_ssl: false,
              upstream_tls_ca: nil,
              upstream_verify: nil
            }

            auth = Map.merge(data.auth, proxy_target)

            Logger.metadata(proxy: true)
            Registry.register(@proxy_clients_registry, data.id, [])

            {:keep_state, %{data | auth: auth}, {:next_event, :internal, :connect_db}}

          other ->
            Logger.error("ClientHandler: Subscribe proxy error: #{inspect(other)}")
            timeout_subscribe_or_terminate(data)
        end

      error ->
        Logger.error("ClientHandler: Subscribe error: #{inspect(error)}")
        timeout_subscribe_or_terminate(data)
    end
  end
454

455
  # Starts a dedicated DbHandler in proxy mode for this client, checks it out
  # and stores `{nil, db_pid, db_sock}` in the data (there is no pool).
  def handle_event(:internal, :connect_db, _, data) do
    Logger.debug("ClientHandler: Trying to connect to DB")

    handler_args = %{
      id: data.id,
      auth: data.auth,
      user: data.user,
      tenant: {:single, data.tenant},
      replica_type: :write,
      mode: :proxy,
      proxy: true,
      log_level: data.log_level,
      caller: self(),
      client_sock: data.sock
    }

    {:ok, handler} = DbHandler.start_link(handler_args)
    db_sock = :gen_statem.call(handler, {:checkout, data.sock, self()})

    {:keep_state, %{data | db_pid: {nil, handler, db_sock}, mode: :proxy}}
  end
475

476
  # Sends the parameter statuses, backend key data and ready-for-query to the
  # client, registers the generated {pid, key} for cancel requests, records the
  # connection time and moves to :idle.
  def handle_event(:internal, {:greetings, ps}, _, %{sock: sock} = data) do
    {header, <<cancel_pid::32, cancel_key::32>> = payload} = Server.backend_key_data()
    greeting = [ps, [header, payload], Server.ready_for_query()]

    :ok = HandlerHelpers.listen_cancel_query(cancel_pid, cancel_key)
    :ok = HandlerHelpers.sock_send(sock, greeting)
    Telem.client_connection_time(data.connection_start, data.id)

    {:next_state, :idle, data, handle_actions(data)}
  end
484

485
  # A :subscribe timeout simply re-triggers the internal :subscribe event.
  def handle_event(:timeout, :subscribe, _state, _data) do
    {:keep_state_and_data, {:next_event, :internal, :subscribe}}
  end
487

488
  # Fired when no parameter status arrived within the wait window set up on
  # subscribe: greet the client with the default parameter status stored in
  # `data.ps` instead.
  def handle_event(:timeout, :wait_ps, _, data) do
    # the original message ended with a stray "}" after the interpolation
    Logger.error(
      "ClientHandler: Wait parameter status timeout, send default #{inspect(data.ps)}"
    )

    ps = Server.encode_parameter_status(data.ps)
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
  end
496

497
  # Stops the connection when the idle timeout fires.
  def handle_event(:timeout, :idle_terminate, _state, %{idle_timeout: idle_timeout}) do
    Logger.warning("ClientHandler: Terminate an idle connection by #{idle_timeout} timeout")

    {:stop, {:shutdown, :idle_terminate}}
  end
501

502
  # Sends Server.application_name() to the client as a heartbeat and re-arms
  # the timer with the configured heartbeat interval.
  def handle_event(:timeout, :heartbeat_check, _state, data) do
    Logger.debug("ClientHandler: Send heartbeat to client")
    HandlerHelpers.sock_send(data.sock, Server.application_name())

    next_check = {:timeout, data.heartbeat_interval, :heartbeat_check}
    {:keep_state_and_data, next_check}
  end
507

508
  # Generic socket data: delegates to handle_data/4.
  #
  # When handle_data/4 asks for a state change, any `{:next_event, type, bin}`
  # action carrying a binary is re-wrapped as a `{proto, socket, bin}` socket
  # message so it is handled by the socket clauses again. Every other result is
  # returned unchanged.
  def handle_event(kind, {proto, socket, msg}, state, data)
      when proto in @proto and is_binary(msg) do
    case handle_data(kind, msg, state, data) do
      {:next_state, next_state, new_data, actions} ->
        rewrapped =
          for action <- List.wrap(actions) do
            case action do
              {:next_event, type, bin} when is_binary(bin) ->
                {:next_event, type, {proto, socket, bin}}

              untouched ->
                untouched
            end
          end

        {:next_state, next_state, new_data, rewrapped}

      other ->
        other
    end
  end
525

526
  # The parameter status changed: stop the connection.
  def handle_event(:info, {:parameter_status, :updated}, _state, _data) do
    Logger.warning("ClientHandler: Parameter status is updated")

    {:stop, {:shutdown, :parameter_status_updated}}
  end

  # Parameter status received while still in :exchange: greet the client with it.
  def handle_event(:info, {:parameter_status, ps}, :exchange, _data) do
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
  end
533

534
  # Logs a TLS error reported for this connection's own socket and carries on.
  def handle_event(:info, {:ssl_error, raw_sock, reason}, _state, %{sock: {_, raw_sock}}) do
    Logger.error("ClientHandler: TLS error #{inspect(reason)}")

    :keep_state_and_data
  end
538

539
  # client closed connection
  def handle_event(_type, {closed, _sock}, _state, %{tenant: tenant})
      when closed in [:tcp_closed, :ssl_closed] do
    Logger.debug("ClientHandler: #{closed} socket closed for #{inspect(tenant)}")

    {:stop, {:shutdown, :socket_closed}}
  end
545

546
  # linked DbHandler went down
  #
  # Tells the client that the DbHandler exited and stops the connection.
  def handle_event(:info, {:EXIT, db_pid, reason}, _state, %{sock: sock}) do
    Logger.error("ClientHandler: DbHandler #{inspect(db_pid)} exited #{inspect(reason)}")

    client_error = Server.error_message("XX000", "DbHandler exited")
    HandlerHelpers.sock_send(sock, client_error)

    {:stop, {:shutdown, :db_handler_exit}}
  end
552

553
  # pool's manager went down
  #
  # Only matches the monitor created for the manager on subscribe.
  #   * :shutdown reason (any state)  -> stop with :manager_shutdown
  #   * abnormal reason while :idle   -> re-subscribe
  #   * abnormal reason otherwise     -> stop with :manager_down
  def handle_event(:info, {:DOWN, ref, _, _, reason}, state, %{manager: ref} = data) do
    Logger.error(
      "ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}"
    )

    case {state, reason} do
      {_, :shutdown} ->
        {:stop, {:shutdown, :manager_shutdown}}

      {:idle, _} ->
        {:keep_state_and_data, {:next_event, :internal, :subscribe}}

      {:busy, _} ->
        {:stop, {:shutdown, :manager_down}}

      # The manager is monitored while the connection is still in :exchange
      # (before greetings). Without this clause an abnormal DOWN in that window
      # raised a CaseClauseError instead of shutting down cleanly.
      {_, _} ->
        {:stop, {:shutdown, :manager_down}}
    end
  end
565

566
  # Explicit disconnect request: log the reason and stop with it.
  def handle_event(:info, {:disconnect, reason}, _state, _data) do
    Logger.warning("ClientHandler: Disconnected due to #{inspect(reason)}")

    stop_reason = {:shutdown, {:disconnect, reason}}
    {:stop, stop_reason}
  end
570

571
  # emulate handle_cast
  #
  # Status updates cast by db_status/3 while a query is running (:busy).
  #   * :ready_for_query - forward `bin` to the client, release or keep the
  #     DbHandler via handle_db_pid/3, record network usage (non-local clients
  #     only) and query time, then go back to :idle.
  #   * :read_sql_error  - release the current DbHandler, check out a :write
  #     one and replay `data.last_query` as an internal TCP event.
  def handle_event(:cast, {:db_status, status, bin}, :busy, data) do
    case status do
      :ready_for_query ->
        Logger.debug("ClientHandler: Client is ready")

        :ok = HandlerHelpers.sock_send(data.sock, bin)

        released = handle_db_pid(data.mode, data.pool, data.db_pid)

        {_, stats} =
          if not data.local do
            Telem.network_usage(:client, data.sock, data.id, data.stats)
          else
            {nil, data.stats}
          end

        Telem.client_query_time(data.query_start, data.id)

        idle_data = %{
          data
          | db_pid: released,
            stats: stats,
            active_count: reset_active_count(data)
        }

        # actions are derived from the data as it was before the update above
        {:next_state, :idle, idle_data, handle_actions(data)}

      :read_sql_error ->
        Logger.error(
          "ClientHandler: read only sql transaction, rerunning the query to write pool"
        )

        # release the read pool
        _ = handle_db_pid(data.mode, data.pool, data.db_pid)

        started_at = System.monotonic_time()
        write_db = db_checkout(:write, :on_query, data)

        retry = {:next_event, :internal, {:tcp, nil, data.last_query}}
        {:keep_state, %{data | db_pid: write_db, query_start: started_at}, retry}
    end
  end
607

608
  # Catch-all: log any event no other clause handled and keep going.
  def handle_event(type, content, _state, _data) do
    unhandled = [{"type", type}, {"content", content}]

    Logger.error("ClientHandler: Undefined msg: #{inspect(unhandled, pretty: true)}")

    :keep_state_and_data
  end
618

619
  @impl true
620
  # Terminating because a {:checkout, _, _} call timed out: tell the client why,
  # with a message that depends on the pool mode.
  def terminate({:timeout, {_, _, [_, {:checkout, _, _}, _]}}, _state, data) do
    client_error =
      case data.mode do
        :session ->
          "MaxClientsInSessionMode: max clients reached - in Session mode max clients are limited to pool_size"

        :transaction ->
          "Unable to check out process from the pool due to timeout"
      end

    Logger.error("ClientHandler: #{client_error}")
    HandlerHelpers.sock_send(data.sock, Server.error_message("XX000", client_error))
    :ok
  end

  # Terminating while a DbHandler is checked out: stop the handler when it is
  # busy or in session mode, then log its last known state and mode.
  def terminate(reason, _state, %{db_pid: {_, pid, _}}) do
    db_info =
      case DbHandler.get_state_and_mode(pid) do
        {:ok, {state, mode} = resp} ->
          if state == :busy or mode == :session, do: DbHandler.stop(pid)
          resp

        other ->
          other
      end

    Logger.debug(
      "ClientHandler: socket closed with reason #{inspect(reason)}, DbHandler #{inspect({pid, db_info})}"
    )

    :ok
  end

  # Terminating without a DbHandler: just log the reason.
  def terminate(reason, _state, _data) do
    Logger.debug("ClientHandler: socket closed with reason #{inspect(reason)}")
    :ok
  end
657

658
  ## Internal functions
659

660
  @doc """
  Runs the password exchange with the client over `sock`.

  For `:auth_query_md5` an md5 request with a fresh 4-byte salt is sent and the
  client's answer is compared against the stored secret. Every other method
  uses the SCRAM exchange.

  Returns `{:ok, key}` (where `key` may be `nil`) or `{:error, message}`.
  """
  @spec handle_exchange(Supavisor.sock(), {atom(), fun()}) ::
          {:ok, binary() | nil} | {:error, String.t()}
  def handle_exchange({_, socket} = sock, {:auth_query_md5 = method, secrets}) do
    salt = :crypto.strong_rand_bytes(4)
    :ok = HandlerHelpers.sock_send(sock, Server.md5_request(salt))

    timeout_message = "Timeout while waiting for the md5 exchange"

    with {:ok, %{tag: :password_message, payload: {:md5, client_md5}}, _} <-
           receive_next(socket, timeout_message),
         {:ok, key} <- authenticate_exchange(method, client_md5, secrets.().secret, salt) do
      {:ok, key}
    else
      {:error, message} -> {:error, message}
      unexpected -> {:error, "Unexpected message #{inspect(unexpected)}"}
    end
  end
678

679
  # SCRAM exchange: request, client-first message, server-first reply,
  # client-final message with the proof, then the server signature on success.
  def handle_exchange({_, socket} = sock, {method, secrets}) do
    :ok = HandlerHelpers.sock_send(sock, Server.scram_request())

    first_timeout = "Timeout while waiting for the first password message"
    second_timeout = "Timeout while waiting for the second password message"

    with {:ok,
          %{
            tag: :password_message,
            payload: {:scram_sha_256, %{"n" => user, "r" => nonce, "c" => channel}}
          }, _} <- receive_next(socket, first_timeout),
         {:ok, signatures} = reply_first_exchange(sock, method, secrets, channel, nonce, user),
         {:ok,
          %{
            tag: :password_message,
            payload: {:first_msg_response, %{"p" => proof}}
          }, _} <- receive_next(socket, second_timeout),
         {:ok, key} <- authenticate_exchange(method, secrets, signatures, proof) do
      server_final = "v=#{Base.encode64(signatures.server)}"
      :ok = HandlerHelpers.sock_send(sock, Server.exchange_message(:final, server_final))
      {:ok, key}
    else
      {:error, message} -> {:error, message}
      unexpected -> {:error, "Unexpected message #{inspect(unexpected)}"}
    end
  end
712

713
  # Waits up to 15 seconds for the next message in the mailbox.
  #
  # A `{proto, socket, bin}` message for the given socket is decoded with
  # Server.decode_pkt/1. Any other message is consumed and turned into an
  # error, and running out of time yields `{:error, timeout_message}`.
  defp receive_next(socket, timeout_message) do
    wait_ms = 15_000

    receive do
      {_proto, ^socket, packet} ->
        Server.decode_pkt(packet)

      unexpected ->
        {:error, "Unexpected message in receive_next/2 #{inspect(unexpected)}"}
    after
      wait_ms -> {:error, timeout_message}
    end
  end
724

725
  # Builds the server-first message with exchange_first/5, sends it to the
  # client and returns the signatures needed to verify the client's proof.
  defp reply_first_exchange(sock, method, secrets, channel, nonce, user) do
    {server_first, signatures} = exchange_first(method, secrets, nonce, user, channel)
    reply = Server.exchange_message(:first, server_first)
    :ok = HandlerHelpers.sock_send(sock, reply)

    {:ok, signatures}
  end
730

731
  # :password - the client's proof must equal the expected client signature.
  defp authenticate_exchange(:password, _secrets, signatures, p) do
    if p == signatures.client do
      {:ok, nil}
    else
      {:error, "Wrong password"}
    end
  end

  # :auth_query - recover the client key by XOR-ing the decoded proof with the
  # client signature; its hash must equal the stored key. The recovered client
  # key is returned on success.
  defp authenticate_exchange(:auth_query, secrets, signatures, p) do
    client_key = :crypto.exor(Base.decode64!(p), signatures.client)

    if Helpers.hash(client_key) != secrets.().stored_key do
      {:error, "Wrong password"}
    else
      {:ok, client_key}
    end
  end

  # :auth_query_md5 - the client hash must equal "md5" followed by the md5 of
  # the server hash and the salt.
  defp authenticate_exchange(:auth_query_md5, client_hash, server_hash, salt) do
    expected = "md5" <> Helpers.md5([server_hash, salt])

    if expected == client_hash do
      {:ok, nil}
    else
      {:error, "Wrong password"}
    end
  end
752

753
  # Checks a DbHandler out for the client and returns `{pool, db_pid, db_sock}`,
  # or `nil` when nothing is checked out.
  @spec db_checkout(:write | :read | :both, :on_connect | :on_query, map) ::
          {pid, pid, Supavisor.sock()} | nil
  # Session and proxy clients that already hold a DbHandler keep it.
  defp db_checkout(_, _, %{mode: mode, db_pid: {pool, db_pid, db_sock}})
       when is_pid(db_pid) and mode in [:session, :proxy] do
    {pool, db_pid, db_sock}
  end

  # On connect, transaction mode checks nothing out.
  defp db_checkout(_, :on_connect, %{mode: :transaction}), do: nil

  # Explicit :write / :read checkout from a randomly chosen pool of that type.
  #
  # This clause used to return `{pool, db_pid}`, which violated the spec and
  # broke every consumer that destructures `{_, db_pid, _}` (terminate/3,
  # handle_db_pid/3, the :cancel_query handler). It now performs the same
  # DbHandler checkout as the generic clause and returns the full 3-tuple.
  defp db_checkout(query_type, _, data) when query_type in [:write, :read] do
    # NOTE(review): this expects `data.pool` to be keyed by query type, while
    # the generic clause below passes `data.pool` straight to :poolboy —
    # confirm which shape the subscribe workers actually provide.
    pool =
      data.pool[query_type]
      |> Enum.random()

    {time, db_pid} = :timer.tc(:poolboy, :checkout, [pool, true, data.timeout])
    Process.link(db_pid)
    db_sock = DbHandler.checkout(db_pid, data.sock, self())
    same_box = if node(db_pid) == node(), do: :local, else: :remote
    Telem.pool_checkout_time(time, data.id, same_box)
    {pool, db_pid, db_sock}
  end

  # Generic checkout from `data.pool`; the reported time covers both the
  # poolboy checkout and the DbHandler checkout.
  defp db_checkout(_, _, data) do
    start = System.monotonic_time(:microsecond)
    db_pid = :poolboy.checkout(data.pool, true, data.timeout)
    Process.link(db_pid)
    db_sock = DbHandler.checkout(db_pid, data.sock, self())
    same_box = if node(db_pid) == node(), do: :local, else: :remote
    Telem.pool_checkout_time(System.monotonic_time(:microsecond) - start, data.id, same_box)
    {data.pool, db_pid, db_sock}
  end
783

784
  # Decides what happens to the checked-out DbHandler once a query is done.
  #
  # The third argument is the checkout tuple `{pool, db_pid, db_sock}` stored in
  # the connection data (or `nil`), not a bare pid as the previous specs said.
  #   * :transaction - unlink the handler, check it back into `pool`, return nil
  #   * :session / :proxy - keep the checkout tuple unchanged
  @spec handle_db_pid(:transaction, pid(), {pid() | nil, pid(), Supavisor.sock()} | nil) :: nil
  @spec handle_db_pid(:session | :proxy, pid() | nil, checkout) :: checkout
        when checkout: {pid() | nil, pid(), Supavisor.sock()}
  defp handle_db_pid(:transaction, _pool, nil), do: nil

  defp handle_db_pid(:transaction, pool, {_, db_pid, _}) do
    Process.unlink(db_pid)
    :poolboy.checkin(pool, db_pid)
    nil
  end

  defp handle_db_pid(:session, _, db_pid), do: db_pid
  defp handle_db_pid(:proxy, _, db_pid), do: db_pid
×
797

798
  # Copies everything learned about the tenant and user into the connection
  # data: upstream connection settings (`auth`), pool id and mode, checkout
  # timeout, default parameter status and heartbeat interval (tenant value
  # multiplied by 1000).
  defp update_user_data(data, info, user, id, db_name, mode) do
    tenant = info.tenant

    proxy_type =
      case tenant.require_user do
        falsy when falsy in [nil, false] -> :auth_query
        _ -> :password
      end

    sni_hostname =
      case tenant.sni_hostname do
        nil -> nil
        hostname -> to_charlist(hostname)
      end

    upstream_auth = %{
      application_name: data[:app_name] || "Supavisor",
      database: db_name,
      host: to_charlist(tenant.db_host),
      sni_hostname: sni_hostname,
      port: tenant.db_port,
      user: user,
      password: info.user.db_password,
      require_user: tenant.require_user,
      upstream_ssl: tenant.upstream_ssl,
      upstream_tls_ca: tenant.upstream_tls_ca,
      upstream_verify: tenant.upstream_verify
    }

    %{
      data
      | id: id,
        tenant: tenant.external_id,
        user: user,
        db_name: db_name,
        mode: mode,
        proxy_type: proxy_type,
        auth: upstream_auth,
        timeout: info.user.pool_checkout_timeout,
        ps: tenant.default_parameter_status,
        heartbeat_interval: tenant.client_heartbeat_interval * 1000,
        tenant_availability_zone: tenant.availability_zone
    }
  end
834

835
  # Resolves the secrets used to authenticate a client.
  #
  # * Tenants with `require_user: true` use the stored user credentials
  #   directly (`:password` method); nothing is cached or queried.
  # * Otherwise the secrets are loaded through `get_secrets/2` and cached in
  #   `Supavisor.Cache` under `key` for `ttl`. Failed lookups are not cached.
  #
  # Returns `{:ok, {method, secrets_fun}}` or `{:error, reason}`.
  @spec auth_secrets(map, String.t(), term(), non_neg_integer()) ::
          {:ok, Supavisor.secrets()} | {:error, term()}
  ## password secrets
  def auth_secrets(%{user: user, tenant: %{require_user: true}}, _, _, _) do
    secrets = %{db_user: user.db_user, password: user.db_password, alias: user.db_user_alias}

    {:ok, {:password, fn -> secrets end}}
  end

  ## auth_query secrets
  def auth_secrets(info, db_user, key, ttl) do
    fetch = fn _key ->
      case get_secrets(info, db_user) do
        {:ok, _} = resp -> {:commit, {:cached, resp}, ttl: ttl}
        {:error, _} = resp -> {:ignore, resp}
      end
    end

    case Cachex.fetch(Supavisor.Cache, key, fetch) do
      {:ok, {:cached, value}} -> value
      {:commit, {:cached, value}, _opts} -> value
      # Two-element commit shape, as documented for `Cachex.fetch/4`.
      {:commit, {:cached, value}} -> value
      {:ignore, resp} -> resp
      # Cache-level failures are surfaced as a regular error tuple instead of
      # crashing the caller with a CaseClauseError.
      {:error, _reason} = error -> error
    end
  end
859

860
  # Loads the authentication secret for `db_user` by running the tenant's
  # `auth_query` over a short-lived Postgrex connection opened with the
  # tenant's stored user credentials.
  #
  # Returns `{:ok, {method, secret_fun}}` where `method` is `:auth_query_md5`
  # for md5 digests and `:auth_query` otherwise, or whatever non-`{:ok, _}`
  # value `Helpers.get_user_secret/3` produced. The connection is always
  # stopped before returning; exceptions are logged and re-raised.
  @spec get_secrets(map, String.t()) ::
          {:ok, {:auth_query | :auth_query_md5, fun()}} | {:error, term()}
  def get_secrets(%{user: user, tenant: tenant}, db_user) do
    ssl_opts =
      if tenant.upstream_ssl and tenant.upstream_verify == "peer" do
        [
          verify: :verify_peer,
          cacerts: [Helpers.upstream_cert(tenant.upstream_tls_ca)],
          server_name_indication: String.to_charlist(tenant.db_host),
          # NOTE(review): this match_fun accepts any hostname, so the peer
          # certificate's hostname is effectively not checked - confirm intended.
          customize_hostname_check: [match_fun: fn _, _ -> true end]
        ]
      else
        []
      end

    conn_opts = [
      hostname: tenant.db_host,
      port: tenant.db_port,
      database: tenant.db_database,
      password: user.db_password,
      username: user.db_user,
      parameters: [application_name: "Supavisor auth_query"],
      ssl: tenant.upstream_ssl,
      socket_options: [
        Helpers.ip_version(tenant.ip_version, tenant.db_host)
      ],
      queue_target: 1_000,
      queue_interval: 5_000,
      ssl_opts: ssl_opts
    ]

    {:ok, conn} = Postgrex.start_link(conn_opts)

    try do
      Logger.debug(
        "ClientHandler: Connected to db #{tenant.db_host} #{tenant.db_port} #{tenant.db_database} #{user.db_user}"
      )

      resp =
        case Helpers.get_user_secret(conn, tenant.auth_query, db_user) do
          {:ok, secret} ->
            method = if secret.digest == :md5, do: :auth_query_md5, else: :auth_query
            {:ok, {method, fn -> Map.put(secret, :alias, user.db_user_alias) end}}

          other ->
            other
        end

      Logger.info("ClientHandler: Get secrets finished")
      resp
    rescue
      exception ->
        Logger.error("ClientHandler: Couldn't fetch user secrets from #{tenant.db_host}")
        reraise exception, __STACKTRACE__
    after
      # Always release the temporary connection, on success and on failure.
      GenServer.stop(conn, :normal, 5_000)
    end
  end
910

911
  # Builds the server-first message of the authentication exchange and the
  # signatures the rest of the exchange is verified against.
  #
  # Returns `{server_first_message, signatures}`.
  @spec exchange_first(:password | :auth_query, fun(), binary(), binary(), binary()) ::
          {binary(), map()}
  defp exchange_first(:password, secret, nonce, user, channel) do
    message = Server.exchange_first_message(nonce)
    server_first_parts = Helpers.parse_server_first(message, nonce)
    password = secret.().password

    {client_final_message, server_proof} =
      Helpers.get_client_final(:password, password, server_first_parts, nonce, user, channel)

    # The client signature is the last element of the client-final message.
    signatures = %{client: List.last(client_final_message), server: server_proof}

    {message, signatures}
  end

  defp exchange_first(:auth_query, secret, nonce, user, channel) do
    stored = secret.()
    message = Server.exchange_first_message(nonce, stored.salt)
    server_first_parts = Helpers.parse_server_first(message, nonce)

    signatures =
      Helpers.signatures(
        stored.stored_key,
        stored.server_key,
        server_first_parts,
        nonce,
        user,
        channel
      )

    {message, signatures}
  end
952

953
  # Returns the SNI hostname reported for an SSL socket as a string, or `nil`
  # when the socket is not SSL or the lookup yields anything else.
  @spec try_get_sni(Supavisor.sock()) :: String.t() | nil
  def try_get_sni({:ssl, sock}) do
    with {:ok, [sni_hostname: sni]} <- :ssl.connection_information(sock, [:sni_hostname]) do
      List.to_string(sni)
    else
      _ -> nil
    end
  end

  def try_get_sni(_), do: nil
×
962

963
  # Looks up the `Supavisor.Registry.PoolPids` entries registered under the
  # given pid, on whichever node that pid lives. Remote lookups go through
  # `:erpc` with a 15 second timeout.
  defp db_pid_meta({_, {_, pid, _}} = _key) do
    registry = Supavisor.Registry.PoolPids

    case node(pid) do
      owner when owner == node() ->
        Registry.lookup(registry, pid)

      owner ->
        :erpc.call(owner, Registry, :lookup, [registry, pid], 15_000)
    end
  end
973

974
  @spec handle_data(kind :: atom(), data :: binary(), state, data) ::
          :gen_statem.event_handler_result(data)
        when state: atom() | term(),
             data: term()

  # handle Terminate message
  #
  # A Terminate received while idle stops the handler, except when the
  # connection data is flagged `local: true`, in which case it is only logged.
  defp handle_data(:info, <<?X, 4::32>>, :idle, data) do
    case data do
      %{local: true} ->
        Logger.info("ClientHandler: Terminate received from proxy client")
        :keep_state_and_data

      _ ->
        Logger.info("ClientHandler: Terminate received from client")
        {:stop, {:shutdown, :terminate_received}}
    end
  end
989

990
  # Sync while idle.
  defp handle_data(:info, <<?S, 4::32, _::binary>> = msg, :idle, data) do
    Logger.debug("ClientHandler: Receive sync")

    # db_pid can be nil in transaction mode; then ready_for_query is sent to
    # the client without checking out a connection. With a linked db_pid the
    # message is forwarded to it instead.
    :ok =
      case data.db_pid do
        nil -> HandlerHelpers.sock_send(data.sock, Server.ready_for_query())
        _linked -> sock_send_maybe_active_once(msg, data)
      end

    next_data = %{data | active_count: reset_active_count(data)}
    {:keep_state, next_data, handle_actions(data)}
  end

  # Sync in any other state: always forwarded to the database socket.
  defp handle_data(:info, <<?S, 4::32, _::binary>> = msg, _, data) do
    Logger.debug("ClientHandler: Receive sync while not idle")
    :ok = sock_send_maybe_active_once(msg, data)

    next_data = %{data | active_count: reset_active_count(data)}
    {:keep_state, next_data, handle_actions(data)}
  end

  # handle Flush message
  defp handle_data(:info, <<?H, 4::32, _::binary>> = msg, _, data) do
    Logger.debug("ClientHandler: Receive flush while not idle")
    :ok = sock_send_maybe_active_once(msg, data)

    next_data = %{data | active_count: reset_active_count(data)}
    {:keep_state, next_data, handle_actions(data)}
  end
1015

1016
  # incoming query with a single pool
  #
  # Checks out a database connection, lets the prepared-statement handling
  # inspect the message, then switches to :busy and replays the message as an
  # internal event so it gets forwarded.
  defp handle_data(:info, bin, :idle, %{pool: pid} = data) when is_pid(pid) do
    Logger.debug("ClientHandler: Receive query #{inspect(bin)}")
    checked_out = db_checkout(:both, :on_query, data)
    handle_prepared_statements(checked_out, bin, data)

    next_data = %{data | db_pid: checked_out, query_start: System.monotonic_time()}
    {:next_state, :busy, next_data, {:next_event, :internal, bin}}
  end

  # proxy mode: no checkout here, only the query start time is recorded
  defp handle_data(:info, bin, _, %{mode: :proxy} = data) do
    next_data = %{data | query_start: System.monotonic_time()}
    {:next_state, :busy, next_data, {:next_event, :internal, bin}}
  end
1030

1031
  # incoming query with read/write pools
  #
  # The message is parsed to decide which pool to use: a single SELECT
  # statement is treated as :read, everything else (including anything that
  # fails to parse) as :write.
  defp handle_data(:info, bin, :idle, data) do
    query_type =
      with {:ok, payload} <- Client.get_payload(bin),
           {:ok, statements} <- Supavisor.PgParser.statements(payload) do
        Logger.debug(
          "ClientHandler: Receive payload #{inspect(payload, pretty: true)} statements #{inspect(statements)}"
        )

        # naive check for read only queries
        if statements == ["SelectStmt"], do: :read, else: :write
      else
        other ->
          Logger.error("ClientHandler: Receive query error: #{inspect(other)}")
          :write
      end

    started_at = System.monotonic_time()
    checked_out = db_checkout(query_type, :on_query, data)
    next_data = %{data | db_pid: checked_out, query_start: started_at, last_query: bin}

    {:next_state, :busy, next_data, {:next_event, :internal, bin}}
  end
1057

1058
  # forward query to db
  #
  # On a successful send the active counter is incremented. Any other result
  # is logged, reported to the client as an XX000 error, and stops the handler.
  defp handle_data(_, bin, :busy, data) do
    Logger.debug("ClientHandler: Forward query to db #{inspect(bin)} #{inspect(data.db_pid)}")

    with :ok <- sock_send_maybe_active_once(bin, data) do
      {:keep_state, %{data | active_count: data.active_count + 1}}
    else
      error ->
        Logger.error("ClientHandler: error while sending query: #{inspect(error)}")

        error_packet = Server.error_message("XX000", "Error while sending query")
        HandlerHelpers.sock_send(data.sock, error_packet)

        {:stop, {:shutdown, :send_query_error}}
    end
  end
1077

1078
  # In transaction mode, a lone PREPARE or DEALLOCATE statement is sent as a
  # `{:handle_ps, payload, bin}` message to every DbHandler worker of the pool
  # except the one currently linked to this client. Anything else is skipped.
  @spec handle_prepared_statements({pid, pid, Supavisor.sock()}, binary, map) :: :ok | nil
  defp handle_prepared_statements({_, pid, _}, bin, %{mode: :transaction} = data) do
    with {:ok, payload} <- Client.get_payload(bin),
         {:ok, statements} <- Supavisor.PgParser.statements(payload),
         true <- statements in [["PrepareStmt"], ["DeallocateStmt"]] do
      Logger.info("ClientHandler: Handle prepared statement #{inspect(payload)}")

      workers = GenServer.call(data.pool, :get_all_workers)

      Enum.each(workers, fn
        # The linked handler receives the statement through the normal query path.
        {_, ^pid, _, [Supavisor.DbHandler]} ->
          Logger.debug("ClientHandler: Linked DbHandler #{inspect(pid)}")
          nil

        {_, worker, _, [Supavisor.DbHandler]} ->
          Logger.debug(
            "ClientHandler: Sending prepared statement change #{inspect(payload)} to #{inspect(worker)}"
          )

          send(worker, {:handle_ps, payload, bin})
      end)
    else
      error ->
        Logger.debug("ClientHandler: Skip prepared statement #{inspect(error)}")
    end
  end

  defp handle_prepared_statements(_, _, _), do: nil
152✔
1105

1106
  # Builds the gen_statem timeout actions for the connection: the idle timeout
  # first (when `idle_timeout > 0`), then the heartbeat check (when
  # `heartbeat_interval > 0`).
  #
  # NOTE(review): both entries use the plain `:timeout` action type; gen_statem
  # documents a single event time-out per process, so confirm that both are
  # expected to be active together.
  @spec handle_actions(map) :: [{:timeout, non_neg_integer, atom}]
  defp handle_actions(%{} = data) do
    heartbeat_ms = data.heartbeat_interval

    heartbeat =
      case heartbeat_ms > 0 do
        true -> [{:timeout, heartbeat_ms, :heartbeat_check}]
        false -> []
      end

    idle_ms = data.idle_timeout

    case idle_ms > 0 do
      true -> [{:timeout, idle_ms, :idle_timeout} | heartbeat]
      false -> heartbeat
    end
  end
1117

1118
  # Returns `name` when it is a binary; any other value is logged at debug
  # level and replaced by the default "Supavisor".
  @spec app_name(any()) :: String.t()
  def app_name(name) do
    if is_binary(name) do
      name
    else
      Logger.debug("ClientHandler: Invalid application name #{inspect(name)}")
      "Supavisor"
    end
  end
1125

1126
  # Applies a log level requested through the `"log_level"` startup option.
  #
  # Only "debug", "info", "notice", "warning" and "error" are accepted; for
  # those the level is set via `Helpers.set_log_level/1` and returned. Any
  # other value (or no value) returns `nil` without touching the log level.
  # Input without a `"payload" => %{"options" => _}` shape returns `:ok`.
  #
  # The option is matched against literal strings rather than converted with
  # `String.to_existing_atom/1`, so an unknown client-supplied value cannot
  # raise an ArgumentError.
  @spec maybe_change_log(map()) :: atom() | nil
  def maybe_change_log(%{"payload" => %{"options" => options}}) do
    level =
      case options["log_level"] do
        "debug" -> :debug
        "info" -> :info
        "notice" -> :notice
        "warning" -> :warning
        "error" -> :error
        _ -> nil
      end

    if level do
      Helpers.set_log_level(level)
      level
    end
  end

  def maybe_change_log(_), do: :ok
182✔
1137

1138
  # Sends `bin` over the database socket (third element of `data.db_pid`).
  # Before sending, the client socket is re-armed with active-once when the
  # active counter has passed `@switch_active_count`.
  @spec sock_send_maybe_active_once(binary(), map()) :: :ok | {:error, term()}
  def sock_send_maybe_active_once(bin, data) do
    Logger.debug("ClientHandler: Send maybe active once")
    forwarded = data.active_count

    if forwarded > @switch_active_count do
      Logger.debug("ClientHandler: Activate socket #{inspect(forwarded)}")
      HandlerHelpers.active_once(data.sock)
    end

    db_sock = elem(data.db_pid, 2)
    HandlerHelpers.sock_send(db_sock, bin)
  end
1150

1151
  # Schedules another subscribe attempt after `@timeout_subscribe` ms while
  # the retry counter is below `@subscribe_retries`; otherwise stops the
  # handler with `{:shutdown, :subscribe_retries}`.
  @spec timeout_subscribe_or_terminate(map()) :: :gen_statem.handle_event_result()
  def timeout_subscribe_or_terminate(%{subscribe_retries: attempts} = data)
      when attempts < @subscribe_retries do
    Logger.warning("ClientHandler: Retry subscribe #{inspect(attempts)}")

    next_data = %{data | subscribe_retries: attempts + 1}
    {:keep_state, next_data, {:timeout, @timeout_subscribe, :subscribe}}
  end

  def timeout_subscribe_or_terminate(%{subscribe_retries: _attempts}) do
    Logger.error("ClientHandler: Terminate after retries")
    {:stop, {:shutdown, :subscribe_retries}}
  end
1163

1164
  # Calls `HandlerHelpers.activate/1` on the client socket and returns the
  # fresh counter value (always 0) for the caller to store in `active_count`.
  @spec reset_active_count(map()) :: 0
  def reset_active_count(data) do
    _ = HandlerHelpers.activate(data.sock)
    0
  end
1169
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc