• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 20445502504

22 Dec 2025 10:19PM UTC coverage: 65.404% (+0.1%) from 65.269%
20445502504

Pull #811

github

web-flow
Merge 17277a024 into c4a6ef05f
Pull Request #811: fix: only start ranch listeners after infra is set up

0 of 5 new or added lines in 1 file covered. (0.0%)

2 existing lines in 2 files now uncovered.

1917 of 2931 relevant lines covered (65.4%)

4195.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.57
/lib/supavisor/client_handler.ex
1
defmodule Supavisor.ClientHandler do
2
  @moduledoc """
  Per-connection process for clients connecting to Supavisor.

  Ranch starts one of these processes for every accepted socket (see `start_link/3`),
  after which it runs as a `:gen_statem`. The process negotiates SSL, authenticates the
  user, subscribes to the tenant's pool supervisor and then relays protocol messages
  between the client and the database connection it has been given.
  """

  require Logger

  @behaviour :gen_statem
  @behaviour :ranch_protocol

  # Transport tags accepted in socket messages delivered to this process.
  @proto [:tcp, :ssl]

  # Active-N packet budget for the client socket; re-armed on every `*_passive` message.
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)

  # Subscription retry settings — presumably consumed by the subscribe retry helper
  # (`timeout_subscribe_or_terminate/1`); confirm.
  @subscribe_retries Application.compile_env(:supavisor, :subscribe_retries)
  @timeout_subscribe 500

  # Registries this process joins once it is attached to a pool or a proxied connection.
  @clients_registry Supavisor.Registry.TenantClients
  @proxy_clients_registry Supavisor.Registry.TenantProxyClients

21
  alias Supavisor.{
22
    DbHandler,
23
    HandlerHelpers,
24
    Helpers,
25
    Monitoring.Telem,
26
    Protocol.Debug,
27
    Tenants
28
  }
29

30
  alias Supavisor.ClientHandler.{
31
    Auth,
32
    Cancel,
33
    Data,
34
    Error,
35
    ProtocolHelpers,
36
    Proxy
37
  }
38

39
  alias Supavisor.Protocol.{FrontendMessageHandler, MessageStreamer}
40

41
  require Supavisor.Protocol.Server, as: Server
42
  require Supavisor.Protocol.PreparedStatements, as: PreparedStatements
43

44
  # Ranch protocol entry point: spawn a linked process that continues in `init/3`.
  @impl true
  def start_link(ref, transport, opts) do
    {:ok, :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts])}
  end
49

50
  # One `handle_event/4` callback for every state, with state-enter calls enabled so
  # `handle_event(:enter, ...)` runs on each state change.
  @impl true
  def callback_mode do
    [:handle_event_function, :state_enter]
  end
52

53
  @doc """
  Notifies the client handler `pid` that its database side reported `status`.

  Asynchronous (a `:gen_statem` cast); always returns `:ok`.
  """
  @spec db_status(pid(), :ready_for_query) :: :ok
  def db_status(pid, status) do
    :gen_statem.cast(pid, {:db_status, status})
  end
55

56
  @doc """
  Asks the client handler `pid` to write `error_message` to its client socket and stop.

  Asynchronous (a `:gen_statem` cast); always returns `:ok`.
  """
  @spec send_error_and_terminate(pid(), iodata()) :: :ok
  def send_error_and_terminate(pid, error_message) do
    request = {:send_error_and_terminate, error_message}
    :gen_statem.cast(pid, request)
  end
59

60
  @doc """
  Asks the client handler `pid` to shut down gracefully.

  A client that is busy with a request finishes it before the shutdown is handled.
  """
  @spec graceful_shutdown(pid()) :: :ok
  def graceful_shutdown(pid) do
    :gen_statem.cast(pid, :graceful_shutdown)
  end
62

63
  # Required by the `:gen_statem` behaviour but unused: the process is spawned through
  # `:proc_lib` in `start_link/3` and enters the state machine from `init/3`.
  @impl true
  def init(_args) do
    :ignore
  end
65

66
  # Body of the process spawned by `start_link/3`: completes the Ranch handshake, puts
  # the socket into active-N mode and enters the gen_statem loop in `:handshake`.
  def init(ref, trans, opts) do
    # Exits of linked processes (e.g. a linked DbHandler) arrive as `{:EXIT, ...}` messages.
    Process.flag(:trap_exit, true)
    Helpers.set_max_heap_size(90)

    {:ok, raw_sock} = :ranch.handshake(ref)
    client_ip = Helpers.peer_ip(raw_sock)
    local? = opts[:local] || false

    Logger.metadata(peer_ip: client_ip, local: local?, state: :init)
    :ok = trans.setopts(raw_sock, active: @switch_active_count)
    Logger.debug("ClientHandler is: #{inspect(self())}")

    started_at = System.monotonic_time()
    mode = opts.mode

    initial_data = %Data{
      sock: {:gen_tcp, raw_sock},
      trans: trans,
      peer_ip: client_ip,
      local: local?,
      ssl: false,
      auth: %{},
      mode: mode,
      stream_state: MessageStreamer.new_stream_state(FrontendMessageHandler),
      stats: %{},
      idle_timeout: 0,
      heartbeat_interval: 0,
      connection_start: started_at,
      state_entered_at: started_at,
      subscribe_retries: 0
    }

    # Hibernate after 5 s without events to keep idle connections cheap.
    :gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :handshake, initial_data)
  end
99

100
  @impl true
  # An HTTP GET arriving on this port during the handshake: reply with an empty HTTP
  # response that carries the application version, then close the connection.
  def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :handshake, data) do
    Logger.debug("ClientHandler: Client is trying to request HTTP")

    app_version = Application.spec(:supavisor, :vsn)
    http_response = "HTTP/1.1 204 OK\r\nx-app-version: #{app_version}\r\n\r\n"
    HandlerHelpers.sock_send(data.sock, http_response)

    {:stop, :normal}
  end
111

112
  # CancelRequest packet, accepted in any state: have `Cancel` cancel the query of the
  # backend identified by `pid`/`key`, then close this connection.
  def handle_event(:info, {_, _, Server.cancel_message(pid, key)}, _any_state, _data) do
    Logger.debug("ClientHandler: Got cancel query for #{inspect({pid, key})}")
    :ok = Cancel.send_cancel_query(pid, key)
    {:stop, :normal}
  end
118

119
  # Internal `:cancel_query` signal: `Cancel.maybe_forward_cancel_to_db/2` decides, from
  # the current state and data, whether a cancel is sent to the database.
  def handle_event(:info, :cancel_query, state, data) do
    :ok = Cancel.maybe_forward_cancel_to_db(state, data)
    :keep_state_and_data
  end
124

125
  # SSLRequest during the handshake. When a downstream certificate and key are
  # configured, answer "S" and upgrade the socket to TLS; otherwise answer "N" and carry
  # on over plain TCP.
  def handle_event(
        :info,
        {:tcp, _, Server.ssl_request_message()},
        :handshake,
        %{sock: sock} = data
      ) do
    Logger.debug("ClientHandler: Client is trying to connect with SSL")

    cert = Helpers.downstream_cert()
    key = Helpers.downstream_key()

    if cert && key do
      # Go passive so the TLS handshake below can read the raw socket directly.
      :ok = HandlerHelpers.setopts(sock, active: false)
      :ok = HandlerHelpers.sock_send(sock, "S")

      {_transport, tcp_sock} = sock
      tls_opts = [verify: :verify_none, certfile: cert, keyfile: key]

      case :ssl.handshake(tcp_sock, tls_opts) do
        {:ok, ssl_sock} ->
          tls_sock = {:ssl, ssl_sock}
          :ok = HandlerHelpers.setopts(tls_sock, active: @switch_active_count)
          {:keep_state, %{data | sock: tls_sock, ssl: true}}

        error ->
          Logger.error("ClientHandler: SSL handshake error: #{inspect(error)}")
          Telem.client_join(:fail, data.id)
          {:stop, :normal}
      end
    else
      Logger.error(
        "ClientHandler: User requested SSL connection but no downstream cert/key found"
      )

      :ok = HandlerHelpers.sock_send(data.sock, "N")
      :keep_state_and_data
    end
  end
167

168
  # Startup packets over 1024 bytes are rejected without being parsed.
  def handle_event(:info, {_, _, packet}, :handshake, _data)
      when byte_size(packet) > 1024 do
    Logger.error("ClientHandler: Startup packet too large #{byte_size(packet)}")
    {:stop, :normal}
  end
172

173
  # Any other packet during the handshake is parsed as the StartupMessage. On success the
  # parsed user info is re-emitted as an internal `{:hello, ...}` event.
  def handle_event(:info, {_, _, packet}, :handshake, data) do
    case ProtocolHelpers.parse_startup_packet(packet) do
      {:ok, {_type, {_user, _tenant, _db, _search_path}} = startup, app_name, _log_level} ->
        {:keep_state, %{data | app_name: app_name},
         {:next_event, :internal, {:hello, startup}}}

      {:error, {:invalid_user_info, {:invalid_format, {user, _}}} = reason} ->
        # Re-parse the raw user string only to recover a tenant for telemetry.
        {_, {_, tenant_or_alias, _}} = HandlerHelpers.parse_user_info(%{"user" => user})
        Telem.client_join(:fail, tenant_or_alias)
        Error.maybe_log_and_send_error(data.sock, {:error, reason})
        {:stop, :normal}

      {:error, error} ->
        Logger.error("ClientHandler: Client startup message error: #{inspect(error)}")
        Telem.client_join(:fail, data.id)
        {:stop, :normal}
    end
  end
193

194
  # StartupMessage accepted: resolve the tenant and user, apply the admission checks (SSL
  # enforcement, CIDR allow list, client limit), fetch the upstream auth secrets and then
  # continue with the internal `{:handle, ...}` authentication step.
  def handle_event(
        :internal,
        {:hello, {type, {user, tenant_or_alias, db_name, search_path}}},
        :handshake,
        %{sock: sock} = data
      ) do
    sni_hostname = HandlerHelpers.try_get_sni(sock)

    case Tenants.get_user_cache(type, user, tenant_or_alias, sni_hostname) do
      {:error, reason} ->
        Error.maybe_log_and_send_error(
          sock,
          {:error, :tenant_not_found, reason, type, user, tenant_or_alias}
        )

        Telem.client_join(:fail, data.id)
        {:stop, :normal}

      {:ok, info} ->
        database = db_name || info.tenant.db_database

        id =
          Supavisor.id(
            {type, tenant_or_alias},
            user,
            data.mode,
            info.user.mode_type,
            database,
            search_path
          )

        Logger.metadata(
          project: tenant_or_alias,
          user: user,
          mode: data.mode,
          type: type,
          db_name: database,
          app_name: data.app_name
        )

        {:ok, addr} = HandlerHelpers.addr_from_sock(sock)

        # First failing admission check, as the error to send; nil when all pass.
        rejection =
          cond do
            !data.local and info.tenant.enforce_ssl and !data.ssl ->
              {:error, :ssl_required, user}

            HandlerHelpers.filter_cidrs(info.tenant.allow_list, addr) == [] ->
              {:error, :address_not_allowed, addr}

            check_max_clients_reached(id, info, data.mode) ->
              if data.mode == :session,
                do: {:error, :max_clients_reached_session},
                else: {:error, :max_clients_reached}

            true ->
              nil
          end

        if rejection do
          Error.maybe_log_and_send_error(sock, rejection)
          Telem.client_join(:fail, id)
          {:stop, :normal}
        else
          new_data = set_tenant_info(data, info, user, id, database)

          case Supavisor.CircuitBreaker.check(tenant_or_alias, :get_secrets) do
            {:error, :circuit_open, blocked_until} ->
              Error.maybe_log_and_send_error(
                sock,
                {:error, :circuit_breaker_open, :get_secrets, blocked_until},
                :handshake
              )

              Telem.client_join(:fail, id)
              {:stop, :normal}

            :ok ->
              # NOTE(review): `data.id` (the pre-handshake value) is passed here although
              # `id` was just computed and only handed to `set_tenant_info/5` — confirm
              # that `Auth.get_user_secrets/4` really expects the old value.
              case Auth.get_user_secrets(data.id, info, user, tenant_or_alias) do
                {:ok, auth_secrets} ->
                  Logger.debug("ClientHandler: Authentication method: #{inspect(auth_secrets)}")

                  {:keep_state, new_data,
                   {:next_event, :internal, {:handle, auth_secrets, info}}}

                {:error, reason} ->
                  Supavisor.CircuitBreaker.record_failure(tenant_or_alias, :get_secrets)

                  Error.maybe_log_and_send_error(
                    sock,
                    {:error, :auth_error, reason},
                    :handshake
                  )

                  Telem.client_join(:fail, id)
                  {:stop, :normal}
              end
          end
        end
    end
  end
297

298
  # Start the authentication exchange for `method`: MD5 for `:auth_query_md5`, SCRAM for
  # every other method. Refused while the auth-error circuit breaker for this
  # tenant/peer pair is open. Both exchanges get a 15 s `:auth_timeout`.
  def handle_event(
        :internal,
        {:handle, {method, secrets}, info},
        _state,
        %{sock: sock} = data
      ) do
    Logger.debug("ClientHandler: Handle exchange, auth method: #{inspect(method)}")

    case Supavisor.CircuitBreaker.check({data.tenant, data.peer_ip}, :auth_error) do
      {:error, :circuit_open, blocked_until} ->
        Error.maybe_log_and_send_error(
          sock,
          {:error, :circuit_breaker_open, :auth_error, blocked_until},
          :handshake
        )

        Telem.client_join(:fail, data.id)
        {:stop, :normal}

      :ok when method == :auth_query_md5 ->
        md5_context = Auth.create_auth_context(method, secrets, info)
        :ok = HandlerHelpers.sock_send(sock, Server.md5_request(md5_context.salt))

        {:next_state, :auth_md5_wait, %{data | auth_context: md5_context},
         {:timeout, 15_000, :auth_timeout}}

      :ok ->
        :ok = HandlerHelpers.sock_send(sock, Server.scram_request())
        scram_context = Auth.create_auth_context(method, secrets, info)

        {:next_state, :auth_scram_first_wait, %{data | auth_context: scram_context},
         {:timeout, 15_000, :auth_timeout}}
    end
  end
335

336
  # Attach the client to its tenant pool. Obtain the pool supervisor via
  # `Supavisor.start_dist/3`, subscribe to it, then greet the client. When the pool lives
  # on another node and the client is in a pooled mode, switch to a proxied direct
  # connection instead.
  def handle_event(:internal, :subscribe, _state, data) do
    Logger.debug("ClientHandler: Subscribe to tenant #{inspect(data.id)}")

    with :ok <- Supavisor.CircuitBreaker.check(data.tenant, :db_connection),
         {:ok, sup} <-
           Supavisor.start_dist(data.id, data.auth_secrets,
             availability_zone: data.tenant_availability_zone,
             log_level: nil
           ),
         remote_pooled? = node(sup) != node() and data.mode in [:transaction, :session],
         :local <- if(remote_pooled?, do: :proxy, else: :local),
         {:ok, opts} <- Supavisor.subscribe(sup, data.id) do
      workers = opts.workers
      manager_ref = Process.monitor(workers.manager)
      pooled = Map.merge(data, workers)

      subscribed = %{
        pooled
        | manager: manager_ref,
          db_connection: maybe_checkout(:on_connect, pooled),
          idle_timeout: opts.idle_timeout
      }

      Registry.register(@clients_registry, subscribed.id, started_at: System.monotonic_time())

      cond do
        # Re-subscription of an already greeted client (e.g. after the manager went down).
        subscribed.client_ready ->
          {:next_state, :idle, subscribed, handle_actions(subscribed)}

        # Parameter status not known yet: wait up to 10 s for it.
        opts.ps == [] ->
          {:keep_state, subscribed, {:timeout, 10_000, :wait_ps}}

        true ->
          {:keep_state, subscribed, {:next_event, :internal, {:greetings, opts.ps}}}
      end
    else
      {:error, :circuit_open, blocked_until} ->
        Error.maybe_log_and_send_error(
          data.sock,
          {:error, :circuit_breaker_open, :db_connection, blocked_until}
        )

        Telem.client_join(:fail, data.id)
        {:stop, :normal}

      {:error, limit} = limit_error when limit in [:max_clients_reached, :max_pools_reached] ->
        Error.maybe_log_and_send_error(data.sock, limit_error)
        Telem.client_join(:fail, data.id)
        {:stop, :normal}

      {:error, :terminating, error} ->
        HandlerHelpers.sock_send(data.sock, Server.encode_error_message(error))
        Telem.client_join(:fail, data.id)
        {:stop, :normal}

      :proxy ->
        case Proxy.prepare_proxy_connection(data) do
          {:ok, proxied_data} ->
            Logger.metadata(proxy: true)
            Registry.register(@proxy_clients_registry, data.id, [])
            {:keep_state, proxied_data, {:next_event, :internal, :connect_db}}

          {:error, other} ->
            Logger.error("ClientHandler: Subscribe proxy error: #{inspect(other)}")
            timeout_subscribe_or_terminate(data)
        end

      error ->
        Logger.error("ClientHandler: Subscribe error: #{inspect(error)}")
        timeout_subscribe_or_terminate(data)
    end
  end
418

419
  # Proxy mode: start a DbHandler dedicated to this client and check its socket out.
  def handle_event(:internal, :connect_db, _state, data) do
    Logger.debug("ClientHandler: Trying to connect to DB")

    {:ok, db_pid} =
      data
      |> Proxy.build_db_handler_args()
      |> DbHandler.start_link()

    case DbHandler.checkout(db_pid, data.sock, self()) do
      {:ok, db_sock} ->
        {:keep_state, %{data | db_connection: {nil, db_pid, db_sock}, mode: :proxy}}

      {:error, {:exit, {:timeout, _}}} ->
        timeout_error(data)

      {:error, %{"S" => "FATAL"} = error_map} ->
        Logger.debug(
          "ClientHandler: Received error from DbHandler checkout (proxy): #{inspect(error_map)}"
        )

        HandlerHelpers.sock_send(data.sock, Server.encode_error_message(error_map))
        {:stop, :normal}

      # The failure has already been forwarded to the client socket; only log it here.
      {:error, {:exit, {reason, _}}} ->
        Logger.error(
          "ClientHandler: error checking out DbHandler (proxy), exit with reason: #{inspect(reason)}"
        )

        {:stop, :normal}
    end
  end
452

453
  # Greet the client in one write (ParameterStatus, BackendKeyData, ReadyForQuery),
  # register the cancel key, record the connection time and move to `:idle`.
  def handle_event(:internal, {:greetings, ps}, _state, %{sock: sock} = data) do
    {key_header, key_payload} = Server.backend_key_data()
    <<cancel_pid::32, cancel_key::32>> = key_payload

    greeting = [ps, [key_header, key_payload], Server.ready_for_query()]

    :ok = Cancel.listen_cancel_query(cancel_pid, cancel_key)
    :ok = HandlerHelpers.sock_send(sock, greeting)
    Telem.client_connection_time(data.connection_start, data.id)

    {:next_state, :idle, %{data | client_ready: true}, handle_actions(data)}
  end
461

462
  # A `:subscribe` timeout fired (presumably armed by the subscribe retry helper): run the
  # subscribe step again.
  def handle_event(:timeout, :subscribe, _state, _data),
    do: {:keep_state_and_data, {:next_event, :internal, :subscribe}}
465

466
  # No ParameterStatus arrived from the pool within the `:wait_ps` timeout: greet the
  # client with the default parameters held in `data.ps` instead.
  def handle_event(:timeout, :wait_ps, _state, data) do
    # Fix: the message previously ended with a stray "}" after the interpolation.
    Logger.warning(
      "ClientHandler: Wait parameter status timeout, send default #{inspect(data.ps)}"
    )

    ps = Server.encode_parameter_status(data.ps)
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
  end
474

475
  # The idle timeout fired (presumably armed via `handle_actions/1`): close the connection.
  def handle_event(:timeout, :idle_terminate, _state, %{idle_timeout: idle_timeout}) do
    Logger.warning("ClientHandler: Terminate an idle connection by #{idle_timeout} timeout")
    {:stop, :normal}
  end
479

480
  # Heartbeat: send `Server.application_name()` to the client and re-arm the check for
  # another `heartbeat_interval`.
  def handle_event(:timeout, :heartbeat_check, _state, data) do
    Logger.debug("ClientHandler: Send heartbeat to client")
    HandlerHelpers.sock_send(data.sock, Server.application_name())

    next_check = {:timeout, data.heartbeat_interval, :heartbeat_check}
    {:keep_state_and_data, next_check}
  end
485

486
  # The pool's ParameterStatus arrived while still connecting: greet the client with it.
  def handle_event(:info, {:parameter_status, ps}, :connecting, _data),
    do: {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
489

490
  # TLS alerts on the client socket. Only the alert text distinguishes WARNING from FATAL
  # ("Warning - " vs "Fatal - "). A warning keeps the connection alive. A fatal alert is
  # not followed by `:ssl_closed`, so the connection must be stopped here.
  def handle_event(
        :info,
        {:ssl_error, sock, {:tls_alert, {_reason, msg}}},
        _,
        %{sock: {_, sock}}
      ) do
    alert_text = to_string(msg)

    case String.contains?(alert_text, "Fatal -") do
      true ->
        Logger.warning(
          "ClientHandler: Received fatal TLS alert: #{alert_text}, terminating connection"
        )

        {:stop, :normal}

      false ->
        Logger.warning(
          "ClientHandler: Received TLS warning alert: #{alert_text}, keeping connection alive"
        )

        :keep_state_and_data
    end
  end
510

511
  # Any other TLS error on this client's socket is logged, and the connection is kept.
  def handle_event(:info, {:ssl_error, sock, reason}, _state, %{sock: {_, sock}}) do
    Logger.error("ClientHandler: TLS error #{inspect(reason)}")
    :keep_state_and_data
  end
515

516
  # The client closed its socket. Logged at :info when idle or in proxy mode, :warning
  # during the handshake, and :error in any other state.
  def handle_event(_, {closed, _}, state, data) when closed in [:tcp_closed, :ssl_closed] do
    level =
      cond do
        state == :idle -> :info
        data.mode == :proxy -> :info
        state == :handshake -> :warning
        true -> :error
      end

    Logger.log(level, "ClientHandler: socket closed while state was #{state} (#{data.mode})")
    {:stop, :normal}
  end
538

539
  # Trapped exit from a linked process. It is expected to be the DbHandler linked in
  # `maybe_checkout/2` or started by `:connect_db`. Report it to the client and stop.
  def handle_event(:info, {:EXIT, db_pid, reason}, _state, data) do
    exit_error = {:error, :db_handler_exited, db_pid, reason}
    Error.maybe_log_and_send_error(data.sock, exit_error)
    {:stop, :normal}
  end
544

545
  # The pool manager monitored since subscription went down. On `:shutdown`, stop. While
  # idle or connecting, subscribe again. While busy, postpone until the state changes.
  def handle_event(:info, {:DOWN, ref, _, _, reason}, state, %{manager: ref} = data) do
    Logger.error(
      "ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}"
    )

    case {state, reason} do
      {_, :shutdown} ->
        {:stop, {:shutdown, :manager_shutdown}}

      {:busy, _} ->
        {:keep_state_and_data, :postpone}

      {:connecting, _} ->
        {:keep_state_and_data, {:next_event, :internal, :subscribe}}

      {:idle, _} ->
        {:next_state, :connecting, data, {:next_event, :internal, :subscribe}}
    end
  end
558

559
  # `{:db_status, ...}` casts (see `db_status/2`).
  #
  # ReadyForQuery while busy: the request is finished. Hand the connection to
  # `maybe_checkin/3`, record network usage (skipped for local clients) and query time,
  # then return to `:idle`.
  def handle_event(:cast, {:db_status, :ready_for_query}, :busy, data) do
    Logger.debug("ClientHandler: Client is ready")

    db_connection = maybe_checkin(data.mode, data.pool, data.db_connection)

    stats =
      if data.local do
        data.stats
      else
        {_, usage_stats} = Telem.network_usage(:client, data.sock, data.id, data.stats)
        usage_stats
      end

    Telem.client_query_time(data.query_start, data.id, data.mode == :proxy)

    {:next_state, :idle, %{data | db_connection: db_connection, stats: stats},
     handle_actions(data)}
  end

  # A ReadyForQuery while already idle needs no action.
  def handle_event(:cast, {:db_status, :ready_for_query}, :idle, _data),
    do: :keep_state_and_data
579

580
  # Requested via `send_error_and_terminate/2`: write the error to the client, then stop.
  def handle_event(:cast, {:send_error_and_terminate, error_message}, _state, %{sock: sock}) do
    HandlerHelpers.sock_send(sock, error_message)
    {:stop, :normal}
  end
584

585
  # Graceful shutdown (see `graceful_shutdown/1`). While busy it is postponed, so the
  # in-flight request completes first.
  def handle_event(:cast, :graceful_shutdown, :busy, _data),
    do: {:keep_state_and_data, :postpone}

  # In any other state, send the client the admin-shutdown error and stop.
  # NOTE(review): some clients reportedly show only `tcp_closed` unless a ReadyForQuery
  # follows the fatal error, yet only the encoded `admin_shutdown` error is sent here.
  # Confirm this is intended.
  def handle_event(:cast, :graceful_shutdown, _state, data) do
    shutdown_error = Server.encode_error_message(Server.admin_shutdown())
    HandlerHelpers.sock_send(data.sock, shutdown_error)
    {:stop, :normal}
  end
595

596
  # Any remaining transport error on the client socket ends the connection.
  def handle_event(:info, {sock_error, _sock, reason}, state, _data)
      when sock_error in [:tcp_error, :ssl_error] do
    Logger.error("ClientHandler: Socket error: #{inspect(reason)}, state was #{state}")
    {:stop, :normal}
  end
602

603
  # The active-N budget is used up: re-arm it so socket data keeps arriving as messages.
  def handle_event(:info, {passive, _socket}, _state, %{sock: sock})
      when passive in [:tcp_passive, :ssl_passive] do
    HandlerHelpers.setopts(sock, active: @switch_active_count)
    :keep_state_and_data
  end
607

608
  # -- Authentication states ----------------------------------------------------------

  # MD5: validate the client's password message with `Auth.validate_credentials/4`, using
  # the stored secret and the salt sent in the MD5 request.
  def handle_event(:info, {proto, _socket, bin}, :auth_md5_wait, data) when proto in @proto do
    ctx = data.auth_context

    with {:ok, client_md5} <- Auth.parse_auth_message(bin, ctx.method),
         {:ok, key} <-
           Auth.validate_credentials(ctx.method, ctx.secrets.().secret, ctx.salt, client_md5) do
      handle_auth_success(data.sock, {ctx.method, ctx.secrets}, key, data)
    else
      {:error, reason} -> handle_auth_failure(data.sock, reason, data, :auth_md5_wait)
    end
  end
628

629
  # SCRAM, first round: parse the client's first message, send the server challenge, keep
  # the signatures for the final round and wait for the client's final message.
  def handle_event(:info, {proto, _socket, bin}, :auth_scram_first_wait, data)
      when proto in @proto do
    ctx = data.auth_context

    case Auth.parse_auth_message(bin, ctx.method) do
      {:error, reason} ->
        handle_auth_failure(data.sock, reason, data, :auth_scram_first_wait)

      {:ok, {user, nonce, channel}} ->
        {challenge, signatures} =
          Auth.prepare_auth_challenge(ctx.method, ctx.secrets, nonce, user, channel)

        :ok = HandlerHelpers.sock_send(data.sock, Server.exchange_message(:first, challenge))

        next_ctx = Auth.update_auth_context_with_signatures(ctx, signatures)

        {:next_state, :auth_scram_final_wait, %{data | auth_context: next_ctx},
         {:timeout, 15_000, :auth_timeout}}
    end
  end
655

656
  # SCRAM, final round: validate the client's final message against the stored
  # signatures. On success, send the server's final response and finish authentication.
  def handle_event(:info, {proto, _socket, bin}, :auth_scram_final_wait, data)
      when proto in @proto do
    ctx = data.auth_context

    with {:ok, proof} <- Auth.parse_auth_message(bin, ctx.method),
         {:ok, key} <-
           Auth.validate_credentials(ctx.method, ctx.secrets, ctx.signatures, proof) do
      :ok = HandlerHelpers.sock_send(data.sock, Auth.build_scram_final_response(ctx))
      handle_auth_success(data.sock, {ctx.method, ctx.secrets}, key, data)
    else
      {:error, reason} -> handle_auth_failure(data.sock, reason, data, :auth_scram_final_wait)
    end
  end
677

678
  # The 15 s `:auth_timeout` expired in any authentication state: fail the authentication.
  def handle_event(:timeout, :auth_timeout, auth_state, data)
      when auth_state in [:auth_md5_wait, :auth_scram_first_wait, :auth_scram_final_wait],
      do: handle_auth_failure(data.sock, :timeout, data, auth_state)
683

684
  # State-enter hook. Keeps the `state` logger metadata current and emits a
  # `[:supavisor, :client_handler, :state]` telemetry event with the time spent in the
  # previous state. The initial enter and the frequent idle <-> busy flips are not measured.
  def handle_event(:enter, old_state, new_state, data) do
    Logger.metadata(state: new_state)

    unmeasured = [{:handshake, :handshake}, {:idle, :busy}, {:busy, :idle}]

    if {old_state, new_state} in unmeasured do
      {:next_state, new_state, data}
    else
      entered_at = System.monotonic_time()

      :telemetry.execute(
        [:supavisor, :client_handler, :state],
        %{duration: entered_at - data.state_entered_at},
        %{from_state: old_state, to_state: new_state, tenant: data.tenant}
      )

      {:next_state, new_state, %{data | state_entered_at: entered_at}}
    end
  end
716

717
  # Terminate ('X') message from an idle client: close the connection.
  def handle_event(_event_type, {proto, _, <<"X", 4::32>>}, :idle, _data)
      when proto in @proto do
    Logger.info("ClientHandler: Terminate received from client")
    {:stop, :normal}
  end
722

723
  # Sync ('S') while idle and not holding a DB connection: reply with ReadyForQuery
  # directly instead of checking a connection out.
  def handle_event(
        _event_type,
        {proto, _, <<"S", 4::32, _::binary>>},
        :idle,
        %{db_connection: nil} = data
      )
      when proto in @proto do
    Logger.debug("ClientHandler: Receive sync")
    :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query())
    {:keep_state, data, handle_actions(data)}
  end
736

737
  # Sync while busy: pass it on to the database side via `sock_send/2`.
  def handle_event(_event_type, {proto, _, <<"S", 4::32, _::binary>> = sync}, :busy, data)
      when proto in @proto do
    Logger.debug("ClientHandler: Receive sync")
    :ok = sock_send(sync, data)
    {:keep_state, data, handle_actions(data)}
  end
745

746
  # Any other client message while idle: obtain a DB connection, start the query clock,
  # switch to `:busy` and replay the same message there.
  def handle_event(_event_type, {proto, _socket, _msg} = event, :idle, data)
      when proto in @proto do
    db_connection = maybe_checkout(:on_query, data)
    busy_data = %{data | db_connection: db_connection, query_start: System.monotonic_time()}
    {:next_state, :busy, busy_data, [{:next_event, :internal, event}]}
  end
754

755
  # Any client message while busy goes to `handle_data/2`. That function reports its own
  # failures, so an error here only has to stop the process.
  def handle_event(_event_type, {proto, _, msg}, :busy, data) when proto in @proto do
    case handle_data(msg, data) do
      {:error, _reason} -> {:stop, :normal}
      {:ok, updated_data} -> {:keep_state, updated_data}
    end
  end
767

768
  # Client data arriving while still connecting is postponed until the state changes.
  def handle_event(_event_type, {proto, _socket, _msg}, :connecting, _data)
      when proto in @proto,
      do: {:keep_state_and_data, :postpone}
772

773
  # Fallback: log any event not matched by the clauses above and ignore it.
  def handle_event(type, content, state, _data) do
    details = [{"type", type}, {"content", content}, {"state", state}]
    Logger.error("ClientHandler: Undefined msg: #{inspect(details, pretty: true)}")
    :keep_state_and_data
  end
784

785
  # Log why the process stops: at `:debug` for a `:normal` stop, at `:error` otherwise.
  @impl true
  def terminate(reason, state, _data) do
    Logger.metadata(state: state)
    level = if reason == :normal, do: :debug, else: :error
    Logger.log(level, "ClientHandler: terminating with reason #{inspect(reason)}")
  end
797

798
  # Replace the event queue with an empty list in formatted status output (e.g.
  # `:sys.get_status/1`); any other status keys pass through unchanged.
  @impl true
  def format_status(status) do
    Map.merge(status, %{queue: []})
  end
802

803
  ## Internal functions
804

805
  # Completes a successful authentication:
  #   * derives the final upstream secrets from `secrets` and the client key,
  #   * caches them for the pooled modes,
  #   * sends AuthenticationOk and records the join,
  #   * moves to `:connecting`, queuing either the pool subscription or, in proxy mode,
  #     the direct DB connection.
  defp handle_auth_success(sock, {method, secrets}, client_key, data) do
    final_secrets = Auth.prepare_final_secrets(secrets, client_key)
    proxy? = data.mode == :proxy

    # Only the pooled modes (transaction/session) use the shared secret cache. Proxy mode
    # hands the secrets to its DbHandler through `data.auth` instead.
    if not proxy? do
      Supavisor.SecretCache.put_upstream_auth_secrets(data.id, method, final_secrets)
    end

    Logger.info("ClientHandler: Connection authenticated")
    :ok = HandlerHelpers.sock_send(sock, Server.authentication_ok())
    Telem.client_join(:ok, data.id)

    upstream_secrets = {method, final_secrets}
    next_step = if proxy?, do: :connect_db, else: :subscribe

    authenticated = %{
      data
      | auth_context: nil,
        auth_secrets: upstream_secrets,
        auth: Map.put(data.auth, :secrets, upstream_secrets)
    }

    {:next_state, :connecting, authenticated, {:next_event, :internal, next_step}}
  end
829

830
  # Rejects the client after a failed or timed-out authentication. It refreshes cached
  # secrets when they changed (without retrying), counts the failure towards the
  # tenant/peer auth circuit breaker, reports the error to the client and stops.
  defp handle_auth_failure(sock, reason, data, context) do
    ctx = data.auth_context

    # Most clients cope badly with an automatic retry after an auth error, so the cache is
    # only updated here and the client still receives the error.
    Auth.check_and_update_secrets(
      ctx.method,
      reason,
      data.id,
      ctx.info,
      data.tenant,
      data.user,
      ctx.secrets
    )

    Supavisor.CircuitBreaker.record_failure({data.tenant, data.peer_ip}, :auth_error)
    Error.maybe_log_and_send_error(sock, {:error, :auth_error, reason, data.user}, context)
    Telem.client_join(:fail, data.id)
    {:stop, :normal}
  end
850

851
  # Resolves the upstream connection the client should use for the current operation.
  #
  #   * `:session`/`:proxy` mode with a live DbHandler pid in `data.db_connection`: that
  #     connection is returned as-is.
  #   * `:transaction` mode at `:on_connect`: returns `nil`.
  #   * Otherwise:
  #       1. Checks a DbHandler out of `data.pool` (waiting at most `data.timeout`) and
  #          links to it.
  #       2. Obtains its DB socket via `DbHandler.checkout/3`.
  #       3. Reports the checkout latency (tagged local/remote by node).
  #       4. Returns `{pool, db_pid, db_sock}`.
  #
  # On checkout failure the result is NOT a connection:
  #   * A pool timeout returns `{:stop, :normal}` (via `timeout_error/1`).
  #   * A FATAL error map from DbHandler is forwarded to the client, then `{:stop, :normal}`
  #     is returned.
  #   * Any other pool exit is re-raised with `exit/1`.
  #
  # Callers must therefore distinguish `{:stop, :normal}` from a connection tuple.
  #
  # NOTE(review): any other result, e.g. a non-FATAL `{:error, _}` from
  # `DbHandler.checkout/3`, matches no `else` clause and raises WithClauseError — confirm
  # that is intended.
  @spec maybe_checkout(:on_connect | :on_query, map) ::
          Data.db_connection() | {:stop, :normal}
  defp maybe_checkout(_, %{mode: mode, db_connection: {pool, db_pid, db_sock}})
       when is_pid(db_pid) and mode in [:session, :proxy] do
    {pool, db_pid, db_sock}
  end

  defp maybe_checkout(:on_connect, %{mode: :transaction}), do: nil

  defp maybe_checkout(_, data) do
    start = System.monotonic_time(:microsecond)

    with {:ok, db_pid} <- pool_checkout(data.pool, data.timeout),
         true <- Process.link(db_pid),
         {:ok, db_sock} <- DbHandler.checkout(db_pid, data.sock, self()) do
      same_box = if node(db_pid) == node(), do: :local, else: :remote
      Telem.pool_checkout_time(System.monotonic_time(:microsecond) - start, data.id, same_box)
      {data.pool, db_pid, db_sock}
    else
      {:error, {:exit, {:timeout, _}}} ->
        timeout_error(data)

      {:error, %{"S" => "FATAL"} = error_map} ->
        Logger.debug(
          "ClientHandler: Received error from DbHandler checkout: #{inspect(error_map)}"
        )

        # Relay the upstream FATAL error to the client verbatim before stopping.
        error_message = Server.encode_error_message(error_map)
        HandlerHelpers.sock_send(data.sock, error_message)
        {:stop, :normal}

      {:error, {:exit, e}} ->
        exit(e)
    end
  end
885

886
  # Reports a pool checkout timeout to the client and stops the handler with `:normal`.
  #
  # The error reason depends on the pool mode:
  #   * `:transaction` -> `:transaction_timeout`
  #   * `:session` -> `:session_timeout`
  #
  # Any other mode raises CaseClauseError.
  defp timeout_error(data) do
    reason =
      case data.mode do
        :transaction -> :transaction_timeout
        :session -> :session_timeout
      end

    Error.maybe_log_and_send_error(data.sock, {:error, reason})
    {:stop, :normal}
  end
896

897
  # Releases (or keeps) the upstream connection after an operation, depending on pool mode.
  #
  #   * `:transaction`: unlinks from the DbHandler and returns it to the poolboy `pool`.
  #     The client is left without a connection, so `nil` is returned. `nil` is also
  #     returned when no connection was held.
  #   * `:session`/`:proxy`: the connection is not checked in and is returned unchanged.
  #
  # The first argument is the pool mode. The spec previously admitted only `:proxy`, which
  # contradicted the `:transaction` and `:session` clauses below.
  @spec maybe_checkin(
          :transaction | :session | :proxy,
          pool_pid :: pid(),
          Data.db_connection()
        ) :: Data.db_connection()
  defp maybe_checkin(:transaction, _pool, nil), do: nil

  defp maybe_checkin(:transaction, pool, {_, db_pid, _}) do
    # Unlink first so the client is no longer coupled to the worker once it is back in the
    # pool.
    Process.unlink(db_pid)
    :poolboy.checkin(pool, db_pid)
    nil
  end

  defp maybe_checkin(:session, _, db_connection), do: db_connection
  defp maybe_checkin(:proxy, _, db_connection), do: db_connection
908

909
  # Forwards a chunk of client bytes to the upstream database.
  #
  # The bytes go through `ProtocolHelpers.process_client_packets/3`, which yields the new
  # stream state and the packets to send; those packets are written with `sock_send/2`.
  #
  #   * On success: returns `{:ok, data}` with the new stream state stored.
  #   * On any failure: the failing result is reported to the client, and
  #     `{:error, reason}` is returned, where `reason` is that result's second element.
  @spec handle_data(binary(), map()) :: {:ok, map()} | {:error, atom()}
  defp handle_data(data_to_send, data) do
    Logger.debug(
      "ClientHandler: Forward pkt to db #{Debug.packet_to_string(data_to_send, :frontend)} #{inspect(data.db_connection)}"
    )

    case ProtocolHelpers.process_client_packets(data_to_send, data.mode, data) do
      {:ok, new_stream_state, pkts} ->
        case sock_send(pkts, data) do
          :ok -> {:ok, %{data | stream_state: new_stream_state}}
          send_failure -> report_forward_failure(data, send_failure)
        end

      process_failure ->
        report_forward_failure(data, process_failure)
    end
  end

  # Reports a failed forward to the client and reduces it to `{:error, reason}`.
  defp report_forward_failure(data, failure) do
    Error.maybe_log_and_send_error(data.sock, failure, "sending query")
    {:error, elem(failure, 1)}
  end
925

926
  # Builds the gen_statem timeout actions for the client's idle timeout and heartbeat
  # interval. Each interval contributes an action only when it is greater than 0. The idle
  # action (if any) comes first, then the heartbeat action.
  #
  # NOTE(review): both entries are anonymous event timeouts (`{:timeout, time, content}`).
  # gen_statem keeps only one event timeout, and the last one in an action list wins. So
  # when both intervals are enabled, `:heartbeat_check` appears to supersede
  # `:idle_timeout` — confirm this is intended.
  @spec handle_actions(map) :: [{:timeout, non_neg_integer, atom}]
  defp handle_actions(%{} = data) do
    heartbeat_ms = data.heartbeat_interval
    idle_ms = data.idle_timeout

    for {ms, event} <- [{idle_ms, :idle_timeout}, {heartbeat_ms, :heartbeat_check}],
        ms > 0,
        do: {:timeout, ms, event}
  end
937

938
  # Writes client packets to the upstream connection held in `data.db_connection`.
  #
  # A binary is written straight to the DB socket. A list is split into runs of consecutive
  # elements of the same kind:
  #   * runs of tuples (prepared-statement packets) are handed to
  #     `DbHandler.handle_prepared_statement_pkts/2`;
  #   * runs of binaries are written to the DB socket in one call, so fewer, larger writes
  #     are made.
  #
  # Runs are sent in order. Sending stops at the first result other than `:ok`, and that
  # result is returned.
  @spec sock_send([PreparedStatements.handled_pkt()] | binary(), map()) :: :ok | {:error, term()}
  defp sock_send(bin_or_pkts, data) do
    {_pool, db_handler, db_sock} = data.db_connection

    if is_list(bin_or_pkts) do
      bin_or_pkts
      |> Enum.chunk_by(&is_tuple/1)
      |> Enum.reduce_while(:ok, fn run, _acc ->
        # chunk_by never yields an empty run, so hd/1 is safe here.
        result =
          if is_tuple(hd(run)),
            do: Supavisor.DbHandler.handle_prepared_statement_pkts(db_handler, run),
            else: HandlerHelpers.sock_send(db_sock, run)

        if result == :ok, do: {:cont, :ok}, else: {:halt, result}
      end)
    else
      HandlerHelpers.sock_send(db_sock, bin_or_pkts)
    end
  end
965

966
  # Handles a failed subscribe attempt.
  #
  #   * While `data.subscribe_retries` is below `@subscribe_retries`: logs a warning,
  #     increments the counter and re-arms a `:subscribe` event timeout of
  #     `@timeout_subscribe`.
  #   * Once the budget is used up: reports `:subscribe_retries_exhausted` to the client and
  #     stops the handler with `:normal`.
  @spec timeout_subscribe_or_terminate(map()) :: :gen_statem.handle_event_result()
  def timeout_subscribe_or_terminate(%{subscribe_retries: subscribe_retries} = data) do
    if subscribe_retries >= @subscribe_retries do
      Error.maybe_log_and_send_error(data.sock, {:error, :subscribe_retries_exhausted})
      {:stop, :normal}
    else
      Logger.warning("ClientHandler: Retry subscribe #{inspect(subscribe_retries)}")
      next_data = %{data | subscribe_retries: subscribe_retries + 1}
      {:keep_state, next_data, {:timeout, @timeout_subscribe, :subscribe}}
    end
  end
978

979
  # Checks a worker out of the poolboy `pool` in blocking mode, waiting at most `timeout`.
  #
  # Returns `{:ok, worker}`. Any exit raised during the checkout is caught and returned as
  # `{:error, {:exit, reason}}` instead of propagating to the caller.
  defp pool_checkout(pool, timeout) do
    try do
      worker = :poolboy.checkout(pool, true, timeout)
      {:ok, worker}
    catch
      :exit, reason -> {:error, {:exit, reason}}
    end
  end
984

985
  # Copies the tenant/user settings resolved in `info` onto the handler state.
  #
  # Copied onto `data`:
  #   * identifiers, tenant feature flags and availability zone;
  #   * the user's pool checkout timeout and the default parameter statuses;
  #   * the heartbeat interval: the tenant's `client_heartbeat_interval` multiplied by 1000
  #     (presumably seconds -> ms);
  #   * an `auth` map describing how to reach the upstream database.
  #
  # `proxy_type` is `:password` when the tenant sets `require_user`, `:auth_query`
  # otherwise.
  defp set_tenant_info(data, info, user, id, db_name) do
    tenant = info.tenant
    proxy_type = if tenant.require_user, do: :password, else: :auth_query

    sni_hostname =
      case tenant.sni_hostname do
        nil -> nil
        hostname -> to_charlist(hostname)
      end

    auth = %{
      application_name: data.app_name || "Supavisor",
      database: db_name,
      host: to_charlist(tenant.db_host),
      sni_hostname: sni_hostname,
      port: tenant.db_port,
      user: user,
      password: info.user.db_password,
      require_user: tenant.require_user,
      method: proxy_type,
      upstream_ssl: tenant.upstream_ssl,
      upstream_tls_ca: tenant.upstream_tls_ca,
      upstream_verify: tenant.upstream_verify
    }

    %{
      data
      | id: id,
        tenant: tenant.external_id,
        tenant_feature_flags: tenant.feature_flags,
        tenant_availability_zone: tenant.availability_zone,
        user: user,
        db_name: db_name,
        timeout: info.user.pool_checkout_timeout,
        ps: tenant.default_parameter_status,
        proxy_type: proxy_type,
        heartbeat_interval: tenant.client_heartbeat_interval * 1000,
        auth: auth
    }
  end
1022

1023
  # Returns whether the client count registered for `id` has reached its limit.
  #
  # The limit depends on the mode:
  #   * `:session`: the user's `pool_size`, falling back to the tenant's `default_pool_size`.
  #   * any other mode: the user's `max_clients`, falling back to the tenant's
  #     `default_max_clients`.
  #
  # The current count is the size of the ETS table registered for `id` in
  # `Supavisor.Registry.ManagerTables`. When no table is registered, returns `false`.
  defp check_max_clients_reached(id, info, mode) do
    limit =
      case mode do
        :session -> info.user.pool_size || info.tenant.default_pool_size
        _ -> info.user.max_clients || info.tenant.default_max_clients
      end

    with [{_pid, tid}] <- Registry.lookup(Supavisor.Registry.ManagerTables, id) do
      :ets.info(tid, :size) >= limit
    else
      _ -> false
    end
  end
1041
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc