• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 19911268462

03 Dec 2025 10:43PM UTC coverage: 63.801% (-0.2%) from 63.971%
19911268462

push

github

web-flow
chore: more buckets for higher resolution on histograms (#784)

1870 of 2931 relevant lines covered (63.8%)

4193.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.82
/lib/supavisor/client_handler.ex
1
defmodule Supavisor.ClientHandler do
2
  @moduledoc """
  Per-connection process for clients connecting to Supavisor.

  Ranch starts it through the `:ranch_protocol` callback and it then runs as a `:gen_statem`.
  Over the life of a connection it negotiates SSL, authenticates the user, subscribes the
  client to its tenant pool and forwards client messages to the assigned database
  connection. Every client connection belongs to exactly one tenant supervisor.
  """

  require Logger

  @behaviour :ranch_protocol
  @behaviour :gen_statem

  # Socket message tags that carry client data.
  @proto [:tcp, :ssl]

  # Number of messages delivered in active mode before the socket goes passive; it is
  # re-armed on {:tcp_passive, _} / {:ssl_passive, _}.
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)

  # Subscription retry tuning, presumably consumed by timeout_subscribe_or_terminate/1
  # (not visible in this section).
  @subscribe_retries Application.compile_env(:supavisor, :subscribe_retries)
  @timeout_subscribe 500

  # Registries this handler registers itself in after subscribing.
  @clients_registry Supavisor.Registry.TenantClients
  @proxy_clients_registry Supavisor.Registry.TenantProxyClients
20

21
  alias Supavisor.{
22
    DbHandler,
23
    HandlerHelpers,
24
    Helpers,
25
    Monitoring.Telem,
26
    Protocol.Debug,
27
    Tenants
28
  }
29

30
  alias Supavisor.ClientHandler.{
31
    Auth,
32
    Cancel,
33
    Data,
34
    Error,
35
    ProtocolHelpers,
36
    Proxy
37
  }
38

39
  alias Supavisor.Protocol.{FrontendMessageHandler, MessageStreamer}
40

41
  require Supavisor.Protocol.Server, as: Server
42
  require Supavisor.Protocol.PreparedStatements, as: PreparedStatements
43

44
  # Ranch protocol callback: spawns and links the connection process, which runs init/3
  # and only then enters the gen_statem loop.
  @impl true
  def start_link(ref, transport, opts),
    do: {:ok, :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts])}
49

50
  # One handle_event/4 serves every state. :state_enter makes gen_statem also call it with
  # an :enter event on each state change.
  @impl true
  def callback_mode, do: [:handle_event_function, :state_enter]

  @doc """
  Asynchronously reports the database side's status to the handler `pid`.

  `:ready_for_query` moves a `:busy` handler back to `:idle`.
  """
  @spec db_status(pid(), :ready_for_query) :: :ok
  def db_status(pid, status) do
    :gen_statem.cast(pid, {:db_status, status})
  end

  @doc """
  Asynchronously asks the handler `pid` to write `error_message` to its client and stop.
  """
  @spec send_error_and_terminate(pid(), iodata()) :: :ok
  def send_error_and_terminate(pid, error_message) do
    :gen_statem.cast(pid, {:send_error_and_terminate, error_message})
  end

  @doc """
  Asynchronously asks the handler `pid` to shut down.

  The request is postponed while the handler is `:busy`.
  """
  @spec graceful_shutdown(pid()) :: :ok
  def graceful_shutdown(pid) do
    :gen_statem.cast(pid, :graceful_shutdown)
  end
12✔
62

63
  # gen_statem's init/1 is never used. Ranch starts the process through start_link/3,
  # which enters the loop from init/3 instead.
  @impl true
  def init(_), do: :ignore

  # Runs inside the freshly spawned connection process. It completes the ranch handshake,
  # puts the socket in active-N mode, builds the initial %Data{} and enters the gen_statem
  # loop in :handshake (hibernating after 5 s without events).
  def init(ref, trans, opts) do
    # Linked processes' exits arrive as {:EXIT, pid, reason} messages.
    Process.flag(:trap_exit, true)
    # NOTE(review): caps this process' heap via Helpers.set_max_heap_size/1; the unit of
    # 90 is not visible here.
    Helpers.set_max_heap_size(90)

    {:ok, raw_sock} = :ranch.handshake(ref)
    client_ip = Helpers.peer_ip(raw_sock)
    local? = opts[:local] || false

    Logger.metadata(peer_ip: client_ip, local: local?, state: :init)
    :ok = trans.setopts(raw_sock, active: @switch_active_count)
    Logger.debug("ClientHandler is: #{inspect(self())}")

    started_at = System.monotonic_time()

    initial = %Data{
      sock: {:gen_tcp, raw_sock},
      trans: trans,
      peer_ip: client_ip,
      local: local?,
      ssl: false,
      auth: %{},
      mode: opts.mode,
      stream_state: MessageStreamer.new_stream_state(FrontendMessageHandler),
      stats: %{},
      subscribe_retries: 0,
      idle_timeout: 0,
      heartbeat_interval: 0,
      connection_start: started_at,
      state_entered_at: started_at
    }

    :gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :handshake, initial)
  end
99

100
  # A plain HTTP request on the Postgres port (e.g. a health check) gets a bare 204 carrying
  # the running application version, and the connection is then closed.
  @impl true
  def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :handshake, %{sock: sock}) do
    Logger.debug("ClientHandler: Client is trying to request HTTP")
    vsn = Application.spec(:supavisor, :vsn)
    HandlerHelpers.sock_send(sock, "HTTP/1.1 204 OK\r\nx-app-version: #{vsn}\r\n\r\n")
    {:stop, :normal}
  end
111

112
  # CancelRequest packet (accepted in any state): hand the backend pid/key pair to Cancel,
  # then close this connection.
  def handle_event(:info, {_, _, Server.cancel_message(target_pid, secret_key)}, _state, _) do
    Logger.debug("ClientHandler: Got cancel query for #{inspect({target_pid, secret_key})}")
    :ok = Cancel.send_cancel_query(target_pid, secret_key)
    {:stop, :normal}
  end

  # :cancel_query message: Cancel decides from the current state and data whether a cancel
  # should be forwarded to the database.
  def handle_event(:info, :cancel_query, current_state, handler_data) do
    :ok = Cancel.maybe_forward_cancel_to_db(current_state, handler_data)
    :keep_state_and_data
  end
124

125
  # SSLRequest received during startup.
  #
  # When a downstream certificate and key are configured, the handler answers "S" and runs
  # the server-side TLS handshake on the raw TCP socket. Otherwise it answers "N" and stays
  # in :handshake, so the client may continue unencrypted.
  def handle_event(
        :info,
        {:tcp, _, Server.ssl_request_message()},
        :handshake,
        %{sock: sock} = data
      ) do
    Logger.debug("ClientHandler: Client is trying to connect with SSL")

    cert = Helpers.downstream_cert()
    key = Helpers.downstream_key()

    if cert && key do
      :ok = HandlerHelpers.setopts(sock, active: false)
      :ok = HandlerHelpers.sock_send(sock, "S")

      ssl_opts = [verify: :verify_none, certfile: cert, keyfile: key]

      case :ssl.handshake(elem(sock, 1), ssl_opts) do
        {:ok, ssl_sock} ->
          tls_sock = {:ssl, ssl_sock}
          :ok = HandlerHelpers.setopts(tls_sock, active: @switch_active_count)
          {:keep_state, %{data | sock: tls_sock, ssl: true}}

        failure ->
          Logger.error("ClientHandler: SSL handshake error: #{inspect(failure)}")
          Telem.client_join(:fail, data.id)
          {:stop, :normal}
      end
    else
      Logger.error(
        "ClientHandler: User requested SSL connection but no downstream cert/key found"
      )

      :ok = HandlerHelpers.sock_send(sock, "N")
      :keep_state_and_data
    end
  end
167

168
  # Startup-phase socket data.
  #
  # Both clauses accept only {:tcp | :ssl, socket, payload} messages. Other 3-tuples can
  # arrive during the handshake, e.g. {:tcp_error, ...}, {:ssl_error, ...} or
  # {:EXIT, ...} (trap_exit is on). Those must reach their dedicated clauses instead of
  # being fed to the startup-packet parser.

  # Startup packets above 1024 bytes are rejected without parsing.
  def handle_event(:info, {proto, _, bin}, :handshake, _)
      when proto in @proto and byte_size(bin) > 1024 do
    Logger.error("ClientHandler: Startup packet too large #{byte_size(bin)}")
    {:stop, :normal}
  end

  # Parse the StartupMessage. On success, continue with an internal :hello event.
  def handle_event(:info, {proto, _, bin}, :handshake, data) when proto in @proto do
    case ProtocolHelpers.parse_startup_packet(bin) do
      {:ok, {type, {user, tenant_or_alias, db_name, search_path}}, app_name, _log_level} ->
        event = {:hello, {type, {user, tenant_or_alias, db_name, search_path}}}
        {:keep_state, %{data | app_name: app_name}, {:next_event, :internal, event}}

      {:error, {:invalid_user_info, {:invalid_format, {user, _}}} = reason} ->
        # Re-parse the raw user string only to recover the tenant for telemetry.
        {_, {_, tenant_or_alias, _}} = HandlerHelpers.parse_user_info(%{"user" => user})
        Telem.client_join(:fail, tenant_or_alias)
        Error.maybe_log_and_send_error(data.sock, {:error, reason})
        {:stop, :normal}

      {:error, error} ->
        Logger.error("ClientHandler: Client startup message error: #{inspect(error)}")
        Telem.client_join(:fail, data.id)
        {:stop, :normal}
    end
  end
193

194
  # The startup packet has been parsed. This clause:
  #   1. looks up the tenant/user and computes the pool id;
  #   2. runs the admission checks (SSL enforcement, CIDR allow list, client limit);
  #   3. if the checks pass and the :get_secrets circuit breaker is closed, fetches the
  #      secrets that drive the authentication exchange.
  def handle_event(
        :internal,
        {:hello, {type, {user, tenant_or_alias, db_name, search_path}}},
        :handshake,
        %{sock: sock} = data
      ) do
    sni_hostname = HandlerHelpers.try_get_sni(sock)

    case Tenants.get_user_cache(type, user, tenant_or_alias, sni_hostname) do
      {:ok, info} ->
        db_name = db_name || info.tenant.db_database

        id =
          Supavisor.id(
            {type, tenant_or_alias},
            user,
            data.mode,
            info.user.mode_type,
            db_name,
            search_path
          )

        Logger.metadata(
          project: tenant_or_alias,
          user: user,
          mode: data.mode,
          type: type,
          db_name: db_name,
          app_name: data.app_name
        )

        {:ok, addr} = HandlerHelpers.addr_from_sock(sock)

        cond do
          !data.local and info.tenant.enforce_ssl and !data.ssl ->
            Error.maybe_log_and_send_error(sock, {:error, :ssl_required, user})
            Telem.client_join(:fail, id)
            {:stop, :normal}

          HandlerHelpers.filter_cidrs(info.tenant.allow_list, addr) == [] ->
            Error.maybe_log_and_send_error(sock, {:error, :address_not_allowed, addr})
            Telem.client_join(:fail, id)
            {:stop, :normal}

          check_max_clients_reached(id, info, data.mode) ->
            error =
              if data.mode == :session do
                {:error, :max_clients_reached_session}
              else
                {:error, :max_clients_reached}
              end

            Error.maybe_log_and_send_error(sock, error)
            Telem.client_join(:fail, id)
            {:stop, :normal}

          true ->
            new_data = set_tenant_info(data, info, user, id, db_name)

            case Supavisor.CircuitBreaker.check(tenant_or_alias, :get_secrets) do
              :ok ->
                # Pass the pool id computed above. `data` is still the pre-handshake value:
                # init/3 never assigns :id, and set_tenant_info/5 presumably stores it only
                # in new_data. So `data.id` would not identify this client's pool here.
                case Auth.get_user_secrets(id, info, user, tenant_or_alias) do
                  {:ok, auth_secrets} ->
                    Logger.debug("ClientHandler: Authentication method: #{inspect(auth_secrets)}")

                    {:keep_state, new_data,
                     {:next_event, :internal, {:handle, auth_secrets, info}}}

                  {:error, reason} ->
                    Supavisor.CircuitBreaker.record_failure(tenant_or_alias, :get_secrets)

                    Error.maybe_log_and_send_error(
                      sock,
                      {:error, :auth_error, reason},
                      :handshake
                    )

                    Telem.client_join(:fail, id)
                    {:stop, :normal}
                end

              {:error, :circuit_open, blocked_until} ->
                Error.maybe_log_and_send_error(
                  sock,
                  {:error, :circuit_breaker_open, :get_secrets, blocked_until},
                  :handshake
                )

                Telem.client_join(:fail, id)
                {:stop, :normal}
            end
        end

      {:error, reason} ->
        Error.maybe_log_and_send_error(
          sock,
          {:error, :tenant_not_found, reason, type, user, tenant_or_alias}
        )

        Telem.client_join(:fail, data.id)
        {:stop, :normal}
    end
  end
297

298
  # Starts the authentication exchange chosen by the fetched secrets, unless the
  # (tenant, peer IP) :auth_error circuit breaker is open.
  #
  # :auth_query_md5 sends Server.md5_request/1 with the context's salt. Every other method
  # sends Server.scram_request/0. Either wait state is bounded by a 15 s :auth_timeout
  # event timeout.
  def handle_event(
        :internal,
        {:handle, {method, secrets}, info},
        _state,
        %{sock: sock} = data
      ) do
    Logger.debug("ClientHandler: Handle exchange, auth method: #{inspect(method)}")

    breaker = Supavisor.CircuitBreaker.check({data.tenant, data.peer_ip}, :auth_error)

    case {breaker, method} do
      {:ok, :auth_query_md5} ->
        md5_ctx = Auth.create_auth_context(method, secrets, info)
        :ok = HandlerHelpers.sock_send(sock, Server.md5_request(md5_ctx.salt))

        {:next_state, :auth_md5_wait, %{data | auth_context: md5_ctx},
         {:timeout, 15_000, :auth_timeout}}

      {:ok, _scram_method} ->
        :ok = HandlerHelpers.sock_send(sock, Server.scram_request())
        scram_ctx = Auth.create_auth_context(method, secrets, info)

        {:next_state, :auth_scram_first_wait, %{data | auth_context: scram_ctx},
         {:timeout, 15_000, :auth_timeout}}

      {{:error, :circuit_open, blocked_until}, _} ->
        Error.maybe_log_and_send_error(
          sock,
          {:error, :circuit_breaker_open, :auth_error, blocked_until},
          :handshake
        )

        Telem.client_join(:fail, data.id)
        {:stop, :normal}
    end
  end
335

336
  # (Re)joins the tenant pool identified by data.id:
  #   * starts or locates the pool via Supavisor.start_dist/3;
  #   * subscribes this client and monitors the pool manager;
  #   * asks maybe_checkout/2 for an on-connect database connection.
  #
  # When the pool lives on another node and the mode is :transaction or :session, the client
  # is switched to a proxied connection instead.
  def handle_event(:internal, :subscribe, _state, data) do
    Logger.debug("ClientHandler: Subscribe to tenant #{inspect(data.id)}")

    with :ok <- Supavisor.CircuitBreaker.check(data.tenant, :db_connection),
         {:ok, pool_sup} <-
           Supavisor.start_dist(data.id, data.auth_secrets,
             availability_zone: data.tenant_availability_zone,
             log_level: nil
           ),
         # A remote pool in a pooled mode yields :proxy, which is handled in `else`.
         true <-
           (node(pool_sup) == node() or data.mode not in [:transaction, :session]) || :proxy,
         {:ok, sub} <- Supavisor.subscribe(pool_sup, data.id) do
      manager_mon = Process.monitor(sub.workers.manager)
      with_workers = Map.merge(data, sub.workers)
      db_connection = maybe_checkout(:on_connect, with_workers)

      joined = %{
        with_workers
        | manager: manager_mon,
          db_connection: db_connection,
          idle_timeout: sub.idle_timeout
      }

      Registry.register(@clients_registry, joined.id, started_at: System.monotonic_time())

      cond do
        # Re-subscription (e.g. after the manager went down): the client was already greeted.
        joined.client_ready ->
          {:next_state, :idle, joined, handle_actions(joined)}

        # No parameter status yet: wait up to 10 s for {:parameter_status, ps}.
        sub.ps == [] ->
          {:keep_state, joined, {:timeout, 10_000, :wait_ps}}

        true ->
          {:keep_state, joined, {:next_event, :internal, {:greetings, sub.ps}}}
      end
    else
      {:error, :circuit_open, blocked_until} ->
        Error.maybe_log_and_send_error(
          data.sock,
          {:error, :circuit_breaker_open, :db_connection, blocked_until}
        )

        Telem.client_join(:fail, data.id)
        {:stop, :normal}

      {:error, limit} = limit_error when limit in [:max_clients_reached, :max_pools_reached] ->
        Error.maybe_log_and_send_error(data.sock, limit_error)
        Telem.client_join(:fail, data.id)
        {:stop, :normal}

      {:error, :terminating, error} ->
        HandlerHelpers.sock_send(data.sock, Server.encode_error_message(error))
        Telem.client_join(:fail, data.id)
        {:stop, :normal}

      :proxy ->
        case Proxy.prepare_proxy_connection(data) do
          {:ok, proxied} ->
            Logger.metadata(proxy: true)
            Registry.register(@proxy_clients_registry, data.id, [])
            {:keep_state, proxied, {:next_event, :internal, :connect_db}}

          {:error, other} ->
            Logger.error("ClientHandler: Subscribe proxy error: #{inspect(other)}")
            timeout_subscribe_or_terminate(data)
        end

      error ->
        Logger.error("ClientHandler: Subscribe error: #{inspect(error)}")
        timeout_subscribe_or_terminate(data)
    end
  end
418

419
  # Proxy path: start a dedicated DbHandler for this client and check it out. On success the
  # handler keeps that connection (no pool, hence the nil first element) and data.mode
  # becomes :proxy.
  def handle_event(:internal, :connect_db, _state, data) do
    Logger.debug("ClientHandler: Trying to connect to DB")

    {:ok, db_pid} =
      data
      |> Proxy.build_db_handler_args()
      |> DbHandler.start_link()

    case DbHandler.checkout(db_pid, data.sock, self()) do
      {:ok, db_sock} ->
        {:keep_state, %{data | db_connection: {nil, db_pid, db_sock}, mode: :proxy}}

      {:error, {:exit, {:timeout, _}}} ->
        timeout_error(data)

      # Other failures were already forwarded to the client socket; only log them here.
      {:error, {:exit, {reason, _}}} ->
        Logger.error(
          "ClientHandler: error checking out DbHandler (proxy), exit with reason: #{inspect(reason)}"
        )

        {:stop, :normal}
    end
  end
443

444
  # Completes the client-side startup and moves to :idle. It sends, in one write:
  #   * the parameter status messages;
  #   * BackendKeyData, whose pid/key pair is handed to Cancel.listen_cancel_query/2;
  #   * ReadyForQuery.
  def handle_event(:internal, {:greetings, ps}, _state, %{sock: sock} = data) do
    {key_header, <<cancel_pid::32, cancel_key::32>> = key_payload} = Server.backend_key_data()
    greeting = [ps, [key_header, key_payload], Server.ready_for_query()]

    :ok = Cancel.listen_cancel_query(cancel_pid, cancel_key)
    :ok = HandlerHelpers.sock_send(sock, greeting)
    Telem.client_connection_time(data.connection_start, data.id)

    {:next_state, :idle, %{data | client_ready: true}, handle_actions(data)}
  end
452

453
  # Retry the subscription. The :subscribe event timeout is presumably armed by
  # timeout_subscribe_or_terminate/1, which is not visible in this section.
  def handle_event(:timeout, :subscribe, _state, _) do
    {:keep_state_and_data, {:next_event, :internal, :subscribe}}
  end

  # No parameter status arrived within the 10 s armed by :subscribe. Greet the client with
  # the parameter status stored in data.ps instead.
  def handle_event(:timeout, :wait_ps, _state, data) do
    Logger.warning(
      "ClientHandler: Wait parameter status timeout, send default #{inspect(data.ps)}"
    )

    ps = Server.encode_parameter_status(data.ps)
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
  end

  # The idle timeout expired: close the client connection.
  def handle_event(:timeout, :idle_terminate, _state, data) do
    Logger.warning("ClientHandler: Terminate an idle connection by #{data.idle_timeout} timeout")
    {:stop, :normal}
  end

  # Heartbeat: send Server.application_name/0 to the client and re-arm the timeout.
  def handle_event(:timeout, :heartbeat_check, _state, data) do
    Logger.debug("ClientHandler: Send heartbeat to client")
    HandlerHelpers.sock_send(data.sock, Server.application_name())
    {:keep_state_and_data, {:timeout, data.heartbeat_interval, :heartbeat_check}}
  end

  # Parameter status delivered while :connecting: greet the client with it.
  def handle_event(:info, {:parameter_status, ps}, :connecting, _) do
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
  end
480

481
  # TLS alerts on the client socket.
  #
  # Erlang's :ssl exposes the alert level only in the message text ("Warning - ..." /
  # "Fatal - ..."). A fatal alert is not followed by :ssl_closed, so the handler stops here.
  # Warning alerts leave the connection up.
  def handle_event(:info, {:ssl_error, sock, {:tls_alert, {_reason, msg}}}, _, %{sock: {_, sock}}) do
    alert = to_string(msg)

    if alert =~ "Fatal -" do
      Logger.warning("ClientHandler: Received fatal TLS alert: #{alert}, terminating connection")
      {:stop, :normal}
    else
      Logger.warning(
        "ClientHandler: Received TLS warning alert: #{alert}, keeping connection alive"
      )

      :keep_state_and_data
    end
  end

  # Any other TLS error on the client socket is logged, and the connection is kept.
  def handle_event(:info, {:ssl_error, sock, reason}, _, %{sock: {_, sock}}) do
    Logger.error("ClientHandler: TLS error #{inspect(reason)}")
    :keep_state_and_data
  end
506

507
  # The client closed the socket. The log level reflects how unexpected that is:
  #   * :info when :idle or in proxy mode;
  #   * :warning during the handshake;
  #   * :error otherwise.
  def handle_event(_, {closed, _}, state, data)
      when closed in [:tcp_closed, :ssl_closed] do
    mode = data.mode

    level =
      case state do
        :idle -> :info
        _ when mode == :proxy -> :info
        :handshake -> :warning
        _ -> :error
      end

    Logger.log(level, "ClientHandler: socket closed while state was #{state} (#{mode})")
    {:stop, :normal}
  end
529

530
  # {:EXIT, ...} from a linked process (trap_exit is enabled in init/3). It is treated as
  # the linked DbHandler going down: the client gets an error and the handler stops.
  # NOTE(review): the ranch process that called start_link/3 is linked too (via
  # :proc_lib.spawn_link/3), so its exit signals also land here.
  def handle_event(:info, {:EXIT, db_pid, reason}, _state, %{sock: sock}) do
    Error.maybe_log_and_send_error(sock, {:error, :db_handler_exited, db_pid, reason})
    {:stop, :normal}
  end

  # The monitored pool manager (monitored in :subscribe) went down.
  #   * reason :shutdown: stop the handler.
  #   * :idle or :connecting: re-subscribe immediately.
  #   * :busy: postpone until the state changes.
  def handle_event(:info, {:DOWN, ref, _, _, reason}, state, %{manager: ref} = data) do
    Logger.error(
      "ClientHandler: Manager #{inspect(ref)} went down #{inspect(reason)} state #{inspect(state)}"
    )

    if reason == :shutdown do
      {:stop, {:shutdown, :manager_shutdown}}
    else
      case state do
        :idle -> {:next_state, :connecting, data, {:next_event, :internal, :subscribe}}
        :connecting -> {:keep_state_and_data, {:next_event, :internal, :subscribe}}
        :busy -> {:keep_state_and_data, :postpone}
      end
    end
  end
549

550
  # gen_statem casts, sent through db_status/2, send_error_and_terminate/2 and
  # graceful_shutdown/1.

  # ReadyForQuery from the database side. Release the connection as the mode dictates (see
  # maybe_checkin/3), record network usage (skipped for local clients) and query time, then
  # return to :idle.
  def handle_event(:cast, {:db_status, :ready_for_query}, :busy, data) do
    Logger.debug("ClientHandler: Client is ready")

    released = maybe_checkin(data.mode, data.pool, data.db_connection)

    {_, stats} =
      if data.local do
        {nil, data.stats}
      else
        Telem.network_usage(:client, data.sock, data.id, data.stats)
      end

    Telem.client_query_time(data.query_start, data.id)

    next_data = %{data | db_connection: released, stats: stats}
    {:next_state, :idle, next_data, handle_actions(data)}
  end

  # Write the given, already encoded, error to the client and stop.
  def handle_event(:cast, {:send_error_and_terminate, error_message}, _state, %{sock: sock}) do
    HandlerHelpers.sock_send(sock, error_message)
    {:stop, :normal}
  end

  # While :busy, postpone the shutdown request until the state changes.
  def handle_event(:cast, :graceful_shutdown, :busy, _data),
    do: {:keep_state_and_data, :postpone}

  # Send the client the encoded admin_shutdown error, then stop. Some clients only report
  # `tcp_closed` unless a ReadyForQuery follows the fatal error.
  # NOTE(review): only the encoded error is written here. Confirm whether
  # Server.encode_error_message/1 appends a ReadyForQuery.
  def handle_event(:cast, :graceful_shutdown, _state, %{sock: sock}) do
    HandlerHelpers.sock_send(sock, Server.encode_error_message(Server.admin_shutdown()))
    {:stop, :normal}
  end
582

583
  # Socket errors not handled by the TLS clauses above end the connection.
  def handle_event(:info, {tag, _sock, error}, state, _data)
      when tag in [:tcp_error, :ssl_error] do
    Logger.error("ClientHandler: Socket error: #{inspect(error)}, state was #{state}")
    {:stop, :normal}
  end

  # The socket went passive after @switch_active_count messages; re-arm active mode.
  def handle_event(:info, {tag, _socket}, _, %{sock: sock})
      when tag in [:tcp_passive, :ssl_passive] do
    HandlerHelpers.setopts(sock, active: @switch_active_count)
    :keep_state_and_data
  end
594

595
  # Authentication state handlers

  # MD5: validate the client's password message. The inputs are the secret returned by the
  # stored secrets function and the salt that was sent with the MD5 request.
  def handle_event(:info, {proto, _socket, bin}, :auth_md5_wait, data) when proto in @proto do
    ctx = data.auth_context
    method = ctx.method

    with {:ok, client_md5} <- Auth.parse_auth_message(bin, method),
         {:ok, key} <-
           Auth.validate_credentials(method, ctx.secrets.().secret, ctx.salt, client_md5) do
      handle_auth_success(data.sock, {method, ctx.secrets}, key, data)
    else
      {:error, reason} ->
        handle_auth_failure(data.sock, reason, data, :auth_md5_wait)
    end
  end
615

616
  # SCRAM, step 1:
  #   * parse the client-first message;
  #   * reply with the server-first challenge;
  #   * keep the signatures needed to check the client-final message.
  def handle_event(:info, {proto, _socket, bin}, :auth_scram_first_wait, data)
      when proto in @proto do
    ctx = data.auth_context

    with {:ok, {user, nonce, channel}} <- Auth.parse_auth_message(bin, ctx.method) do
      {challenge, signatures} =
        Auth.prepare_auth_challenge(ctx.method, ctx.secrets, nonce, user, channel)

      :ok = HandlerHelpers.sock_send(data.sock, Server.exchange_message(:first, challenge))

      new_ctx = Auth.update_auth_context_with_signatures(ctx, signatures)
      next_data = %{data | auth_context: new_ctx}
      {:next_state, :auth_scram_final_wait, next_data, {:timeout, 15_000, :auth_timeout}}
    else
      {:error, reason} ->
        handle_auth_failure(data.sock, reason, data, :auth_scram_first_wait)
    end
  end
642

643
  # SCRAM, step 2: validate the client-final message against the stored signatures. On
  # success, send the server-final message and finish authentication.
  def handle_event(:info, {proto, _socket, bin}, :auth_scram_final_wait, data)
      when proto in @proto do
    ctx = data.auth_context
    method = ctx.method

    with {:ok, client_final} <- Auth.parse_auth_message(bin, method),
         {:ok, key} <-
           Auth.validate_credentials(method, ctx.secrets, ctx.signatures, client_final) do
      :ok = HandlerHelpers.sock_send(data.sock, Auth.build_scram_final_response(ctx))
      handle_auth_success(data.sock, {method, ctx.secrets}, key, data)
    else
      {:error, reason} ->
        handle_auth_failure(data.sock, reason, data, :auth_scram_final_wait)
    end
  end

  # The client did not answer within the auth wait window: treat it as a failed
  # authentication.
  def handle_event(:timeout, :auth_timeout, auth_state, data)
      when auth_state in [:auth_md5_wait, :auth_scram_first_wait, :auth_scram_final_wait] do
    handle_auth_failure(data.sock, {:timeout, auth_state}, data, auth_state)
  end
670

671
  # State-enter callback (enabled by :state_enter in callback_mode/0).
  #
  # It keeps Logger's :state metadata current. It also emits a
  # [:supavisor, :client_handler, :state] telemetry event with the time spent in the
  # previous state. The initial enter and the frequent idle <-> busy flips emit no event.
  def handle_event(:enter, old_state, new_state, data) do
    Logger.metadata(state: new_state)

    if {old_state, new_state} in [{:handshake, :handshake}, {:idle, :busy}, {:busy, :idle}] do
      {:next_state, new_state, data}
    else
      entered_at = System.monotonic_time()

      :telemetry.execute(
        [:supavisor, :client_handler, :state],
        %{duration: entered_at - data.state_entered_at},
        %{from_state: old_state, to_state: new_state, tenant: data.tenant}
      )

      {:next_state, new_state, %{data | state_entered_at: entered_at}}
    end
  end
703

704
  # Client data handling. Messages arrive from the active socket, or as the same tuple
  # re-queued as an :internal event by the :idle clause below. Clause order matters.

  # Terminate ('X') while idle: close the client connection.
  def handle_event(_kind, {proto, _, <<?X, 4::32>>}, :idle, _data) when proto in @proto do
    Logger.info("ClientHandler: Terminate received from client")
    {:stop, :normal}
  end

  # Sync ('S') while idle with no database connection held: answer ReadyForQuery locally.
  def handle_event(
        _kind,
        {proto, _, <<?S, 4::32, _::binary>>},
        :idle,
        %{db_connection: nil, sock: sock} = data
      )
      when proto in @proto do
    Logger.debug("ClientHandler: Receive sync")
    :ok = HandlerHelpers.sock_send(sock, Server.ready_for_query())
    {:keep_state, data, handle_actions(data)}
  end

  # Sync while busy: forward it to the database.
  def handle_event(_kind, {proto, _, <<?S, 4::32, _::binary>> = sync}, :busy, data)
      when proto in @proto do
    Logger.debug("ClientHandler: Receive sync")
    :ok = sock_send(sync, data)
    {:keep_state, data, handle_actions(data)}
  end

  # Any other client data while idle:
  #   * obtain a connection via maybe_checkout/2;
  #   * switch to :busy;
  #   * replay the same message as an :internal event so the :busy clause forwards it.
  def handle_event(_kind, {proto, _, _} = client_msg, :idle, data) when proto in @proto do
    checked_out = maybe_checkout(:on_query, data)
    query_start = System.monotonic_time()

    {:next_state, :busy, %{data | db_connection: checked_out, query_start: query_start},
     [{:next_event, :internal, client_msg}]}
  end

  # Any client data while busy goes to handle_data/2, which already reports its own errors,
  # so an {:error, _} here only needs to stop the handler.
  def handle_event(_kind, {proto, _, payload}, :busy, data) when proto in @proto do
    case handle_data(payload, data) do
      {:ok, next_data} -> {:keep_state, next_data}
      {:error, _reason} -> {:stop, :normal}
    end
  end

  # Client data while still connecting: postpone it until the state changes.
  def handle_event(_kind, {proto, _socket, _msg}, :connecting, _data) when proto in @proto do
    {:keep_state_and_data, :postpone}
  end

  # Catch-all: log anything no clause above handled and keep going.
  def handle_event(type, content, state, _data) do
    details = [{"type", type}, {"content", content}, {"state", state}]
    Logger.error("ClientHandler: Undefined msg: #{inspect(details, pretty: true)}")
    :keep_state_and_data
  end
771

772
  # gen_statem terminate callback. It logs the stop reason (debug for :normal, error
  # otherwise) with the final state in the Logger metadata.
  @impl true
  def terminate(reason, state, _data) do
    Logger.metadata(state: state)
    level = if reason == :normal, do: :debug, else: :error
    Logger.log(level, "ClientHandler: terminating with reason #{inspect(reason)}")
  end
784

785
  ## Internal functions
786

787
  # Finishes a successful authentication:
  #   * derives the secrets used towards the database;
  #   * sends AuthenticationOk;
  #   * moves to :connecting, queueing :connect_db (proxy mode) or :subscribe (pooled modes).
  defp handle_auth_success(sock, {method, secrets}, client_key, data) do
    final_secrets = Auth.prepare_final_secrets(secrets, client_key)
    proxy? = data.mode == :proxy

    # Pooled modes (transaction/session) publish the upstream secrets in SecretCache under
    # the pool id. Proxy mode passes them to its DbHandler through data.auth instead.
    if not proxy? do
      Supavisor.SecretCache.put_upstream_auth_secrets(data.id, method, final_secrets)
    end

    Logger.info("ClientHandler: Connection authenticated")
    :ok = HandlerHelpers.sock_send(sock, Server.authentication_ok())
    Telem.client_join(:ok, data.id)

    auth_secrets = {method, final_secrets}
    next_step = if proxy?, do: :connect_db, else: :subscribe

    {:next_state, :connecting,
     %{
       data
       | auth_context: nil,
         auth_secrets: auth_secrets,
         auth: Map.put(data.auth, :secrets, auth_secrets)
     }, {:next_event, :internal, next_step}}
  end
811

812
  # Rejects the client after a failed or timed-out authentication step:
  #   * refreshes cached secrets if they changed;
  #   * counts the failure against the (tenant, peer IP) :auth_error circuit breaker;
  #   * reports the error to the client, then stops.
  # The exchange is deliberately not retried: most clients don't cope well with an
  # automatic retry after an auth error.
  defp handle_auth_failure(sock, reason, data, context) do
    %{auth_context: ctx, id: id, tenant: tenant, user: user, peer_ip: peer_ip} = data

    Auth.check_and_update_secrets(ctx.method, reason, id, ctx.info, tenant, user, ctx.secrets)
    Supavisor.CircuitBreaker.record_failure({tenant, peer_ip}, :auth_error)
    Error.maybe_log_and_send_error(sock, {:error, :auth_error, reason, user}, context)
    Telem.client_join(:fail, id)
    {:stop, :normal}
  end
833

834
  # Returns the database connection the handler should use next:
  #
  #   * :session / :proxy handlers that already hold a connection keep it.
  #   * :transaction handlers do not check out on connect (nil); they do so per query.
  #   * Otherwise a DbHandler is checked out of data.pool, linked to this process and given
  #     the client socket. The checkout latency is reported via Telem.
  #
  # On a checkout timeout, timeout_error/1 notifies the client and the process exits with
  # the stop reason timeout_error/1 returns. Any other checkout exit is re-raised with exit/1.
  @spec maybe_checkout(:on_connect | :on_query, map) :: Data.db_connection()
  defp maybe_checkout(_, %{mode: mode, db_connection: {pool, db_pid, db_sock}})
       when is_pid(db_pid) and mode in [:session, :proxy] do
    {pool, db_pid, db_sock}
  end

  defp maybe_checkout(:on_connect, %{mode: :transaction}), do: nil

  defp maybe_checkout(_, data) do
    start = System.monotonic_time(:microsecond)

    with {:ok, db_pid} <- pool_checkout(data.pool, data.timeout),
         true <- Process.link(db_pid),
         {:ok, db_sock} <- DbHandler.checkout(db_pid, data.sock, self()) do
      same_box = if node(db_pid) == node(), do: :local, else: :remote
      Telem.pool_checkout_time(System.monotonic_time(:microsecond) - start, data.id, same_box)
      {data.pool, db_pid, db_sock}
    else
      {:error, {:exit, {:timeout, _}}} ->
        # Callers store this function's result as data.db_connection. Returning the
        # {:stop, reason} action built by timeout_error/1 would make that tuple the
        # "connection" and keep the handler running. Exit with the same reason instead;
        # gen_statem then calls terminate/3 and stops the process.
        {:stop, reason} = timeout_error(data)
        exit(reason)

      {:error, {:exit, e}} ->
        exit(e)
    end
  end
859

860
  # Reports a pool-checkout timeout to the client and stops the handler.
  #
  # The error atom depends on the pool mode: `:session_timeout` or
  # `:transaction_timeout`. Any other mode raises `CaseClauseError`.
  defp timeout_error(data) do
    reason =
      case data.mode do
        :session -> :session_timeout
        :transaction -> :transaction_timeout
      end

    Error.maybe_log_and_send_error(data.sock, {:error, reason})
    {:stop, :normal}
  end
870

871
  # Releases the client's DB connection after an operation, if the pool mode calls for it.
  #
  # * `:transaction`: unlink the DbHandler and check it back into `pool` with
  #   `:poolboy.checkin/2`, leaving the client without a connection (`nil`). A client
  #   that holds no connection is a no-op.
  # * `:session` / `:proxy`: the connection is kept and returned unchanged.
  #
  # The spec previously declared the first argument as `:proxy` only. That
  # contradicted the `:transaction` and `:session` clauses below.
  @spec maybe_checkin(
          :transaction | :session | :proxy,
          pool_pid :: pid(),
          Data.db_connection()
        ) :: Data.db_connection()
  defp maybe_checkin(:transaction, _pool, nil), do: nil

  defp maybe_checkin(:transaction, pool, {_, db_pid, _}) do
    # Drop the link created at checkout so the worker's later fate no longer
    # affects this client process.
    Process.unlink(db_pid)
    :poolboy.checkin(pool, db_pid)
    nil
  end

  defp maybe_checkin(:session, _, db_connection), do: db_connection
  defp maybe_checkin(:proxy, _, db_connection), do: db_connection
882

883
  # Forwards raw client bytes to the database.
  #
  # The bytes are run through `ProtocolHelpers.process_client_packets/3` for the
  # client's pool mode. The resulting packets are written with `sock_send/2`. On
  # success the data map is returned with the new `stream_state`.
  #
  # Any other result from either step is reported to the client (context
  # "sending query") and collapsed to `{:error, elem(result, 1)}`.
  # NOTE(review): this assumes every failure value is a tuple of size >= 2 — confirm
  # for all return shapes of `process_client_packets/3`.
  @spec handle_data(binary(), map()) :: {:ok, map()} | {:error, atom()}
  defp handle_data(data_to_send, data) do
    Logger.debug(
      "ClientHandler: Forward pkt to db #{Debug.packet_to_string(data_to_send, :frontend)} #{inspect(data.db_connection)}"
    )

    fail = fn failure ->
      Error.maybe_log_and_send_error(data.sock, failure, "sending query")
      {:error, elem(failure, 1)}
    end

    case ProtocolHelpers.process_client_packets(data_to_send, data.mode, data) do
      {:ok, stream_state, pkts} ->
        case sock_send(pkts, data) do
          :ok -> {:ok, %{data | stream_state: stream_state}}
          send_failure -> fail.(send_failure)
        end

      process_failure ->
        fail.(process_failure)
    end
  end
899

900
  # Builds the gen_statem timeout actions for the next state.
  #
  # The result contains `{:timeout, idle_timeout, :idle_timeout}` when
  # `idle_timeout > 0`, followed by `{:timeout, heartbeat_interval, :heartbeat_check}`
  # when `heartbeat_interval > 0`. A disabled (zero) value contributes no action.
  #
  # NOTE(review): both tuples are gen_statem *event* timeouts, which are transition
  # options where the last one in an action list wins. When both are enabled, only
  # `:heartbeat_check` is actually armed. Confirm this is intended.
  @spec handle_actions(map) :: [{:timeout, non_neg_integer, atom}]
  defp handle_actions(%{} = data) do
    heartbeat = {data.heartbeat_interval, :heartbeat_check}
    idle = {data.idle_timeout, :idle_timeout}

    for {ms, event} <- [idle, heartbeat], ms > 0, do: {:timeout, ms, event}
  end
911

912
  # Sends client data to the DbHandler/database held in `data.db_connection`.
  #
  # A binary is written straight to the DB socket. A list of packets is split into
  # runs of consecutive elements of the same kind:
  # * runs of tuples (prepared-statement packets) go to
  #   `Supavisor.DbHandler.handle_prepared_statement_pkts/2`;
  # * runs of binaries are written to the DB socket as one chunk, so they go out as
  #   bigger packets.
  # Sending stops at the first non-`:ok` result, which is returned.
  @spec sock_send([PreparedStatements.handled_pkt()] | binary(), map()) :: :ok | {:error, term()}
  defp sock_send(bin_or_pkts, data) do
    {_pool, db_handler, db_sock} = data.db_connection

    if is_list(bin_or_pkts) do
      bin_or_pkts
      |> Enum.chunk_by(&is_tuple/1)
      |> Enum.reduce_while(:ok, fn chunk, _acc ->
        # chunk_by/2 never yields an empty chunk, so hd/1 is safe here.
        result =
          if is_tuple(hd(chunk)),
            do: Supavisor.DbHandler.handle_prepared_statement_pkts(db_handler, chunk),
            else: HandlerHelpers.sock_send(db_sock, chunk)

        if result == :ok, do: {:cont, :ok}, else: {:halt, result}
      end)
    else
      HandlerHelpers.sock_send(db_sock, bin_or_pkts)
    end
  end
939

940
  @doc """
  Retries a tenant subscription or gives up once the retry budget is spent.

  While `subscribe_retries` is below the compile-time `:subscribe_retries` setting,
  logs a warning and increments the counter. It then keeps the current state with an
  event timeout of `@timeout_subscribe` ms whose content is `:subscribe`. Otherwise it
  sends `:subscribe_retries_exhausted` to the client and stops with reason `:normal`.
  """
  @spec timeout_subscribe_or_terminate(map()) :: :gen_statem.handle_event_result()
  def timeout_subscribe_or_terminate(%{subscribe_retries: retries} = data)
      when retries < @subscribe_retries do
    Logger.warning("ClientHandler: Retry subscribe #{inspect(retries)}")

    next_data = %{data | subscribe_retries: retries + 1}
    {:keep_state, next_data, {:timeout, @timeout_subscribe, :subscribe}}
  end

  def timeout_subscribe_or_terminate(%{subscribe_retries: _} = data) do
    Error.maybe_log_and_send_error(data.sock, {:error, :subscribe_retries_exhausted})
    {:stop, :normal}
  end
952

953
  # Checks a worker out of the poolboy `pool`, blocking up to `timeout` ms.
  #
  # Exits raised by poolboy (for example a checkout timeout) are caught and returned
  # as `{:error, {:exit, reason}}`. A successful checkout yields `{:ok, worker_pid}`.
  defp pool_checkout(pool, timeout) do
    worker = :poolboy.checkout(pool, true, timeout)
    {:ok, worker}
  catch
    :exit, exit_reason -> {:error, {:exit, exit_reason}}
  end
958

959
  # Copies resolved tenant/user info into the handler's data.
  #
  # Builds the upstream `auth` map: connection target, credentials, TLS settings,
  # and the auth `method`. The method is `:password` when the tenant has
  # `require_user` set, `:auth_query` otherwise. `application_name` falls back to
  # "Supavisor"; `host` and `sni_hostname` are converted to charlists, and
  # `sni_hostname` stays nil when unset. The data map is then updated with the tenant
  # identity, checkout timeout, default parameter status, and the heartbeat interval.
  # The heartbeat interval is multiplied by 1000 from the tenant's
  # `client_heartbeat_interval`.
  defp set_tenant_info(data, info, user, id, db_name) do
    tenant = info.tenant
    user_info = info.user
    proxy_type = if tenant.require_user, do: :password, else: :auth_query
    sni_hostname = tenant.sni_hostname

    auth = %{
      application_name: data.app_name || "Supavisor",
      database: db_name,
      host: to_charlist(tenant.db_host),
      sni_hostname: if(not is_nil(sni_hostname), do: to_charlist(sni_hostname)),
      port: tenant.db_port,
      user: user,
      password: user_info.db_password,
      require_user: tenant.require_user,
      method: proxy_type,
      upstream_ssl: tenant.upstream_ssl,
      upstream_tls_ca: tenant.upstream_tls_ca,
      upstream_verify: tenant.upstream_verify
    }

    %{
      data
      | id: id,
        tenant: tenant.external_id,
        tenant_feature_flags: tenant.feature_flags,
        tenant_availability_zone: tenant.availability_zone,
        user: user,
        db_name: db_name,
        timeout: user_info.pool_checkout_timeout,
        ps: tenant.default_parameter_status,
        proxy_type: proxy_type,
        heartbeat_interval: tenant.client_heartbeat_interval * 1000,
        auth: auth
    }
  end
996

997
  # Returns true when the tenant has hit its client limit for `mode`.
  #
  # The limit is the user's `pool_size` in session mode and `max_clients` otherwise,
  # each falling back to the tenant default. The current count is the size of the
  # ETS table registered under `id` in `Supavisor.Registry.ManagerTables`; this
  # presumably holds one row per connected client, which should be verified against
  # the manager. With no registered table the limit is treated as not reached. If the
  # limit resolves to nil, `size >= nil` is always false under Erlang term ordering,
  # so no limit applies.
  defp check_max_clients_reached(id, info, mode) do
    limit =
      case mode do
        :session -> info.user.pool_size || info.tenant.default_pool_size
        _other -> info.user.max_clients || info.tenant.default_max_clients
      end

    case Registry.lookup(Supavisor.Registry.ManagerTables, id) do
      [{_pid, tid}] -> :ets.info(tid, :size) >= limit
      _ -> false
    end
  end
1015
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc