• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 24895189695

24 Apr 2026 02:36PM UTC coverage: 79.313% (-0.03%) from 79.342%
24895189695

push

github

web-flow
fix: retry on checkout when db_handler exits during checkout (#941)

Addresses a rare race condition.

8 of 13 new or added lines in 3 files covered. (61.54%)

1 existing line in 1 file now uncovered.

2515 of 3171 relevant lines covered (79.31%)

58036.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.71
/lib/supavisor/client_handler.ex
1
defmodule Supavisor.ClientHandler do
2
  @moduledoc """
3
  This module is responsible for handling incoming connections to the Supavisor server.
4

5
  It implements the Ranch protocol behavior and a gen_statem behavior. It handles SSL negotiation,
6
  user authentication, tenant subscription, and dispatching of messages to the appropriate tenant
7
  supervisor. Each client connection is assigned to a specific tenant supervisor.
8
  """
9

10
  require Logger
11

12
  require Record
13
  require Supavisor
14

15
  @behaviour :ranch_protocol
16
  @behaviour :gen_statem
17
  @proto [:tcp, :ssl]
18
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
19
  @subscribe_retries Application.compile_env(:supavisor, :subscribe_retries)
20
  @max_checkout_retries 2
21
  @timeout_subscribe 500
22
  @clients_registry Supavisor.Registry.TenantClients
23
  @proxy_clients_registry Supavisor.Registry.TenantProxyClients
24
  @max_startup_packet_size Supavisor.Protocol.max_startup_packet_size()
25

26
  alias Supavisor.{
27
    DbHandler,
28
    HandlerHelpers,
29
    Helpers,
30
    Manager,
31
    Monitoring.Telem,
32
    Protocol.Debug,
33
    Tenants
34
  }
35

36
  alias Supavisor.ClientHandler.{
37
    AuthMethods,
38
    Cancel,
39
    Checks,
40
    Data,
41
    Error,
42
    ProtocolHelpers,
43
    Proxy
44
  }
45

46
  alias Supavisor.Protocol.{FrontendMessageHandler, MessageStreamer}
47

48
  alias Supavisor.Errors.{
49
    CheckoutTimeoutError,
50
    ClientSocketClosedError,
51
    DbHandlerExitedError,
52
    PoolCheckoutError,
53
    PoolConfigNotFoundError,
54
    PoolRanchNotFoundError,
55
    SslHandshakeError,
56
    StartupPacketTooLargeError,
57
    SubscribeRetriesExhaustedError,
58
    WorkerNotFoundError
59
  }
60

61
  require Supavisor.Protocol.Server, as: Server
62
  require Supavisor.Protocol.PreparedStatements, as: PreparedStatements
63

64
  # Ranch protocol entry point. The handler is spawned with `:proc_lib` so that init/3
  # can run the Ranch handshake itself before turning the process into a gen_statem
  # via `:gen_statem.enter_loop/4`.
  @impl true
  def start_link(ref, transport, opts) do
    {:ok, :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts])}
  end
69

70
  # Every event is routed through handle_event/4, and state-enter calls are enabled so
  # handle_event(:enter, ...) runs on each state change.
  @impl true
  def callback_mode do
    [:handle_event_function, :state_enter]
  end
72

73
  @doc "Asynchronously notifies the handler `pid` of a DB status (`:ready_for_query`)."
  @spec db_status(pid(), :ready_for_query) :: :ok
  def db_status(pid, status) do
    :gen_statem.cast(pid, {:db_status, status})
  end

  @doc "Asks the handler `pid` to send `error_message` to its client and then stop."
  @spec send_error_and_terminate(pid(), iodata()) :: :ok
  def send_error_and_terminate(pid, error_message) do
    :gen_statem.cast(pid, {:send_error_and_terminate, error_message})
  end

  @doc """
  Asks the handler `pid` to shut down gracefully.

  The request is postponed while a query is in flight; afterwards the client receives an
  admin-shutdown error and the handler stops.
  """
  @spec graceful_shutdown(pid()) :: :ok
  def graceful_shutdown(pid) do
    :gen_statem.cast(pid, :graceful_shutdown)
  end
82

83
  # gen_statem requires init/1, but this process enters its loop from init/3 through
  # `:gen_statem.enter_loop/4`, so this callback only returns `:ignore`.
  @impl true
  def init(_args) do
    :ignore
  end
85

86
  # Process body spawned by start_link/3.
  #
  # Traps exits (so linked-process exits arrive as `{:EXIT, pid, reason}` messages),
  # completes the Ranch handshake, monitors the socket port and switches the socket to
  # bounded active mode. It then becomes a gen_statem starting in `:handshake`.
  def init(ref, trans, opts) do
    Process.flag(:trap_exit, true)
    Helpers.set_max_heap_size(90)

    {:ok, raw_sock} = :ranch.handshake(ref)
    port_ref = Port.monitor(raw_sock)
    client_ip = Helpers.peer_ip(raw_sock)
    local_listener = opts[:local] || false

    Logger.metadata(peer_ip: client_ip, local: local_listener, state: :init)
    :ok = trans.setopts(raw_sock, active: @switch_active_count)
    Logger.debug("ClientHandler is: #{inspect(self())}")

    # One timestamp marks both the connection start and the entry into :handshake.
    started_at = System.monotonic_time()

    initial_data = %Data{
      sock: {:gen_tcp, raw_sock},
      trans: trans,
      sock_ref: port_ref,
      peer_ip: client_ip,
      local: local_listener,
      ssl: false,
      connection_params: nil,
      mode: opts.mode,
      stream_state: MessageStreamer.new_stream_state(FrontendMessageHandler),
      stats: %{},
      idle_timeout: 0,
      heartbeat_interval: 0,
      connection_start: started_at,
      state_entered_at: started_at,
      subscribe_retries: 0
    }

    :gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :handshake, initial_data)
  end
121

122
  # A plain HTTP GET on the listener during the handshake (presumably a health probe)
  # is answered with a 204 carrying the application version, then the process stops.
  @impl true
  def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :handshake, data) do
    Logger.debug("ClientHandler: Client is trying to request HTTP")

    client_sock = data.sock
    vsn = Application.spec(:supavisor, :vsn)
    HandlerHelpers.sock_send(client_sock, "HTTP/1.1 204 OK\r\nx-app-version: #{vsn}\r\n\r\n")

    {:stop, :normal}
  end
133

134
  # CancelRequest packet (accepted in any state): hand the backend pid/key to
  # Cancel.send_cancel_query/2 and close this connection.
  def handle_event(:info, {_, _, Server.cancel_message(backend_pid, secret_key)}, _state, _) do
    Logger.debug("ClientHandler: Got cancel query for #{inspect({backend_pid, secret_key})}")
    :ok = Cancel.send_cancel_query(backend_pid, secret_key)
    {:stop, :normal}
  end

  # `:cancel_query` message: Cancel decides, based on the current state, whether a
  # cancel must be forwarded to the database.
  def handle_event(:info, :cancel_query, state, data) do
    :ok = Cancel.maybe_forward_cancel_to_db(state, data)
    :keep_state_and_data
  end
146

147
  # SSLRequest during the handshake.
  #
  # With downstream certificates configured: reply "S", run the TLS handshake and switch
  # the upgraded socket back to bounded active mode. Without them: reply "N" and keep
  # the plaintext connection.
  def handle_event(
        :info,
        {:tcp, _, Server.ssl_request_message()},
        :handshake,
        %{sock: sock} = data
      ) do
    case Helpers.downstream_certs_keys() do
      [] ->
        Logger.warning(
          "ClientHandler: User requested SSL connection but no downstream cert/key found"
        )

        :ok = HandlerHelpers.sock_send(data.sock, "N")
        :keep_state_and_data

      certs_keys ->
        # Stop active delivery so the TLS handshake can read the raw socket itself.
        :ok = HandlerHelpers.setopts(sock, active: false)
        :ok = HandlerHelpers.sock_send(sock, "S")

        tls_opts = [
          verify: :verify_none,
          certs_keys: certs_keys,
          sni_fun: fn _hostname -> :undefined end,
          receiver_spawn_opts: [min_heap_size: 2048]
        ]

        case :ssl.handshake(elem(sock, 1), tls_opts) do
          {:ok, ssl_sock} ->
            tls_sock = {:ssl, ssl_sock}
            :ok = HandlerHelpers.setopts(tls_sock, active: @switch_active_count)
            {:keep_state, %{data | sock: tls_sock, ssl: true}}

          error ->
            Error.terminate_with_error(data, %SslHandshakeError{reason: error}, :handshake)
        end
    end
  end
185

186
  # Startup packets larger than @max_startup_packet_size are rejected without parsing.
  def handle_event(:info, {_, _, bin}, :handshake, data)
      when byte_size(bin) > @max_startup_packet_size do
    too_large = %StartupPacketTooLargeError{packet_size: byte_size(bin)}
    Error.terminate_with_error(data, too_large, :handshake)
  end

  # Any other packet during the handshake is parsed as a startup packet. On success the
  # requested log level (if any) is applied to this process, `app_name` is stored, and
  # an internal `{:hello, startup}` event carries the connection parameters forward.
  def handle_event(:info, {_, _, bin}, :handshake, data) do
    case ProtocolHelpers.parse_startup_packet(bin) do
      {:ok, {_type, {_user, _tenant, _db, _search_path, _jit, _client_tls}} = startup,
       app_name, log_level} ->
        if log_level, do: Logger.put_process_level(self(), log_level)

        {:keep_state, %{data | app_name: app_name},
         {:next_event, :internal, {:hello, startup}}}

      {:error, exception} ->
        Error.terminate_with_error(data, exception, :handshake)
    end
  end
208

209
  # Handles the parsed startup parameters while still in `:handshake`.
  #
  # Looks up the user/tenant in the cache, builds the pool id, runs the admission checks
  # (ban, SSL enforcement, address allow-list, client limit) and resolves the auth
  # method. On success an internal `{:start_authentication, method, info}` event
  # follows; any failure terminates the connection in the handshake context.
  def handle_event(
        :internal,
        {:hello, {type, {user, tenant_or_alias, db_name, search_path, client_jit, client_tls}}},
        :handshake,
        %{sock: sock} = data
      ) do
    sni_hostname = HandlerHelpers.try_get_sni(sock)

    Logger.metadata(
      project: tenant_or_alias,
      user: user,
      mode: data.mode,
      type: type,
      app_name: data.app_name,
      db_name: db_name
    )

    # A proxied connection received on a local listener reports the original client's
    # TLS status in `client_tls`; every other connection uses this socket's `data.ssl`.
    effective_ssl =
      if data.local && client_tls do
        client_tls
      else
        data.ssl
      end

    case Tenants.get_user_cache(type, user, tenant_or_alias, sni_hostname) do
      {:error, exception} ->
        Error.terminate_with_error(data, exception, :handshake)

      {:ok, info} ->
        upstream_tls = upstream_tls(info.tenant, effective_ssl)

        pool_id =
          Supavisor.id(
            type: type,
            tenant: tenant_or_alias || info.tenant.external_id,
            user: user,
            mode: data.mode,
            db: db_name,
            search_path: search_path,
            upstream_tls: upstream_tls
          )

        with :ok <- Checks.check_tenant_not_banned(info),
             :ok <- Checks.check_ssl_enforcement(data, info, user),
             :ok <- Checks.check_address_allowed(sock, info),
             :ok <- Manager.check_client_limit(pool_id, info, data.mode),
             {:ok, auth_method} <-
               AuthMethods.fetch_authentication_method(
                 info.tenant,
                 client_jit,
                 effective_ssl,
                 user
               ) do
          Logger.debug("ClientHandler: Authentication method: #{inspect(auth_method)}")
          next_data = set_tenant_info(data, info, user, pool_id, db_name, client_jit)

          {:keep_state, next_data,
           {:next_event, :internal, {:start_authentication, auth_method, info}}}
        else
          {:error, exception} when is_exception(exception) ->
            Error.terminate_with_error(%{data | id: pool_id}, exception, :handshake)
        end
    end
  end
272

273
  # Starts the authentication exchange chosen by the `:hello` handler.
  #
  # If the tenant/peer-IP auth circuit breaker check fails, the connection is
  # terminated. Otherwise JIT and password auth request a cleartext password, and
  # SCRAM-SHA-256 sends the SCRAM request. Each path arms a 15_000 ms `:auth_timeout`.
  def handle_event(
        :internal,
        {:start_authentication, auth_method, info},
        _state,
        %{sock: sock} = data
      ) do
    Logger.debug("ClientHandler: Handle exchange, auth method: #{inspect(auth_method)}")

    case Supavisor.CircuitBreaker.check({data.tenant, data.peer_ip}, :auth_error) do
      {:error, exception} ->
        Error.terminate_with_error(data, exception, :handshake)

      :ok ->
        {wait_state, auth_context} =
          case auth_method do
            :jit ->
              :ok = HandlerHelpers.sock_send(sock, Server.password_request())
              {:auth_password_wait, AuthMethods.Jit.new_context(info, data.id, data.peer_ip)}

            :password ->
              :ok = HandlerHelpers.sock_send(sock, Server.password_request())
              {:auth_password_wait, AuthMethods.Password.new_context(info, data.id)}

            :scram_sha_256 ->
              # The SCRAM context is built before the request is sent.
              scram_context = AuthMethods.SCRAM.new_context(info, data.id)
              :ok = HandlerHelpers.sock_send(sock, Server.scram_request())
              {:auth_scram_first_wait, scram_context}
          end

        {:next_state, wait_state, %{data | auth_context: auth_context},
         {:timeout, 15_000, :auth_timeout}}
    end
  end
310

311
  # Subscribes the authenticated client to its tenant pool.
  #
  # Steps:
  # - Start (or locate) the pool supervisor.
  # - Divert to proxy mode when a non-proxy client's pool runs on another node.
  # - Subscribe and monitor the pool manager.
  # - Perform the on-connect checkout, then register the client.
  #
  # The client then either goes straight to `:idle` (already greeted), waits up to
  # 1_000 ms for parameter status, or sends the greetings. Missing workers, pool config
  # or pool ranch are delegated to timeout_subscribe_or_terminate/1.
  def handle_event(:internal, :subscribe, _state, data) do
    Logger.debug("ClientHandler: Subscribe to tenant #{Supavisor.inspect_id(data.id)}")

    # CB check disabled until we ensure there are no false positives
    # :ok <- Supavisor.CircuitBreaker.check(data.tenant, :db_connection),
    with {:ok, sup} <-
           Supavisor.start_dist(data.id, data.connection_params.secrets,
             availability_zone: data.tenant_availability_zone,
             log_level: nil
           ),
         :not_proxy <-
           if(node(sup) == node() or data.mode == :proxy, do: :not_proxy, else: :proxy),
         {:ok, opts} <- Supavisor.subscribe(data.id),
         manager_ref = Process.monitor(opts.workers.manager),
         with_workers = Map.merge(data, opts.workers),
         {:ok, db_connection} <- maybe_checkout(:on_connect, with_workers) do
      subscribed = %{
        with_workers
        | manager: manager_ref,
          db_connection: db_connection,
          idle_timeout: opts.idle_timeout
      }

      Registry.register(@clients_registry, subscribed.id, started_at: System.monotonic_time())

      cond do
        subscribed.client_ready ->
          {:next_state, :idle, subscribed, handle_actions(subscribed)}

        opts.ps == [] ->
          {:keep_state, subscribed, {:timeout, 1_000, :wait_ps}}

        true ->
          {:keep_state, subscribed, {:next_event, :internal, {:greetings, opts.ps}}}
      end
    else
      {:error, %missing{}} when missing in [WorkerNotFoundError, PoolConfigNotFoundError] ->
        timeout_subscribe_or_terminate(data)

      {:error, exception} when is_exception(exception) ->
        Error.terminate_with_error(data, exception, :handshake)

      :proxy ->
        case Supavisor.get_pool_ranch(data.id) do
          {:ok, pool_ranch} ->
            Logger.metadata(proxy: true)
            Registry.register(@proxy_clients_registry, data.id, [])

            {:keep_state, %{data | pool_ranch: pool_ranch}, {:next_event, :internal, :connect_db}}

          {:error, %PoolRanchNotFoundError{}} ->
            timeout_subscribe_or_terminate(data)
        end
    end
  end
369

370
  # Proxy mode: start a proxy connection for the pool and check out its DbHandler.
  # On success the client keeps its state with a `{nil, db_pid, db_sock}` connection and
  # `mode: :proxy`; any error terminates it in the authenticated context.
  def handle_event(:internal, :connect_db, _state, data) do
    Logger.debug("ClientHandler: Trying to connect to DB")

    proxy_opts = [client_ssl: data.ssl, client_jit: data.use_jit_flow]

    with {:ok, db_pid} <-
           Proxy.start_proxy_connection(
             data.id,
             data.max_clients,
             data.connection_params,
             data.tenant_feature_flags,
             data.pool_ranch,
             proxy_opts
           ),
         {:ok, db_sock} <- DbHandler.checkout(db_pid, data.sock, self(), data.mode) do
      {:keep_state, %{data | db_connection: {nil, db_pid, db_sock}, mode: :proxy}}
    else
      {:error, exception} -> Error.terminate_with_error(data, exception, :authenticated)
    end
  end
390

391
  # Greets the client with its parameter status, a BackendKeyData message, and
  # ReadyForQuery. The BackendKeyData pid/key are registered with Cancel so later cancel
  # requests can be routed. Records connection time and moves to `:idle` as client-ready.
  def handle_event(:internal, {:greetings, ps}, _state, %{sock: sock} = data) do
    {header, <<cancel_pid::32, cancel_key::32>> = payload} = Server.backend_key_data()
    greeting = [ps, [header, payload], Server.ready_for_query()]
    :ok = Cancel.listen_cancel_query(cancel_pid, cancel_key)
    :ok = HandlerHelpers.sock_send(sock, greeting)
    Telem.client_connection_time(data.connection_start, data.id)
    {:next_state, :idle, %{data | client_ready: true}, handle_actions(data)}
  end
399

400
  # `:subscribe` event timeout (presumably armed by timeout_subscribe_or_terminate/1):
  # retry the internal `:subscribe` event.
  def handle_event(:timeout, :subscribe, _state, _data),
    do: {:keep_state_and_data, {:next_event, :internal, :subscribe}}
403

404
  # Parameter status did not arrive within the `{:timeout, 1_000, :wait_ps}` window
  # armed after subscribing, so greet the client with the defaults held in `data.ps`.
  def handle_event(:timeout, :wait_ps, _state, data) do
    # The message previously ended in a stray "}" after the interpolation.
    Logger.warning(
      "ClientHandler: Wait parameter status timeout, send default #{inspect(data.ps)}"
    )

    ps = Server.encode_parameter_status(data.ps)
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
  end
412

413
  # Idle timer armed by handle_actions/1 (event-timeout content `:idle_timeout`) fired
  # while the client is still `:idle`. No event arrived for `idle_timeout` ms, so stop.
  # Without this clause the event fell through to the catch-all clause, was only logged
  # as an undefined message, and idle connections were never terminated.
  def handle_event(:timeout, :idle_timeout, :idle, data) do
    Logger.warning("ClientHandler: Terminate an idle connection by #{data.idle_timeout} timeout")
    {:stop, :normal}
  end

  # The same timer can fire outside `:idle` (e.g. after a Sync re-armed it while
  # `:busy`). The client is not idle then, so ignore it; handle_actions/1 re-arms it
  # once the client returns to `:idle`.
  def handle_event(:timeout, :idle_timeout, _state, _data), do: :keep_state_and_data

  # Explicit idle-termination timeout, accepted in any state.
  # NOTE(review): nothing visible here arms `:idle_terminate`; kept for compatibility.
  def handle_event(:timeout, :idle_terminate, _state, data) do
    Logger.warning("ClientHandler: Terminate an idle connection by #{data.idle_timeout} timeout")
    {:stop, :normal}
  end

  # Heartbeat timer: send Server.application_name() to the client as a keep-alive and
  # re-arm the heartbeat for another `heartbeat_interval` ms.
  def handle_event(:timeout, :heartbeat_check, _state, data) do
    Logger.debug("ClientHandler: Send heartbeat to client")
    HandlerHelpers.sock_send(data.sock, Server.application_name())
    {:keep_state_and_data, {:timeout, data.heartbeat_interval, :heartbeat_check}}
  end
423

424
  # A `{:parameter_status, ps}` message received while `:connecting`: greet the client
  # with it.
  def handle_event(:info, {:parameter_status, ps}, :connecting, _data),
    do: {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
427

428
  # TLS alerts on the client socket. The alert level is only visible in the message text
  # ("Warning - ..." / "Fatal - ..."). Warning alerts keep the connection; fatal alerts
  # stop the client here, because Erlang sends no `ssl_closed` after them.
  def handle_event(:info, {:ssl_error, sock, {:tls_alert, {_reason, msg}}}, _, %{sock: {_, sock}}) do
    alert = to_string(msg)

    case String.contains?(alert, "Fatal -") do
      true ->
        Logger.warning(
          "ClientHandler: Received fatal TLS alert: #{alert}, terminating connection"
        )

        {:stop, :normal}

      false ->
        Logger.warning(
          "ClientHandler: Received TLS warning alert: #{alert}, keeping connection alive"
        )

        :keep_state_and_data
    end
  end

  # Any other TLS error on the client's own socket is logged and otherwise ignored.
  def handle_event(:info, {:ssl_error, sock, reason}, _, %{sock: {_, sock}}) do
    Logger.error("ClientHandler: TLS error #{inspect(reason)}")
    :keep_state_and_data
  end
453

454
  # The client closed its TCP or TLS socket.
  def handle_event(_, {closed, _}, state, data) when closed in [:tcp_closed, :ssl_closed],
    do: handle_socket_close(state, data)
459

460
  # A linked process exited (delivered as a message because init/3 traps exits). It is
  # reported as the DbHandler going down. The error context is `:authenticated` when
  # the client is `:idle`/`:busy` and `:handshake` in every other state.
  def handle_event(:info, {:EXIT, db_pid, reason}, state, data) do
    context =
      case state do
        :idle -> :authenticated
        :busy -> :authenticated
        _ -> :handshake
      end

    exited = %DbHandlerExitedError{pid: db_pid, reason: reason}
    Error.terminate_with_error(data, exited, context)
  end
470

471
  # The pool manager monitored during `:subscribe` went down.
  #
  # A `:shutdown` reason stops the client. Otherwise an idle client returns to
  # `:connecting` and resubscribes, a connecting client resubscribes in place, and a
  # busy client postpones the event until its state changes.
  def handle_event(:info, {:DOWN, ref, _, _, reason}, state, %{manager: ref} = data) do
    Logger.error(
      "ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}"
    )

    if reason == :shutdown do
      {:stop, {:shutdown, :manager_shutdown}}
    else
      case state do
        :idle -> {:next_state, :connecting, data, {:next_event, :internal, :subscribe}}
        :connecting -> {:keep_state_and_data, {:next_event, :internal, :subscribe}}
        :busy -> {:keep_state_and_data, :postpone}
      end
    end
  end

  # The monitored client socket port went down.
  def handle_event(:info, {:DOWN, ref, _, _, _reason}, state, %{sock_ref: ref} = data),
    do: handle_socket_close(state, data)
494

495
  # Casts (the gen_statem counterpart of handle_cast).

  # ReadyForQuery reported while a query is in flight.
  # - Release the connection (checked back in for transaction mode; kept otherwise).
  # - Update network stats (skipped for local connections) and record query time.
  # - Return to `:idle`.
  def handle_event(:cast, {:db_status, :ready_for_query}, :busy, data) do
    Logger.debug("ClientHandler: Client is ready")

    db_connection = maybe_checkin(data.mode, data.pool, data.db_connection)

    stats =
      if data.local do
        data.stats
      else
        {_, updated_stats} = Telem.network_usage(:client, data.sock, data.id, data.stats)
        updated_stats
      end

    Telem.client_query_time(data.query_start, data.id, data.mode == :proxy)

    {:next_state, :idle, %{data | db_connection: db_connection, stats: stats},
     handle_actions(data)}
  end

  # Already idle: nothing to release.
  def handle_event(:cast, {:db_status, :ready_for_query}, :idle, _data),
    do: :keep_state_and_data

  # Send the given error message to the client, then stop.
  def handle_event(:cast, {:send_error_and_terminate, error_message}, _state, data) do
    HandlerHelpers.sock_send(data.sock, error_message)
    {:stop, :normal}
  end

  # Graceful shutdown waits for the in-flight query: postpone while busy.
  def handle_event(:cast, :graceful_shutdown, :busy, _data),
    do: {:keep_state_and_data, :postpone}

  # In any other state, send the client an admin-shutdown error and stop.
  def handle_event(:cast, :graceful_shutdown, _state, data) do
    shutdown_error = Server.encode_error_message(Server.admin_shutdown())
    HandlerHelpers.sock_send(data.sock, shutdown_error)
    {:stop, :normal}
  end
530

531
  # Socket errors not handled by an earlier clause stop the client.
  def handle_event(:info, {sock_error, _sock, msg}, state, _data)
      when sock_error in [:tcp_error, :ssl_error] do
    Logger.error("ClientHandler: Socket error: #{inspect(msg)}, state was #{state}")
    {:stop, :normal}
  end

  # In `{active, N}` mode the socket turns passive after N messages; re-arm it.
  def handle_event(:info, {passive, _socket}, _, data)
      when passive in [:tcp_passive, :ssl_passive] do
    HandlerHelpers.setopts(data.sock, active: @switch_active_count)
    :keep_state_and_data
  end
542

543
  # Authentication states.

  # Cleartext password while waiting for one (JIT or plain password auth): validate it
  # with the module that owns the current auth context.
  def handle_event(:info, {proto, _socket, bin}, :auth_password_wait, data)
      when proto in @proto do
    auth_module =
      case data.auth_context do
        %AuthMethods.Jit.Context{} -> AuthMethods.Jit
        %AuthMethods.Password.Context{} -> AuthMethods.Password
      end

    case auth_module.handle_password(data.auth_context, bin) do
      {:ok, secrets} -> handle_auth_success(data.sock, secrets, data)
      {:error, exception} -> handle_auth_failure(exception, data)
    end
  end

  # SCRAM client-first message: reply with the server-first message and wait
  # (15_000 ms event timeout) for the client-final message.
  def handle_event(:info, {proto, _socket, bin}, :auth_scram_first_wait, data)
      when proto in @proto do
    case AuthMethods.SCRAM.handle_scram_first(data.auth_context, bin) do
      {:error, exception} ->
        handle_auth_failure(exception, data)

      {:ok, server_first, next_context} ->
        :ok = HandlerHelpers.sock_send(data.sock, Server.exchange_message(:first, server_first))

        {:next_state, :auth_scram_final_wait, %{data | auth_context: next_context},
         {:timeout, 15_000, :auth_timeout}}
    end
  end

  # SCRAM client-final message: on success send the server-final message and complete
  # authentication with the resulting secrets.
  def handle_event(:info, {proto, _socket, bin}, :auth_scram_final_wait, data)
      when proto in @proto do
    case AuthMethods.SCRAM.handle_scram_final(data.auth_context, bin) do
      {:error, exception} ->
        handle_auth_failure(exception, data)

      {:ok, server_final, final_secrets} ->
        :ok = HandlerHelpers.sock_send(data.sock, server_final)
        handle_auth_success(data.sock, final_secrets, data)
    end
  end

  # The auth wait timer expired in one of the authentication states.
  def handle_event(:timeout, :auth_timeout, auth_state, data)
      when auth_state in [:auth_scram_first_wait, :auth_scram_final_wait, :auth_password_wait] do
    handle_auth_failure(%Supavisor.Errors.AuthTimeoutError{context: auth_state}, data)
  end
602

603
  # State-enter callback: tags log metadata with the new state.
  #
  # Skipped for the initial `:handshake -> :handshake` entry and the per-query
  # idle<->busy flips. For every other transition it emits
  # `[:supavisor, :client_handler, :state]` telemetry with the time spent in the
  # previous state and resets `state_entered_at`.
  def handle_event(:enter, old_state, new_state, data) do
    Logger.metadata(state: new_state)

    if {old_state, new_state} in [{:handshake, :handshake}, {:idle, :busy}, {:busy, :idle}] do
      {:next_state, new_state, data}
    else
      entered_at = System.monotonic_time()

      :telemetry.execute(
        [:supavisor, :client_handler, :state],
        %{duration: entered_at - data.state_entered_at},
        %{from_state: old_state, to_state: new_state, tenant: data.tenant}
      )

      {:next_state, new_state, %{data | state_entered_at: entered_at}}
    end
  end
635

636
  # Terminate ('X') from the client in any state: run the session cleanup check, then
  # stop.
  def handle_event(_kind, {proto, _, <<?X, 4::32>>}, state, data) when proto in @proto do
    Logger.info("ClientHandler: Terminate received from client")
    maybe_cleanup_db_handler(state, data)
    {:stop, :normal}
  end

  # Sync ('S') while idle with no DB connection: reply ReadyForQuery directly.
  def handle_event(
        _kind,
        {proto, _, <<?S, 4::32, _::binary>>},
        :idle,
        %{db_connection: nil} = data
      )
      when proto in @proto do
    Logger.debug("ClientHandler: Receive sync")
    :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query())
    {:keep_state, data, handle_actions(data)}
  end

  # Sync while busy: forward it to the DB and re-arm the timeouts.
  def handle_event(_kind, {proto, _, <<?S, 4::32, _::binary>> = sync}, :busy, data)
      when proto in @proto do
    Logger.debug("ClientHandler: Receive sync")
    :ok = sock_send(sync, data)
    {:keep_state, data, handle_actions(data)}
  end

  # Any other client message while idle: check out a DB connection, move to `:busy`
  # and replay the message there as an internal event.
  def handle_event(_kind, {proto, socket, msg}, :idle, data) when proto in @proto do
    case maybe_checkout(:on_query, data) do
      {:error, exception} ->
        Error.terminate_with_error(data, exception, :authenticated)

      {:ok, db_connection} ->
        busy_data = %{data | db_connection: db_connection, query_start: System.monotonic_time()}
        {:next_state, :busy, busy_data, [{:next_event, :internal, {proto, socket, msg}}]}
    end
  end

  # Any client message while busy: forward it to the DB.
  def handle_event(_kind, {proto, _, msg}, :busy, data) when proto in @proto do
    case handle_data(msg, data) do
      {:error, exception} -> Error.terminate_with_error(data, exception, :authenticated)
      {:ok, updated_data} -> {:keep_state, updated_data}
    end
  end

  # Client messages arriving while still connecting are postponed until the state
  # changes.
  def handle_event(_kind, {proto, _socket, _msg}, :connecting, _data) when proto in @proto,
    do: {:keep_state_and_data, :postpone}

  # Fallback: log anything no other clause handles and keep going.
  def handle_event(type, content, state, _data) do
    details = [{"type", type}, {"content", content}, {"state", state}]
    Logger.warning("ClientHandler: Undefined msg: #{inspect(details, pretty: true)}")
    :keep_state_and_data
  end
706

707
  # Logs why the handler is stopping: at debug level for `:normal`, at error level for
  # any other reason.
  @impl true
  def terminate(reason, state, _data) do
    Logger.metadata(state: state)
    level = if reason == :normal, do: :debug, else: :error
    Logger.log(level, "ClientHandler: terminating with reason #{inspect(reason)}")
  end
719

720
  # Before an idle session-mode client that holds a DB connection terminates, try to
  # clean its DbHandler up so it can be checked back into the pool. This is skipped
  # when Supavisor.Helpers.no_warm_pool_user?/1 is true for the user.
  defp maybe_cleanup_db_handler(state, data) do
    cleanup? =
      state == :idle and data.mode == :session and data.db_connection != nil and
        !Supavisor.Helpers.no_warm_pool_user?(data.user)

    if cleanup? do
      Logger.debug("ClientHandler: Performing session cleanup before termination")
      {pool, db_pid, _} = data.db_connection

      # Unsubscribe first so the slot is free for new clients while cleanup runs.
      Supavisor.Manager.unsubscribe(data.id)

      if DbHandler.attempt_cleanup(db_pid) == :ok do
        Process.unlink(db_pid)
        :poolboy.checkin(pool, db_pid)
      else
        # In case of error, both processes will be terminated
        :ok
      end
    end
  end
740

741
  # Strips queued events from status output by forcing `:queue` to an empty list (the
  # key is added if absent).
  @impl true
  def format_status(status) do
    Map.merge(status, %{queue: []})
  end
745

746
  ## Internal functions
747
  # Completes a successful client authentication.
  #
  # - Caches validated password secrets.
  # - Stores the upstream auth secrets (non-proxy modes only).
  # - Sends AuthenticationOk and records the join.
  # - Moves to `:connecting` with `:connect_db` (proxy) or `:subscribe` as the next
  #   internal event.
  defp handle_auth_success(sock, final_secrets, data) do
    Logger.info("ClientHandler: Connection authenticated")
    cache_validated_password(data, final_secrets)

    if data.mode != :proxy do
      Supavisor.UpstreamAuthentication.put_upstream_auth_secrets(data.id, final_secrets)
    end

    :ok = HandlerHelpers.sock_send(sock, Server.authentication_ok())
    Telem.client_join(:ok, data.id)

    next_data = %{
      data
      | auth_context: nil,
        connection_params: %{data.connection_params | secrets: final_secrets}
    }

    next_event = if data.mode == :proxy, do: :connect_db, else: :subscribe

    {:next_state, :connecting, next_data, {:next_event, :internal, next_event}}
  end
772

773
  # After a cleartext-password login, store the password secrets in the tenant's cached
  # validation secrets when none are cached yet. Other secret types, or an entry that
  # already has password secrets, leave the cache untouched.
  defp cache_validated_password(%{tenant: tenant}, %Supavisor.Secrets.PasswordSecrets{} = secrets) do
    with {:ok, %{password_secrets: nil} = validation} <-
           Supavisor.ClientAuthentication.get_validation_secrets(tenant, secrets.user) do
      Supavisor.ClientAuthentication.put_validation_secrets(
        tenant,
        secrets.user,
        %{validation | password_secrets: secrets}
      )
    else
      _ -> :ok
    end
  end

  defp cache_validated_password(_data, _secrets), do: :ok
785

786
  # Failed or timed-out authentication.
  # - Let the auth method react.
  # - Record the failure in the tenant/peer-IP auth circuit breaker.
  # - Terminate in the handshake context.
  defp handle_auth_failure(exception, data) do
    breaker_key = {data.tenant, data.peer_ip}
    AuthMethods.handle_auth_failure(data.auth_context, exception)
    Supavisor.CircuitBreaker.record_failure(breaker_key, :auth_error)
    Error.terminate_with_error(data, exception, :handshake)
  end
791

792
  # Returns the DB connection to use for `event`.
  #
  # - Session/proxy clients that already hold a connection keep it.
  # - Transaction-mode clients get no connection at connect time (`{:ok, nil}`).
  # - Otherwise a DbHandler is checked out of the pool and bound to this client.
  #
  # If the handler exits during checkout, the checkout is retried up to
  # `@max_checkout_retries` times; attempt N uses `div(data.timeout, N + 1)` as its pool
  # timeout. Once retries are exhausted, `CheckoutRetriesExhaustedError` is returned.
  #
  # NOTE(review): if the exited handler had already been linked to this process, its
  # EXIT message may still reach the `{:EXIT, ...}` clause and end the client after a
  # successful retry. Confirm when DbHandler.checkout/4 links.
  @spec maybe_checkout(:on_connect | :on_query, map, non_neg_integer()) ::
          {:ok, Data.db_connection()} | {:ok, nil} | {:error, Exception.t()}
  defp maybe_checkout(event, data, attempt \\ 0)

  defp maybe_checkout(_, %{mode: mode, db_connection: {pool, db_pid, db_sock}}, _)
       when is_pid(db_pid) and mode in [:session, :proxy],
       do: {:ok, {pool, db_pid, db_sock}}

  defp maybe_checkout(:on_connect, %{mode: :transaction}, _), do: {:ok, nil}

  defp maybe_checkout(event, data, attempt) do
    timeout = div(data.timeout, attempt + 1)
    started_at = System.monotonic_time(:microsecond)

    with {:ok, db_pid} <- pool_checkout(data.pool, timeout, data.mode),
         {:ok, db_sock} <- DbHandler.checkout(db_pid, data.sock, self(), data.mode) do
      locality = if node(db_pid) == node(), do: :local, else: :remote
      elapsed = System.monotonic_time(:microsecond) - started_at
      Telem.pool_checkout_time(elapsed, data.id, locality)
      {:ok, {data.pool, db_pid, db_sock}}
    else
      {:error, %DbHandlerExitedError{}} when attempt < @max_checkout_retries ->
        Logger.warning(
          "ClientHandler: DbHandler exited during checkout, retrying (attempt #{attempt + 1})"
        )

        maybe_checkout(event, data, attempt + 1)

      {:error, %DbHandlerExitedError{}} ->
        {:error, %Supavisor.Errors.CheckoutRetriesExhaustedError{}}

      other ->
        other
    end
  end
828

829
  # Releases the client's DB connection when a query completes (ReadyForQuery).
  #
  # - Transaction mode: unlink the DbHandler, check it back into the pool and return
  #   `nil`. A `nil` connection passes through as `nil`.
  # - Session and proxy modes: return the connection unchanged.
  #
  # The spec previously allowed only `:proxy` as the mode and a non-nil connection,
  # which contradicted the `:transaction`/`:session` clauses and the `nil` result.
  @spec maybe_checkin(
          :transaction | :session | :proxy,
          pool_pid :: pid(),
          Data.db_connection() | nil
        ) :: Data.db_connection() | nil
  defp maybe_checkin(:transaction, _pool, nil), do: nil

  defp maybe_checkin(:transaction, pool, {_, db_pid, _}) do
    # Unlink first so the DbHandler's later lifecycle no longer affects this client.
    Process.unlink(db_pid)
    :poolboy.checkin(pool, db_pid)
    nil
  end

  defp maybe_checkin(:session, _, db_connection), do: db_connection
  defp maybe_checkin(:proxy, _, db_connection), do: db_connection
840

841
  # Sends client bytes through the protocol pipeline for the current mode, then
  # forwards the resulting packets to the checked-out database connection.
  #
  # On success, returns the state with `stream_state` advanced to the value the
  # pipeline produced. An `{:error, _}` from either processing or sending is
  # returned as-is.
  @spec handle_data(binary(), map()) :: {:ok, map()} | {:error, Exception.t()}
  defp handle_data(data_to_send, data) do
    Logger.debug(
      "ClientHandler: Forward pkt to db #{Debug.packet_to_string(data_to_send, :frontend)} #{inspect(data.db_connection)}"
    )

    with {:ok, next_stream_state, packets} <-
           ProtocolHelpers.process_client_packets(data_to_send, data.mode, data),
         :ok <- sock_send(packets, data) do
      {:ok, %{data | stream_state: next_stream_state}}
    else
      {:error, reason} -> {:error, reason}
    end
  end
856

857
  # Builds the gen_statem `{:timeout, ms, event}` actions to arm: `:idle_timeout`
  # and `:heartbeat_check`, each only when its interval is positive. When both
  # are present, the idle entry comes first.
  #
  # NOTE(review): gen_statem keeps a single event timeout, so when both entries
  # are returned the later one (`:heartbeat_check`) likely supersedes the idle
  # timeout. Confirm this is intended.
  @spec handle_actions(map) :: [{:timeout, non_neg_integer, atom}]
  defp handle_actions(%{} = data) do
    heartbeat_ms = data.heartbeat_interval
    idle_ms = data.idle_timeout

    for {interval, event} <- [{idle_ms, :idle_timeout}, {heartbeat_ms, :heartbeat_check}],
        interval > 0,
        do: {:timeout, interval, event}
  end
868

869
  # Writes outgoing client data to the checked-out database connection.
  #
  # A binary is written straight to the DB socket. A list may mix raw binaries
  # with tuples that the DbHandler must handle as prepared-statement packets.
  # Consecutive elements of the same kind are grouped, and each group is
  # delivered with a single call. Delivery stops at the first group whose result
  # is not `:ok`, and that result is returned. An empty list sends nothing and
  # returns `:ok`.
  @spec sock_send([PreparedStatements.handled_pkt()] | binary(), map()) :: :ok | {:error, term()}
  defp sock_send(bin_or_pkts, data) do
    {_pool, db_handler, db_sock} = data.db_connection

    if is_list(bin_or_pkts) do
      bin_or_pkts
      |> Enum.chunk_by(&is_tuple/1)
      |> Enum.reduce_while(:ok, fn group, _acc ->
        result =
          case group do
            [first | _] when is_tuple(first) ->
              Supavisor.DbHandler.handle_prepared_statement_pkts(db_handler, group)

            binaries ->
              HandlerHelpers.sock_send(db_sock, binaries)
          end

        if result == :ok, do: {:cont, :ok}, else: {:halt, result}
      end)
    else
      HandlerHelpers.sock_send(db_sock, bin_or_pkts)
    end
  end
896

897
  # Handles a subscribe timeout for the client.
  #
  # While fewer than @subscribe_retries retries have been made, it logs the
  # retry, increments `subscribe_retries`, and re-arms the `:subscribe` event
  # timeout (@timeout_subscribe ms). Once the limit is reached, the client is
  # terminated with `SubscribeRetriesExhaustedError` in the `:handshake` context.
  @spec timeout_subscribe_or_terminate(map()) :: :gen_statem.handle_event_result()
  def timeout_subscribe_or_terminate(%{subscribe_retries: retries} = data)
      when retries < @subscribe_retries do
    Logger.warning("ClientHandler: Retry subscribe #{inspect(retries)}")

    {:keep_state, %{data | subscribe_retries: retries + 1},
     {:timeout, @timeout_subscribe, :subscribe}}
  end

  def timeout_subscribe_or_terminate(%{subscribe_retries: _exhausted} = data) do
    Error.terminate_with_error(data, %SubscribeRetriesExhaustedError{}, :handshake)
  end
908

909
  # Checks a worker out of the poolboy `pool`, blocking for at most `timeout` ms.
  #
  # poolboy signals failure by exiting the caller, so exits are converted into
  # tagged errors here. A timeout exit becomes `CheckoutTimeoutError`, carrying
  # `mode` and the timeout that was used. Any other exit becomes
  # `PoolCheckoutError` with the exit reason.
  defp pool_checkout(pool, timeout, mode) do
    worker = :poolboy.checkout(pool, true, timeout)
    {:ok, worker}
  catch
    :exit, {:timeout, _details} ->
      {:error, %CheckoutTimeoutError{mode: mode, timeout_ms: timeout}}

    :exit, other_reason ->
      {:error, %PoolCheckoutError{reason: other_reason}}
  end
918

919
  # Copies the resolved tenant/user configuration in `info` onto the handler
  # state and builds the upstream `Supavisor.ConnectionParameters` for this
  # client.
  #
  # `proxy_type` is `:password` when the tenant sets `require_user`, and
  # `:auth_query` otherwise. The SNI hostname is converted to a charlist only
  # when it is set. `client_heartbeat_interval` is multiplied by 1000 before it
  # is stored (presumably seconds -> ms; confirm against the tenant schema).
  defp set_tenant_info(data, info, user, id, db_name, client_jit) do
    tenant = info.tenant

    proxy_type = if tenant.require_user, do: :password, else: :auth_query

    sni_hostname =
      case tenant.sni_hostname do
        nil -> nil
        hostname -> to_charlist(hostname)
      end

    connection_params = %Supavisor.ConnectionParameters{
      application_name: data.app_name || "Supavisor",
      database: db_name,
      host: to_charlist(tenant.db_host),
      sni_hostname: sni_hostname,
      port: tenant.db_port,
      ip_version: Helpers.ip_version(tenant.ip_version, tenant.db_host),
      upstream_ssl: Supavisor.id(id, :upstream_tls),
      upstream_tls_ca: tenant.upstream_tls_ca,
      upstream_verify: tenant.upstream_verify
    }

    %{
      data
      | id: id,
        tenant: tenant.external_id,
        tenant_feature_flags: tenant.feature_flags,
        tenant_availability_zone: tenant.availability_zone,
        user: user,
        db_name: db_name,
        timeout: info.user.pool_checkout_timeout,
        ps: tenant.default_parameter_status,
        proxy_type: proxy_type,
        heartbeat_interval: tenant.client_heartbeat_interval * 1000,
        connection_params: connection_params,
        max_clients: info.user.max_clients || tenant.default_max_clients,
        use_jit_flow: client_jit
    }
  end
955

956
  # Chooses the upstream TLS flag. When the params map has `use_jit: true`, the
  # caller-supplied `ssl?` flag wins. Otherwise the map's own `upstream_ssl`
  # value is used.
  defp upstream_tls(%{use_jit: true}, caller_ssl?), do: caller_ssl?
  defp upstream_tls(%{upstream_ssl: configured_ssl}, _caller_ssl?), do: configured_ssl
659✔
958

959
  # Reacts to the client closing its socket.
  #
  # It first calls maybe_cleanup_db_handler/2 for the current state, then
  # terminates with a `ClientSocketClosedError`. A close seen in `:idle` or
  # `:busy` is reported in the `:authenticated` context. Any other state is
  # reported as `:handshake`.
  defp handle_socket_close(state, data) do
    maybe_cleanup_db_handler(state, data)

    context =
      case state do
        s when s in [:idle, :busy] -> :authenticated
        _other -> :handshake
      end

    Error.terminate_with_error(
      data,
      %ClientSocketClosedError{mode: data.mode, client_state: state},
      context
    )
  end
966
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc