• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 25392996935

05 May 2026 05:53PM UTC coverage: 76.82% (-1.0%) from 77.819%
25392996935

Pull #968

github

web-flow
Merge 7420a59ff into 93de7a0aa
Pull Request #968: chore: prepare for soft-deployment (v2.9.4)

0 of 37 new or added lines in 2 files covered. (0.0%)

9 existing lines in 6 files now uncovered.

2522 of 3283 relevant lines covered (76.82%)

56093.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.21
/lib/supavisor/client_handler.ex
1
defmodule Supavisor.ClientHandler do
2
  @moduledoc """
3
  This module is responsible for handling incoming connections to the Supavisor server.
4

5
  It implements the Ranch protocol behavior and a gen_statem behavior. It handles SSL negotiation,
6
  user authentication, tenant subscription, and dispatching of messages to the appropriate tenant
7
  supervisor. Each client connection is assigned to a specific tenant supervisor.
8
  """
9

10
  require Logger
11

12
  require Record
13
  require Supavisor
14

15
  @behaviour :ranch_protocol
16
  @behaviour :gen_statem
17
  @proto [:tcp, :ssl]
18
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
19
  @subscribe_retries Application.compile_env(:supavisor, :subscribe_retries)
20
  @max_checkout_retries 2
21
  @timeout_subscribe 500
22
  @clients_registry Supavisor.Registry.TenantClients
23
  @proxy_clients_registry Supavisor.Registry.TenantProxyClients
24
  @max_startup_packet_size Supavisor.Protocol.max_startup_packet_size()
25

26
  alias Supavisor.{
27
    DbHandler,
28
    HandlerHelpers,
29
    Helpers,
30
    Manager,
31
    Monitoring.Telem,
32
    Protocol.Debug,
33
    Tenants
34
  }
35

36
  alias Supavisor.ClientHandler.{
37
    AuthMethods,
38
    Cancel,
39
    Checks,
40
    Data,
41
    Error,
42
    ProtocolHelpers,
43
    Proxy
44
  }
45

46
  alias Supavisor.Protocol.{FrontendMessageHandler, MessageStreamer}
47

48
  alias Supavisor.Errors.{
49
    CheckoutTimeoutError,
50
    ClientSocketClosedError,
51
    DbHandlerExitedError,
52
    PoolCheckoutError,
53
    PoolConfigNotFoundError,
54
    PoolRanchNotFoundError,
55
    SslHandshakeError,
56
    StartupPacketTooLargeError,
57
    SubscribeRetriesExhaustedError,
58
    WorkerNotFoundError
59
  }
60

61
  require Supavisor.Protocol.Server, as: Server
62
  require Supavisor.Protocol.PreparedStatements, as: PreparedStatements
63

64
  @impl true
65
  def start_link(ref, transport, opts) do
66
    pid = :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts])
833✔
67
    {:ok, pid}
68
  end
69

70
  @impl true
71
  def callback_mode, do: [:handle_event_function, :state_enter]
833✔
72

73
  @spec db_status(pid(), :ready_for_query) :: :ok
74
  def db_status(pid, status), do: :gen_statem.cast(pid, {:db_status, status})
6,385✔
75

76
  @spec send_error_and_terminate(pid(), iodata()) :: :ok
77
  def send_error_and_terminate(pid, error_message),
78
    do: :gen_statem.cast(pid, {:send_error_and_terminate, error_message})
6✔
79

80
  @spec graceful_shutdown(pid()) :: :ok
81
  def graceful_shutdown(pid), do: :gen_statem.cast(pid, :graceful_shutdown)
19✔
82

83
  @impl true
84
  def init(_), do: :ignore
×
85

86
  def init(ref, trans, opts) do
87
    Process.flag(:trap_exit, true)
833✔
88
    Helpers.set_max_heap_size(90)
833✔
89

90
    {:ok, sock} = :ranch.handshake(ref)
833✔
91
    sock_ref = Port.monitor(sock)
833✔
92
    peer_ip = Helpers.peer_ip(sock)
833✔
93
    local = opts[:local] || false
833✔
94

95
    Logger.metadata(peer_ip: peer_ip, local: local, state: :init)
833✔
96
    :ok = trans.setopts(sock, active: @switch_active_count)
833✔
97
    Logger.debug("ClientHandler is: #{inspect(self())}")
833✔
98

99
    now = System.monotonic_time()
833✔
100

101
    data = %Data{
833✔
102
      sock: {:gen_tcp, sock},
103
      trans: trans,
104
      sock_ref: sock_ref,
105
      peer_ip: peer_ip,
106
      local: local,
107
      ssl: false,
108
      connection_params: nil,
109
      mode: opts.mode,
833✔
110
      stream_state: MessageStreamer.new_stream_state(FrontendMessageHandler),
111
      stats: %{},
112
      idle_timeout: 0,
113
      heartbeat_interval: 0,
114
      connection_start: now,
115
      state_entered_at: now,
116
      subscribe_retries: 0
117
    }
118

119
    :gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :handshake, data)
833✔
120
  end
121

122
  @impl true
123
  def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :handshake, data) do
124
    Logger.debug("ClientHandler: Client is trying to request HTTP")
1✔
125

126
    HandlerHelpers.sock_send(
1✔
127
      data.sock,
1✔
128
      "HTTP/1.1 204 OK\r\nx-app-version: #{Application.spec(:supavisor, :vsn)}\r\n\r\n"
1✔
129
    )
130

131
    {:stop, :normal}
132
  end
133

134
  # cancel request
135
  def handle_event(:info, {_, _, Server.cancel_message(pid, key)}, _state, _) do
136
    Logger.debug("ClientHandler: Got cancel query for #{inspect({pid, key})}")
150✔
137
    :ok = Cancel.send_cancel_query(pid, key)
150✔
138
    {:stop, :normal}
139
  end
140

141
  # send cancel request to db
142
  def handle_event(:info, :cancel_query, state, data) do
143
    :ok = Cancel.maybe_forward_cancel_to_db(state, data)
136✔
144
    :keep_state_and_data
145
  end
146

147
  def handle_event(
148
        :info,
149
        {:tcp, _, Server.ssl_request_message()},
150
        :handshake,
151
        %{sock: sock} = data
152
      ) do
153
    certs_keys = Helpers.downstream_certs_keys()
8✔
154

155
    # SSL negotiation, S/N/Error
156
    if certs_keys != [] do
8✔
157
      :ok = HandlerHelpers.setopts(sock, active: false)
8✔
158

159
      opts = [
8✔
160
        verify: :verify_none,
161
        certs_keys: certs_keys,
162
        sni_fun: fn _hostname -> :undefined end,
4✔
163
        receiver_spawn_opts: [min_heap_size: 2048]
164
      ]
165

166
      with :ok <- client_sock_send(data, "S", :handshake),
8✔
167
           {:ok, ssl_sock} <- :ssl.handshake(elem(sock, 1), opts) do
8✔
168
        socket = {:ssl, ssl_sock}
8✔
169
        :ok = HandlerHelpers.setopts(socket, active: @switch_active_count)
8✔
170
        {:keep_state, %{data | sock: socket, ssl: true}}
171
      else
172
        {:error, %ClientSocketClosedError{} = exception} ->
173
          Error.terminate_with_error(data, exception, :handshake)
×
174

175
        error ->
176
          Error.terminate_with_error(data, %SslHandshakeError{reason: error}, :handshake)
×
177
      end
178
    else
179
      Logger.warning(
×
180
        "ClientHandler: User requested SSL connection but no downstream cert/key found"
181
      )
182

183
      case client_sock_send(data, "N", :handshake) do
×
184
        :ok ->
×
185
          :keep_state_and_data
186

187
        {:error, exception} ->
188
          Error.terminate_with_error(data, exception, :handshake)
×
189
      end
190
    end
191
  end
192

193
  def handle_event(:info, {_, _, bin}, :handshake, data)
194
      when byte_size(bin) > @max_startup_packet_size do
195
    Error.terminate_with_error(
1✔
196
      data,
197
      %StartupPacketTooLargeError{packet_size: byte_size(bin)},
198
      :handshake
199
    )
200
  end
201

202
  def handle_event(:info, {_, _, bin}, :handshake, data) do
203
    case ProtocolHelpers.parse_startup_packet(bin) do
669✔
204
      {:ok, {type, {user, tenant_or_alias, db_name, search_path, jit, client_tls}}, app_name,
205
       log_level} ->
206
        event = {:hello, {type, {user, tenant_or_alias, db_name, search_path, jit, client_tls}}}
661✔
207
        if log_level, do: Logger.put_process_level(self(), log_level)
661✔
208

209
        {:keep_state, %{data | app_name: app_name}, {:next_event, :internal, event}}
661✔
210

211
      {:error, exception} ->
212
        Error.terminate_with_error(data, exception, :handshake)
8✔
213
    end
214
  end
215

216
  def handle_event(
217
        :internal,
218
        {:hello, {type, {user, tenant_or_alias, db_name, search_path, client_jit, client_tls}}},
219
        :handshake,
220
        %{sock: sock} = data
221
      ) do
222
    sni_hostname = HandlerHelpers.try_get_sni(sock)
660✔
223

224
    Logger.metadata(
660✔
225
      project: tenant_or_alias,
226
      user: user,
227
      mode: data.mode,
660✔
228
      type: type,
229
      app_name: data.app_name,
660✔
230
      db_name: db_name
231
    )
232

233
    # When receiving a proxied connection on a local listener, client_tls
234
    # carries the original client's TLS status. Otherwise, use data.ssl.
235
    effective_ssl = if(data.local && client_tls, do: client_tls, else: data.ssl)
660✔
236

237
    case Tenants.get_user_cache(type, user, tenant_or_alias, sni_hostname) do
660✔
238
      {:ok, info} ->
239
        upstream_tls = upstream_tls(info.tenant, effective_ssl)
659✔
240

241
        resolved_tenant = tenant_or_alias || info.tenant.external_id
659✔
242

243
        id =
659✔
244
          Supavisor.id(
659✔
245
            type: type,
246
            tenant: resolved_tenant,
247
            user: user,
248
            mode: data.mode,
659✔
249
            db: db_name,
250
            search_path: search_path,
251
            upstream_tls: upstream_tls
252
          )
253

254
        with :ok <- Checks.check_tenant_not_banned(info),
659✔
255
             :ok <- Checks.check_ssl_enforcement(data, info, user),
652✔
256
             :ok <- Checks.check_address_allowed(sock, info),
652✔
257
             :ok <- Manager.check_client_limit(id, info, data.mode),
650✔
258
             {:ok, auth_method} <-
649✔
259
               AuthMethods.fetch_authentication_method(
260
                 info.tenant,
649✔
261
                 client_jit,
262
                 effective_ssl,
263
                 user
264
               ) do
265
          Logger.debug("ClientHandler: Authentication method: #{inspect(auth_method)}")
649✔
266
          new_data = set_tenant_info(data, info, user, id, db_name, client_jit)
649✔
267

268
          {:keep_state, new_data,
649✔
269
           {:next_event, :internal, {:start_authentication, auth_method, info}}}
270
        else
271
          {:error, exception} when is_exception(exception) ->
272
            Error.terminate_with_error(%{data | id: id}, exception, :handshake)
8✔
273
        end
274

275
      {:error, exception} ->
276
        Error.terminate_with_error(data, exception, :handshake)
1✔
277
    end
278
  end
279

280
  def handle_event(
281
        :internal,
282
        {:start_authentication, auth_method, info},
283
        _state,
284
        data
285
      ) do
286
    Logger.debug("ClientHandler: Handle exchange, auth method: #{inspect(auth_method)}")
649✔
287

288
    {auth_request, next_state, auth_context} =
649✔
289
      case auth_method do
290
        :jit ->
291
          {Server.password_request(), :auth_password_wait,
×
292
           AuthMethods.Jit.new_context(info, data.id, data.peer_ip)}
×
293

294
        :password ->
295
          {Server.password_request(), :auth_password_wait,
×
296
           AuthMethods.Password.new_context(info, data.id)}
×
297

298
        :scram_sha_256 ->
299
          {Server.scram_request(), :auth_scram_first_wait,
649✔
300
           AuthMethods.SCRAM.new_context(info, data.id)}
649✔
301
      end
302

303
    with :ok <- Supavisor.CircuitBreaker.check({data.tenant, data.peer_ip}, :auth_error),
649✔
304
         :ok <- client_sock_send(data, auth_request, :handshake) do
648✔
305
      {:next_state, next_state, %{data | auth_context: auth_context},
648✔
306
       {:timeout, 15_000, :auth_timeout}}
307
    else
308
      {:error, exception} ->
309
        Error.terminate_with_error(data, exception, :handshake)
1✔
310
    end
311
  end
312

313
  def handle_event(:internal, :subscribe, _state, data) do
314
    Logger.debug("ClientHandler: Subscribe to tenant #{Supavisor.inspect_id(data.id)}")
626✔
315

316
    # CB check disabled until we ensure there are no false positives
317
    # :ok <- Supavisor.CircuitBreaker.check(data.tenant, :db_connection),
318
    with {:ok, sup} <-
626✔
319
           Supavisor.start_dist(data.id, data.connection_params.secrets,
626✔
320
             availability_zone: data.tenant_availability_zone,
626✔
321
             log_level: nil
322
           ),
323
         :not_proxy <-
625✔
324
           if(node(sup) != node() and data.mode != :proxy, do: :proxy, else: :not_proxy),
625✔
325
         {:ok, opts} <- Supavisor.subscribe(data.id),
620✔
326
         manager_ref = Process.monitor(opts.workers.manager),
618✔
327
         data = Map.merge(data, opts.workers),
618✔
328
         {:ok, db_connection} <- maybe_checkout(:on_connect, data) do
618✔
329
      data = %{
618✔
330
        data
331
        | manager: manager_ref,
332
          db_connection: db_connection,
333
          idle_timeout: opts.idle_timeout
618✔
334
      }
335

336
      Registry.register(@clients_registry, data.id, started_at: System.monotonic_time())
618✔
337

338
      cond do
618✔
339
        data.client_ready ->
618✔
340
          {:next_state, :idle, data, handle_actions(data)}
2✔
341

342
        opts.ps == [] ->
616✔
343
          {:keep_state, data, {:timeout, 1_000, :wait_ps}}
79✔
344

345
        true ->
537✔
346
          {:keep_state, data, {:next_event, :internal, {:greetings, opts.ps}}}
537✔
347
      end
348
    else
349
      {:error, %WorkerNotFoundError{}} ->
350
        timeout_subscribe_or_terminate(data)
×
351

352
      {:error, %PoolConfigNotFoundError{}} ->
353
        timeout_subscribe_or_terminate(data)
×
354

355
      {:error, exception} when is_exception(exception) ->
356
        Error.terminate_with_error(data, exception, :handshake)
3✔
357

358
      :proxy ->
359
        case Supavisor.get_pool_ranch(data.id) do
5✔
360
          {:ok, pool_ranch} ->
361
            Logger.metadata(proxy: true)
5✔
362
            Registry.register(@proxy_clients_registry, data.id, [])
5✔
363

364
            {:keep_state, %{data | pool_ranch: pool_ranch}, {:next_event, :internal, :connect_db}}
5✔
365

366
          {:error, %PoolRanchNotFoundError{}} ->
367
            timeout_subscribe_or_terminate(data)
×
368
        end
369
    end
370
  end
371

372
  def handle_event(:internal, :connect_db, _state, data) do
373
    Logger.debug("ClientHandler: Trying to connect to DB")
5✔
374

375
    with {:ok, db_pid} <-
5✔
376
           Proxy.start_proxy_connection(
377
             data.id,
5✔
378
             data.max_clients,
5✔
379
             data.connection_params,
5✔
380
             data.tenant_feature_flags,
5✔
381
             data.pool_ranch,
5✔
382
             client_ssl: data.ssl,
5✔
383
             client_jit: data.use_jit_flow
5✔
384
           ),
385
         {:ok, db_sock} <- DbHandler.checkout(db_pid, data.sock, self(), data.mode) do
2✔
386
      {:keep_state, %{data | db_connection: {nil, db_pid, db_sock}, mode: :proxy}}
387
    else
388
      {:error, exception} ->
389
        Error.terminate_with_error(data, exception, :authenticated)
5✔
390
    end
391
  end
392

393
  def handle_event(:internal, {:greetings, ps}, _state, data) do
394
    {header, <<pid::32, key::32>> = payload} = Server.backend_key_data()
614✔
395
    msg = [ps, [header, payload], Server.ready_for_query()]
614✔
396
    :ok = Cancel.listen_cancel_query(pid, key)
614✔
397

398
    case client_sock_send(data, msg, :handshake) do
614✔
399
      :ok ->
400
        Telem.client_connection_time(data.connection_start, data.id)
614✔
401
        {:next_state, :idle, %{data | client_ready: true}, handle_actions(data)}
614✔
402

403
      {:error, exception} ->
404
        Error.terminate_with_error(data, exception, :handshake)
×
405
    end
406
  end
407

408
  def handle_event(:timeout, :subscribe, _state, _) do
×
409
    {:keep_state_and_data, {:next_event, :internal, :subscribe}}
410
  end
411

412
  def handle_event(:timeout, :wait_ps, _state, data) do
413
    Logger.warning(
×
414
      "ClientHandler: Wait parameter status timeout, send default #{inspect(data.ps)}}"
×
415
    )
416

417
    ps = Server.encode_parameter_status(data.ps)
×
418
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
419
  end
420

421
  def handle_event(:timeout, :idle_terminate, _state, data) do
422
    Logger.warning("ClientHandler: Terminate an idle connection by #{data.idle_timeout} timeout")
×
423
    {:stop, :normal}
424
  end
425

426
  def handle_event(:timeout, :heartbeat_check, _state, data) do
427
    Logger.debug("ClientHandler: Send heartbeat to client")
×
428

429
    case client_sock_send(data, Server.application_name(), :idle) do
×
430
      :ok ->
×
431
        {:keep_state_and_data, {:timeout, data.heartbeat_interval, :heartbeat_check}}
×
432

433
      {:error, exception} ->
434
        Error.terminate_with_error(data, exception, :authenticated)
×
435
    end
436
  end
437

438
  def handle_event(:info, {:parameter_status, ps}, :connecting, _) do
77✔
439
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
440
  end
441

442
  # TLS alert handling: WARNING alerts keep the connection alive, FATAL alerts terminate it.
443
  # For FATAL alerts, Erlang doesn't send ssl_closed, so we must terminate here.
444
  # The alert level is only available by parsing the message string ("Warning - " or "Fatal - ").
445
  def handle_event(:info, {:ssl_error, sock, {:tls_alert, {_reason, msg}}}, _, %{sock: {_, sock}}) do
446
    msg_string = to_string(msg)
3✔
447

448
    if String.contains?(msg_string, "Fatal -") do
3✔
449
      Logger.warning(
2✔
450
        "ClientHandler: Received fatal TLS alert: #{msg_string}, terminating connection"
2✔
451
      )
452

453
      {:stop, :normal}
454
    else
455
      Logger.warning(
1✔
456
        "ClientHandler: Received TLS warning alert: #{msg_string}, keeping connection alive"
1✔
457
      )
458

459
      :keep_state_and_data
460
    end
461
  end
462

463
  def handle_event(:info, {:ssl_error, sock, reason}, _, %{sock: {_, sock}}) do
464
    Logger.error("ClientHandler: TLS error #{inspect(reason)}")
1✔
465
    :keep_state_and_data
466
  end
467

468
  # client closed connection
469
  def handle_event(_, {closed, _}, state, data)
470
      when closed in [:tcp_closed, :ssl_closed] do
471
    handle_socket_close(state, data)
321✔
472
  end
473

474
  # If the linked DbHandler went down normally, we can trust it to have
475
  # sent/logged a proper error message and just terminate
476
  def handle_event(:info, {:EXIT, _db_pid, :normal}, _state, _data) do
×
477
    {:stop, :normal}
478
  end
479

480
  def handle_event(:info, {:EXIT, db_pid, reason}, state, data) do
481
    context = if state in [:idle, :busy], do: :authenticated, else: :handshake
×
482

483
    Error.terminate_with_error(
×
484
      data,
485
      %DbHandlerExitedError{pid: db_pid, reason: reason},
486
      context
487
    )
488
  end
489

490
  # pool's manager went down
491
  def handle_event(:info, {:DOWN, ref, _, _, reason}, state, %{manager: ref} = data) do
492
    Logger.error(
3✔
493
      "ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}"
3✔
494
    )
495

496
    case {state, reason} do
3✔
497
      {_, :shutdown} -> {:stop, {:shutdown, :manager_shutdown}}
×
498
      {:idle, _} -> {:next_state, :connecting, data, {:next_event, :internal, :subscribe}}
2✔
499
      {:connecting, _} -> {:keep_state_and_data, {:next_event, :internal, :subscribe}}
×
500
      {:busy, _} -> {:keep_state_and_data, :postpone}
1✔
501
    end
502
  end
503

504
  # socket went down
505
  def handle_event(
506
        :info,
507
        {:DOWN, ref, _, _, _reason},
508
        state,
509
        %{sock_ref: ref} = data
510
      ) do
511
    handle_socket_close(state, data)
4✔
512
  end
513

514
  # emulate handle_cast
515
  def handle_event(:cast, {:db_status, :ready_for_query}, :busy, data) do
516
    Logger.debug("ClientHandler: Client is ready")
6,380✔
517

518
    db_connection = maybe_checkin(data.mode, data.pool, data.db_connection)
6,380✔
519

520
    {_, stats} =
6,380✔
521
      if data.local,
6,380✔
522
        do: {nil, data.stats},
5✔
523
        else: Telem.network_usage(:client, data.sock, data.id, data.stats)
6,375✔
524

525
    Telem.client_query_time(data.query_start, data.id, data.mode == :proxy)
6,380✔
526

527
    {:next_state, :idle, %{data | db_connection: db_connection, stats: stats},
6,380✔
528
     handle_actions(data)}
529
  end
530

531
  def handle_event(:cast, {:db_status, :ready_for_query}, :idle, _) do
5✔
532
    :keep_state_and_data
533
  end
534

535
  def handle_event(:cast, {:send_error_and_terminate, error_message}, _state, data) do
536
    HandlerHelpers.sock_send(data.sock, error_message)
6✔
537
    {:stop, :normal}
538
  end
539

540
  def handle_event(:cast, :graceful_shutdown, :busy, _data) do
3✔
541
    {:keep_state_and_data, :postpone}
542
  end
543

544
  def handle_event(:cast, :graceful_shutdown, _state, data) do
545
    HandlerHelpers.sock_send(data.sock, Server.encode_error_message(Server.admin_shutdown()))
8✔
546

547
    {:stop, :normal}
548
  end
549

550
  def handle_event(:info, {sock_error, _sock, msg}, state, _data)
551
      when sock_error in [:tcp_error, :ssl_error] do
552
    Logger.error("ClientHandler: Socket error: #{inspect(msg)}, state was #{state}")
×
553

554
    {:stop, :normal}
555
  end
556

557
  def handle_event(:info, {event, _socket}, _, data) when event in [:tcp_passive, :ssl_passive] do
558
    HandlerHelpers.setopts(data.sock, active: @switch_active_count)
121✔
559
    :keep_state_and_data
560
  end
561

562
  # Authentication state handlers
563

564
  def handle_event(:info, {proto, _socket, bin}, :auth_password_wait, data)
565
      when proto in @proto do
566
    result =
×
567
      case data.auth_context do
×
568
        %AuthMethods.Jit.Context{} ->
569
          AuthMethods.Jit.handle_password(data.auth_context, bin)
×
570

571
        %AuthMethods.Password.Context{} ->
572
          AuthMethods.Password.handle_password(data.auth_context, bin)
×
573
      end
574

575
    case result do
×
576
      {:ok, secrets} ->
577
        handle_auth_success(secrets, data)
×
578

579
      {:error, exception} ->
580
        handle_auth_failure(exception, data)
×
581
    end
582
  end
583

584
  # SCRAM authentication - waiting for first message
585
  def handle_event(:info, {proto, _socket, bin}, :auth_scram_first_wait, data)
586
      when proto in @proto do
587
    with {:ok, message, auth_context} <-
640✔
588
           AuthMethods.SCRAM.handle_scram_first(data.auth_context, bin),
640✔
589
         :ok <- client_sock_send(data, Server.exchange_message(:first, message), :handshake) do
639✔
590
      new_data = %{data | auth_context: auth_context}
639✔
591
      {:next_state, :auth_scram_final_wait, new_data, {:timeout, 15_000, :auth_timeout}}
639✔
592
    else
593
      {:error, %ClientSocketClosedError{} = exception} ->
UNCOV
594
        Error.terminate_with_error(data, exception, :handshake)
×
595

596
      {:error, exception} ->
597
        handle_auth_failure(exception, data)
1✔
598
    end
599
  end
600

601
  # SCRAM authentication - waiting for final response
602
  def handle_event(:info, {proto, _socket, bin}, :auth_scram_final_wait, data)
603
      when proto in @proto do
604
    with {:ok, message, final_secrets} <-
637✔
605
           AuthMethods.SCRAM.handle_scram_final(data.auth_context, bin),
637✔
606
         :ok <- client_sock_send(data, message, :handshake) do
624✔
607
      handle_auth_success(final_secrets, data)
624✔
608
    else
609
      {:error, %ClientSocketClosedError{} = exception} ->
610
        Error.terminate_with_error(data, exception, :handshake)
×
611

612
      {:error, exception} ->
613
        handle_auth_failure(exception, data)
13✔
614
    end
615
  end
616

617
  # Authentication timeout handler
618
  def handle_event(:timeout, :auth_timeout, auth_state, data)
619
      when auth_state in [
620
             :auth_scram_first_wait,
621
             :auth_scram_final_wait,
622
             :auth_password_wait
623
           ] do
624
    exception = %Supavisor.Errors.AuthTimeoutError{context: auth_state}
×
625
    handle_auth_failure(exception, data)
×
626
  end
627

628
  def handle_event(:enter, old_state, new_state, data) do
629
    Logger.metadata(state: new_state)
16,147✔
630

631
    case {old_state, new_state} do
16,147✔
632
      # This is emitted on initialization
633
      {:handshake, :handshake} ->
634
        {:next_state, new_state, data}
833✔
635

636
      # We are not interested in idle->busy->idle transitions
637
      {:idle, :busy} ->
638
        {:next_state, new_state, data}
6,405✔
639

640
      {:busy, :idle} ->
641
        {:next_state, new_state, data}
6,380✔
642

643
      _ ->
644
        now = System.monotonic_time()
2,529✔
645
        time_in_previous_state = now - data.state_entered_at
2,529✔
646

647
        :telemetry.execute(
2,529✔
648
          [:supavisor, :client_handler, :state],
649
          %{duration: time_in_previous_state},
650
          %{
651
            from_state: old_state,
652
            to_state: new_state,
653
            tenant: data.tenant
2,529✔
654
          }
655
        )
656

657
        {:next_state, new_state, %{data | state_entered_at: now}}
2,529✔
658
    end
659
  end
660

661
  # Terminate request
662
  def handle_event(_kind, {proto, _, <<?X, 4::32>>}, state, data) when proto in @proto do
663
    Logger.info("ClientHandler: Terminate received from client")
293✔
664
    maybe_cleanup_db_handler(state, data)
293✔
665
    {:stop, :normal}
666
  end
667

668
  # Sync when idle and no db_connection - return sync directly
669
  def handle_event(
670
        _kind,
671
        {proto, _, <<?S, 4::32, _::binary>>},
672
        :idle,
673
        %{db_connection: nil} = data
674
      )
675
      when proto in @proto do
676
    Logger.debug("ClientHandler: Receive sync")
8✔
677

678
    case client_sock_send(data, Server.ready_for_query(), :idle) do
8✔
679
      :ok ->
680
        {:keep_state, data, handle_actions(data)}
8✔
681

682
      {:error, exception} ->
683
        Error.terminate_with_error(data, exception, :authenticated)
×
684
    end
685
  end
686

687
  # Sync when busy - send to db
688
  def handle_event(_kind, {proto, _, <<?S, 4::32, _::binary>> = msg}, :busy, data)
689
      when proto in @proto do
690
    Logger.debug("ClientHandler: Receive sync")
225✔
691
    :ok = sock_send(msg, data)
225✔
692

693
    {:keep_state, data, handle_actions(data)}
225✔
694
  end
695

696
  # Any message when idle - checkout and send to db
697
  def handle_event(_kind, {proto, socket, msg}, :idle, data) when proto in @proto do
698
    case maybe_checkout(:on_query, data) do
6,406✔
699
      {:ok, db_connection} ->
700
        {:next_state, :busy,
6,405✔
701
         %{data | db_connection: db_connection, query_start: System.monotonic_time()},
702
         [{:next_event, :internal, {proto, socket, msg}}]}
703

704
      {:error, exception} ->
705
        Error.terminate_with_error(data, exception, :authenticated)
1✔
706
    end
707
  end
708

709
  # Any message when busy: send to db
710
  def handle_event(_kind, {proto, _, msg}, :busy, data) when proto in @proto do
711
    case handle_data(msg, data) do
15,160✔
712
      {:ok, updated_data} ->
15,154✔
713
        {:keep_state, updated_data}
714

715
      {:error, exception} ->
716
        Error.terminate_with_error(data, exception, :authenticated)
6✔
717
    end
718
  end
719

720
  # Any message when connecting - postpone
721
  def handle_event(_kind, {proto, _socket, _msg}, :connecting, _data) when proto in @proto do
×
722
    {:keep_state_and_data, :postpone}
723
  end
724

725
  def handle_event(type, content, state, _data) do
726
    msg = [
1✔
727
      {"type", type},
728
      {"content", content},
729
      {"state", state}
730
    ]
731

732
    Logger.warning("ClientHandler: Undefined msg: #{inspect(msg, pretty: true)}")
1✔
733

734
    :keep_state_and_data
735
  end
736

737
  @impl true
738
  def terminate(reason, state, _data) do
739
    Logger.metadata(state: state)
833✔
740

741
    level =
833✔
742
      case reason do
743
        :normal -> :debug
831✔
744
        _ -> :error
2✔
745
      end
746

747
    Logger.log(level, "ClientHandler: terminating with reason #{inspect(reason)}")
833✔
748
  end
749

750
  defp maybe_cleanup_db_handler(state, data) do
751
    if state == :idle and data.mode == :session and data.db_connection != nil and
618✔
752
         !Supavisor.Helpers.no_warm_pool_user?(data.user) do
233✔
753
      Logger.debug("ClientHandler: Performing session cleanup before termination")
232✔
754
      {pool, db_pid, _} = data.db_connection
232✔
755

756
      # We unsubscribe to free up space for new clients during the cleanup time.
757
      Supavisor.Manager.unsubscribe(data.id)
232✔
758

759
      case DbHandler.attempt_cleanup(db_pid) do
232✔
760
        :ok ->
761
          Process.unlink(db_pid)
130✔
762
          :poolboy.checkin(pool, db_pid)
130✔
763

764
        # In case of error, both processes will be terminated
765
        _error ->
102✔
766
          :ok
767
      end
768
    end
769
  end
770

771
  @impl true
772
  def format_status(status) do
773
    Map.put(status, :queue, [])
2✔
774
  end
775

776
  ## Internal functions
777
  defp handle_auth_success(final_secrets, data) do
778
    Logger.info("ClientHandler: Connection authenticated")
624✔
779
    cache_validated_password(data, final_secrets)
624✔
780

781
    if data.mode != :proxy do
624✔
782
      Supavisor.UpstreamAuthentication.put_upstream_auth_secrets(data.id, final_secrets)
624✔
783
    end
784

785
    case client_sock_send(data, Server.authentication_ok(), :handshake) do
624✔
786
      :ok ->
787
        Telem.client_join(:ok, data.id)
624✔
788

789
        connection_params = %{data.connection_params | secrets: final_secrets}
624✔
790

791
        conn_type =
624✔
792
          if data.mode == :proxy,
624✔
793
            do: :connect_db,
794
            else: :subscribe
795

796
        {
624✔
797
          :next_state,
798
          :connecting,
799
          %{data | auth_context: nil, connection_params: connection_params},
800
          {:next_event, :internal, conn_type}
801
        }
802

803
      {:error, exception} ->
804
        Error.terminate_with_error(data, exception, :handshake)
×
805
    end
806
  end
807

808
  defp cache_validated_password(%{tenant: tenant}, %Supavisor.Secrets.PasswordSecrets{} = secrets) do
809
    case Supavisor.ClientAuthentication.get_validation_secrets(tenant, secrets.user) do
169✔
810
      {:ok, %{password_secrets: nil} = validation} ->
811
        updated = %{validation | password_secrets: secrets}
×
812
        Supavisor.ClientAuthentication.put_validation_secrets(tenant, secrets.user, updated)
×
813

814
      _ ->
169✔
815
        :ok
816
    end
817
  end
818

819
  defp cache_validated_password(_data, _secrets), do: :ok
455✔
820

821
  defp handle_auth_failure(exception, data) do
822
    AuthMethods.handle_auth_failure(data.auth_context, exception)
14✔
823
    Supavisor.CircuitBreaker.record_failure({data.tenant, data.peer_ip}, :auth_error)
14✔
824
    Error.terminate_with_error(data, exception, :handshake)
14✔
825
  end
826

827
  @spec maybe_checkout(:on_connect | :on_query, map, non_neg_integer()) ::
828
          {:ok, Data.db_connection()} | {:ok, nil} | {:error, Exception.t()}
829
  defp maybe_checkout(event, data, attempt \\ 0)
7,024✔
830

831
  defp maybe_checkout(_, %{mode: mode, db_connection: {pool, db_pid, db_sock}}, _)
1,815✔
832
       when is_pid(db_pid) and mode in [:session, :proxy] do
833
    {:ok, {pool, db_pid, db_sock}}
834
  end
835

836
  defp maybe_checkout(:on_connect, %{mode: :transaction}, _), do: {:ok, nil}
378✔
837

838
  defp maybe_checkout(event, data, attempt) do
839
    timeout = data.timeout |> div(attempt + 1)
4,831✔
840
    start = System.monotonic_time(:microsecond)
4,831✔
841

842
    with {:ok, db_pid} <- pool_checkout(data.pool, timeout, data.mode),
4,831✔
843
         {:ok, db_sock} <- DbHandler.checkout(db_pid, data.sock, self(), data.mode) do
4,830✔
844
      same_box = if node(db_pid) == node(), do: :local, else: :remote
4,830✔
845
      Telem.pool_checkout_time(System.monotonic_time(:microsecond) - start, data.id, same_box)
4,830✔
846
      {:ok, {data.pool, db_pid, db_sock}}
4,830✔
847
    else
848
      {:error, %Supavisor.Errors.DbHandlerExitedError{}} ->
849
        if attempt < @max_checkout_retries do
×
850
          Logger.warning(
×
851
            "ClientHandler: DbHandler exited during checkout, retrying (attempt #{attempt + 1})"
×
852
          )
853

854
          maybe_checkout(event, data, attempt + 1)
×
855
        else
856
          {:error, %Supavisor.Errors.CheckoutRetriesExhaustedError{}}
857
        end
858

859
      other ->
860
        other
1✔
861
    end
862
  end
863

864
  @spec maybe_checkin(:proxy, pool_pid :: pid(), Data.db_connection()) :: Data.db_connection()
865
  defp maybe_checkin(:transaction, _pool, nil), do: nil
×
866

867
  defp maybe_checkin(:transaction, pool, {_, db_pid, _}) do
868
    Process.unlink(db_pid)
4,572✔
869
    :poolboy.checkin(pool, db_pid)
4,572✔
870
    nil
871
  end
872

873
  defp maybe_checkin(:session, _, db_connection), do: db_connection
1,808✔
874
  defp maybe_checkin(:proxy, _, db_connection), do: db_connection
×
875

876
  @spec handle_data(binary(), map()) :: {:ok, map()} | {:error, Exception.t()}
877
  defp handle_data(data_to_send, data) do
878
    Logger.debug(
15,160✔
879
      "ClientHandler: Forward pkt to db #{Debug.packet_to_string(data_to_send, :frontend)} #{inspect(data.db_connection)}"
15,160✔
880
    )
881

882
    with {:ok, new_stream_state, pkts} <-
15,160✔
883
           ProtocolHelpers.process_client_packets(data_to_send, data.mode, data),
15,160✔
884
         :ok <- sock_send(pkts, data) do
15,154✔
885
      {:ok, %{data | stream_state: new_stream_state}}
886
    else
887
      {:error, exception} ->
6✔
888
        {:error, exception}
889
    end
890
  end
891

892
  @spec handle_actions(map) :: [{:timeout, non_neg_integer, atom}]
893
  defp handle_actions(%{} = data) do
894
    heartbeat =
7,229✔
895
      if data.heartbeat_interval > 0,
7,229✔
896
        do: [{:timeout, data.heartbeat_interval, :heartbeat_check}],
7,229✔
897
        else: []
898

899
    idle = if data.idle_timeout > 0, do: [{:timeout, data.idle_timeout, :idle_timeout}], else: []
7,229✔
900

901
    idle ++ heartbeat
7,229✔
902
  end
903

904
  @spec client_sock_send(map(), iodata(), :handshake | :idle) ::
905
          :ok | {:error, ClientSocketClosedError.t()}
906
  defp client_sock_send(data, bin, client_state) do
907
    case HandlerHelpers.sock_send(data.sock, bin) do
3,165✔
908
      :ok ->
3,165✔
909
        :ok
910

UNCOV
911
      {:error, reason} ->
×
912
        {:error,
UNCOV
913
         %ClientSocketClosedError{mode: data.mode, client_state: client_state, reason: reason}}
×
914
    end
915
  end
916

917
  @spec sock_send([PreparedStatements.handled_pkt()] | binary(), map()) :: :ok | {:error, term()}
918
  defp sock_send(bin_or_pkts, data) do
919
    {_pool, db_handler, db_sock} = data.db_connection
15,379✔
920

921
    case bin_or_pkts do
15,379✔
922
      pkts when is_list(pkts) ->
923
        # Chunking to ensure we send bigger packets
924
        pkts
925
        |> Enum.chunk_by(&is_tuple/1)
926
        |> Enum.reduce_while(:ok, fn chunk, _acc ->
9,242✔
927
          case chunk do
928
            [t | _] = prepared_pkts when is_tuple(t) ->
929
              Supavisor.DbHandler.handle_prepared_statement_pkts(db_handler, prepared_pkts)
2,893✔
930

931
            bins ->
932
              HandlerHelpers.sock_send(db_sock, bins)
5,874✔
933
          end
934
          |> case do
8,767✔
935
            :ok -> {:cont, :ok}
8,767✔
936
            error -> {:halt, error}
×
937
          end
938
        end)
939

940
      bin ->
941
        HandlerHelpers.sock_send(elem(data.db_connection, 2), bin)
6,137✔
942
    end
943
  end
944

945
  @spec timeout_subscribe_or_terminate(map()) :: :gen_statem.handle_event_result()
946
  def timeout_subscribe_or_terminate(%{subscribe_retries: subscribe_retries} = data) do
947
    if subscribe_retries < @subscribe_retries do
×
948
      Logger.warning("ClientHandler: Retry subscribe #{inspect(subscribe_retries)}")
×
949

950
      {:keep_state, %{data | subscribe_retries: subscribe_retries + 1},
×
951
       {:timeout, @timeout_subscribe, :subscribe}}
952
    else
953
      Error.terminate_with_error(data, %SubscribeRetriesExhaustedError{}, :handshake)
×
954
    end
955
  end
956

957
  defp pool_checkout(pool, timeout, mode) do
4,831✔
958
    {:ok, :poolboy.checkout(pool, true, timeout)}
959
  catch
960
    :exit, {:timeout, _} ->
1✔
961
      {:error, %CheckoutTimeoutError{mode: mode, timeout_ms: timeout}}
962

963
    :exit, reason ->
×
964
      {:error, %PoolCheckoutError{reason: reason}}
965
  end
966

967
  defp set_tenant_info(data, info, user, id, db_name, client_jit) do
968
    proxy_type =
649✔
969
      if info.tenant.require_user,
649✔
970
        do: :password,
971
        else: :auth_query
972

973
    connection_params = %Supavisor.ConnectionParameters{
649✔
974
      application_name: data.app_name || "Supavisor",
649✔
975
      database: db_name,
976
      host: to_charlist(info.tenant.db_host),
649✔
977
      sni_hostname:
978
        if(info.tenant.sni_hostname != nil, do: to_charlist(info.tenant.sni_hostname)),
649✔
979
      port: info.tenant.db_port,
649✔
980
      ip_version: Helpers.ip_version(info.tenant.ip_version, info.tenant.db_host),
649✔
981
      upstream_ssl: Supavisor.id(id, :upstream_tls),
982
      upstream_tls_ca: info.tenant.upstream_tls_ca,
649✔
983
      upstream_verify: info.tenant.upstream_verify
649✔
984
    }
985

986
    %{
987
      data
988
      | id: id,
649✔
989
        tenant: info.tenant.external_id,
649✔
990
        tenant_feature_flags: info.tenant.feature_flags,
649✔
991
        tenant_availability_zone: info.tenant.availability_zone,
649✔
992
        user: user,
993
        db_name: db_name,
994
        timeout: info.user.pool_checkout_timeout,
649✔
995
        ps: info.tenant.default_parameter_status,
649✔
996
        proxy_type: proxy_type,
997
        heartbeat_interval: info.tenant.client_heartbeat_interval * 1000,
649✔
998
        connection_params: connection_params,
999
        max_clients: info.user.max_clients || info.tenant.default_max_clients,
649✔
1000
        use_jit_flow: client_jit
1001
    }
1002
  end
1003

1004
  defp upstream_tls(%{use_jit: true}, ssl?), do: ssl?
×
1005
  defp upstream_tls(%{upstream_ssl: upstream_ssl}, _ssl?), do: upstream_ssl
659✔
1006

1007
  defp handle_socket_close(state, data) do
1008
    maybe_cleanup_db_handler(state, data)
325✔
1009

1010
    error = %ClientSocketClosedError{mode: data.mode, client_state: state}
325✔
1011
    context = if state in [:idle, :busy], do: :authenticated, else: :handshake
325✔
1012
    Error.terminate_with_error(data, error, context)
325✔
1013
  end
1014
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc