supabase / supavisor / 23816549911

31 Mar 2026 07:53PM UTC coverage: 78.981% (-2.3%) from 81.271%

fix: SNI-based tenant routing (#899)

## What is the current behavior?
SNI-based tenant routing silently fails across all tested versions
(2.7.4, 2.8.8, 2.9.0-rc). When a client connects with a plain username
(e.g. user=appuser) and relies on the TLS SNI hostname (e.g.
demo-db.local.arc.cloud) for tenant resolution, try_get_sni() returns
nil and the connection either falls back to dot-delimited username
routing or crashes with "comparison with nil is forbidden" in
Ecto.Query.Builder.


## What is the new behavior?
SNI-based tenant routing works end-to-end. Clients can connect with a
plain username and the tenant is correctly resolved from the TLS SNI
hostname. Pool identity and config lookups use the resolved tenant's
external_id.

## Additional context
Two separate bugs in lib/supavisor/client_handler.ex:

The `:ssl.handshake` call doesn't include an `sni_fun` callback. Erlang
OTP's `:ssl` module requires `sni_fun` to be registered during the handshake
for the SNI hostname to be captured and returned by
`:ssl.connection_information/2`. Without it, the handshake succeeds but
`sni_hostname` is never stored. Fixed by adding
`sni_fun: fn _hostname -> [certs_keys: certs_keys] end` to the handshake opts.
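
The first fix can be sketched in isolation. This is a minimal illustration, not the project's code: the module name `SniHandshakeSketch` and both function names are hypothetical, and the claim that `sni_fun` is needed for the hostname to be recorded is taken from the commit message rather than verified here. Only `:ssl.connection_information/2` and the `sni_fun`, `certs_keys`, and `verify` options are real OTP names.

```elixir
defmodule SniHandshakeSketch do
  @moduledoc false

  # Hypothetical helper: builds server-side handshake options in the
  # shape the commit message describes.
  @spec handshake_opts(list()) :: keyword()
  def handshake_opts(certs_keys) do
    [
      verify: :verify_none,
      certs_keys: certs_keys,
      # :ssl calls this with the hostname from the ClientHello.
      # The same certificates are returned for every hostname.
      sni_fun: fn _hostname -> [certs_keys: certs_keys] end
    ]
  end

  # Hypothetical helper: reads the SNI hostname from an established TLS
  # socket, returning nil when none was recorded or the call fails.
  @spec sni_hostname(term()) :: String.t() | nil
  def sni_hostname(ssl_sock) do
    case :ssl.connection_information(ssl_sock, [:sni_hostname]) do
      {:ok, [sni_hostname: hostname]} -> to_string(hostname)
      _ -> nil
    end
  end
end
```

The options would be passed as the second argument to `:ssl.handshake/2` on the accepted TCP socket, after which `sni_hostname/1` can be called on the resulting TLS socket.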


After SNI extraction works, the pool identity struct is still built with
`tenant: tenant_or_alias`, where `tenant_or_alias` is parsed from the
username and is nil for SNI-only connections. This nil propagates
through `start_local_pool` → `get_pool_config_cache` → `get_pool_config`,
where Ecto rejects the nil comparison. Fixed by falling back to
`info.tenant.external_id` when `tenant_or_alias` is nil.
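
The fallback in the second fix amounts to a single `||` expression. The sketch below isolates it, assuming `info` is a map or struct with a nested `tenant.external_id` field as in the source listing. The module and function names are hypothetical.

```elixir
defmodule TenantFallbackSketch do
  @moduledoc false

  # Hypothetical helper: prefers the tenant parsed from the username and
  # falls back to the external_id of the tenant that was resolved (for
  # example via SNI) when the username carried no tenant.
  @spec resolve_tenant(String.t() | nil, map()) :: String.t()
  def resolve_tenant(tenant_or_alias, %{tenant: %{external_id: external_id}}) do
    tenant_or_alias || external_id
  end
end
```

With this in place, the pool identity is never built with a nil tenant as long as the user lookup returned a tenant record.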

## Personal Context
I was trying to run Supavisor with SNI routing on K8s, but it kept
failing for all tested versions (2.7.4, 2.8.8, 2.9.0-rc-3). I looked
into the code to fix this, and this change resolved the error.

[logfile.txt](https://github.com/user-attachments/files/26358228/logfile.txt)
Adding the error log I received when trying to connect using SNI routing
- it would fail... (continued)

2 of 2 new or added lines in 1 file covered. (100.0%)

60 existing lines in 3 files now uncovered.

2495 of 3159 relevant lines covered (78.98%)

58152.56 hits per line

Source File

/lib/supavisor/client_handler.ex (84.74% covered)
1
defmodule Supavisor.ClientHandler do
2
  @moduledoc """
3
  This module is responsible for handling incoming connections to the Supavisor server.
4

5
  It implements the Ranch protocol behavior and a gen_statem behavior. It handles SSL negotiation,
6
  user authentication, tenant subscription, and dispatching of messages to the appropriate tenant
7
  supervisor. Each client connection is assigned to a specific tenant supervisor.
8
  """
9

10
  require Logger
11

12
  require Record
13
  require Supavisor
14

15
  @behaviour :ranch_protocol
16
  @behaviour :gen_statem
17
  @proto [:tcp, :ssl]
18
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
19
  @subscribe_retries Application.compile_env(:supavisor, :subscribe_retries)
20
  @timeout_subscribe 500
21
  @clients_registry Supavisor.Registry.TenantClients
22
  @proxy_clients_registry Supavisor.Registry.TenantProxyClients
23
  @max_startup_packet_size Supavisor.Protocol.max_startup_packet_size()
24

25
  alias Supavisor.{
26
    DbHandler,
27
    HandlerHelpers,
28
    Helpers,
29
    Manager,
30
    Monitoring.Telem,
31
    Protocol.Debug,
32
    Tenants
33
  }
34

35
  alias Supavisor.ClientHandler.{
36
    AuthMethods,
37
    Cancel,
38
    Checks,
39
    Data,
40
    Error,
41
    ProtocolHelpers,
42
    Proxy
43
  }
44

45
  alias Supavisor.Protocol.{FrontendMessageHandler, MessageStreamer}
46

47
  alias Supavisor.Errors.{
48
    CheckoutTimeoutError,
49
    ClientSocketClosedError,
50
    DbHandlerExitedError,
51
    PoolCheckoutError,
52
    PoolConfigNotFoundError,
53
    PoolRanchNotFoundError,
54
    SslHandshakeError,
55
    StartupPacketTooLargeError,
56
    SubscribeRetriesExhaustedError,
57
    WorkerNotFoundError
58
  }
59

60
  require Supavisor.Protocol.Server, as: Server
61
  require Supavisor.Protocol.PreparedStatements, as: PreparedStatements
62

63
  @impl true
64
  def start_link(ref, transport, opts) do
65
    pid = :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts])
815✔
66
    {:ok, pid}
67
  end
68

69
  @impl true
70
  def callback_mode, do: [:handle_event_function, :state_enter]
815✔
71

72
  @spec db_status(pid(), :ready_for_query) :: :ok
73
  def db_status(pid, status), do: :gen_statem.cast(pid, {:db_status, status})
6,370✔
74

75
  @spec send_error_and_terminate(pid(), iodata()) :: :ok
76
  def send_error_and_terminate(pid, error_message),
77
    do: :gen_statem.cast(pid, {:send_error_and_terminate, error_message})
4✔
78

79
  @spec graceful_shutdown(pid()) :: :ok
80
  def graceful_shutdown(pid), do: :gen_statem.cast(pid, :graceful_shutdown)
22✔
81

82
  @impl true
83
  def init(_), do: :ignore
×
84

85
  def init(ref, trans, opts) do
86
    Process.flag(:trap_exit, true)
815✔
87
    Helpers.set_max_heap_size(90)
815✔
88

89
    {:ok, sock} = :ranch.handshake(ref)
815✔
90
    sock_ref = Port.monitor(sock)
815✔
91
    peer_ip = Helpers.peer_ip(sock)
815✔
92
    local = opts[:local] || false
815✔
93

94
    Logger.metadata(peer_ip: peer_ip, local: local, state: :init)
815✔
95
    :ok = trans.setopts(sock, active: @switch_active_count)
815✔
96
    Logger.debug("ClientHandler is: #{inspect(self())}")
815✔
97

98
    now = System.monotonic_time()
815✔
99

100
    data = %Data{
815✔
101
      sock: {:gen_tcp, sock},
102
      trans: trans,
103
      sock_ref: sock_ref,
104
      peer_ip: peer_ip,
105
      local: local,
106
      ssl: false,
107
      connection_params: nil,
108
      mode: opts.mode,
815✔
109
      stream_state: MessageStreamer.new_stream_state(FrontendMessageHandler),
110
      stats: %{},
111
      idle_timeout: 0,
112
      heartbeat_interval: 0,
113
      connection_start: now,
114
      state_entered_at: now,
115
      subscribe_retries: 0
116
    }
117

118
    :gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :handshake, data)
815✔
119
  end
120

121
  @impl true
122
  def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :handshake, data) do
123
    Logger.debug("ClientHandler: Client is trying to request HTTP")
1✔
124

125
    HandlerHelpers.sock_send(
1✔
126
      data.sock,
1✔
127
      "HTTP/1.1 204 OK\r\nx-app-version: #{Application.spec(:supavisor, :vsn)}\r\n\r\n"
1✔
128
    )
129

130
    {:stop, :normal}
131
  end
132

133
  # cancel request
134
  def handle_event(:info, {_, _, Server.cancel_message(pid, key)}, _state, _) do
135
    Logger.debug("ClientHandler: Got cancel query for #{inspect({pid, key})}")
145✔
136
    :ok = Cancel.send_cancel_query(pid, key)
145✔
137
    {:stop, :normal}
138
  end
139

140
  # send cancel request to db
141
  def handle_event(:info, :cancel_query, state, data) do
142
    :ok = Cancel.maybe_forward_cancel_to_db(state, data)
133✔
143
    :keep_state_and_data
144
  end
145

146
  def handle_event(
147
        :info,
148
        {:tcp, _, Server.ssl_request_message()},
149
        :handshake,
150
        %{sock: sock} = data
151
      ) do
152
    certs_keys = Helpers.downstream_certs_keys()
8✔
153

154
    # SSL negotiation, S/N/Error
155
    if certs_keys != [] do
8✔
156
      :ok = HandlerHelpers.setopts(sock, active: false)
8✔
157
      :ok = HandlerHelpers.sock_send(sock, "S")
8✔
158

159
      opts = [
8✔
160
        verify: :verify_none,
161
        certs_keys: certs_keys,
162
        sni_fun: fn _hostname -> [certs_keys: certs_keys] end
4✔
163
      ]
164

165
      case :ssl.handshake(elem(sock, 1), opts) do
8✔
166
        {:ok, ssl_sock} ->
167
          socket = {:ssl, ssl_sock}
8✔
168
          :ok = HandlerHelpers.setopts(socket, active: @switch_active_count)
8✔
169
          {:keep_state, %{data | sock: socket, ssl: true}}
170

171
        error ->
172
          Error.terminate_with_error(data, %SslHandshakeError{reason: error}, :handshake)
×
173
      end
174
    else
UNCOV
175
      Logger.warning(
×
176
        "ClientHandler: User requested SSL connection but no downstream cert/key found"
177
      )
178

179
      :ok = HandlerHelpers.sock_send(data.sock, "N")
×
180
      :keep_state_and_data
181
    end
182
  end
183

184
  def handle_event(:info, {_, _, bin}, :handshake, data)
185
      when byte_size(bin) > @max_startup_packet_size do
186
    Error.terminate_with_error(
1✔
187
      data,
188
      %StartupPacketTooLargeError{packet_size: byte_size(bin)},
189
      :handshake
190
    )
191
  end
192

193
  def handle_event(:info, {_, _, bin}, :handshake, data) do
194
    case ProtocolHelpers.parse_startup_packet(bin) do
656✔
195
      {:ok, {type, {user, tenant_or_alias, db_name, search_path, jit, client_tls}}, app_name,
196
       log_level} ->
197
        event = {:hello, {type, {user, tenant_or_alias, db_name, search_path, jit, client_tls}}}
648✔
198
        if log_level, do: Logger.put_process_level(self(), log_level)
648✔
199

200
        {:keep_state, %{data | app_name: app_name}, {:next_event, :internal, event}}
648✔
201

202
      {:error, exception} ->
203
        Error.terminate_with_error(data, exception, :handshake)
8✔
204
    end
205
  end
206

207
  def handle_event(
208
        :internal,
209
        {:hello, {type, {user, tenant_or_alias, db_name, search_path, client_jit, client_tls}}},
210
        :handshake,
211
        %{sock: sock} = data
212
      ) do
213
    sni_hostname = HandlerHelpers.try_get_sni(sock)
647✔
214

215
    Logger.metadata(
647✔
216
      project: tenant_or_alias,
217
      user: user,
218
      mode: data.mode,
647✔
219
      type: type,
220
      app_name: data.app_name,
647✔
221
      db_name: db_name
222
    )
223

224
    # When receiving a proxied connection on a local listener, client_tls
225
    # carries the original client's TLS status. Otherwise, use data.ssl.
226
    effective_ssl = if(data.local && client_tls, do: client_tls, else: data.ssl)
647✔
227

228
    case Tenants.get_user_cache(type, user, tenant_or_alias, sni_hostname) do
647✔
229
      {:ok, info} ->
230
        upstream_tls = upstream_tls(info.tenant, effective_ssl)
646✔
231

232
        resolved_tenant = tenant_or_alias || info.tenant.external_id
646✔
233

234
        id =
646✔
235
          Supavisor.id(
646✔
236
            type: type,
237
            tenant: resolved_tenant,
238
            user: user,
239
            mode: data.mode,
646✔
240
            db: db_name,
241
            search_path: search_path,
242
            upstream_tls: upstream_tls
243
          )
244

245
        with :ok <- Checks.check_tenant_not_banned(info),
646✔
246
             :ok <- Checks.check_ssl_enforcement(data, info, user),
645✔
247
             :ok <- Checks.check_address_allowed(sock, info),
645✔
248
             :ok <- Manager.check_client_limit(id, info, data.mode),
645✔
249
             {:ok, auth_method} <-
644✔
250
               AuthMethods.fetch_authentication_method(
251
                 info.tenant,
644✔
252
                 client_jit,
253
                 effective_ssl,
254
                 user
255
               ) do
256
          Logger.debug("ClientHandler: Authentication method: #{inspect(auth_method)}")
644✔
257
          new_data = set_tenant_info(data, info, user, id, db_name, client_jit)
644✔
258

259
          {:keep_state, new_data,
644✔
260
           {:next_event, :internal, {:start_authentication, auth_method, info}}}
261
        else
262
          {:error, exception} when is_exception(exception) ->
263
            Error.terminate_with_error(%{data | id: id}, exception, :handshake)
2✔
264
        end
265

266
      {:error, exception} ->
267
        Error.terminate_with_error(data, exception, :handshake)
1✔
268
    end
269
  end
270

271
  def handle_event(
272
        :internal,
273
        {:start_authentication, auth_method, info},
274
        _state,
275
        %{sock: sock} = data
276
      ) do
277
    Logger.debug("ClientHandler: Handle exchange, auth method: #{inspect(auth_method)}")
644✔
278

279
    case Supavisor.CircuitBreaker.check({data.tenant, data.peer_ip}, :auth_error) do
644✔
280
      :ok ->
281
        case auth_method do
643✔
282
          :jit ->
UNCOV
283
            :ok = HandlerHelpers.sock_send(sock, Server.password_request())
×
UNCOV
284
            auth_context = AuthMethods.Jit.new_context(info, data.id, data.peer_ip)
×
285

UNCOV
286
            {:next_state, :auth_password_wait, %{data | auth_context: auth_context},
×
287
             {:timeout, 15_000, :auth_timeout}}
288

289
          :password ->
290
            :ok = HandlerHelpers.sock_send(sock, Server.password_request())
×
UNCOV
291
            auth_context = AuthMethods.Password.new_context(info, data.id)
×
292

UNCOV
293
            {:next_state, :auth_password_wait, %{data | auth_context: auth_context},
×
294
             {:timeout, 15_000, :auth_timeout}}
295

296
          :scram_sha_256 ->
297
            auth_context = AuthMethods.SCRAM.new_context(info, data.id)
643✔
298
            :ok = HandlerHelpers.sock_send(sock, Server.scram_request())
643✔
299

300
            {:next_state, :auth_scram_first_wait, %{data | auth_context: auth_context},
643✔
301
             {:timeout, 15_000, :auth_timeout}}
302
        end
303

304
      {:error, exception} ->
305
        Error.terminate_with_error(data, exception, :handshake)
1✔
306
    end
307
  end
308

309
  def handle_event(:internal, :subscribe, _state, data) do
310
    Logger.debug("ClientHandler: Subscribe to tenant #{Supavisor.inspect_id(data.id)}")
622✔
311

312
    with :ok <- Supavisor.CircuitBreaker.check(data.tenant, :db_connection),
622✔
313
         {:ok, sup} <-
621✔
314
           Supavisor.start_dist(data.id, data.connection_params.secrets,
621✔
315
             availability_zone: data.tenant_availability_zone,
621✔
316
             log_level: nil
317
           ),
318
         :not_proxy <-
620✔
319
           if(node(sup) != node() and data.mode != :proxy, do: :proxy, else: :not_proxy),
620✔
320
         {:ok, opts} <- Supavisor.subscribe(data.id),
615✔
321
         manager_ref = Process.monitor(opts.workers.manager),
613✔
322
         data = Map.merge(data, opts.workers),
613✔
323
         {:ok, db_connection} <- maybe_checkout(:on_connect, data) do
613✔
324
      data = %{
613✔
325
        data
326
        | manager: manager_ref,
327
          db_connection: db_connection,
328
          idle_timeout: opts.idle_timeout
613✔
329
      }
330

331
      Registry.register(@clients_registry, data.id, started_at: System.monotonic_time())
613✔
332

333
      cond do
613✔
334
        data.client_ready ->
613✔
335
          {:next_state, :idle, data, handle_actions(data)}
2✔
336

337
        opts.ps == [] ->
611✔
338
          {:keep_state, data, {:timeout, 1_000, :wait_ps}}
74✔
339

340
        true ->
537✔
341
          {:keep_state, data, {:next_event, :internal, {:greetings, opts.ps}}}
537✔
342
      end
343
    else
344
      {:error, %WorkerNotFoundError{}} ->
UNCOV
345
        timeout_subscribe_or_terminate(data)
×
346

347
      {:error, %PoolConfigNotFoundError{}} ->
UNCOV
348
        timeout_subscribe_or_terminate(data)
×
349

350
      {:error, exception} when is_exception(exception) ->
351
        Error.terminate_with_error(data, exception, :handshake)
4✔
352

353
      :proxy ->
354
        case Supavisor.get_pool_ranch(data.id) do
5✔
355
          {:ok, pool_ranch} ->
356
            Logger.metadata(proxy: true)
5✔
357
            Registry.register(@proxy_clients_registry, data.id, [])
5✔
358

359
            {:keep_state, %{data | pool_ranch: pool_ranch}, {:next_event, :internal, :connect_db}}
5✔
360

361
          {:error, %PoolRanchNotFoundError{}} ->
UNCOV
362
            timeout_subscribe_or_terminate(data)
×
363
        end
364
    end
365
  end
366

367
  def handle_event(:internal, :connect_db, _state, data) do
368
    Logger.debug("ClientHandler: Trying to connect to DB")
5✔
369

370
    with {:ok, db_pid} <-
5✔
371
           Proxy.start_proxy_connection(
372
             data.id,
5✔
373
             data.max_clients,
5✔
374
             data.connection_params,
5✔
375
             data.tenant_feature_flags,
5✔
376
             data.pool_ranch,
5✔
377
             client_ssl: data.ssl,
5✔
378
             client_jit: data.use_jit_flow
5✔
379
           ),
380
         {:ok, db_sock} <- DbHandler.checkout(db_pid, data.sock, self(), data.mode) do
2✔
381
      {:keep_state, %{data | db_connection: {nil, db_pid, db_sock}, mode: :proxy}}
382
    else
383
      {:error, exception} ->
384
        Error.terminate_with_error(data, exception, :authenticated)
5✔
385
    end
386
  end
387

388
  def handle_event(:internal, {:greetings, ps}, _state, %{sock: sock} = data) do
389
    {header, <<pid::32, key::32>> = payload} = Server.backend_key_data()
609✔
390
    msg = [ps, [header, payload], Server.ready_for_query()]
609✔
391
    :ok = Cancel.listen_cancel_query(pid, key)
609✔
392
    :ok = HandlerHelpers.sock_send(sock, msg)
609✔
393
    Telem.client_connection_time(data.connection_start, data.id)
609✔
394
    {:next_state, :idle, %{data | client_ready: true}, handle_actions(data)}
609✔
395
  end
396

UNCOV
397
  def handle_event(:timeout, :subscribe, _state, _) do
×
398
    {:keep_state_and_data, {:next_event, :internal, :subscribe}}
399
  end
400

401
  def handle_event(:timeout, :wait_ps, _state, data) do
UNCOV
402
    Logger.warning(
×
UNCOV
403
      "ClientHandler: Wait parameter status timeout, send default #{inspect(data.ps)}}"
×
404
    )
405

406
    ps = Server.encode_parameter_status(data.ps)
×
407
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
408
  end
409

410
  def handle_event(:timeout, :idle_terminate, _state, data) do
UNCOV
411
    Logger.warning("ClientHandler: Terminate an idle connection by #{data.idle_timeout} timeout")
×
412
    {:stop, :normal}
413
  end
414

415
  def handle_event(:timeout, :heartbeat_check, _state, data) do
UNCOV
416
    Logger.debug("ClientHandler: Send heartbeat to client")
×
UNCOV
417
    HandlerHelpers.sock_send(data.sock, Server.application_name())
×
UNCOV
418
    {:keep_state_and_data, {:timeout, data.heartbeat_interval, :heartbeat_check}}
×
419
  end
420

421
  def handle_event(:info, {:parameter_status, ps}, :connecting, _) do
72✔
422
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
423
  end
424

425
  # TLS alert handling: WARNING alerts keep the connection alive, FATAL alerts terminate it.
426
  # For FATAL alerts, Erlang doesn't send ssl_closed, so we must terminate here.
427
  # The alert level is only available by parsing the message string ("Warning - " or "Fatal - ").
428
  def handle_event(:info, {:ssl_error, sock, {:tls_alert, {_reason, msg}}}, _, %{sock: {_, sock}}) do
429
    msg_string = to_string(msg)
3✔
430

431
    if String.contains?(msg_string, "Fatal -") do
3✔
432
      Logger.warning(
2✔
433
        "ClientHandler: Received fatal TLS alert: #{msg_string}, terminating connection"
2✔
434
      )
435

436
      {:stop, :normal}
437
    else
438
      Logger.warning(
1✔
439
        "ClientHandler: Received TLS warning alert: #{msg_string}, keeping connection alive"
1✔
440
      )
441

442
      :keep_state_and_data
443
    end
444
  end
445

446
  def handle_event(:info, {:ssl_error, sock, reason}, _, %{sock: {_, sock}}) do
447
    Logger.error("ClientHandler: TLS error #{inspect(reason)}")
1✔
448
    :keep_state_and_data
449
  end
450

451
  # client closed connection
452
  def handle_event(_, {closed, _}, state, data)
453
      when closed in [:tcp_closed, :ssl_closed] do
454
    handle_socket_close(state, data)
330✔
455
  end
456

457
  # linked DbHandler went down
458
  def handle_event(:info, {:EXIT, db_pid, reason}, state, data) do
UNCOV
459
    context = if state in [:idle, :busy], do: :authenticated, else: :handshake
×
460

UNCOV
461
    Error.terminate_with_error(
×
462
      data,
463
      %DbHandlerExitedError{pid: db_pid, reason: reason},
464
      context
465
    )
466
  end
467

468
  # pool's manager went down
469
  def handle_event(:info, {:DOWN, ref, _, _, reason}, state, %{manager: ref} = data) do
470
    Logger.error(
3✔
471
      "ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}"
3✔
472
    )
473

474
    case {state, reason} do
3✔
475
      {_, :shutdown} -> {:stop, {:shutdown, :manager_shutdown}}
×
476
      {:idle, _} -> {:next_state, :connecting, data, {:next_event, :internal, :subscribe}}
2✔
UNCOV
477
      {:connecting, _} -> {:keep_state_and_data, {:next_event, :internal, :subscribe}}
×
478
      {:busy, _} -> {:keep_state_and_data, :postpone}
1✔
479
    end
480
  end
481

482
  # socket went down
483
  def handle_event(
484
        :info,
485
        {:DOWN, ref, _, _, _reason},
486
        state,
487
        %{sock_ref: ref} = data
488
      ) do
489
    handle_socket_close(state, data)
2✔
490
  end
491

492
  # emulate handle_cast
493
  def handle_event(:cast, {:db_status, :ready_for_query}, :busy, data) do
494
    Logger.debug("ClientHandler: Client is ready")
6,366✔
495

496
    db_connection = maybe_checkin(data.mode, data.pool, data.db_connection)
6,366✔
497

498
    {_, stats} =
6,366✔
499
      if data.local,
6,366✔
500
        do: {nil, data.stats},
5✔
501
        else: Telem.network_usage(:client, data.sock, data.id, data.stats)
6,361✔
502

503
    Telem.client_query_time(data.query_start, data.id, data.mode == :proxy)
6,366✔
504

505
    {:next_state, :idle, %{data | db_connection: db_connection, stats: stats},
6,366✔
506
     handle_actions(data)}
507
  end
508

509
  def handle_event(:cast, {:db_status, :ready_for_query}, :idle, _) do
4✔
510
    :keep_state_and_data
511
  end
512

513
  def handle_event(:cast, {:send_error_and_terminate, error_message}, _state, data) do
514
    HandlerHelpers.sock_send(data.sock, error_message)
4✔
515
    {:stop, :normal}
516
  end
517

518
  def handle_event(:cast, :graceful_shutdown, :busy, _data) do
3✔
519
    {:keep_state_and_data, :postpone}
520
  end
521

522
  def handle_event(:cast, :graceful_shutdown, _state, data) do
523
    HandlerHelpers.sock_send(data.sock, Server.encode_error_message(Server.admin_shutdown()))
8✔
524

525
    {:stop, :normal}
526
  end
527

528
  def handle_event(:info, {sock_error, _sock, msg}, state, _data)
529
      when sock_error in [:tcp_error, :ssl_error] do
UNCOV
530
    Logger.error("ClientHandler: Socket error: #{inspect(msg)}, state was #{state}")
×
531

532
    {:stop, :normal}
533
  end
534

535
  def handle_event(:info, {event, _socket}, _, data) when event in [:tcp_passive, :ssl_passive] do
536
    HandlerHelpers.setopts(data.sock, active: @switch_active_count)
121✔
537
    :keep_state_and_data
538
  end
539

540
  # Authentication state handlers
541

542
  def handle_event(:info, {proto, _socket, bin}, :auth_password_wait, data)
543
      when proto in @proto do
UNCOV
544
    result =
×
UNCOV
545
      case data.auth_context do
×
546
        %AuthMethods.Jit.Context{} ->
UNCOV
547
          AuthMethods.Jit.handle_password(data.auth_context, bin)
×
548

549
        %AuthMethods.Password.Context{} ->
UNCOV
550
          AuthMethods.Password.handle_password(data.auth_context, bin)
×
551
      end
552

UNCOV
553
    case result do
×
554
      {:ok, secrets} ->
UNCOV
555
        handle_auth_success(data.sock, secrets, data)
×
556

557
      {:error, exception} ->
UNCOV
558
        handle_auth_failure(exception, data)
×
559
    end
560
  end
561

562
  # SCRAM authentication - waiting for first message
563
  def handle_event(:info, {proto, _socket, bin}, :auth_scram_first_wait, data)
564
      when proto in @proto do
565
    case AuthMethods.SCRAM.handle_scram_first(data.auth_context, bin) do
641✔
566
      {:ok, message, auth_context} ->
567
        :ok = HandlerHelpers.sock_send(data.sock, Server.exchange_message(:first, message))
640✔
568
        new_data = %{data | auth_context: auth_context}
639✔
569
        {:next_state, :auth_scram_final_wait, new_data, {:timeout, 15_000, :auth_timeout}}
639✔
570

571
      {:error, exception} ->
572
        handle_auth_failure(exception, data)
1✔
573
    end
574
  end
575

576
  # SCRAM authentication - waiting for final response
577
  def handle_event(:info, {proto, _socket, bin}, :auth_scram_final_wait, data)
578
      when proto in @proto do
579
    case AuthMethods.SCRAM.handle_scram_final(data.auth_context, bin) do
634✔
580
      {:ok, message, final_secrets} ->
581
        :ok = HandlerHelpers.sock_send(data.sock, message)
621✔
582
        handle_auth_success(data.sock, final_secrets, data)
621✔
583

584
      {:error, exception} ->
585
        handle_auth_failure(exception, data)
13✔
586
    end
587
  end
588

589
  # Authentication timeout handler
590
  def handle_event(:timeout, :auth_timeout, auth_state, data)
591
      when auth_state in [
592
             :auth_scram_first_wait,
593
             :auth_scram_final_wait,
594
             :auth_password_wait
595
           ] do
UNCOV
596
    exception = %Supavisor.Errors.AuthTimeoutError{context: auth_state}
×
UNCOV
597
    handle_auth_failure(exception, data)
×
598
  end
599

600
  def handle_event(:enter, old_state, new_state, data) do
601
    Logger.metadata(state: new_state)
16,085✔
602

603
    case {old_state, new_state} do
16,085✔
604
      # This is emitted on initialization
605
      {:handshake, :handshake} ->
606
        {:next_state, new_state, data}
815✔
607

608
      # We are not interested in idle->busy->idle transitions
609
      {:idle, :busy} ->
610
        {:next_state, new_state, data}
6,389✔
611

612
      {:busy, :idle} ->
613
        {:next_state, new_state, data}
6,366✔
614

615
      _ ->
616
        now = System.monotonic_time()
2,515✔
617
        time_in_previous_state = now - data.state_entered_at
2,515✔
618

619
        :telemetry.execute(
2,515✔
620
          [:supavisor, :client_handler, :state],
621
          %{duration: time_in_previous_state},
622
          %{
623
            from_state: old_state,
624
            to_state: new_state,
625
            tenant: data.tenant
2,515✔
626
          }
627
        )
628

629
        {:next_state, new_state, %{data | state_entered_at: now}}
2,515✔
630
    end
631
  end
632

633
  # Terminate request
634
  def handle_event(_kind, {proto, _, <<?X, 4::32>>}, state, data) when proto in @proto do
635
    Logger.info("ClientHandler: Terminate received from client")
280✔
636
    maybe_cleanup_db_handler(state, data)
280✔
637
    {:stop, :normal}
638
  end
639

640
  # Sync when idle and no db_connection - return sync directly
641
  def handle_event(
642
        _kind,
643
        {proto, _, <<?S, 4::32, _::binary>>},
644
        :idle,
645
        %{db_connection: nil} = data
646
      )
647
      when proto in @proto do
648
    Logger.debug("ClientHandler: Receive sync")
8✔
649
    :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query())
8✔
650

651
    {:keep_state, data, handle_actions(data)}
8✔
652
  end
653

654
  # Sync when busy - send to db
655
  def handle_event(_kind, {proto, _, <<?S, 4::32, _::binary>> = msg}, :busy, data)
656
      when proto in @proto do
657
    Logger.debug("ClientHandler: Receive sync")
225✔
658
    :ok = sock_send(msg, data)
225✔
659

660
    {:keep_state, data, handle_actions(data)}
225✔
661
  end
662

663
  # Any message when idle - checkout and send to db
664
  def handle_event(_kind, {proto, socket, msg}, :idle, data) when proto in @proto do
665
    case maybe_checkout(:on_query, data) do
6,390✔
666
      {:ok, db_connection} ->
667
        {:next_state, :busy,
6,389✔
668
         %{data | db_connection: db_connection, query_start: System.monotonic_time()},
669
         [{:next_event, :internal, {proto, socket, msg}}]}
670

671
      {:error, exception} ->
672
        Error.terminate_with_error(data, exception, :authenticated)
1✔
673
    end
674
  end
675

676
  # Any message when busy: send to db
677
  def handle_event(_kind, {proto, _, msg}, :busy, data) when proto in @proto do
678
    case handle_data(msg, data) do
15,136✔
679
      {:ok, updated_data} ->
15,130✔
680
        {:keep_state, updated_data}
681

682
      {:error, exception} ->
683
        Error.terminate_with_error(data, exception, :authenticated)
6✔
684
    end
685
  end
686

687
  # Any message when connecting - postpone
UNCOV
688
  def handle_event(_kind, {proto, _socket, _msg}, :connecting, _data) when proto in @proto do
×
689
    {:keep_state_and_data, :postpone}
690
  end
691

692
  def handle_event(type, content, state, _data) do
693
    msg = [
1✔
694
      {"type", type},
695
      {"content", content},
696
      {"state", state}
697
    ]
698

699
    Logger.warning("ClientHandler: Undefined msg: #{inspect(msg, pretty: true)}")
1✔
700

701
    :keep_state_and_data
702
  end
703

704
  @impl true
705
  def terminate(reason, state, _data) do
706
    Logger.metadata(state: state)
815✔
707

708
    level =
815✔
709
      case reason do
710
        :normal -> :debug
813✔
711
        _ -> :error
2✔
712
      end
713

714
    Logger.log(level, "ClientHandler: terminating with reason #{inspect(reason)}")
815✔
715
  end
716

717
  defp maybe_cleanup_db_handler(state, data) do
718
    if state == :idle and data.mode == :session and data.db_connection != nil and
612✔
719
         !Supavisor.Helpers.no_warm_pool_user?(data.user) do
236✔
720
      Logger.debug("ClientHandler: Performing session cleanup before termination")
235✔
721
      {pool, db_pid, _} = data.db_connection
235✔
722

723
      # We unsubscribe to free up space for new clients during the cleanup time.
724
      Supavisor.Manager.unsubscribe(data.id)
235✔
725

726
      case DbHandler.attempt_cleanup(db_pid) do
235✔
727
        :ok ->
728
          Process.unlink(db_pid)
225✔
729
          :poolboy.checkin(pool, db_pid)
225✔
730

731
        # In case of error, both processes will be terminated
732
        _error ->
10✔
733
          :ok
734
      end
735
    end
736
  end
737

738
  @impl true
739
  def format_status(status) do
740
    Map.put(status, :queue, [])
2✔
741
  end
742

743
  ## Internal functions
744
  defp handle_auth_success(sock, final_secrets, data) do
745
    Logger.info("ClientHandler: Connection authenticated")
621✔
746
    cache_validated_password(data, final_secrets)
621✔
747

748
    if data.mode != :proxy do
621✔
749
      Supavisor.UpstreamAuthentication.put_upstream_auth_secrets(data.id, final_secrets)
621✔
750
    end
751

752
    :ok = HandlerHelpers.sock_send(sock, Server.authentication_ok())
621✔
753
    Telem.client_join(:ok, data.id)
620✔
754

755
    connection_params = %{data.connection_params | secrets: final_secrets}
620✔
756

757
    conn_type =
620✔
758
      if data.mode == :proxy,
620✔
759
        do: :connect_db,
760
        else: :subscribe
761

762
    {
620✔
763
      :next_state,
764
      :connecting,
765
      %{data | auth_context: nil, connection_params: connection_params},
766
      {:next_event, :internal, conn_type}
767
    }
768
  end
769

770
  defp cache_validated_password(%{tenant: tenant}, %Supavisor.Secrets.PasswordSecrets{} = secrets) do
771
    case Supavisor.ClientAuthentication.get_validation_secrets(tenant, secrets.user) do
165✔
772
      {:ok, %{password_secrets: nil} = validation} ->
UNCOV
773
        updated = %{validation | password_secrets: secrets}
×
UNCOV
774
        Supavisor.ClientAuthentication.put_validation_secrets(tenant, secrets.user, updated)
×
775

776
      _ ->
165✔
777
        :ok
778
    end
779
  end
780

781
  defp cache_validated_password(_data, _secrets), do: :ok
456✔
782

  defp handle_auth_failure(exception, data) do
    AuthMethods.handle_auth_failure(data.auth_context, exception)  # 14✔
    Supavisor.CircuitBreaker.record_failure({data.tenant, data.peer_ip}, :auth_error)  # 14✔
    Error.terminate_with_error(data, exception, :handshake)  # 14✔
  end

  @spec maybe_checkout(:on_connect | :on_query, map) ::
          {:ok, Data.db_connection()} | {:ok, nil} | {:error, Exception.t()}
  defp maybe_checkout(_, %{mode: mode, db_connection: {pool, db_pid, db_sock}})  # 1,810✔
       when is_pid(db_pid) and mode in [:session, :proxy] do
    {:ok, {pool, db_pid, db_sock}}
  end

  defp maybe_checkout(:on_connect, %{mode: :transaction}), do: {:ok, nil}  # 371✔

  defp maybe_checkout(_, data) do
    start = System.monotonic_time(:microsecond)  # 4,822✔

    with {:ok, db_pid} <- pool_checkout(data.pool, data.timeout, data.mode),  # 4,822✔
         {:ok, db_sock} <- DbHandler.checkout(db_pid, data.sock, self(), data.mode) do  # 4,821✔
      same_box = if node(db_pid) == node(), do: :local, else: :remote  # 4,821✔
      Telem.pool_checkout_time(System.monotonic_time(:microsecond) - start, data.id, same_box)  # 4,821✔
      {:ok, {data.pool, db_pid, db_sock}}  # 4,821✔
    end
  end

  # The spec covers every mode handled by the clauses below, including the nil connection case.
  @spec maybe_checkin(:transaction | :session | :proxy, pool_pid :: pid(), Data.db_connection() | nil) ::
          Data.db_connection() | nil
  defp maybe_checkin(:transaction, _pool, nil), do: nil  # UNCOV

  defp maybe_checkin(:transaction, pool, {_, db_pid, _}) do
    Process.unlink(db_pid)  # 4,562✔
    :poolboy.checkin(pool, db_pid)  # 4,562✔
    nil
  end

  defp maybe_checkin(:session, _, db_connection), do: db_connection  # 1,804✔
  defp maybe_checkin(:proxy, _, db_connection), do: db_connection  # UNCOV

  @spec handle_data(binary(), map()) :: {:ok, map()} | {:error, Exception.t()}
  defp handle_data(data_to_send, data) do
    Logger.debug(  # 15,136✔
      "ClientHandler: Forward pkt to db #{Debug.packet_to_string(data_to_send, :frontend)} #{inspect(data.db_connection)}"  # 15,136✔
    )

    with {:ok, new_stream_state, pkts} <-  # 15,136✔
           ProtocolHelpers.process_client_packets(data_to_send, data.mode, data),  # 15,136✔
         :ok <- sock_send(pkts, data) do  # 15,130✔
      {:ok, %{data | stream_state: new_stream_state}}
    else
      {:error, exception} ->  # 6✔
        {:error, exception}
    end
  end

  @spec handle_actions(map) :: [{:timeout, non_neg_integer, atom}]
  defp handle_actions(%{} = data) do
    heartbeat =  # 7,210✔
      if data.heartbeat_interval > 0,  # 7,210✔
        do: [{:timeout, data.heartbeat_interval, :heartbeat_check}],  # 7,210✔
        else: []

    idle = if data.idle_timeout > 0, do: [{:timeout, data.idle_timeout, :idle_timeout}], else: []  # 7,210✔

    idle ++ heartbeat  # 7,210✔
  end

  @spec sock_send([PreparedStatements.handled_pkt()] | binary(), map()) :: :ok | {:error, term()}
  defp sock_send(bin_or_pkts, data) do
    {_pool, db_handler, db_sock} = data.db_connection  # 15,355✔

    case bin_or_pkts do  # 15,355✔
      pkts when is_list(pkts) ->
        # Chunking to ensure we send bigger packets
        pkts
        |> Enum.chunk_by(&is_tuple/1)
        |> Enum.reduce_while(:ok, fn chunk, _acc ->  # 9,226✔
          case chunk do
            [t | _] = prepared_pkts when is_tuple(t) ->
              Supavisor.DbHandler.handle_prepared_statement_pkts(db_handler, prepared_pkts)  # 2,885✔

            bins ->
              HandlerHelpers.sock_send(db_sock, bins)  # 5,852✔
          end
          |> case do  # 8,737✔
            :ok -> {:cont, :ok}  # 8,737✔
            error -> {:halt, error}  # UNCOV
          end
        end)

      bin ->
        HandlerHelpers.sock_send(elem(data.db_connection, 2), bin)  # 6,129✔
    end
  end

  @spec timeout_subscribe_or_terminate(map()) :: :gen_statem.handle_event_result()
  def timeout_subscribe_or_terminate(%{subscribe_retries: subscribe_retries} = data) do
    if subscribe_retries < @subscribe_retries do  # UNCOV
      Logger.warning("ClientHandler: Retry subscribe #{inspect(subscribe_retries)}")  # UNCOV

      {:keep_state, %{data | subscribe_retries: subscribe_retries + 1},  # UNCOV
       {:timeout, @timeout_subscribe, :subscribe}}
    else
      Error.terminate_with_error(data, %SubscribeRetriesExhaustedError{}, :handshake)  # UNCOV
    end
  end

  defp pool_checkout(pool, timeout, mode) do  # 4,822✔
    {:ok, :poolboy.checkout(pool, true, timeout)}
  catch
    :exit, {:timeout, _} ->  # 1✔
      {:error, %CheckoutTimeoutError{mode: mode, timeout_ms: timeout}}

    :exit, reason ->  # UNCOV
      {:error, %PoolCheckoutError{reason: reason}}
  end

  defp set_tenant_info(data, info, user, id, db_name, client_jit) do
    proxy_type =  # 644✔
      if info.tenant.require_user,  # 644✔
        do: :password,
        else: :auth_query

    connection_params = %Supavisor.ConnectionParameters{  # 644✔
      application_name: data.app_name || "Supavisor",  # 644✔
      database: db_name,
      host: to_charlist(info.tenant.db_host),  # 644✔
      sni_hostname:
        if(info.tenant.sni_hostname != nil, do: to_charlist(info.tenant.sni_hostname)),  # 644✔
      port: info.tenant.db_port,  # 644✔
      ip_version: Helpers.ip_version(info.tenant.ip_version, info.tenant.db_host),  # 644✔
      upstream_ssl: Supavisor.id(id, :upstream_tls),
      upstream_tls_ca: info.tenant.upstream_tls_ca,  # 644✔
      upstream_verify: info.tenant.upstream_verify  # 644✔
    }

    %{
      data
      | id: id,  # 644✔
        tenant: info.tenant.external_id,  # 644✔
        tenant_feature_flags: info.tenant.feature_flags,  # 644✔
        tenant_availability_zone: info.tenant.availability_zone,  # 644✔
        user: user,
        db_name: db_name,
        timeout: info.user.pool_checkout_timeout,  # 644✔
        ps: info.tenant.default_parameter_status,  # 644✔
        proxy_type: proxy_type,
        heartbeat_interval: info.tenant.client_heartbeat_interval * 1000,  # 644✔
        connection_params: connection_params,
        max_clients: info.user.max_clients || info.tenant.default_max_clients,  # 644✔
        use_jit_flow: client_jit
    }
  end

  defp upstream_tls(%{use_jit: true}, ssl?), do: ssl?  # UNCOV
  defp upstream_tls(%{upstream_ssl: upstream_ssl}, _ssl?), do: upstream_ssl  # 646✔

  defp handle_socket_close(state, data) do
    maybe_cleanup_db_handler(state, data)  # 332✔

    error = %ClientSocketClosedError{mode: data.mode, client_state: state}  # 332✔
    context = if state in [:idle, :busy], do: :authenticated, else: :handshake  # 332✔
    Error.terminate_with_error(data, error, context)  # 332✔
  end
end