• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / supavisor / 19370957114

14 Nov 2025 04:30PM UTC coverage: 62.682% (+1.4%) from 61.246%
19370957114

Pull #744

github

web-flow
Merge fd252a012 into 0224a24c8
Pull Request #744: fix(defrag): improve statems, caching, logs, circuit breaking

592 of 785 new or added lines in 22 files covered. (75.41%)

18 existing lines in 5 files now uncovered.

1809 of 2886 relevant lines covered (62.68%)

4508.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.45
/lib/supavisor/client_handler.ex
1
defmodule Supavisor.ClientHandler do
  @moduledoc """
  This module is responsible for handling incoming connections to the Supavisor server.

  It implements the Ranch protocol behavior and a gen_statem behavior. It handles SSL negotiation,
  user authentication, tenant subscription, and dispatching of messages to the appropriate tenant
  supervisor. Each client connection is assigned to a specific tenant supervisor.

  States used by the state machine:

    * `:handshake` - waiting for the startup packet (optionally after SSL negotiation)
    * `:auth_md5_wait`, `:auth_scram_first_wait`, `:auth_scram_final_wait` - authentication
      exchange with the client
    * `:connecting` - authenticated; subscribing to the tenant pool or connecting to the DB
    * `:idle` - ready for query, no query in flight
    * `:busy` - client messages are being forwarded to the database until it reports
      ReadyForQuery
  """

  require Logger

  @behaviour :ranch_protocol
  @behaviour :gen_statem
  @proto [:tcp, :ssl]
  @switch_active_count Application.compile_env(:supavisor, :switch_active_count)
  @subscribe_retries Application.compile_env(:supavisor, :subscribe_retries)
  @timeout_subscribe 500
  # How long (ms) the client may take to answer each authentication step.
  @auth_timeout 15_000
  @clients_registry Supavisor.Registry.TenantClients
  @proxy_clients_registry Supavisor.Registry.TenantProxyClients

  alias Supavisor.{
    DbHandler,
    HandlerHelpers,
    Helpers,
    Monitoring.Telem,
    Protocol.Debug,
    Tenants
  }

  alias Supavisor.ClientHandler.{
    Auth,
    Cancel,
    Data,
    Error,
    ProtocolHelpers,
    Proxy
  }

  alias Supavisor.Protocol.{FrontendMessageHandler, MessageStreamer}

  require Supavisor.Protocol.Server, as: Server
  require Supavisor.Protocol.PreparedStatements, as: PreparedStatements

  @impl true
  def start_link(ref, transport, opts) do
    pid = :proc_lib.spawn_link(__MODULE__, :init, [ref, transport, opts])
    {:ok, pid}
  end

  @impl true
  def callback_mode, do: [:handle_event_function, :state_enter]

  @doc """
  Notifies the client handler `pid` (asynchronously) that its database connection reached
  `status`. Only `:ready_for_query` is handled, and only while the handler is `:busy`.
  """
  @spec db_status(pid(), :ready_for_query) :: :ok
  def db_status(pid, status), do: :gen_statem.cast(pid, {:db_status, status})

  @doc """
  Asynchronously asks the client handler `pid` to write `error_message` to its client socket
  and then stop with reason `:normal`.
  """
  @spec send_error_and_terminate(pid(), iodata()) :: :ok
  def send_error_and_terminate(pid, error_message),
    do: :gen_statem.cast(pid, {:send_error_and_terminate, error_message})

  @impl true
  def init(_), do: :ignore

  # Entry point spawned by start_link/3: completes the Ranch handshake, builds the initial
  # Data struct and enters the gen_statem loop in the :handshake state.
  def init(ref, trans, opts) do
    Process.flag(:trap_exit, true)
    Helpers.set_max_heap_size(90)

    {:ok, sock} = :ranch.handshake(ref)
    peer_ip = Helpers.peer_ip(sock)
    local = opts[:local] || false

    Logger.metadata(peer_ip: peer_ip, local: local, state: :init)
    :ok = trans.setopts(sock, active: @switch_active_count)
    Logger.debug("ClientHandler is: #{inspect(self())}")

    now = System.monotonic_time()

    data = %Data{
      sock: {:gen_tcp, sock},
      trans: trans,
      peer_ip: peer_ip,
      local: local,
      ssl: false,
      auth: %{},
      mode: opts.mode,
      stream_state: MessageStreamer.new_stream_state(FrontendMessageHandler),
      stats: %{},
      idle_timeout: 0,
      heartbeat_interval: 0,
      connection_start: now,
      state_entered_at: now,
      subscribe_retries: 0
    }

    :gen_statem.enter_loop(__MODULE__, [hibernate_after: 5_000], :handshake, data)
  end

  @impl true
  def handle_event(:info, {_proto, _, <<"GET", _::binary>>}, :handshake, data) do
    Logger.debug("ClientHandler: Client is trying to request HTTP")

    HandlerHelpers.sock_send(
      data.sock,
      "HTTP/1.1 204 OK\r\nx-app-version: #{Application.spec(:supavisor, :vsn)}\r\n\r\n"
    )

    {:stop, :normal}
  end

  # cancel request
  def handle_event(:info, {_, _, Server.cancel_message(pid, key)}, _state, _) do
    Logger.debug("ClientHandler: Got cancel query for #{inspect({pid, key})}")
    :ok = Cancel.send_cancel_query(pid, key)
    {:stop, :normal}
  end

  # send cancel request to db
  def handle_event(:info, :cancel_query, state, data) do
    :ok = Cancel.maybe_forward_cancel_to_db(state, data)
    :keep_state_and_data
  end

  def handle_event(
        :info,
        {:tcp, _, Server.ssl_request_message()},
        :handshake,
        %{sock: sock} = data
      ) do
    Logger.debug("ClientHandler: Client is trying to connect with SSL")

    downstream_cert = Helpers.downstream_cert()
    downstream_key = Helpers.downstream_key()

    # SSL negotiation, S/N/Error
    if downstream_cert && downstream_key do
      # Passive mode so no packet is delivered as a message while the TLS handshake
      # reads from the raw socket.
      :ok = HandlerHelpers.setopts(sock, active: false)
      :ok = HandlerHelpers.sock_send(sock, "S")

      opts = [
        verify: :verify_none,
        certfile: downstream_cert,
        keyfile: downstream_key
      ]

      case :ssl.handshake(elem(sock, 1), opts) do
        {:ok, ssl_sock} ->
          socket = {:ssl, ssl_sock}
          :ok = HandlerHelpers.setopts(socket, active: @switch_active_count)
          {:keep_state, %{data | sock: socket, ssl: true}}

        error ->
          Logger.error("ClientHandler: SSL handshake error: #{inspect(error)}")
          Telem.client_join(:fail, data.id)
          {:stop, :normal}
      end
    else
      Logger.error(
        "ClientHandler: User requested SSL connection but no downstream cert/key found"
      )

      :ok = HandlerHelpers.sock_send(data.sock, "N")
      :keep_state_and_data
    end
  end

  def handle_event(:info, {_, _, bin}, :handshake, _) when byte_size(bin) > 1024 do
    Logger.error("ClientHandler: Startup packet too large #{byte_size(bin)}")
    {:stop, :normal}
  end

  def handle_event(:info, {_, _, bin}, :handshake, data) do
    case ProtocolHelpers.parse_startup_packet(bin) do
      {:ok, {type, {user, tenant_or_alias, db_name, search_path}}, app_name, _log_level} ->
        event = {:hello, {type, {user, tenant_or_alias, db_name, search_path}}}

        {:keep_state, %{data | app_name: app_name}, {:next_event, :internal, event}}

      {:error, {:invalid_user_info, {:invalid_format, {user, _}}} = reason} ->
        # Extract tenant from the attempted parsing for telemetry
        {_, {_, tenant_or_alias, _}} = HandlerHelpers.parse_user_info(%{"user" => user})
        Telem.client_join(:fail, tenant_or_alias)
        Error.maybe_log_and_send_error(data.sock, {:error, reason})
        {:stop, :normal}

      {:error, error} ->
        Logger.error("ClientHandler: Client startup message error: #{inspect(error)}")
        Telem.client_join(:fail, data.id)
        {:stop, :normal}
    end
  end

  def handle_event(
        :internal,
        {:hello, {type, {user, tenant_or_alias, db_name, search_path}}},
        :handshake,
        %{sock: sock} = data
      ) do
    sni_hostname = HandlerHelpers.try_get_sni(sock)

    case Tenants.get_user_cache(type, user, tenant_or_alias, sni_hostname) do
      {:ok, info} ->
        db_name = db_name || info.tenant.db_database

        id =
          Supavisor.id(
            {type, tenant_or_alias},
            user,
            data.mode,
            info.user.mode_type,
            db_name,
            search_path
          )

        Logger.metadata(
          project: tenant_or_alias,
          user: user,
          mode: data.mode,
          type: type,
          db_name: db_name,
          app_name: data.app_name
        )

        {:ok, addr} = HandlerHelpers.addr_from_sock(sock)

        cond do
          !data.local and info.tenant.enforce_ssl and !data.ssl ->
            Error.maybe_log_and_send_error(sock, {:error, :ssl_required, user})
            Telem.client_join(:fail, id)
            {:stop, :normal}

          HandlerHelpers.filter_cidrs(info.tenant.allow_list, addr) == [] ->
            Error.maybe_log_and_send_error(sock, {:error, :address_not_allowed, addr})
            Telem.client_join(:fail, id)
            {:stop, :normal}

          true ->
            new_data = set_tenant_info(data, info, user, id, db_name)

            case Supavisor.CircuitBreaker.check(tenant_or_alias, :get_secrets) do
              :ok ->
                # `data.id` is not assigned yet at this point (set_tenant_info/5 sets it on
                # new_data), so pass the id computed above.
                case Auth.get_user_secrets(id, info, user, tenant_or_alias) do
                  {:ok, auth_secrets} ->
                    Logger.debug("ClientHandler: Authentication method: #{inspect(auth_secrets)}")

                    {:keep_state, new_data,
                     {:next_event, :internal, {:handle, auth_secrets, info}}}

                  {:error, reason} ->
                    Supavisor.CircuitBreaker.record_failure(tenant_or_alias, :get_secrets)

                    Error.maybe_log_and_send_error(
                      sock,
                      {:error, :auth_error, reason},
                      :handshake
                    )

                    Telem.client_join(:fail, id)
                    {:stop, :normal}
                end

              {:error, :circuit_open, blocked_until} ->
                Error.maybe_log_and_send_error(
                  sock,
                  {:error, :circuit_breaker_open, :get_secrets, blocked_until},
                  :handshake
                )

                Telem.client_join(:fail, id)
                {:stop, :normal}
            end
        end

      {:error, reason} ->
        Error.maybe_log_and_send_error(
          sock,
          {:error, :tenant_not_found, reason, type, user, tenant_or_alias}
        )

        Telem.client_join(:fail, data.id)
        {:stop, :normal}
    end
  end

  # Starts the authentication exchange: MD5 for :auth_query_md5, SCRAM for anything else.
  def handle_event(
        :internal,
        {:handle, {method, secrets}, info},
        _state,
        %{sock: sock} = data
      ) do
    Logger.debug("ClientHandler: Handle exchange, auth method: #{inspect(method)}")

    case method do
      :auth_query_md5 ->
        auth_context = Auth.create_auth_context(method, secrets, info)
        :ok = HandlerHelpers.sock_send(sock, Server.md5_request(auth_context.salt))

        {:next_state, :auth_md5_wait, %{data | auth_context: auth_context},
         {:timeout, @auth_timeout, :auth_timeout}}

      _scram_method ->
        :ok = HandlerHelpers.sock_send(sock, Server.scram_request())
        auth_context = Auth.create_auth_context(method, secrets, info)

        {:next_state, :auth_scram_first_wait, %{data | auth_context: auth_context},
         {:timeout, @auth_timeout, :auth_timeout}}
    end
  end

  def handle_event(:internal, :subscribe, _state, data) do
    Logger.debug("ClientHandler: Subscribe to tenant #{inspect(data.id)}")

    with :ok <- Supavisor.CircuitBreaker.check(data.tenant, :db_connection),
         {:ok, sup} <-
           Supavisor.start_dist(data.id, data.auth_secrets,
             availability_zone: data.tenant_availability_zone,
             log_level: nil
           ),
         # A pool living on another node is reached through a direct (proxy) DB connection.
         true <-
           if(node(sup) != node() and data.mode in [:transaction, :session],
             do: :proxy,
             else: true
           ),
         {:ok, opts} <- Supavisor.subscribe(sup, data.id) do
      manager_ref = Process.monitor(opts.workers.manager)
      data = Map.merge(data, opts.workers)
      # May throw `{:stop, :normal}` on checkout timeout (see maybe_checkout/2).
      db_connection = maybe_checkout(:on_connect, data)

      data = %{
        data
        | manager: manager_ref,
          db_connection: db_connection,
          idle_timeout: opts.idle_timeout
      }

      Registry.register(@clients_registry, data.id, started_at: System.monotonic_time())

      next =
        if opts.ps == [],
          do: {:timeout, 10_000, :wait_ps},
          else: {:next_event, :internal, {:greetings, opts.ps}}

      {:keep_state, data, next}
    else
      {:error, :circuit_open, blocked_until} ->
        Error.maybe_log_and_send_error(
          data.sock,
          {:error, :circuit_breaker_open, :db_connection, blocked_until}
        )

        Telem.client_join(:fail, data.id)
        {:stop, :normal}

      {:error, :max_clients_reached} ->
        Error.maybe_log_and_send_error(data.sock, {:error, :max_clients_reached})
        Telem.client_join(:fail, data.id)
        {:stop, :normal}

      {:error, :max_pools_reached} ->
        Error.maybe_log_and_send_error(data.sock, {:error, :max_pools_reached})
        Telem.client_join(:fail, data.id)
        {:stop, :normal}

      {:error, :terminating, error} ->
        error_message = Server.encode_error_message(error)
        HandlerHelpers.sock_send(data.sock, error_message)
        Telem.client_join(:fail, data.id)
        {:stop, :normal}

      :proxy ->
        case Proxy.prepare_proxy_connection(data) do
          {:ok, updated_data} ->
            Logger.metadata(proxy: true)
            Registry.register(@proxy_clients_registry, data.id, [])

            {:keep_state, updated_data, {:next_event, :internal, :connect_db}}

          {:error, other} ->
            Logger.error("ClientHandler: Subscribe proxy error: #{inspect(other)}")
            timeout_subscribe_or_terminate(data)
        end

      error ->
        Logger.error("ClientHandler: Subscribe error: #{inspect(error)}")
        timeout_subscribe_or_terminate(data)
    end
  end

  # Proxy mode: start a dedicated DbHandler for this client and check it out.
  def handle_event(:internal, :connect_db, _state, data) do
    Logger.debug("ClientHandler: Trying to connect to DB")

    args = Proxy.build_db_handler_args(data)

    {:ok, db_pid} = DbHandler.start_link(args)

    case DbHandler.checkout(db_pid, data.sock, self()) do
      {:ok, db_sock} ->
        {:keep_state, %{data | db_connection: {nil, db_pid, db_sock}, mode: :proxy}}

      {:error, {:exit, {:timeout, _}}} ->
        timeout_error(data)

      # Errors are already forwarded to the client socket, so we can safely ignore them
      # here.
      {:error, {:exit, {reason, _}}} ->
        Logger.error(
          "ClientHandler: error checking out DbHandler (proxy), exit with reason: #{inspect(reason)}"
        )

        {:stop, :normal}
    end
  end

  # Sends parameter status, BackendKeyData and ReadyForQuery, registers the cancel key and
  # moves to :idle.
  def handle_event(:internal, {:greetings, ps}, _state, %{sock: sock} = data) do
    {header, <<pid::32, key::32>> = payload} = Server.backend_key_data()
    msg = [ps, [header, payload], Server.ready_for_query()]
    :ok = Cancel.listen_cancel_query(pid, key)
    :ok = HandlerHelpers.sock_send(sock, msg)
    Telem.client_connection_time(data.connection_start, data.id)
    {:next_state, :idle, data, handle_actions(data)}
  end

  def handle_event(:timeout, :subscribe, _state, _) do
    {:keep_state_and_data, {:next_event, :internal, :subscribe}}
  end

  def handle_event(:timeout, :wait_ps, _state, data) do
    Logger.warning(
      "ClientHandler: Wait parameter status timeout, send default #{inspect(data.ps)}"
    )

    ps = Server.encode_parameter_status(data.ps)
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
  end

  # Armed by handle_actions/1 when data.idle_timeout > 0.
  def handle_event(:timeout, :idle_terminate, _state, data) do
    Logger.warning("ClientHandler: Terminate an idle connection by #{data.idle_timeout} timeout")
    {:stop, :normal}
  end

  def handle_event(:timeout, :heartbeat_check, _state, data) do
    Logger.debug("ClientHandler: Send heartbeat to client")
    HandlerHelpers.sock_send(data.sock, Server.application_name())
    {:keep_state_and_data, {:timeout, data.heartbeat_interval, :heartbeat_check}}
  end

  def handle_event(:info, {:parameter_status, ps}, :connecting, _) do
    {:keep_state_and_data, {:next_event, :internal, {:greetings, ps}}}
  end

  def handle_event(:info, {:ssl_error, sock, reason}, _, %{sock: {_, sock}}) do
    Logger.error("ClientHandler: TLS error #{inspect(reason)}")
    :keep_state_and_data
  end

  # client closed connection
  def handle_event(_, {closed, _}, state, data)
      when closed in [:tcp_closed, :ssl_closed] do
    level =
      cond do
        state == :idle or data.mode == :proxy ->
          :info

        state == :handshake ->
          :warning

        true ->
          :error
      end

    Logger.log(
      level,
      "ClientHandler: socket closed while state was #{state} (#{data.mode})"
    )

    {:stop, :normal}
  end

  # linked DbHandler went down
  def handle_event(:info, {:EXIT, db_pid, reason}, _state, data) do
    Error.maybe_log_and_send_error(data.sock, {:error, :db_handler_exited, db_pid, reason})
    {:stop, :normal}
  end

  # pool's manager went down
  def handle_event(:info, {:DOWN, ref, _, _, reason}, state, %{manager: ref} = data) do
    Logger.error(
      "ClientHandler: Manager #{inspect(data.manager)} went down #{inspect(reason)} state #{inspect(state)}"
    )

    case {state, reason} do
      {_, :shutdown} -> {:stop, {:shutdown, :manager_shutdown}}
      {:idle, _} -> {:next_state, :connecting, data, {:next_event, :internal, :subscribe}}
      {:connecting, _} -> {:keep_state_and_data, {:next_event, :internal, :subscribe}}
      {:busy, _} -> {:stop, {:shutdown, :manager_down}}
    end
  end

  # emulate handle_cast
  def handle_event(:cast, {:db_status, :ready_for_query}, :busy, data) do
    Logger.debug("ClientHandler: Client is ready")

    db_connection = maybe_checkin(data.mode, data.pool, data.db_connection)

    {_, stats} =
      if data.local,
        do: {nil, data.stats},
        else: Telem.network_usage(:client, data.sock, data.id, data.stats)

    Telem.client_query_time(data.query_start, data.id)

    {:next_state, :idle, %{data | db_connection: db_connection, stats: stats},
     handle_actions(data)}
  end

  def handle_event(:cast, {:send_error_and_terminate, error_message}, _state, data) do
    HandlerHelpers.sock_send(data.sock, error_message)
    {:stop, :normal}
  end

  def handle_event(:info, {sock_error, _sock, msg}, _state, _data)
      when sock_error in [:tcp_error, :ssl_error] do
    Logger.error("ClientHandler: Socket error: #{inspect(msg)}")

    {:stop, {:shutdown, {:socket_error, msg}}}
  end

  # The socket delivered its `active: N` quota of messages; re-arm it.
  def handle_event(:info, {event, _socket}, _, data) when event in [:tcp_passive, :ssl_passive] do
    HandlerHelpers.setopts(data.sock, active: @switch_active_count)
    :keep_state_and_data
  end

  # Authentication state handlers

  # MD5 authentication - waiting for password response
  def handle_event(:info, {proto, _socket, bin}, :auth_md5_wait, data) when proto in @proto do
    auth_context = data.auth_context

    with {:ok, client_md5} <- Auth.parse_auth_message(bin, auth_context.method),
         {:ok, key} <-
           Auth.validate_credentials(
             auth_context.method,
             auth_context.secrets.().secret,
             auth_context.salt,
             client_md5
           ) do
      handle_auth_success(data.sock, {auth_context.method, auth_context.secrets}, key, data)
    else
      {:error, reason} ->
        handle_auth_failure(data.sock, reason, data, :auth_md5_wait)
    end
  end

  # SCRAM authentication - waiting for first message
  def handle_event(:info, {proto, _socket, bin}, :auth_scram_first_wait, data)
      when proto in @proto do
    auth_context = data.auth_context

    case Auth.parse_auth_message(bin, auth_context.method) do
      {:ok, {user, nonce, channel}} ->
        {message, signatures} =
          Auth.prepare_auth_challenge(
            auth_context.method,
            auth_context.secrets,
            nonce,
            user,
            channel
          )

        :ok = HandlerHelpers.sock_send(data.sock, Server.exchange_message(:first, message))

        new_auth_context = Auth.update_auth_context_with_signatures(auth_context, signatures)
        new_data = %{data | auth_context: new_auth_context}
        {:next_state, :auth_scram_final_wait, new_data, {:timeout, @auth_timeout, :auth_timeout}}

      {:error, reason} ->
        handle_auth_failure(data.sock, reason, data, :auth_scram_first_wait)
    end
  end

  # SCRAM authentication - waiting for final response
  def handle_event(:info, {proto, _socket, bin}, :auth_scram_final_wait, data)
      when proto in @proto do
    auth_context = data.auth_context

    with {:ok, p} <- Auth.parse_auth_message(bin, auth_context.method),
         {:ok, key} <-
           Auth.validate_credentials(
             auth_context.method,
             auth_context.secrets,
             auth_context.signatures,
             p
           ) do
      message = Auth.build_scram_final_response(auth_context)
      :ok = HandlerHelpers.sock_send(data.sock, message)
      handle_auth_success(data.sock, {auth_context.method, auth_context.secrets}, key, data)
    else
      {:error, reason} ->
        handle_auth_failure(data.sock, reason, data, :auth_scram_final_wait)
    end
  end

  # Authentication timeout handler
  def handle_event(:timeout, :auth_timeout, auth_state, data)
      when auth_state in [:auth_md5_wait, :auth_scram_first_wait, :auth_scram_final_wait] do
    handle_auth_failure(data.sock, {:timeout, auth_state}, data, auth_state)
  end

  # State-enter callback: emits a telemetry event with the time spent in the previous state,
  # except for the frequent idle<->busy transitions.
  def handle_event(:enter, old_state, new_state, data) do
    Logger.metadata(state: new_state)

    case {old_state, new_state} do
      # This is emitted on initialization
      {:handshake, :handshake} ->
        {:next_state, new_state, data}

      # We are not interested in idle->busy->idle transitions
      {:idle, :busy} ->
        {:next_state, new_state, data}

      {:busy, :idle} ->
        {:next_state, new_state, data}

      _ ->
        now = System.monotonic_time()
        time_in_previous_state = now - data.state_entered_at

        :telemetry.execute(
          [:supavisor, :client_handler, :state],
          %{duration: time_in_previous_state},
          %{
            from_state: old_state,
            to_state: new_state,
            tenant: data.tenant
          }
        )

        {:next_state, new_state, %{data | state_entered_at: now}}
    end
  end

  # Terminate request
  def handle_event(_kind, {proto, _, <<?X, 4::32>>}, :idle, _data) when proto in @proto do
    Logger.info("ClientHandler: Terminate received from client")
    {:stop, :normal}
  end

  # Sync when idle and no db_connection - return sync directly
  def handle_event(
        _kind,
        {proto, _, <<?S, 4::32, _::binary>>},
        :idle,
        %{db_connection: nil} = data
      )
      when proto in @proto do
    Logger.debug("ClientHandler: Receive sync")
    :ok = HandlerHelpers.sock_send(data.sock, Server.ready_for_query())

    {:keep_state, data, handle_actions(data)}
  end

  # Sync in other states - send to db
  def handle_event(_kind, {proto, _, <<?S, 4::32, _::binary>> = msg}, _state, data)
      when proto in @proto do
    Logger.debug("ClientHandler: Receive sync")
    :ok = sock_send(msg, data)

    {:keep_state, data, handle_actions(data)}
  end

  # Any message when idle - checkout and send to db
  def handle_event(_kind, {proto, socket, msg}, :idle, data) when proto in @proto do
    # May throw `{:stop, :normal}` on checkout timeout (see maybe_checkout/2).
    db_connection = maybe_checkout(:on_query, data)

    {:next_state, :busy,
     %{data | db_connection: db_connection, query_start: System.monotonic_time()},
     [{:next_event, :internal, {proto, socket, msg}}]}
  end

  # Any message when busy: send to db
  def handle_event(_kind, {proto, _, msg}, :busy, data) when proto in @proto do
    case handle_data(msg, data) do
      {:ok, updated_data} ->
        {:keep_state, updated_data}

      {:error, reason} ->
        {:stop, {:shutdown, reason}}
    end
  end

  # Catch-all: log and ignore anything not handled above.
  def handle_event(type, content, state, _data) do
    msg = [
      {"type", type},
      {"content", content},
      {"state", state}
    ]

    Logger.error("ClientHandler: Undefined msg: #{inspect(msg, pretty: true)}")

    :keep_state_and_data
  end

  @impl true
  def terminate(reason, state, _data) do
    Logger.metadata(state: state)

    level =
      case reason do
        :normal -> :debug
        _ -> :error
      end

    Logger.log(level, "ClientHandler: terminating with reason #{inspect(reason)}")
  end

  ## Internal functions

  # Caches the upstream secrets, confirms authentication to the client and moves to
  # :connecting, triggering either :connect_db (proxy mode) or :subscribe.
  defp handle_auth_success(sock, {method, secrets}, client_key, data) do
    final_secrets = Auth.prepare_final_secrets(secrets, client_key)

    {{_type, tenant}, user, _mode, _db, _search} = data.id
    upstream_secrets_ttl = upstream_secrets_ttl(data.mode)

    Supavisor.SecretCache.put_upstream_auth_secrets(
      tenant,
      user,
      method,
      final_secrets,
      upstream_secrets_ttl
    )

    Logger.info("ClientHandler: Connection authenticated")
    :ok = HandlerHelpers.sock_send(sock, Server.authentication_ok())
    Telem.client_join(:ok, data.id)

    auth = Map.put(data.auth, :secrets, {method, final_secrets})

    conn_type =
      if data.mode == :proxy,
        do: :connect_db,
        else: :subscribe

    {:next_state, :connecting,
     %{data | auth_context: nil, auth_secrets: {method, final_secrets}, auth: auth},
     {:next_event, :internal, conn_type}}
  end

  # TTL for cached upstream secrets: bounded in proxy mode, unbounded otherwise.
  defp upstream_secrets_ttl(:proxy), do: :timer.hours(24)
  defp upstream_secrets_ttl(_mode), do: :infinity

  defp handle_auth_failure(sock, reason, data, context) do
    auth_context = data.auth_context

    # Check if secrets changed and update cache, but don't retry
    # Most clients don't cope well with auto-retry on auth errors
    Auth.check_and_update_secrets(
      auth_context.method,
      reason,
      data.id,
      auth_context.info,
      data.tenant,
      data.user,
      auth_context.secrets
    )

    Error.maybe_log_and_send_error(sock, {:error, :auth_error, reason, data.user}, context)
    Telem.client_join(:fail, data.id)
    {:stop, :normal}
  end

  # Returns the DB connection to use for the next query.
  #
  # Session and proxy clients keep an already checked-out connection. Transaction clients get
  # no connection on connect and check one out from the pool per query.
  #
  # On a pool checkout timeout the client is sent an error and the gen_statem result
  # `{:stop, :normal}` is *thrown*: gen_statem uses a value thrown from a state callback as
  # that callback's result, so the handler stops instead of storing the stop tuple as
  # `db_connection` (which previously crashed later in sock_send/2). Other checkout exits
  # are re-raised.
  @spec maybe_checkout(:on_connect | :on_query, map) :: Data.db_connection()
  defp maybe_checkout(_, %{mode: mode, db_connection: {pool, db_pid, db_sock}})
       when is_pid(db_pid) and mode in [:session, :proxy] do
    {pool, db_pid, db_sock}
  end

  defp maybe_checkout(:on_connect, %{mode: :transaction}), do: nil

  defp maybe_checkout(_, data) do
    start = System.monotonic_time(:microsecond)

    with {:ok, db_pid} <- pool_checkout(data.pool, data.timeout),
         true <- Process.link(db_pid),
         {:ok, db_sock} <- DbHandler.checkout(db_pid, data.sock, self()) do
      same_box = if node(db_pid) == node(), do: :local, else: :remote
      Telem.pool_checkout_time(System.monotonic_time(:microsecond) - start, data.id, same_box)
      {data.pool, db_pid, db_sock}
    else
      {:error, {:exit, {:timeout, _}}} ->
        throw(timeout_error(data))

      {:error, {:exit, e}} ->
        exit(e)
    end
  end

  # Sends the mode-specific checkout timeout error to the client and returns a stop result.
  defp timeout_error(data) do
    error =
      case data.mode do
        :transaction -> {:error, :transaction_timeout}
        # Proxy clients hold a dedicated connection for their whole session (it is never
        # checked back in), so they report the session timeout. Without this clause the
        # :connect_db path raised CaseClauseError for `mode: :proxy`.
        mode when mode in [:session, :proxy] -> {:error, :session_timeout}
      end

    Error.maybe_log_and_send_error(data.sock, error)
    {:stop, :normal}
  end

  # Transaction mode returns the connection to the pool after each ReadyForQuery; session and
  # proxy modes keep it.
  @spec maybe_checkin(:transaction | :session | :proxy, pool_pid :: pid(), Data.db_connection()) ::
          Data.db_connection()
  defp maybe_checkin(:transaction, _pool, nil), do: nil

  defp maybe_checkin(:transaction, pool, {_, db_pid, _}) do
    Process.unlink(db_pid)
    :poolboy.checkin(pool, db_pid)
    nil
  end

  defp maybe_checkin(:session, _, db_connection), do: db_connection
  defp maybe_checkin(:proxy, _, db_connection), do: db_connection

  # Processes client packets through the stream state and forwards them to the database.
  @spec handle_data(binary(), map()) :: {:ok, map()} | {:error, atom()}
  defp handle_data(data_to_send, data) do
    Logger.debug(
      "ClientHandler: Forward pkt to db #{Debug.packet_to_string(data_to_send, :frontend)} #{inspect(data.db_connection)}"
    )

    with {:ok, new_stream_state, pkts} <-
           ProtocolHelpers.process_client_packets(data_to_send, data.mode, data),
         :ok <- sock_send(pkts, data) do
      {:ok, %{data | stream_state: new_stream_state}}
    else
      error ->
        Error.maybe_log_and_send_error(data.sock, error, "sending query")
        {:error, elem(error, 1)}
    end
  end

  # Builds the event-timeout actions armed when the handler (re)enters/stays in :idle.
  #
  # NOTE: both entries are gen_statem *event* timeouts, of which only one can be active;
  # when both are configured the later action (heartbeat) replaces the idle timer. The idle
  # event content is :idle_terminate so it matches the
  # handle_event(:timeout, :idle_terminate, ...) clause (it used to be :idle_timeout, which
  # fell through to the catch-all and never terminated the connection).
  @spec handle_actions(map) :: [{:timeout, non_neg_integer, atom}]
  defp handle_actions(%{} = data) do
    heartbeat =
      if data.heartbeat_interval > 0,
        do: [{:timeout, data.heartbeat_interval, :heartbeat_check}],
        else: []

    idle =
      if data.idle_timeout > 0,
        do: [{:timeout, data.idle_timeout, :idle_terminate}],
        else: []

    idle ++ heartbeat
  end

  # Sends raw bytes, or a list of processed packets, to the database. Prepared-statement
  # packets (tuples) go through the DbHandler; plain binaries go straight to the DB socket.
  @spec sock_send([PreparedStatements.handled_pkt()] | binary(), map()) :: :ok | {:error, term()}
  defp sock_send(bin_or_pkts, data) do
    {_pool, db_handler, db_sock} = data.db_connection

    case bin_or_pkts do
      pkts when is_list(pkts) ->
        # Chunking to ensure we send bigger packets
        pkts
        |> Enum.chunk_by(&is_tuple/1)
        |> Enum.reduce_while(:ok, fn chunk, _acc ->
          case chunk do
            [t | _] = prepared_pkts when is_tuple(t) ->
              Supavisor.DbHandler.handle_prepared_statement_pkts(db_handler, prepared_pkts)

            bins ->
              HandlerHelpers.sock_send(db_sock, bins)
          end
          |> case do
            :ok -> {:cont, :ok}
            error -> {:halt, error}
          end
        end)

      bin ->
        HandlerHelpers.sock_send(db_sock, bin)
    end
  end

  @doc """
  Schedules another `:subscribe` attempt after a short delay while retries remain;
  otherwise sends `:subscribe_retries_exhausted` to the client and stops.
  """
  @spec timeout_subscribe_or_terminate(map()) :: :gen_statem.handle_event_result()
  def timeout_subscribe_or_terminate(%{subscribe_retries: subscribe_retries} = data) do
    if subscribe_retries < @subscribe_retries do
      Logger.warning("ClientHandler: Retry subscribe #{inspect(subscribe_retries)}")

      {:keep_state, %{data | subscribe_retries: subscribe_retries + 1},
       {:timeout, @timeout_subscribe, :subscribe}}
    else
      Error.maybe_log_and_send_error(data.sock, {:error, :subscribe_retries_exhausted})
      {:stop, :normal}
    end
  end

  # Blocking poolboy checkout; exits (e.g. timeouts) are converted to error tuples.
  defp pool_checkout(pool, timeout) do
    {:ok, :poolboy.checkout(pool, true, timeout)}
  catch
    :exit, reason -> {:error, {:exit, reason}}
  end

  # Copies tenant/user settings from the cached `info` into the handler data, including the
  # `auth` map used to connect upstream.
  defp set_tenant_info(data, info, user, id, db_name) do
    proxy_type =
      if info.tenant.require_user,
        do: :password,
        else: :auth_query

    auth = %{
      application_name: data.app_name || "Supavisor",
      database: db_name,
      host: to_charlist(info.tenant.db_host),
      sni_hostname:
        if(info.tenant.sni_hostname != nil, do: to_charlist(info.tenant.sni_hostname)),
      port: info.tenant.db_port,
      user: user,
      password: info.user.db_password,
      require_user: info.tenant.require_user,
      method: proxy_type,
      upstream_ssl: info.tenant.upstream_ssl,
      upstream_tls_ca: info.tenant.upstream_tls_ca,
      upstream_verify: info.tenant.upstream_verify
    }

    %{
      data
      | id: id,
        tenant: info.tenant.external_id,
        tenant_feature_flags: info.tenant.feature_flags,
        tenant_availability_zone: info.tenant.availability_zone,
        user: user,
        db_name: db_name,
        timeout: info.user.pool_checkout_timeout,
        ps: info.tenant.default_parameter_status,
        proxy_type: proxy_type,
        # client_heartbeat_interval is configured in seconds; gen_statem timeouts are ms.
        heartbeat_interval: info.tenant.client_heartbeat_interval * 1000,
        auth: auth
    }
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc